From 04dc34260fe803baf6fdd65c94dab8b47461b1b1 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 26 Feb 2017 15:05:14 -0500 Subject: [PATCH] replaced stringstream by a list of buffers --- Streaming.cpp | 98 +++++++++++++++++++++++++++++++++------------------ Streaming.h | 50 ++++++++++++++++++++++---- 2 files changed, 107 insertions(+), 41 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index 357a3373..0045b10d 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -11,6 +11,50 @@ namespace i2p { namespace stream { + void SendBufferQueue::Add (const uint8_t * buf, size_t len, SendHandler handler) + { + m_Buffers.push_back (std::make_shared(buf, len, handler)); + m_Size += len; + } + + size_t SendBufferQueue::Get (uint8_t * buf, size_t len) + { + size_t offset = 0; + while (!m_Buffers.empty () && offset < len) + { + auto nextBuffer = m_Buffers.front (); + auto rem = nextBuffer->GetRemainingSize (); + if (offset + rem <= len) + { + // whole buffer + memcpy (buf + offset, nextBuffer->GetRemaningBuffer (), rem); + offset += rem; + m_Buffers.pop_front (); // delete it + } + else + { + // partially + rem = len - offset; + memcpy (buf + offset, nextBuffer->GetRemaningBuffer (), len - offset); + nextBuffer->offset += (len - offset); + offset = len; // break + } + } + m_Size -= offset; + return offset; + } + + void SendBufferQueue::CleanUp () + { + if (!m_Buffers.empty ()) + { + for (auto it: m_Buffers) + it->Cancel (); + m_Buffers.clear (); + m_Size = 0; + } + } + Stream::Stream (boost::asio::io_service& service, StreamingDestination& local, std::shared_ptr remote, int port): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), @@ -45,19 +89,16 @@ namespace stream m_AckSendTimer.cancel (); m_ReceiveTimer.cancel (); m_ResendTimer.cancel (); - if (m_SendHandler) - { - auto handler = m_SendHandler; - m_SendHandler = nullptr; - handler (boost::asio::error::make_error_code (boost::asio::error::operation_aborted)); - } //CleanUp (); /* Need to recheck - broke working on windows */ m_LocalDestination.DeleteStream (shared_from_this ()); } void Stream::CleanUp () { - m_SendBuffer.str (""); + { + std::unique_lock l(m_SendBufferMutex); + m_SendBuffer.CleanUp (); + } while (!m_ReceiveQueue.empty ()) { auto packet = m_ReceiveQueue.front (); @@ -321,23 +362,21 @@ namespace stream size_t Stream::Send (const uint8_t * buf, size_t len) { - if (len > 0 && buf) - { - std::unique_lock l(m_SendBufferMutex); - m_SendBuffer.clear (); - m_SendBuffer.write ((const char *)buf, len); - } - m_Service.post (std::bind (&Stream::SendBuffer, shared_from_this ())); + // TODO: check max buffer size + AsyncSend (buf, len, nullptr); return len; } void Stream::AsyncSend (const uint8_t * buf, size_t len, SendHandler handler) { - if (m_SendHandler) - handler (boost::asio::error::make_error_code (boost::asio::error::in_progress)); - else - m_SendHandler = handler; - Send (buf, len); + if (len > 0 && buf) + { + std::unique_lock l(m_SendBufferMutex); + m_SendBuffer.Add (buf, len, handler); + } + else if (handler) + handler(boost::system::error_code ()); + m_Service.post (std::bind (&Stream::SendBuffer, shared_from_this ())); } void Stream::SendBuffer () @@ -349,7 +388,7 @@ namespace stream std::vector packets; { std::unique_lock l(m_SendBufferMutex); - while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.eof () && numMsgs > 0)) + while ((m_Status == eStreamStatusNew) || (IsEstablished () && !m_SendBuffer.IsEmpty () && numMsgs > 0)) { Packet * p = m_LocalDestination.NewPacket (); uint8_t * packet = p->GetBuffer (); @@ -390,8 +429,7 @@ namespace stream uint8_t * signature = packet + size; // set it later memset (signature, 0, signatureLen); // zeroes for now size += signatureLen; // signature - m_SendBuffer.read ((char *)(packet + size), STREAMING_MTU - size); - size += m_SendBuffer.gcount (); // payload + size += m_SendBuffer.Get (packet + size, STREAMING_MTU - size); // payload m_LocalDestination.GetOwner ()->Sign (packet, size, signature); } else @@ -401,22 +439,12 @@ namespace stream size += 2; // flags htobuf16 (packet + size, 0); // no options size += 2; // options size - m_SendBuffer.read((char *)(packet + size), STREAMING_MTU - size); - size += m_SendBuffer.gcount (); // payload + size += m_SendBuffer.Get(packet + size, STREAMING_MTU - size); // payload } p->len = size; packets.push_back (p); numMsgs--; } - if (m_SendBuffer.eof ()) - { - m_SendBuffer.str(""); // clean up buffer - if (m_SendHandler) - { - m_SendHandler (boost::system::error_code ()); - m_SendHandler = nullptr; - } - } } if (packets.size () > 0) { @@ -433,7 +461,7 @@ namespace stream m_SentPackets.insert (it); } SendPackets (packets); - if (m_Status == eStreamStatusClosing && m_SendBuffer.eof ()) + if (m_Status == eStreamStatusClosing && m_SendBuffer.IsEmpty ()) SendClose (); if (isEmpty) ScheduleResend (); @@ -525,7 +553,7 @@ namespace stream Terminate (); break; case eStreamStatusClosing: - if (m_SentPackets.empty () && m_SendBuffer.eof ()) // nothing to send + if (m_SentPackets.empty () && m_SendBuffer.IsEmpty ()) // nothing to send { m_Status = eStreamStatusClosed; SendClose (); diff --git a/Streaming.h b/Streaming.h index bfea74ed..be8f3f1a 100644 --- a/Streaming.h +++ b/Streaming.h @@ -3,7 +3,6 @@ #include #include -#include #include #include #include @@ -104,6 +103,48 @@ namespace stream }; }; + typedef std::function SendHandler; + struct SendBuffer + { + uint8_t * buf; + size_t len, offset; + SendHandler handler; + + SendBuffer (const uint8_t * b, size_t l, SendHandler h): + len(l), offset (0), handler(h) + { + buf = new uint8_t[len]; + memcpy (buf, b, len); + } + ~SendBuffer () + { + delete[] buf; + if (handler) handler(boost::system::error_code ()); + } + size_t GetRemainingSize () const { return len - offset; }; + const uint8_t * GetRemaningBuffer () const { return buf + offset; }; + void Cancel () { if (handler) handler (boost::asio::error::make_error_code (boost::asio::error::operation_aborted)); handler = nullptr; }; + }; + + class SendBufferQueue + { + public: + + SendBufferQueue (): m_Size (0) {}; + ~SendBufferQueue () { CleanUp (); }; + + void Add (const uint8_t * buf, size_t len, SendHandler handler); + size_t Get (uint8_t * buf, size_t len); + size_t GetSize () const { return m_Size; }; + bool IsEmpty () const { return m_Buffers.empty (); }; + void CleanUp (); + + private: + + std::list > m_Buffers; + size_t m_Size; + }; + enum StreamStatus { eStreamStatusNew = 0, @@ -118,8 +159,6 @@ namespace stream { public: - typedef std::function SendHandler; - Stream (boost::asio::io_service& service, StreamingDestination& local, std::shared_ptr remote, int port = 0); // outgoing Stream (boost::asio::io_service& service, StreamingDestination& local); // incoming @@ -149,7 +188,7 @@ namespace stream size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; }; size_t GetSendQueueSize () const { return m_SentPackets.size (); }; size_t GetReceiveQueueSize () const { return m_ReceiveQueue.size (); }; - size_t GetSendBufferSize () const { return m_SendBuffer.rdbuf ()->in_avail (); }; + size_t GetSendBufferSize () const { return m_SendBuffer.GetSize (); }; int GetWindowSize () const { return m_WindowSize; }; int GetRTT () const { return m_RTT; }; @@ -202,11 +241,10 @@ namespace stream uint16_t m_Port; std::mutex m_SendBufferMutex; - std::stringstream m_SendBuffer; + SendBufferQueue m_SendBuffer; int m_WindowSize, m_RTT, m_RTO; uint64_t m_LastWindowSizeIncreaseTime; int m_NumResendAttempts; - SendHandler m_SendHandler; }; class StreamingDestination: public std::enable_shared_from_this