diff --git a/AddressBook.cpp b/AddressBook.cpp index db424308..685acbc2 100644 --- a/AddressBook.cpp +++ b/AddressBook.cpp @@ -842,7 +842,7 @@ namespace client else memset (response + 8, 0, 32); // not found memset (response + 40, 0, 4); // set expiration time to zero - m_LocalDestination->GetDatagramDestination ()->SendDatagramTo (response, 44, from.GetIdentHash (), toPort, fromPort); + m_LocalDestination->GetDatagramDestination ()->SendDatagramTo (response, 44, from.GetIdentHash(), toPort, fromPort); } void AddressResolver::AddAddress (const std::string& name, const i2p::data::IdentHash& ident) diff --git a/Config.cpp b/Config.cpp index 9e8fb657..e7f7fa16 100644 --- a/Config.cpp +++ b/Config.cpp @@ -90,7 +90,8 @@ namespace config { ("httpproxy.inbound.quantity", value()->default_value("5"), "HTTP proxy inbound tunnels quantity") ("httpproxy.outbound.quantity", value()->default_value("5"), "HTTP proxy outbound tunnels quantity") ("httpproxy.latency.min", value()->default_value("0"), "HTTP proxy min latency for tunnels") - ("httpproxy.latency.max", value()->default_value("0"), "HTTP proxy max latency for tunnels") + ("httpproxy.latency.max", value()->default_value("0"), "HTTP proxy max latency for tunnels") + ("httpproxy.outproxy", value()->default_value(""), "HTTP proxy upstream out proxy url") ; options_description socksproxy("SOCKS Proxy options"); @@ -171,7 +172,7 @@ namespace config { "https://i2p.mooo.com/netDb/," "https://netdb.i2p2.no/," "https://us.reseed.i2p2.no:444/," - "https://uk.reseed.i2p2.no:444/," +// "https://uk.reseed.i2p2.no:444/," // mamoth's shit "https://i2p-0.manas.ca:8443/," "https://reseed.i2p.vzaws.com:8443/," "https://download.xxlspeed.com/," diff --git a/Crypto.cpp b/Crypto.cpp index 5de3a4ee..a4a794ac 100644 --- a/Crypto.cpp +++ b/Crypto.cpp @@ -386,9 +386,11 @@ namespace crypto // HMAC const uint64_t IPAD = 0x3636363636363636; const uint64_t OPAD = 0x5C5C5C5C5C5C5C5C; - + +#if defined(__AVX__) static const uint64_t ipads[] = { IPAD, IPAD, IPAD, IPAD }; static const uint64_t opads[] = { OPAD, OPAD, OPAD, OPAD }; +#endif void HMACMD5Digest (uint8_t * msg, size_t len, const MACKey& key, uint8_t * digest) // key is 32 bytes @@ -402,18 +404,19 @@ namespace crypto ( "vmovups %[key], %%ymm0 \n" "vmovups %[ipad], %%ymm1 \n" - "vmovups %%ymm1, 32%[buf] \n" + "vmovups %%ymm1, 32(%[buf]) \n" "vxorps %%ymm0, %%ymm1, %%ymm1 \n" - "vmovups %%ymm1, %[buf] \n" + "vmovups %%ymm1, (%[buf]) \n" "vmovups %[opad], %%ymm1 \n" - "vmovups %%ymm1, 32%[hash] \n" + "vmovups %%ymm1, 32(%[hash]) \n" "vxorps %%ymm0, %%ymm1, %%ymm1 \n" - "vmovups %%ymm1, %[hash] \n" + "vmovups %%ymm1, (%[hash]) \n" "vzeroall \n" // end of AVX - "movups %%xmm0, 80%[hash] \n" // zero last 16 bytes - : [buf]"=m"(*buf), [hash]"=m"(*hash) - : [key]"m"(*(const uint8_t *)key), [ipad]"m"(*ipads), [opad]"m"(*opads) - : "memory" + "movups %%xmm0, 80(%[hash]) \n" // zero last 16 bytes + : + : [key]"m"(*(const uint8_t *)key), [ipad]"m"(*ipads), [opad]"m"(*opads), + [buf]"r"(buf), [hash]"r"(hash) + : "memory", "%xmm0" // TODO: change to %ymm0 later ); #else // ikeypad @@ -421,13 +424,19 @@ namespace crypto buf[1] = key.GetLL ()[1] ^ IPAD; buf[2] = key.GetLL ()[2] ^ IPAD; buf[3] = key.GetLL ()[3] ^ IPAD; - memcpy (buf + 4, ipads, 32); + buf[4] = IPAD; + buf[5] = IPAD; + buf[6] = IPAD; + buf[7] = IPAD; // okeypad hash[0] = key.GetLL ()[0] ^ OPAD; hash[1] = key.GetLL ()[1] ^ OPAD; hash[2] = key.GetLL ()[2] ^ OPAD; hash[3] = key.GetLL ()[3] ^ OPAD; - memcpy (hash + 4, opads, 32); + hash[4] = OPAD; + hash[5] = OPAD; + hash[6] = OPAD; + hash[7] = OPAD; // fill last 16 bytes with zeros (first hash size assumed 32 bytes in I2P) memset (hash + 10, 0, 16); #endif diff --git a/Crypto.h b/Crypto.h index a04a93da..9e35f073 100644 --- a/Crypto.h +++ b/Crypto.h @@ -76,7 +76,18 @@ namespace crypto void operator^=(const ChipherBlock& other) // XOR { -#if defined(__x86_64__) || defined(__SSE__) // for Intel x84 or with SSE +#if defined(__AVX__) // AVX + __asm__ + ( + "vmovups (%[buf]), %%xmm0 \n" + "vmovups (%[other]), %%xmm1 \n" + "vxorps %%xmm0, %%xmm1, %%xmm0 \n" + "vmovups %%xmm0, (%[buf]) \n" + : + : [buf]"r"(buf), [other]"r"(other.buf) + : "%xmm0", "%xmm1", "memory" + ); +#elif defined(__SSE__) // SSE __asm__ ( "movups (%[buf]), %%xmm0 \n" diff --git a/Datagram.cpp b/Datagram.cpp index d0b0737a..f8437108 100644 --- a/Datagram.cpp +++ b/Datagram.cpp @@ -22,9 +22,9 @@ namespace datagram { m_Sessions.clear(); } - - void DatagramDestination::SendDatagramTo (const uint8_t * payload, size_t len, const i2p::data::IdentHash& ident, uint16_t fromPort, uint16_t toPort) - { + + void DatagramDestination::SendDatagramTo(const uint8_t * payload, size_t len, const i2p::data::IdentHash & identity, uint16_t fromPort, uint16_t toPort) + { auto owner = m_Owner; std::vector v(MAX_DATAGRAM_SIZE); uint8_t * buf = v.data(); @@ -45,8 +45,7 @@ namespace datagram owner->Sign (buf1, len, signature); auto msg = CreateDataMessage (buf, len + headerLen, fromPort, toPort); - auto session = ObtainSession(ident); - session->SendMsg(msg); + ObtainSession(identity)->SendMsg(msg); } @@ -69,6 +68,8 @@ namespace datagram if (verified) { + auto h = identity.GetIdentHash(); + ObtainSession(h)->Ack(); auto r = FindReceiver(toPort); if(r) r(identity, fromPort, toPort, buf + headerLen, len -headerLen); @@ -138,15 +139,15 @@ namespace datagram } } - std::shared_ptr DatagramDestination::ObtainSession(const i2p::data::IdentHash & ident) + std::shared_ptr DatagramDestination::ObtainSession(const i2p::data::IdentHash & identity) { std::shared_ptr session = nullptr; std::lock_guard lock(m_SessionsMutex); - auto itr = m_Sessions.find(ident); + auto itr = m_Sessions.find(identity); if (itr == m_Sessions.end()) { // not found, create new session - session = std::make_shared(m_Owner, ident); - m_Sessions[ident] = session; + session = std::make_shared(m_Owner, identity); + m_Sessions[identity] = session; } else { session = itr->second; } @@ -164,13 +165,13 @@ namespace datagram } DatagramSession::DatagramSession(i2p::client::ClientDestination * localDestination, - const i2p::data::IdentHash & remoteIdent) : + const i2p::data::IdentHash & remoteIdent) : m_LocalDestination(localDestination), - m_RemoteIdentity(remoteIdent), - m_LastUse(i2p::util::GetMillisecondsSinceEpoch ()), - m_LastPathChange(0), - m_LastSuccess(0) + m_RemoteIdent(remoteIdent), + m_SendQueueTimer(localDestination->GetService()) { + m_LastUse = i2p::util::GetMillisecondsSinceEpoch (); + ScheduleFlushSendQueue(); } void DatagramSession::SendMsg(std::shared_ptr msg) @@ -184,262 +185,149 @@ namespace datagram DatagramSession::Info DatagramSession::GetSessionInfo() const { if(!m_RoutingSession) - return DatagramSession::Info(nullptr, nullptr, m_LastUse, m_LastSuccess); + return DatagramSession::Info(nullptr, nullptr, m_LastUse); auto routingPath = m_RoutingSession->GetSharedRoutingPath(); if (!routingPath) - return DatagramSession::Info(nullptr, nullptr, m_LastUse, m_LastSuccess); + return DatagramSession::Info(nullptr, nullptr, m_LastUse); auto lease = routingPath->remoteLease; auto tunnel = routingPath->outboundTunnel; if(lease) { if(tunnel) - return DatagramSession::Info(lease->tunnelGateway, tunnel->GetEndpointIdentHash(), m_LastUse, m_LastSuccess); + return DatagramSession::Info(lease->tunnelGateway, tunnel->GetEndpointIdentHash(), m_LastUse); else - return DatagramSession::Info(lease->tunnelGateway, nullptr, m_LastUse, m_LastSuccess); + return DatagramSession::Info(lease->tunnelGateway, nullptr, m_LastUse); } else if(tunnel) - return DatagramSession::Info(nullptr, tunnel->GetEndpointIdentHash(), m_LastUse, m_LastSuccess); + return DatagramSession::Info(nullptr, tunnel->GetEndpointIdentHash(), m_LastUse); else - return DatagramSession::Info(nullptr, nullptr, m_LastUse, m_LastSuccess); - } - - void DatagramSession::HandleSend(std::shared_ptr msg) - { - if(!m_RoutingSession) - { - // try to get one - if(m_RemoteLeaseSet) m_RoutingSession = m_LocalDestination->GetRoutingSession(m_RemoteLeaseSet, true); - else - { - UpdateLeaseSet(msg); - return; - } - } - // do we have a routing session? - if(m_RoutingSession) - { - // should we switch paths? - if(ShouldUpdateRoutingPath ()) - { - LogPrint(eLogDebug, "DatagramSession: try getting new routing path"); - // try switching paths - auto path = GetNextRoutingPath(); - if(path) - UpdateRoutingPath (path); - else - ResetRoutingPath(); - } - auto routingPath = m_RoutingSession->GetSharedRoutingPath (); - // make sure we have a routing path - if (routingPath) - { - auto outboundTunnel = routingPath->outboundTunnel; - if (outboundTunnel) - { - if(outboundTunnel->IsEstablished()) - { - m_LastSuccess = i2p::util::GetMillisecondsSinceEpoch (); - // we have a routing path and routing session and the outbound tunnel we are using is good - // wrap message with routing session and send down routing path's outbound tunnel wrapped for the IBGW - auto m = m_RoutingSession->WrapSingleMessage(msg); - routingPath->outboundTunnel->SendTunnelDataMsg({i2p::tunnel::TunnelMessageBlock{ - i2p::tunnel::eDeliveryTypeTunnel, - routingPath->remoteLease->tunnelGateway, routingPath->remoteLease->tunnelID, - m - }}); - return; - } - } - } - } - auto now = i2p::util::GetMillisecondsSinceEpoch (); - // if this path looks dead reset the routing path since we didn't seem to be able to get a path in time - if (m_LastPathChange && now - m_LastPathChange >= DATAGRAM_SESSION_PATH_TIMEOUT ) ResetRoutingPath(); - UpdateLeaseSet(msg); - - } - - void DatagramSession::UpdateRoutingPath(const std::shared_ptr & path) - { - if(m_RoutingSession == nullptr && m_RemoteLeaseSet) - m_RoutingSession = m_LocalDestination->GetRoutingSession(m_RemoteLeaseSet, true); - if(!m_RoutingSession) return; - // set routing path and update time we last updated the routing path - m_RoutingSession->SetSharedRoutingPath (path); - m_LastPathChange = i2p::util::GetMillisecondsSinceEpoch (); + return DatagramSession::Info(nullptr, nullptr, m_LastUse); } - bool DatagramSession::ShouldUpdateRoutingPath() const + void DatagramSession::Ack() { - bool dead = m_RoutingSession == nullptr || m_RoutingSession->GetSharedRoutingPath () == nullptr; - auto now = i2p::util::GetMillisecondsSinceEpoch (); - // we need to rotate paths becuase the routing path is too old - // if (now - m_LastPathChange >= DATAGRAM_SESSION_PATH_SWITCH_INTERVAL) return true; - // too fast switching paths - if (now - m_LastPathChange < DATAGRAM_SESSION_PATH_MIN_LIFETIME ) return false; - // our path looks dead so we need to rotate paths - if (now - m_LastSuccess >= DATAGRAM_SESSION_PATH_TIMEOUT) return !dead; - // if we have a routing session and routing path we don't need to switch paths - return dead; + m_LastUse = i2p::util::GetMillisecondsSinceEpoch(); + auto path = GetSharedRoutingPath(); + if(path) + path->updateTime = i2p::util::GetSecondsSinceEpoch (); } - - bool DatagramSession::ShouldSwitchLease() const - { - std::shared_ptr routingPath = nullptr; - std::shared_ptr currentLease = nullptr; - if(m_RoutingSession) - routingPath = m_RoutingSession->GetSharedRoutingPath (); - if(routingPath) - currentLease = routingPath->remoteLease; - if(currentLease) // if we have a lease return true if it's about to expire otherwise return false - return currentLease->ExpiresWithin( DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW, DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE ); - // we have no current lease, we should switch - return currentLease == nullptr; - } - - std::shared_ptr DatagramSession::GetNextRoutingPath() + std::shared_ptr DatagramSession::GetSharedRoutingPath () { - std::shared_ptr outboundTunnel = nullptr; - std::shared_ptr routingPath = nullptr; - // get existing routing path if we have one - if(m_RoutingSession) - routingPath = m_RoutingSession->GetSharedRoutingPath(); - // do we have an existing outbound tunnel and routing path? - if(routingPath && routingPath->outboundTunnel) - { - // is the outbound tunnel we are using good? - if (routingPath->outboundTunnel->IsEstablished()) - { - // ya so let's stick with it - outboundTunnel = routingPath->outboundTunnel; + if(!m_RoutingSession) { + if(!m_RemoteLeaseSet) { + m_RemoteLeaseSet = m_LocalDestination->FindLeaseSet(m_RemoteIdent); } - else - outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(routingPath->outboundTunnel); // no so we'll switch outbound tunnels + if(!m_RemoteLeaseSet) { + // no remote lease set + m_LocalDestination->RequestDestination(m_RemoteIdent, std::bind(&DatagramSession::HandleLeaseSetUpdated, this, std::placeholders::_1)); + return nullptr; + } + m_RoutingSession = m_LocalDestination->GetRoutingSession(m_RemoteLeaseSet, true); } - // do we have an outbound tunnel that works already ? - if(!outboundTunnel) - outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(); // no, let's get a new outbound tunnel as we probably just started - - if(outboundTunnel) - { - std::shared_ptr lease = nullptr; - // should we switch leases ? - if (ShouldSwitchLease ()) - { - // yes, get next available lease - lease = GetNextLease(); + auto path = m_RoutingSession->GetSharedRoutingPath(); + if(path) { + if (m_CurrentOutboundTunnel && !m_CurrentOutboundTunnel->IsEstablished()) { + // bad outbound tunnel, switch outbound tunnel + m_CurrentOutboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(m_CurrentOutboundTunnel); + path->outboundTunnel = m_CurrentOutboundTunnel; } - else if (routingPath) - { - if(routingPath->remoteLease) - { - if(routingPath->remoteLease->ExpiresWithin(DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW, DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE)) - lease = GetNextLease(); - else - lease = routingPath->remoteLease; + if(m_CurrentRemoteLease && m_CurrentRemoteLease->ExpiresWithin(DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW)) { + // bad lease, switch to next one + if(m_RemoteLeaseSet) { + auto ls = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding([&](const i2p::data::Lease& l) -> bool { + return l.tunnelGateway == m_CurrentRemoteLease->tunnelGateway; + }); + auto sz = ls.size(); + if (sz) { + auto idx = rand() % sz; + m_CurrentRemoteLease = ls[idx]; + } + } else { + // no remote lease set? + LogPrint(eLogWarning, "DatagramSession: no cached remote lease set for ", m_RemoteIdent.ToBase32()); } + path->remoteLease = m_CurrentRemoteLease; } - else - lease = GetNextLease(); - if(lease) - { - // we have a valid lease to use and an outbound tunnel - // create new routing path - uint32_t now = i2p::util::GetSecondsSinceEpoch(); - routingPath = std::make_shared(i2p::garlic::GarlicRoutingPath{ - outboundTunnel, - lease, - 0, - now, - 0 - }); + } else { + // no current path, make one + path = std::make_shared(); + // switch outbound tunnel if bad + if(m_CurrentOutboundTunnel == nullptr || ! m_CurrentOutboundTunnel->IsEstablished()) { + m_CurrentOutboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(m_CurrentOutboundTunnel); + } + // switch lease if bad + if(m_CurrentRemoteLease == nullptr || m_CurrentRemoteLease->ExpiresWithin(DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW)) { + if(!m_RemoteLeaseSet) { + m_RemoteLeaseSet = m_LocalDestination->FindLeaseSet(m_RemoteIdent); + } + if(m_RemoteLeaseSet) { + // pick random next good lease + auto ls = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding([&] (const i2p::data::Lease & l) -> bool { + if(m_CurrentRemoteLease) + return l.tunnelGateway == m_CurrentRemoteLease->tunnelGateway; + return false; + }); + auto sz = ls.size(); + if(sz) { + auto idx = rand() % sz; + m_CurrentRemoteLease = ls[idx]; + } + } else { + // no remote lease set currently, bail + LogPrint(eLogWarning, "DatagramSession: no remote lease set found for ", m_RemoteIdent.ToBase32()); + return nullptr; + } } - else // we don't have a new routing path to give - routingPath = nullptr; + path->outboundTunnel = m_CurrentOutboundTunnel; + path->remoteLease = m_CurrentRemoteLease; + m_RoutingSession->SetSharedRoutingPath(path); } - return routingPath; + return path; + } - void DatagramSession::ResetRoutingPath() + void DatagramSession::HandleLeaseSetUpdated(std::shared_ptr ls) { - if(m_RoutingSession) - { - auto routingPath = m_RoutingSession->GetSharedRoutingPath(); - if(routingPath && routingPath->remoteLease) // we have a remote lease already specified and a routing path - { - // get outbound tunnel on this path - auto outboundTunnel = routingPath->outboundTunnel; - // is this outbound tunnel there and established - if (outboundTunnel && outboundTunnel->IsEstablished()) - m_InvalidIBGW.push_back(routingPath->remoteLease->tunnelGateway); // yes, let's mark remote lease as dead because the outbound tunnel seems fine - } - // reset the routing path - UpdateRoutingPath(nullptr); - } + if(!ls) return; + // only update lease set if found and newer than previous lease set + uint64_t oldExpire = 0; + if(m_RemoteLeaseSet) oldExpire = m_RemoteLeaseSet->GetExpirationTime(); + if(ls && ls->GetExpirationTime() > oldExpire) m_RemoteLeaseSet = ls; } - std::shared_ptr DatagramSession::GetNextLease() + void DatagramSession::HandleSend(std::shared_ptr msg) { - auto now = i2p::util::GetMillisecondsSinceEpoch (); - std::shared_ptr next = nullptr; - if(m_RemoteLeaseSet) + m_SendQueue.push_back(msg); + // flush queue right away if full + if(m_SendQueue.size() >= DATAGRAM_SEND_QUEUE_MAX_SIZE) FlushSendQueue(); + } + + void DatagramSession::FlushSendQueue () + { + + std::vector send; + auto routingPath = GetSharedRoutingPath(); + // if we don't have a routing path we will drop all queued messages + if(routingPath) { - std::vector exclude; - for(const auto & ident : m_InvalidIBGW) - exclude.push_back(ident); - // find get all leases that are not in our ban list and are not going to expire within our lease set handover window + fudge - auto leases = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding( [&exclude, now] (const i2p::data::Lease & l) -> bool { - if(exclude.size()) - { - auto end = std::end(exclude); - return std::find_if(exclude.begin(), end, [l, now] ( const i2p::data::IdentHash & ident) -> bool { - return ident == l.tunnelGateway; - }) != end; - } - else - return false; - }); - if(leases.size()) + for (const auto & msg : m_SendQueue) { - // pick random valid next lease - uint32_t idx = rand() % leases.size(); - next = leases[idx]; + auto m = m_RoutingSession->WrapSingleMessage(msg); + send.push_back(i2p::tunnel::TunnelMessageBlock{i2p::tunnel::eDeliveryTypeTunnel,routingPath->remoteLease->tunnelGateway, routingPath->remoteLease->tunnelID, m}); } - else - LogPrint(eLogWarning, "DatagramDestination: no leases to use"); + routingPath->outboundTunnel->SendTunnelDataMsg(send); } - return next; - } - - void DatagramSession::UpdateLeaseSet(std::shared_ptr msg) - { - LogPrint(eLogInfo, "DatagramSession: updating lease set"); - m_LocalDestination->RequestDestination(m_RemoteIdentity, std::bind(&DatagramSession::HandleGotLeaseSet, this, std::placeholders::_1, msg)); + m_SendQueue.clear(); + ScheduleFlushSendQueue(); } - void DatagramSession::HandleGotLeaseSet(std::shared_ptr remoteIdent, std::shared_ptr msg) + void DatagramSession::ScheduleFlushSendQueue() { - if(remoteIdent) - { - // update routing session - if(m_RoutingSession) - m_RoutingSession = nullptr; - m_RoutingSession = m_LocalDestination->GetRoutingSession(remoteIdent, true); - // clear invalid IBGW as we have a new lease set - m_InvalidIBGW.clear(); - m_RemoteLeaseSet = remoteIdent; - // update routing path - auto path = GetNextRoutingPath(); - if (path) - UpdateRoutingPath(path); - else - ResetRoutingPath(); - // send the message that was queued if it was provided - if(msg) - HandleSend(msg); - } + boost::posix_time::milliseconds dlt(100); + m_SendQueueTimer.expires_from_now(dlt); + m_SendQueueTimer.async_wait([&](const boost::system::error_code & ec) { if(ec) return; FlushSendQueue(); }); } } } diff --git a/Datagram.h b/Datagram.h index dc63cccb..8891f0cc 100644 --- a/Datagram.h +++ b/Datagram.h @@ -31,29 +31,33 @@ namespace datagram const uint64_t DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE = 1000; // milliseconds minimum time between path switches const uint64_t DATAGRAM_SESSION_PATH_MIN_LIFETIME = 5 * 1000; + // max 64 messages buffered in send queue for each datagram session + const size_t DATAGRAM_SEND_QUEUE_MAX_SIZE = 64; class DatagramSession { public: DatagramSession(i2p::client::ClientDestination * localDestination, - const i2p::data::IdentHash & remoteIdent); + const i2p::data::IdentHash & remoteIdent); + + + /** @brief ack the garlic routing path */ + void Ack(); /** send an i2np message to remote endpoint for this session */ void SendMsg(std::shared_ptr msg); /** get the last time in milliseconds for when we used this datagram session */ uint64_t LastActivity() const { return m_LastUse; } - /** get the last time in milliseconds when we successfully sent data */ - uint64_t LastSuccess() const { return m_LastSuccess; } + struct Info { std::shared_ptr IBGW; std::shared_ptr OBEP; const uint64_t activity; - const uint64_t success; - Info() : IBGW(nullptr), OBEP(nullptr), activity(0), success(0) {} - Info(const uint8_t * ibgw, const uint8_t * obep, const uint64_t a, const uint64_t s) : - activity(a), - success(s) { + + Info() : IBGW(nullptr), OBEP(nullptr), activity(0) {} + Info(const uint8_t * ibgw, const uint8_t * obep, const uint64_t a) : + activity(a) { if(ibgw) IBGW = std::make_shared(ibgw); else IBGW = nullptr; if(obep) OBEP = std::make_shared(obep); @@ -63,44 +67,28 @@ namespace datagram Info GetSessionInfo() const; - private: - /** update our routing path we are using, mark that we have changed paths */ - void UpdateRoutingPath(const std::shared_ptr & path); + void FlushSendQueue(); + void ScheduleFlushSendQueue(); - /** return true if we should switch routing paths because of path lifetime or timeout otherwise false */ - bool ShouldUpdateRoutingPath() const; + void HandleSend(std::shared_ptr msg); + + std::shared_ptr GetSharedRoutingPath(); + + void HandleLeaseSetUpdated(std::shared_ptr ls); - /** return true if we should switch the lease for out routing path otherwise return false */ - bool ShouldSwitchLease() const; - - /** get next usable routing path, try reusing outbound tunnels */ - std::shared_ptr GetNextRoutingPath(); - /** - * mark current routing path as invalid and clear it - * if the outbound tunnel we were using was okay don't use the IBGW in the routing path's lease next time - */ - void ResetRoutingPath(); - - /** get next usable lease, does not fetch or update if expired or have no lease set */ - std::shared_ptr GetNextLease(); - - void HandleSend(std::shared_ptr msg); - void HandleGotLeaseSet(std::shared_ptr remoteIdent, - std::shared_ptr msg); - void UpdateLeaseSet(std::shared_ptr msg=nullptr); - private: i2p::client::ClientDestination * m_LocalDestination; - i2p::data::IdentHash m_RemoteIdentity; - std::shared_ptr m_RoutingSession; - // Ident hash of IBGW that are invalid - std::vector m_InvalidIBGW; - std::shared_ptr m_RemoteLeaseSet; - uint64_t m_LastUse; - uint64_t m_LastPathChange; - uint64_t m_LastSuccess; + i2p::data::IdentHash m_RemoteIdent; + std::shared_ptr m_RemoteLeaseSet; + std::shared_ptr m_RoutingSession; + std::shared_ptr m_CurrentRemoteLease; + std::shared_ptr m_CurrentOutboundTunnel; + boost::asio::deadline_timer m_SendQueueTimer; + std::vector > m_SendQueue; + uint64_t m_LastUse; + }; const size_t MAX_DATAGRAM_SIZE = 32768; @@ -112,9 +100,9 @@ namespace datagram DatagramDestination (std::shared_ptr owner); - ~DatagramDestination (); + ~DatagramDestination (); - void SendDatagramTo (const uint8_t * payload, size_t len, const i2p::data::IdentHash& ident, uint16_t fromPort = 0, uint16_t toPort = 0); + void SendDatagramTo (const uint8_t * payload, size_t len, const i2p::data::IdentHash & ident, uint16_t fromPort = 0, uint16_t toPort = 0); void HandleDataMessagePayload (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); void SetReceiver (const Receiver& receiver) { m_Receiver = receiver; }; @@ -130,7 +118,7 @@ namespace datagram private: - std::shared_ptr ObtainSession(const i2p::data::IdentHash & ident); + std::shared_ptr ObtainSession(const i2p::data::IdentHash & ident); std::shared_ptr CreateDataMessage (const uint8_t * payload, size_t len, uint16_t fromPort, uint16_t toPort); diff --git a/Destination.cpp b/Destination.cpp index 61960464..491a9f29 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -544,11 +544,14 @@ namespace client else // duplicate { LogPrint (eLogInfo, "Destination: Request of LeaseSet ", dest.ToBase64 (), " is pending already"); - // TODO: implement it properly - //ret.first->second->requestComplete.push_back (requestComplete); if (ts > ret.first->second->requestTime + MAX_LEASESET_REQUEST_TIMEOUT) + { + // something went wrong m_LeaseSetRequests.erase (ret.first); - if (requestComplete) requestComplete (nullptr); + if (requestComplete) requestComplete (nullptr); + } + else if (requestComplete) + ret.first->second->requestComplete.push_back (requestComplete); } } else diff --git a/Event.cpp b/Event.cpp index e148538e..4bc6d594 100644 --- a/Event.cpp +++ b/Event.cpp @@ -17,15 +17,44 @@ namespace i2p void EventCore::QueueEvent(const EventType & ev) { - if(m_listener) - m_listener->HandleEvent(ev); + if(m_listener) m_listener->HandleEvent(ev); + } + + void EventCore::CollectEvent(const std::string & type, const std::string & ident, uint64_t val) + { + std::unique_lock lock(m_collect_mutex); + std::string key = type + "." + ident; + if (m_collected.find(key) == m_collected.end()) + { + m_collected[key] = {type, key, 0}; + } + m_collected[key].Val += val; + } + + void EventCore::PumpCollected(EventListener * listener) + { + std::unique_lock lock(m_collect_mutex); + if(listener) + { + for(const auto & ev : m_collected) { + listener->HandlePumpEvent({{"type", ev.second.Key}, {"ident", ev.second.Ident}}, ev.second.Val); + } + } + m_collected.clear(); } } } -void EmitEvent(const EventType & e) +void QueueIntEvent(const std::string & type, const std::string & ident, uint64_t val) { #ifdef WITH_EVENTS + i2p::event::core.CollectEvent(type, ident, val); +#endif +} + +void EmitEvent(const EventType & e) +{ +#if WITH_EVENTS i2p::event::core.QueueEvent(e); #endif } diff --git a/Event.h b/Event.h index 1ab37847..a9f97df2 100644 --- a/Event.h +++ b/Event.h @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include @@ -16,15 +18,27 @@ namespace i2p public: virtual ~EventListener() {}; virtual void HandleEvent(const EventType & ev) = 0; + /** @brief handle collected event when pumped */ + virtual void HandlePumpEvent(const EventType & ev, const uint64_t & val) = 0; }; class EventCore { public: void QueueEvent(const EventType & ev); + void CollectEvent(const std::string & type, const std::string & ident, uint64_t val); void SetListener(EventListener * l); - + void PumpCollected(EventListener * l); + private: + std::mutex m_collect_mutex; + struct CollectedEvent + { + std::string Key; + std::string Ident; + uint64_t Val; + }; + std::map m_collected; EventListener * m_listener = nullptr; }; #ifdef WITH_EVENTS @@ -32,6 +46,8 @@ namespace i2p #endif } } + +void QueueIntEvent(const std::string & type, const std::string & ident, uint64_t val); void EmitEvent(const EventType & ev); #endif diff --git a/Garlic.cpp b/Garlic.cpp index ab20ac8f..81941cfc 100644 --- a/Garlic.cpp +++ b/Garlic.cpp @@ -20,8 +20,7 @@ namespace garlic std::shared_ptr destination, int numTags, bool attachLeaseSet): m_Owner (owner), m_Destination (destination), m_NumTags (numTags), m_LeaseSetUpdateStatus (attachLeaseSet ? eLeaseSetUpdated : eLeaseSetDoNotSend), - m_LeaseSetUpdateMsgID (0), - m_ElGamalEncryption (new i2p::crypto::ElGamalEncryption (destination->GetEncryptionPublicKey ())) + m_LeaseSetUpdateMsgID (0) { // create new session tags and session key RAND_bytes (m_SessionKey, 32); @@ -29,7 +28,7 @@ namespace garlic } GarlicRoutingSession::GarlicRoutingSession (const uint8_t * sessionKey, const SessionTag& sessionTag): - m_Owner (nullptr), m_Destination (nullptr), m_NumTags (1), m_LeaseSetUpdateStatus (eLeaseSetDoNotSend), m_LeaseSetUpdateMsgID (0) + m_Owner (nullptr), m_NumTags (1), m_LeaseSetUpdateStatus (eLeaseSetDoNotSend), m_LeaseSetUpdateMsgID (0) { memcpy (m_SessionKey, sessionKey, 32); m_Encryption.SetKey (m_SessionKey); @@ -188,7 +187,8 @@ namespace garlic RAND_bytes (elGamal.preIV, 32); // Pre-IV uint8_t iv[32]; // IV is first 16 bytes SHA256(elGamal.preIV, 32, iv); - m_ElGamalEncryption->Encrypt ((uint8_t *)&elGamal, buf, true); + i2p::crypto::ElGamalEncryption elGamalEncryption (m_Destination->GetEncryptionPublicKey ()); + elGamalEncryption.Encrypt ((uint8_t *)&elGamal, buf, true); m_Encryption.SetIV (iv); buf += 514; len += 514; @@ -315,7 +315,7 @@ namespace garlic { uint64_t ts = i2p::util::GetMillisecondsSinceEpoch () + 8000; // 8 sec size_t size = 0; - if (isDestination && m_Destination) + if (isDestination) { buf[size] = eGarlicDeliveryTypeDestination << 5;// delivery instructions flag destination size++; diff --git a/Garlic.h b/Garlic.h index a7e2d264..b5dcd1eb 100644 --- a/Garlic.h +++ b/Garlic.h @@ -128,6 +128,7 @@ namespace garlic GarlicDestination * m_Owner; std::shared_ptr m_Destination; + i2p::crypto::AESKey m_SessionKey; std::list m_SessionTags; int m_NumTags; @@ -138,7 +139,6 @@ namespace garlic uint64_t m_LeaseSetSubmissionTime; // in milliseconds i2p::crypto::CBCEncryption m_Encryption; - std::unique_ptr m_ElGamalEncryption; std::shared_ptr m_SharedRoutingPath; diff --git a/HTTP.cpp b/HTTP.cpp index 08615e5b..201a4e3d 100644 --- a/HTTP.cpp +++ b/HTTP.cpp @@ -259,16 +259,21 @@ namespace http { return eoh + strlen(HTTP_EOH); } - std::string HTTPReq::to_string() { - std::stringstream ss; - ss << method << " " << uri << " " << version << CRLF; + void HTTPReq::write(std::ostream & o) { + o << method << " " << uri << " " << version << CRLF; for (auto & h : headers) { - ss << h.first << ": " << h.second << CRLF; + o << h.first << ": " << h.second << CRLF; } - ss << CRLF; - return ss.str(); + o << CRLF; } + std::string HTTPReq::to_string() + { + std::stringstream ss; + write(ss); + return ss.str(); + } + bool HTTPRes::is_chunked() { auto it = headers.find("Transfer-Encoding"); if (it == headers.end()) diff --git a/HTTP.h b/HTTP.h index 847cf347..581e4a34 100644 --- a/HTTP.h +++ b/HTTP.h @@ -82,6 +82,9 @@ namespace http { /** @brief Serialize HTTP request to string */ std::string to_string(); + + void write(std::ostream & o); + }; struct HTTPRes : HTTPMsg { @@ -116,6 +119,8 @@ namespace http { */ std::string to_string(); + void write(std::ostream & o); + /** @brief Checks that response declared as chunked data */ bool is_chunked(); diff --git a/HTTPProxy.cpp b/HTTPProxy.cpp index c40989d1..33a2a85c 100644 --- a/HTTPProxy.cpp +++ b/HTTPProxy.cpp @@ -64,15 +64,40 @@ namespace proxy { void HostNotFound(std::string & host); void SendProxyError(std::string & content); + void ForwardToUpstreamProxy(); + void HandleUpstreamHTTPProxyConnect(const boost::system::error_code & ec); + void HandleUpstreamSocksProxyConnect(const boost::system::error_code & ec); + + void HandleSocksProxySendHandshake(const boost::system::error_code & ec, std::size_t bytes_transfered); + void HandleSocksProxyReply(const boost::system::error_code & ec, std::size_t bytes_transfered); + + typedef std::function ProxyResolvedHandler; + + void HandleUpstreamProxyResolved(const boost::system::error_code & ecode, boost::asio::ip::tcp::resolver::iterator itr, ProxyResolvedHandler handler); + + void SocksProxySuccess(); + void HandoverToUpstreamProxy(); + uint8_t m_recv_chunk[8192]; std::string m_recv_buf; // from client std::string m_send_buf; // to upstream std::shared_ptr m_sock; - + std::shared_ptr m_proxysock; + boost::asio::ip::tcp::resolver m_proxy_resolver; + i2p::http::URL m_ProxyURL; + i2p::http::URL m_RequestURL; + uint8_t m_socks_buf[255+8]; // for socks request/response + ssize_t m_req_len; + i2p::http::URL m_ClientRequestURL; + i2p::http::HTTPReq m_ClientRequest; + i2p::http::HTTPRes m_ClientResponse; + std::stringstream m_ClientRequestBuffer; public: HTTPReqHandler(HTTPProxy * parent, std::shared_ptr sock) : - I2PServiceHandler(parent), m_sock(sock) {} + I2PServiceHandler(parent), m_sock(sock), + m_proxysock(std::make_shared(parent->GetService())), + m_proxy_resolver(parent->GetService()) {} ~HTTPReqHandler() { Terminate(); } void Handle () { AsyncSockRead(); } /* overload */ }; @@ -97,6 +122,13 @@ namespace proxy { m_sock->close(); m_sock = nullptr; } + if(m_proxysock) + { + LogPrint(eLogDebug, "HTTPProxy: close proxysock"); + if(m_proxysock->is_open()) + m_proxysock->close(); + m_proxysock = nullptr; + } Done(shared_from_this()); } @@ -142,7 +174,7 @@ namespace proxy { << "\r\n"; res.body = ss.str(); std::string response = res.to_string(); - boost::asio::async_write(*m_sock, boost::asio::buffer(response), + boost::asio::async_write(*m_sock, boost::asio::buffer(response), boost::asio::transfer_all(), std::bind(&HTTPReqHandler::SentHTTPFailed, shared_from_this(), std::placeholders::_1)); } @@ -198,54 +230,51 @@ namespace proxy { */ bool HTTPReqHandler::HandleRequest() { - i2p::http::HTTPReq req; - i2p::http::URL url; std::string b64; - int req_len = 0; - req_len = req.parse(m_recv_buf); + m_req_len = m_ClientRequest.parse(m_recv_buf); - if (req_len == 0) + if (m_req_len == 0) return false; /* need more data */ - if (req_len < 0) { + if (m_req_len < 0) { LogPrint(eLogError, "HTTPProxy: unable to parse request"); GenericProxyError("Invalid request", "Proxy unable to parse your request"); return true; /* parse error */ } /* parsing success, now let's look inside request */ - LogPrint(eLogDebug, "HTTPProxy: requested: ", req.uri); - url.parse(req.uri); + LogPrint(eLogDebug, "HTTPProxy: requested: ", m_ClientRequest.uri); + m_RequestURL.parse(m_ClientRequest.uri); - if (ExtractAddressHelper(url, b64)) { - i2p::client::context.GetAddressBook ().InsertAddress (url.host, b64); - LogPrint (eLogInfo, "HTTPProxy: added b64 from addresshelper for ", url.host); - std::string full_url = url.to_string(); + if (ExtractAddressHelper(m_RequestURL, b64)) { + i2p::client::context.GetAddressBook ().InsertAddress (m_RequestURL.host, b64); + LogPrint (eLogInfo, "HTTPProxy: added b64 from addresshelper for ", m_RequestURL.host); + std::string full_url = m_RequestURL.to_string(); std::stringstream ss; - ss << "Host " << url.host << " added to router's addressbook from helper. " + ss << "Host " << m_RequestURL.host << " added to router's addressbook from helper. " << "Click here to proceed."; GenericProxyInfo("Addresshelper found", ss.str().c_str()); return true; /* request processed */ } - SanitizeHTTPRequest(req); + SanitizeHTTPRequest(m_ClientRequest); - std::string dest_host = url.host; - uint16_t dest_port = url.port; + std::string dest_host = m_RequestURL.host; + uint16_t dest_port = m_RequestURL.port; /* always set port, even if missing in request */ if (!dest_port) { - dest_port = (url.schema == "https") ? 443 : 80; + dest_port = (m_RequestURL.schema == "https") ? 443 : 80; } /* detect dest_host, set proper 'Host' header in upstream request */ - auto h = req.headers.find("Host"); + auto h = m_ClientRequest.headers.find("Host"); if (dest_host != "") { /* absolute url, replace 'Host' header */ std::string h = dest_host; if (dest_port != 0 && dest_port != 80) h += ":" + std::to_string(dest_port); - req.add_header("Host", h, true); - } else if (h != req.headers.end()) { + m_ClientRequest.add_header("Host", h, true); + } else if (h != m_ClientRequest.headers.end()) { /* relative url and 'Host' header provided. transparent proxy mode? */ i2p::http::URL u; std::string t = "http://" + h->second; @@ -265,23 +294,31 @@ namespace proxy { HostNotFound(dest_host); return true; /* request processed */ } - /* TODO: outproxy handler here */ } else { - LogPrint (eLogWarning, "HTTPProxy: outproxy failure for ", dest_host, ": not implemented yet"); - std::string message = "Host" + dest_host + "not inside I2P network, but outproxy support not implemented yet"; - GenericProxyError("Outproxy failure", message.c_str()); + std::string outproxyUrl; i2p::config::GetOption("httpproxy.outproxy", outproxyUrl); + if(outproxyUrl.size()) { + LogPrint (eLogDebug, "HTTPProxy: use outproxy ", outproxyUrl); + if(m_ProxyURL.parse(outproxyUrl)) + ForwardToUpstreamProxy(); + else + GenericProxyError("Outproxy failure", "bad outproxy settings"); + } else { + LogPrint (eLogWarning, "HTTPProxy: outproxy failure for ", dest_host, ": no outprxy enabled"); + std::string message = "Host" + dest_host + "not inside I2P network, but outproxy is not enabled"; + GenericProxyError("Outproxy failure", message.c_str()); + } return true; } /* make relative url */ - url.schema = ""; - url.host = ""; - req.uri = url.to_string(); + m_RequestURL.schema = ""; + m_RequestURL.host = ""; + m_ClientRequest.uri = m_RequestURL.to_string(); /* drop original request from recv buffer */ - m_recv_buf.erase(0, req_len); + m_recv_buf.erase(0, m_req_len); /* build new buffer from modified request and data from original request */ - m_send_buf = req.to_string(); + m_send_buf = m_ClientRequest.to_string(); m_send_buf.append(m_recv_buf); /* connect to destination */ LogPrint(eLogDebug, "HTTPProxy: connecting to host ", dest_host, ":", dest_port); @@ -290,6 +327,144 @@ namespace proxy { return true; } + void HTTPReqHandler::ForwardToUpstreamProxy() + { + LogPrint(eLogDebug, "HTTPProxy: forward to upstream"); + // build http requset + + m_ClientRequestURL = m_RequestURL; + LogPrint(eLogDebug, "HTTPProxy: ", m_ClientRequestURL.host); + m_ClientRequestURL.schema = ""; + m_ClientRequestURL.host = ""; + m_ClientRequest.uri = m_ClientRequestURL.to_string(); + + m_ClientRequest.write(m_ClientRequestBuffer); + m_ClientRequestBuffer << m_recv_buf.substr(m_req_len); + + // assume http if empty schema + if (m_ProxyURL.schema == "" || m_ProxyURL.schema == "http") { + // handle upstream http proxy + if (!m_ProxyURL.port) m_ProxyURL.port = 80; + boost::asio::ip::tcp::resolver::query q(m_ProxyURL.host, std::to_string(m_ProxyURL.port)); + m_proxy_resolver.async_resolve(q, std::bind(&HTTPReqHandler::HandleUpstreamProxyResolved, this, std::placeholders::_1, std::placeholders::_2, [&](boost::asio::ip::tcp::endpoint ep) { + m_proxysock->async_connect(ep, std::bind(&HTTPReqHandler::HandleUpstreamHTTPProxyConnect, this, std::placeholders::_1)); + })); + } else if (m_ProxyURL.schema == "socks") { + // handle upstream socks proxy + if (!m_ProxyURL.port) m_ProxyURL.port = 9050; // default to tor default if not specified + boost::asio::ip::tcp::resolver::query q(m_ProxyURL.host, std::to_string(m_ProxyURL.port)); + m_proxy_resolver.async_resolve(q, std::bind(&HTTPReqHandler::HandleUpstreamProxyResolved, this, std::placeholders::_1, std::placeholders::_2, [&](boost::asio::ip::tcp::endpoint ep) { + m_proxysock->async_connect(ep, std::bind(&HTTPReqHandler::HandleUpstreamSocksProxyConnect, this, std::placeholders::_1)); + })); + } else { + // unknown type, complain + GenericProxyError("unknown outproxy url", m_ProxyURL.to_string().c_str()); + } + } + + void HTTPReqHandler::HandleUpstreamProxyResolved(const boost::system::error_code & ec, boost::asio::ip::tcp::resolver::iterator it, ProxyResolvedHandler handler) + { + if(ec) GenericProxyError("cannot resolve upstream proxy", ec.message().c_str()); + else handler(*it); + } + + void HTTPReqHandler::HandleUpstreamSocksProxyConnect(const boost::system::error_code & ec) + { + if(!ec) { + if(m_RequestURL.host.size() > 255) { + GenericProxyError("hostname too long", m_RequestURL.host.c_str()); + return; + } + uint16_t port = m_RequestURL.port; + if(!port) port = 80; + LogPrint(eLogDebug, "HTTPProxy: connected to socks upstream"); + + std::string host = m_RequestURL.host; + std::size_t reqsize = 0; + m_socks_buf[0] = '\x04'; + m_socks_buf[1] = 1; + htobe16buf(m_socks_buf+2, port); + m_socks_buf[4] = 0; + m_socks_buf[5] = 0; + m_socks_buf[6] = 0; + m_socks_buf[7] = 1; + // user id + m_socks_buf[8] = 'i'; + m_socks_buf[9] = '2'; + m_socks_buf[10] = 'p'; + m_socks_buf[11] = 'd'; + m_socks_buf[12] = 0; + reqsize += 13; + memcpy(m_socks_buf+ reqsize, host.c_str(), host.size()); + reqsize += host.size(); + m_socks_buf[++reqsize] = 0; + boost::asio::async_write(*m_proxysock, boost::asio::buffer(m_socks_buf, reqsize), boost::asio::transfer_all(), std::bind(&HTTPReqHandler::HandleSocksProxySendHandshake, this, std::placeholders::_1, std::placeholders::_2)); + } else GenericProxyError("cannot connect to upstream socks proxy", ec.message().c_str()); + } + + void HTTPReqHandler::HandleSocksProxySendHandshake(const boost::system::error_code & ec, std::size_t bytes_transferred) + { + LogPrint(eLogDebug, "HTTPProxy: upstream socks handshake sent"); + if(ec) GenericProxyError("Cannot negotiate with socks proxy", ec.message().c_str()); + else m_proxysock->async_read_some(boost::asio::buffer(m_socks_buf, 8), std::bind(&HTTPReqHandler::HandleSocksProxyReply, this, std::placeholders::_1, std::placeholders::_2)); + } + + void HTTPReqHandler::HandoverToUpstreamProxy() + { + LogPrint(eLogDebug, "HTTPProxy: handover to socks proxy"); + auto connection = std::make_shared(GetOwner(), m_proxysock, m_sock); + m_sock = nullptr; + m_proxysock = nullptr; + GetOwner()->AddHandler(connection); + connection->Start(); + Terminate(); + } + + void HTTPReqHandler::SocksProxySuccess() + { + if(m_ClientRequest.method == "CONNECT") { + m_ClientResponse.code = 200; + m_send_buf = m_ClientResponse.to_string(); + boost::asio::async_write(*m_sock, boost::asio::buffer(m_send_buf), boost::asio::transfer_all(), [&] (const boost::system::error_code & ec, std::size_t transferred) { + if(ec) GenericProxyError("socks proxy error", ec.message().c_str()); + else HandoverToUpstreamProxy(); + }); + } else { + m_send_buf = m_ClientRequestBuffer.str(); + LogPrint(eLogDebug, "HTTPProxy: send ", m_send_buf.size(), " bytes"); + boost::asio::async_write(*m_proxysock, boost::asio::buffer(m_send_buf), boost::asio::transfer_all(), [&](const boost::system::error_code & ec, std::size_t transferred) { + if(ec) GenericProxyError("failed to send request to upstream", ec.message().c_str()); + else HandoverToUpstreamProxy(); + }); + } + } + + void HTTPReqHandler::HandleSocksProxyReply(const boost::system::error_code & ec, std::size_t bytes_transferred) + { + if(!ec) + { + if(m_socks_buf[1] == 90) { + // success + SocksProxySuccess(); + } else { + std::stringstream ss; + ss << "error code: "; + ss << (int) m_socks_buf[1]; + std::string msg = ss.str(); + GenericProxyError("Socks Proxy error", msg.c_str()); + } + } + else GenericProxyError("No Reply From socks proxy", ec.message().c_str()); + } + + void HTTPReqHandler::HandleUpstreamHTTPProxyConnect(const boost::system::error_code & ec) + { + if(!ec) { + LogPrint(eLogDebug, "HTTPProxy: connected to http upstream"); + GenericProxyError("cannot connect", "http out proxy not implemented"); + } else GenericProxyError("cannot connect to upstream http proxy", ec.message().c_str()); + } + /* will be called after some data received from client */ void HTTPReqHandler::HandleSockRecv(const boost::system::error_code & ecode, std::size_t len) { diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 5deb7c60..84818e8e 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -451,26 +451,26 @@ namespace http { s << "
\r\n"; } - static void ShowCommands (std::stringstream& s) + static void ShowCommands (std::stringstream& s, uint32_t token) { /* commands */ s << "Router Commands
\r\n"; - s << " Run peer test
\r\n"; + s << " Run peer test
\r\n"; //s << " Reload config
\r\n"; if (i2p::context.AcceptsTunnels ()) - s << " Decline transit tunnels
\r\n"; + s << " Decline transit tunnels
\r\n"; else - s << " Accept transit tunnels
\r\n"; + s << " Accept transit tunnels
\r\n"; #if (!defined(WIN32) && !defined(QT_GUI_LIB) && !defined(ANDROID)) if (Daemon.gracefulShutdownInterval) - s << " Cancel graceful shutdown
"; + s << " Cancel graceful shutdown
"; else - s << " Start graceful shutdown
\r\n"; + s << " Start graceful shutdown
\r\n"; #endif #ifdef WIN32_APP - s << " Graceful shutdown
\r\n"; + s << " Graceful shutdown
\r\n"; #endif - s << " Force shutdown
\r\n"; + s << " Force shutdown
\r\n"; } static void ShowTransitTunnels (std::stringstream& s) @@ -756,6 +756,7 @@ namespace http { SendReply (res, content); } + std::map HTTPConnection::m_Tokens; void HTTPConnection::HandlePage (const HTTPReq& req, HTTPRes& res, std::stringstream& s) { std::map params; @@ -771,7 +772,20 @@ namespace http { else if (page == HTTP_PAGE_TUNNELS) ShowTunnels (s); else if (page == HTTP_PAGE_COMMANDS) - ShowCommands (s); + { + uint32_t token; + RAND_bytes ((uint8_t *)&token, 4); + auto ts = i2p::util::GetSecondsSinceEpoch (); + for (auto it = m_Tokens.begin (); it != m_Tokens.end (); ) + { + if (ts > it->second + TOKEN_EXPIRATION_TIMEOUT) + it = m_Tokens.erase (it); + else + ++it; + } + m_Tokens[token] = ts; + ShowCommands (s, token); + } else if (page == HTTP_PAGE_TRANSIT_TUNNELS) ShowTransitTunnels (s); else if (page == HTTP_PAGE_LOCAL_DESTINATIONS) @@ -798,13 +812,19 @@ namespace http { void HTTPConnection::HandleCommand (const HTTPReq& req, HTTPRes& res, std::stringstream& s) { std::map params; - std::string cmd(""); URL url; url.parse(req.uri); url.parse_query(params); - cmd = params["cmd"]; + std::string token = params["token"]; + if (token.empty () || m_Tokens.find (std::stoi (token)) == m_Tokens.end ()) + { + ShowError(s, "Invalid token"); + return; + } + + std::string cmd = params["cmd"]; if (cmd == HTTP_COMMAND_RUN_PEER_TEST) i2p::transport::transports.PeerTest (); else if (cmd == HTTP_COMMAND_RELOAD_CONFIG) diff --git a/HTTPServer.h b/HTTPServer.h index 4a32702d..ec56e08a 100644 --- a/HTTPServer.h +++ b/HTTPServer.h @@ -1,10 +1,20 @@ #ifndef HTTP_SERVER_H__ #define HTTP_SERVER_H__ -namespace i2p { -namespace http { - extern const char *itoopieFavicon; - const size_t HTTP_CONNECTION_BUFFER_SIZE = 8192; +#include +#include +#include +#include +#include +#include +#include "HTTP.h" + +namespace i2p +{ +namespace http +{ + const size_t HTTP_CONNECTION_BUFFER_SIZE = 8192; + const int TOKEN_EXPIRATION_TIMEOUT = 30; // in seconds class HTTPConnection: public std::enable_shared_from_this { @@ -35,6 +45,8 @@ namespace http { bool needAuth; std::string user; std::string pass; + + static std::map m_Tokens; // token->timestamp in seconds }; class HTTPServer diff --git a/I2PService.cpp b/I2PService.cpp index f5ebcb0c..efcf61aa 100644 --- a/I2PService.cpp +++ b/I2PService.cpp @@ -53,7 +53,6 @@ namespace client void TCPIPPipe::Terminate() { if(Kill()) return; - Done(shared_from_this()); if (m_up) { if (m_up->is_open()) { m_up->close(); @@ -66,6 +65,7 @@ namespace client } m_down = nullptr; } + Done(shared_from_this()); } void TCPIPPipe::AsyncReceiveUpstream() @@ -90,11 +90,11 @@ namespace client } } - void TCPIPPipe::UpstreamWrite(const uint8_t * buf, size_t len) + void TCPIPPipe::UpstreamWrite(size_t len) { if (m_up) { LogPrint(eLogDebug, "TCPIPPipe: upstream: ", (int) len, " bytes written"); - boost::asio::async_write(*m_up, boost::asio::buffer(buf, len), + boost::asio::async_write(*m_up, boost::asio::buffer(m_upstream_buf, len), boost::asio::transfer_all(), std::bind(&TCPIPPipe::HandleUpstreamWrite, shared_from_this(), @@ -105,11 +105,11 @@ namespace client } } - void TCPIPPipe::DownstreamWrite(const uint8_t * buf, size_t len) + void TCPIPPipe::DownstreamWrite(size_t len) { if (m_down) { LogPrint(eLogDebug, "TCPIPPipe: downstream: ", (int) len, " bytes written"); - boost::asio::async_write(*m_down, boost::asio::buffer(buf, len), + boost::asio::async_write(*m_down, boost::asio::buffer(m_downstream_buf, len), boost::asio::transfer_all(), std::bind(&TCPIPPipe::HandleDownstreamWrite, shared_from_this(), @@ -131,9 +131,8 @@ namespace client } else { if (bytes_transfered > 0 ) { memcpy(m_upstream_buf, m_downstream_to_up_buf, bytes_transfered); - UpstreamWrite(m_upstream_buf, bytes_transfered); } - AsyncReceiveDownstream(); + UpstreamWrite(bytes_transfered); } } @@ -142,6 +141,8 @@ namespace client LogPrint(eLogError, "TCPIPPipe: downstream write error:" , ecode.message()); if (ecode != boost::asio::error::operation_aborted) Terminate(); + } else { + AsyncReceiveUpstream(); } } @@ -150,6 +151,8 @@ namespace client LogPrint(eLogError, "TCPIPPipe: upstream write error:" , ecode.message()); if (ecode != boost::asio::error::operation_aborted) Terminate(); + } else { + AsyncReceiveDownstream(); } } @@ -162,10 +165,9 @@ namespace client Terminate(); } else { if (bytes_transfered > 0 ) { - memcpy(m_upstream_buf, m_upstream_to_down_buf, bytes_transfered); - DownstreamWrite(m_upstream_buf, bytes_transfered); + memcpy(m_downstream_buf, m_upstream_to_down_buf, bytes_transfered); } - AsyncReceiveUpstream(); + DownstreamWrite(bytes_transfered); } } diff --git a/I2PService.h b/I2PService.h index 59746a6f..795b075e 100644 --- a/I2PService.h +++ b/I2PService.h @@ -77,7 +77,7 @@ namespace client std::atomic m_Dead; //To avoid cleaning up multiple times }; - const size_t TCP_IP_PIPE_BUFFER_SIZE = 8192; + const size_t TCP_IP_PIPE_BUFFER_SIZE = 8192 * 8; // bidirectional pipe for 2 tcp/ip sockets class TCPIPPipe: public I2PServiceHandler, public std::enable_shared_from_this { @@ -93,8 +93,8 @@ namespace client void HandleDownstreamReceived(const boost::system::error_code & ecode, std::size_t bytes_transferred); void HandleUpstreamWrite(const boost::system::error_code & ecode); void HandleDownstreamWrite(const boost::system::error_code & ecode); - void UpstreamWrite(const uint8_t * buf, size_t len); - void DownstreamWrite(const uint8_t * buf, size_t len); + void UpstreamWrite(size_t len); + void DownstreamWrite(size_t len); private: uint8_t m_upstream_to_down_buf[TCP_IP_PIPE_BUFFER_SIZE], m_downstream_to_up_buf[TCP_IP_PIPE_BUFFER_SIZE]; uint8_t m_upstream_buf[TCP_IP_PIPE_BUFFER_SIZE], m_downstream_buf[TCP_IP_PIPE_BUFFER_SIZE]; diff --git a/I2PTunnel.cpp b/I2PTunnel.cpp index 93bcff0c..b6c2d705 100644 --- a/I2PTunnel.cpp +++ b/I2PTunnel.cpp @@ -92,7 +92,9 @@ namespace client m_Stream->Close (); m_Stream.reset (); } + m_Socket->shutdown(boost::asio::ip::tcp::socket::shutdown_send); // avoid RST m_Socket->close (); + Done(shared_from_this ()); } @@ -107,9 +109,11 @@ namespace client { if (ecode) { - LogPrint (eLogError, "I2PTunnel: read error: ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) + { + LogPrint (eLogError, "I2PTunnel: read error: ", ecode.message ()); Terminate (); + } } else { @@ -540,15 +544,32 @@ namespace client void I2PUDPServerTunnel::ExpireStale(const uint64_t delta) { std::lock_guard lock(m_SessionsMutex); uint64_t now = i2p::util::GetMillisecondsSinceEpoch(); - std::remove_if(m_Sessions.begin(), m_Sessions.end(), [now, delta](const UDPSession * u) -> bool { - return now - u->LastActivity >= delta; - }); + auto itr = m_Sessions.begin(); + while(itr != m_Sessions.end()) { + if(now - (*itr)->LastActivity >= delta ) + itr = m_Sessions.erase(itr); + else + ++itr; + } } - - UDPSession * I2PUDPServerTunnel::ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort) + + void I2PUDPClientTunnel::ExpireStale(const uint64_t delta) { + std::lock_guard lock(m_SessionsMutex); + uint64_t now = i2p::util::GetMillisecondsSinceEpoch(); + std::vector removePorts; + for (const auto & s : m_Sessions) { + if (now - s.second.second >= delta) + removePorts.push_back(s.first); + } + for(auto port : removePorts) { + m_Sessions.erase(port); + } + } + + std::shared_ptr I2PUDPServerTunnel::ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort) { auto ih = from.GetIdentHash(); - for ( UDPSession * s : m_Sessions ) + for (auto & s : m_Sessions ) { if ( s->Identity == ih) { @@ -559,8 +580,9 @@ namespace client } /** create new udp session */ boost::asio::ip::udp::endpoint ep(m_LocalAddress, 0); - m_Sessions.push_back(new UDPSession(ep, m_LocalDest, m_RemoteEndpoint, &ih, localPort, remotePort)); - return m_Sessions.back(); + m_Sessions.push_back(std::make_shared(ep, m_LocalDest, m_RemoteEndpoint, &ih, localPort, remotePort)); + auto & back = m_Sessions.back(); + return back; } UDPSession::UDPSession(boost::asio::ip::udp::endpoint localEndpoint, @@ -568,7 +590,6 @@ namespace client boost::asio::ip::udp::endpoint endpoint, const i2p::data::IdentHash * to, uint16_t ourPort, uint16_t theirPort) : m_Destination(localDestination->GetDatagramDestination()), - m_Service(localDestination->GetService()), IPSocket(localDestination->GetService(), localEndpoint), SendEndpoint(endpoint), LastActivity(i2p::util::GetMillisecondsSinceEpoch()), @@ -592,7 +613,7 @@ namespace client { LogPrint(eLogDebug, "UDPSession: forward ", len, "B from ", FromEndpoint); LastActivity = i2p::util::GetMillisecondsSinceEpoch(); - m_Destination->SendDatagramTo(m_Buffer, len, Identity, 0, 0); + m_Destination->SendDatagramTo(m_Buffer, len, Identity, LocalPort, RemotePort); Receive(); } else { LogPrint(eLogError, "UDPSession: ", ecode.message()); @@ -602,9 +623,8 @@ namespace client I2PUDPServerTunnel::I2PUDPServerTunnel(const std::string & name, std::shared_ptr localDestination, - const boost::asio::ip::address& localAddress, boost::asio::ip::udp::endpoint forwardTo, uint16_t port) : + boost::asio::ip::address localAddress, boost::asio::ip::udp::endpoint forwardTo, uint16_t port) : m_Name(name), - LocalPort(port), m_LocalAddress(localAddress), m_RemoteEndpoint(forwardTo) { @@ -630,7 +650,7 @@ namespace client { std::vector > sessions; std::lock_guard lock(m_SessionsMutex); - for ( UDPSession * s : m_Sessions ) + for (auto & s : m_Sessions ) { if (!s->m_Destination) continue; auto info = s->m_Destination->GetInfoForRemote(s->Identity); @@ -652,13 +672,12 @@ namespace client std::shared_ptr localDestination, uint16_t remotePort) : m_Name(name), - m_Session(nullptr), m_RemoteDest(remoteDest), m_LocalDest(localDestination), m_LocalEndpoint(localEndpoint), m_RemoteIdent(nullptr), m_ResolveThread(nullptr), - LocalPort(localEndpoint.port()), + m_LocalSocket(localDestination->GetService(), localEndpoint), RemotePort(remotePort), m_cancel_resolve(false) { @@ -675,29 +694,44 @@ namespace client m_LocalDest->Start(); if (m_ResolveThread == nullptr) m_ResolveThread = new std::thread(std::bind(&I2PUDPClientTunnel::TryResolving, this)); + RecvFromLocal(); } + void I2PUDPClientTunnel::RecvFromLocal() + { + m_LocalSocket.async_receive_from(boost::asio::buffer(m_RecvBuff, I2P_UDP_MAX_MTU), + m_RecvEndpoint, std::bind(&I2PUDPClientTunnel::HandleRecvFromLocal, this, std::placeholders::_1, std::placeholders::_2)); + } + + void I2PUDPClientTunnel::HandleRecvFromLocal(const boost::system::error_code & ec, std::size_t transferred) + { + if(ec) { + LogPrint(eLogError, "UDP Client: ", ec.message()); + return; + } + if(!m_RemoteIdent) { + LogPrint(eLogWarning, "UDP Client: remote endpoint not resolved yet"); + RecvFromLocal(); + return; // drop, remote not resolved + } + auto remotePort = m_RecvEndpoint.port(); + auto itr = m_Sessions.find(remotePort); + if (itr == m_Sessions.end()) { + // track new udp convo + m_Sessions[remotePort] = {boost::asio::ip::udp::endpoint(m_RecvEndpoint), 0}; + } + // send off to remote i2p destination + LogPrint(eLogDebug, "UDP Client: send ", transferred, " to ", m_RemoteIdent->ToBase32(), ":", RemotePort); + m_LocalDest->GetDatagramDestination()->SendDatagramTo(m_RecvBuff, transferred, *m_RemoteIdent, remotePort, RemotePort); + // mark convo as active + m_Sessions[remotePort].second = i2p::util::GetMillisecondsSinceEpoch(); + RecvFromLocal(); + } + std::vector > I2PUDPClientTunnel::GetSessions() { + // TODO: implement std::vector > infos; - if(m_Session && m_LocalDest) - { - auto s = m_Session; - if (s->m_Destination) - { - auto info = m_Session->m_Destination->GetInfoForRemote(s->Identity); - if(info) - { - auto sinfo = std::make_shared(); - sinfo->Name = m_Name; - sinfo->LocalIdent = std::make_shared(m_LocalDest->GetIdentHash().data()); - sinfo->RemoteIdent = std::make_shared(s->Identity.data()); - sinfo->CurrentIBGW = info->IBGW; - sinfo->CurrentOBEP = info->OBEP; - infos.push_back(sinfo); - } - } - } return infos; } @@ -717,26 +751,28 @@ namespace client return; } LogPrint(eLogInfo, "UDP Tunnel: resolved ", m_RemoteDest, " to ", m_RemoteIdent->ToBase32()); - // delete existing session - if(m_Session) delete m_Session; - - boost::asio::ip::udp::endpoint ep(boost::asio::ip::address::from_string("127.0.0.1"), 0); - m_Session = new UDPSession(m_LocalEndpoint, m_LocalDest, ep, m_RemoteIdent, LocalPort, RemotePort); } void I2PUDPClientTunnel::HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) { if(m_RemoteIdent && from.GetIdentHash() == *m_RemoteIdent) { - // address match - if(m_Session) + auto itr = m_Sessions.find(toPort); + // found convo ? + if(itr != m_Sessions.end()) { - // tell session - LogPrint(eLogDebug, "UDP Client: got ", len, "B from ", from.GetIdentHash().ToBase32()); - m_Session->IPSocket.send_to(boost::asio::buffer(buf, len), m_Session->FromEndpoint); + // found convo + if (len > 0) { + LogPrint(eLogDebug, "UDP Client: got ", len, "B from ", from.GetIdentHash().ToBase32()); + uint8_t sendbuf[len]; + memcpy(sendbuf, buf, len); + m_LocalSocket.send_to(boost::asio::buffer(buf, len), itr->second.first); + // mark convo as active + itr->second.second = i2p::util::GetMillisecondsSinceEpoch(); + } } else - LogPrint(eLogWarning, "UDP Client: no session"); + LogPrint(eLogWarning, "UDP Client: not tracking udp session using port ", (int) toPort); } else LogPrint(eLogWarning, "UDP Client: unwarrented traffic from ", from.GetIdentHash().ToBase32()); @@ -747,7 +783,11 @@ namespace client auto dgram = m_LocalDest->GetDatagramDestination(); if (dgram) dgram->ResetReceiver(); - if (m_Session) delete m_Session; + m_Sessions.clear(); + + if(m_LocalSocket.is_open()) + m_LocalSocket.close(); + m_cancel_resolve = true; if(m_ResolveThread) diff --git a/I2PTunnel.h b/I2PTunnel.h index e6f0e84f..b49efe40 100644 --- a/I2PTunnel.h +++ b/I2PTunnel.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -141,7 +142,6 @@ namespace client struct UDPSession { i2p::datagram::DatagramDestination * m_Destination; - boost::asio::io_service & m_Service; boost::asio::ip::udp::socket IPSocket; i2p::data::IdentHash Identity; boost::asio::ip::udp::endpoint FromEndpoint; @@ -189,7 +189,7 @@ namespace client public: I2PUDPServerTunnel(const std::string & name, std::shared_ptr localDestination, - const boost::asio::ip::address & localAddress, + boost::asio::ip::address localAddress, boost::asio::ip::udp::endpoint forwardTo, uint16_t port); ~I2PUDPServerTunnel(); /** expire stale udp conversations */ @@ -202,15 +202,14 @@ namespace client private: void HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); - UDPSession * ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort); + std::shared_ptr ObtainUDPSession(const i2p::data::IdentityEx& from, uint16_t localPort, uint16_t remotePort); private: const std::string m_Name; - const uint16_t LocalPort; boost::asio::ip::address m_LocalAddress; boost::asio::ip::udp::endpoint m_RemoteEndpoint; std::mutex m_SessionsMutex; - std::vector m_Sessions; + std::vector > m_Sessions; std::shared_ptr m_LocalDest; }; @@ -228,18 +227,25 @@ namespace client bool IsLocalDestination(const i2p::data::IdentHash & destination) const { return destination == m_LocalDest->GetIdentHash(); } std::shared_ptr GetLocalDestination () const { return m_LocalDest; } + void ExpireStale(const uint64_t delta=I2P_UDP_SESSION_TIMEOUT); private: + typedef std::pair UDPConvo; + void RecvFromLocal(); + void HandleRecvFromLocal(const boost::system::error_code & e, std::size_t transferred); void HandleRecvFromI2P(const i2p::data::IdentityEx& from, uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); void TryResolving(); const std::string m_Name; - UDPSession * m_Session; + std::mutex m_SessionsMutex; + std::map m_Sessions; // maps i2p port -> local udp convo const std::string m_RemoteDest; std::shared_ptr m_LocalDest; const boost::asio::ip::udp::endpoint m_LocalEndpoint; i2p::data::IdentHash * m_RemoteIdent; std::thread * m_ResolveThread; - uint16_t LocalPort; + boost::asio::ip::udp::socket m_LocalSocket; + boost::asio::ip::udp::endpoint m_RecvEndpoint; + uint8_t m_RecvBuff[I2P_UDP_MAX_MTU]; uint16_t RemotePort; bool m_cancel_resolve; }; diff --git a/Identity.h b/Identity.h index 49dada48..7a9d049a 100644 --- a/Identity.h +++ b/Identity.h @@ -133,7 +133,7 @@ namespace data size_t FromBase64(const std::string& s); std::string ToBase64 () const; - static PrivateKeys CreateRandomKeys (SigningKeyType type = SIGNING_KEY_TYPE_DSA_SHA1); + static PrivateKeys CreateRandomKeys (SigningKeyType type = SIGNING_KEY_TYPE_EDDSA_SHA512_ED25519); private: diff --git a/LeaseSet.cpp b/LeaseSet.cpp index 04dc77c5..bddb517e 100644 --- a/LeaseSet.cpp +++ b/LeaseSet.cpp @@ -91,7 +91,7 @@ namespace data if (m_StoreLeases) { auto ret = m_Leases.insert (std::make_shared(lease)); - if (!ret.second) *(*ret.first) = lease; // update existing + if (!ret.second) (*ret.first)->endDate = lease.endDate; // update existing (*ret.first)->isUpdated = true; // check if lease's gateway is in our netDb if (!netdb.FindRouter (lease.tunnelGateway)) diff --git a/Makefile.mingw b/Makefile.mingw index e2dae747..40cf14b3 100644 --- a/Makefile.mingw +++ b/Makefile.mingw @@ -39,9 +39,13 @@ endif # don't change following line to ifeq ($(USE_AESNI),yes) !!! ifeq ($(USE_AESNI),1) - CPU_FLAGS = -maes -DAESNI + CPU_FLAGS += -maes -DAESNI else - CPU_FLAGS = -msse + CPU_FLAGS += -msse +endif + +ifeq ($(USE_AVX),1) + CPU_FLAGS += -mavx endif ifeq ($(USE_ASLR),yes) diff --git a/Makefile.osx b/Makefile.osx index f40ce1af..c8a7de2a 100644 --- a/Makefile.osx +++ b/Makefile.osx @@ -3,21 +3,26 @@ CXXFLAGS = -g -Wall -std=c++11 -DMAC_OSX #CXXFLAGS = -g -O2 -Wall -std=c++11 INCFLAGS = -I/usr/local/include -I/usr/local/ssl/include LDFLAGS = -Wl,-rpath,/usr/local/lib -L/usr/local/lib -L/usr/local/ssl/lib + +ifeq ($(USE_STATIC),yes) +LDLIBS = -lz -lcrypto -lssl /usr/local/lib/libboost_system.a /usr/local/lib/libboost_date_time.a /usr/local/lib/libboost_filesystem.a /usr/local/lib/libboost_program_options.a -lpthread +else LDLIBS = -lz -lcrypto -lssl -lboost_system -lboost_date_time -lboost_filesystem -lboost_program_options -lpthread +endif ifeq ($(USE_UPNP),yes) LDFLAGS += -ldl CXXFLAGS += -DUSE_UPNP endif -# OSX Notes -# http://www.hutsby.net/2011/08/macs-with-aes-ni.html -# Seems like all recent Mac's have AES-NI, after firmware upgrade 2.2 -# Found no good way to detect it from command line. TODO: Might be some osx sysinfo magic ifeq ($(USE_AESNI),yes) CXXFLAGS += -maes -DAESNI endif +ifeq ($(USE_AVX),yes) + CXXFLAGS += -mavx +endif + # Disabled, since it will be the default make rule. I think its better # to define the default rule in Makefile and not Makefile. - torkel #install: all diff --git a/NTCPSession.cpp b/NTCPSession.cpp index 81cfe687..1d3cd95b 100644 --- a/NTCPSession.cpp +++ b/NTCPSession.cpp @@ -610,7 +610,7 @@ namespace transport if (!m_NextMessage->IsExpired ()) { #ifdef WITH_EVENTS - EmitEvent({{"type", "transport.recvmsg"} , {"ident", GetIdentHashBase64()}, {"number", "1"}}); + QueueIntEvent("transport.recvmsg", GetIdentHashBase64(), 1); #endif m_Handler.PutNextMessage (m_NextMessage); } diff --git a/NetDb.cpp b/NetDb.cpp index a1f0fcb7..88d5483f 100644 --- a/NetDb.cpp +++ b/NetDb.cpp @@ -912,7 +912,6 @@ namespace data uint8_t randomHash[32]; std::vector msgs; - std::set floodfills; LogPrint (eLogInfo, "NetDb: exploring new ", numDestinations, " routers ..."); for (int i = 0; i < numDestinations; i++) { @@ -924,9 +923,8 @@ namespace data return; } auto floodfill = GetClosestFloodfill (randomHash, dest->GetExcludedPeers ()); - if (floodfill && !floodfills.count (floodfill.get ())) // request floodfill only once + if (floodfill) { - floodfills.insert (floodfill.get ()); if (i2p::transport::transports.IsConnected (floodfill->GetIdentHash ())) throughTunnels = false; if (throughTunnels) diff --git a/RouterInfo.cpp b/RouterInfo.cpp index b570d6c2..4ae2f9e5 100644 --- a/RouterInfo.cpp +++ b/RouterInfo.cpp @@ -167,19 +167,19 @@ namespace data { uint8_t supportedTransports = 0; bool isValidAddress = true; - Address address; - s.read ((char *)&address.cost, sizeof (address.cost)); - s.read ((char *)&address.date, sizeof (address.date)); + auto address = std::make_shared
(); + s.read ((char *)&address->cost, sizeof (address->cost)); + s.read ((char *)&address->date, sizeof (address->date)); char transportStyle[5]; ReadString (transportStyle, 5, s); if (!strcmp (transportStyle, "NTCP")) - address.transportStyle = eTransportNTCP; + address->transportStyle = eTransportNTCP; else if (!strcmp (transportStyle, "SSU")) - address.transportStyle = eTransportSSU; + address->transportStyle = eTransportSSU; else - address.transportStyle = eTransportUnknown; - address.port = 0; - address.mtu = 0; + address->transportStyle = eTransportUnknown; + address->port = 0; + address->mtu = 0; uint16_t size, r = 0; s.read ((char *)&size, sizeof (size)); if (!s) return; size = be16toh (size); @@ -194,35 +194,35 @@ namespace data if (!strcmp (key, "host")) { boost::system::error_code ecode; - address.host = boost::asio::ip::address::from_string (value, ecode); + address->host = boost::asio::ip::address::from_string (value, ecode); if (ecode) { - if (address.transportStyle == eTransportNTCP) + if (address->transportStyle == eTransportNTCP) { supportedTransports |= eNTCPV4; // TODO: - address.addressString = value; + address->addressString = value; } else { supportedTransports |= eSSUV4; // TODO: - address.addressString = value; + address->addressString = value; } } else { // add supported protocol - if (address.host.is_v4 ()) - supportedTransports |= (address.transportStyle == eTransportNTCP) ? eNTCPV4 : eSSUV4; + if (address->host.is_v4 ()) + supportedTransports |= (address->transportStyle == eTransportNTCP) ? eNTCPV4 : eSSUV4; else - supportedTransports |= (address.transportStyle == eTransportNTCP) ? eNTCPV6 : eSSUV6; + supportedTransports |= (address->transportStyle == eTransportNTCP) ? eNTCPV6 : eSSUV6; } } else if (!strcmp (key, "port")) - address.port = boost::lexical_cast(value); + address->port = boost::lexical_cast(value); else if (!strcmp (key, "mtu")) - address.mtu = boost::lexical_cast(value); + address->mtu = boost::lexical_cast(value); else if (!strcmp (key, "key")) - Base64ToByteStream (value, strlen (value), address.key, 32); + Base64ToByteStream (value, strlen (value), address->key, 32); else if (!strcmp (key, "caps")) ExtractCaps (value); else if (key[0] == 'i') @@ -237,9 +237,9 @@ namespace data LogPrint (eLogError, "RouterInfo: Unexpected introducer's index ", index, " skipped"); if (s) continue; else return; } - if (index >= address.introducers.size ()) - address.introducers.resize (index + 1); - Introducer& introducer = address.introducers.at (index); + if (index >= address->introducers.size ()) + address->introducers.resize (index + 1); + Introducer& introducer = address->introducers.at (index); if (!strcmp (key, "ihost")) { boost::system::error_code ecode; @@ -256,7 +256,7 @@ namespace data } if (isValidAddress) { - addresses->push_back(std::make_shared
(address)); + addresses->push_back(address); m_SupportedTransports |= supportedTransports; } } diff --git a/SSUData.cpp b/SSUData.cpp index ad38cf25..48e3e3f1 100644 --- a/SSUData.cpp +++ b/SSUData.cpp @@ -240,7 +240,7 @@ namespace transport if (!msg->IsExpired ()) { #ifdef WITH_EVENTS - EmitEvent({{"type", "transport.recvmsg"} , {"ident", m_Session.GetIdentHashBase64()}, {"number", "1"}}); + QueueIntEvent("transport.recvmsg", m_Session.GetIdentHashBase64(), 1); #endif m_Handler.PutNextMessage (msg); } diff --git a/SSUSession.cpp b/SSUSession.cpp index 696d367f..0e277432 100644 --- a/SSUSession.cpp +++ b/SSUSession.cpp @@ -1101,7 +1101,7 @@ namespace transport { // we are Alice LogPrint (eLogDebug, "SSU: sending peer test"); - auto address = i2p::context.GetRouterInfo ().GetSSUAddress (false); + auto address = i2p::context.GetRouterInfo ().GetSSUAddress (i2p::context.SupportsV4 ()); if (!address) { LogPrint (eLogInfo, "SSU is not supported. Can't send peer test"); diff --git a/Transports.cpp b/Transports.cpp index 1b9d52a1..efcd6742 100644 --- a/Transports.cpp +++ b/Transports.cpp @@ -249,7 +249,7 @@ namespace transport void Transports::SendMessages (const i2p::data::IdentHash& ident, const std::vector >& msgs) { #ifdef WITH_EVENTS - EmitEvent({{"type" , "transport.sendmsg"}, {"ident", ident.ToBase64()}, {"number", std::to_string(msgs.size())}}); + QueueIntEvent("transport.send", ident.ToBase64(), msgs.size()); #endif m_Service.post (std::bind (&Transports::PostMessages, this, ident, msgs)); } diff --git a/Tunnel.cpp b/Tunnel.cpp index e1f5c035..e271052e 100644 --- a/Tunnel.cpp +++ b/Tunnel.cpp @@ -612,6 +612,7 @@ namespace tunnel for (auto it = pendingTunnels.begin (); it != pendingTunnels.end ();) { auto tunnel = it->second; + auto pool = tunnel->GetTunnelPool(); switch (tunnel->GetState ()) { case eTunnelStatePending: @@ -637,6 +638,8 @@ namespace tunnel #ifdef WITH_EVENTS EmitTunnelEvent("tunnel.state", tunnel.get(), eTunnelStateBuildFailed); #endif + // for i2lua + if(pool) pool->OnTunnelBuildResult(tunnel, eBuildResultTimeout); // delete it = pendingTunnels.erase (it); m_NumFailedTunnelCreations++; @@ -649,6 +652,9 @@ namespace tunnel #ifdef WITH_EVENTS EmitTunnelEvent("tunnel.state", tunnel.get(), eTunnelStateBuildFailed); #endif + // for i2lua + if(pool) pool->OnTunnelBuildResult(tunnel, eBuildResultRejected); + it = pendingTunnels.erase (it); m_NumFailedTunnelCreations++; break; diff --git a/TunnelPool.cpp b/TunnelPool.cpp index 07057918..dfdfbf58 100644 --- a/TunnelPool.cpp +++ b/TunnelPool.cpp @@ -81,6 +81,8 @@ namespace tunnel } if (m_LocalDestination) m_LocalDestination->SetLeaseSetUpdated (); + + OnTunnelBuildResult(createdTunnel, eBuildResultOkay); } void TunnelPool::TunnelExpired (std::shared_ptr expiredTunnel) @@ -109,6 +111,8 @@ namespace tunnel std::unique_lock l(m_OutboundTunnelsMutex); m_OutboundTunnels.insert (createdTunnel); } + OnTunnelBuildResult(createdTunnel, eBuildResultOkay); + //CreatePairedInboundTunnel (createdTunnel); } @@ -579,5 +583,11 @@ namespace tunnel } return tun; } + + void TunnelPool::OnTunnelBuildResult(std::shared_ptr tunnel, TunnelBuildResult result) + { + auto peers = tunnel->GetPeers(); + if(m_CustomPeerSelector) m_CustomPeerSelector->OnBuildResult(peers, tunnel->IsInbound(), result); + } } } diff --git a/TunnelPool.h b/TunnelPool.h index 6a73bd67..9e2a3e24 100644 --- a/TunnelPool.h +++ b/TunnelPool.h @@ -23,12 +23,21 @@ namespace tunnel class InboundTunnel; class OutboundTunnel; + + enum TunnelBuildResult { + eBuildResultOkay, // tunnel was built okay + eBuildResultRejected, // tunnel build was explicitly rejected + eBuildResultTimeout // tunnel build timed out + }; + /** interface for custom tunnel peer selection algorithm */ struct ITunnelPeerSelector { typedef std::shared_ptr Peer; typedef std::vector TunnelPath; + virtual bool SelectPeers(TunnelPath & peers, int hops, bool isInbound) = 0; + virtual bool OnBuildResult(TunnelPath & peers, bool isInbound, TunnelBuildResult result) = 0; }; typedef std::shared_ptr TunnelPeerSelector; @@ -79,6 +88,8 @@ namespace tunnel /** @brief get the lowest latency tunnel in this tunnel pool regardless of latency requirements */ std::shared_ptr GetLowestLatencyInboundTunnel(std::shared_ptr exclude=nullptr) const; std::shared_ptr GetLowestLatencyOutboundTunnel(std::shared_ptr exclude=nullptr) const; + + void OnTunnelBuildResult(std::shared_ptr tunnel, TunnelBuildResult result); private: diff --git a/Websocket.cpp b/Websocket.cpp index 0de44efe..a1a58c5f 100644 --- a/Websocket.cpp +++ b/Websocket.cpp @@ -2,6 +2,7 @@ #include "Log.h" #include +#include #include #include @@ -27,7 +28,11 @@ namespace i2p typedef ServerImpl::message_ptr MessagePtr; public: - WebsocketServerImpl(const std::string & addr, int port) : m_run(false), m_thread(nullptr) + WebsocketServerImpl(const std::string & addr, int port) : + m_run(false), + m_ws_thread(nullptr), + m_ev_thread(nullptr), + m_WebsocketTicker(m_Service) { m_server.init_asio(); m_server.set_open_handler(std::bind(&WebsocketServerImpl::ConnOpened, this, std::placeholders::_1)); @@ -44,7 +49,7 @@ namespace i2p void Start() { m_run = true; m_server.start_accept(); - m_thread = new std::thread([&] () { + m_ws_thread = new std::thread([&] () { while(m_run) { try { m_server.run(); @@ -53,16 +58,35 @@ namespace i2p } } }); + m_ev_thread = new std::thread([&] () { + while(m_run) { + try { + m_Service.run(); + break; + } catch (std::exception & e ) { + LogPrint(eLogError, "Websocket service: ", e.what()); + } + } + }); + ScheduleTick(); } void Stop() { m_run = false; + m_Service.stop(); m_server.stop(); - if(m_thread) { - m_thread->join(); - delete m_thread; + + if(m_ev_thread) { + m_ev_thread->join(); + delete m_ev_thread; } - m_thread = nullptr; + m_ev_thread = nullptr; + + if(m_ws_thread) { + m_ws_thread->join(); + delete m_ws_thread; + } + m_ws_thread = nullptr; } void ConnOpened(ServerConn c) @@ -82,11 +106,40 @@ namespace i2p (void) conn; (void) msg; } + + void HandleTick(const boost::system::error_code & ec) + { + + if(ec != boost::asio::error::operation_aborted) + LogPrint(eLogError, "Websocket ticker: ", ec.message()); + // pump collected events to us + i2p::event::core.PumpCollected(this); + ScheduleTick(); + } + + void ScheduleTick() + { + LogPrint(eLogDebug, "Websocket schedule tick"); + boost::posix_time::seconds dlt(1); + m_WebsocketTicker.expires_from_now(dlt); + m_WebsocketTicker.async_wait(std::bind(&WebsocketServerImpl::HandleTick, this, std::placeholders::_1)); + } + + /** @brief called from m_ev_thread */ + void HandlePumpEvent(const EventType & ev, const uint64_t & val) + { + EventType e; + for (const auto & i : ev) + e[i.first] = i.second; + + e["number"] = std::to_string(val); + HandleEvent(e); + } + /** @brief called from m_ws_thread */ void HandleEvent(const EventType & ev) { std::lock_guard lock(m_connsMutex); - LogPrint(eLogDebug, "websocket event"); boost::property_tree::ptree event; for (const auto & item : ev) { event.put(item.first, item.second); @@ -105,10 +158,13 @@ namespace i2p private: typedef std::set > ConnList; bool m_run; - std::thread * m_thread; + std::thread * m_ws_thread; + std::thread * m_ev_thread; std::mutex m_connsMutex; ConnList m_conns; ServerImpl m_server; + boost::asio::io_service m_Service; + boost::asio::deadline_timer m_WebsocketTicker; }; diff --git a/version.h b/version.h index ef88cbcb..ed84efd1 100644 --- a/version.h +++ b/version.h @@ -21,7 +21,7 @@ #define I2P_VERSION_MAJOR 0 #define I2P_VERSION_MINOR 9 -#define I2P_VERSION_MICRO 27 +#define I2P_VERSION_MICRO 28 #define I2P_VERSION_PATCH 0 #define I2P_VERSION MAKE_VERSION(I2P_VERSION_MAJOR, I2P_VERSION_MINOR, I2P_VERSION_MICRO)