Browse Source

better resend and tunnel reselection algorithm

pull/167/head
orignal 10 years ago
parent
commit
5f3b17af64
  1. 55
      Streaming.cpp
  2. 6
      Streaming.h
  3. 13
      TunnelPool.cpp
  4. 6
      TunnelPool.h

55
Streaming.cpp

@ -18,7 +18,7 @@ namespace stream
m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service),
m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port),
m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO), m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO),
m_LastWindowSizeIncreaseTime (0) m_LastWindowSizeIncreaseTime (0), m_NumResendAttempts (0)
{ {
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
UpdateCurrentRemoteLease (); UpdateCurrentRemoteLease ();
@ -29,7 +29,7 @@ namespace stream
m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local),
m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service),
m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_WindowSize (MIN_WINDOW_SIZE), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_WindowSize (MIN_WINDOW_SIZE),
m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO), m_LastWindowSizeIncreaseTime (0) m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO), m_LastWindowSizeIncreaseTime (0), m_NumResendAttempts (0)
{ {
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
} }
@ -267,7 +267,10 @@ namespace stream
if (m_SentPackets.empty ()) if (m_SentPackets.empty ())
m_ResendTimer.cancel (); m_ResendTimer.cancel ();
if (acknowledged) if (acknowledged)
{
m_NumResendAttempts = 0;
SendBuffer (); SendBuffer ();
}
if (m_Status == eStreamStatusClosing) if (m_Status == eStreamStatusClosing)
Close (); // all outgoing messages have been sent Close (); // all outgoing messages have been sent
} }
@ -604,43 +607,49 @@ namespace stream
{ {
if (ecode != boost::asio::error::operation_aborted) if (ecode != boost::asio::error::operation_aborted)
{ {
// check for resend attempts
if (m_NumResendAttempts >= MAX_NUM_RESEND_ATTEMPTS)
{
LogPrint (eLogWarning, "Stream packet was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts. Terminate");
m_Status = eStreamStatusReset;
Close ();
return;
}
// collect packets to resend
auto ts = i2p::util::GetMillisecondsSinceEpoch (); auto ts = i2p::util::GetMillisecondsSinceEpoch ();
bool congesion = false, first = true;
std::vector<Packet *> packets; std::vector<Packet *> packets;
for (auto it : m_SentPackets) for (auto it : m_SentPackets)
{ {
if (ts < it->sendTime + m_RTO) continue; // don't resend too early if (ts >= it->sendTime + m_RTO)
it->numResendAttempts++;
if (first && it->numResendAttempts == 1) // detect congesion at first attempt of first packet only
congesion = true;
first = false;
if (it->numResendAttempts <= MAX_NUM_RESEND_ATTEMPTS)
{ {
it->sendTime = ts; it->sendTime = ts;
packets.push_back (it); packets.push_back (it);
} }
else
{
LogPrint (eLogWarning, "Packet ", it->GetSeqn (), " was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts. Terminate");
m_Status = eStreamStatusReset;
Close ();
return;
}
} }
// select tunnels if necessary and send
if (packets.size () > 0) if (packets.size () > 0)
{ {
if (congesion) m_NumResendAttempts++;
switch (m_NumResendAttempts)
{ {
// congesion avoidance case 1: // congesion avoidance
m_WindowSize /= 2; m_WindowSize /= 2;
if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE;
} break;
else case 2:
{ case 4:
// congesion avoidance didn't help
m_CurrentOutboundTunnel = nullptr; // pick another outbound tunnel
UpdateCurrentRemoteLease (); // pick another lease UpdateCurrentRemoteLease (); // pick another lease
m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change m_RTO = INITIAL_RTO; // drop RTO to initial upon tunnels pair change
LogPrint (eLogWarning, "Another remote lease has been selected stream");
break;
case 3:
// pick another outbound tunnel
m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ().GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel);
LogPrint (eLogWarning, "Another outbound tunnel has been selected for stream");
break;
default: ;
} }
SendPackets (packets); SendPackets (packets);
} }

6
Streaming.h

@ -42,7 +42,7 @@ namespace stream
const size_t MAX_PACKET_SIZE = 4096; const size_t MAX_PACKET_SIZE = 4096;
const size_t COMPRESSION_THRESHOLD_SIZE = 66; const size_t COMPRESSION_THRESHOLD_SIZE = 66;
const int ACK_SEND_TIMEOUT = 200; // in milliseconds const int ACK_SEND_TIMEOUT = 200; // in milliseconds
const int MAX_NUM_RESEND_ATTEMPTS = 5; const int MAX_NUM_RESEND_ATTEMPTS = 6;
const int WINDOW_SIZE = 6; // in messages const int WINDOW_SIZE = 6; // in messages
const int MIN_WINDOW_SIZE = 1; const int MIN_WINDOW_SIZE = 1;
const int MAX_WINDOW_SIZE = 128; const int MAX_WINDOW_SIZE = 128;
@ -53,10 +53,9 @@ namespace stream
{ {
size_t len, offset; size_t len, offset;
uint8_t buf[MAX_PACKET_SIZE]; uint8_t buf[MAX_PACKET_SIZE];
int numResendAttempts;
uint64_t sendTime; uint64_t sendTime;
Packet (): len (0), offset (0), numResendAttempts (0), sendTime (0) {}; Packet (): len (0), offset (0), sendTime (0) {};
uint8_t * GetBuffer () { return buf + offset; }; uint8_t * GetBuffer () { return buf + offset; };
size_t GetLength () const { return len - offset; }; size_t GetLength () const { return len - offset; };
@ -179,6 +178,7 @@ namespace stream
std::stringstream m_SendBuffer; std::stringstream m_SendBuffer;
int m_WindowSize, m_RTT, m_RTO; int m_WindowSize, m_RTT, m_RTO;
uint64_t m_LastWindowSizeIncreaseTime; uint64_t m_LastWindowSizeIncreaseTime;
int m_NumResendAttempts;
}; };
class StreamingDestination class StreamingDestination

13
TunnelPool.cpp

@ -101,20 +101,20 @@ namespace tunnel
return v; return v;
} }
std::shared_ptr<OutboundTunnel> TunnelPool::GetNextOutboundTunnel () const std::shared_ptr<OutboundTunnel> TunnelPool::GetNextOutboundTunnel (std::shared_ptr<OutboundTunnel> excluded) const
{ {
std::unique_lock<std::mutex> l(m_OutboundTunnelsMutex); std::unique_lock<std::mutex> l(m_OutboundTunnelsMutex);
return GetNextTunnel (m_OutboundTunnels); return GetNextTunnel (m_OutboundTunnels, excluded);
} }
std::shared_ptr<InboundTunnel> TunnelPool::GetNextInboundTunnel () const std::shared_ptr<InboundTunnel> TunnelPool::GetNextInboundTunnel (std::shared_ptr<InboundTunnel> excluded) const
{ {
std::unique_lock<std::mutex> l(m_InboundTunnelsMutex); std::unique_lock<std::mutex> l(m_InboundTunnelsMutex);
return GetNextTunnel (m_InboundTunnels); return GetNextTunnel (m_InboundTunnels, excluded);
} }
template<class TTunnels> template<class TTunnels>
typename TTunnels::value_type TunnelPool::GetNextTunnel (TTunnels& tunnels) const typename TTunnels::value_type TunnelPool::GetNextTunnel (TTunnels& tunnels, typename TTunnels::value_type excluded) const
{ {
if (tunnels.empty ()) return nullptr; if (tunnels.empty ()) return nullptr;
CryptoPP::RandomNumberGenerator& rnd = i2p::context.GetRandomNumberGenerator (); CryptoPP::RandomNumberGenerator& rnd = i2p::context.GetRandomNumberGenerator ();
@ -122,13 +122,14 @@ namespace tunnel
typename TTunnels::value_type tunnel = nullptr; typename TTunnels::value_type tunnel = nullptr;
for (auto it: tunnels) for (auto it: tunnels)
{ {
if (it->IsEstablished ()) if (it->IsEstablished () && it != excluded)
{ {
tunnel = it; tunnel = it;
i++; i++;
} }
if (i > ind && tunnel) break; if (i > ind && tunnel) break;
} }
if (!tunnel && excluded && excluded->IsEstablished ()) tunnel = excluded;
return tunnel; return tunnel;
} }

6
TunnelPool.h

@ -39,8 +39,8 @@ namespace tunnel
void TunnelCreated (std::shared_ptr<OutboundTunnel> createdTunnel); void TunnelCreated (std::shared_ptr<OutboundTunnel> createdTunnel);
void TunnelExpired (std::shared_ptr<OutboundTunnel> expiredTunnel); void TunnelExpired (std::shared_ptr<OutboundTunnel> expiredTunnel);
std::vector<std::shared_ptr<InboundTunnel> > GetInboundTunnels (int num) const; std::vector<std::shared_ptr<InboundTunnel> > GetInboundTunnels (int num) const;
std::shared_ptr<OutboundTunnel> GetNextOutboundTunnel () const; std::shared_ptr<OutboundTunnel> GetNextOutboundTunnel (std::shared_ptr<OutboundTunnel> excluded = nullptr) const;
std::shared_ptr<InboundTunnel> GetNextInboundTunnel () const; std::shared_ptr<InboundTunnel> GetNextInboundTunnel (std::shared_ptr<InboundTunnel> excluded = nullptr) const;
void TestTunnels (); void TestTunnels ();
void ProcessGarlicMessage (I2NPMessage * msg); void ProcessGarlicMessage (I2NPMessage * msg);
@ -57,7 +57,7 @@ namespace tunnel
void RecreateInboundTunnel (std::shared_ptr<InboundTunnel> tunnel); void RecreateInboundTunnel (std::shared_ptr<InboundTunnel> tunnel);
void RecreateOutboundTunnel (std::shared_ptr<OutboundTunnel> tunnel); void RecreateOutboundTunnel (std::shared_ptr<OutboundTunnel> tunnel);
template<class TTunnels> template<class TTunnels>
typename TTunnels::value_type GetNextTunnel (TTunnels& tunnels) const; typename TTunnels::value_type GetNextTunnel (TTunnels& tunnels, typename TTunnels::value_type excluded) const;
std::shared_ptr<const i2p::data::RouterInfo> SelectNextHop (std::shared_ptr<const i2p::data::RouterInfo> prevHop) const; std::shared_ptr<const i2p::data::RouterInfo> SelectNextHop (std::shared_ptr<const i2p::data::RouterInfo> prevHop) const;
private: private:

Loading…
Cancel
Save