You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
815 lines
23 KiB
815 lines
23 KiB
//========= Copyright Valve Corporation, All rights reserved. ============// |
|
// |
|
// Purpose: |
|
// |
|
//=============================================================================// |
|
|
|
#include <winsock2.h> |
|
#include "vmpi_filesystem_internal.h" |
|
#include "threadhelpers.h" |
|
#include "zlib.h" |
|
|
|
|
|
// Capacity of the (fileID, chunk index) ack buffer; the buffer is flushed to the master as soon as it fills up.
#define NUM_BUFFERED_CHUNK_ACKS	512

// Maximum age (in GetTickCount() milliseconds) of buffered acks before a timed flush, i.e. the ack
// queue is flushed twice per second.
#define ACK_FLUSH_INTERVAL		500
|
|
|
|
|
static bool g_bReceivedMulticastIP = false; |
|
static CIPAddr g_MulticastIP; |
|
|
|
|
|
CCriticalSection g_FileResponsesCS; |
|
|
|
// One decoded VMPI_FSPACKETID_FILE_RESPONSE packet. Instances sit in g_FileResponses until the
// requester that issued the matching request ID picks them up.
class CFileResponse
{
public:
	// ID of the file request this response answers.
	int		m_RequestID;

	// File ID reported by the master; -1 is treated by the requester as "file doesn't exist".
	int		m_Response;

	// Zero-length flag copied from the packet.
	bool	m_bZeroLength;
};
|
|
|
// File responses that have arrived but have not been claimed by their requester yet.
CUtlVector<CFileResponse> g_FileResponses;

// Next request ID to hand out (post-incremented for every file request).
int g_RequestID = 0;
|
|
|
|
|
// A raw VMPI_FSPACKETID_FILE_CHUNK payload queued for the listener. Instances are malloc()'d with
// room for m_Len data bytes, so m_Data is really the first byte of a variable-length buffer.
class CFileChunkPacket
{
public:
	// Number of valid bytes stored at m_Data.
	int		m_Len;

