Browse Source

send destination port for streaming

pull/105/head
orignal 10 years ago
parent
commit
65719ae645
  1. 4
      Destination.cpp
  2. 2
      Destination.h
  3. 7
      HTTPProxy.cpp
  4. 17
      HTTPServer.cpp
  5. 8
      HTTPServer.h
  6. 60
      Streaming.cpp
  7. 9
      Streaming.h

4
Destination.cpp

@ -268,10 +268,10 @@ namespace client @@ -268,10 +268,10 @@ namespace client
}
}
i2p::stream::Stream * ClientDestination::CreateStream (const i2p::data::LeaseSet& remote)
i2p::stream::Stream * ClientDestination::CreateStream (const i2p::data::LeaseSet& remote, int port)
{
if (m_StreamingDestination)
return m_StreamingDestination->CreateNewOutgoingStream (remote);
return m_StreamingDestination->CreateNewOutgoingStream (remote, port);
return nullptr;
}

2
Destination.h

@ -41,7 +41,7 @@ namespace client @@ -41,7 +41,7 @@ namespace client
// streaming
i2p::stream::StreamingDestination * GetStreamingDestination () const { return m_StreamingDestination; };
i2p::stream::Stream * CreateStream (const i2p::data::LeaseSet& remote);
i2p::stream::Stream * CreateStream (const i2p::data::LeaseSet& remote, int port = 0);
void AcceptStreams (const std::function<void (i2p::stream::Stream *)>& acceptor);
void StopAcceptingStreams ();
bool IsAcceptingStreams () const;

7
HTTPProxy.cpp

@ -52,10 +52,11 @@ namespace proxy @@ -52,10 +52,11 @@ namespace proxy
}
path=m[4].str();
}
LogPrint("server is: ",server, "\n path is: ",path);
LogPrint("server is: ",server, " port is: ", port, "\n path is: ",path);
r.uri = path;
r.method = method;
r.host = server;
r.port = boost::lexical_cast<int>(port);
}
@ -77,8 +78,8 @@ namespace proxy @@ -77,8 +78,8 @@ namespace proxy
}
}
LogPrint("Requesting ", r.host, " with path ", r.uri, " and method ", r.method);
SendToAddress (r.host, m_Buffer, m_BufferLen);
LogPrint("Requesting ", r.host, ":", r.port, " with path ", r.uri, " and method ", r.method);
SendToAddress (r.host, r.port, m_Buffer, m_BufferLen);
}
}

17
HTTPServer.cpp

