From 3167ae21b0d95a9445e8ea96957811ac91eed2fe Mon Sep 17 00:00:00 2001 From: orignal Date: Sat, 22 Oct 2016 20:08:15 -0400 Subject: [PATCH] send own LeasetSet through a stalled stream --- Garlic.h | 3 ++- Streaming.cpp | 26 ++++++++++++++++++++++++++ Streaming.h | 24 ++++++++++++++++++------ 3 files changed, 46 insertions(+), 7 deletions(-) diff --git a/Garlic.h b/Garlic.h index 76d8eaf3..0fa5403c 100644 --- a/Garlic.h +++ b/Garlic.h @@ -105,7 +105,8 @@ namespace garlic if (m_LeaseSetUpdateStatus != eLeaseSetDoNotSend) m_LeaseSetUpdateStatus = eLeaseSetUpdated; }; bool IsLeaseSetNonConfirmed () const { return m_LeaseSetUpdateStatus == eLeaseSetSubmitted; }; - + bool IsLeaseSetUpdated () const { return m_LeaseSetUpdateStatus == eLeaseSetUpdated; }; + std::shared_ptr GetSharedRoutingPath (); void SetSharedRoutingPath (std::shared_ptr path); diff --git a/Streaming.cpp b/Streaming.cpp index 02e738c8..78465b87 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -659,6 +659,29 @@ namespace stream LogPrint (eLogWarning, "Streaming: All leases are expired, sSID=", m_SendStreamID); } + void Stream::SendUpdatedLeaseSet () + { + if (m_RoutingSession && m_RoutingSession->IsLeaseSetUpdated ()) + { + if (!m_CurrentRemoteLease) + UpdateCurrentRemoteLease (true); + if (m_CurrentRemoteLease) + { + auto msg = m_RoutingSession->WrapSingleMessage (nullptr); + auto outboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (); + if (outboundTunnel) + m_CurrentOutboundTunnel->SendTunnelDataMsg ( + { + i2p::tunnel::TunnelMessageBlock + { + i2p::tunnel::eDeliveryTypeTunnel, + m_CurrentRemoteLease->tunnelGateway, m_CurrentRemoteLease->tunnelID, + msg + } + }); + } + } + } void Stream::ScheduleResend () { @@ -760,7 +783,10 @@ namespace stream { m_RemoteLeaseSet = m_LocalDestination.GetOwner ()->FindLeaseSet (m_RemoteIdentity->GetIdentHash ()); if (!m_RemoteLeaseSet) + { LogPrint (eLogWarning, "Streaming: LeaseSet ", m_RemoteIdentity->GetIdentHash ().ToBase64 (), " not found"); + m_LocalDestination.GetOwner ()->RequestDestination (m_RemoteIdentity->GetIdentHash ()); // try to request for a next attempt + } } if (m_RemoteLeaseSet) { diff --git a/Streaming.h b/Streaming.h index efa4d69e..75ce4c56 100644 --- a/Streaming.h +++ b/Streaming.h @@ -51,6 +51,7 @@ namespace stream const int INITIAL_RTO = 9000; // in milliseconds const size_t MAX_PENDING_INCOMING_BACKLOG = 128; const int PENDING_INCOMING_TIMEOUT = 10; // in seconds + const int MAX_RECEIVE_TIMEOUT = 60; // in seconds /** i2cp option for limiting inbound stremaing connections */ const char I2CP_PARAM_STREAMING_MAX_CONNS_PER_MIN[] = "maxconns"; @@ -161,6 +162,7 @@ namespace stream void SendClose (); bool SendPacket (Packet * packet); void SendPackets (const std::vector& packets); + void SendUpdatedLeaseSet (); void SavePacket (Packet * packet); void ProcessPacket (Packet * packet); @@ -170,7 +172,7 @@ namespace stream void UpdateCurrentRemoteLease (bool expired = false); template - void HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler); + void HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler, int remainingTimeout); void ScheduleResend (); void HandleResendTimer (const boost::system::error_code& ecode); @@ -282,18 +284,19 @@ namespace stream m_Service.post ([=](void) { if (!m_ReceiveQueue.empty () || m_Status == eStreamStatusReset) - s->HandleReceiveTimer (boost::asio::error::make_error_code (boost::asio::error::operation_aborted), buffer, handler); + s->HandleReceiveTimer (boost::asio::error::make_error_code (boost::asio::error::operation_aborted), buffer, handler, 0); else { - s->m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(timeout)); + int t = (timeout > MAX_RECEIVE_TIMEOUT) ? MAX_RECEIVE_TIMEOUT : timeout; + s->m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(t)); s->m_ReceiveTimer.async_wait ([=](const boost::system::error_code& ecode) - { s->HandleReceiveTimer (ecode, buffer, handler); }); + { s->HandleReceiveTimer (ecode, buffer, handler, timeout - t); }); } }); } template - void Stream::HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler) + void Stream::HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler, int remainingTimeout) { size_t received = ConcatenatePackets (boost::asio::buffer_cast(buffer), boost::asio::buffer_size(buffer)); if (received > 0) @@ -307,8 +310,17 @@ namespace stream handler (boost::asio::error::make_error_code (boost::asio::error::operation_aborted), 0); } else + { // timeout expired - handler (boost::asio::error::make_error_code (boost::asio::error::timed_out), received); + if (remainingTimeout <= 0) + handler (boost::asio::error::make_error_code (boost::asio::error::timed_out), received); + else + { + // itermediate iterrupt + SendUpdatedLeaseSet (); // send our leaseset if applicable + AsyncReceive (buffer, handler, remainingTimeout); + } + } } } }