1
0
mirror of https://github.com/PurpleI2P/i2pd.git synced 2025-01-11 17:37:53 +00:00

Initial SAM cleanup

This commit is contained in:
Jeff Becker 2018-04-24 09:45:16 -04:00
parent fa154cc4d6
commit 4643c92d33
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05
6 changed files with 118 additions and 153 deletions

View File

@ -649,7 +649,7 @@ namespace http {
s << i2p::client::context.GetAddressBook ().ToAddress(ident) << "</a><br>\r\n"; s << i2p::client::context.GetAddressBook ().ToAddress(ident) << "</a><br>\r\n";
s << "<br>\r\n"; s << "<br>\r\n";
s << "<b>Streams:</b><br>\r\n"; s << "<b>Streams:</b><br>\r\n";
for (const auto& it: session->ListSockets()) for (const auto& it: sam->ListSockets(id))
{ {
switch (it->GetSocketType ()) switch (it->GetSocketType ())
{ {

View File

@ -727,7 +727,7 @@ namespace client
sam_session.put("name", name); sam_session.put("name", name);
sam_session.put("address", i2p::client::context.GetAddressBook ().ToAddress(ident)); sam_session.put("address", i2p::client::context.GetAddressBook ().ToAddress(ident));
for (const auto& socket: it.second->ListSockets()) for (const auto& socket: sam->ListSockets(it.first))
{ {
boost::property_tree::ptree stream; boost::property_tree::ptree stream;
stream.put("type", socket->GetSocketType ()); stream.put("type", socket->GetSocketType ());

View File

@ -578,9 +578,7 @@ namespace stream
if (m_SentPackets.empty () && m_SendBuffer.IsEmpty ()) // nothing to send if (m_SentPackets.empty () && m_SendBuffer.IsEmpty ()) // nothing to send
{ {
m_Status = eStreamStatusClosed; m_Status = eStreamStatusClosed;
// close could be called from another thread so do SendClose from the destination thread SendClose();
// this is so m_LocalDestination.NewPacket () does not trigger a race condition
m_Service.post(std::bind(&Stream::SendClose, shared_from_this()));
} }
break; break;
case eStreamStatusClosed: case eStreamStatusClosed:

View File

@ -165,6 +165,9 @@ namespace stream
void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0); void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0);
size_t ReadSome (uint8_t * buf, size_t len) { return ConcatenatePackets (buf, len); }; size_t ReadSome (uint8_t * buf, size_t len) { return ConcatenatePackets (buf, len); };
void AsyncClose() { m_Service.post(std::bind(&Stream::Close, shared_from_this())); };
/** only call close from destination thread, use Stream::AsyncClose for other threads */
void Close (); void Close ();
void Cancel () { m_ReceiveTimer.cancel (); }; void Cancel () { m_ReceiveTimer.cancel (); };

View File

@ -15,8 +15,8 @@ namespace i2p
{ {
namespace client namespace client
{ {
SAMSocket::SAMSocket (SAMBridge& owner, std::shared_ptr<Socket_t> socket): SAMSocket::SAMSocket (SAMBridge& owner):
m_Owner (owner), m_Socket(socket), m_Timer (m_Owner.GetService ()), m_Owner (owner), m_Socket(owner.GetService()), m_Timer (m_Owner.GetService ()),
m_BufferOffset (0), m_BufferOffset (0),
m_SocketType (eSAMSocketTypeUnknown), m_IsSilent (false), m_SocketType (eSAMSocketTypeUnknown), m_IsSilent (false),
m_IsAccepting (false), m_Stream (nullptr) m_IsAccepting (false), m_Stream (nullptr)
@ -25,51 +25,18 @@ namespace client
SAMSocket::~SAMSocket () SAMSocket::~SAMSocket ()
{ {
if(m_Stream) m_Stream.reset ();
{ if (m_Socket.is_open()) m_Socket.close ();
m_Stream->Close ();
m_Stream.reset ();
}
auto Session = m_Owner.FindSession(m_ID);
switch (m_SocketType)
{
case eSAMSocketTypeSession:
m_Owner.CloseSession (m_ID);
break;
case eSAMSocketTypeStream:
{
if (Session)
Session->DelSocket (this);
break;
}
case eSAMSocketTypeAcceptor:
{
if (Session)
{
Session->DelSocket (this);
if (m_IsAccepting && Session->localDestination)
Session->localDestination->StopAcceptingStreams ();
}
break;
}
default:
;
}
m_SocketType = eSAMSocketTypeTerminated;
if (m_Socket && m_Socket->is_open()) m_Socket->close ();
m_Socket.reset ();
} }
void SAMSocket::Terminate (const char* reason) void SAMSocket::Terminate (const char* reason)
{ {
if(m_Stream) if(m_Stream)
{ {
m_Stream->Close (); m_Stream->AsyncClose ();
m_Stream.reset (); m_Stream.reset ();
} }
auto Session = m_Owner.FindSession(m_ID); auto Session = m_Owner.FindSession(m_ID);
switch (m_SocketType) switch (m_SocketType)
{ {
case eSAMSocketTypeSession: case eSAMSocketTypeSession:
@ -77,15 +44,12 @@ namespace client
break; break;
case eSAMSocketTypeStream: case eSAMSocketTypeStream:
{ {
if (Session)
Session->DelSocket (this);
break; break;
} }
case eSAMSocketTypeAcceptor: case eSAMSocketTypeAcceptor:
{ {
if (Session) if (Session)
{ {
Session->DelSocket (this);
if (m_IsAccepting && Session->localDestination) if (m_IsAccepting && Session->localDestination)
Session->localDestination->StopAcceptingStreams (); Session->localDestination->StopAcceptingStreams ();
} }
@ -95,16 +59,15 @@ namespace client
; ;
} }
m_SocketType = eSAMSocketTypeTerminated; m_SocketType = eSAMSocketTypeTerminated;
if (m_Socket && m_Socket->is_open()) m_Socket->close (); if (m_Socket.is_open()) m_Socket.close ();
m_Socket.reset (); m_Owner.RemoveSocket(this);
} }
void SAMSocket::ReceiveHandshake () void SAMSocket::ReceiveHandshake ()
{ {
if(m_Socket) m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
m_Socket->async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (),
std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (), std::placeholders::_1, std::placeholders::_2));
std::placeholders::_1, std::placeholders::_2));
} }
static bool SAMVersionAcceptable(const std::string & ver) static bool SAMVersionAcceptable(const std::string & ver)
@ -125,7 +88,7 @@ namespace client
void SAMSocket::HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) void SAMSocket::HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred)
{ {
if (ecode) if (ecode)
{ {
LogPrint (eLogError, "SAM: handshake read error: ", ecode.message ()); LogPrint (eLogError, "SAM: handshake read error: ", ecode.message ());
if (ecode != boost::asio::error::operation_aborted) if (ecode != boost::asio::error::operation_aborted)
Terminate ("SAM: handshake read error"); Terminate ("SAM: handshake read error");
@ -184,7 +147,7 @@ namespace client
#else #else
size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_HANDSHAKE_REPLY, version.c_str ()); size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_HANDSHAKE_REPLY, version.c_str ());
#endif #endif
boost::asio::async_write (*m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (), boost::asio::async_write (m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (),
std::bind(&SAMSocket::HandleHandshakeReplySent, shared_from_this (), std::bind(&SAMSocket::HandleHandshakeReplySent, shared_from_this (),
std::placeholders::_1, std::placeholders::_2)); std::placeholders::_1, std::placeholders::_2));
} }
@ -199,17 +162,22 @@ namespace client
} }
} }
bool SAMSocket::IsSession(const std::string & id) const
{
return id == m_ID;
}
void SAMSocket::HandleHandshakeReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred) void SAMSocket::HandleHandshakeReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred)
{ {
if (ecode) if (ecode)
{ {
LogPrint (eLogError, "SAM: handshake reply send error: ", ecode.message ()); LogPrint (eLogError, "SAM: handshake reply send error: ", ecode.message ());
if (ecode != boost::asio::error::operation_aborted) if (ecode != boost::asio::error::operation_aborted)
Terminate ("SAM: handshake reply send error"); Terminate ("SAM: handshake reply send error");
} }
else if(m_Socket) else
{ {
m_Socket->async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE),
std::bind(&SAMSocket::HandleMessage, shared_from_this (), std::bind(&SAMSocket::HandleMessage, shared_from_this (),
std::placeholders::_1, std::placeholders::_2)); std::placeholders::_1, std::placeholders::_2));
} }
@ -220,7 +188,7 @@ namespace client
LogPrint (eLogDebug, "SAMSocket::SendMessageReply, close=",close?"true":"false", " reason: ", msg); LogPrint (eLogDebug, "SAMSocket::SendMessageReply, close=",close?"true":"false", " reason: ", msg);
if (!m_IsSilent) if (!m_IsSilent)
boost::asio::async_write (*m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (), boost::asio::async_write (m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (),
std::bind(&SAMSocket::HandleMessageReplySent, shared_from_this (), std::bind(&SAMSocket::HandleMessageReplySent, shared_from_this (),
std::placeholders::_1, std::placeholders::_2, close)); std::placeholders::_1, std::placeholders::_2, close));
else else
@ -501,7 +469,6 @@ namespace client
if(session) if(session)
{ {
m_SocketType = eSAMSocketTypeStream; m_SocketType = eSAMSocketTypeStream;
session->AddSocket (shared_from_this ());
m_Stream = session->localDestination->CreateStream (remote); m_Stream = session->localDestination->CreateStream (remote);
m_Stream->Send ((uint8_t *)m_Buffer, m_BufferOffset); // connect and send m_Stream->Send ((uint8_t *)m_Buffer, m_BufferOffset); // connect and send
m_BufferOffset = 0; m_BufferOffset = 0;
@ -534,7 +501,6 @@ namespace client
if (session) if (session)
{ {
m_SocketType = eSAMSocketTypeAcceptor; m_SocketType = eSAMSocketTypeAcceptor;
session->AddSocket (shared_from_this ());
if (!session->localDestination->IsAcceptingStreams ()) if (!session->localDestination->IsAcceptingStreams ())
{ {
m_IsAccepting = true; m_IsAccepting = true;
@ -704,17 +670,9 @@ namespace client
void SAMSocket::Receive () void SAMSocket::Receive ()
{ {
if (m_BufferOffset >= SAM_SOCKET_BUFFER_SIZE) m_Socket.async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset),
{ std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage,
LogPrint (eLogError, "SAM: Buffer is full, terminate"); shared_from_this (), std::placeholders::_1, std::placeholders::_2));
Terminate ("Buffer is full");
return;
} else if (m_Socket)
m_Socket->async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset),
std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage,
shared_from_this (), std::placeholders::_1, std::placeholders::_2));
else
LogPrint(eLogError, "SAM: receive with no native socket");
} }
void SAMSocket::HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) void SAMSocket::HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred)
@ -731,15 +689,12 @@ namespace client
{ {
bytes_transferred += m_BufferOffset; bytes_transferred += m_BufferOffset;
m_BufferOffset = 0; m_BufferOffset = 0;
auto s = shared_from_this ();
m_Stream->AsyncSend ((uint8_t *)m_Buffer, bytes_transferred, m_Stream->AsyncSend ((uint8_t *)m_Buffer, bytes_transferred,
[s](const boost::system::error_code& ecode) std::bind(&SAMSocket::HandleStreamSend, shared_from_this(), std::placeholders::_1));
{ }
if (!ecode) else
s->m_Owner.GetService ().post ([s] { s->Receive (); }); {
else Terminate("No Stream Remaining");
s->m_Owner.GetService ().post ([s] { s->Terminate ("AsyncSend failed"); });
});
} }
} }
} }
@ -773,14 +728,11 @@ namespace client
void SAMSocket::WriteI2PDataImmediate(uint8_t * buff, size_t sz) void SAMSocket::WriteI2PDataImmediate(uint8_t * buff, size_t sz)
{ {
if(m_Socket) boost::asio::async_write (
boost::asio::async_write ( m_Socket,
*m_Socket, boost::asio::buffer (buff, sz),
boost::asio::buffer (buff, sz), boost::asio::transfer_all(),
boost::asio::transfer_all(), std::bind (&SAMSocket::HandleWriteI2PDataImmediate, shared_from_this (), std::placeholders::_1, buff)); // postpone termination
std::bind (&SAMSocket::HandleWriteI2PDataImmediate, shared_from_this (), std::placeholders::_1, buff)); // postpone termination
else
LogPrint(eLogError, "SAM: no native socket");
} }
void SAMSocket::HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff) void SAMSocket::HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff)
@ -858,7 +810,7 @@ namespace client
if (session) if (session)
{ {
// find more pending acceptors // find more pending acceptors
for (auto it: session->ListSockets ()) for (auto it: m_Owner.ListSockets (m_ID))
if (it->m_SocketType == eSAMSocketTypeAcceptor) if (it->m_SocketType == eSAMSocketTypeAcceptor)
{ {
it->m_IsAccepting = true; it->m_IsAccepting = true;
@ -930,12 +882,19 @@ namespace client
} }
} }
SAMSession::SAMSession (std::shared_ptr<ClientDestination> dest): void SAMSocket::HandleStreamSend(const boost::system::error_code & ec)
{
m_Owner.GetService ().post (std::bind( !ec ? &SAMSocket::Receive : &SAMSocket::TerminateClose, shared_from_this()));
}
SAMSession::SAMSession (SAMBridge & parent, const std::string & id, std::shared_ptr<ClientDestination> dest):
m_Bridge(parent),
localDestination (dest), localDestination (dest),
UDPEndpoint(nullptr) UDPEndpoint(nullptr),
Name(id)
{ {
} }
SAMSession::~SAMSession () SAMSession::~SAMSession ()
{ {
CloseStreams(); CloseStreams();
@ -944,15 +903,10 @@ namespace client
void SAMSession::CloseStreams () void SAMSession::CloseStreams ()
{ {
std::vector<std::shared_ptr<SAMSocket> > socks; for(const auto & itr : m_Bridge.ListSockets(Name))
{ {
std::lock_guard<std::mutex> lock(m_SocketsMutex); itr->Terminate(nullptr);
for (const auto& sock : m_Sockets) {
socks.push_back(sock);
}
} }
for (auto & sock : socks ) sock->Terminate("SAMSession::CloseStreams()");
m_Sockets.clear();
} }
SAMBridge::SAMBridge (const std::string& address, int port): SAMBridge::SAMBridge (const std::string& address, int port):
@ -1009,12 +963,16 @@ namespace client
void SAMBridge::Accept () void SAMBridge::Accept ()
{ {
auto native = std::make_shared<boost::asio::ip::tcp::socket>(m_Service); auto newSocket = std::make_shared<SAMSocket>(*this);
auto newSocket = std::make_shared<SAMSocket> (*this, native); m_Acceptor.async_accept (newSocket->GetSocket(), std::bind (&SAMBridge::HandleAccept, this,
m_Acceptor.async_accept (*native, std::bind (&SAMBridge::HandleAccept, this,
std::placeholders::_1, newSocket)); std::placeholders::_1, newSocket));
} }
void SAMBridge::RemoveSocket(const SAMSocket * socket)
{
m_OpenSockets.remove_if([socket](const std::shared_ptr<SAMSocket> & item) -> bool { return item.get() == socket; });
}
void SAMBridge::HandleAccept(const boost::system::error_code& ecode, std::shared_ptr<SAMSocket> socket) void SAMBridge::HandleAccept(const boost::system::error_code& ecode, std::shared_ptr<SAMSocket> socket)
{ {
if (!ecode) if (!ecode)
@ -1024,6 +982,7 @@ namespace client
if (!ec) if (!ec)
{ {
LogPrint (eLogDebug, "SAM: new connection from ", ep); LogPrint (eLogDebug, "SAM: new connection from ", ep);
m_OpenSockets.push_back(socket);
socket->ReceiveHandshake (); socket->ReceiveHandshake ();
} }
else else
@ -1066,7 +1025,7 @@ namespace client
if (localDestination) if (localDestination)
{ {
localDestination->Acquire (); localDestination->Acquire ();
auto session = std::make_shared<SAMSession>(localDestination); auto session = std::make_shared<SAMSession>(*this, id, localDestination);
std::unique_lock<std::mutex> l(m_SessionsMutex); std::unique_lock<std::mutex> l(m_SessionsMutex);
auto ret = m_Sessions.insert (std::make_pair(id, session)); auto ret = m_Sessions.insert (std::make_pair(id, session));
if (!ret.second) if (!ret.second)
@ -1105,6 +1064,18 @@ namespace client
return nullptr; return nullptr;
} }
std::list<std::shared_ptr<SAMSocket> > SAMBridge::ListSockets(const std::string & id) const
{
std::list<std::shared_ptr<SAMSocket > > list;
{
std::unique_lock<std::mutex> l(m_SessionsMutex);
for (const auto & itr : m_OpenSockets)
if (itr->IsSession(id))
list.push_back(itr);
}
return list;
}
void SAMBridge::SendTo(const uint8_t * buf, size_t len, std::shared_ptr<boost::asio::ip::udp::endpoint> remote) void SAMBridge::SendTo(const uint8_t * buf, size_t len, std::shared_ptr<boost::asio::ip::udp::endpoint> remote)
{ {
if(remote) if(remote)
@ -1127,33 +1098,38 @@ namespace client
{ {
m_DatagramReceiveBuffer[bytes_transferred] = 0; m_DatagramReceiveBuffer[bytes_transferred] = 0;
char * eol = strchr ((char *)m_DatagramReceiveBuffer, '\n'); char * eol = strchr ((char *)m_DatagramReceiveBuffer, '\n');
*eol = 0; eol++; if(eol)
size_t payloadLen = bytes_transferred - ((uint8_t *)eol - m_DatagramReceiveBuffer);
LogPrint (eLogDebug, "SAM: datagram received ", m_DatagramReceiveBuffer," size=", payloadLen);
char * sessionID = strchr ((char *)m_DatagramReceiveBuffer, ' ');
if (sessionID)
{ {
sessionID++; *eol = 0; eol++;
char * destination = strchr (sessionID, ' '); size_t payloadLen = bytes_transferred - ((uint8_t *)eol - m_DatagramReceiveBuffer);
if (destination) LogPrint (eLogDebug, "SAM: datagram received ", m_DatagramReceiveBuffer," size=", payloadLen);
char * sessionID = strchr ((char *)m_DatagramReceiveBuffer, ' ');
if (sessionID)
{ {
*destination = 0; destination++; sessionID++;
auto session = FindSession (sessionID); char * destination = strchr (sessionID, ' ');
if (session) if (destination)
{ {
i2p::data::IdentityEx dest; *destination = 0; destination++;
dest.FromBase64 (destination); auto session = FindSession (sessionID);
session->localDestination->GetDatagramDestination ()-> if (session)
SendDatagramTo ((uint8_t *)eol, payloadLen, dest.GetIdentHash ()); {
i2p::data::IdentityEx dest;
dest.FromBase64 (destination);
session->localDestination->GetDatagramDestination ()->
SendDatagramTo ((uint8_t *)eol, payloadLen, dest.GetIdentHash ());
}
else
LogPrint (eLogError, "SAM: Session ", sessionID, " not found");
} }
else else
LogPrint (eLogError, "SAM: Session ", sessionID, " not found"); LogPrint (eLogError, "SAM: Missing destination key");
} }
else else
LogPrint (eLogError, "SAM: Missing destination key"); LogPrint (eLogError, "SAM: Missing sessionID");
} }
else else
LogPrint (eLogError, "SAM: Missing sessionID"); LogPrint(eLogError, "SAM: invalid datagram");
ReceiveDatagram (); ReceiveDatagram ();
} }
else else

View File

@ -77,24 +77,27 @@ namespace client
class SAMBridge; class SAMBridge;
struct SAMSession; struct SAMSession;
class SAMSocket: public std::enable_shared_from_this<SAMSocket> class SAMSocket :public std::enable_shared_from_this<SAMSocket>
{ {
public: public:
typedef boost::asio::ip::tcp::socket Socket_t; typedef boost::asio::ip::tcp::socket Socket_t;
SAMSocket (SAMBridge& owner, std::shared_ptr<Socket_t> socket); SAMSocket (SAMBridge& owner);
~SAMSocket (); ~SAMSocket ();
boost::asio::ip::tcp::socket& GetSocket () { return *m_Socket; }; Socket_t& GetSocket () { return m_Socket; };
void ReceiveHandshake (); void ReceiveHandshake ();
void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; }; void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; };
SAMSocketType GetSocketType () const { return m_SocketType; }; SAMSocketType GetSocketType () const { return m_SocketType; };
void Terminate (const char* reason); void Terminate (const char* reason);
bool IsSession(const std::string & id) const;
private: private:
void TerminateClose() { Terminate(nullptr); }
void HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void HandleHandshakeReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleHandshakeReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void HandleMessage (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleMessage (const boost::system::error_code& ecode, std::size_t bytes_transferred);
void SendMessageReply (const char * msg, size_t len, bool close); void SendMessageReply (const char * msg, size_t len, bool close);
@ -128,10 +131,12 @@ namespace client
void WriteI2PDataImmediate(uint8_t * ptr, size_t sz); void WriteI2PDataImmediate(uint8_t * ptr, size_t sz);
void HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff); void HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff);
void HandleStreamSend(const boost::system::error_code & ec);
private: private:
SAMBridge& m_Owner; SAMBridge& m_Owner;
std::shared_ptr<Socket_t> m_Socket; Socket_t m_Socket;
boost::asio::deadline_timer m_Timer; boost::asio::deadline_timer m_Timer;
char m_Buffer[SAM_SOCKET_BUFFER_SIZE + 1]; char m_Buffer[SAM_SOCKET_BUFFER_SIZE + 1];
size_t m_BufferOffset; size_t m_BufferOffset;
@ -145,34 +150,12 @@ namespace client
struct SAMSession struct SAMSession
{ {
SAMBridge & m_Bridge;
std::shared_ptr<ClientDestination> localDestination; std::shared_ptr<ClientDestination> localDestination;
std::list<std::shared_ptr<SAMSocket> > m_Sockets;
std::shared_ptr<boost::asio::ip::udp::endpoint> UDPEndpoint; std::shared_ptr<boost::asio::ip::udp::endpoint> UDPEndpoint;
std::mutex m_SocketsMutex; std::string Name;
/** safely add a socket to this session */ SAMSession (SAMBridge & parent, const std::string & name, std::shared_ptr<ClientDestination> dest);
void AddSocket(std::shared_ptr<SAMSocket> sock) {
std::lock_guard<std::mutex> lock(m_SocketsMutex);
m_Sockets.push_back(sock);
}
/** safely remove a socket from this session */
void DelSocket(SAMSocket * sock) {
std::lock_guard<std::mutex> lock(m_SocketsMutex);
m_Sockets.remove_if([sock](const std::shared_ptr<SAMSocket> s) -> bool { return s.get() == sock; });
}
/** get a list holding a copy of all sam sockets from this session */
std::list<std::shared_ptr<SAMSocket> > ListSockets() {
std::list<std::shared_ptr<SAMSocket> > l;
{
std::lock_guard<std::mutex> lock(m_SocketsMutex);
for(const auto& sock : m_Sockets ) l.push_back(sock);
}
return l;
}
SAMSession (std::shared_ptr<ClientDestination> dest);
~SAMSession (); ~SAMSession ();
void CloseStreams (); void CloseStreams ();
@ -194,15 +177,19 @@ namespace client
void CloseSession (const std::string& id); void CloseSession (const std::string& id);
std::shared_ptr<SAMSession> FindSession (const std::string& id) const; std::shared_ptr<SAMSession> FindSession (const std::string& id) const;
std::list<std::shared_ptr<SAMSocket> > ListSockets(const std::string & id) const;
/** send raw data to remote endpoint from our UDP Socket */ /** send raw data to remote endpoint from our UDP Socket */
void SendTo(const uint8_t * buf, size_t len, std::shared_ptr<boost::asio::ip::udp::endpoint> remote); void SendTo(const uint8_t * buf, size_t len, std::shared_ptr<boost::asio::ip::udp::endpoint> remote);
void RemoveSocket(const SAMSocket * socket);
private: private:
void Run (); void Run ();
void Accept (); void Accept ();
void HandleAccept(const boost::system::error_code& ecode, std::shared_ptr<SAMSocket> socket); void HandleAccept(const boost::system::error_code& ecode, std::shared_ptr<SAMSocket> socket);
void ReceiveDatagram (); void ReceiveDatagram ();
void HandleReceivedDatagram (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleReceivedDatagram (const boost::system::error_code& ecode, std::size_t bytes_transferred);
@ -217,6 +204,7 @@ namespace client
boost::asio::ip::udp::socket m_DatagramSocket; boost::asio::ip::udp::socket m_DatagramSocket;
mutable std::mutex m_SessionsMutex; mutable std::mutex m_SessionsMutex;
std::map<std::string, std::shared_ptr<SAMSession> > m_Sessions; std::map<std::string, std::shared_ptr<SAMSession> > m_Sessions;
std::list<std::shared_ptr<SAMSocket> > m_OpenSockets;
uint8_t m_DatagramReceiveBuffer[i2p::datagram::MAX_DATAGRAM_SIZE+1]; uint8_t m_DatagramReceiveBuffer[i2p::datagram::MAX_DATAGRAM_SIZE+1];
public: public: