Browse Source

Implement a scaleable networking framework designed to cope with any sized network requirements, yet minimise the number of connections being reoped.

Do this by create a ring buffer linked list of curl handles to be used by getwork, recruiting extra handles when none is immediately available.
nfactor-troky
Con Kolivas 13 years ago
parent
commit
7d288eac9f
  1. 111
      cgminer.c
  2. 7
      miner.h

111
cgminer.c

@ -379,7 +379,6 @@ static void sharelog(const char*disposition, const struct work*work)
} }
static void *submit_work_thread(void *userdata); static void *submit_work_thread(void *userdata);
static void *get_work_thread(void *userdata);
static void add_pool(void) static void add_pool(void)
{ {
@ -392,13 +391,13 @@ static void add_pool(void)
pools[total_pools++] = pool; pools[total_pools++] = pool;
if (unlikely(pthread_mutex_init(&pool->pool_lock, NULL))) if (unlikely(pthread_mutex_init(&pool->pool_lock, NULL)))
quit(1, "Failed to pthread_mutex_init in add_pool"); quit(1, "Failed to pthread_mutex_init in add_pool");
INIT_LIST_HEAD(&pool->curlring);
/* Make sure the pool doesn't think we've been idle since time 0 */ /* Make sure the pool doesn't think we've been idle since time 0 */
pool->tv_idle.tv_sec = ~0UL; pool->tv_idle.tv_sec = ~0UL;
if (unlikely(pthread_create(&pool->submit_thread, NULL, submit_work_thread, (void *)pool))) if (unlikely(pthread_create(&pool->submit_thread, NULL, submit_work_thread, (void *)pool)))
quit(1, "Failed to create pool submit thread"); quit(1, "Failed to create pool submit thread");
if (unlikely(pthread_create(&pool->getwork_thread, NULL, get_work_thread, (void *)pool)))
quit(1, "Failed to create pool getwork thread");
} }
/* Pool variant of test and set */ /* Pool variant of test and set */
@ -1957,77 +1956,46 @@ static void sighandler(int __maybe_unused sig)
kill_work(); kill_work();
} }
/* One get work thread is created per pool, so as to use one curl handle for /* Called with pool_lock held */
* all getwork reqeusts from the same pool, minimising connections opened, but static void recruit_curl(struct pool *pool)
* separate from the submit work curl handle to not delay share submissions due
* to getwork traffic */
static void *get_work_thread(void *userdata)
{ {
struct pool *pool = (struct pool *)userdata; struct curl_ent *ce = calloc(sizeof(struct curl_ent), 1);
struct workio_cmd *wc;
CURL *curl;
pthread_detach(pthread_self());
/* getwork_q memory never freed */ ce->curl = curl_easy_init();
pool->getwork_q = tq_new(); if (unlikely(!ce->curl || !ce))
if (!pool->getwork_q) quit(1, "Failed to init in recruit_curl");
quit(1, "Failed to tq_new in get_work_thread");
curl = curl_easy_init(); list_add(&ce->node, &pool->curlring);
if (unlikely(!curl)) applog(LOG_DEBUG, "Recruited new curl for pool %d", pool->pool_no);
quit(1, "Failed to initialise pool getwork CURL"); }
while ((wc = tq_pop(pool->getwork_q, NULL)) != NULL) {
struct work *ret_work;
int failures = 0;
ret_work = make_work();
if (wc->thr)
ret_work->thr = wc->thr;
else
ret_work->thr = NULL;
ret_work->pool = pool;
/* obtain new work from bitcoin via JSON-RPC */
while (!get_upstream_work(ret_work, curl)) {
if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
applog(LOG_ERR, "json_rpc_call failed, terminating workio thread");
free_work(ret_work);
kill_work();
break;
}
/* pause, then restart work-request loop */ static struct curl_ent *pop_curl_entry(struct pool *pool)
applog(LOG_DEBUG, "json_rpc_call failed on get work, retry after %d seconds", {
fail_pause); struct curl_ent *ce;
sleep(fail_pause);
fail_pause += opt_fail_pause;
}
fail_pause = opt_fail_pause;
applog(LOG_DEBUG, "Pushing work to requesting thread"); mutex_lock(&pool->pool_lock);
if (list_empty(&pool->curlring))
recruit_curl(pool);
ce = list_entry(pool->curlring.next, struct curl_ent, node);
list_del(&ce->node);
mutex_unlock(&pool->pool_lock);;
/* send work to requesting thread */ return ce;
if (unlikely(!tq_push(thr_info[stage_thr_id].q, ret_work))) { }
applog(LOG_ERR, "Failed to tq_push work in workio_get_work");
kill_work();
free_work(ret_work);
}
workio_cmd_free(wc);
}
curl_easy_cleanup(curl); static void push_curl_entry(struct curl_ent *ce, struct pool *pool)
return NULL; {
mutex_lock(&pool->pool_lock);
list_add(&ce->node, &pool->curlring);
mutex_unlock(&pool->pool_lock);
} }
static void *get_extra_work(void *userdata) static void *get_work_thread(void *userdata)
{ {
struct workio_cmd *wc = (struct workio_cmd *)userdata; struct workio_cmd *wc = (struct workio_cmd *)userdata;
struct work *ret_work = make_work();; struct work *ret_work = make_work();
CURL *curl = curl_easy_init(); struct curl_ent *ce;
struct pool *pool;
int failures = 0; int failures = 0;
pthread_detach(pthread_self()); pthread_detach(pthread_self());
@ -2039,10 +2007,11 @@ static void *get_extra_work(void *userdata)
else else
ret_work->thr = NULL; ret_work->thr = NULL;
ret_work->pool = select_pool(wc->lagging); pool = ret_work->pool = select_pool(wc->lagging);
ce = pop_curl_entry(pool);
/* obtain new work from bitcoin via JSON-RPC */ /* obtain new work from bitcoin via JSON-RPC */
while (!get_upstream_work(ret_work, curl)) { while (!get_upstream_work(ret_work, ce->curl)) {
if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) { if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
applog(LOG_ERR, "json_rpc_call failed, terminating workio thread"); applog(LOG_ERR, "json_rpc_call failed, terminating workio thread");
free_work(ret_work); free_work(ret_work);
@ -2069,7 +2038,7 @@ static void *get_extra_work(void *userdata)
out: out:
workio_cmd_free(wc); workio_cmd_free(wc);
curl_easy_cleanup(curl); push_curl_entry(ce, pool);
return NULL; return NULL;
} }
@ -2078,13 +2047,9 @@ out:
* requests */ * requests */
static bool workio_get_work(struct workio_cmd *wc) static bool workio_get_work(struct workio_cmd *wc)
{ {
struct pool *pool = select_pool(wc->lagging);
pthread_t get_thread; pthread_t get_thread;
if (list_empty(&pool->getwork_q->q) || pool->submit_fail) if (unlikely(pthread_create(&get_thread, NULL, get_work_thread, (void *)wc))) {
return tq_push(pool->getwork_q, wc);
if (unlikely(pthread_create(&get_thread, NULL, get_extra_work, (void *)wc))) {
applog(LOG_ERR, "Failed to create get_work_thread"); applog(LOG_ERR, "Failed to create get_work_thread");
return false; return false;
} }
@ -4450,6 +4415,8 @@ int add_pool_details(bool live, char *url, char *user, char *pass)
pool->prio = total_pools; pool->prio = total_pools;
if (unlikely(pthread_mutex_init(&pool->pool_lock, NULL))) if (unlikely(pthread_mutex_init(&pool->pool_lock, NULL)))
quit (1, "Failed to pthread_mutex_init in input_pool"); quit (1, "Failed to pthread_mutex_init in input_pool");
INIT_LIST_HEAD(&pool->curlring);
pool->rpc_url = url; pool->rpc_url = url;
pool->rpc_user = user; pool->rpc_user = user;
pool->rpc_pass = pass; pool->rpc_pass = pass;
@ -4462,8 +4429,6 @@ int add_pool_details(bool live, char *url, char *user, char *pass)
if (unlikely(pthread_create(&pool->submit_thread, NULL, submit_work_thread, (void *)pool))) if (unlikely(pthread_create(&pool->submit_thread, NULL, submit_work_thread, (void *)pool)))
quit(1, "Failed to create pool submit thread"); quit(1, "Failed to create pool submit thread");
if (unlikely(pthread_create(&pool->getwork_thread, NULL, get_work_thread, (void *)pool)))
quit(1, "Failed to create pool getwork thread");
/* Test the pool is not idle if we're live running, otherwise /* Test the pool is not idle if we're live running, otherwise
* it will be tested separately */ * it will be tested separately */

7
miner.h

@ -598,6 +598,11 @@ typedef struct {
} dev_blk_ctx; } dev_blk_ctx;
#endif #endif
struct curl_ent {
CURL *curl;
struct list_head node;
};
struct pool { struct pool {
int pool_no; int pool_no;
int prio; int prio;
@ -634,6 +639,8 @@ struct pool {
pthread_t longpoll_thread; pthread_t longpoll_thread;
pthread_t submit_thread; pthread_t submit_thread;
pthread_t getwork_thread; pthread_t getwork_thread;
struct list_head curlring;
}; };
struct work { struct work {

Loading…
Cancel
Save