Browse Source

Use two separate curl instances for submit and get and use separate threads for each to prevent one blocking the other.

nfactor-troky
Con Kolivas 14 years ago
parent
commit
88d9d631e3
  1. 140
      cpu-miner.c

140
cpu-miner.c

@ -74,6 +74,7 @@ static inline void affine_to_cpu(int id, int cpu)
enum workio_commands { enum workio_commands {
WC_GET_WORK, WC_GET_WORK,
WC_SUBMIT_WORK, WC_SUBMIT_WORK,
WC_DIE,
}; };
struct workio_cmd { struct workio_cmd {
@ -426,15 +427,48 @@ static void workio_cmd_free(struct workio_cmd *wc)
free(wc); free(wc);
} }
static bool workio_get_work(struct workio_cmd *wc, CURL *curl) static void kill_work(void)
{
struct workio_cmd *wc;
applog(LOG_INFO, "Received kill message");
wc = calloc(1, sizeof(*wc));
if (unlikely(!wc)) {
applog(LOG_ERR, "Failed to calloc wc in kill_work");
/* We're just trying to die anyway, so forget graceful */
exit (1);
}
wc->cmd = WC_DIE;
wc->thr = 0;
if (unlikely(!tq_push(thr_info[work_thr_id].q, wc))) {
applog(LOG_ERR, "Failed to tq_push work in kill_work");
exit (1);
}
}
struct io_data{
struct workio_cmd *wc;
CURL *curl;
};
static pthread_t *get_thread = NULL;
static pthread_t *submit_thread = NULL;
static void *get_work_thread(void *userdata)
{ {
struct io_data *io_data = (struct io_data *)userdata;
struct workio_cmd *wc = io_data->wc;
CURL *curl = io_data->curl;
struct work *ret_work; struct work *ret_work;
int failures = 0; int failures = 0;
ret_work = calloc(1, sizeof(*ret_work)); ret_work = calloc(1, sizeof(*ret_work));
if (!ret_work) { if (!ret_work) {
applog(LOG_ERR, "Failed to calloc ret_work in workio_get_work"); applog(LOG_ERR, "Failed to calloc ret_work in workio_get_work");
return false; kill_work();
goto out;
} }
/* obtain new work from bitcoin via JSON-RPC */ /* obtain new work from bitcoin via JSON-RPC */
@ -442,11 +476,12 @@ static bool workio_get_work(struct workio_cmd *wc, CURL *curl)
if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) { if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
applog(LOG_ERR, "json_rpc_call failed, terminating workio thread"); applog(LOG_ERR, "json_rpc_call failed, terminating workio thread");
free(ret_work); free(ret_work);
return false; kill_work();
goto out;
} }
/* pause, then restart work-request loop */ /* pause, then restart work-request loop */
applog(LOG_ERR, "json_rpc_call failed, retry after %d seconds", applog(LOG_ERR, "json_rpc_call failed on get work, retry after %d seconds",
opt_fail_pause); opt_fail_pause);
sleep(opt_fail_pause); sleep(opt_fail_pause);
} }
@ -454,29 +489,96 @@ static bool workio_get_work(struct workio_cmd *wc, CURL *curl)
/* send work to requesting thread */ /* send work to requesting thread */
if (unlikely(!tq_push(wc->thr->q, ret_work))) { if (unlikely(!tq_push(wc->thr->q, ret_work))) {
applog(LOG_ERR, "Failed to tq_push work in workio_get_work"); applog(LOG_ERR, "Failed to tq_push work in workio_get_work");
kill_work();
free(ret_work); free(ret_work);
} }
out:
free(io_data);
workio_cmd_free(wc);
return NULL;
}
static bool workio_get_work(struct workio_cmd *wc, CURL *curl)
{
struct io_data *id = malloc(sizeof(struct io_data));
if (unlikely(!id)) {
applog(LOG_ERR, "Failed to malloc id in workio_get_work");
return false;
}
id->wc = wc;
id->curl = curl;
if (!get_thread) {
get_thread = malloc(sizeof(get_thread));
if (unlikely(!get_thread)) {
applog(LOG_ERR, "Failed to malloc get_thread in workio_get_work");
return false;
}
} else
pthread_join(*get_thread, NULL);
if (pthread_create(get_thread, NULL, get_work_thread, (void *)id)) {
applog(LOG_ERR, "Failed to create get_work_thread");
free(id);
return false;
}
return true; return true;
} }
static bool workio_submit_work(struct workio_cmd *wc, CURL *curl) static void *submit_work_thread(void *userdata)
{ {
struct io_data *io_data = (struct io_data *)userdata;
struct workio_cmd *wc = io_data->wc;
CURL *curl = io_data->curl;
int failures = 0; int failures = 0;
/* submit solution to bitcoin via JSON-RPC */ /* submit solution to bitcoin via JSON-RPC */
while (!submit_upstream_work(curl, wc->u.work)) { while (!submit_upstream_work(curl, wc->u.work)) {
if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) { if (unlikely((opt_retries >= 0) && (++failures > opt_retries))) {
applog(LOG_ERR, "Failed %d retries ...terminating workio thread", opt_retries); applog(LOG_ERR, "Failed %d retries ...terminating workio thread", opt_retries);
return false; kill_work();
goto out;
} }
/* pause, then restart work-request loop */ /* pause, then restart work-request loop */
applog(LOG_ERR, "...retry after %d seconds", applog(LOG_ERR, "json_rpc_call failed on submit_work, retry after %d seconds",
opt_fail_pause); opt_fail_pause);
sleep(opt_fail_pause); sleep(opt_fail_pause);
} }
out:
workio_cmd_free(wc);
free(io_data);
return NULL;
}
static bool workio_submit_work(struct workio_cmd *wc, CURL *curl)
{
struct io_data *id = malloc(sizeof(struct io_data));
if (unlikely(!id)) {
applog(LOG_ERR, "Failed to malloc id in workio_submit_work");
return false;
}
id->wc = wc;
id->curl = curl;
if (!submit_thread) {
submit_thread = malloc(sizeof(submit_thread));
if (unlikely(!submit_thread)) {
applog(LOG_ERR, "Failed to malloc submit_thread in workio_submit_work");
return false;
}
} else
pthread_join(*submit_thread, NULL);
if (pthread_create(submit_thread, NULL, submit_work_thread, (void *)id)) {
applog(LOG_ERR, "Failed to create submit_work_thread");
free(id);
return false;
}
return true; return true;
} }
@ -484,10 +586,11 @@ static void *workio_thread(void *userdata)
{ {
struct thr_info *mythr = userdata; struct thr_info *mythr = userdata;
bool ok = true; bool ok = true;
CURL *curl; CURL *get_curl, *submit_curl;
curl = curl_easy_init(); get_curl = curl_easy_init();
if (unlikely(!curl)) { submit_curl = curl_easy_init();
if (unlikely(!get_curl || !submit_curl)) {
applog(LOG_ERR, "CURL initialization failed"); applog(LOG_ERR, "CURL initialization failed");
return NULL; return NULL;
} }
@ -497,7 +600,7 @@ static void *workio_thread(void *userdata)
/* wait for workio_cmd sent to us, on our queue */ /* wait for workio_cmd sent to us, on our queue */
wc = tq_pop(mythr->q, NULL); wc = tq_pop(mythr->q, NULL);
if (!wc) { if (unlikely(!wc)) {
ok = false; ok = false;
break; break;
} }
@ -505,22 +608,21 @@ static void *workio_thread(void *userdata)
/* process workio_cmd */ /* process workio_cmd */
switch (wc->cmd) { switch (wc->cmd) {
case WC_GET_WORK: case WC_GET_WORK:
ok = workio_get_work(wc, curl); ok = workio_get_work(wc, get_curl);
break; break;
case WC_SUBMIT_WORK: case WC_SUBMIT_WORK:
ok = workio_submit_work(wc, curl); ok = workio_submit_work(wc, submit_curl);
break; break;
case WC_DIE:
default: /* should never happen */ default:
ok = false; ok = false;
break; break;
} }
workio_cmd_free(wc);
} }
tq_freeze(mythr->q); tq_freeze(mythr->q);
curl_easy_cleanup(curl); curl_easy_cleanup(submit_curl);
curl_easy_cleanup(get_curl);
return NULL; return NULL;
} }
@ -1446,8 +1548,6 @@ int main (int argc, char *argv[])
return 1; return 1;
} }
pthread_detach(thr->pth); pthread_detach(thr->pth);
sleep(1); /* don't pound RPC server all at once */
} }
applog(LOG_INFO, "%d cpu miner threads started, " applog(LOG_INFO, "%d cpu miner threads started, "

Loading…
Cancel
Save