You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
602 lines
16 KiB
602 lines
16 KiB
//========= Copyright Valve Corporation, All rights reserved. ============// |
|
// |
|
// Purpose: |
|
// |
|
//============================================================================= |
|
|
|
#include "vmpi.h" |
|
#include "vmpi_distribute_work.h" |
|
#include "tier0/platform.h" |
|
#include "tier0/dbg.h" |
|
#include "utlvector.h" |
|
#include "utllinkedlist.h" |
|
#include "vmpi_dispatch.h" |
|
#include "pacifier.h" |
|
#include "vstdlib/random.h" |
|
#include "mathlib/mathlib.h" |
|
#include "threadhelpers.h" |
|
#include "threads.h" |
|
#include "tier1/strtools.h" |
|
#include "tier1/utlmap.h" |
|
#include "tier1/smartptr.h" |
|
#include "tier0/icommandline.h" |
|
#include "cmdlib.h" |
|
#include "vmpi_distribute_tracker.h" |
|
#include "vmpi_distribute_work_internal.h" |
|
|
|
|
|
// Sub-packet ID the master uses to send a worker its list of assigned work units.
#define DW_SUBPACKETID_WU_ASSIGNMENT	( VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE + 0 )

// Number of work units dealt to each partition of a fresh batch.
// Values <= 0 let AssignWUsToWorker pick the size automatically.
static int s_numWusToDeal = -1;

