Browse Source

resend packets

pull/1752/head
orignal 2 years ago
parent
commit
2fef595b83
  1. 65
      libi2pd/SSU2.cpp
  2. 6
      libi2pd/SSU2.h

65
libi2pd/SSU2.cpp

@ -104,8 +104,8 @@ namespace transport @@ -104,8 +104,8 @@ namespace transport
for (auto it: msgs)
m_SendQueue.push_back (it);
SendQueue ();
}
}
void SSU2Session::SendQueue ()
{
if (!m_SendQueue.empty ())
@ -146,6 +146,28 @@ namespace transport @@ -146,6 +146,28 @@ namespace transport
}
}
}
void SSU2Session::Resend (uint64_t ts)
{
if (m_SendQueue.empty ()) return;
for (auto it = m_SentPackets.begin (); it != m_SentPackets.end (); )
if (ts > it->second->nextResendTime)
{
if (it->second->numResends > SSU2_MAX_NUM_RESENDS)
it = m_SentPackets.erase (it);
else
{
m_Server.Send (it->second->header.buf, 16, it->second->payload, it->second->payloadLen, m_RemoteEndpoint);
it->second->numResends++;
it->second->nextResendTime = ts + it->second->numResends*SSU2_RESEND_INTERVAL;
m_LastActivityTimestamp = ts;
m_NumSentBytes += it->second->payloadLen + 16;
it++;
}
}
else
it++;
}
void SSU2Session::ProcessFirstIncomingMessage (uint64_t connID, uint8_t * buf, size_t len)
{
@ -765,19 +787,14 @@ namespace transport @@ -765,19 +787,14 @@ namespace transport
{
if (m_SentPackets.empty ()) return;
uint32_t ackThrough = bufbe32toh (buf);
auto it = m_SentPackets.rbegin ();
while (it != m_SentPackets.rend () && it->first > ackThrough) ++it; // find first less pack <= ackThrough
if (it == m_SentPackets.rend ()) return;
int32_t firstPacketNum = ackThrough - buf[4]; // acnt
if (firstPacketNum < 0) firstPacketNum = 0;
uint32_t firstPacketNum = ackThrough > buf[4] ? ackThrough - buf[4] : 0; // acnt
auto it = m_SentPackets.begin ();
while (it != m_SentPackets.end () && it->first < firstPacketNum) it++; // find first acked packet
if (it == m_SentPackets.end ()) return; // not found
auto it1 = it;
while (it1 != m_SentPackets.rend () && it1->first >= (uint32_t)firstPacketNum) it1++;
if (it1 == m_SentPackets.rend ())
{
m_SentPackets.erase (m_SentPackets.begin (), it.base ());
return;
}
m_SentPackets.erase (it1.base (), it.base ());
while (it1 != m_SentPackets.end () && it1->first <= ackThrough) it1++;
it1--;
m_SentPackets.erase (it, it1);
// TODO: handle ranges
}
@ -944,7 +961,7 @@ namespace transport @@ -944,7 +961,7 @@ namespace transport
SSU2Server::SSU2Server ():
RunnableServiceWithWork ("SSU2"), m_Socket (GetService ()), m_SocketV6 (GetService ()),
m_TerminationTimer (GetService ())
m_TerminationTimer (GetService ()), m_ResendTimer (GetService ())
{
}
@ -1200,6 +1217,24 @@ namespace transport @@ -1200,6 +1217,24 @@ namespace transport
}
}
void SSU2Server::ScheduleResend ()
{
m_ResendTimer.expires_from_now (boost::posix_time::seconds(SSU2_RESEND_INTERVAL));
m_ResendTimer.async_wait (std::bind (&SSU2Server::HandleResendTimer,
this, std::placeholders::_1));
}
void SSU2Server::HandleResendTimer (const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
auto ts = i2p::util::GetSecondsSinceEpoch ();
for (auto it: m_Sessions)
it.second->Resend (ts);
ScheduleResend ();
}
}
void SSU2Server::UpdateOutgoingToken (const boost::asio::ip::udp::endpoint& ep, uint64_t token, uint32_t exp)
{
m_OutgoingTokens[ep] = {token, exp};

6
libi2pd/SSU2.h

@ -122,6 +122,7 @@ namespace transport @@ -122,6 +122,7 @@ namespace transport
void TerminateByTimeout ();
void Done () override;
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) override;
void Resend (uint64_t ts);
bool IsEstablished () const { return m_State == eSSU2SessionStateEstablished; };
uint64_t GetConnID () const { return m_SourceConnID; };
@ -223,6 +224,9 @@ namespace transport @@ -223,6 +224,9 @@ namespace transport
void ScheduleTermination ();
void HandleTerminationTimer (const boost::system::error_code& ecode);
void ScheduleResend ();
void HandleResendTimer (const boost::system::error_code& ecode);
private:
@ -231,7 +235,7 @@ namespace transport @@ -231,7 +235,7 @@ namespace transport
std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSU2Session> > m_PendingOutgoingSessions;
std::map<boost::asio::ip::udp::endpoint, std::pair<uint64_t, uint32_t> > m_IncomingTokens, m_OutgoingTokens; // remote endpoint -> (token, expires in seconds)
i2p::util::MemoryPoolMt<Packet> m_PacketsPool;
boost::asio::deadline_timer m_TerminationTimer;
boost::asio::deadline_timer m_TerminationTimer, m_ResendTimer;
public:

Loading…
Cancel
Save