diff --git a/scripts/i2pdctl b/scripts/i2pdctl new file mode 100755 index 0000000..7d926ae --- /dev/null +++ b/scripts/i2pdctl @@ -0,0 +1,203 @@ +#!/usr/bin/env python3 +import os +import json +import logging +import pprint +import argparse + +try: + import requests +except ImportError: + print("Error: requests module is required. apt install python3-requests") + exit(1) + +# Disabling annoying warnings +requests.packages.urllib3.disable_warnings( + requests.packages.urllib3.exceptions.InsecureRequestWarning) + +INFO_METHODS = { + "RouterInfo": { + "i2p.router.uptime": "", + "i2p.router.net.status": "", + "i2p.router.netdb.knownpeers": "", + "i2p.router.netdb.activepeers": "", + "i2p.router.net.bw.inbound.1s": "", + "i2p.router.net.bw.outbound.1s": "", + "i2p.router.net.tunnels.participating": "", + "i2p.router.net.tunnels.successrate": "", + "i2p.router.net.total.received.bytes": "", + "i2p.router.net.total.sent.bytes": "", + }, + "ClientServicesInfo": { + "I2PTunnel": "", + "SOCKS": "", + "HTTPProxy": "", + "SAM": "", + "BOB": "", + "I2CP": "", + } +} + +STATUS = [ + "OK", + "TESTING", + "FIREWALLED", + "HIDDEN", + "WARN_FIREWALLED_AND_FAST", + "WARN_FIREWALLED_AND_FLOODFILL", + "WARN_FIREWALLED_WITH_INBOUND_TCP", + "WARN_FIREWALLED_WITH_UDP_DISABLED", + "ERROR_I2CP", + "ERROR_CLOCK_SKEW", + "ERROR_PRIVATE_TCP_ADDRESS", + "ERROR_SYMMETRIC_NAT", + "ERROR_UDP_PORT_IN_USE", + "ERROR_NO_ACTIVE_PEERS_CHECK_CONNECTION_AND_FIREWALL", + "ERROR_UDP_DISABLED_AND_TCP_UNSET", +] + +class I2PControl(object): + """Talk to I2PControl API""" + + def __init__(self, url, password='itoopie'): + self.url = url + self.password = password + self._token = None + + @property + def token(self): + """Cached authentication token""" + if not self._token: + self._token = requests.post(self.url, + json.dumps({'id': 1, 'method': 'Authenticate', + 'params': {'API': 1, 'Password': self.password}, + 'jsonrpc': '2.0'}), + verify=False).json()["result"]["Token"] + return self._token + + def request(self, method, params): + """Execute authenticated request""" + return requests.post(self.url, + json.dumps({'id': 1, 'method': method, 'params': params, + 'jsonrpc': '2.0', 'Token': self.token}), + verify=False + ) + +class I2pdctl(object): + """i2pd control""" + + def __init__(self, ctl): + self.ctl = ctl + + def execute(self, args): + """Execute raw method""" + try: + arguments = json.loads(args.arguments) + except json.decoder.JSONDecodeError: + print("Error: arguments should be a valid JSON object") + return + + resp = self.ctl.request(args.method, arguments) + + if args.json_output: + print(resp.text) + elif "result" in resp: + pprint.pprint(resp.json()["result"]) + else: + pprint.pprint(resp.json()) + + def raw_info(self, args): + """Retrieve raw JSON reply from method(s)""" + res = {} + for m in args.method: + if m not in INFO_METHODS: + print("Invalid method {}. Supported methods are {}".format(m, + ", ".join(INFO_METHODS.keys()))) + return + else: + res[m] = self.ctl.request(m, INFO_METHODS[m]).json()['result'] + + print(json.dumps(res)) + + + def print_info(self, args): + """Print information about a node in a human readable form""" + def fancy_title(string): + print("\n### {}".format(string)) + + ri_res = self.ctl.request("RouterInfo", INFO_METHODS["RouterInfo"]).json()['result'] + try: + csi_res = self.ctl.request("ClientServicesInfo", INFO_METHODS["ClientServicesInfo"]).json()['result'] + except KeyError: + csi_res = False + + fancy_title("Router info") + print("Uptime: {}".format(ri_res["i2p.router.uptime"])) + print("Status: {}".format(STATUS[ri_res["i2p.router.net.status"]])) + print("Tunnel creation success rate: {}%".format( + ri_res["i2p.router.net.tunnels.successrate"])) + print("Received: {} ({} B/s) / Sent: {} ({} B/s)".format( + ri_res["i2p.router.net.total.received.bytes"], + ri_res["i2p.router.net.bw.inbound.1s"], + ri_res["i2p.router.net.total.sent.bytes"], + ri_res["i2p.router.net.bw.outbound.1s"])) + print("Known routers: {} / Active: {}".format( + ri_res["i2p.router.netdb.knownpeers"], + ri_res["i2p.router.netdb.activepeers"])) + + if csi_res: + fancy_title("Interfaces") + for n in ["HTTPProxy", "SOCKS", "BOB", "SAM", "I2CP"]: + print("- {}:".format(n), "ON" if csi_res[n]["enabled"] == "true" else "OFF") + + if csi_res["I2PTunnel"]["client"]: + fancy_title("Client I2P Tunnels") + for tunnel in sorted(list(csi_res["I2PTunnel"]["client"].keys())): + print("-", tunnel, csi_res["I2PTunnel"]["client"][tunnel]) + + if csi_res["I2PTunnel"]["server"]: + fancy_title("Server I2P Tunnels") + for tunnel in sorted(list(csi_res["I2PTunnel"]["server"].keys())): + t = csi_res["I2PTunnel"]["server"][tunnel] + if "port" in t: + print("-", tunnel, "{}:{}".format(t["address"], t["port"])) + else: + print("-", tunnel, t["address"]) + + +def main(): + URL = os.getenv("I2PCONTROL_URL", "https://127.0.0.1:7650/") + + ctl = I2pdctl(I2PControl(URL)) + + parser = argparse.ArgumentParser() + + subparsers = parser.add_subparsers(title="actions",help="Command to execute") + + exec_parser = subparsers.add_parser("exec", description="Execute RPC method with parameters, return results or raw JSON") + exec_parser.add_argument('method', help="RPC method name") + exec_parser.add_argument('parameters', help="Parameters as raw JSON string") + exec_parser.add_argument('-j', '--json-output', action="store_true", help="Output raw JSON reply") + exec_parser.set_defaults(func=ctl.execute) + + raw_info_parser = subparsers.add_parser("raw_info", description="Retrieve JSON info from specified *Info methods") + raw_info_parser.add_argument('method', nargs='*', + help="RPC method(s) to retreive info. Supported methods: {}".format(", ".join(INFO_METHODS.keys()))) + raw_info_parser.set_defaults(func=ctl.raw_info) + + print_info_parser = subparsers.add_parser("info", description="Print generic information about node in a human readable form") + print_info_parser.set_defaults(func=ctl.print_info) + + args = parser.parse_args() + + if hasattr(args, "func"): + try: + args.func(args) + except requests.exceptions.ConnectionError: + print("Error: I2PControl URL is unavailable. Check your i2pd settings and network connection.") + exit(1) + else: + parser.print_help() + +if __name__ == "__main__": + main()