I2P: End-to-End encrypted and anonymous Internet https://i2pd.website/
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

150 lines
2.7 KiB

#ifndef QUEUE_H__
#define QUEUE_H__
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <functional>
namespace i2p
namespace util
template<typename Element>
class Queue
void Put (Element * e)
std::unique_lock<std::mutex> l(m_QueueMutex);
m_Queue.push (e);
m_NonEmpty.notify_one ();
Element * GetNext ()
std::unique_lock<std::mutex> l(m_QueueMutex);
Element * el = GetNonThreadSafe ();
if (!el)
m_NonEmpty.wait (l);
el = GetNonThreadSafe ();
return el;
Element * GetNextWithTimeout (int usec)
std::unique_lock<std::mutex> l(m_QueueMutex);
Element * el = GetNonThreadSafe ();
if (!el)
m_NonEmpty.wait_for (l, std::chrono::milliseconds (usec));
el = GetNonThreadSafe ();
return el;
void Wait ()
std::unique_lock<std::mutex> l(m_QueueMutex);
m_NonEmpty.wait (l);
bool Wait (int sec, int usec)
std::unique_lock<std::mutex> l(m_QueueMutex);
return m_NonEmpty.wait_for (l, std::chrono::seconds (sec) + std::chrono::milliseconds (usec)) != std::cv_status::timeout;
bool IsEmpty ()
std::unique_lock<std::mutex> l(m_QueueMutex);
return m_Queue.empty ();
void WakeUp () { m_NonEmpty.notify_all (); };
Element * Get ()
std::unique_lock<std::mutex> l(m_QueueMutex);
return GetNonThreadSafe ();
Element * Peek ()
std::unique_lock<std::mutex> l(m_QueueMutex);
return GetNonThreadSafe (true);
Element * GetNonThreadSafe (bool peek = false)
if (!m_Queue.empty ())
Element * el = m_Queue.front ();
if (!peek)
m_Queue.pop ();
return el;
return nullptr;
std::queue<Element *> m_Queue;
std::mutex m_QueueMutex;
std::condition_variable m_NonEmpty;
template<class Msg>
class MsgQueue: public Queue<Msg>
typedef std::function<void()> OnEmpty;
MsgQueue (): m_IsRunning (true), m_Thread (std::bind (&MsgQueue<Msg>::Run, this)) {};
~MsgQueue () { Stop (); };
void Stop()
if (m_IsRunning)
m_IsRunning = false;
Queue<Msg>::WakeUp ();
void SetOnEmpty (OnEmpty const & e) { m_OnEmpty = e; };
void Run ()
while (m_IsRunning)
while (Msg * msg = Queue<Msg>::Get ())
msg->Process ();
delete msg;
if (m_OnEmpty != nullptr)
m_OnEmpty ();
Queue<Msg>::Wait ();
bool m_IsRunning;
std::thread m_Thread;
OnEmpty m_OnEmpty;