1 // Copyright (c) 2015-2016 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
6 #include <chainparams.h>
8 #include <zmq/zmqpublishnotifier.h>
9 #include <validation.h>
11 #include <rpc/server.h>
13 static std::multimap
<std::string
, CZMQAbstractPublishNotifier
*> mapPublishNotifiers
;
15 static const char *MSG_HASHBLOCK
= "hashblock";
16 static const char *MSG_HASHTX
= "hashtx";
17 static const char *MSG_RAWBLOCK
= "rawblock";
18 static const char *MSG_RAWTX
= "rawtx";
20 // Internal function to send multipart message
21 static int zmq_send_multipart(void *sock
, const void* data
, size_t size
, ...)
30 int rc
= zmq_msg_init_size(&msg
, size
);
33 zmqError("Unable to initialize ZMQ msg");
38 void *buf
= zmq_msg_data(&msg
);
39 memcpy(buf
, data
, size
);
41 data
= va_arg(args
, const void*);
43 rc
= zmq_msg_send(&msg
, sock
, data
? ZMQ_SNDMORE
: 0);
46 zmqError("Unable to send ZMQ msg");
57 size
= va_arg(args
, size_t);
63 bool CZMQAbstractPublishNotifier::Initialize(void *pcontext
)
67 // check if address is being used by other publish notifier
68 std::multimap
<std::string
, CZMQAbstractPublishNotifier
*>::iterator i
= mapPublishNotifiers
.find(address
);
70 if (i
==mapPublishNotifiers
.end())
72 psocket
= zmq_socket(pcontext
, ZMQ_PUB
);
75 zmqError("Failed to create socket");
79 int rc
= zmq_bind(psocket
, address
.c_str());
82 zmqError("Failed to bind address");
87 // register this notifier for the address, so it can be reused for other publish notifier
88 mapPublishNotifiers
.insert(std::make_pair(address
, this));
93 LogPrint(BCLog::ZMQ
, "zmq: Reusing socket for address %s\n", address
);
95 psocket
= i
->second
->psocket
;
96 mapPublishNotifiers
.insert(std::make_pair(address
, this));
102 void CZMQAbstractPublishNotifier::Shutdown()
106 int count
= mapPublishNotifiers
.count(address
);
108 // remove this notifier from the list of publishers using this address
109 typedef std::multimap
<std::string
, CZMQAbstractPublishNotifier
*>::iterator iterator
;
110 std::pair
<iterator
, iterator
> iterpair
= mapPublishNotifiers
.equal_range(address
);
112 for (iterator it
= iterpair
.first
; it
!= iterpair
.second
; ++it
)
114 if (it
->second
==this)
116 mapPublishNotifiers
.erase(it
);
123 LogPrint(BCLog::ZMQ
, "Close socket at address %s\n", address
);
125 zmq_setsockopt(psocket
, ZMQ_LINGER
, &linger
, sizeof(linger
));
132 bool CZMQAbstractPublishNotifier::SendMessage(const char *command
, const void* data
, size_t size
)
136 /* send three parts, command & data & a LE 4byte sequence number */
137 unsigned char msgseq
[sizeof(uint32_t)];
138 WriteLE32(&msgseq
[0], nSequence
);
139 int rc
= zmq_send_multipart(psocket
, command
, strlen(command
), data
, size
, msgseq
, (size_t)sizeof(uint32_t), nullptr);
143 /* increment memory only sequence number after sending */
149 bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex
*pindex
)
151 uint256 hash
= pindex
->GetBlockHash();
152 LogPrint(BCLog::ZMQ
, "zmq: Publish hashblock %s\n", hash
.GetHex());
154 for (unsigned int i
= 0; i
< 32; i
++)
155 data
[31 - i
] = hash
.begin()[i
];
156 return SendMessage(MSG_HASHBLOCK
, data
, 32);
159 bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction
&transaction
)
161 uint256 hash
= transaction
.GetHash();
162 LogPrint(BCLog::ZMQ
, "zmq: Publish hashtx %s\n", hash
.GetHex());
164 for (unsigned int i
= 0; i
< 32; i
++)
165 data
[31 - i
] = hash
.begin()[i
];
166 return SendMessage(MSG_HASHTX
, data
, 32);
169 bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex
*pindex
)
171 LogPrint(BCLog::ZMQ
, "zmq: Publish rawblock %s\n", pindex
->GetBlockHash().GetHex());
173 const Consensus::Params
& consensusParams
= Params().GetConsensus();
174 CDataStream
ss(SER_NETWORK
, PROTOCOL_VERSION
| RPCSerializationFlags());
178 if(!ReadBlockFromDisk(block
, pindex
, consensusParams
))
180 zmqError("Can't read block from disk");
187 return SendMessage(MSG_RAWBLOCK
, &(*ss
.begin()), ss
.size());
190 bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction
&transaction
)
192 uint256 hash
= transaction
.GetHash();
193 LogPrint(BCLog::ZMQ
, "zmq: Publish rawtx %s\n", hash
.GetHex());
194 CDataStream
ss(SER_NETWORK
, PROTOCOL_VERSION
| RPCSerializationFlags());
196 return SendMessage(MSG_RAWTX
, &(*ss
.begin()), ss
.size());