Browse Source

Create one longpoll thread per pool, using backup pools for those pools that don't have longpoll.

Use the work created from a longpoll return from a non-current pool only if failover-only is not enabled, and only flag the work as longpoll work if it came from the current pool.
This works around the race conditions that were caused by trying to restart the single longpoll thread on pool changes.
It will also cause fewer work restarts from the multiple longpolls received from different pools.
nfactor-troky
Con Kolivas 13 years ago
parent
commit
eda382990a
  1. 117
      cgminer.c
  2. 6
      miner.h
  3. 2
      util.c

117
cgminer.c

@ -143,7 +143,6 @@ char *cgminer_path;
struct thr_info *thr_info; struct thr_info *thr_info;
static int work_thr_id; static int work_thr_id;
int longpoll_thr_id;
static int stage_thr_id; static int stage_thr_id;
static int watchpool_thr_id; static int watchpool_thr_id;
static int watchdog_thr_id; static int watchdog_thr_id;
@ -379,6 +378,7 @@ static void sharelog(const char*disposition, const struct work*work)
static void *submit_work_thread(void *userdata); static void *submit_work_thread(void *userdata);
static void *get_work_thread(void *userdata); static void *get_work_thread(void *userdata);
static void *longpoll_thread(void *userdata);
static void add_pool(void) static void add_pool(void)
{ {
@ -402,6 +402,8 @@ static void add_pool(void)
quit(1, "Failed to create pool submit thread"); quit(1, "Failed to create pool submit thread");
if (unlikely(pthread_create(&pool->getwork_thread, NULL, get_work_thread, (void *)pool))) if (unlikely(pthread_create(&pool->getwork_thread, NULL, get_work_thread, (void *)pool)))
quit(1, "Failed to create pool getwork thread"); quit(1, "Failed to create pool getwork thread");
if (unlikely(pthread_create(&pool->longpoll_thread, NULL, longpoll_thread, (void *)pool)))
quit(1, "Failed to create pool longpoll thread");
} }
/* Pool variant of test and set */ /* Pool variant of test and set */
@ -1935,11 +1937,6 @@ static void __kill_work(void)
thr = &thr_info[stage_thr_id]; thr = &thr_info[stage_thr_id];
thr_info_cancel(thr); thr_info_cancel(thr);
applog(LOG_DEBUG, "Killing off longpoll thread");
thr = &thr_info[longpoll_thr_id];
if (have_longpoll)
thr_info_cancel(thr);
applog(LOG_DEBUG, "Killing off API thread"); applog(LOG_DEBUG, "Killing off API thread");
thr = &thr_info[api_thr_id]; thr = &thr_info[api_thr_id];
thr_info_cancel(thr); thr_info_cancel(thr);
@ -1987,9 +1984,6 @@ static void sighandler(int __maybe_unused sig)
kill_work(); kill_work();
} }
static void start_longpoll(void);
static void stop_longpoll(void);
/* One get work thread is created per pool, so as to use one curl handle for /* One get work thread is created per pool, so as to use one curl handle for
* all getwork requests from the same pool, minimising connections opened, but * all getwork requests from the same pool, minimising connections opened, but
* separate from the submit work curl handle to not delay share submissions due * separate from the submit work curl handle to not delay share submissions due
@ -2015,12 +2009,6 @@ static void *get_work_thread(void *userdata)
struct work *ret_work; struct work *ret_work;
int failures = 0; int failures = 0;
if (unlikely(!pool->is_lp && pool == current_pool() &&
pool->hdr_path && !pool_tset(pool, &pool->lp_sent))) {
stop_longpoll();
start_longpoll();
}
ret_work = make_work(); ret_work = make_work();
if (wc->thr) if (wc->thr)
@ -2568,6 +2556,7 @@ void remove_pool(struct pool *pool)
} }
/* Give it an invalid number */ /* Give it an invalid number */
pool->pool_no = total_pools; pool->pool_no = total_pools;
pool->removed = true;
total_pools--; total_pools--;
} }
@ -3820,18 +3809,27 @@ enum {
/* Stage another work item from the work returned in a longpoll */ /* Stage another work item from the work returned in a longpoll */
static void convert_to_work(json_t *val, bool rolltime, struct pool *pool) static void convert_to_work(json_t *val, bool rolltime, struct pool *pool)
{ {
struct pool *cp = current_pool();
struct work *work, *work_clone; struct work *work, *work_clone;
bool rc; bool rc;
/* Don't use as work if we have failover-only enabled */
if (pool != cp && opt_fail_only)
return;
work = make_work(); work = make_work();
rc = work_decode(json_object_get(val, "result"), work); rc = work_decode(json_object_get(val, "result"), work);
if (unlikely(!rc)) { if (unlikely(!rc)) {
applog(LOG_ERR, "Could not convert longpoll data to work"); applog(LOG_ERR, "Could not convert longpoll data to work");
free_work(work);
return; return;
} }
work->pool = pool; work->pool = pool;
work->rolltime = rolltime; work->rolltime = rolltime;
/* Only flag this as longpoll work if the pool is the current pool */
if (pool == cp)
work->longpoll = true; work->longpoll = true;
/* We'll be checking this work item twice, but we already know it's /* We'll be checking this work item twice, but we already know it's
@ -3864,9 +3862,8 @@ static void convert_to_work(json_t *val, bool rolltime, struct pool *pool)
/* If we want longpoll, enable it for the chosen default pool, or, if /* If we want longpoll, enable it for the chosen default pool, or, if
* the pool does not support longpoll, find the first one that does * the pool does not support longpoll, find the first one that does
* and use its longpoll support */ * and use its longpoll support */
static struct pool *select_longpoll_pool(void) static struct pool *select_longpoll_pool(struct pool *cp)
{ {
struct pool *cp = current_pool();
int i; int i;
if (cp->hdr_path) if (cp->hdr_path)
@ -3882,37 +3879,37 @@ static struct pool *select_longpoll_pool(void)
static void *longpoll_thread(void *userdata) static void *longpoll_thread(void *userdata)
{ {
struct thr_info *mythr = userdata; struct pool *cp = (struct pool *)userdata;
/* This *pool is the source of the actual longpoll, not the pool we've
* tied it to */
struct pool *pool = NULL;
struct timeval start, end; struct timeval start, end;
struct pool *sp, *pool;
CURL *curl = NULL; CURL *curl = NULL;
int failures = 0; int failures = 0;
bool rolltime; bool rolltime;
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
curl = curl_easy_init(); curl = curl_easy_init();
if (unlikely(!curl)) { if (unlikely(!curl)) {
applog(LOG_ERR, "CURL initialisation failed"); applog(LOG_ERR, "CURL initialisation failed");
goto out; goto out;
} }
tq_pop(mythr->q, NULL); retry_pool:
pool = select_longpoll_pool(cp);
pool = select_longpoll_pool();
if (!pool) { if (!pool) {
applog(LOG_WARNING, "No long-poll found on any pool server"); applog(LOG_WARNING, "No suitable long-poll found for pool %s", cp->rpc_url);
while (!pool) { while (!pool) {
sleep(30); sleep(60);
pool = select_longpoll_pool(); pool = select_longpoll_pool(cp);
} }
} }
pool->is_lp = true; if (cp == pool)
have_longpoll = true;
applog(LOG_WARNING, "Long-polling activated for %s", pool->lp_url); applog(LOG_WARNING, "Long-polling activated for %s", pool->lp_url);
else
applog(LOG_WARNING, "Long-polling activated for pool %s via %s", cp->rpc_url, pool->lp_url);
while (1) { while (42) {
json_t *val, *soval; json_t *val, *soval;
gettimeofday(&start, NULL); gettimeofday(&start, NULL);
@ -3951,45 +3948,23 @@ static void *longpoll_thread(void *userdata)
goto out; goto out;
} }
} }
sp = select_longpoll_pool(); if (pool != cp) {
if (sp != pool) { pool = select_longpoll_pool(cp);
pool->is_lp = false; if (unlikely(!pool))
pool = sp; goto retry_pool;
pool->is_lp = true;
applog(LOG_WARNING, "Long-polling changed to %s", pool->lp_url);
} }
if (unlikely(pool->removed))
break;
} }
out: out:
if (curl) if (curl)
curl_easy_cleanup(curl); curl_easy_cleanup(curl);
tq_freeze(mythr->q);
return NULL; return NULL;
} }
static void stop_longpoll(void)
{
struct thr_info *thr = &thr_info[longpoll_thr_id];
thr_info_cancel(thr);
if (have_longpoll)
pthread_join(thr->pth, NULL);
have_longpoll = false;
tq_freeze(thr->q);
}
static void start_longpoll(void)
{
struct thr_info *thr = &thr_info[longpoll_thr_id];
tq_thaw(thr->q);
if (unlikely(thr_info_create(thr, NULL, longpoll_thread, thr)))
quit(1, "longpoll thread create failed");
applog(LOG_DEBUG, "Pushing ping to longpoll thread");
tq_push(thr_info[longpoll_thr_id].q, &ping);
}
void reinit_device(struct cgpu_info *cgpu) void reinit_device(struct cgpu_info *cgpu)
{ {
if (cgpu->api->reinit_device) if (cgpu->api->reinit_device)
@ -4872,7 +4847,7 @@ int main(int argc, char *argv[])
fork_monitor(); fork_monitor();
#endif // defined(unix) #endif // defined(unix)
total_threads = mining_threads + 8; total_threads = mining_threads + 7;
work_restart = calloc(total_threads, sizeof(*work_restart)); work_restart = calloc(total_threads, sizeof(*work_restart));
if (!work_restart) if (!work_restart)
quit(1, "Failed to calloc work_restart"); quit(1, "Failed to calloc work_restart");
@ -4893,15 +4868,7 @@ int main(int argc, char *argv[])
if (thr_info_create(thr, NULL, workio_thread, thr)) if (thr_info_create(thr, NULL, workio_thread, thr))
quit(1, "workio thread create failed"); quit(1, "workio thread create failed");
/* init longpoll thread info */ stage_thr_id = mining_threads + 1;
longpoll_thr_id = mining_threads + 1;
thr = &thr_info[longpoll_thr_id];
thr->id = longpoll_thr_id;
thr->q = tq_new();
if (!thr->q)
quit(1, "Failed to tq_new");
stage_thr_id = mining_threads + 2;
thr = &thr_info[stage_thr_id]; thr = &thr_info[stage_thr_id];
thr->q = tq_new(); thr->q = tq_new();
if (!thr->q) if (!thr->q)
@ -4970,8 +4937,6 @@ int main(int argc, char *argv[])
} }
} while (!pools_active); } while (!pools_active);
start_longpoll();
begin_bench: begin_bench:
total_mhashes_done = 0; total_mhashes_done = 0;
for (i = 0; i < total_devices; i++) { for (i = 0; i < total_devices; i++) {
@ -5039,14 +5004,14 @@ begin_bench:
gettimeofday(&total_tv_start, NULL); gettimeofday(&total_tv_start, NULL);
gettimeofday(&total_tv_end, NULL); gettimeofday(&total_tv_end, NULL);
watchpool_thr_id = mining_threads + 3; watchpool_thr_id = mining_threads + 2;
thr = &thr_info[watchpool_thr_id]; thr = &thr_info[watchpool_thr_id];
/* start watchpool thread */ /* start watchpool thread */
if (thr_info_create(thr, NULL, watchpool_thread, NULL)) if (thr_info_create(thr, NULL, watchpool_thread, NULL))
quit(1, "watchpool thread create failed"); quit(1, "watchpool thread create failed");
pthread_detach(thr->pth); pthread_detach(thr->pth);
watchdog_thr_id = mining_threads + 4; watchdog_thr_id = mining_threads + 3;
thr = &thr_info[watchdog_thr_id]; thr = &thr_info[watchdog_thr_id];
/* start watchdog thread */ /* start watchdog thread */
if (thr_info_create(thr, NULL, watchdog_thread, NULL)) if (thr_info_create(thr, NULL, watchdog_thread, NULL))
@ -5054,7 +5019,7 @@ begin_bench:
pthread_detach(thr->pth); pthread_detach(thr->pth);
/* Create reinit gpu thread */ /* Create reinit gpu thread */
gpur_thr_id = mining_threads + 5; gpur_thr_id = mining_threads + 4;
thr = &thr_info[gpur_thr_id]; thr = &thr_info[gpur_thr_id];
thr->q = tq_new(); thr->q = tq_new();
if (!thr->q) if (!thr->q)
@ -5063,7 +5028,7 @@ begin_bench:
quit(1, "reinit_gpu thread create failed"); quit(1, "reinit_gpu thread create failed");
/* Create API socket thread */ /* Create API socket thread */
api_thr_id = mining_threads + 6; api_thr_id = mining_threads + 5;
thr = &thr_info[api_thr_id]; thr = &thr_info[api_thr_id];
if (thr_info_create(thr, NULL, api_thread, thr)) if (thr_info_create(thr, NULL, api_thread, thr))
quit(1, "API thread create failed"); quit(1, "API thread create failed");
@ -5073,7 +5038,7 @@ begin_bench:
/* Create curses input thread for keyboard input. Create this last so /* Create curses input thread for keyboard input. Create this last so
* that we know all threads are created since this can call kill_work * that we know all threads are created since this can call kill_work
* to try and shut down all previous threads. */ * to try and shut down all previous threads. */
input_thr_id = mining_threads + 7; input_thr_id = mining_threads + 6;
thr = &thr_info[input_thr_id]; thr = &thr_info[input_thr_id];
if (thr_info_create(thr, NULL, input_thread, thr)) if (thr_info_create(thr, NULL, input_thread, thr))
quit(1, "input thread create failed"); quit(1, "input thread create failed");

6
miner.h

@ -542,7 +542,6 @@ extern int num_processors;
extern int hw_errors; extern int hw_errors;
extern bool use_syslog; extern bool use_syslog;
extern struct thr_info *thr_info; extern struct thr_info *thr_info;
extern int longpoll_thr_id;
extern struct work_restart *work_restart; extern struct work_restart *work_restart;
extern struct cgpu_info gpus[MAX_GPUDEVICES]; extern struct cgpu_info gpus[MAX_GPUDEVICES];
extern int gpu_threads; extern int gpu_threads;
@ -603,17 +602,17 @@ struct pool {
int pool_no; int pool_no;
int prio; int prio;
int accepted, rejected; int accepted, rejected;
bool submit_fail; bool submit_fail;
bool idle; bool idle;
bool lagging; bool lagging;
bool probed; bool probed;
bool enabled; bool enabled;
bool submit_old; bool submit_old;
bool removed;
char *hdr_path; char *hdr_path;
char *lp_url; char *lp_url;
bool lp_sent;
bool is_lp;
unsigned int getwork_requested; unsigned int getwork_requested;
unsigned int stale_shares; unsigned int stale_shares;
@ -631,6 +630,7 @@ struct pool {
struct thread_q *submit_q; struct thread_q *submit_q;
struct thread_q *getwork_q; struct thread_q *getwork_q;
pthread_t longpoll_thread;
pthread_t submit_thread; pthread_t submit_thread;
pthread_t getwork_thread; pthread_t getwork_thread;

2
util.c

@ -350,8 +350,6 @@ json_t *json_rpc_call(CURL *curl, const char *url,
} }
rc = curl_easy_perform(curl); rc = curl_easy_perform(curl);
if (longpoll)
pool_tclear(pool, &pool->lp_sent);
if (rc) { if (rc) {
applog(LOG_INFO, "HTTP request failed: %s", curl_err_str); applog(LOG_INFO, "HTTP request failed: %s", curl_err_str);
goto err_out; goto err_out;

Loading…
Cancel
Save