Browse Source

Merge pull request #6103

0.13
Jeff Garzik 9 years ago
parent
commit
13b828270a
Failed to extract signature
  1. 22
      configure.ac
  2. 37
      contrib/zmq/zmq_sub.py
  3. 2
      depends/packages/packages.mk
  4. 26
      depends/packages/zeromq.mk
  5. 98
      doc/zmq.md
  6. 4
      qa/pull-tester/rpc-tests.sh
  7. 1
      qa/pull-tester/tests-config.sh.in
  8. 93
      qa/rpc-tests/zmq_test.py
  9. 26
      src/Makefile.am
  10. 3
      src/Makefile.qt.include
  11. 3
      src/Makefile.qttest.include
  12. 4
      src/Makefile.test.include
  13. 36
      src/init.cpp
  14. 1
      src/main.cpp
  15. 3
      src/validationinterface.cpp
  16. 3
      src/validationinterface.h
  17. 22
      src/zmq/zmqabstractnotifier.cpp
  18. 42
      src/zmq/zmqabstractnotifier.h
  19. 24
      src/zmq/zmqconfig.h
  20. 155
      src/zmq/zmqnotificationinterface.cpp
  21. 35
      src/zmq/zmqnotificationinterface.h
  22. 172
      src/zmq/zmqpublishnotifier.cpp
  23. 41
      src/zmq/zmqpublishnotifier.h

22
configure.ac

@ -137,6 +137,12 @@ AC_ARG_ENABLE([glibc-back-compat],
[use_glibc_compat=$enableval], [use_glibc_compat=$enableval],
[use_glibc_compat=no]) [use_glibc_compat=no])
AC_ARG_ENABLE([zmq],
[AC_HELP_STRING([--disable-zmq],
[Disable ZMQ notifications])],
[use_zmq=$enableval],
[use_zmq=yes])
AC_ARG_WITH([protoc-bindir],[AS_HELP_STRING([--with-protoc-bindir=BIN_DIR],[specify protoc bin path])], [protoc_bin_path=$withval], []) AC_ARG_WITH([protoc-bindir],[AS_HELP_STRING([--with-protoc-bindir=BIN_DIR],[specify protoc bin path])], [protoc_bin_path=$withval], [])
# Enable debug # Enable debug
@ -833,6 +839,22 @@ if test x$bitcoin_enable_qt != xno; then
fi fi
fi fi
# conditional search for and use libzmq
AC_MSG_CHECKING([whether to build ZMQ support])
if test "x$use_zmq" = "xyes"; then
AC_MSG_RESULT([yes])
PKG_CHECK_MODULES([ZMQ],[libzmq],
[AC_DEFINE([ENABLE_ZMQ],[1],[Define to 1 to enable ZMQ functions])],
[AC_DEFINE([ENABLE_ZMQ],[0],[Define to 1 to enable ZMQ functions])
AC_MSG_WARN([libzmq not found, disabling])
use_zmq=no])
else
AC_MSG_RESULT([no, --disable-zmq used])
AC_DEFINE_UNQUOTED([ENABLE_ZMQ],[0],[Define to 1 to enable ZMQ functions])
fi
AM_CONDITIONAL([ENABLE_ZMQ], [test "x$use_zmq" = "xyes"])
AC_MSG_CHECKING([whether to build test_bitcoin]) AC_MSG_CHECKING([whether to build test_bitcoin])
if test x$use_tests = xyes; then if test x$use_tests = xyes; then
AC_MSG_RESULT([yes]) AC_MSG_RESULT([yes])

37
contrib/zmq/zmq_sub.py

@ -0,0 +1,37 @@
#!/usr/bin/env python2
import array
import binascii
import zmq
port = 28332
zmqContext = zmq.Context()
zmqSubSocket = zmqContext.socket(zmq.SUB)
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashblock")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtx")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawblock")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawtx")
zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)
try:
while True:
msg = zmqSubSocket.recv_multipart()
topic = str(msg[0])
body = msg[1]
if topic == "hashblock":
print "- HASH BLOCK -"
print binascii.hexlify(body)
elif topic == "hashtx":
print '- HASH TX -'
print binascii.hexlify(body)
elif topic == "rawblock":
print "- RAW BLOCK HEADER -"
print binascii.hexlify(body[:80])
elif topic == "rawtx":
print '- RAW TX -'
print binascii.hexlify(body)
except KeyboardInterrupt:
zmqContext.destroy()

2
depends/packages/packages.mk

