Browse Source

Faster hardware can easily outstrip the speed we can get work and submit shares when using only one connection per pool.

Test the queued list to see if any get/submits are already queued and if they are, start recruiting extra connections by generating new threads.
This allows us to reuse network connections at low loads but recuit new open connections as they're needed, so that cgminer can scale to hardware of any size.
nfactor-troky
Con Kolivas 13 years ago
parent
commit
5ad942d187
  1. 141
      cgminer.c

141
cgminer.c

@ -1585,7 +1585,7 @@ bool regeneratehash(const struct work *work) @@ -1585,7 +1585,7 @@ bool regeneratehash(const struct work *work)
return false;
}
static bool submit_upstream_work(const struct work *work)
static bool submit_upstream_work(const struct work *work, CURL *curl)
{
char *hexstr = NULL;
json_t *val, *res;
@ -1594,7 +1594,6 @@ static bool submit_upstream_work(const struct work *work) @@ -1594,7 +1594,6 @@ static bool submit_upstream_work(const struct work *work)
int thr_id = work->thr_id;
struct cgpu_info *cgpu = thr_info[thr_id].cgpu;
struct pool *pool = work->pool;
CURL *curl = pool->submit_curl;
bool rolltime;
uint32_t *hash32;
char hashshow[64+1] = "";
@ -1770,10 +1769,9 @@ static void get_benchmark_work(struct work *work) @@ -1770,10 +1769,9 @@ static void get_benchmark_work(struct work *work)
memcpy(work, &bench_block, min_size);
}
static bool get_upstream_work(struct work *work)
static bool get_upstream_work(struct work *work, CURL *curl)
{
struct pool *pool = work->pool;
CURL *curl = pool->getwork_curl;
json_t *val = NULL;
bool rc = false;
int retries = 0;
@ -1996,7 +1994,7 @@ static void *get_work_thread(void *userdata) @@ -1996,7 +1994,7 @@ static void *get_work_thread(void *userdata)
ret_work->pool = pool;
/* obtain new work from bitcoin via JSON-RPC */
while (!get_upstream_work(ret_work)) {
while (!get_upstream_work(ret_work, pool->getwork_curl)) {
if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
applog(LOG_ERR, "json_rpc_call failed, terminating workio thread");
free_work(ret_work);
@ -2026,11 +2024,72 @@ static void *get_work_thread(void *userdata) @@ -2026,11 +2024,72 @@ static void *get_work_thread(void *userdata)
return NULL;
}
static void *get_extra_work(void *userdata)
{
struct workio_cmd *wc = (struct workio_cmd *)userdata;
struct work *ret_work = make_work();;
CURL *curl = curl_easy_init();
int failures = 0;
pthread_detach(pthread_self());
applog(LOG_DEBUG, "Creating extra get work thread");
if (wc->thr)
ret_work->thr = wc->thr;
else
ret_work->thr = NULL;
ret_work->pool = select_pool(wc->lagging);
/* obtain new work from bitcoin via JSON-RPC */
while (!get_upstream_work(ret_work, curl)) {
if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
applog(LOG_ERR, "json_rpc_call failed, terminating workio thread");
free_work(ret_work);
kill_work();
goto out;
}
/* pause, then restart work-request loop */
applog(LOG_DEBUG, "json_rpc_call failed on get work, retry after %d seconds",
fail_pause);
sleep(fail_pause);
fail_pause += opt_fail_pause;
}
fail_pause = opt_fail_pause;
applog(LOG_DEBUG, "Pushing work to requesting thread");
/* send work to requesting thread */
if (unlikely(!tq_push(thr_info[stage_thr_id].q, ret_work))) {
applog(LOG_ERR, "Failed to tq_push work in workio_get_work");
kill_work();
free_work(ret_work);
}
out:
workio_cmd_free(wc);
curl_easy_cleanup(curl);
return NULL;
}
/* As per the submit work system, we try to reuse the existing curl handles,
* but start recruiting extra connections if we start accumulating queued
* requests */
static bool workio_get_work(struct workio_cmd *wc)
{
struct pool *pool = select_pool(wc->lagging);
pthread_t get_thread;
return tq_push(pool->getwork_q, wc);
if (list_empty(&pool->getwork_q->q))
return tq_push(pool->getwork_q, wc);
if (unlikely(pthread_create(&get_thread, NULL, get_extra_work, (void *)wc))) {
applog(LOG_ERR, "Failed to create get_work_thread");
return false;
}
return true;
}
static bool stale_work(struct work *work, bool share)
@ -2100,7 +2159,7 @@ static void *submit_work_thread(void *userdata) @@ -2100,7 +2159,7 @@ static void *submit_work_thread(void *userdata)
}
/* submit solution to bitcoin via JSON-RPC */
while (!submit_upstream_work(work)) {
while (!submit_upstream_work(work, pool->submit_curl)) {
if (!opt_submit_stale && stale_work(work, true) && !pool->submit_old) {
applog(LOG_NOTICE, "Stale share detected on submit retry, discarding");
total_stale++;
@ -2126,9 +2185,75 @@ static void *submit_work_thread(void *userdata) @@ -2126,9 +2185,75 @@ static void *submit_work_thread(void *userdata)
return NULL;
}
static void *submit_extra_work(void *userdata)
{
struct workio_cmd *wc = (struct workio_cmd *)userdata;
struct work *work = wc->u.work;
struct pool *pool = work->pool;
CURL *curl = curl_easy_init();
int failures = 0;
pthread_detach(pthread_self());
applog(LOG_DEBUG, "Creating extra submit work thread");
if (stale_work(work, true)) {
if (opt_submit_stale)
applog(LOG_NOTICE, "Stale share detected, submitting as user requested");
else if (pool->submit_old)
applog(LOG_NOTICE, "Stale share detected, submitting as pool requested");
else {
applog(LOG_NOTICE, "Stale share detected, discarding");
sharelog("discard", work);
total_stale++;
pool->stale_shares++;
goto out;
}
}
/* submit solution to bitcoin via JSON-RPC */
while (!submit_upstream_work(work, curl)) {
if (!opt_submit_stale && stale_work(work, true)) {
applog(LOG_NOTICE, "Stale share detected, discarding");
total_stale++;
pool->stale_shares++;
break;
}
if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
applog(LOG_ERR, "Failed %d retries ...terminating workio thread", opt_retries);
kill_work();
break;
}
/* pause, then restart work-request loop */
applog(LOG_INFO, "json_rpc_call failed on submit_work, retry after %d seconds",
fail_pause);
sleep(fail_pause);
fail_pause += opt_fail_pause;
}
fail_pause = opt_fail_pause;
out:
workio_cmd_free(wc);
curl_easy_cleanup(curl);
return NULL;
}
/* We try to reuse curl handles as much as possible, but if there is already
* work queued to be submitted, we start generating extra handles to submit
* the shares to avoid ever increasing backlogs. This allows us to scale to
* any size hardware */
static bool workio_submit_work(struct workio_cmd *wc)
{
return tq_push(wc->u.work->pool->submit_q, wc);
pthread_t submit_thread;
if (list_empty(&wc->u.work->pool->submit_q->q))
return tq_push(wc->u.work->pool->submit_q, wc);
if (unlikely(pthread_create(&submit_thread, NULL, submit_extra_work, (void *)wc))) {
applog(LOG_ERR, "Failed to create submit_work_thread");
return false;
}
return true;
}
/* Find the pool that currently has the highest priority */

Loading…
Cancel
Save