diff --git a/sync_daemon.pl b/sync_daemon.pl new file mode 100644 index 0000000..5f49422 --- /dev/null +++ b/sync_daemon.pl @@ -0,0 +1,291 @@ +#!/usr/bin/perl + +use strict; + +#use Convert::Bencode_XS qw(:all); +use Convert::Bencode qw(:all); +use Data::Dumper; +use LWP::UserAgent; +use URI::Escape; + +# enable verbose output +my $debug = 0; + +# tracker from where we get our sync data +my @trackers = ('127.0.0.1:8989'); +# tracker to upload merged data +my @client_tracker = ('127.0.0.1:8989'); + +# time to wait between syncs +my $sleeptime = '300'; + +# SSL cert and key +my $ssl_cert = 'cert.pem'; +my $ssl_key = 'key.pem'; + +foreach(@trackers) { + print "Syncing from: $_\n"; +} +foreach(@client_tracker) { + print "Syncing to: $_\n"; +} + +my $file = shift; + + +# global hash for storing the merged syncs +my %merged_syncs; + +while(1) { + %merged_syncs; + my @bencoded_sync_data; + + # fetch the sync from every tracker and put it into an array in bencoded form + foreach my $tracker (@trackers) { + my $bencoded_sync = fetch_sync($tracker); +# my $bencoded_sync = fetch_sync_from_file($file); + if($bencoded_sync ne 0 && $bencoded_sync =~ /^d4\:sync/) { + push(@bencoded_sync_data,$bencoded_sync); + } + } + + # bdecode every sync and throw it into the merged-sync + foreach my $bencoded_sync (@bencoded_sync_data) { + print "Doing merge...\n"; + merge_sync(bdecode($bencoded_sync)); + my $num_torrents = keys(%merged_syncs); + + print "number of torrents: $num_torrents\n"; + } + + # number of max. peers in one changeset + my $peer_limit = 500; + # max number of changesets per commit + my $max_changesets = 10; + my $hash_count = 0; + my $peer_count = 0; + my $changeset; + my @escaped_changesets; + + # run until all hashes are put into changesets and commited to the trackers + while(keys(%merged_syncs) != 0) { + + foreach my $hash (keys(%merged_syncs)) { + print "Starting new changeset\n" if($peer_count == 0 && $debug); + my $num_peers = keys(%{$merged_syncs{$hash}}); + + print "\t$peer_count peers for $hash_count hashes in changeset\n" if($debug); + + my $pack_hash = pack('H*',$hash); + + # as long as the peer_limit is not reached, add new hashes with peers to the changeset hash-table + if($peer_count < $peer_limit) { + print "\t\tAdd $num_peers peers for $hash changeset\n" if($debug); + $peer_count = $peer_count + $num_peers; + foreach my $peer_socket (keys(%{$merged_syncs{$hash}})) { + + my $flags = $merged_syncs{$hash}{$peer_socket}; + + print "\t\t\tAdd $peer_socket $flags\n" if($debug); + + my $pack_peer = packme($peer_socket,$flags); + + $changeset->{'sync'}->{$pack_hash} = $changeset->{'sync'}->{$pack_hash}.$pack_peer; + } + $hash_count++; + # hash is stored in the changeset, delete it from the hash-table + delete $merged_syncs{$hash}; + } + + # the peer_limit is reached or we are out of torrents, so start preparing a changeset + if($peer_count >= $peer_limit || keys(%merged_syncs) == 0) { + + print "Commit changeset for $hash_count hashes with $peer_count peers total\n" if($debug); + + # bencode the changeset + my $enc_changeset = bencode($changeset); + + # URL-escape the changeset and put into an array of changesets + my $foobar = uri_escape($enc_changeset); + push(@escaped_changesets,$foobar); + + # the changeset is ready and stored, so delete it from the changeset hash-table + delete $changeset->{'sync'}; + + $hash_count = 0; + $peer_count = 0; + print "\n\n\n" if($debug); + } + + # if enought changesets are prepared or we are out of torrents for more changesets, + # sync the changesets to the trackers + if($#escaped_changesets == $max_changesets || keys(%merged_syncs) == 0) { + print "\tSync...\n"; + sync_to_tracker(\@escaped_changesets); + undef @escaped_changesets; + } + + } + } + + print "Sleeping for $sleeptime seconds\n"; + sleep $sleeptime; +} + +sub connect_tracker { + # connect a tracker via HTTPS, returns the body of the response + my $url = shift; + + $ENV{HTTPS_DEBUG} = 0; + $ENV{HTTPS_CERT_FILE} = $ssl_cert; + $ENV{HTTPS_KEY_FILE} = $ssl_key; + + my $ua = new LWP::UserAgent; + my $req = new HTTP::Request('GET', $url); + my $res = $ua->request($req); + + my $content = $res->content; + + if($res->is_success()) { + return $content; + } else { + print $res->code."|".$res->status_line."\n"; + return 0; + } +} + +sub sync_to_tracker { + # commit changesets to a tracker + my @changesets = @{(shift)}; + + # prepare the URI with URL-encoded changesets concatenated by a & + my $uri = 'sync?'; + foreach my $set (@changesets) { + $uri .= 'changeset='.$set.'&'; + } + my $uri_length = length($uri); + + # commit the collection of changesets to the tracker via HTTPS + foreach my $tracker (@client_tracker) { + print "\t\tTracker: $tracker (URI: $uri_length)\n"; + my $url = "https://$tracker/".$uri; + connect_tracker($url); + } +} + +sub packme { + # pack data + # returns ipaddr, port and flags in packed format + my $peer_socket = shift; + my $flags = shift; + + my($a,$b,$c,$d,$port) = split(/[\.,\:]/,$peer_socket); + my $pack_peer = pack('C4 n1 b16',$a,$b,$c,$d,$port,$flags); + + return $pack_peer; +} + +sub unpackme { + # unpack packed data + # returns ipaddr. in quad-form with port (a.b.c.d:port) and flags as bitfield + # data is packed as: + # - 4 byte ipaddr. (unsigned char value) + # - 2 byte port (unsigned short in "network" (big-endian) order) + # - 2 byte flags (bit string (ascending bit order inside each byte)) + my $data = shift; + + my($a, $b, $c, $d, $port, $flags) = unpack('C4 n1 b16',$data); + my $peer_socket = "$a\.$b\.$c\.$d\:$port"; + + return($peer_socket,$flags); +} + +sub fetch_sync { + # fetch sync from a tracker + my $tracker = shift; + my $url = "https://$tracker/sync"; + + print "Fetching from $url\n"; + + my $body = connect_tracker($url); + + if($body && $body =~ /^d4\:sync/) { + return $body; + } else { + return 0; + } +} + +sub fetch_sync_from_file { + # fetch sync from a file + my $file = shift; + my $body; + print "Fetching from file $file\n"; + open(FILE,"<$file"); + while() { + $body .= $_; + } + close(FILE); + return $body; +} + +sub merge_sync { + # This builds a hash table with the torrenthash as keys. The value is a hash table again with the peer-socket as keys + # and flags in the value + # Example: + # 60dd2beb4197f71677c0f5ba92b956f7d04651e5 => + # 192.168.23.23:2323 => 0000000000000000 + # 23.23.23.23:2342 => 0000000100000000 + # b220b4d7136e84a88abc090db88bec8604a808f3 => + # 42.23.42.23:55555 => 0000000000000000 + + my $hashref = shift; + + my $nonuniq_hash_counter = 0; + my $nonuniq_peer_counter = 0; + my $hash_counter = 0; + my $peer_counter = 0; + + foreach my $key (keys(%{$hashref->{'sync'}})) { + # start merge for every sha1-hash in the sync + my $hash = unpack('H*',$key); + + $hash_counter++; + $nonuniq_hash_counter++ if exists $merged_syncs{$hash}; + + while(${$hashref->{'sync'}}{$key} ne "") + { + # split the value into 8-byte and unpack it for getting peer-socket and flags + my($peer_socket,$flags) = unpackme(substr(${$hashref->{'sync'}}{$key},0,8,'')); + + $peer_counter++; + $nonuniq_peer_counter++ if exists $merged_syncs{$hash}{$peer_socket}; + + # Create a hash table with sha1-hash as key and a hash table as value. + # The hash table in the value has the peer-socket as key and flags as value + # If the entry already exists, the flags are ORed together, if not it is ORed with 0 + $merged_syncs{$hash}{$peer_socket} = $flags | $merged_syncs{$hash}{$peer_socket}; + } + } + print "$hash_counter hashes $nonuniq_hash_counter non-uniq, $peer_counter peers $nonuniq_peer_counter non-uniq.\n"; + +} + +sub test_decode { + my $hashref = shift; + + print "CHANGESET DEBUG OUTPUT\n"; + + print Dumper $hashref; + foreach my $key (keys(%{$hashref->{'sync'}})) { + my $hash = unpack('H*',$key); + + print "Changeset for $hash\n"; + while(${$hashref->{'sync'}}{$key} ne "") + { + + my($peer_socket,$flags) = unpackme(substr(${$hashref->{'sync'}}{$key},0,8,'')); + print "\tSocket: $peer_socket Flags: $flags\n"; + } + } +}