From 975265b0afe539994a0638caad7d70e328299323 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Wed, 7 Dec 2016 11:52:20 -0500 Subject: [PATCH] more --- Event.cpp | 35 +++++++++++++++++++++--- Event.h | 18 ++++++++++++- NTCPSession.cpp | 2 +- SSUData.cpp | 2 +- Transports.cpp | 2 +- Websocket.cpp | 72 +++++++++++++++++++++++++++++++++++++++++++------ 6 files changed, 116 insertions(+), 15 deletions(-) diff --git a/Event.cpp b/Event.cpp index e148538e..4bc6d594 100644 --- a/Event.cpp +++ b/Event.cpp @@ -17,15 +17,44 @@ namespace i2p void EventCore::QueueEvent(const EventType & ev) { - if(m_listener) - m_listener->HandleEvent(ev); + if(m_listener) m_listener->HandleEvent(ev); + } + + void EventCore::CollectEvent(const std::string & type, const std::string & ident, uint64_t val) + { + std::unique_lock lock(m_collect_mutex); + std::string key = type + "." + ident; + if (m_collected.find(key) == m_collected.end()) + { + m_collected[key] = {type, key, 0}; + } + m_collected[key].Val += val; + } + + void EventCore::PumpCollected(EventListener * listener) + { + std::unique_lock lock(m_collect_mutex); + if(listener) + { + for(const auto & ev : m_collected) { + listener->HandlePumpEvent({{"type", ev.second.Key}, {"ident", ev.second.Ident}}, ev.second.Val); + } + } + m_collected.clear(); } } } -void EmitEvent(const EventType & e) +void QueueIntEvent(const std::string & type, const std::string & ident, uint64_t val) { #ifdef WITH_EVENTS + i2p::event::core.CollectEvent(type, ident, val); +#endif +} + +void EmitEvent(const EventType & e) +{ +#if WITH_EVENTS i2p::event::core.QueueEvent(e); #endif } diff --git a/Event.h b/Event.h index 1ab37847..a9f97df2 100644 --- a/Event.h +++ b/Event.h @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include @@ -16,15 +18,27 @@ namespace i2p public: virtual ~EventListener() {}; virtual void HandleEvent(const EventType & ev) = 0; + /** @brief handle collected event when pumped */ + virtual void HandlePumpEvent(const EventType & ev, const uint64_t & val) = 0; }; class EventCore { public: void QueueEvent(const EventType & ev); + void CollectEvent(const std::string & type, const std::string & ident, uint64_t val); void SetListener(EventListener * l); - + void PumpCollected(EventListener * l); + private: + std::mutex m_collect_mutex; + struct CollectedEvent + { + std::string Key; + std::string Ident; + uint64_t Val; + }; + std::map m_collected; EventListener * m_listener = nullptr; }; #ifdef WITH_EVENTS @@ -32,6 +46,8 @@ namespace i2p #endif } } + +void QueueIntEvent(const std::string & type, const std::string & ident, uint64_t val); void EmitEvent(const EventType & ev); #endif diff --git a/NTCPSession.cpp b/NTCPSession.cpp index 81cfe687..1d3cd95b 100644 --- a/NTCPSession.cpp +++ b/NTCPSession.cpp @@ -610,7 +610,7 @@ namespace transport if (!m_NextMessage->IsExpired ()) { #ifdef WITH_EVENTS - EmitEvent({{"type", "transport.recvmsg"} , {"ident", GetIdentHashBase64()}, {"number", "1"}}); + QueueIntEvent("transport.recvmsg", GetIdentHashBase64(), 1); #endif m_Handler.PutNextMessage (m_NextMessage); } diff --git a/SSUData.cpp b/SSUData.cpp index ad38cf25..48e3e3f1 100644 --- a/SSUData.cpp +++ b/SSUData.cpp @@ -240,7 +240,7 @@ namespace transport if (!msg->IsExpired ()) { #ifdef WITH_EVENTS - EmitEvent({{"type", "transport.recvmsg"} , {"ident", m_Session.GetIdentHashBase64()}, {"number", "1"}}); + QueueIntEvent("transport.recvmsg", m_Session.GetIdentHashBase64(), 1); #endif m_Handler.PutNextMessage (msg); } diff --git a/Transports.cpp b/Transports.cpp index 1b9d52a1..efcd6742 100644 --- a/Transports.cpp +++ b/Transports.cpp @@ -249,7 +249,7 @@ namespace transport void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector >& msgs) { #ifdef WITH_EVENTS - EmitEvent({{"type" , "transport.sendmsg"}, {"ident", ident.ToBase64()}, {"number", std::to_string(msgs.size())}}); + QueueIntEvent("transport.send", ident.ToBase64(), msgs.size()); #endif m_Service.post (std::bind (&Transports::PostMessages, this, ident, msgs)); } diff --git a/Websocket.cpp b/Websocket.cpp index 0de44efe..a1a58c5f 100644 --- a/Websocket.cpp +++ b/Websocket.cpp @@ -2,6 +2,7 @@ #include "Log.h" #include +#include #include #include @@ -27,7 +28,11 @@ namespace i2p typedef ServerImpl::message_ptr MessagePtr; public: - WebsocketServerImpl(const std::string & addr, int port) : m_run(false), m_thread(nullptr) + WebsocketServerImpl(const std::string & addr, int port) : + m_run(false), + m_ws_thread(nullptr), + m_ev_thread(nullptr), + m_WebsocketTicker(m_Service) { m_server.init_asio(); m_server.set_open_handler(std::bind(&WebsocketServerImpl::ConnOpened, this, std::placeholders::_1)); @@ -44,7 +49,7 @@ namespace i2p void Start() { m_run = true; m_server.start_accept(); - m_thread = new std::thread([&] () { + m_ws_thread = new std::thread([&] () { while(m_run) { try { m_server.run(); @@ -53,16 +58,35 @@ namespace i2p } } }); + m_ev_thread = new std::thread([&] () { + while(m_run) { + try { + m_Service.run(); + break; + } catch (std::exception & e ) { + LogPrint(eLogError, "Websocket service: ", e.what()); + } + } + }); + ScheduleTick(); } void Stop() { m_run = false; + m_Service.stop(); m_server.stop(); - if(m_thread) { - m_thread->join(); - delete m_thread; + + if(m_ev_thread) { + m_ev_thread->join(); + delete m_ev_thread; } - m_thread = nullptr; + m_ev_thread = nullptr; + + if(m_ws_thread) { + m_ws_thread->join(); + delete m_ws_thread; + } + m_ws_thread = nullptr; } void ConnOpened(ServerConn c) @@ -82,11 +106,40 @@ namespace i2p (void) conn; (void) msg; } + + void HandleTick(const boost::system::error_code & ec) + { + + if(ec != boost::asio::error::operation_aborted) + LogPrint(eLogError, "Websocket ticker: ", ec.message()); + // pump collected events to us + i2p::event::core.PumpCollected(this); + ScheduleTick(); + } + + void ScheduleTick() + { + LogPrint(eLogDebug, "Websocket schedule tick"); + boost::posix_time::seconds dlt(1); + m_WebsocketTicker.expires_from_now(dlt); + m_WebsocketTicker.async_wait(std::bind(&WebsocketServerImpl::HandleTick, this, std::placeholders::_1)); + } + + /** @brief called from m_ev_thread */ + void HandlePumpEvent(const EventType & ev, const uint64_t & val) + { + EventType e; + for (const auto & i : ev) + e[i.first] = i.second; + + e["number"] = std::to_string(val); + HandleEvent(e); + } + /** @brief called from m_ws_thread */ void HandleEvent(const EventType & ev) { std::lock_guard lock(m_connsMutex); - LogPrint(eLogDebug, "websocket event"); boost::property_tree::ptree event; for (const auto & item : ev) { event.put(item.first, item.second); @@ -105,10 +158,13 @@ namespace i2p private: typedef std::set > ConnList; bool m_run; - std::thread * m_thread; + std::thread * m_ws_thread; + std::thread * m_ev_thread; std::mutex m_connsMutex; ConnList m_conns; ServerImpl m_server; + boost::asio::io_service m_Service; + boost::asio::deadline_timer m_WebsocketTicker; };