diff --git a/Transports.cpp b/Transports.cpp index b8e35bcb..7414bc1d 100644 --- a/Transports.cpp +++ b/Transports.cpp @@ -193,65 +193,82 @@ namespace transport i2p::HandleI2NPMessage (msg); return; } - std::shared_ptr session = m_NTCPServer->FindNTCPSession (ident); - if (!session) + + auto it = m_Peers.find (ident); + if (it == m_Peers.end ()) { auto r = netdb.FindRouter (ident); - if (r) - { - if (m_SSUServer) - session = m_SSUServer->FindSession (r); - if (!session) + it = m_Peers.insert (std::pair(ident, { 0, r, nullptr})).first; + if (!ConnectToPeer (ident, it->second)) + { + DeleteI2NPMessage (msg); + return; + } + } + if (it->second.session) + it->second.session->SendI2NPMessage (msg); + else + it->second.delayedMessages.push_back (msg); + } + + bool Transports::ConnectToPeer (const i2p::data::IdentHash& ident, Peer& peer) + { + if (peer.router) // we have RI already + { + if (!peer.numAttempts) // NTCP + { + peer.numAttempts++; + auto address = peer.router->GetNTCPAddress (!context.SupportsV6 ()); + if (address && !peer.router->UsesIntroducer () && !peer.router->IsUnreachable ()) { - // existing session not found. create new - // try NTCP first if message size < 16K - auto address = r->GetNTCPAddress (!context.SupportsV6 ()); - if (address && !r->UsesIntroducer () && !r->IsUnreachable () && msg->GetLength () < NTCP_MAX_MESSAGE_SIZE) - { - auto s = std::make_shared (*m_NTCPServer, r); - session = s; - m_NTCPServer->Connect (address->host, address->port, s); - } - else - { - // then SSU - if (m_SSUServer) - session = m_SSUServer->GetSession (r); - if (!session) - { - LogPrint ("No NTCP and SSU addresses available"); - DeleteI2NPMessage (msg); - } - } + auto s = std::make_shared (*m_NTCPServer, peer.router); + m_NTCPServer->Connect (address->host, address->port, s); + return true; } } - else + else if (peer.numAttempts == 1)// SSU { - LogPrint ("Router not found. Requested"); - i2p::data::netdb.RequestDestination (ident); - auto resendTimer = new boost::asio::deadline_timer (m_Service); - resendTimer->expires_from_now (boost::posix_time::seconds(5)); // 5 seconds - resendTimer->async_wait (boost::bind (&Transports::HandleResendTimer, - this, boost::asio::placeholders::error, resendTimer, ident, msg)); + peer.numAttempts++; + if (m_SSUServer) + { + if (m_SSUServer->GetSession (peer.router)) + return true; + } } + LogPrint (eLogError, "No NTCP and SSU addresses available"); + m_Peers.erase (ident); + return false; + } + else // otherwise request RI + { + LogPrint ("Router not found. Requested"); + i2p::data::netdb.RequestDestination (ident); + auto resendTimer = new boost::asio::deadline_timer (m_Service); + resendTimer->expires_from_now (boost::posix_time::seconds(5)); // 5 seconds + resendTimer->async_wait (boost::bind (&Transports::HandleResendTimer, + this, boost::asio::placeholders::error, resendTimer, ident)); } - if (session) - session->SendI2NPMessage (msg); + return true; } - + void Transports::HandleResendTimer (const boost::system::error_code& ecode, - boost::asio::deadline_timer * timer, const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg) + boost::asio::deadline_timer * timer, const i2p::data::IdentHash& ident) { - auto r = netdb.FindRouter (ident); - if (r) - { - LogPrint ("Router found. Sending message"); - PostMessage (ident, msg); - } - else - { - LogPrint ("Router not found. Failed to send message"); - DeleteI2NPMessage (msg); + auto it = m_Peers.find (ident); + if (it != m_Peers.end ()) + { + auto r = netdb.FindRouter (ident); + if (r) + { + LogPrint ("Router found. Trying to connect"); + it->second.router = r; + ConnectToPeer (ident, it->second); + } + else + { + LogPrint ("Router not found. Failed to send messages"); + m_Peers.erase (it); + } } delete timer; } @@ -304,9 +321,10 @@ namespace transport it->second.session = session; for (auto it1: it->second.delayedMessages) session->SendI2NPMessage (it1); + it->second.delayedMessages.clear (); } - /* else // incoming connection - m_Peers[ident] = { nullptr, session };*/ + else // incoming connection + m_Peers[ident] = { 0, nullptr, session }; }); } @@ -317,8 +335,12 @@ namespace transport auto ident = session->GetRemoteIdentity ().GetIdentHash (); auto it = m_Peers.find (ident); if (it != m_Peers.end ()) - m_Peers.erase (it); - // TODO:: check for delayed messages + { + if (it->second.delayedMessages.size () > 0) + ConnectToPeer (ident, it->second); + else + m_Peers.erase (it); + } }); } } diff --git a/Transports.h b/Transports.h index 6ab9330e..2ea375e2 100644 --- a/Transports.h +++ b/Transports.h @@ -53,6 +53,7 @@ namespace transport struct Peer { + int numAttempts; std::shared_ptr router; std::shared_ptr session; std::list delayedMessages; @@ -88,10 +89,11 @@ namespace transport void Run (); void HandleResendTimer (const boost::system::error_code& ecode, boost::asio::deadline_timer * timer, - const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg); + const i2p::data::IdentHash& ident); void PostMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg); void PostCloseSession (std::shared_ptr router); - + bool ConnectToPeer (const i2p::data::IdentHash& ident, Peer& peer); + void DetectExternalIP (); private: