|
|
@ -87,7 +87,7 @@ namespace stream |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
LogPrint (eLogDebug, "Streaming: Received seqn=", receivedSeqn); |
|
|
|
LogPrint (eLogDebug, "Streaming: Received seqn=", receivedSeqn, " on sSID=", m_SendStreamID); |
|
|
|
if (receivedSeqn == m_LastReceivedSequenceNumber + 1) |
|
|
|
if (receivedSeqn == m_LastReceivedSequenceNumber + 1) |
|
|
|
{ |
|
|
|
{ |
|
|
|
// we have received next in sequence message
|
|
|
|
// we have received next in sequence message
|
|
|
@ -129,13 +129,13 @@ namespace stream |
|
|
|
if (receivedSeqn <= m_LastReceivedSequenceNumber) |
|
|
|
if (receivedSeqn <= m_LastReceivedSequenceNumber) |
|
|
|
{ |
|
|
|
{ |
|
|
|
// we have received duplicate
|
|
|
|
// we have received duplicate
|
|
|
|
LogPrint (eLogWarning, "Streaming: Duplicate message ", receivedSeqn, " received"); |
|
|
|
LogPrint (eLogWarning, "Streaming: Duplicate message ", receivedSeqn, " on sSID=", m_SendStreamID); |
|
|
|
SendQuickAck (); // resend ack for previous message again
|
|
|
|
SendQuickAck (); // resend ack for previous message again
|
|
|
|
delete packet; // packet dropped
|
|
|
|
delete packet; // packet dropped
|
|
|
|
} |
|
|
|
} |
|
|
|
else |
|
|
|
else |
|
|
|
{ |
|
|
|
{ |
|
|
|
LogPrint (eLogWarning, "Streaming: Missing messages from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); |
|
|
|
LogPrint (eLogWarning, "Streaming: Missing messages on sSID=", m_SendStreamID, ": from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); |
|
|
|
// save message and wait for missing message again
|
|
|
|
// save message and wait for missing message again
|
|
|
|
SavePacket (packet); |
|
|
|
SavePacket (packet); |
|
|
|
if (m_LastReceivedSequenceNumber >= 0) |
|
|
|
if (m_LastReceivedSequenceNumber >= 0) |
|
|
@ -183,7 +183,7 @@ namespace stream |
|
|
|
m_RemoteIdentity = std::make_shared<i2p::data::IdentityEx>(optionData, packet->GetOptionSize ()); |
|
|
|
m_RemoteIdentity = std::make_shared<i2p::data::IdentityEx>(optionData, packet->GetOptionSize ()); |
|
|
|
optionData += m_RemoteIdentity->GetFullLen (); |
|
|
|
optionData += m_RemoteIdentity->GetFullLen (); |
|
|
|
if (!m_RemoteLeaseSet) |
|
|
|
if (!m_RemoteLeaseSet) |
|
|
|
LogPrint (eLogDebug, "Streaming: Incoming stream from ", m_RemoteIdentity->GetIdentHash ().ToBase64 ()); |
|
|
|
LogPrint (eLogDebug, "Streaming: Incoming stream from ", m_RemoteIdentity->GetIdentHash ().ToBase64 (), ", sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (flags & PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED) |
|
|
|
if (flags & PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED) |
|
|
@ -263,7 +263,7 @@ namespace stream |
|
|
|
uint64_t rtt = ts - sentPacket->sendTime; |
|
|
|
uint64_t rtt = ts - sentPacket->sendTime; |
|
|
|
m_RTT = (m_RTT*seqn + rtt)/(seqn + 1); |
|
|
|
m_RTT = (m_RTT*seqn + rtt)/(seqn + 1); |
|
|
|
m_RTO = m_RTT*1.5; // TODO: implement it better
|
|
|
|
m_RTO = m_RTT*1.5; // TODO: implement it better
|
|
|
|
LogPrint (eLogDebug, "Packet ", seqn, " acknowledged rtt=", rtt); |
|
|
|
LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt); |
|
|
|
m_SentPackets.erase (it++); |
|
|
|
m_SentPackets.erase (it++); |
|
|
|
delete sentPacket; |
|
|
|
delete sentPacket; |
|
|
|
acknowledged = true; |
|
|
|
acknowledged = true; |
|
|
@ -451,7 +451,7 @@ namespace stream |
|
|
|
auto seqn = it->GetSeqn (); |
|
|
|
auto seqn = it->GetSeqn (); |
|
|
|
if (numNacks + (seqn - nextSeqn) >= 256) |
|
|
|
if (numNacks + (seqn - nextSeqn) >= 256) |
|
|
|
{ |
|
|
|
{ |
|
|
|
LogPrint (eLogError, "Number of NACKs exceeds 256. seqn=", seqn, " nextSeqn=", nextSeqn); |
|
|
|
LogPrint (eLogError, "Streaming: Number of NACKs exceeds 256. seqn=", seqn, " nextSeqn=", nextSeqn); |
|
|
|
htobe32buf (packet + 12, nextSeqn); // change ack Through
|
|
|
|
htobe32buf (packet + 12, nextSeqn); // change ack Through
|
|
|
|
break; |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
@ -492,7 +492,7 @@ namespace stream |
|
|
|
m_Status = eStreamStatusClosing; |
|
|
|
m_Status = eStreamStatusClosing; |
|
|
|
Close (); // recursion
|
|
|
|
Close (); // recursion
|
|
|
|
if (m_Status == eStreamStatusClosing) //still closing
|
|
|
|
if (m_Status == eStreamStatusClosing) //still closing
|
|
|
|
LogPrint (eLogInfo, "Streaming: Trying to send stream data before closing"); |
|
|
|
LogPrint (eLogDebug, "Streaming: Trying to send stream data before closing, sSID=", m_SendStreamID); |
|
|
|
break; |
|
|
|
break; |
|
|
|
case eStreamStatusReset: |
|
|
|
case eStreamStatusReset: |
|
|
|
SendClose (); |
|
|
|
SendClose (); |
|
|
@ -514,7 +514,7 @@ namespace stream |
|
|
|
m_LocalDestination.DeleteStream (shared_from_this ()); |
|
|
|
m_LocalDestination.DeleteStream (shared_from_this ()); |
|
|
|
break; |
|
|
|
break; |
|
|
|
default: |
|
|
|
default: |
|
|
|
LogPrint (eLogWarning, "Streaming: Unexpected stream status ", (int)m_Status); |
|
|
|
LogPrint (eLogWarning, "Streaming: Unexpected stream status ", (int)m_Status, "sSID=", m_SendStreamID); |
|
|
|
}; |
|
|
|
}; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -546,7 +546,7 @@ namespace stream |
|
|
|
|
|
|
|
|
|
|
|
p->len = size; |
|
|
|
p->len = size; |
|
|
|
m_Service.post (std::bind (&Stream::SendPacket, shared_from_this (), p)); |
|
|
|
m_Service.post (std::bind (&Stream::SendPacket, shared_from_this (), p)); |
|
|
|
LogPrint (eLogDebug, "Streaming: FIN sent"); |
|
|
|
LogPrint (eLogDebug, "Streaming: FIN sent, sSID=", m_SendStreamID); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
size_t Stream::ConcatenatePackets (uint8_t * buf, size_t len) |
|
|
|
size_t Stream::ConcatenatePackets (uint8_t * buf, size_t len) |
|
|
@ -600,7 +600,7 @@ namespace stream |
|
|
|
UpdateCurrentRemoteLease (); |
|
|
|
UpdateCurrentRemoteLease (); |
|
|
|
if (!m_RemoteLeaseSet) |
|
|
|
if (!m_RemoteLeaseSet) |
|
|
|
{ |
|
|
|
{ |
|
|
|
LogPrint (eLogError, "Streaming: Can't send packets, missing remote LeaseSet"); |
|
|
|
LogPrint (eLogError, "Streaming: Can't send packets, missing remote LeaseSet, sSID=", m_SendStreamID); |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -625,7 +625,7 @@ namespace stream |
|
|
|
m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNewOutboundTunnel (m_CurrentOutboundTunnel); |
|
|
|
m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNewOutboundTunnel (m_CurrentOutboundTunnel); |
|
|
|
if (!m_CurrentOutboundTunnel) |
|
|
|
if (!m_CurrentOutboundTunnel) |
|
|
|
{ |
|
|
|
{ |
|
|
|
LogPrint (eLogError, "Streaming: No outbound tunnels in the pool"); |
|
|
|
LogPrint (eLogError, "Streaming: No outbound tunnels in the pool, sSID=", m_SendStreamID); |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -649,7 +649,7 @@ namespace stream |
|
|
|
m_CurrentOutboundTunnel->SendTunnelDataMsg (msgs); |
|
|
|
m_CurrentOutboundTunnel->SendTunnelDataMsg (msgs); |
|
|
|
} |
|
|
|
} |
|
|
|
else |
|
|
|
else |
|
|
|
LogPrint (eLogWarning, "Streaming: All leases are expired"); |
|
|
|
LogPrint (eLogWarning, "Streaming: All leases are expired, sSID=", m_SendStreamID); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -668,7 +668,7 @@ namespace stream |
|
|
|
// check for resend attempts
|
|
|
|
// check for resend attempts
|
|
|
|
if (m_NumResendAttempts >= MAX_NUM_RESEND_ATTEMPTS) |
|
|
|
if (m_NumResendAttempts >= MAX_NUM_RESEND_ATTEMPTS) |
|
|
|
{ |
|
|
|
{ |
|
|
|
LogPrint (eLogWarning, "Streaming: packet was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts, terminate"); |
|
|
|
LogPrint (eLogWarning, "Streaming: packet was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts, terminate, sSID=", m_SendStreamID); |
|
|
|
m_Status = eStreamStatusReset; |
|
|
|
m_Status = eStreamStatusReset; |
|
|
|
Close (); |
|
|
|
Close (); |
|
|
|
return; |
|
|
|
return; |
|
|
@ -703,13 +703,13 @@ namespace stream |
|
|
|
case 4: |
|
|
|
case 4: |
|
|
|
if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); |
|
|
|
if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); |
|
|
|
UpdateCurrentRemoteLease (); // pick another lease
|
|
|
|
UpdateCurrentRemoteLease (); // pick another lease
|
|
|
|
LogPrint (eLogWarning, "Streaming: Another remote lease has been selected for stream"); |
|
|
|
LogPrint (eLogWarning, "Streaming: Another remote lease has been selected for stream with sSID=", m_SendStreamID); |
|
|
|
break; |
|
|
|
break; |
|
|
|
case 3: |
|
|
|
case 3: |
|
|
|
// pick another outbound tunnel
|
|
|
|
// pick another outbound tunnel
|
|
|
|
if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); |
|
|
|
if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); |
|
|
|
m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); |
|
|
|
m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); |
|
|
|
LogPrint (eLogWarning, "Streaming: Another outbound tunnel has been selected for stream"); |
|
|
|
LogPrint (eLogWarning, "Streaming: Another outbound tunnel has been selected for stream with sSID=", m_SendStreamID); |
|
|
|
break; |
|
|
|
break; |
|
|
|
default: ; |
|
|
|
default: ; |
|
|
|
} |
|
|
|
} |
|
|
@ -725,7 +725,7 @@ namespace stream |
|
|
|
{ |
|
|
|
{ |
|
|
|
if (m_LastReceivedSequenceNumber < 0) |
|
|
|
if (m_LastReceivedSequenceNumber < 0) |
|
|
|
{ |
|
|
|
{ |
|
|
|
LogPrint (eLogWarning, "Streaming: SYN has not been recived after ", ACK_SEND_TIMEOUT, " milliseconds after follow on, terminate"); |
|
|
|
LogPrint (eLogWarning, "Streaming: SYN has not been recived after ", ACK_SEND_TIMEOUT, " milliseconds after follow on, terminate sSID=", m_SendStreamID); |
|
|
|
m_Status = eStreamStatusReset; |
|
|
|
m_Status = eStreamStatusReset; |
|
|
|
Close (); |
|
|
|
Close (); |
|
|
|
return; |
|
|
|
return; |
|
|
@ -828,7 +828,7 @@ namespace stream |
|
|
|
it->second->HandleNextPacket (packet); |
|
|
|
it->second->HandleNextPacket (packet); |
|
|
|
else |
|
|
|
else |
|
|
|
{ |
|
|
|
{ |
|
|
|
LogPrint (eLogError, "Streaming: Unknown stream sendStreamID=", sendStreamID); |
|
|
|
LogPrint (eLogError, "Streaming: Unknown stream sSID=", sendStreamID); |
|
|
|
delete packet; |
|
|
|
delete packet; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -844,7 +844,7 @@ namespace stream |
|
|
|
auto it = m_SavedPackets.find (receiveStreamID); |
|
|
|
auto it = m_SavedPackets.find (receiveStreamID); |
|
|
|
if (it != m_SavedPackets.end ()) |
|
|
|
if (it != m_SavedPackets.end ()) |
|
|
|
{ |
|
|
|
{ |
|
|
|
LogPrint (eLogDebug, "Streaming: Processing ", it->second.size (), " saved packets for receiveStreamID=", receiveStreamID); |
|
|
|
LogPrint (eLogDebug, "Streaming: Processing ", it->second.size (), " saved packets for rSID=", receiveStreamID); |
|
|
|
for (auto it1: it->second) |
|
|
|
for (auto it1: it->second) |
|
|
|
incomingStream->HandleNextPacket (it1); |
|
|
|
incomingStream->HandleNextPacket (it1); |
|
|
|
m_SavedPackets.erase (it); |
|
|
|
m_SavedPackets.erase (it); |
|
|
@ -863,7 +863,7 @@ namespace stream |
|
|
|
m_PendingIncomingTimer.expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT)); |
|
|
|
m_PendingIncomingTimer.expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT)); |
|
|
|
m_PendingIncomingTimer.async_wait (std::bind (&StreamingDestination::HandlePendingIncomingTimer, |
|
|
|
m_PendingIncomingTimer.async_wait (std::bind (&StreamingDestination::HandlePendingIncomingTimer, |
|
|
|
shared_from_this (), std::placeholders::_1)); |
|
|
|
shared_from_this (), std::placeholders::_1)); |
|
|
|
LogPrint (eLogDebug, "Streaming: Pending incoming stream added"); |
|
|
|
LogPrint (eLogDebug, "Streaming: Pending incoming stream added, rSID=", receiveStreamID); |
|
|
|
} |
|
|
|
} |
|
|
|
else |
|
|
|
else |
|
|
|
{ |
|
|
|
{ |
|
|
|