Browse Source

split Streaming to stream and destination

pull/102/head
orignal 10 years ago
parent
commit
482fc0e8b1
  1. 1
      Daemon.cpp
  2. 414
      Destination.cpp
  3. 125
      Destination.h
  4. 2
      Garlic.cpp
  5. 1
      HTTPServer.cpp
  6. 1
      I2PTunnel.cpp
  7. 1
      SAM.cpp
  8. 1
      SOCKS.cpp
  9. 410
      Streaming.cpp
  10. 110
      Streaming.h
  11. 2
      Win32/i2pd.vcxproj
  12. 1
      build/CMakeLists.txt
  13. 7
      build/autotools/Makefile.in
  14. 14
      filelist.mk

1
Daemon.cpp

@ -13,6 +13,7 @@ @@ -13,6 +13,7 @@
#include "Garlic.h"
#include "util.h"
#include "Streaming.h"
#include "Destination.h"
#include "HTTPServer.h"
#include "HTTPProxy.h"
#include "SOCKS.h"

414
Destination.cpp

@ -0,0 +1,414 @@ @@ -0,0 +1,414 @@
#include <fstream>
#include <algorithm>
#include <cryptopp/dh.h>
#include <cryptopp/gzip.h>
#include "Log.h"
#include "util.h"
#include "Destination.h"
namespace i2p
{
namespace stream
{
StreamingDestination::StreamingDestination (boost::asio::io_service& service, bool isPublic):
m_Service (service), m_LeaseSet (nullptr), m_IsPublic (isPublic)
{
m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (/*i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256*/); // uncomment for ECDSA
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey);
m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel
if (m_IsPublic)
LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p created");
}
StreamingDestination::StreamingDestination (boost::asio::io_service& service, const std::string& fullPath, bool isPublic):
m_Service (service), m_LeaseSet (nullptr), m_IsPublic (isPublic)
{
std::ifstream s(fullPath.c_str (), std::ifstream::binary);
if (s.is_open ())
{
s.seekg (0, std::ios::end);
size_t len = s.tellg();
s.seekg (0, std::ios::beg);
uint8_t * buf = new uint8_t[len];
s.read ((char *)buf, len);
m_Keys.FromBuffer (buf, len);
delete[] buf;
LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p loaded");
}
else
{
LogPrint ("Can't open file ", fullPath, " Creating new one");
m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (/*i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256*/);
std::ofstream f (fullPath, std::ofstream::binary | std::ofstream::out);
size_t len = m_Keys.GetFullLen ();
uint8_t * buf = new uint8_t[len];
len = m_Keys.ToBuffer (buf, len);
f.write ((char *)buf, len);
delete[] buf;
LogPrint ("New private keys file ", fullPath, " for ", m_Keys.GetPublic ().GetIdentHash ().ToBase32 (), ".b32.i2p created");
}
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey);
m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel
}
StreamingDestination::StreamingDestination (boost::asio::io_service& service, const i2p::data::PrivateKeys& keys, bool isPublic):
m_Service (service), m_Keys (keys), m_LeaseSet (nullptr), m_IsPublic (isPublic)
{
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey);
m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel
if (m_IsPublic)
LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p created");
}
StreamingDestination::~StreamingDestination ()
{
{
std::unique_lock<std::mutex> l(m_StreamsMutex);
for (auto it: m_Streams)
delete it.second;
m_Streams.clear ();
}
if (m_Pool)
i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool);
delete m_LeaseSet;
}
void StreamingDestination::HandleNextPacket (Packet * packet)
{
uint32_t sendStreamID = packet->GetSendStreamID ();
if (sendStreamID)
{
auto it = m_Streams.find (sendStreamID);
if (it != m_Streams.end ())
it->second->HandleNextPacket (packet);
else
{
LogPrint ("Unknown stream ", sendStreamID);
delete packet;
}
}
else // new incoming stream
{
auto incomingStream = CreateNewIncomingStream ();
incomingStream->HandleNextPacket (packet);
if (m_Acceptor != nullptr)
m_Acceptor (incomingStream);
else
{
LogPrint ("Acceptor for incoming stream is not set");
DeleteStream (incomingStream);
}
}
}
Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote)
{
Stream * s = new Stream (m_Service, this, remote);
std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams[s->GetRecvStreamID ()] = s;
return s;
}
Stream * StreamingDestination::CreateNewIncomingStream ()
{
Stream * s = new Stream (m_Service, this);
std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams[s->GetRecvStreamID ()] = s;
return s;
}
void StreamingDestination::DeleteStream (Stream * stream)
{
if (stream)
{
std::unique_lock<std::mutex> l(m_StreamsMutex);
auto it = m_Streams.find (stream->GetRecvStreamID ());
if (it != m_Streams.end ())
{
m_Streams.erase (it);
delete stream;
}
}
}
const i2p::data::LeaseSet * StreamingDestination::GetLeaseSet ()
{
if (!m_Pool) return nullptr;
if (!m_LeaseSet)
UpdateLeaseSet ();
return m_LeaseSet;
}
void StreamingDestination::UpdateLeaseSet ()
{
auto newLeaseSet = new i2p::data::LeaseSet (*m_Pool);
if (!m_LeaseSet)
m_LeaseSet = newLeaseSet;
else
{
// TODO: implement it better
*m_LeaseSet = *newLeaseSet;
delete newLeaseSet;
}
}
void StreamingDestination::SetLeaseSetUpdated ()
{
UpdateLeaseSet ();
for (auto it: m_Streams)
it.second->SetLeaseSetUpdated ();
if (m_IsPublic)
i2p::data::netdb.PublishLeaseSet (m_LeaseSet, m_Pool);
}
StreamingDestinations destinations;
void StreamingDestinations::Start ()
{
if (!m_SharedLocalDestination)
{
m_SharedLocalDestination = new StreamingDestination (m_Service, false); // non-public
m_Destinations[m_SharedLocalDestination->GetIdentity ().GetIdentHash ()] = m_SharedLocalDestination;
}
// LoadLocalDestinations ();
m_IsRunning = true;
m_Thread = new std::thread (std::bind (&StreamingDestinations::Run, this));
}
void StreamingDestinations::Stop ()
{
for (auto it: m_Destinations)
delete it.second;
m_Destinations.clear ();
m_SharedLocalDestination = 0; // deleted through m_Destination
m_IsRunning = false;
m_Service.stop ();
if (m_Thread)
{
m_Thread->join ();
delete m_Thread;
m_Thread = 0;
}
}
void StreamingDestinations::Run ()
{
m_Service.run ();
}
void StreamingDestinations::LoadLocalDestinations ()
{
int numDestinations = 0;
boost::filesystem::path p (i2p::util::filesystem::GetDataDir());
boost::filesystem::directory_iterator end;
for (boost::filesystem::directory_iterator it (p); it != end; ++it)
{
if (boost::filesystem::is_regular_file (*it) && it->path ().extension () == ".dat")
{
auto fullPath =
#if BOOST_VERSION > 10500
it->path().string();
#else
it->path();
#endif
auto localDestination = new StreamingDestination (m_Service, fullPath, true);
m_Destinations[localDestination->GetIdentHash ()] = localDestination;
numDestinations++;
}
}
if (numDestinations > 0)
LogPrint (numDestinations, " local destinations loaded");
}
StreamingDestination * StreamingDestinations::LoadLocalDestination (const std::string& filename, bool isPublic)
{
auto localDestination = new StreamingDestination (m_Service, i2p::util::filesystem::GetFullPath (filename), isPublic);
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations[localDestination->GetIdentHash ()] = localDestination;
return localDestination;
}
StreamingDestination * StreamingDestinations::CreateNewLocalDestination (bool isPublic)
{
auto localDestination = new StreamingDestination (m_Service, isPublic);
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations[localDestination->GetIdentHash ()] = localDestination;
return localDestination;
}
void StreamingDestinations::DeleteLocalDestination (StreamingDestination * destination)
{
if (!destination) return;
auto it = m_Destinations.find (destination->GetIdentHash ());
if (it != m_Destinations.end ())
{
auto d = it->second;
{
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations.erase (it);
}
delete d;
}
}
StreamingDestination * StreamingDestinations::CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic)
{
auto it = m_Destinations.find (keys.GetPublic ().GetIdentHash ());
if (it != m_Destinations.end ())
{
LogPrint ("Local destination ", keys.GetPublic ().GetIdentHash ().ToBase32 (), ".b32.i2p exists");
return nullptr;
}
auto localDestination = new StreamingDestination (m_Service, keys, isPublic);
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations[keys.GetPublic ().GetIdentHash ()] = localDestination;
return localDestination;
}
Stream * StreamingDestinations::CreateClientStream (const i2p::data::LeaseSet& remote)
{
if (!m_SharedLocalDestination) return nullptr;
return m_SharedLocalDestination->CreateNewOutgoingStream (remote);
}
void StreamingDestinations::DeleteStream (Stream * stream)
{
if (stream)
stream->GetLocalDestination ()->DeleteStream (stream);
}
void StreamingDestinations::HandleNextPacket (i2p::data::IdentHash destination, Packet * packet)
{
m_Service.post (boost::bind (&StreamingDestinations::PostNextPacket, this, destination, packet));
}
void StreamingDestinations::PostNextPacket (i2p::data::IdentHash destination, Packet * packet)
{
auto it = m_Destinations.find (destination);
if (it != m_Destinations.end ())
it->second->HandleNextPacket (packet);
else
{
LogPrint ("Local destination ", destination.ToBase64 (), " not found");
delete packet;
}
}
StreamingDestination * StreamingDestinations::FindLocalDestination (const i2p::data::IdentHash& destination) const
{
auto it = m_Destinations.find (destination);
if (it != m_Destinations.end ())
return it->second;
return nullptr;
}
Stream * CreateStream (const i2p::data::LeaseSet& remote)
{
return destinations.CreateClientStream (remote);
}
void DeleteStream (Stream * stream)
{
destinations.DeleteStream (stream);
}
void StartStreaming ()
{
destinations.Start ();
}
void StopStreaming ()
{
destinations.Stop ();
}
StreamingDestination * GetSharedLocalDestination ()
{
return destinations.GetSharedLocalDestination ();
}
StreamingDestination * CreateNewLocalDestination (bool isPublic)
{
return destinations.CreateNewLocalDestination (isPublic);
}
StreamingDestination * CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic)
{
return destinations.CreateNewLocalDestination (keys, isPublic);
}
void DeleteLocalDestination (StreamingDestination * destination)
{
destinations.DeleteLocalDestination (destination);
}
StreamingDestination * FindLocalDestination (const i2p::data::IdentHash& destination)
{
return destinations.FindLocalDestination (destination);
}
StreamingDestination * LoadLocalDestination (const std::string& filename, bool isPublic)
{
return destinations.LoadLocalDestination (filename, isPublic);
}
const StreamingDestinations& GetLocalDestinations ()
{
return destinations;
}
void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len)
{
uint32_t length = be32toh (*(uint32_t *)buf);
buf += 4;
// we assume I2CP payload
if (buf[9] == 6) // streaming protocol
{
// unzip it
CryptoPP::Gunzip decompressor;
decompressor.Put (buf, length);
decompressor.MessageEnd();
Packet * uncompressed = new Packet;
uncompressed->offset = 0;
uncompressed->len = decompressor.MaxRetrievable ();
if (uncompressed->len > MAX_PACKET_SIZE)
{
LogPrint ("Received packet size ", uncompressed->len, " exceeds max packet size");
uncompressed->len = MAX_PACKET_SIZE;
}
decompressor.Get (uncompressed->buf, uncompressed->len);
// then forward to streaming engine thread
destinations.HandleNextPacket (destination, uncompressed);
}
else
LogPrint ("Data: protocol ", buf[9], " is not supported");
}
I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len)
{
I2NPMessage * msg = NewI2NPShortMessage ();
CryptoPP::Gzip compressor; // DEFAULT_DEFLATE_LEVEL
if (len <= COMPRESSION_THRESHOLD_SIZE)
compressor.SetDeflateLevel (CryptoPP::Gzip::MIN_DEFLATE_LEVEL);
compressor.Put (payload, len);
compressor.MessageEnd();
int size = compressor.MaxRetrievable ();
uint8_t * buf = msg->GetPayload ();
*(uint32_t *)buf = htobe32 (size); // length
buf += 4;
compressor.Get (buf, size);
memset (buf + 4, 0, 4); // source and destination ports. TODO: fill with proper values later
buf[9] = 6; // streaming protocol
msg->len += size + 4;
FillI2NPMessageHeader (msg, eI2NPData);
return msg;
}
}
}