	// First byte of the packet data (the allocation extends past the end of the class).
	char	m_Data[1];
};
|
// Queue of received file chunk packets (appended at the tail, consumed from the head).
// This is also protected by g_FileResponsesCS.
CUtlLinkedList<CFileChunkPacket*, int> g_FileChunkPackets;
|
|
|
|
|
// ------------------------------------------------------------------------------------------------------------------------ // |
|
// Classes. |
|
// ------------------------------------------------------------------------------------------------------------------------ // |
|
|
|
class CWorkerFile |
|
{ |
|
public: |
|
const char* GetFilename() { return m_Filename.Base(); } |
|
const char* GetPathID() { return m_PathID.Base(); } |
|
bool IsReadyToRead() const { return m_nChunksToReceive == 0; } |
|
|
|
|
|
public: |
|
CFastTimer m_Timer; // To see how long it takes to download the file. |
|
|
|
// This has to be sent explicitly as part of the file info or else the protocol |
|
// breaks on empty files. |
|
bool m_bZeroLength; |
|
|
|
// This is false until we get any packets about the file. In the packets, |
|
// we find out what the size is supposed to be. |
|
bool m_bGotCompressedSize; |
|
|
|
// The ID the master uses to refer to this file. |
|
int m_FileID; |
|
|
|
CUtlVector<char> m_Filename; |
|
CUtlVector<char> m_PathID; |
|
|
|
// First data comes in here, then when it's all there, it is inflated into m_UncompressedData. |
|
CUtlVector<char> m_CompressedData; |
|
|
|
// 1 bit for each chunk. |
|
CUtlVector<unsigned char> m_ChunksReceived; |
|
|
|
// When this is zero, the file is done being received and m_UncompressedData is valid. |
|
int m_nChunksToReceive; |
|
CUtlVector<char> m_UncompressedData; |
|
}; |
|
|
|
|
|
|
|
// ------------------------------------------------------------------------------------------------------------------------ // |
|
// Global helpers. |
|
// ------------------------------------------------------------------------------------------------------------------------ // |
|
|
|
// Pumps VMPI messages until g_bReceivedMulticastIP has been set (HandleFileSystemPacket does that
// when the multicast address packet arrives), then copies the received address into *pAddr.
static void RecvMulticastIP( CIPAddr *pAddr )
{
	for ( ;; )
	{
		if ( g_bReceivedMulticastIP )
			break;

		VMPI_DispatchNextMessage();
	}

	*pAddr = g_MulticastIP;
}
|
|
|
|
|
static bool ZLibDecompress( const void *pInput, int inputLen, void *pOut, int outLen ) |
|
{ |
|
if ( inputLen == 0 ) |
|
{ |
|
// Zero-length file? |
|
return true; |
|
} |
|
|
|
z_stream decompressStream; |
|
|
|
// Initialize the decompression stream. |
|
memset( &decompressStream, 0, sizeof( decompressStream ) ); |
|
if ( inflateInit( &decompressStream ) != Z_OK ) |
|
return false; |
|
|
|
// Decompress all this stuff and write it to the file. |
|
decompressStream.next_in = (unsigned char*)pInput; |
|
decompressStream.avail_in = inputLen; |
|
|
|
char *pOutChar = (char*)pOut; |
|
while ( decompressStream.avail_in ) |
|
{ |
|
decompressStream.total_out = 0; |
|
decompressStream.next_out = (unsigned char*)pOutChar; |
|
decompressStream.avail_out = outLen - (pOutChar - (char*)pOut); |
|
|
|
int ret = inflate( &decompressStream, Z_NO_FLUSH ); |
|
if ( ret != Z_OK && ret != Z_STREAM_END ) |
|
return false; |
|
|
|
|
|
pOutChar += decompressStream.total_out; |
|
|
|
if ( ret == Z_STREAM_END ) |
|
{ |
|
if ( (pOutChar - (char*)pOut) == outLen ) |
|
{ |
|
return true; |
|
} |
|
else |
|
{ |
|
Assert( false ); |
|
return false; |
|
} |
|
} |
|
} |
|
|
|
Assert( false ); // Should have gotten to Z_STREAM_END. |
|
return false; |
|
} |
|
|
|
|
|
// ------------------------------------------------------------------------------------------------------------------------ // |
|
// CWorkerMulticastListener implementation. |
|
// ------------------------------------------------------------------------------------------------------------------------ // |
|
|
|
class CWorkerMulticastListener |
|
{ |
|
public: |
|
CWorkerMulticastListener() |
|
{ |
|
m_nUnfinishedFiles = 0; |
|
} |
|
|
|
~CWorkerMulticastListener() |
|
{ |
|
Term(); |
|
} |
|
|
|
bool Init( const CIPAddr &mcAddr ) |
|
{ |
|
m_MulticastAddr = mcAddr; |
|
m_hMainThread = GetCurrentThread(); |
|
return true; |
|
} |
|
|
|
void Term() |
|
{ |
|
m_WorkerFiles.PurgeAndDeleteElements(); |
|
} |
|
|
|
|
|
CWorkerFile* RequestFileFromServer( const char *pFilename, const char *pPathID ) |
|
{ |
|
Assert( pPathID ); |
|
Assert( FindWorkerFile( pFilename, pPathID ) == NULL ); |
|
|
|
// Send a request to the master to find out if this file even exists. |
|
CCriticalSectionLock csLock( &g_FileResponsesCS ); |
|
csLock.Lock(); |
|
int requestID = g_RequestID++; |
|
csLock.Unlock(); |
|
|
|
unsigned char packetID[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_FILE_REQUEST }; |
|
const void *pChunks[4] = { packetID, &requestID, (void*)pFilename, pPathID }; |
|
int chunkLengths[4] = { sizeof( packetID ), sizeof( requestID ), strlen( pFilename ) + 1, strlen( pPathID ) + 1 }; |
|
VMPI_SendChunks( pChunks, chunkLengths, ARRAYSIZE( pChunks ), 0 ); |
|
|
|
// Wait for the file ID to come back. |
|
CFileResponse response; |
|
response.m_Response = -1; |
|
response.m_bZeroLength = true; |
|
|
|
// We're in a worker thread.. the main thread should be dispatching all the messages, so let it |
|
// do that until we get our response. |
|
while ( 1 ) |
|
{ |
|
bool bGotIt = false; |
|
csLock.Lock(); |
|
for ( int iResponse=0; iResponse < g_FileResponses.Count(); iResponse++ ) |
|
{ |
|
if ( g_FileResponses[iResponse].m_RequestID == requestID ) |
|
{ |
|
response = g_FileResponses[iResponse]; |
|
g_FileResponses.Remove( iResponse ); |
|
bGotIt = true; |
|
break; |
|
} |
|
} |
|
csLock.Unlock(); |
|
|
|
if ( bGotIt ) |
|
break; |
|
|
|
if ( GetCurrentThread() == m_hMainThread ) |
|
VMPI_DispatchNextMessage( 20 ); |
|
else |
|
Sleep( 20 ); |
|
} |
|
|
|
// If we get -1 back, it means the file doesn't exist. |
|
int fileID = response.m_Response; |
|
if ( fileID == -1 ) |
|
return NULL; |
|
|
|
CWorkerFile *pTestFile = new CWorkerFile; |
|
|
|
pTestFile->m_Filename.SetSize( strlen( pFilename ) + 1 ); |
|
strcpy( pTestFile->m_Filename.Base(), pFilename ); |
|
|
|
pTestFile->m_PathID.SetSize( strlen( pPathID ) + 1 ); |
|
strcpy( pTestFile->m_PathID.Base(), pPathID ); |
|
|
|
pTestFile->m_FileID = fileID; |
|
pTestFile->m_nChunksToReceive = 9999; |
|
pTestFile->m_Timer.Start(); |
|
m_WorkerFiles.AddToTail( pTestFile ); |
|
pTestFile->m_bGotCompressedSize = false; |
|
pTestFile->m_bZeroLength = response.m_bZeroLength; |
|
|
|
++m_nUnfinishedFiles; |
|
|
|
return pTestFile; |
|
} |
|
|
|
void FlushAckChunks( unsigned short chunksToAck[NUM_BUFFERED_CHUNK_ACKS][2], int &nChunksToAck, DWORD &lastAckTime ) |
|
{ |
|
if ( nChunksToAck ) |
|
{ |
|
// Tell the master we received this chunk. |
|
unsigned char packetID[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_CHUNK_RECEIVED }; |
|
void *pChunks[2] = { packetID, chunksToAck }; |
|
int chunkLengths[2] = { sizeof( packetID ), nChunksToAck * 4 }; |
|
VMPI_SendChunks( pChunks, chunkLengths, 2, 0 ); |
|
nChunksToAck = 0; |
|
} |
|
|
|
lastAckTime = GetTickCount(); |
|
} |
|
|
|
void MaybeFlushAckChunks( unsigned short chunksToAck[NUM_BUFFERED_CHUNK_ACKS][2], int &nChunksToAck, DWORD &lastAckTime ) |
|
{ |
|
if ( nChunksToAck && GetTickCount() - lastAckTime > ACK_FLUSH_INTERVAL ) |
|
FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime ); |
|
} |
|
|
|
void AddAckChunk( |
|
unsigned short chunksToAck[NUM_BUFFERED_CHUNK_ACKS][2], |
|
int &nChunksToAck, |
|
DWORD &lastAckTime, |
|
int fileID, |
|
int iChunk ) |
|
{ |
|
chunksToAck[nChunksToAck][0] = (unsigned short)fileID; |
|
chunksToAck[nChunksToAck][1] = (unsigned short)iChunk; |
|
|
|
// TCP filesystem acks all chunks immediately so it'll send more. |
|
++nChunksToAck; |
|
if ( nChunksToAck == NUM_BUFFERED_CHUNK_ACKS || VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_TCP ) |
|
{ |
|
FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime ); |
|
} |
|
} |
|
|
|
// Returns the length of the packet's data or -1 if there is nothing. |
|
int CheckFileChunkPackets( char *data, int dataSize ) |
|
{ |
|
// Using TCP.. pop the next received packet off the stack. |
|
CCriticalSectionLock csLock( &g_FileResponsesCS ); |
|
csLock.Lock(); |
|
if ( g_FileChunkPackets.Count() <= 0 ) |
|
return -1; |
|
|
|
CFileChunkPacket *pPacket = g_FileChunkPackets[ g_FileChunkPackets.Head() ]; |
|
g_FileChunkPackets.Remove( g_FileChunkPackets.Head() ); |
|
|
|
// Yes, this is inefficient, but the amount of data we're handling here is tiny so the |
|
// effect is negligible. |
|
int len; |
|
if ( pPacket->m_Len > dataSize ) |
|
{ |
|
len = -1; |
|
Warning( "CWorkerMulticastListener::ListenFor: Got a section of data too long (%d bytes).", pPacket->m_Len ); |
|
} |
|
else |
|
{ |
|
memcpy( data, pPacket->m_Data, pPacket->m_Len ); |
|
len = pPacket->m_Len; |
|
} |
|
|
|
free( pPacket ); |
|
return len; |
|
} |
|
|
|
void ShowSDKWorkerMsg( const char *pMsg, ... ) |
|
{ |
|
if ( !g_bMPIMaster && VMPI_IsSDKMode() ) |
|
{ |
|
va_list marker; |
|
va_start( marker, pMsg ); |
|
char str[4096]; |
|
V_vsnprintf( str, sizeof( str ), pMsg, marker ); |
|
va_end( marker ); |
|
Msg( "%s", str ); |
|
} |
|
} |
|
|
|
// This is the main function the workers use to pick files out of the multicast stream. |
|
// The app is waiting for a specific file, but we receive and ack any files we can until |
|
// we get the file they're looking for, then we return. |
|
// |
|
// NOTE: ideally, this would be in a thread, but it adds lots of complications and may |
|
// not be worth it. |
|
CWorkerFile* ListenFor( const char *pFilename, const char *pPathID ) |
|
{ |
|
CWorkerFile *pFile = FindWorkerFile( pFilename, pPathID ); |
|
if ( !pFile ) |
|
{ |
|
// Ok, we haven't requested this file yet. Create an entry for it and |
|
// tell the master we'd like this file. |
|
pFile = RequestFileFromServer( pFilename, pPathID ); |
|
if ( !pFile ) |
|
return NULL; |
|
|
|
// If it's zero-length, we can return right now. |
|
if ( pFile->m_bZeroLength ) |
|
{ |
|
--m_nUnfinishedFiles; |
|
return pFile; |
|
} |
|
} |
|
|
|
// Setup a filename to print some debug spew with. |
|
char printableFilename[58]; |
|
if ( V_strlen( pFilename ) > ARRAYSIZE( printableFilename ) - 1 ) |
|
{ |
|
V_strncpy( printableFilename, "[...]", sizeof( printableFilename ) ); |
|
V_strncat( printableFilename, &pFilename[V_strlen(pFilename) - ARRAYSIZE(printableFilename) + 1 + V_strlen(printableFilename)], sizeof( printableFilename ) ); |
|
} |
|
else |
|
{ |
|
V_strncpy( printableFilename, pFilename, sizeof( printableFilename ) ); |
|
} |
|
ShowSDKWorkerMsg( "\rRecv %s (0%%) ", printableFilename ); |
|
int iChunkPayloadSize = VMPI_GetChunkPayloadSize(); |
|
|
|
// Now start listening to the stream. |
|
// Note: no need to setup anything when in TCP mode - we just use the regular |
|
// VMPI dispatch stuff to handle that. |
|
ISocket *pSocket = NULL; |
|
if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_MULTICAST ) |
|
{ |
|
pSocket = CreateMulticastListenSocket( m_MulticastAddr ); |
|
|
|
if ( !pSocket ) |
|
{ |
|
char str[512]; |
|
IP_GetLastErrorString( str, sizeof( str ) ); |
|
Warning( "CreateMulticastListenSocket (%d.%d.%d.%d:%d) failed\n%s\n", EXPAND_ADDR( m_MulticastAddr ), str ); |
|
return NULL; |
|
} |
|
} |
|
else if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_BROADCAST ) |
|
{ |
|
pSocket = CreateIPSocket(); |
|
if ( !pSocket->BindToAny( m_MulticastAddr.port ) ) |
|
{ |
|
pSocket->Release(); |
|
pSocket = NULL; |
|
} |
|
} |
|
|
|
unsigned short chunksToAck[NUM_BUFFERED_CHUNK_ACKS][2]; |
|
int nChunksToAck = 0; |
|
DWORD lastAckTime = GetTickCount(); |
|
|
|
// Now just receive multicast data until this file has been received. |
|
while ( m_nUnfinishedFiles > 0 ) |
|
{ |
|
char data[MAX_CHUNK_PAYLOAD_SIZE+1024]; |
|
int len = -1; |
|
|
|
if ( pSocket ) |
|
{ |
|
CIPAddr ipFrom; |
|
len = pSocket->RecvFrom( data, sizeof( data ), &ipFrom ); |
|
} |
|
else |
|
{ |
|
len = CheckFileChunkPackets( data, sizeof( data ) ); |
|
} |
|
|
|
if ( len == -1 ) |
|
{ |
|
// Sleep for 10ms and also handle socket errors. |
|
Sleep( 0 ); |
|
VMPI_DispatchNextMessage( 10 ); |
|
continue; |
|
} |
|
|
|
g_nMulticastBytesReceived += len; |
|
|
|
// Alrighty. Figure out what the deal is with this file. |
|
CMulticastFileInfo *pInfo = (CMulticastFileInfo*)data; |
|
int *piChunk = (int*)( pInfo + 1 ); |
|
const char *pTestFilename = (const char*)( piChunk + 1 ); |
|
const char *pPayload = pTestFilename + strlen( pFilename ) + 1; |
|
int payloadLen = len - ( pPayload - data ); |
|
if ( payloadLen < 0 ) |
|
{ |
|
Warning( "CWorkerMulticastListener::ListenFor: invalid packet received on multicast group\n" ); |
|
continue; |
|
} |
|
|
|
|
|
if ( pInfo->m_FileID != pFile->m_FileID ) |
|
continue; |
|
|
|
CWorkerFile *pTestFile = FindWorkerFile( pInfo->m_FileID ); |
|
if ( !pTestFile ) |
|
Error( "FindWorkerFile( %s ) failed\n", pTestFilename ); |
|
|
|
// TODO: reenable this code and disable the if right above here. |
|
// We always get "invalid payload length" errors on the workers when using this, but |
|
// I haven't been able to figure out why yet. |
|
/* |
|
// Put the data into whatever file it belongs in. |
|
if ( !pTestFile ) |
|
{ |
|
pTestFile = RequestFileFromServer( pTestFilename ); |
|
if ( !pTestFile ) |
|
continue; |
|
} |
|
*/ |
|
|
|
// Is this the first packet about this file? |
|
if ( !pTestFile->m_bGotCompressedSize ) |
|
{ |
|
pTestFile->m_bGotCompressedSize = true; |
|
pTestFile->m_CompressedData.SetSize( pInfo->m_CompressedSize ); |
|
pTestFile->m_UncompressedData.SetSize( pInfo->m_UncompressedSize ); |
|
pTestFile->m_ChunksReceived.SetSize( PAD_NUMBER( pInfo->m_nChunks, 8 ) / 8 ); |
|
pTestFile->m_nChunksToReceive = pInfo->m_nChunks; |
|
memset( pTestFile->m_ChunksReceived.Base(), 0, pTestFile->m_ChunksReceived.Count() ); |
|
} |
|
|
|
// Validate the chunk index and uncompressed size. |
|
int iChunk = *piChunk; |
|
if ( iChunk < 0 || iChunk >= pInfo->m_nChunks ) |
|
{ |
|
Error( "ListenFor(): invalid chunk index (%d) for file '%s'\n", iChunk, pTestFilename ); |
|
} |
|
|
|
// Only handle this if we didn't already received the chunk. |
|
if ( !(pTestFile->m_ChunksReceived[iChunk >> 3] & (1 << (iChunk & 7))) ) |
|
{ |
|
// Make sure the file is properly setup to receive the data into. |
|
if ( (int)pInfo->m_UncompressedSize != pTestFile->m_UncompressedData.Count() || |
|
(int)pInfo->m_CompressedSize != pTestFile->m_CompressedData.Count() ) |
|
{ |
|
Error( "ListenFor(): invalid compressed or uncompressed size.\n" |
|
"pInfo = '%s', pTestFile = '%s'\n" |
|
"Compressed (pInfo = %d, pTestFile = %d)\n" |
|
"Uncompressed (pInfo = %d, pTestFile = %d)\n", |
|
pTestFilename, |
|
pTestFile->GetFilename(), |
|
pInfo->m_CompressedSize, |
|
pTestFile->m_CompressedData.Count(), |
|
pInfo->m_UncompressedSize, |
|
pTestFile->m_UncompressedData.Count() |
|
); |
|
} |
|
|
|
int iChunkStart = iChunk * iChunkPayloadSize; |
|
int iChunkEnd = min( iChunkStart + iChunkPayloadSize, pTestFile->m_CompressedData.Count() ); |
|
int chunkLen = iChunkEnd - iChunkStart; |
|
|
|
if ( chunkLen != payloadLen ) |
|
{ |
|
Error( "ListenFor(): invalid payload length for '%s' (%d should be %d)\n" |
|
"pInfo = '%s', pTestFile = '%s'\n" |
|
"Chunk %d out of %d. Compressed size: %d\n", |
|
pTestFile->GetFilename(), |
|
payloadLen, |
|
chunkLen, |
|
pTestFilename, |
|
pTestFile->GetFilename(), |
|
iChunk, |
|
pInfo->m_nChunks, |
|
pInfo->m_CompressedSize |
|
); |
|
} |
|
|
|
memcpy( &pTestFile->m_CompressedData[iChunkStart], pPayload, chunkLen ); |
|
pTestFile->m_ChunksReceived[iChunk >> 3] |= (1 << (iChunk & 7)); |
|
|
|
--pTestFile->m_nChunksToReceive; |
|
|
|
if ( pTestFile == pFile ) |
|
{ |
|
int percent = 100 - (100 * pFile->m_nChunksToReceive) / pInfo->m_nChunks; |
|
ShowSDKWorkerMsg( "\rRecv %s (%d%%) [chunk %d/%d] ", printableFilename, percent, pInfo->m_nChunks - pFile->m_nChunksToReceive, pInfo->m_nChunks ); |
|
} |
|
|
|
// Remember to ack what we received. |
|
AddAckChunk( chunksToAck, nChunksToAck, lastAckTime, pInfo->m_FileID, iChunk ); |
|
|
|
// If we're done receiving the data, unpack it. |
|
if ( pTestFile->m_nChunksToReceive == 0 ) |
|
{ |
|
// Ack the file. |
|
FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime ); |
|
|
|
pTestFile->m_Timer.End(); |
|
|
|
pTestFile->m_UncompressedData.SetSize( pInfo->m_UncompressedSize ); |
|
--m_nUnfinishedFiles; |
|
|
|
if ( !ZLibDecompress( |
|
pTestFile->m_CompressedData.Base(), |
|
pTestFile->m_CompressedData.Count(), |
|
pTestFile->m_UncompressedData.Base(), |
|
pTestFile->m_UncompressedData.Count() ) ) |
|
{ |
|
if ( pSocket ) |
|
pSocket->Release(); |
|
FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime ); |
|
Error( "ZLibDecompress failed.\n" ); |
|
return NULL; |
|
} |
|
|
|
char str[512]; |
|
V_snprintf( str, sizeof( str ), "Got %s (%dk) in %.2fs", |
|
printableFilename, |
|
(pTestFile->m_UncompressedData.Count() + 511) / 1024, |
|
pTestFile->m_Timer.GetDuration().GetSeconds() |
|
); |
|
Msg( "\r%-79s\n", str ); |
|
|
|
// Won't be needing this anymore. |
|
pTestFile->m_CompressedData.Purge(); |
|
} |
|
} |
|
|
|
MaybeFlushAckChunks( chunksToAck, nChunksToAck, lastAckTime ); |
|
} |
|
|
|
Assert( pFile->IsReadyToRead() ); |
|
FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime ); |
|
if ( pSocket ) |
|
pSocket->Release(); |
|
|
|
return pFile; |
|
} |
|
|
|
CWorkerFile* FindWorkerFile( const char *pFilename, const char *pPathID ) |
|
{ |
|
FOR_EACH_LL( m_WorkerFiles, i ) |
|
{ |
|
CWorkerFile *pWorkerFile = m_WorkerFiles[i]; |
|
|
|
if ( stricmp( pWorkerFile->GetFilename(), pFilename ) == 0 && stricmp( pWorkerFile->GetPathID(), pPathID ) == 0 ) |
|
return pWorkerFile; |
|
} |
|
return NULL; |
|
} |
|
|
|
CWorkerFile* FindWorkerFile( int fileID ) |
|
{ |
|
FOR_EACH_LL( m_WorkerFiles, i ) |
|
{ |
|
if ( m_WorkerFiles[i]->m_FileID == fileID ) |
|
return m_WorkerFiles[i]; |
|
} |
|
return NULL; |
|
} |
|
|
|
|
|
private: |
|
CIPAddr m_MulticastAddr; |
|
|
|
CUtlLinkedList<CWorkerFile*, int> m_WorkerFiles; |
|
|
|
HANDLE m_hMainThread; |
|
|
|
// How many files do we have open that we haven't finished receiving from the server yet? |
|
// We always keep waiting for data until this is zero. |
|
int m_nUnfinishedFiles; |
|
}; |
|
|
|
|
|
|
|
// ------------------------------------------------------------------------------------------------------------------------ // |
|
// CWorkerVMPIFileSystem implementation. |
|
// ------------------------------------------------------------------------------------------------------------------------ // |
|
|
|
class CWorkerVMPIFileSystem : public CBaseVMPIFileSystem |
|
{ |
|
public: |
|
InitReturnVal_t Init(); |
|
virtual void Term(); |
|
|
|
virtual FileHandle_t Open( const char *pFilename, const char *pOptions, const char *pathID ); |
|
virtual bool HandleFileSystemPacket( MessageBuffer *pBuf, int iSource, int iPacketID ); |
|
|
|
virtual void CreateVirtualFile( const char *pFilename, const void *pData, int fileLength ); |
|
virtual long GetFileTime( const char *pFileName, const char *pathID ); |
|
virtual bool IsFileWritable( const char *pFileName, const char *pPathID ); |
|
virtual bool SetFileWritable( char const *pFileName, bool writable, const char *pPathID ); |
|
|
|
virtual CSysModule *LoadModule( const char *pFileName, const char *pPathID, bool bValidatedDllOnly ); |
|
virtual void UnloadModule( CSysModule *pModule ); |
|
|
|
private: |
|
CWorkerMulticastListener m_Listener; |
|
}; |
|
|
|
|
|
// Creates the worker filesystem, publishes it through g_pBaseVMPIFileSystem and initializes it.
// If Init() fails, the object is deleted again, g_pBaseVMPIFileSystem is reset to NULL and
// NULL is returned.
CBaseVMPIFileSystem* CreateWorkerVMPIFileSystem()
{
	CWorkerVMPIFileSystem *pFileSystem = new CWorkerVMPIFileSystem;

	// The global has to be set before Init() runs.
	g_pBaseVMPIFileSystem = pFileSystem;

	if ( !pFileSystem->Init() )
	{
		delete pFileSystem;
		g_pBaseVMPIFileSystem = NULL;
		return NULL;
	}

	return pFileSystem;
}
|
|
|
|
|
// Waits until the multicast address has been received, then initializes the listener with it.
// Returns INIT_OK if the listener initialized, INIT_FAILED otherwise.
InitReturnVal_t CWorkerVMPIFileSystem::Init()
{
	// Get the multicast addr to listen on.
	CIPAddr listenAddr;
	RecvMulticastIP( &listenAddr );

	if ( m_Listener.Init( listenAddr ) )
		return INIT_OK;

	return INIT_FAILED;
}
|
|
|
|
|
void CWorkerVMPIFileSystem::Term() |
|
{ |
|
m_Listener.Term(); |
|
} |
|
|
|
|
|
// Opens pFilename for reading and returns a memory stream over its contents.
// The data is taken from the listener if it already has the complete file; otherwise this
// listens until the file has arrived. Returns FILESYSTEM_INVALID_HANDLE if pOptions contains
// a 'w' (in either case) or if the listener can't deliver the file. Calls Error() when
// g_bDisableFileAccess is set.
FileHandle_t CWorkerVMPIFileSystem::Open( const char *pFilename, const char *pOptions, const char *pathID )
{
	Assert( g_bUseMPI );

	// When it finally asks the filesystem for a file, it'll pass NULL for pathID if it's "".
	if ( pathID == NULL )
	{
		pathID = "";
	}

	if ( g_bDisableFileAccess )
	{
		Error( "Open( %s, %s ) - file access has been disabled.", pFilename, pOptions );
	}

	// Workers can't open anything for write access.
	if ( Q_stristr( pOptions, "w" ) != 0 )
	{
		return FILESYSTEM_INVALID_HANDLE;
	}

	// Do we have this file's data already?
	CWorkerFile *pWorkerFile = m_Listener.FindWorkerFile( pFilename, pathID );
	const bool bHaveData = ( pWorkerFile != NULL ) && pWorkerFile->IsReadyToRead();
	if ( !bHaveData )
	{
		// Ok, start listening to the multicast stream until we get the file we want.
		//
		// NOTE: it might make sense here to have the client ask for a list of ALL the files that
		// the master currently has and wait to receive all of them (so we don't have to come back
		// and listen a bunch of times).
		//
		// NOTE NOTE: really, the best way to do this is to have a thread on the workers that sits
		// there and listens to the multicast stream. Any time the master opens a new file up, it
		// assumes all the workers need the file, and it starts to send it on the multicast stream
		// until the worker threads respond that they all have it.
		//
		// (NOTE: this probably means that the clients would have to ack the chunks on a UDP socket
		// that the thread owns).
		//
		// This would simplify all the worries about a client missing half the stream and having
		// to wait for another cycle through it.
		pWorkerFile = m_Listener.ListenFor( pFilename, pathID );
		if ( pWorkerFile == NULL )
		{
			return FILESYSTEM_INVALID_HANDLE;
		}
	}

	// Ok! Got the file. Now setup a memory stream they can read out of it with.
	// A 't' in the options selects mode 't', anything else selects mode 'b'.
	char chMode = 'b';
	if ( strchr( pOptions, 't' ) )
	{
		chMode = 't';
	}

	CVMPIFile_Memory *pStream = new CVMPIFile_Memory;
	pStream->Init( pWorkerFile->m_UncompressedData.Base(), pWorkerFile->m_UncompressedData.Count(), chMode );
	return (FileHandle_t)pStream;
}
|
|
|
|
|
void CWorkerVMPIFileSystem::CreateVirtualFile( const char *pFilename, const void *pData, int fileLength ) |
|
{ |
|
Error( "CreateVirtualFile not supported in VMPI worker filesystem." ); |
|
} |
|
|
|
|
|
long CWorkerVMPIFileSystem::GetFileTime( const char *pFileName, const char *pathID ) |
|
{ |
|
Error( "GetFileTime not supported in VMPI worker filesystem." ); |
|
return 0; |
|
} |
|
|
|
|
|
bool CWorkerVMPIFileSystem::IsFileWritable( const char *pFileName, const char *pPathID ) |
|
{ |
|
Error( "GetFileTime not supported in VMPI worker filesystem." ); |
|
return false; |
|
} |
|
|
|
|
|
bool CWorkerVMPIFileSystem::SetFileWritable( char const *pFileName, bool writable, const char *pPathID ) |
|
{ |
|
Error( "GetFileTime not supported in VMPI worker filesystem." ); |
|
return false; |
|
} |
|
|
|
// Handles a filesystem packet sent to this worker. Byte 1 of the packet selects the sub-packet:
//
//   VMPI_FSPACKETID_MULTICAST_ADDR - stores the CIPAddr at offset 2 in g_MulticastIP and sets
//                                    g_bReceivedMulticastIP.
//   VMPI_FSPACKETID_FILE_RESPONSE  - decodes request ID (offset 2), response (offset 6) and the
//                                    zero-length flag (offset 10) and queues them in g_FileResponses.
//   VMPI_FSPACKETID_FILE_CHUNK     - copies everything after the 2-byte header into a new
//                                    CFileChunkPacket and queues it in g_FileChunkPackets.
//
// Returns true for these three sub-packets and false for anything else, including packets too
// short to carry a sub-packet ID. A recognized sub-packet that is too short for its fields is
// dropped with a warning instead of being read past its end.
// NOTE(review): true presumably means "packet consumed" to the caller - confirm against the dispatcher.
// iSource and iPacketID are not used.
bool CWorkerVMPIFileSystem::HandleFileSystemPacket( MessageBuffer *pBuf, int iSource, int iPacketID )
{
	const int packetLen = pBuf->getLen();
	if ( packetLen < 2 )
	{
		// Not even room for the sub-packet ID.
		Warning( "CWorkerVMPIFileSystem::HandleFileSystemPacket: truncated packet (%d bytes).\n", packetLen );
		return false;
	}

	// Handle this packet.
	int subPacketID = pBuf->data[1];
	switch( subPacketID )
	{
		case VMPI_FSPACKETID_MULTICAST_ADDR:
		{
			if ( packetLen < 2 + (int)sizeof( g_MulticastIP ) )
			{
				Warning( "CWorkerVMPIFileSystem::HandleFileSystemPacket: truncated multicast address packet (%d bytes).\n", packetLen );
				return true;
			}

			char *pInPos = &pBuf->data[2];
			g_MulticastIP = *((CIPAddr*)pInPos);

			g_bReceivedMulticastIP = true;
		}
		return true;

		case VMPI_FSPACKETID_FILE_RESPONSE:
		{
			// The fields are read from fixed offsets 2, 6 and 10.
			if ( packetLen < 10 + (int)sizeof( bool ) )
			{
				Warning( "CWorkerVMPIFileSystem::HandleFileSystemPacket: truncated file response packet (%d bytes).\n", packetLen );
				return true;
			}

			CFileResponse res;
			res.m_RequestID = *((int*)&pBuf->data[2]);
			res.m_Response = *((int*)&pBuf->data[6]);
			res.m_bZeroLength = *((bool*)&pBuf->data[10]);

			CCriticalSectionLock csLock( &g_FileResponsesCS );
			csLock.Lock();
			g_FileResponses.AddToTail( res );
		}
		return true;

		case VMPI_FSPACKETID_FILE_CHUNK:
		{
			// packetLen >= 2 was checked above, so this can't go negative.
			int nDataBytes = packetLen - 2;

			// CFileChunkPacket already contains the first data byte, hence the -1.
			CFileChunkPacket *pPacket = (CFileChunkPacket*)malloc( sizeof( CFileChunkPacket ) + nDataBytes - 1 );
			if ( !pPacket )
			{
				Error( "CWorkerVMPIFileSystem::HandleFileSystemPacket: out of memory queuing a %d byte file chunk.\n", nDataBytes );
				return true;
			}

			memcpy( pPacket->m_Data, &pBuf->data[2], nDataBytes );
			pPacket->m_Len = nDataBytes;

			CCriticalSectionLock csLock( &g_FileResponsesCS );
			csLock.Lock();
			g_FileChunkPackets.AddToTail( pPacket );
		}
		return true;

		default:
			return false;
	}
}
|
|
|
// Loads a module by handing pFileName to Sys_LoadModule. pPathID and bValidatedDllOnly are ignored.
CSysModule* CWorkerVMPIFileSystem::LoadModule( const char *pFileName, const char *pPathID, bool bValidatedDllOnly )
{
	CSysModule *pLoadedModule = Sys_LoadModule( pFileName );
	return pLoadedModule;
}
|
|
|
// Unloads a module by handing it to Sys_UnloadModule.
void CWorkerVMPIFileSystem::UnloadModule( CSysModule *pModule )
{
	Sys_UnloadModule( pModule );
}
|
|
|