Browse Source

clang-format

master
Dirk Engling 7 months ago
parent
commit
b1606fd37e
  1. 168
      proxy.c

168
proxy.c

@ -4,33 +4,33 @@ @@ -4,33 +4,33 @@
$Id$ */
/* System */
#include <arpa/inet.h>
#include <ctype.h>
#include <errno.h>
#include <pthread.h>
#include <pwd.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <pwd.h>
#include <ctype.h>
#include <pthread.h>
/* Libowfat */
#include "socket.h"
#include "byte.h"
#include "io.h"
#include "iob.h"
#include "byte.h"
#include "scan.h"
#include "ip6.h"
#include "ndelay.h"
#include "scan.h"
#include "socket.h"
/* Opentracker */
#include "trackerlogic.h"
#include "ot_vector.h"
#include "ot_mutex.h"
#include "ot_stats.h"
#include "ot_vector.h"
#include "trackerlogic.h"
#ifndef WANT_SYNC_LIVE
#define WANT_SYNC_LIVE
@ -136,8 +136,7 @@ size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer, size_t peer_size @@ -136,8 +136,7 @@ size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer, size_t peer_size
/* Create a new torrent entry, then */
memcpy(torrent->hash, hash, sizeof(ot_hash));
if( !( torrent->peer_list6 = malloc( sizeof (ot_peerlist) ) ) ||
!( torrent->peer_list4 = malloc( sizeof (ot_peerlist) ) ) ) {
if (!(torrent->peer_list6 = malloc(sizeof(ot_peerlist))) || !(torrent->peer_list4 = malloc(sizeof(ot_peerlist)))) {
vector_remove_torrent(torrents_list, torrent);
mutex_bucket_unlock_by_hash(hash, 0);
return -1;
@ -177,9 +176,12 @@ size_t remove_peer_from_torrent_proxy( ot_hash hash, ot_peer *peer, size_t peer_ @@ -177,9 +176,12 @@ size_t remove_peer_from_torrent_proxy( ot_hash hash, ot_peer *peer, size_t peer_
if (exactmatch) {
ot_peerlist *peer_list = peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4;
switch (vector_remove_peer(&peer_list->peers, peer, peer_size)) {
case 2: peer_list->seed_count--; /* Intentional fallthrough */
case 1: peer_list->peer_count--; /* Intentional fallthrough */
default: break;
case 2:
peer_list->seed_count--; /* Intentional fallthrough */
case 1:
peer_list->peer_count--; /* Intentional fallthrough */
default:
break;
}
}
@ -284,14 +286,14 @@ static void handle_reconnects( void ) { @@ -284,14 +286,14 @@ static void handle_reconnects( void ) {
if (PROXYPEER_NEEDSCONNECT(g_connections[i].state)) {
int64 newfd = socket_tcp6();
fprintf(stderr, "(Re)connecting to peer...");
if( newfd < 0 ) continue; /* No socket for you */
if (newfd < 0)
continue; /* No socket for you */
io_fd(newfd);
if (socket_bind6_reuse(newfd, g_serverip, g_serverport, 0)) {
io_close(newfd);
continue;
}
if( socket_connect6(newfd,g_connections[i].ip,g_connections[i].port,0) == -1 &&
errno != EINPROGRESS && errno != EWOULDBLOCK ) {
if (socket_connect6(newfd, g_connections[i].ip, g_connections[i].port, 0) == -1 && errno != EINPROGRESS && errno != EWOULDBLOCK) {
close(newfd);
continue;
}
@ -374,8 +376,7 @@ static void handle_read( int64 peersocket ) { @@ -374,8 +376,7 @@ static void handle_read( int64 peersocket ) {
/* See, if we already have a connection to that peer */
for (i = 0; i < MAX_PEERS; ++i)
if( ( g_connections[i].state & FLAG_MASK ) == FLAG_CONNECTED &&
g_connections[i].tracker_id == tracker_id ) {
if ((g_connections[i].state & FLAG_MASK) == FLAG_CONNECTED && g_connections[i].tracker_id == tracker_id) {
fprintf(stderr, "Peer already connected. Closing connection.\n");
goto close_socket;
}
@ -529,22 +530,27 @@ static int64_t ot_try_bind( ot_ip6 ip, uint16_t port ) { @@ -529,22 +530,27 @@ static int64_t ot_try_bind( ot_ip6 ip, uint16_t port ) {
return sock;
}
static int scan_ip6_port(const char *src, ot_ip6 ip, uint16 *port) {
const char *s = src;
int off, bracket = 0;
while( isspace(*s) ) ++s;
if( *s == '[' ) ++s, ++bracket; /* for v6 style notation */
while (isspace(*s))
++s;
if (*s == '[')
++s, ++bracket; /* for v6 style notation */
if (!(off = scan_ip6(s, ip)))
return 0;
s += off;
if( *s == 0 || isspace(*s)) return s-src;
if( *s == ']' && bracket ) ++s;
if (*s == 0 || isspace(*s))
return s - src;
if (*s == ']' && bracket)
++s;
if (!ip6_isv4mapped(ip)) {
if( ( bracket && *(s) != ':' ) || ( *(s) != '.' ) ) return 0;
if ((bracket && *(s) != ':') || (*(s) != '.'))
return 0;
s++;
} else {
if( *(s++) != ':' ) return 0;
if (*(s++) != ':')
return 0;
}
if (!(off = scan_ushort(s, port)))
return 0;
@ -567,33 +573,48 @@ int main( int argc, char **argv ) { @@ -567,33 +573,48 @@ int main( int argc, char **argv ) {
while (scanon) {
switch (getopt(argc, argv, ":l:c:L:h")) {
case -1: scanon = 0; break;
case -1:
scanon = 0;
break;
case 'l':
tmpport = 0;
if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); }
if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) {
usage(argv[0]);
exit(1);
}
ot_try_bind(serverip, tmpport);
++sbound;
break;
case 'c':
if( g_connection_count > MAX_PEERS / 2 ) exerr( "Connection limit exceeded.\n" );
if (g_connection_count > MAX_PEERS / 2)
exerr("Connection limit exceeded.\n");
tmpport = 0;
if( !scan_ip6_port( optarg,
g_connections[g_connection_count].ip,
&g_connections[g_connection_count].port ) ||
!g_connections[g_connection_count].port ) { usage( argv[0] ); exit( 1 ); }
if (!scan_ip6_port(optarg, g_connections[g_connection_count].ip, &g_connections[g_connection_count].port) || !g_connections[g_connection_count].port) {
usage(argv[0]);
exit(1);
}
g_connections[g_connection_count++].state = FLAG_OUTGOING;
break;
case 'L':
tmpport = 9696;
if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); }
livesync_bind_mcast( serverip, tmpport); ++lbound; break;
if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) {
usage(argv[0]);
exit(1);
}
livesync_bind_mcast(serverip, tmpport);
++lbound;
break;
default:
case '?': usage( argv[0] ); exit( 1 );
case '?':
usage(argv[0]);
exit(1);
}
}
if( !lbound ) exerr( "No livesync port bound." );
if( !g_connection_count && !sbound ) exerr( "No streamsync port bound." );
if (!lbound)
exerr("No livesync port bound.");
if (!g_connection_count && !sbound)
exerr("No streamsync port bound.");
pthread_create(&sync_in_thread_id, NULL, livesync_worker, NULL);
pthread_create(&sync_out_thread_id, NULL, streamsync_worker, NULL);
@ -613,33 +634,41 @@ static void * streamsync_worker( void * args ) { @@ -613,33 +634,41 @@ static void * streamsync_worker( void * args ) {
size_t mem, mem_a = 0, mem_b = 0;
uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c;
if( !torrents_list->size ) goto unlock_continue;
if (!torrents_list->size)
goto unlock_continue;
/* For each torrent in this bucket.. */
for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) {
/* Address torrents members */
ot_peerlist *peer_list = (((ot_torrent *)(torrents_list->data))[tor_offset]).peer_list;
switch (peer_list->peer_count) {
case 2: count_two++; break;
case 1: count_one++; break;
case 0: break;
default: count_def++;
case 2:
count_two++;
break;
case 1:
count_one++;
break;
case 0:
break;
default:
count_def++;
count_peers += peer_list->peer_count;
}
}
/* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */
mem = 3 * ( 1 + 1 + 2 ) + ( count_one + count_two ) * ( 19 + 1 ) + count_def * ( 19 + 8 ) +
( count_one + 2 * count_two + count_peers ) * 7;
mem = 3 * (1 + 1 + 2) + (count_one + count_two) * (19 + 1) + count_def * (19 + 8) + (count_one + 2 * count_two + count_peers) * 7;
fprintf(stderr, "Mem: %zd\n", mem);
ptr = ptr_a = ptr_b = ptr_c = malloc(mem);
if( !ptr ) goto unlock_continue;
if (!ptr)
goto unlock_continue;
if (count_one > 4 || !count_def) {
mem_a = 1 + 1 + 2 + count_one * (19 + 7);
ptr_b += mem_a; ptr_c += mem_a;
ptr_b += mem_a;
ptr_c += mem_a;
ptr_a[0] = 1; /* Offset 0: packet type 1 */
ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
ptr_a[2] = count_one >> 8;
@ -678,10 +707,17 @@ static void * streamsync_worker( void * args ) { @@ -678,10 +707,17 @@ static void * streamsync_worker( void * args ) {
/* Determine destination slot */
count_peers = peer_list->peer_count;
switch (count_peers) {
case 0: continue;
case 1: dst = mem_a ? &ptr_a : &ptr_c; break;
case 2: dst = mem_b ? &ptr_b : &ptr_c; break;
default: dst = &ptr_c; break;
case 0:
continue;
case 1:
dst = mem_a ? &ptr_a : &ptr_c;
break;
case 2:
dst = mem_b ? &ptr_b : &ptr_c;
break;
default:
dst = &ptr_c;
break;
}
/* Copy tail of info_hash, advance pointer */
@ -715,8 +751,10 @@ unlock_continue: @@ -715,8 +751,10 @@ unlock_continue:
if (ptr) {
int i;
if( ptr_b > ptr_c ) ptr_c = ptr_b;
if( ptr_a > ptr_c ) ptr_c = ptr_a;
if (ptr_b > ptr_c)
ptr_c = ptr_b;
if (ptr_a > ptr_c)
ptr_c = ptr_a;
mem = ptr_c - ptr;
for (i = 0; i < MAX_PEERS; ++i) {
@ -739,16 +777,14 @@ unlock_continue: @@ -739,16 +777,14 @@ unlock_continue:
}
static void livesync_issue_peersync() {
socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start,
groupip_1, LIVESYNC_PORT);
socket_send4(g_socket_out, (char *)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, groupip_1, LIVESYNC_PORT);
g_peerbuffer_pos = g_peerbuffer_start + sizeof(g_tracker_id) + sizeof(uint32_t);
g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
}
void livesync_ticker() {
/* livesync_issue_peersync sets g_next_packet_time */
if( time(NULL) > g_next_packet_time &&
g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) )
if (time(NULL) > g_next_packet_time && g_peerbuffer_pos > g_peerbuffer_start + sizeof(g_tracker_id))
livesync_issue_peersync();
}
@ -785,7 +821,8 @@ static void process_indata( proxy_peer * peer ) { @@ -785,7 +821,8 @@ static void process_indata( proxy_peer * peer ) {
/* If we're not inside of a packet, make a new one */
if (!peer->packet_tcount) {
/* Ensure the header is complete or postpone processing */
if( data + 4 > dataend ) break;
if (data + 4 > dataend)
break;
peer->packet_type = data[0];
peer->packet_tprefix = data[1];
peer->packet_tcount = data[2] * 256 + data[3];
@ -794,7 +831,8 @@ printf( "type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer @@ -794,7 +831,8 @@ printf( "type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer
}
/* Ensure size for a minimal torrent block */
if( data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend ) break;
if (data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend)
break;
/* Advance pointer to peer count or peers */
hash = data;
@ -804,7 +842,8 @@ printf( "type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer @@ -804,7 +842,8 @@ printf( "type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer
peers = peer->packet_type;
if (!peers) {
int shift = 0;
do peers |= ( 0x7f & *data ) << ( 7 * shift );
do
peers |= (0x7f & *data) << (7 * shift);
while (*(data++) & 0x80 && shift++ < 6);
}
#if 0
@ -830,7 +869,8 @@ printf( "peers: %zd\n", peers ); @@ -830,7 +869,8 @@ printf( "peers: %zd\n", peers );
static void *livesync_worker(void *args) {
(void)args;
while (1) {
ot_ip6 in_ip; uint16_t in_port;
ot_ip6 in_ip;
uint16_t in_port;
size_t datalen = socket_recv4(g_socket_in, (char *)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12 + (char *)in_ip, &in_port);
/* Expect at least tracker id and packet type */

Loading…
Cancel
Save