diff --git a/Streaming.cpp b/Streaming.cpp index d365e57c..a537def8 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -15,8 +15,9 @@ namespace i2p namespace stream { Stream::Stream (StreamingDestination * local, const i2p::data::LeaseSet& remote): - m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), m_IsOpen (false), - m_LocalDestination (local), m_RemoteLeaseSet (remote), m_OutboundTunnel (nullptr) + m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), + m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local), + m_RemoteLeaseSet (remote), m_OutboundTunnel (nullptr) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); } @@ -167,19 +168,8 @@ namespace stream memcpy (packet + size, buf, len); size += len; // payload m_LocalDestination->Sign (packet, size, signature); - I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet, - CreateDataMessage (this, packet, size), m_LocalDestination->GetLeaseSet ()); - - if (!m_OutboundTunnel) - m_OutboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel (); - auto leases = m_RemoteLeaseSet.GetNonExpiredLeases (); - if (m_OutboundTunnel && !leases.empty ()) - { - auto& lease = *leases.begin (); // TODO: - m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); - } - else - DeleteI2NPMessage (msg); + + SendPacket (packet, size); } void Stream::SendQuickAck () @@ -202,25 +192,8 @@ namespace stream *(uint16_t *)(packet + size) = 0; // no options size += 2; // options size - I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet, - CreateDataMessage (this, packet, size)); - if (m_OutboundTunnel) - { - auto leases = m_RemoteLeaseSet.GetNonExpiredLeases (); - if (!leases.empty ()) - { - auto& lease = *leases.begin (); // TODO: - m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); - LogPrint ("Quick Ack sent"); - } - else - { - LogPrint ("All leases are expired"); - DeleteI2NPMessage (msg); - } - } - else - DeleteI2NPMessage (msg); + if (SendPacket (packet, size)) + LogPrint ("Quick Ack sent"); } void Stream::Close () @@ -250,17 +223,8 @@ namespace stream size += 40; // signature m_LocalDestination->Sign (packet, size, signature); - I2NPMessage * msg = i2p::garlic::routing.WrapSingleMessage (m_RemoteLeaseSet, - CreateDataMessage (this, packet, size)); - auto leases = m_RemoteLeaseSet.GetNonExpiredLeases (); - if (m_OutboundTunnel && !leases.empty ()) - { - auto& lease = *leases.begin (); // TODO: - m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); - LogPrint ("FIN sent"); - } - else - DeleteI2NPMessage (msg); + if (SendPacket (packet, size)) + LogPrint ("FIN sent"); m_ReceiveQueue.WakeUp (); } } @@ -297,6 +261,41 @@ namespace stream } return pos; } + + bool Stream::SendPacket (uint8_t * packet, size_t size) + { + I2NPMessage * leaseSet = nullptr; + if (m_LeaseSetUpdated) + { + leaseSet = m_LocalDestination->GetLeaseSet (); + m_LeaseSetUpdated = false; + } + I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet, + CreateDataMessage (this, packet, size), leaseSet); + if (!m_OutboundTunnel) + m_OutboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel (); + if (m_OutboundTunnel) + { + auto leases = m_RemoteLeaseSet.GetNonExpiredLeases (); + if (!leases.empty ()) + { + auto& lease = *leases.begin (); // TODO: + m_OutboundTunnel->SendTunnelDataMsg (lease.tunnelGateway, lease.tunnelID, msg); + return true; + } + else + { + LogPrint ("All leases are expired"); + DeleteI2NPMessage (msg); + } + } + else + { + LogPrint ("No outbound tunnels in the pool"); + DeleteI2NPMessage (msg); + } + return false; + } StreamingDestination * sharedLocalDestination = nullptr; @@ -356,6 +355,8 @@ namespace stream m_LeaseSet = newLeaseSet; if (oldLeaseSet) DeleteI2NPMessage (oldLeaseSet); + for (auto it: m_Streams) + it.second->SetLeaseSetUpdated (); } I2NPMessage * StreamingDestination::GetLeaseSet () @@ -450,7 +451,7 @@ namespace stream uncompressed->len = decompressor.MaxRetrievable (); if (uncompressed->len > MAX_PACKET_SIZE) { - LogPrint ("Recieved packet size exceeds mac packer size"); + LogPrint ("Recieved packet size exceeds mac packet size"); uncompressed->len = MAX_PACKET_SIZE; } decompressor.Get (uncompressed->buf, uncompressed->len); diff --git a/Streaming.h b/Streaming.h index b0f7fda9..973c2a55 100644 --- a/Streaming.h +++ b/Streaming.h @@ -78,11 +78,14 @@ namespace stream size_t Send (uint8_t * buf, size_t len, int timeout); // timeout in seconds size_t Receive (uint8_t * buf, size_t len, int timeout = 0); // returns 0 if timeout expired void Close (); + + void SetLeaseSetUpdated () { m_LeaseSetUpdated = true; }; private: void ConnectAndSend (uint8_t * buf, size_t len); void SendQuickAck (); + bool SendPacket (uint8_t * packet, size_t size); void SavePacket (Packet * packet); void ProcessPacket (Packet * packet); @@ -90,7 +93,7 @@ namespace stream private: uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber, m_LastReceivedSequenceNumber; - bool m_IsOpen; + bool m_IsOpen, m_LeaseSetUpdated; StreamingDestination * m_LocalDestination; const i2p::data::LeaseSet& m_RemoteLeaseSet; i2p::util::Queue m_ReceiveQueue;