Browse Source

lock queue's mutex less often

pull/2108/head
orignal 1 month ago
parent
commit
c86e0ec371
  1. 15
      libi2pd/Queue.h
  2. 26
      libi2pd/Tunnel.cpp

15
libi2pd/Queue.h

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013-2020, The PurpleI2P Project * Copyright (c) 2013-2024, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
@ -107,6 +107,19 @@ namespace util
return GetNonThreadSafe (true); return GetNonThreadSafe (true);
} }
void GetWholeQueue (std::queue<Element>& queue)
{
if (!queue.empty ())
{
std::queue<Element> newQueue;
queue.swap (newQueue);
}
{
std::unique_lock<std::mutex> l(m_QueueMutex);
m_Queue.swap (queue);
}
}
private: private:
Element GetNonThreadSafe (bool peek = false) Element GetNonThreadSafe (bool peek = false)

26
libi2pd/Tunnel.cpp

@ -483,14 +483,17 @@ namespace tunnel
{ {
try try
{ {
auto msg = m_Queue.GetNextWithTimeout (1000); // 1 sec std::queue <std::shared_ptr<I2NPMessage> > msgs;
if (msg) if (m_Queue.Wait (1,0)) // 1 sec
{ {
m_Queue.GetWholeQueue (msgs);
int numMsgs = 0; int numMsgs = 0;
uint32_t prevTunnelID = 0, tunnelID = 0; uint32_t prevTunnelID = 0, tunnelID = 0;
std::shared_ptr<TunnelBase> prevTunnel; std::shared_ptr<TunnelBase> prevTunnel;
do while (!msgs.empty ())
{ {
auto msg = msgs.front (); msgs.pop ();
if (!msg) continue;
std::shared_ptr<TunnelBase> tunnel; std::shared_ptr<TunnelBase> tunnel;
uint8_t typeID = msg->GetTypeID (); uint8_t typeID = msg->GetTypeID ();
switch (typeID) switch (typeID)
@ -530,17 +533,18 @@ namespace tunnel
LogPrint (eLogWarning, "Tunnel: Unexpected message type ", (int) typeID); LogPrint (eLogWarning, "Tunnel: Unexpected message type ", (int) typeID);
} }
msg = (numMsgs <= MAX_TUNNEL_MSGS_BATCH_SIZE) ? m_Queue.Get () : nullptr; prevTunnelID = tunnelID;
if (msg) prevTunnel = tunnel;
numMsgs++;
if (msgs.empty ())
{ {
prevTunnelID = tunnelID; if (numMsgs < MAX_TUNNEL_MSGS_BATCH_SIZE && !m_Queue.IsEmpty ())
prevTunnel = tunnel; m_Queue.GetWholeQueue (msgs); // try more
numMsgs++; else if (tunnel)
tunnel->FlushTunnelDataMsgs (); // otherwise flush last
} }
else if (tunnel)
tunnel->FlushTunnelDataMsgs ();
} }
while (msg);
} }
if (i2p::transport::transports.IsOnline()) if (i2p::transport::transports.IsOnline())

Loading…
Cancel
Save