@@ -246,7 +246,6 @@ struct thread_q *getq;
 static int total_work;
 struct work *staged_work = NULL;
-static int staged_extras;
 
 struct schedtime {
 	bool enable;
@@ -1335,7 +1334,7 @@ void decay_time(double *f, double fadd)
 		*f = (fadd + *f * 0.58) / 1.58;
 }
 
-static int requests_staged(void)
+static int total_staged(void)
 {
 	int ret;
 
@@ -1345,6 +1344,16 @@ static int requests_staged(void)
 	return ret;
 }
 
+static int pool_staged(struct pool *pool)
+{
+	int ret;
+
+	mutex_lock(stgd_lock);
+	ret = pool->staged;
+	mutex_unlock(stgd_lock);
+	return ret;
+}
+
 #ifdef HAVE_CURSES
 WINDOW *mainwin, *statuswin, *logwin;
 #endif
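Both total_staged() and the new pool_staged() follow the same idiom: take the staging lock, snapshot an integer, drop the lock, return the copy, so a caller never observes the counter mid-update. A minimal standalone sketch of that idiom, assuming nothing from cgminer beyond pthreads (all names here are illustrative):

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t count_lock = PTHREAD_MUTEX_INITIALIZER;
static int staged_count;	/* shared state, only touched under count_lock */

/* Snapshot the counter under its lock, as total_staged()/pool_staged()
 * do with stgd_lock, so readers never see a half-updated value. */
static int read_staged(void)
{
	int ret;

	pthread_mutex_lock(&count_lock);
	ret = staged_count;
	pthread_mutex_unlock(&count_lock);
	return ret;
}

int main(void)
{
	printf("staged: %d\n", read_staged());
	return 0;
}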
@@ -1451,7 +1460,7 @@ static void curses_print_status(void)
 	mvwprintw(statuswin, 2, 0, " %s", statusline);
 	wclrtoeol(statuswin);
 	mvwprintw(statuswin, 3, 0, " TQ: %d ST: %d SS: %d DW: %d NB: %d LW: %d GF: %d RF: %d",
-		total_queued, requests_staged(), total_stale, total_discarded, new_blocks,
+		total_queued, total_staged(), total_stale, total_discarded, new_blocks,
 		local_work, total_go, total_ro);
 	wclrtoeol(statuswin);
 	if (pool_strategy == POOL_LOADBALANCE && total_pools > 1)
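Reading the format string against its arguments: TQ is total_queued, ST is the staged count, SS total_stale, DW total_discarded, NB new_blocks, LW local_work, GF total_go and RF total_ro. Only the helper behind ST is renamed here (requests_staged() to total_staged()); its body is unchanged, so the displayed value keeps its old meaning.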
@@ -2207,6 +2216,41 @@ static void push_curl_entry(struct curl_ent *ce, struct pool *pool)
 	mutex_unlock(&pool->pool_lock);
 }
 
+/* This is overkill, but at least we'll know accurately how much work is
+ * queued to prevent ever being left without work */
+static void inc_queued(struct pool *pool)
+{
+	if (unlikely(!pool))
+		return;
+
+	mutex_lock(&qd_lock);
+	pool->queued++;
+	total_queued++;
+	mutex_unlock(&qd_lock);
+}
+
+static void dec_queued(struct pool *pool)
+{
+	if (unlikely(!pool))
+		return;
+
+	mutex_lock(&qd_lock);
+	pool->queued--;
+	total_queued--;
+	mutex_unlock(&qd_lock);
+}
+
+static int current_queued(void)
+{
+	struct pool *pool = current_pool();
+	int ret;
+
+	mutex_lock(&qd_lock);
+	ret = pool->queued;
+	mutex_unlock(&qd_lock);
+	return ret;
+}
+
 /* ce and pool may appear uninitialised at push_curl_entry, but they're always
  * set when we don't have opt_benchmark enabled */
 static void *get_work_thread(void *userdata)
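Unlike the old global-only counters removed further down, these replacements track in-flight getworks per pool and in total, and both counters move inside the one qd_lock critical section, so the two views cannot drift apart. A compressed, self-contained sketch of the intended lifecycle (the fetch step is a placeholder, not cgminer code):

#include <pthread.h>
#include <stdio.h>

struct pool {
	int queued;		/* getworks in flight for this pool */
};

static pthread_mutex_t qd_lock = PTHREAD_MUTEX_INITIALIZER;
static int total_queued;	/* sum of all pools' queued counts */

static void inc_queued(struct pool *pool)
{
	pthread_mutex_lock(&qd_lock);
	pool->queued++;
	total_queued++;		/* both counters move under one lock */
	pthread_mutex_unlock(&qd_lock);
}

static void dec_queued(struct pool *pool)
{
	pthread_mutex_lock(&qd_lock);
	pool->queued--;
	total_queued--;
	pthread_mutex_unlock(&qd_lock);
}

int main(void)
{
	struct pool p = { 0 };

	inc_queued(&p);		/* request dispatched */
	/* ... fetch work from the pool here ... */
	dec_queued(&p);		/* work received (or the fetch failed) */
	printf("in flight: %d\n", total_queued);
	return 0;
}

This is exactly how the next two hunks bracket the getwork path in get_work_thread(): inc_queued() as soon as a pool is selected, dec_queued() once the work has been retrieved.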
@@ -2230,6 +2274,7 @@ static void *get_work_thread(void *userdata)
 		get_benchmark_work(ret_work);
 	else {
 		pool = ret_work->pool = select_pool(wc->lagging);
+		inc_queued(pool);
 
 		ce = pop_curl_entry(pool);
@@ -2249,6 +2294,8 @@ static void *get_work_thread(void *userdata)
 			fail_pause += opt_fail_pause;
 		}
 		fail_pause = opt_fail_pause;
+
+		dec_queued(pool);
 	}
 
 	applog(LOG_DEBUG, "Pushing work to requesting thread");
@@ -2503,11 +2550,6 @@ void switch_pools(struct pool *selected)
 	if (pool != last_pool)
 		applog(LOG_WARNING, "Switching to %s", pool->rpc_url);
 
-	/* Reset the queued amount to allow more to be queued for the new pool */
-	mutex_lock(&qd_lock);
-	total_queued = 0;
-	mutex_unlock(&qd_lock);
-
 	mutex_lock(&lp_lock);
 	pthread_cond_broadcast(&lp_cond);
 	mutex_unlock(&lp_lock);
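With the counts now maintained per pool, and decremented by the same code path that incremented them, the blanket reset of total_queued on a pool switch is no longer needed: requests still in flight to the old pool remain accounted against that pool and unwind naturally when they complete or fail.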
@@ -2525,58 +2567,16 @@ static void discard_work(struct work *work)
 	free_work(work);
 }
 
-/* This is overkill, but at least we'll know accurately how much work is
- * queued to prevent ever being left without work */
-static void inc_queued(void)
-{
-	mutex_lock(&qd_lock);
-	total_queued++;
-	mutex_unlock(&qd_lock);
-}
-
-static void dec_queued(struct work *work)
-{
-	if (work->clone)
-		return;
-
-	mutex_lock(&qd_lock);
-	if (total_queued > 0)
-		total_queued--;
-	mutex_unlock(&qd_lock);
-}
-
-static int requests_queued(void)
-{
-	int ret;
-
-	mutex_lock(&qd_lock);
-	ret = total_queued;
-	mutex_unlock(&qd_lock);
-	return ret;
-}
-
-static void subtract_queued(int work_units)
-{
-	mutex_lock(&qd_lock);
-	total_queued -= work_units;
-	if (total_queued < 0)
-		total_queued = 0;
-	mutex_unlock(&qd_lock);
-}
-
 static void discard_stale(void)
 {
 	struct work *work, *tmp;
-	int stale = 0, nonclone = 0;
+	int stale = 0;
 
 	mutex_lock(stgd_lock);
 	HASH_ITER(hh, staged_work, work, tmp) {
 		if (stale_work(work, false)) {
 			HASH_DEL(staged_work, work);
-			if (work->clone)
-				--staged_extras;
-			else
-				nonclone++;
+			work->pool->staged--;
 			discard_work(work);
 			stale++;
 		}
@@ -2584,9 +2584,6 @@ static void discard_stale(void)
 	mutex_unlock(stgd_lock);
 
 	applog(LOG_DEBUG, "Discarded %d stales that didn't match current hash", stale);
-
-	/* Dec queued outside the loop to not have recursive locks */
-	subtract_queued(nonclone);
 }
 
 bool queue_request(struct thr_info *thr, bool needed);
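discard_stale() leans on uthash's guarantee that HASH_ITER is deletion-safe: the macro caches the next element in tmp before the loop body runs, so HASH_DEL on the current element cannot break the traversal. A minimal sketch of the same pattern, assuming only the single-header uthash library (the item layout and stale flag are illustrative):

#include <stdio.h>
#include <stdlib.h>
#include "uthash.h"

struct item {
	int id;
	int stale;
	UT_hash_handle hh;
};

static struct item *table;

static void discard_stale_items(void)
{
	struct item *el, *tmp;

	HASH_ITER(hh, table, el, tmp) {
		if (el->stale) {
			HASH_DEL(table, el);	/* unlink first, then free */
			free(el);
		}
	}
}

int main(void)
{
	for (int i = 0; i < 3; i++) {
		struct item *el = calloc(1, sizeof(*el));

		el->id = i;
		el->stale = (i == 1);
		HASH_ADD_INT(table, id, el);
	}
	discard_stale_items();
	printf("remaining: %u\n", HASH_COUNT(table));	/* prints 2 */
	return 0;
}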
@@ -2749,9 +2746,8 @@ static bool hash_push(struct work *work)
 	mutex_lock(stgd_lock);
 	if (likely(!getq->frozen)) {
 		HASH_ADD_INT(staged_work, id, work);
+		work->pool->staged++;
 		HASH_SORT(staged_work, tv_sort);
-		if (work->clone)
-			++staged_extras;
 	} else
 		rc = false;
 	pthread_cond_signal(&getq->cond);
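Note that the owning pool's staged count is bumped inside the same stgd_lock critical section that inserts the work, and the table is re-sorted by staging time so its head is always the oldest item, which is what hash_pop() takes below. A small sketch of that oldest-first ordering with uthash (field names are illustrative, not cgminer's):

#include <stdio.h>
#include <stdlib.h>
#include "uthash.h"

struct item {
	int id;
	long staged_sec;	/* stand-in for work->tv_staged */
	UT_hash_handle hh;
};

static struct item *table;

/* oldest first, mirroring the role of cgminer's tv_sort comparator */
static int by_age(struct item *a, struct item *b)
{
	return (a->staged_sec > b->staged_sec) - (a->staged_sec < b->staged_sec);
}

static void push(int id, long sec)
{
	struct item *el = calloc(1, sizeof(*el));

	el->id = id;
	el->staged_sec = sec;
	HASH_ADD_INT(table, id, el);
	HASH_SORT(table, by_age);	/* head of the table is now the oldest */
}

int main(void)
{
	push(1, 300);
	push(2, 100);
	push(3, 200);
	printf("pop would take id %d\n", table->id);	/* prints 2 */
	return 0;
}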
@@ -3611,7 +3607,6 @@ static bool pool_active(struct pool *pool, bool pinging)
 		tq_push(thr_info[stage_thr_id].q, work);
 		total_getworks++;
 		pool->getwork_requested++;
-		inc_queued();
 		ret = true;
 		gettimeofday(&pool->tv_idle, NULL);
 	} else {
@@ -3692,93 +3687,48 @@ static void pool_resus(struct pool *pool)
 		switch_pools(NULL);
 }
 
-static time_t requested_tv_sec;
-
-static bool control_tset(bool *var)
-{
-	bool ret;
-
-	mutex_lock(&control_lock);
-	ret = *var;
-	*var = true;
-	mutex_unlock(&control_lock);
-
-	return ret;
-}
-
-static void control_tclear(bool *var)
-{
-	mutex_lock(&control_lock);
-	*var = false;
-	mutex_unlock(&control_lock);
-}
-
-static bool queueing;
-
 bool queue_request(struct thr_info *thr, bool needed)
 {
+	int cq, ts, maxq = opt_queue + mining_threads;
 	struct workio_cmd *wc;
-	struct timeval now;
-	time_t scan_post;
-	int rq, rs;
 	bool ret = true;
 
-	/* Prevent multiple requests being executed at once */
-	if (control_tset(&queueing))
-		return ret;
-
-	rq = requests_queued();
-	rs = requests_staged();
-
-	/* Grab more work every 2/3 of the scan time to avoid all work expiring
-	 * at the same time */
-	scan_post = opt_scantime * 2 / 3;
-	if (scan_post < 5)
-		scan_post = 5;
-
-	gettimeofday(&now, NULL);
+	cq = current_queued();
+	ts = total_staged();
 
 	/* Test to make sure we have enough work for pools without rolltime
 	 * and enough original work for pools with rolltime */
-	if ((rq >= mining_threads || rs >= mining_threads) &&
-	    rq > staged_extras + opt_queue &&
-	    now.tv_sec - requested_tv_sec < scan_post)
-		goto out;
-
-	requested_tv_sec = now.tv_sec;
-
-	inc_queued();
+	if ((cq >= opt_queue && ts >= maxq) || cq >= maxq) {
+		/* If we're queueing work faster than we can stage it, consider the
+		 * system lagging and allow work to be gathered from another pool if
+		 * possible */
+		if (needed && !ts) {
+			wc->lagging = true;
+			applog(LOG_DEBUG, "Pool lagging");
+		}
+		return true;
+	}
 
 	/* fill out work request message */
 	wc = calloc(1, sizeof(*wc));
 	if (unlikely(!wc)) {
 		applog(LOG_ERR, "Failed to calloc wc in queue_request");
-		ret = false;
-		goto out;
+		return false;
 	}
 
 	wc->cmd = WC_GET_WORK;
 	wc->thr = thr;
 
-	/* If we're queueing work faster than we can stage it, consider the
-	 * system lagging and allow work to be gathered from another pool if
-	 * possible */
-	if (rq && needed && !rs && !opt_fail_only)
-		wc->lagging = true;
-
 	applog(LOG_DEBUG, "Queueing getwork request to work thread");
 
 	/* send work request to workio thread */
 	if (unlikely(!tq_push(thr_info[work_thr_id].q, wc))) {
 		applog(LOG_ERR, "Failed to tq_push in queue_request");
 		workio_cmd_free(wc);
-		ret = false;
+		return false;
 	}
 
-out:
-	control_tclear(&queueing);
-
-	return ret;
+	return true;
 }
 
 static struct work *hash_pop(const struct timespec *abstime)
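The rewrite drops the control_tset/control_tclear serialisation and the timed "every 2/3 of scan time" heuristic in favour of two counters: cq (getworks in flight for the current pool) and ts (staged work items), measured against maxq = opt_queue + mining_threads. A standalone sketch of the new predicate with a few worked cases (the parameter values are illustrative, not defaults):

#include <stdbool.h>
#include <stdio.h>

/* The new "do we already have enough work?" test, lifted out for clarity. */
static bool have_enough(int cq, int ts, int opt_queue, int mining_threads)
{
	int maxq = opt_queue + mining_threads;

	return (cq >= opt_queue && ts >= maxq) || cq >= maxq;
}

int main(void)
{
	/* opt_queue = 1, mining_threads = 4, so maxq = 5 */
	printf("%d\n", have_enough(0, 5, 1, 4));	/* 0: nothing queued, ask */
	printf("%d\n", have_enough(1, 5, 1, 4));	/* 1: queue and staging full */
	printf("%d\n", have_enough(1, 2, 1, 4));	/* 0: staging low, ask */
	printf("%d\n", have_enough(5, 0, 1, 4));	/* 1: queue saturated */
	return 0;
}

One thing worth flagging to the author: in the new early-return branch, wc->lagging is written before wc has been assigned by the calloc() further down, so that path dereferences an uninitialised pointer; ret is also now unused.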
@@ -3794,8 +3744,6 @@ static struct work *hash_pop(const struct timespec *abstime)
 	if (HASH_COUNT(staged_work)) {
 		work = staged_work;
 		HASH_DEL(staged_work, work);
-		if (work->clone)
-			--staged_extras;
 		if (HASH_COUNT(staged_work) < mining_threads)
 			queue = true;
 	}
@@ -3870,7 +3818,7 @@ static struct work *make_clone(struct work *work)
  * the future */
 static struct work *clone_work(struct work *work)
 {
-	int mrs = mining_threads - requests_staged();
+	int mrs = mining_threads + opt_queue - total_staged();
 	struct work *work_clone;
 	bool cloned;
@@ -3910,8 +3858,8 @@ static bool get_work(struct work *work, bool requested, struct thr_info *thr,
 	struct timespec abstime = {0, 0};
 	struct timeval now;
 	struct work *work_heap;
+	int failures = 0, cq;
 	struct pool *pool;
-	int failures = 0;
 
 	/* Tell the watchdog thread this thread is waiting on getwork and
 	 * should not be restarted */
@@ -3922,9 +3870,11 @@ static bool get_work(struct work *work, bool requested, struct thr_info *thr,
 		thread_reportin(thr);
 		return true;
 	}
 
+	cq = current_queued();
+
 retry:
 	pool = current_pool();
-	if (!requested || requests_queued() < opt_queue) {
+	if (!requested || cq < opt_queue) {
 		if (unlikely(!queue_request(thr, true))) {
 			applog(LOG_WARNING, "Failed to queue_request in get_work");
 			goto out;
@@ -3937,7 +3887,7 @@ retry:
 			goto out;
 		}
 
-	if (!pool->lagging && requested && !newreq && !requests_staged() && requests_queued() >= mining_threads) {
+	if (!pool->lagging && requested && !newreq && !pool_staged(pool) && cq >= mining_threads + opt_queue) {
 		struct cgpu_info *cgpu = thr->cgpu;
 		bool stalled = true;
 		int i;
@@ -3974,7 +3924,6 @@ retry:
 	}
 
 	if (stale_work(work_heap, false)) {
-		dec_queued(work_heap);
 		discard_work(work_heap);
 		goto retry;
 	}
@@ -3989,7 +3938,6 @@ retry:
 	work_heap = clone_work(work_heap);
 	memcpy(work, work_heap, sizeof(struct work));
-	dec_queued(work_heap);
 	free_work(work_heap);
 
 	ret = true;
@@ -4368,10 +4316,8 @@ static void convert_to_work(json_t *val, int rolltime, struct pool *pool)
 	if (unlikely(!stage_work(work)))
 		free_work(work);
-	else {
-		inc_queued();
+	else
 		applog(LOG_DEBUG, "Converted longpoll data to work");
-	}
 }
 
 /* If we want longpoll, enable it for the chosen default pool, or, if
/* If we want longpoll, enable it for the chosen default pool, or, if
@ -4568,12 +4514,12 @@ static void *watchpool_thread(void __maybe_unused *userdata)
}
}
/* Work is sorted according to age, so discard the oldest work items, leaving
/* Work is sorted according to age, so discard the oldest work items, leaving
* only 1 staged work item per mining thread */
* only 1 / 3 more staged work item than mining threads */
static void age_work ( void )
static void age_work ( void )
{
{
int discarded = 0 ;
int discarded = 0 ;
while ( requests _staged( ) > mining_threads * 4 / 3 + opt_queue ) {
while ( total _staged( ) > mining_threads * 4 / 3 + opt_queue ) {
struct work * work = hash_pop ( NULL ) ;
struct work * work = hash_pop ( NULL ) ;
if ( unlikely ( ! work ) )
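For concreteness: with 4 mining threads and --queue 1, age_work() starts discarding once more than 4 * 4 / 3 + 1 = 6 items are staged (integer division, so 4 * 4 / 3 evaluates to 5), rather than pruning down to one item per thread as the old comment described.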
if ( unlikely ( ! work ) )
@ -4609,8 +4555,8 @@ static void *watchdog_thread(void __maybe_unused *userdata)
struct timeval now ;
struct timeval now ;
sleep ( interval ) ;
sleep ( interval ) ;
if ( requests_queued ( ) < opt_queue )
queue_request ( NULL , false ) ;
queue_request ( NULL , false ) ;
age_work ( ) ;
age_work ( ) ;