#!/usr/bin/env python3 import os import json import logging import pprint import argparse try: import requests except ImportError: print("Error: requests module is required. apt install python3-requests") exit(1) # Disabling annoying warnings requests.packages.urllib3.disable_warnings( requests.packages.urllib3.exceptions.InsecureRequestWarning) INFO_METHODS = { "RouterInfo": { "i2p.router.uptime": "", "i2p.router.net.status": "", "i2p.router.netdb.knownpeers": "", "i2p.router.netdb.activepeers": "", "i2p.router.net.bw.inbound.1s": "", "i2p.router.net.bw.outbound.1s": "", "i2p.router.net.tunnels.participating": "", "i2p.router.net.tunnels.successrate": "", "i2p.router.net.total.received.bytes": "", "i2p.router.net.total.sent.bytes": "", }, "ClientServicesInfo": { "I2PTunnel": "", "SOCKS": "", "HTTPProxy": "", "SAM": "", "BOB": "", "I2CP": "", } } STATUS = [ "OK", "TESTING", "FIREWALLED", "HIDDEN", "WARN_FIREWALLED_AND_FAST", "WARN_FIREWALLED_AND_FLOODFILL", "WARN_FIREWALLED_WITH_INBOUND_TCP", "WARN_FIREWALLED_WITH_UDP_DISABLED", "ERROR_I2CP", "ERROR_CLOCK_SKEW", "ERROR_PRIVATE_TCP_ADDRESS", "ERROR_SYMMETRIC_NAT", "ERROR_UDP_PORT_IN_USE", "ERROR_NO_ACTIVE_PEERS_CHECK_CONNECTION_AND_FIREWALL", "ERROR_UDP_DISABLED_AND_TCP_UNSET", ] class I2PControl(object): """Talk to I2PControl API""" def __init__(self, url, password='itoopie'): self.url = url self.password = password self._token = None @property def token(self): """Cached authentication token""" if not self._token: self._token = requests.post(self.url, json.dumps({'id': 1, 'method': 'Authenticate', 'params': {'API': 1, 'Password': self.password}, 'jsonrpc': '2.0'}), verify=False).json()["result"]["Token"] return self._token def request(self, method, params): """Execute authenticated request""" return requests.post(self.url, json.dumps({'id': 1, 'method': method, 'params': params, 'jsonrpc': '2.0', 'Token': self.token}), verify=False ) class I2pdctl(object): """i2pd control""" def __init__(self, ctl): self.ctl = ctl def execute(self, args): """Execute raw method""" try: arguments = json.loads(args.arguments) except json.decoder.JSONDecodeError: print("Error: arguments should be a valid JSON object") return resp = self.ctl.request(args.method, arguments) if args.json_output: print(resp.text) elif "result" in resp: pprint.pprint(resp.json()["result"]) else: pprint.pprint(resp.json()) def raw_info(self, args): """Retrieve raw JSON reply from method(s)""" res = {} for m in args.method: if m not in INFO_METHODS: print("Invalid method {}. Supported methods are {}".format(m, ", ".join(INFO_METHODS.keys()))) return else: res[m] = self.ctl.request(m, INFO_METHODS[m]).json()['result'] print(json.dumps(res)) def print_info(self, args): """Print information about a node in a human readable form""" def fancy_title(string): print("\n### {}".format(string)) ri_res = self.ctl.request("RouterInfo", INFO_METHODS["RouterInfo"]).json()['result'] try: csi_res = self.ctl.request("ClientServicesInfo", INFO_METHODS["ClientServicesInfo"]).json()['result'] except KeyError: csi_res = False fancy_title("Router info") print("Uptime: {}".format(ri_res["i2p.router.uptime"])) print("Status: {}".format(STATUS[ri_res["i2p.router.net.status"]])) print("Tunnel creation success rate: {}%".format( ri_res["i2p.router.net.tunnels.successrate"])) print("Received: {} ({} B/s) / Sent: {} ({} B/s)".format( ri_res["i2p.router.net.total.received.bytes"], ri_res["i2p.router.net.bw.inbound.1s"], ri_res["i2p.router.net.total.sent.bytes"], ri_res["i2p.router.net.bw.outbound.1s"])) print("Known routers: {} / Active: {}".format( ri_res["i2p.router.netdb.knownpeers"], ri_res["i2p.router.netdb.activepeers"])) if csi_res: fancy_title("Interfaces") for n in ["HTTPProxy", "SOCKS", "BOB", "SAM", "I2CP"]: print("- {}:".format(n), "ON" if csi_res[n]["enabled"] == "true" else "OFF") if csi_res["I2PTunnel"]["client"]: fancy_title("Client I2P Tunnels") for tunnel in sorted(list(csi_res["I2PTunnel"]["client"].keys())): print("-", tunnel, csi_res["I2PTunnel"]["client"][tunnel]) if csi_res["I2PTunnel"]["server"]: fancy_title("Server I2P Tunnels") for tunnel in sorted(list(csi_res["I2PTunnel"]["server"].keys())): t = csi_res["I2PTunnel"]["server"][tunnel] if "port" in t: print("-", tunnel, "{}:{}".format(t["address"], t["port"])) else: print("-", tunnel, t["address"]) def main(): URL = os.getenv("I2PCONTROL_URL", "https://127.0.0.1:7650/") ctl = I2pdctl(I2PControl(URL)) parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(title="actions",help="Command to execute") exec_parser = subparsers.add_parser("exec", description="Execute RPC method with parameters, return results or raw JSON") exec_parser.add_argument('method', help="RPC method name") exec_parser.add_argument('parameters', help="Parameters as raw JSON string") exec_parser.add_argument('-j', '--json-output', action="store_true", help="Output raw JSON reply") exec_parser.set_defaults(func=ctl.execute) raw_info_parser = subparsers.add_parser("raw_info", description="Retrieve JSON info from specified *Info methods") raw_info_parser.add_argument('method', nargs='*', help="RPC method(s) to retreive info. Supported methods: {}".format(", ".join(INFO_METHODS.keys()))) raw_info_parser.set_defaults(func=ctl.raw_info) print_info_parser = subparsers.add_parser("info", description="Print generic information about node in a human readable form") print_info_parser.set_defaults(func=ctl.print_info) args = parser.parse_args() if hasattr(args, "func"): try: args.func(args) except requests.exceptions.ConnectionError: print("Error: I2PControl URL is unavailable. Check your i2pd settings and network connection.") exit(1) else: parser.print_help() if __name__ == "__main__": main()