From 6d7847f2dfc2da364f5303207f3665533bfbbbbc Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 9 Jun 2020 16:26:45 -0400 Subject: [PATCH] send bulk datagrams --- libi2pd/Datagram.cpp | 14 ++++++++++++-- libi2pd/Datagram.h | 3 ++- libi2pd_client/I2PTunnel.cpp | 18 ++++++++++++++++++ 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/libi2pd/Datagram.cpp b/libi2pd/Datagram.cpp index 04792bc5..5a6b8d90 100644 --- a/libi2pd/Datagram.cpp +++ b/libi2pd/Datagram.cpp @@ -56,6 +56,11 @@ namespace datagram session->SendMsg(msg); } + void DatagramDestination::FlushSendQueue (const i2p::data::IdentHash & ident) + { + ObtainSession(ident)->FlushSendQueue (); + } + void DatagramDestination::HandleDatagram (uint16_t fromPort, uint16_t toPort,uint8_t * const &buf, size_t len) { i2p::data::IdentityEx identity; @@ -366,6 +371,7 @@ namespace datagram void DatagramSession::FlushSendQueue () { + if (m_SendQueue.empty ()) return; std::vector send; auto routingPath = GetSharedRoutingPath(); // if we don't have a routing path we will drop all queued messages @@ -380,7 +386,6 @@ namespace datagram routingPath->outboundTunnel->SendTunnelDataMsg(send); } m_SendQueue.clear(); - ScheduleFlushSendQueue(); } void DatagramSession::ScheduleFlushSendQueue() @@ -388,7 +393,12 @@ namespace datagram boost::posix_time::milliseconds dlt(10); m_SendQueueTimer.expires_from_now(dlt); auto self = shared_from_this(); - m_SendQueueTimer.async_wait([self](const boost::system::error_code & ec) { if(ec) return; self->FlushSendQueue(); }); + m_SendQueueTimer.async_wait([self](const boost::system::error_code & ec) + { + if(ec) return; + self->FlushSendQueue(); + self->ScheduleFlushSendQueue(); + }); } } } diff --git a/libi2pd/Datagram.h b/libi2pd/Datagram.h index e81f738a..b7abcc03 100644 --- a/libi2pd/Datagram.h +++ b/libi2pd/Datagram.h @@ -59,6 +59,7 @@ namespace datagram /** send an i2np message to remote endpoint for this session */ void SendMsg(std::shared_ptr msg); + void FlushSendQueue(); /** get the last time in milliseconds for when we used this datagram session */ uint64_t LastActivity() const { return m_LastUse; } @@ -84,7 +85,6 @@ namespace datagram private: - void FlushSendQueue(); void ScheduleFlushSendQueue(); void HandleSend(std::shared_ptr msg); @@ -122,6 +122,7 @@ namespace datagram void SendDatagramTo (const uint8_t * payload, size_t len, const i2p::data::IdentHash & ident, uint16_t fromPort = 0, uint16_t toPort = 0); void SendRawDatagramTo (const uint8_t * payload, size_t len, const i2p::data::IdentHash & ident, uint16_t fromPort = 0, uint16_t toPort = 0); + void FlushSendQueue (const i2p::data::IdentHash & ident); void HandleDataMessagePayload (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len, bool isRaw = false); void SetReceiver (const Receiver& receiver) { m_Receiver = receiver; }; diff --git a/libi2pd_client/I2PTunnel.cpp b/libi2pd_client/I2PTunnel.cpp index ea2517d7..048c9962 100644 --- a/libi2pd_client/I2PTunnel.cpp +++ b/libi2pd_client/I2PTunnel.cpp @@ -811,6 +811,24 @@ namespace client // send off to remote i2p destination LogPrint(eLogDebug, "UDP Client: send ", transferred, " to ", m_RemoteIdent->ToBase32(), ":", RemotePort); m_LocalDest->GetDatagramDestination()->SendDatagramTo(m_RecvBuff, transferred, *m_RemoteIdent, remotePort, RemotePort); + size_t numPackets = 0; + while (numPackets < i2p::datagram::DATAGRAM_SEND_QUEUE_MAX_SIZE) + { + boost::system::error_code ec; + size_t moreBytes = m_LocalSocket.available(ec); + if (ec || !moreBytes) break; + transferred = m_LocalSocket.receive_from (boost::asio::buffer (m_RecvBuff, I2P_UDP_MAX_MTU), m_RecvEndpoint, 0, ec); + remotePort = m_RecvEndpoint.port(); + // TODO: check remotePort + m_LocalDest->GetDatagramDestination()->SendDatagramTo(m_RecvBuff, transferred, *m_RemoteIdent, remotePort, RemotePort); + numPackets++; + } + if (numPackets) + { + LogPrint(eLogDebug, "UDP Client: sent ", numPackets, " more packets to ", m_RemoteIdent->ToBase32()); + m_LocalDest->GetDatagramDestination()->FlushSendQueue (*m_RemoteIdent); + } + // mark convo as active if (m_LastSession) m_LastSession->second = i2p::util::GetMillisecondsSinceEpoch();