diff --git a/Destination.cpp b/Destination.cpp index 80951dc4..61960464 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -87,6 +87,8 @@ namespace client Stop (); if (m_Pool) i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); + for (auto& it: m_LeaseSetRequests) + it.second->Complete (nullptr); } void LeaseSetDestination::Run () @@ -128,14 +130,7 @@ namespace client { m_CleanupTimer.cancel (); m_PublishConfirmationTimer.cancel (); - m_PublishVerificationTimer.cancel (); - - for (auto& it: m_LeaseSetRequests) - { - it.second->Complete (nullptr); - it.second->requestTimeoutTimer.cancel (); - } - m_LeaseSetRequests.clear (); + m_PublishVerificationTimer.cancel (); m_IsRunning = false; if (m_Pool) @@ -532,7 +527,8 @@ namespace client if (floodfill) { auto request = std::make_shared (m_Service); - request->requestComplete.push_back (requestComplete); + if (requestComplete) + request->requestComplete.push_back (requestComplete); auto ts = i2p::util::GetSecondsSinceEpoch (); auto ret = m_LeaseSetRequests.insert (std::pair >(dest,request)); if (ret.second) // inserted @@ -542,7 +538,7 @@ namespace client { // request failed m_LeaseSetRequests.erase (ret.first); - requestComplete (nullptr); + if (requestComplete) requestComplete (nullptr); } } else // duplicate @@ -552,13 +548,13 @@ namespace client //ret.first->second->requestComplete.push_back (requestComplete); if (ts > ret.first->second->requestTime + MAX_LEASESET_REQUEST_TIMEOUT) m_LeaseSetRequests.erase (ret.first); - requestComplete (nullptr); + if (requestComplete) requestComplete (nullptr); } } else { LogPrint (eLogError, "Destination: Can't request LeaseSet, no floodfills found"); - requestComplete (nullptr); + if (requestComplete) requestComplete (nullptr); } } @@ -706,12 +702,12 @@ namespace client { m_ReadyChecker.cancel(); m_StreamingDestination->Stop (); - m_StreamingDestination->SetOwner (nullptr); + //m_StreamingDestination->SetOwner (nullptr); m_StreamingDestination = nullptr; for (auto& it: m_StreamingDestinationsByPorts) { it.second->Stop (); - it.second->SetOwner (nullptr); + //it.second->SetOwner (nullptr); } m_StreamingDestinationsByPorts.clear (); if (m_DatagramDestination) diff --git a/Makefile.linux b/Makefile.linux index 28081c81..8def8299 100644 --- a/Makefile.linux +++ b/Makefile.linux @@ -45,10 +45,14 @@ else LDLIBS = -lcrypto -lssl -lz -lboost_system -lboost_date_time -lboost_filesystem -lboost_program_options -lpthread endif -# UPNP Support (miniupnpc 1.5 or 1.6) +# UPNP Support (miniupnpc 1.5 and higher) ifeq ($(USE_UPNP),yes) - LDFLAGS += -lminiupnpc CXXFLAGS += -DUSE_UPNP +ifeq ($(USE_STATIC),yes) + LDLIBS += $(LIBDIR)/libminiupnpc.a +else + LDLIBS += -lminiupnpc +endif endif IS_64 := $(shell $(CXX) -dumpmachine 2>&1 | $(GREP) -c "64") diff --git a/NTCPSession.cpp b/NTCPSession.cpp index b65532be..81cfe687 100644 --- a/NTCPSession.cpp +++ b/NTCPSession.cpp @@ -537,11 +537,11 @@ namespace transport { boost::system::error_code ec; size_t moreBytes = m_Socket.available(ec); - if (moreBytes) + if (moreBytes && !ec) { if (moreBytes > NTCP_BUFFER_SIZE - m_ReceiveBufferOffset) moreBytes = NTCP_BUFFER_SIZE - m_ReceiveBufferOffset; - moreBytes = m_Socket.read_some (boost::asio::buffer (m_ReceiveBuffer + m_ReceiveBufferOffset, moreBytes)); + moreBytes = m_Socket.read_some (boost::asio::buffer (m_ReceiveBuffer + m_ReceiveBufferOffset, moreBytes), ec); if (ec) { LogPrint (eLogInfo, "NTCP: Read more bytes error: ", ec.message ()); @@ -589,7 +589,8 @@ namespace transport else { // timestamp - LogPrint (eLogDebug, "NTCP: Timestamp"); + int diff = (int)bufbe32toh (buf + 2) - (int)i2p::util::GetSecondsSinceEpoch (); + LogPrint (eLogInfo, "NTCP: Timestamp. Time difference ", diff, " seconds"); return true; } } @@ -650,7 +651,7 @@ namespace transport sendBuffer = m_TimeSyncBuffer; len = 4; htobuf16(sendBuffer, 0); - htobe32buf (sendBuffer + 2, time (0)); + htobe32buf (sendBuffer + 2, i2p::util::GetSecondsSinceEpoch ()); } int rem = (len + 6) & 0x0F; // %16 int padding = 0; @@ -803,6 +804,12 @@ namespace transport void NTCPServer::Stop () { + { + // we have to copy it because Terminate changes m_NTCPSessions + auto ntcpSessions = m_NTCPSessions; + for (auto& it: ntcpSessions) + it.second->Terminate (); + } m_NTCPSessions.clear (); if (m_IsRunning) @@ -951,18 +958,31 @@ namespace transport { LogPrint (eLogDebug, "NTCP: Connecting to ", address ,":", port); m_Service.post([=]() - { + { if (this->AddNTCPSession (conn)) + { + auto timer = std::make_shared(m_Service); + timer->expires_from_now (boost::posix_time::seconds(NTCP_CONNECT_TIMEOUT)); + timer->async_wait ([conn](const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + LogPrint (eLogInfo, "NTCP: Not connected in ", NTCP_CONNECT_TIMEOUT, " seconds"); + conn->Terminate (); + } + }); conn->GetSocket ().async_connect (boost::asio::ip::tcp::endpoint (address, port), - std::bind (&NTCPServer::HandleConnect, this, std::placeholders::_1, conn)); + std::bind (&NTCPServer::HandleConnect, this, std::placeholders::_1, conn, timer)); + } }); } - void NTCPServer::HandleConnect (const boost::system::error_code& ecode, std::shared_ptr conn) + void NTCPServer::HandleConnect (const boost::system::error_code& ecode, std::shared_ptr conn, std::shared_ptr timer) { + timer->cancel (); if (ecode) { - LogPrint (eLogError, "NTCP: Connect error ", ecode.message ()); + LogPrint (eLogInfo, "NTCP: Connect error ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) i2p::data::netdb.SetUnreachable (conn->GetRemoteIdentity ()->GetIdentHash (), true); conn->Terminate (); diff --git a/NTCPSession.h b/NTCPSession.h index a3d95c62..639942f2 100644 --- a/NTCPSession.h +++ b/NTCPSession.h @@ -36,6 +36,7 @@ namespace transport const size_t NTCP_MAX_MESSAGE_SIZE = 16384; const size_t NTCP_BUFFER_SIZE = 4160; // fits 4 tunnel messages (4*1028) + const int NTCP_CONNECT_TIMEOUT = 5; // 5 seconds const int NTCP_TERMINATION_TIMEOUT = 120; // 2 minutes const int NTCP_TERMINATION_CHECK_TIMEOUT = 30; // 30 seconds const size_t NTCP_DEFAULT_PHASE3_SIZE = 2/*size*/ + i2p::data::DEFAULT_IDENTITY_SIZE/*387*/ + 4/*ts*/ + 15/*padding*/ + 40/*signature*/; // 448 @@ -153,7 +154,7 @@ namespace transport void HandleAccept (std::shared_ptr conn, const boost::system::error_code& error); void HandleAcceptV6 (std::shared_ptr conn, const boost::system::error_code& error); - void HandleConnect (const boost::system::error_code& ecode, std::shared_ptr conn); + void HandleConnect (const boost::system::error_code& ecode, std::shared_ptr conn, std::shared_ptr timer); // timer void ScheduleTermination (); diff --git a/NetDb.cpp b/NetDb.cpp index 5a622441..e75cf217 100644 --- a/NetDb.cpp +++ b/NetDb.cpp @@ -1008,12 +1008,12 @@ namespace data }); } - std::shared_ptr NetDb::GetRandomPeerTestRouter () const + std::shared_ptr NetDb::GetRandomPeerTestRouter (bool v4only) const { return GetRandomRouter ( - [](std::shared_ptr router)->bool + [v4only](std::shared_ptr router)->bool { - return !router->IsHidden () && router->IsPeerTesting (); + return !router->IsHidden () && router->IsPeerTesting () && router->IsSSU (v4only); }); } diff --git a/NetDb.h b/NetDb.h index ba65b1e4..954cc74e 100644 --- a/NetDb.h +++ b/NetDb.h @@ -69,7 +69,7 @@ namespace data std::shared_ptr GetRandomRouter () const; std::shared_ptr GetRandomRouter (std::shared_ptr compatibleWith) const; std::shared_ptr GetHighBandwidthRandomRouter (std::shared_ptr compatibleWith) const; - std::shared_ptr GetRandomPeerTestRouter () const; + std::shared_ptr GetRandomPeerTestRouter (bool v4only = true) const; std::shared_ptr GetRandomIntroducer () const; std::shared_ptr GetClosestFloodfill (const IdentHash& destination, const std::set& excluded, bool closeThanUsOnly = false) const; std::vector GetClosestFloodfills (const IdentHash& destination, size_t num, diff --git a/SSU.cpp b/SSU.cpp index 72091e00..198fed4b 100644 --- a/SSU.cpp +++ b/SSU.cpp @@ -20,11 +20,7 @@ namespace transport m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service), m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_ServiceV6) { - m_SocketV6.open (boost::asio::ip::udp::v6()); - m_SocketV6.set_option (boost::asio::ip::v6_only (true)); - m_SocketV6.set_option (boost::asio::socket_base::receive_buffer_size (65535)); - m_SocketV6.set_option (boost::asio::socket_base::send_buffer_size (65535)); - m_SocketV6.bind (m_EndpointV6); + OpenSocketV6 (); } SSUServer::SSUServer (int port): @@ -32,27 +28,36 @@ namespace transport m_Thread (nullptr), m_ThreadV6 (nullptr), m_ReceiversThread (nullptr), m_Work (m_Service), m_WorkV6 (m_ServiceV6), m_ReceiversWork (m_ReceiversService), m_Endpoint (boost::asio::ip::udp::v4 (), port), m_EndpointV6 (boost::asio::ip::udp::v6 (), port), - m_Socket (m_ReceiversService, m_Endpoint), m_SocketV6 (m_ReceiversService), + m_Socket (m_ReceiversService), m_SocketV6 (m_ReceiversService), m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service), m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_ServiceV6) { - - m_Socket.set_option (boost::asio::socket_base::receive_buffer_size (65535)); - m_Socket.set_option (boost::asio::socket_base::send_buffer_size (65535)); + OpenSocket (); if (context.SupportsV6 ()) - { - m_SocketV6.open (boost::asio::ip::udp::v6()); - m_SocketV6.set_option (boost::asio::ip::v6_only (true)); - m_SocketV6.set_option (boost::asio::socket_base::receive_buffer_size (65535)); - m_SocketV6.set_option (boost::asio::socket_base::send_buffer_size (65535)); - m_SocketV6.bind (m_EndpointV6); - } + OpenSocketV6 (); } SSUServer::~SSUServer () { } + void SSUServer::OpenSocket () + { + m_Socket.open (boost::asio::ip::udp::v4()); + m_Socket.set_option (boost::asio::socket_base::receive_buffer_size (65535)); + m_Socket.set_option (boost::asio::socket_base::send_buffer_size (65535)); + m_Socket.bind (m_Endpoint); + } + + void SSUServer::OpenSocketV6 () + { + m_SocketV6.open (boost::asio::ip::udp::v6()); + m_SocketV6.set_option (boost::asio::ip::v6_only (true)); + m_SocketV6.set_option (boost::asio::socket_base::receive_buffer_size (65535)); + m_SocketV6.set_option (boost::asio::socket_base::send_buffer_size (65535)); + m_SocketV6.bind (m_EndpointV6); + } + void SSUServer::Start () { m_IsRunning = true; @@ -194,21 +199,40 @@ namespace transport boost::system::error_code ec; size_t moreBytes = m_Socket.available(ec); - while (moreBytes && packets.size () < 25) - { - packet = new SSUPacket (); - packet->len = m_Socket.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V4), packet->from); - packets.push_back (packet); - moreBytes = m_Socket.available(); - } + if (!ec) + { + while (moreBytes && packets.size () < 25) + { + packet = new SSUPacket (); + packet->len = m_Socket.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V4), packet->from, 0, ec); + if (!ec) + { + packets.push_back (packet); + moreBytes = m_Socket.available(ec); + if (ec) break; + } + else + { + LogPrint (eLogError, "SSU: receive_from error: ", ec.message ()); + delete packet; + break; + } + } + } m_Service.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets, &m_Sessions)); Receive (); } else { - LogPrint (eLogError, "SSU: receive error: ", ecode.message ()); delete packet; + if (ecode != boost::asio::error::operation_aborted) + { + LogPrint (eLogError, "SSU: receive error: ", ecode.message ()); + m_Socket.close (); + OpenSocket (); + Receive (); + } } } @@ -220,22 +244,42 @@ namespace transport std::vector packets; packets.push_back (packet); - size_t moreBytes = m_SocketV6.available (); - while (moreBytes && packets.size () < 25) + boost::system::error_code ec; + size_t moreBytes = m_SocketV6.available (ec); + if (!ec) { - packet = new SSUPacket (); - packet->len = m_SocketV6.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V6), packet->from); - packets.push_back (packet); - moreBytes = m_SocketV6.available(); + while (moreBytes && packets.size () < 25) + { + packet = new SSUPacket (); + packet->len = m_SocketV6.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V6), packet->from, 0, ec); + if (!ec) + { + packets.push_back (packet); + moreBytes = m_SocketV6.available(ec); + if (ec) break; + } + else + { + LogPrint (eLogError, "SSU: v6 receive_from error: ", ec.message ()); + delete packet; + break; + } + } } - + m_ServiceV6.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets, &m_SessionsV6)); ReceiveV6 (); } else { - LogPrint (eLogError, "SSU: v6 receive error: ", ecode.message ()); delete packet; + if (ecode != boost::asio::error::operation_aborted) + { + LogPrint (eLogError, "SSU: v6 receive error: ", ecode.message ()); + m_SocketV6.close (); + OpenSocketV6 (); + ReceiveV6 (); + } } } @@ -298,9 +342,9 @@ namespace transport return nullptr; } - void SSUServer::CreateSession (std::shared_ptr router, bool peerTest) + void SSUServer::CreateSession (std::shared_ptr router, bool peerTest, bool v4only) { - auto address = router->GetSSUAddress (!context.SupportsV6 ()); + auto address = router->GetSSUAddress (v4only || !context.SupportsV6 ()); if (address) CreateSession (router, address->host, address->port, peerTest); else diff --git a/SSU.h b/SSU.h index 182fa0eb..9d8e2878 100644 --- a/SSU.h +++ b/SSU.h @@ -42,7 +42,7 @@ namespace transport ~SSUServer (); void Start (); void Stop (); - void CreateSession (std::shared_ptr router, bool peerTest = false); + void CreateSession (std::shared_ptr router, bool peerTest = false, bool v4only = false); void CreateSession (std::shared_ptr router, const boost::asio::ip::address& addr, int port, bool peerTest = false); void CreateDirectSession (std::shared_ptr router, boost::asio::ip::udp::endpoint remoteEndpoint, bool peerTest); @@ -68,6 +68,8 @@ namespace transport private: + void OpenSocket (); + void OpenSocketV6 (); void Run (); void RunV6 (); void RunReceivers (); diff --git a/Transports.cpp b/Transports.cpp index d06d8670..1b9d52a1 100644 --- a/Transports.cpp +++ b/Transports.cpp @@ -519,7 +519,7 @@ namespace transport void Transports::DetectExternalIP () { if (RoutesRestricted()) - { + { LogPrint(eLogInfo, "Transports: restricted routes enabled, not detecting ip"); i2p::context.SetStatus (eRouterStatusOK); return; @@ -527,13 +527,14 @@ namespace transport if (m_SSUServer) { bool nat; i2p::config::GetOption("nat", nat); - if (nat) + bool isv4 = i2p::context.SupportsV4 (); + if (nat && isv4) i2p::context.SetStatus (eRouterStatusTesting); for (int i = 0; i < 5; i++) { - auto router = i2p::data::netdb.GetRandomPeerTestRouter (); - if (router && router->IsSSU (!context.SupportsV6 ())) - m_SSUServer->CreateSession (router, true); // peer test + auto router = i2p::data::netdb.GetRandomPeerTestRouter (isv4); // v4 only if v4 + if (router) + m_SSUServer->CreateSession (router, true, isv4); // peer test else { // if not peer test capable routers found pick any @@ -549,23 +550,25 @@ namespace transport void Transports::PeerTest () { - if (RoutesRestricted()) return; + if (RoutesRestricted() || !i2p::context.SupportsV4 ()) return; if (m_SSUServer) - { + { bool statusChanged = false; for (int i = 0; i < 5; i++) { - auto router = i2p::data::netdb.GetRandomPeerTestRouter (); - if (router && router->IsSSU (!context.SupportsV6 ())) + auto router = i2p::data::netdb.GetRandomPeerTestRouter (true); // v4 only + if (router) { if (!statusChanged) { statusChanged = true; i2p::context.SetStatus (eRouterStatusTesting); // first time only } - m_SSUServer->CreateSession (router, true); // peer test + m_SSUServer->CreateSession (router, true, true); // peer test v4 } - } + } + if (!statusChanged) + LogPrint (eLogWarning, "Can't find routers for peer test"); } }