diff --git a/cgminer.c b/cgminer.c index 3037e6d8..1b04fdd4 100644 --- a/cgminer.c +++ b/cgminer.c @@ -3220,105 +3220,6 @@ static void *submit_work_thread(void *userdata) applog(LOG_DEBUG, "Creating extra submit work thread"); - if (stale_work(work, true)) { - if (opt_submit_stale) - applog(LOG_NOTICE, "Pool %d stale share detected, submitting as user requested", pool->pool_no); - else if (pool->submit_old) - applog(LOG_NOTICE, "Pool %d stale share detected, submitting as pool requested", pool->pool_no); - else { - applog(LOG_NOTICE, "Pool %d stale share detected, discarding", pool->pool_no); - sharelog("discard", work); - - mutex_lock(&stats_lock); - total_stale++; - pool->stale_shares++; - total_diff_stale += work->work_difficulty; - pool->diff_stale += work->work_difficulty; - mutex_unlock(&stats_lock); - - goto out; - } - work->stale = true; - } - - if (work->stratum) { - struct stratum_share *sshare = calloc(sizeof(struct stratum_share), 1); - uint32_t *hash32 = (uint32_t *)work->hash, nonce; - bool submitted = false; - char *noncehex; - char s[1024]; - - sshare->sshare_time = time(NULL); - /* This work item is freed in parse_stratum_response */ - sshare->work = work; - nonce = *((uint32_t *)(work->data + 76)); - noncehex = bin2hex((const unsigned char *)&nonce, 4); - memset(s, 0, 1024); - - mutex_lock(&sshare_lock); - /* Give the stratum share a unique id */ - sshare->id = swork_id++; - mutex_unlock(&sshare_lock); - - sprintf(s, "{\"params\": [\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"], \"id\": %d, \"method\": \"mining.submit\"}", - pool->rpc_user, work->job_id, work->nonce2, work->ntime, noncehex, sshare->id); - free(noncehex); - - applog(LOG_INFO, "Submitting share %08lx to pool %d", - (long unsigned int)htole32(hash32[6]), pool->pool_no); - - /* Try resubmitting for up to 2 minutes if we fail to submit - * once and the stratum pool nonce1 still matches suggesting - * we may be able to resume. */ - while (time(NULL) < sshare->sshare_time + 120) { - bool sessionid_match; - - if (likely(stratum_send(pool, s, strlen(s)))) { - if (pool_tclear(pool, &pool->submit_fail)) - applog(LOG_WARNING, "Pool %d communication resumed, submitting work", pool->pool_no); - - mutex_lock(&sshare_lock); - HASH_ADD_INT(stratum_shares, id, sshare); - pool->sshares++; - mutex_unlock(&sshare_lock); - - applog(LOG_DEBUG, "Successfully submitted, adding to stratum_shares db"); - submitted = true; - break; - } - if (!pool_tset(pool, &pool->submit_fail) && cnx_needed(pool)) { - applog(LOG_WARNING, "Pool %d stratum share submission failure", pool->pool_no); - total_ro++; - pool->remotefail_occasions++; - } - - if (opt_lowmem) { - applog(LOG_DEBUG, "Lowmem option prevents resubmitting stratum share"); - break; - } - - cg_rlock(&pool->data_lock); - sessionid_match = (pool->nonce1 && !strcmp(work->nonce1, pool->nonce1)); - cg_runlock(&pool->data_lock); - - if (!sessionid_match) { - applog(LOG_DEBUG, "No matching session id for resubmitting stratum share"); - break; - } - /* Retry every 5 seconds */ - sleep(5); - } - - if (unlikely(!submitted)) { - applog(LOG_DEBUG, "Failed to submit stratum share, discarding"); - free_work(work); - free(sshare); - pool->stale_shares++; - total_stale++; - } - goto out; - } - ce = pop_curl_entry(pool); /* submit solution to bitcoin via JSON-RPC */ while (!submit_upstream_work(work, ce->curl, resubmit)) { @@ -3337,6 +3238,7 @@ static void *submit_work_thread(void *userdata) pool->diff_stale += work->work_difficulty; mutex_unlock(&stats_lock); + free_work(work); break; } @@ -3344,7 +3246,7 @@ static void *submit_work_thread(void *userdata) applog(LOG_INFO, "json_rpc_call failed on submit_work, retrying"); } push_curl_entry(ce, pool); -out: + return NULL; } @@ -4909,18 +4811,18 @@ static bool supports_resume(struct pool *pool) return ret; } -/* One stratum thread per pool that has stratum waits on the socket checking - * for new messages and for the integrity of the socket connection. We reset - * the connection based on the integrity of the receive side only as the send - * side will eventually expire data it fails to send. */ -static void *stratum_thread(void *userdata) +/* One stratum receive thread per pool that has stratum waits on the socket + * checking for new messages and for the integrity of the socket connection. We + * reset the connection based on the integrity of the receive side only as the + * send side will eventually expire data it fails to send. */ +static void *stratum_rthread(void *userdata) { struct pool *pool = (struct pool *)userdata; char threadname[16]; pthread_detach(pthread_self()); - snprintf(threadname, 16, "stratum/%d", pool->pool_no); + snprintf(threadname, 16, "StratumR/%d", pool->pool_no); RenameThread(threadname); while (42) { @@ -5023,10 +4925,125 @@ out: return NULL; } -static void init_stratum_thread(struct pool *pool) +/* Each pool has one stratum send thread for sending shares to avoid many + * threads being created for submission since all sends need to be serialised + * anyway. */ +static void *stratum_sthread(void *userdata) { - if (unlikely(pthread_create(&pool->stratum_thread, NULL, stratum_thread, (void *)pool))) - quit(1, "Failed to create stratum thread"); + struct pool *pool = (struct pool *)userdata; + char threadname[16]; + + pthread_detach(pthread_self()); + + snprintf(threadname, 16, "StratumS/%d", pool->pool_no); + RenameThread(threadname); + + pool->stratum_q = tq_new(); + if (!pool->stratum_q) + quit(1, "Failed to create stratum_q in stratum_sthread"); + + while (42) { + struct stratum_share *sshare; + uint32_t *hash32, nonce; + struct work *work; + bool submitted; + char *noncehex; + char s[1024]; + + if (unlikely(pool->removed)) + break; + + work = tq_pop(pool->stratum_q, NULL); + if (unlikely(!work)) + quit(1, "Stratum q returned empty work"); + + sshare = calloc(sizeof(struct stratum_share), 1); + hash32 = (uint32_t *)work->hash; + submitted = false; + + sshare->sshare_time = time(NULL); + /* This work item is freed in parse_stratum_response */ + sshare->work = work; + nonce = *((uint32_t *)(work->data + 76)); + noncehex = bin2hex((const unsigned char *)&nonce, 4); + memset(s, 0, 1024); + + mutex_lock(&sshare_lock); + /* Give the stratum share a unique id */ + sshare->id = swork_id++; + mutex_unlock(&sshare_lock); + + sprintf(s, "{\"params\": [\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"], \"id\": %d, \"method\": \"mining.submit\"}", + pool->rpc_user, work->job_id, work->nonce2, work->ntime, noncehex, sshare->id); + free(noncehex); + + applog(LOG_INFO, "Submitting share %08lx to pool %d", + (long unsigned int)htole32(hash32[6]), pool->pool_no); + + /* Try resubmitting for up to 2 minutes if we fail to submit + * once and the stratum pool nonce1 still matches suggesting + * we may be able to resume. */ + while (time(NULL) < sshare->sshare_time + 120) { + bool sessionid_match; + + if (likely(stratum_send(pool, s, strlen(s)))) { + if (pool_tclear(pool, &pool->submit_fail)) + applog(LOG_WARNING, "Pool %d communication resumed, submitting work", pool->pool_no); + + mutex_lock(&sshare_lock); + HASH_ADD_INT(stratum_shares, id, sshare); + pool->sshares++; + mutex_unlock(&sshare_lock); + + applog(LOG_DEBUG, "Successfully submitted, adding to stratum_shares db"); + submitted = true; + break; + } + if (!pool_tset(pool, &pool->submit_fail) && cnx_needed(pool)) { + applog(LOG_WARNING, "Pool %d stratum share submission failure", pool->pool_no); + total_ro++; + pool->remotefail_occasions++; + } + + if (opt_lowmem) { + applog(LOG_DEBUG, "Lowmem option prevents resubmitting stratum share"); + break; + } + + cg_rlock(&pool->data_lock); + sessionid_match = (pool->nonce1 && !strcmp(work->nonce1, pool->nonce1)); + cg_runlock(&pool->data_lock); + + if (!sessionid_match) { + applog(LOG_DEBUG, "No matching session id for resubmitting stratum share"); + break; + } + /* Retry every 5 seconds */ + sleep(5); + } + + if (unlikely(!submitted)) { + applog(LOG_DEBUG, "Failed to submit stratum share, discarding"); + free_work(work); + free(sshare); + pool->stale_shares++; + total_stale++; + } + } + + /* Freeze the work queue but don't free up its memory in case there is + * work still trying to be submitted to the removed pool. */ + tq_freeze(pool->stratum_q); + + return NULL; +} + +static void init_stratum_threads(struct pool *pool) +{ + if (unlikely(pthread_create(&pool->stratum_sthread, NULL, stratum_sthread, (void *)pool))) + quit(1, "Failed to create stratum sthread"); + if (unlikely(pthread_create(&pool->stratum_rthread, NULL, stratum_rthread, (void *)pool))) + quit(1, "Failed to create stratum rthread"); } static void *longpoll_thread(void *userdata); @@ -5069,7 +5086,7 @@ retry_stratum: bool ret = initiate_stratum(pool) && auth_stratum(pool); if (ret) - init_stratum_thread(pool); + init_stratum_threads(pool); else pool_tclear(pool, &pool->stratum_init); return ret; @@ -5476,13 +5493,45 @@ static struct work *get_work(struct thr_info *thr, const int thr_id) void submit_work_async(struct work *work_in, struct timeval *tv_work_found) { struct work *work = copy_work(work_in); + struct pool *pool = work->pool; pthread_t submit_thread; if (tv_work_found) copy_time(&work->tv_work_found, tv_work_found); - applog(LOG_DEBUG, "Pushing submit work to work thread"); - if (unlikely(pthread_create(&submit_thread, NULL, submit_work_thread, (void *)work))) - quit(1, "Failed to create submit_work_thread"); + + if (stale_work(work, true)) { + if (opt_submit_stale) + applog(LOG_NOTICE, "Pool %d stale share detected, submitting as user requested", pool->pool_no); + else if (pool->submit_old) + applog(LOG_NOTICE, "Pool %d stale share detected, submitting as pool requested", pool->pool_no); + else { + applog(LOG_NOTICE, "Pool %d stale share detected, discarding", pool->pool_no); + sharelog("discard", work); + + mutex_lock(&stats_lock); + total_stale++; + pool->stale_shares++; + total_diff_stale += work->work_difficulty; + pool->diff_stale += work->work_difficulty; + mutex_unlock(&stats_lock); + + free_work(work); + return; + } + work->stale = true; + } + + if (work->stratum) { + applog(LOG_DEBUG, "Pushing pool %d work to stratum queue", pool->pool_no); + if (unlikely(!tq_push(pool->stratum_q, work))) { + applog(LOG_DEBUG, "Discarding work from removed pool"); + free_work(work); + } + } else { + applog(LOG_DEBUG, "Pushing submit work to work thread"); + if (unlikely(pthread_create(&submit_thread, NULL, submit_work_thread, (void *)work))) + quit(1, "Failed to create submit_work_thread"); + } } void inc_hw_errors(struct thr_info *thr) diff --git a/miner.h b/miner.h index 471b3727..2e6da646 100644 --- a/miner.h +++ b/miner.h @@ -1126,8 +1126,10 @@ struct pool { bool stratum_init; bool stratum_notify; struct stratum_work swork; - pthread_t stratum_thread; + pthread_t stratum_sthread; + pthread_t stratum_rthread; pthread_mutex_t stratum_lock; + struct thread_q *stratum_q; int sshares; /* stratum shares submitted waiting on response */ /* GBT variables */