From e5fdee272b497b76ad8966c409aeeddb68b600b2 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 8 Feb 2015 08:50:05 -0500 Subject: [PATCH] clean obsolete SSU data --- SSUData.cpp | 89 ++++++++++++++++++++++++++++++++++++++++++++------ SSUData.h | 16 +++++++-- SSUSession.cpp | 8 ++--- SSUSession.h | 2 +- 4 files changed, 98 insertions(+), 17 deletions(-) diff --git a/SSUData.cpp b/SSUData.cpp index 58387a07..a7ff413e 100644 --- a/SSUData.cpp +++ b/SSUData.cpp @@ -11,7 +11,8 @@ namespace i2p namespace transport { SSUData::SSUData (SSUSession& session): - m_Session (session), m_ResendTimer (session.m_Server.GetService ()) + m_Session (session), m_ResendTimer (session.GetService ()), m_DecayTimer (session.GetService ()), + m_IncompleteMessagesCleanupTimer (session.GetService ()) { m_MaxPacketSize = session.IsV6 () ? SSU_V6_MAX_PACKET_SIZE : SSU_V4_MAX_PACKET_SIZE; m_PacketSize = m_MaxPacketSize; @@ -28,9 +29,16 @@ namespace transport delete it.second; } + void SSUData::Start () + { + ScheduleIncompleteMessagesCleanup (); + } + void SSUData::Stop () { m_ResendTimer.cancel (); + m_DecayTimer.cancel (); + m_IncompleteMessagesCleanupTimer.cancel (); } void SSUData::AdjustPacketSize (const i2p::data::RouterInfo& remoteRouter) @@ -209,7 +217,9 @@ namespace transport // missing fragment LogPrint (eLogWarning, "Missing fragments from ", (int)incompleteMessage->nextFragmentNum, " to ", fragmentNum - 1, " of message ", msgID); auto savedFragment = new Fragment (fragmentNum, buf, fragmentSize, isLast); - if (!incompleteMessage->savedFragments.insert (std::unique_ptr(savedFragment)).second) + if (incompleteMessage->savedFragments.insert (std::unique_ptr(savedFragment)).second) + incompleteMessage->lastFragmentInsertTime = i2p::util::GetSecondsSinceEpoch (); + else LogPrint (eLogWarning, "Fragment ", (int)fragmentNum, " of message ", msgID, " already saved"); } isLast = false; @@ -228,7 +238,10 @@ namespace transport { if (!m_ReceivedMessages.count (msgID)) { - if (m_ReceivedMessages.size () > 100) m_ReceivedMessages.clear (); + if (m_ReceivedMessages.size () > MAX_NUM_RECEIVED_MESSAGES) + m_ReceivedMessages.clear (); + else + ScheduleDecay (); m_ReceivedMessages.insert (msgID); m_Handler.PutNextMessage (msg); } @@ -401,26 +414,82 @@ namespace transport m_ResendTimer.async_wait ([s](const boost::system::error_code& ecode) { s->m_Data.HandleResendTimer (ecode); }); } - + void SSUData::HandleResendTimer (const boost::system::error_code& ecode) { if (ecode != boost::asio::error::operation_aborted) { uint32_t ts = i2p::util::GetSecondsSinceEpoch (); - for (auto it : m_SentMessages) + for (auto it = m_SentMessages.begin (); it != m_SentMessages.end ();) { - if (ts >= it.second->nextResendTime && it.second->numResends < MAX_NUM_RESENDS) + if (ts >= it->second->nextResendTime) { - for (auto& f: it.second->fragments) - if (f) m_Session.Send (f->buf, f->len); // resend + if (it->second->numResends < MAX_NUM_RESENDS) + { + for (auto& f: it->second->fragments) + if (f) m_Session.Send (f->buf, f->len); // resend - it.second->numResends++; - it.second->nextResendTime += it.second->numResends*RESEND_INTERVAL; + it->second->numResends++; + it->second->nextResendTime += it->second->numResends*RESEND_INTERVAL; + it++; + } + else + { + LogPrint (eLogError, "SSU message has not been ACKed after ", MAX_NUM_RESENDS, " attempts. Deleted"); + delete it->second; + it = m_SentMessages.erase (it); + } } + else + it++; } ScheduleResend (); } } + + void SSUData::ScheduleDecay () + { + m_DecayTimer.cancel (); + m_DecayTimer.expires_from_now (boost::posix_time::seconds(DECAY_INTERVAL)); + auto s = m_Session.shared_from_this(); + m_ResendTimer.async_wait ([s](const boost::system::error_code& ecode) + { s->m_Data.HandleDecayTimer (ecode); }); + } + + void SSUData::HandleDecayTimer (const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + m_ReceivedMessages.clear (); + } + + void SSUData::ScheduleIncompleteMessagesCleanup () + { + m_IncompleteMessagesCleanupTimer.cancel (); + m_IncompleteMessagesCleanupTimer.expires_from_now (boost::posix_time::seconds(INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT)); + auto s = m_Session.shared_from_this(); + m_IncompleteMessagesCleanupTimer.async_wait ([s](const boost::system::error_code& ecode) + { s->m_Data.HandleIncompleteMessagesCleanupTimer (ecode); }); + } + + void SSUData::HandleIncompleteMessagesCleanupTimer (const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + uint32_t ts = i2p::util::GetSecondsSinceEpoch (); + for (auto it = m_IncomleteMessages.begin (); it != m_IncomleteMessages.end ();) + { + if (ts > it->second->lastFragmentInsertTime + INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT) + { + LogPrint (eLogError, "SSU message ", it->first, " was not completed in ", INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT, " .Deleted"); + delete it->second; + it = m_IncomleteMessages.erase (it); + } + else + it++; + } + ScheduleIncompleteMessagesCleanup (); + } + } } } diff --git a/SSUData.h b/SSUData.h index 021c5bf1..f56c81d2 100644 --- a/SSUData.h +++ b/SSUData.h @@ -26,6 +26,9 @@ namespace transport const size_t SSU_V6_MAX_PACKET_SIZE = SSU_MTU_V6 - IPV6_HEADER_SIZE - UDP_HEADER_SIZE; // 1424 const int RESEND_INTERVAL = 3; // in seconds const int MAX_NUM_RESENDS = 5; + const int DECAY_INTERVAL = 20; // in seconds + const int MAX_NUM_RECEIVED_MESSAGES = 1000; // how many msgID we store for duplicates check + const int INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT = 30; // in seconds // data flags const uint8_t DATA_FLAG_EXTENDED_DATA_INCLUDED = 0x02; const uint8_t DATA_FLAG_WANT_REPLY = 0x04; @@ -58,9 +61,10 @@ namespace transport { I2NPMessage * msg; int nextFragmentNum; + uint32_t lastFragmentInsertTime; // in seconds std::set, FragmentCmp> savedFragments; - IncompleteMessage (I2NPMessage * m): msg (m), nextFragmentNum (0) {}; + IncompleteMessage (I2NPMessage * m): msg (m), nextFragmentNum (0), lastFragmentInsertTime (0) {}; ~IncompleteMessage () { if (msg) DeleteI2NPMessage (msg); }; }; @@ -78,6 +82,8 @@ namespace transport SSUData (SSUSession& session); ~SSUData (); + + void Start (); void Stop (); void ProcessMessage (uint8_t * buf, size_t len); @@ -95,6 +101,12 @@ namespace transport void ScheduleResend (); void HandleResendTimer (const boost::system::error_code& ecode); + + void ScheduleDecay (); + void HandleDecayTimer (const boost::system::error_code& ecode); + + void ScheduleIncompleteMessagesCleanup (); + void HandleIncompleteMessagesCleanupTimer (const boost::system::error_code& ecode); void AdjustPacketSize (const i2p::data::RouterInfo& remoteRouter); @@ -104,7 +116,7 @@ namespace transport std::map m_IncomleteMessages; std::map m_SentMessages; std::set m_ReceivedMessages; - boost::asio::deadline_timer m_ResendTimer; + boost::asio::deadline_timer m_ResendTimer, m_DecayTimer, m_IncompleteMessagesCleanupTimer; int m_MaxPacketSize, m_PacketSize; i2p::I2NPMessagesHandler m_Handler; }; diff --git a/SSUSession.cpp b/SSUSession.cpp index d360baf3..ed7fa9a4 100644 --- a/SSUSession.cpp +++ b/SSUSession.cpp @@ -15,10 +15,9 @@ namespace transport { SSUSession::SSUSession (SSUServer& server, boost::asio::ip::udp::endpoint& remoteEndpoint, std::shared_ptr router, bool peerTest ): TransportSession (router), - m_Server (server), m_RemoteEndpoint (remoteEndpoint), - m_Timer (GetService ()), m_PeerTest (peerTest), - m_State (eSessionStateUnknown), m_IsSessionKey (false), m_RelayTag (0), - m_Data (*this), m_NumSentBytes (0), m_NumReceivedBytes (0) + m_Server (server), m_RemoteEndpoint (remoteEndpoint), m_Timer (GetService ()), + m_PeerTest (peerTest),m_State (eSessionStateUnknown), m_IsSessionKey (false), m_RelayTag (0), + m_NumSentBytes (0), m_NumReceivedBytes (0), m_Data (*this) { m_CreationTime = i2p::util::GetSecondsSinceEpoch (); } @@ -780,6 +779,7 @@ namespace transport delete m_DHKeysPair; m_DHKeysPair = nullptr; } + m_Data.Start (); m_Data.Send (CreateDatabaseStoreMsg ()); transports.PeerConnected (shared_from_this ()); if (m_PeerTest && (m_RemoteRouter && m_RemoteRouter->IsPeerTesting ())) diff --git a/SSUSession.h b/SSUSession.h index 0ff1a9f1..1a2690f1 100644 --- a/SSUSession.h +++ b/SSUSession.h @@ -136,9 +136,9 @@ namespace transport i2p::crypto::CBCDecryption m_SessionKeyDecryption; i2p::crypto::AESKey m_SessionKey; i2p::crypto::MACKey m_MacKey; - SSUData m_Data; size_t m_NumSentBytes, m_NumReceivedBytes; uint32_t m_CreationTime; // seconds since epoch + SSUData m_Data; };