diff --git a/Garlic.cpp b/Garlic.cpp index 5e9796dd..b4b4623a 100644 --- a/Garlic.cpp +++ b/Garlic.cpp @@ -392,7 +392,7 @@ namespace garlic // later on we should let destination decide I2NPHeader * header = (I2NPHeader *)buf; if (header->typeID == eI2NPData) - i2p::stream::HandleDataMessage (&destination, buf + sizeof (I2NPHeader), be16toh (header->size)); + i2p::stream::HandleDataMessage (destination, buf + sizeof (I2NPHeader), be16toh (header->size)); else LogPrint ("Unexpected I2NP garlic message ", (int)header->typeID); break; diff --git a/Streaming.cpp b/Streaming.cpp index c69d53de..da1d9ac6 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -14,10 +14,11 @@ namespace i2p { namespace stream { - Stream::Stream (StreamingDestination * local, const i2p::data::LeaseSet& remote): - m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), - m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local), - m_RemoteLeaseSet (remote), m_OutboundTunnel (nullptr) + Stream::Stream (boost::asio::io_service& service, StreamingDestination * local, + const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0), + m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), m_IsOpen (false), + m_LeaseSetUpdated (true), m_LocalDestination (local), m_RemoteLeaseSet (remote), + m_OutboundTunnel (nullptr) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); UpdateCurrentRemoteLease (); @@ -312,7 +313,6 @@ namespace stream m_CurrentRemoteLease.endDate = 0; } - StreamingDestination * sharedLocalDestination = nullptr; StreamingDestination::StreamingDestination (): m_LeaseSet (nullptr) { @@ -354,9 +354,10 @@ namespace stream } } - Stream * StreamingDestination::CreateNewStream (const i2p::data::LeaseSet& remote) + Stream * StreamingDestination::CreateNewStream (boost::asio::io_service& service, + const i2p::data::LeaseSet& remote) { - Stream * s = new Stream (this, remote); + Stream * s = new Stream (service, this, remote); m_Streams[s->GetRecvStreamID ()] = s; return s; } @@ -435,32 +436,78 @@ namespace stream CryptoPP::DSA::Signer signer (m_SigningPrivateKey); signer.SignMessage (i2p::context.GetRandomNumberGenerator (), buf, len, signature); } + + StreamingDestinations destinations; + void StreamingDestinations::Start () + { + if (!m_SharedLocalDestination) + m_SharedLocalDestination = new StreamingDestination (); + + m_IsRunning = true; + m_Thread = new std::thread (std::bind (&StreamingDestinations::Run, this)); + } + + void StreamingDestinations::Stop () + { + delete m_SharedLocalDestination; + + m_IsRunning = false; + m_Service.stop (); + if (m_Thread) + { + m_Thread->join (); + delete m_Thread; + m_Thread = 0; + } + } + + void StreamingDestinations::Run () + { + m_Service.run (); + } + + Stream * StreamingDestinations::CreateClientStream (const i2p::data::LeaseSet& remote) + { + if (!m_SharedLocalDestination) return nullptr; + return m_SharedLocalDestination->CreateNewStream (m_Service, remote); + } + + void StreamingDestinations::DeleteClientStream (Stream * stream) + { + if (m_SharedLocalDestination) + m_SharedLocalDestination->DeleteStream (stream); + else + delete stream; + } + + void StreamingDestinations::HandleNextPacket (i2p::data::IdentHash destination, Packet * packet) + { + // TODO: we have onle one destination, might be more + if (m_SharedLocalDestination) + m_SharedLocalDestination->HandleNextPacket (packet); + } Stream * CreateStream (const i2p::data::LeaseSet& remote) { - if (!sharedLocalDestination) - sharedLocalDestination = new StreamingDestination (); - return sharedLocalDestination->CreateNewStream (remote); + return destinations.CreateClientStream (remote); } void DeleteStream (Stream * stream) { - if (sharedLocalDestination) - sharedLocalDestination->DeleteStream (stream); + destinations.DeleteClientStream (stream); } void StartStreaming () { - if (!sharedLocalDestination) - sharedLocalDestination = new StreamingDestination (); + destinations.Start (); } void StopStreaming () { - delete sharedLocalDestination; + destinations.Stop (); } - void HandleDataMessage (i2p::data::IdentHash * destination, const uint8_t * buf, size_t len) + void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len) { uint32_t length = be32toh (*(uint32_t *)buf); buf += 4; @@ -481,9 +528,7 @@ namespace stream } decompressor.Get (uncompressed->buf, uncompressed->len); // then forward to streaming engine - // TODO: we have onle one destination, might be more - if (sharedLocalDestination) - sharedLocalDestination->HandleNextPacket (uncompressed); + destinations.HandleNextPacket (destination, uncompressed); } else LogPrint ("Data: protocol ", buf[9], " is not supported"); diff --git a/Streaming.h b/Streaming.h index a053803e..ac569234 100644 --- a/Streaming.h +++ b/Streaming.h @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include #include "I2PEndian.h" #include "Queue.h" @@ -67,7 +69,7 @@ namespace stream { public: - Stream (StreamingDestination * local, const i2p::data::LeaseSet& remote); + Stream (boost::asio::io_service& service, StreamingDestination * local, const i2p::data::LeaseSet& remote); ~Stream (); uint32_t GetSendStreamID () const { return m_SendStreamID; }; uint32_t GetRecvStreamID () const { return m_RecvStreamID; }; @@ -95,6 +97,7 @@ namespace stream private: + boost::asio::io_service& m_Service; uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber, m_LastReceivedSequenceNumber; bool m_IsOpen, m_LeaseSetUpdated; StreamingDestination * m_LocalDestination; @@ -119,7 +122,7 @@ namespace stream i2p::tunnel::TunnelPool * GetTunnelPool () const { return m_Pool; }; void Sign (uint8_t * buf, int len, uint8_t * signature) const; - Stream * CreateNewStream (const i2p::data::LeaseSet& remote); + Stream * CreateNewStream (boost::asio::io_service& service, const i2p::data::LeaseSet& remote); void DeleteStream (Stream * stream); void HandleNextPacket (Packet * packet); @@ -142,13 +145,43 @@ namespace stream CryptoPP::DSA::PrivateKey m_SigningPrivateKey; }; + class StreamingDestinations + { + public: + + StreamingDestinations (): m_IsRunning (false), m_Thread (nullptr), + m_Work (m_Service), m_SharedLocalDestination (nullptr) {}; + ~StreamingDestinations () {}; + + void Start (); + void Stop (); + + void HandleNextPacket (i2p::data::IdentHash destination, Packet * packet); + + Stream * CreateClientStream (const i2p::data::LeaseSet& remote); + void DeleteClientStream (Stream * stream); + + private: + + void Run (); + + private: + + bool m_IsRunning; + std::thread * m_Thread; + boost::asio::io_service m_Service; + boost::asio::io_service::work m_Work; + + StreamingDestination * m_SharedLocalDestination; + }; + Stream * CreateStream (const i2p::data::LeaseSet& remote); void DeleteStream (Stream * stream); void StartStreaming (); void StopStreaming (); // assuming data is I2CP message - void HandleDataMessage (i2p::data::IdentHash * destination, const uint8_t * buf, size_t len); + void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len); I2NPMessage * CreateDataMessage (Stream * s, uint8_t * payload, size_t len); } }