From e3190a4ca9c931a65b6286211fda0ff23c8dc22a Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 15 Feb 2015 14:17:55 -0500 Subject: [PATCH] SSU data receive batching --- SSU.cpp | 2 ++ SSUData.cpp | 7 +++++-- SSUData.h | 1 + SSUSession.cpp | 11 ++++++++++- SSUSession.h | 3 +++ 5 files changed, 21 insertions(+), 3 deletions(-) diff --git a/SSU.cpp b/SSU.cpp index f5c6287d..20c86a81 100644 --- a/SSU.cpp +++ b/SSU.cpp @@ -218,6 +218,7 @@ namespace transport auto packet = it1; if (!session || session->GetRemoteEndpoint () != packet->from) // we received packet for other session than previous { + if (session) session->FlushData (); auto it = m_Sessions.find (packet->from); if (it != m_Sessions.end ()) session = it->second; @@ -235,6 +236,7 @@ namespace transport session->ProcessNextMessage (packet->buf, packet->len, packet->from); delete packet; } + if (session) session->FlushData (); } std::shared_ptr SSUServer::FindSession (std::shared_ptr router) const diff --git a/SSUData.cpp b/SSUData.cpp index f59a172d..c34fb690 100644 --- a/SSUData.cpp +++ b/SSUData.cpp @@ -258,10 +258,13 @@ namespace transport SendFragmentAck (msgID, fragmentNum); buf += fragmentSize; } - if (numFragments > 0) - m_Handler.Flush (); } + void SSUData::FlushReceivedMessage () + { + m_Handler.Flush (); + } + void SSUData::ProcessMessage (uint8_t * buf, size_t len) { //uint8_t * start = buf; diff --git a/SSUData.h b/SSUData.h index b7d1a9e2..8419f501 100644 --- a/SSUData.h +++ b/SSUData.h @@ -87,6 +87,7 @@ namespace transport void Stop (); void ProcessMessage (uint8_t * buf, size_t len); + void FlushReceivedMessage (); void Send (i2p::I2NPMessage * msg); void UpdatePacketSize (const i2p::data::IdentHash& remoteIdent); diff --git a/SSUSession.cpp b/SSUSession.cpp index 7b81903c..a6fc2b52 100644 --- a/SSUSession.cpp +++ b/SSUSession.cpp @@ -17,7 +17,7 @@ namespace transport std::shared_ptr router, bool peerTest ): TransportSession (router), m_Server (server), m_RemoteEndpoint (remoteEndpoint), m_Timer (GetService ()), m_PeerTest (peerTest),m_State (eSessionStateUnknown), m_IsSessionKey (false), m_RelayTag (0), - m_NumSentBytes (0), m_NumReceivedBytes (0), m_Data (*this) + m_NumSentBytes (0), m_NumReceivedBytes (0), m_Data (*this), m_IsDataReceived (false) { m_CreationTime = i2p::util::GetSecondsSinceEpoch (); } @@ -866,8 +866,17 @@ namespace transport void SSUSession::ProcessData (uint8_t * buf, size_t len) { m_Data.ProcessMessage (buf, len); + m_IsDataReceived = true; } + void SSUSession::FlushData () + { + if (m_IsDataReceived) + { + m_Data.FlushReceivedMessage (); + m_IsDataReceived = false; + } + } void SSUSession::ProcessPeerTest (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint) { diff --git a/SSUSession.h b/SSUSession.h index 1a2690f1..e8ca5a7f 100644 --- a/SSUSession.h +++ b/SSUSession.h @@ -79,6 +79,8 @@ namespace transport uint32_t GetRelayTag () const { return m_RelayTag; }; uint32_t GetCreationTime () const { return m_CreationTime; }; + void FlushData (); + private: boost::asio::io_service& GetService (); @@ -139,6 +141,7 @@ namespace transport size_t m_NumSentBytes, m_NumReceivedBytes; uint32_t m_CreationTime; // seconds since epoch SSUData m_Data; + bool m_IsDataReceived; };