From e1027ffb7b2ca580de68b0b8804bbcbad28e7931 Mon Sep 17 00:00:00 2001 From: orignal Date: Fri, 11 Apr 2014 21:13:52 -0400 Subject: [PATCH] replace ReceiveQueue to std::queue --- Streaming.cpp | 35 ++++++++++++++++------------------- Streaming.h | 6 +++--- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index f8c63378..cb0cc285 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -28,8 +28,12 @@ namespace stream Stream::~Stream () { m_ReceiveTimer.cancel (); - while (auto packet = m_ReceiveQueue.Get ()) + while (!m_ReceiveQueue.empty ()) + { + auto packet = m_ReceiveQueue.front (); + m_ReceiveQueue.pop (); delete packet; + } for (auto it: m_SavedPackets) delete it; } @@ -118,7 +122,7 @@ namespace stream packet->offset = packet->GetPayload () - packet->buf; if (packet->GetLength () > 0) { - m_ReceiveQueue.Put (packet); + m_ReceiveQueue.push (packet); m_ReceiveTimer.cancel (); } else @@ -131,7 +135,6 @@ namespace stream LogPrint ("Closed"); SendQuickAck (); // send ack for close explicitly? m_IsOpen = false; - m_ReceiveQueue.WakeUp (); } } @@ -239,30 +242,24 @@ namespace stream if (SendPacket (packet, size)) LogPrint ("FIN sent"); - m_ReceiveQueue.WakeUp (); } } size_t Stream::ConcatenatePackets (uint8_t * buf, size_t len) { size_t pos = 0; - while (pos < len) + while (pos < len && !m_ReceiveQueue.empty ()) { - Packet * packet = m_ReceiveQueue.Peek (); - if (packet) + Packet * packet = m_ReceiveQueue.front (); + size_t l = std::min (packet->GetLength (), len - pos); + memcpy (buf + pos, packet->GetBuffer (), l); + pos += l; + packet->offset += l; + if (!packet->GetLength ()) { - size_t l = std::min (packet->GetLength (), len - pos); - memcpy (buf + pos, packet->GetBuffer (), l); - pos += l; - packet->offset += l; - if (!packet->GetLength ()) - { - m_ReceiveQueue.Get (); - delete packet; - } - } - else // no more data available - break; + m_ReceiveQueue.pop (); + delete packet; + } } return pos; } diff --git a/Streaming.h b/Streaming.h index 4d597345..3a0e5114 100644 --- a/Streaming.h +++ b/Streaming.h @@ -5,12 +5,12 @@ #include #include #include +#include #include #include #include #include #include "I2PEndian.h" -#include "Queue.h" #include "Identity.h" #include "LeaseSet.h" #include "I2NPProtocol.h" @@ -112,7 +112,7 @@ namespace stream StreamingDestination * m_LocalDestination; const i2p::data::LeaseSet& m_RemoteLeaseSet; i2p::data::Lease m_CurrentRemoteLease; - i2p::util::Queue m_ReceiveQueue; + std::queue m_ReceiveQueue; std::set m_SavedPackets; i2p::tunnel::OutboundTunnel * m_OutboundTunnel; boost::asio::deadline_timer m_ReceiveTimer; @@ -204,7 +204,7 @@ namespace stream template void Stream::AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout) { - if (!m_ReceiveQueue.IsEmpty ()) + if (!m_ReceiveQueue.empty ()) { size_t received = ConcatenatePackets (boost::asio::buffer_cast(buffer), boost::asio::buffer_size(buffer)); if (received)