Browse Source

use std::list and splice fr msg queue

pull/2108/head
orignal 1 month ago
parent
commit
8210911bc5
  1. 6
      libi2pd/I2NPProtocol.cpp
  2. 3
      libi2pd/I2NPProtocol.h
  3. 4
      libi2pd/NetDb.cpp
  4. 27
      libi2pd/Queue.h
  5. 6
      libi2pd/Tunnel.cpp
  6. 2
      libi2pd/Tunnel.h

6
libi2pd/I2NPProtocol.cpp

@ -932,14 +932,8 @@ namespace i2p
void I2NPMessagesHandler::Flush () void I2NPMessagesHandler::Flush ()
{ {
if (!m_TunnelMsgs.empty ()) if (!m_TunnelMsgs.empty ())
{
i2p::tunnel::tunnels.PostTunnelData (m_TunnelMsgs); i2p::tunnel::tunnels.PostTunnelData (m_TunnelMsgs);
m_TunnelMsgs.clear ();
}
if (!m_TunnelGatewayMsgs.empty ()) if (!m_TunnelGatewayMsgs.empty ())
{
i2p::tunnel::tunnels.PostTunnelData (m_TunnelGatewayMsgs); i2p::tunnel::tunnels.PostTunnelData (m_TunnelGatewayMsgs);
m_TunnelGatewayMsgs.clear ();
}
} }
} }

3
libi2pd/I2NPProtocol.h

@ -13,6 +13,7 @@
#include <string.h> #include <string.h>
#include <unordered_set> #include <unordered_set>
#include <memory> #include <memory>
#include <list>
#include <functional> #include <functional>
#include "Crypto.h" #include "Crypto.h"
#include "I2PEndian.h" #include "I2PEndian.h"
@ -328,7 +329,7 @@ namespace tunnel
private: private:
std::vector<std::shared_ptr<I2NPMessage> > m_TunnelMsgs, m_TunnelGatewayMsgs; std::list<std::shared_ptr<I2NPMessage> > m_TunnelMsgs, m_TunnelGatewayMsgs;
}; };
} }

4
libi2pd/NetDb.cpp

