Browse Source

Merge pull request #9 from kevacoin-project/zmq

Implemented Keva ZMQ notification.
idb-fix
The Kevacoin Project 5 years ago committed by GitHub
parent
commit
5472bdf51a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      doc/zmq.md
  2. 34
      src/keva/main.cpp
  3. 19
      src/keva/main.h
  4. 7
      src/test/keva_tests.cpp
  5. 3
      src/validation.cpp
  6. 32
      src/validationinterface.cpp
  7. 24
      src/validationinterface.h
  8. 5
      src/zmq/zmqabstractnotifier.cpp
  9. 5
      src/zmq/zmqabstractnotifier.h
  10. 47
      src/zmq/zmqnotificationinterface.cpp
  11. 7
      src/zmq/zmqnotificationinterface.h
  12. 27
      src/zmq/zmqpublishnotifier.cpp
  13. 9
      src/zmq/zmqpublishnotifier.h

1
doc/zmq.md

@ -60,6 +60,7 @@ Currently, the following notifications are supported:
-zmqpubhashblock=address -zmqpubhashblock=address
-zmqpubrawblock=address -zmqpubrawblock=address
-zmqpubrawtx=address -zmqpubrawtx=address
-zmqpubkeva=address
The socket type is PUB and the address must be a valid ZeroMQ socket The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification. address. The same address can be used in more than one notification.

34
src/keva/main.cpp

@ -7,6 +7,7 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php. // file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <keva/main.h> #include <keva/main.h>
#include "base58.h"
#include <chainparams.h> #include <chainparams.h>
#include <coins.h> #include <coins.h>
@ -20,6 +21,7 @@
#include <util.h> #include <util.h>
#include <utilstrencodings.h> #include <utilstrencodings.h>
#include <validation.h> #include <validation.h>
#include <validationinterface.h>
/* ************************************************************************** */ /* ************************************************************************** */
@ -228,6 +230,33 @@ CNameConflictTracker::AddConflictedEntry (CTransactionRef txRemoved)
/* ************************************************************************** */ /* ************************************************************************** */
CKevaNotifier::CKevaNotifier(CMainSignals* signals) {
this->signals = signals;
}
void CKevaNotifier::KevaNamespaceCreated(const CTransaction& tx, unsigned nHeight, const std::string& nameSpace) {
if (signals) {
CTransactionRef ptx = MakeTransactionRef(tx);
signals->KevaNamespaceCreated(ptx, nHeight, nameSpace);
}
}
void CKevaNotifier::KevaUpdated(const CTransaction& tx, unsigned nHeight, const std::string& nameSpace, const std::string& key, const std::string& value) {
if (signals) {
CTransactionRef ptx = MakeTransactionRef(tx);
signals->KevaUpdated(ptx, nHeight, nameSpace, key, value);
}
}
void CKevaNotifier::KevaDeleted(const CTransaction& tx, unsigned nHeight, const std::string& nameSpace, const std::string& key) {
if (signals) {
CTransactionRef ptx = MakeTransactionRef(tx);
signals->KevaDeleted(ptx, nHeight, nameSpace, key);
}
}
/* ************************************************************************** */
bool bool
CheckKevaTransaction (const CTransaction& tx, unsigned nHeight, CheckKevaTransaction (const CTransaction& tx, unsigned nHeight,
const CCoinsView& view, const CCoinsView& view,
@ -345,7 +374,7 @@ CheckKevaTransaction (const CTransaction& tx, unsigned nHeight,
} }
void ApplyKevaTransaction(const CTransaction& tx, unsigned nHeight, void ApplyKevaTransaction(const CTransaction& tx, unsigned nHeight,
CCoinsViewCache& view, CBlockUndo& undo) CCoinsViewCache& view, CBlockUndo& undo, CKevaNotifier& notifier)
{ {
assert (nHeight != MEMPOOL_HEIGHT); assert (nHeight != MEMPOOL_HEIGHT);
if (!tx.IsKevacoin()) if (!tx.IsKevacoin())
@ -374,6 +403,7 @@ void ApplyKevaTransaction(const CTransaction& tx, unsigned nHeight,
CKevaData data; CKevaData data;
data.fromScript(nHeight, COutPoint(tx.GetHash(), i), op); data.fromScript(nHeight, COutPoint(tx.GetHash(), i), op);
view.SetName(nameSpace, key, data, false); view.SetName(nameSpace, key, data, false);
notifier.KevaNamespaceCreated(tx, nHeight, EncodeBase58Check(nameSpace));
} else if (op.isAnyUpdate()) { } else if (op.isAnyUpdate()) {
const valtype& nameSpace = op.getOpNamespace(); const valtype& nameSpace = op.getOpNamespace();
const valtype& key = op.getOpKey(); const valtype& key = op.getOpKey();
@ -389,10 +419,12 @@ void ApplyKevaTransaction(const CTransaction& tx, unsigned nHeight,
CKevaData oldData; CKevaData oldData;
if (view.GetName(nameSpace, key, oldData)) { if (view.GetName(nameSpace, key, oldData)) {
view.DeleteName(nameSpace, key); view.DeleteName(nameSpace, key);
notifier.KevaDeleted(tx, nHeight, EncodeBase58Check(nameSpace), ValtypeToString(key));
} }
} else { } else {
data.fromScript(nHeight, COutPoint(tx.GetHash(), i), op); data.fromScript(nHeight, COutPoint(tx.GetHash(), i), op);
view.SetName(nameSpace, key, data, false); view.SetName(nameSpace, key, data, false);
notifier.KevaUpdated(tx, nHeight, EncodeBase58Check(nameSpace), ValtypeToString(key), ValtypeToString(data.getValue()));
} }
} }
} }

