Browse Source

remove SSU

pull/1811/head
orignal 2 years ago
parent
commit
7705423c42
  1. 77
      libi2pd/BloomFilter.cpp
  2. 39
      libi2pd/BloomFilter.h
  3. 120
      libi2pd/Crypto.cpp
  4. 22
      libi2pd/Crypto.h
  5. 996
      libi2pd/SSU.cpp
  6. 159
      libi2pd/SSU.h
  7. 516
      libi2pd/SSUData.cpp
  8. 131
      libi2pd/SSUData.h
  9. 1318
      libi2pd/SSUSession.cpp
  10. 177
      libi2pd/SSUSession.h

77
libi2pd/BloomFilter.cpp

@ -1,77 +0,0 @@
/*
* Copyright (c) 2013-2020, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
* See full license text in LICENSE file at top of project tree
*/
#include "BloomFilter.h"
#include "I2PEndian.h"
#include <array>
#include <openssl/sha.h>
namespace i2p
{
namespace util
{
/** @brief decaying bloom filter implementation */
class DecayingBloomFilter : public IBloomFilter
{
public:
DecayingBloomFilter(const std::size_t size)
{
m_Size = size;
m_Data = new uint8_t[size];
}
/** @brief implements IBloomFilter::~IBloomFilter */
~DecayingBloomFilter()
{
delete [] m_Data;
}
/** @brief implements IBloomFilter::Add */
bool Add(const uint8_t * data, std::size_t len)
{
std::size_t idx;
uint8_t mask;
Get(data, len, idx, mask);
if(m_Data[idx] & mask) return false; // filter hit
m_Data[idx] |= mask;
return true;
}
/** @brief implements IBloomFilter::Decay */
void Decay()
{
// reset bloom filter buffer
memset(m_Data, 0, m_Size);
}
private:
/** @brief get bit index for for data */
void Get(const uint8_t * data, std::size_t len, std::size_t & idx, uint8_t & bm)
{
bm = 1;
uint8_t digest[32];
// TODO: use blake2 because it's faster
SHA256(data, len, digest);
uint64_t i = buf64toh(digest);
idx = i % m_Size;
bm <<= (i % 8);
}
uint8_t * m_Data;
std::size_t m_Size;
};
BloomFilterPtr BloomFilter(std::size_t capacity)
{
return std::make_shared<DecayingBloomFilter>(capacity);
}
}
}

39
libi2pd/BloomFilter.h

@ -1,39 +0,0 @@
/*
* Copyright (c) 2013-2020, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
* See full license text in LICENSE file at top of project tree
*/
#ifndef BLOOM_FILTER_H_
#define BLOOM_FILTER_H_
#include <memory>
#include <cstdint>
namespace i2p
{
namespace util
{
/** @brief interface for bloom filter */
struct IBloomFilter
{
/** @brief destructor */
virtual ~IBloomFilter() {};
/** @brief add entry to bloom filter, return false if filter hit otherwise return true */
virtual bool Add(const uint8_t * data, std::size_t len) = 0;
/** @brief optionally decay old entries */
virtual void Decay() = 0;
};
typedef std::shared_ptr<IBloomFilter> BloomFilterPtr;
/** @brief create bloom filter */
BloomFilterPtr BloomFilter(std::size_t capacity = 1024 * 8);
}
}
#endif

120
libi2pd/Crypto.cpp

@ -239,55 +239,6 @@ namespace crypto
static BIGNUM * (* g_ElggTable)[255] = nullptr; static BIGNUM * (* g_ElggTable)[255] = nullptr;
// DH
DHKeys::DHKeys ()
{
m_DH = DH_new ();
DH_set0_pqg (m_DH, BN_dup (elgp), NULL, BN_dup (elgg));
DH_set0_key (m_DH, NULL, NULL);
}
DHKeys::~DHKeys ()
{
DH_free (m_DH);
}
void DHKeys::GenerateKeys ()
{
BIGNUM * priv_key = NULL, * pub_key = NULL;
#if !defined(__x86_64__) // use short exponent for non x64
priv_key = BN_new ();
BN_rand (priv_key, ELGAMAL_SHORT_EXPONENT_NUM_BITS, 0, 1);
#endif
if (g_ElggTable)
{
#if defined(__x86_64__)
priv_key = BN_new ();
BN_rand (priv_key, ELGAMAL_FULL_EXPONENT_NUM_BITS, 0, 1);
#endif
auto ctx = BN_CTX_new ();
pub_key = ElggPow (priv_key, g_ElggTable, ctx);
DH_set0_key (m_DH, pub_key, priv_key);
BN_CTX_free (ctx);
}
else
{
DH_set0_key (m_DH, NULL, priv_key);
DH_generate_key (m_DH);
DH_get0_key (m_DH, (const BIGNUM **)&pub_key, (const BIGNUM **)&priv_key);
}
bn2buf (pub_key, m_PublicKey, 256);
}
void DHKeys::Agree (const uint8_t * pub, uint8_t * shared)
{
BIGNUM * pk = BN_bin2bn (pub, 256, NULL);
DH_compute_key (shared, pk, m_DH);
BN_free (pk);
}
// x25519 // x25519
X25519Keys::X25519Keys () X25519Keys::X25519Keys ()
{ {
@ -601,77 +552,6 @@ namespace crypto
BN_CTX_free (ctx); BN_CTX_free (ctx);
} }
// HMAC
const uint64_t IPAD = 0x3636363636363636;
const uint64_t OPAD = 0x5C5C5C5C5C5C5C5C;
static const uint64_t ipads[] = { IPAD, IPAD, IPAD, IPAD };
static const uint64_t opads[] = { OPAD, OPAD, OPAD, OPAD };
void HMACMD5Digest (uint8_t * msg, size_t len, const MACKey& key, uint8_t * digest)
// key is 32 bytes
// digest is 16 bytes
// block size is 64 bytes
{
uint64_t buf[256];
uint64_t hash[12]; // 96 bytes
#if (defined(__x86_64__) || defined(__i386__)) && defined(__AVX__) // not all X86 targets supports AVX (like old Pentium, see #1600)
if(i2p::cpu::avx)
{
__asm__
(
"vmovups %[key], %%ymm0 \n"
"vmovups %[ipad], %%ymm1 \n"
"vmovups %%ymm1, 32(%[buf]) \n"
"vxorps %%ymm0, %%ymm1, %%ymm1 \n"
"vmovups %%ymm1, (%[buf]) \n"
"vmovups %[opad], %%ymm1 \n"
"vmovups %%ymm1, 32(%[hash]) \n"
"vxorps %%ymm0, %%ymm1, %%ymm1 \n"
"vmovups %%ymm1, (%[hash]) \n"
"vzeroall \n" // end of AVX
"movups %%xmm0, 80(%[hash]) \n" // zero last 16 bytes
:
: [key]"m"(*(const uint8_t *)key), [ipad]"m"(*ipads), [opad]"m"(*opads),
[buf]"r"(buf), [hash]"r"(hash)
: "memory", "%xmm0" // TODO: change to %ymm0 later
);
}
else
#endif
{
// ikeypad
buf[0] = key.GetLL ()[0] ^ IPAD;
buf[1] = key.GetLL ()[1] ^ IPAD;
buf[2] = key.GetLL ()[2] ^ IPAD;
buf[3] = key.GetLL ()[3] ^ IPAD;
buf[4] = IPAD;
buf[5] = IPAD;
buf[6] = IPAD;
buf[7] = IPAD;
// okeypad
hash[0] = key.GetLL ()[0] ^ OPAD;
hash[1] = key.GetLL ()[1] ^ OPAD;
hash[2] = key.GetLL ()[2] ^ OPAD;
hash[3] = key.GetLL ()[3] ^ OPAD;
hash[4] = OPAD;
hash[5] = OPAD;
hash[6] = OPAD;
hash[7] = OPAD;
// fill last 16 bytes with zeros (first hash size assumed 32 bytes in I2P)
memset (hash + 10, 0, 16);
}
// concatenate with msg
memcpy (buf + 8, msg, len);
// calculate first hash
MD5((uint8_t *)buf, len + 64, (uint8_t *)(hash + 8)); // 16 bytes
// calculate digest
MD5((uint8_t *)hash, 96, digest);
}
// AES // AES
#ifdef __AES__ #ifdef __AES__
#define KeyExpansion256(round0,round1) \ #define KeyExpansion256(round0,round1) \

22
libi2pd/Crypto.h

