diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index c62945da..527e5273 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -31,7 +31,8 @@ namespace transport std::shared_ptr addr, bool peerTest): TransportSession (in_RemoteRouter, SSU2_CONNECT_TIMEOUT), m_Server (server), m_Address (addr), m_DestConnID (0), m_SourceConnID (0), - m_State (eSSU2SessionStateUnknown), m_SendPacketNum (0), m_ReceivePacketNum (0) + m_State (eSSU2SessionStateUnknown), m_SendPacketNum (0), m_ReceivePacketNum (0), + m_IsDataReceived (false) { m_NoiseState.reset (new i2p::crypto::NoiseSymmetricState); if (in_RemoteRouter && m_Address) @@ -739,15 +740,11 @@ namespace transport m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch (); m_NumReceivedBytes += len; if (UpdateReceivePacketNum (packetNum)) - { - if (HandlePayload (payload, payloadSize)) - SendQuickAck (); // TODO: don't send too requently - } + HandlePayload (payload, payloadSize); } - bool SSU2Session::HandlePayload (const uint8_t * buf, size_t len) + void SSU2Session::HandlePayload (const uint8_t * buf, size_t len) { - bool isData = false; size_t offset = 0; while (offset < len) { @@ -786,18 +783,18 @@ namespace transport memcpy (nextMsg->GetNTCP2Header (), buf + offset, size); nextMsg->FromNTCP2 (); // SSU2 has the same format as NTCP2 m_Handler.PutNextMessage (std::move (nextMsg)); - isData = true; + m_IsDataReceived = true; break; } case eSSU2BlkFirstFragment: LogPrint (eLogDebug, "SSU2: First fragment"); HandleFirstFragment (buf + offset, size); - isData = true; + m_IsDataReceived = true; break; case eSSU2BlkFollowOnFragment: LogPrint (eLogDebug, "SSU2: Follow-on fragment"); HandleFollowOnFragment (buf + offset, size); - isData = true; + m_IsDataReceived = true; break; case eSSU2BlkTermination: LogPrint (eLogDebug, "SSU2: Termination"); @@ -852,8 +849,6 @@ namespace transport } offset += size; } - m_Handler.Flush (); - return isData; } void SSU2Session::HandleAck (const uint8_t * buf, size_t len) @@ -1215,6 +1210,16 @@ namespace transport m_OutOfSequencePackets.clear (); } } + + void SSU2Session::FlushData () + { + if (m_IsDataReceived) + { + SendQuickAck (); + m_Handler.Flush (); + m_IsDataReceived = false; + } + } SSU2Server::SSU2Server (): RunnableServiceWithWork ("SSU2"), m_ReceiveService ("SSU2r"), @@ -1325,7 +1330,34 @@ namespace transport { i2p::transport::transports.UpdateReceivedBytes (bytes_transferred); packet->len = bytes_transferred; - GetService ().post (std::bind (&SSU2Server::HandleReceivedPacket, this, packet)); + std::vector packets; + packets.push_back (packet); + + boost::system::error_code ec; + size_t moreBytes = socket.available (ec); + if (!ec) + { + while (moreBytes && packets.size () < 32) + { + packet = m_PacketsPool.AcquireMt (); + packet->len = socket.receive_from (boost::asio::buffer (packet->buf, SSU2_MTU), packet->from, 0, ec); + if (!ec) + { + i2p::transport::transports.UpdateReceivedBytes (packet->len); + packets.push_back (packet); + moreBytes = socket.available(ec); + if (ec) break; + } + else + { + LogPrint (eLogError, "SSU2: receive_from error: code ", ec.value(), ": ", ec.message ()); + m_PacketsPool.ReleaseMt (packet); + break; + } + } + } + + GetService ().post (std::bind (&SSU2Server::HandleReceivedPacket, this, packets)); Receive (socket); } else @@ -1342,13 +1374,12 @@ namespace transport } } - void SSU2Server::HandleReceivedPacket (Packet * packet) + void SSU2Server::HandleReceivedPacket (std::vector packets) { - if (packet) - { + for (auto& packet: packets) ProcessNextPacket (packet->buf, packet->len, packet->from); - m_PacketsPool.ReleaseMt (packet); - } + m_PacketsPool.ReleaseMt (packets); + if (m_LastSession) m_LastSession->FlushData (); } void SSU2Server::AddSession (std::shared_ptr session) @@ -1375,6 +1406,7 @@ namespace transport connID ^= CreateHeaderMask (i2p::context.GetSSU2IntroKey (), buf + (len - 24)); if (!m_LastSession || m_LastSession->GetConnID () != connID) { + if (m_LastSession) m_LastSession->FlushData (); auto it = m_Sessions.find (connID); if (it != m_Sessions.end ()) m_LastSession = it->second; diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index bfc1eaa0..13d149d9 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -136,6 +136,7 @@ namespace transport void Terminate (); void TerminateByTimeout (); void CleanUp (uint64_t ts); + void FlushData (); void Done () override; void SendI2NPMessages (const std::vector >& msgs) override; void Resend (uint64_t ts); @@ -168,7 +169,7 @@ namespace transport void SendQuickAck (); void SendTermination (); - bool HandlePayload (const uint8_t * buf, size_t len); // returns true is contains data + void HandlePayload (const uint8_t * buf, size_t len); void HandleAck (const uint8_t * buf, size_t len); void HandleAckRange (uint32_t firstPacketNum, uint32_t lastPacketNum); bool ExtractEndpoint (const uint8_t * buf, size_t size, boost::asio::ip::udp::endpoint& ep); @@ -203,6 +204,7 @@ namespace transport std::map > m_IncompleteMessages; // I2NP std::list > m_SendQueue; i2p::I2NPMessagesHandler m_Handler; + bool m_IsDataReceived; }; class SSU2Server: private i2p::util::RunnableServiceWithWork @@ -255,7 +257,7 @@ namespace transport void Receive (boost::asio::ip::udp::socket& socket); void HandleReceivedFrom (const boost::system::error_code& ecode, size_t bytes_transferred, Packet * packet, boost::asio::ip::udp::socket& socket); - void HandleReceivedPacket (Packet * packet); + void HandleReceivedPacket (std::vector packets); void ProcessNextPacket (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); void ScheduleTermination ();