Browse Source

moved Data payload processing to StreamingDestination

pull/105/head
orignal 10 years ago
parent
commit
3da35c592f
  1. 52
      Destination.cpp
  2. 1
      Destination.h
  3. 47
      Streaming.cpp
  4. 7
      Streaming.h

52
Destination.cpp

@ -1,7 +1,6 @@
#include <fstream> #include <fstream>
#include <algorithm> #include <algorithm>
#include <cryptopp/dh.h> #include <cryptopp/dh.h>
#include <cryptopp/gzip.h>
#include "Log.h" #include "Log.h"
#include "util.h" #include "util.h"
#include "NetDb.h" #include "NetDb.h"
@ -244,51 +243,16 @@ namespace client
uint32_t length = be32toh (*(uint32_t *)buf); uint32_t length = be32toh (*(uint32_t *)buf);
buf += 4; buf += 4;
// we assume I2CP payload // we assume I2CP payload
if (buf[9] == PROTOCOL_TYPE_STREAMING && m_StreamingDestination) // streaming protocol switch (buf[9])
{ {
// unzip it case PROTOCOL_TYPE_STREAMING:
CryptoPP::Gunzip decompressor; // streaming protocol
decompressor.Put (buf, length); if (m_StreamingDestination)
decompressor.MessageEnd(); m_StreamingDestination->HandleDataMessagePayload (buf, length);
i2p::stream::Packet * uncompressed = new i2p::stream::Packet; break;
uncompressed->offset = 0; default:
uncompressed->len = decompressor.MaxRetrievable (); LogPrint ("Data: unexpected protocol ", buf[9]);
if (uncompressed->len <= i2p::stream::MAX_PACKET_SIZE)
{
decompressor.Get (uncompressed->buf, uncompressed->len);
m_StreamingDestination->HandleNextPacket (uncompressed);
}
else
{
LogPrint ("Received packet size ", uncompressed->len, " exceeds max packet size. Skipped");
delete uncompressed;
}
} }
else
LogPrint ("Data: unexpected protocol ", buf[9]);
}
I2NPMessage * ClientDestination::CreateDataMessage (const uint8_t * payload, size_t len)
{
I2NPMessage * msg = NewI2NPShortMessage ();
CryptoPP::Gzip compressor;
if (len <= i2p::stream::COMPRESSION_THRESHOLD_SIZE)
compressor.SetDeflateLevel (CryptoPP::Gzip::MIN_DEFLATE_LEVEL);
else
compressor.SetDeflateLevel (CryptoPP::Gzip::DEFAULT_DEFLATE_LEVEL);
compressor.Put (payload, len);
compressor.MessageEnd();
int size = compressor.MaxRetrievable ();
uint8_t * buf = msg->GetPayload ();
*(uint32_t *)buf = htobe32 (size); // length
buf += 4;
compressor.Get (buf, size);
memset (buf + 4, 0, 4); // source and destination ports. TODO: fill with proper values later
buf[9] = PROTOCOL_TYPE_STREAMING; // streaming protocol. TODO:
msg->len += size + 4;
FillI2NPMessageHeader (msg, eI2NPData);
return msg;
} }
i2p::stream::Stream * ClientDestination::CreateStream (const i2p::data::LeaseSet& remote) i2p::stream::Stream * ClientDestination::CreateStream (const i2p::data::LeaseSet& remote)

1
Destination.h

@ -61,7 +61,6 @@ namespace client
// I2CP // I2CP
void HandleDataMessage (const uint8_t * buf, size_t len); void HandleDataMessage (const uint8_t * buf, size_t len);
I2NPMessage * CreateDataMessage (const uint8_t * payload, size_t len);
private: private:

47
Streaming.cpp

@ -1,3 +1,4 @@
#include <cryptopp/gzip.h>
#include "Log.h" #include "Log.h"
#include "RouterInfo.h" #include "RouterInfo.h"
#include "RouterContext.h" #include "RouterContext.h"
@ -441,7 +442,7 @@ namespace stream
for (auto it: packets) for (auto it: packets)
{ {
auto msg = m_RoutingSession->WrapSingleMessage ( auto msg = m_RoutingSession->WrapSingleMessage (
m_LocalDestination.GetOwner ().CreateDataMessage (it->GetBuffer (), it->GetLength ())); m_LocalDestination.CreateDataMessage (it->GetBuffer (), it->GetLength ()));
msgs.push_back (i2p::tunnel::TunnelMessageBlock msgs.push_back (i2p::tunnel::TunnelMessageBlock
{ {
i2p::tunnel::eDeliveryTypeTunnel, i2p::tunnel::eDeliveryTypeTunnel,
@ -606,6 +607,50 @@ namespace stream
} }
} }
void StreamingDestination::HandleDataMessagePayload (const uint8_t * buf, size_t len)
{
// unzip it
CryptoPP::Gunzip decompressor;
decompressor.Put (buf, len);
decompressor.MessageEnd();
Packet * uncompressed = new Packet;
uncompressed->offset = 0;
uncompressed->len = decompressor.MaxRetrievable ();
if (uncompressed->len <= MAX_PACKET_SIZE)
{
decompressor.Get (uncompressed->buf, uncompressed->len);
HandleNextPacket (uncompressed);
}
else
{
LogPrint ("Received packet size ", uncompressed->len, " exceeds max packet size. Skipped");
delete uncompressed;
}
}
I2NPMessage * StreamingDestination::CreateDataMessage (const uint8_t * payload, size_t len)
{
I2NPMessage * msg = NewI2NPShortMessage ();
CryptoPP::Gzip compressor;
if (len <= i2p::stream::COMPRESSION_THRESHOLD_SIZE)
compressor.SetDeflateLevel (CryptoPP::Gzip::MIN_DEFLATE_LEVEL);
else
compressor.SetDeflateLevel (CryptoPP::Gzip::DEFAULT_DEFLATE_LEVEL);
compressor.Put (payload, len);
compressor.MessageEnd();
int size = compressor.MaxRetrievable ();
uint8_t * buf = msg->GetPayload ();
*(uint32_t *)buf = htobe32 (size); // length
buf += 4;
compressor.Get (buf, size);
memset (buf + 4, 0, 4); // source and destination ports. TODO: fill with proper values later
buf[9] = i2p::client::PROTOCOL_TYPE_STREAMING; // streaming protocol
msg->len += size + 4;
FillI2NPMessageHeader (msg, eI2NPData);
return msg;
}
void DeleteStream (Stream * stream) void DeleteStream (Stream * stream)
{ {
if (stream) if (stream)

7
Streaming.h

@ -160,13 +160,14 @@ namespace stream
void SetAcceptor (const std::function<void (Stream *)>& acceptor) { m_Acceptor = acceptor; }; void SetAcceptor (const std::function<void (Stream *)>& acceptor) { m_Acceptor = acceptor; };
void ResetAcceptor () { m_Acceptor = nullptr; }; void ResetAcceptor () { m_Acceptor = nullptr; };
bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; bool IsAcceptorSet () const { return m_Acceptor != nullptr; };
// ClientDestination
i2p::client::ClientDestination& GetOwner () { return m_Owner; }; i2p::client::ClientDestination& GetOwner () { return m_Owner; };
void HandleNextPacket (Packet * packet);
I2NPMessage * CreateDataMessage (const uint8_t * payload, size_t len);
void HandleDataMessagePayload (const uint8_t * buf, size_t len);
private: private:
void HandleNextPacket (Packet * packet);
Stream * CreateNewIncomingStream (); Stream * CreateNewIncomingStream ();
private: private:

Loading…
Cancel
Save