Implemented Keva ZMQ notification.

This commit is contained in:
Just Wonder 2020-01-30 18:15:12 -08:00
parent 2a90a04efb
commit 1dd2c67eb7
13 changed files with 214 additions and 6 deletions

View File

@ -60,6 +60,7 @@ Currently, the following notifications are supported:
-zmqpubhashblock=address
-zmqpubrawblock=address
-zmqpubrawtx=address
-zmqpubkeva=address
The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification.

View File

@ -7,6 +7,7 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <keva/main.h>
#include "base58.h"
#include <chainparams.h>
#include <coins.h>
@ -20,6 +21,7 @@
#include <util.h>
#include <utilstrencodings.h>
#include <validation.h>
#include <validationinterface.h>
/* ************************************************************************** */
@ -228,6 +230,33 @@ CNameConflictTracker::AddConflictedEntry (CTransactionRef txRemoved)
/* ************************************************************************** */
CKevaNotifier::CKevaNotifier(CMainSignals* signals) {
this->signals = signals;
}
void CKevaNotifier::KevaNamespaceCreated(const CTransaction& tx, unsigned nHeight, const std::string& nameSpace) {
if (signals) {
CTransactionRef ptx = MakeTransactionRef(tx);
signals->KevaNamespaceCreated(ptx, nHeight, nameSpace);
}
}
void CKevaNotifier::KevaUpdated(const CTransaction& tx, unsigned nHeight, const std::string& nameSpace, const std::string& key, const std::string& value) {
if (signals) {
CTransactionRef ptx = MakeTransactionRef(tx);
signals->KevaUpdated(ptx, nHeight, nameSpace, key, value);
}
}
void CKevaNotifier::KevaDeleted(const CTransaction& tx, unsigned nHeight, const std::string& nameSpace, const std::string& key) {
if (signals) {
CTransactionRef ptx = MakeTransactionRef(tx);
signals->KevaDeleted(ptx, nHeight, nameSpace, key);
}
}
/* ************************************************************************** */
bool
CheckKevaTransaction (const CTransaction& tx, unsigned nHeight,
const CCoinsView& view,
@ -345,7 +374,7 @@ CheckKevaTransaction (const CTransaction& tx, unsigned nHeight,
}
void ApplyKevaTransaction(const CTransaction& tx, unsigned nHeight,
CCoinsViewCache& view, CBlockUndo& undo)
CCoinsViewCache& view, CBlockUndo& undo, CKevaNotifier& notifier)
{
assert (nHeight != MEMPOOL_HEIGHT);
if (!tx.IsKevacoin())
@ -374,6 +403,7 @@ void ApplyKevaTransaction(const CTransaction& tx, unsigned nHeight,
CKevaData data;
data.fromScript(nHeight, COutPoint(tx.GetHash(), i), op);
view.SetName(nameSpace, key, data, false);
notifier.KevaNamespaceCreated(tx, nHeight, EncodeBase58Check(nameSpace));
} else if (op.isAnyUpdate()) {
const valtype& nameSpace = op.getOpNamespace();
const valtype& key = op.getOpKey();
@ -389,10 +419,12 @@ void ApplyKevaTransaction(const CTransaction& tx, unsigned nHeight,
CKevaData oldData;
if (view.GetName(nameSpace, key, oldData)) {
view.DeleteName(nameSpace, key);
notifier.KevaDeleted(tx, nHeight, EncodeBase58Check(nameSpace), ValtypeToString(key));
}
} else {
data.fromScript(nHeight, COutPoint(tx.GetHash(), i), op);
view.SetName(nameSpace, key, data, false);
notifier.KevaUpdated(tx, nHeight, EncodeBase58Check(nameSpace), ValtypeToString(key), ValtypeToString(data.getValue()));
}
}
}

View File

