diff --git a/Datagram.cpp b/Datagram.cpp index aed831b7..72dfe67c 100644 --- a/Datagram.cpp +++ b/Datagram.cpp @@ -12,12 +12,17 @@ namespace i2p namespace datagram { DatagramDestination::DatagramDestination (std::shared_ptr owner): - m_Owner (owner.get()), m_Receiver (nullptr) + m_Owner (owner.get()), + m_CleanupTimer(owner->GetService()), + m_Receiver (nullptr) { + ScheduleCleanup(); } DatagramDestination::~DatagramDestination () { + m_CleanupTimer.cancel(); + m_Sessions.clear(); } void DatagramDestination::SendDatagramTo (const uint8_t * payload, size_t len, const i2p::data::IdentHash& ident, uint16_t fromPort, uint16_t toPort) @@ -41,45 +46,11 @@ namespace datagram else owner->Sign (buf1, len, signature); - auto msg = CreateDataMessage (buf, len + headerLen, fromPort, toPort); - auto remote = owner->FindLeaseSet (ident); - if (remote) - owner->GetService ().post (std::bind (&DatagramDestination::SendMsg, this, msg, remote)); - else - owner->RequestDestination (ident, std::bind (&DatagramDestination::HandleLeaseSetRequestComplete, this, std::placeholders::_1, msg)); - } + auto msg = CreateDataMessage (buf, len + headerLen, fromPort, toPort); + auto session = ObtainSession(ident); + session->SendMsg(msg); + } - void DatagramDestination::HandleLeaseSetRequestComplete (std::shared_ptr remote, std::shared_ptr msg) - { - if (remote) - SendMsg (msg, remote); - } - - void DatagramDestination::SendMsg (std::shared_ptr msg, std::shared_ptr remote) - { - auto outboundTunnel = m_Owner->GetTunnelPool ()->GetNextOutboundTunnel (); - auto leases = remote->GetNonExpiredLeases (); - if (!leases.empty () && outboundTunnel) - { - std::vector msgs; - uint32_t i = rand () % leases.size (); - auto garlic = m_Owner->WrapMessage (remote, msg, true); - msgs.push_back (i2p::tunnel::TunnelMessageBlock - { - i2p::tunnel::eDeliveryTypeTunnel, - leases[i]->tunnelGateway, leases[i]->tunnelID, - garlic - }); - outboundTunnel->SendTunnelDataMsg (msgs); - } - else - { - if (outboundTunnel) - LogPrint (eLogWarning, "Failed to send datagram. All leases expired"); - else - LogPrint (eLogWarning, "Failed to send datagram. No outbound tunnels"); - } - } void DatagramDestination::HandleDatagram (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) { @@ -139,7 +110,223 @@ namespace datagram else msg = nullptr; return msg; - } + } + + void DatagramDestination::ScheduleCleanup() + { + m_CleanupTimer.expires_from_now(boost::posix_time::seconds(DATAGRAM_SESSION_CLEANUP_INTERVAL)); + m_CleanupTimer.async_wait(std::bind(&DatagramDestination::HandleCleanUp, this, std::placeholders::_1)); + } + + void DatagramDestination::HandleCleanUp(const boost::system::error_code & ecode) + { + if(ecode) + return; + std::lock_guard lock(m_SessionsMutex); + auto now = i2p::util::GetMillisecondsSinceEpoch(); + LogPrint(eLogDebug, "DatagramDestination: clean up sessions"); + std::vector expiredSessions; + // for each session ... + for (auto & e : m_Sessions) { + // check if expired + if(now - e.second->LastActivity() >= DATAGRAM_SESSION_MAX_IDLE) + expiredSessions.push_back(e.first); // we are expired + } + // for each expired session ... + for (auto & ident : expiredSessions) { + // remove the expired session + LogPrint(eLogInfo, "DatagramDestination: expiring idle session with ", ident.ToBase32()); + m_Sessions.erase(ident); + } + } + + std::shared_ptr DatagramDestination::ObtainSession(const i2p::data::IdentHash & ident) + { + std::shared_ptr session = nullptr; + std::lock_guard lock(m_SessionsMutex); + auto itr = m_Sessions.find(ident); + if (itr == m_Sessions.end()) { + // not found, create new session + session = std::make_shared(m_Owner, ident); + m_Sessions[ident] = session; + } else { + session = itr->second; + } + return session; + } + + DatagramSession::DatagramSession(i2p::client::ClientDestination * localDestination, + const i2p::data::IdentHash & remoteIdent) : + m_LocalDestination(localDestination), + m_RemoteIdentity(remoteIdent), + m_LastUse(i2p::util::GetMillisecondsSinceEpoch()) + { + } + + void DatagramSession::SendMsg(std::shared_ptr msg) + { + // we used this session + m_LastUse = i2p::util::GetMillisecondsSinceEpoch(); + // schedule send + m_LocalDestination->GetService().post(std::bind(&DatagramSession::HandleSend, this, msg)); + } + + void DatagramSession::HandleSend(std::shared_ptr msg) + { + // do we have a routing session? + if(m_RoutingSession) + { + // do we have a routing path ? + auto routingPath = m_RoutingSession->GetSharedRoutingPath(); + if(!routingPath) + { + // no routing path, try getting one + routingPath = GetNextRoutingPath(); + if(routingPath) // remember the routing path if we got one + m_RoutingSession->SetSharedRoutingPath(routingPath); + } + // make sure we have a routing path + if (routingPath) + { + auto outboundTunnel = routingPath->outboundTunnel; + if (outboundTunnel) + { + if(outboundTunnel->IsEstablished()) + { + // we have a routing path and routing session and the outbound tunnel we are using is good + // wrap message with routing session and send down routing path's outbound tunnel wrapped for the IBGW + auto m = m_RoutingSession->WrapSingleMessage(msg); + routingPath->outboundTunnel->SendTunnelDataMsg({i2p::tunnel::TunnelMessageBlock{ + i2p::tunnel::eDeliveryTypeTunnel, + routingPath->remoteLease->tunnelGateway, routingPath->remoteLease->tunnelID, + m + }}); + return; + } + } + } + } + // we couldn't send so let's try resetting the routing path and updating lease set + ResetRoutingPath(); + UpdateLeaseSet(msg); + + } + + std::shared_ptr DatagramSession::GetNextRoutingPath() + { + std::shared_ptr outboundTunnel = nullptr; + std::shared_ptr routingPath = nullptr; + // get existing routing path if we have one + if(m_RoutingSession) + routingPath = m_RoutingSession->GetSharedRoutingPath(); + // do we have an existing outbound tunnel and routing path? + if(routingPath && routingPath->outboundTunnel) + { + // is the outbound tunnel we are using good? + if (routingPath->outboundTunnel->IsEstablished()) + { + // ya so let's stick with it + outboundTunnel = routingPath->outboundTunnel; + } + else + outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(routingPath->outboundTunnel); // no so we'll switch outbound tunnels + // don't reuse the old path as we are making a new one + routingPath = nullptr; + } + // do we have an outbound tunnel that works already ? + if(!outboundTunnel) + outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(); // no, let's get a new outbound tunnel as we probably just started + + if(outboundTunnel) + { + // get next available lease + auto lease = GetNextLease(); + if(lease) + { + // we have a valid lease to use and an outbound tunnel + // create new routing path + uint32_t now = i2p::util::GetSecondsSinceEpoch(); + routingPath = std::make_shared(i2p::garlic::GarlicRoutingPath{ + outboundTunnel, + lease, + 0, + now, + 0 + }); + } + } + return routingPath; + } + + void DatagramSession::ResetRoutingPath() + { + if(m_RoutingSession) + { + auto routingPath = m_RoutingSession->GetSharedRoutingPath(); + if(routingPath && routingPath->remoteLease) // we have a remote lease already specified and a routing path + { + // get outbound tunnel on this path + auto outboundTunnel = routingPath->outboundTunnel; + // is this outbound tunnel there and established + if (outboundTunnel && outboundTunnel->IsEstablished()) + m_InvalidIBGW.push_back(routingPath->remoteLease->tunnelGateway); // yes, let's mark remote lease as dead because the outbound tunnel seems fine + } + // reset the routing path + m_RoutingSession->SetSharedRoutingPath(nullptr); + } + } + + std::shared_ptr DatagramSession::GetNextLease() + { + std::shared_ptr next = nullptr; + if(m_RemoteLeaseSet) + { + std::vector exclude; + for(const auto & ident : m_InvalidIBGW) + exclude.push_back(ident); + // find get all leases that are not in our ban list + auto leases = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding( [&exclude] (const i2p::data::Lease & l) -> bool { + if(exclude.size()) + { + auto end = std::end(exclude); + return std::find_if(exclude.begin(), end, [l] ( const i2p::data::IdentHash & ident) -> bool { + return ident == l.tunnelGateway; + }) != end; + } + else + return false; + }); + if(leases.size()) + { + // pick random valid next lease + uint32_t idx = rand() % leases.size(); + next = leases[idx]; + } + } + return next; + } + + void DatagramSession::UpdateLeaseSet(std::shared_ptr msg) + { + LogPrint(eLogInfo, "DatagramSession: updating lease set"); + m_LocalDestination->RequestDestination(m_RemoteIdentity, std::bind(&DatagramSession::HandleGotLeaseSet, this, std::placeholders::_1, msg)); + } + + void DatagramSession::HandleGotLeaseSet(std::shared_ptr remoteIdent, std::shared_ptr msg) + { + if(remoteIdent) { + // update routing session + if(m_RoutingSession) + m_RoutingSession = nullptr; + m_RoutingSession = m_LocalDestination->GetRoutingSession(remoteIdent, true); + // clear invalid IBGW as we have a new lease set + m_InvalidIBGW.clear(); + m_RemoteLeaseSet = remoteIdent; + // send the message that was queued if it was provided + if(msg) + HandleSend(msg); + } + } } } diff --git a/Datagram.h b/Datagram.h index 51fd3553..dd3ba3ed 100644 --- a/Datagram.h +++ b/Datagram.h @@ -9,6 +9,7 @@ #include "Identity.h" #include "LeaseSet.h" #include "I2NPProtocol.h" +#include "Garlic.h" namespace i2p { @@ -18,7 +19,52 @@ namespace client } namespace datagram { - const size_t MAX_DATAGRAM_SIZE = 32768; + + // seconds interval for cleanup timer + const int DATAGRAM_SESSION_CLEANUP_INTERVAL = 30; + // milliseconds for max session idle time (10 minutes) + const uint64_t DATAGRAM_SESSION_MAX_IDLE = 3600 * 1000; + + + class DatagramSession + { + public: + DatagramSession(i2p::client::ClientDestination * localDestination, + const i2p::data::IdentHash & remoteIdent); + + /** send an i2np message to remote endpoint for this session */ + void SendMsg(std::shared_ptr msg); + /** get the last time in milliseconds for when we used this datagram session */ + uint64_t LastActivity() const { return m_LastUse; } + private: + + /** get next usable routing path, try reusing outbound tunnels */ + std::shared_ptr GetNextRoutingPath(); + /** + * mark current routing path as invalid and clear it + * if the outbound tunnel we were using was okay don't use the IBGW in the routing path's lease next time + */ + void ResetRoutingPath(); + + /** get next usable lease, does not fetch or update if expired or have no lease set */ + std::shared_ptr GetNextLease(); + + void HandleSend(std::shared_ptr msg); + void HandleGotLeaseSet(std::shared_ptr remoteIdent, + std::shared_ptr msg); + void UpdateLeaseSet(std::shared_ptr msg=nullptr); + + private: + i2p::client::ClientDestination * m_LocalDestination; + i2p::data::IdentHash m_RemoteIdentity; + std::shared_ptr m_RoutingSession; + // Ident hash of IBGW that are invalid + std::vector m_InvalidIBGW; + std::shared_ptr m_RemoteLeaseSet; + uint64_t m_LastUse; + }; + + const size_t MAX_DATAGRAM_SIZE = 32768; class DatagramDestination { typedef std::function Receiver; @@ -36,19 +82,26 @@ namespace datagram void SetReceiver (const Receiver& receiver, uint16_t port) { m_ReceiversByPorts[port] = receiver; }; void ResetReceiver (uint16_t port) { m_ReceiversByPorts.erase (port); }; - + private: - - void HandleLeaseSetRequestComplete (std::shared_ptr leaseSet, std::shared_ptr msg); + // clean up after next tick + void ScheduleCleanup(); + + // clean up stale sessions and expire tags + void HandleCleanUp(const boost::system::error_code & ecode); + + std::shared_ptr ObtainSession(const i2p::data::IdentHash & ident); std::shared_ptr CreateDataMessage (const uint8_t * payload, size_t len, uint16_t fromPort, uint16_t toPort); - void SendMsg (std::shared_ptr msg, std::shared_ptr remote); + void HandleDatagram (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); private: - i2p::client::ClientDestination * m_Owner; + boost::asio::deadline_timer m_CleanupTimer; Receiver m_Receiver; // default + std::mutex m_SessionsMutex; + std::map > m_Sessions; std::map m_ReceiversByPorts; i2p::data::GzipInflator m_Inflator; diff --git a/Destination.cpp b/Destination.cpp index 56681884..349c5785 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -701,6 +701,8 @@ namespace client m_StreamingDestination = nullptr; for (auto& it: m_StreamingDestinationsByPorts) it.second->Stop (); + if(m_DatagramDestination) + delete m_DatagramDestination; m_DatagramDestination = nullptr; return true; } diff --git a/Garlic.h b/Garlic.h index 6a92b94a..b37c20e8 100644 --- a/Garlic.h +++ b/Garlic.h @@ -107,7 +107,7 @@ namespace garlic std::shared_ptr GetSharedRoutingPath (); void SetSharedRoutingPath (std::shared_ptr path); - + private: size_t CreateAESBlock (uint8_t * buf, std::shared_ptr msg); diff --git a/LeaseSet.cpp b/LeaseSet.cpp index 72f236a5..ab0407cc 100644 --- a/LeaseSet.cpp +++ b/LeaseSet.cpp @@ -169,8 +169,13 @@ namespace data if (now >= m_ExpirationTime) return true; return m_ExpirationTime - now <= dlt; } - - const std::vector > LeaseSet::GetNonExpiredLeases (bool withThreshold) const + + const std::vector > LeaseSet::GetNonExpiredLeases (bool withThreshold) const + { + return GetNonExpiredLeasesExcluding( [] (const Lease & l) -> bool { return false; }, withThreshold); + } + + const std::vector > LeaseSet::GetNonExpiredLeasesExcluding (LeaseInspectFunc exclude, bool withThreshold) const { auto ts = i2p::util::GetMillisecondsSinceEpoch (); std::vector > leases; @@ -181,7 +186,7 @@ namespace data endDate += LEASE_ENDDATE_THRESHOLD; else endDate -= LEASE_ENDDATE_THRESHOLD; - if (ts < endDate) + if (ts < endDate && !exclude(*it)) leases.push_back (it); } return leases; diff --git a/LeaseSet.h b/LeaseSet.h index d9e74bab..9b6b7844 100644 --- a/LeaseSet.h +++ b/LeaseSet.h @@ -38,6 +38,8 @@ namespace data }; }; + typedef std::function LeaseInspectFunc; + const size_t MAX_LS_BUFFER_SIZE = 3072; const size_t LEASE_SIZE = 44; // 32 + 4 + 8 const uint8_t MAX_NUM_LEASES = 16; @@ -56,6 +58,7 @@ namespace data size_t GetBufferLen () const { return m_BufferLen; }; bool IsValid () const { return m_IsValid; }; const std::vector > GetNonExpiredLeases (bool withThreshold = true) const; + const std::vector > GetNonExpiredLeasesExcluding (LeaseInspectFunc exclude, bool withThreshold = true) const; bool HasExpiredLeases () const; bool IsExpired () const; bool IsEmpty () const { return m_Leases.empty (); }; diff --git a/TunnelPool.h b/TunnelPool.h index 0cd2057d..b32a554f 100644 --- a/TunnelPool.h +++ b/TunnelPool.h @@ -45,7 +45,6 @@ namespace tunnel std::shared_ptr GetNextOutboundTunnel (std::shared_ptr excluded = nullptr) const; std::shared_ptr GetNextInboundTunnel (std::shared_ptr excluded = nullptr) const; std::shared_ptr GetNewOutboundTunnel (std::shared_ptr old) const; - void TestTunnels (); void ProcessGarlicMessage (std::shared_ptr msg); void ProcessDeliveryStatus (std::shared_ptr msg);