Browse Source

update and show send queue size for transports

pull/1833/head
orignal 2 years ago
parent
commit
7b341d5d30
  1. 6
      daemon/HTTPServer.cpp
  2. 9
      libi2pd/NTCP2.cpp
  3. 9
      libi2pd/SSU2Session.cpp
  4. 8
      libi2pd/TransportSession.h

6
daemon/HTTPServer.cpp

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013-2022, The PurpleI2P Project * Copyright (c) 2013-2023, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
@ -805,6 +805,8 @@ namespace http {
tmp_s << " [" << it.second->GetNumSentBytes () << ":" << it.second->GetNumReceivedBytes () << "]"; tmp_s << " [" << it.second->GetNumSentBytes () << ":" << it.second->GetNumReceivedBytes () << "]";
if (it.second->GetRelayTag ()) if (it.second->GetRelayTag ())
tmp_s << " [itag:" << it.second->GetRelayTag () << "]"; tmp_s << " [itag:" << it.second->GetRelayTag () << "]";
if (it.second->GetSendQueueSize () > 0)
tmp_s << " [queue:" << it.second->GetSendQueueSize () << "]";
tmp_s << "</div>\r\n" << std::endl; tmp_s << "</div>\r\n" << std::endl;
cnt++; cnt++;
} }
@ -818,6 +820,8 @@ namespace http {
tmp_s6 << " [" << it.second->GetNumSentBytes () << ":" << it.second->GetNumReceivedBytes () << "]"; tmp_s6 << " [" << it.second->GetNumSentBytes () << ":" << it.second->GetNumReceivedBytes () << "]";
if (it.second->GetRelayTag ()) if (it.second->GetRelayTag ())
tmp_s6 << " [itag:" << it.second->GetRelayTag () << "]"; tmp_s6 << " [itag:" << it.second->GetRelayTag () << "]";
if (it.second->GetSendQueueSize () > 0)
tmp_s6 << " [queue:" << it.second->GetSendQueueSize () << "]";
tmp_s6 << "</div>\r\n" << std::endl; tmp_s6 << "</div>\r\n" << std::endl;
cnt6++; cnt6++;
} }

9
libi2pd/NTCP2.cpp

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013-2022, The PurpleI2P Project * Copyright (c) 2013-2023, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
@ -374,6 +374,7 @@ namespace transport
transports.PeerDisconnected (shared_from_this ()); transports.PeerDisconnected (shared_from_this ());
m_Server.RemoveNTCP2Session (shared_from_this ()); m_Server.RemoveNTCP2Session (shared_from_this ());
m_SendQueue.clear (); m_SendQueue.clear ();
m_SendQueueSize = 0;
LogPrint (eLogDebug, "NTCP2: Session terminated"); LogPrint (eLogDebug, "NTCP2: Session terminated");
} }
} }
@ -1057,7 +1058,10 @@ namespace transport
SendRouterInfo (); SendRouterInfo ();
} }
else else
{
SendQueue (); SendQueue ();
m_SendQueueSize = m_SendQueue.size ();
}
} }
} }
@ -1165,7 +1169,7 @@ namespace transport
{ {
if (m_IsTerminated) return; if (m_IsTerminated) return;
for (auto it: msgs) for (auto it: msgs)
m_SendQueue.push_back (it); m_SendQueue.push_back (std::move (it));
if (!m_IsSending) if (!m_IsSending)
SendQueue (); SendQueue ();
else if (m_SendQueue.size () > NTCP2_MAX_OUTGOING_QUEUE_SIZE) else if (m_SendQueue.size () > NTCP2_MAX_OUTGOING_QUEUE_SIZE)
@ -1174,6 +1178,7 @@ namespace transport
GetIdentHashBase64(), " exceeds ", NTCP2_MAX_OUTGOING_QUEUE_SIZE); GetIdentHashBase64(), " exceeds ", NTCP2_MAX_OUTGOING_QUEUE_SIZE);
Terminate (); Terminate ();
} }
m_SendQueueSize = m_SendQueue.size ();
} }
void NTCP2Session::SendLocalRouterInfo (bool update) void NTCP2Session::SendLocalRouterInfo (bool update)

