Browse Source

Merge pull request #42 from PurpleI2P/updates

Add i2pdctl tool
pull/44/head
l-n-s 7 years ago committed by GitHub
parent
commit
14964bd384
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 204
      scripts/i2pdctl

204
scripts/i2pdctl

@ -0,0 +1,204 @@ @@ -0,0 +1,204 @@
#!/usr/bin/env python3
import os
import json
import logging
import pprint
import argparse
try:
import requests
except ImportError:
print("Error: requests module is required. apt install python3-requests")
exit(1)
# Disabling annoying warnings
requests.packages.urllib3.disable_warnings(
requests.packages.urllib3.exceptions.InsecureRequestWarning)
INFO_METHODS = {
"RouterInfo": {
"i2p.router.uptime": "",
"i2p.router.net.status": "",
"i2p.router.netdb.knownpeers": "",
"i2p.router.netdb.activepeers": "",
"i2p.router.net.bw.inbound.1s": "",
"i2p.router.net.bw.outbound.1s": "",
"i2p.router.net.tunnels.participating": "",
"i2p.router.net.tunnels.successrate": "",
"i2p.router.net.total.received.bytes": "",
"i2p.router.net.total.sent.bytes": "",
},
"ClientServicesInfo": {
"I2PTunnel": "",
"SOCKS": "",
"HTTPProxy": "",
"SAM": "",
"BOB": "",
"I2CP": "",
}
}
STATUS = [
"OK",
"TESTING",
"FIREWALLED",
"HIDDEN",
"WARN_FIREWALLED_AND_FAST",
"WARN_FIREWALLED_AND_FLOODFILL",
"WARN_FIREWALLED_WITH_INBOUND_TCP",
"WARN_FIREWALLED_WITH_UDP_DISABLED",
"ERROR_I2CP",
"ERROR_CLOCK_SKEW",
"ERROR_PRIVATE_TCP_ADDRESS",
"ERROR_SYMMETRIC_NAT",
"ERROR_UDP_PORT_IN_USE",
"ERROR_NO_ACTIVE_PEERS_CHECK_CONNECTION_AND_FIREWALL",
"ERROR_UDP_DISABLED_AND_TCP_UNSET",
]
class I2PControl(object):
"""Talk to I2PControl API"""
def __init__(self, url, password='itoopie'):
self.url = url
self.password = password
self._token = None
@property
def token(self):
"""Cached authentication token"""
if not self._token:
self._token = requests.post(self.url,
json.dumps({'id': 1, 'method': 'Authenticate',
'params': {'API': 1, 'Password': self.password},
'jsonrpc': '2.0'}),
verify=False).json()["result"]["Token"]
return self._token
def request(self, method, params):
"""Execute authenticated request"""
return requests.post(self.url,
json.dumps({'id': 1, 'method': method, 'params': params,
'jsonrpc': '2.0', 'Token': self.token}),
verify=False
)
class I2pdctl(object):
"""i2pd control"""
def __init__(self, ctl):
self.ctl = ctl
def execute(self, args):
"""Execute raw method"""
try:
arguments = json.loads(args.arguments)
except json.decoder.JSONDecodeError:
print("Error: arguments should be a valid JSON object")
return
resp = self.ctl.request(args.method, arguments)
if args.json_output:
print(resp.text)
elif "result" in resp:
pprint.pprint(resp.json()["result"])
else:
pprint.pprint(resp.json())
def raw_info(self, args):
"""Retrieve raw JSON reply from method(s)"""
res = {}
for m in args.method:
if m not in INFO_METHODS:
print("Invalid method {}. Supported methods are {}".format(m,
", ".join(INFO_METHODS.keys())))
return
else:
res[m] = self.ctl.request(m, INFO_METHODS[m]).json()['result']
print(json.dumps(res))
def print_info(self, args):
"""Print information about a node in a human readable form"""
def fancy_title(string):
print("\n### {}".format(string))
ri_res = self.ctl.request("RouterInfo", INFO_METHODS["RouterInfo"]).json()['result']
try:
csi_res = self.ctl.request("ClientServicesInfo", INFO_METHODS["ClientServicesInfo"]).json()['result']
except KeyError:
csi_res = False
fancy_title("Router info")
print("Uptime: {}".format(ri_res["i2p.router.uptime"]))
print("Status: {}".format(STATUS[ri_res["i2p.router.net.status"]]))
print("Tunnel creation success rate: {}%".format(
ri_res["i2p.router.net.tunnels.successrate"]))
print("Received: {} ({} B/s) / Sent: {} ({} B/s)".format(
ri_res["i2p.router.net.total.received.bytes"],
ri_res["i2p.router.net.bw.inbound.1s"],
ri_res["i2p.router.net.total.sent.bytes"],
ri_res["i2p.router.net.bw.outbound.1s"]))
print("Known routers: {} / Active: {}".format(
ri_res["i2p.router.netdb.knownpeers"],
ri_res["i2p.router.netdb.activepeers"]))
if csi_res:
fancy_title("Interfaces")
for n in ["HTTPProxy", "SOCKS", "BOB", "SAM", "I2CP"]:
print("- {}:".format(n), "ON" if csi_res[n]["enabled"] == "true" else "OFF")
if csi_res["I2PTunnel"]["client"]:
fancy_title("Client I2P Tunnels")
for tunnel in sorted(list(csi_res["I2PTunnel"]["client"].keys())):
print("-", tunnel, csi_res["I2PTunnel"]["client"][tunnel])
if csi_res["I2PTunnel"]["server"]:
fancy_title("Server I2P Tunnels")
for tunnel in sorted(list(csi_res["I2PTunnel"]["server"].keys())):
t = csi_res["I2PTunnel"]["server"][tunnel]
if "port" in t:
print("-", tunnel, "{}:{}".format(t["address"], t["port"]))
else:
print("-", tunnel, t["address"])
def main():
URL = os.getenv("I2PCONTROL_URL", "https://127.0.0.1:7650/")
PASSWORD = os.getenv("I2PCONTROL_PASSWORD", "itoopie")
ctl = I2pdctl(I2PControl(URL, PASSWORD))
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(title="actions",help="Command to execute")
exec_parser = subparsers.add_parser("exec", description="Execute RPC method with parameters, return results or raw JSON")
exec_parser.add_argument('method', help="RPC method name")
exec_parser.add_argument('parameters', help="Parameters as raw JSON string")
exec_parser.add_argument('-j', '--json-output', action="store_true", help="Output raw JSON reply")
exec_parser.set_defaults(func=ctl.execute)
raw_info_parser = subparsers.add_parser("raw_info", description="Retrieve JSON info from specified *Info methods")
raw_info_parser.add_argument('method', nargs='*',
help="RPC method(s) to retreive info. Supported methods: {}".format(", ".join(INFO_METHODS.keys())))
raw_info_parser.set_defaults(func=ctl.raw_info)
print_info_parser = subparsers.add_parser("info", description="Print generic information about node in a human readable form")
print_info_parser.set_defaults(func=ctl.print_info)
args = parser.parse_args()
if hasattr(args, "func"):
try:
args.func(args)
except requests.exceptions.ConnectionError:
print("Error: I2PControl URL is unavailable. Check your i2pd settings and network connection.")
exit(1)
else:
parser.print_help()
if __name__ == "__main__":
main()
Loading…
Cancel
Save