Browse Source

fixed race condition

pull/6/head
orignal 11 years ago
parent
commit
66eccc287d
  1. 54
      NTCPSession.cpp
  2. 18
      NTCPSession.h

54
NTCPSession.cpp

@ -49,9 +49,17 @@ namespace ntcp
void NTCPSession::Terminate () void NTCPSession::Terminate ()
{ {
m_IsEstablished = false;
m_Socket.close (); m_Socket.close ();
// TODO: notify tunnels // TODO: notify tunnels
i2p::transports.RemoveNTCPSession (this); i2p::transports.RemoveNTCPSession (this);
delete this;
}
void NTCPSession::Connected ()
{
m_IsEstablished = true;
i2p::transports.AddNTCPSession (this);
} }
void NTCPSession::ClientLogin () void NTCPSession::ClientLogin ()
@ -287,7 +295,7 @@ namespace ntcp
else else
{ {
LogPrint ("Phase 4 sent: ", bytes_transferred); LogPrint ("Phase 4 sent: ", bytes_transferred);
m_IsEstablished = true; Connected ();
m_ReceiveBufferOffset = 0; m_ReceiveBufferOffset = 0;
m_DecryptedBufferOffset = 0; m_DecryptedBufferOffset = 0;
Receive (); Receive ();
@ -323,16 +331,10 @@ namespace ntcp
Terminate (); Terminate ();
return; return;
} }
m_IsEstablished = true; Connected ();
SendTimeSyncMessage (); SendTimeSyncMessage ();
SendI2NPMessage (CreateDatabaseStoreMsg ()); // we tell immediately who we are
uint8_t buf1[1000];
int l = CreateDatabaseStoreMsg (buf1, 1000);
SendMessage (buf1, l);
l = CreateDeliveryStatusMsg (buf1, 1000);
SendMessage (buf1, l);
m_ReceiveBufferOffset = 0; m_ReceiveBufferOffset = 0;
m_DecryptedBufferOffset = 0; m_DecryptedBufferOffset = 0;
@ -411,31 +413,35 @@ namespace ntcp
void NTCPSession::HandleNextMessage (uint8_t * buf, int len, int dataSize) void NTCPSession::HandleNextMessage (uint8_t * buf, int len, int dataSize)
{ {
if (dataSize) if (dataSize)
i2p::HandleI2NPMessage (*this, buf+2, dataSize); i2p::HandleI2NPMessage (buf+2, dataSize);
else else
LogPrint ("Timestamp"); LogPrint ("Timestamp");
} }
void NTCPSession::Send (const uint8_t * buf, int len, bool zeroSize) void NTCPSession::Send (const uint8_t * buf, int len, bool zeroSize)
{ {
*((uint16_t *)m_SendBuffer) = zeroSize ? 0 :htobe16 (len); uint8_t * sendBuffer = new uint8_t[NTCP_MAX_MESSAGE_SIZE];
*((uint16_t *)sendBuffer) = zeroSize ? 0 :htobe16 (len);
int rem = (len + 6) % 16; int rem = (len + 6) % 16;
int padding = 0; int padding = 0;
if (rem > 0) padding = 16 - rem; if (rem > 0) padding = 16 - rem;
memcpy (m_SendBuffer + 2, buf, len); memcpy (sendBuffer + 2, buf, len);
// TODO: fill padding // TODO: fill padding
m_Adler.CalculateDigest (m_SendBuffer + len + 2 + padding, m_SendBuffer, len + 2+ padding); m_Adler.CalculateDigest (sendBuffer + len + 2 + padding, sendBuffer, len + 2+ padding);
int l = len + padding + 6; int l = len + padding + 6;
m_Encryption.ProcessData(m_SendBuffer, m_SendBuffer, l); {
std::lock_guard<std::mutex> lock (m_EncryptionMutex);
m_Encryption.ProcessData(sendBuffer, sendBuffer, l);
}
boost::asio::async_write (m_Socket, boost::asio::buffer (m_SendBuffer, l), boost::asio::transfer_all (), boost::asio::async_write (m_Socket, boost::asio::buffer (sendBuffer, l), boost::asio::transfer_all (),
boost::bind(&NTCPSession::HandleSent, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); boost::bind(&NTCPSession::HandleSent, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, sendBuffer));
} }
void NTCPSession::HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred) void NTCPSession::HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint8_t * sentBuffer)
{ {
delete sentBuffer;
if (ecode) if (ecode)
{ {
LogPrint ("Couldn't send msg: ", ecode.message ()); LogPrint ("Couldn't send msg: ", ecode.message ());
@ -453,11 +459,20 @@ namespace ntcp
Send ((uint8_t *)&t, 4, true); Send ((uint8_t *)&t, 4, true);
} }
void NTCPSession::SendMessage (uint8_t * buf, int len) void NTCPSession::SendMessage (const uint8_t * buf, int len)
{ {
Send (buf, len); Send (buf, len);
} }
void NTCPSession::SendI2NPMessage (I2NPMessage * msg)
{
if (msg)
{
Send (msg->buf, msg->len);
DeleteI2NPMessage (msg);
}
}
NTCPClient::NTCPClient (boost::asio::io_service& service, const char * address, NTCPClient::NTCPClient (boost::asio::io_service& service, const char * address,
int port, const i2p::data::RouterInfo& in_RouterInfo): NTCPSession (m_Socket, &in_RouterInfo), int port, const i2p::data::RouterInfo& in_RouterInfo): NTCPSession (m_Socket, &in_RouterInfo),
m_Socket (service), m_Endpoint (boost::asio::ip::address::from_string (address), port) m_Socket (service), m_Endpoint (boost::asio::ip::address::from_string (address), port)
@ -467,6 +482,7 @@ namespace ntcp
void NTCPClient::Connect () void NTCPClient::Connect ()
{ {
LogPrint ("Connecting to ", m_Endpoint.address ().to_string (),":", m_Endpoint.port ());
m_Socket.async_connect (m_Endpoint, boost::bind (&NTCPClient::HandleConnect, m_Socket.async_connect (m_Endpoint, boost::bind (&NTCPClient::HandleConnect,
this, boost::asio::placeholders::error)); this, boost::asio::placeholders::error));
} }

18
NTCPSession.h

@ -2,11 +2,13 @@
#define NTCP_SESSION_H__ #define NTCP_SESSION_H__
#include <inttypes.h> #include <inttypes.h>
#include <mutex>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <cryptopp/modes.h> #include <cryptopp/modes.h>
#include <cryptopp/aes.h> #include <cryptopp/aes.h>
#include <cryptopp/adler32.h> #include <cryptopp/adler32.h>
#include "RouterInfo.h" #include "RouterInfo.h"
#include "I2NPProtocol.h"
namespace i2p namespace i2p
{ {
@ -58,7 +60,6 @@ namespace ntcp
#pragma pack() #pragma pack()
const int NTCP_MAX_MESSAGE_SIZE = 16384;
class NTCPSession class NTCPSession
{ {
public: public:
@ -71,8 +72,13 @@ namespace ntcp
void ClientLogin (); void ClientLogin ();
void ServerLogin (); void ServerLogin ();
void SendMessage (uint8_t * buf, int len); void SendMessage (const uint8_t * buf, int len);
void SendI2NPMessage (I2NPMessage * msg);
protected:
void Terminate (); void Terminate ();
void Connected ();
private: private:
@ -100,7 +106,7 @@ namespace ntcp
void HandleNextMessage (uint8_t * buf, int len, int dataSize); void HandleNextMessage (uint8_t * buf, int len, int dataSize);
void Send (const uint8_t * buf, int len, bool zeroSize = false); void Send (const uint8_t * buf, int len, bool zeroSize = false);
void HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleSent (const boost::system::error_code& ecode, std::size_t bytes_transferred, uint8_t * sentBuffer);
void SendTimeSyncMessage (); void SendTimeSyncMessage ();
@ -120,11 +126,13 @@ namespace ntcp
NTCPPhase3 m_Phase3; NTCPPhase3 m_Phase3;
NTCPPhase4 m_Phase4; NTCPPhase4 m_Phase4;
uint8_t m_ReceiveBuffer[NTCP_MAX_MESSAGE_SIZE*2], m_SendBuffer[NTCP_MAX_MESSAGE_SIZE]; uint8_t m_ReceiveBuffer[i2p::NTCP_MAX_MESSAGE_SIZE*2];
int m_ReceiveBufferOffset; int m_ReceiveBufferOffset;
uint8_t m_DecryptedBuffer[NTCP_MAX_MESSAGE_SIZE*2]; uint8_t m_DecryptedBuffer[i2p::NTCP_MAX_MESSAGE_SIZE*2];
int m_DecryptedBufferOffset; int m_DecryptedBufferOffset;
std::mutex m_EncryptionMutex;
}; };
class NTCPClient: public NTCPSession class NTCPClient: public NTCPSession

Loading…
Cancel
Save