Add ZeroMQ support. Notify blocks and transactions via ZeroMQ
[bitcoinplatinum.git] / src / zmq / zmqpublishnotifier.cpp
blob0a6d7d0dbc57d94d73888abeea332b10ac94aca0
1 // Copyright (c) 2015 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 #include "zmqpublishnotifier.h"
6 #include "main.h"
7 #include "util.h"
// Registry of publish notifiers keyed by their bind address. Several notifiers
// may be registered under the same address: Initialize() looks an address up
// here to reuse an already created socket, and Shutdown() uses the number of
// entries for an address to decide whether the socket should be closed.
9 static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
11 // Internal function to send multipart message
12 static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
14 va_list args;
15 va_start(args, size);
17 while (1)
19 zmq_msg_t msg;
21 int rc = zmq_msg_init_size(&msg, size);
22 if (rc != 0)
24 zmqError("Unable to initialize ZMQ msg");
25 return -1;
28 void *buf = zmq_msg_data(&msg);
29 memcpy(buf, data, size);
31 data = va_arg(args, const void*);
33 rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
34 if (rc == -1)
36 zmqError("Unable to send ZMQ msg");
37 zmq_msg_close(&msg);
38 return -1;
41 zmq_msg_close(&msg);
43 if (!data)
44 break;
46 size = va_arg(args, size_t);
48 return 0;
51 bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
53 assert(!psocket);
55 // check if address is being used by other publish notifier
56 std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
58 if (i==mapPublishNotifiers.end())
60 psocket = zmq_socket(pcontext, ZMQ_PUB);
61 if (!psocket)
63 zmqError("Failed to create socket");
64 return false;
67 int rc = zmq_bind(psocket, address.c_str());
68 if (rc!=0)
70 zmqError("Failed to bind address");
71 return false;
74 // register this notifier for the address, so it can be reused for other publish notifier
75 mapPublishNotifiers.insert(std::make_pair(address, this));
76 return true;
78 else
80 LogPrint("zmq", " Reuse socket for address %s\n", address);
82 psocket = i->second->psocket;
83 mapPublishNotifiers.insert(std::make_pair(address, this));
85 return true;
89 void CZMQAbstractPublishNotifier::Shutdown()
91 assert(psocket);
93 int count = mapPublishNotifiers.count(address);
95 // remove this notifier from the list of publishers using this address
96 typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
97 std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
99 for (iterator it = iterpair.first; it != iterpair.second; ++it)
101 if (it->second==this)
103 mapPublishNotifiers.erase(it);
104 break;
108 if (count == 1)
110 LogPrint("zmq", "Close socket at address %s\n", address);
111 int linger = 0;
112 zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
113 zmq_close(psocket);
116 psocket = 0;
119 bool CZMQPublishHashBlockNotifier::NotifyBlock(const uint256 &hash)
121 LogPrint("zmq", "Publish hash block %s\n", hash.GetHex());
122 char data[32];
123 for (unsigned int i = 0; i < 32; i++)
124 data[31 - i] = hash.begin()[i];
125 int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0);
126 return rc == 0;
129 bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
131 uint256 hash = transaction.GetHash();
132 LogPrint("zmq", "Publish hash transaction %s\n", hash.GetHex());
133 char data[32];
134 for (unsigned int i = 0; i < 32; i++)
135 data[31 - i] = hash.begin()[i];
136 int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0);
137 return rc == 0;
140 bool CZMQPublishRawBlockNotifier::NotifyBlock(const uint256 &hash)
142 LogPrint("zmq", "Publish raw block %s\n", hash.GetHex());
144 CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
146 LOCK(cs_main);
148 CBlock block;
149 CBlockIndex* pblockindex = mapBlockIndex[hash];
151 if(!ReadBlockFromDisk(block, pblockindex))
153 zmqError("Can't read block from disk");
154 return false;
157 ss << block;
160 int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0);
161 return rc == 0;
164 bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
166 uint256 hash = transaction.GetHash();
167 LogPrint("zmq", "Publish raw transaction %s\n", hash.GetHex());
168 CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
169 ss << transaction;
170 int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0);
171 return rc == 0;