19
src/keva/main.h

@ -28,6 +28,8 @@ class CCoinsViewCache;
class CTxMemPool; class CTxMemPool;
class CTxMemPoolEntry; class CTxMemPoolEntry;
class CValidationState; class CValidationState;
class CMainSignals;
class CKevaNotifier;
typedef std::vector<unsigned char> valtype; typedef std::vector<unsigned char> valtype;
@ -217,6 +219,21 @@ public:
}; };
/**
* Notify Keva transactions. This is for ZMQ notification.
*/
class CKevaNotifier
{
private:
CMainSignals* signals;
public:
CKevaNotifier(CMainSignals*);
void KevaNamespaceCreated(const CTransaction& tx, unsigned nHeight, const std::string& nameSpace);
void KevaUpdated(const CTransaction& tx, unsigned nHeight, const std::string& nameSpace, const std::string& key, const std::string& value);
void KevaDeleted(const CTransaction& tx, unsigned nHeight, const std::string& nameSpace, const std::string& key);
};
/* ************************************************************************** */ /* ************************************************************************** */
/** /**
@ -242,7 +259,7 @@ bool CheckKevaTransaction (const CTransaction& tx, unsigned nHeight,
* @param undo Record undo information here. * @param undo Record undo information here.
*/ */
void ApplyKevaTransaction (const CTransaction& tx, unsigned nHeight, void ApplyKevaTransaction (const CTransaction& tx, unsigned nHeight,
CCoinsViewCache& view, CBlockUndo& undo); CCoinsViewCache& view, CBlockUndo& undo, CKevaNotifier& notifier);
/** /**
* Expire all names at the given height. This removes their coins * Expire all names at the given height. This removes their coins

7
src/test/keva_tests.cpp

@ -716,6 +716,7 @@ BOOST_AUTO_TEST_CASE(keva_updates_undo)
CCoinsViewCache view(&dummyView); CCoinsViewCache view(&dummyView);
CBlockUndo undo; CBlockUndo undo;
CKevaData data; CKevaData data;
CKevaNotifier kevaNotifier(NULL);
const CScript scrNew = CKevaScript::buildKevaNamespace(addr, nameSpace, displayName); const CScript scrNew = CKevaScript::buildKevaNamespace(addr, nameSpace, displayName);
const CScript scr1_1 = CKevaScript::buildKevaPut(addr, nameSpace, key1, value1_old); const CScript scr1_1 = CKevaScript::buildKevaPut(addr, nameSpace, key1, value1_old);
@ -730,14 +731,14 @@ BOOST_AUTO_TEST_CASE(keva_updates_undo)
CMutableTransaction mtx; CMutableTransaction mtx;
mtx.SetKevacoin(); mtx.SetKevacoin();
mtx.vout.push_back(CTxOut(COIN, scrNew)); mtx.vout.push_back(CTxOut(COIN, scrNew));
ApplyKevaTransaction(mtx, 100, view, undo); ApplyKevaTransaction(mtx, 100, view, undo, kevaNotifier);
BOOST_CHECK(!view.GetName(nameSpace, key1, data)); BOOST_CHECK(!view.GetName(nameSpace, key1, data));
BOOST_CHECK(view.GetNamespace(nameSpace, data)); BOOST_CHECK(view.GetNamespace(nameSpace, data));
BOOST_CHECK(undo.vkevaundo.size() == 1); BOOST_CHECK(undo.vkevaundo.size() == 1);
mtx.vout.clear(); mtx.vout.clear();
mtx.vout.push_back(CTxOut(COIN, scr1_1)); mtx.vout.push_back(CTxOut(COIN, scr1_1));
ApplyKevaTransaction(mtx, 200, view, undo); ApplyKevaTransaction(mtx, 200, view, undo, kevaNotifier);
BOOST_CHECK(view.GetName(nameSpace, key1, data)); BOOST_CHECK(view.GetName(nameSpace, key1, data));
BOOST_CHECK(data.getHeight() == 200); BOOST_CHECK(data.getHeight() == 200);
BOOST_CHECK(data.getValue() == value1_old); BOOST_CHECK(data.getValue() == value1_old);
@ -747,7 +748,7 @@ BOOST_AUTO_TEST_CASE(keva_updates_undo)
mtx.vout.clear(); mtx.vout.clear();
mtx.vout.push_back(CTxOut(COIN, scr1_2)); mtx.vout.push_back(CTxOut(COIN, scr1_2));
ApplyKevaTransaction(mtx, 300, view, undo); ApplyKevaTransaction(mtx, 300, view, undo, kevaNotifier);
BOOST_CHECK(view.GetName(nameSpace, key1, data)); BOOST_CHECK(view.GetName(nameSpace, key1, data));
BOOST_CHECK(data.getHeight() == 300); BOOST_CHECK(data.getHeight() == 300);
BOOST_CHECK(data.getValue() == value1_new); BOOST_CHECK(data.getValue() == value1_new);

3
src/validation.cpp

@ -2022,7 +2022,8 @@ bool CChainState::ConnectBlock(const CBlock& block, CValidationState& state, CBl
blockundo.vtxundo.push_back(CTxUndo()); blockundo.vtxundo.push_back(CTxUndo());
} }
UpdateCoins(tx, view, i == 0 ? undoDummy : blockundo.vtxundo.back(), pindex->nHeight); UpdateCoins(tx, view, i == 0 ? undoDummy : blockundo.vtxundo.back(), pindex->nHeight);
ApplyKevaTransaction(tx, pindex->nHeight, view, blockundo); CKevaNotifier kevaNotifier(&GetMainSignals());
ApplyKevaTransaction(tx, pindex->nHeight, view, blockundo, kevaNotifier);
} }
int64_t nTime3 = GetTimeMicros(); nTimeConnect += nTime3 - nTime2; int64_t nTime3 = GetTimeMicros(); nTimeConnect += nTime3 - nTime2;
LogPrint(BCLog::BENCH, " - Connect %u transactions: %.2fms (%.3fms/tx, %.3fms/txin) [%.2fs (%.2fms/blk)]\n", (unsigned)block.vtx.size(), MILLI * (nTime3 - nTime2), MILLI * (nTime3 - nTime2) / block.vtx.size(), nInputs <= 1 ? 0 : MILLI * (nTime3 - nTime2) / (nInputs-1), nTimeConnect * MICRO, nTimeConnect * MILLI / nBlocksTotal); LogPrint(BCLog::BENCH, " - Connect %u transactions: %.2fms (%.3fms/tx, %.3fms/txin) [%.2fs (%.2fms/blk)]\n", (unsigned)block.vtx.size(), MILLI * (nTime3 - nTime2), MILLI * (nTime3 - nTime2) / block.vtx.size(), nInputs <= 1 ? 0 : MILLI * (nTime3 - nTime2) / (nInputs-1), nTimeConnect * MICRO, nTimeConnect * MILLI / nBlocksTotal);

32
src/validationinterface.cpp

@ -30,6 +30,11 @@ struct MainSignalsInstance {
boost::signals2::signal<void (const CBlock&, const CValidationState&)> BlockChecked; boost::signals2::signal<void (const CBlock&, const CValidationState&)> BlockChecked;
boost::signals2::signal<void (const CBlockIndex *, const std::shared_ptr<const CBlock>&)> NewPoWValidBlock; boost::signals2::signal<void (const CBlockIndex *, const std::shared_ptr<const CBlock>&)> NewPoWValidBlock;
/** Keva related */
boost::signals2::signal<void (const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace)> KevaNamespaceCreated;
boost::signals2::signal<void (const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key, const std::string& value)> KevaUpdated;
boost::signals2::signal<void (const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key)> KevaDeleted;
// We are not allowed to assume the scheduler only runs in one thread, // We are not allowed to assume the scheduler only runs in one thread,
// but must ensure all callbacks happen in-order, so we end up creating // but must ensure all callbacks happen in-order, so we end up creating
// our own queue here :( // our own queue here :(
@ -83,6 +88,11 @@ void RegisterValidationInterface(CValidationInterface* pwalletIn) {
g_signals.m_internals->Broadcast.connect(boost::bind(&CValidationInterface::ResendWalletTransactions, pwalletIn, _1, _2)); g_signals.m_internals->Broadcast.connect(boost::bind(&CValidationInterface::ResendWalletTransactions, pwalletIn, _1, _2));
g_signals.m_internals->BlockChecked.connect(boost::bind(&CValidationInterface::BlockChecked, pwalletIn, _1, _2)); g_signals.m_internals->BlockChecked.connect(boost::bind(&CValidationInterface::BlockChecked, pwalletIn, _1, _2));
g_signals.m_internals->NewPoWValidBlock.connect(boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2)); g_signals.m_internals->NewPoWValidBlock.connect(boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2));
/** Keva related */
g_signals.m_internals->KevaNamespaceCreated.connect(boost::bind(&CValidationInterface::KevaNamespaceCreated, pwalletIn, _1, _2, _3));
g_signals.m_internals->KevaUpdated.connect(boost::bind(&CValidationInterface::KevaUpdated, pwalletIn, _1, _2, _3, _4, _5));
g_signals.m_internals->KevaDeleted.connect(boost::bind(&CValidationInterface::KevaDeleted, pwalletIn, _1, _2, _3, _4));
} }
void UnregisterValidationInterface(CValidationInterface* pwalletIn) { void UnregisterValidationInterface(CValidationInterface* pwalletIn) {
@ -95,6 +105,11 @@ void UnregisterValidationInterface(CValidationInterface* pwalletIn) {
g_signals.m_internals->TransactionRemovedFromMempool.disconnect(boost::bind(&CValidationInterface::TransactionRemovedFromMempool, pwalletIn, _1)); g_signals.m_internals->TransactionRemovedFromMempool.disconnect(boost::bind(&CValidationInterface::TransactionRemovedFromMempool, pwalletIn, _1));
g_signals.m_internals->UpdatedBlockTip.disconnect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3)); g_signals.m_internals->UpdatedBlockTip.disconnect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3));
g_signals.m_internals->NewPoWValidBlock.disconnect(boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2)); g_signals.m_internals->NewPoWValidBlock.disconnect(boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2));
/** Keva related */
g_signals.m_internals->KevaNamespaceCreated.disconnect(boost::bind(&CValidationInterface::KevaNamespaceCreated, pwalletIn, _1, _2, _3));
g_signals.m_internals->KevaUpdated.disconnect(boost::bind(&CValidationInterface::KevaUpdated, pwalletIn, _1, _2, _3, _4, _5));
g_signals.m_internals->KevaDeleted.disconnect(boost::bind(&CValidationInterface::KevaDeleted, pwalletIn, _1, _2, _3, _4));
} }
void UnregisterAllValidationInterfaces() { void UnregisterAllValidationInterfaces() {
@ -110,6 +125,11 @@ void UnregisterAllValidationInterfaces() {
g_signals.m_internals->TransactionRemovedFromMempool.disconnect_all_slots(); g_signals.m_internals->TransactionRemovedFromMempool.disconnect_all_slots();
g_signals.m_internals->UpdatedBlockTip.disconnect_all_slots(); g_signals.m_internals->UpdatedBlockTip.disconnect_all_slots();
g_signals.m_internals->NewPoWValidBlock.disconnect_all_slots(); g_signals.m_internals->NewPoWValidBlock.disconnect_all_slots();
/** Keva related */
g_signals.m_internals->KevaNamespaceCreated.disconnect_all_slots();
g_signals.m_internals->KevaUpdated.disconnect_all_slots();
g_signals.m_internals->KevaDeleted.disconnect_all_slots();
} }
void CallFunctionInValidationInterfaceQueue(std::function<void ()> func) { void CallFunctionInValidationInterfaceQueue(std::function<void ()> func) {
@ -179,3 +199,15 @@ void CMainSignals::BlockChecked(const CBlock& block, const CValidationState& sta
void CMainSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock> &block) { void CMainSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock> &block) {
m_internals->NewPoWValidBlock(pindex, block); m_internals->NewPoWValidBlock(pindex, block);
} }
void CMainSignals::KevaNamespaceCreated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace) {
m_internals->KevaNamespaceCreated(ptx, height, nameSpace);
}
void CMainSignals::KevaUpdated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key, const std::string& value) {
m_internals->KevaUpdated(ptx, height, nameSpace, key, value);
}
void CMainSignals::KevaDeleted(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key) {
m_internals->KevaDeleted(ptx, height, nameSpace, key);
}

