Browse Source

moved StreamingDestination inside ClientDestination

pull/105/head
orignal 10 years ago
parent
commit
4d97b0e206
  1. 22
      ClientContext.cpp
  2. 16
      ClientContext.h
  3. 91
      Destination.cpp
  4. 52
      Destination.h
  5. 6
      HTTPServer.cpp
  6. 12
      I2PTunnel.cpp
  7. 13
      I2PTunnel.h
  8. 16
      SAM.cpp
  9. 3
      SAM.h
  10. 2
      SOCKS.cpp
  11. 102
      Streaming.cpp
  12. 41
      Streaming.h

22
ClientContext.cpp

@ -27,7 +27,7 @@ namespace client @@ -27,7 +27,7 @@ namespace client
{
if (!m_SharedLocalDestination)
{
m_SharedLocalDestination = new i2p::stream::StreamingDestination (false, i2p::data::SIGNING_KEY_TYPE_DSA_SHA1); // non-public, DSA
m_SharedLocalDestination = new ClientDestination (false, i2p::data::SIGNING_KEY_TYPE_DSA_SHA1); // non-public, DSA
m_Destinations[m_SharedLocalDestination->GetIdentity ().GetIdentHash ()] = m_SharedLocalDestination;
m_SharedLocalDestination->Start ();
}
@ -41,7 +41,7 @@ namespace client @@ -41,7 +41,7 @@ namespace client
std::string ircDestination = i2p::util::config::GetArg("-ircdest", "");
if (ircDestination.length () > 0) // ircdest is presented
{
i2p::stream::StreamingDestination * localDestination = nullptr;
ClientDestination * localDestination = nullptr;
std::string ircKeys = i2p::util::config::GetArg("-irckeys", "");
if (ircKeys.length () > 0)
localDestination = i2p::client::context.LoadLocalDestination (ircKeys, false);
@ -125,7 +125,7 @@ namespace client @@ -125,7 +125,7 @@ namespace client
#else
it->path();
#endif
auto localDestination = new i2p::stream::StreamingDestination (fullPath, true);
auto localDestination = new ClientDestination (fullPath, true);
m_Destinations[localDestination->GetIdentHash ()] = localDestination;
numDestinations++;
}
@ -134,25 +134,25 @@ namespace client @@ -134,25 +134,25 @@ namespace client
LogPrint (numDestinations, " local destinations loaded");
}
i2p::stream::StreamingDestination * ClientContext::LoadLocalDestination (const std::string& filename, bool isPublic)
ClientDestination * ClientContext::LoadLocalDestination (const std::string& filename, bool isPublic)
{
auto localDestination = new i2p::stream::StreamingDestination (i2p::util::filesystem::GetFullPath (filename), isPublic);
auto localDestination = new ClientDestination (i2p::util::filesystem::GetFullPath (filename), isPublic);
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations[localDestination->GetIdentHash ()] = localDestination;
localDestination->Start ();
return localDestination;
}
i2p::stream::StreamingDestination * ClientContext::CreateNewLocalDestination (bool isPublic, i2p::data::SigningKeyType sigType)
ClientDestination * ClientContext::CreateNewLocalDestination (bool isPublic, i2p::data::SigningKeyType sigType)
{
auto localDestination = new i2p::stream::StreamingDestination (isPublic, sigType);
auto localDestination = new ClientDestination (isPublic, sigType);
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations[localDestination->GetIdentHash ()] = localDestination;
localDestination->Start ();
return localDestination;
}
void ClientContext::DeleteLocalDestination (i2p::stream::StreamingDestination * destination)
void ClientContext::DeleteLocalDestination (ClientDestination * destination)
{
if (!destination) return;
auto it = m_Destinations.find (destination->GetIdentHash ());
@ -168,7 +168,7 @@ namespace client @@ -168,7 +168,7 @@ namespace client
}
}
i2p::stream::StreamingDestination * ClientContext::CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic)
ClientDestination * ClientContext::CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic)
{
auto it = m_Destinations.find (keys.GetPublic ().GetIdentHash ());
if (it != m_Destinations.end ())
@ -181,14 +181,14 @@ namespace client @@ -181,14 +181,14 @@ namespace client
}
return nullptr;
}
auto localDestination = new i2p::stream::StreamingDestination (keys, isPublic);
auto localDestination = new ClientDestination (keys, isPublic);
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations[keys.GetPublic ().GetIdentHash ()] = localDestination;
localDestination->Start ();
return localDestination;
}
i2p::stream::StreamingDestination * ClientContext::FindLocalDestination (const i2p::data::IdentHash& destination) const
ClientDestination * ClientContext::FindLocalDestination (const i2p::data::IdentHash& destination) const
{
auto it = m_Destinations.find (destination);
if (it != m_Destinations.end ())

16
ClientContext.h

@ -22,12 +22,12 @@ namespace client @@ -22,12 +22,12 @@ namespace client
void Start ();
void Stop ();
i2p::stream::StreamingDestination * GetSharedLocalDestination () const { return m_SharedLocalDestination; };
i2p::stream::StreamingDestination * CreateNewLocalDestination (bool isPublic = true, i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_DSA_SHA1); // transient
i2p::stream::StreamingDestination * CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic = true);
void DeleteLocalDestination (i2p::stream::StreamingDestination * destination);
i2p::stream::StreamingDestination * FindLocalDestination (const i2p::data::IdentHash& destination) const;
i2p::stream::StreamingDestination * LoadLocalDestination (const std::string& filename, bool isPublic);
ClientDestination * GetSharedLocalDestination () const { return m_SharedLocalDestination; };
ClientDestination * CreateNewLocalDestination (bool isPublic = true, i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_DSA_SHA1); // transient
ClientDestination * CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic = true);
void DeleteLocalDestination (ClientDestination * destination);
ClientDestination * FindLocalDestination (const i2p::data::IdentHash& destination) const;
ClientDestination * LoadLocalDestination (const std::string& filename, bool isPublic);
private:
@ -36,8 +36,8 @@ namespace client @@ -36,8 +36,8 @@ namespace client
private:
std::mutex m_DestinationsMutex;
std::map<i2p::data::IdentHash, i2p::stream::StreamingDestination *> m_Destinations;
i2p::stream::StreamingDestination * m_SharedLocalDestination;
std::map<i2p::data::IdentHash, ClientDestination *> m_Destinations;
ClientDestination * m_SharedLocalDestination;
i2p::proxy::HTTPProxy * m_HttpProxy;
i2p::proxy::SOCKSProxy * m_SocksProxy;

