[tests] don't override __init__() in individual tests
[bitcoinplatinum.git] / test / functional / zmq_test.py
blob3f2668ee8724db13627f0e8ad33a9186f3e5ab34
1 #!/usr/bin/env python3
2 # Copyright (c) 2015-2016 The Bitcoin Core developers
3 # Distributed under the MIT software license, see the accompanying
4 # file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 """Test the ZMQ API."""
6 import configparser
7 import os
8 import struct
10 from test_framework.test_framework import BitcoinTestFramework, SkipTest
11 from test_framework.util import (assert_equal,
12 bytes_to_hex_str,
class ZMQTest (BitcoinTestFramework):
    """Test the hashblock and hashtx ZMQ notifications.

    Node 0 is started with ``-zmqpubhashtx`` and ``-zmqpubhashblock``
    pointing at a local TCP endpoint. The test subscribes to both topics
    on that endpoint and checks the topic, payload and sequence number of
    the messages it receives.
    """

    def set_test_params(self):
        """Configure the test to run with two nodes."""
        self.num_nodes = 2

    def setup_nodes(self):
        """Create the ZMQ subscriber socket, then add and start the nodes.

        Raises:
            SkipTest: if the zmq module cannot be imported, or if the
                config file does not report ENABLE_ZMQ as true.
        """
        # Try to import python3-zmq. Skip this test if the import fails.
        try:
            import zmq
        except ImportError:
            raise SkipTest("python3-zmq module not available.")

        # Check that bitcoin has been built with ZMQ enabled
        config = configparser.ConfigParser()
        if not self.options.configfile:
            self.options.configfile = os.path.dirname(__file__) + "/../config.ini"
        # Use a context manager so the config file handle is always closed.
        with open(self.options.configfile) as config_file:
            config.read_file(config_file)

        if not config["components"].getboolean("ENABLE_ZMQ"):
            raise SkipTest("bitcoind has not been built with zmq enabled.")

        # Subscribe before the nodes are started; receives time out after
        # 60000 ms (RCVTIMEO) instead of blocking forever.
        self.zmqContext = zmq.Context()
        self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
        self.zmqSubSocket.set(zmq.RCVTIMEO, 60000)
        self.zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashblock")
        self.zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashtx")
        ip_address = "tcp://127.0.0.1:28332"
        self.zmqSubSocket.connect(ip_address)
        # Only node 0 publishes ZMQ notifications; node 1 gets no extra args.
        self.extra_args = [['-zmqpubhashtx=%s' % ip_address, '-zmqpubhashblock=%s' % ip_address], []]
        self.add_nodes(self.num_nodes, self.extra_args)
        self.start_nodes()

    def run_test(self):
        """Run the ZMQ checks and always destroy the ZMQ context afterwards."""
        try:
            self._zmq_test()
        finally:
            # Destroy the zmq context
            self.log.debug("Destroying zmq context")
            self.zmqContext.destroy(linger=None)

    def _zmq_test(self):
        """Check the notifications published for blocks and transactions."""
        genhashes = self.nodes[0].generate(1)
        self.sync_all()

        self.log.info("Wait for tx")
        msg = self.zmqSubSocket.recv_multipart()
        topic = msg[0]
        assert_equal(topic, b"hashtx")
        # The last message part is unpacked as a little-endian uint32 sequence.
        msgSequence = struct.unpack('<I', msg[-1])[-1]
        assert_equal(msgSequence, 0)  # must be sequence 0 on hashtx

        self.log.info("Wait for block")
        msg = self.zmqSubSocket.recv_multipart()
        topic = msg[0]
        # Check the topic explicitly, as is done for the hashtx messages.
        assert_equal(topic, b"hashblock")
        body = msg[1]
        msgSequence = struct.unpack('<I', msg[-1])[-1]
        assert_equal(msgSequence, 0)  # must be sequence 0 on hashblock
        blkhash = bytes_to_hex_str(body)

        assert_equal(genhashes[0], blkhash)  # blockhash from generate must be equal to the hash received over zmq

        self.log.info("Generate 10 blocks (and 10 coinbase txes)")
        n = 10
        genhashes = self.nodes[1].generate(n)
        self.sync_all()

        zmqHashes = []
        blockcount = 0
        # Read n * 2 messages; only the hashblock ones are recorded and
        # sequence-checked here.
        for _ in range(n * 2):
            msg = self.zmqSubSocket.recv_multipart()
            topic = msg[0]
            body = msg[1]
            if topic == b"hashblock":
                zmqHashes.append(bytes_to_hex_str(body))
                msgSequence = struct.unpack('<I', msg[-1])[-1]
                assert_equal(msgSequence, blockcount + 1)
                blockcount += 1

        for x in range(n):
            assert_equal(genhashes[x], zmqHashes[x])  # blockhash from generate must be equal to the hash received over zmq

        self.log.info("Wait for tx from second node")
        # test tx from a second node
        hashRPC = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
        self.sync_all()

        # now we should receive a zmq msg because the tx was broadcast
        msg = self.zmqSubSocket.recv_multipart()
        topic = msg[0]
        body = msg[1]
        assert_equal(topic, b"hashtx")
        hashZMQ = bytes_to_hex_str(body)
        msgSequence = struct.unpack('<I', msg[-1])[-1]
        assert_equal(msgSequence, blockcount + 1)

        assert_equal(hashRPC, hashZMQ)  # txid from sendtoaddress must be equal to the hash received over zmq
if __name__ == '__main__':
    # Script entry point: build the test object, then run it.
    zmq_test = ZMQTest()
    zmq_test.main()