Browse Source

send SYN in streaming thread

pull/48/head
orignal 11 years ago
parent
commit
88214a0c58
  1. 36
      Streaming.cpp
  2. 7
      Streaming.h

36
Streaming.cpp

@ -141,7 +141,9 @@ namespace stream
void Stream::ConnectAndSend (uint8_t * buf, size_t len) void Stream::ConnectAndSend (uint8_t * buf, size_t len)
{ {
m_IsOpen = true; m_IsOpen = true;
uint8_t packet[STREAMING_MTU]; Packet * p = new Packet ();
uint8_t * packet = p->GetBuffer ();
// TODO: implement setters
size_t size = 0; size_t size = 0;
*(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID);
size += 4; // sendStreamID size += 4; // sendStreamID
@ -167,18 +169,18 @@ namespace stream
size += 2; // max packet size size += 2; // max packet size
uint8_t * signature = packet + size; // set it later uint8_t * signature = packet + size; // set it later
memset (signature, 0, 40); // zeroes for now memset (signature, 0, 40); // zeroes for now
size += 40; // signature size += 40; // signature
memcpy (packet + size, buf, len); memcpy (packet + size, buf, len);
size += len; // payload size += len; // payload
m_LocalDestination->Sign (packet, size, signature); m_LocalDestination->Sign (packet, size, signature);
p->len = size;
SendPacket (packet, size); m_Service.post (boost::bind (&Stream::SendPacket, this, p));
} }
void Stream::SendQuickAck () void Stream::SendQuickAck ()
{ {
uint8_t packet[STREAMING_MTU]; uint8_t packet[MAX_PACKET_SIZE];
size_t size = 0; size_t size = 0;
*(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID);
size += 4; // sendStreamID size += 4; // sendStreamID
@ -195,7 +197,7 @@ namespace stream
size += 2; // flags size += 2; // flags
*(uint16_t *)(packet + size) = 0; // no options *(uint16_t *)(packet + size) = 0; // no options
size += 2; // options size size += 2; // options size
if (SendPacket (packet, size)) if (SendPacket (packet, size))
LogPrint ("Quick Ack sent"); LogPrint ("Quick Ack sent");
} }
@ -205,7 +207,7 @@ namespace stream
if (m_IsOpen) if (m_IsOpen)
{ {
m_IsOpen = false; m_IsOpen = false;
uint8_t packet[STREAMING_MTU]; uint8_t packet[MAX_PACKET_SIZE];
size_t size = 0; size_t size = 0;
*(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID);
size += 4; // sendStreamID size += 4; // sendStreamID
@ -226,7 +228,7 @@ namespace stream
memset (packet + size, 0, 40); memset (packet + size, 0, 40);
size += 40; // signature size += 40; // signature
m_LocalDestination->Sign (packet, size, signature); m_LocalDestination->Sign (packet, size, signature);
if (SendPacket (packet, size)) if (SendPacket (packet, size))
LogPrint ("FIN sent"); LogPrint ("FIN sent");
m_ReceiveQueue.WakeUp (); m_ReceiveQueue.WakeUp ();
@ -266,7 +268,19 @@ namespace stream
return pos; return pos;
} }
bool Stream::SendPacket (uint8_t * packet, size_t size) bool Stream::SendPacket (Packet * packet)
{
if (packet)
{
bool ret = SendPacket (packet->GetBuffer (), packet->GetLength ());
delete packet;
return ret;
}
else
return false;
}
bool Stream::SendPacket (const uint8_t * buf, size_t len)
{ {
const I2NPMessage * leaseSet = nullptr; const I2NPMessage * leaseSet = nullptr;
if (m_LeaseSetUpdated) if (m_LeaseSetUpdated)
@ -275,7 +289,7 @@ namespace stream
m_LeaseSetUpdated = false; m_LeaseSetUpdated = false;
} }
I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet, I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet,
CreateDataMessage (this, packet, size), leaseSet); CreateDataMessage (this, buf, len), leaseSet);
if (!m_OutboundTunnel) if (!m_OutboundTunnel)
m_OutboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel (); m_OutboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel ();
if (m_OutboundTunnel) if (m_OutboundTunnel)
@ -540,7 +554,7 @@ namespace stream
LogPrint ("Data: protocol ", buf[9], " is not supported"); LogPrint ("Data: protocol ", buf[9], " is not supported");
} }
I2NPMessage * CreateDataMessage (Stream * s, uint8_t * payload, size_t len) I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len)
{ {
I2NPMessage * msg = NewI2NPMessage (); I2NPMessage * msg = NewI2NPMessage ();
CryptoPP::Gzip compressor; CryptoPP::Gzip compressor;

7
Streaming.h

@ -37,7 +37,7 @@ namespace stream
struct Packet struct Packet
{ {
uint8_t buf[1754]; uint8_t buf[MAX_PACKET_SIZE];
size_t len, offset; size_t len, offset;
Packet (): len (0), offset (0) {}; Packet (): len (0), offset (0) {};
@ -88,7 +88,8 @@ namespace stream
void ConnectAndSend (uint8_t * buf, size_t len); void ConnectAndSend (uint8_t * buf, size_t len);
void SendQuickAck (); void SendQuickAck ();
bool SendPacket (uint8_t * packet, size_t size); bool SendPacket (Packet * packet);
bool SendPacket (const uint8_t * buf, size_t len);
void SavePacket (Packet * packet); void SavePacket (Packet * packet);
void ProcessPacket (Packet * packet); void ProcessPacket (Packet * packet);
@ -183,7 +184,7 @@ namespace stream
// assuming data is I2CP message // assuming data is I2CP message
void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len); void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len);
I2NPMessage * CreateDataMessage (Stream * s, uint8_t * payload, size_t len); I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len);
} }
} }

Loading…
Cancel
Save