diff --git a/Destination.cpp b/Destination.cpp index b0e33d63..eb793d7f 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -268,10 +268,10 @@ namespace client } } - i2p::stream::Stream * ClientDestination::CreateStream (const i2p::data::LeaseSet& remote) + i2p::stream::Stream * ClientDestination::CreateStream (const i2p::data::LeaseSet& remote, int port) { if (m_StreamingDestination) - return m_StreamingDestination->CreateNewOutgoingStream (remote); + return m_StreamingDestination->CreateNewOutgoingStream (remote, port); return nullptr; } diff --git a/Destination.h b/Destination.h index b605cc9d..35a1653b 100644 --- a/Destination.h +++ b/Destination.h @@ -41,7 +41,7 @@ namespace client // streaming i2p::stream::StreamingDestination * GetStreamingDestination () const { return m_StreamingDestination; }; - i2p::stream::Stream * CreateStream (const i2p::data::LeaseSet& remote); + i2p::stream::Stream * CreateStream (const i2p::data::LeaseSet& remote, int port = 0); void AcceptStreams (const std::function& acceptor); void StopAcceptingStreams (); bool IsAcceptingStreams () const; diff --git a/HTTPProxy.cpp b/HTTPProxy.cpp index 7ad64cfa..a3d95842 100644 --- a/HTTPProxy.cpp +++ b/HTTPProxy.cpp @@ -52,10 +52,11 @@ namespace proxy } path=m[4].str(); } - LogPrint("server is: ",server, "\n path is: ",path); + LogPrint("server is: ",server, " port is: ", port, "\n path is: ",path); r.uri = path; r.method = method; r.host = server; + r.port = boost::lexical_cast(port); } @@ -77,8 +78,8 @@ namespace proxy } } - LogPrint("Requesting ", r.host, " with path ", r.uri, " and method ", r.method); - SendToAddress (r.host, m_Buffer, m_BufferLen); + LogPrint("Requesting ", r.host, ":", r.port, " with path ", r.uri, " and method ", r.method); + SendToAddress (r.host, r.port, m_Buffer, m_BufferLen); } } diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 7ab07005..b4c7f1dc 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -839,10 +839,10 @@ namespace util { std::string request = "GET " + uri + " HTTP/1.1\r\nHost:" + address + "\r\n"; LogPrint("HTTP Client Request: ", request); - SendToAddress (address, request.c_str (), request.size ()); + SendToAddress (address, 80, request.c_str (), request.size ()); } - void HTTPConnection::SendToAddress (const std::string& address, const char * buf, size_t len) + void HTTPConnection::SendToAddress (const std::string& address, int port, const char * buf, size_t len) { i2p::data::IdentHash destination; if (!i2p::data::netdb.GetAddressBook ().GetIdentHash (address, destination)) @@ -854,33 +854,34 @@ namespace util auto leaseSet = i2p::client::context.GetSharedLocalDestination ()->FindLeaseSet (destination); if (leaseSet && leaseSet->HasNonExpiredLeases ()) - SendToDestination (leaseSet, buf, len); + SendToDestination (leaseSet, port, buf, len); else { i2p::data::netdb.RequestDestination (destination, true, i2p::client::context.GetSharedLocalDestination ()->GetTunnelPool ()); m_Timer.expires_from_now (boost::posix_time::seconds(HTTP_DESTINATION_REQUEST_TIMEOUT)); m_Timer.async_wait (boost::bind (&HTTPConnection::HandleDestinationRequestTimeout, - this, boost::asio::placeholders::error, destination, buf, len)); + this, boost::asio::placeholders::error, destination, port, buf, len)); } } - void HTTPConnection::HandleDestinationRequestTimeout (const boost::system::error_code& ecode, i2p::data::IdentHash destination, const char * buf, size_t len) + void HTTPConnection::HandleDestinationRequestTimeout (const boost::system::error_code& ecode, + i2p::data::IdentHash destination, int port, const char * buf, size_t len) { if (ecode != boost::asio::error::operation_aborted) { auto leaseSet = i2p::client::context.GetSharedLocalDestination ()->FindLeaseSet (destination); if (leaseSet && leaseSet->HasNonExpiredLeases ()) - SendToDestination (leaseSet, buf, len); + SendToDestination (leaseSet, port, buf, len); else // still no LeaseSet SendReply (leaseSet ? "" + itoopieImage + "
Leases expired" : "" + itoopieImage + "LeaseSet not found", 504); } } - void HTTPConnection::SendToDestination (const i2p::data::LeaseSet * remote, const char * buf, size_t len) + void HTTPConnection::SendToDestination (const i2p::data::LeaseSet * remote, int port, const char * buf, size_t len) { if (!m_Stream) - m_Stream = i2p::client::context.GetSharedLocalDestination ()->CreateStream (*remote); + m_Stream = i2p::client::context.GetSharedLocalDestination ()->CreateStream (*remote, port); if (m_Stream) { m_Stream->Send ((uint8_t *)buf, len); diff --git a/HTTPServer.h b/HTTPServer.h index 30545639..c7d6b0e4 100644 --- a/HTTPServer.h +++ b/HTTPServer.h @@ -29,6 +29,7 @@ namespace util std::string method; std::string uri; std::string host; + int port; int http_version_major; int http_version_minor; std::vector
headers; @@ -88,9 +89,10 @@ namespace util virtual void RunRequest (); void HandleDestinationRequest(const std::string& address, const std::string& uri); - void SendToAddress (const std::string& address, const char * buf, size_t len); - void HandleDestinationRequestTimeout (const boost::system::error_code& ecode, i2p::data::IdentHash destination, const char * buf, size_t len); - void SendToDestination (const i2p::data::LeaseSet * remote, const char * buf, size_t len); + void SendToAddress (const std::string& address, int port, const char * buf, size_t len); + void HandleDestinationRequestTimeout (const boost::system::error_code& ecode, + i2p::data::IdentHash destination, int port, const char * buf, size_t len); + void SendToDestination (const i2p::data::LeaseSet * remote, int port, const char * buf, size_t len); public: diff --git a/Streaming.cpp b/Streaming.cpp index 572d12c2..dba42d29 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -12,12 +12,12 @@ namespace i2p namespace stream { Stream::Stream (boost::asio::io_service& service, StreamingDestination& local, - const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0), + const i2p::data::LeaseSet& remote, int port): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false), m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local), m_RemoteLeaseSet (&remote), m_RoutingSession (nullptr), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), - m_NumReceivedBytes (0) + m_NumReceivedBytes (0), m_Port (port) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); UpdateCurrentRemoteLease (); @@ -28,7 +28,7 @@ namespace stream m_IsOpen (false), m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local), m_RemoteLeaseSet (nullptr), m_RoutingSession (nullptr), m_ReceiveTimer (m_Service), m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0), - m_NumReceivedBytes (0) + m_NumReceivedBytes (0), m_Port (0) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); } @@ -441,8 +441,7 @@ namespace stream std::vector msgs; for (auto it: packets) { - auto msg = m_RoutingSession->WrapSingleMessage ( - m_LocalDestination.CreateDataMessage (it->GetBuffer (), it->GetLength ())); + auto msg = m_RoutingSession->WrapSingleMessage (CreateDataMessage (it->GetBuffer (), it->GetLength ())); msgs.push_back (i2p::tunnel::TunnelMessageBlock { i2p::tunnel::eDeliveryTypeTunnel, @@ -531,6 +530,30 @@ namespace stream m_CurrentRemoteLease.endDate = 0; } + I2NPMessage * Stream::CreateDataMessage (const uint8_t * payload, size_t len) + { + I2NPMessage * msg = NewI2NPShortMessage (); + CryptoPP::Gzip compressor; + if (len <= i2p::stream::COMPRESSION_THRESHOLD_SIZE) + compressor.SetDeflateLevel (CryptoPP::Gzip::MIN_DEFLATE_LEVEL); + else + compressor.SetDeflateLevel (CryptoPP::Gzip::DEFAULT_DEFLATE_LEVEL); + compressor.Put (payload, len); + compressor.MessageEnd(); + int size = compressor.MaxRetrievable (); + uint8_t * buf = msg->GetPayload (); + *(uint32_t *)buf = htobe32 (size); // length + buf += 4; + compressor.Get (buf, size); + *(uint16_t *)(buf + 4) = 0; // source port + *(uint16_t *)(buf + 6) = htobe16 (m_Port); // destination port + buf[9] = i2p::client::PROTOCOL_TYPE_STREAMING; // streaming protocol + msg->len += size + 4; + FillI2NPMessageHeader (msg, eI2NPData); + + return msg; + } + void StreamingDestination::Start () { } @@ -574,9 +597,9 @@ namespace stream } } - Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote) + Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote, int port) { - Stream * s = new Stream (*m_Owner.GetService (), *this, remote); + Stream * s = new Stream (*m_Owner.GetService (), *this, remote, port); std::unique_lock l(m_StreamsMutex); m_Streams[s->GetRecvStreamID ()] = s; return s; @@ -628,29 +651,6 @@ namespace stream } } - I2NPMessage * StreamingDestination::CreateDataMessage (const uint8_t * payload, size_t len) - { - I2NPMessage * msg = NewI2NPShortMessage (); - CryptoPP::Gzip compressor; - if (len <= i2p::stream::COMPRESSION_THRESHOLD_SIZE) - compressor.SetDeflateLevel (CryptoPP::Gzip::MIN_DEFLATE_LEVEL); - else - compressor.SetDeflateLevel (CryptoPP::Gzip::DEFAULT_DEFLATE_LEVEL); - compressor.Put (payload, len); - compressor.MessageEnd(); - int size = compressor.MaxRetrievable (); - uint8_t * buf = msg->GetPayload (); - *(uint32_t *)buf = htobe32 (size); // length - buf += 4; - compressor.Get (buf, size); - memset (buf + 4, 0, 4); // source and destination ports. TODO: fill with proper values later - buf[9] = i2p::client::PROTOCOL_TYPE_STREAMING; // streaming protocol - msg->len += size + 4; - FillI2NPMessageHeader (msg, eI2NPData); - - return msg; - } - void DeleteStream (Stream * stream) { if (stream) diff --git a/Streaming.h b/Streaming.h index 6d57a8ad..1c8958b2 100644 --- a/Streaming.h +++ b/Streaming.h @@ -82,7 +82,8 @@ namespace stream { public: - Stream (boost::asio::io_service& service, StreamingDestination& local, const i2p::data::LeaseSet& remote); // outgoing + Stream (boost::asio::io_service& service, StreamingDestination& local, + const i2p::data::LeaseSet& remote, int port = 0); // outgoing Stream (boost::asio::io_service& service, StreamingDestination& local); // incoming ~Stream (); @@ -126,6 +127,8 @@ namespace stream void ScheduleResend (); void HandleResendTimer (const boost::system::error_code& ecode); void HandleAckSendTimer (const boost::system::error_code& ecode); + + I2NPMessage * CreateDataMessage (const uint8_t * payload, size_t len); private: @@ -143,6 +146,7 @@ namespace stream std::set m_SentPackets; boost::asio::deadline_timer m_ReceiveTimer, m_ResendTimer, m_AckSendTimer; size_t m_NumSentBytes, m_NumReceivedBytes; + uint16_t m_Port; }; class StreamingDestination @@ -155,14 +159,13 @@ namespace stream void Start (); void Stop (); - Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote); + Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote, int port = 0); void DeleteStream (Stream * stream); void SetAcceptor (const std::function& acceptor) { m_Acceptor = acceptor; }; void ResetAcceptor () { m_Acceptor = nullptr; }; bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; i2p::client::ClientDestination& GetOwner () { return m_Owner; }; - I2NPMessage * CreateDataMessage (const uint8_t * payload, size_t len); void HandleDataMessagePayload (const uint8_t * buf, size_t len); private: