Browse Source

send own LeasetSet through a stalled stream

pull/685/head
orignal 8 years ago
parent
commit
3167ae21b0
  1. 3
      Garlic.h
  2. 26
      Streaming.cpp
  3. 24
      Streaming.h

3
Garlic.h

@ -105,7 +105,8 @@ namespace garlic @@ -105,7 +105,8 @@ namespace garlic
if (m_LeaseSetUpdateStatus != eLeaseSetDoNotSend) m_LeaseSetUpdateStatus = eLeaseSetUpdated;
};
bool IsLeaseSetNonConfirmed () const { return m_LeaseSetUpdateStatus == eLeaseSetSubmitted; };
bool IsLeaseSetUpdated () const { return m_LeaseSetUpdateStatus == eLeaseSetUpdated; };
std::shared_ptr<GarlicRoutingPath> GetSharedRoutingPath ();
void SetSharedRoutingPath (std::shared_ptr<GarlicRoutingPath> path);

26
Streaming.cpp

@ -659,6 +659,29 @@ namespace stream @@ -659,6 +659,29 @@ namespace stream
LogPrint (eLogWarning, "Streaming: All leases are expired, sSID=", m_SendStreamID);
}
void Stream::SendUpdatedLeaseSet ()
{
if (m_RoutingSession && m_RoutingSession->IsLeaseSetUpdated ())
{
if (!m_CurrentRemoteLease)
UpdateCurrentRemoteLease (true);
if (m_CurrentRemoteLease)
{
auto msg = m_RoutingSession->WrapSingleMessage (nullptr);
auto outboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel ();
if (outboundTunnel)
m_CurrentOutboundTunnel->SendTunnelDataMsg (
{
i2p::tunnel::TunnelMessageBlock
{
i2p::tunnel::eDeliveryTypeTunnel,
m_CurrentRemoteLease->tunnelGateway, m_CurrentRemoteLease->tunnelID,
msg
}
});
}
}
}
void Stream::ScheduleResend ()
{
@ -760,7 +783,10 @@ namespace stream @@ -760,7 +783,10 @@ namespace stream
{
m_RemoteLeaseSet = m_LocalDestination.GetOwner ()->FindLeaseSet (m_RemoteIdentity->GetIdentHash ());
if (!m_RemoteLeaseSet)
{
LogPrint (eLogWarning, "Streaming: LeaseSet ", m_RemoteIdentity->GetIdentHash ().ToBase64 (), " not found");
m_LocalDestination.GetOwner ()->RequestDestination (m_RemoteIdentity->GetIdentHash ()); // try to request for a next attempt
}
}
if (m_RemoteLeaseSet)
{

24
Streaming.h

@ -51,6 +51,7 @@ namespace stream @@ -51,6 +51,7 @@ namespace stream
const int INITIAL_RTO = 9000; // in milliseconds
const size_t MAX_PENDING_INCOMING_BACKLOG = 128;
const int PENDING_INCOMING_TIMEOUT = 10; // in seconds
const int MAX_RECEIVE_TIMEOUT = 60; // in seconds
/** i2cp option for limiting inbound stremaing connections */
const char I2CP_PARAM_STREAMING_MAX_CONNS_PER_MIN[] = "maxconns";
@ -161,6 +162,7 @@ namespace stream @@ -161,6 +162,7 @@ namespace stream
void SendClose ();
bool SendPacket (Packet * packet);
void SendPackets (const std::vector<Packet *>& packets);
void SendUpdatedLeaseSet ();
void SavePacket (Packet * packet);
void ProcessPacket (Packet * packet);
@ -170,7 +172,7 @@ namespace stream @@ -170,7 +172,7 @@ namespace stream
void UpdateCurrentRemoteLease (bool expired = false);
template<typename Buffer, typename ReceiveHandler>
void HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler);
void HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler, int remainingTimeout);
void ScheduleResend ();
void HandleResendTimer (const boost::system::error_code& ecode);
@ -282,18 +284,19 @@ namespace stream @@ -282,18 +284,19 @@ namespace stream
m_Service.post ([=](void)
{
if (!m_ReceiveQueue.empty () || m_Status == eStreamStatusReset)
s->HandleReceiveTimer (boost::asio::error::make_error_code (boost::asio::error::operation_aborted), buffer, handler);
s->HandleReceiveTimer (boost::asio::error::make_error_code (boost::asio::error::operation_aborted), buffer, handler, 0);
else
{
s->m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(timeout));
int t = (timeout > MAX_RECEIVE_TIMEOUT) ? MAX_RECEIVE_TIMEOUT : timeout;
s->m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(t));
s->m_ReceiveTimer.async_wait ([=](const boost::system::error_code& ecode)
{ s->HandleReceiveTimer (ecode, buffer, handler); });
{ s->HandleReceiveTimer (ecode, buffer, handler, timeout - t); });
}
});
}
template<typename Buffer, typename ReceiveHandler>
void Stream::HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler)
void Stream::HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler, int remainingTimeout)
{
size_t received = ConcatenatePackets (boost::asio::buffer_cast<uint8_t *>(buffer), boost::asio::buffer_size(buffer));
if (received > 0)
@ -307,8 +310,17 @@ namespace stream @@ -307,8 +310,17 @@ namespace stream
handler (boost::asio::error::make_error_code (boost::asio::error::operation_aborted), 0);
}
else
{
// timeout expired
handler (boost::asio::error::make_error_code (boost::asio::error::timed_out), received);
if (remainingTimeout <= 0)
handler (boost::asio::error::make_error_code (boost::asio::error::timed_out), received);
else
{
// itermediate iterrupt
SendUpdatedLeaseSet (); // send our leaseset if applicable
AsyncReceive (buffer, handler, remainingTimeout);
}
}
}
}
}

Loading…
Cancel
Save