mirror of
https://github.com/PurpleI2P/i2pd.git
synced 2025-01-18 16:49:58 +00:00
Merge pull request #1058 from majestrate/streaming_race_fix_2018_01_15
Streaming race fix 2018 01 15
This commit is contained in:
commit
39ca07bcc6
@ -378,9 +378,15 @@ namespace stream
|
||||
|
||||
size_t Stream::Send (const uint8_t * buf, size_t len)
|
||||
{
|
||||
// TODO: check max buffer size
|
||||
size_t sent = len;
|
||||
while(len > MAX_PACKET_SIZE)
|
||||
{
|
||||
AsyncSend (buf, MAX_PACKET_SIZE, nullptr);
|
||||
buf += MAX_PACKET_SIZE;
|
||||
len -= MAX_PACKET_SIZE;
|
||||
}
|
||||
AsyncSend (buf, len, nullptr);
|
||||
return len;
|
||||
return sent;
|
||||
}
|
||||
|
||||
void Stream::AsyncSend (const uint8_t * buf, size_t len, SendHandler handler)
|
||||
|
@ -276,10 +276,9 @@ namespace stream
|
||||
/** set max connections per minute per destination */
|
||||
void SetMaxConnsPerMinute(const uint32_t conns);
|
||||
|
||||
Packet * NewPacket () { return m_PacketsPool.Acquire (); }
|
||||
void DeletePacket (Packet * p) { m_PacketsPool.Release (p); }
|
||||
Packet * NewPacket () { return m_PacketsPool.Acquire(); }
|
||||
void DeletePacket (Packet * p) { return m_PacketsPool.Release(p); }
|
||||
|
||||
private:
|
||||
|
||||
void AcceptOnceAcceptor (std::shared_ptr<Stream> stream, Acceptor acceptor, Acceptor prev);
|
||||
|
||||
@ -334,16 +333,21 @@ namespace stream
|
||||
void Stream::AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout)
|
||||
{
|
||||
auto s = shared_from_this();
|
||||
m_Service.post ([=](void)
|
||||
m_Service.post ([s, buffer, handler, timeout](void)
|
||||
{
|
||||
if (!s->m_ReceiveQueue.empty () || s->m_Status == eStreamStatusReset)
|
||||
s->HandleReceiveTimer (boost::asio::error::make_error_code (boost::asio::error::operation_aborted), buffer, handler, 0);
|
||||
else
|
||||
{
|
||||
int t = (timeout > MAX_RECEIVE_TIMEOUT) ? MAX_RECEIVE_TIMEOUT : timeout;
|
||||
int t = (timeout > MAX_RECEIVE_TIMEOUT) ? MAX_RECEIVE_TIMEOUT : timeout;
|
||||
s->m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(t));
|
||||
s->m_ReceiveTimer.async_wait ([=](const boost::system::error_code& ecode)
|
||||
{ s->HandleReceiveTimer (ecode, buffer, handler, timeout - t); });
|
||||
int left = timeout - t;
|
||||
auto self = s->shared_from_this();
|
||||
self->m_ReceiveTimer.async_wait (
|
||||
[self, buffer, handler, left](const boost::system::error_code & ec)
|
||||
{
|
||||
self->HandleReceiveTimer(ec, buffer, handler, left);
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -15,32 +15,23 @@ namespace i2p
|
||||
{
|
||||
namespace client
|
||||
{
|
||||
SAMSocket::SAMSocket (SAMBridge& owner):
|
||||
m_Owner (owner), m_Socket (m_Owner.GetService ()), m_Timer (m_Owner.GetService ()),
|
||||
m_BufferOffset (0), m_SocketType (eSAMSocketTypeUnknown), m_IsSilent (false),
|
||||
m_IsAccepting (false), m_Stream (nullptr), m_Session (nullptr)
|
||||
SAMSocket::SAMSocket (SAMBridge& owner, std::shared_ptr<Socket_t> socket):
|
||||
m_Owner (owner), m_Socket(socket), m_Timer (m_Owner.GetService ()),
|
||||
m_BufferOffset (0),
|
||||
m_SocketType (eSAMSocketTypeUnknown), m_IsSilent (false),
|
||||
m_IsAccepting (false), m_Stream (nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
SAMSocket::~SAMSocket ()
|
||||
{
|
||||
Terminate ("~SAMSocket()");
|
||||
}
|
||||
|
||||
void SAMSocket::CloseStream (const char* reason)
|
||||
{
|
||||
LogPrint (eLogDebug, "SAMSocket::CloseStream, reason: ", reason);
|
||||
if (m_Stream)
|
||||
if(m_Stream)
|
||||
{
|
||||
m_Stream->Close ();
|
||||
m_Stream.reset ();
|
||||
}
|
||||
}
|
||||
|
||||
void SAMSocket::Terminate (const char* reason)
|
||||
{
|
||||
CloseStream (reason);
|
||||
|
||||
auto Session = m_Owner.FindSession(m_ID);
|
||||
|
||||
switch (m_SocketType)
|
||||
{
|
||||
case eSAMSocketTypeSession:
|
||||
@ -48,17 +39,17 @@ namespace client
|
||||
break;
|
||||
case eSAMSocketTypeStream:
|
||||
{
|
||||
if (m_Session)
|
||||
m_Session->DelSocket (shared_from_this ());
|
||||
if (Session)
|
||||
Session->DelSocket (this);
|
||||
break;
|
||||
}
|
||||
case eSAMSocketTypeAcceptor:
|
||||
{
|
||||
if (m_Session)
|
||||
if (Session)
|
||||
{
|
||||
m_Session->DelSocket (shared_from_this ());
|
||||
if (m_IsAccepting && m_Session->localDestination)
|
||||
m_Session->localDestination->StopAcceptingStreams ();
|
||||
Session->DelSocket (this);
|
||||
if (m_IsAccepting && Session->localDestination)
|
||||
Session->localDestination->StopAcceptingStreams ();
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -66,15 +57,54 @@ namespace client
|
||||
;
|
||||
}
|
||||
m_SocketType = eSAMSocketTypeTerminated;
|
||||
if (m_Socket.is_open()) m_Socket.close ();
|
||||
m_Session = nullptr;
|
||||
if (m_Socket && m_Socket->is_open()) m_Socket->close ();
|
||||
m_Socket.reset ();
|
||||
}
|
||||
|
||||
void SAMSocket::Terminate (const char* reason)
|
||||
{
|
||||
if(m_Stream)
|
||||
{
|
||||
m_Stream->Close ();
|
||||
m_Stream.reset ();
|
||||
}
|
||||
auto Session = m_Owner.FindSession(m_ID);
|
||||
|
||||
switch (m_SocketType)
|
||||
{
|
||||
case eSAMSocketTypeSession:
|
||||
m_Owner.CloseSession (m_ID);
|
||||
break;
|
||||
case eSAMSocketTypeStream:
|
||||
{
|
||||
if (Session)
|
||||
Session->DelSocket (this);
|
||||
break;
|
||||
}
|
||||
case eSAMSocketTypeAcceptor:
|
||||
{
|
||||
if (Session)
|
||||
{
|
||||
Session->DelSocket (this);
|
||||
if (m_IsAccepting && Session->localDestination)
|
||||
Session->localDestination->StopAcceptingStreams ();
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
;
|
||||
}
|
||||
m_SocketType = eSAMSocketTypeTerminated;
|
||||
if (m_Socket && m_Socket->is_open()) m_Socket->close ();
|
||||
m_Socket.reset ();
|
||||
}
|
||||
|
||||
void SAMSocket::ReceiveHandshake ()
|
||||
{
|
||||
m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
|
||||
std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (),
|
||||
std::placeholders::_1, std::placeholders::_2));
|
||||
if(m_Socket)
|
||||
m_Socket->async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
|
||||
std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (),
|
||||
std::placeholders::_1, std::placeholders::_2));
|
||||
}
|
||||
|
||||
void SAMSocket::HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred)
|
||||
@ -121,7 +151,7 @@ namespace client
|
||||
#else
|
||||
size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_HANDSHAKE_REPLY, version.c_str ());
|
||||
#endif
|
||||
boost::asio::async_write (m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (),
|
||||
boost::asio::async_write (*m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (),
|
||||
std::bind(&SAMSocket::HandleHandshakeReplySent, shared_from_this (),
|
||||
std::placeholders::_1, std::placeholders::_2));
|
||||
}
|
||||
@ -144,9 +174,9 @@ namespace client
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
Terminate ("SAM: handshake reply send error");
|
||||
}
|
||||
else
|
||||
else if(m_Socket)
|
||||
{
|
||||
m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
|
||||
m_Socket->async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
|
||||
std::bind(&SAMSocket::HandleMessage, shared_from_this (),
|
||||
std::placeholders::_1, std::placeholders::_2));
|
||||
}
|
||||
@ -157,7 +187,7 @@ namespace client
|
||||
LogPrint (eLogDebug, "SAMSocket::SendMessageReply, close=",close?"true":"false", " reason: ", msg);
|
||||
|
||||
if (!m_IsSilent)
|
||||
boost::asio::async_write (m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (),
|
||||
boost::asio::async_write (*m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (),
|
||||
std::bind(&SAMSocket::HandleMessageReplySent, shared_from_this (),
|
||||
std::placeholders::_1, std::placeholders::_2, close));
|
||||
else
|
||||
@ -306,19 +336,19 @@ namespace client
|
||||
}
|
||||
|
||||
// create destination
|
||||
m_Session = m_Owner.CreateSession (id, destination == SAM_VALUE_TRANSIENT ? "" : destination, ¶ms);
|
||||
if (m_Session)
|
||||
auto session = m_Owner.CreateSession (id, destination == SAM_VALUE_TRANSIENT ? "" : destination, ¶ms);
|
||||
if (session)
|
||||
{
|
||||
m_SocketType = eSAMSocketTypeSession;
|
||||
if (style == SAM_VALUE_DATAGRAM)
|
||||
{
|
||||
m_Session->UDPEndpoint = forward;
|
||||
auto dest = m_Session->localDestination->CreateDatagramDestination ();
|
||||
session->UDPEndpoint = forward;
|
||||
auto dest = session->localDestination->CreateDatagramDestination ();
|
||||
dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (),
|
||||
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5));
|
||||
}
|
||||
|
||||
if (m_Session->localDestination->IsReady ())
|
||||
if (session->localDestination->IsReady ())
|
||||
SendSessionCreateReplyOk ();
|
||||
else
|
||||
{
|
||||
@ -335,30 +365,38 @@ namespace client
|
||||
{
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
{
|
||||
if (m_Session->localDestination->IsReady ())
|
||||
SendSessionCreateReplyOk ();
|
||||
else
|
||||
auto session = m_Owner.FindSession(m_ID);
|
||||
if(session)
|
||||
{
|
||||
m_Timer.expires_from_now (boost::posix_time::seconds(SAM_SESSION_READINESS_CHECK_INTERVAL));
|
||||
m_Timer.async_wait (std::bind (&SAMSocket::HandleSessionReadinessCheckTimer,
|
||||
shared_from_this (), std::placeholders::_1));
|
||||
if (session->localDestination->IsReady ())
|
||||
SendSessionCreateReplyOk ();
|
||||
else
|
||||
{
|
||||
m_Timer.expires_from_now (boost::posix_time::seconds(SAM_SESSION_READINESS_CHECK_INTERVAL));
|
||||
m_Timer.async_wait (std::bind (&SAMSocket::HandleSessionReadinessCheckTimer,
|
||||
shared_from_this (), std::placeholders::_1));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void SAMSocket::SendSessionCreateReplyOk ()
|
||||
{
|
||||
uint8_t buf[1024];
|
||||
char priv[1024];
|
||||
size_t l = m_Session->localDestination->GetPrivateKeys ().ToBuffer (buf, 1024);
|
||||
size_t l1 = i2p::data::ByteStreamToBase64 (buf, l, priv, 1024);
|
||||
priv[l1] = 0;
|
||||
auto session = m_Owner.FindSession(m_ID);
|
||||
if (session)
|
||||
{
|
||||
uint8_t buf[1024];
|
||||
char priv[1024];
|
||||
size_t l = session->localDestination->GetPrivateKeys ().ToBuffer (buf, 1024);
|
||||
size_t l1 = i2p::data::ByteStreamToBase64 (buf, l, priv, 1024);
|
||||
priv[l1] = 0;
|
||||
#ifdef _MSC_VER
|
||||
size_t l2 = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_CREATE_REPLY_OK, priv);
|
||||
size_t l2 = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_CREATE_REPLY_OK, priv);
|
||||
#else
|
||||
size_t l2 = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_CREATE_REPLY_OK, priv);
|
||||
size_t l2 = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_CREATE_REPLY_OK, priv);
|
||||
#endif
|
||||
SendMessageReply (m_Buffer, l2, false);
|
||||
SendMessageReply (m_Buffer, l2, false);
|
||||
}
|
||||
}
|
||||
|
||||
void SAMSocket::ProcessStreamConnect (char * buf, size_t len, size_t rem)
|
||||
@ -371,8 +409,8 @@ namespace client
|
||||
std::string& silent = params[SAM_PARAM_SILENT];
|
||||
if (silent == SAM_VALUE_TRUE) m_IsSilent = true;
|
||||
m_ID = id;
|
||||
m_Session = m_Owner.FindSession (id);
|
||||
if (m_Session)
|
||||
auto session = m_Owner.FindSession (id);
|
||||
if (session)
|
||||
{
|
||||
if (rem > 0) // handle follow on data
|
||||
{
|
||||
@ -387,12 +425,12 @@ namespace client
|
||||
if (l > 0)
|
||||
{
|
||||
context.GetAddressBook().InsertAddress(dest);
|
||||
auto leaseSet = m_Session->localDestination->FindLeaseSet(dest->GetIdentHash());
|
||||
auto leaseSet = session->localDestination->FindLeaseSet(dest->GetIdentHash());
|
||||
if (leaseSet)
|
||||
Connect(leaseSet);
|
||||
else
|
||||
{
|
||||
m_Session->localDestination->RequestDestination(dest->GetIdentHash(),
|
||||
session->localDestination->RequestDestination(dest->GetIdentHash(),
|
||||
std::bind(&SAMSocket::HandleConnectLeaseSetRequestComplete,
|
||||
shared_from_this(), std::placeholders::_1));
|
||||
}
|
||||
@ -406,13 +444,17 @@ namespace client
|
||||
|
||||
void SAMSocket::Connect (std::shared_ptr<const i2p::data::LeaseSet> remote)
|
||||
{
|
||||
m_SocketType = eSAMSocketTypeStream;
|
||||
m_Session->AddSocket (shared_from_this ());
|
||||
m_Stream = m_Session->localDestination->CreateStream (remote);
|
||||
m_Stream->Send ((uint8_t *)m_Buffer, m_BufferOffset); // connect and send
|
||||
m_BufferOffset = 0;
|
||||
I2PReceive ();
|
||||
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
|
||||
auto session = m_Owner.FindSession(m_ID);
|
||||
if(session)
|
||||
{
|
||||
m_SocketType = eSAMSocketTypeStream;
|
||||
session->AddSocket (shared_from_this ());
|
||||
m_Stream = session->localDestination->CreateStream (remote);
|
||||
m_Stream->Send ((uint8_t *)m_Buffer, m_BufferOffset); // connect and send
|
||||
m_BufferOffset = 0;
|
||||
I2PReceive ();
|
||||
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
|
||||
}
|
||||
}
|
||||
|
||||
void SAMSocket::HandleConnectLeaseSetRequestComplete (std::shared_ptr<i2p::data::LeaseSet> leaseSet)
|
||||
@ -435,15 +477,15 @@ namespace client
|
||||
std::string& silent = params[SAM_PARAM_SILENT];
|
||||
if (silent == SAM_VALUE_TRUE) m_IsSilent = true;
|
||||
m_ID = id;
|
||||
m_Session = m_Owner.FindSession (id);
|
||||
if (m_Session)
|
||||
auto session = m_Owner.FindSession (id);
|
||||
if (session)
|
||||
{
|
||||
m_SocketType = eSAMSocketTypeAcceptor;
|
||||
m_Session->AddSocket (shared_from_this ());
|
||||
if (!m_Session->localDestination->IsAcceptingStreams ())
|
||||
session->AddSocket (shared_from_this ());
|
||||
if (!session->localDestination->IsAcceptingStreams ())
|
||||
{
|
||||
m_IsAccepting = true;
|
||||
m_Session->localDestination->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1));
|
||||
m_IsAccepting = true;
|
||||
session->localDestination->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1));
|
||||
}
|
||||
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
|
||||
}
|
||||
@ -459,9 +501,10 @@ namespace client
|
||||
size_t size = std::stoi(params[SAM_PARAM_SIZE]), offset = data - buf;
|
||||
if (offset + size <= len)
|
||||
{
|
||||
if (m_Session)
|
||||
auto session = m_Owner.FindSession(m_ID);
|
||||
if (session)
|
||||
{
|
||||
auto d = m_Session->localDestination->GetDatagramDestination ();
|
||||
auto d = session->localDestination->GetDatagramDestination ();
|
||||
if (d)
|
||||
{
|
||||
i2p::data::IdentityEx dest;
|
||||
@ -516,7 +559,8 @@ namespace client
|
||||
std::string& name = params[SAM_PARAM_NAME];
|
||||
std::shared_ptr<const i2p::data::IdentityEx> identity;
|
||||
i2p::data::IdentHash ident;
|
||||
auto dest = m_Session == nullptr ? context.GetSharedLocalDestination() : m_Session->localDestination;
|
||||
auto session = m_Owner.FindSession(m_ID);
|
||||
auto dest = session == nullptr ? context.GetSharedLocalDestination() : session->localDestination;
|
||||
if (name == "ME")
|
||||
SendNamingLookupReply (dest->GetIdentity ());
|
||||
else if ((identity = context.GetAddressBook ().GetAddress (name)) != nullptr)
|
||||
@ -612,16 +656,18 @@ namespace client
|
||||
LogPrint (eLogError, "SAM: Buffer is full, terminate");
|
||||
Terminate ("Buffer is full");
|
||||
return;
|
||||
}
|
||||
m_Socket.async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset),
|
||||
std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage,
|
||||
shared_from_this (), std::placeholders::_1, std::placeholders::_2));
|
||||
} else if (m_Socket)
|
||||
m_Socket->async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset),
|
||||
std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage,
|
||||
shared_from_this (), std::placeholders::_1, std::placeholders::_2));
|
||||
else
|
||||
LogPrint(eLogError, "SAM: receive with no native socket");
|
||||
}
|
||||
|
||||
void SAMSocket::HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred)
|
||||
{
|
||||
if (ecode)
|
||||
{
|
||||
{
|
||||
LogPrint (eLogError, "SAM: read error: ", ecode.message ());
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
Terminate ("read error");
|
||||
@ -637,8 +683,8 @@ namespace client
|
||||
[s](const boost::system::error_code& ecode)
|
||||
{
|
||||
if (!ecode)
|
||||
s->Receive ();
|
||||
else
|
||||
s->m_Owner.GetService ().post ([s] { s->Receive (); });
|
||||
else
|
||||
s->m_Owner.GetService ().post ([s] { s->Terminate ("AsyncSend failed"); });
|
||||
});
|
||||
}
|
||||
@ -650,21 +696,21 @@ namespace client
|
||||
if (m_Stream)
|
||||
{
|
||||
if (m_Stream->GetStatus () == i2p::stream::eStreamStatusNew ||
|
||||
m_Stream->GetStatus () == i2p::stream::eStreamStatusOpen) // regular
|
||||
m_Stream->GetStatus () == i2p::stream::eStreamStatusOpen) // regular
|
||||
{
|
||||
m_Stream->AsyncReceive (boost::asio::buffer (m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE),
|
||||
std::bind (&SAMSocket::HandleI2PReceive, shared_from_this (),
|
||||
std::bind (&SAMSocket::HandleI2PReceive, shared_from_this(),
|
||||
std::placeholders::_1, std::placeholders::_2),
|
||||
SAM_SOCKET_CONNECTION_MAX_IDLE);
|
||||
SAM_SOCKET_CONNECTION_MAX_IDLE);
|
||||
}
|
||||
else // closed by peer
|
||||
{
|
||||
uint8_t * buff = new uint8_t[SAM_SOCKET_BUFFER_SIZE];
|
||||
// get remaning data
|
||||
auto len = m_Stream->ReadSome (m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE);
|
||||
auto len = m_Stream->ReadSome (buff, SAM_SOCKET_BUFFER_SIZE);
|
||||
if (len > 0) // still some data
|
||||
{
|
||||
boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, len),
|
||||
std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1));
|
||||
WriteI2PDataImmediate(buff, len);
|
||||
}
|
||||
else // no more data
|
||||
Terminate ("no more data");
|
||||
@ -672,6 +718,30 @@ namespace client
|
||||
}
|
||||
}
|
||||
|
||||
void SAMSocket::WriteI2PDataImmediate(uint8_t * buff, size_t sz)
|
||||
{
|
||||
if(m_Socket)
|
||||
boost::asio::async_write (
|
||||
*m_Socket,
|
||||
boost::asio::buffer (buff, sz),
|
||||
boost::asio::transfer_all(),
|
||||
std::bind (&SAMSocket::HandleWriteI2PDataImmediate, shared_from_this (), std::placeholders::_1, buff)); // postpone termination
|
||||
else
|
||||
LogPrint(eLogError, "SAM: no native socket");
|
||||
}
|
||||
|
||||
void SAMSocket::HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff)
|
||||
{
|
||||
delete [] buff;
|
||||
}
|
||||
|
||||
void SAMSocket::WriteI2PData(size_t sz)
|
||||
{
|
||||
uint8_t * sendbuff = new uint8_t[sz];
|
||||
memcpy(sendbuff, m_StreamBuffer, sz);
|
||||
WriteI2PDataImmediate(sendbuff, sz);
|
||||
}
|
||||
|
||||
void SAMSocket::HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred)
|
||||
{
|
||||
if (ecode)
|
||||
@ -680,8 +750,9 @@ namespace client
|
||||
if (ecode != boost::asio::error::operation_aborted)
|
||||
{
|
||||
if (bytes_transferred > 0)
|
||||
boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, bytes_transferred),
|
||||
std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1)); // postpone termination
|
||||
{
|
||||
WriteI2PData(bytes_transferred);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto s = shared_from_this ();
|
||||
@ -696,13 +767,18 @@ namespace client
|
||||
}
|
||||
else
|
||||
{
|
||||
if (m_SocketType != eSAMSocketTypeTerminated) // check for possible race condition with Terminate()
|
||||
boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, bytes_transferred),
|
||||
std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1));
|
||||
if (m_SocketType != eSAMSocketTypeTerminated)
|
||||
{
|
||||
if (bytes_transferred > 0)
|
||||
{
|
||||
WriteI2PData(bytes_transferred);
|
||||
}
|
||||
I2PReceive();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void SAMSocket::HandleWriteI2PData (const boost::system::error_code& ecode)
|
||||
void SAMSocket::HandleWriteI2PData (const boost::system::error_code& ecode, size_t bytes_transferred)
|
||||
{
|
||||
if (ecode)
|
||||
{
|
||||
@ -711,7 +787,9 @@ namespace client
|
||||
Terminate ("socket write error at HandleWriteI2PData");
|
||||
}
|
||||
else
|
||||
{
|
||||
I2PReceive ();
|
||||
}
|
||||
}
|
||||
|
||||
void SAMSocket::HandleI2PAccept (std::shared_ptr<i2p::stream::Stream> stream)
|
||||
@ -760,39 +838,42 @@ namespace client
|
||||
{
|
||||
LogPrint (eLogDebug, "SAM: datagram received ", len);
|
||||
auto base64 = from.ToBase64 ();
|
||||
auto ep = m_Session->UDPEndpoint;
|
||||
if (ep)
|
||||
auto session = m_Owner.FindSession(m_ID);
|
||||
if(session)
|
||||
{
|
||||
// udp forward enabled
|
||||
size_t bsz = base64.size();
|
||||
size_t sz = bsz + 1 + len;
|
||||
// build datagram body
|
||||
uint8_t * data = new uint8_t[sz];
|
||||
// Destination
|
||||
memcpy(data, base64.c_str(), bsz);
|
||||
// linefeed
|
||||
data[bsz] = '\n';
|
||||
// Payload
|
||||
memcpy(data+bsz+1, buf, len);
|
||||
// send to remote endpoint
|
||||
m_Owner.SendTo(data, sz, ep);
|
||||
delete [] data;
|
||||
}
|
||||
else
|
||||
{
|
||||
#ifdef _MSC_VER
|
||||
size_t l = sprintf_s ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len);
|
||||
#else
|
||||
size_t l = snprintf ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len);
|
||||
#endif
|
||||
if (len < SAM_SOCKET_BUFFER_SIZE - l)
|
||||
auto ep = session->UDPEndpoint;
|
||||
if (ep)
|
||||
{
|
||||
memcpy (m_StreamBuffer + l, buf, len);
|
||||
boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, len + l),
|
||||
std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1));
|
||||
// udp forward enabled
|
||||
size_t bsz = base64.size();
|
||||
size_t sz = bsz + 1 + len;
|
||||
// build datagram body
|
||||
uint8_t * data = new uint8_t[sz];
|
||||
// Destination
|
||||
memcpy(data, base64.c_str(), bsz);
|
||||
// linefeed
|
||||
data[bsz] = '\n';
|
||||
// Payload
|
||||
memcpy(data+bsz+1, buf, len);
|
||||
// send to remote endpoint
|
||||
m_Owner.SendTo(data, sz, ep);
|
||||
delete [] data;
|
||||
}
|
||||
else
|
||||
LogPrint (eLogWarning, "SAM: received datagram size ", len," exceeds buffer");
|
||||
{
|
||||
#ifdef _MSC_VER
|
||||
size_t l = sprintf_s ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len);
|
||||
#else
|
||||
size_t l = snprintf ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len);
|
||||
#endif
|
||||
if (len < SAM_SOCKET_BUFFER_SIZE - l)
|
||||
{
|
||||
memcpy (m_StreamBuffer + l, buf, len);
|
||||
WriteI2PData(len + l);
|
||||
}
|
||||
else
|
||||
LogPrint (eLogWarning, "SAM: received datagram size ", len," exceeds buffer");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -875,8 +956,9 @@ namespace client
|
||||
|
||||
void SAMBridge::Accept ()
|
||||
{
|
||||
auto newSocket = std::make_shared<SAMSocket> (*this);
|
||||
m_Acceptor.async_accept (newSocket->GetSocket (), std::bind (&SAMBridge::HandleAccept, this,
|
||||
auto native = std::make_shared<boost::asio::ip::tcp::socket>(m_Service);
|
||||
auto newSocket = std::make_shared<SAMSocket> (*this, native);
|
||||
m_Acceptor.async_accept (*native, std::bind (&SAMBridge::HandleAccept, this,
|
||||
std::placeholders::_1, newSocket));
|
||||
}
|
||||
|
||||
|
@ -79,18 +79,18 @@ namespace client
|
||||
{
|
||||
public:
|
||||
|
||||
SAMSocket (SAMBridge& owner);
|
||||
~SAMSocket ();
|
||||
void CloseStream (const char* reason); // TODO: implement it better
|
||||
typedef boost::asio::ip::tcp::socket Socket_t;
|
||||
SAMSocket (SAMBridge& owner, std::shared_ptr<Socket_t> socket);
|
||||
~SAMSocket ();
|
||||
|
||||
boost::asio::ip::tcp::socket& GetSocket () { return m_Socket; };
|
||||
boost::asio::ip::tcp::socket& GetSocket () { return *m_Socket; };
|
||||
void ReceiveHandshake ();
|
||||
void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; };
|
||||
SAMSocketType GetSocketType () const { return m_SocketType; };
|
||||
|
||||
void Terminate (const char* reason);
|
||||
void Terminate (const char* reason);
|
||||
|
||||
private:
|
||||
private:
|
||||
|
||||
void HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred);
|
||||
void HandleHandshakeReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred);
|
||||
@ -103,7 +103,7 @@ namespace client
|
||||
void I2PReceive ();
|
||||
void HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred);
|
||||
void HandleI2PAccept (std::shared_ptr<i2p::stream::Stream> stream);
|
||||
void HandleWriteI2PData (const boost::system::error_code& ecode);
|
||||
void HandleWriteI2PData (const boost::system::error_code& ecode, size_t sz);
|
||||
void HandleI2PDatagramReceive (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len);
|
||||
|
||||
void ProcessSessionCreate (char * buf, size_t len);
|
||||
@ -122,10 +122,14 @@ namespace client
|
||||
void HandleSessionReadinessCheckTimer (const boost::system::error_code& ecode);
|
||||
void SendSessionCreateReplyOk ();
|
||||
|
||||
void WriteI2PData(size_t sz);
|
||||
void WriteI2PDataImmediate(uint8_t * ptr, size_t sz);
|
||||
|
||||
void HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff);
|
||||
private:
|
||||
|
||||
SAMBridge& m_Owner;
|
||||
boost::asio::ip::tcp::socket m_Socket;
|
||||
std::shared_ptr<Socket_t> m_Socket;
|
||||
boost::asio::deadline_timer m_Timer;
|
||||
char m_Buffer[SAM_SOCKET_BUFFER_SIZE + 1];
|
||||
size_t m_BufferOffset;
|
||||
@ -135,7 +139,6 @@ namespace client
|
||||
bool m_IsSilent;
|
||||
bool m_IsAccepting; // for eSAMSocketTypeAcceptor only
|
||||
std::shared_ptr<i2p::stream::Stream> m_Stream;
|
||||
std::shared_ptr<SAMSession> m_Session;
|
||||
};
|
||||
|
||||
struct SAMSession
|
||||
@ -146,15 +149,15 @@ namespace client
|
||||
std::mutex m_SocketsMutex;
|
||||
|
||||
/** safely add a socket to this session */
|
||||
void AddSocket(const std::shared_ptr<SAMSocket> & sock) {
|
||||
void AddSocket(std::shared_ptr<SAMSocket> sock) {
|
||||
std::lock_guard<std::mutex> lock(m_SocketsMutex);
|
||||
m_Sockets.push_back(sock);
|
||||
}
|
||||
|
||||
/** safely remove a socket from this session */
|
||||
void DelSocket(const std::shared_ptr<SAMSocket> & sock) {
|
||||
void DelSocket(SAMSocket * sock) {
|
||||
std::lock_guard<std::mutex> lock(m_SocketsMutex);
|
||||
m_Sockets.remove(sock);
|
||||
m_Sockets.remove_if([sock](const std::shared_ptr<SAMSocket> s) -> bool { return s.get() == sock; });
|
||||
}
|
||||
|
||||
/** get a list holding a copy of all sam sockets from this session */
|
||||
|
Loading…
x
Reference in New Issue
Block a user