From 36de881041b66017d5da4d89b3eb67403d13943b Mon Sep 17 00:00:00 2001 From: orignal Date: Fri, 6 Mar 2015 21:39:05 -0500 Subject: [PATCH] send close after buffer --- Streaming.cpp | 66 +++++++++++++++++++++++++++++---------------------- Streaming.h | 1 + 2 files changed, 38 insertions(+), 29 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index d3c3e225..520800f6 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -359,6 +359,8 @@ namespace stream m_SentPackets.insert (it); } SendPackets (packets); + if (!m_IsOpen && m_SendBuffer.eof ()) + SendClose (); if (isEmpty) ScheduleResend (); } @@ -438,37 +440,12 @@ namespace stream if (m_IsOpen) { m_IsOpen = false; - Packet * p = new Packet (); - uint8_t * packet = p->GetBuffer (); - size_t size = 0; - htobe32buf (packet + size, m_SendStreamID); - size += 4; // sendStreamID - htobe32buf (packet + size, m_RecvStreamID); - size += 4; // receiveStreamID - htobe32buf (packet + size, m_SequenceNumber++); - size += 4; // sequenceNum - htobe32buf (packet + size, m_LastReceivedSequenceNumber); - size += 4; // ack Through - packet[size] = 0; - size++; // NACK count - size++; // resend delay - htobe16buf (packet + size, PACKET_FLAG_CLOSE | PACKET_FLAG_SIGNATURE_INCLUDED); - size += 2; // flags - size_t signatureLen = m_LocalDestination.GetOwner ().GetIdentity ().GetSignatureLen (); - htobe16buf (packet + size, signatureLen); // signature only - size += 2; // options size - uint8_t * signature = packet + size; - memset (packet + size, 0, signatureLen); - size += signatureLen; // signature - m_LocalDestination.GetOwner ().Sign (packet, size, signature); - - p->len = size; - m_Service.post (std::bind (&Stream::SendPacket, shared_from_this (), p)); - LogPrint ("FIN sent"); + if (m_SendBuffer.eof ()) // nothing to send + SendClose (); } - if (m_IsReset || (m_SentPackets.empty () && m_SendBuffer.eof ())) + if (m_IsReset || m_SentPackets.empty ()) { - // no more outgoing data or closed by peer + // closed by peer or everything has been acknowledged m_ReceiveTimer.cancel (); m_LocalDestination.DeleteStream (shared_from_this ()); } @@ -477,6 +454,37 @@ namespace stream } + void Stream::SendClose () + { + Packet * p = new Packet (); + uint8_t * packet = p->GetBuffer (); + size_t size = 0; + htobe32buf (packet + size, m_SendStreamID); + size += 4; // sendStreamID + htobe32buf (packet + size, m_RecvStreamID); + size += 4; // receiveStreamID + htobe32buf (packet + size, m_SequenceNumber++); + size += 4; // sequenceNum + htobe32buf (packet + size, m_LastReceivedSequenceNumber); + size += 4; // ack Through + packet[size] = 0; + size++; // NACK count + size++; // resend delay + htobe16buf (packet + size, PACKET_FLAG_CLOSE | PACKET_FLAG_SIGNATURE_INCLUDED); + size += 2; // flags + size_t signatureLen = m_LocalDestination.GetOwner ().GetIdentity ().GetSignatureLen (); + htobe16buf (packet + size, signatureLen); // signature only + size += 2; // options size + uint8_t * signature = packet + size; + memset (packet + size, 0, signatureLen); + size += signatureLen; // signature + m_LocalDestination.GetOwner ().Sign (packet, size, signature); + + p->len = size; + m_Service.post (std::bind (&Stream::SendPacket, shared_from_this (), p)); + LogPrint ("FIN sent"); + } + size_t Stream::ConcatenatePackets (uint8_t * buf, size_t len) { size_t pos = 0; diff --git a/Streaming.h b/Streaming.h index d48a9389..73db7692 100644 --- a/Streaming.h +++ b/Streaming.h @@ -124,6 +124,7 @@ namespace stream void SendBuffer (); void SendQuickAck (); + void SendClose (); bool SendPacket (Packet * packet); void SendPackets (const std::vector& packets);