24
src/validationinterface.h

@ -110,6 +110,25 @@ protected:
* callback was generated (not necessarily now) * callback was generated (not necessarily now)
*/ */
virtual void BlockChecked(const CBlock&, const CValidationState&) {} virtual void BlockChecked(const CBlock&, const CValidationState&) {}
/**
* Keva related interface.
* Notifies listeners of a new namespace.
*/
virtual void KevaNamespaceCreated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace) {}
/**
* Keva related interface.
* Notifies listeners of a key creation or update.
*/
virtual void KevaUpdated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key, const std::string& value) {}
/**
* Keva related interface.
* Notifies listeners of a key creation or update.
*/
virtual void KevaDeleted(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key) {}
/** /**
* Notifies listeners that a block which builds directly on our current tip * Notifies listeners that a block which builds directly on our current tip
* has been received and connected to the headers tree, though not validated yet */ * has been received and connected to the headers tree, though not validated yet */
@ -154,6 +173,11 @@ public:
void Broadcast(int64_t nBestBlockTime, CConnman* connman); void Broadcast(int64_t nBestBlockTime, CConnman* connman);
void BlockChecked(const CBlock&, const CValidationState&); void BlockChecked(const CBlock&, const CValidationState&);
void NewPoWValidBlock(const CBlockIndex *, const std::shared_ptr<const CBlock>&); void NewPoWValidBlock(const CBlockIndex *, const std::shared_ptr<const CBlock>&);
/** Keva related */
void KevaNamespaceCreated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace);
void KevaUpdated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key, const std::string& value);
void KevaDeleted(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key);
}; };
CMainSignals& GetMainSignals(); CMainSignals& GetMainSignals();

