diff --git a/SAM.cpp b/SAM.cpp index ebb90aeb..5b21af51 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -404,6 +404,7 @@ namespace client { m_SocketType = eSAMSocketTypeAcceptor; m_Session->AddSocket (shared_from_this ()); + SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); } else SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true); @@ -413,7 +414,6 @@ namespace client { if(stream) { m_SocketType = eSAMSocketTypeStream; - SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); HandleI2PAccept(stream); } else { SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, strlen(SAM_STREAM_STATUS_I2P_ERROR), true); @@ -713,7 +713,6 @@ namespace client SAMSession::~SAMSession () { - CloseStreams(); i2p::client::context.DeleteLocalDestination (localDestination); } @@ -736,17 +735,30 @@ namespace client m_BacklogPumper.async_wait(std::bind(&SAMSession::HandlePumpBacklog, this, std::placeholders::_1)); } + std::shared_ptr SAMSession::FindAcceptor() + { + for (auto & sock : m_Sockets) { + auto t = sock->GetSocketType(); + if(t == eSAMSocketTypeAcceptor) { + return sock; + } + } + return nullptr; + } + void SAMSession::HandlePumpBacklog(const boost::system::error_code & ec) { if(ec) return; - - std::unique_lock lock(m_SocketsMutex); - for(auto & stream : m_Backlog) { - for (auto & sock : m_Sockets) { - auto t = sock->GetSocketType(); - if(t == eSAMSocketTypeAcceptor) { - sock->Accept(stream); - break; + { + std::unique_lock lock(m_SocketsMutex); + auto itr = m_Backlog.begin(); + while(itr != m_Backlog.end()) { + auto sock = FindAcceptor(); + if (sock) { + sock->Accept(*itr); + itr = m_Backlog.erase(itr); + } else { + ++itr; } } } @@ -755,18 +767,18 @@ namespace client void SAMSession::CloseStreams () { - { - std::lock_guard lock(m_SocketsMutex); - for (auto& sock : m_Sockets) { - sock->CloseStream(); - } - for(auto & stream : m_Backlog) { - stream->Close(); - } - } - // XXX: should this be done inside locked parts? - m_Sockets.clear(); - m_Backlog.clear(); + localDestination->GetService().post([&] () { + std::lock_guard lock(m_SocketsMutex); + for (auto& sock : m_Sockets) { + sock->CloseStream(); + } + for(auto & stream : m_Backlog) { + stream->Close(); + } + // XXX: should this be done inside locked parts? + m_Sockets.clear(); + m_Backlog.clear(); + }); } SAMBridge::SAMBridge (const std::string& address, int port): diff --git a/SAM.h b/SAM.h index 26c6ea29..810a1bb0 100644 --- a/SAM.h +++ b/SAM.h @@ -169,6 +169,8 @@ namespace client void AcceptI2P(std::shared_ptr stream); + std::shared_ptr FindAcceptor(); + void PumpBacklog(); void HandlePumpBacklog(const boost::system::error_code & ec);