diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index f7737646..63cf20a2 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -474,6 +474,29 @@ namespace stream Close (); // check is all outgoing messages have been sent and we can send close } + size_t Stream::Receive (uint8_t * buf, size_t len, int timeout) + { + if (!len) return 0; + size_t ret = 0; + std::condition_variable newDataReceived; + std::mutex newDataReceivedMutex; + std::unique_lock l(newDataReceivedMutex); + AsyncReceive (boost::asio::buffer (buf, len), + [&ret, &newDataReceived, &newDataReceivedMutex](const boost::system::error_code& ecode, std::size_t bytes_transferred) + { + if (ecode == boost::asio::error::timed_out) + ret = 0; + else + ret = bytes_transferred; + std::unique_lock l(newDataReceivedMutex); + newDataReceived.notify_all (); + }, + timeout); + if (newDataReceived.wait_for (l, std::chrono::seconds (timeout)) == std::cv_status::timeout) + ret = 0; + return ret; + } + size_t Stream::Send (const uint8_t * buf, size_t len) { AsyncSend (buf, len, nullptr); diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 19f2f8f6..1662f0e4 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -185,7 +185,8 @@ namespace stream template void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0); size_t ReadSome (uint8_t * buf, size_t len) { return ConcatenatePackets (buf, len); }; - + size_t Receive (uint8_t * buf, size_t len, int timeout); + void AsyncClose() { m_Service.post(std::bind(&Stream::Close, shared_from_this())); }; /** only call close from destination thread, use Stream::AsyncClose for other threads */ @@ -336,11 +337,10 @@ namespace stream int t = (timeout > MAX_RECEIVE_TIMEOUT) ? MAX_RECEIVE_TIMEOUT : timeout; s->m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(t)); int left = timeout - t; - auto self = s->shared_from_this(); - self->m_ReceiveTimer.async_wait ( - [self, buffer, handler, left](const boost::system::error_code & ec) + s->m_ReceiveTimer.async_wait ( + [s, buffer, handler, left](const boost::system::error_code & ec) { - self->HandleReceiveTimer(ec, buffer, handler, left); + s->HandleReceiveTimer(ec, buffer, handler, left); }); } }); diff --git a/libi2pd_client/AddressBook.cpp b/libi2pd_client/AddressBook.cpp index 3a35a817..62a619c4 100644 --- a/libi2pd_client/AddressBook.cpp +++ b/libi2pd_client/AddressBook.cpp @@ -892,24 +892,20 @@ namespace client int numAttempts = 0; while (!end) { - stream->AsyncReceive (boost::asio::buffer (recv_buf, 4096), - [&](const boost::system::error_code& ecode, std::size_t bytes_transferred) - { - if (bytes_transferred) - response.append ((char *)recv_buf, bytes_transferred); - if (ecode == boost::asio::error::timed_out || !stream->IsOpen ()) - end = true; - newDataReceived.notify_all (); - }, - SUBSCRIPTION_REQUEST_TIMEOUT); - std::unique_lock l(newDataReceivedMutex); - // wait 1 more second - if (newDataReceived.wait_for (l, std::chrono::seconds (SUBSCRIPTION_REQUEST_TIMEOUT + 1)) == std::cv_status::timeout) + size_t received = stream->Receive (recv_buf, 4096, SUBSCRIPTION_REQUEST_TIMEOUT); + if (received) + { + response.append ((char *)recv_buf, received); + if (!stream->IsOpen ()) end = true; + } + else if (!stream->IsOpen ()) + end = true; + else { LogPrint (eLogError, "Addressbook: Subscriptions request timeout expired"); numAttempts++; if (numAttempts > 5) end = true; - } + } } // process remaining buffer while (size_t len = stream->ReadSome (recv_buf, sizeof(recv_buf)))