//-----------------------------------------------------------------------------
// Overrides the per-partition batch size used when dealing out untouched
// work units. Pass a value <= 0 to go back to the automatic choice.
//-----------------------------------------------------------------------------
void VMPI_SetWorkUnitsPartitionSize( int numWusToDeal )
{
	s_numWusToDeal = numWusToDeal;
}
|
|
|
|
|
class CWorkUnitInfo |
|
{ |
|
public: |
|
WUIndexType m_iWorkUnit; |
|
}; |
|
|
|
|
|
//-----------------------------------------------------------------------------
// Per-work-unit lookup entry: tells the master where a work unit currently
// lives (its pending-list record and its partition).
//-----------------------------------------------------------------------------
class CWULookupInfo
{
public:
	CWULookupInfo()
		: m_iWUInfo( -1 )
		, m_iPartition( -222222 )			// Sentinel: no partition assigned yet.
		, m_iPartitionListIndex( -1 )
	{
	}

public:
	int m_iWUInfo;				// Index into the master's m_WUInfo; -1 once the work unit is finished.
	int m_iPartition;			// Index of the owning partition in the master's m_Partitions.
	int m_iPartitionListIndex;	// Index of this work unit inside the owning partition's m_WUs.
};
|
|
|
|
|
class CPartitionInfo |
|
{ |
|
public: |
|
typedef CUtlLinkedList< WUIndexType, int > PartitionWUs; |
|
|
|
public: |
|
int m_iPartition; // Index into m_Partitions. |
|
int m_iWorker; // Who owns this partition? |
|
PartitionWUs m_WUs; // Which WUs are in this partition? |
|
}; |
|
|
|
|
|
// Work units tracker to track consecutive finished blocks |
|
class CWorkUnitsTracker |
|
{ |
|
public: |
|
CWorkUnitsTracker() {} |
|
|
|
public: |
|
// Initializes the unit tracker to receive numUnits in future |
|
void PrepareForWorkUnits( uint64 numUnits ); |
|
// Signals that a work unit has been finished |
|
// returns a zero-based index of the next pending work unit |
|
// up to which the task list has been processed fully now |
|
// because the received work unit filled the gap or was the next pending work unit. |
|
// returns 0 to indicate that this work unit is a "faster processed future work unit". |
|
uint64 WorkUnitFinished( uint64 iWorkUnit ); |
|
|
|
public: |
|
enum WUInfo { kNone, kTrigger, kDone }; |
|
CVisibleWindowVector< uint8 > m_arrInfo; |
|
}; |
|
|
|
//-----------------------------------------------------------------------------
// Resets the tracker for numUnits work units (indices 0 .. numUnits-1) and
// makes unit 0 the initial trigger.
//-----------------------------------------------------------------------------
void CWorkUnitsTracker::PrepareForWorkUnits( uint64 numUnits )
{
	// One spare slot past the last work unit: WorkUnitFinished expands the
	// window up to iWorkUnit + 1, which for the last unit is index numUnits.
	m_arrInfo.Reset( numUnits + 1 );

	if ( numUnits )
	{
		// ExpandWindow takes the last index to make visible, and the highest
		// valid index is numUnits (the spare slot). A single-unit job must
		// therefore not ask for index 2.
		uint64 idxLastVisible = ( numUnits < 2ull ) ? numUnits : 2ull;
		m_arrInfo.ExpandWindow( idxLastVisible, kNone );
		m_arrInfo.Get( 0ull ) = kTrigger;
	}
}
|
|
|
uint64 CWorkUnitsTracker::WorkUnitFinished( uint64 iWorkUnit ) |
|
{ |
|
uint64 uiResult = uint64( 0 ); |
|
|
|
if ( iWorkUnit >= m_arrInfo.FirstPossibleIndex() && iWorkUnit < m_arrInfo.PastPossibleIndex() ) |
|
{ |
|
// Need to access the element |
|
m_arrInfo.ExpandWindow( iWorkUnit + 1, kNone ); |
|
|
|
// Set it done |
|
uint8 &rchThere = m_arrInfo.Get( iWorkUnit ), chThere = rchThere; |
|
rchThere = kDone; |
|
|
|
// Should we trigger? |
|
if ( kTrigger == chThere ) |
|
{ |
|
// Go along all "done" work units and trigger the last found one |
|
while ( ( ( ++ iWorkUnit ) < m_arrInfo.PastVisibleIndex() ) && |
|
( kDone == m_arrInfo.Get( iWorkUnit ) ) ) |
|
continue; |
|
|
|
m_arrInfo.Get( iWorkUnit ) = kTrigger; |
|
m_arrInfo.ShrinkWindow( iWorkUnit - 1 ); |
|
uiResult = iWorkUnit; |
|
} |
|
else if( iWorkUnit == m_arrInfo.FirstPossibleIndex() ) |
|
{ |
|
// Go along all "done" work units and shrink including the last found one |
|
while ( ( ( ++ iWorkUnit ) < m_arrInfo.PastVisibleIndex() ) && |
|
( kDone == m_arrInfo.Get( iWorkUnit ) ) ) |
|
continue; |
|
|
|
m_arrInfo.ShrinkWindow( iWorkUnit - 1 ); |
|
} |
|
} |
|
|
|
return uiResult; |
|
} |
|
|
|
CWorkUnitsTracker g_MasterWorkUnitsTracker; |
|
|
|
|
|
|
|
static bool CompareSoonestWorkUnitSets( CPartitionInfo::PartitionWUs * const &x, CPartitionInfo::PartitionWUs * const &y ) |
|
{ |
|
// Compare by fourth/second/first job in the partitions |
|
WUIndexType missing = ~WUIndexType(0); |
|
WUIndexType jobsX[4] = { missing, missing, missing, missing }; |
|
WUIndexType jobsY[4] = { missing, missing, missing, missing }; |
|
int counter = 0; |
|
|
|
counter = 0; |
|
FOR_EACH_LL( (*x), i ) |
|
{ |
|
jobsX[ counter ++ ] = (*x)[i]; |
|
if ( counter >= 4 ) |
|
break; |
|
} |
|
|
|
counter = 0; |
|
FOR_EACH_LL( (*y), i ) |
|
{ |
|
jobsY[ counter ++ ] = (*y)[i]; |
|
if ( counter >= 4 ) |
|
break; |
|
} |
|
|
|
// Compare |
|
if ( jobsX[3] != jobsY[3] ) |
|
return ( jobsX[3] < jobsY[3] ); |
|
|
|
if ( jobsX[1] != jobsY[1] ) |
|
return ( jobsX[1] < jobsY[1] ); |
|
|
|
return jobsX[0] < jobsY[0]; |
|
} |
|
|
|
|
|
|
|
class CDistributor_DefaultMaster : public IWorkUnitDistributorMaster |
|
{ |
|
public: |
|
virtual void Release() |
|
{ |
|
delete this; |
|
} |
|
|
|
virtual void DistributeWork_Master( CDSInfo *pInfo ) |
|
{ |
|
m_pInfo = pInfo; |
|
g_MasterWorkUnitsTracker.PrepareForWorkUnits( m_pInfo->m_nWorkUnits ); |
|
|
|
m_WULookup.Reset( pInfo->m_nWorkUnits ); |
|
while ( m_WULookup.FirstPossibleIndex() < m_WULookup.PastPossibleIndex() ) |
|
{ |
|
VMPI_DispatchNextMessage( 200 ); |
|
|
|
VMPITracker_HandleDebugKeypresses(); |
|
|
|
if ( g_pDistributeWorkCallbacks && g_pDistributeWorkCallbacks->Update() ) |
|
break; |
|
} |
|
} |
|
|
|
virtual void OnWorkerReady( int iSource ) |
|
{ |
|
AssignWUsToWorker( iSource ); |
|
} |
|
|
|
virtual bool HandleWorkUnitResults( WUIndexType iWorkUnit ) |
|
{ |
|
CWULookupInfo *pLookup = NULL; |
|
if ( iWorkUnit >= m_WULookup.FirstPossibleIndex() && iWorkUnit < m_WULookup.PastVisibleIndex() ) |
|
pLookup = &m_WULookup.Get( iWorkUnit ); |
|
|
|
if ( !pLookup || pLookup->m_iWUInfo == -1 ) |
|
return false; |
|
|
|
// Mark this WU finished and remove it from the list of pending WUs. |
|
m_WUInfo.Remove( pLookup->m_iWUInfo ); |
|
pLookup->m_iWUInfo = -1; |
|
|
|
|
|
// Get rid of the WU from its partition. |
|
int iPartition = pLookup->m_iPartition; |
|
CPartitionInfo *pPartition = m_Partitions[iPartition]; |
|
pPartition->m_WUs.Remove( pLookup->m_iPartitionListIndex ); |
|
|
|
// Shrink the window of the lookup work units |
|
if ( iWorkUnit == m_WULookup.FirstPossibleIndex() ) |
|
{ |
|
WUIndexType kwu = iWorkUnit; |
|
for ( WUIndexType kwuEnd = m_WULookup.PastVisibleIndex(); kwu < kwuEnd; ++ kwu ) |
|
{ |
|
if ( -1 != m_WULookup.Get( kwu ).m_iWUInfo && kwu > iWorkUnit ) |
|
break; |
|
} |
|
m_WULookup.ShrinkWindow( kwu - 1 ); |
|
} |
|
|
|
// Give the worker some new work if need be. |
|
if ( pPartition->m_WUs.Count() == 0 ) |
|
{ |
|
int iPartitionWorker = pPartition->m_iWorker; |
|
delete pPartition; |
|
m_Partitions.Remove( iPartition ); |
|
|
|
// If there are any more WUs remaining, give the worker from this partition some more of them. |
|
if ( m_WULookup.FirstPossibleIndex() < m_WULookup.PastPossibleIndex() ) |
|
{ |
|
AssignWUsToWorker( iPartitionWorker ); |
|
} |
|
} |
|
|
|
uint64 iDoneWorkUnits = g_MasterWorkUnitsTracker.WorkUnitFinished( iWorkUnit ); |
|
if ( iDoneWorkUnits && g_pDistributeWorkCallbacks ) |
|
{ |
|
g_pDistributeWorkCallbacks->OnWorkUnitsCompleted( iDoneWorkUnits ); |
|
} |
|
|
|
return true; |
|
} |
|
|
|
virtual void DisconnectHandler( int workerID ) |
|
{ |
|
int iPartitionLookup = FindPartitionByWorker( workerID ); |
|
if ( iPartitionLookup != -1 ) |
|
{ |
|
// Mark this guy's partition as unowned so another worker can get it. |
|
CPartitionInfo *pPartition = m_Partitions[iPartitionLookup]; |
|
pPartition->m_iWorker = -1; |
|
} |
|
} |
|
|
|
CPartitionInfo* AddPartition( int iWorker ) |
|
{ |
|
CPartitionInfo *pNew = new CPartitionInfo; |
|
pNew->m_iPartition = m_Partitions.AddToTail( pNew ); |
|
pNew->m_iWorker = iWorker; |
|
return pNew; |
|
} |
|
|
|
bool SplitWUsPartition( CPartitionInfo *pPartitionLarge, |
|
CPartitionInfo **ppFirstHalf, CPartitionInfo **ppSecondHalf, |
|
int iFirstHalfWorker, int iSecondHalfWorker ) |
|
{ |
|
int nCount = pPartitionLarge->m_WUs.Count(); |
|
|
|
if ( nCount > 1 ) // Allocate the partitions for the two workers |
|
{ |
|
*ppFirstHalf = AddPartition( iFirstHalfWorker ); |
|
*ppSecondHalf = AddPartition( iSecondHalfWorker ); |
|
} |
|
else // Specially transfer a partition with too few work units |
|
{ |
|
*ppFirstHalf = NULL; |
|
*ppSecondHalf = AddPartition( iSecondHalfWorker ); |
|
} |
|
|
|
// Prepare for transfer |
|
CPartitionInfo *arrNewParts[2] = { *ppFirstHalf ? *ppFirstHalf : *ppSecondHalf, *ppSecondHalf }; |
|
|
|
// Transfer the work units: |
|
// alternate first/second halves |
|
// don't put more than "half deal units" tasks into the second half |
|
// e.g. { 1, 2, 3, 4 } |
|
// becomes: 1st half { 1, 2 }, 2nd half { 3, 4 } |
|
for ( int k = 0; k < nCount; ++ k ) |
|
{ |
|
int iHead = pPartitionLarge->m_WUs.Head(); |
|
WUIndexType iWU = pPartitionLarge->m_WUs[ iHead ]; |
|
pPartitionLarge->m_WUs.Remove( iHead ); |
|
|
|
/* |
|
int nHalf = !!( ( k % 2 ) || ( k >= nCount - 1 ) ); |
|
if ( k == 5 ) // no more than 2 jobs to branch off |
|
arrNewParts[ 1 ] = arrNewParts[ 0 ]; |
|
*/ |
|
int nHalf = !( k < nCount/2 ); |
|
CPartitionInfo *pTo = arrNewParts[ nHalf ]; |
|
|
|
CWULookupInfo &li = m_WULookup.Get( iWU ); |
|
li.m_iPartition = pTo->m_iPartition; |
|
li.m_iPartitionListIndex = pTo->m_WUs.AddToTail( iWU ); |
|
} |
|
|
|
// LogPartitionsWorkUnits( pInfo ); |
|
return true; |
|
} |
|
|
|
void AssignWUsToWorker( int iWorker ) |
|
{ |
|
// Get rid of this worker's old partition. |
|
int iPrevious = FindPartitionByWorker( iWorker ); |
|
if ( iPrevious != -1 ) |
|
{ |
|
delete m_Partitions[iPrevious]; |
|
m_Partitions.Remove( iPrevious ); |
|
} |
|
|
|
if ( g_iVMPIVerboseLevel >= 1 ) |
|
Msg( "A" ); |
|
|
|
|
|
CVisibleWindowVector< CWULookupInfo > &vlkup = m_WULookup; |
|
if ( CommandLine()->FindParm( "-mpi_NoScheduler" ) ) |
|
{ |
|
Warning( "\n\n-mpi_NoScheduler found: Warning - this should only be used for testing and with 1 worker!\n\n" ); |
|
vlkup.ExpandWindow( m_pInfo->m_nWorkUnits ); |
|
CPartitionInfo *pPartition = AddPartition( iWorker ); |
|
for ( int i=0; i < m_pInfo->m_nWorkUnits; i++ ) |
|
{ |
|
CWorkUnitInfo info; |
|
info.m_iWorkUnit = i; |
|
|
|
CWULookupInfo &li = vlkup.Get( i ); |
|
li.m_iPartition = pPartition->m_iPartition; |
|
li.m_iPartitionListIndex = pPartition->m_WUs.AddToTail( i ); |
|
li.m_iWUInfo = m_WUInfo.AddToTail( info ); |
|
} |
|
|
|
SendPartitionToWorker( pPartition, iWorker ); |
|
return; |
|
} |
|
|
|
|
|
// Any partitions abandoned by workers? |
|
int iAbandonedPartition = FindPartitionByWorker( -1 ); |
|
if ( -1 != iAbandonedPartition ) |
|
{ |
|
CPartitionInfo *pPartition = m_Partitions[ iAbandonedPartition ]; |
|
pPartition->m_iWorker = iWorker; |
|
SendPartitionToWorker( pPartition, iWorker ); |
|
} |
|
|
|
// Any absolutely untouched partitions yet? |
|
else if ( vlkup.PastVisibleIndex() < vlkup.PastPossibleIndex() ) |
|
{ |
|
// Figure out how many WUs to include in a batch |
|
int numWusToDeal = s_numWusToDeal; |
|
if ( numWusToDeal <= 0 ) |
|
{ |
|
uint64 uiFraction = vlkup.PastPossibleIndex() / g_nMaxWorkerCount; |
|
Assert( uiFraction < INT_MAX/2 ); |
|
|
|
numWusToDeal = int( uiFraction ); |
|
if ( numWusToDeal <= 0 ) |
|
numWusToDeal = 8; |
|
} |
|
|
|
// Allocate room for upcoming work units lookup |
|
WUIndexType iBegin = vlkup.PastVisibleIndex(); |
|
WUIndexType iEnd = min( iBegin + g_nMaxWorkerCount * numWusToDeal, vlkup.PastPossibleIndex() ); |
|
vlkup.ExpandWindow( iEnd - 1 ); |
|
|
|
// Allocate a partition |
|
size_t numPartitions = min( ( size_t )(iEnd - iBegin), ( size_t )g_nMaxWorkerCount ); |
|
CArrayAutoPtr< CPartitionInfo * > spArrPartitions( new CPartitionInfo* [ numPartitions ] ); |
|
CPartitionInfo **arrPartitions = spArrPartitions.Get(); |
|
|
|
arrPartitions[0] = AddPartition( iWorker ); |
|
for ( size_t k = 1; k < numPartitions; ++ k ) |
|
arrPartitions[k] = AddPartition( -1 ); |
|
|
|
// Assign upcoming work units to the partitions. |
|
for ( WUIndexType i = iBegin ; i < iEnd; ++ i ) |
|
{ |
|
CWorkUnitInfo info; |
|
info.m_iWorkUnit = i; |
|
|
|
CPartitionInfo *pPartition = arrPartitions[ size_t( (i - iBegin) % numPartitions ) ]; |
|
|
|
CWULookupInfo &li = vlkup.Get( i ); |
|
li.m_iPartition = pPartition->m_iPartition; |
|
li.m_iPartitionListIndex = pPartition->m_WUs.AddToTail( i ); |
|
li.m_iWUInfo = m_WUInfo.AddToTail( info ); |
|
} |
|
|
|
// Now send this guy the WU list in his partition. |
|
SendPartitionToWorker( arrPartitions[0], iWorker ); |
|
} |
|
|
|
// Split one of the last partitions to finish sooner |
|
else |
|
{ |
|
// Find a partition to split. |
|
int iPartToSplit = FindSoonestPartition(); |
|
if ( iPartToSplit >= 0 ) |
|
{ |
|
CPartitionInfo *pPartition = m_Partitions[ iPartToSplit ]; |
|
|
|
CPartitionInfo *pOldHalf = NULL, *pNewHalf = NULL; |
|
int iOldWorker = pPartition->m_iWorker, iNewWorker = iWorker; |
|
if ( SplitWUsPartition( pPartition, &pOldHalf, &pNewHalf, iOldWorker, iNewWorker ) ) |
|
{ |
|
if ( pOldHalf ) |
|
SendPartitionToWorker( pOldHalf, iOldWorker ); |
|
if ( pNewHalf ) |
|
SendPartitionToWorker( pNewHalf, iNewWorker ); |
|
|
|
// Delete the partition that got split |
|
Assert( pPartition->m_WUs.Count() == 0 ); |
|
delete pPartition; |
|
m_Partitions.Remove( iPartToSplit ); |
|
} |
|
} |
|
} |
|
} |
|
|
|
int FindSoonestPartition() |
|
{ |
|
CUtlLinkedList < CPartitionInfo *, int > &lst = m_Partitions; |
|
|
|
// Sorted partitions |
|
CUtlMap< CPartitionInfo::PartitionWUs *, int > sortedPartitions ( CompareSoonestWorkUnitSets ); |
|
sortedPartitions.EnsureCapacity( lst.Count() ); |
|
FOR_EACH_LL( lst, i ) |
|
{ |
|
sortedPartitions.Insert( &lst[i]->m_WUs, i ); |
|
} |
|
|
|
if ( sortedPartitions.Count() ) |
|
{ |
|
return sortedPartitions.Element( sortedPartitions.FirstInorder() ); |
|
} |
|
|
|
return lst.Head(); |
|
} |
|
|
|
int FindPartitionByWorker( int iWorker ) |
|
{ |
|
FOR_EACH_LL( m_Partitions, i ) |
|
{ |
|
if ( m_Partitions[i]->m_iWorker == iWorker ) |
|
return i; |
|
} |
|
return -1; |
|
} |
|
|
|
void SendPartitionToWorker( CPartitionInfo *pPartition, int iWorker ) |
|
{ |
|
// Stuff the next nWUs work units into the buffer. |
|
MessageBuffer mb; |
|
PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_WU_ASSIGNMENT ); |
|
|
|
FOR_EACH_LL( pPartition->m_WUs, i ) |
|
{ |
|
WUIndexType iWU = pPartition->m_WUs[i]; |
|
mb.write( &iWU, sizeof( iWU ) ); |
|
VMPITracker_WorkUnitSentToWorker( ( int ) iWU, iWorker ); |
|
} |
|
|
|
VMPI_SendData( mb.data, mb.getLen(), iWorker ); |
|
} |
|
|
|
virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) |
|
{ |
|
return false; |
|
} |
|
|
|
private: |
|
|
|
CDSInfo *m_pInfo; |
|
|
|
CUtlLinkedList<CPartitionInfo*,int> m_Partitions; |
|
CVisibleWindowVector<CWULookupInfo> m_WULookup; // Map work unit index to CWorkUnitInfo. |
|
CUtlLinkedList<CWorkUnitInfo,int> m_WUInfo; // Sorted with most elegible WU at the head. |
|
}; |
|
|
|
|
|
|
|
class CDistributor_DefaultWorker : public IWorkUnitDistributorWorker |
|
{ |
|
public: |
|
virtual void Release() |
|
{ |
|
delete this; |
|
} |
|
|
|
virtual void Init( CDSInfo *pInfo ) |
|
{ |
|
} |
|
|
|
virtual bool GetNextWorkUnit( WUIndexType *pWUIndex ) |
|
{ |
|
CCriticalSectionLock csLock( &m_CS ); |
|
csLock.Lock(); |
|
|
|
// NOTE: this is called from INSIDE worker threads. |
|
if ( m_WorkUnits.Count() == 0 ) |
|
{ |
|
return false; |
|
} |
|
else |
|
{ |
|
*pWUIndex = m_WorkUnits[ m_WorkUnits.Head() ]; |
|
m_WorkUnits.Remove( m_WorkUnits.Head() ); |
|
return true; |
|
} |
|
} |
|
|
|
virtual void NoteLocalWorkUnitCompleted( WUIndexType iWU ) |
|
{ |
|
} |
|
|
|
virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) |
|
{ |
|
if ( pBuf->data[1] == DW_SUBPACKETID_WU_ASSIGNMENT ) |
|
{ |
|
// If the message wasn't even related to the current DistributeWork() call we're on, ignore it. |
|
if ( bIgnoreContents ) |
|
return true; |
|
|
|
if ( ((pBuf->getLen() - pBuf->getOffset()) % sizeof( WUIndexType )) != 0 ) |
|
{ |
|
Error( "DistributeWork: invalid work units packet from master" ); |
|
} |
|
|
|
// Parse out the work unit indices. |
|
CCriticalSectionLock csLock( &m_CS ); |
|
csLock.Lock(); |
|
|
|
m_WorkUnits.Purge(); |
|
|
|
int nIndices = (pBuf->getLen() - pBuf->getOffset()) / sizeof( WUIndexType ); |
|
for ( int i=0; i < nIndices; i++ ) |
|
{ |
|
WUIndexType iWU; |
|
pBuf->read( &iWU, sizeof( iWU ) ); |
|
|
|
// Add the index to the list. |
|
m_WorkUnits.AddToTail( iWU ); |
|
} |
|
|
|
csLock.Unlock(); |
|
|
|
return true; |
|
} |
|
else |
|
{ |
|
return false; |
|
} |
|
} |
|
|
|
// Threads eat up the list of WUs in here. |
|
CCriticalSection m_CS; |
|
CUtlLinkedList<WUIndexType, int> m_WorkUnits; // A list of work units assigned to this worker |
|
}; |
|
|
|
|
|
|
|
|
|
IWorkUnitDistributorMaster* CreateWUDistributor_DefaultMaster() |
|
{ |
|
return new CDistributor_DefaultMaster; |
|
} |
|
IWorkUnitDistributorWorker* CreateWUDistributor_DefaultWorker() |
|
{ |
|
return new CDistributor_DefaultWorker; |
|
}
|
|
|