5
src/zmq/zmqabstractnotifier.cpp

@ -20,3 +20,8 @@ bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/
{ {
return true; return true;
} }
bool CZMQAbstractNotifier::NotifyKeva(const CTransactionRef &ptx, unsigned int height, unsigned int type, const std::string& nameSpace, const std::string& key, const std::string& value)
{
return true;
}

5
src/zmq/zmqabstractnotifier.h

@ -35,6 +35,11 @@ public:
virtual bool NotifyBlock(const CBlockIndex *pindex); virtual bool NotifyBlock(const CBlockIndex *pindex);
virtual bool NotifyTransaction(const CTransaction &transaction); virtual bool NotifyTransaction(const CTransaction &transaction);
virtual bool NotifyKeva(const CTransactionRef &ptx, unsigned int height, unsigned int type,
const std::string& nameSpace,
const std::string& key = std::string(),
const std::string& value = std::string());
protected: protected:
void *psocket; void *psocket;
std::string type; std::string type;

47
src/zmq/zmqnotificationinterface.cpp

@ -10,6 +10,10 @@
#include <streams.h> #include <streams.h>
#include <util.h> #include <util.h>
const static unsigned int KEVA_TYPE_CREATE_NAMESPACE = 0;
const static unsigned int KEVA_TYPE_UPDATE_KEY = 1;
const static unsigned int KEVA_TYPE_DELETE_KEY = 2;
void zmqError(const char *str) void zmqError(const char *str)
{ {
LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno)); LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno));
@ -48,6 +52,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>; factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>; factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>; factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
factories["pubkeva"] = CZMQAbstractNotifier::Create<CZMQPublishKevaNotifier>;
for (const auto& entry : factories) for (const auto& entry : factories)
{ {
@ -190,4 +195,46 @@ void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CB
} }
} }
void CZMQNotificationInterface::KevaNamespaceCreated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyKeva(ptx, height, KEVA_TYPE_CREATE_NAMESPACE, nameSpace)) {
i++;
} else {
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}
void CZMQNotificationInterface::KevaUpdated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key, const std::string& value)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyKeva(ptx, height, KEVA_TYPE_UPDATE_KEY, nameSpace, key, value)) {
i++;
} else {
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}
void CZMQNotificationInterface::KevaDeleted(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyKeva(ptx, height, KEVA_TYPE_DELETE_KEY, nameSpace, key)) {
i++;
} else {
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}
CZMQNotificationInterface* g_zmq_notification_interface = nullptr; CZMQNotificationInterface* g_zmq_notification_interface = nullptr;

