Browse Source

check received sequence number for gaps and duplicates

pull/11/merge
orignal 11 years ago
parent
commit
c762e41b05
  1. 2
      HTTPServer.cpp
  2. 40
      Streaming.cpp

2
HTTPServer.cpp

@ -183,6 +183,8 @@ namespace util
std::stringstream ss; std::stringstream ss;
uint8_t buf[8192]; uint8_t buf[8192];
size_t r = s->Receive (buf, 8192, 30); // 30 seconds size_t r = s->Receive (buf, 8192, 30); // 30 seconds
if (!r && s->IsEstablished ()) // nothing received but connection is established
r = s->Receive (buf, 8192, 30); // wait for another 30 secondd
if (r) // we recieved data if (r) // we recieved data
{ {
ss << std::string ((char *)buf, r); ss << std::string ((char *)buf, r);

40
Streaming.cpp

@ -30,12 +30,12 @@ namespace stream
void Stream::HandleNextPacket (Packet * packet) void Stream::HandleNextPacket (Packet * packet)
{ {
const uint8_t * end = packet->buf + packet->len, * buf = packet->buf; const uint8_t * buf = packet->buf;
buf += 4; // sendStreamID buf += 4; // sendStreamID
if (!m_SendStreamID) if (!m_SendStreamID)
m_SendStreamID = be32toh (*(uint32_t *)buf); m_SendStreamID = be32toh (*(uint32_t *)buf);
buf += 4; // receiveStreamID buf += 4; // receiveStreamID
m_LastReceivedSequenceNumber = be32toh (*(uint32_t *)buf); uint32_t receivedSeqn = be32toh (*(uint32_t *)buf);
buf += 4; // sequenceNum buf += 4; // sequenceNum
buf += 4; // ackThrough buf += 4; // ackThrough
int nackCount = buf[0]; int nackCount = buf[0];
@ -68,17 +68,37 @@ namespace stream
} }
// we have reached payload section // we have reached payload section
LogPrint ("seqn=",m_LastReceivedSequenceNumber,", flags=", flags); LogPrint ("seqn=", receivedSeqn, ", flags=", flags);
std::string str((const char *)buf, end-buf); if (!receivedSeqn || receivedSeqn == m_LastReceivedSequenceNumber + 1)
LogPrint ("Payload: ", str); {
// we have received next message
packet->offset = buf - packet->buf;
if (packet->GetLength () > 0)
m_ReceiveQueue.Put (packet);
else
delete packet;
packet->offset = buf - packet->buf; m_LastReceivedSequenceNumber = receivedSeqn;
if (packet->GetLength () > 0) SendQuickAck ();
m_ReceiveQueue.Put (packet); }
else else
delete packet; {
if (receivedSeqn <= m_LastReceivedSequenceNumber)
{
// we have received duplicate. Most likely our outbound tunnel is dead
LogPrint ("Duplicate message ", receivedSeqn, " received");
m_OutboundTunnel = i2p::tunnel::tunnels.GetNextOutboundTunnel (); // pick another tunnel
if (m_OutboundTunnel)
SendQuickAck (); // resend ack for previous message again
}
else
{
LogPrint ("Missing messages from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1);
// actually do nothing. just wait for missing message again
}
delete packet; // packet dropped
}
SendQuickAck ();
if (flags & PACKET_FLAG_CLOSE) if (flags & PACKET_FLAG_CLOSE)
{ {
LogPrint ("Closed"); LogPrint ("Closed");

Loading…
Cancel
Save