asyncPacketQueue.h
Engine/source/platform/async/asyncPacketQueue.h
Time-based packet streaming.
Classes:
class AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >
Time-based packet stream queue.
struct AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::QueuedPacket
Information about the time slice covered by an individual packet currently on the queue.
Detailed Description
Time-based packet streaming.
The classes contained in this file can be used for any kind of continuous playback that depends on discrete samplings of a source stream (i.e. any kind of digital media streaming).
1 2//----------------------------------------------------------------------------- 3// Copyright (c) 2012 GarageGames, LLC 4// 5// Permission is hereby granted, free of charge, to any person obtaining a copy 6// of this software and associated documentation files (the "Software"), to 7// deal in the Software without restriction, including without limitation the 8// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 9// sell copies of the Software, and to permit persons to whom the Software is 10// furnished to do so, subject to the following conditions: 11// 12// The above copyright notice and this permission notice shall be included in 13// all copies or substantial portions of the Software. 14// 15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 20// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 21// IN THE SOFTWARE. 22//----------------------------------------------------------------------------- 23 24#ifndef _ASYNCPACKETQUEUE_H_ 25#define _ASYNCPACKETQUEUE_H_ 26 27#ifndef _TFIXEDSIZEQUEUE_H_ 28#include "core/util/tFixedSizeDeque.h" 29#endif 30 31#ifndef _TSTREAM_H_ 32#include "core/stream/tStream.h" 33#endif 34 35#ifndef _TYPETRAITS_H_ 36#include "platform/typetraits.h" 37#endif 38 39 40//#define DEBUG_SPEW 41 42 43/// @file 44/// Time-based packet streaming. 45/// 46/// The classes contained in this file can be used for any kind 47/// of continuous playback that depends on discrete samplings of 48/// a source stream (i.e. any kind of digital media streaming). 49 50 51 52//-------------------------------------------------------------------------- 53// Async packet queue. 
54//-------------------------------------------------------------------------- 55 56/// Time-based packet stream queue. 57/// 58/// AsyncPacketQueue writes data packets to a consumer stream in sync to 59/// a tick time source. Outdated packets may optionally be dropped automatically 60/// by the queue. A fixed maximum number of packets can reside in the queue 61/// concurrently at any one time. 62/// 63/// Be aware that using single item queues for synchronizing to a timer 64/// will usually result in bad timing behavior when packet uploading takes 65/// any non-trivial amount of time. 66/// 67/// @note While the queue associates a variable tick count with each 68/// individual packet, the queue fill status is measured in number of 69/// packets rather than in total tick time. 70/// 71/// @param Packet Value type of packets passed through this queue. 72/// @param TimeSource Value type for time tick source to which the queue 73/// is synchronized. 74/// @param Consumer Value type of stream to which the packets are written. 75/// 76template< typename Packet, typename TimeSource = IPositionable< U32 >*, typename Consumer = IOutputStream< Packet >*, typename Tick = U32 > 77class AsyncPacketQueue 78{ 79 public: 80 81 typedef void Parent; 82 83 /// The type of data packets being streamed through this queue. 84 typedef typename TypeTraits< Packet >::BaseType PacketType; 85 86 /// The type of consumer that receives the packets from this queue. 87 typedef typename TypeTraits< Consumer >::BaseType ConsumerType; 88 89 /// 90 typedef typename TypeTraits< TimeSource >::BaseType TimeSourceType; 91 92 /// Type for counting ticks. 93 typedef Tick TickType; 94 95 protected: 96 97 /// Information about the time slice covered by an 98 /// individual packet currently on the queue. 99 struct QueuedPacket 100 { 101 /// First tick contained in this packet. 102 TickType mStartTick; 103 104 /// First tick *not* contained in this packet anymore. 
105 TickType mEndTick; 106 107 QueuedPacket( TickType start, TickType end ) 108 : mStartTick( start ), mEndTick( end ) {} 109 110 /// Return the total number of ticks in this packet. 111 TickType getNumTicks() const 112 { 113 return ( mEndTick - mStartTick ); 114 } 115 }; 116 117 typedef FixedSizeDeque< QueuedPacket> PacketQueue; 118 119 /// If true, packets that have missed their proper queuing timeframe 120 /// will be dropped. If false, they will be queued nonetheless. 121 bool mDropPackets; 122 123 /// Total number of ticks spanned by the total queue playback time. 124 /// If this is zero, the total queue time is considered to be infinite. 125 TickType mTotalTicks; 126 127 /// Total number of ticks submitted to the queue so far. 128 TickType mTotalQueuedTicks; 129 130 /// Queue that holds records for each packet currently in the queue. New packets 131 /// are added to back. 132 PacketQueue mPacketQueue; 133 134 /// The time source to which we are sync'ing. 135 TimeSource mTimeSource; 136 137 /// The output stream that this queue feeds into. 138 Consumer mConsumer; 139 140 /// Total number of packets queued so far. 141 U32 mTotalQueuedPackets; 142 143 public: 144 145 /// Construct an AsyncPacketQueue of the given length. 146 /// 147 /// @param maxQueuedPackets The length of the queue in packets. Only a maximum of 148 /// 'maxQueuedPackets' packets can be concurrently in the queue at any one time. 149 /// @param timeSource The tick time source to which the queue synchronizes. 150 /// @param consumer The output stream that receives the packets in sync to timeSource. 151 /// @param totalTicks The total number of ticks that will be played back through the 152 /// queue; if 0, the length is considered indefinite. 153 /// @param dropPackets Whether the queue should drop outdated packets; if dropped, a 154 /// packet will not reach the consumer. 
155 AsyncPacketQueue( U32 maxQueuedPackets, 156 TimeSource timeSource, 157 Consumer consumer, 158 TickType totalTicks = 0, 159 bool dropPackets = false ) 160 : mDropPackets( dropPackets ), 161 mTotalTicks( totalTicks ), 162 mTotalQueuedTicks( 0 ), 163 mPacketQueue( maxQueuedPackets ), 164 mTimeSource( timeSource ), 165 mConsumer( consumer ) 166 167 { 168 mTotalQueuedPackets = 0; 169 } 170 171 /// Return true if there are currently 172 bool isEmpty() const { return mPacketQueue.isEmpty(); } 173 174 /// Return true if all packets have been streamed. 175 bool isAtEnd() const; 176 177 /// Return true if the queue needs one or more new packets to be submitted. 178 bool needPacket(); 179 180 /// Submit a data packet to the queue. 181 /// 182 /// @param packet The data packet. 183 /// @param packetTicks The duration of the packet in ticks. 184 /// @param isLast If true, the packet is the last one in the stream. 185 /// @param packetPos The absolute position of the packet in the stream; if this is not supplied 186 /// the packet is assumed to immediately follow the preceding packet. 187 /// 188 /// @return true if the packet has been queued or false if it has been dropped. 189 bool submitPacket( Packet packet, 190 TickType packetTicks, 191 bool isLast = false, 192 TickType packetPos = TypeTraits< TickType >::MAX ); 193 194 /// Return the current playback position according to the time source. 195 TickType getCurrentTick() const { return Deref( mTimeSource ).getPosition(); } 196 197 /// Return the total number of ticks that have been queued so far. 198 TickType getTotalQueuedTicks() const { return mTotalQueuedTicks; } 199 200 /// Return the total number of packets that have been queued so far. 
201 U32 getTotalQueuedPackets() const { return mTotalQueuedPackets; } 202}; 203 204template< typename Packet, typename TimeSource, typename Consumer, typename Tick > 205inline bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::isAtEnd() const 206{ 207 // Never at end if infinite. 208 209 if( !mTotalTicks ) 210 return false; 211 212 // Otherwise, we're at end if we're past the total tick count. 213 214 return ( getCurrentTick() >= mTotalTicks 215 && ( mDropPackets || mTotalQueuedTicks >= mTotalTicks ) ); 216} 217 218template< typename Packet, typename TimeSource, typename Consumer, typename Tick > 219bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::needPacket() 220{ 221 // Never need more packets once we have reached the 222 // end. 223 224 if( isAtEnd() ) 225 return false; 226 227 // Always needs packets while the queue is not 228 // filled up completely. 229 230 if( mPacketQueue.capacity() != 0 ) 231 return true; 232 233 // Unqueue packets that have expired their playtime. 234 235 TickType currentTick = getCurrentTick(); 236 while( mPacketQueue.size() && currentTick >= mPacketQueue.front().mEndTick ) 237 { 238 #ifdef DEBUG_SPEW 239 Platform::outputDebugString( "[AsyncPacketQueue] expired packet #%i: %i-%i (tick: %i; queue: %i)", 240 mTotalQueuedPackets - mPacketQueue.size(), 241 U32( mPacketQueue.front().mStartTick ), 242 U32( mPacketQueue.front().mEndTick ), 243 U32( currentTick ), 244 mPacketQueue.size() ); 245 #endif 246 247 mPacketQueue.popFront(); 248 } 249 250 // Need more packets if the queue isn't full anymore. 251 252 return ( mPacketQueue.capacity() != 0 ); 253} 254 255template< typename Packet, typename TimeSource, typename Consumer, typename Tick > 256bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::submitPacket( Packet packet, TickType packetTicks, bool isLast, TickType packetPos ) 257{ 258 AssertFatal( mPacketQueue.capacity() != 0, 259 "AsyncPacketQueue::submitPacket() - Queue is full!" 
); 260 261 TickType packetStartPos; 262 TickType packetEndPos; 263 264 if( packetPos != TypeTraits< TickType >::MAX ) 265 { 266 packetStartPos = packetPos; 267 packetEndPos = packetPos + packetTicks; 268 } 269 else 270 { 271 packetStartPos = mTotalQueuedTicks; 272 packetEndPos = mTotalQueuedTicks + packetTicks; 273 } 274 275 // Check whether the packet is outdated, if enabled. 276 277 bool dropPacket = false; 278 if( mDropPackets ) 279 { 280 TickType currentTick = getCurrentTick(); 281 if( currentTick >= packetEndPos ) 282 dropPacket = true; 283 } 284 285 #ifdef DEBUG_SPEW 286 Platform::outputDebugString( "[AsyncPacketQueue] new packet #%i: %i-%i (ticks: %i, current: %i, queue: %i)%s", 287 mTotalQueuedPackets, 288 U32( mTotalQueuedTicks ), 289 U32( packetEndPos ), 290 U32( packetTicks ), 291 U32( getCurrentTick() ), 292 mPacketQueue.size(), 293 dropPacket ? " !! DROPPED !!" : "" ); 294 #endif 295 296 // Queue the packet. 297 298 if( !dropPacket ) 299 { 300 mPacketQueue.pushBack( QueuedPacket( packetStartPos, packetEndPos ) ); 301 Deref( mConsumer ).write( &packet, 1 ); 302 } 303 304 mTotalQueuedTicks = packetEndPos; 305 if( isLast && !mTotalTicks ) 306 mTotalTicks = mTotalQueuedTicks; 307 308 mTotalQueuedPackets ++; 309 310 return !dropPacket; 311} 312 313#undef DEBUG_SPEW 314#endif // _ASYNCPACKETQUEUE_H_ 315