asyncBufferedStream.h
Engine/source/platform/async/asyncBufferedStream.h
Classes:
class AsyncBufferedReadItem< T, Stream >
Asynchronous work item for reading an element from the source stream.
class AsyncSingleBufferedInputStream< T, Stream, ReadItem >
A stream filter that performs background read-aheads on its source stream and buffers the results.
Detailed Description
1 2//----------------------------------------------------------------------------- 3// Copyright (c) 2012 GarageGames, LLC 4// 5// Permission is hereby granted, free of charge, to any person obtaining a copy 6// of this software and associated documentation files (the "Software"), to 7// deal in the Software without restriction, including without limitation the 8// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 9// sell copies of the Software, and to permit persons to whom the Software is 10// furnished to do so, subject to the following conditions: 11// 12// The above copyright notice and this permission notice shall be included in 13// all copies or substantial portions of the Software. 14// 15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 20// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 21// IN THE SOFTWARE. 22//----------------------------------------------------------------------------- 23 24#ifndef _ASYNCBUFFEREDSTREAM_H_ 25#define _ASYNCBUFFEREDSTREAM_H_ 26 27#ifndef _TSTREAM_H_ 28 #include "core/stream/tStream.h" 29#endif 30#ifndef _THREADPOOL_H_ 31 #include "platform/threads/threadPool.h" 32#endif 33#ifndef _THREADSAFEDEQUE_H_ 34 #include "platform/threads/threadSafeDeque.h" 35#endif 36 37 38// Disable nonsense warning about unreferenced 39// local function on VC. 40#ifdef TORQUE_COMPILER_VISUALC 41 #pragma warning( disable: 4505 ) 42#endif 43 44 45template< typename T, class Stream > 46class AsyncBufferedReadItem; 47 48 49 50//============================================================================= 51// AsyncBufferedInputStream. 
52//============================================================================= 53 54 55/// 56template< typename T, class Stream = IInputStream< T >* > 57class AsyncBufferedInputStream : public IInputStreamFilter< T, Stream >, 58 public ThreadSafeRefCount< AsyncBufferedInputStream< T, Stream > > 59{ 60 public: 61 62 typedef IInputStreamFilter< T, Stream> Parent; 63 64 /// Type of elements read, buffered, and returned by this stream. 65 typedef typename Parent::ElementType ElementType; 66 67 /// Type of the source stream being read by this stream. 68 typedef typename Parent::SourceStreamType SourceStreamType; 69 70 /// Type of elements being read from the source stream. 71 /// 72 /// @note This does not need to correspond to the type of elements buffered 73 /// in this stream. 74 typedef typename Parent::SourceElementType SourceElementType; 75 76 enum 77 { 78 /// The number of elements to buffer in advance by default. 79 DEFAULT_STREAM_LOOKAHEAD = 3 80 }; 81 82 friend class AsyncBufferedReadItem< T, Stream >; // _onArrival 83 84 protected: 85 86 /// Stream elements are kept on deques that can be concurrently 87 /// accessed by multiple threads. 88 typedef ThreadSafeDeque< ElementType> ElementList; 89 90 /// If true, the stream will restart over from the beginning once 91 /// it has been read in entirety. 92 bool mIsLooping; 93 94 /// If true, no further requests should be issued on this stream. 95 /// @note This in itself doesn't say anything about pending requests. 96 bool mIsStopped; 97 98 /// Number of source elements remaining in the source stream. 99 U32 mNumRemainingSourceElements; 100 101 /// Number of elements currently on buffer list. 102 U32 mNumBufferedElements; 103 104 /// Maximum number of elements allowed on buffer list. 105 U32 mMaxBufferedElements; 106 107 /// List of buffered elements. 108 ElementList mBufferedElements; 109 110 /// The thread pool to which read items are queued. 
111 ThreadPool* mThreadPool; 112 113 /// The thread context used for prioritizing read items in the pool. 114 ThreadContext* mThreadContext; 115 116 /// Request the next element from the underlying stream. 117 virtual void _requestNext() = 0; 118 119 /// Called when an element read has been completed on the underlying stream. 120 virtual void _onArrival( const ElementType& element ); 121 122 public: 123 124 /// Construct a new buffered stream reading from "source". 125 /// 126 /// @param stream The source stream from which to read the actual data elements. 127 /// @param numSourceElementsToRead Total number of elements to read from "stream". 128 /// @param numReadAhead Number of packets to read and buffer in advance. 129 /// @param isLooping If true, the packet stream will loop infinitely over the source stream. 130 /// @param pool The ThreadPool to use for asynchronous packet reads. 131 /// @param context The ThreadContext to place asynchronous packet reads in. 132 AsyncBufferedInputStream( const Stream& stream, 133 U32 numSourceElementsToRead = 0, 134 U32 numReadAhead = DEFAULT_STREAM_LOOKAHEAD, 135 bool isLooping = false, 136 ThreadPool* pool = &ThreadPool::GLOBAL(), 137 ThreadContext* context = ThreadContext::ROOT_CONTEXT() ); 138 139 virtual ~AsyncBufferedInputStream(); 140 141 /// @return true if the stream is looping infinitely. 142 bool isLooping() const { return mIsLooping; } 143 144 /// @return the number of elements that will be read and buffered in advance. 145 U32 getReadAhead() const { return mMaxBufferedElements; } 146 147 /// Initiate the request chain of the element stream. 148 void start() { _requestNext(); } 149 150 /// Call for the request chain of the element stream to stop at the next 151 /// synchronization point. 152 void stop() { mIsStopped = true; } 153 154 // IInputStream. 
155 virtual U32 read( ElementType* buffer, U32 num ); 156}; 157 158//----------------------------------------------------------------------------- 159 160template< typename T, typename Stream > 161AsyncBufferedInputStream< T, Stream >::AsyncBufferedInputStream 162 ( const Stream& stream, 163 U32 numSourceElementsToRead, 164 U32 numReadAhead, 165 bool isLooping, 166 ThreadPool* threadPool, 167 ThreadContext* threadContext ) 168 : Parent( stream ), 169 mIsLooping( isLooping ), 170 mIsStopped( false ), 171 mNumRemainingSourceElements( numSourceElementsToRead ), 172 mNumBufferedElements( 0 ), 173 mMaxBufferedElements( numReadAhead ), 174 mThreadPool( threadPool ), 175 mThreadContext( threadContext ) 176{ 177 if( mIsLooping ) 178 { 179 // Stream is looping so we don't count down source elements. 180 181 mNumRemainingSourceElements = 0; 182 } 183 else if( !mNumRemainingSourceElements ) 184 { 185 // If not given number of elements to read, see if the source 186 // stream is sizeable. If so, take its size as the number of elements. 187 188 if( dynamic_cast< ISizeable<>* >( &Deref( stream ) ) ) 189 mNumRemainingSourceElements = ( ( ISizeable<>* ) &Deref( stream ) )->getSize(); 190 else 191 { 192 // Can't tell how many source elements there are. 193 194 mNumRemainingSourceElements = U32_MAX; 195 } 196 } 197} 198 199//----------------------------------------------------------------------------- 200 201template< typename T, typename Stream > 202AsyncBufferedInputStream< T, Stream >::~AsyncBufferedInputStream() 203{ 204 ElementType element; 205 while( mBufferedElements.tryPopFront( element ) ) 206 destructSingle( element ); 207} 208 209//----------------------------------------------------------------------------- 210 211template< typename T, typename Stream > 212void AsyncBufferedInputStream< T, Stream >::_onArrival( const ElementType& element ) 213{ 214 mBufferedElements.pushBack( element ); 215 216 // Adjust buffer count. 
217 218 while( 1 ) 219 { 220 S32 numBuffered = mNumBufferedElements; 221 if( dCompareAndSwap( mNumBufferedElements, numBuffered, numBuffered + 1 ) ) 222 { 223 // If we haven't run against the lookahead limit and haven't reached 224 // the end of the stream, immediately trigger a new request. 225 226 if( !mIsStopped && ( numBuffered + 1 ) < mMaxBufferedElements ) 227 _requestNext(); 228 229 break; 230 } 231 } 232} 233 234//----------------------------------------------------------------------------- 235 236template< typename T, typename Stream > 237U32 AsyncBufferedInputStream< T, Stream >::read( ElementType* buffer, U32 num ) 238{ 239 if( !num ) 240 return 0; 241 242 U32 numRead = 0; 243 for( U32 i = 0; i < num; ++ i ) 244 { 245 // Try to pop a element off the buffered element list. 246 247 ElementType element; 248 if( mBufferedElements.tryPopFront( element ) ) 249 { 250 buffer[ i ] = element; 251 numRead ++; 252 } 253 else 254 break; 255 } 256 257 // Get the request chain going again, if it has stopped. 258 259 while( 1 ) 260 { 261 U32 numBuffered = mNumBufferedElements; 262 U32 newNumBuffered = numBuffered - numRead; 263 264 if( dCompareAndSwap( mNumBufferedElements, numBuffered, newNumBuffered ) ) 265 { 266 if( numBuffered == mMaxBufferedElements ) 267 _requestNext(); 268 269 break; 270 } 271 } 272 273 return numRead; 274} 275 276 277//============================================================================= 278// AsyncSingleBufferedInputStream. 279//============================================================================= 280 281 282/// Asynchronous work item for reading an element from the source stream. 283template< typename T, typename Stream = IInputStream< T >* > 284class AsyncBufferedReadItem : public ThreadWorkItem 285{ 286 public: 287 288 typedef ThreadWorkItem Parent; 289 typedef ThreadSafeRef< AsyncBufferedInputStream< T, Stream> > AsyncStreamRef; 290 291 protected: 292 293 /// The issueing async state. 
294 AsyncStreamRef mAsyncStream; 295 296 /// 297 Stream mSourceStream; 298 299 /// The element read from the stream. 300 T mElement; 301 302 // WorkItem 303 virtual void execute() 304 { 305 if( Deref( mSourceStream ).read( &mElement, 1 ) ) 306 { 307 // Buffer the element. 308 309 if( this->cancellationPoint() ) return; 310 mAsyncStream->_onArrival( mElement ); 311 } 312 } 313 virtual void onCancelled() 314 { 315 Parent::onCancelled(); 316 destructSingle( mElement ); 317 mAsyncStream = NULL; 318 } 319 320 public: 321 322 /// 323 AsyncBufferedReadItem( 324 const AsyncStreamRef& asyncStream, 325 ThreadPool::Context* context = NULL 326 ) 327 : Parent( context ), 328 mAsyncStream( asyncStream ), 329 mSourceStream( asyncStream->getSourceStream() ) 330 { 331 } 332 333}; 334 335 336/// A stream filter that performs background read-aheads on its source stream 337/// and buffers the results. 338/// 339/// As each element is read in an independent threaded operation, reading an 340/// element should invole a certain amount of work for using this class to 341/// make sense. 342/// 343/// @note For looping streams, the stream must implement the IResettable interface. 344/// 345template< typename T, typename Stream = IInputStream< T >*, class ReadItem = AsyncBufferedReadItem< T, Stream > > 346class AsyncSingleBufferedInputStream : public AsyncBufferedInputStream< T, Stream > 347{ 348 public: 349 350 typedef AsyncBufferedInputStream< T, Stream> Parent; 351 typedef typename Parent::ElementType ElementType; 352 typedef typename Parent::SourceElementType SourceElementType; 353 typedef typename Parent::SourceStreamType SourceStreamType; 354 355 protected: 356 357 // AsyncBufferedInputStream. 358 virtual void _requestNext(); 359 360 /// Create a new work item that reads the next element. 
361 virtual void _newReadItem( ThreadSafeRef< ThreadWorkItem>& outRef ) 362 { 363 outRef = new ReadItem( this, this->mThreadContext ); 364 } 365 366 public: 367 368 /// Construct a new buffered stream reading from "source". 369 /// 370 /// @param stream The source stream from which to read the actual data elements. 371 /// @param numSourceElementsToRead Total number of elements to read from "stream". 372 /// @param numReadAhead Number of packets to read and buffer in advance. 373 /// @param isLooping If true, the packet stream will loop infinitely over the source stream. 374 /// @param pool The ThreadPool to use for asynchronous packet reads. 375 /// @param context The ThreadContext to place asynchronous packet reads in. 376 AsyncSingleBufferedInputStream( const Stream& stream, 377 U32 numSourceElementsToRead = 0, 378 U32 numReadAhead = Parent::DEFAULT_STREAM_LOOKAHEAD, 379 bool isLooping = false, 380 ThreadPool* pool = &ThreadPool::GLOBAL(), 381 ThreadContext* context = ThreadContext::ROOT_CONTEXT() ) 382 : Parent( stream, 383 numSourceElementsToRead, 384 numReadAhead, 385 isLooping, 386 pool, 387 context ) {} 388}; 389 390//----------------------------------------------------------------------------- 391 392template< typename T, typename Stream, class ReadItem > 393void AsyncSingleBufferedInputStream< T, Stream, ReadItem >::_requestNext() 394{ 395 Stream& stream = this->getSourceStream(); 396 bool isEOS = !this->mNumRemainingSourceElements; 397 if( isEOS && this->mIsLooping ) 398 { 399 SourceStreamType* s = &Deref( stream ); 400 dynamic_cast< IResettable* >( s )->reset(); 401 isEOS = false; 402 } 403 else if( isEOS ) 404 return; 405 406 //TODO: could scale priority depending on feed status 407 408 // Queue a stream packet work item. 
409 410 if( !this->mIsLooping && this->mNumRemainingSourceElements != U32_MAX ) 411 -- this->mNumRemainingSourceElements; 412 413 ThreadSafeRef< ThreadWorkItem> workItem; 414 _newReadItem( workItem ); 415 this->mThreadPool->queueWorkItem( workItem ); 416} 417 418#endif // !_ASYNCBUFFEREDSTREAM_H_ 419