Torque3D Documentation / _generateds / asyncPacketStream.h

asyncPacketStream.h

Engine/source/platform/async/asyncPacketStream.h

Input stream filter definitions for turning linear streams into streams that yield data in discrete packets using background reads.

More...

Classes:

class AsyncPacket

Stream packet read by an asynchronous packet stream.

class AsyncPacketBufferedInputStream

A packet stream turns a continuous stream of elements into a stream of discrete packets of elements.

class AsyncPacketBufferedInputStream::PacketReadItem

Asynchronous work item for reading a packet from the source stream.

Detailed Description

Input stream filter definitions for turning linear streams into streams that yield data in discrete packets using background reads.

  1
  2//-----------------------------------------------------------------------------
  3// Copyright (c) 2012 GarageGames, LLC
  4//
  5// Permission is hereby granted, free of charge, to any person obtaining a copy
  6// of this software and associated documentation files (the "Software"), to
  7// deal in the Software without restriction, including without limitation the
  8// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  9// sell copies of the Software, and to permit persons to whom the Software is
 10// furnished to do so, subject to the following conditions:
 11//
 12// The above copyright notice and this permission notice shall be included in
 13// all copies or substantial portions of the Software.
 14//
 15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 20// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 21// IN THE SOFTWARE.
 22//-----------------------------------------------------------------------------
 23
 24#ifndef _ASYNCPACKETSTREAM_H_
 25#define _ASYNCPACKETSTREAM_H_
 26
 27#ifndef _ASYNCBUFFEREDSTREAM_H_
 28   #include "platform/async/asyncBufferedStream.h"
 29#endif
 30#ifndef _RAWDATA_H_
 31   #include "core/util/rawData.h"
 32#endif
 33#ifndef _THREADPOOLASYNCIO_H_
 34   #include "platform/threads/threadPoolAsyncIO.h"
 35#endif
 36
 37
 38//#define DEBUG_SPEW
 39
 40
 41/// @file
 42/// Input stream filter definitions for turning linear streams into
 43/// streams that yield data in discrete packets using background
 44/// reads.
 45
 46
 47//--------------------------------------------------------------------------
 48//    Async stream packets.
 49//--------------------------------------------------------------------------
 50
 51/// Stream packet read by an asynchronous packet stream.
 52template< typename T >
 53class AsyncPacket : public RawDataT< T >
 54{
 55   public:
 56
 57      typedef RawDataT< T> Parent;
 58
 59      AsyncPacket()
 60         : mIndex( 0 ), mSizeActual( 0 ), mIsLast( false ) {}
 61      AsyncPacket( T* data, U32 size, bool ownMemory = false )
 62         : Parent( data, size, ownMemory ),
 63           mIndex( 0 ), mSizeActual( 0 ), mIsLast( false ) {}
 64
 65      /// Running number in stream.
 66      U32 mIndex;
 67
 68      /// Number of items that have actually been read into the packet.
 69      /// This may be less than "size" for end-of-stream packets in non-looping
 70      /// streams.
 71      ///
 72      /// @note Extraneous space at the end of the packet will be cleared using
 73      ///   constructArray() calls.
 74      U32 mSizeActual;
 75
 76      /// If true this is the last packet in the stream.
 77      bool mIsLast;
 78};
 79
 80//--------------------------------------------------------------------------
 81//    Async packet streams.
 82//--------------------------------------------------------------------------
 83
 84/// A packet stream turns a continuous stream of elements into a
 85/// stream of discrete packets of elements.
 86///
 87/// All packets are of the exact same size even if, for end-of-stream
 88/// packets, they actually contain less data than their actual size.
 89/// Extraneous space is cleared.
 90///
 91/// @note For looping streams, the stream must implement the
 92///   IResettable interface.
 93template< typename Stream, class Packet = AsyncPacket< typename TypeTraits< Stream >::BaseType::ElementType > >
 94class AsyncPacketBufferedInputStream : public AsyncBufferedInputStream< Packet*, Stream >
 95{
 96   public:
 97
 98      typedef AsyncBufferedInputStream< Packet*, Stream> Parent;
 99      typedef Packet PacketType;
100      typedef typename TypeTraits< Stream >::BaseType StreamType;
101
102   protected:
103
104      class PacketReadItem;
105      friend class PacketReadItem; // _onArrival
106
107      /// Asynchronous work item for reading a packet from the source stream.
108      class PacketReadItem : public AsyncReadItem< typename Parent::SourceElementType, StreamType >
109      {
110         public:
111
112            typedef AsyncReadItem< typename AsyncPacketBufferedInputStream< Stream, Packet >::SourceElementType, StreamType > Parent;
113
114            PacketReadItem( const ThreadSafeRef< AsyncPacketBufferedInputStream< Stream, Packet> >& asyncStream,
115                            PacketType* packet,
116                            U32 numElements,
117                            ThreadPool::Context* context = NULL )
118               : Parent( asyncStream->getSourceStream(), numElements, 0, *packet, false, 0, context ),
119                 mAsyncStream( asyncStream ),
120                 mPacket( packet ) {}
121
122         protected:
123
124            typedef ThreadSafeRef< AsyncPacketBufferedInputStream< Stream, Packet> > AsyncPacketStreamPtr;
125
126            /// The issueing async state.
127            AsyncPacketStreamPtr mAsyncStream;
128
129            /// The packet that receives the data.
130            PacketType* mPacket;
131
132            // WorkItem
133            virtual void execute()
134            {
135               Parent::execute();
136               mPacket->mSizeActual += this->mNumElementsRead;
137               
138               #ifdef DEBUG_SPEW
139               Platform::outputDebugString( "[AsyncPacketStream] read %i elements into packet #%i with size %i",
140                  this->mNumElementsRead, mPacket->mIndex, mPacket->size );
141               #endif
142
143               // Handle extraneous space at end of packet.
144
145               if( this->cancellationPoint() ) return;
146               U32 numExtraElements = mPacket->size - this->mNumElementsRead;
147               if( numExtraElements )
148               {                  
149                  if( mAsyncStream->mIsLooping
150                      && dynamic_cast< IResettable* >( &Deref( this->getStream() ) ) )
151                  {
152                     #ifdef DEBUG_SPEW
153                     Platform::outputDebugString( "[AsyncPacketStream] resetting stream and reading %i more elements", numExtraElements );
154                     #endif
155
156                     // Wrap around and start re-reading from beginning of stream.
157
158                     dynamic_cast< IResettable* >( &Deref( this->getStream() ) )->reset();
159                     
160                     this->mOffsetInBuffer += this->mNumElementsRead;
161                     this->mOffsetInStream = 0;
162                     this->mNumElements = numExtraElements;
163
164                     this->_prep();
165                     Parent::execute();
166                     
167                     mPacket->mSizeActual += this->mNumElementsRead;
168                  }
169                  else
170                     constructArray( &mPacket->data[ this->mNumElementsRead ], numExtraElements );
171               }
172
173               // Buffer the packet.
174
175               if( this->cancellationPoint() ) return;
176               mAsyncStream->_onArrival( mPacket );
177            }
178            virtual void onCancelled()
179            {
180               Parent::onCancelled();
181               destructSingle< PacketType* >( mPacket );
182               mAsyncStream = NULL;
183            }
184      };
185
186      typedef ThreadSafeRef< PacketReadItem> PacketReadItemRef;
187
188      /// Number of elements to read per packet.
189      U32 mPacketSize;
190
191      /// Running number of next stream packet.
192      U32 mNextPacketIndex;
193            
194      /// Total number of elements in the source stream.
195      U32 mNumTotalSourceElements;
196
197      /// Create a new stream packet of the given size.
198      virtual PacketType* _newPacket( U32 packetSize ) { return constructSingle< PacketType* >( packetSize ); }
199
200      /// Request the next packet from the underlying stream.
201      virtual void _requestNext();
202
203      /// Create a new work item that reads "numElements" into "packet".
204      virtual void _newReadItem( PacketReadItemRef& outRef,
205                                 PacketType* packet,
206                                 U32 numElements )
207      {
208         outRef = new PacketReadItem( this, packet, numElements, this->mThreadContext );
209      }
210
211   public:
212
213      /// Construct a new packet stream reading from "stream".
214      ///
215      /// @note If looping is used and "stream" is not read from the beginning, "stream" should
216      ///   implement IPositionable<U32> or ISizeable<U32> so the async stream can tell how many elements
217      ///   there actually are in the stream after resetting.
218      ///
219      /// @param stream The source stream from which to read the actual data elements.
220      /// @param packetSize Size of stream packets returned by the stream in number of elements.
221      /// @param numSourceElementsToRead Number of elements to read from "stream".
222      /// @param numReadAhead Number of packets to read and buffer in advance.
223      /// @param isLooping If true, the packet stream will loop infinitely over the source stream.
224      /// @param pool The ThreadPool to use for asynchronous packet reads.
225      /// @param context The ThreadContext to place asynchronous packet reads in.
226      AsyncPacketBufferedInputStream( const Stream& stream,
227                              U32 packetSize,
228                              U32 numSourceElementsToRead = 0,
229                              U32 numReadAhead = Parent::DEFAULT_STREAM_LOOKAHEAD,
230                              bool isLooping = false,
231                              ThreadPool* pool = &ThreadPool::GLOBAL(),
232                              ThreadContext* context = ThreadContext::ROOT_CONTEXT() );
233      
234      /// @return the size of stream packets returned by this stream in number of elements.
235      U32 getPacketSize() const { return mPacketSize; }
236};
237
238template< typename Stream, class Packet >
239AsyncPacketBufferedInputStream< Stream, Packet >::AsyncPacketBufferedInputStream
240         (  const Stream& stream,
241            U32 packetSize,
242            U32 numSourceElementsToRead,
243            U32 numReadAhead,
244            bool isLooping,
245            ThreadPool* threadPool,
246            ThreadContext* threadContext )
247   : Parent( stream, numSourceElementsToRead, numReadAhead, isLooping, threadPool, threadContext ),
248     mPacketSize( packetSize ),
249     mNextPacketIndex( 0 ),
250     mNumTotalSourceElements( numSourceElementsToRead )
251{
252   AssertFatal( mPacketSize > 0,
253      "AsyncPacketStream::AsyncPacketStream() - packet size cannot be zero" );
254      
255   // Determine total number of elements in stream, if possible.
256      
257   IPositionable< U32>* positionable = dynamic_cast< IPositionable< U32>* >( &Deref( stream ) );
258   if( positionable )
259      mNumTotalSourceElements += positionable->getPosition();
260   else
261   {
262      ISizeable< U32>* sizeable = dynamic_cast< ISizeable< U32>* >( &Deref( stream ) );
263      if( sizeable )
264         mNumTotalSourceElements = sizeable->getSize();
265   }
266   
267   #ifdef DEBUG_SPEW
268   Platform::outputDebugString( "[AsyncPacketStream] %i remaining, %i total (%i packets)",
269      this->mNumRemainingSourceElements, mNumTotalSourceElements,
270      ( this->mNumRemainingSourceElements / mPacketSize ) + ( this->mNumRemainingSourceElements % mPacketSize ? 1 : 0 ) );
271   #endif
272}
273
274template< typename Stream, class Packet >
275void AsyncPacketBufferedInputStream< Stream, Packet >::_requestNext()
276{
277   Stream& stream = this->getSourceStream();
278   bool isEOS = !this->mNumRemainingSourceElements;
279   if( isEOS && this->mIsLooping )
280   {
281      StreamType* s = &Deref( stream );
282      IResettable* resettable = dynamic_cast< IResettable* >( s );
283      if( resettable )
284      {
285         IPositionable< U32>* positionable = dynamic_cast< IPositionable< U32>* >( &Deref( stream ) );
286         U32 pos;
287         if(positionable)
288            pos = positionable->getPosition();
289         
290         resettable->reset();
291         isEOS = false;
292         this->mNumRemainingSourceElements = mNumTotalSourceElements;
293         
294         if( positionable )
295         {
296            positionable->setPosition(pos);
297            U32 dur = stream->getDuration();
298            if(dur != 0) //avoiding division by zero? not needed, probably
299               this->mNumRemainingSourceElements -= (U32)(mNumTotalSourceElements*(F32)pos/dur);
300         }
301      }
302   }
303   else if( isEOS )
304      return;
305
306   //TODO: scale priority depending on feed status
307
308   // Allocate a packet.
309
310   U32 numElements = mPacketSize;
311   PacketType* packet = _newPacket( numElements );
312   packet->mIndex = mNextPacketIndex;
313   mNextPacketIndex ++;
314
315   // Queue a stream packet work item.
316
317   if( numElements >= this->mNumRemainingSourceElements )
318   {
319      if( !this->mIsLooping )
320      {
321         this->mNumRemainingSourceElements = 0;
322         packet->mIsLast = true;
323      }
324      else
325         this->mNumRemainingSourceElements = ( this->mNumTotalSourceElements - numElements + this->mNumRemainingSourceElements );
326   }
327   else
328      this->mNumRemainingSourceElements -= numElements;
329      
330   #ifdef DEBUG_SPEW
331   Platform::outputDebugString( "[AsyncPacketStream] packet %i, %i remaining, %i total",
332      packet->mIndex, this->mNumRemainingSourceElements, mNumTotalSourceElements );
333   #endif
334
335   ThreadSafeRef< PacketReadItem> workItem;
336   _newReadItem( workItem, packet, numElements );
337   this->mThreadPool->queueWorkItem( workItem );
338}
339
340#undef DEBUG_SPEW
341#endif // !_ASYNCPACKETSTREAM_H_
342