Browse Source

[tests] fix flake8 warnings in test_framework.py and util.py

0.15
John Newbery 8 years ago
parent
commit
f1fe5368f1
  1. 23
      test/functional/test_framework/test_framework.py
  2. 38
      test/functional/test_framework/util.py

23
test/functional/test_framework/test_framework.py

@ -14,6 +14,7 @@ import subprocess @@ -14,6 +14,7 @@ import subprocess
import sys
import tempfile
import time
import traceback
from .util import (
PortSeed,
@ -354,18 +355,13 @@ class BitcoinTestFramework(object): @@ -354,18 +355,13 @@ class BitcoinTestFramework(object):
for i in range(num_nodes):
initialize_datadir(test_dir, i)
# Test framework for doing p2p comparison testing, which sets up some bitcoind
# binaries:
# 1 binary: test binary
# 2 binaries: 1 test binary, 1 ref binary
# n>2 binaries: 1 test binary, n-1 ref binaries
class SkipTest(Exception):
"""This exception is raised to skip a test"""
def __init__(self, message):
self.message = message
class ComparisonTestFramework(BitcoinTestFramework):
"""Test framework for doing p2p comparison testing
Sets up some bitcoind binaries:
- 1 binary: test binary
- 2 binaries: 1 test binary, 1 ref binary
- n>2 binaries: 1 test binary, n-1 ref binaries"""
def __init__(self):
super().__init__()
@ -388,3 +384,8 @@ class ComparisonTestFramework(BitcoinTestFramework): @@ -388,3 +384,8 @@ class ComparisonTestFramework(BitcoinTestFramework):
self.num_nodes, self.options.tmpdir, extra_args,
binary=[self.options.testbinary] +
[self.options.refbinary] * (self.num_nodes - 1))
class SkipTest(Exception):
"""This exception is raised to skip a test"""
def __init__(self, message):
self.message = message

38
test/functional/test_framework/util.py

@ -4,20 +4,19 @@ @@ -4,20 +4,19 @@
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Helpful routines for regression testing."""
import os
from binascii import hexlify, unhexlify
from base64 import b64encode
from binascii import hexlify, unhexlify
from decimal import Decimal, ROUND_DOWN
import json
import errno
import http.client
import json
import logging
import os
import random
import re
import subprocess
import tempfile
import time
import re
import errno
import logging
from . import coverage
from .authproxy import AuthServiceProxy, JSONRPCException
@ -234,7 +233,7 @@ def wait_for_bitcoind_start(process, datadir, i, rpchost=None): @@ -234,7 +233,7 @@ def wait_for_bitcoind_start(process, datadir, i, rpchost=None):
try:
# Check if .cookie file to be created
rpc = get_rpc_proxy(rpc_url(datadir, i, rpchost), i)
blocks = rpc.getblockcount()
rpc.getblockcount()
break # break out of loop on success
except IOError as e:
if e.errno != errno.ECONNREFUSED: # Port not yet open?
@ -259,7 +258,8 @@ def _start_node(i, dirname, extra_args=None, rpchost=None, timewait=None, binary @@ -259,7 +258,8 @@ def _start_node(i, dirname, extra_args=None, rpchost=None, timewait=None, binary
if binary is None:
binary = os.getenv("BITCOIND", "bitcoind")
args = [binary, "-datadir=" + datadir, "-server", "-keypool=1", "-discover=0", "-rest", "-logtimemicros", "-debug", "-debugexclude=libevent", "-debugexclude=leveldb", "-mocktime=" + str(get_mocktime()), "-uacomment=testnode%d" % i]
if extra_args is not None: args.extend(extra_args)
if extra_args is not None:
args.extend(extra_args)
bitcoind_processes[i] = subprocess.Popen(args, stderr=stderr)
logger.debug("initialize_chain: bitcoind started, waiting for RPC to come up")
wait_for_bitcoind_start(bitcoind_processes[i], datadir, i, rpchost)
@ -295,15 +295,18 @@ def _start_nodes(num_nodes, dirname, extra_args=None, rpchost=None, timewait=Non @@ -295,15 +295,18 @@ def _start_nodes(num_nodes, dirname, extra_args=None, rpchost=None, timewait=Non
This function should only be called from within test_framework, not by individual test scripts."""
if extra_args is None: extra_args = [ None for _ in range(num_nodes) ]
if binary is None: binary = [ None for _ in range(num_nodes) ]
if extra_args is None:
extra_args = [None] * num_nodes
if binary is None:
binary = [None] * num_nodes
assert_equal(len(extra_args), num_nodes)
assert_equal(len(binary), num_nodes)
rpcs = []
try:
for i in range(num_nodes):
rpcs.append(_start_node(i, dirname, extra_args[i], rpchost, timewait=timewait, binary=binary[i]))
except: # If one node failed to start, stop the others
except:
# If one node failed to start, stop the others
_stop_nodes(rpcs)
raise
return rpcs
@ -372,7 +375,6 @@ def find_output(node, txid, amount): @@ -372,7 +375,6 @@ def find_output(node, txid, amount):
return i
raise RuntimeError("find_output txid %s : %s not found" % (txid, str(amount)))
def gather_inputs(from_node, amount_needed, confirmations_required=1):
"""
Return a random set of unspent txouts that are enough to pay amount_needed
@ -515,7 +517,7 @@ def assert_array_result(object_array, to_match, expected, should_not_find = Fals @@ -515,7 +517,7 @@ def assert_array_result(object_array, to_match, expected, should_not_find = Fals
If the should_not_find flag is true, to_match should not be found
in object_array
"""
if should_not_find == True:
if should_not_find:
assert_equal(expected, {})
num_matched = 0
for item in object_array:
@ -525,15 +527,15 @@ def assert_array_result(object_array, to_match, expected, should_not_find = Fals @@ -525,15 +527,15 @@ def assert_array_result(object_array, to_match, expected, should_not_find = Fals
all_match = False
if not all_match:
continue
elif should_not_find == True:
elif should_not_find:
num_matched = num_matched + 1
for key, value in expected.items():
if item[key] != value:
raise AssertionError("%s : expected %s=%s" % (str(item), str(key), str(value)))
num_matched = num_matched + 1
if num_matched == 0 and should_not_find != True:
if num_matched == 0 and not should_not_find:
raise AssertionError("No objects matched %s" % (str(to_match)))
if num_matched > 0 and should_not_find == True:
if num_matched > 0 and should_not_find:
raise AssertionError("Objects were found %s" % (str(to_match)))
def satoshi_round(amount):
@ -559,7 +561,7 @@ def create_confirmed_utxos(fee, node, count): @@ -559,7 +561,7 @@ def create_confirmed_utxos(fee, node, count):
outputs[addr2] = satoshi_round(send_value / 2)
raw_tx = node.createrawtransaction(inputs, outputs)
signed_tx = node.signrawtransaction(raw_tx)["hex"]
txid = node.sendrawtransaction(signed_tx)
node.sendrawtransaction(signed_tx)
while (node.getmempoolinfo()['size'] > 0):
node.generate(1)

Loading…
Cancel
Save