First upload
[CS310.git] / gradelib.py
blob87548ea68833dc6c9d07cb17b03e74c1015fab83
1 from __future__ import print_function
3 import sys, os, re, time, socket, select, subprocess, errno, shutil, random, string, json
4 from subprocess import check_call, Popen
5 from optparse import OptionParser
7 __all__ = []
9 ##################################################################
10 # Test structure
13 __all__ += ["test", "end_part", "run_tests", "get_current_test"]
15 TESTS = []
16 TOTAL = POSSIBLE = 0
17 PART_TOTAL = PART_POSSIBLE = 0
18 CURRENT_TEST = None
20 def test(points, title=None, parent=None):
21 """Decorator for declaring test functions. If title is None, the
22 title of the test will be derived from the function name by
23 stripping the leading "test_" and replacing underscores with
24 spaces."""
26 def register_test(fn, title=title):
27 if not title:
28 assert fn.__name__.startswith("test_")
29 title = fn.__name__[5:].replace("_", " ")
30 if parent:
31 title = " " + title
33 def run_test(info):
34 global TOTAL, POSSIBLE, CURRENT_TEST
36 # Handle test dependencies
37 if run_test.complete:
38 return run_test.ok
39 run_test.complete = True
40 parent_failed = False
41 if parent:
42 parent_failed = not parent(info)
44 # Run the test
45 fail = None
46 start = time.time()
47 CURRENT_TEST = run_test
48 sys.stdout.write("%s: " % title)
49 sys.stdout.flush()
50 try:
51 if parent_failed:
52 raise AssertionError('Parent failed: %s' % parent.__name__)
53 fn()
54 except AssertionError as e:
55 fail = str(e)
57 # Display and handle test result
58 POSSIBLE += points
59 if points:
60 print("%s" % \
61 (color("red", "FAIL") if fail else color("green", "OK")), end=' ')
62 if time.time() - start > 0.1:
63 print("(%.1fs)" % (time.time() - start), end=' ')
64 print()
65 if fail:
66 print(" %s" % fail.replace("\n", "\n "))
67 else:
68 TOTAL += points
69 for callback in run_test.on_finish:
70 callback(fail)
71 CURRENT_TEST = None
73 # Logging test info for JSON output
74 # NOTE: This assumes xv6.out is used as the defualt save path
75 info["name"] = title
76 info["status"] = "failed" if fail else "passed"
77 info["score"] = 0.0 if fail else float(points)
78 info["max_score"] = points
79 with open("xv6.out", "r") as file:
80 info["output"] = file.read()
82 run_test.ok = not fail
83 return run_test.ok
85 # Record test metadata on the test wrapper function
86 run_test.__name__ = fn.__name__
87 run_test.title = title
88 run_test.complete = False
89 run_test.ok = False
90 run_test.on_finish = []
91 TESTS.append(run_test)
92 return run_test
93 return register_test
95 def end_part(name):
96 def show_part():
97 global PART_TOTAL, PART_POSSIBLE
98 print("Part %s score: %d/%d" % \
99 (name, TOTAL - PART_TOTAL, POSSIBLE - PART_POSSIBLE))
100 print()
101 PART_TOTAL, PART_POSSIBLE = TOTAL, POSSIBLE
102 show_part.title = ""
103 TESTS.append(show_part)
105 def run_tests(outputJSON=None):
106 """Set up for testing and run the registered test functions."""
108 # Handle command line
109 global options
110 parser = OptionParser(usage="usage: %prog [-v] [filters...]")
111 parser.add_option("-v", "--verbose", action="store_true",
112 help="print commands")
113 parser.add_option("--color", choices=["never", "always", "auto"],
114 default="auto", help="never, always, or auto")
115 (options, args) = parser.parse_args()
117 # Start with a full build to catch build errors
118 make()
120 # Clean the file system if there is one
121 reset_fs()
123 # Keeping track of test info for JSON output
124 output = {}
125 output["tests"] = []
127 # Run tests
128 try:
129 for test in TESTS:
130 info = {}
131 test(info)
132 output["tests"].append(info)
133 print("Score: %d/%d" % (TOTAL, POSSIBLE))
134 except KeyboardInterrupt:
135 pass
137 if outputJSON:
138 with open(outputJSON, "w") as file:
139 file.write(json.dumps(output))
141 if TOTAL < POSSIBLE:
142 sys.exit(1)
144 def get_current_test():
145 if not CURRENT_TEST:
146 raise RuntimeError("No test is running")
147 return CURRENT_TEST
149 ##################################################################
150 # Assertions
153 __all__ += ["assert_equal", "assert_lines_match"]
155 def assert_equal(got, expect, msg=""):
156 if got == expect:
157 return
158 if msg:
159 msg += "\n"
160 raise AssertionError("%sgot:\n %s\nexpected:\n %s" %
161 (msg, str(got).replace("\n", "\n "),
162 str(expect).replace("\n", "\n ")))
164 def assert_lines_match(text, *regexps, **kw):
165 """Assert that all of regexps match some line in text. If a 'no'
166 keyword argument is given, it must be a list of regexps that must
167 *not* match any line in text."""
169 def assert_lines_match_kw(no=[]):
170 return no
171 no = assert_lines_match_kw(**kw)
173 # Check text against regexps
174 lines = text.splitlines()
175 good = set()
176 bad = set()
177 for i, line in enumerate(lines):
178 if any(re.match(r, line) for r in regexps):
179 good.add(i)
180 regexps = [r for r in regexps if not re.match(r, line)]
181 if any(re.match(r, line) for r in no):
182 bad.add(i)
184 if not regexps and not bad:
185 return
187 # We failed; construct an informative failure message
188 show = set()
189 for lineno in good.union(bad):
190 for offset in range(-2, 3):
191 show.add(lineno + offset)
192 if regexps:
193 show.update(n for n in range(len(lines) - 5, len(lines)))
195 msg = []
196 last = -1
197 for lineno in sorted(show):
198 if 0 <= lineno < len(lines):
199 if lineno != last + 1:
200 msg.append("...")
201 last = lineno
202 msg.append("%s %s" % (color("red", "BAD ") if lineno in bad else
203 color("green", "GOOD") if lineno in good
204 else " ",
205 lines[lineno]))
206 if last != len(lines) - 1:
207 msg.append("...")
208 if bad:
209 msg.append("unexpected lines in output")
210 for r in regexps:
211 msg.append(color("red", "MISSING") + " '%s'" % r)
212 raise AssertionError("\n".join(msg))
214 ##################################################################
215 # Utilities
218 __all__ += ["make", "maybe_unlink", "reset_fs", "color", "random_str"]
220 MAKE_TIMESTAMP = 0
222 def pre_make():
223 """Delay prior to running make to ensure file mtimes change."""
224 while int(time.time()) == MAKE_TIMESTAMP:
225 time.sleep(0.1)
227 def post_make():
228 """Record the time after make completes so that the next run of
229 make can be delayed if needed."""
230 global MAKE_TIMESTAMP
231 MAKE_TIMESTAMP = int(time.time())
233 def make(*target):
234 pre_make()
235 if Popen(("make",) + target).wait():
236 sys.exit(1)
237 post_make()
239 def show_command(cmd):
240 from pipes import quote
241 print("\n$", " ".join(map(quote, cmd)))
243 def maybe_unlink(*paths):
244 for path in paths:
245 try:
246 os.unlink(path)
247 except EnvironmentError as e:
248 if e.errno != errno.ENOENT:
249 raise
251 COLORS = {"default": "\033[0m", "red": "\033[31m", "green": "\033[32m"}
253 def color(name, text):
254 if options.color == "always" or (options.color == "auto" and os.isatty(1)):
255 return COLORS[name] + text + COLORS["default"]
256 return text
258 def reset_fs():
259 if os.path.exists("obj/fs/clean-fs.img"):
260 shutil.copyfile("obj/fs/clean-fs.img", "obj/fs/fs.img")
262 def random_str(n=8):
263 letters = string.ascii_letters + string.digits
264 return ''.join(random.choice(letters) for _ in range(n))
266 ##################################################################
267 # Controllers
270 __all__ += ["QEMU", "GDBClient"]
272 class QEMU(object):
273 _GDBPORT = None
275 def __init__(self, *make_args):
276 # Check that QEMU is not currently running
277 try:
278 GDBClient(self.get_gdb_port(), timeout=0).close()
279 except socket.error:
280 pass
281 else:
282 print("""\
283 GDB stub found on port %d.
284 QEMU appears to already be running. Please exit it if possible or use
285 'killall qemu' or 'killall qemu.real'.""" % self.get_gdb_port(), file=sys.stderr)
286 sys.exit(1)
288 if options.verbose:
289 show_command(("make",) + make_args)
290 cmd = ("make", "-s", "--no-print-directory") + make_args
291 self.proc = Popen(cmd, stdout=subprocess.PIPE,
292 stderr=subprocess.STDOUT,
293 stdin=subprocess.PIPE)
294 # Accumulated output as a string
295 self.output = ""
296 # Accumulated output as a bytearray
297 self.outbytes = bytearray()
298 self.on_output = []
300 @staticmethod
301 def get_gdb_port():
302 if QEMU._GDBPORT is None:
303 p = Popen(["make", "-s", "--no-print-directory", "print-gdbport"],
304 stdout=subprocess.PIPE)
305 (out, _) = p.communicate()
306 if p.returncode:
307 raise RuntimeError(
308 "Failed to get gdbport: make exited with %d" %
309 p.returncode)
310 QEMU._GDBPORT = int(out)
311 return QEMU._GDBPORT
313 def fileno(self):
314 if self.proc:
315 return self.proc.stdout.fileno()
317 def handle_read(self):
318 buf = os.read(self.proc.stdout.fileno(), 4096)
319 self.outbytes.extend(buf)
320 self.output = self.outbytes.decode("utf-8", "replace")
321 for callback in self.on_output:
322 callback(buf)
323 if buf == b"":
324 self.wait()
325 return
327 def write(self, buf):
328 if isinstance(buf, str):
329 buf = buf.encode('utf-8')
330 self.proc.stdin.write(buf)
331 self.proc.stdin.flush()
333 def wait(self):
334 if self.proc:
335 self.proc.wait()
336 self.proc = None
338 def kill(self):
339 if self.proc:
340 self.proc.terminate()
342 class GDBClient(object):
343 def __init__(self, port, timeout=15):
344 start = time.time()
345 while True:
346 self.sock = socket.socket()
347 try:
348 self.sock.settimeout(1)
349 self.sock.connect(("localhost", port))
350 break
351 except socket.error:
352 if time.time() >= start + timeout:
353 raise
354 self.__buf = ""
356 def fileno(self):
357 if self.sock:
358 return self.sock.fileno()
360 def handle_read(self):
361 try:
362 data = self.sock.recv(4096).decode("ascii", "replace")
363 except socket.error:
364 data = ""
365 if data == "":
366 self.sock.close()
367 self.sock = None
368 return
369 self.__buf += data
371 while True:
372 m = re.search(r"\$([^#]*)#[0-9a-zA-Z]{2}", self.__buf)
373 if not m:
374 break
375 pkt = m.group(1)
376 self.__buf = self.__buf[m.end():]
378 if pkt.startswith("T05"):
379 # Breakpoint
380 raise TerminateTest
382 def __send(self, cmd):
383 packet = "$%s#%02x" % (cmd, sum(map(ord, cmd)) % 256)
384 self.sock.sendall(packet.encode("ascii"))
386 def __send_break(self):
387 self.sock.sendall(b"\x03")
389 def close(self):
390 if self.sock:
391 self.sock.close()
392 self.sock = None
394 def cont(self):
395 self.__send("c")
397 def breakpoint(self, addr):
398 self.__send("Z1,%x,1" % addr)
401 ##################################################################
402 # QEMU test runner
405 __all__ += ["TerminateTest", "Runner"]
407 class TerminateTest(Exception):
408 pass
410 class Runner():
411 def __init__(self, *default_monitors):
412 self.__default_monitors = default_monitors
414 def run_qemu(self, *monitors, **kw):
415 """Run a QEMU-based test. monitors should functions that will
416 be called with this Runner instance once QEMU and GDB are
417 started. Typically, they should register callbacks that throw
418 TerminateTest when stop events occur. The target_base
419 argument gives the make target to run. The make_args argument
420 should be a list of additional arguments to pass to make. The
421 timeout argument bounds how long to run before returning."""
423 def run_qemu_kw(target_base="qemu", make_args=[], timeout=30):
424 return target_base, make_args, timeout
425 target_base, make_args, timeout = run_qemu_kw(**kw)
427 # Start QEMU
428 pre_make()
429 self.qemu = QEMU(target_base + "-gdb", *make_args)
430 self.gdb = None
432 try:
433 # Wait for QEMU to start or make to fail. This will set
434 # self.gdb if QEMU starts.
435 self.qemu.on_output = [self.__monitor_start]
436 self.__react([self.qemu], timeout=30)
437 self.qemu.on_output = []
438 if self.gdb is None:
439 print("Failed to connect to QEMU; output:")
440 print(self.qemu.output)
441 sys.exit(1)
442 post_make()
444 # QEMU and GDB are up
445 self.reactors = [self.qemu, self.gdb]
447 # Start monitoring
448 for m in self.__default_monitors + monitors:
449 m(self)
451 # Run and react
452 self.gdb.cont()
453 self.__react(self.reactors, timeout)
454 finally:
455 # Shutdown QEMU
456 try:
457 if self.gdb is None:
458 sys.exit(1)
459 self.qemu.kill()
460 self.__react(self.reactors, 5)
461 self.gdb.close()
462 self.qemu.wait()
463 except:
464 print("""\
465 Failed to shutdown QEMU. You might need to 'killall qemu' or
466 'killall qemu.real'.
467 """)
468 raise
470 def __monitor_start(self, output):
471 if b"\n" in output:
472 try:
473 self.gdb = GDBClient(self.qemu.get_gdb_port(), timeout=2)
474 raise TerminateTest
475 except socket.error:
476 pass
477 if not len(output):
478 raise TerminateTest
480 def __react(self, reactors, timeout):
481 deadline = time.time() + timeout
482 try:
483 while True:
484 timeleft = deadline - time.time()
485 if timeleft < 0:
486 sys.stdout.write("Timeout! ")
487 sys.stdout.flush()
488 return
490 rset = [r for r in reactors if r.fileno() is not None]
491 if not rset:
492 return
494 rset, _, _ = select.select(rset, [], [], timeleft)
495 for reactor in rset:
496 reactor.handle_read()
497 except TerminateTest:
498 pass
500 def user_test(self, binary, *monitors, **kw):
501 """Run a user test using the specified binary. Monitors and
502 keyword arguments are as for run_qemu. This runs on a disk
503 snapshot unless the keyword argument 'snapshot' is False."""
505 maybe_unlink("obj/kern/init.o", "obj/kern/kernel")
506 if kw.pop("snapshot", True):
507 kw.setdefault("make_args", []).append("QEMUEXTRA+=-snapshot")
508 self.run_qemu(target_base="run-%s" % binary, *monitors, **kw)
510 def match(self, *args, **kwargs):
511 """Shortcut to call assert_lines_match on the most recent QEMU
512 output."""
514 assert_lines_match(self.qemu.output, *args, **kwargs)
516 ##################################################################
517 # Monitors
520 __all__ += ["save", "stop_breakpoint", "call_on_line", "stop_on_line", "shell_script"]
522 def save(path):
523 """Return a monitor that writes QEMU's output to path. If the
524 test fails, copy the output to path.test-name."""
526 def setup_save(runner):
527 f.seek(0)
528 f.truncate()
529 runner.qemu.on_output.append(f.write)
530 get_current_test().on_finish.append(save_on_finish)
532 def save_on_finish(fail):
533 f.flush()
534 save_path = path + "." + get_current_test().__name__[5:]
535 if fail:
536 shutil.copyfile(path, save_path)
537 print(" QEMU output saved to %s" % save_path)
538 elif os.path.exists(save_path):
539 os.unlink(save_path)
540 print(" (Old %s failure log removed)" % save_path)
542 f = open(path, "wb")
543 return setup_save
545 def stop_breakpoint(addr):
546 """Returns a monitor that stops when addr is reached. addr may be
547 a number or the name of a symbol."""
549 def setup_breakpoint(runner):
550 if isinstance(addr, str):
551 addrs = [int(sym[:16], 16) for sym in open("kernel/kernel.sym")
552 if sym[17:].strip() == addr]
553 assert len(addrs), "Symbol %s not found" % addr
554 runner.gdb.breakpoint(addrs[0])
555 else:
556 runner.gdb.breakpoint(addr)
557 return setup_breakpoint
559 def call_on_line(regexp, callback):
560 """Returns a monitor that calls 'callback' when QEMU prints a line
561 matching 'regexp'."""
563 def setup_call_on_line(runner):
564 buf = bytearray()
565 def handle_output(output):
566 buf.extend(output)
567 while b"\n" in buf:
568 line, buf[:] = buf.split(b"\n", 1)
569 line = line.decode("utf-8", "replace")
570 if re.match(regexp, line):
571 callback(line)
572 runner.qemu.on_output.append(handle_output)
573 return setup_call_on_line
575 def stop_on_line(regexp):
576 """Returns a monitor that stops when QEMU prints a line matching
577 'regexp'."""
579 def stop(line):
580 raise TerminateTest
581 return call_on_line(regexp, stop)
583 def shell_script(script, terminate_match=None):
584 """Returns a monitor that plays the script, and stops when the script is
585 done executing."""
587 def setup_call_on_line(runner):
588 class context:
589 n = 0
590 buf = bytearray()
591 def handle_output(output):
592 context.buf.extend(output)
593 if terminate_match is not None:
594 if re.match(terminate_match, context.buf.decode('utf-8', 'replace')):
595 raise TerminateTest
596 if b'$ ' in context.buf:
597 context.buf = bytearray()
598 if context.n < len(script):
599 runner.qemu.write(script[context.n])
600 runner.qemu.write('\n')
601 context.n += 1
602 else:
603 if terminate_match is None:
604 raise TerminateTest
605 runner.qemu.on_output.append(handle_output)
606 return setup_call_on_line