2 # Copyright (c) 2015-2016 The Bitcoin Core developers
3 # Distributed under the MIT software license, see the accompanying
4 # file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 """Compare two or more bitcoinds to each other.
7 To use, create a class that implements get_tests(), and pass it in
8 as the test generator to TestManager. get_tests() should be a python
9 generator that returns TestInstance objects. See below for definition.
11 TestNode behaves as follows:
12 Configure with a BlockStore and TxStore
13 on_inv: log the message but don't request
14 on_headers: log the chain tip
15 on_pong: update ping response map (for synchronization)
16 on_getheaders: provide headers via BlockStore
on_getdata: provide blocks via BlockStore and transactions via TxStore
"""
20 from .mininode
import *
21 from .blockstore
import BlockStore
, TxStore
22 from .util
import p2p_port
, wait_until
26 logger
=logging
.getLogger("TestFramework.comptool")
class RejectResult():
    """Outcome that expects rejection of a transaction or block."""

    def __init__(self, code, reason=b''):
        """Store the expected reject code and reason prefix.

        code: value compared for equality against the other result's code.
        reason: bytes prefix the other result's reason must start with.
            The default b'' is a prefix of every byte string, so it
            accepts any reason.
        """
        self.code = code
        self.reason = reason

    def match(self, other):
        """Return True if `other` has the same code and a reason that
        starts with this outcome's reason; False otherwise."""
        if self.code != other.code:
            return False
        return other.reason.startswith(self.reason)

    def __repr__(self):
        # An empty reason is rendered as '*' (it matches anything).
        return '%i:%s' % (self.code,self.reason or '*')
42 class TestNode(NodeConnCB
):
def __init__(self, block_store, tx_store):
    """Set up the per-connection bookkeeping around the shared stores.

    block_store: answers getheaders / getdata requests for blocks.
    tx_store: answers getdata requests for transactions.
    """
    # NOTE(review): assumes NodeConnCB.__init__ takes no extra arguments --
    # confirm against mininode.
    NodeConnCB.__init__(self)
    self.conn = None  # assigned by add_connection()
    self.bestblockhash = None
    self.block_store = block_store
    self.block_request_map = {}
    self.tx_store = tx_store
    self.tx_request_map = {}
    self.block_reject_map = {}
    self.tx_reject_map = {}

    # When the pingMap is non-empty we're waiting for a pong:
    # send_ping() adds the nonce, on_pong() removes it.
    self.pingMap = {}
    # Hashes from the most recent inv message (see on_inv()).
    self.lastInv = []
    # Polled by TestManager.wait_for_disconnections().
    self.closed = False
def on_close(self, conn):
    """Record that the connection has closed.

    The manager's wait_for_disconnections() polls `closed` on every node.
    """
    self.closed = True
def add_connection(self, conn):
    """Remember the connection this callback object sends its messages on."""
    self.conn = conn
def on_headers(self, conn, message):
    """Track the last header of a non-empty headers message as the tip."""
    if not message.headers:
        return
    tip_header = message.headers[-1]
    # The hash is computed on demand before it is read.
    tip_header.calc_sha256()
    self.bestblockhash = tip_header.sha256
def on_getheaders(self, conn, message):
    """Reply with whatever the block store offers for the peer's locator.

    Nothing is sent when the store returns None.
    """
    reply = self.block_store.headers_for(message.locator, message.hashstop)
    if reply is None:
        return
    conn.send_message(reply)
def on_getdata(self, conn, message):
    """Serve a getdata request from the stores and record what was asked for.

    Everything the block store returns for message.inv is sent first, then
    everything the tx store returns. Afterwards each requested hash is
    flagged in tx_request_map / block_request_map according to its
    inventory type; other types are ignored.
    """
    for block_msg in self.block_store.get_blocks(message.inv):
        conn.send_message(block_msg)
    for tx_msg in self.tx_store.get_transactions(message.inv):
        conn.send_message(tx_msg)

    for i in message.inv:
        if i.type == 1 or i.type == 1 | (1 << 30): # MSG_TX or MSG_WITNESS_TX
            self.tx_request_map[i.hash] = True
        elif i.type == 2 or i.type == 2 | (1 << 30): # MSG_BLOCK or MSG_WITNESS_BLOCK
            self.block_request_map[i.hash] = True
