Browse Source

Move more complicated stats code to its own thread

dynamic-accesslists
erdgeist 16 years ago
parent
commit
e89a8aaf58
  1. 155
      ot_stats.c
  2. 1
      ot_stats.h

155
ot_stats.c

@ -11,6 +11,7 @@ @@ -11,6 +11,7 @@
#include <sys/mman.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
/* Libowfat */
#include "byte.h"
@ -19,6 +20,7 @@ @@ -19,6 +20,7 @@
/* Opentracker */
#include "trackerlogic.h"
#include "ot_mutex.h"
#include "ot_iovec.h"
#include "ot_stats.h"
#ifndef NO_FULLSCRAPE_LOGGING
@ -27,6 +29,10 @@ @@ -27,6 +29,10 @@
#define LOG_TO_STDERR( ... )
#endif
/* Forward declaration */
static void stats_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode );
#define OT_STATS_TMPSIZE 8192
/* Clumsy counters... to be rethought */
static unsigned long long ot_overall_tcp_connections = 0;
static unsigned long long ot_overall_udp_connections = 0;
@ -153,43 +159,43 @@ static char*to_hex(char*d,uint8_t*s){char*m="0123456789ABCDEF";char *t=d;char*e= @@ -153,43 +159,43 @@ static char*to_hex(char*d,uint8_t*s){char*m="0123456789ABCDEF";char *t=d;char*e=
typedef struct { size_t val; ot_torrent * torrent; } ot_record;
/* Fetches stats from tracker */
size_t stats_top5_txt( char * reply ) {
size_t stats_top10_txt( char * reply ) {
size_t j;
ot_record top5s[5], top5c[5];
ot_record top10s[10], top10c[10];
char *r = reply, hex_out[42];
int idx, bucket;
byte_zero( top5s, sizeof( top5s ) );
byte_zero( top5c, sizeof( top5c ) );
byte_zero( top10s, sizeof( top10s ) );
byte_zero( top10c, sizeof( top10c ) );
for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
ot_vector *torrents_list = mutex_bucket_lock( bucket );
for( j=0; j<torrents_list->size; ++j ) {
ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[j] ).peer_list;
int idx = 4; while( (idx >= 0) && ( peer_list->peer_count > top5c[idx].val ) ) --idx;
if ( idx++ != 4 ) {
memmove( top5c + idx + 1, top5c + idx, ( 4 - idx ) * sizeof( ot_record ) );
top5c[idx].val = peer_list->peer_count;
top5c[idx].torrent = (ot_torrent*)(torrents_list->data) + j;
int idx = 9; while( (idx >= 0) && ( peer_list->peer_count > top10c[idx].val ) ) --idx;
if ( idx++ != 9 ) {
memmove( top10c + idx + 1, top10c + idx, ( 9 - idx ) * sizeof( ot_record ) );
top10c[idx].val = peer_list->peer_count;
top10c[idx].torrent = (ot_torrent*)(torrents_list->data) + j;
}
idx = 4; while( (idx >= 0) && ( peer_list->seed_count > top5s[idx].val ) ) --idx;
if ( idx++ != 4 ) {
memmove( top5s + idx + 1, top5s + idx, ( 4 - idx ) * sizeof( ot_record ) );
top5s[idx].val = peer_list->seed_count;
top5s[idx].torrent = (ot_torrent*)(torrents_list->data) + j;
idx = 9; while( (idx >= 0) && ( peer_list->seed_count > top10s[idx].val ) ) --idx;
if ( idx++ != 9 ) {
memmove( top10s + idx + 1, top10s + idx, ( 9 - idx ) * sizeof( ot_record ) );
top10s[idx].val = peer_list->seed_count;
top10s[idx].torrent = (ot_torrent*)(torrents_list->data) + j;
}
}
mutex_bucket_unlock( bucket );
}
r += sprintf( r, "Top5 torrents by peers:\n" );
for( idx=0; idx<5; ++idx )
if( top5c[idx].torrent )
r += sprintf( r, "\t%zd\t%s\n", top5c[idx].val, to_hex( hex_out, top5c[idx].torrent->hash) );
r += sprintf( r, "Top5 torrents by seeds:\n" );
for( idx=0; idx<5; ++idx )
if( top5s[idx].torrent )
r += sprintf( r, "\t%zd\t%s\n", top5s[idx].val, to_hex( hex_out, top5s[idx].torrent->hash) );
r += sprintf( r, "Top 10 torrents by peers:\n" );
for( idx=0; idx<10; ++idx )
if( top10c[idx].torrent )
r += sprintf( r, "\t%zd\t%s\n", top10c[idx].val, to_hex( hex_out, top10c[idx].torrent->hash) );
r += sprintf( r, "Top 10 torrents by seeds:\n" );
for( idx=0; idx<10; ++idx )
if( top10s[idx].torrent )
r += sprintf( r, "\t%zd\t%s\n", top10s[idx].val, to_hex( hex_out, top10s[idx].torrent->hash) );
return r - reply;
}
@ -284,6 +290,52 @@ bailout_cleanup: @@ -284,6 +290,52 @@ bailout_cleanup:
return 0;
}
/*
struct {
size_t size
size_t space
size_t count
}
*/
static ssize_t stats_vector_usage( char * reply ) {
size_t i, j, *vec_member;
char *r = reply;
int exactmatch, bucket;
ot_vector bucketsizes;
memset( &bucketsizes, 0, sizeof( bucketsizes ));
for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
ot_vector *torrents_list = mutex_bucket_lock( bucket );
for( i=0; i<torrents_list->size; ++i ) {
ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[i] ).peer_list;
for( j=0; j<OT_POOLS_COUNT; ++j ) {
if( ! ( vec_member = vector_find_or_insert(&bucketsizes, &peer_list->peers[j].size, 3 * sizeof( size_t ), 2 * sizeof(size_t), &exactmatch) ) ) {
mutex_bucket_unlock( bucket );
return 0;
}
if( !exactmatch ) {
vec_member[0] = peer_list->peers[j].size;
vec_member[1] = peer_list->peers[j].space;
vec_member[2] = 0;
} else
++vec_member[2];
}
}
mutex_bucket_unlock( bucket );
}
for( i = 0; i<bucketsizes.size; ++i ) {
r += sprintf( r, "%zd\t%zd\t%zd\n", ((size_t*)bucketsizes.data)[3*i], ((size_t*)bucketsizes.data)[3*i+1], ((size_t*)bucketsizes.data)[3*i+2] );
/* Prevent overflow. 8k should be enough for debugging */
if( r - reply > OT_STATS_TMPSIZE - 3*10+3 /* 3*%zd + 2*\t + \n */ )
break;
}
return r - reply;
}
static unsigned long events_per_time( unsigned long long events, time_t t ) {
return events / ( (unsigned int)t ? (unsigned int)t : 1 );
}
@ -454,7 +506,7 @@ size_t stats_return_tracker_version( char *reply ) { @@ -454,7 +506,7 @@ size_t stats_return_tracker_version( char *reply ) {
size_t return_stats_for_tracker( char *reply, int mode, int format ) {
format = format;
switch( mode ) {
switch( mode & TASK_TASK_MASK ) {
case TASK_STATS_CONNS:
return stats_connections_mrtg( reply );
case TASK_STATS_SCRAPE:
@ -463,18 +515,10 @@ size_t return_stats_for_tracker( char *reply, int mode, int format ) { @@ -463,18 +515,10 @@ size_t return_stats_for_tracker( char *reply, int mode, int format ) {
return stats_udpconnections_mrtg( reply );
case TASK_STATS_TCP:
return stats_tcpconnections_mrtg( reply );
case TASK_STATS_PEERS:
return stats_peers_mrtg( reply );
case TASK_STATS_TORRENTS:
return stats_torrents_mrtg( reply );
case TASK_STATS_TORADDREM:
return stats_toraddrem_mrtg( reply );
case TASK_STATS_STARTSTOP:
return stats_startstop_mrtg( reply );
case TASK_STATS_SLASH24S:
return stats_slash24s_txt( reply, 25, 16 );
case TASK_STATS_TOP5:
return stats_top5_txt( reply );
case TASK_STATS_FULLSCRAPE:
return stats_fullscrapes_mrtg( reply );
case TASK_STATS_HTTPERRORS:
@ -484,12 +528,37 @@ size_t return_stats_for_tracker( char *reply, int mode, int format ) { @@ -484,12 +528,37 @@ size_t return_stats_for_tracker( char *reply, int mode, int format ) {
#ifdef WANT_LOG_NETWORKS
case TASK_STATS_BUSY_NETWORKS:
return stats_return_busy_networks( reply );
#endif
#ifdef _DEBUG_VECTOR
case TASK_STATS_VECTOR_DEBUG:
return vector_info( reply );
#endif
default:
return 0;
}
}
static void stats_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) {
char *r;
*iovec_entries = 0;
*iovector = NULL;
if( !( r = iovec_increase( iovec_entries, iovector, OT_STATS_TMPSIZE ) ) )
return;
switch( mode & TASK_TASK_MASK ) {
case TASK_STATS_TORRENTS: r += stats_torrents_mrtg( r ); break;
case TASK_STATS_PEERS: r += stats_peers_mrtg( r ); break;
case TASK_STATS_SLASH24S: r += stats_slash24s_txt( r, 25, 16 ); break;
case TASK_STATS_TOP10: r += stats_top10_txt( r ); break;
case TASK_STATS_MEMORY: r += stats_vector_usage( r ); break;
default:
iovec_free(iovec_entries, iovector);
return;
}
iovec_fixlast( iovec_entries, iovector, r );
}
void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uint32_t event_data ) {
switch( event ) {
case EVENT_ACCEPT:
@ -537,12 +606,34 @@ void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uint32_t event_ @@ -537,12 +606,34 @@ void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uint32_t event_
}
}
static void * stats_worker( void * args ) {
int iovec_entries;
struct iovec *iovector;
args = args;
while( 1 ) {
ot_tasktype tasktype = TASK_STATS;
ot_taskid taskid = mutex_workqueue_poptask( &tasktype );
stats_make( &iovec_entries, &iovector, tasktype );
if( mutex_workqueue_pushresult( taskid, iovec_entries, iovector ) )
iovec_free( &iovec_entries, &iovector );
}
return NULL;
}
void stats_deliver( int64 socket, int tasktype ) {
mutex_workqueue_pushtask( socket, tasktype );
}
static pthread_t thread_id;
void stats_init( ) {
ot_start_time = g_now;
pthread_create( &thread_id, NULL, stats_worker, NULL );
}
void stats_deinit( ) {
pthread_cancel( thread_id );
}
const char *g_version_stats_c = "$Source$: $Revision$\n";

1
ot_stats.h

@ -35,6 +35,7 @@ enum { @@ -35,6 +35,7 @@ enum {
};
void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uint32_t event_data );
void stats_deliver( int64 socket, int tasktype );
size_t return_stats_for_tracker( char *reply, int mode, int format );
size_t stats_return_tracker_version( char *reply );
void stats_init( );

Loading…
Cancel
Save