asyncPacketStream.h
Engine/source/platform/async/asyncPacketStream.h
Input stream filter definitions for turning linear streams into streams that yield data in discrete packets using background reads.
Classes:
class AsyncPacket< T >
Stream packet read by an asynchronous packet stream.
class AsyncPacketBufferedInputStream< Stream, Packet >
A packet stream turns a continuous stream of elements into a stream of discrete packets of elements.
class AsyncPacketBufferedInputStream< Stream, Packet >::PacketReadItem
Asynchronous work item for reading a packet from the source stream.
Detailed Description
Input stream filter definitions for turning linear streams into streams that yield data in discrete packets using background reads.
//-----------------------------------------------------------------------------
// Copyright (c) 2012 GarageGames, LLC
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
//-----------------------------------------------------------------------------

#ifndef _ASYNCPACKETSTREAM_H_
#define _ASYNCPACKETSTREAM_H_

#ifndef _ASYNCBUFFEREDSTREAM_H_
   #include "platform/async/asyncBufferedStream.h"
#endif
#ifndef _RAWDATA_H_
   #include "core/util/rawData.h"
#endif
#ifndef _THREADPOOLASYNCIO_H_
   #include "platform/threads/threadPoolAsyncIO.h"
#endif


//#define DEBUG_SPEW


/// @file
/// Input stream filter definitions for turning linear streams into
/// streams that yield data in discrete packets using background
/// reads.


