2 # Copyright (c) 2010 ArtForz -- public domain half-a-node
3 # Copyright (c) 2012 Jeff Garzik
4 # Copyright (c) 2010-2016 The Bitcoin Core developers
5 # Distributed under the MIT software license, see the accompanying
6 # file COPYING or http://www.opensource.org/licenses/mit-license.php.
7 """Bitcoin P2P network half-a-node.
9 This python code was modified from ArtForz' public domain half-a-node, as
10 found in the mini-node branch of http://github.com/jgarzik/pynode.
12 NodeConn: an object which manages p2p connectivity to a bitcoin node
13 NodeConnCB: a base class that describes the interface for receiving
14 callbacks with network messages from a NodeConn
15 CBlock, CTransaction, CBlockHeader, CTxIn, CTxOut, etc....:
16 data structures that should map to corresponding structures in
18 msg_block, msg_tx, msg_headers, etc.:
19 data structures that represent network messages
20 ser_*, deser_*: functions that handle serialization/deserialization
24 from codecs
import encode
25 from collections
import defaultdict
28 from io
import BytesIO
35 from threading
import RLock
, Thread
37 from test_framework
.siphash
import siphash256
38 from test_framework
.util
import hex_str_to_bytes
, bytes_to_hex_str
, wait_until
40 BIP0031_VERSION
= 60000
41 MY_VERSION
= 70014 # past bip-31 for ping/pong
42 MY_SUBVERSION
= b
"/python-mininode-tester:0.0.3/"
43 MY_RELAY
= 1 # from version 70001 onwards, fRelay should be appended to version messages (BIP37)
46 MAX_BLOCK_BASE_SIZE
= 1000000
48 COIN
= 100000000 # 1 btc in satoshis
50 NODE_NETWORK
= (1 << 0)
51 # NODE_GETUTXO = (1 << 1)
52 # NODE_BLOOM = (1 << 2)
53 NODE_WITNESS
= (1 << 3)
54 NODE_UNSUPPORTED_SERVICE_BIT_5
= (1 << 5)
55 NODE_UNSUPPORTED_SERVICE_BIT_7
= (1 << 7)
57 logger
= logging
.getLogger("TestFramework.mininode")
# Keep our own socket map for asyncore, so that we can track disconnects
# ourselves (to work around an issue with closing an asyncore socket when
# using select)
62 mininode_socket_map
= dict()
64 # One lock for synchronizing all data access between the networking thread (see
65 # NetworkThread below) and the thread running the test logic. For simplicity,
66 # NodeConn acquires this lock whenever delivering a message to a NodeConnCB,
67 # and whenever adding anything to the send buffer (in send_message()). This
68 # lock should be acquired in the thread running the test logic to synchronize
69 # access to any data shared with the NodeConnCB or NodeConn.
70 mininode_lock
= RLock()
72 # Serialization/deserialization tools
74 return hashlib
.new('sha256', s
).digest()
77 return hashlib
.new('ripemd160', s
).digest()
80 return sha256(sha256(s
))
82 def ser_compact_size(l
):
85 r
= struct
.pack("B", l
)
87 r
= struct
.pack("<BH", 253, l
)
89 r
= struct
.pack("<BI", 254, l
)
91 r
= struct
.pack("<BQ", 255, l
)
94 def deser_compact_size(f
):
95 nit
= struct
.unpack("<B", f
.read(1))[0]
97 nit
= struct
.unpack("<H", f
.read(2))[0]
99 nit
= struct
.unpack("<I", f
.read(4))[0]
101 nit
= struct
.unpack("<Q", f
.read(8))[0]
105 nit
= deser_compact_size(f
)
109 return ser_compact_size(len(s
)) + s
111 def deser_uint256(f
):
114 t
= struct
.unpack("<I", f
.read(4))[0]
122 rs
+= struct
.pack("<I", u
& 0xFFFFFFFF)
127 def uint256_from_str(s
):
129 t
= struct
.unpack("<IIIIIIII", s
[:32])
131 r
+= t
[i
] << (i
* 32)
135 def uint256_from_compact(c
):
136 nbytes
= (c
>> 24) & 0xFF
137 v
= (c
& 0xFFFFFF) << (8 * (nbytes
- 3))
141 def deser_vector(f
, c
):
142 nit
= deser_compact_size(f
)
151 # ser_function_name: Allow for an alternate serialization function on the
152 # entries in the vector (we use this for serializing the vector of transactions
153 # for a witness block).
154 def ser_vector(l
, ser_function_name
=None):
155 r
= ser_compact_size(len(l
))
157 if ser_function_name
:
158 r
+= getattr(i
, ser_function_name
)()
164 def deser_uint256_vector(f
):
165 nit
= deser_compact_size(f
)
173 def ser_uint256_vector(l
):
174 r
= ser_compact_size(len(l
))
180 def deser_string_vector(f
):
181 nit
= deser_compact_size(f
)
189 def ser_string_vector(l
):
190 r
= ser_compact_size(len(l
))
196 def deser_int_vector(f
):
197 nit
= deser_compact_size(f
)
200 t
= struct
.unpack("<i", f
.read(4))[0]
205 def ser_int_vector(l
):
206 r
= ser_compact_size(len(l
))
208 r
+= struct
.pack("<i", i
)
211 # Deserialize from a hex string representation (eg from RPC)
212 def FromHex(obj
, hex_string
):
213 obj
.deserialize(BytesIO(hex_str_to_bytes(hex_string
)))
216 # Convert a binary-serializable object to hex (eg for submission via RPC)
218 return bytes_to_hex_str(obj
.serialize())
220 # Objects that map to bitcoind objects, which can be serialized/deserialized
225 self
.pchReserved
= b
"\x00" * 10 + b
"\xff" * 2
def deserialize(self, f):
    """Populate this address from the binary stream ``f``.

    Reads, in order: an 8-byte little-endian unsigned service bitfield,
    12 reserved bytes (stored verbatim), a 4-byte packed IPv4 address
    (converted to dotted-quad text) and a 2-byte big-endian port.
    """
    (services,) = struct.unpack("<Q", f.read(8))
    self.nServices = services
    self.pchReserved = f.read(12)
    packed_ip = f.read(4)
    self.ip = socket.inet_ntoa(packed_ip)
    # The port is the only big-endian (network order) field here.
    (port,) = struct.unpack(">H", f.read(2))
    self.port = port
237 r
+= struct
.pack("<Q", self
.nServices
)
238 r
+= self
.pchReserved
239 r
+= socket
.inet_aton(self
.ip
)
240 r
+= struct
.pack(">H", self
.port
)
244 return "CAddress(nServices=%i ip=%s port=%i)" % (self
.nServices
,
247 MSG_WITNESS_FLAG
= 1<<30
254 1|MSG_WITNESS_FLAG
: "WitnessTx",
255 2|MSG_WITNESS_FLAG
: "WitnessBlock",
259 def __init__(self
, t
=0, h
=0):
263 def deserialize(self
, f
):
264 self
.type = struct
.unpack("<i", f
.read(4))[0]
265 self
.hash = deser_uint256(f
)
269 r
+= struct
.pack("<i", self
.type)
270 r
+= ser_uint256(self
.hash)
274 return "CInv(type=%s hash=%064x)" \
275 % (self
.typemap
[self
.type], self
.hash)
278 class CBlockLocator():
280 self
.nVersion
= MY_VERSION
283 def deserialize(self
, f
):
284 self
.nVersion
= struct
.unpack("<i", f
.read(4))[0]
285 self
.vHave
= deser_uint256_vector(f
)
289 r
+= struct
.pack("<i", self
.nVersion
)
290 r
+= ser_uint256_vector(self
.vHave
)
294 return "CBlockLocator(nVersion=%i vHave=%s)" \
295 % (self
.nVersion
, repr(self
.vHave
))
299 def __init__(self
, hash=0, n
=0):
303 def deserialize(self
, f
):
304 self
.hash = deser_uint256(f
)
305 self
.n
= struct
.unpack("<I", f
.read(4))[0]
309 r
+= ser_uint256(self
.hash)
310 r
+= struct
.pack("<I", self
.n
)
314 return "COutPoint(hash=%064x n=%i)" % (self
.hash, self
.n
)
318 def __init__(self
, outpoint
=None, scriptSig
=b
"", nSequence
=0):
320 self
.prevout
= COutPoint()
322 self
.prevout
= outpoint
323 self
.scriptSig
= scriptSig
324 self
.nSequence
= nSequence
326 def deserialize(self
, f
):
327 self
.prevout
= COutPoint()
328 self
.prevout
.deserialize(f
)
329 self
.scriptSig
= deser_string(f
)
330 self
.nSequence
= struct
.unpack("<I", f
.read(4))[0]
334 r
+= self
.prevout
.serialize()
335 r
+= ser_string(self
.scriptSig
)
336 r
+= struct
.pack("<I", self
.nSequence
)
340 return "CTxIn(prevout=%s scriptSig=%s nSequence=%i)" \
341 % (repr(self
.prevout
), bytes_to_hex_str(self
.scriptSig
),
346 def __init__(self
, nValue
=0, scriptPubKey
=b
""):
348 self
.scriptPubKey
= scriptPubKey
350 def deserialize(self
, f
):
351 self
.nValue
= struct
.unpack("<q", f
.read(8))[0]
352 self
.scriptPubKey
= deser_string(f
)
356 r
+= struct
.pack("<q", self
.nValue
)
357 r
+= ser_string(self
.scriptPubKey
)
361 return "CTxOut(nValue=%i.%08i scriptPubKey=%s)" \
362 % (self
.nValue
// COIN
, self
.nValue
% COIN
,
363 bytes_to_hex_str(self
.scriptPubKey
))
366 class CScriptWitness():
368 # stack is a vector of strings
372 return "CScriptWitness(%s)" % \
373 (",".join([bytes_to_hex_str(x
) for x
in self
.stack
]))
381 class CTxInWitness():
383 self
.scriptWitness
= CScriptWitness()
385 def deserialize(self
, f
):
386 self
.scriptWitness
.stack
= deser_string_vector(f
)
389 return ser_string_vector(self
.scriptWitness
.stack
)
392 return repr(self
.scriptWitness
)
395 return self
.scriptWitness
.is_null()
def deserialize(self, f):
    """Deserialize every per-input witness, in order, from stream ``f``.

    No element count is read from the stream: ``self.vtxinwit`` must
    already hold one entry per input before this is called.
    """
    for input_witness in self.vtxinwit:
        input_witness.deserialize(f)
408 # This is different than the usual vector serialization --
409 # we omit the length of the vector, which is required to be
410 # the same length as the transaction's vin vector.
411 for x
in self
.vtxinwit
:
416 return "CTxWitness(%s)" % \
417 (';'.join([repr(x
) for x
in self
.vtxinwit
]))
420 for x
in self
.vtxinwit
:
426 class CTransaction():
427 def __init__(self
, tx
=None):
432 self
.wit
= CTxWitness()
437 self
.nVersion
= tx
.nVersion
438 self
.vin
= copy
.deepcopy(tx
.vin
)
439 self
.vout
= copy
.deepcopy(tx
.vout
)
440 self
.nLockTime
= tx
.nLockTime
441 self
.sha256
= tx
.sha256
443 self
.wit
= copy
.deepcopy(tx
.wit
)
445 def deserialize(self
, f
):
446 self
.nVersion
= struct
.unpack("<i", f
.read(4))[0]
447 self
.vin
= deser_vector(f
, CTxIn
)
449 if len(self
.vin
) == 0:
450 flags
= struct
.unpack("<B", f
.read(1))[0]
451 # Not sure why flags can't be zero, but this
452 # matches the implementation in bitcoind
454 self
.vin
= deser_vector(f
, CTxIn
)
455 self
.vout
= deser_vector(f
, CTxOut
)
457 self
.vout
= deser_vector(f
, CTxOut
)
459 self
.wit
.vtxinwit
= [CTxInWitness() for i
in range(len(self
.vin
))]
460 self
.wit
.deserialize(f
)
461 self
.nLockTime
= struct
.unpack("<I", f
.read(4))[0]
465 def serialize_without_witness(self
):
467 r
+= struct
.pack("<i", self
.nVersion
)
468 r
+= ser_vector(self
.vin
)
469 r
+= ser_vector(self
.vout
)
470 r
+= struct
.pack("<I", self
.nLockTime
)
473 # Only serialize with witness when explicitly called for
474 def serialize_with_witness(self
):
476 if not self
.wit
.is_null():
479 r
+= struct
.pack("<i", self
.nVersion
)
482 r
+= ser_vector(dummy
)
483 r
+= struct
.pack("<B", flags
)
484 r
+= ser_vector(self
.vin
)
485 r
+= ser_vector(self
.vout
)
487 if (len(self
.wit
.vtxinwit
) != len(self
.vin
)):
488 # vtxinwit must have the same length as vin
489 self
.wit
.vtxinwit
= self
.wit
.vtxinwit
[:len(self
.vin
)]
490 for i
in range(len(self
.wit
.vtxinwit
), len(self
.vin
)):
491 self
.wit
.vtxinwit
.append(CTxInWitness())
492 r
+= self
.wit
.serialize()
493 r
+= struct
.pack("<I", self
.nLockTime
)
496 # Regular serialization is without witness -- must explicitly
497 # call serialize_with_witness to include witness data.
499 return self
.serialize_without_witness()
501 # Recalculate the txid (transaction hash without witness)
506 # We will only cache the serialization without witness in
507 # self.sha256 and self.hash -- those are expected to be the txid.
508 def calc_sha256(self
, with_witness
=False):
510 # Don't cache the result, just return it
511 return uint256_from_str(hash256(self
.serialize_with_witness()))
513 if self
.sha256
is None:
514 self
.sha256
= uint256_from_str(hash256(self
.serialize_without_witness()))
515 self
.hash = encode(hash256(self
.serialize())[::-1], 'hex_codec').decode('ascii')
519 for tout
in self
.vout
:
520 if tout
.nValue
< 0 or tout
.nValue
> 21000000 * COIN
:
525 return "CTransaction(nVersion=%i vin=%s vout=%s wit=%s nLockTime=%i)" \
526 % (self
.nVersion
, repr(self
.vin
), repr(self
.vout
), repr(self
.wit
), self
.nLockTime
)
529 class CBlockHeader():
530 def __init__(self
, header
=None):
534 self
.nVersion
= header
.nVersion
535 self
.hashPrevBlock
= header
.hashPrevBlock
536 self
.hashMerkleRoot
= header
.hashMerkleRoot
537 self
.nTime
= header
.nTime
538 self
.nBits
= header
.nBits
539 self
.nNonce
= header
.nNonce
540 self
.sha256
= header
.sha256
541 self
.hash = header
.hash
546 self
.hashPrevBlock
= 0
547 self
.hashMerkleRoot
= 0
554 def deserialize(self
, f
):
555 self
.nVersion
= struct
.unpack("<i", f
.read(4))[0]
556 self
.hashPrevBlock
= deser_uint256(f
)
557 self
.hashMerkleRoot
= deser_uint256(f
)
558 self
.nTime
= struct
.unpack("<I", f
.read(4))[0]
559 self
.nBits
= struct
.unpack("<I", f
.read(4))[0]
560 self
.nNonce
= struct
.unpack("<I", f
.read(4))[0]
566 r
+= struct
.pack("<i", self
.nVersion
)
567 r
+= ser_uint256(self
.hashPrevBlock
)
568 r
+= ser_uint256(self
.hashMerkleRoot
)
569 r
+= struct
.pack("<I", self
.nTime
)
570 r
+= struct
.pack("<I", self
.nBits
)
571 r
+= struct
.pack("<I", self
.nNonce
)
574 def calc_sha256(self
):
575 if self
.sha256
is None:
577 r
+= struct
.pack("<i", self
.nVersion
)
578 r
+= ser_uint256(self
.hashPrevBlock
)
579 r
+= ser_uint256(self
.hashMerkleRoot
)
580 r
+= struct
.pack("<I", self
.nTime
)
581 r
+= struct
.pack("<I", self
.nBits
)
582 r
+= struct
.pack("<I", self
.nNonce
)
583 self
.sha256
= uint256_from_str(hash256(r
))
584 self
.hash = encode(hash256(r
)[::-1], 'hex_codec').decode('ascii')
592 return "CBlockHeader(nVersion=%i hashPrevBlock=%064x hashMerkleRoot=%064x nTime=%s nBits=%08x nNonce=%08x)" \
593 % (self
.nVersion
, self
.hashPrevBlock
, self
.hashMerkleRoot
,
594 time
.ctime(self
.nTime
), self
.nBits
, self
.nNonce
)
597 class CBlock(CBlockHeader
):
598 def __init__(self
, header
=None):
599 super(CBlock
, self
).__init
__(header
)
602 def deserialize(self
, f
):
603 super(CBlock
, self
).deserialize(f
)
604 self
.vtx
= deser_vector(f
, CTransaction
)
606 def serialize(self
, with_witness
=False):
608 r
+= super(CBlock
, self
).serialize()
610 r
+= ser_vector(self
.vtx
, "serialize_with_witness")
612 r
+= ser_vector(self
.vtx
)
615 # Calculate the merkle root given a vector of transaction hashes
617 def get_merkle_root(cls
, hashes
):
618 while len(hashes
) > 1:
620 for i
in range(0, len(hashes
), 2):
621 i2
= min(i
+1, len(hashes
)-1)
622 newhashes
.append(hash256(hashes
[i
] + hashes
[i2
]))
624 return uint256_from_str(hashes
[0])
626 def calc_merkle_root(self
):
630 hashes
.append(ser_uint256(tx
.sha256
))
631 return self
.get_merkle_root(hashes
)
def calc_witness_merkle_root(self):
    """Return the merkle root computed over the with-witness tx hashes.

    The first leaf is always the serialization of 0: for witness root
    purposes the coinbase's with-witness hash is defined to be 0...0.
    Every transaction after the first contributes its hash calculated
    with witness data.
    """
    coinbase_leaf = ser_uint256(0)
    leaves = [coinbase_leaf]
    leaves.extend(ser_uint256(tx.calc_sha256(True)) for tx in self.vtx[1:])
    return self.get_merkle_root(leaves)
646 target
= uint256_from_compact(self
.nBits
)
647 if self
.sha256
> target
:
650 if not tx
.is_valid():
652 if self
.calc_merkle_root() != self
.hashMerkleRoot
:
658 target
= uint256_from_compact(self
.nBits
)
659 while self
.sha256
> target
:
664 return "CBlock(nVersion=%i hashPrevBlock=%064x hashMerkleRoot=%064x nTime=%s nBits=%08x nNonce=%08x vtx=%s)" \
665 % (self
.nVersion
, self
.hashPrevBlock
, self
.hashMerkleRoot
,
666 time
.ctime(self
.nTime
), self
.nBits
, self
.nNonce
, repr(self
.vtx
))
669 class CUnsignedAlert():
681 self
.strComment
= b
""
682 self
.strStatusBar
= b
""
683 self
.strReserved
= b
""
685 def deserialize(self
, f
):
686 self
.nVersion
= struct
.unpack("<i", f
.read(4))[0]
687 self
.nRelayUntil
= struct
.unpack("<q", f
.read(8))[0]
688 self
.nExpiration
= struct
.unpack("<q", f
.read(8))[0]
689 self
.nID
= struct
.unpack("<i", f
.read(4))[0]
690 self
.nCancel
= struct
.unpack("<i", f
.read(4))[0]
691 self
.setCancel
= deser_int_vector(f
)
692 self
.nMinVer
= struct
.unpack("<i", f
.read(4))[0]
693 self
.nMaxVer
= struct
.unpack("<i", f
.read(4))[0]
694 self
.setSubVer
= deser_string_vector(f
)
695 self
.nPriority
= struct
.unpack("<i", f
.read(4))[0]
696 self
.strComment
= deser_string(f
)
697 self
.strStatusBar
= deser_string(f
)
698 self
.strReserved
= deser_string(f
)
702 r
+= struct
.pack("<i", self
.nVersion
)
703 r
+= struct
.pack("<q", self
.nRelayUntil
)
704 r
+= struct
.pack("<q", self
.nExpiration
)
705 r
+= struct
.pack("<i", self
.nID
)
706 r
+= struct
.pack("<i", self
.nCancel
)
707 r
+= ser_int_vector(self
.setCancel
)
708 r
+= struct
.pack("<i", self
.nMinVer
)
709 r
+= struct
.pack("<i", self
.nMaxVer
)
710 r
+= ser_string_vector(self
.setSubVer
)
711 r
+= struct
.pack("<i", self
.nPriority
)
712 r
+= ser_string(self
.strComment
)
713 r
+= ser_string(self
.strStatusBar
)
714 r
+= ser_string(self
.strReserved
)
718 return "CUnsignedAlert(nVersion %d, nRelayUntil %d, nExpiration %d, nID %d, nCancel %d, nMinVer %d, nMaxVer %d, nPriority %d, strComment %s, strStatusBar %s, strReserved %s)" \
719 % (self
.nVersion
, self
.nRelayUntil
, self
.nExpiration
, self
.nID
,
720 self
.nCancel
, self
.nMinVer
, self
.nMaxVer
, self
.nPriority
,
721 self
.strComment
, self
.strStatusBar
, self
.strReserved
)
729 def deserialize(self
, f
):
730 self
.vchMsg
= deser_string(f
)
731 self
.vchSig
= deser_string(f
)
735 r
+= ser_string(self
.vchMsg
)
736 r
+= ser_string(self
.vchSig
)
740 return "CAlert(vchMsg.sz %d, vchSig.sz %d)" \
741 % (len(self
.vchMsg
), len(self
.vchSig
))
744 class PrefilledTransaction():
745 def __init__(self
, index
=0, tx
= None):
749 def deserialize(self
, f
):
750 self
.index
= deser_compact_size(f
)
751 self
.tx
= CTransaction()
752 self
.tx
.deserialize(f
)
754 def serialize(self
, with_witness
=False):
756 r
+= ser_compact_size(self
.index
)
758 r
+= self
.tx
.serialize_with_witness()
760 r
+= self
.tx
.serialize_without_witness()
763 def serialize_with_witness(self
):
764 return self
.serialize(with_witness
=True)
767 return "PrefilledTransaction(index=%d, tx=%s)" % (self
.index
, repr(self
.tx
))
769 # This is what we send on the wire, in a cmpctblock message.
770 class P2PHeaderAndShortIDs():
772 self
.header
= CBlockHeader()
774 self
.shortids_length
= 0
776 self
.prefilled_txn_length
= 0
777 self
.prefilled_txn
= []
def deserialize(self, f):
    """Read a cmpctblock payload (header, nonce, short IDs, prefilled txns).

    Fields are read from stream ``f`` in wire order. ``shortids`` is
    rebuilt from scratch on every call so that it always agrees with
    ``shortids_length``; previously a second call on the same object
    appended to the old list while the length field was overwritten.
    """
    self.header.deserialize(f)
    self.nonce = struct.unpack("<Q", f.read(8))[0]
    self.shortids_length = deser_compact_size(f)
    # Start from an empty list: every other field is replaced, not merged.
    self.shortids = []
    for _ in range(self.shortids_length):
        # shortids are defined to be 6 bytes in the spec, so append
        # two zero bytes and read it in as an 8-byte number
        self.shortids.append(struct.unpack("<Q", f.read(6) + b'\x00\x00')[0])
    self.prefilled_txn = deser_vector(f, PrefilledTransaction)
    self.prefilled_txn_length = len(self.prefilled_txn)
790 # When using version 2 compact blocks, we must serialize with_witness.
791 def serialize(self
, with_witness
=False):
793 r
+= self
.header
.serialize()
794 r
+= struct
.pack("<Q", self
.nonce
)
795 r
+= ser_compact_size(self
.shortids_length
)
796 for x
in self
.shortids
:
797 # We only want the first 6 bytes
798 r
+= struct
.pack("<Q", x
)[0:6]
800 r
+= ser_vector(self
.prefilled_txn
, "serialize_with_witness")
802 r
+= ser_vector(self
.prefilled_txn
)
806 return "P2PHeaderAndShortIDs(header=%s, nonce=%d, shortids_length=%d, shortids=%s, prefilled_txn_length=%d, prefilledtxn=%s" % (repr(self
.header
), self
.nonce
, self
.shortids_length
, repr(self
.shortids
), self
.prefilled_txn_length
, repr(self
.prefilled_txn
))
# P2P version of the above that will use witness serialization (for compact
# block version 2)
810 class P2PHeaderAndShortWitnessIDs(P2PHeaderAndShortIDs
):
812 return super(P2PHeaderAndShortWitnessIDs
, self
).serialize(with_witness
=True)
814 # Calculate the BIP 152-compact blocks shortid for a given transaction hash
def calculate_shortid(k0, k1, tx_hash):
    """Return the BIP 152 compact-block short ID for ``tx_hash``.

    The short ID is ``siphash256(k0, k1, tx_hash)`` masked down to its
    low 48 bits (6 bytes).
    """
    return siphash256(k0, k1, tx_hash) & 0x0000ffffffffffff
820 # This version gets rid of the array lengths, and reinterprets the differential
821 # encoding into indices that can be used for lookup.
822 class HeaderAndShortIDs():
823 def __init__(self
, p2pheaders_and_shortids
= None):
824 self
.header
= CBlockHeader()
827 self
.prefilled_txn
= []
828 self
.use_witness
= False
830 if p2pheaders_and_shortids
!= None:
831 self
.header
= p2pheaders_and_shortids
.header
832 self
.nonce
= p2pheaders_and_shortids
.nonce
833 self
.shortids
= p2pheaders_and_shortids
.shortids
835 for x
in p2pheaders_and_shortids
.prefilled_txn
:
836 self
.prefilled_txn
.append(PrefilledTransaction(x
.index
+ last_index
+ 1, x
.tx
))
837 last_index
= self
.prefilled_txn
[-1].index
841 ret
= P2PHeaderAndShortWitnessIDs()
843 ret
= P2PHeaderAndShortIDs()
844 ret
.header
= self
.header
845 ret
.nonce
= self
.nonce
846 ret
.shortids_length
= len(self
.shortids
)
847 ret
.shortids
= self
.shortids
848 ret
.prefilled_txn_length
= len(self
.prefilled_txn
)
849 ret
.prefilled_txn
= []
851 for x
in self
.prefilled_txn
:
852 ret
.prefilled_txn
.append(PrefilledTransaction(x
.index
- last_index
- 1, x
.tx
))
def get_siphash_keys(self):
    """Derive the two siphash keys used for short-ID calculation.

    The serialized header followed by the 8-byte little-endian nonce is
    hashed with the file's ``sha256`` helper; the first and second
    little-endian 64-bit words of the digest are returned as
    ``[key0, key1]``.
    """
    preimage = self.header.serialize() + struct.pack("<Q", self.nonce)
    digest = sha256(preimage)
    key0, key1 = struct.unpack("<QQ", digest[:16])
    return [key0, key1]
864 # Version 2 compact blocks use wtxid in shortids (rather than txid)
865 def initialize_from_block(self
, block
, nonce
=0, prefill_list
= [0], use_witness
= False):
866 self
.header
= CBlockHeader(block
)
868 self
.prefilled_txn
= [ PrefilledTransaction(i
, block
.vtx
[i
]) for i
in prefill_list
]
870 self
.use_witness
= use_witness
871 [k0
, k1
] = self
.get_siphash_keys()
872 for i
in range(len(block
.vtx
)):
873 if i
not in prefill_list
:
874 tx_hash
= block
.vtx
[i
].sha256
876 tx_hash
= block
.vtx
[i
].calc_sha256(with_witness
=True)
877 self
.shortids
.append(calculate_shortid(k0
, k1
, tx_hash
))
880 return "HeaderAndShortIDs(header=%s, nonce=%d, shortids=%s, prefilledtxn=%s" % (repr(self
.header
), self
.nonce
, repr(self
.shortids
), repr(self
.prefilled_txn
))
883 class BlockTransactionsRequest():
885 def __init__(self
, blockhash
=0, indexes
= None):
886 self
.blockhash
= blockhash
887 self
.indexes
= indexes
if indexes
!= None else []
def deserialize(self, f):
    """Read the block hash and the list of indexes from stream ``f``.

    The indexes are stored exactly as read (one compact-size integer
    each). The list is rebuilt on every call; previously, deserializing
    into an object that already held indexes appended to the old ones.
    """
    self.blockhash = deser_uint256(f)
    indexes_length = deser_compact_size(f)
    # Replace rather than extend, matching how blockhash is overwritten.
    self.indexes = []
    for _ in range(indexes_length):
        self.indexes.append(deser_compact_size(f))
897 r
+= ser_uint256(self
.blockhash
)
898 r
+= ser_compact_size(len(self
.indexes
))
899 for x
in self
.indexes
:
900 r
+= ser_compact_size(x
)
903 # helper to set the differentially encoded indexes from absolute ones
904 def from_absolute(self
, absolute_indexes
):
907 for x
in absolute_indexes
:
908 self
.indexes
.append(x
-last_index
-1)
911 def to_absolute(self
):
912 absolute_indexes
= []
914 for x
in self
.indexes
:
915 absolute_indexes
.append(x
+last_index
+1)
916 last_index
= absolute_indexes
[-1]
917 return absolute_indexes
920 return "BlockTransactionsRequest(hash=%064x indexes=%s)" % (self
.blockhash
, repr(self
.indexes
))
923 class BlockTransactions():
925 def __init__(self
, blockhash
=0, transactions
= None):
926 self
.blockhash
= blockhash
927 self
.transactions
= transactions
if transactions
!= None else []
929 def deserialize(self
, f
):
930 self
.blockhash
= deser_uint256(f
)
931 self
.transactions
= deser_vector(f
, CTransaction
)
933 def serialize(self
, with_witness
=False):
935 r
+= ser_uint256(self
.blockhash
)
937 r
+= ser_vector(self
.transactions
, "serialize_with_witness")
939 r
+= ser_vector(self
.transactions
)
943 return "BlockTransactions(hash=%064x transactions=%s)" % (self
.blockhash
, repr(self
.transactions
))
946 # Objects that correspond to messages on the wire
951 self
.nVersion
= MY_VERSION
952 self
.nServices
= NODE_NETWORK | NODE_WITNESS
953 self
.nTime
= int(time
.time())
954 self
.addrTo
= CAddress()
955 self
.addrFrom
= CAddress()
956 self
.nNonce
= random
.getrandbits(64)
957 self
.strSubVer
= MY_SUBVERSION
958 self
.nStartingHeight
= -1
959 self
.nRelay
= MY_RELAY
961 def deserialize(self
, f
):
962 self
.nVersion
= struct
.unpack("<i", f
.read(4))[0]
963 if self
.nVersion
== 10300:
965 self
.nServices
= struct
.unpack("<Q", f
.read(8))[0]
966 self
.nTime
= struct
.unpack("<q", f
.read(8))[0]
967 self
.addrTo
= CAddress()
968 self
.addrTo
.deserialize(f
)
970 if self
.nVersion
>= 106:
971 self
.addrFrom
= CAddress()
972 self
.addrFrom
.deserialize(f
)
973 self
.nNonce
= struct
.unpack("<Q", f
.read(8))[0]
974 self
.strSubVer
= deser_string(f
)
978 self
.strSubVer
= None
979 self
.nStartingHeight
= None
981 if self
.nVersion
>= 209:
982 self
.nStartingHeight
= struct
.unpack("<i", f
.read(4))[0]
984 self
.nStartingHeight
= None
986 if self
.nVersion
>= 70001:
987 # Relay field is optional for version 70001 onwards
989 self
.nRelay
= struct
.unpack("<b", f
.read(1))[0]
997 r
+= struct
.pack("<i", self
.nVersion
)
998 r
+= struct
.pack("<Q", self
.nServices
)
999 r
+= struct
.pack("<q", self
.nTime
)
1000 r
+= self
.addrTo
.serialize()
1001 r
+= self
.addrFrom
.serialize()
1002 r
+= struct
.pack("<Q", self
.nNonce
)
1003 r
+= ser_string(self
.strSubVer
)
1004 r
+= struct
.pack("<i", self
.nStartingHeight
)
1005 r
+= struct
.pack("<b", self
.nRelay
)
1009 return 'msg_version(nVersion=%i nServices=%i nTime=%s addrTo=%s addrFrom=%s nNonce=0x%016X strSubVer=%s nStartingHeight=%i nRelay=%i)' \
1010 % (self
.nVersion
, self
.nServices
, time
.ctime(self
.nTime
),
1011 repr(self
.addrTo
), repr(self
.addrFrom
), self
.nNonce
,
1012 self
.strSubVer
, self
.nStartingHeight
, self
.nRelay
)
1021 def deserialize(self
, f
):
1024 def serialize(self
):
1028 return "msg_verack()"
1037 def deserialize(self
, f
):
1038 self
.addrs
= deser_vector(f
, CAddress
)
1040 def serialize(self
):
1041 return ser_vector(self
.addrs
)
1044 return "msg_addr(addrs=%s)" % (repr(self
.addrs
))
1051 self
.alert
= CAlert()
1053 def deserialize(self
, f
):
1054 self
.alert
= CAlert()
1055 self
.alert
.deserialize(f
)
1057 def serialize(self
):
1059 r
+= self
.alert
.serialize()
1063 return "msg_alert(alert=%s)" % (repr(self
.alert
), )
1069 def __init__(self
, inv
=None):
1075 def deserialize(self
, f
):
1076 self
.inv
= deser_vector(f
, CInv
)
1078 def serialize(self
):
1079 return ser_vector(self
.inv
)
1082 return "msg_inv(inv=%s)" % (repr(self
.inv
))
1085 class msg_getdata():
1086 command
= b
"getdata"
1088 def __init__(self
, inv
=None):
1089 self
.inv
= inv
if inv
!= None else []
1091 def deserialize(self
, f
):
1092 self
.inv
= deser_vector(f
, CInv
)
1094 def serialize(self
):
1095 return ser_vector(self
.inv
)
1098 return "msg_getdata(inv=%s)" % (repr(self
.inv
))
1101 class msg_getblocks():
1102 command
= b
"getblocks"
1105 self
.locator
= CBlockLocator()
1108 def deserialize(self
, f
):
1109 self
.locator
= CBlockLocator()
1110 self
.locator
.deserialize(f
)
1111 self
.hashstop
= deser_uint256(f
)
1113 def serialize(self
):
1115 r
+= self
.locator
.serialize()
1116 r
+= ser_uint256(self
.hashstop
)
1120 return "msg_getblocks(locator=%s hashstop=%064x)" \
1121 % (repr(self
.locator
), self
.hashstop
)
1127 def __init__(self
, tx
=CTransaction()):
1130 def deserialize(self
, f
):
1131 self
.tx
.deserialize(f
)
1133 def serialize(self
):
1134 return self
.tx
.serialize_without_witness()
1137 return "msg_tx(tx=%s)" % (repr(self
.tx
))
class msg_witness_tx(msg_tx):
    """Variant of msg_tx whose payload is serialized with witness data."""

    def serialize(self):
        """Return the wrapped transaction serialized with its witness."""
        wrapped_tx = self.tx
        return wrapped_tx.serialize_with_witness()
1148 def __init__(self
, block
=None):
1150 self
.block
= CBlock()
1154 def deserialize(self
, f
):
1155 self
.block
.deserialize(f
)
1157 def serialize(self
):
1158 return self
.block
.serialize()
1161 return "msg_block(block=%s)" % (repr(self
.block
))
1163 # for cases where a user needs tighter control over what is sent over the wire
1164 # note that the user must supply the name of the command, and the data
1165 class msg_generic():
1166 def __init__(self
, command
, data
=None):
1167 self
.command
= command
1170 def serialize(self
):
1174 return "msg_generic()"
1176 class msg_witness_block(msg_block
):
1178 def serialize(self
):
1179 r
= self
.block
.serialize(with_witness
=True)
1182 class msg_getaddr():
1183 command
= b
"getaddr"
1188 def deserialize(self
, f
):
1191 def serialize(self
):
1195 return "msg_getaddr()"
1198 class msg_ping_prebip31():
1204 def deserialize(self
, f
):
1207 def serialize(self
):
1211 return "msg_ping() (pre-bip31)"
1217 def __init__(self
, nonce
=0):
1220 def deserialize(self
, f
):
1221 self
.nonce
= struct
.unpack("<Q", f
.read(8))[0]
1223 def serialize(self
):
1225 r
+= struct
.pack("<Q", self
.nonce
)
1229 return "msg_ping(nonce=%08x)" % self
.nonce
1235 def __init__(self
, nonce
=0):
1238 def deserialize(self
, f
):
1239 self
.nonce
= struct
.unpack("<Q", f
.read(8))[0]
1241 def serialize(self
):
1243 r
+= struct
.pack("<Q", self
.nonce
)
1247 return "msg_pong(nonce=%08x)" % self
.nonce
1250 class msg_mempool():
1251 command
= b
"mempool"
1256 def deserialize(self
, f
):
1259 def serialize(self
):
1263 return "msg_mempool()"
1265 class msg_sendheaders():
1266 command
= b
"sendheaders"
1271 def deserialize(self
, f
):
1274 def serialize(self
):
1278 return "msg_sendheaders()"
1281 # getheaders message has
1284 # hash_stop (hash of last desired block header, 0 to get as many as possible)
1285 class msg_getheaders():
1286 command
= b
"getheaders"
1289 self
.locator
= CBlockLocator()
1292 def deserialize(self
, f
):
1293 self
.locator
= CBlockLocator()
1294 self
.locator
.deserialize(f
)
1295 self
.hashstop
= deser_uint256(f
)
1297 def serialize(self
):
1299 r
+= self
.locator
.serialize()
1300 r
+= ser_uint256(self
.hashstop
)
1304 return "msg_getheaders(locator=%s, stop=%064x)" \
1305 % (repr(self
.locator
), self
.hashstop
)
1308 # headers message has
1309 # <count> <vector of block headers>
1310 class msg_headers():
1311 command
= b
"headers"
1313 def __init__(self
, headers
=None):
1314 self
.headers
= headers
if headers
is not None else []
1316 def deserialize(self
, f
):
1317 # comment in bitcoind indicates these should be deserialized as blocks
1318 blocks
= deser_vector(f
, CBlock
)
1320 self
.headers
.append(CBlockHeader(x
))
def serialize(self):
    """Return the headers serialized as a vector of CBlock objects.

    Each stored header is wrapped in a ``CBlock`` before the vector is
    serialized with ``ser_vector``.
    """
    as_blocks = list(map(CBlock, self.headers))
    return ser_vector(as_blocks)
1327 return "msg_headers(headers=%s)" % repr(self
.headers
)
1332 REJECT_MALFORMED
= 1
def deserialize(self, f):
    """Read a reject message from stream ``f``.

    Reads the rejected message name, a one-byte code and the reason
    string. A trailing uint256 is read into ``self.data`` only when the
    code is not REJECT_MALFORMED and the rejected message is ``block``
    or ``tx``; otherwise ``self.data`` is left untouched.
    """
    self.message = deser_string(f)
    (self.code,) = struct.unpack("<B", f.read(1))
    self.reason = deser_string(f)
    if self.code == self.REJECT_MALFORMED:
        return
    if self.message in (b"block", b"tx"):
        self.data = deser_uint256(f)
1348 def serialize(self
):
1349 r
= ser_string(self
.message
)
1350 r
+= struct
.pack("<B", self
.code
)
1351 r
+= ser_string(self
.reason
)
1352 if (self
.code
!= self
.REJECT_MALFORMED
and
1353 (self
.message
== b
"block" or self
.message
== b
"tx")):
1354 r
+= ser_uint256(self
.data
)
1358 return "msg_reject: %s %d %s [%064x]" \
1359 % (self
.message
, self
.code
, self
.reason
, self
.data
)
1361 class msg_feefilter():
1362 command
= b
"feefilter"
1364 def __init__(self
, feerate
=0):
1365 self
.feerate
= feerate
def deserialize(self, f):
    """Read the fee rate (8-byte little-endian unsigned) from stream ``f``."""
    (feerate,) = struct.unpack("<Q", f.read(8))
    self.feerate = feerate
1370 def serialize(self
):
1372 r
+= struct
.pack("<Q", self
.feerate
)
1376 return "msg_feefilter(feerate=%08x)" % self
.feerate
1378 class msg_sendcmpct():
1379 command
= b
"sendcmpct"
1382 self
.announce
= False
def deserialize(self, f):
    """Read the announce flag and version from stream ``f``.

    The flag is a single byte decoded as a bool; the version is an
    8-byte little-endian unsigned integer.
    """
    (announce,) = struct.unpack("<?", f.read(1))
    self.announce = announce
    (version,) = struct.unpack("<Q", f.read(8))
    self.version = version
1389 def serialize(self
):
1391 r
+= struct
.pack("<?", self
.announce
)
1392 r
+= struct
.pack("<Q", self
.version
)
1396 return "msg_sendcmpct(announce=%s, version=%lu)" % (self
.announce
, self
.version
)
1398 class msg_cmpctblock():
1399 command
= b
"cmpctblock"
1401 def __init__(self
, header_and_shortids
= None):
1402 self
.header_and_shortids
= header_and_shortids
1404 def deserialize(self
, f
):
1405 self
.header_and_shortids
= P2PHeaderAndShortIDs()
1406 self
.header_and_shortids
.deserialize(f
)
1408 def serialize(self
):
1410 r
+= self
.header_and_shortids
.serialize()
1414 return "msg_cmpctblock(HeaderAndShortIDs=%s)" % repr(self
.header_and_shortids
)
1416 class msg_getblocktxn():
1417 command
= b
"getblocktxn"
1420 self
.block_txn_request
= None
1422 def deserialize(self
, f
):
1423 self
.block_txn_request
= BlockTransactionsRequest()
1424 self
.block_txn_request
.deserialize(f
)
1426 def serialize(self
):
1428 r
+= self
.block_txn_request
.serialize()
1432 return "msg_getblocktxn(block_txn_request=%s)" % (repr(self
.block_txn_request
))
class msg_blocktxn():
    """P2P ``blocktxn`` message wrapping a BlockTransactions payload."""
    command = b"blocktxn"

    def __init__(self):
        self.block_transactions = BlockTransactions()

    def deserialize(self, f):
        """Fill the existing payload object from stream ``f``."""
        self.block_transactions.deserialize(f)

    def serialize(self):
        """Return the payload's default serialization."""
        r = b""
        r += self.block_transactions.serialize()
        return r

    def __repr__(self):
        return "msg_blocktxn(block_transactions=%s)" % (repr(self.block_transactions))
