1 # Class for actually running tests.
3 # Copyright (c) 2020-2021 Virtuozzo International GmbH
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from pathlib
import Path
29 from contextlib
import contextmanager
30 from typing
import List
, Optional
, Iterator
, Any
, Sequence
, Dict
, \
33 from testenv
import TestEnv
36 def silent_unlink(path
: Path
) -> None:
43 def file_diff(file1
: str, file2
: str) -> List
[str]:
44 with
open(file1
, encoding
="utf-8") as f1
, \
45 open(file2
, encoding
="utf-8") as f2
:
46 # We want to ignore spaces at line ends. There are a lot of mess about
48 # TODO: fix all tests to not produce extra spaces, fix all .out files
49 # and use strict diff here!
50 seq1
= [line
.rstrip() for line
in f1
]
51 seq2
= [line
.rstrip() for line
in f2
]
53 for line
in difflib
.unified_diff(seq1
, seq2
, file1
, file2
)]
57 # We want to save current tty settings during test run,
58 # since an aborting qemu call may leave things screwed up.
60 def savetty() -> Iterator
[None]:
61 isterm
= sys
.stdin
.isatty()
63 fd
= sys
.stdin
.fileno()
64 attr
= termios
.tcgetattr(fd
)
70 termios
.tcsetattr(fd
, termios
.TCSADRAIN
, attr
)
73 class LastElapsedTime(ContextManager
['LastElapsedTime']):
74 """ Cache for elapsed time for tests, to show it during new test run
76 It is safe to use get() at any time. To use update(), you must either
77 use it inside with-block or use save() after update().
79 def __init__(self
, cache_file
: str, env
: TestEnv
) -> None:
81 self
.cache_file
= cache_file
82 self
.cache
: Dict
[str, Dict
[str, Dict
[str, float]]]
85 with
open(cache_file
, encoding
="utf-8") as f
:
86 self
.cache
= json
.load(f
)
87 except (OSError, ValueError):
90 def get(self
, test
: str,
91 default
: Optional
[float] = None) -> Optional
[float]:
92 if test
not in self
.cache
:
95 if self
.env
.imgproto
not in self
.cache
[test
]:
98 return self
.cache
[test
][self
.env
.imgproto
].get(self
.env
.imgfmt
,
101 def update(self
, test
: str, elapsed
: float) -> None:
102 d
= self
.cache
.setdefault(test
, {})
103 d
.setdefault(self
.env
.imgproto
, {})[self
.env
.imgfmt
] = elapsed
105 def save(self
) -> None:
106 with
open(self
.cache_file
, 'w', encoding
="utf-8") as f
:
107 json
.dump(self
.cache
, f
)
109 def __enter__(self
) -> 'LastElapsedTime':
112 def __exit__(self
, exc_type
: Any
, exc_value
: Any
, traceback
: Any
) -> None:
117 def __init__(self
, status
: str, description
: str = '',
118 elapsed
: Optional
[float] = None, diff
: Sequence
[str] = (),
119 casenotrun
: str = '', interrupted
: bool = False) -> None:
121 self
.description
= description
122 self
.elapsed
= elapsed
124 self
.casenotrun
= casenotrun
125 self
.interrupted
= interrupted
128 class TestRunner(ContextManager
['TestRunner']):
129 def __init__(self
, env
: TestEnv
, makecheck
: bool = False,
130 color
: str = 'auto') -> None:
132 self
.test_run_env
= self
.env
.get_env()
133 self
.makecheck
= makecheck
134 self
.last_elapsed
= LastElapsedTime('.last-elapsed-cache', env
)
136 assert color
in ('auto', 'on', 'off')
137 self
.color
= (color
== 'on') or (color
== 'auto' and
140 self
._stack
: contextlib
.ExitStack
142 def __enter__(self
) -> 'TestRunner':
143 self
._stack
= contextlib
.ExitStack()
144 self
._stack
.enter_context(self
.env
)
145 self
._stack
.enter_context(self
.last_elapsed
)
146 self
._stack
.enter_context(savetty())
149 def __exit__(self
, exc_type
: Any
, exc_value
: Any
, traceback
: Any
) -> None:
152 def test_print_one_line(self
, test
: str, starttime
: str,
153 endtime
: Optional
[str] = None, status
: str = '...',
154 lasttime
: Optional
[float] = None,
155 thistime
: Optional
[float] = None,
156 description
: str = '',
157 test_field_width
: Optional
[int] = None,
158 end
: str = '\n') -> None:
159 """ Print short test info before/after test run """
160 test
= os
.path
.basename(test
)
162 if test_field_width
is None:
165 if self
.makecheck
and status
!= '...':
166 if status
and status
!= 'pass':
167 status
= f
' [{status}]'
171 print(f
' TEST iotest-{self.env.imgfmt}: {test}{status}')
175 lasttime_s
= f
' (last: {lasttime:.1f}s)'
179 thistime_s
= f
'{thistime:.1f}s'
184 endtime
= f
'[{endtime}]'
191 elif status
== 'fail':
192 col
= '\033[1m\033[31m'
193 elif status
== 'not run':
203 print(f
'{test:{test_field_width}} {col}{status:10}{col_end} '
204 f
'[{starttime}] {endtime:13}{thistime_s:5} {lasttime_s:14} '
205 f
'{description}', end
=end
)
207 def find_reference(self
, test
: str) -> str:
208 if self
.env
.cachemode
== 'none':
209 ref
= f
'{test}.out.nocache'
210 if os
.path
.isfile(ref
):
213 ref
= f
'{test}.out.{self.env.imgfmt}'
214 if os
.path
.isfile(ref
):
217 ref
= f
'{test}.{self.env.qemu_default_machine}.out'
218 if os
.path
.isfile(ref
):
223 def do_run_test(self
, test
: str) -> TestResult
:
225 f_bad
= Path(f_test
.name
+ '.out.bad')
226 f_notrun
= Path(f_test
.name
+ '.notrun')
227 f_casenotrun
= Path(f_test
.name
+ '.casenotrun')
228 f_reference
= Path(self
.find_reference(test
))
230 if not f_test
.exists():
231 return TestResult(status
='fail',
232 description
=f
'No such test file: {f_test}')
234 if not os
.access(str(f_test
), os
.X_OK
):
235 sys
.exit(f
'Not executable: {f_test}')
237 if not f_reference
.exists():
238 return TestResult(status
='not run',
239 description
='No qualified output '
240 f
'(expected {f_reference})')
242 for p
in (f_bad
, f_notrun
, f_casenotrun
):
245 args
= [str(f_test
.resolve())]
249 with f_test
.open(encoding
="utf-8") as f
:
251 if f
.readline().rstrip() == '#!/usr/bin/env python3':
252 args
.insert(0, self
.env
.python
)
253 except UnicodeDecodeError: # binary test? for future.
256 env
= os
.environ
.copy()
257 env
.update(self
.test_run_env
)
260 with f_bad
.open('w', encoding
="utf-8") as f
:
261 proc
= subprocess
.Popen(args
, cwd
=str(f_test
.parent
), env
=env
,
262 stdout
=f
, stderr
=subprocess
.STDOUT
)
265 except KeyboardInterrupt:
268 return TestResult(status
='not run',
269 description
='Interrupted by user',
271 ret
= proc
.returncode
273 elapsed
= round(time
.time() - t0
, 1)
276 return TestResult(status
='fail', elapsed
=elapsed
,
277 description
=f
'failed, exit status {ret}',
278 diff
=file_diff(str(f_reference
), str(f_bad
)))
280 if f_notrun
.exists():
281 return TestResult(status
='not run',
282 description
=f_notrun
.read_text().strip())
285 if f_casenotrun
.exists():
286 casenotrun
= f_casenotrun
.read_text()
288 diff
= file_diff(str(f_reference
), str(f_bad
))
290 return TestResult(status
='fail', elapsed
=elapsed
,
291 description
=f
'output mismatch (see {f_bad})',
292 diff
=diff
, casenotrun
=casenotrun
)
295 self
.last_elapsed
.update(test
, elapsed
)
296 return TestResult(status
='pass', elapsed
=elapsed
,
297 casenotrun
=casenotrun
)
299 def run_test(self
, test
: str,
300 test_field_width
: Optional
[int] = None) -> TestResult
:
301 last_el
= self
.last_elapsed
.get(test
)
302 start
= datetime
.datetime
.now().strftime('%H:%M:%S')
304 if not self
.makecheck
:
305 self
.test_print_one_line(test
=test
, starttime
=start
,
306 lasttime
=last_el
, end
='\r',
307 test_field_width
=test_field_width
)
309 res
= self
.do_run_test(test
)
311 end
= datetime
.datetime
.now().strftime('%H:%M:%S')
312 self
.test_print_one_line(test
=test
, status
=res
.status
,
313 starttime
=start
, endtime
=end
,
314 lasttime
=last_el
, thistime
=res
.elapsed
,
315 description
=res
.description
,
316 test_field_width
=test_field_width
)
319 print(res
.casenotrun
)
323 def run_tests(self
, tests
: List
[str]) -> bool:
329 if not self
.makecheck
:
333 test_field_width
= max(len(os
.path
.basename(t
)) for t
in tests
) + 2
336 name
= os
.path
.basename(t
)
337 res
= self
.run_test(t
, test_field_width
=test_field_width
)
339 assert res
.status
in ('pass', 'fail', 'not run')
344 if res
.status
!= 'not run':
347 if res
.status
== 'fail':
352 print('\n'.join(res
.diff
))
353 elif res
.status
== 'not run':
360 print('Not run:', ' '.join(notrun
))
363 print('Some cases not run in:', ' '.join(casenotrun
))
366 print('Failures:', ' '.join(failed
))
367 print(f
'Failed {len(failed)} of {n_run} iotests')
370 print(f
'Passed all {n_run} iotests')