Browse Source

The BIG refactoring [tm]. Too many changes to count them. If it doesn't suite you, revert to last version.

dynamic-accesslists
erdgeist 16 years ago
parent
commit
334c6e4bbb
  1. 7
      Makefile
  2. 42
      opentracker.c
  3. 11
      opentracker.xcodeproj/project.pbxproj
  4. 2
      ot_accesslist.c
  5. 7
      ot_accesslist.h
  6. 148
      ot_clean.c
  7. 8
      ot_clean.h
  8. 1
      ot_fullscrape.c
  9. 62
      ot_http.c
  10. 2
      ot_iovec.h
  11. 19
      ot_livesync.c
  12. 4
      ot_livesync.h
  13. 2
      ot_mutex.c
  14. 13
      ot_mutex.h
  15. 71
      ot_stats.c
  16. 4
      ot_stats.h
  17. 2
      ot_udp.c
  18. 239
      ot_vector.c
  19. 20
      ot_vector.h
  20. 12
      tests/testsuite.sh
  21. 2
      tests/testsuite2.sh
  22. 291
      trackerlogic.c
  23. 32
      trackerlogic.h

7
Makefile

@ -22,16 +22,13 @@ BINDIR?=$(PREFIX)/bin @@ -22,16 +22,13 @@ BINDIR?=$(PREFIX)/bin
#FEATURES+=-DWANT_ACCESSLIST_BLACK
#FEATURES+=-DWANT_ACCESSLIST_WHITE
#FEATURES+=-DWANT_SYNC_BATCH
#FEATURES+=-DWANT_SYNC_LIVE
#FEATURES+=-DWANT_UTORRENT1600_WORKAROUND
#FEATURES+=-DWANT_IP_FROM_QUERY_STRING
#FEATURES+=-DWANT_COMPRESSION_GZIP
#FEATURES+=-DWANT_LOG_NETWORKS
#FEATURES+=-DWANT_RESTRICT_STATS
#FEATURES+=-D_DEBUG_HTTPERROR
#FEATURES+=-D_DEBUG_VECTOR
FEATURES+=-DWANT_FULLSCRAPE
@ -42,8 +39,8 @@ CFLAGS+=-I$(LIBOWFAT_HEADERS) -Wall -pipe -Wextra #-pedantic -ansi @@ -42,8 +39,8 @@ CFLAGS+=-I$(LIBOWFAT_HEADERS) -Wall -pipe -Wextra #-pedantic -ansi
LDFLAGS+=-L$(LIBOWFAT_LIBRARY) -lowfat -pthread -lz
BINARY =opentracker
HEADERS=trackerlogic.h scan_urlencoded_query.h ot_mutex.h ot_stats.h ot_sync.h ot_vector.h ot_clean.h ot_udp.h ot_iovec.h ot_fullscrape.h ot_accesslist.h ot_http.h ot_livesync.h
SOURCES=opentracker.c trackerlogic.c scan_urlencoded_query.c ot_mutex.c ot_stats.c ot_sync.c ot_vector.c ot_clean.c ot_udp.c ot_iovec.c ot_fullscrape.c ot_accesslist.c ot_http.c ot_livesync.c
HEADERS=trackerlogic.h scan_urlencoded_query.h ot_mutex.h ot_stats.h ot_vector.h ot_clean.h ot_udp.h ot_iovec.h ot_fullscrape.h ot_accesslist.h ot_http.h ot_livesync.h
SOURCES=opentracker.c trackerlogic.c scan_urlencoded_query.c ot_mutex.c ot_stats.c ot_vector.c ot_clean.c ot_udp.c ot_iovec.c ot_fullscrape.c ot_accesslist.c ot_http.c ot_livesync.c
OBJECTS = $(SOURCES:%.c=%.o)
OBJECTS_debug = $(SOURCES:%.c=%.debug.o)

42
opentracker.c

