diff --git a/NTCPSession.cpp b/NTCPSession.cpp index 932d16bf..9b283612 100644 --- a/NTCPSession.cpp +++ b/NTCPSession.cpp @@ -617,6 +617,17 @@ namespace transport if (msg) Send (msg); } + + void NTCPSession::SendI2NPMessages (const std::vector& msgs) + { + m_Server.GetService ().post (std::bind (&NTCPSession::PostI2NPMessages, shared_from_this (), msgs)); + } + + void NTCPSession::PostI2NPMessages (std::vector msgs) + { + for (auto it: msgs) + if (it) Send (it); + } void NTCPSession::ScheduleTermination () { diff --git a/NTCPSession.h b/NTCPSession.h index 43c7dc58..8435d5d9 100644 --- a/NTCPSession.h +++ b/NTCPSession.h @@ -61,6 +61,7 @@ namespace transport void ClientLogin (); void ServerLogin (); void SendI2NPMessage (I2NPMessage * msg); + void SendI2NPMessages (const std::vector& msgs); size_t GetNumSentBytes () const { return m_NumSentBytes; }; size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; }; @@ -68,6 +69,7 @@ namespace transport protected: void PostI2NPMessage (I2NPMessage * msg); + void PostI2NPMessages (std::vector msgs); void Connected (); void SendTimeSyncMessage (); void SetIsEstablished (bool isEstablished) { m_IsEstablished = isEstablished; } diff --git a/SSUSession.cpp b/SSUSession.cpp index 3bf95808..0cbbc818 100644 --- a/SSUSession.cpp +++ b/SSUSession.cpp @@ -827,7 +827,19 @@ namespace transport if (msg) m_Data.Send (msg); } - + + void SSUSession::SendI2NPMessages (const std::vector& msgs) + { + boost::asio::io_service& service = IsV6 () ? m_Server.GetServiceV6 () : m_Server.GetService (); + service.post (std::bind (&SSUSession::PostI2NPMessages, shared_from_this (), msgs)); + } + + void SSUSession::PostI2NPMessages (std::vector msgs) + { + for (auto it: msgs) + if (it) m_Data.Send (it); + } + void SSUSession::ProcessData (uint8_t * buf, size_t len) { m_Data.ProcessMessage (buf, len); diff --git a/SSUSession.h b/SSUSession.h index ce840636..565afc1f 100644 --- a/SSUSession.h +++ b/SSUSession.h @@ -66,6 +66,7 @@ namespace transport boost::asio::ip::udp::endpoint& GetRemoteEndpoint () { return m_RemoteEndpoint; }; bool IsV6 () const { return m_RemoteEndpoint.address ().is_v6 (); }; void SendI2NPMessage (I2NPMessage * msg); + void SendI2NPMessages (const std::vector& msgs); void SendPeerTest (); // Alice SessionState GetState () const { return m_State; }; @@ -81,6 +82,7 @@ namespace transport void CreateAESandMacKey (const uint8_t * pubKey); void PostI2NPMessage (I2NPMessage * msg); + void PostI2NPMessages (std::vector msgs); void ProcessMessage (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); // call for established session void ProcessSessionRequest (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); void SendSessionRequest (); diff --git a/TransportSession.h b/TransportSession.h index 99d8d868..45ce9366 100644 --- a/TransportSession.h +++ b/TransportSession.h @@ -4,6 +4,7 @@ #include #include #include +#include #include "Identity.h" #include "RouterInfo.h" #include "I2NPProtocol.h" @@ -66,6 +67,7 @@ namespace transport const i2p::data::IdentityEx& GetRemoteIdentity () { return m_RemoteIdentity; }; virtual void SendI2NPMessage (I2NPMessage * msg) = 0; + virtual void SendI2NPMessages (const std::vector& msgs) = 0; protected: diff --git a/Transports.cpp b/Transports.cpp index 4cb20eef..dfcfe253 100644 --- a/Transports.cpp +++ b/Transports.cpp @@ -184,6 +184,11 @@ namespace transport m_Service.post (std::bind (&Transports::PostMessage, this, ident, msg)); } + void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector& msgs) + { + m_Service.post (std::bind (&Transports::PostMessages, this, ident, msgs)); + } + void Transports::PostMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg) { if (ident == i2p::context.GetRouterInfo ().GetIdentHash ()) @@ -210,6 +215,36 @@ namespace transport it->second.delayedMessages.push_back (msg); } + void Transports::PostMessages (const i2p::data::IdentHash& ident, std::vector msgs) + { + if (ident == i2p::context.GetRouterInfo ().GetIdentHash ()) + { + // we send it to ourself + for (auto it: msgs) + i2p::HandleI2NPMessage (it); + return; + } + auto it = m_Peers.find (ident); + if (it == m_Peers.end ()) + { + auto r = netdb.FindRouter (ident); + it = m_Peers.insert (std::pair(ident, { 0, r, nullptr})).first; + if (!ConnectToPeer (ident, it->second)) + { + for (auto it1: msgs) + DeleteI2NPMessage (it1); + return; + } + } + if (it->second.session) + it->second.session->SendI2NPMessages (msgs); + else + { + for (auto it1: msgs) + it->second.delayedMessages.push_back (it1); + } + } + bool Transports::ConnectToPeer (const i2p::data::IdentHash& ident, Peer& peer) { if (peer.router) // we have RI already @@ -365,8 +400,7 @@ namespace transport if (it != m_Peers.end ()) { it->second.session = session; - for (auto it1: it->second.delayedMessages) - session->SendI2NPMessage (it1); + session->SendI2NPMessages (it->second.delayedMessages); it->second.delayedMessages.clear (); } else // incoming connection diff --git a/Transports.h b/Transports.h index ffc561ff..d09699d6 100644 --- a/Transports.h +++ b/Transports.h @@ -6,7 +6,7 @@ #include #include #include -#include +#include #include #include #include @@ -56,7 +56,7 @@ namespace transport int numAttempts; std::shared_ptr router; std::shared_ptr session; - std::list delayedMessages; + std::vector delayedMessages; ~Peer () { @@ -80,6 +80,7 @@ namespace transport void ReuseDHKeysPair (DHKeysPair * pair); void SendMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg); + void SendMessages (const i2p::data::IdentHash& ident, const std::vector& msgs); void CloseSession (std::shared_ptr router); void PeerConnected (std::shared_ptr session); @@ -91,6 +92,7 @@ namespace transport void RequestComplete (std::shared_ptr r, const i2p::data::IdentHash& ident); void HandleRequestComplete (std::shared_ptr r, const i2p::data::IdentHash& ident); void PostMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg); + void PostMessages (const i2p::data::IdentHash& ident, std::vector msgs); void PostCloseSession (std::shared_ptr router); bool ConnectToPeer (const i2p::data::IdentHash& ident, Peer& peer); diff --git a/TunnelGateway.cpp b/TunnelGateway.cpp index 146c007c..a2352031 100644 --- a/TunnelGateway.cpp +++ b/TunnelGateway.cpp @@ -188,9 +188,9 @@ namespace tunnel { m_Tunnel->EncryptTunnelMsg (tunnelMsg); FillI2NPMessageHeader (tunnelMsg, eI2NPTunnelData); - i2p::transport::transports.SendMessage (m_Tunnel->GetNextIdentHash (), tunnelMsg); m_NumSentBytes += TUNNEL_DATA_MSG_SIZE; } + i2p::transport::transports.SendMessages (m_Tunnel->GetNextIdentHash (), tunnelMsgs); m_Buffer.ClearTunnelDataMsgs (); } }