threadPool.cpp

Engine/source/platform/threads/threadPool.cpp

More...

Classes:

class

Value wrapper for work items while placed on priority queue.

Detailed Description

  1
  2//-----------------------------------------------------------------------------
  3// Copyright (c) 2012 GarageGames, LLC
  4//
  5// Permission is hereby granted, free of charge, to any person obtaining a copy
  6// of this software and associated documentation files (the "Software"), to
  7// deal in the Software without restriction, including without limitation the
  8// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  9// sell copies of the Software, and to permit persons to whom the Software is
 10// furnished to do so, subject to the following conditions:
 11//
 12// The above copyright notice and this permission notice shall be included in
 13// all copies or substantial portions of the Software.
 14//
 15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 20// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 21// IN THE SOFTWARE.
 22//-----------------------------------------------------------------------------
 23
 24#include "platform/threads/threadPool.h"
 25#include "platform/threads/thread.h"
 26#include "platform/platformCPUCount.h"
 27#include "core/strings/stringFunctions.h"
 28#include "core/util/tSingleton.h"
 29
 30
 31//#define DEBUG_SPEW
 32
 33
 34//=============================================================================
 35//    ThreadPool::Context.
 36//=============================================================================
 37
 38ThreadPool::Context ThreadPool::Context::smRootContext( "ROOT", NULL, 1.0 );
 39
 40//--------------------------------------------------------------------------
 41
 42ThreadPool::Context::Context( const char* name, ThreadPool::Context* parent, F32 priorityBias )
 43   : mParent( parent ),
 44     mName( name ),
 45     mChildren( 0 ),
 46     mSibling( 0 ),
 47     mPriorityBias( priorityBias ),
 48     mAccumulatedPriorityBias( 0.0 )
 49{
 50   if( parent )
 51   {
 52      mSibling = mParent->mChildren;
 53      mParent->mChildren = this;
 54   }
 55}
 56
 57//--------------------------------------------------------------------------
 58
 59ThreadPool::Context::~Context()
 60{
 61   if( mParent )
 62      for( Context* context = mParent->mChildren, *prev = 0; context != 0; prev = context, context = context->mSibling )
 63         if( context == this )
 64         {
 65            if( !prev )
 66               mParent->mChildren = this->mSibling;
 67            else
 68               prev->mSibling = this->mSibling;
 69         }
 70}
 71
 72//--------------------------------------------------------------------------
 73
 74ThreadPool::Context* ThreadPool::Context::getChild( const char* name )
 75{
 76   for( Context* child = getChildren(); child != 0; child = child->getSibling() )
 77      if( dStricmp( child->getName(), name ) == 0 )
 78         return child;
 79   return 0;
 80}
 81
 82//--------------------------------------------------------------------------
 83
 84F32 ThreadPool::Context::getAccumulatedPriorityBias()
 85{
 86   if( !mAccumulatedPriorityBias )
 87      updateAccumulatedPriorityBiases();
 88   return mAccumulatedPriorityBias;
 89}
 90
 91//--------------------------------------------------------------------------
 92
 93void ThreadPool::Context::setPriorityBias( F32 value )
 94{
 95   mPriorityBias = value;
 96   mAccumulatedPriorityBias = 0.0;
 97}
 98
 99//--------------------------------------------------------------------------
