@ -33,23 +33,9 @@ char groupip_1[4] = { 224,0,23,5 };
# define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
# define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
# define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash))
# define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash))
# ifdef WANT_SYNC_SCRAPE
# define LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE 1504
# define LIVESYNC_OUTGOING_WATERMARK_SCRAPE (sizeof(ot_hash)+sizeof(uint64_t)+sizeof(uint32_t))
# define LIVESYNC_OUTGOING_MAXPACKETS_SCRAPE 100
# define LIVESYNC_FIRST_BEACON_DELAY (30*60) /* seconds */
# define LIVESYNC_BEACON_INTERVAL 60 /* seconds */
# define LIVESYNC_INQUIRE_THRESH 0.75
# endif /* WANT_SYNC_SCRAPE */
# define LIVESYNC_MAXDELAY 15 /* seconds */
# define LIVESYNC_MAXDELAY 15 /* seconds */
enum { OT_SYNC_PEER
enum { OT_SYNC_PEER } ;
# ifdef WANT_SYNC_SCRAPE
, OT_SYNC_SCRAPE_BEACON , OT_SYNC_SCRAPE_INQUIRE , OT_SYNC_SCRAPE_TELL
# endif
} ;
/* Forward declaration */
/* Forward declaration */
static void * livesync_worker ( void * args ) ;
static void * livesync_worker ( void * args ) ;
@ -59,52 +45,24 @@ static int64 g_socket_in = -1;
/* For incoming packets */
/* For incoming packets */
static int64 g_socket_out = - 1 ;
static int64 g_socket_out = - 1 ;
static uint8_t g_inbuffer [ LIVESYNC_INCOMING_BUFFSIZE ] ;
static uint8_t g_peerbuffer_start [ LIVESYNC_OUTGOING_BUFFSIZE_PEERS ] ;
static uint8_t * g_peerbuffer_pos ;
static uint8_t * g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS ;
char g_outbuf [ LIVESYNC_OUTGOING_BUFFSIZE_PEERS ] ;
static size_t g_outbuf_data ;
static ot_time g_next_packet_time ;
static ot_time g_next_packet_time ;
# ifdef WANT_SYNC_SCRAPE
/* Live sync scrape buffers, states and timers */
static ot_time g_next_beacon_time ;
static ot_time g_next_inquire_time ;
static uint8_t g_scrapebuffer_start [ LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE ] ;
static uint8_t * g_scrapebuffer_pos ;
static uint8_t * g_scrapebuffer_highwater = g_scrapebuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE - LIVESYNC_OUTGOING_WATERMARK_SCRAPE ;
static size_t g_inquire_remote_count ;
static uint32_t g_inquire_remote_host ;
static int g_inquire_inprogress ;
static int g_inquire_bucket ;
# endif /* WANT_SYNC_SCRAPE */
static pthread_t thread_id ;
static pthread_t thread_id ;
void livesync_init ( ) {
void livesync_init ( ) {
if ( g_socket_in = = - 1 )
if ( g_socket_in = = - 1 )
exerr ( " No socket address for live sync specified. " ) ;
exerr ( " No socket address for live sync specified. " ) ;
/* Prepare outgoing peers buffer */
/* Prepare outgoing peers buffer */
g_peerbuffer_pos = g_peerbuffer_start ;
memcpy ( g_outbuf , & g_tracker_id , sizeof ( g_tracker_id ) ) ;
memcpy ( g_peerbuffer_pos , & g_tracker_id , sizeof ( g_tracker_id ) ) ;
uint32_pack_big ( g_outbuf + sizeof ( g_tracker_id ) , OT_SYNC_PEER ) ;
uint32_pack_big ( ( char * ) g_peerbuffer_pos + sizeof ( g_tracker_id ) , OT_SYNC_PEER ) ;
g_outbuf_data = sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ;
g_peerbuffer_pos + = sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ;
# ifdef WANT_SYNC_SCRAPE
/* Prepare outgoing scrape buffer */
g_scrapebuffer_pos = g_scrapebuffer_start ;
memcpy ( g_scrapebuffer_pos , & g_tracker_id , sizeof ( g_tracker_id ) ) ;
uint32_pack_big ( ( char * ) g_scrapebuffer_pos + sizeof ( g_tracker_id ) , OT_SYNC_SCRAPE_TELL ) ;
g_scrapebuffer_pos + = sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ;
/* Wind up timers for inquires */
g_next_beacon_time = g_now_seconds + LIVESYNC_FIRST_BEACON_DELAY ;
# endif /* WANT_SYNC_SCRAPE */
g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY ;
g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY ;
pthread_create ( & thread_id , NULL , livesync_worker , NULL ) ;
pthread_create ( & thread_id , NULL , livesync_worker , NULL ) ;
}
}
@ -148,264 +106,86 @@ void livesync_bind_mcast( ot_ip6 ip, uint16_t port) {
}
}
static void livesync_issue_peersync ( ) {
static void livesync_issue_peersync ( ) {
socket_send4 ( g_socket_out , ( char * ) g_peerbuffer_start , g_peerbuffer_pos - g_peerbuffer_start ,
socket_send4 ( g_socket_out , g_outbuf , g_outbuf_data , groupip_1 , LIVESYNC_PORT ) ;
groupip_1 , LIVESYNC_PORT ) ;
g_outbuf_data = sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ;
g_peerbuffer_pos = g_peerbuffer_start + sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ;
g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY ;
g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY ;
}
}
static void livesync_handle_peersync ( ssize_t datalen ) {
static void livesync_handle_peersync ( struct ot_workstruct * ws ) {
int off = sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ;
int off = sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ;
/* Now basic sanity checks have been done on the live sync packet
/* Now basic sanity checks have been done on the live sync packet
We might add more testing and logging . */
We might add more testing and logging . */
while ( off + ( ssize_t ) sizeof ( ot_hash ) + ( ssize_t ) sizeof ( ot_peer ) < = datalen ) {
while ( off + ( ssize_t ) sizeof ( ot_hash ) + ( ssize_t ) sizeof ( ot_peer ) < = ws - > request_size ) {
ot_peer * peer = ( ot_peer * ) ( g_inbuffer + off + sizeof ( ot_hash ) ) ;
memcpy ( & ws - > peer , ws - > request + off + sizeof ( ot_hash ) , sizeof ( ot_peer ) ) ;
ot_hash * hash = ( ot_hash * ) ( g_inbuffer + off ) ;
ws - > hash = ( ot_hash * ) ( ws - > request + off ) ;
if ( ! g_opentracker_running ) return ;
if ( ! g_opentracker_running ) return ;
if ( OT_PEERFLAG ( peer ) & PEER_FLAG_STOPPED )
if ( OT_PEERFLAG ( & ws - > peer ) & PEER_FLAG_STOPPED )
remove_peer_from_torrent ( * hash , peer , NULL , FLAG_MCA ) ;
remove_peer_from_torrent ( FLAG_MCA , ws ) ;
else
else
add_peer_to_torrent ( * hash , peer , FLAG_MCA ) ;
add_peer_to_torrent_and_return_peers ( FLAG_MCA , ws , /* amount = */ 0 ) ;
off + = sizeof ( ot_hash ) + sizeof ( ot_peer ) ;
off + = sizeof ( ot_hash ) + sizeof ( ot_peer ) ;
}
}
stats_issue_event ( EVENT_SYNC , 0 ,
stats_issue_event ( EVENT_SYNC , 0 ,
( datalen - sizeof ( g_tracker_id ) - sizeof ( uint32_t ) ) /
( ws - > request_size - sizeof ( g_tracker_id ) - sizeof ( uint32_t ) ) /
( ( ssize_t ) sizeof ( ot_hash ) + ( ssize_t ) sizeof ( ot_peer ) ) ) ;
( ( ssize_t ) sizeof ( ot_hash ) + ( ssize_t ) sizeof ( ot_peer ) ) ) ;
}
}
# ifdef WANT_SYNC_SCRAPE
void livesync_issue_beacon ( ) {
size_t torrent_count = mutex_get_torrent_count ( ) ;
uint8_t beacon [ sizeof ( g_tracker_id ) + sizeof ( uint32_t ) + sizeof ( uint64_t ) ] ;
memcpy ( beacon , & g_tracker_id , sizeof ( g_tracker_id ) ) ;
uint32_pack_big ( ( char * ) beacon + sizeof ( g_tracker_id ) , OT_SYNC_SCRAPE_BEACON ) ;
uint32_pack_big ( ( char * ) beacon + sizeof ( g_tracker_id ) + sizeof ( uint32_t ) , ( uint32_t ) ( ( uint64_t ) ( torrent_count ) > > 32 ) ) ;
uint32_pack_big ( ( char * ) beacon + sizeof ( g_tracker_id ) + 2 * sizeof ( uint32_t ) , ( uint32_t ) torrent_count ) ;
socket_send4 ( g_socket_out , ( char * ) beacon , sizeof ( beacon ) , groupip_1 , LIVESYNC_PORT ) ;
}
void livesync_handle_beacon ( ssize_t datalen ) {
size_t torrent_count_local , torrent_count_remote ;
if ( datalen ! = sizeof ( g_tracker_id ) + sizeof ( uint32_t ) + sizeof ( uint64_t ) )
return ;
torrent_count_local = mutex_get_torrent_count ( ) ;
torrent_count_remote = ( size_t ) ( ( ( uint64_t ) uint32_read_big ( ( char * ) g_inbuffer + sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ) ) < < 32 ) ;
torrent_count_remote | = ( size_t ) uint32_read_big ( ( char * ) g_inbuffer + sizeof ( g_tracker_id ) + 2 * sizeof ( uint32_t ) ) ;
/* Empty tracker is useless */
if ( ! torrent_count_remote ) return ;
if ( ( ( double ) torrent_count_local ) / ( ( double ) torrent_count_remote ) < LIVESYNC_INQUIRE_THRESH ) {
if ( ! g_next_inquire_time ) {
g_next_inquire_time = g_now_seconds + 2 * LIVESYNC_BEACON_INTERVAL ;
g_inquire_remote_count = 0 ;
}
if ( torrent_count_remote > g_inquire_remote_count ) {
g_inquire_remote_count = torrent_count_remote ;
memcpy ( & g_inquire_remote_host , g_inbuffer , sizeof ( g_tracker_id ) ) ;
}
}
}
void livesync_issue_inquire ( ) {
uint8_t inquire [ sizeof ( g_tracker_id ) + sizeof ( uint32_t ) + sizeof ( g_tracker_id ) ] ;
memcpy ( inquire , & g_tracker_id , sizeof ( g_tracker_id ) ) ;
uint32_pack_big ( ( char * ) inquire + sizeof ( g_tracker_id ) , OT_SYNC_SCRAPE_INQUIRE ) ;
memcpy ( inquire + sizeof ( g_tracker_id ) + sizeof ( uint32_t ) , & g_inquire_remote_host , sizeof ( g_tracker_id ) ) ;
socket_send4 ( g_socket_out , ( char * ) inquire , sizeof ( inquire ) , groupip_1 , LIVESYNC_PORT ) ;
}
void livesync_handle_inquire ( ssize_t datalen ) {
if ( datalen ! = sizeof ( g_tracker_id ) + sizeof ( uint32_t ) + sizeof ( g_tracker_id ) )
return ;
/* If it isn't us, they're inquiring, ignore inquiry */
if ( memcmp ( & g_tracker_id , g_inbuffer , sizeof ( g_tracker_id ) ) )
return ;
/* Start scrape tell on next ticker */
if ( ! g_inquire_inprogress ) {
g_inquire_inprogress = 1 ;
g_inquire_bucket = 0 ;
}
}
void livesync_issue_tell ( ) {
int packets_to_send = LIVESYNC_OUTGOING_MAXPACKETS_SCRAPE ;
while ( packets_to_send > 0 & & g_inquire_bucket < OT_BUCKET_COUNT ) {
ot_vector * torrents_list = mutex_bucket_lock ( g_inquire_bucket ) ;
unsigned int j ;
for ( j = 0 ; j < torrents_list - > size ; + + j ) {
ot_torrent * torrent = ( ot_torrent * ) ( torrents_list - > data ) + j ;
memcpy ( g_scrapebuffer_pos , torrent - > hash , sizeof ( ot_hash ) ) ;
g_scrapebuffer_pos + = sizeof ( ot_hash ) ;
uint32_pack_big ( ( char * ) g_scrapebuffer_pos , ( uint32_t ) ( g_now_minutes - torrent - > peer_list - > base ) ) ;
uint32_pack_big ( ( char * ) g_scrapebuffer_pos + 4 , ( uint32_t ) ( ( uint64_t ) ( torrent - > peer_list - > down_count ) > > 32 ) ) ;
uint32_pack_big ( ( char * ) g_scrapebuffer_pos + 8 , ( uint32_t ) torrent - > peer_list - > down_count ) ;
g_scrapebuffer_pos + = 12 ;
if ( g_scrapebuffer_pos > = g_scrapebuffer_highwater ) {
socket_send4 ( g_socket_out , ( char * ) g_scrapebuffer_start , g_scrapebuffer_pos - g_scrapebuffer_start , groupip_1 , LIVESYNC_PORT ) ;
g_scrapebuffer_pos = g_scrapebuffer_start + sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ;
- - packets_to_send ;
}
}
mutex_bucket_unlock ( g_inquire_bucket + + , 0 ) ;
if ( ! g_opentracker_running )
return ;
}
if ( g_inquire_bucket = = OT_BUCKET_COUNT ) {
socket_send4 ( g_socket_out , ( char * ) g_scrapebuffer_start , g_scrapebuffer_pos - g_scrapebuffer_start , groupip_1 , LIVESYNC_PORT ) ;
g_inquire_inprogress = 0 ;
}
}
void livesync_handle_tell ( ssize_t datalen ) {
int off = sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ;
/* Some instance is in progress of telling. Our inquiry was successful.
Don ' t ask again until we see next beacon . */
g_next_inquire_time = 0 ;
/* Don't cause any new inquiries during another tracker's tell */
if ( g_next_beacon_time - g_now_seconds < LIVESYNC_BEACON_INTERVAL )
g_next_beacon_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL ;
while ( off + sizeof ( ot_hash ) + 12 < = ( size_t ) datalen ) {
ot_hash * hash = ( ot_hash * ) ( g_inbuffer + off ) ;
ot_vector * torrents_list = mutex_bucket_lock_by_hash ( * hash ) ;
size_t down_count_remote ;
int exactmatch ;
ot_torrent * torrent = vector_find_or_insert ( torrents_list , hash , sizeof ( ot_hash ) , OT_HASH_COMPARE_SIZE , & exactmatch ) ;
if ( ! torrent ) {
mutex_bucket_unlock_by_hash ( * hash , 0 ) ;
continue ;
}
if ( ! exactmatch ) {
/* Create a new torrent entry, then */
memcpy ( & torrent - > hash , hash , sizeof ( ot_hash ) ) ;
if ( ! ( torrent - > peer_list = malloc ( sizeof ( ot_peerlist ) ) ) ) {
vector_remove_torrent ( torrents_list , torrent ) ;
mutex_bucket_unlock_by_hash ( * hash , 0 ) ;
continue ;
}
byte_zero ( torrent - > peer_list , sizeof ( ot_peerlist ) ) ;
torrent - > peer_list - > base = g_now_minutes - uint32_read_big ( ( char * ) g_inbuffer + off + sizeof ( ot_hash ) ) ;
}
down_count_remote = ( size_t ) ( ( ( uint64_t ) uint32_read_big ( ( char * ) g_inbuffer + off + sizeof ( ot_hash ) + sizeof ( uint32_t ) ) ) < < 32 ) ;
down_count_remote | = ( size_t ) uint32_read_big ( ( char * ) g_inbuffer + off + sizeof ( ot_hash ) + 2 * sizeof ( uint32_t ) ) ;
if ( down_count_remote > torrent - > peer_list - > down_count )
torrent - > peer_list - > down_count = down_count_remote ;
/* else
We might think of sending a tell packet , if we have a much larger downloaded count
*/
mutex_bucket_unlock ( g_inquire_bucket + + , exactmatch ? 0 : 1 ) ;
if ( ! g_opentracker_running )
return ;
off + = sizeof ( ot_hash ) + 12 ;
}
}
# endif /* WANT_SYNC_SCRAPE */
/* Tickle the live sync module from time to time, so no events get
/* Tickle the live sync module from time to time, so no events get
stuck when there ' s not enough traffic to fill udp packets fast
stuck when there ' s not enough traffic to fill udp packets fast
enough */
enough */
void livesync_ticker ( ) {
void livesync_ticker ( ) {
/* livesync_issue_peersync sets g_next_packet_time */
/* livesync_issue_peersync sets g_next_packet_time */
if ( g_now_seconds > g_next_packet_time & &
if ( g_now_seconds > g_next_packet_time & &
g_peerbuffer_pos > g_peerbuffer_start + sizeof ( g_tracker_id ) )
g_outbuf_data > sizeof ( g_tracker_id ) + sizeof ( uint32_t ) )
livesync_issue_peersync ( ) ;
livesync_issue_peersync ( ) ;
# ifdef WANT_SYNC_SCRAPE
/* Send first beacon after running at least LIVESYNC_FIRST_BEACON_DELAY
seconds and not more often than every LIVESYNC_BEACON_INTERVAL seconds */
if ( g_now_seconds > g_next_beacon_time ) {
livesync_issue_beacon ( ) ;
g_next_beacon_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL ;
}
/* If we're interested in an inquiry and waited long enough to see all
tracker ' s beacons , go ahead and inquire */
if ( g_next_inquire_time & & g_now_seconds > g_next_inquire_time ) {
livesync_issue_inquire ( ) ;
/* If packet gets lost, ask again after LIVESYNC_BEACON_INTERVAL */
g_next_inquire_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL ;
}
/* If we're in process of telling, let's tell. */
if ( g_inquire_inprogress )
livesync_issue_tell ( ) ;
# endif /* WANT_SYNC_SCRAPE */
}
}
/* Inform live sync about whats going on. */
/* Inform live sync about whats going on. */
void livesync_tell ( ot_hash const info_hash , const ot_peer * const peer ) {
void livesync_tell ( struct ot_workstruct * ws ) {
memcpy ( g_peerbuffer_pos , info_ hash , sizeof ( ot_hash ) ) ;
memcpy ( g_outbuf + g_outbuf_data , ws - > hash , sizeof ( ot_hash ) ) ;
memcpy ( g_peerbuffer_pos + sizeof ( ot_hash ) , peer , sizeof ( ot_peer ) ) ;
memcpy ( g_outbuf + g_outbuf_data + sizeof ( ot_hash ) , & ws - > peer , sizeof ( ot_peer ) ) ;
g_peerbuffer_pos + = sizeof ( ot_hash ) + sizeof ( ot_peer ) ;
g_outbuf_data + = sizeof ( ot_hash ) + sizeof ( ot_peer ) ;
if ( g_peerbuffer_pos > = g_peerbuffer_highwater )
if ( g_outbuf_data > = LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS )
livesync_issue_peersync ( ) ;
livesync_issue_peersync ( ) ;
}
}
static void * livesync_worker ( void * args ) {
static void * livesync_worker ( void * args ) {
struct ot_workstruct ws ;
ot_ip6 in_ip ; uint16_t in_port ;
ot_ip6 in_ip ; uint16_t in_port ;
ssize_t datalen ;
( void ) args ;
( void ) args ;
/* Initialize our "thread local storage" */
ws . inbuf = ws . request = malloc ( LIVESYNC_INCOMING_BUFFSIZE ) ;
ws . outbuf = ws . reply = 0 ;
memcpy ( in_ip , V4mappedprefix , sizeof ( V4mappedprefix ) ) ;
memcpy ( in_ip , V4mappedprefix , sizeof ( V4mappedprefix ) ) ;
while ( 1 ) {
while ( 1 ) {
datalen = socket_recv4 ( g_socket_in , ( char * ) g_inbuffer , LIVESYNC_INCOMING_BUFFSIZE , 12 + ( char * ) in_ip , & in_port ) ;
ws . request_size = socket_recv4 ( g_socket_in , ( char * ) ws . inbuf , LIVESYNC_INCOMING_BUFFSIZE , 12 + ( char * ) in_ip , & in_port ) ;
/* Expect at least tracker id and packet type */
/* Expect at least tracker id and packet type */
if ( datalen < = ( ssize_t ) ( sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ) )
if ( ws . request_size < = ( ssize_t ) ( sizeof ( g_tracker_id ) + sizeof ( uint32_t ) ) )
continue ;
continue ;
if ( ! accesslist_isblessed ( in_ip , OT_PERMISSION_MAY_LIVESYNC ) )
if ( ! accesslist_isblessed ( in_ip , OT_PERMISSION_MAY_LIVESYNC ) )
continue ;
continue ;
if ( ! memcmp ( g_inbuffer , & g_tracker_id , sizeof ( g_tracker_id ) ) ) {
if ( ! memcmp ( ws . inbuf , & g_tracker_id , sizeof ( g_tracker_id ) ) ) {
/* TODO: log packet coming from ourselves */
/* TODO: log packet coming from ourselves */
continue ;
continue ;
}
}
switch ( uint32_read_big ( sizeof ( g_tracker_id ) + ( char * ) g_inbuffer ) ) {
switch ( uint32_read_big ( sizeof ( g_tracker_id ) + ( char * ) ws . inbuf ) ) {
case OT_SYNC_PEER :
case OT_SYNC_PEER :
livesync_handle_peersync ( datalen ) ;
livesync_handle_peersync ( & ws ) ;
break ;
# ifdef WANT_SYNC_SCRAPE
case OT_SYNC_SCRAPE_BEACON :
livesync_handle_beacon ( datalen ) ;
break ;
case OT_SYNC_SCRAPE_INQUIRE :
livesync_handle_inquire ( datalen ) ;
break ;
case OT_SYNC_SCRAPE_TELL :
livesync_handle_tell ( datalen ) ;
break ;
break ;
# endif /* WANT_SYNC_SCRAPE */
default :
default :
break ;
break ;
}
}