9
libi2pd/SSU2Session.cpp

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2022, The PurpleI2P Project * Copyright (c) 2023, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
@ -216,6 +216,7 @@ namespace transport
m_SessionConfirmedFragment.reset (nullptr); m_SessionConfirmedFragment.reset (nullptr);
m_PathChallenge.reset (nullptr); m_PathChallenge.reset (nullptr);
m_SendQueue.clear (); m_SendQueue.clear ();
m_SendQueueSize = 0;
m_SentPackets.clear (); m_SentPackets.clear ();
m_IncompleteMessages.clear (); m_IncompleteMessages.clear ();
m_RelaySessions.clear (); m_RelaySessions.clear ();
@ -290,7 +291,7 @@ namespace transport
{ {
if (m_State == eSSU2SessionStateTerminated) return; if (m_State == eSSU2SessionStateTerminated) return;
for (auto it: msgs) for (auto it: msgs)
m_SendQueue.push_back (it); m_SendQueue.push_back (std::move (it));
SendQueue (); SendQueue ();
if (m_SendQueue.size () > 0) // windows is full if (m_SendQueue.size () > 0) // windows is full
@ -304,6 +305,7 @@ namespace transport
RequestTermination (eSSU2TerminationReasonTimeout); RequestTermination (eSSU2TerminationReasonTimeout);
} }
} }
m_SendQueueSize = m_SendQueue.size ();
} }
bool SSU2Session::SendQueue () bool SSU2Session::SendQueue ()
@ -463,6 +465,7 @@ namespace transport
LogPrint (eLogInfo, "SSU2: Packet was not Acked after ", it->second->numResends, " attempts. Terminate session"); LogPrint (eLogInfo, "SSU2: Packet was not Acked after ", it->second->numResends, " attempts. Terminate session");
m_SentPackets.clear (); m_SentPackets.clear ();
m_SendQueue.clear (); m_SendQueue.clear ();
m_SendQueueSize = 0;
RequestTermination (eSSU2TerminationReasonTimeout); RequestTermination (eSSU2TerminationReasonTimeout);
return resentPackets.size (); return resentPackets.size ();
} }
@ -2838,6 +2841,8 @@ namespace transport
void SSU2Session::FlushData () void SSU2Session::FlushData ()
{ {
bool sent = SendQueue (); // if we have something to send bool sent = SendQueue (); // if we have something to send
if (sent)
m_SendQueueSize = m_SendQueue.size ();
if (m_IsDataReceived) if (m_IsDataReceived)
{ {
if (!sent) SendQuickAck (); if (!sent) SendQuickAck ();

8
libi2pd/TransportSession.h

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013-2022, The PurpleI2P Project * Copyright (c) 2013-2023, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
@ -74,7 +74,8 @@ namespace transport
public: public:
TransportSession (std::shared_ptr<const i2p::data::RouterInfo> router, int terminationTimeout): TransportSession (std::shared_ptr<const i2p::data::RouterInfo> router, int terminationTimeout):
m_NumSentBytes (0), m_NumReceivedBytes (0), m_IsOutgoing (router), m_TerminationTimeout (terminationTimeout), m_NumSentBytes (0), m_NumReceivedBytes (0), m_SendQueueSize (0),
m_IsOutgoing (router), m_TerminationTimeout (terminationTimeout),
m_LastActivityTimestamp (i2p::util::GetSecondsSinceEpoch ()) m_LastActivityTimestamp (i2p::util::GetSecondsSinceEpoch ())
{ {
if (router) if (router)
@ -100,6 +101,7 @@ namespace transport
size_t GetNumSentBytes () const { return m_NumSentBytes; }; size_t GetNumSentBytes () const { return m_NumSentBytes; };
size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; }; size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; };
size_t GetSendQueueSize () const { return m_SendQueueSize; };
bool IsOutgoing () const { return m_IsOutgoing; }; bool IsOutgoing () const { return m_IsOutgoing; };
int GetTerminationTimeout () const { return m_TerminationTimeout; }; int GetTerminationTimeout () const { return m_TerminationTimeout; };
@ -119,7 +121,7 @@ namespace transport
std::shared_ptr<const i2p::data::IdentityEx> m_RemoteIdentity; std::shared_ptr<const i2p::data::IdentityEx> m_RemoteIdentity;
mutable std::mutex m_RemoteIdentityMutex; mutable std::mutex m_RemoteIdentityMutex;
size_t m_NumSentBytes, m_NumReceivedBytes; size_t m_NumSentBytes, m_NumReceivedBytes, m_SendQueueSize;
bool m_IsOutgoing; bool m_IsOutgoing;
int m_TerminationTimeout; int m_TerminationTimeout;
uint64_t m_LastActivityTimestamp; uint64_t m_LastActivityTimestamp;

Loading…
Cancel
Save