@ -62,24 +62,6 @@ namespace crypto
// RSA // RSA
const BIGNUM * GetRSAE (); const BIGNUM * GetRSAE ();
// DH
class DHKeys
{
public:
DHKeys ();
~DHKeys ();
void GenerateKeys ();
const uint8_t * GetPublicKey () const { return m_PublicKey; };
void Agree (const uint8_t * pub, uint8_t * shared);
private:
DH * m_DH;
uint8_t m_PublicKey[256];
};
// x25519 // x25519
class X25519Keys class X25519Keys
{ {
@ -121,10 +103,6 @@ namespace crypto
bool ECIESDecrypt (const EC_GROUP * curve, const BIGNUM * key, const uint8_t * encrypted, uint8_t * data); // 514 bytes encrypted, 222 data bool ECIESDecrypt (const EC_GROUP * curve, const BIGNUM * key, const uint8_t * encrypted, uint8_t * data); // 514 bytes encrypted, 222 data
void GenerateECIESKeyPair (const EC_GROUP * curve, BIGNUM *& priv, EC_POINT *& pub); void GenerateECIESKeyPair (const EC_GROUP * curve, BIGNUM *& priv, EC_POINT *& pub);
// HMAC
typedef i2p::data::Tag<32> MACKey;
void HMACMD5Digest (uint8_t * msg, size_t len, const MACKey& key, uint8_t * digest);
// AES // AES
struct ChipherBlock struct ChipherBlock
{ {

996
libi2pd/SSU.cpp

@ -1,996 +0,0 @@
/*
* Copyright (c) 2013-2022, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
* See full license text in LICENSE file at top of project tree
*/
#include <string.h>
#include "Log.h"
#include "Timestamp.h"
#include "RouterContext.h"
#include "NetDb.hpp"
#include "Config.h"
#include "util.h"
#include "SSU.h"
#if defined(__linux__) && !defined(_NETINET_IN_H)
#include <linux/in6.h>
#endif
#ifdef _WIN32
#include <boost/winapi/error_codes.hpp>
#endif
namespace i2p
{
namespace transport
{
SSUServer::SSUServer (int port):
m_IsRunning(false), m_Thread (nullptr),
m_ReceiversThread (nullptr), m_ReceiversThreadV6 (nullptr), m_Work (m_Service),
m_ReceiversWork (m_ReceiversService), m_ReceiversWorkV6 (m_ReceiversServiceV6),
m_Endpoint (boost::asio::ip::udp::v4 (), port), m_EndpointV6 (boost::asio::ip::udp::v6 (), port),
m_Socket (m_ReceiversService), m_SocketV6 (m_ReceiversServiceV6),
m_IntroducersUpdateTimer (m_Service), m_IntroducersUpdateTimerV6 (m_Service),
m_PeerTestsCleanupTimer (m_Service), m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_Service),
m_IsSyncClockFromPeers (true)
{
}
SSUServer::~SSUServer ()
{
}
void SSUServer::OpenSocket ()
{
try
{
m_Socket.open (boost::asio::ip::udp::v4());
m_Socket.set_option (boost::asio::socket_base::receive_buffer_size (SSU_SOCKET_RECEIVE_BUFFER_SIZE));
m_Socket.set_option (boost::asio::socket_base::send_buffer_size (SSU_SOCKET_SEND_BUFFER_SIZE));
m_Socket.bind (m_Endpoint);
LogPrint (eLogInfo, "SSU: Start listening v4 port ", m_Endpoint.port());
}
catch ( std::exception & ex )
{
LogPrint (eLogError, "SSU: Failed to bind to v4 port ", m_Endpoint.port(), ": ", ex.what());
ThrowFatal ("Unable to start IPv4 SSU transport at port ", m_Endpoint.port(), ": ", ex.what ());
}
}
void SSUServer::OpenSocketV6 ()
{
try
{
m_SocketV6.open (boost::asio::ip::udp::v6());
m_SocketV6.set_option (boost::asio::ip::v6_only (true));
m_SocketV6.set_option (boost::asio::socket_base::receive_buffer_size (SSU_SOCKET_RECEIVE_BUFFER_SIZE));
m_SocketV6.set_option (boost::asio::socket_base::send_buffer_size (SSU_SOCKET_SEND_BUFFER_SIZE));
#if defined(__linux__) && !defined(_NETINET_IN_H)
if (m_EndpointV6.address() == boost::asio::ip::address().from_string("::")) // only if not binded to address
{
// Set preference to use public IPv6 address -- tested on linux, not works on windows, and not tested on others
#if (BOOST_VERSION >= 105500)
typedef boost::asio::detail::socket_option::integer<BOOST_ASIO_OS_DEF(IPPROTO_IPV6), IPV6_ADDR_PREFERENCES> ipv6PreferAddr;
#else
typedef boost::asio::detail::socket_option::integer<IPPROTO_IPV6, IPV6_ADDR_PREFERENCES> ipv6PreferAddr;
#endif
m_SocketV6.set_option (ipv6PreferAddr(IPV6_PREFER_SRC_PUBLIC | IPV6_PREFER_SRC_HOME | IPV6_PREFER_SRC_NONCGA));
}
#endif
m_SocketV6.bind (m_EndpointV6);
LogPrint (eLogInfo, "SSU: Start listening v6 port ", m_EndpointV6.port());
}
catch ( std::exception & ex )
{
LogPrint (eLogError, "SSU: Failed to bind to v6 port ", m_EndpointV6.port(), ": ", ex.what());
ThrowFatal ("Unable to start IPv6 SSU transport at port ", m_Endpoint.port(), ": ", ex.what ());
}
}
void SSUServer::Start ()
{
i2p::config::GetOption("nettime.frompeers", m_IsSyncClockFromPeers);
m_IsRunning = true;
m_Thread = new std::thread (std::bind (&SSUServer::Run, this));
if (context.SupportsV4 ())
{
OpenSocket ();
m_ReceiversThread = new std::thread (std::bind (&SSUServer::RunReceivers, this));
m_ReceiversService.post (std::bind (&SSUServer::Receive, this));
ScheduleTermination ();
ScheduleIntroducersUpdateTimer (); // wait for 30 seconds and decide if we need introducers
}
if (context.SupportsV6 ())
{
OpenSocketV6 ();
m_ReceiversThreadV6 = new std::thread (std::bind (&SSUServer::RunReceiversV6, this));
m_ReceiversServiceV6.post (std::bind (&SSUServer::ReceiveV6, this));
ScheduleTerminationV6 ();
ScheduleIntroducersUpdateTimerV6 (); // wait for 30 seconds and decide if we need introducers
}
SchedulePeerTestsCleanupTimer ();
}
void SSUServer::Stop ()
{
DeleteAllSessions ();
m_IsRunning = false;
m_TerminationTimer.cancel ();
m_TerminationTimerV6.cancel ();
m_IntroducersUpdateTimer.cancel ();
m_IntroducersUpdateTimerV6.cancel ();
m_Service.stop ();
m_Socket.close ();
m_SocketV6.close ();
m_ReceiversService.stop ();
m_ReceiversServiceV6.stop ();
if (m_ReceiversThread)
{
m_ReceiversThread->join ();
delete m_ReceiversThread;
m_ReceiversThread = nullptr;
}
if (m_ReceiversThreadV6)
{
m_ReceiversThreadV6->join ();
delete m_ReceiversThreadV6;
m_ReceiversThreadV6 = nullptr;
}
if (m_Thread)
{
m_Thread->join ();
delete m_Thread;
m_Thread = nullptr;
}
}
void SSUServer::Run ()
{
i2p::util::SetThreadName("SSU");
while (m_IsRunning)
{
try
{
m_Service.run ();
}
catch (std::exception& ex)
{
LogPrint (eLogError, "SSU: Server runtime exception: ", ex.what ());
}
}
}
void SSUServer::RunReceivers ()
{
i2p::util::SetThreadName("SSUv4");
while (m_IsRunning)
{
try
{
m_ReceiversService.run ();
}
catch (std::exception& ex)
{
LogPrint (eLogError, "SSU: Receivers runtime exception: ", ex.what ());
if (m_IsRunning)
{
// restart socket
m_Socket.close ();
OpenSocket ();
Receive ();
}
}
}
}
void SSUServer::RunReceiversV6 ()
{
i2p::util::SetThreadName("SSUv6");
while (m_IsRunning)
{
try
{
m_ReceiversServiceV6.run ();
}
catch (std::exception& ex)
{
LogPrint (eLogError, "SSU: v6 receivers runtime exception: ", ex.what ());
if (m_IsRunning)
{
m_SocketV6.close ();
OpenSocketV6 ();
ReceiveV6 ();
}
}
}
}
void SSUServer::SetLocalAddress (const boost::asio::ip::address& localAddress)
{
if (localAddress.is_v6 ())
m_EndpointV6.address (localAddress);
else if (localAddress.is_v4 ())
m_Endpoint.address (localAddress);
}
void SSUServer::AddRelay (uint32_t tag, std::shared_ptr<SSUSession> relay)
{
m_Relays.emplace (tag, relay);
}
void SSUServer::RemoveRelay (uint32_t tag)
{
m_Relays.erase (tag);
}
std::shared_ptr<SSUSession> SSUServer::FindRelaySession (uint32_t tag)
{
auto it = m_Relays.find (tag);
if (it != m_Relays.end ())
{
if (it->second->GetState () == eSessionStateEstablished)
return it->second;
else
m_Relays.erase (it);
}
return nullptr;
}
void SSUServer::Send (const uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& to)
{
boost::system::error_code ec;
if (to.protocol () == boost::asio::ip::udp::v4())
m_Socket.send_to (boost::asio::buffer (buf, len), to, 0, ec);
else
m_SocketV6.send_to (boost::asio::buffer (buf, len), to, 0, ec);
if (ec)
{
LogPrint (eLogError, "SSU: Send exception: ", ec.message (), " while trying to send data to ", to.address (), ":", to.port (), " (length: ", len, ")");
}
}
void SSUServer::Receive ()
{
SSUPacket * packet = m_PacketsPool.AcquireMt ();
m_Socket.async_receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V4), packet->from,
std::bind (&SSUServer::HandleReceivedFrom, this, std::placeholders::_1, std::placeholders::_2, packet));
}
void SSUServer::ReceiveV6 ()
{
SSUPacket * packet = m_PacketsPool.AcquireMt ();
m_SocketV6.async_receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V6), packet->from,
std::bind (&SSUServer::HandleReceivedFromV6, this, std::placeholders::_1, std::placeholders::_2, packet));
}
void SSUServer::HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet)
{
if (!ecode
|| ecode == boost::asio::error::connection_refused
|| ecode == boost::asio::error::connection_reset
|| ecode == boost::asio::error::network_unreachable
|| ecode == boost::asio::error::host_unreachable
#ifdef _WIN32 // windows can throw WinAPI error, which is not handled by ASIO
|| ecode.value() == boost::winapi::ERROR_CONNECTION_REFUSED_
|| ecode.value() == boost::winapi::ERROR_NETWORK_UNREACHABLE_
|| ecode.value() == boost::winapi::ERROR_HOST_UNREACHABLE_
#endif
)
// just try continue reading when received ICMP response otherwise socket can crash,
// but better to find out which host were sent it and mark that router as unreachable
{
packet->len = bytes_transferred;
std::vector<SSUPacket *> packets;
packets.push_back (packet);
boost::system::error_code ec;
size_t moreBytes = m_Socket.available(ec);
if (!ec)
{
while (moreBytes && packets.size () < 25)
{
packet = m_PacketsPool.AcquireMt ();
packet->len = m_Socket.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V4), packet->from, 0, ec);
if (!ec)
{
packets.push_back (packet);
moreBytes = m_Socket.available(ec);
if (ec) break;
}
else
{
LogPrint (eLogError, "SSU: receive_from error: code ", ec.value(), ": ", ec.message ());
m_PacketsPool.ReleaseMt (packet);
break;
}
}
}
m_Service.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets, &m_Sessions));
Receive ();
}
else
{
m_PacketsPool.ReleaseMt (packet);
if (ecode != boost::asio::error::operation_aborted)
{
LogPrint (eLogError, "SSU: Receive error: code ", ecode.value(), ": ", ecode.message ());
m_Socket.close ();
OpenSocket ();
Receive ();
}
}
}
void SSUServer::HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet)
{
if (!ecode
|| ecode == boost::asio::error::connection_refused
|| ecode == boost::asio::error::connection_reset
|| ecode == boost::asio::error::network_unreachable
|| ecode == boost::asio::error::host_unreachable
#ifdef _WIN32 // windows can throw WinAPI error, which is not handled by ASIO
|| ecode.value() == boost::winapi::ERROR_CONNECTION_REFUSED_
|| ecode.value() == boost::winapi::ERROR_NETWORK_UNREACHABLE_
|| ecode.value() == boost::winapi::ERROR_HOST_UNREACHABLE_
#endif
)
// just try continue reading when received ICMP response otherwise socket can crash,
// but better to find out which host were sent it and mark that router as unreachable
{
packet->len = bytes_transferred;
std::vector<SSUPacket *> packets;
packets.push_back (packet);
boost::system::error_code ec;
size_t moreBytes = m_SocketV6.available (ec);
if (!ec)
{
while (moreBytes && packets.size () < 25)
{
packet = m_PacketsPool.AcquireMt ();
packet->len = m_SocketV6.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V6), packet->from, 0, ec);
if (!ec)
{
packets.push_back (packet);
moreBytes = m_SocketV6.available(ec);
if (ec) break;
}
else
{
LogPrint (eLogError, "SSU: v6 receive_from error: code ", ec.value(), ": ", ec.message ());
m_PacketsPool.ReleaseMt (packet);;
break;
}
}
}
m_Service.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets, &m_SessionsV6));
ReceiveV6 ();
}
else
{
m_PacketsPool.ReleaseMt (packet);
if (ecode != boost::asio::error::operation_aborted)
{
LogPrint (eLogError, "SSU: v6 receive error: code ", ecode.value(), ": ", ecode.message ());
m_SocketV6.close ();
OpenSocketV6 ();
ReceiveV6 ();
}
}
}
void SSUServer::HandleReceivedPackets (std::vector<SSUPacket *> packets,
std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSUSession> > * sessions)
{
if (!m_IsRunning) return;
std::shared_ptr<SSUSession> session;
for (auto& packet: packets)
{
try
{
if (!session || session->GetRemoteEndpoint () != packet->from) // we received packet for other session than previous
{
if (session)
{
session->FlushData ();
session = nullptr;
}
auto it = sessions->find (packet->from);
if (it != sessions->end ())
session = it->second;
if (!session && packet->len > 0)
{
session = std::make_shared<SSUSession> (*this, packet->from);
session->WaitForConnect ();
(*sessions)[packet->from] = session;
LogPrint (eLogDebug, "SSU: New session from ", packet->from.address ().to_string (), ":", packet->from.port (), " created");
}
}
if (session)
session->ProcessNextMessage (packet->buf, packet->len, packet->from);
}
catch (std::exception& ex)
{
LogPrint (eLogError, "SSU: HandleReceivedPackets ", ex.what ());
if (session) session->FlushData ();
session = nullptr;
}
}
m_PacketsPool.ReleaseMt (packets);
if (session) session->FlushData ();
}
std::shared_ptr<SSUSession> SSUServer::FindSession (const boost::asio::ip::udp::endpoint& e) const
{
auto& sessions = e.address ().is_v6 () ? m_SessionsV6 : m_Sessions;
auto it = sessions.find (e);
if (it != sessions.end ())
return it->second;
else
return nullptr;
}
bool SSUServer::CreateSession (std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest, bool v4only)
{
auto address = router->GetSSUAddress (v4only || !context.SupportsV6 ());
if (address)
return CreateSession (router, address, peerTest);
else
LogPrint (eLogWarning, "SSU: Router ", i2p::data::GetIdentHashAbbreviation (router->GetIdentHash ()), " doesn't have SSU address");
return false;
}
bool SSUServer::CreateSession (std::shared_ptr<const i2p::data::RouterInfo> router,
std::shared_ptr<const i2p::data::RouterInfo::Address> address, bool peerTest)
{
if (router && address)
{
if (address->UsesIntroducer ())
m_Service.post (std::bind (&SSUServer::CreateSessionThroughIntroducer, this, router, address, peerTest)); // always V4 thread
else
{
if (address->host.is_unspecified () || !address->port) return false;
boost::asio::ip::udp::endpoint remoteEndpoint (address->host, address->port);
m_Service.post (std::bind (&SSUServer::CreateDirectSession, this, router, remoteEndpoint, peerTest));
}
}
else
return false;
return true;
}
void SSUServer::CreateDirectSession (std::shared_ptr<const i2p::data::RouterInfo> router, boost::asio::ip::udp::endpoint remoteEndpoint, bool peerTest)
{
auto& sessions = remoteEndpoint.address ().is_v6 () ? m_SessionsV6 : m_Sessions;
auto it = sessions.find (remoteEndpoint);
if (it != sessions.end ())
{
auto session = it->second;
if (peerTest && session->GetState () == eSessionStateEstablished)
session->SendPeerTest ();
}
else
{
// otherwise create new session
auto session = std::make_shared<SSUSession> (*this, remoteEndpoint, router, peerTest);
sessions[remoteEndpoint] = session;
// connect
LogPrint (eLogDebug, "SSU: Creating new session to [", i2p::data::GetIdentHashAbbreviation (router->GetIdentHash ()), "] ",
remoteEndpoint.address ().to_string (), ":", remoteEndpoint.port ());
session->Connect ();
}
}
void SSUServer::CreateSessionThroughIntroducer (std::shared_ptr<const i2p::data::RouterInfo> router,
std::shared_ptr<const i2p::data::RouterInfo::Address> address, bool peerTest)
{
if (router && address && address->UsesIntroducer ())
{
if (address->IsV4 () && !i2p::context.SupportsV4 ()) return;
if (address->IsV6 () && !i2p::context.SupportsV6 ()) return;
if (!address->host.is_unspecified () && address->port)
{
// we rarely come here
auto& sessions = address->host.is_v6 () ? m_SessionsV6 : m_Sessions;
boost::asio::ip::udp::endpoint remoteEndpoint (address->host, address->port);
auto it = sessions.find (remoteEndpoint);
// check if session is presented already
if (it != sessions.end ())
{
auto session = it->second;
if (peerTest && session->GetState () == eSessionStateEstablished)
session->SendPeerTest ();
return;
}
}
// create new session
int numIntroducers = address->ssu->introducers.size ();
if (numIntroducers > 0)
{
uint32_t ts = i2p::util::GetSecondsSinceEpoch ();
std::shared_ptr<SSUSession> introducerSession;
const i2p::data::RouterInfo::Introducer * introducer = nullptr;
// we might have a session to introducer already
auto offset = rand ();
for (int i = 0; i < numIntroducers; i++)
{
auto intr = &(address->ssu->introducers[(offset + i)%numIntroducers]);
if (!intr->iPort) continue; // skip invalid introducer
if (intr->iExp > 0 && ts > intr->iExp) continue; // skip expired introducer
boost::asio::ip::udp::endpoint ep (intr->iHost, intr->iPort);
if (ep.address ().is_v4 () && address->IsV4 ()) // ipv4
{
if (!introducer) introducer = intr;
auto it = m_Sessions.find (ep);
if (it != m_Sessions.end ())
{
introducerSession = it->second;
break;
}
}
if (ep.address ().is_v6 () && address->IsV6 ()) // ipv6
{
if (!introducer) introducer = intr;
auto it = m_SessionsV6.find (ep);
if (it != m_SessionsV6.end ())
{
introducerSession = it->second;
break;
}
}
}
if (!introducer)
{
LogPrint (eLogWarning, "SSU: Can't connect to unreachable router and no compatibe non-expired introducers presented");
return;
}
if (introducerSession) // session found
LogPrint (eLogWarning, "SSU: Session to introducer already exists");
else // create new
{
LogPrint (eLogDebug, "SSU: Creating new session to introducer ", introducer->iHost);
boost::asio::ip::udp::endpoint introducerEndpoint (introducer->iHost, introducer->iPort);
introducerSession = std::make_shared<SSUSession> (*this, introducerEndpoint, router);
if (introducerEndpoint.address ().is_v4 ())
m_Sessions[introducerEndpoint] = introducerSession;
else if (introducerEndpoint.address ().is_v6 ())
m_SessionsV6[introducerEndpoint] = introducerSession;
}
if (!address->host.is_unspecified () && address->port)
{
// create session
boost::asio::ip::udp::endpoint remoteEndpoint (address->host, address->port);
auto session = std::make_shared<SSUSession> (*this, remoteEndpoint, router, peerTest);
if (address->host.is_v4 ())
m_Sessions[remoteEndpoint] = session;
else if (address->host.is_v6 ())
m_SessionsV6[remoteEndpoint] = session;
// introduce
LogPrint (eLogInfo, "SSU: Introduce new session to [", i2p::data::GetIdentHashAbbreviation (router->GetIdentHash ()),
"] through introducer ", introducer->iHost, ":", introducer->iPort);
session->WaitForIntroduction ();
if ((address->host.is_v4 () && i2p::context.GetStatus () == eRouterStatusFirewalled) ||
(address->host.is_v6 () && i2p::context.GetStatusV6 () == eRouterStatusFirewalled))
{
uint8_t buf[1];
Send (buf, 0, remoteEndpoint); // send HolePunch
}
}
introducerSession->Introduce (*introducer, router);
}
else
LogPrint (eLogWarning, "SSU: Can't connect to unreachable router and no introducers present");
}
}
void SSUServer::DeleteSession (std::shared_ptr<SSUSession> session)
{
if (session)
{
session->Close ();
auto& ep = session->GetRemoteEndpoint ();
if (ep.address ().is_v6 ())
m_SessionsV6.erase (ep);
else
m_Sessions.erase (ep);
}
}
void SSUServer::DeleteAllSessions ()
{
for (auto& it: m_Sessions)
it.second->Close ();
m_Sessions.clear ();
for (auto& it: m_SessionsV6)
it.second->Close ();
m_SessionsV6.clear ();
}
template<typename Filter>
std::shared_ptr<SSUSession> SSUServer::GetRandomV4Session (Filter filter) // v4 only
{
std::vector<std::shared_ptr<SSUSession> > filteredSessions;
for (const auto& s :m_Sessions)
if (filter (s.second)) filteredSessions.push_back (s.second);
if (filteredSessions.size () > 0)
{
auto ind = rand () % filteredSessions.size ();
return filteredSessions[ind];
}
return nullptr;
}
std::shared_ptr<SSUSession> SSUServer::GetRandomEstablishedV4Session (std::shared_ptr<const SSUSession> excluded) // v4 only
{
return GetRandomV4Session (
[excluded](std::shared_ptr<SSUSession> session)->bool
{
return session->GetState () == eSessionStateEstablished && session != excluded;
}
);
}
template<typename Filter>
std::shared_ptr<SSUSession> SSUServer::GetRandomV6Session (Filter filter) // v6 only
{
std::vector<std::shared_ptr<SSUSession> > filteredSessions;
for (const auto& s :m_SessionsV6)
if (filter (s.second)) filteredSessions.push_back (s.second);
if (filteredSessions.size () > 0)
{
auto ind = rand () % filteredSessions.size ();
return filteredSessions[ind];
}
return nullptr;
}
std::shared_ptr<SSUSession> SSUServer::GetRandomEstablishedV6Session (std::shared_ptr<const SSUSession> excluded) // v6 only
{
return GetRandomV6Session (
[excluded](std::shared_ptr<SSUSession> session)->bool
{
return session->GetState () == eSessionStateEstablished && session != excluded;
}
);
}
std::list<std::shared_ptr<SSUSession> > SSUServer::FindIntroducers (int maxNumIntroducers,
bool v4, std::set<i2p::data::IdentHash>& excluded)
{
uint32_t ts = i2p::util::GetSecondsSinceEpoch ();
std::list<std::shared_ptr<SSUSession> > ret;
const auto& sessions = v4 ? m_Sessions : m_SessionsV6;
for (const auto& s : sessions)
{
if (s.second->GetRelayTag () && s.second->GetState () == eSessionStateEstablished &&
ts < s.second->GetCreationTime () + SSU_TO_INTRODUCER_SESSION_EXPIRATION)
ret.push_back (s.second);
else if (s.second->GetRemoteIdentity ())
excluded.insert (s.second->GetRemoteIdentity ()->GetIdentHash ());
}
if ((int)ret.size () > maxNumIntroducers)
{
// shink ret randomly
int sz = ret.size () - maxNumIntroducers;
for (int i = 0; i < sz; i++)
{
auto ind = rand () % ret.size ();
auto it = ret.begin ();
std::advance (it, ind);
ret.erase (it);
}
}
return ret;
}
void SSUServer::RescheduleIntroducersUpdateTimer ()
{
m_IntroducersUpdateTimer.cancel ();
m_IntroducersUpdateTimer.expires_from_now (boost::posix_time::seconds(SSU_KEEP_ALIVE_INTERVAL/2));
m_IntroducersUpdateTimer.async_wait (std::bind (&SSUServer::HandleIntroducersUpdateTimer,
this, std::placeholders::_1, true));
}
void SSUServer::ScheduleIntroducersUpdateTimer ()
{
m_IntroducersUpdateTimer.expires_from_now (boost::posix_time::seconds(SSU_KEEP_ALIVE_INTERVAL));
m_IntroducersUpdateTimer.async_wait (std::bind (&SSUServer::HandleIntroducersUpdateTimer,
this, std::placeholders::_1, true));
}
void SSUServer::RescheduleIntroducersUpdateTimerV6 ()
{
m_IntroducersUpdateTimerV6.cancel ();
m_IntroducersUpdateTimerV6.expires_from_now (boost::posix_time::seconds(SSU_KEEP_ALIVE_INTERVAL/2));
m_IntroducersUpdateTimerV6.async_wait (std::bind (&SSUServer::HandleIntroducersUpdateTimer,
this, std::placeholders::_1, false));
}
void SSUServer::ScheduleIntroducersUpdateTimerV6 ()
{
m_IntroducersUpdateTimerV6.expires_from_now (boost::posix_time::seconds(SSU_KEEP_ALIVE_INTERVAL));
m_IntroducersUpdateTimerV6.async_wait (std::bind (&SSUServer::HandleIntroducersUpdateTimer,
this, std::placeholders::_1, false));
}
void SSUServer::HandleIntroducersUpdateTimer (const boost::system::error_code& ecode, bool v4)
{
if (ecode != boost::asio::error::operation_aborted)
{
// timeout expired
if (v4)
{
if (i2p::context.GetStatus () == eRouterStatusTesting)
{
// we still don't know if we need introducers
ScheduleIntroducersUpdateTimer ();
return;
}
if (i2p::context.GetStatus () != eRouterStatusFirewalled)
{
// we don't need introducers
m_Introducers.clear ();
return;
}
// we are firewalled
if (!i2p::context.IsUnreachable ()) i2p::context.SetUnreachable (true, false); // v4
}
else
{
if (i2p::context.GetStatusV6 () == eRouterStatusTesting)
{
// we still don't know if we need introducers
ScheduleIntroducersUpdateTimerV6 ();
return;
}
if (i2p::context.GetStatusV6 () != eRouterStatusFirewalled)
{
// we don't need introducers
m_IntroducersV6.clear ();
return;
}
// we are firewalled
auto addr = i2p::context.GetRouterInfo ().GetSSUV6Address ();
if (addr && addr->ssu && addr->ssu->introducers.empty ())
i2p::context.SetUnreachable (false, true); // v6
}
std::list<boost::asio::ip::udp::endpoint> newList;
size_t numIntroducers = 0;
uint32_t ts = i2p::util::GetSecondsSinceEpoch ();
std::set<i2p::data::IdentHash> excluded;
auto& introducers = v4 ? m_Introducers : m_IntroducersV6;
for (const auto& it : introducers)
{
auto session = FindSession (it);
if (session)
{
if (ts < session->GetCreationTime () + SSU_TO_INTRODUCER_SESSION_EXPIRATION)
session->SendKeepAlive ();
if (ts < session->GetCreationTime () + SSU_TO_INTRODUCER_SESSION_DURATION)
{
newList.push_back (it);
numIntroducers++;
if (session->GetRemoteIdentity ())
excluded.insert (session->GetRemoteIdentity ()->GetIdentHash ());
}
else
session = nullptr;
}
if (!session)
i2p::context.RemoveIntroducer (it);
}
if (numIntroducers < SSU_MAX_NUM_INTRODUCERS)
{
// create new
auto sessions = FindIntroducers (SSU_MAX_NUM_INTRODUCERS, v4, excluded); // try to find if duplicates
if (sessions.empty () && !introducers.empty ())
{
// bump creation time for previous introducers if no new sessions found
LogPrint (eLogDebug, "SSU: No new introducers found. Trying to reuse existing");
for (const auto& it : introducers)
{
auto session = FindSession (it);
if (session)
session->SetCreationTime (session->GetCreationTime () + SSU_TO_INTRODUCER_SESSION_DURATION);
}
// try again
excluded.clear ();
sessions = FindIntroducers (SSU_MAX_NUM_INTRODUCERS, v4, excluded);
}
for (const auto& it1: sessions)
{
const auto& ep = it1->GetRemoteEndpoint ();
i2p::data::RouterInfo::Introducer introducer;
introducer.iHost = ep.address ();
introducer.iPort = ep.port ();
introducer.iTag = it1->GetRelayTag ();
introducer.iKey = it1->GetIntroKey ();
introducer.iExp = it1->GetCreationTime () + SSU_TO_INTRODUCER_SESSION_EXPIRATION;
if (i2p::context.AddIntroducer (introducer))
{
newList.push_back (ep);
if (newList.size () >= SSU_MAX_NUM_INTRODUCERS) break;
}
if (it1->GetRemoteIdentity ())
excluded.insert (it1->GetRemoteIdentity ()->GetIdentHash ());
}
}
introducers = newList;
if (introducers.size () < SSU_MAX_NUM_INTRODUCERS)
{
for (auto i = introducers.size (); i < SSU_MAX_NUM_INTRODUCERS; i++)
{
auto introducer = i2p::data::netdb.GetRandomIntroducer (v4, excluded);
if (introducer)
{
auto address = v4 ? introducer->GetSSUAddress (true) : introducer->GetSSUV6Address ();
if (address && !address->host.is_unspecified () && address->port)
{
boost::asio::ip::udp::endpoint ep (address->host, address->port);
if (std::find (introducers.begin (), introducers.end (), ep) == introducers.end ()) // not connected yet
{
CreateDirectSession (introducer, ep, false);
excluded.insert (introducer->GetIdentHash ());
}
}
}
else
{
LogPrint (eLogDebug, "SSU: Can't find more introducers");
break;
}
}
}
if (v4)
ScheduleIntroducersUpdateTimer ();
else
ScheduleIntroducersUpdateTimerV6 ();
}
}
void SSUServer::NewPeerTest (uint32_t nonce, PeerTestParticipant role, std::shared_ptr<SSUSession> session)
{
m_PeerTests[nonce] = { i2p::util::GetMillisecondsSinceEpoch (), role, session };
}
PeerTestParticipant SSUServer::GetPeerTestParticipant (uint32_t nonce)
{
auto it = m_PeerTests.find (nonce);
if (it != m_PeerTests.end ())
return it->second.role;
else
return ePeerTestParticipantUnknown;
}
std::shared_ptr<SSUSession> SSUServer::GetPeerTestSession (uint32_t nonce)
{
auto it = m_PeerTests.find (nonce);
if (it != m_PeerTests.end ())
return it->second.session;
else
return nullptr;
}
void SSUServer::UpdatePeerTest (uint32_t nonce, PeerTestParticipant role)
{
auto it = m_PeerTests.find (nonce);
if (it != m_PeerTests.end ())
it->second.role = role;
}
void SSUServer::RemovePeerTest (uint32_t nonce)
{
m_PeerTests.erase (nonce);
}
void SSUServer::SchedulePeerTestsCleanupTimer ()
{
m_PeerTestsCleanupTimer.expires_from_now (boost::posix_time::seconds(SSU_PEER_TEST_TIMEOUT));
m_PeerTestsCleanupTimer.async_wait (std::bind (&SSUServer::HandlePeerTestsCleanupTimer,
this, std::placeholders::_1));
}
void SSUServer::HandlePeerTestsCleanupTimer (const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
int numDeleted = 0;
uint64_t ts = i2p::util::GetMillisecondsSinceEpoch ();
for (auto it = m_PeerTests.begin (); it != m_PeerTests.end ();)
{
if (ts > it->second.creationTime + SSU_PEER_TEST_TIMEOUT*1000LL)
{
numDeleted++;
it = m_PeerTests.erase (it);
}
else
++it;
}
if (numDeleted > 0)
LogPrint (eLogDebug, "SSU: ", numDeleted, " peer tests have been expired");
// some cleaups. TODO: use separate timer
m_FragmentsPool.CleanUp ();
m_IncompleteMessagesPool.CleanUp ();
m_SentMessagesPool.CleanUp ();
SchedulePeerTestsCleanupTimer ();
}
}
void SSUServer::ScheduleTermination ()
{
uint64_t timeout = SSU_TERMINATION_CHECK_TIMEOUT + (rand () % SSU_TERMINATION_CHECK_TIMEOUT)/5;
m_TerminationTimer.expires_from_now (boost::posix_time::seconds(timeout));
m_TerminationTimer.async_wait (std::bind (&SSUServer::HandleTerminationTimer,
this, std::placeholders::_1));
}
void SSUServer::HandleTerminationTimer (const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
auto ts = i2p::util::GetSecondsSinceEpoch ();
for (auto& it: m_Sessions)
if (it.second->IsTerminationTimeoutExpired (ts))
{
auto session = it.second;
if (it.first != session->GetRemoteEndpoint ())
LogPrint (eLogWarning, "SSU: Remote endpoint ", session->GetRemoteEndpoint (), " doesn't match key ", it.first, " adjusted");
m_Service.post ([session]
{
LogPrint (eLogWarning, "SSU: No activity with ", session->GetRemoteEndpoint (), " for ", session->GetTerminationTimeout (), " seconds");
session->Failed ();
});
}
else
it.second->CleanUp (ts);
ScheduleTermination ();
}
}
void SSUServer::ScheduleTerminationV6 ()
{
uint64_t timeout = SSU_TERMINATION_CHECK_TIMEOUT + (rand () % SSU_TERMINATION_CHECK_TIMEOUT)/5;
m_TerminationTimerV6.expires_from_now (boost::posix_time::seconds(timeout));
m_TerminationTimerV6.async_wait (std::bind (&SSUServer::HandleTerminationTimerV6,
this, std::placeholders::_1));
}
void SSUServer::HandleTerminationTimerV6 (const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
auto ts = i2p::util::GetSecondsSinceEpoch ();
for (auto& it: m_SessionsV6)
if (it.second->IsTerminationTimeoutExpired (ts))
{
auto session = it.second;
if (it.first != session->GetRemoteEndpoint ())
LogPrint (eLogWarning, "SSU: Remote endpoint ", session->GetRemoteEndpoint (), " doesn't match key ", it.first);
m_Service.post ([session]
{
LogPrint (eLogWarning, "SSU: No activity with ", session->GetRemoteEndpoint (), " for ", session->GetTerminationTimeout (), " seconds");
session->Failed ();
});
}
else
it.second->CleanUp (ts);
ScheduleTerminationV6 ();
}
}
}
}

