From a0780ba08ac456f8bcfb69e288e0a064b79b5423 Mon Sep 17 00:00:00 2001 From: Giel van Schijndel Date: Wed, 10 Aug 2011 15:07:46 +0200 Subject: [PATCH] Generalise RPC connection handling code to allow more listening sockets Using this modification it should be relatively easy to, at a later time, listen on multiple addresses (even Unix domain sockets should be possible). Signed-off-by: Giel van Schijndel --- src/bitcoinrpc.cpp | 140 +++++++++++++++++++++++++++++++-------------- 1 file changed, 97 insertions(+), 43 deletions(-) diff --git a/src/bitcoinrpc.cpp b/src/bitcoinrpc.cpp index 08425b40..8a278070 100644 --- a/src/bitcoinrpc.cpp +++ b/src/bitcoinrpc.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -24,7 +25,7 @@ #include #include #include -typedef boost::asio::ssl::stream SSLStream; +#include #define printf OutputDebugStringF // MinGW 3.4.5 gets "fatal error: had to relocate PCH" if the json headers are @@ -2571,9 +2572,10 @@ bool ClientAllowed(const boost::asio::ip::address& address) // // IOStream device that speaks SSL but can also speak non-SSL // +template class SSLIOStreamDevice : public iostreams::device { public: - SSLIOStreamDevice(SSLStream &streamIn, bool fUseSSLIn) : stream(streamIn) + SSLIOStreamDevice(asio::ssl::stream &streamIn, bool fUseSSLIn) : stream(streamIn) { fUseSSL = fUseSSLIn; fNeedHandshake = fUseSSLIn; @@ -2617,21 +2619,54 @@ public: private: bool fNeedHandshake; bool fUseSSL; - SSLStream& stream; + asio::ssl::stream& stream; }; class AcceptedConnection { - public: - SSLStream sslStream; - SSLIOStreamDevice d; - iostreams::stream stream; +public: + virtual ~AcceptedConnection() {} + + virtual std::iostream& stream() = 0; + virtual std::string peer_address_to_string() const = 0; + virtual void close() = 0; +}; - ip::tcp::endpoint peer; +template +class AcceptedConnectionImpl : public AcceptedConnection +{ +public: + AcceptedConnectionImpl( + asio::io_service& io_service, + ssl::context &context, + bool fUseSSL) : + sslStream(io_service, context), + _d(sslStream, fUseSSL), + _stream(_d) + { + } + + virtual std::iostream& stream() + { + return _stream; + } + + virtual std::string peer_address_to_string() const + { + return peer.address().to_string(); + } - AcceptedConnection(asio::io_service &io_service, ssl::context &context, - bool fUseSSL) : sslStream(io_service, context), d(sslStream, fUseSSL), - stream(d) { ; } + virtual void close() + { + _stream.close(); + } + + typename Protocol::endpoint peer; + asio::ssl::stream sslStream; + +private: + SSLIOStreamDevice _d; + iostreams::stream< SSLIOStreamDevice > _stream; }; void ThreadRPCServer(void* parg) @@ -2654,7 +2689,8 @@ void ThreadRPCServer(void* parg) } // Forward declaration required for RPCListen -static void RPCAcceptHandler(boost::shared_ptr acceptor, +template +static void RPCAcceptHandler(boost::shared_ptr< basic_socket_acceptor > acceptor, ssl::context& context, bool fUseSSL, AcceptedConnection* conn, @@ -2663,18 +2699,19 @@ static void RPCAcceptHandler(boost::shared_ptr acceptor, /** * Sets up I/O resources to accept and handle a new connection. */ -static void RPCListen(boost::shared_ptr acceptor, +template +static void RPCListen(boost::shared_ptr< basic_socket_acceptor > acceptor, ssl::context& context, const bool fUseSSL) { // Accept connection - AcceptedConnection* conn = new AcceptedConnection(acceptor->get_io_service(), context, fUseSSL); + AcceptedConnectionImpl* conn = new AcceptedConnectionImpl(acceptor->get_io_service(), context, fUseSSL); acceptor->async_accept( conn->sslStream.lowest_layer(), conn->peer, - boost::bind(&RPCAcceptHandler, + boost::bind(&RPCAcceptHandler, acceptor, boost::ref(context), fUseSSL, @@ -2685,7 +2722,8 @@ static void RPCListen(boost::shared_ptr acceptor, /** * Accept and handle incoming connection. */ -static void RPCAcceptHandler(boost::shared_ptr acceptor, +template +static void RPCAcceptHandler(boost::shared_ptr< basic_socket_acceptor > acceptor, ssl::context& context, const bool fUseSSL, AcceptedConnection* conn, @@ -2696,6 +2734,8 @@ static void RPCAcceptHandler(boost::shared_ptr acceptor, // Immediately start accepting new connections RPCListen(acceptor, context, fUseSSL); + AcceptedConnectionImpl* tcp_conn = dynamic_cast< AcceptedConnectionImpl* >(conn); + // TODO: Actually handle errors if (error) { @@ -2705,11 +2745,12 @@ static void RPCAcceptHandler(boost::shared_ptr acceptor, // Restrict callers by IP. It is important to // do this before starting client thread, to filter out // certain DoS and misbehaving clients. - else if (!ClientAllowed(conn->peer.address())) + else if (tcp_conn + && !ClientAllowed(tcp_conn->peer.address())) { // Only send a 403 if we're not using SSL to prevent a DoS during the SSL handshake. if (!fUseSSL) - conn->stream << HTTPReply(403, "", false) << std::flush; + conn->stream() << HTTPReply(403, "", false) << std::flush; delete conn; } @@ -2778,21 +2819,21 @@ void ThreadRPCServer2(void* parg) asio::ip::address bindAddress = loopback ? asio::ip::address_v6::loopback() : asio::ip::address_v6::any(); ip::tcp::endpoint endpoint(bindAddress, GetArg("-rpcport", 8332)); - boost::shared_ptr acceptor, acceptor4; + std::list< boost::shared_ptr > acceptors; try { - acceptor.reset(new ip::tcp::acceptor(io_service)); - acceptor->open(endpoint.protocol()); - acceptor->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); + acceptors.push_back(boost::shared_ptr(new ip::tcp::acceptor(io_service))); + acceptors.back()->open(endpoint.protocol()); + acceptors.back()->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); // Try making the socket dual IPv6/IPv4 (if listening on the "any" address) boost::system::error_code v6_only_error; - acceptor->set_option(boost::asio::ip::v6_only(loopback), v6_only_error); + acceptors.back()->set_option(boost::asio::ip::v6_only(loopback), v6_only_error); - acceptor->bind(endpoint); - acceptor->listen(socket_base::max_connections); + acceptors.back()->bind(endpoint); + acceptors.back()->listen(socket_base::max_connections); - RPCListen(acceptor, context, fUseSSL); + RPCListen(acceptors.back(), context, fUseSSL); // If dual IPv6/IPv4 failed (or we're opening loopback interfaces only), open IPv4 separately if (loopback || v6_only_error) @@ -2800,13 +2841,13 @@ void ThreadRPCServer2(void* parg) bindAddress = loopback ? asio::ip::address_v4::loopback() : asio::ip::address_v4::any(); endpoint.address(bindAddress); - acceptor4.reset(new ip::tcp::acceptor(io_service)); - acceptor4->open(endpoint.protocol()); - acceptor4->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); - acceptor4->bind(endpoint); - acceptor4->listen(socket_base::max_connections); + acceptors.push_back(boost::shared_ptr(new ip::tcp::acceptor(io_service))); + acceptors.back()->open(endpoint.protocol()); + acceptors.back()->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); + acceptors.back()->bind(endpoint); + acceptors.back()->listen(socket_base::max_connections); - RPCListen(acceptor4, context, fUseSSL); + RPCListen(acceptors.back(), context, fUseSSL); } } catch(boost::system::system_error &e) @@ -2821,6 +2862,19 @@ void ThreadRPCServer2(void* parg) while (!fShutdown) io_service.run_one(); vnThreadsRunning[THREAD_RPCLISTENER]++; + + // Terminate all outstanding accept-requests + BOOST_FOREACH(boost::shared_ptr& acceptor, acceptors) + { + acceptor->cancel(); + acceptor->close(); + } + acceptors.clear(); + + // Handle any actions that are still in progress. + vnThreadsRunning[THREAD_RPCLISTENER]--; + io_service.run(); + vnThreadsRunning[THREAD_RPCLISTENER]++; } void ThreadRPCServer3(void* parg) @@ -2833,7 +2887,7 @@ void ThreadRPCServer3(void* parg) loop { if (fShutdown || !fRun) { - conn->stream.close(); + conn->close(); delete conn; --vnThreadsRunning[THREAD_RPCHANDLER]; return; @@ -2841,24 +2895,24 @@ void ThreadRPCServer3(void* parg) map mapHeaders; string strRequest; - ReadHTTP(conn->stream, mapHeaders, strRequest); + ReadHTTP(conn->stream(), mapHeaders, strRequest); // Check authorization if (mapHeaders.count("authorization") == 0) { - conn->stream << HTTPReply(401, "", false) << std::flush; + conn->stream() << HTTPReply(401, "", false) << std::flush; break; } if (!HTTPAuthorized(mapHeaders)) { - printf("ThreadRPCServer incorrect password attempt from %s\n", conn->peer.address().to_string().c_str()); + printf("ThreadRPCServer incorrect password attempt from %s\n", conn->peer_address_to_string().c_str()); /* Deter brute-forcing short passwords. If this results in a DOS the user really shouldn't have their RPC port exposed.*/ if (mapArgs["-rpcpassword"].size() < 20) Sleep(250); - conn->stream << HTTPReply(401, "", false) << std::flush; + conn->stream() << HTTPReply(401, "", false) << std::flush; break; } if (mapHeaders["connection"] == "close") @@ -2900,16 +2954,16 @@ void ThreadRPCServer3(void* parg) // Send reply string strReply = JSONRPCReply(result, Value::null, id); - conn->stream << HTTPReply(200, strReply, fRun) << std::flush; + conn->stream() << HTTPReply(200, strReply, fRun) << std::flush; } catch (Object& objError) { - ErrorReply(conn->stream, objError, id); + ErrorReply(conn->stream(), objError, id); break; } catch (std::exception& e) { - ErrorReply(conn->stream, JSONRPCError(-32700, e.what()), id); + ErrorReply(conn->stream(), JSONRPCError(-32700, e.what()), id); break; } } @@ -2961,9 +3015,9 @@ Object CallRPC(const string& strMethod, const Array& params) asio::io_service io_service; ssl::context context(io_service, ssl::context::sslv23); context.set_options(ssl::context::no_sslv2); - SSLStream sslStream(io_service, context); - SSLIOStreamDevice d(sslStream, fUseSSL); - iostreams::stream stream(d); + asio::ssl::stream sslStream(io_service, context); + SSLIOStreamDevice d(sslStream, fUseSSL); + iostreams::stream< SSLIOStreamDevice > stream(d); if (!d.connect(GetArg("-rpcconnect", "127.0.0.1"), GetArg("-rpcport", "8332"))) throw runtime_error("couldn't connect to server");