Browse Source

cleanup received messages list by timestamp

pull/1696/head
orignal 3 years ago
parent
commit
31bdce1f1f
  1. 19
      libi2pd/SSUData.cpp
  2. 3
      libi2pd/SSUData.h

19
libi2pd/SSUData.cpp

@ -246,8 +246,8 @@ namespace transport
{ {
if (!m_ReceivedMessages.count (msgID)) if (!m_ReceivedMessages.count (msgID))
{ {
m_ReceivedMessages.insert (msgID);
m_LastMessageReceivedTime = i2p::util::GetSecondsSinceEpoch (); m_LastMessageReceivedTime = i2p::util::GetSecondsSinceEpoch ();
m_ReceivedMessages.emplace (msgID, m_LastMessageReceivedTime);
if (!msg->IsExpired ()) if (!msg->IsExpired ())
{ {
m_Handler.PutNextMessage (msg); m_Handler.PutNextMessage (msg);
@ -511,10 +511,21 @@ namespace transport
else else
++it; ++it;
} }
// decay
if (m_ReceivedMessages.size () > MAX_NUM_RECEIVED_MESSAGES || if (m_ReceivedMessages.size () > MAX_NUM_RECEIVED_MESSAGES || ts > m_LastMessageReceivedTime + DECAY_INTERVAL)
i2p::util::GetSecondsSinceEpoch () > m_LastMessageReceivedTime + DECAY_INTERVAL) // decay
m_ReceivedMessages.clear (); m_ReceivedMessages.clear ();
else
{
// delete old received messages
for (auto it = m_ReceivedMessages.begin (); it != m_ReceivedMessages.end ();)
{
if (ts > it->second + RECEIVED_MESSAGES_CLEANUP_TIMEOUT)
it = m_ReceivedMessages.erase (it);
else
++it;
}
}
ScheduleIncompleteMessagesCleanup (); ScheduleIncompleteMessagesCleanup ();
} }

3
libi2pd/SSUData.h

@ -40,6 +40,7 @@ namespace transport
const int MAX_NUM_RESENDS = 5; const int MAX_NUM_RESENDS = 5;
const int DECAY_INTERVAL = 20; // in seconds const int DECAY_INTERVAL = 20; // in seconds
const int INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT = 30; // in seconds const int INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT = 30; // in seconds
const int RECEIVED_MESSAGES_CLEANUP_TIMEOUT = 40; // in seconds
const unsigned int MAX_NUM_RECEIVED_MESSAGES = 1000; // how many msgID we store for duplicates check const unsigned int MAX_NUM_RECEIVED_MESSAGES = 1000; // how many msgID we store for duplicates check
const int MAX_OUTGOING_WINDOW_SIZE = 200; // how many unacked message we can store const int MAX_OUTGOING_WINDOW_SIZE = 200; // how many unacked message we can store
// data flags // data flags
@ -128,7 +129,7 @@ namespace transport
SSUSession& m_Session; SSUSession& m_Session;
std::unordered_map<uint32_t, std::shared_ptr<IncompleteMessage> > m_IncompleteMessages; std::unordered_map<uint32_t, std::shared_ptr<IncompleteMessage> > m_IncompleteMessages;
std::unordered_map<uint32_t, std::shared_ptr<SentMessage> > m_SentMessages; std::unordered_map<uint32_t, std::shared_ptr<SentMessage> > m_SentMessages;
std::unordered_set<uint32_t> m_ReceivedMessages; std::unordered_map<uint32_t, uint64_t> m_ReceivedMessages; // msgID -> timestamp in seconds
boost::asio::deadline_timer m_ResendTimer, m_IncompleteMessagesCleanupTimer; boost::asio::deadline_timer m_ResendTimer, m_IncompleteMessagesCleanupTimer;
int m_MaxPacketSize, m_PacketSize; int m_MaxPacketSize, m_PacketSize;
i2p::I2NPMessagesHandler m_Handler; i2p::I2NPMessagesHandler m_Handler;

Loading…
Cancel
Save