Browse Source

Merge pull request #790 from majestrate/sam-datagrams

udp datagrams and whitespace cleanups in SAM
pull/791/head
orignal 8 years ago committed by GitHub
parent
commit
9030b3e04c
  1. 2
      Makefile.linux
  2. 314
      SAM.cpp
  3. 63
      SAM.h

2
Makefile.linux

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
# set defaults instead redefine
CXXFLAGS ?= -g -Wall -Wextra -Wno-unused-parameter -pedantic
CXXFLAGS ?= -g -Wall -Wextra -Wno-unused-parameter -pedantic -Wno-misleading-indentation
INCFLAGS ?=
## NOTE: The NEEDED_CXXFLAGS are here so that custom CXXFLAGS can be specified at build time

314
SAM.cpp

@ -15,9 +15,9 @@ namespace i2p @@ -15,9 +15,9 @@ namespace i2p
{
namespace client
{
SAMSocket::SAMSocket (SAMBridge& owner):
SAMSocket::SAMSocket (SAMBridge& owner):
m_Owner (owner), m_Socket (m_Owner.GetService ()), m_Timer (m_Owner.GetService ()),
m_BufferOffset (0), m_SocketType (eSAMSocketTypeUnknown), m_IsSilent (false),
m_BufferOffset (0), m_SocketType (eSAMSocketTypeUnknown), m_IsSilent (false),
m_Stream (nullptr), m_Session (nullptr)
{
}
@ -25,21 +25,21 @@ namespace client @@ -25,21 +25,21 @@ namespace client
SAMSocket::~SAMSocket ()
{
Terminate ();
}
}
void SAMSocket::CloseStream ()
{
if (m_Stream)
{
{
m_Stream->Close ();
m_Stream.reset ();
}
}
}
}
void SAMSocket::Terminate ()
{
CloseStream ();
switch (m_SocketType)
{
case eSAMSocketTypeSession:
@ -47,14 +47,14 @@ namespace client @@ -47,14 +47,14 @@ namespace client
break;
case eSAMSocketTypeStream:
{
if (m_Session)
if (m_Session)
m_Session->DelSocket (shared_from_this ());
break;
}
case eSAMSocketTypeAcceptor:
{
if (m_Session)
{
{
m_Session->DelSocket (shared_from_this ());
if (m_Session->localDestination)
m_Session->localDestination->StopAcceptingStreams ();
@ -71,8 +71,8 @@ namespace client @@ -71,8 +71,8 @@ namespace client
void SAMSocket::ReceiveHandshake ()
{
m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (),
m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (),
std::placeholders::_1, std::placeholders::_2));
}
@ -85,7 +85,7 @@ namespace client @@ -85,7 +85,7 @@ namespace client
Terminate ();
}
else
{
{
m_Buffer[bytes_transferred] = 0;
char * eol = (char *)memchr (m_Buffer, '\n', bytes_transferred);
if (eol)
@ -94,8 +94,8 @@ namespace client @@ -94,8 +94,8 @@ namespace client
char * separator = strchr (m_Buffer, ' ');
if (separator)
{
separator = strchr (separator + 1, ' ');
if (separator)
separator = strchr (separator + 1, ' ');
if (separator)
*separator = 0;
}
@ -117,13 +117,13 @@ namespace client @@ -117,13 +117,13 @@ namespace client
{
#ifdef _MSC_VER
size_t l = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_HANDSHAKE_REPLY, version.c_str ());
#else
#else
size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_HANDSHAKE_REPLY, version.c_str ());
#endif
boost::asio::async_write (m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (),
std::bind(&SAMSocket::HandleHandshakeReplySent, shared_from_this (),
std::bind(&SAMSocket::HandleHandshakeReplySent, shared_from_this (),
std::placeholders::_1, std::placeholders::_2));
}
}
else
SendMessageReply (SAM_HANDSHAKE_I2P_ERROR, strlen (SAM_HANDSHAKE_I2P_ERROR), true);
}
@ -145,25 +145,25 @@ namespace client @@ -145,25 +145,25 @@ namespace client
}
else
{
m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
std::bind(&SAMSocket::HandleMessage, shared_from_this (),
std::placeholders::_1, std::placeholders::_2));
}
m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
std::bind(&SAMSocket::HandleMessage, shared_from_this (),
std::placeholders::_1, std::placeholders::_2));
}
}
void SAMSocket::SendMessageReply (const char * msg, size_t len, bool close)
{
if (!m_IsSilent)
if (!m_IsSilent)
boost::asio::async_write (m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (),
std::bind(&SAMSocket::HandleMessageReplySent, shared_from_this (),
std::bind(&SAMSocket::HandleMessageReplySent, shared_from_this (),
std::placeholders::_1, std::placeholders::_2, close));
else
{
if (close)
Terminate ();
else
Receive ();
}
Receive ();
}
}
void SAMSocket::HandleMessageReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred, bool close)
@ -179,8 +179,8 @@ namespace client @@ -179,8 +179,8 @@ namespace client
if (close)
Terminate ();
else
Receive ();
}
Receive ();
}
}
void SAMSocket::HandleMessage (const boost::system::error_code& ecode, std::size_t bytes_transferred)
@ -205,8 +205,8 @@ namespace client @@ -205,8 +205,8 @@ namespace client
char * separator = strchr (m_Buffer, ' ');
if (separator)
{
separator = strchr (separator + 1, ' ');
if (separator)
separator = strchr (separator + 1, ' ');
if (separator)
*separator = 0;
else
separator = eol;
@ -236,12 +236,12 @@ namespace client @@ -236,12 +236,12 @@ namespace client
*separator = ' ';
*eol = '\n';
}
}
}
// since it's SAM v1 reply is not expected
Receive ();
}
else
{
else
{
LogPrint (eLogError, "SAM: unexpected message ", m_Buffer);
Terminate ();
}
@ -252,8 +252,9 @@ namespace client @@ -252,8 +252,9 @@ namespace client
Terminate ();
}
}
else
{
{
LogPrint (eLogWarning, "SAM: incomplete message ", bytes_transferred);
m_BufferOffset = bytes_transferred;
// try to receive remaining message
@ -267,10 +268,10 @@ namespace client @@ -267,10 +268,10 @@ namespace client
LogPrint (eLogDebug, "SAM: session create: ", buf);
std::map<std::string, std::string> params;
ExtractParams (buf, params);
std::string& style = params[SAM_PARAM_STYLE];
std::string& style = params[SAM_PARAM_STYLE];
std::string& id = params[SAM_PARAM_ID];
std::string& destination = params[SAM_PARAM_DESTINATION];
m_ID = id;
m_ID = id;
if (m_Owner.FindSession (id))
{
// session exists
@ -278,15 +279,39 @@ namespace client @@ -278,15 +279,39 @@ namespace client
return;
}
// create destination
m_Session = m_Owner.CreateSession (id, destination == SAM_VALUE_TRANSIENT ? "" : destination, &params);
std::shared_ptr<boost::asio::ip::udp::endpoint> forward = nullptr;
if (style == SAM_VALUE_DATAGRAM && params.find(SAM_VALUE_HOST) != params.end() && params.find(SAM_VALUE_PORT) != params.end())
{
// udp forward selected
boost::system::error_code e;
// TODO: support hostnames in udp forward
auto addr = boost::asio::ip::address::from_string(params[SAM_VALUE_HOST], e);
if (e)
{
// not an ip address
SendI2PError("Invalid IP Address in HOST");
return;
}
auto port = std::stoi(params[SAM_VALUE_PORT]);
if (port == -1)
{
SendI2PError("Invalid port");
return;
}
forward = std::make_shared<boost::asio::ip::udp::endpoint>(addr, port);
}
// create destination
m_Session = m_Owner.CreateSession (id, destination == SAM_VALUE_TRANSIENT ? "" : destination, &params);
if (m_Session)
{
m_SocketType = eSAMSocketTypeSession;
if (style == SAM_VALUE_DATAGRAM)
{
m_Session->UDPEndpoint = forward;
auto dest = m_Session->localDestination->CreateDatagramDestination ();
dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (),
dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (),
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5));
}
@ -296,7 +321,7 @@ namespace client @@ -296,7 +321,7 @@ namespace client
{
m_Timer.expires_from_now (boost::posix_time::seconds(SAM_SESSION_READINESS_CHECK_INTERVAL));
m_Timer.async_wait (std::bind (&SAMSocket::HandleSessionReadinessCheckTimer,
shared_from_this (), std::placeholders::_1));
shared_from_this (), std::placeholders::_1));
}
}
else
@ -314,7 +339,7 @@ namespace client @@ -314,7 +339,7 @@ namespace client
m_Timer.expires_from_now (boost::posix_time::seconds(SAM_SESSION_READINESS_CHECK_INTERVAL));
m_Timer.async_wait (std::bind (&SAMSocket::HandleSessionReadinessCheckTimer,
shared_from_this (), std::placeholders::_1));
}
}
}
}
@ -327,7 +352,7 @@ namespace client @@ -327,7 +352,7 @@ namespace client
priv[l1] = 0;
#ifdef _MSC_VER
size_t l2 = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_CREATE_REPLY_OK, priv);
#else
#else
size_t l2 = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_CREATE_REPLY_OK, priv);
#endif
SendMessageReply (m_Buffer, l2, false);
@ -341,7 +366,7 @@ namespace client @@ -341,7 +366,7 @@ namespace client
std::string& id = params[SAM_PARAM_ID];
std::string& destination = params[SAM_PARAM_DESTINATION];
std::string& silent = params[SAM_PARAM_SILENT];
if (silent == SAM_VALUE_TRUE) m_IsSilent = true;
if (silent == SAM_VALUE_TRUE) m_IsSilent = true;
m_ID = id;
m_Session = m_Owner.FindSession (id);
if (m_Session)
@ -365,7 +390,7 @@ namespace client @@ -365,7 +390,7 @@ namespace client
SendMessageReply(SAM_SESSION_STATUS_INVALID_KEY, strlen(SAM_SESSION_STATUS_INVALID_KEY), true);
}
else
SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true);
SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true);
}
void SAMSocket::Connect (std::shared_ptr<const i2p::data::LeaseSet> remote)
@ -374,7 +399,7 @@ namespace client @@ -374,7 +399,7 @@ namespace client
m_Session->AddSocket (shared_from_this ());
m_Stream = m_Session->localDestination->CreateStream (remote);
m_Stream->Send ((uint8_t *)m_Buffer, 0); // connect
I2PReceive ();
I2PReceive ();
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
}
@ -396,17 +421,17 @@ namespace client @@ -396,17 +421,17 @@ namespace client
ExtractParams (buf, params);
std::string& id = params[SAM_PARAM_ID];
std::string& silent = params[SAM_PARAM_SILENT];
if (silent == SAM_VALUE_TRUE) m_IsSilent = true;
if (silent == SAM_VALUE_TRUE) m_IsSilent = true;
m_ID = id;
m_Session = m_Owner.FindSession (id);
if (m_Session)
{
{
m_SocketType = eSAMSocketTypeAcceptor;
m_Session->AddSocket (shared_from_this ());
if (!m_Session->localDestination->IsAcceptingStreams ())
m_Session->localDestination->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1));
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
}
}
else
SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true);
}
@ -418,9 +443,9 @@ namespace client @@ -418,9 +443,9 @@ namespace client
ExtractParams (buf, params);
size_t size = std::stoi(params[SAM_PARAM_SIZE]), offset = data - buf;
if (offset + size <= len)
{
{
if (m_Session)
{
{
auto d = m_Session->localDestination->GetDatagramDestination ();
if (d)
{
@ -433,24 +458,24 @@ namespace client @@ -433,24 +458,24 @@ namespace client
}
else
LogPrint (eLogError, "SAM: session is not created from DATAGRAM SEND");
}
}
else
{
LogPrint (eLogWarning, "SAM: sent datagram size ", size, " exceeds buffer ", len - offset);
return 0; // try to receive more
}
}
return offset + size;
}
}
void SAMSocket::ProcessDestGenerate ()
{
LogPrint (eLogDebug, "SAM: dest generate");
auto keys = i2p::data::PrivateKeys::CreateRandomKeys ();
#ifdef _MSC_VER
size_t len = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_DEST_REPLY,
size_t len = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_DEST_REPLY,
keys.GetPublic ()->ToBase64 ().c_str (), keys.ToBase64 ().c_str ());
#else
size_t len = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_DEST_REPLY,
#else
size_t len = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_DEST_REPLY,
keys.GetPublic ()->ToBase64 ().c_str (), keys.ToBase64 ().c_str ());
#endif
SendMessageReply (m_Buffer, len, false);
@ -479,45 +504,56 @@ namespace client @@ -479,45 +504,56 @@ namespace client
std::bind (&SAMSocket::HandleNamingLookupLeaseSetRequestComplete,
shared_from_this (), std::placeholders::_1, ident));
}
else
else
{
LogPrint (eLogError, "SAM: naming failed, unknown address ", name);
#ifdef _MSC_VER
size_t len = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY_INVALID_KEY, name.c_str());
#else
#else
size_t len = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY_INVALID_KEY, name.c_str());
#endif
SendMessageReply (m_Buffer, len, false);
}
}
}
void SAMSocket::HandleNamingLookupLeaseSetRequestComplete (std::shared_ptr<i2p::data::LeaseSet> leaseSet, i2p::data::IdentHash ident)
void SAMSocket::SendI2PError(const std::string & msg)
{
LogPrint (eLogError, "SAM: i2p error ", msg);
#ifdef _MSC_VER
size_t len = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_STATUS_I2P_ERROR, msg.c_str());
#else
size_t len = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_SESSION_STATUS_I2P_ERROR, msg.c_str());
#endif
SendMessageReply (m_Buffer, len, true);
}
void SAMSocket::HandleNamingLookupLeaseSetRequestComplete (std::shared_ptr<i2p::data::LeaseSet> leaseSet, i2p::data::IdentHash ident)
{
if (leaseSet)
{
{
context.GetAddressBook ().InsertAddress (leaseSet->GetIdentity ());
SendNamingLookupReply (leaseSet->GetIdentity ());
}
}
else
{
LogPrint (eLogError, "SAM: naming lookup failed. LeaseSet for ", ident.ToBase32 (), " not found");
#ifdef _MSC_VER
size_t len = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY_INVALID_KEY,
size_t len = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY_INVALID_KEY,
context.GetAddressBook ().ToAddress (ident).c_str());
#else
#else
size_t len = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY_INVALID_KEY,
context.GetAddressBook ().ToAddress (ident).c_str());
#endif
SendMessageReply (m_Buffer, len, false);
}
}
}
void SAMSocket::SendNamingLookupReply (std::shared_ptr<const i2p::data::IdentityEx> identity)
{
auto base64 = identity->ToBase64 ();
#ifdef _MSC_VER
size_t l = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY, base64.c_str ());
#else
size_t l = sprintf_s (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY, base64.c_str ());
#else
size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_NAMING_REPLY, base64.c_str ());
#endif
SendMessageReply (m_Buffer, l, false);
@ -525,7 +561,7 @@ namespace client @@ -525,7 +561,7 @@ namespace client
void SAMSocket::ExtractParams (char * buf, std::map<std::string, std::string>& params)
{
char * separator;
char * separator;
do
{
separator = strchr (buf, ' ');
@ -536,11 +572,11 @@ namespace client @@ -536,11 +572,11 @@ namespace client
*value = 0;
value++;
params[buf] = value;
}
}
buf = separator + 1;
}
while (separator);
}
}
void SAMSocket::Receive ()
{
@ -550,7 +586,7 @@ namespace client @@ -550,7 +586,7 @@ namespace client
Terminate ();
return;
}
m_Socket.async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset),
m_Socket.async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset),
std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage,
shared_from_this (), std::placeholders::_1, std::placeholders::_2));
}
@ -566,17 +602,17 @@ namespace client @@ -566,17 +602,17 @@ namespace client
else
{
if (m_Stream)
{
{
auto s = shared_from_this ();
m_Stream->AsyncSend ((uint8_t *)m_Buffer, bytes_transferred,
[s](const boost::system::error_code& ecode)
{
if (!ecode)
s->Receive ();
else
else
s->m_Owner.GetService ().post ([s] { s->Terminate (); });
});
}
}
}
}
@ -586,7 +622,7 @@ namespace client @@ -586,7 +622,7 @@ namespace client
{
if (m_Stream->GetStatus () == i2p::stream::eStreamStatusNew ||
m_Stream->GetStatus () == i2p::stream::eStreamStatusOpen) // regular
{
{
m_Stream->AsyncReceive (boost::asio::buffer (m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE),
std::bind (&SAMSocket::HandleI2PReceive, shared_from_this (),
std::placeholders::_1, std::placeholders::_2),
@ -602,10 +638,10 @@ namespace client @@ -602,10 +638,10 @@ namespace client
std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1));
}
else // no more data
Terminate ();
}
Terminate ();
}
}
}
}
void SAMSocket::HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred)
{
@ -617,17 +653,17 @@ namespace client @@ -617,17 +653,17 @@ namespace client
if (bytes_transferred > 0)
boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, bytes_transferred),
std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1)); // postpone termination
else
{
else
{
auto s = shared_from_this ();
m_Owner.GetService ().post ([s] { s->Terminate (); });
}
}
else
{
else
{
auto s = shared_from_this ();
m_Owner.GetService ().post ([s] { s->Terminate (); });
}
}
}
else
{
@ -658,7 +694,7 @@ namespace client @@ -658,7 +694,7 @@ namespace client
context.GetAddressBook ().InsertAddress (stream->GetRemoteIdentity ());
auto session = m_Owner.FindSession (m_ID);
if (session)
{
{
// find more pending acceptors
for (auto it: session->ListSockets ())
if (it->m_SocketType == eSAMSocketTypeAcceptor)
@ -674,44 +710,66 @@ namespace client @@ -674,44 +710,66 @@ namespace client
const size_t ident_len = ident_ptr->GetFullLen();
uint8_t* ident = new uint8_t[ident_len];
// send remote peer address as base64
// send remote peer address as base64
const size_t l = ident_ptr->ToBuffer (ident, ident_len);
const size_t l1 = i2p::data::ByteStreamToBase64 (ident, l, (char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE);
delete[] ident;
m_StreamBuffer[l1] = '\n';
HandleI2PReceive (boost::system::error_code (), l1 +1); // we send identity like it has been received from stream
}
}
else
I2PReceive ();
}
else
LogPrint (eLogWarning, "SAM: I2P acceptor has been reset");
}
}
void SAMSocket::HandleI2PDatagramReceive (const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len)
{
LogPrint (eLogDebug, "SAM: datagram received ", len);
auto base64 = from.ToBase64 ();
auto ep = m_Session->UDPEndpoint;
if (ep)
{
// udp forward enabled
size_t bsz = base64.size();
size_t sz = bsz + 1 + len;
// build datagram body
uint8_t * data = new uint8_t[sz];
// Destination
memcpy(data, base64.c_str(), bsz);
// linefeed
data[bsz] = '\n';
// Payload
memcpy(data+bsz+1, buf, len);
// send to remote endpoint
m_Owner.SendTo(data, sz, ep);
delete [] data;
}
else
{
#ifdef _MSC_VER
size_t l = sprintf_s ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len);
#else
size_t l = snprintf ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len);
size_t l = sprintf_s ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len);
#else
size_t l = snprintf ((char *)m_StreamBuffer, SAM_SOCKET_BUFFER_SIZE, SAM_DATAGRAM_RECEIVED, base64.c_str (), (long unsigned int)len);
#endif
if (len < SAM_SOCKET_BUFFER_SIZE - l)
{
memcpy (m_StreamBuffer + l, buf, len);
boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, len + l),
std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1));
if (len < SAM_SOCKET_BUFFER_SIZE - l)
{
memcpy (m_StreamBuffer + l, buf, len);
boost::asio::async_write (m_Socket, boost::asio::buffer (m_StreamBuffer, len + l),
std::bind (&SAMSocket::HandleWriteI2PData, shared_from_this (), std::placeholders::_1));
}
else
LogPrint (eLogWarning, "SAM: received datagram size ", len," exceeds buffer");
}
else
LogPrint (eLogWarning, "SAM: received datagram size ", len," exceeds buffer");
}
SAMSession::SAMSession (std::shared_ptr<ClientDestination> dest):
localDestination (dest)
localDestination (dest),
UDPEndpoint(nullptr)
{
}
SAMSession::~SAMSession ()
{
CloseStreams();
@ -741,7 +799,7 @@ namespace client @@ -741,7 +799,7 @@ namespace client
{
if (m_IsRunning)
Stop ();
}
}
void SAMBridge::Start ()
{
@ -760,26 +818,26 @@ namespace client @@ -760,26 +818,26 @@ namespace client
m_Sessions.clear ();
m_Service.stop ();
if (m_Thread)
{
m_Thread->join ();
{
m_Thread->join ();
delete m_Thread;
m_Thread = nullptr;
}
}
}
void SAMBridge::Run ()
{
void SAMBridge::Run ()
{
while (m_IsRunning)
{
try
{
{
m_Service.run ();
}
catch (std::exception& ex)
{
LogPrint (eLogError, "SAM: runtime exception: ", ex.what ());
}
}
}
}
}
void SAMBridge::Accept ()
@ -796,7 +854,7 @@ namespace client @@ -796,7 +854,7 @@ namespace client
boost::system::error_code ec;
auto ep = socket->GetSocket ().remote_endpoint (ec);
if (!ec)
{
{
LogPrint (eLogDebug, "SAM: new connection from ", ep);
socket->ReceiveHandshake ();
}
@ -810,10 +868,10 @@ namespace client @@ -810,10 +868,10 @@ namespace client
Accept ();
}
std::shared_ptr<SAMSession> SAMBridge::CreateSession (const std::string& id, const std::string& destination,
std::shared_ptr<SAMSession> SAMBridge::CreateSession (const std::string& id, const std::string& destination,
const std::map<std::string, std::string> * params)
{
std::shared_ptr<ClientDestination> localDestination = nullptr;
std::shared_ptr<ClientDestination> localDestination = nullptr;
if (destination != "")
{
i2p::data::PrivateKeys keys;
@ -828,10 +886,10 @@ namespace client @@ -828,10 +886,10 @@ namespace client
{
auto it = params->find (SAM_PARAM_SIGNATURE_TYPE);
if (it != params->end ())
// TODO: extract string values
// TODO: extract string values
signatureType = std::stoi(it->second);
}
localDestination = i2p::client::context.CreateNewLocalDestination (true, signatureType, params);
localDestination = i2p::client::context.CreateNewLocalDestination (true, signatureType, params);
}
if (localDestination)
{
@ -852,13 +910,13 @@ namespace client @@ -852,13 +910,13 @@ namespace client
std::unique_lock<std::mutex> l(m_SessionsMutex);
auto it = m_Sessions.find (id);
if (it != m_Sessions.end ())
{
{
session = it->second;
m_Sessions.erase (it);
}
}
}
}
if (session)
{
{
session->localDestination->StopAcceptingStreams ();
session->CloseStreams ();
}
@ -873,12 +931,20 @@ namespace client @@ -873,12 +931,20 @@ namespace client
return nullptr;
}
void SAMBridge::SendTo(const uint8_t * buf, size_t len, std::shared_ptr<boost::asio::ip::udp::endpoint> remote)
{
if(remote)
{
m_DatagramSocket.send_to(boost::asio::buffer(buf, len), *remote);
}
}
void SAMBridge::ReceiveDatagram ()
{
m_DatagramSocket.async_receive_from (
boost::asio::buffer (m_DatagramReceiveBuffer, i2p::datagram::MAX_DATAGRAM_SIZE),
boost::asio::buffer (m_DatagramReceiveBuffer, i2p::datagram::MAX_DATAGRAM_SIZE),
m_SenderEndpoint,
std::bind (&SAMBridge::HandleReceivedDatagram, this, std::placeholders::_1, std::placeholders::_2));
std::bind (&SAMBridge::HandleReceivedDatagram, this, std::placeholders::_1, std::placeholders::_2));
}
void SAMBridge::HandleReceivedDatagram (const boost::system::error_code& ecode, std::size_t bytes_transferred)
@ -888,7 +954,7 @@ namespace client @@ -888,7 +954,7 @@ namespace client
m_DatagramReceiveBuffer[bytes_transferred] = 0;
char * eol = strchr ((char *)m_DatagramReceiveBuffer, '\n');
*eol = 0; eol++;
size_t payloadLen = bytes_transferred - ((uint8_t *)eol - m_DatagramReceiveBuffer);
size_t payloadLen = bytes_transferred - ((uint8_t *)eol - m_DatagramReceiveBuffer);
LogPrint (eLogDebug, "SAM: datagram received ", m_DatagramReceiveBuffer," size=", payloadLen);
char * sessionID = strchr ((char *)m_DatagramReceiveBuffer, ' ');
if (sessionID)
@ -900,12 +966,12 @@ namespace client @@ -900,12 +966,12 @@ namespace client
*destination = 0; destination++;
auto session = FindSession (sessionID);
if (session)
{
{
i2p::data::IdentityEx dest;
dest.FromBase64 (destination);
session->localDestination->GetDatagramDestination ()->
SendDatagramTo ((uint8_t *)eol, payloadLen, dest.GetIdentHash ());
}
}
else
LogPrint (eLogError, "SAM: Session ", sessionID, " not found");
}

