diff --git a/libi2pd/Transports.cpp b/libi2pd/Transports.cpp index e2f8c730..03416137 100644 --- a/libi2pd/Transports.cpp +++ b/libi2pd/Transports.cpp @@ -447,28 +447,29 @@ namespace transport return std::max (bwCongestionLevel, tbwCongestionLevel); } - void Transports::SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr msg) + std::future > Transports::SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr msg) { if (m_IsOnline) - SendMessages (ident, { msg }); + return SendMessages (ident, { msg }); + return {}; // invalid future } - void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list >& msgs) + std::future > Transports::SendMessages (const i2p::data::IdentHash& ident, std::list >& msgs) { std::list > msgs1; msgs.swap (msgs1); - SendMessages (ident, std::move (msgs1)); + return SendMessages (ident, std::move (msgs1)); } - void Transports::SendMessages (const i2p::data::IdentHash& ident, std::list >&& msgs) + std::future > Transports::SendMessages (const i2p::data::IdentHash& ident, std::list >&& msgs) { - boost::asio::post (*m_Service, [this, ident, msgs = std::move(msgs)] () mutable + return boost::asio::post (*m_Service, boost::asio::use_future ([this, ident, msgs = std::move(msgs)] () mutable { - PostMessages (ident, msgs); - }); + return PostMessages (ident, msgs); + })); } - void Transports::PostMessages (const i2p::data::IdentHash& ident, std::list >& msgs) + std::shared_ptr Transports::PostMessages (const i2p::data::IdentHash& ident, std::list >& msgs) { if (ident == i2p::context.GetRouterInfo ().GetIdentHash ()) { @@ -476,9 +477,9 @@ namespace transport for (auto& it: msgs) m_LoopbackHandler.PutNextMessage (std::move (it)); m_LoopbackHandler.Flush (); - return; + return nullptr; } - if(RoutesRestricted() && !IsRestrictedPeer(ident)) return; + if(RoutesRestricted() && !IsRestrictedPeer(ident)) return nullptr; std::shared_ptr peer; { std::lock_guard l(m_PeersMutex); @@ -489,13 +490,13 @@ namespace transport if (!peer) { // check if not banned - if (i2p::data::IsRouterBanned (ident)) return; // don't create peer to unreachable router + if (i2p::data::IsRouterBanned (ident)) return nullptr; // don't create peer to unreachable router // try to connect bool connected = false; try { auto r = netdb.FindRouter (ident); - if (r && (r->IsUnreachable () || !r->IsReachableFrom (i2p::context.GetRouterInfo ()))) return; // router found but non-reachable + if (r && (r->IsUnreachable () || !r->IsReachableFrom (i2p::context.GetRouterInfo ()))) return nullptr; // router found but non-reachable peer = std::make_shared(r, i2p::util::GetSecondsSinceEpoch ()); { @@ -509,12 +510,16 @@ namespace transport { LogPrint (eLogError, "Transports: PostMessages exception:", ex.what ()); } - if (!connected) return; + if (!connected) return nullptr; } - if (!peer) return; + if (!peer) return nullptr; if (peer->IsConnected ()) - peer->sessions.front ()->SendI2NPMessages (msgs); + { + auto session = peer->sessions.front (); + if (session) session->SendI2NPMessages (msgs); + return session; + } else { auto sz = peer->delayedMessages.size (); @@ -527,7 +532,7 @@ namespace transport LogPrint (eLogWarning, "Transports: Router ", ident.ToBase64 (), " is banned. Peer dropped"); std::lock_guard l(m_PeersMutex); m_Peers.erase (ident); - return; + return nullptr; } } if (sz > MAX_NUM_DELAYED_MESSAGES/2) @@ -549,6 +554,7 @@ namespace transport m_Peers.erase (ident); } } + return nullptr; } bool Transports::ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr peer) diff --git a/libi2pd/Transports.h b/libi2pd/Transports.h index 5829b9f3..b72d5b70 100644 --- a/libi2pd/Transports.h +++ b/libi2pd/Transports.h @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -143,9 +144,9 @@ namespace transport std::shared_ptr GetNextX25519KeysPair (); void ReuseX25519KeysPair (std::shared_ptr pair); - void SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr msg); - void SendMessages (const i2p::data::IdentHash& ident, std::list >& msgs); - void SendMessages (const i2p::data::IdentHash& ident, std::list >&& msgs); + std::future > SendMessage (const i2p::data::IdentHash& ident, std::shared_ptr msg); + std::future > SendMessages (const i2p::data::IdentHash& ident, std::list >& msgs); + std::future > SendMessages (const i2p::data::IdentHash& ident, std::list >&& msgs); void PeerConnected (std::shared_ptr session); void PeerDisconnected (std::shared_ptr session); @@ -189,7 +190,7 @@ namespace transport void Run (); void RequestComplete (std::shared_ptr r, const i2p::data::IdentHash& ident); void HandleRequestComplete (std::shared_ptr r, i2p::data::IdentHash ident); - void PostMessages (const i2p::data::IdentHash& ident, std::list >& msgs); + std::shared_ptr PostMessages (const i2p::data::IdentHash& ident, std::list >& msgs); bool ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr peer); void SetPriority (std::shared_ptr peer) const; void HandlePeerCleanupTimer (const boost::system::error_code& ecode); diff --git a/libi2pd/TunnelGateway.cpp b/libi2pd/TunnelGateway.cpp index 78a63fc4..4b9e037a 100644 --- a/libi2pd/TunnelGateway.cpp +++ b/libi2pd/TunnelGateway.cpp @@ -220,6 +220,7 @@ namespace tunnel void TunnelGateway::SendBuffer () { + // create list or tunnel messages m_Buffer.CompleteCurrentTunnelDataMessage (); std::list > newTunnelMsgs; const auto& tunnelDataMsgs = m_Buffer.GetTunnelDataMsgs (); @@ -234,7 +235,36 @@ namespace tunnel m_NumSentBytes += TUNNEL_DATA_MSG_SIZE; } m_Buffer.ClearTunnelDataMsgs (); - i2p::transport::transports.SendMessages (m_Tunnel->GetNextIdentHash (), std::move (newTunnelMsgs)); + + // send + if (m_CurrentTransport && !m_CurrentTransport->IsEstablished ()) // check if session became invalid since last call + m_CurrentTransport = nullptr; + if (!m_CurrentTransport) + { + // try to obtain transport from peding reequest or send thought transport is not complete + if (m_PendingTransport.valid ()) // pending request? + { + if (m_PendingTransport.wait_for(std::chrono::seconds(0)) == std::future_status::ready) + { + // pending request complete + m_CurrentTransport = m_PendingTransport.get (); // take tarnsports used in pending request + if (m_CurrentTransport && !m_CurrentTransport->IsEstablished ()) + m_CurrentTransport = nullptr; + } + else // still pending + { + // send through transports, but don't update pedning transport + i2p::transport::transports.SendMessages (m_Tunnel->GetNextIdentHash (), std::move (newTunnelMsgs)); + return; + } + } + } + if (m_CurrentTransport) // session is good + // send to session directly + m_CurrentTransport->SendI2NPMessages (newTunnelMsgs); + else // no session yet + // send through transports + m_PendingTransport = i2p::transport::transports.SendMessages (m_Tunnel->GetNextIdentHash (), std::move (newTunnelMsgs)); } } } diff --git a/libi2pd/TunnelGateway.h b/libi2pd/TunnelGateway.h index 741bbe84..ba36ac73 100644 --- a/libi2pd/TunnelGateway.h +++ b/libi2pd/TunnelGateway.h @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2021, The PurpleI2P Project +* Copyright (c) 2013-2024, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -12,7 +12,9 @@ #include #include #include +#include #include "I2NPProtocol.h" +#include "TransportSession.h" #include "TunnelBase.h" namespace i2p @@ -57,6 +59,8 @@ namespace tunnel TunnelBase * m_Tunnel; TunnelGatewayBuffer m_Buffer; size_t m_NumSentBytes; + std::shared_ptr m_CurrentTransport; + std::future > m_PendingTransport; }; } }