Torque3D Documentation / _generateds / threadPoolAsyncIO.h

threadPoolAsyncIO.h

Engine/source/platform/threads/threadPoolAsyncIO.h

Thread pool work items for asynchronous stream I/O.

More...

Classes:

class AsyncIOItem< T, Stream >

Abstract superclass of async I/O work items.

class AsyncReadItem< T, Stream >

Work item to asynchronously read from a stream.

class AsyncWriteItem< T, Stream >

Work item for writing to an output stream.

Detailed Description

Thread pool work items for asynchronous stream I/O.

Through the use of stream filters, this can be used for basically any type of asynchronous stream processing.

  1
  2//-----------------------------------------------------------------------------
  3// Copyright (c) 2012 GarageGames, LLC
  4//
  5// Permission is hereby granted, free of charge, to any person obtaining a copy
  6// of this software and associated documentation files (the "Software"), to
  7// deal in the Software without restriction, including without limitation the
  8// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  9// sell copies of the Software, and to permit persons to whom the Software is
 10// furnished to do so, subject to the following conditions:
 11//
 12// The above copyright notice and this permission notice shall be included in
 13// all copies or substantial portions of the Software.
 14//
 15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 20// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 21// IN THE SOFTWARE.
 22//-----------------------------------------------------------------------------
 23
 24#ifndef _THREADPOOLASYNCIO_H_
 25#define _THREADPOOLASYNCIO_H_
 26
 27#ifndef _THREADPOOL_H_
 28#  include "platform/threads/threadPool.h"
 29#endif
 30#ifndef _RAWDATA_H_
 31#  include "core/util/rawData.h"
 32#endif
 33#ifndef _TSTREAM_H_
 34#  include "core/stream/tStream.h"
 35#endif
 36
 37
 38//RDTODO: I/O error handling
 39
 40/// @file
 41/// Thread pool work items for asynchronous stream I/O.
  42/// Through the use of stream filters, this can be used for basically any
  43/// type of asynchronous stream processing.
 44
 45//--------------------------------------------------------------------------
 46//    AsyncIOItem.
 47//--------------------------------------------------------------------------
 48
 49/// Abstract superclass of async I/O work items.
 50///
 51/// Supports both offset-based stream I/O as well as I/O on streams with
 52/// implicit positions.  Note that if you use the latter type, make sure
 53/// that no other thread is messing with the stream at the same time or
 54/// chaos will ensue.
 55///
 56/// @param T Type of elements being streamed.
 57template< typename T, class Stream >
 58class AsyncIOItem : public ThreadPool::WorkItem
 59{
 60   public:
 61
 62      typedef WorkItem Parent;
 63      typedef T ValueType;
 64      typedef RawDataT< ValueType> BufferType;
 65      typedef U32 OffsetType;
 66      typedef Stream StreamType;
 67
 68   protected:
 69
 70      /// Buffer keeping/receiving the data elements.
 71      BufferType mBuffer;
 72      
 73      /// The stream to read from/write to.
 74      StreamType* mStream;
 75
 76      /// Number of elements to read from/write to the stream.
 77      U32 mNumElements;
 78
 79      /// Offset in "mBuffer" from where to read/where to start writing to.
 80      U32 mOffsetInBuffer;
 81
 82      /// Offset in stream from where to read/where to write to.
 83      /// @note This is only meaningful if the stream is an offset I/O
 84      ///   stream.  For a stream that is can do both types of I/O,
 85      ///   explicit offsets are preferred and this value is used.
 86      OffsetType mOffsetInStream;
 87
 88      ///
 89      ValueType* getBufferPtr()
 90      {
 91         return &getBuffer().data[ getOffsetInBuffer() ]; 
 92      }
 93
 94   public:
 95   
 96      ///
 97      /// If the stream uses implicit positioning, then the supplied "offsetInStream"
 98      /// is meaningless and ignored.
 99      AsyncIOItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
100                   ThreadContext* context = 0 )
101         : Parent( context ),
102           mStream( stream ),
103           mNumElements( numElements ),
104           mOffsetInBuffer( 0 ),
105           mOffsetInStream( offsetInStream ) {}
106
107      /// Construct a read item on "stream" that stores data into the given "buffer".
108      ///
109      AsyncIOItem( StreamType* stream, BufferType& buffer, U32 offsetInBuffer,
110                   U32 numElements, OffsetType offsetInStream, bool takeOwnershipOfBuffer = true,
111                   ThreadContext* context = 0 )
112         : Parent( context ),
113           mBuffer( buffer ),
114           mStream( stream ),
115           mNumElements( numElements ),
116           mOffsetInBuffer( offsetInBuffer ),
117           mOffsetInStream( offsetInStream )
118      {
119         if( takeOwnershipOfBuffer )
120            mBuffer.ownMemory = true;
121      }
122
123      /// Return the stream being written to/read from.
124      StreamType* getStream()
125      {
126         return mStream;
127      }
128
129      /// Return the data buffer being written to/read from.
130      /// @note This may not yet have been allocated.
131      BufferType& getBuffer()
132      {
133         return mBuffer;
134
135      }
136
137      /// Return the number of elements involved in the transfer.
138      U32 getNumElements()
139      {
140         return mNumElements;
141      }
142
143      /// Return the position in the data buffer at which to start the transfer.
144      U32 getOffsetInBuffer()
145      {
146         return mOffsetInBuffer;
147      }
148
149      /// Return the position in the stream at which to start the transfer.
150      /// @note Only meaningful for streams that support offset I/O.
151      OffsetType getOffsetInStream()
152      {
153         return mOffsetInStream;
154      }
155};
156
157//--------------------------------------------------------------------------
158//    AsyncReadItem.
159//--------------------------------------------------------------------------
160
161//RDTODO: error handling
162/// Work item to asynchronously read from a stream.
163///
164/// The given stream type may implement any of the input stream
165/// interfaces.  Preference is given to IAsyncInputStream, then to
166/// IOffsetInputStream, and only if none of these are implemented
167/// IInputStream is used.
168///
169/// For IAsyncInputStreams, the async read operation is issued immediately
170/// on the constructing thread and then picked up on the worker thread.
171/// This ensures optimal use of concurrency.
172
173template< typename T, class Stream = IOffsetInputStream< T > >
174class AsyncReadItem : public AsyncIOItem< T, Stream >
175{
176   public:
177
178      typedef AsyncIOItem< T, Stream> Parent;
179      typedef typename Parent::StreamType StreamType;
180      typedef typename Parent::OffsetType OffsetType;
181      typedef typename Parent::BufferType BufferType;
182      typedef typename Parent::ValueType ValueType;
183
184      /// Construct a read item that reads "numElements" at "offsetInStream"
185      /// from "stream".
186      ///
187      /// Since with this constructor no data buffer is supplied, it will be
188      /// dynamically allocated by the read() method.  Note that this only makes
189      /// sense if this class is subclassed and processing is done on the buffer
190      /// after it has been read.
191      ///
192      /// @param stream The stream to read from.
193      /// @param numElement The number of elements to read from the stream.
194      /// @param offsetInStream The offset at which to read from the stream;
195      ///   ignored if the stream uses implicit positioning
196      /// @param context The tread pool context to place the item into.
197      AsyncReadItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
198                     ThreadContext* context = 0 )
199         : Parent( stream, numElements, offsetInStream, context )
200      {
201         _prep();
202      }
203
204      AsyncReadItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
205                     BufferType& buffer, bool takeOwnershipOfBuffer = false,
206                     U32 offsetInBuffer = 0, ThreadContext* context = 0 )
207         : Parent( stream, buffer, offsetInBuffer, numElements, offsetInStream, takeOwnershipOfBuffer, context )
208      {
209         _prep();
210      }
211
212      /// @return The number of elements actually read from the stream.
213      U32 getNumElementsRead()
214      {
215         return mNumElementsRead;
216      }
217
218   protected:
219
220      /// Handle of asynchronous stream read, if we are using an async interface.
221      void* mAsyncHandle;
222
223      /// After the read operation has completed, this holds the number of
224      /// elements actually read from the stream.
225      U32 mNumElementsRead;
226
227      virtual void execute();
228      
229      void _allocBuffer()
230      {
231         if( !this->getBuffer().data )
232            this->getBuffer().alloc( this->getNumElements() );
233      }
234
235      void _prep()
236      {
237         IAsyncInputStream< T>* s = dynamic_cast< IAsyncInputStream< T>* >( this->getStream() );
238         if( s )
239         {
240            _allocBuffer();
241            mAsyncHandle = s->issueReadAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() );
242         }
243      }
244
245      // Helper functions to differentiate between stream types.
246
247      void _read( IInputStream< T>* stream )
248      {
249         mNumElementsRead = stream->read( this->getBufferPtr(), this->getNumElements() );
250      }
251      void _read( IOffsetInputStream< T>* stream )
252      {
253         mNumElementsRead = stream->readAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() );
254      }
255      void _read( IAsyncInputStream< T>* stream )
256      {
257         stream->tryCompleteReadAt( mAsyncHandle, mNumElementsRead, true );
258      }
259};
260
261template< typename T, class Stream >
262void AsyncReadItem< T, Stream >::execute()
263{
264   _allocBuffer();
265   
266   // Read the data.  Do a dynamic cast for any of the
267   // interfaces we prefer.
268
269   if( this->cancellationPoint() ) return;
270   StreamType* stream = this->getStream();
271   if( dynamic_cast< IAsyncInputStream< T>* >( stream ) )
272      _read( ( IAsyncInputStream< T>* ) stream );
273   else if( dynamic_cast< IOffsetInputStream< T>* >( stream ) )
274      _read( ( IOffsetInputStream< T>* ) stream );
275   else
276      _read( stream );
277}
278
279//--------------------------------------------------------------------------
280//    AsyncWriteItem.
281//--------------------------------------------------------------------------
282
283/// Work item for writing to an output stream.
284///
285/// The stream being written to may implement any of the given output stream
286/// interfaces.  Preference is given to IAsyncOutputStream, then to
287/// IOffsetOutputStream, and only if none of these is implemented IOutputStream
288/// is used.
289///
290/// A useful feature is to yield ownership of the data buffer to the
291/// write item.  This way, this can be pretty much used in a fire-and-forget
292/// manner where after submission, no further synchronization happens
293/// between the client and the work item.
294///
295/// @note Be aware that if writing to an output stream that has an implicit
296///   position property, multiple concurrent writes will interfere with each other.
297template< typename T, class Stream = IOffsetOutputStream< T > >
298class AsyncWriteItem : public AsyncIOItem< T, Stream >
299{
300   public:
301
302      typedef AsyncIOItem< T, Stream> Parent;
303      typedef typename Parent::StreamType StreamType;
304      typedef typename Parent::OffsetType OffsetType;
305      typedef typename Parent::BufferType BufferType;
306      typedef typename Parent::ValueType ValueType;
307
308      AsyncWriteItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
309                      BufferType& buffer, bool takeOwnershipOfBuffer = true,
310                      U32 offsetInBuffer = 0, ThreadContext* context = 0 )
311         : Parent( stream, buffer, offsetInBuffer, numElements, offsetInStream, takeOwnershipOfBuffer, context )
312      {
313         _prep( stream );
314      }
315
316   protected:
317
318      /// Handle of asynchronous write operation, if the stream implements IAsyncOutputStream.
319      void* mAsyncHandle;
320      
321      virtual void execute();
322
323      void _prep( StreamType* stream )
324      {
325         IAsyncOutputStream< T>* s = dynamic_cast< IAsyncOutputStream< T>* >( stream );
326         if( s )
327            mAsyncHandle = s->issueWriteAt( this->getOffset(), this->getBufferPtr(), this->getNumElements() );
328      }
329
330      void _write( IOutputStream< T>* stream )
331      {
332         stream->write( this->getBufferPtr(), this->getNumElements() );
333      }
334      void _write( IOffsetOutputStream< T>* stream )
335      {
336         stream->writeAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() );
337      }
338      void _write( IAsyncOutputStream< T>* stream )
339      {
340         stream->tryCompleteWriteAt( mAsyncHandle, true );
341      }
342};
343
344template< typename T, class Stream >
345void AsyncWriteItem< T, Stream >::execute()
346{
347   if( this->cancellationPoint() ) return;
348
349   StreamType* stream = this->getStream();
350   if( dynamic_cast< IAsyncOutputStream< T>* >( stream ) )
351      _write( ( IAsyncOutputStream< T>* ) stream );
352   if( dynamic_cast< IOffsetOutputStream< T>* >( stream ) )
353      _write( ( IOffsetOutputStream< T>* ) stream );
354   else
355      _write( stream );
356}
357
358#endif // _THREADPOOLASYNCIO_H_
359