From 1bbc12b36a63021d153f45096701a9999769a1f3 Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 25 Mar 2014 14:26:39 -0400 Subject: [PATCH] AsyncReceive added --- Streaming.cpp | 5 +++-- Streaming.h | 32 +++++++++++++++++++++++++++++++- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/Streaming.cpp b/Streaming.cpp index 597ac79d..afa7feb6 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -1,6 +1,5 @@ #include #include -#include #include #include "Log.h" #include "RouterInfo.h" @@ -19,7 +18,7 @@ namespace stream const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local), m_RemoteLeaseSet (remote), - m_OutboundTunnel (nullptr) + m_OutboundTunnel (nullptr), m_ReceiveTimer (m_Service) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); UpdateCurrentRemoteLease (); @@ -27,6 +26,7 @@ namespace stream Stream::~Stream () { + m_ReceiveTimer.cancel (); while (auto packet = m_ReceiveQueue.Get ()) delete packet; for (auto it: m_SavedPackets) @@ -124,6 +124,7 @@ namespace stream LogPrint ("Closed"); m_IsOpen = false; m_ReceiveQueue.WakeUp (); + m_ReceiveTimer.cancel (); } } diff --git a/Streaming.h b/Streaming.h index af7a01f2..3de8f3e2 100644 --- a/Streaming.h +++ b/Streaming.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include "I2PEndian.h" #include "Queue.h" @@ -80,10 +81,13 @@ namespace stream void HandleNextPacket (Packet * packet); size_t Send (uint8_t * buf, size_t len, int timeout); // timeout in seconds size_t Receive (uint8_t * buf, size_t len, int timeout = 0); // returns 0 if timeout expired + template + void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0); + void Close (); void SetLeaseSetUpdated () { m_LeaseSetUpdated = true; }; - + private: void ConnectAndSend (uint8_t * buf, size_t len); @@ -96,6 +100,9 @@ namespace stream void UpdateCurrentRemoteLease (); + template + void HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler); + private: boost::asio::io_service& m_Service; @@ -107,6 +114,7 @@ namespace stream i2p::util::Queue m_ReceiveQueue; std::set m_SavedPackets; i2p::tunnel::OutboundTunnel * m_OutboundTunnel; + boost::asio::deadline_timer m_ReceiveTimer; }; class StreamingDestination: public i2p::data::LocalDestination @@ -185,6 +193,28 @@ namespace stream // assuming data is I2CP message void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len); I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len); + +//------------------------------------------------- + + template + void Stream::AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout) + { + m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(timeout)); + m_ReceiveTimer.async_wait (boost::bind (&Stream::HandleReceiveTimer, + this, boost::asio::placeholders::error, buffer, handler)); + } + + template + void Stream::HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler) + { + // TODO: + if (ecode == boost::asio::error::operation_aborted) + // timeout not expired + handler (boost::system::error_code (), 0); + else + // timeout expired + handler (boost::asio::error::make_error_code (boost::asio::error::timed_out), 0); + } } }