1 from __future__
import print_function
3 import sys
, os
, re
, time
, socket
, select
, subprocess
, errno
, shutil
, random
, string
, json
4 from subprocess
import check_call
, Popen
5 from optparse
import OptionParser
9 ##################################################################
13 __all__
+= ["test", "end_part", "run_tests", "get_current_test"]
17 PART_TOTAL
= PART_POSSIBLE
= 0
20 def test(points
, title
=None, parent
=None):
21 """Decorator for declaring test functions. If title is None, the
22 title of the test will be derived from the function name by
23 stripping the leading "test_" and replacing underscores with
26 def register_test(fn
, title
=title
):
28 assert fn
.__name
__.startswith("test_")
29 title
= fn
.__name
__[5:].replace("_", " ")
34 global TOTAL
, POSSIBLE
, CURRENT_TEST
36 # Handle test dependencies
39 run_test
.complete
= True
42 parent_failed
= not parent(info
)
47 CURRENT_TEST
= run_test
48 sys
.stdout
.write("%s: " % title
)
52 raise AssertionError('Parent failed: %s' % parent
.__name
__)
54 except AssertionError as e
:
57 # Display and handle test result
61 (color("red", "FAIL") if fail
else color("green", "OK")), end
=' ')
62 if time
.time() - start
> 0.1:
63 print("(%.1fs)" % (time
.time() - start
), end
=' ')
66 print(" %s" % fail
.replace("\n", "\n "))
69 for callback
in run_test
.on_finish
:
73 # Logging test info for JSON output
74 # NOTE: This assumes xv6.out is used as the defualt save path
76 info
["status"] = "failed" if fail
else "passed"
77 info
["score"] = 0.0 if fail
else float(points
)
78 info
["max_score"] = points
79 with
open("xv6.out", "r") as file:
80 info
["output"] = file.read()
82 run_test
.ok
= not fail
85 # Record test metadata on the test wrapper function
86 run_test
.__name
__ = fn
.__name
__
87 run_test
.title
= title
88 run_test
.complete
= False
90 run_test
.on_finish
= []
91 TESTS
.append(run_test
)
97 global PART_TOTAL
, PART_POSSIBLE
98 print("Part %s score: %d/%d" % \
99 (name
, TOTAL
- PART_TOTAL
, POSSIBLE
- PART_POSSIBLE
))
101 PART_TOTAL
, PART_POSSIBLE
= TOTAL
, POSSIBLE
103 TESTS
.append(show_part
)
105 def run_tests(outputJSON
=None):
106 """Set up for testing and run the registered test functions."""
108 # Handle command line
110 parser
= OptionParser(usage
="usage: %prog [-v] [filters...]")
111 parser
.add_option("-v", "--verbose", action
="store_true",
112 help="print commands")
113 parser
.add_option("--color", choices
=["never", "always", "auto"],
114 default
="auto", help="never, always, or auto")
115 (options
, args
) = parser
.parse_args()
117 # Start with a full build to catch build errors
120 # Clean the file system if there is one
123 # Keeping track of test info for JSON output
132 output
["tests"].append(info
)
133 print("Score: %d/%d" % (TOTAL
, POSSIBLE
))
134 except KeyboardInterrupt:
138 with
open(outputJSON
, "w") as file:
139 file.write(json
.dumps(output
))
144 def get_current_test():
146 raise RuntimeError("No test is running")
149 ##################################################################
153 __all__
+= ["assert_equal", "assert_lines_match"]
155 def assert_equal(got
, expect
, msg
=""):
160 raise AssertionError("%sgot:\n %s\nexpected:\n %s" %
161 (msg
, str(got
).replace("\n", "\n "),
162 str(expect
).replace("\n", "\n ")))
164 def assert_lines_match(text
, *regexps
, **kw
):
165 """Assert that all of regexps match some line in text. If a 'no'
166 keyword argument is given, it must be a list of regexps that must
167 *not* match any line in text."""
169 def assert_lines_match_kw(no
=[]):
171 no
= assert_lines_match_kw(**kw
)
173 # Check text against regexps
174 lines
= text
.splitlines()
177 for i
, line
in enumerate(lines
):
178 if any(re
.match(r
, line
) for r
in regexps
):
180 regexps
= [r
for r
in regexps
if not re
.match(r
, line
)]
181 if any(re
.match(r
, line
) for r
in no
):
184 if not regexps
and not bad
:
187 # We failed; construct an informative failure message
189 for lineno
in good
.union(bad
):
190 for offset
in range(-2, 3):
191 show
.add(lineno
+ offset
)
193 show
.update(n
for n
in range(len(lines
) - 5, len(lines
)))
197 for lineno
in sorted(show
):
198 if 0 <= lineno
< len(lines
):
199 if lineno
!= last
+ 1:
202 msg
.append("%s %s" % (color("red", "BAD ") if lineno
in bad
else
203 color("green", "GOOD") if lineno
in good
206 if last
!= len(lines
) - 1:
209 msg
.append("unexpected lines in output")
211 msg
.append(color("red", "MISSING") + " '%s'" % r
)
212 raise AssertionError("\n".join(msg
))
214 ##################################################################
218 __all__
+= ["make", "maybe_unlink", "reset_fs", "color", "random_str"]
223 """Delay prior to running make to ensure file mtimes change."""
224 while int(time
.time()) == MAKE_TIMESTAMP
:
228 """Record the time after make completes so that the next run of
229 make can be delayed if needed."""
230 global MAKE_TIMESTAMP
231 MAKE_TIMESTAMP
= int(time
.time())
235 if Popen(("make",) + target
).wait():
239 def show_command(cmd
):
240 from pipes
import quote
241 print("\n$", " ".join(map(quote
, cmd
)))
243 def maybe_unlink(*paths
):
247 except EnvironmentError as e
:
248 if e
.errno
!= errno
.ENOENT
:
251 COLORS
= {"default": "\033[0m", "red": "\033[31m", "green": "\033[32m"}
253 def color(name
, text
):
254 if options
.color
== "always" or (options
.color
== "auto" and os
.isatty(1)):
255 return COLORS
[name
] + text
+ COLORS
["default"]
259 if os
.path
.exists("obj/fs/clean-fs.img"):
260 shutil
.copyfile("obj/fs/clean-fs.img", "obj/fs/fs.img")
263 letters
= string
.ascii_letters
+ string
.digits
264 return ''.join(random
.choice(letters
) for _
in range(n
))
266 ##################################################################
270 __all__
+= ["QEMU", "GDBClient"]
275 def __init__(self
, *make_args
):
276 # Check that QEMU is not currently running
278 GDBClient(self
.get_gdb_port(), timeout
=0).close()
283 GDB stub found on port %d.
284 QEMU appears to already be running. Please exit it if possible or use
285 'killall qemu' or 'killall qemu.real'.""" % self
.get_gdb_port(), file=sys
.stderr
)
289 show_command(("make",) + make_args
)
290 cmd
= ("make", "-s", "--no-print-directory") + make_args
291 self
.proc
= Popen(cmd
, stdout
=subprocess
.PIPE
,
292 stderr
=subprocess
.STDOUT
,
293 stdin
=subprocess
.PIPE
)
294 # Accumulated output as a string
296 # Accumulated output as a bytearray
297 self
.outbytes
= bytearray()
302 if QEMU
._GDBPORT
is None:
303 p
= Popen(["make", "-s", "--no-print-directory", "print-gdbport"],
304 stdout
=subprocess
.PIPE
)
305 (out
, _
) = p
.communicate()
308 "Failed to get gdbport: make exited with %d" %
310 QEMU
._GDBPORT
= int(out
)
315 return self
.proc
.stdout
.fileno()
317 def handle_read(self
):
318 buf
= os
.read(self
.proc
.stdout
.fileno(), 4096)
319 self
.outbytes
.extend(buf
)
320 self
.output
= self
.outbytes
.decode("utf-8", "replace")
321 for callback
in self
.on_output
:
327 def write(self
, buf
):
328 if isinstance(buf
, str):
329 buf
= buf
.encode('utf-8')
330 self
.proc
.stdin
.write(buf
)
331 self
.proc
.stdin
.flush()
340 self
.proc
.terminate()
342 class GDBClient(object):
343 def __init__(self
, port
, timeout
=15):
346 self
.sock
= socket
.socket()
348 self
.sock
.settimeout(1)
349 self
.sock
.connect(("localhost", port
))
352 if time
.time() >= start
+ timeout
:
358 return self
.sock
.fileno()
360 def handle_read(self
):
362 data
= self
.sock
.recv(4096).decode("ascii", "replace")
372 m
= re
.search(r
"\$([^#]*)#[0-9a-zA-Z]{2}", self
.__buf
)
376 self
.__buf
= self
.__buf
[m
.end():]
378 if pkt
.startswith("T05"):
382 def __send(self
, cmd
):
383 packet
= "$%s#%02x" % (cmd
, sum(map(ord, cmd
)) % 256)
384 self
.sock
.sendall(packet
.encode("ascii"))
386 def __send_break(self
):
387 self
.sock
.sendall(b
"\x03")
397 def breakpoint(self
, addr
):
398 self
.__send
("Z1,%x,1" % addr
)
401 ##################################################################
405 __all__
+= ["TerminateTest", "Runner"]
407 class TerminateTest(Exception):
411 def __init__(self
, *default_monitors
):
412 self
.__default
_monitors
= default_monitors
414 def run_qemu(self
, *monitors
, **kw
):
415 """Run a QEMU-based test. monitors should functions that will
416 be called with this Runner instance once QEMU and GDB are
417 started. Typically, they should register callbacks that throw
418 TerminateTest when stop events occur. The target_base
419 argument gives the make target to run. The make_args argument
420 should be a list of additional arguments to pass to make. The
421 timeout argument bounds how long to run before returning."""
423 def run_qemu_kw(target_base
="qemu", make_args
=[], timeout
=30):
424 return target_base
, make_args
, timeout
425 target_base
, make_args
, timeout
= run_qemu_kw(**kw
)
429 self
.qemu
= QEMU(target_base
+ "-gdb", *make_args
)
433 # Wait for QEMU to start or make to fail. This will set
434 # self.gdb if QEMU starts.
435 self
.qemu
.on_output
= [self
.__monitor
_start
]
436 self
.__react
([self
.qemu
], timeout
=30)
437 self
.qemu
.on_output
= []
439 print("Failed to connect to QEMU; output:")
440 print(self
.qemu
.output
)
444 # QEMU and GDB are up
445 self
.reactors
= [self
.qemu
, self
.gdb
]
448 for m
in self
.__default
_monitors
+ monitors
:
453 self
.__react
(self
.reactors
, timeout
)
460 self
.__react
(self
.reactors
, 5)
465 Failed to shutdown QEMU. You might need to 'killall qemu' or
470 def __monitor_start(self
, output
):
473 self
.gdb
= GDBClient(self
.qemu
.get_gdb_port(), timeout
=2)
480 def __react(self
, reactors
, timeout
):
481 deadline
= time
.time() + timeout
484 timeleft
= deadline
- time
.time()
486 sys
.stdout
.write("Timeout! ")
490 rset
= [r
for r
in reactors
if r
.fileno() is not None]
494 rset
, _
, _
= select
.select(rset
, [], [], timeleft
)
496 reactor
.handle_read()
497 except TerminateTest
:
500 def user_test(self
, binary
, *monitors
, **kw
):
501 """Run a user test using the specified binary. Monitors and
502 keyword arguments are as for run_qemu. This runs on a disk
503 snapshot unless the keyword argument 'snapshot' is False."""
505 maybe_unlink("obj/kern/init.o", "obj/kern/kernel")
506 if kw
.pop("snapshot", True):
507 kw
.setdefault("make_args", []).append("QEMUEXTRA+=-snapshot")
508 self
.run_qemu(target_base
="run-%s" % binary
, *monitors
, **kw
)
510 def match(self
, *args
, **kwargs
):
511 """Shortcut to call assert_lines_match on the most recent QEMU
514 assert_lines_match(self
.qemu
.output
, *args
, **kwargs
)
516 ##################################################################
520 __all__
+= ["save", "stop_breakpoint", "call_on_line", "stop_on_line", "shell_script"]
523 """Return a monitor that writes QEMU's output to path. If the
524 test fails, copy the output to path.test-name."""
526 def setup_save(runner
):
529 runner
.qemu
.on_output
.append(f
.write
)
530 get_current_test().on_finish
.append(save_on_finish
)
532 def save_on_finish(fail
):
534 save_path
= path
+ "." + get_current_test().__name
__[5:]
536 shutil
.copyfile(path
, save_path
)
537 print(" QEMU output saved to %s" % save_path
)
538 elif os
.path
.exists(save_path
):
540 print(" (Old %s failure log removed)" % save_path
)
545 def stop_breakpoint(addr
):
546 """Returns a monitor that stops when addr is reached. addr may be
547 a number or the name of a symbol."""
549 def setup_breakpoint(runner
):
550 if isinstance(addr
, str):
551 addrs
= [int(sym
[:16], 16) for sym
in open("kernel/kernel.sym")
552 if sym
[17:].strip() == addr
]
553 assert len(addrs
), "Symbol %s not found" % addr
554 runner
.gdb
.breakpoint(addrs
[0])
556 runner
.gdb
.breakpoint(addr
)
557 return setup_breakpoint
559 def call_on_line(regexp
, callback
):
560 """Returns a monitor that calls 'callback' when QEMU prints a line
561 matching 'regexp'."""
563 def setup_call_on_line(runner
):
565 def handle_output(output
):
568 line
, buf
[:] = buf
.split(b
"\n", 1)
569 line
= line
.decode("utf-8", "replace")
570 if re
.match(regexp
, line
):
572 runner
.qemu
.on_output
.append(handle_output
)
573 return setup_call_on_line
575 def stop_on_line(regexp
):
576 """Returns a monitor that stops when QEMU prints a line matching
581 return call_on_line(regexp
, stop
)
583 def shell_script(script
, terminate_match
=None):
584 """Returns a monitor that plays the script, and stops when the script is
587 def setup_call_on_line(runner
):
591 def handle_output(output
):
592 context
.buf
.extend(output
)
593 if terminate_match
is not None:
594 if re
.match(terminate_match
, context
.buf
.decode('utf-8', 'replace')):
596 if b
'$ ' in context
.buf
:
597 context
.buf
= bytearray()
598 if context
.n
< len(script
):
599 runner
.qemu
.write(script
[context
.n
])
600 runner
.qemu
.write('\n')
603 if terminate_match
is None:
605 runner
.qemu
.on_output
.append(handle_output
)
606 return setup_call_on_line