diff --git a/libi2pd_client/SAM.cpp b/libi2pd_client/SAM.cpp index 54da4654..ffdeb5ac 100644 --- a/libi2pd_client/SAM.cpp +++ b/libi2pd_client/SAM.cpp @@ -601,16 +601,27 @@ namespace client SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); session->GetLocalDestination ()->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); } - else if (session->acceptQueue.size () < SAM_SESSION_MAX_ACCEPT_QUEUE_SIZE) - { - // already accepting, queue up - SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); - session->acceptQueue.push_back (shared_from_this()); - } - else + else { - LogPrint (eLogInfo, "SAM: Session ", m_ID, " accept queue is full ", session->acceptQueue.size ()); - SendStreamI2PError ("Already accepting"); + auto ts = i2p::util::GetSecondsSinceEpoch (); + while (!session->acceptQueue.empty () && session->acceptQueue.front ().second + SAM_SESSION_MAX_ACCEPT_INTERVAL > ts) + { + auto socket = session->acceptQueue.front ().first; + session->acceptQueue.pop_front (); + if (socket) + m_Owner.GetService ().post (std::bind(&SAMSocket::TerminateClose, socket)); + } + if (session->acceptQueue.size () < SAM_SESSION_MAX_ACCEPT_QUEUE_SIZE) + { + // already accepting, queue up + SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); + session->acceptQueue.push_back (std::make_pair(shared_from_this(), ts)); + } + else + { + LogPrint (eLogInfo, "SAM: Session ", m_ID, " accept queue is full ", session->acceptQueue.size ()); + SendStreamI2PError ("Already accepting"); + } } } else @@ -1065,12 +1076,23 @@ namespace client if (session && !session->acceptQueue.empty ()) { // pending acceptors - auto socket = session->acceptQueue.front (); - session->acceptQueue.pop_front (); - if (socket && socket->GetSocketType () == eSAMSocketTypeAcceptor) - { - socket->m_IsAccepting = true; - session->GetLocalDestination ()->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, socket, std::placeholders::_1)); + auto ts = i2p::util::GetSecondsSinceEpoch (); + while (!session->acceptQueue.empty () && session->acceptQueue.front ().second + SAM_SESSION_MAX_ACCEPT_INTERVAL > ts) + { + auto socket = session->acceptQueue.front ().first; + session->acceptQueue.pop_front (); + if (socket) + m_Owner.GetService ().post (std::bind(&SAMSocket::TerminateClose, socket)); + } + if (!session->acceptQueue.empty ()) + { + auto socket = session->acceptQueue.front ().first; + session->acceptQueue.pop_front (); + if (socket && socket->GetSocketType () == eSAMSocketTypeAcceptor) + { + socket->m_IsAccepting = true; + session->GetLocalDestination ()->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, socket, std::placeholders::_1)); + } } } if (!m_IsSilent) diff --git a/libi2pd_client/SAM.h b/libi2pd_client/SAM.h index 2841afb9..34af5a62 100644 --- a/libi2pd_client/SAM.h +++ b/libi2pd_client/SAM.h @@ -31,7 +31,8 @@ namespace client const size_t SAM_SOCKET_BUFFER_SIZE = 8192; const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds const int SAM_SESSION_READINESS_CHECK_INTERVAL = 3; // in seconds - const size_t SAM_SESSION_MAX_ACCEPT_QUEUE_SIZE = 64; + const size_t SAM_SESSION_MAX_ACCEPT_QUEUE_SIZE = 50; + const size_t SAM_SESSION_MAX_ACCEPT_INTERVAL = 3; // in seconds const char SAM_HANDSHAKE[] = "HELLO VERSION"; const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=%s\n"; @@ -191,7 +192,7 @@ namespace client std::string Name; SAMSessionType Type; std::shared_ptr UDPEndpoint; // TODO: move - std::list > acceptQueue; + std::list, uint64_t> > acceptQueue; // socket, receive time in seconds SAMSession (SAMBridge & parent, const std::string & name, SAMSessionType type); virtual ~SAMSession () {};