[tests] Add NetworkThread assertions
[bitcoinplatinum.git] / test / functional / test_framework / mininode.py
blob724d418099b38308d00da29d3d5a4470d6c5615b
1 #!/usr/bin/env python3
2 # Copyright (c) 2010 ArtForz -- public domain half-a-node
3 # Copyright (c) 2012 Jeff Garzik
4 # Copyright (c) 2010-2016 The Bitcoin Core developers
5 # Distributed under the MIT software license, see the accompanying
6 # file COPYING or http://www.opensource.org/licenses/mit-license.php.
7 """Bitcoin P2P network half-a-node.
9 This python code was modified from ArtForz' public domain half-a-node, as
10 found in the mini-node branch of http://github.com/jgarzik/pynode.
12 P2PConnection: A low-level connection object to a node's P2P interface
13 P2PInterface: A high-level interface object for communicating to a node over P2P"""
14 import asyncore
15 from collections import defaultdict
16 from io import BytesIO
17 import logging
18 import socket
19 import struct
20 import sys
21 import threading
23 from test_framework.messages import *
24 from test_framework.util import wait_until
26 logger = logging.getLogger("TestFramework.mininode")
28 MESSAGEMAP = {
29 b"addr": msg_addr,
30 b"block": msg_block,
31 b"blocktxn": msg_blocktxn,
32 b"cmpctblock": msg_cmpctblock,
33 b"feefilter": msg_feefilter,
34 b"getaddr": msg_getaddr,
35 b"getblocks": msg_getblocks,
36 b"getblocktxn": msg_getblocktxn,
37 b"getdata": msg_getdata,
38 b"getheaders": msg_getheaders,
39 b"headers": msg_headers,
40 b"inv": msg_inv,
41 b"mempool": msg_mempool,
42 b"ping": msg_ping,
43 b"pong": msg_pong,
44 b"reject": msg_reject,
45 b"sendcmpct": msg_sendcmpct,
46 b"sendheaders": msg_sendheaders,
47 b"tx": msg_tx,
48 b"verack": msg_verack,
49 b"version": msg_version,
52 MAGIC_BYTES = {
53 "mainnet": b"\xf9\xbe\xb4\xd9", # mainnet
54 "testnet3": b"\x0b\x11\x09\x07", # testnet3
55 "regtest": b"\xfa\xbf\xb5\xda", # regtest
58 class P2PConnection(asyncore.dispatcher):
59 """A low-level connection object to a node's P2P interface.
61 This class is responsible for:
63 - opening and closing the TCP connection to the node
64 - reading bytes from and writing bytes to the socket
65 - deserializing and serializing the P2P message header
66 - logging messages as they are sent and received
68 This class contains no logic for handing the P2P message payloads. It must be
69 sub-classed and the on_message() callback overridden."""
71 def __init__(self):
72 # All P2PConnections must be created before starting the NetworkThread.
73 # assert that the network thread is not running.
74 assert not network_thread_running()
76 super().__init__(map=mininode_socket_map)
78 def peer_connect(self, dstaddr, dstport, net="regtest"):
79 self.dstaddr = dstaddr
80 self.dstport = dstport
81 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
82 self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
83 self.sendbuf = b""
84 self.recvbuf = b""
85 self.state = "connecting"
86 self.network = net
87 self.disconnect = False
89 logger.info('Connecting to Bitcoin Node: %s:%d' % (self.dstaddr, self.dstport))
91 try:
92 self.connect((dstaddr, dstport))
93 except:
94 self.handle_close()
96 def peer_disconnect(self):
97 # Connection could have already been closed by other end.
98 if self.state == "connected":
99 self.disconnect_node()
101 # Connection and disconnection methods
103 def handle_connect(self):
104 """asyncore callback when a connection is opened."""
105 if self.state != "connected":
106 logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport))
107 self.state = "connected"
108 self.on_open()
110 def handle_close(self):
111 """asyncore callback when a connection is closed."""
112 logger.debug("Closing connection to: %s:%d" % (self.dstaddr, self.dstport))
113 self.state = "closed"
114 self.recvbuf = b""
115 self.sendbuf = b""
116 try:
117 self.close()
118 except:
119 pass
120 self.on_close()
122 def disconnect_node(self):
123 """Disconnect the p2p connection.
125 Called by the test logic thread. Causes the p2p connection
126 to be disconnected on the next iteration of the asyncore loop."""
127 self.disconnect = True
129 # Socket read methods
131 def handle_read(self):
132 """asyncore callback when data is read from the socket."""
133 t = self.recv(8192)
134 if len(t) > 0:
135 self.recvbuf += t
136 self._on_data()
138 def _on_data(self):
139 """Try to read P2P messages from the recv buffer.
141 This method reads data from the buffer in a loop. It deserializes,
142 parses and verifies the P2P header, then passes the P2P payload to
143 the on_message callback for processing."""
144 try:
145 while True:
146 if len(self.recvbuf) < 4:
147 return
148 if self.recvbuf[:4] != MAGIC_BYTES[self.network]:
149 raise ValueError("got garbage %s" % repr(self.recvbuf))
150 if len(self.recvbuf) < 4 + 12 + 4 + 4:
151 return
152 command = self.recvbuf[4:4+12].split(b"\x00", 1)[0]
153 msglen = struct.unpack("<i", self.recvbuf[4+12:4+12+4])[0]
154 checksum = self.recvbuf[4+12+4:4+12+4+4]
155 if len(self.recvbuf) < 4 + 12 + 4 + 4 + msglen:
156 return
157 msg = self.recvbuf[4+12+4+4:4+12+4+4+msglen]
158 th = sha256(msg)
159 h = sha256(th)
160 if checksum != h[:4]:
161 raise ValueError("got bad checksum " + repr(self.recvbuf))
162 self.recvbuf = self.recvbuf[4+12+4+4+msglen:]
163 if command not in MESSAGEMAP:
164 raise ValueError("Received unknown command from %s:%d: '%s' %s" % (self.dstaddr, self.dstport, command, repr(msg)))
165 f = BytesIO(msg)
166 t = MESSAGEMAP[command]()
167 t.deserialize(f)
168 self._log_message("receive", t)
169 self.on_message(t)
170 except Exception as e:
171 logger.exception('Error reading message:', repr(e))
172 raise
174 def on_message(self, message):
175 """Callback for processing a P2P payload. Must be overridden by derived class."""
176 raise NotImplementedError
178 # Socket write methods
180 def writable(self):
181 """asyncore method to determine whether the handle_write() callback should be called on the next loop."""
182 with mininode_lock:
183 pre_connection = self.state == "connecting"
184 length = len(self.sendbuf)
185 return (length > 0 or pre_connection)
187 def handle_write(self):
188 """asyncore callback when data should be written to the socket."""
189 with mininode_lock:
190 # asyncore does not expose socket connection, only the first read/write
191 # event, thus we must check connection manually here to know when we
192 # actually connect
193 if self.state == "connecting":
194 self.handle_connect()
195 if not self.writable():
196 return
198 try:
199 sent = self.send(self.sendbuf)
200 except:
201 self.handle_close()
202 return
203 self.sendbuf = self.sendbuf[sent:]
205 def send_message(self, message, pushbuf=False):
206 """Send a P2P message over the socket.
208 This method takes a P2P payload, builds the P2P header and adds
209 the message to the send buffer to be sent over the socket."""
210 if self.state != "connected" and not pushbuf:
211 raise IOError('Not connected, no pushbuf')
212 self._log_message("send", message)
213 command = message.command
214 data = message.serialize()
215 tmsg = MAGIC_BYTES[self.network]
216 tmsg += command
217 tmsg += b"\x00" * (12 - len(command))
218 tmsg += struct.pack("<I", len(data))
219 th = sha256(data)
220 h = sha256(th)
221 tmsg += h[:4]
222 tmsg += data
223 with mininode_lock:
224 if (len(self.sendbuf) == 0 and not pushbuf):
225 try:
226 sent = self.send(tmsg)
227 self.sendbuf = tmsg[sent:]
228 except BlockingIOError:
229 self.sendbuf = tmsg
230 else:
231 self.sendbuf += tmsg
233 # Class utility methods
235 def _log_message(self, direction, msg):
236 """Logs a message being sent or received over the connection."""
237 if direction == "send":
238 log_message = "Send message to "
239 elif direction == "receive":
240 log_message = "Received message from "
241 log_message += "%s:%d: %s" % (self.dstaddr, self.dstport, repr(msg)[:500])
242 if len(log_message) > 500:
243 log_message += "... (msg truncated)"
244 logger.debug(log_message)
247 class P2PInterface(P2PConnection):
248 """A high-level P2P interface class for communicating with a Bitcoin node.
250 This class provides high-level callbacks for processing P2P message
251 payloads, as well as convenience methods for interacting with the
252 node over P2P.
254 Individual testcases should subclass this and override the on_* methods
255 if they want to alter message handling behaviour."""
256 def __init__(self):
257 super().__init__()
259 # Track number of messages of each type received and the most recent
260 # message of each type
261 self.message_count = defaultdict(int)
262 self.last_message = {}
264 # A count of the number of ping messages we've sent to the node
265 self.ping_counter = 1
267 # The network services received from the peer
268 self.nServices = 0
270 def peer_connect(self, *args, services=NODE_NETWORK|NODE_WITNESS, send_version=True, **kwargs):
271 super().peer_connect(*args, **kwargs)
273 if send_version:
274 # Send a version msg
275 vt = msg_version()
276 vt.nServices = services
277 vt.addrTo.ip = self.dstaddr
278 vt.addrTo.port = self.dstport
279 vt.addrFrom.ip = "0.0.0.0"
280 vt.addrFrom.port = 0
281 self.send_message(vt, True)
283 # Message receiving methods
285 def on_message(self, message):
286 """Receive message and dispatch message to appropriate callback.
288 We keep a count of how many of each message type has been received
289 and the most recent message of each type."""
290 with mininode_lock:
291 try:
292 command = message.command.decode('ascii')
293 self.message_count[command] += 1
294 self.last_message[command] = message
295 getattr(self, 'on_' + command)(message)
296 except:
297 print("ERROR delivering %s (%s)" % (repr(message), sys.exc_info()[0]))
298 raise
300 # Callback methods. Can be overridden by subclasses in individual test
301 # cases to provide custom message handling behaviour.
303 def on_open(self):
304 pass
306 def on_close(self):
307 pass
309 def on_addr(self, message): pass
310 def on_block(self, message): pass
311 def on_blocktxn(self, message): pass
312 def on_cmpctblock(self, message): pass
313 def on_feefilter(self, message): pass
314 def on_getaddr(self, message): pass
315 def on_getblocks(self, message): pass
316 def on_getblocktxn(self, message): pass
317 def on_getdata(self, message): pass
318 def on_getheaders(self, message): pass
319 def on_headers(self, message): pass
320 def on_mempool(self, message): pass
321 def on_pong(self, message): pass
322 def on_reject(self, message): pass
323 def on_sendcmpct(self, message): pass
324 def on_sendheaders(self, message): pass
325 def on_tx(self, message): pass
327 def on_inv(self, message):
328 want = msg_getdata()
329 for i in message.inv:
330 if i.type != 0:
331 want.inv.append(i)
332 if len(want.inv):
333 self.send_message(want)
335 def on_ping(self, message):
336 self.send_message(msg_pong(message.nonce))
338 def on_verack(self, message):
339 self.verack_received = True
341 def on_version(self, message):
342 assert message.nVersion >= MIN_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format(message.nVersion, MIN_VERSION_SUPPORTED)
343 self.send_message(msg_verack())
344 self.nServices = message.nServices
346 # Connection helper methods
348 def wait_for_disconnect(self, timeout=60):
349 test_function = lambda: self.state != "connected"
350 wait_until(test_function, timeout=timeout, lock=mininode_lock)
352 # Message receiving helper methods
354 def wait_for_block(self, blockhash, timeout=60):
355 test_function = lambda: self.last_message.get("block") and self.last_message["block"].block.rehash() == blockhash
356 wait_until(test_function, timeout=timeout, lock=mininode_lock)
358 def wait_for_getdata(self, timeout=60):
359 test_function = lambda: self.last_message.get("getdata")
360 wait_until(test_function, timeout=timeout, lock=mininode_lock)
362 def wait_for_getheaders(self, timeout=60):
363 test_function = lambda: self.last_message.get("getheaders")
364 wait_until(test_function, timeout=timeout, lock=mininode_lock)
366 def wait_for_inv(self, expected_inv, timeout=60):
367 """Waits for an INV message and checks that the first inv object in the message was as expected."""
368 if len(expected_inv) > 1:
369 raise NotImplementedError("wait_for_inv() will only verify the first inv object")
370 test_function = lambda: self.last_message.get("inv") and \
371 self.last_message["inv"].inv[0].type == expected_inv[0].type and \
372 self.last_message["inv"].inv[0].hash == expected_inv[0].hash
373 wait_until(test_function, timeout=timeout, lock=mininode_lock)
375 def wait_for_verack(self, timeout=60):
376 test_function = lambda: self.message_count["verack"]
377 wait_until(test_function, timeout=timeout, lock=mininode_lock)
379 # Message sending helper functions
381 def send_and_ping(self, message):
382 self.send_message(message)
383 self.sync_with_ping()
385 # Sync up with the node
386 def sync_with_ping(self, timeout=60):
387 self.send_message(msg_ping(nonce=self.ping_counter))
388 test_function = lambda: self.last_message.get("pong") and self.last_message["pong"].nonce == self.ping_counter
389 wait_until(test_function, timeout=timeout, lock=mininode_lock)
390 self.ping_counter += 1
393 # Keep our own socket map for asyncore, so that we can track disconnects
394 # ourselves (to workaround an issue with closing an asyncore socket when
395 # using select)
396 mininode_socket_map = dict()
398 # One lock for synchronizing all data access between the networking thread (see
399 # NetworkThread below) and the thread running the test logic. For simplicity,
400 # P2PConnection acquires this lock whenever delivering a message to a P2PInterface,
401 # and whenever adding anything to the send buffer (in send_message()). This
402 # lock should be acquired in the thread running the test logic to synchronize
403 # access to any data shared with the P2PInterface or P2PConnection.
404 mininode_lock = threading.RLock()
406 class NetworkThread(threading.Thread):
407 def __init__(self):
408 super().__init__(name="NetworkThread")
410 def run(self):
411 while mininode_socket_map:
412 # We check for whether to disconnect outside of the asyncore
413 # loop to workaround the behavior of asyncore when using
414 # select
415 disconnected = []
416 for fd, obj in mininode_socket_map.items():
417 if obj.disconnect:
418 disconnected.append(obj)
419 [obj.handle_close() for obj in disconnected]
420 asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1)
421 logger.debug("Network thread closing")
423 def network_thread_start():
424 """Start the network thread."""
425 # Only one network thread may run at a time
426 assert not network_thread_running()
428 NetworkThread().start()
430 def network_thread_running():
431 """Return whether the network thread is running."""
432 return any([thread.name == "NetworkThread" for thread in threading.enumerate()])
434 def network_thread_join(timeout=10):
435 """Wait timeout seconds for the network thread to terminate.
437 Throw if the network thread doesn't terminate in timeout seconds."""
438 network_threads = [thread for thread in threading.enumerate() if thread.name == "NetworkThread"]
439 assert len(network_threads) <= 1
440 for thread in network_threads:
441 thread.join(timeout)
442 assert not thread.is_alive()