#!/usr/bin/env python3 import os import json import pprint import argparse import datetime import ssl import urllib.request import urllib.parse import urllib.error INFO_METHODS = { "RouterInfo": { "i2p.router.uptime": "", "i2p.router.net.status": "", "i2p.router.netdb.knownpeers": "", "i2p.router.netdb.activepeers": "", "i2p.router.net.bw.inbound.1s": "", "i2p.router.net.bw.outbound.1s": "", "i2p.router.net.tunnels.participating": "", "i2p.router.net.tunnels.successrate": "", "i2p.router.net.total.received.bytes": "", "i2p.router.net.total.sent.bytes": "", }, "ClientServicesInfo": { "I2PTunnel": "", "SOCKS": "", "HTTPProxy": "", "SAM": "", "BOB": "", "I2CP": "", } } STATUS = [ "OK", "TESTING", "FIREWALLED", "HIDDEN", "WARN_FIREWALLED_AND_FAST", "WARN_FIREWALLED_AND_FLOODFILL", "WARN_FIREWALLED_WITH_INBOUND_TCP", "WARN_FIREWALLED_WITH_UDP_DISABLED", "ERROR_I2CP", "ERROR_CLOCK_SKEW", "ERROR_PRIVATE_TCP_ADDRESS", "ERROR_SYMMETRIC_NAT", "ERROR_UDP_PORT_IN_USE", "ERROR_NO_ACTIVE_PEERS_CHECK_CONNECTION_AND_FIREWALL", "ERROR_UDP_DISABLED_AND_TCP_UNSET", ] SUFFIXES = ['B','KB','MB','GB','TB'] def humanize_size(size, precision=2): """Bytes to human readable size""" suffixIndex = 0 while size > 1024: suffixIndex += 1 size = size / 1024.0 return "{}{}".format(round(size ,precision), SUFFIXES[suffixIndex]) def humanize_time(milliseconds): """Seconds to human readable time""" return str(datetime.timedelta(milliseconds=milliseconds)) def do_post(url, data): req = urllib.request.Request(url, data=data.encode()) with urllib.request.urlopen(req, context=ssl._create_unverified_context()) as f: resp = f.read().decode('utf-8') return json.loads(resp) class I2PControl(object): """Talk to I2PControl API""" def __init__(self, url, password='itoopie'): self.url = url self.password = password self._token = None @property def token(self): """Cached authentication token""" if not self._token: self._token = do_post(self.url, json.dumps({'id': 1, 'method': 'Authenticate', 'params': {'API': 1, 'Password': self.password}, 'jsonrpc': '2.0'}))["result"]["Token"] return self._token def request(self, method, params): """Execute authenticated request""" return do_post(self.url, json.dumps({'id': 1, 'method': method, 'params': params, 'jsonrpc': '2.0', 'Token': self.token})) class I2pdctl(object): """i2pd control""" def __init__(self, ctl): self.ctl = ctl def execute(self, args): """Execute raw method""" try: parameters = json.loads(args.parameters) except json.decoder.JSONDecodeError: print("Error: parameters should be a valid JSON object") return resp = self.ctl.request(args.method, parameters) if args.json_output: print(resp.text) elif "result" in resp: pprint.pprint(resp["result"]) else: pprint.pprint(resp) def raw_info(self, args): """Retrieve raw JSON reply from method(s)""" res = {} for m in args.method: if m not in INFO_METHODS: print("Invalid method {}. Supported methods are {}".format(m, ", ".join(INFO_METHODS.keys()))) return else: res[m] = self.ctl.request(m, INFO_METHODS[m])['result'] print(json.dumps(res)) def print_info(self, args): """Print information about a node in a human readable form""" def fancy_title(string): print("\n### {}".format(string)) ri_res = self.ctl.request("RouterInfo", INFO_METHODS["RouterInfo"])['result'] try: csi_res = self.ctl.request("ClientServicesInfo", INFO_METHODS["ClientServicesInfo"])['result'] except KeyError: csi_res = False fancy_title("Router info") print("Uptime: {}".format( humanize_time(int(ri_res["i2p.router.uptime"])))) print("Status: {}".format(STATUS[ri_res["i2p.router.net.status"]])) print("Tunnel creation success rate: {}%".format( ri_res["i2p.router.net.tunnels.successrate"])) print("Received: {} ({} B/s) / Sent: {} ({} B/s)".format( humanize_size(int(ri_res["i2p.router.net.total.received.bytes"])), humanize_size(int(ri_res["i2p.router.net.bw.inbound.1s"])), humanize_size(int(ri_res["i2p.router.net.total.sent.bytes"])), humanize_size(int(ri_res["i2p.router.net.bw.outbound.1s"])))) print("Known routers: {} / Active: {}".format( ri_res["i2p.router.netdb.knownpeers"], ri_res["i2p.router.netdb.activepeers"])) if csi_res: fancy_title("Interfaces") for n in ["HTTPProxy", "SOCKS", "BOB", "SAM", "I2CP"]: print("- {}:".format(n), "ON" if csi_res[n]["enabled"] == "true" else "OFF") if csi_res["I2PTunnel"]["client"]: fancy_title("Client I2P Tunnels") for tunnel in sorted(list(csi_res["I2PTunnel"]["client"].keys())): print("-", tunnel, csi_res["I2PTunnel"]["client"][tunnel]) if csi_res["I2PTunnel"]["server"]: fancy_title("Server I2P Tunnels") for tunnel in sorted(list(csi_res["I2PTunnel"]["server"].keys())): t = csi_res["I2PTunnel"]["server"][tunnel] if "port" in t: print("-", tunnel, "{}:{}".format(t["address"], t["port"])) else: print("-", tunnel, t["address"]) def main(): URL = os.getenv("I2PCONTROL_URL", "https://127.0.0.1:7650/") PASSWORD = os.getenv("I2PCONTROL_PASSWORD", "itoopie") ctl = I2pdctl(I2PControl(URL, PASSWORD)) parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(title="actions",help="Command to execute") exec_parser = subparsers.add_parser("exec", description="Execute RPC method with parameters, return results or raw JSON") exec_parser.add_argument('method', help="RPC method name") exec_parser.add_argument('parameters', help="Parameters as raw JSON string") exec_parser.add_argument('-j', '--json-output', action="store_true", help="Output raw JSON reply") exec_parser.set_defaults(func=ctl.execute) raw_info_parser = subparsers.add_parser("raw_info", description="Retrieve JSON info from specified *Info methods") raw_info_parser.add_argument('method', nargs='*', help="RPC method(s) to retreive info. Supported methods: {}".format(", ".join(INFO_METHODS.keys()))) raw_info_parser.set_defaults(func=ctl.raw_info) print_info_parser = subparsers.add_parser("info", description="Print generic information about node in a human readable form") print_info_parser.set_defaults(func=ctl.print_info) args = parser.parse_args() if hasattr(args, "func"): try: args.func(args) except urllib.error.URLError: print("Error: I2PControl URL is unavailable. Check your i2pd settings and network connection.") exit(1) else: parser.print_help() if __name__ == "__main__": main()