159
libi2pd/SSU.h

@ -1,159 +0,0 @@
/*
* Copyright (c) 2013-2022, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
* See full license text in LICENSE file at top of project tree
*/
#ifndef SSU_H__
#define SSU_H__
#include <inttypes.h>
#include <string.h>
#include <map>
#include <list>
#include <set>
#include <thread>
#include <mutex>
#include <boost/asio.hpp>
#include "Crypto.h"
#include "util.h"
#include "I2PEndian.h"
#include "Identity.h"
#include "RouterInfo.h"
#include "I2NPProtocol.h"
#include "SSUSession.h"
namespace i2p
{
namespace transport
{
const int SSU_KEEP_ALIVE_INTERVAL = 30; // 30 seconds
const int SSU_PEER_TEST_TIMEOUT = 60; // 60 seconds
const int SSU_TO_INTRODUCER_SESSION_DURATION = 3600; // 1 hour
const int SSU_TO_INTRODUCER_SESSION_EXPIRATION = 4800; // 80 minutes
const int SSU_TERMINATION_CHECK_TIMEOUT = 30; // 30 seconds
const size_t SSU_MAX_NUM_INTRODUCERS = 3;
const size_t SSU_SOCKET_RECEIVE_BUFFER_SIZE = 0x1FFFF; // 128K
const size_t SSU_SOCKET_SEND_BUFFER_SIZE = 0x1FFFF; // 128K
struct SSUPacket
{
i2p::crypto::AESAlignedBuffer<SSU_MTU_V6 + 18> buf; // max MTU + iv + size
boost::asio::ip::udp::endpoint from;
size_t len;
};
class SSUServer
{
public:
SSUServer (int port);
~SSUServer ();
void Start ();
void Stop ();
bool CreateSession (std::shared_ptr<const i2p::data::RouterInfo> router, bool peerTest = false, bool v4only = false);
bool CreateSession (std::shared_ptr<const i2p::data::RouterInfo> router,
std::shared_ptr<const i2p::data::RouterInfo::Address> address, bool peerTest = false);
void CreateDirectSession (std::shared_ptr<const i2p::data::RouterInfo> router, boost::asio::ip::udp::endpoint remoteEndpoint, bool peerTest);
std::shared_ptr<SSUSession> FindSession (const boost::asio::ip::udp::endpoint& e) const;
std::shared_ptr<SSUSession> GetRandomEstablishedV4Session (std::shared_ptr<const SSUSession> excluded);
std::shared_ptr<SSUSession> GetRandomEstablishedV6Session (std::shared_ptr<const SSUSession> excluded);
void DeleteSession (std::shared_ptr<SSUSession> session);
void DeleteAllSessions ();
boost::asio::io_service& GetService () { return m_Service; };
i2p::util::MemoryPool<Fragment>& GetFragmentsPool () { return m_FragmentsPool; };
i2p::util::MemoryPool<IncompleteMessage>& GetIncompleteMessagesPool () { return m_IncompleteMessagesPool; };
i2p::util::MemoryPool<SentMessage>& GetSentMessagesPool () { return m_SentMessagesPool; };
uint16_t GetPort () const { return m_Endpoint.port (); };
bool IsSyncClockFromPeers () const { return m_IsSyncClockFromPeers; };
void SetLocalAddress (const boost::asio::ip::address& localAddress);
void Send (const uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& to);
void AddRelay (uint32_t tag, std::shared_ptr<SSUSession> relay);
void RemoveRelay (uint32_t tag);
std::shared_ptr<SSUSession> FindRelaySession (uint32_t tag);
void RescheduleIntroducersUpdateTimer ();
void RescheduleIntroducersUpdateTimerV6 ();
void NewPeerTest (uint32_t nonce, PeerTestParticipant role, std::shared_ptr<SSUSession> session = nullptr);
PeerTestParticipant GetPeerTestParticipant (uint32_t nonce);
std::shared_ptr<SSUSession> GetPeerTestSession (uint32_t nonce);
void UpdatePeerTest (uint32_t nonce, PeerTestParticipant role);
void RemovePeerTest (uint32_t nonce);
private:
void OpenSocket ();
void OpenSocketV6 ();
void Run ();
void RunReceivers ();
void RunReceiversV6 ();
void Receive ();
void ReceiveV6 ();
void HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet);
void HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet);
void HandleReceivedPackets (std::vector<SSUPacket *> packets,
std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSUSession> >* sessions);
void CreateSessionThroughIntroducer (std::shared_ptr<const i2p::data::RouterInfo> router,
std::shared_ptr<const i2p::data::RouterInfo::Address> address, bool peerTest = false);
template<typename Filter>
std::shared_ptr<SSUSession> GetRandomV4Session (Filter filter);
template<typename Filter>
std::shared_ptr<SSUSession> GetRandomV6Session (Filter filter);
std::list<std::shared_ptr<SSUSession> > FindIntroducers (int maxNumIntroducers, bool v4, std::set<i2p::data::IdentHash>& excluded);
void ScheduleIntroducersUpdateTimer ();
void ScheduleIntroducersUpdateTimerV6 ();
void HandleIntroducersUpdateTimer (const boost::system::error_code& ecode, bool v4);
void SchedulePeerTestsCleanupTimer ();
void HandlePeerTestsCleanupTimer (const boost::system::error_code& ecode);
// timer
void ScheduleTermination ();
void HandleTerminationTimer (const boost::system::error_code& ecode);
void ScheduleTerminationV6 ();
void HandleTerminationTimerV6 (const boost::system::error_code& ecode);
private:
struct PeerTest
{
uint64_t creationTime;
PeerTestParticipant role;
std::shared_ptr<SSUSession> session; // for Bob to Alice
};
volatile bool m_IsRunning;
std::thread * m_Thread, * m_ReceiversThread, * m_ReceiversThreadV6;
boost::asio::io_service m_Service, m_ReceiversService, m_ReceiversServiceV6;
boost::asio::io_service::work m_Work, m_ReceiversWork, m_ReceiversWorkV6;
boost::asio::ip::udp::endpoint m_Endpoint, m_EndpointV6;
boost::asio::ip::udp::socket m_Socket, m_SocketV6;
boost::asio::deadline_timer m_IntroducersUpdateTimer, m_IntroducersUpdateTimerV6,
m_PeerTestsCleanupTimer, m_TerminationTimer, m_TerminationTimerV6;
bool m_IsSyncClockFromPeers;
std::list<boost::asio::ip::udp::endpoint> m_Introducers, m_IntroducersV6; // introducers we are connected to
std::map<boost::asio::ip::udp::endpoint, std::shared_ptr<SSUSession> > m_Sessions, m_SessionsV6;
std::map<uint32_t, std::shared_ptr<SSUSession> > m_Relays; // we are introducer
std::map<uint32_t, PeerTest> m_PeerTests; // nonce -> creation time in milliseconds
i2p::util::MemoryPool<Fragment> m_FragmentsPool;
i2p::util::MemoryPool<IncompleteMessage> m_IncompleteMessagesPool;
i2p::util::MemoryPool<SentMessage> m_SentMessagesPool;
i2p::util::MemoryPoolMt<SSUPacket> m_PacketsPool;
public:
// for HTTP only
const decltype(m_Sessions)& GetSessions () const { return m_Sessions; };
const decltype(m_SessionsV6)& GetSessionsV6 () const { return m_SessionsV6; };
};
}
}
#endif

