Merge tag 'pull-request-2024-10-21' of https://gitlab.com/thuth/qemu into staging
[qemu/kevin.git] / tests / qemu-iotests / testrunner.py
blob2e236c8fa3906928c488c453c139ec3125b69ec8
1 # Class for actually running tests.
3 # Copyright (c) 2020-2021 Virtuozzo International GmbH
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import os
20 from pathlib import Path
21 import datetime
22 import time
23 import difflib
24 import subprocess
25 import contextlib
26 import json
27 import shutil
28 import sys
29 from multiprocessing import Pool
30 from typing import List, Optional, Any, Sequence, Dict
31 from testenv import TestEnv
33 if sys.version_info >= (3, 9):
34 from contextlib import AbstractContextManager as ContextManager
35 else:
36 from typing import ContextManager
39 def silent_unlink(path: Path) -> None:
40 try:
41 path.unlink()
42 except OSError:
43 pass
46 def file_diff(file1: str, file2: str) -> List[str]:
47 with open(file1, encoding="utf-8") as f1, \
48 open(file2, encoding="utf-8") as f2:
49 # We want to ignore spaces at line ends. There are a lot of mess about
50 # it in iotests.
51 # TODO: fix all tests to not produce extra spaces, fix all .out files
52 # and use strict diff here!
53 seq1 = [line.rstrip() for line in f1]
54 seq2 = [line.rstrip() for line in f2]
55 res = [line.rstrip()
56 for line in difflib.unified_diff(seq1, seq2, file1, file2)]
57 return res
60 class LastElapsedTime(ContextManager['LastElapsedTime']):
61 """ Cache for elapsed time for tests, to show it during new test run
63 It is safe to use get() at any time. To use update(), you must either
64 use it inside with-block or use save() after update().
65 """
66 def __init__(self, cache_file: str, env: TestEnv) -> None:
67 self.env = env
68 self.cache_file = cache_file
69 self.cache: Dict[str, Dict[str, Dict[str, float]]]
71 try:
72 with open(cache_file, encoding="utf-8") as f:
73 self.cache = json.load(f)
74 except (OSError, ValueError):
75 self.cache = {}
77 def get(self, test: str,
78 default: Optional[float] = None) -> Optional[float]:
79 if test not in self.cache:
80 return default
82 if self.env.imgproto not in self.cache[test]:
83 return default
85 return self.cache[test][self.env.imgproto].get(self.env.imgfmt,
86 default)
88 def update(self, test: str, elapsed: float) -> None:
89 d = self.cache.setdefault(test, {})
90 d.setdefault(self.env.imgproto, {})[self.env.imgfmt] = elapsed
92 def save(self) -> None:
93 with open(self.cache_file, 'w', encoding="utf-8") as f:
94 json.dump(self.cache, f)
96 def __enter__(self) -> 'LastElapsedTime':
97 return self
99 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
100 self.save()
103 class TestResult:
104 def __init__(self, status: str, description: str = '',
105 elapsed: Optional[float] = None, diff: Sequence[str] = (),
106 casenotrun: str = '', interrupted: bool = False) -> None:
107 self.status = status
108 self.description = description
109 self.elapsed = elapsed
110 self.diff = diff
111 self.casenotrun = casenotrun
112 self.interrupted = interrupted
115 class TestRunner(ContextManager['TestRunner']):
116 shared_self = None
118 @staticmethod
119 def proc_run_test(test: str, test_field_width: int) -> TestResult:
120 # We are in a subprocess, we can't change the runner object!
121 runner = TestRunner.shared_self
122 assert runner is not None
123 return runner.run_test(test, test_field_width, mp=True)
125 def run_tests_pool(self, tests: List[str],
126 test_field_width: int, jobs: int) -> List[TestResult]:
128 # passing self directly to Pool.starmap() just doesn't work, because
129 # it's a context manager.
130 assert TestRunner.shared_self is None
131 TestRunner.shared_self = self
133 with Pool(jobs) as p:
134 results = p.starmap(self.proc_run_test,
135 zip(tests, [test_field_width] * len(tests)))
137 TestRunner.shared_self = None
139 return results
141 def __init__(self, env: TestEnv, tap: bool = False,
142 color: str = 'auto') -> None:
143 self.env = env
144 self.tap = tap
145 self.last_elapsed = LastElapsedTime('.last-elapsed-cache', env)
147 assert color in ('auto', 'on', 'off')
148 self.color = (color == 'on') or (color == 'auto' and
149 sys.stdout.isatty())
151 self._stack: contextlib.ExitStack
153 def __enter__(self) -> 'TestRunner':
154 self._stack = contextlib.ExitStack()
155 self._stack.enter_context(self.env)
156 self._stack.enter_context(self.last_elapsed)
157 return self
159 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
160 self._stack.close()
162 def test_print_one_line(self, test: str,
163 test_field_width: int,
164 starttime: str,
165 endtime: Optional[str] = None, status: str = '...',
166 lasttime: Optional[float] = None,
167 thistime: Optional[float] = None,
168 description: str = '',
169 end: str = '\n') -> None:
170 """ Print short test info before/after test run """
171 test = os.path.basename(test)
173 if test_field_width is None:
174 test_field_width = 8
176 if self.tap:
177 if status == 'pass':
178 print(f'ok {self.env.imgfmt} {test}')
179 elif status == 'fail':
180 print(f'not ok {self.env.imgfmt} {test}')
181 elif status == 'not run':
182 print(f'ok {self.env.imgfmt} {test} # SKIP')
183 return
185 if lasttime:
186 lasttime_s = f' (last: {lasttime:.1f}s)'
187 else:
188 lasttime_s = ''
189 if thistime:
190 thistime_s = f'{thistime:.1f}s'
191 else:
192 thistime_s = '...'
194 if endtime:
195 endtime = f'[{endtime}]'
196 else:
197 endtime = ''
199 if self.color:
200 if status == 'pass':
201 col = '\033[32m'
202 elif status == 'fail':
203 col = '\033[1m\033[31m'
204 elif status == 'not run':
205 col = '\033[33m'
206 else:
207 col = ''
209 col_end = '\033[0m'
210 else:
211 col = ''
212 col_end = ''
214 print(f'{test:{test_field_width}} {col}{status:10}{col_end} '
215 f'[{starttime}] {endtime:13}{thistime_s:5} {lasttime_s:14} '
216 f'{description}', end=end)
218 def find_reference(self, test: str) -> str:
219 if self.env.cachemode == 'none':
220 ref = f'{test}.out.nocache'
221 if os.path.isfile(ref):
222 return ref
224 ref = f'{test}.out.{self.env.imgfmt}'
225 if os.path.isfile(ref):
226 return ref
228 ref = f'{test}.{self.env.qemu_default_machine}.out'
229 if os.path.isfile(ref):
230 return ref
232 return f'{test}.out'
234 def do_run_test(self, test: str) -> TestResult:
236 Run one test
238 :param test: test file path
240 Note: this method may be called from subprocess, so it does not
241 change ``self`` object in any way!
244 f_test = Path(test)
245 f_reference = Path(self.find_reference(test))
247 if not f_test.exists():
248 return TestResult(status='fail',
249 description=f'No such test file: {f_test}')
251 if not os.access(str(f_test), os.X_OK):
252 sys.exit(f'Not executable: {f_test}')
254 if not f_reference.exists():
255 return TestResult(status='not run',
256 description='No qualified output '
257 f'(expected {f_reference})')
259 args = [str(f_test.resolve())]
260 env = self.env.prepare_subprocess(args)
262 # Split test directories, so that tests running in parallel don't
263 # break each other.
264 for d in ['TEST_DIR', 'SOCK_DIR']:
265 env[d] = os.path.join(
266 env[d],
267 f"{self.env.imgfmt}-{self.env.imgproto}-{f_test.name}")
268 Path(env[d]).mkdir(parents=True, exist_ok=True)
270 test_dir = env['TEST_DIR']
271 f_bad = Path(test_dir, f_test.name + '.out.bad')
272 f_notrun = Path(test_dir, f_test.name + '.notrun')
273 f_casenotrun = Path(test_dir, f_test.name + '.casenotrun')
275 for p in (f_notrun, f_casenotrun):
276 silent_unlink(p)
278 t0 = time.time()
279 with f_bad.open('w', encoding="utf-8") as f:
280 with subprocess.Popen(args, cwd=str(f_test.parent), env=env,
281 stdin=subprocess.DEVNULL,
282 stdout=f, stderr=subprocess.STDOUT) as proc:
283 try:
284 proc.wait()
285 except KeyboardInterrupt:
286 proc.terminate()
287 proc.wait()
288 return TestResult(status='not run',
289 description='Interrupted by user',
290 interrupted=True)
291 ret = proc.returncode
293 elapsed = round(time.time() - t0, 1)
295 if ret != 0:
296 return TestResult(status='fail', elapsed=elapsed,
297 description=f'failed, exit status {ret}',
298 diff=file_diff(str(f_reference), str(f_bad)))
300 if f_notrun.exists():
301 return TestResult(
302 status='not run',
303 description=f_notrun.read_text(encoding='utf-8').strip())
305 casenotrun = ''
306 if f_casenotrun.exists():
307 casenotrun = f_casenotrun.read_text(encoding='utf-8')
309 diff = file_diff(str(f_reference), str(f_bad))
310 if diff:
311 if os.environ.get("QEMU_IOTESTS_REGEN", None) is not None:
312 shutil.copyfile(str(f_bad), str(f_reference))
313 print("########################################")
314 print("##### REFERENCE FILE UPDATED #####")
315 print("########################################")
316 return TestResult(status='fail', elapsed=elapsed,
317 description=f'output mismatch (see {f_bad})',
318 diff=diff, casenotrun=casenotrun)
319 else:
320 f_bad.unlink()
321 return TestResult(status='pass', elapsed=elapsed,
322 casenotrun=casenotrun)
324 def run_test(self, test: str,
325 test_field_width: int,
326 mp: bool = False) -> TestResult:
328 Run one test and print short status
330 :param test: test file path
331 :param test_field_width: width for first field of status format
332 :param mp: if true, we are in a multiprocessing environment, don't try
333 to rewrite things in stdout
335 Note: this method may be called from subprocess, so it does not
336 change ``self`` object in any way!
339 last_el = self.last_elapsed.get(test)
340 start = datetime.datetime.now().strftime('%H:%M:%S')
342 if not self.tap:
343 self.test_print_one_line(test=test,
344 test_field_width=test_field_width,
345 status = 'started' if mp else '...',
346 starttime=start,
347 lasttime=last_el,
348 end = '\n' if mp else '\r')
349 else:
350 testname = os.path.basename(test)
351 print(f'# running {self.env.imgfmt} {testname}')
353 res = self.do_run_test(test)
355 end = datetime.datetime.now().strftime('%H:%M:%S')
356 self.test_print_one_line(test=test,
357 test_field_width=test_field_width,
358 status=res.status,
359 starttime=start, endtime=end,
360 lasttime=last_el, thistime=res.elapsed,
361 description=res.description)
363 if res.casenotrun:
364 if self.tap:
365 print('#' + res.casenotrun.replace('\n', '\n#'))
366 else:
367 print(res.casenotrun)
369 sys.stdout.flush()
370 return res
372 def run_tests(self, tests: List[str], jobs: int = 1) -> bool:
373 n_run = 0
374 failed = []
375 notrun = []
376 casenotrun = []
378 if self.tap:
379 print('TAP version 13')
380 self.env.print_env('# ')
381 print('1..%d' % len(tests))
382 else:
383 self.env.print_env()
385 test_field_width = max(len(os.path.basename(t)) for t in tests) + 2
387 if jobs > 1:
388 results = self.run_tests_pool(tests, test_field_width, jobs)
390 for i, t in enumerate(tests):
391 name = os.path.basename(t)
393 if jobs > 1:
394 res = results[i]
395 else:
396 res = self.run_test(t, test_field_width)
398 assert res.status in ('pass', 'fail', 'not run')
400 if res.casenotrun:
401 casenotrun.append(t)
403 if res.status != 'not run':
404 n_run += 1
406 if res.status == 'fail':
407 failed.append(name)
408 if res.diff:
409 if self.tap:
410 print('\n'.join(res.diff), file=sys.stderr)
411 else:
412 print('\n'.join(res.diff))
413 elif res.status == 'not run':
414 notrun.append(name)
415 elif res.status == 'pass':
416 assert res.elapsed is not None
417 self.last_elapsed.update(t, res.elapsed)
419 sys.stdout.flush()
420 if res.interrupted:
421 break
423 if not self.tap:
424 if notrun:
425 print('Not run:', ' '.join(notrun))
427 if casenotrun:
428 print('Some cases not run in:', ' '.join(casenotrun))
430 if failed:
431 print('Failures:', ' '.join(failed))
432 print(f'Failed {len(failed)} of {n_run} iotests')
433 else:
434 print(f'Passed all {n_run} iotests')
435 return not failed