@ -28,6 +28,8 @@ class CCoinsViewCache;
class CTxMemPool;
class CTxMemPoolEntry;
class CValidationState;
class CMainSignals;
class CKevaNotifier;
typedef std::vector<unsigned char> valtype;
@ -217,6 +219,21 @@ public:
};
/**
* Notify Keva transactions. This is for ZMQ notification.
*/
class CKevaNotifier
{
private:
CMainSignals* signals;
public:
CKevaNotifier(CMainSignals*);
void KevaNamespaceCreated(const CTransaction& tx, unsigned nHeight, const std::string& nameSpace);
void KevaUpdated(const CTransaction& tx, unsigned nHeight, const std::string& nameSpace, const std::string& key, const std::string& value);
void KevaDeleted(const CTransaction& tx, unsigned nHeight, const std::string& nameSpace, const std::string& key);
};
/* ************************************************************************** */
/**
@ -242,7 +259,7 @@ bool CheckKevaTransaction (const CTransaction& tx, unsigned nHeight,
* @param undo Record undo information here.
*/
void ApplyKevaTransaction (const CTransaction& tx, unsigned nHeight,
CCoinsViewCache& view, CBlockUndo& undo);
CCoinsViewCache& view, CBlockUndo& undo, CKevaNotifier& notifier);
/**
* Expire all names at the given height. This removes their coins

View File

@ -716,6 +716,7 @@ BOOST_AUTO_TEST_CASE(keva_updates_undo)
CCoinsViewCache view(&dummyView);
CBlockUndo undo;
CKevaData data;
CKevaNotifier kevaNotifier(NULL);
const CScript scrNew = CKevaScript::buildKevaNamespace(addr, nameSpace, displayName);
const CScript scr1_1 = CKevaScript::buildKevaPut(addr, nameSpace, key1, value1_old);
@ -730,14 +731,14 @@ BOOST_AUTO_TEST_CASE(keva_updates_undo)
CMutableTransaction mtx;
mtx.SetKevacoin();
mtx.vout.push_back(CTxOut(COIN, scrNew));
ApplyKevaTransaction(mtx, 100, view, undo);
ApplyKevaTransaction(mtx, 100, view, undo, kevaNotifier);
BOOST_CHECK(!view.GetName(nameSpace, key1, data));
BOOST_CHECK(view.GetNamespace(nameSpace, data));
BOOST_CHECK(undo.vkevaundo.size() == 1);
mtx.vout.clear();
mtx.vout.push_back(CTxOut(COIN, scr1_1));
ApplyKevaTransaction(mtx, 200, view, undo);
ApplyKevaTransaction(mtx, 200, view, undo, kevaNotifier);
BOOST_CHECK(view.GetName(nameSpace, key1, data));
BOOST_CHECK(data.getHeight() == 200);
BOOST_CHECK(data.getValue() == value1_old);
@ -747,7 +748,7 @@ BOOST_AUTO_TEST_CASE(keva_updates_undo)
mtx.vout.clear();
mtx.vout.push_back(CTxOut(COIN, scr1_2));
ApplyKevaTransaction(mtx, 300, view, undo);
ApplyKevaTransaction(mtx, 300, view, undo, kevaNotifier);
BOOST_CHECK(view.GetName(nameSpace, key1, data));
BOOST_CHECK(data.getHeight() == 300);
BOOST_CHECK(data.getValue() == value1_new);

View File

@ -2022,7 +2022,8 @@ bool CChainState::ConnectBlock(const CBlock& block, CValidationState& state, CBl
blockundo.vtxundo.push_back(CTxUndo());
}
UpdateCoins(tx, view, i == 0 ? undoDummy : blockundo.vtxundo.back(), pindex->nHeight);
ApplyKevaTransaction(tx, pindex->nHeight, view, blockundo);
CKevaNotifier kevaNotifier(&GetMainSignals());
ApplyKevaTransaction(tx, pindex->nHeight, view, blockundo, kevaNotifier);
}
int64_t nTime3 = GetTimeMicros(); nTimeConnect += nTime3 - nTime2;
LogPrint(BCLog::BENCH, " - Connect %u transactions: %.2fms (%.3fms/tx, %.3fms/txin) [%.2fs (%.2fms/blk)]\n", (unsigned)block.vtx.size(), MILLI * (nTime3 - nTime2), MILLI * (nTime3 - nTime2) / block.vtx.size(), nInputs <= 1 ? 0 : MILLI * (nTime3 - nTime2) / (nInputs-1), nTimeConnect * MICRO, nTimeConnect * MILLI / nBlocksTotal);

View File

@ -30,6 +30,11 @@ struct MainSignalsInstance {
boost::signals2::signal<void (const CBlock&, const CValidationState&)> BlockChecked;
boost::signals2::signal<void (const CBlockIndex *, const std::shared_ptr<const CBlock>&)> NewPoWValidBlock;
/** Keva related */
boost::signals2::signal<void (const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace)> KevaNamespaceCreated;
boost::signals2::signal<void (const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key, const std::string& value)> KevaUpdated;
boost::signals2::signal<void (const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key)> KevaDeleted;
// We are not allowed to assume the scheduler only runs in one thread,
// but must ensure all callbacks happen in-order, so we end up creating
// our own queue here :(
@ -83,6 +88,11 @@ void RegisterValidationInterface(CValidationInterface* pwalletIn) {
g_signals.m_internals->Broadcast.connect(boost::bind(&CValidationInterface::ResendWalletTransactions, pwalletIn, _1, _2));
g_signals.m_internals->BlockChecked.connect(boost::bind(&CValidationInterface::BlockChecked, pwalletIn, _1, _2));
g_signals.m_internals->NewPoWValidBlock.connect(boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2));
/** Keva related */
g_signals.m_internals->KevaNamespaceCreated.connect(boost::bind(&CValidationInterface::KevaNamespaceCreated, pwalletIn, _1, _2, _3));
g_signals.m_internals->KevaUpdated.connect(boost::bind(&CValidationInterface::KevaUpdated, pwalletIn, _1, _2, _3, _4, _5));
g_signals.m_internals->KevaDeleted.connect(boost::bind(&CValidationInterface::KevaDeleted, pwalletIn, _1, _2, _3, _4));
}
void UnregisterValidationInterface(CValidationInterface* pwalletIn) {
@ -95,6 +105,11 @@ void UnregisterValidationInterface(CValidationInterface* pwalletIn) {
g_signals.m_internals->TransactionRemovedFromMempool.disconnect(boost::bind(&CValidationInterface::TransactionRemovedFromMempool, pwalletIn, _1));
g_signals.m_internals->UpdatedBlockTip.disconnect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3));
g_signals.m_internals->NewPoWValidBlock.disconnect(boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2));
/** Keva related */
g_signals.m_internals->KevaNamespaceCreated.disconnect(boost::bind(&CValidationInterface::KevaNamespaceCreated, pwalletIn, _1, _2, _3));
g_signals.m_internals->KevaUpdated.disconnect(boost::bind(&CValidationInterface::KevaUpdated, pwalletIn, _1, _2, _3, _4, _5));
g_signals.m_internals->KevaDeleted.disconnect(boost::bind(&CValidationInterface::KevaDeleted, pwalletIn, _1, _2, _3, _4));
}
void UnregisterAllValidationInterfaces() {
@ -110,6 +125,11 @@ void UnregisterAllValidationInterfaces() {
g_signals.m_internals->TransactionRemovedFromMempool.disconnect_all_slots();
g_signals.m_internals->UpdatedBlockTip.disconnect_all_slots();
g_signals.m_internals->NewPoWValidBlock.disconnect_all_slots();
/** Keva related */
g_signals.m_internals->KevaNamespaceCreated.disconnect_all_slots();
g_signals.m_internals->KevaUpdated.disconnect_all_slots();
g_signals.m_internals->KevaDeleted.disconnect_all_slots();
}
void CallFunctionInValidationInterfaceQueue(std::function<void ()> func) {
@ -179,3 +199,15 @@ void CMainSignals::BlockChecked(const CBlock& block, const CValidationState& sta
void CMainSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock> &block) {
m_internals->NewPoWValidBlock(pindex, block);
}
void CMainSignals::KevaNamespaceCreated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace) {
m_internals->KevaNamespaceCreated(ptx, height, nameSpace);
}
void CMainSignals::KevaUpdated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key, const std::string& value) {
m_internals->KevaUpdated(ptx, height, nameSpace, key, value);
}
void CMainSignals::KevaDeleted(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key) {
m_internals->KevaDeleted(ptx, height, nameSpace, key);
}

