Browse Source

use same outbound tunnel for streaming as long as possible

pull/93/head
orignal 10 years ago
parent
commit
dda80703d2
  1. 14
      Streaming.cpp
  2. 1
      Streaming.h
  3. 20
      TunnelPool.cpp
  4. 8
      TunnelPool.h

14
Streaming.cpp

@ -21,7 +21,8 @@ namespace stream
const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0), const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0),
m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false),
m_LeaseSetUpdated (true), m_LocalDestination (local), m_LeaseSetUpdated (true), m_LocalDestination (local),
m_RemoteLeaseSet (&remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service) m_RemoteLeaseSet (&remote), m_CurrentOutboundTunnel (nullptr),
m_ReceiveTimer (m_Service), m_ResendTimer (m_Service)
{ {
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
UpdateCurrentRemoteLease (); UpdateCurrentRemoteLease ();
@ -30,7 +31,8 @@ namespace stream
Stream::Stream (boost::asio::io_service& service, StreamingDestination * local): Stream::Stream (boost::asio::io_service& service, StreamingDestination * local):
m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1),
m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local), m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local),
m_RemoteLeaseSet (nullptr), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service) m_RemoteLeaseSet (nullptr), m_CurrentOutboundTunnel (nullptr),
m_ReceiveTimer (m_Service), m_ResendTimer (m_Service)
{ {
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
} }
@ -102,6 +104,7 @@ namespace stream
{ {
// we have received duplicate. Most likely our outbound tunnel is dead // we have received duplicate. Most likely our outbound tunnel is dead
LogPrint ("Duplicate message ", receivedSeqn, " received"); LogPrint ("Duplicate message ", receivedSeqn, " received");
m_CurrentOutboundTunnel = nullptr; // pick another outbound tunnel
UpdateCurrentRemoteLease (); // pick another lease UpdateCurrentRemoteLease (); // pick another lease
SendQuickAck (); // resend ack for previous message again SendQuickAck (); // resend ack for previous message again
delete packet; // packet dropped delete packet; // packet dropped
@ -401,8 +404,8 @@ namespace stream
m_LeaseSetUpdated = false; m_LeaseSetUpdated = false;
} }
auto outboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel (); m_CurrentOutboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel);
if (outboundTunnel) if (m_CurrentOutboundTunnel)
{ {
auto ts = i2p::util::GetMillisecondsSinceEpoch (); auto ts = i2p::util::GetMillisecondsSinceEpoch ();
if (ts >= m_CurrentRemoteLease.endDate) if (ts >= m_CurrentRemoteLease.endDate)
@ -422,7 +425,7 @@ namespace stream
}); });
leaseSet = nullptr; // send leaseSet only one time leaseSet = nullptr; // send leaseSet only one time
} }
outboundTunnel->SendTunnelDataMsg (msgs); m_CurrentOutboundTunnel->SendTunnelDataMsg (msgs);
} }
else else
LogPrint ("All leases are expired"); LogPrint ("All leases are expired");
@ -458,6 +461,7 @@ namespace stream
} }
if (packets.size () > 0) if (packets.size () > 0)
{ {
m_CurrentOutboundTunnel = nullptr; // pick another outbound tunnel
UpdateCurrentRemoteLease (); // pick another lease UpdateCurrentRemoteLease (); // pick another lease
SendPackets (packets); SendPackets (packets);
} }

1
Streaming.h

@ -128,6 +128,7 @@ namespace stream
i2p::data::Identity m_RemoteIdentity; i2p::data::Identity m_RemoteIdentity;
const i2p::data::LeaseSet * m_RemoteLeaseSet; const i2p::data::LeaseSet * m_RemoteLeaseSet;
i2p::data::Lease m_CurrentRemoteLease; i2p::data::Lease m_CurrentRemoteLease;
i2p::tunnel::OutboundTunnel * m_CurrentOutboundTunnel;
std::queue<Packet *> m_ReceiveQueue; std::queue<Packet *> m_ReceiveQueue;
std::set<Packet *, PacketCmp> m_SavedPackets; std::set<Packet *, PacketCmp> m_SavedPackets;
std::set<Packet *, PacketCmp> m_SentPackets; std::set<Packet *, PacketCmp> m_SentPackets;

