|
|
|
@ -156,7 +156,34 @@ namespace stream
@@ -156,7 +156,34 @@ namespace stream
|
|
|
|
|
delete newLeaseSet; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void StreamingDestination::HandleDataMessage (const uint8_t * buf, size_t len) |
|
|
|
|
{ |
|
|
|
|
uint32_t length = be32toh (*(uint32_t *)buf); |
|
|
|
|
buf += 4; |
|
|
|
|
// we assume I2CP payload
|
|
|
|
|
if (buf[9] == 6) // streaming protocol
|
|
|
|
|
{ |
|
|
|
|
// unzip it
|
|
|
|
|
CryptoPP::Gunzip decompressor; |
|
|
|
|
decompressor.Put (buf, length); |
|
|
|
|
decompressor.MessageEnd(); |
|
|
|
|
Packet * uncompressed = new Packet; |
|
|
|
|
uncompressed->offset = 0; |
|
|
|
|
uncompressed->len = decompressor.MaxRetrievable (); |
|
|
|
|
if (uncompressed->len > MAX_PACKET_SIZE) |
|
|
|
|
{ |
|
|
|
|
LogPrint ("Received packet size ", uncompressed->len, " exceeds max packet size"); |
|
|
|
|
uncompressed->len = MAX_PACKET_SIZE; |
|
|
|
|
} |
|
|
|
|
decompressor.Get (uncompressed->buf, uncompressed->len); |
|
|
|
|
// then forward to streaming thread
|
|
|
|
|
m_Service.post (boost::bind (&StreamingDestination::HandleNextPacket, this, uncompressed)); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
LogPrint ("Data: unexpected protocol ", buf[9]); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void StreamingDestination::SetLeaseSetUpdated () |
|
|
|
|
{ |
|
|
|
|
UpdateLeaseSet (); |
|
|
|
@ -282,23 +309,6 @@ namespace stream
@@ -282,23 +309,6 @@ namespace stream
|
|
|
|
|
if (stream) |
|
|
|
|
stream->GetLocalDestination ()->DeleteStream (stream); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void StreamingDestinations::HandleNextPacket (i2p::data::IdentHash destination, Packet * packet) |
|
|
|
|
{ |
|
|
|
|
m_Service.post (boost::bind (&StreamingDestinations::PostNextPacket, this, destination, packet)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void StreamingDestinations::PostNextPacket (i2p::data::IdentHash destination, Packet * packet) |
|
|
|
|
{ |
|
|
|
|
auto it = m_Destinations.find (destination); |
|
|
|
|
if (it != m_Destinations.end ()) |
|
|
|
|
it->second->HandleNextPacket (packet); |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
LogPrint ("Local destination ", destination.ToBase64 (), " not found"); |
|
|
|
|
delete packet; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
StreamingDestination * StreamingDestinations::FindLocalDestination (const i2p::data::IdentHash& destination) const |
|
|
|
|
{ |
|
|
|
@ -362,33 +372,6 @@ namespace stream
@@ -362,33 +372,6 @@ namespace stream
|
|
|
|
|
{ |
|
|
|
|
return destinations; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len) |
|
|
|
|
{ |
|
|
|
|
uint32_t length = be32toh (*(uint32_t *)buf); |
|
|
|
|
buf += 4; |
|
|
|
|
// we assume I2CP payload
|
|
|
|
|
if (buf[9] == 6) // streaming protocol
|
|
|
|
|
{ |
|
|
|
|
// unzip it
|
|
|
|
|
CryptoPP::Gunzip decompressor; |
|
|
|
|
decompressor.Put (buf, length); |
|
|
|
|
decompressor.MessageEnd(); |
|
|
|
|
Packet * uncompressed = new Packet; |
|
|
|
|
uncompressed->offset = 0; |
|
|
|
|
uncompressed->len = decompressor.MaxRetrievable (); |
|
|
|
|
if (uncompressed->len > MAX_PACKET_SIZE) |
|
|
|
|
{ |
|
|
|
|
LogPrint ("Received packet size ", uncompressed->len, " exceeds max packet size"); |
|
|
|
|
uncompressed->len = MAX_PACKET_SIZE; |
|
|
|
|
} |
|
|
|
|
decompressor.Get (uncompressed->buf, uncompressed->len); |
|
|
|
|
// then forward to streaming engine thread
|
|
|
|
|
destinations.HandleNextPacket (destination, uncompressed); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
LogPrint ("Data: protocol ", buf[9], " is not supported"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len) |
|
|
|
|
{ |
|
|
|
|