7
src/zmq/zmqnotificationinterface.h

@ -32,6 +32,13 @@ protected:
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock) override; void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
/**
* Keva related interface.
*/
void KevaNamespaceCreated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace) override;
void KevaUpdated(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key, const std::string& value) override;
void KevaDeleted(const CTransactionRef &ptx, unsigned int height, const std::string& nameSpace, const std::string& key) override;
private: private:
CZMQNotificationInterface(); CZMQNotificationInterface();

27
src/zmq/zmqpublishnotifier.cpp

@ -2,6 +2,7 @@
// Distributed under the MIT software license, see the accompanying // Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php. // file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <univalue.h>
#include <chain.h> #include <chain.h>
#include <chainparams.h> #include <chainparams.h>
#include <streams.h> #include <streams.h>
@ -16,6 +17,7 @@ static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHTX = "hashtx"; static const char *MSG_HASHTX = "hashtx";
static const char *MSG_RAWBLOCK = "rawblock"; static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWTX = "rawtx"; static const char *MSG_RAWTX = "rawtx";
static const char *MSG_KEVA = "keva";
// Internal function to send multipart message // Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
@ -195,3 +197,28 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr
ss << transaction; ss << transaction;
return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
} }
bool CZMQPublishKevaNotifier::NotifyKeva(const CTransactionRef &ptx, unsigned int height, unsigned int type, const std::string& nameSpace, const std::string& key, const std::string& value)
{
uint256 hash = ptx->GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish keva height: %d, tx: %s\n", height, ptx->GetHash().ToString().c_str());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
ss << height << *ptx << nameSpace;
UniValue entry(UniValue::VOBJ);
entry.pushKV("tx", hash.ToString());
entry.pushKV("height", (int)height);
entry.pushKV("type", (int)type);
entry.pushKV("namespace", nameSpace);
if (key.size() > 0) {
entry.pushKV("key", key);
}
if (value.size() > 0) {
entry.pushKV("value", key);
}
std::string data = entry.write(0, 0);
return SendMessage(MSG_KEVA, &(*data.begin()), data.size());
}

9
src/zmq/zmqpublishnotifier.h

@ -52,4 +52,13 @@ public:
bool NotifyTransaction(const CTransaction &transaction) override; bool NotifyTransaction(const CTransaction &transaction) override;
}; };
class CZMQPublishKevaNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyKeva(const CTransactionRef &ptx, unsigned int height, unsigned int type,
const std::string& nameSpace,
const std::string& key = std::string(),
const std::string& value = std::string()) override;
};
#endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H #endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H

Loading…
Cancel
Save