20
TunnelPool.cpp

@ -74,23 +74,29 @@ namespace tunnel
return v; return v;
} }
OutboundTunnel * TunnelPool::GetNextOutboundTunnel () OutboundTunnel * TunnelPool::GetNextOutboundTunnel (OutboundTunnel * suggested)
{ {
return GetNextTunnel (m_OutboundTunnels); return GetNextTunnel (m_OutboundTunnels, suggested);
} }
InboundTunnel * TunnelPool::GetNextInboundTunnel () InboundTunnel * TunnelPool::GetNextInboundTunnel (InboundTunnel * suggested)
{ {
return GetNextTunnel (m_InboundTunnels); return GetNextTunnel (m_InboundTunnels, suggested);
} }
template<class TTunnels> template<class TTunnels>
typename TTunnels::value_type TunnelPool::GetNextTunnel (TTunnels& tunnels) typename TTunnels::value_type TunnelPool::GetNextTunnel (TTunnels& tunnels,
typename TTunnels::value_type suggested)
{ {
if (tunnels.empty ()) return nullptr; if (tunnels.empty ()) return nullptr;
uint64_t ts = i2p::util::GetSecondsSinceEpoch ();
if (suggested && tunnels.count (suggested) > 0 &&
ts + TUNNEL_EXPIRATION_THRESHOLD <= suggested->GetCreationTime () + TUNNEL_EXPIRATION_TIMEOUT)
return suggested;
for (auto it: tunnels) for (auto it: tunnels)
if (!it->IsFailed ()) if (!it->IsFailed () && ts + TUNNEL_EXPIRATION_THRESHOLD <=
return it; it->GetCreationTime () + TUNNEL_EXPIRATION_TIMEOUT)
return it;
return nullptr; return nullptr;
} }

8
TunnelPool.h

@ -19,6 +19,7 @@ namespace tunnel
class InboundTunnel; class InboundTunnel;
class OutboundTunnel; class OutboundTunnel;
const int TUNNEL_EXPIRATION_THRESHOLD = 60; // 1 minute
class TunnelPool // per local destination class TunnelPool // per local destination
{ {
public: public:
@ -37,8 +38,8 @@ namespace tunnel
void TunnelCreated (OutboundTunnel * createdTunnel); void TunnelCreated (OutboundTunnel * createdTunnel);
void TunnelExpired (OutboundTunnel * expiredTunnel); void TunnelExpired (OutboundTunnel * expiredTunnel);
std::vector<InboundTunnel *> GetInboundTunnels (int num) const; std::vector<InboundTunnel *> GetInboundTunnels (int num) const;
OutboundTunnel * GetNextOutboundTunnel (); OutboundTunnel * GetNextOutboundTunnel (OutboundTunnel * suggested = nullptr);
InboundTunnel * GetNextInboundTunnel (); InboundTunnel * GetNextInboundTunnel (InboundTunnel * suggested = nullptr);
const i2p::data::IdentHash& GetIdentHash () { return m_LocalDestination.GetIdentHash (); }; const i2p::data::IdentHash& GetIdentHash () { return m_LocalDestination.GetIdentHash (); };
void TestTunnels (); void TestTunnels ();
@ -51,7 +52,8 @@ namespace tunnel
void RecreateInboundTunnel (InboundTunnel * tunnel); void RecreateInboundTunnel (InboundTunnel * tunnel);
void RecreateOutboundTunnel (OutboundTunnel * tunnel); void RecreateOutboundTunnel (OutboundTunnel * tunnel);
template<class TTunnels> template<class TTunnels>
typename TTunnels::value_type GetNextTunnel (TTunnels& tunnels); typename TTunnels::value_type GetNextTunnel (TTunnels& tunnels,
typename TTunnels::value_type suggested = nullptr);
private: private:

Loading…
Cancel
Save