91
Destination.cpp

@ -21,6 +21,7 @@ namespace client @@ -21,6 +21,7 @@ namespace client
m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel
if (m_IsPublic)
LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p created");
m_StreamingDestination = new i2p::stream::StreamingDestination (*this); // TODO:
}
ClientDestination::ClientDestination (const std::string& fullPath, bool isPublic):
@ -56,6 +57,7 @@ namespace client @@ -56,6 +57,7 @@ namespace client
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey);
m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel
m_StreamingDestination = new i2p::stream::StreamingDestination (*this); // TODO:
}
ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic):
@ -67,6 +69,7 @@ namespace client @@ -67,6 +69,7 @@ namespace client
m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel
if (m_IsPublic)
LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p created");
m_StreamingDestination = new i2p::stream::StreamingDestination (*this); // TODO:
}
ClientDestination::~ClientDestination ()
@ -79,6 +82,7 @@ namespace client @@ -79,6 +82,7 @@ namespace client
delete m_LeaseSet;
delete m_Work;
delete m_Service;
delete m_StreamingDestination; // TODO
}
void ClientDestination::Run ()
@ -94,10 +98,12 @@ namespace client @@ -94,10 +98,12 @@ namespace client
m_Pool->SetActive (true);
m_IsRunning = true;
m_Thread = new std::thread (std::bind (&ClientDestination::Run, this));
m_StreamingDestination->Start ();
}
void ClientDestination::Stop ()
{
m_StreamingDestination->Stop ();
if (m_Pool)
i2p::tunnel::tunnels.StopTunnelPool (m_Pool);
m_IsRunning = false;
@ -250,7 +256,7 @@ namespace client @@ -250,7 +256,7 @@ namespace client
if (uncompressed->len <= i2p::stream::MAX_PACKET_SIZE)
{
decompressor.Get (uncompressed->buf, uncompressed->len);
HandleNextPacket (uncompressed);
m_StreamingDestination->HandleNextPacket (uncompressed);
}
else
{
@ -286,87 +292,4 @@ namespace client @@ -286,87 +292,4 @@ namespace client
return msg;
}
}
namespace stream
{
void StreamingDestination::Start ()
{
ClientDestination::Start ();
}
void StreamingDestination::Stop ()
{
ResetAcceptor ();
{
std::unique_lock<std::mutex> l(m_StreamsMutex);
for (auto it: m_Streams)
delete it.second;
m_Streams.clear ();
}
ClientDestination::Stop ();
}
void StreamingDestination::HandleNextPacket (Packet * packet)
{
uint32_t sendStreamID = packet->GetSendStreamID ();
if (sendStreamID)
{
auto it = m_Streams.find (sendStreamID);
if (it != m_Streams.end ())
it->second->HandleNextPacket (packet);
else
{
LogPrint ("Unknown stream ", sendStreamID);
delete packet;
}
}
else // new incoming stream
{
auto incomingStream = CreateNewIncomingStream ();
incomingStream->HandleNextPacket (packet);
if (m_Acceptor != nullptr)
m_Acceptor (incomingStream);
else
{
LogPrint ("Acceptor for incoming stream is not set");
DeleteStream (incomingStream);
}
}
}
Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote)
{
Stream * s = new Stream (*GetService (), *this, remote);
std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams[s->GetRecvStreamID ()] = s;
return s;
}
Stream * StreamingDestination::CreateNewIncomingStream ()
{
Stream * s = new Stream (*GetService (), *this);
std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams[s->GetRecvStreamID ()] = s;
return s;
}
void StreamingDestination::DeleteStream (Stream * stream)
{
if (stream)
{
std::unique_lock<std::mutex> l(m_StreamsMutex);
auto it = m_Streams.find (stream->GetRecvStreamID ());
if (it != m_Streams.end ())
{
m_Streams.erase (it);
if (GetService ())
GetService ()->post ([stream](void) { delete stream; });
else
delete stream;
}
}
}
}
}

