Browse Source

First shot on chunked transfers

master
Dirk Engling 3 months ago
parent
commit
1a70d9f9ef
  1. 39
      opentracker.c
  2. 33
      ot_fullscrape.c
  3. 93
      ot_http.c
  4. 10
      ot_http.h
  5. 21
      ot_iovec.c
  6. 1
      ot_iovec.h
  7. 54
      ot_mutex.c
  8. 5
      ot_mutex.h
  9. 23
      trackerlogic.c
  10. 1
      trackerlogic.h

39
opentracker.c

@ -79,6 +79,7 @@ static void defaul_signal_handlers( void ) {
sigaddset (&signal_mask, SIGPIPE); sigaddset (&signal_mask, SIGPIPE);
sigaddset (&signal_mask, SIGHUP); sigaddset (&signal_mask, SIGHUP);
sigaddset (&signal_mask, SIGINT); sigaddset (&signal_mask, SIGINT);
sigaddset (&signal_mask, SIGALRM);
pthread_sigmask (SIG_BLOCK, &signal_mask, NULL); pthread_sigmask (SIG_BLOCK, &signal_mask, NULL);
} }
@ -90,7 +91,7 @@ static void install_signal_handlers( void ) {
sa.sa_handler = signal_handler; sa.sa_handler = signal_handler;
sigemptyset(&sa.sa_mask); sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART; sa.sa_flags = SA_RESTART;
if ((sigaction(SIGINT, &sa, NULL) == -1)) if ((sigaction(SIGINT, &sa, NULL) == -1) || (sigaction(SIGALRM, &sa, NULL) == -1) )
panic( "install_signal_handlers" ); panic( "install_signal_handlers" );
sigaddset (&signal_mask, SIGINT); sigaddset (&signal_mask, SIGINT);
@ -208,15 +209,23 @@ static void handle_read( const int64 sock, struct ot_workstruct *ws ) {
static void handle_write( const int64 sock ) { static void handle_write( const int64 sock ) {
struct http_data* cookie=io_getcookie( sock ); struct http_data* cookie=io_getcookie( sock );
size_t i; size_t i;
int chunked = 0;
/* Look for the first io_batch still containing bytes to write */ /* Look for the first io_batch still containing bytes to write */
if( cookie ) if( cookie ) {
for( i = 0; i < cookie->batches; ++i ) if( cookie->flag & STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER )
chunked = 1;
for( i = 0; i < cookie->batches; ++i ) {
fprintf(stderr, "handle_write inspects batch %d of %d (bytes left: %d)\n", i, cookie->batches, cookie->batch[i].bytesleft);
if( cookie->batch[i].bytesleft ) { if( cookie->batch[i].bytesleft ) {
int64 res = iob_send( sock, cookie->batch + i ); int64 res = iob_send( sock, cookie->batch + i );
if( res == -3 ) fprintf(stderr, "handle_write yields res %lld when trying to iob_send\n", res);
break; if( res == -3 ) {
handle_dead( sock );
return;
}
if( !cookie->batch[i].bytesleft ) if( !cookie->batch[i].bytesleft )
continue; continue;
@ -224,8 +233,17 @@ static void handle_write( const int64 sock ) {
if( res == -1 || res > 0 || i < cookie->batches - 1 ) if( res == -1 || res > 0 || i < cookie->batches - 1 )
return; return;
} }
}
}
handle_dead( sock ); /* In a chunked transfer after all batches accumulated have been sent, wait for the next one */
if( chunked ) {
//fprintf( stderr, "handle_write is STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER => dont want write on sock %lld\n", sock);
//io_dontwantwrite( sock );
} else {
fprintf( stderr, "handle_write is STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER => handle dead on sock %lld\n", sock);
handle_dead( sock );
}
} }
static void handle_accept( const int64 serversocket ) { static void handle_accept( const int64 serversocket ) {
@ -266,7 +284,7 @@ static void * server_mainloop( void * args ) {
struct ot_workstruct ws; struct ot_workstruct ws;
time_t next_timeout_check = g_now_seconds + OT_CLIENT_TIMEOUT_CHECKINTERVAL; time_t next_timeout_check = g_now_seconds + OT_CLIENT_TIMEOUT_CHECKINTERVAL;
struct iovec *iovector; struct iovec *iovector;
int iovec_entries; int iovec_entries, is_partial;
(void)args; (void)args;
@ -305,8 +323,8 @@ static void * server_mainloop( void * args ) {
handle_read( sock, &ws ); handle_read( sock, &ws );
} }
while( ( sock = mutex_workqueue_popresult( &iovec_entries, &iovector ) ) != -1 ) while( ( sock = mutex_workqueue_popresult( &iovec_entries, &iovector, &is_partial ) ) != -1 )
http_sendiovecdata( sock, &ws, iovec_entries, iovector ); http_sendiovecdata( sock, &ws, iovec_entries, iovector, is_partial );
while( ( sock = io_canwrite( ) ) != -1 ) while( ( sock = io_canwrite( ) ) != -1 )
handle_write( sock ); handle_write( sock );
@ -318,9 +336,6 @@ static void * server_mainloop( void * args ) {
} }
livesync_ticker(); livesync_ticker();
/* Enforce setting the clock */
signal_handler( SIGALRM );
} }
return 0; return 0;
} }

