1 # Class for actually running tests.
3 # Copyright (c) 2020-2021 Virtuozzo International GmbH
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from pathlib
import Path
29 from multiprocessing
import Pool
30 from typing
import List
, Optional
, Any
, Sequence
, Dict
, \
33 from testenv
import TestEnv
36 def silent_unlink(path
: Path
) -> None:
43 def file_diff(file1
: str, file2
: str) -> List
[str]:
44 with
open(file1
, encoding
="utf-8") as f1
, \
45 open(file2
, encoding
="utf-8") as f2
:
# We want to ignore spaces at line ends. There is a lot of mess about
48 # TODO: fix all tests to not produce extra spaces, fix all .out files
49 # and use strict diff here!
50 seq1
= [line
.rstrip() for line
in f1
]
51 seq2
= [line
.rstrip() for line
in f2
]
53 for line
in difflib
.unified_diff(seq1
, seq2
, file1
, file2
)]
57 class LastElapsedTime(ContextManager
['LastElapsedTime']):
58 """ Cache for elapsed time for tests, to show it during new test run
60 It is safe to use get() at any time. To use update(), you must either
61 use it inside with-block or use save() after update().
63 def __init__(self
, cache_file
: str, env
: TestEnv
) -> None:
65 self
.cache_file
= cache_file
66 self
.cache
: Dict
[str, Dict
[str, Dict
[str, float]]]
69 with
open(cache_file
, encoding
="utf-8") as f
:
70 self
.cache
= json
.load(f
)
71 except (OSError, ValueError):
74 def get(self
, test
: str,
75 default
: Optional
[float] = None) -> Optional
[float]:
76 if test
not in self
.cache
:
79 if self
.env
.imgproto
not in self
.cache
[test
]:
82 return self
.cache
[test
][self
.env
.imgproto
].get(self
.env
.imgfmt
,
def update(self, test: str, elapsed: float) -> None:
    """Store the elapsed time of ``test`` in the in-memory cache.

    The value is filed under the test name, then the environment's
    image protocol, then its image format; missing intermediate levels
    are created on the fly.

    :param test: test name used as the top-level cache key
    :param elapsed: elapsed time to remember for this test
    """
    per_test = self.cache.setdefault(test, {})
    per_proto = per_test.setdefault(self.env.imgproto, {})
    per_proto[self.env.imgfmt] = elapsed
def save(self) -> None:
    """Write the in-memory cache to ``self.cache_file`` as JSON.

    The file is opened for writing (replacing previous content) with
    UTF-8 encoding.
    """
    target = self.cache_file
    with open(target, mode='w', encoding="utf-8") as cache_fp:
        json.dump(self.cache, cache_fp)
93 def __enter__(self
) -> 'LastElapsedTime':
96 def __exit__(self
, exc_type
: Any
, exc_value
: Any
, traceback
: Any
) -> None:
101 def __init__(self
, status
: str, description
: str = '',
102 elapsed
: Optional
[float] = None, diff
: Sequence
[str] = (),
103 casenotrun
: str = '', interrupted
: bool = False) -> None:
105 self
.description
= description
106 self
.elapsed
= elapsed
108 self
.casenotrun
= casenotrun
109 self
.interrupted
= interrupted
112 class TestRunner(ContextManager
['TestRunner']):
def proc_run_test(test: str, test_field_width: int) -> TestResult:
    """Worker entry point: run one test through the shared runner.

    :param test: test file path
    :param test_field_width: width for first field of status format
    :return: the result produced by the shared runner's ``run_test``
    """
    # This executes in a subprocess, so the runner object is fetched
    # from the class attribute and must not be changed from here.
    shared = TestRunner.shared_self
    assert shared is not None
    return shared.run_test(test, test_field_width, mp=True)
122 def run_tests_pool(self
, tests
: List
[str],
123 test_field_width
: int, jobs
: int) -> List
[TestResult
]:
125 # passing self directly to Pool.starmap() just doesn't work, because
126 # it's a context manager.
127 assert TestRunner
.shared_self
is None
128 TestRunner
.shared_self
= self
130 with
Pool(jobs
) as p
:
131 results
= p
.starmap(self
.proc_run_test
,
132 zip(tests
, [test_field_width
] * len(tests
)))
134 TestRunner
.shared_self
= None
138 def __init__(self
, env
: TestEnv
, tap
: bool = False,
139 color
: str = 'auto') -> None:
142 self
.last_elapsed
= LastElapsedTime('.last-elapsed-cache', env
)
144 assert color
in ('auto', 'on', 'off')
145 self
.color
= (color
== 'on') or (color
== 'auto' and
148 self
._stack
: contextlib
.ExitStack
150 def __enter__(self
) -> 'TestRunner':
151 self
._stack
= contextlib
.ExitStack()
152 self
._stack
.enter_context(self
.env
)
153 self
._stack
.enter_context(self
.last_elapsed
)
156 def __exit__(self
, exc_type
: Any
, exc_value
: Any
, traceback
: Any
) -> None:
159 def test_print_one_line(self
, test
: str,
160 test_field_width
: int,
162 endtime
: Optional
[str] = None, status
: str = '...',
163 lasttime
: Optional
[float] = None,
164 thistime
: Optional
[float] = None,
165 description
: str = '',
166 end
: str = '\n') -> None:
167 """ Print short test info before/after test run """
168 test
= os
.path
.basename(test
)
170 if test_field_width
is None:
175 print(f
'ok {self.env.imgfmt} {test}')
176 elif status
== 'fail':
177 print(f
'not ok {self.env.imgfmt} {test}')
178 elif status
== 'not run':
179 print(f
'ok {self.env.imgfmt} {test} # SKIP')
183 lasttime_s
= f
' (last: {lasttime:.1f}s)'
187 thistime_s
= f
'{thistime:.1f}s'
192 endtime
= f
'[{endtime}]'
199 elif status
== 'fail':
200 col
= '\033[1m\033[31m'
201 elif status
== 'not run':
211 print(f
'{test:{test_field_width}} {col}{status:10}{col_end} '
212 f
'[{starttime}] {endtime:13}{thistime_s:5} {lasttime_s:14} '
213 f
'{description}', end
=end
)
215 def find_reference(self
, test
: str) -> str:
216 if self
.env
.cachemode
== 'none':
217 ref
= f
'{test}.out.nocache'
218 if os
.path
.isfile(ref
):
221 ref
= f
'{test}.out.{self.env.imgfmt}'
222 if os
.path
.isfile(ref
):
225 ref
= f
'{test}.{self.env.qemu_default_machine}.out'
226 if os
.path
.isfile(ref
):
231 def do_run_test(self
, test
: str) -> TestResult
:
235 :param test: test file path
237 Note: this method may be called from subprocess, so it does not
238 change ``self`` object in any way!
242 f_reference
= Path(self
.find_reference(test
))
244 if not f_test
.exists():
245 return TestResult(status
='fail',
246 description
=f
'No such test file: {f_test}')
248 if not os
.access(str(f_test
), os
.X_OK
):
249 sys
.exit(f
'Not executable: {f_test}')
251 if not f_reference
.exists():
252 return TestResult(status
='not run',
253 description
='No qualified output '
254 f
'(expected {f_reference})')
256 args
= [str(f_test
.resolve())]
257 env
= self
.env
.prepare_subprocess(args
)
259 # Split test directories, so that tests running in parallel don't
261 for d
in ['TEST_DIR', 'SOCK_DIR']:
262 env
[d
] = os
.path
.join(
264 f
"{self.env.imgfmt}-{self.env.imgproto}-{f_test.name}")
265 Path(env
[d
]).mkdir(parents
=True, exist_ok
=True)
267 test_dir
= env
['TEST_DIR']
268 f_bad
= Path(test_dir
, f_test
.name
+ '.out.bad')
269 f_notrun
= Path(test_dir
, f_test
.name
+ '.notrun')
270 f_casenotrun
= Path(test_dir
, f_test
.name
+ '.casenotrun')
272 for p
in (f_notrun
, f_casenotrun
):
276 with f_bad
.open('w', encoding
="utf-8") as f
:
277 with subprocess
.Popen(args
, cwd
=str(f_test
.parent
), env
=env
,
278 stdin
=subprocess
.DEVNULL
,
279 stdout
=f
, stderr
=subprocess
.STDOUT
) as proc
:
282 except KeyboardInterrupt:
285 return TestResult(status
='not run',
286 description
='Interrupted by user',
288 ret
= proc
.returncode
290 elapsed
= round(time
.time() - t0
, 1)
293 return TestResult(status
='fail', elapsed
=elapsed
,
294 description
=f
'failed, exit status {ret}',
295 diff
=file_diff(str(f_reference
), str(f_bad
)))
297 if f_notrun
.exists():
300 description
=f_notrun
.read_text(encoding
='utf-8').strip())
303 if f_casenotrun
.exists():
304 casenotrun
= f_casenotrun
.read_text(encoding
='utf-8')
306 diff
= file_diff(str(f_reference
), str(f_bad
))
308 if os
.environ
.get("QEMU_IOTESTS_REGEN", None) is not None:
309 shutil
.copyfile(str(f_bad
), str(f_reference
))
310 print("########################################")
311 print("##### REFERENCE FILE UPDATED #####")
312 print("########################################")
313 return TestResult(status
='fail', elapsed
=elapsed
,
314 description
=f
'output mismatch (see {f_bad})',
315 diff
=diff
, casenotrun
=casenotrun
)
318 return TestResult(status
='pass', elapsed
=elapsed
,
319 casenotrun
=casenotrun
)
321 def run_test(self
, test
: str,
322 test_field_width
: int,
323 mp
: bool = False) -> TestResult
:
325 Run one test and print short status
327 :param test: test file path
328 :param test_field_width: width for first field of status format
329 :param mp: if true, we are in a multiprocessing environment, don't try
330 to rewrite things in stdout
332 Note: this method may be called from subprocess, so it does not
333 change ``self`` object in any way!
336 last_el
= self
.last_elapsed
.get(test
)
337 start
= datetime
.datetime
.now().strftime('%H:%M:%S')
340 self
.test_print_one_line(test
=test
,
341 test_field_width
=test_field_width
,
342 status
= 'started' if mp
else '...',
345 end
= '\n' if mp
else '\r')
347 testname
= os
.path
.basename(test
)
348 print(f
'# running {self.env.imgfmt} {testname}')
350 res
= self
.do_run_test(test
)
352 end
= datetime
.datetime
.now().strftime('%H:%M:%S')
353 self
.test_print_one_line(test
=test
,
354 test_field_width
=test_field_width
,
356 starttime
=start
, endtime
=end
,
357 lasttime
=last_el
, thistime
=res
.elapsed
,
358 description
=res
.description
)
362 print('#' + res
.casenotrun
.replace('\n', '\n#'))
364 print(res
.casenotrun
)
369 def run_tests(self
, tests
: List
[str], jobs
: int = 1) -> bool:
376 print('TAP version 13')
377 self
.env
.print_env('# ')
378 print('1..%d' % len(tests
))
382 test_field_width
= max(len(os
.path
.basename(t
)) for t
in tests
) + 2
385 results
= self
.run_tests_pool(tests
, test_field_width
, jobs
)
387 for i
, t
in enumerate(tests
):
388 name
= os
.path
.basename(t
)
393 res
= self
.run_test(t
, test_field_width
)
395 assert res
.status
in ('pass', 'fail', 'not run')
400 if res
.status
!= 'not run':
403 if res
.status
== 'fail':
407 print('\n'.join(res
.diff
), file=sys
.stderr
)
409 print('\n'.join(res
.diff
))
410 elif res
.status
== 'not run':
412 elif res
.status
== 'pass':
413 assert res
.elapsed
is not None
414 self
.last_elapsed
.update(t
, res
.elapsed
)
422 print('Not run:', ' '.join(notrun
))
425 print('Some cases not run in:', ' '.join(casenotrun
))
428 print('Failures:', ' '.join(failed
))
429 print(f
'Failed {len(failed)} of {n_run} iotests')
431 print(f
'Passed all {n_run} iotests')