1 // Copyright (c) 2015-2016 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 #include "zmqnotificationinterface.h"
6 #include "zmqpublishnotifier.h"
9 #include "validation.h"
13 void zmqError(const char *str
)
15 LogPrint(BCLog::ZMQ
, "zmq: Error: %s, errno=%s\n", str
, zmq_strerror(errno
));
18 CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(nullptr)
22 CZMQNotificationInterface::~CZMQNotificationInterface()
26 for (std::list
<CZMQAbstractNotifier
*>::iterator i
=notifiers
.begin(); i
!=notifiers
.end(); ++i
)
32 CZMQNotificationInterface
* CZMQNotificationInterface::Create()
34 CZMQNotificationInterface
* notificationInterface
= nullptr;
35 std::map
<std::string
, CZMQNotifierFactory
> factories
;
36 std::list
<CZMQAbstractNotifier
*> notifiers
;
38 factories
["pubhashblock"] = CZMQAbstractNotifier::Create
<CZMQPublishHashBlockNotifier
>;
39 factories
["pubhashtx"] = CZMQAbstractNotifier::Create
<CZMQPublishHashTransactionNotifier
>;
40 factories
["pubrawblock"] = CZMQAbstractNotifier::Create
<CZMQPublishRawBlockNotifier
>;
41 factories
["pubrawtx"] = CZMQAbstractNotifier::Create
<CZMQPublishRawTransactionNotifier
>;
43 for (std::map
<std::string
, CZMQNotifierFactory
>::const_iterator i
=factories
.begin(); i
!=factories
.end(); ++i
)
45 std::string
arg("-zmq" + i
->first
);
46 if (gArgs
.IsArgSet(arg
))
48 CZMQNotifierFactory factory
= i
->second
;
49 std::string address
= gArgs
.GetArg(arg
, "");
50 CZMQAbstractNotifier
*notifier
= factory();
51 notifier
->SetType(i
->first
);
52 notifier
->SetAddress(address
);
53 notifiers
.push_back(notifier
);
57 if (!notifiers
.empty())
59 notificationInterface
= new CZMQNotificationInterface();
60 notificationInterface
->notifiers
= notifiers
;
62 if (!notificationInterface
->Initialize())
64 delete notificationInterface
;
65 notificationInterface
= nullptr;
69 return notificationInterface
;
72 // Called at startup to conditionally set up ZMQ socket(s)
73 bool CZMQNotificationInterface::Initialize()
75 LogPrint(BCLog::ZMQ
, "zmq: Initialize notification interface\n");
78 pcontext
= zmq_init(1);
82 zmqError("Unable to initialize context");
86 std::list
<CZMQAbstractNotifier
*>::iterator i
=notifiers
.begin();
87 for (; i
!=notifiers
.end(); ++i
)
89 CZMQAbstractNotifier
*notifier
= *i
;
90 if (notifier
->Initialize(pcontext
))
92 LogPrint(BCLog::ZMQ
, " Notifier %s ready (address = %s)\n", notifier
->GetType(), notifier
->GetAddress());
96 LogPrint(BCLog::ZMQ
, " Notifier %s failed (address = %s)\n", notifier
->GetType(), notifier
->GetAddress());
101 if (i
!=notifiers
.end())
109 // Called during shutdown sequence
110 void CZMQNotificationInterface::Shutdown()
112 LogPrint(BCLog::ZMQ
, "zmq: Shutdown notification interface\n");
115 for (std::list
<CZMQAbstractNotifier
*>::iterator i
=notifiers
.begin(); i
!=notifiers
.end(); ++i
)
117 CZMQAbstractNotifier
*notifier
= *i
;
118 LogPrint(BCLog::ZMQ
, " Shutdown notifier %s at %s\n", notifier
->GetType(), notifier
->GetAddress());
119 notifier
->Shutdown();
121 zmq_ctx_destroy(pcontext
);
127 void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex
*pindexNew
, const CBlockIndex
*pindexFork
, bool fInitialDownload
)
129 if (fInitialDownload
|| pindexNew
== pindexFork
) // In IBD or blocks were disconnected without any new ones
132 for (std::list
<CZMQAbstractNotifier
*>::iterator i
= notifiers
.begin(); i
!=notifiers
.end(); )
134 CZMQAbstractNotifier
*notifier
= *i
;
135 if (notifier
->NotifyBlock(pindexNew
))
141 notifier
->Shutdown();
142 i
= notifiers
.erase(i
);
147 void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef
& ptx
)
149 // Used by BlockConnected and BlockDisconnected as well, because they're
150 // all the same external callback.
151 const CTransaction
& tx
= *ptx
;
153 for (std::list
<CZMQAbstractNotifier
*>::iterator i
= notifiers
.begin(); i
!=notifiers
.end(); )
155 CZMQAbstractNotifier
*notifier
= *i
;
156 if (notifier
->NotifyTransaction(tx
))
162 notifier
->Shutdown();
163 i
= notifiers
.erase(i
);
168 void CZMQNotificationInterface::BlockConnected(const std::shared_ptr
<const CBlock
>& pblock
, const CBlockIndex
* pindexConnected
, const std::vector
<CTransactionRef
>& vtxConflicted
)
170 for (const CTransactionRef
& ptx
: pblock
->vtx
) {
171 // Do a normal notify for each transaction added in the block
172 TransactionAddedToMempool(ptx
);
176 void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr
<const CBlock
>& pblock
)
178 for (const CTransactionRef
& ptx
: pblock
->vtx
) {
179 // Do a normal notify for each transaction removed in block disconnection
180 TransactionAddedToMempool(ptx
);