diff --git a/Streaming.cpp b/Streaming.cpp index 07f1a72a..c1bc5802 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -19,7 +19,7 @@ namespace stream { Stream::Stream (boost::asio::io_service& service, StreamingDestination * local, const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0), - m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), m_IsOpen (false), + m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false), m_IsOutgoing(true), m_LeaseSetUpdated (true), m_LocalDestination (local), m_RemoteLeaseSet (&remote), m_ReceiveTimer (m_Service) { @@ -28,7 +28,7 @@ namespace stream } Stream::Stream (boost::asio::io_service& service, StreamingDestination * local): - m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), + m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (true), m_IsOutgoing(true), m_LeaseSetUpdated (true), m_LocalDestination (local), m_RemoteLeaseSet (nullptr), m_ReceiveTimer (m_Service) { @@ -53,7 +53,7 @@ namespace stream if (!m_SendStreamID) m_SendStreamID = packet->GetReceiveStreamID (); - uint32_t receivedSeqn = packet->GetSeqn (); + int32_t receivedSeqn = packet->GetSeqn (); if (!receivedSeqn && !packet->IsSYN ()) { // plain ack @@ -63,7 +63,7 @@ namespace stream } LogPrint ("Received seqn=", receivedSeqn); - if (!receivedSeqn || receivedSeqn == m_LastReceivedSequenceNumber + 1) + if (receivedSeqn == m_LastReceivedSequenceNumber + 1) { // we have received next in sequence message ProcessPacket (packet); @@ -71,7 +71,7 @@ namespace stream // we should also try stored messages if any for (auto it = m_SavedPackets.begin (); it != m_SavedPackets.end ();) { - if ((*it)->GetSeqn () == m_LastReceivedSequenceNumber + 1) + if ((*it)->GetSeqn () == (uint32_t)(m_LastReceivedSequenceNumber + 1)) { Packet * savedPacket = *it; m_SavedPackets.erase (it++); @@ -123,26 +123,24 @@ namespace stream LogPrint ("Synchronize"); } + if (flags & PACKET_FLAG_DELAY_REQUESTED) + { + optionData += 2; + } + if (flags & PACKET_FLAG_FROM_INCLUDED) { - LogPrint ("From identity"); - optionData += m_RemoteIdentity.FromBuffer (optionData, packet->GetOptionSize ()); - if (m_RemoteLeaseSet) - { - if (m_RemoteIdentity.Hash () != m_RemoteLeaseSet->GetIdentHash ()) // check recieved identity - { - LogPrint ("Unexpected identity ", m_RemoteIdentity.Hash ().ToBase64 (), " ", m_RemoteLeaseSet->GetIdentHash ().ToBase64 (), " expected"); - m_RemoteLeaseSet = nullptr; - } - } - else + optionData += m_RemoteIdentity.FromBuffer (optionData, i2p::data::DEFAULT_IDENTITY_SIZE); + LogPrint ("From identity ", m_RemoteIdentity.Hash ().ToBase64 ()); + if (!m_RemoteLeaseSet) LogPrint ("Incoming stream from ", m_RemoteIdentity.Hash ().ToBase64 ()); - optionData += sizeof (i2p::data::Identity); + optionData += i2p::data::DEFAULT_IDENTITY_SIZE; } if (flags & PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED) { - LogPrint ("Max packet size"); + uint16_t maxPacketSize = htobe16 (*(uint16_t *)optionData); + LogPrint ("Max packet size ", maxPacketSize); optionData += 2; } @@ -633,7 +631,7 @@ namespace stream uncompressed->len = decompressor.MaxRetrievable (); if (uncompressed->len > MAX_PACKET_SIZE) { - LogPrint ("Recieved packet size exceeds mac packet size"); + LogPrint ("Received packet size ", uncompressed->len, " exceeds max packet size"); uncompressed->len = MAX_PACKET_SIZE; } decompressor.Get (uncompressed->buf, uncompressed->len); diff --git a/Streaming.h b/Streaming.h index c76f508a..e7b44e43 100644 --- a/Streaming.h +++ b/Streaming.h @@ -34,7 +34,7 @@ namespace stream const uint16_t PACKET_FLAG_NO_ACK = 0x0400; const size_t STREAMING_MTU = 1730; - const size_t MAX_PACKET_SIZE = 1754; + const size_t MAX_PACKET_SIZE = 4096; struct Packet { @@ -111,7 +111,8 @@ namespace stream private: boost::asio::io_service& m_Service; - uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber, m_LastReceivedSequenceNumber; + uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber; + int32_t m_LastReceivedSequenceNumber; bool m_IsOpen, m_IsOutgoing, m_LeaseSetUpdated; StreamingDestination * m_LocalDestination; i2p::data::Identity m_RemoteIdentity;