threadSafePriorityQueue.h
Engine/source/platform/threads/threadSafePriorityQueue.h
Template code for an efficient thread-safe priority queue implementation.
Classes:
class
Fast, lock-free priority queue implementation for concurrent access.
class
A queue node.
class
Fast, lock-free priority queue implementation for concurrent access that performs dynamic re-prioritization of items.
Detailed Description
Template code for an efficient thread-safe priority queue implementation.
There are two alternative implementations to choose from: ThreadSafePriorityQueue and ThreadSafePriorityQueueWithUpdate where the latter adds concurrent status updates of queue items to the former implementation.
1 2//----------------------------------------------------------------------------- 3// Copyright (c) 2012 GarageGames, LLC 4// 5// Permission is hereby granted, free of charge, to any person obtaining a copy 6// of this software and associated documentation files (the "Software"), to 7// deal in the Software without restriction, including without limitation the 8// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 9// sell copies of the Software, and to permit persons to whom the Software is 10// furnished to do so, subject to the following conditions: 11// 12// The above copyright notice and this permission notice shall be included in 13// all copies or substantial portions of the Software. 14// 15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 20// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 21// IN THE SOFTWARE. 22//----------------------------------------------------------------------------- 23 24#ifndef _THREADSAFEPRIORITYQUEUE_H_ 25#define _THREADSAFEPRIORITYQUEUE_H_ 26 27#ifndef _PLATFORMINTRINSICS_H_ 28 #include "platform/platformIntrinsics.h" 29#endif 30#ifndef _THREADSAFEREFCOUNT_H_ 31 #include "platform/threads/threadSafeRefCount.h" 32#endif 33#ifndef _TYPETRAITS_H_ 34 #include "platform/typetraits.h" 35#endif 36 37 38// Disable TMM's new operator grabbing. 39#include "platform/tmm_off.h" 40 41 42//#define DEBUG_SPEW 43 44 45/// @file 46/// Template code for an efficient thread-safe priority queue 47/// implementation. There are two alternative implementations to 48/// choose from: ThreadSafePriorityQueue and ThreadSafePriorityQueueWithUpdate 49/// where the latter adds concurrent status updates of queue items to 50/// the former implementation. 51 52 53//-------------------------------------------------------------------------- 54// ThreadSafePriorityQueue. 55//-------------------------------------------------------------------------- 56 57/// Fast, lock-free priority queue implementation for concurrent access. 58/// 59/// Equal priorities are allowed and are placed <em>before</em> existing items of 60/// identical priority in the queue. 61/// 62/// Based on (but with significant deviations from) "Fast and Lock-Free Concurrent 63/// Priority Queues for Multi-Thread Systems" by Hakan Sundell and Philippas Tsigas. 64/// Parts of the skiplist code is based on work by William Pugh. 65/// 66/// @param T The item value type. Must have a default constructor. 67/// @param K The priority key type. Must be comparable, have a default constructor, 68/// and be a valid template parameter to TypeTraits. 69/// @param SORT_MIN_TO_MAX If true, the queue sorts from minimum to maximum priority or 70/// the reverse if false. 71/// @param MAX_LEVEL The number of levels a node can have at most. 72/// @param PROBABILISTIC_BIAS The probabilistic level distribution factor for 73/// the skiplist. Multiplied by 100 and turned into int to conform to restrictions 74/// on non-type template parameters. 75/// 76/// @see TypeTraits 77 78template< typename T, typename K = F32, bool SORT_MIN_TO_MAX = false, U32 MAX_LEVEL = 4, U32 PROBABILISTIC_BIAS = 50 > 79struct ThreadSafePriorityQueue 80{ 81 typedef T ValueType; 82 typedef K KeyType; 83 84 enum { MAX_LEVEL_CONST = MAX_LEVEL }; 85 86 ThreadSafePriorityQueue(); 87 88 bool isEmpty(); 89 void insert( KeyType priority, const T& value ); 90 bool takeNext( T& outValue, KeyType upToPriority = ( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN ) ); 91 92protected: 93 struct Node; 94 typedef ThreadSafeRef< Node> NodePtr; 95 friend class ThreadSafeRefCount< Node >; 96 friend struct DeleteSingle; 97 98 /// A queue node. 99 /// 100 /// Nodes are reference-counted to coordinate memory management 101 /// between the different threads. Reclamation happens on the 102 /// thread that releases the last reference. 103 /// 104 /// Reference-counting and deletion requests are kept separate. 105 /// A given node is marked for deletion and will then have its references 106 /// progressively disappear and eventually be reclaimed once the 107 /// reference count drops to zero. 108 /// 109 /// Note that 'Next' references are released by the destructor which 110 /// is only called when the reference count to the node itself drops to 111 /// zero. This is to avoid threads getting trapped in a node with no 112 /// link out. 113 114 struct Node : public ThreadSafeRefCount< Node > 115 { 116 typedef ThreadSafeRefCount< Node> Parent; 117 118 Node( KeyType priority, const ValueType& value ); 119 ~Node(); 120 121 KeyType getPriority() { return mPriority; } 122 ValueType& getValue() { return mValue; } 123 U32 getLevel(); 124 NodePtr& getNext( U32 level ); 125 126 bool isMarkedForDeletion(); 127 bool tryMarkForDeletion(); 128 129 void clearValue() { mValue = ValueType(); } 130 131 static U32 randomLevel(); 132 133 void* operator new( size_t size, S32 level = -1 ); 134 void operator delete( void* ptr ); 135 136 private: 137 KeyType mPriority; ///< Priority key. 138 U32 mLevel; ///< Level count and deletion bit (highest). 139 ValueType mValue; 140 Node* mNext[ 1 ]; ///< Variable-sized array of next pointers. 141 142 struct FreeList 143 { 144 bool mDestroyed; 145 Node* mNodes; 146 147 ~FreeList(); 148 }; 149 150 static FreeList smFreeLists[ MAX_LEVEL ]; 151 }; 152 153 NodePtr mHead; ///< Artificial head node. 154 NodePtr mTail; ///< Artificial tail node. 155 156 void readNext( NodePtr& refPrev, NodePtr& refNext, U32 level ); 157 void scan( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority ); 158 void scanFromHead( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority ); 159 void insert( KeyType priority, const T& value, NodePtr& outResult ); 160 void helpDelete(); 161}; 162 163template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 164typename ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::FreeList ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::smFreeLists[ MAX_LEVEL ]; 165 166/// Construct an empty queue. 167/// 168/// Internally, this creates a head node with maximal priority and a tail node with minimal priority, 169/// both at maximum level. 170 171template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 172ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::ThreadSafePriorityQueue() 173{ 174 NodePtr::unsafeWrite( mHead, new ( MAX_LEVEL - 1 ) 175 Node( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MIN : TypeTraits< KeyType >::MAX, ValueType() ) ); 176 NodePtr::unsafeWrite( mTail, new ( MAX_LEVEL - 1 ) 177 Node( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN, ValueType() ) ); 178 179 for( U32 level = 0; level < MAX_LEVEL; level ++ ) 180 mHead->getNext( level ) = mTail; 181} 182 183/// Return true if the queue does not currently contain an item. 184 185template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 186bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::isEmpty() 187{ 188 return ( mHead->getNext( 0 ) == mTail ); 189} 190 191/// Insert the given value into the queue at the place determined by the given priority. 192 193template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 194inline void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value ) 195{ 196 NodePtr result; 197 insert( priority, value, result ); 198} 199 200template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 201void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value, NodePtr& outResult ) 202{ 203 // Create a new node at a random level. 204 205 outResult = NULL; 206 NodePtr::unsafeWrite( outResult, new Node( priority, value ) ); 207 U32 resultNodeLevel = outResult->getLevel(); 208 209 // Link up all the levels. Do this bottom-up instead of 210 // top-down (as would be the right way for a skiplist) so 211 // that our list state always remains valid. If going top-down, 212 // we'll insert nodes with NULL pointers at their lower levels. 213 214 U32 currentLevel = 0; 215 do 216 { 217 while( 1 ) 218 { 219 NodePtr nextNode; 220 NodePtr prevNode; 221 222 scanFromHead( prevNode, nextNode, currentLevel, priority ); 223 224 outResult->getNext( currentLevel ) = nextNode; 225 if( prevNode->getNext( currentLevel ).trySetFromTo( nextNode, outResult, NodePtr::TAG_FailIfSet ) ) 226 break; 227 else 228 outResult->getNext( currentLevel ) = 0; 229 } 230 231 currentLevel ++; 232 } 233 while( currentLevel <= resultNodeLevel 234 && !outResult->isMarkedForDeletion() ); // No point linking up remaining levels if another thread already took this node. 235} 236 237/// Take the item with the highest priority from the queue. 238/// 239/// @param outValue Reference to where the resulting value should be stored. 240/// @param upToPriority Priority limit (inclusive) up to which items are taken from the queue. 241/// @return true if there was a matching item in the queue. 242 243template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 244bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::takeNext( T& outValue, KeyType upToPriority ) 245{ 246 // Iterate through to the first unmarked node. 247 248 NodePtr prevNode = mHead; 249 while( 1 ) 250 { 251 NodePtr node; 252 readNext( prevNode, node, 0 ); 253 254 if( node == mTail ) 255 return false; // End reached. 256 257 bool priorityThresholdReached = SORT_MIN_TO_MAX 258 ? ( upToPriority >= node->getPriority() ) 259 : ( upToPriority <= node->getPriority() ); 260 261 if( !priorityThresholdReached ) 262 return false; 263 else 264 { 265 // Try to mark the node for deletion. Only if that succeeds, taking the 266 // node was a success and we can return. If it fails, spin and try again. 267 268 if( node->tryMarkForDeletion() ) 269 { 270 helpDelete(); 271 272 // Node is now off the list and will disappear as soon as 273 // all references held by threads (including this one) 274 // go out of scope. 275 276 outValue = node->getValue(); 277 node->clearValue(); 278 279 return true; 280 } 281 } 282 } 283} 284 285/// Update the given references to the next non-deleted node at the given level. 286/// refPrev will be updated to reference the immediate predecessor of the next 287/// node returned. Note that this can be a node in deleted state. 288/// 289/// @param refPrev Reference to a node of which the successor node should be 290/// returned. Updated to immediate predecessor of refNext on return. 291/// @param refNext Reference to update to refer to next non-deleted node on 292/// the given level. 293/// @param level Skiplist level to operate on. 294 295template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 296inline void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::readNext( NodePtr& refPrev, NodePtr& refNext, U32 level ) 297{ 298 while( 1 ) 299 { 300 refNext = refPrev->getNext( level ); 301 AssertFatal( refNext != NULL, "ThreadSafePriorityQueue::readNext() - next is NULL" ); 302 if( !refNext->isMarkedForDeletion() || refNext == mTail ) 303 break; 304 305 refPrev = refNext; 306 } 307} 308 309/// Scan for the position at which to insert a node of the given priority. 310/// Upon return, the position between refPrev and refNext is the one to insert at. 311/// 312/// @param refPrev position at which to start scanning; updated to match insert position. 313/// @param refNext 314 315template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 316void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::scan( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority ) 317{ 318 while( 1 ) 319 { 320 readNext( refPrev, refNext, level ); 321 if( refNext == mTail 322 || ( SORT_MIN_TO_MAX 323 ? ( refNext->getPriority() > priority ) 324 : ( refNext->getPriority() < priority ) ) ) 325 break; 326 327 refPrev = refNext; 328 } 329} 330 331/// 332 333template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 334void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::scanFromHead( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority ) 335{ 336 // Purge dead nodes at left end of queue so 337 // we don't get stuck hitting the same node 338 // in deletable state over and over again. 339 helpDelete(); 340 341 S32 currentLevel = MAX_LEVEL - 1; 342 refPrev = mHead; 343 do 344 { 345 scan( refPrev, refNext, currentLevel, priority ); 346 currentLevel --; 347 } 348 while( currentLevel >= S32( level ) ); 349} 350 351template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 352void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::helpDelete() 353{ 354 // Clean out all the references from head. 355 // Spin over a given reference on each level until head 356 // clearly refers to a node in non-deletable state. This 357 // makes this code work cooperatively with other threads 358 // doing takeNexts on prior or later nodes while also 359 // guaranteeing that all next pointers to us will eventually 360 // disappear. 361 // 362 // Note that this is *the only place* where we will be cleaning 363 // out our lists. 364 365 S32 level = MAX_LEVEL - 1; 366 do 367 { 368 while( 1 ) 369 { 370 NodePtr ptr = mHead->getNext( level ); 371 if( !ptr->isMarkedForDeletion() ) 372 break; 373 else 374 { 375 NodePtr& next = ptr->getNext( level ); 376 next.setTag(); 377 mHead->getNext( level ).trySetFromTo( ptr, next, NodePtr::TAG_Unset ); 378 AssertFatal( next->getRefCount() >= 2, "ThreadSafePriorityQueue::helpDelete() - invalid refcount" ); 379 } 380 } 381 382 level --; 383 } 384 while( level >= 0 ); 385} 386 387template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 388inline ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::Node( KeyType priority, const ValueType& value ) 389 : Parent( false ), 390 mPriority( priority ), 391 mValue( value ) 392{ 393 dMemset( mNext, 0, sizeof( Node* ) * ( getLevel() + 1 ) ); 394 395 // Level is already set by the allocation routines. 396} 397 398template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 399ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::~Node() 400{ 401 for( U32 level = 0; level < ( getLevel() + 1 ); level ++ ) 402 getNext( level ) = NULL; 403} 404 405/// Return the skip list level the node is at. 406 407template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 408inline U32 ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::getLevel() 409{ 410 // Mask out the deletion request bit. 411 412 return ( mLevel & 0x7FFFFFFF ); 413} 414 415/// Return the successor node at the given level. 416/// @param level The level of the desired successor node; must be within the node's level bounds. 417 418template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 419inline typename ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::NodePtr& ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::getNext( U32 level ) 420{ 421 return *reinterpret_cast< NodePtr* >( &mNext[ level ] ); 422} 423 424/// Return true if the node is marked to be deleted. 425 426template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 427inline bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::isMarkedForDeletion() 428{ 429 return ( mLevel & 0x80000000 ); 430} 431 432/// Attempt to mark the node for deletion. If the mark bit has not yet been set 433/// and setting it on the current thread succeeds, returns true. 434/// 435/// @return true, if the marking succeeded. 436 437template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 438inline bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::tryMarkForDeletion() 439{ 440 U32 oldVal = mLevel & 0x7FFFFFFF; 441 U32 newVal = oldVal | 0x80000000; 442 443 return ( dCompareAndSwap( mLevel, oldVal, newVal ) ); 444} 445 446/// Choose a random level. 447/// 448/// The chosen level depends on the given PROBABILISTIC_BIAS and MAX_LEVEL, 449/// but is not affected by the actual number of nodes in a queue. 450 451template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 452U32 ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::randomLevel() 453{ 454 U32 level = 0; 455 while( Platform::getRandom() < ( ( ( F32 ) PROBABILISTIC_BIAS ) / 100 ) && level < ( MAX_LEVEL - 1 ) ) 456 level ++; 457 return level; 458} 459 460/// Allocate a new node. 461/// The node comes with a reference count of one and its level already set. 462/// 463/// @param level The level to allocate the node at. If this is -1, a random level is chosen. 464/// @return a new node. 465 466template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 467void* ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::operator new( size_t size, S32 level ) 468{ 469 if( level == -1 ) 470 level = randomLevel(); 471 472 Node* node = 0; 473 while( 1 ) 474 { 475 // Try to take a node from the freelist. If there's none, 476 // allocate a new one. 477 478 if( !smFreeLists[ level ].mDestroyed ) 479 node = Node::safeRead( smFreeLists[ level ].mNodes ); 480 481 if( !node ) 482 { 483 node = ( Node* ) dMalloc( sizeof( Node ) + sizeof( Node* ) * level ); 484 dMemset( node, 0, sizeof( Node ) ); 485 node->mLevel = level; 486 node->addRef(); 487 break; 488 } 489 else if( dCompareAndSwap( smFreeLists[ level ].mNodes, node, node->mNext[ 0 ] ) ) 490 { 491 node->clearLowestBit(); 492 break; 493 } 494 else 495 node->release(); // Other thread was quicker than us; release. 496 } 497 498 AssertFatal( node->getRefCount() != 0, "ThreadSafePriorityQueue::new Node() - invalid refcount" ); 499 AssertFatal( ( node->getRefCount() % 2 ) == 0, "ThreadSafePriorityQueue::new Node() - invalid refcount" ); 500 return node; 501} 502 503/// Reclaim a node. 504/// 505/// @param node The node to reclaim. Must refer to a Node instance. 506 507template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 508void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::operator delete( void* ptr ) 509{ 510 //TODO: limit number of nodes kept 511 512 Node* node = ( Node* ) ptr; 513 U32 level = node->getLevel(); 514 node->mLevel = level; // Reset the node's deletion bit. 515 516 while( !smFreeLists[ level ].mDestroyed ) 517 { 518 // Put the node on the freelist. 519 520 Node* freeList = smFreeLists[ level ].mNodes; 521 node->mNext[ 0 ] = freeList; 522 523 if( dCompareAndSwap( smFreeLists[ level ].mNodes, freeList, node ) ) 524 { 525 node = NULL; 526 break; 527 } 528 } 529 530 if( node ) 531 dFree( node ); 532} 533 534template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 535ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::FreeList::~FreeList() 536{ 537 mDestroyed = true; 538 while( mNodes ) 539 { 540 //FIXME: could leak some bytes under unfortunate circumstances (this in 541 // combination with mDestroyed is a dependent write) 542 543 Node* next = mNodes; 544 if( dCompareAndSwap( mNodes, next, next->mNext[ 0 ] ) ) 545 dFree( next ); 546 } 547} 548 549//-------------------------------------------------------------------------- 550// ThreadSafePriorityQueueWithUpdate. 551//-------------------------------------------------------------------------- 552 553/// Fast, lock-free priority queue implementation for concurrent access that 554/// performs dynamic re-prioritization of items. 555/// 556/// Within the bounds of a set update interval UPDATE_INTERVAL, the takeNext 557/// method is guaranteed to always return the item that has the highest priority 558/// at the time the method is called rather than at the time items were inserted 559/// into the queue. 560/// 561/// Values placed on the queue must implement the following interface: 562/// 563/// @code 564/// template< typename K > 565/// struct IThreadSafePriorityQueueItem 566/// { 567/// // Default constructor. 568/// IThreadSafePriorityQueueItem(); 569/// 570/// // Return the current priority. 571/// // This must run normally even if the item is already dead. 572/// K getPriority(); 573/// 574/// // Return true if the item is still meant to be waiting in the queue. 575/// bool isAlive(); 576/// }; 577/// @endcode 578 579template< typename T, typename K, bool SORT_MIN_TO_MAX = false, U32 MAX_LEVEL = 4, U32 PROBABILISTIC_BIAS = 50 > 580struct ThreadSafePriorityQueueWithUpdate : public ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS > 581{ 582 583 typedef T ValueType; 584 typedef K KeyType; 585 586 enum { DEFAULT_UPDATE_INTERVAL = 256 }; 587 588 ThreadSafePriorityQueueWithUpdate( U32 updateInterval = DEFAULT_UPDATE_INTERVAL ); 589 590 void insert( KeyType priority, const T& value ); 591 bool takeNext( T& outValue, KeyType upToPriority = ( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN ) ); 592 593 U32 getUpdateInterval() const; 594 void setUpdateInterval( U32 value ); 595 596 KeyType getTimeBasedPriorityBoost() const; 597 void setTimeBasedPriorityBoost( KeyType value ); 598 599 void updatePriorities(); 600 601protected: 602 typedef ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS> Parent; 603 typedef U32 TickType; 604 typedef typename Parent::NodePtr NodePtr; 605 606 U32 mUpdateInterval; 607 KeyType mPriorityBoost; ///< If this is non-zero, priorities will be boosted by this amount each update. This can be used to prevent constant high-priority inserts to starve low-priority items already in the queue. 608 609 /// Work queue for node updates. 610 ThreadSafePriorityQueue< NodePtr, TickType, true, MAX_LEVEL, PROBABILISTIC_BIAS> mUpdateQueue; 611 612 TickType getTick() { return Platform::getRealMilliseconds(); } 613}; 614 615template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 616ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::ThreadSafePriorityQueueWithUpdate( U32 updateInterval ) 617 : mUpdateInterval( updateInterval ), 618 mPriorityBoost( TypeTraits< KeyType >::ZERO ) 619{ 620} 621 622/// Return the current update interval in milliseconds. 623 624template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 625U32 ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::getUpdateInterval() const 626{ 627 return mUpdateInterval; 628} 629 630/// Set update interval of queue to given value. 631/// 632/// <em>Call this method on the main thread only.</em> 633/// 634/// @param value Time between priority updates in milliseconds. 635 636template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 637void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::setUpdateInterval( U32 value ) 638{ 639 mUpdateInterval = value; 640} 641 642/// Return the delta to apply to priorities on each update. 643/// Set to zero to deactivate time-based priority adjustments. 644 645template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 646K ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::getTimeBasedPriorityBoost() const 647{ 648 return mPriorityBoost; 649} 650 651/// Set the delta for time-based priority adjustments to the given value. 652/// 653/// <em>Call this method on the main thread only.</em> 654/// 655/// @param value The new priority adjustment value. 656 657template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 658void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::setTimeBasedPriorityBoost( KeyType value ) 659{ 660 mPriorityBoost = value; 661} 662 663template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 664void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value ) 665{ 666 NodePtr node; 667 Parent::insert( priority, value, node ); 668 mUpdateQueue.insert( getTick() + getUpdateInterval(), node ); 669} 670 671template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 672bool ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::takeNext( T& outValue, KeyType upToPriority ) 673{ 674 updatePriorities(); 675 676 bool result = false; 677 do 678 { 679 result = Parent::takeNext( outValue, upToPriority ); 680 } 681 while( result && !outValue.isAlive() ); 682 683 return result; 684} 685 686/// 687 688template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > 689void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::updatePriorities() 690{ 691 TickType currentTime = getTick(); 692 U32 numNodesUpdated = 0; 693 U32 numNodesDead = 0; 694 U32 numNodesChanged = 0; 695 696 NodePtr node; 697 while( mUpdateQueue.takeNext( node, currentTime ) ) 698 { 699 numNodesUpdated ++; 700 701 // Since we're updating nodes on the update queue only periodically, 702 // their associated values or main queue nodes may have died in the 703 // meantime. If so, we just discard them here. 704 705 if( node->getValue().isAlive() 706 && !node->isMarkedForDeletion() ) 707 { 708 KeyType newPriority = node->getValue().getPriority() + getTimeBasedPriorityBoost(); 709 if( newPriority != node->getPriority() ) 710 { 711 // Node is outdated. Reinsert with new priority and mark the 712 // old node for deletion. 713 714 insert( newPriority, node->getValue() ); 715 node->tryMarkForDeletion(); 716 numNodesChanged ++; 717 } 718 else 719 { 720 // Node is still current. Just move to end. 721 722 mUpdateQueue.insert( currentTime + getUpdateInterval(), node ); 723 } 724 } 725 else 726 numNodesDead ++; 727 } 728 729 #ifdef DEBUG_SPEW 730 if( numNodesUpdated ) 731 Platform::outputDebugString( "[ThreadSafePriorityQueueWithUpdate] updated %i nodes (%i changed, %i dead)", 732 numNodesUpdated, numNodesChanged, numNodesDead ); 733 #endif 734} 735 736// Re-enable TMM if necessary. 737#include "platform/tmm_on.h" 738 739#undef DEBUG_SPEW 740 741#endif // !_THREADSAFEPRIORITYQUEUE_H_ 742