From 9fd60b52f1d59b18dcc7e7f179cc33021887c759 Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 8 Nov 2022 19:52:43 -0500 Subject: [PATCH] sync StreamCreate --- libi2pd/Destination.cpp | 29 ++++++++++++++++++++ libi2pd/Destination.h | 5 ++++ libi2pd_client/AddressBook.cpp | 49 +++++++++++----------------------- 3 files changed, 49 insertions(+), 34 deletions(-) diff --git a/libi2pd/Destination.cpp b/libi2pd/Destination.cpp index 719f5830..581fc733 100644 --- a/libi2pd/Destination.cpp +++ b/libi2pd/Destination.cpp @@ -1129,6 +1129,35 @@ namespace client }); } + template + std::shared_ptr ClientDestination::CreateStreamSync (const Dest& dest, int port) + { + std::shared_ptr stream; + std::condition_variable streamRequestComplete; + std::mutex streamRequestCompleteMutex; + std::unique_lock l(streamRequestCompleteMutex); + CreateStream ( + [&streamRequestComplete, &streamRequestCompleteMutex, &stream](std::shared_ptr s) + { + stream = s; + std::unique_lock l(streamRequestCompleteMutex); + streamRequestComplete.notify_all (); + }, + dest, port); + streamRequestComplete.wait (l); + return stream; + } + + std::shared_ptr ClientDestination::CreateStream (const i2p::data::IdentHash& dest, int port) + { + return CreateStreamSync (dest, port); + } + + std::shared_ptr ClientDestination::CreateStream (std::shared_ptr dest, int port) + { + return CreateStreamSync (dest, port); + } + std::shared_ptr ClientDestination::CreateStream (std::shared_ptr remote, int port) { if (m_StreamingDestination) diff --git a/libi2pd/Destination.h b/libi2pd/Destination.h index 4b08ec51..9d369a92 100644 --- a/libi2pd/Destination.h +++ b/libi2pd/Destination.h @@ -247,6 +247,8 @@ namespace client // following methods operate with default streaming destination void CreateStream (StreamRequestComplete streamRequestComplete, const i2p::data::IdentHash& dest, int port = 0); void CreateStream (StreamRequestComplete streamRequestComplete, std::shared_ptr dest, int port = 0); + std::shared_ptr CreateStream (const i2p::data::IdentHash& dest, int port = 0); // sync + std::shared_ptr CreateStream (std::shared_ptr dest, int port = 0); // sync std::shared_ptr CreateStream (std::shared_ptr remote, int port = 0); void SendPing (const i2p::data::IdentHash& to); void SendPing (std::shared_ptr to); @@ -282,6 +284,9 @@ namespace client void PersistTemporaryKeys (EncryptionKey * keys, bool isSingleKey); void ReadAuthKey (const std::string& group, const std::map * params); + template + std::shared_ptr CreateStreamSync (const Dest& dest, int port); + private: i2p::data::PrivateKeys m_Keys; diff --git a/libi2pd_client/AddressBook.cpp b/libi2pd_client/AddressBook.cpp index 62a619c4..07f91d2e 100644 --- a/libi2pd_client/AddressBook.cpp +++ b/libi2pd_client/AddressBook.cpp @@ -833,40 +833,22 @@ namespace client } else m_Ident = addr->identHash; - /* this code block still needs some love */ - std::condition_variable newDataReceived; - std::mutex newDataReceivedMutex; - auto leaseSet = i2p::client::context.GetSharedLocalDestination ()->FindLeaseSet (m_Ident); - if (!leaseSet) + // save url parts for later use + std::string dest_host = url.host; + int dest_port = url.port ? url.port : 80; + // try to create stream to addressbook site + auto stream = i2p::client::context.GetSharedLocalDestination ()->CreateStream (m_Ident, dest_port); + if (!stream) { - std::unique_lock l(newDataReceivedMutex); - i2p::client::context.GetSharedLocalDestination ()->RequestDestination (m_Ident, - [&newDataReceived, &leaseSet, &newDataReceivedMutex](std::shared_ptr ls) - { - leaseSet = ls; - std::unique_lock l1(newDataReceivedMutex); - newDataReceived.notify_all (); - }); - if (newDataReceived.wait_for (l, std::chrono::seconds (SUBSCRIPTION_REQUEST_TIMEOUT)) == std::cv_status::timeout) - { - LogPrint (eLogError, "Addressbook: Subscription LeaseSet request timeout expired"); - i2p::client::context.GetSharedLocalDestination ()->CancelDestinationRequest (m_Ident, false); // don't notify, because we know it already - return false; - } - } - if (!leaseSet) { - /* still no leaseset found */ LogPrint (eLogError, "Addressbook: LeaseSet for address ", url.host, " not found"); return false; - } - if (m_Etag.empty() && m_LastModified.empty()) { + } + if (m_Etag.empty() && m_LastModified.empty()) + { m_Book.GetEtag (m_Ident, m_Etag, m_LastModified); LogPrint (eLogDebug, "Addressbook: Loaded for ", url.host, ": ETag: ", m_Etag, ", Last-Modified: ", m_LastModified); } - /* save url parts for later use */ - std::string dest_host = url.host; - int dest_port = url.port ? url.port : 80; - /* create http request & send it */ + // create http request & send it i2p::http::HTTPReq req; req.AddHeader("Host", dest_host); req.AddHeader("User-Agent", "Wget/1.11.4"); @@ -877,15 +859,14 @@ namespace client req.AddHeader("If-None-Match", m_Etag); if (!m_LastModified.empty()) req.AddHeader("If-Modified-Since", m_LastModified); - /* convert url to relative */ + // convert url to relative url.schema = ""; url.host = ""; req.uri = url.to_string(); req.version = "HTTP/1.1"; - auto stream = i2p::client::context.GetSharedLocalDestination ()->CreateStream (leaseSet, dest_port); std::string request = req.to_string(); stream->Send ((const uint8_t *) request.data(), request.length()); - /* read response */ + // read response std::string response; uint8_t recv_buf[4096]; bool end = false; @@ -910,7 +891,7 @@ namespace client // process remaining buffer while (size_t len = stream->ReadSome (recv_buf, sizeof(recv_buf))) response.append ((char *)recv_buf, len); - /* parse response */ + // parse response i2p::http::HTTPRes res; int res_head_len = res.parse(response); if (res_head_len < 0) @@ -923,7 +904,7 @@ namespace client LogPrint(eLogError, "Addressbook: Incomplete http response from ", dest_host, ", interrupted by timeout"); return false; } - /* assert: res_head_len > 0 */ + // assert: res_head_len > 0 response.erase(0, res_head_len); if (res.code == 304) { @@ -946,7 +927,7 @@ namespace client LogPrint(eLogError, "Addressbook: Response size mismatch, expected: ", len, ", got: ", response.length(), "bytes"); return false; } - /* assert: res.code == 200 */ + // assert: res.code == 200 auto it = res.headers.find("ETag"); if (it != res.headers.end()) m_Etag = it->second; it = res.headers.find("Last-Modified");