mirror of
https://github.com/PurpleI2P/i2pd.git
synced 2025-01-22 08:14:15 +00:00
rollback
This commit is contained in:
parent
f2c401b6c0
commit
27e1579e4c
107
SAM.cpp
107
SAM.cpp
@ -54,7 +54,11 @@ namespace client
|
|||||||
case eSAMSocketTypeAcceptor:
|
case eSAMSocketTypeAcceptor:
|
||||||
{
|
{
|
||||||
if (m_Session)
|
if (m_Session)
|
||||||
|
{
|
||||||
m_Session->DelSocket (shared_from_this ());
|
m_Session->DelSocket (shared_from_this ());
|
||||||
|
if (m_Session->localDestination)
|
||||||
|
m_Session->localDestination->StopAcceptingStreams ();
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
@ -285,11 +289,6 @@ namespace client
|
|||||||
dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (),
|
dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (),
|
||||||
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5));
|
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5));
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
// start accepting streams because we're not a datagram session
|
|
||||||
m_Session->localDestination->AcceptStreams (std::bind (&SAMSession::AcceptI2P, m_Session, std::placeholders::_1));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (m_Session->localDestination->IsReady ())
|
if (m_Session->localDestination->IsReady ())
|
||||||
SendSessionCreateReplyOk ();
|
SendSessionCreateReplyOk ();
|
||||||
@ -402,25 +401,20 @@ namespace client
|
|||||||
m_Session = m_Owner.FindSession (id);
|
m_Session = m_Owner.FindSession (id);
|
||||||
if (m_Session)
|
if (m_Session)
|
||||||
{
|
{
|
||||||
m_SocketType = eSAMSocketTypeAcceptor;
|
if (!m_Session->localDestination->IsAcceptingStreams ())
|
||||||
m_Session->AddSocket (shared_from_this ());
|
{
|
||||||
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
|
m_SocketType = eSAMSocketTypeAcceptor;
|
||||||
|
m_Session->AddSocket (shared_from_this ());
|
||||||
|
m_Session->localDestination->AcceptStreams (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1));
|
||||||
|
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, strlen(SAM_STREAM_STATUS_I2P_ERROR), true);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true);
|
SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void SAMSocket::Accept(std::shared_ptr<i2p::stream::Stream> stream)
|
|
||||||
{
|
|
||||||
if(stream) {
|
|
||||||
m_SocketType = eSAMSocketTypeStream;
|
|
||||||
HandleI2PAccept(stream);
|
|
||||||
} else {
|
|
||||||
SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, strlen(SAM_STREAM_STATUS_I2P_ERROR), true);
|
|
||||||
auto s = shared_from_this ();
|
|
||||||
m_Owner.GetService ().post ([s] { s->Terminate (); });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
size_t SAMSocket::ProcessDatagramSend (char * buf, size_t len, const char * data)
|
size_t SAMSocket::ProcessDatagramSend (char * buf, size_t len, const char * data)
|
||||||
{
|
{
|
||||||
LogPrint (eLogDebug, "SAM: datagram send: ", buf, " ", len);
|
LogPrint (eLogDebug, "SAM: datagram send: ", buf, " ", len);
|
||||||
@ -665,6 +659,10 @@ namespace client
|
|||||||
LogPrint (eLogDebug, "SAM: incoming I2P connection for session ", m_ID);
|
LogPrint (eLogDebug, "SAM: incoming I2P connection for session ", m_ID);
|
||||||
m_Stream = stream;
|
m_Stream = stream;
|
||||||
context.GetAddressBook ().InsertAddress (stream->GetRemoteIdentity ());
|
context.GetAddressBook ().InsertAddress (stream->GetRemoteIdentity ());
|
||||||
|
auto session = m_Owner.FindSession (m_ID);
|
||||||
|
if (session)
|
||||||
|
session->localDestination->StopAcceptingStreams ();
|
||||||
|
m_SocketType = eSAMSocketTypeStream;
|
||||||
if (!m_IsSilent)
|
if (!m_IsSilent)
|
||||||
{
|
{
|
||||||
// get remote peer address
|
// get remote peer address
|
||||||
@ -706,76 +704,26 @@ namespace client
|
|||||||
}
|
}
|
||||||
|
|
||||||
SAMSession::SAMSession (std::shared_ptr<ClientDestination> dest):
|
SAMSession::SAMSession (std::shared_ptr<ClientDestination> dest):
|
||||||
localDestination (dest),
|
localDestination (dest)
|
||||||
m_BacklogPumper(dest->GetService())
|
|
||||||
{
|
{
|
||||||
PumpBacklog();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void SAMSession::AcceptI2P(std::shared_ptr<i2p::stream::Stream> stream)
|
SAMSession::~SAMSession ()
|
||||||
{
|
{
|
||||||
if(!stream) return; // fail
|
CloseStreams();
|
||||||
std::unique_lock<std::mutex> lock(m_SocketsMutex);
|
i2p::client::context.DeleteLocalDestination (localDestination);
|
||||||
if(m_Backlog.size() > SAM_MAX_ACCEPT_BACKLOG) {
|
|
||||||
stream->Close();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
m_Backlog.push_back(stream);
|
|
||||||
}
|
|
||||||
|
|
||||||
void SAMSession::PumpBacklog()
|
|
||||||
{
|
|
||||||
// pump backlog every 100ms
|
|
||||||
boost::posix_time::milliseconds dlt(100);
|
|
||||||
m_BacklogPumper.expires_from_now(dlt);
|
|
||||||
m_BacklogPumper.async_wait(std::bind(&SAMSession::HandlePumpBacklog, this, std::placeholders::_1));
|
|
||||||
}
|
|
||||||
|
|
||||||
std::shared_ptr<SAMSocket> SAMSession::FindAcceptor()
|
|
||||||
{
|
|
||||||
for (auto & sock : m_Sockets) {
|
|
||||||
auto t = sock->GetSocketType();
|
|
||||||
if(t == eSAMSocketTypeAcceptor) {
|
|
||||||
return sock;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
void SAMSession::HandlePumpBacklog(const boost::system::error_code & ec)
|
|
||||||
{
|
|
||||||
if(ec) return;
|
|
||||||
{
|
|
||||||
std::unique_lock<std::mutex> lock(m_SocketsMutex);
|
|
||||||
auto itr = m_Backlog.begin();
|
|
||||||
while(itr != m_Backlog.end()) {
|
|
||||||
auto sock = FindAcceptor();
|
|
||||||
if (sock) {
|
|
||||||
sock->Accept(*itr);
|
|
||||||
itr = m_Backlog.erase(itr);
|
|
||||||
} else {
|
|
||||||
++itr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
PumpBacklog();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void SAMSession::CloseStreams ()
|
void SAMSession::CloseStreams ()
|
||||||
{
|
{
|
||||||
m_BacklogPumper.cancel();
|
{
|
||||||
localDestination->GetService().post([&] () {
|
|
||||||
std::lock_guard<std::mutex> lock(m_SocketsMutex);
|
std::lock_guard<std::mutex> lock(m_SocketsMutex);
|
||||||
for (auto& sock : m_Sockets) {
|
for (auto& sock : m_Sockets) {
|
||||||
sock->CloseStream();
|
sock->CloseStream();
|
||||||
}
|
}
|
||||||
for(auto & stream : m_Backlog) {
|
}
|
||||||
stream->Close();
|
// XXX: should this be done inside locked parts?
|
||||||
}
|
m_Sockets.clear();
|
||||||
m_Sockets.clear();
|
|
||||||
m_Backlog.clear();
|
|
||||||
i2p::client::context.DeleteLocalDestination (localDestination);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SAMBridge::SAMBridge (const std::string& address, int port):
|
SAMBridge::SAMBridge (const std::string& address, int port):
|
||||||
@ -886,9 +834,8 @@ namespace client
|
|||||||
auto session = std::make_shared<SAMSession>(localDestination);
|
auto session = std::make_shared<SAMSession>(localDestination);
|
||||||
std::unique_lock<std::mutex> l(m_SessionsMutex);
|
std::unique_lock<std::mutex> l(m_SessionsMutex);
|
||||||
auto ret = m_Sessions.insert (std::make_pair(id, session));
|
auto ret = m_Sessions.insert (std::make_pair(id, session));
|
||||||
if (!ret.second) {
|
if (!ret.second)
|
||||||
LogPrint (eLogWarning, "SAM: Session ", id, " already exists");
|
LogPrint (eLogWarning, "SAM: Session ", id, " already exists");
|
||||||
}
|
|
||||||
return ret.first->second;
|
return ret.first->second;
|
||||||
}
|
}
|
||||||
return nullptr;
|
return nullptr;
|
||||||
|
19
SAM.h
19
SAM.h
@ -20,8 +20,7 @@ namespace client
|
|||||||
{
|
{
|
||||||
const size_t SAM_SOCKET_BUFFER_SIZE = 8192;
|
const size_t SAM_SOCKET_BUFFER_SIZE = 8192;
|
||||||
const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds
|
const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds
|
||||||
const int SAM_SESSION_READINESS_CHECK_INTERVAL = 20; // in seconds
|
const int SAM_SESSION_READINESS_CHECK_INTERVAL = 20; // in seconds
|
||||||
const int SAM_MAX_ACCEPT_BACKLOG = 50;
|
|
||||||
const char SAM_HANDSHAKE[] = "HELLO VERSION";
|
const char SAM_HANDSHAKE[] = "HELLO VERSION";
|
||||||
const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=%s\n";
|
const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=%s\n";
|
||||||
const char SAM_HANDSHAKE_I2P_ERROR[] = "HELLO REPLY RESULT=I2P_ERROR\n";
|
const char SAM_HANDSHAKE_I2P_ERROR[] = "HELLO REPLY RESULT=I2P_ERROR\n";
|
||||||
@ -85,8 +84,6 @@ namespace client
|
|||||||
void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; };
|
void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; };
|
||||||
SAMSocketType GetSocketType () const { return m_SocketType; };
|
SAMSocketType GetSocketType () const { return m_SocketType; };
|
||||||
|
|
||||||
void Accept(std::shared_ptr<i2p::stream::Stream> stream);
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
void Terminate ();
|
void Terminate ();
|
||||||
@ -137,8 +134,6 @@ namespace client
|
|||||||
struct SAMSession
|
struct SAMSession
|
||||||
{
|
{
|
||||||
std::shared_ptr<ClientDestination> localDestination;
|
std::shared_ptr<ClientDestination> localDestination;
|
||||||
boost::asio::deadline_timer m_BacklogPumper;
|
|
||||||
std::list<std::shared_ptr<i2p::stream::Stream> > m_Backlog;
|
|
||||||
std::list<std::shared_ptr<SAMSocket> > m_Sockets;
|
std::list<std::shared_ptr<SAMSocket> > m_Sockets;
|
||||||
std::mutex m_SocketsMutex;
|
std::mutex m_SocketsMutex;
|
||||||
|
|
||||||
@ -163,15 +158,9 @@ namespace client
|
|||||||
}
|
}
|
||||||
return l;
|
return l;
|
||||||
}
|
}
|
||||||
|
|
||||||
SAMSession (std::shared_ptr<ClientDestination> dest);
|
SAMSession (std::shared_ptr<ClientDestination> dest);
|
||||||
|
~SAMSession ();
|
||||||
void AcceptI2P(std::shared_ptr<i2p::stream::Stream> stream);
|
|
||||||
|
|
||||||
std::shared_ptr<SAMSocket> FindAcceptor();
|
|
||||||
|
|
||||||
void PumpBacklog();
|
|
||||||
void HandlePumpBacklog(const boost::system::error_code & ec);
|
|
||||||
|
|
||||||
void CloseStreams ();
|
void CloseStreams ();
|
||||||
};
|
};
|
||||||
|
Loading…
x
Reference in New Issue
Block a user