You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
386 lines
13 KiB
386 lines
13 KiB
//========= Copyright Valve Corporation, All rights reserved. ============// |
|
// |
|
// Purpose: A thread pool implementation. You give it CWorkItems, |
|
// it processes them asynchronously, and hands them back to you when they've |
|
// been completed. |
|
// |
|
// To declare a queue, provide the implementation of a CWorkItem subtype, |
|
// the thread name prefix for threads in the pool, and the number of work |
|
// threads you want. |
|
// |
|
// CNet uses this class to offload encryption to a separate thread, |
|
// so that's a good place to start looking for usage examples. |
|
// |
|
//============================================================================= |
|
|
|
#ifndef WORKTHREADPOOL_H |
|
#define WORKTHREADPOOL_H |
|
#ifdef _WIN32 |
|
#pragma once |
|
#endif |
|
|
|
#include <refcount.h> |
|
#include <reliabletimer.h> |
|
#include "jobtime.h" |
|
|
|
// forward declaration for CTSQueue which we can't statically allocate as our member |
|
// because of alignment issues on Win64 |
|
template <class T, bool bTestOptimizer> |
|
class CTSQueue; |
|
|
|
namespace GCSDK { |
|
|
|
// forward declarations |
|
class CWorkThread; |
|
class CJobMgr; |
|
|
|
|
|
// these functions return pointers to fixed string in the code section. We need this for VPROF nodes |
|
#define DECLARE_WORK_ITEM( classname ) \ |
|
virtual const char* GetDispatchCompletedName() const { return #classname"::DispatchCompleted"; } \ |
|
virtual const char* GetThreadProcessName() const { return #classname"::ThreadProcess"; } |
|
|
|
|
|
//----------------------------------------------------------------------------- |
|
// Purpose: Work item base class. Derive from this for specific work item types. |
|
// The derived type ideally should be self-contained with all data it |
|
// needs to perform the work. |
|
//----------------------------------------------------------------------------- |
|
class CWorkItem : public CRefCount |
|
{ |
|
public: |
|
CWorkItem() |
|
: m_JobID( k_GIDNil ), |
|
m_bRunning( false ), |
|
m_bResubmit( false ), |
|
m_bCanceled( false ), |
|
m_ulSequenceNumber( 0 ) |
|
{ |
|
m_jobtimeTimeout.SetLTime( 0 ); |
|
m_jobtimeQueued.SetToJobTime(); |
|
} |
|
|
|
CWorkItem( JobID_t jobID ) |
|
: m_JobID( jobID ), |
|
m_bRunning( false ), |
|
m_bResubmit( false ), |
|
m_bCanceled( false ), |
|
m_ulSequenceNumber( 0 ) |
|
{ |
|
m_jobtimeTimeout.SetLTime( 0 ); |
|
m_jobtimeQueued.SetToJobTime(); |
|
} |
|
|
|
CWorkItem( JobID_t jobID, int64 cTimeoutMicroseconds ) |
|
: m_JobID( jobID ), |
|
m_bRunning( false ), |
|
m_bResubmit( false ), |
|
m_bCanceled( false ), |
|
m_ulSequenceNumber( 0 ) |
|
{ |
|
SetPreExecuteTimeout( cTimeoutMicroseconds ); |
|
m_jobtimeQueued.SetToJobTime(); |
|
} |
|
|
|
void SetJobID( JobID_t jobID ) |
|
{ |
|
Assert(jobID != k_GIDNil) ; |
|
m_JobID = jobID; |
|
} |
|
JobID_t GetJobID() const { return m_JobID; } |
|
|
|
bool HasTimedOut() const { return m_jobtimeTimeout.LTime() != 0 && m_jobtimeTimeout.CServerMicroSecsPassed() > 0; } |
|
int64 WaitingTime() const { return m_jobtimeQueued.CServerMicroSecsPassed(); } |
|
void SetPreExecuteTimeout( int64 cMicroSeconds ) { m_jobtimeTimeout.SetFromJobTime( cMicroSeconds ); } |
|
bool BPreExecuteTimeoutSet( ) const { return m_jobtimeTimeout.LTime() != 0; } |
|
void ForceTimeOut() { m_jobtimeTimeout.SetFromJobTime( -1 );} |
|
bool BIsRunning() const { return m_bRunning; } // true if running right now |
|
bool WasCancelled() const { return m_bCanceled; } |
|
void SetCycleCount( CCycleCount& cycleCount ) { m_CycleCount = cycleCount ; } |
|
CCycleCount GetCycleCount() { return m_CycleCount; } |
|
uint64 GetSequenceNumber() { return m_ulSequenceNumber; } |
|
|
|
// Work threads can call this to force a work item to be reprocessed (added to the end of the process queue) |
|
void SetResubmit( bool bResubmit ) { m_bResubmit = bResubmit; } |
|
|
|
// these functions return pointers to fixed string in the code section. |
|
// We need this for VPROF nodes, you must use the DECLARE_WORK_ITEM macro |
|
virtual const char* GetDispatchCompletedName() const = 0; |
|
virtual const char* GetThreadProcessName() const = 0; |
|
|
|
// Return false if your operation failed in some way that you would want to know about |
|
// The CWorkThreadPool will count the failures. |
|
virtual bool ThreadProcess( CWorkThread *pThread ) = 0; // called by the worker thread |
|
virtual bool DispatchCompletedWorkItem( CJobMgr *jobMgr ); // called by main loop after item completed |
|
|
|
#ifdef DBGFLAG_VALIDATE |
|
virtual void Validate( CValidator &validator, const char *pchName ) {} // Validate our internal structures |
|
#endif |
|
|
|
protected: |
|
// note: destructor is private. This is a ref-counted object, private destructor ensures callers can't accidentally delete |
|
// directly, or declare on stack |
|
virtual ~CWorkItem() { } |
|
|
|
friend class CWorkThread; |
|
friend class CWorkThreadPool; |
|
uint64 m_ulSequenceNumber; // Sequence number for the work item, used when enforcing output ordering as matching input order |
|
CCycleCount m_CycleCount; // A record of how long it took to execute this particular work item ! |
|
|
|
private: |
|
bool m_bResubmit; // true if the item should be resubmitted after last run |
|
volatile bool m_bRunning; // true if the work item is running right now |
|
bool m_bCanceled; // true if the work was canceled due to timeout |
|
CJobTime m_jobtimeTimeout; // time at which this result is no longer valid, so it shouldn't start to be processed |
|
CJobTime m_jobtimeQueued; |
|
JobID_t m_JobID; |
|
}; |
|
|
|
// forward decl |
|
class CWorkThreadPool; |
|
|
|
//----------------------------------------------------------------------------- |
|
// Purpose: Generic work thread implementation, to be specialized if necessary |
|
//----------------------------------------------------------------------------- |
|
class CWorkThread : public CThread |
|
{ |
|
public: |
|
CWorkThread( CWorkThreadPool *pThreadPool ); |
|
CWorkThread( CWorkThreadPool *pThreadPool, const char *pszName ); |
|
|
|
virtual ~CWorkThread() |
|
{ |
|
} |
|
|
|
virtual int Run(); |
|
|
|
virtual void Cancel() |
|
{ |
|
} |
|
|
|
protected: |
|
CWorkThreadPool *m_pThreadPool; // parent pool |
|
volatile bool m_bExitThread; // set by CWorkThreadPool::StopWorkerThreads and possibly by subclasses of CWorkThread |
|
volatile bool m_bFinished; // set by CWorkThread::Run [note: must still check IsThreadRunning, and/or call Join] |
|
virtual void OnStart() { } |
|
virtual void OnExit() { } |
|
|
|
#ifdef DBGFLAG_VALIDATE |
|
public: |
|
virtual void Validate( CValidator &validator, const char *pchName ) |
|
{ |
|
VALIDATE_SCOPE(); |
|
}; |
|
#endif // DBGFLAG_VALIDATE |
|
|
|
friend class CWorkThreadPool; |
|
}; |
|
|
|
|
|
//----------------------------------------------------------------------------- |
|
// callback class to create work threads |
|
//----------------------------------------------------------------------------- |
|
class IWorkThreadFactory |
|
{ |
|
public: |
|
virtual CWorkThread *CreateWorkerThread( class CWorkThreadPool *pWorkThreadPool ) = 0; |
|
}; |
|
|
|
|
|
//----------------------------------------------------------------------------- |
|
// reusable trivial implementation of IWorkThreadFactory |
|
//----------------------------------------------------------------------------- |
|
template<class T> |
|
class CWorkThreadFactory : public IWorkThreadFactory |
|
{ |
|
public: |
|
virtual CWorkThread *CreateWorkerThread( class CWorkThreadPool *pWorkThreadPool ) |
|
{ |
|
return new T( pWorkThreadPool ); |
|
} |
|
}; |
|
|
|
|
|
//----------------------------------------------------------------------------- |
|
// Purpose: interface class for object that the WorkThreadPool can signal when |
|
// there are completed work items to process |
|
//----------------------------------------------------------------------------- |
|
class IWorkThreadPoolSignal |
|
{ |
|
public: |
|
virtual void Signal() = 0; |
|
}; |
|
|
|
|
|
//----------------------------------------------------------------------------- |
|
// Purpose: pool of work threads. |
|
//----------------------------------------------------------------------------- |
|
class CWorkThreadPool |
|
{ |
|
friend class CWorkThread; |
|
public: |
|
|
|
static void SetWorkItemCompletedSignal( IWorkThreadPoolSignal *pObject ) |
|
{ |
|
sm_pWorkItemsCompletedSignal = pObject; |
|
} |
|
|
|
|
|
CWorkThreadPool( const char *pszThreadNamePfx ); |
|
|
|
// eventually it might be nice to be able to resize these pools via console command |
|
// in that case, we'd want a constructor like this, and a PoolSize accessor/mutator pair |
|
// it makes this class much more complicated, however (growing the pool is easy, shrinking it |
|
// is less easy) so we'll punt for now. |
|
/* CWorkThreadPool( const char *pszName = "unnamed thread" ) : CWorkThreadPool( pszName, -1 ); */ |
|
|
|
virtual ~CWorkThreadPool(); |
|
|
|
// Setting this will ensure that items of the same priority complete and get dispatched in the same order |
|
// they are added to the threadpool. This has a small additional locking overhead and can increase latency |
|
// as items that are actually completed out-of-order have to queue waiting on earlier items. |
|
void SetEnsureOutputOrdering( bool bEnsureOutputOrdering ) { m_bEnsureOutputOrdering = bEnsureOutputOrdering; } |
|
|
|
void AllowTimeouts( bool bMayHaveJobTimeouts ) { m_bMayHaveJobTimeouts = bMayHaveJobTimeouts; } |
|
|
|
int AddWorkThread( CWorkThread *pThread ); |
|
void StartWorkThreads(); // gentlemen, start your engines |
|
void StopWorkThreads(); // stop work threads |
|
bool HasWorkItemsToProcess() const; |
|
|
|
// sets it to use dynamic worker thread construction |
|
// if pWorkThreadControl is NULL, just creates a standard CWorkThread object |
|
void SetWorkThreadAutoConstruct( int cMaxThreads, IWorkThreadFactory *pWorkThreadConstructor ); |
|
|
|
bool AddWorkItem( CWorkItem *pWorkItem ); // add a work item to the queue to process |
|
CWorkItem *GetNextCompletedWorkItem( ); // get next completed work item and it's priority if needed |
|
const char *GetThreadNamePrefix() const { return m_szThreadNamePfx; } |
|
|
|
void SetNeverSetEventOnAdd( bool bNeverSet ); |
|
bool BNeverSetEventOnAdd() { return m_bNeverSetOnAdd; } |
|
|
|
// get count of completed work items |
|
// can't be inline because of m_TSQueueCompleted type |
|
int GetCompletedWorkItemCount() const; |
|
|
|
// get count of work items to process |
|
// can't be inline because of m_TSQueueToProcess type |
|
int GetWorkItemToProcessCount() const; |
|
|
|
uint64 GetLastUsedSequenceNumber( ) const |
|
{ |
|
return m_ulLastUsedSequenceNumber; |
|
} |
|
|
|
uint64 GetLastCompletedSequenceNumber( ) const |
|
{ |
|
return m_ulLastCompletedSequenceNumber; |
|
} |
|
|
|
uint64 GetLastDispatchedSequenceNumber( ) const |
|
{ |
|
return m_ulLastDispatchedSequenceNumber; |
|
} |
|
|
|
#if 0 |
|
uint64 GetAveExecutionTime() const |
|
{ |
|
return m_StatExecutionTime.GetUlAvg(); |
|
} |
|
uint64 GetAveWaitTime() const |
|
{ |
|
return m_StatWaitTime.GetUlAvg(); |
|
} |
|
uint64 GetCurrentBacklogTime() const; |
|
#endif |
|
|
|
int CountCompletedSuccess() const { return m_cSuccesses; } |
|
int CountRetries() const { return m_cRetries; } |
|
int CountCompletedFailed() const { return m_cFailures; } |
|
|
|
bool BDispatchCompletedWorkItems( const CLimitTimer &limitTimer, CJobMgr *pJobMgr ); |
|
bool BExiting() const { return m_bExiting; } |
|
|
|
int GetWorkerCount() const { return m_WorkThreads.Count(); } |
|
|
|
uint GetActiveThreadCount() const { return m_cActiveThreads; } |
|
|
|
// make sure you lock before using this |
|
const CWorkThread *GetWorkThread( int iIndex ) const |
|
{ |
|
Assert( iIndex >= 0 && iIndex < m_WorkThreads.Count() ); |
|
return m_WorkThreads[iIndex]; |
|
} |
|
|
|
protected: |
|
|
|
// STATICS |
|
static IWorkThreadPoolSignal *sm_pWorkItemsCompletedSignal; |
|
|
|
// MEMBERS |
|
CWorkItem *GetNextWorkItemToProcess( ); |
|
void StartWorkThread( CWorkThread *pWorkThread, int iName ); |
|
|
|
// meaningful thread name prefix |
|
char m_szThreadNamePfx[32]; |
|
// have we actually initialized the threadpool? |
|
bool m_bThreadsInitialized; |
|
|
|
// Incoming queue: queue of all work items to process |
|
// must be dynamically allocated for alignment requirements on Win64 |
|
CTSQueue< CWorkItem *, false > *m_pTSQueueToProcess; |
|
|
|
// Outgoing queues: queue of all completed work items |
|
// must be dynamically allocated for alignment requirements on Win64 |
|
CTSQueue< CWorkItem *, false > *m_pTSQueueCompleted; |
|
|
|
// Vectors of completed, but out of order and waiting work items, only used when bEnsureOutputOrdering == true |
|
CThreadMutex m_MutexOnItemCompletedOrdered; |
|
CUtlVector< CWorkItem * > m_vecCompletedAndWaiting; |
|
|
|
// Should we emit work items in the same order they are received (on a per priority basis) |
|
bool m_bEnsureOutputOrdering; |
|
|
|
// Sequence numbers |
|
uint64 m_ulLastUsedSequenceNumber; |
|
uint64 m_ulLastCompletedSequenceNumber; |
|
uint64 m_ulLastDispatchedSequenceNumber; |
|
|
|
bool m_bMayHaveJobTimeouts; |
|
CUtlVector< CWorkThread * > m_WorkThreads; |
|
CThreadMutex m_WorkThreadMutex; |
|
CInterlockedUInt m_cThreadsRunning; // how many threads are running |
|
volatile bool m_bExiting; // are we exiting |
|
CThreadEvent m_EventNewWorkItem; // event set when a new work item is available to process |
|
CInterlockedInt m_cActiveThreads; |
|
volatile bool m_bNeverSetOnAdd; |
|
|
|
bool m_bAutoCreateThreads; |
|
int m_cMaxThreads; |
|
IWorkThreadFactory *m_pWorkThreadConstructor; |
|
|
|
// override this method if you want to do any special handling of completed work items. Default implementation puts |
|
// work items in our completed item queue. |
|
virtual void OnWorkItemCompleted( CWorkItem *pWorkItem ); |
|
|
|
bool BTryDeleteExitedWorkerThreads(); |
|
|
|
int m_cSuccesses; |
|
int m_cFailures; |
|
int m_cRetries; |
|
#if 0 |
|
CStat m_StatExecutionTime; |
|
CStat m_StatWaitTime; |
|
#endif |
|
CLimitTimer m_LimitTimerCreateNewThreads; |
|
|
|
#ifdef DBGFLAG_VALIDATE |
|
public: |
|
void Validate( CValidator &validator, const char *pchName ); |
|
#endif |
|
}; |
|
|
|
} // namespace GCSDK |
|
|
|
|
|
#endif // WORKTHREAD_H
|
|
|