threadPool.cpp

Engine/source/platform/threads/threadPool.cpp

More...

Classes:

struct ThreadPool::WorkItemWrapper

Value wrapper for work items while they are held in the priority queue.

Detailed Description

  1
  2//-----------------------------------------------------------------------------
  3// Copyright (c) 2012 GarageGames, LLC
  4//
  5// Permission is hereby granted, free of charge, to any person obtaining a copy
  6// of this software and associated documentation files (the "Software"), to
  7// deal in the Software without restriction, including without limitation the
  8// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  9// sell copies of the Software, and to permit persons to whom the Software is
 10// furnished to do so, subject to the following conditions:
 11//
 12// The above copyright notice and this permission notice shall be included in
 13// all copies or substantial portions of the Software.
 14//
 15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 20// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 21// IN THE SOFTWARE.
 22//-----------------------------------------------------------------------------
 23
 24#include "platform/threads/threadPool.h"
 25#include "platform/threads/thread.h"
 26#include "platform/platformCPUCount.h"
 27#include "core/strings/stringFunctions.h"
 28#include "core/util/tSingleton.h"
 29
 30
 31//#define DEBUG_SPEW
 32
 33
 34//=============================================================================
 35//    ThreadPool::Context.
 36//=============================================================================
 37
 38ThreadPool::Context ThreadPool::Context::smRootContext( "ROOT", NULL, 1.0 );
 39
 40//--------------------------------------------------------------------------
 41
 42ThreadPool::Context::Context( const char* name, ThreadPool::Context* parent, F32 priorityBias )
 43   : mName( name ),
 44     mParent( parent ),
 45     mSibling( 0 ),
 46     mChildren( 0 ),
 47     mPriorityBias( priorityBias ),
 48     mAccumulatedPriorityBias( 0.0 )
 49{
 50   if( parent )
 51   {
 52      mSibling = mParent->mChildren;
 53      mParent->mChildren = this;
 54   }
 55}
 56
 57//--------------------------------------------------------------------------
 58
 59ThreadPool::Context::~Context()
 60{
 61   if( mParent )
 62      for( Context* context = mParent->mChildren, *prev = 0; context != 0; prev = context, context = context->mSibling )
 63         if( context == this )
 64         {
 65            if( !prev )
 66               mParent->mChildren = this->mSibling;
 67            else
 68               prev->mSibling = this->mSibling;
 69         }
 70}
 71
 72//--------------------------------------------------------------------------
 73
 74ThreadPool::Context* ThreadPool::Context::getChild( const char* name )
 75{
 76   for( Context* child = getChildren(); child != 0; child = child->getSibling() )
 77      if( dStricmp( child->getName(), name ) == 0 )
 78         return child;
 79   return 0;
 80}
 81
 82//--------------------------------------------------------------------------
 83
 84F32 ThreadPool::Context::getAccumulatedPriorityBias()
 85{
 86   if( !mAccumulatedPriorityBias )
 87      updateAccumulatedPriorityBiases();
 88   return mAccumulatedPriorityBias;
 89}
 90
 91//--------------------------------------------------------------------------
 92
 93void ThreadPool::Context::setPriorityBias( F32 value )
 94{
 95   mPriorityBias = value;
 96   mAccumulatedPriorityBias = 0.0;
 97}
 98
 99//--------------------------------------------------------------------------
