You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
641 lines
16 KiB
641 lines
16 KiB
5 years ago
|
//========= Copyright Valve Corporation, All rights reserved. ============//
|
||
|
//
|
||
|
// Purpose:
|
||
|
//
|
||
|
// $NoKeywords: $
|
||
|
//
|
||
|
//=============================================================================//
|
||
|
|
||
|
#include <windows.h>
|
||
|
#include "vis.h"
|
||
|
#include "threads.h"
|
||
|
#include "stdlib.h"
|
||
|
#include "pacifier.h"
|
||
|
#include "mpi_stats.h"
|
||
|
#include "vmpi.h"
|
||
|
#include "vmpi_dispatch.h"
|
||
|
#include "vmpi_filesystem.h"
|
||
|
#include "vmpi_distribute_work.h"
|
||
|
#include "iphelpers.h"
|
||
|
#include "threadhelpers.h"
|
||
|
#include "vstdlib/random.h"
|
||
|
#include "vmpi_tools_shared.h"
|
||
|
#include <conio.h>
|
||
|
#include "scratchpad_helpers.h"
|
||
|
|
||
|
|
||
|
#define VMPI_VVIS_PACKET_ID 1
|
||
|
// Sub packet IDs.
|
||
|
#define VMPI_SUBPACKETID_DISCONNECT_NOTIFY 3 // We send ourselves this when there is a disconnect.
|
||
|
#define VMPI_SUBPACKETID_BASEPORTALVIS 5
|
||
|
#define VMPI_SUBPACKETID_PORTALFLOW 6
|
||
|
#define VMPI_BASEPORTALVIS_RESULTS 7
|
||
|
#define VMPI_BASEPORTALVIS_WORKER_DONE 8
|
||
|
#define VMPI_PORTALFLOW_RESULTS 9
|
||
|
#define VMPI_SUBPACKETID_BASEPORTALVIS_SYNC 11
|
||
|
#define VMPI_SUBPACKETID_PORTALFLOW_SYNC 12
|
||
|
#define VMPI_SUBPACKETID_MC_ADDR 13
|
||
|
|
||
|
// DistributeWork owns this packet ID.
|
||
|
#define VMPI_DISTRIBUTEWORK_PACKETID 2
|
||
|
|
||
|
|
||
|
extern bool fastvis;
|
||
|
|
||
|
// The worker waits until these are true.
|
||
|
bool g_bBasePortalVisSync = false;
|
||
|
bool g_bPortalFlowSync = false;
|
||
|
|
||
|
CUtlVector<char> g_BasePortalVisResultsFilename;
|
||
|
|
||
|
CCycleCount g_CPUTime;
|
||
|
|
||
|
|
||
|
// This stuff is all for the multicast channel the master uses to send out the portal results.
|
||
|
ISocket *g_pPortalMCSocket = NULL;
|
||
|
CIPAddr g_PortalMCAddr;
|
||
|
bool g_bGotMCAddr = false;
|
||
|
HANDLE g_hMCThread = NULL;
|
||
|
CEvent g_MCThreadExitEvent;
|
||
|
unsigned long g_PortalMCThreadUniqueID = 0;
|
||
|
int g_nMulticastPortalsReceived = 0;
|
||
|
|
||
|
|
||
|
// Handle VVIS packets.
|
||
|
bool VVIS_DispatchFn( MessageBuffer *pBuf, int iSource, int iPacketID )
|
||
|
{
|
||
|
switch ( pBuf->data[1] )
|
||
|
{
|
||
|
case VMPI_SUBPACKETID_MC_ADDR:
|
||
|
{
|
||
|
pBuf->setOffset( 2 );
|
||
|
pBuf->read( &g_PortalMCAddr, sizeof( g_PortalMCAddr ) );
|
||
|
g_bGotMCAddr = true;
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
case VMPI_SUBPACKETID_DISCONNECT_NOTIFY:
|
||
|
{
|
||
|
// This is just used to cause nonblocking dispatches to jump out so loops like the one
|
||
|
// in AppBarrier can handle the fact that there are disconnects.
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
case VMPI_SUBPACKETID_BASEPORTALVIS_SYNC:
|
||
|
{
|
||
|
g_bBasePortalVisSync = true;
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
case VMPI_SUBPACKETID_PORTALFLOW_SYNC:
|
||
|
{
|
||
|
g_bPortalFlowSync = true;
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
case VMPI_BASEPORTALVIS_RESULTS:
|
||
|
{
|
||
|
const char *pFilename = &pBuf->data[2];
|
||
|
g_BasePortalVisResultsFilename.CopyArray( pFilename, strlen( pFilename ) + 1 );
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
default:
|
||
|
{
|
||
|
return false;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
CDispatchReg g_VVISDispatchReg( VMPI_VVIS_PACKET_ID, VVIS_DispatchFn ); // register to handle the messages we want
|
||
|
CDispatchReg g_DistributeWorkReg( VMPI_DISTRIBUTEWORK_PACKETID, DistributeWorkDispatch );
|
||
|
|
||
|
|
||
|
|
||
|
void VMPI_DeletePortalMCSocket()
|
||
|
{
|
||
|
// Stop the thread if it exists.
|
||
|
if ( g_hMCThread )
|
||
|
{
|
||
|
g_MCThreadExitEvent.SetEvent();
|
||
|
WaitForSingleObject( g_hMCThread, INFINITE );
|
||
|
CloseHandle( g_hMCThread );
|
||
|
g_hMCThread = NULL;
|
||
|
}
|
||
|
|
||
|
if ( g_pPortalMCSocket )
|
||
|
{
|
||
|
g_pPortalMCSocket->Release();
|
||
|
g_pPortalMCSocket = NULL;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
void VVIS_SetupMPI( int &argc, char **&argv )
|
||
|
{
|
||
|
if ( !VMPI_FindArg( argc, argv, "-mpi", "" ) && !VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Worker ), "" ) )
|
||
|
return;
|
||
|
|
||
|
CmdLib_AtCleanup( VMPI_Stats_Term );
|
||
|
CmdLib_AtCleanup( VMPI_DeletePortalMCSocket );
|
||
|
|
||
|
VMPI_Stats_InstallSpewHook();
|
||
|
|
||
|
// Force local mode?
|
||
|
VMPIRunMode mode;
|
||
|
if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Local ), "" ) )
|
||
|
mode = VMPI_RUN_LOCAL;
|
||
|
else
|
||
|
mode = VMPI_RUN_NETWORKED;
|
||
|
|
||
|
//
|
||
|
// Extract mpi specific arguments
|
||
|
//
|
||
|
Msg( "Initializing VMPI...\n" );
|
||
|
if ( !VMPI_Init( argc, argv, "dependency_info_vvis.txt", HandleMPIDisconnect, mode ) )
|
||
|
{
|
||
|
Error( "MPI_Init failed." );
|
||
|
}
|
||
|
|
||
|
StatsDB_InitStatsDatabase( argc, argv, "dbinfo_vvis.txt" );
|
||
|
}
|
||
|
|
||
|
|
||
|
void ProcessBasePortalVis( int iThread, uint64 iPortal, MessageBuffer *pBuf )
|
||
|
{
|
||
|
CTimeAdder adder( &g_CPUTime );
|
||
|
|
||
|
BasePortalVis( iThread, iPortal );
|
||
|
|
||
|
// Send my result to the master
|
||
|
if ( pBuf )
|
||
|
{
|
||
|
portal_t * p = &portals[iPortal];
|
||
|
pBuf->write( p->portalfront, portalbytes );
|
||
|
pBuf->write( p->portalflood, portalbytes );
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
void ReceiveBasePortalVis( uint64 iWorkUnit, MessageBuffer *pBuf, int iWorker )
|
||
|
{
|
||
|
portal_t * p = &portals[iWorkUnit];
|
||
|
if ( p->portalflood != 0 || p->portalfront != 0 || p->portalvis != 0)
|
||
|
{
|
||
|
Msg("Duplicate portal %llu\n", iWorkUnit);
|
||
|
}
|
||
|
|
||
|
if ( pBuf->getLen() - pBuf->getOffset() != portalbytes*2 )
|
||
|
Error( "Invalid packet in ReceiveBasePortalVis." );
|
||
|
|
||
|
//
|
||
|
// allocate memory for bitwise vis solutions for this portal
|
||
|
//
|
||
|
p->portalfront = (byte*)malloc (portalbytes);
|
||
|
pBuf->read( p->portalfront, portalbytes );
|
||
|
|
||
|
p->portalflood = (byte*)malloc (portalbytes);
|
||
|
pBuf->read( p->portalflood, portalbytes );
|
||
|
|
||
|
p->portalvis = (byte*)malloc (portalbytes);
|
||
|
memset (p->portalvis, 0, portalbytes);
|
||
|
|
||
|
p->nummightsee = CountBits( p->portalflood, g_numportals*2 );
|
||
|
}
|
||
|
|
||
|
|
||
|
//-----------------------------------------
|
||
|
//
|
||
|
// Run BasePortalVis across all available processing nodes
|
||
|
// Then collect and redistribute the results.
|
||
|
//
|
||
|
void RunMPIBasePortalVis()
|
||
|
{
|
||
|
int i;
|
||
|
|
||
|
Msg( "\n\nportalbytes: %d\nNum Work Units: %d\nTotal data size: %d\n", portalbytes, g_numportals*2, portalbytes*g_numportals*2 );
|
||
|
Msg("%-20s ", "BasePortalVis:");
|
||
|
if ( g_bMPIMaster )
|
||
|
StartPacifier("");
|
||
|
|
||
|
|
||
|
VMPI_SetCurrentStage( "RunMPIBasePortalVis" );
|
||
|
|
||
|
// Note: we're aiming for about 1500 portals in a map, so about 3000 work units.
|
||
|
g_CPUTime.Init();
|
||
|
double elapsed = DistributeWork(
|
||
|
g_numportals * 2, // # work units
|
||
|
VMPI_DISTRIBUTEWORK_PACKETID, // packet ID
|
||
|
ProcessBasePortalVis, // Worker function to process work units
|
||
|
ReceiveBasePortalVis // Master function to receive work results
|
||
|
);
|
||
|
|
||
|
if ( g_bMPIMaster )
|
||
|
{
|
||
|
EndPacifier( false );
|
||
|
Msg( " (%d)\n", (int)elapsed );
|
||
|
}
|
||
|
|
||
|
//
|
||
|
// Distribute the results to all the workers.
|
||
|
//
|
||
|
if ( g_bMPIMaster )
|
||
|
{
|
||
|
if ( !fastvis )
|
||
|
{
|
||
|
VMPI_SetCurrentStage( "SendPortalResults" );
|
||
|
|
||
|
// Store all the portal results in a temp file and multicast that to the workers.
|
||
|
CUtlVector<char> allPortalData;
|
||
|
allPortalData.SetSize( g_numportals * 2 * portalbytes * 2 );
|
||
|
|
||
|
char *pOut = allPortalData.Base();
|
||
|
for ( i=0; i < g_numportals * 2; i++)
|
||
|
{
|
||
|
portal_t *p = &portals[i];
|
||
|
|
||
|
memcpy( pOut, p->portalfront, portalbytes );
|
||
|
pOut += portalbytes;
|
||
|
|
||
|
memcpy( pOut, p->portalflood, portalbytes );
|
||
|
pOut += portalbytes;
|
||
|
}
|
||
|
|
||
|
const char *pVirtualFilename = "--portal-results--";
|
||
|
VMPI_FileSystem_CreateVirtualFile( pVirtualFilename, allPortalData.Base(), allPortalData.Count() );
|
||
|
|
||
|
char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_BASEPORTALVIS_RESULTS };
|
||
|
VMPI_Send2Chunks( cPacketID, sizeof( cPacketID ), pVirtualFilename, strlen( pVirtualFilename ) + 1, VMPI_PERSISTENT );
|
||
|
}
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
VMPI_SetCurrentStage( "RecvPortalResults" );
|
||
|
|
||
|
// Wait until we've received the filename from the master.
|
||
|
while ( g_BasePortalVisResultsFilename.Count() == 0 )
|
||
|
{
|
||
|
VMPI_DispatchNextMessage();
|
||
|
}
|
||
|
|
||
|
// Open
|
||
|
FileHandle_t fp = g_pFileSystem->Open( g_BasePortalVisResultsFilename.Base(), "rb", VMPI_VIRTUAL_FILES_PATH_ID );
|
||
|
if ( !fp )
|
||
|
Error( "Can't open '%s' to read portal info.", g_BasePortalVisResultsFilename.Base() );
|
||
|
|
||
|
for ( i=0; i < g_numportals * 2; i++)
|
||
|
{
|
||
|
portal_t *p = &portals[i];
|
||
|
|
||
|
p->portalfront = (byte*)malloc (portalbytes);
|
||
|
g_pFileSystem->Read( p->portalfront, portalbytes, fp );
|
||
|
|
||
|
p->portalflood = (byte*)malloc (portalbytes);
|
||
|
g_pFileSystem->Read( p->portalflood, portalbytes, fp );
|
||
|
|
||
|
p->portalvis = (byte*)malloc (portalbytes);
|
||
|
memset (p->portalvis, 0, portalbytes);
|
||
|
|
||
|
p->nummightsee = CountBits (p->portalflood, g_numportals*2);
|
||
|
}
|
||
|
|
||
|
g_pFileSystem->Close( fp );
|
||
|
}
|
||
|
|
||
|
|
||
|
if ( !g_bMPIMaster )
|
||
|
{
|
||
|
if ( g_iVMPIVerboseLevel >= 1 )
|
||
|
Msg( "\n%% worker CPU utilization during BasePortalVis: %.1f\n", (g_CPUTime.GetSeconds() * 100.0f / elapsed) / numthreads );
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
void ProcessPortalFlow( int iThread, uint64 iPortal, MessageBuffer *pBuf )
|
||
|
{
|
||
|
// Process Portal and distribute results
|
||
|
CTimeAdder adder( &g_CPUTime );
|
||
|
|
||
|
PortalFlow( iThread, iPortal );
|
||
|
|
||
|
// Send my result to root and potentially the other slaves
|
||
|
// The slave results are read in RecursiveLeafFlow
|
||
|
//
|
||
|
if ( pBuf )
|
||
|
{
|
||
|
portal_t * p = sorted_portals[iPortal];
|
||
|
pBuf->write( p->portalvis, portalbytes );
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
void ReceivePortalFlow( uint64 iWorkUnit, MessageBuffer *pBuf, int iWorker )
|
||
|
{
|
||
|
portal_t *p = sorted_portals[iWorkUnit];
|
||
|
|
||
|
if ( p->status != stat_done )
|
||
|
{
|
||
|
pBuf->read( p->portalvis, portalbytes );
|
||
|
p->status = stat_done;
|
||
|
|
||
|
|
||
|
// Multicast the status of this portal out.
|
||
|
if ( g_pPortalMCSocket )
|
||
|
{
|
||
|
char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_PORTALFLOW_RESULTS };
|
||
|
void *chunks[4] = { cPacketID, &g_PortalMCThreadUniqueID, &iWorkUnit, p->portalvis };
|
||
|
int chunkLengths[4] = { sizeof( cPacketID ), sizeof( g_PortalMCThreadUniqueID ), sizeof( iWorkUnit ), portalbytes };
|
||
|
|
||
|
g_pPortalMCSocket->SendChunksTo( &g_PortalMCAddr, chunks, chunkLengths, ARRAYSIZE( chunks ) );
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
DWORD WINAPI PortalMCThreadFn( LPVOID p )
|
||
|
{
|
||
|
CUtlVector<char> data;
|
||
|
data.SetSize( portalbytes + 128 );
|
||
|
|
||
|
DWORD waitTime = 0;
|
||
|
while ( WaitForSingleObject( g_MCThreadExitEvent.GetEventHandle(), waitTime ) != WAIT_OBJECT_0 )
|
||
|
{
|
||
|
CIPAddr ipFrom;
|
||
|
int len = g_pPortalMCSocket->RecvFrom( data.Base(), data.Count(), &ipFrom );
|
||
|
if ( len == -1 )
|
||
|
{
|
||
|
waitTime = 20;
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
// These lengths must match exactly what is sent in ReceivePortalFlow.
|
||
|
if ( len == 2 + sizeof( g_PortalMCThreadUniqueID ) + sizeof( int ) + portalbytes )
|
||
|
{
|
||
|
// Perform more validation...
|
||
|
if ( data[0] == VMPI_VVIS_PACKET_ID && data[1] == VMPI_PORTALFLOW_RESULTS )
|
||
|
{
|
||
|
if ( *((unsigned long*)&data[2]) == g_PortalMCThreadUniqueID )
|
||
|
{
|
||
|
int iWorkUnit = *((int*)&data[6]);
|
||
|
if ( iWorkUnit >= 0 && iWorkUnit < g_numportals*2 )
|
||
|
{
|
||
|
portal_t *p = sorted_portals[iWorkUnit];
|
||
|
if ( p )
|
||
|
{
|
||
|
++g_nMulticastPortalsReceived;
|
||
|
memcpy( p->portalvis, &data[10], portalbytes );
|
||
|
p->status = stat_done;
|
||
|
waitTime = 0;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return 0;
|
||
|
}
|
||
|
|
||
|
|
||
|
void MCThreadCleanupFn()
|
||
|
{
|
||
|
g_MCThreadExitEvent.SetEvent();
|
||
|
}
|
||
|
|
||
|
|
||
|
// --------------------------------------------------------------------------------- //
|
||
|
// Cheesy hack to let them stop the job early and keep the results of what has
|
||
|
// been done so far.
|
||
|
// --------------------------------------------------------------------------------- //
|
||
|
|
||
|
class CVisDistributeWorkCallbacks : public IWorkUnitDistributorCallbacks
|
||
|
{
|
||
|
public:
|
||
|
CVisDistributeWorkCallbacks()
|
||
|
{
|
||
|
m_bExitedEarly = false;
|
||
|
m_iState = STATE_NONE;
|
||
|
}
|
||
|
|
||
|
virtual bool Update()
|
||
|
{
|
||
|
if ( kbhit() )
|
||
|
{
|
||
|
int key = toupper( getch() );
|
||
|
if ( m_iState == STATE_NONE )
|
||
|
{
|
||
|
if ( key == 'M' )
|
||
|
{
|
||
|
m_iState = STATE_AT_MENU;
|
||
|
Warning("\n\n"
|
||
|
"----------------------\n"
|
||
|
"1. Write scratchpad file.\n"
|
||
|
"2. Exit early and use fast vis for remaining portals.\n"
|
||
|
"\n"
|
||
|
"0. Exit menu.\n"
|
||
|
"----------------------\n"
|
||
|
"\n"
|
||
|
);
|
||
|
}
|
||
|
}
|
||
|
else if ( m_iState == STATE_AT_MENU )
|
||
|
{
|
||
|
if ( key == '1' )
|
||
|
{
|
||
|
Warning(
|
||
|
"\n"
|
||
|
"\nWriting scratchpad file."
|
||
|
"\nCommand line: scratchpad3dviewer -file scratch.pad\n"
|
||
|
"\nRed portals are the portals that are fast vis'd."
|
||
|
"\n"
|
||
|
);
|
||
|
m_iState = STATE_NONE;
|
||
|
IScratchPad3D *pPad = ScratchPad3D_Create( "scratch.pad" );
|
||
|
if ( pPad )
|
||
|
{
|
||
|
ScratchPad_DrawWorld( pPad, false );
|
||
|
|
||
|
// Draw the portals that haven't been vis'd.
|
||
|
for ( int i=0; i < g_numportals*2; i++ )
|
||
|
{
|
||
|
portal_t *p = sorted_portals[i];
|
||
|
ScratchPad_DrawWinding( pPad, p->winding->numpoints, p->winding->points, Vector( 1, 0, 0 ), Vector( .3, .3, .3 ) );
|
||
|
}
|
||
|
|
||
|
pPad->Release();
|
||
|
}
|
||
|
}
|
||
|
else if ( key == '2' )
|
||
|
{
|
||
|
// Exit the process early.
|
||
|
m_bExitedEarly = true;
|
||
|
return true;
|
||
|
}
|
||
|
else if ( key == '0' )
|
||
|
{
|
||
|
m_iState = STATE_NONE;
|
||
|
Warning( "\n\nExited menu.\n\n" );
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
public:
|
||
|
enum
|
||
|
{
|
||
|
STATE_NONE,
|
||
|
STATE_AT_MENU
|
||
|
};
|
||
|
|
||
|
bool m_bExitedEarly;
|
||
|
int m_iState; // STATE_ enum.
|
||
|
};
|
||
|
|
||
|
|
||
|
CVisDistributeWorkCallbacks g_VisDistributeWorkCallbacks;
|
||
|
|
||
|
|
||
|
void CheckExitedEarly()
|
||
|
{
|
||
|
if ( g_VisDistributeWorkCallbacks.m_bExitedEarly )
|
||
|
{
|
||
|
Warning( "\nExited early, using fastvis results...\n" );
|
||
|
Warning( "Exited early, using fastvis results...\n" );
|
||
|
|
||
|
// Use the fastvis results for portals that we didn't get results for.
|
||
|
for ( int i=0; i < g_numportals*2; i++ )
|
||
|
{
|
||
|
if ( sorted_portals[i]->status != stat_done )
|
||
|
{
|
||
|
sorted_portals[i]->portalvis = sorted_portals[i]->portalflood;
|
||
|
sorted_portals[i]->status = stat_done;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
//-----------------------------------------
|
||
|
//
|
||
|
// Run PortalFlow across all available processing nodes
|
||
|
//
|
||
|
void RunMPIPortalFlow()
|
||
|
{
|
||
|
Msg( "%-20s ", "MPIPortalFlow:" );
|
||
|
if ( g_bMPIMaster )
|
||
|
StartPacifier("");
|
||
|
|
||
|
// Workers wait until we get the MC socket address.
|
||
|
g_PortalMCThreadUniqueID = StatsDB_GetUniqueJobID();
|
||
|
if ( g_bMPIMaster )
|
||
|
{
|
||
|
CCycleCount cnt;
|
||
|
cnt.Sample();
|
||
|
CUniformRandomStream randomStream;
|
||
|
randomStream.SetSeed( cnt.GetMicroseconds() );
|
||
|
|
||
|
g_PortalMCAddr.port = randomStream.RandomInt( 22000, 25000 ); // Pulled out of something else.
|
||
|
g_PortalMCAddr.ip[0] = (unsigned char)RandomInt( 225, 238 );
|
||
|
g_PortalMCAddr.ip[1] = (unsigned char)RandomInt( 0, 255 );
|
||
|
g_PortalMCAddr.ip[2] = (unsigned char)RandomInt( 0, 255 );
|
||
|
g_PortalMCAddr.ip[3] = (unsigned char)RandomInt( 3, 255 );
|
||
|
|
||
|
g_pPortalMCSocket = CreateIPSocket();
|
||
|
int i=0;
|
||
|
for ( i; i < 5; i++ )
|
||
|
{
|
||
|
if ( g_pPortalMCSocket->BindToAny( randomStream.RandomInt( 20000, 30000 ) ) )
|
||
|
break;
|
||
|
}
|
||
|
if ( i == 5 )
|
||
|
{
|
||
|
Error( "RunMPIPortalFlow: can't open a socket to multicast on." );
|
||
|
}
|
||
|
|
||
|
char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_SUBPACKETID_MC_ADDR };
|
||
|
VMPI_Send2Chunks( cPacketID, sizeof( cPacketID ), &g_PortalMCAddr, sizeof( g_PortalMCAddr ), VMPI_PERSISTENT );
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
VMPI_SetCurrentStage( "wait for MC address" );
|
||
|
|
||
|
while ( !g_bGotMCAddr )
|
||
|
{
|
||
|
VMPI_DispatchNextMessage();
|
||
|
}
|
||
|
|
||
|
// Open our multicast receive socket.
|
||
|
g_pPortalMCSocket = CreateMulticastListenSocket( g_PortalMCAddr );
|
||
|
if ( !g_pPortalMCSocket )
|
||
|
{
|
||
|
char err[512];
|
||
|
IP_GetLastErrorString( err, sizeof( err ) );
|
||
|
Error( "RunMPIPortalFlow: CreateMulticastListenSocket failed. (%s).", err );
|
||
|
}
|
||
|
|
||
|
// Make a thread to listen for the data on the multicast socket.
|
||
|
DWORD dwDummy = 0;
|
||
|
g_MCThreadExitEvent.Init( false, false );
|
||
|
|
||
|
// Make sure we kill the MC thread if the app exits ungracefully.
|
||
|
CmdLib_AtCleanup( MCThreadCleanupFn );
|
||
|
|
||
|
g_hMCThread = CreateThread(
|
||
|
NULL,
|
||
|
0,
|
||
|
PortalMCThreadFn,
|
||
|
NULL,
|
||
|
0,
|
||
|
&dwDummy );
|
||
|
|
||
|
if ( !g_hMCThread )
|
||
|
{
|
||
|
Error( "RunMPIPortalFlow: CreateThread failed for multicast receive thread." );
|
||
|
}
|
||
|
}
|
||
|
|
||
|
VMPI_SetCurrentStage( "RunMPIBasePortalFlow" );
|
||
|
|
||
|
|
||
|
g_pDistributeWorkCallbacks = &g_VisDistributeWorkCallbacks;
|
||
|
|
||
|
g_CPUTime.Init();
|
||
|
double elapsed = DistributeWork(
|
||
|
g_numportals * 2, // # work units
|
||
|
VMPI_DISTRIBUTEWORK_PACKETID, // packet ID
|
||
|
ProcessPortalFlow, // Worker function to process work units
|
||
|
ReceivePortalFlow // Master function to receive work results
|
||
|
);
|
||
|
|
||
|
g_pDistributeWorkCallbacks = NULL;
|
||
|
|
||
|
CheckExitedEarly();
|
||
|
|
||
|
// Stop the multicast stuff.
|
||
|
VMPI_DeletePortalMCSocket();
|
||
|
|
||
|
if( !g_bMPIMaster )
|
||
|
{
|
||
|
if ( g_iVMPIVerboseLevel >= 1 )
|
||
|
{
|
||
|
Msg( "Received %d (out of %d) portals from multicast.\n", g_nMulticastPortalsReceived, g_numportals * 2 );
|
||
|
Msg( "%.1f%% CPU utilization during PortalFlow\n", (g_CPUTime.GetSeconds() * 100.0f / elapsed) / numthreads );
|
||
|
}
|
||
|
|
||
|
Msg( "VVIS worker finished. Over and out.\n" );
|
||
|
VMPI_SetCurrentStage( "worker done" );
|
||
|
|
||
|
CmdLib_Exit( 0 );
|
||
|
}
|
||
|
|
||
|
if ( g_bMPIMaster )
|
||
|
{
|
||
|
EndPacifier( false );
|
||
|
Msg( " (%d)\n", (int)elapsed );
|
||
|
}
|
||
|
}
|
||
|
|