@ -1,6 +1,8 @@
@@ -1,6 +1,8 @@
#some of this code was contributed by Arcelier
#original code https://github.com/Arceliar/yggdrasil-map/blob/master/scripts/crawl-dht.py
#multithreaded by neilalexander
#!/usr/bin/env python
# some of this code was contributed by Arcelier
# original code https://github.com/Arceliar/yggdrasil-map/blob/master/scripts/crawl-dht.py
# multithreaded by neilalexander
import psycopg2
import json
@ -12,6 +14,20 @@ import traceback
@@ -12,6 +14,20 @@ import traceback
from threading import Lock , Thread
from Queue import Queue
# Configuration to use TCP connection or unix domain socket for admin connection to yggdrasil
useAdminSock = True
yggAdminTCP = ( ' localhost ' , 9001 )
yggAdminSock = ( ' /var/run/yggdrasil.sock ' )
DB_PASSWORD = " password "
DB_USER = " yggindex "
DB_NAME = " yggindex "
DB_HOST = " localhost "
## Save in database node info fields like buildname, buildarch, buildplatform, buildversion (True/False)?
saveDefaultNodeInfo = False
removableFileds = [ ' buildname ' , ' buildarch ' , ' buildplatform ' , ' buildversion ' ]
class Worker ( Thread ) :
def __init__ ( self , tasks ) :
Thread . __init__ ( self )
@ -48,13 +64,6 @@ nodeinfo = dict()
@@ -48,13 +64,6 @@ nodeinfo = dict()
nodeinfomutex = Lock ( )
nodeinfopool = ThreadPool ( 30 )
host_port = ( ' localhost ' , 9001 )
DB_PASSWORD = " akemi2501 "
DB_USER = " yggindex "
DB_NAME = " yggindex "
DB_HOST = " localhost "
def recv_until_done ( soc ) :
all_data = [ ]
while True :
@ -83,8 +92,13 @@ def getNodeInfoTask(address, info):
@@ -83,8 +92,13 @@ def getNodeInfoTask(address, info):
def doRequest ( req ) :
try :
if useAdminSock :
ygg = socket . socket ( socket . AF_UNIX , socket . SOCK_STREAM )
ygg . connect ( yggAdminSock )
else :
ygg = socket . socket ( socket . AF_INET , socket . SOCK_STREAM )
ygg . connect ( host_port )
ygg . connect ( yggAdminTCP )
ygg . send ( req )
data = json . loads ( recv_until_done ( ygg ) )
return data
@ -118,23 +132,33 @@ def insert_new_entry(ipv6, coords):
@@ -118,23 +132,33 @@ def insert_new_entry(ipv6, coords):
nodejson = " {} "
if ipv6 in nodeinfo :
with nodeinfomutex :
if not saveDefaultNodeInfo :
# remove default Node info fields
for field in removableFileds :
tmprm = nodeinfo [ ipv6 ] . pop ( field , None )
nodejson = json . dumps ( nodeinfo [ ipv6 ] )
nodename = nodeinfo [ ipv6 ] [ " name " ] if " name " in nodeinfo [ ipv6 ] else " "
dbconn = psycopg2 . connect ( host = DB_HOST , database = DB_NAME , user = DB_USER , password = DB_PASSWORD )
cur = dbconn . cursor ( )
timestamp = str ( int ( time . time ( ) ) )
cur . execute (
" INSERT INTO yggindex (ipv6, coords, unixtstamp, name) VALUES( %s , %s , %s , %s ) ON CONFLICT (ipv6) DO UPDATE SET unixtstamp= %s , coords= %s , name= %s ; " ,
( ipv6 , coords , str ( int ( time . time ( ) ) ) , nodename , str ( int ( time . time ( ) ) ) , coords , nodename )
( ipv6 , coords , timestamp , nodename , timestamp , coords , nodename )
)
cur . execute (
" INSERT INTO yggnodeinfo (ipv6, nodeinfo, timestamp) VALUES( %s , %s , NOW() ) ON CONFLICT (ipv6) DO UPDATE SET nodeinfo= %s , timestamp=NOW() ; " ,
( ipv6 , nodejson , nodejson )
" INSERT INTO yggnodeinfo (ipv6, nodeinfo, timestamp) VALUES( %s , %s , %s ) ON CONFLICT (ipv6) DO UPDATE SET nodeinfo=%s , timestamp= %s ;" ,
( ipv6 , nodejson , timestamp , nodejson , timestamp )
)
dbconn . commit ( )
cur . close ( )
dbconn . close ( )
except Exception as e :
print ( " database error inserting " )
print " database error inserting "
traceback . print_exc ( )
def handleNodeInfo ( address , data ) :
@ -183,6 +207,7 @@ for k,v in selfInfo['response']['self'].iteritems():
@@ -183,6 +207,7 @@ for k,v in selfInfo['response']['self'].iteritems():
# Loop over rumored nodes and ping them, adding to visited if they respond
while len ( rumored ) > 0 :
for k , v in rumored . iteritems ( ) :
#print "Processing", v['coords']
handleResponse ( k , v , doRequest ( getDHTPingRequest ( v [ ' box_pub_key ' ] , v [ ' coords ' ] ) ) )
break
del rumored [ k ]