@ -1,4 +1,6 @@
packages:=boost openssl libevent packages:=boost openssl libevent
packages_darwin:=zeromq
packages_linux:=zeromq
native_packages := native_ccache native_comparisontool native_packages := native_ccache native_comparisontool
qt_native_packages = native_protobuf qt_native_packages = native_protobuf

26
depends/packages/zeromq.mk

@ -0,0 +1,26 @@
package=zeromq
$(package)_version=4.0.4
$(package)_download_path=http://download.zeromq.org
$(package)_file_name=$(package)-$($(package)_version).tar.gz
$(package)_sha256_hash=1ef71d46e94f33e27dd5a1661ed626cd39be4d2d6967792a275040e34457d399
define $(package)_set_vars
$(package)_config_opts=--without-documentation --disable-shared
$(package)_config_opts_linux=--with-pic
endef
define $(package)_config_cmds
$($(package)_autoconf)
endef
define $(package)_build_cmds
$(MAKE) -C src
endef
define $(package)_stage_cmds
$(MAKE) -C src DESTDIR=$($(package)_staging_dir) install
endef
define $(package)_postprocess_cmds
rm -rf bin share
endef

98
doc/zmq.md

@ -0,0 +1,98 @@
# Block and Transaction Broadcasting With ZeroMQ
[ZeroMQ](http://zeromq.org/) is a lightweight wrapper around TCP
connections, inter-process communications, and shared-memory,
providing various message-oriented semantics such as publish/subcribe,
request/reply, and push/pull.
The Bitcoin Core daemon can be configured to act as a trusted "border
router", implementing the bitcoin wire protocol and relay, making
consensus decisions, maintaining the local blockchain database,
broadcasting locally generated transactions into the network, and
providing a queryable RPC interface to interact on a polled basis for
requesting blockchain related data. However, there exists only a
limited service to notify external software of events like the arrival
of new blocks or transactions.
The ZeroMQ facility implements a notification interface through a
set of specific notifiers. Currently there are notifiers that publish
blocks and transactions. This read-only facility requires only the
connection of a corresponding ZeroMQ subscriber port in receiving
software; it is not authenticated nor is there any two-way protocol
involvement. Therefore, subscribers should validate the received data
since it may be out of date, incomplete or even invalid.
ZeroMQ sockets are self-connecting and self-healing; that is, connects
made between two endpoints will be automatically restored after an
outage, and either end may be freely started or stopped in any order.
Because ZeroMQ is message oriented, subscribers receive transactions
and blocks all-at-once and do not need to implement any sort of
buffering or reassembly.
## Prerequisites
The ZeroMQ feature in Bitcoin Core uses only a very small part of the
ZeroMQ C API, and is thus compatible with any version of ZeroMQ
from 2.1 onward, including all versions in the 3.x and 4.x release
series. Typically, it is packaged by distributions as something like
*libzmq-dev*.
The C++ wrapper for ZeroMQ is *not* needed.
## Enabling
By default, the ZeroMQ port functionality is enabled. Two steps are
required to enable--compiling in the ZeroMQ code, and configuring
runtime operation on the command-line or configuration file.
$ ./configure --enable-zmq (other options)
This will produce a binary that is capable of providing the ZeroMQ
facility, but will not do so until also configured properly.
## Usage
Currently, the following notifications are supported:
-zmqpubhashtx=address
-zmqpubhashblock=address
-zmqpubrawblock=address
-zmqpubrawtx=address
The socket type is PUB and the address must be a valid ZeroMQ
socket address. The same address can be used in more than one notification.
For instance:
$ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 -zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw
Each PUB notification has a topic and body, where the header
corresponds to the notification type. For instance, for the notification
`-zmqpubhashtx` the topic is `hashtx` (no null terminator) and the body is the
hexadecimal transaction hash (32 bytes).
These options can also be provided in bitcoin.conf.
ZeroMQ endpoint specifiers for TCP (and others) are documented in the
[ZeroMQ API](http://api.zeromq.org).
Client side, then, the ZeroMQ subscriber socket must have the
ZMQ_SUBSCRIBE option set to one or either of these prefixes (for instance, just `hash`); without
doing so will result in no messages arriving. Please see `contrib/zmq/zmq_sub.py`
for a working example.
## Remarks
From the perspective of bitcoind, the ZeroMQ socket is write-only; PUB
sockets don't even have a read function. Thus, there is no state
introduced into bitcoind directly. Furthermore, no information is
broadcast that wasn't already received from the public P2P network.
No authentication or authorization is done on connecting clients; it
is assumed that the ZeroMQ port is exposed only to trusted entities,
using other means such as firewalling.
Note that when the block chain tip changes, a reorganisation may occur and just
the tip will be notified. It is up to the subscriber to retrieve the chain
from the last known block to the new tip.

4
qa/pull-tester/rpc-tests.sh

@ -59,6 +59,10 @@ testScriptsExt=(
'p2p-acceptblock.py' 'p2p-acceptblock.py'
); );
if [ "x$ENABLE_ZMQ" = "x1" ]; then
testScripts=( ${testScripts[@]} 'zmq_test.py' )
fi
extArg="-extended" extArg="-extended"
passOn=${@#$extArg} passOn=${@#$extArg}

1
qa/pull-tester/tests-config.sh.in

@ -10,6 +10,7 @@ EXEEXT="@EXEEXT@"
@ENABLE_WALLET_TRUE@ENABLE_WALLET=1 @ENABLE_WALLET_TRUE@ENABLE_WALLET=1
@BUILD_BITCOIN_UTILS_TRUE@ENABLE_UTILS=1 @BUILD_BITCOIN_UTILS_TRUE@ENABLE_UTILS=1
@BUILD_BITCOIND_TRUE@ENABLE_BITCOIND=1 @BUILD_BITCOIND_TRUE@ENABLE_BITCOIND=1
@ENABLE_ZMQ_TRUE@ENABLE_ZMQ=1
REAL_BITCOIND="$BUILDDIR/src/bitcoind${EXEEXT}" REAL_BITCOIND="$BUILDDIR/src/bitcoind${EXEEXT}"
REAL_BITCOINCLI="$BUILDDIR/src/bitcoin-cli${EXEEXT}" REAL_BITCOINCLI="$BUILDDIR/src/bitcoin-cli${EXEEXT}"

93
qa/rpc-tests/zmq_test.py

@ -0,0 +1,93 @@
#!/usr/bin/env python2
# Copyright (c) 2015 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
#
# Test ZMQ interface
#
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import *
import zmq
import binascii
from test_framework.mininode import hash256
try:
import http.client as httplib
except ImportError:
import httplib
try:
import urllib.parse as urlparse
except ImportError:
import urlparse
class ZMQTest (BitcoinTestFramework):
port = 28332
def setup_nodes(self):
self.zmqContext = zmq.Context()
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
self.zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtx")
self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % self.port)
# Note: proxies are not used to connect to local nodes
# this is because the proxy to use is based on CService.GetNetwork(), which return NET_UNROUTABLE for localhost
return start_nodes(4, self.options.tmpdir, extra_args=[
['-zmqpubhashtx=tcp://127.0.0.1:'+str(self.port), '-zmqpubhashblock=tcp://127.0.0.1:'+str(self.port)],
[],
[],
[]
])
def run_test(self):
self.sync_all()
genhashes = self.nodes[0].generate(1);
self.sync_all()
print "listen..."
msg = self.zmqSubSocket.recv_multipart()
topic = str(msg[0])
body = msg[1]
msg = self.zmqSubSocket.recv_multipart()
topic = str(msg[0])
body = msg[1]
blkhash = binascii.hexlify(body)
assert_equal(genhashes[0], blkhash) #blockhash from generate must be equal to the hash received over zmq
n = 10
genhashes = self.nodes[1].generate(n);
self.sync_all()
zmqHashes = []
for x in range(0,n*2):
msg = self.zmqSubSocket.recv_multipart()
topic = str(msg[0])
body = msg[1]
if topic == "hashblock":
zmqHashes.append(binascii.hexlify(body))
for x in range(0,n):
assert_equal(genhashes[x], zmqHashes[x]) #blockhash from generate must be equal to the hash received over zmq
#test tx from a second node
hashRPC = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
self.sync_all()
#now we should receive a zmq msg because the tx was broadcastet
msg = self.zmqSubSocket.recv_multipart()
topic = str(msg[0])
body = msg[1]
hashZMQ = ""
if topic == "hashtx":
hashZMQ = binascii.hexlify(body)
assert_equal(hashRPC, hashZMQ) #blockhash from generate must be equal to the hash received over zmq
if __name__ == '__main__':
ZMQTest ().main ()

26
src/Makefile.am

@ -48,6 +48,9 @@ if ENABLE_WALLET
BITCOIN_INCLUDES += $(BDB_CPPFLAGS) BITCOIN_INCLUDES += $(BDB_CPPFLAGS)
EXTRA_LIBRARIES += libbitcoin_wallet.a EXTRA_LIBRARIES += libbitcoin_wallet.a
endif endif
if ENABLE_ZMQ
EXTRA_LIBRARIES += libbitcoin_zmq.a
endif
if BUILD_BITCOIN_LIBS if BUILD_BITCOIN_LIBS
lib_LTLIBRARIES = libbitcoinconsensus.la lib_LTLIBRARIES = libbitcoinconsensus.la
@ -157,7 +160,12 @@ BITCOIN_CORE_H = \
wallet/db.h \ wallet/db.h \
wallet/wallet.h \ wallet/wallet.h \
wallet/wallet_ismine.h \ wallet/wallet_ismine.h \
wallet/walletdb.h wallet/walletdb.h \
zmq/zmqabstractnotifier.h \
zmq/zmqconfig.h\
zmq/zmqnotificationinterface.h \
zmq/zmqpublishnotifier.h
obj/build.h: FORCE obj/build.h: FORCE
@$(MKDIR_P) $(builddir)/obj @$(MKDIR_P) $(builddir)/obj
@ -199,6 +207,17 @@ libbitcoin_server_a_SOURCES = \
validationinterface.cpp \ validationinterface.cpp \
$(BITCOIN_CORE_H) $(BITCOIN_CORE_H)
if ENABLE_ZMQ
LIBBITCOIN_ZMQ=libbitcoin_zmq.a
libbitcoin_zmq_a_CPPFLAGS = $(BITCOIN_INCLUDES)
libbitcoin_zmq_a_SOURCES = \
zmq/zmqabstractnotifier.cpp \
zmq/zmqnotificationinterface.cpp \
zmq/zmqpublishnotifier.cpp
endif
# wallet: shared between bitcoind and bitcoin-qt, but only linked # wallet: shared between bitcoind and bitcoin-qt, but only linked
# when wallet enabled # when wallet enabled
libbitcoin_wallet_a_CPPFLAGS = $(BITCOIN_INCLUDES) libbitcoin_wallet_a_CPPFLAGS = $(BITCOIN_INCLUDES)
@ -320,12 +339,15 @@ bitcoind_LDADD = \
$(LIBMEMENV) \ $(LIBMEMENV) \
$(LIBSECP256K1) $(LIBSECP256K1)
if ENABLE_ZMQ
bitcoind_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
endif
if ENABLE_WALLET if ENABLE_WALLET
bitcoind_LDADD += libbitcoin_wallet.a bitcoind_LDADD += libbitcoin_wallet.a
endif endif
bitcoind_LDADD += $(BOOST_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS) bitcoind_LDADD += $(BOOST_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS)
#
# bitcoin-cli binary # # bitcoin-cli binary #
bitcoin_cli_SOURCES = bitcoin-cli.cpp bitcoin_cli_SOURCES = bitcoin-cli.cpp

3
src/Makefile.qt.include

@ -361,6 +361,9 @@ qt_bitcoin_qt_LDADD = qt/libbitcoinqt.a $(LIBBITCOIN_SERVER)
if ENABLE_WALLET if ENABLE_WALLET
qt_bitcoin_qt_LDADD += $(LIBBITCOIN_WALLET) qt_bitcoin_qt_LDADD += $(LIBBITCOIN_WALLET)
endif endif
if ENABLE_ZMQ
qt_bitcoin_qt_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
endif
qt_bitcoin_qt_LDADD += $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) $(LIBMEMENV) \ qt_bitcoin_qt_LDADD += $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) $(LIBMEMENV) \
$(BOOST_LIBS) $(QT_LIBS) $(QT_DBUS_LIBS) $(QR_LIBS) $(PROTOBUF_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBSECP256K1) \ $(BOOST_LIBS) $(QT_LIBS) $(QT_DBUS_LIBS) $(QR_LIBS) $(PROTOBUF_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBSECP256K1) \
$(EVENT_PTHREADS_LIBS) $(EVENT_LIBS) $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS)

