smbprinting: fix wrong == in shell tests
[Samba/gebeck_regimport.git] / wintest / wintest.py
blobb8e6ea2dd19d69aa69e04402f86ecf1af52c0262
1 #!/usr/bin/env python
3 '''automated testing library for testing Samba against windows'''
5 import pexpect, subprocess
6 import sys, os, time, re
8 class wintest():
9 '''testing of Samba against windows VMs'''
11 def __init__(self):
12 self.vars = {}
13 self.list_mode = False
14 self.vms = None
15 os.putenv('PYTHONUNBUFFERED', '1')
17 def setvar(self, varname, value):
18 '''set a substitution variable'''
19 self.vars[varname] = value
21 def getvar(self, varname):
22 '''return a substitution variable'''
23 if not varname in self.vars:
24 return None
25 return self.vars[varname]
27 def setwinvars(self, vm, prefix='WIN'):
28 '''setup WIN_XX vars based on a vm name'''
29 for v in ['VM', 'HOSTNAME', 'USER', 'PASS', 'SNAPSHOT', 'REALM', 'DOMAIN', 'IP']:
30 vname = '%s_%s' % (vm, v)
31 if vname in self.vars:
32 self.setvar("%s_%s" % (prefix,v), self.substitute("${%s}" % vname))
33 else:
34 self.vars.pop("%s_%s" % (prefix,v), None)
36 if self.getvar("WIN_REALM"):
37 self.setvar("WIN_REALM", self.getvar("WIN_REALM").upper())
38 self.setvar("WIN_LCREALM", self.getvar("WIN_REALM").lower())
39 dnsdomain = self.getvar("WIN_REALM")
40 self.setvar("WIN_BASEDN", "DC=" + dnsdomain.replace(".", ",DC="))
42 def info(self, msg):
43 '''print some information'''
44 if not self.list_mode:
45 print(self.substitute(msg))
47 def load_config(self, fname):
48 '''load the config file'''
49 f = open(fname)
50 for line in f:
51 line = line.strip()
52 if len(line) == 0 or line[0] == '#':
53 continue
54 colon = line.find(':')
55 if colon == -1:
56 raise RuntimeError("Invalid config line '%s'" % line)
57 varname = line[0:colon].strip()
58 value = line[colon+1:].strip()
59 self.setvar(varname, value)
61 def list_steps_mode(self):
62 '''put wintest in step listing mode'''
63 self.list_mode = True
65 def set_skip(self, skiplist):
66 '''set a list of tests to skip'''
67 self.skiplist = skiplist.split(',')
69 def set_vms(self, vms):
70 '''set a list of VMs to test'''
71 if vms is not None:
72 self.vms = vms.split(',')
74 def skip(self, step):
75 '''return True if we should skip a step'''
76 if self.list_mode:
77 print("\t%s" % step)
78 return True
79 return step in self.skiplist
81 def substitute(self, text):
82 """Substitute strings of the form ${NAME} in text, replacing
83 with substitutions from vars.
84 """
85 if isinstance(text, list):
86 ret = text[:]
87 for i in range(len(ret)):
88 ret[i] = self.substitute(ret[i])
89 return ret
91 """We may have objects such as pexpect.EOF that are not strings"""
92 if not isinstance(text, str):
93 return text
94 while True:
95 var_start = text.find("${")
96 if var_start == -1:
97 return text
98 var_end = text.find("}", var_start)
99 if var_end == -1:
100 return text
101 var_name = text[var_start+2:var_end]
102 if not var_name in self.vars:
103 raise RuntimeError("Unknown substitution variable ${%s}" % var_name)
104 text = text.replace("${%s}" % var_name, self.vars[var_name])
105 return text
107 def have_var(self, varname):
108 '''see if a variable has been set'''
109 return varname in self.vars
111 def have_vm(self, vmname):
112 '''see if a VM should be used'''
113 if not self.have_var(vmname + '_VM'):
114 return False
115 if self.vms is None:
116 return True
117 return vmname in self.vms
119 def putenv(self, key, value):
120 '''putenv with substitution'''
121 os.putenv(key, self.substitute(value))
123 def chdir(self, dir):
124 '''chdir with substitution'''
125 os.chdir(self.substitute(dir))
127 def del_files(self, dirs):
128 '''delete all files in the given directory'''
129 for d in dirs:
130 self.run_cmd("find %s -type f | xargs rm -f" % d)
132 def write_file(self, filename, text, mode='w'):
133 '''write to a file'''
134 f = open(self.substitute(filename), mode=mode)
135 f.write(self.substitute(text))
136 f.close()
138 def run_cmd(self, cmd, dir=".", show=None, output=False, checkfail=True):
139 '''run a command'''
140 cmd = self.substitute(cmd)
141 if isinstance(cmd, list):
142 self.info('$ ' + " ".join(cmd))
143 else:
144 self.info('$ ' + cmd)
145 if output:
146 return subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=dir).communicate()[0]
147 if isinstance(cmd, list):
148 shell=False
149 else:
150 shell=True
151 if checkfail:
152 return subprocess.check_call(cmd, shell=shell, cwd=dir)
153 else:
154 return subprocess.call(cmd, shell=shell, cwd=dir)
157 def run_child(self, cmd, dir="."):
158 '''create a child and return the Popen handle to it'''
159 cwd = os.getcwd()
160 cmd = self.substitute(cmd)
161 if isinstance(cmd, list):
162 self.info('$ ' + " ".join(cmd))
163 else:
164 self.info('$ ' + cmd)
165 if isinstance(cmd, list):
166 shell=False
167 else:
168 shell=True
169 os.chdir(dir)
170 ret = subprocess.Popen(cmd, shell=shell, stderr=subprocess.STDOUT)
171 os.chdir(cwd)
172 return ret
174 def cmd_output(self, cmd):
175 '''return output from and command'''
176 cmd = self.substitute(cmd)
177 return self.run_cmd(cmd, output=True)
179 def cmd_contains(self, cmd, contains, nomatch=False, ordered=False, regex=False,
180 casefold=True):
181 '''check that command output contains the listed strings'''
183 if isinstance(contains, str):
184 contains = [contains]
186 out = self.cmd_output(cmd)
187 self.info(out)
188 for c in self.substitute(contains):
189 if regex:
190 if casefold:
191 c = c.upper()
192 out = out.upper()
193 m = re.search(c, out)
194 if m is None:
195 start = -1
196 end = -1
197 else:
198 start = m.start()
199 end = m.end()
200 elif casefold:
201 start = out.upper().find(c.upper())
202 end = start + len(c)
203 else:
204 start = out.find(c)
205 end = start + len(c)
206 if nomatch:
207 if start != -1:
208 raise RuntimeError("Expected to not see %s in %s" % (c, cmd))
209 else:
210 if start == -1:
211 raise RuntimeError("Expected to see %s in %s" % (c, cmd))
212 if ordered and start != -1:
213 out = out[end:]
215 def retry_cmd(self, cmd, contains, retries=30, delay=2, wait_for_fail=False,
216 ordered=False, regex=False, casefold=True):
217 '''retry a command a number of times'''
218 while retries > 0:
219 try:
220 self.cmd_contains(cmd, contains, nomatch=wait_for_fail,
221 ordered=ordered, regex=regex, casefold=casefold)
222 return
223 except:
224 time.sleep(delay)
225 retries -= 1
226 self.info("retrying (retries=%u delay=%u)" % (retries, delay))
227 raise RuntimeError("Failed to find %s" % contains)
229 def pexpect_spawn(self, cmd, timeout=60, crlf=True, casefold=True):
230 '''wrapper around pexpect spawn'''
231 cmd = self.substitute(cmd)
232 self.info("$ " + cmd)
233 ret = pexpect.spawn(cmd, logfile=sys.stdout, timeout=timeout)
235 def sendline_sub(line):
236 line = self.substitute(line)
237 if crlf:
238 line = line.replace('\n', '\r\n') + '\r'
239 return ret.old_sendline(line)
241 def expect_sub(line, timeout=ret.timeout, casefold=casefold):
242 line = self.substitute(line)
243 if casefold:
244 if isinstance(line, list):
245 for i in range(len(line)):
246 if isinstance(line[i], str):
247 line[i] = '(?i)' + line[i]
248 elif isinstance(line, str):
249 line = '(?i)' + line
250 return ret.old_expect(line, timeout=timeout)
252 ret.old_sendline = ret.sendline
253 ret.sendline = sendline_sub
254 ret.old_expect = ret.expect
255 ret.expect = expect_sub
257 return ret
259 def get_nameserver(self):
260 '''Get the current nameserver from /etc/resolv.conf'''
261 child = self.pexpect_spawn('cat /etc/resolv.conf', crlf=False)
262 i = child.expect(['Generated by wintest', 'nameserver'])
263 if i == 0:
264 child.expect('your original resolv.conf')
265 child.expect('nameserver')
266 child.expect('\d+.\d+.\d+.\d+')
267 return child.after
269 def vm_poweroff(self, vmname, checkfail=True):
270 '''power off a VM'''
271 self.setvar('VMNAME', vmname)
272 self.run_cmd("${VM_POWEROFF}", checkfail=checkfail)
274 def vm_reset(self, vmname):
275 '''reset a VM'''
276 self.setvar('VMNAME', vmname)
277 self.run_cmd("${VM_RESET}")
279 def vm_restore(self, vmname, snapshot):
280 '''restore a VM'''
281 self.setvar('VMNAME', vmname)
282 self.setvar('SNAPSHOT', snapshot)
283 self.run_cmd("${VM_RESTORE}")
285 def ping_wait(self, hostname):
286 '''wait for a hostname to come up on the network'''
287 hostname = self.substitute(hostname)
288 loops=10
289 while loops > 0:
290 try:
291 self.run_cmd("ping -c 1 -w 10 %s" % hostname)
292 break
293 except:
294 loops = loops - 1
295 if loops == 0:
296 raise RuntimeError("Failed to ping %s" % hostname)
297 self.info("Host %s is up" % hostname)
299 def port_wait(self, hostname, port, retries=200, delay=3, wait_for_fail=False):
300 '''wait for a host to come up on the network'''
301 self.retry_cmd("nc -v -z -w 1 %s %u" % (hostname, port), ['succeeded'],
302 retries=retries, delay=delay, wait_for_fail=wait_for_fail)
304 def run_net_time(self, child):
305 '''run net time on windows'''
306 child.sendline("net time \\\\${HOSTNAME} /set")
307 child.expect("Do you want to set the local computer")
308 child.sendline("Y")
309 child.expect("The command completed successfully")
311 def run_date_time(self, child, time_tuple=None):
312 '''run date and time on windows'''
313 if time_tuple is None:
314 time_tuple = time.localtime()
315 child.sendline("date")
316 child.expect("Enter the new date:")
317 i = child.expect(["dd-mm-yy", "mm-dd-yy"])
318 if i == 0:
319 child.sendline(time.strftime("%d-%m-%y", time_tuple))
320 else:
321 child.sendline(time.strftime("%m-%d-%y", time_tuple))
322 child.expect("C:")
323 child.sendline("time")
324 child.expect("Enter the new time:")
325 child.sendline(time.strftime("%H:%M:%S", time_tuple))
326 child.expect("C:")
328 def get_ipconfig(self, child):
329 '''get the IP configuration of the child'''
330 child.sendline("ipconfig /all")
331 child.expect('Ethernet adapter ')
332 child.expect("[\w\s]+")
333 self.setvar("WIN_NIC", child.after)
334 child.expect(['IPv4 Address', 'IP Address'])
335 child.expect('\d+.\d+.\d+.\d+')
336 self.setvar('WIN_IPV4_ADDRESS', child.after)
337 child.expect('Subnet Mask')
338 child.expect('\d+.\d+.\d+.\d+')
339 self.setvar('WIN_SUBNET_MASK', child.after)
340 child.expect('Default Gateway')
341 child.expect('\d+.\d+.\d+.\d+')
342 self.setvar('WIN_DEFAULT_GATEWAY', child.after)
343 child.expect("C:")
345 def get_is_dc(self, child):
346 '''check if a windows machine is a domain controller'''
347 child.sendline("dcdiag")
348 i = child.expect(["is not a Directory Server",
349 "is not recognized as an internal or external command",
350 "Home Server = ",
351 "passed test Replications"])
352 if i == 0:
353 return False
354 if i == 1 or i == 3:
355 child.expect("C:")
356 child.sendline("net config Workstation")
357 child.expect("Workstation domain")
358 child.expect('[\S]+')
359 domain = child.after
360 i = child.expect(["Workstation Domain DNS Name", "Logon domain"])
361 '''If we get the Logon domain first, we are not in an AD domain'''
362 if i == 1:
363 return False
364 if domain.upper() == self.getvar("WIN_DOMAIN").upper():
365 return True
367 child.expect('[\S]+')
368 hostname = child.after
369 if hostname.upper() == self.getvar("WIN_HOSTNAME").upper():
370 return True
372 def run_tlntadmn(self, child):
373 '''remove the annoying telnet restrictions'''
374 child.sendline('tlntadmn config maxconn=1024')
375 child.expect("The settings were successfully updated")
376 child.expect("C:")
378 def disable_firewall(self, child):
379 '''remove the annoying firewall'''
380 child.sendline('netsh advfirewall set allprofiles state off')
381 i = child.expect(["Ok", "The following command was not found: advfirewall set allprofiles state off"])
382 child.expect("C:")
383 if i == 1:
384 child.sendline('netsh firewall set opmode mode = DISABLE profile = ALL')
385 i = child.expect(["Ok", "The following command was not found"])
386 if i != 0:
387 self.info("Firewall disable failed - ignoring")
388 child.expect("C:")
390 def set_dns(self, child):
391 child.sendline('netsh interface ip set dns "${WIN_NIC}" static ${INTERFACE_IP} primary')
392 i = child.expect(['C:', pexpect.EOF, pexpect.TIMEOUT], timeout=5)
393 if i > 0:
394 return True
395 else:
396 return False
398 def set_ip(self, child):
399 """fix the IP address to the same value it had when we
400 connected, but don't use DHCP, and force the DNS server to our
401 DNS server. This allows DNS updates to run"""
402 self.get_ipconfig(child)
403 if self.getvar("WIN_IPV4_ADDRESS") != self.getvar("WIN_IP"):
404 raise RuntimeError("ipconfig address %s != nmblookup address %s" % (self.getvar("WIN_IPV4_ADDRESS"),
405 self.getvar("WIN_IP")))
406 child.sendline('netsh')
407 child.expect('netsh>')
408 child.sendline('offline')
409 child.expect('netsh>')
410 child.sendline('routing ip add persistentroute dest=0.0.0.0 mask=0.0.0.0 name="${WIN_NIC}" nhop=${WIN_DEFAULT_GATEWAY}')
411 child.expect('netsh>')
412 child.sendline('interface ip set address "${WIN_NIC}" static ${WIN_IPV4_ADDRESS} ${WIN_SUBNET_MASK} ${WIN_DEFAULT_GATEWAY} 1 store=persistent')
413 i = child.expect(['The syntax supplied for this command is not valid. Check help for the correct syntax', 'netsh>', pexpect.EOF, pexpect.TIMEOUT], timeout=5)
414 if i == 0:
415 child.sendline('interface ip set address "${WIN_NIC}" static ${WIN_IPV4_ADDRESS} ${WIN_SUBNET_MASK} ${WIN_DEFAULT_GATEWAY} 1')
416 child.expect('netsh>')
417 child.sendline('commit')
418 child.sendline('online')
419 child.sendline('exit')
421 child.expect([pexpect.EOF, pexpect.TIMEOUT], timeout=5)
422 return True
425 def resolve_ip(self, hostname, retries=60, delay=5):
426 '''resolve an IP given a hostname, assuming NBT'''
427 while retries > 0:
428 child = self.pexpect_spawn("bin/nmblookup %s" % hostname)
429 i = child.expect(['\d+.\d+.\d+.\d+', "Lookup failed"])
430 if i == 0:
431 return child.after
432 retries -= 1
433 time.sleep(delay)
434 self.info("retrying (retries=%u delay=%u)" % (retries, delay))
435 raise RuntimeError("Failed to resolve IP of %s" % hostname)
438 def open_telnet(self, hostname, username, password, retries=60, delay=5, set_time=False, set_ip=False,
439 disable_firewall=True, run_tlntadmn=True):
440 '''open a telnet connection to a windows server, return the pexpect child'''
441 set_route = False
442 set_dns = False
443 if self.getvar('WIN_IP'):
444 ip = self.getvar('WIN_IP')
445 else:
446 ip = self.resolve_ip(hostname)
447 self.setvar('WIN_IP', ip)
448 while retries > 0:
449 child = self.pexpect_spawn("telnet " + ip + " -l '" + username + "'")
450 i = child.expect(["Welcome to Microsoft Telnet Service",
451 "Denying new connections due to the limit on number of connections",
452 "No more connections are allowed to telnet server",
453 "Unable to connect to remote host",
454 "No route to host",
455 "Connection refused",
456 pexpect.EOF])
457 if i != 0:
458 child.close()
459 time.sleep(delay)
460 retries -= 1
461 self.info("retrying (retries=%u delay=%u)" % (retries, delay))
462 continue
463 child.expect("password:")
464 child.sendline(password)
465 i = child.expect(["C:",
466 "Denying new connections due to the limit on number of connections",
467 "No more connections are allowed to telnet server",
468 "Unable to connect to remote host",
469 "No route to host",
470 "Connection refused",
471 pexpect.EOF])
472 if i != 0:
473 child.close()
474 time.sleep(delay)
475 retries -= 1
476 self.info("retrying (retries=%u delay=%u)" % (retries, delay))
477 continue
478 if set_dns:
479 set_dns = False
480 if self.set_dns(child):
481 continue;
482 if set_route:
483 child.sendline('route add 0.0.0.0 mask 0.0.0.0 ${WIN_DEFAULT_GATEWAY}')
484 child.expect("C:")
485 set_route = False
486 if set_time:
487 self.run_date_time(child, None)
488 set_time = False
489 if run_tlntadmn:
490 self.run_tlntadmn(child)
491 run_tlntadmn = False
492 if disable_firewall:
493 self.disable_firewall(child)
494 disable_firewall = False
495 if set_ip:
496 set_ip = False
497 if self.set_ip(child):
498 set_route = True
499 set_dns = True
500 continue
501 return child
502 raise RuntimeError("Failed to connect with telnet")
504 def kinit(self, username, password):
505 '''use kinit to setup a credentials cache'''
506 self.run_cmd("kdestroy")
507 self.putenv('KRB5CCNAME', "${PREFIX}/ccache.test")
508 username = self.substitute(username)
509 s = username.split('@')
510 if len(s) > 0:
511 s[1] = s[1].upper()
512 username = '@'.join(s)
513 child = self.pexpect_spawn('kinit ' + username)
514 child.expect("Password")
515 child.sendline(password)
516 child.expect(pexpect.EOF)
517 child.close()
518 if child.exitstatus != 0:
519 raise RuntimeError("kinit failed with status %d" % child.exitstatus)
521 def get_domains(self):
522 '''return a dictionary of DNS domains and IPs for named.conf'''
523 ret = {}
524 for v in self.vars:
525 if v[-6:] == "_REALM":
526 base = v[:-6]
527 if base + '_IP' in self.vars:
528 ret[self.vars[base + '_REALM']] = self.vars[base + '_IP']
529 return ret
531 def wait_reboot(self, retries=3):
532 '''wait for a VM to reboot'''
534 # first wait for it to shutdown
535 self.port_wait("${WIN_IP}", 139, wait_for_fail=True, delay=6)
537 # now wait for it to come back. If it fails to come back
538 # then try resetting it
539 while retries > 0:
540 try:
541 self.port_wait("${WIN_IP}", 139)
542 return
543 except:
544 retries -= 1
545 self.vm_reset("${WIN_VM}")
546 self.info("retrying reboot (retries=%u)" % retries)
547 raise RuntimeError(self.substitute("VM ${WIN_VM} failed to reboot"))
549 def get_vms(self):
550 '''return a dictionary of all the configured VM names'''
551 ret = []
552 for v in self.vars:
553 if v[-3:] == "_VM":
554 ret.append(self.vars[v])
555 return ret