From 38ee813e419371452c130e383b35d0d2cdbf14f0 Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 1 Dec 2014 18:29:46 -0500 Subject: [PATCH] wait for all messages acked before termination of a stream --- Streaming.cpp | 37 +++++++++++++++++-------------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index ed41e70c..7a19123c 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -52,8 +52,7 @@ namespace stream for (auto it: m_SavedPackets) delete it; m_SavedPackets.clear (); - - Close (); + LogPrint (eLogDebug, "Stream deleted"); } void Stream::HandleNextPacket (Packet * packet) @@ -70,12 +69,12 @@ namespace stream if (!receivedSeqn && !isSyn) { // plain ack - LogPrint ("Plain ACK received"); + LogPrint (eLogDebug, "Plain ACK received"); delete packet; return; } - LogPrint ("Received seqn=", receivedSeqn); + LogPrint (eLogDebug, "Received seqn=", receivedSeqn); if (isSyn || receivedSeqn == m_LastReceivedSequenceNumber + 1) { // we have received next in sequence message @@ -115,7 +114,7 @@ namespace stream if (receivedSeqn <= m_LastReceivedSequenceNumber) { // we have received duplicate. Most likely our outbound tunnel is dead - LogPrint ("Duplicate message ", receivedSeqn, " received"); + LogPrint (eLogWarning, "Duplicate message ", receivedSeqn, " received"); m_CurrentOutboundTunnel = nullptr; // pick another outbound tunnel UpdateCurrentRemoteLease (); // pick another lease SendQuickAck (); // resend ack for previous message again @@ -123,7 +122,7 @@ namespace stream } else { - LogPrint ("Missing messages from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); + LogPrint (eLogWarning, "Missing messages from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); // save message and wait for missing message again SavePacket (packet); // send NACKs for missing messages ASAP @@ -147,11 +146,11 @@ namespace stream // process flags uint32_t receivedSeqn = packet->GetSeqn (); uint16_t flags = packet->GetFlags (); - LogPrint ("Process seqn=", receivedSeqn, ", flags=", flags); + LogPrint (eLogDebug, "Process seqn=", receivedSeqn, ", flags=", flags); const uint8_t * optionData = packet->GetOptionData (); if (flags & PACKET_FLAG_SYNCHRONIZE) - LogPrint ("Synchronize"); + LogPrint (eLogDebug, "Synchronize"); if (flags & PACKET_FLAG_DELAY_REQUESTED) { @@ -161,28 +160,28 @@ namespace stream if (flags & PACKET_FLAG_FROM_INCLUDED) { optionData += m_RemoteIdentity.FromBuffer (optionData, packet->GetOptionSize ()); - LogPrint ("From identity ", m_RemoteIdentity.GetIdentHash ().ToBase64 ()); + LogPrint (eLogInfo, "From identity ", m_RemoteIdentity.GetIdentHash ().ToBase64 ()); if (!m_RemoteLeaseSet) - LogPrint ("Incoming stream from ", m_RemoteIdentity.GetIdentHash ().ToBase64 ()); + LogPrint (eLogDebug, "Incoming stream from ", m_RemoteIdentity.GetIdentHash ().ToBase64 ()); } if (flags & PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED) { uint16_t maxPacketSize = be16toh (*(uint16_t *)optionData); - LogPrint ("Max packet size ", maxPacketSize); + LogPrint (eLogDebug, "Max packet size ", maxPacketSize); optionData += 2; } if (flags & PACKET_FLAG_SIGNATURE_INCLUDED) { - LogPrint ("Signature"); + LogPrint (eLogDebug, "Signature"); uint8_t signature[256]; auto signatureLen = m_RemoteIdentity.GetSignatureLen (); memcpy (signature, optionData, signatureLen); memset (const_cast(optionData), 0, signatureLen); if (!m_RemoteIdentity.Verify (packet->GetBuffer (), packet->GetLength (), signature)) { - LogPrint ("Signature verification failed"); + LogPrint (eLogError, "Signature verification failed"); Close (); flags |= PACKET_FLAG_CLOSE; } @@ -203,13 +202,11 @@ namespace stream if (flags & PACKET_FLAG_CLOSE) { - LogPrint ("Closed"); - SendQuickAck (); // send ack for close explicitly? + LogPrint (eLogInfo, "Closed"); + Close (); m_IsOpen = false; m_IsReset = true; m_ReceiveTimer.cancel (); - m_ResendTimer.cancel (); - m_AckSendTimer.cancel (); } } @@ -233,13 +230,13 @@ namespace stream } if (nacked) { - LogPrint ("Packet ", seqn, " NACK"); + LogPrint (eLogDebug, "Packet ", seqn, " NACK"); it++; continue; } } auto sentPacket = *it; - LogPrint ("Packet ", seqn, " acknowledged"); + LogPrint (eLogDebug, "Packet ", seqn, " acknowledged"); m_SentPackets.erase (it++); delete sentPacket; } @@ -418,7 +415,7 @@ namespace stream m_LocalDestination.GetOwner ().Sign (packet, size, signature); p->len = size; - SendPacket (p); + m_Service.post (std::bind (&Stream::SendPacket, shared_from_this (), p)); LogPrint ("FIN sent"); } }