You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1606 lines
44 KiB
1606 lines
44 KiB
//========= Copyright Valve Corporation, All rights reserved. ============// |
|
// |
|
// Purpose: |
|
// |
|
//=============================================================================// |
|
|
|
#include <winsock2.h> |
|
#include "vmpi_filesystem_internal.h" |
|
#include "zlib.h" |
|
#include "vstdlib/random.h" |
|
|
|
|
|
#define MINIMUM_SLEEP_MS 1 |
|
|
|
// NOTE: This number comes from measurements on our network to find out how fast |
|
// we can broadcast without the network freaking out. |
|
// |
|
// This number can be changed on the command line with the -mpi_FileTransmitRate parameter. |
|
int MULTICAST_TRANSMIT_RATE = (1024*1000); // N megs per second |
|
|
|
// Defines when we'll stop transmitting a file to a client. |
|
// (After we've transmitted the file to the client N times and we haven't heard an ack back for M seconds). |
|
#define MIN_FILE_CYCLE_COUNT 5 |
|
#define CLIENT_FILE_ACK_TIMEOUT 20 |
|
|
|
|
|
|
|
// ------------------------------------------------------------------------------------------------------------------------ // |
|
// Global helpers. |
|
// ------------------------------------------------------------------------------------------------------------------------ // |
|
|
|
static void SendMulticastIP( const CIPAddr *pAddr ) |
|
{ |
|
unsigned char packetID[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_MULTICAST_ADDR }; |
|
VMPI_Send2Chunks( |
|
packetID, sizeof( packetID ), |
|
pAddr, sizeof( *pAddr ), |
|
VMPI_PERSISTENT ); |
|
} |
|
|
|
|
|
static bool IsOpeningForWriteAccess( const char *pOptions ) |
|
{ |
|
return strchr( pOptions, 'w' ) || strchr( pOptions, 'a' ) || strchr( pOptions, '+' ); |
|
} |
|
|
|
|
|
// This does a fast zlib compression of the source data into the 'out' buffer. |
|
static bool ZLibCompress( const void *pData, int len, CUtlVector<char> &out ) |
|
{ |
|
if ( len == 0 ) |
|
{ |
|
out.Purge(); |
|
return true; |
|
} |
|
|
|
int outStartLen = len; |
|
RETRY:; |
|
|
|
// Prepare the compression stream. |
|
z_stream zs; |
|
memset( &zs, 0, sizeof( zs ) ); |
|
|
|
if ( deflateInit( &zs, 1 ) != Z_OK ) |
|
return false; |
|
|
|
|
|
// Now compress it into the output buffer. |
|
out.SetSize( outStartLen ); |
|
|
|
zs.next_in = (unsigned char*)pData; |
|
zs.avail_in = len; |
|
|
|
zs.next_out = (unsigned char*)out.Base(); |
|
zs.avail_out = out.Count(); |
|
|
|
int ret = deflate( &zs, Z_FINISH ); |
|
deflateEnd( &zs ); |
|
|
|
if ( ret == Z_STREAM_END ) |
|
{ |
|
// Get rid of whatever was left over. |
|
out.RemoveMultiple( zs.total_out, out.Count() - zs.total_out ); |
|
return true; |
|
} |
|
else if ( ret == Z_OK ) |
|
{ |
|
// Need more space in the output buffer. |
|
outStartLen += 1024 * 128; |
|
goto RETRY; |
|
} |
|
else |
|
{ |
|
return false; |
|
} |
|
} |
|
|
|
|
|
// ------------------------------------------------------------------------------------------------------------------------ // |
|
// CVMPIFile_PassThru |
|
// ------------------------------------------------------------------------------------------------------------------------ // |
|
|
|
class CVMPIFile_PassThru : public IVMPIFile |
|
{ |
|
public: |
|
void Init( IBaseFileSystem *pPassThru, FileHandle_t fp ) |
|
{ |
|
m_pPassThru = pPassThru; |
|
m_fp = fp; |
|
} |
|
|
|
virtual void Close() |
|
{ |
|
m_pPassThru->Close( m_fp ); |
|
delete this; |
|
} |
|
|
|
virtual void Seek( int pos, FileSystemSeek_t seekType ) |
|
{ |
|
m_pPassThru->Seek( m_fp, pos, seekType ); |
|
} |
|
|
|
virtual unsigned int Tell() |
|
{ |
|
return m_pPassThru->Tell( m_fp ); |
|
} |
|
|
|
virtual unsigned int Size() |
|
{ |
|
return m_pPassThru->Size( m_fp ); |
|
} |
|
|
|
virtual void Flush() |
|
{ |
|
m_pPassThru->Flush( m_fp ); |
|
} |
|
|
|
virtual int Read( void* pOutput, int size ) |
|
{ |
|
return m_pPassThru->Read( pOutput, size, m_fp ); |
|
} |
|
|
|
virtual int Write( void const* pInput, int size ) |
|
{ |
|
return m_pPassThru->Write( pInput, size, m_fp ); |
|
} |
|
|
|
|
|
private: |
|
IBaseFileSystem *m_pPassThru; |
|
FileHandle_t m_fp; |
|
}; |
|
|
|
|
|
|
|
// ---------------------------------------------------------------------------------------------------- // |
|
// CTransmitRateMgr coordinates with any other currently-running VMPI jobs, and they all will cut |
|
// down their transmission rate to stay within MULTICAST_TRANSMIT_RATE. |
|
// ---------------------------------------------------------------------------------------------------- // |
|
|
|
#define TRANSMITRATEMGR_BROADCAST_INVERVAL (1.0 / 3.0) // How many times per second we broadcast our presence. |
|
#define TRANSMITRATEMGR_EXPIRE_TIME 0.7 // How long it'll go before deciding a guy is not transmitting anymore. |
|
|
|
static char s_cTransmitRateMgrPacket[] = {2,6,-3,2,1,-66}; |
|
|
|
class CTransmitRateMgr |
|
{ |
|
public: |
|
CTransmitRateMgr(); |
|
|
|
void ReadPackets(); |
|
void BroadcastPresence(); |
|
|
|
double GetMicrosecondsPerByte() const; |
|
|
|
private: |
|
class CMachineRecord |
|
{ |
|
public: |
|
unsigned long m_UniqueID; |
|
float m_flLastTime; |
|
}; |
|
CUtlVector<CMachineRecord> m_MachineRecords; |
|
|
|
unsigned long m_UniqueID; |
|
float m_flLastBroadcastTime; |
|
double m_nMicrosecondsPerByte; |
|
ISocket *m_pSocket; |
|
}; |
|
|
|
CTransmitRateMgr::CTransmitRateMgr() |
|
{ |
|
m_nMicrosecondsPerByte = 1000000.0 / (double)MULTICAST_TRANSMIT_RATE; |
|
m_flLastBroadcastTime = 0; |
|
|
|
// Build a (hopefully) unique ID. |
|
m_UniqueID = (unsigned long)this; |
|
CCycleCount cnt; |
|
cnt.Sample(); |
|
m_UniqueID += cnt.GetMicroseconds(); |
|
Sleep( 1 ); |
|
m_UniqueID += cnt.GetMicroseconds(); |
|
|
|
m_pSocket = CreateIPSocket(); |
|
if ( m_pSocket ) |
|
{ |
|
m_pSocket->BindToAny( VMPI_MASTER_FILESYSTEM_BROADCAST_PORT ); |
|
} |
|
} |
|
|
|
void CTransmitRateMgr::ReadPackets() |
|
{ |
|
if ( !m_pSocket ) |
|
return; |
|
|
|
float flCurTime = Plat_FloatTime(); |
|
|
|
// First, update/add records. |
|
while ( 1 ) |
|
{ |
|
char data[512]; |
|
CIPAddr ipFrom; |
|
int len = m_pSocket->RecvFrom( data, sizeof( data ), &ipFrom ); |
|
if ( len == -1 ) |
|
break; |
|
|
|
if ( len == sizeof( s_cTransmitRateMgrPacket ) + sizeof( unsigned long ) && |
|
memcmp( data, s_cTransmitRateMgrPacket, sizeof( s_cTransmitRateMgrPacket ) ) == 0 ) |
|
{ |
|
unsigned long id = *((unsigned long*)&data[sizeof(s_cTransmitRateMgrPacket)]); |
|
if ( id == m_UniqueID ) |
|
continue; |
|
|
|
int i; |
|
for ( i=0; i < m_MachineRecords.Count(); i++ ) |
|
{ |
|
if ( m_MachineRecords[i].m_UniqueID == id ) |
|
{ |
|
m_MachineRecords[i].m_flLastTime = flCurTime; |
|
break; |
|
} |
|
} |
|
|
|
if ( i == m_MachineRecords.Count() ) |
|
{ |
|
int index = m_MachineRecords.AddToTail(); |
|
m_MachineRecords[index].m_UniqueID = id; |
|
m_MachineRecords[index].m_flLastTime = flCurTime; |
|
} |
|
} |
|
} |
|
|
|
// Now, expire any old records. |
|
for ( int i=0; i < m_MachineRecords.Count(); i++ ) |
|
{ |
|
if ( (flCurTime - m_MachineRecords[i].m_flLastTime) > TRANSMITRATEMGR_EXPIRE_TIME ) |
|
{ |
|
m_MachineRecords.Remove( i ); |
|
--i; |
|
} |
|
} |
|
|
|
// Recalculate our transmit rate (assuming we're receiving our own broadcast packets). |
|
m_nMicrosecondsPerByte = 1000000.0 / (double)(MULTICAST_TRANSMIT_RATE / (m_MachineRecords.Count() + 1)); |
|
} |
|
|
|
void CTransmitRateMgr::BroadcastPresence() |
|
{ |
|
if ( !m_pSocket ) |
|
return; |
|
|
|
float flCurTime = Plat_FloatTime(); |
|
if ( (flCurTime - m_flLastBroadcastTime) < TRANSMITRATEMGR_BROADCAST_INVERVAL ) |
|
return; |
|
|
|
m_flLastBroadcastTime = flCurTime; |
|
|
|
char cData[sizeof( s_cTransmitRateMgrPacket ) + sizeof( unsigned long )]; |
|
memcpy( cData, s_cTransmitRateMgrPacket, sizeof( s_cTransmitRateMgrPacket ) ); |
|
*((unsigned long*)&cData[ sizeof( s_cTransmitRateMgrPacket ) ] ) = m_UniqueID; |
|
|
|
m_pSocket->Broadcast( cData, sizeof( cData ), VMPI_MASTER_FILESYSTEM_BROADCAST_PORT ); |
|
} |
|
|
|
inline double CTransmitRateMgr::GetMicrosecondsPerByte() const |
|
{ |
|
return m_nMicrosecondsPerByte; |
|
} |
|
|
|
|
|
// ---------------------------------------------------------------------------------------------------- // |
|
// CRateLimiter manages waiting for small periods of time between packets so the rate is |
|
// whatever we want it to be. |
|
// |
|
// It also will give up some CPU time to other processes every 50 milliseconds. |
|
// ---------------------------------------------------------------------------------------------------- // |
|
|
|
class CRateLimiter |
|
{ |
|
public: |
|
|
|
CRateLimiter(); |
|
|
|
void GiveUpTimeSlice(); |
|
void NoteExcessTimeTaken( unsigned long excessTimeInMicroseconds ); |
|
|
|
|
|
public: |
|
|
|
DWORD m_SleepIntervalMS; // Give up a timeslice every N milliseconds. |
|
|
|
// Since we sleep once in a while, we time how long the sleep took and we beef |
|
// up the transmit rate until we've accounted for the time lost during the sleep. |
|
DWORD m_AccumulatedSleepMicroseconds; |
|
|
|
// When was the last time we gave up a little bit of CPU to other programs. |
|
CCycleCount m_LastSleepTime; |
|
}; |
|
|
|
CRateLimiter::CRateLimiter() |
|
{ |
|
m_SleepIntervalMS = 50; |
|
m_AccumulatedSleepMicroseconds = 0; |
|
m_LastSleepTime.Sample(); |
|
} |
|
|
|
void CRateLimiter::GiveUpTimeSlice() |
|
{ |
|
// Sleep again? |
|
CCycleCount currentTime, dtSinceLastSleep; |
|
currentTime.Sample(); |
|
CCycleCount::Sub( currentTime, m_LastSleepTime, dtSinceLastSleep ); |
|
|
|
if ( dtSinceLastSleep.GetMilliseconds() >= m_SleepIntervalMS ) |
|
{ |
|
CFastTimer sleepTimer; |
|
|
|
sleepTimer.Start(); |
|
Sleep( 10 ); |
|
sleepTimer.End(); |
|
|
|
m_AccumulatedSleepMicroseconds += sleepTimer.GetDuration().GetMicroseconds(); |
|
m_LastSleepTime.Sample(); |
|
} |
|
} |
|
|
|
|
|
void CRateLimiter::NoteExcessTimeTaken( unsigned long excessTimeInMicroseconds ) |
|
{ |
|
// Note: we give up time slices above. |
|
if ( excessTimeInMicroseconds > m_AccumulatedSleepMicroseconds ) |
|
{ |
|
excessTimeInMicroseconds -= m_AccumulatedSleepMicroseconds; |
|
m_AccumulatedSleepMicroseconds = 0; |
|
|
|
CCycleCount startCount; |
|
startCount.Sample(); |
|
while ( 1 ) |
|
{ |
|
CCycleCount curCount, diff; |
|
curCount.Sample(); |
|
|
|
CCycleCount::Sub( curCount, startCount, diff ); |
|
if ( diff.GetMicroseconds() >= excessTimeInMicroseconds ) |
|
break; |
|
} |
|
} |
|
else |
|
{ |
|
m_AccumulatedSleepMicroseconds -= excessTimeInMicroseconds; |
|
excessTimeInMicroseconds = 0; |
|
} |
|
} |
|
|
|
|
|
// ------------------------------------------------------------------------------------------------------------------------ // |
|
// CMasterMulticastThread. |
|
// ------------------------------------------------------------------------------------------------------------------------ // |
|
|
|
class CMasterMulticastThread |
|
{ |
|
public: |
|
|
|
CMasterMulticastThread(); |
|
~CMasterMulticastThread(); |
|
|
|
// This creates the socket and starts the thread (initially in an idle state since it doesn't |
|
// know of any files anyone wants). |
|
bool Init( IBaseFileSystem *pPassThru, unsigned short localPort, const CIPAddr *pAddr, int maxFileSystemMemoryUsage ); |
|
void Term(); |
|
|
|
// Returns -1 if there is an error. |
|
int FindOrAddFile( const char *pFilename, const char *pPathID ); |
|
const CUtlVector<char>& GetFileData( int iFile ) const; |
|
|
|
// When a client requests a files, this is called to tell the thread to start |
|
// adding chunks from the specified file into the queue it's multicasting. |
|
// |
|
// Returns -1 if the file isn't there. Otherwise, it returns the file ID |
|
// that will be sent along with the file's chunks in the multicast packets. |
|
int AddFileRequest( const char *pFilename, const char *pPathID, int clientID, bool *bZeroLength ); |
|
|
|
// As each client receives multicasted chunks, they ack them so the master can |
|
// stop transmitting any chunks it knows nobody wants. |
|
void OnChunkReceived( int fileID, int clientID, int iChunk ); |
|
void OnFileReceived( int fileID, int clientID ); |
|
|
|
// Call this if a client disconnects so it can stop sending anything this client wants. |
|
void OnClientDisconnect( int clientID, bool bGrabCriticalSection=true ); |
|
|
|
void CreateVirtualFile( const char *pFilename, const void *pData, unsigned long fileLength ); |
|
|
|
private: |
|
|
|
class CChunkInfo |
|
{ |
|
public: |
|
unsigned short m_iChunk; |
|
unsigned short m_RefCount; // How many clients want this chunk. |
|
unsigned short m_iActiveChunksIndex; // Index into m_ActiveChunks. |
|
}; |
|
|
|
|
|
// This stores a client's reference to a file so it knows which pieces of the file the client needs. |
|
class CClientFileInfo |
|
{ |
|
public: |
|
bool NeedsChunk( int i ) const { return (m_ChunksToSend[i>>3] & (1 << (i&7))) != 0; } |
|
|
|
public: |
|
int m_ClientID; |
|
CUtlVector<unsigned char> m_ChunksToSend; // One bit for each chunk that this client still wants. |
|
int m_nChunksLeft; |
|
|
|
// TCP transmission only. |
|
int m_TCP_LastChunkAcked; |
|
int m_TCP_LastChunkSent; |
|
|
|
float m_flTransmitStartTime; |
|
|
|
float m_flLastAckTime; // Last time we heard an ack back from this client about this file. |
|
// If this goes for too long, then we assume that the client is |
|
// in a screwed state, and we stop sending the file to him. |
|
int m_nTimesFileCycled; // How many times has the master multicast thread cycled over this file? |
|
// We won't kick the client until we've cycled over the file a few times |
|
// after the client asked for it. |
|
}; |
|
|
|
|
|
class CMulticastFile |
|
{ |
|
public: |
|
~CMulticastFile() |
|
{ |
|
m_Clients.PurgeAndDeleteElements(); |
|
} |
|
|
|
const char* GetFilename() { return m_Filename.Base(); } |
|
const char* GetPathID() { return m_PathID.Base(); } |
|
|
|
|
|
public: |
|
int m_nCycles; // How many times has the multicast thread visited this file? |
|
|
|
// This is sent along with every packet. If a client gets a chunk and doesn't have that file's |
|
// info, the client will receive that file too. |
|
CUtlVector<char> m_Filename; |
|
CUtlVector<char> m_PathID; |
|
|
|
CMulticastFileInfo m_Info; |
|
|
|
// This is stored so the app can read out the uncompressed data. |
|
CUtlVector<char> m_UncompressedData; |
|
|
|
// zlib-compressed file data |
|
CUtlVector<char> m_Data; |
|
|
|
// This gets set to false if we run over our memory limit and start caching file data out. |
|
// Then it'll reload the data if a client requests the file. |
|
bool m_bDataLoaded; |
|
|
|
// m_Chunks holds the chunks by index. |
|
// m_ActiveChunks holds them sorted by whether they're active or not. |
|
// |
|
// Each chunk has a refcount. While the refcount is > 0, the chunk is in the first |
|
// half of m_ActiveChunks. When the refcount gets to 0, the chunk is moved to the end of |
|
// m_ActiveChunks. That way, we can iterate through the chunks that need to be sent and |
|
// stop iterating the first time we hit one with a refcount of 0. |
|
CUtlVector<CChunkInfo> m_Chunks; |
|
CUtlLinkedList<CChunkInfo*,int> m_ActiveChunks; |
|
|
|
// This tells which clients want pieces of this file. |
|
CUtlLinkedList<CClientFileInfo*,int> m_Clients; |
|
}; |
|
|
|
|
|
private: |
|
|
|
static DWORD WINAPI StaticMulticastThread( LPVOID pParameter ); |
|
DWORD MulticastThread(); |
|
|
|
bool CheckClientTimeouts(); |
|
bool Thread_SendFileChunk_Multicast( int *pnBytesSent ); |
|
void Thread_SeekToNextActiveChunk(); |
|
|
|
// In TCP mode, we send new chunks as they are acked. |
|
void TCP_SendNextChunk( CMulticastFile *pFile, CClientFileInfo *pClient ); |
|
|
|
void EnsureMemoryLimit( CMulticastFile *pIgnore ); |
|
|
|
// Called after pFile->m_UncompressedData has been setup. This compresses the data, prepares the header, |
|
// copies the filename, and adds it into the queue for the multicast thread. |
|
int FinishFileSetup( CMulticastFile *pFile, const char *pFilename, const char *pPathID, bool bFileAlreadyExisted ); |
|
|
|
void IncrementChunkRefCount( CMasterMulticastThread::CMulticastFile *pFile, int iChunk ); |
|
void DecrementChunkRefCount( int iFile, int iChunk ); |
|
|
|
int FindFile( const char *pFilename, const char *pPathID ); |
|
|
|
bool FindWarningSuppression( const char *pFilename ); |
|
void AddWarningSuppression( const char *pFilename ); |
|
|
|
private: |
|
|
|
CUtlLinkedList<CMulticastFile*,int> m_Files; |
|
|
|
unsigned long m_nCurMemoryUsage; // Total of all the file data we have loaded. |
|
unsigned long m_nMaxMemoryUsage; // 0 means that there is no limit. |
|
|
|
// This tracks how many chunks we have that want to be sent. |
|
int m_nTotalActiveChunks; |
|
|
|
SOCKET m_Socket; |
|
sockaddr_in m_MulticastAddr; |
|
|
|
HANDLE m_hMainThread; |
|
IBaseFileSystem *m_pPassThru; |
|
|
|
HANDLE m_hThread; |
|
CRITICAL_SECTION m_CS; |
|
|
|
// Events used to communicate with our thread. |
|
HANDLE m_hTermEvent; |
|
|
|
// The thread walks through this as it spews chunks of data. |
|
volatile int m_iCurFile; // Index into m_Files. |
|
volatile int m_iCurActiveChunk; // Current index into CMulticastFile::m_ActiveChunks. |
|
|
|
CUtlLinkedList<char*,int> m_WarningSuppressions; |
|
}; |
|
|
|
|
|
CMasterMulticastThread::CMasterMulticastThread() |
|
{ |
|
m_hThread = m_hMainThread = NULL; |
|
m_Socket = INVALID_SOCKET; |
|
m_nTotalActiveChunks = 0; |
|
m_iCurFile = m_iCurActiveChunk = -1; |
|
m_pPassThru = NULL; |
|
|
|
m_hTermEvent = CreateEvent( NULL, FALSE, FALSE, NULL ); |
|
InitializeCriticalSection( &m_CS ); |
|
m_nCurMemoryUsage = m_nMaxMemoryUsage = 0; |
|
} |
|
|
|
|
|
CMasterMulticastThread::~CMasterMulticastThread() |
|
{ |
|
Term(); |
|
|
|
CloseHandle( m_hTermEvent ); |
|
|
|
DeleteCriticalSection( &m_CS ); |
|
} |
|
|
|
|
|
bool CMasterMulticastThread::Init( IBaseFileSystem *pPassThru, unsigned short localPort, const CIPAddr *pAddr, int maxMemoryUsage ) |
|
{ |
|
Term(); |
|
|
|
m_nMaxMemoryUsage = maxMemoryUsage; |
|
Assert( m_nCurMemoryUsage == 0 ); |
|
m_nCurMemoryUsage = 0; |
|
|
|
if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_TCP ) |
|
{ |
|
// No need for an extra socket in this mode. |
|
m_Socket = INVALID_SOCKET; |
|
} |
|
else |
|
{ |
|
// First, create our socket. |
|
m_Socket = socket( AF_INET, SOCK_DGRAM, IPPROTO_IP ); |
|
if ( m_Socket == INVALID_SOCKET ) |
|
{ |
|
Warning( "CMasterMulticastThread::Init - socket() failed\n" ); |
|
return false; |
|
} |
|
|
|
// Bind to INADDR_ANY. |
|
CIPAddr localAddr( 0, 0, 0, 0, localPort ); |
|
|
|
sockaddr_in addr; |
|
IPAddrToSockAddr( &localAddr, &addr ); |
|
|
|
int status = bind( m_Socket, (sockaddr*)&addr, sizeof(addr) ); |
|
if ( status != 0 ) |
|
{ |
|
Term(); |
|
Warning( "CMasterMulticastThread::Init - bind( %d.%d.%d.%d:%d ) failed\n", EXPAND_ADDR( *pAddr ) ); |
|
return false; |
|
} |
|
|
|
if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_BROADCAST ) |
|
{ |
|
// Set up for broadcast |
|
BOOL bBroadcast = TRUE; |
|
if ( setsockopt( m_Socket, SOL_SOCKET, SO_BROADCAST, (char*)&bBroadcast, bBroadcast ) == SOCKET_ERROR ) |
|
{ |
|
Term(); |
|
Warning( "CMasterMulticastThread::Init - setsockopt() failed to set broadcast mode\n" ); |
|
return false; |
|
} |
|
} |
|
|
|
// Remember the address we want to send to. |
|
IPAddrToSockAddr( pAddr, &m_MulticastAddr ); |
|
|
|
// Now create our thread. |
|
DWORD dwThreadID = 0; |
|
m_hThread = CreateThread( NULL, 0, &CMasterMulticastThread::StaticMulticastThread, this, 0, &dwThreadID ); |
|
if ( !m_hThread ) |
|
{ |
|
Term(); |
|
Warning( "CMasterMulticastThread::Init - CreateThread failed\n" ); |
|
return false; |
|
} |
|
|
|
SetThreadPriority( m_hThread, THREAD_PRIORITY_LOWEST ); |
|
} |
|
|
|
// For debug mode to verify that we don't try to open files while in another thread. |
|
m_hMainThread = GetCurrentThread(); |
|
|
|
m_pPassThru = pPassThru; |
|
return true; |
|
} |
|
|
|
|
|
void CMasterMulticastThread::Term() |
|
{ |
|
// Stop the thread if it is running. |
|
if ( m_hThread ) |
|
{ |
|
SetEvent( m_hTermEvent ); |
|
WaitForSingleObject( m_hThread, INFINITE ); |
|
CloseHandle( m_hThread ); |
|
|
|
m_hThread = NULL; |
|
} |
|
|
|
// Close the socket. |
|
if ( m_Socket != INVALID_SOCKET ) |
|
{ |
|
closesocket( m_Socket ); |
|
m_Socket = INVALID_SOCKET; |
|
} |
|
|
|
// Free up other data. |
|
m_Files.PurgeAndDeleteElements(); |
|
m_nCurMemoryUsage = m_nMaxMemoryUsage = 0; |
|
} |
|
|
|
|
|
void CMasterMulticastThread::TCP_SendNextChunk( CMulticastFile *pFile, CClientFileInfo *pClient ) |
|
{ |
|
// No more chunks to send? |
|
if ( (pClient->m_TCP_LastChunkSent+1) >= pFile->m_Info.m_nChunks ) |
|
return; |
|
|
|
// Figure out what data we'd be sending. |
|
int iChunkToSend = pClient->m_TCP_LastChunkSent + 1; |
|
int iStartByte = iChunkToSend * TCP_CHUNK_PAYLOAD_SIZE; |
|
int iEndByte = min( iStartByte + TCP_CHUNK_PAYLOAD_SIZE, pFile->m_Data.Count() ); |
|
|
|
// If the start point is past the end, then we're done sending the file to this client. |
|
if ( iStartByte >= pFile->m_Data.Count() ) |
|
return; |
|
|
|
// Record that we sent this data. |
|
pClient->m_TCP_LastChunkSent = iChunkToSend; |
|
|
|
// Assemble the packet. |
|
unsigned char cPacket[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_FILE_CHUNK }; |
|
|
|
const void *chunks[5] = |
|
{ |
|
cPacket, |
|
&pFile->m_Info, |
|
&iChunkToSend, |
|
pFile->GetFilename(), |
|
&pFile->m_Data[iStartByte] |
|
}; |
|
|
|
int chunkLengths[5] = |
|
{ |
|
sizeof( cPacket ), |
|
sizeof( pFile->m_Info ), |
|
sizeof( m_iCurActiveChunk ), |
|
strlen( pFile->GetFilename() ) + 1, |
|
iEndByte - iStartByte |
|
}; |
|
|
|
VMPI_SendChunks( chunks, chunkLengths, 5, pClient->m_ClientID ); |
|
} |
|
|
|
|
|
int CMasterMulticastThread::AddFileRequest( const char *pFilename, const char *pPathID, int clientID, bool *bZeroLength ) |
|
{ |
|
// Firstly, do we already have this file? |
|
int iFile = FindOrAddFile( pFilename, pPathID ); |
|
if ( iFile == -1 ) |
|
return -1; |
|
|
|
CMulticastFile *pFile = m_Files[iFile]; |
|
|
|
// Now that we have a file setup, merge in this client's info. |
|
EnterCriticalSection( &m_CS ); |
|
|
|
CClientFileInfo *pClient = new CClientFileInfo; |
|
pClient->m_TCP_LastChunkAcked = -1; |
|
pClient->m_TCP_LastChunkSent = -1; |
|
pClient->m_ClientID = clientID; |
|
pClient->m_flLastAckTime = Plat_FloatTime(); |
|
pClient->m_flTransmitStartTime = pClient->m_flLastAckTime; |
|
pClient->m_nTimesFileCycled = 0; |
|
pClient->m_nChunksLeft = pFile->m_Info.m_nChunks; |
|
pClient->m_ChunksToSend.SetSize( PAD_NUMBER( pFile->m_Info.m_nChunks, 8 ) / 8 ); |
|
memset( pClient->m_ChunksToSend.Base(), 0xFF, pClient->m_ChunksToSend.Count() ); |
|
pFile->m_Clients.AddToTail( pClient ); |
|
|
|
for ( int i=0; i < pFile->m_Chunks.Count(); i++ ) |
|
{ |
|
IncrementChunkRefCount( pFile, i ); |
|
} |
|
|
|
// In TCP mode, let's get the sliding window started.. |
|
if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_TCP ) |
|
{ |
|
for ( int iDepth=0; iDepth < TCP_CHUNK_QUEUE_LEN; iDepth++ ) |
|
TCP_SendNextChunk( pFile, pClient ); |
|
} |
|
|
|
LeaveCriticalSection( &m_CS ); |
|
|
|
*bZeroLength = (pFile->m_Info.m_UncompressedSize == 0); |
|
|
|
return iFile; |
|
} |
|
|
|
|
|
void CMasterMulticastThread::OnChunkReceived( int fileID, int clientID, int iChunk ) |
|
{ |
|
if ( !m_Files.IsValidIndex( fileID ) ) |
|
{ |
|
Warning( "CMasterMulticastThread::OnChunkReceived: invalid file (%d) from client %d\n", fileID, clientID ); |
|
return; |
|
} |
|
|
|
CMulticastFile *pFile = m_Files[fileID]; |
|
CClientFileInfo *pClient = NULL; |
|
FOR_EACH_LL( pFile->m_Clients, iClient ) |
|
{ |
|
if ( pFile->m_Clients[iClient]->m_ClientID == clientID ) |
|
{ |
|
pClient = pFile->m_Clients[iClient]; |
|
break; |
|
} |
|
} |
|
if ( !pClient ) |
|
{ |
|
// This will spam sometimes if a worker stops responding and we timeout on it, |
|
// but then it comes back alive and starts responding. So let's ignore its packets silently. |
|
//Warning( "CMasterMulticastThread::OnChunkReceived: invalid client ID (%d) for file %s\n", clientID, pFile->GetFilename() ); |
|
return; |
|
} |
|
|
|
if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_TCP ) |
|
{ |
|
// Send the next chunk, if there is one. |
|
EnterCriticalSection( &m_CS ); |
|
TCP_SendNextChunk( pFile, pClient ); |
|
LeaveCriticalSection( &m_CS ); |
|
} |
|
else |
|
{ |
|
if ( !pFile->m_Chunks.IsValidIndex( iChunk ) ) |
|
{ |
|
Warning( "CMasterMulticastThread::OnChunkReceived: invalid chunk index (%d) for file %s\n", iChunk, pFile->GetFilename() ); |
|
return; |
|
} |
|
|
|
// Mark that this client doesn't need this chunk anymore. |
|
pClient->m_ChunksToSend[iChunk >> 3] &= ~(1 << (iChunk & 7)); |
|
pClient->m_nChunksLeft--; |
|
|
|
pClient->m_flLastAckTime = Plat_FloatTime(); |
|
if ( pClient->m_nChunksLeft == 0 && g_iVMPIVerboseLevel >= 2 ) |
|
Warning( "Client %d got file %s\n", clientID, pFile->GetFilename() ); |
|
|
|
EnterCriticalSection( &m_CS ); |
|
DecrementChunkRefCount( fileID, iChunk ); |
|
LeaveCriticalSection( &m_CS ); |
|
} |
|
} |
|
|
|
|
|
void CMasterMulticastThread::OnFileReceived( int fileID, int clientID ) |
|
{ |
|
if ( !m_Files.IsValidIndex( fileID ) ) |
|
{ |
|
Warning( "CMasterMulticastThread::OnChunkReceived: invalid file (%d) from client %d\n", fileID, clientID ); |
|
return; |
|
} |
|
|
|
CMulticastFile *pFile = m_Files[fileID]; |
|
for ( int i=0; i < pFile->m_Info.m_nChunks; i++ ) |
|
OnChunkReceived( fileID, clientID, i ); |
|
} |
|
|
|
|
|
void CMasterMulticastThread::OnClientDisconnect( int clientID, bool bGrabCriticalSection ) |
|
{ |
|
if ( bGrabCriticalSection ) |
|
EnterCriticalSection( &m_CS ); |
|
|
|
// Remove all references from this client. |
|
FOR_EACH_LL( m_Files, iFile ) |
|
{ |
|
CMulticastFile *pFile = m_Files[iFile]; |
|
|
|
FOR_EACH_LL( pFile->m_Clients, iClient ) |
|
{ |
|
CClientFileInfo *pClient = pFile->m_Clients[iClient]; |
|
|
|
if ( pClient->m_ClientID != clientID ) |
|
continue; |
|
|
|
// Ok, this is our man. Decrement the refcount of any chunks this client wanted. |
|
for ( int iChunk=0; iChunk < pFile->m_Info.m_nChunks; iChunk++ ) |
|
{ |
|
if ( pClient->NeedsChunk( iChunk ) ) |
|
{ |
|
DecrementChunkRefCount( iFile, iChunk ); |
|
} |
|
} |
|
|
|
delete pClient; |
|
pFile->m_Clients.Remove( iClient ); |
|
|
|
break; |
|
} |
|
} |
|
|
|
if ( bGrabCriticalSection ) |
|
LeaveCriticalSection( &m_CS ); |
|
} |
|
|
|
|
|
void CMasterMulticastThread::CreateVirtualFile( const char *pFilename, const void *pData, unsigned long fileLength ) |
|
{ |
|
const char *pPathID = VMPI_VIRTUAL_FILES_PATH_ID; |
|
|
|
int iFile = FindFile( pFilename, pPathID ); |
|
if ( iFile != -1 ) |
|
Error( "CMasterMulticastThread::CreateVirtualFile( %s ) - file already exists!", pFilename ); |
|
|
|
CMulticastFile *pFile = new CMulticastFile; |
|
pFile->m_UncompressedData.CopyArray( (const char*)pData, fileLength ); |
|
|
|
FinishFileSetup( pFile, pFilename, pPathID, false ); |
|
} |
|
|
|
|
|
DWORD WINAPI CMasterMulticastThread::StaticMulticastThread( LPVOID pParameter ) |
|
{ |
|
return ((CMasterMulticastThread*)pParameter)->MulticastThread(); |
|
} |
|
|
|
|
|
bool CMasterMulticastThread::CheckClientTimeouts() |
|
{ |
|
bool bRet = false; |
|
CMulticastFile *pFile = m_Files[m_iCurFile]; |
|
|
|
float flCurTime = Plat_FloatTime(); |
|
|
|
int iNext; |
|
for( int iCur=pFile->m_Clients.Head(); iCur != pFile->m_Clients.InvalidIndex(); iCur=iNext ) |
|
{ |
|
iNext = pFile->m_Clients.Next( iCur ); |
|
CClientFileInfo *pInfo = pFile->m_Clients[iCur]; |
|
|
|
// If the client has already fully received this file, don't bother timing out on it. |
|
if ( pInfo->m_nChunksLeft == 0 ) |
|
continue; |
|
|
|
++pInfo->m_nTimesFileCycled; |
|
if ( pInfo->m_nTimesFileCycled >= MIN_FILE_CYCLE_COUNT && (flCurTime - pInfo->m_flLastAckTime) > CLIENT_FILE_ACK_TIMEOUT ) |
|
{ |
|
// For debug output, get the most recent time we heard any ack from this client at all. |
|
float flMostRecentTime = pInfo->m_flLastAckTime; |
|
FOR_EACH_LL( m_Files, iTestFile ) |
|
{ |
|
CMulticastFile *pTestFile = m_Files[iTestFile]; |
|
FOR_EACH_LL( pTestFile->m_Clients, iTestClient ) |
|
{ |
|
if ( pTestFile->m_Clients[iTestClient]->m_ClientID == pInfo->m_ClientID ) |
|
{ |
|
flMostRecentTime = max( flMostRecentTime, pTestFile->m_Clients[iTestClient]->m_flLastAckTime ); |
|
} |
|
} |
|
} |
|
|
|
Warning( "\nClient %s timed out on file %s (latest: %.2f / cur: %.2f).\n", |
|
VMPI_GetMachineName( pInfo->m_ClientID ), pFile->GetFilename(), flMostRecentTime, flCurTime ); |
|
OnClientDisconnect( pInfo->m_ClientID, false ); |
|
bRet = true; // yes, we booted a client. |
|
} |
|
} |
|
|
|
return bRet; |
|
} |
|
|
|
inline bool CMasterMulticastThread::Thread_SendFileChunk_Multicast( int *pnBytesSent ) |
|
{ |
|
// Send the next chunk (file, size, time, chunk data). |
|
CMulticastFile *pFile = m_Files[m_iCurFile]; |
|
|
|
int iStartByte = m_iCurActiveChunk * MULTICAST_CHUNK_PAYLOAD_SIZE; |
|
int iEndByte = min( iStartByte + MULTICAST_CHUNK_PAYLOAD_SIZE, pFile->m_Data.Count() ); |
|
|
|
WSABUF bufs[4]; |
|
bufs[0].buf = (char*)&pFile->m_Info; |
|
bufs[0].len = sizeof( pFile->m_Info ); |
|
|
|
bufs[1].buf = (char*)&m_iCurActiveChunk; |
|
bufs[1].len = sizeof( m_iCurActiveChunk ); |
|
|
|
bufs[2].buf = (char*)pFile->GetFilename(); |
|
bufs[2].len = strlen( pFile->GetFilename() ) + 1; |
|
|
|
bufs[3].buf = &pFile->m_Data[iStartByte]; |
|
bufs[3].len = iEndByte - iStartByte; |
|
|
|
DWORD nBytesSent = 0; |
|
DWORD nWantedBytes = ( bufs[0].len + bufs[1].len + bufs[2].len + bufs[3].len ); |
|
bool bSuccess; |
|
|
|
if ( m_MulticastAddr.sin_addr.S_un.S_un_b.s_b1 == 127 && |
|
m_MulticastAddr.sin_addr.S_un.S_un_b.s_b2 == 0 && |
|
m_MulticastAddr.sin_addr.S_un.S_un_b.s_b3 == 0 && |
|
m_MulticastAddr.sin_addr.S_un.S_un_b.s_b4 == 1 ) |
|
{ |
|
// For some mysterious reason, WSASendTo only sends the first buffer |
|
// if we're sending to 127.0.0.1 (ie: in local mode). |
|
char allData[1024*8]; |
|
if ( nWantedBytes > sizeof( allData ) ) |
|
Error( "nWantedBytes > sizeof( allData )" ); |
|
|
|
memcpy( &allData[0], bufs[0].buf, bufs[0].len ); |
|
memcpy( &allData[bufs[0].len], bufs[1].buf, bufs[1].len ); |
|
memcpy( &allData[bufs[0].len+bufs[1].len], bufs[2].buf, bufs[2].len ); |
|
memcpy( &allData[bufs[0].len+bufs[1].len+bufs[2].len], bufs[3].buf, bufs[3].len ); |
|
int ret = sendto( m_Socket, allData, nWantedBytes, 0, (sockaddr*)&m_MulticastAddr, sizeof( m_MulticastAddr ) ); |
|
bSuccess = (ret == (int)nWantedBytes); |
|
} |
|
else |
|
{ |
|
WSASendTo( |
|
m_Socket, |
|
bufs, |
|
ARRAYSIZE( bufs ), |
|
&nBytesSent, |
|
0, |
|
(sockaddr*)&m_MulticastAddr, |
|
sizeof( m_MulticastAddr ), |
|
NULL, |
|
NULL ); |
|
bSuccess = (nBytesSent == nWantedBytes); |
|
} |
|
|
|
// Handle errors.. let it get a few errors, then quit. |
|
if ( bSuccess ) |
|
{ |
|
*pnBytesSent = (int)nBytesSent; |
|
} |
|
else |
|
{ |
|
static int nWarnings = 0; |
|
++nWarnings; |
|
if ( nWarnings < 10 ) |
|
{ |
|
Warning( "\nMulticastThread: WSASendTo with %d bytes sent %d bytes.\n", nWantedBytes, nBytesSent ); |
|
|
|
char *lpMsgBuf; |
|
if ( FormatMessage( |
|
FORMAT_MESSAGE_ALLOCATE_BUFFER | |
|
FORMAT_MESSAGE_FROM_SYSTEM | |
|
FORMAT_MESSAGE_IGNORE_INSERTS, |
|
NULL, |
|
GetLastError(), |
|
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language |
|
(char*)&lpMsgBuf, |
|
0, |
|
NULL |
|
) ) |
|
{ |
|
Warning( "%s", lpMsgBuf ); |
|
LocalFree( lpMsgBuf ); |
|
} |
|
} |
|
else if ( nWarnings == 10 ) |
|
{ |
|
Warning( "\nThis machine's ability to multicast may be broken. Please reboot and try again.\n" ); |
|
} |
|
} |
|
|
|
return bSuccess; |
|
} |
|
|
|
|
|
void CMasterMulticastThread::Thread_SeekToNextActiveChunk() |
|
{ |
|
// Make sure we're on a valid chunk. |
|
if ( m_iCurFile == -1 ) |
|
{ |
|
Assert( m_Files.Count() > 0 ); |
|
m_iCurFile = m_Files.Head(); |
|
m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Head(); |
|
} |
|
|
|
while ( 1 ) |
|
{ |
|
if ( m_iCurActiveChunk == m_Files[m_iCurFile]->m_ActiveChunks.InvalidIndex() || |
|
m_Files[m_iCurFile]->m_ActiveChunks[m_iCurActiveChunk]->m_RefCount == 0 ) |
|
{ |
|
// Now check for client timeouts. |
|
// (This is kicking clients unjustly for some reason.. need to debug). |
|
if ( CheckClientTimeouts() && m_nTotalActiveChunks == 0 ) |
|
break; |
|
|
|
// Finished with that file. Send the next one. |
|
m_iCurFile = m_Files.Next( m_iCurFile ); |
|
if ( m_iCurFile == m_Files.InvalidIndex() ) |
|
m_iCurFile = m_Files.Head(); |
|
|
|
m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Head(); |
|
} |
|
|
|
if ( m_iCurActiveChunk != m_Files[m_iCurFile]->m_ActiveChunks.InvalidIndex() ) |
|
{ |
|
// Only break if we're on an active chunk. |
|
if ( m_Files[m_iCurFile]->m_ActiveChunks[m_iCurActiveChunk]->m_RefCount != 0 ) |
|
{ |
|
break; |
|
} |
|
|
|
m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Next( m_iCurActiveChunk ); |
|
} |
|
} |
|
} |
|
|
|
|
|
DWORD CMasterMulticastThread::MulticastThread() |
|
{ |
|
CTransmitRateMgr transmitRateMgr; |
|
CRateLimiter rateLimiter; |
|
|
|
|
|
DWORD msToWait = 0; // Only temporarily used if we don't have any data to send. |
|
|
|
while ( WaitForSingleObject( m_hTermEvent, msToWait ) != WAIT_OBJECT_0 ) |
|
{ |
|
rateLimiter.GiveUpTimeSlice(); |
|
msToWait = 0; |
|
|
|
EnterCriticalSection( &m_CS ); |
|
|
|
transmitRateMgr.ReadPackets(); |
|
|
|
// If we have nothing to send then kick back for a while. |
|
if ( m_nTotalActiveChunks == 0 ) |
|
{ |
|
LeaveCriticalSection( &m_CS ); |
|
msToWait = 50; |
|
continue; |
|
} |
|
|
|
// Ok, now we're active, so send out our presence to other CTransmitRateMgrs on the network. |
|
transmitRateMgr.BroadcastPresence(); |
|
|
|
|
|
// We're going to time how long this chunk took to send. |
|
CFastTimer timer; |
|
timer.Start(); |
|
|
|
Thread_SeekToNextActiveChunk(); |
|
|
|
// We have to do this check a second time here because the CheckClientTimeouts() call may have |
|
// booted our last client. If we don't check it here, we might be transmitting |
|
// something we don't want to transmit. Also, if we don't break out of the loop above, |
|
// it can prevent the process from ever exiting because it'll never exit that while() loop. |
|
if ( m_nTotalActiveChunks == 0 ) |
|
{ |
|
LeaveCriticalSection( &m_CS ); |
|
msToWait = 50; |
|
continue; |
|
} |
|
|
|
int nBytesSent = 0; |
|
|
|
bool bSuccess; |
|
bSuccess = Thread_SendFileChunk_Multicast( &nBytesSent ); |
|
|
|
g_nMulticastBytesSent += (int)nBytesSent; |
|
|
|
// Move to the next chunk. |
|
m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Next( m_iCurActiveChunk ); |
|
|
|
LeaveCriticalSection( &m_CS ); |
|
|
|
|
|
// Measure how long it took to send this. |
|
timer.End(); |
|
unsigned long timeTaken = timer.GetDuration().GetMicroseconds(); |
|
|
|
|
|
// Measure how long it should have taken. |
|
unsigned long estimatedPacketHeaderSize = 32; |
|
unsigned long optimalTimeTaken = (unsigned long)( transmitRateMgr.GetMicrosecondsPerByte() * (nBytesSent + estimatedPacketHeaderSize) ); |
|
|
|
|
|
// If we went faster than we should have, then wait for the difference in time. |
|
if ( timeTaken < optimalTimeTaken ) |
|
{ |
|
rateLimiter.NoteExcessTimeTaken( optimalTimeTaken - timeTaken ); |
|
} |
|
} |
|
|
|
return 0; |
|
} |
|
|
|
|
|
void CMasterMulticastThread::IncrementChunkRefCount( CMasterMulticastThread::CMulticastFile *pFile, int iChunk ) |
|
{ |
|
CChunkInfo *pChunk = &pFile->m_Chunks[iChunk]; |
|
|
|
if ( pChunk->m_RefCount == 0 ) |
|
{ |
|
++m_nTotalActiveChunks; |
|
|
|
// Move the chunk to the head of the list since it is now active. |
|
pFile->m_ActiveChunks.Remove( pChunk->m_iActiveChunksIndex ); |
|
pChunk->m_iActiveChunksIndex = pFile->m_ActiveChunks.AddToHead( pChunk ); |
|
} |
|
|
|
pChunk->m_RefCount++; |
|
} |
|
|
|
|
|
void CMasterMulticastThread::DecrementChunkRefCount( int iFile, int iChunk ) |
|
{ |
|
CMulticastFile *pFile = m_Files[iFile]; |
|
CChunkInfo *pChunk = &pFile->m_Chunks[iChunk]; |
|
|
|
if ( pChunk->m_RefCount == 0 ) |
|
{ |
|
Error( "CMasterMulticastThread::DecrementChunkRefCount - refcount already zero!\n" ); |
|
} |
|
|
|
pChunk->m_RefCount--; |
|
if ( pChunk->m_RefCount == 0 ) |
|
{ |
|
--m_nTotalActiveChunks; |
|
|
|
// If this is the current chunk the thread is reading on, seek up to the next chunk so |
|
// the thread doesn't spin off into the next file and skip its current file's contents. |
|
if ( iFile == m_iCurFile && pChunk->m_iActiveChunksIndex == m_iCurActiveChunk ) |
|
{ |
|
m_iCurActiveChunk = pFile->m_ActiveChunks.Next( pChunk->m_iActiveChunksIndex ); |
|
} |
|
|
|
// Move the chunk to the end of the list since it is now inactive. |
|
pFile->m_ActiveChunks.Remove( pChunk->m_iActiveChunksIndex ); |
|
pChunk->m_iActiveChunksIndex = pFile->m_ActiveChunks.AddToTail( pChunk ); |
|
} |
|
} |
|
|
|
|
|
int CMasterMulticastThread::FindFile( const char *pName, const char *pPathID ) |
|
{ |
|
FOR_EACH_LL( m_Files, i ) |
|
{ |
|
CMulticastFile *pFile = m_Files[i]; |
|
if ( stricmp( pFile->GetFilename(), pName ) == 0 && stricmp( pFile->GetPathID(), pPathID ) == 0 ) |
|
return i; |
|
} |
|
return -1; |
|
} |
|
|
|
|
|
bool CMasterMulticastThread::FindWarningSuppression( const char *pFilename ) |
|
{ |
|
FOR_EACH_LL( m_WarningSuppressions, i ) |
|
{ |
|
if ( Q_stricmp( m_WarningSuppressions[i], pFilename ) == 0 ) |
|
return true; |
|
} |
|
return false; |
|
} |
|
|
|
|
|
void CMasterMulticastThread::AddWarningSuppression( const char *pFilename ) |
|
{ |
|
char *pBlah = new char[ strlen( pFilename ) + 1 ]; |
|
strcpy( pBlah, pFilename ); |
|
m_WarningSuppressions.AddToTail( pBlah ); |
|
} |
|
|
|
|
|
int CMasterMulticastThread::FindOrAddFile( const char *pFilename, const char *pPathID ) |
|
{ |
|
CMulticastFile *pFile = NULL; |
|
bool bFileAlreadyExisted = false; |
|
|
|
// See if we've already opened this file. |
|
int iFile = FindFile( pFilename, pPathID ); |
|
if ( iFile != -1 ) |
|
{ |
|
pFile = m_Files[iFile]; |
|
if ( pFile->m_bDataLoaded ) |
|
{ |
|
return iFile; |
|
} |
|
else |
|
{ |
|
// Ok, we have the file entry, but its data has been freed, so we need to reload it. |
|
EnterCriticalSection( &m_CS ); |
|
bFileAlreadyExisted = true; |
|
} |
|
} |
|
|
|
// Can't open a file outside our main thread, because we have to talk to the filesystem |
|
// and the filesystem doesn't support that. |
|
Assert( GetCurrentThread() == m_hMainThread ); |
|
|
|
// When the worker originally asked for the path ID, they could pass NULL and it would come through as "". |
|
// Now set it back to null for the filesystem we're passing the call to. |
|
FileHandle_t fp = m_pPassThru->Open( pFilename, "rb", pPathID[0] == 0 ? NULL : pPathID ); |
|
if ( !fp ) |
|
{ |
|
if ( bFileAlreadyExisted ) |
|
LeaveCriticalSection( &m_CS ); |
|
|
|
return -1; |
|
} |
|
|
|
if ( !bFileAlreadyExisted ) |
|
pFile = new CMulticastFile; |
|
|
|
pFile->m_UncompressedData.SetSize( m_pPassThru->Size( fp ) ); |
|
m_pPassThru->Read( pFile->m_UncompressedData.Base(), pFile->m_UncompressedData.Count(), fp ); |
|
m_pPassThru->Close( fp ); |
|
|
|
int iRet = FinishFileSetup( pFile, pFilename, pPathID, bFileAlreadyExisted ); |
|
if ( bFileAlreadyExisted ) |
|
LeaveCriticalSection( &m_CS ); |
|
|
|
return iRet; |
|
} |
|
|
|
|
|
int CMasterMulticastThread::FinishFileSetup( CMulticastFile *pFile, const char *pFilename, const char *pPathID, bool bFileAlreadyExisted ) |
|
{ |
|
// Compress the file's contents. |
|
if ( !ZLibCompress( pFile->m_UncompressedData.Base(), pFile->m_UncompressedData.Count(), pFile->m_Data ) ) |
|
{ |
|
delete pFile; |
|
return -1; |
|
} |
|
|
|
pFile->m_bDataLoaded = true; |
|
int chunkSize = VMPI_GetChunkPayloadSize(); |
|
|
|
// Get this file in the queue. |
|
if ( !bFileAlreadyExisted ) |
|
{ |
|
pFile->m_Filename.SetSize( strlen( pFilename ) + 1 ); |
|
strcpy( pFile->m_Filename.Base(), pFilename ); |
|
|
|
pFile->m_PathID.SetSize( strlen( pPathID ) + 1 ); |
|
strcpy( pFile->m_PathID.Base(), pPathID ); |
|
|
|
pFile->m_nCycles = 0; |
|
|
|
pFile->m_Info.m_CompressedSize = pFile->m_Data.Count(); |
|
pFile->m_Info.m_UncompressedSize = pFile->m_UncompressedData.Count(); |
|
|
|
pFile->m_Info.m_nChunks = PAD_NUMBER( pFile->m_Info.m_CompressedSize, chunkSize ) / chunkSize; |
|
|
|
// Initialize the chunks. |
|
pFile->m_Chunks.SetSize( pFile->m_Info.m_nChunks ); |
|
for ( int i=0; i < pFile->m_Chunks.Count(); i++ ) |
|
{ |
|
CChunkInfo *pChunk = &pFile->m_Chunks[i]; |
|
|
|
pChunk->m_iChunk = (unsigned short)i; |
|
pChunk->m_RefCount = 0; |
|
pChunk->m_iActiveChunksIndex = pFile->m_ActiveChunks.AddToTail( pChunk ); |
|
} |
|
|
|
EnterCriticalSection( &m_CS ); |
|
} |
|
|
|
// Boot some other file out of memory if we're out of space. |
|
m_nCurMemoryUsage += ( pFile->m_Info.m_CompressedSize + pFile->m_Info.m_UncompressedSize ); |
|
EnsureMemoryLimit( pFile ); |
|
|
|
if ( !bFileAlreadyExisted ) |
|
{ |
|
pFile->m_Info.m_FileID = m_Files.AddToTail( pFile ); |
|
LeaveCriticalSection( &m_CS ); |
|
} |
|
|
|
return pFile->m_Info.m_FileID; |
|
} |
|
|
|
|
|
void CMasterMulticastThread::EnsureMemoryLimit( CMulticastFile *pIgnore ) |
|
{ |
|
if ( m_nMaxMemoryUsage != 0 && m_nCurMemoryUsage > m_nMaxMemoryUsage ) |
|
{ |
|
// Free all the files that we can. |
|
FOR_EACH_LL( m_Files, iFile ) |
|
{ |
|
CMulticastFile *pFile = m_Files[iFile]; |
|
if ( pFile == pIgnore || !pFile->m_bDataLoaded ) |
|
continue; |
|
|
|
if ( pFile->m_ActiveChunks.Count() == 0 ) |
|
{ |
|
m_nCurMemoryUsage -= ( pFile->m_Info.m_CompressedSize + pFile->m_Info.m_UncompressedSize ); |
|
|
|
pFile->m_Data.Purge(); |
|
pFile->m_UncompressedData.Purge(); |
|
pFile->m_bDataLoaded = false; |
|
} |
|
} |
|
} |
|
} |
|
|
|
|
|
const CUtlVector<char>& CMasterMulticastThread::GetFileData( int iFile ) const |
|
{ |
|
return m_Files[iFile]->m_UncompressedData; |
|
} |
|
|
|
|
|
// ------------------------------------------------------------------------------------------------------------------------ // |
|
// CMasterVMPIFileSystem implementation. |
|
// ------------------------------------------------------------------------------------------------------------------------ // |
|
|
|
class CMasterVMPIFileSystem : public CBaseVMPIFileSystem |
|
{ |
|
public: |
|
CMasterVMPIFileSystem(); |
|
virtual ~CMasterVMPIFileSystem(); |
|
|
|
bool Init( int maxMemoryUsage, IFileSystem *pPassThru ); |
|
virtual void Term(); |
|
|
|
virtual FileHandle_t Open( const char *pFilename, const char *pOptions, const char *pathID ); |
|
virtual bool HandleFileSystemPacket( MessageBuffer *pBuf, int iSource, int iPacketID ); |
|
|
|
virtual void CreateVirtualFile( const char *pFilename, const void *pData, int fileLength ); |
|
|
|
virtual CSysModule *LoadModule( const char *pFileName, const char *pPathID, bool bValidatedDllOnly ); |
|
virtual void UnloadModule( CSysModule *pModule ); |
|
|
|
|
|
private: |
|
|
|
static void OnClientDisconnect( int procID, const char *pReason ); |
|
|
|
|
|
private: |
|
CMasterMulticastThread m_MasterThread; |
|
IFileSystem *m_pMasterVMPIFileSystemPassThru; |
|
|
|
static CMasterVMPIFileSystem *s_pMasterVMPIFileSystem; |
|
}; |
|
|
|
|
|
CMasterVMPIFileSystem *CMasterVMPIFileSystem::s_pMasterVMPIFileSystem = NULL; |
|
|
|
|
|
CBaseVMPIFileSystem* CreateMasterVMPIFileSystem( int maxMemoryUsage, IFileSystem *pPassThru ) |
|
{ |
|
CMasterVMPIFileSystem *pRet = new CMasterVMPIFileSystem; |
|
g_pBaseVMPIFileSystem = pRet; |
|
if ( pRet->Init( maxMemoryUsage, pPassThru ) ) |
|
{ |
|
return pRet; |
|
} |
|
else |
|
{ |
|
delete pRet; |
|
g_pBaseVMPIFileSystem = NULL; |
|
return NULL; |
|
} |
|
} |
|
|
|
|
|
CMasterVMPIFileSystem::CMasterVMPIFileSystem() |
|
{ |
|
Assert( !s_pMasterVMPIFileSystem ); |
|
s_pMasterVMPIFileSystem = this; |
|
} |
|
|
|
|
|
CMasterVMPIFileSystem::~CMasterVMPIFileSystem() |
|
{ |
|
Assert( s_pMasterVMPIFileSystem == this ); |
|
s_pMasterVMPIFileSystem = NULL; |
|
} |
|
|
|
|
|
bool CMasterVMPIFileSystem::Init( int maxMemoryUsage, IFileSystem *pPassThru ) |
|
{ |
|
// Only init the BASE filesystem passthru. Leave the IFileSystem passthru using NULL so it'll crash |
|
// immediately if they try to use a function we don't support. |
|
InitPassThru( pPassThru, true ); |
|
m_pMasterVMPIFileSystemPassThru = pPassThru; |
|
|
|
// Pick a random IP in the multicast range (224.0.0.2 to 239.255.255.255); |
|
CCycleCount cnt; |
|
cnt.Sample(); |
|
RandomSeed( (int)cnt.GetMicroseconds() ); |
|
|
|
int localPort = 23412; // This can be anything. |
|
|
|
unsigned short port = RandomInt( 22000, 25000 ); |
|
if ( VMPI_GetRunMode() == VMPI_RUN_NETWORKED ) |
|
{ |
|
if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_MULTICAST ) |
|
{ |
|
m_MulticastIP.port = port; |
|
m_MulticastIP.ip[0] = (unsigned char)RandomInt( 225, 238 ); |
|
m_MulticastIP.ip[1] = (unsigned char)RandomInt( 0, 255 ); |
|
m_MulticastIP.ip[2] = (unsigned char)RandomInt( 0, 255 ); |
|
m_MulticastIP.ip[3] = (unsigned char)RandomInt( 3, 255 ); |
|
} |
|
else if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_BROADCAST ) |
|
{ |
|
m_MulticastIP.Init( 0xFF, 0xFF, 0xFF, 0xFF, port ); |
|
} |
|
} |
|
else |
|
{ |
|
// Doesn't matter.. we don't use the multicast IP in TCP mode. |
|
m_MulticastIP.Init( 0, 0, 0, 0, 0 ); |
|
} |
|
|
|
if ( !m_MasterThread.Init( pPassThru, localPort, &m_MulticastIP, maxMemoryUsage ) ) |
|
return false; |
|
|
|
// Send out the multicast addr to all the clients. |
|
SendMulticastIP( &m_MulticastIP ); |
|
|
|
// Make sure we're notified when a client disconnects so we can unlink them from the |
|
// multicast thread's structures. |
|
VMPI_AddDisconnectHandler( &CMasterVMPIFileSystem::OnClientDisconnect ); |
|
return true; |
|
} |
|
|
|
|
|
void CMasterVMPIFileSystem::Term() |
|
{ |
|
m_MasterThread.Term(); |
|
} |
|
|
|
|
|
FileHandle_t CMasterVMPIFileSystem::Open( const char *pFilename, const char *pOptions, const char *pPathID ) |
|
{ |
|
Assert( g_bUseMPI ); |
|
|
|
if ( g_bDisableFileAccess ) |
|
Error( "Open( %s, %s ) - file access has been disabled.", pFilename, pOptions ); |
|
|
|
// Use a stdio file if they want to write to it. |
|
bool bWriteAccess = IsOpeningForWriteAccess( pOptions ); |
|
if ( bWriteAccess ) |
|
{ |
|
FileHandle_t fp = m_pBaseFileSystemPassThru->Open( pFilename, pOptions, pPathID ); |
|
if ( fp == FILESYSTEM_INVALID_HANDLE ) |
|
return FILESYSTEM_INVALID_HANDLE; |
|
|
|
CVMPIFile_PassThru *pFile = new CVMPIFile_PassThru; |
|
pFile->Init( m_pBaseFileSystemPassThru, fp ); |
|
return (FileHandle_t)pFile; |
|
} |
|
|
|
// Internally, we require path IDs to be non-null. We'll convert it back to null whenever we make filesystem calls though. |
|
if ( !pPathID ) |
|
pPathID = ""; |
|
|
|
// Have our multicast thread load all the data so it's there when workers want it. |
|
int iFile = m_MasterThread.FindOrAddFile( pFilename, pPathID ); |
|
if ( iFile == -1 ) |
|
return FILESYSTEM_INVALID_HANDLE; |
|
|
|
const CUtlVector<char> &data = m_MasterThread.GetFileData( iFile ); |
|
|
|
CVMPIFile_Memory *pFile = new CVMPIFile_Memory; |
|
pFile->Init( data.Base(), data.Count(), strchr( pOptions, 't' ) ? 't' : 'b' ); |
|
return (FileHandle_t)pFile; |
|
} |
|
|
|
|
|
void CMasterVMPIFileSystem::OnClientDisconnect( int procID, const char *pReason ) |
|
{ |
|
s_pMasterVMPIFileSystem->m_MasterThread.OnClientDisconnect( procID ); |
|
} |
|
|
|
|
|
void CMasterVMPIFileSystem::CreateVirtualFile( const char *pFilename, const void *pData, int fileLength ) |
|
{ |
|
m_MasterThread.CreateVirtualFile( pFilename, pData, fileLength ); |
|
} |
|
|
|
bool CMasterVMPIFileSystem::HandleFileSystemPacket( MessageBuffer *pBuf, int iSource, int iPacketID ) |
|
{ |
|
// Handle this packet. |
|
int subPacketID = pBuf->data[1]; |
|
switch( subPacketID ) |
|
{ |
|
case VMPI_FSPACKETID_FILE_REQUEST: |
|
{ |
|
int requestID = *((int*)&pBuf->data[2]); |
|
const char *pFilename = (const char*)&pBuf->data[6]; |
|
const char *pPathID = (const char*)pFilename + strlen( pFilename ) + 1; |
|
|
|
if ( g_iVMPIVerboseLevel >= 2 ) |
|
Msg( "Client %d requested '%s'\n", iSource, pFilename ); |
|
|
|
bool bZeroLength; |
|
int fileID = m_MasterThread.AddFileRequest( pFilename, pPathID, iSource, &bZeroLength ); |
|
|
|
// Send back the file ID. |
|
unsigned char cPacket[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_FILE_RESPONSE }; |
|
void *pChunks[4] = { cPacket, &requestID, &fileID, &bZeroLength }; |
|
int chunkLen[4] = { sizeof( cPacket ), sizeof( requestID ), sizeof( fileID ), sizeof( bZeroLength ) }; |
|
|
|
VMPI_SendChunks( pChunks, chunkLen, ARRAYSIZE( pChunks ), iSource ); |
|
} |
|
return true; |
|
|
|
case VMPI_FSPACKETID_CHUNK_RECEIVED: |
|
{ |
|
unsigned short *pFileID = (unsigned short*)&pBuf->data[2]; |
|
unsigned short *pChunkID = pFileID+1; |
|
|
|
int nChunks = (pBuf->getLen() - 2) / 4; |
|
for ( int i=0; i < nChunks; i++ ) |
|
{ |
|
m_MasterThread.OnChunkReceived( *pFileID, iSource, *pChunkID ); |
|
pFileID += 2; |
|
pChunkID += 2; |
|
} |
|
} |
|
return true; |
|
|
|
case VMPI_FSPACKETID_FILE_RECEIVED: |
|
{ |
|
unsigned short *pFileID = (unsigned short*)&pBuf->data[2]; |
|
m_MasterThread.OnFileReceived( *pFileID, iSource ); |
|
} |
|
return true; |
|
|
|
default: |
|
return false; |
|
} |
|
} |
|
|
|
|
|
CSysModule* CMasterVMPIFileSystem::LoadModule( const char *pFileName, const char *pPathID, bool bValidatedDllOnly ) |
|
{ |
|
return m_pMasterVMPIFileSystemPassThru->LoadModule( pFileName, pPathID, bValidatedDllOnly ); |
|
} |
|
|
|
void CMasterVMPIFileSystem::UnloadModule( CSysModule *pModule ) |
|
{ |
|
m_pMasterVMPIFileSystemPassThru->UnloadModule( pModule ); |
|
} |
|
|
|
|