@ -20,21 +20,22 @@ msg_block, msg_tx, msg_headers, etc.:
@@ -20,21 +20,22 @@ msg_block, msg_tx, msg_headers, etc.:
ser_ * , deser_ * : functions that handle serialization / deserialization
"""
import struct
import socket
import asyncore
import time
import sys
import random
from . util import hex_str_to_bytes , bytes_to_hex_str
from io import BytesIO
from codecs import encode
from collections import defaultdict
import copy
import hashlib
from threading import RLock
from threading import Thread
from io import BytesIO
import logging
import copy
import random
import socket
import struct
import sys
import time
from threading import RLock , Thread
from test_framework . siphash import siphash256
from test_framework . util import hex_str_to_bytes , bytes_to_hex_str
BIP0031_VERSION = 60000
MY_VERSION = 70014 # past bip-31 for ping/pong
@ -1465,30 +1466,57 @@ class msg_witness_blocktxn(msg_blocktxn):
@@ -1465,30 +1466,57 @@ class msg_witness_blocktxn(msg_blocktxn):
r + = self . block_transactions . serialize ( with_witness = True )
return r
# This is what a callback should look like for NodeConn
# Reimplement the on_* functions to provide handling for events
class NodeConnCB ( object ) :
""" Callback and helper functions for P2P connection to a bitcoind node.
Individual testcases should subclass this and override the on_ * methods
if they want to alter message handling behaviour .
"""
def __init__ ( self ) :
self . verack_received = False
# Track whether we have a P2P connection open to the node
self . connected = False
self . connection = None
# Track number of messages of each type received and the most recent
# message of each type
self . message_count = defaultdict ( int )
self . last_message = { }
# A count of the number of ping messages we've sent to the node
self . ping_counter = 1
# deliver_sleep_time is helpful for debugging race conditions in p2p
# tests; it causes message delivery to sleep for the specified time
# before acquiring the global lock and delivering the next message.
self . deliver_sleep_time = None
# Remember the services our peer has advertised
self . peer_services = None
self . connection = None
self . ping_counter = 1
self . last_pong = msg_pong ( )
# Message receiving methods
def deliver ( self , conn , message ) :
""" Receive message and dispatch message to appropriate callback.
We keep a count of how many of each message type has been received
and the most recent message of each type .
Optionally waits for deliver_sleep_time before dispatching message .
"""
deliver_sleep = self . get_deliver_sleep_time ( )
if deliver_sleep is not None :
time . sleep ( deliver_sleep )
with mininode_lock :
try :
getattr ( self , ' on_ ' + message . command . decode ( ' ascii ' ) ) ( conn , message )
command = message . command . decode ( ' ascii ' )
self . message_count [ command ] + = 1
self . last_message [ command ] = message
getattr ( self , ' on_ ' + command ) ( conn , message )
except :
logger . exception ( " ERROR delivering %s " % repr ( message ) )
print ( " ERROR delivering %s ( %s ) " % ( repr ( message ) ,
sys . exc_info ( ) [ 0 ] ) )
def set_deliver_sleep_time ( self , value ) :
with mininode_lock :
@ -1498,14 +1526,20 @@ class NodeConnCB(object):
@@ -1498,14 +1526,20 @@ class NodeConnCB(object):
with mininode_lock :
return self . deliver_sleep_time
# Callbacks which can be overridden by subclasses
#################################################
# Callback methods. Can be overridden by subclasses in individual test
# cases to provide custom message handling behaviour.
def on_open ( self , conn ) :
self . connected = True
def on_close ( self , conn ) :
self . connected = False
self . connection = None
def on_addr ( self , conn , message ) : pass
def on_alert ( self , conn , message ) : pass
def on_block ( self , conn , message ) : pass
def on_blocktxn ( self , conn , message ) : pass
def on_close ( self , conn ) : pass
def on_cmpctblock ( self , conn , message ) : pass
def on_feefilter ( self , conn , message ) : pass
def on_getaddr ( self , conn , message ) : pass
@ -1515,7 +1549,7 @@ class NodeConnCB(object):
@@ -1515,7 +1549,7 @@ class NodeConnCB(object):
def on_getheaders ( self , conn , message ) : pass
def on_headers ( self , conn , message ) : pass
def on_mempool ( self , conn ) : pass
def on_open ( self , conn ) : pass
def on_pong ( self , conn , message ) : pass
def on_reject ( self , conn , message ) : pass
def on_sendcmpct ( self , conn , message ) : pass
def on_sendheaders ( self , conn , message ) : pass
@ -1533,9 +1567,6 @@ class NodeConnCB(object):
@@ -1533,9 +1567,6 @@ class NodeConnCB(object):
if conn . ver_send > BIP0031_VERSION :
conn . send_message ( msg_pong ( message . nonce ) )
def on_pong ( self , conn , message ) :
self . last_pong = message
def on_verack ( self , conn , message ) :
conn . ver_recv = conn . ver_send
self . verack_received = True
@ -1548,15 +1579,53 @@ class NodeConnCB(object):
@@ -1548,15 +1579,53 @@ class NodeConnCB(object):
conn . ver_recv = conn . ver_send
conn . nServices = message . nServices
# Helper functions
##################
# Connection helper methods
def add_connection ( self , conn ) :
self . connection = conn
# Wrapper for the NodeConn's send_message function
def wait_for_disconnect ( self , timeout = 60 ) :
test_function = lambda : not self . connected
assert wait_until ( test_function , timeout = timeout )
# Message receiving helper methods
def sync ( self , test_function , timeout = 60 ) :
while timeout > 0 :
with mininode_lock :
if test_function ( ) :
return
time . sleep ( 0.05 )
timeout - = 0.05
raise AssertionError ( " Sync failed to complete " )
def wait_for_block ( self , blockhash , timeout = 60 ) :
test_function = lambda : self . last_message . get ( " block " ) and self . last_message [ " block " ] . block . rehash ( ) == blockhash
self . sync ( test_function , timeout )
def wait_for_getdata ( self , timeout = 60 ) :
test_function = lambda : self . last_message . get ( " getdata " )
self . sync ( test_function , timeout )
def wait_for_getheaders ( self , timeout = 60 ) :
test_function = lambda : self . last_message . get ( " getheaders " )
self . sync ( test_function , timeout )
def wait_for_inv ( self , expected_inv , timeout = 60 ) :
test_function = lambda : self . last_message . get ( " inv " ) and self . last_message [ " inv " ] != expected_inv
self . sync ( test_function , timeout )
def wait_for_verack ( self , timeout = 60 ) :
test_function = lambda : self . message_count [ " verack " ]
self . sync ( test_function , timeout )
# Message sending helper functions
def send_message ( self , message ) :
self . connection . send_message ( message )
if self . connection :
self . connection . send_message ( message )
else :
logger . error ( " Cannot send message. No connection to node! " )
def send_and_ping ( self , message ) :
self . send_message ( message )
@ -1564,28 +1633,15 @@ class NodeConnCB(object):
@@ -1564,28 +1633,15 @@ class NodeConnCB(object):
# Sync up with the node
def sync_with_ping ( self , timeout = 60 ) :
def received_pong ( ) :
return ( self . last_pong . nonce == self . ping_counter )
self . send_message ( msg_ping ( nonce = self . ping_counter ) )
success = wait_until ( received_pong , timeout = timeout )
test_function = lambda : self . last_message . get ( " pong " ) and self . last_message [ " pong " ] . nonce == self . ping_counter
success = wait_until ( test_function , timeout = timeout )
if not success :
logger . error ( " sync_with_ping failed! " )
raise AssertionError ( " sync_with_ping failed! " )
self . ping_counter + = 1
return success
# Spin until verack message is received from the node.
# Tests may want to use this as a signal that the test can begin.
# This can be called from the testing thread, so it needs to acquire the
# global lock.
def wait_for_verack ( self ) :
while True :
with mininode_lock :
if self . verack_received :
return
time . sleep ( 0.05 )
# The actual NodeConn class
# This class provides an interface for a p2p connection to a specified node
class NodeConn ( asyncore . dispatcher ) :