Browse Source

Merge pull request #881 from majestrate/ntcp-socks

NTCP SOCKS/HTTP Proxy support
pull/883/head
orignal 8 years ago committed by GitHub
parent
commit
36c4719570
  1. 1
      daemon/Daemon.cpp
  2. 1
      libi2pd/Config.cpp
  3. 265
      libi2pd/NTCPSession.cpp
  4. 30
      libi2pd/NTCPSession.h
  5. 54
      libi2pd/Transports.cpp

1
daemon/Daemon.cpp

@ -269,6 +269,7 @@ namespace i2p @@ -269,6 +269,7 @@ namespace i2p
LogPrint(eLogInfo, "Daemon: starting Transports");
if(!ssu) LogPrint(eLogInfo, "Daemon: ssu disabled");
if(!ntcp) LogPrint(eLogInfo, "Daemon: ntcp disabled");
i2p::transport::transports.Start(ntcp, ssu);
if (i2p::transport::transports.IsBoundNTCP() || i2p::transport::transports.IsBoundSSU()) {
LogPrint(eLogInfo, "Daemon: Transports started");

1
libi2pd/Config.cpp

@ -57,6 +57,7 @@ namespace config { @@ -57,6 +57,7 @@ namespace config {
("share", value<int>()->default_value(100), "Limit of transit traffic from max bandwidth in percents. (default: 100")
("ntcp", value<bool>()->zero_tokens()->default_value(true), "Enable NTCP transport")
("ssu", value<bool>()->zero_tokens()->default_value(true), "Enable SSU transport")
("ntcpproxy", value<std::string>()->default_value(""), "proxy url for ntcp transport")
#ifdef _WIN32
("svcctl", value<std::string>()->default_value(""), "Windows service management ('install' or 'remove')")
("insomnia", value<bool>()->zero_tokens()->default_value(false), "Prevent system from sleeping")

265
libi2pd/NTCPSession.cpp

@ -12,6 +12,7 @@ @@ -12,6 +12,7 @@
#include "Transports.h"
#include "NetDb.hpp"
#include "NTCPSession.h"
#include "HTTP.h"
#ifdef WITH_EVENTS
#include "Event.h"
#endif
@ -788,7 +789,8 @@ namespace transport @@ -788,7 +789,8 @@ namespace transport
//-----------------------------------------
NTCPServer::NTCPServer ():
m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service),
m_TerminationTimer (m_Service), m_NTCPAcceptor (nullptr), m_NTCPV6Acceptor (nullptr)
m_TerminationTimer (m_Service), m_NTCPAcceptor (nullptr), m_NTCPV6Acceptor (nullptr),
m_ProxyType(eNoProxy), m_Resolver(m_Service), m_ProxyEndpoint(nullptr)
{
}
@ -803,6 +805,24 @@ namespace transport @@ -803,6 +805,24 @@ namespace transport
{
m_IsRunning = true;
m_Thread = new std::thread (std::bind (&NTCPServer::Run, this));
// we are using a proxy, don't create any acceptors
if(UsingProxy())
{
// TODO: resolve proxy until it is resolved
boost::asio::ip::tcp::resolver::query q(m_ProxyAddress, std::to_string(m_ProxyPort));
boost::system::error_code e;
auto itr = m_Resolver.resolve(q, e);
if(e)
{
LogPrint(eLogError, "NTCP: Failed to resolve proxy ", e.message());
}
else
{
m_ProxyEndpoint = new boost::asio::ip::tcp::endpoint(*itr);
}
}
else
{
// create acceptors
auto& addresses = context.GetRouterInfo ().GetAddresses ();
for (const auto& address: addresses)
@ -814,8 +834,7 @@ namespace transport @@ -814,8 +834,7 @@ namespace transport
{
try
{
m_NTCPAcceptor = new boost::asio::ip::tcp::acceptor (m_Service,
boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), address->port));
m_NTCPAcceptor = new boost::asio::ip::tcp::acceptor (m_Service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), address->port));
} catch ( std::exception & ex ) {
/** fail to bind ip4 */
LogPrint(eLogError, "NTCP: Failed to bind to ip4 port ",address->port, ex.what());
@ -824,8 +843,7 @@ namespace transport @@ -824,8 +843,7 @@ namespace transport
LogPrint (eLogInfo, "NTCP: Start listening TCP port ", address->port);
auto conn = std::make_shared<NTCPSession>(*this);
m_NTCPAcceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAccept, this,
conn, std::placeholders::_1));
m_NTCPAcceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAccept, this, conn, std::placeholders::_1));
}
else if (address->host.is_v6() && context.SupportsV6 ())
{
@ -834,14 +852,12 @@ namespace transport @@ -834,14 +852,12 @@ namespace transport
{
m_NTCPV6Acceptor->open (boost::asio::ip::tcp::v6());
m_NTCPV6Acceptor->set_option (boost::asio::ip::v6_only (true));
m_NTCPV6Acceptor->bind (boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v6(), address->port));
m_NTCPV6Acceptor->listen ();
LogPrint (eLogInfo, "NTCP: Start listening V6 TCP port ", address->port);
auto conn = std::make_shared<NTCPSession> (*this);
m_NTCPV6Acceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAcceptV6,
this, conn, std::placeholders::_1));
m_NTCPV6Acceptor->async_accept(conn->GetSocket (), std::bind (&NTCPServer::HandleAcceptV6, this, conn, std::placeholders::_1));
} catch ( std::exception & ex ) {
LogPrint(eLogError, "NTCP: failed to bind to ip6 port ", address->port);
continue;
@ -849,6 +865,7 @@ namespace transport @@ -849,6 +865,7 @@ namespace transport
}
}
}
}
ScheduleTermination ();
}
}
@ -886,6 +903,11 @@ namespace transport @@ -886,6 +903,11 @@ namespace transport
delete m_Thread;
m_Thread = nullptr;
}
if(m_ProxyEndpoint)
{
delete m_ProxyEndpoint;
m_ProxyEndpoint = nullptr;
}
}
}
@ -989,25 +1011,50 @@ namespace transport @@ -989,25 +1011,50 @@ namespace transport
}
}
void NTCPServer::Connect (const boost::asio::ip::address& address, int port, std::shared_ptr<NTCPSession> conn)
void NTCPServer::Connect(const boost::asio::ip::address & address, uint16_t port, std::shared_ptr<NTCPSession> conn)
{
LogPrint (eLogDebug, "NTCP: Connecting to ", address ,":", port);
m_Service.post([=]()
{
m_Service.post([=]() {
if (this->AddNTCPSession (conn))
{
auto timer = std::make_shared<boost::asio::deadline_timer>(m_Service);
timer->expires_from_now (boost::posix_time::seconds(NTCP_CONNECT_TIMEOUT));
timer->async_wait ([conn](const boost::system::error_code& ecode)
{
timer->async_wait ([conn](const boost::system::error_code& ecode) {
if (ecode != boost::asio::error::operation_aborted)
{
LogPrint (eLogInfo, "NTCP: Not connected in ", NTCP_CONNECT_TIMEOUT, " seconds");
conn->Terminate ();
}
});
conn->GetSocket ().async_connect (boost::asio::ip::tcp::endpoint (address, port),
std::bind (&NTCPServer::HandleConnect, this, std::placeholders::_1, conn, timer));
conn->GetSocket ().async_connect (boost::asio::ip::tcp::endpoint (address, port), std::bind (&NTCPServer::HandleConnect, this, std::placeholders::_1, conn, timer));
}
});
}
void NTCPServer::ConnectWithProxy (const std::string& host, uint16_t port, RemoteAddressType addrtype, std::shared_ptr<NTCPSession> conn)
{
if(m_ProxyEndpoint == nullptr)
{
return;
}
m_Service.post([=]() {
if (this->AddNTCPSession (conn))
{
auto timer = std::make_shared<boost::asio::deadline_timer>(m_Service);
auto timeout = NTCP_CONNECT_TIMEOUT * 5;
conn->SetTerminationTimeout(timeout * 2);
timer->expires_from_now (boost::posix_time::seconds(timeout));
timer->async_wait ([conn, timeout](const boost::system::error_code& ecode) {
if (ecode != boost::asio::error::operation_aborted)
{
LogPrint (eLogInfo, "NTCP: Not connected in ", timeout, " seconds");
i2p::data::netdb.SetUnreachable (conn->GetRemoteIdentity ()->GetIdentHash (), true);
conn->Terminate ();
}
});
conn->GetSocket ().async_connect (*m_ProxyEndpoint, std::bind (&NTCPServer::HandleProxyConnect, this, std::placeholders::_1, conn, timer, host, port, addrtype));
}
});
}
@ -1031,6 +1078,191 @@ namespace transport @@ -1031,6 +1078,191 @@ namespace transport
}
}
void NTCPServer::UseProxy(ProxyType proxytype, const std::string & addr, uint16_t port)
{
m_ProxyType = proxytype;
m_ProxyAddress = addr;
m_ProxyPort = port;
}
void NTCPServer::HandleProxyConnect(const boost::system::error_code& ecode, std::shared_ptr<NTCPSession> conn, std::shared_ptr<boost::asio::deadline_timer> timer, const std::string & host, uint16_t port, RemoteAddressType addrtype)
{
if(ecode)
{
LogPrint(eLogWarning, "NTCP: failed to connect to proxy ", ecode.message());
timer->cancel();
conn->Terminate();
return;
}
if(m_ProxyType == eSocksProxy)
{
// TODO: support username/password auth etc
uint8_t buff[3] = {0x05, 0x01, 0x00};
boost::asio::async_write(conn->GetSocket(), boost::asio::buffer(buff, 3), boost::asio::transfer_all(), [=] (const boost::system::error_code & ec, std::size_t transferred) {
(void) transferred;
if(ec)
{
LogPrint(eLogWarning, "NTCP: socks5 write error ", ec.message());
}
});
uint8_t readbuff[2];
boost::asio::async_read(conn->GetSocket(), boost::asio::buffer(readbuff, 2), [=](const boost::system::error_code & ec, std::size_t transferred) {
if(ec)
{
LogPrint(eLogError, "NTCP: socks5 read error ", ec.message());
timer->cancel();
conn->Terminate();
return;
}
else if(transferred == 2)
{
if(readbuff[1] == 0x00)
{
AfterSocksHandshake(conn, timer, host, port, addrtype);
return;
}
else if (readbuff[1] == 0xff)
{
LogPrint(eLogError, "NTCP: socks5 proxy rejected authentication");
timer->cancel();
conn->Terminate();
return;
}
}
LogPrint(eLogError, "NTCP: socks5 server gave invalid response");
timer->cancel();
conn->Terminate();
});
}
else if(m_ProxyType == eHTTPProxy)
{
i2p::http::HTTPReq req;
req.method = "CONNECT";
req.version ="HTTP/1.1";
if(addrtype == eIP6Address)
req.uri = "[" + host + "]:" + std::to_string(port);
else
req.uri = host + ":" + std::to_string(port);
boost::asio::streambuf writebuff;
std::ostream out(&writebuff);
out << req.to_string();
boost::asio::async_write(conn->GetSocket(), writebuff.data(), boost::asio::transfer_all(), [=](const boost::system::error_code & ec, std::size_t transferred) {
(void) transferred;
if(ec)
LogPrint(eLogError, "NTCP: http proxy write error ", ec.message());
});
boost::asio::streambuf * readbuff = new boost::asio::streambuf;
boost::asio::async_read_until(conn->GetSocket(), *readbuff, "\r\n\r\n", [=] (const boost::system::error_code & ec, std::size_t transferred) {
if(ec)
{
LogPrint(eLogError, "NTCP: http proxy read error ", ec.message());
timer->cancel();
conn->Terminate();
}
else
{
readbuff->commit(transferred);
i2p::http::HTTPRes res;
if(res.parse(boost::asio::buffer_cast<const char*>(readbuff->data()), readbuff->size()) > 0)
{
if(res.code == 200)
{
timer->cancel();
conn->ClientLogin();
delete readbuff;
return;
}
else
{
LogPrint(eLogError, "NTCP: http proxy rejected request ", res.code);
}
}
else
LogPrint(eLogError, "NTCP: http proxy gave malformed response");
timer->cancel();
conn->Terminate();
delete readbuff;
}
});
}
else
LogPrint(eLogError, "NTCP: unknown proxy type, invalid state");
}
void NTCPServer::AfterSocksHandshake(std::shared_ptr<NTCPSession> conn, std::shared_ptr<boost::asio::deadline_timer> timer, const std::string & host, uint16_t port, RemoteAddressType addrtype)
{
// build request
size_t sz = 0;
uint8_t buff[256];
uint8_t readbuff[256];
buff[0] = 0x05;
buff[1] = 0x01;
buff[2] = 0x00;
if(addrtype == eIP4Address)
{
buff[3] = 0x01;
auto addr = boost::asio::ip::address::from_string(host).to_v4();
auto addrbytes = addr.to_bytes();
auto addrsize = addrbytes.size();
memcpy(buff+4, addrbytes.data(), addrsize);
}
else if (addrtype == eIP6Address)
{
buff[3] = 0x04;
auto addr = boost::asio::ip::address::from_string(host).to_v6();
auto addrbytes = addr.to_bytes();
auto addrsize = addrbytes.size();
memcpy(buff+4, addrbytes.data(), addrsize);
}
else if (addrtype == eHostname)
{
buff[3] = 0x03;
size_t addrsize = host.size();
sz = addrsize + 1 + 4;
if (2 + sz > sizeof(buff))
{
// too big
return;
}
buff[4] = (uint8_t) addrsize;
memcpy(buff+4, host.c_str(), addrsize);
}
htobe16buf(buff+sz, port);
sz += 2;
boost::asio::async_write(conn->GetSocket(), boost::asio::buffer(buff, sz), boost::asio::transfer_all(), [=](const boost::system::error_code & ec, std::size_t written) {
if(ec)
{
LogPrint(eLogError, "NTCP: failed to write handshake to socks proxy ", ec.message());
return;
}
});
boost::asio::async_read(conn->GetSocket(), boost::asio::buffer(readbuff, sz), [=](const boost::system::error_code & e, std::size_t transferred) {
if(e)
{
LogPrint(eLogError, "NTCP: socks proxy read error ", e.message());
}
else if(transferred == sz)
{
if( readbuff[1] == 0x00)
{
timer->cancel();
conn->ClientLogin();
return;
}
}
if(!e)
i2p::data::netdb.SetUnreachable (conn->GetRemoteIdentity ()->GetIdentHash (), true);
timer->cancel();
conn->Terminate();
});
}
void NTCPServer::ScheduleTermination ()
{
m_TerminationTimer.expires_from_now (boost::posix_time::seconds(NTCP_TERMINATION_CHECK_TIMEOUT));
@ -1049,8 +1281,7 @@ namespace transport @@ -1049,8 +1281,7 @@ namespace transport
{
auto session = it.second;
// Termniate modifies m_NTCPSession, so we postpone it
m_Service.post ([session]
{
m_Service.post ([session] {
LogPrint (eLogDebug, "NTCP: No activity for ", session->GetTerminationTimeout (), " seconds");
session->Terminate ();
});

30
libi2pd/NTCPSession.h

@ -131,6 +131,21 @@ namespace transport @@ -131,6 +131,21 @@ namespace transport
{
public:
enum RemoteAddressType
{
eIP4Address,
eIP6Address,
eHostname
};
enum ProxyType
{
eNoProxy,
eSocksProxy,
eHTTPProxy
};
NTCPServer ();
~NTCPServer ();
@ -140,10 +155,15 @@ namespace transport @@ -140,10 +155,15 @@ namespace transport
bool AddNTCPSession (std::shared_ptr<NTCPSession> session);
void RemoveNTCPSession (std::shared_ptr<NTCPSession> session);
std::shared_ptr<NTCPSession> FindNTCPSession (const i2p::data::IdentHash& ident);
void Connect (const boost::asio::ip::address& address, int port, std::shared_ptr<NTCPSession> conn);
void ConnectWithProxy (const std::string& addr, uint16_t port, RemoteAddressType addrtype, std::shared_ptr<NTCPSession> conn);
void Connect(const boost::asio::ip::address & address, uint16_t port, std::shared_ptr<NTCPSession> conn);
bool IsBoundV4() const { return m_NTCPAcceptor != nullptr; };
bool IsBoundV6() const { return m_NTCPV6Acceptor != nullptr; };
bool NetworkIsReady() const { return IsBoundV4() || IsBoundV6() || UsingProxy(); };
bool UsingProxy() const { return m_ProxyType != eNoProxy; };
void UseProxy(ProxyType proxy, const std::string & address, uint16_t port);
boost::asio::io_service& GetService () { return m_Service; };
@ -155,6 +175,9 @@ namespace transport @@ -155,6 +175,9 @@ namespace transport
void HandleConnect (const boost::system::error_code& ecode, std::shared_ptr<NTCPSession> conn, std::shared_ptr<boost::asio::deadline_timer> timer);
void HandleProxyConnect(const boost::system::error_code& ecode, std::shared_ptr<NTCPSession> conn, std::shared_ptr<boost::asio::deadline_timer> timer, const std::string & host, uint16_t port, RemoteAddressType adddrtype);
void AfterSocksHandshake(std::shared_ptr<NTCPSession> conn, std::shared_ptr<boost::asio::deadline_timer> timer, const std::string & host, uint16_t port, RemoteAddressType adddrtype);
// timer
void ScheduleTermination ();
void HandleTerminationTimer (const boost::system::error_code& ecode);
@ -170,6 +193,11 @@ namespace transport @@ -170,6 +193,11 @@ namespace transport
std::map<i2p::data::IdentHash, std::shared_ptr<NTCPSession> > m_NTCPSessions; // access from m_Thread only
std::list<std::shared_ptr<NTCPSession> > m_PendingIncomingSessions;
ProxyType m_ProxyType;
std::string m_ProxyAddress;
uint16_t m_ProxyPort;
boost::asio::ip::tcp::resolver m_Resolver;
boost::asio::ip::tcp::endpoint * m_ProxyEndpoint;
public:
// for HTTP/I2PControl

54
libi2pd/Transports.cpp

@ -5,6 +5,7 @@ @@ -5,6 +5,7 @@
#include "NetDb.hpp"
#include "Transports.h"
#include "Config.h"
#include "HTTP.h"
#ifdef WITH_EVENTS
#include "Event.h"
#include "util.h"
@ -144,6 +145,39 @@ namespace transport @@ -144,6 +145,39 @@ namespace transport
m_DHKeysPairSupplier.Start ();
m_IsRunning = true;
m_Thread = new std::thread (std::bind (&Transports::Run, this));
std::string ntcpproxy; i2p::config::GetOption("ntcpproxy", ntcpproxy);
i2p::http::URL proxyurl;
if(ntcpproxy.size() && enableNTCP)
{
if(proxyurl.parse(ntcpproxy))
{
if(proxyurl.schema == "socks" || proxyurl.schema == "http")
{
m_NTCPServer = new NTCPServer();
NTCPServer::ProxyType proxytype = NTCPServer::eSocksProxy;
if (proxyurl.schema == "http")
proxytype = NTCPServer::eHTTPProxy;
m_NTCPServer->UseProxy(proxytype, proxyurl.host, proxyurl.port) ;
m_NTCPServer->Start();
if(!m_NTCPServer->NetworkIsReady())
{
LogPrint(eLogError, "Transports: NTCP failed to start with proxy");
m_NTCPServer->Stop();
delete m_NTCPServer;
m_NTCPServer = nullptr;
}
}
else
LogPrint(eLogError, "Transports: unsupported NTCP proxy URL ", ntcpproxy);
}
else
LogPrint(eLogError, "Transports: invalid NTCP proxy url ", ntcpproxy);
return;
}
// create acceptors
auto& addresses = context.GetRouterInfo ().GetAddresses ();
for (const auto& address : addresses)
@ -350,6 +384,17 @@ namespace transport @@ -350,6 +384,17 @@ namespace transport
if (!peer.router->UsesIntroducer () && !peer.router->IsUnreachable ())
{
auto s = std::make_shared<NTCPSession> (*m_NTCPServer, peer.router);
if(m_NTCPServer->UsingProxy())
{
NTCPServer::RemoteAddressType remote = NTCPServer::eIP4Address;
std::string addr = address->host.to_string();
if(address->host.is_v6())
remote = NTCPServer::eIP6Address;
m_NTCPServer->ConnectWithProxy(addr, address->port, remote, s);
}
else
m_NTCPServer->Connect (address->host, address->port, s);
return true;
}
@ -357,9 +402,17 @@ namespace transport @@ -357,9 +402,17 @@ namespace transport
else // we don't have address
{
if (address->addressString.length () > 0) // trying to resolve
{
if(m_NTCPServer->UsingProxy())
{
auto s = std::make_shared<NTCPSession> (*m_NTCPServer, peer.router);
m_NTCPServer->ConnectWithProxy(address->addressString, address->port, NTCPServer::eHostname, s);
}
else
{
LogPrint (eLogDebug, "Transports: Resolving NTCP ", address->addressString);
NTCPResolve (address->addressString, ident);
}
return true;
}
}
@ -814,4 +867,3 @@ namespace transport @@ -814,4 +867,3 @@ namespace transport
}
}
}

Loading…
Cancel
Save