1
0
mirror of https://github.com/PurpleI2P/i2pd.git synced 2025-01-22 04:04:16 +00:00

fix sam race conditions

This commit is contained in:
Jeff Becker 2018-01-15 08:19:57 -05:00
parent b91efaa973
commit 207212557e
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05
3 changed files with 227 additions and 136 deletions

View File

@ -69,7 +69,7 @@ namespace stream
*/ */
const uint64_t DEFAULT_BAN_INTERVAL = 60 * 60 * 1000; const uint64_t DEFAULT_BAN_INTERVAL = 60 * 60 * 1000;
struct Packet struct Packet
{ {
size_t len, offset; size_t len, offset;
uint8_t buf[MAX_PACKET_SIZE]; uint8_t buf[MAX_PACKET_SIZE];
@ -276,8 +276,8 @@ namespace stream
/** set max connections per minute per destination */ /** set max connections per minute per destination */
void SetMaxConnsPerMinute(const uint32_t conns); void SetMaxConnsPerMinute(const uint32_t conns);
Packet * NewPacket () { return m_PacketsPool.Acquire (); } Packet * NewPacket () { return new Packet; }
void DeletePacket (Packet * p) { m_PacketsPool.Release (p); } void DeletePacket (Packet * p) { delete p; }
private: private:
@ -316,7 +316,7 @@ namespace stream
std::vector<i2p::data::IdentHash> m_Banned; std::vector<i2p::data::IdentHash> m_Banned;
uint64_t m_LastBanClear; uint64_t m_LastBanClear;
i2p::util::MemoryPool<Packet> m_PacketsPool; //i2p::util::MemoryPool<Packet> m_PacketsPool;
bool m_EnableDrop; bool m_EnableDrop;
public: public:
@ -334,16 +334,21 @@ namespace stream
void Stream::AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout) void Stream::AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout)
{ {
auto s = shared_from_this(); auto s = shared_from_this();
m_Service.post ([=](void) m_Service.post ([s, buffer, handler, timeout](void)
{ {
if (!s->m_ReceiveQueue.empty () || s->m_Status == eStreamStatusReset) if (!s->m_ReceiveQueue.empty () || s->m_Status == eStreamStatusReset)
s->HandleReceiveTimer (boost::asio::error::make_error_code (boost::asio::error::operation_aborted), buffer, handler, 0); s->HandleReceiveTimer (boost::asio::error::make_error_code (boost::asio::error::operation_aborted), buffer, handler, 0);
else else
{ {
int t = (timeout > MAX_RECEIVE_TIMEOUT) ? MAX_RECEIVE_TIMEOUT : timeout; int t = (timeout > MAX_RECEIVE_TIMEOUT) ? MAX_RECEIVE_TIMEOUT : timeout;
s->m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(t)); s->m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(t));
s->m_ReceiveTimer.async_wait ([=](const boost::system::error_code& ecode) int left = timeout - t;
{ s->HandleReceiveTimer (ecode, buffer, handler, timeout - t); }); auto self = s->shared_from_this();
self->m_ReceiveTimer.async_wait (
[self, buffer, handler, left](const boost::system::error_code & ec)
{
self->HandleReceiveTimer(ec, buffer, handler, left);
});
} }
}); });
} }

View File

