Browse Source

Put work into a staging area which makes it possible to check the latest work data received.

Then check the latest work data against a store of the current_block and use it to determine if we have moved to a new block.
This makes --no-longpoll work just as efficiently as longpoll, and works around when longpoll is unreliable.
nfactor-troky
Con Kolivas 14 years ago
parent
commit
ed233e8cb5
  1. 123
      main.c

123
main.c

@ -145,6 +145,7 @@ static char *rpc_user, *rpc_pass; @@ -145,6 +145,7 @@ static char *rpc_user, *rpc_pass;
struct thr_info *thr_info;
static int work_thr_id;
int longpoll_thr_id;
static int stage_thr_id;
struct work_restart *work_restart = NULL;
pthread_mutex_t time_lock;
static pthread_mutex_t hash_lock;
@ -154,6 +155,8 @@ static struct timeval total_tv_start, total_tv_end; @@ -154,6 +155,8 @@ static struct timeval total_tv_start, total_tv_end;
static int accepted, rejected;
int hw_errors;
static int total_queued;
static char current_block[36];
static char blank[36];
static void applog_and_exit(const char *fmt, ...)
{
@ -616,8 +619,6 @@ static void kill_work(void) @@ -616,8 +619,6 @@ static void kill_work(void)
}
}
static char current_block[36];
static void *get_work_thread(void *userdata)
{
struct workio_cmd *wc = (struct workio_cmd *)userdata;
@ -648,7 +649,7 @@ static void *get_work_thread(void *userdata) @@ -648,7 +649,7 @@ static void *get_work_thread(void *userdata)
}
/* send work to requesting thread */
if (unlikely(!tq_push(wc->thr->q, ret_work))) {
if (unlikely(!tq_push(thr_info[stage_thr_id].q, ret_work))) {
applog(LOG_ERR, "Failed to tq_push work in workio_get_work");
kill_work();
free(ret_work);
@ -674,23 +675,30 @@ static void *submit_work_thread(void *userdata) @@ -674,23 +675,30 @@ static void *submit_work_thread(void *userdata)
{
struct workio_cmd *wc = (struct workio_cmd *)userdata;
int failures = 0;
char *hexstr;
pthread_detach(pthread_self());
if (unlikely(strncmp((const char *)wc->u.work->data, current_block, 36))) {
applog(LOG_INFO, "Stale work detected, discarding");
hexstr = bin2hex(wc->u.work->data, 36);
if (unlikely(!hexstr)) {
applog(LOG_ERR, "submit_work_thread OOM");
goto out;
}
if (unlikely(strncmp(hexstr, current_block, 36))) {
applog(LOG_INFO, "Stale work detected, discarding");
goto out_free;
}
/* submit solution to bitcoin via JSON-RPC */
while (!submit_upstream_work(wc->u.work)) {
if (unlikely(strncmp((const char *)wc->u.work->data, current_block, 36))) {
if (unlikely(strncmp(hexstr, current_block, 36))) {
applog(LOG_INFO, "Stale work detected, discarding");
goto out;
goto out_free;
}
if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
applog(LOG_ERR, "Failed %d retries ...terminating workio thread", opt_retries);
kill_work();
goto out;
goto out_free;
}
/* pause, then restart work-request loop */
@ -698,7 +706,8 @@ static void *submit_work_thread(void *userdata) @@ -698,7 +706,8 @@ static void *submit_work_thread(void *userdata)
opt_fail_pause);
sleep(opt_fail_pause);
}
out_free:
free(hexstr);
out:
workio_cmd_free(wc);
return NULL;
@ -715,6 +724,61 @@ static bool workio_submit_work(struct workio_cmd *wc) @@ -715,6 +724,61 @@ static bool workio_submit_work(struct workio_cmd *wc)
return true;
}
static void *stage_thread(void *userdata)
{
struct thr_info *mythr = userdata;
bool ok = true;
unsigned int i;
for (i = 0; i < 36; i++) {
strcat(current_block, "0");
strcat(blank, "0");
}
while (ok) {
struct work *work = NULL;
char *hexstr;
work = tq_pop(mythr->q, NULL);
if (unlikely(!work)) {
applog(LOG_ERR, "Failed to tq_pop in stage_thread");
ok = false;
break;
}
hexstr = bin2hex(work->data, 36);
if (unlikely(!hexstr)) {
applog(LOG_ERR, "stage_thread OOM");
break;
}
/* current_block is blanked out on successful longpoll */
if (likely(strncmp(current_block, blank, 36))) {
if (unlikely(strncmp(hexstr, current_block, 36))) {
if (want_longpoll)
applog(LOG_WARNING, "New block detected, possible missed longpoll, flushing work queue ");
else
applog(LOG_WARNING, "New block detected, flushing work queue ");
/* As we can't flush the work from here, signal
* the wakeup thread to restart all the
* threads */
work_restart[stage_thr_id].restart = 1;
}
}
memcpy(current_block, hexstr, 36);
free(hexstr);
if (unlikely(!tq_push(thr_info[0].q, work))) {
applog(LOG_ERR, "Failed to tq_push work in stage_thread");
ok = false;
break;
}
}
tq_freeze(mythr->q);
return NULL;
}
static void *workio_thread(void *userdata)
{
struct thr_info *mythr = userdata;
@ -884,11 +948,19 @@ static bool discard_request(void) @@ -884,11 +948,19 @@ static bool discard_request(void)
return true;
}
static void flush_requests(void)
static void flush_requests(bool longpoll)
{
int i, extra;
extra = requests_queued();
/* When flushing from longpoll, we don't know the new work yet. When
* not flushing from longpoll, the first work item is valid so do not
* discard it */
if (longpoll)
memcpy(current_block, blank, 36);
else
extra--;
for (i = 0; i < extra; i++) {
/* Queue a whole batch of new requests */
if (unlikely(!queue_request())) {
@ -928,7 +1000,7 @@ retry: @@ -928,7 +1000,7 @@ retry:
dec_queued();
memcpy(work, work_heap, sizeof(*work));
memcpy(current_block, work->data, 36);
ret = true;
free(work_heap);
out:
@ -1340,12 +1412,12 @@ out: @@ -1340,12 +1412,12 @@ out:
}
#endif /* HAVE_OPENCL */
static void restart_threads(void)
static void restart_threads(bool longpoll)
{
int i;
/* Discard old queued requests and get new ones */
flush_requests();
flush_requests(longpoll);
for (i = 0; i < opt_n_threads + gpu_threads; i++)
work_restart[i].restart = 1;
@ -1399,10 +1471,8 @@ static void *longpoll_thread(void *userdata) @@ -1399,10 +1471,8 @@ static void *longpoll_thread(void *userdata)
failures = 0;
json_decref(val);
if (!opt_quiet)
printf("LONGPOLL detected new block \n");
applog(LOG_INFO, "LONGPOLL detected new block");
restart_threads();
applog(LOG_WARNING, "LONGPOLL detected new block ");
restart_threads(true);
} else {
if (failures++ < 10) {
sleep(30);
@ -1437,6 +1507,10 @@ static void *wakeup_thread(void *userdata) @@ -1437,6 +1507,10 @@ static void *wakeup_thread(void *userdata)
while (1) {
sleep(interval);
hashmeter(-1, &zero_tv, 0);
if (unlikely(work_restart[stage_thr_id].restart)) {
restart_threads(false);
work_restart[stage_thr_id].restart = 0;
}
}
return NULL;
@ -1510,11 +1584,11 @@ int main (int argc, char *argv[]) @@ -1510,11 +1584,11 @@ int main (int argc, char *argv[])
openlog("cpuminer", LOG_PID, LOG_USER);
#endif
work_restart = calloc(opt_n_threads + gpu_threads, sizeof(*work_restart));
work_restart = calloc(opt_n_threads + 4 + gpu_threads, sizeof(*work_restart));
if (!work_restart)
return 1;
thr_info = calloc(opt_n_threads + 3 + gpu_threads, sizeof(*thr));
thr_info = calloc(opt_n_threads + 4 + gpu_threads, sizeof(*thr));
if (!thr_info)
return 1;
@ -1568,6 +1642,17 @@ int main (int argc, char *argv[]) @@ -1568,6 +1642,17 @@ int main (int argc, char *argv[])
}
}
stage_thr_id = opt_n_threads + gpu_threads + 3;
thr = &thr_info[stage_thr_id];
thr->q = tq_new();
if (!thr->q)
return 1;
/* start stage thread */
if (pthread_create(&thr->pth, NULL, stage_thread, thr)) {
applog(LOG_ERR, "stage thread create failed");
return 1;
}
/* Put enough work in the queue */
for (i = 0; i < opt_queue + opt_n_threads + gpu_threads; i++) {
if (unlikely(!queue_request())) {

Loading…
Cancel
Save