qa: Only allow disconnecting all NodeConns
[bitcoinplatinum.git] / test / functional / test_framework / netutil.py
blobe5d415788f453af0228cb9922a6e8d62d6808303
1 #!/usr/bin/env python3
2 # Copyright (c) 2014-2016 The Bitcoin Core developers
3 # Distributed under the MIT software license, see the accompanying
4 # file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 """Linux network utilities.
7 Roughly based on http://voorloopnul.com/blog/a-python-netstat-in-less-than-100-lines-of-code/ by Ricardo Pascal
8 """
10 import sys
11 import socket
12 import fcntl
13 import struct
14 import array
15 import os
16 from binascii import unhexlify, hexlify
# Socket state codes, written as two upper-case hex digits. get_bind_addrs
# compares STATE_LISTEN against the state field that netstat() extracts from
# /proc/net/tcp and /proc/net/tcp6. Only STATE_LISTEN is used in this module;
# the remaining values are kept commented out for reference.
# STATE_ESTABLISHED = '01'
# STATE_SYN_SENT = '02'
# STATE_SYN_RECV = '03'
# STATE_FIN_WAIT1 = '04'
# STATE_FIN_WAIT2 = '05'
# STATE_TIME_WAIT = '06'
# STATE_CLOSE = '07'
# STATE_CLOSE_WAIT = '08'
# STATE_LAST_ACK = '09'
STATE_LISTEN = '0A'
# STATE_CLOSING = '0B'
def get_socket_inodes(pid):
    '''
    Get list of socket inodes for process pid.

    Walks /proc/<pid>/fd and collects, as ints, the inode number of every
    descriptor whose symlink target has the form 'socket:[<inode>]'.

    A descriptor that disappears between the directory listing and the
    readlink call is skipped rather than aborting the whole scan. This
    always happens when pid is the calling process, because the descriptor
    used by os.listdir itself shows up in the listing.
    '''
    base = '/proc/%i/fd' % pid
    inodes = []
    for item in os.listdir(base):
        try:
            target = os.readlink(os.path.join(base, item))
        except FileNotFoundError:
            # The fd was closed after listdir() returned; nothing to report.
            continue
        if target.startswith('socket:'):
            # Strip the leading 'socket:[' and the trailing ']'.
            inodes.append(int(target[8:-1]))
    return inodes
42 def _remove_empty(array):
43 return [x for x in array if x !='']
45 def _convert_ip_port(array):
46 host,port = array.split(':')
47 # convert host from mangled-per-four-bytes form as used by kernel
48 host = unhexlify(host)
49 host_out = ''
50 for x in range(0, len(host) // 4):
51 (val,) = struct.unpack('=I', host[x*4:(x+1)*4])
52 host_out += '%08x' % val
54 return host_out,int(port,16)
def netstat(typ='tcp'):
    '''
    Parse /proc/net/<typ> into a list of connection records.

    The first line of the file (the column header) is discarded. Every other
    line yields one record of the form [id, local, remote, state, inode],
    where local and remote are the (host, port) tuples returned by
    _convert_ip_port, state is the raw state field, and inode is an int.
    '''
    with open('/proc/net/' + typ, 'r', encoding='utf8') as proc_file:
        rows = proc_file.readlines()
        rows.pop(0)  # drop the column header
    records = []
    for row in rows:
        # Columns are separated by runs of spaces; discard the empty pieces.
        fields = _remove_empty(row.split(' '))
        local = _convert_ip_port(fields[1])
        remote = _convert_ip_port(fields[2])
        # The inode is what ties a connection back to its owning process.
        records.append([fields[0], local, remote, fields[3], int(fields[9])])
    return records
def get_bind_addrs(pid):
    '''
    Get bind addresses as (host,port) tuples for process pid.

    A connection from /proc/net/tcp or /proc/net/tcp6 is reported when it is
    in the listening state and its inode is among the socket inodes held by
    the process.
    '''
    owned = get_socket_inodes(pid)
    return [
        entry[1]
        for entry in netstat('tcp') + netstat('tcp6')
        if entry[3] == STATE_LISTEN and entry[4] in owned
    ]
# from: http://code.activestate.com/recipes/439093/
def all_interfaces():
    '''
    Return all interfaces that are up

    Queries the kernel with the SIOCGIFCONF ioctl and returns a list of
    (name, address) tuples. name is a bytes object (the interface name up to
    its first NUL byte) and address is the dotted-quad string produced by
    socket.inet_ntoa.
    '''
    is_64bits = sys.maxsize > 2**32
    # Size of one record in the buffer filled by the ioctl.
    struct_size = 40 if is_64bits else 32
    max_possible = 8  # initial value
    # The socket is only a handle for the ioctl; close it when done.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        while True:
            buf_len = max_possible * struct_size
            names = array.array('B', b'\0' * buf_len)
            outbytes = struct.unpack('iL', fcntl.ioctl(
                s.fileno(),
                0x8912,  # SIOCGIFCONF
                struct.pack('iL', buf_len, names.buffer_info()[0])
            ))[0]
            if outbytes == buf_len:
                # Buffer completely filled: there may be more entries, so
                # retry with twice the room.
                max_possible *= 2
            else:
                break
    # array.tostring() was removed in Python 3.9; tobytes() is the equivalent.
    namestr = names.tobytes()
    return [(namestr[i:i+16].split(b'\0', 1)[0],
             socket.inet_ntoa(namestr[i+20:i+24]))
            for i in range(0, outbytes, struct_size)]
def addr_to_hex(addr):
    '''
    Convert string IPv4 or IPv6 address to binary address as returned by
    get_bind_addrs.
    Very naive implementation that certainly doesn't work for all IPv6 variants.
    '''
    if '.' in addr:
        # IPv4: one byte per dotted component.
        octets = [int(part) for part in addr.split('.')]
    elif ':' in addr:
        # IPv6: gather the bytes before and after an optional '::' gap.
        head, tail = [], []
        halves = (head, tail)
        gaps = 0
        groups = addr.split(':')
        final = len(groups) - 1
        for pos, group in enumerate(groups):
            if group != '':
                # Two bytes per component, high byte first.
                word = int(group, 16)
                halves[gaps].extend((word >> 8, word & 0xff))
                continue
            if pos == 0 or pos == final:
                # Empty component at the very beginning or end: ignore.
                continue
            # An interior empty component marks '::'; switch to the suffix.
            gaps += 1
            assert(gaps < 2)
        padding = 16 - len(head) - len(tail)
        assert((gaps == 0 and padding == 0) or (gaps == 1 and padding > 0))
        octets = head + [0] * padding + tail
    else:
        raise ValueError('Could not parse address %s' % addr)
    return hexlify(bytearray(octets)).decode('ascii')
def test_ipv6_local():
    '''
    Check for (local) IPv6 support.

    Returns True when a datagram socket can be created for AF_INET6 and
    connected to ('::1', 0), and False when either step raises socket.error.
    '''
    # By using SOCK_DGRAM this will not actually make a connection, but it will
    # fail if there is no route to IPv6 localhost.
    try:
        # The context manager closes the probe socket on every path, so no
        # descriptor is leaked.
        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) as s:
            s.connect(('::1', 0))
    except socket.error:
        return False
    return True