From c762e41b05382f7e85406c4ee3e08e511388d440 Mon Sep 17 00:00:00 2001 From: orignal Date: Sun, 19 Jan 2014 12:01:12 -0500 Subject: [PATCH] check received sequence number for gaps and duplicates --- HTTPServer.cpp | 2 ++ Streaming.cpp | 46 +++++++++++++++++++++++++++++++++------------- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 99bd90f4..a4d3f1e5 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -183,6 +183,8 @@ namespace util std::stringstream ss; uint8_t buf[8192]; size_t r = s->Receive (buf, 8192, 30); // 30 seconds + if (!r && s->IsEstablished ()) // nothing received but connection is established + r = s->Receive (buf, 8192, 30); // wait for another 30 secondd if (r) // we recieved data { ss << std::string ((char *)buf, r); diff --git a/Streaming.cpp b/Streaming.cpp index 30feb6f6..99d51618 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -30,12 +30,12 @@ namespace stream void Stream::HandleNextPacket (Packet * packet) { - const uint8_t * end = packet->buf + packet->len, * buf = packet->buf; + const uint8_t * buf = packet->buf; buf += 4; // sendStreamID if (!m_SendStreamID) m_SendStreamID = be32toh (*(uint32_t *)buf); buf += 4; // receiveStreamID - m_LastReceivedSequenceNumber = be32toh (*(uint32_t *)buf); + uint32_t receivedSeqn = be32toh (*(uint32_t *)buf); buf += 4; // sequenceNum buf += 4; // ackThrough int nackCount = buf[0]; @@ -68,17 +68,37 @@ namespace stream } // we have reached payload section - LogPrint ("seqn=",m_LastReceivedSequenceNumber,", flags=", flags); - std::string str((const char *)buf, end-buf); - LogPrint ("Payload: ", str); - - packet->offset = buf - packet->buf; - if (packet->GetLength () > 0) - m_ReceiveQueue.Put (packet); - else - delete packet; - - SendQuickAck (); + LogPrint ("seqn=", receivedSeqn, ", flags=", flags); + if (!receivedSeqn || receivedSeqn == m_LastReceivedSequenceNumber + 1) + { + // we have received next message + packet->offset = buf - packet->buf; + if (packet->GetLength () > 0) + m_ReceiveQueue.Put (packet); + else + delete packet; + + m_LastReceivedSequenceNumber = receivedSeqn; + SendQuickAck (); + } + else + { + if (receivedSeqn <= m_LastReceivedSequenceNumber) + { + // we have received duplicate. Most likely our outbound tunnel is dead + LogPrint ("Duplicate message ", receivedSeqn, " received"); + m_OutboundTunnel = i2p::tunnel::tunnels.GetNextOutboundTunnel (); // pick another tunnel + if (m_OutboundTunnel) + SendQuickAck (); // resend ack for previous message again + } + else + { + LogPrint ("Missing messages from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); + // actually do nothing. just wait for missing message again + } + delete packet; // packet dropped + } + if (flags & PACKET_FLAG_CLOSE) { LogPrint ("Closed");