diff --git a/Streaming.cpp b/Streaming.cpp index c1bc5802..01ca8cfc 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -29,7 +29,7 @@ namespace stream Stream::Stream (boost::asio::io_service& service, StreamingDestination * local): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), - m_IsOpen (true), m_IsOutgoing(true), m_LeaseSetUpdated (true), m_LocalDestination (local), + m_IsOpen (false), m_IsOutgoing(true), m_LeaseSetUpdated (true), m_LocalDestination (local), m_RemoteLeaseSet (nullptr), m_ReceiveTimer (m_Service) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); @@ -54,7 +54,8 @@ namespace stream m_SendStreamID = packet->GetReceiveStreamID (); int32_t receivedSeqn = packet->GetSeqn (); - if (!receivedSeqn && !packet->IsSYN ()) + bool isSyn = packet->IsSYN (); + if (!receivedSeqn && !isSyn) { // plain ack LogPrint ("Plain ACK received"); @@ -63,11 +64,11 @@ namespace stream } LogPrint ("Received seqn=", receivedSeqn); - if (receivedSeqn == m_LastReceivedSequenceNumber + 1) + if (isSyn || receivedSeqn == m_LastReceivedSequenceNumber + 1) { // we have received next in sequence message ProcessPacket (packet); - + // we should also try stored messages if any for (auto it = m_SavedPackets.begin (); it != m_SavedPackets.end ();) { @@ -84,7 +85,14 @@ namespace stream // send ack for last message if (m_IsOpen) - SendQuickAck (); + SendQuickAck (); + else if (isSyn) + { + // we have to send SYN back to incoming connection + m_IsOpen = true; + SendQuickAck (true); + } + } else { @@ -134,12 +142,11 @@ namespace stream LogPrint ("From identity ", m_RemoteIdentity.Hash ().ToBase64 ()); if (!m_RemoteLeaseSet) LogPrint ("Incoming stream from ", m_RemoteIdentity.Hash ().ToBase64 ()); - optionData += i2p::data::DEFAULT_IDENTITY_SIZE; } if (flags & PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED) { - uint16_t maxPacketSize = htobe16 (*(uint16_t *)optionData); + uint16_t maxPacketSize = be16toh (*(uint16_t *)optionData); LogPrint ("Max packet size ", maxPacketSize); optionData += 2; } @@ -166,6 +173,7 @@ namespace stream LogPrint ("Closed"); SendQuickAck (); // send ack for close explicitly? m_IsOpen = false; + m_ReceiveTimer.cancel (); } } @@ -194,10 +202,10 @@ namespace stream PACKET_FLAG_FROM_INCLUDED | PACKET_FLAG_SIGNATURE_INCLUDED | PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED | PACKET_FLAG_NO_ACK); size += 2; // flags - *(uint16_t *)(packet + size) = htobe16 (sizeof (i2p::data::Identity) + 40 + 2); // identity + signature + packet size + *(uint16_t *)(packet + size) = htobe16 (i2p::data::DEFAULT_IDENTITY_SIZE + 40 + 2); // identity + signature + packet size size += 2; // options size - memcpy (packet + size, &m_LocalDestination->GetIdentity (), sizeof (i2p::data::Identity)); - size += sizeof (i2p::data::Identity); // from + memcpy (packet + size, &m_LocalDestination->GetIdentity (), i2p::data::DEFAULT_IDENTITY_SIZE); + size += i2p::data::DEFAULT_IDENTITY_SIZE; // from *(uint16_t *)(packet + size) = htobe16 (STREAMING_MTU); size += 2; // max packet size uint8_t * signature = packet + size; // set it later @@ -224,7 +232,7 @@ namespace stream } - void Stream::SendQuickAck () + void Stream::SendQuickAck (bool syn) { uint8_t packet[MAX_PACKET_SIZE]; size_t size = 0; @@ -239,7 +247,7 @@ namespace stream packet[size] = 0; size++; // NACK count size++; // resend delay - *(uint16_t *)(packet + size) = 0; // nof flags set + *(uint16_t *)(packet + size) = syn ? htobe16 (PACKET_FLAG_SYNCHRONIZE) : 0; // nof flags set size += 2; // flags *(uint16_t *)(packet + size) = 0; // no options size += 2; // options size @@ -438,9 +446,9 @@ namespace stream else // new incoming stream { auto incomingStream = CreateNewIncomingStream (); + incomingStream->HandleNextPacket (packet); if (m_Acceptor != nullptr) m_Acceptor (incomingStream); - incomingStream->HandleNextPacket (packet); } } diff --git a/Streaming.h b/Streaming.h index e7b44e43..33c8dfeb 100644 --- a/Streaming.h +++ b/Streaming.h @@ -95,7 +95,7 @@ namespace stream private: - void SendQuickAck (); + void SendQuickAck (bool syn = false); bool SendPacket (Packet * packet); bool SendPacket (const uint8_t * buf, size_t len);