//========= Copyright Valve Corporation, All rights reserved. ============//
//
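// Purpose: Declares CJobMgr, which keeps track of the jobs belonging to a hub,
//			together with the job statistics structures and the job
//			registration macros that go with it.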
//
// $NoKeywords: $
//=============================================================================

#ifndef GC_JOBMGR_H
#define GC_JOBMGR_H
#ifdef _WIN32
#pragma once
#endif

#include "tier0/fasttimer.h"
#include "tier1/utlpriorityqueue.h"
#include "job.h"
#include "workthreadpool.h"
class GCConVar;

#include "tier0/memdbgon.h"

namespace GCSDK
{

#if defined(_DEBUG)
// The job debug list is restricted to debug builds because of its performance cost.
// That restriction could be lifted by removing the expensive sm_listAllJobs.Find() call.
#define DEBUG_JOB_LIST
#endif // defined(_DEBUG)

// Aggregate job counters kept by a job manager (see CJobMgr::GetJobStats()).
// Plain data only: the constructor relies on being able to memset the object.
struct JobStats_t
{
	// every counter and accumulator starts out at zero
	JobStats_t()
	{
		memset( this, 0, sizeof( *this ) );
	}

	// job counts
	uint m_cJobsCurrent;
	uint m_cJobsTotal;
	uint m_cJobsFailed;
	uint64 m_cJobsTimedOut;		// # of jobs timed out ever

	// job run time accumulators; going by the names these are a sum, a sum
	// of squares and a maximum, all in microseconds - TODO confirm in the .cpp
	double m_flSumJobTimeMicrosec;
	double m_flSumSqJobTimeMicrosec;
	uint64 m_unMaxJobTimeMicrosec;

	uint m_cTimeslices;
};

// One named bucket of job statistics. CJobMgr keeps these in a map keyed by a
// uint32 (m_mapStatsBucket). Plain data only: the constructor relies on being
// able to memset the object.
struct JobStatsBucket_t
{
	char m_rgchName[64];
	uint64 m_cCompletes;
	uint64 m_u64RunTimeMax;
	uint64 m_cTimeoutNetMsg;
	uint64 m_cLongInterYieldTime;
	uint64 m_cLocksAttempted;
	uint64 m_cLocksWaitedFor;
	uint64 m_cLocksFailed;
	uint64 m_cLocksLongHeld;
	uint64 m_cLocksLongWait;
	uint64 m_cWaitTimeout;
	uint64 m_u64JobDuration;
	uint64 m_cJobsPaused;
	uint64 m_cJobsFailed;
	uint64 m_u64RunTime;

	// used by ListJobs
	uint64 m_cPauseReasonNetworkMsg;
	uint64 m_cPauseReasonSleepForTime;
	uint64 m_cPauseReasonWaitingForLock;
	uint64 m_cPauseReasonYield;
	uint64 m_cPauseReasonSQL;
	uint64 m_cPauseReasonWorkItem;

	// zero-fills the whole bucket, name buffer included
	JobStatsBucket_t()
	{
		memset( this, 0, sizeof( *this ) );
	}

#ifdef DBGFLAG_VALIDATE
	// nothing beyond the validation scope is set up here
	void Validate( CValidator &validator, const char *pchName )
	{
		VALIDATE_SCOPE();
	}
#endif
};

// Action selector passed as the first argument of CJobMgr::ProfileJobs().
// The numeric values are spelled out explicitly; keep them stable.
enum EJobProfileAction
{
	k_EJobProfileAction_ErrorReport	= 0,
	k_EJobProfileAction_Start		= 1,
	k_EJobProfileAction_Stop		= 2,
	k_EJobProfileAction_Dump		= 3,
	k_EJobProfileAction_Clear		= 4
};

// Sort order selector passed as the second argument of CJobMgr::ProfileJobs(),
// where it defaults to k_EJobProfileSortOrder_Alpha.
enum EJobProfileSortOrder
{
	k_EJobProfileSortOrder_Alpha		= 0,
	k_EJobProfileSortOrder_Count		= 1,
	k_EJobProfileSortOrder_TotalRuntime	= 2
};

// Pairs a sort selector with a pointer to a stats bucket map of the same type
// as CJobMgr::m_mapStatsBucket.
// NOTE(review): presumably this is the context handed to CJobMgr::ProfileSortFunc()
// through its void *pCtx parameter - confirm against the .cpp.
struct JobProfileStats_t
{
	// presumably an EJobProfileSortOrder value stored as an int - TODO confirm
	int m_iJobProfileSort;
	// not initialized by this struct; the ownership of the map is not visible here
	CUtlMap< uint32, JobStatsBucket_t, int > *pmapStatsBucket;
};

//-----------------------------------------------------------------------------
// Purpose: This keeps track of all jobs that belong to a given hub.
//			It's primarily used for routing incoming messages to jobs.
//			It also holds the yield list, sleep queue and timeout list used
//			for paused jobs, a work thread pool, and job statistics.
//-----------------------------------------------------------------------------
class CJobMgr
{ 
public:
	// Constructors & destructors
	CJobMgr();
	~CJobMgr();

	// gets the next available job ID
	JobID_t GetNewJobID();

	// Set the thread count for the internal thread pool(s)
	void SetThreadPoolSize( uint cThreads );

	// Run any sleeping jobs whose wakeup time has arrived and check for timeouts
	bool BFrameFuncRunSleepingJobs( CLimitTimer &limitTimer );

	// Run any yielding jobs, even low priority ones
	bool BFrameFuncRunYieldingJobs( CLimitTimer &limitTimer );

	// Route this message to an existing Job, or create a new one if that JobID does not exist
	bool BRouteMsgToJob( void *pParent, IMsgNetPacket *pNetPacket, const JobMsgInfo_t &jobMsgInfo );

	// Adds a new Job to the mgr and generates a JobID for it.
	void InsertJob( CJob &job );

	// Removes a Job from the mgr (the caller is still responsible for freeing it)
	void RemoveJob( CJob &job );

	// called by a job that has just been started to place itself on the yield queue instead of running
	void AddDelayedJobToYieldList( CJob &job );

#ifdef GC
	// resumes the specified job if it is, in fact, waiting for a SQL query to return
	bool BResumeSQLJob( JobID_t jobID );

	// yields waiting for a query response
	bool BYieldingRunQuery( CJob &job, CGCSQLQueryGroup *pQueryGroup, ESchemaCatalog eSchemaCatalog );

	// SQL profiling
	// sort key accepted by DumpSQLProfile()
	enum ESQLProfileSort
	{
		k_ESQLProfileSortTotalTime,
		k_ESQLProfileSortTotalCount,
		k_ESQLProfileSortAvgTime,
		k_ESQLProfileSortName
	};

	void StartSQLProfiling();
	void StopSQLProfiling();
	void DumpSQLProfile( ESQLProfileSort eSort );
#endif

	// returns true if we're running any jobs of the specified name
	// slow to call if lots of jobs are running, should only be used by tests
	bool BIsJobRunning( const char *pchJobName );

	// passes a network msg directly to the specified job
	void PassMsgToJob( CJob &job, IMsgNetPacket *pNetPacket, const JobMsgInfo_t &jobMsgInfo );

	// yields until a network message is received
	bool BYieldingWaitForMsg( CJob &job );

	// yields for a set amount of time
	bool BYieldingWaitTime( CJob &job, uint32 cMicrosecondsToSleep );

	// simple yield until Run() called again
	bool BYield( CJob &job );

	// Yield only if job manager decides we need to
	bool BYieldIfNeeded( CJob &job, bool *pbYielded );

	// Thread pool work item
	bool BYieldingWaitForWorkItem( CJob &job, const char *pszWorkItemName = NULL );
	// Work item completion routing. The four variants below all forward to
	// BRouteWorkItemCompletedInternal() and differ only in the flags they pass:
	//   bShouldExist       - false for the "IfExists" variants, true otherwise
	//   bResumeImmediately - false for the "Delayed" variants, true otherwise
	bool BRouteWorkItemCompleted( JobID_t jobID, bool bWorkItemCanceled )	{ return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ true, /* bResumeImmediately */ true ); }
	bool BRouteWorkItemCompletedIfExists( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ false, /* bResumeImmediately */ true ); }
	bool BRouteWorkItemCompletedDelayed( JobID_t jobID, bool bWorkItemCanceled )	{ return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ true, /* bResumeImmediately */ false ); }
	bool BRouteWorkItemCompletedIfExistsDelayed( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ false, /* bResumeImmediately */ false ); }

	void AddThreadedJobWorkItem( CWorkItem *pWorkItem );
	// forwards to the internal work thread pool
	void StopWorkThreads() { m_WorkThreadPool.StopWorkThreads(); }

	// comparison callback that receives a context pointer
	// NOTE(review): pCtx is presumably a JobProfileStats_t - confirm in the .cpp
	static int ProfileSortFunc( void *pCtx, const int *lhs, const int *rhs );

	void ProfileJobs( EJobProfileAction ejobProfileAction, EJobProfileSortOrder iSortOrder = k_EJobProfileSortOrder_Alpha );
	int DumpJobSummary();
	void DumpJob( JobID_t jobID, int nPrintLocksMax = 20 ) const;
	int CountJobs() const;	// counts currently active jobs
	void CheckThreadID(); // make sure we are still in the correct thread
	int CountYieldingJobs() const { return m_ListJobsYieldingRegPri.Count(); } // counts jobs currently in a yielding state
	bool HasOutstandingThreadPoolWorkItems();
	
	void SetIsShuttingDown();
	bool GetIsShuttingDown() const { return m_bIsShuttingDown; }

	// returns the base address of the g_memMainDebugInfo member buffer
	void *GetMainMemoryDebugInfo() { return g_memMainDebugInfo.Base(); }

#ifdef DBGFLAG_VALIDATE
	void Validate( CValidator &validator, const char *pchName );		// Validate our internal structures
	static void ValidateStatics( CValidator &validator, const char *pchName );
#endif /* DBGFLAG_VALIDATE */

	// wakes up a job that was waiting on a lock
	void WakeupLockedJob( CJob &job );

	// returns true if there is a job active with the specified ID
	bool BJobExists( JobID_t jobID ) const;

	// returns a job
	CJob *GetPJob( JobID_t jobID );
	const CJob *GetPJob( JobID_t jobID ) const;

	// returns a mutable reference to the stats member, so callers can update it in place
	JobStats_t& GetJobStats() { return m_JobStats; }

	// Access work thread pool directly
	CWorkThreadPool *AccessWorkThreadPool() { return &m_WorkThreadPool; }

	// Debug helpers
	// dumps a list of all running jobs across ALL job managers
	void DumpJobs( const char *pszJobName, int nMax, int nPrintLocksMax = 1 ) const;
	// cause a debug break in the given job
	static void DebugJob( int iJob );

	// disable/enable yielding for debugging
	// (stored inverted: passing true clears m_bDebugDisallowPause)
	void SetPauseAllowed( bool bNewPauseAllowed ) { m_bDebugDisallowPause = !bNewPauseAllowed; }

private:

	// shared implementation behind the public BRouteWorkItemCompleted*() variants
	bool BRouteWorkItemCompletedInternal( JobID_t jobID, bool bWorkItemCanceled, bool bShouldExist, bool bResumeImmediately );

	// Create a new job for this message
	bool BLaunchJobFromNetworkMsg( void *pParent, const JobMsgInfo_t &jobMsgInfo, IMsgNetPacket *pNetPacket );

	// Internal add to yield list (looks at priority)
	void AddToYieldList( CJob &job );

	// Get an IJob given a job ID and pause reason
	bool BGetIJob( JobID_t jobID, EJobPauseReason eJobPauseReason, bool bShouldExist, int *pIJob );

	// Map containing all of our jobs
	CUtlMap<JobID_t, CJob *, int> m_MapJob;

	// jobs simply waiting until the next Run()
	struct JobYielding_t
	{
		JobID_t m_JobID;
		uint m_nIteration;
	};
	CUtlLinkedList<JobYielding_t, int> m_ListJobsYieldingRegPri;
	bool BResumeYieldingJobs( CLimitTimer &limitTimer );
	bool BResumeYieldingJobsFromList( CUtlLinkedList<JobYielding_t, int> &listJobsYielding, uint nCurrentIteration, CLimitTimer &limitTimer );
	uint m_nCurrentYieldIterationRegPri;

	// jobs waiting on a timer
	struct JobSleeping_t
	{
		JobID_t m_JobID;
		CJobTime m_SWakeupTime;
		CJobTime m_STimeTouched;
	};
	CUtlPriorityQueue<JobSleeping_t> m_QueueJobSleeping;
	bool BResumeSleepingJobs( CLimitTimer &limitTimer );
	// NOTE(review): presumably the ordering function for m_QueueJobSleeping - confirm in the .cpp
	static bool JobSleepingLessFunc( JobSleeping_t const &lhs, JobSleeping_t const &rhs );

	// timeout list of jobs, ordered from oldest to newest
	struct JobTimeout_t
	{
		JobID_t m_JobID;
		CJobTime m_STimePaused;
		CJobTime m_STimeTouched;
		uint32 m_cHeartbeatsBeforeTimeout;
	};
	CUtlLinkedList<JobTimeout_t, int> m_ListJobTimeouts;
	// NOTE(review): going by the name, maps a job ID to its index in m_ListJobTimeouts - confirm
	CUtlMap<JobID_t, int, int> m_MapJobTimeoutsIndexByJobID;
	void PauseJob( CJob &job, EJobPauseReason eJobPauseReason );
	void CheckForJobTimeouts( CLimitTimer &limitTimer );
	void TimeoutJob( CJob &job );
	bool m_bJobTimedOut;

	// thread pool usage, for running job functions in other threads
	CWorkThreadPool m_WorkThreadPool;

	void AccumulateStatsofJob( CJob &job );
	void RecordOrphanedMessage( MsgType_t eMsg, JobID_t jobIDTarget );

	// stats info
	JobStats_t m_JobStats;

	// static job registration
	// (private; reached from outside through the friend passthrough Job_RegisterJobType())
	static void RegisterJobType( const JobType_t *pJobType ); 
	friend void Job_RegisterJobType( const JobType_t *pJobType );

	JobID_t m_unNextJobID;
	uint m_unFrameFuncThreadID; // the thread the JobMgr is working in
	bool m_bProfiling;
	bool m_bIsShuttingDown;
	int m_cErrorsToReport;
	CUtlMap< uint32, JobStatsBucket_t, int > m_mapStatsBucket;
	CUtlMap<MsgType_t, int, int> m_mapOrphanMessages;
	// NOTE: despite the g_ prefix this is a per-instance member; its base
	// address is what GetMainMemoryDebugInfo() returns
	CUtlMemory<unsigned char> g_memMainDebugInfo;

#ifdef GC
	// sql profiling
	bool m_bSQLProfiling;
	CFastTimer	m_sqlTimer;
	
	struct PendingSQLJob_t
	{
		int64 m_nStartMicrosec;
		int32 m_iBucket;
	};

	struct SQLProfileBucket_t
	{
		int64 m_nTotalMicrosec;
		uint32 m_unCount;
	};

	CUtlHashMapLarge<GID_t, PendingSQLJob_t> m_mapSQLQueriesInFlight;
	CUtlDict<SQLProfileBucket_t> m_dictSQLBuckets;

	// pairs a sort key with a pointer to a bucket dictionary
	// NOTE(review): presumably the pCtx handed to SQLProfileSortFunc() - confirm in the .cpp
	struct SQLProfileCtx_t
	{
		ESQLProfileSort m_eSort;
		CUtlDict<SQLProfileBucket_t> *pdictBuckets;
	};
	static int SQLProfileSortFunc( void *pCtx, const int *lhs, const int *rhs );
#endif

#ifdef DEBUG_JOB_LIST
	// static job debug list
	static CUtlLinkedList<CJob *, int> sm_listAllJobs;
#endif

	// set to the inverse of the value passed to SetPauseAllowed()
	bool m_bDebugDisallowPause;
};


//-----------------------------------------------------------------------------
// Purpose: passthrough function so that CJobMgr::RegisterJobType() can stay
//			private; this function is declared a friend of CJobMgr and simply
//			forwards pJobType to it. Used by the job registration macros.
//-----------------------------------------------------------------------------
inline void Job_RegisterJobType( const JobType_t *pJobType )
{
	CJobMgr::RegisterJobType( pJobType );
}


//-----------------------------------------------------------------------------
// Purpose: passthrough function just so the CJob internal data can be kept private.
//			Stores pJobType in job.m_pJobType; the pointer is stored as-is,
//			not copied. Used by the job registration macros.
//			NOTE(review): presumably declared a friend of CJob in job.h - confirm.
//-----------------------------------------------------------------------------
inline void Job_SetJobType( CJob &job, const JobType_t *pJobType )
{
	job.m_pJobType = pJobType;
}


//-----------------------------------------------------------------------------
// Purpose: job registration macro
//
//	parentclass	- type of the pParent pointer taken by the creation function
//	jobclass	- class handed to CJob::AllocateJob<>; also pasted into the
//				  generated identifiers (CreateJob_<jobclass>, g_JobType_<jobclass>,
//				  CRegJob_<jobclass>, g_RegJob_<jobclass>)
//	jobname		- first initializer of the generated JobType_t
//	msg			- message type; the whole expression is cast to GCSDK::MsgType_t
//	servertype	- third initializer of the generated JobType_t
//
// Expands to:
//	1. CreateJob_<jobclass>(), which allocates the job, tags it with its
//	   JobType_t and forwards a non-NULL pvStartParam to SetStartParam().
//	   It asserts and returns NULL if the allocation fails.
//	2. a static const JobType_t describing the job.
//	3. a static object whose constructor calls Job_RegisterJobType(), so the
//	   job type is registered during static initialization.
//
// All GCSDK names are fully qualified, so the macro can be used outside the
// GCSDK namespace. The msg argument is parenthesized before the cast so that
// a compound expression is cast as a whole instead of only its first operand.
//-----------------------------------------------------------------------------
#define GC_REG_JOB( parentclass, jobclass, jobname, msg, servertype ) \
	GCSDK::CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ); \
	static const GCSDK::JobType_t g_JobType_##jobclass = { jobname, (GCSDK::MsgType_t)(msg), servertype, (GCSDK::JobCreationFunc_t)CreateJob_##jobclass }; \
	GCSDK::CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ) \
	{ \
		GCSDK::CJob *job = GCSDK::CJob::AllocateJob<jobclass>( pParent ); \
		if ( job ) \
		{ \
			Job_SetJobType( *job, &g_JobType_##jobclass ); \
			if ( pvStartParam ) job->SetStartParam( pvStartParam ); \
		} \
		else \
		{ \
			AssertMsg( job, "CJob::AllocateJob<" #jobclass "> returned NULL!" ); \
		} \
		return job; \
	} \
	static class CRegJob_##jobclass \
	{ \
	public: CRegJob_##jobclass() \
		{ \
		Job_RegisterJobType( &g_JobType_##jobclass ); \
		} \
	} g_RegJob_##jobclass;


//-----------------------------------------------------------------------------
// Purpose: job registration macro for job triggered by web api request
//
//	parentclass	- type of the pParent pointer taken by the creation function
//	jobclass	- class handed to CJob::AllocateJob<>; also pasted into the
//				  generated identifiers (CreateJob_<jobclass>, g_JobType_<jobclass>,
//				  CRegJob_<jobclass>, g_RegJob_<jobclass>)
//	jobname		- first initializer of the generated JobType_t
//	servertype	- third initializer of the generated JobType_t
//
// Same expansion as GC_REG_JOB, except that the message type of the generated
// JobType_t is always k_EGCMsgInvalid.
//
// The GCSDK types are fully qualified, matching GC_REG_JOB, so they resolve
// without a using-directive. k_EGCMsgInvalid is deliberately left unqualified
// because its namespace is not visible from this header; it must still be
// visible wherever the macro is expanded.
//-----------------------------------------------------------------------------
#define REG_WEBAPI_JOB( parentclass, jobclass, jobname, servertype ) \
	GCSDK::CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ); \
	static const GCSDK::JobType_t g_JobType_##jobclass = { jobname, k_EGCMsgInvalid, servertype, (GCSDK::JobCreationFunc_t)CreateJob_##jobclass }; \
	GCSDK::CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ) \
	{ \
		GCSDK::CJob *job = GCSDK::CJob::AllocateJob<jobclass>( pParent ); \
		if ( job ) \
		{ \
			Job_SetJobType( *job, &g_JobType_##jobclass ); \
			if ( pvStartParam ) job->SetStartParam( pvStartParam ); \
		} \
		else \
		{ \
			AssertMsg( job, "CJob::AllocateJob<" #jobclass "> returned NULL!" ); \
		} \
		return job; \
	} \
	static class CRegJob_##jobclass \
	{ \
	public: CRegJob_##jobclass() \
		{ \
		Job_RegisterJobType( &g_JobType_##jobclass ); \
		} \
	} g_RegJob_##jobclass;



} // namespace GCSDK

#include "tier0/memdbgoff.h"

#endif // GC_JOBMGR_H