Browse Source

more generic queue

pull/210/head
orignal 10 years ago
parent
commit
465945f8a8
  1. 2
      NetDb.h
  2. 30
      Queue.h
  3. 2
      Tunnel.h

2
NetDb.h

@ -89,7 +89,7 @@ namespace data
bool m_IsRunning; bool m_IsRunning;
std::thread * m_Thread; std::thread * m_Thread;
i2p::util::Queue<I2NPMessage> m_Queue; // of I2NPDatabaseStoreMsg i2p::util::Queue<I2NPMessage *> m_Queue; // of I2NPDatabaseStoreMsg
Reseeder * m_Reseeder; Reseeder * m_Reseeder;

30
Queue.h

@ -17,14 +17,14 @@ namespace util
{ {
public: public:
void Put (Element * e) void Put (Element e)
{ {
std::unique_lock<std::mutex> l(m_QueueMutex); std::unique_lock<std::mutex> l(m_QueueMutex);
m_Queue.push (e); m_Queue.push (e);
m_NonEmpty.notify_one (); m_NonEmpty.notify_one ();
} }
void Put (const std::vector<Element *>& vec) void Put (const std::vector<Element>& vec)
{ {
if (!vec.empty ()) if (!vec.empty ())
{ {
@ -35,10 +35,10 @@ namespace util
} }
} }
Element * GetNext () Element GetNext ()
{ {
std::unique_lock<std::mutex> l(m_QueueMutex); std::unique_lock<std::mutex> l(m_QueueMutex);
Element * el = GetNonThreadSafe (); auto el = GetNonThreadSafe ();
if (!el) if (!el)
{ {
m_NonEmpty.wait (l); m_NonEmpty.wait (l);
@ -47,10 +47,10 @@ namespace util
return el; return el;
} }
Element * GetNextWithTimeout (int usec) Element GetNextWithTimeout (int usec)
{ {
std::unique_lock<std::mutex> l(m_QueueMutex); std::unique_lock<std::mutex> l(m_QueueMutex);
Element * el = GetNonThreadSafe (); auto el = GetNonThreadSafe ();
if (!el) if (!el)
{ {
m_NonEmpty.wait_for (l, std::chrono::milliseconds (usec)); m_NonEmpty.wait_for (l, std::chrono::milliseconds (usec));
@ -85,13 +85,13 @@ namespace util
void WakeUp () { m_NonEmpty.notify_all (); }; void WakeUp () { m_NonEmpty.notify_all (); };
Element * Get () Element Get ()
{ {
std::unique_lock<std::mutex> l(m_QueueMutex); std::unique_lock<std::mutex> l(m_QueueMutex);
return GetNonThreadSafe (); return GetNonThreadSafe ();
} }
Element * Peek () Element Peek ()
{ {
std::unique_lock<std::mutex> l(m_QueueMutex); std::unique_lock<std::mutex> l(m_QueueMutex);
return GetNonThreadSafe (true); return GetNonThreadSafe (true);
@ -99,11 +99,11 @@ namespace util
private: private:
Element * GetNonThreadSafe (bool peek = false) Element GetNonThreadSafe (bool peek = false)
{ {
if (!m_Queue.empty ()) if (!m_Queue.empty ())
{ {
Element * el = m_Queue.front (); auto el = m_Queue.front ();
if (!peek) if (!peek)
m_Queue.pop (); m_Queue.pop ();
return el; return el;
@ -113,13 +113,13 @@ namespace util
private: private:
std::queue<Element *> m_Queue; std::queue<Element> m_Queue;
std::mutex m_QueueMutex; std::mutex m_QueueMutex;
std::condition_variable m_NonEmpty; std::condition_variable m_NonEmpty;
}; };
template<class Msg> template<class Msg>
class MsgQueue: public Queue<Msg> class MsgQueue: public Queue<Msg *>
{ {
public: public:
@ -132,7 +132,7 @@ namespace util
if (m_IsRunning) if (m_IsRunning)
{ {
m_IsRunning = false; m_IsRunning = false;
Queue<Msg>::WakeUp (); Queue<Msg *>::WakeUp ();
m_Thread.join(); m_Thread.join();
} }
} }
@ -145,7 +145,7 @@ namespace util
{ {
while (m_IsRunning) while (m_IsRunning)
{ {
while (Msg * msg = Queue<Msg>::Get ()) while (auto msg = Queue<Msg *>::Get ())
{ {
msg->Process (); msg->Process ();
delete msg; delete msg;
@ -153,7 +153,7 @@ namespace util
if (m_OnEmpty != nullptr) if (m_OnEmpty != nullptr)
m_OnEmpty (); m_OnEmpty ();
if (m_IsRunning) if (m_IsRunning)
Queue<Msg>::Wait (); Queue<Msg *>::Wait ();
} }
} }

2
Tunnel.h

@ -177,7 +177,7 @@ namespace tunnel
std::mutex m_PoolsMutex; std::mutex m_PoolsMutex;
std::list<std::shared_ptr<TunnelPool>> m_Pools; std::list<std::shared_ptr<TunnelPool>> m_Pools;
std::shared_ptr<TunnelPool> m_ExploratoryPool; std::shared_ptr<TunnelPool> m_ExploratoryPool;
i2p::util::Queue<I2NPMessage> m_Queue; i2p::util::Queue<I2NPMessage *> m_Queue;
// some stats // some stats
int m_NumSuccesiveTunnelCreations, m_NumFailedTunnelCreations; int m_NumSuccesiveTunnelCreations, m_NumFailedTunnelCreations;

Loading…
Cancel
Save