diff --git a/api/crawler.py b/api/crawler.py index 8ccdc3f..3bf00f1 100644 --- a/api/crawler.py +++ b/api/crawler.py @@ -1,5 +1,6 @@ #some of this code was contributed by Arcelier #original code https://github.com/Arceliar/yggdrasil-map/blob/master/scripts/crawl-dht.py +#multithreaded by neilalexander import psycopg2 import json @@ -7,14 +8,49 @@ import socket import sys import time import ipaddress +import traceback +from threading import Lock, Thread +from Queue import Queue + +class Worker(Thread): + def __init__(self, tasks): + Thread.__init__(self) + self.tasks = tasks + self.daemon = True + self.start() + + def run(self): + while True: + func, args, kargs = self.tasks.get() + try: + func(*args, **kargs) + except Exception as e: + pass + finally: + self.tasks.task_done() + +class ThreadPool: + def __init__(self, threads): + self.tasks = Queue(128) + for _ in range(threads): + Worker(self.tasks) + + def add(self, func, *args, **kargs): + self.tasks.put((func, args, kargs)) + + def wait(self): + self.tasks.join() visited = dict() # Add nodes after a successful lookup response rumored = dict() # Add rumors about nodes to ping timedout = dict() +nodeinfo = dict() +nodeinfomutex = Lock() +nodeinfopool = ThreadPool(30) host_port = ('localhost', 9001) -DB_PASSWORD = "password" +DB_PASSWORD = "akemi2501" DB_USER = "yggindex" DB_NAME = "yggindex" DB_HOST = "localhost" @@ -37,6 +73,14 @@ def getDHTPingRequest(key, coords, target=None): return '{{"request":"dhtPing", "box_pub_key":"{}", "coords":"{}"}}'.format(key, coords) +def getNodeInfoRequest(key, coords): + return '{{"request":"getNodeInfo", "box_pub_key":"{}", "coords":"{}"}}'.format(key, coords) + + +def getNodeInfoTask(address, info): + handleNodeInfo(address, doRequest(getNodeInfoRequest(info['box_pub_key'], info['coords']))) + + def doRequest(req): try: ygg = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -70,20 +114,41 @@ def valid_ipv6_check(ipv6add): def insert_new_entry(ipv6, coords): try: - dbconn = psycopg2.connect(host=DB_HOST,database=DB_NAME, user=DB_USER, password=DB_PASSWORD) + nodename = "" + nodejson = "{}" + if ipv6 in nodeinfo: + with nodeinfomutex: + nodejson = json.dumps(nodeinfo[ipv6]) + nodename = nodeinfo[ipv6]["name"] if "name" in nodeinfo[ipv6] else "" + dbconn = psycopg2.connect(host=DB_HOST, database=DB_NAME, user=DB_USER, password=DB_PASSWORD) cur = dbconn.cursor() - - cur.execute('''INSERT INTO yggindex(ipv6, coords, unixtstamp)\ - VALUES(''' + "'" + ipv6 + "'," + "'" + coords + "'," + str(int(time.time())) + ''')\ - ON CONFLICT (ipv6) DO UPDATE\ - SET unixtstamp = ''' + "'" + str(int(time.time())) + "'," +''' \ - coords = ''' + "'" + coords + "'" + ''';''') - + cur.execute( + "INSERT INTO yggindex (ipv6, coords, unixtstamp, name) VALUES(%s, %s, %s, %s) ON CONFLICT (ipv6) DO UPDATE SET unixtstamp=%s, coords=%s, name=%s;", + (ipv6, coords, str(int(time.time())), nodename, str(int(time.time())), coords, nodename) + ) + cur.execute( + "INSERT INTO yggnodeinfo (ipv6, nodeinfo, timestamp) VALUES(%s, %s, NOW()) ON CONFLICT (ipv6) DO UPDATE SET nodeinfo=%s, timestamp=NOW();", + (ipv6, nodejson, nodejson) + ) dbconn.commit() cur.close() dbconn.close() - except: - print "database error inserting" + except Exception as e: + print("database error inserting") + traceback.print_exc() + +def handleNodeInfo(address, data): + global nodeinfo + + with nodeinfomutex: + nodeinfo[str(address)] = {} + if not data: + return + if 'response' not in data: + return + if 'nodeinfo' not in data['response']: + return + nodeinfo[str(address)] = data['response']['nodeinfo'] def handleResponse(address, info, data): global visited @@ -103,9 +168,10 @@ def handleResponse(address, info, data): rumored[addr] = rumor if address not in visited: visited[str(address)] = info['coords'] - if address in timedout: + if address in timedout: del timedout[address] + nodeinfopool.add(getNodeInfoTask, address, info) # Get self info selfInfo = doRequest('{"request":"getSelf"}') @@ -118,10 +184,12 @@ for k,v in selfInfo['response']['self'].iteritems(): while len(rumored) > 0: for k,v in rumored.iteritems(): handleResponse(k, v, doRequest(getDHTPingRequest(v['box_pub_key'], v['coords']))) - break + break del rumored[k] #End +nodeinfopool.wait() + for x, y in visited.iteritems(): if valid_ipv6_check(x) and check_coords(y): insert_new_entry(x, y)