@ -2,7 +2,7 @@
# Copyright (c) 2015-2016 The Bitcoin Core developers
# Copyright (c) 2015-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
""" Test the ZMQ API . """
""" Test the ZMQ notification interface . """
import configparser
import configparser
import os
import os
import struct
import struct
@ -13,6 +13,25 @@ from test_framework.util import (assert_equal,
hash256 ,
hash256 ,
)
)
class ZMQSubscriber :
def __init__ ( self , socket , topic ) :
self . sequence = 0
self . socket = socket
self . topic = topic
import zmq
self . socket . setsockopt ( zmq . SUBSCRIBE , self . topic )
def receive ( self ) :
topic , body , seq = self . socket . recv_multipart ( )
# Topic should match the subscriber topic.
assert_equal ( topic , self . topic )
# Sequence should be incremental.
assert_equal ( struct . unpack ( ' <I ' , seq ) [ - 1 ] , self . sequence )
self . sequence + = 1
return body
class ZMQTest ( BitcoinTestFramework ) :
class ZMQTest ( BitcoinTestFramework ) :
def set_test_params ( self ) :
def set_test_params ( self ) :
self . num_nodes = 2
self . num_nodes = 2
@ -24,26 +43,33 @@ class ZMQTest (BitcoinTestFramework):
except ImportError :
except ImportError :
raise SkipTest ( " python3-zmq module not available. " )
raise SkipTest ( " python3-zmq module not available. " )
# Check that bitcoin has been built with ZMQ enabled
# Check that bitcoin has been built with ZMQ enabled.
config = configparser . ConfigParser ( )
config = configparser . ConfigParser ( )
if not self . options . configfile :
if not self . options . configfile :
self . options . configfile = os . path . dirname ( __file__ ) + " / ../config.ini"
self . options . configfile = os . path . abspath ( os . path . join ( os . path . dirname ( __file__ ) , " ../config.ini " ) )
config . read_file ( open ( self . options . configfile ) )
config . read_file ( open ( self . options . configfile ) )
if not config [ " components " ] . getboolean ( " ENABLE_ZMQ " ) :
if not config [ " components " ] . getboolean ( " ENABLE_ZMQ " ) :
raise SkipTest ( " bitcoind has not been built with zmq enabled. " )
raise SkipTest ( " bitcoind has not been built with zmq enabled. " )
self . zmqContext = zmq . Context ( )
# Initialize ZMQ context and socket.
self . zmqSubSocket = self . zmqContext . socket ( zmq . SUB )
# All messages are received in the same socket which means
self . zmqSubSocket . set ( zmq . RCVTIMEO , 60000 )
# that this test fails if the publishing order changes.
self . zmqSubSocket . setsockopt ( zmq . SUBSCRIBE , b " hashblock " )
# Note that the publishing order is not defined in the documentation and
self . zmqSubSocket . setsockopt ( zmq . SUBSCRIBE , b " hashtx " )
# is subject to change.
self . zmqSubSocket . setsockopt ( zmq . SUBSCRIBE , b " rawblock " )
address = " tcp://127.0.0.1:28332 "
self . zmqSubSocket . setsockopt ( zmq . SUBSCRIBE , b " rawtx " )
self . zmq_context = zmq . Context ( )
ip_address = " tcp://127.0.0.1:28332 "
socket = self . zmq_context . socket ( zmq . SUB )
self . zmqSubSocket . connect ( ip_address )
socket . set ( zmq . RCVTIMEO , 60000 )
self . extra_args = [ [ ' -zmqpubhashblock= %s ' % ip_address , ' -zmqpubhashtx= %s ' % ip_address ,
socket . connect ( address )
' -zmqpubrawblock= %s ' % ip_address , ' -zmqpubrawtx= %s ' % ip_address ] , [ ] ]
# Subscribe to all available topics.
self . hashblock = ZMQSubscriber ( socket , b " hashblock " )
self . hashtx = ZMQSubscriber ( socket , b " hashtx " )
self . rawblock = ZMQSubscriber ( socket , b " rawblock " )
self . rawtx = ZMQSubscriber ( socket , b " rawtx " )
self . extra_args = [ [ " -zmqpub %s = %s " % ( sub . topic . decode ( ) , address ) for sub in [ self . hashblock , self . hashtx , self . rawblock , self . rawtx ] ] , [ ] ]
self . add_nodes ( self . num_nodes , self . extra_args )
self . add_nodes ( self . num_nodes , self . extra_args )
self . start_nodes ( )
self . start_nodes ( )
@ -51,103 +77,45 @@ class ZMQTest (BitcoinTestFramework):
try :
try :
self . _zmq_test ( )
self . _zmq_test ( )
finally :
finally :
# Destroy the zmq context
# Destroy the ZMQ context.
self . log . debug ( " Destroying zmq context " )
self . log . debug ( " Destroying ZMQ context " )
self . zmqC ontext . destroy ( linger = None )
self . zmq_c ontext . destroy ( linger = None )
def _zmq_test ( self ) :
def _zmq_test ( self ) :
genhashes = self . nodes [ 0 ] . generate ( 1 )
num_blocks = 5
self . log . info ( " Generate %(n)d blocks (and %(n)d coinbase txes) " % { " n " : num_blocks } )
genhashes = self . nodes [ 0 ] . generate ( num_blocks )
self . sync_all ( )
self . sync_all ( )
self . log . info ( " Wait for tx " )
for x in range ( num_blocks ) :
msg = self . zmqSubSocket . recv_multipart ( )
# Should receive the coinbase txid.
topic = msg [ 0 ]
txid = self . hashtx . receive ( )
assert_equal ( topic , b " hashtx " )
txhash = msg [ 1 ]
# Should receive the coinbase raw transaction.
msgSequence = struct . unpack ( ' <I ' , msg [ - 1 ] ) [ - 1 ]
hex = self . rawtx . receive ( )
assert_equal ( msgSequence , 0 ) # must be sequence 0 on hashtx
assert_equal ( hash256 ( hex ) , txid )
# rawtx
msg = self . zmqSubSocket . recv_multipart ( )
topic = msg [ 0 ]
assert_equal ( topic , b " rawtx " )
body = msg [ 1 ]
msgSequence = struct . unpack ( ' <I ' , msg [ - 1 ] ) [ - 1 ]
assert_equal ( msgSequence , 0 ) # must be sequence 0 on rawtx
# Check that the rawtx hashes to the hashtx
assert_equal ( hash256 ( body ) , txhash )
self . log . info ( " Wait for block " )
msg = self . zmqSubSocket . recv_multipart ( )
topic = msg [ 0 ]
assert_equal ( topic , b " hashblock " )
body = msg [ 1 ]
msgSequence = struct . unpack ( ' <I ' , msg [ - 1 ] ) [ - 1 ]
assert_equal ( msgSequence , 0 ) # must be sequence 0 on hashblock
blkhash = bytes_to_hex_str ( body )
assert_equal ( genhashes [ 0 ] , blkhash ) # blockhash from generate must be equal to the hash received over zmq
# rawblock
msg = self . zmqSubSocket . recv_multipart ( )
topic = msg [ 0 ]
assert_equal ( topic , b " rawblock " )
body = msg [ 1 ]
msgSequence = struct . unpack ( ' <I ' , msg [ - 1 ] ) [ - 1 ]
assert_equal ( msgSequence , 0 ) #must be sequence 0 on rawblock
# Check the hash of the rawblock's header matches generate
assert_equal ( genhashes [ 0 ] , bytes_to_hex_str ( hash256 ( body [ : 80 ] ) ) )
self . log . info ( " Generate 10 blocks (and 10 coinbase txes) " )
n = 10
genhashes = self . nodes [ 1 ] . generate ( n )
self . sync_all ( )
zmqHashes = [ ]
# Should receive the generated block hash.
zmqRawHashed = [ ]
hash = bytes_to_hex_str ( self . hashblock . receive ( ) )
blockcount = 0
assert_equal ( genhashes [ x ] , hash )
for x in range ( n * 4 ) :
# The block should only have the coinbase txid.
msg = self . zmqSubSocket . recv_multipart ( )
assert_equal ( [ bytes_to_hex_str ( txid ) ] , self . nodes [ 1 ] . getblock ( hash ) [ " tx " ] )
topic = msg [ 0 ]
body = msg [ 1 ]
# Should receive the generated raw block.
if topic == b " hashblock " :
block = self . rawblock . receive ( )
zmqHashes . append ( bytes_to_hex_str ( body ) )
assert_equal ( genhashes [ x ] , bytes_to_hex_str ( hash256 ( block [ : 80 ] ) ) )
msgSequence = struct . unpack ( ' <I ' , msg [ - 1 ] ) [ - 1 ]
assert_equal ( msgSequence , blockcount + 1 )
blockcount + = 1
if topic == b " rawblock " :
zmqRawHashed . append ( bytes_to_hex_str ( hash256 ( body [ : 80 ] ) ) )
msgSequence = struct . unpack ( ' <I ' , msg [ - 1 ] ) [ - 1 ]
assert_equal ( msgSequence , blockcount )
for x in range ( n ) :
assert_equal ( genhashes [ x ] , zmqHashes [ x ] ) # blockhash from generate must be equal to the hash received over zmq
assert_equal ( genhashes [ x ] , zmqRawHashed [ x ] )
self . log . info ( " Wait for tx from second node " )
self . log . info ( " Wait for tx from second node " )
# test tx from a second node
payment_txid = self . nodes [ 1 ] . sendtoaddress ( self . nodes [ 0 ] . getnewaddress ( ) , 1.0 )
hashRPC = self . nodes [ 1 ] . sendtoaddress ( self . nodes [ 0 ] . getnewaddress ( ) , 1.0 )
self . sync_all ( )
self . sync_all ( )
# now we should receive a zmq msg because the tx was broadcast
# Should receive the broadcasted txid.
msg = self . zmqSubSocket . recv_multipart ( )
txid = self . hashtx . receive ( )
topic = msg [ 0 ]
assert_equal ( payment_txid , bytes_to_hex_str ( txid ) )
assert_equal ( topic , b " hashtx " )
body = msg [ 1 ]
# Should receive the broadcasted raw transaction.
hashZMQ = bytes_to_hex_str ( body )
hex = self . rawtx . receive ( )
msgSequence = struct . unpack ( ' <I ' , msg [ - 1 ] ) [ - 1 ]
assert_equal ( payment_txid , bytes_to_hex_str ( hash256 ( hex ) ) )
assert_equal ( msgSequence , blockcount + 1 )
msg = self . zmqSubSocket . recv_multipart ( )
topic = msg [ 0 ]
assert_equal ( topic , b " rawtx " )
body = msg [ 1 ]
hashedZMQ = bytes_to_hex_str ( hash256 ( body ) )
msgSequence = struct . unpack ( ' <I ' , msg [ - 1 ] ) [ - 1 ]
assert_equal ( msgSequence , blockcount + 1 )
assert_equal ( hashRPC , hashZMQ ) # txid from sendtoaddress must be equal to the hash received over zmq
assert_equal ( hashRPC , hashedZMQ )
if __name__ == ' __main__ ' :
if __name__ == ' __main__ ' :
ZMQTest ( ) . main ( )
ZMQTest ( ) . main ( )