From 763547f465507299ae1b6425bb5d6df7f36d8559 Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 27 Jan 2015 19:12:27 -0500 Subject: [PATCH] fixed corrupted NTCP messages --- NTCPSession.cpp | 32 +++++++++++++++++++++++++++----- NTCPSession.h | 3 +++ 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/NTCPSession.cpp b/NTCPSession.cpp index df1e9bc9..83e43c18 100644 --- a/NTCPSession.cpp +++ b/NTCPSession.cpp @@ -21,7 +21,7 @@ namespace transport NTCPSession::NTCPSession (NTCPServer& server, std::shared_ptr in_RemoteRouter): TransportSession (in_RemoteRouter), m_Server (server), m_Socket (m_Server.GetService ()), m_TerminationTimer (m_Server.GetService ()), m_IsEstablished (false), m_ReceiveBufferOffset (0), - m_NextMessage (nullptr), m_NumSentBytes (0), m_NumReceivedBytes (0) + m_NextMessage (nullptr), m_IsSending (false), m_NumSentBytes (0), m_NumReceivedBytes (0) { m_DHKeysPair = transports.GetNextDHKeysPair (); m_Establisher = new Establisher; @@ -32,6 +32,8 @@ namespace transport delete m_Establisher; if (m_NextMessage) i2p::DeleteI2NPMessage (m_NextMessage); + for (auto it: m_SendQueue) + DeleteI2NPMessage (it); } void NTCPSession::CreateAESKey (uint8_t * pubKey, i2p::crypto::AESKey& key) @@ -538,6 +540,7 @@ namespace transport void NTCPSession::Send (i2p::I2NPMessage * msg) { + m_IsSending = true; boost::asio::async_write (m_Socket, CreateMsgBuffer (msg), boost::asio::transfer_all (), std::bind(&NTCPSession::HandleSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, std::vector{ msg })); } @@ -581,6 +584,7 @@ namespace transport void NTCPSession::Send (const std::vector& msgs) { + m_IsSending = true; std::vector bufs; for (auto it: msgs) bufs.push_back (CreateMsgBuffer (it)); @@ -590,6 +594,7 @@ namespace transport void NTCPSession::HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector msgs) { + m_IsSending = false; for (auto it: msgs) if (it) i2p::DeleteI2NPMessage (it); if (ecode) @@ -602,8 +607,14 @@ namespace transport else { m_NumSentBytes += bytes_transferred; - ScheduleTermination (); // reset termination timer - } + if (!m_SendQueue.empty()) + { + Send (m_SendQueue); + m_SendQueue.clear (); + } + else + ScheduleTermination (); // reset termination timer + } } @@ -620,7 +631,12 @@ namespace transport void NTCPSession::PostI2NPMessage (I2NPMessage * msg) { if (msg) - Send (msg); + { + if (m_IsSending) + m_SendQueue.push_back (msg); + else + Send (msg); + } } void NTCPSession::SendI2NPMessages (const std::vector& msgs) @@ -630,7 +646,13 @@ namespace transport void NTCPSession::PostI2NPMessages (std::vector msgs) { - Send (msgs); + if (m_IsSending) + { + for (auto it: msgs) + m_SendQueue.push_back (it); + } + else + Send (msgs); } void NTCPSession::ScheduleTermination () diff --git a/NTCPSession.h b/NTCPSession.h index bf2cb17e..66f38538 100644 --- a/NTCPSession.h +++ b/NTCPSession.h @@ -132,6 +132,9 @@ namespace transport i2p::I2NPMessage * m_NextMessage; size_t m_NextMessageOffset; i2p::I2NPMessagesHandler m_Handler; + + bool m_IsSending; + std::vector m_SendQueue; size_t m_NumSentBytes, m_NumReceivedBytes; };