1
0
mirror of git://erdgeist.org/opentracker synced 2025-01-26 06:36:26 +00:00
opentracker/proxy.c
Romain Porte 019d58d154 fix implicit fallthrough spelling
This commit fixes the syntax of the implicit fallthrough comments, in
order to be matched by GCC (and probably other compilers as well) with
the following regular expression:

[ \t.!]*([Ee]lse,? |[Ii]ntentional(ly)? )? fall(s | |-)?thr(ough|u)[
\t.!]*(-[^\n\r]*)?

See: https://gcc.gnu.org/onlinedocs/gcc/Warning-Options.html#index-Wimplicit-fallthrough=
2021-08-22 14:42:17 +02:00

849 lines
27 KiB
C

/* This software was written by Dirk Engling <erdgeist@erdgeist.org>
It is considered beerware. Prost. Skol. Cheers or whatever.
$Id$ */
/* System */
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <pwd.h>
#include <ctype.h>
#include <pthread.h>
/* Libowfat */
#include "socket.h"
#include "io.h"
#include "iob.h"
#include "byte.h"
#include "scan.h"
#include "ip6.h"
#include "ndelay.h"
/* Opentracker */
#include "trackerlogic.h"
#include "ot_vector.h"
#include "ot_mutex.h"
#include "ot_stats.h"
#ifndef WANT_SYNC_LIVE
#define WANT_SYNC_LIVE
#endif
#include "ot_livesync.h"
ot_ip6 g_serverip;
uint16_t g_serverport = 9009;
uint32_t g_tracker_id;
char groupip_1[4] = { 224,0,23,5 };
int g_self_pipe[2];
/* If you have more than 10 peers, don't use this proxy
Use 20 slots for 10 peers to have room for 10 incoming connection slots
*/
#define MAX_PEERS 20
#define LIVESYNC_INCOMING_BUFFSIZE (256*256)
#define STREAMSYNC_OUTGOING_BUFFSIZE (256*256)
#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash))
#define LIVESYNC_MAXDELAY 15 /* seconds */
/* The amount of time a complete sync cycle should take */
#define OT_SYNC_INTERVAL_MINUTES 2
/* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */
#define OT_SYNC_SLEEP ( ( ( OT_SYNC_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) )
enum { OT_SYNC_PEER };
enum { FLAG_SERVERSOCKET = 1 };
/* For incoming packets */
static int64 g_socket_in = -1;
static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE];
/* For outgoing packets */
static int64 g_socket_out = -1;
static uint8_t g_peerbuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
static uint8_t *g_peerbuffer_pos;
static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS;
static ot_time g_next_packet_time;
static void * livesync_worker( void * args );
static void * streamsync_worker( void * args );
static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer );
void exerr( char * message ) {
fprintf( stderr, "%s\n", message );
exit( 111 );
}
void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event_data ) {
(void) event;
(void) proto;
(void) event_data;
}
void livesync_bind_mcast( ot_ip6 ip, uint16_t port) {
char tmpip[4] = {0,0,0,0};
char *v4ip;
if( !ip6_isv4mapped(ip))
exerr("v6 mcast support not yet available.");
v4ip = ip+12;
if( g_socket_in != -1 )
exerr("Error: Livesync listen ip specified twice.");
if( ( g_socket_in = socket_udp4( )) < 0)
exerr("Error: Cant create live sync incoming socket." );
ndelay_off(g_socket_in);
if( socket_bind4_reuse( g_socket_in, tmpip, port ) == -1 )
exerr("Error: Cant bind live sync incoming socket." );
if( socket_mcjoin4( g_socket_in, groupip_1, v4ip ) )
exerr("Error: Cant make live sync incoming socket join mcast group.");
if( ( g_socket_out = socket_udp4()) < 0)
exerr("Error: Cant create live sync outgoing socket." );
if( socket_bind4_reuse( g_socket_out, v4ip, port ) == -1 )
exerr("Error: Cant bind live sync outgoing socket." );
socket_mcttl4(g_socket_out, 1);
socket_mcloop4(g_socket_out, 1);
}
size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer ) {
int exactmatch;
ot_torrent *torrent;
ot_peer *peer_dest;
ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash );
torrent = vector_find_or_insert( torrents_list, (void*)hash, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch );
if( !torrent )
return -1;
if( !exactmatch ) {
/* Create a new torrent entry, then */
memcpy( torrent->hash, hash, sizeof(ot_hash) );
if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) {
vector_remove_torrent( torrents_list, torrent );
mutex_bucket_unlock_by_hash( hash, 0 );
return -1;
}
byte_zero( torrent->peer_list, sizeof( ot_peerlist ) );
}
/* Check for peer in torrent */
peer_dest = vector_find_or_insert_peer( &(torrent->peer_list->peers), peer, &exactmatch );
if( !peer_dest ) {
mutex_bucket_unlock_by_hash( hash, 0 );
return -1;
}
/* Tell peer that it's fresh */
OT_PEERTIME( peer ) = 0;
/* If we hadn't had a match create peer there */
if( !exactmatch ) {
torrent->peer_list->peer_count++;
if( OT_PEERFLAG(peer) & PEER_FLAG_SEEDING )
torrent->peer_list->seed_count++;
}
memcpy( peer_dest, peer, sizeof(ot_peer) );
mutex_bucket_unlock_by_hash( hash, 0 );
return 0;
}
size_t remove_peer_from_torrent_proxy( ot_hash hash, ot_peer *peer ) {
int exactmatch;
ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash );
ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch );
if( exactmatch ) {
ot_peerlist *peer_list = torrent->peer_list;
switch( vector_remove_peer( &peer_list->peers, peer ) ) {
case 2: peer_list->seed_count--; /* Intentional fallthrough */
case 1: peer_list->peer_count--; /* Intentional fallthrough */
default: break;
}
}
mutex_bucket_unlock_by_hash( hash, 0 );
return 0;
}
void free_peerlist( ot_peerlist *peer_list ) {
if( peer_list->peers.data ) {
if( OT_PEERLIST_HASBUCKETS( peer_list ) ) {
ot_vector *bucket_list = (ot_vector*)(peer_list->peers.data);
while( peer_list->peers.size-- )
free( bucket_list++->data );
}
free( peer_list->peers.data );
}
free( peer_list );
}
static void livesync_handle_peersync( ssize_t datalen ) {
int off = sizeof( g_tracker_id ) + sizeof( uint32_t );
fprintf( stderr, "." );
while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) {
ot_peer *peer = (ot_peer*)(g_inbuffer + off + sizeof(ot_hash));
ot_hash *hash = (ot_hash*)(g_inbuffer + off);
if( OT_PEERFLAG(peer) & PEER_FLAG_STOPPED )
remove_peer_from_torrent_proxy( *hash, peer );
else
add_peer_to_torrent_proxy( *hash, peer );
off += sizeof( ot_hash ) + sizeof( ot_peer );
}
}
int usage( char *self ) {
fprintf( stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self );
return 0;
}
enum {
FLAG_OUTGOING = 0x80,
FLAG_DISCONNECTED = 0x00,
FLAG_CONNECTING = 0x01,
FLAG_WAITTRACKERID = 0x02,
FLAG_CONNECTED = 0x03,
FLAG_MASK = 0x07
};
#define PROXYPEER_NEEDSCONNECT(flag) ((flag)==FLAG_OUTGOING)
#define PROXYPEER_ISCONNECTED(flag) (((flag)&FLAG_MASK)==FLAG_CONNECTED)
#define PROXYPEER_SETDISCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_DISCONNECTED)
#define PROXYPEER_SETCONNECTING(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTING)
#define PROXYPEER_SETWAITTRACKERID(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_WAITTRACKERID)
#define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED)
typedef struct {
int state; /* Whether we want to connect, how far our handshake is, etc. */
ot_ip6 ip; /* The peer to connect to */
uint16_t port; /* The peers port */
uint8_t indata[8192*16]; /* Any data not processed yet */
size_t indata_length; /* Length of unprocessed data */
uint32_t tracker_id; /* How the other end greeted */
int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */
io_batch outdata; /* The iobatch containing our sync data */
size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */
uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */
uint8_t packet_type; /* Type of current packet */
uint32_t packet_tid; /* Tracker id for current packet */
} proxy_peer;
static void process_indata( proxy_peer * peer );
void reset_info_block( proxy_peer * peer ) {
peer->indata_length = 0;
peer->tracker_id = 0;
peer->fd = -1;
peer->packet_tcount = 0;
iob_reset( &peer->outdata );
PROXYPEER_SETDISCONNECTED( peer->state );
}
/* Number of connections to peers
* If a peer's IP is set, we try to reconnect, when the connection drops
* If we already have a connected tracker_id in our records for an _incoming_ connection, drop it
* Multiple connections to/from the same ip are okay, if tracker_id doesn't match
* Reconnect attempts occur only twice a minute
*/
static int g_connection_count;
static ot_time g_connection_reconn;
static proxy_peer g_connections[MAX_PEERS];
static void handle_reconnects( void ) {
int i;
for( i=0; i<g_connection_count; ++i )
if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) {
int64 newfd = socket_tcp6( );
fprintf( stderr, "(Re)connecting to peer..." );
if( newfd < 0 ) continue; /* No socket for you */
io_fd(newfd);
if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) {
io_close( newfd );
continue;
}
if( socket_connect6(newfd,g_connections[i].ip,g_connections[i].port,0) == -1 &&
errno != EINPROGRESS && errno != EWOULDBLOCK ) {
close(newfd);
continue;
}
io_wantwrite(newfd); /* So we will be informed when it is connected */
io_setcookie(newfd,g_connections+i);
/* Prepare connection info block */
reset_info_block( g_connections+i );
g_connections[i].fd = newfd;
PROXYPEER_SETCONNECTING( g_connections[i].state );
}
g_connection_reconn = time(NULL) + 30;
}
/* Handle incoming connection requests, check against whitelist */
static void handle_accept( int64 serversocket ) {
int64 newfd;
ot_ip6 ip;
uint16 port;
while( ( newfd = socket_accept6( serversocket, ip, &port, NULL ) ) != -1 ) {
/* XXX some access control */
/* Put fd into a non-blocking mode */
io_nonblock( newfd );
if( !io_fd( newfd ) )
io_close( newfd );
else {
/* Find a new home for our incoming connection */
int i;
for( i=0; i<MAX_PEERS; ++i )
if( g_connections[i].state == FLAG_DISCONNECTED )
break;
if( i == MAX_PEERS ) {
fprintf( stderr, "No room for incoming connection." );
close( newfd );
continue;
}
/* Prepare connection info block */
reset_info_block( g_connections+i );
PROXYPEER_SETCONNECTING( g_connections[i].state );
g_connections[i].port = port;
g_connections[i].fd = newfd;
io_setcookie( newfd, g_connections + i );
/* We expect the connecting side to begin with its tracker_id */
io_wantread( newfd );
}
}
return;
}
/* New sync data on the stream */
static void handle_read( int64 peersocket ) {
int i;
int64 datalen;
uint32_t tracker_id;
proxy_peer *peer = io_getcookie( peersocket );
if( !peer ) {
/* Can't happen ;) */
io_close( peersocket );
return;
}
switch( peer->state & FLAG_MASK ) {
case FLAG_DISCONNECTED:
io_close( peersocket );
break; /* Shouldnt happen */
case FLAG_CONNECTING:
case FLAG_WAITTRACKERID:
/* We want at least the first four bytes to come at once, to avoid keeping extra states (for now)
This also catches 0 bytes reads == EOF and negative values, denoting connection errors */
if( io_tryread( peersocket, (void*)&tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) )
goto close_socket;
/* See, if we already have a connection to that peer */
for( i=0; i<MAX_PEERS; ++i )
if( ( g_connections[i].state & FLAG_MASK ) == FLAG_CONNECTED &&
g_connections[i].tracker_id == tracker_id ) {
fprintf( stderr, "Peer already connected. Closing connection.\n" );
goto close_socket;
}
/* Also no need for soliloquy */
if( tracker_id == g_tracker_id )
goto close_socket;
/* The new connection is good, send our tracker_id on incoming connections */
if( peer->state == FLAG_CONNECTING )
if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) != sizeof( g_tracker_id ) )
goto close_socket;
peer->tracker_id = tracker_id;
PROXYPEER_SETCONNECTED( peer->state );
if( peer->state & FLAG_OUTGOING )
fprintf( stderr, "succeeded.\n" );
else
fprintf( stderr, "Incoming connection successful.\n" );
break;
close_socket:
fprintf( stderr, "Handshake incomplete, closing socket\n" );
io_close( peersocket );
reset_info_block( peer );
break;
case FLAG_CONNECTED:
/* Here we acutally expect data from peer
indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */
datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length );
if( !datalen || datalen < -1 ) {
fprintf( stderr, "Connection closed by remote peer.\n" );
io_close( peersocket );
reset_info_block( peer );
} else if( datalen > 0 ) {
peer->indata_length += datalen;
process_indata( peer );
}
break;
}
}
/* Can write new sync data to the stream */
static void handle_write( int64 peersocket ) {
proxy_peer *peer = io_getcookie( peersocket );
if( !peer ) {
/* Can't happen ;) */
io_close( peersocket );
return;
}
switch( peer->state & FLAG_MASK ) {
case FLAG_DISCONNECTED:
default: /* Should not happen */
io_close( peersocket );
break;
case FLAG_CONNECTING:
/* Ensure that the connection is established and handle connection error */
if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) {
fprintf( stderr, "failed\n" );
reset_info_block( peer );
io_close( peersocket );
break;
}
if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) == sizeof( g_tracker_id ) ) {
PROXYPEER_SETWAITTRACKERID( peer->state );
io_dontwantwrite( peersocket );
io_wantread( peersocket );
} else {
fprintf( stderr, "Handshake incomplete, closing socket\n" );
io_close( peersocket );
reset_info_block( peer );
}
break;
case FLAG_CONNECTED:
switch( iob_send( peersocket, &peer->outdata ) ) {
case 0: /* all data sent */
io_dontwantwrite( peersocket );
break;
case -3: /* an error occured */
io_close( peersocket );
reset_info_block( peer );
break;
default: /* Normal operation or eagain */
break;
}
break;
}
return;
}
static void server_mainloop() {
int64 sock;
/* inlined livesync_init() */
memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) );
g_peerbuffer_pos = g_peerbuffer_start;
memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) );
uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER);
g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t);
g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
while(1) {
/* See, if we need to connect to anyone */
if( time(NULL) > g_connection_reconn )
handle_reconnects( );
/* Wait for io events until next approx reconn check time */
io_waituntil2( 30*1000 );
/* Loop over readable sockets */
while( ( sock = io_canread( ) ) != -1 ) {
const void *cookie = io_getcookie( sock );
if( (uintptr_t)cookie == FLAG_SERVERSOCKET )
handle_accept( sock );
else
handle_read( sock );
}
/* Loop over writable sockets */
while( ( sock = io_canwrite( ) ) != -1 )
handle_write( sock );
livesync_ticker( );
}
}
static void panic( const char *routine ) {
fprintf( stderr, "%s: %s\n", routine, strerror(errno) );
exit( 111 );
}
static int64_t ot_try_bind( ot_ip6 ip, uint16_t port ) {
int64 sock = socket_tcp6( );
if( socket_bind6_reuse( sock, ip, port, 0 ) == -1 )
panic( "socket_bind6_reuse" );
if( socket_listen( sock, SOMAXCONN) == -1 )
panic( "socket_listen" );
if( !io_fd( sock ) )
panic( "io_fd" );
io_setcookie( sock, (void*)FLAG_SERVERSOCKET );
io_wantread( sock );
return sock;
}
static int scan_ip6_port( const char *src, ot_ip6 ip, uint16 *port ) {
const char *s = src;
int off, bracket = 0;
while( isspace(*s) ) ++s;
if( *s == '[' ) ++s, ++bracket; /* for v6 style notation */
if( !(off = scan_ip6( s, ip ) ) )
return 0;
s += off;
if( *s == 0 || isspace(*s)) return s-src;
if( *s == ']' && bracket ) ++s;
if( !ip6_isv4mapped(ip)){
if( ( bracket && *(s) != ':' ) || ( *(s) != '.' ) ) return 0;
s++;
} else {
if( *(s++) != ':' ) return 0;
}
if( !(off = scan_ushort (s, port ) ) )
return 0;
return off+s-src;
}
int main( int argc, char **argv ) {
static pthread_t sync_in_thread_id;
static pthread_t sync_out_thread_id;
ot_ip6 serverip;
uint16_t tmpport;
int scanon = 1, lbound = 0, sbound = 0;
srandom( time(NULL) );
#ifdef WANT_ARC4RANDOM
g_tracker_id = arc4random();
#else
g_tracker_id = random();
#endif
noipv6=1;
while( scanon ) {
switch( getopt( argc, argv, ":l:c:L:h" ) ) {
case -1: scanon = 0; break;
case 'l':
tmpport = 0;
if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); }
ot_try_bind( serverip, tmpport );
++sbound;
break;
case 'c':
if( g_connection_count > MAX_PEERS / 2 ) exerr( "Connection limit exceeded.\n" );
tmpport = 0;
if( !scan_ip6_port( optarg,
g_connections[g_connection_count].ip,
&g_connections[g_connection_count].port ) ||
!g_connections[g_connection_count].port ) { usage( argv[0] ); exit( 1 ); }
g_connections[g_connection_count++].state = FLAG_OUTGOING;
break;
case 'L':
tmpport = 9696;
if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); }
livesync_bind_mcast( serverip, tmpport); ++lbound; break;
default:
case '?': usage( argv[0] ); exit( 1 );
}
}
if( !lbound ) exerr( "No livesync port bound." );
if( !g_connection_count && !sbound ) exerr( "No streamsync port bound." );
pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL );
pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL );
server_mainloop();
return 0;
}
static void * streamsync_worker( void * args ) {
(void)args;
while( 1 ) {
int bucket;
/* For each bucket... */
for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
/* Get exclusive access to that bucket */
ot_vector *torrents_list = mutex_bucket_lock( bucket );
size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0;
size_t mem, mem_a = 0, mem_b = 0;
uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c;
if( !torrents_list->size ) goto unlock_continue;
/* For each torrent in this bucket.. */
for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
/* Address torrents members */
ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list;
switch( peer_list->peer_count ) {
case 2: count_two++; break;
case 1: count_one++; break;
case 0: break;
default: count_def++;
count_peers += peer_list->peer_count;
}
}
/* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */
mem = 3 * ( 1 + 1 + 2 ) + ( count_one + count_two ) * ( 19 + 1 ) + count_def * ( 19 + 8 ) +
( count_one + 2 * count_two + count_peers ) * 7;
fprintf( stderr, "Mem: %zd\n", mem );
ptr = ptr_a = ptr_b = ptr_c = malloc( mem );
if( !ptr ) goto unlock_continue;
if( count_one > 4 || !count_def ) {
mem_a = 1 + 1 + 2 + count_one * ( 19 + 7 );
ptr_b += mem_a; ptr_c += mem_a;
ptr_a[0] = 1; /* Offset 0: packet type 1 */
ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
ptr_a[2] = count_one >> 8;
ptr_a[3] = count_one & 255;
ptr_a += 4;
} else
count_def += count_one;
if( count_two > 4 || !count_def ) {
mem_b = 1 + 1 + 2 + count_two * ( 19 + 14 );
ptr_c += mem_b;
ptr_b[0] = 2; /* Offset 0: packet type 2 */
ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
ptr_b[2] = count_two >> 8;
ptr_b[3] = count_two & 255;
ptr_b += 4;
} else
count_def += count_two;
if( count_def ) {
ptr_c[0] = 0; /* Offset 0: packet type 0 */
ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
ptr_c[2] = count_def >> 8;
ptr_c[3] = count_def & 255;
ptr_c += 4;
}
/* For each torrent in this bucket.. */
for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) {
/* Address torrents members */
ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + tor_offset;
ot_peerlist *peer_list = torrent->peer_list;
ot_peer *peers = (ot_peer*)(peer_list->peers.data);
uint8_t **dst;
/* Determine destination slot */
count_peers = peer_list->peer_count;
switch( count_peers ) {
case 0: continue;
case 1: dst = mem_a ? &ptr_a : &ptr_c; break;
case 2: dst = mem_b ? &ptr_b : &ptr_c; break;
default: dst = &ptr_c; break;
}
/* Copy tail of info_hash, advance pointer */
memcpy( *dst, ((uint8_t*)torrent->hash) + 1, sizeof( ot_hash ) - 1);
*dst += sizeof( ot_hash ) - 1;
/* Encode peer count */
if( dst == &ptr_c )
while( count_peers ) {
if( count_peers <= 0x7f )
*(*dst)++ = count_peers;
else
*(*dst)++ = 0x80 | ( count_peers & 0x7f );
count_peers >>= 7;
}
/* Copy peers */
count_peers = peer_list->peer_count;
while( count_peers-- ) {
memcpy( *dst, peers++, OT_IP_SIZE + 3 );
*dst += OT_IP_SIZE + 3;
}
free_peerlist(peer_list);
}
free( torrents_list->data );
memset( torrents_list, 0, sizeof(*torrents_list ) );
unlock_continue:
mutex_bucket_unlock( bucket, 0 );
if( ptr ) {
int i;
if( ptr_b > ptr_c ) ptr_c = ptr_b;
if( ptr_a > ptr_c ) ptr_c = ptr_a;
mem = ptr_c - ptr;
for( i=0; i < MAX_PEERS; ++i ) {
if( PROXYPEER_ISCONNECTED(g_connections[i].state) ) {
void *tmp = malloc( mem );
if( tmp ) {
memcpy( tmp, ptr, mem );
iob_addbuf_free( &g_connections[i].outdata, tmp, mem );
io_wantwrite( g_connections[i].fd );
}
}
}
free( ptr );
}
usleep( OT_SYNC_SLEEP );
}
}
return 0;
}
static void livesync_issue_peersync( ) {
socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start,
groupip_1, LIVESYNC_PORT);
g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t );
g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
}
void livesync_ticker( ) {
/* livesync_issue_peersync sets g_next_packet_time */
if( time(NULL) > g_next_packet_time &&
g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) )
livesync_issue_peersync();
}
static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ) {
// unsigned int i;
*g_peerbuffer_pos = prefix;
memcpy( g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1 );
memcpy( g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1 );
#if 0
/* Dump info_hash */
for( i=0; i<sizeof(ot_hash); ++i )
printf( "%02X", g_peerbuffer_pos[i] );
putchar( ':' );
#endif
g_peerbuffer_pos += sizeof(ot_hash);
#if 0
printf( "%hhu.%hhu.%hhu.%hhu:%hu (%02X %02X)\n", g_peerbuffer_pos[0], g_peerbuffer_pos[1], g_peerbuffer_pos[2], g_peerbuffer_pos[3],
g_peerbuffer_pos[4] | ( g_peerbuffer_pos[5] << 8 ), g_peerbuffer_pos[6], g_peerbuffer_pos[7] );
#endif
g_peerbuffer_pos += sizeof(ot_peer);
if( g_peerbuffer_pos >= g_peerbuffer_highwater )
livesync_issue_peersync();
}
static void process_indata( proxy_peer * peer ) {
size_t consumed, peers;
uint8_t *data = peer->indata, *hash;
uint8_t *dataend = data + peer->indata_length;
while( 1 ) {
/* If we're not inside of a packet, make a new one */
if( !peer->packet_tcount ) {
/* Ensure the header is complete or postpone processing */
if( data + 4 > dataend ) break;
peer->packet_type = data[0];
peer->packet_tprefix = data[1];
peer->packet_tcount = data[2] * 256 + data[3];
data += 4;
printf( "type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount );
}
/* Ensure size for a minimal torrent block */
if( data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend ) break;
/* Advance pointer to peer count or peers */
hash = data;
data += sizeof(ot_hash) - 1;
/* Type 0 has peer count encoded before each peers */
peers = peer->packet_type;
if( !peers ) {
int shift = 0;
do peers |= ( 0x7f & *data ) << ( 7 * shift );
while ( *(data++) & 0x80 && shift++ < 6 );
}
#if 0
printf( "peers: %zd\n", peers );
#endif
/* Ensure enough data being read to hold all peers */
if( data + (OT_IP_SIZE + 3) * peers > dataend ) {
data = hash;
break;
}
while( peers-- ) {
livesync_proxytell( peer->packet_tprefix, hash, data );
data += OT_IP_SIZE + 3;
}
--peer->packet_tcount;
}
consumed = data - peer->indata;
memmove( peer->indata, data, peer->indata_length - consumed );
peer->indata_length -= consumed;
}
static void * livesync_worker( void * args ) {
(void)args;
while( 1 ) {
ot_ip6 in_ip; uint16_t in_port;
size_t datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port);
/* Expect at least tracker id and packet type */
if( datalen <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) )
continue;
if( !memcmp( g_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) {
/* drop packet coming from ourselves */
continue;
}
switch( uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ) {
case OT_SYNC_PEER:
livesync_handle_peersync( datalen );
break;
default:
// fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) );
break;
}
}
return 0;
}