diff --git a/Destination.cpp b/Destination.cpp index 1f984fa1..73423b4a 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -11,7 +11,7 @@ namespace i2p namespace stream { StreamingDestination::StreamingDestination (boost::asio::io_service& service, bool isPublic): - m_Service (service), m_LeaseSet (nullptr), m_IsPublic (isPublic) + m_Service (service), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic) { m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (/*i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256*/); // uncomment for ECDSA CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); @@ -22,7 +22,7 @@ namespace stream } StreamingDestination::StreamingDestination (boost::asio::io_service& service, const std::string& fullPath, bool isPublic): - m_Service (service), m_LeaseSet (nullptr), m_IsPublic (isPublic) + m_Service (service), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic) { std::ifstream s(fullPath.c_str (), std::ifstream::binary); if (s.is_open ()) @@ -56,7 +56,7 @@ namespace stream } StreamingDestination::StreamingDestination (boost::asio::io_service& service, const i2p::data::PrivateKeys& keys, bool isPublic): - m_Service (service), m_Keys (keys), m_LeaseSet (nullptr), m_IsPublic (isPublic) + m_Service (service), m_Keys (keys), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic) { CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey); @@ -78,6 +78,19 @@ namespace stream delete m_LeaseSet; } + void StreamingDestination::SendTunnelDataMsgs (const std::vector& msgs) + { + m_CurrentOutboundTunnel = m_Pool->GetNextOutboundTunnel (m_CurrentOutboundTunnel); + if (m_CurrentOutboundTunnel) + m_CurrentOutboundTunnel->SendTunnelDataMsg (msgs); + else + { + LogPrint ("No outbound tunnels in the pool"); + for (auto it: msgs) + DeleteI2NPMessage (it.data); + } + } + void StreamingDestination::HandleNextPacket (Packet * packet) { uint32_t sendStreamID = packet->GetSendStreamID (); diff --git a/Destination.h b/Destination.h index 97b62a5f..3c69a40c 100644 --- a/Destination.h +++ b/Destination.h @@ -32,6 +32,7 @@ namespace stream void ResetAcceptor () { m_Acceptor = nullptr; }; bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; void HandleNextPacket (Packet * packet); + void SendTunnelDataMsgs (const std::vector& msgs); // implements LocalDestination const i2p::data::PrivateKeys& GetPrivateKeys () const { return m_Keys; }; @@ -39,6 +40,7 @@ namespace stream const uint8_t * GetEncryptionPublicKey () const { return m_EncryptionPublicKey; }; void SetLeaseSetUpdated (); void HandleDataMessage (const uint8_t * buf, size_t len); + void ResetCurrentOutboundTunnel () { m_CurrentOutboundTunnel = nullptr; }; private: @@ -54,6 +56,7 @@ namespace stream uint8_t m_EncryptionPublicKey[256], m_EncryptionPrivateKey[256]; i2p::tunnel::TunnelPool * m_Pool; + i2p::tunnel::OutboundTunnel * m_CurrentOutboundTunnel; i2p::data::LeaseSet * m_LeaseSet; bool m_IsPublic; diff --git a/Streaming.cpp b/Streaming.cpp index 782c3e6c..64ce8d16 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -14,8 +14,7 @@ namespace stream const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local), m_RemoteLeaseSet (&remote), - m_RoutingSession (nullptr), m_CurrentOutboundTunnel (nullptr), - m_ReceiveTimer (m_Service), m_ResendTimer (m_Service) + m_RoutingSession (nullptr), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); UpdateCurrentRemoteLease (); @@ -24,7 +23,7 @@ namespace stream Stream::Stream (boost::asio::io_service& service, StreamingDestination * local): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local), - m_RemoteLeaseSet (nullptr), m_RoutingSession (nullptr), m_CurrentOutboundTunnel (nullptr), + m_RemoteLeaseSet (nullptr), m_RoutingSession (nullptr), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); @@ -103,7 +102,7 @@ namespace stream { // we have received duplicate. Most likely our outbound tunnel is dead LogPrint ("Duplicate message ", receivedSeqn, " received"); - m_CurrentOutboundTunnel = nullptr; // pick another outbound tunnel + m_LocalDestination->ResetCurrentOutboundTunnel (); // pick another outbound tunnel UpdateCurrentRemoteLease (); // pick another lease SendQuickAck (); // resend ack for previous message again delete packet; // packet dropped @@ -418,35 +417,29 @@ namespace stream m_LeaseSetUpdated = false; } - m_CurrentOutboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); - if (m_CurrentOutboundTunnel) - { - auto ts = i2p::util::GetMillisecondsSinceEpoch (); - if (ts >= m_CurrentRemoteLease.endDate) - UpdateCurrentRemoteLease (); - if (ts < m_CurrentRemoteLease.endDate) - { - std::vector msgs; - for (auto it: packets) - { - auto msg = m_RoutingSession->WrapSingleMessage ( - CreateDataMessage (this, it->GetBuffer (), it->GetLength ()), - leaseSet); - msgs.push_back (i2p::tunnel::TunnelMessageBlock - { - i2p::tunnel::eDeliveryTypeTunnel, - m_CurrentRemoteLease.tunnelGateway, m_CurrentRemoteLease.tunnelID, - msg - }); - leaseSet = nullptr; // send leaseSet only one time - } - m_CurrentOutboundTunnel->SendTunnelDataMsg (msgs); - } - else - LogPrint ("All leases are expired"); + auto ts = i2p::util::GetMillisecondsSinceEpoch (); + if (ts >= m_CurrentRemoteLease.endDate) + UpdateCurrentRemoteLease (); + if (ts < m_CurrentRemoteLease.endDate) + { + std::vector msgs; + for (auto it: packets) + { + auto msg = m_RoutingSession->WrapSingleMessage ( + CreateDataMessage (this, it->GetBuffer (), it->GetLength ()), + leaseSet); + msgs.push_back (i2p::tunnel::TunnelMessageBlock + { + i2p::tunnel::eDeliveryTypeTunnel, + m_CurrentRemoteLease.tunnelGateway, m_CurrentRemoteLease.tunnelID, + msg + }); + leaseSet = nullptr; // send leaseSet only one time + } + m_LocalDestination->SendTunnelDataMsgs (msgs); } - else - LogPrint ("No outbound tunnels in the pool"); + else + LogPrint ("All leases are expired"); } void Stream::ScheduleResend () @@ -476,7 +469,7 @@ namespace stream } if (packets.size () > 0) { - m_CurrentOutboundTunnel = nullptr; // pick another outbound tunnel + m_LocalDestination->ResetCurrentOutboundTunnel (); // pick another outbound tunnel UpdateCurrentRemoteLease (); // pick another lease SendPackets (packets); } diff --git a/Streaming.h b/Streaming.h index ae958d4f..8c570260 100644 --- a/Streaming.h +++ b/Streaming.h @@ -129,7 +129,6 @@ namespace stream const i2p::data::LeaseSet * m_RemoteLeaseSet; i2p::garlic::GarlicRoutingSession * m_RoutingSession; i2p::data::Lease m_CurrentRemoteLease; - i2p::tunnel::OutboundTunnel * m_CurrentOutboundTunnel; std::queue m_ReceiveQueue; std::set m_SavedPackets; std::set m_SentPackets; diff --git a/Tunnel.cpp b/Tunnel.cpp index 85abd5a8..05aa5801 100644 --- a/Tunnel.cpp +++ b/Tunnel.cpp @@ -188,7 +188,7 @@ namespace tunnel m_Gateway.SendTunnelDataMsg (block); } - void OutboundTunnel::SendTunnelDataMsg (std::vector msgs) + void OutboundTunnel::SendTunnelDataMsg (const std::vector& msgs) { std::unique_lock l(m_SendMutex); for (auto& it : msgs) diff --git a/Tunnel.h b/Tunnel.h index 12eec8ea..7ce1c906 100644 --- a/Tunnel.h +++ b/Tunnel.h @@ -78,7 +78,7 @@ namespace tunnel OutboundTunnel (TunnelConfig * config): Tunnel (config), m_Gateway (this) {}; void SendTunnelDataMsg (const uint8_t * gwHash, uint32_t gwTunnel, i2p::I2NPMessage * msg); - void SendTunnelDataMsg (std::vector msgs); // multiple messages + void SendTunnelDataMsg (const std::vector& msgs); // multiple messages const i2p::data::RouterInfo * GetEndpointRouter () const { return GetTunnelConfig ()->GetLastHop ()->router; }; size_t GetNumSentBytes () const { return m_Gateway.GetNumSentBytes (); };