33
ot_fullscrape.c

@ -36,9 +36,9 @@
#define OT_SCRAPE_MAXENTRYLEN 256 #define OT_SCRAPE_MAXENTRYLEN 256
/* Forward declaration */ /* Forward declaration */
static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); static void fullscrape_make( int taskid, ot_tasktype mode);
#ifdef WANT_COMPRESSION_GZIP #ifdef WANT_COMPRESSION_GZIP
static void fullscrape_make_gzip( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); static void fullscrape_make_gzip( int taskid, ot_tasktype mode);
#endif #endif
/* Converter function from memory to human readable hex strings /* Converter function from memory to human readable hex strings
@ -49,9 +49,6 @@ static char*to_hex(char*d,uint8_t*s){char*m="0123456789ABCDEF";char *t=d;char*e=
It grabs tasks from mutex_tasklist and delivers results back It grabs tasks from mutex_tasklist and delivers results back
*/ */
static void * fullscrape_worker( void * args ) { static void * fullscrape_worker( void * args ) {
int iovec_entries;
struct iovec *iovector;
(void) args; (void) args;
while( g_opentracker_running ) { while( g_opentracker_running ) {
@ -59,12 +56,11 @@ static void * fullscrape_worker( void * args ) {
ot_taskid taskid = mutex_workqueue_poptask( &tasktype ); ot_taskid taskid = mutex_workqueue_poptask( &tasktype );
#ifdef WANT_COMPRESSION_GZIP #ifdef WANT_COMPRESSION_GZIP
if (tasktype & TASK_FLAG_GZIP) if (tasktype & TASK_FLAG_GZIP)
fullscrape_make_gzip( &iovec_entries, &iovector, tasktype ); fullscrape_make_gzip( taskid, tasktype );
else else
#endif #endif
fullscrape_make( &iovec_entries, &iovector, tasktype ); fullscrape_make( taskid, tasktype );
if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) ) mutex_workqueue_pushchunked( taskid, NULL );
iovec_free( &iovec_entries, &iovector );
} }
return NULL; return NULL;
} }
@ -123,14 +119,13 @@ static char * fullscrape_write_one( ot_tasktype mode, char *r, ot_torrent *torre
return r; return r;
} }
static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { static void fullscrape_make( int taskid, ot_tasktype mode ) {
int bucket; int bucket;
char *r, *re; char *r, *re;
struct iovec iovector = { NULL, 0 };
/* Setup return vector... */ /* Setup return vector... */
*iovec_entries = 0; r = iovector.iov_base = malloc( OT_SCRAPE_CHUNK_SIZE );
*iovector = NULL;
r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE );
if( !r ) if( !r )
return; return;
@ -152,8 +147,14 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
r = fullscrape_write_one( mode, r, torrents+i, &torrents[i].hash ); r = fullscrape_write_one( mode, r, torrents+i, &torrents[i].hash );
if( r > re) { if( r > re) {
iovector.iov_len = r - (char *)iovector.iov_base;
if (mutex_workqueue_pushchunked(taskid, &iovector) ) {
free(iovector.iov_base);
return mutex_bucket_unlock( bucket, 0 );
}
/* Allocate a fresh output buffer at the end of our buffers list */ /* Allocate a fresh output buffer at the end of our buffers list */
r = iovec_fix_increase_or_free( iovec_entries, iovector, r, OT_SCRAPE_CHUNK_SIZE ); r = iovector.iov_base = malloc( OT_SCRAPE_CHUNK_SIZE );
if( !r ) if( !r )
return mutex_bucket_unlock( bucket, 0 ); return mutex_bucket_unlock( bucket, 0 );
@ -174,7 +175,9 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
r += sprintf( r, "ee" ); r += sprintf( r, "ee" );
/* Release unused memory in current output buffer */ /* Release unused memory in current output buffer */
iovec_fixlast( iovec_entries, iovector, r ); iovector.iov_len = r - (char *)iovector.iov_base;
if( mutex_workqueue_pushchunked(taskid, &iovector) )
free(iovector.iov_base);
} }
#ifdef WANT_COMPRESSION_GZIP #ifdef WANT_COMPRESSION_GZIP

93
ot_http.c

@ -121,9 +121,10 @@ ssize_t http_issue_error( const int64 sock, struct ot_workstruct *ws, int code )
return ws->reply_size = -2; return ws->reply_size = -2;
} }
ssize_t http_sendiovecdata( const int64 sock, struct ot_workstruct *ws, int iovec_entries, struct iovec *iovector ) { ssize_t http_sendiovecdata( const int64 sock, struct ot_workstruct *ws, int iovec_entries, struct iovec *iovector, int is_partial ) {
struct http_data *cookie = io_getcookie( sock ); struct http_data *cookie = io_getcookie( sock );
char *header; char *header;
const char *encoding = "";
int i; int i;
size_t header_size, size = iovec_length( &iovec_entries, (const struct iovec **)&iovector ); size_t header_size, size = iovec_length( &iovec_entries, (const struct iovec **)&iovector );
tai6464 t; tai6464 t;
@ -140,54 +141,72 @@ ssize_t http_sendiovecdata( const int64 sock, struct ot_workstruct *ws, int iove
/* If we came here, wait for the answer is over */ /* If we came here, wait for the answer is over */
cookie->flag &= ~STRUCT_HTTP_FLAG_WAITINGFORTASK; cookie->flag &= ~STRUCT_HTTP_FLAG_WAITINGFORTASK;
/* Our answers never are 0 vectors. Return an error. */ fprintf(stderr, "http_sendiovecdata sending %d iovec entries found cookie->batch == %p\n", iovec_entries, cookie->batch);
if( !iovec_entries ) {
HTTPERROR_500;
}
/* Prepare space for http header */ if( iovec_entries ) {
header = malloc( SUCCESS_HTTP_HEADER_LENGTH + SUCCESS_HTTP_HEADER_LENGTH_CONTENT_ENCODING );
if( !header ) {
iovec_free( &iovec_entries, &iovector );
HTTPERROR_500;
}
if( cookie->flag & STRUCT_HTTP_FLAG_GZIP ) /* Prepare space for http header */
header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Encoding: gzip\r\nContent-Length: %zd\r\n\r\n", size ); header = malloc( SUCCESS_HTTP_HEADER_LENGTH + SUCCESS_HTTP_HEADER_LENGTH_CONTENT_ENCODING );
else if( cookie->flag & STRUCT_HTTP_FLAG_BZIP2 ) if( !header ) {
header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Encoding: bzip2\r\nContent-Length: %zd\r\n\r\n", size ); iovec_free( &iovec_entries, &iovector );
else HTTPERROR_500;
header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Length: %zd\r\n\r\n", size ); }
if (!cookie->batch ) { if( cookie->flag & STRUCT_HTTP_FLAG_GZIP )
cookie->batch = malloc( sizeof(io_batch) ); encoding = "Content-Encoding: gzip\r\n";
memset( cookie->batch, 0, sizeof(io_batch) ); else if( cookie->flag & STRUCT_HTTP_FLAG_BZIP2 )
cookie->batches = 1; encoding = "Content-Encoding: bzip2\r\n";
}
iob_addbuf_free( cookie->batch, header, header_size ); if( !(cookie->flag & STRUCT_HTTP_FLAG_CHUNKED) )
header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n%sContent-Length: %zd\r\n\r\n", encoding, size );
else {
if ( !(cookie->flag & STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER )) {
header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n%sTransfer-Encoding: chunked\r\n\r\n%zx\r\n", encoding, size );
cookie->flag |= STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER;
} else
header_size = sprintf( header, "%zx\r\n", size );
}
/* Split huge iovectors into separate io_batches */ if (!cookie->batch ) {
for( i=0; i<iovec_entries; ++i ) { cookie->batch = malloc( sizeof(io_batch) );
io_batch *current = cookie->batch + cookie->batches - 1; memset( cookie->batch, 0, sizeof(io_batch) );
cookie->batches = 1;
}
iob_addbuf_free( cookie->batch, header, header_size );
/* If the current batch's limit is reached, try to reallocate a new batch to work on */ /* Split huge iovectors into separate io_batches */
if( current->bytesleft > OT_BATCH_LIMIT ) { for( i=0; i<iovec_entries; ++i ) {
io_batch * new_batch = realloc( current, (cookie->batches + 1) * sizeof(io_batch) ); io_batch *current = cookie->batch + cookie->batches - 1;
/* If the current batch's limit is reached, try to reallocate a new batch to work on */
if( current->bytesleft > OT_BATCH_LIMIT ) {
fprintf(stderr, "http_sendiovecdata found batch above limit: %zd\n", current->bytesleft);
io_batch * new_batch = realloc( cookie->batch, (cookie->batches + 1) * sizeof(io_batch) );
if( new_batch ) { if( new_batch ) {
cookie->batches++; cookie->batch = new_batch;
current = cookie->batch = new_batch; current = cookie->batch + cookie->batches++;
memset( current, 0, sizeof(io_batch) ); memset( current, 0, sizeof(io_batch) );
} }
}
fprintf(stderr, "http_sendiovecdata calling iob_addbuf_free with %zd\n", iovector[i].iov_len);
iob_addbuf_free( current, iovector[i].iov_base, iovector[i].iov_len );
} }
free( iovector );
if ( cookie->flag & STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER )
iob_addbuf(cookie->batch + cookie->batches - 1, "\r\n", 2);
}
iob_addbuf_free( current, iovector[i].iov_base, iovector[i].iov_len ); if ((cookie->flag & STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER) && cookie->batch && !is_partial) {
fprintf(stderr, "http_sendiovecdata adds a terminating 0 size buffer to batch\n");
iob_addbuf(cookie->batch + cookie->batches - 1, "0\r\n\r\n", 5);
cookie->flag &= ~STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER;
} }
free( iovector );
/* writeable sockets timeout after 10 minutes */ /* writeable sockets timeout after 10 minutes */
taia_now( &t ); taia_addsec( &t, &t, OT_CLIENT_TIMEOUT_SEND ); taia_now( &t ); taia_addsec( &t, &t, OT_CLIENT_TIMEOUT_SEND );
io_timeout( sock, t ); io_timeout( sock, t );
io_dontwantread( sock ); io_dontwantread( sock );
fprintf (stderr, "http_sendiovecdata marks socket %lld as wantwrite\n", sock);
io_wantwrite( sock ); io_wantwrite( sock );
return 0; return 0;
} }
@ -254,7 +273,7 @@ static const ot_keywords keywords_format[] =
#endif #endif
#endif #endif
/* Pass this task to the worker thread */ /* Pass this task to the worker thread */
cookie->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK; cookie->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK | STRUCT_HTTP_FLAG_CHUNKED;
/* Clients waiting for us should not easily timeout */ /* Clients waiting for us should not easily timeout */
taia_uint( &t, 0 ); io_timeout( sock, t ); taia_uint( &t, 0 ); io_timeout( sock, t );
@ -278,7 +297,7 @@ static const ot_keywords keywords_format[] =
} }
#ifdef WANT_MODEST_FULLSCRAPES #ifdef WANT_MODEST_FULLSCRAPES
static pthread_mutex_t g_modest_fullscrape_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t g_modest_fullscrape_mutex = PTHREAD_MUTEX_INITIALIZER;
static ot_vector g_modest_fullscrape_timeouts; static ot_vector g_modest_fullscrape_timeouts;
typedef struct { ot_ip6 ip; ot_time last_fullscrape; } ot_scrape_log; typedef struct { ot_ip6 ip; ot_time last_fullscrape; } ot_scrape_log;
#endif #endif
@ -325,7 +344,7 @@ static ssize_t http_handle_fullscrape( const int64 sock, struct ot_workstruct *w
#endif #endif
/* Pass this task to the worker thread */ /* Pass this task to the worker thread */
cookie->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK; cookie->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK | STRUCT_HTTP_FLAG_CHUNKED;
/* Clients waiting for us should not easily timeout */ /* Clients waiting for us should not easily timeout */
taia_uint( &t, 0 ); io_timeout( sock, t ); taia_uint( &t, 0 ); io_timeout( sock, t );
fullscrape_deliver( sock, TASK_FULLSCRAPE | format ); fullscrape_deliver( sock, TASK_FULLSCRAPE | format );

10
ot_http.h

@ -7,9 +7,11 @@
#define OT_HTTP_H__ #define OT_HTTP_H__
typedef enum { typedef enum {
STRUCT_HTTP_FLAG_WAITINGFORTASK = 1, STRUCT_HTTP_FLAG_WAITINGFORTASK = 1,
STRUCT_HTTP_FLAG_GZIP = 2, STRUCT_HTTP_FLAG_GZIP = 2,
STRUCT_HTTP_FLAG_BZIP2 = 4 STRUCT_HTTP_FLAG_BZIP2 = 4,
STRUCT_HTTP_FLAG_CHUNKED = 8,
STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER = 16
} STRUCT_HTTP_FLAG; } STRUCT_HTTP_FLAG;
struct http_data { struct http_data {
@ -21,7 +23,7 @@ struct http_data {
}; };
ssize_t http_handle_request( const int64 s, struct ot_workstruct *ws ); ssize_t http_handle_request( const int64 s, struct ot_workstruct *ws );
ssize_t http_sendiovecdata( const int64 s, struct ot_workstruct *ws, int iovec_entries, struct iovec *iovector ); ssize_t http_sendiovecdata( const int64 s, struct ot_workstruct *ws, int iovec_entries, struct iovec *iovector, int is_partial );
ssize_t http_issue_error( const int64 s, struct ot_workstruct *ws, int code ); ssize_t http_issue_error( const int64 s, struct ot_workstruct *ws, int code );
extern char *g_stats_path; extern char *g_stats_path;

21
ot_iovec.c

@ -35,6 +35,26 @@ void *iovec_increase( int *iovec_entries, struct iovec **iovector, size_t new_al
return new_data; return new_data;
} }
void *iovec_append( int *iovec_entries, struct iovec **iovector, struct iovec *append_iovector) {
int new_entries = *iovec_entries + 1;
struct iovec *new_vec = realloc( *iovector, new_entries * sizeof( struct iovec ) );
if( !new_vec )
return NULL;
/* Take over data from appended iovec */
new_vec[*iovec_entries].iov_base = append_iovector->iov_base;
new_vec[*iovec_entries].iov_len = append_iovector->iov_len;
append_iovector->iov_base = NULL;
append_iovector->iov_len = 0;
*iovector = new_vec;
*iovec_entries = new_entries;
return new_vec;
}
void iovec_free( int *iovec_entries, struct iovec **iovector ) { void iovec_free( int *iovec_entries, struct iovec **iovector ) {
int i; int i;
for( i=0; i<*iovec_entries; ++i ) for( i=0; i<*iovec_entries; ++i )
@ -64,7 +84,6 @@ void *iovec_fix_increase_or_free( int *iovec_entries, struct iovec **iovector, v
return new_data; return new_data;
} }
size_t iovec_length( const int *iovec_entries, const struct iovec **iovector ) { size_t iovec_length( const int *iovec_entries, const struct iovec **iovector ) {
size_t length = 0; size_t length = 0;
int i; int i;

1
ot_iovec.h

@ -9,6 +9,7 @@
#include <sys/uio.h> #include <sys/uio.h>
void *iovec_increase( int *iovec_entries, struct iovec **iovector, size_t new_alloc ); void *iovec_increase( int *iovec_entries, struct iovec **iovector, size_t new_alloc );
void *iovec_append( int *iovec_entries, struct iovec **iovector, struct iovec *append_iovector );
void iovec_fixlast( int *iovec_entries, struct iovec **iovector, void *last_ptr ); void iovec_fixlast( int *iovec_entries, struct iovec **iovector, void *last_ptr );
void iovec_free( int *iovec_entries, struct iovec **iovector ); void iovec_free( int *iovec_entries, struct iovec **iovector );

54
ot_mutex.c

@ -17,6 +17,7 @@
/* Opentracker */ /* Opentracker */
#include "trackerlogic.h" #include "trackerlogic.h"
#include "ot_iovec.h"
#include "ot_mutex.h" #include "ot_mutex.h"
#include "ot_stats.h" #include "ot_stats.h"
@ -194,23 +195,66 @@ int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iove
return task ? 0 : -1; return task ? 0 : -1;
} }
int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovec ) { int mutex_workqueue_pushchunked(ot_taskid taskid, struct iovec *iovec) {
struct ot_task * task;
const char byte = 'o';
/* Want exclusive access to tasklist */
pthread_mutex_lock( &tasklist_mutex );
for (task = tasklist; task; task = task->next)
if (task->taskid == taskid) {
if( iovec ) {
fprintf(stderr, "mutex_workqueue_pushchunked pushing on taskid %d\n", taskid);
if (!iovec_append(&task->iovec_entries, &task->iovec, iovec) )
return -1;
task->tasktype = TASK_DONE_PARTIAL;
} else {
fprintf(stderr, "mutex_workqueue_pushchunked finished taskid %d\n", taskid);
task->tasktype = TASK_DONE;
}
break;
}
/* Release lock */
pthread_mutex_unlock( &tasklist_mutex );
io_trywrite( g_self_pipe[1], &byte, 1 );
if(!task)
fprintf(stderr, "mutex_workqueue_pushchunked taskid %d not found\n", taskid);
/* Indicate whether the worker has to throw away results */
return task ? 0 : -1;
}
int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovec, int *is_partial ) {
struct ot_task ** task; struct ot_task ** task;
int64 sock = -1; int64 sock = -1;
*is_partial = 0;
/* Want exclusive access to tasklist */ /* Want exclusive access to tasklist */
pthread_mutex_lock( &tasklist_mutex ); pthread_mutex_lock( &tasklist_mutex );
for (task = &tasklist; *task; task = &((*task)->next)) for (task = &tasklist; *task; task = &((*task)->next))
if ((*task)->tasktype == TASK_DONE) { if (((*task)->tasktype & TASK_CLASS_MASK ) == TASK_DONE) {
struct ot_task *ptask = *task; struct ot_task *ptask = *task;
fprintf(stderr, "Got task %d type %d with %d entries\n", (*task)->taskid, (*task)->tasktype, ptask->iovec_entries);
*iovec_entries = ptask->iovec_entries; *iovec_entries = ptask->iovec_entries;
*iovec = ptask->iovec; *iovec = ptask->iovec;
sock = ptask->sock; sock = ptask->sock;
*task = ptask->next; if ((*task)->tasktype == TASK_DONE) {
free( ptask ); *task = ptask->next;
free( ptask );
} else {
ptask->iovec_entries = 0;
ptask->iovec = NULL;
*is_partial = 1;
/* Prevent task from showing up immediately again unless new data was added */
(*task)->tasktype = TASK_FULLSCRAPE;
}
break; break;
} }

5
ot_mutex.h

@ -54,9 +54,11 @@ typedef enum {
TASK_DMEM = 0x0300, TASK_DMEM = 0x0300,
TASK_DONE = 0x0f00, TASK_DONE = 0x0f00,
TASK_DONE_PARTIAL = 0x0f01,
TASK_FLAG_GZIP = 0x1000, TASK_FLAG_GZIP = 0x1000,
TASK_FLAG_BZIP2 = 0x2000, TASK_FLAG_BZIP2 = 0x2000,
TASK_FLAG_CHUNKED = 0x4000,
TASK_TASK_MASK = 0x0fff, TASK_TASK_MASK = 0x0fff,
TASK_CLASS_MASK = 0x0f00, TASK_CLASS_MASK = 0x0f00,
@ -70,6 +72,7 @@ void mutex_workqueue_canceltask( int64 sock );
void mutex_workqueue_pushsuccess( ot_taskid taskid ); void mutex_workqueue_pushsuccess( ot_taskid taskid );
ot_taskid mutex_workqueue_poptask( ot_tasktype *tasktype ); ot_taskid mutex_workqueue_poptask( ot_tasktype *tasktype );
int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovector ); int mutex_workqueue_pushresult( ot_taskid taskid, int iovec_entries, struct iovec *iovector );
int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovector ); int mutex_workqueue_pushchunked(ot_taskid taskid, struct iovec *iovec);
int64 mutex_workqueue_popresult( int *iovec_entries, struct iovec ** iovector, int *is_partial );
#endif #endif

23
trackerlogic.c

@ -517,6 +517,29 @@ size_t peer_size_from_peer6(ot_peer6 *peer) {
return OT_PEER_SIZE4; return OT_PEER_SIZE4;
} }
void trackerlogic_add_random_torrents(size_t amount) {
struct ot_workstruct ws;
memset( &ws, 0, sizeof(ws) );
ws.inbuf=malloc(G_INBUF_SIZE);
ws.outbuf=malloc(G_OUTBUF_SIZE);
ws.reply=ws.outbuf;
ws.hash=ws.inbuf;
while( amount-- ) {
arc4random_buf(ws.hash, sizeof(ot_hash));
arc4random_buf(&ws.peer, sizeof(ws.peer));
OT_PEERFLAG(ws.peer) &= PEER_FLAG_SEEDING | PEER_FLAG_COMPLETED | PEER_FLAG_STOPPED;
add_peer_to_torrent_and_return_peers( FLAG_TCP, &ws, 1 );
}
free(ws.inbuf);
free(ws.outbuf);
}
void exerr( char * message ) { void exerr( char * message ) {
fprintf( stderr, "%s\n", message ); fprintf( stderr, "%s\n", message );
exit( 111 ); exit( 111 );

1
trackerlogic.h

@ -190,6 +190,7 @@ size_t remove_peer_from_torrent( PROTO_FLAG proto, struct ot_workstruct *ws );
size_t return_tcp_scrape_for_torrent( ot_hash const *hash_list, int amount, char *reply ); size_t return_tcp_scrape_for_torrent( ot_hash const *hash_list, int amount, char *reply );
size_t return_udp_scrape_for_torrent( ot_hash const hash, char *reply ); size_t return_udp_scrape_for_torrent( ot_hash const hash, char *reply );
void add_torrent_from_saved_state( ot_hash const hash, ot_time base, size_t down_count ); void add_torrent_from_saved_state( ot_hash const hash, ot_time base, size_t down_count );
void trackerlogic_add_random_torrents(size_t amount);
/* torrent iterator */ /* torrent iterator */
void iterate_all_torrents( int (*for_each)( ot_torrent* torrent, uintptr_t data ), uintptr_t data ); void iterate_all_torrents( int (*for_each)( ot_torrent* torrent, uintptr_t data ), uintptr_t data );

Loading…
Cancel
Save