target: Introduce and use OBJECT_DECLARE_CPU_TYPE() macro
[qemu.git] / tests / qemu-iotests / testrunner.py
blob9a942739754f72cdd846871df8ebddcd6d6aba88
1 # Class for actually running tests.
3 # Copyright (c) 2020-2021 Virtuozzo International GmbH
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import os
20 from pathlib import Path
21 import datetime
22 import time
23 import difflib
24 import subprocess
25 import contextlib
26 import json
27 import termios
28 import sys
29 from multiprocessing import Pool
30 from contextlib import contextmanager
31 from typing import List, Optional, Iterator, Any, Sequence, Dict, \
32 ContextManager
34 from testenv import TestEnv
def silent_unlink(path: Path) -> None:
    """Remove ``path``, silently ignoring any OSError raised by the removal.

    This is a best-effort cleanup helper: a missing file (or any other
    OS-level failure to unlink) is not reported to the caller.
    """
    with contextlib.suppress(OSError):
        path.unlink()
def file_diff(file1: str, file2: str) -> List[str]:
    """Return the unified diff between two UTF-8 text files.

    Trailing whitespace is stripped from every input line before comparing
    and from every produced diff line, so lines that differ only by
    whitespace at their ends are considered equal.

    :param file1: path of the "from" file (also used as its diff label)
    :param file2: path of the "to" file (also used as its diff label)
    :return: list of diff lines; empty if the files match
    """
    with open(file1, encoding="utf-8") as left, \
            open(file2, encoding="utf-8") as right:
        # Whitespace at line ends is deliberately ignored: the iotests
        # outputs are inconsistent about it.
        # TODO: fix all tests to not produce extra spaces, fix all .out files
        # and use strict diff here!
        left_lines = list(map(str.rstrip, left))
        right_lines = list(map(str.rstrip, right))
        delta = difflib.unified_diff(left_lines, right_lines, file1, file2)
        return [entry.rstrip() for entry in delta]
# We want to save current tty settings during test run,
# since an aborting qemu call may leave things screwed up.
@contextmanager
def savetty() -> Iterator[None]:
    """Context manager restoring stdin's terminal attributes on exit.

    If stdin is a terminal, its attributes are read on entry and written
    back (with TCSADRAIN) when the with-block is left, whether normally or
    through an exception.  If stdin is not a terminal, nothing is saved or
    restored.
    """
    if not sys.stdin.isatty():
        # Nothing to protect: just run the with-block.
        yield
        return

    tty_fd = sys.stdin.fileno()
    saved_attr = termios.tcgetattr(tty_fd)
    try:
        yield
    finally:
        termios.tcsetattr(tty_fd, termios.TCSADRAIN, saved_attr)
