You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
550 lines
15 KiB
550 lines
15 KiB
// network.cpp - written and placed in the public domain by Wei Dai |
|
|
|
#include "pch.h" |
|
#include "network.h" |
|
#include "wait.h" |
|
|
|
#define CRYPTOPP_TRACE_NETWORK 0 |
|
|
|
NAMESPACE_BEGIN(CryptoPP) |
|
|
|
#ifdef HIGHRES_TIMER_AVAILABLE |
|
|
|
lword LimitedBandwidth::ComputeCurrentTransceiveLimit() |
|
{ |
|
if (!m_maxBytesPerSecond) |
|
return ULONG_MAX; |
|
|
|
double curTime = GetCurTimeAndCleanUp(); |
|
lword total = 0; |
|
for (OpQueue::size_type i=0; i!=m_ops.size(); ++i) |
|
total += m_ops[i].second; |
|
return SaturatingSubtract(m_maxBytesPerSecond, total); |
|
} |
|
|
|
double LimitedBandwidth::TimeToNextTransceive() |
|
{ |
|
if (!m_maxBytesPerSecond) |
|
return 0; |
|
|
|
if (!m_nextTransceiveTime) |
|
ComputeNextTransceiveTime(); |
|
|
|
return SaturatingSubtract(m_nextTransceiveTime, m_timer.ElapsedTimeAsDouble()); |
|
} |
|
|
|
void LimitedBandwidth::NoteTransceive(lword size) |
|
{ |
|
if (m_maxBytesPerSecond) |
|
{ |
|
double curTime = GetCurTimeAndCleanUp(); |
|
m_ops.push_back(std::make_pair(curTime, size)); |
|
m_nextTransceiveTime = 0; |
|
} |
|
} |
|
|
|
void LimitedBandwidth::ComputeNextTransceiveTime() |
|
{ |
|
double curTime = GetCurTimeAndCleanUp(); |
|
lword total = 0; |
|
for (unsigned int i=0; i!=m_ops.size(); ++i) |
|
total += m_ops[i].second; |
|
m_nextTransceiveTime = |
|
(total < m_maxBytesPerSecond) ? curTime : m_ops.front().first + 1000; |
|
} |
|
|
|
double LimitedBandwidth::GetCurTimeAndCleanUp() |
|
{ |
|
if (!m_maxBytesPerSecond) |
|
return 0; |
|
|
|
double curTime = m_timer.ElapsedTimeAsDouble(); |
|
while (m_ops.size() && (m_ops.front().first + 1000 < curTime)) |
|
m_ops.pop_front(); |
|
return curTime; |
|
} |
|
|
|
void LimitedBandwidth::GetWaitObjects(WaitObjectContainer &container, const CallStack &callStack) |
|
{ |
|
double nextTransceiveTime = TimeToNextTransceive(); |
|
if (nextTransceiveTime) |
|
container.ScheduleEvent(nextTransceiveTime, CallStack("LimitedBandwidth::GetWaitObjects()", &callStack)); |
|
} |
|
|
|
// ************************************************************* |
|
|
|
size_t NonblockingSource::GeneralPump2( |
|
lword& byteCount, bool blockingOutput, |
|
unsigned long maxTime, bool checkDelimiter, byte delimiter) |
|
{ |
|
m_blockedBySpeedLimit = false; |
|
|
|
if (!GetMaxBytesPerSecond()) |
|
{ |
|
size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter); |
|
m_doPumpBlocked = (ret != 0); |
|
return ret; |
|
} |
|
|
|
bool forever = (maxTime == INFINITE_TIME); |
|
unsigned long timeToGo = maxTime; |
|
Timer timer(Timer::MILLISECONDS, forever); |
|
lword maxSize = byteCount; |
|
byteCount = 0; |
|
|
|
timer.StartTimer(); |
|
|
|
while (true) |
|
{ |
|
lword curMaxSize = UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount); |
|
|
|
if (curMaxSize || m_doPumpBlocked) |
|
{ |
|
if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime()); |
|
size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter); |
|
m_doPumpBlocked = (ret != 0); |
|
if (curMaxSize) |
|
{ |
|
NoteTransceive(curMaxSize); |
|
byteCount += curMaxSize; |
|
} |
|
if (ret) |
|
return ret; |
|
} |
|
|
|
if (maxSize != ULONG_MAX && byteCount >= maxSize) |
|
break; |
|
|
|
if (!forever) |
|
{ |
|
timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime()); |
|
if (!timeToGo) |
|
break; |
|
} |
|
|
|
double waitTime = TimeToNextTransceive(); |
|
if (!forever && waitTime > timeToGo) |
|
{ |
|
m_blockedBySpeedLimit = true; |
|
break; |
|
} |
|
|
|
WaitObjectContainer container; |
|
LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSource::GeneralPump2() - speed limit", 0)); |
|
container.Wait((unsigned long)waitTime); |
|
} |
|
|
|
return 0; |
|
} |
|
|
|
size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking) |
|
{ |
|
if (messageCount == 0) |
|
return 0; |
|
|
|
messageCount = 0; |
|
|
|
lword byteCount; |
|
do { |
|
byteCount = LWORD_MAX; |
|
RETURN_IF_NONZERO(Pump2(byteCount, blocking)); |
|
} while(byteCount == LWORD_MAX); |
|
|
|
if (!m_messageEndSent && SourceExhausted()) |
|
{ |
|
RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(), true)); |
|
m_messageEndSent = true; |
|
messageCount = 1; |
|
} |
|
return 0; |
|
} |
|
|
|
lword NonblockingSink::TimedFlush(unsigned long maxTime, size_t targetSize) |
|
{ |
|
m_blockedBySpeedLimit = false; |
|
|
|
size_t curBufSize = GetCurrentBufferSize(); |
|
if (curBufSize <= targetSize && (targetSize || !EofPending())) |
|
return 0; |
|
|
|
if (!GetMaxBytesPerSecond()) |
|
return DoFlush(maxTime, targetSize); |
|
|
|
bool forever = (maxTime == INFINITE_TIME); |
|
unsigned long timeToGo = maxTime; |
|
Timer timer(Timer::MILLISECONDS, forever); |
|
lword totalFlushed = 0; |
|
|
|
timer.StartTimer(); |
|
|
|
while (true) |
|
{ |
|
size_t flushSize = UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit()); |
|
if (flushSize || EofPending()) |
|
{ |
|
if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime()); |
|
size_t ret = (size_t)DoFlush(timeToGo, curBufSize - flushSize); |
|
if (ret) |
|
{ |
|
NoteTransceive(ret); |
|
curBufSize -= ret; |
|
totalFlushed += ret; |
|
} |
|
} |
|
|
|
if (curBufSize <= targetSize && (targetSize || !EofPending())) |
|
break; |
|
|
|
if (!forever) |
|
{ |
|
timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime()); |
|
if (!timeToGo) |
|
break; |
|
} |
|
|
|
double waitTime = TimeToNextTransceive(); |
|
if (!forever && waitTime > timeToGo) |
|
{ |
|
m_blockedBySpeedLimit = true; |
|
break; |
|
} |
|
|
|
WaitObjectContainer container; |
|
LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSink::TimedFlush() - speed limit", 0)); |
|
container.Wait((unsigned long)waitTime); |
|
} |
|
|
|
return totalFlushed; |
|
} |
|
|
|
bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking) |
|
{ |
|
TimedFlush(blocking ? INFINITE_TIME : 0); |
|
return hardFlush && (!!GetCurrentBufferSize() || EofPending()); |
|
} |
|
|
|
// ************************************************************* |
|
|
|
NetworkSource::NetworkSource(BufferedTransformation *attachment) |
|
: NonblockingSource(attachment), m_buf(1024*16) |
|
, m_waitingForResult(false), m_outputBlocked(false) |
|
, m_dataBegin(0), m_dataEnd(0) |
|
{ |
|
} |
|
|
|
unsigned int NetworkSource::GetMaxWaitObjectCount() const |
|
{ |
|
return LimitedBandwidth::GetMaxWaitObjectCount() |
|
+ GetReceiver().GetMaxWaitObjectCount() |
|
+ AttachedTransformation()->GetMaxWaitObjectCount(); |
|
} |
|
|
|
void NetworkSource::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack) |
|
{ |
|
if (BlockedBySpeedLimit()) |
|
LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - speed limit", &callStack)); |
|
else if (!m_outputBlocked) |
|
{ |
|
if (m_dataBegin == m_dataEnd) |
|
AccessReceiver().GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - no data", &callStack)); |
|
else |
|
container.SetNoWait(CallStack("NetworkSource::GetWaitObjects() - have data", &callStack)); |
|
} |
|
|
|
AttachedTransformation()->GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - attachment", &callStack)); |
|
} |
|
|
|
size_t NetworkSource::DoPump(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter) |
|
{ |
|
NetworkReceiver &receiver = AccessReceiver(); |
|
|
|
lword maxSize = byteCount; |
|
byteCount = 0; |
|
bool forever = maxTime == INFINITE_TIME; |
|
Timer timer(Timer::MILLISECONDS, forever); |
|
BufferedTransformation *t = AttachedTransformation(); |
|
|
|
if (m_outputBlocked) |
|
goto DoOutput; |
|
|
|
while (true) |
|
{ |
|
if (m_dataBegin == m_dataEnd) |
|
{ |
|
if (receiver.EofReceived()) |
|
break; |
|
|
|
if (m_waitingForResult) |
|
{ |
|
if (receiver.MustWaitForResult() && |
|
!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()), |
|
CallStack("NetworkSource::DoPump() - wait receive result", 0))) |
|
break; |
|
|
|
unsigned int recvResult = receiver.GetReceiveResult(); |
|
#if CRYPTOPP_TRACE_NETWORK |
|
OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str()); |
|
#endif |
|
m_dataEnd += recvResult; |
|
m_waitingForResult = false; |
|
|
|
if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size()) |
|
goto ReceiveNoWait; |
|
} |
|
else |
|
{ |
|
m_dataEnd = m_dataBegin = 0; |
|
|
|
if (receiver.MustWaitToReceive()) |
|
{ |
|
if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()), |
|
CallStack("NetworkSource::DoPump() - wait receive", 0))) |
|
break; |
|
|
|
receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd); |
|
m_waitingForResult = true; |
|
} |
|
else |
|
{ |
|
ReceiveNoWait: |
|
m_waitingForResult = true; |
|
// call Receive repeatedly as long as data is immediately available, |
|
// because some receivers tend to return data in small pieces |
|
#if CRYPTOPP_TRACE_NETWORK |
|
OutputDebugString((IntToString((unsigned int)this) + ": Receiving " + IntToString(m_buf.size()-m_dataEnd) + " bytes\n").c_str()); |
|
#endif |
|
while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd)) |
|
{ |
|
unsigned int recvResult = receiver.GetReceiveResult(); |
|
#if CRYPTOPP_TRACE_NETWORK |
|
OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str()); |
|
#endif |
|
m_dataEnd += recvResult; |
|
if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2) |
|
{ |
|
m_waitingForResult = false; |
|
break; |
|
} |
|
} |
|
} |
|
} |
|
} |
|
else |
|
{ |
|
m_putSize = UnsignedMin(m_dataEnd - m_dataBegin, maxSize - byteCount); |
|
|
|
if (checkDelimiter) |
|
m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin); |
|
|
|
DoOutput: |
|
size_t result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput); |
|
if (result) |
|
{ |
|
if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()), |
|
CallStack("NetworkSource::DoPump() - wait attachment", 0))) |
|
goto DoOutput; |
|
else |
|
{ |
|
m_outputBlocked = true; |
|
return result; |
|
} |
|
} |
|
m_outputBlocked = false; |
|
|
|
byteCount += m_putSize; |
|
m_dataBegin += m_putSize; |
|
if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter) |
|
break; |
|
if (maxSize != ULONG_MAX && byteCount == maxSize) |
|
break; |
|
// once time limit is reached, return even if there is more data waiting |
|
// but make 0 a special case so caller can request a large amount of data to be |
|
// pumped as long as it is immediately available |
|
if (maxTime > 0 && timer.ElapsedTime() > maxTime) |
|
break; |
|
} |
|
} |
|
|
|
return 0; |
|
} |
|
|
|
// ************************************************************* |
|
|
|
NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound) |
|
: m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound) |
|
, m_needSendResult(false), m_wasBlocked(false), m_eofState(EOF_NONE) |
|
, m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0) |
|
, m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0) |
|
, m_currentSpeed(0), m_maxObservedSpeed(0) |
|
{ |
|
} |
|
|
|
float NetworkSink::ComputeCurrentSpeed() |
|
{ |
|
if (m_speedTimer.ElapsedTime() > 1000) |
|
{ |
|
m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime(); |
|
m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f); |
|
m_byteCountSinceLastTimerReset = 0; |
|
m_speedTimer.StartTimer(); |
|
// OutputDebugString(("max speed: " + IntToString((int)m_maxObservedSpeed) + " current speed: " + IntToString((int)m_currentSpeed) + "\n").c_str()); |
|
} |
|
return m_currentSpeed; |
|
} |
|
|
|
float NetworkSink::GetMaxObservedSpeed() const |
|
{ |
|
lword m = GetMaxBytesPerSecond(); |
|
return m ? STDMIN(m_maxObservedSpeed, float(CRYPTOPP_VC6_INT64 m)) : m_maxObservedSpeed; |
|
} |
|
|
|
unsigned int NetworkSink::GetMaxWaitObjectCount() const |
|
{ |
|
return LimitedBandwidth::GetMaxWaitObjectCount() + GetSender().GetMaxWaitObjectCount(); |
|
} |
|
|
|
void NetworkSink::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack) |
|
{ |
|
if (BlockedBySpeedLimit()) |
|
LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - speed limit", &callStack)); |
|
else if (m_wasBlocked) |
|
AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - was blocked", &callStack)); |
|
else if (!m_buffer.IsEmpty()) |
|
AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - buffer not empty", &callStack)); |
|
else if (EofPending()) |
|
AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - EOF pending", &callStack)); |
|
} |
|
|
|
size_t NetworkSink::Put2(const byte *inString, size_t length, int messageEnd, bool blocking) |
|
{ |
|
if (m_eofState == EOF_DONE) |
|
{ |
|
if (length || messageEnd) |
|
throw Exception(Exception::OTHER_ERROR, "NetworkSink::Put2() being called after EOF had been sent"); |
|
|
|
return 0; |
|
} |
|
|
|
if (m_eofState > EOF_NONE) |
|
goto EofSite; |
|
|
|
{ |
|
if (m_skipBytes) |
|
{ |
|
assert(length >= m_skipBytes); |
|
inString += m_skipBytes; |
|
length -= m_skipBytes; |
|
} |
|
|
|
m_buffer.Put(inString, length); |
|
|
|
if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound) |
|
TimedFlush(0, 0); |
|
|
|
size_t targetSize = messageEnd ? 0 : m_maxBufferSize; |
|
if (blocking) |
|
TimedFlush(INFINITE_TIME, targetSize); |
|
|
|
if (m_buffer.CurrentSize() > targetSize) |
|
{ |
|
assert(!blocking); |
|
m_wasBlocked = true; |
|
m_skipBytes += length; |
|
size_t blockedBytes = UnsignedMin(length, m_buffer.CurrentSize() - targetSize); |
|
return STDMAX<size_t>(blockedBytes, 1); |
|
} |
|
|
|
m_wasBlocked = false; |
|
m_skipBytes = 0; |
|
} |
|
|
|
if (messageEnd) |
|
{ |
|
m_eofState = EOF_PENDING_SEND; |
|
|
|
EofSite: |
|
TimedFlush(blocking ? INFINITE_TIME : 0, 0); |
|
if (m_eofState != EOF_DONE) |
|
return 1; |
|
} |
|
|
|
return 0; |
|
} |
|
|
|
lword NetworkSink::DoFlush(unsigned long maxTime, size_t targetSize) |
|
{ |
|
NetworkSender &sender = AccessSender(); |
|
|
|
bool forever = maxTime == INFINITE_TIME; |
|
Timer timer(Timer::MILLISECONDS, forever); |
|
unsigned int totalFlushSize = 0; |
|
|
|
while (true) |
|
{ |
|
if (m_buffer.CurrentSize() <= targetSize) |
|
break; |
|
|
|
if (m_needSendResult) |
|
{ |
|
if (sender.MustWaitForResult() && |
|
!sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()), |
|
CallStack("NetworkSink::DoFlush() - wait send result", 0))) |
|
break; |
|
|
|
unsigned int sendResult = sender.GetSendResult(); |
|
#if CRYPTOPP_TRACE_NETWORK |
|
OutputDebugString((IntToString((unsigned int)this) + ": Sent " + IntToString(sendResult) + " bytes\n").c_str()); |
|
#endif |
|
m_buffer.Skip(sendResult); |
|
totalFlushSize += sendResult; |
|
m_needSendResult = false; |
|
|
|
if (!m_buffer.AnyRetrievable()) |
|
break; |
|
} |
|
|
|
unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0; |
|
if (sender.MustWaitToSend() && !sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait send", 0))) |
|
break; |
|
|
|
size_t contiguousSize = 0; |
|
const byte *block = m_buffer.Spy(contiguousSize); |
|
|
|
#if CRYPTOPP_TRACE_NETWORK |
|
OutputDebugString((IntToString((unsigned int)this) + ": Sending " + IntToString(contiguousSize) + " bytes\n").c_str()); |
|
#endif |
|
sender.Send(block, contiguousSize); |
|
m_needSendResult = true; |
|
|
|
if (maxTime > 0 && timeOut == 0) |
|
break; // once time limit is reached, return even if there is more data waiting |
|
} |
|
|
|
m_byteCountSinceLastTimerReset += totalFlushSize; |
|
ComputeCurrentSpeed(); |
|
|
|
if (m_buffer.IsEmpty() && !m_needSendResult) |
|
{ |
|
if (m_eofState == EOF_PENDING_SEND) |
|
{ |
|
sender.SendEof(); |
|
m_eofState = sender.MustWaitForEof() ? EOF_PENDING_DELIVERY : EOF_DONE; |
|
} |
|
|
|
while (m_eofState == EOF_PENDING_DELIVERY) |
|
{ |
|
unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0; |
|
if (!sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait EOF", 0))) |
|
break; |
|
|
|
if (sender.EofSent()) |
|
m_eofState = EOF_DONE; |
|
} |
|
} |
|
|
|
return totalFlushSize; |
|
} |
|
|
|
#endif // #ifdef HIGHRES_TIMER_AVAILABLE |
|
|
|
NAMESPACE_END
|
|
|