@ -5,42 +5,36 @@ @@ -5,42 +5,36 @@
$Id$ */
/* System */
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <pwd.h>
#include <ctype.h>
#include <arpa/inet.h>
/* Libowfat */
#include "socket.h"
#include "io.h"
#include "iob.h"
#include "array.h"
#include "byte.h"
#include "fmt.h"
#include "scan.h"
#include "ip4.h"
/* Opentracker */
#include "trackerlogic.h"
#include "ot_iovec.h"
#include "ot_mutex.h"
#include "ot_http.h"
#include "ot_udp.h"
#include "ot_clean.h"
#include "ot_accesslist.h"
#include "ot_stats.h"
#include "ot_livesync.h"
/* Globals */
time_t g_now;
time_t g_now_seconds;
char * g_redirecturl = NULL;
uint32_t g_tracker_id;
@ -61,7 +55,7 @@ static void signal_handler( int s ) { @@ -61,7 +55,7 @@ static void signal_handler( int s ) {
trackerlogic_deinit();
exit( 0 );
} else if( s == SIGALRM ) {
g_now = time(NULL);
g_now_seconds = time(NULL);
alarm(5);
}
}
@ -135,7 +129,7 @@ static ssize_t handle_read( const int64 clientsocket ) { @@ -135,7 +129,7 @@ static ssize_t handle_read( const int64 clientsocket ) {
if( array_failed( &h->request ) )
return http_issue_error( clientsocket, CODE_HTTPERROR_500 );
if( ( array_bytes( &h->request ) > 8192 ) && !accesslist_isblessed( (char*)&h->ip, OT_PERMISSION_MAY_SYNC ) )
if( array_bytes( &h->request ) > 8192 )
return http_issue_error( clientsocket, CODE_HTTPERROR_500 );
if( memchr( array_start( &h->request ), '\n', array_bytes( &h->request ) ) )
@ -178,7 +172,7 @@ static void handle_accept( const int64 serversocket ) { @@ -178,7 +172,7 @@ static void handle_accept( const int64 serversocket ) {
/* That breaks taia encapsulation. But there is no way to take system
time this often in FreeBSD and libowfat does not allow to set unix time */
taia_uint( &t, 0 ); /* Clear t */
tai_unix( &(t.sec), (g_now + OT_CLIENT_TIMEOUT) );
tai_unix( &(t.sec), (g_now_seconds + OT_CLIENT_TIMEOUT) );
io_timeout( i, t );
}
@ -187,8 +181,7 @@ static void handle_accept( const int64 serversocket ) { @@ -187,8 +181,7 @@ static void handle_accept( const int64 serversocket ) {
}
static void server_mainloop( ) {
static time_t ot_last_clean_time;
time_t next_timeout_check = g_now + OT_CLIENT_TIMEOUT_CHECKINTERVAL;
time_t next_timeout_check = g_now_seconds + OT_CLIENT_TIMEOUT_CHECKINTERVAL;
struct iovec *iovector;
int iovec_entries;
@ -213,20 +206,14 @@ static void server_mainloop( ) { @@ -213,20 +206,14 @@ static void server_mainloop( ) {
while( ( i = io_canwrite( ) ) != -1 )
handle_write( i );
if( g_now > next_timeout_check ) {
if( g_now_seconds > next_timeout_check ) {
while( ( i = io_timeouted() ) != -1 )
handle_dead( i );
next_timeout_check = g_now + OT_CLIENT_TIMEOUT_CHECKINTERVAL;
next_timeout_check = g_now_seconds + OT_CLIENT_TIMEOUT_CHECKINTERVAL;
}
livesync_ticker();
/* See if we need to move our pools */
if( NOW != ot_last_clean_time ) {
ot_last_clean_time = NOW;
clean_all_torrents();
}
/* Enforce setting the clock */
signal_handler( SIGALRM );
}
@ -266,7 +253,7 @@ char * set_config_option( char **option, char *value ) { @@ -266,7 +253,7 @@ char * set_config_option( char **option, char *value ) {
fprintf( stderr, "Setting config option: %s\n", value );
#endif
while( isspace(*value) ) ++value;
if( *option ) free( *option );
free( *option );
return *option = strdup( value );
}
@ -342,11 +329,6 @@ int parse_configfile( char * config_filename ) { @@ -342,11 +329,6 @@ int parse_configfile( char * config_filename ) {
#endif
} else if(!byte_diff(p, 20, "tracker.redirect_url" ) && isspace(p[20])) {
set_config_option( &g_redirecturl, p+21 );
#ifdef WANT_SYNC_BATCH
} else if(!byte_diff(p, 26, "batchsync.cluster.admin_ip" ) && isspace(p[26])) {
if(!scan_ip4( p+27, tmpip )) goto parse_error;
accesslist_blessip( tmpip, OT_PERMISSION_MAY_SYNC );
#endif
#ifdef WANT_SYNC_LIVE
} else if(!byte_diff(p, 24, "livesync.cluster.node_ip" ) && isspace(p[24])) {
if( !scan_ip4( p+25, tmpip )) goto parse_error;
@ -408,7 +390,7 @@ while( scanon ) { @@ -408,7 +390,7 @@ while( scanon ) {
break;
case 'f': bound += parse_configfile( optarg ); break;
case 'h': help( argv[0] ); exit( 0 );
case 'v': write( 2, static_inbuf, stats_return_tracker_version( static_inbuf )); exit( 0 );
case 'v': stats_return_tracker_version( static_inbuf ); fputs( static_inbuf, stderr ); exit( 0 );
default:
case '?': usage( argv[0] ); exit( 1 );
}
@ -435,7 +417,7 @@ while( scanon ) { @@ -435,7 +417,7 @@ while( scanon ) {
signal( SIGINT, signal_handler );
signal( SIGALRM, signal_handler );
g_now = time( NULL );
g_now_seconds = time( NULL );
if( trackerlogic_init( g_serverdir ? g_serverdir : "." ) == -1 )
panic( "Logic not started" );

11
opentracker.xcodeproj/project.pbxproj

@ -14,7 +14,6 @@ @@ -14,7 +14,6 @@
654A808A0CD832FD009035DE /* scan_urlencoded_query.c in Sources */ = {isa = PBXBuildFile; fileRef = 654A80850CD832FC009035DE /* scan_urlencoded_query.c */; };
654A808B0CD832FD009035DE /* trackerlogic.c in Sources */ = {isa = PBXBuildFile; fileRef = 654A80870CD832FC009035DE /* trackerlogic.c */; };
65542D8B0CE078E800469330 /* ot_vector.c in Sources */ = {isa = PBXBuildFile; fileRef = 65542D8A0CE078E800469330 /* ot_vector.c */; };
65542D8E0CE07BA900469330 /* ot_sync.c in Sources */ = {isa = PBXBuildFile; fileRef = 65542D8D0CE07BA900469330 /* ot_sync.c */; };
65542D930CE07CED00469330 /* ot_mutex.c in Sources */ = {isa = PBXBuildFile; fileRef = 65542D8F0CE07CED00469330 /* ot_mutex.c */; };
65542D940CE07CED00469330 /* ot_stats.c in Sources */ = {isa = PBXBuildFile; fileRef = 65542D910CE07CED00469330 /* ot_stats.c */; };
65542E750CE08B9100469330 /* ot_clean.c in Sources */ = {isa = PBXBuildFile; fileRef = 65542E740CE08B9100469330 /* ot_clean.c */; };
@ -53,8 +52,6 @@ @@ -53,8 +52,6 @@
654A80880CD832FC009035DE /* trackerlogic.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = trackerlogic.h; sourceTree = "<group>"; };
65542D890CE078E800469330 /* ot_vector.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = ot_vector.h; sourceTree = "<group>"; };
65542D8A0CE078E800469330 /* ot_vector.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; path = ot_vector.c; sourceTree = "<group>"; };
65542D8C0CE07BA900469330 /* ot_sync.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = ot_sync.h; sourceTree = "<group>"; };
65542D8D0CE07BA900469330 /* ot_sync.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; path = ot_sync.c; sourceTree = "<group>"; };
65542D8F0CE07CED00469330 /* ot_mutex.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; path = ot_mutex.c; sourceTree = "<group>"; };
65542D900CE07CED00469330 /* ot_mutex.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = ot_mutex.h; sourceTree = "<group>"; };
65542D910CE07CED00469330 /* ot_stats.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; path = ot_stats.c; sourceTree = "<group>"; };
@ -114,7 +111,6 @@ @@ -114,7 +111,6 @@
653A56B40CE28EC5000CF140 /* ot_iovec.c */,
65542D8F0CE07CED00469330 /* ot_mutex.c */,
65542D910CE07CED00469330 /* ot_stats.c */,
65542D8D0CE07BA900469330 /* ot_sync.c */,
65542EE70CE0CA6B00469330 /* ot_udp.c */,
65542D8A0CE078E800469330 /* ot_vector.c */,
654A80850CD832FC009035DE /* scan_urlencoded_query.c */,
@ -144,9 +140,8 @@ @@ -144,9 +140,8 @@
653A56B30CE28EC5000CF140 /* ot_iovec.h */,
65542D900CE07CED00469330 /* ot_mutex.h */,
65542D920CE07CED00469330 /* ot_stats.h */,
65542D8C0CE07BA900469330 /* ot_sync.h */,
65542EE60CE0CA6B00469330 /* ot_udp.h */,
65542D890CE078E800469330 /* ot_vector.h */,
65542EE60CE0CA6B00469330 /* ot_udp.h */,
654A80860CD832FC009035DE /* scan_urlencoded_query.h */,
654A80880CD832FC009035DE /* trackerlogic.h */,
);
@ -244,7 +239,6 @@ @@ -244,7 +239,6 @@
654A808A0CD832FD009035DE /* scan_urlencoded_query.c in Sources */,
654A808B0CD832FD009035DE /* trackerlogic.c in Sources */,
65542D8B0CE078E800469330 /* ot_vector.c in Sources */,
65542D8E0CE07BA900469330 /* ot_sync.c in Sources */,
65542D930CE07CED00469330 /* ot_mutex.c in Sources */,
65542D940CE07CED00469330 /* ot_stats.c in Sources */,
65542E750CE08B9100469330 /* ot_clean.c in Sources */,
@ -282,6 +276,7 @@ @@ -282,6 +276,7 @@
isa = XCBuildConfiguration;
buildSettings = {
DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym";
GCC_ENABLE_FIX_AND_CONTINUE = YES;
GCC_MODEL_TUNING = G5;
INSTALL_PATH = /usr/local/bin;
LIBRARY_SEARCH_PATHS = (
@ -295,6 +290,7 @@ @@ -295,6 +290,7 @@
1DEB928A08733DD80010E9CD /* Debug */ = {
isa = XCBuildConfiguration;
buildSettings = {
GCC_PREPROCESSOR_DEFINITIONS = WANT_IP_FROM_QUERY_STRING;
GCC_WARN_ABOUT_RETURN_TYPE = YES;
GCC_WARN_UNUSED_VARIABLE = YES;
HEADER_SEARCH_PATHS = ../libowfat/;
@ -309,6 +305,7 @@ @@ -309,6 +305,7 @@
buildSettings = {
ARCHS = ppc;
DEAD_CODE_STRIPPING = NO;
GCC_PREPROCESSOR_DEFINITIONS = WANT_IP_FROM_QUERY_STRING;
GCC_WARN_ABOUT_RETURN_TYPE = YES;
GCC_WARN_UNUSED_VARIABLE = YES;
HEADER_SEARCH_PATHS = ../libowfat/;

2
ot_accesslist.c

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
/* Opentracker */
#include "trackerlogic.h"
#include "ot_accesslist.h"
#include "ot_vector.h"
/* GLOBAL VARIABLES */
#ifdef WANT_ACCESSLIST
@ -110,7 +111,6 @@ int accesslist_blessip( char *ip, ot_permissions permissions ) { @@ -110,7 +111,6 @@ int accesslist_blessip( char *ip, ot_permissions permissions ) {
uint8_t *_ip = (uint8_t*)ip;
fprintf( stderr, "Blessing ip address %d.%d.%d.%d with:", _ip[0], _ip[1], _ip[2], _ip[3]);
if( permissions & OT_PERMISSION_MAY_STAT ) fputs( " may_fetch_stats", stderr );
if( permissions & OT_PERMISSION_MAY_SYNC ) fputs( " may_sync_batch", stderr );
if( permissions & OT_PERMISSION_MAY_LIVESYNC ) fputs( " may_sync_live", stderr );
if( permissions & OT_PERMISSION_MAY_FULLSCRAPE ) fputs( " may_fetch_fullscrapes", stderr );
if( !permissions ) fputs(" nothing.\n", stderr); else fputs(".\n", stderr );

7
ot_accesslist.h

@ -7,7 +7,7 @@ @@ -7,7 +7,7 @@
#define __OT_ACCESSLIST_H__
#if defined ( WANT_ACCESSLIST_BLACK ) && defined (WANT_ACCESSLIST_WHITE )
#error WANT_ACCESSLIST_BLACK and WANT_ACCESSLIST_WHITE are exclusive.
# error WANT_ACCESSLIST_BLACK and WANT_ACCESSLIST_WHITE are exclusive.
#endif
#if defined ( WANT_ACCESSLIST_BLACK ) || defined (WANT_ACCESSLIST_WHITE )
@ -24,9 +24,8 @@ extern char *g_accesslist_filename; @@ -24,9 +24,8 @@ extern char *g_accesslist_filename;
typedef enum {
OT_PERMISSION_MAY_FULLSCRAPE = 0x1,
OT_PERMISSION_MAY_SYNC = 0x2,
OT_PERMISSION_MAY_STAT = 0x4,
OT_PERMISSION_MAY_LIVESYNC = 0x8
OT_PERMISSION_MAY_STAT = 0x2,
OT_PERMISSION_MAY_LIVESYNC = 0x4
} ot_permissions;
int accesslist_blessip( char * ip, ot_permissions permissions );

148
ot_clean.c

@ -7,29 +7,53 @@ @@ -7,29 +7,53 @@
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/uio.h>
#include <unistd.h>
#include <stdint.h>
/* Libowfat */
#include "byte.h"
#include "io.h"
/* Opentracker */
#include "trackerlogic.h"
#include "ot_mutex.h"
#include "ot_vector.h"
#include "ot_clean.h"
/* Returns amount of removed peers */
static ssize_t clean_single_bucket( ot_peer *peers, size_t peer_count, time_t timedout, int *removed_seeders ) {
ot_peer *last_peer = peers + peer_count, *insert_point;
time_t timediff;
/* Two scan modes: unless there is one peer removed, just increase ot_peertime */
while( peers < last_peer ) {
if( ( timediff = timedout + OT_PEERTIME( peers ) ) >= OT_PEER_TIMEOUT )
break;
OT_PEERTIME( peers++ ) = timediff;
}
/* If we at least remove one peer, we have to copy */
insert_point = peers;
while( peers < last_peer )
if( ( timediff = timedout + OT_PEERTIME( peers ) ) < OT_PEER_TIMEOUT ) {
OT_PEERTIME( peers ) = timediff;
*(uint64_t*)(insert_point++) = *(uint64_t*)(peers++);
} else
if( OT_FLAG( peers++ ) & PEER_FLAG_SEEDING )
(*removed_seeders)++;
return peers - insert_point;
}
/* Clean a single torrent
return 1 if torrent timed out
*/
int clean_single_torrent( ot_torrent *torrent ) {
ot_peerlist *peer_list = torrent->peer_list;
size_t peers_count = 0, seeds_count;
time_t timedout = (int)( NOW - peer_list->base );
int i;
#ifdef WANT_SYNC_BATCH
char *new_peers;
#endif
ot_vector *bucket_list = &peer_list->peers;
time_t timedout = (time_t)( g_now_minutes - peer_list->base );
int num_buckets = 1, removed_seeders = 0;
/* No need to clean empty torrent */
if( !timedout )
return 0;
@ -38,97 +62,67 @@ int clean_single_torrent( ot_torrent *torrent ) { @@ -38,97 +62,67 @@ int clean_single_torrent( ot_torrent *torrent ) {
return 1;
/* Nothing to be cleaned here? Test if torrent is worth keeping */
if( timedout > OT_POOLS_COUNT ) {
if( timedout > OT_PEER_TIMEOUT ) {
if( !peer_list->peer_count )
return peer_list->down_count ? 0 : 1;
timedout = OT_POOLS_COUNT;
timedout = OT_PEER_TIMEOUT;
}
/* Release vectors that have timed out */
for( i = OT_POOLS_COUNT - timedout; i < OT_POOLS_COUNT; ++i )
free( peer_list->peers[i].data);
/* Shift vectors back by the amount of pools that were shifted out */
memmove( peer_list->peers + timedout, peer_list->peers, sizeof( ot_vector ) * ( OT_POOLS_COUNT - timedout ) );
byte_zero( peer_list->peers, sizeof( ot_vector ) * timedout );
/* Shift back seed counts as well */
memmove( peer_list->seed_counts + timedout, peer_list->seed_counts, sizeof( size_t ) * ( OT_POOLS_COUNT - timedout ) );
byte_zero( peer_list->seed_counts, sizeof( size_t ) * timedout );
#ifdef WANT_SYNC_BATCH
/* Save the block modified within last OT_POOLS_TIMEOUT */
if( peer_list->peers[1].size &&
( new_peers = realloc( peer_list->changeset.data, sizeof( ot_peer ) * peer_list->peers[1].size ) ) )
{
memmove( new_peers, peer_list->peers[1].data, peer_list->peers[1].size );
peer_list->changeset.data = new_peers;
peer_list->changeset.size = sizeof( ot_peer ) * peer_list->peers[1].size;
} else {
free( peer_list->changeset.data );
memset( &peer_list->changeset, 0, sizeof( ot_vector ) );
if( OT_PEERLIST_HASBUCKETS( peer_list ) ) {
num_buckets = bucket_list->size;
bucket_list = (ot_vector *)bucket_list->data;
}
#endif
peers_count = seeds_count = 0;
for( i = 0; i < OT_POOLS_COUNT; ++i ) {
peers_count += peer_list->peers[i].size;
seeds_count += peer_list->seed_counts[i];
while( num_buckets-- ) {
size_t removed_peers = clean_single_bucket( bucket_list->data, bucket_list->size, timedout, &removed_seeders );
peer_list->peer_count -= removed_peers;
bucket_list->size -= removed_peers;
if( bucket_list->size < removed_peers )
vector_fixup_peers( bucket_list );
++bucket_list;
}
peer_list->seed_count = seeds_count;
peer_list->peer_count = peers_count;
if( peers_count )
peer_list->base = NOW;
peer_list->seed_count -= removed_seeders;
/* See, if we need to convert a torrent from simple vector to bucket list */
if( ( peer_list->peer_count > OT_PEER_BUCKET_MINCOUNT ) || OT_PEERLIST_HASBUCKETS(peer_list) )
vector_redistribute_buckets( peer_list );
if( peer_list->peer_count )
peer_list->base = g_now_minutes;
else {
/* When we got here, the last time that torrent
has been touched is OT_POOLS_COUNT units before */
peer_list->base = NOW - OT_POOLS_COUNT;
has been touched is OT_PEER_TIMEOUT Minutes before */
peer_list->base = g_now_minutes - OT_PEER_TIMEOUT;
}
return 0;
}
static void clean_make() {
int bucket;
for( bucket = OT_BUCKET_COUNT - 1; bucket >= 0; --bucket ) {
ot_vector *torrents_list = mutex_bucket_lock( bucket );
size_t toffs;
for( toffs=0; toffs<torrents_list->size; ++toffs ) {
ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + toffs;
if( clean_single_torrent( torrent ) ) {
vector_remove_torrent( torrents_list, torrent );
--toffs; continue;
}
}
mutex_bucket_unlock( bucket );
/* We want the cleanup to be spread about 2 Minutes to reduce load spikes
during cleanup. Sleeping around two minutes was chosen to allow enough
time for the actual work and fluctuations in timer. */
usleep( ( 2 * 60 * 1000000 ) / OT_BUCKET_COUNT );
}
}
/* Clean up all peers in current bucket, remove timedout pools and
torrents */
torrents */
static void * clean_worker( void * args ) {
args = args;
args=args;
while( 1 ) {
ot_tasktype tasktype = TASK_CLEAN;
ot_taskid taskid = mutex_workqueue_poptask( &tasktype );
clean_make( );
mutex_workqueue_pushsuccess( taskid );
int bucket = OT_BUCKET_COUNT;
while( bucket-- ) {
ot_vector *torrents_list = mutex_bucket_lock( bucket );
size_t toffs;
for( toffs=0; toffs<torrents_list->size; ++toffs ) {
ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + toffs;
if( clean_single_torrent( torrent ) ) {
vector_remove_torrent( torrents_list, torrent );
--toffs; continue;
}
}
mutex_bucket_unlock( bucket );
usleep( OT_CLEAN_SLEEP );
}
}
return NULL;
}
void clean_all_torrents( ) {
mutex_workqueue_pushtask( 0, TASK_CLEAN );
}
static pthread_t thread_id;
void clean_init( void ) {
pthread_create( &thread_id, NULL, clean_worker, NULL );

8
ot_clean.h

@ -6,10 +6,14 @@ @@ -6,10 +6,14 @@
#ifndef __OT_CLEAN_H__
#define __OT_CLEAN_H__
/* The amount of time a clean cycle should take */
#define OT_CLEAN_INTERVAL_MINUTES 2
/* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */
#define OT_CLEAN_SLEEP ( ( ( OT_CLEAN_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) )
void clean_init( void );
void clean_deinit( void );
void clean_all_torrents( void );
int clean_single_torrent( ot_torrent *torrent );
#endif

1
ot_fullscrape.c

@ -7,7 +7,6 @@ @@ -7,7 +7,6 @@
/* System */
#include <sys/param.h>
#include <sys/uio.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>

62
ot_http.c

@ -5,7 +5,6 @@ @@ -5,7 +5,6 @@
/* System */
#include <sys/types.h>
#include <sys/uio.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <stdio.h>
@ -26,7 +25,6 @@ @@ -26,7 +25,6 @@
#include "ot_fullscrape.h"
#include "ot_stats.h"
#include "ot_accesslist.h"
#include "ot_sync.h"
#define OT_MAXMULTISCRAPE_COUNT 64
static ot_hash multiscrape_buf[OT_MAXMULTISCRAPE_COUNT];
@ -165,52 +163,6 @@ ssize_t http_sendiovecdata( const int64 client_socket, int iovec_entries, struct @@ -165,52 +163,6 @@ ssize_t http_sendiovecdata( const int64 client_socket, int iovec_entries, struct
return 0;
}
#ifdef WANT_SYNC_BATCH
static ssize_t http_handle_sync( const int64 client_socket, char *data ) {
struct http_data* h = io_getcookie( client_socket );
size_t len;
int mode = SYNC_OUT, scanon = 1;
char *c = data;
if( !accesslist_isblessed( h->ip, OT_PERMISSION_MAY_SYNC ) )
HTTPERROR_403_IP;
while( scanon ) {
switch( scan_urlencoded_query( &c, data = c, SCAN_SEARCHPATH_PARAM ) ) {
case -2: scanon = 0; break; /* TERMINATOR */
case -1: HTTPERROR_400_PARAM; /* PARSE ERROR */
default: scan_urlencoded_skipvalue( &c ); break;
case 9:
if(byte_diff(data,9,"changeset")) {
scan_urlencoded_skipvalue( &c );
continue;
}
/* ignore this, when we dont at least see "d4:syncdee" */
if( ( len = scan_urlencoded_query( &c, data = c, SCAN_SEARCHPATH_VALUE ) ) < 10 ) HTTPERROR_400_PARAM;
if( add_changeset_to_tracker( (uint8_t*)data, len ) ) HTTPERROR_400_PARAM;
if( mode == SYNC_OUT ) {
stats_issue_event( EVENT_SYNC_IN, FLAG_TCP, 0 );
mode = SYNC_IN;
}
break;
}
}
if( mode == SYNC_OUT ) {
/* Pass this task to the worker thread */
h->flag |= STRUCT_HTTP_FLAG_WAITINGFORTASK;
stats_issue_event( EVENT_SYNC_OUT_REQUEST, FLAG_TCP, 0 );
sync_deliver( client_socket );
io_dontwantread( client_socket );
return -2;
}
/* Simple but proof for now */
memmove( static_outbuf + SUCCESS_HTTP_HEADER_LENGTH, "OK", 2);
return 2;
}
#endif
static ssize_t http_handle_stats( const int64 client_socket, char *data, char *d, size_t l ) {
char *c = data;
int mode = TASK_STATS_PEERS, scanon = 1, format = 0;
@ -245,10 +197,6 @@ static ssize_t http_handle_stats( const int64 client_socket, char *data, char *d @@ -245,10 +197,6 @@ static ssize_t http_handle_stats( const int64 client_socket, char *data, char *d
mode = TASK_STATS_UDP;
else if( !byte_diff(data,4,"busy"))
mode = TASK_STATS_BUSY_NETWORKS;
else if( !byte_diff(data,4,"dmem"))
mode = TASK_STATS_MEMORY;
else if( !byte_diff(data,4,"vdeb"))
mode = TASK_STATS_VECTOR_DEBUG;
else if( !byte_diff(data,4,"torr"))
mode = TASK_STATS_TORRENTS;
else if( !byte_diff(data,4,"fscr"))
@ -265,7 +213,7 @@ static ssize_t http_handle_stats( const int64 client_socket, char *data, char *d @@ -265,7 +213,7 @@ static ssize_t http_handle_stats( const int64 client_socket, char *data, char *d
case 5:
if( !byte_diff(data,5,"top10"))
mode = TASK_STATS_TOP10;
if( !byte_diff(data,5,"renew"))
else if( !byte_diff(data,5,"renew"))
mode = TASK_STATS_RENEW;
else
HTTPERROR_400_PARAM;
@ -524,7 +472,7 @@ static ssize_t http_handle_announce( const int64 client_socket, char *data ) { @@ -524,7 +472,7 @@ static ssize_t http_handle_announce( const int64 client_socket, char *data ) {
len = remove_peer_from_torrent( hash, &peer, SUCCESS_HTTP_HEADER_LENGTH + static_outbuf, FLAG_TCP );
else {
torrent = add_peer_to_torrent( hash, &peer WANT_SYNC_PARAM( 0 ) );
if( !torrent || !( len = return_peers_for_torrent( hash, numwant, SUCCESS_HTTP_HEADER_LENGTH + static_outbuf, FLAG_TCP ) ) ) HTTPERROR_500;
if( !torrent || !( len = return_peers_for_torrent( torrent, numwant, SUCCESS_HTTP_HEADER_LENGTH + static_outbuf, FLAG_TCP ) ) ) HTTPERROR_500;
}
stats_issue_event( EVENT_ANNOUNCE, FLAG_TCP, len);
return len;
@ -573,12 +521,6 @@ ssize_t http_handle_request( const int64 client_socket, char *data, size_t recv_ @@ -573,12 +521,6 @@ ssize_t http_handle_request( const int64 client_socket, char *data, size_t recv_
reply_size = http_handle_scrape( client_socket, c );
/* All the rest is matched the standard way */
else switch( len ) {
#ifdef WANT_SYNC_BATCH
case 4: /* sync ? */
if( byte_diff( data, 4, "sync") ) HTTPERROR_404;
reply_size = http_handle_sync( client_socket, c );
break;
#endif
case 5: /* stats ? */
if( byte_diff( data, 5, "stats") ) HTTPERROR_404;
reply_size = http_handle_stats( client_socket, c, recv_header, recv_length );

2
ot_iovec.h

@ -6,6 +6,8 @@ @@ -6,6 +6,8 @@
#ifndef __OT_IOVEC_H__
#define __OT_IOVEC_H__
#include <sys/uio.h>
void *iovec_increase( int *iovec_entries, struct iovec **iovector, size_t new_alloc );
void iovec_fixlast( int *iovec_entries, struct iovec **iovector, void *last_ptr );
void iovec_free( int *iovec_entries, struct iovec **iovector );

19
ot_livesync.c

@ -50,7 +50,7 @@ void livesync_init( ) { @@ -50,7 +50,7 @@ void livesync_init( ) {
livesync_outbuffer_pos = livesync_outbuffer_start;
memmove( livesync_outbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) );
livesync_outbuffer_pos += sizeof( g_tracker_id );
livesync_lastpacket_time = g_now;
livesync_lastpacket_time = g_now_seconds;
pthread_create( &thread_id, NULL, livesync_worker, NULL );
}
@ -88,14 +88,13 @@ static void livesync_issuepacket( ) { @@ -88,14 +88,13 @@ static void livesync_issuepacket( ) {
socket_send4(g_livesync_socket_out, (char*)livesync_outbuffer_start, livesync_outbuffer_pos - livesync_outbuffer_start,
groupip_1, LIVESYNC_PORT);
livesync_outbuffer_pos = livesync_outbuffer_start + sizeof( g_tracker_id );
livesync_lastpacket_time = g_now;
livesync_lastpacket_time = g_now_seconds;
}
/* Inform live sync about whats going on. */
void livesync_tell( ot_hash * const info_hash, const ot_peer * const peer, const uint8_t peerflag ) {
void livesync_tell( ot_hash * const info_hash, const ot_peer * const peer ) {
memmove( livesync_outbuffer_pos , info_hash, sizeof(ot_hash));
memmove( livesync_outbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer));
OT_FLAG( livesync_outbuffer_pos + sizeof(ot_hash) ) |= peerflag;
livesync_outbuffer_pos += sizeof(ot_hash) + sizeof(ot_peer);
if( livesync_outbuffer_pos >= livesync_outbuffer_highwater )
@ -106,7 +105,7 @@ void livesync_tell( ot_hash * const info_hash, const ot_peer * const peer, const @@ -106,7 +105,7 @@ void livesync_tell( ot_hash * const info_hash, const ot_peer * const peer, const
stuck when there's not enough traffic to fill udp packets fast
enough */
void livesync_ticker( ) {
if( ( g_now - livesync_lastpacket_time > LIVESYNC_MAXDELAY) &&
if( ( g_now_seconds - livesync_lastpacket_time > LIVESYNC_MAXDELAY) &&
( livesync_outbuffer_pos > livesync_outbuffer_start + sizeof( g_tracker_id ) ) )
livesync_issuepacket();
}
@ -126,22 +125,22 @@ static void * livesync_worker( void * args ) { @@ -126,22 +125,22 @@ static void * livesync_worker( void * args ) {
continue;
if( datalen < (ssize_t)(sizeof( g_tracker_id ) + sizeof( ot_hash ) + sizeof( ot_peer ) ) ) {
// TODO: log invalid sync packet
/* TODO: log invalid sync packet */
continue;
}
if( !accesslist_isblessed((char*)in_ip, OT_PERMISSION_MAY_LIVESYNC)) {
// TODO: log invalid sync packet
/* TODO: log invalid sync packet */
continue;
}
if( !memcmp( livesync_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
// TODO: log packet coming from ourselves
/* TODO: log packet coming from ourselves */
continue;
}
// Now basic sanity checks have been done on the live sync packet
// We might add more testing and logging.
/* Now basic sanity checks have been done on the live sync packet
We might add more testing and logging. */
while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) {
ot_peer *peer = (ot_peer*)(livesync_inbuffer + off + sizeof(ot_hash));
ot_hash *hash = (ot_hash*)(livesync_inbuffer + off);

4
ot_livesync.h

@ -35,7 +35,6 @@ @@ -35,7 +35,6 @@
]+
]*
*/
#ifdef WANT_SYNC_LIVE
@ -49,7 +48,7 @@ void livesync_deinit(); @@ -49,7 +48,7 @@ void livesync_deinit();
void livesync_bind_mcast( char *ip, uint16_t port );
/* Inform live sync about whats going on. */
void livesync_tell( ot_hash * const info_hash, const ot_peer * const peer, const uint8_t peerflag );
void livesync_tell( ot_hash * const info_hash, const ot_peer * const peer );
/* Tickle the live sync module from time to time, so no events get
stuck when there's not enough traffic to fill udp packets fast
@ -63,7 +62,6 @@ void handle_livesync( const int64 serversocket ); @@ -63,7 +62,6 @@ void handle_livesync( const int64 serversocket );
/* If no syncing is required, save calling code from #ifdef
constructions */
#define livesync_init()
#define livesync_ticker()
#define handle_livesync(a)

2
ot_mutex.c

@ -174,7 +174,7 @@ void mutex_workqueue_canceltask( int64 socket ) { @@ -174,7 +174,7 @@ void mutex_workqueue_canceltask( int64 socket ) {
/* Free task's iovec */
for( i=0; i<(*task)->iovec_entries; ++i )
munmap( iovec[i].iov_base , iovec[i].iov_len );
munmap( iovec[i].iov_base, iovec[i].iov_len );
*task = (*task)->next;
free( ptask );

13
ot_mutex.h

@ -6,6 +6,8 @@ @@ -6,6 +6,8 @@
#ifndef __OT_MUTEX_H__
#define __OT_MUTEX_H__
#include <sys/uio.h>
void mutex_init( );
void mutex_deinit( );
@ -27,27 +29,20 @@ typedef enum { @@ -27,27 +29,20 @@ typedef enum {
TASK_STATS_TORADDREM = 0x0009,
TASK_STATS_VERSION = 0x000a,
TASK_STATS_BUSY_NETWORKS = 0x000b,
TASK_STATS_VECTOR_DEBUG = 0x000c,
TASK_STATS_RENEW = 0x000d,
TASK_STATS_RENEW = 0x000c,
TASK_STATS = 0x0100, /* Mask */
TASK_STATS_TORRENTS = 0x0101,
TASK_STATS_PEERS = 0x0102,
TASK_STATS_SLASH24S = 0x0103,
TASK_STATS_TOP10 = 0x0104,
TASK_STATS_MEMORY = 0x0105,
TASK_FULLSCRAPE = 0x0200, /* Default mode */
TASK_FULLSCRAPE_TPB_BINARY = 0x0201,
TASK_FULLSCRAPE_TPB_ASCII = 0x0202,
TASK_FULLSCRAPE_TPB_URLENCODED = 0x0203,
TASK_CLEAN = 0x0300,
TASK_SYNC_OUT = 0x0400,
TASK_SYNC_IN = 0x0401,
TASK_DMEM = 0x0500,
TASK_DMEM = 0x0300,
TASK_DONE = 0x0f00,

71
ot_stats.c

@ -46,7 +46,7 @@ static unsigned long long ot_full_scrape_count = 0; @@ -46,7 +46,7 @@ static unsigned long long ot_full_scrape_count = 0;
static unsigned long long ot_full_scrape_request_count = 0;
static unsigned long long ot_full_scrape_size = 0;
static unsigned long long ot_failed_request_counts[CODE_HTTPERROR_COUNT];
static unsigned long long ot_renewed[OT_POOLS_COUNT];
static unsigned long long ot_renewed[OT_PEER_TIMEOUT];
static time_t ot_start_time;
@ -214,7 +214,7 @@ static size_t stats_slash24s_txt( char * reply, size_t amount, uint32_t thresh ) @@ -214,7 +214,7 @@ static size_t stats_slash24s_txt( char * reply, size_t amount, uint32_t thresh )
uint32_t *counts[ NUM_BUFS ];
uint32_t slash24s[amount*2]; /* first dword amount, second dword subnet */
int bucket;
// int bucket;
size_t i, j, k, l;
char *r = reply;
@ -223,6 +223,8 @@ static size_t stats_slash24s_txt( char * reply, size_t amount, uint32_t thresh ) @@ -223,6 +223,8 @@ static size_t stats_slash24s_txt( char * reply, size_t amount, uint32_t thresh )
r += sprintf( r, "Stats for all /24s with more than %u announced torrents:\n\n", thresh );
#if 0
/* XXX: TOOD: Doesn't work yet with new peer storage model */
for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
ot_vector *torrents_list = mutex_bucket_lock( bucket );
for( j=0; j<torrents_list->size; ++j ) {
@ -248,6 +250,7 @@ static size_t stats_slash24s_txt( char * reply, size_t amount, uint32_t thresh ) @@ -248,6 +250,7 @@ static size_t stats_slash24s_txt( char * reply, size_t amount, uint32_t thresh )
}
mutex_bucket_unlock( bucket );
}
#endif
k = l = 0; /* Debug: count allocated bufs */
for( i=0; i < NUM_BUFS; ++i ) {
@ -283,8 +286,6 @@ static size_t stats_slash24s_txt( char * reply, size_t amount, uint32_t thresh ) @@ -283,8 +286,6 @@ static size_t stats_slash24s_txt( char * reply, size_t amount, uint32_t thresh )
return r - reply;
bailout_cleanup:
for( i=0; i < NUM_BUFS; ++i )
free( counts[i] );
@ -299,44 +300,6 @@ bailout_cleanup: @@ -299,44 +300,6 @@ bailout_cleanup:
}
*/
static ssize_t stats_vector_usage( char * reply ) {
size_t i, j, *vec_member;
char *r = reply;
int exactmatch, bucket;
ot_vector bucketsizes;
memset( &bucketsizes, 0, sizeof( bucketsizes ));
for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
ot_vector *torrents_list = mutex_bucket_lock( bucket );
for( i=0; i<torrents_list->size; ++i ) {
ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[i] ).peer_list;
for( j=0; j<OT_POOLS_COUNT; ++j ) {
if( ! ( vec_member = vector_find_or_insert(&bucketsizes, &peer_list->peers[j].size, 3 * sizeof( size_t ), 2 * sizeof(size_t), &exactmatch) ) ) {
mutex_bucket_unlock( bucket );
return 0;
}
if( !exactmatch ) {
vec_member[0] = peer_list->peers[j].size;
vec_member[1] = peer_list->peers[j].space;
vec_member[2] = 1;
} else
++vec_member[2];
}
}
mutex_bucket_unlock( bucket );
}
for( i = 0; i<bucketsizes.size; ++i ) {
r += sprintf( r, "%zd\t%zd\t%zd\n", ((size_t*)bucketsizes.data)[3*i], ((size_t*)bucketsizes.data)[3*i+1], ((size_t*)bucketsizes.data)[3*i+2] );
/* Prevent overflow. 8k should be enough for debugging */
if( r - reply > OT_STATS_TMPSIZE - 3*10+3 /* 3*%zd + 2*\t + \n */ )
break;
}
return r - reply;
}
static unsigned long events_per_time( unsigned long long events, time_t t ) {
return events / ( (unsigned int)t ? (unsigned int)t : 1 );
}
@ -497,20 +460,20 @@ static size_t stats_return_renew_bucket( char * reply ) { @@ -497,20 +460,20 @@ static size_t stats_return_renew_bucket( char * reply ) {
char *r = reply;
int i;
for( i=0; i<OT_POOLS_COUNT; ++i )
for( i=0; i<OT_PEER_TIMEOUT; ++i )
r+=sprintf(r,"%02i %llu\n", i, ot_renewed[i] );
return r - reply;
}
extern const char
*g_version_opentracker_c, *g_version_accesslist_c, *g_version_clean_c, *g_version_fullscrape_c, *g_version_http_c,
*g_version_iovec_c, *g_version_mutex_c, *g_version_stats_c, *g_version_sync_c, *g_version_udp_c, *g_version_vector_c,
*g_version_iovec_c, *g_version_mutex_c, *g_version_stats_c, *g_version_udp_c, *g_version_vector_c,
*g_version_scan_urlencoded_query_c, *g_version_trackerlogic_c, *g_version_livesync_c;
size_t stats_return_tracker_version( char *reply ) {
return sprintf( reply, "%s%s%s%s%s%s%s%s%s%s%s%s%s%s",
return sprintf( reply, "%s%s%s%s%s%s%s%s%s%s%s%s%s",
g_version_opentracker_c, g_version_accesslist_c, g_version_clean_c, g_version_fullscrape_c, g_version_http_c,
g_version_iovec_c, g_version_mutex_c, g_version_stats_c, g_version_sync_c, g_version_udp_c, g_version_vector_c,
g_version_iovec_c, g_version_mutex_c, g_version_stats_c, g_version_udp_c, g_version_vector_c,
g_version_scan_urlencoded_query_c, g_version_trackerlogic_c, g_version_livesync_c );
}
@ -540,10 +503,6 @@ size_t return_stats_for_tracker( char *reply, int mode, int format ) { @@ -540,10 +503,6 @@ size_t return_stats_for_tracker( char *reply, int mode, int format ) {
#ifdef WANT_LOG_NETWORKS
case TASK_STATS_BUSY_NETWORKS:
return stats_return_busy_networks( reply );
#endif
#ifdef _DEBUG_VECTOR
case TASK_STATS_VECTOR_DEBUG:
return vector_info( reply );
#endif
default:
return 0;
@ -563,7 +522,6 @@ static void stats_make( int *iovec_entries, struct iovec **iovector, ot_tasktype @@ -563,7 +522,6 @@ static void stats_make( int *iovec_entries, struct iovec **iovector, ot_tasktype
case TASK_STATS_PEERS: r += stats_peers_mrtg( r ); break;
case TASK_STATS_SLASH24S: r += stats_slash24s_txt( r, 25, 16 ); break;
case TASK_STATS_TOP10: r += stats_top10_txt( r ); break;
case TASK_STATS_MEMORY: r += stats_vector_usage( r ); break;
default:
iovec_free(iovec_entries, iovector);
return;
@ -594,14 +552,14 @@ void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uint32_t event_ @@ -594,14 +552,14 @@ void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uint32_t event_
case EVENT_FULLSCRAPE_REQUEST:
{
uint8_t ip[4]; *(uint32_t*)ip = (uint32_t)proto; /* ugly hack to transfer ip to stats */
LOG_TO_STDERR( "[%08d] scrp: %d.%d.%d.%d - FULL SCRAPE\n", (unsigned int)(g_now - ot_start_time), ip[0], ip[1], ip[2], ip[3] );
LOG_TO_STDERR( "[%08d] scrp: %d.%d.%d.%d - FULL SCRAPE\n", (unsigned int)(g_now_seconds - ot_start_time)/60, ip[0], ip[1], ip[2], ip[3] );
ot_full_scrape_request_count++;
}
break;
case EVENT_FULLSCRAPE_REQUEST_GZIP:
{
uint8_t ip[4]; *(uint32_t*)ip = (uint32_t)proto; /* ugly hack to transfer ip to stats */
LOG_TO_STDERR( "[%08d] scrp: %d.%d.%d.%d - FULL SCRAPE GZIP\n", (unsigned int)(g_now - ot_start_time), ip[0], ip[1], ip[2], ip[3] );
LOG_TO_STDERR( "[%08d] scrp: %d.%d.%d.%d - FULL SCRAPE GZIP\n", (unsigned int)(g_now_seconds - ot_start_time)/60, ip[0], ip[1], ip[2], ip[3] );
ot_full_scrape_request_count++;
}
break;
@ -611,11 +569,6 @@ void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uint32_t event_ @@ -611,11 +569,6 @@ void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uint32_t event_
case EVENT_RENEW:
ot_renewed[event_data]++;
break;
case EVENT_SYNC_IN_REQUEST:
case EVENT_SYNC_IN:
case EVENT_SYNC_OUT_REQUEST:
case EVENT_SYNC_OUT:
break;
default:
break;
}
@ -643,7 +596,7 @@ void stats_deliver( int64 socket, int tasktype ) { @@ -643,7 +596,7 @@ void stats_deliver( int64 socket, int tasktype ) {
static pthread_t thread_id;
void stats_init( ) {
ot_start_time = g_now;
ot_start_time = g_now_seconds;
pthread_create( &thread_id, NULL, stats_worker, NULL );
}

4
ot_stats.h

@ -16,10 +16,6 @@ typedef enum { @@ -16,10 +16,6 @@ typedef enum {
EVENT_FULLSCRAPE_REQUEST,
EVENT_FULLSCRAPE_REQUEST_GZIP,
EVENT_FULLSCRAPE, /* TCP only */
EVENT_SYNC_IN_REQUEST,
EVENT_SYNC_IN,
EVENT_SYNC_OUT_REQUEST,
EVENT_SYNC_OUT,
EVENT_FAILED
} ot_status_event;

2
ot_udp.c

@ -115,7 +115,7 @@ void handle_udp4( int64 serversocket ) { @@ -115,7 +115,7 @@ void handle_udp4( int64 serversocket ) {
if( !torrent )
return; /* XXX maybe send error */
r = 8 + return_peers_for_torrent( hash, numwant, static_outbuf + 8, FLAG_UDP );
r = 8 + return_peers_for_torrent( torrent, numwant, static_outbuf + 8, FLAG_UDP );
}
socket_send4( serversocket, static_outbuf, r, remoteip, remoteport );

239
ot_vector.c

@ -6,69 +6,65 @@ @@ -6,69 +6,65 @@
/* System */
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <stdio.h>
/* Opentracker */
#include "trackerlogic.h"
#include "ot_vector.h"
#ifdef _DEBUG_VECTOR
#include <stdio.h>
static uint64_t vector_debug_inc[32];
static uint64_t vector_debug_noinc[32];
static uint64_t vector_debug_dec[32];
static uint64_t vector_debug_nodec[32];
static void vector_debug( size_t old_size, ssize_t diff_size, size_t old_space, ssize_t diff_space ) {
int x = 0;
while( old_space ) { old_space>>=1; ++x; }
old_size = old_size;
if( diff_size == -1 )
if( diff_space ) vector_debug_dec[x]++; else vector_debug_nodec[x]++;
else
if( diff_space ) vector_debug_inc[x]++; else vector_debug_noinc[x]++;
}
/* Libowfat */
#include "uint32.h"
size_t vector_info( char * reply ) {
char * r = reply;
int i;
for( i=1; i<28; ++i )
r += sprintf( r, " inc % 12d -> % 12d: % 16lld\n", 1<<(i-1), 8<<(i-1), vector_debug_inc[i] );
for( i=1; i<28; ++i )
r += sprintf( r, "noinc % 12d -> % 12d: % 16lld\n", 1<<(i-1), 1<<(i-1), vector_debug_noinc[i] );
for( i=1; i<28; ++i )
r += sprintf( r, " dec % 12d -> % 12d: % 16lld\n", 1<<(i-1), 4<<(i-1), vector_debug_dec[i] );
for( i=1; i<28; ++i )
r += sprintf( r, "nodec % 12d -> % 12d: % 16lld\n", 1<<(i-1), 1<<(i-1), vector_debug_nodec[i] );
return r - reply;
static int vector_compare_peer(const void *peer1, const void *peer2 ) {
int32_t cmp = (int32_t)uint32_read(peer1) - (int32_t)uint32_read(peer2);
if (cmp == 0) cmp = ((int8_t*)peer1)[4] - ((int8_t*)peer2)[4];
if (cmp == 0) cmp = ((int8_t*)peer1)[5] - ((int8_t*)peer2)[5];
return cmp;
}
#endif
/* This function gives us a binary search that returns a pointer, even if
no exact match is found. In that case it sets exactmatch 0 and gives
calling functions the chance to insert data
NOTE: Minimal compare_size is 4.
*/
void *binary_search( const void * const key, const void * base, const size_t member_count, const size_t member_size,
size_t compare_size, int *exactmatch ) {
size_t mc = member_count;
uint8_t *lookat = ((uint8_t*)base) + member_size * (member_count >> 1);
size_t offs, mc = member_count;
int8_t *lookat = ((int8_t*)base) + member_size * (mc >> 1);
int32_t key_cache = (int32_t)uint32_read(key);
*exactmatch = 1;
while( mc ) {
int cmp = memcmp( lookat, key, compare_size);
if (cmp == 0) return (void *)lookat;
int32_t cmp = key_cache - (int32_t)uint32_read(lookat);
if (cmp == 0) {
for( offs = 4; cmp == 0 && offs < compare_size; ++offs )
cmp = ((int8_t*)key)[offs] - lookat[offs];
if( cmp == 0 )
return (void *)lookat;
}
if (cmp < 0) {
base = (void*)(lookat + member_size);
--mc;
}
mc >>= 1;
lookat = ((uint8_t*)base) + member_size * (mc >> 1);
lookat = ((int8_t*)base) + member_size * (mc >> 1);
}
*exactmatch = 0;
return (void*)lookat;
}
static uint8_t vector_hash_peer( ot_peer *peer, int bucket_count ) {
unsigned int hash = 5381, i = 6;
uint8_t *p = (uint8_t*)peer;
while( i-- ) hash += (hash<<5) + *(p++);
return hash % bucket_count;
}
/* This is the generic insert operation for our vector type.
It tries to locate the object at "key" with size "member_size" by comparing its first "compare_size" bytes with
those of objects in vector. Our special "binary_search" function does that and either returns the match or a
@ -78,17 +74,13 @@ void *binary_search( const void * const key, const void * base, const size_t mem @@ -78,17 +74,13 @@ void *binary_search( const void * const key, const void * base, const size_t mem
*/
void *vector_find_or_insert( ot_vector *vector, void *key, size_t member_size, size_t compare_size, int *exactmatch ) {
uint8_t *match = binary_search( key, vector->data, vector->size, member_size, compare_size, exactmatch );
#ifdef _DEBUG_VECTOR
size_t old_space = vector->space;
#endif
if( *exactmatch ) return match;
if( vector->size + 1 >= vector->space ) {
if( vector->size + 1 > vector->space ) {
size_t new_space = vector->space ? OT_VECTOR_GROW_RATIO * vector->space : OT_VECTOR_MIN_MEMBERS;
uint8_t *new_data = realloc( vector->data, new_space * member_size );
if( !new_data ) return NULL;
/* Adjust pointer if it moved by realloc */
match = new_data + (match - (uint8_t*)vector->data);
@ -97,56 +89,48 @@ void *vector_find_or_insert( ot_vector *vector, void *key, size_t member_size, s @@ -97,56 +89,48 @@ void *vector_find_or_insert( ot_vector *vector, void *key, size_t member_size, s
}
memmove( match + member_size, match, ((uint8_t*)vector->data) + member_size * vector->size - match );
#ifdef _DEBUG_VECTOR
vector_debug( vector->size, 1, old_space, vector->space - old_space );
#endif
vector->size++;
return match;
}
/* This function checks, whether our peer vector is a real vector
or a list of buckets and dispatches accordingly */
ot_peer *vector_find_or_insert_peer( ot_vector *vector, ot_peer *peer, int *exactmatch ) {
/* If space is zero but size is set, we're dealing with a list of vector->size buckets */
if( vector->space < vector->size )
vector = ((ot_vector*)vector->data) + vector_hash_peer(peer, vector->size );
return vector_find_or_insert( vector, peer, sizeof(ot_peer), OT_PEER_COMPARE_SIZE, exactmatch );
}
/* This is the non-generic delete from vector-operation specialized for peers in pools.
Set hysteresis == 0 if you expect the vector not to ever grow again.
It returns 0 if no peer was found (and thus not removed)
1 if a non-seeding peer was removed
2 if a seeding peer was removed
*/
int vector_remove_peer( ot_vector *vector, ot_peer *peer, int hysteresis ) {
int vector_remove_peer( ot_vector *vector, ot_peer *peer ) {
int exactmatch;
size_t shrink_thresh = hysteresis ? OT_VECTOR_SHRINK_THRESH : OT_VECTOR_SHRINK_RATIO;
ot_peer *end = ((ot_peer*)vector->data) + vector->size;
ot_peer *match;
#ifdef _DEBUG_VECTOR
size_t old_space = vector->space;
#endif
ot_peer *match, *end;
if( !vector->size ) return 0;
match = binary_search( peer, vector->data, vector->size, sizeof( ot_peer ), OT_PEER_COMPARE_SIZE, &exactmatch );
/* If space is zero but size is set, we're dealing with a list of vector->size buckets */
if( vector->space < vector->size )
vector = ((ot_vector*)vector->data) + vector_hash_peer(peer, vector->size );
end = ((ot_peer*)vector->data) + vector->size;
match = binary_search( peer, vector->data, vector->size, sizeof( ot_peer ), OT_PEER_COMPARE_SIZE, &exactmatch );
if( !exactmatch ) return 0;
exactmatch = ( OT_FLAG( match ) & PEER_FLAG_SEEDING ) ? 2 : 1;
memmove( match, match + 1, sizeof(ot_peer) * ( end - match - 1 ) );
if( ( --vector->size * shrink_thresh < vector->space ) && ( vector->space >= OT_VECTOR_SHRINK_RATIO * OT_VECTOR_MIN_MEMBERS ) ) {
vector->space /= OT_VECTOR_SHRINK_RATIO;
vector->data = realloc( vector->data, vector->space * sizeof( ot_peer ) );
}
if( !vector->size ) {
/* for peer pools its safe to let them go,
in 999 of 1000 this happens in older pools, that won't ever grow again */
free( vector->data );
vector->data = NULL;
vector->space = 0;
}
#ifdef _DEBUG_VECTOR
vector_debug( vector->size+1, -1, old_space, vector->space - old_space );
#endif
vector->size--;
vector_fixup_peers( vector );
return exactmatch;
}
void vector_remove_torrent( ot_vector *vector, ot_torrent *match ) {
ot_torrent *end = ((ot_torrent*)vector->data) + vector->size;
#ifdef _DEBUG_VECTOR
size_t old_space = vector->space;
#endif
if( !vector->size ) return;
@ -159,9 +143,118 @@ void vector_remove_torrent( ot_vector *vector, ot_torrent *match ) { @@ -159,9 +143,118 @@ void vector_remove_torrent( ot_vector *vector, ot_torrent *match ) {
vector->space /= OT_VECTOR_SHRINK_RATIO;
vector->data = realloc( vector->data, vector->space * sizeof( ot_torrent ) );
}
#ifdef _DEBUG_VECTOR
vector_debug( vector->size+1, -1, old_space, vector->space - old_space );
#endif
}
void vector_clean_list( ot_vector * vector, int num_buckets ) {
while( num_buckets-- )
free( vector[num_buckets].data );
free( vector );
return;
}
void vector_redistribute_buckets( ot_peerlist * peer_list ) {
int tmp, bucket, bucket_size_new, num_buckets_new, num_buckets_old = 1;
ot_vector * bucket_list_new, * bucket_list_old = &peer_list->peers;
if( OT_PEERLIST_HASBUCKETS( peer_list ) ) {
num_buckets_old = peer_list->peers.size;
bucket_list_old = peer_list->peers.data;
}
if( peer_list->peer_count < 255 )
num_buckets_new = 1;
else if( peer_list->peer_count > 8192 )
num_buckets_new = 64;
else if( peer_list->peer_count >= 512 && peer_list->peer_count < 4096 )
num_buckets_new = 16;
else if( peer_list->peer_count < 512 && num_buckets_old <= 16 )
num_buckets_new = num_buckets_old;
else if( peer_list->peer_count < 512 )
num_buckets_new = 1;
else if( peer_list->peer_count < 8192 && num_buckets_old > 1 )
num_buckets_new = num_buckets_old;
else
num_buckets_new = 16;
if( num_buckets_new == num_buckets_old )
return;
/* Assume near perfect distribution */
bucket_list_new = malloc( num_buckets_new * sizeof( ot_vector ) );
if( !bucket_list_new) return;
bzero( bucket_list_new, num_buckets_new * sizeof( ot_vector ) );
tmp = peer_list->peer_count / num_buckets_new;
bucket_size_new = OT_VECTOR_MIN_MEMBERS;
while( bucket_size_new < tmp)
bucket_size_new *= OT_VECTOR_GROW_RATIO;
/* preallocate vectors to hold all peers */
for( bucket=0; bucket<num_buckets_new; ++bucket ) {
bucket_list_new[bucket].space = bucket_size_new;
bucket_list_new[bucket].data = malloc( bucket_size_new * sizeof(ot_peer) );
if( !bucket_list_new[bucket].data )
return vector_clean_list( bucket_list_new, num_buckets_new );
}
/* Now sort them into the correct bucket */
for( bucket=0; bucket<num_buckets_old; ++bucket ) {
ot_peer * peers_old = bucket_list_old[bucket].data, * peers_new;
int peer_count_old = bucket_list_old[bucket].size;
while( peer_count_old-- ) {
ot_vector * bucket_dest = bucket_list_new;
if( num_buckets_new > 1 )
bucket_dest += vector_hash_peer(peers_old, num_buckets_new);
if( bucket_dest->size + 1 > bucket_dest->space ) {
void * tmp = realloc( bucket_dest->data, sizeof(ot_peer) * OT_VECTOR_GROW_RATIO * bucket_dest->space );
if( !tmp ) return vector_clean_list( bucket_list_new, num_buckets_new );
bucket_dest->data = tmp;
bucket_dest->space *= OT_VECTOR_GROW_RATIO;
}
peers_new = (ot_peer*)bucket_dest->data;
*(uint64_t*)(peers_new + bucket_dest->size++) = *(uint64_t*)(peers_old++);
}
}
/* Now sort each bucket to later allow bsearch */
for( bucket=0; bucket<num_buckets_new; ++bucket )
qsort( bucket_list_new[bucket].data, bucket_list_new[bucket].size, sizeof( ot_peer ), vector_compare_peer );
/* Everything worked fine. Now link new bucket_list to peer_list */
if( OT_PEERLIST_HASBUCKETS( peer_list) )
vector_clean_list( (ot_vector*)peer_list->peers.data, peer_list->peers.size );
else
free( peer_list->peers.data );
if( num_buckets_new > 1 ) {
peer_list->peers.data = bucket_list_new;
peer_list->peers.size = num_buckets_new;
peer_list->peers.space = 0; /* Magic marker for "is list of buckets" */
} else {
peer_list->peers.data = bucket_list_new->data;
peer_list->peers.size = bucket_list_new->size;
peer_list->peers.space = bucket_list_new->space;
free( bucket_list_new );
}
}
void vector_fixup_peers( ot_vector * vector ) {
int need_fix = 0;
if( !vector->size ) {
free( vector->data );
vector->data = NULL;
vector->space = 0;
return;
}
while( ( vector->size * OT_VECTOR_SHRINK_THRESH < vector->space ) &&
( vector->space >= OT_VECTOR_SHRINK_RATIO * OT_VECTOR_MIN_MEMBERS ) ) {
vector->space /= OT_VECTOR_SHRINK_RATIO;
need_fix++;
}
if( need_fix )
vector->data = realloc( vector->data, vector->space * sizeof( ot_peer ) );
}
const char *g_version_vector_c = "$Source$: $Revision$\n";

20
ot_vector.h

@ -12,21 +12,23 @@ @@ -12,21 +12,23 @@
#define OT_VECTOR_SHRINK_THRESH 4
#define OT_VECTOR_SHRINK_RATIO 2
#define OT_PEER_BUCKET_MINCOUNT 512
#define OT_PEER_BUCKET_MAXCOUNT 256
typedef struct {
void *data;
size_t size;
size_t space;
} ot_vector;
void *binary_search( const void * const key, const void * base, const size_t member_count, const size_t member_size,
size_t compare_size, int *exactmatch );
void *vector_find_or_insert( ot_vector *vector, void *key, size_t member_size, size_t compare_size, int *exactmatch );
int vector_remove_peer( ot_vector *vector, ot_peer *peer, int hysteresis );
void vector_remove_torrent( ot_vector *vector, ot_torrent *match );
void *binary_search( const void * const key, const void * base, const size_t member_count, const size_t member_size,
size_t compare_size, int *exactmatch );
void *vector_find_or_insert( ot_vector *vector, void *key, size_t member_size, size_t compare_size, int *exactmatch );
ot_peer *vector_find_or_insert_peer( ot_vector *vector, ot_peer *peer, int *exactmatch );
#ifdef _DEBUG_VECTOR
size_t vector_info( char * reply );
#endif
int vector_remove_peer( ot_vector *vector, ot_peer *peer );
void vector_remove_torrent( ot_vector *vector, ot_torrent *match );
void vector_redistribute_buckets( ot_peerlist * peer_list );
void vector_fixup_peers( ot_vector * vector );
#endif

12
tests/testsuite.sh

@ -1,15 +1,11 @@ @@ -1,15 +1,11 @@
#!/bin/sh
while true; do
request_string="GET /announce?info_hash=\
%$(printf %02X $(( $RANDOM & 0xff )) )\
%$(printf %02X $(( $RANDOM & 0xff )) )\
2345678901234567\
%$(printf %02X $(( $RANDOM & 0xff )) )\
%$(printf %02X $(( $RANDOM & 0xff )) )\
&ip=$(( $RANDOM & 0xff )).17.13.15&port=$(( $RANDOM & 0xff )) HTTP/1.0\n"
request_string="GET /announce?info_hash=0123456789012345678\
%$(printf %02X $(( $RANDOM & 0xf )) )\
&ip=$(( $RANDOM & 0xf )).$(( $RANDOM & 0xf )).13.16&port=$(( $RANDOM & 0xff )) HTTP/1.0\n"
# echo $request_string
echo $request_string
# echo
echo $request_string | nc 127.0.0.1 6969 >/dev/null
# echo

2
tests/testsuite2.sh

@ -8,7 +8,7 @@ while true; do @@ -8,7 +8,7 @@ while true; do
echo $request_string
echo
echo $request_string | nc 10.0.1.3 6969 >/dev/null
echo $request_string | nc 23.23.23.237 6969 >/dev/null
echo
done

291
trackerlogic.c

@ -7,17 +7,12 @@ @@ -7,17 +7,12 @@
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <sys/uio.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <unistd.h>
#include <time.h>
#include <math.h>
#include <errno.h>
#include <stdint.h>
/* Libowfat */
#include "scan.h"
#include "byte.h"
#include "io.h"
@ -28,26 +23,26 @@ @@ -28,26 +23,26 @@
#include "ot_clean.h"
#include "ot_accesslist.h"
#include "ot_fullscrape.h"
#include "ot_sync.h"
#include "ot_livesync.h"
void free_peerlist( ot_peerlist *peer_list ) {
size_t i;
for( i=0; i<OT_POOLS_COUNT; ++i )
if( peer_list->peers[i].data )
free( peer_list->peers[i].data );
#ifdef WANT_SYNC_BATCH
free( peer_list->changeset.data );
#endif
if( peer_list->peers.data ) {
if( OT_PEERLIST_HASBUCKETS( peer_list ) ) {
ot_vector *bucket_list = (ot_vector*)(peer_list->peers.data);
while( peer_list->peers.size-- )
free( bucket_list++->data );
}
free( peer_list->peers.data );
}
free( peer_list );
}
ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer WANT_SYNC_PARAM( int from_changeset ) ) {
ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer WANT_SYNC_PARAM( int from_sync ) ) {
int exactmatch;
ot_torrent *torrent;
ot_peer *peer_dest;
ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ), *peer_pool;
int base_pool = 0;
ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash );
if( !accesslist_hashisvalid( hash ) ) {
mutex_bucket_unlock_by_hash( hash );
@ -75,106 +70,135 @@ ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer WANT_SYNC_PARAM( @@ -75,106 +70,135 @@ ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer WANT_SYNC_PARAM(
clean_single_torrent( torrent );
/* Timestamp our first pool */
torrent->peer_list->base = NOW;
torrent->peer_list->base = g_now_minutes;
/* Check for peer in torrent */
peer_dest = vector_find_or_insert_peer( &(torrent->peer_list->peers), peer, &exactmatch );
if( !peer_dest ) {
mutex_bucket_unlock_by_hash( hash );
return NULL;
}
/* Tell peer that it's fresh */
OT_PEERTIME( peer ) = 0;
/* Sanitize flags: Whoever claims to have completed download, must be a seeder */
if( ( OT_FLAG( peer ) & ( PEER_FLAG_COMPLETED | PEER_FLAG_SEEDING ) ) == PEER_FLAG_COMPLETED )
OT_FLAG( peer ) ^= PEER_FLAG_COMPLETED;
#ifdef WANT_SYNC
if( from_changeset ) {
/* Check, whether peer already is in current pool, do nothing if so */
peer_pool = &torrent->peer_list->peers[0];
binary_search( peer, peer_pool->data, peer_pool->size, sizeof(ot_peer), OT_PEER_COMPARE_SIZE, &exactmatch );
if( exactmatch ) {
mutex_bucket_unlock_by_hash( hash );
return torrent;
}
base_pool = 1;
if( torrent->peer_list->base < NOW )
torrent->peer_list->base = NOW;
}
#endif
peer_pool = &torrent->peer_list->peers[ base_pool ];
peer_dest = vector_find_or_insert( peer_pool, (void*)peer, sizeof( ot_peer ), OT_PEER_COMPARE_SIZE, &exactmatch );
/* If we hadn't had a match in current pool, create peer there and
remove it from all older pools */
/* If we hadn't had a match create peer there */
if( !exactmatch ) {
int i;
memmove( peer_dest, peer, sizeof( ot_peer ) );
torrent->peer_list->peer_count++;
#ifdef WANT_SYNC_LIVE
if( !from_changeset )
livesync_tell( hash, peer, PEER_FLAG_LEECHING );
if( !from_sync )
livesync_tell( hash, peer );
#endif
if( OT_FLAG( peer ) & PEER_FLAG_COMPLETED )
torrent->peer_list->peer_count++;
if( OT_FLAG(peer) & PEER_FLAG_COMPLETED )
torrent->peer_list->down_count++;
if( OT_FLAG(peer) & PEER_FLAG_SEEDING ) {
torrent->peer_list->seed_counts[ base_pool ]++;
if( OT_FLAG(peer) & PEER_FLAG_SEEDING )
torrent->peer_list->seed_count++;
}
for( i= base_pool + 1; i<OT_POOLS_COUNT; ++i ) {
switch( vector_remove_peer( &torrent->peer_list->peers[i], peer, 0 ) ) {
case 0: continue;
case 2: torrent->peer_list->seed_counts[i]--;
torrent->peer_list->seed_count--;
case 1: default:
torrent->peer_list->peer_count--;
mutex_bucket_unlock_by_hash( hash );
stats_issue_event( EVENT_RENEW, 0, i );
return torrent;
}
}
} else {
if( (OT_FLAG(peer_dest) & PEER_FLAG_SEEDING ) && !(OT_FLAG(peer) & PEER_FLAG_SEEDING ) ) {
torrent->peer_list->seed_counts[ base_pool ]--;
stats_issue_event( EVENT_RENEW, 0, OT_PEERTIME( peer_dest ) );
if( (OT_FLAG(peer_dest) & PEER_FLAG_SEEDING ) && !(OT_FLAG(peer) & PEER_FLAG_SEEDING ) )
torrent->peer_list->seed_count--;
}
if( !(OT_FLAG(peer_dest) & PEER_FLAG_SEEDING ) && (OT_FLAG(peer) & PEER_FLAG_SEEDING ) ) {
torrent->peer_list->seed_counts[ base_pool ]++;
if( !(OT_FLAG(peer_dest) & PEER_FLAG_SEEDING ) && (OT_FLAG(peer) & PEER_FLAG_SEEDING ) )
torrent->peer_list->seed_count++;
}
if( !(OT_FLAG( peer_dest ) & PEER_FLAG_COMPLETED ) && (OT_FLAG( peer ) & PEER_FLAG_COMPLETED ) )
if( !(OT_FLAG(peer_dest) & PEER_FLAG_COMPLETED ) && (OT_FLAG(peer) & PEER_FLAG_COMPLETED ) )
torrent->peer_list->down_count++;
if( OT_FLAG( peer_dest ) & PEER_FLAG_COMPLETED )
if( OT_FLAG(peer_dest) & PEER_FLAG_COMPLETED )
OT_FLAG( peer ) |= PEER_FLAG_COMPLETED;
stats_issue_event( EVENT_RENEW, 0, base_pool );
memmove( peer_dest, peer, sizeof( ot_peer ) );
}
mutex_bucket_unlock_by_hash( hash );
*(uint64_t*)(peer_dest) = *(uint64_t*)(peer);
#ifdef WANT_SYNC
/* In order to avoid an unlock/lock between add_peers and return_peers,
we only unlock the bucket if return_peers won't do the job: either
if we return NULL or if no reply is expected, i.e. when called
from livesync code. */
if( from_sync )
mutex_bucket_unlock_by_hash( hash );
#endif
return torrent;
}
static size_t return_peers_all( ot_peerlist *peer_list, char *reply ) {
unsigned int bucket, num_buckets = 1;
ot_vector * bucket_list = &peer_list->peers;
char * r = reply;
if( OT_PEERLIST_HASBUCKETS(peer_list) ) {
num_buckets = bucket_list->size;
bucket_list = (ot_vector *)bucket_list->data;
}
for( bucket = 0; bucket<num_buckets; ++bucket ) {
ot_peer * peers = (ot_peer*)bucket_list[bucket].data;
size_t peer_count = bucket_list[bucket].size;
while( peer_count-- )
memmove( r+=6, peers++, 6 );
}
return r - reply;
}
static size_t return_peers_selection( ot_peerlist *peer_list, size_t amount, char *reply ) {
unsigned int bucket_offset, bucket_index = 0, num_buckets = 1;
ot_vector * bucket_list = &peer_list->peers;
unsigned int shifted_pc = peer_list->peer_count;
unsigned int shifted_step = 0;
unsigned int shift = 0;
char * r = reply;
if( OT_PEERLIST_HASBUCKETS(peer_list) ) {
num_buckets = bucket_list->size;
bucket_list = (ot_vector *)bucket_list->data;
}
/* Make fixpoint arithmetic as exact as possible */
#define MAXPRECBIT (1<<(8*sizeof(int)-3))
while( !(shifted_pc & MAXPRECBIT ) ) { shifted_pc <<= 1; shift++; }
shifted_step = shifted_pc/amount;
#undef MAXPRECBIT
/* Initialize somewhere in the middle of peers so that
fixpoint's aliasing doesn't alway miss the same peers */
bucket_offset = random() % peer_list->peer_count;
while( amount-- ) {
/* This is the aliased, non shifted range, next value may fall into */
unsigned int diff = ( ( ( amount + 1 ) * shifted_step ) >> shift ) -
( ( amount * shifted_step ) >> shift );
bucket_offset += 1 + random() % diff;
while( bucket_offset >= bucket_list[bucket_index].size ) {
bucket_offset -= bucket_list[bucket_index].size;
bucket_index = ( bucket_index + 1 ) % num_buckets;
}
memmove( r, ((ot_peer*)bucket_list[bucket_index].data) + bucket_offset, 6 );
r += 6;
}
return r - reply;
}
/* Compiles a list of random peers for a torrent
* reply must have enough space to hold 92+6*amount bytes
* Selector function can be anything, maybe test for seeds, etc.
* RANDOM may return huge values
* does not yet check not to return self
* the bucket, torrent resides in has been locked by the
add_peer call, the ot_torrent * was gathered from, so we
have to unlock it here.
*/
size_t return_peers_for_torrent( ot_hash *hash, size_t amount, char *reply, PROTO_FLAG proto ) {
char *r = reply;
int exactmatch;
ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash );
ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch );
size_t return_peers_for_torrent( ot_torrent *torrent, size_t amount, char *reply, PROTO_FLAG proto ) {
ot_peerlist *peer_list = torrent->peer_list;
size_t index;
if( !torrent ) {
mutex_bucket_unlock_by_hash( hash );
return 0;
}
char *r = reply;
if( peer_list->peer_count < amount )
if( amount > peer_list->peer_count )
amount = peer_list->peer_count;
if( proto == FLAG_TCP )
r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zde8:intervali%ie5:peers%zd:", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count, OT_CLIENT_REQUEST_INTERVAL_RANDOM, 6*amount );
else {
@ -185,40 +209,16 @@ size_t return_peers_for_torrent( ot_hash *hash, size_t amount, char *reply, PROT @@ -185,40 +209,16 @@ size_t return_peers_for_torrent( ot_hash *hash, size_t amount, char *reply, PROT
}
if( amount ) {
unsigned int pool_offset, pool_index = 0;;
unsigned int shifted_pc = peer_list->peer_count;
unsigned int shifted_step = 0;
unsigned int shift = 0;
/* Make fixpoint arithmetic as exact as possible */
#define MAXPRECBIT (1<<(8*sizeof(int)-3))
while( !(shifted_pc & MAXPRECBIT ) ) { shifted_pc <<= 1; shift++; }
shifted_step = shifted_pc/amount;
#undef MAXPRECBIT
/* Initialize somewhere in the middle of peers so that
fixpoint's aliasing doesn't alway miss the same peers */
pool_offset = random() % peer_list->peer_count;
for( index = 0; index < amount; ++index ) {
/* This is the aliased, non shifted range, next value may fall into */
unsigned int diff = ( ( ( index + 1 ) * shifted_step ) >> shift ) -
( ( index * shifted_step ) >> shift );
pool_offset += 1 + random() % diff;
while( pool_offset >= peer_list->peers[pool_index].size ) {
pool_offset -= peer_list->peers[pool_index].size;
pool_index = ( pool_index + 1 ) % OT_POOLS_COUNT;
}
memmove( r, ((ot_peer*)peer_list->peers[pool_index].data) + pool_offset, 6 );
r += 6;
}
if( amount == peer_list->peer_count )
r += return_peers_all( peer_list, r );
else
r += return_peers_selection( peer_list, amount, r );
}
if( proto == FLAG_TCP )
*r++ = 'e';
mutex_bucket_unlock_by_hash( hash );
mutex_bucket_unlock_by_hash( &torrent->hash );
return r - reply;
}
@ -274,64 +274,43 @@ size_t return_tcp_scrape_for_torrent( ot_hash *hash_list, int amount, char *repl @@ -274,64 +274,43 @@ size_t return_tcp_scrape_for_torrent( ot_hash *hash_list, int amount, char *repl
return r - reply;
}
static ot_peerlist dummy_list;
size_t remove_peer_from_torrent( ot_hash *hash, ot_peer *peer, char *reply, PROTO_FLAG proto ) {
int exactmatch;
size_t index;
size_t reply_size = 0;
ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash );
ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch );
ot_peerlist *peer_list;
ot_peerlist *peer_list = &dummy_list;
#ifdef WANT_SYNC_LIVE
if( proto != FLAG_MCA )
livesync_tell( hash, peer, PEER_FLAG_STOPPED );
#endif
if( !exactmatch ) {
mutex_bucket_unlock_by_hash( hash );
if( proto == FLAG_TCP )
return sprintf( reply, "d8:completei0e10:incompletei0e8:intervali%ie5:peers0:e", OT_CLIENT_REQUEST_INTERVAL_RANDOM );
/* Create fake packet to satisfy parser on the other end */
if( proto == FLAG_UDP ) {
((uint32_t*)reply)[2] = htonl( OT_CLIENT_REQUEST_INTERVAL_RANDOM );
((uint32_t*)reply)[3] = ((uint32_t*)reply)[4] = 0;
return (size_t)20;
}
if( proto == FLAG_MCA )
return 0;
if( proto != FLAG_MCA ) {
OT_FLAG( peer ) |= PEER_FLAG_STOPPED;
livesync_tell( hash, peer );
}
#endif
peer_list = torrent->peer_list;
for( index = 0; index<OT_POOLS_COUNT; ++index ) {
switch( vector_remove_peer( &peer_list->peers[index], peer, index == 0 ) ) {
case 0: continue;
case 2: peer_list->seed_counts[index]--;
peer_list->seed_count--;
case 1: default:
peer_list->peer_count--;
goto exit_loop;
if( exactmatch ) {
peer_list = torrent->peer_list;
switch( vector_remove_peer( &peer_list->peers, peer ) ) {
case 2: peer_list->seed_count--; /* Fall throughs intended */
case 1: peer_list->peer_count--; /* Fall throughs intended */
default: break;
}
}
exit_loop:
if( proto == FLAG_TCP ) {
size_t reply_size = sprintf( reply, "d8:completei%zde10:incompletei%zde8:intervali%ie5:peers0:e", peer_list->seed_count, peer_list->peer_count - peer_list->seed_count, OT_CLIENT_REQUEST_INTERVAL_RANDOM );
mutex_bucket_unlock_by_hash( hash );
return reply_size;
}
if( proto == FLAG_TCP )
reply_size = sprintf( reply, "d8:completei%zde10:incompletei%zde8:intervali%ie5:peers0:e", peer_list->seed_count, peer_list->peer_count - peer_list->seed_count, OT_CLIENT_REQUEST_INTERVAL_RANDOM );
/* Handle UDP reply */
if( proto == FLAG_UDP ) {
((uint32_t*)reply)[2] = htonl( OT_CLIENT_REQUEST_INTERVAL_RANDOM );
((uint32_t*)reply)[3] = htonl( peer_list->peer_count - peer_list->seed_count );
((uint32_t*)reply)[4] = htonl( peer_list->seed_count);
reply_size = 20;
}
mutex_bucket_unlock_by_hash( hash );
return (size_t)20;
return reply_size;
}
void exerr( char * message ) {
@ -354,7 +333,6 @@ int trackerlogic_init( const char * const serverdir ) { @@ -354,7 +333,6 @@ int trackerlogic_init( const char * const serverdir ) {
fullscrape_init( );
accesslist_init( );
livesync_init( );
sync_init( );
stats_init( );
return 0;
@ -366,7 +344,6 @@ void trackerlogic_deinit( void ) { @@ -366,7 +344,6 @@ void trackerlogic_deinit( void ) {
/* Deinitialise background worker threads */
stats_deinit( );
sync_deinit( );
livesync_init( );
accesslist_init( );
fullscrape_deinit( );

32
trackerlogic.h

@ -22,7 +22,7 @@ typedef time_t ot_time; @@ -22,7 +22,7 @@ typedef time_t ot_time;
#define OT_CLIENT_REQUEST_VARIATION (60*6)
#define OT_TORRENT_TIMEOUT_HOURS 24
#define OT_TORRENT_TIMEOUT ((60*60*OT_TORRENT_TIMEOUT_HOURS)/OT_POOLS_TIMEOUT)
#define OT_TORRENT_TIMEOUT (60*OT_TORRENT_TIMEOUT_HOURS)
#define OT_CLIENT_REQUEST_INTERVAL_RANDOM ( OT_CLIENT_REQUEST_INTERVAL - OT_CLIENT_REQUEST_VARIATION/2 + (int)( random( ) % OT_CLIENT_REQUEST_VARIATION ) )
@ -34,15 +34,12 @@ typedef time_t ot_time; @@ -34,15 +34,12 @@ typedef time_t ot_time;
#define OT_ADMINIP_MAX 64
#define OT_MAX_THREADS 16
/* This list points to 9 pools of peers each grouped in five-minute-intervals
thus achieving a timeout of 2700s or 45 minutes
These pools are sorted by its binary content */
#define OT_POOLS_COUNT 9
#define OT_POOLS_TIMEOUT (60*5)
#define OT_PEER_TIMEOUT 45
/* From opentracker.c */
extern time_t g_now;
#define NOW (g_now/OT_POOLS_TIMEOUT)
extern time_t g_now_seconds;
#define g_now_minutes (g_now_seconds/60)
extern uint32_t g_tracker_id;
typedef enum { FLAG_TCP, FLAG_UDP, FLAG_MCA } PROTO_FLAG;
@ -57,6 +54,7 @@ static const uint8_t PEER_FLAG_LEECHING = 0x00; @@ -57,6 +54,7 @@ static const uint8_t PEER_FLAG_LEECHING = 0x00;
#define OT_SETIP( peer, ip ) memmove((peer),(ip),4);
#define OT_SETPORT( peer, port ) memmove(((uint8_t*)peer)+4,(port),2);
#define OT_FLAG(peer) (((uint8_t*)(peer))[6])
#define OT_PEERTIME(peer) (((uint8_t*)(peer))[7])
#define OT_PEER_COMPARE_SIZE ((size_t)6)
#define OT_HASH_COMPARE_SIZE (sizeof(ot_hash))
@ -75,18 +73,18 @@ struct ot_peerlist { @@ -75,18 +73,18 @@ struct ot_peerlist {
size_t seed_count;
size_t peer_count;
size_t down_count;
size_t seed_counts[ OT_POOLS_COUNT ];
ot_vector peers[ OT_POOLS_COUNT ];
#ifdef WANT_SYNC_BATCH
ot_vector changeset;
#endif
/* normal peers vector or
pointer to ot_vector[32] buckets if data != NULL and space == 0
*/
ot_vector peers;
};
#define OT_PEERLIST_HASBUCKETS(peer_list) ((peer_list) && ((peer_list)->peers.size > (peer_list)->peers.space))
/*
Exported functions
*/
#if defined( WANT_SYNC_BATCH ) || defined( WANT_SYNC_LIVE )
#ifdef WANT_SYNC_LIVE
#define WANT_SYNC
#endif
@ -100,9 +98,11 @@ int trackerlogic_init( const char * const serverdir ); @@ -100,9 +98,11 @@ int trackerlogic_init( const char * const serverdir );
void trackerlogic_deinit( void );
void exerr( char * message );
ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer WANT_SYNC_PARAM( int from_changeset ) );
/* add_peer_to_torrent does only release the torrent bucket if from_sync is set,
otherwise it is released in return_peers_for_torrent */
size_t return_peers_for_torrent( ot_torrent *torrent, size_t amount, char *reply, PROTO_FLAG proto );
ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer WANT_SYNC_PARAM( int from_sync ) );
size_t remove_peer_from_torrent( ot_hash *hash, ot_peer *peer, char *reply, PROTO_FLAG proto );
size_t return_peers_for_torrent( ot_hash *hash, size_t amount, char *reply, PROTO_FLAG proto );
size_t return_tcp_scrape_for_torrent( ot_hash *hash, int amount, char *reply );
size_t return_udp_scrape_for_torrent( ot_hash *hash, char *reply );

Loading…
Cancel
Save