threadPool.h
Engine/source/platform/threads/threadPool.h
Interface for an asynchronous work manager.
Classes:
class ThreadPool
Asynchronous work manager.
class ThreadPool::Context
A ThreadPool context defines a logical context in which WorkItems are being executed.
class ThreadPool::WorkItem
An action to execute on a worker thread from the pool.
Public Typedefs
ThreadContext
ThreadWorkItem
Detailed Description
Interface for an asynchronous work manager.
Public Typedefs
typedef ThreadPool::Context ThreadContext
typedef ThreadPool::WorkItem ThreadWorkItem
1 2//----------------------------------------------------------------------------- 3// Copyright (c) 2012 GarageGames, LLC 4// 5// Permission is hereby granted, free of charge, to any person obtaining a copy 6// of this software and associated documentation files (the "Software"), to 7// deal in the Software without restriction, including without limitation the 8// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 9// sell copies of the Software, and to permit persons to whom the Software is 10// furnished to do so, subject to the following conditions: 11// 12// The above copyright notice and this permission notice shall be included in 13// all copies or substantial portions of the Software. 14// 15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 20// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 21// IN THE SOFTWARE. 22//----------------------------------------------------------------------------- 23 24#ifndef _THREADPOOL_H_ 25#define _THREADPOOL_H_ 26 27#ifndef _THREADSAFEREFCOUNT_H_ 28 #include "platform/threads/threadSafeRefCount.h" 29#endif 30#ifndef _THREADSAFEPRIORITYQUEUE_H_ 31 #include "platform/threads/threadSafePriorityQueue.h" 32#endif 33#ifndef _PLATFORM_THREAD_SEMAPHORE_H_ 34 #include "platform/threads/semaphore.h" 35#endif 36#ifndef _TSINGLETON_H_ 37 #include "core/util/tSingleton.h" 38#endif 39 40 41/// @file 42/// Interface for an asynchronous work manager. 43 44 45/// Asynchronous work manager. 46/// 47/// Thread pooling allows to submit work items for background execution. 
48/// Each work item will be placed on a queue and, based on a total priority 49/// ordering, executed when it has the highest priority and a worker thread 50/// becomes available. 51/// 52/// @note The global pool maintains the invariant that only the main thread 53/// may submit items in order to be able to flush the item queue reliably 54/// from the main thread itself. If other threads were issuing items to 55/// the queue, the queue may never empty out and the main thread will 56/// deadlock. 57/// 58/// Flushing is the simplest method to guarantee that no asynchronous 59/// operation is pending in a specific case (deletion of the target object 60/// being the most common case). However, when possible, avoid this 61/// situation and design your work items to operate independently, 62/// e.g. by having only a single point of access to data that may have 63/// disappeared in the meantime and putting a check around that single 64/// access so that the item will silently die when its target object has 65/// disappeared. 66/// 67/// The cleanest safe solution to this is to create a separate concurrently 68/// reference-counted structure that holds all interfacing state and 69/// functionality shared between a work item and its issueing code. This way 70/// the host object can safely disappear with the interfacing structure 71/// automatically being released once the last concurrent work item has been 72/// processed or discarded. 73/// 74class ThreadPool 75{ 76 public: 77 78 /// A ThreadPool context defines a logical context in which WorkItems are 79 /// being executed. Their primary use is for biasing priorities of 80 /// WorkItems. 81 /// 82 /// Contexts are arranged in a tree hierarchy. Each parent node's priority 83 /// bias scales all the priority biases underneath it. 84 /// 85 /// Note that instances of this class are meant to be instantiated 86 /// globally only. 
87 /// 88 class Context 89 { 90 protected: 91 92 /// Superordinate context; scales this context's priority bias. 93 Context* mParent; 94 95 /// First child. 96 Context* mChildren; 97 98 /// Next sibling in child chain. 99 Context* mSibling; 100 101 /// Name of this context. Should be unique in parent namespace. 102 const char* mName; 103 104 /// Priority scale factor of this context. 105 F32 mPriorityBias; 106 107 /// Accumulated scale factor. 108 F32 mAccumulatedPriorityBias; 109 110 /// The root context; does not modify priorities. All contexts should be direct or indirect children of this one. 111 static Context smRootContext; 112 113 /// Recursively update cached accumulated priority biases. 114 void updateAccumulatedPriorityBiases(); 115 116 public: 117 118 Context( const char* name, Context* parent, F32 priorityBias ); 119 ~Context(); 120 121 /// Return the name of the worker threading context. 122 const char* getName() const 123 { 124 return mName; 125 } 126 127 /// Return the context's own work item priority bias. 128 F32 getPriorityBias() const 129 { 130 return mPriorityBias; 131 } 132 133 /// Return the superordinate node to the current context. 134 Context* getParent() const 135 { 136 return mParent; 137 } 138 139 /// Return the next sibling to the current context. 140 Context* getSibling() const 141 { 142 return mSibling; 143 } 144 145 /// Return the first child context. 146 Context* getChildren() const 147 { 148 return mChildren; 149 } 150 151 /// Return the root context. 152 static Context* ROOT_CONTEXT() 153 { 154 return &smRootContext; 155 } 156 157 /// 158 F32 getAccumulatedPriorityBias(); 159 160 /// 161 Context* getChild( const char* name ); 162 163 /// 164 void setPriorityBias( F32 value ); 165 }; 166 167 /// An action to execute on a worker thread from the pool. 168 /// 169 /// Work items are concurrently reference-counted and will be 170 /// automatically released once the last reference disappears. 
171 /// 172 class WorkItem : public ThreadSafeRefCount< WorkItem > 173 { 174 public: 175 176 typedef ThreadSafeRefCount< WorkItem> Parent; 177 178 protected: 179 180 /// The work context of this item. 181 Context* mContext; 182 183 /// Mark a point in a work item's execution where the item can 184 /// be safely cancelled. 185 /// 186 /// This method should be called by subclasses' execute() methods 187 /// whenever an item can be safely cancelled. When it returns true, 188 /// the work item should exit from its execute() method. 189 bool cancellationPoint(); 190 191 /// Called when the item has been cancelled. 192 virtual void onCancelled() {} 193 194 /// Execute the actions associated with this work item. 195 /// This is the primary function to implement by subclasses. 196 virtual void execute() = 0; 197 198 /// This flag is set after the execute() method has completed. 199 bool mExecuted; 200 201 public: 202 203 /// Construct a new work item. 204 /// 205 /// @param context The work context in which the item should be placed. 206 /// If NULL, the root context will be used. 207 WorkItem( Context* context = 0 ) 208 : mContext( context ? context : Context::ROOT_CONTEXT() ), 209 mExecuted( false ) 210 { 211 } 212 213 virtual ~WorkItem() {} 214 215 /// Return the work context associated with the work item. 216 inline Context* getContext() const 217 { 218 return mContext; 219 } 220 221 /// Process the work item. 222 void process(); 223 224 /// Return true if the work item should be cancelled. 225 /// 226 /// This method can be overridden by subclasses. It's value will be 227 /// checked each time cancellationPoint() is called. When it returns 228 /// true, the item's process() method will exit automatically. 229 /// 230 /// @return true, if item should be cancelled; default is false. 231 /// @see ThreadPool::WorkItem::cancellationPoint 232 virtual bool isCancellationRequested(); 233 234 /// Return the item's base priority value. 
235 /// @return item priority; defaults to 1.0. 236 virtual F32 getPriority(); 237 238 /// Has this work item been executed already? 239 bool hasExecuted() const 240 { 241 return mExecuted; 242 } 243 }; 244 245 typedef ThreadSafeRef< WorkItem> WorkItemPtr; 246 struct GlobalThreadPool; 247 248 protected: 249 250 struct WorkItemWrapper; 251 struct WorkerThread; 252 253 friend struct WorkerThread; // mSemaphore, mNumThreadsAwake, mThreads 254 255 typedef ThreadSafePriorityQueueWithUpdate< WorkItemWrapper, F32> QueueType; 256 257 /// Name of this pool. Mainly for debugging. Used to name worker threads. 258 String mName; 259 260 /// Number of worker threads spawned by the pool. 261 U32 mNumThreads; 262 263 /// Number of worker threads in non-sleeping state. 264 U32 mNumThreadsAwake; 265 266 /// Number of worker threads guaranteed to be non-blocking. 267 U32 mNumThreadsReady; 268 269 /// Number of work items that have not yet completed execution. 270 U32 mNumPendingItems; 271 272 /// Semaphore used to wake up threads, if necessary. 273 Semaphore mSemaphore; 274 275 /// Threaded priority queue for concurrent access by worker threads. 276 QueueType mWorkItemQueue; 277 278 /// List of worker threads. 279 WorkerThread* mThreads; 280 281 /// Force all work items to execute on main thread; 282 /// turns this into a single-threaded system. 283 /// Primarily useful to find whether malfunctions are caused 284 /// by parallel execution or not. 285 static bool smForceAllMainThread; 286 287 /// 288 static U32 smMainThreadTimeMS; 289 290 /// Work queue for main thread; can be used to ping back work items to 291 /// main thread that need processing that can only happen on main thread. 292 static QueueType smMainThreadQueue; 293 294 public: 295 296 /// Create a new thread pool with the given number of worker threads. 297 /// 298 /// If numThreads is zero (the default), the number of threads created 299 /// will be based on the number of CPU cores available. 
300 /// 301 /// @param numThreads Number of threads to create or zero for default. 302 ThreadPool( const char* name, U32 numThreads = 0 ); 303 304 ~ThreadPool(); 305 306 /// Manually shutdown threads outside of static destructors. 307 void shutdown(); 308 309 /// 310 void queueWorkItem( WorkItem* item ); 311 312 /// 313 /// <em>For the global pool, it is very important to only ever call 314 /// this function on the main thread and to let work items only ever 315 /// come from the main thread. Otherwise this function has the potential 316 /// of dead-locking as new work items may constantly be fed to the queue 317 /// without it ever getting empty.</em> 318 /// 319 /// @param timeOut Soft limit on the number of milliseconds to wait for 320 /// the queue to flush out. -1 = infinite. 321 void flushWorkItems( S32 timeOut = -1 ); 322 323 /// If you're using a non-global thread pool to parallelise some work, you 324 /// may want to block until all the parallel work is complete. As with 325 /// flushWorkItems, this method may block indefinitely if new items keep 326 /// getting added to the pool before old ones finish. 327 /// 328 /// <em>This method will not wait for items queued on the main thread using 329 /// queueWorkItemOnMainThread!</em> 330 /// 331 /// @param timeOut Soft limit on the number of milliseconds to wait for 332 /// all items to complete. -1 = infinite. 333 void waitForAllItems( S32 timeOut = -1 ); 334 335 /// Add a work item to the main thread's work queue. 336 /// 337 /// The main thread's work queue will be processed each frame using 338 /// a set timeout to limit the work being done. Nonetheless, work 339 /// items will not be suspended in-midst of processing, so make sure 340 /// that whatever work you issue to the main thread is light work 341 /// or you may see short hangs in gameplay. 342 /// 343 /// To reiterate this: any code executed through this interface directly 344 /// adds to frame processing time on the main thread. 
345 /// 346 /// This method *may* (and is meant to) be called from threads 347 /// other than the main thread. 348 static void queueWorkItemOnMainThread( WorkItem* item ); 349 350 /// Process work items waiting on the main thread's work queue. 351 /// 352 /// There is a soft limit imposed on the time this method is allowed 353 /// to run so as to balance frame-to-frame load. However, work 354 /// items, once their processing is initiated, will not be suspended 355 /// and will run for as long as they take to complete, so make sure 356 /// individual items perform as little work as necessary. 357 /// 358 /// @see ThreadPool::getMainThreadThesholdTimeMS 359 static void processMainThreadWorkItems(); 360 361 /// Return the interval in which item priorities are updated on the queue. 362 /// @return update interval in milliseconds. 363 U32 getQueueUpdateInterval() const 364 { 365 return mWorkItemQueue.getUpdateInterval(); 366 } 367 368 /// Return the priority increment applied to work items on each passing of the update interval. 369 F32 getQueueTimeBasedPriorityBoost() const 370 { 371 return mWorkItemQueue.getTimeBasedPriorityBoost(); 372 } 373 374 /// Set the update interval of the work item queue to the given value. 375 /// @param milliSeconds Time between updates in milliseconds. 376 void setQueueUpdateInterval( U32 milliSeconds ) 377 { 378 mWorkItemQueue.setUpdateInterval( milliSeconds ); 379 } 380 381 /// Set the priority increment applied to work items on each update interval. 382 /// @param value Priority increment. Set to zero to deactivate. 383 void setQueueTimeBasedPriorityBoost( F32 value ) 384 { 385 mWorkItemQueue.setTimeBasedPriorityBoost( value ); 386 } 387 388 /// 389 static U32& getMainThreadThresholdTimeMS() 390 { 391 return smMainThreadTimeMS; 392 } 393 394 /// 395 static bool& getForceAllMainThread() 396 { 397 return smForceAllMainThread; 398 } 399 400 /// Return the global thread pool singleton. 
401 static ThreadPool& GLOBAL(); 402}; 403 404typedef ThreadPool::Context ThreadContext; 405typedef ThreadPool::WorkItem ThreadWorkItem; 406 407 408struct ThreadPool::GlobalThreadPool : public ThreadPool, public ManagedSingleton< GlobalThreadPool > 409{ 410 typedef ThreadPool Parent; 411 412 GlobalThreadPool() 413 : Parent( "GLOBAL" ) {} 414 415 // For ManagedSingleton. 416 static const char* getSingletonName() { return "GlobalThreadPool"; } 417}; 418 419inline ThreadPool& ThreadPool::GLOBAL() 420{ 421 return *( GlobalThreadPool::instance() ); 422} 423 424#endif // !_THREADPOOL_H_ 425