From 1dd2c67eb7eada2693c3e52788d5e19d94e302f9 Mon Sep 17 00:00:00 2001 From: Just Wonder Date: Thu, 30 Jan 2020 18:15:12 -0800 Subject: [PATCH] Implemented Keva ZMQ notification. --- doc/zmq.md | 1 + src/keva/main.cpp | 34 +++++++++++++++++++- src/keva/main.h | 19 ++++++++++- src/test/keva_tests.cpp | 7 +++-- src/validation.cpp | 3 +- src/validationinterface.cpp | 32 +++++++++++++++++++ src/validationinterface.h | 24 ++++++++++++++ src/zmq/zmqabstractnotifier.cpp | 5 +++ src/zmq/zmqabstractnotifier.h | 5 +++ src/zmq/zmqnotificationinterface.cpp | 47 ++++++++++++++++++++++++++++ src/zmq/zmqnotificationinterface.h | 7 +++++ src/zmq/zmqpublishnotifier.cpp | 27 ++++++++++++++++ src/zmq/zmqpublishnotifier.h | 9 ++++++ 13 files changed, 214 insertions(+), 6 deletions(-) diff --git a/doc/zmq.md b/doc/zmq.md index cc8d3541f..e0acc3c7c 100644 --- a/doc/zmq.md +++ b/doc/zmq.md @@ -60,6 +60,7 @@ Currently, the following notifications are supported: -zmqpubhashblock=address -zmqpubrawblock=address -zmqpubrawtx=address + -zmqpubkeva=address The socket type is PUB and the address must be a valid ZeroMQ socket address. The same address can be used in more than one notification. diff --git a/src/keva/main.cpp b/src/keva/main.cpp index 96b9ade7a..3160060ae 100644 --- a/src/keva/main.cpp +++ b/src/keva/main.cpp @@ -7,6 +7,7 @@ // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include +#include "base58.h" #include #include @@ -20,6 +21,7 @@ #include #include #include +#include /* ************************************************************************** */ @@ -228,6 +230,33 @@ CNameConflictTracker::AddConflictedEntry (CTransactionRef txRemoved) /* ************************************************************************** */ +CKevaNotifier::CKevaNotifier(CMainSignals* signals) { + this->signals = signals; +} + +void CKevaNotifier::KevaNamespaceCreated(const CTransaction& tx, unsigned nHeight, const std::string& nameSpace) { + if (signals) { + CTransactionRef ptx = MakeTransactionRef(tx); + signals->KevaNamespaceCreated(ptx, nHeight, nameSpace); + } +} + +void CKevaNotifier::KevaUpdated(const CTransaction& tx, unsigned nHeight, const std::string& nameSpace, const std::string& key, const std::string& value) { + if (signals) { + CTransactionRef ptx = MakeTransactionRef(tx); + signals->KevaUpdated(ptx, nHeight, nameSpace, key, value); + } +} + +void CKevaNotifier::KevaDeleted(const CTransaction& tx, unsigned nHeight, const std::string& nameSpace, const std::string& key) { + if (signals) { + CTransactionRef ptx = MakeTransactionRef(tx); + signals->KevaDeleted(ptx, nHeight, nameSpace, key); + } +} + +/* ************************************************************************** */ + bool CheckKevaTransaction (const CTransaction& tx, unsigned nHeight, const CCoinsView& view, @@ -345,7 +374,7 @@ CheckKevaTransaction (const CTransaction& tx, unsigned nHeight, } void ApplyKevaTransaction(const CTransaction& tx, unsigned nHeight, - CCoinsViewCache& view, CBlockUndo& undo) + CCoinsViewCache& view, CBlockUndo& undo, CKevaNotifier& notifier) { assert (nHeight != MEMPOOL_HEIGHT); if (!tx.IsKevacoin()) @@ -374,6 +403,7 @@ void ApplyKevaTransaction(const CTransaction& tx, unsigned nHeight, CKevaData data; data.fromScript(nHeight, COutPoint(tx.GetHash(), i), op); view.SetName(nameSpace, key, data, false); + notifier.KevaNamespaceCreated(tx, nHeight, EncodeBase58Check(nameSpace)); } else if (op.isAnyUpdate()) { const valtype& nameSpace = op.getOpNamespace(); const valtype& key = op.getOpKey(); @@ -389,10 +419,12 @@ void ApplyKevaTransaction(const CTransaction& tx, unsigned nHeight, CKevaData oldData; if (view.GetName(nameSpace, key, oldData)) { view.DeleteName(nameSpace, key); + notifier.KevaDeleted(tx, nHeight, EncodeBase58Check(nameSpace), ValtypeToString(key)); } } else { data.fromScript(nHeight, COutPoint(tx.GetHash(), i), op); view.SetName(nameSpace, key, data, false); + notifier.KevaUpdated(tx, nHeight, EncodeBase58Check(nameSpace), ValtypeToString(key), ValtypeToString(data.getValue())); } } } diff --git a/src/keva/main.h b/src/keva/main.h index 0de11816f..ee367a3fc 100644 --- a/src/keva/main.h +++ b/src/keva/main.h @@ -28,6 +28,8 @@ class CCoinsViewCache; class CTxMemPool; class CTxMemPoolEntry; class CValidationState; +class CMainSignals; +class CKevaNotifier; typedef std::vector valtype; @@ -217,6 +219,21 @@ public: }; +/** + * Notify Keva transactions. This is for ZMQ notification. + */ +class CKevaNotifier +{ +private: + CMainSignals* signals; + +public: + CKevaNotifier(CMainSignals*); + void KevaNamespaceCreated(const CTransaction& tx, unsigned nHeight, const std::string& nameSpace); + void KevaUpdated(const CTransaction& tx, unsigned nHeight, const std::string& nameSpace, const std::string& key, const std::string& value); + void KevaDeleted(const CTransaction& tx, unsigned nHeight, const std::string& nameSpace, const std::string& key); +}; + /* ************************************************************************** */ /** @@ -242,7 +259,7 @@ bool CheckKevaTransaction (const CTransaction& tx, unsigned nHeight, * @param undo Record undo information here. */ void ApplyKevaTransaction (const CTransaction& tx, unsigned nHeight, - CCoinsViewCache& view, CBlockUndo& undo); + CCoinsViewCache& view, CBlockUndo& undo, CKevaNotifier& notifier); /** * Expire all names at the given height. This removes their coins diff --git a/src/test/keva_tests.cpp b/src/test/keva_tests.cpp index 18e68547c..97101ece2 100644 --- a/src/test/keva_tests.cpp +++ b/src/test/keva_tests.cpp @@ -716,6 +716,7 @@ BOOST_AUTO_TEST_CASE(keva_updates_undo) CCoinsViewCache view(&dummyView); CBlockUndo undo; CKevaData data; + CKevaNotifier kevaNotifier(NULL); const CScript scrNew = CKevaScript::buildKevaNamespace(addr, nameSpace, displayName); const CScript scr1_1 = CKevaScript::buildKevaPut(addr, nameSpace, key1, value1_old); @@ -730,14 +731,14 @@ BOOST_AUTO_TEST_CASE(keva_updates_undo) CMutableTransaction mtx; mtx.SetKevacoin(); mtx.vout.push_back(CTxOut(COIN, scrNew)); - ApplyKevaTransaction(mtx, 100, view, undo); + ApplyKevaTransaction(mtx, 100, view, undo, kevaNotifier); BOOST_CHECK(!view.GetName(nameSpace, key1, data)); BOOST_CHECK(view.GetNamespace(nameSpace, data)); BOOST_CHECK(undo.vkevaundo.size() == 1); mtx.vout.clear(); mtx.vout.push_back(CTxOut(COIN, scr1_1)); - ApplyKevaTransaction(mtx, 200, view, undo); + ApplyKevaTransaction(mtx, 200, view, undo, kevaNotifier); BOOST_CHECK(view.GetName(nameSpace, key1, data)); BOOST_CHECK(data.getHeight() == 200); BOOST_CHECK(data.getValue() == value1_old); @@ -747,7 +748,7 @@ BOOST_AUTO_TEST_CASE(keva_updates_undo) mtx.vout.clear(); mtx.vout.push_back(CTxOut(COIN, scr1_2)); - ApplyKevaTransaction(mtx, 300, view, undo); + ApplyKevaTransaction(mtx, 300, view, undo, kevaNotifier); BOOST_CHECK(view.GetName(nameSpace, key1, data)); BOOST_CHECK(data.getHeight() == 300); BOOST_CHECK(data.getValue() == value1_new); diff --git a/src/validation.cpp b/src/validation.cpp index 8097630e7..5fd7b2900 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -2022,7 +2022,8 @@ bool CChainState::ConnectBlock(const CBlock& block, CValidationState& state, CBl blockundo.vtxundo.push_back(CTxUndo()); } UpdateCoins(tx, view, i == 0 ? undoDummy : blockundo.vtxundo.back(), pindex->nHeight); - ApplyKevaTransaction(tx, pindex->nHeight, view, blockundo); + CKevaNotifier kevaNotifier(&GetMainSignals()); + ApplyKevaTransaction(tx, pindex->nHeight, view, blockundo, kevaNotifier); } int64_t nTime3 = GetTimeMicros(); nTimeConnect += nTime3 - nTime2; LogPrint(BCLog::BENCH, " - Connect %u transactions: %.2fms (%.3fms/tx, %.3fms/txin) [%.2fs (%.2fms/blk)]\n", (unsigned)block.vtx.size(), MILLI * (nTime3 - nTime2), MILLI * (nTime3 - nTime2) / block.vtx.size(), nInputs <= 1 ? 0 : MILLI * (nTime3 - nTime2) / (nInputs-1), nTimeConnect * MICRO, nTimeConnect * MILLI / nBlocksTotal); diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index 90513bc6c..a99c2eb1b 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -30,6 +30,11 @@ struct MainSignalsInstance { boost::signals2::signal BlockChecked; boost::signals2::signal&)> NewPoWValidBlock; + /** Keva related */ + boost::signals2::signal KevaNamespaceCreated; + boost::signals2::signal KevaUpdated; + boost::signals2::signal KevaDeleted; + // We are not allowed to assume the scheduler only runs in one thread, // but must ensure all callbacks happen in-order, so we end up creating // our own queue here :( @@ -83,6 +88,11 @@ void RegisterValidationInterface(CValidationInterface* pwalletIn) { g_signals.m_internals->Broadcast.connect(boost::bind(&CValidationInterface::ResendWalletTransactions, pwalletIn, _1, _2)); g_signals.m_internals->BlockChecked.connect(boost::bind(&CValidationInterface::BlockChecked, pwalletIn, _1, _2)); g_signals.m_internals->NewPoWValidBlock.connect(boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2)); + + /** Keva related */ + g_signals.m_internals->KevaNamespaceCreated.connect(boost::bind(&CValidationInterface::KevaNamespaceCreated, pwalletIn, _1, _2, _3)); + g_signals.m_internals->KevaUpdated.connect(boost::bind(&CValidationInterface::KevaUpdated, pwalletIn, _1, _2, _3, _4, _5)); + g_signals.m_internals->KevaDeleted.connect(boost::bind(&CValidationInterface::KevaDeleted, pwalletIn, _1, _2, _3, _4)); } void UnregisterValidationInterface(CValidationInterface* pwalletIn) { @@ -95,6 +105,11 @@ void UnregisterValidationInterface(CValidationInterface* pwalletIn) { g_signals.m_internals->TransactionRemovedFromMempool.disconnect(boost::bind(&CValidationInterface::TransactionRemovedFromMempool, pwalletIn, _1)); g_signals.m_internals->UpdatedBlockTip.disconnect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3)); g_signals.m_internals->NewPoWValidBlock.disconnect(boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2)); + + /** Keva related */ + g_signals.m_internals->KevaNamespaceCreated.disconnect(boost::bind(&CValidationInterface::KevaNamespaceCreated, pwalletIn, _1, _2, _3)); + g_signals.m_internals->KevaUpdated.disconnect(boost::bind(&CValidationInterface::KevaUpdated, pwalletIn, _1, _2, _3, _4, _5)); + g_signals.m_internals->KevaDeleted.disconnect(boost::bind(&CValidationInterface::KevaDeleted, pwalletIn, _1, _2, _3, _4)); } void UnregisterAllValidationInterfaces() { @@ -110,6 +125,11 @@ void UnregisterAllValidationInterfaces() { g_signals.m_internals->TransactionRemovedFromMempool.disconnect_all_slots(); g_signals.m_internals->UpdatedBlockTip.disconnect_all_slots(); g_signals.m_internals->NewPoWValidBlock.disconnect_all_slots(); + + /** Keva related */ + g_signals.m_internals->KevaNamespaceCreated.disconnect_all_slots(); + g_signals.m_internals->KevaUpdated.disconnect_all_slots(); + g_signals.m_internals->KevaDeleted.disconnect_all_slots(); } void CallFunctionInValidationInterfaceQueue(std::function func) { @@ -179,3 +199,15 @@ void CMainSignals::BlockChecked(const CBlock& block, const CValidationState& sta void CMainSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr &block) { m_internals->NewPoWValidBlock(pindex, block); } + +void CMainSignals::KevaNamespaceCreated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace) { + m_internals->KevaNamespaceCreated(ptx, height, nameSpace); +} + +void CMainSignals::KevaUpdated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key, const std::string& value) { + m_internals->KevaUpdated(ptx, height, nameSpace, key, value); +} + +void CMainSignals::KevaDeleted(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key) { + m_internals->KevaDeleted(ptx, height, nameSpace, key); +} diff --git a/src/validationinterface.h b/src/validationinterface.h index 784757a28..a20d09287 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -110,6 +110,25 @@ protected: * callback was generated (not necessarily now) */ virtual void BlockChecked(const CBlock&, const CValidationState&) {} + + /** + * Keva related interface. + * Notifies listeners of a new namespace. + */ + virtual void KevaNamespaceCreated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace) {} + + /** + * Keva related interface. + * Notifies listeners of a key creation or update. + */ + virtual void KevaUpdated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key, const std::string& value) {} + + /** + * Keva related interface. + * Notifies listeners of a key creation or update. + */ + virtual void KevaDeleted(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key) {} + /** * Notifies listeners that a block which builds directly on our current tip * has been received and connected to the headers tree, though not validated yet */ @@ -154,6 +173,11 @@ public: void Broadcast(int64_t nBestBlockTime, CConnman* connman); void BlockChecked(const CBlock&, const CValidationState&); void NewPoWValidBlock(const CBlockIndex *, const std::shared_ptr&); + + /** Keva related */ + void KevaNamespaceCreated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace); + void KevaUpdated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key, const std::string& value); + void KevaDeleted(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key); }; CMainSignals& GetMainSignals(); diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp index fc1ff6d03..23dca8ce8 100644 --- a/src/zmq/zmqabstractnotifier.cpp +++ b/src/zmq/zmqabstractnotifier.cpp @@ -20,3 +20,8 @@ bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/ { return true; } + +bool CZMQAbstractNotifier::NotifyKeva(const CTransactionRef &ptx, unsigned int height, unsigned int type, const std::string& nameSpace, const std::string& key, const std::string& value) +{ + return true; +} diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h index 7270ae203..9b1e5ab74 100644 --- a/src/zmq/zmqabstractnotifier.h +++ b/src/zmq/zmqabstractnotifier.h @@ -35,6 +35,11 @@ public: virtual bool NotifyBlock(const CBlockIndex *pindex); virtual bool NotifyTransaction(const CTransaction &transaction); + virtual bool NotifyKeva(const CTransactionRef &ptx, unsigned int height, unsigned int type, + const std::string& nameSpace, + const std::string& key = std::string(), + const std::string& value = std::string()); + protected: void *psocket; std::string type; diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index a24b61ac8..099b6a41b 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -10,6 +10,10 @@ #include #include +const static unsigned int KEVA_TYPE_CREATE_NAMESPACE = 0; +const static unsigned int KEVA_TYPE_UPDATE_KEY = 1; +const static unsigned int KEVA_TYPE_DELETE_KEY = 2; + void zmqError(const char *str) { LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno)); @@ -48,6 +52,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create() factories["pubhashtx"] = CZMQAbstractNotifier::Create; factories["pubrawblock"] = CZMQAbstractNotifier::Create; factories["pubrawtx"] = CZMQAbstractNotifier::Create; + factories["pubkeva"] = CZMQAbstractNotifier::Create; for (const auto& entry : factories) { @@ -190,4 +195,46 @@ void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr::iterator i = notifiers.begin(); i!=notifiers.end(); ) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->NotifyKeva(ptx, height, KEVA_TYPE_CREATE_NAMESPACE, nameSpace)) { + i++; + } else { + notifier->Shutdown(); + i = notifiers.erase(i); + } + } +} + +void CZMQNotificationInterface::KevaUpdated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key, const std::string& value) +{ + for (std::list::iterator i = notifiers.begin(); i!=notifiers.end(); ) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->NotifyKeva(ptx, height, KEVA_TYPE_UPDATE_KEY, nameSpace, key, value)) { + i++; + } else { + notifier->Shutdown(); + i = notifiers.erase(i); + } + } +} + +void CZMQNotificationInterface::KevaDeleted(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key) +{ + for (std::list::iterator i = notifiers.begin(); i!=notifiers.end(); ) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->NotifyKeva(ptx, height, KEVA_TYPE_DELETE_KEY, nameSpace, key)) { + i++; + } else { + notifier->Shutdown(); + i = notifiers.erase(i); + } + } +} + CZMQNotificationInterface* g_zmq_notification_interface = nullptr; diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h index fa36cf27f..30a450547 100644 --- a/src/zmq/zmqnotificationinterface.h +++ b/src/zmq/zmqnotificationinterface.h @@ -32,6 +32,13 @@ protected: void BlockDisconnected(const std::shared_ptr& pblock) override; void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; + /** + * Keva related interface. + */ + void KevaNamespaceCreated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace) override; + void KevaUpdated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key, const std::string& value) override; + void KevaDeleted(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key) override; + private: CZMQNotificationInterface(); diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index 8c9acef25..0a7b56e47 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -2,6 +2,7 @@ // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. +#include #include #include #include @@ -16,6 +17,7 @@ static const char *MSG_HASHBLOCK = "hashblock"; static const char *MSG_HASHTX = "hashtx"; static const char *MSG_RAWBLOCK = "rawblock"; static const char *MSG_RAWTX = "rawtx"; +static const char *MSG_KEVA = "keva"; // Internal function to send multipart message static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) @@ -195,3 +197,28 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr ss << transaction; return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); } + +bool CZMQPublishKevaNotifier::NotifyKeva(const CTransactionRef &ptx, unsigned int height, unsigned int type, const std::string& nameSpace, const std::string& key, const std::string& value) +{ + uint256 hash = ptx->GetHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish keva height: %d, tx: %s\n", height, ptx->GetHash().ToString().c_str()); + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); + ss << height << *ptx << nameSpace; + + UniValue entry(UniValue::VOBJ); + entry.pushKV("tx", hash.ToString()); + entry.pushKV("height", (int)height); + entry.pushKV("type", (int)type); + entry.pushKV("namespace", nameSpace); + + if (key.size() > 0) { + entry.pushKV("key", key); + } + + if (value.size() > 0) { + entry.pushKV("value", key); + } + + std::string data = entry.write(0, 0); + return SendMessage(MSG_KEVA, &(*data.begin()), data.size()); +} diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h index d53bba997..b92f3e219 100644 --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -52,4 +52,13 @@ public: bool NotifyTransaction(const CTransaction &transaction) override; }; +class CZMQPublishKevaNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyKeva(const CTransactionRef &ptx, unsigned int height, unsigned int type, + const std::string& nameSpace, + const std::string& key = std::string(), + const std::string& value = std::string()) override; +}; + #endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H