Modified Source engine (2017), developed by Valve and leaked in 2020. Not for commercial purposes.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

456 lines
14 KiB

5 years ago
//========= Copyright Valve Corporation, All rights reserved. ============//
//
//=======================================================================================//
#include "sv_sessionblockpublisher.h"
#include "sv_replaycontext.h"
#include "demofile/demoformat.h"
#include "sv_recordingsession.h"
#include "sv_recordingsessionblock.h"
#include "sv_sessioninfopublisher.h"
// memdbgon must be the last include file in a .cpp file!!!
#include "tier0/memdbgon.h"
//----------------------------------------------------------------------------------------
// Builds a publisher bound to the given recording session and session-info publisher.
// Neither pointer is checked here; both are stored as-is.
CSessionBlockPublisher::CSessionBlockPublisher( CServerRecordingSession *pSession,
												CSessionInfoPublisher *pSessionInfoPublisher )
:	m_pSession( pSession ),
	m_pSessionInfoPublisher( pSessionInfoPublisher )
{
	// Latch the dump interval once, clamped to the server minimum.  It must not change
	// during a round - doing so would require an update on all clients.
	extern ConVar replay_block_dump_interval;
	const int nRequestedInterval = replay_block_dump_interval.GetInt();
	m_nDumpInterval = MAX( MIN_SERVER_DUMP_INTERVAL, nRequestedInterval );

	// Start the clock now; PublishThink() writes the first block once a full dump
	// interval has elapsed since this timestamp.
	m_flLastBlockWriteTime = g_pEngine->GetHostTime();
}
// Destructor - performs no cleanup of its own.
CSessionBlockPublisher::~CSessionBlockPublisher()
{
}
// Blocks the caller, pumping Think() until the list of publishing blocks is empty.
void CSessionBlockPublisher::PublishAllSynchronous()
{
	for ( ;; )
	{
		if ( IsDone() )
			break;

		Think();
	}
}
void CSessionBlockPublisher::AbortPublish()
{
FOR_EACH_LL( m_lstPublishingBlocks, it )
{
CServerRecordingSessionBlock *pCurBlock = m_lstPublishingBlocks[ it ];
IFilePublisher *&pPublisher = pCurBlock->m_pPublisher; // Shorthand
if ( !pPublisher )
continue;
// Already done?
if ( pPublisher->IsDone() )
continue;
pPublisher->AbortAndCleanup();
}
// Remove all blocks
m_lstPublishingBlocks.RemoveAll();
}
// Recording-start hook.  Intentionally a no-op in this class.
void CSessionBlockPublisher::OnStartRecording()
{
}
// Recording-stop hook.
//   bAborting - when true, nothing is written; otherwise one last session block is
//               written and published.
void CSessionBlockPublisher::OnStopRecord( bool bAborting )
{
	if ( bAborting )
		return;

	// Flush whatever is left as the final session block.
	WriteAndPublishSessionBlock();
}
// Returns the handle of the recording session this publisher was constructed with.
ReplayHandle_t CSessionBlockPublisher::GetSessionHandle() const
{
	const ReplayHandle_t hOwningSession = m_pSession->GetHandle();
	return hOwningSession;
}
void CSessionBlockPublisher::WriteAndPublishSessionBlock()
{
// Make sure there is data to write
uint8 *pSessionBuffer;
int nSessionBufferSize;
g_pEngine->GetSessionRecordBuffer( &pSessionBuffer, &nSessionBufferSize ); // This will get called the last client disconnects from the server - but in waiting for players state we won't have a demo buffer
if ( !pSessionBuffer || nSessionBufferSize == 0 )
return;
// Create a new block
CServerRecordingSessionBlock *pNewBlock = SV_CastBlock( SV_GetRecordingSessionBlockManager()->CreateAndGenerateHandle() );
if ( !pNewBlock )
{
Warning( "Failed to create replay \"%s\"\n", pNewBlock->m_szFullFilename );
delete pNewBlock;
return;
}
if ( m_pSession->m_nServerStartRecordTick < 0 )
{
Warning( "Error: Current recording start tick was not properly setup. Aborting block write.\n" );
delete pNewBlock;
return;
}
// Figure out what the current block is
const int iCurrentSessionBlock = m_pSession->GetNumBlocks();
// Add an entry to the server index with the "writing" status set
const char *pFullFilename = Replay_va(
"%s%s_part_%u.%s", SV_GetTmpDir(),
SV_GetRecordingSessionManager()->GetCurrentSessionName(), iCurrentSessionBlock, BLOCK_FILE_EXTENSION
);
V_strcpy( pNewBlock->m_szFullFilename, pFullFilename );
pNewBlock->m_nWriteStatus = CServerRecordingSessionBlock::WRITESTATUS_INVALID; // Must be set here to trigger write
pNewBlock->m_nRemoteStatus = CBaseRecordingSessionBlock::STATUS_WRITING;
pNewBlock->m_iReconstruction = iCurrentSessionBlock;
pNewBlock->m_hSession = m_pSession->GetHandle();
// Match the session's lock - the block will be unlocked once recording has stopped and all publishing is complete.
pNewBlock->SetLocked( true );
// Commit the replay to the history manager's list
SV_GetRecordingSessionBlockManager()->Add( pNewBlock );
// Also store a pointer to the block in the session - NOTE: session will not attempt to free this pointer
m_pSession->AddBlock( pNewBlock, false );
// Cache the block temporarily while the binary block itself writes to disk - NOTE: will not attempt to free
m_lstPublishingBlocks.AddToTail( pNewBlock );
// Write the block now
PublishBlock( pNewBlock ); // pNewBlock->m_nWriteStatus modified here
IF_REPLAY_DBG( Warning( "%f: (%i) Publishing new block, %s\n", g_pEngine->GetHostTime(), iCurrentSessionBlock, pNewBlock->GetFilename() ) );
}
// Copies the session record buffer into a freshly allocated buffer, preceded by space
// for a demo header when pBlock is the session's first block.
//   pSessionBuffer / nSessionBufferSize - source bytes to copy
//   pBlock          - block being written; m_iReconstruction == 0 marks the first block
//   ppSafeBlockData - [out] receives the new[]-allocated copy (caller takes the pointer)
//   pBlockSize      - [out] receives the total size of that copy, header included
void CSessionBlockPublisher::GatherBlockData( uint8 *pSessionBuffer, int nSessionBufferSize, CServerRecordingSessionBlock *pBlock, unsigned char **ppSafeBlockData, int *pBlockSize )
{
	demoheader_t *pDemoHeader = NULL;
	int nPayloadOffset = 0;

	// Only the first block reserves room for a header; later blocks are payload only.
	if ( pBlock->m_iReconstruction == 0 )
	{
		pDemoHeader = g_pEngine->GetReplayDemoHeader();
		nPayloadOffset = sizeof( demoheader_t );
	}

	const int nOutputSize = nSessionBufferSize + nPayloadOffset;

	// Allocate the output buffer: [optional header][payload]
	unsigned char *pOutput = new unsigned char[ nOutputSize ];

	// The header bytes are only filled in when the engine actually supplied a header.
	if ( pDemoHeader )
	{
		demoheader_t headerCopy = *pDemoHeader;
		headerCopy.playback_time = FLT_MAX;
		headerCopy.playback_ticks = INT_MAX;
		headerCopy.playback_frames = INT_MAX;

		// Byteswap the copy before it goes into the output buffer
		ByteSwap_demoheader_t( headerCopy );

		V_memcpy( pOutput, &headerCopy, sizeof( headerCopy ) );
	}

	// The payload lands after the header slot.  pOutput was sized as payload PLUS the
	// offset, so this copy stays inside the allocation.
	V_memcpy( pOutput + nPayloadOffset, pSessionBuffer, nSessionBufferSize );

	// Hand results back through the "out" parameters
	*ppSafeBlockData = pOutput;
	*pBlockSize = nOutputSize;
}
// Snapshots the engine's session record buffer for pBlock, resets the engine's record
// buffer, and starts publishing the snapshot via SV_PublishFile().
//   pBlock - block to publish.  On return its m_nWriteStatus is WRITESTATUS_WORKING with
//            m_pPublisher set, or WRITESTATUS_FAILED if the record buffer could not be
//            obtained.
// Completion is reported back through this object (params.m_pCallbackHandler = this).
void CSessionBlockPublisher::PublishBlock( CServerRecordingSessionBlock *pBlock )
{
	uint8 *pSessionBuffer = NULL;
	int nSessionBufferSize = 0;
	if ( !g_pEngine->GetSessionRecordBuffer( &pSessionBuffer, &nSessionBufferSize ) )
	{
		Warning( "Block publish failed!\n" );

		// Mark the block as failed.  Leaving it at WRITESTATUS_INVALID would strand it in
		// m_lstPublishingBlocks: PublishThink() only unlinks blocks whose status is success
		// or failure, so IsDone() would never become true and PublishAllSynchronous()
		// would never return.
		pBlock->m_nWriteStatus = CServerRecordingSessionBlock::WRITESTATUS_FAILED;
		return;
	}

	unsigned char *pSafeBlockData = NULL;
	int nBlockSize = 0;
	GatherBlockData( pSessionBuffer, nSessionBufferSize, pBlock, &pSafeBlockData, &nBlockSize );

	// We've got what we need and can reset the put ptr
	g_pEngine->ResetReplayRecordBuffer();

	AssertMsg( !pBlock->m_pPublisher, "No publisher should exist for this block yet!" );

	// Set status to working
	pBlock->m_nWriteStatus = CServerRecordingSessionBlock::WRITESTATUS_WORKING;

	// Record the (uncompressed) number of bytes being written
	pBlock->m_uFileSize = nBlockSize;

	// Make sure the main thread doesn't unload the block while it's being published
	pBlock->SetLocked( true );

	// Asynchronously publish to fileserver
	PublishFileParams_t params;
	params.m_pOutFilename = pBlock->m_szFullFilename;
	params.m_pSrcData = pSafeBlockData;
	params.m_nSrcSize = nBlockSize;
	params.m_pCallbackHandler = this;
	params.m_nCompressorType = COMPRESSORTYPE_BZ2;
	params.m_bHash = true;
	params.m_bFreeSrcData = true;	// The copy made by GatherBlockData() is handed off to the publisher
	params.m_bDeleteFile = false;
	params.m_pUserData = pBlock;	// Comes back as pUserData in OnPublishComplete()
	pBlock->m_pPublisher = SV_PublishFile( params );
}
void CSessionBlockPublisher::OnPublishComplete( const IFilePublisher *pPublisher, void *pUserData )
{
CServerRecordingSessionBlock *pBlock = (CServerRecordingSessionBlock *)pUserData;
Assert( pBlock );
// Set block status
if ( pPublisher->GetStatus() == IFilePublisher::PUBLISHSTATUS_OK )
{
pBlock->m_nWriteStatus = CServerRecordingSessionBlock::WRITESTATUS_SUCCESS;
}
else
{
pBlock->m_nWriteStatus = CServerRecordingSessionBlock::WRITESTATUS_FAILED;
// Publish failed - handle as needed
g_pServerReplayContext->OnPublishFailed();
}
// Did the block compress OK?
if ( pPublisher->Compressed() )
{
// Cache compressor type
pBlock->m_nCompressorType = pPublisher->GetCompressorType();
const int nCompressedSize = pPublisher->GetCompressedSize();
const float flRatio = (float)pBlock->m_uFileSize / nCompressedSize;
IF_REPLAY_DBG( Warning( "Block compression ratio: %.3f:1\n", flRatio ) );
// Update size
pBlock->m_uUncompressedSize = pBlock->m_uFileSize;
pBlock->m_uFileSize = nCompressedSize;
}
// Get the MD5
if ( pPublisher->Hashed() )
{
pPublisher->GetHash( pBlock->m_aHash );
}
// Now that m_nWriteStatus has been set in the block, the session info will be updated
// accordingly the next time PublishThink() is run.
// Mark the block as dirty since it was modified
Assert( pBlock->m_nWriteStatus != CServerRecordingSessionBlock::WRITESTATUS_INVALID );
SV_GetRecordingSessionBlockManager()->FlagForFlush( pBlock, false );
IF_REPLAY_DBG( Warning( "Publish complete for block %s\n", pBlock->GetDebugName() ) );
}
void CSessionBlockPublisher::OnPublishAborted( const IFilePublisher *pPublisher )
{
CServerRecordingSessionBlock *pBlock = FindBlockFromPublisher( pPublisher );
// Update the block's status
if ( pBlock )
{
pBlock->m_nWriteStatus = CServerRecordingSessionBlock::WRITESTATUS_FAILED;
}
g_pServerReplayContext->OnPublishFailed();
}
// Searches the publishing list for the block whose m_pPublisher is pPublisher.
// Returns that block, or NULL (after asserting) when no block matches.
CServerRecordingSessionBlock *CSessionBlockPublisher::FindBlockFromPublisher( const IFilePublisher *pPublisher )
{
	for ( int iNode = m_lstPublishingBlocks.Head();
		  iNode != m_lstPublishingBlocks.InvalidIndex();
		  iNode = m_lstPublishingBlocks.Next( iNode ) )
	{
		CServerRecordingSessionBlock *pCandidate = m_lstPublishingBlocks[ iNode ];
		if ( pCandidate->m_pPublisher != pPublisher )
			continue;

		return pCandidate;
	}

	AssertMsg( 0, "Could not find block with the given publisher!" );
	return NULL;
}
// Per-frame entry point: runs one PublishThink() pass under the replay VPROF budget.
void CSessionBlockPublisher::Think()
{
	// NOTE: This is deliberately invoked even when replay is disabled.
	VPROF_BUDGET( "CSessionBlockPublisher::Think", VPROF_BUDGETGROUP_REPLAY );

	PublishThink();
}
// One pass of the publishing state machine.  Runs three phases, in this order:
//   1. Service each block's publisher: free it if done, otherwise let it think.
//   2. If the session is recording and the dump interval has elapsed, write and
//      publish a new session block.
//   3. Walk the publishing list, resolve blocks whose write status is success or
//      failure (update remote status, flag for flush, unlink from the list), and
//      publish the session info if any block was resolved.
void CSessionBlockPublisher::PublishThink()
{
AssertMsg( m_pSession->IsLocked(), "The session isn't locked, which means blocks can be being deleted and will probably cause a crash." );
// Go through all currently publishing blocks and free/think
FOR_EACH_LL( m_lstPublishingBlocks, it )
{
CServerRecordingSessionBlock *pCurBlock = m_lstPublishingBlocks[ it ];
// Reference, not a copy: assigning NULL below clears the block's own m_pPublisher
IFilePublisher *&pPublisher = pCurBlock->m_pPublisher; // Shorthand
if ( !pPublisher )
continue;
// If the publisher's done, free it
if ( pPublisher->IsDone() )
{
delete pPublisher;
pPublisher = NULL;
}
else
{
// Let the publisher think
pPublisher->Think();
}
}
// Write a new session block out right now?
// A last-write time of exactly 0 disables periodic writes; otherwise a block is written
// once m_nDumpInterval has elapsed, and only while the session is recording.
float flHostTime = g_pEngine->GetHostTime();
if ( m_flLastBlockWriteTime != 0.0f &&
flHostTime - m_flLastBlockWriteTime >= m_nDumpInterval &&
m_pSession->m_bRecording )
{
Assert( m_nDumpInterval > 0 );
// Write it
WriteAndPublishSessionBlock();
// Update the time
m_flLastBlockWriteTime = flHostTime;
}
// Check status of any replays that are being written
bool bUpdateSessionInfo = false;
// Manual iteration (no increment in the for-header): the iterator is advanced inside the
// body before the current node is possibly unlinked.
for( int it = m_lstPublishingBlocks.Head(); it != m_lstPublishingBlocks.InvalidIndex(); )
{
CServerRecordingSessionBlock *pCurBlock = m_lstPublishingBlocks[ it ];
// Updated when write status is set to success or failure
int nPendingRequestStatus = CBaseRecordingSessionBlock::STATUS_INVALID;
// If set to anything besides InvalidIndex(), it will be removed from the list
int itRemove = m_lstPublishingBlocks.InvalidIndex();
bool bWriteBlockInfoToDisk = false;
switch ( pCurBlock->m_nWriteStatus )
{
case CServerRecordingSessionBlock::WRITESTATUS_INVALID:
// Not expected here; the block is left in the list untouched
AssertMsg( 0, "Why is m_nWriteStatus WRITESTATUS_INVALID here?" );
break;
case CServerRecordingSessionBlock::WRITESTATUS_WORKING: // Do nothing if still writing
break;
case CServerRecordingSessionBlock::WRITESTATUS_SUCCESS:
IF_REPLAY_DBG2( Warning( " Block %i marked as succeeded.\n", pCurBlock->m_iReconstruction ) );
pCurBlock->m_nRemoteStatus = CBaseRecordingSessionBlock::STATUS_READYFORDOWNLOAD;
nPendingRequestStatus = pCurBlock->m_nRemoteStatus;
bWriteBlockInfoToDisk = true;
itRemove = it;
break;
case CServerRecordingSessionBlock::WRITESTATUS_FAILED:
default: // Error?
// Any unrecognized status is handled the same way as an explicit failure
IF_REPLAY_DBG2( Warning( " Block %i marked as failed.\n", pCurBlock->m_iReconstruction ) );
pCurBlock->m_nRemoteStatus = CBaseRecordingSessionBlock::STATUS_ERROR;
pCurBlock->m_nHttpError = CBaseRecordingSessionBlock::ERROR_WRITEFAILED;
nPendingRequestStatus = pCurBlock->m_nRemoteStatus;
bWriteBlockInfoToDisk = true;
itRemove = it;
// TODO: Retry
}
if ( bWriteBlockInfoToDisk )
{
// Flag the block for flush with the block manager so its updated status is saved
Assert( pCurBlock->m_nWriteStatus != CServerRecordingSessionBlock::WRITESTATUS_INVALID );
SV_GetRecordingSessionBlockManager()->FlagForFlush( pCurBlock, false );
}
// Sanity check: every block in this list must belong to this publisher's session
Assert( pCurBlock->m_hSession == m_pSession->GetHandle() );
// Refresh session info file
if ( nPendingRequestStatus != CBaseRecordingSessionBlock::STATUS_INVALID )
{
// Update it after this loop
bUpdateSessionInfo = true;
}
// Update iterator - must happen before the Unlink() below, while 'it' is still linked
it = m_lstPublishingBlocks.Next( it );
// Remove?
if ( itRemove != m_lstPublishingBlocks.InvalidIndex() )
{
IF_REPLAY_DBG( Warning( "Removing block %i from publisher\n", pCurBlock->m_iReconstruction ) );
// Free/clear publisher (may already be NULL if the first loop above freed it)
delete pCurBlock->m_pPublisher;
pCurBlock->m_pPublisher = NULL;
// Removes from the list but doesn't free, since any pointer here points to a block somewhere
m_lstPublishingBlocks.Unlink( itRemove );
}
}
// Publish session info file now if it isn't already publishing
if ( bUpdateSessionInfo )
{
m_pSessionInfoPublisher->Publish();
}
}
// True when no blocks remain in the publishing list.
bool CSessionBlockPublisher::IsDone() const
{
	const bool bNothingPending = ( m_lstPublishingBlocks.Count() == 0 );
	return bNothingPending;
}
#ifdef _DEBUG
void CSessionBlockPublisher::Validate()
{
FOR_EACH_LL( m_lstPublishingBlocks, i )
{
CServerRecordingSessionBlock *pCurBlock = m_lstPublishingBlocks[ i ];
Assert( pCurBlock->m_nRemoteStatus == CBaseRecordingSessionBlock::STATUS_READYFORDOWNLOAD );
}
}
#endif
//----------------------------------------------------------------------------------------