Torque3D Documentation / _generateds / threadSafePriorityQueue.h

threadSafePriorityQueue.h

Engine/source/platform/threads/threadSafePriorityQueue.h

Template code for an efficient thread-safe priority queue implementation.

More...

Classes:

class

Fast, lock-free priority queue implementation for concurrent access.

class

Fast, lock-free priority queue implementation for concurrent access that performs dynamic re-prioritization of items.

Detailed Description

Template code for an efficient thread-safe priority queue implementation.

There are two alternative implementations to choose from: ThreadSafePriorityQueue and ThreadSafePriorityQueueWithUpdate where the latter adds concurrent status updates of queue items to the former implementation.

  1
  2//-----------------------------------------------------------------------------
  3// Copyright (c) 2012 GarageGames, LLC
  4//
  5// Permission is hereby granted, free of charge, to any person obtaining a copy
  6// of this software and associated documentation files (the "Software"), to
  7// deal in the Software without restriction, including without limitation the
  8// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  9// sell copies of the Software, and to permit persons to whom the Software is
 10// furnished to do so, subject to the following conditions:
 11//
 12// The above copyright notice and this permission notice shall be included in
 13// all copies or substantial portions of the Software.
 14//
 15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 20// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 21// IN THE SOFTWARE.
 22//-----------------------------------------------------------------------------
 23
 24#ifndef _THREADSAFEPRIORITYQUEUE_H_
 25#define _THREADSAFEPRIORITYQUEUE_H_
 26
 27#ifndef _PLATFORMINTRINSICS_H_
 28   #include "platform/platformIntrinsics.h"
 29#endif
 30#ifndef _THREADSAFEREFCOUNT_H_
 31   #include "platform/threads/threadSafeRefCount.h"
 32#endif
 33#ifndef _TYPETRAITS_H_
 34   #include "platform/typetraits.h"
 35#endif
 36
 37
 38// Disable TMM's new operator grabbing.
 39#include "platform/tmm_off.h"
 40
 41
 42//#define DEBUG_SPEW
 43
 44
 45/// @file
 46/// Template code for an efficient thread-safe priority queue
 47/// implementation.  There are two alternative implementations to
 48/// choose from: ThreadSafePriorityQueue and ThreadSafePriorityQueueWithUpdate
 49/// where the latter adds concurrent status updates of queue items to
 50/// the former implementation.
 51
 52
 53//--------------------------------------------------------------------------
 54//    ThreadSafePriorityQueue.
 55//--------------------------------------------------------------------------
 56
 57/// Fast, lock-free priority queue implementation for concurrent access.
 58///
 59/// Equal priorities are allowed and are placed <em>before</em> existing items of
 60/// identical priority in the queue.
 61///
 62/// Based on (but with significant deviations from) "Fast and Lock-Free Concurrent
 63/// Priority Queues for Multi-Thread Systems" by Hakan Sundell and Philippas Tsigas.
 64/// Parts of the skiplist code is based on work by William Pugh.
 65///
 66/// @param T The item value type.  Must have a default constructor.
 67/// @param K The priority key type.  Must be comparable, have a default constructor,
 68///     and be a valid template parameter to TypeTraits.
 69/// @param SORT_MIN_TO_MAX If true, the queue sorts from minimum to maximum priority or
 70///     the reverse if false.
 71/// @param MAX_LEVEL The number of levels a node can have at most.
 72/// @param PROBABILISTIC_BIAS The probabilistic level distribution factor for
 73///     the skiplist.  Multiplied by 100 and turned into int to conform to restrictions
 74///     on non-type template parameters.
 75///
 76/// @see TypeTraits
 77
 78template< typename T, typename K = F32, bool SORT_MIN_TO_MAX = false, U32 MAX_LEVEL = 4, U32 PROBABILISTIC_BIAS = 50 >
 79struct ThreadSafePriorityQueue
 80{
 81   typedef T ValueType;
 82   typedef K KeyType;
 83
 84   enum { MAX_LEVEL_CONST = MAX_LEVEL };
 85
 86   ThreadSafePriorityQueue();
 87
 88   bool              isEmpty();
 89   void              insert( KeyType priority, const T& value );
 90   bool              takeNext( T& outValue, KeyType upToPriority = ( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN ) );
 91
 92protected:
 93   struct Node;
 94   typedef ThreadSafeRef< Node> NodePtr;
 95   friend class ThreadSafeRefCount< Node >;
 96   friend struct DeleteSingle;
 97   
 98   /// A queue node.
 99   ///
100   /// Nodes are reference-counted to coordinate memory management
101   /// between the different threads.  Reclamation happens on the
102   /// thread that releases the last reference.
103   ///
104   /// Reference-counting and deletion requests are kept separate.
105   /// A given node is marked for deletion and will then have its references
106   /// progressively disappear and eventually be reclaimed once the
107   /// reference count drops to zero.  
108   ///
109   /// Note that 'Next' references are released by the destructor which
110   /// is only called when the reference count to the node itself drops to
111   /// zero.  This is to avoid threads getting trapped in a node with no
112   /// link out.
113
114   struct Node : public ThreadSafeRefCount< Node >
115   {
116      typedef ThreadSafeRefCount< Node> Parent;
117
118      Node( KeyType priority, const ValueType& value );
119      ~Node();
120
121      KeyType        getPriority()                 { return mPriority; }
122      ValueType&     getValue()                    { return mValue; }
123      U32            getLevel();
124      NodePtr&       getNext( U32 level );
125
126      bool           isMarkedForDeletion();
127      bool           tryMarkForDeletion();
128      
129      void           clearValue()                  { mValue = ValueType(); }
130
131      static U32     randomLevel();
132
133      void*          operator new( size_t size, S32 level = -1 );
134      void           operator delete( void* ptr );
135
136   private:
137      KeyType        mPriority;     ///< Priority key.
138      U32            mLevel;        ///< Level count and deletion bit (highest).
139      ValueType      mValue;
140      Node*          mNext[ 1 ];    ///< Variable-sized array of next pointers.
141
142      struct FreeList
143      {
144         bool        mDestroyed;
145         Node*       mNodes;
146
147         ~FreeList();
148      };
149
150      static FreeList smFreeLists[ MAX_LEVEL ];
151   };
152
153   NodePtr           mHead;         ///< Artificial head node.
154   NodePtr           mTail;         ///< Artificial tail node.
155
156   void              readNext( NodePtr& refPrev, NodePtr& refNext, U32 level );
157   void              scan( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority );
158   void              scanFromHead( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority );
159   void              insert( KeyType priority, const T& value, NodePtr& outResult );
160   void              helpDelete();
161};
162
163template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
164typename ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::FreeList ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::smFreeLists[ MAX_LEVEL ];
165
166/// Construct an empty queue.
167///
168/// Internally, this creates a head node with maximal priority and a tail node with minimal priority,
169/// both at maximum level.
170
171template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
172ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::ThreadSafePriorityQueue()
173{
174   NodePtr::unsafeWrite( mHead, new ( MAX_LEVEL - 1 )
175      Node( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MIN : TypeTraits< KeyType >::MAX, ValueType() ) );
176   NodePtr::unsafeWrite( mTail, new ( MAX_LEVEL - 1 )
177      Node( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN, ValueType() ) );
178
179   for( U32 level = 0; level < MAX_LEVEL; level ++ )
180      mHead->getNext( level ) = mTail;
181}
182
183/// Return true if the queue does not currently contain an item.
184
185template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
186bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::isEmpty()
187{
188   return ( mHead->getNext( 0 ) == mTail );
189}
190
191/// Insert the given value into the queue at the place determined by the given priority.
192
193template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
194inline void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value )
195{
196   NodePtr result;
197   insert( priority, value, result );
198}
199
200template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
201void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value, NodePtr& outResult )
202{
203   // Create a new node at a random level.
204
205   outResult = NULL;
206   NodePtr::unsafeWrite( outResult, new Node( priority, value ) );
207   U32 resultNodeLevel = outResult->getLevel();
208
209   // Link up all the levels.  Do this bottom-up instead of
210   // top-down (as would be the right way for a skiplist) so
211   // that our list state always remains valid.  If going top-down,
212   // we'll insert nodes with NULL pointers at their lower levels.
213
214   U32 currentLevel = 0;
215   do 
216   {
217      while( 1 )
218      {
219         NodePtr nextNode;
220         NodePtr prevNode;
221         
222         scanFromHead( prevNode, nextNode, currentLevel, priority );        
223
224         outResult->getNext( currentLevel ) = nextNode;
225         if( prevNode->getNext( currentLevel ).trySetFromTo( nextNode, outResult, NodePtr::TAG_FailIfSet ) )
226            break;
227         else
228            outResult->getNext( currentLevel ) = 0;
229      }
230
231      currentLevel ++;
232   }
233   while(    currentLevel <= resultNodeLevel
234          && !outResult->isMarkedForDeletion() ); // No point linking up remaining levels if another thread already took this node.
235}
236
237/// Take the item with the highest priority from the queue.
238///
239/// @param outValue Reference to where the resulting value should be stored.
240/// @param upToPriority Priority limit (inclusive) up to which items are taken from the queue.
241/// @return true if there was a matching item in the queue.
242
243template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
244bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::takeNext( T& outValue, KeyType upToPriority )
245{
246   // Iterate through to the first unmarked node.
247
248   NodePtr prevNode = mHead;
249   while( 1 )
250   {
251      NodePtr node;
252      readNext( prevNode, node, 0 );
253
254      if( node == mTail )
255         return false; // End reached.
256
257      bool priorityThresholdReached = SORT_MIN_TO_MAX
258         ? ( upToPriority >= node->getPriority() )
259         : ( upToPriority <= node->getPriority() );
260
261      if( !priorityThresholdReached )
262         return false;
263      else
264      {
265         // Try to mark the node for deletion.  Only if that succeeds, taking the
266         // node was a success and we can return.  If it fails, spin and try again.
267
268         if( node->tryMarkForDeletion() )
269         {
270            helpDelete();
271
272            // Node is now off the list and will disappear as soon as
273            // all references held by threads (including this one)
274            // go out of scope.
275
276            outValue = node->getValue();
277            node->clearValue();
278
279            return true;
280         }
281      }
282   }
283}
284
285/// Update the given references to the next non-deleted node at the given level.
286/// refPrev will be updated to reference the immediate predecessor of the next
287/// node returned.  Note that this can be a node in deleted state.
288///
289/// @param refPrev Reference to a node of which the successor node should be
290///    returned.  Updated to immediate predecessor of refNext on return.
291/// @param refNext Reference to update to refer to next non-deleted node on
292///    the given level.
293/// @param level Skiplist level to operate on.
294
295template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
296inline void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::readNext( NodePtr& refPrev, NodePtr& refNext, U32 level )
297{
298   while( 1 )
299   {
300      refNext = refPrev->getNext( level );
301      AssertFatal( refNext != NULL, "ThreadSafePriorityQueue::readNext() - next is NULL" );
302      if( !refNext->isMarkedForDeletion() || refNext == mTail )
303         break;
304
305      refPrev = refNext;
306   }
307}
308
309/// Scan for the position at which to insert a node of the given priority.
310/// Upon return, the position between refPrev and refNext is the one to insert at.
311///
312/// @param refPrev position at which to start scanning; updated to match insert position.
313/// @param refNext
314
315template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
316void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::scan( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority )
317{
318   while( 1 )
319   {
320      readNext( refPrev, refNext, level );
321      if( refNext == mTail
322         || ( SORT_MIN_TO_MAX
323            ? ( refNext->getPriority() > priority )
324            : ( refNext->getPriority() < priority ) ) )
325         break;
326
327      refPrev = refNext;
328   }
329}
330
331///
332
333template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
334void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::scanFromHead( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority )
335{
336   // Purge dead nodes at left end of queue so
337   // we don't get stuck hitting the same node
338   // in deletable state over and over again.
339   helpDelete();
340
341   S32 currentLevel = MAX_LEVEL - 1;
342   refPrev = mHead;
343   do
344   {
345      scan( refPrev, refNext, currentLevel, priority );
346      currentLevel --;
347   }
348   while( currentLevel >= S32( level ) );
349}
350
351template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
352void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::helpDelete()
353{
354   // Clean out all the references from head.
355   // Spin over a given reference on each level until head
356   // clearly refers to a node in non-deletable state.  This
357   // makes this code work cooperatively with other threads
358   // doing takeNexts on prior or later nodes while also
359   // guaranteeing that all next pointers to us will eventually
360   // disappear.
361   //
362   // Note that this is *the only place* where we will be cleaning
363   // out our lists.
364
365   S32 level = MAX_LEVEL - 1;
366   do
367   {
368      while( 1 )
369      {
370         NodePtr ptr = mHead->getNext( level );
371         if( !ptr->isMarkedForDeletion() )
372            break;
373         else
374         {
375            NodePtr& next = ptr->getNext( level );
376            next.setTag();
377            mHead->getNext( level ).trySetFromTo( ptr, next, NodePtr::TAG_Unset );
378            AssertFatal( next->getRefCount() >= 2, "ThreadSafePriorityQueue::helpDelete() - invalid refcount" );
379         }
380      }
381
382      level --;
383   }
384   while( level >= 0 );
385}
386
387template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
388inline ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::Node( KeyType priority, const ValueType& value )
389   : Parent( false ),
390     mPriority( priority ),
391     mValue( value )
392{
393   dMemset( mNext, 0, sizeof( Node* ) * ( getLevel() + 1 ) );
394
395   // Level is already set by the allocation routines.
396}
397
398template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
399ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::~Node()
400{
401   for( U32 level = 0; level < ( getLevel() + 1 ); level ++ )
402      getNext( level ) = NULL;
403}
404
405/// Return the skip list level the node is at.
406
407template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
408inline U32 ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::getLevel()
409{
410   // Mask out the deletion request bit.
411
412   return ( mLevel & 0x7FFFFFFF );
413}
414
415/// Return the successor node at the given level.
416/// @param level The level of the desired successor node; must be within the node's level bounds.
417
418template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
419inline typename ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::NodePtr& ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::getNext( U32 level )
420{
421   return *reinterpret_cast< NodePtr* >( &mNext[ level ] );
422}
423
424/// Return true if the node is marked to be deleted.
425
426template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
427inline bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::isMarkedForDeletion()
428{
429   return ( mLevel & 0x80000000 );
430}
431
432/// Attempt to mark the node for deletion.  If the mark bit has not yet been set
433/// and setting it on the current thread succeeds, returns true.
434///
435/// @return true, if the marking succeeded.
436
437template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
438inline bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::tryMarkForDeletion()
439{
440   U32 oldVal = mLevel & 0x7FFFFFFF;
441   U32 newVal = oldVal | 0x80000000;
442
443   return ( dCompareAndSwap( mLevel, oldVal, newVal ) );
444}
445
446/// Choose a random level.
447///
448/// The chosen level depends on the given PROBABILISTIC_BIAS and MAX_LEVEL,
449/// but is not affected by the actual number of nodes in a queue.
450
451template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
452U32 ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::randomLevel()
453{
454   U32 level = 0;
455   while( Platform::getRandom() < ( ( ( F32 ) PROBABILISTIC_BIAS ) / 100 ) && level < ( MAX_LEVEL - 1 ) )
456      level ++;
457   return level;
458}
459
460/// Allocate a new node.
461/// The node comes with a reference count of one and its level already set.
462///
463/// @param level The level to allocate the node at.  If this is -1, a random level is chosen.
464/// @return a new node.
465
466template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
467void* ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::operator new( size_t size, S32 level )
468{
469   if( level == -1 )
470      level = randomLevel();
471
472   Node* node = 0;
473   while( 1 )
474   {
475      // Try to take a node from the freelist.  If there's none,
476      // allocate a new one.
477
478      if( !smFreeLists[ level ].mDestroyed )
479         node = Node::safeRead( smFreeLists[ level ].mNodes );
480
481      if( !node )
482      {
483         node = ( Node* ) dMalloc( sizeof( Node ) + sizeof( Node* ) * level );
484         dMemset( node, 0, sizeof( Node ) );
485         node->mLevel = level;
486         node->addRef();
487         break;
488      }
489      else if( dCompareAndSwap( smFreeLists[ level ].mNodes, node, node->mNext[ 0 ] ) )
490      {
491         node->clearLowestBit();
492         break;
493      }
494      else
495         node->release(); // Other thread was quicker than us; release.
496   }
497
498   AssertFatal( node->getRefCount() != 0, "ThreadSafePriorityQueue::new Node() - invalid refcount" );
499   AssertFatal( ( node->getRefCount() % 2 ) == 0, "ThreadSafePriorityQueue::new Node() - invalid refcount" );
500   return node;
501}
502
503/// Reclaim a node.
504///
505/// @param node The node to reclaim.  Must refer to a Node instance.
506
507template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
508void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::operator delete( void* ptr )
509{
510   //TODO: limit number of nodes kept
511
512   Node* node = ( Node* ) ptr;
513   U32 level = node->getLevel();
514   node->mLevel = level; // Reset the node's deletion bit.
515
516   while( !smFreeLists[ level ].mDestroyed )
517   {
518      // Put the node on the freelist.
519
520      Node* freeList = smFreeLists[ level ].mNodes;
521      node->mNext[ 0 ] = freeList;
522      
523      if( dCompareAndSwap( smFreeLists[ level ].mNodes, freeList, node ) )
524      {
525         node = NULL;
526         break;
527      }
528   }
529   
530   if( node )
531      dFree( node );
532}
533
534template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
535ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::FreeList::~FreeList()
536{
537   mDestroyed = true;
538   while( mNodes )
539   {
540      //FIXME: could leak some bytes under unfortunate circumstances (this in
541      //   combination with mDestroyed is a dependent write)
542
543      Node* next = mNodes;
544      if( dCompareAndSwap( mNodes, next, next->mNext[ 0 ] ) )
545         dFree( next );
546   }
547}
548
549//--------------------------------------------------------------------------
550//    ThreadSafePriorityQueueWithUpdate.
551//--------------------------------------------------------------------------
552
553/// Fast, lock-free priority queue implementation for concurrent access that
554/// performs dynamic re-prioritization of items.
555///
556/// Within the bounds of a set update interval UPDATE_INTERVAL, the takeNext
557/// method is guaranteed to always return the item that has the highest priority
558/// at the time the method is called rather than at the time items were inserted
559/// into the queue.
560///
561/// Values placed on the queue must implement the following interface:
562///
563/// @code
564/// template&lt; typename K >
565/// struct IThreadSafePriorityQueueItem
566/// {
567///    // Default constructor.
568///    IThreadSafePriorityQueueItem();
569///
570///    // Return the current priority.
571///    // This must run normally even if the item is already dead.
572///    K getPriority();
573///
574///    // Return true if the item is still meant to be waiting in the queue.
575///    bool isAlive();
576/// };
577/// @endcode
578
579template< typename T, typename K, bool SORT_MIN_TO_MAX = false, U32 MAX_LEVEL = 4, U32 PROBABILISTIC_BIAS = 50 >
580struct ThreadSafePriorityQueueWithUpdate : public ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >
581{
582
583   typedef T ValueType;
584   typedef K KeyType;
585
586   enum { DEFAULT_UPDATE_INTERVAL = 256 };
587
588   ThreadSafePriorityQueueWithUpdate( U32 updateInterval = DEFAULT_UPDATE_INTERVAL );
589
590   void              insert( KeyType priority, const T& value );
591   bool              takeNext( T& outValue, KeyType upToPriority = ( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN ) );
592
593   U32               getUpdateInterval() const;
594   void              setUpdateInterval( U32 value );
595
596   KeyType           getTimeBasedPriorityBoost() const;
597   void              setTimeBasedPriorityBoost( KeyType value );
598
599   void              updatePriorities();
600
601protected:
602   typedef ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS> Parent;
603   typedef U32 TickType;
604   typedef typename Parent::NodePtr NodePtr;
605
606   U32               mUpdateInterval;
607   KeyType           mPriorityBoost;      ///< If this is non-zero, priorities will be boosted by this amount each update.  This can be used to prevent constant high-priority inserts to starve low-priority items already in the queue.
608
609   /// Work queue for node updates.
610   ThreadSafePriorityQueue< NodePtr, TickType, true, MAX_LEVEL, PROBABILISTIC_BIAS> mUpdateQueue;
611
612   TickType          getTick()            { return Platform::getRealMilliseconds(); }
613};
614
615template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
616ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::ThreadSafePriorityQueueWithUpdate( U32 updateInterval )
617   : mUpdateInterval( updateInterval ),
618     mPriorityBoost( TypeTraits< KeyType >::ZERO )
619{
620}
621
622/// Return the current update interval in milliseconds.
623
624template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
625U32 ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::getUpdateInterval() const
626{
627   return mUpdateInterval;
628}
629
630/// Set update interval of queue to given value.
631///
632/// <em>Call this method on the main thread only.</em>
633///
634/// @param value Time between priority updates in milliseconds.
635
636template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
637void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::setUpdateInterval( U32 value )
638{
639   mUpdateInterval = value;
640}
641
642/// Return the delta to apply to priorities on each update.
643/// Set to zero to deactivate time-based priority adjustments.
644
645template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
646K ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::getTimeBasedPriorityBoost() const
647{
648   return mPriorityBoost;
649}
650
651/// Set the delta for time-based priority adjustments to the given value.
652///
653/// <em>Call this method on the main thread only.</em>
654///
655/// @param value The new priority adjustment value.
656
657template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
658void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::setTimeBasedPriorityBoost( KeyType value )
659{
660   mPriorityBoost = value;
661}
662
663template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
664void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value )
665{
666   NodePtr node;
667   Parent::insert( priority, value, node );
668   mUpdateQueue.insert( getTick() + getUpdateInterval(), node );
669}
670
671template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
672bool ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::takeNext( T& outValue, KeyType upToPriority )
673{
674   updatePriorities();
675
676   bool result = false;
677   do
678   {
679      result = Parent::takeNext( outValue, upToPriority );
680   }
681   while( result && !outValue.isAlive() );
682   
683   return result;
684}
685
686///
687
688template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
689void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::updatePriorities()
690{
691   TickType currentTime       = getTick();
692   U32      numNodesUpdated   = 0;
693   U32      numNodesDead      = 0;
694   U32      numNodesChanged   = 0;
695
696   NodePtr node;
697   while( mUpdateQueue.takeNext( node, currentTime ) )
698   {
699      numNodesUpdated ++;
700
701      // Since we're updating nodes on the update queue only periodically,
702      // their associated values or main queue nodes may have died in the
703      // meantime.  If so, we just discard them here.
704
705      if( node->getValue().isAlive()
706          && !node->isMarkedForDeletion() )
707      {
708         KeyType newPriority = node->getValue().getPriority() + getTimeBasedPriorityBoost();
709         if( newPriority != node->getPriority() )
710         {
711            // Node is outdated.  Reinsert with new priority and mark the
712            // old node for deletion.
713
714            insert( newPriority, node->getValue() );
715            node->tryMarkForDeletion();
716            numNodesChanged ++;
717         }
718         else
719         {
720            // Node is still current.  Just move to end.
721
722            mUpdateQueue.insert( currentTime + getUpdateInterval(), node );
723         }
724      }
725      else
726         numNodesDead ++;
727   }
728
729   #ifdef DEBUG_SPEW
730   if( numNodesUpdated )
731      Platform::outputDebugString( "[ThreadSafePriorityQueueWithUpdate] updated %i nodes (%i changed, %i dead)",
732                                   numNodesUpdated, numNodesChanged, numNodesDead );
733   #endif
734}
735
736// Re-enable TMM if necessary.
737#include "platform/tmm_on.h"
738
739#undef DEBUG_SPEW
740
741#endif // !_THREADSAFEPRIORITYQUEUE_H_
742