View File

@ -110,6 +110,25 @@ protected:
* callback was generated (not necessarily now)
*/
virtual void BlockChecked(const CBlock&, const CValidationState&) {}
/**
* Keva related interface.
* Notifies listeners of a new namespace.
*/
virtual void KevaNamespaceCreated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace) {}
/**
* Keva related interface.
* Notifies listeners of a key creation or update.
*/
virtual void KevaUpdated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key, const std::string& value) {}
/**
* Keva related interface.
* Notifies listeners of a key creation or update.
*/
virtual void KevaDeleted(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key) {}
/**
* Notifies listeners that a block which builds directly on our current tip
* has been received and connected to the headers tree, though not validated yet */
@ -154,6 +173,11 @@ public:
void Broadcast(int64_t nBestBlockTime, CConnman* connman);
void BlockChecked(const CBlock&, const CValidationState&);
void NewPoWValidBlock(const CBlockIndex *, const std::shared_ptr<const CBlock>&);
/** Keva related */
void KevaNamespaceCreated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace);
void KevaUpdated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key, const std::string& value);
void KevaDeleted(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key);
};
CMainSignals& GetMainSignals();

View File

@ -20,3 +20,8 @@ bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/
{
return true;
}
bool CZMQAbstractNotifier::NotifyKeva(const CTransactionRef &ptx, unsigned int height, unsigned int type, const std::string& nameSpace, const std::string& key, const std::string& value)
{
return true;
}