125
Destination.h

@ -0,0 +1,125 @@ @@ -0,0 +1,125 @@
#ifndef DESTINATION_H__
#define DESTINATION_H__
#include <thread>
#include <mutex>
#include "Identity.h"
#include "TunnelPool.h"
#include "CryptoConst.h"
#include "NetDb.h"
#include "Streaming.h"
namespace i2p
{
namespace stream
{
class StreamingDestination: public i2p::data::LocalDestination
{
public:
StreamingDestination (boost::asio::io_service& service, bool isPublic);
StreamingDestination (boost::asio::io_service& service, const std::string& fullPath, bool isPublic);
StreamingDestination (boost::asio::io_service& service, const i2p::data::PrivateKeys& keys, bool isPublic);
~StreamingDestination ();
const i2p::data::LeaseSet * GetLeaseSet ();
i2p::tunnel::TunnelPool * GetTunnelPool () const { return m_Pool; };
Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote);
void DeleteStream (Stream * stream);
void SetAcceptor (const std::function<void (Stream *)>& acceptor) { m_Acceptor = acceptor; };
void ResetAcceptor () { m_Acceptor = nullptr; };
bool IsAcceptorSet () const { return m_Acceptor != nullptr; };
void HandleNextPacket (Packet * packet);
// implements LocalDestination
const i2p::data::PrivateKeys& GetPrivateKeys () const { return m_Keys; };
const uint8_t * GetEncryptionPrivateKey () const { return m_EncryptionPrivateKey; };
const uint8_t * GetEncryptionPublicKey () const { return m_EncryptionPublicKey; };
void SetLeaseSetUpdated ();
private:
Stream * CreateNewIncomingStream ();
void UpdateLeaseSet ();
private:
boost::asio::io_service& m_Service;
std::mutex m_StreamsMutex;
std::map<uint32_t, Stream *> m_Streams;
i2p::data::PrivateKeys m_Keys;
uint8_t m_EncryptionPublicKey[256], m_EncryptionPrivateKey[256];
i2p::tunnel::TunnelPool * m_Pool;
i2p::data::LeaseSet * m_LeaseSet;
bool m_IsPublic;
std::function<void (Stream *)> m_Acceptor;
};
class StreamingDestinations
{
public:
StreamingDestinations (): m_IsRunning (false), m_Thread (nullptr),
m_Work (m_Service), m_SharedLocalDestination (nullptr) {};
~StreamingDestinations () {};
void Start ();
void Stop ();
void HandleNextPacket (i2p::data::IdentHash destination, Packet * packet);
Stream * CreateClientStream (const i2p::data::LeaseSet& remote);
void DeleteStream (Stream * stream);
StreamingDestination * GetSharedLocalDestination () const { return m_SharedLocalDestination; };
StreamingDestination * CreateNewLocalDestination (bool isPublic);
StreamingDestination * CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic);
void DeleteLocalDestination (StreamingDestination * destination);
StreamingDestination * FindLocalDestination (const i2p::data::IdentHash& destination) const;
StreamingDestination * LoadLocalDestination (const std::string& filename, bool isPublic);
private:
void Run ();
void LoadLocalDestinations ();
void PostNextPacket (i2p::data::IdentHash destination, Packet * packet);
private:
bool m_IsRunning;
std::thread * m_Thread;
boost::asio::io_service m_Service;
boost::asio::io_service::work m_Work;
std::mutex m_DestinationsMutex;
std::map<i2p::data::IdentHash, StreamingDestination *> m_Destinations;
StreamingDestination * m_SharedLocalDestination;
public:
// for HTTP
const decltype(m_Destinations)& GetDestinations () const { return m_Destinations; };
};
Stream * CreateStream (const i2p::data::LeaseSet& remote);
void DeleteStream (Stream * stream);
void StartStreaming ();
void StopStreaming ();
StreamingDestination * GetSharedLocalDestination ();
StreamingDestination * CreateNewLocalDestination (bool isPublic = true);
StreamingDestination * CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic = true);
void DeleteLocalDestination (StreamingDestination * destination);
StreamingDestination * FindLocalDestination (const i2p::data::IdentHash& destination);
StreamingDestination * LoadLocalDestination (const std::string& filename, bool isPublic);
// for HTTP
const StreamingDestinations& GetLocalDestinations ();
// assuming data is I2CP message
void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len);
I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len);
}
}
#endif

