Browse Source

handle incoming packets in batch

pull/1752/head
orignal 3 years ago
parent
commit
1e2a0a4549
  1. 68
      libi2pd/SSU2.cpp
  2. 6
      libi2pd/SSU2.h

68
libi2pd/SSU2.cpp

@ -31,7 +31,8 @@ namespace transport
std::shared_ptr<const i2p::data::RouterInfo::Address> addr, bool peerTest): std::shared_ptr<const i2p::data::RouterInfo::Address> addr, bool peerTest):
TransportSession (in_RemoteRouter, SSU2_CONNECT_TIMEOUT), TransportSession (in_RemoteRouter, SSU2_CONNECT_TIMEOUT),
m_Server (server), m_Address (addr), m_DestConnID (0), m_SourceConnID (0), m_Server (server), m_Address (addr), m_DestConnID (0), m_SourceConnID (0),
m_State (eSSU2SessionStateUnknown), m_SendPacketNum (0), m_ReceivePacketNum (0) m_State (eSSU2SessionStateUnknown), m_SendPacketNum (0), m_ReceivePacketNum (0),
m_IsDataReceived (false)
{ {
m_NoiseState.reset (new i2p::crypto::NoiseSymmetricState); m_NoiseState.reset (new i2p::crypto::NoiseSymmetricState);
if (in_RemoteRouter && m_Address) if (in_RemoteRouter && m_Address)
@ -739,15 +740,11 @@ namespace transport
m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch (); m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch ();
m_NumReceivedBytes += len; m_NumReceivedBytes += len;
if (UpdateReceivePacketNum (packetNum)) if (UpdateReceivePacketNum (packetNum))
{ HandlePayload (payload, payloadSize);
if (HandlePayload (payload, payloadSize))
SendQuickAck (); // TODO: don't send too requently
}
} }
bool SSU2Session::HandlePayload (const uint8_t * buf, size_t len) void SSU2Session::HandlePayload (const uint8_t * buf, size_t len)
{ {
bool isData = false;
size_t offset = 0; size_t offset = 0;
while (offset < len) while (offset < len)
{ {
@ -786,18 +783,18 @@ namespace transport
memcpy (nextMsg->GetNTCP2Header (), buf + offset, size); memcpy (nextMsg->GetNTCP2Header (), buf + offset, size);
nextMsg->FromNTCP2 (); // SSU2 has the same format as NTCP2 nextMsg->FromNTCP2 (); // SSU2 has the same format as NTCP2
m_Handler.PutNextMessage (std::move (nextMsg)); m_Handler.PutNextMessage (std::move (nextMsg));
isData = true; m_IsDataReceived = true;
break; break;
} }
case eSSU2BlkFirstFragment: case eSSU2BlkFirstFragment:
LogPrint (eLogDebug, "SSU2: First fragment"); LogPrint (eLogDebug, "SSU2: First fragment");
HandleFirstFragment (buf + offset, size); HandleFirstFragment (buf + offset, size);
isData = true; m_IsDataReceived = true;
break; break;
case eSSU2BlkFollowOnFragment: case eSSU2BlkFollowOnFragment:
LogPrint (eLogDebug, "SSU2: Follow-on fragment"); LogPrint (eLogDebug, "SSU2: Follow-on fragment");
HandleFollowOnFragment (buf + offset, size); HandleFollowOnFragment (buf + offset, size);
isData = true; m_IsDataReceived = true;
break; break;
case eSSU2BlkTermination: case eSSU2BlkTermination:
LogPrint (eLogDebug, "SSU2: Termination"); LogPrint (eLogDebug, "SSU2: Termination");
@ -852,8 +849,6 @@ namespace transport
} }
offset += size; offset += size;
} }
m_Handler.Flush ();
return isData;
} }
void SSU2Session::HandleAck (const uint8_t * buf, size_t len) void SSU2Session::HandleAck (const uint8_t * buf, size_t len)
@ -1216,6 +1211,16 @@ namespace transport
} }
} }
void SSU2Session::FlushData ()
{
if (m_IsDataReceived)
{
SendQuickAck ();
m_Handler.Flush ();
m_IsDataReceived = false;
}
}
SSU2Server::SSU2Server (): SSU2Server::SSU2Server ():
RunnableServiceWithWork ("SSU2"), m_ReceiveService ("SSU2r"), RunnableServiceWithWork ("SSU2"), m_ReceiveService ("SSU2r"),
m_SocketV4 (m_ReceiveService.GetService ()), m_SocketV6 (m_ReceiveService.GetService ()), m_SocketV4 (m_ReceiveService.GetService ()), m_SocketV6 (m_ReceiveService.GetService ()),
@ -1325,7 +1330,34 @@ namespace transport
{ {
i2p::transport::transports.UpdateReceivedBytes (bytes_transferred); i2p::transport::transports.UpdateReceivedBytes (bytes_transferred);
packet->len = bytes_transferred; packet->len = bytes_transferred;
GetService ().post (std::bind (&SSU2Server::HandleReceivedPacket, this, packet)); std::vector<Packet *> packets;
packets.push_back (packet);
boost::system::error_code ec;
size_t moreBytes = socket.available (ec);
if (!ec)
{
while (moreBytes && packets.size () < 32)
{
packet = m_PacketsPool.AcquireMt ();
packet->len = socket.receive_from (boost::asio::buffer (packet->buf, SSU2_MTU), packet->from, 0, ec);
if (!ec)
{
i2p::transport::transports.UpdateReceivedBytes (packet->len);
packets.push_back (packet);
moreBytes = socket.available(ec);
if (ec) break;
}
else
{
LogPrint (eLogError, "SSU2: receive_from error: code ", ec.value(), ": ", ec.message ());
m_PacketsPool.ReleaseMt (packet);
break;
}
}
}
GetService ().post (std::bind (&SSU2Server::HandleReceivedPacket, this, packets));
Receive (socket); Receive (socket);
} }
else else
@ -1342,13 +1374,12 @@ namespace transport
} }
} }
void SSU2Server::HandleReceivedPacket (Packet * packet) void SSU2Server::HandleReceivedPacket (std::vector<Packet *> packets)
{
if (packet)
{ {
for (auto& packet: packets)
ProcessNextPacket (packet->buf, packet->len, packet->from); ProcessNextPacket (packet->buf, packet->len, packet->from);
m_PacketsPool.ReleaseMt (packet); m_PacketsPool.ReleaseMt (packets);
} if (m_LastSession) m_LastSession->FlushData ();
} }
void SSU2Server::AddSession (std::shared_ptr<SSU2Session> session) void SSU2Server::AddSession (std::shared_ptr<SSU2Session> session)
@ -1375,6 +1406,7 @@ namespace transport
connID ^= CreateHeaderMask (i2p::context.GetSSU2IntroKey (), buf + (len - 24)); connID ^= CreateHeaderMask (i2p::context.GetSSU2IntroKey (), buf + (len - 24));
if (!m_LastSession || m_LastSession->GetConnID () != connID) if (!m_LastSession || m_LastSession->GetConnID () != connID)
{ {
if (m_LastSession) m_LastSession->FlushData ();
auto it = m_Sessions.find (connID); auto it = m_Sessions.find (connID);
if (it != m_Sessions.end ()) if (it != m_Sessions.end ())
m_LastSession = it->second; m_LastSession = it->second;

6
libi2pd/SSU2.h

@ -136,6 +136,7 @@ namespace transport
void Terminate (); void Terminate ();
void TerminateByTimeout (); void TerminateByTimeout ();
void CleanUp (uint64_t ts); void CleanUp (uint64_t ts);
void FlushData ();
void Done () override; void Done () override;
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) override; void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) override;
void Resend (uint64_t ts); void Resend (uint64_t ts);
@ -168,7 +169,7 @@ namespace transport
void SendQuickAck (); void SendQuickAck ();
void SendTermination (); void SendTermination ();
bool HandlePayload (const uint8_t * buf, size_t len); // returns true is contains data void HandlePayload (const uint8_t * buf, size_t len);
void HandleAck (const uint8_t * buf, size_t len); void HandleAck (const uint8_t * buf, size_t len);
void HandleAckRange (uint32_t firstPacketNum, uint32_t lastPacketNum); void HandleAckRange (uint32_t firstPacketNum, uint32_t lastPacketNum);
bool ExtractEndpoint (const uint8_t * buf, size_t size, boost::asio::ip::udp::endpoint& ep); bool ExtractEndpoint (const uint8_t * buf, size_t size, boost::asio::ip::udp::endpoint& ep);
@ -203,6 +204,7 @@ namespace transport
std::map<uint32_t, std::shared_ptr<SSU2IncompleteMessage> > m_IncompleteMessages; // I2NP std::map<uint32_t, std::shared_ptr<SSU2IncompleteMessage> > m_IncompleteMessages; // I2NP
std::list<std::shared_ptr<I2NPMessage> > m_SendQueue; std::list<std::shared_ptr<I2NPMessage> > m_SendQueue;
i2p::I2NPMessagesHandler m_Handler; i2p::I2NPMessagesHandler m_Handler;
bool m_IsDataReceived;
}; };
class SSU2Server: private i2p::util::RunnableServiceWithWork class SSU2Server: private i2p::util::RunnableServiceWithWork
@ -255,7 +257,7 @@ namespace transport
void Receive (boost::asio::ip::udp::socket& socket); void Receive (boost::asio::ip::udp::socket& socket);
void HandleReceivedFrom (const boost::system::error_code& ecode, size_t bytes_transferred, void HandleReceivedFrom (const boost::system::error_code& ecode, size_t bytes_transferred,
Packet * packet, boost::asio::ip::udp::socket& socket); Packet * packet, boost::asio::ip::udp::socket& socket);
void HandleReceivedPacket (Packet * packet); void HandleReceivedPacket (std::vector<Packet *> packets);
void ProcessNextPacket (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); void ProcessNextPacket (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint);
void ScheduleTermination (); void ScheduleTermination ();

Loading…
Cancel
Save