You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
751 lines
23 KiB
751 lines
23 KiB
//========= Copyright Valve Corporation, All rights reserved. ============// |
|
// |
|
// Purpose: |
|
// |
|
// $NoKeywords: $ |
|
//============================================================================= |
|
|
|
|
|
#include "stdafx.h" |
|
#include "tslist.h" |
|
#include <workthreadpool.h> |
|
#include <gclogger.h> |
|
|
|
#include "tier0/memdbgon.h" |
|
|
|
namespace GCSDK { |
|
|
|
IWorkThreadPoolSignal *CWorkThreadPool::sm_pWorkItemsCompletedSignal = NULL; |
|
|
|
//----------------------------------------------------------------------------- |
|
// Purpose: CWorkThread constructors |
|
//----------------------------------------------------------------------------- |
|
CWorkThread::CWorkThread( CWorkThreadPool *pThreadPool ) |
|
: m_pThreadPool( pThreadPool ), |
|
m_bExitThread( false ), |
|
m_bFinished( false ) |
|
{ |
|
} |
|
|
|
CWorkThread::CWorkThread( CWorkThreadPool *pThreadPool, const char *pszName ) |
|
: m_pThreadPool( pThreadPool ), |
|
m_bExitThread( false ), |
|
m_bFinished( false ) |
|
{ |
|
SetName( pszName ); |
|
} |
|
|
|
//----------------------------------------------------------------------------- |
|
// Purpose: Tell work thread pool not to set event on every item added (SetEvent is very expensive) |
|
//----------------------------------------------------------------------------- |
|
void CWorkThreadPool::SetNeverSetEventOnAdd( bool bNeverSet ) |
|
{ |
|
bool bWasSet = m_bNeverSetOnAdd; |
|
m_bNeverSetOnAdd = bNeverSet; |
|
|
|
// In case of disabling set right away to make sure if we have pending work we execute it now with no latency |
|
if ( bWasSet && !m_bNeverSetOnAdd ) |
|
m_EventNewWorkItem.Set(); |
|
} |
|
|
|
|
|
|
|
//----------------------------------------------------------------------------- |
|
// Purpose: performs the work loop for the thread, waits for work, |
|
// notifies the owner (the pool) as it completes work and before it exits |
|
//----------------------------------------------------------------------------- |
|
int CWorkThread::Run() |
|
{ |
|
// manage our thread pool's statistics |
|
++m_pThreadPool->m_cThreadsRunning; |
|
|
|
#ifdef _SERVER |
|
g_CompletionPortManager.AssociateCallingThreadWithIOCP(); |
|
#endif |
|
|
|
OnStart(); |
|
|
|
#if 0 // need to port over new vprof code |
|
#if defined( VPROF_ENABLED ) |
|
CVProfile *pProfile = GetVProfProfileForCurrentThread(); |
|
#endif |
|
#endif |
|
|
|
CWorkThreadPool *pPool = m_pThreadPool; |
|
|
|
int nIterations = 0; |
|
const int nMaxFastIterations = 4; |
|
while ( !m_bExitThread ) |
|
{ |
|
#if 0 // game vprof doesn't yet support TLS'd vprof instances, until new vprof code is ported |
|
#if defined( VPROF_ENABLED ) |
|
if ( pProfile ) |
|
pProfile->MarkFrame( GetName() ); |
|
#endif |
|
#endif |
|
pPool->m_cActiveThreads++; |
|
|
|
nIterations = 0; |
|
while ( (pPool->BNeverSetEventOnAdd() && nIterations < nMaxFastIterations) || nIterations == 0 ) |
|
{ |
|
// process any items which have arrived |
|
CWorkItem *pWorkItem = pPool->GetNextWorkItemToProcess( ); |
|
while ( pWorkItem ) |
|
{ |
|
#if 0 |
|
pPool->m_StatWaitTime.Update( pWorkItem->WaitingTime() ); |
|
#endif |
|
if ( pWorkItem->HasTimedOut() ) |
|
{ |
|
pWorkItem->m_bCanceled = true; |
|
} |
|
else |
|
{ |
|
// call the work item to do its work |
|
pWorkItem->m_bCanceled = false; |
|
|
|
CFastTimer fastTimer; |
|
fastTimer.Start(); |
|
pWorkItem->m_bRunning = true; |
|
bool bSuccess = pWorkItem->ThreadProcess( this ); |
|
pWorkItem->m_bRunning = false; |
|
fastTimer.End(); |
|
CCycleCount cycleCount = fastTimer.GetDuration(); |
|
pWorkItem->SetCycleCount(cycleCount); |
|
#if 0 |
|
pPool->m_StatExecutionTime.Update( cycleCount.GetUlMicroseconds() ); |
|
#endif |
|
if ( bSuccess ) |
|
pPool->m_cSuccesses ++; |
|
else |
|
pWorkItem->m_bResubmit ? pPool->m_cRetries++ : pPool->m_cFailures++; |
|
} |
|
|
|
// do we need to resubmit this item? |
|
if ( pWorkItem->m_bResubmit ) |
|
{ |
|
pWorkItem->m_bResubmit = false; |
|
pWorkItem->m_bCanceled = false; |
|
// put it at the tail of the incoming queue |
|
pPool->AddWorkItem( pWorkItem ); |
|
pWorkItem->Release(); // dec since AddWorkItem added 1 more again |
|
} |
|
else |
|
{ |
|
// put it in the outgoing queue |
|
pPool->OnWorkItemCompleted( pWorkItem ); |
|
} |
|
|
|
// If we are flagged as exiting don't try to get more work, we need to exit right away and orphan the work |
|
// to avoid blocking shutdown. |
|
if ( !m_bExitThread ) |
|
{ |
|
// get the next work item (if any) |
|
pWorkItem = pPool->GetNextWorkItemToProcess( ); |
|
} |
|
else |
|
{ |
|
pWorkItem = NULL; |
|
} |
|
|
|
#if 0 // game vprof doesn't yet support TLS'd vprof instances, until new vprof code is ported |
|
#if defined( VPROF_ENABLED ) |
|
if ( pProfile && pWorkItem ) |
|
pProfile->MarkFrame( GetName() ); |
|
#endif |
|
#endif |
|
} |
|
|
|
if ( m_bExitThread ) |
|
break; |
|
|
|
++nIterations; |
|
if ( pPool->BNeverSetEventOnAdd() && nIterations < nMaxFastIterations ) |
|
{ |
|
VPROF_BUDGET( "CWorkThread -- Sleep", VPROF_BUDGETGROUP_SLEEPING ); |
|
ThreadSleep( 2 ); |
|
} |
|
} |
|
|
|
pPool->m_cActiveThreads--; |
|
|
|
// wait for a new work item to arrive in the queue, check the counts first just to be sure |
|
{ |
|
VPROF_BUDGET( "CWorkThread -- Sleep", VPROF_BUDGETGROUP_SLEEPING ); |
|
#ifdef _SERVER |
|
if ( pPool->BNeverSetEventOnAdd() ) |
|
pPool->m_EventNewWorkItem.Wait( 15 ); |
|
else |
|
pPool->m_EventNewWorkItem.Wait( 50 ); |
|
#else |
|
pPool->m_EventNewWorkItem.Wait( 50 ); |
|
#endif |
|
} |
|
} |
|
|
|
// Since we are exiting, we must have been signaled to shutdown, and we should signal any remaining threads |
|
// since each signal wakes only one thread. |
|
pPool->m_EventNewWorkItem.Set(); |
|
|
|
m_bFinished = true; |
|
|
|
// updates stats |
|
--m_pThreadPool->m_cThreadsRunning; |
|
|
|
return EXIT_SUCCESS; |
|
} |
|
|
|
|
|
//----------------------------------------------------------------------------- |
|
// Purpose: Construct a new CWorkThreadPool object |
|
//----------------------------------------------------------------------------- |
|
CWorkThreadPool::CWorkThreadPool( const char *pszThreadName ) |
|
: |
|
#if 0 |
|
m_StatWaitTime( 100 ), |
|
m_StatExecutionTime( 100 ), |
|
#endif |
|
m_bThreadsInitialized( false ), |
|
m_cThreadsRunning( 0 ), |
|
m_cActiveThreads( 0 ), |
|
m_bMayHaveJobTimeouts( false ), |
|
m_bExiting( false ), |
|
m_bAutoCreateThreads( false ), |
|
m_cMaxThreads( 0 ), |
|
m_cFailures( 0 ), |
|
m_cSuccesses( 0 ), |
|
m_pWorkThreadConstructor( NULL ), |
|
m_ulLastCompletedSequenceNumber( 0 ), |
|
m_ulLastUsedSequenceNumber( 0 ), |
|
m_ulLastDispatchedSequenceNumber( 0 ), |
|
m_bEnsureOutputOrdering( false ), |
|
m_bNeverSetOnAdd( false ) |
|
{ |
|
Assert( pszThreadName != NULL ); |
|
Q_strncpy( m_szThreadNamePfx, pszThreadName, sizeof( m_szThreadNamePfx ) ); |
|
m_LimitTimerCreateNewThreads.SetLimit( 1 ); |
|
|
|
m_pTSQueueToProcess = new CTSQueue< CWorkItem* >; |
|
m_pTSQueueCompleted = new CTSQueue< CWorkItem* >; |
|
} |
|
|
|
|
|
//----------------------------------------------------------------------------- |
|
// Purpose: destructor; does assertion checks to make sure we weere shut down cleanly |
|
// cleans up even if we weren't cleanly stopped |
|
//----------------------------------------------------------------------------- |
|
CWorkThreadPool::~CWorkThreadPool() |
|
{ |
|
// If you hit this you probably didn't call StopWorkThreads() first |
|
AssertMsg1( ( !m_bThreadsInitialized || m_bExiting ) && 0 == m_cThreadsRunning, |
|
"CWorkThreadPool::~CWorkThreadPool(): Thread pool %s shutdown incorrectly.\n", |
|
m_szThreadNamePfx ); |
|
|
|
if ( m_WorkThreads.Count() ) |
|
{ |
|
StopWorkThreads(); |
|
Assert( 0 == m_WorkThreads.Count() ); |
|
} |
|
|
|
Assert( 0 == m_cThreadsRunning ); |
|
|
|
// WARNING: We need to release any items left in the queues |
|
CWorkItem *pWorkItem = NULL; |
|
if ( m_pTSQueueCompleted->Count() > 0 ) |
|
{ |
|
EmitWarning( SPEW_THREADS, 2, "CWorkThreadPool::~CWorkThreadPool: work complete queue not empty, %d items discarded.\n", m_pTSQueueCompleted->Count() ); |
|
pWorkItem = NULL; |
|
while ( m_pTSQueueCompleted->PopItem( &pWorkItem ) ) |
|
{ |
|
while( pWorkItem->Release() ) |
|
{ |
|
/* nothing */ |
|
} |
|
} |
|
} |
|
|
|
if ( m_pTSQueueToProcess->Count() > 0 ) |
|
{ |
|
EmitWarning( SPEW_THREADS, 2, "CWorkThreadPool::~CWorkThreadPool: work processing queue not empty: %d items discarded.\n", m_pTSQueueToProcess->Count() ); |
|
while ( m_pTSQueueToProcess->PopItem( &pWorkItem ) ) |
|
{ |
|
while( pWorkItem->Release() ) |
|
{ |
|
/* nothing */ |
|
} |
|
} |
|
} |
|
|
|
delete m_pTSQueueToProcess; |
|
delete m_pTSQueueCompleted; |
|
} |
|
|
|
|
|
#if 0 |
|
//----------------------------------------------------------------------------- |
|
// Purpose: estimate the current backlog time using previous execution time, |
|
// the number of outstanding items, and the number of running threads |
|
//----------------------------------------------------------------------------- |
|
uint64 CWorkThreadPool::GetCurrentBacklogTime() const |
|
{ |
|
if ( m_WorkThreads.Count() == 0 ) |
|
return 0; |
|
return ( m_pTSQueueToProcess->Count() * m_StatExecutionTime.GetUlAvg() ) / m_WorkThreads.Count(); |
|
} |
|
#endif |
|
|
|
|
|
int CWorkThreadPool::AddWorkThread( CWorkThread *pThread ) |
|
{ |
|
AUTO_LOCK( m_WorkThreadMutex ); |
|
Assert( pThread ); |
|
return m_WorkThreads.AddToTail( pThread ); |
|
} |
|
|
|
|
|
void CWorkThreadPool::StartWorkThread( CWorkThread *pWorkThread, int iName ) |
|
{ |
|
char rgchThreadName[32]; |
|
Q_snprintf( rgchThreadName, sizeof( rgchThreadName ), "%s:%d", m_szThreadNamePfx, iName ); |
|
pWorkThread->SetName( rgchThreadName ); |
|
if ( !pWorkThread->Start() ) |
|
EmitError( SPEW_THREADS, "CWorkThreadPool::StartWorkThread: Thread creation failed.\n" ); |
|
} |
|
|
|
|
|
void CWorkThreadPool::StartWorkThreads() |
|
{ |
|
m_bThreadsInitialized = true; |
|
if ( 0 == m_WorkThreads.Count() ) |
|
{ |
|
EmitWarning( SPEW_THREADS, 2, "CWorkThreadPool::StartWorkThreads: called with no threads in the pool, this is probably a bug.\n" ); |
|
return; |
|
} |
|
m_bExiting = false; |
|
m_cThreadsRunning = 0; |
|
AUTO_LOCK( m_WorkThreadMutex ); |
|
FOR_EACH_VEC( m_WorkThreads, i ) |
|
{ |
|
StartWorkThread( m_WorkThreads[i], i ); |
|
} |
|
|
|
// XXX why? |
|
while ( m_cThreadsRunning == (uint) 0 ) |
|
{ |
|
ThreadSleep( 1 ); |
|
} |
|
} |
|
|
|
|
|
//----------------------------------------------------------------------------- |
|
// Purpose: stops whatever work threads we're running |
|
// this must be called before the thread pool object is destroyed |
|
//----------------------------------------------------------------------------- |
|
void CWorkThreadPool::StopWorkThreads() |
|
{ |
|
// indicate that we're shutting down; |
|
// don't accept more work in this thread |
|
m_bExiting = true; |
|
|
|
AUTO_LOCK( m_WorkThreadMutex ); |
|
|
|
FOR_EACH_VEC( m_WorkThreads, i ) |
|
{ |
|
m_WorkThreads[i]->m_bExitThread = true; |
|
m_WorkThreads[i]->Cancel(); |
|
} |
|
|
|
// loop until all threads are dead |
|
while ( true ) |
|
{ |
|
// This thread already holds the mutex; recursive try-lock should always succeed |
|
DbgVerify( BTryDeleteExitedWorkerThreads() ); |
|
|
|
if ( m_WorkThreads.Count() == 0 ) |
|
break; |
|
|
|
// Keep waking up threads until they're all dead. |
|
m_EventNewWorkItem.Set(); |
|
|
|
#ifdef _PS3 |
|
// call to abort any running call to gethostbyname(). |
|
// this is called over all the remaining work threads, while |
|
// waiting for the rest of the work threads to finish so that they won't |
|
// spuriously block on new calls to gethostbyname() as the |
|
// sys_net_abort_resolver call only stops the next call to the |
|
// network API, not any future calls. |
|
|
|
FOR_EACH_VEC( m_WorkThreads, iPS3 ) |
|
{ |
|
// PS3 hack to abort gethostbyname() calls that may be blocking... |
|
sys_net_abort_resolver( m_WorkThreads[ iPS3 ]->GetThreadID(), SYS_NET_ABORT_STRICT_CHECK ); |
|
} |
|
#endif |
|
|
|
const uint k_uJoinTimeoutMillisec = 10000; // 10 seconds seems pretty arbitrary. |
|
|
|
CWorkThread *pWorkThread = m_WorkThreads[0]; |
|
bool bJoined = pWorkThread->Join( k_uJoinTimeoutMillisec ); |
|
if ( !bJoined ) |
|
{ |
|
// Print thread id as a pointer for cross-platform compatibility |
|
EmitWarning( SPEW_THREADS, 2, "Thread \"%s\" (ID %p) failed to shut down", pWorkThread->GetName(), (void*)pWorkThread->GetThreadID() ); |
|
} |
|
else |
|
{ |
|
// Succesful join means that the thread has terminated. |
|
if ( !pWorkThread->m_bFinished ) |
|
{ |
|
// This would be a logic error in the thread proc if it ever tripped. |
|
AssertMsg( false, "pWorkThread->m_bFinished is false but thread is not running" ); |
|
// Recover by flagging the thread as potentially eligable for deletion, since it's dead. |
|
pWorkThread->m_bFinished = true; |
|
} |
|
} |
|
} |
|
|
|
Assert( m_WorkThreads.Count() == 0 && m_cThreadsRunning == (uint32) 0 ); |
|
} |
|
|
|
|
|
//----------------------------------------------------------------------------- |
|
// Purpose: sees if we have a non-zero number of work threads, |
|
// or a non-zero number of active threads |
|
//----------------------------------------------------------------------------- |
|
bool CWorkThreadPool::HasWorkItemsToProcess() const |
|
{ |
|
return ( m_pTSQueueToProcess->Count() > 0 ) |
|
|| ( m_cActiveThreads > 0 ); |
|
} |
|
|
|
|
|
//----------------------------------------------------------------------------- |
|
// Purpose: sets dynamic thread construction |
|
//----------------------------------------------------------------------------- |
|
void CWorkThreadPool::SetWorkThreadAutoConstruct( int cMaxThreads, IWorkThreadFactory *pWorkThreadConstructor ) |
|
{ |
|
AUTO_LOCK( m_WorkThreadMutex ); |
|
|
|
m_bThreadsInitialized = true; |
|
m_bAutoCreateThreads = true; |
|
m_cMaxThreads = MAX( 1, cMaxThreads ); |
|
m_pWorkThreadConstructor = pWorkThreadConstructor; |
|
|
|
// If we have too many threads now, mark some to exit next time they loop. |
|
for ( int i = m_cMaxThreads; i < m_WorkThreads.Count(); i++ ) |
|
{ |
|
m_WorkThreads[i]->m_bExitThread = true; |
|
} |
|
} |
|
|
|
|
|
//----------------------------------------------------------------------------- |
|
// Purpose: Adds a work item |
|
// Output: true if successful, |
|
// false if a low priority work item is not added due to a busy system |
|
// false if this work pool is shutting down and work isn't being accepted |
|
// NOTE: Adding normal priority items should always succeed |
|
//----------------------------------------------------------------------------- |
|
bool CWorkThreadPool::AddWorkItem( CWorkItem *pWorkItem ) |
|
{ |
|
Assert( !m_bExiting ); |
|
if ( m_bExiting ) |
|
return false; |
|
|
|
if ( m_bEnsureOutputOrdering ) |
|
{ |
|
AssertMsg( pWorkItem->m_bResubmit == false, "CWorkThreadPool can't support item auto resubmission when ensuring output ordering" ); |
|
} |
|
|
|
// if we're in auto-create mode, make sure we have enough threads running |
|
if ( m_bAutoCreateThreads && m_WorkThreads.Count() < m_cMaxThreads ) |
|
{ |
|
int cPendingItems = m_pTSQueueToProcess->Count(); |
|
|
|
// we shouldn't get more than 12 items queued per already existing thread, otherwise we |
|
// want to create a new thread to help us keep up. |
|
if ( m_WorkThreads.Count() < 1 || m_WorkThreads.Count() * 12 < ( cPendingItems + 1 ) ) |
|
{ |
|
if ( m_WorkThreads.Count() >= 2 && !m_LimitTimerCreateNewThreads.BLimitReached() ) |
|
{ |
|
// Don't create more yet, we don't want to create them too fast |
|
} |
|
else |
|
{ |
|
// create another thread |
|
CWorkThread *pWorkThread = NULL; |
|
if ( m_pWorkThreadConstructor ) |
|
{ |
|
pWorkThread = m_pWorkThreadConstructor->CreateWorkerThread( this ); |
|
} |
|
else |
|
{ |
|
pWorkThread = new CWorkThread( this ); |
|
} |
|
if( pWorkThread != NULL ) |
|
{ |
|
int iName = AddWorkThread( pWorkThread ); |
|
StartWorkThread( pWorkThread, iName ); |
|
} |
|
m_LimitTimerCreateNewThreads.SetLimit( 250*k_nThousand ); |
|
} |
|
} |
|
} |
|
|
|
// |
|
// Do we actually have any threads ? If creating threads can fail, then maybe we don't ! |
|
// In that case, this WorkItem is not going to run ! |
|
// |
|
if ( m_WorkThreads.Count() == 0 ) |
|
{ |
|
Assert(false); |
|
return false ; |
|
} |
|
|
|
|
|
// WARNING: We need to call pWorkItem AddRef() and Release() at all entry/exit points for the thread pool system. |
|
pWorkItem->AddRef(); |
|
|
|
pWorkItem->m_ulSequenceNumber = (++m_ulLastUsedSequenceNumber); |
|
m_pTSQueueToProcess->PushItem( pWorkItem ); |
|
|
|
if ( !BNeverSetEventOnAdd() && m_cActiveThreads == 0 ) |
|
{ |
|
VPROF_BUDGET( "SetEvent()", VPROF_BUDGETGROUP_THREADINGMAIN ); |
|
m_EventNewWorkItem.Set(); |
|
} |
|
|
|
return true; |
|
} |
|
|
|
|
|
CWorkItem *CWorkThreadPool::GetNextCompletedWorkItem( ) |
|
{ |
|
CWorkItem *pWorkItem = NULL; |
|
|
|
// Use a while loop just in case ref counts get screwed up and an item gets deleted when we release our reference to it |
|
while ( m_pTSQueueCompleted->PopItem( &pWorkItem ) ) |
|
{ |
|
// WARNING: We need to call workitem AddRef() and Release() at all entry/exit points for the thread pool system. |
|
// Release() returns the current refcount of the object (after decrementing it by one) and should be non-zero unless the |
|
// the caller has released it already. |
|
if ( pWorkItem != NULL && pWorkItem->Release() > 0 ) |
|
{ |
|
return pWorkItem; |
|
} |
|
} |
|
|
|
return NULL; |
|
} |
|
|
|
|
|
//----------------------------------------------------------------------------- |
|
// Purpose: gets the next work item to process. This non-blocking function |
|
// returns NULL immediately if there's nothing left in the queue. |
|
// otherwise, a pointer to the next CWorkItem. |
|
//----------------------------------------------------------------------------- |
|
CWorkItem *CWorkThreadPool::GetNextWorkItemToProcess( ) |
|
{ |
|
CWorkItem *pWorkItem = NULL; |
|
|
|
if ( m_pTSQueueToProcess->Count() && m_pTSQueueToProcess->PopItem( &pWorkItem ) ) |
|
{ |
|
return pWorkItem; |
|
} |
|
|
|
return NULL; |
|
} |
|
|
|
|
|
bool CWorkThreadPool::BDispatchCompletedWorkItems( const CLimitTimer &limitTimer, CJobMgr *pJobMgr ) |
|
{ |
|
BTryDeleteExitedWorkerThreads(); |
|
|
|
CWorkItem *pWorkItem = GetNextCompletedWorkItem( ); |
|
while ( pWorkItem != NULL ) |
|
{ |
|
uint64 ulSequenceNumber = pWorkItem->m_ulSequenceNumber; |
|
// NOTE: despite its name, this YIELDS - the target job |
|
// is resumed, and we resume here. |
|
if ( !pWorkItem->DispatchCompletedWorkItem( pJobMgr ) ) |
|
{ |
|
EmitWarning( SPEW_THREADS, 2, "Work Item for Work Pool %s completed but job no longer existed to notify\n", m_szThreadNamePfx == NULL ? "UNKNOWN" :m_szThreadNamePfx ); |
|
AssertMsg1( m_bMayHaveJobTimeouts, "Work Item for Work Pool %s completed but job no longer existed to notify", m_szThreadNamePfx == NULL ? "UNKNOWN" :m_szThreadNamePfx ); |
|
} |
|
|
|
// pWorkItem was released by DispatchCompletedWorkItem |
|
m_ulLastDispatchedSequenceNumber = ulSequenceNumber; |
|
if ( limitTimer.BLimitReached() ) |
|
break; |
|
|
|
pWorkItem = GetNextCompletedWorkItem( ); |
|
} |
|
|
|
return ( GetCompletedWorkItemCount() > 0 ); |
|
} |
|
|
|
|
|
//----------------------------------------------------------------------------- |
|
// Purpose: delete any thread objects that have exited |
|
// we'll make sure the thread has actually ended; |
|
// if they haven't, they'll remain in the threads to delete list |
|
//----------------------------------------------------------------------------- |
|
bool CWorkThreadPool::BTryDeleteExitedWorkerThreads() |
|
{ |
|
if ( m_WorkThreadMutex.TryLock() ) |
|
{ |
|
if ( m_cThreadsRunning < (uint) m_WorkThreads.Count() ) |
|
{ |
|
FOR_EACH_VEC_BACK( m_WorkThreads, i ) |
|
{ |
|
CWorkThread *pWorkThread = m_WorkThreads[i]; |
|
if ( pWorkThread->m_bFinished && !pWorkThread->IsThreadRunning() ) |
|
{ |
|
m_WorkThreads.FastRemove( i ); |
|
delete pWorkThread; |
|
} |
|
} |
|
} |
|
m_WorkThreadMutex.Unlock(); |
|
return true; |
|
} |
|
return false; |
|
} |
|
|
|
|
|
bool CWorkItem::DispatchCompletedWorkItem( CJobMgr *pJobMgr ) |
|
{ |
|
// Check if this work item needs to signal a job |
|
if ( pJobMgr && k_GIDNil != m_JobID ) |
|
{ |
|
if ( !pJobMgr->BRouteWorkItemCompletedIfExists( m_JobID, m_bCanceled ) ) |
|
return false; |
|
} |
|
else if ( k_GIDNil != m_JobID ) |
|
{ |
|
// This should never happen since we have already released our reference to the work item |
|
// and the calling job should have released its ref when it exited |
|
AssertMsg( false, "CWorkItem::DispatchCompletedWorkItem: got a work item with no job ID" ); |
|
} |
|
|
|
return true; |
|
} |
|
|
|
|
|
//----------------------------------------------------------------------------- |
|
// Purpose: Called by the worker thread when it finishes an individual work item |
|
// This function will see if our work is meant to be well-ordred; if so, |
|
// it will do the necessary work to ensure ordering. |
|
// |
|
// It adds the item to the completed work item list so |
|
// the pool owner can retrieve it and checks to see if any threads |
|
// deserve to be shut down. |
|
//----------------------------------------------------------------------------- |
|
void CWorkThreadPool::OnWorkItemCompleted( CWorkItem *pWorkItem ) |
|
{ |
|
if ( sm_pWorkItemsCompletedSignal != NULL ) |
|
sm_pWorkItemsCompletedSignal->Signal(); |
|
|
|
if ( !m_bEnsureOutputOrdering ) |
|
{ |
|
// Since we aren't locking this sequence number could get screwed up a bit, but it's |
|
// pretty meaningless if ensure output ordering if off anyway... |
|
m_ulLastCompletedSequenceNumber = pWorkItem->m_ulSequenceNumber; |
|
m_pTSQueueCompleted->PushItem( pWorkItem ); |
|
} |
|
else |
|
{ |
|
// In the ordered case we need to lock completely here since we'll be moving around between |
|
// various data structures and also need to ensure the ordering of items in the TS queue |
|
m_MutexOnItemCompletedOrdered.Lock(); |
|
if ( m_ulLastCompletedSequenceNumber + 1 == pWorkItem->m_ulSequenceNumber ) |
|
{ |
|
m_ulLastCompletedSequenceNumber = pWorkItem->m_ulSequenceNumber; |
|
m_pTSQueueCompleted->PushItem( pWorkItem ); |
|
|
|
// We walk the vector multiple times, but it should be very short as items are likely to come in |
|
// close to in order, just mixed up a little if we have lots of threads or one item is much more |
|
// costly than others. |
|
bool bFoundNext = false; |
|
do |
|
{ |
|
bFoundNext = false; |
|
FOR_EACH_VEC( m_vecCompletedAndWaiting, i ) |
|
{ |
|
CWorkItem *pWaiting = m_vecCompletedAndWaiting[i]; |
|
if ( m_ulLastCompletedSequenceNumber + 1 == pWaiting->m_ulSequenceNumber ) |
|
{ |
|
m_ulLastCompletedSequenceNumber = pWaiting->m_ulSequenceNumber; |
|
m_pTSQueueCompleted->PushItem( pWaiting ); |
|
m_vecCompletedAndWaiting.FastRemove( i ); |
|
bFoundNext = true; |
|
break; |
|
} |
|
} |
|
} while ( bFoundNext == true ); |
|
} |
|
else |
|
{ |
|
m_vecCompletedAndWaiting.AddToTail( pWorkItem ); |
|
} |
|
m_MutexOnItemCompletedOrdered.Unlock(); |
|
} |
|
} |
|
|
|
|
|
//----------------------------------------------------------------------------- |
|
// Purpose: return the count of items we've queued to process |
|
//----------------------------------------------------------------------------- |
|
int CWorkThreadPool::GetWorkItemToProcessCount() const |
|
{ |
|
return m_pTSQueueToProcess->Count(); |
|
} |
|
|
|
|
|
//----------------------------------------------------------------------------- |
|
// Purpose: return the count of items we've completed but not notified the consumer about |
|
//----------------------------------------------------------------------------- |
|
int CWorkThreadPool::GetCompletedWorkItemCount() const |
|
{ |
|
int nCount = m_pTSQueueCompleted->Count(); |
|
return nCount; |
|
} |
|
|
|
|
|
#ifdef DBGFLAG_VALIDATE |
|
//----------------------------------------------------------------------------- |
|
// Purpose: Validates memory |
|
//----------------------------------------------------------------------------- |
|
void CWorkThreadPool::Validate( CValidator &validator, const char *pchName ) |
|
{ |
|
VALIDATE_SCOPE(); |
|
AUTO_LOCK( m_WorkThreadMutex ); |
|
|
|
ValidateObj( m_WorkThreads ); |
|
FOR_EACH_VEC( m_WorkThreads, iWorkThread ) |
|
{ |
|
m_WorkThreads[ iWorkThread ]->Suspend(); |
|
ValidatePtr( m_WorkThreads[ iWorkThread ] ); |
|
} |
|
|
|
ValidateAlignedPtr( m_pTSQueueCompleted ); |
|
ValidateAlignedPtr( m_pTSQueueToProcess ); |
|
ValidateObj( m_vecCompletedAndWaiting ); |
|
FOR_EACH_VEC( m_vecCompletedAndWaiting, j ) |
|
{ |
|
ValidatePtr( m_vecCompletedAndWaiting.Element( j ) ); |
|
} |
|
|
|
FOR_EACH_VEC( m_WorkThreads, iWorkThread ) |
|
{ |
|
m_WorkThreads[ iWorkThread ]->Resume(); |
|
} |
|
|
|
#if 0 |
|
ValidateObj( m_StatExecutionTime ); |
|
ValidateObj( m_StatWaitTime ); |
|
#endif |
|
} |
|
#endif // DBGFLAG_VALIDATE |
|
|
|
} // namespace GCSDK
|
|
|