//========= Copyright Valve Corporation, All rights reserved. ============//
//
// Purpose: 
//
//=============================================================================//

#include <winsock2.h>
#include "vmpi_filesystem_internal.h"
#include "zlib.h"
#include "vstdlib/random.h"


#define MINIMUM_SLEEP_MS				1

// NOTE: This number comes from measurements on our network to find out how fast
// we can broadcast without the network freaking out.
//
// This number can be changed on the command line with the -mpi_FileTransmitRate parameter.
int MULTICAST_TRANSMIT_RATE = (1024*1000);	// N megs per second

// Defines when we'll stop transmitting a file to a client.
// (After we've transmitted the file to the client N times and we haven't heard an ack back for M seconds).
#define MIN_FILE_CYCLE_COUNT		5
#define CLIENT_FILE_ACK_TIMEOUT		20



// ------------------------------------------------------------------------------------------------------------------------ //
// Global helpers.
// ------------------------------------------------------------------------------------------------------------------------ //

static void SendMulticastIP( const CIPAddr *pAddr )
{
	unsigned char packetID[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_MULTICAST_ADDR };
	VMPI_Send2Chunks( 
		packetID, sizeof( packetID ),
		pAddr, sizeof( *pAddr ),
		VMPI_PERSISTENT );
}


static bool IsOpeningForWriteAccess( const char *pOptions )
{
	return strchr( pOptions, 'w' ) || strchr( pOptions, 'a' ) || strchr( pOptions, '+' );
}


// This does a fast zlib compression of the source data into the 'out' buffer.
static bool ZLibCompress( const void *pData, int len, CUtlVector<char> &out )
{
	if ( len == 0 )
	{
		out.Purge();
		return true;
	}

	int outStartLen = len;
RETRY:;

	// Prepare the compression stream.
	z_stream zs;
	memset( &zs, 0, sizeof( zs ) );
	
	if ( deflateInit( &zs, 1 ) != Z_OK )
		return false;
	

	// Now compress it into the output buffer.
	out.SetSize( outStartLen );

	zs.next_in = (unsigned char*)pData;
	zs.avail_in = len;

	zs.next_out = (unsigned char*)out.Base();
	zs.avail_out = out.Count();

	int ret = deflate( &zs, Z_FINISH );
	deflateEnd( &zs );

	if ( ret == Z_STREAM_END )
	{
		// Get rid of whatever was left over.
		out.RemoveMultiple( zs.total_out, out.Count() - zs.total_out );
		return true;
	}
	else if ( ret == Z_OK )
	{
		// Need more space in the output buffer.
		outStartLen += 1024 * 128;
		goto RETRY;
	}
	else
	{
		return false;
	}
}


// ------------------------------------------------------------------------------------------------------------------------ //
// CVMPIFile_PassThru
// ------------------------------------------------------------------------------------------------------------------------ //

class CVMPIFile_PassThru : public IVMPIFile
{
public:
	void Init( IBaseFileSystem *pPassThru, FileHandle_t fp )
	{
		m_pPassThru = pPassThru;
		m_fp = fp;
	}

	virtual void Close()
	{
		m_pPassThru->Close( m_fp );
		delete this;
	}

	virtual void Seek( int pos, FileSystemSeek_t seekType )
	{
		m_pPassThru->Seek( m_fp, pos, seekType );
	}

	virtual unsigned int Tell()
	{
		return m_pPassThru->Tell( m_fp );
	}

	virtual unsigned int Size()
	{
		return m_pPassThru->Size( m_fp );
	}

	virtual void Flush()
	{
		m_pPassThru->Flush( m_fp );
	}
	
	virtual int Read( void* pOutput, int size )
	{
		return m_pPassThru->Read( pOutput, size, m_fp );
	}

	virtual int Write( void const* pInput, int size )
	{
		return m_pPassThru->Write( pInput, size, m_fp ); 
	}


private:
	IBaseFileSystem *m_pPassThru;
	FileHandle_t m_fp;
};



// ---------------------------------------------------------------------------------------------------- //
// CTransmitRateMgr coordinates with any other currently-running VMPI jobs, and they all will cut
// down their transmission rate to stay within MULTICAST_TRANSMIT_RATE.
// ---------------------------------------------------------------------------------------------------- //

#define TRANSMITRATEMGR_BROADCAST_INVERVAL	(1.0 / 3.0)	// How many times per second we broadcast our presence.
#define TRANSMITRATEMGR_EXPIRE_TIME			0.7			// How long it'll go before deciding a guy is not transmitting anymore.

static char s_cTransmitRateMgrPacket[] = {2,6,-3,2,1,-66};

class CTransmitRateMgr
{
public:
	CTransmitRateMgr();

	void ReadPackets();
	void BroadcastPresence();

	double GetMicrosecondsPerByte() const;

private:
	class CMachineRecord
	{
	public:
		unsigned long m_UniqueID;
		float m_flLastTime;
	};
	CUtlVector<CMachineRecord> m_MachineRecords;

	unsigned long m_UniqueID;
	float m_flLastBroadcastTime;
	double	m_nMicrosecondsPerByte;
	ISocket *m_pSocket;
};

CTransmitRateMgr::CTransmitRateMgr()
{
	m_nMicrosecondsPerByte = 1000000.0 / (double)MULTICAST_TRANSMIT_RATE;
	m_flLastBroadcastTime = 0;

	// Build a (hopefully) unique ID.
	m_UniqueID = (unsigned long)this;
	CCycleCount cnt;
	cnt.Sample();
	m_UniqueID += cnt.GetMicroseconds();
	Sleep( 1 );
	m_UniqueID += cnt.GetMicroseconds();

	m_pSocket = CreateIPSocket();
	if ( m_pSocket )
	{
		m_pSocket->BindToAny( VMPI_MASTER_FILESYSTEM_BROADCAST_PORT );
	}
}

void CTransmitRateMgr::ReadPackets()
{
	if ( !m_pSocket )
		return;

	float flCurTime = Plat_FloatTime();

	// First, update/add records.
	while ( 1 )
	{
		char data[512];
		CIPAddr ipFrom;
		int len = m_pSocket->RecvFrom( data, sizeof( data ), &ipFrom );
		if ( len == -1 )
			break;
		
		if ( len == sizeof( s_cTransmitRateMgrPacket ) + sizeof( unsigned long ) && 
			 memcmp( data, s_cTransmitRateMgrPacket, sizeof( s_cTransmitRateMgrPacket ) ) == 0 )
		{
			unsigned long id = *((unsigned long*)&data[sizeof(s_cTransmitRateMgrPacket)]);
			if ( id == m_UniqueID )
				continue;

			int i;
			for ( i=0; i < m_MachineRecords.Count(); i++ )
			{
				if ( m_MachineRecords[i].m_UniqueID == id )
				{
					m_MachineRecords[i].m_flLastTime = flCurTime;
					break;
				}
			}

			if ( i == m_MachineRecords.Count() )			   
			{
				int index = m_MachineRecords.AddToTail();
				m_MachineRecords[index].m_UniqueID = id;
				m_MachineRecords[index].m_flLastTime = flCurTime;
			}
		}
	}

	// Now, expire any old records.
	for ( int i=0; i < m_MachineRecords.Count(); i++ )
	{
		if ( (flCurTime - m_MachineRecords[i].m_flLastTime) > TRANSMITRATEMGR_EXPIRE_TIME )
		{
			m_MachineRecords.Remove( i );
			--i;
		}
	}

	// Recalculate our transmit rate (assuming we're receiving our own broadcast packets).
	m_nMicrosecondsPerByte = 1000000.0 / (double)(MULTICAST_TRANSMIT_RATE / (m_MachineRecords.Count() + 1));
}