def on_inv(self, conn, message):
    """Replace lastInv with the hashes announced in this inv message."""
    announced = []
    for entry in message.inv:
        announced.append(entry.hash)
    self.lastInv = announced
def on_pong(self, conn, message):
    """Clear the outstanding ping that this pong answers.

    Raises AssertionError if the nonce does not belong to a ping that is
    still recorded in pingMap.
    """
    try:
        del self.pingMap[message.nonce]
    except KeyError:
        raise AssertionError("Got pong for unknown ping [%s]" % repr(message))
def on_reject(self, conn, message):
    """File a reject under the tx or block reject map, keyed by message.data.

    Rejects for any other message kind are ignored.
    """
    kind = message.message
    reject_maps = (
        (b'tx', self.tx_reject_map),
        (b'block', self.block_reject_map),
    )
    for label, rejects in reject_maps:
        if kind == label:
            rejects[message.data] = RejectResult(message.code, message.reason)
def send_inv(self, obj):
    """Announce obj to the peer in a single-entry inv message."""
    # Inventory type 2 for CBlock instances, 1 for anything else.
    if isinstance(obj, CBlock):
        inv_type = 2
    else:
        inv_type = 1
    deliver = self.conn.send_message
    deliver(msg_inv([CInv(inv_type, obj.sha256)]))
def send_getheaders(self):
    """Send a getheaders whose locator is built from the last known tip."""
    # We ask for headers from their last tip.
    # NOTE(review): msg_getheaders is presumed to come from the mininode
    # star import, like the other msg_* constructors used in this file.
    m = msg_getheaders()
    m.locator = self.block_store.get_locator(self.bestblockhash)
    self.conn.send_message(m)
def send_header(self, header):
    """Send the peer a headers message containing just `header`."""
    # NOTE(review): msg_headers is presumed to come from the mininode star
    # import and to start with an empty `headers` list -- confirm.
    m = msg_headers()
    m.headers.append(header)
    self.conn.send_message(m)
def send_ping(self, nonce):
    """Mark `nonce` as awaiting a pong, then ping the peer with it."""
    # Registered before sending; on_pong() removes the entry again.
    self.pingMap[nonce] = True
    deliver = self.conn.send_message
    deliver(msg_ping(nonce))
def received_ping_response(self, nonce):
    """Return True once `nonce` is no longer recorded as outstanding."""
    outstanding = self.pingMap
    if nonce in outstanding:
        return False
    return True
def send_mempool(self):
    """Ask the peer for its mempool contents, forgetting the previous inv."""
    # Reset first: lastInv is only overwritten when an inv arrives, so if
    # the peer sends none in reply, an older inv would otherwise be read
    # as the answer to this request.
    self.lastInv = []
    self.conn.send_message(msg_mempool())
# Instances of these are generated by the test generator, and fed into the
# TestManager.
135 # "blocks_and_transactions" should be an array of
136 # [obj, True/False/None, hash/None]:
137 # - obj is either a CBlock, CBlockHeader, or a CTransaction, and
138 # - the second value indicates whether the object should be accepted
139 # into the blockchain or mempool (for tests where we expect a certain
140 # answer), or "None" if we don't expect a certain answer and are just
141 # comparing the behavior of the nodes being tested.
142 # - the third value is the hash to test the tip against (if None or omitted,
143 # use the hash of the block)
144 # - NOTE: if a block header, no test is performed; instead the header is
145 # just added to the block_store. This is to facilitate block delivery
146 # when communicating with headers-first clients (when withholding an
147 # intermediate block).
148 # sync_every_block: if True, then each block will be inv'ed, synced, and
149 # nodes will be tested based on the outcome for the block. If False,
150 # then inv's accumulate until all blocks are processed (or max inv size
151 # is reached) and then sent out in one inv message. Then the final block
152 # will be synced across all connections, and the outcome of the final
153 # block will be tested.
154 # sync_every_tx: analogous to behavior for sync_every_block, except if outcome
155 # on the final tx is None, then contents of entire mempool are compared
156 # across all connections. (If outcome of final tx is specified as true
157 # or false, then only the last tx is tested against outcome.)
class TestInstance():
    """One batch of test objects plus the flags controlling how it is synced."""

    def __init__(self, objects=None, sync_every_block=True, sync_every_tx=False):
        """Store the batch and its two sync flags.

        A falsy `objects` (None or an empty list) is replaced by a fresh
        empty list; any other value is stored as given.
        """
        self.sync_every_tx = sync_every_tx
        self.sync_every_block = sync_every_block
        self.blocks_and_transactions = objects or []
