From b38104b9862a57271567c3dcb7e004addbe70ffc Mon Sep 17 00:00:00 2001 From: erdgeist <> Date: Tue, 27 Mar 2007 12:07:29 +0000 Subject: [PATCH] Added outbound part of sync Proposed format: d4:syncd20:8*N:(xxxxyy)*Nee, therefore had to refactor torrent cleanup now that it will hit all torrents once every OT_POOL_TIMEOUT units. --- opentracker.c | 41 +++------- trackerlogic.c | 213 +++++++++++++++++++++++++++++++++---------------- trackerlogic.h | 3 +- 3 files changed, 157 insertions(+), 100 deletions(-) diff --git a/opentracker.c b/opentracker.c index b8c90d2..7be1421 100644 --- a/opentracker.c +++ b/opentracker.c @@ -200,31 +200,9 @@ static void httpresponse( const int64 s, char *data ) { switch( scan_urlencoded_query( &c, data = c, SCAN_PATH ) ) { case 4: /* sync ? */ if( byte_diff( data, 4, "sync") ) HTTPERROR_404; - scanon = 1; - - while( scanon ) { - switch( scan_urlencoded_query( &c, data = c, SCAN_SEARCHPATH_PARAM ) ) { - case -2: scanon = 0; break; /* TERMINATOR */ - case -1: HTTPERROR_400_PARAM; /* PARSE ERROR */ - case 9: - if(byte_diff(data,9,"info_hash")) { - scan_urlencoded_query( &c, NULL, SCAN_SEARCHPATH_VALUE ); - continue; - } - /* ignore this, when we have less than 20 bytes */ - if( scan_urlencoded_query( &c, data = c, SCAN_SEARCHPATH_VALUE ) != 20 ) HTTPERROR_400_PARAM; - hash = (ot_hash*)data; /* Fall through intended */ - break; - default: - scan_urlencoded_query( &c, NULL, SCAN_SEARCHPATH_VALUE ); - break; - } - } - - if( !hash ) HTTPERROR_400_PARAM; - if( !( reply_size = return_sync_for_torrent( hash, &reply ) ) ) HTTPERROR_500; - + if( !( reply_size = return_changeset_for_tracker( &reply ) ) ) HTTPERROR_500; return sendmallocdata( s, reply, reply_size ); + case 5: /* stats ? */ if( byte_diff(data,5,"stats")) HTTPERROR_404; scanon = 1; @@ -523,11 +501,13 @@ static void handle_read( const int64 clientsocket ) { array_catb( &h->request, static_inbuf, l ); if( array_failed( &h->request ) ) - httperror( clientsocket, "500 Server Error", "Request too long."); - else if( array_bytes( &h->request ) > 8192 ) - httperror( clientsocket, "500 request too long", "You sent too much headers"); - else if( memchr( array_start( &h->request ), '\n', array_length( &h->request, 1 ) ) ) - httpresponse( clientsocket, array_start( &h->request ) ); + return httperror( clientsocket, "500 Server Error", "Request too long."); + + if( array_bytes( &h->request ) > 8192 ) + return httperror( clientsocket, "500 request too long", "You sent too much headers"); + + if( memchr( array_start( &h->request ), '\n', array_length( &h->request, 1 ) ) ) + return httpresponse( clientsocket, array_start( &h->request ) ); } static void handle_write( const int64 clientsocket ) { @@ -701,6 +681,9 @@ static void server_mainloop( ) { taia_now( &next_timeout_check ); taia_addsec( &next_timeout_check, &next_timeout_check, OT_CLIENT_TIMEOUT_CHECKINTERVAL); } + + /* See if we need to move our pools */ + clean_all_torrents(); } } diff --git a/trackerlogic.c b/trackerlogic.c index c8576f6..d2d279e 100644 --- a/trackerlogic.c +++ b/trackerlogic.c @@ -25,6 +25,10 @@ /* GLOBAL VARIABLES */ static ot_vector all_torrents[256]; +static ot_vector changeset; +size_t changeset_size = 0; +time_t last_clean_time = 0; + #ifdef WANT_CLOSED_TRACKER int g_closedtracker = 1; static ot_torrent* const OT_TORRENT_NOT_ON_WHITELIST = (ot_torrent*)1; @@ -158,33 +162,6 @@ static int vector_remove_torrent( ot_vector *vector, ot_hash *hash ) { return 1; } -/* This function deallocates all timedouted pools and shifts all other pools - it Returns 1 if torrent itself has not seen an announce for more than OT_TORRENT_TIMEOUT time units - 0 if torrent is not yet timed out - Note: We expect NOW as a parameter since calling time() may be expensive -*/ -static int clean_peerlist( time_t time_now, ot_peerlist *peer_list ) { - int i, timedout = (int)( time_now - peer_list->base ); - - if( !timedout ) return 0; - if( timedout > OT_POOLS_COUNT ) timedout = OT_POOLS_COUNT; - - for( i = OT_POOLS_COUNT - timedout; i < OT_POOLS_COUNT; ++i ) - free( peer_list->peers[i].data); - - memmove( peer_list->peers + timedout, peer_list->peers, sizeof( ot_vector ) * (OT_POOLS_COUNT-timedout) ); - byte_zero( peer_list->peers, sizeof( ot_vector ) * timedout ); - - memmove( peer_list->seed_count + timedout, peer_list->seed_count, sizeof( size_t ) * ( OT_POOLS_COUNT - timedout) ); - byte_zero( peer_list->seed_count, sizeof( size_t ) * timedout ); - - if( timedout == OT_POOLS_COUNT ) - return time_now - peer_list->base > OT_TORRENT_TIMEOUT; - - peer_list->base = time_now; - return 0; -} - ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer ) { int exactmatch; ot_torrent *torrent; @@ -219,8 +196,7 @@ ot_torrent *add_peer_to_torrent( ot_hash *hash, ot_peer *peer ) { byte_zero( torrent->peer_list, sizeof( ot_peerlist ) ); torrent->peer_list->base = NOW; - } else - clean_peerlist( NOW, torrent->peer_list ); + } /* Sanitize flags: Whoever claims to have completed download, must be a seeder */ if( ( OT_FLAG( peer ) & ( PEER_FLAG_COMPLETED | PEER_FLAG_SEEDING ) ) == PEER_FLAG_COMPLETED ) @@ -294,7 +270,9 @@ size_t return_peers_for_torrent( ot_torrent *torrent, size_t amount, char *reply peer_count += torrent->peer_list->peers[index].size; seed_count += torrent->peer_list->seed_count[index]; } - if( peer_count < amount ) amount = peer_count; + + if( peer_count < amount ) + amount = peer_count; if( is_tcp ) r += sprintf( r, "d8:completei%zde10:incompletei%zde8:intervali%ie5:peers%zd:", seed_count, peer_count-seed_count, OT_CLIENT_REQUEST_INTERVAL_RANDOM, 6*amount ); @@ -348,10 +326,8 @@ size_t return_fullscrape_for_tracker( char **reply ) { int i, k; char *r; - for( i=0; i<256; ++i ) { - ot_vector *torrents_list = &all_torrents[i]; - torrent_count += torrents_list->size; - } + for( i=0; i<256; ++i ) + torrent_count += all_torrents[i].size; if( !( r = *reply = malloc( 128*torrent_count ) ) ) return 0; @@ -380,7 +356,6 @@ size_t return_memstat_for_tracker( char **reply ) { size_t torrent_count = 0, j; int i, k; char *r; - time_t time_now = NOW; for( i=0; i<256; ++i ) { ot_vector *torrents_list = &all_torrents[i]; @@ -398,7 +373,6 @@ size_t return_memstat_for_tracker( char **reply ) { ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[j] ).peer_list; ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[j] ).hash; r += sprintf( r, "\n%s:\n", to_hex( (ot_byte*)hash ) ); - clean_peerlist( time_now, peer_list ); for( k=0; kpeers[k].size), (unsigned int)peer_list->peers[k].space ); } @@ -418,7 +392,6 @@ size_t return_udp_scrape_for_torrent( ot_hash *hash, char *reply ) { memset( reply, 0, 12); } else { unsigned long *r = (unsigned long*) reply; - clean_peerlist( NOW, torrent->peer_list ); for( i=0; ipeer_list->peers[i].size; @@ -440,7 +413,6 @@ size_t return_tcp_scrape_for_torrent( ot_hash *hash, char *reply ) { ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); if( !exactmatch ) return sprintf( r, "d5:filesdee" ); - clean_peerlist( NOW, torrent->peer_list ); for( i=0; ipeer_list->peers[i].size; @@ -453,36 +425,145 @@ size_t return_tcp_scrape_for_torrent( ot_hash *hash, char *reply ) { return r - reply; } -size_t return_sync_for_torrent( ot_hash *hash, char **reply ) { - int exactmatch; - size_t peers = 0; - char *r; - ot_vector *torrents_list = &all_torrents[*hash[0]]; - ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); +/* Throw away old changeset */ +static void release_changeset( void ) { + ot_byte **changeset_ptrs = (ot_byte**)(changeset.data); + int i; + + for( i = 0; i < changeset.size; ++i ) + free( changeset_ptrs[i] ); + + free( changeset_ptrs ); + byte_zero( &changeset, sizeof( changeset ) ); + + changeset_size = 0; +} + +static void add_pool_to_changeset( ot_hash *hash, ot_peer *peers, size_t peer_count ) { + ot_byte *pool_copy = (ot_byte *)malloc( sizeof( size_t ) + sizeof( ot_hash ) + sizeof( ot_peer ) * peer_count + 13 ); + size_t r = 0; + + if( !pool_copy ) + return; + + memmove( pool_copy + sizeof( size_t ), "20:", 3 ); + memmove( pool_copy + sizeof( size_t ) + 3, hash, sizeof( ot_hash ) ); + r = sizeof( size_t ) + 3 + sizeof( ot_hash ); + r += sprintf( (char*)pool_copy + r, "%zd:", sizeof( ot_peer ) * peer_count ); + memmove( pool_copy + r, peers, sizeof( ot_peer ) * peer_count ); + r += sizeof( ot_peer ) * peer_count; + + /* Without the length field */ + *(size_t*)pool_copy = r - sizeof( size_t ); + + if( changeset.size + 1 >= changeset.space ) { + size_t new_space = changeset.space ? OT_VECTOR_GROW_RATIO * changeset.space : OT_VECTOR_MIN_MEMBERS; + ot_byte *new_data = realloc( changeset.data, new_space * sizeof( ot_byte *) ); + + if( !new_data ) + return free( pool_copy ); + + changeset.data = new_data; + changeset.space = new_space; + } + + ((ot_byte**)changeset.data)[changeset.size++] = pool_copy; + + /* Without the length field */ + changeset_size += r - sizeof( size_t ); +} + +/* Proposed output format + d4:syncd20:8*N:(xxxxyy)*Nee +*/ +size_t return_changeset_for_tracker( char **reply ) { + size_t i, r = 8; + + clean_all_torrents(); + + *reply = malloc( 8 + changeset_size + 2 ); + if( !*reply ) + return 0; - if( exactmatch ) { - clean_peerlist( NOW, torrent->peer_list ); - peers = torrent->peer_list->peers[0].size; + memmove( *reply, "d4:syncd", 8 ); + for( i = 0; i < changeset.size; ++i ) { + ot_byte *data = ((ot_byte**)changeset.data)[i]; + memmove( *reply + r, data + sizeof( size_t ), *(size_t*)data ); + r += *(size_t*)data; } - if( !( r = *reply = malloc( 10 + peers * sizeof( ot_peer ) ) ) ) return 0; + (*reply)[r++] = 'e'; + (*reply)[r++] = 'e'; + + return r; +} + +/* Clean up all torrents, remove timedout pools and + torrents, also prepare new changeset */ +void clean_all_torrents( void ) { + int i, j, k; + time_t time_now = NOW; + size_t peers_count; + + if( time_now <= last_clean_time ) + return; + last_clean_time = time_now; + + release_changeset(); - memmove( r, "d4:sync", 7 ); - r += 7; - r += sprintf( r, "%zd:", peers * sizeof( ot_peer ) ); - if( peers ) { - memmove( r, torrent->peer_list->peers[0].data, peers * sizeof( ot_peer ) ); - r += peers * sizeof( ot_peer ); + for( i=0; i<256; ++i ) { + ot_vector *torrents_list = &all_torrents[i]; + for( j=0; jsize; ++j ) { + ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[j] ).peer_list; + ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[j] ).hash; + + time_t timedout = (int)( time_now - peer_list->base ); + + /* Torrent has idled out */ + if( timedout > OT_TORRENT_TIMEOUT ) { + vector_remove_torrent( torrents_list, hash ); + --j; + } + + /* If nothing to be cleaned here, handle next torrent */ + if( timedout > OT_POOLS_COUNT ) + continue; + + /* Release vectors that have timed out */ + for( k = OT_POOLS_COUNT - timedout; k < OT_POOLS_COUNT; ++k ) + free( peer_list->peers[k].data); + + /* Shift vectors back by the amount of pools that were shifted out */ + memmove( peer_list->peers + timedout, peer_list->peers, sizeof( ot_vector ) * ( OT_POOLS_COUNT - timedout ) ); + byte_zero( peer_list->peers, sizeof( ot_vector ) * timedout ); + + /* Shift back seed counts as well */ + memmove( peer_list->seed_count + timedout, peer_list->seed_count, sizeof( size_t ) * ( OT_POOLS_COUNT - timedout ) ); + byte_zero( peer_list->seed_count, sizeof( size_t ) * timedout ); + + /* Save the block modified within last OT_POOLS_TIMEOUT */ + if( peer_list->peers[1].size ) + add_pool_to_changeset( hash, peer_list->peers[1].data, peer_list->peers[1].size ); + + peers_count = 0; + for( k = 0; k < OT_POOLS_COUNT; ++k ) + peers_count += peer_list->peers[k].size; + + if( peers_count ) { + peer_list->base = time_now; + } else { + /* When we got here, the last time that torrent + has been touched is OT_POOLS_COUNT units before */ + peer_list->base = time_now - OT_POOLS_COUNT; + } + } } - *r++ = 'e'; - return r - *reply; } typedef struct { int val; ot_torrent * torrent; } ot_record; /* Fetches stats from tracker */ size_t return_stats_for_tracker( char *reply, int mode ) { - time_t time_now = NOW; size_t torrent_count = 0, peer_count = 0, seed_count = 0, j; ot_record top5s[5], top5c[5]; char *r = reply; @@ -498,12 +579,6 @@ size_t return_stats_for_tracker( char *reply, int mode ) { ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[j] ).peer_list; size_t local_peers = 0, local_seeds = 0; - if( clean_peerlist( time_now, peer_list ) ) { - ot_hash *hash =&( ((ot_torrent*)(torrents_list->data))[j] ).hash; - vector_remove_torrent( torrents_list, hash ); - --j; - continue; - } for( k=0; kpeers[k].size; local_seeds += peer_list->seed_count[k]; @@ -549,12 +624,6 @@ void remove_peer_from_torrent( ot_hash *hash, ot_peer *peer ) { if( !exactmatch ) return; - /* Maybe this does the job */ - if( clean_peerlist( NOW, torrent->peer_list ) ) { - vector_remove_torrent( torrents_list, hash ); - return; - } - for( i=0; ipeer_list->peers[i], peer, i == 0 ) ) { case 0: continue; @@ -572,7 +641,9 @@ int init_logic( const char * const serverdir ) { srandom( time(NULL) ); /* Initialize control structures */ - byte_zero( all_torrents, sizeof (all_torrents) ); + byte_zero( all_torrents, sizeof( all_torrents ) ); + byte_zero( &changeset, sizeof( changeset ) ); + changeset_size = 0; return 0; } @@ -591,4 +662,6 @@ void deinit_logic( void ) { } } byte_zero( all_torrents, sizeof (all_torrents)); + byte_zero( &changeset, sizeof( changeset ) ); + changeset_size = 0; } diff --git a/trackerlogic.h b/trackerlogic.h index fbc86ea..dfd5047 100644 --- a/trackerlogic.h +++ b/trackerlogic.h @@ -98,9 +98,10 @@ size_t return_peers_for_torrent( ot_torrent *torrent, size_t amount, char *reply size_t return_fullscrape_for_tracker( char **reply ); size_t return_tcp_scrape_for_torrent( ot_hash *hash, char *reply ); size_t return_udp_scrape_for_torrent( ot_hash *hash, char *reply ); -size_t return_sync_for_torrent( ot_hash *hash, char **reply ); size_t return_stats_for_tracker( char *reply, int mode ); size_t return_memstat_for_tracker( char **reply ); +size_t return_changeset_for_tracker( char **reply ); +void clean_all_torrents( void ); void remove_peer_from_torrent( ot_hash *hash, ot_peer *peer ); #endif