@ -378,6 +378,9 @@ sharelog(const char*disposition, const struct work*work) {
applog ( LOG_ERR , " sharelog fwrite error " ) ;
applog ( LOG_ERR , " sharelog fwrite error " ) ;
}
}
static void * submit_work_thread ( void * userdata ) ;
static void * get_work_thread ( void * userdata ) ;
static void add_pool ( void )
static void add_pool ( void )
{
{
struct pool * pool ;
struct pool * pool ;
@ -395,6 +398,11 @@ static void add_pool(void)
}
}
/* Make sure the pool doesn't think we've been idle since time 0 */
/* Make sure the pool doesn't think we've been idle since time 0 */
pool - > tv_idle . tv_sec = ~ 0UL ;
pool - > tv_idle . tv_sec = ~ 0UL ;
if ( unlikely ( pthread_create ( & pool - > submit_thread , NULL , submit_work_thread , ( void * ) pool ) ) )
quit ( 1 , " Failed to create pool submit thread " ) ;
if ( unlikely ( pthread_create ( & pool - > getwork_thread , NULL , get_work_thread , ( void * ) pool ) ) )
quit ( 1 , " Failed to create pool getwork thread " ) ;
}
}
/* Pool variant of test and set */
/* Pool variant of test and set */
@ -1600,18 +1608,13 @@ static bool submit_upstream_work(const struct work *work)
bool rc = false ;
bool rc = false ;
int thr_id = work - > thr_id ;
int thr_id = work - > thr_id ;
struct cgpu_info * cgpu = thr_info [ thr_id ] . cgpu ;
struct cgpu_info * cgpu = thr_info [ thr_id ] . cgpu ;
CURL * curl = curl_easy_init ( ) ;
struct pool * pool = work - > pool ;
struct pool * pool = work - > pool ;
CURL * curl = pool - > submit_curl ;
bool rolltime ;
bool rolltime ;
uint32_t * hash32 ;
uint32_t * hash32 ;
char hashshow [ 64 + 1 ] = " " ;
char hashshow [ 64 + 1 ] = " " ;
bool isblock ;
bool isblock ;
if ( unlikely ( ! curl ) ) {
applog ( LOG_ERR , " CURL initialisation failed " ) ;
return rc ;
}
# ifdef __BIG_ENDIAN__
# ifdef __BIG_ENDIAN__
int swapcounter = 0 ;
int swapcounter = 0 ;
for ( swapcounter = 0 ; swapcounter < 32 ; swapcounter + + )
for ( swapcounter = 0 ; swapcounter < 32 ; swapcounter + + )
@ -1745,7 +1748,6 @@ static bool submit_upstream_work(const struct work *work)
out :
out :
free ( hexstr ) ;
free ( hexstr ) ;
out_nofree :
out_nofree :
curl_easy_cleanup ( curl ) ;
return rc ;
return rc ;
}
}
@ -1789,41 +1791,26 @@ static void get_benchmark_work(struct work *work)
memcpy ( work , & bench_block , min_size ) ;
memcpy ( work , & bench_block , min_size ) ;
}
}
static bool get_upstream_work ( struct work * work , bool lagging )
static bool get_upstream_work ( struct work * work )
{
{
bool rc = false , req_longpoll = false ;
struct pool * pool = work - > pool ;
struct pool * poo l;
CURL * curl = pool - > getwork_cur l;
json_t * val = NULL ;
json_t * val = NULL ;
bool rc = false ;
int retries = 0 ;
int retries = 0 ;
CURL * curl ;
char * url ;
char * url ;
curl = curl_easy_init ( ) ;
if ( unlikely ( ! curl ) ) {
applog ( LOG_ERR , " CURL initialisation failed " ) ;
return rc ;
}
pool = select_pool ( lagging ) ;
applog ( LOG_DEBUG , " DBG: sending %s get RPC call: %s " , pool - > rpc_url , rpc_req ) ;
applog ( LOG_DEBUG , " DBG: sending %s get RPC call: %s " , pool - > rpc_url , rpc_req ) ;
url = pool - > rpc_url ;
url = pool - > rpc_url ;
/* If this is the current pool and supports longpoll but has not sent
* a longpoll , send one now */
if ( unlikely ( want_longpoll & & ! pool - > is_lp & & pool = = current_pool ( ) & &
pool - > hdr_path & & ! pool_tset ( pool , & pool - > lp_sent ) ) ) {
req_longpoll = true ;
url = pool - > lp_url ;
}
retry :
retry :
/* A single failure response here might be reported as a dead pool and
/* A single failure response here might be reported as a dead pool and
* there may be temporary denied messages etc . falsely reporting
* there may be temporary denied messages etc . falsely reporting
* failure so retry a few times before giving up */
* failure so retry a few times before giving up */
while ( ! val & & retries + + < 3 ) {
while ( ! val & & retries + + < 3 ) {
val = json_rpc_call ( curl , url , pool - > rpc_userpass , rpc_req ,
val = json_rpc_call ( curl , url , pool - > rpc_userpass , rpc_req ,
false , req_longpoll , & work - > rolltime , pool , false ) ;
false , false , & work - > rolltime , pool , false ) ;
}
}
if ( unlikely ( ! val ) ) {
if ( unlikely ( ! val ) ) {
applog ( LOG_DEBUG , " Failed json_rpc_call in get_upstream_work " ) ;
applog ( LOG_DEBUG , " Failed json_rpc_call in get_upstream_work " ) ;
@ -1838,13 +1825,12 @@ retry:
goto retry ;
goto retry ;
}
}
work - > pool = pool ;
work - > pool = pool ;
work - > longpoll = req_longpoll ;
work - > longpoll = false ;
total_getworks + + ;
total_getworks + + ;
pool - > getwork_requested + + ;
pool - > getwork_requested + + ;
json_decref ( val ) ;
json_decref ( val ) ;
out :
out :
curl_easy_cleanup ( curl ) ;
return rc ;
return rc ;
}
}
@ -2001,13 +1987,40 @@ static void sighandler(int __maybe_unused sig)
kill_work ( ) ;
kill_work ( ) ;
}
}
static void start_longpoll ( void ) ;
static void stop_longpoll ( void ) ;
/* One get work thread is created per pool, so as to use one curl handle for
* all getwork reqeusts from the same pool , minimising connections opened , but
* separate from the submit work curl handle to not delay share submissions due
* to getwork traffic */
static void * get_work_thread ( void * userdata )
static void * get_work_thread ( void * userdata )
{
{
struct workio_cmd * wc = ( struct workio_cmd * ) userdata ;
struct pool * pool = ( struct pool * ) userdata ;
struct workio_cmd * wc ;
pthread_detach ( pthread_self ( ) ) ;
/* getwork_q memory never freed */
pool - > getwork_q = tq_new ( ) ;
if ( ! pool - > getwork_q )
quit ( 1 , " Failed to tq_new in get_work_thread " ) ;
/* getwork_curl never cleared */
pool - > getwork_curl = curl_easy_init ( ) ;
if ( unlikely ( ! pool - > getwork_curl ) )
quit ( 1 , " Failed to initialise pool getwork CURL " ) ;
while ( ( wc = tq_pop ( pool - > getwork_q , NULL ) ) ! = NULL ) {
struct work * ret_work ;
struct work * ret_work ;
int failures = 0 ;
int failures = 0 ;
pthread_detach ( pthread_self ( ) ) ;
if ( unlikely ( want_longpoll & & ! pool - > is_lp & & pool = = current_pool ( ) & &
pool - > hdr_path & & ! pool_tset ( pool , & pool - > lp_sent ) ) ) {
stop_longpoll ( ) ;
start_longpoll ( ) ;
}
ret_work = make_work ( ) ;
ret_work = make_work ( ) ;
if ( wc - > thr )
if ( wc - > thr )
@ -2015,13 +2028,15 @@ static void *get_work_thread(void *userdata)
else
else
ret_work - > thr = NULL ;
ret_work - > thr = NULL ;
ret_work - > pool = pool ;
/* obtain new work from bitcoin via JSON-RPC */
/* obtain new work from bitcoin via JSON-RPC */
while ( ! get_upstream_work ( ret_work , wc - > lagging ) ) {
while ( ! get_upstream_work ( ret_work ) ) {
if ( unlikely ( ( opt_retries > = 0 ) & & ( + + failures > opt_retries ) ) ) {
if ( unlikely ( ( opt_retries > = 0 ) & & ( + + failures > opt_retries ) ) ) {
applog ( LOG_ERR , " json_rpc_call failed, terminating workio thread " ) ;
applog ( LOG_ERR , " json_rpc_call failed, terminating workio thread " ) ;
free_work ( ret_work ) ;
free_work ( ret_work ) ;
kill_work ( ) ;
kill_work ( ) ;
goto out ;
break ;
}
}
/* pause, then restart work-request loop */
/* pause, then restart work-request loop */
@ -2040,21 +2055,17 @@ static void *get_work_thread(void *userdata)
kill_work ( ) ;
kill_work ( ) ;
free_work ( ret_work ) ;
free_work ( ret_work ) ;
}
}
out :
workio_cmd_free ( wc ) ;
workio_cmd_free ( wc ) ;
}
return NULL ;
return NULL ;
}
}
static bool workio_get_work ( struct workio_cmd * wc )
static bool workio_get_work ( struct workio_cmd * wc )
{
{
pthread_t get_thread ;
struct pool * pool = select_pool ( wc - > lagging ) ;
if ( unlikely ( pthread_create ( & get_thread , NULL , get_work_thread , ( void * ) wc ) ) ) {
return tq_push ( pool - > getwork_q , wc ) ;
applog ( LOG_ERR , " Failed to create get_work_thread " ) ;
return false ;
}
return true ;
}
}
static bool stale_work ( struct work * work , bool share )
static bool stale_work ( struct work * work , bool share )
@ -2080,15 +2091,31 @@ static bool stale_work(struct work *work, bool share)
return false ;
return false ;
}
}
/* One submit work thread is created per pool, so as to use one curl handle
* for all submissions to the same pool , minimising connections opened , but
* separate from the getwork curl handle to not delay share submission due to
* getwork traffic */
static void * submit_work_thread ( void * userdata )
static void * submit_work_thread ( void * userdata )
{
{
struct workio_cmd * wc = ( struct workio_cmd * ) userdata ;
struct pool * pool = ( struct pool * ) userdata ;
struct work * work = wc - > u . work ;
struct workio_cmd * wc ;
struct pool * pool = work - > pool ;
int failures = 0 ;
pthread_detach ( pthread_self ( ) ) ;
pthread_detach ( pthread_self ( ) ) ;
/* submit_q memory never freed */
pool - > submit_q = tq_new ( ) ;
if ( ! pool - > submit_q )
quit ( 1 , " Failed to tq_new in submit_work_thread " ) ;
/* submit_curl never cleared */
pool - > submit_curl = curl_easy_init ( ) ;
if ( unlikely ( ! pool - > submit_curl ) )
quit ( 1 , " Failed to initialise pool submit CURL " ) ;
while ( ( wc = tq_pop ( pool - > submit_q , NULL ) ) ! = NULL ) {
struct work * work = wc - > u . work ;
int failures = 0 ;
if ( stale_work ( work , true ) ) {
if ( stale_work ( work , true ) ) {
if ( opt_submit_stale )
if ( opt_submit_stale )
applog ( LOG_NOTICE , " Stale share detected, submitting as user requested " ) ;
applog ( LOG_NOTICE , " Stale share detected, submitting as user requested " ) ;
@ -2099,7 +2126,8 @@ static void *submit_work_thread(void *userdata)
sharelog ( " discard " , work ) ;
sharelog ( " discard " , work ) ;
total_stale + + ;
total_stale + + ;
pool - > stale_shares + + ;
pool - > stale_shares + + ;
goto out ;
workio_cmd_free ( wc ) ;
continue ;
}
}
}
}
@ -2124,20 +2152,15 @@ static void *submit_work_thread(void *userdata)
fail_pause + = opt_fail_pause ;
fail_pause + = opt_fail_pause ;
}
}
fail_pause = opt_fail_pause ;
fail_pause = opt_fail_pause ;
out :
workio_cmd_free ( wc ) ;
workio_cmd_free ( wc ) ;
}
return NULL ;
return NULL ;
}
}
static bool workio_submit_work ( struct workio_cmd * wc )
static bool workio_submit_work ( struct workio_cmd * wc )
{
{
pthread_t submit_thread ;
return tq_push ( wc - > u . work - > pool - > submit_q , wc ) ;
if ( unlikely ( pthread_create ( & submit_thread , NULL , submit_work_thread , ( void * ) wc ) ) ) {
applog ( LOG_ERR , " Failed to create submit_work_thread " ) ;
return false ;
}
return true ;
}
}
/* Find the pool that currently has the highest priority */
/* Find the pool that currently has the highest priority */
@ -2904,9 +2927,6 @@ retry:
}
}
# endif
# endif
static void start_longpoll ( void ) ;
static void stop_longpoll ( void ) ;
# ifdef HAVE_CURSES
# ifdef HAVE_CURSES
static void set_options ( void )
static void set_options ( void )
{
{
@ -3880,7 +3900,6 @@ static void *longpoll_thread(void *userdata)
bool rolltime ;
bool rolltime ;
pthread_setcanceltype ( PTHREAD_CANCEL_ASYNCHRONOUS , NULL ) ;
pthread_setcanceltype ( PTHREAD_CANCEL_ASYNCHRONOUS , NULL ) ;
pthread_detach ( pthread_self ( ) ) ;
curl = curl_easy_init ( ) ;
curl = curl_easy_init ( ) ;
if ( unlikely ( ! curl ) ) {
if ( unlikely ( ! curl ) ) {
@ -3959,12 +3978,13 @@ out:
return NULL ;
return NULL ;
}
}
__maybe_unused
static void stop_longpoll ( void )
static void stop_longpoll ( void )
{
{
struct thr_info * thr = & thr_info [ longpoll_thr_id ] ;
struct thr_info * thr = & thr_info [ longpoll_thr_id ] ;
thr_info_cancel ( thr ) ;
thr_info_cancel ( thr ) ;
if ( have_longpoll )
pthread_join ( thr - > pth , NULL ) ;
have_longpoll = false ;
have_longpoll = false ;
tq_freeze ( thr - > q ) ;
tq_freeze ( thr - > q ) ;
}
}