void CTransmitRateMgr::BroadcastPresence()
{
	if ( !m_pSocket )
		return;

	float flCurTime = Plat_FloatTime();
	if ( (flCurTime - m_flLastBroadcastTime) < TRANSMITRATEMGR_BROADCAST_INVERVAL )
		return;

	m_flLastBroadcastTime = flCurTime;
	
	char cData[sizeof( s_cTransmitRateMgrPacket ) + sizeof( unsigned long )];    
	memcpy( cData, s_cTransmitRateMgrPacket, sizeof( s_cTransmitRateMgrPacket ) );
	*((unsigned long*)&cData[ sizeof( s_cTransmitRateMgrPacket ) ] ) = m_UniqueID;

	m_pSocket->Broadcast( cData, sizeof( cData ), VMPI_MASTER_FILESYSTEM_BROADCAST_PORT );
}

inline double CTransmitRateMgr::GetMicrosecondsPerByte() const
{
	return m_nMicrosecondsPerByte;
}


// ---------------------------------------------------------------------------------------------------- //
// CRateLimiter manages waiting for small periods of time between packets so the rate is
// whatever we want it to be.
//
// It also will give up some CPU time to other processes every 50 milliseconds.
// ---------------------------------------------------------------------------------------------------- //

class CRateLimiter
{
public:

			CRateLimiter();
	
	void	GiveUpTimeSlice();
	void	NoteExcessTimeTaken( unsigned long excessTimeInMicroseconds );


public:

	DWORD m_SleepIntervalMS; // Give up a timeslice every N milliseconds.

	// Since we sleep once in a while, we time how long the sleep took and we beef 
	// up the transmit rate until we've accounted for the time lost during the sleep.
	DWORD m_AccumulatedSleepMicroseconds;

	// When was the last time we gave up a little bit of CPU to other programs.
	CCycleCount m_LastSleepTime;
};

CRateLimiter::CRateLimiter()
{
	m_SleepIntervalMS = 50;
	m_AccumulatedSleepMicroseconds = 0;
	m_LastSleepTime.Sample();
}

void CRateLimiter::GiveUpTimeSlice()
{
	// Sleep again?
	CCycleCount currentTime, dtSinceLastSleep;
	currentTime.Sample();
	CCycleCount::Sub( currentTime, m_LastSleepTime, dtSinceLastSleep );

	if ( dtSinceLastSleep.GetMilliseconds() >= m_SleepIntervalMS )
	{
		CFastTimer sleepTimer;
		
		sleepTimer.Start();
		Sleep( 10 );
		sleepTimer.End();

		m_AccumulatedSleepMicroseconds += sleepTimer.GetDuration().GetMicroseconds();
		m_LastSleepTime.Sample();
	}
}


void CRateLimiter::NoteExcessTimeTaken( unsigned long excessTimeInMicroseconds )
{
	// Note: we give up time slices above.
	if ( excessTimeInMicroseconds > m_AccumulatedSleepMicroseconds )
	{ 
		excessTimeInMicroseconds -= m_AccumulatedSleepMicroseconds;
		m_AccumulatedSleepMicroseconds = 0;

		CCycleCount startCount;
		startCount.Sample();
		while ( 1 )
		{
			CCycleCount curCount, diff;
			curCount.Sample();

			CCycleCount::Sub( curCount, startCount, diff );
			if ( diff.GetMicroseconds() >= excessTimeInMicroseconds )
				break;
		}
	}
	else
	{
		m_AccumulatedSleepMicroseconds -= excessTimeInMicroseconds;
		excessTimeInMicroseconds = 0;
	}
}


// ------------------------------------------------------------------------------------------------------------------------ //
// CMasterMulticastThread.
// ------------------------------------------------------------------------------------------------------------------------ //

class CMasterMulticastThread
{
public:

					CMasterMulticastThread();
					~CMasterMulticastThread();

	// This creates the socket and starts the thread (initially in an idle state since it doesn't
	// know of any files anyone wants).
	bool			Init( IBaseFileSystem *pPassThru, unsigned short localPort, const CIPAddr *pAddr, int maxFileSystemMemoryUsage );
	void			Term();

	// Returns -1 if there is an error.
	int				FindOrAddFile( const char *pFilename, const char *pPathID );	
	const CUtlVector<char>& GetFileData( int iFile ) const;

	// When a client requests a files, this is called to tell the thread to start
	// adding chunks from the specified file into the queue it's multicasting.
	//
	// Returns -1 if the file isn't there. Otherwise, it returns the file ID
	// that will be sent along with the file's chunks in the multicast packets.
	int				AddFileRequest( const char *pFilename, const char *pPathID, int clientID, bool *bZeroLength );
	
	// As each client receives multicasted chunks, they ack them so the master can
	// stop transmitting any chunks it knows nobody wants.
	void			OnChunkReceived( int fileID, int clientID, int iChunk );
	void			OnFileReceived( int fileID, int clientID );

	// Call this if a client disconnects so it can stop sending anything this client wants.
	void			OnClientDisconnect( int clientID, bool bGrabCriticalSection=true );

	void CreateVirtualFile( const char *pFilename, const void *pData, unsigned long fileLength );

private:

	class CChunkInfo
	{
	public:
		unsigned short	m_iChunk;
		unsigned short	m_RefCount;				// How many clients want this chunk.
		unsigned short	m_iActiveChunksIndex;	// Index into m_ActiveChunks.
	};


	// This stores a client's reference to a file so it knows which pieces of the file the client needs.
	class CClientFileInfo
	{
	public:
		bool NeedsChunk( int i ) const { return (m_ChunksToSend[i>>3] & (1 << (i&7))) != 0; }	
	
	public:
		int							m_ClientID;
		CUtlVector<unsigned char>	m_ChunksToSend;	// One bit for each chunk that this client still wants.
		int m_nChunksLeft;

		// TCP transmission only.
		int m_TCP_LastChunkAcked;
		int m_TCP_LastChunkSent;
		
		float m_flTransmitStartTime;
		
		float m_flLastAckTime;		// Last time we heard an ack back from this client about this file.
									// If this goes for too long, then we assume that the client is
									// in a screwed state, and we stop sending the file to him.
		int m_nTimesFileCycled;		// How many times has the master multicast thread cycled over this file?
									// We won't kick the client until we've cycled over the file a few times
									// after the client asked for it.
	};


