diff --git a/README.md b/README.md index ae88b41..6090d49 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,22 @@ # ygg-node-db server to collect DHT views from yggdrasil nodes and store in a data base + + +### send-view.py + +Very simple to use just add in crontab and run once an hour. + +__Access External Admin API:__ +If you want to get access to an external Admin API on another server: + +__example__ +send-view.py 192.168.1.100 9001 + + +## Todo + +maybe some kind of testing for current uploads? +maybe add api token to prevent abuse? +create restrictions on how much data can be sent maybe? +add rate limiting for sends and requests/ rate limit in nginx for requests +add postgress function for alternative use diff --git a/send-view.py b/send-view.py new file mode 100644 index 0000000..ff9da9c --- /dev/null +++ b/send-view.py @@ -0,0 +1,50 @@ +import socket +import json +import sys + +GETDHT = '{"request":"getDHT", "keepalive":true}' +GETSWITCHPEERS = '{"request":"getSwitchPeers"}' +SERVER = "y.yakamo.org" + +#gives the option to get data from an external server instead and send that +#if no options given it will default to localhost instead +if len(sys.argv) == 3: + host_port = (sys.argv[1], int(sys.argv[2])) +else: + host_port = ('localhost', 9001) + +def send_view_to_server(tosend): + if tosend: + attempts = 3 + while attempts: + try: + conn = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + conn.connect((SERVER, 45671)) + conn.send(tosend) + conn.close() + print "sent ok" + break + except: + attempts -= 1 + + +def collect_dht_getswitchpeers(serport): + try: + ygg = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + ygg.connect(host_port) + + ygg.send(GETDHT) + dhtdata = json.loads(ygg.recv(1024 * 15)) + + ygg.send(GETSWITCHPEERS) + switchdata = json.loads(ygg.recv(1024 * 15)) + + temp_dict = {} + temp_dict["dhtpack"] = dhtdata + temp_dict["switchpack"] = switchdata + + return json.dumps(temp_dict).encode() + except: + return None + +send_view_to_server(collect_dht_getswitchpeers(host_port)) diff --git a/vserv.py b/vserv.py new file mode 100644 index 0000000..3a3a521 --- /dev/null +++ b/vserv.py @@ -0,0 +1,102 @@ +#server for collecting DHT info + +import json +import sqlite3 +from sqlite3 import Error +import os +import socket +#ipaddress needs to be installed its not part of the standard lib +import ipaddress +import thread + +SERVER = "" +DB_PATH = "vservdb/" + + +def check_coords(coords): + coord_len = coords.replace(' ', '').replace('[', '').replace(']', '') + digits_found = [x for x in coord_len if x.isdigit()] + + if len(digits_found) == len(coord_len): + crus = True + else: + crus = False + return crus + + +def valid_ipv6_check(ipv6add): + try: + if ipaddress.IPv6Address(unicode(ipv6add)): + addr = True + except: + addr = False + return addr + + +def isdatabase(db_path): + if not os.path.exists(db_path + "yggindex.db"): + if not os.path.exists(db_path): + os.makedirs(db_path) + try: + conn = sqlite3.connect(db_path + 'yggindex.db') + c = conn.cursor() + c.execute('''create table yggindex(ipv6 varchar(45) UNIQUE, coords varchar(50), + ut unixtime default (strftime('%s','now')))''') + conn.commit() + except Error as e: + print(e) + finally: + conn.close() + else: + print("found database will not create a new one") + + +def insert_new_entry(db_path, ipv6, coords): + try: + conn = sqlite3.connect(db_path + "yggindex.db") + c = conn.cursor() + conn.execute('''INSERT OR REPLACE INTO yggindex(ipv6, coords) VALUES(?, ?)''',\ + (ipv6, coords)) + conn.commit() + conn.close() + except Error as e: + print e + + +def error_check_insert_into_db(dht, switchpeers): + try: + if dht.get("status") == "success": + for x, y in dht["response"]["dht"].iteritems(): + if valid_ipv6_check(x) and check_coords(y["coords"]): + insert_new_entry(DB_PATH, x, y["coords"]) + + if dht.get("status") == "success": + for x in switchpeers["response"]["switchpeers"].iteritems(): + if valid_ipv6_check(x[1]["ip"]) and check_coords(x[1]["coords"]): + insert_new_entry(DB_PATH, x[1]["ip"], x[1]["coords"]) + except: + print"error in json file, aborting" + + +def proccess_incoming_data(datty, addr): + print addr + try: + ddata = datty.recv(1024 * 20) + data = json.loads(ddata.decode()) + error_check_insert_into_db(data["dhtpack"], data["switchpack"]) + except: + print "ignoring, data was not json" + + +isdatabase(DB_PATH) + +conn = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) +conn.bind((SERVER, 45671)) +conn.listen(30) + +while True: + try: + dataraw, addr = conn.accept() + thread.start_new_thread(proccess_incoming_data, (dataraw, addr)) + except Exception: + print "bloop" diff --git a/yggapi.py b/yggapi.py new file mode 100644 index 0000000..e61ae12 --- /dev/null +++ b/yggapi.py @@ -0,0 +1,81 @@ +from flask import Flask, request +from flask_restful import Resource, Api +from sqlalchemy import create_engine +from json import dumps +#rate limiting support +from flask.ext.jsonpify import jsonify +from flask_limiter import Limiter +from flask_limiter.util import get_remote_address +import time +import sys +import os + +#check if a database exists or not +db_path = "vservdb/yggindex.db" +if not os.path.exists(db_path): + print "could not find a database" + sys.exit(0) + +db_connect = create_engine('sqlite:///' + db_path) +app = Flask(__name__) +api = Api(app) + +limiter = Limiter( + app, + key_func=get_remote_address, + default_limits=["500/day", "60/hour"] +) + +#quickly figure out which is old or new +def age_calc(ustamp): + if (time.time() - ustamp) <= 14400 : + return True + else: + return False + +#active nodes in the past 4hrs +class nodes_current(Resource): + def get(self): + conn = db_connect.connect() + query = conn.execute("select * from yggindex") + nodes = {} + for i in query.cursor.fetchall(): + if age_calc(i[2]): + nodes[i[0]] = [i[1],i[2]] + nodelist = {} + nodelist['yggnodes'] = nodes + return nodelist + +#nodes that may not be active anymore or have been offline for a while such as laptops +#could be used as last seen +class nodes_old(Resource): + def get(self): + conn = db_connect.connect() + query = conn.execute("select * from yggindex") + nodes = {} + for i in query.cursor.fetchall(): + if not age_calc(i[2]): + nodes[i[0]] = [i[1],i[2]] + nodelist = {} + nodelist['yggnodes'] = nodes + return nodelist + +#return entire database of nodes regardless of age +class nodes_all(Resource): + def get(self): + conn = db_connect.connect() + query = conn.execute("select * from yggindex") + nodes = {} + for i in query.cursor.fetchall(): + nodes[i[0]] = [i[1],i[2]] + nodelist = {} + nodelist['yggnodes'] = nodes + return nodelist + +#sort out the api request here for the url +api.add_resource(nodes_current, '/current') +api.add_resource(nodes_old, '/old') +api.add_resource(nodes_all, '/all') + +if __name__ == '__main__': + app.run(host='::', port=3000)