Browse Source

Implement proper flagging of idle pools, test them with the watchdog thread, and failover correctly.

nfactor-troky
Con Kolivas 13 years ago
parent
commit
6305146bea
  1. 172
      main.c
  2. 3
      miner.h

172
main.c

@ -235,9 +235,8 @@ static void add_pool(void)
applog(LOG_ERR, "Failed to pthread_mutex_init in add_pool"); applog(LOG_ERR, "Failed to pthread_mutex_init in add_pool");
exit (1); exit (1);
} }
/* Make sure the pool doesn't think we've been idle since time 0 if /* Make sure the pool doesn't think we've been idle since time 0 */
* we rush to !localgen */ pool->tv_idle.tv_sec = ~0UL;
pool->tv_localgen.tv_sec = ~0UL;
} }
/* Pool variant of test and set */ /* Pool variant of test and set */
@ -273,23 +272,6 @@ static struct pool *current_pool(void)
return pool; return pool;
} }
static void switch_pools(void)
{
pthread_mutex_lock(&control_lock);
pool_no++;
if (pool_no >= total_pools)
pool_no = 0;
currentpool = &pools[pool_no];
gettimeofday(&currentpool->tv_localgen, NULL);
applog(LOG_WARNING, "Prolonged outage. Attempting to switch to %s", currentpool->rpc_url);
pthread_mutex_unlock(&control_lock);
/* Reset the queued amount to allow more to be queued for the new pool */
pthread_mutex_lock(&qd_lock);
total_queued = 0;
pthread_mutex_unlock(&qd_lock);
}
/* FIXME: Use asprintf for better errors. */ /* FIXME: Use asprintf for better errors. */
static char *set_algo(const char *arg, enum sha256_algos *algo) static char *set_algo(const char *arg, enum sha256_algos *algo)
{ {
@ -843,11 +825,11 @@ static bool submit_upstream_work(const struct work *work)
if (!pool_tset(pool, &pool->submit_fail)) { if (!pool_tset(pool, &pool->submit_fail)) {
total_ro++; total_ro++;
pool->remotefail_occasions++; pool->remotefail_occasions++;
applog(LOG_WARNING, "Upstream communication failure, caching submissions"); applog(LOG_WARNING, "Pool %d communication failure, caching submissions", pool->pool_no);
} }
goto out; goto out;
} else if (pool_tclear(pool, &pool->submit_fail)) } else if (pool_tclear(pool, &pool->submit_fail))
applog(LOG_WARNING, "Upstream communication resumed, submitting work"); applog(LOG_WARNING, "Pool %d communication resumed, submitting work", pool->pool_no);
res = json_object_get(val, "result"); res = json_object_get(val, "result");
@ -1144,11 +1126,9 @@ static void inc_staged(struct pool *pool, int inc, bool lp)
if (lp) { if (lp) {
lp_staged += inc; lp_staged += inc;
total_staged += inc; total_staged += inc;
pool->idle = true; } else if (lp_staged)
} else if (lp_staged) { --lp_staged;
if (!--lp_staged) else
pool->idle = false;
} else
total_staged += inc; total_staged += inc;
pthread_mutex_unlock(&stgd_lock); pthread_mutex_unlock(&stgd_lock);
} }
@ -1180,6 +1160,61 @@ static int real_staged(void)
return ret; return ret;
} }
static void switch_pools(void)
{
struct pool *pool, *last_pool;
int i, pools_active = 0;
for (i = 0; i < total_pools; i++) {
pool = &pools[i];
if (!pool->idle)
pools_active++;
}
if (!pools_active) {
applog(LOG_ERR, "No pools active, waiting...");
goto out;
}
pthread_mutex_lock(&control_lock);
last_pool = currentpool;
switch (pool_strategy) {
/* Both of these set to the master pool */
case POOL_FAILOVER:
case POOL_LOADBALANCE:
for (i = 0; i < total_pools; i++) {
if (!pools[i].idle) {
pool_no = i;
break;
}
}
break;
/* Both of these simply increment and cycle */
case POOL_ROUNDROBIN:
case POOL_ROTATE:
pool_no++;
if (pool_no >= total_pools)
pool_no = 0;
break;
default:
break;
}
currentpool = &pools[pool_no];
pool = currentpool;
pthread_mutex_unlock(&control_lock);
if (pool != last_pool)
applog(LOG_WARNING, "Switching to %s", pool->rpc_url);
/* Reset the queued amount to allow more to be queued for the new pool */
pthread_mutex_lock(&qd_lock);
total_queued = 0;
pthread_mutex_unlock(&qd_lock);
out:
inc_staged(pool, 1, true);
}
static void set_curblock(char *hexstr) static void set_curblock(char *hexstr)
{ {
struct timeval tv_now; struct timeval tv_now;
@ -1430,6 +1465,7 @@ static bool pool_active(struct pool *pool)
pool->getwork_requested++; pool->getwork_requested++;
inc_queued(); inc_queued();
ret = true; ret = true;
gettimeofday(&pool->tv_idle, NULL);
} else { } else {
applog(LOG_DEBUG, "Successfully retreived but FAILED to decipher work from pool %u %s", applog(LOG_DEBUG, "Successfully retreived but FAILED to decipher work from pool %u %s",
pool->pool_no, pool->rpc_url); pool->pool_no, pool->rpc_url);
@ -1444,6 +1480,20 @@ out:
return ret; return ret;
} }
static void pool_died(struct pool *pool)
{
applog(LOG_WARNING, "Pool %d %s not responding!", pool->pool_no, pool->rpc_url);
gettimeofday(&pool->tv_idle, NULL);
switch_pools();
}
static void pool_resus(struct pool *pool)
{
applog(LOG_WARNING, "Pool %d %s recovered", pool->pool_no, pool->rpc_url);
if (pool->pool_no < pool_no && pool_strategy == POOL_FAILOVER)
switch_pools();
}
static bool queue_request(void) static bool queue_request(void)
{ {
int maxq = opt_queue + mining_threads; int maxq = opt_queue + mining_threads;
@ -1552,29 +1602,20 @@ retry:
uint32_t ntime; uint32_t ntime;
/* Only print this message once each time we shift to localgen */ /* Only print this message once each time we shift to localgen */
if (!pool_tset(pool, &pool->localgen)) { if (!pool_tset(pool, &pool->idle)) {
applog(LOG_WARNING, "Server not providing work fast enough, generating work locally"); applog(LOG_WARNING, "Server not providing work fast enough, generating work locally");
pool->localgen_occasions++; pool->localgen_occasions++;
total_lo++; total_lo++;
gettimeofday(&pool->tv_localgen, NULL); gettimeofday(&pool->tv_idle, NULL);
} else { } else {
struct timeval tv_now, diff; struct timeval tv_now, diff;
gettimeofday(&tv_now, NULL); gettimeofday(&tv_now, NULL);
timeval_subtract(&diff, &tv_now, &pool->tv_localgen); timeval_subtract(&diff, &tv_now, &pool->tv_idle);
if (total_pools > 1) { /* Attempt to switch pools if this one has been unresponsive for >half
/* Attempt to switch pools if this one has been unresponsive for >half * a block's duration */
* a block's duration */ if (diff.tv_sec > 300) {
if (diff.tv_sec > 300) { pool_died(pool);
switch_pools();
inc_staged(pool, 1, true);
goto retry;
}
} else if (diff.tv_sec > 600) {
/* A new block appears on average every 10 mins */
applog(LOG_WARNING, "Prolonged outage. Going idle till network recovers.");
/* Force every thread to wait for new work */
inc_staged(pool, 1, true);
goto retry; goto retry;
} }
} }
@ -1594,23 +1635,18 @@ retry:
/* wait for 1st response, or get cached response */ /* wait for 1st response, or get cached response */
work_heap = tq_pop(getq, &abstime); work_heap = tq_pop(getq, &abstime);
if (unlikely(!work_heap)) { if (unlikely(!work_heap)) {
if (total_pools > 1) { /* Attempt to switch pools if this one has mandatory work that
/* Attempt to switch pools if this one has mandatory * has timed out or does not support rolltime */
* work that has timed out or does not support rolltime */ pool->localgen_occasions++;
pool->localgen_occasions++; total_lo++;
total_lo++; pool_died(pool);
switch_pools();
inc_staged(pool, 1, true);
goto retry;
}
if (!pool_tset(pool, &pool->localgen))
applog(LOG_WARNING, "Timed out waiting for work from server");
goto retry; goto retry;
} }
pool = work_heap->pool;
/* If we make it here we have succeeded in getting fresh work */ /* If we make it here we have succeeded in getting fresh work */
if (pool_tclear(pool, &pool->localgen)) if (pool_tclear(pool, &pool->idle))
applog(LOG_WARNING, "Resuming with work from server"); pool_resus(pool);
dec_queued(); dec_queued();
memcpy(work, work_heap, sizeof(*work)); memcpy(work, work_heap, sizeof(*work));
@ -2366,6 +2402,18 @@ static void *watchdog_thread(void *userdata)
} }
gettimeofday(&now, NULL); gettimeofday(&now, NULL);
for (i = 0; i < total_pools; i++) {
struct pool *pool = &pools[i];
/* Test pool is idle once every minute */
if (pool->idle && now.tv_sec - pool->tv_idle.tv_sec > 60) {
gettimeofday(&pool->tv_idle, NULL);
if (pool_active(pool) && pool_tclear(pool, &pool->idle))
pool_resus(pool);
}
}
//for (i = 0; i < mining_threads; i++) { //for (i = 0; i < mining_threads; i++) {
for (i = 0; i < gpu_threads; i++) { for (i = 0; i < gpu_threads; i++) {
struct thr_info *thr = &thr_info[i]; struct thr_info *thr = &thr_info[i];
@ -2459,7 +2507,7 @@ static void print_summary(void)
int main (int argc, char *argv[]) int main (int argc, char *argv[])
{ {
unsigned int i, j = 0, x, y; unsigned int i, j = 0, x, y, pools_active = 0;
struct sigaction handler; struct sigaction handler;
struct thr_info *thr; struct thr_info *thr;
char name[256]; char name[256];
@ -2688,14 +2736,20 @@ int main (int argc, char *argv[])
struct pool *pool; struct pool *pool;
pool = &pools[i]; pool = &pools[i];
if (pool_active(pool)) if (pool_active(pool)) {
applog(LOG_INFO, "Pool %d %s active", pool->pool_no, pool->rpc_url); applog(LOG_INFO, "Pool %d %s active", pool->pool_no, pool->rpc_url);
else { pools_active++;
} else {
applog(LOG_WARNING, "Unable to get work from pool %d %s", pool->pool_no, pool->rpc_url); applog(LOG_WARNING, "Unable to get work from pool %d %s", pool->pool_no, pool->rpc_url);
pool->idle = true; pool->idle = true;
} }
} }
if (!pools_active) {
applog(LOG_ERR, "No pools active! Exiting.");
return 0;
}
#ifdef HAVE_OPENCL #ifdef HAVE_OPENCL
i = 0; i = 0;

3
miner.h

@ -269,7 +269,6 @@ struct pool {
int pool_no; int pool_no;
int accepted, rejected; int accepted, rejected;
bool submit_fail; bool submit_fail;
bool localgen;
bool idle; bool idle;
bool has_rolltime; bool has_rolltime;
bool probed; bool probed;
@ -278,7 +277,7 @@ struct pool {
unsigned int discarded_work; unsigned int discarded_work;
unsigned int localgen_occasions; unsigned int localgen_occasions;
unsigned int remotefail_occasions; unsigned int remotefail_occasions;
struct timeval tv_localgen; struct timeval tv_idle;
char *rpc_url; char *rpc_url;
char *rpc_userpass; char *rpc_userpass;

Loading…
Cancel
Save