From 482fc0e8b1c3c0e63069d903bd8ce194a7a392cf Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 5 Oct 2014 08:54:59 -0400 Subject: [PATCH] split Streaming to stream and destination --- Daemon.cpp | 1 + Destination.cpp | 414 ++++++++++++++++++++++++++++++++++++ Destination.h | 125 +++++++++++ Garlic.cpp | 2 +- HTTPServer.cpp | 1 + I2PTunnel.cpp | 1 + SAM.cpp | 1 + SOCKS.cpp | 1 + Streaming.cpp | 410 +---------------------------------- Streaming.h | 110 ---------- Win32/i2pd.vcxproj | 4 +- build/CMakeLists.txt | 1 + build/autotools/Makefile.in | 7 +- filelist.mk | 14 +- 14 files changed, 562 insertions(+), 530 deletions(-) create mode 100644 Destination.cpp create mode 100644 Destination.h diff --git a/Daemon.cpp b/Daemon.cpp index d0242bf7..b23eab10 100644 --- a/Daemon.cpp +++ b/Daemon.cpp @@ -13,6 +13,7 @@ #include "Garlic.h" #include "util.h" #include "Streaming.h" +#include "Destination.h" #include "HTTPServer.h" #include "HTTPProxy.h" #include "SOCKS.h" diff --git a/Destination.cpp b/Destination.cpp new file mode 100644 index 00000000..0974ecdf --- /dev/null +++ b/Destination.cpp @@ -0,0 +1,414 @@ +#include +#include +#include +#include +#include "Log.h" +#include "util.h" +#include "Destination.h" + +namespace i2p +{ +namespace stream +{ + StreamingDestination::StreamingDestination (boost::asio::io_service& service, bool isPublic): + m_Service (service), m_LeaseSet (nullptr), m_IsPublic (isPublic) + { + m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (/*i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256*/); // uncomment for ECDSA + CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); + dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey); + m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel + if (m_IsPublic) + LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p created"); + } + + StreamingDestination::StreamingDestination (boost::asio::io_service& service, const std::string& fullPath, bool isPublic): + m_Service (service), m_LeaseSet (nullptr), m_IsPublic (isPublic) + { + std::ifstream s(fullPath.c_str (), std::ifstream::binary); + if (s.is_open ()) + { + s.seekg (0, std::ios::end); + size_t len = s.tellg(); + s.seekg (0, std::ios::beg); + uint8_t * buf = new uint8_t[len]; + s.read ((char *)buf, len); + m_Keys.FromBuffer (buf, len); + delete[] buf; + LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p loaded"); + } + else + { + LogPrint ("Can't open file ", fullPath, " Creating new one"); + m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (/*i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256*/); + std::ofstream f (fullPath, std::ofstream::binary | std::ofstream::out); + size_t len = m_Keys.GetFullLen (); + uint8_t * buf = new uint8_t[len]; + len = m_Keys.ToBuffer (buf, len); + f.write ((char *)buf, len); + delete[] buf; + + LogPrint ("New private keys file ", fullPath, " for ", m_Keys.GetPublic ().GetIdentHash ().ToBase32 (), ".b32.i2p created"); + } + + CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); + dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey); + m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel + } + + StreamingDestination::StreamingDestination (boost::asio::io_service& service, const i2p::data::PrivateKeys& keys, bool isPublic): + m_Service (service), m_Keys (keys), m_LeaseSet (nullptr), m_IsPublic (isPublic) + { + CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); + dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey); + m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel + if (m_IsPublic) + LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p created"); + } + + StreamingDestination::~StreamingDestination () + { + { + std::unique_lock l(m_StreamsMutex); + for (auto it: m_Streams) + delete it.second; + m_Streams.clear (); + } + if (m_Pool) + i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); + delete m_LeaseSet; + } + + void StreamingDestination::HandleNextPacket (Packet * packet) + { + uint32_t sendStreamID = packet->GetSendStreamID (); + if (sendStreamID) + { + auto it = m_Streams.find (sendStreamID); + if (it != m_Streams.end ()) + it->second->HandleNextPacket (packet); + else + { + LogPrint ("Unknown stream ", sendStreamID); + delete packet; + } + } + else // new incoming stream + { + auto incomingStream = CreateNewIncomingStream (); + incomingStream->HandleNextPacket (packet); + if (m_Acceptor != nullptr) + m_Acceptor (incomingStream); + else + { + LogPrint ("Acceptor for incoming stream is not set"); + DeleteStream (incomingStream); + } + } + } + + Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote) + { + Stream * s = new Stream (m_Service, this, remote); + std::unique_lock l(m_StreamsMutex); + m_Streams[s->GetRecvStreamID ()] = s; + return s; + } + + Stream * StreamingDestination::CreateNewIncomingStream () + { + Stream * s = new Stream (m_Service, this); + std::unique_lock l(m_StreamsMutex); + m_Streams[s->GetRecvStreamID ()] = s; + return s; + } + + void StreamingDestination::DeleteStream (Stream * stream) + { + if (stream) + { + std::unique_lock l(m_StreamsMutex); + auto it = m_Streams.find (stream->GetRecvStreamID ()); + if (it != m_Streams.end ()) + { + m_Streams.erase (it); + delete stream; + } + } + } + + const i2p::data::LeaseSet * StreamingDestination::GetLeaseSet () + { + if (!m_Pool) return nullptr; + if (!m_LeaseSet) + UpdateLeaseSet (); + return m_LeaseSet; + } + + void StreamingDestination::UpdateLeaseSet () + { + auto newLeaseSet = new i2p::data::LeaseSet (*m_Pool); + if (!m_LeaseSet) + m_LeaseSet = newLeaseSet; + else + { + // TODO: implement it better + *m_LeaseSet = *newLeaseSet; + delete newLeaseSet; + } + } + + void StreamingDestination::SetLeaseSetUpdated () + { + UpdateLeaseSet (); + for (auto it: m_Streams) + it.second->SetLeaseSetUpdated (); + if (m_IsPublic) + i2p::data::netdb.PublishLeaseSet (m_LeaseSet, m_Pool); + } + + StreamingDestinations destinations; + void StreamingDestinations::Start () + { + if (!m_SharedLocalDestination) + { + m_SharedLocalDestination = new StreamingDestination (m_Service, false); // non-public + m_Destinations[m_SharedLocalDestination->GetIdentity ().GetIdentHash ()] = m_SharedLocalDestination; + } + // LoadLocalDestinations (); + + m_IsRunning = true; + m_Thread = new std::thread (std::bind (&StreamingDestinations::Run, this)); + } + + void StreamingDestinations::Stop () + { + for (auto it: m_Destinations) + delete it.second; + m_Destinations.clear (); + m_SharedLocalDestination = 0; // deleted through m_Destination + + m_IsRunning = false; + m_Service.stop (); + if (m_Thread) + { + m_Thread->join (); + delete m_Thread; + m_Thread = 0; + } + } + + void StreamingDestinations::Run () + { + m_Service.run (); + } + + void StreamingDestinations::LoadLocalDestinations () + { + int numDestinations = 0; + boost::filesystem::path p (i2p::util::filesystem::GetDataDir()); + boost::filesystem::directory_iterator end; + for (boost::filesystem::directory_iterator it (p); it != end; ++it) + { + if (boost::filesystem::is_regular_file (*it) && it->path ().extension () == ".dat") + { + auto fullPath = +#if BOOST_VERSION > 10500 + it->path().string(); +#else + it->path(); +#endif + auto localDestination = new StreamingDestination (m_Service, fullPath, true); + m_Destinations[localDestination->GetIdentHash ()] = localDestination; + numDestinations++; + } + } + if (numDestinations > 0) + LogPrint (numDestinations, " local destinations loaded"); + } + + StreamingDestination * StreamingDestinations::LoadLocalDestination (const std::string& filename, bool isPublic) + { + auto localDestination = new StreamingDestination (m_Service, i2p::util::filesystem::GetFullPath (filename), isPublic); + std::unique_lock l(m_DestinationsMutex); + m_Destinations[localDestination->GetIdentHash ()] = localDestination; + return localDestination; + } + + StreamingDestination * StreamingDestinations::CreateNewLocalDestination (bool isPublic) + { + auto localDestination = new StreamingDestination (m_Service, isPublic); + std::unique_lock l(m_DestinationsMutex); + m_Destinations[localDestination->GetIdentHash ()] = localDestination; + return localDestination; + } + + void StreamingDestinations::DeleteLocalDestination (StreamingDestination * destination) + { + if (!destination) return; + auto it = m_Destinations.find (destination->GetIdentHash ()); + if (it != m_Destinations.end ()) + { + auto d = it->second; + { + std::unique_lock l(m_DestinationsMutex); + m_Destinations.erase (it); + } + delete d; + } + } + + StreamingDestination * StreamingDestinations::CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic) + { + auto it = m_Destinations.find (keys.GetPublic ().GetIdentHash ()); + if (it != m_Destinations.end ()) + { + LogPrint ("Local destination ", keys.GetPublic ().GetIdentHash ().ToBase32 (), ".b32.i2p exists"); + return nullptr; + } + auto localDestination = new StreamingDestination (m_Service, keys, isPublic); + std::unique_lock l(m_DestinationsMutex); + m_Destinations[keys.GetPublic ().GetIdentHash ()] = localDestination; + return localDestination; + } + + Stream * StreamingDestinations::CreateClientStream (const i2p::data::LeaseSet& remote) + { + if (!m_SharedLocalDestination) return nullptr; + return m_SharedLocalDestination->CreateNewOutgoingStream (remote); + } + + void StreamingDestinations::DeleteStream (Stream * stream) + { + if (stream) + stream->GetLocalDestination ()->DeleteStream (stream); + } + + void StreamingDestinations::HandleNextPacket (i2p::data::IdentHash destination, Packet * packet) + { + m_Service.post (boost::bind (&StreamingDestinations::PostNextPacket, this, destination, packet)); + } + + void StreamingDestinations::PostNextPacket (i2p::data::IdentHash destination, Packet * packet) + { + auto it = m_Destinations.find (destination); + if (it != m_Destinations.end ()) + it->second->HandleNextPacket (packet); + else + { + LogPrint ("Local destination ", destination.ToBase64 (), " not found"); + delete packet; + } + } + + StreamingDestination * StreamingDestinations::FindLocalDestination (const i2p::data::IdentHash& destination) const + { + auto it = m_Destinations.find (destination); + if (it != m_Destinations.end ()) + return it->second; + return nullptr; + } + + Stream * CreateStream (const i2p::data::LeaseSet& remote) + { + return destinations.CreateClientStream (remote); + } + + void DeleteStream (Stream * stream) + { + destinations.DeleteStream (stream); + } + + void StartStreaming () + { + destinations.Start (); + } + + void StopStreaming () + { + destinations.Stop (); + } + + StreamingDestination * GetSharedLocalDestination () + { + return destinations.GetSharedLocalDestination (); + } + + StreamingDestination * CreateNewLocalDestination (bool isPublic) + { + return destinations.CreateNewLocalDestination (isPublic); + } + + StreamingDestination * CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic) + { + return destinations.CreateNewLocalDestination (keys, isPublic); + } + + void DeleteLocalDestination (StreamingDestination * destination) + { + destinations.DeleteLocalDestination (destination); + } + + StreamingDestination * FindLocalDestination (const i2p::data::IdentHash& destination) + { + return destinations.FindLocalDestination (destination); + } + + StreamingDestination * LoadLocalDestination (const std::string& filename, bool isPublic) + { + return destinations.LoadLocalDestination (filename, isPublic); + } + + const StreamingDestinations& GetLocalDestinations () + { + return destinations; + } + + void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len) + { + uint32_t length = be32toh (*(uint32_t *)buf); + buf += 4; + // we assume I2CP payload + if (buf[9] == 6) // streaming protocol + { + // unzip it + CryptoPP::Gunzip decompressor; + decompressor.Put (buf, length); + decompressor.MessageEnd(); + Packet * uncompressed = new Packet; + uncompressed->offset = 0; + uncompressed->len = decompressor.MaxRetrievable (); + if (uncompressed->len > MAX_PACKET_SIZE) + { + LogPrint ("Received packet size ", uncompressed->len, " exceeds max packet size"); + uncompressed->len = MAX_PACKET_SIZE; + } + decompressor.Get (uncompressed->buf, uncompressed->len); + // then forward to streaming engine thread + destinations.HandleNextPacket (destination, uncompressed); + } + else + LogPrint ("Data: protocol ", buf[9], " is not supported"); + } + + I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len) + { + I2NPMessage * msg = NewI2NPShortMessage (); + CryptoPP::Gzip compressor; // DEFAULT_DEFLATE_LEVEL + if (len <= COMPRESSION_THRESHOLD_SIZE) + compressor.SetDeflateLevel (CryptoPP::Gzip::MIN_DEFLATE_LEVEL); + compressor.Put (payload, len); + compressor.MessageEnd(); + int size = compressor.MaxRetrievable (); + uint8_t * buf = msg->GetPayload (); + *(uint32_t *)buf = htobe32 (size); // length + buf += 4; + compressor.Get (buf, size); + memset (buf + 4, 0, 4); // source and destination ports. TODO: fill with proper values later + buf[9] = 6; // streaming protocol + msg->len += size + 4; + FillI2NPMessageHeader (msg, eI2NPData); + + return msg; + } +} +} diff --git a/Destination.h b/Destination.h new file mode 100644 index 00000000..fccde2f1 --- /dev/null +++ b/Destination.h @@ -0,0 +1,125 @@ +#ifndef DESTINATION_H__ +#define DESTINATION_H__ + +#include +#include +#include "Identity.h" +#include "TunnelPool.h" +#include "CryptoConst.h" +#include "NetDb.h" +#include "Streaming.h" + +namespace i2p +{ +namespace stream +{ + class StreamingDestination: public i2p::data::LocalDestination + { + public: + + StreamingDestination (boost::asio::io_service& service, bool isPublic); + StreamingDestination (boost::asio::io_service& service, const std::string& fullPath, bool isPublic); + StreamingDestination (boost::asio::io_service& service, const i2p::data::PrivateKeys& keys, bool isPublic); + ~StreamingDestination (); + + const i2p::data::LeaseSet * GetLeaseSet (); + i2p::tunnel::TunnelPool * GetTunnelPool () const { return m_Pool; }; + + Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote); + void DeleteStream (Stream * stream); + void SetAcceptor (const std::function& acceptor) { m_Acceptor = acceptor; }; + void ResetAcceptor () { m_Acceptor = nullptr; }; + bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; + void HandleNextPacket (Packet * packet); + + // implements LocalDestination + const i2p::data::PrivateKeys& GetPrivateKeys () const { return m_Keys; }; + const uint8_t * GetEncryptionPrivateKey () const { return m_EncryptionPrivateKey; }; + const uint8_t * GetEncryptionPublicKey () const { return m_EncryptionPublicKey; }; + void SetLeaseSetUpdated (); + + private: + + Stream * CreateNewIncomingStream (); + void UpdateLeaseSet (); + + private: + + boost::asio::io_service& m_Service; + std::mutex m_StreamsMutex; + std::map m_Streams; + i2p::data::PrivateKeys m_Keys; + uint8_t m_EncryptionPublicKey[256], m_EncryptionPrivateKey[256]; + + i2p::tunnel::TunnelPool * m_Pool; + i2p::data::LeaseSet * m_LeaseSet; + bool m_IsPublic; + + std::function m_Acceptor; + }; + + class StreamingDestinations + { + public: + + StreamingDestinations (): m_IsRunning (false), m_Thread (nullptr), + m_Work (m_Service), m_SharedLocalDestination (nullptr) {}; + ~StreamingDestinations () {}; + + void Start (); + void Stop (); + + void HandleNextPacket (i2p::data::IdentHash destination, Packet * packet); + + Stream * CreateClientStream (const i2p::data::LeaseSet& remote); + void DeleteStream (Stream * stream); + StreamingDestination * GetSharedLocalDestination () const { return m_SharedLocalDestination; }; + StreamingDestination * CreateNewLocalDestination (bool isPublic); + StreamingDestination * CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic); + void DeleteLocalDestination (StreamingDestination * destination); + StreamingDestination * FindLocalDestination (const i2p::data::IdentHash& destination) const; + StreamingDestination * LoadLocalDestination (const std::string& filename, bool isPublic); + + private: + + void Run (); + void LoadLocalDestinations (); + void PostNextPacket (i2p::data::IdentHash destination, Packet * packet); + + private: + + bool m_IsRunning; + std::thread * m_Thread; + boost::asio::io_service m_Service; + boost::asio::io_service::work m_Work; + + std::mutex m_DestinationsMutex; + std::map m_Destinations; + StreamingDestination * m_SharedLocalDestination; + + public: + // for HTTP + const decltype(m_Destinations)& GetDestinations () const { return m_Destinations; }; + }; + + + Stream * CreateStream (const i2p::data::LeaseSet& remote); + void DeleteStream (Stream * stream); + void StartStreaming (); + void StopStreaming (); + StreamingDestination * GetSharedLocalDestination (); + StreamingDestination * CreateNewLocalDestination (bool isPublic = true); + StreamingDestination * CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic = true); + void DeleteLocalDestination (StreamingDestination * destination); + StreamingDestination * FindLocalDestination (const i2p::data::IdentHash& destination); + StreamingDestination * LoadLocalDestination (const std::string& filename, bool isPublic); + // for HTTP + const StreamingDestinations& GetLocalDestinations (); + + // assuming data is I2CP message + void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len); + I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len); +} +} + +#endif diff --git a/Garlic.cpp b/Garlic.cpp index dc5c3841..2b5d7112 100644 --- a/Garlic.cpp +++ b/Garlic.cpp @@ -7,7 +7,7 @@ #include "Tunnel.h" #include "TunnelPool.h" #include "Timestamp.h" -#include "Streaming.h" +#include "Destination.h" #include "Garlic.h" namespace i2p diff --git a/HTTPServer.cpp b/HTTPServer.cpp index b89946e6..da40285a 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -8,6 +8,7 @@ #include "NetDb.h" #include "I2PEndian.h" #include "Streaming.h" +#include "Destination.h" #include "RouterContext.h" #include "HTTPServer.h" diff --git a/I2PTunnel.cpp b/I2PTunnel.cpp index 07bc6fa1..f5b2b6e6 100644 --- a/I2PTunnel.cpp +++ b/I2PTunnel.cpp @@ -2,6 +2,7 @@ #include "base64.h" #include "Log.h" #include "NetDb.h" +#include "Destination.h" #include "I2PTunnel.h" namespace i2p diff --git a/SAM.cpp b/SAM.cpp index 9b423ff8..a9f1a772 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -5,6 +5,7 @@ #include "Identity.h" #include "Log.h" #include "NetDb.h" +#include "Destination.h" #include "SAM.h" namespace i2p diff --git a/SOCKS.cpp b/SOCKS.cpp index 7d686073..9968b97c 100644 --- a/SOCKS.cpp +++ b/SOCKS.cpp @@ -1,6 +1,7 @@ #include "SOCKS.h" #include "Identity.h" #include "NetDb.h" +#include "Destination.h" #include #include #include diff --git a/Streaming.cpp b/Streaming.cpp index 4ff5273e..782c3e6c 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -1,15 +1,9 @@ -#include -#include -#include -#include -#include "util.h" #include "Log.h" #include "RouterInfo.h" #include "RouterContext.h" #include "Tunnel.h" #include "Timestamp.h" -#include "CryptoConst.h" -#include "NetDb.h" +#include "Destination.h" #include "Streaming.h" namespace i2p @@ -513,408 +507,6 @@ namespace stream } else m_CurrentRemoteLease.endDate = 0; - } - - - StreamingDestination::StreamingDestination (boost::asio::io_service& service, bool isPublic): - m_Service (service), m_LeaseSet (nullptr), m_IsPublic (isPublic) - { - m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (/*i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256*/); // uncomment for ECDSA - CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); - dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey); - m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel - if (m_IsPublic) - LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p created"); - } - - StreamingDestination::StreamingDestination (boost::asio::io_service& service, const std::string& fullPath, bool isPublic): - m_Service (service), m_LeaseSet (nullptr), m_IsPublic (isPublic) - { - std::ifstream s(fullPath.c_str (), std::ifstream::binary); - if (s.is_open ()) - { - s.seekg (0, std::ios::end); - size_t len = s.tellg(); - s.seekg (0, std::ios::beg); - uint8_t * buf = new uint8_t[len]; - s.read ((char *)buf, len); - m_Keys.FromBuffer (buf, len); - delete[] buf; - LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p loaded"); - } - else - { - LogPrint ("Can't open file ", fullPath, " Creating new one"); - m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (/*i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256*/); - std::ofstream f (fullPath, std::ofstream::binary | std::ofstream::out); - size_t len = m_Keys.GetFullLen (); - uint8_t * buf = new uint8_t[len]; - len = m_Keys.ToBuffer (buf, len); - f.write ((char *)buf, len); - delete[] buf; - - LogPrint ("New private keys file ", fullPath, " for ", m_Keys.GetPublic ().GetIdentHash ().ToBase32 (), ".b32.i2p created"); - } - - CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); - dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey); - m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel - } - - StreamingDestination::StreamingDestination (boost::asio::io_service& service, const i2p::data::PrivateKeys& keys, bool isPublic): - m_Service (service), m_Keys (keys), m_LeaseSet (nullptr), m_IsPublic (isPublic) - { - CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); - dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey); - m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel - if (m_IsPublic) - LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p created"); - } - - StreamingDestination::~StreamingDestination () - { - { - std::unique_lock l(m_StreamsMutex); - for (auto it: m_Streams) - delete it.second; - m_Streams.clear (); - } - if (m_Pool) - i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); - delete m_LeaseSet; - } - - void StreamingDestination::HandleNextPacket (Packet * packet) - { - uint32_t sendStreamID = packet->GetSendStreamID (); - if (sendStreamID) - { - auto it = m_Streams.find (sendStreamID); - if (it != m_Streams.end ()) - it->second->HandleNextPacket (packet); - else - { - LogPrint ("Unknown stream ", sendStreamID); - delete packet; - } - } - else // new incoming stream - { - auto incomingStream = CreateNewIncomingStream (); - incomingStream->HandleNextPacket (packet); - if (m_Acceptor != nullptr) - m_Acceptor (incomingStream); - else - { - LogPrint ("Acceptor for incoming stream is not set"); - DeleteStream (incomingStream); - } - } - } - - Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote) - { - Stream * s = new Stream (m_Service, this, remote); - std::unique_lock l(m_StreamsMutex); - m_Streams[s->GetRecvStreamID ()] = s; - return s; - } - - Stream * StreamingDestination::CreateNewIncomingStream () - { - Stream * s = new Stream (m_Service, this); - std::unique_lock l(m_StreamsMutex); - m_Streams[s->GetRecvStreamID ()] = s; - return s; - } - - void StreamingDestination::DeleteStream (Stream * stream) - { - if (stream) - { - std::unique_lock l(m_StreamsMutex); - auto it = m_Streams.find (stream->GetRecvStreamID ()); - if (it != m_Streams.end ()) - { - m_Streams.erase (it); - delete stream; - } - } - } - - const i2p::data::LeaseSet * StreamingDestination::GetLeaseSet () - { - if (!m_Pool) return nullptr; - if (!m_LeaseSet) - UpdateLeaseSet (); - return m_LeaseSet; - } - - void StreamingDestination::UpdateLeaseSet () - { - auto newLeaseSet = new i2p::data::LeaseSet (*m_Pool); - if (!m_LeaseSet) - m_LeaseSet = newLeaseSet; - else - { - // TODO: implement it better - *m_LeaseSet = *newLeaseSet; - delete newLeaseSet; - } - } - - void StreamingDestination::SetLeaseSetUpdated () - { - UpdateLeaseSet (); - for (auto it: m_Streams) - it.second->SetLeaseSetUpdated (); - if (m_IsPublic) - i2p::data::netdb.PublishLeaseSet (m_LeaseSet, m_Pool); - } - - StreamingDestinations destinations; - void StreamingDestinations::Start () - { - if (!m_SharedLocalDestination) - { - m_SharedLocalDestination = new StreamingDestination (m_Service, false); // non-public - m_Destinations[m_SharedLocalDestination->GetIdentity ().GetIdentHash ()] = m_SharedLocalDestination; - } - // LoadLocalDestinations (); - - m_IsRunning = true; - m_Thread = new std::thread (std::bind (&StreamingDestinations::Run, this)); - } - - void StreamingDestinations::Stop () - { - for (auto it: m_Destinations) - delete it.second; - m_Destinations.clear (); - m_SharedLocalDestination = 0; // deleted through m_Destination - - m_IsRunning = false; - m_Service.stop (); - if (m_Thread) - { - m_Thread->join (); - delete m_Thread; - m_Thread = 0; - } - } - - void StreamingDestinations::Run () - { - m_Service.run (); - } - - void StreamingDestinations::LoadLocalDestinations () - { - int numDestinations = 0; - boost::filesystem::path p (i2p::util::filesystem::GetDataDir()); - boost::filesystem::directory_iterator end; - for (boost::filesystem::directory_iterator it (p); it != end; ++it) - { - if (boost::filesystem::is_regular_file (*it) && it->path ().extension () == ".dat") - { - auto fullPath = -#if BOOST_VERSION > 10500 - it->path().string(); -#else - it->path(); -#endif - auto localDestination = new StreamingDestination (m_Service, fullPath, true); - m_Destinations[localDestination->GetIdentHash ()] = localDestination; - numDestinations++; - } - } - if (numDestinations > 0) - LogPrint (numDestinations, " local destinations loaded"); - } - - StreamingDestination * StreamingDestinations::LoadLocalDestination (const std::string& filename, bool isPublic) - { - auto localDestination = new StreamingDestination (m_Service, i2p::util::filesystem::GetFullPath (filename), isPublic); - std::unique_lock l(m_DestinationsMutex); - m_Destinations[localDestination->GetIdentHash ()] = localDestination; - return localDestination; - } - - StreamingDestination * StreamingDestinations::CreateNewLocalDestination (bool isPublic) - { - auto localDestination = new StreamingDestination (m_Service, isPublic); - std::unique_lock l(m_DestinationsMutex); - m_Destinations[localDestination->GetIdentHash ()] = localDestination; - return localDestination; - } - - void StreamingDestinations::DeleteLocalDestination (StreamingDestination * destination) - { - if (!destination) return; - auto it = m_Destinations.find (destination->GetIdentHash ()); - if (it != m_Destinations.end ()) - { - auto d = it->second; - { - std::unique_lock l(m_DestinationsMutex); - m_Destinations.erase (it); - } - delete d; - } - } - - StreamingDestination * StreamingDestinations::CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic) - { - auto it = m_Destinations.find (keys.GetPublic ().GetIdentHash ()); - if (it != m_Destinations.end ()) - { - LogPrint ("Local destination ", keys.GetPublic ().GetIdentHash ().ToBase32 (), ".b32.i2p exists"); - return nullptr; - } - auto localDestination = new StreamingDestination (m_Service, keys, isPublic); - std::unique_lock l(m_DestinationsMutex); - m_Destinations[keys.GetPublic ().GetIdentHash ()] = localDestination; - return localDestination; - } - - Stream * StreamingDestinations::CreateClientStream (const i2p::data::LeaseSet& remote) - { - if (!m_SharedLocalDestination) return nullptr; - return m_SharedLocalDestination->CreateNewOutgoingStream (remote); - } - - void StreamingDestinations::DeleteStream (Stream * stream) - { - if (stream) - stream->GetLocalDestination ()->DeleteStream (stream); - } - - void StreamingDestinations::HandleNextPacket (i2p::data::IdentHash destination, Packet * packet) - { - m_Service.post (boost::bind (&StreamingDestinations::PostNextPacket, this, destination, packet)); - } - - void StreamingDestinations::PostNextPacket (i2p::data::IdentHash destination, Packet * packet) - { - auto it = m_Destinations.find (destination); - if (it != m_Destinations.end ()) - it->second->HandleNextPacket (packet); - else - { - LogPrint ("Local destination ", destination.ToBase64 (), " not found"); - delete packet; - } - } - - StreamingDestination * StreamingDestinations::FindLocalDestination (const i2p::data::IdentHash& destination) const - { - auto it = m_Destinations.find (destination); - if (it != m_Destinations.end ()) - return it->second; - return nullptr; - } - - Stream * CreateStream (const i2p::data::LeaseSet& remote) - { - return destinations.CreateClientStream (remote); - } - - void DeleteStream (Stream * stream) - { - destinations.DeleteStream (stream); - } - - void StartStreaming () - { - destinations.Start (); - } - - void StopStreaming () - { - destinations.Stop (); - } - - StreamingDestination * GetSharedLocalDestination () - { - return destinations.GetSharedLocalDestination (); - } - - StreamingDestination * CreateNewLocalDestination (bool isPublic) - { - return destinations.CreateNewLocalDestination (isPublic); - } - - StreamingDestination * CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic) - { - return destinations.CreateNewLocalDestination (keys, isPublic); - } - - void DeleteLocalDestination (StreamingDestination * destination) - { - destinations.DeleteLocalDestination (destination); - } - - StreamingDestination * FindLocalDestination (const i2p::data::IdentHash& destination) - { - return destinations.FindLocalDestination (destination); - } - - StreamingDestination * LoadLocalDestination (const std::string& filename, bool isPublic) - { - return destinations.LoadLocalDestination (filename, isPublic); } - - const StreamingDestinations& GetLocalDestinations () - { - return destinations; - } - - void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len) - { - uint32_t length = be32toh (*(uint32_t *)buf); - buf += 4; - // we assume I2CP payload - if (buf[9] == 6) // streaming protocol - { - // unzip it - CryptoPP::Gunzip decompressor; - decompressor.Put (buf, length); - decompressor.MessageEnd(); - Packet * uncompressed = new Packet; - uncompressed->offset = 0; - uncompressed->len = decompressor.MaxRetrievable (); - if (uncompressed->len > MAX_PACKET_SIZE) - { - LogPrint ("Received packet size ", uncompressed->len, " exceeds max packet size"); - uncompressed->len = MAX_PACKET_SIZE; - } - decompressor.Get (uncompressed->buf, uncompressed->len); - // then forward to streaming engine thread - destinations.HandleNextPacket (destination, uncompressed); - } - else - LogPrint ("Data: protocol ", buf[9], " is not supported"); - } - - I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len) - { - I2NPMessage * msg = NewI2NPShortMessage (); - CryptoPP::Gzip compressor; // DEFAULT_DEFLATE_LEVEL - if (len <= COMPRESSION_THRESHOLD_SIZE) - compressor.SetDeflateLevel (CryptoPP::Gzip::MIN_DEFLATE_LEVEL); - compressor.Put (payload, len); - compressor.MessageEnd(); - int size = compressor.MaxRetrievable (); - uint8_t * buf = msg->GetPayload (); - *(uint32_t *)buf = htobe32 (size); // length - buf += 4; - compressor.Get (buf, size); - memset (buf + 4, 0, 4); // source and destination ports. TODO: fill with proper values later - buf[9] = 6; // streaming protocol - msg->len += size + 4; - FillI2NPMessageHeader (msg, eI2NPData); - - return msg; - } } } diff --git a/Streaming.h b/Streaming.h index fc29e735..66f16282 100644 --- a/Streaming.h +++ b/Streaming.h @@ -6,17 +6,13 @@ #include #include #include -#include #include -#include #include #include -#include #include "I2PEndian.h" #include "Identity.h" #include "LeaseSet.h" #include "I2NPProtocol.h" -#include "TunnelPool.h" #include "Garlic.h" namespace i2p @@ -138,112 +134,6 @@ namespace stream std::set m_SentPackets; boost::asio::deadline_timer m_ReceiveTimer, m_ResendTimer; }; - - class StreamingDestination: public i2p::data::LocalDestination - { - public: - - StreamingDestination (boost::asio::io_service& service, bool isPublic); - StreamingDestination (boost::asio::io_service& service, const std::string& fullPath, bool isPublic); - StreamingDestination (boost::asio::io_service& service, const i2p::data::PrivateKeys& keys, bool isPublic); - ~StreamingDestination (); - - const i2p::data::LeaseSet * GetLeaseSet (); - i2p::tunnel::TunnelPool * GetTunnelPool () const { return m_Pool; }; - - Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote); - void DeleteStream (Stream * stream); - void SetAcceptor (const std::function& acceptor) { m_Acceptor = acceptor; }; - void ResetAcceptor () { m_Acceptor = nullptr; }; - bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; - void HandleNextPacket (Packet * packet); - - // implements LocalDestination - const i2p::data::PrivateKeys& GetPrivateKeys () const { return m_Keys; }; - const uint8_t * GetEncryptionPrivateKey () const { return m_EncryptionPrivateKey; }; - const uint8_t * GetEncryptionPublicKey () const { return m_EncryptionPublicKey; }; - void SetLeaseSetUpdated (); - - private: - - Stream * CreateNewIncomingStream (); - void UpdateLeaseSet (); - - private: - - boost::asio::io_service& m_Service; - std::mutex m_StreamsMutex; - std::map m_Streams; - i2p::data::PrivateKeys m_Keys; - uint8_t m_EncryptionPublicKey[256], m_EncryptionPrivateKey[256]; - - i2p::tunnel::TunnelPool * m_Pool; - i2p::data::LeaseSet * m_LeaseSet; - bool m_IsPublic; - - std::function m_Acceptor; - }; - - class StreamingDestinations - { - public: - - StreamingDestinations (): m_IsRunning (false), m_Thread (nullptr), - m_Work (m_Service), m_SharedLocalDestination (nullptr) {}; - ~StreamingDestinations () {}; - - void Start (); - void Stop (); - - void HandleNextPacket (i2p::data::IdentHash destination, Packet * packet); - - Stream * CreateClientStream (const i2p::data::LeaseSet& remote); - void DeleteStream (Stream * stream); - StreamingDestination * GetSharedLocalDestination () const { return m_SharedLocalDestination; }; - StreamingDestination * CreateNewLocalDestination (bool isPublic); - StreamingDestination * CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic); - void DeleteLocalDestination (StreamingDestination * destination); - StreamingDestination * FindLocalDestination (const i2p::data::IdentHash& destination) const; - StreamingDestination * LoadLocalDestination (const std::string& filename, bool isPublic); - - private: - - void Run (); - void LoadLocalDestinations (); - void PostNextPacket (i2p::data::IdentHash destination, Packet * packet); - - private: - - bool m_IsRunning; - std::thread * m_Thread; - boost::asio::io_service m_Service; - boost::asio::io_service::work m_Work; - - std::mutex m_DestinationsMutex; - std::map m_Destinations; - StreamingDestination * m_SharedLocalDestination; - - public: - // for HTTP - const decltype(m_Destinations)& GetDestinations () const { return m_Destinations; }; - }; - - Stream * CreateStream (const i2p::data::LeaseSet& remote); - void DeleteStream (Stream * stream); - void StartStreaming (); - void StopStreaming (); - StreamingDestination * GetSharedLocalDestination (); - StreamingDestination * CreateNewLocalDestination (bool isPublic = true); - StreamingDestination * CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic = true); - void DeleteLocalDestination (StreamingDestination * destination); - StreamingDestination * FindLocalDestination (const i2p::data::IdentHash& destination); - StreamingDestination * LoadLocalDestination (const std::string& filename, bool isPublic); - // for HTTP - const StreamingDestinations& GetLocalDestinations (); - - // assuming data is I2CP message - void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len); - I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len); //------------------------------------------------- diff --git a/Win32/i2pd.vcxproj b/Win32/i2pd.vcxproj index f3c761ef..eb5abb12 100644 --- a/Win32/i2pd.vcxproj +++ b/Win32/i2pd.vcxproj @@ -35,6 +35,7 @@ + @@ -72,6 +73,7 @@ + @@ -171,4 +173,4 @@ - \ No newline at end of file + diff --git a/build/CMakeLists.txt b/build/CMakeLists.txt index db7db5d2..3ad94ab3 100644 --- a/build/CMakeLists.txt +++ b/build/CMakeLists.txt @@ -30,6 +30,7 @@ set (SOURCES "${CMAKE_SOURCE_DIR}/SSU.cpp" "${CMAKE_SOURCE_DIR}/SSUData.cpp" "${CMAKE_SOURCE_DIR}/Streaming.cpp" + "${CMAKE_SOURCE_DIR}/Destination.cpp" "${CMAKE_SOURCE_DIR}/TransitTunnel.cpp" "${CMAKE_SOURCE_DIR}/Tunnel.cpp" "${CMAKE_SOURCE_DIR}/TunnelGateway.cpp" diff --git a/build/autotools/Makefile.in b/build/autotools/Makefile.in index a408a39c..5071e1be 100644 --- a/build/autotools/Makefile.in +++ b/build/autotools/Makefile.in @@ -114,7 +114,7 @@ am_i2p_OBJECTS = AddressBook.$(OBJEXT) CryptoConst.$(OBJEXT) \ Transports.$(OBJEXT) Tunnel.$(OBJEXT) TunnelEndpoint.$(OBJEXT) \ TunnelGateway.$(OBJEXT) TunnelPool.$(OBJEXT) UPnP.$(OBJEXT) \ aes.$(OBJEXT) base64.$(OBJEXT) i2p.$(OBJEXT) util.$(OBJEXT) \ - SAM.$(OBJEXT) + SAM.$(OBJEXT) Destination.$(OBJEXT) i2p_OBJECTS = $(am_i2p_OBJECTS) i2p_LDADD = $(LDADD) AM_V_P = $(am__v_P_@AM_V@) @@ -324,7 +324,7 @@ i2p_SOURCES = AddressBook.cpp CryptoConst.cpp Daemon.cpp \ SSUData.cpp Streaming.cpp TransitTunnel.cpp \ Transports.cpp Tunnel.cpp TunnelEndpoint.cpp \ TunnelGateway.cpp TunnelPool.cpp UPnP.cpp aes.cpp \ - base64.cpp i2p.cpp util.cpp \ + base64.cpp i2p.cpp util.cpp SAM.cpp Destination.cpp \ \ AddressBook.h CryptoConst.h Daemon.h ElGamal.h \ Garlic.h HTTPProxy.h HTTPServer.h I2NPProtocol.h \ @@ -335,7 +335,7 @@ i2p_SOURCES = AddressBook.cpp CryptoConst.cpp Daemon.cpp \ TransitTunnel.h Transports.h Tunnel.h TunnelBase.h \ TunnelConfig.h TunnelEndpoint.h TunnelGateway.h \ TunnelPool.h UPnP.h aes.h base64.h config.h hmac.h \ - util.h version.h + util.h version.h Destination.h AM_LDFLAGS = @BOOST_DATE_TIME_LIB@ @BOOST_FILESYSTEM_LIB@ \ @BOOST_PROGRAM_OPTIONS_LIB@ @BOOST_REGEX_LIB@ \ @@ -470,6 +470,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SSU.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SSUData.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Streaming.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Destination.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/TransitTunnel.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Transports.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Tunnel.Po@am__quote@ diff --git a/filelist.mk b/filelist.mk index 8a44be18..f8d3279c 100644 --- a/filelist.mk +++ b/filelist.mk @@ -2,16 +2,18 @@ CPP_FILES := CryptoConst.cpp base64.cpp NTCPSession.cpp RouterInfo.cpp Transports.cpp \ RouterContext.cpp NetDb.cpp LeaseSet.cpp Tunnel.cpp TunnelEndpoint.cpp TunnelGateway.cpp \ - TransitTunnel.cpp I2NPProtocol.cpp Log.cpp Garlic.cpp HTTPServer.cpp Streaming.cpp Identity.cpp \ - SSU.cpp util.cpp Reseed.cpp DaemonLinux.cpp SSUData.cpp i2p.cpp aes.cpp SOCKS.cpp UPnP.cpp \ - TunnelPool.cpp HTTPProxy.cpp AddressBook.cpp Daemon.cpp I2PTunnel.cpp SAM.cpp + TransitTunnel.cpp I2NPProtocol.cpp Log.cpp Garlic.cpp HTTPServer.cpp Streaming.cpp \ + Destination.cpp Identity.cpp SSU.cpp util.cpp Reseed.cpp DaemonLinux.cpp SSUData.cpp \ + aes.cpp SOCKS.cpp UPnP.cpp TunnelPool.cpp HTTPProxy.cpp AddressBook.cpp Daemon.cpp \ + I2PTunnel.cpp SAM.cpp i2p.cpp H_FILES := CryptoConst.h base64.h NTCPSession.h RouterInfo.h Transports.h \ RouterContext.h NetDb.h LeaseSet.h Tunnel.h TunnelEndpoint.h TunnelGateway.h \ - TransitTunnel.h I2NPProtocol.h Log.h Garlic.h HTTPServer.h Streaming.h Identity.h \ - SSU.h util.h Reseed.h DaemonLinux.h SSUData.h i2p.h aes.h SOCKS.h UPnP.h TunnelPool.h \ - HTTPProxy.h AddressBook.h Daemon.h I2PTunnel.h version.h Signature.h SAM.h + TransitTunnel.h I2NPProtocol.h Log.h Garlic.h HTTPServer.h Streaming.h Destination.h \ + Identity.h SSU.h util.h Reseed.h DaemonLinux.h SSUData.h i2p.h aes.h SOCKS.h \ + UPnP.h TunnelPool.h HTTPProxy.h AddressBook.h Daemon.h I2PTunnel.h version.h \ + Signature.h SAM.h OBJECTS = $(addprefix obj/, $(notdir $(CPP_FILES:.cpp=.o)))