From c61cd350eebf573d48e31e475d6484854e7111e1 Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 20 Jan 2015 22:35:27 -0500 Subject: [PATCH] send multiple messages though single write call --- NTCPSession.cpp | 43 +++++++++++++++++++++++++++++++++++++------ NTCPSession.h | 7 +++++-- 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/NTCPSession.cpp b/NTCPSession.cpp index 9b283612..6b040c8f 100644 --- a/NTCPSession.cpp +++ b/NTCPSession.cpp @@ -547,6 +547,12 @@ namespace transport } void NTCPSession::Send (i2p::I2NPMessage * msg) + { + boost::asio::async_write (m_Socket, CreateMsgBuffer (msg), boost::asio::transfer_all (), + std::bind(&NTCPSession::HandleSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, msg)); + } + + boost::asio::const_buffers_1 NTCPSession::CreateMsgBuffer (I2NPMessage * msg) { uint8_t * sendBuffer; int len; @@ -579,10 +585,8 @@ namespace transport int l = len + padding + 6; m_Encryption.Encrypt(sendBuffer, l, sendBuffer); - - boost::asio::async_write (m_Socket, boost::asio::buffer (sendBuffer, l), boost::asio::transfer_all (), - std::bind(&NTCPSession::HandleSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, msg)); - } + return boost::asio::buffer ((const uint8_t *)sendBuffer, l); + } void NTCPSession::HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, i2p::I2NPMessage * msg) { @@ -602,6 +606,34 @@ namespace transport } } + void NTCPSession::Send (const std::vector& msgs) + { + std::vector bufs; + for (auto it: msgs) + bufs.push_back (CreateMsgBuffer (it)); + boost::asio::async_write (m_Socket, bufs, boost::asio::transfer_all (), + std::bind(&NTCPSession::HandleBatchSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, msgs)); + } + + void NTCPSession::HandleBatchSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector msgs) + { + for (auto it: msgs) + if (it) i2p::DeleteI2NPMessage (it); + if (ecode) + { + LogPrint (eLogWarning, "Couldn't send msgs: ", ecode.message ()); + // we shouldn't call Terminate () here, because HandleReceive takes care + // TODO: 'delete this' statement in Terminate () must be eliminated later + // Terminate (); + } + else + { + m_NumSentBytes += bytes_transferred; + ScheduleTermination (); // reset termination timer + } + } + + void NTCPSession::SendTimeSyncMessage () { Send (nullptr); @@ -625,8 +657,7 @@ namespace transport void NTCPSession::PostI2NPMessages (std::vector msgs) { - for (auto it: msgs) - if (it) Send (it); + Send (msgs); } void NTCPSession::ScheduleTermination () diff --git a/NTCPSession.h b/NTCPSession.h index 8435d5d9..d9082dab 100644 --- a/NTCPSession.h +++ b/NTCPSession.h @@ -101,9 +101,12 @@ namespace transport bool DecryptNextBlock (const uint8_t * encrypted); void Send (i2p::I2NPMessage * msg); + boost::asio::const_buffers_1 CreateMsgBuffer (I2NPMessage * msg); void HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, i2p::I2NPMessage * msg); - - + void Send (const std::vector& msgs); + void HandleBatchSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector msgs); + + // timer void ScheduleTermination (); void HandleTerminationTimer (const boost::system::error_code& ecode);