mirror of git://erdgeist.org/opentracker
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
239 lines
7.1 KiB
239 lines
7.1 KiB
/* This software was written by Dirk Engling <erdgeist@erdgeist.org> |
|
It is considered beerware. Prost. Skol. Cheers or whatever. |
|
|
|
$id$ */ |
|
|
|
/* System */ |
|
#include <pthread.h> |
|
#include <stdlib.h> |
|
#include <string.h> |
|
#include <sys/types.h> |
|
#include <sys/uio.h> |
|
#include <unistd.h> |
|
|
|
/* Libowfat */ |
|
#include "byte.h" |
|
#include "ip6.h" |
|
#include "ndelay.h" |
|
#include "socket.h" |
|
|
|
/* Opentracker */ |
|
#include "ot_accesslist.h" |
|
#include "ot_livesync.h" |
|
#include "ot_mutex.h" |
|
#include "ot_stats.h" |
|
#include "trackerlogic.h" |
|
|
|
#ifdef WANT_SYNC_LIVE |
|
|
|
/* IPv4 multicast group used for live sync traffic: 224.0.23.5.
   The cast spells out that the value 224 is stored in a plain char. */
char groupip_1[4] = {(char)224, 0, 23, 5};

/* Size in bytes of the receive buffer the worker thread allocates */
#define LIVESYNC_INCOMING_BUFFSIZE (64 * 1024)

/* Capacity in bytes of one outgoing sync packet ... */
#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480
/* ... and the room for one more hash + peer record; once less than this
   is left in the packet, livesync_tell() sends it */
#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_hash) + sizeof(ot_peer))

/* Seconds added to the current time to form a buffer's next_packet_time */
#define LIVESYNC_MAXDELAY 15

