From ed233e8cb55597eb0f221365b18af34dfd0fa124 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 7 Jul 2011 09:58:26 +1000 Subject: [PATCH] Put work into a staging area which makes it possible to check the latest work data received. Then check the latest work data against a store of the current_block and use it to determine if we have moved to a new block. This makes --no-longpoll work just as efficiently as longpoll, and works around when longpoll is unreliable. --- main.c | 123 ++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 104 insertions(+), 19 deletions(-) diff --git a/main.c b/main.c index a783eaca..e77ac977 100644 --- a/main.c +++ b/main.c @@ -145,6 +145,7 @@ static char *rpc_user, *rpc_pass; struct thr_info *thr_info; static int work_thr_id; int longpoll_thr_id; +static int stage_thr_id; struct work_restart *work_restart = NULL; pthread_mutex_t time_lock; static pthread_mutex_t hash_lock; @@ -154,6 +155,8 @@ static struct timeval total_tv_start, total_tv_end; static int accepted, rejected; int hw_errors; static int total_queued; +static char current_block[36]; +static char blank[36]; static void applog_and_exit(const char *fmt, ...) { @@ -616,8 +619,6 @@ static void kill_work(void) } } -static char current_block[36]; - static void *get_work_thread(void *userdata) { struct workio_cmd *wc = (struct workio_cmd *)userdata; @@ -648,7 +649,7 @@ static void *get_work_thread(void *userdata) } /* send work to requesting thread */ - if (unlikely(!tq_push(wc->thr->q, ret_work))) { + if (unlikely(!tq_push(thr_info[stage_thr_id].q, ret_work))) { applog(LOG_ERR, "Failed to tq_push work in workio_get_work"); kill_work(); free(ret_work); @@ -674,23 +675,30 @@ static void *submit_work_thread(void *userdata) { struct workio_cmd *wc = (struct workio_cmd *)userdata; int failures = 0; + char *hexstr; pthread_detach(pthread_self()); - if (unlikely(strncmp((const char *)wc->u.work->data, current_block, 36))) { - applog(LOG_INFO, "Stale work detected, discarding"); + + hexstr = bin2hex(wc->u.work->data, 36); + if (unlikely(!hexstr)) { + applog(LOG_ERR, "submit_work_thread OOM"); goto out; } + if (unlikely(strncmp(hexstr, current_block, 36))) { + applog(LOG_INFO, "Stale work detected, discarding"); + goto out_free; + } /* submit solution to bitcoin via JSON-RPC */ while (!submit_upstream_work(wc->u.work)) { - if (unlikely(strncmp((const char *)wc->u.work->data, current_block, 36))) { + if (unlikely(strncmp(hexstr, current_block, 36))) { applog(LOG_INFO, "Stale work detected, discarding"); - goto out; + goto out_free; } if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) { applog(LOG_ERR, "Failed %d retries ...terminating workio thread", opt_retries); kill_work(); - goto out; + goto out_free; } /* pause, then restart work-request loop */ @@ -698,7 +706,8 @@ static void *submit_work_thread(void *userdata) opt_fail_pause); sleep(opt_fail_pause); } - +out_free: + free(hexstr); out: workio_cmd_free(wc); return NULL; @@ -715,6 +724,61 @@ static bool workio_submit_work(struct workio_cmd *wc) return true; } +static void *stage_thread(void *userdata) +{ + struct thr_info *mythr = userdata; + bool ok = true; + unsigned int i; + + for (i = 0; i < 36; i++) { + strcat(current_block, "0"); + strcat(blank, "0"); + } + + while (ok) { + struct work *work = NULL; + char *hexstr; + + work = tq_pop(mythr->q, NULL); + if (unlikely(!work)) { + applog(LOG_ERR, "Failed to tq_pop in stage_thread"); + ok = false; + break; + } + + hexstr = bin2hex(work->data, 36); + if (unlikely(!hexstr)) { + applog(LOG_ERR, "stage_thread OOM"); + break; + } + + /* current_block is blanked out on successful longpoll */ + if (likely(strncmp(current_block, blank, 36))) { + if (unlikely(strncmp(hexstr, current_block, 36))) { + if (want_longpoll) + applog(LOG_WARNING, "New block detected, possible missed longpoll, flushing work queue "); + else + applog(LOG_WARNING, "New block detected, flushing work queue "); + /* As we can't flush the work from here, signal + * the wakeup thread to restart all the + * threads */ + work_restart[stage_thr_id].restart = 1; + } + } + memcpy(current_block, hexstr, 36); + free(hexstr); + + if (unlikely(!tq_push(thr_info[0].q, work))) { + applog(LOG_ERR, "Failed to tq_push work in stage_thread"); + ok = false; + break; + } + } + + tq_freeze(mythr->q); + return NULL; +} + static void *workio_thread(void *userdata) { struct thr_info *mythr = userdata; @@ -884,11 +948,19 @@ static bool discard_request(void) return true; } -static void flush_requests(void) +static void flush_requests(bool longpoll) { int i, extra; extra = requests_queued(); + /* When flushing from longpoll, we don't know the new work yet. When + * not flushing from longpoll, the first work item is valid so do not + * discard it */ + if (longpoll) + memcpy(current_block, blank, 36); + else + extra--; + for (i = 0; i < extra; i++) { /* Queue a whole batch of new requests */ if (unlikely(!queue_request())) { @@ -928,7 +1000,7 @@ retry: dec_queued(); memcpy(work, work_heap, sizeof(*work)); - memcpy(current_block, work->data, 36); + ret = true; free(work_heap); out: @@ -1340,12 +1412,12 @@ out: } #endif /* HAVE_OPENCL */ -static void restart_threads(void) +static void restart_threads(bool longpoll) { int i; /* Discard old queued requests and get new ones */ - flush_requests(); + flush_requests(longpoll); for (i = 0; i < opt_n_threads + gpu_threads; i++) work_restart[i].restart = 1; @@ -1399,10 +1471,8 @@ static void *longpoll_thread(void *userdata) failures = 0; json_decref(val); - if (!opt_quiet) - printf("LONGPOLL detected new block \n"); - applog(LOG_INFO, "LONGPOLL detected new block"); - restart_threads(); + applog(LOG_WARNING, "LONGPOLL detected new block "); + restart_threads(true); } else { if (failures++ < 10) { sleep(30); @@ -1437,6 +1507,10 @@ static void *wakeup_thread(void *userdata) while (1) { sleep(interval); hashmeter(-1, &zero_tv, 0); + if (unlikely(work_restart[stage_thr_id].restart)) { + restart_threads(false); + work_restart[stage_thr_id].restart = 0; + } } return NULL; @@ -1510,11 +1584,11 @@ int main (int argc, char *argv[]) openlog("cpuminer", LOG_PID, LOG_USER); #endif - work_restart = calloc(opt_n_threads + gpu_threads, sizeof(*work_restart)); + work_restart = calloc(opt_n_threads + 4 + gpu_threads, sizeof(*work_restart)); if (!work_restart) return 1; - thr_info = calloc(opt_n_threads + 3 + gpu_threads, sizeof(*thr)); + thr_info = calloc(opt_n_threads + 4 + gpu_threads, sizeof(*thr)); if (!thr_info) return 1; @@ -1568,6 +1642,17 @@ int main (int argc, char *argv[]) } } + stage_thr_id = opt_n_threads + gpu_threads + 3; + thr = &thr_info[stage_thr_id]; + thr->q = tq_new(); + if (!thr->q) + return 1; + /* start stage thread */ + if (pthread_create(&thr->pth, NULL, stage_thread, thr)) { + applog(LOG_ERR, "stage thread create failed"); + return 1; + } + /* Put enough work in the queue */ for (i = 0; i < opt_queue + opt_n_threads + gpu_threads; i++) { if (unlikely(!queue_request())) {