diff --git a/ot_http.c b/ot_http.c index c544468..567cba3 100644 --- a/ot_http.c +++ b/ot_http.c @@ -369,12 +369,11 @@ static ot_keywords keywords_announce[] = { { "port", 1 }, { "left", 2 }, { "even #ifdef WANT_FULLLOG_NETWORKS { "lognet", 8 }, #endif +{ "peer_id", 9 }, { NULL, -3 } }; static ot_keywords keywords_announce_event[] = { { "completed", 1 }, { "stopped", 2 }, { NULL, -3 } }; static ssize_t http_handle_announce( const int64 sock, struct ot_workstruct *ws, char *read_ptr ) { int numwant, tmp, scanon; - ot_peer peer; - ot_hash *hash = NULL; unsigned short port = 0; char *write_ptr; ssize_t len; @@ -392,14 +391,18 @@ static ssize_t http_handle_announce( const int64 sock, struct ot_workstruct *ws, ot_ip6 proxied_ip; char *fwd = http_header( ws->request, ws->header_size, "x-forwarded-for" ); if( fwd && scan_ip6( fwd, proxied_ip ) ) - OT_SETIP( &peer, proxied_ip ); + OT_SETIP( &ws->peer, proxied_ip ); else - OT_SETIP( &peer, cookie->ip ); + OT_SETIP( &ws->peer, cookie->ip ); } else #endif - OT_SETIP( &peer, cookie->ip ); - OT_SETPORT( &peer, &port ); - OT_PEERFLAG( &peer ) = 0; + + ws->peer_id = NULL; + ws->hash = NULL; + + OT_SETIP( &ws->peer, cookie->ip ); + OT_SETPORT( &ws->peer, &port ); + OT_PEERFLAG( &ws->peer ) = 0; numwant = 50; scanon = 1; @@ -411,21 +414,21 @@ static ssize_t http_handle_announce( const int64 sock, struct ot_workstruct *ws, case 1: /* matched "port" */ len = scan_urlencoded_query( &read_ptr, write_ptr = read_ptr, SCAN_SEARCHPATH_VALUE ); if( ( len <= 0 ) || scan_fixed_int( write_ptr, len, &tmp ) || ( tmp > 0xffff ) ) HTTPERROR_400_PARAM; - port = htons( tmp ); OT_SETPORT( &peer, &port ); + port = htons( tmp ); OT_SETPORT( &ws->peer, &port ); break; case 2: /* matched "left" */ if( ( len = scan_urlencoded_query( &read_ptr, write_ptr = read_ptr, SCAN_SEARCHPATH_VALUE ) ) <= 0 ) HTTPERROR_400_PARAM; if( scan_fixed_int( write_ptr, len, &tmp ) ) tmp = 0; - if( !tmp ) OT_PEERFLAG( &peer ) |= PEER_FLAG_SEEDING; + if( !tmp ) OT_PEERFLAG( &ws->peer ) |= PEER_FLAG_SEEDING; break; case 3: /* matched "event" */ switch( scan_find_keywords( keywords_announce_event, &read_ptr, SCAN_SEARCHPATH_VALUE ) ) { case -1: HTTPERROR_400_PARAM; case 1: /* matched "completed" */ - OT_PEERFLAG( &peer ) |= PEER_FLAG_COMPLETED; + OT_PEERFLAG( &ws->peer ) |= PEER_FLAG_COMPLETED; break; case 2: /* matched "stopped" */ - OT_PEERFLAG( &peer ) |= PEER_FLAG_STOPPED; + OT_PEERFLAG( &ws->peer ) |= PEER_FLAG_STOPPED; break; default: break; @@ -443,10 +446,10 @@ static ssize_t http_handle_announce( const int64 sock, struct ot_workstruct *ws, if( !tmp ) HTTPERROR_400_COMPACT; break; case 6: /* matched "info_hash" */ - if( hash ) HTTPERROR_400_DOUBLEHASH; + if( ws->hash ) HTTPERROR_400_DOUBLEHASH; /* ignore this, when we have less than 20 bytes */ if( scan_urlencoded_query( &read_ptr, write_ptr = read_ptr, SCAN_SEARCHPATH_VALUE ) != 20 ) HTTPERROR_400_PARAM; - hash = (ot_hash*)write_ptr; + ws->hash = (ot_hash*)write_ptr; break; #ifdef WANT_IP_FROM_QUERY_STRING case 7: /* matched "ip" */ @@ -455,7 +458,7 @@ static ssize_t http_handle_announce( const int64 sock, struct ot_workstruct *ws, len = scan_urlencoded_query( &read_ptr, tmp_buf2, SCAN_SEARCHPATH_VALUE ); tmp_buf2[len] = 0; if( ( len <= 0 ) || !scan_ip6( tmp_buf2, tmp_buf1 ) ) HTTPERROR_400_PARAM; - OT_SETIP( &peer, tmp_buf1 ); + OT_SETIP( &ws->peer, tmp_buf1 ); } break; #endif @@ -490,6 +493,12 @@ static ssize_t http_handle_announce( const int64 sock, struct ot_workstruct *ws, //} } #endif + break; + case 9: /* matched "peer_id" */ + /* ignore this, when we have less than 20 bytes */ + if( scan_urlencoded_query( &read_ptr, write_ptr = read_ptr, SCAN_SEARCHPATH_VALUE ) != 20 ) HTTPERROR_400_PARAM; + ws->peer_id = write_ptr; + break; } } @@ -501,13 +510,13 @@ static ssize_t http_handle_announce( const int64 sock, struct ot_workstruct *ws, stats_issue_event( EVENT_ACCEPT, FLAG_TCP, (uintptr_t)ws->reply ); /* Scanned whole query string */ - if( !hash ) + if( !ws->hash ) return ws->reply_size = sprintf( ws->reply, "d14:failure reason80:Your client forgot to send your torrent's info_hash. Please upgrade your client.e" ); - if( OT_PEERFLAG( &peer ) & PEER_FLAG_STOPPED ) - ws->reply_size = remove_peer_from_torrent( *hash, &peer, ws->reply, FLAG_TCP ); + if( OT_PEERFLAG( &ws->peer ) & PEER_FLAG_STOPPED ) + ws->reply_size = remove_peer_from_torrent( FLAG_TCP, ws ); else - ws->reply_size = add_peer_to_torrent_and_return_peers( *hash, &peer, FLAG_TCP, numwant, ws->reply ); + ws->reply_size = add_peer_to_torrent_and_return_peers( FLAG_TCP, ws, numwant ); stats_issue_event( EVENT_ANNOUNCE, FLAG_TCP, ws->reply_size); return ws->reply_size; diff --git a/ot_livesync.c b/ot_livesync.c index 9e1c723..87fe5cf 100644 --- a/ot_livesync.c +++ b/ot_livesync.c @@ -33,23 +33,9 @@ char groupip_1[4] = { 224,0,23,5 }; #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) -#ifdef WANT_SYNC_SCRAPE -#define LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE 1504 -#define LIVESYNC_OUTGOING_WATERMARK_SCRAPE (sizeof(ot_hash)+sizeof(uint64_t)+sizeof(uint32_t)) -#define LIVESYNC_OUTGOING_MAXPACKETS_SCRAPE 100 - -#define LIVESYNC_FIRST_BEACON_DELAY (30*60) /* seconds */ -#define LIVESYNC_BEACON_INTERVAL 60 /* seconds */ -#define LIVESYNC_INQUIRE_THRESH 0.75 -#endif /* WANT_SYNC_SCRAPE */ - #define LIVESYNC_MAXDELAY 15 /* seconds */ -enum { OT_SYNC_PEER -#ifdef WANT_SYNC_SCRAPE - , OT_SYNC_SCRAPE_BEACON, OT_SYNC_SCRAPE_INQUIRE, OT_SYNC_SCRAPE_TELL -#endif -}; +enum { OT_SYNC_PEER }; /* Forward declaration */ static void * livesync_worker( void * args ); @@ -59,52 +45,24 @@ static int64 g_socket_in = -1; /* For incoming packets */ static int64 g_socket_out = -1; -static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE]; - -static uint8_t g_peerbuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; -static uint8_t *g_peerbuffer_pos; -static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS; +char g_outbuf[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; +static size_t g_outbuf_data; static ot_time g_next_packet_time; -#ifdef WANT_SYNC_SCRAPE -/* Live sync scrape buffers, states and timers */ -static ot_time g_next_beacon_time; -static ot_time g_next_inquire_time; - -static uint8_t g_scrapebuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE]; -static uint8_t *g_scrapebuffer_pos; -static uint8_t *g_scrapebuffer_highwater = g_scrapebuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE - LIVESYNC_OUTGOING_WATERMARK_SCRAPE; - -static size_t g_inquire_remote_count; -static uint32_t g_inquire_remote_host; -static int g_inquire_inprogress; -static int g_inquire_bucket; -#endif /* WANT_SYNC_SCRAPE */ - static pthread_t thread_id; void livesync_init( ) { + if( g_socket_in == -1 ) exerr( "No socket address for live sync specified." ); /* Prepare outgoing peers buffer */ - g_peerbuffer_pos = g_peerbuffer_start; - memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); - uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER); - g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t); - -#ifdef WANT_SYNC_SCRAPE - /* Prepare outgoing scrape buffer */ - g_scrapebuffer_pos = g_scrapebuffer_start; - memcpy( g_scrapebuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); - uint32_pack_big( (char*)g_scrapebuffer_pos + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_TELL); - g_scrapebuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t); - - /* Wind up timers for inquires */ - g_next_beacon_time = g_now_seconds + LIVESYNC_FIRST_BEACON_DELAY; -#endif /* WANT_SYNC_SCRAPE */ - g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; + memcpy( g_outbuf, &g_tracker_id, sizeof( g_tracker_id ) ); + uint32_pack_big( g_outbuf + sizeof( g_tracker_id ), OT_SYNC_PEER); + g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); + g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; + pthread_create( &thread_id, NULL, livesync_worker, NULL ); } @@ -148,264 +106,86 @@ void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { } static void livesync_issue_peersync( ) { - socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, - groupip_1, LIVESYNC_PORT); - g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t ); + socket_send4(g_socket_out, g_outbuf, g_outbuf_data, groupip_1, LIVESYNC_PORT); + g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t ); g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; } -static void livesync_handle_peersync( ssize_t datalen ) { +static void livesync_handle_peersync( struct ot_workstruct *ws ) { int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); /* Now basic sanity checks have been done on the live sync packet We might add more testing and logging. */ - while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) { - ot_peer *peer = (ot_peer*)(g_inbuffer + off + sizeof(ot_hash)); - ot_hash *hash = (ot_hash*)(g_inbuffer + off); + while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= ws->request_size ) { + memcpy( &ws->peer, ws->request + off + sizeof(ot_hash), sizeof( ot_peer ) ); + ws->hash = (ot_hash*)(ws->request + off); if( !g_opentracker_running ) return; - if( OT_PEERFLAG(peer) & PEER_FLAG_STOPPED ) - remove_peer_from_torrent( *hash, peer, NULL, FLAG_MCA ); + if( OT_PEERFLAG(&ws->peer) & PEER_FLAG_STOPPED ) + remove_peer_from_torrent( FLAG_MCA, ws ); else - add_peer_to_torrent( *hash, peer, FLAG_MCA ); + add_peer_to_torrent_and_return_peers( FLAG_MCA, ws, /* amount = */ 0 ); off += sizeof( ot_hash ) + sizeof( ot_peer ); } stats_issue_event(EVENT_SYNC, 0, - (datalen - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) / + (ws->request_size - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) / ((ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ))); } -#ifdef WANT_SYNC_SCRAPE -void livesync_issue_beacon( ) { - size_t torrent_count = mutex_get_torrent_count(); - uint8_t beacon[ sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof( uint64_t ) ]; - - memcpy( beacon, &g_tracker_id, sizeof( g_tracker_id ) ); - uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_BEACON); - uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ) + sizeof(uint32_t), (uint32_t)((uint64_t)(torrent_count)>>32) ); - uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ) + 2 * sizeof(uint32_t), (uint32_t)torrent_count ); - - socket_send4(g_socket_out, (char*)beacon, sizeof(beacon), groupip_1, LIVESYNC_PORT); -} - -void livesync_handle_beacon( ssize_t datalen ) { - size_t torrent_count_local, torrent_count_remote; - if( datalen != sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof( uint64_t ) ) - return; - torrent_count_local = mutex_get_torrent_count(); - torrent_count_remote = (size_t)(((uint64_t)uint32_read_big((char*)g_inbuffer+sizeof( g_tracker_id ) + sizeof(uint32_t))) << 32); - torrent_count_remote |= (size_t)uint32_read_big((char*)g_inbuffer+sizeof( g_tracker_id ) + 2 * sizeof(uint32_t)); - - /* Empty tracker is useless */ - if( !torrent_count_remote ) return; - - if( ((double)torrent_count_local ) / ((double)torrent_count_remote) < LIVESYNC_INQUIRE_THRESH) { - if( !g_next_inquire_time ) { - g_next_inquire_time = g_now_seconds + 2 * LIVESYNC_BEACON_INTERVAL; - g_inquire_remote_count = 0; - } - - if( torrent_count_remote > g_inquire_remote_count ) { - g_inquire_remote_count = torrent_count_remote; - memcpy( &g_inquire_remote_host, g_inbuffer, sizeof( g_tracker_id ) ); - } - } -} - -void livesync_issue_inquire( ) { - uint8_t inquire[ sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof(g_tracker_id)]; - - memcpy( inquire, &g_tracker_id, sizeof( g_tracker_id ) ); - uint32_pack_big( (char*)inquire + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_INQUIRE); - memcpy( inquire + sizeof(g_tracker_id) + sizeof(uint32_t), &g_inquire_remote_host, sizeof( g_tracker_id ) ); - - socket_send4(g_socket_out, (char*)inquire, sizeof(inquire), groupip_1, LIVESYNC_PORT); -} - -void livesync_handle_inquire( ssize_t datalen ) { - if( datalen != sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof(g_tracker_id) ) - return; - - /* If it isn't us, they're inquiring, ignore inquiry */ - if( memcmp( &g_tracker_id, g_inbuffer, sizeof( g_tracker_id ) ) ) - return; - - /* Start scrape tell on next ticker */ - if( !g_inquire_inprogress ) { - g_inquire_inprogress = 1; - g_inquire_bucket = 0; - } -} - -void livesync_issue_tell( ) { - int packets_to_send = LIVESYNC_OUTGOING_MAXPACKETS_SCRAPE; - while( packets_to_send > 0 && g_inquire_bucket < OT_BUCKET_COUNT ) { - ot_vector *torrents_list = mutex_bucket_lock( g_inquire_bucket ); - unsigned int j; - for( j=0; jsize; ++j ) { - ot_torrent *torrent = (ot_torrent*)(torrents_list->data) + j; - memcpy(g_scrapebuffer_pos, torrent->hash, sizeof(ot_hash)); - g_scrapebuffer_pos += sizeof(ot_hash); - uint32_pack_big( (char*)g_scrapebuffer_pos , (uint32_t)(g_now_minutes - torrent->peer_list->base )); - uint32_pack_big( (char*)g_scrapebuffer_pos + 4, (uint32_t)((uint64_t)(torrent->peer_list->down_count)>>32) ); - uint32_pack_big( (char*)g_scrapebuffer_pos + 8, (uint32_t)torrent->peer_list->down_count ); - g_scrapebuffer_pos += 12; - - if( g_scrapebuffer_pos >= g_scrapebuffer_highwater ) { - socket_send4(g_socket_out, (char*)g_scrapebuffer_start, g_scrapebuffer_pos - g_scrapebuffer_start, groupip_1, LIVESYNC_PORT); - g_scrapebuffer_pos = g_scrapebuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t); - --packets_to_send; - } - } - mutex_bucket_unlock( g_inquire_bucket++, 0 ); - if( !g_opentracker_running ) - return; - } - if( g_inquire_bucket == OT_BUCKET_COUNT ) { - socket_send4(g_socket_out, (char*)g_scrapebuffer_start, g_scrapebuffer_pos - g_scrapebuffer_start, groupip_1, LIVESYNC_PORT); - g_inquire_inprogress = 0; - } -} - -void livesync_handle_tell( ssize_t datalen ) { - int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); - - /* Some instance is in progress of telling. Our inquiry was successful. - Don't ask again until we see next beacon. */ - g_next_inquire_time = 0; - - /* Don't cause any new inquiries during another tracker's tell */ - if( g_next_beacon_time - g_now_seconds < LIVESYNC_BEACON_INTERVAL ) - g_next_beacon_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL; - - while( off + sizeof(ot_hash) + 12 <= (size_t)datalen ) { - ot_hash *hash = (ot_hash*)(g_inbuffer+off); - ot_vector *torrents_list = mutex_bucket_lock_by_hash(*hash); - size_t down_count_remote; - int exactmatch; - ot_torrent *torrent = vector_find_or_insert(torrents_list, hash, sizeof(ot_hash), OT_HASH_COMPARE_SIZE, &exactmatch); - - if( !torrent ) { - mutex_bucket_unlock_by_hash( *hash, 0 ); - continue; - } - - if( !exactmatch ) { - /* Create a new torrent entry, then */ - memcpy( &torrent->hash, hash, sizeof(ot_hash)); - - if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) { - vector_remove_torrent( torrents_list, torrent ); - mutex_bucket_unlock_by_hash( *hash, 0 ); - continue; - } - - byte_zero( torrent->peer_list, sizeof( ot_peerlist ) ); - torrent->peer_list->base = g_now_minutes - uint32_read_big((char*)g_inbuffer+off+sizeof(ot_hash)); - } - - down_count_remote = (size_t)(((uint64_t)uint32_read_big((char*)g_inbuffer+off+sizeof(ot_hash ) + sizeof(uint32_t))) << 32); - down_count_remote |= (size_t) uint32_read_big((char*)g_inbuffer+off+sizeof(ot_hash ) + 2 * sizeof(uint32_t)); - - if( down_count_remote > torrent->peer_list->down_count ) - torrent->peer_list->down_count = down_count_remote; - /* else - We might think of sending a tell packet, if we have a much larger downloaded count - */ - - mutex_bucket_unlock( g_inquire_bucket++, exactmatch?0:1 ); - if( !g_opentracker_running ) - return; - off += sizeof(ot_hash) + 12; - } -} -#endif /* WANT_SYNC_SCRAPE */ - /* Tickle the live sync module from time to time, so no events get stuck when there's not enough traffic to fill udp packets fast enough */ void livesync_ticker( ) { - /* livesync_issue_peersync sets g_next_packet_time */ if( g_now_seconds > g_next_packet_time && - g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) ) + g_outbuf_data > sizeof( g_tracker_id ) + sizeof( uint32_t ) ) livesync_issue_peersync(); - -#ifdef WANT_SYNC_SCRAPE - /* Send first beacon after running at least LIVESYNC_FIRST_BEACON_DELAY - seconds and not more often than every LIVESYNC_BEACON_INTERVAL seconds */ - if( g_now_seconds > g_next_beacon_time ) { - livesync_issue_beacon( ); - g_next_beacon_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL; - } - - /* If we're interested in an inquiry and waited long enough to see all - tracker's beacons, go ahead and inquire */ - if( g_next_inquire_time && g_now_seconds > g_next_inquire_time ) { - livesync_issue_inquire(); - - /* If packet gets lost, ask again after LIVESYNC_BEACON_INTERVAL */ - g_next_inquire_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL; - } - - /* If we're in process of telling, let's tell. */ - if( g_inquire_inprogress ) - livesync_issue_tell( ); - -#endif /* WANT_SYNC_SCRAPE */ } /* Inform live sync about whats going on. */ -void livesync_tell( ot_hash const info_hash, const ot_peer * const peer ) { +void livesync_tell( struct ot_workstruct *ws ) { - memcpy( g_peerbuffer_pos, info_hash, sizeof(ot_hash) ); - memcpy( g_peerbuffer_pos+sizeof(ot_hash), peer, sizeof(ot_peer) ); + memcpy( g_outbuf + g_outbuf_data, ws->hash, sizeof(ot_hash) ); + memcpy( g_outbuf + g_outbuf_data + sizeof(ot_hash), &ws->peer, sizeof(ot_peer) ); - g_peerbuffer_pos += sizeof(ot_hash)+sizeof(ot_peer); + g_outbuf_data += sizeof(ot_hash) + sizeof(ot_peer); - if( g_peerbuffer_pos >= g_peerbuffer_highwater ) + if( g_outbuf_data >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS ) livesync_issue_peersync(); } static void * livesync_worker( void * args ) { + struct ot_workstruct ws; ot_ip6 in_ip; uint16_t in_port; - ssize_t datalen; (void)args; - + + /* Initialize our "thread local storage" */ + ws.inbuf = ws.request = malloc( LIVESYNC_INCOMING_BUFFSIZE ); + ws.outbuf = ws.reply = 0; + memcpy( in_ip, V4mappedprefix, sizeof( V4mappedprefix ) ); while( 1 ) { - datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); + ws.request_size = socket_recv4(g_socket_in, (char*)ws.inbuf, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); /* Expect at least tracker id and packet type */ - if( datalen <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) + if( ws.request_size <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) continue; if( !accesslist_isblessed(in_ip, OT_PERMISSION_MAY_LIVESYNC)) continue; - if( !memcmp( g_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) { + if( !memcmp( ws.inbuf, &g_tracker_id, sizeof( g_tracker_id ) ) ) { /* TODO: log packet coming from ourselves */ continue; } - switch( uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ) { + switch( uint32_read_big( sizeof( g_tracker_id ) + (char *)ws.inbuf ) ) { case OT_SYNC_PEER: - livesync_handle_peersync( datalen ); - break; -#ifdef WANT_SYNC_SCRAPE - case OT_SYNC_SCRAPE_BEACON: - livesync_handle_beacon( datalen ); - break; - case OT_SYNC_SCRAPE_INQUIRE: - livesync_handle_inquire( datalen ); - break; - case OT_SYNC_SCRAPE_TELL: - livesync_handle_tell( datalen ); + livesync_handle_peersync( &ws ); break; -#endif /* WANT_SYNC_SCRAPE */ default: break; } diff --git a/ot_livesync.h b/ot_livesync.h index fe9d122..1a3ed45 100644 --- a/ot_livesync.h +++ b/ot_livesync.h @@ -35,44 +35,6 @@ 0x0024 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 ) ]* - ######## - ######## SCRAPE SYNC PROTOCOL ######## - ######## - - Each tracker instance SHOULD broadcast a beacon every LIVESYNC_BEACON_INTERVAL - seconds after running at least LIVESYNC_FIRST_BEACON_DELAY seconds: - - packet type SYNC_SCRAPE_BEACON - [ 0x0008 0x08 amount of torrents served - ] - - If a tracker instance receives a beacon from another instance that has more than - its torrent count plus a threshold, it inquires for a scrape. It must wait for at - least 2 * LIVESYNC_BEACON_INTERVAL seconds in order to inspect beacons from all - tracker instances and inquire only the one with most torrents. - - If it sees a SYNC_SCRAPE_TELL within that time frame, it's likely, that another - scrape sync is going on. It should reset its state to needs no inquiry. It should - be reenabled on the next beacon, if still needed. - - packet type SYNC_SCRAPE_INQUIRE - [ 0x0008 0x04 id of tracker instance to inquire - ] - - The inquired tracker instance answers with as many scrape tell packets it needs - to deliver stats about all its torrents - - packet type SYNC_SCRAPE_TELL - [ 0x0008 0x14 info_hash - 0x001c 0x04 base offset (i.e. when was it last announced, in minutes) - 0x0020 0x08 downloaded count - ]* - - Each tracker instance that receives a SYNC_SCRAPE_TELL, looks up each torrent and - compares downloaded count with its own counter. It can send out its own scrape - tell packets, if it knows more. However to not interrupt a scrape tell, a tracker - should wait LIVESYNC_BEACON_INTERVAL after receiving a scrape tell. - */ #ifdef WANT_SYNC_LIVE @@ -86,7 +48,7 @@ void livesync_deinit(); void livesync_bind_mcast( char *ip, uint16_t port ); /* Inform live sync about whats going on. */ -void livesync_tell( ot_hash const info_hash, const ot_peer * const peer ); +void livesync_tell( struct ot_workstruct *ws ); /* Tickle the live sync module from time to time, so no events get stuck when there's not enough traffic to fill udp packets fast diff --git a/ot_stats.c b/ot_stats.c index 43ab8fd..b6469f9 100644 --- a/ot_stats.c +++ b/ot_stats.c @@ -642,8 +642,9 @@ void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event case EVENT_COMPLETED: #ifdef WANT_SYSLOGS if( event_data) { + struct ot_workstruct *ws = (struct ot_workstruct *)event_data; char timestring[64]; - char hex_out[42]; + char hash_hex[42], peerid_hex[42], ip_readable[64]; struct tm time_now; time_t ttt; @@ -651,8 +652,19 @@ void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event localtime_r( &ttt, &time_now ); strftime( timestring, sizeof( timestring ), "%FT%T%z", &time_now ); - to_hex( hex_out, (uint8_t*)event_data ); - syslog( LOG_INFO, "time=%s event=completed info_hash=%s", timestring, hex_out ); + to_hex( hash_hex, *ws->hash ); + if( ws->peer_id ) + to_hex( peerid_hex, (uint8_t*)ws->peer_id ); + else { + *peerid_hex=0; + } + +#ifdef WANT_V6 + ip_readable[ fmt_ip6c( ip_readable, (char*)&ws->peer ) ] = 0; +#else + ip_readable[ fmt_ip4( ip_readable, (char*)&ws->peer ) ] = 0; +#endif + syslog( LOG_INFO, "time=%s event=completed info_hash=%s peer_id=%s ip=%s", timestring, hash_hex, peerid_hex, ip_readable ); } #endif ot_overall_completed++; diff --git a/ot_udp.c b/ot_udp.c index a95a4fa..bc2d6cc 100644 --- a/ot_udp.c +++ b/ot_udp.c @@ -29,8 +29,6 @@ static void udp_make_connectionid( uint32_t * connid, const ot_ip6 remoteip ) { /* UDP implementation according to http://xbtt.sourceforge.net/udp_tracker_protocol.html */ void handle_udp6( int64 serversocket, struct ot_workstruct *ws ) { - ot_peer peer; - ot_hash *hash = NULL; ot_ip6 remoteip; uint32_t *inpacket = (uint32_t*)ws->inbuf; uint32_t *outpacket = (uint32_t*)ws->outbuf; @@ -43,6 +41,10 @@ void handle_udp6( int64 serversocket, struct ot_workstruct *ws ) { stats_issue_event( EVENT_ACCEPT, FLAG_UDP, (uintptr_t)remoteip ); stats_issue_event( EVENT_READ, FLAG_UDP, byte_count ); + /* Initialise hash pointer */ + ws->hash = NULL; + ws->peer_id = NULL; + /* Minimum udp tracker packet size, also catches error */ if( byte_count < 16 ) return; @@ -71,33 +73,36 @@ void handle_udp6( int64 serversocket, struct ot_workstruct *ws ) { numwant = ntohl( inpacket[92/4] ); if (numwant > 200) numwant = 200; - event = ntohl( inpacket[80/4] ); - port = *(uint16_t*)( ((char*)inpacket) + 96 ); - hash = (ot_hash*)( ((char*)inpacket) + 16 ); + event = ntohl( inpacket[80/4] ); + port = *(uint16_t*)( ((char*)inpacket) + 96 ); + ws->hash = (ot_hash*)( ((char*)inpacket) + 16 ); - OT_SETIP( &peer, remoteip ); - OT_SETPORT( &peer, &port ); - OT_PEERFLAG( &peer ) = 0; + OT_SETIP( &ws->peer, remoteip ); + OT_SETPORT( &ws->peer, &port ); + OT_PEERFLAG( &ws->peer ) = 0; switch( event ) { - case 1: OT_PEERFLAG( &peer ) |= PEER_FLAG_COMPLETED; break; - case 3: OT_PEERFLAG( &peer ) |= PEER_FLAG_STOPPED; break; + case 1: OT_PEERFLAG( &ws->peer ) |= PEER_FLAG_COMPLETED; break; + case 3: OT_PEERFLAG( &ws->peer ) |= PEER_FLAG_STOPPED; break; default: break; } if( !left ) - OT_PEERFLAG( &peer ) |= PEER_FLAG_SEEDING; + OT_PEERFLAG( &ws->peer ) |= PEER_FLAG_SEEDING; outpacket[0] = htonl( 1 ); /* announce action */ outpacket[1] = inpacket[12/4]; - if( OT_PEERFLAG( &peer ) & PEER_FLAG_STOPPED ) /* Peer is gone. */ - byte_count = remove_peer_from_torrent( *hash, &peer, ws->outbuf, FLAG_UDP ); - else - byte_count = 8 + add_peer_to_torrent_and_return_peers( *hash, &peer, FLAG_UDP, numwant, ((char*)outpacket) + 8 ); + if( OT_PEERFLAG( &ws->peer ) & PEER_FLAG_STOPPED ) { /* Peer is gone. */ + ws->reply = ws->outbuf; + ws->reply_size = remove_peer_from_torrent( FLAG_UDP, ws ); + } else { + ws->reply = ws->outbuf + 8; + ws->reply_size = 8 + add_peer_to_torrent_and_return_peers( FLAG_UDP, ws, numwant ); + } - socket_send6( serversocket, ws->outbuf, byte_count, remoteip, remoteport, 0 ); - stats_issue_event( EVENT_ANNOUNCE, FLAG_UDP, byte_count ); + socket_send6( serversocket, ws->outbuf, ws->reply_size, remoteip, remoteport, 0 ); + stats_issue_event( EVENT_ANNOUNCE, FLAG_UDP, ws->reply_size ); break; case 2: /* This is a scrape action */ diff --git a/trackerlogic.c b/trackerlogic.c index 5348927..7ae9bb1 100644 --- a/trackerlogic.c +++ b/trackerlogic.c @@ -71,36 +71,35 @@ void add_torrent_from_saved_state( ot_hash hash, ot_time base, size_t down_count return mutex_bucket_unlock_by_hash( hash, 1 ); } -size_t add_peer_to_torrent_and_return_peers( ot_hash hash, ot_peer *peer, PROTO_FLAG proto, size_t amount, char * reply ) { +size_t add_peer_to_torrent_and_return_peers( PROTO_FLAG proto, struct ot_workstruct *ws, size_t amount ) { int exactmatch, delta_torrentcount = 0; - size_t reply_size; ot_torrent *torrent; ot_peer *peer_dest; - ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ); + ot_vector *torrents_list = mutex_bucket_lock_by_hash( *ws->hash ); - if( !accesslist_hashisvalid( hash ) ) { - mutex_bucket_unlock_by_hash( hash, 0 ); + if( !accesslist_hashisvalid( *ws->hash ) ) { + mutex_bucket_unlock_by_hash( *ws->hash, 0 ); if( proto == FLAG_TCP ) { const char invalid_hash[] = "d14:failure reason63:Requested download is not authorized for use with this tracker.e"; - memcpy( reply, invalid_hash, strlen( invalid_hash ) ); + memcpy( ws->reply, invalid_hash, strlen( invalid_hash ) ); return strlen( invalid_hash ); } return 0; } - torrent = vector_find_or_insert( torrents_list, (void*)hash, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); + torrent = vector_find_or_insert( torrents_list, (void*)ws->hash, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); if( !torrent ) { - mutex_bucket_unlock_by_hash( hash, 0 ); + mutex_bucket_unlock_by_hash( *ws->hash, 0 ); return 0; } if( !exactmatch ) { /* Create a new torrent entry, then */ - memcpy( torrent->hash, hash, sizeof(ot_hash) ); + memcpy( torrent->hash, *ws->hash, sizeof(ot_hash) ); if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) { vector_remove_torrent( torrents_list, torrent ); - mutex_bucket_unlock_by_hash( hash, 0 ); + mutex_bucket_unlock_by_hash( *ws->hash, 0 ); return 0; } @@ -112,76 +111,76 @@ size_t add_peer_to_torrent_and_return_peers( ot_hash hash, ot_peer *peer, PROTO_ torrent->peer_list->base = g_now_minutes; /* Check for peer in torrent */ - peer_dest = vector_find_or_insert_peer( &(torrent->peer_list->peers), peer, &exactmatch ); + peer_dest = vector_find_or_insert_peer( &(torrent->peer_list->peers), &ws->peer, &exactmatch ); if( !peer_dest ) { - mutex_bucket_unlock_by_hash( hash, delta_torrentcount ); + mutex_bucket_unlock_by_hash( *ws->hash, delta_torrentcount ); return 0; } /* Tell peer that it's fresh */ - OT_PEERTIME( peer ) = 0; + OT_PEERTIME( &ws->peer ) = 0; /* Sanitize flags: Whoever claims to have completed download, must be a seeder */ - if( ( OT_PEERFLAG( peer ) & ( PEER_FLAG_COMPLETED | PEER_FLAG_SEEDING ) ) == PEER_FLAG_COMPLETED ) - OT_PEERFLAG( peer ) ^= PEER_FLAG_COMPLETED; + if( ( OT_PEERFLAG( &ws->peer ) & ( PEER_FLAG_COMPLETED | PEER_FLAG_SEEDING ) ) == PEER_FLAG_COMPLETED ) + OT_PEERFLAG( &ws->peer ) ^= PEER_FLAG_COMPLETED; /* If we hadn't had a match create peer there */ if( !exactmatch ) { #ifdef WANT_SYNC_LIVE if( proto == FLAG_MCA ) - OT_PEERFLAG( peer ) |= PEER_FLAG_FROM_SYNC; + OT_PEERFLAG( &ws->peer ) |= PEER_FLAG_FROM_SYNC; else - livesync_tell( hash, peer ); + livesync_tell( ws ); #endif torrent->peer_list->peer_count++; - if( OT_PEERFLAG(peer) & PEER_FLAG_COMPLETED ) { + if( OT_PEERFLAG(&ws->peer) & PEER_FLAG_COMPLETED ) { torrent->peer_list->down_count++; - stats_issue_event( EVENT_COMPLETED, 0, (uintptr_t)torrent->hash ); + stats_issue_event( EVENT_COMPLETED, 0, (uintptr_t)ws ); } - if( OT_PEERFLAG(peer) & PEER_FLAG_SEEDING ) + if( OT_PEERFLAG(&ws->peer) & PEER_FLAG_SEEDING ) torrent->peer_list->seed_count++; } else { stats_issue_event( EVENT_RENEW, 0, OT_PEERTIME( peer_dest ) ); #ifdef WANT_SPOT_WOODPECKER if( ( OT_PEERTIME(peer_dest) > 0 ) && ( OT_PEERTIME(peer_dest) < 20 ) ) - stats_issue_event( EVENT_WOODPECKER, 0, (uintptr_t)peer ); + stats_issue_event( EVENT_WOODPECKER, 0, (uintptr_t)&ws->peer ); #endif #ifdef WANT_SYNC_LIVE /* Won't live sync peers that come back too fast. Only exception: fresh "completed" reports */ if( proto != FLAG_MCA ) { if( OT_PEERTIME( peer_dest ) > OT_CLIENT_SYNC_RENEW_BOUNDARY || - ( !(OT_PEERFLAG(peer_dest) & PEER_FLAG_COMPLETED ) && (OT_PEERFLAG(peer) & PEER_FLAG_COMPLETED ) ) ) - livesync_tell( hash, peer ); + ( !(OT_PEERFLAG(peer_dest) & PEER_FLAG_COMPLETED ) && (OT_PEERFLAG(&ws->peer) & PEER_FLAG_COMPLETED ) ) ) + livesync_tell( ws ); } #endif - if( (OT_PEERFLAG(peer_dest) & PEER_FLAG_SEEDING ) && !(OT_PEERFLAG(peer) & PEER_FLAG_SEEDING ) ) + if( (OT_PEERFLAG(peer_dest) & PEER_FLAG_SEEDING ) && !(OT_PEERFLAG(&ws->peer) & PEER_FLAG_SEEDING ) ) torrent->peer_list->seed_count--; - if( !(OT_PEERFLAG(peer_dest) & PEER_FLAG_SEEDING ) && (OT_PEERFLAG(peer) & PEER_FLAG_SEEDING ) ) + if( !(OT_PEERFLAG(peer_dest) & PEER_FLAG_SEEDING ) && (OT_PEERFLAG(&ws->peer) & PEER_FLAG_SEEDING ) ) torrent->peer_list->seed_count++; - if( !(OT_PEERFLAG(peer_dest) & PEER_FLAG_COMPLETED ) && (OT_PEERFLAG(peer) & PEER_FLAG_COMPLETED ) ) { + if( !(OT_PEERFLAG(peer_dest) & PEER_FLAG_COMPLETED ) && (OT_PEERFLAG(&ws->peer) & PEER_FLAG_COMPLETED ) ) { torrent->peer_list->down_count++; - stats_issue_event( EVENT_COMPLETED, 0, (uintptr_t)torrent->hash ); + stats_issue_event( EVENT_COMPLETED, 0, (uintptr_t)ws ); } if( OT_PEERFLAG(peer_dest) & PEER_FLAG_COMPLETED ) - OT_PEERFLAG( peer ) |= PEER_FLAG_COMPLETED; + OT_PEERFLAG( &ws->peer ) |= PEER_FLAG_COMPLETED; } - memcpy( peer_dest, peer, sizeof(ot_peer) ); + memcpy( peer_dest, &ws->peer, sizeof(ot_peer) ); #ifdef WANT_SYNC if( proto == FLAG_MCA ) { - mutex_bucket_unlock_by_hash( hash, delta_torrentcount ); + mutex_bucket_unlock_by_hash( *ws->hash, delta_torrentcount ); return 0; } #endif - reply_size = return_peers_for_torrent( torrent, amount, reply, proto ); - mutex_bucket_unlock_by_hash( torrent->hash, delta_torrentcount ); - return reply_size; + ws->reply_size = return_peers_for_torrent( torrent, amount, ws->reply, proto ); + mutex_bucket_unlock_by_hash( *ws->hash, delta_torrentcount ); + return ws->reply_size; } static size_t return_peers_all( ot_peerlist *peer_list, char *reply ) { @@ -350,23 +349,22 @@ size_t return_tcp_scrape_for_torrent( ot_hash *hash_list, int amount, char *repl } static ot_peerlist dummy_list; -size_t remove_peer_from_torrent( ot_hash hash, ot_peer *peer, char *reply, PROTO_FLAG proto ) { +size_t remove_peer_from_torrent( PROTO_FLAG proto, struct ot_workstruct *ws ) { int exactmatch; - size_t reply_size = 0; - ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ); - ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); + ot_vector *torrents_list = mutex_bucket_lock_by_hash( *ws->hash ); + ot_torrent *torrent = binary_search( ws->hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); ot_peerlist *peer_list = &dummy_list; #ifdef WANT_SYNC_LIVE if( proto != FLAG_MCA ) { - OT_PEERFLAG( peer ) |= PEER_FLAG_STOPPED; - livesync_tell( hash, peer ); + OT_PEERFLAG( &ws->peer ) |= PEER_FLAG_STOPPED; + livesync_tell( ws ); } #endif if( exactmatch ) { peer_list = torrent->peer_list; - switch( vector_remove_peer( &peer_list->peers, peer ) ) { + switch( vector_remove_peer( &peer_list->peers, &ws->peer ) ) { case 2: peer_list->seed_count--; /* Fall throughs intended */ case 1: peer_list->peer_count--; /* Fall throughs intended */ default: break; @@ -375,19 +373,19 @@ size_t remove_peer_from_torrent( ot_hash hash, ot_peer *peer, char *reply, PROTO if( proto == FLAG_TCP ) { int erval = OT_CLIENT_REQUEST_INTERVAL_RANDOM; - reply_size = sprintf( reply, "d8:completei%zde10:incompletei%zde8:intervali%ie12:min intervali%ie" PEERS_BENCODED "0:e", peer_list->seed_count, peer_list->peer_count - peer_list->seed_count, erval, erval / 2 ); + ws->reply_size = sprintf( ws->reply, "d8:completei%zde10:incompletei%zde8:intervali%ie12:min intervali%ie" PEERS_BENCODED "0:e", peer_list->seed_count, peer_list->peer_count - peer_list->seed_count, erval, erval / 2 ); } /* Handle UDP reply */ if( proto == FLAG_UDP ) { - ((uint32_t*)reply)[2] = htonl( OT_CLIENT_REQUEST_INTERVAL_RANDOM ); - ((uint32_t*)reply)[3] = htonl( peer_list->peer_count - peer_list->seed_count ); - ((uint32_t*)reply)[4] = htonl( peer_list->seed_count); - reply_size = 20; + ((uint32_t*)ws->reply)[2] = htonl( OT_CLIENT_REQUEST_INTERVAL_RANDOM ); + ((uint32_t*)ws->reply)[3] = htonl( peer_list->peer_count - peer_list->seed_count ); + ((uint32_t*)ws->reply)[4] = htonl( peer_list->seed_count); + ws->reply_size = 20; } - mutex_bucket_unlock_by_hash( hash, 0 ); - return reply_size; + mutex_bucket_unlock_by_hash( *ws->hash, 0 ); + return ws->reply_size; } void iterate_all_torrents( int (*for_each)( ot_torrent* torrent, uintptr_t data ), uintptr_t data ) { diff --git a/trackerlogic.h b/trackerlogic.h index 4052fa4..5ae644b 100644 --- a/trackerlogic.h +++ b/trackerlogic.h @@ -108,22 +108,29 @@ struct ot_peerlist { struct ot_workstruct { /* Thread specific, static */ - char *inbuf; -#define G_INBUF_SIZE 8192 - char *outbuf; -#define G_OUTBUF_SIZE 8192 -#ifdef _DEBUG_HTTPERROR - char *debugbuf; -#define G_DEBUGBUF_SIZE 8192 + char *inbuf; +#define G_INBUF_SIZE 8192 + char *outbuf; +#define G_OUTBUF_SIZE 8192 +#ifdef _DEBUG_HTTPERROR + char *debugbuf; +#define G_DEBUGBUF_SIZE 8192 #endif + /* The peer currently in the working */ + ot_peer peer; + + /* Pointers into the request buffer */ + ot_hash *hash; + char *peer_id; + /* HTTP specific, non static */ - int keep_alive; - char *request; - ssize_t request_size; - ssize_t header_size; - char *reply; - ssize_t reply_size; + int keep_alive; + char *request; + ssize_t request_size; + ssize_t header_size; + char *reply; + ssize_t reply_size; }; /* @@ -150,9 +157,8 @@ void exerr( char * message ); /* add_peer_to_torrent does only release the torrent bucket if from_sync is set, otherwise it is released in return_peers_for_torrent */ -#define add_peer_to_torrent(hash,peer,proto) add_peer_to_torrent_and_return_peers(hash,peer,proto,0,NULL) -size_t add_peer_to_torrent_and_return_peers( ot_hash hash, ot_peer *peer, PROTO_FLAG proto, size_t amount, char * reply ); -size_t remove_peer_from_torrent( ot_hash hash, ot_peer *peer, char *reply, PROTO_FLAG proto ); +size_t add_peer_to_torrent_and_return_peers( PROTO_FLAG proto, struct ot_workstruct *ws, size_t amount ); +size_t remove_peer_from_torrent( PROTO_FLAG proto, struct ot_workstruct *ws ); size_t return_tcp_scrape_for_torrent( ot_hash *hash, int amount, char *reply ); size_t return_udp_scrape_for_torrent( ot_hash hash, char *reply ); void add_torrent_from_saved_state( ot_hash hash, ot_time base, size_t down_count );