From 3f409d0e285489e5b66caf176e038af0e7fe469b Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 31 Aug 2017 09:53:31 -0400 Subject: [PATCH 01/11] add deferred ready checking for destination --- libi2pd/Destination.cpp | 396 ++++++++++++++++++++++------------------ libi2pd/Destination.h | 14 +- 2 files changed, 231 insertions(+), 179 deletions(-) diff --git a/libi2pd/Destination.cpp b/libi2pd/Destination.cpp index d45fdf47..5f17856b 100644 --- a/libi2pd/Destination.cpp +++ b/libi2pd/Destination.cpp @@ -13,9 +13,10 @@ namespace i2p namespace client { LeaseSetDestination::LeaseSetDestination (bool isPublic, const std::map * params): - m_IsRunning (false), m_Thread (nullptr), m_IsPublic (isPublic), - m_PublishReplyToken (0), m_LastSubmissionTime (0), m_PublishConfirmationTimer (m_Service), - m_PublishVerificationTimer (m_Service), m_PublishDelayTimer (m_Service), m_CleanupTimer (m_Service) + m_IsRunning (false), m_Thread (nullptr), m_IsPublic (isPublic), + m_PublishReplyToken (0), m_LastSubmissionTime (0), m_PublishConfirmationTimer (m_Service), + m_PublishVerificationTimer (m_Service), m_PublishDelayTimer (m_Service), m_CleanupTimer (m_Service), + m_ReadyCheckTimer(m_Service) { int inLen = DEFAULT_INBOUND_TUNNEL_LENGTH; int inQty = DEFAULT_INBOUND_TUNNELS_QUANTITY; @@ -83,77 +84,80 @@ namespace client LeaseSetDestination::~LeaseSetDestination () { - if (m_IsRunning) + if (m_IsRunning) Stop (); if (m_Pool) - i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); + i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); for (auto& it: m_LeaseSetRequests) it.second->Complete (nullptr); - } + } void LeaseSetDestination::Run () { while (m_IsRunning) { try - { + { m_Service.run (); } catch (std::exception& ex) { LogPrint (eLogError, "Destination: runtime exception: ", ex.what ()); - } - } - } + } + } + } bool LeaseSetDestination::Start () - { + { if (!m_IsRunning) - { + { LoadTags (); m_IsRunning = true; m_Pool->SetLocalDestination (shared_from_this ()); - m_Pool->SetActive (true); + m_Pool->SetActive (true); m_CleanupTimer.expires_from_now (boost::posix_time::minutes (DESTINATION_CLEANUP_TIMEOUT)); m_CleanupTimer.async_wait (std::bind (&LeaseSetDestination::HandleCleanupTimer, - shared_from_this (), std::placeholders::_1)); + shared_from_this (), std::placeholders::_1)); m_Thread = new std::thread (std::bind (&LeaseSetDestination::Run, shared_from_this ())); - + m_ReadyCheckTimer.expires_from_now(boost::posix_time::seconds (1)); + m_ReadyCheckTimer.async_wait(std::bind(&LeaseSetDestination::HandleReadyCheckTimer, + shared_from_this (), std::placeholders::_1)); return true; - } + } else return false; } - + bool LeaseSetDestination::Stop () - { + { if (m_IsRunning) - { + { m_CleanupTimer.cancel (); m_PublishConfirmationTimer.cancel (); - m_PublishVerificationTimer.cancel (); + m_PublishVerificationTimer.cancel (); + m_ReadyCheckTimer.cancel (); m_IsRunning = false; if (m_Pool) - { + { m_Pool->SetLocalDestination (nullptr); i2p::tunnel::tunnels.StopTunnelPool (m_Pool); - } + } m_Service.stop (); if (m_Thread) - { - m_Thread->join (); + { + m_Thread->join (); delete m_Thread; m_Thread = 0; - } + } SaveTags (); CleanUp (); // GarlicDestination return true; - } + } else return false; - } - + } + std::shared_ptr LeaseSetDestination::FindLeaseSet (const i2p::data::IdentHash& ident) { std::shared_ptr remoteLS; @@ -164,7 +168,7 @@ namespace client remoteLS = it->second; } - if (remoteLS) + if (remoteLS) { if (!remoteLS->IsExpired ()) { @@ -193,9 +197,9 @@ namespace client m_RemoteLeaseSets.erase (ident); return nullptr; } - } + } else - { + { auto ls = i2p::data::netdb.FindLeaseSet (ident); if (ls && !ls->IsExpired ()) { @@ -203,7 +207,7 @@ namespace client std::lock_guard _lock(m_RemoteLeaseSetsMutex); m_RemoteLeaseSets[ident] = ls; return ls; - } + } } return nullptr; } @@ -215,11 +219,11 @@ namespace client UpdateLeaseSet (); std::lock_guard l(m_LeaseSetMutex); return m_LeaseSet; - } + } void LeaseSetDestination::SetLeaseSet (i2p::data::LocalLeaseSet * newLeaseSet) { - { + { std::lock_guard l(m_LeaseSetMutex); m_LeaseSet.reset (newLeaseSet); } @@ -229,21 +233,26 @@ namespace client m_PublishVerificationTimer.cancel (); Publish (); } - } - + } + + void LeaseSetDestination::AddReadyCallback(ReadyCallback cb) + { + m_ReadyCallbacks.push_back(cb); + } + void LeaseSetDestination::UpdateLeaseSet () { - int numTunnels = m_Pool->GetNumInboundTunnels () + 2; // 2 backup tunnels - if (numTunnels > i2p::data::MAX_NUM_LEASES) numTunnels = i2p::data::MAX_NUM_LEASES; // 16 tunnels maximum + int numTunnels = m_Pool->GetNumInboundTunnels () + 2; // 2 backup tunnels + if (numTunnels > i2p::data::MAX_NUM_LEASES) numTunnels = i2p::data::MAX_NUM_LEASES; // 16 tunnels maximum CreateNewLeaseSet (m_Pool->GetInboundTunnels (numTunnels)); - } + } bool LeaseSetDestination::SubmitSessionKey (const uint8_t * key, const uint8_t * tag) { struct { uint8_t k[32], t[32]; - } data; + } data; memcpy (data.k, key, 32); memcpy (data.t, tag, 32); auto s = shared_from_this (); @@ -256,42 +265,42 @@ namespace client void LeaseSetDestination::ProcessGarlicMessage (std::shared_ptr msg) { - m_Service.post (std::bind (&LeaseSetDestination::HandleGarlicMessage, shared_from_this (), msg)); + m_Service.post (std::bind (&LeaseSetDestination::HandleGarlicMessage, shared_from_this (), msg)); } void LeaseSetDestination::ProcessDeliveryStatusMessage (std::shared_ptr msg) { - m_Service.post (std::bind (&LeaseSetDestination::HandleDeliveryStatusMessage, shared_from_this (), msg)); + m_Service.post (std::bind (&LeaseSetDestination::HandleDeliveryStatusMessage, shared_from_this (), msg)); } void LeaseSetDestination::HandleI2NPMessage (const uint8_t * buf, size_t len, std::shared_ptr from) { uint8_t typeID = buf[I2NP_HEADER_TYPEID_OFFSET]; switch (typeID) - { + { case eI2NPData: HandleDataMessage (buf + I2NP_HEADER_SIZE, bufbe16toh (buf + I2NP_HEADER_SIZE_OFFSET)); break; case eI2NPDeliveryStatus: // we assume tunnel tests non-encrypted HandleDeliveryStatusMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf), from)); - break; + break; case eI2NPDatabaseStore: HandleDatabaseStoreMessage (buf + I2NP_HEADER_SIZE, bufbe16toh (buf + I2NP_HEADER_SIZE_OFFSET)); break; case eI2NPDatabaseSearchReply: HandleDatabaseSearchReplyMessage (buf + I2NP_HEADER_SIZE, bufbe16toh (buf + I2NP_HEADER_SIZE_OFFSET)); - break; + break; default: i2p::HandleI2NPMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf), from)); - } - } + } + } void LeaseSetDestination::HandleDatabaseStoreMessage (const uint8_t * buf, size_t len) { uint32_t replyToken = bufbe32toh (buf + DATABASE_STORE_REPLY_TOKEN_OFFSET); size_t offset = DATABASE_STORE_HEADER_SIZE; - if (replyToken) + if (replyToken) { LogPrint (eLogInfo, "Destination: Reply token is ignored for DatabaseStore"); offset += 36; @@ -307,8 +316,8 @@ namespace client { leaseSet = it->second; if (leaseSet->IsNewer (buf + offset, len - offset)) - { - leaseSet->Update (buf + offset, len - offset); + { + leaseSet->Update (buf + offset, len - offset); if (leaseSet->IsValid () && leaseSet->GetIdentHash () == key) LogPrint (eLogDebug, "Destination: Remote LeaseSet updated"); else @@ -322,7 +331,7 @@ namespace client LogPrint (eLogDebug, "Destination: Remote LeaseSet is older. Not updated"); } else - { + { leaseSet = std::make_shared (buf + offset, len - offset); if (leaseSet->IsValid () && leaseSet->GetIdentHash () == key) { @@ -339,18 +348,18 @@ namespace client LogPrint (eLogError, "Destination: New remote LeaseSet failed"); leaseSet = nullptr; } - } - } + } + } else LogPrint (eLogError, "Destination: Unexpected client's DatabaseStore type ", buf[DATABASE_STORE_TYPE_OFFSET], ", dropped"); - + auto it1 = m_LeaseSetRequests.find (key); if (it1 != m_LeaseSetRequests.end ()) { it1->second->requestTimeoutTimer.cancel (); if (it1->second) it1->second->Complete (leaseSet); m_LeaseSetRequests.erase (it1); - } + } } void LeaseSetDestination::HandleDatabaseSearchReplyMessage (const uint8_t * buf, size_t len) @@ -364,7 +373,7 @@ namespace client auto request = it->second; bool found = false; if (request->excluded.size () < MAX_NUM_FLOODFILLS_PER_REQUEST) - { + { for (int i = 0; i < num; i++) { i2p::data::IdentHash peerHash (buf + 33 + i*32); @@ -372,28 +381,28 @@ namespace client { LogPrint (eLogInfo, "Destination: Found new floodfill, request it"); // TODO: recheck this message i2p::data::netdb.RequestDestination (peerHash); - } + } } - + auto floodfill = i2p::data::netdb.GetClosestFloodfill (key, request->excluded); if (floodfill) { LogPrint (eLogInfo, "Destination: Requesting ", key.ToBase64 (), " at ", floodfill->GetIdentHash ().ToBase64 ()); if (SendLeaseSetRequest (key, floodfill, request)) found = true; - } - } + } + } if (!found) - { + { LogPrint (eLogInfo, "Destination: ", key.ToBase64 (), " was not found on ", MAX_NUM_FLOODFILLS_PER_REQUEST, " floodfills"); request->Complete (nullptr); m_LeaseSetRequests.erase (key); - } - } - else + } + } + else LogPrint (eLogWarning, "Destination: Request for ", key.ToBase64 (), " not found"); - } - + } + void LeaseSetDestination::HandleDeliveryStatusMessage (std::shared_ptr msg) { uint32_t msgID = bufbe32toh (msg->GetPayload () + DELIVERY_STATUS_MSGID_OFFSET); @@ -405,20 +414,20 @@ namespace client // schedule verification m_PublishVerificationTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_VERIFICATION_TIMEOUT)); m_PublishVerificationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishVerificationTimer, - shared_from_this (), std::placeholders::_1)); + shared_from_this (), std::placeholders::_1)); } else i2p::garlic::GarlicDestination::HandleDeliveryStatusMessage (msg); - } + } void LeaseSetDestination::SetLeaseSetUpdated () - { + { UpdateLeaseSet (); } - + void LeaseSetDestination::Publish () - { - if (!m_LeaseSet || !m_Pool) + { + if (!m_LeaseSet || !m_Pool) { LogPrint (eLogError, "Destination: Can't publish non-existing LeaseSet"); return; @@ -435,9 +444,9 @@ namespace client m_PublishDelayTimer.cancel (); m_PublishDelayTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_MIN_INTERVAL)); m_PublishDelayTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishDelayTimer, - shared_from_this (), std::placeholders::_1)); + shared_from_this (), std::placeholders::_1)); return; - } + } auto outbound = m_Pool->GetNextOutboundTunnel (); if (!outbound) { @@ -450,28 +459,28 @@ namespace client LogPrint (eLogError, "Destination: Can't publish LeaseSet. No inbound tunnels"); return; } - auto floodfill = i2p::data::netdb.GetClosestFloodfill (m_LeaseSet->GetIdentHash (), m_ExcludedFloodfills); + auto floodfill = i2p::data::netdb.GetClosestFloodfill (m_LeaseSet->GetIdentHash (), m_ExcludedFloodfills); if (!floodfill) { LogPrint (eLogError, "Destination: Can't publish LeaseSet, no more floodfills found"); m_ExcludedFloodfills.clear (); return; - } + } m_ExcludedFloodfills.insert (floodfill->GetIdentHash ()); LogPrint (eLogDebug, "Destination: Publish LeaseSet of ", GetIdentHash ().ToBase32 ()); RAND_bytes ((uint8_t *)&m_PublishReplyToken, 4); - auto msg = WrapMessage (floodfill, i2p::CreateDatabaseStoreMsg (m_LeaseSet, m_PublishReplyToken, inbound)); + auto msg = WrapMessage (floodfill, i2p::CreateDatabaseStoreMsg (m_LeaseSet, m_PublishReplyToken, inbound)); m_PublishConfirmationTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_CONFIRMATION_TIMEOUT)); m_PublishConfirmationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishConfirmationTimer, - shared_from_this (), std::placeholders::_1)); - outbound->SendTunnelDataMsg (floodfill->GetIdentHash (), 0, msg); + shared_from_this (), std::placeholders::_1)); + outbound->SendTunnelDataMsg (floodfill->GetIdentHash (), 0, msg); m_LastSubmissionTime = ts; } void LeaseSetDestination::HandlePublishConfirmationTimer (const boost::system::error_code& ecode) { if (ecode != boost::asio::error::operation_aborted) - { + { if (m_PublishReplyToken) { LogPrint (eLogWarning, "Destination: Publish confirmation was not received in ", PUBLISH_CONFIRMATION_TIMEOUT, " seconds, will try again"); @@ -484,25 +493,25 @@ namespace client void LeaseSetDestination::HandlePublishVerificationTimer (const boost::system::error_code& ecode) { if (ecode != boost::asio::error::operation_aborted) - { + { auto s = shared_from_this (); - RequestLeaseSet (GetIdentHash (), + RequestLeaseSet (GetIdentHash (), // "this" added due to bug in gcc 4.7-4.8 [s,this](std::shared_ptr leaseSet) { - if (leaseSet) + if (leaseSet) { if (s->m_LeaseSet && *s->m_LeaseSet == *leaseSet) { // we got latest LeasetSet LogPrint (eLogDebug, "Destination: published LeaseSet verified for ", GetIdentHash().ToBase32()); s->m_PublishVerificationTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_REGULAR_VERIFICATION_INTERNAL)); - s->m_PublishVerificationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishVerificationTimer, s, std::placeholders::_1)); + s->m_PublishVerificationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishVerificationTimer, s, std::placeholders::_1)); return; - } + } else LogPrint (eLogDebug, "Destination: LeaseSet is different than just published for ", GetIdentHash().ToBase32()); - } + } else LogPrint (eLogWarning, "Destination: couldn't find published LeaseSet for ", GetIdentHash().ToBase32()); // we have to publish again @@ -515,16 +524,16 @@ namespace client { if (ecode != boost::asio::error::operation_aborted) Publish (); - } - + } + bool LeaseSetDestination::RequestDestination (const i2p::data::IdentHash& dest, RequestComplete requestComplete) { - if (!m_Pool || !IsReady ()) - { - if (requestComplete) + if (!m_Pool || !IsReady ()) + { + if (requestComplete) m_Service.post ([requestComplete](void){requestComplete (nullptr);}); return false; - } + } m_Service.post (std::bind (&LeaseSetDestination::RequestLeaseSet, shared_from_this (), dest, requestComplete)); return true; } @@ -536,14 +545,14 @@ namespace client { auto it = s->m_LeaseSetRequests.find (dest); if (it != s->m_LeaseSetRequests.end ()) - { - auto requestComplete = it->second; + { + auto requestComplete = it->second; s->m_LeaseSetRequests.erase (it); if (notify && requestComplete) requestComplete->Complete (nullptr); - } - }); + } + }); } - + void LeaseSetDestination::RequestLeaseSet (const i2p::data::IdentHash& dest, RequestComplete requestComplete) { std::set excluded; @@ -564,28 +573,28 @@ namespace client m_LeaseSetRequests.erase (ret.first); if (requestComplete) requestComplete (nullptr); } - } + } else // duplicate { LogPrint (eLogInfo, "Destination: Request of LeaseSet ", dest.ToBase64 (), " is pending already"); if (ts > ret.first->second->requestTime + MAX_LEASESET_REQUEST_TIMEOUT) - { + { // something went wrong m_LeaseSetRequests.erase (ret.first); if (requestComplete) requestComplete (nullptr); } else if (requestComplete) ret.first->second->requestComplete.push_back (requestComplete); - } - } + } + } else - { + { LogPrint (eLogError, "Destination: Can't request LeaseSet, no floodfills found"); if (requestComplete) requestComplete (nullptr); - } - } - - bool LeaseSetDestination::SendLeaseSetRequest (const i2p::data::IdentHash& dest, + } + } + + bool LeaseSetDestination::SendLeaseSetRequest (const i2p::data::IdentHash& dest, std::shared_ptr nextFloodfill, std::shared_ptr request) { if (!request->replyTunnel || !request->replyTunnel->IsEstablished ()) @@ -594,36 +603,36 @@ namespace client if (!request->outboundTunnel || !request->outboundTunnel->IsEstablished ()) request->outboundTunnel = m_Pool->GetNextOutboundTunnel (); if (!request->outboundTunnel) LogPrint (eLogError, "Destination: Can't send LeaseSet request, no outbound tunnels found"); - + if (request->replyTunnel && request->outboundTunnel) - { + { request->excluded.insert (nextFloodfill->GetIdentHash ()); request->requestTimeoutTimer.cancel (); uint8_t replyKey[32], replyTag[32]; - RAND_bytes (replyKey, 32); // random session key + RAND_bytes (replyKey, 32); // random session key RAND_bytes (replyTag, 32); // random session tag AddSessionKey (replyKey, replyTag); auto msg = WrapMessage (nextFloodfill, - CreateLeaseSetDatabaseLookupMsg (dest, request->excluded, + CreateLeaseSetDatabaseLookupMsg (dest, request->excluded, request->replyTunnel, replyKey, replyTag)); request->outboundTunnel->SendTunnelDataMsg ( { - i2p::tunnel::TunnelMessageBlock - { + i2p::tunnel::TunnelMessageBlock + { i2p::tunnel::eDeliveryTypeRouter, nextFloodfill->GetIdentHash (), 0, msg } - }); + }); request->requestTimeoutTimer.expires_from_now (boost::posix_time::seconds(LEASESET_REQUEST_TIMEOUT)); request->requestTimeoutTimer.async_wait (std::bind (&LeaseSetDestination::HandleRequestTimoutTimer, shared_from_this (), std::placeholders::_1, dest)); - } + } else return false; return true; - } + } void LeaseSetDestination::HandleRequestTimoutTimer (const boost::system::error_code& ecode, const i2p::data::IdentHash& dest) { @@ -641,26 +650,26 @@ namespace client { // reset tunnels, because one them might fail it->second->outboundTunnel = nullptr; - it->second->replyTunnel = nullptr; + it->second->replyTunnel = nullptr; done = !SendLeaseSetRequest (dest, floodfill, it->second); } else done = true; } else - { + { LogPrint (eLogWarning, "Destination: ", dest.ToBase64 (), " was not found within ", MAX_LEASESET_REQUEST_TIMEOUT, " seconds"); done = true; } - + if (done) { - auto requestComplete = it->second; + auto requestComplete = it->second; m_LeaseSetRequests.erase (it); if (requestComplete) requestComplete->Complete (nullptr); - } - } - } + } + } + } } void LeaseSetDestination::HandleCleanupTimer (const boost::system::error_code& ecode) @@ -674,7 +683,31 @@ namespace client m_CleanupTimer.async_wait (std::bind (&LeaseSetDestination::HandleCleanupTimer, shared_from_this (), std::placeholders::_1)); } - } + } + + void LeaseSetDestination::HandleReadyCheckTimer(const boost::system::error_code & ec) + { + if (ec != boost::asio::error::operation_aborted) + { + // TODO: locking ? + if(IsReady()) + { + for (auto & itr : m_ReadyCallbacks) + { + itr(ec); + } + m_ReadyCallbacks.clear(); + } + } + else + { + for (auto & itr : m_ReadyCallbacks) + { + itr(ec); + } + m_ReadyCallbacks.clear(); + } + } void LeaseSetDestination::CleanupRemoteLeaseSets () { @@ -686,11 +719,11 @@ namespace client { LogPrint (eLogWarning, "Destination: Remote LeaseSet ", it->second->GetIdentHash ().ToBase64 (), " expired"); it = m_RemoteLeaseSets.erase (it); - } - else + } + else ++it; } - } + } ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map * params): LeaseSetDestination (isPublic, params), @@ -703,26 +736,26 @@ namespace client i2p::crypto::GenerateElGamalKeyPair(m_EncryptionPrivateKey, m_EncryptionPublicKey); if (isPublic) LogPrint (eLogInfo, "Destination: Local address ", GetIdentHash().ToBase32 (), " created"); - } + } - ClientDestination::~ClientDestination () + ClientDestination::~ClientDestination () { - } - + } + bool ClientDestination::Start () { if (LeaseSetDestination::Start ()) - { + { m_StreamingDestination = std::make_shared (GetSharedFromThis ()); // TODO: - m_StreamingDestination->Start (); + m_StreamingDestination->Start (); for (auto& it: m_StreamingDestinationsByPorts) it.second->Start (); return true; - } + } else return false; - } - + } + bool ClientDestination::Stop () { if (LeaseSetDestination::Stop ()) @@ -732,21 +765,21 @@ namespace client //m_StreamingDestination->SetOwner (nullptr); m_StreamingDestination = nullptr; for (auto& it: m_StreamingDestinationsByPorts) - { + { it.second->Stop (); //it.second->SetOwner (nullptr); } m_StreamingDestinationsByPorts.clear (); if (m_DatagramDestination) - { + { delete m_DatagramDestination; m_DatagramDestination = nullptr; - } + } return true; } else return false; - } + } #ifdef I2LUA void ClientDestination::Ready(ReadyPromise & p) @@ -773,14 +806,14 @@ namespace client ScheduleCheckForReady(p); } #endif - + void ClientDestination::HandleDataMessage (const uint8_t * buf, size_t len) { uint32_t length = bufbe32toh (buf); buf += 4; // we assume I2CP payload uint16_t fromPort = bufbe16toh (buf + 4), // source - toPort = bufbe16toh (buf + 6); // destination + toPort = bufbe16toh (buf + 6); // destination switch (buf[9]) { case PROTOCOL_TYPE_STREAMING: @@ -804,28 +837,41 @@ namespace client LogPrint (eLogError, "Destination: Data: unexpected protocol ", buf[9]); } } - - void ClientDestination::CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port) + + void ClientDestination::CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port) { - if (!streamRequestComplete) + if (!streamRequestComplete) { LogPrint (eLogError, "Destination: request callback is not specified in CreateStream"); return; - } - auto leaseSet = FindLeaseSet (dest); - if (leaseSet) - streamRequestComplete(CreateStream (leaseSet, port)); + } + if(IsReady()) + { + auto leaseSet = FindLeaseSet (dest); + if (leaseSet) + streamRequestComplete(CreateStream (leaseSet, port)); + else + { + auto s = GetSharedFromThis (); + RequestDestination (dest, + [s, streamRequestComplete, port](std::shared_ptr ls) + { + if (ls) + streamRequestComplete(s->CreateStream (ls, port)); + else + streamRequestComplete (nullptr); + }); + } + } else { - auto s = GetSharedFromThis (); - RequestDestination (dest, - [s, streamRequestComplete, port](std::shared_ptr ls) - { - if (ls) - streamRequestComplete(s->CreateStream (ls, port)); + // call if tunnel is not ready + AddReadyCallback([&](const boost::system::error_code & ec) { + if(ec) + streamRequestComplete(nullptr); else - streamRequestComplete (nullptr); - }); + CreateStream(streamRequestComplete, dest, port); + }); } } @@ -837,18 +883,18 @@ namespace client return nullptr; } - std::shared_ptr ClientDestination::GetStreamingDestination (int port) const - { - if (port) + std::shared_ptr ClientDestination::GetStreamingDestination (int port) const + { + if (port) { auto it = m_StreamingDestinationsByPorts.find (port); if (it != m_StreamingDestinationsByPorts.end ()) return it->second; - } + } // if port is zero or not found, use default destination - return m_StreamingDestination; + return m_StreamingDestination; } - + void ClientDestination::AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor) { if (m_StreamingDestination) @@ -860,35 +906,35 @@ namespace client if (m_StreamingDestination) m_StreamingDestination->ResetAcceptor (); } - + bool ClientDestination::IsAcceptingStreams () const { if (m_StreamingDestination) return m_StreamingDestination->IsAcceptorSet (); return false; - } + } void ClientDestination::AcceptOnce (const i2p::stream::StreamingDestination::Acceptor& acceptor) { if (m_StreamingDestination) m_StreamingDestination->AcceptOnce (acceptor); - } - + } + std::shared_ptr ClientDestination::CreateStreamingDestination (int port, bool gzip) { - auto dest = std::make_shared (GetSharedFromThis (), port, gzip); + auto dest = std::make_shared (GetSharedFromThis (), port, gzip); if (port) m_StreamingDestinationsByPorts[port] = dest; - else // update default + else // update default m_StreamingDestination = dest; return dest; - } - + } + i2p::datagram::DatagramDestination * ClientDestination::CreateDatagramDestination () { if (m_DatagramDestination == nullptr) m_DatagramDestination = new i2p::datagram::DatagramDestination (GetSharedFromThis ()); - return m_DatagramDestination; + return m_DatagramDestination; } std::vector > ClientDestination::GetAllStreams () const @@ -898,12 +944,12 @@ namespace client { for (auto& it: m_StreamingDestination->GetStreams ()) ret.push_back (it.second); - } + } for (auto& it: m_StreamingDestinationsByPorts) for (auto& it1: it.second->GetStreams ()) ret.push_back (it1.second); return ret; - } + } void ClientDestination::PersistTemporaryKeys () { @@ -927,7 +973,7 @@ namespace client return; } LogPrint(eLogError, "Destinations: Can't save keys to ", path); - } + } void ClientDestination::CreateNewLeaseSet (std::vector > tunnels) { @@ -935,7 +981,7 @@ namespace client // sign Sign (leaseSet->GetBuffer (), leaseSet->GetBufferLen () - leaseSet->GetSignatureLen (), leaseSet->GetSignature ()); // TODO SetLeaseSet (leaseSet); - } + } void ClientDestination::CleanupDestination () { diff --git a/libi2pd/Destination.h b/libi2pd/Destination.h index ef7437fb..4e299322 100644 --- a/libi2pd/Destination.h +++ b/libi2pd/Destination.h @@ -63,6 +63,7 @@ namespace client public std::enable_shared_from_this { typedef std::function leaseSet)> RequestComplete; + typedef std::function ReadyCallback; // leaseSet = nullptr means not found struct LeaseSetRequest { @@ -108,6 +109,8 @@ namespace client void ProcessDeliveryStatusMessage (std::shared_ptr msg); void SetLeaseSetUpdated (); + void AddReadyCallback(ReadyCallback cb); + protected: void SetLeaseSet (i2p::data::LocalLeaseSet * newLeaseSet); @@ -131,7 +134,8 @@ namespace client void RequestLeaseSet (const i2p::data::IdentHash& dest, RequestComplete requestComplete); bool SendLeaseSetRequest (const i2p::data::IdentHash& dest, std::shared_ptr nextFloodfill, std::shared_ptr request); void HandleRequestTimoutTimer (const boost::system::error_code& ecode, const i2p::data::IdentHash& dest); - void HandleCleanupTimer (const boost::system::error_code& ecode); + void HandleCleanupTimer (const boost::system::error_code& ecode); + void HandleReadyCheckTimer (const boost::system::error_code& ecode); void CleanupRemoteLeaseSets (); private: @@ -152,7 +156,9 @@ namespace client std::set m_ExcludedFloodfills; // for publishing boost::asio::deadline_timer m_PublishConfirmationTimer, m_PublishVerificationTimer, - m_PublishDelayTimer, m_CleanupTimer; + m_PublishDelayTimer, m_CleanupTimer, m_ReadyCheckTimer; + + std::vector m_ReadyCallbacks; public: @@ -182,9 +188,9 @@ namespace client void Sign (const uint8_t * buf, int len, uint8_t * signature) const { m_Keys.Sign (buf, len, signature); }; // ref counter - int Acquire () { return ++m_RefCounter; }; + int Acquire () { return ++m_RefCounter; }; int Release () { return --m_RefCounter; }; - int GetRefCounter () const { return m_RefCounter; }; + int GetRefCounter () const { return m_RefCounter; }; // streaming std::shared_ptr CreateStreamingDestination (int port, bool gzip = true); // additional From f87a51034e187ae4ca192bff224ba8bc0f362d67 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 31 Aug 2017 10:07:09 -0400 Subject: [PATCH 02/11] re trigger timer --- libi2pd/Destination.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/libi2pd/Destination.cpp b/libi2pd/Destination.cpp index 5f17856b..f45e6a8a 100644 --- a/libi2pd/Destination.cpp +++ b/libi2pd/Destination.cpp @@ -698,6 +698,9 @@ namespace client } m_ReadyCallbacks.clear(); } + m_ReadyCheckTimer.expires_from_now(boost::posix_time::seconds (1)); + m_ReadyCheckTimer.async_wait(std::bind(&LeaseSetDestination::HandleReadyCheckTimer, + shared_from_this (), std::placeholders::_1)); } else { @@ -730,7 +733,7 @@ namespace client m_Keys (keys), m_DatagramDestination (nullptr), m_RefCounter (0), m_ReadyChecker(GetService()) { - if (isPublic) + if (isPublic) PersistTemporaryKeys (); else i2p::crypto::GenerateElGamalKeyPair(m_EncryptionPrivateKey, m_EncryptionPublicKey); From 897cfad399851055583d330bb7f0d4c2025fe786 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 31 Aug 2017 10:12:59 -0400 Subject: [PATCH 03/11] tabify --- libi2pd/Destination.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/libi2pd/Destination.h b/libi2pd/Destination.h index 4e299322..88770a11 100644 --- a/libi2pd/Destination.h +++ b/libi2pd/Destination.h @@ -63,7 +63,7 @@ namespace client public std::enable_shared_from_this { typedef std::function leaseSet)> RequestComplete; - typedef std::function ReadyCallback; + typedef std::function ReadyCallback; // leaseSet = nullptr means not found struct LeaseSetRequest { @@ -109,7 +109,7 @@ namespace client void ProcessDeliveryStatusMessage (std::shared_ptr msg); void SetLeaseSetUpdated (); - void AddReadyCallback(ReadyCallback cb); + void AddReadyCallback(ReadyCallback cb); protected: @@ -134,8 +134,8 @@ namespace client void RequestLeaseSet (const i2p::data::IdentHash& dest, RequestComplete requestComplete); bool SendLeaseSetRequest (const i2p::data::IdentHash& dest, std::shared_ptr nextFloodfill, std::shared_ptr request); void HandleRequestTimoutTimer (const boost::system::error_code& ecode, const i2p::data::IdentHash& dest); - void HandleCleanupTimer (const boost::system::error_code& ecode); - void HandleReadyCheckTimer (const boost::system::error_code& ecode); + void HandleCleanupTimer (const boost::system::error_code& ecode); + void HandleReadyCheckTimer (const boost::system::error_code& ecode); void CleanupRemoteLeaseSets (); private: @@ -158,7 +158,7 @@ namespace client boost::asio::deadline_timer m_PublishConfirmationTimer, m_PublishVerificationTimer, m_PublishDelayTimer, m_CleanupTimer, m_ReadyCheckTimer; - std::vector m_ReadyCallbacks; + std::vector m_ReadyCallbacks; public: From 7af3b751d45c6038dfbd6bd01b022a10934d43ea Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 31 Aug 2017 10:14:06 -0400 Subject: [PATCH 04/11] clarify --- libi2pd/Destination.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libi2pd/Destination.cpp b/libi2pd/Destination.cpp index f45e6a8a..a32e2ffa 100644 --- a/libi2pd/Destination.cpp +++ b/libi2pd/Destination.cpp @@ -868,7 +868,7 @@ namespace client } else { - // call if tunnel is not ready + // call later if tunnel is not ready AddReadyCallback([&](const boost::system::error_code & ec) { if(ec) streamRequestComplete(nullptr); From 4e4def4fb90f8a0cb1faf61a935479f7527168bf Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 31 Aug 2017 10:24:07 -0400 Subject: [PATCH 05/11] use shared from this --- libi2pd/Destination.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/libi2pd/Destination.cpp b/libi2pd/Destination.cpp index a32e2ffa..e544b082 100644 --- a/libi2pd/Destination.cpp +++ b/libi2pd/Destination.cpp @@ -869,11 +869,12 @@ namespace client else { // call later if tunnel is not ready - AddReadyCallback([&](const boost::system::error_code & ec) { + auto s = GetSharedFromThis(); + AddReadyCallback([s, streamRequestComplete, dest, port](const boost::system::error_code & ec) { if(ec) streamRequestComplete(nullptr); else - CreateStream(streamRequestComplete, dest, port); + s->CreateStream(streamRequestComplete, dest, port); }); } } From a6f62a99b9583395eaf242fe4d2c56c0d4efa06a Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 31 Aug 2017 10:37:53 -0400 Subject: [PATCH 06/11] Revert "use shared from this" This reverts commit 4e4def4fb90f8a0cb1faf61a935479f7527168bf. --- libi2pd/Destination.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/libi2pd/Destination.cpp b/libi2pd/Destination.cpp index e544b082..a32e2ffa 100644 --- a/libi2pd/Destination.cpp +++ b/libi2pd/Destination.cpp @@ -869,12 +869,11 @@ namespace client else { // call later if tunnel is not ready - auto s = GetSharedFromThis(); - AddReadyCallback([s, streamRequestComplete, dest, port](const boost::system::error_code & ec) { + AddReadyCallback([&](const boost::system::error_code & ec) { if(ec) streamRequestComplete(nullptr); else - s->CreateStream(streamRequestComplete, dest, port); + CreateStream(streamRequestComplete, dest, port); }); } } From 27782ceddd2c14c389a2772fa74e8e2c68aae90a Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 31 Aug 2017 10:37:56 -0400 Subject: [PATCH 07/11] Revert "clarify" This reverts commit 7af3b751d45c6038dfbd6bd01b022a10934d43ea. --- libi2pd/Destination.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libi2pd/Destination.cpp b/libi2pd/Destination.cpp index a32e2ffa..f45e6a8a 100644 --- a/libi2pd/Destination.cpp +++ b/libi2pd/Destination.cpp @@ -868,7 +868,7 @@ namespace client } else { - // call later if tunnel is not ready + // call if tunnel is not ready AddReadyCallback([&](const boost::system::error_code & ec) { if(ec) streamRequestComplete(nullptr); From d7e4deab4e0a1823128dd03fbba22aec52f8d43b Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 31 Aug 2017 10:37:57 -0400 Subject: [PATCH 08/11] Revert "tabify" This reverts commit 897cfad399851055583d330bb7f0d4c2025fe786. --- libi2pd/Destination.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/libi2pd/Destination.h b/libi2pd/Destination.h index 88770a11..4e299322 100644 --- a/libi2pd/Destination.h +++ b/libi2pd/Destination.h @@ -63,7 +63,7 @@ namespace client public std::enable_shared_from_this { typedef std::function leaseSet)> RequestComplete; - typedef std::function ReadyCallback; + typedef std::function ReadyCallback; // leaseSet = nullptr means not found struct LeaseSetRequest { @@ -109,7 +109,7 @@ namespace client void ProcessDeliveryStatusMessage (std::shared_ptr msg); void SetLeaseSetUpdated (); - void AddReadyCallback(ReadyCallback cb); + void AddReadyCallback(ReadyCallback cb); protected: @@ -134,8 +134,8 @@ namespace client void RequestLeaseSet (const i2p::data::IdentHash& dest, RequestComplete requestComplete); bool SendLeaseSetRequest (const i2p::data::IdentHash& dest, std::shared_ptr nextFloodfill, std::shared_ptr request); void HandleRequestTimoutTimer (const boost::system::error_code& ecode, const i2p::data::IdentHash& dest); - void HandleCleanupTimer (const boost::system::error_code& ecode); - void HandleReadyCheckTimer (const boost::system::error_code& ecode); + void HandleCleanupTimer (const boost::system::error_code& ecode); + void HandleReadyCheckTimer (const boost::system::error_code& ecode); void CleanupRemoteLeaseSets (); private: @@ -158,7 +158,7 @@ namespace client boost::asio::deadline_timer m_PublishConfirmationTimer, m_PublishVerificationTimer, m_PublishDelayTimer, m_CleanupTimer, m_ReadyCheckTimer; - std::vector m_ReadyCallbacks; + std::vector m_ReadyCallbacks; public: From 41ce9d47e5d9206bd512bcabc3190c67a83d4e1b Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 31 Aug 2017 10:37:58 -0400 Subject: [PATCH 09/11] Revert "re trigger timer" This reverts commit f87a51034e187ae4ca192bff224ba8bc0f362d67. --- libi2pd/Destination.cpp | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/libi2pd/Destination.cpp b/libi2pd/Destination.cpp index f45e6a8a..5f17856b 100644 --- a/libi2pd/Destination.cpp +++ b/libi2pd/Destination.cpp @@ -698,9 +698,6 @@ namespace client } m_ReadyCallbacks.clear(); } - m_ReadyCheckTimer.expires_from_now(boost::posix_time::seconds (1)); - m_ReadyCheckTimer.async_wait(std::bind(&LeaseSetDestination::HandleReadyCheckTimer, - shared_from_this (), std::placeholders::_1)); } else { @@ -733,7 +730,7 @@ namespace client m_Keys (keys), m_DatagramDestination (nullptr), m_RefCounter (0), m_ReadyChecker(GetService()) { - if (isPublic) + if (isPublic) PersistTemporaryKeys (); else i2p::crypto::GenerateElGamalKeyPair(m_EncryptionPrivateKey, m_EncryptionPublicKey); From 416589cc930f0646b02d0db239f60d55aa3db05a Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 31 Aug 2017 10:38:26 -0400 Subject: [PATCH 10/11] Revert "add deferred ready checking for destination" This reverts commit 3f409d0e285489e5b66caf176e038af0e7fe469b. --- libi2pd/Destination.cpp | 396 ++++++++++++++++++---------------------- libi2pd/Destination.h | 14 +- 2 files changed, 179 insertions(+), 231 deletions(-) diff --git a/libi2pd/Destination.cpp b/libi2pd/Destination.cpp index 5f17856b..d45fdf47 100644 --- a/libi2pd/Destination.cpp +++ b/libi2pd/Destination.cpp @@ -13,10 +13,9 @@ namespace i2p namespace client { LeaseSetDestination::LeaseSetDestination (bool isPublic, const std::map * params): - m_IsRunning (false), m_Thread (nullptr), m_IsPublic (isPublic), - m_PublishReplyToken (0), m_LastSubmissionTime (0), m_PublishConfirmationTimer (m_Service), - m_PublishVerificationTimer (m_Service), m_PublishDelayTimer (m_Service), m_CleanupTimer (m_Service), - m_ReadyCheckTimer(m_Service) + m_IsRunning (false), m_Thread (nullptr), m_IsPublic (isPublic), + m_PublishReplyToken (0), m_LastSubmissionTime (0), m_PublishConfirmationTimer (m_Service), + m_PublishVerificationTimer (m_Service), m_PublishDelayTimer (m_Service), m_CleanupTimer (m_Service) { int inLen = DEFAULT_INBOUND_TUNNEL_LENGTH; int inQty = DEFAULT_INBOUND_TUNNELS_QUANTITY; @@ -84,80 +83,77 @@ namespace client LeaseSetDestination::~LeaseSetDestination () { - if (m_IsRunning) + if (m_IsRunning) Stop (); if (m_Pool) - i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); + i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); for (auto& it: m_LeaseSetRequests) it.second->Complete (nullptr); - } + } void LeaseSetDestination::Run () { while (m_IsRunning) { try - { + { m_Service.run (); } catch (std::exception& ex) { LogPrint (eLogError, "Destination: runtime exception: ", ex.what ()); - } - } - } + } + } + } bool LeaseSetDestination::Start () - { + { if (!m_IsRunning) - { + { LoadTags (); m_IsRunning = true; m_Pool->SetLocalDestination (shared_from_this ()); - m_Pool->SetActive (true); + m_Pool->SetActive (true); m_CleanupTimer.expires_from_now (boost::posix_time::minutes (DESTINATION_CLEANUP_TIMEOUT)); m_CleanupTimer.async_wait (std::bind (&LeaseSetDestination::HandleCleanupTimer, - shared_from_this (), std::placeholders::_1)); + shared_from_this (), std::placeholders::_1)); m_Thread = new std::thread (std::bind (&LeaseSetDestination::Run, shared_from_this ())); - m_ReadyCheckTimer.expires_from_now(boost::posix_time::seconds (1)); - m_ReadyCheckTimer.async_wait(std::bind(&LeaseSetDestination::HandleReadyCheckTimer, - shared_from_this (), std::placeholders::_1)); + return true; - } + } else return false; } - + bool LeaseSetDestination::Stop () - { + { if (m_IsRunning) - { + { m_CleanupTimer.cancel (); m_PublishConfirmationTimer.cancel (); - m_PublishVerificationTimer.cancel (); - m_ReadyCheckTimer.cancel (); + m_PublishVerificationTimer.cancel (); m_IsRunning = false; if (m_Pool) - { + { m_Pool->SetLocalDestination (nullptr); i2p::tunnel::tunnels.StopTunnelPool (m_Pool); - } + } m_Service.stop (); if (m_Thread) - { - m_Thread->join (); + { + m_Thread->join (); delete m_Thread; m_Thread = 0; - } + } SaveTags (); CleanUp (); // GarlicDestination return true; - } + } else return false; - } - + } + std::shared_ptr LeaseSetDestination::FindLeaseSet (const i2p::data::IdentHash& ident) { std::shared_ptr remoteLS; @@ -168,7 +164,7 @@ namespace client remoteLS = it->second; } - if (remoteLS) + if (remoteLS) { if (!remoteLS->IsExpired ()) { @@ -197,9 +193,9 @@ namespace client m_RemoteLeaseSets.erase (ident); return nullptr; } - } + } else - { + { auto ls = i2p::data::netdb.FindLeaseSet (ident); if (ls && !ls->IsExpired ()) { @@ -207,7 +203,7 @@ namespace client std::lock_guard _lock(m_RemoteLeaseSetsMutex); m_RemoteLeaseSets[ident] = ls; return ls; - } + } } return nullptr; } @@ -219,11 +215,11 @@ namespace client UpdateLeaseSet (); std::lock_guard l(m_LeaseSetMutex); return m_LeaseSet; - } + } void LeaseSetDestination::SetLeaseSet (i2p::data::LocalLeaseSet * newLeaseSet) { - { + { std::lock_guard l(m_LeaseSetMutex); m_LeaseSet.reset (newLeaseSet); } @@ -233,26 +229,21 @@ namespace client m_PublishVerificationTimer.cancel (); Publish (); } - } - - void LeaseSetDestination::AddReadyCallback(ReadyCallback cb) - { - m_ReadyCallbacks.push_back(cb); - } - + } + void LeaseSetDestination::UpdateLeaseSet () { - int numTunnels = m_Pool->GetNumInboundTunnels () + 2; // 2 backup tunnels - if (numTunnels > i2p::data::MAX_NUM_LEASES) numTunnels = i2p::data::MAX_NUM_LEASES; // 16 tunnels maximum + int numTunnels = m_Pool->GetNumInboundTunnels () + 2; // 2 backup tunnels + if (numTunnels > i2p::data::MAX_NUM_LEASES) numTunnels = i2p::data::MAX_NUM_LEASES; // 16 tunnels maximum CreateNewLeaseSet (m_Pool->GetInboundTunnels (numTunnels)); - } + } bool LeaseSetDestination::SubmitSessionKey (const uint8_t * key, const uint8_t * tag) { struct { uint8_t k[32], t[32]; - } data; + } data; memcpy (data.k, key, 32); memcpy (data.t, tag, 32); auto s = shared_from_this (); @@ -265,42 +256,42 @@ namespace client void LeaseSetDestination::ProcessGarlicMessage (std::shared_ptr msg) { - m_Service.post (std::bind (&LeaseSetDestination::HandleGarlicMessage, shared_from_this (), msg)); + m_Service.post (std::bind (&LeaseSetDestination::HandleGarlicMessage, shared_from_this (), msg)); } void LeaseSetDestination::ProcessDeliveryStatusMessage (std::shared_ptr msg) { - m_Service.post (std::bind (&LeaseSetDestination::HandleDeliveryStatusMessage, shared_from_this (), msg)); + m_Service.post (std::bind (&LeaseSetDestination::HandleDeliveryStatusMessage, shared_from_this (), msg)); } void LeaseSetDestination::HandleI2NPMessage (const uint8_t * buf, size_t len, std::shared_ptr from) { uint8_t typeID = buf[I2NP_HEADER_TYPEID_OFFSET]; switch (typeID) - { + { case eI2NPData: HandleDataMessage (buf + I2NP_HEADER_SIZE, bufbe16toh (buf + I2NP_HEADER_SIZE_OFFSET)); break; case eI2NPDeliveryStatus: // we assume tunnel tests non-encrypted HandleDeliveryStatusMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf), from)); - break; + break; case eI2NPDatabaseStore: HandleDatabaseStoreMessage (buf + I2NP_HEADER_SIZE, bufbe16toh (buf + I2NP_HEADER_SIZE_OFFSET)); break; case eI2NPDatabaseSearchReply: HandleDatabaseSearchReplyMessage (buf + I2NP_HEADER_SIZE, bufbe16toh (buf + I2NP_HEADER_SIZE_OFFSET)); - break; + break; default: i2p::HandleI2NPMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf), from)); - } - } + } + } void LeaseSetDestination::HandleDatabaseStoreMessage (const uint8_t * buf, size_t len) { uint32_t replyToken = bufbe32toh (buf + DATABASE_STORE_REPLY_TOKEN_OFFSET); size_t offset = DATABASE_STORE_HEADER_SIZE; - if (replyToken) + if (replyToken) { LogPrint (eLogInfo, "Destination: Reply token is ignored for DatabaseStore"); offset += 36; @@ -316,8 +307,8 @@ namespace client { leaseSet = it->second; if (leaseSet->IsNewer (buf + offset, len - offset)) - { - leaseSet->Update (buf + offset, len - offset); + { + leaseSet->Update (buf + offset, len - offset); if (leaseSet->IsValid () && leaseSet->GetIdentHash () == key) LogPrint (eLogDebug, "Destination: Remote LeaseSet updated"); else @@ -331,7 +322,7 @@ namespace client LogPrint (eLogDebug, "Destination: Remote LeaseSet is older. Not updated"); } else - { + { leaseSet = std::make_shared (buf + offset, len - offset); if (leaseSet->IsValid () && leaseSet->GetIdentHash () == key) { @@ -348,18 +339,18 @@ namespace client LogPrint (eLogError, "Destination: New remote LeaseSet failed"); leaseSet = nullptr; } - } - } + } + } else LogPrint (eLogError, "Destination: Unexpected client's DatabaseStore type ", buf[DATABASE_STORE_TYPE_OFFSET], ", dropped"); - + auto it1 = m_LeaseSetRequests.find (key); if (it1 != m_LeaseSetRequests.end ()) { it1->second->requestTimeoutTimer.cancel (); if (it1->second) it1->second->Complete (leaseSet); m_LeaseSetRequests.erase (it1); - } + } } void LeaseSetDestination::HandleDatabaseSearchReplyMessage (const uint8_t * buf, size_t len) @@ -373,7 +364,7 @@ namespace client auto request = it->second; bool found = false; if (request->excluded.size () < MAX_NUM_FLOODFILLS_PER_REQUEST) - { + { for (int i = 0; i < num; i++) { i2p::data::IdentHash peerHash (buf + 33 + i*32); @@ -381,28 +372,28 @@ namespace client { LogPrint (eLogInfo, "Destination: Found new floodfill, request it"); // TODO: recheck this message i2p::data::netdb.RequestDestination (peerHash); - } + } } - + auto floodfill = i2p::data::netdb.GetClosestFloodfill (key, request->excluded); if (floodfill) { LogPrint (eLogInfo, "Destination: Requesting ", key.ToBase64 (), " at ", floodfill->GetIdentHash ().ToBase64 ()); if (SendLeaseSetRequest (key, floodfill, request)) found = true; - } - } + } + } if (!found) - { + { LogPrint (eLogInfo, "Destination: ", key.ToBase64 (), " was not found on ", MAX_NUM_FLOODFILLS_PER_REQUEST, " floodfills"); request->Complete (nullptr); m_LeaseSetRequests.erase (key); - } - } - else + } + } + else LogPrint (eLogWarning, "Destination: Request for ", key.ToBase64 (), " not found"); - } - + } + void LeaseSetDestination::HandleDeliveryStatusMessage (std::shared_ptr msg) { uint32_t msgID = bufbe32toh (msg->GetPayload () + DELIVERY_STATUS_MSGID_OFFSET); @@ -414,20 +405,20 @@ namespace client // schedule verification m_PublishVerificationTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_VERIFICATION_TIMEOUT)); m_PublishVerificationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishVerificationTimer, - shared_from_this (), std::placeholders::_1)); + shared_from_this (), std::placeholders::_1)); } else i2p::garlic::GarlicDestination::HandleDeliveryStatusMessage (msg); - } + } void LeaseSetDestination::SetLeaseSetUpdated () - { + { UpdateLeaseSet (); } - + void LeaseSetDestination::Publish () - { - if (!m_LeaseSet || !m_Pool) + { + if (!m_LeaseSet || !m_Pool) { LogPrint (eLogError, "Destination: Can't publish non-existing LeaseSet"); return; @@ -444,9 +435,9 @@ namespace client m_PublishDelayTimer.cancel (); m_PublishDelayTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_MIN_INTERVAL)); m_PublishDelayTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishDelayTimer, - shared_from_this (), std::placeholders::_1)); + shared_from_this (), std::placeholders::_1)); return; - } + } auto outbound = m_Pool->GetNextOutboundTunnel (); if (!outbound) { @@ -459,28 +450,28 @@ namespace client LogPrint (eLogError, "Destination: Can't publish LeaseSet. No inbound tunnels"); return; } - auto floodfill = i2p::data::netdb.GetClosestFloodfill (m_LeaseSet->GetIdentHash (), m_ExcludedFloodfills); + auto floodfill = i2p::data::netdb.GetClosestFloodfill (m_LeaseSet->GetIdentHash (), m_ExcludedFloodfills); if (!floodfill) { LogPrint (eLogError, "Destination: Can't publish LeaseSet, no more floodfills found"); m_ExcludedFloodfills.clear (); return; - } + } m_ExcludedFloodfills.insert (floodfill->GetIdentHash ()); LogPrint (eLogDebug, "Destination: Publish LeaseSet of ", GetIdentHash ().ToBase32 ()); RAND_bytes ((uint8_t *)&m_PublishReplyToken, 4); - auto msg = WrapMessage (floodfill, i2p::CreateDatabaseStoreMsg (m_LeaseSet, m_PublishReplyToken, inbound)); + auto msg = WrapMessage (floodfill, i2p::CreateDatabaseStoreMsg (m_LeaseSet, m_PublishReplyToken, inbound)); m_PublishConfirmationTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_CONFIRMATION_TIMEOUT)); m_PublishConfirmationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishConfirmationTimer, - shared_from_this (), std::placeholders::_1)); - outbound->SendTunnelDataMsg (floodfill->GetIdentHash (), 0, msg); + shared_from_this (), std::placeholders::_1)); + outbound->SendTunnelDataMsg (floodfill->GetIdentHash (), 0, msg); m_LastSubmissionTime = ts; } void LeaseSetDestination::HandlePublishConfirmationTimer (const boost::system::error_code& ecode) { if (ecode != boost::asio::error::operation_aborted) - { + { if (m_PublishReplyToken) { LogPrint (eLogWarning, "Destination: Publish confirmation was not received in ", PUBLISH_CONFIRMATION_TIMEOUT, " seconds, will try again"); @@ -493,25 +484,25 @@ namespace client void LeaseSetDestination::HandlePublishVerificationTimer (const boost::system::error_code& ecode) { if (ecode != boost::asio::error::operation_aborted) - { + { auto s = shared_from_this (); - RequestLeaseSet (GetIdentHash (), + RequestLeaseSet (GetIdentHash (), // "this" added due to bug in gcc 4.7-4.8 [s,this](std::shared_ptr leaseSet) { - if (leaseSet) + if (leaseSet) { if (s->m_LeaseSet && *s->m_LeaseSet == *leaseSet) { // we got latest LeasetSet LogPrint (eLogDebug, "Destination: published LeaseSet verified for ", GetIdentHash().ToBase32()); s->m_PublishVerificationTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_REGULAR_VERIFICATION_INTERNAL)); - s->m_PublishVerificationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishVerificationTimer, s, std::placeholders::_1)); + s->m_PublishVerificationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishVerificationTimer, s, std::placeholders::_1)); return; - } + } else LogPrint (eLogDebug, "Destination: LeaseSet is different than just published for ", GetIdentHash().ToBase32()); - } + } else LogPrint (eLogWarning, "Destination: couldn't find published LeaseSet for ", GetIdentHash().ToBase32()); // we have to publish again @@ -524,16 +515,16 @@ namespace client { if (ecode != boost::asio::error::operation_aborted) Publish (); - } - + } + bool LeaseSetDestination::RequestDestination (const i2p::data::IdentHash& dest, RequestComplete requestComplete) { - if (!m_Pool || !IsReady ()) - { - if (requestComplete) + if (!m_Pool || !IsReady ()) + { + if (requestComplete) m_Service.post ([requestComplete](void){requestComplete (nullptr);}); return false; - } + } m_Service.post (std::bind (&LeaseSetDestination::RequestLeaseSet, shared_from_this (), dest, requestComplete)); return true; } @@ -545,14 +536,14 @@ namespace client { auto it = s->m_LeaseSetRequests.find (dest); if (it != s->m_LeaseSetRequests.end ()) - { - auto requestComplete = it->second; + { + auto requestComplete = it->second; s->m_LeaseSetRequests.erase (it); if (notify && requestComplete) requestComplete->Complete (nullptr); - } - }); + } + }); } - + void LeaseSetDestination::RequestLeaseSet (const i2p::data::IdentHash& dest, RequestComplete requestComplete) { std::set excluded; @@ -573,28 +564,28 @@ namespace client m_LeaseSetRequests.erase (ret.first); if (requestComplete) requestComplete (nullptr); } - } + } else // duplicate { LogPrint (eLogInfo, "Destination: Request of LeaseSet ", dest.ToBase64 (), " is pending already"); if (ts > ret.first->second->requestTime + MAX_LEASESET_REQUEST_TIMEOUT) - { + { // something went wrong m_LeaseSetRequests.erase (ret.first); if (requestComplete) requestComplete (nullptr); } else if (requestComplete) ret.first->second->requestComplete.push_back (requestComplete); - } - } + } + } else - { + { LogPrint (eLogError, "Destination: Can't request LeaseSet, no floodfills found"); if (requestComplete) requestComplete (nullptr); - } - } - - bool LeaseSetDestination::SendLeaseSetRequest (const i2p::data::IdentHash& dest, + } + } + + bool LeaseSetDestination::SendLeaseSetRequest (const i2p::data::IdentHash& dest, std::shared_ptr nextFloodfill, std::shared_ptr request) { if (!request->replyTunnel || !request->replyTunnel->IsEstablished ()) @@ -603,36 +594,36 @@ namespace client if (!request->outboundTunnel || !request->outboundTunnel->IsEstablished ()) request->outboundTunnel = m_Pool->GetNextOutboundTunnel (); if (!request->outboundTunnel) LogPrint (eLogError, "Destination: Can't send LeaseSet request, no outbound tunnels found"); - + if (request->replyTunnel && request->outboundTunnel) - { + { request->excluded.insert (nextFloodfill->GetIdentHash ()); request->requestTimeoutTimer.cancel (); uint8_t replyKey[32], replyTag[32]; - RAND_bytes (replyKey, 32); // random session key + RAND_bytes (replyKey, 32); // random session key RAND_bytes (replyTag, 32); // random session tag AddSessionKey (replyKey, replyTag); auto msg = WrapMessage (nextFloodfill, - CreateLeaseSetDatabaseLookupMsg (dest, request->excluded, + CreateLeaseSetDatabaseLookupMsg (dest, request->excluded, request->replyTunnel, replyKey, replyTag)); request->outboundTunnel->SendTunnelDataMsg ( { - i2p::tunnel::TunnelMessageBlock - { + i2p::tunnel::TunnelMessageBlock + { i2p::tunnel::eDeliveryTypeRouter, nextFloodfill->GetIdentHash (), 0, msg } - }); + }); request->requestTimeoutTimer.expires_from_now (boost::posix_time::seconds(LEASESET_REQUEST_TIMEOUT)); request->requestTimeoutTimer.async_wait (std::bind (&LeaseSetDestination::HandleRequestTimoutTimer, shared_from_this (), std::placeholders::_1, dest)); - } + } else return false; return true; - } + } void LeaseSetDestination::HandleRequestTimoutTimer (const boost::system::error_code& ecode, const i2p::data::IdentHash& dest) { @@ -650,26 +641,26 @@ namespace client { // reset tunnels, because one them might fail it->second->outboundTunnel = nullptr; - it->second->replyTunnel = nullptr; + it->second->replyTunnel = nullptr; done = !SendLeaseSetRequest (dest, floodfill, it->second); } else done = true; } else - { + { LogPrint (eLogWarning, "Destination: ", dest.ToBase64 (), " was not found within ", MAX_LEASESET_REQUEST_TIMEOUT, " seconds"); done = true; } - + if (done) { - auto requestComplete = it->second; + auto requestComplete = it->second; m_LeaseSetRequests.erase (it); if (requestComplete) requestComplete->Complete (nullptr); - } - } - } + } + } + } } void LeaseSetDestination::HandleCleanupTimer (const boost::system::error_code& ecode) @@ -683,31 +674,7 @@ namespace client m_CleanupTimer.async_wait (std::bind (&LeaseSetDestination::HandleCleanupTimer, shared_from_this (), std::placeholders::_1)); } - } - - void LeaseSetDestination::HandleReadyCheckTimer(const boost::system::error_code & ec) - { - if (ec != boost::asio::error::operation_aborted) - { - // TODO: locking ? - if(IsReady()) - { - for (auto & itr : m_ReadyCallbacks) - { - itr(ec); - } - m_ReadyCallbacks.clear(); - } - } - else - { - for (auto & itr : m_ReadyCallbacks) - { - itr(ec); - } - m_ReadyCallbacks.clear(); - } - } + } void LeaseSetDestination::CleanupRemoteLeaseSets () { @@ -719,11 +686,11 @@ namespace client { LogPrint (eLogWarning, "Destination: Remote LeaseSet ", it->second->GetIdentHash ().ToBase64 (), " expired"); it = m_RemoteLeaseSets.erase (it); - } - else + } + else ++it; } - } + } ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map * params): LeaseSetDestination (isPublic, params), @@ -736,26 +703,26 @@ namespace client i2p::crypto::GenerateElGamalKeyPair(m_EncryptionPrivateKey, m_EncryptionPublicKey); if (isPublic) LogPrint (eLogInfo, "Destination: Local address ", GetIdentHash().ToBase32 (), " created"); - } + } - ClientDestination::~ClientDestination () + ClientDestination::~ClientDestination () { - } - + } + bool ClientDestination::Start () { if (LeaseSetDestination::Start ()) - { + { m_StreamingDestination = std::make_shared (GetSharedFromThis ()); // TODO: - m_StreamingDestination->Start (); + m_StreamingDestination->Start (); for (auto& it: m_StreamingDestinationsByPorts) it.second->Start (); return true; - } + } else return false; - } - + } + bool ClientDestination::Stop () { if (LeaseSetDestination::Stop ()) @@ -765,21 +732,21 @@ namespace client //m_StreamingDestination->SetOwner (nullptr); m_StreamingDestination = nullptr; for (auto& it: m_StreamingDestinationsByPorts) - { + { it.second->Stop (); //it.second->SetOwner (nullptr); } m_StreamingDestinationsByPorts.clear (); if (m_DatagramDestination) - { + { delete m_DatagramDestination; m_DatagramDestination = nullptr; - } + } return true; } else return false; - } + } #ifdef I2LUA void ClientDestination::Ready(ReadyPromise & p) @@ -806,14 +773,14 @@ namespace client ScheduleCheckForReady(p); } #endif - + void ClientDestination::HandleDataMessage (const uint8_t * buf, size_t len) { uint32_t length = bufbe32toh (buf); buf += 4; // we assume I2CP payload uint16_t fromPort = bufbe16toh (buf + 4), // source - toPort = bufbe16toh (buf + 6); // destination + toPort = bufbe16toh (buf + 6); // destination switch (buf[9]) { case PROTOCOL_TYPE_STREAMING: @@ -837,41 +804,28 @@ namespace client LogPrint (eLogError, "Destination: Data: unexpected protocol ", buf[9]); } } - - void ClientDestination::CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port) + + void ClientDestination::CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port) { - if (!streamRequestComplete) + if (!streamRequestComplete) { LogPrint (eLogError, "Destination: request callback is not specified in CreateStream"); return; - } - if(IsReady()) - { - auto leaseSet = FindLeaseSet (dest); - if (leaseSet) - streamRequestComplete(CreateStream (leaseSet, port)); - else - { - auto s = GetSharedFromThis (); - RequestDestination (dest, - [s, streamRequestComplete, port](std::shared_ptr ls) - { - if (ls) - streamRequestComplete(s->CreateStream (ls, port)); - else - streamRequestComplete (nullptr); - }); - } - } + } + auto leaseSet = FindLeaseSet (dest); + if (leaseSet) + streamRequestComplete(CreateStream (leaseSet, port)); else { - // call if tunnel is not ready - AddReadyCallback([&](const boost::system::error_code & ec) { - if(ec) - streamRequestComplete(nullptr); + auto s = GetSharedFromThis (); + RequestDestination (dest, + [s, streamRequestComplete, port](std::shared_ptr ls) + { + if (ls) + streamRequestComplete(s->CreateStream (ls, port)); else - CreateStream(streamRequestComplete, dest, port); - }); + streamRequestComplete (nullptr); + }); } } @@ -883,18 +837,18 @@ namespace client return nullptr; } - std::shared_ptr ClientDestination::GetStreamingDestination (int port) const - { - if (port) + std::shared_ptr ClientDestination::GetStreamingDestination (int port) const + { + if (port) { auto it = m_StreamingDestinationsByPorts.find (port); if (it != m_StreamingDestinationsByPorts.end ()) return it->second; - } + } // if port is zero or not found, use default destination - return m_StreamingDestination; + return m_StreamingDestination; } - + void ClientDestination::AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor) { if (m_StreamingDestination) @@ -906,35 +860,35 @@ namespace client if (m_StreamingDestination) m_StreamingDestination->ResetAcceptor (); } - + bool ClientDestination::IsAcceptingStreams () const { if (m_StreamingDestination) return m_StreamingDestination->IsAcceptorSet (); return false; - } + } void ClientDestination::AcceptOnce (const i2p::stream::StreamingDestination::Acceptor& acceptor) { if (m_StreamingDestination) m_StreamingDestination->AcceptOnce (acceptor); - } - + } + std::shared_ptr ClientDestination::CreateStreamingDestination (int port, bool gzip) { - auto dest = std::make_shared (GetSharedFromThis (), port, gzip); + auto dest = std::make_shared (GetSharedFromThis (), port, gzip); if (port) m_StreamingDestinationsByPorts[port] = dest; - else // update default + else // update default m_StreamingDestination = dest; return dest; - } - + } + i2p::datagram::DatagramDestination * ClientDestination::CreateDatagramDestination () { if (m_DatagramDestination == nullptr) m_DatagramDestination = new i2p::datagram::DatagramDestination (GetSharedFromThis ()); - return m_DatagramDestination; + return m_DatagramDestination; } std::vector > ClientDestination::GetAllStreams () const @@ -944,12 +898,12 @@ namespace client { for (auto& it: m_StreamingDestination->GetStreams ()) ret.push_back (it.second); - } + } for (auto& it: m_StreamingDestinationsByPorts) for (auto& it1: it.second->GetStreams ()) ret.push_back (it1.second); return ret; - } + } void ClientDestination::PersistTemporaryKeys () { @@ -973,7 +927,7 @@ namespace client return; } LogPrint(eLogError, "Destinations: Can't save keys to ", path); - } + } void ClientDestination::CreateNewLeaseSet (std::vector > tunnels) { @@ -981,7 +935,7 @@ namespace client // sign Sign (leaseSet->GetBuffer (), leaseSet->GetBufferLen () - leaseSet->GetSignatureLen (), leaseSet->GetSignature ()); // TODO SetLeaseSet (leaseSet); - } + } void ClientDestination::CleanupDestination () { diff --git a/libi2pd/Destination.h b/libi2pd/Destination.h index 4e299322..ef7437fb 100644 --- a/libi2pd/Destination.h +++ b/libi2pd/Destination.h @@ -63,7 +63,6 @@ namespace client public std::enable_shared_from_this { typedef std::function leaseSet)> RequestComplete; - typedef std::function ReadyCallback; // leaseSet = nullptr means not found struct LeaseSetRequest { @@ -109,8 +108,6 @@ namespace client void ProcessDeliveryStatusMessage (std::shared_ptr msg); void SetLeaseSetUpdated (); - void AddReadyCallback(ReadyCallback cb); - protected: void SetLeaseSet (i2p::data::LocalLeaseSet * newLeaseSet); @@ -134,8 +131,7 @@ namespace client void RequestLeaseSet (const i2p::data::IdentHash& dest, RequestComplete requestComplete); bool SendLeaseSetRequest (const i2p::data::IdentHash& dest, std::shared_ptr nextFloodfill, std::shared_ptr request); void HandleRequestTimoutTimer (const boost::system::error_code& ecode, const i2p::data::IdentHash& dest); - void HandleCleanupTimer (const boost::system::error_code& ecode); - void HandleReadyCheckTimer (const boost::system::error_code& ecode); + void HandleCleanupTimer (const boost::system::error_code& ecode); void CleanupRemoteLeaseSets (); private: @@ -156,9 +152,7 @@ namespace client std::set m_ExcludedFloodfills; // for publishing boost::asio::deadline_timer m_PublishConfirmationTimer, m_PublishVerificationTimer, - m_PublishDelayTimer, m_CleanupTimer, m_ReadyCheckTimer; - - std::vector m_ReadyCallbacks; + m_PublishDelayTimer, m_CleanupTimer; public: @@ -188,9 +182,9 @@ namespace client void Sign (const uint8_t * buf, int len, uint8_t * signature) const { m_Keys.Sign (buf, len, signature); }; // ref counter - int Acquire () { return ++m_RefCounter; }; + int Acquire () { return ++m_RefCounter; }; int Release () { return --m_RefCounter; }; - int GetRefCounter () const { return m_RefCounter; }; + int GetRefCounter () const { return m_RefCounter; }; // streaming std::shared_ptr CreateStreamingDestination (int port, bool gzip = true); // additional From 1ea6d2016d58ae841d183c21316a68df35cc5340 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 31 Aug 2017 12:08:22 -0400 Subject: [PATCH 11/11] add initial connection timeout for i2ptunnel --- libi2pd/Destination.cpp | 328 +++++++++++++++---------------- libi2pd_client/ClientContext.cpp | 25 ++- libi2pd_client/ClientContext.h | 1 + libi2pd_client/I2PService.cpp | 134 ++++++++++--- libi2pd_client/I2PService.h | 24 ++- libi2pd_client/I2PTunnel.cpp | 194 +++++++++--------- 6 files changed, 408 insertions(+), 298 deletions(-) diff --git a/libi2pd/Destination.cpp b/libi2pd/Destination.cpp index d45fdf47..b0e3365a 100644 --- a/libi2pd/Destination.cpp +++ b/libi2pd/Destination.cpp @@ -13,8 +13,8 @@ namespace i2p namespace client { LeaseSetDestination::LeaseSetDestination (bool isPublic, const std::map * params): - m_IsRunning (false), m_Thread (nullptr), m_IsPublic (isPublic), - m_PublishReplyToken (0), m_LastSubmissionTime (0), m_PublishConfirmationTimer (m_Service), + m_IsRunning (false), m_Thread (nullptr), m_IsPublic (isPublic), + m_PublishReplyToken (0), m_LastSubmissionTime (0), m_PublishConfirmationTimer (m_Service), m_PublishVerificationTimer (m_Service), m_PublishDelayTimer (m_Service), m_CleanupTimer (m_Service) { int inLen = DEFAULT_INBOUND_TUNNEL_LENGTH; @@ -83,77 +83,77 @@ namespace client LeaseSetDestination::~LeaseSetDestination () { - if (m_IsRunning) + if (m_IsRunning) Stop (); if (m_Pool) - i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); + i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); for (auto& it: m_LeaseSetRequests) it.second->Complete (nullptr); - } + } void LeaseSetDestination::Run () { while (m_IsRunning) { try - { + { m_Service.run (); } catch (std::exception& ex) { LogPrint (eLogError, "Destination: runtime exception: ", ex.what ()); - } - } - } + } + } + } bool LeaseSetDestination::Start () - { + { if (!m_IsRunning) - { + { LoadTags (); m_IsRunning = true; m_Pool->SetLocalDestination (shared_from_this ()); - m_Pool->SetActive (true); + m_Pool->SetActive (true); m_CleanupTimer.expires_from_now (boost::posix_time::minutes (DESTINATION_CLEANUP_TIMEOUT)); m_CleanupTimer.async_wait (std::bind (&LeaseSetDestination::HandleCleanupTimer, - shared_from_this (), std::placeholders::_1)); + shared_from_this (), std::placeholders::_1)); m_Thread = new std::thread (std::bind (&LeaseSetDestination::Run, shared_from_this ())); - + return true; - } + } else return false; } - + bool LeaseSetDestination::Stop () - { + { if (m_IsRunning) - { + { m_CleanupTimer.cancel (); m_PublishConfirmationTimer.cancel (); - m_PublishVerificationTimer.cancel (); + m_PublishVerificationTimer.cancel (); m_IsRunning = false; if (m_Pool) - { + { m_Pool->SetLocalDestination (nullptr); i2p::tunnel::tunnels.StopTunnelPool (m_Pool); - } + } m_Service.stop (); if (m_Thread) - { - m_Thread->join (); + { + m_Thread->join (); delete m_Thread; m_Thread = 0; - } + } SaveTags (); CleanUp (); // GarlicDestination return true; - } + } else return false; - } - + } + std::shared_ptr LeaseSetDestination::FindLeaseSet (const i2p::data::IdentHash& ident) { std::shared_ptr remoteLS; @@ -164,7 +164,7 @@ namespace client remoteLS = it->second; } - if (remoteLS) + if (remoteLS) { if (!remoteLS->IsExpired ()) { @@ -193,9 +193,9 @@ namespace client m_RemoteLeaseSets.erase (ident); return nullptr; } - } + } else - { + { auto ls = i2p::data::netdb.FindLeaseSet (ident); if (ls && !ls->IsExpired ()) { @@ -203,7 +203,7 @@ namespace client std::lock_guard _lock(m_RemoteLeaseSetsMutex); m_RemoteLeaseSets[ident] = ls; return ls; - } + } } return nullptr; } @@ -215,11 +215,11 @@ namespace client UpdateLeaseSet (); std::lock_guard l(m_LeaseSetMutex); return m_LeaseSet; - } + } void LeaseSetDestination::SetLeaseSet (i2p::data::LocalLeaseSet * newLeaseSet) { - { + { std::lock_guard l(m_LeaseSetMutex); m_LeaseSet.reset (newLeaseSet); } @@ -229,21 +229,21 @@ namespace client m_PublishVerificationTimer.cancel (); Publish (); } - } - + } + void LeaseSetDestination::UpdateLeaseSet () { - int numTunnels = m_Pool->GetNumInboundTunnels () + 2; // 2 backup tunnels - if (numTunnels > i2p::data::MAX_NUM_LEASES) numTunnels = i2p::data::MAX_NUM_LEASES; // 16 tunnels maximum + int numTunnels = m_Pool->GetNumInboundTunnels () + 2; // 2 backup tunnels + if (numTunnels > i2p::data::MAX_NUM_LEASES) numTunnels = i2p::data::MAX_NUM_LEASES; // 16 tunnels maximum CreateNewLeaseSet (m_Pool->GetInboundTunnels (numTunnels)); - } + } bool LeaseSetDestination::SubmitSessionKey (const uint8_t * key, const uint8_t * tag) { struct { uint8_t k[32], t[32]; - } data; + } data; memcpy (data.k, key, 32); memcpy (data.t, tag, 32); auto s = shared_from_this (); @@ -256,42 +256,42 @@ namespace client void LeaseSetDestination::ProcessGarlicMessage (std::shared_ptr msg) { - m_Service.post (std::bind (&LeaseSetDestination::HandleGarlicMessage, shared_from_this (), msg)); + m_Service.post (std::bind (&LeaseSetDestination::HandleGarlicMessage, shared_from_this (), msg)); } void LeaseSetDestination::ProcessDeliveryStatusMessage (std::shared_ptr msg) { - m_Service.post (std::bind (&LeaseSetDestination::HandleDeliveryStatusMessage, shared_from_this (), msg)); + m_Service.post (std::bind (&LeaseSetDestination::HandleDeliveryStatusMessage, shared_from_this (), msg)); } void LeaseSetDestination::HandleI2NPMessage (const uint8_t * buf, size_t len, std::shared_ptr from) { uint8_t typeID = buf[I2NP_HEADER_TYPEID_OFFSET]; switch (typeID) - { + { case eI2NPData: HandleDataMessage (buf + I2NP_HEADER_SIZE, bufbe16toh (buf + I2NP_HEADER_SIZE_OFFSET)); break; case eI2NPDeliveryStatus: // we assume tunnel tests non-encrypted HandleDeliveryStatusMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf), from)); - break; + break; case eI2NPDatabaseStore: HandleDatabaseStoreMessage (buf + I2NP_HEADER_SIZE, bufbe16toh (buf + I2NP_HEADER_SIZE_OFFSET)); break; case eI2NPDatabaseSearchReply: HandleDatabaseSearchReplyMessage (buf + I2NP_HEADER_SIZE, bufbe16toh (buf + I2NP_HEADER_SIZE_OFFSET)); - break; + break; default: i2p::HandleI2NPMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf), from)); - } - } + } + } void LeaseSetDestination::HandleDatabaseStoreMessage (const uint8_t * buf, size_t len) { uint32_t replyToken = bufbe32toh (buf + DATABASE_STORE_REPLY_TOKEN_OFFSET); size_t offset = DATABASE_STORE_HEADER_SIZE; - if (replyToken) + if (replyToken) { LogPrint (eLogInfo, "Destination: Reply token is ignored for DatabaseStore"); offset += 36; @@ -307,8 +307,8 @@ namespace client { leaseSet = it->second; if (leaseSet->IsNewer (buf + offset, len - offset)) - { - leaseSet->Update (buf + offset, len - offset); + { + leaseSet->Update (buf + offset, len - offset); if (leaseSet->IsValid () && leaseSet->GetIdentHash () == key) LogPrint (eLogDebug, "Destination: Remote LeaseSet updated"); else @@ -322,7 +322,7 @@ namespace client LogPrint (eLogDebug, "Destination: Remote LeaseSet is older. Not updated"); } else - { + { leaseSet = std::make_shared (buf + offset, len - offset); if (leaseSet->IsValid () && leaseSet->GetIdentHash () == key) { @@ -339,18 +339,18 @@ namespace client LogPrint (eLogError, "Destination: New remote LeaseSet failed"); leaseSet = nullptr; } - } - } + } + } else LogPrint (eLogError, "Destination: Unexpected client's DatabaseStore type ", buf[DATABASE_STORE_TYPE_OFFSET], ", dropped"); - + auto it1 = m_LeaseSetRequests.find (key); if (it1 != m_LeaseSetRequests.end ()) { it1->second->requestTimeoutTimer.cancel (); if (it1->second) it1->second->Complete (leaseSet); m_LeaseSetRequests.erase (it1); - } + } } void LeaseSetDestination::HandleDatabaseSearchReplyMessage (const uint8_t * buf, size_t len) @@ -364,7 +364,7 @@ namespace client auto request = it->second; bool found = false; if (request->excluded.size () < MAX_NUM_FLOODFILLS_PER_REQUEST) - { + { for (int i = 0; i < num; i++) { i2p::data::IdentHash peerHash (buf + 33 + i*32); @@ -372,28 +372,28 @@ namespace client { LogPrint (eLogInfo, "Destination: Found new floodfill, request it"); // TODO: recheck this message i2p::data::netdb.RequestDestination (peerHash); - } + } } - + auto floodfill = i2p::data::netdb.GetClosestFloodfill (key, request->excluded); if (floodfill) { LogPrint (eLogInfo, "Destination: Requesting ", key.ToBase64 (), " at ", floodfill->GetIdentHash ().ToBase64 ()); if (SendLeaseSetRequest (key, floodfill, request)) found = true; - } - } + } + } if (!found) - { + { LogPrint (eLogInfo, "Destination: ", key.ToBase64 (), " was not found on ", MAX_NUM_FLOODFILLS_PER_REQUEST, " floodfills"); request->Complete (nullptr); m_LeaseSetRequests.erase (key); - } - } - else + } + } + else LogPrint (eLogWarning, "Destination: Request for ", key.ToBase64 (), " not found"); - } - + } + void LeaseSetDestination::HandleDeliveryStatusMessage (std::shared_ptr msg) { uint32_t msgID = bufbe32toh (msg->GetPayload () + DELIVERY_STATUS_MSGID_OFFSET); @@ -405,20 +405,20 @@ namespace client // schedule verification m_PublishVerificationTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_VERIFICATION_TIMEOUT)); m_PublishVerificationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishVerificationTimer, - shared_from_this (), std::placeholders::_1)); + shared_from_this (), std::placeholders::_1)); } else i2p::garlic::GarlicDestination::HandleDeliveryStatusMessage (msg); - } + } void LeaseSetDestination::SetLeaseSetUpdated () - { + { UpdateLeaseSet (); } - + void LeaseSetDestination::Publish () - { - if (!m_LeaseSet || !m_Pool) + { + if (!m_LeaseSet || !m_Pool) { LogPrint (eLogError, "Destination: Can't publish non-existing LeaseSet"); return; @@ -435,9 +435,9 @@ namespace client m_PublishDelayTimer.cancel (); m_PublishDelayTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_MIN_INTERVAL)); m_PublishDelayTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishDelayTimer, - shared_from_this (), std::placeholders::_1)); + shared_from_this (), std::placeholders::_1)); return; - } + } auto outbound = m_Pool->GetNextOutboundTunnel (); if (!outbound) { @@ -450,28 +450,28 @@ namespace client LogPrint (eLogError, "Destination: Can't publish LeaseSet. No inbound tunnels"); return; } - auto floodfill = i2p::data::netdb.GetClosestFloodfill (m_LeaseSet->GetIdentHash (), m_ExcludedFloodfills); + auto floodfill = i2p::data::netdb.GetClosestFloodfill (m_LeaseSet->GetIdentHash (), m_ExcludedFloodfills); if (!floodfill) { LogPrint (eLogError, "Destination: Can't publish LeaseSet, no more floodfills found"); m_ExcludedFloodfills.clear (); return; - } + } m_ExcludedFloodfills.insert (floodfill->GetIdentHash ()); LogPrint (eLogDebug, "Destination: Publish LeaseSet of ", GetIdentHash ().ToBase32 ()); RAND_bytes ((uint8_t *)&m_PublishReplyToken, 4); - auto msg = WrapMessage (floodfill, i2p::CreateDatabaseStoreMsg (m_LeaseSet, m_PublishReplyToken, inbound)); + auto msg = WrapMessage (floodfill, i2p::CreateDatabaseStoreMsg (m_LeaseSet, m_PublishReplyToken, inbound)); m_PublishConfirmationTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_CONFIRMATION_TIMEOUT)); m_PublishConfirmationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishConfirmationTimer, - shared_from_this (), std::placeholders::_1)); - outbound->SendTunnelDataMsg (floodfill->GetIdentHash (), 0, msg); + shared_from_this (), std::placeholders::_1)); + outbound->SendTunnelDataMsg (floodfill->GetIdentHash (), 0, msg); m_LastSubmissionTime = ts; } void LeaseSetDestination::HandlePublishConfirmationTimer (const boost::system::error_code& ecode) { if (ecode != boost::asio::error::operation_aborted) - { + { if (m_PublishReplyToken) { LogPrint (eLogWarning, "Destination: Publish confirmation was not received in ", PUBLISH_CONFIRMATION_TIMEOUT, " seconds, will try again"); @@ -484,25 +484,25 @@ namespace client void LeaseSetDestination::HandlePublishVerificationTimer (const boost::system::error_code& ecode) { if (ecode != boost::asio::error::operation_aborted) - { + { auto s = shared_from_this (); - RequestLeaseSet (GetIdentHash (), + RequestLeaseSet (GetIdentHash (), // "this" added due to bug in gcc 4.7-4.8 [s,this](std::shared_ptr leaseSet) { - if (leaseSet) + if (leaseSet) { if (s->m_LeaseSet && *s->m_LeaseSet == *leaseSet) { // we got latest LeasetSet LogPrint (eLogDebug, "Destination: published LeaseSet verified for ", GetIdentHash().ToBase32()); s->m_PublishVerificationTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_REGULAR_VERIFICATION_INTERNAL)); - s->m_PublishVerificationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishVerificationTimer, s, std::placeholders::_1)); + s->m_PublishVerificationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishVerificationTimer, s, std::placeholders::_1)); return; - } + } else LogPrint (eLogDebug, "Destination: LeaseSet is different than just published for ", GetIdentHash().ToBase32()); - } + } else LogPrint (eLogWarning, "Destination: couldn't find published LeaseSet for ", GetIdentHash().ToBase32()); // we have to publish again @@ -515,16 +515,16 @@ namespace client { if (ecode != boost::asio::error::operation_aborted) Publish (); - } - + } + bool LeaseSetDestination::RequestDestination (const i2p::data::IdentHash& dest, RequestComplete requestComplete) { - if (!m_Pool || !IsReady ()) - { - if (requestComplete) + if (!m_Pool || !IsReady ()) + { + if (requestComplete) m_Service.post ([requestComplete](void){requestComplete (nullptr);}); return false; - } + } m_Service.post (std::bind (&LeaseSetDestination::RequestLeaseSet, shared_from_this (), dest, requestComplete)); return true; } @@ -536,14 +536,14 @@ namespace client { auto it = s->m_LeaseSetRequests.find (dest); if (it != s->m_LeaseSetRequests.end ()) - { - auto requestComplete = it->second; + { + auto requestComplete = it->second; s->m_LeaseSetRequests.erase (it); if (notify && requestComplete) requestComplete->Complete (nullptr); - } - }); + } + }); } - + void LeaseSetDestination::RequestLeaseSet (const i2p::data::IdentHash& dest, RequestComplete requestComplete) { std::set excluded; @@ -564,28 +564,28 @@ namespace client m_LeaseSetRequests.erase (ret.first); if (requestComplete) requestComplete (nullptr); } - } + } else // duplicate { LogPrint (eLogInfo, "Destination: Request of LeaseSet ", dest.ToBase64 (), " is pending already"); if (ts > ret.first->second->requestTime + MAX_LEASESET_REQUEST_TIMEOUT) - { + { // something went wrong m_LeaseSetRequests.erase (ret.first); if (requestComplete) requestComplete (nullptr); } else if (requestComplete) ret.first->second->requestComplete.push_back (requestComplete); - } - } + } + } else - { + { LogPrint (eLogError, "Destination: Can't request LeaseSet, no floodfills found"); if (requestComplete) requestComplete (nullptr); - } - } - - bool LeaseSetDestination::SendLeaseSetRequest (const i2p::data::IdentHash& dest, + } + } + + bool LeaseSetDestination::SendLeaseSetRequest (const i2p::data::IdentHash& dest, std::shared_ptr nextFloodfill, std::shared_ptr request) { if (!request->replyTunnel || !request->replyTunnel->IsEstablished ()) @@ -594,36 +594,36 @@ namespace client if (!request->outboundTunnel || !request->outboundTunnel->IsEstablished ()) request->outboundTunnel = m_Pool->GetNextOutboundTunnel (); if (!request->outboundTunnel) LogPrint (eLogError, "Destination: Can't send LeaseSet request, no outbound tunnels found"); - + if (request->replyTunnel && request->outboundTunnel) - { + { request->excluded.insert (nextFloodfill->GetIdentHash ()); request->requestTimeoutTimer.cancel (); uint8_t replyKey[32], replyTag[32]; - RAND_bytes (replyKey, 32); // random session key + RAND_bytes (replyKey, 32); // random session key RAND_bytes (replyTag, 32); // random session tag AddSessionKey (replyKey, replyTag); auto msg = WrapMessage (nextFloodfill, - CreateLeaseSetDatabaseLookupMsg (dest, request->excluded, + CreateLeaseSetDatabaseLookupMsg (dest, request->excluded, request->replyTunnel, replyKey, replyTag)); request->outboundTunnel->SendTunnelDataMsg ( { - i2p::tunnel::TunnelMessageBlock - { + i2p::tunnel::TunnelMessageBlock + { i2p::tunnel::eDeliveryTypeRouter, nextFloodfill->GetIdentHash (), 0, msg } - }); + }); request->requestTimeoutTimer.expires_from_now (boost::posix_time::seconds(LEASESET_REQUEST_TIMEOUT)); request->requestTimeoutTimer.async_wait (std::bind (&LeaseSetDestination::HandleRequestTimoutTimer, shared_from_this (), std::placeholders::_1, dest)); - } + } else return false; return true; - } + } void LeaseSetDestination::HandleRequestTimoutTimer (const boost::system::error_code& ecode, const i2p::data::IdentHash& dest) { @@ -641,26 +641,26 @@ namespace client { // reset tunnels, because one them might fail it->second->outboundTunnel = nullptr; - it->second->replyTunnel = nullptr; + it->second->replyTunnel = nullptr; done = !SendLeaseSetRequest (dest, floodfill, it->second); } else done = true; } else - { + { LogPrint (eLogWarning, "Destination: ", dest.ToBase64 (), " was not found within ", MAX_LEASESET_REQUEST_TIMEOUT, " seconds"); done = true; } - + if (done) { - auto requestComplete = it->second; + auto requestComplete = it->second; m_LeaseSetRequests.erase (it); if (requestComplete) requestComplete->Complete (nullptr); - } - } - } + } + } + } } void LeaseSetDestination::HandleCleanupTimer (const boost::system::error_code& ecode) @@ -674,7 +674,7 @@ namespace client m_CleanupTimer.async_wait (std::bind (&LeaseSetDestination::HandleCleanupTimer, shared_from_this (), std::placeholders::_1)); } - } + } void LeaseSetDestination::CleanupRemoteLeaseSets () { @@ -686,43 +686,43 @@ namespace client { LogPrint (eLogWarning, "Destination: Remote LeaseSet ", it->second->GetIdentHash ().ToBase64 (), " expired"); it = m_RemoteLeaseSets.erase (it); - } - else + } + else ++it; } - } + } ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map * params): LeaseSetDestination (isPublic, params), m_Keys (keys), m_DatagramDestination (nullptr), m_RefCounter (0), m_ReadyChecker(GetService()) { - if (isPublic) + if (isPublic) PersistTemporaryKeys (); else i2p::crypto::GenerateElGamalKeyPair(m_EncryptionPrivateKey, m_EncryptionPublicKey); if (isPublic) LogPrint (eLogInfo, "Destination: Local address ", GetIdentHash().ToBase32 (), " created"); - } + } - ClientDestination::~ClientDestination () + ClientDestination::~ClientDestination () { - } - + } + bool ClientDestination::Start () { if (LeaseSetDestination::Start ()) - { + { m_StreamingDestination = std::make_shared (GetSharedFromThis ()); // TODO: - m_StreamingDestination->Start (); + m_StreamingDestination->Start (); for (auto& it: m_StreamingDestinationsByPorts) it.second->Start (); return true; - } + } else return false; - } - + } + bool ClientDestination::Stop () { if (LeaseSetDestination::Stop ()) @@ -732,21 +732,21 @@ namespace client //m_StreamingDestination->SetOwner (nullptr); m_StreamingDestination = nullptr; for (auto& it: m_StreamingDestinationsByPorts) - { + { it.second->Stop (); //it.second->SetOwner (nullptr); } m_StreamingDestinationsByPorts.clear (); if (m_DatagramDestination) - { + { delete m_DatagramDestination; m_DatagramDestination = nullptr; - } + } return true; } else return false; - } + } #ifdef I2LUA void ClientDestination::Ready(ReadyPromise & p) @@ -773,14 +773,14 @@ namespace client ScheduleCheckForReady(p); } #endif - + void ClientDestination::HandleDataMessage (const uint8_t * buf, size_t len) { uint32_t length = bufbe32toh (buf); buf += 4; // we assume I2CP payload uint16_t fromPort = bufbe16toh (buf + 4), // source - toPort = bufbe16toh (buf + 6); // destination + toPort = bufbe16toh (buf + 6); // destination switch (buf[9]) { case PROTOCOL_TYPE_STREAMING: @@ -804,14 +804,14 @@ namespace client LogPrint (eLogError, "Destination: Data: unexpected protocol ", buf[9]); } } - - void ClientDestination::CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port) + + void ClientDestination::CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port) { - if (!streamRequestComplete) + if (!streamRequestComplete) { LogPrint (eLogError, "Destination: request callback is not specified in CreateStream"); return; - } + } auto leaseSet = FindLeaseSet (dest); if (leaseSet) streamRequestComplete(CreateStream (leaseSet, port)); @@ -837,18 +837,18 @@ namespace client return nullptr; } - std::shared_ptr ClientDestination::GetStreamingDestination (int port) const - { - if (port) + std::shared_ptr ClientDestination::GetStreamingDestination (int port) const + { + if (port) { auto it = m_StreamingDestinationsByPorts.find (port); if (it != m_StreamingDestinationsByPorts.end ()) return it->second; - } + } // if port is zero or not found, use default destination - return m_StreamingDestination; + return m_StreamingDestination; } - + void ClientDestination::AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor) { if (m_StreamingDestination) @@ -860,35 +860,35 @@ namespace client if (m_StreamingDestination) m_StreamingDestination->ResetAcceptor (); } - + bool ClientDestination::IsAcceptingStreams () const { if (m_StreamingDestination) return m_StreamingDestination->IsAcceptorSet (); return false; - } + } void ClientDestination::AcceptOnce (const i2p::stream::StreamingDestination::Acceptor& acceptor) { if (m_StreamingDestination) m_StreamingDestination->AcceptOnce (acceptor); - } - + } + std::shared_ptr ClientDestination::CreateStreamingDestination (int port, bool gzip) { - auto dest = std::make_shared (GetSharedFromThis (), port, gzip); + auto dest = std::make_shared (GetSharedFromThis (), port, gzip); if (port) m_StreamingDestinationsByPorts[port] = dest; - else // update default + else // update default m_StreamingDestination = dest; return dest; - } - + } + i2p::datagram::DatagramDestination * ClientDestination::CreateDatagramDestination () { if (m_DatagramDestination == nullptr) m_DatagramDestination = new i2p::datagram::DatagramDestination (GetSharedFromThis ()); - return m_DatagramDestination; + return m_DatagramDestination; } std::vector > ClientDestination::GetAllStreams () const @@ -898,12 +898,12 @@ namespace client { for (auto& it: m_StreamingDestination->GetStreams ()) ret.push_back (it.second); - } + } for (auto& it: m_StreamingDestinationsByPorts) for (auto& it1: it.second->GetStreams ()) ret.push_back (it1.second); return ret; - } + } void ClientDestination::PersistTemporaryKeys () { @@ -927,7 +927,7 @@ namespace client return; } LogPrint(eLogError, "Destinations: Can't save keys to ", path); - } + } void ClientDestination::CreateNewLeaseSet (std::vector > tunnels) { @@ -935,7 +935,7 @@ namespace client // sign Sign (leaseSet->GetBuffer (), leaseSet->GetBufferLen () - leaseSet->GetSignatureLen (), leaseSet->GetSignature ()); // TODO SetLeaseSet (leaseSet); - } + } void ClientDestination::CleanupDestination () { diff --git a/libi2pd_client/ClientContext.cpp b/libi2pd_client/ClientContext.cpp index 4eea297e..08c164ae 100644 --- a/libi2pd_client/ClientContext.cpp +++ b/libi2pd_client/ClientContext.cpp @@ -48,7 +48,7 @@ namespace client std::shared_ptr localDestination; bool httproxy; i2p::config::GetOption("httpproxy.enabled", httproxy); - if (httproxy) + if (httproxy) { std::string httpProxyKeys; i2p::config::GetOption("httpproxy.keys", httpProxyKeys); std::string httpProxyAddr; i2p::config::GetOption("httpproxy.address", httpProxyAddr); @@ -68,12 +68,12 @@ namespace client else LogPrint(eLogError, "Clients: failed to load HTTP Proxy key"); } - try + try { m_HttpProxy = new i2p::proxy::HTTPProxy(httpProxyAddr, httpProxyPort, localDestination); m_HttpProxy->Start(); - } - catch (std::exception& e) + } + catch (std::exception& e) { LogPrint(eLogError, "Clients: Exception in HTTP Proxy: ", e.what()); } @@ -253,7 +253,7 @@ namespace client void ClientContext::ReloadConfig () { - // TODO: handle config changes + // TODO: handle config changes /*std::string config; i2p::config::GetOption("conf", config); i2p::config::ParseConfig(config);*/ @@ -269,7 +269,7 @@ namespace client std::unique_lock l(m_DestinationsMutex); for (auto it = m_Destinations.begin (); it != m_Destinations.end ();) { - auto dest = it->second; + auto dest = it->second; if (dest->GetRefCounter () > 0) ++it; // skip else { @@ -551,6 +551,13 @@ namespace client clientTunnel = new I2PClientTunnel (name, dest, address, port, localDestination, destinationPort); clientEndpoint = ((I2PClientTunnel*)clientTunnel)->GetLocalEndpoint (); } + uint32_t timeout = section.second.get(I2P_CLIENT_TUNNEL_CONNECT_TIMEOUT, 0); + if(timeout) + { + clientTunnel->SetConnectTimeout(timeout); + LogPrint(eLogInfo, "Clients: I2P Client tunnel connect timeout set to ", timeout); + } + auto ins = m_ClientTunnels.insert (std::make_pair (clientEndpoint, std::unique_ptr(clientTunnel))); if (ins.second) { @@ -657,7 +664,7 @@ namespace client } auto ins = m_ServerTunnels.insert (std::make_pair ( std::make_pair (localDestination->GetIdentHash (), inPort), - std::unique_ptr(serverTunnel))); + std::unique_ptr(serverTunnel))); if (ins.second) { serverTunnel->Start (); @@ -715,8 +722,8 @@ namespace client it = c.erase (it); } else - it++; - } + it++; + } } template diff --git a/libi2pd_client/ClientContext.h b/libi2pd_client/ClientContext.h index 0eef1f79..f22c0817 100644 --- a/libi2pd_client/ClientContext.h +++ b/libi2pd_client/ClientContext.h @@ -36,6 +36,7 @@ namespace client const char I2P_CLIENT_TUNNEL_SIGNATURE_TYPE[] = "signaturetype"; const char I2P_CLIENT_TUNNEL_DESTINATION_PORT[] = "destinationport"; const char I2P_CLIENT_TUNNEL_MATCH_TUNNELS[] = "matchtunnels"; + const char I2P_CLIENT_TUNNEL_CONNECT_TIMEOUT[] = "connecttimeout"; const char I2P_SERVER_TUNNEL_HOST[] = "host"; const char I2P_SERVER_TUNNEL_HOST_OVERRIDE[] = "hostoverride"; const char I2P_SERVER_TUNNEL_PORT[] = "port"; diff --git a/libi2pd_client/I2PService.cpp b/libi2pd_client/I2PService.cpp index 9348ec48..a90cf3d4 100644 --- a/libi2pd_client/I2PService.cpp +++ b/libi2pd_client/I2PService.cpp @@ -2,6 +2,7 @@ #include "Identity.h" #include "ClientContext.h" #include "I2PService.h" +#include namespace i2p { @@ -11,37 +12,101 @@ namespace client I2PService::I2PService (std::shared_ptr localDestination): m_LocalDestination (localDestination ? localDestination : - i2p::client::context.CreateNewLocalDestination (false, I2P_SERVICE_DEFAULT_KEY_TYPE)), isUpdated (true) + i2p::client::context.CreateNewLocalDestination (false, I2P_SERVICE_DEFAULT_KEY_TYPE)), + m_ReadyTimer(m_LocalDestination->GetService()), + m_ConnectTimeout(0), + isUpdated (true) { - m_LocalDestination->Acquire (); + m_LocalDestination->Acquire (); } - + I2PService::I2PService (i2p::data::SigningKeyType kt): m_LocalDestination (i2p::client::context.CreateNewLocalDestination (false, kt)), + m_ReadyTimer(m_LocalDestination->GetService()), + m_ConnectTimeout(0), isUpdated (true) { m_LocalDestination->Acquire (); } - - I2PService::~I2PService () - { - ClearHandlers (); - if (m_LocalDestination) m_LocalDestination->Release (); + + I2PService::~I2PService () + { + ClearHandlers (); + if (m_LocalDestination) m_LocalDestination->Release (); } void I2PService::ClearHandlers () { + if(m_ConnectTimeout) + m_ReadyTimer.cancel(); std::unique_lock l(m_HandlersMutex); for (auto it: m_Handlers) it->Terminate (); m_Handlers.clear(); } - + + void I2PService::SetConnectTimeout(uint32_t timeout) + { + if(timeout && !m_ConnectTimeout) + { + TriggerReadyCheckTimer(); + } + else if (m_ConnectTimeout && !timeout) + { + m_ReadyTimer.cancel(); + } + m_ConnectTimeout = timeout; + } + + void I2PService::AddReadyCallback(ReadyCallback cb) + { + uint32_t now = i2p::util::GetSecondsSinceEpoch(); + uint32_t tm = now + m_ConnectTimeout; + LogPrint(eLogDebug, "I2PService::AddReadyCallback() ", tm, " ", now); + m_ReadyCallbacks.push_back({cb, tm}); + } + + void I2PService::TriggerReadyCheckTimer() + { + m_ReadyTimer.expires_from_now(boost::posix_time::seconds (1)); + m_ReadyTimer.async_wait(std::bind(&I2PService::HandleReadyCheckTimer, this, std::placeholders::_1)); + } + + void I2PService::HandleReadyCheckTimer(const boost::system::error_code &ec) + { + if(ec || m_LocalDestination->IsReady()) + { + for(auto & itr : m_ReadyCallbacks) + itr.first(ec); + m_ReadyCallbacks.clear(); + } + else if(!m_LocalDestination->IsReady()) + { + // expire timed out requests + uint32_t now = i2p::util::GetSecondsSinceEpoch (); + auto itr = m_ReadyCallbacks.begin(); + while(itr != m_ReadyCallbacks.end()) + { + if(itr->second >= now) + { + itr->first(boost::asio::error::timed_out); + itr = m_ReadyCallbacks.erase(itr); + } + else + ++itr; + } + } + if(!ec) + TriggerReadyCheckTimer(); + } + void I2PService::CreateStream (StreamRequestComplete streamRequestComplete, const std::string& dest, int port) { assert(streamRequestComplete); i2p::data::IdentHash identHash; if (i2p::client::context.GetAddressBook ().GetIdentHash (dest, identHash)) - m_LocalDestination->CreateStream (streamRequestComplete, identHash, port); + { + CreateStream(streamRequestComplete, identHash, port); + } else { LogPrint (eLogWarning, "I2PService: Remote destination not found: ", dest); @@ -49,6 +114,29 @@ namespace client } } + void I2PService::CreateStream(StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash & identHash, int port) + { + if(m_ConnectTimeout) + { + if(m_LocalDestination->IsReady()) + m_LocalDestination->CreateStream (streamRequestComplete, identHash, port); + else + { + AddReadyCallback([this, streamRequestComplete, identHash, port] (const boost::system::error_code & ec) { + if(ec) + { + LogPrint(eLogWarning, "I2PService::CeateStream() ", ec.message()); + streamRequestComplete(nullptr); + } + else + this->m_LocalDestination->CreateStream(streamRequestComplete, identHash, port); + }); + } + } + else + m_LocalDestination->CreateStream(streamRequestComplete, identHash, port); + } + TCPIPPipe::TCPIPPipe(I2PService * owner, std::shared_ptr upstream, std::shared_ptr downstream) : I2PServiceHandler(owner), m_up(upstream), m_down(downstream) { boost::asio::socket_base::receive_buffer_size option(TCP_IP_PIPE_BUFFER_SIZE); @@ -60,7 +148,7 @@ namespace client { Terminate(); } - + void TCPIPPipe::Start() { AsyncReceiveUpstream(); @@ -84,7 +172,7 @@ namespace client } Done(shared_from_this()); } - + void TCPIPPipe::AsyncReceiveUpstream() { if (m_up) { @@ -132,12 +220,12 @@ namespace client shared_from_this(), std::placeholders::_1) ); - } else { + } else { LogPrint(eLogError, "TCPIPPipe: downstream write: no socket"); } } - - + + void TCPIPPipe::HandleDownstreamReceived(const boost::system::error_code & ecode, std::size_t bytes_transfered) { LogPrint(eLogDebug, "TCPIPPipe: downstream: ", (int) bytes_transfered, " bytes received"); @@ -162,7 +250,7 @@ namespace client AsyncReceiveUpstream(); } } - + void TCPIPPipe::HandleUpstreamWrite(const boost::system::error_code & ecode) { if (ecode) { LogPrint(eLogError, "TCPIPPipe: upstream write error:" , ecode.message()); @@ -172,7 +260,7 @@ namespace client AsyncReceiveDownstream(); } } - + void TCPIPPipe::HandleUpstreamReceived(const boost::system::error_code & ecode, std::size_t bytes_transfered) { LogPrint(eLogDebug, "TCPIPPipe: upstream ", (int)bytes_transfered, " bytes received"); @@ -187,7 +275,7 @@ namespace client DownstreamWrite(bytes_transfered); } } - + void TCPIPAcceptor::Start () { m_Acceptor.reset (new boost::asio::ip::tcp::acceptor (GetService (), m_LocalEndpoint)); @@ -198,10 +286,10 @@ namespace client void TCPIPAcceptor::Stop () { if (m_Acceptor) - { + { m_Acceptor->close(); m_Acceptor.reset (nullptr); - } + } m_Timer.cancel (); ClearHandlers(); } @@ -219,12 +307,12 @@ namespace client { LogPrint(eLogDebug, "I2PService: ", GetName(), " accepted"); auto handler = CreateHandler(socket); - if (handler) + if (handler) { AddHandler(handler); handler->Handle(); - } - else + } + else socket->close(); Accept(); } diff --git a/libi2pd_client/I2PService.h b/libi2pd_client/I2PService.h index cf0cfd98..b708125f 100644 --- a/libi2pd_client/I2PService.h +++ b/libi2pd_client/I2PService.h @@ -14,8 +14,10 @@ namespace i2p namespace client { class I2PServiceHandler; - class I2PService + class I2PService : std::enable_shared_from_this { + public: + typedef std::function ReadyCallback; public: I2PService (std::shared_ptr localDestination = nullptr); I2PService (i2p::data::SigningKeyType kt); @@ -33,25 +35,37 @@ namespace client } void ClearHandlers (); + void SetConnectTimeout(uint32_t timeout); + + void AddReadyCallback(ReadyCallback cb); + inline std::shared_ptr GetLocalDestination () { return m_LocalDestination; } inline std::shared_ptr GetLocalDestination () const { return m_LocalDestination; } inline void SetLocalDestination (std::shared_ptr dest) { m_LocalDestination = dest; } void CreateStream (StreamRequestComplete streamRequestComplete, const std::string& dest, int port = 0); - + void CreateStream(StreamRequestComplete complete, const i2p::data::IdentHash & ident, int port); inline boost::asio::io_service& GetService () { return m_LocalDestination->GetService (); } virtual void Start () = 0; virtual void Stop () = 0; virtual const char* GetName() { return "Generic I2P Service"; } + + private: + void TriggerReadyCheckTimer(); + void HandleReadyCheckTimer(const boost::system::error_code & ec); + private: std::shared_ptr m_LocalDestination; std::unordered_set > m_Handlers; std::mutex m_HandlersMutex; + std::vector > m_ReadyCallbacks; + boost::asio::deadline_timer m_ReadyTimer; + uint32_t m_ConnectTimeout; public: - bool isUpdated; // transient, used during reload only + bool isUpdated; // transient, used during reload only }; /*Simple interface for I2PHandlers, allows detection of finalization amongst other things */ @@ -64,7 +78,7 @@ namespace client virtual void Handle() {}; //Start handling the socket void Terminate () { Kill (); }; - + protected: // Call when terminating or handing over to avoid race conditions inline bool Kill () { return m_Dead.exchange(true); } @@ -102,7 +116,7 @@ namespace client uint8_t m_upstream_buf[TCP_IP_PIPE_BUFFER_SIZE], m_downstream_buf[TCP_IP_PIPE_BUFFER_SIZE]; std::shared_ptr m_up, m_down; }; - + /* TODO: support IPv6 too */ //This is a service that listens for connections on the IP network and interacts with I2P class TCPIPAcceptor: public I2PService diff --git a/libi2pd_client/I2PTunnel.cpp b/libi2pd_client/I2PTunnel.cpp index 71b09e1a..8d559aee 100644 --- a/libi2pd_client/I2PTunnel.cpp +++ b/libi2pd_client/I2PTunnel.cpp @@ -14,19 +14,19 @@ namespace client static void I2PTunnelSetSocketOptions(std::shared_ptr socket) { if (socket && socket->is_open()) - { + { boost::asio::socket_base::receive_buffer_size option(I2P_TUNNEL_CONNECTION_BUFFER_SIZE); socket->set_option(option); } } - + I2PTunnelConnection::I2PTunnelConnection (I2PService * owner, std::shared_ptr socket, - std::shared_ptr leaseSet, int port): + std::shared_ptr leaseSet, int port): I2PServiceHandler(owner), m_Socket (socket), m_RemoteEndpoint (socket->remote_endpoint ()), m_IsQuiet (true) { m_Stream = GetOwner()->GetLocalDestination ()->CreateStream (leaseSet, port); - } + } I2PTunnelConnection::I2PTunnelConnection (I2PService * owner, std::shared_ptr socket, std::shared_ptr stream): @@ -44,15 +44,15 @@ namespace client I2PTunnelConnection::~I2PTunnelConnection () { - } - + } + void I2PTunnelConnection::I2PConnect (const uint8_t * msg, size_t len) { if (m_Stream) { if (msg) m_Stream->Send (msg, len); // connect and send - else + else m_Stream->Send (m_Buffer, 0); // connect } StreamReceive (); @@ -68,11 +68,11 @@ namespace client boost::asio::ip::address ourIP = boost::asio::ip::address_v4 (bytes); return ourIP; } - + static void MapToLoopback(const std::shared_ptr & sock, const i2p::data::IdentHash & addr) { - // bind to 127.x.x.x address + // bind to 127.x.x.x address // where x.x.x are first three bytes from ident auto ourIP = GetLoopbackAddressFor(addr); sock->bind (boost::asio::ip::tcp::endpoint (ourIP, 0)); @@ -82,7 +82,7 @@ namespace client void I2PTunnelConnection::Connect (bool isUniqueLocal) { I2PTunnelSetSocketOptions(m_Socket); - if (m_Socket) + if (m_Socket) { #ifdef __linux__ if (isUniqueLocal && m_RemoteEndpoint.address ().is_v4 () && @@ -96,8 +96,8 @@ namespace client m_Socket->async_connect (m_RemoteEndpoint, std::bind (&I2PTunnelConnection::HandleConnect, shared_from_this (), std::placeholders::_1)); } - } - + } + void I2PTunnelConnection::Terminate () { if (Kill()) return; @@ -105,21 +105,21 @@ namespace client { m_Stream->Close (); m_Stream.reset (); - } + } boost::system::error_code ec; m_Socket->shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec); // avoid RST m_Socket->close (); Done(shared_from_this ()); - } + } void I2PTunnelConnection::Receive () { - m_Socket->async_read_some (boost::asio::buffer(m_Buffer, I2P_TUNNEL_CONNECTION_BUFFER_SIZE), - std::bind(&I2PTunnelConnection::HandleReceived, shared_from_this (), + m_Socket->async_read_some (boost::asio::buffer(m_Buffer, I2P_TUNNEL_CONNECTION_BUFFER_SIZE), + std::bind(&I2PTunnelConnection::HandleReceived, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); - } - + } + void I2PTunnelConnection::HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) { if (ecode) @@ -131,9 +131,9 @@ namespace client } } else - { + { if (m_Stream) - { + { auto s = shared_from_this (); m_Stream->AsyncSend (m_Buffer, bytes_transferred, [s](const boost::system::error_code& ecode) @@ -143,9 +143,9 @@ namespace client else s->Terminate (); }); - } + } } - } + } void I2PTunnelConnection::HandleWrite (const boost::system::error_code& ecode) { @@ -165,12 +165,12 @@ namespace client { if (m_Stream->GetStatus () == i2p::stream::eStreamStatusNew || m_Stream->GetStatus () == i2p::stream::eStreamStatusOpen) // regular - { + { m_Stream->AsyncReceive (boost::asio::buffer (m_StreamBuffer, I2P_TUNNEL_CONNECTION_BUFFER_SIZE), std::bind (&I2PTunnelConnection::HandleStreamReceive, shared_from_this (), std::placeholders::_1, std::placeholders::_2), I2P_TUNNEL_CONNECTION_MAX_IDLE); - } + } else // closed by peer { // get remaning data @@ -178,10 +178,10 @@ namespace client if (len > 0) // still some data Write (m_StreamBuffer, len); else // no more data - Terminate (); - } - } - } + Terminate (); + } + } + } void I2PTunnelConnection::HandleStreamReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred) { @@ -231,8 +231,8 @@ namespace client memcpy (m_StreamBuffer, dest.c_str (), dest.size ()); } HandleStreamReceive (boost::system::error_code (), dest.size ()); - } - Receive (); + } + Receive (); } } @@ -253,20 +253,20 @@ namespace client { if (line == "\r") endOfHeader = true; else - { + { if (!m_ConnectionSent && !line.compare(0, 10, "Connection")) - { + { m_OutHeader << "Connection: close\r\n"; m_ConnectionSent = true; - } + } else if (!m_ProxyConnectionSent && !line.compare(0, 16, "Proxy-Connection")) - { + { m_OutHeader << "Proxy-Connection: close\r\n"; m_ProxyConnectionSent = true; - } + } else m_OutHeader << line << "\n"; - } + } } else break; @@ -283,10 +283,10 @@ namespace client I2PTunnelConnection::Write ((uint8_t *)m_OutHeader.str ().c_str (), m_OutHeader.str ().length ()); } } - } - + } + I2PServerTunnelConnectionHTTP::I2PServerTunnelConnectionHTTP (I2PService * owner, std::shared_ptr stream, - std::shared_ptr socket, + std::shared_ptr socket, const boost::asio::ip::tcp::endpoint& target, const std::string& host): I2PTunnelConnection (owner, stream, socket, target), m_Host (host), m_HeaderSent (false), m_From (stream->GetRemoteIdentity ()) { @@ -297,7 +297,7 @@ namespace client if (m_HeaderSent) I2PTunnelConnection::Write (buf, len); else - { + { m_InHeader.clear (); m_InHeader.write ((const char *)buf, len); std::string line; @@ -309,12 +309,12 @@ namespace client { if (line == "\r") endOfHeader = true; else - { + { if (m_Host.length () > 0 && line.find ("Host:") != std::string::npos) m_OutHeader << "Host: " << m_Host << "\r\n"; // override host else m_OutHeader << line << "\n"; - } + } } else break; @@ -335,13 +335,13 @@ namespace client m_HeaderSent = true; I2PTunnelConnection::Write ((uint8_t *)m_OutHeader.str ().c_str (), m_OutHeader.str ().length ()); } - } + } } I2PTunnelConnectionIRC::I2PTunnelConnectionIRC (I2PService * owner, std::shared_ptr stream, - std::shared_ptr socket, + std::shared_ptr socket, const boost::asio::ip::tcp::endpoint& target, const std::string& webircpass): - I2PTunnelConnection (owner, stream, socket, target), m_From (stream->GetRemoteIdentity ()), + I2PTunnelConnection (owner, stream, socket, target), m_From (stream->GetRemoteIdentity ()), m_NeedsWebIrc (webircpass.length() ? true : false), m_WebircPass (webircpass) { } @@ -349,7 +349,7 @@ namespace client void I2PTunnelConnectionIRC::Write (const uint8_t * buf, size_t len) { m_OutPacket.str (""); - if (m_NeedsWebIrc) + if (m_NeedsWebIrc) { m_NeedsWebIrc = false; m_OutPacket << "WEBIRC " << m_WebircPass << " cgiirc " << context.GetAddressBook ().ToAddress (m_From->GetIdentHash ()) << " " << GetSocket ()->local_endpoint ().address () << std::endl; @@ -357,12 +357,12 @@ namespace client m_InPacket.clear (); m_InPacket.write ((const char *)buf, len); - + while (!m_InPacket.eof () && !m_InPacket.fail ()) { std::string line; std::getline (m_InPacket, line); - if (line.length () == 0 && m_InPacket.eof ()) + if (line.length () == 0 && m_InPacket.eof ()) m_InPacket.str (""); auto pos = line.find ("USER"); if (!pos) // start of line @@ -375,7 +375,7 @@ namespace client m_OutPacket << line.substr (0, pos); m_OutPacket << context.GetAddressBook ().ToAddress (m_From->GetIdentHash ()); m_OutPacket << line.substr (nextpos) << '\n'; - } + } else m_OutPacket << line << '\n'; } @@ -389,7 +389,7 @@ namespace client public: I2PClientTunnelHandler (I2PClientTunnel * parent, i2p::data::IdentHash destination, int destinationPort, std::shared_ptr socket): - I2PServiceHandler(parent), m_DestinationIdentHash(destination), + I2PServiceHandler(parent), m_DestinationIdentHash(destination), m_DestinationPort (destinationPort), m_Socket(socket) {}; void Handle(); void Terminate(); @@ -402,8 +402,8 @@ namespace client void I2PClientTunnelHandler::Handle() { - GetOwner()->GetLocalDestination ()->CreateStream ( - std::bind (&I2PClientTunnelHandler::HandleStreamRequestComplete, shared_from_this(), std::placeholders::_1), + GetOwner()->CreateStream ( + std::bind (&I2PClientTunnelHandler::HandleStreamRequestComplete, shared_from_this(), std::placeholders::_1), m_DestinationIdentHash, m_DestinationPort); } @@ -436,12 +436,12 @@ namespace client Done(shared_from_this()); } - I2PClientTunnel::I2PClientTunnel (const std::string& name, const std::string& destination, - const std::string& address, int port, std::shared_ptr localDestination, int destinationPort): - TCPIPAcceptor (address, port, localDestination), m_Name (name), m_Destination (destination), - m_DestinationIdentHash (nullptr), m_DestinationPort (destinationPort) + I2PClientTunnel::I2PClientTunnel (const std::string& name, const std::string& destination, + const std::string& address, int port, std::shared_ptr localDestination, int destinationPort): + TCPIPAcceptor (address, port, localDestination), m_Name (name), m_Destination (destination), + m_DestinationIdentHash (nullptr), m_DestinationPort (destinationPort) { - } + } void I2PClientTunnel::Start () { @@ -480,8 +480,8 @@ namespace client return nullptr; } - I2PServerTunnel::I2PServerTunnel (const std::string& name, const std::string& address, - int port, std::shared_ptr localDestination, int inport, bool gzip): + I2PServerTunnel::I2PServerTunnel (const std::string& name, const std::string& address, + int port, std::shared_ptr localDestination, int inport, bool gzip): I2PService (localDestination), m_IsUniqueLocal(true), m_Name (name), m_Address (address), m_Port (port), m_IsAccessList (false) { m_PortDestination = localDestination->CreateStreamingDestination (inport > 0 ? inport : port, gzip); @@ -489,10 +489,10 @@ namespace client void I2PServerTunnel::Start () { - m_Endpoint.port (m_Port); + m_Endpoint.port (m_Port); boost::system::error_code ec; auto addr = boost::asio::ip::address::from_string (m_Address, ec); - if (!ec) + if (!ec) { m_Endpoint.address (addr); Accept (); @@ -500,27 +500,27 @@ namespace client else { auto resolver = std::make_shared(GetService ()); - resolver->async_resolve (boost::asio::ip::tcp::resolver::query (m_Address, ""), - std::bind (&I2PServerTunnel::HandleResolve, this, + resolver->async_resolve (boost::asio::ip::tcp::resolver::query (m_Address, ""), + std::bind (&I2PServerTunnel::HandleResolve, this, std::placeholders::_1, std::placeholders::_2, resolver)); - } + } } void I2PServerTunnel::Stop () { ClearHandlers (); - } + } - void I2PServerTunnel::HandleResolve (const boost::system::error_code& ecode, boost::asio::ip::tcp::resolver::iterator it, + void I2PServerTunnel::HandleResolve (const boost::system::error_code& ecode, boost::asio::ip::tcp::resolver::iterator it, std::shared_ptr resolver) - { + { if (!ecode) - { + { auto addr = (*it).endpoint ().address (); LogPrint (eLogInfo, "I2PTunnel: server tunnel ", (*it).host_name (), " has been resolved to ", addr); m_Endpoint.address (addr); - Accept (); - } + Accept (); + } else LogPrint (eLogError, "I2PTunnel: Unable to resolve server tunnel address: ", ecode.message ()); } @@ -528,7 +528,7 @@ namespace client void I2PServerTunnel::SetAccessList (const std::set& accessList) { m_AccessList = accessList; - m_IsAccessList = true; + m_IsAccessList = true; } void I2PServerTunnel::Accept () @@ -536,7 +536,7 @@ namespace client if (m_PortDestination) m_PortDestination->SetAcceptor (std::bind (&I2PServerTunnel::HandleAccept, this, std::placeholders::_1)); - auto localDestination = GetLocalDestination (); + auto localDestination = GetLocalDestination (); if (localDestination) { if (!localDestination->IsAcceptingStreams ()) // set it as default if not set yet @@ -549,7 +549,7 @@ namespace client void I2PServerTunnel::HandleAccept (std::shared_ptr stream) { if (stream) - { + { if (m_IsAccessList) { if (!m_AccessList.count (stream->GetRemoteIdentity ()->GetIdentHash ())) @@ -563,31 +563,31 @@ namespace client auto conn = CreateI2PConnection (stream); AddHandler (conn); conn->Connect (m_IsUniqueLocal); - } + } } std::shared_ptr I2PServerTunnel::CreateI2PConnection (std::shared_ptr stream) { return std::make_shared (this, stream, std::make_shared (GetService ()), GetEndpoint ()); - + } - I2PServerTunnelHTTP::I2PServerTunnelHTTP (const std::string& name, const std::string& address, - int port, std::shared_ptr localDestination, + I2PServerTunnelHTTP::I2PServerTunnelHTTP (const std::string& name, const std::string& address, + int port, std::shared_ptr localDestination, const std::string& host, int inport, bool gzip): - I2PServerTunnel (name, address, port, localDestination, inport, gzip), + I2PServerTunnel (name, address, port, localDestination, inport, gzip), m_Host (host) { } std::shared_ptr I2PServerTunnelHTTP::CreateI2PConnection (std::shared_ptr stream) { - return std::make_shared (this, stream, + return std::make_shared (this, stream, std::make_shared (GetService ()), GetEndpoint (), m_Host); } - I2PServerTunnelIRC::I2PServerTunnelIRC (const std::string& name, const std::string& address, - int port, std::shared_ptr localDestination, + I2PServerTunnelIRC::I2PServerTunnelIRC (const std::string& name, const std::string& address, + int port, std::shared_ptr localDestination, const std::string& webircpass, int inport, bool gzip): I2PServerTunnel (name, address, port, localDestination, inport, gzip), m_WebircPass (webircpass) @@ -631,7 +631,7 @@ namespace client m_Sessions.erase(port); } } - + UDPSessionPtr I2PUDPServerTunnel::ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort) { auto ih = from.GetIdentHash(); @@ -646,12 +646,12 @@ namespace client } boost::asio::ip::address addr; /** create new udp session */ - if(m_IsUniqueLocal && m_LocalAddress.is_loopback()) + if(m_IsUniqueLocal && m_LocalAddress.is_loopback()) { auto ident = from.GetIdentHash(); addr = GetLoopbackAddressFor(ident); - } - else + } + else addr = m_LocalAddress; boost::asio::ip::udp::endpoint ep(addr, 0); m_Sessions.push_back(std::make_shared(ep, m_LocalDest, m_RemoteEndpoint, &ih, localPort, remotePort)); @@ -680,7 +680,7 @@ namespace client IPSocket.async_receive_from(boost::asio::buffer(m_Buffer, I2P_UDP_MAX_MTU), FromEndpoint, std::bind(&UDPSession::HandleReceived, this, std::placeholders::_1, std::placeholders::_2)); } - + void UDPSession::HandleReceived(const boost::system::error_code & ecode, std::size_t len) { if(!ecode) @@ -694,8 +694,8 @@ namespace client } } - - + + I2PUDPServerTunnel::I2PUDPServerTunnel(const std::string & name, std::shared_ptr localDestination, boost::asio::ip::address localAddress, boost::asio::ip::udp::endpoint forwardTo, uint16_t port) : m_IsUniqueLocal(true), @@ -713,7 +713,7 @@ namespace client { auto dgram = m_LocalDest->GetDatagramDestination(); if (dgram) dgram->ResetReceiver(); - + LogPrint(eLogInfo, "UDPServer: done"); } @@ -742,7 +742,7 @@ namespace client } return sessions; } - + I2PUDPClientTunnel::I2PUDPClientTunnel(const std::string & name, const std::string &remoteDest, boost::asio::ip::udp::endpoint localEndpoint, std::shared_ptr localDestination, @@ -764,8 +764,8 @@ namespace client std::placeholders::_5)); } - - + + void I2PUDPClientTunnel::Start() { m_LocalDest->Start(); if (m_ResolveThread == nullptr) @@ -803,14 +803,14 @@ namespace client m_Sessions[remotePort].second = i2p::util::GetMillisecondsSinceEpoch(); RecvFromLocal(); } - + std::vector > I2PUDPClientTunnel::GetSessions() { // TODO: implement std::vector > infos; return infos; } - + void I2PUDPClientTunnel::TryResolving() { LogPrint(eLogInfo, "UDP Tunnel: Trying to resolve ", m_RemoteDest); i2p::data::IdentHash * h = new i2p::data::IdentHash; @@ -846,11 +846,11 @@ namespace client } } else - LogPrint(eLogWarning, "UDP Client: not tracking udp session using port ", (int) toPort); + LogPrint(eLogWarning, "UDP Client: not tracking udp session using port ", (int) toPort); } else LogPrint(eLogWarning, "UDP Client: unwarrented traffic from ", from.GetIdentHash().ToBase32()); - + } I2PUDPClientTunnel::~I2PUDPClientTunnel() { @@ -858,7 +858,7 @@ namespace client if (dgram) dgram->ResetReceiver(); m_Sessions.clear(); - + if(m_LocalSocket.is_open()) m_LocalSocket.close();