diff --git a/libi2pd_client/I2PTunnel.cpp b/libi2pd_client/I2PTunnel.cpp index 48b1e79d..74175b14 100644 --- a/libi2pd_client/I2PTunnel.cpp +++ b/libi2pd_client/I2PTunnel.cpp @@ -611,6 +611,7 @@ namespace client void I2PUDPServerTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) { + if (!m_LastSession || m_LastSession->Identity.GetLL()[0] != from.GetIdentHash ().GetLL()[0]) { std::lock_guard lock(m_SessionsMutex); m_LastSession = ObtainUDPSession(from, toPort, fromPort); @@ -658,7 +659,7 @@ namespace client auto ih = from.GetIdentHash(); for (auto & s : m_Sessions ) { - if ( s->Identity == ih) + if (s->Identity.GetLL()[0] == ih.GetLL()[0]) { /** found existing session */ LogPrint(eLogDebug, "UDPServer: found session ", s->IPSocket.local_endpoint(), " ", ih.ToBase32()); @@ -707,11 +708,25 @@ namespace client { LogPrint(eLogDebug, "UDPSession: forward ", len, "B from ", FromEndpoint); LastActivity = i2p::util::GetMillisecondsSinceEpoch(); - m_Destination->SendDatagramTo(m_Buffer, len, Identity, LocalPort, RemotePort); + auto session = m_Destination->GetSession (Identity); + m_Destination->SendDatagram(session, m_Buffer, len, LocalPort, RemotePort); + size_t numPackets = 0; + while (numPackets < i2p::datagram::DATAGRAM_SEND_QUEUE_MAX_SIZE) + { + boost::system::error_code ec; + size_t moreBytes = IPSocket.available(ec); + if (ec || !moreBytes) break; + len = IPSocket.receive_from (boost::asio::buffer (m_Buffer, I2P_UDP_MAX_MTU), FromEndpoint, 0, ec); + m_Destination->SendRawDatagram (session, m_Buffer, len, LocalPort, RemotePort); + numPackets++; + } + if (numPackets > 0) + LogPrint(eLogDebug, "UDPSession: forward more ", numPackets, "packets B from ", FromEndpoint); + m_Destination->FlushSendQueue (session); Receive(); - } else { + } + else LogPrint(eLogError, "UDPSession: ", ecode.message()); - } } I2PUDPServerTunnel::I2PUDPServerTunnel(const std::string & name, std::shared_ptr localDestination, @@ -781,6 +796,8 @@ namespace client std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5)); + dgram->SetRawReceiver(std::bind(&I2PUDPClientTunnel::HandleRecvFromI2PRaw, this, + std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); } void I2PUDPClientTunnel::Start() { @@ -880,26 +897,30 @@ namespace client void I2PUDPClientTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) { if(m_RemoteIdent && from.GetIdentHash() == *m_RemoteIdent) + HandleRecvFromI2PRaw (fromPort, toPort, buf, len); + else + LogPrint(eLogWarning, "UDP Client: unwarranted traffic from ", from.GetIdentHash().ToBase32()); + } + + void I2PUDPClientTunnel::HandleRecvFromI2PRaw(uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) + { + auto itr = m_Sessions.find(toPort); + // found convo ? + if(itr != m_Sessions.end()) { - auto itr = m_Sessions.find(toPort); - // found convo ? - if(itr != m_Sessions.end()) + // found convo + if (len > 0) { - // found convo - if (len > 0) { - LogPrint(eLogDebug, "UDP Client: got ", len, "B from ", from.GetIdentHash().ToBase32()); - m_LocalSocket.send_to(boost::asio::buffer(buf, len), itr->second->first); - // mark convo as active - itr->second->second = i2p::util::GetMillisecondsSinceEpoch(); - } + LogPrint(eLogDebug, "UDP Client: got ", len, "B from ", m_RemoteIdent ? m_RemoteIdent->ToBase32() : ""); + m_LocalSocket.send_to(boost::asio::buffer(buf, len), itr->second->first); + // mark convo as active + itr->second->second = i2p::util::GetMillisecondsSinceEpoch(); } - else - LogPrint(eLogWarning, "UDP Client: not tracking udp session using port ", (int) toPort); } else - LogPrint(eLogWarning, "UDP Client: unwarranted traffic from ", from.GetIdentHash().ToBase32()); + LogPrint(eLogWarning, "UDP Client: not tracking udp session using port ", (int) toPort); } - + I2PUDPClientTunnel::~I2PUDPClientTunnel() { auto dgram = m_LocalDest->GetDatagramDestination(); if (dgram) dgram->ResetReceiver(); diff --git a/libi2pd_client/I2PTunnel.h b/libi2pd_client/I2PTunnel.h index 19e44e79..19d553da 100644 --- a/libi2pd_client/I2PTunnel.h +++ b/libi2pd_client/I2PTunnel.h @@ -275,7 +275,11 @@ namespace client void RecvFromLocal(); void HandleRecvFromLocal(const boost::system::error_code & e, std::size_t transferred); void HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); + void HandleRecvFromI2PRaw(uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); void TryResolving(); + + private: + const std::string m_Name; std::mutex m_SessionsMutex; std::unordered_map > m_Sessions; // maps i2p port -> local udp convo