qa: Remove unused NodeConn members
[bitcoinplatinum.git] / test / functional / test_framework / comptool.py
blob723826bae4c7008d24fb7e9d459983f0b42e1dff
1 #!/usr/bin/env python3
2 # Copyright (c) 2015-2016 The Bitcoin Core developers
3 # Distributed under the MIT software license, see the accompanying
4 # file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 """Compare two or more bitcoinds to each other.
7 To use, create a class that implements get_tests(), and pass it in
8 as the test generator to TestManager. get_tests() should be a python
9 generator that returns TestInstance objects. See below for definition.
11 TestNode behaves as follows:
12 Configure with a BlockStore and TxStore
13 on_inv: log the message but don't request
14 on_headers: log the chain tip
15 on_pong: update ping response map (for synchronization)
16 on_getheaders: provide headers via BlockStore
17 on_getdata: provide blocks via BlockStore
18 """
20 from .mininode import *
21 from .blockstore import BlockStore, TxStore
22 from .util import p2p_port, wait_until
24 import logging
26 logger=logging.getLogger("TestFramework.comptool")
28 global mininode_lock
30 class RejectResult():
31 """Outcome that expects rejection of a transaction or block."""
32 def __init__(self, code, reason=b''):
33 self.code = code
34 self.reason = reason
35 def match(self, other):
36 if self.code != other.code:
37 return False
38 return other.reason.startswith(self.reason)
39 def __repr__(self):
40 return '%i:%s' % (self.code,self.reason or '*')
42 class TestNode(NodeConnCB):
44 def __init__(self, block_store, tx_store):
45 super().__init__()
46 self.conn = None
47 self.bestblockhash = None
48 self.block_store = block_store
49 self.block_request_map = {}
50 self.tx_store = tx_store
51 self.tx_request_map = {}
52 self.block_reject_map = {}
53 self.tx_reject_map = {}
55 # When the pingmap is non-empty we're waiting for
56 # a response
57 self.pingMap = {}
58 self.lastInv = []
59 self.closed = False
61 def on_close(self, conn):
62 self.closed = True
64 def add_connection(self, conn):
65 self.conn = conn
67 def on_headers(self, conn, message):
68 if len(message.headers) > 0:
69 best_header = message.headers[-1]
70 best_header.calc_sha256()
71 self.bestblockhash = best_header.sha256
73 def on_getheaders(self, conn, message):
74 response = self.block_store.headers_for(message.locator, message.hashstop)
75 if response is not None:
76 conn.send_message(response)
78 def on_getdata(self, conn, message):
79 [conn.send_message(r) for r in self.block_store.get_blocks(message.inv)]
80 [conn.send_message(r) for r in self.tx_store.get_transactions(message.inv)]
82 for i in message.inv:
83 if i.type == 1 or i.type == 1 | (1 << 30): # MSG_TX or MSG_WITNESS_TX
84 self.tx_request_map[i.hash] = True
85 elif i.type == 2 or i.type == 2 | (1 << 30): # MSG_BLOCK or MSG_WITNESS_BLOCK
86 self.block_request_map[i.hash] = True
88 def on_inv(self, conn, message):
89 self.lastInv = [x.hash for x in message.inv]
91 def on_pong(self, conn, message):
92 try:
93 del self.pingMap[message.nonce]
94 except KeyError:
95 raise AssertionError("Got pong for unknown ping [%s]" % repr(message))
97 def on_reject(self, conn, message):
98 if message.message == b'tx':
99 self.tx_reject_map[message.data] = RejectResult(message.code, message.reason)
100 if message.message == b'block':
101 self.block_reject_map[message.data] = RejectResult(message.code, message.reason)
103 def send_inv(self, obj):
104 mtype = 2 if isinstance(obj, CBlock) else 1
105 self.conn.send_message(msg_inv([CInv(mtype, obj.sha256)]))
107 def send_getheaders(self):
108 # We ask for headers from their last tip.
109 m = msg_getheaders()
110 m.locator = self.block_store.get_locator(self.bestblockhash)
111 self.conn.send_message(m)
113 def send_header(self, header):
114 m = msg_headers()
115 m.headers.append(header)
116 self.conn.send_message(m)
118 # This assumes BIP31
119 def send_ping(self, nonce):
120 self.pingMap[nonce] = True
121 self.conn.send_message(msg_ping(nonce))
123 def received_ping_response(self, nonce):
124 return nonce not in self.pingMap
126 def send_mempool(self):
127 self.lastInv = []
128 self.conn.send_message(msg_mempool())
130 # TestInstance:
132 # Instances of these are generated by the test generator, and fed into the
133 # comptool.
135 # "blocks_and_transactions" should be an array of
136 # [obj, True/False/None, hash/None]:
137 # - obj is either a CBlock, CBlockHeader, or a CTransaction, and
138 # - the second value indicates whether the object should be accepted
139 # into the blockchain or mempool (for tests where we expect a certain
140 # answer), or "None" if we don't expect a certain answer and are just
141 # comparing the behavior of the nodes being tested.
142 # - the third value is the hash to test the tip against (if None or omitted,
143 # use the hash of the block)
144 # - NOTE: if a block header, no test is performed; instead the header is
145 # just added to the block_store. This is to facilitate block delivery
146 # when communicating with headers-first clients (when withholding an
147 # intermediate block).
148 # sync_every_block: if True, then each block will be inv'ed, synced, and
149 # nodes will be tested based on the outcome for the block. If False,
150 # then inv's accumulate until all blocks are processed (or max inv size
151 # is reached) and then sent out in one inv message. Then the final block
152 # will be synced across all connections, and the outcome of the final
153 # block will be tested.
154 # sync_every_tx: analogous to behavior for sync_every_block, except if outcome
155 # on the final tx is None, then contents of entire mempool are compared
156 # across all connections. (If outcome of final tx is specified as true
157 # or false, then only the last tx is tested against outcome.)
159 class TestInstance():
160 def __init__(self, objects=None, sync_every_block=True, sync_every_tx=False):
161 self.blocks_and_transactions = objects if objects else []
162 self.sync_every_block = sync_every_block
163 self.sync_every_tx = sync_every_tx
165 class TestManager():
167 def __init__(self, testgen, datadir):
168 self.test_generator = testgen
169 self.connections = []
170 self.test_nodes = []
171 self.block_store = BlockStore(datadir)
172 self.tx_store = TxStore(datadir)
173 self.ping_counter = 1
175 def add_all_connections(self, nodes):
176 for i in range(len(nodes)):
177 # Create a p2p connection to each node
178 test_node = TestNode(self.block_store, self.tx_store)
179 self.test_nodes.append(test_node)
180 self.connections.append(NodeConn('127.0.0.1', p2p_port(i), test_node))
181 # Make sure the TestNode (callback class) has a reference to its
182 # associated NodeConn
183 test_node.add_connection(self.connections[-1])
185 def clear_all_connections(self):
186 self.connections = []
187 self.test_nodes = []
189 def wait_for_disconnections(self):
190 def disconnected():
191 return all(node.closed for node in self.test_nodes)
192 wait_until(disconnected, timeout=10, lock=mininode_lock)
194 def wait_for_verack(self):
195 return all(node.wait_for_verack() for node in self.test_nodes)
197 def wait_for_pings(self, counter):
198 def received_pongs():
199 return all(node.received_ping_response(counter) for node in self.test_nodes)
200 wait_until(received_pongs, lock=mininode_lock)
202 # sync_blocks: Wait for all connections to request the blockhash given
203 # then send get_headers to find out the tip of each node, and synchronize
204 # the response by using a ping (and waiting for pong with same nonce).
205 def sync_blocks(self, blockhash, num_blocks):
206 def blocks_requested():
207 return all(
208 blockhash in node.block_request_map and node.block_request_map[blockhash]
209 for node in self.test_nodes
212 # --> error if not requested
213 wait_until(blocks_requested, attempts=20*num_blocks, lock=mininode_lock)
215 # Send getheaders message
216 [ c.cb.send_getheaders() for c in self.connections ]
218 # Send ping and wait for response -- synchronization hack
219 [ c.cb.send_ping(self.ping_counter) for c in self.connections ]
220 self.wait_for_pings(self.ping_counter)
221 self.ping_counter += 1
223 # Analogous to sync_block (see above)
224 def sync_transaction(self, txhash, num_events):
225 # Wait for nodes to request transaction (50ms sleep * 20 tries * num_events)
226 def transaction_requested():
227 return all(
228 txhash in node.tx_request_map and node.tx_request_map[txhash]
229 for node in self.test_nodes
232 # --> error if not requested
233 wait_until(transaction_requested, attempts=20*num_events, lock=mininode_lock)
235 # Get the mempool
236 [ c.cb.send_mempool() for c in self.connections ]
238 # Send ping and wait for response -- synchronization hack
239 [ c.cb.send_ping(self.ping_counter) for c in self.connections ]
240 self.wait_for_pings(self.ping_counter)
241 self.ping_counter += 1
243 # Sort inv responses from each node
244 with mininode_lock:
245 [ c.cb.lastInv.sort() for c in self.connections ]
247 # Verify that the tip of each connection all agree with each other, and
248 # with the expected outcome (if given)
249 def check_results(self, blockhash, outcome):
250 with mininode_lock:
251 for c in self.connections:
252 if outcome is None:
253 if c.cb.bestblockhash != self.connections[0].cb.bestblockhash:
254 return False
255 elif isinstance(outcome, RejectResult): # Check that block was rejected w/ code
256 if c.cb.bestblockhash == blockhash:
257 return False
258 if blockhash not in c.cb.block_reject_map:
259 logger.error('Block not in reject map: %064x' % (blockhash))
260 return False
261 if not outcome.match(c.cb.block_reject_map[blockhash]):
262 logger.error('Block rejected with %s instead of expected %s: %064x' % (c.cb.block_reject_map[blockhash], outcome, blockhash))
263 return False
264 elif ((c.cb.bestblockhash == blockhash) != outcome):
265 return False
266 return True
268 # Either check that the mempools all agree with each other, or that
269 # txhash's presence in the mempool matches the outcome specified.
270 # This is somewhat of a strange comparison, in that we're either comparing
271 # a particular tx to an outcome, or the entire mempools altogether;
272 # perhaps it would be useful to add the ability to check explicitly that
273 # a particular tx's existence in the mempool is the same across all nodes.
274 def check_mempool(self, txhash, outcome):
275 with mininode_lock:
276 for c in self.connections:
277 if outcome is None:
278 # Make sure the mempools agree with each other
279 if c.cb.lastInv != self.connections[0].cb.lastInv:
280 return False
281 elif isinstance(outcome, RejectResult): # Check that tx was rejected w/ code
282 if txhash in c.cb.lastInv:
283 return False
284 if txhash not in c.cb.tx_reject_map:
285 logger.error('Tx not in reject map: %064x' % (txhash))
286 return False
287 if not outcome.match(c.cb.tx_reject_map[txhash]):
288 logger.error('Tx rejected with %s instead of expected %s: %064x' % (c.cb.tx_reject_map[txhash], outcome, txhash))
289 return False
290 elif ((txhash in c.cb.lastInv) != outcome):
291 return False
292 return True
294 def run(self):
295 # Wait until verack is received
296 self.wait_for_verack()
298 test_number = 0
299 tests = self.test_generator.get_tests()
300 for test_instance in tests:
301 test_number += 1
302 logger.info("Running test %d: %s line %s" % (test_number, tests.gi_code.co_filename, tests.gi_frame.f_lineno))
303 # We use these variables to keep track of the last block
304 # and last transaction in the tests, which are used
305 # if we're not syncing on every block or every tx.
306 [ block, block_outcome, tip ] = [ None, None, None ]
307 [ tx, tx_outcome ] = [ None, None ]
308 invqueue = []
310 for test_obj in test_instance.blocks_and_transactions:
311 b_or_t = test_obj[0]
312 outcome = test_obj[1]
313 # Determine if we're dealing with a block or tx
314 if isinstance(b_or_t, CBlock): # Block test runner
315 block = b_or_t
316 block_outcome = outcome
317 tip = block.sha256
318 # each test_obj can have an optional third argument
319 # to specify the tip we should compare with
320 # (default is to use the block being tested)
321 if len(test_obj) >= 3:
322 tip = test_obj[2]
324 # Add to shared block_store, set as current block
325 # If there was an open getdata request for the block
326 # previously, and we didn't have an entry in the
327 # block_store, then immediately deliver, because the
328 # node wouldn't send another getdata request while
329 # the earlier one is outstanding.
330 first_block_with_hash = True
331 if self.block_store.get(block.sha256) is not None:
332 first_block_with_hash = False
333 with mininode_lock:
334 self.block_store.add_block(block)
335 for c in self.connections:
336 if first_block_with_hash and block.sha256 in c.cb.block_request_map and c.cb.block_request_map[block.sha256] == True:
337 # There was a previous request for this block hash
338 # Most likely, we delivered a header for this block
339 # but never had the block to respond to the getdata
340 c.send_message(msg_block(block))
341 else:
342 c.cb.block_request_map[block.sha256] = False
343 # Either send inv's to each node and sync, or add
344 # to invqueue for later inv'ing.
345 if (test_instance.sync_every_block):
346 # if we expect success, send inv and sync every block
347 # if we expect failure, just push the block and see what happens.
348 if outcome == True:
349 [ c.cb.send_inv(block) for c in self.connections ]
350 self.sync_blocks(block.sha256, 1)
351 else:
352 [ c.send_message(msg_block(block)) for c in self.connections ]
353 [ c.cb.send_ping(self.ping_counter) for c in self.connections ]
354 self.wait_for_pings(self.ping_counter)
355 self.ping_counter += 1
356 if (not self.check_results(tip, outcome)):
357 raise AssertionError("Test failed at test %d" % test_number)
358 else:
359 invqueue.append(CInv(2, block.sha256))
360 elif isinstance(b_or_t, CBlockHeader):
361 block_header = b_or_t
362 self.block_store.add_header(block_header)
363 [ c.cb.send_header(block_header) for c in self.connections ]
365 else: # Tx test runner
366 assert(isinstance(b_or_t, CTransaction))
367 tx = b_or_t
368 tx_outcome = outcome
369 # Add to shared tx store and clear map entry
370 with mininode_lock:
371 self.tx_store.add_transaction(tx)
372 for c in self.connections:
373 c.cb.tx_request_map[tx.sha256] = False
374 # Again, either inv to all nodes or save for later
375 if (test_instance.sync_every_tx):
376 [ c.cb.send_inv(tx) for c in self.connections ]
377 self.sync_transaction(tx.sha256, 1)
378 if (not self.check_mempool(tx.sha256, outcome)):
379 raise AssertionError("Test failed at test %d" % test_number)
380 else:
381 invqueue.append(CInv(1, tx.sha256))
382 # Ensure we're not overflowing the inv queue
383 if len(invqueue) == MAX_INV_SZ:
384 [ c.send_message(msg_inv(invqueue)) for c in self.connections ]
385 invqueue = []
387 # Do final sync if we weren't syncing on every block or every tx.
388 if (not test_instance.sync_every_block and block is not None):
389 if len(invqueue) > 0:
390 [ c.send_message(msg_inv(invqueue)) for c in self.connections ]
391 invqueue = []
392 self.sync_blocks(block.sha256, len(test_instance.blocks_and_transactions))
393 if (not self.check_results(tip, block_outcome)):
394 raise AssertionError("Block test failed at test %d" % test_number)
395 if (not test_instance.sync_every_tx and tx is not None):
396 if len(invqueue) > 0:
397 [ c.send_message(msg_inv(invqueue)) for c in self.connections ]
398 invqueue = []
399 self.sync_transaction(tx.sha256, len(test_instance.blocks_and_transactions))
400 if (not self.check_mempool(tx.sha256, tx_outcome)):
401 raise AssertionError("Mempool test failed at test %d" % test_number)
403 [ c.disconnect_node() for c in self.connections ]
404 self.wait_for_disconnections()
405 self.block_store.close()
406 self.tx_store.close()