Merge #11849: [tests] Assert that only one NetworkThread exists

5c8ff26 [tests] Add NetworkThread assertions (John Newbery)
34e08b3 [tests] Fix network threading in functional tests (John Newbery)
74e64f2 [tests] Use network_thread_start() in tests. (John Newbery)
5fc6e71 [tests] Add network_thread_ utility functions. (John Newbery)

Pull request description:

  Add assert that only one NetworkThread exists at any time in functional tests, and fix cases where that wasn't true.

  fixes #11776

Tree-SHA512: fe5d1c59005f94bf66e11bb23ccf274b1cd9913741b56ea11dbcd21db4cc0b53b4413c0c4c16dbcd6ac611adad5e5cc2baaa39720598ce7b6393889945d06298
This commit is contained in:
Wladimir J. van der Laan 2017-12-12 12:52:33 +01:00
commit ad1820cbad
No known key found for this signature in database
GPG Key ID: 1E4AED62986CD25D
23 changed files with 96 additions and 42 deletions

View File

@ -68,7 +68,7 @@ contains the higher level logic for processing P2P payloads and connecting to
the Bitcoin Core node application logic. For custom behaviour, subclass the the Bitcoin Core node application logic. For custom behaviour, subclass the
P2PInterface object and override the callback methods. P2PInterface object and override the callback methods.
- Call `NetworkThread.start()` after all `P2PInterface` objects are created to - Call `network_thread_start()` after all `P2PInterface` objects are created to
start the networking thread. (Continue with the test logic in your existing start the networking thread. (Continue with the test logic in your existing
thread.) thread.)

View File

@ -38,7 +38,8 @@ from test_framework.mininode import (CBlockHeader,
CTransaction, CTransaction,
CTxIn, CTxIn,
CTxOut, CTxOut,
NetworkThread, network_thread_join,
network_thread_start,
P2PInterface, P2PInterface,
msg_block, msg_block,
msg_headers) msg_headers)
@ -98,7 +99,7 @@ class AssumeValidTest(BitcoinTestFramework):
# Connect to node0 # Connect to node0
p2p0 = self.nodes[0].add_p2p_connection(BaseNode()) p2p0 = self.nodes[0].add_p2p_connection(BaseNode())
NetworkThread().start() # Start up network handling in another thread network_thread_start()
self.nodes[0].p2p.wait_for_verack() self.nodes[0].p2p.wait_for_verack()
# Build the blockchain # Build the blockchain
@ -159,13 +160,22 @@ class AssumeValidTest(BitcoinTestFramework):
self.block_time += 1 self.block_time += 1
height += 1 height += 1
# We're adding new connections so terminate the network thread
self.nodes[0].disconnect_p2ps()
network_thread_join()
# Start node1 and node2 with assumevalid so they accept a block with a bad signature. # Start node1 and node2 with assumevalid so they accept a block with a bad signature.
self.start_node(1, extra_args=["-assumevalid=" + hex(block102.sha256)]) self.start_node(1, extra_args=["-assumevalid=" + hex(block102.sha256)])
p2p1 = self.nodes[1].add_p2p_connection(BaseNode())
p2p1.wait_for_verack()
self.start_node(2, extra_args=["-assumevalid=" + hex(block102.sha256)]) self.start_node(2, extra_args=["-assumevalid=" + hex(block102.sha256)])
p2p0 = self.nodes[0].add_p2p_connection(BaseNode())
p2p1 = self.nodes[1].add_p2p_connection(BaseNode())
p2p2 = self.nodes[2].add_p2p_connection(BaseNode()) p2p2 = self.nodes[2].add_p2p_connection(BaseNode())
network_thread_start()
p2p0.wait_for_verack()
p2p1.wait_for_verack()
p2p2.wait_for_verack() p2p2.wait_for_verack()
# send header lists to all three nodes # send header lists to all three nodes

View File

