You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
699 lines
20 KiB
699 lines
20 KiB
//========= Copyright Valve Corporation, All rights reserved. ============// |
|
// |
|
// Purpose: |
|
// |
|
//============================================================================= |
|
|
|
#include "vmpi.h" |
|
#include "vmpi_distribute_work.h" |
|
#include "tier0/platform.h" |
|
#include "tier0/dbg.h" |
|
#include "utlvector.h" |
|
#include "utllinkedlist.h" |
|
#include "vmpi_dispatch.h" |
|
#include "pacifier.h" |
|
#include "vstdlib/random.h" |
|
#include "mathlib/mathlib.h" |
|
#include "threadhelpers.h" |
|
#include "threads.h" |
|
#include "tier1/strtools.h" |
|
#include "tier1/utlmap.h" |
|
#include "tier1/smartptr.h" |
|
#include "tier0/icommandline.h" |
|
#include "cmdlib.h" |
|
#include "vmpi_distribute_tracker.h" |
|
#include "vmpi_distribute_work_internal.h" |
|
|
|
|
|
|
|
// Sub-packet IDs used only by the SDK distributor in this file. They are allocated
// upwards from VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE.
//
// NOTE: nothing may follow the closing parenthesis on these lines. Any trailing token
// becomes part of the macro's replacement list and breaks every expression using it.

// Master -> worker: reshuffle now. Payload: worker count (unsigned short), shuffle CRC
// (CRC32_t), then the receiving worker's index in the shuffle (unsigned short).
#define DW_SUBPACKETID_SHUFFLE              (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+0)

// Worker -> master: the worker ran out of its own work units and asks for a reshuffle.
// Carries no payload beyond the distribute-work header.
#define DW_SUBPACKETID_REQUEST_SHUFFLE      (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+1)

// Master -> worker: list of completed work units. Payload: a count followed by that
// many work unit indices.
#define DW_SUBPACKETID_WUS_COMPLETED_LIST   (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+2)
|
|
|
|
|
|
|
// This is a pretty simple iterator. Basically, it holds a matrix of numbers. |
|
// Each row is assigned to a worker, and the worker just walks through his row. |
|
// |
|
// When a worker reaches the end of his row, it gets a little trickier. |
|
// They'll start doing their neighbor's row |
|
// starting at the back and continue on. At about this time, the master should reshuffle the |
|
// remaining work units to evenly distribute them amongst the workers. |
|
class CWorkUnitWalker |
|
{ |
|
public: |
|
CWorkUnitWalker() |
|
{ |
|
m_nWorkUnits = 0; |
|
} |
|
|
|
// This is all that's needed for it to start assigning work units. |
|
void Init( WUIndexType matrixWidth, WUIndexType matrixHeight, WUIndexType nWorkUnits ) |
|
{ |
|
m_nWorkUnits = nWorkUnits; |
|
m_MatrixWidth = matrixWidth; |
|
m_MatrixHeight = matrixHeight; |
|
Assert( m_MatrixWidth * m_MatrixHeight >= nWorkUnits ); |
|
|
|
m_WorkerInfos.RemoveAll(); |
|
m_WorkerInfos.EnsureCount( m_MatrixHeight ); |
|
for ( int i=0; i < m_MatrixHeight; i++ ) |
|
{ |
|
m_WorkerInfos[i].m_iStartWorkUnit = matrixWidth * i; |
|
m_WorkerInfos[i].m_iWorkUnitOffset = 0; |
|
} |
|
} |
|
|
|
// This is the main function of the shuffler |
|
bool GetNextWorkUnit( int iWorker, WUIndexType *pWUIndex, bool *bWorkerFinishedHisColumn ) |
|
{ |
|
if ( iWorker < 0 || iWorker >= m_WorkerInfos.Count() ) |
|
{ |
|
Assert( false ); |
|
return false; |
|
} |
|
|
|
// If this worker has walked through all the work units, then he's done. |
|
CWorkerInfo *pWorker = &m_WorkerInfos[iWorker]; |
|
if ( pWorker->m_iWorkUnitOffset >= m_nWorkUnits ) |
|
return false; |
|
|
|
// If we've gone past the end of our work unit list, then we start at the BACK of the other rows of work units |
|
// in the hopes that we won't collide with the guy working there. We also should tell the master to reshuffle. |
|
WUIndexType iWorkUnitOffset = pWorker->m_iWorkUnitOffset; |
|
if ( iWorkUnitOffset >= m_MatrixWidth ) |
|
{ |
|
WUIndexType xOffset = iWorkUnitOffset % m_MatrixWidth; |
|
WUIndexType yOffset = iWorkUnitOffset / m_MatrixWidth; |
|
xOffset = m_MatrixWidth - xOffset - 1; |
|
iWorkUnitOffset = yOffset * m_MatrixWidth + xOffset; |
|
*bWorkerFinishedHisColumn = true; |
|
} |
|
else |
|
{ |
|
*bWorkerFinishedHisColumn = false; |
|
} |
|
|
|
*pWUIndex = (pWorker->m_iStartWorkUnit + iWorkUnitOffset) % m_nWorkUnits; |
|
++pWorker->m_iWorkUnitOffset; |
|
return true; |
|
} |
|
|
|
|
|
private: |
|
class CWorkerInfo |
|
{ |
|
public: |
|
WUIndexType m_iStartWorkUnit; |
|
WUIndexType m_iWorkUnitOffset; // Which work unit in my list of work units am I working on? |
|
}; |
|
|
|
WUIndexType m_nWorkUnits; |
|
WUIndexType m_MatrixWidth; |
|
WUIndexType m_MatrixHeight; |
|
CUtlVector<CWorkerInfo> m_WorkerInfos; |
|
}; |
|
|
|
|
|
// Callback interface through which a work unit walker asks for the work units to be
// reshuffled. How the request is delivered is up to the implementer.
class IShuffleRequester
{
public:
	// Virtual so that destroying an implementation through this interface is well defined.
	virtual ~IShuffleRequester() {}