100
101void ThreadPool::Context::updateAccumulatedPriorityBiases()
102{
103   // Update our own priority bias.
104
105   mAccumulatedPriorityBias = mPriorityBias;
106   for( Context* context = getParent(); context != 0; context = context->getParent() )
107      mAccumulatedPriorityBias *= context->getPriorityBias();
108   
109   // Update our children.
110
111   for( Context* child = getChildren(); child != 0; child = child->getSibling() )
112      child->updateAccumulatedPriorityBiases();
113}
114
115//=============================================================================
116//    ThreadPool::WorkItem.
117//=============================================================================
118
119//--------------------------------------------------------------------------
120
121void ThreadPool::WorkItem::process()
122{
123   execute();
124   mExecuted = true;
125}
126
127//--------------------------------------------------------------------------
128
129bool ThreadPool::WorkItem::isCancellationRequested()
130{
131   return false;
132}
133
134//--------------------------------------------------------------------------
135
136bool ThreadPool::WorkItem::cancellationPoint()
137{
138   if( isCancellationRequested() )
139   {
140      onCancelled();
141      return true;
142   }
143   else
144      return false;
145}
146
147//--------------------------------------------------------------------------
148
149F32 ThreadPool::WorkItem::getPriority()
150{
151   return 1.0;
152}
153
154//=============================================================================
155//    ThreadPool::WorkItemWrapper.
156//=============================================================================
157
158/// Value wrapper for work items while placed on priority queue.
159/// Conforms to interface dictated by ThreadSafePriorityQueueWithUpdate.
160///
161/// @see ThreadSafePriorityQueueWithUpdate
162/// @see ThreadPool::WorkItem
163///
164struct ThreadPool::WorkItemWrapper : public ThreadSafeRef< WorkItem >
165{
166   typedef ThreadSafeRef< WorkItem> Parent;
167
168   WorkItemWrapper() {}
169   WorkItemWrapper( WorkItem* item )
170      : Parent( item ) {}
171
172   bool           isAlive();
173   F32            getPriority();
174};
175
176inline bool ThreadPool::WorkItemWrapper::isAlive()
177{
178   WorkItem* item = ptr();
179   if( !item )
180      return false;
181   else if( item->isCancellationRequested() )
182   {
183      ( *this ) = 0;
184      return false;
185   }
186   else
187      return true;
188}
189
190inline F32 ThreadPool::WorkItemWrapper::getPriority()
191{
192   WorkItem* item = ptr();
193   AssertFatal( item != 0, "ThreadPool::WorkItemWrapper::getPriority - called on dead item" );
194
195   // Compute a scaled priority value based on the item's context.
196   return ( item->getContext()->getAccumulatedPriorityBias() * item->getPriority() );
197}
198
199//=============================================================================
200//    ThreadPool::WorkerThread.
201//=============================================================================
202
203///
204///
205struct ThreadPool::WorkerThread : public Thread
206{
207   WorkerThread( ThreadPool* pool, U32 index );
208
209   WorkerThread*     getNext();
210   virtual void      run( void* arg = 0 );
211
212private:
213   U32               mIndex;
214   ThreadPool*       mPool;
215   WorkerThread*     mNext;
216};
217
218ThreadPool::WorkerThread::WorkerThread( ThreadPool* pool, U32 index )
219   : mPool( pool ),
220     mIndex( index )
221{
222   // Link us to the pool's thread list.
223
224   mNext = pool->mThreads;
225   pool->mThreads = this;
226}
227
228inline ThreadPool::WorkerThread* ThreadPool::WorkerThread::getNext()
229{
230   return mNext;
231}
232
233void ThreadPool::WorkerThread::run( void* arg )
234{
235   #ifdef TORQUE_DEBUG
236   {
237      // Set the thread's name for debugging.
238      char buffer[ 2048 ];
239      dSprintf( buffer, sizeof( buffer ), "ThreadPool(%s) WorkerThread %i", mPool->mName.c_str(), mIndex );
240      _setName( buffer );
241   }
242   #endif
243
244#if defined(TORQUE_OS_XENON)
245   // On Xbox 360 you must explicitly assign software threads to hardware threads.
246
247   // This will distribute job threads across the secondary CPUs leaving both
248   // primary CPU cores available to the "main" thread. This will help prevent
249   // more L2 thrashing of the main thread/core.
250   static U32 sCoreAssignment = 2;
251   XSetThreadProcessor( GetCurrentThread(), sCoreAssignment );
252   sCoreAssignment = sCoreAssignment < 6 ? sCoreAssignment + 1 : 2;
253#endif
254      
255   while( 1 )
256   {
257      if( checkForStop() )
258      {
259#ifdef DEBUG_SPEW
260         Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' exits", getId() );
261#endif
262         dFetchAndAdd( mPool->mNumThreads, ( U32 ) -1 );
263         return;
264      }
265
266      // Mark us as potentially blocking.
267      dFetchAndAdd( mPool->mNumThreadsReady, ( U32 ) -1 );
268
269      bool waitForSignal = false;
270      {
271         // Try to take an item from the queue.  Do
272         // this in a separate block, so we'll be
273         // releasing the item after we have finished.
274
275         WorkItemWrapper workItem;
276         if( mPool->mWorkItemQueue.takeNext( workItem ) )
277         {
278            // Mark us as non-blocking as this loop definitely
279            // won't wait on the semaphore.
280            dFetchAndAdd( mPool->mNumThreadsReady, 1 );
281
282#ifdef DEBUG_SPEW
283            Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' takes item '0x%x'", getId(), *workItem );
284#endif
285            workItem->process();
286
287            dFetchAndAdd( mPool->mNumPendingItems, ( U32 ) -1 );
288         }
289         else
290            waitForSignal = true;
291      }
292
293      if( waitForSignal )
294      {
295         dFetchAndAdd( mPool->mNumThreadsAwake, ( U32 ) -1 );
296
297#ifdef DEBUG_SPEW
298         Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' going to sleep", getId() );
299#endif
300         mPool->mSemaphore.acquire();
301#ifdef DEBUG_SPEW
302         Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' waking up", getId() );
303#endif
304
305         dFetchAndAdd( mPool->mNumThreadsAwake, 1 );
306         dFetchAndAdd( mPool->mNumThreadsReady, 1 );
307      }
308   }
309}
310
311//=============================================================================
312//    ThreadPool.
313//=============================================================================
314
315bool                          ThreadPool::smForceAllMainThread;
316U32                           ThreadPool::smMainThreadTimeMS;
317ThreadPool::QueueType         ThreadPool::smMainThreadQueue;
318
319//--------------------------------------------------------------------------
320
321ThreadPool::ThreadPool( const char* name, U32 numThreads )
322   : mName( name ),
323     mNumThreads( numThreads ),
324     mNumThreadsAwake( 0 ),
325     mNumPendingItems( 0 ),
326     mThreads( 0 ),
327     mSemaphore( 0 )
328{
329   // Number of worker threads to create.
330
331   if( !mNumThreads )
332   {
333      // Use platformCPUInfo directly as in the case of the global pool,
334      // Platform::SystemInfo will not yet have been initialized.
335      
336      U32 numLogical = 0;
337      U32 numPhysical = 0;
338      U32 numCores = 0;
339
340      CPUInfo::CPUCount( numLogical, numCores, numPhysical );
341      
342      const U32 baseCount = getMax( numLogical, numCores );
343      mNumThreads = (baseCount > 0) ? baseCount : 2;
344   }
345   
346   #ifdef DEBUG_SPEW
347   Platform::outputDebugString( "[ThreadPool] spawning %i threads", mNumThreads );
348   #endif
349
350   // Create the threads.
351
352   mNumThreadsAwake = mNumThreads;
353   mNumThreadsReady = mNumThreads;
354   for( U32 i = 0; i < mNumThreads; i ++ )
355   {
356      WorkerThread* thread = new WorkerThread( this, i );
357      thread->start();
358   }
359}
360
361//--------------------------------------------------------------------------
362
363ThreadPool::~ThreadPool()
364{
365   shutdown();
366}
367
368//--------------------------------------------------------------------------
369
370void ThreadPool::shutdown()
371{
372   const U32 numThreads = mNumThreads;
373   
374   // Tell our worker threads to stop.
375
376   for( WorkerThread* thread = mThreads; thread != 0; thread = thread->getNext() )
377      thread->stop();
378
379   // Release the semaphore as many times as there are threads.
380   // Doing this separately guarantees we're not waking a thread
381   // that hasn't been set its stop flag yet.
382
383   for( U32 n = 0; n < numThreads; ++ n )
384      mSemaphore.release();
385
386   // Delete each worker thread.  Wait until death as we're prone to
387   // running into issues with decomposing work item lists otherwise.
388
389   for( WorkerThread* thread = mThreads; thread != 0; )
390   {
391      WorkerThread* next = thread->getNext();
392      thread->join();
393      delete thread;
394      thread = next;
395   }
396
397   mThreads = NULL;
398   mNumThreads = 0;
399}
400
401//--------------------------------------------------------------------------
402
403void ThreadPool::queueWorkItem( WorkItem* item )
404{
405   bool executeRightAway = ( getForceAllMainThread() );
406#ifdef DEBUG_SPEW
407   Platform::outputDebugString( "[ThreadPool] %s work item '0x%x'",
408                                ( executeRightAway ? "executing" : "queuing" ),
409                                item );
410#endif
411
412   if( executeRightAway )
413      item->process();
414   else
415   {
416      // Put the item in the queue.
417      dFetchAndAdd( mNumPendingItems, 1 );
418      mWorkItemQueue.insert( item->getPriority(), item );
419
420      mSemaphore.release();
421   }
422}
423
424//--------------------------------------------------------------------------
425
426void ThreadPool::flushWorkItems( S32 timeOut )
427{
428   AssertFatal( mNumThreads, "ThreadPool::flushWorkItems() - no worker threads in pool" );
429   
430   U32 endTime = 0;
431   if( timeOut != -1 )
432      endTime = Platform::getRealMilliseconds() + timeOut;
433
434   // Spinlock until the queue is empty.
435
436   while( !mWorkItemQueue.isEmpty() )
437   {
438      Platform::sleep( 25 );
439
440      // Stop if we have exceeded our processing time budget.
441
442      if( timeOut != -1
443          && Platform::getRealMilliseconds() >= endTime )
444          break;
445   }
446}
447
448void ThreadPool::waitForAllItems( S32 timeOut )
449{
450   U32 endTime = 0;
451   if( timeOut != -1 )
452      endTime = Platform::getRealMilliseconds() + timeOut;
453
454   // Spinlock until there are no items that have not been processed.
455
456   while( dAtomicRead( mNumPendingItems ) )
457   {
458      Platform::sleep( 25 );
459
460      // Stop if we have exceeded our processing time budget.
461
462      if( timeOut != -1
463          && Platform::getRealMilliseconds() >= endTime )
464          break;
465   }
466}
467
468//--------------------------------------------------------------------------
469
470void ThreadPool::queueWorkItemOnMainThread( WorkItem* item )
471{
472   smMainThreadQueue.insert( item->getPriority(), item );
473}
474
475//--------------------------------------------------------------------------
476
477void ThreadPool::processMainThreadWorkItems()
478{
479   AssertFatal( ThreadManager::isMainThread(),
480      "ThreadPool::processMainThreadWorkItems - this function must only be called on the main thread" );
481
482   U32 timeLimit = ( Platform::getRealMilliseconds() + getMainThreadThresholdTimeMS() );
483
484   do
485   {
486      WorkItemWrapper item;
487      if( !smMainThreadQueue.takeNext( item ) )
488         break;
489      else
490         item->process();
491   }
492   while( Platform::getRealMilliseconds() < timeLimit );
493}
494