
Set and get the current pool under lock.

Branch: nfactor-troky
Con Kolivas, 13 years ago
Commit ecda75946b
1 changed file: main.c (73 lines changed)
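In brief, the commit stops threads from dereferencing the shared `cp` pointer directly: reads and writes of the current pool now go through `current_pool()`/`set_current_pool()`, which take `control_lock`, and each work item is tagged with the pool it came from, so later stages (submission, staging, discard accounting) use the originating pool rather than whatever pool happens to be current. A minimal self-contained sketch of the accessor pattern, mirroring the diff below (the statically initialised mutex is an assumption for the sketch; in main.c `control_lock` is initialised elsewhere):

#include <pthread.h>

struct pool;                             /* opaque for this sketch */

static struct pool *currentpool;         /* shared, guarded by control_lock */
static pthread_mutex_t control_lock = PTHREAD_MUTEX_INITIALIZER; /* assumption */

/* Locked read: every caller gets a consistent snapshot of the pointer. */
static struct pool *current_pool(void)
{
	struct pool *pool;

	pthread_mutex_lock(&control_lock);
	pool = currentpool;
	pthread_mutex_unlock(&control_lock);
	return pool;
}

/* Locked write: the only place the current pool is switched. */
static void set_current_pool(struct pool *pool)
{
	pthread_mutex_lock(&control_lock);
	currentpool = pool;
	pthread_mutex_unlock(&control_lock);
}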

@@ -181,7 +181,7 @@ static unsigned int local_work;
 static unsigned int total_lo, total_ro;
 
 static struct pool *pools = NULL;
-static struct pool *cp; /* Current pool */
+static struct pool *currentpool;
 static int total_pools;
 
 static bool curses_active = false;
@@ -247,6 +247,23 @@ static bool pool_tclear(struct pool *pool, bool *var)
 	return ret;
 }
 
+static struct pool *current_pool(void)
+{
+	struct pool *pool;
+
+	pthread_mutex_lock(&control_lock);
+	pool = currentpool;
+	pthread_mutex_unlock(&control_lock);
+	return pool;
+}
+
+static void set_current_pool(struct pool *pool)
+{
+	pthread_mutex_lock(&control_lock);
+	currentpool = pool;
+	pthread_mutex_unlock(&control_lock);
+}
+
 /* FIXME: Use asprintf for better errors. */
 static char *set_algo(const char *arg, enum sha256_algos *algo)
 {
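A note on the getter (commentary, not part of the commit): it returns a snapshot, so a caller that reads the pointer once works against a single pool for the whole operation, even if another thread calls `set_current_pool()` concurrently. Hypothetical caller, with `fetch_from`/`report_to` invented for illustration:

struct pool;
static struct pool *current_pool(void);  /* accessor from the diff above */
static void fetch_from(struct pool *);   /* hypothetical helper */
static void report_to(struct pool *);    /* hypothetical helper */

static void one_operation(void)
{
	struct pool *pool = current_pool();  /* single locked read */

	fetch_from(pool);                    /* all later accesses use the same */
	report_to(pool);                     /* snapshot, never the raw global  */
}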
@@ -651,7 +668,7 @@ static void text_print_status(int thr_id)
 /* Must be called with curses mutex lock held and curses_active */
 static void curses_print_status(int thr_id)
 {
-	struct pool *pool = cp;
+	struct pool *pool = current_pool();
 
 	wmove(statuswin, 0, 0);
 	wattron(statuswin, A_BOLD);
@@ -732,7 +749,7 @@ static bool submit_upstream_work(const struct work *work)
 	int thr_id = work->thr_id;
 	struct cgpu_info *cgpu = thr_info[thr_id].cgpu;
 	CURL *curl = curl_easy_init();
-	struct pool *pool = cp;
+	struct pool *pool = work->pool;
 
 	if (unlikely(!curl)) {
 		applog(LOG_ERR, "CURL initialisation failed");
@@ -816,7 +833,7 @@ static const char *rpc_req =
 
 static bool get_upstream_work(struct work *work)
 {
-	struct pool *pool = cp;
+	struct pool *pool = current_pool();
 	json_t *val;
 	bool rc = false;
 	CURL *curl = curl_easy_init();
@@ -834,6 +851,7 @@ static bool get_upstream_work(struct work *work)
 	}
 
 	rc = work_decode(json_object_get(val, "result"), work);
+	work->pool = pool;
 
 	json_decref(val);
 out:
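The `work->pool = pool;` line above is the other half of the change: each fetched work item remembers its origin, and `submit_upstream_work()` (earlier hunk) now reads `work->pool` instead of the global, so shares are credited to the pool that issued the work even if the current pool has since changed. A reduced sketch of the tagging pattern (types and helpers are illustrative, not cgminer's):

struct pool;                            /* opaque here */
struct work { struct pool *pool; };     /* reduced: just the origin tag */

static void fetch_work(struct pool *from, struct work *w)
{
	/* ... fill *w with work obtained from 'from' ... */
	w->pool = from;                 /* tag the work with its origin */
}

static void submit_work(const struct work *w)
{
	struct pool *pool = w->pool;    /* not current_pool(): a failover may
	                                   have switched pools since the fetch */
	/* ... submit the share to 'pool' ... */
	(void)pool;
}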
@@ -997,12 +1015,13 @@ static bool stale_work(struct work *work)
 static void *submit_work_thread(void *userdata)
 {
 	struct workio_cmd *wc = (struct workio_cmd *)userdata;
-	struct pool *pool = cp;
+	struct work *work = wc->u.work;
+	struct pool *pool = work->pool;
 	int failures = 0;
 
 	pthread_detach(pthread_self());
 
-	if (stale_work(wc->u.work)) {
+	if (stale_work(work)) {
 		applog(LOG_WARNING, "Stale share detected, discarding");
 		total_stale++;
 		pool->stale_shares++;
@@ -1010,8 +1029,8 @@ static void *submit_work_thread(void *userdata)
 	}
 
 	/* submit solution to bitcoin via JSON-RPC */
-	while (!submit_upstream_work(wc->u.work)) {
-		if (stale_work(wc->u.work)) {
+	while (!submit_upstream_work(work)) {
+		if (stale_work(work)) {
 			applog(LOG_WARNING, "Stale share detected, discarding");
 			total_stale++;
 			pool->stale_shares++;
@@ -1044,10 +1063,8 @@ static bool workio_submit_work(struct workio_cmd *wc)
 	return true;
 }
 
-static void inc_staged(int inc, bool lp)
+static void inc_staged(struct pool *pool, int inc, bool lp)
 {
-	struct pool *pool = cp;
-
 	pthread_mutex_lock(&stgd_lock);
 	if (lp) {
 		lp_staged += inc;
@@ -1161,7 +1178,7 @@ static void *stage_thread(void *userdata)
 			ok = false;
 			break;
 		}
-		inc_staged(1, false);
+		inc_staged(work->pool, 1, false);
 	}
 
 	tq_freeze(mythr->q);
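`inc_staged()` now takes the pool as a parameter instead of reading the global inside the helper, so callers such as `stage_thread()` can pass the pool that actually produced the staged work (`work->pool`). The general shape of that change, with a hypothetical `staged` counter standing in for the real accounting done under `stgd_lock`:

struct pool { unsigned int staged; };    /* hypothetical counter */
static struct pool *current_pool(void);  /* accessor from the diff above */

/* Before (shape of the old code): the helper chose the pool itself,
 * which could differ from the pool whose work is being staged. */
static void inc_staged_old(int inc)
{
	struct pool *pool = current_pool();

	pool->staged += inc;
}

/* After: the caller decides which pool the accounting refers to. */
static void inc_staged_new(struct pool *pool, int inc)
{
	pool->staged += inc;
}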
@@ -1305,13 +1322,15 @@ static bool queue_request(void)
 {
 	int maxq = opt_queue + mining_threads;
 	struct workio_cmd *wc;
-	struct pool *pool = cp;
+	struct pool *pool;
 
 	/* If we've been generating lots of local work we may already have
 	 * enough in the queue */
 	if (requests_queued() >= maxq || real_staged() >= maxq)
 		return true;
 
+	pool = current_pool();
+
 	/* fill out work request message */
 	wc = calloc(1, sizeof(*wc));
 	if (unlikely(!wc)) {
@@ -1338,7 +1357,7 @@ static bool queue_request(void)
 static void discard_staged(void)
 {
 	struct work *work_heap;
-	struct pool *pool = cp;
+	struct pool *pool;
 
 	/* Just in case we fell in a hole and missed a queue filling */
 	if (unlikely(!requests_staged()))
@@ -1348,6 +1367,7 @@ static void discard_staged(void)
 	if (unlikely(!work_heap))
 		return;
 
+	pool = work_heap->pool;
 	free(work_heap);
 	dec_queued();
 	pool->discarded_work++;
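One ordering detail in `discard_staged()` worth flagging (commentary; the diff gets it right): the pool pointer must be copied out of the staged work item before the item is freed, otherwise `work_heap->pool` would be a use-after-free. Reduced illustration with hypothetical types:

#include <stdlib.h>

struct pool { unsigned int discarded_work; };
struct work { struct pool *pool; };

static void discard(struct work *work_heap)
{
	struct pool *pool = work_heap->pool; /* read while work_heap is live */

	free(work_heap);                     /* the work item is gone ...    */
	pool->discarded_work++;              /* ... but the pool outlives it */
}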
@@ -1356,6 +1376,7 @@ static void discard_staged(void)
 
 static void flush_requests(bool longpoll)
 {
+	struct pool *pool = current_pool();
 	int i, stale;
 
 	/* We should have one fresh work item staged from the block change. */
@@ -1365,7 +1386,7 @@ static void flush_requests(bool longpoll)
 
 	/* Temporarily increase the staged count so that get_work thinks there
 	 * is work available instead of making threads reuse existing work */
-	inc_staged(mining_threads, true);
+	inc_staged(pool, mining_threads, true);
 
 	for (i = 0; i < stale; i++) {
 		/* Queue a whole batch of new requests */
@@ -1382,7 +1403,7 @@ static void flush_requests(bool longpoll)
 
 static bool get_work(struct work *work, bool queued)
 {
-	struct pool *pool = cp;
+	struct pool *pool = current_pool();
 	struct work *work_heap;
 	bool ret = false;
 	int failures = 0;
@@ -1412,7 +1433,7 @@ retry:
 			/* A new block appears on average every 10 mins */
 			applog(LOG_WARNING, "Prolonged outage. Going idle till network recovers.");
 			/* Force every thread to wait for new work */
-			inc_staged(mining_threads, true);
+			inc_staged(pool, mining_threads, true);
 			goto retry;
 		}
 	}
@@ -1960,7 +1981,7 @@ static void *longpoll_thread(void *userdata)
 	char *copy_start, *hdr_path, *lp_url = NULL;
 	bool need_slash = false;
 	int failures = 0;
-	struct pool *pool = cp;
+	struct pool *pool = current_pool();
 
 	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
 
@@ -2047,6 +2068,7 @@ out:
 	return NULL;
 }
 
+#if 0
 static void reinit_cputhread(int thr_id)
 {
 	struct thr_info *thr = &thr_info[thr_id];
@@ -2114,12 +2136,13 @@ static void reinit_thread(int thr_id)
 	else
 		reinit_cputhread(thr_id);
 }
-#else
+#else /* HAVE_OPENCL */
 static void reinit_thread(int thr_id)
 {
 	reinit_cputhread(thr_id);
 }
 #endif
+#endif /* 0 */
 
 /* Determine which are the first threads belonging to a device and if they're
  * active */
@@ -2142,7 +2165,6 @@ static void *watchdog_thread(void *userdata)
 {
 	const unsigned int interval = opt_log_interval / 2 ? : 1;
 	struct timeval zero_tv;
-	struct pool *pool = cp;
 
 	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
 
@@ -2182,10 +2204,8 @@ static void *watchdog_thread(void *userdata)
 		gettimeofday(&now, NULL);
 
 #if 0
-		for (i = 0; i < mining_threads; i++) {
-#else
+		//for (i = 0; i < mining_threads; i++) {
 		for (i = 0; i < gpu_threads; i++) {
-#endif
 			struct thr_info *thr = &thr_info[i];
 
 			/* Do not kill threads waiting on longpoll staged work
@@ -2203,6 +2223,7 @@ static void *watchdog_thread(void *userdata)
 				applog(LOG_WARNING, "Thread %d restarted", i);
 			}
 		}
+#endif
 	}
 
 	return NULL;
@@ -2339,8 +2360,8 @@ int main (int argc, char *argv[])
 		return 1;
 	}
 
-	cp = &pools[0];
-	pool = cp;
+	set_current_pool(pools);
+	pool = current_pool();
 
 	if (total_devices) {
 		if (total_devices > nDevs) {
@@ -2466,7 +2487,7 @@ int main (int argc, char *argv[])
 
 	/* Flag the work as ready forcing the mining threads to wait till we
 	 * actually put something into the queue */
-	inc_staged(mining_threads, true);
+	inc_staged(current_pool(), mining_threads, true);
 
 	/* Create a unique get work queue */
 	getq = tq_new();
