diff --git a/Streaming.cpp b/Streaming.cpp index e5260556..2d3c92ce 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -87,7 +87,7 @@ namespace stream return; } - LogPrint (eLogDebug, "Streaming: Received seqn=", receivedSeqn); + LogPrint (eLogDebug, "Streaming: Received seqn=", receivedSeqn, " on sSID=", m_SendStreamID); if (receivedSeqn == m_LastReceivedSequenceNumber + 1) { // we have received next in sequence message @@ -129,13 +129,13 @@ namespace stream if (receivedSeqn <= m_LastReceivedSequenceNumber) { // we have received duplicate - LogPrint (eLogWarning, "Streaming: Duplicate message ", receivedSeqn, " received"); + LogPrint (eLogWarning, "Streaming: Duplicate message ", receivedSeqn, " on sSID=", m_SendStreamID); SendQuickAck (); // resend ack for previous message again delete packet; // packet dropped } else { - LogPrint (eLogWarning, "Streaming: Missing messages from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); + LogPrint (eLogWarning, "Streaming: Missing messages on sSID=", m_SendStreamID, ": from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); // save message and wait for missing message again SavePacket (packet); if (m_LastReceivedSequenceNumber >= 0) @@ -183,7 +183,7 @@ namespace stream m_RemoteIdentity = std::make_shared(optionData, packet->GetOptionSize ()); optionData += m_RemoteIdentity->GetFullLen (); if (!m_RemoteLeaseSet) - LogPrint (eLogDebug, "Streaming: Incoming stream from ", m_RemoteIdentity->GetIdentHash ().ToBase64 ()); + LogPrint (eLogDebug, "Streaming: Incoming stream from ", m_RemoteIdentity->GetIdentHash ().ToBase64 (), ", sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID); } if (flags & PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED) @@ -263,7 +263,7 @@ namespace stream uint64_t rtt = ts - sentPacket->sendTime; m_RTT = (m_RTT*seqn + rtt)/(seqn + 1); m_RTO = m_RTT*1.5; // TODO: implement it better - LogPrint (eLogDebug, "Packet ", seqn, " acknowledged rtt=", rtt); + LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt); m_SentPackets.erase (it++); delete sentPacket; acknowledged = true; @@ -451,7 +451,7 @@ namespace stream auto seqn = it->GetSeqn (); if (numNacks + (seqn - nextSeqn) >= 256) { - LogPrint (eLogError, "Number of NACKs exceeds 256. seqn=", seqn, " nextSeqn=", nextSeqn); + LogPrint (eLogError, "Streaming: Number of NACKs exceeds 256. seqn=", seqn, " nextSeqn=", nextSeqn); htobe32buf (packet + 12, nextSeqn); // change ack Through break; } @@ -492,7 +492,7 @@ namespace stream m_Status = eStreamStatusClosing; Close (); // recursion if (m_Status == eStreamStatusClosing) //still closing - LogPrint (eLogInfo, "Streaming: Trying to send stream data before closing"); + LogPrint (eLogDebug, "Streaming: Trying to send stream data before closing, sSID=", m_SendStreamID); break; case eStreamStatusReset: SendClose (); @@ -514,7 +514,7 @@ namespace stream m_LocalDestination.DeleteStream (shared_from_this ()); break; default: - LogPrint (eLogWarning, "Streaming: Unexpected stream status ", (int)m_Status); + LogPrint (eLogWarning, "Streaming: Unexpected stream status ", (int)m_Status, "sSID=", m_SendStreamID); }; } @@ -546,7 +546,7 @@ namespace stream p->len = size; m_Service.post (std::bind (&Stream::SendPacket, shared_from_this (), p)); - LogPrint (eLogDebug, "Streaming: FIN sent"); + LogPrint (eLogDebug, "Streaming: FIN sent, sSID=", m_SendStreamID); } size_t Stream::ConcatenatePackets (uint8_t * buf, size_t len) @@ -600,7 +600,7 @@ namespace stream UpdateCurrentRemoteLease (); if (!m_RemoteLeaseSet) { - LogPrint (eLogError, "Streaming: Can't send packets, missing remote LeaseSet"); + LogPrint (eLogError, "Streaming: Can't send packets, missing remote LeaseSet, sSID=", m_SendStreamID); return; } } @@ -625,7 +625,7 @@ namespace stream m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNewOutboundTunnel (m_CurrentOutboundTunnel); if (!m_CurrentOutboundTunnel) { - LogPrint (eLogError, "Streaming: No outbound tunnels in the pool"); + LogPrint (eLogError, "Streaming: No outbound tunnels in the pool, sSID=", m_SendStreamID); return; } @@ -649,7 +649,7 @@ namespace stream m_CurrentOutboundTunnel->SendTunnelDataMsg (msgs); } else - LogPrint (eLogWarning, "Streaming: All leases are expired"); + LogPrint (eLogWarning, "Streaming: All leases are expired, sSID=", m_SendStreamID); } @@ -668,7 +668,7 @@ namespace stream // check for resend attempts if (m_NumResendAttempts >= MAX_NUM_RESEND_ATTEMPTS) { - LogPrint (eLogWarning, "Streaming: packet was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts, terminate"); + LogPrint (eLogWarning, "Streaming: packet was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts, terminate, sSID=", m_SendStreamID); m_Status = eStreamStatusReset; Close (); return; @@ -703,13 +703,13 @@ namespace stream case 4: if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); UpdateCurrentRemoteLease (); // pick another lease - LogPrint (eLogWarning, "Streaming: Another remote lease has been selected for stream"); + LogPrint (eLogWarning, "Streaming: Another remote lease has been selected for stream with sSID=", m_SendStreamID); break; case 3: // pick another outbound tunnel if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); - LogPrint (eLogWarning, "Streaming: Another outbound tunnel has been selected for stream"); + LogPrint (eLogWarning, "Streaming: Another outbound tunnel has been selected for stream with sSID=", m_SendStreamID); break; default: ; } @@ -725,7 +725,7 @@ namespace stream { if (m_LastReceivedSequenceNumber < 0) { - LogPrint (eLogWarning, "Streaming: SYN has not been recived after ", ACK_SEND_TIMEOUT, " milliseconds after follow on, terminate"); + LogPrint (eLogWarning, "Streaming: SYN has not been recived after ", ACK_SEND_TIMEOUT, " milliseconds after follow on, terminate sSID=", m_SendStreamID); m_Status = eStreamStatusReset; Close (); return; @@ -828,7 +828,7 @@ namespace stream it->second->HandleNextPacket (packet); else { - LogPrint (eLogError, "Streaming: Unknown stream sendStreamID=", sendStreamID); + LogPrint (eLogError, "Streaming: Unknown stream sSID=", sendStreamID); delete packet; } } @@ -844,7 +844,7 @@ namespace stream auto it = m_SavedPackets.find (receiveStreamID); if (it != m_SavedPackets.end ()) { - LogPrint (eLogDebug, "Streaming: Processing ", it->second.size (), " saved packets for receiveStreamID=", receiveStreamID); + LogPrint (eLogDebug, "Streaming: Processing ", it->second.size (), " saved packets for rSID=", receiveStreamID); for (auto it1: it->second) incomingStream->HandleNextPacket (it1); m_SavedPackets.erase (it); @@ -863,7 +863,7 @@ namespace stream m_PendingIncomingTimer.expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT)); m_PendingIncomingTimer.async_wait (std::bind (&StreamingDestination::HandlePendingIncomingTimer, shared_from_this (), std::placeholders::_1)); - LogPrint (eLogDebug, "Streaming: Pending incoming stream added"); + LogPrint (eLogDebug, "Streaming: Pending incoming stream added, rSID=", receiveStreamID); } else { diff --git a/Tunnel.cpp b/Tunnel.cpp index 0789e335..6a36501f 100644 --- a/Tunnel.cpp +++ b/Tunnel.cpp @@ -769,6 +769,22 @@ namespace tunnel return newTunnel; } + std::shared_ptr Tunnels::CreateInboundTunnel (std::shared_ptr config, std::shared_ptr outboundTunnel) + { + if (config->IsEmpty ()) + return CreateZeroHopsInboundTunnel (); + else + return CreateTunnel(config, outboundTunnel); + } + + std::shared_ptr Tunnels::CreateOutboundTunnel (std::shared_ptr config) + { + if (config->IsEmpty ()) + return CreateZeroHopsOutboundTunnel (); + else + return CreateTunnel(config); + } + void Tunnels::AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr tunnel) { m_PendingInboundTunnels[replyMsgID] = tunnel; @@ -816,20 +832,22 @@ namespace tunnel } - void Tunnels::CreateZeroHopsInboundTunnel () + std::shared_ptr Tunnels::CreateZeroHopsInboundTunnel () { auto inboundTunnel = std::make_shared (); inboundTunnel->SetState (eTunnelStateEstablished); m_InboundTunnels.push_back (inboundTunnel); m_Tunnels[inboundTunnel->GetTunnelID ()] = inboundTunnel; + return inboundTunnel; } - void Tunnels::CreateZeroHopsOutboundTunnel () + std::shared_ptr Tunnels::CreateZeroHopsOutboundTunnel () { auto outboundTunnel = std::make_shared (); outboundTunnel->SetState (eTunnelStateEstablished); m_OutboundTunnels.push_back (outboundTunnel); // we don't insert into m_Tunnels + return outboundTunnel; } int Tunnels::GetTransitTunnelsExpirationTimeout () @@ -862,12 +880,6 @@ namespace tunnel // TODO: locking return m_OutboundTunnels.size(); } - -#ifdef ANDROID_ARM7A - template std::shared_ptr Tunnels::CreateTunnel(std::shared_ptr, std::shared_ptr); - template std::shared_ptr Tunnels::CreateTunnel(std::shared_ptr, std::shared_ptr); -#endif - } } diff --git a/Tunnel.h b/Tunnel.h index d7fb2456..0d35b682 100644 --- a/Tunnel.h +++ b/Tunnel.h @@ -176,10 +176,10 @@ namespace tunnel void AddTransitTunnel (std::shared_ptr tunnel); void AddOutboundTunnel (std::shared_ptr newTunnel); void AddInboundTunnel (std::shared_ptr newTunnel); + std::shared_ptr CreateInboundTunnel (std::shared_ptr config, std::shared_ptr outboundTunnel); + std::shared_ptr CreateOutboundTunnel (std::shared_ptr config); void PostTunnelData (std::shared_ptr msg); void PostTunnelData (const std::vector >& msgs); - template - std::shared_ptr CreateTunnel (std::shared_ptr config, std::shared_ptr outboundTunnel = nullptr); void AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr tunnel); void AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr tunnel); std::shared_ptr CreateTunnelPool (int numInboundHops, @@ -189,6 +189,9 @@ namespace tunnel private: + template + std::shared_ptr CreateTunnel (std::shared_ptr config, std::shared_ptr outboundTunnel = nullptr); + template std::shared_ptr GetPendingTunnel (uint32_t replyMsgID, const std::map >& pendingTunnels); @@ -204,8 +207,8 @@ namespace tunnel void ManagePendingTunnels (PendingTunnels& pendingTunnels); void ManageTunnelPools (); - void CreateZeroHopsInboundTunnel (); - void CreateZeroHopsOutboundTunnel (); + std::shared_ptr CreateZeroHopsInboundTunnel (); + std::shared_ptr CreateZeroHopsOutboundTunnel (); private: diff --git a/TunnelConfig.h b/TunnelConfig.h index 0340254e..7546c9b2 100644 --- a/TunnelConfig.h +++ b/TunnelConfig.h @@ -159,6 +159,11 @@ namespace tunnel return num; } + bool IsEmpty () const + { + return !m_FirstHop; + } + virtual bool IsInbound () const { return m_FirstHop->isGateway; } virtual uint32_t GetTunnelID () const diff --git a/TunnelPool.cpp b/TunnelPool.cpp index ab77c9d7..443cdcfa 100644 --- a/TunnelPool.cpp +++ b/TunnelPool.cpp @@ -329,17 +329,18 @@ namespace tunnel bool TunnelPool::SelectPeers (std::vector >& peers, bool isInbound) { if (m_ExplicitPeers) return SelectExplicitPeers (peers, isInbound); - auto prevHop = i2p::context.GetSharedRouterInfo (); int numHops = isInbound ? m_NumInboundHops : m_NumOutboundHops; - if(i2p::transport::transports.RoutesRestricted()) - { - /** if routes are restricted prepend trusted first hop */ - auto hop = i2p::transport::transports.GetRestrictedPeer(); - if(!hop) return false; - peers.push_back(hop->GetRouterIdentity()); - prevHop = hop; - } - for (int i = 0; i < numHops; i++) + if (numHops <= 0) return true; + auto prevHop = i2p::context.GetSharedRouterInfo(); + if(i2p::transport::transports.RoutesRestricted()) + { + /** if routes are restricted prepend trusted first hop */ + auto hop = i2p::transport::transports.GetRestrictedPeer(); + if(!hop) return false; + peers.push_back(hop->GetRouterIdentity()); + prevHop = hop; + } + for(int i = 0; i < numHops; i++ ) { auto hop = SelectNextHop (prevHop); if (!hop) @@ -387,8 +388,10 @@ namespace tunnel if (SelectPeers (peers, true)) { std::reverse (peers.begin (), peers.end ()); - auto tunnel = tunnels.CreateTunnel (std::make_shared (peers), outboundTunnel); + auto tunnel = tunnels.CreateInboundTunnel (std::make_shared (peers), outboundTunnel); tunnel->SetTunnelPool (shared_from_this ()); + if (tunnel->IsEstablished ()) // zero hops + TunnelCreated (tunnel); } else LogPrint (eLogError, "Tunnels: Can't create inbound tunnel, no peers available"); @@ -400,8 +403,10 @@ namespace tunnel if (!outboundTunnel) outboundTunnel = tunnels.GetNextOutboundTunnel (); LogPrint (eLogDebug, "Tunnels: Re-creating destination inbound tunnel..."); - auto newTunnel = tunnels.CreateTunnel (std::make_shared(tunnel->GetPeers ()), outboundTunnel); + auto newTunnel = tunnels.CreateInboundTunnel (std::make_shared(tunnel->GetPeers ()), outboundTunnel); newTunnel->SetTunnelPool (shared_from_this()); + if (newTunnel->IsEstablished ()) // zero hops + TunnelCreated (newTunnel); } void TunnelPool::CreateOutboundTunnel () @@ -415,9 +420,11 @@ namespace tunnel std::vector > peers; if (SelectPeers (peers, false)) { - auto tunnel = tunnels.CreateTunnel ( + auto tunnel = tunnels.CreateOutboundTunnel ( std::make_shared (peers, inboundTunnel->GetNextTunnelID (), inboundTunnel->GetNextIdentHash ())); tunnel->SetTunnelPool (shared_from_this ()); + if (tunnel->IsEstablished ()) // zero hops + TunnelCreated (tunnel); } else LogPrint (eLogError, "Tunnels: Can't create outbound tunnel, no peers available"); @@ -434,10 +441,12 @@ namespace tunnel if (inboundTunnel) { LogPrint (eLogDebug, "Tunnels: Re-creating destination outbound tunnel..."); - auto newTunnel = tunnels.CreateTunnel ( + auto newTunnel = tunnels.CreateOutboundTunnel ( std::make_shared (tunnel->GetPeers (), inboundTunnel->GetNextTunnelID (), inboundTunnel->GetNextIdentHash ())); newTunnel->SetTunnelPool (shared_from_this ()); + if (newTunnel->IsEstablished ()) // zero hops + TunnelCreated (newTunnel); } else LogPrint (eLogDebug, "Tunnels: Can't re-create outbound tunnel, no inbound tunnels found"); @@ -446,7 +455,7 @@ namespace tunnel void TunnelPool::CreatePairedInboundTunnel (std::shared_ptr outboundTunnel) { LogPrint (eLogDebug, "Tunnels: Creating paired inbound tunnel..."); - auto tunnel = tunnels.CreateTunnel (std::make_shared(outboundTunnel->GetInvertedPeers ()), outboundTunnel); + auto tunnel = tunnels.CreateInboundTunnel (std::make_shared(outboundTunnel->GetInvertedPeers ()), outboundTunnel); tunnel->SetTunnelPool (shared_from_this ()); } }