2
Garlic.cpp

@ -7,7 +7,7 @@ @@ -7,7 +7,7 @@
#include "Tunnel.h"
#include "TunnelPool.h"
#include "Timestamp.h"
#include "Streaming.h"
#include "Destination.h"
#include "Garlic.h"
namespace i2p

1
HTTPServer.cpp

@ -8,6 +8,7 @@ @@ -8,6 +8,7 @@
#include "NetDb.h"
#include "I2PEndian.h"
#include "Streaming.h"
#include "Destination.h"
#include "RouterContext.h"
#include "HTTPServer.h"

1
I2PTunnel.cpp

@ -2,6 +2,7 @@ @@ -2,6 +2,7 @@
#include "base64.h"
#include "Log.h"
#include "NetDb.h"
#include "Destination.h"
#include "I2PTunnel.h"
namespace i2p

1
SAM.cpp

@ -5,6 +5,7 @@ @@ -5,6 +5,7 @@
#include "Identity.h"
#include "Log.h"
#include "NetDb.h"
#include "Destination.h"
#include "SAM.h"
namespace i2p

1
SOCKS.cpp

@ -1,6 +1,7 @@ @@ -1,6 +1,7 @@
#include "SOCKS.h"
#include "Identity.h"
#include "NetDb.h"
#include "Destination.h"
#include <cstring>
#include <stdexcept>
#include <boost/date_time/posix_time/posix_time.hpp>

