threadPool.cpp
Engine/source/platform/threads/threadPool.cpp
Classes:
struct ThreadPool::WorkItemWrapper
Value wrapper for work items while placed on priority queue.
Detailed Description
1 2//----------------------------------------------------------------------------- 3// Copyright (c) 2012 GarageGames, LLC 4// 5// Permission is hereby granted, free of charge, to any person obtaining a copy 6// of this software and associated documentation files (the "Software"), to 7// deal in the Software without restriction, including without limitation the 8// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 9// sell copies of the Software, and to permit persons to whom the Software is 10// furnished to do so, subject to the following conditions: 11// 12// The above copyright notice and this permission notice shall be included in 13// all copies or substantial portions of the Software. 14// 15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 20// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 21// IN THE SOFTWARE. 22//----------------------------------------------------------------------------- 23 24#include "platform/threads/threadPool.h" 25#include "platform/threads/thread.h" 26#include "platform/platformCPUCount.h" 27#include "core/strings/stringFunctions.h" 28#include "core/util/tSingleton.h" 29 30 31//#define DEBUG_SPEW 32 33 34//============================================================================= 35// ThreadPool::Context. 
36//============================================================================= 37 38ThreadPool::Context ThreadPool::Context::smRootContext( "ROOT", NULL, 1.0 ); 39 40//-------------------------------------------------------------------------- 41 42ThreadPool::Context::Context( const char* name, ThreadPool::Context* parent, F32 priorityBias ) 43 : mName( name ), 44 mParent( parent ), 45 mSibling( 0 ), 46 mChildren( 0 ), 47 mPriorityBias( priorityBias ), 48 mAccumulatedPriorityBias( 0.0 ) 49{ 50 if( parent ) 51 { 52 mSibling = mParent->mChildren; 53 mParent->mChildren = this; 54 } 55} 56 57//-------------------------------------------------------------------------- 58 59ThreadPool::Context::~Context() 60{ 61 if( mParent ) 62 for( Context* context = mParent->mChildren, *prev = 0; context != 0; prev = context, context = context->mSibling ) 63 if( context == this ) 64 { 65 if( !prev ) 66 mParent->mChildren = this->mSibling; 67 else 68 prev->mSibling = this->mSibling; 69 } 70} 71 72//-------------------------------------------------------------------------- 73 74ThreadPool::Context* ThreadPool::Context::getChild( const char* name ) 75{ 76 for( Context* child = getChildren(); child != 0; child = child->getSibling() ) 77 if( dStricmp( child->getName(), name ) == 0 ) 78 return child; 79 return 0; 80} 81 82//-------------------------------------------------------------------------- 83 84F32 ThreadPool::Context::getAccumulatedPriorityBias() 85{ 86 if( !mAccumulatedPriorityBias ) 87 updateAccumulatedPriorityBiases(); 88 return mAccumulatedPriorityBias; 89} 90 91//-------------------------------------------------------------------------- 92 93void ThreadPool::Context::setPriorityBias( F32 value ) 94{ 95 mPriorityBias = value; 96 mAccumulatedPriorityBias = 0.0; 97} 98 99//-------------------------------------------------------------------------- 100 101void ThreadPool::Context::updateAccumulatedPriorityBiases() 102{ 103 // Update our own priority bias. 
104 105 mAccumulatedPriorityBias = mPriorityBias; 106 for( Context* context = getParent(); context != 0; context = context->getParent() ) 107 mAccumulatedPriorityBias *= context->getPriorityBias(); 108 109 // Update our children. 110 111 for( Context* child = getChildren(); child != 0; child = child->getSibling() ) 112 child->updateAccumulatedPriorityBiases(); 113} 114 115//============================================================================= 116// ThreadPool::WorkItem. 117//============================================================================= 118 119//-------------------------------------------------------------------------- 120 121void ThreadPool::WorkItem::process() 122{ 123 execute(); 124 mExecuted = true; 125} 126 127//-------------------------------------------------------------------------- 128 129bool ThreadPool::WorkItem::isCancellationRequested() 130{ 131 return false; 132} 133 134//-------------------------------------------------------------------------- 135 136bool ThreadPool::WorkItem::cancellationPoint() 137{ 138 if( isCancellationRequested() ) 139 { 140 onCancelled(); 141 return true; 142 } 143 else 144 return false; 145} 146 147//-------------------------------------------------------------------------- 148 149F32 ThreadPool::WorkItem::getPriority() 150{ 151 return 1.0; 152} 153 154//============================================================================= 155// ThreadPool::WorkItemWrapper. 156//============================================================================= 157 158/// Value wrapper for work items while placed on priority queue. 159/// Conforms to interface dictated by ThreadSafePriorityQueueWithUpdate. 
160/// 161/// @see ThreadSafePriorityQueueWithUpdate 162/// @see ThreadPool::WorkItem 163/// 164struct ThreadPool::WorkItemWrapper : public ThreadSafeRef< WorkItem > 165{ 166 typedef ThreadSafeRef< WorkItem> Parent; 167 168 WorkItemWrapper() {} 169 WorkItemWrapper( WorkItem* item ) 170 : Parent( item ) {} 171 172 bool isAlive(); 173 F32 getPriority(); 174}; 175 176inline bool ThreadPool::WorkItemWrapper::isAlive() 177{ 178 WorkItem* item = ptr(); 179 if( !item ) 180 return false; 181 else if( item->isCancellationRequested() ) 182 { 183 ( *this ) = 0; 184 return false; 185 } 186 else 187 return true; 188} 189 190inline F32 ThreadPool::WorkItemWrapper::getPriority() 191{ 192 WorkItem* item = ptr(); 193 AssertFatal( item != 0, "ThreadPool::WorkItemWrapper::getPriority - called on dead item" ); 194 195 // Compute a scaled priority value based on the item's context. 196 return ( item->getContext()->getAccumulatedPriorityBias() * item->getPriority() ); 197} 198 199//============================================================================= 200// ThreadPool::WorkerThread. 201//============================================================================= 202 203/// 204/// 205struct ThreadPool::WorkerThread : public Thread 206{ 207 WorkerThread( ThreadPool* pool, U32 index ); 208 209 WorkerThread* getNext(); 210 virtual void run( void* arg = 0 ); 211 212private: 213 U32 mIndex; 214 ThreadPool* mPool; 215 WorkerThread* mNext; 216}; 217 218ThreadPool::WorkerThread::WorkerThread( ThreadPool* pool, U32 index ) 219 : mPool( pool ), 220 mIndex( index ) 221{ 222 // Link us to the pool's thread list. 223 224 mNext = pool->mThreads; 225 pool->mThreads = this; 226} 227 228inline ThreadPool::WorkerThread* ThreadPool::WorkerThread::getNext() 229{ 230 return mNext; 231} 232 233void ThreadPool::WorkerThread::run( void* arg ) 234{ 235 #ifdef TORQUE_DEBUG 236 { 237 // Set the thread's name for debugging. 
238 char buffer[ 2048 ]; 239 dSprintf( buffer, sizeof( buffer ), "ThreadPool(%s) WorkerThread %i", mPool->mName.c_str(), mIndex ); 240 _setName( buffer ); 241 } 242 #endif 243 244#if defined(TORQUE_OS_XENON) 245 // On Xbox 360 you must explicitly assign software threads to hardware threads. 246 247 // This will distribute job threads across the secondary CPUs leaving both 248 // primary CPU cores available to the "main" thread. This will help prevent 249 // more L2 thrashing of the main thread/core. 250 static U32 sCoreAssignment = 2; 251 XSetThreadProcessor( GetCurrentThread(), sCoreAssignment ); 252 sCoreAssignment = sCoreAssignment < 6 ? sCoreAssignment + 1 : 2; 253#endif 254 255 while( 1 ) 256 { 257 if( checkForStop() ) 258 { 259#ifdef DEBUG_SPEW 260 Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' exits", getId() ); 261#endif 262 dFetchAndAdd( mPool->mNumThreads, ( U32 ) -1 ); 263 return; 264 } 265 266 // Mark us as potentially blocking. 267 dFetchAndAdd( mPool->mNumThreadsReady, ( U32 ) -1 ); 268 269 bool waitForSignal = false; 270 { 271 // Try to take an item from the queue. Do 272 // this in a separate block, so we'll be 273 // releasing the item after we have finished. 274 275 WorkItemWrapper workItem; 276 if( mPool->mWorkItemQueue.takeNext( workItem ) ) 277 { 278 // Mark us as non-blocking as this loop definitely 279 // won't wait on the semaphore. 
280 dFetchAndAdd( mPool->mNumThreadsReady, 1 ); 281 282#ifdef DEBUG_SPEW 283 Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' takes item '0x%x'", getId(), *workItem ); 284#endif 285 workItem->process(); 286 287 dFetchAndAdd( mPool->mNumPendingItems, ( U32 ) -1 ); 288 } 289 else 290 waitForSignal = true; 291 } 292 293 if( waitForSignal ) 294 { 295 dFetchAndAdd( mPool->mNumThreadsAwake, ( U32 ) -1 ); 296 297#ifdef DEBUG_SPEW 298 Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' going to sleep", getId() ); 299#endif 300 mPool->mSemaphore.acquire(); 301#ifdef DEBUG_SPEW 302 Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' waking up", getId() ); 303#endif 304 305 dFetchAndAdd( mPool->mNumThreadsAwake, 1 ); 306 dFetchAndAdd( mPool->mNumThreadsReady, 1 ); 307 } 308 } 309} 310 311//============================================================================= 312// ThreadPool. 313//============================================================================= 314 315bool ThreadPool::smForceAllMainThread; 316U32 ThreadPool::smMainThreadTimeMS; 317ThreadPool::QueueType ThreadPool::smMainThreadQueue; 318 319//-------------------------------------------------------------------------- 320 321ThreadPool::ThreadPool( const char* name, U32 numThreads ) 322 : mName( name ), 323 mNumThreads( numThreads ), 324 mNumThreadsAwake( 0 ), 325 mNumPendingItems( 0 ), 326 mThreads( 0 ), 327 mSemaphore( 0 ) 328{ 329 // Number of worker threads to create. 330 331 if( !mNumThreads ) 332 { 333 // Use platformCPUInfo directly as in the case of the global pool, 334 // Platform::SystemInfo will not yet have been initialized. 335 336 U32 numLogical = 0; 337 U32 numPhysical = 0; 338 U32 numCores = 0; 339 340 CPUInfo::CPUCount( numLogical, numCores, numPhysical ); 341 342 const U32 baseCount = getMax( numLogical, numCores ); 343 mNumThreads = (baseCount > 0) ? 
baseCount : 2; 344 } 345 346 #ifdef DEBUG_SPEW 347 Platform::outputDebugString( "[ThreadPool] spawning %i threads", mNumThreads ); 348 #endif 349 350 // Create the threads. 351 352 mNumThreadsAwake = mNumThreads; 353 mNumThreadsReady = mNumThreads; 354 for( U32 i = 0; i < mNumThreads; i ++ ) 355 { 356 WorkerThread* thread = new WorkerThread( this, i ); 357 thread->start(); 358 } 359} 360 361//-------------------------------------------------------------------------- 362 363ThreadPool::~ThreadPool() 364{ 365 shutdown(); 366} 367 368//-------------------------------------------------------------------------- 369 370void ThreadPool::shutdown() 371{ 372 const U32 numThreads = mNumThreads; 373 374 // Tell our worker threads to stop. 375 376 for( WorkerThread* thread = mThreads; thread != 0; thread = thread->getNext() ) 377 thread->stop(); 378 379 // Release the semaphore as many times as there are threads. 380 // Doing this separately guarantees we're not waking a thread 381 // that hasn't been set its stop flag yet. 382 383 for( U32 n = 0; n < numThreads; ++ n ) 384 mSemaphore.release(); 385 386 // Delete each worker thread. Wait until death as we're prone to 387 // running into issues with decomposing work item lists otherwise. 388 389 for( WorkerThread* thread = mThreads; thread != 0; ) 390 { 391 WorkerThread* next = thread->getNext(); 392 thread->join(); 393 delete thread; 394 thread = next; 395 } 396 397 mThreads = NULL; 398 mNumThreads = 0; 399} 400 401//-------------------------------------------------------------------------- 402 403void ThreadPool::queueWorkItem( WorkItem* item ) 404{ 405 bool executeRightAway = ( getForceAllMainThread() ); 406#ifdef DEBUG_SPEW 407 Platform::outputDebugString( "[ThreadPool] %s work item '0x%x'", 408 ( executeRightAway ? "executing" : "queuing" ), 409 item ); 410#endif 411 412 if( executeRightAway ) 413 item->process(); 414 else 415 { 416 // Put the item in the queue. 
417 dFetchAndAdd( mNumPendingItems, 1 ); 418 mWorkItemQueue.insert( item->getPriority(), item ); 419 420 mSemaphore.release(); 421 } 422} 423 424//-------------------------------------------------------------------------- 425 426void ThreadPool::flushWorkItems( S32 timeOut ) 427{ 428 AssertFatal( mNumThreads, "ThreadPool::flushWorkItems() - no worker threads in pool" ); 429 430 U32 endTime = 0; 431 if( timeOut != -1 ) 432 endTime = Platform::getRealMilliseconds() + timeOut; 433 434 // Spinlock until the queue is empty. 435 436 while( !mWorkItemQueue.isEmpty() ) 437 { 438 Platform::sleep( 25 ); 439 440 // Stop if we have exceeded our processing time budget. 441 442 if( timeOut != -1 443 && Platform::getRealMilliseconds() >= endTime ) 444 break; 445 } 446} 447 448void ThreadPool::waitForAllItems( S32 timeOut ) 449{ 450 U32 endTime = 0; 451 if( timeOut != -1 ) 452 endTime = Platform::getRealMilliseconds() + timeOut; 453 454 // Spinlock until there are no items that have not been processed. 455 456 while( dAtomicRead( mNumPendingItems ) ) 457 { 458 Platform::sleep( 25 ); 459 460 // Stop if we have exceeded our processing time budget. 
461 462 if( timeOut != -1 463 && Platform::getRealMilliseconds() >= endTime ) 464 break; 465 } 466} 467 468//-------------------------------------------------------------------------- 469 470void ThreadPool::queueWorkItemOnMainThread( WorkItem* item ) 471{ 472 smMainThreadQueue.insert( item->getPriority(), item ); 473} 474 475//-------------------------------------------------------------------------- 476 477void ThreadPool::processMainThreadWorkItems() 478{ 479 AssertFatal( ThreadManager::isMainThread(), 480 "ThreadPool::processMainThreadWorkItems - this function must only be called on the main thread" ); 481 482 U32 timeLimit = ( Platform::getRealMilliseconds() + getMainThreadThresholdTimeMS() ); 483 484 do 485 { 486 WorkItemWrapper item; 487 if( !smMainThreadQueue.takeNext( item ) ) 488 break; 489 else 490 item->process(); 491 } 492 while( Platform::getRealMilliseconds() < timeLimit ); 493} 494
