diff --git a/Config.cpp b/Config.cpp index 64333d74..7e396b88 100644 --- a/Config.cpp +++ b/Config.cpp @@ -178,7 +178,8 @@ namespace config { "https://download.xxlspeed.com/," "https://reseed-ru.lngserv.ru/," "https://reseed.atomike.ninja/," - "https://reseed.memcpy.io/" + "https://reseed.memcpy.io/", + "https://reseed.onion.im/" ), "Reseed URLs, separated by comma") ; diff --git a/Datagram.cpp b/Datagram.cpp index 705e10ef..c501edf2 100644 --- a/Datagram.cpp +++ b/Datagram.cpp @@ -99,6 +99,8 @@ namespace datagram size_t uncompressedLen = m_Inflator.Inflate (buf, len, uncompressed, MAX_DATAGRAM_SIZE); if (uncompressedLen) HandleDatagram (fromPort, toPort, uncompressed, uncompressedLen); + else + LogPrint (eLogWarning, "Datagram: decompression failed"); } std::shared_ptr<I2NPMessage> DatagramDestination::CreateDataMessage (const uint8_t * payload, size_t len, uint16_t fromPort, uint16_t toPort) @@ -134,6 +136,7 @@ namespace datagram if (now - it->second->LastActivity() >= DATAGRAM_SESSION_MAX_IDLE) { LogPrint(eLogInfo, "DatagramDestination: expiring idle session with ", it->first.ToBase32()); + it->second->Stop (); it = m_Sessions.erase (it); // we are expired } else @@ -149,6 +152,7 @@ namespace datagram if (itr == m_Sessions.end()) { // not found, create new session session = std::make_shared<DatagramSession>(m_Owner, identity); + session->Start (); m_Sessions[identity] = session; } else { session = itr->second; @@ -172,11 +176,20 @@ namespace datagram m_RemoteIdent(remoteIdent), m_SendQueueTimer(localDestination->GetService()), m_RequestingLS(false) + { + } + + void DatagramSession::Start () { m_LastUse = i2p::util::GetMillisecondsSinceEpoch (); ScheduleFlushSendQueue(); } + void DatagramSession::Stop () + { + m_SendQueueTimer.cancel (); + } + void DatagramSession::SendMsg(std::shared_ptr<I2NPMessage> msg) { // we used this session diff --git a/Datagram.h b/Datagram.h index a10f2646..b317d101 100644 --- a/Datagram.h +++ b/Datagram.h @@ -37,8 +37,10 @@ namespace datagram class DatagramSession : public std::enable_shared_from_this<DatagramSession> { public: - DatagramSession(i2p::client::ClientDestination * localDestination, - const i2p::data::IdentHash & remoteIdent); + DatagramSession(i2p::client::ClientDestination * localDestination, const i2p::data::IdentHash & remoteIdent); + + void Start (); + void Stop (); /** @brief ack the garlic routing path */ diff --git a/SSU.cpp b/SSU.cpp index 9dc05608..93eedb23 100644 --- a/SSU.cpp +++ b/SSU.cpp @@ -13,12 +13,13 @@ namespace transport SSUServer::SSUServer (const boost::asio::ip::address & addr, int port): m_OnlyV6(true), m_IsRunning(false), - m_Thread (nullptr), m_ThreadV6 (nullptr), m_ReceiversThread (nullptr), - m_Work (m_Service), m_WorkV6 (m_ServiceV6), m_ReceiversWork (m_ReceiversService), - m_EndpointV6 (addr, port), - m_Socket (m_ReceiversService, m_Endpoint), m_SocketV6 (m_ReceiversService), - m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service), - m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_ServiceV6) + m_Thread (nullptr), m_ThreadV6 (nullptr), m_ReceiversThread (nullptr), + m_ReceiversThreadV6 (nullptr), m_Work (m_Service), m_WorkV6 (m_ServiceV6), + m_ReceiversWork (m_ReceiversService), m_ReceiversWorkV6 (m_ReceiversServiceV6), + m_EndpointV6 (addr, port), m_Socket (m_ReceiversService, m_Endpoint), + m_SocketV6 (m_ReceiversServiceV6), m_IntroducersUpdateTimer (m_Service), + m_PeerTestsCleanupTimer (m_Service), m_TerminationTimer (m_Service), + m_TerminationTimerV6 (m_ServiceV6) { OpenSocketV6 (); } @@ -26,9 +27,10 @@ namespace transport SSUServer::SSUServer (int port): m_OnlyV6(false), m_IsRunning(false), m_Thread (nullptr), m_ThreadV6 (nullptr), m_ReceiversThread (nullptr), - m_Work (m_Service), m_WorkV6 (m_ServiceV6), m_ReceiversWork (m_ReceiversService), + m_ReceiversThreadV6 (nullptr), m_Work (m_Service), m_WorkV6 (m_ServiceV6), + m_ReceiversWork (m_ReceiversService), m_ReceiversWorkV6 (m_ReceiversServiceV6), m_Endpoint (boost::asio::ip::udp::v4 (), port), m_EndpointV6 (boost::asio::ip::udp::v6 (), port), - m_Socket (m_ReceiversService), m_SocketV6 (m_ReceiversService), + m_Socket (m_ReceiversService), m_SocketV6 (m_ReceiversServiceV6), m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service), m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_ServiceV6) { @@ -44,8 +46,8 @@ namespace transport void SSUServer::OpenSocket () { m_Socket.open (boost::asio::ip::udp::v4()); - m_Socket.set_option (boost::asio::socket_base::receive_buffer_size (65535)); - m_Socket.set_option (boost::asio::socket_base::send_buffer_size (65535)); + m_Socket.set_option (boost::asio::socket_base::receive_buffer_size (SSU_SOCKET_RECEIVE_BUFFER_SIZE)); + m_Socket.set_option (boost::asio::socket_base::send_buffer_size (SSU_SOCKET_SEND_BUFFER_SIZE)); m_Socket.bind (m_Endpoint); } @@ -53,25 +55,26 @@ namespace transport { m_SocketV6.open (boost::asio::ip::udp::v6()); m_SocketV6.set_option (boost::asio::ip::v6_only (true)); - m_SocketV6.set_option (boost::asio::socket_base::receive_buffer_size (65535)); - m_SocketV6.set_option (boost::asio::socket_base::send_buffer_size (65535)); + m_SocketV6.set_option (boost::asio::socket_base::receive_buffer_size (SSU_SOCKET_RECEIVE_BUFFER_SIZE)); + m_SocketV6.set_option (boost::asio::socket_base::send_buffer_size (SSU_SOCKET_SEND_BUFFER_SIZE)); m_SocketV6.bind (m_EndpointV6); } void SSUServer::Start () { m_IsRunning = true; - m_ReceiversThread = new std::thread (std::bind (&SSUServer::RunReceivers, this)); if (!m_OnlyV6) { + m_ReceiversThread = new std::thread (std::bind (&SSUServer::RunReceivers, this)); m_Thread = new std::thread (std::bind (&SSUServer::Run, this)); m_ReceiversService.post (std::bind (&SSUServer::Receive, this)); ScheduleTermination (); } if (context.SupportsV6 ()) { + m_ReceiversThreadV6 = new std::thread (std::bind (&SSUServer::RunReceiversV6, this)); m_ThreadV6 = new std::thread (std::bind (&SSUServer::RunV6, this)); - m_ReceiversService.post (std::bind (&SSUServer::ReceiveV6, this)); + m_ReceiversServiceV6.post (std::bind (&SSUServer::ReceiveV6, this)); ScheduleTerminationV6 (); } SchedulePeerTestsCleanupTimer (); @@ -89,6 +92,7 @@ namespace transport m_ServiceV6.stop (); m_SocketV6.close (); m_ReceiversService.stop (); + m_ReceiversServiceV6.stop (); if (m_ReceiversThread) { m_ReceiversThread->join (); @@ -101,6 +105,12 @@ namespace transport delete m_Thread; m_Thread = nullptr; } + if (m_ReceiversThreadV6) + { + m_ReceiversThreadV6->join (); + delete m_ReceiversThreadV6; + m_ReceiversThreadV6 = nullptr; + } if (m_ThreadV6) { m_ThreadV6->join (); @@ -154,6 +164,21 @@ namespace transport } } + void SSUServer::RunReceiversV6 () + { + while (m_IsRunning) + { + try + { + m_ReceiversServiceV6.run (); + } + catch (std::exception& ex) + { + LogPrint (eLogError, "SSU: v6 receivers runtime exception: ", ex.what ()); + } + } + } + void SSUServer::AddRelay (uint32_t tag, std::shared_ptr<SSUSession> relay) { m_Relays[tag] = relay; diff --git a/SSU.h b/SSU.h index adbed739..c9881fd0 100644 --- a/SSU.h +++ b/SSU.h @@ -25,6 +25,8 @@ namespace transport const int SSU_TO_INTRODUCER_SESSION_DURATION = 3600; // 1 hour const int SSU_TERMINATION_CHECK_TIMEOUT = 30; // 30 seconds const size_t SSU_MAX_NUM_INTRODUCERS = 3; + const size_t SSU_SOCKET_RECEIVE_BUFFER_SIZE = 0x1FFFF; // 128K + const size_t SSU_SOCKET_SEND_BUFFER_SIZE = 0x1FFFF; // 128K struct SSUPacket { @@ -74,6 +76,7 @@ namespace transport void Run (); void RunV6 (); void RunReceivers (); + void RunReceiversV6 (); void Receive (); void ReceiveV6 (); void HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet); @@ -111,9 +114,9 @@ namespace transport bool m_OnlyV6; bool m_IsRunning; - std::thread * m_Thread, * m_ThreadV6, * m_ReceiversThread; - boost::asio::io_service m_Service, m_ServiceV6, m_ReceiversService; - boost::asio::io_service::work m_Work, m_WorkV6, m_ReceiversWork; + std::thread * m_Thread, * m_ThreadV6, * m_ReceiversThread, * m_ReceiversThreadV6; + boost::asio::io_service m_Service, m_ServiceV6, m_ReceiversService, m_ReceiversServiceV6; + boost::asio::io_service::work m_Work, m_WorkV6, m_ReceiversWork, m_ReceiversWorkV6; boost::asio::ip::udp::endpoint m_Endpoint, m_EndpointV6; boost::asio::ip::udp::socket m_Socket, m_SocketV6; boost::asio::deadline_timer m_IntroducersUpdateTimer, m_PeerTestsCleanupTimer, @@ -122,7 +125,7 @@ namespace transport std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSUSession> > m_Sessions, m_SessionsV6; std::map<uint32_t, std::shared_ptr<SSUSession> > m_Relays; // we are introducer std::map<uint32_t, PeerTest> m_PeerTests; // nonce -> creation time in milliseconds - + public: // for HTTP only const decltype(m_Sessions)& GetSessions () const { return m_Sessions; }; diff --git a/SSUData.cpp b/SSUData.cpp index c700d8e6..5ca4dac3 100644 --- a/SSUData.cpp +++ b/SSUData.cpp @@ -52,7 +52,7 @@ namespace transport void SSUData::AdjustPacketSize (std::shared_ptr<const i2p::data::RouterInfo> remoteRouter) { - if (remoteRouter) return; + if (!remoteRouter) return; auto ssuAddress = remoteRouter->GetSSUAddress (); if (ssuAddress && ssuAddress->ssu->mtu) { diff --git a/contrib/certificates/reseed/lazygravy_at_mail.i2p.crt b/contrib/certificates/reseed/lazygravy_at_mail.i2p.crt new file mode 100644 index 00000000..1ff5346c --- /dev/null +++ b/contrib/certificates/reseed/lazygravy_at_mail.i2p.crt @@ -0,0 +1,34 @@ +-----BEGIN CERTIFICATE----- +MIIFzTCCA7WgAwIBAgIQCnVoosrOolXsY+bR5kByeTANBgkqhkiG9w0BAQsFADBy +MQswCQYDVQQGEwJYWDELMAkGA1UEBxMCWFgxCzAJBgNVBAkTAlhYMR4wHAYDVQQK +ExVJMlAgQW5vbnltb3VzIE5ldHdvcmsxDDAKBgNVBAsTA0kyUDEbMBkGA1UEAwwS +bGF6eWdyYXZ5QG1haWwuaTJwMB4XDTE2MTIyNzE1NDEzNloXDTI2MTIyNzE1NDEz +NlowcjELMAkGA1UEBhMCWFgxCzAJBgNVBAcTAlhYMQswCQYDVQQJEwJYWDEeMBwG +A1UEChMVSTJQIEFub255bW91cyBOZXR3b3JrMQwwCgYDVQQLEwNJMlAxGzAZBgNV +BAMMEmxhenlncmF2eUBtYWlsLmkycDCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCC +AgoCggIBAN3q+0nUzz9+CBSoXUNf8K6kIc9zF+OP1NVBmOu3zTtkcEnhTtoDNXeU +EV8DhlBhEACbPomA+szQ5zp3O3OYQc2NV50S7KKqlfn5LBBE3BL2grTeBxUMysDd +0TlpxcHKwaog4TZtkHxeNO94F1vgeOkOnlpCQ6H3cMkPEGG3zu1A1ccgPiYO838/ +HNMkSF//VZJLOfPe1vmn9xTB7wZ0DLpEh12QZGg3irA+QDX5zy6Ffl+/Lp+L4tXT +uPZUaC6CL6EABX4DvQcFrOtiWfkbi/ROgYCeTrYw1XbDHfPc+MBxGo1bX7JjnD0o +mFFvo+PjxvWDmCad2TaITh6DwGEeWKu8NtJAyaO5p1ntauuWGB5Xzua4aMmIy7GT +esHQkhW+5IooM0R5bZI8/KXo4Bj52bX5qv+oBiExc6PUUTLWyjoWHb7fKdddwGfc +lUfniV/fw7/9ysIkQZcXLDCXR6O/nH9aGDZ7bxHedw4/LxAXYPfNojb5j7ZVa65o +PWD5xuQfbE+95DdbnKjcjYiam4kjApe7YPwOhtoRJYSGAkrpIMfzFxCXgjTsi3Kw +Ov+sYmBvWBK4ROWQZTgHei3x4FpAGWHCAeTeeQGKmWQ8tT7ZklWD9fBm3J/KXo7I +WCxRW9oedItyqbRuAGxqaoaGSk6TtPVjyPIUExDp1dr4p1nM1TOLAgMBAAGjXzBd +MA4GA1UdDwEB/wQEAwIChDAdBgNVHSUEFjAUBggrBgEFBQcDAgYIKwYBBQUHAwEw +DwYDVR0TAQH/BAUwAwEB/zAbBgNVHQ4EFAQSbGF6eWdyYXZ5QG1haWwuaTJwMA0G +CSqGSIb3DQEBCwUAA4ICAQA2fei/JajeQ7Rn0Hu3IhgF9FDXyxDfcS9Kp+gHE56A +50VOtOcvAQabi/+lt5DqkiBwanj0Ti/ydFRyEmPo45+fUfFuCgXcofro8PGGqFEz +rZGtknH/0hiGfhLR9yQXY8xFS4yvLZvuIcTHa9QPJg3tB9KeYQzF91NQVb5XAyE7 +O3RvollADTV31Xbhxjb7lgra6ff9dZQJE6xtlSk/mnhILjlW80+iPKuj3exBgbJv +ktiR4ZT4xjh1ZgNJX5br86MZrhyyyGWwHWHS0e443eSrrmAPD69zxsfvhoikRX1z +tDz0zB70DwS4pSbVrFuWaIAcbg36vWO8tYPBzV8iBB/tBTURGJjv6Q0EoI5GHmJi +LOhU3B6xublv8Tcoc3tgMqI9STnWROtTiCS6LsWNSXhVpIZqvaiOEtPN4HyL33sf +j5rfPq76gKrTloeLnwLGq0Rs94ScffYkBap3fQ/ALb87LQcwSN4EkObur5pcd7TS +qNdanvCGK8v1UYVzH4l9jekPGsM5euohwAkIl1kZ6+tqGY/MTa7HwTTQyLDTco1t +sPy6neN46+H5DYHADyU5H2G39Kk3WcLmPtfxlPDM6e73+47fJkXnmiaWM0Lrt80y +Enng6bFGMZH01ZsqBk09H+Uswv8h7k69q9uWAS95KE0omCMVtIpoPZXTnRhe6mBC ++g== +-----END CERTIFICATE----- diff --git a/util.h b/util.h index 2ee1c286..38ca6b28 100644 --- a/util.h +++ b/util.h @@ -72,6 +72,12 @@ namespace util std::bind (&MemoryPool<T>::Release, this, std::placeholders::_1)); } + template<typename... TArgs> + std::shared_ptr<T> AcquireShared (TArgs&&... args) + { + return std::shared_ptr<T>(Acquire (args...), std::bind (&MemoryPool<T>::Release, this, std::placeholders::_1)); + } + protected: T * m_Head;