diff --git a/SSU.cpp b/SSU.cpp index b118c70a..9075d157 100644 --- a/SSU.cpp +++ b/SSU.cpp @@ -796,6 +796,18 @@ namespace ssu LogPrint ("SSU receive error: ", ecode.message ()); } + SSUSession * SSUServer::FindSession (const i2p::data::RouterInfo * router) + { + if (!router) return nullptr; + auto address = router->GetSSUAddress (); + if (!address) return nullptr; + auto it = m_Sessions.find (boost::asio::ip::udp::endpoint (address->host, address->port)); + if (it != m_Sessions.end ()) + return it->second; + else + return nullptr; + } + SSUSession * SSUServer::GetSession (const i2p::data::RouterInfo * router) { SSUSession * session = nullptr; diff --git a/SSU.h b/SSU.h index c0c82e3e..c3b2c476 100644 --- a/SSU.h +++ b/SSU.h @@ -126,6 +126,7 @@ namespace ssu void Start (); void Stop (); SSUSession * GetSession (const i2p::data::RouterInfo * router); + SSUSession * FindSession (const i2p::data::RouterInfo * router); void DeleteSession (SSUSession * session); void DeleteAllSessions (); diff --git a/Streaming.cpp b/Streaming.cpp index 40bc8929..d68f08c7 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -1,6 +1,5 @@ #include #include -#include #include #include "Log.h" #include "RouterInfo.h" @@ -19,7 +18,7 @@ namespace stream const i2p::data::LeaseSet& remote): m_Service (service), m_SendStreamID (0), m_SequenceNumber (0), m_LastReceivedSequenceNumber (0), m_IsOpen (false), m_LeaseSetUpdated (true), m_LocalDestination (local), m_RemoteLeaseSet (remote), - m_OutboundTunnel (nullptr) + m_OutboundTunnel (nullptr), m_ReceiveTimer (m_Service) { m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); UpdateCurrentRemoteLease (); @@ -27,6 +26,7 @@ namespace stream Stream::~Stream () { + m_ReceiveTimer.cancel (); while (auto packet = m_ReceiveQueue.Get ()) delete packet; for (auto it: m_SavedPackets) @@ -57,7 +57,10 @@ namespace stream } else break; - } + } + + // send ack for last message + SendQuickAck (); } else { @@ -117,13 +120,13 @@ namespace stream delete packet; m_LastReceivedSequenceNumber = receivedSeqn; - SendQuickAck (); if (flags & PACKET_FLAG_CLOSE) { LogPrint ("Closed"); m_IsOpen = false; m_ReceiveQueue.WakeUp (); + m_ReceiveTimer.cancel (); } } @@ -141,7 +144,9 @@ namespace stream void Stream::ConnectAndSend (uint8_t * buf, size_t len) { m_IsOpen = true; - uint8_t packet[STREAMING_MTU]; + Packet * p = new Packet (); + uint8_t * packet = p->GetBuffer (); + // TODO: implement setters size_t size = 0; *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); size += 4; // sendStreamID @@ -167,18 +172,18 @@ namespace stream size += 2; // max packet size uint8_t * signature = packet + size; // set it later memset (signature, 0, 40); // zeroes for now - size += 40; // signature - + size += 40; // signature memcpy (packet + size, buf, len); size += len; // payload m_LocalDestination->Sign (packet, size, signature); + p->len = size; - SendPacket (packet, size); + m_Service.post (boost::bind (&Stream::SendPacket, this, p)); } void Stream::SendQuickAck () { - uint8_t packet[STREAMING_MTU]; + uint8_t packet[MAX_PACKET_SIZE]; size_t size = 0; *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); size += 4; // sendStreamID @@ -195,7 +200,7 @@ namespace stream size += 2; // flags *(uint16_t *)(packet + size) = 0; // no options size += 2; // options size - + if (SendPacket (packet, size)) LogPrint ("Quick Ack sent"); } @@ -205,7 +210,7 @@ namespace stream if (m_IsOpen) { m_IsOpen = false; - uint8_t packet[STREAMING_MTU]; + uint8_t packet[MAX_PACKET_SIZE]; size_t size = 0; *(uint32_t *)(packet + size) = htobe32 (m_SendStreamID); size += 4; // sendStreamID @@ -226,7 +231,7 @@ namespace stream memset (packet + size, 0, 40); size += 40; // signature m_LocalDestination->Sign (packet, size, signature); - + if (SendPacket (packet, size)) LogPrint ("FIN sent"); m_ReceiveQueue.WakeUp (); @@ -266,7 +271,19 @@ namespace stream return pos; } - bool Stream::SendPacket (uint8_t * packet, size_t size) + bool Stream::SendPacket (Packet * packet) + { + if (packet) + { + bool ret = SendPacket (packet->GetBuffer (), packet->GetLength ()); + delete packet; + return ret; + } + else + return false; + } + + bool Stream::SendPacket (const uint8_t * buf, size_t len) { const I2NPMessage * leaseSet = nullptr; if (m_LeaseSetUpdated) @@ -275,7 +292,7 @@ namespace stream m_LeaseSetUpdated = false; } I2NPMessage * msg = i2p::garlic::routing.WrapMessage (m_RemoteLeaseSet, - CreateDataMessage (this, packet, size), leaseSet); + CreateDataMessage (this, buf, len), leaseSet); if (!m_OutboundTunnel) m_OutboundTunnel = m_LocalDestination->GetTunnelPool ()->GetNextOutboundTunnel (); if (m_OutboundTunnel) @@ -540,7 +557,7 @@ namespace stream LogPrint ("Data: protocol ", buf[9], " is not supported"); } - I2NPMessage * CreateDataMessage (Stream * s, uint8_t * payload, size_t len) + I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len) { I2NPMessage * msg = NewI2NPMessage (); CryptoPP::Gzip compressor; diff --git a/Streaming.h b/Streaming.h index 484479b2..3de8f3e2 100644 --- a/Streaming.h +++ b/Streaming.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include "I2PEndian.h" #include "Queue.h" @@ -37,7 +38,7 @@ namespace stream struct Packet { - uint8_t buf[1754]; + uint8_t buf[MAX_PACKET_SIZE]; size_t len, offset; Packet (): len (0), offset (0) {}; @@ -80,21 +81,28 @@ namespace stream void HandleNextPacket (Packet * packet); size_t Send (uint8_t * buf, size_t len, int timeout); // timeout in seconds size_t Receive (uint8_t * buf, size_t len, int timeout = 0); // returns 0 if timeout expired + template + void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0); + void Close (); void SetLeaseSetUpdated () { m_LeaseSetUpdated = true; }; - + private: void ConnectAndSend (uint8_t * buf, size_t len); void SendQuickAck (); - bool SendPacket (uint8_t * packet, size_t size); + bool SendPacket (Packet * packet); + bool SendPacket (const uint8_t * buf, size_t len); void SavePacket (Packet * packet); void ProcessPacket (Packet * packet); void UpdateCurrentRemoteLease (); + template + void HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler); + private: boost::asio::io_service& m_Service; @@ -106,6 +114,7 @@ namespace stream i2p::util::Queue m_ReceiveQueue; std::set m_SavedPackets; i2p::tunnel::OutboundTunnel * m_OutboundTunnel; + boost::asio::deadline_timer m_ReceiveTimer; }; class StreamingDestination: public i2p::data::LocalDestination @@ -183,7 +192,29 @@ namespace stream // assuming data is I2CP message void HandleDataMessage (i2p::data::IdentHash destination, const uint8_t * buf, size_t len); - I2NPMessage * CreateDataMessage (Stream * s, uint8_t * payload, size_t len); + I2NPMessage * CreateDataMessage (Stream * s, const uint8_t * payload, size_t len); + +//------------------------------------------------- + + template + void Stream::AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout) + { + m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(timeout)); + m_ReceiveTimer.async_wait (boost::bind (&Stream::HandleReceiveTimer, + this, boost::asio::placeholders::error, buffer, handler)); + } + + template + void Stream::HandleReceiveTimer (const boost::system::error_code& ecode, const Buffer& buffer, ReceiveHandler handler) + { + // TODO: + if (ecode == boost::asio::error::operation_aborted) + // timeout not expired + handler (boost::system::error_code (), 0); + else + // timeout expired + handler (boost::asio::error::make_error_code (boost::asio::error::timed_out), 0); + } } } diff --git a/Transports.cpp b/Transports.cpp index 9dc733c4..60de2710 100644 --- a/Transports.cpp +++ b/Transports.cpp @@ -156,34 +156,36 @@ namespace i2p void Transports::PostMessage (const i2p::data::IdentHash& ident, i2p::I2NPMessage * msg) { auto session = FindNTCPSession (ident); - if (!session) + if (session) + session->SendI2NPMessage (msg); + else { RouterInfo * r = netdb.FindRouter (ident); if (r) { - auto address = r->GetNTCPAddress (); - if (address) - { - session = new i2p::ntcp::NTCPClient (m_Service, address->host, address->port, *r); - AddNTCPSession (session); - } + auto ssuSession = m_SSUServer ? m_SSUServer->FindSession (r) : nullptr; + if (ssuSession) + ssuSession->SendI2NPMessage (msg); else { - // SSU always have lower prioprity than NTCP - // TODO: it shouldn't - LogPrint ("No NTCP addresses available. Trying SSU"); - address = r->GetSSUAddress (); - if (address && m_SSUServer) - { - auto s = m_SSUServer->GetSession (r); + // existing session not found. create new + // try NTCP first + auto address = r->GetNTCPAddress (); + if (address) + { + auto s = new i2p::ntcp::NTCPClient (m_Service, address->host, address->port, *r); + AddNTCPSession (s); + s->SendI2NPMessage (msg); + } + else + { + // then SSU + auto s = m_SSUServer ? m_SSUServer->GetSession (r) : nullptr; if (s) - { s->SendI2NPMessage (msg); - return; - } + else + LogPrint ("No NTCP and SSU addresses available"); } - else - LogPrint ("No SSU addresses available"); } } else @@ -192,10 +194,6 @@ namespace i2p i2p::data::netdb.RequestDestination (ident); } } - if (session) - session->SendI2NPMessage (msg); - else - LogPrint ("Session not found"); } void Transports::DetectExternalIP ()