
Split huge iovecs over multiple io_batches

Branch: dynamic-accesslists
Dirk Engling, 4 years ago
parent commit 95f1780f0b
  1. opentracker.c (27 changes)
  2. ot_fullscrape.c (2 changes)
  3. ot_http.c (36 changes)
  4. ot_http.h (3 changes)

opentracker.c (27 changes)

@@ -156,7 +156,10 @@ static size_t header_complete( char * request, ssize_t byte_count ) {
 static void handle_dead( const int64 sock ) {
   struct http_data* cookie=io_getcookie( sock );
   if( cookie ) {
-    iob_reset( &cookie->batch );
+    size_t i;
+    for( i = 0; i < cookie->batches; ++i )
+      iob_reset( cookie->batch + i );
+    free( cookie->batch );
     array_reset( &cookie->request );
     if( cookie->flag & STRUCT_HTTP_FLAG_WAITINGFORTASK )
       mutex_workqueue_canceltask( sock );
@@ -204,12 +207,22 @@ static void handle_read( const int64 sock, struct ot_workstruct *ws ) {
 static void handle_write( const int64 sock ) {
   struct http_data* cookie=io_getcookie( sock );
-  if( cookie ) {
-    int64 res = iob_send( sock, &cookie->batch );
-    if (res == 0 || res == -3)
-      handle_dead( sock );
-  } else
-    handle_dead( sock );
+  size_t i;
+
+  /* Look for the first io_batch still containing bytes to write */
+  if( cookie )
+    for( i = 0; i < cookie->batches; ++i )
+      if( cookie->batch[i].bytesleft ) {
+        int64 res = iob_send( sock, cookie->batch + i );
+
+        if( res == -3 )
+          break;
+
+        if( res == -1 || res > 0 || i < cookie->batches - 1 )
+          return;
+      }
+
+  handle_dead( sock );
 }

 static void handle_accept( const int64 serversocket ) {
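The return codes tested in handle_write() follow libowfat's io_batch conventions: iob_send() returns the number of bytes written when it made partial progress, 0 once the batch has been sent completely, -1 when the socket would block, and -3 on a permanent error. A minimal sketch of the same drain logic in isolation, assuming only libowfat's headers and those documented return values (drain_batches and drain_result are hypothetical names, not part of this commit):

    #include "io.h"
    #include "iob.h"

    enum drain_result { DRAIN_DONE, DRAIN_AGAIN, DRAIN_ERROR };

    /* Hypothetical helper mirroring handle_write's loop over the batch array */
    static enum drain_result drain_batches( int64 sock, io_batch *batch, size_t batches ) {
      size_t i;
      for( i = 0; i < batches; ++i )
        if( batch[i].bytesleft ) {
          int64 res = iob_send( sock, batch + i );
          if( res == -3 )
            return DRAIN_ERROR;   /* permanent error: caller should close the socket */
          if( res == -1 || res > 0 )
            return DRAIN_AGAIN;   /* would block or partial write: wait for next writeable event */
          if( i < batches - 1 )
            return DRAIN_AGAIN;   /* this batch done, later batches still pending */
        }
      return DRAIN_DONE;          /* everything sent */
    }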

ot_fullscrape.c (2 changes)

@@ -191,7 +191,7 @@ static void fullscrape_make_gzip( int *iovec_entries, struct iovec **iovector, o
   byte_zero( &strm, sizeof(strm) );
   strm.next_out = (uint8_t*)r;
   strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
-  if( deflateInit2(&strm,7,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK )
+  if( deflateInit2(&strm,7,Z_DEFLATED,31,9,Z_DEFAULT_STRATEGY) != Z_OK )
     fprintf( stderr, "not ok.\n" );

   if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) {
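The only change here is zlib's memLevel argument, raised from 8 to its maximum of 9, which gives deflate more memory for its internal compression state. For reference, the parameters of this call per the zlib manual:

    #include <zlib.h>

    /* The call above with zlib's parameter names spelled out */
    deflateInit2( &strm,
                  7,                   /* level: 0..9, here just below Z_BEST_COMPRESSION */
                  Z_DEFLATED,          /* method: the only one zlib supports */
                  31,                  /* windowBits: 15 (32 KB window) + 16 selects a gzip wrapper */
                  9,                   /* memLevel: 1..9, was 8, now the maximum */
                  Z_DEFAULT_STRATEGY );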

ot_http.c (36 changes)

@@ -31,6 +31,7 @@
 #include "ot_accesslist.h"

 #define OT_MAXMULTISCRAPE_COUNT 64
+#define OT_BATCH_LIMIT (1024*1024*16)
 extern char *g_redirecturl;

 char *g_stats_path;
@@ -75,7 +76,13 @@ static void http_senddata( const int64 sock, struct ot_workstruct *ws ) {
   }

   memcpy( outbuf, ws->reply + written_size, ws->reply_size - written_size );
-  iob_addbuf_free( &cookie->batch, outbuf, ws->reply_size - written_size );
+  if( !cookie->batch ) {
+    cookie->batch = malloc( sizeof(io_batch) );
+    memset( cookie->batch, 0, sizeof(io_batch) );
+    cookie->batches = 1;
+  }
+
+  iob_addbuf_free( cookie->batch, outbuf, ws->reply_size - written_size );

   /* writeable short data sockets just have a tcp timeout */
   if( !ws->keep_alive ) {
@@ -152,12 +159,29 @@ ssize_t http_sendiovecdata( const int64 sock, struct ot_workstruct *ws, int iove
   else
     header_size = sprintf( header, "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\nContent-Length: %zd\r\n\r\n", size );

-  iob_reset( &cookie->batch );
-  iob_addbuf_free( &cookie->batch, header, header_size );
+  if( !cookie->batch ) {
+    cookie->batch = malloc( sizeof(io_batch) );
+    memset( cookie->batch, 0, sizeof(io_batch) );
+    cookie->batches = 1;
+  }
+  iob_addbuf_free( cookie->batch, header, header_size );

-  /* Will move to ot_iovec.c */
-  for( i=0; i<iovec_entries; ++i )
-    iob_addbuf_munmap( &cookie->batch, iovector[i].iov_base, iovector[i].iov_len );
+  /* Split huge iovectors into separate io_batches */
+  for( i=0; i<iovec_entries; ++i ) {
+    io_batch *current = cookie->batch + cookie->batches - 1;
+
+    /* If the current batch's limit is reached, try to reallocate a new batch to work on */
+    if( current->bytesleft > OT_BATCH_LIMIT ) {
+      io_batch *new_batch = realloc( cookie->batch, ( cookie->batches + 1 ) * sizeof(io_batch) );
+      if( new_batch ) {
+        cookie->batch = new_batch;
+        current = cookie->batch + cookie->batches++;
+        memset( current, 0, sizeof(io_batch) );
+      }
+    }
+
+    iob_addbuf_munmap( current, iovector[i].iov_base, iovector[i].iov_len );
+  }
   free( iovector );

   /* writeable sockets timeout after 10 minutes */
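Two details of the split loop are easy to miss. First, buffers are always appended to the last batch, and a new zeroed batch is appended only once the last one already holds more than OT_BATCH_LIMIT (1024*1024*16 bytes, i.e. 16 MiB) of unsent data, so an N-byte fullscrape ends up spread over roughly N / 16 MiB batches. Second, realloc() must be handed the base pointer of the allocation, which is why the whole cookie->batch array is reallocated and current is recomputed from the new base. A hypothetical helper isolating that growth rule (not part of this commit):

    /* Hypothetical: return the batch that should receive the next buffer,
       growing the array by one zeroed io_batch when the last one is over
       the limit. On realloc failure the old last batch keeps filling up,
       which is safe, just unbounded. */
    static io_batch *current_batch( io_batch **batch, size_t *batches ) {
      io_batch *last = *batch + *batches - 1;
      if( last->bytesleft > OT_BATCH_LIMIT ) {
        io_batch *grown = realloc( *batch, ( *batches + 1 ) * sizeof(io_batch) );
        if( grown ) {
          *batch = grown;
          last = grown + ( *batches )++;
          memset( last, 0, sizeof(io_batch) );
        }
      }
      return last;
    }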

ot_http.h (3 changes)

@@ -14,7 +14,8 @@ typedef enum {

 struct http_data {
   array request;
-  io_batch batch;
+  io_batch *batch;
+  size_t batches;
   ot_ip6 ip;
   STRUCT_HTTP_FLAG flag;
 };
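This struct change is the pivot of the whole commit: batch goes from an embedded io_batch to a pointer plus a count, with a NULL pointer meaning "nothing allocated yet" and batches only meaningful once it is non-NULL. The two identical lazy-init blocks in ot_http.c could be captured in a helper along these lines (hypothetical, not in the commit; it also adds the malloc check the inline copies omit):

    #include <stdlib.h>
    #include <string.h>

    /* Hypothetical: fetch the first io_batch of a connection, allocating
       and zeroing it on first use. Returns NULL if out of memory. */
    static io_batch *batch_init_lazy( struct http_data *cookie ) {
      if( !cookie->batch ) {
        cookie->batch = malloc( sizeof(io_batch) );
        if( !cookie->batch )
          return NULL;
        memset( cookie->batch, 0, sizeof(io_batch) );
        cookie->batches = 1;
      }
      return cookie->batch;
    }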