@ -839,10 +839,10 @@ namespace util @@ -839,10 +839,10 @@ namespace util
{
std::string request = "GET " + uri + " HTTP/1.1\r\nHost:" + address + "\r\n";
LogPrint("HTTP Client Request: ", request);
SendToAddress (address, request.c_str (), request.size ());
SendToAddress (address, 80, request.c_str (), request.size ());
}
void HTTPConnection::SendToAddress (const std::string& address, const char * buf, size_t len)
void HTTPConnection::SendToAddress (const std::string& address, int port, const char * buf, size_t len)
{
i2p::data::IdentHash destination;
if (!i2p::data::netdb.GetAddressBook ().GetIdentHash (address, destination))
@ -854,33 +854,34 @@ namespace util @@ -854,33 +854,34 @@ namespace util
auto leaseSet = i2p::client::context.GetSharedLocalDestination ()->FindLeaseSet (destination);
if (leaseSet && leaseSet->HasNonExpiredLeases ())
SendToDestination (leaseSet, buf, len);
SendToDestination (leaseSet, port, buf, len);
else
{
i2p::data::netdb.RequestDestination (destination, true, i2p::client::context.GetSharedLocalDestination ()->GetTunnelPool ());
m_Timer.expires_from_now (boost::posix_time::seconds(HTTP_DESTINATION_REQUEST_TIMEOUT));
m_Timer.async_wait (boost::bind (&HTTPConnection::HandleDestinationRequestTimeout,
this, boost::asio::placeholders::error, destination, buf, len));
this, boost::asio::placeholders::error, destination, port, buf, len));
}
}
void HTTPConnection::HandleDestinationRequestTimeout (const boost::system::error_code& ecode, i2p::data::IdentHash destination, const char * buf, size_t len)
void HTTPConnection::HandleDestinationRequestTimeout (const boost::system::error_code& ecode,
i2p::data::IdentHash destination, int port, const char * buf, size_t len)
{
if (ecode != boost::asio::error::operation_aborted)
{
auto leaseSet = i2p::client::context.GetSharedLocalDestination ()->FindLeaseSet (destination);
if (leaseSet && leaseSet->HasNonExpiredLeases ())
SendToDestination (leaseSet, buf, len);
SendToDestination (leaseSet, port, buf, len);
else
// still no LeaseSet
SendReply (leaseSet ? "<html>" + itoopieImage + "<br>Leases expired</html>" : "<html>" + itoopieImage + "LeaseSet not found</html>", 504);
}
}
void HTTPConnection::SendToDestination (const i2p::data::LeaseSet * remote, const char * buf, size_t len)
void HTTPConnection::SendToDestination (const i2p::data::LeaseSet * remote, int port, const char * buf, size_t len)
{
if (!m_Stream)
m_Stream = i2p::client::context.GetSharedLocalDestination ()->CreateStream (*remote);
m_Stream = i2p::client::context.GetSharedLocalDestination ()->CreateStream (*remote, port);
if (m_Stream)
{
m_Stream->Send ((uint8_t *)buf, len);

8
HTTPServer.h

@ -29,6 +29,7 @@ namespace util @@ -29,6 +29,7 @@ namespace util
std::string method;
std::string uri;
std::string host;
int port;
int http_version_major;
int http_version_minor;
std::vector<header> headers;
@ -88,9 +89,10 @@ namespace util @@ -88,9 +89,10 @@ namespace util
virtual void RunRequest ();
void HandleDestinationRequest(const std::string& address, const std::string& uri);
void SendToAddress (const std::string& address, const char * buf, size_t len);
void HandleDestinationRequestTimeout (const boost::system::error_code& ecode, i2p::data::IdentHash destination, const char * buf, size_t len);
void SendToDestination (const i2p::data::LeaseSet * remote, const char * buf, size_t len);
void SendToAddress (const std::string& address, int port, const char * buf, size_t len);
void HandleDestinationRequestTimeout (const boost::system::error_code& ecode,
i2p::data::IdentHash destination, int port, const char * buf, size_t len);
void SendToDestination (const i2p::data::LeaseSet * remote, int port, const char * buf, size_t len);
public:

60
Streaming.cpp

@ -12,12 +12,12 @@ namespace i2p @@ -12,12 +12,12 @@ namespace i2p
namespace stream
{
Stream::Stream (boost::asio::io_service& service, StreamingDestination& local,
const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0),
const i2p::data::LeaseSet& remote, int port): m_Service (service), m_SendStreamID (0),
m_SequenceNumber (0), m_LastReceivedSequenceNumber (-1), m_IsOpen (false),
m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local),
m_RemoteLeaseSet (&remote), m_RoutingSession (nullptr), m_ReceiveTimer (m_Service),
m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0),
m_NumReceivedBytes (0)
m_NumReceivedBytes (0), m_Port (port)
{
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
UpdateCurrentRemoteLease ();
@ -28,7 +28,7 @@ namespace stream @@ -28,7 +28,7 @@ namespace stream
m_IsOpen (false), m_IsReset (false), m_IsAckSendScheduled (false), m_LocalDestination (local),
m_RemoteLeaseSet (nullptr), m_RoutingSession (nullptr), m_ReceiveTimer (m_Service),
m_ResendTimer (m_Service), m_AckSendTimer (m_Service), m_NumSentBytes (0),
m_NumReceivedBytes (0)
m_NumReceivedBytes (0), m_Port (0)
{
m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 ();
}
@ -441,8 +441,7 @@ namespace stream @@ -441,8 +441,7 @@ namespace stream
std::vector<i2p::tunnel::TunnelMessageBlock> msgs;
for (auto it: packets)
{
auto msg = m_RoutingSession->WrapSingleMessage (
m_LocalDestination.CreateDataMessage (it->GetBuffer (), it->GetLength ()));
auto msg = m_RoutingSession->WrapSingleMessage (CreateDataMessage (it->GetBuffer (), it->GetLength ()));
msgs.push_back (i2p::tunnel::TunnelMessageBlock
{
i2p::tunnel::eDeliveryTypeTunnel,
@ -531,6 +530,30 @@ namespace stream @@ -531,6 +530,30 @@ namespace stream
m_CurrentRemoteLease.endDate = 0;
}
I2NPMessage * Stream::CreateDataMessage (const uint8_t * payload, size_t len)
{
I2NPMessage * msg = NewI2NPShortMessage ();
CryptoPP::Gzip compressor;
if (len <= i2p::stream::COMPRESSION_THRESHOLD_SIZE)
compressor.SetDeflateLevel (CryptoPP::Gzip::MIN_DEFLATE_LEVEL);
else
compressor.SetDeflateLevel (CryptoPP::Gzip::DEFAULT_DEFLATE_LEVEL);
compressor.Put (payload, len);
compressor.MessageEnd();
int size = compressor.MaxRetrievable ();
uint8_t * buf = msg->GetPayload ();
*(uint32_t *)buf = htobe32 (size); // length
buf += 4;
compressor.Get (buf, size);
*(uint16_t *)(buf + 4) = 0; // source port
*(uint16_t *)(buf + 6) = htobe16 (m_Port); // destination port
buf[9] = i2p::client::PROTOCOL_TYPE_STREAMING; // streaming protocol
msg->len += size + 4;
FillI2NPMessageHeader (msg, eI2NPData);
return msg;
}
void StreamingDestination::Start ()
{
}
@ -574,9 +597,9 @@ namespace stream @@ -574,9 +597,9 @@ namespace stream
}
}
Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote)
Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote, int port)
{
Stream * s = new Stream (*m_Owner.GetService (), *this, remote);
Stream * s = new Stream (*m_Owner.GetService (), *this, remote, port);
std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams[s->GetRecvStreamID ()] = s;
return s;
@ -628,29 +651,6 @@ namespace stream @@ -628,29 +651,6 @@ namespace stream
}
}
I2NPMessage * StreamingDestination::CreateDataMessage (const uint8_t * payload, size_t len)
{
I2NPMessage * msg = NewI2NPShortMessage ();
CryptoPP::Gzip compressor;
if (len <= i2p::stream::COMPRESSION_THRESHOLD_SIZE)
compressor.SetDeflateLevel (CryptoPP::Gzip::MIN_DEFLATE_LEVEL);
else
compressor.SetDeflateLevel (CryptoPP::Gzip::DEFAULT_DEFLATE_LEVEL);
compressor.Put (payload, len);
compressor.MessageEnd();
int size = compressor.MaxRetrievable ();
uint8_t * buf = msg->GetPayload ();
*(uint32_t *)buf = htobe32 (size); // length
buf += 4;
compressor.Get (buf, size);
memset (buf + 4, 0, 4); // source and destination ports. TODO: fill with proper values later
buf[9] = i2p::client::PROTOCOL_TYPE_STREAMING; // streaming protocol
msg->len += size + 4;
FillI2NPMessageHeader (msg, eI2NPData);
return msg;
}
void DeleteStream (Stream * stream)
{
if (stream)

9
Streaming.h

@ -82,7 +82,8 @@ namespace stream @@ -82,7 +82,8 @@ namespace stream
{
public:
Stream (boost::asio::io_service& service, StreamingDestination& local, const i2p::data::LeaseSet& remote); // outgoing
Stream (boost::asio::io_service& service, StreamingDestination& local,
const i2p::data::LeaseSet& remote, int port = 0); // outgoing
Stream (boost::asio::io_service& service, StreamingDestination& local); // incoming
~Stream ();
@ -127,6 +128,8 @@ namespace stream @@ -127,6 +128,8 @@ namespace stream
void HandleResendTimer (const boost::system::error_code& ecode);
void HandleAckSendTimer (const boost::system::error_code& ecode);
I2NPMessage * CreateDataMessage (const uint8_t * payload, size_t len);
private:
boost::asio::io_service& m_Service;
@ -143,6 +146,7 @@ namespace stream @@ -143,6 +146,7 @@ namespace stream
std::set<Packet *, PacketCmp> m_SentPackets;
boost::asio::deadline_timer m_ReceiveTimer, m_ResendTimer, m_AckSendTimer;
size_t m_NumSentBytes, m_NumReceivedBytes;
uint16_t m_Port;
};
class StreamingDestination
@ -155,14 +159,13 @@ namespace stream @@ -155,14 +159,13 @@ namespace stream
void Start ();
void Stop ();
Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote);
Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote, int port = 0);
void DeleteStream (Stream * stream);
void SetAcceptor (const std::function<void (Stream *)>& acceptor) { m_Acceptor = acceptor; };
void ResetAcceptor () { m_Acceptor = nullptr; };
bool IsAcceptorSet () const { return m_Acceptor != nullptr; };
i2p::client::ClientDestination& GetOwner () { return m_Owner; };
I2NPMessage * CreateDataMessage (const uint8_t * payload, size_t len);
void HandleDataMessagePayload (const uint8_t * buf, size_t len);
private:

Loading…
Cancel
Save