From 7a87bee999fc82a8f175bf533e59ef216575d541 Mon Sep 17 00:00:00 2001 From: Jeff Garzik Date: Fri, 18 Mar 2011 02:53:13 -0400 Subject: [PATCH] Add long polling support --- cpu-miner.c | 122 ++++++++++++++++++++++++++++++++++++++++++++-------- miner.h | 14 +++++- util.c | 81 +++++++++++++++++++++++++++++++++- 3 files changed, 196 insertions(+), 21 deletions(-) diff --git a/cpu-miner.c b/cpu-miner.c index c7a6672b..b521ea7b 100644 --- a/cpu-miner.c +++ b/cpu-miner.c @@ -31,12 +31,6 @@ #define DEF_RPC_URL "http://127.0.0.1:8332/" #define DEF_RPC_USERPASS "rpcuser:rpcpass" -struct thr_info { - int id; - pthread_t pth; - struct thread_q *q; -}; - enum workio_commands { WC_GET_WORK, WC_SUBMIT_WORK, @@ -78,18 +72,21 @@ static const char *algo_names[] = { bool opt_debug = false; bool opt_protocol = false; +bool want_longpoll = true; +bool have_longpoll = false; static bool opt_quiet = false; static int opt_retries = 10; static int opt_fail_pause = 30; -static int opt_scantime = 5; +int opt_scantime = 5; static json_t *opt_config; static const bool opt_time = true; static enum sha256_algos opt_algo = ALGO_C; static int opt_n_threads = 1; static char *rpc_url; static char *userpass; -static struct thr_info *thr_info; +struct thr_info *thr_info; static int work_thr_id; +int longpoll_thr_id; struct work_restart *work_restart = NULL; @@ -130,6 +127,9 @@ static struct option_help options_help[] = { { "debug", "(-D) Enable debug output (default: off)" }, + { "no-longpoll", + "Disable X-Long-Polling support (default: enabled)" }, + { "protocol-dump", "(-P) Verbose dump of protocol-level activities (default: off)" }, @@ -170,6 +170,7 @@ static struct option options[] = { { "scantime", 1, NULL, 's' }, { "url", 1, NULL, 1001 }, { "userpass", 1, NULL, 1002 }, + { "no-longpoll", 0, NULL, 1003 }, { } }; @@ -262,7 +263,7 @@ static bool submit_upstream_work(CURL *curl, const struct work *work) fprintf(stderr, "DBG: sending RPC call:\n%s", s); /* issue JSON-RPC request */ - val = json_rpc_call(curl, rpc_url, userpass, s); + val = json_rpc_call(curl, rpc_url, userpass, s, false, false); if (!val) { fprintf(stderr, "submit_upstream_work json_rpc_call failed\n"); goto out; @@ -285,14 +286,16 @@ out: return rc; } +static const char *rpc_req = + "{\"method\": \"getwork\", \"params\": [], \"id\":0}\r\n"; + static bool get_upstream_work(CURL *curl, struct work *work) { - static const char *rpc_req = - "{\"method\": \"getwork\", \"params\": [], \"id\":0}\r\n"; json_t *val; bool rc; - val = json_rpc_call(curl, rpc_url, userpass, rpc_req); + val = json_rpc_call(curl, rpc_url, userpass, rpc_req, + want_longpoll, false); if (!val) return false; @@ -593,7 +596,7 @@ out: return NULL; } -void restart_threads(void) +static void restart_threads(void) { int i; @@ -601,6 +604,68 @@ void restart_threads(void) work_restart[i].restart = 1; } +static void *longpoll_thread(void *userdata) +{ + struct thr_info *mythr = userdata; + CURL *curl = NULL; + char *copy_start, *hdr_path, *lp_url = NULL; + bool need_slash = false; + int failures = 0; + + hdr_path = tq_pop(mythr->q, NULL); + if (!hdr_path) + goto out; + copy_start = (*hdr_path == '/') ? (hdr_path + 1) : hdr_path; + if (rpc_url[strlen(rpc_url) - 1] != '/') + need_slash = true; + + lp_url = malloc(strlen(rpc_url) + strlen(copy_start) + 2); + if (!lp_url) + goto out; + + sprintf(lp_url, "%s%s%s", rpc_url, need_slash ? "/" : "", copy_start); + + fprintf(stderr, "Long-polling activated for %s\n", lp_url); + + curl = curl_easy_init(); + if (!curl) { + fprintf(stderr, "CURL initialization failed\n"); + goto out; + } + + while (1) { + json_t *val; + + val = json_rpc_call(curl, lp_url, userpass, rpc_req, + false, true); + if (val) { + failures = 0; + json_decref(val); + fprintf(stderr, "LONGPOLL detected new block\n"); + restart_threads(); + } else { + if (failures++ < 10) { + sleep(30); + fprintf(stderr, + "longpoll failed, sleeping for 30s\n"); + } else { + fprintf(stderr, + "longpoll failed, ending thread\n"); + goto out; + } + } + } + +out: + free(hdr_path); + free(lp_url); + tq_freeze(mythr->q); + if (curl) + curl_easy_cleanup(curl); + + return NULL; +} + static void show_usage(void) { int i; @@ -696,6 +761,9 @@ static void parse_arg (int key, char *arg) free(userpass); userpass = strdup(arg); break; + case 1003: + want_longpoll = false; + break; default: show_usage(); } @@ -763,17 +831,18 @@ int main (int argc, char *argv[]) if (setpriority(PRIO_PROCESS, 0, 19)) perror("setpriority"); - thr_info = calloc(opt_n_threads + 1, sizeof(*thr)); - if (!thr_info) - return 1; - work_restart = calloc(opt_n_threads, sizeof(*work_restart)); if (!work_restart) return 1; + thr_info = calloc(opt_n_threads + 2, sizeof(*thr)); + if (!thr_info) + return 1; + + /* init workio thread info */ work_thr_id = opt_n_threads; thr = &thr_info[work_thr_id]; - thr->id = opt_n_threads; + thr->id = work_thr_id; thr->q = tq_new(); if (!thr->q) return 1; @@ -784,6 +853,23 @@ int main (int argc, char *argv[]) return 1; } + /* init longpoll thread info */ + if (want_longpoll) { + longpoll_thr_id = opt_n_threads + 1; + thr = &thr_info[longpoll_thr_id]; + thr->id = longpoll_thr_id; + thr->q = tq_new(); + if (!thr->q) + return 1; + + /* start longpoll thread */ + if (pthread_create(&thr->pth, NULL, longpoll_thread, thr)) { + fprintf(stderr, "longpoll thread create failed\n"); + return 1; + } + } else + longpoll_thr_id = -1; + /* start mining threads */ for (i = 0; i < opt_n_threads; i++) { thr = &thr_info[i]; diff --git a/miner.h b/miner.h index 0a758c13..dd32ee76 100644 --- a/miner.h +++ b/miner.h @@ -42,6 +42,12 @@ #define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0])) #endif +struct thr_info { + int id; + pthread_t pth; + struct thread_q *q; +}; + static inline uint32_t swab32(uint32_t v) { #ifdef WANT_BUILTIN_BSWAP @@ -70,7 +76,7 @@ extern bool opt_debug; extern bool opt_protocol; extern const uint32_t sha256_init_state[]; extern json_t *json_rpc_call(CURL *curl, const char *url, const char *userpass, - const char *rpc_req); + const char *rpc_req, bool, bool); extern char *bin2hex(const unsigned char *p, size_t len); extern bool hex2bin(unsigned char *p, const char *hexstr, size_t len); @@ -110,6 +116,9 @@ timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y); extern bool fulltest(const unsigned char *hash, const unsigned char *target); +extern int opt_scantime; +extern bool want_longpoll; +extern bool have_longpoll; struct thread_q; struct work_restart { @@ -117,8 +126,9 @@ struct work_restart { char padding[128 - sizeof(unsigned long)]; }; +extern struct thr_info *thr_info; +extern int longpoll_thr_id; extern struct work_restart *work_restart; -extern void restart_threads(void); extern struct thread_q *tq_new(void); extern void tq_free(struct thread_q *tq); diff --git a/util.c b/util.c index 55e602c0..5b02e833 100644 --- a/util.c +++ b/util.c @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -29,6 +30,10 @@ struct upload_buffer { size_t len; }; +struct header_info { + char *lp_path; +}; + struct tq_ent { void *data; struct list_head q_node; @@ -95,8 +100,62 @@ static size_t upload_data_cb(void *ptr, size_t size, size_t nmemb, return len; } +static size_t resp_hdr_cb(void *ptr, size_t size, size_t nmemb, void *user_data) +{ + struct header_info *hi = user_data; + size_t remlen, slen, ptrlen = size * nmemb; + char *rem, *val = NULL, *key = NULL; + void *tmp; + + if (opt_protocol) + printf("In resp_hdr_cb\n"); + + val = calloc(1, ptrlen); + key = calloc(1, ptrlen); + if (!key || !val) + goto out; + + tmp = memchr(ptr, ':', ptrlen); + if (!tmp || (tmp == ptr)) /* skip empty keys / blanks */ + goto out; + slen = tmp - ptr; + if ((slen + 1) == ptrlen) /* skip key w/ no value */ + goto out; + memcpy(key, ptr, slen); /* store & nul term key */ + key[slen] = 0; + + rem = ptr + slen + 1; /* trim value's leading whitespace */ + remlen = ptrlen - slen - 1; + while ((remlen > 0) && (isspace(*rem))) { + remlen--; + rem++; + } + + memcpy(val, rem, remlen); /* store value, trim trailing ws */ + val[remlen] = 0; + while ((*val) && (isspace(val[strlen(val) - 1]))) { + val[strlen(val) - 1] = 0; + } + if (!*val) /* skip blank value */ + goto out; + + if (opt_protocol) + printf("HTTP hdr(%s): %s\n", key, val); + + if (!strcasecmp("X-Long-Polling", key)) { + hi->lp_path = val; /* steal memory reference */ + val = NULL; + } + +out: + free(key); + free(val); + return ptrlen; +} + json_t *json_rpc_call(CURL *curl, const char *url, - const char *userpass, const char *rpc_req) + const char *userpass, const char *rpc_req, + bool longpoll_scan, bool longpoll) { json_t *val, *err_val, *res_val; int rc; @@ -106,9 +165,15 @@ json_t *json_rpc_call(CURL *curl, const char *url, struct curl_slist *headers = NULL; char len_hdr[64]; char curl_err_str[CURL_ERROR_SIZE]; + long timeout = longpoll ? (60 * 60) : (60 * 10); + struct header_info hi = { }; + bool lp_scanning = false; /* it is assumed that 'curl' is freshly [re]initialized at this pt */ + if (longpoll_scan) + lp_scanning = want_longpoll && !have_longpoll; + if (opt_protocol) curl_easy_setopt(curl, CURLOPT_VERBOSE, 1); curl_easy_setopt(curl, CURLOPT_URL, url); @@ -121,6 +186,11 @@ json_t *json_rpc_call(CURL *curl, const char *url, curl_easy_setopt(curl, CURLOPT_READDATA, &upload_data); curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curl_err_str); curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout); + if (lp_scanning) { + curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, resp_hdr_cb); + curl_easy_setopt(curl, CURLOPT_HEADERDATA, &hi); + } if (userpass) { curl_easy_setopt(curl, CURLOPT_USERPWD, userpass); curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC); @@ -148,6 +218,15 @@ json_t *json_rpc_call(CURL *curl, const char *url, goto err_out; } + /* If X-Long-Polling was found, activate long polling */ + if (hi.lp_path) { + have_longpoll = true; + opt_scantime = 60; + tq_push(thr_info[longpoll_thr_id].q, hi.lp_path); + } else + free(hi.lp_path); + hi.lp_path = NULL; + val = json_loads(all_data.buf, &err); if (!val) { fprintf(stderr, "JSON decode failed(%d): %s\n", err.line, err.text);