Torque3D Documentation / _generateds / asyncPacketQueue.h

asyncPacketQueue.h

Engine/source/platform/async/asyncPacketQueue.h

Time-based packet streaming.

More...

Classes:

class AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >

Time-based packet stream queue.

struct AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::QueuedPacket

Information about the time slice covered by an individual packet currently on the queue.

Detailed Description

Time-based packet streaming.

The classes contained in this file can be used for any kind of continuous playback that depends on discrete samplings of a source stream (i.e. any kind of digital media streaming).

  1
  2//-----------------------------------------------------------------------------
  3// Copyright (c) 2012 GarageGames, LLC
  4//
  5// Permission is hereby granted, free of charge, to any person obtaining a copy
  6// of this software and associated documentation files (the "Software"), to
  7// deal in the Software without restriction, including without limitation the
  8// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  9// sell copies of the Software, and to permit persons to whom the Software is
 10// furnished to do so, subject to the following conditions:
 11//
 12// The above copyright notice and this permission notice shall be included in
 13// all copies or substantial portions of the Software.
 14//
 15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 20// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 21// IN THE SOFTWARE.
 22//-----------------------------------------------------------------------------
 23
 24#ifndef _ASYNCPACKETQUEUE_H_
 25#define _ASYNCPACKETQUEUE_H_
 26
 27#ifndef _TFIXEDSIZEQUEUE_H_
 28#include "core/util/tFixedSizeDeque.h"
 29#endif
 30
 31#ifndef _TSTREAM_H_
 32#include "core/stream/tStream.h"
 33#endif
 34
 35#ifndef _TYPETRAITS_H_
 36#include "platform/typetraits.h"
 37#endif
 38
 39
 40//#define DEBUG_SPEW
 41
 42
 43/// @file
 44/// Time-based packet streaming.
 45///
 46/// The classes contained in this file can be used for any kind
 47/// of continuous playback that depends on discrete samplings of
 48/// a source stream (i.e. any kind of digital media streaming).
 49
 50
 51
 52//--------------------------------------------------------------------------
 53//    Async packet queue.
 54//--------------------------------------------------------------------------
 55
 56/// Time-based packet stream queue.
 57///
 58/// AsyncPacketQueue writes data packets to a consumer stream in sync to
 59/// a tick time source.  Outdated packets may optionally be dropped automatically
 60/// by the queue.  A fixed maximum number of packets can reside in the queue
 61/// concurrently at any one time.
 62///
 63/// Be aware that using single item queues for synchronizing to a timer
 64/// will usually result in bad timing behavior when packet uploading takes
 65/// any non-trivial amount of time.
 66///
 67/// @note While the queue associates a variable tick count with each
 68///   individual packet, the queue fill status is measured in number of
 69///   packets rather than in total tick time.
 70///
 71/// @param Packet Value type of packets passed through this queue.
 72/// @param TimeSource Value type for time tick source to which the queue
 73///   is synchronized.
 74/// @param Consumer Value type of stream to which the packets are written.
 75/// @param Tick Value type used for counting ticks.
 76template< typename Packet, typename TimeSource = IPositionable< U32 >*, typename Consumer = IOutputStream< Packet >*, typename Tick = U32 >
 77class AsyncPacketQueue
 78{
 79   public:
 80
 81      typedef void Parent;
 82
 83      /// The type of data packets being streamed through this queue.
 84      typedef typename TypeTraits< Packet >::BaseType PacketType;
 85
 86      /// The type of consumer that receives the packets from this queue.
 87      typedef typename TypeTraits< Consumer >::BaseType ConsumerType;
 88
 89      /// The type of time source to which this queue is synchronized.
 90      typedef typename TypeTraits< TimeSource >::BaseType TimeSourceType;
 91      
 92      /// Type for counting ticks.
 93      typedef Tick TickType;
 94
 95   protected:
 96
 97      /// Information about the time slice covered by an
 98      /// individual packet currently on the queue.
 99      struct QueuedPacket
100      {
101         /// First tick contained in this packet.
102         TickType mStartTick;
103
104         /// First tick *not* contained in this packet anymore.
105         TickType mEndTick;
106
107         QueuedPacket( TickType start, TickType end )
108            : mStartTick( start ), mEndTick( end ) {}
109
110         /// Return the total number of ticks in this packet.
111         TickType getNumTicks() const
112         {
113            return ( mEndTick - mStartTick );
114         }
115      };
116
117      typedef FixedSizeDeque< QueuedPacket> PacketQueue;
118
119      /// If true, packets that have missed their proper queuing timeframe
120      /// will be dropped.  If false, they will be queued nonetheless.
121      bool mDropPackets;
122
123      /// Total number of ticks spanned by the total queue playback time.
124      /// If this is zero, the total queue time is considered to be infinite.
125      TickType mTotalTicks;
126
127      /// Total number of ticks submitted to the queue so far.
128      TickType mTotalQueuedTicks;
129
130      /// Queue that holds records for each packet currently in the queue.  New packets
131      /// are added to the back; expired packets are removed from the front.
132      PacketQueue mPacketQueue;
133
134      /// The time source to which we are sync'ing.
135      TimeSource mTimeSource;
136
137      /// The output stream that this queue feeds into.
138      Consumer mConsumer;
139
140      /// Total number of packets submitted so far (including dropped packets).
141      U32 mTotalQueuedPackets;
142      
143   public:
144
145      /// Construct an AsyncPacketQueue of the given length.
146      ///
147      /// @param maxQueuedPackets The length of the queue in packets.  Only a maximum of
148      ///   'maxQueuedPackets' packets can be concurrently in the queue at any one time.
149      /// @param timeSource The tick time source to which the queue synchronizes.
150      /// @param consumer The output stream that receives the packets in sync to timeSource.
151      /// @param totalTicks The total number of ticks that will be played back through the
152      ///   queue; if 0, the length is considered indefinite.
153      /// @param dropPackets Whether the queue should drop outdated packets; if dropped, a
154      ///   packet will not reach the consumer.
155      AsyncPacketQueue(    U32 maxQueuedPackets,
156                           TimeSource timeSource,
157                           Consumer consumer,
158                           TickType totalTicks = 0,
159                           bool dropPackets = false )
160         : mDropPackets( dropPackets ),
161           mTotalTicks( totalTicks ),
162           mTotalQueuedTicks( 0 ),
163           mPacketQueue( maxQueuedPackets ),
164           mTimeSource( timeSource ),
165           mConsumer( consumer )
166
167      {
168         mTotalQueuedPackets = 0;
169      }
170
171      /// Return true if there are currently no packets in the queue.
172      bool isEmpty() const { return mPacketQueue.isEmpty(); }
173
174      /// Return true if all packets have been streamed.
175      bool isAtEnd() const;
176
177      /// Return true if the queue needs one or more new packets to be submitted.
178      bool needPacket();
179
180      /// Submit a data packet to the queue.
181      ///
182      /// @param packet The data packet.
183      /// @param packetTicks The duration of the packet in ticks.
184      /// @param isLast If true, the packet is the last one in the stream.
185      /// @param packetPos The absolute position of the packet in the stream; if this is not
186      ///   supplied, the packet is assumed to immediately follow the preceding packet.
187      ///
188      /// @return true if the packet has been queued or false if it has been dropped.
189      bool submitPacket(   Packet packet,
190                           TickType packetTicks,
191                           bool isLast = false,
192                           TickType packetPos = TypeTraits< TickType >::MAX );
193
194      /// Return the current playback position according to the time source.
195      TickType getCurrentTick() const { return Deref( mTimeSource ).getPosition(); }
196
197      /// Return the total number of ticks that have been queued so far.
198      TickType getTotalQueuedTicks() const { return mTotalQueuedTicks; }
199      
200      /// Return the total number of packets that have been queued so far.
201      U32 getTotalQueuedPackets() const { return mTotalQueuedPackets; }
202};
203
204template< typename Packet, typename TimeSource, typename Consumer, typename Tick >
205inline bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::isAtEnd() const
206{
207   // A total tick count of zero means the stream is infinite, so it never ends.
208
209   if( !mTotalTicks )
210      return false;
211
212   // Otherwise, we're at the end once the time source has reached the total tick
213   // count and, unless packets may be dropped, all ticks have also been queued.
214   return ( getCurrentTick() >= mTotalTicks
215            && ( mDropPackets || mTotalQueuedTicks >= mTotalTicks ) );
216}
217
218template< typename Packet, typename TimeSource, typename Consumer, typename Tick >
219bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::needPacket()
220{
221   // No further packets are needed once the end of the
222   // stream has been reached.
223
224   if( isAtEnd() )
225      return false;
226
227   // Packets are always needed while the queue is not filled
228   // up completely (a capacity() of zero means the queue is full).
229
230   if( mPacketQueue.capacity() != 0 )
231      return true;
232
233   // The queue is full: dequeue packets whose end tick has already been reached.
234
235   TickType currentTick = getCurrentTick();
236   while( mPacketQueue.size() && currentTick >= mPacketQueue.front().mEndTick )
237   {
238      #ifdef DEBUG_SPEW
239      Platform::outputDebugString( "[AsyncPacketQueue] expired packet #%i: %i-%i (tick: %i; queue: %i)",
240         mTotalQueuedPackets - mPacketQueue.size(),
241         U32( mPacketQueue.front().mStartTick ),
242         U32( mPacketQueue.front().mEndTick ),
243         U32( currentTick ),
244         mPacketQueue.size() );
245      #endif
246      
247      mPacketQueue.popFront();
248   }
249
250   // More packets are needed only if expiring packets has freed up a slot.
251
252   return ( mPacketQueue.capacity() != 0 );
253}
254
255template< typename Packet, typename TimeSource, typename Consumer, typename Tick >
256bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::submitPacket( Packet packet, TickType packetTicks, bool isLast, TickType packetPos )
257{
258   AssertFatal( mPacketQueue.capacity() != 0,
259      "AsyncPacketQueue::submitPacket() - Queue is full!" );
260
261   TickType packetStartPos;
262   TickType packetEndPos;
263   // Use the explicit position if one was supplied (MAX means "not supplied").
264   if( packetPos != TypeTraits< TickType >::MAX )
265   {
266      packetStartPos = packetPos;
267      packetEndPos = packetPos + packetTicks;
268   }
269   else
270   {
271      packetStartPos = mTotalQueuedTicks;
272      packetEndPos = mTotalQueuedTicks + packetTicks;
273   }
274
275   // If dropping is enabled, check whether the packet is already outdated.
276
277   bool dropPacket = false;
278   if( mDropPackets )
279   {
280      TickType currentTick = getCurrentTick();
281      if( currentTick >= packetEndPos )
282         dropPacket = true;
283   }
284
285   #ifdef DEBUG_SPEW
286   Platform::outputDebugString( "[AsyncPacketQueue] new packet #%i: %i-%i (ticks: %i, current: %i, queue: %i)%s",
287      mTotalQueuedPackets,
288      U32( packetStartPos ),
289      U32( packetEndPos ),
290      U32( packetTicks ),
291      U32( getCurrentTick() ),
292      mPacketQueue.size(),
293      dropPacket ? " !! DROPPED !!" : "" );
294   #endif
295
296   // Record the packet's time slice and hand the packet to the consumer.
297
298   if( !dropPacket )
299   {
300      mPacketQueue.pushBack( QueuedPacket( packetStartPos, packetEndPos ) );
301      Deref( mConsumer ).write( &packet, 1 );
302   }
303   // The queued tick position and packet count are updated even for dropped packets.
304   mTotalQueuedTicks = packetEndPos;
305   if( isLast && !mTotalTicks )
306      mTotalTicks = mTotalQueuedTicks;
307      
308   mTotalQueuedPackets ++;
309   
310   return !dropPacket;
311}
312
313#undef DEBUG_SPEW
314#endif // _ASYNCPACKETQUEUE_H_
315