From 416589cc930f0646b02d0db239f60d55aa3db05a Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 31 Aug 2017 10:38:26 -0400 Subject: [PATCH] Revert "add deferred ready checking for destination" This reverts commit 3f409d0e285489e5b66caf176e038af0e7fe469b. --- libi2pd/Destination.cpp | 396 ++++++++++++++++++---------------------- libi2pd/Destination.h | 14 +- 2 files changed, 179 insertions(+), 231 deletions(-) diff --git a/libi2pd/Destination.cpp b/libi2pd/Destination.cpp index 5f17856b..d45fdf47 100644 --- a/libi2pd/Destination.cpp +++ b/libi2pd/Destination.cpp @@ -13,10 +13,9 @@ namespace i2p namespace client { LeaseSetDestination::LeaseSetDestination (bool isPublic, const std::map * params): - m_IsRunning (false), m_Thread (nullptr), m_IsPublic (isPublic), - m_PublishReplyToken (0), m_LastSubmissionTime (0), m_PublishConfirmationTimer (m_Service), - m_PublishVerificationTimer (m_Service), m_PublishDelayTimer (m_Service), m_CleanupTimer (m_Service), - m_ReadyCheckTimer(m_Service) + m_IsRunning (false), m_Thread (nullptr), m_IsPublic (isPublic), + m_PublishReplyToken (0), m_LastSubmissionTime (0), m_PublishConfirmationTimer (m_Service), + m_PublishVerificationTimer (m_Service), m_PublishDelayTimer (m_Service), m_CleanupTimer (m_Service) { int inLen = DEFAULT_INBOUND_TUNNEL_LENGTH; int inQty = DEFAULT_INBOUND_TUNNELS_QUANTITY; @@ -84,80 +83,77 @@ namespace client LeaseSetDestination::~LeaseSetDestination () { - if (m_IsRunning) + if (m_IsRunning) Stop (); if (m_Pool) - i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); + i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); for (auto& it: m_LeaseSetRequests) it.second->Complete (nullptr); - } + } void LeaseSetDestination::Run () { while (m_IsRunning) { try - { + { m_Service.run (); } catch (std::exception& ex) { LogPrint (eLogError, "Destination: runtime exception: ", ex.what ()); - } - } - } + } + } + } bool LeaseSetDestination::Start () - { + { if (!m_IsRunning) - { + { LoadTags (); m_IsRunning = true; m_Pool->SetLocalDestination (shared_from_this ()); - m_Pool->SetActive (true); + m_Pool->SetActive (true); m_CleanupTimer.expires_from_now (boost::posix_time::minutes (DESTINATION_CLEANUP_TIMEOUT)); m_CleanupTimer.async_wait (std::bind (&LeaseSetDestination::HandleCleanupTimer, - shared_from_this (), std::placeholders::_1)); + shared_from_this (), std::placeholders::_1)); m_Thread = new std::thread (std::bind (&LeaseSetDestination::Run, shared_from_this ())); - m_ReadyCheckTimer.expires_from_now(boost::posix_time::seconds (1)); - m_ReadyCheckTimer.async_wait(std::bind(&LeaseSetDestination::HandleReadyCheckTimer, - shared_from_this (), std::placeholders::_1)); + return true; - } + } else return false; } - + bool LeaseSetDestination::Stop () - { + { if (m_IsRunning) - { + { m_CleanupTimer.cancel (); m_PublishConfirmationTimer.cancel (); - m_PublishVerificationTimer.cancel (); - m_ReadyCheckTimer.cancel (); + m_PublishVerificationTimer.cancel (); m_IsRunning = false; if (m_Pool) - { + { m_Pool->SetLocalDestination (nullptr); i2p::tunnel::tunnels.StopTunnelPool (m_Pool); - } + } m_Service.stop (); if (m_Thread) - { - m_Thread->join (); + { + m_Thread->join (); delete m_Thread; m_Thread = 0; - } + } SaveTags (); CleanUp (); // GarlicDestination return true; - } + } else return false; - } - + } + std::shared_ptr LeaseSetDestination::FindLeaseSet (const i2p::data::IdentHash& ident) { std::shared_ptr remoteLS; @@ -168,7 +164,7 @@ namespace client remoteLS = it->second; } - if (remoteLS) + if (remoteLS) { if (!remoteLS->IsExpired ()) { @@ -197,9 +193,9 @@ namespace client m_RemoteLeaseSets.erase (ident); return nullptr; } - } + } else - { + { auto ls = i2p::data::netdb.FindLeaseSet (ident); if (ls && !ls->IsExpired ()) { @@ -207,7 +203,7 @@ namespace client std::lock_guard _lock(m_RemoteLeaseSetsMutex); m_RemoteLeaseSets[ident] = ls; return ls; - } + } } return nullptr; } @@ -219,11 +215,11 @@ namespace client UpdateLeaseSet (); std::lock_guard l(m_LeaseSetMutex); return m_LeaseSet; - } + } void LeaseSetDestination::SetLeaseSet (i2p::data::LocalLeaseSet * newLeaseSet) { - { + { std::lock_guard l(m_LeaseSetMutex); m_LeaseSet.reset (newLeaseSet); } @@ -233,26 +229,21 @@ namespace client m_PublishVerificationTimer.cancel (); Publish (); } - } - - void LeaseSetDestination::AddReadyCallback(ReadyCallback cb) - { - m_ReadyCallbacks.push_back(cb); - } - + } + void LeaseSetDestination::UpdateLeaseSet () { - int numTunnels = m_Pool->GetNumInboundTunnels () + 2; // 2 backup tunnels - if (numTunnels > i2p::data::MAX_NUM_LEASES) numTunnels = i2p::data::MAX_NUM_LEASES; // 16 tunnels maximum + int numTunnels = m_Pool->GetNumInboundTunnels () + 2; // 2 backup tunnels + if (numTunnels > i2p::data::MAX_NUM_LEASES) numTunnels = i2p::data::MAX_NUM_LEASES; // 16 tunnels maximum CreateNewLeaseSet (m_Pool->GetInboundTunnels (numTunnels)); - } + } bool LeaseSetDestination::SubmitSessionKey (const uint8_t * key, const uint8_t * tag) { struct { uint8_t k[32], t[32]; - } data; + } data; memcpy (data.k, key, 32); memcpy (data.t, tag, 32); auto s = shared_from_this (); @@ -265,42 +256,42 @@ namespace client void LeaseSetDestination::ProcessGarlicMessage (std::shared_ptr msg) { - m_Service.post (std::bind (&LeaseSetDestination::HandleGarlicMessage, shared_from_this (), msg)); + m_Service.post (std::bind (&LeaseSetDestination::HandleGarlicMessage, shared_from_this (), msg)); } void LeaseSetDestination::ProcessDeliveryStatusMessage (std::shared_ptr msg) { - m_Service.post (std::bind (&LeaseSetDestination::HandleDeliveryStatusMessage, shared_from_this (), msg)); + m_Service.post (std::bind (&LeaseSetDestination::HandleDeliveryStatusMessage, shared_from_this (), msg)); } void LeaseSetDestination::HandleI2NPMessage (const uint8_t * buf, size_t len, std::shared_ptr from) { uint8_t typeID = buf[I2NP_HEADER_TYPEID_OFFSET]; switch (typeID) - { + { case eI2NPData: HandleDataMessage (buf + I2NP_HEADER_SIZE, bufbe16toh (buf + I2NP_HEADER_SIZE_OFFSET)); break; case eI2NPDeliveryStatus: // we assume tunnel tests non-encrypted HandleDeliveryStatusMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf), from)); - break; + break; case eI2NPDatabaseStore: HandleDatabaseStoreMessage (buf + I2NP_HEADER_SIZE, bufbe16toh (buf + I2NP_HEADER_SIZE_OFFSET)); break; case eI2NPDatabaseSearchReply: HandleDatabaseSearchReplyMessage (buf + I2NP_HEADER_SIZE, bufbe16toh (buf + I2NP_HEADER_SIZE_OFFSET)); - break; + break; default: i2p::HandleI2NPMessage (CreateI2NPMessage (buf, GetI2NPMessageLength (buf), from)); - } - } + } + } void LeaseSetDestination::HandleDatabaseStoreMessage (const uint8_t * buf, size_t len) { uint32_t replyToken = bufbe32toh (buf + DATABASE_STORE_REPLY_TOKEN_OFFSET); size_t offset = DATABASE_STORE_HEADER_SIZE; - if (replyToken) + if (replyToken) { LogPrint (eLogInfo, "Destination: Reply token is ignored for DatabaseStore"); offset += 36; @@ -316,8 +307,8 @@ namespace client { leaseSet = it->second; if (leaseSet->IsNewer (buf + offset, len - offset)) - { - leaseSet->Update (buf + offset, len - offset); + { + leaseSet->Update (buf + offset, len - offset); if (leaseSet->IsValid () && leaseSet->GetIdentHash () == key) LogPrint (eLogDebug, "Destination: Remote LeaseSet updated"); else @@ -331,7 +322,7 @@ namespace client LogPrint (eLogDebug, "Destination: Remote LeaseSet is older. Not updated"); } else - { + { leaseSet = std::make_shared (buf + offset, len - offset); if (leaseSet->IsValid () && leaseSet->GetIdentHash () == key) { @@ -348,18 +339,18 @@ namespace client LogPrint (eLogError, "Destination: New remote LeaseSet failed"); leaseSet = nullptr; } - } - } + } + } else LogPrint (eLogError, "Destination: Unexpected client's DatabaseStore type ", buf[DATABASE_STORE_TYPE_OFFSET], ", dropped"); - + auto it1 = m_LeaseSetRequests.find (key); if (it1 != m_LeaseSetRequests.end ()) { it1->second->requestTimeoutTimer.cancel (); if (it1->second) it1->second->Complete (leaseSet); m_LeaseSetRequests.erase (it1); - } + } } void LeaseSetDestination::HandleDatabaseSearchReplyMessage (const uint8_t * buf, size_t len) @@ -373,7 +364,7 @@ namespace client auto request = it->second; bool found = false; if (request->excluded.size () < MAX_NUM_FLOODFILLS_PER_REQUEST) - { + { for (int i = 0; i < num; i++) { i2p::data::IdentHash peerHash (buf + 33 + i*32); @@ -381,28 +372,28 @@ namespace client { LogPrint (eLogInfo, "Destination: Found new floodfill, request it"); // TODO: recheck this message i2p::data::netdb.RequestDestination (peerHash); - } + } } - + auto floodfill = i2p::data::netdb.GetClosestFloodfill (key, request->excluded); if (floodfill) { LogPrint (eLogInfo, "Destination: Requesting ", key.ToBase64 (), " at ", floodfill->GetIdentHash ().ToBase64 ()); if (SendLeaseSetRequest (key, floodfill, request)) found = true; - } - } + } + } if (!found) - { + { LogPrint (eLogInfo, "Destination: ", key.ToBase64 (), " was not found on ", MAX_NUM_FLOODFILLS_PER_REQUEST, " floodfills"); request->Complete (nullptr); m_LeaseSetRequests.erase (key); - } - } - else + } + } + else LogPrint (eLogWarning, "Destination: Request for ", key.ToBase64 (), " not found"); - } - + } + void LeaseSetDestination::HandleDeliveryStatusMessage (std::shared_ptr msg) { uint32_t msgID = bufbe32toh (msg->GetPayload () + DELIVERY_STATUS_MSGID_OFFSET); @@ -414,20 +405,20 @@ namespace client // schedule verification m_PublishVerificationTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_VERIFICATION_TIMEOUT)); m_PublishVerificationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishVerificationTimer, - shared_from_this (), std::placeholders::_1)); + shared_from_this (), std::placeholders::_1)); } else i2p::garlic::GarlicDestination::HandleDeliveryStatusMessage (msg); - } + } void LeaseSetDestination::SetLeaseSetUpdated () - { + { UpdateLeaseSet (); } - + void LeaseSetDestination::Publish () - { - if (!m_LeaseSet || !m_Pool) + { + if (!m_LeaseSet || !m_Pool) { LogPrint (eLogError, "Destination: Can't publish non-existing LeaseSet"); return; @@ -444,9 +435,9 @@ namespace client m_PublishDelayTimer.cancel (); m_PublishDelayTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_MIN_INTERVAL)); m_PublishDelayTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishDelayTimer, - shared_from_this (), std::placeholders::_1)); + shared_from_this (), std::placeholders::_1)); return; - } + } auto outbound = m_Pool->GetNextOutboundTunnel (); if (!outbound) { @@ -459,28 +450,28 @@ namespace client LogPrint (eLogError, "Destination: Can't publish LeaseSet. No inbound tunnels"); return; } - auto floodfill = i2p::data::netdb.GetClosestFloodfill (m_LeaseSet->GetIdentHash (), m_ExcludedFloodfills); + auto floodfill = i2p::data::netdb.GetClosestFloodfill (m_LeaseSet->GetIdentHash (), m_ExcludedFloodfills); if (!floodfill) { LogPrint (eLogError, "Destination: Can't publish LeaseSet, no more floodfills found"); m_ExcludedFloodfills.clear (); return; - } + } m_ExcludedFloodfills.insert (floodfill->GetIdentHash ()); LogPrint (eLogDebug, "Destination: Publish LeaseSet of ", GetIdentHash ().ToBase32 ()); RAND_bytes ((uint8_t *)&m_PublishReplyToken, 4); - auto msg = WrapMessage (floodfill, i2p::CreateDatabaseStoreMsg (m_LeaseSet, m_PublishReplyToken, inbound)); + auto msg = WrapMessage (floodfill, i2p::CreateDatabaseStoreMsg (m_LeaseSet, m_PublishReplyToken, inbound)); m_PublishConfirmationTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_CONFIRMATION_TIMEOUT)); m_PublishConfirmationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishConfirmationTimer, - shared_from_this (), std::placeholders::_1)); - outbound->SendTunnelDataMsg (floodfill->GetIdentHash (), 0, msg); + shared_from_this (), std::placeholders::_1)); + outbound->SendTunnelDataMsg (floodfill->GetIdentHash (), 0, msg); m_LastSubmissionTime = ts; } void LeaseSetDestination::HandlePublishConfirmationTimer (const boost::system::error_code& ecode) { if (ecode != boost::asio::error::operation_aborted) - { + { if (m_PublishReplyToken) { LogPrint (eLogWarning, "Destination: Publish confirmation was not received in ", PUBLISH_CONFIRMATION_TIMEOUT, " seconds, will try again"); @@ -493,25 +484,25 @@ namespace client void LeaseSetDestination::HandlePublishVerificationTimer (const boost::system::error_code& ecode) { if (ecode != boost::asio::error::operation_aborted) - { + { auto s = shared_from_this (); - RequestLeaseSet (GetIdentHash (), + RequestLeaseSet (GetIdentHash (), // "this" added due to bug in gcc 4.7-4.8 [s,this](std::shared_ptr leaseSet) { - if (leaseSet) + if (leaseSet) { if (s->m_LeaseSet && *s->m_LeaseSet == *leaseSet) { // we got latest LeasetSet LogPrint (eLogDebug, "Destination: published LeaseSet verified for ", GetIdentHash().ToBase32()); s->m_PublishVerificationTimer.expires_from_now (boost::posix_time::seconds(PUBLISH_REGULAR_VERIFICATION_INTERNAL)); - s->m_PublishVerificationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishVerificationTimer, s, std::placeholders::_1)); + s->m_PublishVerificationTimer.async_wait (std::bind (&LeaseSetDestination::HandlePublishVerificationTimer, s, std::placeholders::_1)); return; - } + } else LogPrint (eLogDebug, "Destination: LeaseSet is different than just published for ", GetIdentHash().ToBase32()); - } + } else LogPrint (eLogWarning, "Destination: couldn't find published LeaseSet for ", GetIdentHash().ToBase32()); // we have to publish again @@ -524,16 +515,16 @@ namespace client { if (ecode != boost::asio::error::operation_aborted) Publish (); - } - + } + bool LeaseSetDestination::RequestDestination (const i2p::data::IdentHash& dest, RequestComplete requestComplete) { - if (!m_Pool || !IsReady ()) - { - if (requestComplete) + if (!m_Pool || !IsReady ()) + { + if (requestComplete) m_Service.post ([requestComplete](void){requestComplete (nullptr);}); return false; - } + } m_Service.post (std::bind (&LeaseSetDestination::RequestLeaseSet, shared_from_this (), dest, requestComplete)); return true; } @@ -545,14 +536,14 @@ namespace client { auto it = s->m_LeaseSetRequests.find (dest); if (it != s->m_LeaseSetRequests.end ()) - { - auto requestComplete = it->second; + { + auto requestComplete = it->second; s->m_LeaseSetRequests.erase (it); if (notify && requestComplete) requestComplete->Complete (nullptr); - } - }); + } + }); } - + void LeaseSetDestination::RequestLeaseSet (const i2p::data::IdentHash& dest, RequestComplete requestComplete) { std::set excluded; @@ -573,28 +564,28 @@ namespace client m_LeaseSetRequests.erase (ret.first); if (requestComplete) requestComplete (nullptr); } - } + } else // duplicate { LogPrint (eLogInfo, "Destination: Request of LeaseSet ", dest.ToBase64 (), " is pending already"); if (ts > ret.first->second->requestTime + MAX_LEASESET_REQUEST_TIMEOUT) - { + { // something went wrong m_LeaseSetRequests.erase (ret.first); if (requestComplete) requestComplete (nullptr); } else if (requestComplete) ret.first->second->requestComplete.push_back (requestComplete); - } - } + } + } else - { + { LogPrint (eLogError, "Destination: Can't request LeaseSet, no floodfills found"); if (requestComplete) requestComplete (nullptr); - } - } - - bool LeaseSetDestination::SendLeaseSetRequest (const i2p::data::IdentHash& dest, + } + } + + bool LeaseSetDestination::SendLeaseSetRequest (const i2p::data::IdentHash& dest, std::shared_ptr nextFloodfill, std::shared_ptr request) { if (!request->replyTunnel || !request->replyTunnel->IsEstablished ()) @@ -603,36 +594,36 @@ namespace client if (!request->outboundTunnel || !request->outboundTunnel->IsEstablished ()) request->outboundTunnel = m_Pool->GetNextOutboundTunnel (); if (!request->outboundTunnel) LogPrint (eLogError, "Destination: Can't send LeaseSet request, no outbound tunnels found"); - + if (request->replyTunnel && request->outboundTunnel) - { + { request->excluded.insert (nextFloodfill->GetIdentHash ()); request->requestTimeoutTimer.cancel (); uint8_t replyKey[32], replyTag[32]; - RAND_bytes (replyKey, 32); // random session key + RAND_bytes (replyKey, 32); // random session key RAND_bytes (replyTag, 32); // random session tag AddSessionKey (replyKey, replyTag); auto msg = WrapMessage (nextFloodfill, - CreateLeaseSetDatabaseLookupMsg (dest, request->excluded, + CreateLeaseSetDatabaseLookupMsg (dest, request->excluded, request->replyTunnel, replyKey, replyTag)); request->outboundTunnel->SendTunnelDataMsg ( { - i2p::tunnel::TunnelMessageBlock - { + i2p::tunnel::TunnelMessageBlock + { i2p::tunnel::eDeliveryTypeRouter, nextFloodfill->GetIdentHash (), 0, msg } - }); + }); request->requestTimeoutTimer.expires_from_now (boost::posix_time::seconds(LEASESET_REQUEST_TIMEOUT)); request->requestTimeoutTimer.async_wait (std::bind (&LeaseSetDestination::HandleRequestTimoutTimer, shared_from_this (), std::placeholders::_1, dest)); - } + } else return false; return true; - } + } void LeaseSetDestination::HandleRequestTimoutTimer (const boost::system::error_code& ecode, const i2p::data::IdentHash& dest) { @@ -650,26 +641,26 @@ namespace client { // reset tunnels, because one them might fail it->second->outboundTunnel = nullptr; - it->second->replyTunnel = nullptr; + it->second->replyTunnel = nullptr; done = !SendLeaseSetRequest (dest, floodfill, it->second); } else done = true; } else - { + { LogPrint (eLogWarning, "Destination: ", dest.ToBase64 (), " was not found within ", MAX_LEASESET_REQUEST_TIMEOUT, " seconds"); done = true; } - + if (done) { - auto requestComplete = it->second; + auto requestComplete = it->second; m_LeaseSetRequests.erase (it); if (requestComplete) requestComplete->Complete (nullptr); - } - } - } + } + } + } } void LeaseSetDestination::HandleCleanupTimer (const boost::system::error_code& ecode) @@ -683,31 +674,7 @@ namespace client m_CleanupTimer.async_wait (std::bind (&LeaseSetDestination::HandleCleanupTimer, shared_from_this (), std::placeholders::_1)); } - } - - void LeaseSetDestination::HandleReadyCheckTimer(const boost::system::error_code & ec) - { - if (ec != boost::asio::error::operation_aborted) - { - // TODO: locking ? - if(IsReady()) - { - for (auto & itr : m_ReadyCallbacks) - { - itr(ec); - } - m_ReadyCallbacks.clear(); - } - } - else - { - for (auto & itr : m_ReadyCallbacks) - { - itr(ec); - } - m_ReadyCallbacks.clear(); - } - } + } void LeaseSetDestination::CleanupRemoteLeaseSets () { @@ -719,11 +686,11 @@ namespace client { LogPrint (eLogWarning, "Destination: Remote LeaseSet ", it->second->GetIdentHash ().ToBase64 (), " expired"); it = m_RemoteLeaseSets.erase (it); - } - else + } + else ++it; } - } + } ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map * params): LeaseSetDestination (isPublic, params), @@ -736,26 +703,26 @@ namespace client i2p::crypto::GenerateElGamalKeyPair(m_EncryptionPrivateKey, m_EncryptionPublicKey); if (isPublic) LogPrint (eLogInfo, "Destination: Local address ", GetIdentHash().ToBase32 (), " created"); - } + } - ClientDestination::~ClientDestination () + ClientDestination::~ClientDestination () { - } - + } + bool ClientDestination::Start () { if (LeaseSetDestination::Start ()) - { + { m_StreamingDestination = std::make_shared (GetSharedFromThis ()); // TODO: - m_StreamingDestination->Start (); + m_StreamingDestination->Start (); for (auto& it: m_StreamingDestinationsByPorts) it.second->Start (); return true; - } + } else return false; - } - + } + bool ClientDestination::Stop () { if (LeaseSetDestination::Stop ()) @@ -765,21 +732,21 @@ namespace client //m_StreamingDestination->SetOwner (nullptr); m_StreamingDestination = nullptr; for (auto& it: m_StreamingDestinationsByPorts) - { + { it.second->Stop (); //it.second->SetOwner (nullptr); } m_StreamingDestinationsByPorts.clear (); if (m_DatagramDestination) - { + { delete m_DatagramDestination; m_DatagramDestination = nullptr; - } + } return true; } else return false; - } + } #ifdef I2LUA void ClientDestination::Ready(ReadyPromise & p) @@ -806,14 +773,14 @@ namespace client ScheduleCheckForReady(p); } #endif - + void ClientDestination::HandleDataMessage (const uint8_t * buf, size_t len) { uint32_t length = bufbe32toh (buf); buf += 4; // we assume I2CP payload uint16_t fromPort = bufbe16toh (buf + 4), // source - toPort = bufbe16toh (buf + 6); // destination + toPort = bufbe16toh (buf + 6); // destination switch (buf[9]) { case PROTOCOL_TYPE_STREAMING: @@ -837,41 +804,28 @@ namespace client LogPrint (eLogError, "Destination: Data: unexpected protocol ", buf[9]); } } - - void ClientDestination::CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port) + + void ClientDestination::CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port) { - if (!streamRequestComplete) + if (!streamRequestComplete) { LogPrint (eLogError, "Destination: request callback is not specified in CreateStream"); return; - } - if(IsReady()) - { - auto leaseSet = FindLeaseSet (dest); - if (leaseSet) - streamRequestComplete(CreateStream (leaseSet, port)); - else - { - auto s = GetSharedFromThis (); - RequestDestination (dest, - [s, streamRequestComplete, port](std::shared_ptr ls) - { - if (ls) - streamRequestComplete(s->CreateStream (ls, port)); - else - streamRequestComplete (nullptr); - }); - } - } + } + auto leaseSet = FindLeaseSet (dest); + if (leaseSet) + streamRequestComplete(CreateStream (leaseSet, port)); else { - // call if tunnel is not ready - AddReadyCallback([&](const boost::system::error_code & ec) { - if(ec) - streamRequestComplete(nullptr); + auto s = GetSharedFromThis (); + RequestDestination (dest, + [s, streamRequestComplete, port](std::shared_ptr ls) + { + if (ls) + streamRequestComplete(s->CreateStream (ls, port)); else - CreateStream(streamRequestComplete, dest, port); - }); + streamRequestComplete (nullptr); + }); } } @@ -883,18 +837,18 @@ namespace client return nullptr; } - std::shared_ptr ClientDestination::GetStreamingDestination (int port) const - { - if (port) + std::shared_ptr ClientDestination::GetStreamingDestination (int port) const + { + if (port) { auto it = m_StreamingDestinationsByPorts.find (port); if (it != m_StreamingDestinationsByPorts.end ()) return it->second; - } + } // if port is zero or not found, use default destination - return m_StreamingDestination; + return m_StreamingDestination; } - + void ClientDestination::AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor) { if (m_StreamingDestination) @@ -906,35 +860,35 @@ namespace client if (m_StreamingDestination) m_StreamingDestination->ResetAcceptor (); } - + bool ClientDestination::IsAcceptingStreams () const { if (m_StreamingDestination) return m_StreamingDestination->IsAcceptorSet (); return false; - } + } void ClientDestination::AcceptOnce (const i2p::stream::StreamingDestination::Acceptor& acceptor) { if (m_StreamingDestination) m_StreamingDestination->AcceptOnce (acceptor); - } - + } + std::shared_ptr ClientDestination::CreateStreamingDestination (int port, bool gzip) { - auto dest = std::make_shared (GetSharedFromThis (), port, gzip); + auto dest = std::make_shared (GetSharedFromThis (), port, gzip); if (port) m_StreamingDestinationsByPorts[port] = dest; - else // update default + else // update default m_StreamingDestination = dest; return dest; - } - + } + i2p::datagram::DatagramDestination * ClientDestination::CreateDatagramDestination () { if (m_DatagramDestination == nullptr) m_DatagramDestination = new i2p::datagram::DatagramDestination (GetSharedFromThis ()); - return m_DatagramDestination; + return m_DatagramDestination; } std::vector > ClientDestination::GetAllStreams () const @@ -944,12 +898,12 @@ namespace client { for (auto& it: m_StreamingDestination->GetStreams ()) ret.push_back (it.second); - } + } for (auto& it: m_StreamingDestinationsByPorts) for (auto& it1: it.second->GetStreams ()) ret.push_back (it1.second); return ret; - } + } void ClientDestination::PersistTemporaryKeys () { @@ -973,7 +927,7 @@ namespace client return; } LogPrint(eLogError, "Destinations: Can't save keys to ", path); - } + } void ClientDestination::CreateNewLeaseSet (std::vector > tunnels) { @@ -981,7 +935,7 @@ namespace client // sign Sign (leaseSet->GetBuffer (), leaseSet->GetBufferLen () - leaseSet->GetSignatureLen (), leaseSet->GetSignature ()); // TODO SetLeaseSet (leaseSet); - } + } void ClientDestination::CleanupDestination () { diff --git a/libi2pd/Destination.h b/libi2pd/Destination.h index 4e299322..ef7437fb 100644 --- a/libi2pd/Destination.h +++ b/libi2pd/Destination.h @@ -63,7 +63,6 @@ namespace client public std::enable_shared_from_this { typedef std::function leaseSet)> RequestComplete; - typedef std::function ReadyCallback; // leaseSet = nullptr means not found struct LeaseSetRequest { @@ -109,8 +108,6 @@ namespace client void ProcessDeliveryStatusMessage (std::shared_ptr msg); void SetLeaseSetUpdated (); - void AddReadyCallback(ReadyCallback cb); - protected: void SetLeaseSet (i2p::data::LocalLeaseSet * newLeaseSet); @@ -134,8 +131,7 @@ namespace client void RequestLeaseSet (const i2p::data::IdentHash& dest, RequestComplete requestComplete); bool SendLeaseSetRequest (const i2p::data::IdentHash& dest, std::shared_ptr nextFloodfill, std::shared_ptr request); void HandleRequestTimoutTimer (const boost::system::error_code& ecode, const i2p::data::IdentHash& dest); - void HandleCleanupTimer (const boost::system::error_code& ecode); - void HandleReadyCheckTimer (const boost::system::error_code& ecode); + void HandleCleanupTimer (const boost::system::error_code& ecode); void CleanupRemoteLeaseSets (); private: @@ -156,9 +152,7 @@ namespace client std::set m_ExcludedFloodfills; // for publishing boost::asio::deadline_timer m_PublishConfirmationTimer, m_PublishVerificationTimer, - m_PublishDelayTimer, m_CleanupTimer, m_ReadyCheckTimer; - - std::vector m_ReadyCallbacks; + m_PublishDelayTimer, m_CleanupTimer; public: @@ -188,9 +182,9 @@ namespace client void Sign (const uint8_t * buf, int len, uint8_t * signature) const { m_Keys.Sign (buf, len, signature); }; // ref counter - int Acquire () { return ++m_RefCounter; }; + int Acquire () { return ++m_RefCounter; }; int Release () { return --m_RefCounter; }; - int GetRefCounter () const { return m_RefCounter; }; + int GetRefCounter () const { return m_RefCounter; }; // streaming std::shared_ptr CreateStreamingDestination (int port, bool gzip = true); // additional