From b1c8723609578b05b999f9cb58b9c90c4787f9d6 Mon Sep 17 00:00:00 2001 From: erdgeist <> Date: Thu, 22 Nov 2007 04:39:08 +0000 Subject: [PATCH] Introducing compression for fullscrapes and tpbs stats --- opentracker.c | 59 +++++++++++++++++++++++++++-------------- ot_fullscrape.c | 70 ++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 104 insertions(+), 25 deletions(-) diff --git a/opentracker.c b/opentracker.c index a2ada33..57c7e9a 100644 --- a/opentracker.c +++ b/opentracker.c @@ -43,6 +43,7 @@ /* Globals */ static const size_t SUCCESS_HTTP_HEADER_LENGTH = 80; +static const size_t SUCCESS_HTTP_HEADER_LENGHT_CONTENT_ENCODING = 32; static const size_t SUCCESS_HTTP_SIZE_OFF = 17; static uint32_t g_adminip_addresses[OT_ADMINIP_MAX]; static unsigned int g_adminip_count = 0; @@ -73,15 +74,14 @@ static size_t ot_sockets_count = 0; #ifdef _DEBUG_HTTPERROR static char debug_request[8192]; -#define _DEBUG_HTTPERROR_PARAM( param ) , param -#else -#define _DEBUG_HTTPERROR_PARAM( param ) #endif typedef enum { STRUCT_HTTP_FLAG_ARRAY_USED = 1, STRUCT_HTTP_FLAG_IOB_USED = 2, - STRUCT_HTTP_FLAG_WAITINGFORTASK = 4 + STRUCT_HTTP_FLAG_WAITINGFORTASK = 4, + STRUCT_HTTP_FLAG_GZIP = 8, + STRUCT_HTTP_FLAG_BZIP2 = 16 } STRUCT_HTTP_FLAG; struct http_data { @@ -100,7 +100,7 @@ static int ot_ip_compare( const void *a, const void *b ) { return memcmp( a,b,4 int main( int argc, char **argv ); static void httperror( const int64 s, const char *title, const char *message ); -static void httpresponse( const int64 s, char *data _DEBUG_HTTPERROR_PARAM(size_t l ) ); +static void httpresponse( const int64 s, char *data, size_t l ); static void sendiovecdata( const int64 s, int iovec_entries, struct iovec *iovector ); static void senddata( const int64 s, char *buffer, const size_t size ); @@ -162,7 +162,7 @@ static void sendiovecdata( const int64 s, int iovec_entries, struct iovec *iovec iovec_free( &iovec_entries, &iovector ); HTTPERROR_500; } - + /* If this socket collected request in a buffer, free it now */ if( h->flag & STRUCT_HTTP_FLAG_ARRAY_USED ) { @@ -173,20 +173,24 @@ static void sendiovecdata( const int64 s, int iovec_entries, struct iovec *iovec /* If we came here, wait for the answer is over */ h->flag &= ~STRUCT_HTTP_FLAG_WAITINGFORTASK; - /* Our answers never are 0 bytes. Return an error. */ - if( !iovec_entries || !iovector[0].iov_len ) { - iovec_free( &iovec_entries, &iovector ); + /* Our answers never are 0 vectors. Return an error. */ + if( !iovec_entries ) { HTTPERROR_500; } /* Prepare space for http header */ - header = malloc( SUCCESS_HTTP_HEADER_LENGTH ); + header = malloc( SUCCESS_HTTP_HEADER_LENGTH + SUCCESS_HTTP_HEADER_LENGHT_CONTENT_ENCODING ); if( !header ) { iovec_free( &iovec_entries, &iovector ); HTTPERROR_500; } - header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Length: %zd\r\n\r\n", size ); + if( h->flag & STRUCT_HTTP_FLAG_GZIP ) + header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Encoding: gzip\r\nContent-Length: %zd\r\n\r\n", size ); + else if( h->flag & STRUCT_HTTP_FLAG_BZIP2 ) + header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Encoding: bzip2\r\nContent-Length: %zd\r\n\r\n", size ); + else + header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Length: %zd\r\n\r\n", size ); iob_reset( &h->batch ); iob_addbuf_free( &h->batch, header, header_size ); @@ -241,9 +245,9 @@ static void senddata( const int64 s, char *buffer, size_t size ) { } } -static void httpresponse( const int64 s, char *data _DEBUG_HTTPERROR_PARAM( size_t l ) ) { +static void httpresponse( const int64 s, char *data, size_t l ) { struct http_data* h = io_getcookie( s ); - char *c; + char *c, *d=data; ot_peer peer; ot_torrent *torrent; ot_hash *hash = NULL; @@ -253,6 +257,9 @@ static void httpresponse( const int64 s, char *data _DEBUG_HTTPERROR_PARAM( size ssize_t len; size_t reply_size = 0, reply_off; + /* Touch l and d in case it is unused */ + l = l; d = d; + #ifdef _DEBUG_HTTPERROR if( l >= sizeof( debug_request ) ) l = sizeof( debug_request) - 1; @@ -379,6 +386,12 @@ LOG_TO_STDERR( "sync: %d.%d.%d.%d\n", h->ip[0], h->ip[1], h->ip[2], h->ip[3] ); } if( mode == TASK_STATS_TPB ) { +#ifdef WANT_COMPRESSION_GZIP + if( strnstr( d, "gzip", l ) ) { + h->flag |= STRUCT_HTTP_FLAG_GZIP; + format |= TASK_FLAG_GZIP; + } +#endif /* Pass this task to the worker thread */ h->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK; fullscrape_deliver( s, format ); @@ -403,9 +416,17 @@ LOG_TO_STDERR( "[%08d] scrp: %d.%d.%d.%d - FULL SCRAPE\n", (unsigned int)(g_now #ifdef _DEBUG_HTTPERROR write( 2, debug_request, l ); #endif + format = 0; +#ifdef WANT_COMPRESSION_GZIP + if( strnstr( d, "gzip", l ) ) { + h->flag |= STRUCT_HTTP_FLAG_GZIP; + format = TASK_FLAG_GZIP; + } +#endif + /* Pass this task to the worker thread */ h->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK; - fullscrape_deliver( s, TASK_FULLSCRAPE ); + fullscrape_deliver( s, TASK_FULLSCRAPE | format ); io_dontwantread( s ); return; } @@ -655,7 +676,7 @@ static void handle_read( const int64 clientsocket ) { /* If we get the whole request in one packet, handle it without copying */ if( !array_start( &h->request ) ) { if( memchr( static_inbuf, '\n', l ) ) - return httpresponse( clientsocket, static_inbuf _DEBUG_HTTPERROR_PARAM( l ) ); + return httpresponse( clientsocket, static_inbuf, l ); h->flag |= STRUCT_HTTP_FLAG_ARRAY_USED; return array_catb( &h->request, static_inbuf, l ); } @@ -670,7 +691,7 @@ static void handle_read( const int64 clientsocket ) { return httperror( clientsocket, "500 request too long", "You sent too much headers"); if( memchr( array_start( &h->request ), '\n', array_bytes( &h->request ) ) ) - return httpresponse( clientsocket, array_start( &h->request ) _DEBUG_HTTPERROR_PARAM( array_bytes( &h->request ) ) ); + return httpresponse( clientsocket, array_start( &h->request ), array_bytes( &h->request ) ); } static void handle_write( const int64 clientsocket ) { @@ -722,9 +743,9 @@ static void handle_timeouted( void ) { } static void server_mainloop( ) { - time_t next_timeout_check = g_now + OT_CLIENT_TIMEOUT_CHECKINTERVAL; - struct iovec *iovector; - int iovec_entries; + time_t next_timeout_check = g_now + OT_CLIENT_TIMEOUT_CHECKINTERVAL; + struct iovec *iovector; + int iovec_entries; for( ; ; ) { int64 i; diff --git a/ot_fullscrape.c b/ot_fullscrape.c index bb78b8a..d9c872e 100644 --- a/ot_fullscrape.c +++ b/ot_fullscrape.c @@ -7,8 +7,12 @@ #include #include #include +#ifdef WANT_COMPRESSION_GZIP +#include +#endif /* Libowfat */ +#include "byte.h" #include "textcode.h" /* Opentracker */ @@ -24,7 +28,7 @@ #define OT_SCRAPE_CHUNK_SIZE (512*1024) /* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */ -#define OT_FULLSCRAPE_MAXENTRYLEN 100 +#define OT_FULLSCRAPE_MAXENTRYLEN 256 /* Forward declaration */ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); @@ -66,8 +70,12 @@ void fullscrape_deliver( int64 socket, ot_tasktype tasktype ) { } static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { - int bucket; - char *r, *re; + int bucket; + char *r, *re; +#ifdef WANT_COMPRESSION_GZIP + char compress_buffer[OT_FULLSCRAPE_MAXENTRYLEN]; + z_stream strm; +#endif /* Setup return vector... */ *iovec_entries = 0; @@ -79,8 +87,21 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas This works as a low watermark */ re = r + OT_SCRAPE_CHUNK_SIZE; +#ifdef WANT_COMPRESSION_GZIP + if( mode & TASK_FLAG_GZIP ) { + byte_zero( &strm, sizeof(strm) ); + strm.next_in = (ot_byte*)r; + if( deflateInit2(&strm,9,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK ) + fprintf( stderr, "not ok.\n" ); + + strm.next_out = (unsigned char*)r; + strm.avail_out = OT_SCRAPE_CHUNK_SIZE; + r = compress_buffer; + } +#endif + /* Reply dictionary only needed for bencoded fullscrape */ - if( mode == TASK_FULLSCRAPE ) { + if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) { memmove( r, "d5:filesd", 9 ); r += 9; } @@ -97,7 +118,7 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[tor_offset] ).hash; - switch( mode ) { + switch( mode & TASK_TASK_MASK ) { case TASK_FULLSCRAPE: default: /* push hash as bencoded string */ @@ -122,6 +143,16 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas break; } +#ifdef WANT_COMPRESSION_GZIP + if( mode & TASK_FLAG_GZIP ) { + strm.next_in = (ot_byte*)compress_buffer; + strm.avail_in = r - compress_buffer; + if( deflate( &strm, Z_NO_FLUSH ) != Z_OK ) + fprintf( stderr, "Not ok.\n" ); + r = (char*)strm.next_out; + } +#endif + /* If we reached our low watermark in buffer... */ if( re - r <= OT_FULLSCRAPE_MAXENTRYLEN ) { @@ -134,6 +165,10 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas /* If this fails: free buffers */ iovec_free( iovec_entries, iovector ); +#ifdef WANT_COMPRESSION_GZIP + deflateEnd(&strm); +#endif + /* Release lock on current bucket and return */ mutex_bucket_unlock( bucket ); return; @@ -141,7 +176,19 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas /* Adjust new end of output buffer */ re = r + OT_SCRAPE_CHUNK_SIZE; + +#ifdef WANT_COMPRESSION_GZIP + if( mode & TASK_FLAG_GZIP ) { + strm.next_out = (ot_byte*)r; + strm.avail_out = OT_SCRAPE_CHUNK_SIZE; + } +#endif + } +#ifdef WANT_COMPRESSION_GZIP + if( mode & TASK_FLAG_GZIP ) { + r = compress_buffer; } +#endif } /* All torrents done: release lock on currenct bucket */ @@ -149,10 +196,21 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas } /* Close bencoded scrape dictionary if necessary */ - if( mode == TASK_FULLSCRAPE ) { + if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) { *r++='e'; *r++='e'; } +#ifdef WANT_COMPRESSION_GZIP + if( mode & TASK_FLAG_GZIP ) { + strm.next_in = (ot_byte*) compress_buffer; + strm.avail_in = r - compress_buffer; + if( deflate( &strm, Z_FINISH ) != Z_STREAM_END ) + fprintf( stderr, "Not ok.\n" ); + r = (char*)strm.next_out; + deflateEnd(&strm); + } +#endif + /* Release unused memory in current output buffer */ iovec_fixlast( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE - ( re - r ) ); }