//--------------------------------------------------------------------------
//    Async stream packets.
49//-------------------------------------------------------------------------- 50 51/// Stream packet read by an asynchronous packet stream. 52template< typename T > 53class AsyncPacket : public RawDataT< T > 54{ 55 public: 56 57 typedef RawDataT< T> Parent; 58 59 AsyncPacket() 60 : mIndex( 0 ), mSizeActual( 0 ), mIsLast( false ) {} 61 AsyncPacket( T* data, U32 size, bool ownMemory = false ) 62 : Parent( data, size, ownMemory ), 63 mIndex( 0 ), mSizeActual( 0 ), mIsLast( false ) {} 64 65 /// Running number in stream. 66 U32 mIndex; 67 68 /// Number of items that have actually been read into the packet. 69 /// This may be less than "size" for end-of-stream packets in non-looping 70 /// streams. 71 /// 72 /// @note Extraneous space at the end of the packet will be cleared using 73 /// constructArray() calls. 74 U32 mSizeActual; 75 76 /// If true this is the last packet in the stream. 77 bool mIsLast; 78}; 79 80//-------------------------------------------------------------------------- 81// Async packet streams. 82//-------------------------------------------------------------------------- 83 84/// A packet stream turns a continuous stream of elements into a 85/// stream of discrete packets of elements. 86/// 87/// All packets are of the exact same size even if, for end-of-stream 88/// packets, they actually contain less data than their actual size. 89/// Extraneous space is cleared. 90/// 91/// @note For looping streams, the stream must implement the 92/// IResettable interface. 
93template< typename Stream, class Packet = AsyncPacket< typename TypeTraits< Stream >::BaseType::ElementType > > 94class AsyncPacketBufferedInputStream : public AsyncBufferedInputStream< Packet*, Stream > 95{ 96 public: 97 98 typedef AsyncBufferedInputStream< Packet*, Stream> Parent; 99 typedef Packet PacketType; 100 typedef typename TypeTraits< Stream >::BaseType StreamType; 101 102 protected: 103 104 class PacketReadItem; 105 friend class PacketReadItem; // _onArrival 106 107 /// Asynchronous work item for reading a packet from the source stream. 108 class PacketReadItem : public AsyncReadItem< typename Parent::SourceElementType, StreamType > 109 { 110 public: 111 112 typedef AsyncReadItem< typename AsyncPacketBufferedInputStream< Stream, Packet >::SourceElementType, StreamType > Parent; 113 114 PacketReadItem( const ThreadSafeRef< AsyncPacketBufferedInputStream< Stream, Packet> >& asyncStream, 115 PacketType* packet, 116 U32 numElements, 117 ThreadPool::Context* context = NULL ) 118 : Parent( asyncStream->getSourceStream(), numElements, 0, *packet, false, 0, context ), 119 mAsyncStream( asyncStream ), 120 mPacket( packet ) {} 121 122 protected: 123 124 typedef ThreadSafeRef< AsyncPacketBufferedInputStream< Stream, Packet> > AsyncPacketStreamPtr; 125 126 /// The issueing async state. 127 AsyncPacketStreamPtr mAsyncStream; 128 129 /// The packet that receives the data. 130 PacketType* mPacket; 131 132 // WorkItem 133 virtual void execute() 134 { 135 Parent::execute(); 136 mPacket->mSizeActual += this->mNumElementsRead; 137 138 #ifdef DEBUG_SPEW 139 Platform::outputDebugString( "[AsyncPacketStream] read %i elements into packet #%i with size %i", 140 this->mNumElementsRead, mPacket->mIndex, mPacket->size ); 141 #endif 142 143 // Handle extraneous space at end of packet. 
144 145 if( this->cancellationPoint() ) return; 146 U32 numExtraElements = mPacket->size - this->mNumElementsRead; 147 if( numExtraElements ) 148 { 149 if( mAsyncStream->mIsLooping 150 && dynamic_cast< IResettable* >( &Deref( this->getStream() ) ) ) 151 { 152 #ifdef DEBUG_SPEW 153 Platform::outputDebugString( "[AsyncPacketStream] resetting stream and reading %i more elements", numExtraElements ); 154 #endif 155 156 // Wrap around and start re-reading from beginning of stream. 157 158 dynamic_cast< IResettable* >( &Deref( this->getStream() ) )->reset(); 159 160 this->mOffsetInBuffer += this->mNumElementsRead; 161 this->mOffsetInStream = 0; 162 this->mNumElements = numExtraElements; 163 164 this->_prep(); 165 Parent::execute(); 166 167 mPacket->mSizeActual += this->mNumElementsRead; 168 } 169 else 170 constructArray( &mPacket->data[ this->mNumElementsRead ], numExtraElements ); 171 } 172 173 // Buffer the packet. 174 175 if( this->cancellationPoint() ) return; 176 mAsyncStream->_onArrival( mPacket ); 177 } 178 virtual void onCancelled() 179 { 180 Parent::onCancelled(); 181 destructSingle< PacketType* >( mPacket ); 182 mAsyncStream = NULL; 183 } 184 }; 185 186 typedef ThreadSafeRef< PacketReadItem> PacketReadItemRef; 187 188 /// Number of elements to read per packet. 189 U32 mPacketSize; 190 191 /// Running number of next stream packet. 192 U32 mNextPacketIndex; 193 194 /// Total number of elements in the source stream. 195 U32 mNumTotalSourceElements; 196 197 /// Create a new stream packet of the given size. 198 virtual PacketType* _newPacket( U32 packetSize ) { return constructSingle< PacketType* >( packetSize ); } 199 200 /// Request the next packet from the underlying stream. 201 virtual void _requestNext(); 202 203 /// Create a new work item that reads "numElements" into "packet". 
204 virtual void _newReadItem( PacketReadItemRef& outRef, 205 PacketType* packet, 206 U32 numElements ) 207 { 208 outRef = new PacketReadItem( this, packet, numElements, this->mThreadContext ); 209 } 210 211 public: 212 213 /// Construct a new packet stream reading from "stream". 214 /// 215 /// @note If looping is used and "stream" is not read from the beginning, "stream" should 216 /// implement IPositionable<U32> or ISizeable<U32> so the async stream can tell how many elements 217 /// there actually are in the stream after resetting. 218 /// 219 /// @param stream The source stream from which to read the actual data elements. 220 /// @param packetSize Size of stream packets returned by the stream in number of elements. 221 /// @param numSourceElementsToRead Number of elements to read from "stream". 222 /// @param numReadAhead Number of packets to read and buffer in advance. 223 /// @param isLooping If true, the packet stream will loop infinitely over the source stream. 224 /// @param pool The ThreadPool to use for asynchronous packet reads. 225 /// @param context The ThreadContext to place asynchronous packet reads in. 226 AsyncPacketBufferedInputStream( const Stream& stream, 227 U32 packetSize, 228 U32 numSourceElementsToRead = 0, 229 U32 numReadAhead = Parent::DEFAULT_STREAM_LOOKAHEAD, 230 bool isLooping = false, 231 ThreadPool* pool = &ThreadPool::GLOBAL(), 232 ThreadContext* context = ThreadContext::ROOT_CONTEXT() ); 233 234 /// @return the size of stream packets returned by this stream in number of elements. 
235 U32 getPacketSize() const { return mPacketSize; } 236}; 237 238template< typename Stream, class Packet > 239AsyncPacketBufferedInputStream< Stream, Packet >::AsyncPacketBufferedInputStream 240 ( const Stream& stream, 241 U32 packetSize, 242 U32 numSourceElementsToRead, 243 U32 numReadAhead, 244 bool isLooping, 245 ThreadPool* threadPool, 246 ThreadContext* threadContext ) 247 : Parent( stream, numSourceElementsToRead, numReadAhead, isLooping, threadPool, threadContext ), 248 mPacketSize( packetSize ), 249 mNextPacketIndex( 0 ), 250 mNumTotalSourceElements( numSourceElementsToRead ) 251{ 252 AssertFatal( mPacketSize > 0, 253 "AsyncPacketStream::AsyncPacketStream() - packet size cannot be zero" ); 254 255 // Determine total number of elements in stream, if possible. 256 257 IPositionable< U32>* positionable = dynamic_cast< IPositionable< U32>* >( &Deref( stream ) ); 258 if( positionable ) 259 mNumTotalSourceElements += positionable->getPosition(); 260 else 261 { 262 ISizeable< U32>* sizeable = dynamic_cast< ISizeable< U32>* >( &Deref( stream ) ); 263 if( sizeable ) 264 mNumTotalSourceElements = sizeable->getSize(); 265 } 266 267 #ifdef DEBUG_SPEW 268 Platform::outputDebugString( "[AsyncPacketStream] %i remaining, %i total (%i packets)", 269 this->mNumRemainingSourceElements, mNumTotalSourceElements, 270 ( this->mNumRemainingSourceElements / mPacketSize ) + ( this->mNumRemainingSourceElements % mPacketSize ? 
1 : 0 ) ); 271 #endif 272} 273 274template< typename Stream, class Packet > 275void AsyncPacketBufferedInputStream< Stream, Packet >::_requestNext() 276{ 277 Stream& stream = this->getSourceStream(); 278 bool isEOS = !this->mNumRemainingSourceElements; 279 if( isEOS && this->mIsLooping ) 280 { 281 StreamType* s = &Deref( stream ); 282 IResettable* resettable = dynamic_cast< IResettable* >( s ); 283 if( resettable ) 284 { 285 IPositionable< U32>* positionable = dynamic_cast< IPositionable< U32>* >( &Deref( stream ) ); 286 U32 pos; 287 if(positionable) 288 pos = positionable->getPosition(); 289 290 resettable->reset(); 291 isEOS = false; 292 this->mNumRemainingSourceElements = mNumTotalSourceElements; 293 294 if( positionable ) 295 { 296 positionable->setPosition(pos); 297 U32 dur = stream->getDuration(); 298 if(dur != 0) //avoiding division by zero? not needed, probably 299 this->mNumRemainingSourceElements -= (U32)(mNumTotalSourceElements*(F32)pos/dur); 300 } 301 } 302 } 303 else if( isEOS ) 304 return; 305 306 //TODO: scale priority depending on feed status 307 308 // Allocate a packet. 309 310 U32 numElements = mPacketSize; 311 PacketType* packet = _newPacket( numElements ); 312 packet->mIndex = mNextPacketIndex; 313 mNextPacketIndex ++; 314 315 // Queue a stream packet work item. 
316 317 if( numElements >= this->mNumRemainingSourceElements ) 318 { 319 if( !this->mIsLooping ) 320 { 321 this->mNumRemainingSourceElements = 0; 322 packet->mIsLast = true; 323 } 324 else 325 this->mNumRemainingSourceElements = ( this->mNumTotalSourceElements - numElements + this->mNumRemainingSourceElements ); 326 } 327 else 328 this->mNumRemainingSourceElements -= numElements; 329 330 #ifdef DEBUG_SPEW 331 Platform::outputDebugString( "[AsyncPacketStream] packet %i, %i remaining, %i total", 332 packet->mIndex, this->mNumRemainingSourceElements, mNumTotalSourceElements ); 333 #endif 334 335 ThreadSafeRef< PacketReadItem> workItem; 336 _newReadItem( workItem, packet, numElements ); 337 this->mThreadPool->queueWorkItem( workItem ); 338} 339 340#undef DEBUG_SPEW 341#endif // !_ASYNCPACKETSTREAM_H_ 342