52
Destination.h

@ -28,6 +28,7 @@ namespace client @@ -28,6 +28,7 @@ namespace client
bool IsRunning () const { return m_IsRunning; };
boost::asio::io_service * GetService () { return m_Service; };
i2p::tunnel::TunnelPool * GetTunnelPool () { return m_Pool; };
i2p::stream::StreamingDestination * GetStreamingDestination () const { return m_StreamingDestination; };
bool IsReady () const { return m_LeaseSet && m_LeaseSet->HasNonExpiredLeases (); };
void ResetCurrentOutboundTunnel () { m_CurrentOutboundTunnel = nullptr; };
@ -52,10 +53,6 @@ namespace client @@ -52,10 +53,6 @@ namespace client
void HandleDataMessage (const uint8_t * buf, size_t len);
I2NPMessage * CreateDataMessage (const uint8_t * payload, size_t len);
protected:
virtual void HandleNextPacket (i2p::stream::Packet * packet) = 0; // TODO
private:
void Run ();
@ -77,55 +74,14 @@ namespace client @@ -77,55 +74,14 @@ namespace client
i2p::data::LeaseSet * m_LeaseSet;
bool m_IsPublic;
i2p::stream::StreamingDestination * m_StreamingDestination;
public:
// for HTTP only
int GetNumRemoteLeaseSets () const { return m_RemoteLeaseSets.size (); };
};
}
namespace stream
{
class StreamingDestination: public i2p::client::ClientDestination
{
public:
StreamingDestination (bool isPublic, i2p::data::SigningKeyType sigType):
ClientDestination (isPublic, sigType) {};
StreamingDestination (const std::string& fullPath, bool isPublic):
ClientDestination (fullPath, isPublic) {};
StreamingDestination (const i2p::data::PrivateKeys& keys, bool isPublic):
ClientDestination (keys, isPublic) {};
~StreamingDestination () {};
void Start ();
void Stop ();
Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote);
void DeleteStream (Stream * stream);
void SetAcceptor (const std::function<void (Stream *)>& acceptor) { m_Acceptor = acceptor; };
void ResetAcceptor () { m_Acceptor = nullptr; };
bool IsAcceptorSet () const { return m_Acceptor != nullptr; };
// ClientDestination
void HandleNextPacket (Packet * packet);
private:
Stream * CreateNewIncomingStream ();
private:
std::mutex m_StreamsMutex;
std::map<uint32_t, Stream *> m_Streams;
std::function<void (Stream *)> m_Acceptor;
public:
// for HTTP only
const decltype(m_Streams)& GetStreams () const { return m_Streams; };
};
}
}
}
#endif

