virt.virt_test_utils: run_autotest - 'tar' needs relative paths to strip the leading '/'
[autotest-zwu.git] / client / bin / harness_ABAT.py
blob2ee8694e9c7db9326ba6fc327607d1cebe0452ab
1 """The ABAT harness interface
3 The interface as required for ABAT.
4 """
6 __author__ = """Copyright Andy Whitcroft 2006"""
8 from autotest_lib.client.bin import utils
9 import os, harness, time, re
11 def autobench_load(fn):
12 disks = re.compile(r'^\s*DATS_FREE_DISKS\s*=(.*\S)\s*$')
13 parts = re.compile(r'^\s*DATS_FREE_PARTITIONS\s*=(.*\S)\s*$')
14 modules = re.compile(r'^\s*INITRD_MODULES\s*=(.*\S)\s*$')
16 conf = {}
18 try:
19 fd = file(fn, "r")
20 except:
21 return conf
22 for ln in fd.readlines():
23 m = disks.match(ln)
24 if m:
25 val = m.groups()[0]
26 conf['disks'] = val.strip('"').split()
27 m = parts.match(ln)
28 if m:
29 val = m.groups()[0]
30 conf['partitions'] = val.strip('"').split()
31 m = modules.match(ln)
32 if m:
33 val = m.groups()[0]
34 conf['modules'] = val.strip('"').split()
35 fd.close()
37 return conf
40 class harness_ABAT(harness.harness):
41 """The ABAT server harness
43 Properties:
44 job
45 The job object for this job
46 """
48 def __init__(self, job, harness_args):
49 """
50 job
51 The job object for this job
52 """
53 self.setup(job)
55 if 'ABAT_STATUS' in os.environ:
56 self.status = file(os.environ['ABAT_STATUS'], "w")
57 else:
58 self.status = None
61 def __send(self, msg):
62 if self.status:
63 msg = msg.rstrip()
64 self.status.write(msg + "\n")
65 self.status.flush()
68 def __send_status(self, code, subdir, operation, msg):
69 self.__send("STATUS %s %s %s %s" % (code, subdir, operation, msg))
72 def __root_device(self):
73 device = None
74 root = re.compile(r'^\S*(/dev/\S+).*\s/\s*$')
76 df = utils.system_output('df -lP')
77 for line in df.split("\n"):
78 m = root.match(line)
79 if m:
80 device = m.groups()[0]
82 return device
85 def run_start(self):
86 """A run within this job is starting"""
87 self.__send_status('GOOD', '----', '----', 'run starting')
89 # Load up the autobench.conf if it exists.
90 conf = autobench_load("/etc/autobench.conf")
91 if 'partitions' in conf:
92 self.job.config_set('partition.partitions',
93 conf['partitions'])
95 # Search the boot loader configuration for the autobench entry,
96 # and extract its args.
97 args = None
98 for entry in self.job.bootloader.get_entries().itervalues():
99 if entry['title'].startswith('autobench'):
100 args = entry.get('args')
102 if args:
103 args = re.sub(r'autobench_args:.*', '', args)
104 args = re.sub(r'root=\S*', '', args)
105 args += " root=" + self.__root_device()
107 self.job.config_set('boot.default_args', args)
109 # Turn off boot_once semantics.
110 self.job.config_set('boot.set_default', True)
112 # For RedHat installs we do not load up the module.conf
113 # as they cannot be builtin. Pass them as arguments.
114 vendor = utils.get_os_vendor()
115 if vendor in ['Red Hat', 'Fedora Core'] and 'modules' in conf:
116 args = '--allow-missing'
117 for mod in conf['modules']:
118 args += " --with " + mod
119 self.job.config_set('kernel.mkinitrd_extra_args', args)
122 def run_reboot(self):
123 """A run within this job is performing a reboot
124 (expect continue following reboot)
126 self.__send("REBOOT")
129 def run_complete(self):
130 """A run within this job is completing (all done)"""
131 self.__send("DONE")
134 def test_status_detail(self, code, subdir, operation, msg, tag,
135 optional_fields):
136 """A test within this job is completing (detail)"""
138 # Send the first line with the status code as a STATUS message.
139 lines = msg.split("\n")
140 self.__send_status(code, subdir, operation, lines[0])
143 def test_status(self, msg, tag):
144 lines = msg.split("\n")
146 # Send each line as a SUMMARY message.
147 for line in lines:
148 self.__send("SUMMARY :" + line)