[tests] Avoid passing around member variables in test_framework
[bitcoinplatinum.git] / test / functional / zmq_test.py
blobb5a22ea07fc8f6b5d7050d03413bb9ce061382e5
1 #!/usr/bin/env python3
2 # Copyright (c) 2015-2016 The Bitcoin Core developers
3 # Distributed under the MIT software license, see the accompanying
4 # file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 """Test the ZMQ API."""
6 import configparser
7 import os
8 import struct
10 from test_framework.test_framework import BitcoinTestFramework, SkipTest
11 from test_framework.util import (assert_equal,
12 bytes_to_hex_str,
class ZMQTest(BitcoinTestFramework):
    """Functional test for the hashblock / hashtx ZMQ notifications.

    Node 0 is started with ``-zmqpubhashtx`` and ``-zmqpubhashblock``; node 1
    is started without any ZMQ options. A SUB socket subscribed to both
    topics is used to check the topic, body and sequence number of the
    notifications published while blocks are generated and a transaction is
    sent.
    """

    def __init__(self):
        super().__init__()
        self.num_nodes = 2

    def setup_nodes(self):
        """Create the ZMQ subscriber and start the nodes.

        Raises:
            SkipTest: if the python ``zmq`` module cannot be imported, or if
                the ``components`` section of the config file does not have
                ``ENABLE_ZMQ`` set to a true value.
        """
        # Try to import python3-zmq. Skip this test if the import fails.
        try:
            import zmq
        except ImportError:
            raise SkipTest("python3-zmq module not available.")

        # Check that bitcoin has been built with ZMQ enabled
        config = configparser.ConfigParser()
        if not self.options.configfile:
            # Resolve the default relative to this file's real directory.
            # Plain string concatenation onto dirname(__file__) yields
            # "/../config.ini" when __file__ has no directory component.
            test_dir = os.path.dirname(os.path.realpath(__file__))
            self.options.configfile = os.path.join(test_dir, "..", "config.ini")
        # Close the config file deterministically instead of leaking the handle.
        with open(self.options.configfile) as config_file:
            config.read_file(config_file)

        if not config["components"].getboolean("ENABLE_ZMQ"):
            raise SkipTest("bitcoind has not been built with zmq enabled.")

        self.zmqContext = zmq.Context()
        self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
        # Receive timeout so a missing notification fails the test
        # instead of blocking it forever.
        self.zmqSubSocket.set(zmq.RCVTIMEO, 60000)
        self.zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashblock")
        self.zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashtx")
        ip_address = "tcp://127.0.0.1:28332"
        self.zmqSubSocket.connect(ip_address)
        # Only node 0 publishes ZMQ notifications; node 1 gets no extra args.
        self.extra_args = [['-zmqpubhashtx=%s' % ip_address, '-zmqpubhashblock=%s' % ip_address], []]
        self.add_nodes(self.num_nodes, self.extra_args)
        self.start_nodes()

    def run_test(self):
        """Run the ZMQ checks and always destroy the ZMQ context afterwards."""
        try:
            self._zmq_test()
        finally:
            # Destroy the zmq context
            self.log.debug("Destroying zmq context")
            self.zmqContext.destroy(linger=None)

    def _recv_notification(self):
        """Receive one multipart message from the subscriber socket.

        Returns:
            A ``(topic, body, sequence)`` tuple where ``topic`` and ``body``
            are the first two message parts and ``sequence`` is the last
            part decoded as a little-endian unsigned 32-bit integer.
        """
        msg = self.zmqSubSocket.recv_multipart()
        sequence = struct.unpack('<I', msg[-1])[-1]
        return msg[0], msg[1], sequence

    def _zmq_test(self):
        """Check the notifications received for generated blocks and a sent tx."""
        genhashes = self.nodes[0].generate(1)
        self.sync_all()

        self.log.info("Wait for tx")
        topic, _, sequence = self._recv_notification()
        assert_equal(topic, b"hashtx")
        assert_equal(sequence, 0)  # must be sequence 0 on hashtx

        self.log.info("Wait for block")
        topic, body, sequence = self._recv_notification()
        # Check the topic here as well, mirroring the hashtx check above.
        assert_equal(topic, b"hashblock")
        assert_equal(sequence, 0)  # must be sequence 0 on hashblock
        blkhash = bytes_to_hex_str(body)

        assert_equal(genhashes[0], blkhash)  # blockhash from generate must be equal to the hash received over zmq

        self.log.info("Generate 10 blocks (and 10 coinbase txes)")
        n = 10
        genhashes = self.nodes[1].generate(n)
        self.sync_all()

        zmqHashes = []
        blockcount = 0
        # Two notifications are read per generated block; only the
        # hashblock ones are collected and sequence-checked.
        for _ in range(n * 2):
            topic, body, sequence = self._recv_notification()
            if topic == b"hashblock":
                zmqHashes.append(bytes_to_hex_str(body))
                assert_equal(sequence, blockcount + 1)
                blockcount += 1

        # Fail with a clear assertion (not an IndexError) if fewer hashblock
        # notifications than generated blocks were received.
        assert_equal(len(zmqHashes), n)
        for generated, received in zip(genhashes, zmqHashes):
            assert_equal(generated, received)  # blockhash from generate must be equal to the hash received over zmq

        self.log.info("Wait for tx from second node")
        # test tx from a second node
        hashRPC = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
        self.sync_all()

        # now we should receive a zmq msg because the tx was broadcast
        topic, body, sequence = self._recv_notification()
        assert_equal(topic, b"hashtx")
        hashZMQ = bytes_to_hex_str(body)
        assert_equal(sequence, blockcount + 1)

        assert_equal(hashRPC, hashZMQ)  # txid from sendtoaddress must be equal to the hash received over zmq
if __name__ == '__main__':
    # Script entry point: build the test object and hand control to main().
    zmq_test = ZMQTest()
    zmq_test.main()