diff --git a/Destination.cpp b/Destination.cpp index 9053c271..dc544ba6 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -1,7 +1,6 @@ #include #include #include -#include #include "Log.h" #include "util.h" #include "NetDb.h" @@ -244,51 +243,16 @@ namespace client uint32_t length = be32toh (*(uint32_t *)buf); buf += 4; // we assume I2CP payload - if (buf[9] == PROTOCOL_TYPE_STREAMING && m_StreamingDestination) // streaming protocol - { - // unzip it - CryptoPP::Gunzip decompressor; - decompressor.Put (buf, length); - decompressor.MessageEnd(); - i2p::stream::Packet * uncompressed = new i2p::stream::Packet; - uncompressed->offset = 0; - uncompressed->len = decompressor.MaxRetrievable (); - if (uncompressed->len <= i2p::stream::MAX_PACKET_SIZE) - { - decompressor.Get (uncompressed->buf, uncompressed->len); - m_StreamingDestination->HandleNextPacket (uncompressed); - } - else - { - LogPrint ("Received packet size ", uncompressed->len, " exceeds max packet size. Skipped"); - delete uncompressed; - } - } - else - LogPrint ("Data: unexpected protocol ", buf[9]); - } - - I2NPMessage * ClientDestination::CreateDataMessage (const uint8_t * payload, size_t len) - { - I2NPMessage * msg = NewI2NPShortMessage (); - CryptoPP::Gzip compressor; - if (len <= i2p::stream::COMPRESSION_THRESHOLD_SIZE) - compressor.SetDeflateLevel (CryptoPP::Gzip::MIN_DEFLATE_LEVEL); - else - compressor.SetDeflateLevel (CryptoPP::Gzip::DEFAULT_DEFLATE_LEVEL); - compressor.Put (payload, len); - compressor.MessageEnd(); - int size = compressor.MaxRetrievable (); - uint8_t * buf = msg->GetPayload (); - *(uint32_t *)buf = htobe32 (size); // length - buf += 4; - compressor.Get (buf, size); - memset (buf + 4, 0, 4); // source and destination ports. TODO: fill with proper values later - buf[9] = PROTOCOL_TYPE_STREAMING; // streaming protocol. TODO: - msg->len += size + 4; - FillI2NPMessageHeader (msg, eI2NPData); - - return msg; + switch (buf[9]) + { + case PROTOCOL_TYPE_STREAMING: + // streaming protocol + if (m_StreamingDestination) + m_StreamingDestination->HandleDataMessagePayload (buf, length); + break; + default: + LogPrint ("Data: unexpected protocol ", buf[9]); + } } i2p::stream::Stream * ClientDestination::CreateStream (const i2p::data::LeaseSet& remote) diff --git a/Destination.h b/Destination.h index f15c1588..d7547ecf 100644 --- a/Destination.h +++ b/Destination.h @@ -61,7 +61,6 @@ namespace client // I2CP void HandleDataMessage (const uint8_t * buf, size_t len); - I2NPMessage * CreateDataMessage (const uint8_t * payload, size_t len); private: diff --git a/Streaming.cpp b/Streaming.cpp index 82bd90dd..572d12c2 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -1,3 +1,4 @@ +#include #include "Log.h" #include "RouterInfo.h" #include "RouterContext.h" @@ -441,7 +442,7 @@ namespace stream for (auto it: packets) { auto msg = m_RoutingSession->WrapSingleMessage ( - m_LocalDestination.GetOwner ().CreateDataMessage (it->GetBuffer (), it->GetLength ())); + m_LocalDestination.CreateDataMessage (it->GetBuffer (), it->GetLength ())); msgs.push_back (i2p::tunnel::TunnelMessageBlock { i2p::tunnel::eDeliveryTypeTunnel, @@ -606,6 +607,50 @@ namespace stream } } + void StreamingDestination::HandleDataMessagePayload (const uint8_t * buf, size_t len) + { + // unzip it + CryptoPP::Gunzip decompressor; + decompressor.Put (buf, len); + decompressor.MessageEnd(); + Packet * uncompressed = new Packet; + uncompressed->offset = 0; + uncompressed->len = decompressor.MaxRetrievable (); + if (uncompressed->len <= MAX_PACKET_SIZE) + { + decompressor.Get (uncompressed->buf, uncompressed->len); + HandleNextPacket (uncompressed); + } + else + { + LogPrint ("Received packet size ", uncompressed->len, " exceeds max packet size. Skipped"); + delete uncompressed; + } + } + + I2NPMessage * StreamingDestination::CreateDataMessage (const uint8_t * payload, size_t len) + { + I2NPMessage * msg = NewI2NPShortMessage (); + CryptoPP::Gzip compressor; + if (len <= i2p::stream::COMPRESSION_THRESHOLD_SIZE) + compressor.SetDeflateLevel (CryptoPP::Gzip::MIN_DEFLATE_LEVEL); + else + compressor.SetDeflateLevel (CryptoPP::Gzip::DEFAULT_DEFLATE_LEVEL); + compressor.Put (payload, len); + compressor.MessageEnd(); + int size = compressor.MaxRetrievable (); + uint8_t * buf = msg->GetPayload (); + *(uint32_t *)buf = htobe32 (size); // length + buf += 4; + compressor.Get (buf, size); + memset (buf + 4, 0, 4); // source and destination ports. TODO: fill with proper values later + buf[9] = i2p::client::PROTOCOL_TYPE_STREAMING; // streaming protocol + msg->len += size + 4; + FillI2NPMessageHeader (msg, eI2NPData); + + return msg; + } + void DeleteStream (Stream * stream) { if (stream) diff --git a/Streaming.h b/Streaming.h index ee7d9eb0..6d57a8ad 100644 --- a/Streaming.h +++ b/Streaming.h @@ -160,13 +160,14 @@ namespace stream void SetAcceptor (const std::function& acceptor) { m_Acceptor = acceptor; }; void ResetAcceptor () { m_Acceptor = nullptr; }; bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; - - // ClientDestination i2p::client::ClientDestination& GetOwner () { return m_Owner; }; - void HandleNextPacket (Packet * packet); + + I2NPMessage * CreateDataMessage (const uint8_t * payload, size_t len); + void HandleDataMessagePayload (const uint8_t * buf, size_t len); private: + void HandleNextPacket (Packet * packet); Stream * CreateNewIncomingStream (); private: