diff --git a/libi2pd/SSU2.cpp b/libi2pd/SSU2.cpp index 6c5e2f82..dc6ad613 100644 --- a/libi2pd/SSU2.cpp +++ b/libi2pd/SSU2.cpp @@ -264,10 +264,10 @@ namespace transport } } - void SSU2Server::AddPendingOutgoingSession (std::shared_ptr session) + bool SSU2Server::AddPendingOutgoingSession (std::shared_ptr session) { - if (session) - m_PendingOutgoingSessions.emplace (session->GetRemoteEndpoint (), session); + if (!session) return false; + return m_PendingOutgoingSessions.emplace (session->GetRemoteEndpoint (), session).second; } std::shared_ptr SSU2Server::FindSession (const i2p::data::IdentHash& ident) const @@ -278,6 +278,14 @@ namespace transport return nullptr; } + std::shared_ptr SSU2Server::FindPendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep) const + { + auto it = m_PendingOutgoingSessions.find (ep); + if (it != m_PendingOutgoingSessions.end ()) + return it->second; + return nullptr; + } + std::shared_ptr SSU2Server::GetRandomSession (i2p::data::RouterInfo::CompatibleTransports remoteTransports) const { if (m_Sessions.empty ()) return nullptr; @@ -429,14 +437,40 @@ namespace transport { if (router && address) { + // check is no peding session + bool isValidEndpoint = !address->host.is_unspecified () && address->port; + if (isValidEndpoint) + { + auto s = FindPendingOutgoingSession (boost::asio::ip::udp::endpoint (address->host, address->port)); + if (s) + { + if (peerTest) + { + // if peer test requested add it to the list for pending session + auto onEstablished = s->GetOnEstablished (); + if (onEstablished) + s->SetOnEstablished ([s, onEstablished]() + { + onEstablished (); + s->SendPeerTest (); + }); + else + s->SetOnEstablished ([s]() { s->SendPeerTest (); }); + } + return false; + } + } + auto session = std::make_shared (*this, router, address); if (peerTest) session->SetOnEstablished ([session]() {session->SendPeerTest (); }); if (address->UsesIntroducer ()) GetService ().post (std::bind (&SSU2Server::ConnectThroughIntroducer, this, session)); - else + else if (isValidEndpoint) // we can't connect without endpoint GetService ().post ([session]() { session->Connect (); }); + else + return false; } else return false; @@ -479,13 +513,29 @@ namespace transport auto addr = address->IsV6 () ? r->GetSSU2V6Address () : r->GetSSU2V4Address (); if (addr) { - auto s = std::make_shared (*this, r, addr); - s->SetOnEstablished ( - [session, s, relayTag]() + bool isValidEndpoint = !addr->host.is_unspecified () && addr->port; + if (isValidEndpoint) + { + auto s = FindPendingOutgoingSession (boost::asio::ip::udp::endpoint (addr->host, addr->port)); + if (!s) + { + s = std::make_shared (*this, r, addr); + s->SetOnEstablished ([session, s, relayTag]() { s->Introduce (session, relayTag); }); + s->Connect (); + } + else { - s->Introduce (session, relayTag); - }); - s->Connect (); + auto onEstablished = s->GetOnEstablished (); + if (onEstablished) + s->SetOnEstablished ([session, s, relayTag, onEstablished]() + { + onEstablished (); + s->Introduce (session, relayTag); + }); + else + s->SetOnEstablished ([session, s, relayTag]() {s->Introduce (session, relayTag); }); + } + } } } } diff --git a/libi2pd/SSU2.h b/libi2pd/SSU2.h index e90922e3..78eb19c4 100644 --- a/libi2pd/SSU2.h +++ b/libi2pd/SSU2.h @@ -54,8 +54,9 @@ namespace transport void AddSession (std::shared_ptr session); void RemoveSession (uint64_t connID); void AddSessionByRouterHash (std::shared_ptr session); - void AddPendingOutgoingSession (std::shared_ptr session); + bool AddPendingOutgoingSession (std::shared_ptr session); std::shared_ptr FindSession (const i2p::data::IdentHash& ident) const; + std::shared_ptr FindPendingOutgoingSession (const boost::asio::ip::udp::endpoint& ep) const; std::shared_ptr GetRandomSession (i2p::data::RouterInfo::CompatibleTransports remoteTransports) const; void AddRelay (uint32_t tag, std::shared_ptr relay); diff --git a/libi2pd/SSU2Session.cpp b/libi2pd/SSU2Session.cpp index c064943e..77d3b787 100644 --- a/libi2pd/SSU2Session.cpp +++ b/libi2pd/SSU2Session.cpp @@ -403,8 +403,13 @@ namespace transport i2p::crypto::ChaCha20 (headerX, 48, m_Address->i, nonce, headerX); m_NoiseState->MixHash (payload, payloadSize); // h = SHA256(h || encrypted payload from Session Request) for SessionCreated // send - m_Server.AddPendingOutgoingSession (shared_from_this ()); - m_Server.Send (header.buf, 16, headerX, 48, payload, payloadSize, m_RemoteEndpoint); + if (m_Server.AddPendingOutgoingSession (shared_from_this ())) + m_Server.Send (header.buf, 16, headerX, 48, payload, payloadSize, m_RemoteEndpoint); + else + { + LogPrint (eLogWarning, "SSU2: SessionRequest request to ", m_RemoteEndpoint, " already pending"); + Terminate (); + } } void SSU2Session::ProcessSessionRequest (Header& header, uint8_t * buf, size_t len) @@ -739,8 +744,13 @@ namespace transport memset (nonce, 0, 12); i2p::crypto::ChaCha20 (h + 16, 16, m_Address->i, nonce, h + 16); // send - m_Server.AddPendingOutgoingSession (shared_from_this ()); - m_Server.Send (header.buf, 16, h + 16, 16, payload, payloadSize, m_RemoteEndpoint); + if (m_Server.AddPendingOutgoingSession (shared_from_this ())) + m_Server.Send (header.buf, 16, h + 16, 16, payload, payloadSize, m_RemoteEndpoint); + else + { + LogPrint (eLogWarning, "SSU2: TokenRequest request to ", m_RemoteEndpoint, " already pending"); + Terminate (); + } } void SSU2Session::ProcessTokenRequest (Header& header, uint8_t * buf, size_t len) diff --git a/libi2pd/SSU2Session.h b/libi2pd/SSU2Session.h index 5b45253d..2439033a 100644 --- a/libi2pd/SSU2Session.h +++ b/libi2pd/SSU2Session.h @@ -176,6 +176,7 @@ namespace transport i2p::data::RouterInfo::CompatibleTransports GetRemoteTransports () const { return m_RemoteTransports; }; std::shared_ptr GetAddress () const { return m_Address; }; void SetOnEstablished (OnEstablished e) { m_OnEstablished = e; }; + OnEstablished GetOnEstablished () const { return m_OnEstablished; }; void Connect (); bool Introduce (std::shared_ptr session, uint32_t relayTag);