diff --git a/SSU.cpp b/SSU.cpp index 440b4f5b..1ad3c3fd 100644 --- a/SSU.cpp +++ b/SSU.cpp @@ -160,7 +160,19 @@ namespace transport if (!ecode) { packet->len = bytes_transferred; - m_Service.post (std::bind (&SSUServer::HandleReceivedBuffer, this, packet)); + std::vector packets; + packets.push_back (packet); + + size_t moreBytes = m_Socket.available(); + while (moreBytes && packets.size () < 25) + { + packet = new SSUPacket (); + packet->len = m_Socket.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V4), packet->from); + packets.push_back (packet); + moreBytes = m_Socket.available(); + } + + m_Service.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets)); Receive (); } else @@ -175,7 +187,19 @@ namespace transport if (!ecode) { packet->len = bytes_transferred; - m_ServiceV6.post (std::bind (&SSUServer::HandleReceivedBuffer, this, packet)); + std::vector packets; + packets.push_back (packet); + + size_t moreBytes = m_SocketV6.available (); + while (moreBytes && packets.size () < 25) + { + packet = new SSUPacket (); + packet->len = m_SocketV6.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V6), packet->from); + packets.push_back (packet); + moreBytes = m_SocketV6.available(); + } + + m_ServiceV6.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets)); ReceiveV6 (); } else @@ -185,24 +209,28 @@ namespace transport } } - void SSUServer::HandleReceivedBuffer (SSUPacket * packet) + void SSUServer::HandleReceivedPackets (std::vector packets) { - std::shared_ptr session; - auto it = m_Sessions.find (packet->from); - if (it != m_Sessions.end ()) - session = it->second; - if (!session) + for (auto it1: packets) { - session = std::make_shared (*this, packet->from); - session->WaitForConnect (); + auto packet = it1; + std::shared_ptr session; + auto it = m_Sessions.find (packet->from); + if (it != m_Sessions.end ()) + session = it->second; + if (!session) { - std::unique_lock l(m_SessionsMutex); - m_Sessions[packet->from] = session; - } - LogPrint ("New SSU session from ", packet->from.address ().to_string (), ":", packet->from.port (), " created"); + session = std::make_shared (*this, packet->from); + session->WaitForConnect (); + { + std::unique_lock l(m_SessionsMutex); + m_Sessions[packet->from] = session; + } + LogPrint ("New SSU session from ", packet->from.address ().to_string (), ":", packet->from.port (), " created"); + } + session->ProcessNextMessage (packet->buf, packet->len, packet->from); + delete packet; } - session->ProcessNextMessage (packet->buf, packet->len, packet->from); - delete packet; } std::shared_ptr SSUServer::FindSession (std::shared_ptr router) const diff --git a/SSU.h b/SSU.h index 01ca2607..70cea541 100644 --- a/SSU.h +++ b/SSU.h @@ -62,7 +62,7 @@ namespace transport void ReceiveV6 (); void HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet); void HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet); - void HandleReceivedBuffer (SSUPacket * packet); + void HandleReceivedPackets (std::vector packets); template std::shared_ptr GetRandomSession (Filter filter);