3
src/Makefile.qttest.include

@ -30,6 +30,9 @@ qt_test_test_bitcoin_qt_LDADD = $(LIBBITCOINQT) $(LIBBITCOIN_SERVER)
if ENABLE_WALLET if ENABLE_WALLET
qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_WALLET) qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_WALLET)
endif endif
if ENABLE_ZMQ
qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
endif
qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) \ qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) \
$(LIBMEMENV) $(BOOST_LIBS) $(QT_DBUS_LIBS) $(QT_TEST_LIBS) $(QT_LIBS) \ $(LIBMEMENV) $(BOOST_LIBS) $(QT_DBUS_LIBS) $(QT_TEST_LIBS) $(QT_LIBS) \
$(QR_LIBS) $(PROTOBUF_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBSECP256K1) \ $(QR_LIBS) $(PROTOBUF_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBSECP256K1) \

4
src/Makefile.test.include

@ -100,6 +100,10 @@ endif
test_test_bitcoin_LDADD += $(LIBBITCOIN_CONSENSUS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) test_test_bitcoin_LDADD += $(LIBBITCOIN_CONSENSUS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS)
test_test_bitcoin_LDFLAGS = $(RELDFLAGS) $(AM_LDFLAGS) $(LIBTOOL_APP_LDFLAGS) -static test_test_bitcoin_LDFLAGS = $(RELDFLAGS) $(AM_LDFLAGS) $(LIBTOOL_APP_LDFLAGS) -static
if ENABLE_ZMQ
test_test_bitcoin_LDADD += $(ZMQ_LIBS)
endif
nodist_test_test_bitcoin_SOURCES = $(GENERATED_TEST_FILES) nodist_test_test_bitcoin_SOURCES = $(GENERATED_TEST_FILES)
$(BITCOIN_TESTS): $(GENERATED_TEST_FILES) $(BITCOIN_TESTS): $(GENERATED_TEST_FILES)