@ -15,31 +15,22 @@ namespace i2p
{ {
namespace client namespace client
{ {
SAMSocket::SAMSocket (SAMBridge& owner): SAMSocket::SAMSocket (SAMBridge& owner, std::shared_ptr<Socket_t> socket):
m_Owner (owner), m_Socket (m_Owner.GetService ()), m_Timer (m_Owner.GetService ()), m_Owner (owner), m_Socket(socket), m_Timer (m_Owner.GetService ()),
m_BufferOffset (0), m_SocketType (eSAMSocketTypeUnknown), m_IsSilent (false), m_BufferOffset (0),
m_IsAccepting (false), m_Stream (nullptr), m_Session (nullptr) m_SocketType (eSAMSocketTypeUnknown), m_IsSilent (false),
m_IsAccepting (false), m_Stream (nullptr)
{ {
} }
SAMSocket::~SAMSocket () SAMSocket::~SAMSocket ()
{ {
Terminate ("~SAMSocket()"); if(m_Stream)
}
void SAMSocket::CloseStream (const char* reason)
{
LogPrint (eLogDebug, "SAMSocket::CloseStream, reason: ", reason);
if (m_Stream)
{ {
m_Stream->Close (); m_Stream->Close ();
m_Stream.reset (); m_Stream.reset ();
} }
} auto Session = m_Owner.FindSession(m_ID);
void SAMSocket::Terminate (const char* reason)
{
CloseStream (reason);
switch (m_SocketType) switch (m_SocketType)
{ {
@ -48,17 +39,17 @@ namespace client
break; break;
case eSAMSocketTypeStream: case eSAMSocketTypeStream:
{ {
if (m_Session) if (Session)
m_Session->DelSocket (shared_from_this ()); Session->DelSocket (this);
break; break;
} }
case eSAMSocketTypeAcceptor: case eSAMSocketTypeAcceptor:
{ {
if (m_Session) if (Session)
{ {
m_Session->DelSocket (shared_from_this ()); Session->DelSocket (this);
if (m_IsAccepting && m_Session->localDestination) if (m_IsAccepting && Session->localDestination)
m_Session->localDestination->StopAcceptingStreams (); Session->localDestination->StopAcceptingStreams ();
} }
break; break;
} }
@ -66,15 +57,54 @@ namespace client
; ;
} }
m_SocketType = eSAMSocketTypeTerminated; m_SocketType = eSAMSocketTypeTerminated;
if (m_Socket.is_open()) m_Socket.close (); if (m_Socket && m_Socket->is_open()) m_Socket->close ();
m_Session = nullptr; m_Socket.reset ();
}
void SAMSocket::Terminate (const char* reason)
{
if(m_Stream)
{
m_Stream->Close ();
m_Stream.reset ();
}
auto Session = m_Owner.FindSession(m_ID);
switch (m_SocketType)
{
case eSAMSocketTypeSession:
m_Owner.CloseSession (m_ID);
break;
case eSAMSocketTypeStream:
{
if (Session)
Session->DelSocket (this);
break;
}
case eSAMSocketTypeAcceptor:
{
if (Session)
{
Session->DelSocket (this);
if (m_IsAccepting && Session->localDestination)
Session->localDestination->StopAcceptingStreams ();
}
break;
}
default:
;
}
m_SocketType = eSAMSocketTypeTerminated;
if (m_Socket && m_Socket->is_open()) m_Socket->close ();
m_Socket.reset ();
} }
void SAMSocket::ReceiveHandshake () void SAMSocket::ReceiveHandshake ()
{ {
m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), if(m_Socket)
std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (), m_Socket->async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
std::placeholders::_1, std::placeholders::_2)); std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (),
std::placeholders::_1, std::placeholders::_2));
} }
void SAMSocket::HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) void SAMSocket::HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred)
@ -121,7 +151,7 @@ namespace client
#else #else
size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_HANDSHAKE_REPLY, version.c_str ()); size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_HANDSHAKE_REPLY, version.c_str ());
#endif #endif
boost::asio::async_write (m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (), boost::asio::async_write (*m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (),
std::bind(&SAMSocket::HandleHandshakeReplySent, shared_from_this (), std::bind(&SAMSocket::HandleHandshakeReplySent, shared_from_this (),
std::placeholders::_1, std::placeholders::_2)); std::placeholders::_1, std::placeholders::_2));
} }
@ -144,9 +174,9 @@ namespace client
if (ecode != boost::asio::error::operation_aborted) if (ecode != boost::asio::error::operation_aborted)
Terminate ("SAM: handshake reply send error"); Terminate ("SAM: handshake reply send error");
} }
else else if(m_Socket)
{ {
m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), m_Socket->async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
std::bind(&SAMSocket::HandleMessage, shared_from_this (), std::bind(&SAMSocket::HandleMessage, shared_from_this (),
std::placeholders::_1, std::placeholders::_2)); std::placeholders::_1, std::placeholders::_2));
} }
@ -157,7 +187,7 @@ namespace client
LogPrint (eLogDebug, "SAMSocket::SendMessageReply, close=",close?"true":"false", " reason: ", msg); LogPrint (eLogDebug, "SAMSocket::SendMessageReply, close=",close?"true":"false", " reason: ", msg);
if (!m_IsSilent) if (!m_IsSilent)
boost::asio::async_write (m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (), boost::asio::async_write (*m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (),
std::bind(&SAMSocket::HandleMessageReplySent, shared_from_this (), std::bind(&SAMSocket::HandleMessageReplySent, shared_from_this (),
std::placeholders::_1, std::placeholders::_2, close)); std::placeholders::_1, std::placeholders::_2, close));
else else
@ -306,19 +336,19 @@ namespace client
} }
// create destination // create destination
m_Session = m_Owner.CreateSession (id, destination == SAM_VALUE_TRANSIENT ? "" : destination, &params); auto session = m_Owner.CreateSession (id, destination == SAM_VALUE_TRANSIENT ? "" : destination, &params);
if (m_Session) if (session)
{ {
m_SocketType = eSAMSocketTypeSession; m_SocketType = eSAMSocketTypeSession;
if (style == SAM_VALUE_DATAGRAM) if (style == SAM_VALUE_DATAGRAM)
{ {
m_Session->UDPEndpoint = forward; session->UDPEndpoint = forward;
auto dest = m_Session->localDestination->CreateDatagramDestination (); auto dest = session->localDestination->CreateDatagramDestination ();
dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (), dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (),
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5)); std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5));
} }
if (m_Session->localDestination->IsReady ()) if (session->localDestination->IsReady ())
SendSessionCreateReplyOk (); SendSessionCreateReplyOk ();
else else
{ {
@ -335,30 +365,38 @@ namespace client
{ {
if (ecode != boost::asio::error::operation_aborted) if (ecode != boost::asio::error::operation_aborted)
{ {
if (m_Session->localDestination->IsReady ()) auto session = m_Owner.FindSession(m_ID);
SendSessionCreateReplyOk (); if(session)
else
{ {
m_Timer.expires_from_now (boost::posix_time::seconds(SAM_SESSION_READINESS_CHECK_INTERVAL)); if (session->localDestination->IsReady ())
m_Timer.async_wait (std::bind (&SAMSocket::HandleSessionReadinessCheckTimer, SendSessionCreateReplyOk ();
shared_from_this (), std::placeholders::_1)); else
{
m_Timer.expires_from_now (boost::posix_time::seconds(SAM_SESSION_READINESS_CHECK_INTERVAL));
m_Timer.async_wait (std::bind (&SAMSocket::HandleSessionReadinessCheckTimer,
shared_from_this (), std::placeholders::_1));
}
} }
} }
} }
void SAMSocket::SendSessionCreateReplyOk () void SAMSocket::SendSessionCreateReplyOk ()
{ {
uint8_t buf[1024]; auto session = m_Owner.FindSession(m_ID);
char priv[1024]; if (session)
size_t l = m_Session->localDestination->GetPrivateKeys ().ToBuffer (buf, 1024); {
size_t l1 = i2p::data::ByteStreamToBase64 (buf, l, priv, 1024); uint8_t buf[1024];
priv[l1] = 0; char priv[1024];
size_t l = session->localDestination->GetPrivateKeys ().ToBuffer (buf, 1024);
size_t l1 = i2p::data::ByteStreamToBase64 (buf, l, priv, 1024);
priv[l1] = 0;
#ifdef _MSC_VER #ifdef _MSC_VER
size_t l2 = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_CREATE_REPLY_OK, priv); size_t l2 = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_CREATE_REPLY_OK, priv);
#else #else
size_t l2 = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_CREATE_REPLY_OK, priv); size_t l2 = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_CREATE_REPLY_OK, priv);
#endif #endif
SendMessageReply (m_Buffer, l2, false); SendMessageReply (m_Buffer, l2, false);
}
} }
void SAMSocket::ProcessStreamConnect (char * buf, size_t len, size_t rem) void SAMSocket::ProcessStreamConnect (char * buf, size_t len, size_t rem)
@ -371,8 +409,8 @@ namespace client
std::string& silent = params[SAM_PARAM_SILENT]; std::string& silent = params[SAM_PARAM_SILENT];
if (silent == SAM_VALUE_TRUE) m_IsSilent = true; if (silent == SAM_VALUE_TRUE) m_IsSilent = true;
m_ID = id; m_ID = id;
m_Session = m_Owner.FindSession (id); auto session = m_Owner.FindSession (id);
if (m_Session) if (session)
{ {
if (rem > 0) // handle follow on data if (rem > 0) // handle follow on data
{ {
@ -387,12 +425,12 @@ namespace client
if (l > 0) if (l > 0)
{ {
context.GetAddressBook().InsertAddress(dest); context.GetAddressBook().InsertAddress(dest);
auto leaseSet = m_Session->localDestination->FindLeaseSet(dest->GetIdentHash()); auto leaseSet = session->localDestination->FindLeaseSet(dest->GetIdentHash());
if (leaseSet) if (leaseSet)
Connect(leaseSet); Connect(leaseSet);
else else
{ {
m_Session->localDestination->RequestDestination(dest->GetIdentHash(), session->localDestination->RequestDestination(dest->GetIdentHash(),
std::bind(&SAMSocket::HandleConnectLeaseSetRequestComplete, std::bind(&SAMSocket::HandleConnectLeaseSetRequestComplete,
shared_from_this(), std::placeholders::_1)); shared_from_this(), std::placeholders::_1));
} }
@ -406,13 +444,17 @@ namespace client
void SAMSocket::Connect (std::shared_ptr<const i2p::data::LeaseSet> remote) void SAMSocket::Connect (std::shared_ptr<const i2p::data::LeaseSet> remote)
{ {
m_SocketType = eSAMSocketTypeStream; auto session = m_Owner.FindSession(m_ID);
m_Session->AddSocket (shared_from_this ()); if(session)
m_Stream = m_Session->localDestination->CreateStream (remote); {
m_Stream->Send ((uint8_t *)m_Buffer, m_BufferOffset); // connect and send m_SocketType = eSAMSocketTypeStream;
m_BufferOffset = 0; session->AddSocket (shared_from_this ());
I2PReceive (); m_Stream = session->localDestination->CreateStream (remote);
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); m_Stream->Send ((uint8_t *)m_Buffer, m_BufferOffset); // connect and send
m_BufferOffset = 0;
I2PReceive ();
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
}
} }
void SAMSocket::HandleConnectLeaseSetRequestComplete (std::shared_ptr<i2p::data::LeaseSet> leaseSet) void SAMSocket::HandleConnectLeaseSetRequestComplete (std::shared_ptr<i2p::data::LeaseSet> leaseSet)
@ -435,15 +477,15 @@ namespace client
std::string& silent = params[SAM_PARAM_SILENT]; std::string& silent = params[SAM_PARAM_SILENT];
if (silent == SAM_VALUE_TRUE) m_IsSilent = true; if (silent == SAM_VALUE_TRUE) m_IsSilent = true;
m_ID = id; m_ID = id;
m_Session = m_Owner.FindSession (id); auto session = m_Owner.FindSession (id);
if (m_Session) if (session)
{ {
m_SocketType = eSAMSocketTypeAcceptor; m_SocketType = eSAMSocketTypeAcceptor;
m_Session->AddSocket (shared_from_this ()); session->AddSocket (shared_from_this ());
if (!m_Session->localDestination->IsAcceptingStreams ()) if (!session->localDestination->IsAcceptingStreams ())
{ {
m_IsAccepting = true; m_IsAccepting = true;
m_Session->localDestination->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); session->localDestination->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1));
} }
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
} }
@ -459,9 +501,10 @@ namespace client
size_t size = std::stoi(params[SAM_PARAM_SIZE]), offset = data - buf; size_t size = std::stoi(params[SAM_PARAM_SIZE]), offset = data - buf;
if (offset + size <= len) if (offset + size <= len)
{ {
if (m_Session) auto session = m_Owner.FindSession(m_ID);
if (session)
{ {
auto d = m_Session->localDestination->GetDatagramDestination (); auto d = session->localDestination->GetDatagramDestination ();
if (d) if (d)
{ {
i2p::data::IdentityEx dest; i2p::data::IdentityEx dest;
@ -516,7 +559,8 @@ namespace client
std::string& name = params[SAM_PARAM_NAME]; std::string& name = params[SAM_PARAM_NAME];
std::shared_ptr<const i2p::data::IdentityEx> identity; std::shared_ptr<const i2p::data::IdentityEx> identity;
i2p::data::IdentHash ident; i2p::data::IdentHash ident;
auto dest = m_Session == nullptr ? context.GetSharedLocalDestination() : m_Session->localDestination; auto session = m_Owner.FindSession(m_ID);
auto dest = session == nullptr ? context.GetSharedLocalDestination() : session->localDestination;
if (name == "ME") if (name == "ME")
SendNamingLookupReply (dest->GetIdentity ()); SendNamingLookupReply (dest->GetIdentity ());
else if ((identity = context.GetAddressBook ().GetAddress (name)) != nullptr) else if ((identity = context.GetAddressBook ().GetAddress (name)) != nullptr)
@ -612,16 +656,18 @@ namespace client
LogPrint (eLogError, "SAM: Buffer is full, terminate"); LogPrint (eLogError, "SAM: Buffer is full, terminate");
Terminate ("Buffer is full"); Terminate ("Buffer is full");
return; return;
} } else if (m_Socket)
m_Socket.async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset), m_Socket->async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset),
std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage, std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage,
shared_from_this (), std::placeholders::_1, std::placeholders::_2)); shared_from_this (), std::placeholders::_1, std::placeholders::_2));
else
LogPrint(eLogError, "SAM: receive with no native socket");
} }
void SAMSocket::HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) void SAMSocket::HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred)
{ {
if (ecode) if (ecode)
{ {
LogPrint (eLogError, "SAM: read error: ", ecode.message ()); LogPrint (eLogError, "SAM: read error: ", ecode.message ());
if (ecode != boost::asio::error::operation_aborted) if (ecode != boost::asio::error::operation_aborted)
Terminate ("read error"); Terminate ("read error");
@ -637,7 +683,7 @@ namespace client
[s](const boost::system::error_code& ecode) [s](const boost::system::error_code& ecode)
{ {
if (!ecode) if (!ecode)
s->Receive (); s->m_Owner.GetService ().post ([s] { s->Receive (); });
else else
s->m_Owner.GetService ().post ([s] { s->Terminate ("AsyncSend failed"); }); s->m_Owner.GetService ().post ([s] { s->Terminate ("AsyncSend failed"); });
}); });
@ -650,21 +696,21 @@ namespace client
if (m_Stream) if (m_Stream)
{ {
if (m_Stream->GetStatus () == i2p::stream::eStreamStatusNew || if (m_Stream->GetStatus () == i2p::stream::eStreamStatusNew ||
m_Stream->GetStatus () == i2p::stream::eStreamStatusOpen) // regular m_Stream->GetStatus () == i2p::stream::eStreamStatusOpen) // regular
{ {
m_Stream->AsyncReceive (boost::asio::buffer (m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE), m_Stream->AsyncReceive (boost::asio::buffer (m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE),
std::bind (&SAMSocket::HandleI2PReceive, shared_from_this (), std::bind (&SAMSocket::HandleI2PReceive, shared_from_this(),
std::placeholders::_1, std::placeholders::_2), std::placeholders::_1, std::placeholders::_2),
SAM_SOCKET_CONNECTION_MAX_IDLE); SAM_SOCKET_CONNECTION_MAX_IDLE);
} }
else // closed by peer else // closed by peer
{ {
uint8_t * buff = new uint8_t[SAM_SOCKET_BUFFER_SIZE];
// get remaning data // get remaning data
auto len = m_Stream->ReadSome (m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE); auto len = m_Stream->ReadSome (buff, SAM_SOCKET_BUFFER_SIZE);
if (len > 0) // still some data if (len > 0) // still some data
{ {
boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, len), WriteI2PDataImmediate(buff, len);
std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1));
} }
else // no more data else // no more data
Terminate ("no more data"); Terminate ("no more data");
@ -672,6 +718,30 @@ namespace client
} }
} }
void SAMSocket::WriteI2PDataImmediate(uint8_t * buff, size_t sz)
{
if(m_Socket)
boost::asio::async_write (
*m_Socket,
boost::asio::buffer (buff, sz),
boost::asio::transfer_all(),
std::bind (&SAMSocket::HandleWriteI2PDataImmediate, shared_from_this (), std::placeholders::_1, buff)); // postpone termination
else
LogPrint(eLogError, "SAM: no native socket");
}
void SAMSocket::HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff)
{
delete [] buff;
}
void SAMSocket::WriteI2PData(size_t sz)
{
uint8_t * sendbuff = new uint8_t[sz];
memcpy(sendbuff, m_StreamBuffer, sz);
WriteI2PDataImmediate(sendbuff, sz);
}
void SAMSocket::HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred) void SAMSocket::HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred)
{ {
if (ecode) if (ecode)
@ -680,8 +750,9 @@ namespace client
if (ecode != boost::asio::error::operation_aborted) if (ecode != boost::asio::error::operation_aborted)
{ {
if (bytes_transferred > 0) if (bytes_transferred > 0)
boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, bytes_transferred), {
std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1)); // postpone termination WriteI2PData(bytes_transferred);
}
else else
{ {
auto s = shared_from_this (); auto s = shared_from_this ();
@ -696,13 +767,18 @@ namespace client
} }
else else
{ {
if (m_SocketType != eSAMSocketTypeTerminated) // check for possible race condition with Terminate() if (m_SocketType != eSAMSocketTypeTerminated)
boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, bytes_transferred), {
std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1)); if (bytes_transferred > 0)
{
WriteI2PData(bytes_transferred);
}
I2PReceive();
}
} }
} }
void SAMSocket::HandleWriteI2PData (const boost::system::error_code& ecode) void SAMSocket::HandleWriteI2PData (const boost::system::error_code& ecode, size_t bytes_transferred)
{ {
if (ecode) if (ecode)
{ {
@ -711,7 +787,9 @@ namespace client
Terminate ("socket write error at HandleWriteI2PData"); Terminate ("socket write error at HandleWriteI2PData");
} }
else else
{
I2PReceive (); I2PReceive ();
}
} }
void SAMSocket::HandleI2PAccept (std::shared_ptr<i2p::stream::Stream> stream) void SAMSocket::HandleI2PAccept (std::shared_ptr<i2p::stream::Stream> stream)
@ -760,39 +838,42 @@ namespace client
{ {
LogPrint (eLogDebug, "SAM: datagram received ", len); LogPrint (eLogDebug, "SAM: datagram received ", len);
auto base64 = from.ToBase64 (); auto base64 = from.ToBase64 ();
auto ep = m_Session->UDPEndpoint; auto session = m_Owner.FindSession(m_ID);
if (ep) if(session)
{ {
// udp forward enabled auto ep = session->UDPEndpoint;
size_t bsz = base64.size(); if (ep)
size_t sz = bsz + 1 + len;
// build datagram body
uint8_t * data = new uint8_t[sz];
// Destination
memcpy(data, base64.c_str(), bsz);
// linefeed
data[bsz] = '\n';
// Payload
memcpy(data+bsz+1, buf, len);
// send to remote endpoint
m_Owner.SendTo(data, sz, ep);
delete [] data;
}
else
{
#ifdef _MSC_VER
size_t l = sprintf_s ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len);
#else
size_t l = snprintf ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len);
#endif
if (len < SAM_SOCKET_BUFFER_SIZE - l)
{ {
memcpy (m_StreamBuffer + l, buf, len); // udp forward enabled
boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, len + l), size_t bsz = base64.size();
std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1)); size_t sz = bsz + 1 + len;
// build datagram body
uint8_t * data = new uint8_t[sz];
// Destination
memcpy(data, base64.c_str(), bsz);
// linefeed
data[bsz] = '\n';
// Payload
memcpy(data+bsz+1, buf, len);
// send to remote endpoint
m_Owner.SendTo(data, sz, ep);
delete [] data;
} }
else else
LogPrint (eLogWarning, "SAM: received datagram size ", len," exceeds buffer"); {
#ifdef _MSC_VER
size_t l = sprintf_s ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len);
#else
size_t l = snprintf ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len);
#endif
if (len < SAM_SOCKET_BUFFER_SIZE - l)
{
memcpy (m_StreamBuffer + l, buf, len);
WriteI2PData(len + l);
}
else
LogPrint (eLogWarning, "SAM: received datagram size ", len," exceeds buffer");
}
} }
} }
@ -875,8 +956,9 @@ namespace client
void SAMBridge::Accept () void SAMBridge::Accept ()
{ {
auto newSocket = std::make_shared<SAMSocket> (*this); auto native = std::make_shared<boost::asio::ip::tcp::socket>(m_Service);
m_Acceptor.async_accept (newSocket->GetSocket (), std::bind (&SAMBridge::HandleAccept, this, auto newSocket = std::make_shared<SAMSocket> (*this, native);
m_Acceptor.async_accept (*native, std::bind (&SAMBridge::HandleAccept, this,
std::placeholders::_1, newSocket)); std::placeholders::_1, newSocket));
} }