516
libi2pd/SSUData.cpp

@ -1,516 +0,0 @@
/*
* Copyright (c) 2013-2022, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
* See full license text in LICENSE file at top of project tree
*/
#include <stdlib.h>
#include "Log.h"
#include "Timestamp.h"
#include "NetDb.hpp"
#include "SSU.h"
#include "SSUData.h"
namespace i2p
{
namespace transport
{
void IncompleteMessage::AttachNextFragment (const uint8_t * fragment, size_t fragmentSize)
{
if (msg->len + fragmentSize > msg->maxLen)
{
LogPrint (eLogWarning, "SSU: I2NP message size ", msg->maxLen, " is not enough");
auto newMsg = NewI2NPMessage ();
*newMsg = *msg;
msg = newMsg;
}
if (msg->Concat (fragment, fragmentSize) < fragmentSize)
LogPrint (eLogError, "SSU: I2NP buffer overflow ", msg->maxLen);
nextFragmentNum++;
}
SSUData::SSUData (SSUSession& session):
m_Session (session), m_ResendTimer (session.GetService ()),
m_MaxPacketSize (session.IsV6 () ? SSU_V6_MAX_PACKET_SIZE : SSU_V4_MAX_PACKET_SIZE),
m_PacketSize (m_MaxPacketSize), m_LastMessageReceivedTime (0)
{
}
SSUData::~SSUData ()
{
}
void SSUData::Start ()
{
}
void SSUData::Stop ()
{
m_ResendTimer.cancel ();
m_IncompleteMessages.clear ();
m_SentMessages.clear ();
m_ReceivedMessages.clear ();
}
void SSUData::AdjustPacketSize (std::shared_ptr<const i2p::data::RouterInfo> remoteRouter)
{
if (!remoteRouter) return;
auto ssuAddress = remoteRouter->GetSSUAddress ();
if (ssuAddress && ssuAddress->ssu->mtu)
{
if (m_Session.IsV6 ())
m_PacketSize = ssuAddress->ssu->mtu - IPV6_HEADER_SIZE - UDP_HEADER_SIZE;
else
m_PacketSize = ssuAddress->ssu->mtu - IPV4_HEADER_SIZE - UDP_HEADER_SIZE;
if (m_PacketSize > 0)
{
// make sure packet size multiple of 16
m_PacketSize >>= 4;
m_PacketSize <<= 4;
if (m_PacketSize > m_MaxPacketSize) m_PacketSize = m_MaxPacketSize;
LogPrint (eLogDebug, "SSU: MTU=", ssuAddress->ssu->mtu, " packet size=", m_PacketSize);
}
else
{
LogPrint (eLogWarning, "SSU: Unexpected MTU ", ssuAddress->ssu->mtu);
m_PacketSize = m_MaxPacketSize;
}
}
}
void SSUData::UpdatePacketSize (const i2p::data::IdentHash& remoteIdent)
{
auto routerInfo = i2p::data::netdb.FindRouter (remoteIdent);
if (routerInfo)
AdjustPacketSize (routerInfo);
}
void SSUData::ProcessSentMessageAck (uint32_t msgID)
{
auto it = m_SentMessages.find (msgID);
if (it != m_SentMessages.end ())
{
m_SentMessages.erase (it);
if (m_SentMessages.empty ())
m_ResendTimer.cancel ();
}
}
void SSUData::ProcessAcks (uint8_t *& buf, uint8_t flag)
{
if (flag & DATA_FLAG_EXPLICIT_ACKS_INCLUDED)
{
// explicit ACKs
uint8_t numAcks =*buf;
buf++;
for (int i = 0; i < numAcks; i++)
ProcessSentMessageAck (bufbe32toh (buf+i*4));
buf += numAcks*4;
}
if (flag & DATA_FLAG_ACK_BITFIELDS_INCLUDED)
{
// explicit ACK bitfields
uint8_t numBitfields =*buf;
buf++;
for (int i = 0; i < numBitfields; i++)
{
uint32_t msgID = bufbe32toh (buf);
buf += 4; // msgID
auto it = m_SentMessages.find (msgID);
// process individual Ack bitfields
bool isNonLast = false;
int fragment = 0;
do
{
uint8_t bitfield = *buf;
isNonLast = bitfield & 0x80;
bitfield &= 0x7F; // clear MSB
if (bitfield && it != m_SentMessages.end ())
{
int numSentFragments = it->second->fragments.size ();
// process bits
uint8_t mask = 0x01;
for (int j = 0; j < 7; j++)
{
if (bitfield & mask)
{
if (fragment < numSentFragments)
it->second->fragments[fragment] = nullptr;
}
fragment++;
mask <<= 1;
}
}
buf++;
}
while (isNonLast);
}
}
}
void SSUData::ProcessFragments (uint8_t * buf)
{
uint8_t numFragments = *buf; // number of fragments
buf++;
for (int i = 0; i < numFragments; i++)
{
uint32_t msgID = bufbe32toh (buf); // message ID
buf += 4;
uint8_t frag[4] = {0};
memcpy (frag + 1, buf, 3);
buf += 3;
uint32_t fragmentInfo = bufbe32toh (frag); // fragment info
uint16_t fragmentSize = fragmentInfo & 0x3FFF; // bits 0 - 13
bool isLast = fragmentInfo & 0x010000; // bit 16
uint8_t fragmentNum = fragmentInfo >> 17; // bits 23 - 17
if (fragmentSize >= SSU_V4_MAX_PACKET_SIZE)
{
LogPrint (eLogError, "SSU: Fragment size ", fragmentSize, " exceeds max SSU packet size");
return;
}
// find message with msgID
auto it = m_IncompleteMessages.find (msgID);
if (it == m_IncompleteMessages.end ())
{
// create new message
auto msg = NewI2NPShortMessage ();
msg->len -= I2NP_SHORT_HEADER_SIZE;
it = m_IncompleteMessages.insert (std::make_pair (msgID,
m_Session.GetServer ().GetIncompleteMessagesPool ().AcquireShared (std::move (msg)))).first;
}
auto& incompleteMessage = it->second;
// mark fragment as received
if (fragmentNum < 64)
incompleteMessage->receivedFragmentsBits |= (uint64_t(0x01) << fragmentNum);
else
LogPrint (eLogWarning, "SSU: Fragment number ", fragmentNum, " exceeds 64");
// handle current fragment
if (fragmentNum == incompleteMessage->nextFragmentNum)
{
// expected fragment
incompleteMessage->AttachNextFragment (buf, fragmentSize);
if (!isLast && !incompleteMessage->savedFragments.empty ())
{
// try saved fragments
for (auto it1 = incompleteMessage->savedFragments.begin (); it1 != incompleteMessage->savedFragments.end ();)
{
auto& savedFragment = *it1;
if (savedFragment->fragmentNum == incompleteMessage->nextFragmentNum)
{
incompleteMessage->AttachNextFragment (savedFragment->buf, savedFragment->len);
isLast = savedFragment->isLast;
incompleteMessage->savedFragments.erase (it1++);
}
else
break;
}
if (isLast)
LogPrint (eLogDebug, "SSU: Message ", msgID, " complete");
}
}
else
{
if (fragmentNum < incompleteMessage->nextFragmentNum)
// duplicate fragment
LogPrint (eLogWarning, "SSU: Duplicate fragment ", (int)fragmentNum, " of message ", msgID, ", ignored");
else
{
// missing fragment
LogPrint (eLogWarning, "SSU: Missing fragments from ", (int)incompleteMessage->nextFragmentNum, " to ", fragmentNum - 1, " of message ", msgID);
auto savedFragment = m_Session.GetServer ().GetFragmentsPool ().AcquireShared (fragmentNum, buf, fragmentSize, isLast);
if (incompleteMessage->savedFragments.insert (savedFragment).second)
incompleteMessage->lastFragmentInsertTime = i2p::util::GetSecondsSinceEpoch ();
else
LogPrint (eLogWarning, "SSU: Fragment ", (int)fragmentNum, " of message ", msgID, " already saved");
}
isLast = false;
}
if (isLast)
{
// delete incomplete message
auto msg = incompleteMessage->msg;
incompleteMessage->msg = nullptr;
m_IncompleteMessages.erase (msgID);
// process message
SendMsgAck (msgID);
msg->FromSSU (msgID);
if (m_Session.GetState () == eSessionStateEstablished)
{
if (!m_ReceivedMessages.count (msgID))
{
m_LastMessageReceivedTime = i2p::util::GetSecondsSinceEpoch ();
m_ReceivedMessages.emplace (msgID, m_LastMessageReceivedTime);
if (!msg->IsExpired ())
{
m_Handler.PutNextMessage (std::move (msg));
}
else
LogPrint (eLogDebug, "SSU: message expired");
}
else
LogPrint (eLogWarning, "SSU: Message ", msgID, " already received");
}
else
{
// we expect DeliveryStatus
if (msg->GetTypeID () == eI2NPDeliveryStatus)
{
LogPrint (eLogDebug, "SSU: session established");
m_Session.Established ();
}
else
LogPrint (eLogError, "SSU: unexpected message ", (int)msg->GetTypeID ());
}
}
else
SendFragmentAck (msgID, incompleteMessage->receivedFragmentsBits);
buf += fragmentSize;
}
}
void SSUData::FlushReceivedMessage ()
{
m_Handler.Flush ();
}
void SSUData::ProcessMessage (uint8_t * buf, size_t len)
{
//uint8_t * start = buf;
uint8_t flag = *buf;
buf++;
LogPrint (eLogDebug, "SSU: Process data, flags=", (int)flag, ", len=", len);
// process acks if presented
if (flag & (DATA_FLAG_ACK_BITFIELDS_INCLUDED | DATA_FLAG_EXPLICIT_ACKS_INCLUDED))
ProcessAcks (buf, flag);
// extended data if presented
if (flag & DATA_FLAG_EXTENDED_DATA_INCLUDED)
{
uint8_t extendedDataSize = *buf;
buf++; // size
LogPrint (eLogDebug, "SSU: extended data of ", extendedDataSize, " bytes present");
buf += extendedDataSize;
}
// process data
ProcessFragments (buf);
}
void SSUData::Send (std::shared_ptr<i2p::I2NPMessage> msg)
{
uint32_t msgID = msg->ToSSU ();
if (m_SentMessages.find (msgID) != m_SentMessages.end())
{
LogPrint (eLogWarning, "SSU: message ", msgID, " already sent");
return;
}
if (m_SentMessages.empty ()) // schedule resend at first message only
ScheduleResend ();
auto ret = m_SentMessages.emplace (msgID, m_Session.GetServer ().GetSentMessagesPool ().AcquireShared ());
auto& sentMessage = ret.first->second;
if (ret.second)
{
sentMessage->nextResendTime = i2p::util::GetSecondsSinceEpoch () + RESEND_INTERVAL;
sentMessage->numResends = 0;
}
auto& fragments = sentMessage->fragments;
size_t payloadSize = m_PacketSize - sizeof (SSUHeader) - 9; // 9 = flag + #frg(1) + messageID(4) + frag info (3)
size_t len = msg->GetLength ();
uint8_t * msgBuf = msg->GetSSUHeader ();
uint32_t fragmentNum = 0;
while (len > 0 && fragmentNum <= 127)
{
auto fragment = m_Session.GetServer ().GetFragmentsPool ().AcquireShared ();
fragment->fragmentNum = fragmentNum;
uint8_t * payload = fragment->buf + sizeof (SSUHeader);
*payload = DATA_FLAG_WANT_REPLY; // for compatibility
payload++;
*payload = 1; // always 1 message fragment per message
payload++;
htobe32buf (payload, msgID);
payload += 4;
bool isLast = (len <= payloadSize) || fragmentNum == 127; // 127 fragments max
size_t size = isLast ? len : payloadSize;
uint32_t fragmentInfo = (fragmentNum << 17);
if (isLast)
fragmentInfo |= 0x010000;
fragmentInfo |= size;
fragmentInfo = htobe32 (fragmentInfo);
memcpy (payload, (uint8_t *)(&fragmentInfo) + 1, 3);
payload += 3;
memcpy (payload, msgBuf, size);
size += payload - fragment->buf;
uint8_t rem = size & 0x0F;
if (rem) // make sure 16 bytes boundary
{
auto padding = 16 - rem;
memset (fragment->buf + size, 0, padding);
size += padding;
}
fragment->len = size;
fragments.push_back (fragment);
// encrypt message with session key
uint8_t buf[SSU_V4_MAX_PACKET_SIZE + 18];
m_Session.FillHeaderAndEncrypt (PAYLOAD_TYPE_DATA, fragment->buf, size, buf);
try
{
m_Session.Send (buf, size);
}
catch (boost::system::system_error& ec)
{
LogPrint (eLogWarning, "SSU: Can't send data fragment ", ec.what ());
}
if (!isLast)
{
len -= payloadSize;
msgBuf += payloadSize;
}
else
len = 0;
fragmentNum++;
}
}
void SSUData::SendMsgAck (uint32_t msgID)
{
uint8_t buf[48 + 18] = {0}; // actual length is 44 = 37 + 7 but pad it to multiple of 16
uint8_t * payload = buf + sizeof (SSUHeader);
*payload = DATA_FLAG_EXPLICIT_ACKS_INCLUDED; // flag
payload++;
*payload = 1; // number of ACKs
payload++;
htobe32buf (payload, msgID); // msgID
payload += 4;
*payload = 0; // number of fragments
// encrypt message with session key
m_Session.FillHeaderAndEncrypt (PAYLOAD_TYPE_DATA, buf, 48);
m_Session.Send (buf, 48);
}
void SSUData::SendFragmentAck (uint32_t msgID, uint64_t bits)
{
if (!bits) return;
uint8_t buf[64 + 18] = {0};
uint8_t * payload = buf + sizeof (SSUHeader);
*payload = DATA_FLAG_ACK_BITFIELDS_INCLUDED; // flag
payload++;
*payload = 1; // number of ACK bitfields
payload++;
// one ack
*(uint32_t *)(payload) = htobe32 (msgID); // msgID
payload += 4;
size_t len = 0;
while (bits)
{
*payload = (bits & 0x7F); // next 7 bits
bits >>= 7;
if (bits) *payload &= 0x80; // 0x80 means non-last
payload++; len++;
}
*payload = 0; // number of fragments
len = (len <= 4) ? 48 : 64; // 48 = 37 + 7 + 4
// encrypt message with session key
m_Session.FillHeaderAndEncrypt (PAYLOAD_TYPE_DATA, buf, len);
m_Session.Send (buf, len);
}
void SSUData::ScheduleResend()
{
m_ResendTimer.cancel ();
m_ResendTimer.expires_from_now (boost::posix_time::seconds(RESEND_INTERVAL));
auto s = m_Session.shared_from_this();
m_ResendTimer.async_wait ([s](const boost::system::error_code& ecode)
{ s->m_Data.HandleResendTimer (ecode); });
}
void SSUData::HandleResendTimer (const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
uint8_t buf[SSU_V4_MAX_PACKET_SIZE + 18];
uint32_t ts = i2p::util::GetSecondsSinceEpoch ();
int numResent = 0;
for (auto it = m_SentMessages.begin (); it != m_SentMessages.end ();)
{
if (ts >= it->second->nextResendTime)
{
if (it->second->numResends < MAX_NUM_RESENDS)
{
for (auto& f: it->second->fragments)
if (f)
{
try
{
m_Session.FillHeaderAndEncrypt (PAYLOAD_TYPE_DATA, f->buf, f->len, buf);
m_Session.Send (buf, f->len); // resend
numResent++;
}
catch (boost::system::system_error& ec)
{
LogPrint (eLogWarning, "SSU: Can't resend message ", it->first, " data fragment: ", ec.what ());
}
}
it->second->numResends++;
it->second->nextResendTime += it->second->numResends*RESEND_INTERVAL;
++it;
}
else
{
LogPrint (eLogInfo, "SSU: message ", it->first, " has not been ACKed after ", MAX_NUM_RESENDS, " attempts, deleted");
it = m_SentMessages.erase (it);
}
}
else
++it;
}
if (m_SentMessages.empty ()) return; // nothing to resend
if (numResent < MAX_OUTGOING_WINDOW_SIZE)
ScheduleResend ();
else
{
LogPrint (eLogError, "SSU: resend window exceeds max size. Session terminated");
m_Session.Close ();
}
}
}
void SSUData::CleanUp (uint64_t ts)
{
for (auto it = m_IncompleteMessages.begin (); it != m_IncompleteMessages.end ();)
{
if (ts > it->second->lastFragmentInsertTime + INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT)
{
LogPrint (eLogWarning, "SSU: message ", it->first, " was not completed in ", INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT, " seconds, deleted");
it = m_IncompleteMessages.erase (it);
}
else
++it;
}
if (m_ReceivedMessages.size () > MAX_NUM_RECEIVED_MESSAGES || ts > m_LastMessageReceivedTime + DECAY_INTERVAL)
// decay
m_ReceivedMessages.clear ();
else
{
// delete old received messages
for (auto it = m_ReceivedMessages.begin (); it != m_ReceivedMessages.end ();)
{
if (ts > it->second + RECEIVED_MESSAGES_CLEANUP_TIMEOUT)
it = m_ReceivedMessages.erase (it);
else
++it;
}
}
}
}
}

