diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index 724759b5..96541ea7 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -62,6 +62,23 @@ namespace transport SendTokenRequest (); } + void SSU2Session::Terminate () + { + if (m_State != eSSU2SessionStateTerminated) + { + m_State = eSSU2SessionStateTerminated; + transports.PeerDisconnected (shared_from_this ()); + m_Server.RemoveSession (m_SourceConnID); + LogPrint (eLogDebug, "SSU2: Session terminated"); + } + } + + void SSU2Session::TerminateByTimeout () + { + SendTermination (); + m_Server.GetService ().post (std::bind (&SSU2Session::Terminate, shared_from_this ())); + } + void SSU2Session::Established () { m_State = eSSU2SessionStateEstablished; @@ -129,7 +146,7 @@ namespace transport i2p::crypto::ChaCha20 (headerX, 48, m_Address->i, nonce, headerX); m_NoiseState->MixHash (payload, payloadSize); // h = SHA256(h || encrypted payload from Session Request) for SessionCreated // send - m_Server.AddPendingOutgoingSession (m_RemoteEndpoint, shared_from_this ()); + m_Server.AddPendingOutgoingSession (shared_from_this ()); m_Server.Send (header.buf, 16, headerX, 48, payload, payloadSize, m_RemoteEndpoint); } @@ -159,7 +176,7 @@ namespace transport // payload HandlePayload (decryptedPayload.data (), decryptedPayload.size ()); - m_Server.AddSession (m_SourceConnID, shared_from_this ()); + m_Server.AddSession (shared_from_this ()); SendSessionCreated (headerX + 16); } @@ -241,7 +258,7 @@ namespace transport // payload HandlePayload (decryptedPayload.data (), decryptedPayload.size ()); - m_Server.AddSession (m_SourceConnID, shared_from_this ()); + m_Server.AddSession (shared_from_this ()); SendSessionConfirmed (headerX + 16); KDFDataPhase (m_KeyDataSend, m_KeyDataReceive); Established (); @@ -423,7 +440,7 @@ namespace transport header.ll[1] ^= CreateHeaderMask (m_Address->i, payload + (payloadSize - 12)); i2p::crypto::ChaCha20 (h + 16, 16, m_Address->i, nonce, h + 16); // send - m_Server.AddPendingOutgoingSession (m_RemoteEndpoint, shared_from_this ()); + m_Server.AddPendingOutgoingSession (shared_from_this ()); m_Server.Send (header.buf, 16, h + 16, 16, payload, payloadSize, m_RemoteEndpoint); } @@ -620,6 +637,8 @@ namespace transport case eSSU2BlkFollowOnFragment: break; case eSSU2BlkTermination: + LogPrint (eLogDebug, "SSU2: Termination"); + Terminate (); break; case eSSU2BlkRelayRequest: break; @@ -781,6 +800,17 @@ namespace transport payloadSize += CreatePaddingBlock (payload + payloadSize, SSU2_MTU - payloadSize); SendData (payload, payloadSize); } + + void SSU2Session::SendTermination () + { + uint8_t payload[32]; + size_t payloadSize = 12; + payload[0] = eSSU2BlkTermination; + htobe16buf (payload + 1, 9); + memset (payload + 3, 0, 9); + payloadSize += CreatePaddingBlock (payload + payloadSize, 32 - payloadSize); + SendData (payload, payloadSize); + } SSU2Server::SSU2Server (): RunnableServiceWithWork ("SSU2"), m_Socket (GetService ()), m_SocketV6 (GetService ()), @@ -886,14 +916,21 @@ namespace transport } } - void SSU2Server::AddSession (uint64_t connID, std::shared_ptr session) + void SSU2Server::AddSession (std::shared_ptr session) { - m_Sessions.emplace (connID, session); + if (session) + m_Sessions.emplace (session->GetConnID (), session); } - void SSU2Server::AddPendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep, std::shared_ptr session) + void SSU2Server::RemoveSession (uint64_t connID) { - m_PendingOutgoingSessions.emplace (ep, session); + m_Sessions.erase (connID); + } + + void SSU2Server::AddPendingOutgoingSession (std::shared_ptr session) + { + if (session) + m_PendingOutgoingSessions.emplace (session->GetRemoteEndpoint (), session); } void SSU2Server::ProcessNextPacket (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint) @@ -1005,7 +1042,8 @@ namespace transport { if (it->second->IsTerminationTimeoutExpired (ts)) { - //it->second->Terminate (); + if (it->second->IsEstablished ()) + it->second->TerminateByTimeout (); it = m_Sessions.erase (it); } else diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index 7aa8483f..2ebabf3c 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -69,7 +69,7 @@ namespace transport { eSSU2SessionStateUnknown, eSSU2SessionStateEstablished, - eSSU2SessionStateClosed, + eSSU2SessionStateTerminated, eSSU2SessionStateFailed }; @@ -101,12 +101,15 @@ namespace transport ~SSU2Session (); void SetRemoteEndpoint (const boost::asio::ip::udp::endpoint& ep) { m_RemoteEndpoint = ep; }; - + const boost::asio::ip::udp::endpoint& GetRemoteEndpoint () const { return m_RemoteEndpoint; }; + void Connect (); - void Established (); + void Terminate (); + void TerminateByTimeout (); void Done () override {}; void SendI2NPMessages (const std::vector >& msgs) override {}; bool IsEstablished () const { return m_State == eSSU2SessionStateEstablished; }; + uint64_t GetConnID () const { return m_SourceConnID; }; void ProcessFirstIncomingMessage (uint64_t connID, uint8_t * buf, size_t len); bool ProcessSessionCreated (uint8_t * buf, size_t len); @@ -116,6 +119,8 @@ namespace transport private: + void Established (); + void ProcessSessionRequest (Header& header, uint8_t * buf, size_t len); void ProcessTokenRequest (Header& header, uint8_t * buf, size_t len); @@ -127,6 +132,7 @@ namespace transport void SendRetry (); void SendData (const uint8_t * buf, size_t len); void SendQuickAck (); + void SendTermination (); void HandlePayload (const uint8_t * buf, size_t len); bool ExtractEndpoint (const uint8_t * buf, size_t size, boost::asio::ip::udp::endpoint& ep); @@ -169,8 +175,9 @@ namespace transport void Stop (); boost::asio::io_service& GetService () { return GetIOService (); }; - void AddSession (uint64_t connID, std::shared_ptr session); - void AddPendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep, std::shared_ptr session); + void AddSession (std::shared_ptr session); + void RemoveSession (uint64_t connID); + void AddPendingOutgoingSession (std::shared_ptr session); void Send (const uint8_t * header, size_t headerLen, const uint8_t * payload, size_t payloadLen, const boost::asio::ip::udp::endpoint& to);