/* Packet type, stored big endian right behind the tracker id */
enum { OT_SYNC_PEER4 = 0, OT_SYNC_PEER6 = 1 };
|
|
|
/* Forward declaration */ |
|
static void *livesync_worker(void *args); |
|
|
|
/* For outgoing packets */ |
|
static int64 g_socket_in = -1; |
|
|
|
/* For incoming packets */ |
|
static int64 g_socket_out = -1; |
|
|
|
static pthread_mutex_t g_outbuf_mutex = PTHREAD_MUTEX_INITIALIZER; |
|
typedef struct { |
|
uint8_t data[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; |
|
size_t fill; |
|
ot_time next_packet_time; |
|
} sync_buffer; |
|
|
|
static sync_buffer g_v6_buf; |
|
static sync_buffer g_v4_buf; |
|
|
|
static pthread_t thread_id; |
|
void livesync_init() { |
|
|
|
if (g_socket_in == -1) |
|
exerr("No socket address for live sync specified."); |
|
|
|
/* Prepare outgoing peers buffer */ |
|
memcpy(g_v6_buf.data, &g_tracker_id, sizeof(g_tracker_id)); |
|
memcpy(g_v4_buf.data, &g_tracker_id, sizeof(g_tracker_id)); |
|
|
|
uint32_pack_big((char *)g_v6_buf.data + sizeof(g_tracker_id), OT_SYNC_PEER6); |
|
uint32_pack_big((char *)g_v4_buf.data + sizeof(g_tracker_id), OT_SYNC_PEER4); |
|
|
|
g_v6_buf.fill = sizeof(g_tracker_id) + sizeof(uint32_t); |
|
g_v4_buf.fill = sizeof(g_tracker_id) + sizeof(uint32_t); |
|
|
|
g_v6_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; |
|
g_v4_buf.next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; |
|
|
|
pthread_create(&thread_id, NULL, livesync_worker, NULL); |
|
} |
|
|
|
void livesync_deinit() { |
|
if (g_socket_in != -1) |
|
close(g_socket_in); |
|
if (g_socket_out != -1) |
|
close(g_socket_out); |
|
|
|
pthread_cancel(thread_id); |
|
} |
|
|
|
/* Create and configure both live sync sockets.
   ip   - v4-mapped address of the local interface; its last four bytes are
          used for the multicast join and as bind address of the outgoing socket
   port - UDP port both sockets are bound to
   Every failure ends in exerr(). */
void livesync_bind_mcast(ot_ip6 ip, uint16_t port) {
  char  any_addr[4] = {0, 0, 0, 0};
  char *iface_v4;

  if (!ip6_isv4mapped(ip)) {
    exerr("v6 mcast support not yet available.");
  }
  /* The IPv4 address sits in the last four bytes of a v4-mapped address */
  iface_v4 = (char *)(ip + 12);

  if (g_socket_in != -1) {
    exerr("Error: Livesync listen ip specified twice.");
  }

  /* Incoming socket: bound to 0.0.0.0, member of the sync multicast group */
  g_socket_in = socket_udp4();
  if (g_socket_in < 0) {
    exerr("Error: Cant create live sync incoming socket.");
  }
  ndelay_off(g_socket_in);

  if (socket_bind4_reuse(g_socket_in, any_addr, port) == -1) {
    exerr("Error: Cant bind live sync incoming socket.");
  }

  if (socket_mcjoin4(g_socket_in, groupip_1, iface_v4)) {
    exerr("Error: Cant make live sync incoming socket join mcast group.");
  }

  /* Outgoing socket: bound to the interface address itself */
  g_socket_out = socket_udp4();
  if (g_socket_out < 0) {
    exerr("Error: Cant create live sync outgoing socket.");
  }
  if (socket_bind4_reuse(g_socket_out, iface_v4, port) == -1) {
    exerr("Error: Cant bind live sync outgoing socket.");
  }

  /* Multicast TTL of 1, multicast loopback off */
  socket_mcttl4(g_socket_out, 1);
  socket_mcloop4(g_socket_out, 0);
}
|
|
|
/* Caller MUST hold g_outbuf_mutex. Returns with g_outbuf_mutex unlocked */ |
|
static void livesync_issue_peersync(sync_buffer *buf) { |
|
char mycopy[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; |
|
size_t fill = buf->fill; |
|
|
|
memcpy(mycopy, buf->data, fill); |
|
buf->fill = sizeof(g_tracker_id) + sizeof(uint32_t); |
|
buf->next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; |
|
|
|
/* From now this thread has a local copy of the buffer and |
|
has modified the protected element */ |
|
pthread_mutex_unlock(&g_outbuf_mutex); |
|
|
|
socket_send4(g_socket_out, mycopy, fill, groupip_1, LIVESYNC_PORT); |
|
} |
|
|
|
static void livesync_handle_peersync(struct ot_workstruct *ws, size_t peer_size) { |
|
size_t off = sizeof(g_tracker_id) + sizeof(uint32_t); |
|
|
|
/* Now basic sanity checks have been done on the live sync packet |
|
We might add more testing and logging. */ |
|
while ((ssize_t)(off + sizeof(ot_hash) + peer_size) <= ws->request_size) { |
|
memcpy(&ws->peer, ws->request + off + sizeof(ot_hash), peer_size); |
|
ws->hash = (ot_hash *)(ws->request + off); |
|
|
|
if (!g_opentracker_running) |
|
return; |
|
|
|
if (OT_PEERFLAG(ws->peer) & PEER_FLAG_STOPPED) |
|
remove_peer_from_torrent(FLAG_MCA, ws); |
|
else |
|
add_peer_to_torrent_and_return_peers(FLAG_MCA, ws, /* amount = */ 0); |
|
|
|
off += sizeof(ot_hash) + peer_size; |
|
} |
|
|
|
stats_issue_event(EVENT_SYNC, 0, (ws->request_size - sizeof(g_tracker_id) - sizeof(uint32_t)) / ((ssize_t)sizeof(ot_hash) + peer_size)); |
|
} |
|
|
|
/* Tickle the live sync module from time to time, so no events get |
|
stuck when there's not enough traffic to fill udp packets fast |
|
enough */ |
|
void livesync_ticker() { |
|
/* livesync_issue_peersync sets g_next_packet_time */ |
|
pthread_mutex_lock(&g_outbuf_mutex); |
|
if (g_now_seconds > g_v6_buf.next_packet_time && g_v6_buf.fill > sizeof(g_tracker_id) + sizeof(uint32_t)) |
|
livesync_issue_peersync(&g_v6_buf); |
|
else |
|
pthread_mutex_unlock(&g_outbuf_mutex); |
|
|
|
pthread_mutex_lock(&g_outbuf_mutex); |
|
if (g_now_seconds > g_v4_buf.next_packet_time && g_v4_buf.fill > sizeof(g_tracker_id) + sizeof(uint32_t)) |
|
livesync_issue_peersync(&g_v4_buf); |
|
else |
|
pthread_mutex_unlock(&g_outbuf_mutex); |
|
} |
|
|
|
/* Inform live sync about whats going on. */ |
|
void livesync_tell(struct ot_workstruct *ws) { |
|
size_t peer_size; /* initialized in next line */ |
|
ot_peer *peer_src = peer_from_peer6(&ws->peer, &peer_size); |
|
sync_buffer *dest_buf = peer_size == OT_PEER_SIZE6 ? &g_v6_buf : &g_v4_buf; |
|
|
|
pthread_mutex_lock(&g_outbuf_mutex); |
|
|
|
memcpy(dest_buf->data + dest_buf->fill, ws->hash, sizeof(ot_hash)); |
|
dest_buf->fill += sizeof(ot_hash); |
|
|
|
memcpy(dest_buf->data + dest_buf->fill, peer_src, peer_size); |
|
dest_buf->fill += peer_size; |
|
|
|
if (dest_buf->fill >= LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS) |
|
livesync_issue_peersync(dest_buf); |
|
else |
|
pthread_mutex_unlock(&g_outbuf_mutex); |
|
} |
|
|
|
static void *livesync_worker(void *args) { |
|
struct ot_workstruct ws; |
|
ot_ip6 in_ip; |
|
uint16_t in_port; |
|
|
|
(void)args; |
|
|
|
/* Initialize our "thread local storage" */ |
|
ws.inbuf = ws.request = malloc(LIVESYNC_INCOMING_BUFFSIZE); |
|
ws.outbuf = ws.reply = 0; |
|
|
|
memcpy(in_ip, V4mappedprefix, sizeof(V4mappedprefix)); |
|
|
|
while (1) { |
|
ws.request_size = socket_recv4(g_socket_in, (char *)ws.inbuf, LIVESYNC_INCOMING_BUFFSIZE, 12 + (char *)in_ip, &in_port); |
|
|
|
/* Expect at least tracker id and packet type */ |
|
if (ws.request_size <= (ssize_t)(sizeof(g_tracker_id) + sizeof(uint32_t))) |
|
continue; |
|
if (!accesslist_is_blessed(in_ip, OT_PERMISSION_MAY_LIVESYNC)) |
|
continue; |
|
if (!memcmp(ws.inbuf, &g_tracker_id, sizeof(g_tracker_id))) { |
|
/* TODO: log packet coming from ourselves */ |
|
continue; |
|
} |
|
|
|
switch (uint32_read_big(sizeof(g_tracker_id) + (char *)ws.inbuf)) { |
|
case OT_SYNC_PEER6: |
|
livesync_handle_peersync(&ws, OT_PEER_SIZE6); |
|
break; |
|
case OT_SYNC_PEER4: |
|
livesync_handle_peersync(&ws, OT_PEER_SIZE4); |
|
break; |
|
default: |
|
break; |
|
} |
|
} |
|
|
|
/* Never returns. */ |
|
return NULL; |
|
} |
|
|
|
#endif
|
|
|