131
libi2pd/SSUData.h

@ -1,131 +0,0 @@
/*
* Copyright (c) 2013-2022, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
* See full license text in LICENSE file at top of project tree
*/
#ifndef SSU_DATA_H__
#define SSU_DATA_H__
#include <inttypes.h>
#include <string.h>
#include <vector>
#include <map>
#include <unordered_map>
#include <memory>
#include <boost/asio.hpp>
#include "I2NPProtocol.h"
#include "Identity.h"
#include "RouterInfo.h"
#include "TransportSession.h"
namespace i2p
{
namespace transport
{
const size_t SSU_MTU_V4 = 1484;
const size_t SSU_MTU_V6 = 1488;
const size_t SSU_V4_MAX_PACKET_SIZE = SSU_MTU_V4 - IPV4_HEADER_SIZE - UDP_HEADER_SIZE; // 1456
const size_t SSU_V6_MAX_PACKET_SIZE = SSU_MTU_V6 - IPV6_HEADER_SIZE - UDP_HEADER_SIZE; // 1440
const int RESEND_INTERVAL = 3; // in seconds
const int MAX_NUM_RESENDS = 5;
const int DECAY_INTERVAL = 20; // in seconds
const int INCOMPLETE_MESSAGES_CLEANUP_TIMEOUT = 30; // in seconds
const int RECEIVED_MESSAGES_CLEANUP_TIMEOUT = 40; // in seconds
const unsigned int MAX_NUM_RECEIVED_MESSAGES = 1000; // how many msgID we store for duplicates check
const int MAX_OUTGOING_WINDOW_SIZE = 200; // how many unacked message we can store
// data flags
const uint8_t DATA_FLAG_EXTENDED_DATA_INCLUDED = 0x02;
const uint8_t DATA_FLAG_WANT_REPLY = 0x04;
const uint8_t DATA_FLAG_REQUEST_PREVIOUS_ACKS = 0x08;
const uint8_t DATA_FLAG_EXPLICIT_CONGESTION_NOTIFICATION = 0x10;
const uint8_t DATA_FLAG_ACK_BITFIELDS_INCLUDED = 0x40;
const uint8_t DATA_FLAG_EXPLICIT_ACKS_INCLUDED = 0x80;
struct Fragment
{
int fragmentNum;
size_t len;
bool isLast;
uint8_t buf[SSU_V4_MAX_PACKET_SIZE + 18]; // use biggest
Fragment () = default;
Fragment (int n, const uint8_t * b, int l, bool last):
fragmentNum (n), len (l), isLast (last) { memcpy (buf, b, len); };
};
struct FragmentCmp
{
bool operator() (const std::shared_ptr<Fragment>& f1, const std::shared_ptr<Fragment>& f2) const
{
return f1->fragmentNum < f2->fragmentNum;
};
};
struct IncompleteMessage
{
std::shared_ptr<I2NPMessage> msg;
int nextFragmentNum;
uint32_t lastFragmentInsertTime; // in seconds
uint64_t receivedFragmentsBits;
std::set<std::shared_ptr<Fragment>, FragmentCmp> savedFragments;
IncompleteMessage (std::shared_ptr<I2NPMessage>&& m): msg (m), nextFragmentNum (0),
lastFragmentInsertTime (0), receivedFragmentsBits (0) {};
void AttachNextFragment (const uint8_t * fragment, size_t fragmentSize);
};
struct SentMessage
{
std::vector<std::shared_ptr<Fragment> > fragments;
uint32_t nextResendTime; // in seconds
int numResends;
};
class SSUSession;
class SSUData
{
public:
SSUData (SSUSession& session);
~SSUData ();
void Start ();
void Stop ();
void CleanUp (uint64_t ts);
void ProcessMessage (uint8_t * buf, size_t len);
void FlushReceivedMessage ();
void Send (std::shared_ptr<i2p::I2NPMessage> msg);
void AdjustPacketSize (std::shared_ptr<const i2p::data::RouterInfo> remoteRouter);
void UpdatePacketSize (const i2p::data::IdentHash& remoteIdent);
private:
void SendMsgAck (uint32_t msgID);
void SendFragmentAck (uint32_t msgID, uint64_t bits);
void ProcessAcks (uint8_t *& buf, uint8_t flag);
void ProcessFragments (uint8_t * buf);
void ProcessSentMessageAck (uint32_t msgID);
void ScheduleResend ();
void HandleResendTimer (const boost::system::error_code& ecode);
private:
SSUSession& m_Session;
std::map<uint32_t, std::shared_ptr<IncompleteMessage> > m_IncompleteMessages;
std::map<uint32_t, std::shared_ptr<SentMessage> > m_SentMessages;
std::unordered_map<uint32_t, uint64_t> m_ReceivedMessages; // msgID -> timestamp in seconds
boost::asio::deadline_timer m_ResendTimer;
int m_MaxPacketSize, m_PacketSize;
i2p::I2NPMessagesHandler m_Handler;
uint32_t m_LastMessageReceivedTime; // in second
};
}
}
#endif