	// Called when a reshuffle of the remaining work units is wanted.
	virtual void RequestShuffle() = 0;
};
|
|
|
|
|
// This is updated every time the master decides to reshuffle. |
|
// In-between shuffles, you can call NoteWorkUnitCompleted when a work unit is completed |
|
// and it'll avoid returning that work unit from GetNextWorkUnit again, but it WON'T |
|
class CShuffledWorkUnitWalker |
|
{ |
|
public: |
|
void Init( WUIndexType nWorkUnits, IShuffleRequester *pRequester ) |
|
{ |
|
m_iLastShuffleRequest = 0; |
|
m_iCurShuffle = 1; |
|
m_flLastShuffleTime = Plat_FloatTime(); |
|
m_pShuffleRequester = pRequester; |
|
|
|
int nBytes = PAD_NUMBER( nWorkUnits, 8 ) / 8; |
|
m_CompletedWUBits.SetSize( nBytes ); |
|
m_LocalCompletedWUBits.SetSize( nBytes ); |
|
for ( WUIndexType i=0; i < m_CompletedWUBits.Count(); i++ ) |
|
m_LocalCompletedWUBits[i] = m_CompletedWUBits[i] = 0; |
|
|
|
// Setup our list of work units remaining. |
|
for ( WUIndexType iWU=0; iWU < nWorkUnits; iWU++ ) |
|
{ |
|
// Note: we're making an assumption here that if we add entries to a CUtlLinkedList in ascending order, their indices |
|
// will be ascending 1-by-1 as well. If that assumption breaks, we can create an extra array here to map WU indices to the linked list indices. |
|
WUIndexType index = m_WorkUnitsRemaining.AddToTail( iWU ); |
|
if ( index != iWU ) |
|
{ |
|
Error( "CShuffledWorkUnitWalker: assumption on CUtlLinkedList indexing failed.\n" ); |
|
} |
|
} |
|
} |
|
|
|
void Shuffle( int nWorkers ) |
|
{ |
|
if ( nWorkers == 0 ) |
|
return; |
|
|
|
++m_iCurShuffle; |
|
m_flLastShuffleTime = Plat_FloatTime(); |
|
|
|
CCriticalSectionLock csLock( &m_CS ); |
|
csLock.Lock(); |
|
|
|
m_WorkUnitsMap.RemoveAll(); |
|
m_WorkUnitsMap.EnsureCount( m_WorkUnitsRemaining.Count() ); |
|
|
|
// Here's the shuffle. The CWorkUnitWalker is going to walk each worker through its own group from 0-W, |
|
// and our job is to interleave it so when worker 0 goes [0,1,2] and worker 1 goes [100,101,102], they're actually |
|
// doing [0,N,2N] and [1,N+1,2N+1] where N=# of workers. |
|
|
|
// The grid is RxW long, and R*W is >= nWorkUnits. |
|
// R = # units per worker = width of the matrix |
|
// W = # workers = height of the matrix |
|
WUIndexType matrixHeight = nWorkers; |
|
WUIndexType matrixWidth = m_WorkUnitsRemaining.Count() / matrixHeight; |
|
if ( (m_WorkUnitsRemaining.Count() % matrixHeight) != 0 ) |
|
++matrixWidth; |
|
|
|
Assert( matrixWidth * matrixHeight >= m_WorkUnitsRemaining.Count() ); |
|
|
|
WUIndexType iWorkUnit = 0; |
|
FOR_EACH_LL( m_WorkUnitsRemaining, i ) |
|
{ |
|
WUIndexType xCoord = iWorkUnit / matrixHeight; |
|
WUIndexType yCoord = iWorkUnit % matrixHeight; |
|
Assert( xCoord < matrixWidth ); |
|
Assert( yCoord < matrixHeight ); |
|
|
|
m_WorkUnitsMap[yCoord*matrixWidth+xCoord] = m_WorkUnitsRemaining[i]; |
|
++iWorkUnit; |
|
} |
|
|
|
m_Walker.Init( matrixWidth, matrixHeight, m_WorkUnitsRemaining.Count() ); |
|
} |
|
|
|
// Threadsafe. |
|
bool Thread_IsWorkUnitCompleted( WUIndexType iWU ) |
|
{ |
|
CCriticalSectionLock csLock( &m_CS ); |
|
csLock.Lock(); |
|
|
|
byte val = m_CompletedWUBits[iWU >> 3] & (1 << (iWU & 7)); |
|
return (val != 0); |
|
} |
|
|
|
WUIndexType Thread_NumWorkUnitsRemaining() |
|
{ |
|
CCriticalSectionLock csLock( &m_CS ); |
|
csLock.Lock(); |
|
|
|
return m_WorkUnitsRemaining.Count(); |
|
} |
|
|
|
bool Thread_GetNextWorkUnit( int iWorker, WUIndexType *pWUIndex ) |
|
{ |
|
CCriticalSectionLock csLock( &m_CS ); |
|
csLock.Lock(); |
|
|
|
while ( 1 ) |
|
{ |
|
WUIndexType iUnmappedWorkUnit; |
|
bool bWorkerFinishedHisColumn; |
|
if ( !m_Walker.GetNextWorkUnit( iWorker, &iUnmappedWorkUnit, &bWorkerFinishedHisColumn ) ) |
|
return false; |
|
|
|
// If we've done all the work units assigned to us in the last shuffle, then request a reshuffle. |
|
if ( bWorkerFinishedHisColumn ) |
|
HandleWorkerFinishedColumn(); |
|
|
|
// Check the pending list. |
|
*pWUIndex = m_WorkUnitsMap[iUnmappedWorkUnit]; |
|
byte bIsCompleted = m_CompletedWUBits[*pWUIndex >> 3] & (1 << (*pWUIndex & 7)); |
|
byte bIsCompletedLocally = m_LocalCompletedWUBits[*pWUIndex >> 3] & (1 << (*pWUIndex & 7)); |
|
if ( !bIsCompleted && !bIsCompletedLocally ) |
|
return true; |
|
} |
|
} |
|
|
|
void HandleWorkerFinishedColumn() |
|
{ |
|
if ( m_iLastShuffleRequest != m_iCurShuffle ) |
|
{ |
|
double flCurTime = Plat_FloatTime(); |
|
if ( flCurTime - m_flLastShuffleTime > 2.0f ) |
|
{ |
|
m_pShuffleRequester->RequestShuffle(); |
|
m_iLastShuffleRequest = m_iCurShuffle; |
|
} |
|
} |
|
} |
|
|
|
void Thread_NoteWorkUnitCompleted( WUIndexType iWU ) |
|
{ |
|
CCriticalSectionLock csLock( &m_CS ); |
|
csLock.Lock(); |
|
|
|
byte val = m_CompletedWUBits[iWU >> 3] & (1 << (iWU & 7)); |
|
if ( val == 0 ) |
|
{ |
|
m_WorkUnitsRemaining.Remove( iWU ); |
|
m_CompletedWUBits[iWU >> 3] |= (1 << (iWU & 7)); |
|
} |
|
} |
|
|
|
void Thread_NoteLocalWorkUnitCompleted( WUIndexType iWU ) |
|
{ |
|
CCriticalSectionLock csLock( &m_CS ); |
|
csLock.Lock(); |
|
m_LocalCompletedWUBits[iWU >> 3] |= (1 << (iWU & 7)); |
|
} |
|
|
|
CRC32_t GetShuffleCRC() |
|
{ |
|
#ifdef _DEBUG |
|
static bool bCalcShuffleCRC = true; |
|
#else |
|
static bool bCalcShuffleCRC = VMPI_IsParamUsed( mpi_CalcShuffleCRC ); |
|
#endif |
|
if ( bCalcShuffleCRC ) |
|
{ |
|
CCriticalSectionLock csLock( &m_CS ); |
|
csLock.Lock(); |
|
|
|
CRC32_t ret; |
|
CRC32_Init( &ret ); |
|
|
|
FOR_EACH_LL( m_WorkUnitsRemaining, i ) |
|
{ |
|
WUIndexType iWorkUnit = m_WorkUnitsRemaining[i]; |
|
CRC32_ProcessBuffer( &ret, &iWorkUnit, sizeof( iWorkUnit ) ); |
|
} |
|
|
|
for ( int i=0; i < m_WorkUnitsMap.Count(); i++ ) |
|
{ |
|
WUIndexType iWorkUnit = m_WorkUnitsMap[i]; |
|
CRC32_ProcessBuffer( &ret, &iWorkUnit, sizeof( iWorkUnit ) ); |
|
} |
|
|
|
CRC32_Final( &ret ); |
|
return ret; |
|
} |
|
else |
|
{ |
|
return false; |
|
} |
|
} |
|
|
|
private: |
|
// These are PENDING WU completions until we call Shuffle() again, at which point we actually reorder the list |
|
// based on the completed WUs. |
|
CUtlVector<byte> m_CompletedWUBits; // Bit vector of completed WUs. |
|
CUtlLinkedList<WUIndexType, WUIndexType> m_WorkUnitsRemaining; |
|
CUtlVector<WUIndexType> m_WorkUnitsMap; // Maps the 0-N indices in the CWorkUnitWalker to the list of remaining work units. |
|
|
|
// Helps us avoid some duplicates that happen during shuffling if we've completed some WUs and sent them |
|
// to the master, but the master hasn't included them in the DW_SUBPACKETID_WUS_COMPLETED_LIST yet. |
|
CUtlVector<byte> m_LocalCompletedWUBits; // Bit vector of completed WUs. |
|
|
|
// Used to control how frequently we request a reshuffle. |
|
unsigned int m_iCurShuffle; |
|
unsigned int m_iLastShuffleRequest; // The index of the shuffle we last requested a reshuffle on (don't request a reshuffle on the same one). |
|
double m_flLastShuffleTime; |
|
IShuffleRequester *m_pShuffleRequester; |
|
|
|
CWorkUnitWalker m_Walker; |
|
CCriticalSection m_CS; |
|
}; |
|
|
|
|
|
|
|
class CDistributor_SDKMaster : public IWorkUnitDistributorMaster, public IShuffleRequester |
|
{ |
|
public: |
|
virtual void Release() |
|
{ |
|
delete this; |
|
} |
|
|
|
static void Master_WorkerThread_Static( int iThread, void *pUserData ) |
|
{ |
|
((CDistributor_SDKMaster*)pUserData)->Master_WorkerThread( iThread ); |
|
} |
|
|
|
void Master_WorkerThread( int iThread ) |
|
{ |
|
while ( m_WorkUnitWalker.Thread_NumWorkUnitsRemaining() > 0 && !g_bVMPIEarlyExit ) |
|
{ |
|
WUIndexType iWU; |
|
if ( !m_WorkUnitWalker.Thread_GetNextWorkUnit( 0, &iWU ) ) |
|
{ |
|
// Wait until there are some WUs to do. |
|
VMPI_Sleep( 10 ); |
|
continue; |
|
} |
|
|
|
// Do this work unit. |
|
m_WorkUnitWalker.Thread_NoteLocalWorkUnitCompleted( iWU ); // We do this before it's completed because otherwise if a Shuffle() occurs, |
|
// the other thread might happen to pickup this work unit and we don't want that. |
|
m_pInfo->m_WorkerInfo.m_pProcessFn( iThread, iWU, NULL ); |
|
NotifyLocalMasterCompletedWorkUnit( iWU ); |
|
} |
|
} |
|
|
|
virtual void DistributeWork_Master( CDSInfo *pInfo ) |
|
{ |
|
m_pInfo = pInfo; |
|
m_bForceShuffle = false; |
|
m_bShuffleRequested = false; |
|
m_flLastShuffleRequestServiceTime = Plat_FloatTime(); |
|
|
|
// Spawn idle-priority worker threads right here. |
|
m_bUsingMasterLocalThreads = (pInfo->m_WorkerInfo.m_pProcessFn != 0); |
|
if ( VMPI_IsParamUsed( mpi_NoMasterWorkerThreads ) ) |
|
{ |
|
Msg( "%s found. No worker threads will be created.\n", VMPI_GetParamString( mpi_NoMasterWorkerThreads ) ); |
|
m_bUsingMasterLocalThreads = false; |
|
} |
|
m_WorkUnitWalker.Init( pInfo->m_nWorkUnits, this ); |
|
Shuffle(); |
|
|
|
if ( m_bUsingMasterLocalThreads ) |
|
RunThreads_Start( Master_WorkerThread_Static, this, k_eRunThreadsPriority_Idle ); |
|
|
|
uint64 lastShuffleTime = Plat_MSTime(); |
|
while ( m_WorkUnitWalker.Thread_NumWorkUnitsRemaining() > 0 ) |
|
{ |
|
VMPI_DispatchNextMessage( 200 ); |
|
CheckLocalMasterCompletedWorkUnits(); |
|
|
|
VMPITracker_HandleDebugKeypresses(); |
|
if ( g_pDistributeWorkCallbacks && g_pDistributeWorkCallbacks->Update() ) |
|
break; |
|
|
|
// Reshuffle the work units optimally every certain interval. |
|
if ( m_bForceShuffle || CheckShuffleRequest() ) |
|
{ |
|
Shuffle(); |
|
lastShuffleTime = Plat_MSTime(); |
|
m_bForceShuffle = false; |
|
} |
|
} |
|
|
|
RunThreads_End(); |
|
} |
|
|
|
virtual void RequestShuffle() |
|
{ |
|
m_bShuffleRequested = true; |
|
} |
|
|
|
bool CheckShuffleRequest() |
|
{ |
|
if ( m_bShuffleRequested ) |
|
{ |
|
double flCurTime = Plat_FloatTime(); |
|
if ( flCurTime - m_flLastShuffleRequestServiceTime > 2.0f ) // Only handle shuffle requests every so often. |
|
{ |
|
m_flLastShuffleRequestServiceTime = flCurTime; |
|
m_bShuffleRequested = false; |
|
return true; |
|
} |
|
} |
|
|
|
return false; |
|
} |
|
|
|
void Shuffle() |
|
{ |
|
// Build a list of who's working. |
|
CUtlVector<unsigned short> whosWorking; |
|
if ( m_bUsingMasterLocalThreads ) |
|
{ |
|
whosWorking.AddToTail( VMPI_MASTER_ID ); |
|
Assert( VMPI_MASTER_ID == 0 ); |
|
} |
|
|
|
{ |
|
CWorkersReady *pWorkersReady = m_WorkersReadyCS.Lock(); |
|
for ( int i=0; i < pWorkersReady->m_WorkersReady.Count(); i++ ) |
|
{ |
|
int iWorker = pWorkersReady->m_WorkersReady[i]; |
|
if ( VMPI_IsProcConnected( iWorker ) ) |
|
whosWorking.AddToTail( iWorker ); |
|
} |
|
m_WorkersReadyCS.Unlock(); |
|
} |
|
|
|
// Before sending the shuffle command, tell any of these active workers about the pending WUs completed. |
|
CWUsCompleted *pWUsCompleted = m_WUsCompletedCS.Lock(); |
|
|
|
m_WUSCompletedMessageBuffer.setLen( 0 ); |
|
if ( BuildWUsCompletedMessage( pWUsCompleted->m_Pending, m_WUSCompletedMessageBuffer ) > 0 ) |
|
{ |
|
for ( int i=m_bUsingMasterLocalThreads; i < whosWorking.Count(); i++ ) |
|
{ |
|
VMPI_SendData( m_WUSCompletedMessageBuffer.data, m_WUSCompletedMessageBuffer.getLen(), whosWorking[i] ); |
|
} |
|
} |
|
pWUsCompleted->m_Completed.AddMultipleToTail( pWUsCompleted->m_Pending.Count(), pWUsCompleted->m_Pending.Base() ); // Add the pending ones to the full list now. |
|
pWUsCompleted->m_Pending.RemoveAll(); |
|
|
|
m_WUsCompletedCS.Unlock(); |
|
|
|
// Shuffle ourselves. |
|
m_WorkUnitWalker.Shuffle( whosWorking.Count() ); |
|
|
|
// Send the shuffle command to the workers. |
|
MessageBuffer mb; |
|
PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_SHUFFLE ); |
|
|
|
unsigned short nWorkers = whosWorking.Count(); |
|
mb.write( &nWorkers, sizeof( nWorkers ) ); |
|
|
|
CRC32_t shuffleCRC = m_WorkUnitWalker.GetShuffleCRC(); |
|
mb.write( &shuffleCRC, sizeof( shuffleCRC ) ); |
|
|
|
// Now for each worker, assign him an index in the shuffle and send the shuffle command. |
|
int workerIDPos = mb.getLen(); |
|
unsigned short id = 0; |
|
mb.write( &id, sizeof( id ) ); |
|
for ( int i=m_bUsingMasterLocalThreads; i < whosWorking.Count(); i++ ) |
|
{ |
|
id = (unsigned short)i; |
|
mb.update( workerIDPos, &id, sizeof( id ) ); |
|
VMPI_SendData( mb.data, mb.getLen(), whosWorking[i] ); |
|
} |
|
} |
|
|
|
int BuildWUsCompletedMessage( CUtlVector<WUIndexType> &wusCompleted, MessageBuffer &mb ) |
|
{ |
|
PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_WUS_COMPLETED_LIST ); |
|
m_pInfo->WriteWUIndex( wusCompleted.Count(), &mb ); |
|
for ( int i=0; i < wusCompleted.Count(); i++ ) |
|
{ |
|
m_pInfo->WriteWUIndex( wusCompleted[i], &mb ); |
|
} |
|
return wusCompleted.Count(); |
|
} |
|
|
|
virtual void OnWorkerReady( int iSource ) |
|
{ |
|
CWorkersReady *pWorkersReady = m_WorkersReadyCS.Lock(); |
|
if ( pWorkersReady->m_WorkersReady.Find( iSource ) == -1 ) |
|
{ |
|
pWorkersReady->m_WorkersReady.AddToTail( iSource ); |
|
|
|
// Get this guy up to speed on which WUs are done. |
|
{ |
|
CWUsCompleted *pWUsCompleted = m_WUsCompletedCS.Lock(); |
|
m_WUSCompletedMessageBuffer.setLen( 0 ); |
|
BuildWUsCompletedMessage( pWUsCompleted->m_Completed, m_WUSCompletedMessageBuffer ); |
|
m_WUsCompletedCS.Unlock(); |
|
} |
|
|
|
VMPI_SendData( m_WUSCompletedMessageBuffer.data, m_WUSCompletedMessageBuffer.getLen(), iSource ); |
|
m_bForceShuffle = true; |
|
} |
|
m_WorkersReadyCS.Unlock(); |
|
} |
|
|
|
virtual bool HandleWorkUnitResults( WUIndexType iWorkUnit ) |
|
{ |
|
return Thread_HandleWorkUnitResults( iWorkUnit ); |
|
} |
|
|
|
bool Thread_HandleWorkUnitResults( WUIndexType iWorkUnit ) |
|
{ |
|
if ( m_WorkUnitWalker.Thread_IsWorkUnitCompleted( iWorkUnit ) ) |
|
{ |
|
return false; |
|
} |
|
else |
|
{ |
|
m_WorkUnitWalker.Thread_NoteWorkUnitCompleted( iWorkUnit ); |
|
|
|
// We need the lock on here because our own worker threads can call into here. |
|
CWUsCompleted *pWUsCompleted = m_WUsCompletedCS.Lock(); |
|
pWUsCompleted->m_Pending.AddToTail( iWorkUnit ); |
|
m_WUsCompletedCS.Unlock(); |
|
return true; |
|
} |
|
} |
|
|
|
virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) |
|
{ |
|
if ( pBuf->data[1] == DW_SUBPACKETID_REQUEST_SHUFFLE ) |
|
{ |
|
if ( bIgnoreContents ) |
|
return true; |
|
|
|
m_bShuffleRequested = true; |
|
} |
|
return false; |
|
} |
|
|
|
virtual void DisconnectHandler( int workerID ) |
|
{ |
|
CWorkersReady *pWorkersReady = m_WorkersReadyCS.Lock(); |
|
|
|
if ( pWorkersReady->m_WorkersReady.Find( workerID ) != -1 ) |
|
m_bForceShuffle = true; |
|
|
|
m_WorkersReadyCS.Unlock(); |
|
} |
|
|
|
public: |
|
CDSInfo *m_pInfo; |
|
|
|
class CWorkersReady |
|
{ |
|
public: |
|
CUtlVector<int> m_WorkersReady; // The list of workers who have said they're ready to participate. |
|
}; |
|
CCriticalSectionData<CWorkersReady> m_WorkersReadyCS; |
|
|
|
class CWUsCompleted |
|
{ |
|
public: |
|
CUtlVector<WUIndexType> m_Completed; // WUs completed that we have sent to workers. |
|
CUtlVector<WUIndexType> m_Pending; // WUs completed that we haven't sent to workers. |
|
}; |
|
CCriticalSectionData<CWUsCompleted> m_WUsCompletedCS; |
|
MessageBuffer m_WUSCompletedMessageBuffer; // Used to send lists of completed WUs. |
|
int m_bUsingMasterLocalThreads; |
|
|
|
bool m_bForceShuffle; |
|
bool m_bShuffleRequested; |
|
double m_flLastShuffleRequestServiceTime; |
|
|
|
CShuffledWorkUnitWalker m_WorkUnitWalker; |
|
}; |
|
|
|
|
|
class CDistributor_SDKWorker : public IWorkUnitDistributorWorker, public IShuffleRequester |
|
{ |
|
public: |
|
virtual void Init( CDSInfo *pInfo ) |
|
{ |
|
m_iMyWorkUnitWalkerID = -1; |
|
m_pInfo = pInfo; |
|
m_WorkUnitWalker.Init( pInfo->m_nWorkUnits, this ); |
|
} |
|
|
|
virtual void Release() |
|
{ |
|
delete this; |
|
} |
|
|
|
virtual bool GetNextWorkUnit( WUIndexType *pWUIndex ) |
|
{ |
|
// If we don't have an ID yet, we haven't received a Shuffle() command, so we're waiting for that before working. |
|
// TODO: we could do some random WUs here while we're waiting, although that could suck if the WUs take forever to do |
|
// and they're duplicates. |
|
if ( m_iMyWorkUnitWalkerID == -1 ) |
|
return false; |
|
|
|
// Look in our current shuffled list of work units for the next one. |
|
return m_WorkUnitWalker.Thread_GetNextWorkUnit( m_iMyWorkUnitWalkerID, pWUIndex ); |
|
} |
|
|
|
virtual void NoteLocalWorkUnitCompleted( WUIndexType iWU ) |
|
{ |
|
m_WorkUnitWalker.Thread_NoteLocalWorkUnitCompleted( iWU ); |
|
} |
|
|
|
virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) |
|
{ |
|
// If it's a SHUFFLE message, then shuffle.. |
|
if ( pBuf->data[1] == DW_SUBPACKETID_SHUFFLE ) |
|
{ |
|
if ( bIgnoreContents ) |
|
return true; |
|
|
|
unsigned short nWorkers, myID; |
|
CRC32_t shuffleCRC; |
|
pBuf->read( &nWorkers, sizeof( nWorkers ) ); |
|
pBuf->read( &shuffleCRC, sizeof( shuffleCRC ) ); |
|
pBuf->read( &myID, sizeof( myID ) ); |
|
m_iMyWorkUnitWalkerID = myID; |
|
|
|
m_WorkUnitWalker.Shuffle( nWorkers ); |
|
if ( m_WorkUnitWalker.GetShuffleCRC() != shuffleCRC ) |
|
{ |
|
static int nWarnings = 1; |
|
if ( ++nWarnings <= 2 ) |
|
Warning( "\nShuffle CRC mismatch\n" ); |
|
} |
|
return true; |
|
} |
|
else if ( pBuf->data[1] == DW_SUBPACKETID_WUS_COMPLETED_LIST ) |
|
{ |
|
if ( bIgnoreContents ) |
|
return true; |
|
|
|
WUIndexType nCompleted; |
|
m_pInfo->ReadWUIndex( &nCompleted, pBuf ); |
|
for ( WUIndexType i=0; i < nCompleted; i++ ) |
|
{ |
|
WUIndexType iWU; |
|
m_pInfo->ReadWUIndex( &iWU, pBuf ); |
|
m_WorkUnitWalker.Thread_NoteWorkUnitCompleted( iWU ); |
|
} |
|
|
|
return true; |
|
} |
|
|
|
return false; |
|
} |
|
|
|
virtual void RequestShuffle() |
|
{ |
|
// Ok.. request a reshuffle. |
|
MessageBuffer mb; |
|
PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_REQUEST_SHUFFLE ); |
|
VMPI_SendData( mb.data, mb.getLen(), VMPI_MASTER_ID ); |
|
} |
|
|
|
private: |
|
CDSInfo *m_pInfo; |
|
CShuffledWorkUnitWalker m_WorkUnitWalker; |
|
int m_iMyWorkUnitWalkerID; |
|
}; |
|
|
|
|
|
|
|
// Factory for the master side of the SDK work unit distributor.
// Calling Release() on the returned object deletes it.
IWorkUnitDistributorMaster* CreateWUDistributor_SDKMaster()
{
	CDistributor_SDKMaster *pMasterDistributor = new CDistributor_SDKMaster;
	return pMasterDistributor;
}
|
|
|
// Factory for the worker side of the SDK work unit distributor.
// Calling Release() on the returned object deletes it.
IWorkUnitDistributorWorker* CreateWUDistributor_SDKWorker()
{
	CDistributor_SDKWorker *pWorkerDistributor = new CDistributor_SDKWorker;
	return pWorkerDistributor;
}
|
|
|
|