	class CMulticastFile
	{
	public:
		~CMulticastFile()
		{
			m_Clients.PurgeAndDeleteElements();
		}

		const char* GetFilename() { return m_Filename.Base(); }
		const char* GetPathID() { return m_PathID.Base(); }


	public:
		int m_nCycles; // How many times has the multicast thread visited this file?

		// This is sent along with every packet. If a client gets a chunk and doesn't have that file's
		// info, the client will receive that file too.
		CUtlVector<char> m_Filename;
		CUtlVector<char> m_PathID;

		CMulticastFileInfo m_Info;

		// This is stored so the app can read out the uncompressed data.
		CUtlVector<char>				m_UncompressedData;

		// zlib-compressed file data
		CUtlVector<char>				m_Data;	

		// This gets set to false if we run over our memory limit and start caching file data out.
		// Then it'll reload the data if a client requests the file.
		bool m_bDataLoaded;

		// m_Chunks holds the chunks by index.
		// m_ActiveChunks holds them sorted by whether they're active or not.
		// 
		// Each chunk has a refcount. While the refcount is > 0, the chunk is in the first
		// half of m_ActiveChunks. When the refcount gets to 0, the chunk is moved to the end of 
		// m_ActiveChunks. That way, we can iterate through the chunks that need to be sent and 
		// stop iterating the first time we hit one with a refcount of 0.
		CUtlVector<CChunkInfo>			m_Chunks;
		CUtlLinkedList<CChunkInfo*,int>	m_ActiveChunks;

		// This tells which clients want pieces of this file.
		CUtlLinkedList<CClientFileInfo*,int>	m_Clients;
	};


private:

	static DWORD WINAPI StaticMulticastThread( LPVOID pParameter );
	DWORD MulticastThread();

	bool CheckClientTimeouts();
	bool Thread_SendFileChunk_Multicast( int *pnBytesSent );
	void Thread_SeekToNextActiveChunk();

	// In TCP mode, we send new chunks as they are acked.
	void TCP_SendNextChunk( CMulticastFile *pFile, CClientFileInfo *pClient );

	void EnsureMemoryLimit( CMulticastFile *pIgnore );

	// Called after pFile->m_UncompressedData has been setup. This compresses the data, prepares the header,
	// copies the filename, and adds it into the queue for the multicast thread.
	int FinishFileSetup( CMulticastFile *pFile, const char *pFilename, const char *pPathID, bool bFileAlreadyExisted );

	void IncrementChunkRefCount( CMasterMulticastThread::CMulticastFile *pFile, int iChunk );
	void DecrementChunkRefCount( int iFile, int iChunk );
	
	int FindFile( const char *pFilename, const char *pPathID );

	bool FindWarningSuppression( const char *pFilename );
	void AddWarningSuppression( const char *pFilename );

private:
	
	CUtlLinkedList<CMulticastFile*,int>		m_Files;
	
	unsigned long m_nCurMemoryUsage;	// Total of all the file data we have loaded.
	unsigned long m_nMaxMemoryUsage;	// 0 means that there is no limit.

	// This tracks how many chunks we have that want to be sent.
	int m_nTotalActiveChunks;

	SOCKET m_Socket;
	sockaddr_in m_MulticastAddr;

	HANDLE m_hMainThread;
	IBaseFileSystem *m_pPassThru;
	
	HANDLE m_hThread;
	CRITICAL_SECTION m_CS;

	// Events used to communicate with our thread.
	HANDLE m_hTermEvent;

	// The thread walks through this as it spews chunks of data.
	volatile int m_iCurFile;			// Index into m_Files.
	volatile int m_iCurActiveChunk;		// Current index into CMulticastFile::m_ActiveChunks.

	CUtlLinkedList<char*,int> m_WarningSuppressions;
};


CMasterMulticastThread::CMasterMulticastThread()
{
	m_hThread = m_hMainThread = NULL;
	m_Socket = INVALID_SOCKET;
	m_nTotalActiveChunks = 0;
	m_iCurFile = m_iCurActiveChunk = -1;
	m_pPassThru = NULL;
	
	m_hTermEvent = CreateEvent( NULL, FALSE, FALSE, NULL );
	InitializeCriticalSection( &m_CS );
	m_nCurMemoryUsage = m_nMaxMemoryUsage = 0;
}


CMasterMulticastThread::~CMasterMulticastThread()
{
	Term();
	
	CloseHandle( m_hTermEvent );

	DeleteCriticalSection( &m_CS );
}


bool CMasterMulticastThread::Init( IBaseFileSystem *pPassThru, unsigned short localPort, const CIPAddr *pAddr, int maxMemoryUsage )
{
	Term();

	m_nMaxMemoryUsage = maxMemoryUsage;
	Assert( m_nCurMemoryUsage == 0 );
	m_nCurMemoryUsage = 0;

	if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_TCP )
	{
		// No need for an extra socket in this mode.
		m_Socket = INVALID_SOCKET;
	}
	else
	{
		// First, create our socket.
		m_Socket = socket( AF_INET, SOCK_DGRAM, IPPROTO_IP );
		if ( m_Socket == INVALID_SOCKET )
		{
			Warning( "CMasterMulticastThread::Init - socket() failed\n" );
			return false;
		}

		// Bind to INADDR_ANY.
		CIPAddr localAddr( 0, 0, 0, 0, localPort );
		
		sockaddr_in addr;
		IPAddrToSockAddr( &localAddr, &addr );

		int status = bind( m_Socket, (sockaddr*)&addr, sizeof(addr) );
		if ( status != 0 )
		{
			Term();
			Warning( "CMasterMulticastThread::Init - bind( %d.%d.%d.%d:%d ) failed\n", EXPAND_ADDR( *pAddr ) );
			return false;
		}
		
		if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_BROADCAST )
		{
			// Set up for broadcast
			BOOL bBroadcast = TRUE;
			if ( setsockopt( m_Socket, SOL_SOCKET, SO_BROADCAST, (char*)&bBroadcast, bBroadcast ) == SOCKET_ERROR )
			{
				Term();
				Warning( "CMasterMulticastThread::Init - setsockopt() failed to set broadcast mode\n" );
				return false;
			}
		}

		// Remember the address we want to send to.
		IPAddrToSockAddr( pAddr, &m_MulticastAddr );

		// Now create our thread.
		DWORD dwThreadID = 0;
		m_hThread = CreateThread( NULL, 0, &CMasterMulticastThread::StaticMulticastThread, this, 0, &dwThreadID );
		if ( !m_hThread )
		{
			Term();
			Warning( "CMasterMulticastThread::Init - CreateThread failed\n" );
			return false;
		}

		SetThreadPriority( m_hThread, THREAD_PRIORITY_LOWEST );
	}

	// For debug mode to verify that we don't try to open files while in another thread.
	m_hMainThread = GetCurrentThread();

	m_pPassThru = pPassThru;
	return true;
}


