1
0
mirror of https://github.com/PurpleI2P/i2pd.git synced 2025-01-18 16:49:58 +00:00

STREAM ACCEPT queue

This commit is contained in:
orignal 2023-11-17 13:44:30 -05:00
parent 21259204b1
commit 94255ebaf4
2 changed files with 21 additions and 13 deletions

View File

@ -601,10 +601,15 @@ namespace client
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
session->GetLocalDestination ()->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); session->GetLocalDestination ()->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1));
} }
else // already accepting else if (session->acceptQueue.size () < SAM_SESSION_MAX_ACCEPT_QUEUE_SIZE)
{
// already accepting, queue up
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
session->acceptQueue.push_back (shared_from_this());
}
else
{ {
// TODO: implement queue LogPrint (eLogInfo, "SAM: Session ", m_ID, " accept queue is full ", session->acceptQueue.size ());
LogPrint (eLogInfo, "SAM: Session ", m_ID, " is already accepting");
SendStreamI2PError ("Already accepting"); SendStreamI2PError ("Already accepting");
} }
} }
@ -1057,16 +1062,16 @@ namespace client
m_Stream = stream; m_Stream = stream;
context.GetAddressBook ().InsertFullAddress (stream->GetRemoteIdentity ()); context.GetAddressBook ().InsertFullAddress (stream->GetRemoteIdentity ());
auto session = m_Owner.FindSession (m_ID); auto session = m_Owner.FindSession (m_ID);
if (session) if (session && !session->acceptQueue.empty ())
{ {
// find more pending acceptors // pending acceptors
for (auto & it: m_Owner.ListSockets (m_ID)) auto socket = session->acceptQueue.front ();
if (it->m_SocketType == eSAMSocketTypeAcceptor) session->acceptQueue.pop_front ();
{ if (socket && socket->GetSocketType () == eSAMSocketTypeAcceptor)
it->m_IsAccepting = true; {
session->GetLocalDestination ()->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, it, std::placeholders::_1)); socket->m_IsAccepting = true;
break; session->GetLocalDestination ()->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, socket, std::placeholders::_1));
} }
} }
if (!m_IsSilent) if (!m_IsSilent)
{ {

View File

@ -31,6 +31,8 @@ namespace client
const size_t SAM_SOCKET_BUFFER_SIZE = 8192; const size_t SAM_SOCKET_BUFFER_SIZE = 8192;
const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds
const int SAM_SESSION_READINESS_CHECK_INTERVAL = 3; // in seconds const int SAM_SESSION_READINESS_CHECK_INTERVAL = 3; // in seconds
const size_t SAM_SESSION_MAX_ACCEPT_QUEUE_SIZE = 64;
const char SAM_HANDSHAKE[] = "HELLO VERSION"; const char SAM_HANDSHAKE[] = "HELLO VERSION";
const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=%s\n"; const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=%s\n";
const char SAM_HANDSHAKE_NOVERSION[] = "HELLO REPLY RESULT=NOVERSION\n"; const char SAM_HANDSHAKE_NOVERSION[] = "HELLO REPLY RESULT=NOVERSION\n";
@ -189,7 +191,8 @@ namespace client
std::string Name; std::string Name;
SAMSessionType Type; SAMSessionType Type;
std::shared_ptr<boost::asio::ip::udp::endpoint> UDPEndpoint; // TODO: move std::shared_ptr<boost::asio::ip::udp::endpoint> UDPEndpoint; // TODO: move
std::list<std::shared_ptr<SAMSocket> > acceptQueue;
SAMSession (SAMBridge & parent, const std::string & name, SAMSessionType type); SAMSession (SAMBridge & parent, const std::string & name, SAMSessionType type);
virtual ~SAMSession () {}; virtual ~SAMSession () {};