diff --git a/libi2pd/NTCP2.cpp b/libi2pd/NTCP2.cpp index 2f2c92b1..164ec217 100644 --- a/libi2pd/NTCP2.cpp +++ b/libi2pd/NTCP2.cpp @@ -412,6 +412,7 @@ namespace transport m_IsEstablished = true; m_Establisher.reset (nullptr); SetTerminationTimeout (NTCP2_TERMINATION_TIMEOUT); + SendQueue (); transports.PeerConnected (shared_from_this ()); } @@ -1133,7 +1134,7 @@ namespace transport void NTCP2Session::SendQueue () { - if (!m_SendQueue.empty ()) + if (!m_SendQueue.empty () && m_IsEstablished) { std::vector > msgs; auto ts = i2p::util::GetMillisecondsSinceEpoch (); @@ -1170,6 +1171,21 @@ namespace transport } } + void NTCP2Session::MoveSendQueue (std::shared_ptr other) + { + if (!other || m_SendQueue.empty ()) return; + std::vector > msgs; + auto ts = i2p::util::GetMillisecondsSinceEpoch (); + for (auto it: m_SendQueue) + if (!it->IsExpired (ts)) + msgs.push_back (it); + else + it->Drop (); + m_SendQueue.clear (); + if (!msgs.empty ()) + other->PostI2NPMessages (msgs); + } + size_t NTCP2Session::CreatePaddingBlock (size_t msgLen, uint8_t * buf, size_t len) { if (len < 3) return 0; @@ -1263,7 +1279,7 @@ namespace transport else m_SendQueue.push_back (std::move (it)); - if (!m_IsSending) + if (!m_IsSending && m_IsEstablished) SendQueue (); else if (m_SendQueue.size () > NTCP2_MAX_OUTGOING_QUEUE_SIZE) { @@ -1426,6 +1442,7 @@ namespace transport { // replace by new session auto s = it->second; + s->MoveSendQueue (session); m_NTCP2Sessions.erase (it); s->Terminate (); } diff --git a/libi2pd/NTCP2.h b/libi2pd/NTCP2.h index a566b38d..1efb482b 100644 --- a/libi2pd/NTCP2.h +++ b/libi2pd/NTCP2.h @@ -151,7 +151,8 @@ namespace transport void SendLocalRouterInfo (bool update) override; // after handshake or by update void SendI2NPMessages (const std::vector >& msgs) override; - + void MoveSendQueue (std::shared_ptr other); + private: void Established (); diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index 9552d20a..f5b36fcf 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -383,10 +383,12 @@ namespace transport m_SendQueue.push_back (std::move (it)); } } - SendQueue (); - - if (m_SendQueue.size () > 0) // windows is full - Resend (i2p::util::GetMillisecondsSinceEpoch ()); + if (IsEstablished ()) + { + SendQueue (); + if (m_SendQueue.size () > 0) // windows is full + Resend (i2p::util::GetMillisecondsSinceEpoch ()); + } SetSendQueueSize (m_SendQueue.size ()); }