@ -68,7 +68,7 @@ class BIP65Test(BitcoinTestFramework):
def run_test(self): def run_test(self):
self.nodes[0].add_p2p_connection(P2PInterface()) self.nodes[0].add_p2p_connection(P2PInterface())
NetworkThread().start() # Start up network handling in another thread network_thread_start()
# wait_for_verack ensures that the P2P connection is fully up. # wait_for_verack ensures that the P2P connection is fully up.
self.nodes[0].p2p.wait_for_verack() self.nodes[0].p2p.wait_for_verack()

View File

@ -45,7 +45,7 @@ bip112tx_special - test negative argument to OP_CSV
from test_framework.test_framework import ComparisonTestFramework from test_framework.test_framework import ComparisonTestFramework
from test_framework.util import * from test_framework.util import *
from test_framework.mininode import ToHex, CTransaction, NetworkThread from test_framework.mininode import ToHex, CTransaction, network_thread_start
from test_framework.blocktools import create_coinbase, create_block from test_framework.blocktools import create_coinbase, create_block
from test_framework.comptool import TestInstance, TestManager from test_framework.comptool import TestInstance, TestManager
from test_framework.script import * from test_framework.script import *
@ -100,7 +100,7 @@ class BIP68_112_113Test(ComparisonTestFramework):
def run_test(self): def run_test(self):
test = TestManager(self, self.options.tmpdir) test = TestManager(self, self.options.tmpdir)
test.add_all_connections(self.nodes) test.add_all_connections(self.nodes)
NetworkThread().start() # Start up network handling in another thread network_thread_start()
test.run() test.run()
def send_generic_input_tx(self, node, coinbases): def send_generic_input_tx(self, node, coinbases):

View File

@ -22,7 +22,7 @@ import itertools
from test_framework.test_framework import ComparisonTestFramework from test_framework.test_framework import ComparisonTestFramework
from test_framework.util import * from test_framework.util import *
from test_framework.mininode import CTransaction, NetworkThread from test_framework.mininode import CTransaction, network_thread_start
from test_framework.blocktools import create_coinbase, create_block from test_framework.blocktools import create_coinbase, create_block
from test_framework.comptool import TestInstance, TestManager from test_framework.comptool import TestInstance, TestManager
from test_framework.script import CScript, OP_1NEGATE, OP_CHECKSEQUENCEVERIFY, OP_DROP from test_framework.script import CScript, OP_1NEGATE, OP_CHECKSEQUENCEVERIFY, OP_DROP
@ -36,7 +36,7 @@ class BIP9SoftForksTest(ComparisonTestFramework):
def run_test(self): def run_test(self):
self.test = TestManager(self, self.options.tmpdir) self.test = TestManager(self, self.options.tmpdir)
self.test.add_all_connections(self.nodes) self.test.add_all_connections(self.nodes)
NetworkThread().start() # Start up network handling in another thread network_thread_start()
self.test.run() self.test.run()
def create_transaction(self, node, coinbase, to_address, amount): def create_transaction(self, node, coinbase, to_address, amount):
@ -245,7 +245,7 @@ class BIP9SoftForksTest(ComparisonTestFramework):
self.setup_chain() self.setup_chain()
self.setup_network() self.setup_network()
self.test.add_all_connections(self.nodes) self.test.add_all_connections(self.nodes)
NetworkThread().start() network_thread_start()
self.test.p2p_connections[0].wait_for_verack() self.test.p2p_connections[0].wait_for_verack()
def get_tests(self): def get_tests(self):

View File

@ -56,7 +56,7 @@ class BIP66Test(BitcoinTestFramework):
def run_test(self): def run_test(self):
self.nodes[0].add_p2p_connection(P2PInterface()) self.nodes[0].add_p2p_connection(P2PInterface())
NetworkThread().start() # Start up network handling in another thread network_thread_start()
# wait_for_verack ensures that the P2P connection is fully up. # wait_for_verack ensures that the P2P connection is fully up.
self.nodes[0].p2p.wait_for_verack() self.nodes[0].p2p.wait_for_verack()

View File

