//========= Copyright Valve Corporation, All rights reserved. ============// // // Purpose: // // $NoKeywords: $ //============================================================================= #ifndef GC_JOBMGR_H #define GC_JOBMGR_H #ifdef _WIN32 #pragma once #endif #include "tier0/fasttimer.h" #include "tier1/utlpriorityqueue.h" #include "job.h" #include "workthreadpool.h" class GCConVar; #include "tier0/memdbgon.h" namespace GCSDK { #if defined(_DEBUG) // this is restricted to debug builds due to the performance cost // that could be changed by removing the expensive sm_listAllJobs.Find() command #define DEBUG_JOB_LIST #endif // defined(_DEBUG) struct JobStats_t { uint m_cJobsCurrent; uint m_cJobsTotal; uint m_cJobsFailed; uint64 m_cJobsTimedOut; // # of jobs timed out ever double m_flSumJobTimeMicrosec; double m_flSumSqJobTimeMicrosec; uint64 m_unMaxJobTimeMicrosec; uint m_cTimeslices; JobStats_t() { memset( this, 0, sizeof(JobStats_t) ); } }; struct JobStatsBucket_t { JobStatsBucket_t() { memset( this, 0, sizeof(JobStatsBucket_t) ); } char m_rgchName[64]; uint64 m_cCompletes; uint64 m_u64RunTimeMax; uint64 m_cTimeoutNetMsg; uint64 m_cLongInterYieldTime; uint64 m_cLocksAttempted; uint64 m_cLocksWaitedFor; uint64 m_cLocksFailed; uint64 m_cLocksLongHeld; uint64 m_cLocksLongWait; uint64 m_cWaitTimeout; uint64 m_u64JobDuration; uint64 m_cJobsPaused; uint64 m_cJobsFailed; uint64 m_u64RunTime; // use by ListJobs uint64 m_cPauseReasonNetworkMsg; uint64 m_cPauseReasonSleepForTime; uint64 m_cPauseReasonWaitingForLock; uint64 m_cPauseReasonYield; uint64 m_cPauseReasonSQL; uint64 m_cPauseReasonWorkItem; #ifdef DBGFLAG_VALIDATE void Validate( CValidator &validator, const char *pchName ) { VALIDATE_SCOPE(); } #endif }; enum EJobProfileAction { k_EJobProfileAction_ErrorReport = 0, k_EJobProfileAction_Start = 1, k_EJobProfileAction_Stop = 2, k_EJobProfileAction_Dump = 3, k_EJobProfileAction_Clear = 4, }; enum EJobProfileSortOrder { k_EJobProfileSortOrder_Alpha = 0, k_EJobProfileSortOrder_Count = 1, k_EJobProfileSortOrder_TotalRuntime = 2, }; struct JobProfileStats_t { int m_iJobProfileSort; CUtlMap< uint32, JobStatsBucket_t, int > *pmapStatsBucket; }; //----------------------------------------------------------------------------- // Purpose: This keeps track of all jobs that belong to a given hub. // It's primarily used for routing incoming messages to jobs. //----------------------------------------------------------------------------- class CJobMgr { public: // Constructors & destructors CJobMgr(); ~CJobMgr(); // gets the next available job ID JobID_t GetNewJobID(); // Set the thread count for the internal thread pool(s) void SetThreadPoolSize( uint cThreads ); // Run any sleeping jobs who's wakeup time has arrived and check for timeouts bool BFrameFuncRunSleepingJobs( CLimitTimer &limitTimer ); // Run any yielding jobs, even low priority ones bool BFrameFuncRunYieldingJobs( CLimitTimer &limitTimer ); // Route this message to an existing Job, or create a new one if that JobID does not exist bool BRouteMsgToJob( void *pParent, IMsgNetPacket *pNetPacket, const JobMsgInfo_t &jobMsgInfo ); // Adds a new Job to the mgr and generates a JobID for it. void InsertJob( CJob &job ); // Removes a Job from the mgr (the caller is still responsible for freeing it) void RemoveJob( CJob &job ); //called by a job that has just been started to place itself on the yield queue instead of running void AddDelayedJobToYieldList( CJob &job ); #ifdef GC // resumes the specified job if it is, in fact, waiting for a SQL query to return bool BResumeSQLJob( JobID_t jobID ); // yields waiting for a query response bool BYieldingRunQuery( CJob &job, CGCSQLQueryGroup *pQueryGroup, ESchemaCatalog eSchemaCatalog ); // SQL profiling enum ESQLProfileSort { k_ESQLProfileSortTotalTime, k_ESQLProfileSortTotalCount, k_ESQLProfileSortAvgTime, k_ESQLProfileSortName }; void StartSQLProfiling(); void StopSQLProfiling(); void DumpSQLProfile( ESQLProfileSort eSort ); #endif // returns true if we're running any jobs of the specified name // slow to call if lots of jobs are running, should only be used by tests bool BIsJobRunning( const char *pchJobName ); // passes a network msg directly to the specified job void PassMsgToJob( CJob &job, IMsgNetPacket *pNetPacket, const JobMsgInfo_t &jobMsgInfo ); // yields until a network message is received bool BYieldingWaitForMsg( CJob &job ); // yields for a set amount of time bool BYieldingWaitTime( CJob &job, uint32 cMicrosecondsToSleep ); // simple yield until Run() called again bool BYield( CJob &job ); // Yield only if job manager decides we need to bool BYieldIfNeeded( CJob &job, bool *pbYielded ); // Thread pool work item bool BYieldingWaitForWorkItem( CJob &job, const char *pszWorkItemName = NULL ); bool BRouteWorkItemCompleted( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ true, /* bResumeImmediately */ true ); } bool BRouteWorkItemCompletedIfExists( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ false, /* bResumeImmediately */ true ); } bool BRouteWorkItemCompletedDelayed( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ true, /* bResumeImmediately */ false ); } bool BRouteWorkItemCompletedIfExistsDelayed( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ false, /* bResumeImmediately */ false ); } void AddThreadedJobWorkItem( CWorkItem *pWorkItem ); void StopWorkThreads() { m_WorkThreadPool.StopWorkThreads(); } static int ProfileSortFunc( void *pCtx, const int *lhs, const int *rhs ); void ProfileJobs( EJobProfileAction ejobProfileAction, EJobProfileSortOrder iSortOrder = k_EJobProfileSortOrder_Alpha ); int DumpJobSummary(); void DumpJob( JobID_t jobID, int nPrintLocksMax = 20 ) const; int CountJobs() const; // counts currently active jobs void CheckThreadID(); // make sure we are still in the correct thread int CountYieldingJobs() const { return m_ListJobsYieldingRegPri.Count(); } // counts jobs currently in a yielding state bool HasOutstandingThreadPoolWorkItems(); void SetIsShuttingDown(); bool GetIsShuttingDown() const { return m_bIsShuttingDown; } void *GetMainMemoryDebugInfo() { return g_memMainDebugInfo.Base(); } #ifdef DBGFLAG_VALIDATE void Validate( CValidator &validator, const char *pchName ); // Validate our internal structures static void ValidateStatics( CValidator &validator, const char *pchName ); #endif /* DBGFLAG_VALIDATE */ // wakes up a job that was waiting on a lock void WakeupLockedJob( CJob &job ); // returns true if there is a job active with the specified ID bool BJobExists( JobID_t jobID ) const; // returns a job CJob *GetPJob( JobID_t jobID ); const CJob *GetPJob( JobID_t jobID ) const; JobStats_t& GetJobStats() { return m_JobStats; } // Access work thread pool directly CWorkThreadPool *AccessWorkThreadPool() { return &m_WorkThreadPool; } // Debug helpers // dumps a list of all running jobs across ALL job managers void DumpJobs( const char *pszJobName, int nMax, int nPrintLocksMax = 1 ) const; // cause a debug break in the given job static void DebugJob( int iJob ); // disable/enable yielding for debugging void SetPauseAllowed( bool bNewPauseAllowed ) { m_bDebugDisallowPause = !bNewPauseAllowed; } private: bool BRouteWorkItemCompletedInternal( JobID_t jobID, bool bWorkItemCanceled, bool bShouldExist, bool bResumeImmediately ); // Create a new job for this message bool BLaunchJobFromNetworkMsg( void *pParent, const JobMsgInfo_t &jobMsgInfo, IMsgNetPacket *pNetPacket ); // Internal add to yield list (looks at priority) void AddToYieldList( CJob &job ); // Get an IJob given a job ID and pause reason bool BGetIJob( JobID_t jobID, EJobPauseReason eJobPauseReason, bool bShouldExist, int *pIJob ); // Map containing all of our jobs CUtlMap<JobID_t, CJob *, int> m_MapJob; // jobs simply waiting until the next Run() struct JobYielding_t { JobID_t m_JobID; uint m_nIteration; }; CUtlLinkedList<JobYielding_t, int> m_ListJobsYieldingRegPri; bool BResumeYieldingJobs( CLimitTimer &limitTimer ); bool BResumeYieldingJobsFromList( CUtlLinkedList<JobYielding_t, int> &listJobsYielding, uint nCurrentIteration, CLimitTimer &limitTimer ); uint m_nCurrentYieldIterationRegPri; // jobs waiting on a timer struct JobSleeping_t { JobID_t m_JobID; CJobTime m_SWakeupTime; CJobTime m_STimeTouched; }; CUtlPriorityQueue<JobSleeping_t> m_QueueJobSleeping; bool BResumeSleepingJobs( CLimitTimer &limitTimer ); static bool JobSleepingLessFunc( JobSleeping_t const &lhs, JobSleeping_t const &rhs ); // timeout list of jobs, ordered from oldest to newest struct JobTimeout_t { JobID_t m_JobID; CJobTime m_STimePaused; CJobTime m_STimeTouched; uint32 m_cHeartbeatsBeforeTimeout; }; CUtlLinkedList<JobTimeout_t, int> m_ListJobTimeouts; CUtlMap<JobID_t, int, int> m_MapJobTimeoutsIndexByJobID; void PauseJob( CJob &job, EJobPauseReason eJobPauseReason ); void CheckForJobTimeouts( CLimitTimer &limitTimer ); void TimeoutJob( CJob &job ); bool m_bJobTimedOut; // thread pool usage, for running job functions in other threads CWorkThreadPool m_WorkThreadPool; void AccumulateStatsofJob( CJob &job ); void RecordOrphanedMessage( MsgType_t eMsg, JobID_t jobIDTarget ); // stats info JobStats_t m_JobStats; // static job registration static void RegisterJobType( const JobType_t *pJobType ); friend void Job_RegisterJobType( const JobType_t *pJobType ); JobID_t m_unNextJobID; uint m_unFrameFuncThreadID; // the thread is JobMgr is working in bool m_bProfiling; bool m_bIsShuttingDown; int m_cErrorsToReport; CUtlMap< uint32, JobStatsBucket_t, int > m_mapStatsBucket; CUtlMap<MsgType_t, int, int> m_mapOrphanMessages; CUtlMemory<unsigned char> g_memMainDebugInfo; #ifdef GC // sql profiling bool m_bSQLProfiling; CFastTimer m_sqlTimer; struct PendingSQLJob_t { int64 m_nStartMicrosec; int32 m_iBucket; }; struct SQLProfileBucket_t { int64 m_nTotalMicrosec; uint32 m_unCount; }; CUtlHashMapLarge<GID_t, PendingSQLJob_t> m_mapSQLQueriesInFlight; CUtlDict<SQLProfileBucket_t> m_dictSQLBuckets; struct SQLProfileCtx_t { ESQLProfileSort m_eSort; CUtlDict<SQLProfileBucket_t> *pdictBuckets; }; static int SQLProfileSortFunc( void *pCtx, const int *lhs, const int *rhs ); #endif #ifdef DEBUG_JOB_LIST // static job debug list static CUtlLinkedList<CJob *, int> sm_listAllJobs; #endif bool m_bDebugDisallowPause; }; //----------------------------------------------------------------------------- // Purpose: passthrough function just so the CJob internal data can be kept private //----------------------------------------------------------------------------- inline void Job_RegisterJobType( const JobType_t *pJobType ) { CJobMgr::RegisterJobType( pJobType ); } //----------------------------------------------------------------------------- // Purpose: passthrough function just so the CJob internal data can be kept private //----------------------------------------------------------------------------- inline void Job_SetJobType( CJob &job, const JobType_t *pJobType ) { job.m_pJobType = pJobType; } //----------------------------------------------------------------------------- // Purpose: job registration macro //----------------------------------------------------------------------------- #define GC_REG_JOB( parentclass, jobclass, jobname, msg, servertype ) \ GCSDK::CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ); \ static const GCSDK::JobType_t g_JobType_##jobclass = { jobname, (GCSDK::MsgType_t)msg, servertype, (GCSDK::JobCreationFunc_t)CreateJob_##jobclass }; \ GCSDK::CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ) \ { \ GCSDK::CJob *job = GCSDK::CJob::AllocateJob<jobclass>( pParent ); \ if ( job ) \ { \ Job_SetJobType( *job, &g_JobType_##jobclass ); \ if ( pvStartParam ) job->SetStartParam( pvStartParam ); \ } \ else \ { \ AssertMsg( job, "CJob::AllocateJob<" #jobclass "> returned NULL!" ); \ } \ return job; \ } \ static class CRegJob_##jobclass \ { \ public: CRegJob_##jobclass() \ { \ Job_RegisterJobType( &g_JobType_##jobclass ); \ } \ } g_RegJob_##jobclass; //----------------------------------------------------------------------------- // Purpose: job registration macro for job triggered by web api request //----------------------------------------------------------------------------- #define REG_WEBAPI_JOB( parentclass, jobclass, jobname, servertype ) \ CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ); \ static const JobType_t g_JobType_##jobclass = { jobname, k_EGCMsgInvalid, servertype, (JobCreationFunc_t)CreateJob_##jobclass }; \ CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ) \ { \ CJob *job = CJob::AllocateJob<jobclass>( pParent ); \ if ( job ) \ { \ Job_SetJobType( *job, &g_JobType_##jobclass ); \ if ( pvStartParam ) job->SetStartParam( pvStartParam ); \ } \ else \ { \ AssertMsg( job, "CJob::AllocateJob<" #jobclass "> returned NULL!" ); \ } \ return job; \ } \ static class CRegJob_##jobclass \ { \ public: CRegJob_##jobclass() \ { \ Job_RegisterJobType( &g_JobType_##jobclass ); \ } \ } g_RegJob_##jobclass; } // namespace GCSDK #include "tier0/memdbgoff.h" #endif // GC_JOBMGR_H