6
HTTPServer.cpp

@ -517,7 +517,7 @@ namespace util @@ -517,7 +517,7 @@ namespace util
if (m_Stream)
{
m_Stream->Close ();
i2p::client::context.GetSharedLocalDestination ()->DeleteStream (m_Stream);
i2p::client::context.GetSharedLocalDestination ()->GetStreamingDestination ()->DeleteStream (m_Stream);
m_Stream = nullptr;
}
m_Socket->close ();
@ -813,7 +813,7 @@ namespace util @@ -813,7 +813,7 @@ namespace util
}
}
s << "<br><b>Streams:</b><br>";
for (auto it: dest->GetStreams ())
for (auto it: dest->GetStreamingDestination ()->GetStreams ())
{
s << it.first << "->" << it.second->GetRemoteIdentity ().GetIdentHash ().ToBase32 () << ".b32.i2p ";
s << " [" << it.second->GetNumSentBytes () << ":" << it.second->GetNumReceivedBytes () << "]";
@ -880,7 +880,7 @@ namespace util @@ -880,7 +880,7 @@ namespace util
void HTTPConnection::SendToDestination (const i2p::data::LeaseSet * remote, const char * buf, size_t len)
{
if (!m_Stream)
m_Stream = i2p::client::context.GetSharedLocalDestination ()->CreateNewOutgoingStream (*remote);
m_Stream = i2p::client::context.GetSharedLocalDestination ()->GetStreamingDestination ()->CreateNewOutgoingStream (*remote);
if (m_Stream)
{
m_Stream->Send ((uint8_t *)buf, len);

12
I2PTunnel.cpp

@ -14,7 +14,7 @@ namespace client @@ -14,7 +14,7 @@ namespace client
boost::asio::ip::tcp::socket * socket, const i2p::data::LeaseSet * leaseSet):
m_Socket (socket), m_Owner (owner)
{
m_Stream = m_Owner->GetLocalDestination ()->CreateNewOutgoingStream (*leaseSet);
m_Stream = m_Owner->GetLocalDestination ()->GetStreamingDestination ()->CreateNewOutgoingStream (*leaseSet);
m_Stream->Send (m_Buffer, 0); // connect
StreamReceive ();
Receive ();
@ -39,7 +39,7 @@ namespace client @@ -39,7 +39,7 @@ namespace client
if (m_Stream)
{
m_Stream->Close ();
m_Owner->GetLocalDestination ()->DeleteStream (m_Stream);
m_Owner->GetLocalDestination ()->GetStreamingDestination ()->DeleteStream (m_Stream);
m_Stream = nullptr;
}
m_Socket->close ();
@ -115,7 +115,7 @@ namespace client @@ -115,7 +115,7 @@ namespace client
if (ecode != boost::asio::error::operation_aborted)
{
if (m_Stream) m_Stream->Close ();
m_Owner->GetLocalDestination ()->DeleteStream (m_Stream);
m_Owner->GetLocalDestination ()->GetStreamingDestination ()->DeleteStream (m_Stream);
m_Stream = nullptr;
}
}
@ -145,7 +145,7 @@ namespace client @@ -145,7 +145,7 @@ namespace client
}
I2PClientTunnel::I2PClientTunnel (boost::asio::io_service& service, const std::string& destination,
int port, i2p::stream::StreamingDestination * localDestination):
int port, ClientDestination * localDestination):
I2PTunnel (service, localDestination ? localDestination :
i2p::client::context.CreateNewLocalDestination (false, i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256)),
m_Acceptor (service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), port)),
@ -251,7 +251,7 @@ namespace client @@ -251,7 +251,7 @@ namespace client
}
I2PServerTunnel::I2PServerTunnel (boost::asio::io_service& service, const std::string& address, int port,
i2p::stream::StreamingDestination * localDestination): I2PTunnel (service, localDestination),
ClientDestination * localDestination): I2PTunnel (service, localDestination),
m_Endpoint (boost::asio::ip::address::from_string (address), port)
{
}
@ -270,7 +270,7 @@ namespace client @@ -270,7 +270,7 @@ namespace client
{
auto localDestination = GetLocalDestination ();
if (localDestination)
localDestination->SetAcceptor (std::bind (&I2PServerTunnel::HandleAccept, this, std::placeholders::_1));
localDestination->GetStreamingDestination ()->SetAcceptor (std::bind (&I2PServerTunnel::HandleAccept, this, std::placeholders::_1));
else
LogPrint ("Local destination not set for server tunnel");
}