63
SAM.h

@ -20,45 +20,48 @@ namespace client @@ -20,45 +20,48 @@ namespace client
{
const size_t SAM_SOCKET_BUFFER_SIZE = 8192;
const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds
const int SAM_SESSION_READINESS_CHECK_INTERVAL = 20; // in seconds
const int SAM_SESSION_READINESS_CHECK_INTERVAL = 20; // in seconds
const char SAM_HANDSHAKE[] = "HELLO VERSION";
const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=%s\n";
const char SAM_HANDSHAKE_I2P_ERROR[] = "HELLO REPLY RESULT=I2P_ERROR\n";
const char SAM_HANDSHAKE_I2P_ERROR[] = "HELLO REPLY RESULT=I2P_ERROR\n";
const char SAM_SESSION_CREATE[] = "SESSION CREATE";
const char SAM_SESSION_CREATE_REPLY_OK[] = "SESSION STATUS RESULT=OK DESTINATION=%s\n";
const char SAM_SESSION_CREATE_DUPLICATED_ID[] = "SESSION STATUS RESULT=DUPLICATED_ID\n";
const char SAM_SESSION_CREATE_DUPLICATED_DEST[] = "SESSION STATUS RESULT=DUPLICATED_DEST\n";
const char SAM_SESSION_CREATE_DUPLICATED_DEST[] = "SESSION STATUS RESULT=DUPLICATED_DEST\n";
const char SAM_SESSION_STATUS_INVALID_KEY[] = "SESSION STATUS RESULT=INVALID_KEY\n";
const char SAM_SESSION_STATUS_I2P_ERROR[] = "SESSION STATUS RESULT=I2P_ERROR MESSAGE=%s\n";
const char SAM_STREAM_CONNECT[] = "STREAM CONNECT";
const char SAM_STREAM_STATUS_OK[] = "STREAM STATUS RESULT=OK\n";
const char SAM_STREAM_STATUS_INVALID_ID[] = "STREAM STATUS RESULT=INVALID_ID\n";
const char SAM_STREAM_STATUS_CANT_REACH_PEER[] = "STREAM STATUS RESULT=CANT_REACH_PEER\n";
const char SAM_STREAM_STATUS_I2P_ERROR[] = "STREAM STATUS RESULT=I2P_ERROR\n";
const char SAM_STREAM_ACCEPT[] = "STREAM ACCEPT";
const char SAM_STREAM_ACCEPT[] = "STREAM ACCEPT";
const char SAM_DATAGRAM_SEND[] = "DATAGRAM SEND";
const char SAM_DEST_GENERATE[] = "DEST GENERATE";
const char SAM_DEST_REPLY[] = "DEST REPLY PUB=%s PRIV=%s\n";
const char SAM_DEST_REPLY[] = "DEST REPLY PUB=%s PRIV=%s\n";
const char SAM_DEST_REPLY_I2P_ERROR[] = "DEST REPLY RESULT=I2P_ERROR\n";
const char SAM_NAMING_LOOKUP[] = "NAMING LOOKUP";
const char SAM_NAMING_REPLY[] = "NAMING REPLY RESULT=OK NAME=ME VALUE=%s\n";
const char SAM_DATAGRAM_RECEIVED[] = "DATAGRAM RECEIVED DESTINATION=%s SIZE=%lu\n";
const char SAM_NAMING_REPLY_INVALID_KEY[] = "NAMING REPLY RESULT=INVALID_KEY NAME=%s\n";
const char SAM_NAMING_REPLY_KEY_NOT_FOUND[] = "NAMING REPLY RESULT=INVALID_KEY_NOT_FOUND NAME=%s\n";
const char SAM_PARAM_MIN[] = "MIN";
const char SAM_PARAM_MAX[] = "MAX";
const char SAM_PARAM_STYLE[] = "STYLE";
const char SAM_PARAM_ID[] = "ID";
const char SAM_PARAM_MIN[] = "MIN";
const char SAM_PARAM_MAX[] = "MAX";
const char SAM_PARAM_STYLE[] = "STYLE";
const char SAM_PARAM_ID[] = "ID";
const char SAM_PARAM_SILENT[] = "SILENT";
const char SAM_PARAM_DESTINATION[] = "DESTINATION";
const char SAM_PARAM_DESTINATION[] = "DESTINATION";
const char SAM_PARAM_NAME[] = "NAME";
const char SAM_PARAM_SIGNATURE_TYPE[] = "SIGNATURE_TYPE";
const char SAM_PARAM_SIGNATURE_TYPE[] = "SIGNATURE_TYPE";
const char SAM_PARAM_SIZE[] = "SIZE";
const char SAM_VALUE_TRANSIENT[] = "TRANSIENT";
const char SAM_VALUE_TRANSIENT[] = "TRANSIENT";
const char SAM_VALUE_STREAM[] = "STREAM";
const char SAM_VALUE_DATAGRAM[] = "DATAGRAM";
const char SAM_VALUE_RAW[] = "RAW";
const char SAM_VALUE_TRUE[] = "true";
const char SAM_VALUE_FALSE[] = "false";
const char SAM_VALUE_RAW[] = "RAW";
const char SAM_VALUE_TRUE[] = "true";
const char SAM_VALUE_FALSE[] = "false";
const char SAM_VALUE_HOST[] = "HOST";
const char SAM_VALUE_PORT[] = "PORT";
enum SAMSocketType
{
@ -76,8 +79,8 @@ namespace client @@ -76,8 +79,8 @@ namespace client
public:
SAMSocket (SAMBridge& owner);
~SAMSocket ();
void CloseStream (); // TODO: implement it better
~SAMSocket ();
void CloseStream (); // TODO: implement it better
boost::asio::ip::tcp::socket& GetSocket () { return m_Socket; };
void ReceiveHandshake ();
@ -86,16 +89,16 @@ namespace client @@ -86,16 +89,16 @@ namespace client
private:
void Terminate ();
void Terminate ();
void HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void HandleHandshakeReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void HandleMessage (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void SendMessageReply (const char * msg, size_t len, bool close);
void SendMessageReply (const char * msg, size_t len, bool close);
void HandleMessageReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred, bool close);
void Receive ();
void HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void I2PReceive ();
void I2PReceive ();
void HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void HandleI2PAccept (std::shared_ptr<i2p::stream::Stream> stream);
void HandleWriteI2PData (const boost::system::error_code& ecode);
@ -106,7 +109,8 @@ namespace client @@ -106,7 +109,8 @@ namespace client
void ProcessStreamAccept (char * buf, size_t len);
void ProcessDestGenerate ();
void ProcessNamingLookup (char * buf, size_t len);
size_t ProcessDatagramSend (char * buf, size_t len, const char * data); // from SAM 1.0
void SendI2PError(const std::string & msg);
size_t ProcessDatagramSend (char * buf, size_t len, const char * data); // from SAM 1.0
void ExtractParams (char * buf, std::map<std::string, std::string>& params);
void Connect (std::shared_ptr<const i2p::data::LeaseSet> remote);
@ -129,12 +133,13 @@ namespace client @@ -129,12 +133,13 @@ namespace client
bool m_IsSilent;
std::shared_ptr<i2p::stream::Stream> m_Stream;
std::shared_ptr<SAMSession> m_Session;
};
};
struct SAMSession
{
std::shared_ptr<ClientDestination> localDestination;
std::list<std::shared_ptr<SAMSocket> > m_Sockets;
std::shared_ptr<boost::asio::ip::udp::endpoint> UDPEndpoint;
std::mutex m_SocketsMutex;
/** safely add a socket to this session */
@ -158,8 +163,8 @@ namespace client @@ -158,8 +163,8 @@ namespace client
}
return l;
}
SAMSession (std::shared_ptr<ClientDestination> dest);
SAMSession (std::shared_ptr<ClientDestination> dest);
~SAMSession ();
void CloseStreams ();
@ -174,13 +179,16 @@ namespace client @@ -174,13 +179,16 @@ namespace client
void Start ();
void Stop ();
boost::asio::io_service& GetService () { return m_Service; };
std::shared_ptr<SAMSession> CreateSession (const std::string& id, const std::string& destination, // empty string means transient
const std::map<std::string, std::string> * params);
void CloseSession (const std::string& id);
std::shared_ptr<SAMSession> FindSession (const std::string& id) const;
/** send raw data to remote endpoint from our UDP Socket */
void SendTo(const uint8_t * buf, size_t len, std::shared_ptr<boost::asio::ip::udp::endpoint> remote);
private:
void Run ();
@ -194,7 +202,7 @@ namespace client @@ -194,7 +202,7 @@ namespace client
private:
bool m_IsRunning;
std::thread * m_Thread;
std::thread * m_Thread;
boost::asio::io_service m_Service;
boost::asio::ip::tcp::acceptor m_Acceptor;
boost::asio::ip::udp::endpoint m_DatagramEndpoint, m_SenderEndpoint;
@ -207,9 +215,8 @@ namespace client @@ -207,9 +215,8 @@ namespace client
// for HTTP
const decltype(m_Sessions)& GetSessions () const { return m_Sessions; };
};
};
}
}
#endif

Loading…
Cancel
Save