Browse Source

move unsent I2NP messages to new session if replaced

pull/2072/head
orignal 7 months ago
parent
commit
bc8adf1433
  1. 21
      libi2pd/NTCP2.cpp
  2. 1
      libi2pd/NTCP2.h
  3. 4
      libi2pd/SSU2Session.cpp

21
libi2pd/NTCP2.cpp

@ -412,6 +412,7 @@ namespace transport
m_IsEstablished = true; m_IsEstablished = true;
m_Establisher.reset (nullptr); m_Establisher.reset (nullptr);
SetTerminationTimeout (NTCP2_TERMINATION_TIMEOUT); SetTerminationTimeout (NTCP2_TERMINATION_TIMEOUT);
SendQueue ();
transports.PeerConnected (shared_from_this ()); transports.PeerConnected (shared_from_this ());
} }
@ -1133,7 +1134,7 @@ namespace transport
void NTCP2Session::SendQueue () void NTCP2Session::SendQueue ()
{ {
if (!m_SendQueue.empty ()) if (!m_SendQueue.empty () && m_IsEstablished)
{ {
std::vector<std::shared_ptr<I2NPMessage> > msgs; std::vector<std::shared_ptr<I2NPMessage> > msgs;
auto ts = i2p::util::GetMillisecondsSinceEpoch (); auto ts = i2p::util::GetMillisecondsSinceEpoch ();
@ -1170,6 +1171,21 @@ namespace transport
} }
} }
void NTCP2Session::MoveSendQueue (std::shared_ptr<NTCP2Session> other)
{
if (!other || m_SendQueue.empty ()) return;
std::vector<std::shared_ptr<I2NPMessage> > msgs;
auto ts = i2p::util::GetMillisecondsSinceEpoch ();
for (auto it: m_SendQueue)
if (!it->IsExpired (ts))
msgs.push_back (it);
else
it->Drop ();
m_SendQueue.clear ();
if (!msgs.empty ())
other->PostI2NPMessages (msgs);
}
size_t NTCP2Session::CreatePaddingBlock (size_t msgLen, uint8_t * buf, size_t len) size_t NTCP2Session::CreatePaddingBlock (size_t msgLen, uint8_t * buf, size_t len)
{ {
if (len < 3) return 0; if (len < 3) return 0;
@ -1263,7 +1279,7 @@ namespace transport
else else
m_SendQueue.push_back (std::move (it)); m_SendQueue.push_back (std::move (it));
if (!m_IsSending) if (!m_IsSending && m_IsEstablished)
SendQueue (); SendQueue ();
else if (m_SendQueue.size () > NTCP2_MAX_OUTGOING_QUEUE_SIZE) else if (m_SendQueue.size () > NTCP2_MAX_OUTGOING_QUEUE_SIZE)
{ {
@ -1426,6 +1442,7 @@ namespace transport
{ {
// replace by new session // replace by new session
auto s = it->second; auto s = it->second;
s->MoveSendQueue (session);
m_NTCP2Sessions.erase (it); m_NTCP2Sessions.erase (it);
s->Terminate (); s->Terminate ();
} }

1
libi2pd/NTCP2.h

@ -151,6 +151,7 @@ namespace transport
void SendLocalRouterInfo (bool update) override; // after handshake or by update void SendLocalRouterInfo (bool update) override; // after handshake or by update
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) override; void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) override;
void MoveSendQueue (std::shared_ptr<NTCP2Session> other);
private: private:

4
libi2pd/SSU2Session.cpp

@ -383,10 +383,12 @@ namespace transport
m_SendQueue.push_back (std::move (it)); m_SendQueue.push_back (std::move (it));
} }
} }
if (IsEstablished ())
{
SendQueue (); SendQueue ();
if (m_SendQueue.size () > 0) // windows is full if (m_SendQueue.size () > 0) // windows is full
Resend (i2p::util::GetMillisecondsSinceEpoch ()); Resend (i2p::util::GetMillisecondsSinceEpoch ());
}
SetSendQueueSize (m_SendQueue.size ()); SetSendQueueSize (m_SendQueue.size ());
} }

Loading…
Cancel
Save