13
I2PTunnel.h

@ -6,6 +6,7 @@ @@ -6,6 +6,7 @@
#include <set>
#include <boost/asio.hpp>
#include "Identity.h"
#include "Destination.h"
#include "Streaming.h"
namespace i2p
@ -51,22 +52,22 @@ namespace client @@ -51,22 +52,22 @@ namespace client
{
public:
I2PTunnel (boost::asio::io_service& service, i2p::stream::StreamingDestination * localDestination):
I2PTunnel (boost::asio::io_service& service, ClientDestination * localDestination):
m_Service (service), m_LocalDestination (localDestination) {};
virtual ~I2PTunnel () { ClearConnections (); };
void AddConnection (I2PTunnelConnection * conn);
void RemoveConnection (I2PTunnelConnection * conn);
void ClearConnections ();
i2p::stream::StreamingDestination * GetLocalDestination () { return m_LocalDestination; };
void SetLocalDestination (i2p::stream::StreamingDestination * dest) { m_LocalDestination = dest; };
ClientDestination * GetLocalDestination () { return m_LocalDestination; };
void SetLocalDestination (ClientDestination * dest) { m_LocalDestination = dest; };
boost::asio::io_service& GetService () { return m_Service; };
private:
boost::asio::io_service& m_Service;
i2p::stream::StreamingDestination * m_LocalDestination;
ClientDestination * m_LocalDestination;
std::set<I2PTunnelConnection *> m_Connections;
};
@ -75,7 +76,7 @@ namespace client @@ -75,7 +76,7 @@ namespace client
public:
I2PClientTunnel (boost::asio::io_service& service, const std::string& destination, int port,
i2p::stream::StreamingDestination * localDestination = nullptr);
ClientDestination * localDestination = nullptr);
~I2PClientTunnel ();
void Start ();
@ -102,7 +103,7 @@ namespace client @@ -102,7 +103,7 @@ namespace client
public:
I2PServerTunnel (boost::asio::io_service& service, const std::string& address, int port,
i2p::stream::StreamingDestination * localDestination);
ClientDestination * localDestination);
void Start ();
void Stop ();

16
SAM.cpp

