diff --git a/Streaming.cpp b/Streaming.cpp index 1c53c4d3..e9d5c3b6 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -126,6 +126,13 @@ namespace stream LogPrint ("Missing messages from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); // save message and wait for missing message again SavePacket (packet); + // send NACKs for missing messages ASAP + if (m_IsAckSendScheduled) + { + m_IsAckSendScheduled = false; + m_AckSendTimer.cancel (); + } + SendQuickAck (); } } } @@ -320,6 +327,18 @@ namespace stream void Stream::SendQuickAck () { + int32_t lastReceivedSeqn = m_LastReceivedSequenceNumber; + if (!m_SavedPackets.empty ()) + { + int32_t seqn = (*m_SavedPackets.rbegin ())->GetSeqn (); + if (seqn > lastReceivedSeqn) lastReceivedSeqn = seqn; + } + if (lastReceivedSeqn < 0) + { + LogPrint (eLogError, "No packets have been received yet"); + return; + } + Packet p; uint8_t * packet = p.GetBuffer (); size_t size = 0; @@ -329,10 +348,35 @@ namespace stream size += 4; // receiveStreamID *(uint32_t *)(packet + size) = 0; // this is plain Ack message size += 4; // sequenceNum - *(uint32_t *)(packet + size) = htobe32 (m_LastReceivedSequenceNumber); + *(uint32_t *)(packet + size) = htobe32 (lastReceivedSeqn); size += 4; // ack Through - packet[size] = 0; - size++; // NACK count + if (lastReceivedSeqn > m_LastReceivedSequenceNumber) + { + // fill NACKs + uint8_t * nacks = packet + size + 1; + uint8_t numNacks = 0; + auto nextSeqn = m_LastReceivedSequenceNumber + 1; + for (auto it: m_SavedPackets) + { + auto seqn = it->GetSeqn (); + for (uint32_t i = nextSeqn; i < seqn; i++) + { + *(uint32_t *)nacks = htobe32 (i); + nacks += 4; + numNacks++; + } + nextSeqn = seqn + 1; + } + packet[size] = numNacks; + size++; // NACK count + packet += numNacks*4; // NACKs + } + else + { + // No NACKs + packet[size] = 0; + size++; // NACK count + } size++; // resend delay *(uint16_t *)(packet + size) = 0; // nof flags set size += 2; // flags