def __init__(self, testgen, datadir):
    """Create the shared stores and empty connection bookkeeping.

    testgen: object whose get_tests() yields the test instances to run.
    datadir: passed to the BlockStore and TxStore constructors.
    """
    self.test_generator = testgen
    self.connections = []
    # Callback objects, kept in step with self.connections by
    # add_all_connections(); must exist before that method appends to it.
    self.test_nodes = []
    self.block_store = BlockStore(datadir)
    self.tx_store = TxStore(datadir)
    # Nonce for the next synchronization ping; bumped after each round.
    self.ping_counter = 1
def add_all_connections(self, nodes):
    """Open one p2p connection, with its own TestNode, per entry in `nodes`.

    Only the position of each entry is used: it selects the port via
    p2p_port().
    """
    for index, _unused in enumerate(nodes):
        # Create a p2p connection to each node
        callback = TestNode(self.block_store, self.tx_store)
        self.test_nodes.append(callback)
        link = NodeConn('127.0.0.1', p2p_port(index), callback)
        self.connections.append(link)
        # Make sure the TestNode (callback class) has a reference to its
        # associated NodeConn
        callback.add_connection(self.connections[-1])
def clear_all_connections(self):
    """Forget every connection and its callback object."""
    self.connections = []
    # add_all_connections() fills both lists in step; leaving stale nodes
    # here would make the wait_for_* helpers keep polling them.
    self.test_nodes = []
def wait_for_disconnections(self):
    """Block until every test node reports `closed`.

    The wait is delegated to wait_until() with timeout=10, evaluating the
    predicate under mininode_lock.
    """
    def disconnected():
        return all(node.closed for node in self.test_nodes)
    wait_until(disconnected, timeout=10, lock=mininode_lock)
def wait_for_verack(self):
    """Call wait_for_verack() on each test node in order.

    Returns False at the first node whose call returns a falsy value
    (later nodes are not asked), otherwise True.
    """
    for node in self.test_nodes:
        if not node.wait_for_verack():
            return False
    return True
def wait_for_pings(self, counter):
    """Block until every test node has seen the pong for nonce `counter`."""
    def received_pongs():
        for node in self.test_nodes:
            if not node.received_ping_response(counter):
                return False
        return True
    wait_until(received_pongs, lock=mininode_lock)
# sync_blocks: Wait for all connections to request the blockhash given
# then send get_headers to find out the tip of each node, and synchronize
# the response by using a ping (and waiting for pong with same nonce).
def sync_blocks(self, blockhash, num_blocks):
    """Wait for `blockhash` to be requested by every node, then refresh tips.

    blockhash: hash that must be flagged in every node's block_request_map.
    num_blocks: scales the number of wait attempts (20 per block).
    """
    def blocks_requested():
        return all(
            blockhash in node.block_request_map and node.block_request_map[blockhash]
            for node in self.test_nodes
        )

    # --> error if not requested
    wait_until(blocks_requested, attempts=20*num_blocks, lock=mininode_lock)

    # Send getheaders message
    for c in self.connections:
        c.cb.send_getheaders()

    # Send ping and wait for response -- synchronization hack
    for c in self.connections:
        c.cb.send_ping(self.ping_counter)
    self.wait_for_pings(self.ping_counter)
    self.ping_counter += 1