class msg_witness_blocktxn(msg_blocktxn):
    """``blocktxn`` variant whose payload is serialized with witness data."""

    def serialize(self):
        """Return the payload serialized with ``with_witness=True``."""
        r = b""
        r += self.block_transactions.serialize(with_witness=True)
        return r
class NodeConnCB(object):
    """Callback and helper functions for P2P connection to a bitcoind node.

    Individual testcases should subclass this and override the on_* methods
    if they want to alter message handling behaviour.
    """

    def __init__(self):
        # Track whether we have a P2P connection open to the node
        self.connected = False
        self.connection = None

        # Track number of messages of each type received and the most recent
        # message of each type
        self.message_count = defaultdict(int)
        self.last_message = {}

        # A count of the number of ping messages we've sent to the node
        self.ping_counter = 1

        # deliver_sleep_time is helpful for debugging race conditions in p2p
        # tests; it causes message delivery to sleep for the specified time
        # before acquiring the global lock and delivering the next message.
        self.deliver_sleep_time = None

        # Set to True by on_verack(); initialised here so it can be read
        # before any verack has arrived.
        self.verack_received = False

    # Message receiving methods

    def deliver(self, conn, message):
        """Receive message and dispatch message to appropriate callback.

        We keep a count of how many of each message type has been received
        and the most recent message of each type.

        Optionally waits for deliver_sleep_time before dispatching message.
        """
        deliver_sleep = self.get_deliver_sleep_time()
        if deliver_sleep is not None:
            time.sleep(deliver_sleep)
        with mininode_lock:
            try:
                command = message.command.decode('ascii')
                self.message_count[command] += 1
                self.last_message[command] = message
                getattr(self, 'on_' + command)(conn, message)
            except Exception as e:
                # Delivery is best-effort: report the failure and carry on so
                # one bad callback cannot stop message processing.
                print("ERROR delivering %s (%s)" % (repr(message),
                                                    type(e)))

    def get_deliver_sleep_time(self):
        """Return deliver_sleep_time, read under the global mininode lock."""
        with mininode_lock:
            return self.deliver_sleep_time

    # Callback methods. Can be overridden by subclasses in individual test
    # cases to provide custom message handling behaviour.

    def on_open(self, conn):
        self.connected = True

    def on_close(self, conn):
        self.connected = False
        self.connection = None

    def on_addr(self, conn, message): pass
    def on_alert(self, conn, message): pass
    def on_block(self, conn, message): pass
    def on_blocktxn(self, conn, message): pass
    def on_cmpctblock(self, conn, message): pass
    def on_feefilter(self, conn, message): pass
    def on_getaddr(self, conn, message): pass
    def on_getblocks(self, conn, message): pass
    def on_getblocktxn(self, conn, message): pass
    def on_getdata(self, conn, message): pass
    def on_getheaders(self, conn, message): pass
    def on_headers(self, conn, message): pass
    # deliver() always calls callbacks as (conn, message); `message` defaults
    # to None so existing one-argument callers keep working.
    def on_mempool(self, conn, message=None): pass
    def on_pong(self, conn, message): pass
    def on_reject(self, conn, message): pass
    def on_sendcmpct(self, conn, message): pass
    def on_sendheaders(self, conn, message): pass
    def on_tx(self, conn, message): pass

    def on_inv(self, conn, message):
        """Request every announced object whose inv type is non-zero."""
        want = msg_getdata()
        for i in message.inv:
            if i.type != 0:
                want.inv.append(i)
        if len(want.inv):
            conn.send_message(want)

    def on_ping(self, conn, message):
        """Answer a ping with a pong echoing its nonce (post-BIP31 peers only)."""
        if conn.ver_send > BIP0031_VERSION:
            conn.send_message(msg_pong(message.nonce))

    def on_verack(self, conn, message):
        conn.ver_recv = conn.ver_send
        self.verack_received = True

    def on_version(self, conn, message):
        """Record the negotiated version and services; verack if supported."""
        if message.nVersion >= 209:
            conn.send_message(msg_verack())
        conn.ver_send = min(MY_VERSION, message.nVersion)
        if message.nVersion < 209:
            conn.ver_recv = conn.ver_send
        conn.nServices = message.nServices

    # Connection helper methods

    def add_connection(self, conn):
        self.connection = conn

    def wait_for_disconnect(self, timeout=60):
        """Block until on_close() has cleared the connected flag."""
        test_function = lambda: not self.connected
        wait_until(test_function, timeout=timeout, lock=mininode_lock)

    # Message receiving helper methods

    def wait_for_block(self, blockhash, timeout=60):
        """Block until the most recent block message hashes to blockhash."""
        test_function = lambda: self.last_message.get("block") and self.last_message["block"].block.rehash() == blockhash
        wait_until(test_function, timeout=timeout, lock=mininode_lock)

    def wait_for_getdata(self, timeout=60):
        """Block until at least one getdata message has been received."""
        test_function = lambda: self.last_message.get("getdata")
        wait_until(test_function, timeout=timeout, lock=mininode_lock)

    def wait_for_getheaders(self, timeout=60):
        """Block until at least one getheaders message has been received."""
        test_function = lambda: self.last_message.get("getheaders")
        wait_until(test_function, timeout=timeout, lock=mininode_lock)

    def wait_for_inv(self, expected_inv, timeout=60):
        """Waits for an INV message and checks that the first inv object in the message was as expected."""
        if len(expected_inv) > 1:
            raise NotImplementedError("wait_for_inv() will only verify the first inv object")
        test_function = lambda: self.last_message.get("inv") and \
                                self.last_message["inv"].inv[0].type == expected_inv[0].type and \
                                self.last_message["inv"].inv[0].hash == expected_inv[0].hash
        wait_until(test_function, timeout=timeout, lock=mininode_lock)

    def wait_for_verack(self, timeout=60):
        """Block until at least one verack message has been counted."""
        test_function = lambda: self.message_count["verack"]
        wait_until(test_function, timeout=timeout, lock=mininode_lock)

    # Message sending helper functions

    def send_message(self, message):
        """Send message over the attached connection, or log an error if none."""
        if self.connection:
            self.connection.send_message(message)
        else:
            logger.error("Cannot send message. No connection to node!")

    def send_and_ping(self, message):
        """Send message, then wait for the node to answer a follow-up ping."""
        self.send_message(message)
        self.sync_with_ping()

    # Sync up with the node
    def sync_with_ping(self, timeout=60):
        """Ping with the current counter as nonce and wait for the matching pong."""
        self.send_message(msg_ping(nonce=self.ping_counter))
        test_function = lambda: self.last_message.get("pong") and self.last_message["pong"].nonce == self.ping_counter
        wait_until(test_function, timeout=timeout, lock=mininode_lock)
        self.ping_counter += 1
