Browse Source

receive multiple UDP packets

pull/158/head
orignal 10 years ago
parent
commit
c6c414c382
  1. 60
      SSU.cpp
  2. 2
      SSU.h

60
SSU.cpp

@ -160,7 +160,19 @@ namespace transport
if (!ecode) if (!ecode)
{ {
packet->len = bytes_transferred; packet->len = bytes_transferred;
m_Service.post (std::bind (&SSUServer::HandleReceivedBuffer, this, packet)); std::vector<SSUPacket *> packets;
packets.push_back (packet);
size_t moreBytes = m_Socket.available();
while (moreBytes && packets.size () < 25)
{
packet = new SSUPacket ();
packet->len = m_Socket.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V4), packet->from);
packets.push_back (packet);
moreBytes = m_Socket.available();
}
m_Service.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets));
Receive (); Receive ();
} }
else else
@ -175,7 +187,19 @@ namespace transport
if (!ecode) if (!ecode)
{ {
packet->len = bytes_transferred; packet->len = bytes_transferred;
m_ServiceV6.post (std::bind (&SSUServer::HandleReceivedBuffer, this, packet)); std::vector<SSUPacket *> packets;
packets.push_back (packet);
size_t moreBytes = m_SocketV6.available ();
while (moreBytes && packets.size () < 25)
{
packet = new SSUPacket ();
packet->len = m_SocketV6.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V6), packet->from);
packets.push_back (packet);
moreBytes = m_SocketV6.available();
}
m_ServiceV6.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets));
ReceiveV6 (); ReceiveV6 ();
} }
else else
@ -185,24 +209,28 @@ namespace transport
} }
} }
void SSUServer::HandleReceivedBuffer (SSUPacket * packet) void SSUServer::HandleReceivedPackets (std::vector<SSUPacket *> packets)
{ {
std::shared_ptr<SSUSession> session; for (auto it1: packets)
auto it = m_Sessions.find (packet->from);
if (it != m_Sessions.end ())
session = it->second;
if (!session)
{ {
session = std::make_shared<SSUSession> (*this, packet->from); auto packet = it1;
session->WaitForConnect (); std::shared_ptr<SSUSession> session;
auto it = m_Sessions.find (packet->from);
if (it != m_Sessions.end ())
session = it->second;
if (!session)
{ {
std::unique_lock<std::mutex> l(m_SessionsMutex); session = std::make_shared<SSUSession> (*this, packet->from);
m_Sessions[packet->from] = session; session->WaitForConnect ();
} {
LogPrint ("New SSU session from ", packet->from.address ().to_string (), ":", packet->from.port (), " created"); std::unique_lock<std::mutex> l(m_SessionsMutex);
m_Sessions[packet->from] = session;
}
LogPrint ("New SSU session from ", packet->from.address ().to_string (), ":", packet->from.port (), " created");
}
session->ProcessNextMessage (packet->buf, packet->len, packet->from);
delete packet;
} }
session->ProcessNextMessage (packet->buf, packet->len, packet->from);
delete packet;
} }
std::shared_ptr<SSUSession> SSUServer::FindSession (std::shared_ptr<const i2p::data::RouterInfo> router) const std::shared_ptr<SSUSession> SSUServer::FindSession (std::shared_ptr<const i2p::data::RouterInfo> router) const

2
SSU.h

@ -62,7 +62,7 @@ namespace transport
void ReceiveV6 (); void ReceiveV6 ();
void HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet); void HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet);
void HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet); void HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet);
void HandleReceivedBuffer (SSUPacket * packet); void HandleReceivedPackets (std::vector<SSUPacket *> packets);
template<typename Filter> template<typename Filter>
std::shared_ptr<SSUSession> GetRandomSession (Filter filter); std::shared_ptr<SSUSession> GetRandomSession (Filter filter);

Loading…
Cancel
Save