410
Streaming.cpp

@ -1,15 +1,9 @@ @@ -1,15 +1,9 @@
#include <fstream>
#include <algorithm>
#include <cryptopp/dh.h>
#include <cryptopp/gzip.h>
#include "util.h"
#include "Log.h"
#include "RouterInfo.h"
#include "RouterContext.h"
#include "Tunnel.h"
#include "Timestamp.h"
#include "CryptoConst.h"
#include "NetDb.h"
#include "Destination.h"
#include "Streaming.h"
namespace i2p
@ -514,407 +508,5 @@ namespace stream @@ -514,407 +508,5 @@ namespace stream
else
m_CurrentRemoteLease.endDate = 0;
}
StreamingDestination::StreamingDestination (boost::asio::io_service& service, bool isPublic):
m_Service (service), m_LeaseSet (nullptr), m_IsPublic (isPublic)
{
m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (/*i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256*/); // uncomment for ECDSA
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey);
m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel
if (m_IsPublic)
LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p created");
}
StreamingDestination::StreamingDestination (boost::asio::io_service& service, const std::string& fullPath, bool isPublic):
m_Service (service), m_LeaseSet (nullptr), m_IsPublic (isPublic)
{
std::ifstream s(fullPath.c_str (), std::ifstream::binary);
if (s.is_open ())
{
s.seekg (0, std::ios::end);
size_t len = s.tellg();
s.seekg (0, std::ios::beg);
uint8_t * buf = new uint8_t[len];
s.read ((char *)buf, len);
m_Keys.FromBuffer (buf, len);
delete[] buf;
LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p loaded");
}
else
{
LogPrint ("Can't open file ", fullPath, " Creating new one");
m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (/*i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256*/);
std::ofstream f (fullPath, std::ofstream::binary | std::ofstream::out);
size_t len = m_Keys.GetFullLen ();
uint8_t * buf = new uint8_t[len];
len = m_Keys.ToBuffer (buf, len);
f.write ((char *)buf, len);
delete[] buf;
LogPrint ("New private keys file ", fullPath, " for ", m_Keys.GetPublic ().GetIdentHash ().ToBase32 (), ".b32.i2p created");
}
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey);
m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel
}
StreamingDestination::StreamingDestination (boost::asio::io_service& service, const i2p::data::PrivateKeys& keys, bool isPublic):
m_Service (service), m_Keys (keys), m_LeaseSet (nullptr), m_IsPublic (isPublic)
{
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey);
m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel
if (m_IsPublic)
LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p created");
}
StreamingDestination::~StreamingDestination ()
{
{
std::unique_lock<std::mutex> l(m_StreamsMutex);
for (auto it: m_Streams)
delete it.second;
m_Streams.clear ();
}
if (m_Pool)
i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool);
delete m_LeaseSet;
}
void StreamingDestination::HandleNextPacket (Packet * packet)
{
uint32_t sendStreamID = packet->GetSendStreamID ();
if (sendStreamID)
{
auto it = m_Streams.find (sendStreamID);
if (it != m_Streams.end ())
it->second->HandleNextPacket (packet);
else
{
LogPrint ("Unknown stream ", sendStreamID);
delete packet;
}
}
else // new incoming stream
{
auto incomingStream = CreateNewIncomingStream ();
incomingStream->HandleNextPacket (packet);
if (m_Acceptor != nullptr)
m_Acceptor (incomingStream);
else
{
LogPrint ("Acceptor for incoming stream is not set");
DeleteStream (incomingStream);
}
}
}
Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote)
{
Stream * s = new Stream (m_Service, this, remote);
std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams[s->GetRecvStreamID ()] = s;
return s;
}
Stream * StreamingDestination::CreateNewIncomingStream ()
{
Stream * s = new Stream (m_Service, this);
std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams[s->GetRecvStreamID ()] = s;
return s;
}
void StreamingDestination::DeleteStream (Stream * stream)
{
if (stream)
{
std::unique_lock<std::mutex> l(m_StreamsMutex);
auto it = m_Streams.find (stream->GetRecvStreamID ());
if (it != m_Streams.end ())
{
m_Streams.erase (it);
delete stream;
}
}
}
const i2p::data::LeaseSet * StreamingDestination::GetLeaseSet ()
{
if (!m_Pool) return nullptr;
if (!m_LeaseSet)
UpdateLeaseSet ();
return m_LeaseSet;
}
void StreamingDestination::UpdateLeaseSet ()
{
auto newLeaseSet = new i2p::data::LeaseSet (*m_Pool);
if (!m_LeaseSet)
m_LeaseSet = newLeaseSet;
else
{
// TODO: implement it better
*m_LeaseSet = *newLeaseSet;
delete newLeaseSet;
}
}
void StreamingDestination::SetLeaseSetUpdated ()
{
UpdateLeaseSet ();
for (auto it: m_Streams)
it.second->SetLeaseSetUpdated ();
if (m_IsPublic)
i2p::data::netdb.PublishLeaseSet (m_LeaseSet, m_Pool);
}
StreamingDestinations destinations;
void StreamingDestinations::Start ()
{
if (!m_SharedLocalDestination)
{
m_SharedLocalDestination = new StreamingDestination (m_Service, false); // non-public
m_Destinations[m_SharedLocalDestination->GetIdentity ().GetIdentHash ()] = m_SharedLocalDestination;
}
// LoadLocalDestinations ();
m_IsRunning = true;
m_Thread = new std::thread (std::bind (&StreamingDestinations::Run, this));
}
void StreamingDestinations::Stop ()
{
for (auto it: m_Destinations)
delete it.second;
m_Destinations.clear ();
m_SharedLocalDestination = 0; // deleted through m_Destination
m_IsRunning = false;
m_Service.stop ();
if (m_Thread)
{
m_Thread->join ();
delete m_Thread;
m_Thread = 0;
}
}
void StreamingDestinations::Run ()
{
m_Service.run ();
}
void StreamingDestinations::LoadLocalDestinations ()
{
int numDestinations = 0;
boost::filesystem::path p (i2p::util::filesystem::GetDataDir());
boost::filesystem::directory_iterator end;
for (boost::filesystem::directory_iterator it (p); it != end; ++it)
{
if (boost::filesystem::is_regular_file (*it) && it->path ().extension () == ".dat")
{
auto fullPath =
#if BOOST_VERSION > 10500
it->path().string();
#else
it->path();
#endif
auto localDestination = new StreamingDestination (m_Service, fullPath, true);
m_Destinations[localDestination->GetIdentHash ()] = localDestination;
numDestinations++;
}
}
if (numDestinations > 0)
LogPrint (numDestinations, " local destinations loaded");
}
StreamingDestination * StreamingDestinations::LoadLocalDestination (const std::string& filename, bool isPublic)
{
auto localDestination = new StreamingDestination (m_Service, i2p::util::filesystem::GetFullPath (filename), isPublic);
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations[localDestination->GetIdentHash ()] = localDestination;
return localDestination;
}
StreamingDestination * StreamingDestinations::CreateNewLocalDestination (bool isPublic)
{
auto localDestination = new StreamingDestination (m_Service, isPublic);
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations[localDestination->GetIdentHash ()] = localDestination;
return localDestination;
}
void StreamingDestinations::DeleteLocalDestination (StreamingDestination * destination)
{
if (!destination) return;
auto it = m_Destinations.find (destination->GetIdentHash ());
if (it != m_Destinations.end ())
{
auto d = it->second;
{
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations.erase (it);
}
delete d;
}
}
StreamingDestination * StreamingDestinations::CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic)
{
auto it = m_Destinations.find (keys.GetPublic ().GetIdentHash ());
if (it != m_Destinations.end ())
{
LogPrint ("Local destination ", keys.GetPublic ().GetIdentHash ().ToBase32 (), ".b32.i2p exists");
return nullptr;
}
auto localDestination = new StreamingDestination (m_Service, keys, isPublic);
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations[keys.GetPublic ().GetIdentHash ()] = localDestination;
return localDestination;
}
Stream * StreamingDestinations::CreateClientStream (const i2p::data::LeaseSet& remote)
{
if (!m_SharedLocalDestination) return nullptr;
return m_SharedLocalDestination->CreateNewOutgoingStream (remote);
}
void StreamingDestinations::DeleteStream (Stream * stream)
{
if (stream)
stream->GetLocalDestination ()->DeleteStream (stream);
}
void StreamingDestinations::HandleNextPacket (i2p::data::IdentHash destination, Packet * packet)
{
m_Service.post (boost::bind (&StreamingDestinations::PostNextPacket, this, destination, packet));
}
void StreamingDestinations::PostNextPacket (i2p::data::IdentHash destination, Packet * packet)
{
auto it = m_Destinations.find (destination);
if (it != m_Destinations.end ())
it->second->HandleNextPacket (packet);
else
{
LogPrint ("Local destination ", destination.ToBase64 (), " not found");
delete packet;
}
}
StreamingDestination * StreamingDestinations::FindLocalDestination (const i2p::data::IdentHash& destination) const
{
auto it = m_Destinations.find (destination);
if (it != m_Destinations.end ())
return it->second;
return nullptr;
}
Stream * CreateStream (const i2p::data::LeaseSet& remote)
{
return destinations.CreateClientStream (remote);
}
void DeleteStream (Stream * stream)
{
destinations.DeleteStream (stream);
}
void StartStreaming ()
{
destinations.Start ();
}
void StopStreaming ()
{
destinations.Stop ();
}
StreamingDestination * GetSharedLocalDestination ()
{
return destinations.GetSharedLocalDestination ();
}
StreamingDestination * CreateNewLocalDestination (bool isPublic)
{
return destinations.CreateNewLocalDestination (isPublic);
}
StreamingDestination * CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic)
{
return destinations.CreateNewLocalDestination (keys, isPublic);
}
void DeleteLocalDestination (StreamingDestination * destination)
{
destinations.DeleteLocalDestination (destination);
}
StreamingDestination * FindLocalDestination (const i2p::data::IdentHash& destination)
{
return destinations.FindLocalDestination (destination);
}
StreamingDestination * LoadLocalDestination (const std::string& filename, bool isPublic)
{
return destinations.LoadLocalDestination (filename, isPublic);
}
const StreamingDestinations& GetLocalDestinations ()
{
return destinations;
}
void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len)
{
uint32_t length = be32toh (*(uint32_t *)buf);
buf += 4;
// we assume I2CP payload
if (buf[9] == 6) // streaming protocol
{
// unzip it
CryptoPP::Gunzip decompressor;
decompressor.Put (buf, length);
decompressor.MessageEnd();
Packet * uncompressed = new Packet;
uncompressed->offset = 0;
uncompressed->len = decompressor.MaxRetrievable ();
if (uncompressed->len > MAX_PACKET_SIZE)
{
LogPrint ("Received packet size ", uncompressed->len, " exceeds max packet size");
uncompressed->len = MAX_PACKET_SIZE;
}
decompressor.Get (uncompressed->buf, uncompressed->len);
// then forward to streaming engine thread
destinations.HandleNextPacket (destination, uncompressed);
}
else
LogPrint ("Data: protocol ", buf[9], " is not supported");
}
I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len)
{
I2NPMessage * msg = NewI2NPShortMessage ();
CryptoPP::Gzip compressor; // DEFAULT_DEFLATE_LEVEL
if (len <= COMPRESSION_THRESHOLD_SIZE)
compressor.SetDeflateLevel (CryptoPP::Gzip::MIN_DEFLATE_LEVEL);
compressor.Put (payload, len);
compressor.MessageEnd();
int size = compressor.MaxRetrievable ();
uint8_t * buf = msg->GetPayload ();
*(uint32_t *)buf = htobe32 (size); // length
buf += 4;
compressor.Get (buf, size);
memset (buf + 4, 0, 4); // source and destination ports. TODO: fill with proper values later
buf[9] = 6; // streaming protocol
msg->len += size + 4;
FillI2NPMessageHeader (msg, eI2NPData);
return msg;
}
}
}