# Analogous to sync_blocks, but for a transaction and the mempool.
def sync_transaction(self, txhash, num_events):
    """Wait for `txhash` to be requested by every node, then refresh mempools.

    txhash: hash that must be flagged in every node's tx_request_map.
    num_events: scales the number of wait attempts (20 per event).
    """
    # Wait for nodes to request transaction (50ms sleep * 20 tries * num_events)
    def transaction_requested():
        return all(
            txhash in node.tx_request_map and node.tx_request_map[txhash]
            for node in self.test_nodes
        )

    # --> error if not requested
    wait_until(transaction_requested, attempts=20*num_events, lock=mininode_lock)

    # Ask every node for its mempool
    for c in self.connections:
        c.cb.send_mempool()

    # Send ping and wait for response -- synchronization hack
    for c in self.connections:
        c.cb.send_ping(self.ping_counter)
    self.wait_for_pings(self.ping_counter)
    self.ping_counter += 1

    # Sort inv responses from each node, so that they can be compared
    # with == regardless of the order a node announced them in.
    for c in self.connections:
        c.cb.lastInv.sort()
# Verify that the tip of each connection all agree with each other, and
# with the expected outcome (if given)
def check_results(self, blockhash, outcome):
    """Return True if every connection's tip is consistent with `outcome`.

    outcome is None: all tips must equal the first connection's tip.
    outcome is a RejectResult: no tip may be `blockhash`, and each node
        must have recorded a matching reject for it.
    otherwise: (tip == blockhash) must equal `outcome` on every connection.

    NOTE(review): the branch structure and return values were rebuilt from
    the surviving conditions and from callers that test the result with
    `not` -- confirm against the upstream file.
    """
    for c in self.connections:
        if outcome is None:
            if c.cb.bestblockhash != self.connections[0].cb.bestblockhash:
                return False
        elif isinstance(outcome, RejectResult): # Check that block was rejected w/ code
            if c.cb.bestblockhash == blockhash:
                return False
            if blockhash not in c.cb.block_reject_map:
                logger.error('Block not in reject map: %064x' % (blockhash))
                return False
            if not outcome.match(c.cb.block_reject_map[blockhash]):
                logger.error('Block rejected with %s instead of expected %s: %064x' % (c.cb.block_reject_map[blockhash], outcome, blockhash))
                return False
        elif ((c.cb.bestblockhash == blockhash) != outcome):
            return False
    return True
# Either check that the mempools all agree with each other, or that
# txhash's presence in the mempool matches the outcome specified.
# This is somewhat of a strange comparison, in that we're either comparing
# a particular tx to an outcome, or the entire mempools altogether;
# perhaps it would be useful to add the ability to check explicitly that
# a particular tx's existence in the mempool is the same across all nodes.
def check_mempool(self, txhash, outcome):
    """Return True if every connection's last inv is consistent with `outcome`.

    outcome is None: every lastInv must equal the first connection's.
    outcome is a RejectResult: `txhash` must be absent from every lastInv,
        and each node must have recorded a matching reject for it.
    otherwise: (txhash in lastInv) must equal `outcome` on every connection.

    NOTE(review): the branch structure and return values were rebuilt from
    the surviving conditions and from callers that test the result with
    `not` -- confirm against the upstream file.
    """
    for c in self.connections:
        if outcome is None:
            # Make sure the mempools agree with each other
            if c.cb.lastInv != self.connections[0].cb.lastInv:
                return False
        elif isinstance(outcome, RejectResult): # Check that tx was rejected w/ code
            if txhash in c.cb.lastInv:
                return False
            if txhash not in c.cb.tx_reject_map:
                logger.error('Tx not in reject map: %064x' % (txhash))
                return False
            if not outcome.match(c.cb.tx_reject_map[txhash]):
                logger.error('Tx rejected with %s instead of expected %s: %064x' % (c.cb.tx_reject_map[txhash], outcome, txhash))
                return False
        elif ((txhash in c.cb.lastInv) != outcome):
            return False
    return True
