threadPool.cpp
Engine/source/platform/threads/threadPool.cpp
Classes:
class
Value wrapper for work items while placed on priority queue.
Detailed Description
1 2//----------------------------------------------------------------------------- 3// Copyright (c) 2012 GarageGames, LLC 4// 5// Permission is hereby granted, free of charge, to any person obtaining a copy 6// of this software and associated documentation files (the "Software"), to 7// deal in the Software without restriction, including without limitation the 8// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 9// sell copies of the Software, and to permit persons to whom the Software is 10// furnished to do so, subject to the following conditions: 11// 12// The above copyright notice and this permission notice shall be included in 13// all copies or substantial portions of the Software. 14// 15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 20// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 21// IN THE SOFTWARE. 22//----------------------------------------------------------------------------- 23 24#include "platform/threads/threadPool.h" 25#include "platform/threads/thread.h" 26#include "platform/platformCPUCount.h" 27#include "core/strings/stringFunctions.h" 28#include "core/util/tSingleton.h" 29 30 31//#define DEBUG_SPEW 32 33 34//============================================================================= 35// ThreadPool::Context. 36//============================================================================= 37 38ThreadPool::Context ThreadPool::Context::smRootContext( "ROOT", NULL, 1.0 ); 39 40//-------------------------------------------------------------------------- 41 42ThreadPool::Context::Context( const char* name, ThreadPool::Context* parent, F32 priorityBias ) 43 : mParent( parent ), 44 mName( name ), 45 mChildren( 0 ), 46 mSibling( 0 ), 47 mPriorityBias( priorityBias ), 48 mAccumulatedPriorityBias( 0.0 ) 49{ 50 if( parent ) 51 { 52 mSibling = mParent->mChildren; 53 mParent->mChildren = this; 54 } 55} 56 57//-------------------------------------------------------------------------- 58 59ThreadPool::Context::~Context() 60{ 61 if( mParent ) 62 for( Context* context = mParent->mChildren, *prev = 0; context != 0; prev = context, context = context->mSibling ) 63 if( context == this ) 64 { 65 if( !prev ) 66 mParent->mChildren = this->mSibling; 67 else 68 prev->mSibling = this->mSibling; 69 } 70} 71 72//-------------------------------------------------------------------------- 73 74ThreadPool::Context* ThreadPool::Context::getChild( const char* name ) 75{ 76 for( Context* child = getChildren(); child != 0; child = child->getSibling() ) 77 if( dStricmp( child->getName(), name ) == 0 ) 78 return child; 79 return 0; 80} 81 82//-------------------------------------------------------------------------- 83 84F32 ThreadPool::Context::getAccumulatedPriorityBias() 85{ 86 if( !mAccumulatedPriorityBias ) 87 updateAccumulatedPriorityBiases(); 88 return mAccumulatedPriorityBias; 89} 90 91//-------------------------------------------------------------------------- 92 93void ThreadPool::Context::setPriorityBias( F32 value ) 94{ 95 mPriorityBias = value; 96 mAccumulatedPriorityBias = 0.0; 97} 98 99//-------------------------------------------------------------------------- 100 101void ThreadPool::Context::updateAccumulatedPriorityBiases() 102{ 103 // Update our own priority bias. 104 105 mAccumulatedPriorityBias = mPriorityBias; 106 for( Context* context = getParent(); context != 0; context = context->getParent() ) 107 mAccumulatedPriorityBias *= context->getPriorityBias(); 108 109 // Update our children. 110 111 for( Context* child = getChildren(); child != 0; child = child->getSibling() ) 112 child->updateAccumulatedPriorityBiases(); 113} 114 115//============================================================================= 116// ThreadPool::WorkItem. 117//============================================================================= 118 119//-------------------------------------------------------------------------- 120 121void ThreadPool::WorkItem::process() 122{ 123 execute(); 124 mExecuted = true; 125} 126 127//-------------------------------------------------------------------------- 128 129bool ThreadPool::WorkItem::isCancellationRequested() 130{ 131 return false; 132} 133 134//-------------------------------------------------------------------------- 135 136bool ThreadPool::WorkItem::cancellationPoint() 137{ 138 if( isCancellationRequested() ) 139 { 140 onCancelled(); 141 return true; 142 } 143 else 144 return false; 145} 146 147//-------------------------------------------------------------------------- 148 149F32 ThreadPool::WorkItem::getPriority() 150{ 151 return 1.0; 152} 153 154//============================================================================= 155// ThreadPool::WorkItemWrapper. 156//============================================================================= 157 158/// Value wrapper for work items while placed on priority queue. 159/// Conforms to interface dictated by ThreadSafePriorityQueueWithUpdate. 160/// 161/// @see ThreadSafePriorityQueueWithUpdate 162/// @see ThreadPool::WorkItem 163/// 164struct ThreadPool::WorkItemWrapper : public ThreadSafeRef< WorkItem > 165{ 166 typedef ThreadSafeRef< WorkItem> Parent; 167 168 WorkItemWrapper() {} 169 WorkItemWrapper( WorkItem* item ) 170 : Parent( item ) {} 171 172 bool isAlive(); 173 F32 getPriority(); 174}; 175 176inline bool ThreadPool::WorkItemWrapper::isAlive() 177{ 178 WorkItem* item = ptr(); 179 if( !item ) 180 return false; 181 else if( item->isCancellationRequested() ) 182 { 183 ( *this ) = 0; 184 return false; 185 } 186 else 187 return true; 188} 189 190inline F32 ThreadPool::WorkItemWrapper::getPriority() 191{ 192 WorkItem* item = ptr(); 193 AssertFatal( item != 0, "ThreadPool::WorkItemWrapper::getPriority - called on dead item" ); 194 195 // Compute a scaled priority value based on the item's context. 196 return ( item->getContext()->getAccumulatedPriorityBias() * item->getPriority() ); 197} 198 199//============================================================================= 200// ThreadPool::WorkerThread. 201//============================================================================= 202 203/// 204/// 205struct ThreadPool::WorkerThread : public Thread 206{ 207 WorkerThread( ThreadPool* pool, U32 index ); 208 209 WorkerThread* getNext(); 210 virtual void run( void* arg = 0 ); 211 212private: 213 U32 mIndex; 214 ThreadPool* mPool; 215 WorkerThread* mNext; 216}; 217 218ThreadPool::WorkerThread::WorkerThread( ThreadPool* pool, U32 index ) 219 : mIndex( index ), 220 mPool( pool ) 221{ 222 // Link us to the pool's thread list. 223 224 mNext = pool->mThreads; 225 pool->mThreads = this; 226} 227 228inline ThreadPool::WorkerThread* ThreadPool::WorkerThread::getNext() 229{ 230 return mNext; 231} 232 233void ThreadPool::WorkerThread::run( void* arg ) 234{ 235 #ifdef TORQUE_DEBUG 236 { 237 // Set the thread's name for debugging. 238 char buffer[ 2048 ]; 239 dSprintf( buffer, sizeof( buffer ), "ThreadPool(%s) WorkerThread %i", mPool->mName.c_str(), mIndex ); 240 _setName( buffer ); 241 } 242 #endif 243 244 while( 1 ) 245 { 246 if( checkForStop() ) 247 { 248#ifdef DEBUG_SPEW 249 Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' exits", getId() ); 250#endif 251 dFetchAndAdd( mPool->mNumThreads, ( U32 ) -1 ); 252 return; 253 } 254 255 // Mark us as potentially blocking. 256 dFetchAndAdd( mPool->mNumThreadsReady, ( U32 ) -1 ); 257 258 bool waitForSignal = false; 259 { 260 // Try to take an item from the queue. Do 261 // this in a separate block, so we'll be 262 // releasing the item after we have finished. 263 264 WorkItemWrapper workItem; 265 if( mPool->mWorkItemQueue.takeNext( workItem ) ) 266 { 267 // Mark us as non-blocking as this loop definitely 268 // won't wait on the semaphore. 269 dFetchAndAdd( mPool->mNumThreadsReady, 1 ); 270 271#ifdef DEBUG_SPEW 272 Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' takes item '0x%x'", getId(), *workItem ); 273#endif 274 workItem->process(); 275 276 dFetchAndAdd( mPool->mNumPendingItems, ( U32 ) -1 ); 277 } 278 else 279 waitForSignal = true; 280 } 281 282 if( waitForSignal ) 283 { 284 dFetchAndAdd( mPool->mNumThreadsAwake, ( U32 ) -1 ); 285 286#ifdef DEBUG_SPEW 287 Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' going to sleep", getId() ); 288#endif 289 mPool->mSemaphore.acquire(); 290#ifdef DEBUG_SPEW 291 Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' waking up", getId() ); 292#endif 293 294 dFetchAndAdd( mPool->mNumThreadsAwake, 1 ); 295 dFetchAndAdd( mPool->mNumThreadsReady, 1 ); 296 } 297 } 298} 299 300//============================================================================= 301// ThreadPool. 302//============================================================================= 303 304bool ThreadPool::smForceAllMainThread; 305U32 ThreadPool::smMainThreadTimeMS; 306ThreadPool::QueueType ThreadPool::smMainThreadQueue; 307 308//-------------------------------------------------------------------------- 309 310ThreadPool::ThreadPool( const char* name, U32 numThreads ) 311 : mName( name ), 312 mNumThreads( numThreads ), 313 mNumThreadsAwake( 0 ), 314 mNumPendingItems( 0 ), 315 mSemaphore( 0 ), 316 mThreads( 0 ) 317{ 318 // Number of worker threads to create. 319 320 if( !mNumThreads ) 321 { 322 // Use platformCPUInfo directly as in the case of the global pool, 323 // Platform::SystemInfo will not yet have been initialized. 324 325 U32 numLogical = 0; 326 U32 numPhysical = 0; 327 U32 numCores = 0; 328 329 CPUInfo::CPUCount( numLogical, numCores, numPhysical ); 330 331 const U32 baseCount = getMax( numLogical, numCores ); 332 mNumThreads = (baseCount > 0) ? baseCount : 2; 333 } 334 335 #ifdef DEBUG_SPEW 336 Platform::outputDebugString( "[ThreadPool] spawning %i threads", mNumThreads ); 337 #endif 338 339 // Create the threads. 340 341 mNumThreadsAwake = mNumThreads; 342 mNumThreadsReady = mNumThreads; 343 for( U32 i = 0; i < mNumThreads; i ++ ) 344 { 345 WorkerThread* thread = new WorkerThread( this, i ); 346 thread->start(); 347 } 348} 349 350//-------------------------------------------------------------------------- 351 352ThreadPool::~ThreadPool() 353{ 354 shutdown(); 355} 356 357//-------------------------------------------------------------------------- 358 359void ThreadPool::shutdown() 360{ 361 const U32 numThreads = mNumThreads; 362 363 // Tell our worker threads to stop. 364 365 for( WorkerThread* thread = mThreads; thread != 0; thread = thread->getNext() ) 366 thread->stop(); 367 368 // Release the semaphore as many times as there are threads. 369 // Doing this separately guarantees we're not waking a thread 370 // that hasn't been set its stop flag yet. 371 372 for( U32 n = 0; n < numThreads; ++ n ) 373 mSemaphore.release(); 374 375 // Delete each worker thread. Wait until death as we're prone to 376 // running into issues with decomposing work item lists otherwise. 377 378 for( WorkerThread* thread = mThreads; thread != 0; ) 379 { 380 WorkerThread* next = thread->getNext(); 381 thread->join(); 382 delete thread; 383 thread = next; 384 } 385 386 mThreads = NULL; 387 mNumThreads = 0; 388} 389 390//-------------------------------------------------------------------------- 391 392void ThreadPool::queueWorkItem( WorkItem* item ) 393{ 394 bool executeRightAway = ( getForceAllMainThread() ); 395#ifdef DEBUG_SPEW 396 Platform::outputDebugString( "[ThreadPool] %s work item '0x%x'", 397 ( executeRightAway ? "executing" : "queuing" ), 398 item ); 399#endif 400 401 if( executeRightAway ) 402 item->process(); 403 else 404 { 405 // Put the item in the queue. 406 dFetchAndAdd( mNumPendingItems, 1 ); 407 mWorkItemQueue.insert( item->getPriority(), item ); 408 409 mSemaphore.release(); 410 } 411} 412 413//-------------------------------------------------------------------------- 414 415void ThreadPool::flushWorkItems( S32 timeOut ) 416{ 417 AssertFatal( mNumThreads, "ThreadPool::flushWorkItems() - no worker threads in pool" ); 418 419 U32 endTime = 0; 420 if( timeOut != -1 ) 421 endTime = Platform::getRealMilliseconds() + timeOut; 422 423 // Spinlock until the queue is empty. 424 425 while( !mWorkItemQueue.isEmpty() ) 426 { 427 Platform::sleep( 25 ); 428 429 // Stop if we have exceeded our processing time budget. 430 431 if( timeOut != -1 432 && Platform::getRealMilliseconds() >= endTime ) 433 break; 434 } 435} 436 437void ThreadPool::waitForAllItems( S32 timeOut ) 438{ 439 U32 endTime = 0; 440 if( timeOut != -1 ) 441 endTime = Platform::getRealMilliseconds() + timeOut; 442 443 // Spinlock until there are no items that have not been processed. 444 445 while( dAtomicRead( mNumPendingItems ) ) 446 { 447 Platform::sleep( 25 ); 448 449 // Stop if we have exceeded our processing time budget. 450 451 if( timeOut != -1 452 && Platform::getRealMilliseconds() >= endTime ) 453 break; 454 } 455} 456 457//-------------------------------------------------------------------------- 458 459void ThreadPool::queueWorkItemOnMainThread( WorkItem* item ) 460{ 461 smMainThreadQueue.insert( item->getPriority(), item ); 462} 463 464//-------------------------------------------------------------------------- 465 466void ThreadPool::processMainThreadWorkItems() 467{ 468 AssertFatal( ThreadManager::isMainThread(), 469 "ThreadPool::processMainThreadWorkItems - this function must only be called on the main thread" ); 470 471 U32 timeLimit = ( Platform::getRealMilliseconds() + getMainThreadThresholdTimeMS() ); 472 473 do 474 { 475 WorkItemWrapper item; 476 if( !smMainThreadQueue.takeNext( item ) ) 477 break; 478 else 479 item->process(); 480 } 481 while( Platform::getRealMilliseconds() < timeLimit ); 482} 483