View File

@ -35,6 +35,11 @@ public:
virtual bool NotifyBlock(const CBlockIndex *pindex);
virtual bool NotifyTransaction(const CTransaction &transaction);
virtual bool NotifyKeva(const CTransactionRef &ptx, unsigned int height, unsigned int type,
const std::string& nameSpace,
const std::string& key = std::string(),
const std::string& value = std::string());
protected:
void *psocket;
std::string type;

View File

@ -10,6 +10,10 @@
#include <streams.h>
#include <util.h>
const static unsigned int KEVA_TYPE_CREATE_NAMESPACE = 0;
const static unsigned int KEVA_TYPE_UPDATE_KEY = 1;
const static unsigned int KEVA_TYPE_DELETE_KEY = 2;
void zmqError(const char *str)
{
LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno));
@ -48,6 +52,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
factories["pubkeva"] = CZMQAbstractNotifier::Create<CZMQPublishKevaNotifier>;
for (const auto& entry : factories)
{
@ -190,4 +195,46 @@ void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CB
}
}
void CZMQNotificationInterface::KevaNamespaceCreated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyKeva(ptx, height, KEVA_TYPE_CREATE_NAMESPACE, nameSpace)) {
i++;
} else {
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}
void CZMQNotificationInterface::KevaUpdated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key, const std::string& value)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyKeva(ptx, height, KEVA_TYPE_UPDATE_KEY, nameSpace, key, value)) {
i++;
} else {
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}
void CZMQNotificationInterface::KevaDeleted(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyKeva(ptx, height, KEVA_TYPE_DELETE_KEY, nameSpace, key)) {
i++;
} else {
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}
CZMQNotificationInterface* g_zmq_notification_interface = nullptr;

View File

@ -32,6 +32,13 @@ protected:
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
/**
* Keva related interface.
*/
void KevaNamespaceCreated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace) override;
void KevaUpdated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key, const std::string& value) override;
void KevaDeleted(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key) override;
private:
CZMQNotificationInterface();

View File

@ -2,6 +2,7 @@
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <univalue.h>
#include <chain.h>
#include <chainparams.h>
#include <streams.h>
@ -16,6 +17,7 @@ static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHTX = "hashtx";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWTX = "rawtx";
static const char *MSG_KEVA = "keva";
// Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
@ -195,3 +197,28 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr
ss << transaction;
return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
}
bool CZMQPublishKevaNotifier::NotifyKeva(const CTransactionRef &ptx, unsigned int height, unsigned int type, const std::string& nameSpace, const std::string& key, const std::string& value)
{
uint256 hash = ptx->GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish keva height: %d, tx: %s\n", height, ptx->GetHash().ToString().c_str());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
ss << height << *ptx << nameSpace;
UniValue entry(UniValue::VOBJ);
entry.pushKV("tx", hash.ToString());
entry.pushKV("height", (int)height);
entry.pushKV("type", (int)type);
entry.pushKV("namespace", nameSpace);
if (key.size() > 0) {
entry.pushKV("key", key);
}
if (value.size() > 0) {
entry.pushKV("value", key);
}
std::string data = entry.write(0, 0);
return SendMessage(MSG_KEVA, &(*data.begin()), data.size());
}

View File

@ -52,4 +52,13 @@ public:
bool NotifyTransaction(const CTransaction &transaction) override;
};
class CZMQPublishKevaNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyKeva(const CTransactionRef &ptx, unsigned int height, unsigned int type,
const std::string& nameSpace,
const std::string& key = std::string(),
const std::string& value = std::string()) override;
};
#endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H