mirror of
git://erdgeist.org/opentracker
synced 2025-01-27 15:16:31 +00:00
Enable live syncing v6 peers
This commit is contained in:
parent
308e91a2fa
commit
a09609d94e
@ -35,7 +35,7 @@ char groupip_1[4] = { 224,0,23,5 };
|
|||||||
|
|
||||||
#define LIVESYNC_MAXDELAY 15 /* seconds */
|
#define LIVESYNC_MAXDELAY 15 /* seconds */
|
||||||
|
|
||||||
enum { OT_SYNC_PEER };
|
enum { OT_SYNC_PEER4, OT_SYNC_PEER6 };
|
||||||
|
|
||||||
/* Forward declaration */
|
/* Forward declaration */
|
||||||
static void * livesync_worker( void * args );
|
static void * livesync_worker( void * args );
|
||||||
@ -47,9 +47,14 @@ static int64 g_socket_in = -1;
|
|||||||
static int64 g_socket_out = -1;
|
static int64 g_socket_out = -1;
|
||||||
|
|
||||||
static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER;
|
static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||||
char g_outbuf[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
|
typedef struct {
|
||||||
static size_t g_outbuf_data;
|
uint8_t data[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
|
||||||
static ot_time g_next_packet_time;
|
size_t fill;
|
||||||
|
ot_time next_packet_time;
|
||||||
|
} sync_buffer;
|
||||||
|
|
||||||
|
static sync_buffer g_v6_buf;
|
||||||
|
static sync_buffer g_v4_buf;
|
||||||
|
|
||||||
static pthread_t thread_id;
|
static pthread_t thread_id;
|
||||||
void livesync_init( ) {
|
void livesync_init( ) {
|
||||||
@ -58,11 +63,17 @@ void livesync_init( ) {
|
|||||||
exerr( "No socket address for live sync specified." );
|
exerr( "No socket address for live sync specified." );
|
||||||
|
|
||||||
/* Prepare outgoing peers buffer */
|
/* Prepare outgoing peers buffer */
|
||||||
memcpy( g_outbuf, &g_tracker_id, sizeof( g_tracker_id ) );
|
memcpy( g_v6_buf.data, &g_tracker_id, sizeof( g_tracker_id ) );
|
||||||
uint32_pack_big( g_outbuf + sizeof( g_tracker_id ), OT_SYNC_PEER);
|
memcpy( g_v4_buf.data, &g_tracker_id, sizeof( g_tracker_id ) );
|
||||||
g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t );
|
|
||||||
|
|
||||||
g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
|
uint32_pack_big( (char*)g_v6_buf.data + sizeof( g_tracker_id ), OT_SYNC_PEER6);
|
||||||
|
uint32_pack_big( (char*)g_v4_buf.data + sizeof( g_tracker_id ), OT_SYNC_PEER4);
|
||||||
|
|
||||||
|
g_v6_buf.fill = sizeof( g_tracker_id ) + sizeof( uint32_t );
|
||||||
|
g_v4_buf.fill = sizeof( g_tracker_id ) + sizeof( uint32_t );
|
||||||
|
|
||||||
|
g_v6_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
|
||||||
|
g_v4_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
|
||||||
|
|
||||||
pthread_create( &thread_id, NULL, livesync_worker, NULL );
|
pthread_create( &thread_id, NULL, livesync_worker, NULL );
|
||||||
}
|
}
|
||||||
@ -107,28 +118,28 @@ void livesync_bind_mcast( ot_ip6 ip, uint16_t port) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */
|
/* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */
|
||||||
static void livesync_issue_peersync( ) {
|
static void livesync_issue_peersync( sync_buffer *buf ) {
|
||||||
char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
|
char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
|
||||||
size_t data = g_outbuf_data;
|
size_t fill = buf->fill;
|
||||||
|
|
||||||
memcpy( mycopy, g_outbuf, data );
|
memcpy( mycopy, buf->data, fill );
|
||||||
g_outbuf_data = sizeof( g_tracker_id ) + sizeof( uint32_t );
|
buf->fill = sizeof( g_tracker_id ) + sizeof( uint32_t );
|
||||||
g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
|
buf->next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY;
|
||||||
|
|
||||||
/* From now this thread has a local copy of the buffer and
|
/* From now this thread has a local copy of the buffer and
|
||||||
has modified the protected element */
|
has modified the protected element */
|
||||||
pthread_mutex_unlock(&g_outbuf_mutex);
|
pthread_mutex_unlock(&g_outbuf_mutex);
|
||||||
|
|
||||||
socket_send4(g_socket_out, mycopy, data, groupip_1, LIVESYNC_PORT);
|
socket_send4(g_socket_out, mycopy, fill, groupip_1, LIVESYNC_PORT);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void livesync_handle_peersync( struct ot_workstruct *ws ) {
|
static void livesync_handle_peersync( struct ot_workstruct *ws, size_t peer_size ) {
|
||||||
int off = sizeof( g_tracker_id ) + sizeof( uint32_t );
|
size_t off = sizeof( g_tracker_id ) + sizeof( uint32_t );
|
||||||
|
|
||||||
/* Now basic sanity checks have been done on the live sync packet
|
/* Now basic sanity checks have been done on the live sync packet
|
||||||
We might add more testing and logging. */
|
We might add more testing and logging. */
|
||||||
while( off + (ssize_t)sizeof( ot_hash ) + OT_PEER_SIZE4 <= ws->request_size ) {
|
while( (ssize_t)(off + sizeof( ot_hash ) + peer_size) <= ws->request_size ) {
|
||||||
memcpy( &ws->peer, ws->request + off + sizeof(ot_hash), OT_PEER_SIZE4 );
|
memcpy( &ws->peer, ws->request + off + sizeof(ot_hash), peer_size );
|
||||||
ws->hash = (ot_hash*)(ws->request + off);
|
ws->hash = (ot_hash*)(ws->request + off);
|
||||||
|
|
||||||
if( !g_opentracker_running ) return;
|
if( !g_opentracker_running ) return;
|
||||||
@ -138,12 +149,12 @@ static void livesync_handle_peersync( struct ot_workstruct *ws ) {
|
|||||||
else
|
else
|
||||||
add_peer_to_torrent_and_return_peers( FLAG_MCA, ws, /* amount = */ 0 );
|
add_peer_to_torrent_and_return_peers( FLAG_MCA, ws, /* amount = */ 0 );
|
||||||
|
|
||||||
off += sizeof( ot_hash ) + sizeof( ot_peer );
|
off += sizeof( ot_hash ) + peer_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
stats_issue_event(EVENT_SYNC, 0,
|
stats_issue_event(EVENT_SYNC, 0,
|
||||||
(ws->request_size - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) /
|
(ws->request_size - sizeof( g_tracker_id ) - sizeof( uint32_t ) ) /
|
||||||
((ssize_t)sizeof( ot_hash ) + OT_PEER_SIZE4));
|
((ssize_t)sizeof( ot_hash ) + peer_size));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Tickle the live sync module from time to time, so no events get
|
/* Tickle the live sync module from time to time, so no events get
|
||||||
@ -152,24 +163,36 @@ static void livesync_handle_peersync( struct ot_workstruct *ws ) {
|
|||||||
void livesync_ticker( ) {
|
void livesync_ticker( ) {
|
||||||
/* livesync_issue_peersync sets g_next_packet_time */
|
/* livesync_issue_peersync sets g_next_packet_time */
|
||||||
pthread_mutex_lock(&g_outbuf_mutex);
|
pthread_mutex_lock(&g_outbuf_mutex);
|
||||||
if( g_now_seconds > g_next_packet_time &&
|
if( g_now_seconds > g_v6_buf.next_packet_time &&
|
||||||
g_outbuf_data > sizeof( g_tracker_id ) + sizeof( uint32_t ) )
|
g_v6_buf.fill > sizeof( g_tracker_id ) + sizeof( uint32_t ) )
|
||||||
livesync_issue_peersync();
|
livesync_issue_peersync(&g_v6_buf);
|
||||||
|
else
|
||||||
|
pthread_mutex_unlock(&g_outbuf_mutex);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&g_outbuf_mutex);
|
||||||
|
if( g_now_seconds > g_v4_buf.next_packet_time &&
|
||||||
|
g_v4_buf.fill > sizeof( g_tracker_id ) + sizeof( uint32_t ) )
|
||||||
|
livesync_issue_peersync(&g_v4_buf);
|
||||||
else
|
else
|
||||||
pthread_mutex_unlock(&g_outbuf_mutex);
|
pthread_mutex_unlock(&g_outbuf_mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Inform live sync about whats going on. */
|
/* Inform live sync about whats going on. */
|
||||||
void livesync_tell( struct ot_workstruct *ws ) {
|
void livesync_tell( struct ot_workstruct *ws ) {
|
||||||
|
size_t peer_size; /* initialized in next line */
|
||||||
|
ot_peer *peer_src = peer_from_peer6(&ws->peer, &peer_size);
|
||||||
|
sync_buffer *dest_buf = peer_size == OT_PEER_SIZE6 ? &g_v6_buf : &g_v4_buf;
|
||||||
|
|
||||||
pthread_mutex_lock(&g_outbuf_mutex);
|
pthread_mutex_lock(&g_outbuf_mutex);
|
||||||
|
|
||||||
memcpy( g_outbuf + g_outbuf_data, ws->hash, sizeof(ot_hash) );
|
memcpy( dest_buf->data + dest_buf->fill, ws->hash, sizeof(ot_hash) );
|
||||||
memcpy( g_outbuf + g_outbuf_data + sizeof(ot_hash), &ws->peer, OT_PEER_SIZE4 );
|
dest_buf->fill += sizeof(ot_hash);
|
||||||
|
|
||||||
g_outbuf_data += sizeof(ot_hash) + OT_PEER_SIZE4;
|
memcpy( dest_buf->data + dest_buf->fill, peer_src, peer_size );
|
||||||
|
dest_buf->fill += peer_size;
|
||||||
|
|
||||||
if( g_outbuf_data >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS )
|
if( dest_buf->fill >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS )
|
||||||
livesync_issue_peersync();
|
livesync_issue_peersync(dest_buf);
|
||||||
else
|
else
|
||||||
pthread_mutex_unlock(&g_outbuf_mutex);
|
pthread_mutex_unlock(&g_outbuf_mutex);
|
||||||
}
|
}
|
||||||
@ -200,8 +223,11 @@ static void * livesync_worker( void * args ) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
switch( uint32_read_big( sizeof( g_tracker_id ) + (char *)ws.inbuf ) ) {
|
switch( uint32_read_big( sizeof( g_tracker_id ) + (char *)ws.inbuf ) ) {
|
||||||
case OT_SYNC_PEER:
|
case OT_SYNC_PEER6:
|
||||||
livesync_handle_peersync( &ws );
|
livesync_handle_peersync( &ws, OT_PEER_SIZE6 );
|
||||||
|
break;
|
||||||
|
case OT_SYNC_PEER4:
|
||||||
|
livesync_handle_peersync( &ws, OT_PEER_SIZE4 );
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
|
@ -28,13 +28,19 @@
|
|||||||
Each tracker instance accumulates announce requests until its buffer is
|
Each tracker instance accumulates announce requests until its buffer is
|
||||||
full or a timeout is reached. Then it broadcasts its live sync packer:
|
full or a timeout is reached. Then it broadcasts its live sync packer:
|
||||||
|
|
||||||
packet type SYNC_LIVE
|
packet type SYNC_LIVE4
|
||||||
[ 0x0008 0x14 info_hash
|
[ 0x0008 0x14 info_hash
|
||||||
0x001c 0x04 peer's ipv4 address
|
0x001c 0x04 peer's ipv4 address
|
||||||
0x0020 0x02 peer's port
|
0x0020 0x02 peer's port
|
||||||
0x0024 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 )
|
0x0024 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 )
|
||||||
]*
|
]*
|
||||||
|
|
||||||
|
packet type SYNC_LIVE6
|
||||||
|
[ 0x0008 0x14 info_hash
|
||||||
|
0x001c 0x10 peer's ipv6 address
|
||||||
|
0x002c 0x02 peer's port
|
||||||
|
0x002e 0x02 peer flags v1 ( SEEDING = 0x80, COMPLETE = 0x40, STOPPED = 0x20 )
|
||||||
|
]*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifdef WANT_SYNC_LIVE
|
#ifdef WANT_SYNC_LIVE
|
||||||
|
Loading…
x
Reference in New Issue
Block a user