@ -17,11 +17,12 @@ from collections import defaultdict
from test_framework.blocktools import (create_block, create_coinbase) from test_framework.blocktools import (create_block, create_coinbase)
from test_framework.mininode import ( from test_framework.mininode import (
CInv, CInv,
NetworkThread,
P2PInterface, P2PInterface,
mininode_lock, mininode_lock,
msg_block, msg_block,
msg_getdata, msg_getdata,
network_thread_join,
network_thread_start,
) )
from test_framework.test_framework import BitcoinTestFramework from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import ( from test_framework.util import (
@ -131,12 +132,12 @@ class ExampleTest(BitcoinTestFramework):
def run_test(self): def run_test(self):
"""Main test logic""" """Main test logic"""
# Create a P2P connection to one of the nodes # Create P2P connections to two of the nodes
self.nodes[0].add_p2p_connection(BaseNode()) self.nodes[0].add_p2p_connection(BaseNode())
# Start up network handling in another thread. This needs to be called # Start up network handling in another thread. This needs to be called
# after the P2P connections have been created. # after the P2P connections have been created.
NetworkThread().start() network_thread_start()
# wait_for_verack ensures that the P2P connection is fully up. # wait_for_verack ensures that the P2P connection is fully up.
self.nodes[0].p2p.wait_for_verack() self.nodes[0].p2p.wait_for_verack()
@ -188,7 +189,14 @@ class ExampleTest(BitcoinTestFramework):
connect_nodes(self.nodes[1], 2) connect_nodes(self.nodes[1], 2)
self.log.info("Add P2P connection to node2") self.log.info("Add P2P connection to node2")
# We can't add additional P2P connections once the network thread has started. Disconnect the connection
# to node0, wait for the network thread to terminate, then connect to node2. This is specific to
# the current implementation of the network thread and may be improved in future.
self.nodes[0].disconnect_p2ps()
network_thread_join()
self.nodes[2].add_p2p_connection(BaseNode()) self.nodes[2].add_p2p_connection(BaseNode())
network_thread_start()
self.nodes[2].p2p.wait_for_verack() self.nodes[2].p2p.wait_for_verack()
self.log.info("Wait for node2 reach current tip. Test that it has propagated all the blocks to us") self.log.info("Wait for node2 reach current tip. Test that it has propagated all the blocks to us")

View File

@ -15,6 +15,7 @@ from test_framework.test_framework import ComparisonTestFramework
from test_framework.util import * from test_framework.util import *
from test_framework.comptool import TestManager, TestInstance, RejectResult from test_framework.comptool import TestManager, TestInstance, RejectResult
from test_framework.blocktools import * from test_framework.blocktools import *
from test_framework.mininode import network_thread_start
import copy import copy
import time import time
@ -32,7 +33,7 @@ class InvalidBlockRequestTest(ComparisonTestFramework):
test.add_all_connections(self.nodes) test.add_all_connections(self.nodes)
self.tip = None self.tip = None
self.block_time = None self.block_time = None
NetworkThread().start() # Start up network handling in another thread network_thread_start()
test.run() test.run()
def get_tests(self): def get_tests(self):

View File

@ -28,7 +28,7 @@ class InvalidTxRequestTest(ComparisonTestFramework):
test.add_all_connections(self.nodes) test.add_all_connections(self.nodes)
self.tip = None self.tip = None
self.block_time = None self.block_time = None
NetworkThread().start() # Start up network handling in another thread network_thread_start()
test.run() test.run()
def get_tests(self): def get_tests(self):

View File

@ -57,7 +57,7 @@ class MaxUploadTest(BitcoinTestFramework):
for _ in range(3): for _ in range(3):
p2p_conns.append(self.nodes[0].add_p2p_connection(TestNode())) p2p_conns.append(self.nodes[0].add_p2p_connection(TestNode()))
NetworkThread().start() # Start up network handling in another thread network_thread_start()
for p2pc in p2p_conns: for p2pc in p2p_conns:
p2pc.wait_for_verack() p2pc.wait_for_verack()
@ -149,7 +149,7 @@ class MaxUploadTest(BitcoinTestFramework):
# Reconnect to self.nodes[0] # Reconnect to self.nodes[0]
self.nodes[0].add_p2p_connection(TestNode()) self.nodes[0].add_p2p_connection(TestNode())
NetworkThread().start() # Start up network handling in another thread network_thread_start()
self.nodes[0].p2p.wait_for_verack() self.nodes[0].p2p.wait_for_verack()
#retrieve 20 blocks which should be enough to break the 1MB limit #retrieve 20 blocks which should be enough to break the 1MB limit

View File

@ -15,7 +15,7 @@ Generate 427 more blocks.
from test_framework.test_framework import BitcoinTestFramework from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import * from test_framework.util import *
from test_framework.mininode import CTransaction, NetworkThread from test_framework.mininode import CTransaction, network_thread_start
from test_framework.blocktools import create_coinbase, create_block, add_witness_commitment from test_framework.blocktools import create_coinbase, create_block, add_witness_commitment
from test_framework.script import CScript from test_framework.script import CScript
from io import BytesIO from io import BytesIO
@ -50,7 +50,7 @@ class NULLDUMMYTest(BitcoinTestFramework):
self.wit_address = self.nodes[0].addwitnessaddress(self.address) self.wit_address = self.nodes[0].addwitnessaddress(self.address)
self.wit_ms_address = self.nodes[0].addwitnessaddress(self.ms_address) self.wit_ms_address = self.nodes[0].addwitnessaddress(self.ms_address)
NetworkThread().start() # Start up network handling in another thread network_thread_start()
self.coinbase_blocks = self.nodes[0].generate(2) # Block 2 self.coinbase_blocks = self.nodes[0].generate(2) # Block 2
coinbase_txid = [] coinbase_txid = []
for i in self.coinbase_blocks: for i in self.coinbase_blocks:

View File

@ -83,7 +83,7 @@ class AcceptBlockTest(BitcoinTestFramework):
# min_work_node connects to node1 (whitelisted) # min_work_node connects to node1 (whitelisted)
min_work_node = self.nodes[1].add_p2p_connection(P2PInterface()) min_work_node = self.nodes[1].add_p2p_connection(P2PInterface())
NetworkThread().start() # Start up network handling in another thread network_thread_start()
# Test logic begins here # Test logic begins here
test_node.wait_for_verack() test_node.wait_for_verack()
@ -207,9 +207,13 @@ class AcceptBlockTest(BitcoinTestFramework):
# disconnect/reconnect first # disconnect/reconnect first
self.nodes[0].disconnect_p2ps() self.nodes[0].disconnect_p2ps()
test_node = self.nodes[0].add_p2p_connection(P2PInterface()) self.nodes[1].disconnect_p2ps()
network_thread_join()
test_node = self.nodes[0].add_p2p_connection(P2PInterface())
network_thread_start()
test_node.wait_for_verack() test_node.wait_for_verack()
test_node.send_message(msg_block(block_h1f)) test_node.send_message(msg_block(block_h1f))
test_node.sync_with_ping() test_node.sync_with_ping()
@ -294,7 +298,7 @@ class AcceptBlockTest(BitcoinTestFramework):
self.nodes[0].disconnect_p2ps() self.nodes[0].disconnect_p2ps()
test_node = self.nodes[0].add_p2p_connection(P2PInterface()) test_node = self.nodes[0].add_p2p_connection(P2PInterface())
NetworkThread().start() # Start up network handling in another thread network_thread_start()
test_node.wait_for_verack() test_node.wait_for_verack()
# We should have failed reorg and switched back to 290 (but have block 291) # We should have failed reorg and switched back to 290 (but have block 291)

View File

@ -792,7 +792,7 @@ class CompactBlocksTest(BitcoinTestFramework):
self.segwit_node = self.nodes[1].add_p2p_connection(TestNode(), services=NODE_NETWORK|NODE_WITNESS) self.segwit_node = self.nodes[1].add_p2p_connection(TestNode(), services=NODE_NETWORK|NODE_WITNESS)
self.old_node = self.nodes[1].add_p2p_connection(TestNode(), services=NODE_NETWORK) self.old_node = self.nodes[1].add_p2p_connection(TestNode(), services=NODE_NETWORK)
NetworkThread().start() # Start up network handling in another thread network_thread_start()
self.test_node.wait_for_verack() self.test_node.wait_for_verack()

View File

@ -49,7 +49,7 @@ class FeeFilterTest(BitcoinTestFramework):
# Setup the p2p connections and start up the network thread. # Setup the p2p connections and start up the network thread.
self.nodes[0].add_p2p_connection(TestNode()) self.nodes[0].add_p2p_connection(TestNode())
NetworkThread().start() network_thread_start()
self.nodes[0].p2p.wait_for_verack() self.nodes[0].p2p.wait_for_verack()
# Test that invs are received for all txs at feerate of 20 sat/byte # Test that invs are received for all txs at feerate of 20 sat/byte

View File

@ -13,12 +13,12 @@ import time
from test_framework.blocktools import (create_block, create_coinbase) from test_framework.blocktools import (create_block, create_coinbase)
from test_framework.mininode import ( from test_framework.mininode import (
CInv, CInv,
NetworkThread,
P2PInterface, P2PInterface,
msg_headers, msg_headers,
msg_block, msg_block,
msg_getdata, msg_getdata,
msg_getheaders, msg_getheaders,
network_thread_start,
wait_until, wait_until,
) )
from test_framework.test_framework import BitcoinTestFramework from test_framework.test_framework import BitcoinTestFramework
@ -77,7 +77,7 @@ class P2PFingerprintTest(BitcoinTestFramework):
def run_test(self): def run_test(self):
node0 = self.nodes[0].add_p2p_connection(P2PInterface()) node0 = self.nodes[0].add_p2p_connection(P2PInterface())
NetworkThread().start() network_thread_start()
node0.wait_for_verack() node0.wait_for_verack()
# Set node time to 60 days ago # Set node time to 60 days ago

View File

@ -18,6 +18,7 @@ from test_framework.blocktools import *
import time import time
from test_framework.key import CECKey from test_framework.key import CECKey
from test_framework.script import * from test_framework.script import *
from test_framework.mininode import network_thread_start
import struct import struct
class PreviousSpendableOutput(): class PreviousSpendableOutput():
@ -68,7 +69,7 @@ class FullBlockTest(ComparisonTestFramework):
def run_test(self): def run_test(self):
self.test = TestManager(self, self.options.tmpdir) self.test = TestManager(self, self.options.tmpdir)
self.test.add_all_connections(self.nodes) self.test.add_all_connections(self.nodes)
NetworkThread().start() # Start up network handling in another thread network_thread_start()
self.test.run() self.test.run()
def add_transactions_to_block(self, block, tx_list): def add_transactions_to_block(self, block, tx_list):

View File

@ -103,7 +103,7 @@ class P2PLeakTest(BitcoinTestFramework):
unsupported_service_bit5_node = self.nodes[0].add_p2p_connection(CLazyNode(), services=NODE_NETWORK|NODE_UNSUPPORTED_SERVICE_BIT_5) unsupported_service_bit5_node = self.nodes[0].add_p2p_connection(CLazyNode(), services=NODE_NETWORK|NODE_UNSUPPORTED_SERVICE_BIT_5)
unsupported_service_bit7_node = self.nodes[0].add_p2p_connection(CLazyNode(), services=NODE_NETWORK|NODE_UNSUPPORTED_SERVICE_BIT_7) unsupported_service_bit7_node = self.nodes[0].add_p2p_connection(CLazyNode(), services=NODE_NETWORK|NODE_UNSUPPORTED_SERVICE_BIT_7)
NetworkThread().start() # Start up network handling in another thread network_thread_start()
wait_until(lambda: no_version_bannode.ever_connected, timeout=10, lock=mininode_lock) wait_until(lambda: no_version_bannode.ever_connected, timeout=10, lock=mininode_lock)
wait_until(lambda: no_version_idlenode.ever_connected, timeout=10, lock=mininode_lock) wait_until(lambda: no_version_idlenode.ever_connected, timeout=10, lock=mininode_lock)
@ -126,8 +126,9 @@ class P2PLeakTest(BitcoinTestFramework):
self.nodes[0].disconnect_p2ps() self.nodes[0].disconnect_p2ps()
# Wait until all connections are closed # Wait until all connections are closed and the network thread has terminated
wait_until(lambda: len(self.nodes[0].getpeerinfo()) == 0) wait_until(lambda: len(self.nodes[0].getpeerinfo()) == 0)
network_thread_join()
# Make sure no unexpected messages came in # Make sure no unexpected messages came in
assert(no_version_bannode.unexpected_msg == False) assert(no_version_bannode.unexpected_msg == False)
@ -142,7 +143,8 @@ class P2PLeakTest(BitcoinTestFramework):
allowed_service_bit5_node = self.nodes[0].add_p2p_connection(P2PInterface(), services=NODE_NETWORK|NODE_UNSUPPORTED_SERVICE_BIT_5) allowed_service_bit5_node = self.nodes[0].add_p2p_connection(P2PInterface(), services=NODE_NETWORK|NODE_UNSUPPORTED_SERVICE_BIT_5)
allowed_service_bit7_node = self.nodes[0].add_p2p_connection(P2PInterface(), services=NODE_NETWORK|NODE_UNSUPPORTED_SERVICE_BIT_7) allowed_service_bit7_node = self.nodes[0].add_p2p_connection(P2PInterface(), services=NODE_NETWORK|NODE_UNSUPPORTED_SERVICE_BIT_7)
NetworkThread().start() # Network thread stopped when all previous P2PInterfaces disconnected. Restart it # Network thread stopped when all previous P2PInterfaces disconnected. Restart it
network_thread_start()
wait_until(lambda: allowed_service_bit5_node.message_count["verack"], lock=mininode_lock) wait_until(lambda: allowed_service_bit5_node.message_count["verack"], lock=mininode_lock)
wait_until(lambda: allowed_service_bit7_node.message_count["verack"], lock=mininode_lock) wait_until(lambda: allowed_service_bit7_node.message_count["verack"], lock=mininode_lock)

View File

@ -21,7 +21,7 @@ class P2PMempoolTests(BitcoinTestFramework):
def run_test(self): def run_test(self):
# Add a p2p connection # Add a p2p connection
self.nodes[0].add_p2p_connection(P2PInterface()) self.nodes[0].add_p2p_connection(P2PInterface())
NetworkThread().start() network_thread_start()
self.nodes[0].p2p.wait_for_verack() self.nodes[0].p2p.wait_for_verack()
#request mempool #request mempool

View File

@ -1882,7 +1882,7 @@ class SegWitTest(BitcoinTestFramework):
# self.std_node is for testing node1 (fRequireStandard=true) # self.std_node is for testing node1 (fRequireStandard=true)
self.std_node = self.nodes[1].add_p2p_connection(TestNode(), services=NODE_NETWORK|NODE_WITNESS) self.std_node = self.nodes[1].add_p2p_connection(TestNode(), services=NODE_NETWORK|NODE_WITNESS)
NetworkThread().start() # Start up network handling in another thread network_thread_start()
# Keep a place to store utxo's that can be used in later tests # Keep a place to store utxo's that can be used in later tests
self.utxo = [] self.utxo = []

View File

@ -43,7 +43,7 @@ class TimeoutsTest(BitcoinTestFramework):
no_version_node = self.nodes[0].add_p2p_connection(TestNode(), send_version=False) no_version_node = self.nodes[0].add_p2p_connection(TestNode(), send_version=False)
no_send_node = self.nodes[0].add_p2p_connection(TestNode(), send_version=False) no_send_node = self.nodes[0].add_p2p_connection(TestNode(), send_version=False)
NetworkThread().start() # Start up network handling in another thread network_thread_start()
sleep(1) sleep(1)

View File

@ -66,7 +66,7 @@ class VersionBitsWarningTest(BitcoinTestFramework):
# Setup the p2p connection and start up the network thread. # Setup the p2p connection and start up the network thread.
self.nodes[0].add_p2p_connection(TestNode()) self.nodes[0].add_p2p_connection(TestNode())
NetworkThread().start() # Start up network handling in another thread network_thread_start()
# Test logic begins here # Test logic begins here
self.nodes[0].p2p.wait_for_verack() self.nodes[0].p2p.wait_for_verack()

View File

@ -90,7 +90,7 @@ from test_framework.mininode import (
CBlockHeader, CBlockHeader,
CInv, CInv,
NODE_WITNESS, NODE_WITNESS,
NetworkThread, network_thread_start,
P2PInterface, P2PInterface,
mininode_lock, mininode_lock,
msg_block, msg_block,
@ -238,7 +238,7 @@ class SendHeadersTest(BitcoinTestFramework):
# will occur outside of direct fetching # will occur outside of direct fetching
test_node = self.nodes[0].add_p2p_connection(BaseNode(), services=NODE_WITNESS) test_node = self.nodes[0].add_p2p_connection(BaseNode(), services=NODE_WITNESS)
NetworkThread().start() # Start up network handling in another thread network_thread_start()
# Test logic begins here # Test logic begins here
inv_node.wait_for_verack() inv_node.wait_for_verack()

View File

@ -18,7 +18,7 @@ import logging
import socket import socket
import struct import struct
import sys import sys
from threading import RLock, Thread import threading
from test_framework.messages import * from test_framework.messages import *
from test_framework.util import wait_until from test_framework.util import wait_until
@ -69,6 +69,10 @@ class P2PConnection(asyncore.dispatcher):
sub-classed and the on_message() callback overridden.""" sub-classed and the on_message() callback overridden."""
def __init__(self): def __init__(self):
# All P2PConnections must be created before starting the NetworkThread.
# assert that the network thread is not running.
assert not network_thread_running()
super().__init__(map=mininode_socket_map) super().__init__(map=mininode_socket_map)
def peer_connect(self, dstaddr, dstport, net="regtest"): def peer_connect(self, dstaddr, dstport, net="regtest"):
@ -397,9 +401,12 @@ mininode_socket_map = dict()
# and whenever adding anything to the send buffer (in send_message()). This # and whenever adding anything to the send buffer (in send_message()). This
# lock should be acquired in the thread running the test logic to synchronize # lock should be acquired in the thread running the test logic to synchronize
# access to any data shared with the P2PInterface or P2PConnection. # access to any data shared with the P2PInterface or P2PConnection.
mininode_lock = RLock() mininode_lock = threading.RLock()
class NetworkThread(threading.Thread):
def __init__(self):
super().__init__(name="NetworkThread")
class NetworkThread(Thread):
def run(self): def run(self):
while mininode_socket_map: while mininode_socket_map:
# We check for whether to disconnect outside of the asyncore # We check for whether to disconnect outside of the asyncore
@ -412,3 +419,24 @@ class NetworkThread(Thread):
[obj.handle_close() for obj in disconnected] [obj.handle_close() for obj in disconnected]
asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1) asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1)
logger.debug("Network thread closing") logger.debug("Network thread closing")
def network_thread_start():
"""Start the network thread."""
# Only one network thread may run at a time
assert not network_thread_running()
NetworkThread().start()
def network_thread_running():
"""Return whether the network thread is running."""
return any([thread.name == "NetworkThread" for thread in threading.enumerate()])
def network_thread_join(timeout=10):
"""Wait timeout seconds for the network thread to terminate.
Throw if the network thread doesn't terminate in timeout seconds."""
network_threads = [thread for thread in threading.enumerate() if thread.name == "NetworkThread"]
assert len(network_threads) <= 1
for thread in network_threads:
thread.join(timeout)
assert not thread.is_alive()