Jeff Becker
7 years ago
3 changed files with 225 additions and 2 deletions
@ -0,0 +1,220 @@ |
|||||||
|
#!/usr/bin/env python3 |
||||||
|
import os |
||||||
|
import json |
||||||
|
import logging |
||||||
|
import pprint |
||||||
|
import argparse |
||||||
|
import datetime |
||||||
|
|
||||||
|
try: |
||||||
|
import requests |
||||||
|
except ImportError: |
||||||
|
print("Error: requests module is required. apt install python3-requests") |
||||||
|
exit(1) |
||||||
|
|
||||||
|
# Disabling annoying warnings |
||||||
|
requests.packages.urllib3.disable_warnings( |
||||||
|
requests.packages.urllib3.exceptions.InsecureRequestWarning) |
||||||
|
|
||||||
|
INFO_METHODS = { |
||||||
|
"RouterInfo": { |
||||||
|
"i2p.router.uptime": "", |
||||||
|
"i2p.router.net.status": "", |
||||||
|
"i2p.router.netdb.knownpeers": "", |
||||||
|
"i2p.router.netdb.activepeers": "", |
||||||
|
"i2p.router.net.bw.inbound.1s": "", |
||||||
|
"i2p.router.net.bw.outbound.1s": "", |
||||||
|
"i2p.router.net.tunnels.participating": "", |
||||||
|
"i2p.router.net.tunnels.successrate": "", |
||||||
|
"i2p.router.net.total.received.bytes": "", |
||||||
|
"i2p.router.net.total.sent.bytes": "", |
||||||
|
}, |
||||||
|
"ClientServicesInfo": { |
||||||
|
"I2PTunnel": "", |
||||||
|
"SOCKS": "", |
||||||
|
"HTTPProxy": "", |
||||||
|
"SAM": "", |
||||||
|
"BOB": "", |
||||||
|
"I2CP": "", |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
STATUS = [ |
||||||
|
"OK", |
||||||
|
"TESTING", |
||||||
|
"FIREWALLED", |
||||||
|
"HIDDEN", |
||||||
|
"WARN_FIREWALLED_AND_FAST", |
||||||
|
"WARN_FIREWALLED_AND_FLOODFILL", |
||||||
|
"WARN_FIREWALLED_WITH_INBOUND_TCP", |
||||||
|
"WARN_FIREWALLED_WITH_UDP_DISABLED", |
||||||
|
"ERROR_I2CP", |
||||||
|
"ERROR_CLOCK_SKEW", |
||||||
|
"ERROR_PRIVATE_TCP_ADDRESS", |
||||||
|
"ERROR_SYMMETRIC_NAT", |
||||||
|
"ERROR_UDP_PORT_IN_USE", |
||||||
|
"ERROR_NO_ACTIVE_PEERS_CHECK_CONNECTION_AND_FIREWALL", |
||||||
|
"ERROR_UDP_DISABLED_AND_TCP_UNSET", |
||||||
|
] |
||||||
|
|
||||||
|
SUFFIXES = ['B','KB','MB','GB','TB'] |
||||||
|
|
||||||
|
def humanize_size(size, precision=2): |
||||||
|
"""Bytes to human readable size""" |
||||||
|
suffixIndex = 0 |
||||||
|
while size > 1024: |
||||||
|
suffixIndex += 1 |
||||||
|
size = size / 1024.0 |
||||||
|
return "{}{}".format(round(size ,precision), SUFFIXES[suffixIndex]) |
||||||
|
|
||||||
|
def humanize_time(milliseconds): |
||||||
|
"""Seconds to human readable time""" |
||||||
|
return str(datetime.timedelta(milliseconds=milliseconds)) |
||||||
|
|
||||||
|
class I2PControl(object): |
||||||
|
"""Talk to I2PControl API""" |
||||||
|
|
||||||
|
def __init__(self, url, password='itoopie'): |
||||||
|
self.url = url |
||||||
|
self.password = password |
||||||
|
self._token = None |
||||||
|
|
||||||
|
@property |
||||||
|
def token(self): |
||||||
|
"""Cached authentication token""" |
||||||
|
if not self._token: |
||||||
|
self._token = requests.post(self.url, |
||||||
|
json.dumps({'id': 1, 'method': 'Authenticate', |
||||||
|
'params': {'API': 1, 'Password': self.password}, |
||||||
|
'jsonrpc': '2.0'}), |
||||||
|
verify=False).json()["result"]["Token"] |
||||||
|
return self._token |
||||||
|
|
||||||
|
def request(self, method, params): |
||||||
|
"""Execute authenticated request""" |
||||||
|
return requests.post(self.url, |
||||||
|
json.dumps({'id': 1, 'method': method, 'params': params, |
||||||
|
'jsonrpc': '2.0', 'Token': self.token}), |
||||||
|
verify=False |
||||||
|
) |
||||||
|
|
||||||
|
class I2pdctl(object): |
||||||
|
"""i2pd control""" |
||||||
|
|
||||||
|
def __init__(self, ctl): |
||||||
|
self.ctl = ctl |
||||||
|
|
||||||
|
def execute(self, args): |
||||||
|
"""Execute raw method""" |
||||||
|
try: |
||||||
|
parameters = json.loads(args.parameters) |
||||||
|
except json.decoder.JSONDecodeError: |
||||||
|
print("Error: parameters should be a valid JSON object") |
||||||
|
return |
||||||
|
|
||||||
|
resp = self.ctl.request(args.method, parameters) |
||||||
|
|
||||||
|
if args.json_output: |
||||||
|
print(resp.text) |
||||||
|
elif "result" in resp: |
||||||
|
pprint.pprint(resp.json()["result"]) |
||||||
|
else: |
||||||
|
pprint.pprint(resp.json()) |
||||||
|
|
||||||
|
def raw_info(self, args): |
||||||
|
"""Retrieve raw JSON reply from method(s)""" |
||||||
|
res = {} |
||||||
|
for m in args.method: |
||||||
|
if m not in INFO_METHODS: |
||||||
|
print("Invalid method {}. Supported methods are {}".format(m, |
||||||
|
", ".join(INFO_METHODS.keys()))) |
||||||
|
return |
||||||
|
else: |
||||||
|
res[m] = self.ctl.request(m, INFO_METHODS[m]).json()['result'] |
||||||
|
|
||||||
|
print(json.dumps(res)) |
||||||
|
|
||||||
|
|
||||||
|
def print_info(self, args): |
||||||
|
"""Print information about a node in a human readable form""" |
||||||
|
def fancy_title(string): |
||||||
|
print("\n### {}".format(string)) |
||||||
|
|
||||||
|
ri_res = self.ctl.request("RouterInfo", INFO_METHODS["RouterInfo"]).json()['result'] |
||||||
|
try: |
||||||
|
csi_res = self.ctl.request("ClientServicesInfo", INFO_METHODS["ClientServicesInfo"]).json()['result'] |
||||||
|
except KeyError: |
||||||
|
csi_res = False |
||||||
|
|
||||||
|
fancy_title("Router info") |
||||||
|
print("Uptime: {}".format( |
||||||
|
humanize_time(int(ri_res["i2p.router.uptime"])))) |
||||||
|
print("Status: {}".format(STATUS[ri_res["i2p.router.net.status"]])) |
||||||
|
print("Tunnel creation success rate: {}%".format( |
||||||
|
ri_res["i2p.router.net.tunnels.successrate"])) |
||||||
|
print("Received: {} ({} B/s) / Sent: {} ({} B/s)".format( |
||||||
|
humanize_size(int(ri_res["i2p.router.net.total.received.bytes"])), |
||||||
|
humanize_size(int(ri_res["i2p.router.net.bw.inbound.1s"])), |
||||||
|
humanize_size(int(ri_res["i2p.router.net.total.sent.bytes"])), |
||||||
|
humanize_size(int(ri_res["i2p.router.net.bw.outbound.1s"])))) |
||||||
|
print("Known routers: {} / Active: {}".format( |
||||||
|
ri_res["i2p.router.netdb.knownpeers"], |
||||||
|
ri_res["i2p.router.netdb.activepeers"])) |
||||||
|
|
||||||
|
if csi_res: |
||||||
|
fancy_title("Interfaces") |
||||||
|
for n in ["HTTPProxy", "SOCKS", "BOB", "SAM", "I2CP"]: |
||||||
|
print("- {}:".format(n), "ON" if csi_res[n]["enabled"] == "true" else "OFF") |
||||||
|
|
||||||
|
if csi_res["I2PTunnel"]["client"]: |
||||||
|
fancy_title("Client I2P Tunnels") |
||||||
|
for tunnel in sorted(list(csi_res["I2PTunnel"]["client"].keys())): |
||||||
|
print("-", tunnel, csi_res["I2PTunnel"]["client"][tunnel]) |
||||||
|
|
||||||
|
if csi_res["I2PTunnel"]["server"]: |
||||||
|
fancy_title("Server I2P Tunnels") |
||||||
|
for tunnel in sorted(list(csi_res["I2PTunnel"]["server"].keys())): |
||||||
|
t = csi_res["I2PTunnel"]["server"][tunnel] |
||||||
|
if "port" in t: |
||||||
|
print("-", tunnel, "{}:{}".format(t["address"], t["port"])) |
||||||
|
else: |
||||||
|
print("-", tunnel, t["address"]) |
||||||
|
|
||||||
|
|
||||||
|
def main(): |
||||||
|
URL = os.getenv("I2PCONTROL_URL", "https://127.0.0.1:7650/") |
||||||
|
PASSWORD = os.getenv("I2PCONTROL_PASSWORD", "itoopie") |
||||||
|
|
||||||
|
ctl = I2pdctl(I2PControl(URL, PASSWORD)) |
||||||
|
|
||||||
|
parser = argparse.ArgumentParser() |
||||||
|
|
||||||
|
subparsers = parser.add_subparsers(title="actions",help="Command to execute") |
||||||
|
|
||||||
|
exec_parser = subparsers.add_parser("exec", description="Execute RPC method with parameters, return results or raw JSON") |
||||||
|
exec_parser.add_argument('method', help="RPC method name") |
||||||
|
exec_parser.add_argument('parameters', help="Parameters as raw JSON string") |
||||||
|
exec_parser.add_argument('-j', '--json-output', action="store_true", help="Output raw JSON reply") |
||||||
|
exec_parser.set_defaults(func=ctl.execute) |
||||||
|
|
||||||
|
raw_info_parser = subparsers.add_parser("raw_info", description="Retrieve JSON info from specified *Info methods") |
||||||
|
raw_info_parser.add_argument('method', nargs='*', |
||||||
|
help="RPC method(s) to retreive info. Supported methods: {}".format(", ".join(INFO_METHODS.keys()))) |
||||||
|
raw_info_parser.set_defaults(func=ctl.raw_info) |
||||||
|
|
||||||
|
print_info_parser = subparsers.add_parser("info", description="Print generic information about node in a human readable form") |
||||||
|
print_info_parser.set_defaults(func=ctl.print_info) |
||||||
|
|
||||||
|
args = parser.parse_args() |
||||||
|
|
||||||
|
if hasattr(args, "func"): |
||||||
|
try: |
||||||
|
args.func(args) |
||||||
|
except requests.exceptions.ConnectionError: |
||||||
|
print("Error: I2PControl URL is unavailable. Check your i2pd settings and network connection.") |
||||||
|
exit(1) |
||||||
|
else: |
||||||
|
parser.print_help() |
||||||
|
|
||||||
|
if __name__ == "__main__": |
||||||
|
main() |
Loading…
Reference in new issue