Browse Source

Use a persistent single separate thread for stratum share submission that uses workqueues since all stratum sends are serialised.

nfactor-troky
Con Kolivas 12 years ago
parent
commit
3a2008ac61
  1. 275
      cgminer.c
  2. 4
      miner.h

275
cgminer.c

@ -3220,105 +3220,6 @@ static void *submit_work_thread(void *userdata)
applog(LOG_DEBUG, "Creating extra submit work thread"); applog(LOG_DEBUG, "Creating extra submit work thread");
if (stale_work(work, true)) {
if (opt_submit_stale)
applog(LOG_NOTICE, "Pool %d stale share detected, submitting as user requested", pool->pool_no);
else if (pool->submit_old)
applog(LOG_NOTICE, "Pool %d stale share detected, submitting as pool requested", pool->pool_no);
else {
applog(LOG_NOTICE, "Pool %d stale share detected, discarding", pool->pool_no);
sharelog("discard", work);
mutex_lock(&stats_lock);
total_stale++;
pool->stale_shares++;
total_diff_stale += work->work_difficulty;
pool->diff_stale += work->work_difficulty;
mutex_unlock(&stats_lock);
goto out;
}
work->stale = true;
}
if (work->stratum) {
struct stratum_share *sshare = calloc(sizeof(struct stratum_share), 1);
uint32_t *hash32 = (uint32_t *)work->hash, nonce;
bool submitted = false;
char *noncehex;
char s[1024];
sshare->sshare_time = time(NULL);
/* This work item is freed in parse_stratum_response */
sshare->work = work;
nonce = *((uint32_t *)(work->data + 76));
noncehex = bin2hex((const unsigned char *)&nonce, 4);
memset(s, 0, 1024);
mutex_lock(&sshare_lock);
/* Give the stratum share a unique id */
sshare->id = swork_id++;
mutex_unlock(&sshare_lock);
sprintf(s, "{\"params\": [\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"], \"id\": %d, \"method\": \"mining.submit\"}",
pool->rpc_user, work->job_id, work->nonce2, work->ntime, noncehex, sshare->id);
free(noncehex);
applog(LOG_INFO, "Submitting share %08lx to pool %d",
(long unsigned int)htole32(hash32[6]), pool->pool_no);
/* Try resubmitting for up to 2 minutes if we fail to submit
* once and the stratum pool nonce1 still matches suggesting
* we may be able to resume. */
while (time(NULL) < sshare->sshare_time + 120) {
bool sessionid_match;
if (likely(stratum_send(pool, s, strlen(s)))) {
if (pool_tclear(pool, &pool->submit_fail))
applog(LOG_WARNING, "Pool %d communication resumed, submitting work", pool->pool_no);
mutex_lock(&sshare_lock);
HASH_ADD_INT(stratum_shares, id, sshare);
pool->sshares++;
mutex_unlock(&sshare_lock);
applog(LOG_DEBUG, "Successfully submitted, adding to stratum_shares db");
submitted = true;
break;
}
if (!pool_tset(pool, &pool->submit_fail) && cnx_needed(pool)) {
applog(LOG_WARNING, "Pool %d stratum share submission failure", pool->pool_no);
total_ro++;
pool->remotefail_occasions++;
}
if (opt_lowmem) {
applog(LOG_DEBUG, "Lowmem option prevents resubmitting stratum share");
break;
}
cg_rlock(&pool->data_lock);
sessionid_match = (pool->nonce1 && !strcmp(work->nonce1, pool->nonce1));
cg_runlock(&pool->data_lock);
if (!sessionid_match) {
applog(LOG_DEBUG, "No matching session id for resubmitting stratum share");
break;
}
/* Retry every 5 seconds */
sleep(5);
}
if (unlikely(!submitted)) {
applog(LOG_DEBUG, "Failed to submit stratum share, discarding");
free_work(work);
free(sshare);
pool->stale_shares++;
total_stale++;
}
goto out;
}
ce = pop_curl_entry(pool); ce = pop_curl_entry(pool);
/* submit solution to bitcoin via JSON-RPC */ /* submit solution to bitcoin via JSON-RPC */
while (!submit_upstream_work(work, ce->curl, resubmit)) { while (!submit_upstream_work(work, ce->curl, resubmit)) {
@ -3337,6 +3238,7 @@ static void *submit_work_thread(void *userdata)
pool->diff_stale += work->work_difficulty; pool->diff_stale += work->work_difficulty;
mutex_unlock(&stats_lock); mutex_unlock(&stats_lock);
free_work(work);
break; break;
} }
@ -3344,7 +3246,7 @@ static void *submit_work_thread(void *userdata)
applog(LOG_INFO, "json_rpc_call failed on submit_work, retrying"); applog(LOG_INFO, "json_rpc_call failed on submit_work, retrying");
} }
push_curl_entry(ce, pool); push_curl_entry(ce, pool);
out:
return NULL; return NULL;
} }
@ -4909,18 +4811,18 @@ static bool supports_resume(struct pool *pool)
return ret; return ret;
} }
/* One stratum thread per pool that has stratum waits on the socket checking /* One stratum receive thread per pool that has stratum waits on the socket
* for new messages and for the integrity of the socket connection. We reset * checking for new messages and for the integrity of the socket connection. We
* the connection based on the integrity of the receive side only as the send * reset the connection based on the integrity of the receive side only as the
* side will eventually expire data it fails to send. */ * send side will eventually expire data it fails to send. */
static void *stratum_thread(void *userdata) static void *stratum_rthread(void *userdata)
{ {
struct pool *pool = (struct pool *)userdata; struct pool *pool = (struct pool *)userdata;
char threadname[16]; char threadname[16];
pthread_detach(pthread_self()); pthread_detach(pthread_self());
snprintf(threadname, 16, "stratum/%d", pool->pool_no); snprintf(threadname, 16, "StratumR/%d", pool->pool_no);
RenameThread(threadname); RenameThread(threadname);
while (42) { while (42) {
@ -5023,10 +4925,125 @@ out:
return NULL; return NULL;
} }
static void init_stratum_thread(struct pool *pool) /* Each pool has one stratum send thread for sending shares to avoid many
* threads being created for submission since all sends need to be serialised
* anyway. */
static void *stratum_sthread(void *userdata)
{ {
if (unlikely(pthread_create(&pool->stratum_thread, NULL, stratum_thread, (void *)pool))) struct pool *pool = (struct pool *)userdata;
quit(1, "Failed to create stratum thread"); char threadname[16];
pthread_detach(pthread_self());
snprintf(threadname, 16, "StratumS/%d", pool->pool_no);
RenameThread(threadname);
pool->stratum_q = tq_new();
if (!pool->stratum_q)
quit(1, "Failed to create stratum_q in stratum_sthread");
while (42) {
struct stratum_share *sshare;
uint32_t *hash32, nonce;
struct work *work;
bool submitted;
char *noncehex;
char s[1024];
if (unlikely(pool->removed))
break;
work = tq_pop(pool->stratum_q, NULL);
if (unlikely(!work))
quit(1, "Stratum q returned empty work");
sshare = calloc(sizeof(struct stratum_share), 1);
hash32 = (uint32_t *)work->hash;
submitted = false;
sshare->sshare_time = time(NULL);
/* This work item is freed in parse_stratum_response */
sshare->work = work;
nonce = *((uint32_t *)(work->data + 76));
noncehex = bin2hex((const unsigned char *)&nonce, 4);
memset(s, 0, 1024);
mutex_lock(&sshare_lock);
/* Give the stratum share a unique id */
sshare->id = swork_id++;
mutex_unlock(&sshare_lock);
sprintf(s, "{\"params\": [\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"], \"id\": %d, \"method\": \"mining.submit\"}",
pool->rpc_user, work->job_id, work->nonce2, work->ntime, noncehex, sshare->id);
free(noncehex);
applog(LOG_INFO, "Submitting share %08lx to pool %d",
(long unsigned int)htole32(hash32[6]), pool->pool_no);
/* Try resubmitting for up to 2 minutes if we fail to submit
* once and the stratum pool nonce1 still matches suggesting
* we may be able to resume. */
while (time(NULL) < sshare->sshare_time + 120) {
bool sessionid_match;
if (likely(stratum_send(pool, s, strlen(s)))) {
if (pool_tclear(pool, &pool->submit_fail))
applog(LOG_WARNING, "Pool %d communication resumed, submitting work", pool->pool_no);
mutex_lock(&sshare_lock);
HASH_ADD_INT(stratum_shares, id, sshare);
pool->sshares++;
mutex_unlock(&sshare_lock);
applog(LOG_DEBUG, "Successfully submitted, adding to stratum_shares db");
submitted = true;
break;
}
if (!pool_tset(pool, &pool->submit_fail) && cnx_needed(pool)) {
applog(LOG_WARNING, "Pool %d stratum share submission failure", pool->pool_no);
total_ro++;
pool->remotefail_occasions++;
}
if (opt_lowmem) {
applog(LOG_DEBUG, "Lowmem option prevents resubmitting stratum share");
break;
}
cg_rlock(&pool->data_lock);
sessionid_match = (pool->nonce1 && !strcmp(work->nonce1, pool->nonce1));
cg_runlock(&pool->data_lock);
if (!sessionid_match) {
applog(LOG_DEBUG, "No matching session id for resubmitting stratum share");
break;
}
/* Retry every 5 seconds */
sleep(5);
}
if (unlikely(!submitted)) {
applog(LOG_DEBUG, "Failed to submit stratum share, discarding");
free_work(work);
free(sshare);
pool->stale_shares++;
total_stale++;
}
}
/* Freeze the work queue but don't free up its memory in case there is
* work still trying to be submitted to the removed pool. */
tq_freeze(pool->stratum_q);
return NULL;
}
static void init_stratum_threads(struct pool *pool)
{
if (unlikely(pthread_create(&pool->stratum_sthread, NULL, stratum_sthread, (void *)pool)))
quit(1, "Failed to create stratum sthread");
if (unlikely(pthread_create(&pool->stratum_rthread, NULL, stratum_rthread, (void *)pool)))
quit(1, "Failed to create stratum rthread");
} }
static void *longpoll_thread(void *userdata); static void *longpoll_thread(void *userdata);
@ -5069,7 +5086,7 @@ retry_stratum:
bool ret = initiate_stratum(pool) && auth_stratum(pool); bool ret = initiate_stratum(pool) && auth_stratum(pool);
if (ret) if (ret)
init_stratum_thread(pool); init_stratum_threads(pool);
else else
pool_tclear(pool, &pool->stratum_init); pool_tclear(pool, &pool->stratum_init);
return ret; return ret;
@ -5476,13 +5493,45 @@ static struct work *get_work(struct thr_info *thr, const int thr_id)
void submit_work_async(struct work *work_in, struct timeval *tv_work_found) void submit_work_async(struct work *work_in, struct timeval *tv_work_found)
{ {
struct work *work = copy_work(work_in); struct work *work = copy_work(work_in);
struct pool *pool = work->pool;
pthread_t submit_thread; pthread_t submit_thread;
if (tv_work_found) if (tv_work_found)
copy_time(&work->tv_work_found, tv_work_found); copy_time(&work->tv_work_found, tv_work_found);
applog(LOG_DEBUG, "Pushing submit work to work thread");
if (unlikely(pthread_create(&submit_thread, NULL, submit_work_thread, (void *)work))) if (stale_work(work, true)) {
quit(1, "Failed to create submit_work_thread"); if (opt_submit_stale)
applog(LOG_NOTICE, "Pool %d stale share detected, submitting as user requested", pool->pool_no);
else if (pool->submit_old)
applog(LOG_NOTICE, "Pool %d stale share detected, submitting as pool requested", pool->pool_no);
else {
applog(LOG_NOTICE, "Pool %d stale share detected, discarding", pool->pool_no);
sharelog("discard", work);
mutex_lock(&stats_lock);
total_stale++;
pool->stale_shares++;
total_diff_stale += work->work_difficulty;
pool->diff_stale += work->work_difficulty;
mutex_unlock(&stats_lock);
free_work(work);
return;
}
work->stale = true;
}
if (work->stratum) {
applog(LOG_DEBUG, "Pushing pool %d work to stratum queue", pool->pool_no);
if (unlikely(!tq_push(pool->stratum_q, work))) {
applog(LOG_DEBUG, "Discarding work from removed pool");
free_work(work);
}
} else {
applog(LOG_DEBUG, "Pushing submit work to work thread");
if (unlikely(pthread_create(&submit_thread, NULL, submit_work_thread, (void *)work)))
quit(1, "Failed to create submit_work_thread");
}
} }
void inc_hw_errors(struct thr_info *thr) void inc_hw_errors(struct thr_info *thr)

4
miner.h

@ -1126,8 +1126,10 @@ struct pool {
bool stratum_init; bool stratum_init;
bool stratum_notify; bool stratum_notify;
struct stratum_work swork; struct stratum_work swork;
pthread_t stratum_thread; pthread_t stratum_sthread;
pthread_t stratum_rthread;
pthread_mutex_t stratum_lock; pthread_mutex_t stratum_lock;
struct thread_q *stratum_q;
int sshares; /* stratum shares submitted waiting on response */ int sshares; /* stratum shares submitted waiting on response */
/* GBT variables */ /* GBT variables */

Loading…
Cancel
Save