1619 # The actual NodeConn class
1620 # This class provides an interface for a p2p connection to a specified node
class NodeConn(asyncore.dispatcher):
    """asyncore-based P2P connection to a single node.

    Outgoing messages are framed and queued in ``sendbuf``; incoming bytes
    accumulate in ``recvbuf`` and are parsed into message objects which are
    handed to the callback object's ``deliver`` method.
    """

    # Wire command name -> message class used to deserialize it.
    messagemap = {
        b"version": msg_version,
        b"verack": msg_verack,
        b"addr": msg_addr,
        b"alert": msg_alert,
        b"inv": msg_inv,
        b"getdata": msg_getdata,
        b"getblocks": msg_getblocks,
        b"tx": msg_tx,
        b"block": msg_block,
        b"getaddr": msg_getaddr,
        b"ping": msg_ping,
        b"pong": msg_pong,
        b"headers": msg_headers,
        b"getheaders": msg_getheaders,
        b"reject": msg_reject,
        b"mempool": msg_mempool,
        b"feefilter": msg_feefilter,
        b"sendheaders": msg_sendheaders,
        b"sendcmpct": msg_sendcmpct,
        b"cmpctblock": msg_cmpctblock,
        b"getblocktxn": msg_getblocktxn,
        b"blocktxn": msg_blocktxn
    }
    # Four-byte message start marker for each supported network.
    MAGIC_BYTES = {
        "mainnet": b"\xf9\xbe\xb4\xd9",   # mainnet
        "testnet3": b"\x0b\x11\x09\x07",  # testnet3
        "regtest": b"\xfa\xbf\xb5\xda",   # regtest
    }

    def __init__(self, dstaddr, dstport, rpc, callback, net="regtest", services=NODE_NETWORK|NODE_WITNESS, send_version=True):
        """Open a non-blocking TCP connection to dstaddr:dstport.

        rpc is stored on the instance as ``self.rpc``. callback receives
        on_open/on_close/deliver calls. net selects the MAGIC_BYTES entry.
        When send_version is true a version message advertising ``services``
        is queued before connecting.
        """
        asyncore.dispatcher.__init__(self, map=mininode_socket_map)
        self.dstaddr = dstaddr
        self.dstport = dstport
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self.sendbuf = b""
        self.recvbuf = b""
        self.ver_send = 209
        self.ver_recv = 209
        self.last_sent = 0
        self.state = "connecting"
        self.network = net
        self.cb = callback
        self.disconnect = False
        self.nServices = 0

        if send_version:
            # stuff version msg into sendbuf
            vt = msg_version()
            vt.nServices = services
            vt.addrTo.ip = self.dstaddr
            vt.addrTo.port = self.dstport
            vt.addrFrom.ip = "0.0.0.0"
            vt.addrFrom.port = 0
            self.send_message(vt, True)

        logger.info('Connecting to Bitcoin Node: %s:%d' % (self.dstaddr, self.dstport))

        try:
            self.connect((dstaddr, dstport))
        except Exception:
            # A failed connect is treated like any other close.
            self.handle_close()
        self.rpc = rpc

    def handle_connect(self):
        """Mark the connection as established and notify the callback once."""
        if self.state != "connected":
            logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport))
            self.state = "connected"
            self.cb.on_open(self)

    def handle_close(self):
        """Drop buffered data, close the socket and notify the callback."""
        logger.debug("Closing connection to: %s:%d" % (self.dstaddr, self.dstport))
        self.state = "closed"
        self.recvbuf = b""
        self.sendbuf = b""
        try:
            self.close()
        except Exception:
            # Best effort: the socket may already be gone.
            pass
        self.cb.on_close(self)

    def handle_read(self):
        """Append newly received bytes to recvbuf and try to parse messages."""
        t = self.recv(8192)
        if len(t) > 0:
            self.recvbuf += t
            self.got_data()

    def readable(self):
        return True

    def writable(self):
        """Return True while connecting or while sendbuf holds unsent data."""
        with mininode_lock:
            pre_connection = self.state == "connecting"
            length = len(self.sendbuf)
        return (length > 0 or pre_connection)

    def handle_write(self):
        """Flush as much of sendbuf as the socket accepts."""
        with mininode_lock:
            # asyncore does not expose socket connection, only the first read/write
            # event, thus we must check connection manually here to know when we
            # actually connect
            if self.state == "connecting":
                self.handle_connect()
            if not self.writable():
                return

            try:
                sent = self.send(self.sendbuf)
            except Exception:
                self.handle_close()
                return
            self.sendbuf = self.sendbuf[sent:]

    def got_data(self):
        """Parse every complete message in recvbuf and dispatch each one.

        Returns when recvbuf holds only a partial message. Raises ValueError
        on a wrong magic, a bad checksum or an unknown command; any exception
        is logged and re-raised.
        """
        try:
            while True:
                if len(self.recvbuf) < 4:
                    return
                if self.recvbuf[:4] != self.MAGIC_BYTES[self.network]:
                    raise ValueError("got garbage %s" % repr(self.recvbuf))
                # Header is magic(4) + command(12) + length(4), followed by a
                # 4-byte checksum only when ver_recv >= 209.
                has_checksum = self.ver_recv >= 209
                header_len = 4 + 12 + 4 + (4 if has_checksum else 0)
                if len(self.recvbuf) < header_len:
                    return
                command = self.recvbuf[4:4+12].split(b"\x00", 1)[0]
                msglen = struct.unpack("<i", self.recvbuf[4+12:4+12+4])[0]
                if len(self.recvbuf) < header_len + msglen:
                    return
                msg = self.recvbuf[header_len:header_len+msglen]
                if has_checksum:
                    checksum = self.recvbuf[4+12+4:4+12+4+4]
                    th = sha256(msg)
                    h = sha256(th)
                    if checksum != h[:4]:
                        raise ValueError("got bad checksum " + repr(self.recvbuf))
                self.recvbuf = self.recvbuf[header_len+msglen:]
                if command in self.messagemap:
                    f = BytesIO(msg)
                    t = self.messagemap[command]()
                    t.deserialize(f)
                    self.got_message(t)
                else:
                    logger.warning("Received unknown command from %s:%d: '%s' %s" % (self.dstaddr, self.dstport, command, repr(msg)))
                    raise ValueError("Unknown command: '%s'" % (command))
        except Exception as e:
            # The extra argument needs a placeholder, otherwise the logging
            # module itself fails while formatting the record.
            logger.exception('got_data: %s', repr(e))
            raise

    def send_message(self, message, pushbuf=False):
        """Frame message and send it, or queue it in sendbuf.

        With pushbuf=True the message is only queued, which is allowed before
        the connection is established. Raises IOError when not connected and
        pushbuf is false.
        """
        if self.state != "connected" and not pushbuf:
            raise IOError('Not connected, no pushbuf')
        self._log_message("send", message)
        command = message.command
        data = message.serialize()
        tmsg = self.MAGIC_BYTES[self.network]
        tmsg += command
        tmsg += b"\x00" * (12 - len(command))
        tmsg += struct.pack("<I", len(data))
        if self.ver_send >= 209:
            th = sha256(data)
            h = sha256(th)
            tmsg += h[:4]
        tmsg += data
        with mininode_lock:
            if (len(self.sendbuf) == 0 and not pushbuf):
                try:
                    sent = self.send(tmsg)
                    self.sendbuf = tmsg[sent:]
                except BlockingIOError:
                    # Socket not ready: keep the whole frame for handle_write.
                    self.sendbuf = tmsg
            else:
                self.sendbuf += tmsg
            self.last_sent = time.time()

    def got_message(self, message):
        """Handle connection-level bookkeeping, then deliver to the callback."""
        if message.command == b"version":
            if message.nVersion <= BIP0031_VERSION:
                self.messagemap[b'ping'] = msg_ping_prebip31
        if self.last_sent + 30 * 60 < time.time():
            # Nothing sent for 30 minutes: send a ping.
            self.send_message(self.messagemap[b'ping']())
        self._log_message("receive", message)
        self.cb.deliver(self, message)

    def _log_message(self, direction, msg):
        """Debug-log msg, showing at most the first 500 characters of its repr.

        direction must be "send" or "receive"; anything else raises ValueError.
        """
        prefixes = {
            "send": "Send message to ",
            "receive": "Received message from ",
        }
        if direction not in prefixes:
            raise ValueError("Unknown message direction: %r" % (direction,))
        text = repr(msg)
        log_message = prefixes[direction]
        log_message += "%s:%d: %s" % (self.dstaddr, self.dstport, text[:500])
        # Flag truncation only when the repr itself was actually cut.
        if len(text) > 500:
            log_message += "... (msg truncated)"
        logger.debug(log_message)

    def disconnect_node(self):
        """Set the disconnect flag; NetworkThread closes flagged connections."""
        self.disconnect = True
class NetworkThread(Thread):
    """Thread that drives the asyncore loop for all mininode connections."""

    def run(self):
        """Poll the socket map until it is empty, closing flagged connections."""
        while mininode_socket_map:
            # We check for whether to disconnect outside of the asyncore
            # loop to workaround the behavior of asyncore when using
            # select
            disconnected = [obj for obj in mininode_socket_map.values()
                            if obj.disconnect]
            # Close only after the scan above has finished iterating the map.
            for obj in disconnected:
                obj.handle_close()
            asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1)
        logger.debug("Network thread closing")
1843 # An exception we can raise if we detect a potential disconnect
1844 # (p2p or rpc) before the test is complete
class EarlyDisconnectError(Exception):
    """Raised when a disconnect is detected before the test is complete."""

    def __init__(self, value):
        super().__init__(value)
        self.value = value

    def __str__(self):
        return repr(self.value)