1318
libi2pd/SSUSession.cpp

File diff suppressed because it is too large Load Diff

177
libi2pd/SSUSession.h

@ -1,177 +0,0 @@
/*
* Copyright (c) 2013-2022, The PurpleI2P Project
*
* This file is part of Purple i2pd project and licensed under BSD3
*
* See full license text in LICENSE file at top of project tree
*/
#ifndef SSU_SESSION_H__
#define SSU_SESSION_H__
#include <inttypes.h>
#include <set>
#include <memory>
#include "Crypto.h"
#include "I2NPProtocol.h"
#include "TransportSession.h"
#include "SSUData.h"
namespace i2p
{
namespace transport
{
const uint8_t SSU_HEADER_EXTENDED_OPTIONS_INCLUDED = 0x04;
struct SSUHeader
{
uint8_t mac[16];
uint8_t iv[16];
uint8_t flag;
uint8_t time[4];
uint8_t GetPayloadType () const { return flag >> 4; };
bool IsExtendedOptions () const { return flag & SSU_HEADER_EXTENDED_OPTIONS_INCLUDED; };
};
const int SSU_CONNECT_TIMEOUT = 5; // 5 seconds
const int SSU_TERMINATION_TIMEOUT = 330; // 5.5 minutes
const int SSU_CLOCK_SKEW = 60; // in seconds
const int SSU_CLOCK_THRESHOLD = 15; // in seconds, if more we should adjust
const size_t SSU_MAX_I2NP_MESSAGE_SIZE = 32768;
// payload types (4 bits)
const uint8_t PAYLOAD_TYPE_SESSION_REQUEST = 0;
const uint8_t PAYLOAD_TYPE_SESSION_CREATED = 1;
const uint8_t PAYLOAD_TYPE_SESSION_CONFIRMED = 2;
const uint8_t PAYLOAD_TYPE_RELAY_REQUEST = 3;
const uint8_t PAYLOAD_TYPE_RELAY_RESPONSE = 4;
const uint8_t PAYLOAD_TYPE_RELAY_INTRO = 5;
const uint8_t PAYLOAD_TYPE_DATA = 6;
const uint8_t PAYLOAD_TYPE_PEER_TEST = 7;
const uint8_t PAYLOAD_TYPE_SESSION_DESTROYED = 8;
// extended options
const uint16_t EXTENDED_OPTIONS_FLAG_REQUEST_RELAY_TAG = 0x0001;
enum SessionState
{
eSessionStateUnknown,
eSessionStateIntroduced,
eSessionStateEstablished,
eSessionStateClosed,
eSessionStateFailed
};
enum PeerTestParticipant
{
ePeerTestParticipantUnknown = 0,
ePeerTestParticipantAlice1,
ePeerTestParticipantAlice2,
ePeerTestParticipantBob,
ePeerTestParticipantCharlie
};
class SSUServer;
class SSUSession: public TransportSession, public std::enable_shared_from_this<SSUSession>
{
public:
SSUSession (SSUServer& server, boost::asio::ip::udp::endpoint& remoteEndpoint,
std::shared_ptr<const i2p::data::RouterInfo> router = nullptr, bool peerTest = false);
void ProcessNextMessage (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint);
~SSUSession ();
void Connect ();
void WaitForConnect ();
void Introduce (const i2p::data::RouterInfo::Introducer& introducer,
std::shared_ptr<const i2p::data::RouterInfo> to); // Alice to Charlie
void WaitForIntroduction ();
void Close ();
void Done ();
void Failed ();
const boost::asio::ip::udp::endpoint& GetRemoteEndpoint () { return m_RemoteEndpoint; };
SSUServer& GetServer () { return m_Server; };
bool IsV6 () const { return m_RemoteEndpoint.address ().is_v6 (); };
void SendI2NPMessages (const std::vector<std::shared_ptr<I2NPMessage> >& msgs);
void SendPeerTest (); // Alice
SessionState GetState () const { return m_State; };
size_t GetNumSentBytes () const { return m_NumSentBytes; };
size_t GetNumReceivedBytes () const { return m_NumReceivedBytes; };
void SendKeepAlive ();
uint32_t GetRelayTag () const { return m_RelayTag; };
const i2p::data::RouterInfo::IntroKey& GetIntroKey () const { return m_IntroKey; };
void FlushData ();
void CleanUp (uint64_t ts);
private:
boost::asio::io_service& GetService ();
void CreateAESandMacKey (const uint8_t * pubKey);
size_t GetSSUHeaderSize (const uint8_t * buf) const;
void PostI2NPMessages (std::vector<std::shared_ptr<I2NPMessage> > msgs);
void ProcessMessage (uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint); // call for established session
void ProcessSessionRequest (const uint8_t * buf, size_t len);
void SendSessionRequest ();
void SendRelayRequest (const i2p::data::RouterInfo::Introducer& introducer, uint32_t nonce);
void ProcessSessionCreated (uint8_t * buf, size_t len);
void SendSessionCreated (const uint8_t * x, bool sendRelayTag = true);
void ProcessSessionConfirmed (const uint8_t * buf, size_t len);
void SendSessionConfirmed (const uint8_t * y, const uint8_t * ourAddress, size_t ourAddressLen);
void ProcessRelayRequest (const uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& from);
void SendRelayResponse (uint32_t nonce, const boost::asio::ip::udp::endpoint& from,
const uint8_t * introKey, const boost::asio::ip::udp::endpoint& to);
void SendRelayIntro (std::shared_ptr<SSUSession> session, const boost::asio::ip::udp::endpoint& from);
void ProcessRelayResponse (const uint8_t * buf, size_t len);
void ProcessRelayIntro (const uint8_t * buf, size_t len);
void Established ();
void ScheduleConnectTimer ();
void HandleConnectTimer (const boost::system::error_code& ecode);
void ProcessPeerTest (const uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& senderEndpoint);
void SendPeerTest (uint32_t nonce, const boost::asio::ip::address& address, uint16_t port, const uint8_t * introKey, bool toAddress = true, bool sendAddress = true);
void ProcessData (uint8_t * buf, size_t len);
void SendSessionDestroyed ();
void Send (uint8_t type, const uint8_t * payload, size_t len); // with session key
void Send (const uint8_t * buf, size_t size);
void FillHeaderAndEncrypt (uint8_t payloadType, uint8_t * buf, size_t len, const i2p::crypto::AESKey& aesKey,
const uint8_t * iv, const i2p::crypto::MACKey& macKey, uint8_t flag = 0);
void FillHeaderAndEncrypt (uint8_t payloadType, uint8_t * buf, size_t len); // with session key
void FillHeaderAndEncrypt (uint8_t payloadType, uint8_t * in, size_t len, uint8_t * out); // with session key
void Decrypt (uint8_t * buf, size_t len, const i2p::crypto::AESKey& aesKey);
void DecryptSessionKey (uint8_t * buf, size_t len);
bool Validate (uint8_t * buf, size_t len, const i2p::crypto::MACKey& macKey);
void Reset ();
static size_t ExtractIPAddressAndPort (const uint8_t * buf, size_t len, boost::asio::ip::address& ip, uint16_t& port); // returns actual buf size
private:
friend class SSUData; // TODO: change in later
SSUServer& m_Server;
const boost::asio::ip::udp::endpoint m_RemoteEndpoint;
boost::asio::deadline_timer m_ConnectTimer;
bool m_IsPeerTest;
SessionState m_State;
bool m_IsSessionKey;
uint32_t m_RelayTag; // received from peer
uint32_t m_SentRelayTag; // sent by us
i2p::crypto::CBCEncryption m_SessionKeyEncryption;
i2p::crypto::CBCDecryption m_SessionKeyDecryption;
i2p::crypto::AESKey m_SessionKey;
i2p::crypto::MACKey m_MacKey;
i2p::data::RouterInfo::IntroKey m_IntroKey;
SSUData m_Data;
bool m_IsDataReceived;
std::unique_ptr<SignedData> m_SignedData; // we need it for SessionConfirmed only
std::map<uint32_t, std::pair <std::shared_ptr<const i2p::data::RouterInfo>, uint64_t > > m_RelayRequests; // nonce->(Charlie, timestamp)
std::shared_ptr<i2p::crypto::DHKeys> m_DHKeysPair; // X - for client and Y - for server
};
}
}
#endif
Loading…
Cancel
Save