Browse Source

separate receive thread

pull/1752/head
orignal 3 years ago
parent
commit
5891b1ceb2
  1. 47
      libi2pd/SSU2.cpp
  2. 14
      libi2pd/SSU2.h

47
libi2pd/SSU2.cpp

@ -1109,7 +1109,8 @@ namespace transport
} }
SSU2Server::SSU2Server (): SSU2Server::SSU2Server ():
RunnableServiceWithWork ("SSU2"), m_Socket (GetService ()), m_SocketV6 (GetService ()), RunnableServiceWithWork ("SSU2"), m_ReceiveServiceV4 ("SSU2v4"), m_ReceiveServiceV6 ("SSU2v6"),
m_SocketV4 (m_ReceiveServiceV4.GetService ()), m_SocketV6 (m_ReceiveServiceV6.GetService ()),
m_TerminationTimer (GetService ()), m_ResendTimer (GetService ()) m_TerminationTimer (GetService ()), m_ResendTimer (GetService ())
{ {
} }
@ -1139,9 +1140,25 @@ namespace transport
if (port) if (port)
{ {
if (address->IsV4 ()) if (address->IsV4 ())
Receive (OpenSocket (boost::asio::ip::udp::endpoint (boost::asio::ip::udp::v4(), port))); {
m_ReceiveServiceV4.Start ();
OpenSocket (boost::asio::ip::udp::endpoint (boost::asio::ip::udp::v4(), port));
m_ReceiveServiceV4.GetService ().post(
[this]()
{
Receive (m_SocketV4);
});
}
if (address->IsV6 ()) if (address->IsV6 ())
Receive (OpenSocket (boost::asio::ip::udp::endpoint (boost::asio::ip::udp::v6(), port))); {
m_ReceiveServiceV6.Start ();
OpenSocket (boost::asio::ip::udp::endpoint (boost::asio::ip::udp::v6(), port));
m_ReceiveServiceV6.GetService ().post(
[this]()
{
Receive (m_SocketV6);
});
}
} }
else else
LogPrint (eLogError, "SSU2: Can't start server because port not specified"); LogPrint (eLogError, "SSU2: Can't start server because port not specified");
@ -1153,6 +1170,12 @@ namespace transport
void SSU2Server::Stop () void SSU2Server::Stop ()
{ {
if (context.SupportsV4 ())
m_ReceiveServiceV4.Stop ();
if (context.SupportsV6 ())
m_ReceiveServiceV6.Stop ();
if (IsRunning ()) if (IsRunning ())
m_TerminationTimer.cancel (); m_TerminationTimer.cancel ();
@ -1161,7 +1184,7 @@ namespace transport
boost::asio::ip::udp::socket& SSU2Server::OpenSocket (const boost::asio::ip::udp::endpoint& localEndpoint) boost::asio::ip::udp::socket& SSU2Server::OpenSocket (const boost::asio::ip::udp::endpoint& localEndpoint)
{ {
boost::asio::ip::udp::socket& socket = localEndpoint.address ().is_v6 () ? m_SocketV6 : m_Socket; boost::asio::ip::udp::socket& socket = localEndpoint.address ().is_v6 () ? m_SocketV6 : m_SocketV4;
try try
{ {
socket.open (localEndpoint.protocol ()); socket.open (localEndpoint.protocol ());
@ -1194,8 +1217,7 @@ namespace transport
{ {
i2p::transport::transports.UpdateReceivedBytes (bytes_transferred); i2p::transport::transports.UpdateReceivedBytes (bytes_transferred);
packet->len = bytes_transferred; packet->len = bytes_transferred;
ProcessNextPacket (packet->buf, packet->len, packet->from); GetService ().post (std::bind (&SSU2Server::HandleReceivedPacket, this, packet));
m_PacketsPool.ReleaseMt (packet);
Receive (socket); Receive (socket);
} }
else else
@ -1212,6 +1234,15 @@ namespace transport
} }
} }
void SSU2Server::HandleReceivedPacket (Packet * packet)
{
if (packet)
{
ProcessNextPacket (packet->buf, packet->len, packet->from);
m_PacketsPool.ReleaseMt (packet);
}
}
void SSU2Server::AddSession (std::shared_ptr<SSU2Session> session) void SSU2Server::AddSession (std::shared_ptr<SSU2Session> session)
{ {
if (session) if (session)
@ -1275,7 +1306,7 @@ namespace transport
if (to.address ().is_v6 ()) if (to.address ().is_v6 ())
m_SocketV6.send_to (bufs, to, 0, ec); m_SocketV6.send_to (bufs, to, 0, ec);
else else
m_Socket.send_to (bufs, to, 0, ec); m_SocketV4.send_to (bufs, to, 0, ec);
i2p::transport::transports.UpdateSentBytes (headerLen + payloadLen); i2p::transport::transports.UpdateSentBytes (headerLen + payloadLen);
} }
@ -1292,7 +1323,7 @@ namespace transport
if (to.address ().is_v6 ()) if (to.address ().is_v6 ())
m_SocketV6.send_to (bufs, to, 0, ec); m_SocketV6.send_to (bufs, to, 0, ec);
else else
m_Socket.send_to (bufs, to, 0, ec); m_SocketV4.send_to (bufs, to, 0, ec);
i2p::transport::transports.UpdateSentBytes (headerLen + headerXLen + payloadLen); i2p::transport::transports.UpdateSentBytes (headerLen + headerXLen + payloadLen);
} }

14
libi2pd/SSU2.h

@ -210,6 +210,16 @@ namespace transport
boost::asio::ip::udp::endpoint from; boost::asio::ip::udp::endpoint from;
}; };
class ReceiveService: public i2p::util::RunnableService
{
public:
ReceiveService (const std::string& name): RunnableService (name) {};
boost::asio::io_service& GetService () { return GetIOService (); };
void Start () { StartIOService (); };
void Stop () { StopIOService (); };
};
public: public:
SSU2Server (); SSU2Server ();
@ -241,6 +251,7 @@ namespace transport
void Receive (boost::asio::ip::udp::socket& socket); void Receive (boost::asio::ip::udp::socket& socket);
void HandleReceivedFrom (const boost::system::error_code& ecode, size_t bytes_transferred, void HandleReceivedFrom (const boost::system::error_code& ecode, size_t bytes_transferred,
Packet * packet, boost::asio::ip::udp::socket& socket); Packet * packet, boost::asio::ip::udp::socket& socket);
void HandleReceivedPacket (Packet * packet);
void ProcessNextPacket (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); void ProcessNextPacket (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint);
void ScheduleTermination (); void ScheduleTermination ();
@ -251,7 +262,8 @@ namespace transport
private: private:
boost::asio::ip::udp::socket m_Socket, m_SocketV6; ReceiveService m_ReceiveServiceV4, m_ReceiveServiceV6;
boost::asio::ip::udp::socket m_SocketV4, m_SocketV6;
std::unordered_map<uint64_t, std::shared_ptr<SSU2Session> > m_Sessions; std::unordered_map<uint64_t, std::shared_ptr<SSU2Session> > m_Sessions;
std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSU2Session> > m_PendingOutgoingSessions; std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSU2Session> > m_PendingOutgoingSessions;
std::map<boost::asio::ip::udp::endpoint, std::pair<uint64_t, uint32_t> > m_IncomingTokens, m_OutgoingTokens; // remote endpoint -> (token, expires in seconds) std::map<boost::asio::ip::udp::endpoint, std::pair<uint64_t, uint32_t> > m_IncomingTokens, m_OutgoingTokens; // remote endpoint -> (token, expires in seconds)

Loading…
Cancel
Save