From 746f53ba0792d786e3038bc9f466a8bc7d0e9198 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 29 Nov 2020 14:59:34 -0500 Subject: [PATCH] use SendBufferQueue for queued messages from I2P --- libi2pd/Streaming.cpp | 12 ++++- libi2pd/Streaming.h | 6 +++ libi2pd_client/I2CP.cpp | 113 +++++++++++++++++++--------------------- libi2pd_client/I2CP.h | 11 ++-- 4 files changed, 75 insertions(+), 67 deletions(-) diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index 80e8aecf..05b34d9e 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -21,10 +21,18 @@ namespace stream { void SendBufferQueue::Add (const uint8_t * buf, size_t len, SendHandler handler) { - m_Buffers.push_back (std::make_shared(buf, len, handler)); - m_Size += len; + Add (std::make_shared(buf, len, handler)); } + void SendBufferQueue::Add (std::shared_ptr buf) + { + if (buf) + { + m_Buffers.push_back (buf); + m_Size += buf->len; + } + } + size_t SendBufferQueue::Get (uint8_t * buf, size_t len) { size_t offset = 0; diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index e8b8db91..ab3c4439 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -111,6 +111,11 @@ namespace stream buf = new uint8_t[len]; memcpy (buf, b, len); } + SendBuffer (size_t l): // creat empty buffer + len(l), offset (0) + { + buf = new uint8_t[len]; + } ~SendBuffer () { delete[] buf; @@ -129,6 +134,7 @@ namespace stream ~SendBufferQueue () { CleanUp (); }; void Add (const uint8_t * buf, size_t len, SendHandler handler); + void Add (std::shared_ptr buf); size_t Get (uint8_t * buf, size_t len); size_t GetSize () const { return m_Size; }; bool IsEmpty () const { return m_Buffers.empty (); }; diff --git a/libi2pd_client/I2CP.cpp b/libi2pd_client/I2CP.cpp index 82ee7841..cb618b5d 100644 --- a/libi2pd_client/I2CP.cpp +++ b/libi2pd_client/I2CP.cpp @@ -360,13 +360,8 @@ namespace client m_Socket->close (); m_Socket = nullptr; } - if (m_SendQueue) - { - for (auto& it: *m_SendQueue) - delete[] boost::asio::buffer_cast(it); - m_SendQueue->clear (); - m_SendQueue = nullptr; - } + if (!m_SendQueue.IsEmpty ()) + m_SendQueue.CleanUp (); if (m_SessionID != 0xFFFF) { m_Owner.RemoveSession (GetSessionID ()); @@ -383,11 +378,32 @@ namespace client LogPrint (eLogError, "I2CP: Message to send is too long ", l); return; } - uint8_t * buf = m_IsSending ? new uint8_t[l] : m_SendBuffer; + auto sendBuf = m_IsSending ? std::make_shared (l) : nullptr; + uint8_t * buf = sendBuf ? sendBuf->buf : m_SendBuffer; htobe32buf (buf + I2CP_HEADER_LENGTH_OFFSET, len); buf[I2CP_HEADER_TYPE_OFFSET] = type; memcpy (buf + I2CP_HEADER_SIZE, payload, len); - SendBuffer (buf, l); + if (sendBuf) + { + if (m_SendQueue.GetSize () < I2CP_MAX_SEND_QUEUE_SIZE) + m_SendQueue.Add (sendBuf); + else + { + LogPrint (eLogWarning, "I2CP: send queue size exceeds ", I2CP_MAX_SEND_QUEUE_SIZE); + return; + } + } + else + { + auto socket = m_Socket; + if (socket) + { + m_IsSending = true; + boost::asio::async_write (*socket, boost::asio::buffer (m_SendBuffer, l), + boost::asio::transfer_all (), std::bind(&I2CPSession::HandleI2CPMessageSent, + shared_from_this (), std::placeholders::_1, std::placeholders::_2)); + } + } } void I2CPSession::HandleI2CPMessageSent (const boost::system::error_code& ecode, std::size_t bytes_transferred) @@ -397,62 +413,22 @@ namespace client if (ecode != boost::asio::error::operation_aborted) Terminate (); } - else if (m_SendQueue) + else if (!m_SendQueue.IsEmpty ()) { auto socket = m_Socket; if (socket) { - auto queue = m_SendQueue; - m_SendQueue = nullptr; - boost::asio::async_write (*socket, *queue, boost::asio::transfer_all (), - std::bind(&I2CPSession::HandleI2CPMessageSentQueue, shared_from_this (), - std::placeholders::_1, std::placeholders::_2, queue)); + auto len = m_SendQueue.Get (m_SendBuffer, I2CP_MAX_MESSAGE_LENGTH); + boost::asio::async_write (*socket, boost::asio::buffer (m_SendBuffer, len), + boost::asio::transfer_all (),std::bind(&I2CPSession::HandleI2CPMessageSent, + shared_from_this (), std::placeholders::_1, std::placeholders::_2)); } + else + m_IsSending = false; } else m_IsSending = false; } - - void I2CPSession::HandleI2CPMessageSentQueue (const boost::system::error_code& ecode, std::size_t bytes_transferred, SendQueue queue) - { - for (auto& it: *queue) - delete[] boost::asio::buffer_cast(it); - queue->clear (); - - HandleI2CPMessageSent (ecode, bytes_transferred); - } - - void I2CPSession::SendBuffer (uint8_t * buf, size_t len) - { - auto socket = m_Socket; - if (socket) - { - if (m_IsSending) - { - auto sendQueue = m_SendQueue; - if (!sendQueue) - { - sendQueue = std::make_shared (); - m_SendQueue = sendQueue; - } - else if (sendQueue->size () > I2CP_MAX_SEND_QUEUE_SIZE) - { - LogPrint (eLogError, "I2CP: Queue size exceeds ", I2CP_MAX_SEND_QUEUE_SIZE); - return; - } - sendQueue->push_back ({buf, len}); - } - else - { - m_IsSending = true; - boost::asio::async_write (*socket, boost::asio::buffer (buf, len), boost::asio::transfer_all (), - std::bind(&I2CPSession::HandleI2CPMessageSent, shared_from_this (), - std::placeholders::_1, std::placeholders::_2)); - } - } - else - LogPrint (eLogError, "I2CP: Can't write to the socket"); - } std::string I2CPSession::ExtractString (const uint8_t * buf, size_t len) { @@ -885,14 +861,35 @@ namespace client LogPrint (eLogError, "I2CP: Message to send is too long ", l); return; } - uint8_t * buf = m_IsSending ? new uint8_t[l] : m_SendBuffer; + auto sendBuf = m_IsSending ? std::make_shared (l) : nullptr; + uint8_t * buf = sendBuf ? sendBuf->buf : m_SendBuffer; htobe32buf (buf + I2CP_HEADER_LENGTH_OFFSET, len + 10); buf[I2CP_HEADER_TYPE_OFFSET] = I2CP_MESSAGE_PAYLOAD_MESSAGE; htobe16buf (buf + I2CP_HEADER_SIZE, m_SessionID); htobe32buf (buf + I2CP_HEADER_SIZE + 2, m_MessageID++); htobe32buf (buf + I2CP_HEADER_SIZE + 6, len); memcpy (buf + I2CP_HEADER_SIZE + 10, payload, len); - SendBuffer (buf, l); + if (sendBuf) + { + if (m_SendQueue.GetSize () < I2CP_MAX_SEND_QUEUE_SIZE) + m_SendQueue.Add (sendBuf); + else + { + LogPrint (eLogWarning, "I2CP: send queue size exceeds ", I2CP_MAX_SEND_QUEUE_SIZE); + return; + } + } + else + { + auto socket = m_Socket; + if (socket) + { + m_IsSending = true; + boost::asio::async_write (*socket, boost::asio::buffer (m_SendBuffer, l), + boost::asio::transfer_all (), std::bind(&I2CPSession::HandleI2CPMessageSent, + shared_from_this (), std::placeholders::_1, std::placeholders::_2)); + } + } } I2CPServer::I2CPServer (const std::string& interface, int port, bool isSingleThread): diff --git a/libi2pd_client/I2CP.h b/libi2pd_client/I2CP.h index 460b8638..32f32221 100644 --- a/libi2pd_client/I2CP.h +++ b/libi2pd_client/I2CP.h @@ -17,6 +17,7 @@ #include #include "util.h" #include "Destination.h" +#include "Streaming.h" namespace i2p { @@ -25,7 +26,7 @@ namespace client const uint8_t I2CP_PROTOCOL_BYTE = 0x2A; const size_t I2CP_SESSION_BUFFER_SIZE = 4096; const size_t I2CP_MAX_MESSAGE_LENGTH = 65535; - const size_t I2CP_MAX_SEND_QUEUE_SIZE = 256; + const size_t I2CP_MAX_SEND_QUEUE_SIZE = 1024*1024; // in bytes, 1M const size_t I2CP_HEADER_LENGTH_OFFSET = 0; const size_t I2CP_HEADER_TYPE_OFFSET = I2CP_HEADER_LENGTH_OFFSET + 4; @@ -125,8 +126,6 @@ namespace client class I2CPServer; class I2CPSession: public std::enable_shared_from_this { - typedef std::shared_ptr > SendQueue; - public: #ifdef ANDROID @@ -171,14 +170,12 @@ namespace client void HandleReceivedPayload (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleMessage (); void Terminate (); - void SendBuffer (uint8_t * buf, size_t len); void HandleI2CPMessageSent (const boost::system::error_code& ecode, std::size_t bytes_transferred); - void HandleI2CPMessageSentQueue (const boost::system::error_code& ecode, std::size_t bytes_transferred, SendQueue queue); + std::string ExtractString (const uint8_t * buf, size_t len); size_t PutString (uint8_t * buf, size_t len, const std::string& str); void ExtractMapping (const uint8_t * buf, size_t len, std::map& mapping); - void SendSessionStatusMessage (uint8_t status); void SendHostReplyMessage (uint32_t requestID, std::shared_ptr identity); @@ -197,7 +194,7 @@ namespace client // to client bool m_IsSending; uint8_t m_SendBuffer[I2CP_MAX_MESSAGE_LENGTH]; - SendQueue m_SendQueue; + i2p::stream::SendBufferQueue m_SendQueue; }; typedef void (I2CPSession::*I2CPMessageHandler)(const uint8_t * buf, size_t len);