110
Streaming.h

@ -6,17 +6,13 @@ @@ -6,17 +6,13 @@
#include <map>
#include <set>
#include <queue>
#include <thread>
#include <functional>
#include <mutex>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <cryptopp/dsa.h>
#include "I2PEndian.h"
#include "Identity.h"
#include "LeaseSet.h"
#include "I2NPProtocol.h"
#include "TunnelPool.h"
#include "Garlic.h"
namespace i2p
@ -139,112 +135,6 @@ namespace stream @@ -139,112 +135,6 @@ namespace stream
boost::asio::deadline_timer m_ReceiveTimer, m_ResendTimer;
};
class StreamingDestination: public i2p::data::LocalDestination
{
public:
StreamingDestination (boost::asio::io_service& service, bool isPublic);
StreamingDestination (boost::asio::io_service& service, const std::string& fullPath, bool isPublic);
StreamingDestination (boost::asio::io_service& service, const i2p::data::PrivateKeys& keys, bool isPublic);
~StreamingDestination ();
const i2p::data::LeaseSet * GetLeaseSet ();
i2p::tunnel::TunnelPool * GetTunnelPool () const { return m_Pool; };
Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote);
void DeleteStream (Stream * stream);
void SetAcceptor (const std::function<void (Stream *)>& acceptor) { m_Acceptor = acceptor; };
void ResetAcceptor () { m_Acceptor = nullptr; };
bool IsAcceptorSet () const { return m_Acceptor != nullptr; };
void HandleNextPacket (Packet * packet);
// implements LocalDestination
const i2p::data::PrivateKeys& GetPrivateKeys () const { return m_Keys; };
const uint8_t * GetEncryptionPrivateKey () const { return m_EncryptionPrivateKey; };
const uint8_t * GetEncryptionPublicKey () const { return m_EncryptionPublicKey; };
void SetLeaseSetUpdated ();
private:
Stream * CreateNewIncomingStream ();
void UpdateLeaseSet ();
private:
boost::asio::io_service& m_Service;
std::mutex m_StreamsMutex;
std::map<uint32_t, Stream *> m_Streams;
i2p::data::PrivateKeys m_Keys;
uint8_t m_EncryptionPublicKey[256], m_EncryptionPrivateKey[256];
i2p::tunnel::TunnelPool * m_Pool;
i2p::data::LeaseSet * m_LeaseSet;
bool m_IsPublic;
std::function<void (Stream *)> m_Acceptor;
};
class StreamingDestinations
{
public:
StreamingDestinations (): m_IsRunning (false), m_Thread (nullptr),
m_Work (m_Service), m_SharedLocalDestination (nullptr) {};
~StreamingDestinations () {};
void Start ();
void Stop ();
void HandleNextPacket (i2p::data::IdentHash destination, Packet * packet);
Stream * CreateClientStream (const i2p::data::LeaseSet& remote);
void DeleteStream (Stream * stream);
StreamingDestination * GetSharedLocalDestination () const { return m_SharedLocalDestination; };
StreamingDestination * CreateNewLocalDestination (bool isPublic);
StreamingDestination * CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic);
void DeleteLocalDestination (StreamingDestination * destination);
StreamingDestination * FindLocalDestination (const i2p::data::IdentHash& destination) const;
StreamingDestination * LoadLocalDestination (const std::string& filename, bool isPublic);
private:
void Run ();
void LoadLocalDestinations ();
void PostNextPacket (i2p::data::IdentHash destination, Packet * packet);
private:
bool m_IsRunning;
std::thread * m_Thread;
boost::asio::io_service m_Service;
boost::asio::io_service::work m_Work;
std::mutex m_DestinationsMutex;
std::map<i2p::data::IdentHash, StreamingDestination *> m_Destinations;
StreamingDestination * m_SharedLocalDestination;
public:
// for HTTP
const decltype(m_Destinations)& GetDestinations () const { return m_Destinations; };
};
Stream * CreateStream (const i2p::data::LeaseSet& remote);
void DeleteStream (Stream * stream);
void StartStreaming ();
void StopStreaming ();
StreamingDestination * GetSharedLocalDestination ();
StreamingDestination * CreateNewLocalDestination (bool isPublic = true);
StreamingDestination * CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic = true);
void DeleteLocalDestination (StreamingDestination * destination);
StreamingDestination * FindLocalDestination (const i2p::data::IdentHash& destination);
StreamingDestination * LoadLocalDestination (const std::string& filename, bool isPublic);
// for HTTP
const StreamingDestinations& GetLocalDestinations ();
// assuming data is I2CP message
void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len);
I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len);
//-------------------------------------------------
template<typename Buffer, typename ReceiveHandler>

