threadPoolAsyncIO.h
Engine/source/platform/threads/threadPoolAsyncIO.h
Thread pool work items for asynchronous stream I/O.
Classes:
class AsyncIOItem< T, Stream >
Abstract superclass of async I/O work items.
class AsyncReadItem< T, Stream >
Work item to asynchronously read from a stream.
class AsyncWriteItem< T, Stream >
Work item for writing to an output stream.
Detailed Description
Thread pool work items for asynchronous stream I/O.
Through the use of stream filters, these work items can be used for essentially any type of asynchronous stream processing.
1 2//----------------------------------------------------------------------------- 3// Copyright (c) 2012 GarageGames, LLC 4// 5// Permission is hereby granted, free of charge, to any person obtaining a copy 6// of this software and associated documentation files (the "Software"), to 7// deal in the Software without restriction, including without limitation the 8// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 9// sell copies of the Software, and to permit persons to whom the Software is 10// furnished to do so, subject to the following conditions: 11// 12// The above copyright notice and this permission notice shall be included in 13// all copies or substantial portions of the Software. 14// 15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 20// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 21// IN THE SOFTWARE. 22//----------------------------------------------------------------------------- 23 24#ifndef _THREADPOOLASYNCIO_H_ 25#define _THREADPOOLASYNCIO_H_ 26 27#ifndef _THREADPOOL_H_ 28# include "platform/threads/threadPool.h" 29#endif 30#ifndef _RAWDATA_H_ 31# include "core/util/rawData.h" 32#endif 33#ifndef _TSTREAM_H_ 34# include "core/stream/tStream.h" 35#endif 36 37 38//RDTODO: I/O error handling 39 40/// @file 41/// Thread pool work items for asynchronous stream I/O. 42/// Through the use of stream filters, this can be basically used for any 43/// type of asynchronous stream processing. 44 45//-------------------------------------------------------------------------- 46// AsyncIOItem. 
47//-------------------------------------------------------------------------- 48 49/// Abstract superclass of async I/O work items. 50/// 51/// Supports both offset-based stream I/O as well as I/O on streams with 52/// implicit positions. Note that if you use the latter type, make sure 53/// that no other thread is messing with the stream at the same time or 54/// chaos will ensue. 55/// 56/// @param T Type of elements being streamed. 57template< typename T, class Stream > 58class AsyncIOItem : public ThreadPool::WorkItem 59{ 60 public: 61 62 typedef WorkItem Parent; 63 typedef T ValueType; 64 typedef RawDataT< ValueType> BufferType; 65 typedef U32 OffsetType; 66 typedef Stream StreamType; 67 68 protected: 69 70 /// Buffer keeping/receiving the data elements. 71 BufferType mBuffer; 72 73 /// The stream to read from/write to. 74 StreamType* mStream; 75 76 /// Number of elements to read from/write to the stream. 77 U32 mNumElements; 78 79 /// Offset in "mBuffer" from where to read/where to start writing to. 80 U32 mOffsetInBuffer; 81 82 /// Offset in stream from where to read/where to write to. 83 /// @note This is only meaningful if the stream is an offset I/O 84 /// stream. For a stream that is can do both types of I/O, 85 /// explicit offsets are preferred and this value is used. 86 OffsetType mOffsetInStream; 87 88 /// 89 ValueType* getBufferPtr() 90 { 91 return &getBuffer().data[ getOffsetInBuffer() ]; 92 } 93 94 public: 95 96 /// 97 /// If the stream uses implicit positioning, then the supplied "offsetInStream" 98 /// is meaningless and ignored. 99 AsyncIOItem( StreamType* stream, U32 numElements, OffsetType offsetInStream, 100 ThreadContext* context = 0 ) 101 : Parent( context ), 102 mStream( stream ), 103 mNumElements( numElements ), 104 mOffsetInBuffer( 0 ), 105 mOffsetInStream( offsetInStream ) {} 106 107 /// Construct a read item on "stream" that stores data into the given "buffer". 
108 /// 109 AsyncIOItem( StreamType* stream, BufferType& buffer, U32 offsetInBuffer, 110 U32 numElements, OffsetType offsetInStream, bool takeOwnershipOfBuffer = true, 111 ThreadContext* context = 0 ) 112 : Parent( context ), 113 mBuffer( buffer ), 114 mStream( stream ), 115 mNumElements( numElements ), 116 mOffsetInBuffer( offsetInBuffer ), 117 mOffsetInStream( offsetInStream ) 118 { 119 if( takeOwnershipOfBuffer ) 120 mBuffer.ownMemory = true; 121 } 122 123 /// Return the stream being written to/read from. 124 StreamType* getStream() 125 { 126 return mStream; 127 } 128 129 /// Return the data buffer being written to/read from. 130 /// @note This may not yet have been allocated. 131 BufferType& getBuffer() 132 { 133 return mBuffer; 134 135 } 136 137 /// Return the number of elements involved in the transfer. 138 U32 getNumElements() 139 { 140 return mNumElements; 141 } 142 143 /// Return the position in the data buffer at which to start the transfer. 144 U32 getOffsetInBuffer() 145 { 146 return mOffsetInBuffer; 147 } 148 149 /// Return the position in the stream at which to start the transfer. 150 /// @note Only meaningful for streams that support offset I/O. 151 OffsetType getOffsetInStream() 152 { 153 return mOffsetInStream; 154 } 155}; 156 157//-------------------------------------------------------------------------- 158// AsyncReadItem. 159//-------------------------------------------------------------------------- 160 161//RDTODO: error handling 162/// Work item to asynchronously read from a stream. 163/// 164/// The given stream type may implement any of the input stream 165/// interfaces. Preference is given to IAsyncInputStream, then to 166/// IOffsetInputStream, and only if none of these are implemented 167/// IInputStream is used. 168/// 169/// For IAsyncInputStreams, the async read operation is issued immediately 170/// on the constructing thread and then picked up on the worker thread. 171/// This ensures optimal use of concurrency. 
172 173template< typename T, class Stream = IOffsetInputStream< T > > 174class AsyncReadItem : public AsyncIOItem< T, Stream > 175{ 176 public: 177 178 typedef AsyncIOItem< T, Stream> Parent; 179 typedef typename Parent::StreamType StreamType; 180 typedef typename Parent::OffsetType OffsetType; 181 typedef typename Parent::BufferType BufferType; 182 typedef typename Parent::ValueType ValueType; 183 184 /// Construct a read item that reads "numElements" at "offsetInStream" 185 /// from "stream". 186 /// 187 /// Since with this constructor no data buffer is supplied, it will be 188 /// dynamically allocated by the read() method. Note that this only makes 189 /// sense if this class is subclassed and processing is done on the buffer 190 /// after it has been read. 191 /// 192 /// @param stream The stream to read from. 193 /// @param numElement The number of elements to read from the stream. 194 /// @param offsetInStream The offset at which to read from the stream; 195 /// ignored if the stream uses implicit positioning 196 /// @param context The tread pool context to place the item into. 197 AsyncReadItem( StreamType* stream, U32 numElements, OffsetType offsetInStream, 198 ThreadContext* context = 0 ) 199 : Parent( stream, numElements, offsetInStream, context ) 200 { 201 _prep(); 202 } 203 204 AsyncReadItem( StreamType* stream, U32 numElements, OffsetType offsetInStream, 205 BufferType& buffer, bool takeOwnershipOfBuffer = false, 206 U32 offsetInBuffer = 0, ThreadContext* context = 0 ) 207 : Parent( stream, buffer, offsetInBuffer, numElements, offsetInStream, takeOwnershipOfBuffer, context ) 208 { 209 _prep(); 210 } 211 212 /// @return The number of elements actually read from the stream. 213 U32 getNumElementsRead() 214 { 215 return mNumElementsRead; 216 } 217 218 protected: 219 220 /// Handle of asynchronous stream read, if we are using an async interface. 
221 void* mAsyncHandle; 222 223 /// After the read operation has completed, this holds the number of 224 /// elements actually read from the stream. 225 U32 mNumElementsRead; 226 227 virtual void execute(); 228 229 void _allocBuffer() 230 { 231 if( !this->getBuffer().data ) 232 this->getBuffer().alloc( this->getNumElements() ); 233 } 234 235 void _prep() 236 { 237 IAsyncInputStream< T>* s = dynamic_cast< IAsyncInputStream< T>* >( this->getStream() ); 238 if( s ) 239 { 240 _allocBuffer(); 241 mAsyncHandle = s->issueReadAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() ); 242 } 243 } 244 245 // Helper functions to differentiate between stream types. 246 247 void _read( IInputStream< T>* stream ) 248 { 249 mNumElementsRead = stream->read( this->getBufferPtr(), this->getNumElements() ); 250 } 251 void _read( IOffsetInputStream< T>* stream ) 252 { 253 mNumElementsRead = stream->readAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() ); 254 } 255 void _read( IAsyncInputStream< T>* stream ) 256 { 257 stream->tryCompleteReadAt( mAsyncHandle, mNumElementsRead, true ); 258 } 259}; 260 261template< typename T, class Stream > 262void AsyncReadItem< T, Stream >::execute() 263{ 264 _allocBuffer(); 265 266 // Read the data. Do a dynamic cast for any of the 267 // interfaces we prefer. 268 269 if( this->cancellationPoint() ) return; 270 StreamType* stream = this->getStream(); 271 if( dynamic_cast< IAsyncInputStream< T>* >( stream ) ) 272 _read( ( IAsyncInputStream< T>* ) stream ); 273 else if( dynamic_cast< IOffsetInputStream< T>* >( stream ) ) 274 _read( ( IOffsetInputStream< T>* ) stream ); 275 else 276 _read( stream ); 277} 278 279//-------------------------------------------------------------------------- 280// AsyncWriteItem. 281//-------------------------------------------------------------------------- 282 283/// Work item for writing to an output stream. 
284/// 285/// The stream being written to may implement any of the given output stream 286/// interfaces. Preference is given to IAsyncOutputStream, then to 287/// IOffsetOutputStream, and only if none of these is implemented IOutputStream 288/// is used. 289/// 290/// A useful feature is to yield ownership of the data buffer to the 291/// write item. This way, this can be pretty much used in a fire-and-forget 292/// manner where after submission, no further synchronization happens 293/// between the client and the work item. 294/// 295/// @note Be aware that if writing to an output stream that has an implicit 296/// position property, multiple concurrent writes will interfere with each other. 297template< typename T, class Stream = IOffsetOutputStream< T > > 298class AsyncWriteItem : public AsyncIOItem< T, Stream > 299{ 300 public: 301 302 typedef AsyncIOItem< T, Stream> Parent; 303 typedef typename Parent::StreamType StreamType; 304 typedef typename Parent::OffsetType OffsetType; 305 typedef typename Parent::BufferType BufferType; 306 typedef typename Parent::ValueType ValueType; 307 308 AsyncWriteItem( StreamType* stream, U32 numElements, OffsetType offsetInStream, 309 BufferType& buffer, bool takeOwnershipOfBuffer = true, 310 U32 offsetInBuffer = 0, ThreadContext* context = 0 ) 311 : Parent( stream, buffer, offsetInBuffer, numElements, offsetInStream, takeOwnershipOfBuffer, context ) 312 { 313 _prep( stream ); 314 } 315 316 protected: 317 318 /// Handle of asynchronous write operation, if the stream implements IAsyncOutputStream. 
319 void* mAsyncHandle; 320 321 virtual void execute(); 322 323 void _prep( StreamType* stream ) 324 { 325 IAsyncOutputStream< T>* s = dynamic_cast< IAsyncOutputStream< T>* >( stream ); 326 if( s ) 327 mAsyncHandle = s->issueWriteAt( this->getOffset(), this->getBufferPtr(), this->getNumElements() ); 328 } 329 330 void _write( IOutputStream< T>* stream ) 331 { 332 stream->write( this->getBufferPtr(), this->getNumElements() ); 333 } 334 void _write( IOffsetOutputStream< T>* stream ) 335 { 336 stream->writeAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() ); 337 } 338 void _write( IAsyncOutputStream< T>* stream ) 339 { 340 stream->tryCompleteWriteAt( mAsyncHandle, true ); 341 } 342}; 343 344template< typename T, class Stream > 345void AsyncWriteItem< T, Stream >::execute() 346{ 347 if( this->cancellationPoint() ) return; 348 349 StreamType* stream = this->getStream(); 350 if( dynamic_cast< IAsyncOutputStream< T>* >( stream ) ) 351 _write( ( IAsyncOutputStream< T>* ) stream ); 352 if( dynamic_cast< IOffsetOutputStream< T>* >( stream ) ) 353 _write( ( IOffsetOutputStream< T>* ) stream ); 354 else 355 _write( stream ); 356} 357 358#endif // _THREADPOOLASYNCIO_H_ 359