From 14cdb531c8c6a00629769133898fafd23d22be93 Mon Sep 17 00:00:00 2001 From: hagen Date: Wed, 29 Jun 2016 01:00:00 +0000 Subject: [PATCH 1/2] * Streaming.cpp : tune logs --- Streaming.cpp | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index e5260556..2d3c92ce 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -87,7 +87,7 @@ namespace stream return; } - LogPrint (eLogDebug, "Streaming: Received seqn=", receivedSeqn); + LogPrint (eLogDebug, "Streaming: Received seqn=", receivedSeqn, " on sSID=", m_SendStreamID); if (receivedSeqn == m_LastReceivedSequenceNumber + 1) { // we have received next in sequence message @@ -129,13 +129,13 @@ namespace stream if (receivedSeqn <= m_LastReceivedSequenceNumber) { // we have received duplicate - LogPrint (eLogWarning, "Streaming: Duplicate message ", receivedSeqn, " received"); + LogPrint (eLogWarning, "Streaming: Duplicate message ", receivedSeqn, " on sSID=", m_SendStreamID); SendQuickAck (); // resend ack for previous message again delete packet; // packet dropped } else { - LogPrint (eLogWarning, "Streaming: Missing messages from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); + LogPrint (eLogWarning, "Streaming: Missing messages on sSID=", m_SendStreamID, ": from ", m_LastReceivedSequenceNumber + 1, " to ", receivedSeqn - 1); // save message and wait for missing message again SavePacket (packet); if (m_LastReceivedSequenceNumber >= 0) @@ -183,7 +183,7 @@ namespace stream m_RemoteIdentity = std::make_shared(optionData, packet->GetOptionSize ()); optionData += m_RemoteIdentity->GetFullLen (); if (!m_RemoteLeaseSet) - LogPrint (eLogDebug, "Streaming: Incoming stream from ", m_RemoteIdentity->GetIdentHash ().ToBase64 ()); + LogPrint (eLogDebug, "Streaming: Incoming stream from ", m_RemoteIdentity->GetIdentHash ().ToBase64 (), ", sSID=", m_SendStreamID, ", rSID=", m_RecvStreamID); } if (flags & PACKET_FLAG_MAX_PACKET_SIZE_INCLUDED) @@ -263,7 +263,7 @@ namespace stream uint64_t rtt = ts - sentPacket->sendTime; m_RTT = (m_RTT*seqn + rtt)/(seqn + 1); m_RTO = m_RTT*1.5; // TODO: implement it better - LogPrint (eLogDebug, "Packet ", seqn, " acknowledged rtt=", rtt); + LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt); m_SentPackets.erase (it++); delete sentPacket; acknowledged = true; @@ -451,7 +451,7 @@ namespace stream auto seqn = it->GetSeqn (); if (numNacks + (seqn - nextSeqn) >= 256) { - LogPrint (eLogError, "Number of NACKs exceeds 256. seqn=", seqn, " nextSeqn=", nextSeqn); + LogPrint (eLogError, "Streaming: Number of NACKs exceeds 256. seqn=", seqn, " nextSeqn=", nextSeqn); htobe32buf (packet + 12, nextSeqn); // change ack Through break; } @@ -492,7 +492,7 @@ namespace stream m_Status = eStreamStatusClosing; Close (); // recursion if (m_Status == eStreamStatusClosing) //still closing - LogPrint (eLogInfo, "Streaming: Trying to send stream data before closing"); + LogPrint (eLogDebug, "Streaming: Trying to send stream data before closing, sSID=", m_SendStreamID); break; case eStreamStatusReset: SendClose (); @@ -514,7 +514,7 @@ namespace stream m_LocalDestination.DeleteStream (shared_from_this ()); break; default: - LogPrint (eLogWarning, "Streaming: Unexpected stream status ", (int)m_Status); + LogPrint (eLogWarning, "Streaming: Unexpected stream status ", (int)m_Status, "sSID=", m_SendStreamID); }; } @@ -546,7 +546,7 @@ namespace stream p->len = size; m_Service.post (std::bind (&Stream::SendPacket, shared_from_this (), p)); - LogPrint (eLogDebug, "Streaming: FIN sent"); + LogPrint (eLogDebug, "Streaming: FIN sent, sSID=", m_SendStreamID); } size_t Stream::ConcatenatePackets (uint8_t * buf, size_t len) @@ -600,7 +600,7 @@ namespace stream UpdateCurrentRemoteLease (); if (!m_RemoteLeaseSet) { - LogPrint (eLogError, "Streaming: Can't send packets, missing remote LeaseSet"); + LogPrint (eLogError, "Streaming: Can't send packets, missing remote LeaseSet, sSID=", m_SendStreamID); return; } } @@ -625,7 +625,7 @@ namespace stream m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNewOutboundTunnel (m_CurrentOutboundTunnel); if (!m_CurrentOutboundTunnel) { - LogPrint (eLogError, "Streaming: No outbound tunnels in the pool"); + LogPrint (eLogError, "Streaming: No outbound tunnels in the pool, sSID=", m_SendStreamID); return; } @@ -649,7 +649,7 @@ namespace stream m_CurrentOutboundTunnel->SendTunnelDataMsg (msgs); } else - LogPrint (eLogWarning, "Streaming: All leases are expired"); + LogPrint (eLogWarning, "Streaming: All leases are expired, sSID=", m_SendStreamID); } @@ -668,7 +668,7 @@ namespace stream // check for resend attempts if (m_NumResendAttempts >= MAX_NUM_RESEND_ATTEMPTS) { - LogPrint (eLogWarning, "Streaming: packet was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts, terminate"); + LogPrint (eLogWarning, "Streaming: packet was not ACKed after ", MAX_NUM_RESEND_ATTEMPTS, " attempts, terminate, sSID=", m_SendStreamID); m_Status = eStreamStatusReset; Close (); return; @@ -703,13 +703,13 @@ namespace stream case 4: if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); UpdateCurrentRemoteLease (); // pick another lease - LogPrint (eLogWarning, "Streaming: Another remote lease has been selected for stream"); + LogPrint (eLogWarning, "Streaming: Another remote lease has been selected for stream with sSID=", m_SendStreamID); break; case 3: // pick another outbound tunnel if (m_RoutingSession) m_RoutingSession->SetSharedRoutingPath (nullptr); m_CurrentOutboundTunnel = m_LocalDestination.GetOwner ()->GetTunnelPool ()->GetNextOutboundTunnel (m_CurrentOutboundTunnel); - LogPrint (eLogWarning, "Streaming: Another outbound tunnel has been selected for stream"); + LogPrint (eLogWarning, "Streaming: Another outbound tunnel has been selected for stream with sSID=", m_SendStreamID); break; default: ; } @@ -725,7 +725,7 @@ namespace stream { if (m_LastReceivedSequenceNumber < 0) { - LogPrint (eLogWarning, "Streaming: SYN has not been recived after ", ACK_SEND_TIMEOUT, " milliseconds after follow on, terminate"); + LogPrint (eLogWarning, "Streaming: SYN has not been recived after ", ACK_SEND_TIMEOUT, " milliseconds after follow on, terminate sSID=", m_SendStreamID); m_Status = eStreamStatusReset; Close (); return; @@ -828,7 +828,7 @@ namespace stream it->second->HandleNextPacket (packet); else { - LogPrint (eLogError, "Streaming: Unknown stream sendStreamID=", sendStreamID); + LogPrint (eLogError, "Streaming: Unknown stream sSID=", sendStreamID); delete packet; } } @@ -844,7 +844,7 @@ namespace stream auto it = m_SavedPackets.find (receiveStreamID); if (it != m_SavedPackets.end ()) { - LogPrint (eLogDebug, "Streaming: Processing ", it->second.size (), " saved packets for receiveStreamID=", receiveStreamID); + LogPrint (eLogDebug, "Streaming: Processing ", it->second.size (), " saved packets for rSID=", receiveStreamID); for (auto it1: it->second) incomingStream->HandleNextPacket (it1); m_SavedPackets.erase (it); @@ -863,7 +863,7 @@ namespace stream m_PendingIncomingTimer.expires_from_now (boost::posix_time::seconds(PENDING_INCOMING_TIMEOUT)); m_PendingIncomingTimer.async_wait (std::bind (&StreamingDestination::HandlePendingIncomingTimer, shared_from_this (), std::placeholders::_1)); - LogPrint (eLogDebug, "Streaming: Pending incoming stream added"); + LogPrint (eLogDebug, "Streaming: Pending incoming stream added, rSID=", receiveStreamID); } else { From f6e988d6fdca673104b43461d92f723ea9072911 Mon Sep 17 00:00:00 2001 From: orignal Date: Wed, 29 Jun 2016 11:26:46 -0400 Subject: [PATCH 2/2] support zero-hops tunnels for destinations --- Tunnel.cpp | 28 ++++++++++++++++++++-------- Tunnel.h | 11 +++++++---- TunnelConfig.h | 5 +++++ TunnelPool.cpp | 21 +++++++++++++++------ 4 files changed, 47 insertions(+), 18 deletions(-) diff --git a/Tunnel.cpp b/Tunnel.cpp index 961cc97e..478b57bc 100644 --- a/Tunnel.cpp +++ b/Tunnel.cpp @@ -768,6 +768,22 @@ namespace tunnel return newTunnel; } + std::shared_ptr Tunnels::CreateInboundTunnel (std::shared_ptr config, std::shared_ptr outboundTunnel) + { + if (config->IsEmpty ()) + return CreateZeroHopsInboundTunnel (); + else + return CreateTunnel(config, outboundTunnel); + } + + std::shared_ptr Tunnels::CreateOutboundTunnel (std::shared_ptr config) + { + if (config->IsEmpty ()) + return CreateZeroHopsOutboundTunnel (); + else + return CreateTunnel(config); + } + void Tunnels::AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr tunnel) { m_PendingInboundTunnels[replyMsgID] = tunnel; @@ -815,20 +831,22 @@ namespace tunnel } - void Tunnels::CreateZeroHopsInboundTunnel () + std::shared_ptr Tunnels::CreateZeroHopsInboundTunnel () { auto inboundTunnel = std::make_shared (); inboundTunnel->SetState (eTunnelStateEstablished); m_InboundTunnels.push_back (inboundTunnel); m_Tunnels[inboundTunnel->GetTunnelID ()] = inboundTunnel; + return inboundTunnel; } - void Tunnels::CreateZeroHopsOutboundTunnel () + std::shared_ptr Tunnels::CreateZeroHopsOutboundTunnel () { auto outboundTunnel = std::make_shared (); outboundTunnel->SetState (eTunnelStateEstablished); m_OutboundTunnels.push_back (outboundTunnel); // we don't insert into m_Tunnels + return outboundTunnel; } int Tunnels::GetTransitTunnelsExpirationTimeout () @@ -861,12 +879,6 @@ namespace tunnel // TODO: locking return m_OutboundTunnels.size(); } - -#ifdef ANDROID_ARM7A - template std::shared_ptr Tunnels::CreateTunnel(std::shared_ptr, std::shared_ptr); - template std::shared_ptr Tunnels::CreateTunnel(std::shared_ptr, std::shared_ptr); -#endif - } } diff --git a/Tunnel.h b/Tunnel.h index 43417e5d..5bc8b195 100644 --- a/Tunnel.h +++ b/Tunnel.h @@ -176,10 +176,10 @@ namespace tunnel void AddTransitTunnel (std::shared_ptr tunnel); void AddOutboundTunnel (std::shared_ptr newTunnel); void AddInboundTunnel (std::shared_ptr newTunnel); + std::shared_ptr CreateInboundTunnel (std::shared_ptr config, std::shared_ptr outboundTunnel); + std::shared_ptr CreateOutboundTunnel (std::shared_ptr config); void PostTunnelData (std::shared_ptr msg); void PostTunnelData (const std::vector >& msgs); - template - std::shared_ptr CreateTunnel (std::shared_ptr config, std::shared_ptr outboundTunnel = nullptr); void AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr tunnel); void AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr tunnel); std::shared_ptr CreateTunnelPool (int numInboundHops, @@ -189,6 +189,9 @@ namespace tunnel private: + template + std::shared_ptr CreateTunnel (std::shared_ptr config, std::shared_ptr outboundTunnel = nullptr); + template std::shared_ptr GetPendingTunnel (uint32_t replyMsgID, const std::map >& pendingTunnels); @@ -204,8 +207,8 @@ namespace tunnel void ManagePendingTunnels (PendingTunnels& pendingTunnels); void ManageTunnelPools (); - void CreateZeroHopsInboundTunnel (); - void CreateZeroHopsOutboundTunnel (); + std::shared_ptr CreateZeroHopsInboundTunnel (); + std::shared_ptr CreateZeroHopsOutboundTunnel (); private: diff --git a/TunnelConfig.h b/TunnelConfig.h index 0340254e..7546c9b2 100644 --- a/TunnelConfig.h +++ b/TunnelConfig.h @@ -159,6 +159,11 @@ namespace tunnel return num; } + bool IsEmpty () const + { + return !m_FirstHop; + } + virtual bool IsInbound () const { return m_FirstHop->isGateway; } virtual uint32_t GetTunnelID () const diff --git a/TunnelPool.cpp b/TunnelPool.cpp index 5e7e8ec4..92e5f6ff 100644 --- a/TunnelPool.cpp +++ b/TunnelPool.cpp @@ -329,8 +329,9 @@ namespace tunnel bool TunnelPool::SelectPeers (std::vector >& peers, bool isInbound) { if (m_ExplicitPeers) return SelectExplicitPeers (peers, isInbound); - auto prevHop = i2p::context.GetSharedRouterInfo (); int numHops = isInbound ? m_NumInboundHops : m_NumOutboundHops; + if (numHops <= 0) return true; // peers is empty + auto prevHop = i2p::context.GetSharedRouterInfo (); if (i2p::transport::transports.GetNumPeers () > 25) { auto r = i2p::transport::transports.GetRandomPeer (); @@ -390,8 +391,10 @@ namespace tunnel if (SelectPeers (peers, true)) { std::reverse (peers.begin (), peers.end ()); - auto tunnel = tunnels.CreateTunnel (std::make_shared (peers), outboundTunnel); + auto tunnel = tunnels.CreateInboundTunnel (std::make_shared (peers), outboundTunnel); tunnel->SetTunnelPool (shared_from_this ()); + if (tunnel->IsEstablished ()) // zero hops + TunnelCreated (tunnel); } else LogPrint (eLogError, "Tunnels: Can't create inbound tunnel, no peers available"); @@ -403,8 +406,10 @@ namespace tunnel if (!outboundTunnel) outboundTunnel = tunnels.GetNextOutboundTunnel (); LogPrint (eLogDebug, "Tunnels: Re-creating destination inbound tunnel..."); - auto newTunnel = tunnels.CreateTunnel (std::make_shared(tunnel->GetPeers ()), outboundTunnel); + auto newTunnel = tunnels.CreateInboundTunnel (std::make_shared(tunnel->GetPeers ()), outboundTunnel); newTunnel->SetTunnelPool (shared_from_this()); + if (newTunnel->IsEstablished ()) // zero hops + TunnelCreated (newTunnel); } void TunnelPool::CreateOutboundTunnel () @@ -418,9 +423,11 @@ namespace tunnel std::vector > peers; if (SelectPeers (peers, false)) { - auto tunnel = tunnels.CreateTunnel ( + auto tunnel = tunnels.CreateOutboundTunnel ( std::make_shared (peers, inboundTunnel->GetNextTunnelID (), inboundTunnel->GetNextIdentHash ())); tunnel->SetTunnelPool (shared_from_this ()); + if (tunnel->IsEstablished ()) // zero hops + TunnelCreated (tunnel); } else LogPrint (eLogError, "Tunnels: Can't create outbound tunnel, no peers available"); @@ -437,10 +444,12 @@ namespace tunnel if (inboundTunnel) { LogPrint (eLogDebug, "Tunnels: Re-creating destination outbound tunnel..."); - auto newTunnel = tunnels.CreateTunnel ( + auto newTunnel = tunnels.CreateOutboundTunnel ( std::make_shared (tunnel->GetPeers (), inboundTunnel->GetNextTunnelID (), inboundTunnel->GetNextIdentHash ())); newTunnel->SetTunnelPool (shared_from_this ()); + if (newTunnel->IsEstablished ()) // zero hops + TunnelCreated (newTunnel); } else LogPrint (eLogDebug, "Tunnels: Can't re-create outbound tunnel, no inbound tunnels found"); @@ -449,7 +458,7 @@ namespace tunnel void TunnelPool::CreatePairedInboundTunnel (std::shared_ptr outboundTunnel) { LogPrint (eLogDebug, "Tunnels: Creating paired inbound tunnel..."); - auto tunnel = tunnels.CreateTunnel (std::make_shared(outboundTunnel->GetInvertedPeers ()), outboundTunnel); + auto tunnel = tunnels.CreateInboundTunnel (std::make_shared(outboundTunnel->GetInvertedPeers ()), outboundTunnel); tunnel->SetTunnelPool (shared_from_this ()); } }