void CMasterMulticastThread::Term()
{
	// Stop the thread if it is running.
	if ( m_hThread )
	{
		SetEvent( m_hTermEvent );
		WaitForSingleObject( m_hThread, INFINITE );
		CloseHandle( m_hThread );

		m_hThread = NULL;
	}

	// Close the socket.
	if ( m_Socket != INVALID_SOCKET )
	{
		closesocket( m_Socket );
		m_Socket = INVALID_SOCKET;
	}

	// Free up other data.
	m_Files.PurgeAndDeleteElements();
	m_nCurMemoryUsage = m_nMaxMemoryUsage = 0;
}


void CMasterMulticastThread::TCP_SendNextChunk( CMulticastFile *pFile, CClientFileInfo *pClient )
{
	// No more chunks to send?
	if ( (pClient->m_TCP_LastChunkSent+1) >= pFile->m_Info.m_nChunks )
		return;
	
	// Figure out what data we'd be sending.
	int iChunkToSend = pClient->m_TCP_LastChunkSent + 1;
	int iStartByte = iChunkToSend * TCP_CHUNK_PAYLOAD_SIZE;
	int iEndByte = min( iStartByte + TCP_CHUNK_PAYLOAD_SIZE, pFile->m_Data.Count() );
	
	// If the start point is past the end, then we're done sending the file to this client.
	if ( iStartByte >= pFile->m_Data.Count() )
		return;

	// Record that we sent this data.
	pClient->m_TCP_LastChunkSent = iChunkToSend;

	// Assemble the packet.
	unsigned char cPacket[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_FILE_CHUNK };

	const void *chunks[5] =
	{
		cPacket,
		&pFile->m_Info,
		&iChunkToSend,
		pFile->GetFilename(),
		&pFile->m_Data[iStartByte]
	};

	int chunkLengths[5] =
	{
		sizeof( cPacket ),
		sizeof( pFile->m_Info ),
		sizeof( m_iCurActiveChunk ),
		strlen( pFile->GetFilename() ) + 1,
		iEndByte - iStartByte
	};
	
	VMPI_SendChunks( chunks, chunkLengths, 5, pClient->m_ClientID );
}


int CMasterMulticastThread::AddFileRequest( const char *pFilename, const char *pPathID, int clientID, bool *bZeroLength )
{
	// Firstly, do we already have this file?
	int iFile = FindOrAddFile( pFilename, pPathID );
	if ( iFile == -1 )
		return -1;

	CMulticastFile *pFile = m_Files[iFile];

	// Now that we have a file setup, merge in this client's info.
	EnterCriticalSection( &m_CS );

		CClientFileInfo *pClient = new CClientFileInfo;
		pClient->m_TCP_LastChunkAcked = -1;
		pClient->m_TCP_LastChunkSent = -1;
		pClient->m_ClientID = clientID;
		pClient->m_flLastAckTime = Plat_FloatTime();
		pClient->m_flTransmitStartTime = pClient->m_flLastAckTime;
		pClient->m_nTimesFileCycled = 0;
		pClient->m_nChunksLeft = pFile->m_Info.m_nChunks;
		pClient->m_ChunksToSend.SetSize( PAD_NUMBER( pFile->m_Info.m_nChunks, 8 ) / 8 );
		memset( pClient->m_ChunksToSend.Base(), 0xFF, pClient->m_ChunksToSend.Count() );
		pFile->m_Clients.AddToTail( pClient );

		for ( int i=0; i < pFile->m_Chunks.Count(); i++ )
		{
			IncrementChunkRefCount( pFile, i );
		}

		// In TCP mode, let's get the sliding window started..
		if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_TCP )
		{
			for ( int iDepth=0; iDepth < TCP_CHUNK_QUEUE_LEN; iDepth++ )
				TCP_SendNextChunk( pFile, pClient );
		}
	
	LeaveCriticalSection( &m_CS );

	*bZeroLength = (pFile->m_Info.m_UncompressedSize == 0);

	return iFile;
}


void CMasterMulticastThread::OnChunkReceived( int fileID, int clientID, int iChunk )
{
	if ( !m_Files.IsValidIndex( fileID ) )
	{
		Warning( "CMasterMulticastThread::OnChunkReceived: invalid file (%d) from client %d\n", fileID, clientID );
		return;
	}

	CMulticastFile *pFile = m_Files[fileID];
	CClientFileInfo *pClient = NULL;
	FOR_EACH_LL( pFile->m_Clients, iClient )
	{
		if ( pFile->m_Clients[iClient]->m_ClientID == clientID )
		{
			pClient = pFile->m_Clients[iClient];
			break;
		}
	}
	if ( !pClient )
	{
		// This will spam sometimes if a worker stops responding and we timeout on it,
		// but then it comes back alive and starts responding. So let's ignore its packets silently.
		//Warning( "CMasterMulticastThread::OnChunkReceived: invalid client ID (%d) for file %s\n", clientID, pFile->GetFilename() );
		return;
	}
	
	if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_TCP )
	{
		// Send the next chunk, if there is one.
		EnterCriticalSection( &m_CS );
		TCP_SendNextChunk( pFile, pClient );
		LeaveCriticalSection( &m_CS );
	}
	else
	{
		if ( !pFile->m_Chunks.IsValidIndex( iChunk ) )
		{
			Warning( "CMasterMulticastThread::OnChunkReceived: invalid chunk index (%d) for file %s\n", iChunk, pFile->GetFilename() );
			return;
		}

		// Mark that this client doesn't need this chunk anymore.
		pClient->m_ChunksToSend[iChunk >> 3] &= ~(1 << (iChunk & 7));
		pClient->m_nChunksLeft--;

		pClient->m_flLastAckTime = Plat_FloatTime();
		if ( pClient->m_nChunksLeft == 0 && g_iVMPIVerboseLevel >= 2 )
			Warning( "Client %d got file %s\n", clientID, pFile->GetFilename() );
		
		EnterCriticalSection( &m_CS );
			DecrementChunkRefCount( fileID, iChunk );
		LeaveCriticalSection( &m_CS );
	}
}


void CMasterMulticastThread::OnFileReceived( int fileID, int clientID )
{
	if ( !m_Files.IsValidIndex( fileID ) )
	{
		Warning( "CMasterMulticastThread::OnChunkReceived: invalid file (%d) from client %d\n", fileID, clientID );
		return;
	}

	CMulticastFile *pFile = m_Files[fileID];
	for ( int i=0; i < pFile->m_Info.m_nChunks; i++ )
		OnChunkReceived( fileID, clientID, i );
}