class LastElapsedTime(ContextManager['LastElapsedTime']):
    """ Cache for elapsed time for tests, to show it during new test run

    The cache is a nested mapping: test -> env.imgproto -> env.imgfmt ->
    elapsed time.  It is loaded from ``cache_file`` (JSON) on construction
    and written back by save(), which is also called on leaving the
    with-block.

    It is safe to use get() at any time. To use update(), you must either
    use it inside with-block or use save() after update().
    """
    def __init__(self, cache_file: str, env: 'TestEnv') -> None:
        """
        :param cache_file: path of the JSON file backing the cache
        :param env: test environment; its ``imgproto`` and ``imgfmt``
                    attributes select the cache entry for a test
        """
        self.env = env
        self.cache_file = cache_file
        self.cache: Dict[str, Dict[str, Dict[str, float]]] = {}

        try:
            with open(cache_file, encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            # Missing, unreadable or malformed cache file: start empty.
            return

        # Valid JSON that is not an object (e.g. "null" or "[]") is as
        # useless as a malformed file and would make get()/update() fail,
        # so it is ignored as well.
        if isinstance(loaded, dict):
            self.cache = loaded

    def get(self, test: str,
            default: Optional[float] = None) -> Optional[float]:
        """Return the cached elapsed time of ``test`` for the current
        image protocol and format, or ``default`` if there is none."""
        by_proto = self.cache.get(test, {})
        by_fmt = by_proto.get(self.env.imgproto, {})
        return by_fmt.get(self.env.imgfmt, default)

    def update(self, test: str, elapsed: float) -> None:
        """Record ``elapsed`` for ``test`` in memory (see save())."""
        d = self.cache.setdefault(test, {})
        d.setdefault(self.env.imgproto, {})[self.env.imgfmt] = elapsed

    def save(self) -> None:
        """Write the whole cache to ``cache_file`` as JSON."""
        with open(self.cache_file, 'w', encoding="utf-8") as f:
            json.dump(self.cache, f)

    def __enter__(self) -> 'LastElapsedTime':
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        # Persist unconditionally, even when the block raised.
        self.save()
class TestResult:
    """Plain record describing the outcome of one test run.

    Every constructor argument is stored unchanged in the attribute of the
    same name.
    """
    def __init__(self, status: str, description: str = '',
                 elapsed: Optional[float] = None, diff: Sequence[str] = (),
                 casenotrun: str = '', interrupted: bool = False) -> None:
        """
        :param status: result keyword of the run
        :param description: short human-readable explanation
        :param elapsed: run time, or None if not measured
        :param diff: diff lines to show for the test, empty if none
        :param casenotrun: text about cases that were not run, if any
        :param interrupted: whether the run was interrupted
        """
        # Verdict and the text that goes with it.
        self.status = status
        self.description = description
        self.casenotrun = casenotrun
        self.diff = diff

        # Timing and control information.
        self.elapsed = elapsed
        self.interrupted = interrupted
129 class TestRunner(ContextManager['TestRunner']):
130 shared_self = None
132 @staticmethod
133 def proc_run_test(test: str, test_field_width: int) -> TestResult:
134 # We are in a subprocess, we can't change the runner object!
135 runner = TestRunner.shared_self
136 assert runner is not None
137 return runner.run_test(test, test_field_width, mp=True)
139 def run_tests_pool(self, tests: List[str],
140 test_field_width: int, jobs: int) -> List[TestResult]:
142 # passing self directly to Pool.starmap() just doesn't work, because
143 # it's a context manager.
144 assert TestRunner.shared_self is None
145 TestRunner.shared_self = self
147 with Pool(jobs) as p:
148 results = p.starmap(self.proc_run_test,
149 zip(tests, [test_field_width] * len(tests)))
151 TestRunner.shared_self = None
153 return results
155 def __init__(self, env: TestEnv, tap: bool = False,
156 color: str = 'auto') -> None:
157 self.env = env
158 self.tap = tap
159 self.last_elapsed = LastElapsedTime('.last-elapsed-cache', env)
161 assert color in ('auto', 'on', 'off')
162 self.color = (color == 'on') or (color == 'auto' and
163 sys.stdout.isatty())
165 self._stack: contextlib.ExitStack
167 def __enter__(self) -> 'TestRunner':
168 self._stack = contextlib.ExitStack()
169 self._stack.enter_context(self.env)
170 self._stack.enter_context(self.last_elapsed)
171 self._stack.enter_context(savetty())
172 return self
174 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
175 self._stack.close()
177 def test_print_one_line(self, test: str,
178 test_field_width: int,
179 starttime: str,
180 endtime: Optional[str] = None, status: str = '...',
181 lasttime: Optional[float] = None,
182 thistime: Optional[float] = None,
183 description: str = '',
184 end: str = '\n') -> None:
185 """ Print short test info before/after test run """
186 test = os.path.basename(test)
188 if test_field_width is None:
189 test_field_width = 8
191 if self.tap:
192 if status == 'pass':
193 print(f'ok {self.env.imgfmt} {test}')
194 elif status == 'fail':
195 print(f'not ok {self.env.imgfmt} {test}')
196 elif status == 'not run':
197 print(f'ok {self.env.imgfmt} {test} # SKIP')
198 return
200 if lasttime:
201 lasttime_s = f' (last: {lasttime:.1f}s)'
202 else:
203 lasttime_s = ''
204 if thistime:
205 thistime_s = f'{thistime:.1f}s'
206 else:
207 thistime_s = '...'
209 if endtime:
210 endtime = f'[{endtime}]'
211 else:
212 endtime = ''
214 if self.color:
215 if status == 'pass':
216 col = '\033[32m'
217 elif status == 'fail':
218 col = '\033[1m\033[31m'
219 elif status == 'not run':
220 col = '\033[33m'
221 else:
222 col = ''
224 col_end = '\033[0m'
225 else:
226 col = ''
227 col_end = ''
229 print(f'{test:{test_field_width}} {col}{status:10}{col_end} '
230 f'[{starttime}] {endtime:13}{thistime_s:5} {lasttime_s:14} '
231 f'{description}', end=end)
233 def find_reference(self, test: str) -> str:
234 if self.env.cachemode == 'none':
235 ref = f'{test}.out.nocache'
236 if os.path.isfile(ref):
237 return ref
239 ref = f'{test}.out.{self.env.imgfmt}'
240 if os.path.isfile(ref):
241 return ref
243 ref = f'{test}.{self.env.qemu_default_machine}.out'
244 if os.path.isfile(ref):
245 return ref
247 return f'{test}.out'
249 def do_run_test(self, test: str, mp: bool) -> TestResult:
251 Run one test
253 :param test: test file path
254 :param mp: if true, we are in a multiprocessing environment, use
255 personal subdirectories for test run
257 Note: this method may be called from subprocess, so it does not
258 change ``self`` object in any way!
261 f_test = Path(test)
262 f_bad = Path(f_test.name + '.out.bad')
263 f_notrun = Path(f_test.name + '.notrun')
264 f_casenotrun = Path(f_test.name + '.casenotrun')
265 f_reference = Path(self.find_reference(test))
267 if not f_test.exists():
268 return TestResult(status='fail',
269 description=f'No such test file: {f_test}')
271 if not os.access(str(f_test), os.X_OK):
272 sys.exit(f'Not executable: {f_test}')
274 if not f_reference.exists():
275 return TestResult(status='not run',
276 description='No qualified output '
277 f'(expected {f_reference})')
279 for p in (f_bad, f_notrun, f_casenotrun):
280 silent_unlink(p)
282 args = [str(f_test.resolve())]
283 env = self.env.prepare_subprocess(args)
284 if mp:
285 # Split test directories, so that tests running in parallel don't
286 # break each other.
287 for d in ['TEST_DIR', 'SOCK_DIR']:
288 env[d] = os.path.join(env[d], f_test.name)
289 Path(env[d]).mkdir(parents=True, exist_ok=True)
291 t0 = time.time()
292 with f_bad.open('w', encoding="utf-8") as f:
293 with subprocess.Popen(args, cwd=str(f_test.parent), env=env,
294 stdout=f, stderr=subprocess.STDOUT) as proc:
295 try:
296 proc.wait()
297 except KeyboardInterrupt:
298 proc.terminate()
299 proc.wait()
300 return TestResult(status='not run',
301 description='Interrupted by user',
302 interrupted=True)
303 ret = proc.returncode
305 elapsed = round(time.time() - t0, 1)
307 if ret != 0:
308 return TestResult(status='fail', elapsed=elapsed,
309 description=f'failed, exit status {ret}',
310 diff=file_diff(str(f_reference), str(f_bad)))
312 if f_notrun.exists():
313 return TestResult(
314 status='not run',
315 description=f_notrun.read_text(encoding='utf-8').strip())
317 casenotrun = ''
318 if f_casenotrun.exists():
319 casenotrun = f_casenotrun.read_text(encoding='utf-8')
321 diff = file_diff(str(f_reference), str(f_bad))
322 if diff:
323 return TestResult(status='fail', elapsed=elapsed,
324 description=f'output mismatch (see {f_bad})',
325 diff=diff, casenotrun=casenotrun)
326 else:
327 f_bad.unlink()
328 return TestResult(status='pass', elapsed=elapsed,
329 casenotrun=casenotrun)
331 def run_test(self, test: str,
332 test_field_width: int,
333 mp: bool = False) -> TestResult:
335 Run one test and print short status
337 :param test: test file path
338 :param test_field_width: width for first field of status format
339 :param mp: if true, we are in a multiprocessing environment, don't try
340 to rewrite things in stdout
342 Note: this method may be called from subprocess, so it does not
343 change ``self`` object in any way!
346 last_el = self.last_elapsed.get(test)
347 start = datetime.datetime.now().strftime('%H:%M:%S')
349 if not self.tap:
350 self.test_print_one_line(test=test,
351 test_field_width=test_field_width,
352 status = 'started' if mp else '...',
353 starttime=start,
354 lasttime=last_el,
355 end = '\n' if mp else '\r')
357 res = self.do_run_test(test, mp)
359 end = datetime.datetime.now().strftime('%H:%M:%S')
360 self.test_print_one_line(test=test,
361 test_field_width=test_field_width,
362 status=res.status,
363 starttime=start, endtime=end,
364 lasttime=last_el, thistime=res.elapsed,
365 description=res.description)
367 if res.casenotrun:
368 print(res.casenotrun)
370 return res
372 def run_tests(self, tests: List[str], jobs: int = 1) -> bool:
373 n_run = 0
374 failed = []
375 notrun = []
376 casenotrun = []
378 if self.tap:
379 self.env.print_env('# ')
380 else:
381 self.env.print_env()
383 test_field_width = max(len(os.path.basename(t)) for t in tests) + 2
385 if jobs > 1:
386 results = self.run_tests_pool(tests, test_field_width, jobs)
388 for i, t in enumerate(tests):
389 name = os.path.basename(t)
391 if jobs > 1:
392 res = results[i]
393 else:
394 res = self.run_test(t, test_field_width)
396 assert res.status in ('pass', 'fail', 'not run')
398 if res.casenotrun:
399 casenotrun.append(t)
401 if res.status != 'not run':
402 n_run += 1
404 if res.status == 'fail':
405 failed.append(name)
406 if res.diff:
407 if self.tap:
408 print('\n'.join(res.diff), file=sys.stderr)
409 else:
410 print('\n'.join(res.diff))
411 elif res.status == 'not run':
412 notrun.append(name)
413 elif res.status == 'pass':
414 assert res.elapsed is not None
415 self.last_elapsed.update(t, res.elapsed)
417 sys.stdout.flush()
418 if res.interrupted:
419 break
421 if not self.tap:
422 if notrun:
423 print('Not run:', ' '.join(notrun))
425 if casenotrun:
426 print('Some cases not run in:', ' '.join(casenotrun))
428 if failed:
429 print('Failures:', ' '.join(failed))
430 print(f'Failed {len(failed)} of {n_run} iotests')
431 else:
432 print(f'Passed all {n_run} iotests')
433 return not failed