Browse Source

eliminate send buffer mutex completely

pull/1949/merge
orignal 1 year ago
parent
commit
21259204b1
  1. 11
      libi2pd/Streaming.cpp
  2. 2
      libi2pd/Streaming.h

11
libi2pd/Streaming.cpp

@ -19,11 +19,6 @@ namespace i2p
{ {
namespace stream namespace stream
{ {
void SendBufferQueue::Add (const uint8_t * buf, size_t len, SendHandler handler)
{
Add (std::make_shared<SendBuffer>(buf, len, handler));
}
void SendBufferQueue::Add (std::shared_ptr<SendBuffer> buf) void SendBufferQueue::Add (std::shared_ptr<SendBuffer> buf)
{ {
if (buf) if (buf)
@ -115,10 +110,7 @@ namespace stream
void Stream::CleanUp () void Stream::CleanUp ()
{ {
{
std::unique_lock<std::mutex> l(m_SendBufferMutex);
m_SendBuffer.CleanUp (); m_SendBuffer.CleanUp ();
}
while (!m_ReceiveQueue.empty ()) while (!m_ReceiveQueue.empty ())
{ {
auto packet = m_ReceiveQueue.front (); auto packet = m_ReceiveQueue.front ();
@ -553,8 +545,6 @@ namespace stream
bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet
std::vector<Packet *> packets; std::vector<Packet *> packets;
{
std::unique_lock<std::mutex> l(m_SendBufferMutex);
while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.IsEmpty () && numMsgs > 0)) while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.IsEmpty () && numMsgs > 0))
{ {
Packet * p = m_LocalDestination.NewPacket (); Packet * p = m_LocalDestination.NewPacket ();
@ -638,7 +628,6 @@ namespace stream
packets.push_back (p); packets.push_back (p);
numMsgs--; numMsgs--;
} }
}
if (packets.size () > 0) if (packets.size () > 0)
{ {
if (m_SavedPackets.empty ()) // no NACKS if (m_SavedPackets.empty ()) // no NACKS

2
libi2pd/Streaming.h

@ -135,7 +135,6 @@ namespace stream
SendBufferQueue (): m_Size (0) {}; SendBufferQueue (): m_Size (0) {};
~SendBufferQueue () { CleanUp (); }; ~SendBufferQueue () { CleanUp (); };
void Add (const uint8_t * buf, size_t len, SendHandler handler);
void Add (std::shared_ptr<SendBuffer> buf); void Add (std::shared_ptr<SendBuffer> buf);
size_t Get (uint8_t * buf, size_t len); size_t Get (uint8_t * buf, size_t len);
size_t GetSize () const { return m_Size; }; size_t GetSize () const { return m_Size; };
@ -251,7 +250,6 @@ namespace stream
size_t m_NumSentBytes, m_NumReceivedBytes; size_t m_NumSentBytes, m_NumReceivedBytes;
uint16_t m_Port; uint16_t m_Port;
std::mutex m_SendBufferMutex;
SendBufferQueue m_SendBuffer; SendBufferQueue m_SendBuffer;
int m_WindowSize, m_RTT, m_RTO, m_AckDelay; int m_WindowSize, m_RTT, m_RTO, m_AckDelay;
uint64_t m_LastWindowSizeIncreaseTime; uint64_t m_LastWindowSizeIncreaseTime;

Loading…
Cancel
Save