From f75de6af82d5fc525d797518f9cf30342a8d7a16 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 25 Jan 2015 17:43:34 -0500 Subject: [PATCH] window --- Streaming.cpp | 108 +++++--------------------------------------------- Streaming.h | 2 +- 2 files changed, 12 insertions(+), 98 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index 92f30ac2..f6b50176 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -211,6 +211,7 @@ namespace stream void Stream::ProcessAck (Packet * packet) { + bool acknowledged = false; uint32_t ackThrough = packet->GetAckThrough (); int nackCount = packet->GetNACKCount (); for (auto it = m_SentPackets.begin (); it != m_SentPackets.end ();) @@ -238,12 +239,15 @@ namespace stream LogPrint (eLogDebug, "Packet ", seqn, " acknowledged"); m_SentPackets.erase (it++); delete sentPacket; + acknowledged = true; } else break; } if (m_SentPackets.empty ()) m_ResendTimer.cancel (); + if (acknowledged) + SendBuffer (); } size_t Stream::Send (const uint8_t * buf, size_t len) @@ -255,86 +259,19 @@ namespace stream m_SendBuffer.write ((const char *)buf, len); } m_Service.post (std::bind (&Stream::SendBuffer, shared_from_this ())); - /*bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet - std::vector packets; - while (!m_IsOpen || len > 0) - { - Packet * p = new Packet (); - uint8_t * packet = p->GetBuffer (); - // TODO: implement setters - size_t size = 0; - htobe32buf (packet + size, m_SendStreamID); - size += 4; // sendStreamID - htobe32buf (packet + size, m_RecvStreamID); - size += 4; // receiveStreamID - htobe32buf (packet + size, m_SequenceNumber++); - size += 4; // sequenceNum - if (isNoAck) - htobe32buf (packet + size, m_LastReceivedSequenceNumber); - else - htobuf32 (packet + size, 0); - size += 4; // ack Through - packet[size] = 0; - size++; // NACK count - packet[size] = RESEND_TIMEOUT; - size++; // resend delay - if (!m_IsOpen) - { - // initial packet - m_IsOpen = true; m_IsReset = false; - uint16_t flags = PACKET_FLAG_SYNCHRONIZE | PACKET_FLAG_FROM_INCLUDED | - PACKET_FLAG_SIGNATURE_INCLUDED | PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED; - if (isNoAck) flags |= PACKET_FLAG_NO_ACK; - htobe16buf (packet + size, flags); - size += 2; // flags - size_t identityLen = m_LocalDestination.GetOwner ().GetIdentity ().GetFullLen (); - size_t signatureLen = m_LocalDestination.GetOwner ().GetIdentity ().GetSignatureLen (); - htobe16buf (packet + size, identityLen + signatureLen + 2); // identity + signature + packet size - size += 2; // options size - m_LocalDestination.GetOwner ().GetIdentity ().ToBuffer (packet + size, identityLen); - size += identityLen; // from - htobe16buf (packet + size, STREAMING_MTU); - size += 2; // max packet size - uint8_t * signature = packet + size; // set it later - memset (signature, 0, signatureLen); // zeroes for now - size += signatureLen; // signature - size_t sentLen = STREAMING_MTU - size; - if (len < sentLen) sentLen = len; - memcpy (packet + size, buf, sentLen); - buf += sentLen; - len -= sentLen; - size += sentLen; // payload - m_LocalDestination.GetOwner ().Sign (packet, size, signature); - } - else - { - // follow on packet - htobuf16 (packet + size, 0); - size += 2; // flags - htobuf16 (packet + size, 0); // no options - size += 2; // options size - size_t sentLen = STREAMING_MTU - size; - if (len < sentLen) sentLen = len; - memcpy (packet + size, buf, sentLen); - buf += sentLen; - len -= sentLen; - size += sentLen; // payload - } - p->len = size; - packets.push_back (p); - } - if (packets.size () > 0) - m_Service.post (std::bind (&Stream::PostPackets, shared_from_this (), packets));*/ return len; } void Stream::SendBuffer () - { + { + int numMsgs = WINDOW_SIZE - m_SentPackets.size (); + if (numMsgs <= 0) return; // window is full + bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet std::vector packets; { std::unique_lock l(m_SendBufferMutex); - while (!m_IsOpen || !m_SendBuffer.eof ()) + while (!m_IsOpen || (IsEstablished () && !m_SendBuffer.eof () && numMsgs > 0)) { Packet * p = new Packet (); uint8_t * packet = p->GetBuffer (); @@ -391,6 +328,7 @@ namespace stream } p->len = size; packets.push_back (p); + numMsgs--; } } if (packets.size () > 0) @@ -549,30 +487,6 @@ namespace stream else return false; } - - void Stream::PostPackets (const std::vector packets) - { - if (m_IsOpen) - { - if (packets.size () > 0) - { - m_IsAckSendScheduled = false; - m_AckSendTimer.cancel (); - } - bool isEmpty = m_SentPackets.empty (); - for (auto it: packets) - m_SentPackets.insert (it); - SendPackets (packets); - if (isEmpty) - ScheduleResend (); - } - else - { - // delete - for (auto it: packets) - delete it; - } - } void Stream::SendPackets (const std::vector& packets) { @@ -635,7 +549,7 @@ namespace stream packets.push_back (it); else { - LogPrint (eLogWarning, "Packet ", it->GetSeqn (), "was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts. Terminate"); + LogPrint (eLogWarning, "Packet ", it->GetSeqn (), " was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts. Terminate"); m_IsOpen = false; m_IsReset = true; m_ReceiveTimer.cancel (); diff --git a/Streaming.h b/Streaming.h index ab94febf..83514966 100644 --- a/Streaming.h +++ b/Streaming.h @@ -44,6 +44,7 @@ namespace stream const int RESEND_TIMEOUT = 10; // in seconds const int ACK_SEND_TIMEOUT = 200; // in milliseconds const int MAX_NUM_RESEND_ATTEMPTS = 5; + const int WINDOW_SIZE = 6; // in messages struct Packet { @@ -117,7 +118,6 @@ namespace stream void SendBuffer (); void SendQuickAck (); bool SendPacket (Packet * packet); - void PostPackets (const std::vector packets); void SendPackets (const std::vector& packets); void SavePacket (Packet * packet);