void CMasterMulticastThread::OnClientDisconnect( int clientID, bool bGrabCriticalSection )
{
	if ( bGrabCriticalSection )
		EnterCriticalSection( &m_CS );

	// Remove all references from this client.
	FOR_EACH_LL( m_Files, iFile )
	{
		CMulticastFile *pFile = m_Files[iFile];

		FOR_EACH_LL( pFile->m_Clients, iClient )
		{
			CClientFileInfo *pClient = pFile->m_Clients[iClient];
			
			if ( pClient->m_ClientID != clientID )
				continue;

			// Ok, this is our man. Decrement the refcount of any chunks this client wanted.
			for ( int iChunk=0; iChunk < pFile->m_Info.m_nChunks; iChunk++ )
			{
				if ( pClient->NeedsChunk( iChunk ) )
				{
					DecrementChunkRefCount( iFile, iChunk );
				}
			}

			delete pClient;
			pFile->m_Clients.Remove( iClient );

			break;
		}
	}

	if ( bGrabCriticalSection )
		LeaveCriticalSection( &m_CS );
}


void CMasterMulticastThread::CreateVirtualFile( const char *pFilename, const void *pData, unsigned long fileLength )
{
	const char *pPathID = VMPI_VIRTUAL_FILES_PATH_ID;

	int iFile = FindFile( pFilename, pPathID );
	if ( iFile != -1 )
		Error( "CMasterMulticastThread::CreateVirtualFile( %s ) - file already exists!", pFilename );

	CMulticastFile *pFile = new CMulticastFile;
	pFile->m_UncompressedData.CopyArray( (const char*)pData, fileLength );

	FinishFileSetup( pFile, pFilename, pPathID, false );
}


DWORD WINAPI CMasterMulticastThread::StaticMulticastThread( LPVOID pParameter )
{
	return ((CMasterMulticastThread*)pParameter)->MulticastThread();
}


bool CMasterMulticastThread::CheckClientTimeouts()
{
	bool bRet = false;
	CMulticastFile *pFile = m_Files[m_iCurFile];

	float flCurTime = Plat_FloatTime();

	int iNext;
	for( int iCur=pFile->m_Clients.Head(); iCur != pFile->m_Clients.InvalidIndex(); iCur=iNext )
	{
		iNext = pFile->m_Clients.Next( iCur );
		CClientFileInfo *pInfo = pFile->m_Clients[iCur];

		// If the client has already fully received this file, don't bother timing out on it.
		if ( pInfo->m_nChunksLeft == 0 )
			continue;

		++pInfo->m_nTimesFileCycled;
		if ( pInfo->m_nTimesFileCycled >= MIN_FILE_CYCLE_COUNT && (flCurTime - pInfo->m_flLastAckTime) > CLIENT_FILE_ACK_TIMEOUT )
		{
			// For debug output, get the most recent time we heard any ack from this client at all.
			float flMostRecentTime = pInfo->m_flLastAckTime;
			FOR_EACH_LL( m_Files, iTestFile )
			{
				CMulticastFile *pTestFile = m_Files[iTestFile];
				FOR_EACH_LL( pTestFile->m_Clients, iTestClient )
				{
					if ( pTestFile->m_Clients[iTestClient]->m_ClientID == pInfo->m_ClientID )
					{
						flMostRecentTime = max( flMostRecentTime, pTestFile->m_Clients[iTestClient]->m_flLastAckTime );
					}
				}
			}

			Warning( "\nClient %s timed out on file %s (latest: %.2f / cur: %.2f).\n", 
				VMPI_GetMachineName( pInfo->m_ClientID ), pFile->GetFilename(), flMostRecentTime, flCurTime );
			OnClientDisconnect( pInfo->m_ClientID, false );
			bRet = true; // yes, we booted a client.
		}
	}
	
	return bRet;
}

inline bool CMasterMulticastThread::Thread_SendFileChunk_Multicast( int *pnBytesSent )
{
	// Send the next chunk (file, size, time, chunk data).
	CMulticastFile *pFile = m_Files[m_iCurFile];
	
	int iStartByte = m_iCurActiveChunk * MULTICAST_CHUNK_PAYLOAD_SIZE;
	int iEndByte = min( iStartByte + MULTICAST_CHUNK_PAYLOAD_SIZE, pFile->m_Data.Count() );

	WSABUF bufs[4];
	bufs[0].buf = (char*)&pFile->m_Info;
	bufs[0].len = sizeof( pFile->m_Info );

	bufs[1].buf = (char*)&m_iCurActiveChunk;
	bufs[1].len = sizeof( m_iCurActiveChunk );

	bufs[2].buf = (char*)pFile->GetFilename();
	bufs[2].len = strlen( pFile->GetFilename() ) + 1;

	bufs[3].buf = &pFile->m_Data[iStartByte];
	bufs[3].len = iEndByte - iStartByte;

	DWORD nBytesSent = 0;
	DWORD nWantedBytes = ( bufs[0].len + bufs[1].len + bufs[2].len + bufs[3].len );
	bool bSuccess;

	if ( m_MulticastAddr.sin_addr.S_un.S_un_b.s_b1 == 127 && 
			m_MulticastAddr.sin_addr.S_un.S_un_b.s_b2 == 0 && 
			m_MulticastAddr.sin_addr.S_un.S_un_b.s_b3 == 0 && 
			m_MulticastAddr.sin_addr.S_un.S_un_b.s_b4 == 1 )
	{
		// For some mysterious reason, WSASendTo only sends the first buffer
		// if we're sending to 127.0.0.1 (ie: in local mode).
		char allData[1024*8];
		if ( nWantedBytes > sizeof( allData ) )
			Error( "nWantedBytes > sizeof( allData )" );

		memcpy( &allData[0], bufs[0].buf, bufs[0].len );
		memcpy( &allData[bufs[0].len], bufs[1].buf, bufs[1].len );
		memcpy( &allData[bufs[0].len+bufs[1].len], bufs[2].buf, bufs[2].len );
		memcpy( &allData[bufs[0].len+bufs[1].len+bufs[2].len], bufs[3].buf, bufs[3].len );
		int ret = sendto( m_Socket, allData, nWantedBytes, 0, (sockaddr*)&m_MulticastAddr, sizeof( m_MulticastAddr ) );
		bSuccess = (ret == (int)nWantedBytes);
	}
	else
	{
		WSASendTo( 
			m_Socket, 
			bufs, 
			ARRAYSIZE( bufs ), 
			&nBytesSent, 
			0, 
			(sockaddr*)&m_MulticastAddr, 
			sizeof( m_MulticastAddr ),
			NULL,
			NULL );
		bSuccess = (nBytesSent == nWantedBytes);
	}

	// Handle errors.. let it get a few errors, then quit.
	if ( bSuccess )
	{
		*pnBytesSent = (int)nBytesSent;
	}
	else
	{
		static int nWarnings = 0;
		++nWarnings;
		if ( nWarnings < 10 )
		{
			Warning( "\nMulticastThread: WSASendTo with %d bytes sent %d bytes.\n", nWantedBytes, nBytesSent );

			char *lpMsgBuf;
			if ( FormatMessage( 
				FORMAT_MESSAGE_ALLOCATE_BUFFER | 
				FORMAT_MESSAGE_FROM_SYSTEM | 
				FORMAT_MESSAGE_IGNORE_INSERTS,
				NULL,
				GetLastError(),
				MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language
				(char*)&lpMsgBuf,
				0,
				NULL 
			) )
			{
				Warning( "%s", lpMsgBuf );
				LocalFree( lpMsgBuf );	
			}
		}
		else if ( nWarnings == 10 )
		{
			Warning( "\nThis machine's ability to multicast may be broken. Please reboot and try again.\n" );
		}
	}

	return bSuccess;
}


