1 // Copyright (c) 2015 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 #include "zmqpublishnotifier.h"
// File-local multimap keyed by std::string and holding
// CZMQAbstractPublishNotifier pointers. The rest of this file looks it up,
// counts it and ranges over it by a notifier's `address`, and notifiers
// insert themselves into it as (address, this).
9 static std::multimap
<std::string
, CZMQAbstractPublishNotifier
*> mapPublishNotifiers
;
11 // Internal function to send multipart message
// Visible steps: size a ZMQ message to `size` bytes, copy `data` into it and
// send it on `sock`. The variadic tail is consumed as alternating
// `const void*` / `size_t` values that overwrite `data` and `size`.
// NOTE(review): this excerpt does not show the lines between the statements
// below (declaration of `args` and `msg`, any loop, the conditions guarding
// the zmqError calls, the return statements) — confirm against the full file.
// NOTE(review): callers in this file end the argument list with a bare `0`
// and pass lengths such as `32` as plain literals; in a variadic call those
// are `int`, while they are read here as `const void*` and `size_t`. Confirm
// the callers pass correctly typed values.
12 static int zmq_send_multipart(void *sock
, const void* data
, size_t size
, ...)
// Initialise `msg` with room for the current part.
21 int rc
= zmq_msg_init_size(&msg
, size
);
// Reports an initialisation failure; the guarding condition is not shown here.
24 zmqError("Unable to initialize ZMQ msg");
// Copy the caller's bytes into the message's own buffer.
28 void *buf
= zmq_msg_data(&msg
);
29 memcpy(buf
, data
, size
);
// Read the next part's pointer before sending, so the send flag below can
// reflect whether another part follows.
31 data
= va_arg(args
, const void*);
// ZMQ_SNDMORE is passed only when the pointer just read is non-null.
33 rc
= zmq_msg_send(&msg
, sock
, data
? ZMQ_SNDMORE
: 0);
// Reports a send failure; the guarding condition is not shown here.
36 zmqError("Unable to send ZMQ msg");
// Length of the next part, read as size_t.
46 size
= va_arg(args
, size_t);
// Gives this notifier a socket for its `address`. The visible code looks the
// address up in mapPublishNotifiers, then either creates a ZMQ_PUB socket on
// `pcontext` and binds it to the address, or takes over the psocket of a
// notifier already registered for that address. In both cases the notifier
// registers itself in mapPublishNotifiers.
// NOTE(review): braces, the `else`, the checks guarding the zmqError calls and
// the return statements are not shown in this excerpt — confirm the exact
// branch structure and return values against the full file.
51 bool CZMQAbstractPublishNotifier::Initialize(void *pcontext
)
55 // check if address is being used by other publish notifier
56 std::multimap
<std::string
, CZMQAbstractPublishNotifier
*>::iterator i
= mapPublishNotifiers
.find(address
);
// No notifier is registered for this address yet.
58 if (i
==mapPublishNotifiers
.end())
60 psocket
= zmq_socket(pcontext
, ZMQ_PUB
);
63 zmqError("Failed to create socket");
67 int rc
= zmq_bind(psocket
, address
.c_str());
// NOTE(review): no release of the freshly created socket is visible around
// this failure report — confirm it is closed when the bind fails.
70 zmqError("Failed to bind address");
74 // register this notifier for the address, so it can be reused for other publish notifier
75 mapPublishNotifiers
.insert(std::make_pair(address
, this));
// Statements below use `i` as a valid entry, so presumably they belong to the
// branch where the address was found — confirm.
80 LogPrint("zmq", " Reuse socket for address %s\n", address
);
// Share the socket held by the notifier already registered for this address.
82 psocket
= i
->second
->psocket
;
83 mapPublishNotifiers
.insert(std::make_pair(address
, this));
// Removes this notifier from mapPublishNotifiers. The visible code also logs a
// socket close and sets ZMQ_LINGER on psocket.
// NOTE(review): the conditions around the close path, the declaration of
// `linger` and the call that actually closes the socket are not shown in this
// excerpt — confirm against the full file.
89 void CZMQAbstractPublishNotifier::Shutdown()
// Number of entries registered for this address, taken before this notifier
// is erased below (so it still includes this notifier). How `count` is used
// is not shown here.
93 int count
= mapPublishNotifiers
.count(address
);
95 // remove this notifier from the list of publishers using this address
96 typedef std::multimap
<std::string
, CZMQAbstractPublishNotifier
*>::iterator iterator
;
// All entries sharing this notifier's address.
97 std::pair
<iterator
, iterator
> iterpair
= mapPublishNotifiers
.equal_range(address
);
99 for (iterator it
= iterpair
.first
; it
!= iterpair
.second
; ++it
)
101 if (it
->second
==this)
// NOTE(review): erase invalidates `it`, so the loop must not advance it
// afterwards; the statement following the erase is not shown — confirm the
// loop exits immediately.
103 mapPublishNotifiers
.erase(it
);
110 LogPrint("zmq", "Close socket at address %s\n", address
);
// Apply the linger setting held in `linger` to the socket.
112 zmq_setsockopt(psocket
, ZMQ_LINGER
, &linger
, sizeof(linger
));
// Publishes a block hash on psocket as a two-part message: the 9-byte topic
// "hashblock" followed by the 32 hash bytes in reversed order.
// NOTE(review): the declaration of `data` and the handling of `rc` / the
// return value are not shown in this excerpt.
119 bool CZMQPublishHashBlockNotifier::NotifyBlock(const uint256
&hash
)
121 LogPrint("zmq", "Publish hash block %s\n", hash
.GetHex());
// Copy the 32 bytes of `hash` into `data` back to front.
123 for (unsigned int i
= 0; i
< 32; i
++)
124 data
[31 - i
] = hash
.begin()[i
];
// NOTE(review): the trailing `32` and `0` are variadic arguments of type int,
// while zmq_send_multipart reads them as size_t and const void* — confirm
// they should be passed with matching types.
125 int rc
= zmq_send_multipart(psocket
, "hashblock", 9, data
, 32, 0);
// Publishes a transaction's hash on psocket as a two-part message: the 6-byte
// topic "hashtx" followed by the 32 hash bytes in reversed order.
// NOTE(review): the declaration of `data` and the handling of `rc` / the
// return value are not shown in this excerpt.
129 bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction
&transaction
)
131 uint256 hash
= transaction
.GetHash();
132 LogPrint("zmq", "Publish hash transaction %s\n", hash
.GetHex());
// Copy the 32 bytes of `hash` into `data` back to front.
134 for (unsigned int i
= 0; i
< 32; i
++)
135 data
[31 - i
] = hash
.begin()[i
];
// NOTE(review): the trailing `32` and `0` are variadic arguments of type int,
// while zmq_send_multipart reads them as size_t and const void* — confirm
// they should be passed with matching types.
136 int rc
= zmq_send_multipart(psocket
, "hashtx", 6, data
, 32, 0);
// Publishes a block on psocket as a two-part message: the 8-byte topic
// "rawblock" followed by the contents of the stream `ss`. The visible code
// looks the block index up by `hash` and reads the block from disk.
// NOTE(review): the declaration of `block`, the step that fills `ss`, any
// locking and the return statements are not shown in this excerpt.
140 bool CZMQPublishRawBlockNotifier::NotifyBlock(const uint256
&hash
)
142 LogPrint("zmq", "Publish raw block %s\n", hash
.GetHex());
144 CDataStream
ss(SER_NETWORK
, PROTOCOL_VERSION
);
// NOTE(review): if mapBlockIndex is a standard associative container,
// operator[] inserts a default entry for an unknown hash — confirm the hash
// is always present or that a lookup without insertion is not needed.
149 CBlockIndex
* pblockindex
= mapBlockIndex
[hash
];
151 if(!ReadBlockFromDisk(block
, pblockindex
))
153 zmqError("Can't read block from disk");
// NOTE(review): `&(*ss.begin())` assumes the stream is non-empty; the final
// `0` is an int passed where zmq_send_multipart reads a const void*.
160 int rc
= zmq_send_multipart(psocket
, "rawblock", 8, &(*ss
.begin()), ss
.size(), 0);
// Publishes a transaction on psocket as a two-part message: the 5-byte topic
// "rawtx" followed by the contents of the stream `ss`.
// NOTE(review): the step that writes `transaction` into `ss` and the return
// statements are not shown in this excerpt — confirm against the full file.
164 bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction
&transaction
)
// The hash is used here only for the log line below.
166 uint256 hash
= transaction
.GetHash();
167 LogPrint("zmq", "Publish raw transaction %s\n", hash
.GetHex());
168 CDataStream
ss(SER_NETWORK
, PROTOCOL_VERSION
);
// NOTE(review): `&(*ss.begin())` assumes the stream is non-empty; the final
// `0` is an int passed where zmq_send_multipart reads a const void*.
170 int rc
= zmq_send_multipart(psocket
, "rawtx", 5, &(*ss
.begin()), ss
.size(), 0);