36
src/init.cpp

@ -38,7 +38,6 @@
#include "wallet/wallet.h" #include "wallet/wallet.h"
#include "wallet/walletdb.h" #include "wallet/walletdb.h"
#endif #endif
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
@ -55,6 +54,10 @@
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <openssl/crypto.h> #include <openssl/crypto.h>
#if ENABLE_ZMQ
#include "zmq/zmqnotificationinterface.h"
#endif
using namespace std; using namespace std;
#ifdef ENABLE_WALLET #ifdef ENABLE_WALLET
@ -62,6 +65,10 @@ CWallet* pwalletMain = NULL;
#endif #endif
bool fFeeEstimatesInitialized = false; bool fFeeEstimatesInitialized = false;
#if ENABLE_ZMQ
static CZMQNotificationInterface* pzmqNotificationInterface = NULL;
#endif
#ifdef WIN32 #ifdef WIN32
// Win32 LevelDB doesn't use filedescriptors, and the ones used for // Win32 LevelDB doesn't use filedescriptors, and the ones used for
// accessing block files don't count towards the fd_set size limit // accessing block files don't count towards the fd_set size limit
@ -211,6 +218,16 @@ void Shutdown()
if (pwalletMain) if (pwalletMain)
pwalletMain->Flush(true); pwalletMain->Flush(true);
#endif #endif
#if ENABLE_ZMQ
if (pzmqNotificationInterface) {
UnregisterValidationInterface(pzmqNotificationInterface);
pzmqNotificationInterface->Shutdown();
delete pzmqNotificationInterface;
pzmqNotificationInterface = NULL;
}
#endif
#ifndef WIN32 #ifndef WIN32
try { try {
boost::filesystem::remove(GetPidFile()); boost::filesystem::remove(GetPidFile());
@ -375,6 +392,14 @@ std::string HelpMessage(HelpMessageMode mode)
" " + _("(1 = keep tx meta data e.g. account owner and payment request information, 2 = drop tx meta data)")); " " + _("(1 = keep tx meta data e.g. account owner and payment request information, 2 = drop tx meta data)"));
#endif #endif
#if ENABLE_ZMQ
strUsage += HelpMessageGroup(_("ZeroMQ notification options:"));
strUsage += HelpMessageOpt("-zmqpubhashblock=<address>", _("Enable publish hash block in <address>"));
strUsage += HelpMessageOpt("-zmqpubhashtransaction=<address>", _("Enable publish hash transaction in <address>"));
strUsage += HelpMessageOpt("-zmqpubrawblock=<address>", _("Enable publish raw block in <address>"));
strUsage += HelpMessageOpt("-zmqpubrawtransaction=<address>", _("Enable publish raw transaction in <address>"));
#endif
strUsage += HelpMessageGroup(_("Debugging/Testing options:")); strUsage += HelpMessageGroup(_("Debugging/Testing options:"));
if (showDebug) if (showDebug)
{ {
@ -1125,6 +1150,15 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler)
BOOST_FOREACH(const std::string& strDest, mapMultiArgs["-seednode"]) BOOST_FOREACH(const std::string& strDest, mapMultiArgs["-seednode"])
AddOneShot(strDest); AddOneShot(strDest);
#if ENABLE_ZMQ
pzmqNotificationInterface = CZMQNotificationInterface::CreateWithArguments(mapArgs);
if (pzmqNotificationInterface) {
pzmqNotificationInterface->Initialize();
RegisterValidationInterface(pzmqNotificationInterface);
}
#endif
// ********************************************************* Step 7: load block chain // ********************************************************* Step 7: load block chain
fReindex = GetBoolArg("-reindex", false); fReindex = GetBoolArg("-reindex", false);

1
src/main.cpp

@ -2303,6 +2303,7 @@ bool ActivateBestChain(CValidationState &state, const CBlock *pblock) {
pnode->PushInventory(CInv(MSG_BLOCK, hashNewTip)); pnode->PushInventory(CInv(MSG_BLOCK, hashNewTip));
} }
// Notify external listeners about the new tip. // Notify external listeners about the new tip.
GetMainSignals().UpdatedBlockTip(hashNewTip);
uiInterface.NotifyBlockTip(hashNewTip); uiInterface.NotifyBlockTip(hashNewTip);
} }
} while(pindexMostWork != chainActive.Tip()); } while(pindexMostWork != chainActive.Tip());

