Browse Source

Create a hash list of all the blocks created and search them to detect when a new block has definitely appeared, using that information to detect stale work and discard it.

nfactor-troky
Con Kolivas 13 years ago
parent
commit
93f4163aca
  1. 266
      main.c

266
main.c

@ -270,6 +270,13 @@ static char *current_hash; @@ -270,6 +270,13 @@ static char *current_hash;
static char datestamp[40];
static char blocktime[30];
struct block {
char hash[37];
UT_hash_handle hh;
};
static struct block *blocks = NULL;
static char *opt_kernel = NULL;
#if defined(unix)
@ -1829,18 +1836,13 @@ static bool stale_work(struct work *work) @@ -1829,18 +1836,13 @@ static bool stale_work(struct work *work)
if ((now.tv_sec - work->tv_staged.tv_sec) >= opt_scantime)
return true;
/* Only use the primary pool for determination as the work may
* interleave at times of new blocks */
if (work->pool != current_pool())
return ret;
hexstr = bin2hex(work->data, 36);
hexstr = bin2hex(work->data, 18);
if (unlikely(!hexstr)) {
applog(LOG_ERR, "submit_work_thread OOM");
return ret;
}
if (strncmp(hexstr, current_block, 36))
if (strcmp(hexstr, current_block))
ret = true;
free(hexstr);
@ -2031,6 +2033,95 @@ static void switch_pools(struct pool *selected) @@ -2031,6 +2033,95 @@ static void switch_pools(struct pool *selected)
inc_staged(pool, 1, true);
}
static void discard_work(struct work *work)
{
if (!work->clone && !work->rolls && !work->mined) {
if (work->pool)
work->pool->discarded_work++;
total_discarded++;
if (opt_debug)
applog(LOG_DEBUG, "Discarded work");
} else if (opt_debug)
applog(LOG_DEBUG, "Discarded cloned or rolled work");
free_work(work);
}
/* This is overkill, but at least we'll know accurately how much work is
* queued to prevent ever being left without work */
static void inc_queued(void)
{
mutex_lock(&qd_lock);
total_queued++;
mutex_unlock(&qd_lock);
}
static void dec_queued(void)
{
mutex_lock(&qd_lock);
if (total_queued > 0)
total_queued--;
mutex_unlock(&qd_lock);
dec_staged(1);
}
static int requests_queued(void)
{
int ret;
mutex_lock(&qd_lock);
ret = total_queued;
mutex_unlock(&qd_lock);
return ret;
}
static int discard_stale(void)
{
struct work *work, *tmp;
int i, stale = 0;
mutex_lock(&getq->mutex);
HASH_ITER(hh, staged_work, work, tmp) {
if (stale_work(work)) {
HASH_DEL(staged_work, work);
discard_work(work);
stale++;
}
}
mutex_unlock(&getq->mutex);
if (opt_debug)
applog(LOG_DEBUG, "Discarded %d stales that didn't match current hash", stale);
/* Dec queued outside the loop to not have recursive locks */
for (i = 0; i < stale; i++)
dec_queued();
return stale;
}
static bool queue_request(struct thr_info *thr, bool needed);
static void restart_threads(void)
{
struct pool *pool = current_pool();
int i, stale;
block_changed = BLOCK_NONE;
/* Discard staged work that is now stale */
stale = discard_stale();
for (i = 0; i < stale; i++)
queue_request(NULL, true);
/* Temporarily increase the staged count so that the pool is not seen
* as lagging when a new block hits */
inc_staged(pool, mining_threads, true);
for (i = 0; i < mining_threads; i++)
work_restart[i].restart = 1;
}
static void set_curblock(char *hexstr, unsigned char *hash)
{
unsigned char hash_swap[32];
@ -2041,7 +2132,7 @@ static void set_curblock(char *hexstr, unsigned char *hash) @@ -2041,7 +2132,7 @@ static void set_curblock(char *hexstr, unsigned char *hash)
* we might be accessing its data elsewhere */
if (current_hash)
old_hash = current_hash;
memcpy(current_block, hexstr, 36);
strcpy(current_block, hexstr);
gettimeofday(&tv_now, NULL);
get_timestamp(blocktime, &tv_now);
swap256(hash_swap, hash);
@ -2054,33 +2145,40 @@ static void set_curblock(char *hexstr, unsigned char *hash) @@ -2054,33 +2145,40 @@ static void set_curblock(char *hexstr, unsigned char *hash)
static void test_work_current(struct work *work)
{
struct block *s;
char *hexstr;
/* Only use the primary pool for determination */
if (work->pool != current_pool() || work->cloned || work->rolls || work->clone)
return;
hexstr = bin2hex(work->data, 36);
hexstr = bin2hex(work->data, 18);
if (unlikely(!hexstr)) {
applog(LOG_ERR, "stage_thread OOM");
return;
}
/* current_block is blanked out on successful longpoll */
if (unlikely(strncmp(hexstr, current_block, 36))) {
/* Search to see if this block exists yet and if not, consider it a
* new block and set the current block details to this one */
mutex_lock(&getq->mutex);
HASH_FIND_STR(blocks, hexstr, s);
mutex_unlock(&getq->mutex);
if (!s) {
s = calloc(sizeof(struct block), 1);
if (unlikely(!s))
quit (1, "test_work_current OOM");
strcpy(s->hash, hexstr);
mutex_lock(&getq->mutex);
HASH_ADD_STR(blocks, hash, s);
mutex_unlock(&getq->mutex);
set_curblock(hexstr, work->data);
new_blocks++;
if (block_changed != BLOCK_LP && block_changed != BLOCK_FIRST) {
block_changed = BLOCK_DETECT;
new_blocks++;
if (have_longpoll)
applog(LOG_WARNING, "New block detected on network before longpoll, waiting on fresh work");
else
applog(LOG_WARNING, "New block detected on network, waiting on fresh work");
/* As we can't flush the work from here, signal the
* wakeup thread to restart all the threads */
work_restart[watchdog_thr_id].restart = 1;
} else
block_changed = BLOCK_NONE;
set_curblock(hexstr, work->data);
restart_threads();
}
free(hexstr);
@ -2804,34 +2902,6 @@ out_unlock: @@ -2804,34 +2902,6 @@ out_unlock:
}
}
/* This is overkill, but at least we'll know accurately how much work is
* queued to prevent ever being left without work */
static void inc_queued(void)
{
mutex_lock(&qd_lock);
total_queued++;
mutex_unlock(&qd_lock);
}
static void dec_queued(void)
{
mutex_lock(&qd_lock);
if (total_queued > 0)
total_queued--;
mutex_unlock(&qd_lock);
dec_staged(1);
}
static int requests_queued(void)
{
int ret;
mutex_lock(&qd_lock);
ret = total_queued;
mutex_unlock(&qd_lock);
return ret;
}
static bool pool_active(struct pool *pool, bool pinging)
{
bool ret = false;
@ -2951,19 +3021,6 @@ out: @@ -2951,19 +3021,6 @@ out:
return true;
}
static void discard_work(struct work *work)
{
if (!work->clone && !work->rolls && !work->mined) {
if (work->pool)
work->pool->discarded_work++;
total_discarded++;
if (opt_debug)
applog(LOG_DEBUG, "Discarded work");
} else if (opt_debug)
applog(LOG_DEBUG, "Discarded cloned or rolled work");
free_work(work);
}
struct work *hash_pop(const struct timespec *abstime)
{
struct work *work = NULL;
@ -2990,55 +3047,6 @@ out: @@ -2990,55 +3047,6 @@ out:
return work;
}
static void discard_staged(void)
{
struct timespec abstime = {};
struct timeval now;
struct work *work_heap;
/* Just in case we fell in a hole and missed a queue filling */
if (unlikely(!requests_staged()))
return;
gettimeofday(&now, NULL);
abstime.tv_sec = now.tv_sec + 60;
if (opt_debug)
applog(LOG_DEBUG, "Popping work to discard staged");
work_heap = hash_pop(&abstime);
if (unlikely(!work_heap))
return;
discard_work(work_heap);
dec_queued();
}
static void flush_requests(void)
{
struct pool *pool = current_pool();
int i, stale;
/* We should have one fresh work item staged from the block change. */
stale = requests_staged() - 1;
/* Temporarily increase the staged count so that get_work thinks there
* is work available instead of making threads reuse existing work */
inc_staged(pool, mining_threads, true);
for (i = 0; i < stale; i++) {
/* Queue a whole batch of new requests */
if (unlikely(!queue_request(NULL, true))) {
applog(LOG_ERR, "Failed to queue requests in flush_requests");
kill_work();
break;
}
/* Pop off the old requests. Cancelling the requests would be better
* but is tricky */
discard_staged();
}
}
static inline bool should_roll(struct work *work)
{
int rs;
@ -3123,6 +3131,7 @@ retry: @@ -3123,6 +3131,7 @@ retry:
if (requested && !pool_tset(pool, &pool->lagging)) {
applog(LOG_WARNING, "Pool %d not providing work fast enough",
pool->pool_no);
applog(LOG_WARNING, "staged %d", requests_staged());
pool->localgen_occasions++;
total_lo++;
}
@ -3801,20 +3810,6 @@ out: @@ -3801,20 +3810,6 @@ out:
}
#endif /* HAVE_OPENCL */
static void restart_threads(void)
{
int i;
if (block_changed == BLOCK_DETECT)
block_changed = BLOCK_NONE;
/* Discard old queued requests and get new ones */
flush_requests();
for (i = 0; i < mining_threads; i++)
work_restart[i].restart = 1;
}
/* Stage another work item from the work returned in a longpoll */
static void convert_to_work(json_t *val, bool rolltime)
{
@ -3899,9 +3894,7 @@ static void *longpoll_thread(void *userdata) @@ -3899,9 +3894,7 @@ static void *longpoll_thread(void *userdata)
* sure it's only done once per new block */
if (block_changed != BLOCK_DETECT) {
block_changed = BLOCK_LP;
new_blocks++;
applog(LOG_WARNING, "LONGPOLL detected new block on network, waiting on fresh work");
restart_threads();
} else {
applog(LOG_WARNING, "LONGPOLL received after new block already detected");
block_changed = BLOCK_NONE;
@ -4490,6 +4483,8 @@ out: @@ -4490,6 +4483,8 @@ out:
int main (int argc, char *argv[])
{
unsigned int i, x, y, pools_active = 0;
struct block *block, *tmpblock;
struct work *work, *tmpwork;
struct sigaction handler;
struct thr_info *thr;
char name[256];
@ -4528,11 +4523,13 @@ int main (int argc, char *argv[]) @@ -4528,11 +4523,13 @@ int main (int argc, char *argv[])
skip_to_bench = 1;
#endif // defined(WIN32)
for (i = 0; i < 36; i++)
strcat(current_block, "0");
current_hash = calloc(sizeof(current_hash), 1);
if (unlikely(!current_hash))
block = calloc(sizeof(struct block), 1);
if (unlikely(!block))
quit (1, "main OOM");
for (i = 0; i < 36; i++)
strcat(block->hash, "0");
HASH_ADD_STR(blocks, hash, block);
strcpy(current_block, block->hash);
// Reckon number of cores in the box
#if defined(WIN32)
@ -4940,6 +4937,15 @@ int main (int argc, char *argv[]) @@ -4940,6 +4937,15 @@ int main (int argc, char *argv[])
if (opt_n_threads)
free(cpus);
HASH_ITER(hh, staged_work, work, tmpwork) {
HASH_DEL(staged_work, work);
free_work(work);
}
HASH_ITER(hh, blocks, block, tmpblock) {
HASH_DEL(blocks, block);
free(block);
}
curl_global_cleanup();
return 0;

Loading…
Cancel
Save