diff --git a/Datagram.cpp b/Datagram.cpp index 9e314b8d..852e54f6 100644 --- a/Datagram.cpp +++ b/Datagram.cpp @@ -310,7 +310,7 @@ namespace datagram std::vector send; auto routingPath = GetSharedRoutingPath(); // if we don't have a routing path we will drop all queued messages - if(routingPath) + if(routingPath && routingPath->outboundTunnel && routingPath->remoteLease) { for (const auto & msg : m_SendQueue) { diff --git a/SAM.cpp b/SAM.cpp index cb843685..c61e8577 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -54,11 +54,7 @@ namespace client case eSAMSocketTypeAcceptor: { if (m_Session) - { m_Session->DelSocket (shared_from_this ()); - if (m_Session->localDestination) - m_Session->localDestination->StopAcceptingStreams (); - } break; } default: @@ -289,6 +285,11 @@ namespace client dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5)); } + else + { + // start accepting streams because we're not a datagram session + m_Session->localDestination->AcceptStreams (std::bind (&SAMSession::AcceptI2P, m_Session, std::placeholders::_1)); + } if (m_Session->localDestination->IsReady ()) SendSessionCreateReplyOk (); @@ -401,20 +402,25 @@ namespace client m_Session = m_Owner.FindSession (id); if (m_Session) { - if (!m_Session->localDestination->IsAcceptingStreams ()) - { - m_SocketType = eSAMSocketTypeAcceptor; - m_Session->AddSocket (shared_from_this ()); - m_Session->localDestination->AcceptStreams (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); - SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); - } - else - SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, strlen(SAM_STREAM_STATUS_I2P_ERROR), true); + m_SocketType = eSAMSocketTypeAcceptor; + m_Session->AddSocket (shared_from_this ()); + SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); } else SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true); } + void SAMSocket::Accept(std::shared_ptr stream) + { + if(stream) { + m_SocketType = eSAMSocketTypeStream; + HandleI2PAccept(stream); + } else { + SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, strlen(SAM_STREAM_STATUS_I2P_ERROR), true); + auto s = shared_from_this (); + m_Owner.GetService ().post ([s] { s->Terminate (); }); + } + } size_t SAMSocket::ProcessDatagramSend (char * buf, size_t len, const char * data) { LogPrint (eLogDebug, "SAM: datagram send: ", buf, " ", len); @@ -659,10 +665,6 @@ namespace client LogPrint (eLogDebug, "SAM: incoming I2P connection for session ", m_ID); m_Stream = stream; context.GetAddressBook ().InsertAddress (stream->GetRemoteIdentity ()); - auto session = m_Owner.FindSession (m_ID); - if (session) - session->localDestination->StopAcceptingStreams (); - m_SocketType = eSAMSocketTypeStream; if (!m_IsSilent) { // get remote peer address @@ -704,26 +706,76 @@ namespace client } SAMSession::SAMSession (std::shared_ptr dest): - localDestination (dest) + localDestination (dest), + m_BacklogPumper(dest->GetService()) { + PumpBacklog(); } - - SAMSession::~SAMSession () + + void SAMSession::AcceptI2P(std::shared_ptr stream) { - CloseStreams(); - i2p::client::context.DeleteLocalDestination (localDestination); + if(!stream) return; // fail + std::unique_lock lock(m_SocketsMutex); + if(m_Backlog.size() > SAM_MAX_ACCEPT_BACKLOG) { + stream->Close(); + return; + } + m_Backlog.push_back(stream); } - void SAMSession::CloseStreams () + void SAMSession::PumpBacklog() { + // pump backlog every 100ms + boost::posix_time::milliseconds dlt(100); + m_BacklogPumper.expires_from_now(dlt); + m_BacklogPumper.async_wait(std::bind(&SAMSession::HandlePumpBacklog, this, std::placeholders::_1)); + } + + std::shared_ptr SAMSession::FindAcceptor() + { + for (auto & sock : m_Sockets) { + auto t = sock->GetSocketType(); + if(t == eSAMSocketTypeAcceptor) { + return sock; + } + } + return nullptr; + } + + void SAMSession::HandlePumpBacklog(const boost::system::error_code & ec) + { + if(ec) return; { + std::unique_lock lock(m_SocketsMutex); + auto itr = m_Backlog.begin(); + while(itr != m_Backlog.end()) { + auto sock = FindAcceptor(); + if (sock) { + sock->Accept(*itr); + itr = m_Backlog.erase(itr); + } else { + ++itr; + } + } + } + PumpBacklog(); + } + + void SAMSession::CloseStreams () + { + m_BacklogPumper.cancel(); + localDestination->GetService().post([&] () { std::lock_guard lock(m_SocketsMutex); for (auto& sock : m_Sockets) { sock->CloseStream(); } - } - // XXX: should this be done inside locked parts? - m_Sockets.clear(); + for(auto & stream : m_Backlog) { + stream->Close(); + } + m_Sockets.clear(); + m_Backlog.clear(); + i2p::client::context.DeleteLocalDestination (localDestination); + }); } SAMBridge::SAMBridge (const std::string& address, int port): @@ -834,8 +886,9 @@ namespace client auto session = std::make_shared(localDestination); std::unique_lock l(m_SessionsMutex); auto ret = m_Sessions.insert (std::make_pair(id, session)); - if (!ret.second) + if (!ret.second) { LogPrint (eLogWarning, "SAM: Session ", id, " already exists"); + } return ret.first->second; } return nullptr; diff --git a/SAM.h b/SAM.h index db08c5a0..7d7d6d3a 100644 --- a/SAM.h +++ b/SAM.h @@ -20,7 +20,8 @@ namespace client { const size_t SAM_SOCKET_BUFFER_SIZE = 8192; const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds - const int SAM_SESSION_READINESS_CHECK_INTERVAL = 20; // in seconds + const int SAM_SESSION_READINESS_CHECK_INTERVAL = 20; // in seconds + const int SAM_MAX_ACCEPT_BACKLOG = 50; const char SAM_HANDSHAKE[] = "HELLO VERSION"; const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=%s\n"; const char SAM_HANDSHAKE_I2P_ERROR[] = "HELLO REPLY RESULT=I2P_ERROR\n"; @@ -84,6 +85,8 @@ namespace client void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; }; SAMSocketType GetSocketType () const { return m_SocketType; }; + void Accept(std::shared_ptr stream); + private: void Terminate (); @@ -134,6 +137,8 @@ namespace client struct SAMSession { std::shared_ptr localDestination; + boost::asio::deadline_timer m_BacklogPumper; + std::list > m_Backlog; std::list > m_Sockets; std::mutex m_SocketsMutex; @@ -158,9 +163,15 @@ namespace client } return l; } - - SAMSession (std::shared_ptr dest); - ~SAMSession (); + + SAMSession (std::shared_ptr dest); + + void AcceptI2P(std::shared_ptr stream); + + std::shared_ptr FindAcceptor(); + + void PumpBacklog(); + void HandlePumpBacklog(const boost::system::error_code & ec); void CloseStreams (); };