diff --git a/Streaming.cpp b/Streaming.cpp index 40bc8929..597ac79d 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -141,7 +141,9 @@ namespace stream void Stream::ConnectAndSend (uint8_t * buf, size_t len) { m_IsOpen = true; - uint8_t packet[STREAMING_MTU]; + Packet * p = new Packet (); + uint8_t * packet = p->GetBuffer (); + // TODO: implement setters size_t size = 0; *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); size += 4; // sendStreamID @@ -167,18 +169,18 @@ namespace stream size += 2; // max packet size uint8_t * signature = packet + size; // set it later memset (signature, 0, 40); // zeroes for now - size += 40; // signature - + size += 40; // signature memcpy (packet + size, buf, len); size += len; // payload m_LocalDestination->Sign (packet, size, signature); + p->len = size; - SendPacket (packet, size); + m_Service.post (boost::bind (&Stream::SendPacket, this, p)); } void Stream::SendQuickAck () { - uint8_t packet[STREAMING_MTU]; + uint8_t packet[MAX_PACKET_SIZE]; size_t size = 0; *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); size += 4; // sendStreamID @@ -195,7 +197,7 @@ namespace stream size += 2; // flags *(uint16_t *)(packet + size) = 0; // no options size += 2; // options size - + if (SendPacket (packet, size)) LogPrint ("Quick Ack sent"); } @@ -205,7 +207,7 @@ namespace stream if (m_IsOpen) { m_IsOpen = false; - uint8_t packet[STREAMING_MTU]; + uint8_t packet[MAX_PACKET_SIZE]; size_t size = 0; *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); size += 4; // sendStreamID @@ -226,7 +228,7 @@ namespace stream memset (packet + size, 0, 40); size += 40; // signature m_LocalDestination->Sign (packet, size, signature); - + if (SendPacket (packet, size)) LogPrint ("FIN sent"); m_ReceiveQueue.WakeUp (); @@ -266,7 +268,19 @@ namespace stream return pos; } - bool Stream::SendPacket (uint8_t * packet, size_t size) + bool Stream::SendPacket (Packet * packet) + { + if (packet) + { + bool ret = SendPacket (packet->GetBuffer (), packet->GetLength ()); + delete packet; + return ret; + } + else + return false; + } + + bool Stream::SendPacket (const uint8_t * buf, size_t len) { const I2NPMessage * leaseSet = nullptr; if (m_LeaseSetUpdated) @@ -275,7 +289,7 @@ namespace stream m_LeaseSetUpdated = false; } I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet, - CreateDataMessage (this, packet, size), leaseSet); + CreateDataMessage (this, buf, len), leaseSet); if (!m_OutboundTunnel) m_OutboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel (); if (m_OutboundTunnel) @@ -540,7 +554,7 @@ namespace stream LogPrint ("Data: protocol ", buf[9], " is not supported"); } - I2NPMessage * CreateDataMessage (Stream * s, uint8_t * payload, size_t len) + I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len) { I2NPMessage * msg = NewI2NPMessage (); CryptoPP::Gzip compressor; diff --git a/Streaming.h b/Streaming.h index 484479b2..af7a01f2 100644 --- a/Streaming.h +++ b/Streaming.h @@ -37,7 +37,7 @@ namespace stream struct Packet { - uint8_t buf[1754]; + uint8_t buf[MAX_PACKET_SIZE]; size_t len, offset; Packet (): len (0), offset (0) {}; @@ -88,7 +88,8 @@ namespace stream void ConnectAndSend (uint8_t * buf, size_t len); void SendQuickAck (); - bool SendPacket (uint8_t * packet, size_t size); + bool SendPacket (Packet * packet); + bool SendPacket (const uint8_t * buf, size_t len); void SavePacket (Packet * packet); void ProcessPacket (Packet * packet); @@ -183,7 +184,7 @@ namespace stream // assuming data is I2CP message void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len); - I2NPMessage * CreateDataMessage (Stream * s, uint8_t * payload, size_t len); + I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len); } }