2
Win32/i2pd.vcxproj

@ -35,6 +35,7 @@ @@ -35,6 +35,7 @@
<ClCompile Include="..\SSU.cpp" />
<ClCompile Include="..\SSUData.cpp" />
<ClCompile Include="..\Streaming.cpp" />
<ClCompile Include="..\Destination.cpp" />
<ClCompile Include="..\TransitTunnel.cpp" />
<ClCompile Include="..\Transports.cpp" />
<ClCompile Include="..\Tunnel.cpp" />
@ -72,6 +73,7 @@ @@ -72,6 +73,7 @@
<ClInclude Include="..\SSU.h" />
<ClInclude Include="..\SSUData.h" />
<ClInclude Include="..\Streaming.h" />
<ClInclude Include="..\Destination.h" />
<ClInclude Include="..\Timestamp.h" />
<ClInclude Include="..\TransitTunnel.h" />
<ClInclude Include="..\Transports.h" />

1
build/CMakeLists.txt

@ -30,6 +30,7 @@ set (SOURCES @@ -30,6 +30,7 @@ set (SOURCES
"${CMAKE_SOURCE_DIR}/SSU.cpp"
"${CMAKE_SOURCE_DIR}/SSUData.cpp"
"${CMAKE_SOURCE_DIR}/Streaming.cpp"
"${CMAKE_SOURCE_DIR}/Destination.cpp"
"${CMAKE_SOURCE_DIR}/TransitTunnel.cpp"
"${CMAKE_SOURCE_DIR}/Tunnel.cpp"
"${CMAKE_SOURCE_DIR}/TunnelGateway.cpp"

7
build/autotools/Makefile.in

@ -114,7 +114,7 @@ am_i2p_OBJECTS = AddressBook.$(OBJEXT) CryptoConst.$(OBJEXT) \ @@ -114,7 +114,7 @@ am_i2p_OBJECTS = AddressBook.$(OBJEXT) CryptoConst.$(OBJEXT) \
Transports.$(OBJEXT) Tunnel.$(OBJEXT) TunnelEndpoint.$(OBJEXT) \
TunnelGateway.$(OBJEXT) TunnelPool.$(OBJEXT) UPnP.$(OBJEXT) \
aes.$(OBJEXT) base64.$(OBJEXT) i2p.$(OBJEXT) util.$(OBJEXT) \
SAM.$(OBJEXT)
SAM.$(OBJEXT) Destination.$(OBJEXT)
i2p_OBJECTS = $(am_i2p_OBJECTS)
i2p_LDADD = $(LDADD)
AM_V_P = $(am__v_P_@AM_V@)
@ -324,7 +324,7 @@ i2p_SOURCES = AddressBook.cpp CryptoConst.cpp Daemon.cpp \ @@ -324,7 +324,7 @@ i2p_SOURCES = AddressBook.cpp CryptoConst.cpp Daemon.cpp \
SSUData.cpp Streaming.cpp TransitTunnel.cpp \
Transports.cpp Tunnel.cpp TunnelEndpoint.cpp \
TunnelGateway.cpp TunnelPool.cpp UPnP.cpp aes.cpp \
base64.cpp i2p.cpp util.cpp \
base64.cpp i2p.cpp util.cpp SAM.cpp Destination.cpp \
\
AddressBook.h CryptoConst.h Daemon.h ElGamal.h \
Garlic.h HTTPProxy.h HTTPServer.h I2NPProtocol.h \
@ -335,7 +335,7 @@ i2p_SOURCES = AddressBook.cpp CryptoConst.cpp Daemon.cpp \ @@ -335,7 +335,7 @@ i2p_SOURCES = AddressBook.cpp CryptoConst.cpp Daemon.cpp \
TransitTunnel.h Transports.h Tunnel.h TunnelBase.h \
TunnelConfig.h TunnelEndpoint.h TunnelGateway.h \
TunnelPool.h UPnP.h aes.h base64.h config.h hmac.h \
util.h version.h
util.h version.h Destination.h
AM_LDFLAGS = @BOOST_DATE_TIME_LIB@ @BOOST_FILESYSTEM_LIB@ \
@BOOST_PROGRAM_OPTIONS_LIB@ @BOOST_REGEX_LIB@ \
@ -470,6 +470,7 @@ distclean-compile: @@ -470,6 +470,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SSU.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SSUData.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Streaming.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Destination.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/TransitTunnel.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Transports.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Tunnel.Po@am__quote@

14
filelist.mk

@ -2,16 +2,18 @@ @@ -2,16 +2,18 @@
CPP_FILES := CryptoConst.cpp base64.cpp NTCPSession.cpp RouterInfo.cpp Transports.cpp \
RouterContext.cpp NetDb.cpp LeaseSet.cpp Tunnel.cpp TunnelEndpoint.cpp TunnelGateway.cpp \
TransitTunnel.cpp I2NPProtocol.cpp Log.cpp Garlic.cpp HTTPServer.cpp Streaming.cpp Identity.cpp \
SSU.cpp util.cpp Reseed.cpp DaemonLinux.cpp SSUData.cpp i2p.cpp aes.cpp SOCKS.cpp UPnP.cpp \
TunnelPool.cpp HTTPProxy.cpp AddressBook.cpp Daemon.cpp I2PTunnel.cpp SAM.cpp
TransitTunnel.cpp I2NPProtocol.cpp Log.cpp Garlic.cpp HTTPServer.cpp Streaming.cpp \
Destination.cpp Identity.cpp SSU.cpp util.cpp Reseed.cpp DaemonLinux.cpp SSUData.cpp \
aes.cpp SOCKS.cpp UPnP.cpp TunnelPool.cpp HTTPProxy.cpp AddressBook.cpp Daemon.cpp \
I2PTunnel.cpp SAM.cpp i2p.cpp
H_FILES := CryptoConst.h base64.h NTCPSession.h RouterInfo.h Transports.h \
RouterContext.h NetDb.h LeaseSet.h Tunnel.h TunnelEndpoint.h TunnelGateway.h \
TransitTunnel.h I2NPProtocol.h Log.h Garlic.h HTTPServer.h Streaming.h Identity.h \
SSU.h util.h Reseed.h DaemonLinux.h SSUData.h i2p.h aes.h SOCKS.h UPnP.h TunnelPool.h \
HTTPProxy.h AddressBook.h Daemon.h I2PTunnel.h version.h Signature.h SAM.h
TransitTunnel.h I2NPProtocol.h Log.h Garlic.h HTTPServer.h Streaming.h Destination.h \
Identity.h SSU.h util.h Reseed.h DaemonLinux.h SSUData.h i2p.h aes.h SOCKS.h \
UPnP.h TunnelPool.h HTTPProxy.h AddressBook.h Daemon.h I2PTunnel.h version.h \
Signature.h SAM.h
OBJECTS = $(addprefix obj/, $(notdir $(CPP_FILES:.cpp=.o)))

Loading…
Cancel
Save