From ed0af7b93733d1e956fafb551d0217736a63a494 Mon Sep 17 00:00:00 2001 From: orignal Date: Fri, 1 Aug 2014 14:54:14 -0400 Subject: [PATCH] incoming streams support --- Streaming.cpp | 83 ++++++++++++++++++++++++++++++++++++--------------- Streaming.h | 23 +++++++++----- 2 files changed, 74 insertions(+), 32 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index e00ec13e..e4ce6d5e 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -10,6 +10,7 @@ #include "Timestamp.h" #include "CryptoConst.h" #include "Garlic.h" +#include "NetDb.h" #include "Streaming.h" namespace i2p @@ -18,14 +19,22 @@ namespace stream { Stream::Stream (boost::asio::io_service& service, StreamingDestination * local, const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0), - m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), m_IsOpen (false), - m_LeaseSetUpdated (true), m_LocalDestination (local), m_RemoteLeaseSet (remote), - m_ReceiveTimer (m_Service) + m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), m_IsOpen (false), + m_IsOutgoing(true), m_LeaseSetUpdated (true), m_LocalDestination (local), + m_RemoteLeaseSet (&remote), m_ReceiveTimer (m_Service) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); UpdateCurrentRemoteLease (); } + Stream::Stream (boost::asio::io_service& service, StreamingDestination * local): + m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), + m_IsOpen (false), m_IsOutgoing(true), m_LeaseSetUpdated (true), m_LocalDestination (local), + m_RemoteLeaseSet (nullptr), m_ReceiveTimer (m_Service) + { + m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); + } + Stream::~Stream () { m_ReceiveTimer.cancel (); @@ -41,9 +50,9 @@ namespace stream void Stream::HandleNextPacket (Packet * packet) { - if (!m_SendStreamID) - m_SendStreamID = packet->GetReceiveStreamID (); - + if (!m_SendStreamID) + m_SendStreamID = packet->GetReceiveStreamID (); + uint32_t receivedSeqn = packet->GetSeqn (); LogPrint ("Received seqn=", receivedSeqn); if (!receivedSeqn || receivedSeqn == m_LastReceivedSequenceNumber + 1) @@ -115,6 +124,13 @@ namespace stream if (flags & PACKET_FLAG_FROM_INCLUDED) { LogPrint ("From identity"); + if (!m_RemoteLeaseSet) + { + i2p::data::Identity * identity = (i2p::data::Identity *)optionData; + m_RemoteLeaseSet = i2p::data::netdb.FindLeaseSet (identity->Hash ()); + if (!m_RemoteLeaseSet) + LogPrint ("LeaseSet ", identity->Hash ().ToBase64 (), " not found"); + } optionData += sizeof (i2p::data::Identity); } @@ -281,6 +297,12 @@ namespace stream bool Stream::SendPacket (const uint8_t * buf, size_t len) { + if (!m_RemoteLeaseSet) + { + LogPrint ("Can't send packet. Missing remote LeaseSet"); + return false; + } + I2NPMessage * leaseSet = nullptr; if (m_LeaseSetUpdated) @@ -289,7 +311,7 @@ namespace stream m_LeaseSetUpdated = false; } - I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet, + I2NPMessage * msg = i2p::garlic::routing.WrapMessage (*m_RemoteLeaseSet, CreateDataMessage (this, buf, len), leaseSet); auto outboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel (); if (outboundTunnel) @@ -318,18 +340,24 @@ namespace stream void Stream::UpdateCurrentRemoteLease () { - auto leases = m_RemoteLeaseSet.GetNonExpiredLeases (); - if (!leases.empty ()) - { - uint32_t i = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (0, leases.size () - 1); - m_CurrentRemoteLease = leases[i]; - } + if (m_RemoteLeaseSet) + { + auto leases = m_RemoteLeaseSet->GetNonExpiredLeases (); + if (!leases.empty ()) + { + uint32_t i = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (0, leases.size () - 1); + m_CurrentRemoteLease = leases[i]; + } + else + m_CurrentRemoteLease.endDate = 0; + } else m_CurrentRemoteLease.endDate = 0; } - StreamingDestination::StreamingDestination (): m_LeaseSet (nullptr) + StreamingDestination::StreamingDestination (boost::asio::io_service& service): + m_Service (service), m_LeaseSet (nullptr) { m_Keys = i2p::data::CreateRandomKeys (); @@ -341,7 +369,8 @@ namespace stream m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel } - StreamingDestination::StreamingDestination (const std::string& fullPath): m_LeaseSet (nullptr) + StreamingDestination::StreamingDestination (boost::asio::io_service& service, const std::string& fullPath): + m_Service (service), m_LeaseSet (nullptr) { std::ifstream s(fullPath.c_str (), std::ifstream::binary); if (s.is_open ()) @@ -378,21 +407,27 @@ namespace stream delete packet; } } - else + else // new incoming stream { - LogPrint ("Uncoming stream is not implemented yet"); - delete packet; + auto incomingStream = CreateNewIncomingStream (); + incomingStream->HandleNextPacket (packet); } } - Stream * StreamingDestination::CreateNewStream (boost::asio::io_service& service, - const i2p::data::LeaseSet& remote) + Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote) { - Stream * s = new Stream (service, this, remote); + Stream * s = new Stream (m_Service, this, remote); m_Streams[s->GetRecvStreamID ()] = s; return s; } + Stream * StreamingDestination::CreateNewIncomingStream () + { + Stream * s = new Stream (m_Service, this); + m_Streams[s->GetRecvStreamID ()] = s; + return s; + } + void StreamingDestination::DeleteStream (Stream * stream) { if (stream) @@ -429,7 +464,7 @@ namespace stream { if (!m_SharedLocalDestination) { - m_SharedLocalDestination = new StreamingDestination (); + m_SharedLocalDestination = new StreamingDestination (m_Service); m_Destinations[m_SharedLocalDestination->GetIdentHash ()] = m_SharedLocalDestination; } LoadLocalDestinations (); @@ -475,7 +510,7 @@ namespace stream #else it->path(); #endif - auto localDestination = new StreamingDestination (fullPath); + auto localDestination = new StreamingDestination (m_Service, fullPath); m_Destinations[localDestination->GetIdentHash ()] = localDestination; numDestinations++; } @@ -487,7 +522,7 @@ namespace stream Stream * StreamingDestinations::CreateClientStream (const i2p::data::LeaseSet& remote) { if (!m_SharedLocalDestination) return nullptr; - return m_SharedLocalDestination->CreateNewStream (m_Service, remote); + return m_SharedLocalDestination->CreateNewOutgoingStream (remote); } void StreamingDestinations::DeleteClientStream (Stream * stream) diff --git a/Streaming.h b/Streaming.h index c138ae03..39b15ced 100644 --- a/Streaming.h +++ b/Streaming.h @@ -69,11 +69,13 @@ namespace stream { public: - Stream (boost::asio::io_service& service, StreamingDestination * local, const i2p::data::LeaseSet& remote); + Stream (boost::asio::io_service& service, StreamingDestination * local, const i2p::data::LeaseSet& remote); // outgoing + Stream (boost::asio::io_service& service, StreamingDestination * local); // incoming + ~Stream (); uint32_t GetSendStreamID () const { return m_SendStreamID; }; uint32_t GetRecvStreamID () const { return m_RecvStreamID; }; - const i2p::data::LeaseSet& GetRemoteLeaseSet () const { return m_RemoteLeaseSet; }; + const i2p::data::LeaseSet * GetRemoteLeaseSet () const { return m_RemoteLeaseSet; }; bool IsOpen () const { return m_IsOpen; }; bool IsEstablished () const { return m_SendStreamID; }; @@ -106,9 +108,9 @@ namespace stream boost::asio::io_service& m_Service; uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber, m_LastReceivedSequenceNumber; - bool m_IsOpen, m_LeaseSetUpdated; + bool m_IsOpen, m_IsOutgoing, m_LeaseSetUpdated; StreamingDestination * m_LocalDestination; - const i2p::data::LeaseSet& m_RemoteLeaseSet; + const i2p::data::LeaseSet * m_RemoteLeaseSet; i2p::data::Lease m_CurrentRemoteLease; std::queue m_ReceiveQueue; std::set m_SavedPackets; @@ -119,15 +121,15 @@ namespace stream { public: - StreamingDestination (); - StreamingDestination (const std::string& fullPath); + StreamingDestination (boost::asio::io_service& service); + StreamingDestination (boost::asio::io_service& service, const std::string& fullPath); ~StreamingDestination (); const i2p::data::PrivateKeys& GetKeys () const { return m_Keys; }; I2NPMessage * GetLeaseSetMsg (); i2p::tunnel::TunnelPool * GetTunnelPool () const { return m_Pool; }; - Stream * CreateNewStream (boost::asio::io_service& service, const i2p::data::LeaseSet& remote); + Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote); void DeleteStream (Stream * stream); void HandleNextPacket (Packet * packet); @@ -137,9 +139,14 @@ namespace stream const uint8_t * GetEncryptionPrivateKey () const { return m_EncryptionPrivateKey; }; const uint8_t * GetEncryptionPublicKey () const { return m_EncryptionPublicKey; }; void Sign (const uint8_t * buf, int len, uint8_t * signature) const; - + + private: + + Stream * CreateNewIncomingStream (); + private: + boost::asio::io_service& m_Service; std::map m_Streams; i2p::data::PrivateKeys m_Keys; i2p::data::IdentHash m_IdentHash;