#!/usr/bin/env python3 # Copyright (c) 2014-2016 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ rpc-tests.py - run regression test suite This module calls down into individual test cases via subprocess. It will forward all unrecognized arguments onto the individual test scripts. For a description of arguments recognized by test scripts, see `qa/pull-tester/test_framework/test_framework.py:BitcoinTestFramework.main`. """ import argparse import configparser import os import time import shutil import sys import subprocess import tempfile import re # Parse arguments and pass through unrecognised args parser = argparse.ArgumentParser(add_help=False, usage='%(prog)s [rpc-test.py options] [script options] [scripts]', description=__doc__, epilog=''' Help text and arguments for individual test script:''', formatter_class=argparse.RawTextHelpFormatter) parser.add_argument('--coverage', action='store_true', help='generate a basic coverage report for the RPC interface') parser.add_argument('--extended', action='store_true', help='run the extended test suite in addition to the basic tests') parser.add_argument('--help', '-h', '-?', action='store_true', help='print help text and exit') parser.add_argument('--jobs', '-j', type=int, default=4, help='how many test scripts to run in parallel. Default=4.') parser.add_argument('--nozmq', action='store_true', help='do not run the zmq tests') parser.add_argument('--win', action='store_true', help='signal that this is running in a Windows environment and that we should run the tests') (args, unknown_args) = parser.parse_known_args() #Create a set to store arguments and create the passon string tests = set(arg for arg in unknown_args if arg[:2] != "--") passon_args = [arg for arg in unknown_args if arg[:2] == "--"] BOLD = ("","") if os.name == 'posix': # primitive formatting on supported # terminal via ANSI escape sequences: BOLD = ('\033[0m', '\033[1m') # Read config generated by configure. config = configparser.ConfigParser() config.read_file(open(os.path.dirname(__file__) + "/tests_config.ini")) ENABLE_WALLET = config["components"]["ENABLE_WALLET"] == "True" ENABLE_UTILS = config["components"]["ENABLE_UTILS"] == "True" ENABLE_BITCOIND = config["components"]["ENABLE_BITCOIND"] == "True" ENABLE_ZMQ = config["components"]["ENABLE_ZMQ"] == "True" and not args.nozmq RPC_TESTS_DIR = config["environment"]["SRCDIR"] + '/qa/rpc-tests/' print_help = args.help jobs = args.jobs #Set env vars if "BITCOIND" not in os.environ: os.environ["BITCOIND"] = config["environment"]["BUILDDIR"] + '/src/bitcoind' + config["environment"]["EXEEXT"] if config["environment"]["EXEEXT"] == ".exe" and not args.win: # https://github.com/bitcoin/bitcoin/commit/d52802551752140cf41f0d9a225a43e84404d3e9 # https://github.com/bitcoin/bitcoin/pull/5677#issuecomment-136646964 print("Win tests currently disabled by default. Use --win option to enable") sys.exit(0) if not (ENABLE_WALLET and ENABLE_UTILS and ENABLE_BITCOIND): print("No rpc tests to run. Wallet, utils, and bitcoind must all be enabled") sys.exit(0) # python3-zmq may not be installed. Handle this gracefully and with some helpful info if ENABLE_ZMQ: try: import zmq except ImportError: print("ERROR: \"import zmq\" failed. Use -nozmq to run without the ZMQ tests." "To run zmq tests, see dependency info in /qa/README.md.") raise BASE_SCRIPTS= [ # longest test should go first, to favor running tests in parallel 'wallet-hd.py', 'walletbackup.py', # vv Tests less than 5m vv 'p2p-fullblocktest.py', 'fundrawtransaction.py', 'p2p-compactblocks.py', 'segwit.py', # vv Tests less than 2m vv 'wallet.py', 'wallet-accounts.py', 'p2p-segwit.py', 'wallet-dump.py', 'listtransactions.py', # vv Tests less than 60s vv 'sendheaders.py', 'zapwallettxes.py', 'importmulti.py', 'mempool_limit.py', 'merkle_blocks.py', 'receivedby.py', 'abandonconflict.py', 'bip68-112-113-p2p.py', 'rawtransactions.py', 'reindex.py', # vv Tests less than 30s vv 'mempool_resurrect_test.py', 'txn_doublespend.py --mineblock', 'txn_clone.py', 'getchaintips.py', 'rest.py', 'mempool_spendcoinbase.py', 'mempool_reorg.py', 'httpbasics.py', 'multi_rpc.py', 'proxy_test.py', 'signrawtransactions.py', 'nodehandling.py', 'decodescript.py', 'blockchain.py', 'disablewallet.py', 'keypool.py', 'p2p-mempool.py', 'prioritise_transaction.py', 'invalidblockrequest.py', 'invalidtxrequest.py', 'p2p-versionbits-warning.py', 'preciousblock.py', 'importprunedfunds.py', 'signmessages.py', 'nulldummy.py', 'import-rescan.py', 'bumpfee.py', 'rpcnamedargs.py', 'listsinceblock.py', ] ZMQ_SCRIPTS = ["zmq_test.py"] EXTENDED_SCRIPTS = [ 'pruning.py', # vv Tests less than 20m vv 'smartfees.py', # vv Tests less than 5m vv 'maxuploadtarget.py', 'mempool_packages.py', # vv Tests less than 2m vv 'bip68-sequence.py', 'getblocktemplate_longpoll.py', # vv Tests less than 60s vv 'bip9-softforks.py', 'p2p-feefilter.py', 'rpcbind_test.py', # vv Tests less than 30s vv 'bip65-cltv.py', 'bip65-cltv-p2p.py', 'bipdersig-p2p.py', 'bipdersig.py', 'getblocktemplate_proposals.py', 'txn_doublespend.py', 'txn_clone.py --mineblock', 'forknotify.py', 'invalidateblock.py', 'maxblocksinflight.py', 'p2p-acceptblock.py', 'replace-by-fee.py', ] ALL_SCRIPTS = BASE_SCRIPTS + ZMQ_SCRIPTS + EXTENDED_SCRIPTS def runtests(): # Build list of tests if len(tests) != 0: # Individual tests have been specified. Run specified tests that exist # in the ALL_SCRIPTS list. Accept the name with or without .py extension. test_list = [t for t in ALL_SCRIPTS if (t in tests or re.sub(".py$", "", t) in tests)] if len(test_list) == 0: print("No valid test scripts specified. Check that your test is in one " "of the test lists in rpc-tests.py or run rpc-tests.py with no arguments to run all tests") sys.exit(0) else: # No individual tests have been specified. Run base tests, and # optionally ZMQ tests and extended tests. test_list = BASE_SCRIPTS if ENABLE_ZMQ: test_list += ZMQ_SCRIPTS if args.extended: test_list += EXTENDED_SCRIPTS # TODO: BASE_SCRIPTS and EXTENDED_SCRIPTS are sorted by runtime # (for parallel running efficiency). This combined list will is no # longer sorted. if args.help: # Print help for rpc-tests.py, then print help of the first script and exit. parser.print_help() subprocess.check_call((RPC_TESTS_DIR + test_list[0]).split() + ['-h']) sys.exit(0) coverage = None if args.coverage: coverage = RPCCoverage() print("Initializing coverage directory at %s\n" % coverage.dir) flags = ["--srcdir=%s/src" % config["environment"]["BUILDDIR"]] + passon_args flags.append("--cachedir=%s/qa/cache" % config["environment"]["BUILDDIR"]) if coverage: flags.append(coverage.flag) if len(test_list) > 1 and jobs > 1: # Populate cache subprocess.check_output([RPC_TESTS_DIR + 'create_cache.py'] + flags) #Run Tests max_len_name = len(max(test_list, key=len)) time_sum = 0 time0 = time.time() job_queue = RPCTestHandler(jobs, test_list, flags) results = BOLD[1] + "%s | %s | %s\n\n" % ("TEST".ljust(max_len_name), "PASSED", "DURATION") + BOLD[0] all_passed = True for _ in range(len(test_list)): (name, stdout, stderr, passed, duration) = job_queue.get_next() all_passed = all_passed and passed time_sum += duration print('\n' + BOLD[1] + name + BOLD[0] + ":") print('' if passed else stdout + '\n', end='') print('' if stderr == '' else 'stderr:\n' + stderr + '\n', end='') results += "%s | %s | %s s\n" % (name.ljust(max_len_name), str(passed).ljust(6), duration) print("Pass: %s%s%s, Duration: %s s\n" % (BOLD[1], passed, BOLD[0], duration)) results += BOLD[1] + "\n%s | %s | %s s (accumulated)" % ("ALL".ljust(max_len_name), str(all_passed).ljust(6), time_sum) + BOLD[0] print(results) print("\nRuntime: %s s" % (int(time.time() - time0))) if coverage: coverage.report_rpc_coverage() print("Cleaning up coverage data") coverage.cleanup() sys.exit(not all_passed) class RPCTestHandler: """ Trigger the testscrips passed in via the list. """ def __init__(self, num_tests_parallel, test_list=None, flags=None): assert(num_tests_parallel >= 1) self.num_jobs = num_tests_parallel self.test_list = test_list self.flags = flags self.num_running = 0 # In case there is a graveyard of zombie bitcoinds, we can apply a # pseudorandom offset to hopefully jump over them. # (625 is PORT_RANGE/MAX_NODES) self.portseed_offset = int(time.time() * 1000) % 625 self.jobs = [] def get_next(self): while self.num_running < self.num_jobs and self.test_list: # Add tests self.num_running += 1 t = self.test_list.pop(0) port_seed = ["--portseed={}".format(len(self.test_list) + self.portseed_offset)] log_stdout = tempfile.SpooledTemporaryFile(max_size=2**16) log_stderr = tempfile.SpooledTemporaryFile(max_size=2**16) self.jobs.append((t, time.time(), subprocess.Popen((RPC_TESTS_DIR + t).split() + self.flags + port_seed, universal_newlines=True, stdout=log_stdout, stderr=log_stderr), log_stdout, log_stderr)) if not self.jobs: raise IndexError('pop from empty list') while True: # Return first proc that finishes time.sleep(.5) for j in self.jobs: (name, time0, proc, log_out, log_err) = j if proc.poll() is not None: log_out.seek(0), log_err.seek(0) [stdout, stderr] = [l.read().decode('utf-8') for l in (log_out, log_err)] log_out.close(), log_err.close() passed = stderr == "" and proc.returncode == 0 self.num_running -= 1 self.jobs.remove(j) return name, stdout, stderr, passed, int(time.time() - time0) print('.', end='', flush=True) class RPCCoverage(object): """ Coverage reporting utilities for pull-tester. Coverage calculation works by having each test script subprocess write coverage files into a particular directory. These files contain the RPC commands invoked during testing, as well as a complete listing of RPC commands per `bitcoin-cli help` (`rpc_interface.txt`). After all tests complete, the commands run are combined and diff'd against the complete list to calculate uncovered RPC commands. See also: qa/rpc-tests/test_framework/coverage.py """ def __init__(self): self.dir = tempfile.mkdtemp(prefix="coverage") self.flag = '--coveragedir=%s' % self.dir def report_rpc_coverage(self): """ Print out RPC commands that were unexercised by tests. """ uncovered = self._get_uncovered_rpc_commands() if uncovered: print("Uncovered RPC commands:") print("".join((" - %s\n" % i) for i in sorted(uncovered))) else: print("All RPC commands covered.") def cleanup(self): return shutil.rmtree(self.dir) def _get_uncovered_rpc_commands(self): """ Return a set of currently untested RPC commands. """ # This is shared from `qa/rpc-tests/test-framework/coverage.py` REFERENCE_FILENAME = 'rpc_interface.txt' COVERAGE_FILE_PREFIX = 'coverage.' coverage_ref_filename = os.path.join(self.dir, REFERENCE_FILENAME) coverage_filenames = set() all_cmds = set() covered_cmds = set() if not os.path.isfile(coverage_ref_filename): raise RuntimeError("No coverage reference found") with open(coverage_ref_filename, 'r') as f: all_cmds.update([i.strip() for i in f.readlines()]) for root, dirs, files in os.walk(self.dir): for filename in files: if filename.startswith(COVERAGE_FILE_PREFIX): coverage_filenames.add(os.path.join(root, filename)) for filename in coverage_filenames: with open(filename, 'r') as f: covered_cmds.update([i.strip() for i in f.readlines()]) return all_cmds - covered_cmds if __name__ == '__main__': runtests()