void CMasterMulticastThread::Thread_SeekToNextActiveChunk()
{
	// Make sure we're on a valid chunk.
	if ( m_iCurFile == -1 )
	{
		Assert( m_Files.Count() > 0 );
		m_iCurFile = m_Files.Head();
		m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Head();
	}

	while ( 1 )
	{
		if ( m_iCurActiveChunk == m_Files[m_iCurFile]->m_ActiveChunks.InvalidIndex() ||
			m_Files[m_iCurFile]->m_ActiveChunks[m_iCurActiveChunk]->m_RefCount == 0 )
		{
			// Now check for client timeouts.
			// (This is kicking clients unjustly for some reason.. need to debug).
			if ( CheckClientTimeouts() && m_nTotalActiveChunks == 0 )
				break;

			// Finished with that file. Send the next one.
			m_iCurFile = m_Files.Next( m_iCurFile );
			if ( m_iCurFile == m_Files.InvalidIndex() )
				m_iCurFile = m_Files.Head();

			m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Head();
		}

		if ( m_iCurActiveChunk != m_Files[m_iCurFile]->m_ActiveChunks.InvalidIndex() )
		{
			// Only break if we're on an active chunk.
			if ( m_Files[m_iCurFile]->m_ActiveChunks[m_iCurActiveChunk]->m_RefCount != 0 )
			{
				break;
			}

			m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Next( m_iCurActiveChunk );
		}
	}
}


DWORD CMasterMulticastThread::MulticastThread()
{
	CTransmitRateMgr transmitRateMgr;
	CRateLimiter rateLimiter;

	
	DWORD msToWait = 0; // Only temporarily used if we don't have any data to send.

	while ( WaitForSingleObject( m_hTermEvent, msToWait ) != WAIT_OBJECT_0 )
	{
		rateLimiter.GiveUpTimeSlice();
		msToWait = 0;

		EnterCriticalSection( &m_CS );
			
			transmitRateMgr.ReadPackets();
		
			// If we have nothing to send then kick back for a while.
			if ( m_nTotalActiveChunks == 0 )
			{
				LeaveCriticalSection( &m_CS );
				msToWait = 50;
				continue;
			}

			// Ok, now we're active, so send out our presence to other CTransmitRateMgrs on the network.
			transmitRateMgr.BroadcastPresence();

			
			// We're going to time how long this chunk took to send.
			CFastTimer timer;
			timer.Start();

			Thread_SeekToNextActiveChunk();

			// We have to do this check a second time here because the CheckClientTimeouts() call may have
			// booted our last client. If we don't check it here, we might be transmitting
			// something we don't want to transmit. Also, if we don't break out of the loop above,
			// it can prevent the process from ever exiting because it'll never exit that while() loop.
			if ( m_nTotalActiveChunks == 0 )
			{
				LeaveCriticalSection( &m_CS );
				msToWait = 50;
				continue;
			}

			int nBytesSent = 0;

			bool bSuccess;
			bSuccess = Thread_SendFileChunk_Multicast( &nBytesSent );

			g_nMulticastBytesSent += (int)nBytesSent;

			// Move to the next chunk.
			m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Next( m_iCurActiveChunk );

		LeaveCriticalSection( &m_CS );


		// Measure how long it took to send this.
		timer.End();
		unsigned long timeTaken = timer.GetDuration().GetMicroseconds();

		
		// Measure how long it should have taken.
		unsigned long estimatedPacketHeaderSize = 32;
		unsigned long optimalTimeTaken = (unsigned long)( transmitRateMgr.GetMicrosecondsPerByte() * (nBytesSent + estimatedPacketHeaderSize) );

		
		// If we went faster than we should have, then wait for the difference in time.
		if ( timeTaken < optimalTimeTaken )
		{
			rateLimiter.NoteExcessTimeTaken( optimalTimeTaken - timeTaken );
		}
	}

	return 0;
}


void CMasterMulticastThread::IncrementChunkRefCount( CMasterMulticastThread::CMulticastFile *pFile, int iChunk )
{
	CChunkInfo *pChunk = &pFile->m_Chunks[iChunk];

	if ( pChunk->m_RefCount == 0 )
	{
		++m_nTotalActiveChunks;
		
		// Move the chunk to the head of the list since it is now active.
		pFile->m_ActiveChunks.Remove( pChunk->m_iActiveChunksIndex );
		pChunk->m_iActiveChunksIndex = pFile->m_ActiveChunks.AddToHead( pChunk );
	}

	pChunk->m_RefCount++;
}


void CMasterMulticastThread::DecrementChunkRefCount( int iFile, int iChunk )
{
	CMulticastFile *pFile = m_Files[iFile];
	CChunkInfo *pChunk = &pFile->m_Chunks[iChunk];

	if ( pChunk->m_RefCount == 0 )
	{
		Error( "CMasterMulticastThread::DecrementChunkRefCount - refcount already zero!\n" );
	}

	pChunk->m_RefCount--;
	if ( pChunk->m_RefCount == 0 )
	{
		--m_nTotalActiveChunks;
		
		// If this is the current chunk the thread is reading on, seek up to the next chunk so
		// the thread doesn't spin off into the next file and skip its current file's contents.
		if ( iFile == m_iCurFile && pChunk->m_iActiveChunksIndex == m_iCurActiveChunk )
		{
			m_iCurActiveChunk = pFile->m_ActiveChunks.Next( pChunk->m_iActiveChunksIndex );
		}

		// Move the chunk to the end of the list since it is now inactive.
		pFile->m_ActiveChunks.Remove( pChunk->m_iActiveChunksIndex );
		pChunk->m_iActiveChunksIndex = pFile->m_ActiveChunks.AddToTail( pChunk );
	}
}


int CMasterMulticastThread::FindFile( const char *pName, const char *pPathID )
{
	FOR_EACH_LL( m_Files, i )
	{
		CMulticastFile *pFile = m_Files[i];
		if ( stricmp( pFile->GetFilename(), pName ) == 0 && stricmp( pFile->GetPathID(), pPathID ) == 0 )
			return i;
	}
	return -1;
}


bool CMasterMulticastThread::FindWarningSuppression( const char *pFilename )
{
	FOR_EACH_LL( m_WarningSuppressions, i )
	{
		if ( Q_stricmp( m_WarningSuppressions[i], pFilename ) == 0 )
			return true;
	}
	return false;
}


void CMasterMulticastThread::AddWarningSuppression( const char *pFilename )
{
	char *pBlah = new char[ strlen( pFilename ) + 1 ];
	strcpy( pBlah, pFilename );
	m_WarningSuppressions.AddToTail( pBlah );
}


