Torque3D Documentation / _generateds / asyncBufferedStream.h

asyncBufferedStream.h

Engine/source/platform/async/asyncBufferedStream.h

More...

Classes:

class

Asynchronous work item for reading an element from the source stream.

class

A stream filter that performs background read-aheads on its source stream and buffers the results.

Detailed Description

  1
  2//-----------------------------------------------------------------------------
  3// Copyright (c) 2012 GarageGames, LLC
  4//
  5// Permission is hereby granted, free of charge, to any person obtaining a copy
  6// of this software and associated documentation files (the "Software"), to
  7// deal in the Software without restriction, including without limitation the
  8// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  9// sell copies of the Software, and to permit persons to whom the Software is
 10// furnished to do so, subject to the following conditions:
 11//
 12// The above copyright notice and this permission notice shall be included in
 13// all copies or substantial portions of the Software.
 14//
 15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 20// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 21// IN THE SOFTWARE.
 22//-----------------------------------------------------------------------------
 23
 24#ifndef _ASYNCBUFFEREDSTREAM_H_
 25#define _ASYNCBUFFEREDSTREAM_H_
 26
 27#ifndef _TSTREAM_H_
 28   #include "core/stream/tStream.h"
 29#endif
 30#ifndef _THREADPOOL_H_
 31   #include "platform/threads/threadPool.h"
 32#endif
 33#ifndef _THREADSAFEDEQUE_H_
 34   #include "platform/threads/threadSafeDeque.h"
 35#endif
 36
 37
 38// Disable nonsense warning about unreferenced
 39// local function on VC.
 40#ifdef TORQUE_COMPILER_VISUALC
 41   #pragma warning( disable: 4505 )
 42#endif
 43
 44
 45template< typename T, class Stream >
 46class AsyncBufferedReadItem;
 47
 48
 49
 50//=============================================================================
 51//    AsyncBufferedInputStream.
 52//=============================================================================
 53
 54
 55///
 56template< typename T, class Stream = IInputStream< T >* >
 57class AsyncBufferedInputStream : public IInputStreamFilter< T, Stream >,
 58                                 public ThreadSafeRefCount< AsyncBufferedInputStream< T, Stream > >
 59{
 60   public:
 61
 62      typedef IInputStreamFilter< T, Stream> Parent;
 63      
 64      /// Type of elements read, buffered, and returned by this stream.
 65      typedef typename Parent::ElementType ElementType;
 66      
 67      /// Type of the source stream being read by this stream.
 68      typedef typename Parent::SourceStreamType SourceStreamType;
 69      
 70      /// Type of elements being read from the source stream.
 71      ///
 72      /// @note This does not need to correspond to the type of elements buffered
 73      ///   in this stream.
 74      typedef typename Parent::SourceElementType SourceElementType;
 75
 76      enum
 77      {
 78         /// The number of elements to buffer in advance by default.
 79         DEFAULT_STREAM_LOOKAHEAD = 3
 80      };
 81      
 82      friend class AsyncBufferedReadItem< T, Stream >; // _onArrival
 83
 84   protected:
 85
 86      /// Stream elements are kept on deques that can be concurrently
 87      /// accessed by multiple threads.
 88      typedef ThreadSafeDeque< ElementType> ElementList;
 89
 90      /// If true, the stream will restart over from the beginning once
 91      /// it has been read in entirety.
 92      bool mIsLooping;
 93
 94      /// If true, no further requests should be issued on this stream.
 95      /// @note This in itself doesn't say anything about pending requests.
 96      bool mIsStopped;
 97
 98      /// Number of source elements remaining in the source stream.
 99      U32 mNumRemainingSourceElements;
100
101      /// Number of elements currently on buffer list.
102      U32 mNumBufferedElements;
103
104      /// Maximum number of elements allowed on buffer list.
105      U32 mMaxBufferedElements;
106
107      /// List of buffered elements.
108      ElementList mBufferedElements;
109
110      /// The thread pool to which read items are queued.
111      ThreadPool* mThreadPool;
112
113      /// The thread context used for prioritizing read items in the pool.
114      ThreadContext* mThreadContext;
115
116      /// Request the next element from the underlying stream.
117      virtual void _requestNext() = 0;
118
119      /// Called when an element read has been completed on the underlying stream.
120      virtual void _onArrival( const ElementType& element );
121
122   public:
123
124      /// Construct a new buffered stream reading from "source".
125      ///
126      /// @param stream The source stream from which to read the actual data elements.
127      /// @param numSourceElementsToRead Total number of elements to read from "stream".
128      /// @param numReadAhead Number of packets to read and buffer in advance.
129      /// @param isLooping If true, the packet stream will loop infinitely over the source stream.
130      /// @param pool The ThreadPool to use for asynchronous packet reads.
131      /// @param context The ThreadContext to place asynchronous packet reads in.
132      AsyncBufferedInputStream(  const Stream& stream,
133                                 U32 numSourceElementsToRead = 0,
134                                 U32 numReadAhead = DEFAULT_STREAM_LOOKAHEAD,
135                                 bool isLooping = false,
136                                 ThreadPool* pool = &ThreadPool::GLOBAL(),
137                                 ThreadContext* context = ThreadContext::ROOT_CONTEXT() );
138
139      virtual ~AsyncBufferedInputStream();
140      
141      /// @return true if the stream is looping infinitely.
142      bool isLooping() const { return mIsLooping; }
143
144      /// @return the number of elements that will be read and buffered in advance.
145      U32 getReadAhead() const { return mMaxBufferedElements; }
146
147      /// Initiate the request chain of the element stream.
148      void start() { _requestNext(); }
149
150      /// Call for the request chain of the element stream to stop at the next
151      /// synchronization point.
152      void stop() { mIsStopped = true; }
153
154      // IInputStream.
155      virtual U32 read( ElementType* buffer, U32 num );
156};
157
158//-----------------------------------------------------------------------------
159
160template< typename T, typename Stream >
161AsyncBufferedInputStream< T, Stream >::AsyncBufferedInputStream
162         (  const Stream& stream,
163            U32 numSourceElementsToRead,
164            U32 numReadAhead,
165            bool isLooping,
166            ThreadPool* threadPool,
167            ThreadContext* threadContext )
168   : Parent( stream ),
169     mIsLooping( isLooping ),
170     mIsStopped( false ),
171     mNumRemainingSourceElements( numSourceElementsToRead ),
172     mNumBufferedElements( 0 ),
173     mMaxBufferedElements( numReadAhead ),
174     mThreadPool( threadPool ),
175     mThreadContext( threadContext )
176{
177   if( mIsLooping )
178   {
179      // Stream is looping so we don't count down source elements.
180      
181      mNumRemainingSourceElements = 0;
182   }
183   else if( !mNumRemainingSourceElements )
184   {
185      // If not given number of elements to read, see if the source
186      // stream is sizeable.  If so, take its size as the number of elements.
187      
188      if( dynamic_cast< ISizeable<>* >( &Deref( stream ) ) )
189         mNumRemainingSourceElements = ( ( ISizeable<>* ) &Deref( stream ) )->getSize();
190      else
191      {
192         // Can't tell how many source elements there are.
193         
194         mNumRemainingSourceElements = U32_MAX;
195      }
196   }
197}
198
199//-----------------------------------------------------------------------------
200
201template< typename T, typename Stream >
202AsyncBufferedInputStream< T, Stream >::~AsyncBufferedInputStream()
203{
204   ElementType element;
205   while( mBufferedElements.tryPopFront( element ) )
206      destructSingle( element );
207}
208
209//-----------------------------------------------------------------------------
210
211template< typename T, typename Stream >
212void AsyncBufferedInputStream< T, Stream >::_onArrival( const ElementType& element )
213{
214   mBufferedElements.pushBack( element );
215   
216   // Adjust buffer count.
217
218   while( 1 )
219   {
220      S32 numBuffered = mNumBufferedElements;
221      if( dCompareAndSwap( mNumBufferedElements, numBuffered, numBuffered + 1 ) )
222      {
223         // If we haven't run against the lookahead limit and haven't reached
224         // the end of the stream, immediately trigger a new request.
225         
226         if( !mIsStopped && ( numBuffered + 1 ) < mMaxBufferedElements )
227            _requestNext();
228            
229         break;
230      }
231   }
232}
233
234//-----------------------------------------------------------------------------
235
236template< typename T, typename Stream >
237U32 AsyncBufferedInputStream< T, Stream >::read( ElementType* buffer, U32 num )
238{
239   if( !num )
240      return 0;
241
242   U32 numRead = 0;
243   for( U32 i = 0; i < num; ++ i )
244   {
245      // Try to pop a element off the buffered element list.
246   
247      ElementType element;
248      if( mBufferedElements.tryPopFront( element ) )
249      {
250         buffer[ i ] = element;
251         numRead ++;
252      }
253      else
254         break;
255   }
256
257   // Get the request chain going again, if it has stopped.
258   
259   while( 1 )
260   {
261      U32 numBuffered = mNumBufferedElements;
262      U32 newNumBuffered = numBuffered - numRead;
263      
264      if( dCompareAndSwap( mNumBufferedElements, numBuffered, newNumBuffered ) )
265      {
266         if( numBuffered == mMaxBufferedElements )
267            _requestNext();
268         
269         break;
270      }
271   }
272
273   return numRead;
274}
275
276
277//=============================================================================
278//    AsyncSingleBufferedInputStream.
279//=============================================================================
280
281
282/// Asynchronous work item for reading an element from the source stream.
283template< typename T, typename Stream = IInputStream< T >* >
284class AsyncBufferedReadItem : public ThreadWorkItem
285{
286   public:
287
288      typedef ThreadWorkItem Parent;
289      typedef ThreadSafeRef< AsyncBufferedInputStream< T, Stream> > AsyncStreamRef;
290      
291   protected:
292
293      /// The issueing async state.
294      AsyncStreamRef mAsyncStream;
295      
296      ///
297      Stream mSourceStream;
298
299      /// The element read from the stream.
300      T mElement;
301
302      // WorkItem
303      virtual void execute()
304      {
305         if( Deref( mSourceStream ).read( &mElement, 1 ) )
306         {                  
307            // Buffer the element.
308
309            if( this->cancellationPoint() ) return;
310            mAsyncStream->_onArrival( mElement );
311         }
312      }
313      virtual void onCancelled()
314      {
315         Parent::onCancelled();
316         destructSingle( mElement );
317         mAsyncStream = NULL;
318      }
319      
320   public:
321
322      ///
323      AsyncBufferedReadItem(
324         const AsyncStreamRef& asyncStream,
325         ThreadPool::Context* context = NULL
326      )
327         : Parent( context ),
328           mAsyncStream( asyncStream ),
329           mSourceStream( asyncStream->getSourceStream() )
330      {
331      }
332
333};
334
335
336/// A stream filter that performs background read-aheads on its source stream
337/// and buffers the results.
338///
339/// As each element is read in an independent threaded operation, reading an
340/// element should invole a certain amount of work for using this class to
341/// make sense.
342///
343/// @note For looping streams, the stream must implement the IResettable interface.
344///
345template< typename T, typename Stream = IInputStream< T >*, class ReadItem = AsyncBufferedReadItem< T, Stream > >
346class AsyncSingleBufferedInputStream : public AsyncBufferedInputStream< T, Stream >
347{
348   public:
349
350      typedef AsyncBufferedInputStream< T, Stream> Parent;
351      typedef typename Parent::ElementType ElementType;
352      typedef typename Parent::SourceElementType SourceElementType;
353      typedef typename Parent::SourceStreamType SourceStreamType;
354      
355   protected:
356         
357      // AsyncBufferedInputStream.
358      virtual void _requestNext();
359
360      /// Create a new work item that reads the next element.
361      virtual void _newReadItem( ThreadSafeRef< ThreadWorkItem>& outRef )
362      {
363         outRef = new ReadItem( this, this->mThreadContext );
364      }
365            
366   public:
367
368      /// Construct a new buffered stream reading from "source".
369      ///
370      /// @param stream The source stream from which to read the actual data elements.
371      /// @param numSourceElementsToRead Total number of elements to read from "stream".
372      /// @param numReadAhead Number of packets to read and buffer in advance.
373      /// @param isLooping If true, the packet stream will loop infinitely over the source stream.
374      /// @param pool The ThreadPool to use for asynchronous packet reads.
375      /// @param context The ThreadContext to place asynchronous packet reads in.
376      AsyncSingleBufferedInputStream(  const Stream& stream,
377                                       U32 numSourceElementsToRead = 0,
378                                       U32 numReadAhead = Parent::DEFAULT_STREAM_LOOKAHEAD,
379                                       bool isLooping = false,
380                                       ThreadPool* pool = &ThreadPool::GLOBAL(),
381                                       ThreadContext* context = ThreadContext::ROOT_CONTEXT() )
382         : Parent(   stream,
383                     numSourceElementsToRead,
384                     numReadAhead,
385                     isLooping,
386                     pool,
387                     context ) {}
388};
389
390//-----------------------------------------------------------------------------
391
392template< typename T, typename Stream, class ReadItem >
393void AsyncSingleBufferedInputStream< T, Stream, ReadItem >::_requestNext()
394{
395   Stream& stream = this->getSourceStream();
396   bool isEOS = !this->mNumRemainingSourceElements;
397   if( isEOS && this->mIsLooping )
398   {
399      SourceStreamType* s = &Deref( stream );
400      dynamic_cast< IResettable* >( s )->reset();
401      isEOS = false;
402   }
403   else if( isEOS )
404      return;
405
406   //TODO: could scale priority depending on feed status
407
408   // Queue a stream packet work item.
409
410   if( !this->mIsLooping && this->mNumRemainingSourceElements != U32_MAX )
411      -- this->mNumRemainingSourceElements;
412      
413   ThreadSafeRef< ThreadWorkItem> workItem;
414   _newReadItem( workItem );
415   this->mThreadPool->queueWorkItem( workItem );
416}
417
418#endif // !_ASYNCBUFFEREDSTREAM_H_
419