Browse Source

update

master
yakamok 6 years ago committed by GitHub
parent
commit
0bcd54d34c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 94
      api/crawler.py

94
api/crawler.py

@ -1,5 +1,6 @@
#some of this code was contributed by Arcelier #some of this code was contributed by Arcelier
#original code https://github.com/Arceliar/yggdrasil-map/blob/master/scripts/crawl-dht.py #original code https://github.com/Arceliar/yggdrasil-map/blob/master/scripts/crawl-dht.py
#multithreaded by neilalexander
import psycopg2 import psycopg2
import json import json
@ -7,14 +8,49 @@ import socket
import sys import sys
import time import time
import ipaddress import ipaddress
import traceback
from threading import Lock, Thread
from Queue import Queue
class Worker(Thread):
def __init__(self, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
def run(self):
while True:
func, args, kargs = self.tasks.get()
try:
func(*args, **kargs)
except Exception as e:
pass
finally:
self.tasks.task_done()
class ThreadPool:
def __init__(self, threads):
self.tasks = Queue(128)
for _ in range(threads):
Worker(self.tasks)
def add(self, func, *args, **kargs):
self.tasks.put((func, args, kargs))
def wait(self):
self.tasks.join()
visited = dict() # Add nodes after a successful lookup response visited = dict() # Add nodes after a successful lookup response
rumored = dict() # Add rumors about nodes to ping rumored = dict() # Add rumors about nodes to ping
timedout = dict() timedout = dict()
nodeinfo = dict()
nodeinfomutex = Lock()
nodeinfopool = ThreadPool(30)
host_port = ('localhost', 9001) host_port = ('localhost', 9001)
DB_PASSWORD = "password" DB_PASSWORD = "akemi2501"
DB_USER = "yggindex" DB_USER = "yggindex"
DB_NAME = "yggindex" DB_NAME = "yggindex"
DB_HOST = "localhost" DB_HOST = "localhost"
@ -37,6 +73,14 @@ def getDHTPingRequest(key, coords, target=None):
return '{{"request":"dhtPing", "box_pub_key":"{}", "coords":"{}"}}'.format(key, coords) return '{{"request":"dhtPing", "box_pub_key":"{}", "coords":"{}"}}'.format(key, coords)
def getNodeInfoRequest(key, coords):
return '{{"request":"getNodeInfo", "box_pub_key":"{}", "coords":"{}"}}'.format(key, coords)
def getNodeInfoTask(address, info):
handleNodeInfo(address, doRequest(getNodeInfoRequest(info['box_pub_key'], info['coords'])))
def doRequest(req): def doRequest(req):
try: try:
ygg = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ygg = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@ -70,20 +114,41 @@ def valid_ipv6_check(ipv6add):
def insert_new_entry(ipv6, coords): def insert_new_entry(ipv6, coords):
try: try:
dbconn = psycopg2.connect(host=DB_HOST,database=DB_NAME, user=DB_USER, password=DB_PASSWORD) nodename = ""
nodejson = "{}"
if ipv6 in nodeinfo:
with nodeinfomutex:
nodejson = json.dumps(nodeinfo[ipv6])
nodename = nodeinfo[ipv6]["name"] if "name" in nodeinfo[ipv6] else ""
dbconn = psycopg2.connect(host=DB_HOST, database=DB_NAME, user=DB_USER, password=DB_PASSWORD)
cur = dbconn.cursor() cur = dbconn.cursor()
cur.execute(
cur.execute('''INSERT INTO yggindex(ipv6, coords, unixtstamp)\ "INSERT INTO yggindex (ipv6, coords, unixtstamp, name) VALUES(%s, %s, %s, %s) ON CONFLICT (ipv6) DO UPDATE SET unixtstamp=%s, coords=%s, name=%s;",
VALUES(''' + "'" + ipv6 + "'," + "'" + coords + "'," + str(int(time.time())) + ''')\ (ipv6, coords, str(int(time.time())), nodename, str(int(time.time())), coords, nodename)
ON CONFLICT (ipv6) DO UPDATE\ )
SET unixtstamp = ''' + "'" + str(int(time.time())) + "'," +''' \ cur.execute(
coords = ''' + "'" + coords + "'" + ''';''') "INSERT INTO yggnodeinfo (ipv6, nodeinfo, timestamp) VALUES(%s, %s, NOW()) ON CONFLICT (ipv6) DO UPDATE SET nodeinfo=%s, timestamp=NOW();",
(ipv6, nodejson, nodejson)
)
dbconn.commit() dbconn.commit()
cur.close() cur.close()
dbconn.close() dbconn.close()
except: except Exception as e:
print "database error inserting" print("database error inserting")
traceback.print_exc()
def handleNodeInfo(address, data):
global nodeinfo
with nodeinfomutex:
nodeinfo[str(address)] = {}
if not data:
return
if 'response' not in data:
return
if 'nodeinfo' not in data['response']:
return
nodeinfo[str(address)] = data['response']['nodeinfo']
def handleResponse(address, info, data): def handleResponse(address, info, data):
global visited global visited
@ -103,9 +168,10 @@ def handleResponse(address, info, data):
rumored[addr] = rumor rumored[addr] = rumor
if address not in visited: if address not in visited:
visited[str(address)] = info['coords'] visited[str(address)] = info['coords']
if address in timedout: if address in timedout:
del timedout[address] del timedout[address]
nodeinfopool.add(getNodeInfoTask, address, info)
# Get self info # Get self info
selfInfo = doRequest('{"request":"getSelf"}') selfInfo = doRequest('{"request":"getSelf"}')
@ -118,10 +184,12 @@ for k,v in selfInfo['response']['self'].iteritems():
while len(rumored) > 0: while len(rumored) > 0:
for k,v in rumored.iteritems(): for k,v in rumored.iteritems():
handleResponse(k, v, doRequest(getDHTPingRequest(v['box_pub_key'], v['coords']))) handleResponse(k, v, doRequest(getDHTPingRequest(v['box_pub_key'], v['coords'])))
break break
del rumored[k] del rumored[k]
#End #End
nodeinfopool.wait()
for x, y in visited.iteritems(): for x, y in visited.iteritems():
if valid_ipv6_check(x) and check_coords(y): if valid_ipv6_check(x) and check_coords(y):
insert_new_entry(x, y) insert_new_entry(x, y)

Loading…
Cancel
Save