Browse Source

Merge #8421: httpserver: drop boost (#8023 dependency)

7e87033 httpserver: replace boost threads with std (Cory Fields)
d3773ca httpserver: explicitly detach worker threads (Cory Fields)
755aa05 httpserver: use a future rather than relying on boost's try_join_for (Cory Fields)
0.14
Wladimir J. van der Laan 9 years ago
parent
commit
b77bb95b3c
No known key found for this signature in database
GPG Key ID: 74810B012346C9A6
  1. 58
      src/httpserver.cpp
  2. 10
      src/httpserver.h

58
src/httpserver.cpp

@ -19,6 +19,7 @@
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <signal.h> #include <signal.h>
#include <future>
#include <event2/event.h> #include <event2/event.h>
#include <event2/http.h> #include <event2/http.h>
@ -34,9 +35,6 @@
#endif #endif
#endif #endif
#include <boost/algorithm/string/case_conv.hpp> // for to_lower()
#include <boost/foreach.hpp>
/** Maximum size of http request (request line + headers) */ /** Maximum size of http request (request line + headers) */
static const size_t MAX_HEADERS_SIZE = 8192; static const size_t MAX_HEADERS_SIZE = 8192;
@ -68,8 +66,8 @@ class WorkQueue
{ {
private: private:
/** Mutex protects entire object */ /** Mutex protects entire object */
CWaitableCriticalSection cs; std::mutex cs;
CConditionVariable cond; std::condition_variable cond;
std::deque<std::unique_ptr<WorkItem>> queue; std::deque<std::unique_ptr<WorkItem>> queue;
bool running; bool running;
size_t maxDepth; size_t maxDepth;
@ -82,12 +80,12 @@ private:
WorkQueue &wq; WorkQueue &wq;
ThreadCounter(WorkQueue &w): wq(w) ThreadCounter(WorkQueue &w): wq(w)
{ {
boost::lock_guard<boost::mutex> lock(wq.cs); std::lock_guard<std::mutex> lock(wq.cs);
wq.numThreads += 1; wq.numThreads += 1;
} }
~ThreadCounter() ~ThreadCounter()
{ {
boost::lock_guard<boost::mutex> lock(wq.cs); std::lock_guard<std::mutex> lock(wq.cs);
wq.numThreads -= 1; wq.numThreads -= 1;
wq.cond.notify_all(); wq.cond.notify_all();
} }
@ -108,7 +106,7 @@ public:
/** Enqueue a work item */ /** Enqueue a work item */
bool Enqueue(WorkItem* item) bool Enqueue(WorkItem* item)
{ {
boost::unique_lock<boost::mutex> lock(cs); std::unique_lock<std::mutex> lock(cs);
if (queue.size() >= maxDepth) { if (queue.size() >= maxDepth) {
return false; return false;
} }
@ -123,7 +121,7 @@ public:
while (running) { while (running) {
std::unique_ptr<WorkItem> i; std::unique_ptr<WorkItem> i;
{ {
boost::unique_lock<boost::mutex> lock(cs); std::unique_lock<std::mutex> lock(cs);
while (running && queue.empty()) while (running && queue.empty())
cond.wait(lock); cond.wait(lock);
if (!running) if (!running)
@ -137,14 +135,14 @@ public:
/** Interrupt and exit loops */ /** Interrupt and exit loops */
void Interrupt() void Interrupt()
{ {
boost::unique_lock<boost::mutex> lock(cs); std::unique_lock<std::mutex> lock(cs);
running = false; running = false;
cond.notify_all(); cond.notify_all();
} }
/** Wait for worker threads to exit */ /** Wait for worker threads to exit */
void WaitExit() void WaitExit()
{ {
boost::unique_lock<boost::mutex> lock(cs); std::unique_lock<std::mutex> lock(cs);
while (numThreads > 0) while (numThreads > 0)
cond.wait(lock); cond.wait(lock);
} }
@ -152,7 +150,7 @@ public:
/** Return current depth of queue */ /** Return current depth of queue */
size_t Depth() size_t Depth()
{ {
boost::unique_lock<boost::mutex> lock(cs); std::unique_lock<std::mutex> lock(cs);
return queue.size(); return queue.size();
} }
}; };
@ -189,7 +187,7 @@ static bool ClientAllowed(const CNetAddr& netaddr)
{ {
if (!netaddr.IsValid()) if (!netaddr.IsValid())
return false; return false;
BOOST_FOREACH (const CSubNet& subnet, rpc_allow_subnets) for(const CSubNet& subnet : rpc_allow_subnets)
if (subnet.Match(netaddr)) if (subnet.Match(netaddr))
return true; return true;
return false; return false;
@ -203,7 +201,7 @@ static bool InitHTTPAllowList()
rpc_allow_subnets.push_back(CSubNet("::1")); // always allow IPv6 localhost rpc_allow_subnets.push_back(CSubNet("::1")); // always allow IPv6 localhost
if (mapMultiArgs.count("-rpcallowip")) { if (mapMultiArgs.count("-rpcallowip")) {
const std::vector<std::string>& vAllow = mapMultiArgs["-rpcallowip"]; const std::vector<std::string>& vAllow = mapMultiArgs["-rpcallowip"];
BOOST_FOREACH (std::string strAllow, vAllow) { for (std::string strAllow : vAllow) {
CSubNet subnet(strAllow); CSubNet subnet(strAllow);
if (!subnet.IsValid()) { if (!subnet.IsValid()) {
uiInterface.ThreadSafeMessageBox( uiInterface.ThreadSafeMessageBox(
@ -215,7 +213,7 @@ static bool InitHTTPAllowList()
} }
} }
std::string strAllowed; std::string strAllowed;
BOOST_FOREACH (const CSubNet& subnet, rpc_allow_subnets) for (const CSubNet& subnet : rpc_allow_subnets)
strAllowed += subnet.ToString() + " "; strAllowed += subnet.ToString() + " ";
LogPrint("http", "Allowing HTTP connections from: %s\n", strAllowed); LogPrint("http", "Allowing HTTP connections from: %s\n", strAllowed);
return true; return true;
@ -302,13 +300,14 @@ static void http_reject_request_cb(struct evhttp_request* req, void*)
} }
/** Event dispatcher thread */ /** Event dispatcher thread */
static void ThreadHTTP(struct event_base* base, struct evhttp* http) static bool ThreadHTTP(struct event_base* base, struct evhttp* http)
{ {
RenameThread("bitcoin-http"); RenameThread("bitcoin-http");
LogPrint("http", "Entering http event loop\n"); LogPrint("http", "Entering http event loop\n");
event_base_dispatch(base); event_base_dispatch(base);
// Event loop will be interrupted by InterruptHTTPServer() // Event loop will be interrupted by InterruptHTTPServer()
LogPrint("http", "Exited http event loop\n"); LogPrint("http", "Exited http event loop\n");
return event_base_got_break(base) == 0;
} }
/** Bind HTTP server to specified addresses */ /** Bind HTTP server to specified addresses */
@ -437,17 +436,22 @@ bool InitHTTPServer()
return true; return true;
} }
boost::thread threadHTTP; std::thread threadHTTP;
std::future<bool> threadResult;
bool StartHTTPServer() bool StartHTTPServer()
{ {
LogPrint("http", "Starting HTTP server\n"); LogPrint("http", "Starting HTTP server\n");
int rpcThreads = std::max((long)GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L); int rpcThreads = std::max((long)GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);
LogPrintf("HTTP: starting %d worker threads\n", rpcThreads); LogPrintf("HTTP: starting %d worker threads\n", rpcThreads);
threadHTTP = boost::thread(boost::bind(&ThreadHTTP, eventBase, eventHTTP)); std::packaged_task<bool(event_base*, evhttp*)> task(ThreadHTTP);
threadResult = task.get_future();
threadHTTP = std::thread(std::move(task), eventBase, eventHTTP);
for (int i = 0; i < rpcThreads; i++) for (int i = 0; i < rpcThreads; i++) {
boost::thread(boost::bind(&HTTPWorkQueueRun, workQueue)); std::thread rpc_worker(HTTPWorkQueueRun, workQueue);
rpc_worker.detach();
}
return true; return true;
} }
@ -456,7 +460,7 @@ void InterruptHTTPServer()
LogPrint("http", "Interrupting HTTP server\n"); LogPrint("http", "Interrupting HTTP server\n");
if (eventHTTP) { if (eventHTTP) {
// Unlisten sockets // Unlisten sockets
BOOST_FOREACH (evhttp_bound_socket *socket, boundSockets) { for (evhttp_bound_socket *socket : boundSockets) {
evhttp_del_accept_socket(eventHTTP, socket); evhttp_del_accept_socket(eventHTTP, socket);
} }
// Reject requests on current connections // Reject requests on current connections
@ -482,15 +486,11 @@ void StopHTTPServer()
// master that appears to be solved, so in the future that solution // master that appears to be solved, so in the future that solution
// could be used again (if desirable). // could be used again (if desirable).
// (see discussion in https://github.com/bitcoin/bitcoin/pull/6990) // (see discussion in https://github.com/bitcoin/bitcoin/pull/6990)
#if BOOST_VERSION >= 105000 if (threadResult.valid() && threadResult.wait_for(std::chrono::milliseconds(2000)) == std::future_status::timeout) {
if (!threadHTTP.try_join_for(boost::chrono::milliseconds(2000))) {
#else
if (!threadHTTP.timed_join(boost::posix_time::milliseconds(2000))) {
#endif
LogPrintf("HTTP event loop did not exit within allotted time, sending loopbreak\n"); LogPrintf("HTTP event loop did not exit within allotted time, sending loopbreak\n");
event_base_loopbreak(eventBase); event_base_loopbreak(eventBase);
threadHTTP.join();
} }
threadHTTP.join();
} }
if (eventHTTP) { if (eventHTTP) {
evhttp_free(eventHTTP); evhttp_free(eventHTTP);
@ -517,7 +517,7 @@ static void httpevent_callback_fn(evutil_socket_t, short, void* data)
delete self; delete self;
} }
HTTPEvent::HTTPEvent(struct event_base* base, bool deleteWhenTriggered, const boost::function<void(void)>& handler): HTTPEvent::HTTPEvent(struct event_base* base, bool deleteWhenTriggered, const std::function<void(void)>& handler):
deleteWhenTriggered(deleteWhenTriggered), handler(handler) deleteWhenTriggered(deleteWhenTriggered), handler(handler)
{ {
ev = event_new(base, -1, 0, httpevent_callback_fn, this); ev = event_new(base, -1, 0, httpevent_callback_fn, this);
@ -599,7 +599,7 @@ void HTTPRequest::WriteReply(int nStatus, const std::string& strReply)
assert(evb); assert(evb);
evbuffer_add(evb, strReply.data(), strReply.size()); evbuffer_add(evb, strReply.data(), strReply.size());
HTTPEvent* ev = new HTTPEvent(eventBase, true, HTTPEvent* ev = new HTTPEvent(eventBase, true,
boost::bind(evhttp_send_reply, req, nStatus, (const char*)NULL, (struct evbuffer *)NULL)); std::bind(evhttp_send_reply, req, nStatus, (const char*)NULL, (struct evbuffer *)NULL));
ev->trigger(0); ev->trigger(0);
replySent = true; replySent = true;
req = 0; // transferred back to main thread req = 0; // transferred back to main thread

10
src/httpserver.h

@ -7,9 +7,7 @@
#include <string> #include <string>
#include <stdint.h> #include <stdint.h>
#include <boost/thread.hpp> #include <functional>
#include <boost/scoped_ptr.hpp>
#include <boost/function.hpp>
static const int DEFAULT_HTTP_THREADS=4; static const int DEFAULT_HTTP_THREADS=4;
static const int DEFAULT_HTTP_WORKQUEUE=16; static const int DEFAULT_HTTP_WORKQUEUE=16;
@ -35,7 +33,7 @@ void InterruptHTTPServer();
void StopHTTPServer(); void StopHTTPServer();
/** Handler for requests to a certain HTTP path */ /** Handler for requests to a certain HTTP path */
typedef boost::function<void(HTTPRequest* req, const std::string &)> HTTPRequestHandler; typedef std::function<void(HTTPRequest* req, const std::string &)> HTTPRequestHandler;
/** Register handler for prefix. /** Register handler for prefix.
* If multiple handlers match a prefix, the first-registered one will * If multiple handlers match a prefix, the first-registered one will
* be invoked. * be invoked.
@ -132,7 +130,7 @@ public:
* deleteWhenTriggered deletes this event object after the event is triggered (and the handler called) * deleteWhenTriggered deletes this event object after the event is triggered (and the handler called)
* handler is the handler to call when the event is triggered. * handler is the handler to call when the event is triggered.
*/ */
HTTPEvent(struct event_base* base, bool deleteWhenTriggered, const boost::function<void(void)>& handler); HTTPEvent(struct event_base* base, bool deleteWhenTriggered, const std::function<void(void)>& handler);
~HTTPEvent(); ~HTTPEvent();
/** Trigger the event. If tv is 0, trigger it immediately. Otherwise trigger it after /** Trigger the event. If tv is 0, trigger it immediately. Otherwise trigger it after
@ -141,7 +139,7 @@ public:
void trigger(struct timeval* tv); void trigger(struct timeval* tv);
bool deleteWhenTriggered; bool deleteWhenTriggered;
boost::function<void(void)> handler; std::function<void(void)> handler;
private: private:
struct event* ev; struct event* ev;
}; };

Loading…
Cancel
Save