diff --git a/Destination.cpp b/Destination.cpp index 0974ecdf..1f984fa1 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -156,7 +156,34 @@ namespace stream delete newLeaseSet; } } - + + void StreamingDestination::HandleDataMessage (const uint8_t * buf, size_t len) + { + uint32_t length = be32toh (*(uint32_t *)buf); + buf += 4; + // we assume I2CP payload + if (buf[9] == 6) // streaming protocol + { + // unzip it + CryptoPP::Gunzip decompressor; + decompressor.Put (buf, length); + decompressor.MessageEnd(); + Packet * uncompressed = new Packet; + uncompressed->offset = 0; + uncompressed->len = decompressor.MaxRetrievable (); + if (uncompressed->len > MAX_PACKET_SIZE) + { + LogPrint ("Received packet size ", uncompressed->len, " exceeds max packet size"); + uncompressed->len = MAX_PACKET_SIZE; + } + decompressor.Get (uncompressed->buf, uncompressed->len); + // then forward to streaming thread + m_Service.post (boost::bind (&StreamingDestination::HandleNextPacket, this, uncompressed)); + } + else + LogPrint ("Data: unexpected protocol ", buf[9]); + } + void StreamingDestination::SetLeaseSetUpdated () { UpdateLeaseSet (); @@ -282,23 +309,6 @@ namespace stream if (stream) stream->GetLocalDestination ()->DeleteStream (stream); } - - void StreamingDestinations::HandleNextPacket (i2p::data::IdentHash destination, Packet * packet) - { - m_Service.post (boost::bind (&StreamingDestinations::PostNextPacket, this, destination, packet)); - } - - void StreamingDestinations::PostNextPacket (i2p::data::IdentHash destination, Packet * packet) - { - auto it = m_Destinations.find (destination); - if (it != m_Destinations.end ()) - it->second->HandleNextPacket (packet); - else - { - LogPrint ("Local destination ", destination.ToBase64 (), " not found"); - delete packet; - } - } StreamingDestination * StreamingDestinations::FindLocalDestination (const i2p::data::IdentHash& destination) const { @@ -362,33 +372,6 @@ namespace stream { return destinations; } - - void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len) - { - uint32_t length = be32toh (*(uint32_t *)buf); - buf += 4; - // we assume I2CP payload - if (buf[9] == 6) // streaming protocol - { - // unzip it - CryptoPP::Gunzip decompressor; - decompressor.Put (buf, length); - decompressor.MessageEnd(); - Packet * uncompressed = new Packet; - uncompressed->offset = 0; - uncompressed->len = decompressor.MaxRetrievable (); - if (uncompressed->len > MAX_PACKET_SIZE) - { - LogPrint ("Received packet size ", uncompressed->len, " exceeds max packet size"); - uncompressed->len = MAX_PACKET_SIZE; - } - decompressor.Get (uncompressed->buf, uncompressed->len); - // then forward to streaming engine thread - destinations.HandleNextPacket (destination, uncompressed); - } - else - LogPrint ("Data: protocol ", buf[9], " is not supported"); - } I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len) { diff --git a/Destination.h b/Destination.h index fccde2f1..e00f1b3e 100644 --- a/Destination.h +++ b/Destination.h @@ -37,6 +37,7 @@ namespace stream const uint8_t * GetEncryptionPrivateKey () const { return m_EncryptionPrivateKey; }; const uint8_t * GetEncryptionPublicKey () const { return m_EncryptionPublicKey; }; void SetLeaseSetUpdated (); + void HandleDataMessage (const uint8_t * buf, size_t len); private: @@ -69,8 +70,6 @@ namespace stream void Start (); void Stop (); - void HandleNextPacket (i2p::data::IdentHash destination, Packet * packet); - Stream * CreateClientStream (const i2p::data::LeaseSet& remote); void DeleteStream (Stream * stream); StreamingDestination * GetSharedLocalDestination () const { return m_SharedLocalDestination; }; @@ -84,7 +83,6 @@ namespace stream void Run (); void LoadLocalDestinations (); - void PostNextPacket (i2p::data::IdentHash destination, Packet * packet); private: @@ -117,7 +115,6 @@ namespace stream const StreamingDestinations& GetLocalDestinations (); // assuming data is I2CP message - void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len); I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len); } } diff --git a/Garlic.cpp b/Garlic.cpp index 2b5d7112..e3cfa5e1 100644 --- a/Garlic.cpp +++ b/Garlic.cpp @@ -434,13 +434,16 @@ namespace garlic case eGarlicDeliveryTypeDestination: { LogPrint ("Garlic type destination"); - i2p::data::IdentHash destination (buf); - buf += 32; - // we assume streaming protocol for destination - // later on we should let destination decide + buf += 32; // destination. check it later or for multiple destinations I2NPHeader * header = (I2NPHeader *)buf; if (header->typeID == eI2NPData) - i2p::stream::HandleDataMessage (destination, buf + sizeof (I2NPHeader), be16toh (header->size)); + { + auto pool = from ? from->GetTunnelPool () : nullptr; + if (pool) + pool->GetLocalDestination ().HandleDataMessage (buf + sizeof (I2NPHeader), be16toh (header->size)); + else + LogPrint ("Local destination doesn't exist"); + } else LogPrint ("Unexpected I2NP garlic message ", (int)header->typeID); break; diff --git a/Identity.h b/Identity.h index e352ddb5..291edbec 100644 --- a/Identity.h +++ b/Identity.h @@ -243,6 +243,7 @@ namespace data virtual const uint8_t * GetEncryptionPrivateKey () const = 0; virtual const uint8_t * GetEncryptionPublicKey () const = 0; virtual void SetLeaseSetUpdated () = 0; + virtual void HandleDataMessage (const uint8_t * buf, size_t len) = 0; const IdentityEx& GetIdentity () const { return GetPrivateKeys ().GetPublic (); }; const IdentHash& GetIdentHash () const { return GetIdentity ().GetIdentHash (); }; diff --git a/RouterContext.h b/RouterContext.h index 188c0c93..ffb4a1e5 100644 --- a/RouterContext.h +++ b/RouterContext.h @@ -40,6 +40,7 @@ namespace i2p const uint8_t * GetEncryptionPrivateKey () const { return m_Keys.GetPrivateKey (); }; const uint8_t * GetEncryptionPublicKey () const { return GetIdentity ().GetStandardIdentity ().publicKey; }; void SetLeaseSetUpdated () {}; + void HandleDataMessage (const uint8_t * buf, size_t len) {}; private: diff --git a/TunnelPool.h b/TunnelPool.h index c08d5ece..c4a09d1c 100644 --- a/TunnelPool.h +++ b/TunnelPool.h @@ -30,7 +30,8 @@ namespace tunnel const uint8_t * GetEncryptionPrivateKey () const { return m_LocalDestination.GetEncryptionPrivateKey (); }; const uint8_t * GetEncryptionPublicKey () const { return m_LocalDestination.GetEncryptionPublicKey (); }; - const i2p::data::LocalDestination& GetLocalDestination () const { return m_LocalDestination; }; + const i2p::data::LocalDestination& GetLocalDestination () const { return m_LocalDestination; }; + i2p::data::LocalDestination& GetLocalDestination () { return m_LocalDestination; }; bool IsExploratory () const { return GetIdentHash () == i2p::context.GetRouterIdentHash (); }; void CreateTunnels ();