diff --git a/Streaming.cpp b/Streaming.cpp index 86bfcd14..f8e1e99d 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -11,49 +11,49 @@ namespace i2p { namespace stream { - Stream::Stream (boost::asio::io_service& service, StreamingDestination& local, + Stream::Stream (boost::asio::io_service& service, StreamingDestination& local, std::shared_ptr remote, int port): m_Service (service), - m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), - m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), - m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), - m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), + m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), + m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), + m_RemoteLeaseSet (remote), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), + m_AckSendTimer (m_Service), m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (port), m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO), m_LastWindowSizeIncreaseTime (0), m_NumResendAttempts (0) { RAND_bytes ((uint8_t *)&m_RecvStreamID, 4); m_RemoteIdentity = remote->GetIdentity (); - } + } Stream::Stream (boost::asio::io_service& service, StreamingDestination& local): - m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), + m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_Status (eStreamStatusNew), m_IsAckSendScheduled (false), m_LocalDestination (local), - m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), - m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_WindowSize (MIN_WINDOW_SIZE), + m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), + m_NumSentBytes (0), m_NumReceivedBytes (0), m_Port (0), m_WindowSize (MIN_WINDOW_SIZE), m_RTT (INITIAL_RTT), m_RTO (INITIAL_RTO), m_LastWindowSizeIncreaseTime (0), m_NumResendAttempts (0) { RAND_bytes ((uint8_t *)&m_RecvStreamID, 4); } Stream::~Stream () - { - CleanUp (); + { + CleanUp (); LogPrint (eLogDebug, "Streaming: Stream deleted"); - } + } void Stream::Terminate () { m_AckSendTimer.cancel (); m_ReceiveTimer.cancel (); m_ResendTimer.cancel (); - if (m_SendHandler) + if (m_SendHandler) { auto handler = m_SendHandler; m_SendHandler = nullptr; handler (boost::asio::error::make_error_code (boost::asio::error::operation_aborted)); } - CleanUp (); - m_LocalDestination.DeleteStream (shared_from_this ()); - } + //CleanUp (); /* Need to recheck - broke working on windows */ + m_LocalDestination.DeleteStream (shared_from_this ()); + } void Stream::CleanUp () { @@ -64,25 +64,25 @@ namespace stream m_ReceiveQueue.pop (); m_LocalDestination.DeletePacket (packet); } - + for (auto it: m_SentPackets) m_LocalDestination.DeletePacket (it); m_SentPackets.clear (); - + for (auto it: m_SavedPackets) m_LocalDestination.DeletePacket (it); m_SavedPackets.clear (); - } - + } + void Stream::HandleNextPacket (Packet * packet) { m_NumReceivedBytes += packet->GetLength (); - if (!m_SendStreamID) - m_SendStreamID = packet->GetReceiveStreamID (); + if (!m_SendStreamID) + m_SendStreamID = packet->GetReceiveStreamID (); if (!packet->IsNoAck ()) // ack received ProcessAck (packet); - + int32_t receivedSeqn = packet->GetSeqn (); bool isSyn = packet->IsSYN (); if (!receivedSeqn && !isSyn) @@ -95,13 +95,13 @@ namespace stream LogPrint (eLogDebug, "Streaming: Received seqn=", receivedSeqn, " on sSID=", m_SendStreamID); if (receivedSeqn == m_LastReceivedSequenceNumber + 1) - { + { // we have received next in sequence message ProcessPacket (packet); - + // we should also try stored messages if any for (auto it = m_SavedPackets.begin (); it != m_SavedPackets.end ();) - { + { if ((*it)->GetSeqn () == (uint32_t)(m_LastReceivedSequenceNumber + 1)) { Packet * savedPacket = *it; @@ -125,35 +125,35 @@ namespace stream m_AckSendTimer.async_wait (std::bind (&Stream::HandleAckSendTimer, shared_from_this (), std::placeholders::_1)); } - } + } else if (isSyn) // we have to send SYN back to incoming connection - SendBuffer (); // also sets m_IsOpen - } - else - { + SendBuffer (); // also sets m_IsOpen + } + else + { if (receivedSeqn <= m_LastReceivedSequenceNumber) { // we have received duplicate LogPrint (eLogWarning, "Streaming: Duplicate message ", receivedSeqn, " on sSID=", m_SendStreamID); SendQuickAck (); // resend ack for previous message again m_LocalDestination.DeletePacket (packet); // packet dropped - } + } else { LogPrint (eLogWarning, "Streaming: Missing messages on sSID=", m_SendStreamID, ": from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); // save message and wait for missing message again SavePacket (packet); if (m_LastReceivedSequenceNumber >= 0) - { + { // send NACKs for missing messages ASAP if (m_IsAckSendScheduled) { - m_IsAckSendScheduled = false; + m_IsAckSendScheduled = false; m_AckSendTimer.cancel (); } SendQuickAck (); - } + } else { // wait for SYN @@ -161,16 +161,16 @@ namespace stream m_AckSendTimer.expires_from_now (boost::posix_time::milliseconds(ACK_SEND_TIMEOUT)); m_AckSendTimer.async_wait (std::bind (&Stream::HandleAckSendTimer, shared_from_this (), std::placeholders::_1)); - } - } - } - } + } + } + } + } void Stream::SavePacket (Packet * packet) { if (!m_SavedPackets.insert (packet).second) m_LocalDestination.DeletePacket (packet); - } + } void Stream::ProcessPacket (Packet * packet) { @@ -178,52 +178,52 @@ namespace stream uint32_t receivedSeqn = packet->GetSeqn (); uint16_t flags = packet->GetFlags (); LogPrint (eLogDebug, "Streaming: Process seqn=", receivedSeqn, ", flags=", flags); - + const uint8_t * optionData = packet->GetOptionData (); if (flags & PACKET_FLAG_DELAY_REQUESTED) optionData += 2; - + if (flags & PACKET_FLAG_FROM_INCLUDED) { m_RemoteIdentity = std::make_shared(optionData, packet->GetOptionSize ()); optionData += m_RemoteIdentity->GetFullLen (); if (!m_RemoteLeaseSet) LogPrint (eLogDebug, "Streaming: Incoming stream from ", m_RemoteIdentity->GetIdentHash ().ToBase64 (), ", sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID); - } + } if (flags & PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED) { uint16_t maxPacketSize = bufbe16toh (optionData); LogPrint (eLogDebug, "Streaming: Max packet size ", maxPacketSize); optionData += 2; - } - + } + if (flags & PACKET_FLAG_SIGNATURE_INCLUDED) { - uint8_t signature[256]; + uint8_t signature[256]; auto signatureLen = m_RemoteIdentity->GetSignatureLen (); memcpy (signature, optionData, signatureLen); memset (const_cast(optionData), 0, signatureLen); if (!m_RemoteIdentity->Verify (packet->GetBuffer (), packet->GetLength (), signature)) - { + { LogPrint (eLogError, "Streaming: Signature verification failed, sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID); Close (); flags |= PACKET_FLAG_CLOSE; - } + } memcpy (const_cast(optionData), signature, signatureLen); optionData += signatureLen; - } + } packet->offset = packet->GetPayload () - packet->buf; if (packet->GetLength () > 0) - { + { m_ReceiveQueue.push (packet); m_ReceiveTimer.cancel (); - } + } else m_LocalDestination.DeletePacket (packet); - + m_LastReceivedSequenceNumber = receivedSeqn; if (flags & PACKET_FLAG_RESET) @@ -237,9 +237,9 @@ namespace stream if (m_Status != eStreamStatusClosed) SendClose (); m_Status = eStreamStatusClosed; - Terminate (); + Terminate (); } - } + } void Stream::ProcessAck (Packet * packet) { @@ -250,7 +250,7 @@ namespace stream { LogPrint (eLogError, "Streaming: Unexpected ackThrough=", ackThrough, " > seqn=", m_SequenceNumber); return; - } + } int nackCount = packet->GetNACKCount (); for (auto it = m_SentPackets.begin (); it != m_SentPackets.end ();) { @@ -271,7 +271,7 @@ namespace stream LogPrint (eLogDebug, "Streaming: Packet ", seqn, " NACK"); ++it; continue; - } + } } auto sentPacket = *it; uint64_t rtt = ts - sentPacket->sendTime; @@ -284,7 +284,7 @@ namespace stream m_RTO = m_RTT*1.5; // TODO: implement it better LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime); m_SentPackets.erase (it++); - m_LocalDestination.DeletePacket (sentPacket); + m_LocalDestination.DeletePacket (sentPacket); acknowledged = true; if (m_WindowSize < WINDOW_SIZE) m_WindowSize++; // slow start @@ -312,13 +312,13 @@ namespace stream { m_NumResendAttempts = 0; SendBuffer (); - } + } if (m_Status == eStreamStatusClosed) Terminate (); else if (m_Status == eStreamStatusClosing) Close (); // check is all outgoing messages have been sent and we can send close - } - + } + size_t Stream::Send (const uint8_t * buf, size_t len) { if (len > 0 && buf) @@ -326,14 +326,14 @@ namespace stream std::unique_lock l(m_SendBufferMutex); m_SendBuffer.clear (); m_SendBuffer.write ((const char *)buf, len); - } + } m_Service.post (std::bind (&Stream::SendBuffer, shared_from_this ())); return len; - } + } void Stream::AsyncSend (const uint8_t * buf, size_t len, SendHandler handler) { - if (m_SendHandler) + if (m_SendHandler) handler (boost::asio::error::make_error_code (boost::asio::error::in_progress)); else m_SendHandler = handler; @@ -341,10 +341,10 @@ namespace stream } void Stream::SendBuffer () - { + { int numMsgs = m_WindowSize - m_SentPackets.size (); - if (numMsgs <= 0) return; // window is full - + if (numMsgs <= 0) return; // window is full + bool isNoAck = m_LastReceivedSequenceNumber < 0; // first packet std::vector packets; { @@ -361,20 +361,20 @@ namespace stream size += 4; // receiveStreamID htobe32buf (packet + size, m_SequenceNumber++); size += 4; // sequenceNum - if (isNoAck) + if (isNoAck) htobuf32 (packet + size, 0); else htobe32buf (packet + size, m_LastReceivedSequenceNumber); size += 4; // ack Through - packet[size] = 0; + packet[size] = 0; size++; // NACK count packet[size] = m_RTO/1000; size++; // resend delay if (m_Status == eStreamStatusNew) - { + { // initial packet m_Status = eStreamStatusOpen; - uint16_t flags = PACKET_FLAG_SYNCHRONIZE | PACKET_FLAG_FROM_INCLUDED | + uint16_t flags = PACKET_FLAG_SYNCHRONIZE | PACKET_FLAG_FROM_INCLUDED | PACKET_FLAG_SIGNATURE_INCLUDED | PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED; if (isNoAck) flags |= PACKET_FLAG_NO_ACK; htobe16buf (packet + size, flags); @@ -383,7 +383,7 @@ namespace stream size_t signatureLen = m_LocalDestination.GetOwner ()->GetIdentity ()->GetSignatureLen (); htobe16buf (packet + size, identityLen + signatureLen + 2); // identity + signature + packet size size += 2; // options size - m_LocalDestination.GetOwner ()->GetIdentity ()->ToBuffer (packet + size, identityLen); + m_LocalDestination.GetOwner ()->GetIdentity ()->ToBuffer (packet + size, identityLen); size += identityLen; // from htobe16buf (packet + size, STREAMING_MTU); size += 2; // max packet size @@ -393,7 +393,7 @@ namespace stream m_SendBuffer.read ((char *)(packet + size), STREAMING_MTU - size); size += m_SendBuffer.gcount (); // payload m_LocalDestination.GetOwner ()->Sign (packet, size, signature); - } + } else { // follow on packet @@ -401,9 +401,9 @@ namespace stream size += 2; // flags htobuf16 (packet + size, 0); // no options size += 2; // options size - m_SendBuffer.read((char *)(packet + size), STREAMING_MTU - size); + m_SendBuffer.read((char *)(packet + size), STREAMING_MTU - size); size += m_SendBuffer.gcount (); // payload - } + } p->len = size; packets.push_back (p); numMsgs--; @@ -413,14 +413,14 @@ namespace stream m_SendHandler (boost::system::error_code ()); m_SendHandler = nullptr; } - } + } if (packets.size () > 0) { if (m_SavedPackets.empty ()) // no NACKS - { - m_IsAckSendScheduled = false; + { + m_IsAckSendScheduled = false; m_AckSendTimer.cancel (); - } + } bool isEmpty = m_SentPackets.empty (); auto ts = i2p::util::GetMillisecondsSinceEpoch (); for (auto& it: packets) @@ -433,9 +433,9 @@ namespace stream SendClose (); if (isEmpty) ScheduleResend (); - } + } } - + void Stream::SendQuickAck () { int32_t lastReceivedSeqn = m_LastReceivedSequenceNumber; @@ -443,15 +443,15 @@ namespace stream { int32_t seqn = (*m_SavedPackets.rbegin ())->GetSeqn (); if (seqn > lastReceivedSeqn) lastReceivedSeqn = seqn; - } - if (lastReceivedSeqn < 0) - { + } + if (lastReceivedSeqn < 0) + { LogPrint (eLogError, "Streaming: No packets have been received yet"); return; } - + Packet p; - uint8_t * packet = p.GetBuffer (); + uint8_t * packet = p.GetBuffer (); size_t size = 0; htobe32buf (packet + size, m_SendStreamID); size += 4; // sendStreamID @@ -462,8 +462,8 @@ namespace stream htobe32buf (packet + size, lastReceivedSeqn); size += 4; // ack Through uint8_t numNacks = 0; - if (lastReceivedSeqn > m_LastReceivedSequenceNumber) - { + if (lastReceivedSeqn > m_LastReceivedSequenceNumber) + { // fill NACKs uint8_t * nacks = packet + size + 1; auto nextSeqn = m_LastReceivedSequenceNumber + 1; @@ -475,35 +475,35 @@ namespace stream LogPrint (eLogError, "Streaming: Number of NACKs exceeds 256. seqn=", seqn, " nextSeqn=", nextSeqn); htobe32buf (packet + 12, nextSeqn); // change ack Through break; - } + } for (uint32_t i = nextSeqn; i < seqn; i++) { htobe32buf (nacks, i); nacks += 4; numNacks++; - } + } nextSeqn = seqn + 1; } - packet[size] = numNacks; - size++; // NACK count + packet[size] = numNacks; + size++; // NACK count size += numNacks*4; // NACKs - } + } else { // No NACKs - packet[size] = 0; - size++; // NACK count - } + packet[size] = 0; + size++; // NACK count + } size++; // resend delay htobuf16 (packet + size, 0); // nof flags set size += 2; // flags htobuf16 (packet + size, 0); // no options size += 2; // options size - p.len = size; + p.len = size; SendPackets (std::vector { &p }); LogPrint (eLogDebug, "Streaming: Quick Ack sent. ", (int)numNacks, " NACKs"); - } + } void Stream::Close () { @@ -518,7 +518,7 @@ namespace stream break; case eStreamStatusReset: // TODO: send reset - Terminate (); + Terminate (); break; case eStreamStatusClosing: if (m_SentPackets.empty () && m_SendBuffer.eof ()) // nothing to send @@ -530,10 +530,10 @@ namespace stream case eStreamStatusClosed: // already closed Terminate (); - break; + break; default: LogPrint (eLogWarning, "Streaming: Unexpected stream status ", (int)m_Status, "sSID=", m_SendStreamID); - }; + }; } void Stream::SendClose () @@ -549,7 +549,7 @@ namespace stream size += 4; // sequenceNum htobe32buf (packet + size, m_LastReceivedSequenceNumber >= 0 ? m_LastReceivedSequenceNumber : 0); size += 4; // ack Through - packet[size] = 0; + packet[size] = 0; size++; // NACK count size++; // resend delay htobe16buf (packet + size, PACKET_FLAG_CLOSE | PACKET_FLAG_SIGNATURE_INCLUDED); @@ -561,12 +561,12 @@ namespace stream memset (packet + size, 0, signatureLen); size += signatureLen; // signature m_LocalDestination.GetOwner ()->Sign (packet, size, signature); - + p->len = size; m_Service.post (std::bind (&Stream::SendPacket, shared_from_this (), p)); LogPrint (eLogDebug, "Streaming: FIN sent, sSID=", m_SendStreamID); - } - + } + size_t Stream::ConcatenatePackets (uint8_t * buf, size_t len) { size_t pos = 0; @@ -581,18 +581,18 @@ namespace stream { m_ReceiveQueue.pop (); m_LocalDestination.DeletePacket (packet); - } - } - return pos; + } + } + return pos; } bool Stream::SendPacket (Packet * packet) { if (packet) - { + { if (m_IsAckSendScheduled) { - m_IsAckSendScheduled = false; + m_IsAckSendScheduled = false; m_AckSendTimer.cancel (); } SendPackets (std::vector { packet }); @@ -600,17 +600,17 @@ namespace stream m_SentPackets.insert (packet); if (isEmpty) ScheduleResend (); - return true; - } + return true; + } else return false; - } - + } + void Stream::SendPackets (const std::vector& packets) { if (!m_RemoteLeaseSet) { - UpdateCurrentRemoteLease (); + UpdateCurrentRemoteLease (); if (!m_RemoteLeaseSet) { LogPrint (eLogError, "Streaming: Can't send packets, missing remote LeaseSet, sSID=", m_SendStreamID); @@ -618,7 +618,7 @@ namespace stream } } if (!m_RoutingSession || !m_RoutingSession->GetOwner ()) // expired and detached - m_RoutingSession = m_LocalDestination.GetOwner ()->GetRoutingSession (m_RemoteLeaseSet, true); + m_RoutingSession = m_LocalDestination.GetOwner ()->GetRoutingSession (m_RemoteLeaseSet, true); if (!m_CurrentOutboundTunnel && m_RoutingSession) // first message to send { // try to get shared path first @@ -630,7 +630,7 @@ namespace stream m_RTT = routingPath->rtt; m_RTO = m_RTT*1.5; // TODO: implement it better } - } + } if (!m_CurrentOutboundTunnel || !m_CurrentOutboundTunnel->IsEstablished ()) m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNewOutboundTunnel (m_CurrentOutboundTunnel); if (!m_CurrentOutboundTunnel) @@ -639,30 +639,30 @@ namespace stream return; } - auto ts = i2p::util::GetMillisecondsSinceEpoch (); + auto ts = i2p::util::GetMillisecondsSinceEpoch (); if (!m_CurrentRemoteLease || !m_CurrentRemoteLease->endDate || // excluded from LeaseSet ts >= m_CurrentRemoteLease->endDate - i2p::data::LEASE_ENDDATE_THRESHOLD) UpdateCurrentRemoteLease (true); if (m_CurrentRemoteLease && ts < m_CurrentRemoteLease->endDate + i2p::data::LEASE_ENDDATE_THRESHOLD) - { + { std::vector msgs; for (auto it: packets) - { + { auto msg = m_RoutingSession->WrapSingleMessage (m_LocalDestination.CreateDataMessage (it->GetBuffer (), it->GetLength (), m_Port)); - msgs.push_back (i2p::tunnel::TunnelMessageBlock - { + msgs.push_back (i2p::tunnel::TunnelMessageBlock + { i2p::tunnel::eDeliveryTypeTunnel, m_CurrentRemoteLease->tunnelGateway, m_CurrentRemoteLease->tunnelID, msg - }); + }); m_NumSentBytes += it->GetLength (); } m_CurrentOutboundTunnel->SendTunnelDataMsg (msgs); - } + } else - { + { LogPrint (eLogWarning, "Streaming: Remote lease is not available, sSID=", m_SendStreamID); - if (m_RoutingSession) + if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); // invalidate routing path } } @@ -670,8 +670,8 @@ namespace stream void Stream::SendUpdatedLeaseSet () { if (m_RoutingSession) - { - if (m_RoutingSession->IsLeaseSetNonConfirmed ()) + { + if (m_RoutingSession->IsLeaseSetNonConfirmed ()) { auto ts = i2p::util::GetMillisecondsSinceEpoch (); if (ts > m_RoutingSession->GetLeaseSetSubmissionTime () + i2p::garlic::LEASET_CONFIRMATION_TIMEOUT) @@ -679,19 +679,19 @@ namespace stream // LeaseSet was not confirmed, should try other tunnels LogPrint (eLogWarning, "Streaming: LeaseSet was not confrimed in ", i2p::garlic::LEASET_CONFIRMATION_TIMEOUT, " milliseconds. Trying to resubmit"); m_RoutingSession->SetSharedRoutingPath (nullptr); - m_CurrentOutboundTunnel = nullptr; + m_CurrentOutboundTunnel = nullptr; m_CurrentRemoteLease = nullptr; SendQuickAck (); - } - } + } + } else if (m_RoutingSession->IsLeaseSetUpdated ()) - { + { LogPrint (eLogDebug, "Streaming: sending updated LeaseSet"); SendQuickAck (); - } - } - } - + } + } + } + void Stream::ScheduleResend () { m_ResendTimer.cancel (); @@ -701,11 +701,11 @@ namespace stream m_ResendTimer.async_wait (std::bind (&Stream::HandleResendTimer, shared_from_this (), std::placeholders::_1)); } - + void Stream::HandleResendTimer (const boost::system::error_code& ecode) { - if (ecode != boost::asio::error::operation_aborted) - { + if (ecode != boost::asio::error::operation_aborted) + { // check for resend attempts if (m_NumResendAttempts >= MAX_NUM_RESEND_ATTEMPTS) { @@ -713,7 +713,7 @@ namespace stream m_Status = eStreamStatusReset; Close (); return; - } + } // collect packets to resend auto ts = i2p::util::GetMillisecondsSinceEpoch (); @@ -724,8 +724,8 @@ namespace stream { it->sendTime = ts; packets.push_back (it); - } - } + } + } // select tunnels if necessary and send if (packets.size () > 0) @@ -733,7 +733,7 @@ namespace stream m_NumResendAttempts++; m_RTO *= 2; switch (m_NumResendAttempts) - { + { case 1: // congesion avoidance m_WindowSize /= 2; if (m_WindowSize < MIN_WINDOW_SIZE) m_WindowSize = MIN_WINDOW_SIZE; @@ -745,21 +745,21 @@ namespace stream if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); UpdateCurrentRemoteLease (); // pick another lease LogPrint (eLogWarning, "Streaming: Another remote lease has been selected for stream with rSID=", m_RecvStreamID, ", sSID=", m_SendStreamID); - break; + break; case 3: - // pick another outbound tunnel + // pick another outbound tunnel if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); - m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); + m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); LogPrint (eLogWarning, "Streaming: Another outbound tunnel has been selected for stream with sSID=", m_SendStreamID); break; - default: ; - } + default: ; + } SendPackets (packets); - } + } ScheduleResend (); - } - } - + } + } + void Stream::HandleAckSendTimer (const boost::system::error_code& ecode) { if (m_IsAckSendScheduled) @@ -770,10 +770,10 @@ namespace stream m_Status = eStreamStatusReset; Close (); return; - } + } if (m_Status == eStreamStatusOpen) { - if (m_RoutingSession && m_RoutingSession->IsLeaseSetNonConfirmed ()) + if (m_RoutingSession && m_RoutingSession->IsLeaseSetNonConfirmed ()) { // seems something went wrong and we should re-select tunnels m_CurrentOutboundTunnel = nullptr; @@ -782,19 +782,19 @@ namespace stream SendQuickAck (); } m_IsAckSendScheduled = false; - } + } } void Stream::UpdateCurrentRemoteLease (bool expired) { - if (!m_RemoteLeaseSet || m_RemoteLeaseSet->IsExpired ()) + if (!m_RemoteLeaseSet || m_RemoteLeaseSet->IsExpired ()) { m_RemoteLeaseSet = m_LocalDestination.GetOwner ()->FindLeaseSet (m_RemoteIdentity->GetIdentHash ()); - if (!m_RemoteLeaseSet) - { + if (!m_RemoteLeaseSet) + { LogPrint (eLogWarning, "Streaming: LeaseSet ", m_RemoteIdentity->GetIdentHash ().ToBase64 (), " not found"); m_LocalDestination.GetOwner ()->RequestDestination (m_RemoteIdentity->GetIdentHash ()); // try to request for a next attempt - } + } } if (m_RemoteLeaseSet) { @@ -805,11 +805,11 @@ namespace stream { expired = false; m_LocalDestination.GetOwner ()->RequestDestination (m_RemoteIdentity->GetIdentHash ()); // time to request - leases = m_RemoteLeaseSet->GetNonExpiredLeases (true); // then with threshold + leases = m_RemoteLeaseSet->GetNonExpiredLeases (true); // then with threshold } if (!leases.empty ()) - { - bool updated = false; + { + bool updated = false; if (expired && m_CurrentRemoteLease) { for (const auto& it: leases) @@ -818,34 +818,34 @@ namespace stream m_CurrentRemoteLease = it; updated = true; break; - } + } } if (!updated) { uint32_t i = rand () % leases.size (); if (m_CurrentRemoteLease && leases[i]->tunnelID == m_CurrentRemoteLease->tunnelID) - // make sure we don't select previous + // make sure we don't select previous i = (i + 1) % leases.size (); // if so, pick next - m_CurrentRemoteLease = leases[i]; + m_CurrentRemoteLease = leases[i]; } - } + } else - { + { LogPrint (eLogWarning, "Streaming: All remote leases are expired"); m_RemoteLeaseSet = nullptr; m_CurrentRemoteLease = nullptr; // we have requested expired before, no need to do it twice - } + } } else { LogPrint (eLogWarning, "Streaming: Remote LeaseSet not found"); m_CurrentRemoteLease = nullptr; - } - } + } + } - StreamingDestination::StreamingDestination (std::shared_ptr owner, uint16_t localPort, bool gzip): - m_Owner (owner), m_LocalPort (localPort), m_Gzip (gzip), + StreamingDestination::StreamingDestination (std::shared_ptr owner, uint16_t localPort, bool gzip): + m_Owner (owner), m_LocalPort (localPort), m_Gzip (gzip), m_LastIncomingReceiveStreamID (0), m_PendingIncomingTimer (m_Owner->GetService ()), m_ConnTrackTimer(m_Owner->GetService()), @@ -853,13 +853,13 @@ namespace stream m_LastBanClear(i2p::util::GetMillisecondsSinceEpoch()) { } - + StreamingDestination::~StreamingDestination () { for (auto& it: m_SavedPackets) { for (auto it1: it.second) DeletePacket (it1); - it.second.clear (); + it.second.clear (); } m_SavedPackets.clear (); } @@ -868,9 +868,9 @@ namespace stream { ScheduleConnTrack(); } - + void StreamingDestination::Stop () - { + { ResetAcceptor (); m_PendingIncomingTimer.cancel (); m_PendingIncomingStreams.clear (); @@ -883,34 +883,34 @@ namespace stream std::unique_lock l(m_ConnsMutex); m_Conns.clear (); } - } - + } + void StreamingDestination::HandleNextPacket (Packet * packet) { uint32_t sendStreamID = packet->GetSendStreamID (); if (sendStreamID) - { + { auto it = m_Streams.find (sendStreamID); if (it != m_Streams.end ()) it->second->HandleNextPacket (packet); else - { + { LogPrint (eLogError, "Streaming: Unknown stream sSID=", sendStreamID); DeletePacket (packet); } - } - else + } + else { if (packet->IsSYN () && !packet->GetSeqn ()) // new incoming stream - { + { uint32_t receiveStreamID = packet->GetReceiveStreamID (); - if (receiveStreamID == m_LastIncomingReceiveStreamID) + if (receiveStreamID == m_LastIncomingReceiveStreamID) { // already pending LogPrint(eLogWarning, "Streaming: Incoming streaming with rSID=", receiveStreamID, " already exists"); DeletePacket (packet); // drop it, because previous should be connected return; - } + } auto incomingStream = CreateNewIncomingStream (); incomingStream->HandleNextPacket (packet); // SYN auto ident = incomingStream->GetRemoteIdentity(); @@ -926,7 +926,7 @@ namespace stream } } m_LastIncomingReceiveStreamID = receiveStreamID; - + // handle saved packets if any { auto it = m_SavedPackets.find (receiveStreamID); @@ -936,7 +936,7 @@ namespace stream for (auto it1: it->second) incomingStream->HandleNextPacket (it1); m_SavedPackets.erase (it); - } + } } // accept if (m_Acceptor != nullptr) @@ -949,17 +949,17 @@ namespace stream m_PendingIncomingStreams.push_back (incomingStream); m_PendingIncomingTimer.cancel (); m_PendingIncomingTimer.expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT)); - m_PendingIncomingTimer.async_wait (std::bind (&StreamingDestination::HandlePendingIncomingTimer, + m_PendingIncomingTimer.async_wait (std::bind (&StreamingDestination::HandlePendingIncomingTimer, shared_from_this (), std::placeholders::_1)); LogPrint (eLogDebug, "Streaming: Pending incoming stream added, rSID=", receiveStreamID); } else - { + { LogPrint (eLogWarning, "Streaming: Pending incoming streams backlog exceeds ", MAX_PENDING_INCOMING_BACKLOG); incomingStream->Close (); - } - } - } + } + } + } else // follow on packet without SYN { uint32_t receiveStreamID = packet->GetReceiveStreamID (); @@ -994,17 +994,17 @@ namespace stream } }); } - } - } - } - + } + } + } + std::shared_ptr StreamingDestination::CreateNewOutgoingStream (std::shared_ptr remote, int port) { auto s = std::make_shared (m_Owner->GetService (), *this, remote, port); std::unique_lock l(m_StreamsMutex); m_Streams[s->GetRecvStreamID ()] = s; return s; - } + } std::shared_ptr StreamingDestination::CreateNewIncomingStream () { @@ -1017,39 +1017,39 @@ namespace stream void StreamingDestination::DeleteStream (std::shared_ptr stream) { if (stream) - { + { std::unique_lock l(m_StreamsMutex); auto it = m_Streams.find (stream->GetRecvStreamID ()); if (it != m_Streams.end ()) m_Streams.erase (it); - } - } + } + } - void StreamingDestination::SetAcceptor (const Acceptor& acceptor) - { + void StreamingDestination::SetAcceptor (const Acceptor& acceptor) + { m_Acceptor = acceptor; // we must set it immediately for IsAcceptorSet auto s = shared_from_this (); m_Owner->GetService ().post([s](void) - { + { // take care about incoming queue for (auto& it: s->m_PendingIncomingStreams) if (it->GetStatus () == eStreamStatusOpen) // still open? s->m_Acceptor (it); s->m_PendingIncomingStreams.clear (); s->m_PendingIncomingTimer.cancel (); - }); + }); } - void StreamingDestination::ResetAcceptor () - { - if (m_Acceptor) m_Acceptor (nullptr); - m_Acceptor = nullptr; + void StreamingDestination::ResetAcceptor () + { + if (m_Acceptor) m_Acceptor (nullptr); + m_Acceptor = nullptr; } void StreamingDestination::AcceptOnce (const Acceptor& acceptor) { m_Owner->GetService ().post([acceptor, this](void) - { + { if (!m_PendingIncomingStreams.empty ()) { acceptor (m_PendingIncomingStreams.front ()); @@ -1059,14 +1059,14 @@ namespace stream } else // we must save old acceptor and set it back { - auto oldAcceptor = m_Acceptor; + auto oldAcceptor = m_Acceptor; m_Acceptor = [acceptor, oldAcceptor, this](std::shared_ptr stream) { acceptor (stream); m_Acceptor = oldAcceptor; }; } - }); + }); } void StreamingDestination::HandlePendingIncomingTimer (const boost::system::error_code& ecode) @@ -1077,9 +1077,9 @@ namespace stream for (auto& it: m_PendingIncomingStreams) it->Close (); m_PendingIncomingStreams.clear (); - } - } - + } + } + void StreamingDestination::HandleDataMessagePayload (const uint8_t * buf, size_t len) { // unzip it @@ -1087,7 +1087,7 @@ namespace stream uncompressed->offset = 0; uncompressed->len = m_Inflator.Inflate (buf, len, uncompressed->buf, MAX_PACKET_SIZE); if (uncompressed->len) - HandleNextPacket (uncompressed); + HandleNextPacket (uncompressed); else DeletePacket (uncompressed); } @@ -1107,11 +1107,11 @@ namespace stream { htobe32buf (msg->GetPayload (), size); // length htobe16buf (buf + 4, m_LocalPort); // source port - htobe16buf (buf + 6, toPort); // destination port + htobe16buf (buf + 6, toPort); // destination port buf[9] = i2p::client::PROTOCOL_TYPE_STREAMING; // streaming protocol - msg->len += size; + msg->len += size; msg->FillI2NPMessageHeader (eI2NPData); - } + } else msg = nullptr; return msg; @@ -1164,15 +1164,15 @@ namespace stream } // reschedule timer ScheduleConnTrack(); - } + } } void StreamingDestination::ScheduleConnTrack() { m_ConnTrackTimer.expires_from_now (boost::posix_time::seconds(60)); m_ConnTrackTimer.async_wait ( - std::bind (&StreamingDestination::HandleConnTrack, + std::bind (&StreamingDestination::HandleConnTrack, shared_from_this (), std::placeholders::_1)); } -} -} +} +}