100
101void ThreadPool::Context::updateAccumulatedPriorityBiases()
102{
103   // Update our own priority bias.
104
105   mAccumulatedPriorityBias = mPriorityBias;
106   for( Context* context = getParent(); context != 0; context = context->getParent() )
107      mAccumulatedPriorityBias *= context->getPriorityBias();
108   
109   // Update our children.
110
111   for( Context* child = getChildren(); child != 0; child = child->getSibling() )
112      child->updateAccumulatedPriorityBiases();
113}
114
115//=============================================================================
116//    ThreadPool::WorkItem.
117//=============================================================================
118
119//--------------------------------------------------------------------------
120
121void ThreadPool::WorkItem::process()
122{
123   execute();
124   mExecuted = true;
125}
126
127//--------------------------------------------------------------------------
128
129bool ThreadPool::WorkItem::isCancellationRequested()
130{
131   return false;
132}
133
134//--------------------------------------------------------------------------
135
136bool ThreadPool::WorkItem::cancellationPoint()
137{
138   if( isCancellationRequested() )
139   {
140      onCancelled();
141      return true;
142   }
143   else
144      return false;
145}
146
147//--------------------------------------------------------------------------
148
149F32 ThreadPool::WorkItem::getPriority()
150{
151   return 1.0;
152}
153
154//=============================================================================
155//    ThreadPool::WorkItemWrapper.
156//=============================================================================
157
158/// Value wrapper for work items while placed on priority queue.
159/// Conforms to interface dictated by ThreadSafePriorityQueueWithUpdate.
160///
161/// @see ThreadSafePriorityQueueWithUpdate
162/// @see ThreadPool::WorkItem
163///
164struct ThreadPool::WorkItemWrapper : public ThreadSafeRef< WorkItem >
165{
166   typedef ThreadSafeRef< WorkItem> Parent;
167
168   WorkItemWrapper() {}
169   WorkItemWrapper( WorkItem* item )
170      : Parent( item ) {}
171
172   bool           isAlive();
173   F32            getPriority();
174};
175
176inline bool ThreadPool::WorkItemWrapper::isAlive()
177{
178   WorkItem* item = ptr();
179   if( !item )
180      return false;
181   else if( item->isCancellationRequested() )
182   {
183      ( *this ) = 0;
184      return false;
185   }
186   else
187      return true;
188}
189
190inline F32 ThreadPool::WorkItemWrapper::getPriority()
191{
192   WorkItem* item = ptr();
193   AssertFatal( item != 0, "ThreadPool::WorkItemWrapper::getPriority - called on dead item" );
194
195   // Compute a scaled priority value based on the item's context.
196   return ( item->getContext()->getAccumulatedPriorityBias() * item->getPriority() );
197}
198
199//=============================================================================
200//    ThreadPool::WorkerThread.
201//=============================================================================
202
203///
204///
205struct ThreadPool::WorkerThread : public Thread
206{
207   WorkerThread( ThreadPool* pool, U32 index );
208
209   WorkerThread*     getNext();
210   virtual void      run( void* arg = 0 );
211
212private:
213   U32               mIndex;
214   ThreadPool*       mPool;
215   WorkerThread*     mNext;
216};
217
218ThreadPool::WorkerThread::WorkerThread( ThreadPool* pool, U32 index )
219   : mIndex( index ),
220     mPool( pool )
221{
222   // Link us to the pool's thread list.
223
224   mNext = pool->mThreads;
225   pool->mThreads = this;
226}
227
228inline ThreadPool::WorkerThread* ThreadPool::WorkerThread::getNext()
229{
230   return mNext;
231}
232
233void ThreadPool::WorkerThread::run( void* arg )
234{
235   #ifdef TORQUE_DEBUG
236   {
237      // Set the thread's name for debugging.
238      char buffer[ 2048 ];
239      dSprintf( buffer, sizeof( buffer ), "ThreadPool(%s) WorkerThread %i", mPool->mName.c_str(), mIndex );
240      _setName( buffer );
241   }
242   #endif
243
244   while( 1 )
245   {
246      if( checkForStop() )
247      {
248#ifdef DEBUG_SPEW
249         Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' exits", getId() );
250#endif
251         dFetchAndAdd( mPool->mNumThreads, ( U32 ) -1 );
252         return;
253      }
254
255      // Mark us as potentially blocking.
256      dFetchAndAdd( mPool->mNumThreadsReady, ( U32 ) -1 );
257
258      bool waitForSignal = false;
259      {
260         // Try to take an item from the queue.  Do
261         // this in a separate block, so we'll be
262         // releasing the item after we have finished.
263
264         WorkItemWrapper workItem;
265         if( mPool->mWorkItemQueue.takeNext( workItem ) )
266         {
267            // Mark us as non-blocking as this loop definitely
268            // won't wait on the semaphore.
269            dFetchAndAdd( mPool->mNumThreadsReady, 1 );
270
271#ifdef DEBUG_SPEW
272            Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' takes item '0x%x'", getId(), *workItem );
273#endif
274            workItem->process();
275
276            dFetchAndAdd( mPool->mNumPendingItems, ( U32 ) -1 );
277         }
278         else
279            waitForSignal = true;
280      }
281
282      if( waitForSignal )
283      {
284         dFetchAndAdd( mPool->mNumThreadsAwake, ( U32 ) -1 );
285
286#ifdef DEBUG_SPEW
287         Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' going to sleep", getId() );
288#endif
289         mPool->mSemaphore.acquire();
290#ifdef DEBUG_SPEW
291         Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' waking up", getId() );
292#endif
293
294         dFetchAndAdd( mPool->mNumThreadsAwake, 1 );
295         dFetchAndAdd( mPool->mNumThreadsReady, 1 );
296      }
297   }
298}
299
300//=============================================================================
301//    ThreadPool.
302//=============================================================================
303
304bool                          ThreadPool::smForceAllMainThread;
305U32                           ThreadPool::smMainThreadTimeMS;
306ThreadPool::QueueType         ThreadPool::smMainThreadQueue;
307
308//--------------------------------------------------------------------------
309
310ThreadPool::ThreadPool( const char* name, U32 numThreads )
311   : mName( name ),
312     mNumThreads( numThreads ),
313     mNumThreadsAwake( 0 ),
314     mNumPendingItems( 0 ),
315     mSemaphore( 0 ),
316     mThreads( 0 )
317{
318   // Number of worker threads to create.
319
320   if( !mNumThreads )
321   {
322      // Use platformCPUInfo directly as in the case of the global pool,
323      // Platform::SystemInfo will not yet have been initialized.
324      
325      U32 numLogical = 0;
326      U32 numPhysical = 0;
327      U32 numCores = 0;
328
329      CPUInfo::CPUCount( numLogical, numCores, numPhysical );
330      
331      const U32 baseCount = getMax( numLogical, numCores );
332      mNumThreads = (baseCount > 0) ? baseCount : 2;
333   }
334   
335   #ifdef DEBUG_SPEW
336   Platform::outputDebugString( "[ThreadPool] spawning %i threads", mNumThreads );
337   #endif
338
339   // Create the threads.
340
341   mNumThreadsAwake = mNumThreads;
342   mNumThreadsReady = mNumThreads;
343   for( U32 i = 0; i < mNumThreads; i ++ )
344   {
345      WorkerThread* thread = new WorkerThread( this, i );
346      thread->start();
347   }
348}
349
350//--------------------------------------------------------------------------
351
352ThreadPool::~ThreadPool()
353{
354   shutdown();
355}
356
357//--------------------------------------------------------------------------
358
359void ThreadPool::shutdown()
360{
361   const U32 numThreads = mNumThreads;
362   
363   // Tell our worker threads to stop.
364
365   for( WorkerThread* thread = mThreads; thread != 0; thread = thread->getNext() )
366      thread->stop();
367
368   // Release the semaphore as many times as there are threads.
369   // Doing this separately guarantees we're not waking a thread
370   // that hasn't been set its stop flag yet.
371
372   for( U32 n = 0; n < numThreads; ++ n )
373      mSemaphore.release();
374
375   // Delete each worker thread.  Wait until death as we're prone to
376   // running into issues with decomposing work item lists otherwise.
377
378   for( WorkerThread* thread = mThreads; thread != 0; )
379   {
380      WorkerThread* next = thread->getNext();
381      thread->join();
382      delete thread;
383      thread = next;
384   }
385
386   mThreads = NULL;
387   mNumThreads = 0;
388}
389
390//--------------------------------------------------------------------------
391
392void ThreadPool::queueWorkItem( WorkItem* item )
393{
394   bool executeRightAway = ( getForceAllMainThread() );
395#ifdef DEBUG_SPEW
396   Platform::outputDebugString( "[ThreadPool] %s work item '0x%x'",
397                                ( executeRightAway ? "executing" : "queuing" ),
398                                item );
399#endif
400
401   if( executeRightAway )
402      item->process();
403   else
404   {
405      // Put the item in the queue.
406      dFetchAndAdd( mNumPendingItems, 1 );
407      mWorkItemQueue.insert( item->getPriority(), item );
408
409      mSemaphore.release();
410   }
411}
412
413//--------------------------------------------------------------------------
414
415void ThreadPool::flushWorkItems( S32 timeOut )
416{
417   AssertFatal( mNumThreads, "ThreadPool::flushWorkItems() - no worker threads in pool" );
418   
419   U32 endTime = 0;
420   if( timeOut != -1 )
421      endTime = Platform::getRealMilliseconds() + timeOut;
422
423   // Spinlock until the queue is empty.
424
425   while( !mWorkItemQueue.isEmpty() )
426   {
427      Platform::sleep( 25 );
428
429      // Stop if we have exceeded our processing time budget.
430
431      if( timeOut != -1
432          && Platform::getRealMilliseconds() >= endTime )
433          break;
434   }
435}
436
437void ThreadPool::waitForAllItems( S32 timeOut )
438{
439   U32 endTime = 0;
440   if( timeOut != -1 )
441      endTime = Platform::getRealMilliseconds() + timeOut;
442
443   // Spinlock until there are no items that have not been processed.
444
445   while( dAtomicRead( mNumPendingItems ) )
446   {
447      Platform::sleep( 25 );
448
449      // Stop if we have exceeded our processing time budget.
450
451      if( timeOut != -1
452          && Platform::getRealMilliseconds() >= endTime )
453          break;
454   }
455}
456
457//--------------------------------------------------------------------------
458
459void ThreadPool::queueWorkItemOnMainThread( WorkItem* item )
460{
461   smMainThreadQueue.insert( item->getPriority(), item );
462}
463
464//--------------------------------------------------------------------------
465
466void ThreadPool::processMainThreadWorkItems()
467{
468   AssertFatal( ThreadManager::isMainThread(),
469      "ThreadPool::processMainThreadWorkItems - this function must only be called on the main thread" );
470
471   U32 timeLimit = ( Platform::getRealMilliseconds() + getMainThreadThresholdTimeMS() );
472
473   do
474   {
475      WorkItemWrapper item;
476      if( !smMainThreadQueue.takeNext( item ) )
477         break;
478      else
479         item->process();
480   }
481   while( Platform::getRealMilliseconds() < timeLimit );
482}
483