int CMasterMulticastThread::FindOrAddFile( const char *pFilename, const char *pPathID )
{
	CMulticastFile *pFile = NULL;
	bool bFileAlreadyExisted = false;

	// See if we've already opened this file.
	int iFile = FindFile( pFilename, pPathID );
	if ( iFile != -1 )
	{
		pFile = m_Files[iFile];
		if ( pFile->m_bDataLoaded )
		{
			return iFile;
		}
		else
		{
			// Ok, we have the file entry, but its data has been freed, so we need to reload it.
			EnterCriticalSection( &m_CS );
			bFileAlreadyExisted = true;
		}
	}

	// Can't open a file outside our main thread, because we have to talk to the filesystem
	// and the filesystem doesn't support that.
	Assert( GetCurrentThread() == m_hMainThread );

	// When the worker originally asked for the path ID, they could pass NULL and it would come through as "".
	// Now set it back to null for the filesystem we're passing the call to.
	FileHandle_t fp = m_pPassThru->Open( pFilename, "rb", pPathID[0] == 0 ? NULL : pPathID );
	if ( !fp )
	{
		if ( bFileAlreadyExisted )
			LeaveCriticalSection( &m_CS );

		return -1;
	}

	if ( !bFileAlreadyExisted )
		pFile = new CMulticastFile;

	pFile->m_UncompressedData.SetSize( m_pPassThru->Size( fp ) );
	m_pPassThru->Read( pFile->m_UncompressedData.Base(), pFile->m_UncompressedData.Count(), fp );
	m_pPassThru->Close( fp );

	int iRet = FinishFileSetup( pFile, pFilename, pPathID, bFileAlreadyExisted );
	if ( bFileAlreadyExisted )
		LeaveCriticalSection( &m_CS );

	return iRet;
}


int CMasterMulticastThread::FinishFileSetup( CMulticastFile *pFile, const char *pFilename, const char *pPathID, bool bFileAlreadyExisted )
{
	// Compress the file's contents.
	if ( !ZLibCompress( pFile->m_UncompressedData.Base(), pFile->m_UncompressedData.Count(), pFile->m_Data ) )
	{
		delete pFile;
		return -1;
	}

	pFile->m_bDataLoaded = true;
	int chunkSize = VMPI_GetChunkPayloadSize();

	// Get this file in the queue.
	if ( !bFileAlreadyExisted )
	{
		pFile->m_Filename.SetSize( strlen( pFilename ) + 1 );
		strcpy( pFile->m_Filename.Base(), pFilename );

		pFile->m_PathID.SetSize( strlen( pPathID ) + 1 );
		strcpy( pFile->m_PathID.Base(), pPathID );

		pFile->m_nCycles = 0;

		pFile->m_Info.m_CompressedSize = pFile->m_Data.Count();
		pFile->m_Info.m_UncompressedSize = pFile->m_UncompressedData.Count();

		pFile->m_Info.m_nChunks = PAD_NUMBER( pFile->m_Info.m_CompressedSize, chunkSize ) / chunkSize;

		// Initialize the chunks.
		pFile->m_Chunks.SetSize( pFile->m_Info.m_nChunks );
		for ( int i=0; i < pFile->m_Chunks.Count(); i++ )
		{
			CChunkInfo *pChunk = &pFile->m_Chunks[i];

			pChunk->m_iChunk = (unsigned short)i;
			pChunk->m_RefCount = 0;
			pChunk->m_iActiveChunksIndex = pFile->m_ActiveChunks.AddToTail( pChunk );
		}

		EnterCriticalSection( &m_CS );
	}

	// Boot some other file out of memory if we're out of space.		
	m_nCurMemoryUsage += ( pFile->m_Info.m_CompressedSize + pFile->m_Info.m_UncompressedSize );
	EnsureMemoryLimit( pFile );

	if ( !bFileAlreadyExisted )
	{
		pFile->m_Info.m_FileID = m_Files.AddToTail( pFile );
		LeaveCriticalSection( &m_CS );
	}

	return pFile->m_Info.m_FileID;
}


void CMasterMulticastThread::EnsureMemoryLimit( CMulticastFile *pIgnore )
{
	if ( m_nMaxMemoryUsage != 0 && m_nCurMemoryUsage > m_nMaxMemoryUsage )
	{
		// Free all the files that we can.
		FOR_EACH_LL( m_Files, iFile )
		{
			CMulticastFile *pFile = m_Files[iFile];
			if ( pFile == pIgnore || !pFile->m_bDataLoaded )
				continue;

			if ( pFile->m_ActiveChunks.Count() == 0 )
			{
				m_nCurMemoryUsage -= ( pFile->m_Info.m_CompressedSize + pFile->m_Info.m_UncompressedSize );

				pFile->m_Data.Purge();
				pFile->m_UncompressedData.Purge();
				pFile->m_bDataLoaded = false;
			}
		}
	}
}


const CUtlVector<char>& CMasterMulticastThread::GetFileData( int iFile ) const
{
	return m_Files[iFile]->m_UncompressedData;
}


// ------------------------------------------------------------------------------------------------------------------------ //
// CMasterVMPIFileSystem implementation.
// ------------------------------------------------------------------------------------------------------------------------ //

class CMasterVMPIFileSystem : public CBaseVMPIFileSystem
{
public:
	CMasterVMPIFileSystem();
	virtual ~CMasterVMPIFileSystem();
	
	bool Init( int maxMemoryUsage, IFileSystem *pPassThru );
	virtual void Term();

	virtual FileHandle_t Open( const char *pFilename, const char *pOptions, const char *pathID );
	virtual bool HandleFileSystemPacket( MessageBuffer *pBuf, int iSource, int iPacketID );

	virtual void CreateVirtualFile( const char *pFilename, const void *pData, int fileLength );

	virtual CSysModule 		*LoadModule( const char *pFileName, const char *pPathID, bool bValidatedDllOnly );
	virtual void			UnloadModule( CSysModule *pModule );


private:

	static void OnClientDisconnect( int procID, const char *pReason );


private:
	CMasterMulticastThread m_MasterThread;
	IFileSystem *m_pMasterVMPIFileSystemPassThru;

	static CMasterVMPIFileSystem *s_pMasterVMPIFileSystem;
};


CMasterVMPIFileSystem *CMasterVMPIFileSystem::s_pMasterVMPIFileSystem = NULL;


CBaseVMPIFileSystem* CreateMasterVMPIFileSystem( int maxMemoryUsage, IFileSystem *pPassThru )
{
	CMasterVMPIFileSystem *pRet = new CMasterVMPIFileSystem;
	g_pBaseVMPIFileSystem = pRet;
	if ( pRet->Init( maxMemoryUsage, pPassThru ) )
	{
		return pRet;
	}
	else
	{
		delete pRet;
		g_pBaseVMPIFileSystem = NULL;
		return NULL;
	}
}


