From 8f8b928cc49f002b3c05a71d6b1d216e3e09eb76 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sun, 18 Dec 2016 11:49:50 -0500 Subject: [PATCH 1/4] enable multiple acceptors in sam (initial) --- SAM.cpp | 82 ++++++++++++++++++++++++++++++++++++++++++++------------- SAM.h | 16 ++++++++--- 2 files changed, 76 insertions(+), 22 deletions(-) diff --git a/SAM.cpp b/SAM.cpp index 1fc1227a..ebb90aeb 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -54,11 +54,7 @@ namespace client case eSAMSocketTypeAcceptor: { if (m_Session) - { m_Session->DelSocket (shared_from_this ()); - if (m_Session->localDestination) - m_Session->localDestination->StopAcceptingStreams (); - } break; } default: @@ -289,6 +285,11 @@ namespace client dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5)); } + else + { + // start accepting streams because we're not a datagram session + m_Session->localDestination->AcceptStreams (std::bind (&SAMSession::AcceptI2P, m_Session, std::placeholders::_1)); + } if (m_Session->localDestination->IsReady ()) SendSessionCreateReplyOk (); @@ -401,20 +402,24 @@ namespace client m_Session = m_Owner.FindSession (id); if (m_Session) { - if (!m_Session->localDestination->IsAcceptingStreams ()) - { - m_SocketType = eSAMSocketTypeAcceptor; - m_Session->AddSocket (shared_from_this ()); - m_Session->localDestination->AcceptStreams (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); - SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); - } - else - SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, strlen(SAM_STREAM_STATUS_I2P_ERROR), true); + m_SocketType = eSAMSocketTypeAcceptor; + m_Session->AddSocket (shared_from_this ()); } else SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true); } + void SAMSocket::Accept(std::shared_ptr stream) + { + if(stream) { + m_SocketType = eSAMSocketTypeStream; + SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); + HandleI2PAccept(stream); + } else { + SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, strlen(SAM_STREAM_STATUS_I2P_ERROR), true); + Terminate(); + } + } size_t SAMSocket::ProcessDatagramSend (char * buf, size_t len, const char * data) { LogPrint (eLogDebug, "SAM: datagram send: ", buf, " ", len); @@ -659,10 +664,6 @@ namespace client LogPrint (eLogDebug, "SAM: incoming I2P connection for session ", m_ID); m_Stream = stream; context.GetAddressBook ().InsertAddress (stream->GetRemoteIdentity ()); - auto session = m_Owner.FindSession (m_ID); - if (session) - session->localDestination->StopAcceptingStreams (); - m_SocketType = eSAMSocketTypeStream; if (!m_IsSilent) { // get remote peer address @@ -704,8 +705,10 @@ namespace client } SAMSession::SAMSession (std::shared_ptr dest): - localDestination (dest) + localDestination (dest), + m_BacklogPumper(dest->GetService()) { + PumpBacklog(); } SAMSession::~SAMSession () @@ -714,6 +717,42 @@ namespace client i2p::client::context.DeleteLocalDestination (localDestination); } + void SAMSession::AcceptI2P(std::shared_ptr stream) + { + if(!stream) return; // fail + std::unique_lock lock(m_SocketsMutex); + if(m_Backlog.size() > SAM_MAX_ACCEPT_BACKLOG) { + stream->Close(); + return; + } + m_Backlog.push_back(stream); + } + + void SAMSession::PumpBacklog() + { + // pump backlog every 100ms + boost::posix_time::milliseconds dlt(100); + m_BacklogPumper.expires_from_now(dlt); + m_BacklogPumper.async_wait(std::bind(&SAMSession::HandlePumpBacklog, this, std::placeholders::_1)); + } + + void SAMSession::HandlePumpBacklog(const boost::system::error_code & ec) + { + if(ec) return; + + std::unique_lock lock(m_SocketsMutex); + for(auto & stream : m_Backlog) { + for (auto & sock : m_Sockets) { + auto t = sock->GetSocketType(); + if(t == eSAMSocketTypeAcceptor) { + sock->Accept(stream); + break; + } + } + } + PumpBacklog(); + } + void SAMSession::CloseStreams () { { @@ -721,9 +760,13 @@ namespace client for (auto& sock : m_Sockets) { sock->CloseStream(); } + for(auto & stream : m_Backlog) { + stream->Close(); + } } // XXX: should this be done inside locked parts? m_Sockets.clear(); + m_Backlog.clear(); } SAMBridge::SAMBridge (const std::string& address, int port): @@ -834,8 +877,9 @@ namespace client auto session = std::make_shared(localDestination); std::unique_lock l(m_SessionsMutex); auto ret = m_Sessions.insert (std::make_pair(id, session)); - if (!ret.second) + if (!ret.second) { LogPrint (eLogWarning, "SAM: Session ", id, " already exists"); + } return ret.first->second; } return nullptr; diff --git a/SAM.h b/SAM.h index db08c5a0..26c6ea29 100644 --- a/SAM.h +++ b/SAM.h @@ -20,7 +20,8 @@ namespace client { const size_t SAM_SOCKET_BUFFER_SIZE = 8192; const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds - const int SAM_SESSION_READINESS_CHECK_INTERVAL = 20; // in seconds + const int SAM_SESSION_READINESS_CHECK_INTERVAL = 20; // in seconds + const int SAM_MAX_ACCEPT_BACKLOG = 50; const char SAM_HANDSHAKE[] = "HELLO VERSION"; const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=%s\n"; const char SAM_HANDSHAKE_I2P_ERROR[] = "HELLO REPLY RESULT=I2P_ERROR\n"; @@ -84,6 +85,8 @@ namespace client void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; }; SAMSocketType GetSocketType () const { return m_SocketType; }; + void Accept(std::shared_ptr stream); + private: void Terminate (); @@ -134,6 +137,8 @@ namespace client struct SAMSession { std::shared_ptr localDestination; + boost::asio::deadline_timer m_BacklogPumper; + std::list > m_Backlog; std::list > m_Sockets; std::mutex m_SocketsMutex; @@ -158,10 +163,15 @@ namespace client } return l; } - - SAMSession (std::shared_ptr dest); + + SAMSession (std::shared_ptr dest); ~SAMSession (); + void AcceptI2P(std::shared_ptr stream); + + void PumpBacklog(); + void HandlePumpBacklog(const boost::system::error_code & ec); + void CloseStreams (); }; From d5f27ecb0ea61a85da5dcb9178ea589bdd65c174 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sun, 18 Dec 2016 12:28:32 -0500 Subject: [PATCH 2/4] fix termination crash --- SAM.cpp | 56 ++++++++++++++++++++++++++++++++++---------------------- SAM.h | 2 ++ 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/SAM.cpp b/SAM.cpp index ebb90aeb..5b21af51 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -404,6 +404,7 @@ namespace client { m_SocketType = eSAMSocketTypeAcceptor; m_Session->AddSocket (shared_from_this ()); + SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); } else SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true); @@ -413,7 +414,6 @@ namespace client { if(stream) { m_SocketType = eSAMSocketTypeStream; - SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); HandleI2PAccept(stream); } else { SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, strlen(SAM_STREAM_STATUS_I2P_ERROR), true); @@ -713,7 +713,6 @@ namespace client SAMSession::~SAMSession () { - CloseStreams(); i2p::client::context.DeleteLocalDestination (localDestination); } @@ -736,17 +735,30 @@ namespace client m_BacklogPumper.async_wait(std::bind(&SAMSession::HandlePumpBacklog, this, std::placeholders::_1)); } + std::shared_ptr SAMSession::FindAcceptor() + { + for (auto & sock : m_Sockets) { + auto t = sock->GetSocketType(); + if(t == eSAMSocketTypeAcceptor) { + return sock; + } + } + return nullptr; + } + void SAMSession::HandlePumpBacklog(const boost::system::error_code & ec) { if(ec) return; - - std::unique_lock lock(m_SocketsMutex); - for(auto & stream : m_Backlog) { - for (auto & sock : m_Sockets) { - auto t = sock->GetSocketType(); - if(t == eSAMSocketTypeAcceptor) { - sock->Accept(stream); - break; + { + std::unique_lock lock(m_SocketsMutex); + auto itr = m_Backlog.begin(); + while(itr != m_Backlog.end()) { + auto sock = FindAcceptor(); + if (sock) { + sock->Accept(*itr); + itr = m_Backlog.erase(itr); + } else { + ++itr; } } } @@ -755,18 +767,18 @@ namespace client void SAMSession::CloseStreams () { - { - std::lock_guard lock(m_SocketsMutex); - for (auto& sock : m_Sockets) { - sock->CloseStream(); - } - for(auto & stream : m_Backlog) { - stream->Close(); - } - } - // XXX: should this be done inside locked parts? - m_Sockets.clear(); - m_Backlog.clear(); + localDestination->GetService().post([&] () { + std::lock_guard lock(m_SocketsMutex); + for (auto& sock : m_Sockets) { + sock->CloseStream(); + } + for(auto & stream : m_Backlog) { + stream->Close(); + } + // XXX: should this be done inside locked parts? + m_Sockets.clear(); + m_Backlog.clear(); + }); } SAMBridge::SAMBridge (const std::string& address, int port): diff --git a/SAM.h b/SAM.h index 26c6ea29..810a1bb0 100644 --- a/SAM.h +++ b/SAM.h @@ -169,6 +169,8 @@ namespace client void AcceptI2P(std::shared_ptr stream); + std::shared_ptr FindAcceptor(); + void PumpBacklog(); void HandlePumpBacklog(const boost::system::error_code & ec); From 673b7a95b7ca6cca2a138f36649191a1b34b4779 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sun, 18 Dec 2016 12:56:34 -0500 Subject: [PATCH 3/4] fix sam crash on exit and datagram crash with no outbound tunnel --- Datagram.cpp | 2 +- SAM.cpp | 28 ++++++++++++---------------- SAM.h | 1 - 3 files changed, 13 insertions(+), 18 deletions(-) diff --git a/Datagram.cpp b/Datagram.cpp index 9e314b8d..852e54f6 100644 --- a/Datagram.cpp +++ b/Datagram.cpp @@ -310,7 +310,7 @@ namespace datagram std::vector send; auto routingPath = GetSharedRoutingPath(); // if we don't have a routing path we will drop all queued messages - if(routingPath) + if(routingPath && routingPath->outboundTunnel && routingPath->remoteLease) { for (const auto & msg : m_SendQueue) { diff --git a/SAM.cpp b/SAM.cpp index 5b21af51..5bf9a74b 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -710,11 +710,6 @@ namespace client { PumpBacklog(); } - - SAMSession::~SAMSession () - { - i2p::client::context.DeleteLocalDestination (localDestination); - } void SAMSession::AcceptI2P(std::shared_ptr stream) { @@ -767,18 +762,19 @@ namespace client void SAMSession::CloseStreams () { + m_BacklogPumper.cancel(); localDestination->GetService().post([&] () { - std::lock_guard lock(m_SocketsMutex); - for (auto& sock : m_Sockets) { - sock->CloseStream(); - } - for(auto & stream : m_Backlog) { - stream->Close(); - } - // XXX: should this be done inside locked parts? - m_Sockets.clear(); - m_Backlog.clear(); - }); + std::lock_guard lock(m_SocketsMutex); + for (auto& sock : m_Sockets) { + sock->CloseStream(); + } + for(auto & stream : m_Backlog) { + stream->Close(); + } + m_Sockets.clear(); + m_Backlog.clear(); + i2p::client::context.DeleteLocalDestination (localDestination); + }); } SAMBridge::SAMBridge (const std::string& address, int port): diff --git a/SAM.h b/SAM.h index 810a1bb0..7d7d6d3a 100644 --- a/SAM.h +++ b/SAM.h @@ -165,7 +165,6 @@ namespace client } SAMSession (std::shared_ptr dest); - ~SAMSession (); void AcceptI2P(std::shared_ptr stream); From eb9ea97e2158b3fc2ff51c9d4b40adc13839604b Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sun, 18 Dec 2016 13:01:28 -0500 Subject: [PATCH 4/4] don't crash --- SAM.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/SAM.cpp b/SAM.cpp index 5bf9a74b..e109e3ab 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -417,7 +417,8 @@ namespace client HandleI2PAccept(stream); } else { SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, strlen(SAM_STREAM_STATUS_I2P_ERROR), true); - Terminate(); + auto s = shared_from_this (); + m_Owner.GetService ().post ([s] { s->Terminate (); }); } } size_t SAMSocket::ProcessDatagramSend (char * buf, size_t len, const char * data)