3
src/validationinterface.cpp

@ -13,6 +13,7 @@ CMainSignals& GetMainSignals()
} }
void RegisterValidationInterface(CValidationInterface* pwalletIn) { void RegisterValidationInterface(CValidationInterface* pwalletIn) {
g_signals.UpdatedBlockTip.connect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1));
g_signals.SyncTransaction.connect(boost::bind(&CValidationInterface::SyncTransaction, pwalletIn, _1, _2)); g_signals.SyncTransaction.connect(boost::bind(&CValidationInterface::SyncTransaction, pwalletIn, _1, _2));
g_signals.UpdatedTransaction.connect(boost::bind(&CValidationInterface::UpdatedTransaction, pwalletIn, _1)); g_signals.UpdatedTransaction.connect(boost::bind(&CValidationInterface::UpdatedTransaction, pwalletIn, _1));
g_signals.SetBestChain.connect(boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1)); g_signals.SetBestChain.connect(boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1));
@ -32,6 +33,7 @@ void UnregisterValidationInterface(CValidationInterface* pwalletIn) {
g_signals.SetBestChain.disconnect(boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1)); g_signals.SetBestChain.disconnect(boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1));
g_signals.UpdatedTransaction.disconnect(boost::bind(&CValidationInterface::UpdatedTransaction, pwalletIn, _1)); g_signals.UpdatedTransaction.disconnect(boost::bind(&CValidationInterface::UpdatedTransaction, pwalletIn, _1));
g_signals.SyncTransaction.disconnect(boost::bind(&CValidationInterface::SyncTransaction, pwalletIn, _1, _2)); g_signals.SyncTransaction.disconnect(boost::bind(&CValidationInterface::SyncTransaction, pwalletIn, _1, _2));
g_signals.UpdatedBlockTip.disconnect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1));
} }
void UnregisterAllValidationInterfaces() { void UnregisterAllValidationInterfaces() {
@ -43,6 +45,7 @@ void UnregisterAllValidationInterfaces() {
g_signals.SetBestChain.disconnect_all_slots(); g_signals.SetBestChain.disconnect_all_slots();
g_signals.UpdatedTransaction.disconnect_all_slots(); g_signals.UpdatedTransaction.disconnect_all_slots();
g_signals.SyncTransaction.disconnect_all_slots(); g_signals.SyncTransaction.disconnect_all_slots();
g_signals.UpdatedBlockTip.disconnect_all_slots();
} }
void SyncWithWallets(const CTransaction &tx, const CBlock *pblock) { void SyncWithWallets(const CTransaction &tx, const CBlock *pblock) {

3
src/validationinterface.h

@ -30,6 +30,7 @@ void SyncWithWallets(const CTransaction& tx, const CBlock* pblock = NULL);
class CValidationInterface { class CValidationInterface {
protected: protected:
virtual void UpdatedBlockTip(const uint256 &newHashTip) {}
virtual void SyncTransaction(const CTransaction &tx, const CBlock *pblock) {} virtual void SyncTransaction(const CTransaction &tx, const CBlock *pblock) {}
virtual void SetBestChain(const CBlockLocator &locator) {} virtual void SetBestChain(const CBlockLocator &locator) {}
virtual void UpdatedTransaction(const uint256 &hash) {} virtual void UpdatedTransaction(const uint256 &hash) {}
@ -44,6 +45,8 @@ protected:
}; };
struct CMainSignals { struct CMainSignals {
/** Notifies listeners of updated block chain tip */
boost::signals2::signal<void (const uint256 &)> UpdatedBlockTip;
/** Notifies listeners of updated transaction data (transaction, and optionally the block it is found in. */ /** Notifies listeners of updated transaction data (transaction, and optionally the block it is found in. */
boost::signals2::signal<void (const CTransaction &, const CBlock *)> SyncTransaction; boost::signals2::signal<void (const CTransaction &, const CBlock *)> SyncTransaction;
/** Notifies listeners of an updated transaction without new data (for now: a coinbase potentially becoming visible). */ /** Notifies listeners of an updated transaction without new data (for now: a coinbase potentially becoming visible). */

22
src/zmq/zmqabstractnotifier.cpp

@ -0,0 +1,22 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "zmqabstractnotifier.h"
#include "util.h"
CZMQAbstractNotifier::~CZMQAbstractNotifier()
{
assert(!psocket);
}
bool CZMQAbstractNotifier::NotifyBlock(const uint256 &/*hash*/)
{
return true;
}
bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/)
{
return true;
}

42
src/zmq/zmqabstractnotifier.h

@ -0,0 +1,42 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
#define BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
#include "zmqconfig.h"
class CZMQAbstractNotifier;
typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)();
class CZMQAbstractNotifier
{
public:
CZMQAbstractNotifier() : psocket(0) { }
virtual ~CZMQAbstractNotifier();
template <typename T>
static CZMQAbstractNotifier* Create()
{
return new T();
}
std::string GetType() const { return type; }
void SetType(const std::string &t) { type = t; }
std::string GetAddress() const { return address; }
void SetAddress(const std::string &a) { address = a; }
virtual bool Initialize(void *pcontext) = 0;
virtual void Shutdown() = 0;
virtual bool NotifyBlock(const uint256 &hash);
virtual bool NotifyTransaction(const CTransaction &transaction);
protected:
void *psocket;
std::string type;
std::string address;
};
#endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H

24
src/zmq/zmqconfig.h

@ -0,0 +1,24 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_ZMQ_ZMQCONFIG_H
#define BITCOIN_ZMQ_ZMQCONFIG_H
#if defined(HAVE_CONFIG_H)
#include "config/bitcoin-config.h"
#endif
#include <stdarg.h>
#include <string>
#if ENABLE_ZMQ
#include <zmq.h>
#endif
#include "primitives/block.h"
#include "primitives/transaction.h"
void zmqError(const char *str);
#endif // BITCOIN_ZMQ_ZMQCONFIG_H

155
src/zmq/zmqnotificationinterface.cpp

@ -0,0 +1,155 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "zmqnotificationinterface.h"
#include "zmqpublishnotifier.h"
#include "version.h"
#include "main.h"
#include "streams.h"
#include "util.h"
void zmqError(const char *str)
{
LogPrint("zmq", "Error: %s, errno=%s\n", str, zmq_strerror(errno));
}
CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL)
{
}
CZMQNotificationInterface::~CZMQNotificationInterface()
{
// ensure Shutdown if Initialize is called
assert(!pcontext);
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
{
delete *i;
}
}
CZMQNotificationInterface* CZMQNotificationInterface::CreateWithArguments(const std::map<std::string, std::string> &args)
{
CZMQNotificationInterface* notificationInterface = NULL;
std::map<std::string, CZMQNotifierFactory> factories;
std::list<CZMQAbstractNotifier*> notifiers;
factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
for (std::map<std::string, CZMQNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i)
{
std::map<std::string, std::string>::const_iterator j = args.find("-zmq" + i->first);
if (j!=args.end())
{
CZMQNotifierFactory factory = i->second;
std::string address = j->second;
CZMQAbstractNotifier *notifier = factory();
notifier->SetType(i->first);
notifier->SetAddress(address);
notifiers.push_back(notifier);
}
}
if (!notifiers.empty())
{
notificationInterface = new CZMQNotificationInterface();
notificationInterface->notifiers = notifiers;
}
return notificationInterface;
}
// Called at startup to conditionally set up ZMQ socket(s)
bool CZMQNotificationInterface::Initialize()
{
LogPrint("zmq", "Initialize notification interface\n");
assert(!pcontext);
pcontext = zmq_init(1);
if (!pcontext)
{
zmqError("Unable to initialize context");
return false;
}
std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin();
for (; i!=notifiers.end(); ++i)
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->Initialize(pcontext))
{
LogPrint("zmq", " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
}
else
{
LogPrint("zmq", " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
break;
}
}
if (i!=notifiers.end())
{
Shutdown();
return false;
}
return false;
}
// Called during shutdown sequence
void CZMQNotificationInterface::Shutdown()
{
LogPrint("zmq", "Shutdown notification interface\n");
if (pcontext)
{
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
{
CZMQAbstractNotifier *notifier = *i;
LogPrint("zmq", " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
notifier->Shutdown();
}
zmq_ctx_destroy(pcontext);
pcontext = 0;
}
}
void CZMQNotificationInterface::UpdatedBlockTip(const uint256 &hash)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyBlock(hash))
{
i++;
}
else
{
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}
void CZMQNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyTransaction(tx))
{
i++;
}
else
{
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}

35
src/zmq/zmqnotificationinterface.h

@ -0,0 +1,35 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
#define BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
#include "validationinterface.h"
#include <string>
#include <map>
class CZMQAbstractNotifier;
class CZMQNotificationInterface : public CValidationInterface
{
public:
virtual ~CZMQNotificationInterface();
static CZMQNotificationInterface* CreateWithArguments(const std::map<std::string, std::string> &args);
bool Initialize();
void Shutdown();
protected: // CValidationInterface
void SyncTransaction(const CTransaction &tx, const CBlock *pblock);
void UpdatedBlockTip(const uint256 &newHashTip);
private:
CZMQNotificationInterface();
void *pcontext;
std::list<CZMQAbstractNotifier*> notifiers;
};
#endif // BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H

172
src/zmq/zmqpublishnotifier.cpp

@ -0,0 +1,172 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "zmqpublishnotifier.h"
#include "main.h"
#include "util.h"
static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
// Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
{
va_list args;
va_start(args, size);
while (1)
{
zmq_msg_t msg;
int rc = zmq_msg_init_size(&msg, size);
if (rc != 0)
{
zmqError("Unable to initialize ZMQ msg");
return -1;
}
void *buf = zmq_msg_data(&msg);
memcpy(buf, data, size);
data = va_arg(args, const void*);
rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
if (rc == -1)
{
zmqError("Unable to send ZMQ msg");
zmq_msg_close(&msg);
return -1;
}
zmq_msg_close(&msg);
if (!data)
break;
size = va_arg(args, size_t);
}
return 0;
}
bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
{
assert(!psocket);
// check if address is being used by other publish notifier
std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
if (i==mapPublishNotifiers.end())
{
psocket = zmq_socket(pcontext, ZMQ_PUB);
if (!psocket)
{
zmqError("Failed to create socket");
return false;
}
int rc = zmq_bind(psocket, address.c_str());
if (rc!=0)
{
zmqError("Failed to bind address");
return false;
}
// register this notifier for the address, so it can be reused for other publish notifier
mapPublishNotifiers.insert(std::make_pair(address, this));
return true;
}
else
{
LogPrint("zmq", " Reuse socket for address %s\n", address);
psocket = i->second->psocket;
mapPublishNotifiers.insert(std::make_pair(address, this));
return true;
}
}
void CZMQAbstractPublishNotifier::Shutdown()
{
assert(psocket);
int count = mapPublishNotifiers.count(address);
// remove this notifier from the list of publishers using this address
typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
for (iterator it = iterpair.first; it != iterpair.second; ++it)
{
if (it->second==this)
{
mapPublishNotifiers.erase(it);
break;
}
}
if (count == 1)
{
LogPrint("zmq", "Close socket at address %s\n", address);
int linger = 0;
zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
zmq_close(psocket);
}
psocket = 0;
}
bool CZMQPublishHashBlockNotifier::NotifyBlock(const uint256 &hash)
{
LogPrint("zmq", "Publish hash block %s\n", hash.GetHex());
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0);
return rc == 0;
}
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
LogPrint("zmq", "Publish hash transaction %s\n", hash.GetHex());
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0);
return rc == 0;
}
bool CZMQPublishRawBlockNotifier::NotifyBlock(const uint256 &hash)
{
LogPrint("zmq", "Publish raw block %s\n", hash.GetHex());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
{
LOCK(cs_main);
CBlock block;
CBlockIndex* pblockindex = mapBlockIndex[hash];
if(!ReadBlockFromDisk(block, pblockindex))
{
zmqError("Can't read block from disk");
return false;
}
ss << block;
}
int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0);
return rc == 0;
}
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
LogPrint("zmq", "Publish raw transaction %s\n", hash.GetHex());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << transaction;
int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0);
return rc == 0;
}

41
src/zmq/zmqpublishnotifier.h

@ -0,0 +1,41 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
#define BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
#include "zmqabstractnotifier.h"
class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
{
public:
bool Initialize(void *pcontext);
void Shutdown();
};
class CZMQPublishHashBlockNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyBlock(const uint256 &hash);
};
class CZMQPublishHashTransactionNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyTransaction(const CTransaction &transaction);
};
class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyBlock(const uint256 &hash);
};
class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyTransaction(const CTransaction &transaction);
};
#endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
Loading…
Cancel
Save