diff --git a/libi2pd/SSU.cpp b/libi2pd/SSU.cpp index 86e76b8e..7b129811 100644 --- a/libi2pd/SSU.cpp +++ b/libi2pd/SSU.cpp @@ -952,6 +952,8 @@ namespace transport session->Failed (); }); } + else + it.second->CleanUp (); ScheduleTermination (); } } @@ -980,6 +982,8 @@ namespace transport session->Failed (); }); } + else + it.second->CleanUp (); ScheduleTerminationV6 (); } } diff --git a/libi2pd/SSUData.cpp b/libi2pd/SSUData.cpp index 7d048799..0468956b 100644 --- a/libi2pd/SSUData.cpp +++ b/libi2pd/SSUData.cpp @@ -33,7 +33,6 @@ namespace transport SSUData::SSUData (SSUSession& session): m_Session (session), m_ResendTimer (session.GetService ()), - m_IncompleteMessagesCleanupTimer (session.GetService ()), m_MaxPacketSize (session.IsV6 () ? SSU_V6_MAX_PACKET_SIZE : SSU_V4_MAX_PACKET_SIZE), m_PacketSize (m_MaxPacketSize), m_LastMessageReceivedTime (0) { @@ -45,13 +44,11 @@ namespace transport void SSUData::Start () { - ScheduleIncompleteMessagesCleanup (); } void SSUData::Stop () { m_ResendTimer.cancel (); - m_IncompleteMessagesCleanupTimer.cancel (); m_IncompleteMessages.clear (); m_SentMessages.clear (); m_ReceivedMessages.clear (); @@ -487,48 +484,34 @@ namespace transport } } - void SSUData::ScheduleIncompleteMessagesCleanup () + void SSUData::CleanUp () { - m_IncompleteMessagesCleanupTimer.cancel (); - m_IncompleteMessagesCleanupTimer.expires_from_now (boost::posix_time::seconds(INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT)); - auto s = m_Session.shared_from_this(); - m_IncompleteMessagesCleanupTimer.async_wait ([s](const boost::system::error_code& ecode) - { s->m_Data.HandleIncompleteMessagesCleanupTimer (ecode); }); - } - - void SSUData::HandleIncompleteMessagesCleanupTimer (const boost::system::error_code& ecode) - { - if (ecode != boost::asio::error::operation_aborted) + uint32_t ts = i2p::util::GetSecondsSinceEpoch (); + for (auto it = m_IncompleteMessages.begin (); it != m_IncompleteMessages.end ();) { - uint32_t ts = i2p::util::GetSecondsSinceEpoch (); - for (auto it = m_IncompleteMessages.begin (); it != m_IncompleteMessages.end ();) + if (ts > it->second->lastFragmentInsertTime + INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT) { - if (ts > it->second->lastFragmentInsertTime + INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT) - { - LogPrint (eLogWarning, "SSU: message ", it->first, " was not completed in ", INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT, " seconds, deleted"); - it = m_IncompleteMessages.erase (it); - } - else - ++it; + LogPrint (eLogWarning, "SSU: message ", it->first, " was not completed in ", INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT, " seconds, deleted"); + it = m_IncompleteMessages.erase (it); } - - if (m_ReceivedMessages.size () > MAX_NUM_RECEIVED_MESSAGES || ts > m_LastMessageReceivedTime + DECAY_INTERVAL) - // decay - m_ReceivedMessages.clear (); else + ++it; + } + + if (m_ReceivedMessages.size () > MAX_NUM_RECEIVED_MESSAGES || ts > m_LastMessageReceivedTime + DECAY_INTERVAL) + // decay + m_ReceivedMessages.clear (); + else + { + // delete old received messages + for (auto it = m_ReceivedMessages.begin (); it != m_ReceivedMessages.end ();) { - // delete old received messages - for (auto it = m_ReceivedMessages.begin (); it != m_ReceivedMessages.end ();) - { - if (ts > it->second + RECEIVED_MESSAGES_CLEANUP_TIMEOUT) - it = m_ReceivedMessages.erase (it); - else - ++it; - } + if (ts > it->second + RECEIVED_MESSAGES_CLEANUP_TIMEOUT) + it = m_ReceivedMessages.erase (it); + else + ++it; } - - ScheduleIncompleteMessagesCleanup (); - } - } + } + } } } diff --git a/libi2pd/SSUData.h b/libi2pd/SSUData.h index 3dcff1d4..590a538c 100644 --- a/libi2pd/SSUData.h +++ b/libi2pd/SSUData.h @@ -13,7 +13,6 @@ #include #include #include -#include #include #include #include "I2NPProtocol.h" @@ -101,7 +100,8 @@ namespace transport void Start (); void Stop (); - + void CleanUp (); + void ProcessMessage (uint8_t * buf, size_t len); void FlushReceivedMessage (); void Send (std::shared_ptr msg); @@ -120,17 +120,13 @@ namespace transport void ScheduleResend (); void HandleResendTimer (const boost::system::error_code& ecode); - void ScheduleIncompleteMessagesCleanup (); - void HandleIncompleteMessagesCleanupTimer (const boost::system::error_code& ecode); - - private: SSUSession& m_Session; std::unordered_map > m_IncompleteMessages; std::unordered_map > m_SentMessages; std::unordered_map m_ReceivedMessages; // msgID -> timestamp in seconds - boost::asio::deadline_timer m_ResendTimer, m_IncompleteMessagesCleanupTimer; + boost::asio::deadline_timer m_ResendTimer; int m_MaxPacketSize, m_PacketSize; i2p::I2NPMessagesHandler m_Handler; uint32_t m_LastMessageReceivedTime; // in second diff --git a/libi2pd/SSUSession.cpp b/libi2pd/SSUSession.cpp index 59ee578b..e01b0c0e 100644 --- a/libi2pd/SSUSession.cpp +++ b/libi2pd/SSUSession.cpp @@ -1004,6 +1004,12 @@ namespace transport } } + void SSUSession::CleanUp () + { + m_Data.CleanUp (); + // TODO: clean up m_RelayRequests + } + void SSUSession::ProcessPeerTest (const uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint) { uint32_t nonce = bufbe32toh (buf); // 4 bytes diff --git a/libi2pd/SSUSession.h b/libi2pd/SSUSession.h index 462afc35..acf232b9 100644 --- a/libi2pd/SSUSession.h +++ b/libi2pd/SSUSession.h @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2020, The PurpleI2P Project +* Copyright (c) 2013-2021, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -106,6 +106,7 @@ namespace transport void SetCreationTime (uint32_t ts) { m_CreationTime = ts; }; // for introducers void FlushData (); + void CleanUp (); private: