Browse Source

Implement the ability to live add, enable, disable, and switch to pools.

nfactor-troky
Con Kolivas 13 years ago
parent
commit
b0a8f279f7
  1. 275
      main.c
  2. 1
      miner.h
  3. 11
      util.c

275
main.c

@ -190,9 +190,10 @@ static unsigned int new_blocks;
static unsigned int local_work; static unsigned int local_work;
static unsigned int total_lo, total_ro; static unsigned int total_lo, total_ro;
static struct pool *pools = NULL; #define MAX_POOLS (32)
static struct pool *currentpool;
static int pool_no; static struct pool *pools[MAX_POOLS];
static struct pool *currentpool = NULL;
static int total_pools; static int total_pools;
static enum pool_strategy pool_strategy = POOL_FAILOVER; static enum pool_strategy pool_strategy = POOL_FAILOVER;
static int opt_rotate_period; static int opt_rotate_period;
@ -222,18 +223,15 @@ static void applog_and_exit(const char *fmt, ...)
static void add_pool(void) static void add_pool(void)
{ {
int poolno;
struct pool *pool; struct pool *pool;
poolno = total_pools++; pool = calloc(sizeof(struct pool), 1);
pools = realloc(pools, sizeof(struct pool) * total_pools); if (!pool) {
if (!pools) { applog(LOG_ERR, "Failed to malloc pool in add_pool");
applog(LOG_ERR, "Failed to malloc pools in add_pool");
exit (1); exit (1);
} }
pool = &pools[poolno]; pool->pool_no = total_pools;
memset(pool, 0, sizeof(struct pool)); pools[total_pools++] = pool;
pool->pool_no = poolno;
if (unlikely(pthread_mutex_init(&pool->pool_lock, NULL))) { if (unlikely(pthread_mutex_init(&pool->pool_lock, NULL))) {
applog(LOG_ERR, "Failed to pthread_mutex_init in add_pool"); applog(LOG_ERR, "Failed to pthread_mutex_init in add_pool");
exit (1); exit (1);
@ -367,7 +365,7 @@ static char *set_url(const char *arg, char **p)
total_urls++; total_urls++;
if (total_urls > total_pools) if (total_urls > total_pools)
add_pool(); add_pool();
pool = &pools[total_urls - 1]; pool = pools[total_urls - 1];
opt_set_charp(arg, &pool->rpc_url); opt_set_charp(arg, &pool->rpc_url);
if (strncmp(arg, "http://", 7) && if (strncmp(arg, "http://", 7) &&
@ -387,7 +385,7 @@ static char *set_user(const char *arg, char **p)
if (total_users > total_pools) if (total_users > total_pools)
add_pool(); add_pool();
pool = &pools[total_users - 1]; pool = pools[total_users - 1];
opt_set_charp(arg, &pool->rpc_user); opt_set_charp(arg, &pool->rpc_user);
return NULL; return NULL;
@ -403,7 +401,7 @@ static char *set_pass(const char *arg, char **p)
if (total_passes > total_pools) if (total_passes > total_pools)
add_pool(); add_pool();
pool = &pools[total_passes - 1]; pool = pools[total_passes - 1];
opt_set_charp(arg, &pool->rpc_pass); opt_set_charp(arg, &pool->rpc_pass);
return NULL; return NULL;
@ -419,7 +417,7 @@ static char *set_userpass(const char *arg, char **p)
if (total_userpasses > total_pools) if (total_userpasses > total_pools)
add_pool(); add_pool();
pool = &pools[total_userpasses - 1]; pool = pools[total_userpasses - 1];
opt_set_charp(arg, &pool->rpc_userpass); opt_set_charp(arg, &pool->rpc_userpass);
return NULL; return NULL;
@ -788,11 +786,13 @@ static void print_status(int thr_id)
void log_curses(const char *f, va_list ap) void log_curses(const char *f, va_list ap)
{ {
if (curses_active && !opt_loginput) { if (curses_active) {
pthread_mutex_lock(&curses_lock); if (!opt_loginput) {
vw_printw(logwin, f, ap); pthread_mutex_lock(&curses_lock);
wrefresh(logwin); vw_printw(logwin, f, ap);
pthread_mutex_unlock(&curses_lock); wrefresh(logwin);
pthread_mutex_unlock(&curses_lock);
}
} else } else
vprintf(f, ap); vprintf(f, ap);
} }
@ -917,9 +917,9 @@ static inline struct pool *select_pool(void)
rotating_pool++; rotating_pool++;
if (rotating_pool >= total_pools) if (rotating_pool >= total_pools)
rotating_pool = 0; rotating_pool = 0;
pool = &pools[rotating_pool]; pool = pools[rotating_pool];
if (!pool->idle && pool->enabled) if (!pool->idle && pool->enabled)
return &pools[rotating_pool]; return pools[rotating_pool];
} }
return current_pool(); return current_pool();
} }
@ -1202,32 +1202,57 @@ static int real_staged(void)
return ret; return ret;
} }
static void switch_pools(void) /* Find the pool that currently has the highest priority */
static struct pool *priority_pool(int choice)
{ {
struct pool *pool, *last_pool; struct pool *ret = NULL;
int i, pools_active = 0; int i;
for (i = 0; i < total_pools; i++) { for (i = 0; i < total_pools; i++) {
pool = &pools[i]; struct pool *pool = pools[i];
if (!pool->idle && pool->enabled) if (pool->prio == choice) {
pools_active++; ret = pool;
break;
}
} }
if (!pools_active) { if (unlikely(!ret)) {
applog(LOG_ERR, "No pools active, waiting..."); applog(LOG_ERR, "WTF No pool %d found!", choice);
goto out; return pools[choice];
} }
return ret;
}
static void switch_pools(struct pool *selected)
{
struct pool *pool, *last_pool;
int i, pool_no;
pthread_mutex_lock(&control_lock); pthread_mutex_lock(&control_lock);
last_pool = currentpool; last_pool = currentpool;
pool_no = currentpool->pool_no;
/* Switch selected to pool number 0 and move the rest down */
if (selected) {
if (selected->prio != 0) {
for (i = 0; i < total_pools; i++) {
pool = pools[i];
if (pool->prio < selected->prio)
pool->prio++;
}
selected->prio = 0;
}
}
switch (pool_strategy) { switch (pool_strategy) {
/* Both of these set to the master pool */ /* Both of these set to the master pool */
case POOL_FAILOVER: case POOL_FAILOVER:
case POOL_LOADBALANCE: case POOL_LOADBALANCE:
for (i = 0; i < total_pools; i++) { for (i = 0; i < total_pools; i++) {
if (!pools[i].idle && pools[i].enabled) { pool = priority_pool(i);
pool_no = i; if (!pool->idle && pool->enabled) {
pool_no = pool->pool_no;
break; break;
} }
} }
@ -1235,6 +1260,10 @@ static void switch_pools(void)
/* Both of these simply increment and cycle */ /* Both of these simply increment and cycle */
case POOL_ROUNDROBIN: case POOL_ROUNDROBIN:
case POOL_ROTATE: case POOL_ROTATE:
if (selected) {
pool_no = selected->pool_no;
break;
}
pool_no++; pool_no++;
if (pool_no >= total_pools) if (pool_no >= total_pools)
pool_no = 0; pool_no = 0;
@ -1242,7 +1271,8 @@ static void switch_pools(void)
default: default:
break; break;
} }
currentpool = &pools[pool_no];
currentpool = pools[pool_no];
pool = currentpool; pool = currentpool;
pthread_mutex_unlock(&control_lock); pthread_mutex_unlock(&control_lock);
@ -1253,7 +1283,7 @@ static void switch_pools(void)
pthread_mutex_lock(&qd_lock); pthread_mutex_lock(&qd_lock);
total_queued = 0; total_queued = 0;
pthread_mutex_unlock(&qd_lock); pthread_mutex_unlock(&qd_lock);
out:
inc_staged(pool, 1, true); inc_staged(pool, 1, true);
} }
@ -1330,27 +1360,95 @@ static void *stage_thread(void *userdata)
return NULL; return NULL;
} }
static char *curses_input(const char *query);
static int curses_int(const char *query)
{
int ret;
char *cvar;
cvar = curses_input(query);
ret = atoi(cvar);
free(cvar);
return ret;
}
static bool input_pool(bool live);
static void display_pools(void) static void display_pools(void)
{ {
int i, cp = current_pool()->pool_no; int i, active = 0;
struct pool *pool;
int selected;
char input;
opt_loginput = true; opt_loginput = true;
updated:
clear_logwin(); clear_logwin();
pthread_mutex_lock(&curses_lock); pthread_mutex_lock(&curses_lock);
for (i = 0; i < total_pools; i++) { for (i = 0; i < total_pools; i++) {
struct pool *pool = &pools[i]; pool = pools[i];
if (i == cp) if (pool == current_pool())
wattron(logwin, A_BOLD); wattron(logwin, A_BOLD);
if (!pool->enabled)
wattron(logwin, A_DIM);
wprintw(logwin, "%s Pool %d: %s User:%s\n", pool->enabled? "Enabled" : "Disabled", wprintw(logwin, "%s Pool %d: %s User:%s\n", pool->enabled? "Enabled" : "Disabled",
pool->pool_no, pool->rpc_url, pool->rpc_user); pool->pool_no, pool->rpc_url, pool->rpc_user);
wattroff(logwin, A_BOLD); wattroff(logwin, A_BOLD | A_DIM);
} }
//wprintw(logwin, "[A]dd pool [S]witch pool [D]isable pool [E]nable pool"); retry:
wprintw(logwin, "Press any key to continue\n"); wprintw(logwin, "\n[A]dd pool [S]witch pool [D]isable pool [E]nable pool\n");
wprintw(logwin, "Or press any other key to continue\n");
wrefresh(logwin); wrefresh(logwin);
pthread_mutex_unlock(&curses_lock); pthread_mutex_unlock(&curses_lock);
i = getch(); input = getch();
if (!strncasecmp(&input, "a", 1)) {
input_pool(true);
goto updated;
} else if (!strncasecmp(&input, "s", 1)) {
selected = curses_int("Select pool number");
if (selected < 0 || selected >= total_pools) {
wprintw(logwin, "Invalid selection");
goto retry;
}
pool = pools[selected];
pool->enabled = true;
switch_pools(pool);
goto updated;
} else if (!strncasecmp(&input, "d", 1)) {
for (i = 0; i < total_pools; i++) {
if ((pools[i])->enabled)
active++;
}
if (active <= 1) {
wprintw(logwin, "Cannot disable last pool");
goto retry;
}
selected = curses_int("Select pool number");
if (selected < 0 || selected >= total_pools) {
wprintw(logwin, "Invalid selection");
goto retry;
}
pool = pools[selected];
pool->enabled = false;
if (pool == current_pool())
switch_pools(NULL);
goto updated;
} else if (!strncasecmp(&input, "e", 1)) {
selected = curses_int("Select pool number");
if (selected < 0 || selected >= total_pools) {
wprintw(logwin, "Invalid selection");
goto retry;
}
pool = pools[selected];
pool->enabled = true;
if (pool->prio < current_pool()->prio)
switch_pools(pool);
goto updated;
}
opt_loginput = false; opt_loginput = false;
clear_logwin(); clear_logwin();
} }
@ -1552,6 +1650,7 @@ static bool pool_active(struct pool *pool)
return false; return false;
} }
applog(LOG_WARNING, "Testing pool %s", pool->rpc_url);
val = json_rpc_call(curl, pool->rpc_url, pool->rpc_userpass, rpc_req, val = json_rpc_call(curl, pool->rpc_url, pool->rpc_userpass, rpc_req,
true, false, pool); true, false, pool);
@ -1580,9 +1679,11 @@ static bool pool_active(struct pool *pool)
free(work); free(work);
} }
json_decref(val); json_decref(val);
} else } else {
applog(LOG_DEBUG, "FAILED to retrieve work from pool %u %s", applog(LOG_DEBUG, "FAILED to retrieve work from pool %u %s",
pool->pool_no, pool->rpc_url); pool->pool_no, pool->rpc_url);
applog(LOG_WARNING, "Pool down, URL or credentials invalid");
}
out: out:
curl_easy_cleanup(curl); curl_easy_cleanup(curl);
return ret; return ret;
@ -1592,14 +1693,14 @@ static void pool_died(struct pool *pool)
{ {
applog(LOG_WARNING, "Pool %d %s not responding!", pool->pool_no, pool->rpc_url); applog(LOG_WARNING, "Pool %d %s not responding!", pool->pool_no, pool->rpc_url);
gettimeofday(&pool->tv_idle, NULL); gettimeofday(&pool->tv_idle, NULL);
switch_pools(); switch_pools(NULL);
} }
static void pool_resus(struct pool *pool) static void pool_resus(struct pool *pool)
{ {
applog(LOG_WARNING, "Pool %d %s recovered", pool->pool_no, pool->rpc_url); applog(LOG_WARNING, "Pool %d %s recovered", pool->pool_no, pool->rpc_url);
if (pool->pool_no < pool_no && pool_strategy == POOL_FAILOVER) if (pool->prio < current_pool()->prio && pool_strategy == POOL_FAILOVER)
switch_pools(); switch_pools(NULL);
} }
static bool queue_request(void) static bool queue_request(void)
@ -2518,7 +2619,7 @@ static void *watchdog_thread(void *userdata)
gettimeofday(&now, NULL); gettimeofday(&now, NULL);
for (i = 0; i < total_pools; i++) { for (i = 0; i < total_pools; i++) {
struct pool *pool = &pools[i]; struct pool *pool = pools[i];
if (!pool->enabled) if (!pool->enabled)
continue; continue;
@ -2533,7 +2634,7 @@ static void *watchdog_thread(void *userdata)
if (pool_strategy == POOL_ROTATE && now.tv_sec - rotate_tv.tv_sec > 60 * opt_rotate_period) { if (pool_strategy == POOL_ROTATE && now.tv_sec - rotate_tv.tv_sec > 60 * opt_rotate_period) {
gettimeofday(&rotate_tv, NULL); gettimeofday(&rotate_tv, NULL);
switch_pools(); switch_pools(NULL);
} }
//for (i = 0; i < mining_threads; i++) { //for (i = 0; i < mining_threads; i++) {
@ -2600,7 +2701,7 @@ static void print_summary(void)
if (total_pools > 1) { if (total_pools > 1) {
for (i = 0; i < total_pools; i++) { for (i = 0; i < total_pools; i++) {
struct pool *pool = &pools[i]; struct pool *pool = pools[i];
printf("Pool: %s\n", pool->rpc_url); printf("Pool: %s\n", pool->rpc_url);
printf(" Queued work requests: %d\n", pool->getwork_requested); printf(" Queued work requests: %d\n", pool->getwork_requested);
@ -2647,6 +2748,7 @@ static char *curses_input(const char *query)
{ {
char *input; char *input;
echo();
input = malloc(255); input = malloc(255);
if (!input) if (!input)
quit(1, "Failed to malloc input"); quit(1, "Failed to malloc input");
@ -2655,44 +2757,43 @@ static char *curses_input(const char *query)
wrefresh(logwin); wrefresh(logwin);
wgetnstr(logwin, input, 255); wgetnstr(logwin, input, 255);
leaveok(logwin, true); leaveok(logwin, true);
noecho();
return input; return input;
} }
static bool input_pool(bool live) static bool input_pool(bool live)
{ {
char *url, *user, *pass; char *url, *user, *pass;
int poolno = total_pools;
struct pool *pool; struct pool *pool;
bool ret = false;
echo();
immedok(logwin, true); immedok(logwin, true);
if (total_pools == MAX_POOLS) {
wprintw(logwin, "Reached maximum number of pools.\n");
goto out;
}
wprintw(logwin, "Input server details.\n"); wprintw(logwin, "Input server details.\n");
url = curses_input("URL"); url = curses_input("URL");
if (strncmp(url, "http://", 7) && if (strncmp(url, "http://", 7) &&
strncmp(url, "https://", 8)) { strncmp(url, "https://", 8)) {
applog(LOG_ERR, "URL must start with http:// or https://"); applog(LOG_ERR, "URL must start with http:// or https://");
return false; goto out;
} }
user = curses_input("Username"); user = curses_input("Username");
if (!user) if (!user)
return false; goto out;
pass = curses_input("Password"); pass = curses_input("Password");
if (!pass) if (!pass)
return false; goto out;
wclear(logwin);
immedok(logwin, false);
noecho();
pools = realloc(pools, sizeof(struct pool) * (total_pools + 1)); pool = calloc(sizeof(struct pool), 1);
if (!pools) if (!pool)
quit(1, "Failed to malloc pools in input_pool"); quit(1, "Failed to realloc pools in input_pool");
pool = &pools[poolno]; pool->pool_no = total_pools;
memset(pool, 0, sizeof(struct pool)); pool->prio = total_pools;
pool->pool_no = poolno;
if (unlikely(pthread_mutex_init(&pool->pool_lock, NULL))) if (unlikely(pthread_mutex_init(&pool->pool_lock, NULL)))
quit (1, "Failed to pthread_mutex_init in input_pool"); quit (1, "Failed to pthread_mutex_init in input_pool");
pool->rpc_url = url; pool->rpc_url = url;
@ -2705,18 +2806,25 @@ static bool input_pool(bool live)
pool->tv_idle.tv_sec = ~0UL; pool->tv_idle.tv_sec = ~0UL;
if (!live) { /* Test the pool before we enable it if we're live running, otherwise
total_pools++; * it will be tested separately */
return true; ret = true;
} if (live && pool_active(pool))
/* Test the pool before we enable it if we're live running*/
if (pool_active(pool)) {
pool->enabled = true; pool->enabled = true;
total_pools++; pools[total_pools++] = pool;
return true; out:
immedok(logwin, false);
if (!ret) {
free(pool);
if (url)
free(url);
if (user)
free(user);
if (pass)
free(pass);
} }
return false; return ret;
} }
int main (int argc, char *argv[]) int main (int argc, char *argv[])
@ -2842,7 +2950,7 @@ int main (int argc, char *argv[])
} }
for (i = 0; i < total_pools; i++) { for (i = 0; i < total_pools; i++) {
struct pool *pool = &pools[i]; struct pool *pool = pools[i];
if (!pool->rpc_userpass) { if (!pool->rpc_userpass) {
if (!pool->rpc_user || !pool->rpc_pass) if (!pool->rpc_user || !pool->rpc_pass)
@ -2861,8 +2969,8 @@ int main (int argc, char *argv[])
quit(1, "Failed to find colon delimiter in userpass"); quit(1, "Failed to find colon delimiter in userpass");
} }
} }
/* Set the current pool to pool 0 */ /* Set the currentpool to pool 0 */
currentpool = pools; currentpool = pools[0];
#ifdef HAVE_SYSLOG_H #ifdef HAVE_SYSLOG_H
if (use_syslog) if (use_syslog)
@ -2943,13 +3051,18 @@ int main (int argc, char *argv[])
for (i = 0; i < total_pools; i++) { for (i = 0; i < total_pools; i++) {
struct pool *pool; struct pool *pool;
pool = &pools[i]; pool = pools[i];
if (pool_active(pool)) { if (pool_active(pool)) {
if (!currentpool)
currentpool = pool;
applog(LOG_INFO, "Pool %d %s active", pool->pool_no, pool->rpc_url); applog(LOG_INFO, "Pool %d %s active", pool->pool_no, pool->rpc_url);
pools_active++; pools_active++;
pool->enabled = true; pool->enabled = true;
} else } else {
if (pool == currentpool)
currentpool = NULL;
applog(LOG_WARNING, "Unable to get work from pool %d %s", pool->pool_no, pool->rpc_url); applog(LOG_WARNING, "Unable to get work from pool %d %s", pool->pool_no, pool->rpc_url);
}
} }
if (!pools_active) if (!pools_active)
@ -3069,8 +3182,6 @@ int main (int argc, char *argv[])
free(gpus); free(gpus);
if (opt_n_threads) if (opt_n_threads)
free(cpus); free(cpus);
if (pools)
free(pools);
curl_global_cleanup(); curl_global_cleanup();

1
miner.h

@ -267,6 +267,7 @@ typedef struct {
struct pool { struct pool {
int pool_no; int pool_no;
int prio;
int accepted, rejected; int accepted, rejected;
bool submit_fail; bool submit_fail;
bool idle; bool idle;

11
util.c

@ -285,8 +285,11 @@ json_t *json_rpc_call(CURL *curl, const char *url,
/* it is assumed that 'curl' is freshly [re]initialized at this pt */ /* it is assumed that 'curl' is freshly [re]initialized at this pt */
if (probe) if (probe) {
probing = ((want_longpoll && !have_longpoll) || !pool->probed); probing = ((want_longpoll && !have_longpoll) || !pool->probed);
/* Probe for only 10 seconds */
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10);
}
if (opt_protocol) if (opt_protocol)
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1); curl_easy_setopt(curl, CURLOPT_VERBOSE, 1);
@ -409,10 +412,8 @@ err_out:
databuf_free(&all_data); databuf_free(&all_data);
curl_slist_free_all(headers); curl_slist_free_all(headers);
curl_easy_reset(curl); curl_easy_reset(curl);
if (!successful_connect) { if (!successful_connect)
kill_work(); applog(LOG_DEBUG, "Failed to connect in json_rpc_call");
applog(LOG_ERR, "Failed to connect - wrong URL or login details?");
}
return NULL; return NULL;
} }

Loading…
Cancel
Save