1 # Class for actually running tests.
3 # Copyright (c) 2020-2021 Virtuozzo International GmbH
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from pathlib
import Path
29 from multiprocessing
import Pool
30 from contextlib
import contextmanager
31 from typing
import List
, Optional
, Iterator
, Any
, Sequence
, Dict
, \
34 from testenv
import TestEnv
37 def silent_unlink(path
: Path
) -> None:
44 def file_diff(file1
: str, file2
: str) -> List
[str]:
45 with
open(file1
, encoding
="utf-8") as f1
, \
46 open(file2
, encoding
="utf-8") as f2
:
47 # We want to ignore spaces at line ends. There are a lot of mess about
49 # TODO: fix all tests to not produce extra spaces, fix all .out files
50 # and use strict diff here!
51 seq1
= [line
.rstrip() for line
in f1
]
52 seq2
= [line
.rstrip() for line
in f2
]
54 for line
in difflib
.unified_diff(seq1
, seq2
, file1
, file2
)]
58 # We want to save current tty settings during test run,
59 # since an aborting qemu call may leave things screwed up.
61 def savetty() -> Iterator
[None]:
62 isterm
= sys
.stdin
.isatty()
64 fd
= sys
.stdin
.fileno()
65 attr
= termios
.tcgetattr(fd
)
71 termios
.tcsetattr(fd
, termios
.TCSADRAIN
, attr
)
74 class LastElapsedTime(ContextManager
['LastElapsedTime']):
75 """ Cache for elapsed time for tests, to show it during new test run
77 It is safe to use get() at any time. To use update(), you must either
78 use it inside with-block or use save() after update().
80 def __init__(self
, cache_file
: str, env
: TestEnv
) -> None:
82 self
.cache_file
= cache_file
83 self
.cache
: Dict
[str, Dict
[str, Dict
[str, float]]]
86 with
open(cache_file
, encoding
="utf-8") as f
:
87 self
.cache
= json
.load(f
)
88 except (OSError, ValueError):
91 def get(self
, test
: str,
92 default
: Optional
[float] = None) -> Optional
[float]:
93 if test
not in self
.cache
:
96 if self
.env
.imgproto
not in self
.cache
[test
]:
99 return self
.cache
[test
][self
.env
.imgproto
].get(self
.env
.imgfmt
,
102 def update(self
, test
: str, elapsed
: float) -> None:
103 d
= self
.cache
.setdefault(test
, {})
104 d
.setdefault(self
.env
.imgproto
, {})[self
.env
.imgfmt
] = elapsed
106 def save(self
) -> None:
107 with
open(self
.cache_file
, 'w', encoding
="utf-8") as f
:
108 json
.dump(self
.cache
, f
)
110 def __enter__(self
) -> 'LastElapsedTime':
113 def __exit__(self
, exc_type
: Any
, exc_value
: Any
, traceback
: Any
) -> None:
118 def __init__(self
, status
: str, description
: str = '',
119 elapsed
: Optional
[float] = None, diff
: Sequence
[str] = (),
120 casenotrun
: str = '', interrupted
: bool = False) -> None:
122 self
.description
= description
123 self
.elapsed
= elapsed
125 self
.casenotrun
= casenotrun
126 self
.interrupted
= interrupted
129 class TestRunner(ContextManager
['TestRunner']):
133 def proc_run_test(test
: str, test_field_width
: int) -> TestResult
:
134 # We are in a subprocess, we can't change the runner object!
135 runner
= TestRunner
.shared_self
136 assert runner
is not None
137 return runner
.run_test(test
, test_field_width
, mp
=True)
139 def run_tests_pool(self
, tests
: List
[str],
140 test_field_width
: int, jobs
: int) -> List
[TestResult
]:
142 # passing self directly to Pool.starmap() just doesn't work, because
143 # it's a context manager.
144 assert TestRunner
.shared_self
is None
145 TestRunner
.shared_self
= self
147 with
Pool(jobs
) as p
:
148 results
= p
.starmap(self
.proc_run_test
,
149 zip(tests
, [test_field_width
] * len(tests
)))
151 TestRunner
.shared_self
= None
155 def __init__(self
, env
: TestEnv
, tap
: bool = False,
156 color
: str = 'auto') -> None:
159 self
.last_elapsed
= LastElapsedTime('.last-elapsed-cache', env
)
161 assert color
in ('auto', 'on', 'off')
162 self
.color
= (color
== 'on') or (color
== 'auto' and
165 self
._stack
: contextlib
.ExitStack
167 def __enter__(self
) -> 'TestRunner':
168 self
._stack
= contextlib
.ExitStack()
169 self
._stack
.enter_context(self
.env
)
170 self
._stack
.enter_context(self
.last_elapsed
)
171 self
._stack
.enter_context(savetty())
174 def __exit__(self
, exc_type
: Any
, exc_value
: Any
, traceback
: Any
) -> None:
177 def test_print_one_line(self
, test
: str,
178 test_field_width
: int,
180 endtime
: Optional
[str] = None, status
: str = '...',
181 lasttime
: Optional
[float] = None,
182 thistime
: Optional
[float] = None,
183 description
: str = '',
184 end
: str = '\n') -> None:
185 """ Print short test info before/after test run """
186 test
= os
.path
.basename(test
)
188 if test_field_width
is None:
193 print(f
'ok {self.env.imgfmt} {test}')
194 elif status
== 'fail':
195 print(f
'not ok {self.env.imgfmt} {test}')
196 elif status
== 'not run':
197 print(f
'ok {self.env.imgfmt} {test} # SKIP')
201 lasttime_s
= f
' (last: {lasttime:.1f}s)'
205 thistime_s
= f
'{thistime:.1f}s'
210 endtime
= f
'[{endtime}]'
217 elif status
== 'fail':
218 col
= '\033[1m\033[31m'
219 elif status
== 'not run':
229 print(f
'{test:{test_field_width}} {col}{status:10}{col_end} '
230 f
'[{starttime}] {endtime:13}{thistime_s:5} {lasttime_s:14} '
231 f
'{description}', end
=end
)
233 def find_reference(self
, test
: str) -> str:
234 if self
.env
.cachemode
== 'none':
235 ref
= f
'{test}.out.nocache'
236 if os
.path
.isfile(ref
):
239 ref
= f
'{test}.out.{self.env.imgfmt}'
240 if os
.path
.isfile(ref
):
243 ref
= f
'{test}.{self.env.qemu_default_machine}.out'
244 if os
.path
.isfile(ref
):
249 def do_run_test(self
, test
: str, mp
: bool) -> TestResult
:
253 :param test: test file path
254 :param mp: if true, we are in a multiprocessing environment, use
255 personal subdirectories for test run
257 Note: this method may be called from subprocess, so it does not
258 change ``self`` object in any way!
262 f_bad
= Path(f_test
.name
+ '.out.bad')
263 f_notrun
= Path(f_test
.name
+ '.notrun')
264 f_casenotrun
= Path(f_test
.name
+ '.casenotrun')
265 f_reference
= Path(self
.find_reference(test
))
267 if not f_test
.exists():
268 return TestResult(status
='fail',
269 description
=f
'No such test file: {f_test}')
271 if not os
.access(str(f_test
), os
.X_OK
):
272 sys
.exit(f
'Not executable: {f_test}')
274 if not f_reference
.exists():
275 return TestResult(status
='not run',
276 description
='No qualified output '
277 f
'(expected {f_reference})')
279 for p
in (f_bad
, f_notrun
, f_casenotrun
):
282 args
= [str(f_test
.resolve())]
283 env
= self
.env
.prepare_subprocess(args
)
285 # Split test directories, so that tests running in parallel don't
287 for d
in ['TEST_DIR', 'SOCK_DIR']:
288 env
[d
] = os
.path
.join(env
[d
], f_test
.name
)
289 Path(env
[d
]).mkdir(parents
=True, exist_ok
=True)
292 with f_bad
.open('w', encoding
="utf-8") as f
:
293 with subprocess
.Popen(args
, cwd
=str(f_test
.parent
), env
=env
,
294 stdout
=f
, stderr
=subprocess
.STDOUT
) as proc
:
297 except KeyboardInterrupt:
300 return TestResult(status
='not run',
301 description
='Interrupted by user',
303 ret
= proc
.returncode
305 elapsed
= round(time
.time() - t0
, 1)
308 return TestResult(status
='fail', elapsed
=elapsed
,
309 description
=f
'failed, exit status {ret}',
310 diff
=file_diff(str(f_reference
), str(f_bad
)))
312 if f_notrun
.exists():
315 description
=f_notrun
.read_text(encoding
='utf-8').strip())
318 if f_casenotrun
.exists():
319 casenotrun
= f_casenotrun
.read_text(encoding
='utf-8')
321 diff
= file_diff(str(f_reference
), str(f_bad
))
323 return TestResult(status
='fail', elapsed
=elapsed
,
324 description
=f
'output mismatch (see {f_bad})',
325 diff
=diff
, casenotrun
=casenotrun
)
328 return TestResult(status
='pass', elapsed
=elapsed
,
329 casenotrun
=casenotrun
)
331 def run_test(self
, test
: str,
332 test_field_width
: int,
333 mp
: bool = False) -> TestResult
:
335 Run one test and print short status
337 :param test: test file path
338 :param test_field_width: width for first field of status format
339 :param mp: if true, we are in a multiprocessing environment, don't try
340 to rewrite things in stdout
342 Note: this method may be called from subprocess, so it does not
343 change ``self`` object in any way!
346 last_el
= self
.last_elapsed
.get(test
)
347 start
= datetime
.datetime
.now().strftime('%H:%M:%S')
350 self
.test_print_one_line(test
=test
,
351 test_field_width
=test_field_width
,
352 status
= 'started' if mp
else '...',
355 end
= '\n' if mp
else '\r')
357 res
= self
.do_run_test(test
, mp
)
359 end
= datetime
.datetime
.now().strftime('%H:%M:%S')
360 self
.test_print_one_line(test
=test
,
361 test_field_width
=test_field_width
,
363 starttime
=start
, endtime
=end
,
364 lasttime
=last_el
, thistime
=res
.elapsed
,
365 description
=res
.description
)
368 print(res
.casenotrun
)
372 def run_tests(self
, tests
: List
[str], jobs
: int = 1) -> bool:
379 self
.env
.print_env('# ')
383 test_field_width
= max(len(os
.path
.basename(t
)) for t
in tests
) + 2
386 results
= self
.run_tests_pool(tests
, test_field_width
, jobs
)
388 for i
, t
in enumerate(tests
):
389 name
= os
.path
.basename(t
)
394 res
= self
.run_test(t
, test_field_width
)
396 assert res
.status
in ('pass', 'fail', 'not run')
401 if res
.status
!= 'not run':
404 if res
.status
== 'fail':
408 print('\n'.join(res
.diff
), file=sys
.stderr
)
410 print('\n'.join(res
.diff
))
411 elif res
.status
== 'not run':
413 elif res
.status
== 'pass':
414 assert res
.elapsed
is not None
415 self
.last_elapsed
.update(t
, res
.elapsed
)
423 print('Not run:', ' '.join(notrun
))
426 print('Some cases not run in:', ' '.join(casenotrun
))
429 print('Failures:', ' '.join(failed
))
430 print(f
'Failed {len(failed)} of {n_run} iotests')
432 print(f
'Passed all {n_run} iotests')