From a046af1806f4915b85f53f7ad03259a347ab67bf Mon Sep 17 00:00:00 2001 From: orignal Date: Wed, 15 Oct 2014 12:07:06 -0400 Subject: [PATCH] don't use netDb subcriptions anymore --- HTTPServer.cpp | 2 +- I2PTunnel.cpp | 57 +++++++++++++++++++++++++++++++++----------------- I2PTunnel.h | 6 +++++- NetDb.cpp | 30 -------------------------- NetDb.h | 4 ---- SOCKS.cpp | 2 +- 6 files changed, 45 insertions(+), 56 deletions(-) diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 38759c3e..fcbeb7df 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -858,7 +858,7 @@ namespace util auto leaseSet = i2p::stream::GetSharedLocalDestination ()->FindLeaseSet (destination); if (!leaseSet || !leaseSet->HasNonExpiredLeases ()) { - i2p::data::netdb.Subscribe(destination, i2p::stream::GetSharedLocalDestination ()->GetTunnelPool ()); + i2p::data::netdb.RequestDestination (destination, true, i2p::stream::GetSharedLocalDestination ()->GetTunnelPool ()); std::this_thread::sleep_for (std::chrono::seconds(10)); // wait for 10 seconds leaseSet = i2p::stream::GetSharedLocalDestination ()->FindLeaseSet (destination); if (!leaseSet || !leaseSet->HasNonExpiredLeases ()) // still no LeaseSet diff --git a/I2PTunnel.cpp b/I2PTunnel.cpp index 765bc997..6a19deb2 100644 --- a/I2PTunnel.cpp +++ b/I2PTunnel.cpp @@ -147,7 +147,8 @@ namespace stream int port, StreamingDestination * localDestination): I2PTunnel (service, localDestination ? localDestination : GetSharedLocalDestination ()), m_Acceptor (service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), port)), - m_Destination (destination), m_DestinationIdentHash (nullptr), m_RemoteLeaseSet (nullptr) + m_Timer (service), m_Destination (destination), m_DestinationIdentHash (nullptr), + m_RemoteLeaseSet (nullptr) { } @@ -161,12 +162,7 @@ namespace stream i2p::data::IdentHash identHash; if (i2p::data::netdb.GetAddressBook ().GetIdentHash (m_Destination, identHash)) m_DestinationIdentHash = new i2p::data::IdentHash (identHash); - if (m_DestinationIdentHash) - { - i2p::data::netdb.Subscribe (*m_DestinationIdentHash, GetLocalDestination ()->GetTunnelPool ()); - m_RemoteLeaseSet = GetLocalDestination ()->FindLeaseSet (*m_DestinationIdentHash); - } - else + if (!m_DestinationIdentHash) LogPrint ("I2PTunnel unknown destination ", m_Destination); m_Acceptor.listen (); Accept (); @@ -175,6 +171,7 @@ namespace stream void I2PClientTunnel::Stop () { m_Acceptor.close(); + m_Timer.cancel (); ClearConnections (); m_DestinationIdentHash = nullptr; } @@ -201,28 +198,50 @@ namespace stream if (i2p::data::netdb.GetAddressBook ().GetIdentHash (m_Destination, identHash)) { m_DestinationIdentHash = new i2p::data::IdentHash (identHash); - i2p::data::netdb.Subscribe (*m_DestinationIdentHash, GetLocalDestination ()->GetTunnelPool ()); + i2p::data::netdb.RequestDestination (*m_DestinationIdentHash, GetLocalDestination ()->GetTunnelPool ()); + m_Timer.expires_from_now (boost::posix_time::seconds (I2P_TUNNEL_DESTINATION_REQUEST_TIMEOUT)); + m_Timer.async_wait (boost::bind (&I2PClientTunnel::HandleDestinationRequestTimer, + this, boost::asio::placeholders::error, socket)); } } } - - if (m_RemoteLeaseSet) // leaseSet found - { - LogPrint ("New I2PTunnel connection"); - auto connection = new I2PTunnelConnection (this, socket, m_RemoteLeaseSet); - AddConnection (connection); - } else - { - LogPrint ("LeaseSet for I2PTunnel destination not found"); - delete socket; - } + CreateConnection (socket); Accept (); } else delete socket; } + void I2PClientTunnel::HandleDestinationRequestTimer (const boost::system::error_code& ecode, boost::asio::ip::tcp::socket * socket) + { + if (ecode != boost::asio::error::operation_aborted) + { + if (m_DestinationIdentHash) + { + m_RemoteLeaseSet = GetLocalDestination ()->FindLeaseSet (*m_DestinationIdentHash); + CreateConnection (socket); + return; + } + } + delete socket; + } + + void I2PClientTunnel::CreateConnection (boost::asio::ip::tcp::socket * socket) + { + if (m_RemoteLeaseSet) // leaseSet found + { + LogPrint ("New I2PTunnel connection"); + auto connection = new I2PTunnelConnection (this, socket, m_RemoteLeaseSet); + AddConnection (connection); + } + else + { + LogPrint ("LeaseSet for I2PTunnel destination not found"); + delete socket; + } + } + I2PServerTunnel::I2PServerTunnel (boost::asio::io_service& service, const std::string& address, int port, StreamingDestination * localDestination): I2PTunnel (service, localDestination), m_Endpoint (boost::asio::ip::address::from_string (address), port) diff --git a/I2PTunnel.h b/I2PTunnel.h index ea452c9f..ead14a9f 100644 --- a/I2PTunnel.h +++ b/I2PTunnel.h @@ -14,6 +14,7 @@ namespace stream { const size_t I2P_TUNNEL_CONNECTION_BUFFER_SIZE = 8192; const int I2P_TUNNEL_CONNECTION_MAX_IDLE = 3600; // in seconds + const int I2P_TUNNEL_DESTINATION_REQUEST_TIMEOUT = 10; // in seconds class I2PTunnel; class I2PTunnelConnection @@ -84,10 +85,13 @@ namespace stream void Accept (); void HandleAccept (const boost::system::error_code& ecode, boost::asio::ip::tcp::socket * socket); - + void HandleDestinationRequestTimer (const boost::system::error_code& ecode, boost::asio::ip::tcp::socket * socket); + void CreateConnection (boost::asio::ip::tcp::socket * socket); + private: boost::asio::ip::tcp::acceptor m_Acceptor; + boost::asio::deadline_timer m_Timer; std::string m_Destination; const i2p::data::IdentHash * m_DestinationIdentHash; const i2p::data::LeaseSet * m_RemoteLeaseSet; diff --git a/NetDb.cpp b/NetDb.cpp index ce544b55..eb7561f7 100644 --- a/NetDb.cpp +++ b/NetDb.cpp @@ -149,7 +149,6 @@ namespace data { SaveUpdated (m_NetDbPath); ManageLeaseSets (); - ValidateSubscriptions (); } lastSave = ts; } @@ -856,35 +855,6 @@ namespace data return r; } - void NetDb::Subscribe (const IdentHash& ident, i2p::tunnel::TunnelPool * pool) - { - LeaseSet * leaseSet = FindLeaseSet (ident); - if (!leaseSet) - { - LogPrint ("LeaseSet requested"); - RequestDestination (ident, true, pool); - } - m_Subscriptions[ident] = pool; - } - - void NetDb::Unsubscribe (const IdentHash& ident) - { - m_Subscriptions.erase (ident); - } - - void NetDb::ValidateSubscriptions () - { - for (auto it : m_Subscriptions) - { - LeaseSet * leaseSet = FindLeaseSet (it.first); - if (!leaseSet || leaseSet->HasExpiredLeases ()) - { - LogPrint ("LeaseSet re-requested"); - RequestDestination (it.first, true, it.second); - } - } - } - void NetDb::ManageLeaseSets () { for (auto it = m_LeaseSets.begin (); it != m_LeaseSets.end ();) diff --git a/NetDb.h b/NetDb.h index d05b155b..ca62163c 100644 --- a/NetDb.h +++ b/NetDb.h @@ -69,8 +69,6 @@ namespace data LeaseSet * FindLeaseSet (const IdentHash& destination) const; AddressBook& GetAddressBook () { return m_AddressBook; };// TODO: move AddressBook away from NetDb - void Subscribe (const IdentHash& ident, i2p::tunnel::TunnelPool * pool = nullptr); // keep LeaseSets upto date - void Unsubscribe (const IdentHash& ident); void PublishLeaseSet (const LeaseSet * leaseSet, i2p::tunnel::TunnelPool * pool); void RequestDestination (const IdentHash& destination, bool isLeaseSet = false, i2p::tunnel::TunnelPool * pool = nullptr); @@ -98,7 +96,6 @@ namespace data void Run (); // exploratory thread void Explore (int numDestinations); void Publish (); - void ValidateSubscriptions (); const RouterInfo * GetClosestFloodfill (const IdentHash& destination, const std::set& excluded) const; void ManageLeaseSets (); @@ -118,7 +115,6 @@ namespace data std::vector m_Floodfills; std::mutex m_RequestedDestinationsMutex; std::map m_RequestedDestinations; - std::map m_Subscriptions; bool m_IsRunning; int m_ReseedRetries; diff --git a/SOCKS.cpp b/SOCKS.cpp index 9968b97c..284337e5 100644 --- a/SOCKS.cpp +++ b/SOCKS.cpp @@ -159,7 +159,7 @@ namespace proxy LogPrint("--- sock4a find lease set"); m_ls = i2p::data::netdb.FindLeaseSet(m_dest); if (!m_ls || m_ls->HasNonExpiredLeases()) { - i2p::data::netdb.Subscribe(m_dest); + i2p::data::netdb.RequestDestination (m_dest, true, i2p::stream::GetSharedLocalDestination ()->GetTunnelPool ()); m_ls_timer.expires_from_now(boost::posix_time::seconds(socks_leaseset_timeout)); m_ls_timer.async_wait(boost::bind(&SOCKS4AHandler::LeaseSetTimeout, this, boost::asio::placeholders::error)); } else {