From 0ad30785247a4bddcb79c8d594b11c52f718c4d6 Mon Sep 17 00:00:00 2001 From: orignal Date: Thu, 22 Dec 2016 10:08:35 -0500 Subject: [PATCH 01/15] open log stream in log thread --- Log.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Log.cpp b/Log.cpp index a38fc808..7cd4205e 100644 --- a/Log.cpp +++ b/Log.cpp @@ -72,7 +72,6 @@ namespace log { { if (!m_IsRunning) { - Reopen (); m_IsRunning = true; m_Thread = new std::thread (std::bind (&Log::Run, this)); } @@ -162,6 +161,7 @@ namespace log { void Log::Run () { + Reopen (); while (m_IsRunning) { std::shared_ptr msg; From 5babfb0f1eb123a37f2082b2dc6044dac6bf1f57 Mon Sep 17 00:00:00 2001 From: orignal Date: Thu, 22 Dec 2016 10:52:26 -0500 Subject: [PATCH 02/15] fixed #724 --- DaemonLinux.cpp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/DaemonLinux.cpp b/DaemonLinux.cpp index edd15f07..07e27e9d 100644 --- a/DaemonLinux.cpp +++ b/DaemonLinux.cpp @@ -76,12 +76,10 @@ namespace i2p return false; } -#if !defined(__OpenBSD__) // point std{in,out,err} descriptors to /dev/null - stdin = freopen("/dev/null", "r", stdin); - stdout = freopen("/dev/null", "w", stdout); - stderr = freopen("/dev/null", "w", stderr); -#endif + freopen("/dev/null", "r", stdin); + freopen("/dev/null", "w", stdout); + freopen("/dev/null", "w", stderr); } // set proc limits From 442c63d7a4a447a120400fcb3428bfc2c1ec18cc Mon Sep 17 00:00:00 2001 From: orignal Date: Thu, 22 Dec 2016 13:32:06 -0500 Subject: [PATCH 03/15] #746. initialize io_service after daeminization --- Transports.cpp | 59 +++++++++++++++++++++++++++++++------------------- Transports.h | 8 +++---- 2 files changed, 41 insertions(+), 26 deletions(-) diff --git a/Transports.cpp b/Transports.cpp index 1b9d52a1..b422297f 100644 --- a/Transports.cpp +++ b/Transports.cpp @@ -108,8 +108,8 @@ namespace transport Transports transports; Transports::Transports (): - m_IsOnline (true), m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), - m_PeerCleanupTimer (m_Service), m_PeerTestTimer (m_Service), + m_IsOnline (true), m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), + m_Work (nullptr), m_PeerCleanupTimer (nullptr), m_PeerTestTimer (nullptr), m_NTCPServer (nullptr), m_SSUServer (nullptr), m_DHKeysPairSupplier (5), // 5 pre-generated keys m_TotalSentBytes(0), m_TotalReceivedBytes(0), m_InBandwidth (0), m_OutBandwidth (0), m_LastInBandwidthUpdateBytes (0), m_LastOutBandwidthUpdateBytes (0), m_LastBandwidthUpdateTime (0) @@ -119,10 +119,25 @@ namespace transport Transports::~Transports () { Stop (); + if (m_Service) + { + delete m_PeerCleanupTimer; m_PeerCleanupTimer = nullptr; + delete m_PeerTestTimer; m_PeerTestTimer = nullptr; + delete m_Work; m_Work = nullptr; + delete m_Service; m_Service = nullptr; + } } void Transports::Start (bool enableNTCP, bool enableSSU) { + if (!m_Service) + { + m_Service = new boost::asio::io_service (); + m_Work = new boost::asio::io_service::work (*m_Service); + m_PeerCleanupTimer = new boost::asio::deadline_timer (*m_Service); + m_PeerTestTimer = new boost::asio::deadline_timer (*m_Service); + } + m_DHKeysPairSupplier.Start (); m_IsRunning = true; m_Thread = new std::thread (std::bind (&Transports::Run, this)); @@ -167,16 +182,16 @@ namespace transport LogPrint (eLogError, "Transports: SSU server already exists"); } } - m_PeerCleanupTimer.expires_from_now (boost::posix_time::seconds(5*SESSION_CREATION_TIMEOUT)); - m_PeerCleanupTimer.async_wait (std::bind (&Transports::HandlePeerCleanupTimer, this, std::placeholders::_1)); - m_PeerTestTimer.expires_from_now (boost::posix_time::minutes(PEER_TEST_INTERVAL)); - m_PeerTestTimer.async_wait (std::bind (&Transports::HandlePeerTestTimer, this, std::placeholders::_1)); + m_PeerCleanupTimer->expires_from_now (boost::posix_time::seconds(5*SESSION_CREATION_TIMEOUT)); + m_PeerCleanupTimer->async_wait (std::bind (&Transports::HandlePeerCleanupTimer, this, std::placeholders::_1)); + m_PeerTestTimer->expires_from_now (boost::posix_time::minutes(PEER_TEST_INTERVAL)); + m_PeerTestTimer->async_wait (std::bind (&Transports::HandlePeerTestTimer, this, std::placeholders::_1)); } void Transports::Stop () { - m_PeerCleanupTimer.cancel (); - m_PeerTestTimer.cancel (); + if (m_PeerCleanupTimer) m_PeerCleanupTimer->cancel (); + if (m_PeerTestTimer) m_PeerTestTimer->cancel (); m_Peers.clear (); if (m_SSUServer) { @@ -193,7 +208,7 @@ namespace transport m_DHKeysPairSupplier.Stop (); m_IsRunning = false; - m_Service.stop (); + if (m_Service) m_Service->stop (); if (m_Thread) { m_Thread->join (); @@ -204,11 +219,11 @@ namespace transport void Transports::Run () { - while (m_IsRunning) + while (m_IsRunning && m_Service) { try { - m_Service.run (); + m_Service->run (); } catch (std::exception& ex) { @@ -251,7 +266,7 @@ namespace transport #ifdef WITH_EVENTS EmitEvent({{"type" , "transport.sendmsg"}, {"ident", ident.ToBase64()}, {"number", std::to_string(msgs.size())}}); #endif - m_Service.post (std::bind (&Transports::PostMessages, this, ident, msgs)); + m_Service->post (std::bind (&Transports::PostMessages, this, ident, msgs)); } void Transports::PostMessages (i2p::data::IdentHash ident, std::vector > msgs) @@ -386,7 +401,7 @@ namespace transport void Transports::RequestComplete (std::shared_ptr r, const i2p::data::IdentHash& ident) { - m_Service.post (std::bind (&Transports::HandleRequestComplete, this, r, ident)); + m_Service->post (std::bind (&Transports::HandleRequestComplete, this, r, ident)); } void Transports::HandleRequestComplete (std::shared_ptr r, i2p::data::IdentHash ident) @@ -411,7 +426,7 @@ namespace transport void Transports::NTCPResolve (const std::string& addr, const i2p::data::IdentHash& ident) { - auto resolver = std::make_shared(m_Service); + auto resolver = std::make_shared(*m_Service); resolver->async_resolve (boost::asio::ip::tcp::resolver::query (addr, ""), std::bind (&Transports::HandleNTCPResolve, this, std::placeholders::_1, std::placeholders::_2, ident, resolver)); @@ -454,7 +469,7 @@ namespace transport void Transports::SSUResolve (const std::string& addr, const i2p::data::IdentHash& ident) { - auto resolver = std::make_shared(m_Service); + auto resolver = std::make_shared(*m_Service); resolver->async_resolve (boost::asio::ip::tcp::resolver::query (addr, ""), std::bind (&Transports::HandleSSUResolve, this, std::placeholders::_1, std::placeholders::_2, ident, resolver)); @@ -497,7 +512,7 @@ namespace transport void Transports::CloseSession (std::shared_ptr router) { if (!router) return; - m_Service.post (std::bind (&Transports::PostCloseSession, this, router)); + m_Service->post (std::bind (&Transports::PostCloseSession, this, router)); } void Transports::PostCloseSession (std::shared_ptr router) @@ -584,7 +599,7 @@ namespace transport void Transports::PeerConnected (std::shared_ptr session) { - m_Service.post([session, this]() + m_Service->post([session, this]() { auto remoteIdentity = session->GetRemoteIdentity (); if (!remoteIdentity) return; @@ -632,7 +647,7 @@ namespace transport void Transports::PeerDisconnected (std::shared_ptr session) { - m_Service.post([session, this]() + m_Service->post([session, this]() { auto remoteIdentity = session->GetRemoteIdentity (); if (!remoteIdentity) return; @@ -690,8 +705,8 @@ namespace transport UpdateBandwidth (); // TODO: use separate timer(s) for it if (i2p::context.GetStatus () == eRouterStatusTesting) // if still testing, repeat peer test DetectExternalIP (); - m_PeerCleanupTimer.expires_from_now (boost::posix_time::seconds(5*SESSION_CREATION_TIMEOUT)); - m_PeerCleanupTimer.async_wait (std::bind (&Transports::HandlePeerCleanupTimer, this, std::placeholders::_1)); + m_PeerCleanupTimer->expires_from_now (boost::posix_time::seconds(5*SESSION_CREATION_TIMEOUT)); + m_PeerCleanupTimer->async_wait (std::bind (&Transports::HandlePeerCleanupTimer, this, std::placeholders::_1)); } } @@ -700,8 +715,8 @@ namespace transport if (ecode != boost::asio::error::operation_aborted) { PeerTest (); - m_PeerTestTimer.expires_from_now (boost::posix_time::minutes(PEER_TEST_INTERVAL)); - m_PeerTestTimer.async_wait (std::bind (&Transports::HandlePeerTestTimer, this, std::placeholders::_1)); + m_PeerTestTimer->expires_from_now (boost::posix_time::minutes(PEER_TEST_INTERVAL)); + m_PeerTestTimer->async_wait (std::bind (&Transports::HandlePeerTestTimer, this, std::placeholders::_1)); } } diff --git a/Transports.h b/Transports.h index d83c0370..a68838e2 100644 --- a/Transports.h +++ b/Transports.h @@ -84,7 +84,7 @@ namespace transport bool IsOnline() const { return m_IsOnline; }; void SetOnline (bool online) { m_IsOnline = online; }; - boost::asio::io_service& GetService () { return m_Service; }; + boost::asio::io_service& GetService () { return *m_Service; }; std::shared_ptr GetNextDHKeysPair (); void ReuseDHKeysPair (std::shared_ptr pair); @@ -144,9 +144,9 @@ namespace transport bool m_IsOnline, m_IsRunning; std::thread * m_Thread; - boost::asio::io_service m_Service; - boost::asio::io_service::work m_Work; - boost::asio::deadline_timer m_PeerCleanupTimer, m_PeerTestTimer; + boost::asio::io_service * m_Service; + boost::asio::io_service::work * m_Work; + boost::asio::deadline_timer * m_PeerCleanupTimer, * m_PeerTestTimer; NTCPServer * m_NTCPServer; SSUServer * m_SSUServer; From f2c401b6c008d190e38386d0d58752043dae23eb Mon Sep 17 00:00:00 2001 From: orignal Date: Thu, 22 Dec 2016 15:00:40 -0500 Subject: [PATCH 04/15] fixed some memory leak --- util.cpp | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/util.cpp b/util.cpp index ca2b61c2..5cc1fdb5 100644 --- a/util.cpp +++ b/util.cpp @@ -275,14 +275,15 @@ namespace net if (cur_ifname == ifname && cur->ifa_addr && cur->ifa_addr->sa_family == af) { // match - char * addr = new char[INET6_ADDRSTRLEN]; - bzero(addr, INET6_ADDRSTRLEN); - if(af == AF_INET) - inet_ntop(af, &((sockaddr_in *)cur->ifa_addr)->sin_addr, addr, INET6_ADDRSTRLEN); - else - inet_ntop(af, &((sockaddr_in6 *)cur->ifa_addr)->sin6_addr, addr, INET6_ADDRSTRLEN); - freeifaddrs(addrs); - std::string cur_ifaddr(addr); + char * addr = new char[INET6_ADDRSTRLEN]; + bzero(addr, INET6_ADDRSTRLEN); + if(af == AF_INET) + inet_ntop(af, &((sockaddr_in *)cur->ifa_addr)->sin_addr, addr, INET6_ADDRSTRLEN); + else + inet_ntop(af, &((sockaddr_in6 *)cur->ifa_addr)->sin6_addr, addr, INET6_ADDRSTRLEN); + freeifaddrs(addrs); + std::string cur_ifaddr(addr); + delete[] addr; return boost::asio::ip::address::from_string(cur_ifaddr); } cur = cur->ifa_next; From 27e1579e4caa1dfdd7bc090f89911ae04de6d41d Mon Sep 17 00:00:00 2001 From: orignal Date: Thu, 22 Dec 2016 19:38:17 -0500 Subject: [PATCH 05/15] rollback --- SAM.cpp | 107 ++++++++++++++------------------------------------------ SAM.h | 19 +++------- 2 files changed, 31 insertions(+), 95 deletions(-) diff --git a/SAM.cpp b/SAM.cpp index c61e8577..cb843685 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -54,7 +54,11 @@ namespace client case eSAMSocketTypeAcceptor: { if (m_Session) + { m_Session->DelSocket (shared_from_this ()); + if (m_Session->localDestination) + m_Session->localDestination->StopAcceptingStreams (); + } break; } default: @@ -285,11 +289,6 @@ namespace client dest->SetReceiver (std::bind (&SAMSocket::HandleI2PDatagramReceive, shared_from_this (), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5)); } - else - { - // start accepting streams because we're not a datagram session - m_Session->localDestination->AcceptStreams (std::bind (&SAMSession::AcceptI2P, m_Session, std::placeholders::_1)); - } if (m_Session->localDestination->IsReady ()) SendSessionCreateReplyOk (); @@ -402,25 +401,20 @@ namespace client m_Session = m_Owner.FindSession (id); if (m_Session) { - m_SocketType = eSAMSocketTypeAcceptor; - m_Session->AddSocket (shared_from_this ()); - SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); + if (!m_Session->localDestination->IsAcceptingStreams ()) + { + m_SocketType = eSAMSocketTypeAcceptor; + m_Session->AddSocket (shared_from_this ()); + m_Session->localDestination->AcceptStreams (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); + SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); + } + else + SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, strlen(SAM_STREAM_STATUS_I2P_ERROR), true); } else SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true); } - void SAMSocket::Accept(std::shared_ptr stream) - { - if(stream) { - m_SocketType = eSAMSocketTypeStream; - HandleI2PAccept(stream); - } else { - SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, strlen(SAM_STREAM_STATUS_I2P_ERROR), true); - auto s = shared_from_this (); - m_Owner.GetService ().post ([s] { s->Terminate (); }); - } - } size_t SAMSocket::ProcessDatagramSend (char * buf, size_t len, const char * data) { LogPrint (eLogDebug, "SAM: datagram send: ", buf, " ", len); @@ -665,6 +659,10 @@ namespace client LogPrint (eLogDebug, "SAM: incoming I2P connection for session ", m_ID); m_Stream = stream; context.GetAddressBook ().InsertAddress (stream->GetRemoteIdentity ()); + auto session = m_Owner.FindSession (m_ID); + if (session) + session->localDestination->StopAcceptingStreams (); + m_SocketType = eSAMSocketTypeStream; if (!m_IsSilent) { // get remote peer address @@ -706,76 +704,26 @@ namespace client } SAMSession::SAMSession (std::shared_ptr dest): - localDestination (dest), - m_BacklogPumper(dest->GetService()) - { - PumpBacklog(); - } - - void SAMSession::AcceptI2P(std::shared_ptr stream) - { - if(!stream) return; // fail - std::unique_lock lock(m_SocketsMutex); - if(m_Backlog.size() > SAM_MAX_ACCEPT_BACKLOG) { - stream->Close(); - return; - } - m_Backlog.push_back(stream); - } - - void SAMSession::PumpBacklog() + localDestination (dest) { - // pump backlog every 100ms - boost::posix_time::milliseconds dlt(100); - m_BacklogPumper.expires_from_now(dlt); - m_BacklogPumper.async_wait(std::bind(&SAMSession::HandlePumpBacklog, this, std::placeholders::_1)); - } - - std::shared_ptr SAMSession::FindAcceptor() - { - for (auto & sock : m_Sockets) { - auto t = sock->GetSocketType(); - if(t == eSAMSocketTypeAcceptor) { - return sock; - } - } - return nullptr; } - - void SAMSession::HandlePumpBacklog(const boost::system::error_code & ec) + + SAMSession::~SAMSession () { - if(ec) return; - { - std::unique_lock lock(m_SocketsMutex); - auto itr = m_Backlog.begin(); - while(itr != m_Backlog.end()) { - auto sock = FindAcceptor(); - if (sock) { - sock->Accept(*itr); - itr = m_Backlog.erase(itr); - } else { - ++itr; - } - } - } - PumpBacklog(); + CloseStreams(); + i2p::client::context.DeleteLocalDestination (localDestination); } void SAMSession::CloseStreams () { - m_BacklogPumper.cancel(); - localDestination->GetService().post([&] () { + { std::lock_guard lock(m_SocketsMutex); for (auto& sock : m_Sockets) { sock->CloseStream(); } - for(auto & stream : m_Backlog) { - stream->Close(); - } - m_Sockets.clear(); - m_Backlog.clear(); - i2p::client::context.DeleteLocalDestination (localDestination); - }); + } + // XXX: should this be done inside locked parts? + m_Sockets.clear(); } SAMBridge::SAMBridge (const std::string& address, int port): @@ -886,9 +834,8 @@ namespace client auto session = std::make_shared(localDestination); std::unique_lock l(m_SessionsMutex); auto ret = m_Sessions.insert (std::make_pair(id, session)); - if (!ret.second) { + if (!ret.second) LogPrint (eLogWarning, "SAM: Session ", id, " already exists"); - } return ret.first->second; } return nullptr; diff --git a/SAM.h b/SAM.h index 7d7d6d3a..db08c5a0 100644 --- a/SAM.h +++ b/SAM.h @@ -20,8 +20,7 @@ namespace client { const size_t SAM_SOCKET_BUFFER_SIZE = 8192; const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds - const int SAM_SESSION_READINESS_CHECK_INTERVAL = 20; // in seconds - const int SAM_MAX_ACCEPT_BACKLOG = 50; + const int SAM_SESSION_READINESS_CHECK_INTERVAL = 20; // in seconds const char SAM_HANDSHAKE[] = "HELLO VERSION"; const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=%s\n"; const char SAM_HANDSHAKE_I2P_ERROR[] = "HELLO REPLY RESULT=I2P_ERROR\n"; @@ -85,8 +84,6 @@ namespace client void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; }; SAMSocketType GetSocketType () const { return m_SocketType; }; - void Accept(std::shared_ptr stream); - private: void Terminate (); @@ -137,8 +134,6 @@ namespace client struct SAMSession { std::shared_ptr localDestination; - boost::asio::deadline_timer m_BacklogPumper; - std::list > m_Backlog; std::list > m_Sockets; std::mutex m_SocketsMutex; @@ -163,15 +158,9 @@ namespace client } return l; } - - SAMSession (std::shared_ptr dest); - - void AcceptI2P(std::shared_ptr stream); - - std::shared_ptr FindAcceptor(); - - void PumpBacklog(); - void HandlePumpBacklog(const boost::system::error_code & ec); + + SAMSession (std::shared_ptr dest); + ~SAMSession (); void CloseStreams (); }; From 213629ef52c87f637c1bb5e57b81ec858078255b Mon Sep 17 00:00:00 2001 From: orignal Date: Thu, 22 Dec 2016 20:30:50 -0500 Subject: [PATCH 06/15] drop highest bit for token --- HTTPServer.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 84818e8e..af5dfe73 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -775,6 +775,7 @@ namespace http { { uint32_t token; RAND_bytes ((uint8_t *)&token, 4); + token |= 0x7FFFFFFF; // clear first bit auto ts = i2p::util::GetSecondsSinceEpoch (); for (auto it = m_Tokens.begin (); it != m_Tokens.end (); ) { From 573ee0b5849262bea7a3fb0f0ffc2006dc344fd4 Mon Sep 17 00:00:00 2001 From: orignal Date: Thu, 22 Dec 2016 20:34:06 -0500 Subject: [PATCH 07/15] fixed typo --- HTTPServer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/HTTPServer.cpp b/HTTPServer.cpp index af5dfe73..48ec9bd2 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -775,7 +775,7 @@ namespace http { { uint32_t token; RAND_bytes ((uint8_t *)&token, 4); - token |= 0x7FFFFFFF; // clear first bit + token &= 0x7FFFFFFF; // clear first bit auto ts = i2p::util::GetSecondsSinceEpoch (); for (auto it = m_Tokens.begin (); it != m_Tokens.end (); ) { From 88a48a5c793b9336363872b5c1073cf6d2595f38 Mon Sep 17 00:00:00 2001 From: orignal Date: Fri, 23 Dec 2016 10:09:40 -0500 Subject: [PATCH 08/15] implement AcceptOnce for multiple acceptors --- Streaming.cpp | 23 +++++++++++++++++++++++ Streaming.h | 2 ++ 2 files changed, 25 insertions(+) diff --git a/Streaming.cpp b/Streaming.cpp index 762dd9d6..ff14aadb 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -1037,6 +1037,29 @@ namespace stream m_Acceptor = nullptr; } + void StreamingDestination::AcceptOnce (const Acceptor& acceptor) + { + m_Owner->GetService ().post([acceptor, this](void) + { + if (!m_PendingIncomingStreams.empty ()) + { + acceptor (m_PendingIncomingStreams.front ()); + m_PendingIncomingStreams.pop_front (); + if (m_PendingIncomingStreams.empty ()) + m_PendingIncomingTimer.cancel (); + } + else // we must save old acceptor and set it back + { + auto oldAcceptor = m_Acceptor; + m_Acceptor = [acceptor, oldAcceptor, this](std::shared_ptr stream) + { + acceptor (stream); + m_Acceptor = oldAcceptor; + }; + } + }); + } + void StreamingDestination::HandlePendingIncomingTimer (const boost::system::error_code& ecode) { if (ecode != boost::asio::error::operation_aborted) diff --git a/Streaming.h b/Streaming.h index 65a8d8b4..1593f961 100644 --- a/Streaming.h +++ b/Streaming.h @@ -223,6 +223,8 @@ namespace stream void SetAcceptor (const Acceptor& acceptor); void ResetAcceptor (); bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; + void AcceptOnce (const Acceptor& acceptor); + std::shared_ptr GetOwner () const { return m_Owner; }; void SetOwner (std::shared_ptr owner) { m_Owner = owner; }; uint16_t GetLocalPort () const { return m_LocalPort; }; From b363b50320cbea66420da184e7b62b046a4e7570 Mon Sep 17 00:00:00 2001 From: orignal Date: Sat, 24 Dec 2016 08:53:35 -0500 Subject: [PATCH 09/15] multiple acceptors --- Destination.cpp | 6 ++++++ Destination.h | 3 ++- SAM.cpp | 17 +++++++++++++---- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/Destination.cpp b/Destination.cpp index 491a9f29..f3cb9af8 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -844,6 +844,12 @@ namespace client return false; } + void ClientDestination::AcceptOnce (const i2p::stream::StreamingDestination::Acceptor& acceptor) + { + if (m_StreamingDestination) + m_StreamingDestination->AcceptOnce (acceptor); + } + std::shared_ptr ClientDestination::CreateStreamingDestination (int port, bool gzip) { auto dest = std::make_shared (GetSharedFromThis (), port, gzip); diff --git a/Destination.h b/Destination.h index 1ccb0a9c..7a4e0b64 100644 --- a/Destination.h +++ b/Destination.h @@ -186,7 +186,8 @@ namespace client void AcceptStreams (const i2p::stream::StreamingDestination::Acceptor& acceptor); void StopAcceptingStreams (); bool IsAcceptingStreams () const; - + void AcceptOnce (const i2p::stream::StreamingDestination::Acceptor& acceptor); + // datagram i2p::datagram::DatagramDestination * GetDatagramDestination () const { return m_DatagramDestination; }; i2p::datagram::DatagramDestination * CreateDatagramDestination (); diff --git a/SAM.cpp b/SAM.cpp index cb843685..aa7de1c8 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -405,7 +405,8 @@ namespace client { m_SocketType = eSAMSocketTypeAcceptor; m_Session->AddSocket (shared_from_this ()); - m_Session->localDestination->AcceptStreams (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); + if (!m_Session->localDestination->IsAcceptingStreams ()) + m_Session->localDestination->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); } else @@ -657,12 +658,20 @@ namespace client if (stream) { LogPrint (eLogDebug, "SAM: incoming I2P connection for session ", m_ID); + m_SocketType = eSAMSocketTypeStream; m_Stream = stream; context.GetAddressBook ().InsertAddress (stream->GetRemoteIdentity ()); auto session = m_Owner.FindSession (m_ID); - if (session) - session->localDestination->StopAcceptingStreams (); - m_SocketType = eSAMSocketTypeStream; + if (session) + { + // find more pending acceptors + for (auto it: session->ListSockets ()) + if (it->m_SocketType == eSAMSocketTypeAcceptor) + { + session->localDestination->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); + break; + } + } if (!m_IsSilent) { // get remote peer address From 8f51dc2c228b0ce534be15a3bbfb4ab32dc417af Mon Sep 17 00:00:00 2001 From: orignal Date: Sat, 24 Dec 2016 09:55:59 -0500 Subject: [PATCH 10/15] reload acceptor with correct stream --- SAM.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SAM.cpp b/SAM.cpp index aa7de1c8..20f565cc 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -668,7 +668,7 @@ namespace client for (auto it: session->ListSockets ()) if (it->m_SocketType == eSAMSocketTypeAcceptor) { - session->localDestination->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); + session->localDestination->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, it, std::placeholders::_1)); break; } } From 652226dbf020685b024e0b1b2a1aa0afd0e34b45 Mon Sep 17 00:00:00 2001 From: orignal Date: Sat, 24 Dec 2016 16:34:18 -0500 Subject: [PATCH 11/15] allow multiple acceptors --- SAM.cpp | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/SAM.cpp b/SAM.cpp index 20f565cc..cb704ebd 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -400,17 +400,12 @@ namespace client m_ID = id; m_Session = m_Owner.FindSession (id); if (m_Session) - { + { + m_SocketType = eSAMSocketTypeAcceptor; + m_Session->AddSocket (shared_from_this ()); if (!m_Session->localDestination->IsAcceptingStreams ()) - { - m_SocketType = eSAMSocketTypeAcceptor; - m_Session->AddSocket (shared_from_this ()); - if (!m_Session->localDestination->IsAcceptingStreams ()) - m_Session->localDestination->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); - SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); - } - else - SendMessageReply (SAM_STREAM_STATUS_I2P_ERROR, strlen(SAM_STREAM_STATUS_I2P_ERROR), true); + m_Session->localDestination->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); + SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); } else SendMessageReply (SAM_STREAM_STATUS_INVALID_ID, strlen(SAM_STREAM_STATUS_INVALID_ID), true); From 5bc2001ce3fd2dc679ed5a82c65419d4f5f4c4a2 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sat, 24 Dec 2016 16:05:44 -0500 Subject: [PATCH 12/15] Fix Tunnel Gateway Leak --- TunnelGateway.cpp | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/TunnelGateway.cpp b/TunnelGateway.cpp index ad423fc0..9f13d84c 100644 --- a/TunnelGateway.cpp +++ b/TunnelGateway.cpp @@ -20,6 +20,7 @@ namespace tunnel TunnelGatewayBuffer::~TunnelGatewayBuffer () { + ClearTunnelDataMsgs (); } void TunnelGatewayBuffer::PutI2NPMsg (const TunnelMessageBlock& block) @@ -48,7 +49,7 @@ namespace tunnel di[0] = block.deliveryType << 5; // set delivery type // create fragments - std::shared_ptr msg = block.data; + const std::shared_ptr & msg = block.data; size_t fullMsgLen = diLen + msg->GetLength () + 2; // delivery instructions + payload + 2 bytes length if (fullMsgLen <= m_RemainingSize) { @@ -115,9 +116,13 @@ namespace tunnel m_CurrentTunnelDataMsg->len += s+7; if (isLastFragment) { - m_RemainingSize -= s+7; - if (!m_RemainingSize) - CompleteCurrentTunnelDataMessage (); + if(m_RemainingSize < (s+7)) { + LogPrint (eLogError, "TunnelGateway: remaining size overflow: ", m_RemainingSize, " < ", s+7); + } else { + m_RemainingSize -= s+7; + if (m_RemainingSize == 0) + CompleteCurrentTunnelDataMessage (); + } } else CompleteCurrentTunnelDataMessage (); @@ -138,10 +143,12 @@ namespace tunnel void TunnelGatewayBuffer::ClearTunnelDataMsgs () { m_TunnelDataMsgs.clear (); + m_CurrentTunnelDataMsg = nullptr; } void TunnelGatewayBuffer::CreateCurrentTunnelDataMessage () { + m_CurrentTunnelDataMsg = nullptr; m_CurrentTunnelDataMsg = NewI2NPShortMessage (); m_CurrentTunnelDataMsg->Align (12); // we reserve space for padding @@ -196,7 +203,7 @@ namespace tunnel void TunnelGateway::SendBuffer () { m_Buffer.CompleteCurrentTunnelDataMessage (); - auto tunnelMsgs = m_Buffer.GetTunnelDataMsgs (); + const auto & tunnelMsgs = m_Buffer.GetTunnelDataMsgs (); for (auto& tunnelMsg : tunnelMsgs) { m_Tunnel->EncryptTunnelMsg (tunnelMsg, tunnelMsg); From 59dd479a6d3460fea9042e244c4348f4399a93e4 Mon Sep 17 00:00:00 2001 From: orignal Date: Mon, 26 Dec 2016 17:19:54 -0500 Subject: [PATCH 13/15] check if address not found --- AddressBook.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/AddressBook.cpp b/AddressBook.cpp index 685acbc2..4dad5614 100644 --- a/AddressBook.cpp +++ b/AddressBook.cpp @@ -636,7 +636,11 @@ namespace client if (address.length () > 0) { // TODO: verify from - m_Addresses[address] = buf + 8; + i2p::data::IdentHash hash(buf + 8); + if (!hash.IsZero ()) + m_Addresses[address] = hash; + else + LogPrint (eLogInfo, "AddressBook: Lookup response: ", address, " not found"); } } From f3d4077142bc0b57c22be3d2b66ba64bd5651165 Mon Sep 17 00:00:00 2001 From: Jeff Date: Mon, 26 Dec 2016 18:47:47 -0500 Subject: [PATCH 14/15] dont re-request LS --- Datagram.cpp | 9 +++++++-- Datagram.h | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/Datagram.cpp b/Datagram.cpp index 852e54f6..db29e83c 100644 --- a/Datagram.cpp +++ b/Datagram.cpp @@ -168,7 +168,8 @@ namespace datagram const i2p::data::IdentHash & remoteIdent) : m_LocalDestination(localDestination), m_RemoteIdent(remoteIdent), - m_SendQueueTimer(localDestination->GetService()) + m_SendQueueTimer(localDestination->GetService()), + m_RequestingLS(false) { m_LastUse = i2p::util::GetMillisecondsSinceEpoch (); ScheduleFlushSendQueue(); @@ -221,7 +222,10 @@ namespace datagram } if(!m_RemoteLeaseSet) { // no remote lease set - m_LocalDestination->RequestDestination(m_RemoteIdent, std::bind(&DatagramSession::HandleLeaseSetUpdated, this, std::placeholders::_1)); + if(!m_RequestingLS) { + m_RequestingLS = true; + m_LocalDestination->RequestDestination(m_RemoteIdent, std::bind(&DatagramSession::HandleLeaseSetUpdated, this, std::placeholders::_1)); + } return nullptr; } m_RoutingSession = m_LocalDestination->GetRoutingSession(m_RemoteLeaseSet, true); @@ -290,6 +294,7 @@ namespace datagram void DatagramSession::HandleLeaseSetUpdated(std::shared_ptr ls) { + m_RequestingLS = false; if(!ls) return; // only update lease set if found and newer than previous lease set uint64_t oldExpire = 0; diff --git a/Datagram.h b/Datagram.h index 8891f0cc..2eb180d6 100644 --- a/Datagram.h +++ b/Datagram.h @@ -88,7 +88,7 @@ namespace datagram boost::asio::deadline_timer m_SendQueueTimer; std::vector > m_SendQueue; uint64_t m_LastUse; - + bool m_RequestingLS; }; const size_t MAX_DATAGRAM_SIZE = 32768; From b1b5904852b3141d2a4314244efb733e386bc64e Mon Sep 17 00:00:00 2001 From: orignal Date: Tue, 27 Dec 2016 22:45:51 -0500 Subject: [PATCH 15/15] show SOCKS proxy as client tunnel --- ClientContext.h | 1 + HTTPServer.cpp | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/ClientContext.h b/ClientContext.h index db74a19e..e392c8a2 100644 --- a/ClientContext.h +++ b/ClientContext.h @@ -113,6 +113,7 @@ namespace client const decltype(m_ClientForwards)& GetClientForwards () const { return m_ClientForwards; } const decltype(m_ServerForwards)& GetServerForwards () const { return m_ServerForwards; } const i2p::proxy::HTTPProxy * GetHttpProxy () const { return m_HttpProxy; } + const i2p::proxy::SOCKSProxy * GetSocksProxy () const { return m_SocksProxy; } }; extern ClientContext context; diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 48ec9bd2..1ba22303 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -604,6 +604,15 @@ namespace http { s << i2p::client::context.GetAddressBook ().ToAddress(ident); s << "
\r\n"<< std::endl; } + auto socksProxy = i2p::client::context.GetSocksProxy (); + if (socksProxy) + { + auto& ident = socksProxy->GetLocalDestination ()->GetIdentHash(); + s << ""; + s << "SOCKS Proxy" << " ⇐ "; + s << i2p::client::context.GetAddressBook ().ToAddress(ident); + s << "
\r\n"<< std::endl; + } s << "
\r\nServer Tunnels:
\r\n
\r\n"; for (auto& it: i2p::client::context.GetServerTunnels ()) {