mirror of
https://github.com/twisterarmy/twister-core.git
synced 2025-01-25 22:14:15 +00:00
Support multi-threaded JSON-RPC
Change internal HTTP JSON-RPC server from single-threaded to thread-per-connection model. The IP filter list is applied prior to starting the thread, which then processes the RPC. A mutex covers the entire RPC operation, because not all RPC operations are thread-safe. [minor modifications by jgarzik, to make change upstream-ready]
This commit is contained in:
parent
203f9e6c00
commit
e9205293bd
@ -46,6 +46,8 @@ extern Value importprivkey(const Array& params, bool fHelp);
|
||||
|
||||
const Object emptyobj;
|
||||
|
||||
void ThreadRPCServer3(void* parg);
|
||||
|
||||
Object JSONRPCError(int code, const string& message)
|
||||
{
|
||||
Object error;
|
||||
@ -2021,7 +2023,7 @@ Value getwork(const Array& params, bool fHelp)
|
||||
throw JSONRPCError(-10, "Bitcoin is downloading blocks...");
|
||||
|
||||
typedef map<uint256, pair<CBlock*, CScript> > mapNewBlock_t;
|
||||
static mapNewBlock_t mapNewBlock;
|
||||
static mapNewBlock_t mapNewBlock; // FIXME: thread safety
|
||||
static vector<CBlock*> vNewBlock;
|
||||
static CReserveKey reservekey(pwalletMain);
|
||||
|
||||
@ -2573,20 +2575,34 @@ private:
|
||||
SSLStream& stream;
|
||||
};
|
||||
|
||||
class AcceptedConnection
|
||||
{
|
||||
public:
|
||||
SSLStream sslStream;
|
||||
SSLIOStreamDevice d;
|
||||
iostreams::stream<SSLIOStreamDevice> stream;
|
||||
|
||||
ip::tcp::endpoint peer;
|
||||
|
||||
AcceptedConnection(asio::io_service &io_service, ssl::context &context,
|
||||
bool fUseSSL) : sslStream(io_service, context), d(sslStream, fUseSSL),
|
||||
stream(d) { ; }
|
||||
};
|
||||
|
||||
void ThreadRPCServer(void* parg)
|
||||
{
|
||||
IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer(parg));
|
||||
try
|
||||
{
|
||||
vnThreadsRunning[THREAD_RPCSERVER]++;
|
||||
vnThreadsRunning[THREAD_RPCLISTENER]++;
|
||||
ThreadRPCServer2(parg);
|
||||
vnThreadsRunning[THREAD_RPCSERVER]--;
|
||||
vnThreadsRunning[THREAD_RPCLISTENER]--;
|
||||
}
|
||||
catch (std::exception& e) {
|
||||
vnThreadsRunning[THREAD_RPCSERVER]--;
|
||||
vnThreadsRunning[THREAD_RPCLISTENER]--;
|
||||
PrintException(&e, "ThreadRPCServer()");
|
||||
} catch (...) {
|
||||
vnThreadsRunning[THREAD_RPCSERVER]--;
|
||||
vnThreadsRunning[THREAD_RPCLISTENER]--;
|
||||
PrintException(NULL, "ThreadRPCServer()");
|
||||
}
|
||||
printf("ThreadRPCServer exiting\n");
|
||||
@ -2664,54 +2680,67 @@ void ThreadRPCServer2(void* parg)
|
||||
loop
|
||||
{
|
||||
// Accept connection
|
||||
SSLStream sslStream(io_service, context);
|
||||
SSLIOStreamDevice d(sslStream, fUseSSL);
|
||||
iostreams::stream<SSLIOStreamDevice> stream(d);
|
||||
AcceptedConnection *conn =
|
||||
new AcceptedConnection(io_service, context, fUseSSL);
|
||||
|
||||
vnThreadsRunning[THREAD_RPCLISTENER]--;
|
||||
acceptor.accept(conn->sslStream.lowest_layer(), conn->peer);
|
||||
vnThreadsRunning[THREAD_RPCLISTENER]++;
|
||||
|
||||
ip::tcp::endpoint peer;
|
||||
vnThreadsRunning[THREAD_RPCSERVER]--;
|
||||
acceptor.accept(sslStream.lowest_layer(), peer);
|
||||
vnThreadsRunning[4]++;
|
||||
if (fShutdown)
|
||||
{
|
||||
delete conn;
|
||||
return;
|
||||
}
|
||||
|
||||
// Restrict callers by IP
|
||||
if (!ClientAllowed(peer.address().to_string()))
|
||||
// Restrict callers by IP. It is important to
|
||||
// do this before starting client thread, to filter out
|
||||
// certain DoS and misbehaving clients.
|
||||
if (!ClientAllowed(conn->peer.address().to_string()))
|
||||
{
|
||||
// Only send a 403 if we're not using SSL to prevent a DoS during the SSL handshake.
|
||||
if (!fUseSSL)
|
||||
stream << HTTPReply(403, "") << std::flush;
|
||||
continue;
|
||||
conn->stream << HTTPReply(403, "") << std::flush;
|
||||
delete conn;
|
||||
}
|
||||
|
||||
// start HTTP client thread
|
||||
else if (!CreateThread(ThreadRPCServer3, conn)) {
|
||||
printf("Failed to create RPC server client thread\n");
|
||||
delete conn;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ThreadRPCServer3(void* parg)
|
||||
{
|
||||
IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer3(parg));
|
||||
vnThreadsRunning[THREAD_RPCHANDLER]++;
|
||||
AcceptedConnection *conn = (AcceptedConnection *) parg;
|
||||
|
||||
do {
|
||||
map<string, string> mapHeaders;
|
||||
string strRequest;
|
||||
|
||||
boost::thread api_caller(ReadHTTP, boost::ref(stream), boost::ref(mapHeaders), boost::ref(strRequest));
|
||||
if (!api_caller.timed_join(boost::posix_time::seconds(GetArg("-rpctimeout", 30))))
|
||||
{ // Timed out:
|
||||
acceptor.cancel();
|
||||
printf("ThreadRPCServer ReadHTTP timeout\n");
|
||||
continue;
|
||||
}
|
||||
ReadHTTP(conn->stream, mapHeaders, strRequest);
|
||||
|
||||
// Check authorization
|
||||
if (mapHeaders.count("authorization") == 0)
|
||||
{
|
||||
stream << HTTPReply(401, "") << std::flush;
|
||||
continue;
|
||||
conn->stream << HTTPReply(401, "") << std::flush;
|
||||
break;
|
||||
}
|
||||
if (!HTTPAuthorized(mapHeaders))
|
||||
{
|
||||
printf("ThreadRPCServer incorrect password attempt from %s\n",peer.address().to_string().c_str());
|
||||
printf("ThreadRPCServer incorrect password attempt from %s\n", conn->peer.address().to_string().c_str());
|
||||
/* Deter brute-forcing short passwords.
|
||||
If this results in a DOS the user really
|
||||
shouldn't have their RPC port exposed.*/
|
||||
if (mapArgs["-rpcpassword"].size() < 20)
|
||||
Sleep(250);
|
||||
|
||||
stream << HTTPReply(401, "") << std::flush;
|
||||
continue;
|
||||
conn->stream << HTTPReply(401, "") << std::flush;
|
||||
break;
|
||||
}
|
||||
|
||||
Value id = Value::null;
|
||||
@ -2750,17 +2779,22 @@ void ThreadRPCServer2(void* parg)
|
||||
|
||||
// Send reply
|
||||
string strReply = JSONRPCReply(result, Value::null, id);
|
||||
stream << HTTPReply(200, strReply) << std::flush;
|
||||
conn->stream << HTTPReply(200, strReply) << std::flush;
|
||||
}
|
||||
catch (Object& objError)
|
||||
{
|
||||
ErrorReply(stream, objError, id);
|
||||
ErrorReply(conn->stream, objError, id);
|
||||
break;
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
ErrorReply(stream, JSONRPCError(-32700, e.what()), id);
|
||||
ErrorReply(conn->stream, JSONRPCError(-32700, e.what()), id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
while (0);
|
||||
delete conn;
|
||||
vnThreadsRunning[THREAD_RPCHANDLER]--;
|
||||
}
|
||||
|
||||
json_spirit::Value CRPCTable::execute(const std::string &strMethod, const json_spirit::Array ¶ms) const
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include <string>
|
||||
#include <map>
|
||||
|
||||
#define BOOST_SPIRIT_THREADSAFE
|
||||
#include "json/json_spirit_reader_template.h"
|
||||
#include "json/json_spirit_writer_template.h"
|
||||
#include "json/json_spirit_utils.h"
|
||||
|
@ -1839,12 +1839,13 @@ bool StopNode()
|
||||
if (vnThreadsRunning[THREAD_OPENCONNECTIONS] > 0) printf("ThreadOpenConnections still running\n");
|
||||
if (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0) printf("ThreadMessageHandler still running\n");
|
||||
if (vnThreadsRunning[THREAD_MINER] > 0) printf("ThreadBitcoinMiner still running\n");
|
||||
if (vnThreadsRunning[THREAD_RPCSERVER] > 0) printf("ThreadRPCServer still running\n");
|
||||
if (vnThreadsRunning[THREAD_RPCLISTENER] > 0) printf("ThreadRPCListener still running\n");
|
||||
if (vnThreadsRunning[THREAD_RPCHANDLER] > 0) printf("ThreadsRPCServer still running\n");
|
||||
if (fHaveUPnP && vnThreadsRunning[THREAD_UPNP] > 0) printf("ThreadMapPort still running\n");
|
||||
if (vnThreadsRunning[THREAD_DNSSEED] > 0) printf("ThreadDNSAddressSeed still running\n");
|
||||
if (vnThreadsRunning[THREAD_ADDEDCONNECTIONS] > 0) printf("ThreadOpenAddedConnections still running\n");
|
||||
if (vnThreadsRunning[THREAD_DUMPADDRESS] > 0) printf("ThreadDumpAddresses still running\n");
|
||||
while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCSERVER] > 0)
|
||||
while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCHANDLER] > 0)
|
||||
Sleep(20);
|
||||
Sleep(50);
|
||||
DumpAddresses();
|
||||
|
Loading…
x
Reference in New Issue
Block a user