CMasterVMPIFileSystem::CMasterVMPIFileSystem()
{
	Assert( !s_pMasterVMPIFileSystem );
	s_pMasterVMPIFileSystem = this;
}


CMasterVMPIFileSystem::~CMasterVMPIFileSystem()
{
	Assert( s_pMasterVMPIFileSystem == this );
	s_pMasterVMPIFileSystem = NULL;
}


bool CMasterVMPIFileSystem::Init( int maxMemoryUsage, IFileSystem *pPassThru )
{
	// Only init the BASE filesystem passthru. Leave the IFileSystem passthru using NULL so it'll crash 
	// immediately if they try to use a function we don't support.
	InitPassThru( pPassThru, true );
	m_pMasterVMPIFileSystemPassThru = pPassThru;

	// Pick a random IP in the multicast range (224.0.0.2 to 239.255.255.255);
	CCycleCount cnt;
	cnt.Sample();
	RandomSeed( (int)cnt.GetMicroseconds() );

	int localPort = 23412; // This can be anything.

	unsigned short port = RandomInt( 22000, 25000 );
	if ( VMPI_GetRunMode() == VMPI_RUN_NETWORKED )
	{
		if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_MULTICAST )
		{
			m_MulticastIP.port = port;
			m_MulticastIP.ip[0] = (unsigned char)RandomInt( 225, 238 );
			m_MulticastIP.ip[1] = (unsigned char)RandomInt( 0, 255 );
			m_MulticastIP.ip[2] = (unsigned char)RandomInt( 0, 255 );
			m_MulticastIP.ip[3] = (unsigned char)RandomInt( 3, 255 );
		}
		else if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_BROADCAST )
		{
			m_MulticastIP.Init( 0xFF, 0xFF, 0xFF, 0xFF, port );
		}
	}
	else
	{
		// Doesn't matter.. we don't use the multicast IP in TCP mode.
		m_MulticastIP.Init( 0, 0, 0, 0, 0 );
	}

	if ( !m_MasterThread.Init( pPassThru, localPort, &m_MulticastIP, maxMemoryUsage ) )
		return false;

	// Send out the multicast addr to all the clients.
	SendMulticastIP( &m_MulticastIP );

	// Make sure we're notified when a client disconnects so we can unlink them from the 
	// multicast thread's structures.
	VMPI_AddDisconnectHandler( &CMasterVMPIFileSystem::OnClientDisconnect );
	return true;
}


void CMasterVMPIFileSystem::Term()
{
	m_MasterThread.Term();
}


FileHandle_t CMasterVMPIFileSystem::Open( const char *pFilename, const char *pOptions, const char *pPathID )
{
	Assert( g_bUseMPI );

	if ( g_bDisableFileAccess )
		Error( "Open( %s, %s ) - file access has been disabled.", pFilename, pOptions );

	// Use a stdio file if they want to write to it.
	bool bWriteAccess = IsOpeningForWriteAccess( pOptions );
	if ( bWriteAccess )
	{
		FileHandle_t fp = m_pBaseFileSystemPassThru->Open( pFilename, pOptions, pPathID );
		if ( fp == FILESYSTEM_INVALID_HANDLE )
			return FILESYSTEM_INVALID_HANDLE;

		CVMPIFile_PassThru *pFile = new CVMPIFile_PassThru;
		pFile->Init( m_pBaseFileSystemPassThru, fp );
		return (FileHandle_t)pFile;
	}

	// Internally, we require path IDs to be non-null. We'll convert it back to null whenever we make filesystem calls though.
	if ( !pPathID )
		pPathID = "";

	// Have our multicast thread load all the data so it's there when workers want it.
	int iFile = m_MasterThread.FindOrAddFile( pFilename, pPathID );
	if ( iFile == -1 )
		return FILESYSTEM_INVALID_HANDLE;

	const CUtlVector<char> &data = m_MasterThread.GetFileData( iFile );

	CVMPIFile_Memory *pFile = new CVMPIFile_Memory;
	pFile->Init( data.Base(), data.Count(), strchr( pOptions, 't' ) ? 't' : 'b' );
	return (FileHandle_t)pFile;
}


void CMasterVMPIFileSystem::OnClientDisconnect( int procID, const char *pReason )
{
	s_pMasterVMPIFileSystem->m_MasterThread.OnClientDisconnect( procID );
}


void CMasterVMPIFileSystem::CreateVirtualFile( const char *pFilename, const void *pData, int fileLength )
{
	m_MasterThread.CreateVirtualFile( pFilename, pData, fileLength );
}

bool CMasterVMPIFileSystem::HandleFileSystemPacket( MessageBuffer *pBuf, int iSource, int iPacketID )
{
	// Handle this packet.
	int subPacketID = pBuf->data[1];
	switch( subPacketID )
	{
		case VMPI_FSPACKETID_FILE_REQUEST:
		{
			int requestID = *((int*)&pBuf->data[2]);
			const char *pFilename = (const char*)&pBuf->data[6];
			const char *pPathID = (const char*)pFilename + strlen( pFilename ) + 1;
			
			if ( g_iVMPIVerboseLevel >= 2 )
				Msg( "Client %d requested '%s'\n", iSource, pFilename );

			bool bZeroLength;
			int fileID = m_MasterThread.AddFileRequest( pFilename, pPathID, iSource, &bZeroLength );
			
			// Send back the file ID.
			unsigned char cPacket[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_FILE_RESPONSE };
			void *pChunks[4] = { cPacket, &requestID, &fileID, &bZeroLength };
			int chunkLen[4] = { sizeof( cPacket ), sizeof( requestID ), sizeof( fileID ), sizeof( bZeroLength ) };

			VMPI_SendChunks( pChunks, chunkLen, ARRAYSIZE( pChunks ), iSource );
		}
		return true;

		case VMPI_FSPACKETID_CHUNK_RECEIVED:
		{
			unsigned short *pFileID = (unsigned short*)&pBuf->data[2];
			unsigned short *pChunkID = pFileID+1;

			int nChunks = (pBuf->getLen() - 2) / 4;
			for ( int i=0; i < nChunks; i++ )
			{
				m_MasterThread.OnChunkReceived( *pFileID, iSource, *pChunkID );
				pFileID += 2;
				pChunkID += 2;
			}
		}
		return true;

		case VMPI_FSPACKETID_FILE_RECEIVED:
		{
			unsigned short *pFileID = (unsigned short*)&pBuf->data[2];
			m_MasterThread.OnFileReceived( *pFileID, iSource );
		}
		return true;
		
		default:
			return false;
	}
}


CSysModule* CMasterVMPIFileSystem::LoadModule( const char *pFileName, const char *pPathID, bool bValidatedDllOnly )
{
	return m_pMasterVMPIFileSystemPassThru->LoadModule( pFileName, pPathID, bValidatedDllOnly );
}

void CMasterVMPIFileSystem::UnloadModule( CSysModule *pModule )
{
	m_pMasterVMPIFileSystemPassThru->UnloadModule( pModule );
}