View File

@ -79,11 +79,11 @@ namespace client
{ {
public: public:
SAMSocket (SAMBridge& owner); typedef boost::asio::ip::tcp::socket Socket_t;
SAMSocket (SAMBridge& owner, std::shared_ptr<Socket_t> socket);
~SAMSocket (); ~SAMSocket ();
void CloseStream (const char* reason); // TODO: implement it better
boost::asio::ip::tcp::socket& GetSocket () { return m_Socket; }; boost::asio::ip::tcp::socket& GetSocket () { return *m_Socket; };
void ReceiveHandshake (); void ReceiveHandshake ();
void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; }; void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; };
SAMSocketType GetSocketType () const { return m_SocketType; }; SAMSocketType GetSocketType () const { return m_SocketType; };
@ -103,7 +103,7 @@ namespace client
void I2PReceive (); void I2PReceive ();
void HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void HandleI2PAccept (std::shared_ptr<i2p::stream::Stream> stream); void HandleI2PAccept (std::shared_ptr<i2p::stream::Stream> stream);
void HandleWriteI2PData (const boost::system::error_code& ecode); void HandleWriteI2PData (const boost::system::error_code& ecode, size_t sz);
void HandleI2PDatagramReceive (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); void HandleI2PDatagramReceive (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len);
void ProcessSessionCreate (char * buf, size_t len); void ProcessSessionCreate (char * buf, size_t len);
@ -122,10 +122,15 @@ namespace client
void HandleSessionReadinessCheckTimer (const boost::system::error_code& ecode); void HandleSessionReadinessCheckTimer (const boost::system::error_code& ecode);
void SendSessionCreateReplyOk (); void SendSessionCreateReplyOk ();
void WriteI2PData(size_t sz);
void WriteI2PDataImmediate(uint8_t * ptr, size_t sz);
void HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff);
private: private:
SAMBridge& m_Owner; SAMBridge& m_Owner;
boost::asio::ip::tcp::socket m_Socket; std::shared_ptr<Socket_t> m_Socket;
boost::asio::deadline_timer m_Timer; boost::asio::deadline_timer m_Timer;
char m_Buffer[SAM_SOCKET_BUFFER_SIZE + 1]; char m_Buffer[SAM_SOCKET_BUFFER_SIZE + 1];
size_t m_BufferOffset; size_t m_BufferOffset;
@ -135,7 +140,6 @@ namespace client
bool m_IsSilent; bool m_IsSilent;
bool m_IsAccepting; // for eSAMSocketTypeAcceptor only bool m_IsAccepting; // for eSAMSocketTypeAcceptor only
std::shared_ptr<i2p::stream::Stream> m_Stream; std::shared_ptr<i2p::stream::Stream> m_Stream;
std::shared_ptr<SAMSession> m_Session;
}; };
struct SAMSession struct SAMSession
@ -146,15 +150,15 @@ namespace client
std::mutex m_SocketsMutex; std::mutex m_SocketsMutex;
/** safely add a socket to this session */ /** safely add a socket to this session */
void AddSocket(const std::shared_ptr<SAMSocket> & sock) { void AddSocket(std::shared_ptr<SAMSocket> sock) {
std::lock_guard<std::mutex> lock(m_SocketsMutex); std::lock_guard<std::mutex> lock(m_SocketsMutex);
m_Sockets.push_back(sock); m_Sockets.push_back(sock);
} }
/** safely remove a socket from this session */ /** safely remove a socket from this session */
void DelSocket(const std::shared_ptr<SAMSocket> & sock) { void DelSocket(SAMSocket * sock) {
std::lock_guard<std::mutex> lock(m_SocketsMutex); std::lock_guard<std::mutex> lock(m_SocketsMutex);
m_Sockets.remove(sock); m_Sockets.remove_if([sock](const std::shared_ptr<SAMSocket> s) -> bool { return s.get() == sock; });
} }
/** get a list holding a copy of all sam sockets from this session */ /** get a list holding a copy of all sam sockets from this session */