mirror of
https://github.com/PurpleI2P/i2pd.git
synced 2025-01-28 01:44:14 +00:00
fixed corrupted NTCP messages
This commit is contained in:
parent
b3e08b2cf4
commit
763547f465
@ -21,7 +21,7 @@ namespace transport
|
|||||||
NTCPSession::NTCPSession (NTCPServer& server, std::shared_ptr<const i2p::data::RouterInfo> in_RemoteRouter):
|
NTCPSession::NTCPSession (NTCPServer& server, std::shared_ptr<const i2p::data::RouterInfo> in_RemoteRouter):
|
||||||
TransportSession (in_RemoteRouter), m_Server (server), m_Socket (m_Server.GetService ()),
|
TransportSession (in_RemoteRouter), m_Server (server), m_Socket (m_Server.GetService ()),
|
||||||
m_TerminationTimer (m_Server.GetService ()), m_IsEstablished (false), m_ReceiveBufferOffset (0),
|
m_TerminationTimer (m_Server.GetService ()), m_IsEstablished (false), m_ReceiveBufferOffset (0),
|
||||||
m_NextMessage (nullptr), m_NumSentBytes (0), m_NumReceivedBytes (0)
|
m_NextMessage (nullptr), m_IsSending (false), m_NumSentBytes (0), m_NumReceivedBytes (0)
|
||||||
{
|
{
|
||||||
m_DHKeysPair = transports.GetNextDHKeysPair ();
|
m_DHKeysPair = transports.GetNextDHKeysPair ();
|
||||||
m_Establisher = new Establisher;
|
m_Establisher = new Establisher;
|
||||||
@ -32,6 +32,8 @@ namespace transport
|
|||||||
delete m_Establisher;
|
delete m_Establisher;
|
||||||
if (m_NextMessage)
|
if (m_NextMessage)
|
||||||
i2p::DeleteI2NPMessage (m_NextMessage);
|
i2p::DeleteI2NPMessage (m_NextMessage);
|
||||||
|
for (auto it: m_SendQueue)
|
||||||
|
DeleteI2NPMessage (it);
|
||||||
}
|
}
|
||||||
|
|
||||||
void NTCPSession::CreateAESKey (uint8_t * pubKey, i2p::crypto::AESKey& key)
|
void NTCPSession::CreateAESKey (uint8_t * pubKey, i2p::crypto::AESKey& key)
|
||||||
@ -538,6 +540,7 @@ namespace transport
|
|||||||
|
|
||||||
void NTCPSession::Send (i2p::I2NPMessage * msg)
|
void NTCPSession::Send (i2p::I2NPMessage * msg)
|
||||||
{
|
{
|
||||||
|
m_IsSending = true;
|
||||||
boost::asio::async_write (m_Socket, CreateMsgBuffer (msg), boost::asio::transfer_all (),
|
boost::asio::async_write (m_Socket, CreateMsgBuffer (msg), boost::asio::transfer_all (),
|
||||||
std::bind(&NTCPSession::HandleSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, std::vector<I2NPMessage *>{ msg }));
|
std::bind(&NTCPSession::HandleSent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, std::vector<I2NPMessage *>{ msg }));
|
||||||
}
|
}
|
||||||
@ -581,6 +584,7 @@ namespace transport
|
|||||||
|
|
||||||
void NTCPSession::Send (const std::vector<I2NPMessage *>& msgs)
|
void NTCPSession::Send (const std::vector<I2NPMessage *>& msgs)
|
||||||
{
|
{
|
||||||
|
m_IsSending = true;
|
||||||
std::vector<boost::asio::const_buffer> bufs;
|
std::vector<boost::asio::const_buffer> bufs;
|
||||||
for (auto it: msgs)
|
for (auto it: msgs)
|
||||||
bufs.push_back (CreateMsgBuffer (it));
|
bufs.push_back (CreateMsgBuffer (it));
|
||||||
@ -590,6 +594,7 @@ namespace transport
|
|||||||
|
|
||||||
void NTCPSession::HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector<I2NPMessage *> msgs)
|
void NTCPSession::HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, std::vector<I2NPMessage *> msgs)
|
||||||
{
|
{
|
||||||
|
m_IsSending = false;
|
||||||
for (auto it: msgs)
|
for (auto it: msgs)
|
||||||
if (it) i2p::DeleteI2NPMessage (it);
|
if (it) i2p::DeleteI2NPMessage (it);
|
||||||
if (ecode)
|
if (ecode)
|
||||||
@ -602,8 +607,14 @@ namespace transport
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
m_NumSentBytes += bytes_transferred;
|
m_NumSentBytes += bytes_transferred;
|
||||||
ScheduleTermination (); // reset termination timer
|
if (!m_SendQueue.empty())
|
||||||
}
|
{
|
||||||
|
Send (m_SendQueue);
|
||||||
|
m_SendQueue.clear ();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
ScheduleTermination (); // reset termination timer
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -620,7 +631,12 @@ namespace transport
|
|||||||
void NTCPSession::PostI2NPMessage (I2NPMessage * msg)
|
void NTCPSession::PostI2NPMessage (I2NPMessage * msg)
|
||||||
{
|
{
|
||||||
if (msg)
|
if (msg)
|
||||||
Send (msg);
|
{
|
||||||
|
if (m_IsSending)
|
||||||
|
m_SendQueue.push_back (msg);
|
||||||
|
else
|
||||||
|
Send (msg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void NTCPSession::SendI2NPMessages (const std::vector<I2NPMessage *>& msgs)
|
void NTCPSession::SendI2NPMessages (const std::vector<I2NPMessage *>& msgs)
|
||||||
@ -630,7 +646,13 @@ namespace transport
|
|||||||
|
|
||||||
void NTCPSession::PostI2NPMessages (std::vector<I2NPMessage *> msgs)
|
void NTCPSession::PostI2NPMessages (std::vector<I2NPMessage *> msgs)
|
||||||
{
|
{
|
||||||
Send (msgs);
|
if (m_IsSending)
|
||||||
|
{
|
||||||
|
for (auto it: msgs)
|
||||||
|
m_SendQueue.push_back (it);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
Send (msgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
void NTCPSession::ScheduleTermination ()
|
void NTCPSession::ScheduleTermination ()
|
||||||
|
@ -132,6 +132,9 @@ namespace transport
|
|||||||
i2p::I2NPMessage * m_NextMessage;
|
i2p::I2NPMessage * m_NextMessage;
|
||||||
size_t m_NextMessageOffset;
|
size_t m_NextMessageOffset;
|
||||||
i2p::I2NPMessagesHandler m_Handler;
|
i2p::I2NPMessagesHandler m_Handler;
|
||||||
|
|
||||||
|
bool m_IsSending;
|
||||||
|
std::vector<I2NPMessage *> m_SendQueue;
|
||||||
|
|
||||||
size_t m_NumSentBytes, m_NumReceivedBytes;
|
size_t m_NumSentBytes, m_NumReceivedBytes;
|
||||||
};
|
};
|
||||||
|
Loading…
x
Reference in New Issue
Block a user