From 5eb67a08c9274aede27ea79e283fa39fdb235530 Mon Sep 17 00:00:00 2001 From: orignal Date: Thu, 27 Mar 2014 13:24:23 -0400 Subject: [PATCH 1/4] send reply --- HTTPServer.cpp | 56 ++++++++++++++++++++++++++++---------------------- HTTPServer.h | 3 ++- 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 30fb461b..6d5f24ce 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -1,4 +1,5 @@ #include +#include #include #include "base64.h" #include "Log.h" @@ -76,10 +77,6 @@ namespace util } else HandleRequest (); - boost::asio::async_write (*m_Socket, m_Reply.to_buffers(), - boost::bind (&HTTPConnection::HandleWrite, this, - boost::asio::placeholders::error)); - //Receive (); } else if (ecode != boost::asio::error::operation_aborted) Terminate (); @@ -97,9 +94,10 @@ namespace util return ""; } - void HTTPConnection::HandleWrite (const boost::system::error_code& ecode) + void HTTPConnection::HandleWrite (const boost::system::error_code& ecode, bool terminate) { - Terminate (); + if (terminate) + Terminate (); } void HTTPConnection::HandleRequest () @@ -108,12 +106,7 @@ namespace util s << ""; FillContent (s); s << ""; - m_Reply.content = s.str (); - m_Reply.headers.resize(2); - m_Reply.headers[0].name = "Content-Length"; - m_Reply.headers[0].value = boost::lexical_cast(m_Reply.content.size()); - m_Reply.headers[1].name = "Content-Type"; - m_Reply.headers[1].value = "text/html"; + SendReply (s.str ()); } void HTTPConnection::FillContent (std::stringstream& s) @@ -211,6 +204,7 @@ namespace util if (!addr) { LogPrint ("Unknown address ", address); + SendReply ("Unknown address"); return; } destination = *addr; @@ -234,12 +228,7 @@ namespace util leaseSet = i2p::data::netdb.FindLeaseSet (destination); if (!leaseSet || !leaseSet->HasNonExpiredLeases ()) // still no LeaseSet { - m_Reply.content = leaseSet ? "Leases expired" : "LeaseSet not found"; - m_Reply.headers.resize(2); - m_Reply.headers[0].name = "Content-Length"; - m_Reply.headers[0].value = boost::lexical_cast(m_Reply.content.size()); - m_Reply.headers[1].name = "Content-Type"; - m_Reply.headers[1].value = "text/html"; + SendReply (leaseSet ? "Leases expired" : "LeaseSet not found"); return; } } @@ -261,6 +250,9 @@ namespace util m_Reply.content = ss.str (); // send "as is" m_Reply.headers.resize(0); // no headers + boost::asio::async_write (*m_Socket, m_Reply.to_buffers(), + boost::bind (&HTTPConnection::HandleWrite, this, + boost::asio::placeholders::error, true)); return; } else // nothing received @@ -268,17 +260,33 @@ namespace util s->Close (); DeleteStream (s); - m_Reply.content = ss.str (); - m_Reply.headers.resize(2); - m_Reply.headers[0].name = "Content-Length"; - m_Reply.headers[0].value = boost::lexical_cast(m_Reply.content.size()); - m_Reply.headers[1].name = "Content-Type"; - m_Reply.headers[1].value = "text/html"; + SendReply (ss.str ()); } } void HTTPConnection::HandleStreamReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred) { + if (bytes_transferred) + { + } + else + { + SendReply ("Not responding"); + } + } + + void HTTPConnection::SendReply (const std::string& content) + { + m_Reply.content = content; + m_Reply.headers.resize(2); + m_Reply.headers[0].name = "Content-Length"; + m_Reply.headers[0].value = boost::lexical_cast(m_Reply.content.size()); + m_Reply.headers[1].name = "Content-Type"; + m_Reply.headers[1].value = "text/html"; + + boost::asio::async_write (*m_Socket, m_Reply.to_buffers(), + boost::bind (&HTTPConnection::HandleWrite, this, + boost::asio::placeholders::error, true)); } HTTPServer::HTTPServer (int port): diff --git a/HTTPServer.h b/HTTPServer.h index b966db81..f964cae3 100644 --- a/HTTPServer.h +++ b/HTTPServer.h @@ -46,7 +46,8 @@ namespace util void Receive (); void HandleReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleStreamReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred); - void HandleWrite(const boost::system::error_code& ecode); + void HandleWrite(const boost::system::error_code& ecode, bool terminate); + void SendReply (const std::string& content); void HandleRequest (); void HandleDestinationRequest (const std::string& address, const std::string& uri); From 86e233d77a4ff6d2effbcf9872fff6cd0d8bd2d5 Mon Sep 17 00:00:00 2001 From: orignal Date: Thu, 27 Mar 2014 15:42:23 -0400 Subject: [PATCH 2/4] Asyc receive from stream --- HTTPServer.cpp | 60 ++++++++++++++++++++++++++++++++++---------------- HTTPServer.h | 10 ++++++--- 2 files changed, 48 insertions(+), 22 deletions(-) diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 6d5f24ce..00cac390 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -7,7 +7,6 @@ #include "TransitTunnel.h" #include "Transports.h" #include "NetDb.h" -#include "Streaming.h" #include "HTTPServer.h" namespace i2p @@ -44,6 +43,11 @@ namespace util void HTTPConnection::Terminate () { + if (m_Stream) + { + m_Stream->Close (); + DeleteStream (m_Stream); + } m_Socket->close (); delete this; } @@ -94,10 +98,17 @@ namespace util return ""; } - void HTTPConnection::HandleWrite (const boost::system::error_code& ecode, bool terminate) + void HTTPConnection::HandleWriteReply (const boost::system::error_code& ecode) + { + Terminate (); + } + + void HTTPConnection::HandleWrite (const boost::system::error_code& ecode) { - if (terminate) + if (ecode || (m_Stream && !m_Stream->IsOpen ())) Terminate (); + else // data keeps coming + AsyncStreamReceive (); } void HTTPConnection::HandleRequest () @@ -231,47 +242,58 @@ namespace util SendReply (leaseSet ? "Leases expired" : "LeaseSet not found"); return; } - } - auto s = i2p::stream::CreateStream (*leaseSet); - if (s) + } + if (!m_Stream) + m_Stream = i2p::stream::CreateStream (*leaseSet); + if (m_Stream) { std::string request = "GET " + uri + " HTTP/1.1\n Host:" + fullAddress + "\n"; - s->Send ((uint8_t *)request.c_str (), request.length (), 10); + m_Stream->Send ((uint8_t *)request.c_str (), request.length (), 10); std::stringstream ss; uint8_t buf[8192]; - size_t r = s->Receive (buf, 8192, 30); // 30 seconds - if (!r && s->IsEstablished ()) // nothing received but connection is established - r = s->Receive (buf, 8192, 30); // wait for another 30 secondd + size_t r = m_Stream->Receive (buf, 8192, 30); // 30 seconds + if (!r && m_Stream->IsEstablished ()) // nothing received but connection is established + r = m_Stream->Receive (buf, 8192, 30); // wait for another 30 secondd if (r) // we recieved data { ss << std::string ((char *)buf, r); - while (s->IsOpen () && (r = s->Receive (buf, 8192, 30)) > 0) + while (m_Stream->IsOpen () && (r = m_Stream->Receive (buf, 8192, 30)) > 0) ss << std::string ((char *)buf,r); m_Reply.content = ss.str (); // send "as is" m_Reply.headers.resize(0); // no headers boost::asio::async_write (*m_Socket, m_Reply.to_buffers(), - boost::bind (&HTTPConnection::HandleWrite, this, - boost::asio::placeholders::error, true)); + boost::bind (&HTTPConnection::HandleWriteReply, this, boost::asio::placeholders::error)); return; } else // nothing received ss << "Not responding"; - s->Close (); - DeleteStream (s); - SendReply (ss.str ()); } } + void HTTPConnection::AsyncStreamReceive () + { + if (m_Stream) + m_Stream->AsyncReceive (boost::asio::buffer (m_StreamBuffer, 8192), + boost::protect (boost::bind (&HTTPConnection::HandleStreamReceive, this, + boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)), + 30); // 30 seconds timeout + } + void HTTPConnection::HandleStreamReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred) { if (bytes_transferred) { + boost::asio::async_write (*m_Socket, boost::asio::buffer (m_StreamBuffer, bytes_transferred), + boost::bind (&HTTPConnection::HandleWrite, this, boost::asio::placeholders::error)); } else { - SendReply ("Not responding"); + if (m_Stream && m_Stream->IsOpen ()) + SendReply ("Not responding"); + else + Terminate (); } } @@ -285,8 +307,8 @@ namespace util m_Reply.headers[1].value = "text/html"; boost::asio::async_write (*m_Socket, m_Reply.to_buffers(), - boost::bind (&HTTPConnection::HandleWrite, this, - boost::asio::placeholders::error, true)); + boost::bind (&HTTPConnection::HandleWriteReply, this, + boost::asio::placeholders::error)); } HTTPServer::HTTPServer (int port): diff --git a/HTTPServer.h b/HTTPServer.h index f964cae3..3fed5bd0 100644 --- a/HTTPServer.h +++ b/HTTPServer.h @@ -5,6 +5,7 @@ #include #include #include +#include "Streaming.h" namespace i2p { @@ -37,7 +38,7 @@ namespace util public: - HTTPConnection (boost::asio::ip::tcp::socket * socket): m_Socket (socket) { Receive (); }; + HTTPConnection (boost::asio::ip::tcp::socket * socket): m_Socket (socket), m_Stream (nullptr) { Receive (); }; ~HTTPConnection () { delete m_Socket; } private: @@ -45,8 +46,10 @@ namespace util void Terminate (); void Receive (); void HandleReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred); + void AsyncStreamReceive (); void HandleStreamReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred); - void HandleWrite(const boost::system::error_code& ecode, bool terminate); + void HandleWriteReply(const boost::system::error_code& ecode); + void HandleWrite (const boost::system::error_code& ecode); void SendReply (const std::string& content); void HandleRequest (); @@ -57,7 +60,8 @@ namespace util private: boost::asio::ip::tcp::socket * m_Socket; - char m_Buffer[8192]; + i2p::stream::Stream * m_Stream; + char m_Buffer[8192], m_StreamBuffer[8192]; request m_Request; reply m_Reply; }; From 8728aa28403efefe5ad469e7a5591e5787c1da81 Mon Sep 17 00:00:00 2001 From: orignal Date: Thu, 27 Mar 2014 15:56:13 -0400 Subject: [PATCH 3/4] check buffer first for async receive --- Streaming.h | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/Streaming.h b/Streaming.h index 8de2d241..b25c70a0 100644 --- a/Streaming.h +++ b/Streaming.h @@ -200,6 +200,16 @@ namespace stream template void Stream::AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout) { + if (!m_ReceiveQueue.IsEmpty ()) + { + size_t received = ConcatenatePackets (boost::asio::buffer_cast(buffer), boost::asio::buffer_size(buffer)); + if (received) + { + // TODO: post to stream's thread + handler (boost::system::error_code (), received); + return; + } + } m_ReceiveTimer.expires_from_now (boost::posix_time::seconds(timeout)); m_ReceiveTimer.async_wait (boost::bind (&Stream::HandleReceiveTimer, this, boost::asio::placeholders::error, buffer, handler)); From 086b0d5418673111b8b0ca5951846ac803b518a9 Mon Sep 17 00:00:00 2001 From: orignal Date: Sat, 29 Mar 2014 08:11:00 -0400 Subject: [PATCH 4/4] switch to AsyncReceive --- HTTPServer.cpp | 24 +++--------------------- Streaming.cpp | 3 +++ 2 files changed, 6 insertions(+), 21 deletions(-) diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 00cac390..d2061164 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -226,6 +226,7 @@ namespace util if (i2p::data::Base32ToByteStream (address.c_str (), address.length (), (uint8_t *)destination, 32) != 32) { LogPrint ("Invalid Base32 address ", address); + SendReply ("Invalid Base32 address"); return; } fullAddress = address + ".b32.i2p"; @@ -249,26 +250,7 @@ namespace util { std::string request = "GET " + uri + " HTTP/1.1\n Host:" + fullAddress + "\n"; m_Stream->Send ((uint8_t *)request.c_str (), request.length (), 10); - std::stringstream ss; - uint8_t buf[8192]; - size_t r = m_Stream->Receive (buf, 8192, 30); // 30 seconds - if (!r && m_Stream->IsEstablished ()) // nothing received but connection is established - r = m_Stream->Receive (buf, 8192, 30); // wait for another 30 secondd - if (r) // we recieved data - { - ss << std::string ((char *)buf, r); - while (m_Stream->IsOpen () && (r = m_Stream->Receive (buf, 8192, 30)) > 0) - ss << std::string ((char *)buf,r); - - m_Reply.content = ss.str (); // send "as is" - m_Reply.headers.resize(0); // no headers - boost::asio::async_write (*m_Socket, m_Reply.to_buffers(), - boost::bind (&HTTPConnection::HandleWriteReply, this, boost::asio::placeholders::error)); - return; - } - else // nothing received - ss << "Not responding"; - SendReply (ss.str ()); + AsyncStreamReceive (); } } @@ -278,7 +260,7 @@ namespace util m_Stream->AsyncReceive (boost::asio::buffer (m_StreamBuffer, 8192), boost::protect (boost::bind (&HTTPConnection::HandleStreamReceive, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)), - 30); // 30 seconds timeout + 45); // 45 seconds timeout } void HTTPConnection::HandleStreamReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred) diff --git a/Streaming.cpp b/Streaming.cpp index 7af4d291..bf8eacd3 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -115,7 +115,10 @@ namespace stream packet->offset = packet->GetPayload () - packet->buf; if (packet->GetLength () > 0) + { m_ReceiveQueue.Put (packet); + m_ReceiveTimer.cancel (); + } else delete packet;