diff --git a/AddressBook.cpp b/AddressBook.cpp index b3f0f1ce..b993f456 100644 --- a/AddressBook.cpp +++ b/AddressBook.cpp @@ -546,8 +546,8 @@ namespace client auto datagram = dest->GetDatagramDestination (); if (!datagram) datagram = dest->CreateDatagramDestination (); - datagram->SetReceiver (std::bind (&AddressBook::HandleLookupResponse, this, - std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5), + datagram->SetReceiver (std::bind (&AddressBook::HandleLookupResponse, this, + std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5), ADDRESS_RESPONSE_DATAGRAM_PORT); } } @@ -558,8 +558,7 @@ namespace client if (dest) { auto datagram = dest->GetDatagramDestination (); - if (datagram) - datagram->ResetReceiver (ADDRESS_RESPONSE_DATAGRAM_PORT); + if (datagram) datagram->ResetReceiver (ADDRESS_RESPONSE_DATAGRAM_PORT); } } diff --git a/ClientContext.cpp b/ClientContext.cpp index aa18ecdf..3eee42b2 100644 --- a/ClientContext.cpp +++ b/ClientContext.cpp @@ -17,7 +17,8 @@ namespace client ClientContext::ClientContext (): m_SharedLocalDestination (nullptr), m_HttpProxy (nullptr), m_SocksProxy (nullptr), m_SamBridge (nullptr), - m_BOBCommandChannel (nullptr), m_I2CPServer (nullptr) + m_BOBCommandChannel (nullptr), m_I2CPServer (nullptr), + m_CleanupUDPTimer(m_Service, boost::posix_time::seconds(1)) { } @@ -39,8 +40,9 @@ namespace client m_SharedLocalDestination->Start (); } + m_AddressBook.Start (); - + std::shared_ptr localDestination; bool httproxy; i2p::config::GetOption("httpproxy.enabled", httproxy); if (httproxy) { @@ -51,8 +53,10 @@ namespace client if (httpProxyKeys.length () > 0) { i2p::data::PrivateKeys keys; - LoadPrivateKeys (keys, httpProxyKeys); - localDestination = CreateNewLocalDestination (keys, false); + if(LoadPrivateKeys (keys, httpProxyKeys)) + localDestination = CreateNewLocalDestination (keys, false); + else + LogPrint(eLogError, "Clients: failed to load HTTP Proxy key"); } try { m_HttpProxy = new i2p::proxy::HTTPProxy(httpProxyAddr, httpProxyPort, localDestination); @@ -61,7 +65,7 @@ namespace client LogPrint(eLogError, "Clients: Exception in HTTP Proxy: ", e.what()); } } - + bool socksproxy; i2p::config::GetOption("socksproxy.enabled", socksproxy); if (socksproxy) { std::string socksProxyKeys; i2p::config::GetOption("socksproxy.keys", socksProxyKeys); @@ -83,10 +87,21 @@ namespace client LogPrint(eLogError, "Clients: Exception in SOCKS Proxy: ", e.what()); } } - + + if ( m_ServiceThread == nullptr ) { + m_ServiceThread = new std::thread([&] () { + LogPrint(eLogInfo, "ClientContext: starting service"); + m_Service.run(); + LogPrint(eLogError, "ClientContext: service died"); + }); + ScheduleCleanupUDP(); + } + + // I2P tunnels ReadTunnels (); + // SAM bool sam; i2p::config::GetOption("sam.enabled", sam); if (sam) { @@ -193,20 +208,40 @@ namespace client } LogPrint(eLogInfo, "Clients: stopping AddressBook"); - m_AddressBook.Stop (); + m_AddressBook.Stop (); + + { + std::lock_guard lock(m_ForwardsMutex); + m_ServerForwards.clear(); + m_ClientForwards.clear(); + } + + for (auto& it: m_Destinations) it.second->Stop (); m_Destinations.clear (); - m_SharedLocalDestination = nullptr; + m_SharedLocalDestination = nullptr; + // stop io service thread + if(m_ServiceThread) + { + m_Service.stop(); + m_ServiceThread->join(); + delete m_ServiceThread; + m_ServiceThread = nullptr; + } } void ClientContext::ReloadConfig () { - ReadTunnels (); // TODO: it reads new tunnels only, should be implemented better + std::string config; i2p::config::GetOption("conf", config); + i2p::config::ParseConfig(config); + Stop(); + Start(); } - void ClientContext::LoadPrivateKeys (i2p::data::PrivateKeys& keys, const std::string& filename, i2p::data::SigningKeyType sigType) + bool ClientContext::LoadPrivateKeys (i2p::data::PrivateKeys& keys, const std::string& filename, i2p::data::SigningKeyType sigType) { + bool success = true; std::string fullPath = i2p::fs::DataDirPath (filename); std::ifstream s(fullPath, std::ifstream::binary); if (s.is_open ()) @@ -216,9 +251,14 @@ namespace client s.seekg (0, std::ios::beg); uint8_t * buf = new uint8_t[len]; s.read ((char *)buf, len); - keys.FromBuffer (buf, len); + if(!keys.FromBuffer (buf, len)) + { + LogPrint (eLogError, "Clients: failed to load keyfile ", filename); + success = false; + } + else + LogPrint (eLogInfo, "Clients: Local address ", m_AddressBook.ToAddress(keys.GetPublic ()->GetIdentHash ()), " loaded"); delete[] buf; - LogPrint (eLogInfo, "Clients: Local address ", m_AddressBook.ToAddress(keys.GetPublic ()->GetIdentHash ()), " loaded"); } else { @@ -232,7 +272,31 @@ namespace client delete[] buf; LogPrint (eLogInfo, "Clients: New private keys file ", fullPath, " for ", m_AddressBook.ToAddress(keys.GetPublic ()->GetIdentHash ()), " created"); - } + } + return success; + } + + std::vector > ClientContext::GetForwardInfosFor(const i2p::data::IdentHash & destination) + { + std::vector > infos; + std::lock_guard lock(m_ForwardsMutex); + for(const auto & c : m_ClientForwards) + { + if (c.second->IsLocalDestination(destination)) + { + for (auto & i : c.second->GetSessions()) infos.push_back(i); + break; + } + } + for(const auto & s : m_ServerForwards) + { + if(std::get<0>(s.first) == destination) + { + for( auto & i : s.second->GetSessions()) infos.push_back(i); + break; + } + } + return infos; } std::shared_ptr ClientContext::CreateNewLocalDestination (bool isPublic, i2p::data::SigningKeyType sigType, @@ -337,7 +401,7 @@ namespace client try { std::string type = section.second.get (I2P_TUNNELS_SECTION_TYPE); - if (type == I2P_TUNNELS_SECTION_TYPE_CLIENT) + if (type == I2P_TUNNELS_SECTION_TYPE_CLIENT || type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT) { // mandatory params std::string dest = section.second.get (I2P_CLIENT_TUNNEL_DESTINATION); @@ -355,22 +419,43 @@ namespace client if (keys.length () > 0) { i2p::data::PrivateKeys k; - LoadPrivateKeys (k, keys, sigType); - localDestination = FindLocalDestination (k.GetPublic ()->GetIdentHash ()); - if (!localDestination) - localDestination = CreateNewLocalDestination (k, false, &options); + if(LoadPrivateKeys (k, keys, sigType)) + { + localDestination = FindLocalDestination (k.GetPublic ()->GetIdentHash ()); + if (!localDestination) + localDestination = CreateNewLocalDestination (k, type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT, &options); + } } - auto clientTunnel = new I2PClientTunnel (name, dest, address, port, localDestination, destinationPort); - if (m_ClientTunnels.insert (std::make_pair (clientTunnel->GetAcceptor ().local_endpoint (), - std::unique_ptr(clientTunnel))).second) - { - clientTunnel->Start (); - numClientTunnels++; + if (type == I2P_TUNNELS_SECTION_TYPE_UDPCLIENT) { + // udp client + // TODO: hostnames + boost::asio::ip::udp::endpoint end(boost::asio::ip::address::from_string(address), port); + if (!localDestination) + { + localDestination = m_SharedLocalDestination; + } + auto clientTunnel = new I2PUDPClientTunnel(name, dest, end, localDestination, destinationPort); + if(m_ClientForwards.insert(std::make_pair(end, std::unique_ptr(clientTunnel))).second) + { + clientTunnel->Start(); + } + else + LogPrint(eLogError, "Clients: I2P Client forward for endpoint ", end, " already exists"); + + } else { + // tcp client + auto clientTunnel = new I2PClientTunnel (name, dest, address, port, localDestination, destinationPort); + if (m_ClientTunnels.insert (std::make_pair (clientTunnel->GetAcceptor ().local_endpoint (), + std::unique_ptr(clientTunnel))).second) + { + clientTunnel->Start (); + numClientTunnels++; + } + else + LogPrint (eLogError, "Clients: I2P client tunnel for endpoint ", clientTunnel->GetAcceptor ().local_endpoint (), " already exists"); } - else - LogPrint (eLogError, "Clients: I2P client tunnel for endpoint ", clientTunnel->GetAcceptor ().local_endpoint (), " already exists"); } - else if (type == I2P_TUNNELS_SECTION_TYPE_SERVER || type == I2P_TUNNELS_SECTION_TYPE_HTTP || type == I2P_TUNNELS_SECTION_TYPE_IRC) + else if (type == I2P_TUNNELS_SECTION_TYPE_SERVER || type == I2P_TUNNELS_SECTION_TYPE_HTTP || type == I2P_TUNNELS_SECTION_TYPE_IRC || type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER) { // mandatory params std::string host = section.second.get (I2P_SERVER_TUNNEL_HOST); @@ -383,17 +468,43 @@ namespace client std::string webircpass = section.second.get (I2P_SERVER_TUNNEL_WEBIRC_PASSWORD, ""); bool gzip = section.second.get (I2P_SERVER_TUNNEL_GZIP, true); i2p::data::SigningKeyType sigType = section.second.get (I2P_SERVER_TUNNEL_SIGNATURE_TYPE, i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256); + uint32_t maxConns = section.second.get(i2p::stream::I2CP_PARAM_STREAMING_MAX_CONNS_PER_MIN, i2p::stream::DEFAULT_MAX_CONNS_PER_MIN); + std::string address = section.second.get (I2P_SERVER_TUNNEL_ADDRESS, "127.0.0.1"); + // I2CP std::map options; ReadI2CPOptions (section, options); std::shared_ptr localDestination = nullptr; i2p::data::PrivateKeys k; - LoadPrivateKeys (k, keys, sigType); + if(!LoadPrivateKeys (k, keys, sigType)) + continue; localDestination = FindLocalDestination (k.GetPublic ()->GetIdentHash ()); if (!localDestination) localDestination = CreateNewLocalDestination (k, true, &options); - + if (type == I2P_TUNNELS_SECTION_TYPE_UDPSERVER) + { + // udp server tunnel + // TODO: hostnames + auto localAddress = boost::asio::ip::address::from_string(address); + boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address::from_string(host), port); + I2PUDPServerTunnel * serverTunnel = new I2PUDPServerTunnel(name, localDestination, localAddress, endpoint, port); + std::lock_guard lock(m_ForwardsMutex); + if(m_ServerForwards.insert( + std::make_pair( + std::make_pair( + localDestination->GetIdentHash(), port), + std::unique_ptr(serverTunnel))).second) + { + serverTunnel->Start(); + LogPrint(eLogInfo, "Clients: I2P Server Forward created for UDP Endpoint ", host, ":", port, " bound on ", address, " for ",localDestination->GetIdentHash().ToBase32()); + } + else + LogPrint(eLogError, "Clients: I2P Server Forward for destination/port ", m_AddressBook.ToAddress(localDestination->GetIdentHash()), "/", port, "already exists"); + + continue; + } + I2PServerTunnel * serverTunnel; if (type == I2P_TUNNELS_SECTION_TYPE_HTTP) serverTunnel = new I2PServerTunnelHTTP (name, host, port, localDestination, hostOverride, inPort, gzip); @@ -402,6 +513,10 @@ namespace client else // regular server tunnel by default serverTunnel = new I2PServerTunnel (name, host, port, localDestination, inPort, gzip); + LogPrint(eLogInfo, "Clients: Set Max Conns To ", maxConns); + serverTunnel->SetMaxConnsPerMinute(maxConns); + + if (accessList.length () > 0) { std::set idents; @@ -439,6 +554,23 @@ namespace client } LogPrint (eLogInfo, "Clients: ", numClientTunnels, " I2P client tunnels created"); LogPrint (eLogInfo, "Clients: ", numServerTunnels, " I2P server tunnels created"); - } + } + + void ClientContext::ScheduleCleanupUDP() + { + // schedule cleanup in 1 second + m_CleanupUDPTimer.expires_at(m_CleanupUDPTimer.expires_at() + boost::posix_time::seconds(1)); + m_CleanupUDPTimer.async_wait(std::bind(&ClientContext::CleanupUDP, this, std::placeholders::_1)); + } + + void ClientContext::CleanupUDP(const boost::system::error_code & ecode) + { + if(!ecode) + { + std::lock_guard lock(m_ForwardsMutex); + for ( auto & s : m_ServerForwards ) s.second->ExpireStale(); + ScheduleCleanupUDP(); + } + } } } diff --git a/ClientContext.h b/ClientContext.h index f5696902..581ff466 100644 --- a/ClientContext.h +++ b/ClientContext.h @@ -24,6 +24,8 @@ namespace client const char I2P_TUNNELS_SECTION_TYPE_SERVER[] = "server"; const char I2P_TUNNELS_SECTION_TYPE_HTTP[] = "http"; const char I2P_TUNNELS_SECTION_TYPE_IRC[] = "irc"; + const char I2P_TUNNELS_SECTION_TYPE_UDPCLIENT[] = "udpclient"; + const char I2P_TUNNELS_SECTION_TYPE_UDPSERVER[] = "udpserver"; const char I2P_CLIENT_TUNNEL_PORT[] = "port"; const char I2P_CLIENT_TUNNEL_ADDRESS[] = "address"; const char I2P_CLIENT_TUNNEL_DESTINATION[] = "destination"; @@ -39,7 +41,8 @@ namespace client const char I2P_SERVER_TUNNEL_ACCESS_LIST[] = "accesslist"; const char I2P_SERVER_TUNNEL_GZIP[] = "gzip"; const char I2P_SERVER_TUNNEL_WEBIRC_PASSWORD[] = "webircpassword"; - + const char I2P_SERVER_TUNNEL_ADDRESS[] = "address"; + class ClientContext { public: @@ -59,11 +62,13 @@ namespace client const std::map * params = nullptr); void DeleteLocalDestination (std::shared_ptr destination); std::shared_ptr FindLocalDestination (const i2p::data::IdentHash& destination) const; - void LoadPrivateKeys (i2p::data::PrivateKeys& keys, const std::string& filename, i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256); + bool LoadPrivateKeys (i2p::data::PrivateKeys& keys, const std::string& filename, i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256); AddressBook& GetAddressBook () { return m_AddressBook; }; const SAMBridge * GetSAMBridge () const { return m_SamBridge; }; - + + std::vector > GetForwardInfosFor(const i2p::data::IdentHash & destination); + private: void ReadTunnels (); @@ -72,6 +77,9 @@ namespace client template void ReadI2CPOptions (const Section& section, std::map& options) const; + void CleanupUDP(const boost::system::error_code & ecode); + void ScheduleCleanupUDP(); + private: std::mutex m_DestinationsMutex; @@ -84,15 +92,27 @@ namespace client i2p::proxy::SOCKSProxy * m_SocksProxy; std::map > m_ClientTunnels; // local endpoint->tunnel std::map, std::unique_ptr > m_ServerTunnels; // ->tunnel + + std::mutex m_ForwardsMutex; + std::map > m_ClientForwards; // local endpoint -> udp tunnel + std::map, std::unique_ptr > m_ServerForwards; // -> udp tunnel + SAMBridge * m_SamBridge; BOBCommandChannel * m_BOBCommandChannel; I2CPServer * m_I2CPServer; + boost::asio::io_service m_Service; + std::thread * m_ServiceThread; + + boost::asio::deadline_timer m_CleanupUDPTimer; + public: // for HTTP const decltype(m_Destinations)& GetDestinations () const { return m_Destinations; }; const decltype(m_ClientTunnels)& GetClientTunnels () const { return m_ClientTunnels; }; const decltype(m_ServerTunnels)& GetServerTunnels () const { return m_ServerTunnels; }; + const decltype(m_ClientForwards)& GetClientForwards () const { return m_ClientForwards; } + const decltype(m_ServerForwards)& GetServerForwards () const { return m_ServerForwards; } }; extern ClientContext context; diff --git a/DaemonLinux.cpp b/DaemonLinux.cpp index f6664926..22d7dec8 100644 --- a/DaemonLinux.cpp +++ b/DaemonLinux.cpp @@ -20,7 +20,7 @@ void handle_signal(int sig) switch (sig) { case SIGHUP: - LogPrint(eLogInfo, "Daemon: Got SIGHUP, reopening log..."); + LogPrint(eLogInfo, "Daemon: Got SIGHUP, reopening logs and tunnel configuration..."); i2p::log::Logger().Reopen (); i2p::client::context.ReloadConfig(); break; diff --git a/Datagram.cpp b/Datagram.cpp index 2015622c..cecbcc40 100644 --- a/Datagram.cpp +++ b/Datagram.cpp @@ -12,72 +12,45 @@ namespace i2p namespace datagram { DatagramDestination::DatagramDestination (std::shared_ptr owner): - m_Owner (owner), m_Receiver (nullptr) + m_Owner (owner.get()), + m_CleanupTimer(owner->GetService()), + m_Receiver (nullptr) { + ScheduleCleanup(); } DatagramDestination::~DatagramDestination () { + m_CleanupTimer.cancel(); + m_Sessions.clear(); } void DatagramDestination::SendDatagramTo (const uint8_t * payload, size_t len, const i2p::data::IdentHash& ident, uint16_t fromPort, uint16_t toPort) { + auto owner = m_Owner; + auto i = owner->GetIdentity(); uint8_t buf[MAX_DATAGRAM_SIZE]; - auto identityLen = m_Owner->GetIdentity ()->ToBuffer (buf, MAX_DATAGRAM_SIZE); + auto identityLen = i->ToBuffer (buf, MAX_DATAGRAM_SIZE); uint8_t * signature = buf + identityLen; - auto signatureLen = m_Owner->GetIdentity ()->GetSignatureLen (); + auto signatureLen = i->GetSignatureLen (); uint8_t * buf1 = signature + signatureLen; size_t headerLen = identityLen + signatureLen; memcpy (buf1, payload, len); - if (m_Owner->GetIdentity ()->GetSigningKeyType () == i2p::data::SIGNING_KEY_TYPE_DSA_SHA1) + if (i->GetSigningKeyType () == i2p::data::SIGNING_KEY_TYPE_DSA_SHA1) { uint8_t hash[32]; SHA256(buf1, len, hash); - m_Owner->Sign (hash, 32, signature); + owner->Sign (hash, 32, signature); } else - m_Owner->Sign (buf1, len, signature); + owner->Sign (buf1, len, signature); - auto msg = CreateDataMessage (buf, len + headerLen, fromPort, toPort); - auto remote = m_Owner->FindLeaseSet (ident); - if (remote) - m_Owner->GetService ().post (std::bind (&DatagramDestination::SendMsg, this, msg, remote)); - else - m_Owner->RequestDestination (ident, std::bind (&DatagramDestination::HandleLeaseSetRequestComplete, this, std::placeholders::_1, msg)); + auto msg = CreateDataMessage (buf, len + headerLen, fromPort, toPort); + auto session = ObtainSession(ident); + session->SendMsg(msg); } - void DatagramDestination::HandleLeaseSetRequestComplete (std::shared_ptr remote, std::shared_ptr msg) - { - if (remote) - SendMsg (msg, remote); - } - - void DatagramDestination::SendMsg (std::shared_ptr msg, std::shared_ptr remote) - { - auto outboundTunnel = m_Owner->GetTunnelPool ()->GetNextOutboundTunnel (); - auto leases = remote->GetNonExpiredLeases (); - if (!leases.empty () && outboundTunnel) - { - std::vector msgs; - uint32_t i = rand () % leases.size (); - auto garlic = m_Owner->WrapMessage (remote, msg, true); - msgs.push_back (i2p::tunnel::TunnelMessageBlock - { - i2p::tunnel::eDeliveryTypeTunnel, - leases[i]->tunnelGateway, leases[i]->tunnelID, - garlic - }); - outboundTunnel->SendTunnelDataMsg (msgs); - } - else - { - if (outboundTunnel) - LogPrint (eLogWarning, "Failed to send datagram. All leases expired"); - else - LogPrint (eLogWarning, "Failed to send datagram. No outbound tunnels"); - } - } void DatagramDestination::HandleDatagram (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) { @@ -98,18 +71,26 @@ namespace datagram if (verified) { - auto it = m_ReceiversByPorts.find (toPort); - if (it != m_ReceiversByPorts.end ()) - it->second (identity, fromPort, toPort, buf + headerLen, len -headerLen); - else if (m_Receiver != nullptr) - m_Receiver (identity, fromPort, toPort, buf + headerLen, len -headerLen); + auto r = FindReceiver(toPort); + if(r) + r(identity, fromPort, toPort, buf + headerLen, len -headerLen); else - LogPrint (eLogWarning, "Receiver for datagram is not set"); + LogPrint (eLogWarning, "DatagramDestination: no receiver for port ", toPort); } else LogPrint (eLogWarning, "Datagram signature verification failed"); } + DatagramDestination::Receiver DatagramDestination::FindReceiver(uint16_t port) + { + std::lock_guard lock(m_ReceiversMutex); + Receiver r = m_Receiver; + auto itr = m_ReceiversByPorts.find(port); + if (itr != m_ReceiversByPorts.end()) + r = itr->second; + return r; + } + void DatagramDestination::HandleDataMessagePayload (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) { // unzip it @@ -137,7 +118,338 @@ namespace datagram else msg = nullptr; return msg; - } + } + + void DatagramDestination::ScheduleCleanup() + { + m_CleanupTimer.expires_from_now(boost::posix_time::seconds(DATAGRAM_SESSION_CLEANUP_INTERVAL)); + m_CleanupTimer.async_wait(std::bind(&DatagramDestination::HandleCleanUp, this, std::placeholders::_1)); + } + + void DatagramDestination::HandleCleanUp(const boost::system::error_code & ecode) + { + if(ecode) + return; + std::lock_guard lock(m_SessionsMutex); + auto now = i2p::util::GetMillisecondsSinceEpoch(); + LogPrint(eLogDebug, "DatagramDestination: clean up sessions"); + std::vector expiredSessions; + // for each session ... + for (auto & e : m_Sessions) { + // check if expired + if(now - e.second->LastActivity() >= DATAGRAM_SESSION_MAX_IDLE) + expiredSessions.push_back(e.first); // we are expired + } + // for each expired session ... + for (auto & ident : expiredSessions) { + // remove the expired session + LogPrint(eLogInfo, "DatagramDestination: expiring idle session with ", ident.ToBase32()); + m_Sessions.erase(ident); + } + m_Owner->CleanupExpiredTags(); + ScheduleCleanup(); + } + + std::shared_ptr DatagramDestination::ObtainSession(const i2p::data::IdentHash & ident) + { + std::shared_ptr session = nullptr; + std::lock_guard lock(m_SessionsMutex); + auto itr = m_Sessions.find(ident); + if (itr == m_Sessions.end()) { + // not found, create new session + session = std::make_shared(m_Owner, ident); + m_Sessions[ident] = session; + } else { + session = itr->second; + } + return session; + } + + std::shared_ptr DatagramDestination::GetInfoForRemote(const i2p::data::IdentHash & remote) + { + std::lock_guard lock(m_SessionsMutex); + for ( auto & item : m_Sessions) + { + if(item.first == remote) return std::make_shared(item.second->GetSessionInfo()); + } + return nullptr; + } + + DatagramSession::DatagramSession(i2p::client::ClientDestination * localDestination, + const i2p::data::IdentHash & remoteIdent) : + m_LocalDestination(localDestination), + m_RemoteIdentity(remoteIdent), + m_LastUse(i2p::util::GetMillisecondsSinceEpoch ()), + m_LastPathChange(0), + m_LastSuccess(0) + { + } + + void DatagramSession::SendMsg(std::shared_ptr msg) + { + // we used this session + m_LastUse = i2p::util::GetMillisecondsSinceEpoch(); + // schedule send + m_LocalDestination->GetService().post(std::bind(&DatagramSession::HandleSend, this, msg)); + } + + DatagramSession::Info DatagramSession::GetSessionInfo() const + { + if(!m_RoutingSession) + return DatagramSession::Info(nullptr, nullptr, m_LastUse, m_LastSuccess); + + auto routingPath = m_RoutingSession->GetSharedRoutingPath(); + if (!routingPath) + return DatagramSession::Info(nullptr, nullptr, m_LastUse, m_LastSuccess); + auto lease = routingPath->remoteLease; + auto tunnel = routingPath->outboundTunnel; + if(lease) + { + if(tunnel) + return DatagramSession::Info(lease->tunnelGateway, tunnel->GetEndpointIdentHash(), m_LastUse, m_LastSuccess); + else + return DatagramSession::Info(lease->tunnelGateway, nullptr, m_LastUse, m_LastSuccess); + } + else if(tunnel) + return DatagramSession::Info(nullptr, tunnel->GetEndpointIdentHash(), m_LastUse, m_LastSuccess); + else + return DatagramSession::Info(nullptr, nullptr, m_LastUse, m_LastSuccess); + } + + void DatagramSession::HandleSend(std::shared_ptr msg) + { + if(!m_RoutingSession) + { + // try to get one + if(m_RemoteLeaseSet) m_RoutingSession = m_LocalDestination->GetRoutingSession(m_RemoteLeaseSet, true); + else + { + UpdateLeaseSet(msg); + return; + } + } + // do we have a routing session? + if(m_RoutingSession) + { + // should we switch paths? + if(ShouldUpdateRoutingPath ()) + { + LogPrint(eLogDebug, "DatagramSession: try getting new routing path"); + // try switching paths + auto path = GetNextRoutingPath(); + if(path) + UpdateRoutingPath (path); + else + ResetRoutingPath(); + } + auto routingPath = m_RoutingSession->GetSharedRoutingPath (); + // make sure we have a routing path + if (routingPath) + { + auto outboundTunnel = routingPath->outboundTunnel; + if (outboundTunnel) + { + if(outboundTunnel->IsEstablished()) + { + m_LastSuccess = i2p::util::GetMillisecondsSinceEpoch (); + // we have a routing path and routing session and the outbound tunnel we are using is good + // wrap message with routing session and send down routing path's outbound tunnel wrapped for the IBGW + auto m = m_RoutingSession->WrapSingleMessage(msg); + routingPath->outboundTunnel->SendTunnelDataMsg({i2p::tunnel::TunnelMessageBlock{ + i2p::tunnel::eDeliveryTypeTunnel, + routingPath->remoteLease->tunnelGateway, routingPath->remoteLease->tunnelID, + m + }}); + return; + } + } + } + } + auto now = i2p::util::GetMillisecondsSinceEpoch (); + // if this path looks dead reset the routing path since we didn't seem to be able to get a path in time + if (m_LastPathChange && now - m_LastPathChange >= DATAGRAM_SESSION_PATH_TIMEOUT ) ResetRoutingPath(); + UpdateLeaseSet(msg); + + } + + void DatagramSession::UpdateRoutingPath(const std::shared_ptr & path) + { + if(m_RoutingSession == nullptr && m_RemoteLeaseSet) + m_RoutingSession = m_LocalDestination->GetRoutingSession(m_RemoteLeaseSet, true); + if(!m_RoutingSession) return; + // set routing path and update time we last updated the routing path + m_RoutingSession->SetSharedRoutingPath (path); + m_LastPathChange = i2p::util::GetMillisecondsSinceEpoch (); + } + + bool DatagramSession::ShouldUpdateRoutingPath() const + { + auto now = i2p::util::GetMillisecondsSinceEpoch (); + // we need to rotate paths becuase the routing path is too old + if (now - m_LastPathChange >= DATAGRAM_SESSION_PATH_SWITCH_INTERVAL) return true; + // our path looks dead so we need to rotate paths + if (now - m_LastSuccess >= DATAGRAM_SESSION_PATH_TIMEOUT) return true; + // if we have a routing session and routing path we don't need to switch paths + return m_RoutingSession != nullptr && m_RoutingSession->GetSharedRoutingPath () != nullptr; + } + + + bool DatagramSession::ShouldSwitchLease() const + { + std::shared_ptr routingPath = nullptr; + std::shared_ptr currentLease = nullptr; + if(m_RoutingSession) + routingPath = m_RoutingSession->GetSharedRoutingPath (); + if(routingPath) + currentLease = routingPath->remoteLease; + if(currentLease) // if we have a lease return true if it's about to expire otherwise return false + return currentLease->ExpiresWithin( DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW, DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE ); + // we have no current lease, we should switch + return true; + } + + std::shared_ptr DatagramSession::GetNextRoutingPath() + { + std::shared_ptr outboundTunnel = nullptr; + std::shared_ptr routingPath = nullptr; + // get existing routing path if we have one + if(m_RoutingSession) + routingPath = m_RoutingSession->GetSharedRoutingPath(); + // do we have an existing outbound tunnel and routing path? + if(routingPath && routingPath->outboundTunnel) + { + // is the outbound tunnel we are using good? + if (routingPath->outboundTunnel->IsEstablished()) + { + // ya so let's stick with it + outboundTunnel = routingPath->outboundTunnel; + } + else + outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(routingPath->outboundTunnel); // no so we'll switch outbound tunnels + } + // do we have an outbound tunnel that works already ? + if(!outboundTunnel) + outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(); // no, let's get a new outbound tunnel as we probably just started + + if(outboundTunnel) + { + std::shared_ptr lease = nullptr; + // should we switch leases ? + if (ShouldSwitchLease ()) + { + // yes, get next available lease + lease = GetNextLease(); + } + else if (routingPath) + { + if(routingPath->remoteLease) + { + if(routingPath->remoteLease->ExpiresWithin(DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW, DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE)) + lease = GetNextLease(); + else + lease = routingPath->remoteLease; + } + } + else + lease = GetNextLease(); + if(lease) + { + // we have a valid lease to use and an outbound tunnel + // create new routing path + uint32_t now = i2p::util::GetSecondsSinceEpoch(); + routingPath = std::make_shared(i2p::garlic::GarlicRoutingPath{ + outboundTunnel, + lease, + 0, + now, + 0 + }); + } + else // we don't have a new routing path to give + routingPath = nullptr; + } + return routingPath; + } + + void DatagramSession::ResetRoutingPath() + { + if(m_RoutingSession) + { + auto routingPath = m_RoutingSession->GetSharedRoutingPath(); + if(routingPath && routingPath->remoteLease) // we have a remote lease already specified and a routing path + { + // get outbound tunnel on this path + auto outboundTunnel = routingPath->outboundTunnel; + // is this outbound tunnel there and established + if (outboundTunnel && outboundTunnel->IsEstablished()) + m_InvalidIBGW.push_back(routingPath->remoteLease->tunnelGateway); // yes, let's mark remote lease as dead because the outbound tunnel seems fine + } + // reset the routing path + UpdateRoutingPath(nullptr); + } + } + + std::shared_ptr DatagramSession::GetNextLease() + { + auto now = i2p::util::GetMillisecondsSinceEpoch (); + std::shared_ptr next = nullptr; + if(m_RemoteLeaseSet) + { + std::vector exclude; + for(const auto & ident : m_InvalidIBGW) + exclude.push_back(ident); + // find get all leases that are not in our ban list and are not going to expire within our lease set handover window + fudge + auto leases = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding( [&exclude, now] (const i2p::data::Lease & l) -> bool { + if(exclude.size()) + { + auto end = std::end(exclude); + return std::find_if(exclude.begin(), end, [l, now] ( const i2p::data::IdentHash & ident) -> bool { + return ident == l.tunnelGateway; + }) != end; + } + else + return false; + }); + if(leases.size()) + { + // pick random valid next lease + uint32_t idx = rand() % leases.size(); + next = leases[idx]; + } + else + LogPrint(eLogWarning, "DatagramDestination: no leases to use"); + } + return next; + } + + void DatagramSession::UpdateLeaseSet(std::shared_ptr msg) + { + LogPrint(eLogInfo, "DatagramSession: updating lease set"); + m_LocalDestination->RequestDestination(m_RemoteIdentity, std::bind(&DatagramSession::HandleGotLeaseSet, this, std::placeholders::_1, msg)); + } + + void DatagramSession::HandleGotLeaseSet(std::shared_ptr remoteIdent, std::shared_ptr msg) + { + if(remoteIdent) + { + // update routing session + if(m_RoutingSession) + m_RoutingSession = nullptr; + m_RoutingSession = m_LocalDestination->GetRoutingSession(remoteIdent, true); + // clear invalid IBGW as we have a new lease set + m_InvalidIBGW.clear(); + m_RemoteLeaseSet = remoteIdent; + // update routing path + auto path = GetNextRoutingPath(); + if (path) + UpdateRoutingPath(path); + else + ResetRoutingPath(); + // send the message that was queued if it was provided + if(msg) + HandleSend(msg); + } + } } } diff --git a/Datagram.h b/Datagram.h index c593fad2..06b3048e 100644 --- a/Datagram.h +++ b/Datagram.h @@ -9,6 +9,7 @@ #include "Identity.h" #include "LeaseSet.h" #include "I2NPProtocol.h" +#include "Garlic.h" namespace i2p { @@ -18,13 +19,100 @@ namespace client } namespace datagram { - const size_t MAX_DATAGRAM_SIZE = 32768; + + // seconds interval for cleanup timer + const int DATAGRAM_SESSION_CLEANUP_INTERVAL = 3; + // milliseconds for max session idle time + const uint64_t DATAGRAM_SESSION_MAX_IDLE = 10 * 60 * 1000; + // milliseconds for how long we try sticking to a dead routing path before trying to switch + const uint64_t DATAGRAM_SESSION_PATH_TIMEOUT = 5000; + // milliseconds interval a routing path is used before switching + const uint64_t DATAGRAM_SESSION_PATH_SWITCH_INTERVAL = 60 * 1000; + // milliseconds before lease expire should we try switching leases + const uint64_t DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW = 10 * 1000; + // milliseconds fudge factor for leases handover + const uint64_t DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE = 1000; + + + class DatagramSession + { + public: + DatagramSession(i2p::client::ClientDestination * localDestination, + const i2p::data::IdentHash & remoteIdent); + + /** send an i2np message to remote endpoint for this session */ + void SendMsg(std::shared_ptr msg); + /** get the last time in milliseconds for when we used this datagram session */ + uint64_t LastActivity() const { return m_LastUse; } + /** get the last time in milliseconds when we successfully sent data */ + uint64_t LastSuccess() const { return m_LastSuccess; } + struct Info + { + std::shared_ptr IBGW; + std::shared_ptr OBEP; + const uint64_t activity; + const uint64_t success; + Info() : IBGW(nullptr), OBEP(nullptr), activity(0), success(0) {} + Info(const uint8_t * ibgw, const uint8_t * obep, const uint64_t a, const uint64_t s) : + activity(a), + success(s) { + if(ibgw) IBGW = std::make_shared(ibgw); + else IBGW = nullptr; + if(obep) OBEP = std::make_shared(obep); + else OBEP = nullptr; + } + }; + + Info GetSessionInfo() const; + + + private: + + /** update our routing path we are using, mark that we have changed paths */ + void UpdateRoutingPath(const std::shared_ptr & path); + + /** return true if we should switch routing paths because of path lifetime or timeout otherwise false */ + bool ShouldUpdateRoutingPath() const; + + /** return true if we should switch the lease for out routing path otherwise return false */ + bool ShouldSwitchLease() const; + + /** get next usable routing path, try reusing outbound tunnels */ + std::shared_ptr GetNextRoutingPath(); + /** + * mark current routing path as invalid and clear it + * if the outbound tunnel we were using was okay don't use the IBGW in the routing path's lease next time + */ + void ResetRoutingPath(); + + /** get next usable lease, does not fetch or update if expired or have no lease set */ + std::shared_ptr GetNextLease(); + + void HandleSend(std::shared_ptr msg); + void HandleGotLeaseSet(std::shared_ptr remoteIdent, + std::shared_ptr msg); + void UpdateLeaseSet(std::shared_ptr msg=nullptr); + + private: + i2p::client::ClientDestination * m_LocalDestination; + i2p::data::IdentHash m_RemoteIdentity; + std::shared_ptr m_RoutingSession; + // Ident hash of IBGW that are invalid + std::vector m_InvalidIBGW; + std::shared_ptr m_RemoteLeaseSet; + uint64_t m_LastUse; + uint64_t m_LastPathChange; + uint64_t m_LastSuccess; + }; + + const size_t MAX_DATAGRAM_SIZE = 32768; class DatagramDestination { typedef std::function Receiver; public: + DatagramDestination (std::shared_ptr owner); ~DatagramDestination (); @@ -34,21 +122,34 @@ namespace datagram void SetReceiver (const Receiver& receiver) { m_Receiver = receiver; }; void ResetReceiver () { m_Receiver = nullptr; }; - void SetReceiver (const Receiver& receiver, uint16_t port) { m_ReceiversByPorts[port] = receiver; }; - void ResetReceiver (uint16_t port) { m_ReceiversByPorts.erase (port); }; + void SetReceiver (const Receiver& receiver, uint16_t port) { std::lock_guard lock(m_ReceiversMutex); m_ReceiversByPorts[port] = receiver; }; + void ResetReceiver (uint16_t port) { std::lock_guard lock(m_ReceiversMutex); m_ReceiversByPorts.erase (port); }; + std::shared_ptr GetInfoForRemote(const i2p::data::IdentHash & remote); + private: - - void HandleLeaseSetRequestComplete (std::shared_ptr leaseSet, std::shared_ptr msg); + // clean up after next tick + void ScheduleCleanup(); + + // clean up stale sessions and expire tags + void HandleCleanUp(const boost::system::error_code & ecode); + + std::shared_ptr ObtainSession(const i2p::data::IdentHash & ident); std::shared_ptr CreateDataMessage (const uint8_t * payload, size_t len, uint16_t fromPort, uint16_t toPort); - void SendMsg (std::shared_ptr msg, std::shared_ptr remote); + void HandleDatagram (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); + /** find a receiver by port, if none by port is found try default receiever, otherwise returns nullptr */ + Receiver FindReceiver(uint16_t port); + private: - - std::shared_ptr m_Owner; + i2p::client::ClientDestination * m_Owner; + boost::asio::deadline_timer m_CleanupTimer; Receiver m_Receiver; // default + std::mutex m_SessionsMutex; + std::map > m_Sessions; + std::mutex m_ReceiversMutex; std::map m_ReceiversByPorts; i2p::data::GzipInflator m_Inflator; diff --git a/Destination.cpp b/Destination.cpp index 5842e26d..03d0dd69 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -168,14 +168,32 @@ namespace client else return false; } - + std::shared_ptr LeaseSetDestination::FindLeaseSet (const i2p::data::IdentHash& ident) { + std::lock_guard lock(m_RemoteLeaseSetsMutex); auto it = m_RemoteLeaseSets.find (ident); if (it != m_RemoteLeaseSets.end ()) - { + { if (!it->second->IsExpired ()) + { + if (it->second->ExpiresSoon()) + { + LogPrint(eLogDebug, "Destination: Lease Set expires soon, updating before expire"); + // update now before expiration for smooth handover + RequestDestination(ident, [this, ident] (std::shared_ptr ls) { + if(ls && !ls->IsExpired()) + { + ls->PopulateLeases(); + { + std::lock_guard _lock(m_RemoteLeaseSetsMutex); + m_RemoteLeaseSets[ident] = ls; + } + } + }); + } return it->second; + } else LogPrint (eLogWarning, "Destination: remote LeaseSet expired"); } @@ -185,7 +203,10 @@ namespace client if (ls && !ls->IsExpired ()) { ls->PopulateLeases (); // since we don't store them in netdb - m_RemoteLeaseSets[ident] = ls; + { + std::lock_guard lock(m_RemoteLeaseSetsMutex); + m_RemoteLeaseSets[ident] = ls; + } return ls; } } @@ -280,6 +301,7 @@ namespace client if (buf[DATABASE_STORE_TYPE_OFFSET] == 1) // LeaseSet { LogPrint (eLogDebug, "Remote LeaseSet"); + std::lock_guard lock(m_RemoteLeaseSetsMutex); auto it = m_RemoteLeaseSets.find (buf + DATABASE_STORE_KEY_OFFSET); if (it != m_RemoteLeaseSets.end ()) { @@ -628,6 +650,7 @@ namespace client void LeaseSetDestination::CleanupRemoteLeaseSets () { auto ts = i2p::util::GetMillisecondsSinceEpoch (); + std::lock_guard lock(m_RemoteLeaseSetsMutex); for (auto it = m_RemoteLeaseSets.begin (); it != m_RemoteLeaseSets.end ();) { if (it->second->IsEmpty () || ts > it->second->GetExpirationTime ()) // leaseset expired @@ -642,7 +665,8 @@ namespace client ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map * params): LeaseSetDestination (isPublic, params), - m_Keys (keys), m_DatagramDestination (nullptr) + m_Keys (keys), m_DatagramDestination (nullptr), + m_ReadyChecker(GetService()) { if (isPublic) PersistTemporaryKeys (); @@ -654,8 +678,6 @@ namespace client ClientDestination::~ClientDestination () { - if (m_DatagramDestination) - delete m_DatagramDestination; } bool ClientDestination::Start () @@ -676,22 +698,44 @@ namespace client { if (LeaseSetDestination::Stop ()) { + m_ReadyChecker.cancel(); m_StreamingDestination->Stop (); m_StreamingDestination = nullptr; for (auto& it: m_StreamingDestinationsByPorts) it.second->Stop (); - if (m_DatagramDestination) - { - auto d = m_DatagramDestination; - m_DatagramDestination = nullptr; - delete d; - } - return true; + if(m_DatagramDestination) + delete m_DatagramDestination; + m_DatagramDestination = nullptr; + return true; } else return false; } + void ClientDestination::Ready(ReadyPromise & p) + { + ScheduleCheckForReady(&p); + } + + void ClientDestination::ScheduleCheckForReady(ReadyPromise * p) + { + // tick every 100ms + m_ReadyChecker.expires_from_now(boost::posix_time::milliseconds(100)); + m_ReadyChecker.async_wait([&, p] (const boost::system::error_code & ecode) { + HandleCheckForReady(ecode, p); + }); + } + + void ClientDestination::HandleCheckForReady(const boost::system::error_code & ecode, ReadyPromise * p) + { + if(ecode) // error happened + p->set_value(nullptr); + else if(IsReady()) // we are ready + p->set_value(std::shared_ptr(this)); + else // we are not ready + ScheduleCheckForReady(p); + } + void ClientDestination::HandleDataMessage (const uint8_t * buf, size_t len) { uint32_t length = bufbe32toh (buf); @@ -796,9 +840,9 @@ namespace client return dest; } - i2p::datagram::DatagramDestination * ClientDestination::CreateDatagramDestination () + i2p::datagram::DatagramDestination * ClientDestination::CreateDatagramDestination () { - if (!m_DatagramDestination) + if (m_DatagramDestination == nullptr) m_DatagramDestination = new i2p::datagram::DatagramDestination (GetSharedFromThis ()); return m_DatagramDestination; } diff --git a/Destination.h b/Destination.h index ac7ef7c9..8150c72d 100644 --- a/Destination.h +++ b/Destination.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include "Identity.h" #include "TunnelPool.h" @@ -122,6 +123,7 @@ namespace client std::thread * m_Thread; boost::asio::io_service m_Service; boost::asio::io_service::work m_Work; + mutable std::mutex m_RemoteLeaseSetsMutex; std::map > m_RemoteLeaseSets; std::map > m_LeaseSetRequests; @@ -142,13 +144,19 @@ namespace client class ClientDestination: public LeaseSetDestination { public: - + // type for informing that a client destination is ready + typedef std::promise > ReadyPromise; + ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic, const std::map * params = nullptr); ~ClientDestination (); bool Start (); bool Stop (); - + + // informs promise with shared_from_this() when this destination is ready to use + // if cancelled before ready, informs promise with nullptr + void Ready(ReadyPromise & p); + const i2p::data::PrivateKeys& GetPrivateKeys () const { return m_Keys; }; void Sign (const uint8_t * buf, int len, uint8_t * signature) const { m_Keys.Sign (buf, len, signature); }; @@ -163,8 +171,8 @@ namespace client bool IsAcceptingStreams () const; // datagram - i2p::datagram::DatagramDestination * GetDatagramDestination () const { return m_DatagramDestination; }; - i2p::datagram::DatagramDestination * CreateDatagramDestination (); + i2p::datagram::DatagramDestination * GetDatagramDestination () const { return m_DatagramDestination; }; + i2p::datagram::DatagramDestination * CreateDatagramDestination (); // implements LocalDestination const uint8_t * GetEncryptionPrivateKey () const { return m_EncryptionPrivateKey; }; @@ -182,6 +190,9 @@ namespace client { return std::static_pointer_cast(shared_from_this ()); } void PersistTemporaryKeys (); + void ScheduleCheckForReady(ReadyPromise * p); + void HandleCheckForReady(const boost::system::error_code & ecode, ReadyPromise * p); + private: i2p::data::PrivateKeys m_Keys; @@ -189,8 +200,10 @@ namespace client std::shared_ptr m_StreamingDestination; // default std::map > m_StreamingDestinationsByPorts; - i2p::datagram::DatagramDestination * m_DatagramDestination; + i2p::datagram::DatagramDestination * m_DatagramDestination; + boost::asio::deadline_timer m_ReadyChecker; + public: // for HTTP only diff --git a/FS.cpp b/FS.cpp index 663c1916..a809e8c4 100644 --- a/FS.cpp +++ b/FS.cpp @@ -158,6 +158,13 @@ namespace fs { } void HashedStorage::Traverse(std::vector & files) { + Iterate([&files] (const std::string & fname) { + files.push_back(fname); + }); + } + + void HashedStorage::Iterate(FilenameVisitor v) + { boost::filesystem::path p(root); boost::filesystem::recursive_directory_iterator it(p); boost::filesystem::recursive_directory_iterator end; @@ -166,7 +173,7 @@ namespace fs { if (!boost::filesystem::is_regular_file( it->status() )) continue; const std::string & t = it->path().string(); - files.push_back(t); + v(t); } } } // fs diff --git a/FS.h b/FS.h index 0437ccf9..c476aa63 100644 --- a/FS.h +++ b/FS.h @@ -13,6 +13,7 @@ #include #include #include +#include namespace i2p { namespace fs { @@ -43,6 +44,7 @@ namespace fs { std::string suffix; /**< suffix of file in storage (extension) */ public: + typedef std::function FilenameVisitor; HashedStorage(const char *n, const char *p1, const char *p2, const char *s): name(n), prefix1(p1), prefix2(p2), suffix(s) {}; @@ -58,6 +60,8 @@ namespace fs { void Remove(const std::string & ident); /** find all files in storage and store list in provided vector */ void Traverse(std::vector & files); + /** visit every file in this storage with a visitor */ + void Iterate(FilenameVisitor v); }; /** @brief Returns current application name, default 'i2pd' */ diff --git a/Garlic.cpp b/Garlic.cpp index 3521ffcf..1cd1676e 100644 --- a/Garlic.cpp +++ b/Garlic.cpp @@ -178,7 +178,7 @@ namespace garlic // create message if (!tagFound) // new session { - LogPrint (eLogWarning, "Garlic: No tags available, will use ElGamal"); + LogPrint (eLogInfo, "Garlic: No tags available, will use ElGamal"); if (!m_Destination) { LogPrint (eLogError, "Garlic: Can't use ElGamal for unknown destination"); diff --git a/Garlic.h b/Garlic.h index 8d2e850a..76d8eaf3 100644 --- a/Garlic.h +++ b/Garlic.h @@ -108,7 +108,7 @@ namespace garlic std::shared_ptr GetSharedRoutingPath (); void SetSharedRoutingPath (std::shared_ptr path); - + private: size_t CreateAESBlock (uint8_t * buf, std::shared_ptr msg); diff --git a/HTTPServer.cpp b/HTTPServer.cpp index db46404f..bffe8614 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -337,7 +337,8 @@ namespace http { s << "" << it->GetWindowSize () << ""; s << "" << (int)it->GetStatus () << ""; s << "
\r\n" << std::endl; - } + } + s << ""; } } diff --git a/I2PService.cpp b/I2PService.cpp index 4f907f18..f5ebcb0c 100644 --- a/I2PService.cpp +++ b/I2PService.cpp @@ -32,7 +32,12 @@ namespace client } } - TCPIPPipe::TCPIPPipe(I2PService * owner, std::shared_ptr upstream, std::shared_ptr downstream) : I2PServiceHandler(owner), m_up(upstream), m_down(downstream) {} + TCPIPPipe::TCPIPPipe(I2PService * owner, std::shared_ptr upstream, std::shared_ptr downstream) : I2PServiceHandler(owner), m_up(upstream), m_down(downstream) + { + boost::asio::socket_base::receive_buffer_size option(TCP_IP_PIPE_BUFFER_SIZE); + upstream->set_option(option); + downstream->set_option(option); + } TCPIPPipe::~TCPIPPipe() { diff --git a/I2PTunnel.cpp b/I2PTunnel.cpp index cca56120..0c58ba9d 100644 --- a/I2PTunnel.cpp +++ b/I2PTunnel.cpp @@ -9,6 +9,17 @@ namespace i2p { namespace client { + + /** set standard socket options */ + static void I2PTunnelSetSocketOptions(std::shared_ptr socket) + { + if (socket && socket->is_open()) + { + boost::asio::socket_base::receive_buffer_size option(I2P_TUNNEL_CONNECTION_BUFFER_SIZE); + socket->set_option(option); + } + } + I2PTunnelConnection::I2PTunnelConnection (I2PService * owner, std::shared_ptr socket, std::shared_ptr leaseSet, int port): I2PServiceHandler(owner), m_Socket (socket), m_RemoteEndpoint (socket->remote_endpoint ()), @@ -34,7 +45,7 @@ namespace client I2PTunnelConnection::~I2PTunnelConnection () { } - + void I2PTunnelConnection::I2PConnect (const uint8_t * msg, size_t len) { if (m_Stream) @@ -50,6 +61,7 @@ namespace client void I2PTunnelConnection::Connect () { + I2PTunnelSetSocketOptions(m_Socket); if (m_Socket) { #ifdef __linux__ // bind to 127.x.x.x address @@ -67,7 +79,7 @@ namespace client m_Socket->bind (boost::asio::ip::tcp::endpoint (ourIP, 0)); } #endif - m_Socket->async_connect (m_RemoteEndpoint, std::bind (&I2PTunnelConnection::HandleConnect, + m_Socket->async_connect (m_RemoteEndpoint, std::bind (&I2PTunnelConnection::HandleConnect, shared_from_this (), std::placeholders::_1)); } } @@ -401,7 +413,7 @@ namespace client { m_PortDestination = localDestination->CreateStreamingDestination (inport > 0 ? inport : port, gzip); } - + void I2PServerTunnel::Start () { m_Endpoint.port (m_Port); @@ -516,5 +528,241 @@ namespace client conn->Connect (); } -} -} + void I2PUDPServerTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) + { + std::lock_guard lock(m_SessionsMutex); + auto session = ObtainUDPSession(from, toPort, fromPort); + session->IPSocket.send_to(boost::asio::buffer(buf, len), m_RemoteEndpoint); + session->LastActivity = i2p::util::GetMillisecondsSinceEpoch(); + + } + + void I2PUDPServerTunnel::ExpireStale(const uint64_t delta) { + std::lock_guard lock(m_SessionsMutex); + uint64_t now = i2p::util::GetMillisecondsSinceEpoch(); + std::remove_if(m_Sessions.begin(), m_Sessions.end(), [now, delta](const UDPSession * u) -> bool { + return now - u->LastActivity >= delta; + }); + } + + UDPSession * I2PUDPServerTunnel::ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort) + { + auto ih = from.GetIdentHash(); + for ( UDPSession * s : m_Sessions ) + { + if ( s->Identity == ih) + { + /** found existing session */ + LogPrint(eLogDebug, "UDPServer: found session ", s->IPSocket.local_endpoint(), " ", ih.ToBase32()); + return s; + } + } + /** create new udp session */ + boost::asio::ip::udp::endpoint ep(m_LocalAddress, 0); + m_Sessions.push_back(new UDPSession(ep, m_LocalDest, m_RemoteEndpoint, ih, localPort, remotePort)); + return m_Sessions.back(); + } + + UDPSession::UDPSession(boost::asio::ip::udp::endpoint localEndpoint, + const std::shared_ptr & localDestination, + boost::asio::ip::udp::endpoint endpoint, const i2p::data::IdentHash to, + uint16_t ourPort, uint16_t theirPort) : + m_Destination(localDestination->GetDatagramDestination()), + m_Service(localDestination->GetService()), + IPSocket(localDestination->GetService(), localEndpoint), + Identity(to), + SendEndpoint(endpoint), + LastActivity(i2p::util::GetMillisecondsSinceEpoch()), + LocalPort(ourPort), + RemotePort(theirPort) + { + Receive(); + } + + + void UDPSession::Receive() { + LogPrint(eLogDebug, "UDPSession: Receive"); + IPSocket.async_receive_from(boost::asio::buffer(m_Buffer, I2P_UDP_MAX_MTU), + FromEndpoint, std::bind(&UDPSession::HandleReceived, this, std::placeholders::_1, std::placeholders::_2)); + } + + void UDPSession::HandleReceived(const boost::system::error_code & ecode, std::size_t len) + { + if(!ecode) + { + LogPrint(eLogDebug, "UDPSession: forward ", len, "B from ", FromEndpoint); + LastActivity = i2p::util::GetMillisecondsSinceEpoch(); + uint8_t * data = new uint8_t[len]; + memcpy(data, m_Buffer, len); + m_Service.post([&,len, data] () { + m_Destination->SendDatagramTo(data, len, Identity, 0, 0); + delete [] data; + }); + + Receive(); + } else { + LogPrint(eLogError, "UDPSession: ", ecode.message()); + } + } + + + + I2PUDPServerTunnel::I2PUDPServerTunnel(const std::string & name, std::shared_ptr localDestination, + const boost::asio::ip::address& localAddress, boost::asio::ip::udp::endpoint forwardTo, uint16_t port) : + m_Name(name), + LocalPort(port), + m_LocalAddress(localAddress), + m_RemoteEndpoint(forwardTo) + { + m_LocalDest = localDestination; + m_LocalDest->Start(); + auto dgram = m_LocalDest->CreateDatagramDestination(); + dgram->SetReceiver(std::bind(&I2PUDPServerTunnel::HandleRecvFromI2P, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5)); + } + + I2PUDPServerTunnel::~I2PUDPServerTunnel() + { + auto dgram = m_LocalDest->GetDatagramDestination(); + if (dgram) dgram->ResetReceiver(); + + LogPrint(eLogInfo, "UDPServer: done"); + } + + void I2PUDPServerTunnel::Start() { + m_LocalDest->Start(); + } + + std::vector > I2PUDPServerTunnel::GetSessions() + { + std::vector > sessions; + std::lock_guard lock(m_SessionsMutex); + for ( UDPSession * s : m_Sessions ) + { + if (!s->m_Destination) continue; + auto info = s->m_Destination->GetInfoForRemote(s->Identity); + if(!info) continue; + + auto sinfo = std::make_shared(); + sinfo->Name = m_Name; + sinfo->LocalIdent = std::make_shared(m_LocalDest->GetIdentHash().data()); + sinfo->RemoteIdent = std::make_shared(s->Identity.data()); + sinfo->CurrentIBGW = info->IBGW; + sinfo->CurrentOBEP = info->OBEP; + sessions.push_back(sinfo); + } + return sessions; + } + + I2PUDPClientTunnel::I2PUDPClientTunnel(const std::string & name, const std::string &remoteDest, + boost::asio::ip::udp::endpoint localEndpoint, + std::shared_ptr localDestination, + uint16_t remotePort) : + m_Name(name), + m_Session(nullptr), + m_RemoteDest(remoteDest), + m_LocalDest(localDestination), + m_LocalEndpoint(localEndpoint), + m_RemoteIdent(nullptr), + m_ResolveThread(nullptr), + LocalPort(localEndpoint.port()), + RemotePort(remotePort), + m_cancel_resolve(false) + { + auto dgram = m_LocalDest->CreateDatagramDestination(); + dgram->SetReceiver(std::bind(&I2PUDPClientTunnel::HandleRecvFromI2P, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3, std::placeholders::_4, + std::placeholders::_5)); + } + + + + void I2PUDPClientTunnel::Start() { + m_LocalDest->Start(); + if (m_ResolveThread == nullptr) + m_ResolveThread = new std::thread(std::bind(&I2PUDPClientTunnel::TryResolving, this)); + } + + std::vector > I2PUDPClientTunnel::GetSessions() + { + std::vector > infos; + if(m_Session && m_LocalDest) + { + auto s = m_Session; + if (s->m_Destination) + { + auto info = m_Session->m_Destination->GetInfoForRemote(s->Identity); + if(info) + { + auto sinfo = std::make_shared(); + sinfo->Name = m_Name; + sinfo->LocalIdent = std::make_shared(m_LocalDest->GetIdentHash().data()); + sinfo->RemoteIdent = std::make_shared(s->Identity.data()); + sinfo->CurrentIBGW = info->IBGW; + sinfo->CurrentOBEP = info->OBEP; + infos.push_back(sinfo); + } + } + } + return infos; + } + + void I2PUDPClientTunnel::TryResolving() { + LogPrint(eLogInfo, "UDP Tunnel: Trying to resolve ", m_RemoteDest); + m_RemoteIdent = new i2p::data::IdentHash; + m_RemoteIdent->Fill(0); + + while(!context.GetAddressBook().GetIdentHash(m_RemoteDest, *m_RemoteIdent) && !m_cancel_resolve) + { + LogPrint(eLogWarning, "UDP Tunnel: failed to lookup ", m_RemoteDest); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + if(m_cancel_resolve) + { + LogPrint(eLogError, "UDP Tunnel: lookup of ", m_RemoteDest, " was cancelled"); + return; + } + LogPrint(eLogInfo, "UDP Tunnel: resolved ", m_RemoteDest, " to ", m_RemoteIdent->ToBase32()); + // delete existing session + if(m_Session) delete m_Session; + + boost::asio::ip::udp::endpoint ep(boost::asio::ip::address::from_string("127.0.0.1"), 0); + m_Session = new UDPSession(m_LocalEndpoint, m_LocalDest, ep, *m_RemoteIdent, LocalPort, RemotePort); + } + + void I2PUDPClientTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) + { + if(m_RemoteIdent && from.GetIdentHash() == *m_RemoteIdent) + { + // address match + if(m_Session) + { + // tell session + LogPrint(eLogDebug, "UDP Client: got ", len, "B from ", from.GetIdentHash().ToBase32()); + m_Session->IPSocket.send_to(boost::asio::buffer(buf, len), m_Session->FromEndpoint); + } + else + LogPrint(eLogWarning, "UDP Client: no session"); + } + else + LogPrint(eLogWarning, "UDP Client: unwarrented traffic from ", from.GetIdentHash().ToBase32()); + + } + + I2PUDPClientTunnel::~I2PUDPClientTunnel() { + auto dgram = m_LocalDest->GetDatagramDestination(); + if (dgram) dgram->ResetReceiver(); + + if (m_Session) delete m_Session; + m_cancel_resolve = true; + + if(m_ResolveThread) + { + m_ResolveThread->join(); + delete m_ResolveThread; + m_ResolveThread = nullptr; + } + if (m_RemoteIdent) delete m_RemoteIdent; + } +} +} diff --git a/I2PTunnel.h b/I2PTunnel.h index bec0f9a4..2418da30 100644 --- a/I2PTunnel.h +++ b/I2PTunnel.h @@ -9,6 +9,7 @@ #include #include "Identity.h" #include "Destination.h" +#include "Datagram.h" #include "Streaming.h" #include "I2PService.h" @@ -128,8 +129,113 @@ namespace client std::string m_Name, m_Destination; const i2p::data::IdentHash * m_DestinationIdentHash; int m_DestinationPort; - }; + }; + + + /** 2 minute timeout for udp sessions */ + const uint64_t I2P_UDP_SESSION_TIMEOUT = 1000 * 60 * 2; + + /** max size for i2p udp */ + const size_t I2P_UDP_MAX_MTU = i2p::datagram::MAX_DATAGRAM_SIZE; + struct UDPSession + { + i2p::datagram::DatagramDestination * m_Destination; + boost::asio::io_service & m_Service; + boost::asio::ip::udp::socket IPSocket; + i2p::data::IdentHash Identity; + boost::asio::ip::udp::endpoint FromEndpoint; + boost::asio::ip::udp::endpoint SendEndpoint; + uint64_t LastActivity; + + uint16_t LocalPort; + uint16_t RemotePort; + + uint8_t m_Buffer[I2P_UDP_MAX_MTU]; + + UDPSession(boost::asio::ip::udp::endpoint localEndpoint, + const std::shared_ptr & localDestination, + boost::asio::ip::udp::endpoint remote, const i2p::data::IdentHash ident, + uint16_t ourPort, uint16_t theirPort); + void HandleReceived(const boost::system::error_code & ecode, std::size_t len); + void Receive(); + }; + + + /** read only info about a datagram session */ + struct DatagramSessionInfo + { + /** the name of this forward */ + std::string Name; + /** ident hash of local destination */ + std::shared_ptr LocalIdent; + /** ident hash of remote destination */ + std::shared_ptr RemoteIdent; + /** ident hash of IBGW in use currently in this session or nullptr if none is set */ + std::shared_ptr CurrentIBGW; + /** ident hash of OBEP in use for this session or nullptr if none is set */ + std::shared_ptr CurrentOBEP; + /** i2p router's udp endpoint */ + boost::asio::ip::udp::endpoint LocalEndpoint; + /** client's udp endpoint */ + boost::asio::ip::udp::endpoint RemoteEndpoint; + /** how long has this converstation been idle in ms */ + uint64_t idle; + }; + + /** server side udp tunnel, many i2p inbound to 1 ip outbound */ + class I2PUDPServerTunnel + { + public: + I2PUDPServerTunnel(const std::string & name, + std::shared_ptr localDestination, + const boost::asio::ip::address & localAddress, + boost::asio::ip::udp::endpoint forwardTo, uint16_t port); + ~I2PUDPServerTunnel(); + /** expire stale udp conversations */ + void ExpireStale(const uint64_t delta=I2P_UDP_SESSION_TIMEOUT); + void Start(); + const char * GetName() const { return m_Name.c_str(); } + std::vector > GetSessions(); + private: + void HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); + UDPSession * ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort); + private: + const std::string m_Name; + const uint16_t LocalPort; + boost::asio::ip::address m_LocalAddress; + boost::asio::ip::udp::endpoint m_RemoteEndpoint; + std::mutex m_SessionsMutex; + std::vector m_Sessions; + std::shared_ptr m_LocalDest; + }; + + class I2PUDPClientTunnel + { + public: + I2PUDPClientTunnel(const std::string & name, const std::string &remoteDest, + boost::asio::ip::udp::endpoint localEndpoint, std::shared_ptr localDestination, + uint16_t remotePort); + ~I2PUDPClientTunnel(); + void Start(); + const char * GetName() const { return m_Name.c_str(); } + std::vector > GetSessions(); + bool IsLocalDestination(const i2p::data::IdentHash & destination) const { return destination == m_LocalDest->GetIdentHash(); } + private: + void HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); + void TryResolving(); + const std::string m_Name; + UDPSession * m_Session; + const std::string m_RemoteDest; + std::shared_ptr m_LocalDest; + const boost::asio::ip::udp::endpoint m_LocalEndpoint; + i2p::data::IdentHash * m_RemoteIdent; + std::thread * m_ResolveThread; + uint16_t LocalPort; + uint16_t RemotePort; + bool m_cancel_resolve; + }; + class I2PServerTunnel: public I2PService { public: @@ -148,8 +254,10 @@ namespace client const boost::asio::ip::tcp::endpoint& GetEndpoint () const { return m_Endpoint; } const char* GetName() { return m_Name.c_str (); } - - private: + + void SetMaxConnsPerMinute(const uint32_t conns) { m_PortDestination->SetMaxConnsPerMinute(conns); } + + private: void HandleResolve (const boost::system::error_code& ecode, boost::asio::ip::tcp::resolver::iterator it, std::shared_ptr resolver); @@ -165,7 +273,7 @@ namespace client boost::asio::ip::tcp::endpoint m_Endpoint; std::shared_ptr m_PortDestination; std::set m_AccessList; - bool m_IsAccessList; + bool m_IsAccessList; }; class I2PServerTunnelHTTP: public I2PServerTunnel diff --git a/Identity.cpp b/Identity.cpp index 71ca007f..6d37d34e 100644 --- a/Identity.cpp +++ b/Identity.cpp @@ -200,7 +200,9 @@ namespace data } memcpy (&m_StandardIdentity, buf, DEFAULT_IDENTITY_SIZE); - delete[] m_ExtendedBuffer; m_ExtendedBuffer = nullptr; + if(m_ExtendedBuffer) delete[] m_ExtendedBuffer; + m_ExtendedBuffer = nullptr; + m_ExtendedLen = bufbe16toh (m_StandardIdentity.certificate + 1); if (m_ExtendedLen) { @@ -410,6 +412,7 @@ namespace data memcpy (m_PrivateKey, buf + ret, 256); // private key always 256 ret += 256; size_t signingPrivateKeySize = m_Public->GetSigningPrivateKeyLen (); + if(signingPrivateKeySize + ret > len) return 0; // overflow memcpy (m_SigningPrivateKey, buf + ret, signingPrivateKeySize); ret += signingPrivateKeySize; m_Signer = nullptr; @@ -422,7 +425,8 @@ namespace data size_t ret = m_Public->ToBuffer (buf, len); memcpy (buf + ret, m_PrivateKey, 256); // private key always 256 ret += 256; - size_t signingPrivateKeySize = m_Public->GetSigningPrivateKeyLen (); + size_t signingPrivateKeySize = m_Public->GetSigningPrivateKeyLen (); + if(ret + signingPrivateKeySize > len) return 0; // overflow memcpy (buf + ret, m_SigningPrivateKey, signingPrivateKeySize); ret += signingPrivateKeySize; return ret; @@ -452,11 +456,12 @@ namespace data void PrivateKeys::Sign (const uint8_t * buf, int len, uint8_t * signature) const { - if (m_Signer) - m_Signer->Sign (buf, len, signature); + if (!m_Signer) + CreateSigner(); + m_Signer->Sign (buf, len, signature); } - void PrivateKeys::CreateSigner () + void PrivateKeys::CreateSigner () const { switch (m_Public->GetSigningKeyType ()) { diff --git a/Identity.h b/Identity.h index 841acf65..8f3e9e9d 100644 --- a/Identity.h +++ b/Identity.h @@ -92,6 +92,8 @@ namespace data CryptoKeyType GetCryptoKeyType () const; void DropVerifier () const; // to save memory + bool operator == (const IdentityEx & other) const { return GetIdentHash() == other.GetIdentHash(); } + private: void CreateVerifier () const; @@ -133,14 +135,14 @@ namespace data private: - void CreateSigner (); + void CreateSigner () const; private: std::shared_ptr m_Public; uint8_t m_PrivateKey[256]; uint8_t m_SigningPrivateKey[1024]; // assume private key doesn't exceed 1024 bytes - std::unique_ptr m_Signer; + mutable std::unique_ptr m_Signer; }; // kademlia diff --git a/LeaseSet.cpp b/LeaseSet.cpp index 89cad0df..04dc77c5 100644 --- a/LeaseSet.cpp +++ b/LeaseSet.cpp @@ -162,8 +162,21 @@ namespace data { return ExtractTimestamp (buf, len) > ExtractTimestamp (m_Buffer, m_BufferLen); } - - const std::vector > LeaseSet::GetNonExpiredLeases (bool withThreshold) const + + bool LeaseSet::ExpiresSoon(const uint64_t dlt, const uint64_t fudge) const + { + auto now = i2p::util::GetMillisecondsSinceEpoch (); + if (fudge) now += rand() % fudge; + if (now >= m_ExpirationTime) return true; + return m_ExpirationTime - now <= dlt; + } + + const std::vector > LeaseSet::GetNonExpiredLeases (bool withThreshold) const + { + return GetNonExpiredLeasesExcluding( [] (const Lease & l) -> bool { return false; }, withThreshold); + } + + const std::vector > LeaseSet::GetNonExpiredLeasesExcluding (LeaseInspectFunc exclude, bool withThreshold) const { auto ts = i2p::util::GetMillisecondsSinceEpoch (); std::vector > leases; @@ -174,7 +187,7 @@ namespace data endDate += LEASE_ENDDATE_THRESHOLD; else endDate -= LEASE_ENDDATE_THRESHOLD; - if (ts < endDate) + if (ts < endDate && !exclude(*it)) leases.push_back (it); } return leases; diff --git a/LeaseSet.h b/LeaseSet.h index c174ac39..95d8bf90 100644 --- a/LeaseSet.h +++ b/LeaseSet.h @@ -7,6 +7,7 @@ #include #include #include "Identity.h" +#include "Timestamp.h" namespace i2p { @@ -24,7 +25,13 @@ namespace data IdentHash tunnelGateway; uint32_t tunnelID; uint64_t endDate; // 0 means invalid - bool isUpdated; // trasient + bool isUpdated; // trasient + /* return true if this lease expires within t millisecond + fudge factor */ + bool ExpiresWithin( const uint64_t t, const uint64_t fudge = 1000 ) const { + auto expire = i2p::util::GetMillisecondsSinceEpoch (); + if(fudge) expire += rand() % fudge; + return endDate - expire >= t; + } }; struct LeaseCmp @@ -38,6 +45,8 @@ namespace data }; }; + typedef std::function LeaseInspectFunc; + const size_t MAX_LS_BUFFER_SIZE = 3072; const size_t LEASE_SIZE = 44; // 32 + 4 + 8 const uint8_t MAX_NUM_LEASES = 16; @@ -56,10 +65,12 @@ namespace data size_t GetBufferLen () const { return m_BufferLen; }; bool IsValid () const { return m_IsValid; }; const std::vector > GetNonExpiredLeases (bool withThreshold = true) const; + const std::vector > GetNonExpiredLeasesExcluding (LeaseInspectFunc exclude, bool withThreshold = true) const; bool HasExpiredLeases () const; bool IsExpired () const; bool IsEmpty () const { return m_Leases.empty (); }; uint64_t GetExpirationTime () const { return m_ExpirationTime; }; + bool ExpiresSoon(const uint64_t dlt=1000 * 5, const uint64_t fudge = 0) const ; bool operator== (const LeaseSet& other) const { return m_BufferLen == other.m_BufferLen && !memcmp (m_Buffer, other.m_Buffer, m_BufferLen); }; diff --git a/Log.h b/Log.h index 6762899f..bedc98f8 100644 --- a/Log.h +++ b/Log.h @@ -40,8 +40,20 @@ enum LogType { #endif }; +#ifdef _WIN32 + const char LOG_COLOR_ERROR[] = ""; + const char LOG_COLOR_WARNING[] = ""; + const char LOG_COLOR_RESET[] = ""; +#else + const char LOG_COLOR_ERROR[] = "\033[1;31m"; + const char LOG_COLOR_WARNING[] = "\033[1;33m"; + const char LOG_COLOR_RESET[] = "\033[0m"; +#endif + + namespace i2p { namespace log { + struct LogMsg; /* forward declaration */ class Log @@ -177,8 +189,16 @@ void LogPrint (LogLevel level, TArgs... args) // fold message to single string std::stringstream ss(""); - LogPrint (ss, args ...); + if(level == eLogError) // if log level is ERROR color log message red + ss << LOG_COLOR_ERROR; + else if (level == eLogWarning) // if log level is WARN color log message yellow + ss << LOG_COLOR_WARNING; + LogPrint (ss, args ...); + + // reset color + ss << LOG_COLOR_RESET; + auto msg = std::make_shared(level, std::time(nullptr), ss.str()); msg->tid = std::this_thread::get_id(); log.Append(msg); diff --git a/Makefile.linux b/Makefile.linux index b0549f43..ddff76a7 100644 --- a/Makefile.linux +++ b/Makefile.linux @@ -32,9 +32,14 @@ ifeq ($(USE_STATIC),yes) # Using 'getaddrinfo' in statically linked applications requires at runtime # the shared libraries from the glibc version used for linking LIBDIR := /usr/lib - LDLIBS = -lboost_system -lboost_date_time -lboost_filesystem -lboost_program_options - LDLIBS += -lssl -lcrypto -lz -ldl -lpthread -lrt - LDLIBS += -static-libstdc++ -static-libgcc -static + LDLIBS = $(LIBDIR)/libboost_system.a + LDLIBS += $(LIBDIR)/libboost_date_time.a + LDLIBS += $(LIBDIR)/libboost_filesystem.a + LDLIBS += $(LIBDIR)/libboost_program_options.a + LDLIBS += $(LIBDIR)/libssl.a + LDLIBS += $(LIBDIR)/libcrypto.a + LDLIBS += $(LIBDIR)/libz.a + LDLIBS += -lpthread -static-libstdc++ -static-libgcc -lrt USE_AESNI := no else LDLIBS = -lcrypto -lssl -lz -lboost_system -lboost_date_time -lboost_filesystem -lboost_program_options -lpthread diff --git a/NTCPSession.cpp b/NTCPSession.cpp index 10af063d..20dc9ab0 100644 --- a/NTCPSession.cpp +++ b/NTCPSession.cpp @@ -747,6 +747,7 @@ namespace transport auto& addresses = context.GetRouterInfo ().GetAddresses (); for (const auto& address: addresses) { + if (!address) continue; if (address->transportStyle == i2p::data::RouterInfo::eTransportNTCP) { if (address->host.is_v4()) @@ -844,6 +845,7 @@ namespace transport if (it != m_NTCPSessions.end ()) { LogPrint (eLogWarning, "NTCP: session to ", ident.ToBase64 (), " already exists"); + session->Terminate(); return false; } m_NTCPSessions.insert (std::pair >(ident, session)); diff --git a/NetDb.cpp b/NetDb.cpp index dc018326..846f4b3c 100644 --- a/NetDb.cpp +++ b/NetDb.cpp @@ -34,11 +34,11 @@ namespace data } void NetDb::Start () - { + { m_Storage.SetPlace(i2p::fs::GetDataDir()); m_Storage.Init(i2p::data::GetBase64SubstitutionTable(), 64); InitProfilesStorage (); - m_Families.LoadCertificates (); + m_Families.LoadCertificates (); Load (); if (m_RouterInfos.size () < 25) // reseed if # of router less than 50 Reseed (); @@ -68,7 +68,7 @@ namespace data m_Requests.Stop (); } } - + void NetDb::Run () { uint32_t lastSave = 0, lastPublish = 0, lastExploratory = 0, lastManageRequest = 0, lastDestinationCleanup = 0; @@ -329,6 +329,67 @@ namespace data for ( auto & entry : m_LeaseSets) v(entry.first, entry.second); } + + void NetDb::VisitStoredRouterInfos(RouterInfoVisitor v) + { + m_Storage.Iterate([v] (const std::string & filename) { + auto ri = std::make_shared(filename); + v(ri); + }); + } + + void NetDb::VisitRouterInfos(RouterInfoVisitor v) + { + std::unique_lock lock(m_RouterInfosMutex); + for ( const auto & item : m_RouterInfos ) + v(item.second); + } + + size_t NetDb::VisitRandomRouterInfos(RouterInfoFilter filter, RouterInfoVisitor v, size_t n) + { + std::vector > found; + const size_t max_iters_per_cyle = 3; + size_t iters = max_iters_per_cyle; + while(n > 0) + { + std::unique_lock lock(m_RouterInfosMutex); + uint32_t idx = rand () % m_RouterInfos.size (); + uint32_t i = 0; + for (const auto & it : m_RouterInfos) { + if(i >= idx) // are we at the random start point? + { + // yes, check if we want this one + if(filter(it.second)) + { + // we have a match + --n; + found.push_back(it.second); + // reset max iterations per cycle + iters = max_iters_per_cyle; + break; + } + } + else // not there yet + ++i; + } + // we have enough + if(n == 0) break; + --iters; + // have we tried enough this cycle ? + if(!iters) { + // yes let's try the next cycle + --n; + iters = max_iters_per_cyle; + } + } + // visit the ones we found + size_t visited = 0; + for(const auto & ri : found ) { + v(ri); + ++visited; + } + return visited; + } void NetDb::Load () { diff --git a/NetDb.h b/NetDb.h index 43b069b9..d8ee148a 100644 --- a/NetDb.h +++ b/NetDb.h @@ -8,6 +8,7 @@ #include #include #include +#include #include "Base.h" #include "Gzip.h" @@ -35,7 +36,13 @@ namespace data /** function for visiting a leaseset stored in a floodfill */ typedef std::function)> LeaseSetVisitor; - + + /** function for visiting a router info we have locally */ + typedef std::function)> RouterInfoVisitor; + + /** function for visiting a router info and determining if we want to use it */ + typedef std::function)> RouterInfoFilter; + class NetDb { public: @@ -45,7 +52,7 @@ namespace data void Start (); void Stop (); - + bool AddRouterInfo (const uint8_t * buf, int len); bool AddRouterInfo (const IdentHash& ident, const uint8_t * buf, int len); bool AddLeaseSet (const IdentHash& ident, const uint8_t * buf, int len, std::shared_ptr from); @@ -86,7 +93,12 @@ namespace data /** visit all lease sets we currently store */ void VisitLeaseSets(LeaseSetVisitor v); - + /** visit all router infos we have currently on disk, usually insanely expensive, does not access in memory RI */ + void VisitStoredRouterInfos(RouterInfoVisitor v); + /** visit all router infos we have loaded in memory, cheaper than VisitLocalRouterInfos but locks access while visiting */ + void VisitRouterInfos(RouterInfoVisitor v); + /** visit N random router that match using filter, then visit them with a visitor, return number of RouterInfos that were visited */ + size_t VisitRandomRouterInfos(RouterInfoFilter f, RouterInfoVisitor v, size_t n); private: void Load (); @@ -103,7 +115,7 @@ namespace data std::shared_ptr GetRandomRouter (Filter filter) const; private: - + mutable std::mutex m_LeaseSetsMutex; std::map > m_LeaseSets; mutable std::mutex m_RouterInfosMutex; diff --git a/Streaming.cpp b/Streaming.cpp index e6f5b32f..02e738c8 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -269,9 +269,14 @@ namespace stream } auto sentPacket = *it; uint64_t rtt = ts - sentPacket->sendTime; + if(ts < sentPacket->sendTime) + { + LogPrint(eLogError, "Streaming: Packet ", seqn, "sent from the future, sendTime=", sentPacket->sendTime); + rtt = 1; + } m_RTT = (m_RTT*seqn + rtt)/(seqn + 1); m_RTO = m_RTT*1.5; // TODO: implement it better - LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt); + LogPrint (eLogDebug, "Streaming: Packet ", seqn, " acknowledged rtt=", rtt, " sentTime=", sentPacket->sendTime); m_SentPackets.erase (it++); delete sentPacket; acknowledged = true; @@ -803,7 +808,10 @@ namespace stream StreamingDestination::StreamingDestination (std::shared_ptr owner, uint16_t localPort, bool gzip): m_Owner (owner), m_LocalPort (localPort), m_Gzip (gzip), - m_PendingIncomingTimer (m_Owner->GetService ()) + m_PendingIncomingTimer (m_Owner->GetService ()), + m_ConnTrackTimer(m_Owner->GetService()), + m_ConnsPerMinute(DEFAULT_MAX_CONNS_PER_MIN), + m_LastBanClear(i2p::util::GetMillisecondsSinceEpoch()) { } @@ -818,17 +826,23 @@ namespace stream } void StreamingDestination::Start () - { + { + ScheduleConnTrack(); } void StreamingDestination::Stop () { ResetAcceptor (); m_PendingIncomingTimer.cancel (); + m_ConnTrackTimer.cancel(); { std::unique_lock l(m_StreamsMutex); m_Streams.clear (); - } + } + { + std::unique_lock l(m_ConnsMutex); + m_Conns.clear (); + } } void StreamingDestination::HandleNextPacket (Packet * packet) @@ -852,6 +866,18 @@ namespace stream auto incomingStream = CreateNewIncomingStream (); uint32_t receiveStreamID = packet->GetReceiveStreamID (); incomingStream->HandleNextPacket (packet); // SYN + auto ident = incomingStream->GetRemoteIdentity(); + if(ident) + { + auto ih = ident->GetIdentHash(); + if(DropNewStream(ih)) + { + // drop + LogPrint(eLogWarning, "Streaming: Dropping connection, too many inbound streams from ", ih.ToBase32()); + incomingStream->Terminate(); + return; + } + } // handle saved packets if any { auto it = m_SavedPackets.find (receiveStreamID); @@ -862,7 +888,7 @@ namespace stream incomingStream->HandleNextPacket (it1); m_SavedPackets.erase (it); } - } + } // accept if (m_Acceptor != nullptr) m_Acceptor (incomingStream); @@ -1015,6 +1041,64 @@ namespace stream else msg = nullptr; return msg; - } + } + + void StreamingDestination::SetMaxConnsPerMinute(const uint32_t conns) + { + m_ConnsPerMinute = conns; + LogPrint(eLogDebug, "Streaming: Set max conns per minute per destination to ", conns); + } + + bool StreamingDestination::DropNewStream(const i2p::data::IdentHash & ih) + { + std::lock_guard lock(m_ConnsMutex); + if (m_Banned.size() > MAX_BANNED_CONNS) return true; // overload + auto end = std::end(m_Banned); + if ( std::find(std::begin(m_Banned), end, ih) != end) return true; // already banned + auto itr = m_Conns.find(ih); + if (itr == m_Conns.end()) + m_Conns[ih] = 0; + + m_Conns[ih] += 1; + + bool ban = m_Conns[ih] >= m_ConnsPerMinute; + if (ban) + { + m_Banned.push_back(ih); + m_Conns.erase(ih); + LogPrint(eLogWarning, "Streaming: ban ", ih.ToBase32()); + } + return ban; + } + + void StreamingDestination::HandleConnTrack(const boost::system::error_code& ecode) + { + if (ecode != boost::asio::error::operation_aborted) + { + { // acquire lock + std::lock_guard lock(m_ConnsMutex); + // clear conn tracking + m_Conns.clear(); + // check for ban clear + auto ts = i2p::util::GetMillisecondsSinceEpoch(); + if (ts - m_LastBanClear >= DEFAULT_BAN_INTERVAL) + { + // clear bans + m_Banned.clear(); + m_LastBanClear = ts; + } + } + // reschedule timer + ScheduleConnTrack(); + } + } + + void StreamingDestination::ScheduleConnTrack() + { + m_ConnTrackTimer.expires_from_now (boost::posix_time::seconds(60)); + m_ConnTrackTimer.async_wait ( + std::bind (&StreamingDestination::HandleConnTrack, + shared_from_this (), std::placeholders::_1)); + } } } diff --git a/Streaming.h b/Streaming.h index c29b62f9..efa4d69e 100644 --- a/Streaming.h +++ b/Streaming.h @@ -51,6 +51,22 @@ namespace stream const int INITIAL_RTO = 9000; // in milliseconds const size_t MAX_PENDING_INCOMING_BACKLOG = 128; const int PENDING_INCOMING_TIMEOUT = 10; // in seconds + + /** i2cp option for limiting inbound stremaing connections */ + const char I2CP_PARAM_STREAMING_MAX_CONNS_PER_MIN[] = "maxconns"; + /** default maximum connections attempts per minute per destination */ + const uint32_t DEFAULT_MAX_CONNS_PER_MIN = 600; + + /** + * max banned destinations per local destination + * TODO: make configurable + */ + const uint16_t MAX_BANNED_CONNS = 9999; + /** + * length of a ban in ms + * TODO: make configurable + */ + const uint64_t DEFAULT_BAN_INTERVAL = 60 * 60 * 1000; struct Packet { @@ -134,10 +150,11 @@ namespace stream size_t GetSendBufferSize () const { return m_SendBuffer.rdbuf ()->in_avail (); }; int GetWindowSize () const { return m_WindowSize; }; int GetRTT () const { return m_RTT; }; - - private: + /** don't call me */ void Terminate (); + + private: void SendBuffer (); void SendQuickAck (); @@ -210,12 +227,22 @@ namespace stream void HandleDataMessagePayload (const uint8_t * buf, size_t len); std::shared_ptr CreateDataMessage (const uint8_t * payload, size_t len, uint16_t toPort); + /** set max connections per minute per destination */ + void SetMaxConnsPerMinute(const uint32_t conns); + private: void HandleNextPacket (Packet * packet); std::shared_ptr CreateNewIncomingStream (); void HandlePendingIncomingTimer (const boost::system::error_code& ecode); + /** handle cleaning up connection tracking for ratelimits */ + void HandleConnTrack(const boost::system::error_code& ecode); + + bool DropNewStream(const i2p::data::IdentHash & ident); + + void ScheduleConnTrack(); + private: std::shared_ptr m_Owner; @@ -227,7 +254,16 @@ namespace stream std::list > m_PendingIncomingStreams; boost::asio::deadline_timer m_PendingIncomingTimer; std::map > m_SavedPackets; // receiveStreamID->packets, arrived before SYN - + + std::mutex m_ConnsMutex; + /** how many connections per minute did each identity have */ + std::map m_Conns; + boost::asio::deadline_timer m_ConnTrackTimer; + uint32_t m_ConnsPerMinute; + /** banned identities */ + std::vector m_Banned; + uint64_t m_LastBanClear; + public: i2p::data::GzipInflator m_Inflator; diff --git a/Tag.h b/Tag.h index b6f94de7..92e2f1a5 100644 --- a/Tag.h +++ b/Tag.h @@ -20,7 +20,7 @@ namespace data { { public: - Tag (const uint8_t * buf) { memcpy (m_Buf, buf, sz); }; + Tag (const uint8_t * buf) { memcpy (m_Buf, buf, sz); }; Tag (const Tag& ) = default; #ifndef _WIN32 // FIXME!!! msvs 2013 can't compile it Tag (Tag&& ) = default; @@ -50,6 +50,14 @@ namespace data { return true; } + const uint8_t * data() const { return m_Buf; } + + /** fill with a value */ + void Fill(uint8_t c) + { + memset(m_Buf, c, sz); + } + std::string ToBase64 () const { char str[sz*2]; diff --git a/Transports.cpp b/Transports.cpp index b1c174f6..a29cac15 100644 --- a/Transports.cpp +++ b/Transports.cpp @@ -114,6 +114,7 @@ namespace transport auto& addresses = context.GetRouterInfo ().GetAddresses (); for (const auto& address : addresses) { + if (!address) continue; if (m_NTCPServer == nullptr && enableNTCP) { m_NTCPServer = new NTCPServer (); diff --git a/TunnelPool.cpp b/TunnelPool.cpp index 119556aa..35272f2c 100644 --- a/TunnelPool.cpp +++ b/TunnelPool.cpp @@ -15,7 +15,8 @@ namespace tunnel { TunnelPool::TunnelPool (int numInboundHops, int numOutboundHops, int numInboundTunnels, int numOutboundTunnels): m_NumInboundHops (numInboundHops), m_NumOutboundHops (numOutboundHops), - m_NumInboundTunnels (numInboundTunnels), m_NumOutboundTunnels (numOutboundTunnels), m_IsActive (true) + m_NumInboundTunnels (numInboundTunnels), m_NumOutboundTunnels (numOutboundTunnels), m_IsActive (true), + m_CustomPeerSelector(nullptr) { } @@ -327,9 +328,18 @@ namespace tunnel bool TunnelPool::SelectPeers (std::vector >& peers, bool isInbound) { - if (m_ExplicitPeers) return SelectExplicitPeers (peers, isInbound); int numHops = isInbound ? m_NumInboundHops : m_NumOutboundHops; - if (numHops <= 0) return true; // peers is empty + // peers is empty + if (numHops <= 0) return true; + // custom peer selector in use ? + { + std::lock_guard lock(m_CustomPeerSelectorMutex); + if (m_CustomPeerSelector) + return m_CustomPeerSelector->SelectPeers(peers, numHops, isInbound); + } + // explicit peers in use + if (m_ExplicitPeers) return SelectExplicitPeers (peers, isInbound); + auto prevHop = i2p::context.GetSharedRouterInfo (); if(i2p::transport::transports.RoutesRestricted()) { @@ -477,6 +487,23 @@ namespace tunnel LogPrint (eLogDebug, "Tunnels: Creating paired inbound tunnel..."); auto tunnel = tunnels.CreateInboundTunnel (std::make_shared(outboundTunnel->GetInvertedPeers ()), outboundTunnel); tunnel->SetTunnelPool (shared_from_this ()); - } + } + + void TunnelPool::SetCustomPeerSelector(TunnelPeerSelector selector) + { + std::lock_guard lock(m_CustomPeerSelectorMutex); + m_CustomPeerSelector = selector; + } + + void TunnelPool::UnsetCustomPeerSelector() + { + SetCustomPeerSelector(nullptr); + } + + bool TunnelPool::HasCustomPeerSelector() + { + std::lock_guard lock(m_CustomPeerSelectorMutex); + return m_CustomPeerSelector != nullptr; + } } } diff --git a/TunnelPool.h b/TunnelPool.h index 0cd2057d..d5bcf18f 100644 --- a/TunnelPool.h +++ b/TunnelPool.h @@ -23,6 +23,16 @@ namespace tunnel class InboundTunnel; class OutboundTunnel; + /** interface for custom tunnel peer selection algorithm */ + struct ITunnelPeerSelector + { + typedef std::shared_ptr Peer; + typedef std::vector TunnelPath; + virtual bool SelectPeers(TunnelPath & peers, int hops, bool isInbound) = 0; + }; + + typedef std::shared_ptr TunnelPeerSelector; + class TunnelPool: public std::enable_shared_from_this // per local destination { public: @@ -45,7 +55,6 @@ namespace tunnel std::shared_ptr GetNextOutboundTunnel (std::shared_ptr excluded = nullptr) const; std::shared_ptr GetNextInboundTunnel (std::shared_ptr excluded = nullptr) const; std::shared_ptr GetNewOutboundTunnel (std::shared_ptr old) const; - void TestTunnels (); void ProcessGarlicMessage (std::shared_ptr msg); void ProcessDeliveryStatus (std::shared_ptr msg); @@ -56,9 +65,12 @@ namespace tunnel int GetNumInboundTunnels () const { return m_NumInboundTunnels; }; int GetNumOutboundTunnels () const { return m_NumOutboundTunnels; }; - - private: + void SetCustomPeerSelector(TunnelPeerSelector selector); + void UnsetCustomPeerSelector(); + bool HasCustomPeerSelector(); + private: + void CreateInboundTunnel (); void CreateOutboundTunnel (); void CreatePairedInboundTunnel (std::shared_ptr outboundTunnel); @@ -80,7 +92,8 @@ namespace tunnel mutable std::mutex m_TestsMutex; std::map, std::shared_ptr > > m_Tests; bool m_IsActive; - + std::mutex m_CustomPeerSelectorMutex; + TunnelPeerSelector m_CustomPeerSelector; public: // for HTTP only