diff --git a/I2NPProtocol.cpp b/I2NPProtocol.cpp index 51e566e7..c15ff95a 100644 --- a/I2NPProtocol.cpp +++ b/I2NPProtocol.cpp @@ -579,4 +579,32 @@ namespace i2p } } } + + I2NPMessagesHandler::~I2NPMessagesHandler () + { + Flush (); + } + + void I2NPMessagesHandler::PutNextMessage (I2NPMessage * msg) + { + if (msg) + { + if (msg->GetTypeID () == eI2NPTunnelData) + { + LogPrint ("TunnelData"); + m_TunnelMsgs.push_back (msg); + } + else + HandleI2NPMessage (msg); + } + } + + void I2NPMessagesHandler::Flush () + { + if (!m_TunnelMsgs.empty ()) + { + i2p::tunnel::tunnels.PostTunnelData (m_TunnelMsgs); + m_TunnelMsgs.clear (); + } + } } diff --git a/I2NPProtocol.h b/I2NPProtocol.h index 8f8dddd3..f5509cbd 100644 --- a/I2NPProtocol.h +++ b/I2NPProtocol.h @@ -216,6 +216,19 @@ namespace tunnel size_t GetI2NPMessageLength (const uint8_t * msg); void HandleI2NPMessage (uint8_t * msg, size_t len); void HandleI2NPMessage (I2NPMessage * msg); + + class I2NPMessagesHandler + { + public: + + ~I2NPMessagesHandler (); + void PutNextMessage (I2NPMessage * msg); + void Flush (); + + private: + + std::vector m_TunnelMsgs; + }; } #endif diff --git a/NTCPSession.cpp b/NTCPSession.cpp index 0a0a52fa..df1e9bc9 100644 --- a/NTCPSession.cpp +++ b/NTCPSession.cpp @@ -481,6 +481,7 @@ namespace transport } if (m_ReceiveBufferOffset > 0) memcpy (m_ReceiveBuffer, nextBlock, m_ReceiveBufferOffset); + m_Handler.Flush (); } ScheduleTermination (); // reset termination timer @@ -529,7 +530,7 @@ namespace transport if (m_NextMessageOffset >= m_NextMessage->len + 4) // +checksum { // we have a complete I2NP message - i2p::HandleI2NPMessage (m_NextMessage); + m_Handler.PutNextMessage (m_NextMessage); m_NextMessage = nullptr; } return true; diff --git a/NTCPSession.h b/NTCPSession.h index 6098a596..bf2cb17e 100644 --- a/NTCPSession.h +++ b/NTCPSession.h @@ -42,7 +42,7 @@ namespace transport #pragma pack() const size_t NTCP_MAX_MESSAGE_SIZE = 16384; - const size_t NTCP_BUFFER_SIZE = 1040; // fits one tunnel message (1028) + const size_t NTCP_BUFFER_SIZE = 4160; // fits 4 tunnel messages (4*1028) const int NTCP_TERMINATION_TIMEOUT = 120; // 2 minutes const size_t NTCP_DEFAULT_PHASE3_SIZE = 2/*size*/ + i2p::data::DEFAULT_IDENTITY_SIZE/*387*/ + 4/*ts*/ + 15/*padding*/ + 40/*signature*/; // 448 @@ -131,7 +131,8 @@ namespace transport i2p::I2NPMessage * m_NextMessage; size_t m_NextMessageOffset; - + i2p::I2NPMessagesHandler m_Handler; + size_t m_NumSentBytes, m_NumReceivedBytes; }; diff --git a/Queue.h b/Queue.h index f98d7178..2749da6a 100644 --- a/Queue.h +++ b/Queue.h @@ -2,6 +2,7 @@ #define QUEUE_H__ #include +#include #include #include #include @@ -23,6 +24,17 @@ namespace util m_NonEmpty.notify_one (); } + void Put (const std::vector& vec) + { + if (!vec.empty ()) + { + std::unique_lock l(m_QueueMutex); + for (auto it: vec) + m_Queue.push (it); + m_NonEmpty.notify_one (); + } + } + Element * GetNext () { std::unique_lock l(m_QueueMutex); diff --git a/Tunnel.cpp b/Tunnel.cpp index 5eb461ff..8ce376a4 100644 --- a/Tunnel.cpp +++ b/Tunnel.cpp @@ -576,6 +576,11 @@ namespace tunnel if (msg) m_Queue.Put (msg); } + void Tunnels::PostTunnelData (const std::vector& msgs) + { + m_Queue.Put (msgs); + } + template TTunnel * Tunnels::CreateTunnel (TunnelConfig * config, OutboundTunnel * outboundTunnel) { diff --git a/Tunnel.h b/Tunnel.h index 91d6330a..4925df18 100644 --- a/Tunnel.h +++ b/Tunnel.h @@ -130,6 +130,7 @@ namespace tunnel void AddOutboundTunnel (OutboundTunnel * newTunnel); void AddInboundTunnel (InboundTunnel * newTunnel); void PostTunnelData (I2NPMessage * msg); + void PostTunnelData (const std::vector& msgs); template TTunnel * CreateTunnel (TunnelConfig * config, OutboundTunnel * outboundTunnel = 0); std::shared_ptr CreateTunnelPool (i2p::garlic::GarlicDestination * localDestination, int numInboundHops, int numOuboundHops);