s4-tests: delete_force was unused, removed it.
[Samba.git] / wintest / wintest.py
blob36d0659f031f3b064b1fcab0a468a8e5db894039
1 #!/usr/bin/env python
3 '''automated testing library for testing Samba against windows'''
5 import pexpect, subprocess
6 import sys, os, time, re
class wintest():
    '''testing of Samba against windows VMs

    An instance holds a dictionary of substitution variables (self.vars).
    Most helpers expand ${NAME} references from that dictionary before
    running a command or talking to a pexpect child.
    '''

    # Dotted-quad pattern handed to pexpect. It is a raw string with the dots
    # escaped so that a partially received address such as "192.168" cannot
    # satisfy the pattern before the rest of the address has arrived.
    _IPV4_RE = r'\d+\.\d+\.\d+\.\d+'

    def __init__(self):
        '''create an empty variable table and unbuffer python children'''
        self.vars = {}
        self.list_mode = False
        # skip() consults this list, so it must exist even when set_skip()
        # has never been called
        self.skiplist = []
        os.putenv('PYTHONUNBUFFERED', '1')

    def setvar(self, varname, value):
        '''set a substitution variable'''
        self.vars[varname] = value

    def getvar(self, varname):
        '''return a substitution variable, or None if it is not set'''
        return self.vars.get(varname)

    def setwinvars(self, vm, prefix='WIN'):
        '''setup WIN_XX vars based on a vm name

        For each known suffix, <prefix>_<suffix> is set to the substituted
        value of <vm>_<suffix> when that variable exists, and removed
        otherwise so that values from a previous VM do not linger.
        '''
        for v in ['VM', 'HOSTNAME', 'USER', 'PASS', 'SNAPSHOT', 'BASEDN', 'REALM', 'DOMAIN', 'IP']:
            vname = '%s_%s' % (vm, v)
            target = "%s_%s" % (prefix, v)
            if vname in self.vars:
                self.setvar(target, self.substitute("${%s}" % vname))
            else:
                self.vars.pop(target, None)

    def info(self, msg):
        '''print some information (suppressed in step listing mode)'''
        if not self.list_mode:
            print(self.substitute(msg))

    def load_config(self, fname):
        '''load the config file

        Each non-blank line that does not start with '#' must have the form
        "name: value". The text before the first colon becomes the variable
        name, the remainder becomes its value; both are stripped.
        Raises RuntimeError for a line that contains no colon.
        '''
        # the with-block guarantees the file is closed, including when an
        # invalid line raises
        with open(fname) as f:
            for line in f:
                line = line.strip()
                if len(line) == 0 or line[0] == '#':
                    continue
                colon = line.find(':')
                if colon == -1:
                    raise RuntimeError("Invalid config line '%s'" % line)
                varname = line[0:colon].strip()
                value = line[colon+1:].strip()
                self.setvar(varname, value)

    def list_steps_mode(self):
        '''put wintest in step listing mode'''
        self.list_mode = True

    def set_skip(self, skiplist):
        '''set a list of tests to skip, given as a comma separated string'''
        self.skiplist = skiplist.split(',')

    def skip(self, step):
        '''return True if we should skip a step

        In step listing mode every step is printed and reported as skipped.
        '''
        if self.list_mode:
            print("\t%s" % step)
            return True
        return step in self.skiplist

    def substitute(self, text):
        '''Substitute strings of the form ${NAME} in text, replacing
        with substitutions from vars.

        A list is substituted element by element and returned as a new list.
        Anything that is not a string is returned unchanged. Raises
        RuntimeError when a referenced variable is unknown.
        '''
        if isinstance(text, list):
            return [self.substitute(item) for item in text]

        # We may have objects such as pexpect.EOF that are not strings
        if not isinstance(text, str):
            return text

        # Keep expanding until no complete ${...} remains; replacement values
        # may themselves contain further references.
        while True:
            var_start = text.find("${")
            if var_start == -1:
                return text
            var_end = text.find("}", var_start)
            if var_end == -1:
                return text
            var_name = text[var_start+2:var_end]
            if var_name not in self.vars:
                raise RuntimeError("Unknown substitution variable ${%s}" % var_name)
            text = text.replace("${%s}" % var_name, self.vars[var_name])

    def have_var(self, varname):
        '''see if a variable has been set'''
        return varname in self.vars

    def putenv(self, key, value):
        '''putenv with substitution'''
        os.putenv(key, self.substitute(value))

    def chdir(self, dir):
        '''chdir with substitution'''
        os.chdir(self.substitute(dir))

    def del_files(self, dirs):
        '''delete all files in the given directories'''
        for d in dirs:
            self.run_cmd("find %s -type f | xargs rm -f" % d)

    def write_file(self, filename, text, mode='w'):
        '''write substituted text to a (substituted) filename'''
        with open(self.substitute(filename), mode=mode) as f:
            f.write(self.substitute(text))

    def run_cmd(self, cmd, dir=".", show=None, output=False, checkfail=True):
        '''run a command

        cmd may be a shell string or an argument list; a list is run without
        a shell. With output=True the combined stdout/stderr is returned and
        the exit status is not checked. Otherwise the exit status is
        returned, and a non-zero status raises
        subprocess.CalledProcessError when checkfail is true.
        The show argument is accepted but not used.
        '''
        cmd = self.substitute(cmd)
        is_list = isinstance(cmd, list)
        if is_list:
            self.info('$ ' + " ".join(cmd))
        else:
            self.info('$ ' + cmd)
        # strings go through the shell, argument lists are executed directly
        shell = not is_list
        if output:
            proc = subprocess.Popen(cmd, shell=shell, stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT, cwd=dir)
            out = proc.communicate()[0]
            # Where the pipe yields bytes that are not str (Python 3),
            # decode so callers such as cmd_contains() can search the text.
            if not isinstance(out, str):
                out = out.decode('utf-8', 'replace')
            return out
        if checkfail:
            return subprocess.check_call(cmd, shell=shell, cwd=dir)
        return subprocess.call(cmd, shell=shell, cwd=dir)

    def run_child(self, cmd, dir="."):
        '''create a child and return the Popen handle to it'''
        cmd = self.substitute(cmd)
        is_list = isinstance(cmd, list)
        if is_list:
            self.info('$ ' + " ".join(cmd))
        else:
            self.info('$ ' + cmd)
        # Let Popen change directory in the child instead of moving the
        # parent there and back; the parent's cwd is then never disturbed,
        # even if the spawn fails.
        return subprocess.Popen(cmd, shell=not is_list,
                                stderr=subprocess.STDOUT, cwd=dir)

    def cmd_output(self, cmd):
        '''return output from a command'''
        return self.run_cmd(self.substitute(cmd), output=True)

    def cmd_contains(self, cmd, contains, nomatch=False, ordered=False, regex=False,
                     casefold=False):
        '''check that command output contains the listed strings

        contains is a string or a list of strings (substituted before use).
        regex treats each entry as a regular expression, casefold compares
        upper-cased text, nomatch inverts the check, and ordered requires
        each entry to appear after the previous match.
        Raises RuntimeError when an expectation is not met.
        '''
        if isinstance(contains, str):
            contains = [contains]

        out = self.cmd_output(cmd)
        self.info(out)
        for c in self.substitute(contains):
            if regex:
                m = re.search(c, out)
                if m is None:
                    start = -1
                    end = -1
                else:
                    start = m.start()
                    end = m.end()
            elif casefold:
                start = out.upper().find(c.upper())
                end = start + len(c)
            else:
                start = out.find(c)
                end = start + len(c)
            if nomatch:
                if start != -1:
                    raise RuntimeError("Expected to not see %s in %s" % (c, cmd))
            else:
                if start == -1:
                    raise RuntimeError("Expected to see %s in %s" % (c, cmd))
            if ordered and start != -1:
                # later entries must be found after this match
                out = out[end:]

    def retry_cmd(self, cmd, contains, retries=30, delay=2, wait_for_fail=False,
                  ordered=False, regex=False, casefold=False):
        '''retry a command a number of times

        Runs cmd_contains() until it succeeds, sleeping delay seconds after
        each failure. Raises RuntimeError once retries attempts have failed.
        '''
        while retries > 0:
            try:
                self.cmd_contains(cmd, contains, nomatch=wait_for_fail,
                                  ordered=ordered, regex=regex, casefold=casefold)
                return
            except Exception:
                # Exception rather than a bare except, so that Ctrl-C and
                # SystemExit still stop the retry loop
                time.sleep(delay)
                retries = retries - 1
        raise RuntimeError("Failed to find %s" % contains)

    def pexpect_spawn(self, cmd, timeout=60, crlf=True):
        '''wrapper around pexpect spawn

        With crlf set, the child's sendline and expect are replaced by
        versions that apply variable substitution; sendline also converts
        newlines to CRLF and appends a carriage return. The original methods
        stay available as old_sendline and old_expect.
        '''
        cmd = self.substitute(cmd)
        self.info("$ " + cmd)
        ret = pexpect.spawn(cmd, logfile=sys.stdout, timeout=timeout)

        def sendline_sub(line):
            line = self.substitute(line).replace('\n', '\r\n')
            return ret.old_sendline(line + '\r')

        def expect_sub(line, timeout=ret.timeout):
            line = self.substitute(line)
            return ret.old_expect(line, timeout=timeout)

        if crlf:
            ret.old_sendline = ret.sendline
            ret.sendline = sendline_sub
            ret.old_expect = ret.expect
            ret.expect = expect_sub

        return ret

    def get_nameserver(self):
        '''Get the current nameserver from /etc/resolv.conf'''
        child = self.pexpect_spawn('cat /etc/resolv.conf', crlf=False)
        i = child.expect(['Generated by wintest', 'nameserver'])
        if i == 0:
            # the file was written by wintest; move on to the section that
            # follows the 'your original resolv.conf' marker
            child.expect('your original resolv.conf')
            child.expect('nameserver')
        child.expect(self._IPV4_RE)
        return child.after

    def vm_poweroff(self, vmname, checkfail=True):
        '''power off a VM'''
        self.setvar('VMNAME', vmname)
        self.run_cmd("${VM_POWEROFF}", checkfail=checkfail)

    def vm_restore(self, vmname, snapshot):
        '''restore a VM'''
        self.setvar('VMNAME', vmname)
        self.setvar('SNAPSHOT', snapshot)
        self.run_cmd("${VM_RESTORE}")

    def ping_wait(self, hostname):
        '''wait for a hostname to come up on the network

        Tries up to 10 pings and raises RuntimeError if all of them fail.
        '''
        hostname = self.substitute(hostname)
        loops = 10
        while loops > 0:
            try:
                self.run_cmd("ping -c 1 -w 10 %s" % hostname)
                break
            except Exception:
                # a failed ping is expected while the host boots; Ctrl-C is
                # deliberately not swallowed
                loops = loops - 1
        if loops == 0:
            raise RuntimeError("Failed to ping %s" % hostname)
        self.info("Host %s is up" % hostname)

    def port_wait(self, hostname, port, retries=200, delay=3, wait_for_fail=False):
        '''wait for a port on a host to start (or stop) accepting connections'''
        self.retry_cmd("nc -v -z -w 1 %s %u" % (hostname, port), ['succeeded'],
                       retries=retries, delay=delay, wait_for_fail=wait_for_fail)

    def run_net_time(self, child):
        '''run net time on windows'''
        child.sendline("net time \\\\${HOSTNAME} /set")
        child.expect("Do you want to set the local computer")
        child.sendline("Y")
        child.expect("The command completed successfully")

    def run_date_time(self, child, time_tuple=None):
        '''run date and time on windows

        Uses time_tuple, or the current local time when it is None, and
        answers the date prompt in whichever order the prompt advertises.
        '''
        if time_tuple is None:
            time_tuple = time.localtime()
        child.sendline("date")
        child.expect("Enter the new date:")
        i = child.expect(["dd-mm-yy", "mm-dd-yy"])
        if i == 0:
            child.sendline(time.strftime("%d-%m-%y", time_tuple))
        else:
            child.sendline(time.strftime("%m-%d-%y", time_tuple))
        child.expect("C:")
        child.sendline("time")
        child.expect("Enter the new time:")
        child.sendline(time.strftime("%H:%M:%S", time_tuple))
        child.expect("C:")

    def get_ipconfig(self, child):
        '''get the IP configuration of the child

        Sets WIN_NIC, WIN_IPV4_ADDRESS, WIN_SUBNET_MASK and
        WIN_DEFAULT_GATEWAY from the output of "ipconfig /all".
        '''
        child.sendline("ipconfig /all")
        child.expect('Ethernet adapter ')
        child.expect(r"[\w\s]+")
        self.setvar("WIN_NIC", child.after)
        child.expect(['IPv4 Address', 'IP Address'])
        child.expect(self._IPV4_RE)
        self.setvar('WIN_IPV4_ADDRESS', child.after)
        child.expect('Subnet Mask')
        child.expect(self._IPV4_RE)
        self.setvar('WIN_SUBNET_MASK', child.after)
        child.expect('Default Gateway')
        child.expect(self._IPV4_RE)
        self.setvar('WIN_DEFAULT_GATEWAY', child.after)
        child.expect("C:")

    def run_tlntadmn(self, child):
        '''remove the annoying telnet restrictions'''
        child.sendline('tlntadmn config maxconn=1024')
        child.expect("The settings were successfully updated")
        child.expect("C:")

    def disable_firewall(self, child):
        '''remove the annoying firewall'''
        child.sendline('netsh advfirewall set allprofiles state off')
        i = child.expect(["Ok", "The following command was not found: advfirewall set allprofiles state off"])
        child.expect("C:")
        if i == 1:
            # advfirewall was not recognised; fall back to the
            # "netsh firewall" form of the command
            child.sendline('netsh firewall set opmode mode = DISABLE profile = ALL')
            child.expect("Ok")
            child.expect("C:")

    def set_dns(self, child):
        '''point the windows DNS client at ${INTERFACE_IP}

        Returns True when the prompt did not come back within 5 seconds
        (EOF or timeout), False when it did.
        '''
        child.sendline('netsh interface ip set dns "${WIN_NIC}" static ${INTERFACE_IP} primary')
        i = child.expect(['C:', pexpect.EOF, pexpect.TIMEOUT], timeout=5)
        return i > 0

    def set_ip(self, child):
        '''fix the IP address to the same value it had when we
        connected, but don't use DHCP, and force the DNS server to our
        DNS server.  This allows DNS updates to run

        Raises RuntimeError if the address reported by ipconfig differs
        from WIN_IP. Always returns True otherwise.
        '''
        self.get_ipconfig(child)
        if self.getvar("WIN_IPV4_ADDRESS") != self.getvar("WIN_IP"):
            raise RuntimeError("ipconfig address %s != nmblookup address %s" % (self.getvar("WIN_IPV4_ADDRESS"),
                                                                                self.getvar("WIN_IP")))
        child.sendline('netsh')
        child.expect('netsh>')
        child.sendline('offline')
        child.expect('netsh>')
        child.sendline('routing ip add persistentroute dest=0.0.0.0 mask=0.0.0.0 name="${WIN_NIC}" nhop=${WIN_DEFAULT_GATEWAY}')
        child.expect('netsh>')
        child.sendline('interface ip set address "${WIN_NIC}" static ${WIN_IPV4_ADDRESS} ${WIN_SUBNET_MASK} ${WIN_DEFAULT_GATEWAY} 1 store=persistent')
        i = child.expect(['The syntax supplied for this command is not valid. Check help for the correct syntax', 'netsh>', pexpect.EOF, pexpect.TIMEOUT], timeout=5)
        if i == 0:
            # the store=persistent form was rejected; retry without it
            child.sendline('interface ip set address "${WIN_NIC}" static ${WIN_IPV4_ADDRESS} ${WIN_SUBNET_MASK} ${WIN_DEFAULT_GATEWAY} 1')
            child.expect('netsh>')
        child.sendline('commit')
        child.sendline('online')
        child.sendline('exit')

        # EOF and TIMEOUT are both accepted here, so this wait never raises
        child.expect([pexpect.EOF, pexpect.TIMEOUT], timeout=5)
        return True

    def resolve_ip(self, hostname, retries=60, delay=5):
        '''resolve an IP given a hostname, assuming NBT

        Raises RuntimeError when the lookup keeps failing for retries
        attempts, delay seconds apart.
        '''
        while retries > 0:
            child = self.pexpect_spawn("bin/nmblookup %s" % hostname)
            i = child.expect([self._IPV4_RE, "Lookup failed"])
            address = child.after
            # release the pty and process of every attempt, not only the
            # last one
            child.close()
            if i == 0:
                return address
            retries -= 1
            time.sleep(delay)
        raise RuntimeError("Failed to resolve IP of %s" % hostname)

    def open_telnet(self, hostname, username, password, retries=60, delay=5, set_time=False, set_ip=False,
                    disable_firewall=True, run_tlntadmn=True):
        '''open a telnet connection to a windows server, return the pexpect child

        The address comes from WIN_IP when set, otherwise from resolve_ip()
        (and is then stored in WIN_IP). Each requested setup action runs at
        most once. After set_ip the loop reconnects, then fixes DNS and the
        default route on the new connection. Raises RuntimeError when no
        connection could be made within retries attempts.
        '''
        set_route = False
        set_dns = False
        if self.getvar('WIN_IP'):
            ip = self.getvar('WIN_IP')
        else:
            ip = self.resolve_ip(hostname)
            self.setvar('WIN_IP', ip)
        # responses that mean this attempt failed and should be retried
        failures = ["Denying new connections due to the limit on number of connections",
                    "No more connections are allowed to telnet server",
                    "Unable to connect to remote host",
                    "No route to host",
                    "Connection refused",
                    pexpect.EOF]
        while retries > 0:
            child = self.pexpect_spawn("telnet " + ip + " -l '" + username + "'")
            i = child.expect(["Welcome to Microsoft Telnet Service"] + failures)
            if i != 0:
                child.close()
                time.sleep(delay)
                retries -= 1
                continue
            child.expect("password:")
            child.sendline(password)
            i = child.expect(["C:"] + failures)
            if i != 0:
                child.close()
                time.sleep(delay)
                retries -= 1
                continue
            if set_dns:
                set_dns = False
                if self.set_dns(child):
                    # the prompt did not return after the DNS change;
                    # reconnect without using up a retry
                    continue
            if set_route:
                child.sendline('route add 0.0.0.0 mask 0.0.0.0 ${WIN_DEFAULT_GATEWAY}')
                child.expect("C:")
                set_route = False
            if set_time:
                self.run_date_time(child, None)
                set_time = False
            if run_tlntadmn:
                self.run_tlntadmn(child)
                run_tlntadmn = False
            if disable_firewall:
                self.disable_firewall(child)
                disable_firewall = False
            if set_ip:
                set_ip = False
                if self.set_ip(child):
                    # set_ip() ends by exiting netsh and waiting for EOF or
                    # a timeout, so reconnect and finish DNS/route setup there
                    set_route = True
                    set_dns = True
                    continue
            return child
        raise RuntimeError("Failed to connect with telnet")

    def kinit(self, username, password):
        '''use kinit to setup a credentials cache

        The part of username after the first '@' is upper-cased before
        being passed to kinit.
        '''
        self.run_cmd("kdestroy")
        self.putenv('KRB5CCNAME', "${PREFIX}/ccache.test")
        username = self.substitute(username)
        s = username.split('@')
        # split() always yields at least one element; only touch s[1] when
        # an '@' was actually present
        if len(s) > 1:
            s[1] = s[1].upper()
        username = '@'.join(s)
        child = self.pexpect_spawn('kinit -V ' + username)
        child.expect("Password for")
        child.sendline(password)
        child.expect("Authenticated to Kerberos")

    def get_domains(self):
        '''return a dictionary of DNS domains and IPs for named.conf

        Every X_REALM variable with a matching X_IP variable contributes
        one realm -> IP entry.
        '''
        ret = {}
        for v in self.vars:
            if v.endswith("_REALM"):
                base = v[:-6]
                if base + '_IP' in self.vars:
                    ret[self.vars[v]] = self.vars[base + '_IP']
        return ret