295 # Wait until verack is received
296 self
.wait_for_verack()
299 tests
= self
.test_generator
.get_tests()
300 for test_instance
in tests
:
302 logger
.info("Running test %d: %s line %s" % (test_number
, tests
.gi_code
.co_filename
, tests
.gi_frame
.f_lineno
))
303 # We use these variables to keep track of the last block
304 # and last transaction in the tests, which are used
305 # if we're not syncing on every block or every tx.
306 [ block
, block_outcome
, tip
] = [ None, None, None ]
307 [ tx
, tx_outcome
] = [ None, None ]
310 for test_obj
in test_instance
.blocks_and_transactions
:
312 outcome
= test_obj
[1]
313 # Determine if we're dealing with a block or tx
314 if isinstance(b_or_t
, CBlock
): # Block test runner
316 block_outcome
= outcome
318 # each test_obj can have an optional third argument
319 # to specify the tip we should compare with
320 # (default is to use the block being tested)
321 if len(test_obj
) >= 3:
324 # Add to shared block_store, set as current block
325 # If there was an open getdata request for the block
326 # previously, and we didn't have an entry in the
327 # block_store, then immediately deliver, because the
328 # node wouldn't send another getdata request while
329 # the earlier one is outstanding.
330 first_block_with_hash
= True
331 if self
.block_store
.get(block
.sha256
) is not None:
332 first_block_with_hash
= False
334 self
.block_store
.add_block(block
)
335 for c
in self
.connections
:
336 if first_block_with_hash
and block
.sha256
in c
.cb
.block_request_map
and c
.cb
.block_request_map
[block
.sha256
] == True:
337 # There was a previous request for this block hash
338 # Most likely, we delivered a header for this block
339 # but never had the block to respond to the getdata
340 c
.send_message(msg_block(block
))
342 c
.cb
.block_request_map
[block
.sha256
] = False
343 # Either send inv's to each node and sync, or add
344 # to invqueue for later inv'ing.
345 if (test_instance
.sync_every_block
):
346 # if we expect success, send inv and sync every block
347 # if we expect failure, just push the block and see what happens.
349 [ c
.cb
.send_inv(block
) for c
in self
.connections
]
350 self
.sync_blocks(block
.sha256
, 1)
352 [ c
.send_message(msg_block(block
)) for c
in self
.connections
]
353 [ c
.cb
.send_ping(self
.ping_counter
) for c
in self
.connections
]
354 self
.wait_for_pings(self
.ping_counter
)
355 self
.ping_counter
+= 1
356 if (not self
.check_results(tip
, outcome
)):
357 raise AssertionError("Test failed at test %d" % test_number
)
359 invqueue
.append(CInv(2, block
.sha256
))
360 elif isinstance(b_or_t
, CBlockHeader
):
361 block_header
= b_or_t
362 self
.block_store
.add_header(block_header
)
363 [ c
.cb
.send_header(block_header
) for c
in self
.connections
]
365 else: # Tx test runner
366 assert(isinstance(b_or_t
, CTransaction
))
369 # Add to shared tx store and clear map entry
371 self
.tx_store
.add_transaction(tx
)
372 for c
in self
.connections
:
373 c
.cb
.tx_request_map
[tx
.sha256
] = False
374 # Again, either inv to all nodes or save for later
375 if (test_instance
.sync_every_tx
):
376 [ c
.cb
.send_inv(tx
) for c
in self
.connections
]
377 self
.sync_transaction(tx
.sha256
, 1)
378 if (not self
.check_mempool(tx
.sha256
, outcome
)):
379 raise AssertionError("Test failed at test %d" % test_number
)
381 invqueue
.append(CInv(1, tx
.sha256
))
382 # Ensure we're not overflowing the inv queue
383 if len(invqueue
) == MAX_INV_SZ
:
384 [ c
.send_message(msg_inv(invqueue
)) for c
in self
.connections
]
387 # Do final sync if we weren't syncing on every block or every tx.
388 if (not test_instance
.sync_every_block
and block
is not None):
389 if len(invqueue
) > 0:
390 [ c
.send_message(msg_inv(invqueue
)) for c
in self
.connections
]
392 self
.sync_blocks(block
.sha256
, len(test_instance
.blocks_and_transactions
))
393 if (not self
.check_results(tip
, block_outcome
)):
394 raise AssertionError("Block test failed at test %d" % test_number
)
395 if (not test_instance
.sync_every_tx
and tx
is not None):
396 if len(invqueue
) > 0:
397 [ c
.send_message(msg_inv(invqueue
)) for c
in self
.connections
]
399 self
.sync_transaction(tx
.sha256
, len(test_instance
.blocks_and_transactions
))
400 if (not self
.check_mempool(tx
.sha256
, tx_outcome
)):
401 raise AssertionError("Mempool test failed at test %d" % test_number
)
403 [ c
.disconnect_node() for c
in self
.connections
]
404 self
.wait_for_disconnections()
405 self
.block_store
.close()
406 self
.tx_store
.close()