threadPool.h

Engine/source/platform/threads/threadPool.h

Interface for an asynchronous work manager.

More...

Classes:

class

Asynchronous work manager.

class

A ThreadPool context defines a logical context in which WorkItems are being executed.

class

An action to execute on a worker thread from the pool.

Public Typedefs

ThreadContext 
ThreadWorkItem 

Detailed Description

Interface for an asynchronous work manager.

Public Typedefs

typedef ThreadPool::Context ThreadContext 
typedef ThreadPool::WorkItem ThreadWorkItem 
  1
  2//-----------------------------------------------------------------------------
  3// Copyright (c) 2012 GarageGames, LLC
  4//
  5// Permission is hereby granted, free of charge, to any person obtaining a copy
  6// of this software and associated documentation files (the "Software"), to
  7// deal in the Software without restriction, including without limitation the
  8// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  9// sell copies of the Software, and to permit persons to whom the Software is
 10// furnished to do so, subject to the following conditions:
 11//
 12// The above copyright notice and this permission notice shall be included in
 13// all copies or substantial portions of the Software.
 14//
 15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 20// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 21// IN THE SOFTWARE.
 22//-----------------------------------------------------------------------------
 23
 24#ifndef _THREADPOOL_H_
 25#define _THREADPOOL_H_
 26
 27#ifndef _THREADSAFEREFCOUNT_H_
 28   #include "platform/threads/threadSafeRefCount.h"
 29#endif
 30#ifndef _THREADSAFEPRIORITYQUEUE_H_
 31   #include "platform/threads/threadSafePriorityQueue.h"
 32#endif
 33#ifndef _PLATFORM_THREAD_SEMAPHORE_H_
 34   #include "platform/threads/semaphore.h"
 35#endif
 36#ifndef _TSINGLETON_H_
 37   #include "core/util/tSingleton.h"
 38#endif
 39
 40
 41/// @file
 42/// Interface for an asynchronous work manager.
 43
 44
 45/// Asynchronous work manager.
 46///
 47/// Thread pooling allows to submit work items for background execution.
 48/// Each work item will be placed on a queue and, based on a total priority
 49/// ordering, executed when it has the highest priority and a worker thread
 50/// becomes available.
 51///
 52/// @note The global pool maintains the invariant that only the main thread
 53///   may submit items in order to be able to flush the item queue reliably
 54///   from the main thread itself.  If other threads were issuing items to
 55///   the queue, the queue may never empty out and the main thread will
 56///   deadlock.
 57///
 58///   Flushing is the simplest method to guarantee that no asynchronous
 59///   operation is pending in a specific case (deletion of the target object
 60///   being the most common case).  However, when possible, avoid this
 61///   situation and design your work items to operate independently,
 62///   e.g. by having only a single point of access to data that may have
 63///   disappeared in the meantime and putting a check around that single
 64///   access so that the item will silently die when its target object has
 65///   disappeared.
 66///
 67///   The cleanest safe solution to this is to create a separate concurrently
 68///   reference-counted structure that holds all interfacing state and
 69///   functionality shared between a work item and its issueing code.  This way
 70///   the host object can safely disappear with the interfacing structure
 71///   automatically being released once the last concurrent work item has been
 72///   processed or discarded.
 73///
 74class ThreadPool
 75{
 76   public:
 77   
 78      /// A ThreadPool context defines a logical context in which WorkItems are
 79      /// being executed.  Their primary use is for biasing priorities of
 80      /// WorkItems.
 81      ///
 82      /// Contexts are arranged in a tree hierarchy.  Each parent node's priority
 83      /// bias scales all the priority biases underneath it.
 84      ///
 85      /// Note that instances of this class are meant to be instantiated
 86      /// globally only.
 87      ///
 88      class Context
 89      {
 90         protected:
 91         
 92            /// Superordinate context; scales this context's priority bias.
 93            Context* mParent;
 94            
 95            /// First child.
 96            Context* mChildren;
 97            
 98            /// Next sibling in child chain.
 99            Context* mSibling;
100            
101            /// Name of this context.  Should be unique in parent namespace.
102            const char* mName;
103            
104            /// Priority scale factor of this context.
105            F32 mPriorityBias;
106            
107            /// Accumulated scale factor.
108            F32 mAccumulatedPriorityBias;
109
110            /// The root context; does not modify priorities.  All contexts should be direct or indirect children of this one.
111            static Context smRootContext;
112
113            /// Recursively update cached accumulated priority biases.
114            void updateAccumulatedPriorityBiases();
115
116         public:
117         
118            Context( const char* name, Context* parent, F32 priorityBias );
119            ~Context();
120
121            /// Return the name of the worker threading context.
122            const char* getName() const
123            {
124               return mName;
125            }
126
127            /// Return the context's own work item priority bias.
128            F32 getPriorityBias() const
129            {
130               return mPriorityBias;
131            }
132
133            /// Return the superordinate node to the current context.
134            Context* getParent() const
135            {
136               return mParent;
137            }
138
139            /// Return the next sibling to the current context.
140            Context* getSibling() const
141            {
142               return mSibling;
143            }
144
145            /// Return the first child context.
146            Context* getChildren() const
147            {
148               return mChildren;
149            }
150
151            /// Return the root context.
152            static Context* ROOT_CONTEXT()
153            {
154               return &smRootContext;
155            }
156            
157            ///
158            F32 getAccumulatedPriorityBias();
159            
160            ///
161            Context* getChild( const char* name );
162
163            ///
164            void setPriorityBias( F32 value );
165      };
166
167      /// An action to execute on a worker thread from the pool.
168      ///
169      /// Work items are concurrently reference-counted and will be
170      /// automatically released once the last reference disappears.
171      ///
172      class WorkItem : public ThreadSafeRefCount< WorkItem >
173      {
174         public:
175
176            typedef ThreadSafeRefCount< WorkItem> Parent;
177
178         protected:
179         
180            /// The work context of this item.
181            Context* mContext;
182
183            /// Mark a point in a work item's execution where the item can
184            /// be safely cancelled.
185            ///
186            /// This method should be called by subclasses' execute() methods
187            /// whenever an item can be safely cancelled.  When it returns true,
188            /// the work item should exit from its execute() method.
189            bool cancellationPoint();
190            
191            /// Called when the item has been cancelled.
192            virtual void onCancelled() {}
193
194            /// Execute the actions associated with this work item.
195            /// This is the primary function to implement by subclasses.
196            virtual void execute() = 0;
197
198            /// This flag is set after the execute() method has completed.
199            bool mExecuted;
200
201         public:
202         
203            /// Construct a new work item.
204            ///
205            /// @param context The work context in which the item should be placed.
206            ///    If NULL, the root context will be used.
207            WorkItem( Context* context = 0 )
208               : mContext( context ? context : Context::ROOT_CONTEXT() ),
209                 mExecuted( false )
210            {
211            }
212            
213            virtual ~WorkItem() {}
214
215            /// Return the work context associated with the work item.
216            inline Context* getContext() const
217            {
218               return mContext;
219            }
220
221            /// Process the work item.
222            void process();
223            
224            /// Return true if the work item should be cancelled.
225            ///
226            /// This method can be overridden by subclasses.  It's value will be
227            /// checked each time cancellationPoint() is called.  When it returns
228            /// true, the item's process() method will exit automatically.
229            ///
230            /// @return true, if item should be cancelled; default is false.
231            /// @see ThreadPool::WorkItem::cancellationPoint
232            virtual bool isCancellationRequested();
233            
234            /// Return the item's base priority value.
235            /// @return item priority; defaults to 1.0.
236            virtual F32 getPriority();
237
238            /// Has this work item been executed already?
239            bool hasExecuted() const
240            {
241               return mExecuted;
242            }
243      };
244
245      typedef ThreadSafeRef< WorkItem> WorkItemPtr;
246      struct GlobalThreadPool;
247      
248   protected:
249   
250      struct WorkItemWrapper;
251      struct WorkerThread;
252
253      friend struct WorkerThread; // mSemaphore, mNumThreadsAwake, mThreads
254
255      typedef ThreadSafePriorityQueueWithUpdate< WorkItemWrapper, F32> QueueType;
256
257      /// Name of this pool.  Mainly for debugging.  Used to name worker threads.
258      String mName;
259      
260      /// Number of worker threads spawned by the pool.
261      U32 mNumThreads;
262      
263      /// Number of worker threads in non-sleeping state.
264      U32 mNumThreadsAwake;
265      
266      /// Number of worker threads guaranteed to be non-blocking.
267      U32 mNumThreadsReady;
268
269      /// Number of work items that have not yet completed execution.
270      U32 mNumPendingItems;
271      
272      /// Semaphore used to wake up threads, if necessary.
273      Semaphore mSemaphore;
274      
275      /// Threaded priority queue for concurrent access by worker threads.
276      QueueType mWorkItemQueue;
277      
278      /// List of worker threads.
279      WorkerThread* mThreads;
280
281      /// Force all work items to execute on main thread;
282      /// turns this into a single-threaded system.
283      /// Primarily useful to find whether malfunctions are caused
284      /// by parallel execution or not.
285      static bool smForceAllMainThread;
286      
287      ///
288      static U32 smMainThreadTimeMS;
289            
290      /// Work queue for main thread; can be used to ping back work items to
291      /// main thread that need processing that can only happen on main thread.
292      static QueueType smMainThreadQueue;
293
294   public:
295
296      /// Create a new thread pool with the given number of worker threads.
297      ///
298      /// If numThreads is zero (the default), the number of threads created
299      /// will be based on the number of CPU cores available.
300      ///
301      /// @param numThreads Number of threads to create or zero for default.
302      ThreadPool( const char* name, U32 numThreads = 0 );
303      
304      ~ThreadPool();
305
306      /// Manually shutdown threads outside of static destructors.
307      void shutdown();
308
309      ///
310      void queueWorkItem( WorkItem* item );
311      
312      ///
313      /// <em>For the global pool, it is very important to only ever call
314      /// this function on the main thread and to let work items only ever
315      /// come from the main thread.  Otherwise this function has the potential
316      /// of dead-locking as new work items may constantly be fed to the queue
317      /// without it ever getting empty.</em>
318      ///
319      /// @param timeOut Soft limit on the number of milliseconds to wait for
320      ///   the queue to flush out.  -1 = infinite.
321      void flushWorkItems( S32 timeOut = -1 );
322
323      /// If you're using a non-global thread pool to parallelise some work, you
324      /// may want to block until all the parallel work is complete. As with
325      /// flushWorkItems, this method may block indefinitely if new items keep
326      /// getting added to the pool before old ones finish.
327      ///
328      /// <em>This method will not wait for items queued on the main thread using
329      /// queueWorkItemOnMainThread!</em>
330      ///
331      /// @param timeOut Soft limit on the number of milliseconds to wait for
332      ///   all items to complete.  -1 = infinite.
333      void waitForAllItems( S32 timeOut = -1 );
334
335      /// Add a work item to the main thread's work queue.
336      ///
337      /// The main thread's work queue will be processed each frame using
338      /// a set timeout to limit the work being done.  Nonetheless, work
339      /// items will not be suspended in-midst of processing, so make sure
340      /// that whatever work you issue to the main thread is light work
341      /// or you may see short hangs in gameplay.
342      ///
343      /// To reiterate this: any code executed through this interface directly
344      /// adds to frame processing time on the main thread.
345      ///
346      /// This method *may* (and is meant to) be called from threads
347      /// other than the main thread.
348      static void queueWorkItemOnMainThread( WorkItem* item );
349      
350      /// Process work items waiting on the main thread's work queue.
351      ///
352      /// There is a soft limit imposed on the time this method is allowed
353      /// to run so as to balance frame-to-frame load.  However, work
354      /// items, once their processing is initiated, will not be suspended
355      /// and will run for as long as they take to complete, so make sure
356      /// individual items perform as little work as necessary.
357      ///
358      /// @see ThreadPool::getMainThreadThesholdTimeMS
359      static void processMainThreadWorkItems();
360
361      /// Return the interval in which item priorities are updated on the queue.
362      /// @return update interval in milliseconds.
363      U32 getQueueUpdateInterval() const
364      {
365         return mWorkItemQueue.getUpdateInterval();
366      }
367
368      /// Return the priority increment applied to work items on each passing of the update interval.
369      F32 getQueueTimeBasedPriorityBoost() const
370      {
371         return mWorkItemQueue.getTimeBasedPriorityBoost();
372      }
373
374      /// Set the update interval of the work item queue to the given value.
375      /// @param milliSeconds Time between updates in milliseconds.
376      void setQueueUpdateInterval( U32 milliSeconds )
377      {
378         mWorkItemQueue.setUpdateInterval( milliSeconds );
379      }
380
381      /// Set the priority increment applied to work items on each update interval.
382      /// @param value Priority increment.  Set to zero to deactivate.
383      void setQueueTimeBasedPriorityBoost( F32 value )
384      {
385         mWorkItemQueue.setTimeBasedPriorityBoost( value );
386      }
387
388      ///
389      static U32& getMainThreadThresholdTimeMS()
390      {
391         return smMainThreadTimeMS;
392      }
393
394      ///
395      static bool& getForceAllMainThread()
396      {
397         return smForceAllMainThread;
398      }
399
400      /// Return the global thread pool singleton.
401      static ThreadPool& GLOBAL();
402};
403
404typedef ThreadPool::Context ThreadContext;
405typedef ThreadPool::WorkItem ThreadWorkItem;
406
407
408struct ThreadPool::GlobalThreadPool : public ThreadPool, public ManagedSingleton< GlobalThreadPool >
409{
410   typedef ThreadPool Parent;
411   
412   GlobalThreadPool()
413      : Parent( "GLOBAL" ) {}
414      
415   // For ManagedSingleton.
416   static const char* getSingletonName() { return "GlobalThreadPool"; }
417};
418
419inline ThreadPool& ThreadPool::GLOBAL()
420{
421   return *( GlobalThreadPool::instance() );
422}
423
424#endif // !_THREADPOOL_H_
425