From 27e1579e4caa1dfdd7bc090f89911ae04de6d41d Mon Sep 17 00:00:00 2001 From: orignal Date: Thu, 22 Dec 2016 19:38:17 -0500 Subject: [PATCH] rollback --- SAM.cpp | 107 ++++++++++++++------------------------------------------ SAM.h | 19 +++------- 2 files changed, 31 insertions(+), 95 deletions(-) diff --git a/SAM.cpp b/SAM.cpp index c61e8577..cb843685 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -54,7 +54,11 @@ namespace client case eSAMSocketTypeAcceptor: { if (m_Session) + { m_Session->DelSocket (shared_from_this ()); + if (m_Session->localDestination) + m_Session->localDestination->StopAcceptingStreams (); + } break; } default: @@ -285,11 +289,6 @@ namespace client dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5)); } - else - { - // start accepting streams because we're not a datagram session - m_Session->localDestination->AcceptStreams (std::bind (&SAMSession::AcceptI2P, m_Session, std::placeholders::_1)); - } if (m_Session->localDestination->IsReady ()) SendSessionCreateReplyOk (); @@ -402,25 +401,20 @@ namespace client m_Session = m_Owner.FindSession (id); if (m_Session) { - m_SocketType = eSAMSocketTypeAcceptor; - m_Session->AddSocket (shared_from_this ()); - SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); + if (!m_Session->localDestination->IsAcceptingStreams ()) + { + m_SocketType = eSAMSocketTypeAcceptor; + m_Session->AddSocket (shared_from_this ()); + m_Session->localDestination->AcceptStreams (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); + SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); + } + else + SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, strlen(SAM_STREAM_STATUS_I2P_ERROR), true); } else SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true); } - void SAMSocket::Accept(std::shared_ptr stream) - { - if(stream) { - m_SocketType = eSAMSocketTypeStream; - HandleI2PAccept(stream); - } else { - SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, strlen(SAM_STREAM_STATUS_I2P_ERROR), true); - auto s = shared_from_this (); - m_Owner.GetService ().post ([s] { s->Terminate (); }); - } - } size_t SAMSocket::ProcessDatagramSend (char * buf, size_t len, const char * data) { LogPrint (eLogDebug, "SAM: datagram send: ", buf, " ", len); @@ -665,6 +659,10 @@ namespace client LogPrint (eLogDebug, "SAM: incoming I2P connection for session ", m_ID); m_Stream = stream; context.GetAddressBook ().InsertAddress (stream->GetRemoteIdentity ()); + auto session = m_Owner.FindSession (m_ID); + if (session) + session->localDestination->StopAcceptingStreams (); + m_SocketType = eSAMSocketTypeStream; if (!m_IsSilent) { // get remote peer address @@ -706,76 +704,26 @@ namespace client } SAMSession::SAMSession (std::shared_ptr dest): - localDestination (dest), - m_BacklogPumper(dest->GetService()) - { - PumpBacklog(); - } - - void SAMSession::AcceptI2P(std::shared_ptr stream) - { - if(!stream) return; // fail - std::unique_lock lock(m_SocketsMutex); - if(m_Backlog.size() > SAM_MAX_ACCEPT_BACKLOG) { - stream->Close(); - return; - } - m_Backlog.push_back(stream); - } - - void SAMSession::PumpBacklog() + localDestination (dest) { - // pump backlog every 100ms - boost::posix_time::milliseconds dlt(100); - m_BacklogPumper.expires_from_now(dlt); - m_BacklogPumper.async_wait(std::bind(&SAMSession::HandlePumpBacklog, this, std::placeholders::_1)); - } - - std::shared_ptr SAMSession::FindAcceptor() - { - for (auto & sock : m_Sockets) { - auto t = sock->GetSocketType(); - if(t == eSAMSocketTypeAcceptor) { - return sock; - } - } - return nullptr; } - - void SAMSession::HandlePumpBacklog(const boost::system::error_code & ec) + + SAMSession::~SAMSession () { - if(ec) return; - { - std::unique_lock lock(m_SocketsMutex); - auto itr = m_Backlog.begin(); - while(itr != m_Backlog.end()) { - auto sock = FindAcceptor(); - if (sock) { - sock->Accept(*itr); - itr = m_Backlog.erase(itr); - } else { - ++itr; - } - } - } - PumpBacklog(); + CloseStreams(); + i2p::client::context.DeleteLocalDestination (localDestination); } void SAMSession::CloseStreams () { - m_BacklogPumper.cancel(); - localDestination->GetService().post([&] () { + { std::lock_guard lock(m_SocketsMutex); for (auto& sock : m_Sockets) { sock->CloseStream(); } - for(auto & stream : m_Backlog) { - stream->Close(); - } - m_Sockets.clear(); - m_Backlog.clear(); - i2p::client::context.DeleteLocalDestination (localDestination); - }); + } + // XXX: should this be done inside locked parts? + m_Sockets.clear(); } SAMBridge::SAMBridge (const std::string& address, int port): @@ -886,9 +834,8 @@ namespace client auto session = std::make_shared(localDestination); std::unique_lock l(m_SessionsMutex); auto ret = m_Sessions.insert (std::make_pair(id, session)); - if (!ret.second) { + if (!ret.second) LogPrint (eLogWarning, "SAM: Session ", id, " already exists"); - } return ret.first->second; } return nullptr; diff --git a/SAM.h b/SAM.h index 7d7d6d3a..db08c5a0 100644 --- a/SAM.h +++ b/SAM.h @@ -20,8 +20,7 @@ namespace client { const size_t SAM_SOCKET_BUFFER_SIZE = 8192; const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds - const int SAM_SESSION_READINESS_CHECK_INTERVAL = 20; // in seconds - const int SAM_MAX_ACCEPT_BACKLOG = 50; + const int SAM_SESSION_READINESS_CHECK_INTERVAL = 20; // in seconds const char SAM_HANDSHAKE[] = "HELLO VERSION"; const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=%s\n"; const char SAM_HANDSHAKE_I2P_ERROR[] = "HELLO REPLY RESULT=I2P_ERROR\n"; @@ -85,8 +84,6 @@ namespace client void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; }; SAMSocketType GetSocketType () const { return m_SocketType; }; - void Accept(std::shared_ptr stream); - private: void Terminate (); @@ -137,8 +134,6 @@ namespace client struct SAMSession { std::shared_ptr localDestination; - boost::asio::deadline_timer m_BacklogPumper; - std::list > m_Backlog; std::list > m_Sockets; std::mutex m_SocketsMutex; @@ -163,15 +158,9 @@ namespace client } return l; } - - SAMSession (std::shared_ptr dest); - - void AcceptI2P(std::shared_ptr stream); - - std::shared_ptr FindAcceptor(); - - void PumpBacklog(); - void HandlePumpBacklog(const boost::system::error_code & ec); + + SAMSession (std::shared_ptr dest); + ~SAMSession (); void CloseStreams (); };