@ -122,7 +122,7 @@ namespace data
uint64_t lastProfilesCleanup = i2p::util::GetMonotonicMilliseconds (), lastObsoleteProfilesCleanup = lastProfilesCleanup; uint64_t lastProfilesCleanup = i2p::util::GetMonotonicMilliseconds (), lastObsoleteProfilesCleanup = lastProfilesCleanup;
int16_t profilesCleanupVariance = 0, obsoleteProfilesCleanVariance = 0; int16_t profilesCleanupVariance = 0, obsoleteProfilesCleanVariance = 0;
std::queue <std::shared_ptr<const I2NPMessage> > msgs; std::list<std::shared_ptr<const I2NPMessage> > msgs;
while (m_IsRunning) while (m_IsRunning)
{ {
try try
@ -132,7 +132,7 @@ namespace data
m_Queue.GetWholeQueue (msgs); m_Queue.GetWholeQueue (msgs);
while (!msgs.empty ()) while (!msgs.empty ())
{ {
auto msg = msgs.front (); msgs.pop (); auto msg = msgs.front (); msgs.pop_front ();
if (!msg) continue; if (!msg) continue;
LogPrint(eLogDebug, "NetDb: Got request with type ", (int) msg->GetTypeID ()); LogPrint(eLogDebug, "NetDb: Got request with type ", (int) msg->GetTypeID ());
switch (msg->GetTypeID ()) switch (msg->GetTypeID ())

27
libi2pd/Queue.h

@ -9,8 +9,7 @@
#ifndef QUEUE_H__ #ifndef QUEUE_H__
#define QUEUE_H__ #define QUEUE_H__
#include <queue> #include <list>
#include <vector>
#include <mutex> #include <mutex>
#include <thread> #include <thread>
#include <condition_variable> #include <condition_variable>
@ -29,22 +28,20 @@ namespace util
void Put (Element e) void Put (Element e)
{ {
std::unique_lock<std::mutex> l(m_QueueMutex); std::unique_lock<std::mutex> l(m_QueueMutex);
m_Queue.push (std::move(e)); m_Queue.push_back (std::move(e));
m_NonEmpty.notify_one (); m_NonEmpty.notify_one ();
} }
template<template<typename, typename...>class Container, typename... R> void Put (std::list<Element>& list)
void Put (const Container<Element, R...>& vec)
{ {
if (!vec.empty ()) if (!list.empty ())
{ {
std::unique_lock<std::mutex> l(m_QueueMutex); std::unique_lock<std::mutex> l(m_QueueMutex);
for (const auto& it: vec) m_Queue.splice (m_Queue.end (), list);
m_Queue.push (std::move(it));
m_NonEmpty.notify_one (); m_NonEmpty.notify_one ();
} }
} }
Element GetNext () Element GetNext ()
{ {
std::unique_lock<std::mutex> l(m_QueueMutex); std::unique_lock<std::mutex> l(m_QueueMutex);
@ -107,11 +104,11 @@ namespace util
return GetNonThreadSafe (true); return GetNonThreadSafe (true);
} }
void GetWholeQueue (std::queue<Element>& queue) void GetWholeQueue (std::list<Element>& queue)
{ {
if (!queue.empty ()) if (!queue.empty ())
{ {
std::queue<Element> newQueue; std::list<Element> newQueue;
queue.swap (newQueue); queue.swap (newQueue);
} }
{ {
@ -128,7 +125,7 @@ namespace util
{ {
auto el = m_Queue.front (); auto el = m_Queue.front ();
if (!peek) if (!peek)
m_Queue.pop (); m_Queue.pop_front ();
return el; return el;
} }
return nullptr; return nullptr;
@ -136,7 +133,7 @@ namespace util
private: private:
std::queue<Element> m_Queue; std::list<Element> m_Queue;
std::mutex m_QueueMutex; std::mutex m_QueueMutex;
std::condition_variable m_NonEmpty; std::condition_variable m_NonEmpty;
}; };

6
libi2pd/Tunnel.cpp

@ -479,7 +479,7 @@ namespace tunnel
std::this_thread::sleep_for (std::chrono::seconds(1)); // wait for other parts are ready std::this_thread::sleep_for (std::chrono::seconds(1)); // wait for other parts are ready
uint64_t lastTs = 0, lastPoolsTs = 0, lastMemoryPoolTs = 0; uint64_t lastTs = 0, lastPoolsTs = 0, lastMemoryPoolTs = 0;
std::queue <std::shared_ptr<I2NPMessage> > msgs; std::list<std::shared_ptr<I2NPMessage> > msgs;
while (m_IsRunning) while (m_IsRunning)
{ {
try try
@ -492,7 +492,7 @@ namespace tunnel
std::shared_ptr<TunnelBase> prevTunnel; std::shared_ptr<TunnelBase> prevTunnel;
while (!msgs.empty ()) while (!msgs.empty ())
{ {
auto msg = msgs.front (); msgs.pop (); auto msg = msgs.front (); msgs.pop_front ();
if (!msg) continue; if (!msg) continue;
std::shared_ptr<TunnelBase> tunnel; std::shared_ptr<TunnelBase> tunnel;
uint8_t typeID = msg->GetTypeID (); uint8_t typeID = msg->GetTypeID ();
@ -830,7 +830,7 @@ namespace tunnel
if (msg) m_Queue.Put (msg); if (msg) m_Queue.Put (msg);
} }
void Tunnels::PostTunnelData (const std::vector<std::shared_ptr<I2NPMessage> >& msgs) void Tunnels::PostTunnelData (std::list<std::shared_ptr<I2NPMessage> >& msgs)
{ {
m_Queue.Put (msgs); m_Queue.Put (msgs);
} }

2
libi2pd/Tunnel.h

@ -229,7 +229,7 @@ namespace tunnel
std::shared_ptr<InboundTunnel> CreateInboundTunnel (std::shared_ptr<TunnelConfig> config, std::shared_ptr<TunnelPool> pool, std::shared_ptr<OutboundTunnel> outboundTunnel); std::shared_ptr<InboundTunnel> CreateInboundTunnel (std::shared_ptr<TunnelConfig> config, std::shared_ptr<TunnelPool> pool, std::shared_ptr<OutboundTunnel> outboundTunnel);
std::shared_ptr<OutboundTunnel> CreateOutboundTunnel (std::shared_ptr<TunnelConfig> config, std::shared_ptr<TunnelPool> pool); std::shared_ptr<OutboundTunnel> CreateOutboundTunnel (std::shared_ptr<TunnelConfig> config, std::shared_ptr<TunnelPool> pool);
void PostTunnelData (std::shared_ptr<I2NPMessage> msg); void PostTunnelData (std::shared_ptr<I2NPMessage> msg);
void PostTunnelData (const std::vector<std::shared_ptr<I2NPMessage> >& msgs); void PostTunnelData (std::list<std::shared_ptr<I2NPMessage> >& msgs); // and cleanup msgs
void AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr<InboundTunnel> tunnel); void AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr<InboundTunnel> tunnel);
void AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr<OutboundTunnel> tunnel); void AddPendingTunnel (uint32_t replyMsgID, std::shared_ptr<OutboundTunnel> tunnel);
std::shared_ptr<TunnelPool> CreateTunnelPool (int numInboundHops, std::shared_ptr<TunnelPool> CreateTunnelPool (int numInboundHops,

Loading…
Cancel
Save