diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index 8a066f39..54783f9b 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -69,6 +69,7 @@ namespace transport m_State = eSSU2SessionStateTerminated; transports.PeerDisconnected (shared_from_this ()); m_Server.RemoveSession (m_SourceConnID); + m_SendQueue.clear (); LogPrint (eLogDebug, "SSU2: Session terminated"); } } @@ -85,6 +86,65 @@ namespace transport m_EphemeralKeys = nullptr; m_NoiseState.reset (nullptr); SetTerminationTimeout (SSU2_TERMINATION_TIMEOUT); + transports.PeerConnected (shared_from_this ()); + } + + void SSU2Session::Done () + { + m_Server.GetService ().post (std::bind (&SSU2Session::Terminate, shared_from_this ())); + } + + void SSU2Session::SendI2NPMessages (const std::vector >& msgs) + { + m_Server.GetService ().post (std::bind (&SSU2Session::PostI2NPMessages, shared_from_this (), msgs)); + } + + void SSU2Session::PostI2NPMessages (std::vector > msgs) + { + for (auto it: msgs) + m_SendQueue.push_back (it); + SendQueue (); + } + + void SSU2Session::SendQueue () + { + if (!m_SendQueue.empty ()) + { + uint8_t payload[SSU2_MAX_PAYLOAD_SIZE]; + size_t payloadSize = 0; + payloadSize += CreateAckBlock (payload + payloadSize, SSU2_MAX_PAYLOAD_SIZE - payloadSize); + while (!m_SendQueue.empty ()) + { + auto msg = m_SendQueue.front (); + size_t len = msg->GetNTCP2Length (); + if (len + 3 < SSU2_MAX_PAYLOAD_SIZE - payloadSize) + { + m_SendQueue.pop_front (); + payloadSize += CreateI2NPBlock (payload + payloadSize, SSU2_MAX_PAYLOAD_SIZE - payloadSize, std::move (msg)); + } + else if (len > SSU2_MAX_PAYLOAD_SIZE - 32) // message too long + m_SendQueue.pop_front (); // drop it. TODO: fragmentation + else + { + // send right a way + if (payloadSize + 16 < SSU2_MAX_PAYLOAD_SIZE) + payloadSize += CreatePaddingBlock (payload + payloadSize, SSU2_MAX_PAYLOAD_SIZE - payloadSize); + auto packet = SendData (payload, payloadSize); + if (packet) + m_SentPackets.emplace (packet->packetNum, packet); + payloadSize = 0; + payloadSize += CreateAckBlock (payload + payloadSize, SSU2_MAX_PAYLOAD_SIZE - payloadSize); + } + }; + if (payloadSize) + { + if (payloadSize + 16 < SSU2_MAX_PAYLOAD_SIZE) + payloadSize += CreatePaddingBlock (payload + payloadSize, SSU2_MAX_PAYLOAD_SIZE - payloadSize); + auto packet = SendData (payload, payloadSize); + if (packet) + m_SentPackets.emplace (packet->packetNum, packet); + } + } } void SSU2Session::ProcessFirstIncomingMessage (uint64_t connID, uint8_t * buf, size_t len) @@ -534,28 +594,33 @@ namespace transport return true; } - void SSU2Session::SendData (const uint8_t * buf, size_t len) + std::shared_ptr SSU2Session::SendData (const uint8_t * buf, size_t len) { if (len < 8) { LogPrint (eLogWarning, "SSU2: Data message payload is too short ", (int)len); - return; + return nullptr; } - Header header; + auto packet = std::make_shared(); + packet->packetNum = m_SendPacketNum; + packet->numResends = 0; + Header& header = packet->header; header.h.connID = m_DestConnID; header.h.packetNum = htobe32 (m_SendPacketNum); header.h.type = eSSU2Data; memset (header.h.flags, 0, 3); - uint8_t payload[SSU2_MTU]; uint8_t nonce[12]; CreateNonce (m_SendPacketNum, nonce); - i2p::crypto::AEADChaCha20Poly1305 (buf, len, header.buf, 16, m_KeyDataSend, nonce, payload, SSU2_MTU, true); - header.ll[0] ^= CreateHeaderMask (m_Address->i, payload + (len - 8)); - header.ll[1] ^= CreateHeaderMask (m_KeyDataSend + 32, payload + (len + 4)); - m_Server.Send (header.buf, 16, payload, len + 16, m_RemoteEndpoint); + i2p::crypto::AEADChaCha20Poly1305 (buf, len, header.buf, 16, m_KeyDataSend, nonce, packet->payload, SSU2_MTU, true); + header.ll[0] ^= CreateHeaderMask (m_Address->i, packet->payload + (len - 8)); + header.ll[1] ^= CreateHeaderMask (m_KeyDataSend + 32, packet->payload + (len + 4)); + packet->payloadLen = len + 16; + m_Server.Send (header.buf, 16, packet->payload, packet->payloadLen, m_RemoteEndpoint); m_SendPacketNum++; m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch (); - m_NumSentBytes += len + 32; + packet->nextResendTime = m_LastActivityTimestamp + SSU2_RESEND_INTERVAL; + m_NumSentBytes += packet->payloadLen + 16; + return packet; } void SSU2Session::ProcessData (uint8_t * buf, size_t len) @@ -797,6 +862,18 @@ namespace transport return 0; return paddingSize + 3; } + + size_t SSU2Session::CreateI2NPBlock (uint8_t * buf, size_t len, std::shared_ptr&& msg) + { + msg->ToNTCP2 (); + auto msgBuf = msg->GetNTCP2Header (); + auto msgLen = msg->GetNTCP2Length (); + if (msgLen + 3 > len) msgLen = len - 3; + buf[0] = eSSU2BlkI2NPMessage; + htobe16buf (buf + 1, msgLen); // size + memcpy (buf + 3, msgBuf, msgLen); + return msgLen + 3; + } std::shared_ptr SSU2Session::ExtractRouterInfo (const uint8_t * buf, size_t size) { diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index 37a6f8fd..8b04bbc4 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -29,6 +29,9 @@ namespace transport const size_t SSU2_SOCKET_RECEIVE_BUFFER_SIZE = 0x1FFFF; // 128K const size_t SSU2_SOCKET_SEND_BUFFER_SIZE = 0x1FFFF; // 128K const size_t SSU2_MTU = 1488; + const size_t SSU2_MAX_PAYLOAD_SIZE = SSU2_MTU - 32; + const int SSU2_RESEND_INTERVAL = 3; // in seconds + const int SSU2_MAX_NUM_RESENDS = 5; enum SSU2MessageType { @@ -97,9 +100,10 @@ namespace transport struct SentPacket { - Header h; + Header header; uint8_t payload[SSU2_MTU]; - size_t payloadLen; + size_t payloadLen; + uint32_t packetNum; uint32_t nextResendTime; // in seconds int numResends; }; @@ -116,8 +120,8 @@ namespace transport void Connect (); void Terminate (); void TerminateByTimeout (); - void Done () override {}; - void SendI2NPMessages (const std::vector >& msgs) override {}; + void Done () override; + void SendI2NPMessages (const std::vector >& msgs) override; bool IsEstablished () const { return m_State == eSSU2SessionStateEstablished; }; uint64_t GetConnID () const { return m_SourceConnID; }; @@ -130,6 +134,8 @@ namespace transport private: void Established (); + void PostI2NPMessages (std::vector > msgs); + void SendQueue (); void ProcessSessionRequest (Header& header, uint8_t * buf, size_t len); void ProcessTokenRequest (Header& header, uint8_t * buf, size_t len); @@ -140,7 +146,7 @@ namespace transport void KDFDataPhase (uint8_t * keydata_ab, uint8_t * keydata_ba); void SendTokenRequest (); void SendRetry (); - void SendData (const uint8_t * buf, size_t len); + std::shared_ptr SendData (const uint8_t * buf, size_t len); void SendQuickAck (); void SendTermination (); @@ -154,6 +160,7 @@ namespace transport size_t CreateAddressBlock (const boost::asio::ip::udp::endpoint& ep, uint8_t * buf, size_t len); size_t CreateAckBlock (uint8_t * buf, size_t len); size_t CreatePaddingBlock (uint8_t * buf, size_t len, size_t minSize = 0); + size_t CreateI2NPBlock (uint8_t * buf, size_t len, std::shared_ptr&& msg); private: @@ -168,6 +175,7 @@ namespace transport uint32_t m_SendPacketNum, m_ReceivePacketNum; std::set m_OutOfSequencePackets; // packet nums > receive packet num std::map > m_SentPackets; // packetNum -> packet + std::list > m_SendQueue; i2p::I2NPMessagesHandler m_Handler; };