@ -2398,8 +2398,11 @@ static void inc_queued(void)
mutex_unlock ( & qd_lock ) ;
mutex_unlock ( & qd_lock ) ;
}
}
static void dec_queued ( void )
static void dec_queued ( struct work * work )
{
{
if ( work - > clone )
return ;
mutex_lock ( & qd_lock ) ;
mutex_lock ( & qd_lock ) ;
if ( total_queued > 0 )
if ( total_queued > 0 )
total_queued - - ;
total_queued - - ;
@ -2416,10 +2419,19 @@ static int requests_queued(void)
return ret ;
return ret ;
}
}
static void subtract_queued ( int work_units )
{
mutex_lock ( & qd_lock ) ;
total_queued - = work_units ;
if ( total_queued < 0 )
total_queued = 0 ;
mutex_unlock ( & qd_lock ) ;
}
static int discard_stale ( void )
static int discard_stale ( void )
{
{
struct work * work , * tmp ;
struct work * work , * tmp ;
int i , stale = 0 ;
int stale = 0 , nonclon e = 0 ;
mutex_lock ( stgd_lock ) ;
mutex_lock ( stgd_lock ) ;
HASH_ITER ( hh , staged_work , work , tmp ) {
HASH_ITER ( hh , staged_work , work , tmp ) {
@ -2427,6 +2439,8 @@ static int discard_stale(void)
HASH_DEL ( staged_work , work ) ;
HASH_DEL ( staged_work , work ) ;
if ( work - > clone )
if ( work - > clone )
- - staged_extras ;
- - staged_extras ;
else
nonclone + + ;
discard_work ( work ) ;
discard_work ( work ) ;
stale + + ;
stale + + ;
}
}
@ -2436,8 +2450,7 @@ static int discard_stale(void)
applog ( LOG_DEBUG , " Discarded %d stales that didn't match current hash " , stale ) ;
applog ( LOG_DEBUG , " Discarded %d stales that didn't match current hash " , stale ) ;
/* Dec queued outside the loop to not have recursive locks */
/* Dec queued outside the loop to not have recursive locks */
for ( i = 0 ; i < stale ; i + + )
subtract_queued ( nonclone ) ;
dec_queued ( ) ;
return stale ;
return stale ;
}
}
@ -3542,7 +3555,6 @@ static bool queue_request(struct thr_info *thr, bool needed)
wc = calloc ( 1 , sizeof ( * wc ) ) ;
wc = calloc ( 1 , sizeof ( * wc ) ) ;
if ( unlikely ( ! wc ) ) {
if ( unlikely ( ! wc ) ) {
applog ( LOG_ERR , " Failed to calloc wc in queue_request " ) ;
applog ( LOG_ERR , " Failed to calloc wc in queue_request " ) ;
dec_queued ( ) ;
return false ;
return false ;
}
}
@ -3564,7 +3576,6 @@ static bool queue_request(struct thr_info *thr, bool needed)
if ( unlikely ( ! tq_push ( thr_info [ work_thr_id ] . q , wc ) ) ) {
if ( unlikely ( ! tq_push ( thr_info [ work_thr_id ] . q , wc ) ) ) {
applog ( LOG_ERR , " Failed to tq_push in queue_request " ) ;
applog ( LOG_ERR , " Failed to tq_push in queue_request " ) ;
workio_cmd_free ( wc ) ;
workio_cmd_free ( wc ) ;
dec_queued ( ) ;
return false ;
return false ;
}
}
@ -3666,7 +3677,6 @@ static struct work *clone_work(struct work *work)
cloned = false ;
cloned = false ;
break ;
break ;
}
}
inc_queued ( ) ;
roll_work ( work ) ;
roll_work ( work ) ;
work_clone = make_clone ( work ) ;
work_clone = make_clone ( work ) ;
/* Roll it again to prevent duplicates should this be used
/* Roll it again to prevent duplicates should this be used
@ -3756,7 +3766,7 @@ retry:
}
}
if ( stale_work ( work_heap , false ) ) {
if ( stale_work ( work_heap , false ) ) {
dec_queued ( ) ;
dec_queued ( work_heap ) ;
discard_work ( work_heap ) ;
discard_work ( work_heap ) ;
goto retry ;
goto retry ;
}
}
@ -3771,8 +3781,8 @@ retry:
work_heap = clone_work ( work_heap ) ;
work_heap = clone_work ( work_heap ) ;
memcpy ( work , work_heap , sizeof ( struct work ) ) ;
memcpy ( work , work_heap , sizeof ( struct work ) ) ;
dec_queued ( work_heap ) ;
free_work ( work_heap ) ;
free_work ( work_heap ) ;
dec_queued ( ) ;
ret = true ;
ret = true ;
out :
out :