diff --git a/cgminer.c b/cgminer.c index 341dff83..5fce8aa3 100644 --- a/cgminer.c +++ b/cgminer.c @@ -378,6 +378,9 @@ sharelog(const char*disposition, const struct work*work) { applog(LOG_ERR, "sharelog fwrite error"); } +static void *submit_work_thread(void *userdata); +static void *get_work_thread(void *userdata); + static void add_pool(void) { struct pool *pool; @@ -395,6 +398,11 @@ static void add_pool(void) } /* Make sure the pool doesn't think we've been idle since time 0 */ pool->tv_idle.tv_sec = ~0UL; + + if (unlikely(pthread_create(&pool->submit_thread, NULL, submit_work_thread, (void *)pool))) + quit(1, "Failed to create pool submit thread"); + if (unlikely(pthread_create(&pool->getwork_thread, NULL, get_work_thread, (void *)pool))) + quit(1, "Failed to create pool getwork thread"); } /* Pool variant of test and set */ @@ -1600,18 +1608,13 @@ static bool submit_upstream_work(const struct work *work) bool rc = false; int thr_id = work->thr_id; struct cgpu_info *cgpu = thr_info[thr_id].cgpu; - CURL *curl = curl_easy_init(); struct pool *pool = work->pool; + CURL *curl = pool->submit_curl; bool rolltime; uint32_t *hash32; char hashshow[64+1] = ""; bool isblock; - if (unlikely(!curl)) { - applog(LOG_ERR, "CURL initialisation failed"); - return rc; - } - #ifdef __BIG_ENDIAN__ int swapcounter = 0; for (swapcounter = 0; swapcounter < 32; swapcounter++) @@ -1745,7 +1748,6 @@ static bool submit_upstream_work(const struct work *work) out: free(hexstr); out_nofree: - curl_easy_cleanup(curl); return rc; } @@ -1789,41 +1791,26 @@ static void get_benchmark_work(struct work *work) memcpy(work, &bench_block, min_size); } -static bool get_upstream_work(struct work *work, bool lagging) +static bool get_upstream_work(struct work *work) { - bool rc = false, req_longpoll = false; - struct pool *pool; + struct pool *pool = work->pool; + CURL *curl = pool->getwork_curl; json_t *val = NULL; + bool rc = false; int retries = 0; - CURL *curl; char *url; - curl = curl_easy_init(); - if (unlikely(!curl)) { - applog(LOG_ERR, "CURL initialisation failed"); - return rc; - } - - pool = select_pool(lagging); applog(LOG_DEBUG, "DBG: sending %s get RPC call: %s", pool->rpc_url, rpc_req); url = pool->rpc_url; - /* If this is the current pool and supports longpoll but has not sent - * a longpoll, send one now */ - if (unlikely(want_longpoll && !pool->is_lp && pool == current_pool() && - pool->hdr_path && !pool_tset(pool, &pool->lp_sent))) { - req_longpoll = true; - url = pool->lp_url; - } - retry: /* A single failure response here might be reported as a dead pool and * there may be temporary denied messages etc. falsely reporting * failure so retry a few times before giving up */ while (!val && retries++ < 3) { val = json_rpc_call(curl, url, pool->rpc_userpass, rpc_req, - false, req_longpoll, &work->rolltime, pool, false); + false, false, &work->rolltime, pool, false); } if (unlikely(!val)) { applog(LOG_DEBUG, "Failed json_rpc_call in get_upstream_work"); @@ -1838,13 +1825,12 @@ retry: goto retry; } work->pool = pool; - work->longpoll = req_longpoll; + work->longpoll = false; total_getworks++; pool->getwork_requested++; json_decref(val); out: - curl_easy_cleanup(curl); return rc; } @@ -2001,60 +1987,85 @@ static void sighandler(int __maybe_unused sig) kill_work(); } +static void start_longpoll(void); +static void stop_longpoll(void); + +/* One get work thread is created per pool, so as to use one curl handle for + * all getwork reqeusts from the same pool, minimising connections opened, but + * separate from the submit work curl handle to not delay share submissions due + * to getwork traffic */ static void *get_work_thread(void *userdata) { - struct workio_cmd *wc = (struct workio_cmd *)userdata; - struct work *ret_work; - int failures = 0; + struct pool *pool = (struct pool *)userdata; + struct workio_cmd *wc; pthread_detach(pthread_self()); - ret_work = make_work(); - if (wc->thr) - ret_work->thr = wc->thr; - else - ret_work->thr = NULL; + /* getwork_q memory never freed */ + pool->getwork_q = tq_new(); + if (!pool->getwork_q) + quit(1, "Failed to tq_new in get_work_thread"); - /* obtain new work from bitcoin via JSON-RPC */ - while (!get_upstream_work(ret_work, wc->lagging)) { - if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) { - applog(LOG_ERR, "json_rpc_call failed, terminating workio thread"); - free_work(ret_work); - kill_work(); - goto out; + /* getwork_curl never cleared */ + pool->getwork_curl = curl_easy_init(); + if (unlikely(!pool->getwork_curl)) + quit(1, "Failed to initialise pool getwork CURL"); + + while ((wc = tq_pop(pool->getwork_q, NULL)) != NULL) { + struct work *ret_work; + int failures = 0; + + if (unlikely(want_longpoll && !pool->is_lp && pool == current_pool() && + pool->hdr_path && !pool_tset(pool, &pool->lp_sent))) { + stop_longpoll(); + start_longpoll(); } - /* pause, then restart work-request loop */ - applog(LOG_DEBUG, "json_rpc_call failed on get work, retry after %d seconds", - fail_pause); - sleep(fail_pause); - fail_pause += opt_fail_pause; - } - fail_pause = opt_fail_pause; + ret_work = make_work(); + + if (wc->thr) + ret_work->thr = wc->thr; + else + ret_work->thr = NULL; + + ret_work->pool = pool; + + /* obtain new work from bitcoin via JSON-RPC */ + while (!get_upstream_work(ret_work)) { + if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) { + applog(LOG_ERR, "json_rpc_call failed, terminating workio thread"); + free_work(ret_work); + kill_work(); + break; + } + + /* pause, then restart work-request loop */ + applog(LOG_DEBUG, "json_rpc_call failed on get work, retry after %d seconds", + fail_pause); + sleep(fail_pause); + fail_pause += opt_fail_pause; + } + fail_pause = opt_fail_pause; - applog(LOG_DEBUG, "Pushing work to requesting thread"); + applog(LOG_DEBUG, "Pushing work to requesting thread"); - /* send work to requesting thread */ - if (unlikely(!tq_push(thr_info[stage_thr_id].q, ret_work))) { - applog(LOG_ERR, "Failed to tq_push work in workio_get_work"); - kill_work(); - free_work(ret_work); + /* send work to requesting thread */ + if (unlikely(!tq_push(thr_info[stage_thr_id].q, ret_work))) { + applog(LOG_ERR, "Failed to tq_push work in workio_get_work"); + kill_work(); + free_work(ret_work); + } + workio_cmd_free(wc); } -out: - workio_cmd_free(wc); return NULL; } static bool workio_get_work(struct workio_cmd *wc) { - pthread_t get_thread; + struct pool *pool = select_pool(wc->lagging); - if (unlikely(pthread_create(&get_thread, NULL, get_work_thread, (void *)wc))) { - applog(LOG_ERR, "Failed to create get_work_thread"); - return false; - } - return true; + return tq_push(pool->getwork_q, wc); } static bool stale_work(struct work *work, bool share) @@ -2080,64 +2091,76 @@ static bool stale_work(struct work *work, bool share) return false; } +/* One submit work thread is created per pool, so as to use one curl handle + * for all submissions to the same pool, minimising connections opened, but + * separate from the getwork curl handle to not delay share submission due to + * getwork traffic */ static void *submit_work_thread(void *userdata) { - struct workio_cmd *wc = (struct workio_cmd *)userdata; - struct work *work = wc->u.work; - struct pool *pool = work->pool; - int failures = 0; + struct pool *pool = (struct pool *)userdata; + struct workio_cmd *wc; pthread_detach(pthread_self()); - if (stale_work(work, true)) { - if (opt_submit_stale) - applog(LOG_NOTICE, "Stale share detected, submitting as user requested"); - else if (pool->submit_old) - applog(LOG_NOTICE, "Stale share detected, submitting as pool requested"); - else { - applog(LOG_NOTICE, "Stale share detected, discarding"); - sharelog("discard", work); - total_stale++; - pool->stale_shares++; - goto out; + /* submit_q memory never freed */ + pool->submit_q = tq_new(); + if (!pool->submit_q ) + quit(1, "Failed to tq_new in submit_work_thread"); + + /* submit_curl never cleared */ + pool->submit_curl = curl_easy_init(); + if (unlikely(!pool->submit_curl)) + quit(1, "Failed to initialise pool submit CURL"); + + while ((wc = tq_pop(pool->submit_q, NULL)) != NULL) { + struct work *work = wc->u.work; + int failures = 0; + + if (stale_work(work, true)) { + if (opt_submit_stale) + applog(LOG_NOTICE, "Stale share detected, submitting as user requested"); + else if (pool->submit_old) + applog(LOG_NOTICE, "Stale share detected, submitting as pool requested"); + else { + applog(LOG_NOTICE, "Stale share detected, discarding"); + sharelog("discard", work); + total_stale++; + pool->stale_shares++; + workio_cmd_free(wc); + continue; + } } - } - /* submit solution to bitcoin via JSON-RPC */ - while (!submit_upstream_work(work)) { - if (!opt_submit_stale && stale_work(work, true)) { - applog(LOG_NOTICE, "Stale share detected, discarding"); - total_stale++; - pool->stale_shares++; - break; - } - if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) { - applog(LOG_ERR, "Failed %d retries ...terminating workio thread", opt_retries); - kill_work(); - break; - } + /* submit solution to bitcoin via JSON-RPC */ + while (!submit_upstream_work(work)) { + if (!opt_submit_stale && stale_work(work, true)) { + applog(LOG_NOTICE, "Stale share detected, discarding"); + total_stale++; + pool->stale_shares++; + break; + } + if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) { + applog(LOG_ERR, "Failed %d retries ...terminating workio thread", opt_retries); + kill_work(); + break; + } - /* pause, then restart work-request loop */ - applog(LOG_INFO, "json_rpc_call failed on submit_work, retry after %d seconds", - fail_pause); - sleep(fail_pause); - fail_pause += opt_fail_pause; + /* pause, then restart work-request loop */ + applog(LOG_INFO, "json_rpc_call failed on submit_work, retry after %d seconds", + fail_pause); + sleep(fail_pause); + fail_pause += opt_fail_pause; + } + fail_pause = opt_fail_pause; + workio_cmd_free(wc); } - fail_pause = opt_fail_pause; -out: - workio_cmd_free(wc); + return NULL; } static bool workio_submit_work(struct workio_cmd *wc) { - pthread_t submit_thread; - - if (unlikely(pthread_create(&submit_thread, NULL, submit_work_thread, (void *)wc))) { - applog(LOG_ERR, "Failed to create submit_work_thread"); - return false; - } - return true; + return tq_push(wc->u.work->pool->submit_q, wc); } /* Find the pool that currently has the highest priority */ @@ -2904,9 +2927,6 @@ retry: } #endif -static void start_longpoll(void); -static void stop_longpoll(void); - #ifdef HAVE_CURSES static void set_options(void) { @@ -3880,7 +3900,6 @@ static void *longpoll_thread(void *userdata) bool rolltime; pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); - pthread_detach(pthread_self()); curl = curl_easy_init(); if (unlikely(!curl)) { @@ -3959,12 +3978,13 @@ out: return NULL; } -__maybe_unused static void stop_longpoll(void) { struct thr_info *thr = &thr_info[longpoll_thr_id]; thr_info_cancel(thr); + if (have_longpoll) + pthread_join(thr->pth, NULL); have_longpoll = false; tq_freeze(thr->q); } diff --git a/miner.h b/miner.h index 8669e81d..a8fd4070 100644 --- a/miner.h +++ b/miner.h @@ -627,6 +627,15 @@ struct pool { char *rpc_user, *rpc_pass; pthread_mutex_t pool_lock; + + struct thread_q *submit_q; + struct thread_q *getwork_q; + + pthread_t submit_thread; + pthread_t getwork_thread; + + CURL *submit_curl; + CURL *getwork_curl; }; struct work {