@ -26,7 +26,7 @@ namespace client @@ -26,7 +26,7 @@ namespace client
{
m_Stream->Close ();
if (m_Session && m_Session->localDestination)
m_Session->localDestination->DeleteStream (m_Stream);
m_Session->localDestination->GetStreamingDestination ()->DeleteStream (m_Stream);
}
}
@ -36,7 +36,7 @@ namespace client @@ -36,7 +36,7 @@ namespace client
{
m_Stream->Close ();
if (m_Session && m_Session->localDestination)
m_Session->localDestination->DeleteStream (m_Stream);
m_Session->localDestination->GetStreamingDestination ()->DeleteStream (m_Stream);
m_Stream = nullptr;
}
switch (m_SocketType)
@ -55,7 +55,7 @@ namespace client @@ -55,7 +55,7 @@ namespace client
if (m_Session)
{
m_Session->sockets.remove (this);
m_Session->localDestination->ResetAcceptor ();
m_Session->localDestination->GetStreamingDestination ()->ResetAcceptor ();
}
break;
}
@ -295,7 +295,7 @@ namespace client @@ -295,7 +295,7 @@ namespace client
{
m_SocketType = eSAMSocketTypeStream;
m_Session->sockets.push_back (this);
m_Stream = m_Session->localDestination->CreateNewOutgoingStream (remote);
m_Stream = m_Session->localDestination->GetStreamingDestination ()->CreateNewOutgoingStream (remote);
m_Stream->Send ((uint8_t *)m_Buffer, 0); // connect
I2PReceive ();
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
@ -344,11 +344,11 @@ namespace client @@ -344,11 +344,11 @@ namespace client
m_Session = m_Owner.FindSession (id);
if (m_Session)
{
if (!m_Session->localDestination->IsAcceptorSet ())
if (!m_Session->localDestination->GetStreamingDestination ()->IsAcceptorSet ())
{
m_SocketType = eSAMSocketTypeAcceptor;
m_Session->sockets.push_back (this);
m_Session->localDestination->SetAcceptor (std::bind (&SAMSocket::HandleI2PAccept, this, std::placeholders::_1));
m_Session->localDestination->GetStreamingDestination ()->SetAcceptor (std::bind (&SAMSocket::HandleI2PAccept, this, std::placeholders::_1));
SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false);
}
else
@ -507,7 +507,7 @@ namespace client @@ -507,7 +507,7 @@ namespace client
m_Stream = stream;
auto session = m_Owner.FindSession (m_ID);
if (session)
session->localDestination->ResetAcceptor ();
session->localDestination->GetStreamingDestination ()->ResetAcceptor ();
if (!m_IsSilent)
{
// send remote peer address
@ -595,7 +595,7 @@ namespace client @@ -595,7 +595,7 @@ namespace client
SAMSession * SAMBridge::CreateSession (const std::string& id, const std::string& destination)
{
i2p::stream::StreamingDestination * localDestination = nullptr;
ClientDestination * localDestination = nullptr;
if (destination != "")
{
uint8_t * buf = new uint8_t[destination.length ()];

3
SAM.h

@ -11,6 +11,7 @@ @@ -11,6 +11,7 @@
#include "Identity.h"
#include "LeaseSet.h"
#include "Streaming.h"
#include "Destination.h"
namespace i2p
{
@ -115,7 +116,7 @@ namespace client @@ -115,7 +116,7 @@ namespace client
struct SAMSession
{
i2p::stream::StreamingDestination * localDestination;
ClientDestination * localDestination;
std::list<SAMSocket *> sockets;
};

2
SOCKS.cpp

@ -224,7 +224,7 @@ namespace proxy @@ -224,7 +224,7 @@ namespace proxy
void SOCKS4AHandler::SentConnectionSuccess(const boost::system::error_code & ecode)
{
LogPrint("--- socks4a making connection");
m_stream = i2p::client::context.GetSharedLocalDestination ()->CreateNewOutgoingStream(*m_ls);
m_stream = i2p::client::context.GetSharedLocalDestination ()->GetStreamingDestination ()->CreateNewOutgoingStream(*m_ls);
m_state = OKAY;
LogPrint("--- socks4a state is ", m_state);
AsyncSockRead();

102
Streaming.cpp

@ -115,7 +115,7 @@ namespace stream @@ -115,7 +115,7 @@ namespace stream
{
// we have received duplicate. Most likely our outbound tunnel is dead
LogPrint ("Duplicate message ", receivedSeqn, " received");
m_LocalDestination.ResetCurrentOutboundTunnel (); // pick another outbound tunnel
m_LocalDestination.GetOwner ().ResetCurrentOutboundTunnel (); // pick another outbound tunnel
UpdateCurrentRemoteLease (); // pick another lease
SendQuickAck (); // resend ack for previous message again
delete packet; // packet dropped
@ -274,11 +274,11 @@ namespace stream @@ -274,11 +274,11 @@ namespace stream
if (isNoAck) flags |= PACKET_FLAG_NO_ACK;
*(uint16_t *)(packet + size) = htobe16 (flags);
size += 2; // flags
size_t identityLen = m_LocalDestination.GetIdentity ().GetFullLen ();
size_t signatureLen = m_LocalDestination.GetIdentity ().GetSignatureLen ();
size_t identityLen = m_LocalDestination.GetOwner ().GetIdentity ().GetFullLen ();
size_t signatureLen = m_LocalDestination.GetOwner ().GetIdentity ().GetSignatureLen ();
*(uint16_t *)(packet + size) = htobe16 (identityLen + signatureLen + 2); // identity + signature + packet size
size += 2; // options size
m_LocalDestination.GetIdentity ().ToBuffer (packet + size, identityLen);
m_LocalDestination.GetOwner ().GetIdentity ().ToBuffer (packet + size, identityLen);
size += identityLen; // from
*(uint16_t *)(packet + size) = htobe16 (STREAMING_MTU);
size += 2; // max packet size
@ -291,7 +291,7 @@ namespace stream @@ -291,7 +291,7 @@ namespace stream
buf += sentLen;
len -= sentLen;
size += sentLen; // payload
m_LocalDestination.Sign (packet, size, signature);
m_LocalDestination.GetOwner ().Sign (packet, size, signature);
}
else
{
@ -362,13 +362,13 @@ namespace stream @@ -362,13 +362,13 @@ namespace stream
size++; // resend delay
*(uint16_t *)(packet + size) = htobe16 (PACKET_FLAG_CLOSE | PACKET_FLAG_SIGNATURE_INCLUDED);
size += 2; // flags
size_t signatureLen = m_LocalDestination.GetIdentity ().GetSignatureLen ();
size_t signatureLen = m_LocalDestination.GetOwner ().GetIdentity ().GetSignatureLen ();
*(uint16_t *)(packet + size) = htobe16 (signatureLen); // signature only
size += 2; // options size
uint8_t * signature = packet + size;
memset (packet + size, 0, signatureLen);
size += signatureLen; // signature
m_LocalDestination.Sign (packet, size, signature);
m_LocalDestination.GetOwner ().Sign (packet, size, signature);
p->len = size;
SendPacket (p);
@ -441,7 +441,7 @@ namespace stream @@ -441,7 +441,7 @@ namespace stream
for (auto it: packets)
{
auto msg = m_RoutingSession->WrapSingleMessage (
m_LocalDestination.CreateDataMessage (it->GetBuffer (), it->GetLength ()));
m_LocalDestination.GetOwner ().CreateDataMessage (it->GetBuffer (), it->GetLength ()));
msgs.push_back (i2p::tunnel::TunnelMessageBlock
{
i2p::tunnel::eDeliveryTypeTunnel,
@ -450,7 +450,7 @@ namespace stream @@ -450,7 +450,7 @@ namespace stream
});
m_NumSentBytes += it->GetLength ();
}
m_LocalDestination.SendTunnelDataMsgs (msgs);
m_LocalDestination.GetOwner ().SendTunnelDataMsgs (msgs);
}
else
LogPrint ("All leases are expired");
@ -484,7 +484,7 @@ namespace stream @@ -484,7 +484,7 @@ namespace stream
}
if (packets.size () > 0)
{
m_LocalDestination.ResetCurrentOutboundTunnel (); // pick another outbound tunnel
m_LocalDestination.GetOwner ().ResetCurrentOutboundTunnel (); // pick another outbound tunnel
UpdateCurrentRemoteLease (); // pick another lease
SendPackets (packets);
}
@ -506,14 +506,14 @@ namespace stream @@ -506,14 +506,14 @@ namespace stream
{
if (!m_RemoteLeaseSet)
{
m_RemoteLeaseSet = m_LocalDestination.FindLeaseSet (m_RemoteIdentity.GetIdentHash ());
m_RemoteLeaseSet = m_LocalDestination.GetOwner ().FindLeaseSet (m_RemoteIdentity.GetIdentHash ());
if (!m_RemoteLeaseSet)
LogPrint ("LeaseSet ", m_RemoteIdentity.GetIdentHash ().ToBase64 (), " not found");
}
if (m_RemoteLeaseSet)
{
if (!m_RoutingSession)
m_RoutingSession = m_LocalDestination.GetRoutingSession (*m_RemoteLeaseSet, 32);
m_RoutingSession = m_LocalDestination.GetOwner ().GetRoutingSession (*m_RemoteLeaseSet, 32);
auto leases = m_RemoteLeaseSet->GetNonExpiredLeases ();
if (!leases.empty ())
{
@ -522,12 +522,88 @@ namespace stream @@ -522,12 +522,88 @@ namespace stream
}
else
{
m_RemoteLeaseSet = m_LocalDestination.FindLeaseSet (m_RemoteIdentity.GetIdentHash ()); // re-request expired
m_RemoteLeaseSet = m_LocalDestination.GetOwner ().FindLeaseSet (m_RemoteIdentity.GetIdentHash ()); // re-request expired
m_CurrentRemoteLease.endDate = 0;
}
}
else
m_CurrentRemoteLease.endDate = 0;
}
void StreamingDestination::Start ()
{
}
void StreamingDestination::Stop ()
{
ResetAcceptor ();
{
std::unique_lock<std::mutex> l(m_StreamsMutex);
for (auto it: m_Streams)
delete it.second;
m_Streams.clear ();
}
}
void StreamingDestination::HandleNextPacket (Packet * packet)
{
uint32_t sendStreamID = packet->GetSendStreamID ();
if (sendStreamID)
{
auto it = m_Streams.find (sendStreamID);
if (it != m_Streams.end ())
it->second->HandleNextPacket (packet);
else
{
LogPrint ("Unknown stream ", sendStreamID);
delete packet;
}
}
else // new incoming stream
{
auto incomingStream = CreateNewIncomingStream ();
incomingStream->HandleNextPacket (packet);
if (m_Acceptor != nullptr)
m_Acceptor (incomingStream);
else
{
LogPrint ("Acceptor for incoming stream is not set");
DeleteStream (incomingStream);
}
}
}
Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote)
{
Stream * s = new Stream (*m_Owner.GetService (), *this, remote);
std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams[s->GetRecvStreamID ()] = s;
return s;
}
Stream * StreamingDestination::CreateNewIncomingStream ()
{
Stream * s = new Stream (*m_Owner.GetService (), *this);
std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams[s->GetRecvStreamID ()] = s;
return s;
}
void StreamingDestination::DeleteStream (Stream * stream)
{
if (stream)
{
std::unique_lock<std::mutex> l(m_StreamsMutex);
auto it = m_Streams.find (stream->GetRecvStreamID ());
if (it != m_Streams.end ())
{
m_Streams.erase (it);
if (m_Owner.GetService ())
m_Owner.GetService ()->post ([stream](void) { delete stream; });
else
delete stream;
}
}
}
}
}

41
Streaming.h

@ -18,6 +18,10 @@ @@ -18,6 +18,10 @@
namespace i2p
{
namespace client
{
class ClientDestination;
}
namespace stream
{
const uint16_t PACKET_FLAG_SYNCHRONIZE = 0x0001;
@ -141,6 +145,43 @@ namespace stream @@ -141,6 +145,43 @@ namespace stream
size_t m_NumSentBytes, m_NumReceivedBytes;
};
class StreamingDestination
{
public:
StreamingDestination (i2p::client::ClientDestination& owner): m_Owner (owner) {};
~StreamingDestination () {};
void Start ();
void Stop ();
Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote);
void DeleteStream (Stream * stream);
void SetAcceptor (const std::function<void (Stream *)>& acceptor) { m_Acceptor = acceptor; };
void ResetAcceptor () { m_Acceptor = nullptr; };
bool IsAcceptorSet () const { return m_Acceptor != nullptr; };
// ClientDestination
i2p::client::ClientDestination& GetOwner () { return m_Owner; };
void HandleNextPacket (Packet * packet);
private:
Stream * CreateNewIncomingStream ();
private:
i2p::client::ClientDestination& m_Owner;
std::mutex m_StreamsMutex;
std::map<uint32_t, Stream *> m_Streams;
std::function<void (Stream *)> m_Acceptor;
public:
// for HTTP only
const decltype(m_Streams)& GetStreams () const { return m_Streams; };
};
//-------------------------------------------------
template<typename Buffer, typename ReceiveHandler>

Loading…
Cancel
Save