fuzz: accelerate non-crash detection
[qemu/ar7.git] / scripts / oss-fuzz / minimize_qtest_trace.py
bloba28913a2a7452a7b0c80f6cd7dc02b0194d4034b
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
4 """
5 This takes a crashing qtest trace and tries to remove superfluous operations
6 """
8 import sys
9 import os
10 import subprocess
11 import time
12 import struct
# Path of the QEMU binary and its argument string. Both stay None until the
# script entry point fills them in from the environment.
QEMU_PATH = None
QEMU_ARGS = None

# Seconds handed to the `timeout` wrapper around each QEMU run;
# minimize_trace() rescales it after timing the first run.
TIMEOUT = 5

# Substring that identifies the crash in QEMU's output. When left as None it
# is derived from the output of the first run.
CRASH_TOKEN = None

# Maps the suffix of a write{b,w,l,q} command to
# (access size in bytes, struct format character).
write_suffix_lookup = {
    "b": (1, "B"),
    "w": (2, "H"),
    "l": (4, "L"),
    "q": (8, "Q"),
}
def usage():
    """Terminate the script with the command-line help text.

    The help text, with the script name (``sys.argv[0]``) substituted in, is
    passed to ``sys.exit``, so this function never returns normally.

    Raises:
        SystemExit: always, carrying the help text as its argument.
    """
    help_template = """\
Usage: QEMU_PATH="/path/to/qemu" QEMU_ARGS="args" {} input_trace output_trace
By default, will try to use the second-to-last line in the output to identify
whether the crash occred. Optionally, manually set a string that idenitifes the
crash by setting CRASH_TOKEN=
"""
    program = sys.argv[0]
    sys.exit(help_template.format(program))
# Caveat printed once, right after the crash-identifying token is chosen.
# The text starts with one blank line and ends with one blank line.
deduplication_note = (
    "\n"
    "Note: While trimming the input, sometimes the mutated trace triggers a different\n"
    "type crash but indicates the same bug. Under this situation, our minimizer is\n"
    "incapable of recognizing and stopped from removing it. In the future, we may\n"
    "use a more sophisticated crash case deduplication method.\n"
    "\n"
)
def check_if_trace_crashes(trace, path):
    """Replay a qtest trace through QEMU and report whether it crashes.

    The trace is written to ``path`` and redirected into QEMU's stdin by the
    shell. QEMU's stderr is merged into stdout, which is then inspected.

    When the module-level ``CRASH_TOKEN`` is still None, this call learns it:
    the whole output is collected and the first three whitespace-separated
    words of the second-to-last line become the token. On later calls the
    output is scanned line by line and the function returns as soon as a
    verdict is possible.

    Args:
        trace: Sequence of trace lines; they are concatenated as-is.
        path: File the trace is written to before QEMU is launched.

    Returns:
        True if the crash was observed (or the token was just learned from
        this run), False otherwise.
    """
    global CRASH_TOKEN

    with open(path, "w") as tracefile:
        tracefile.write("".join(trace))

    command = (
        "timeout -s 9 {timeout}s {qemu_path} {qemu_args} 2>&1"
        " < {trace_path}"
    ).format(timeout=TIMEOUT,
             qemu_path=QEMU_PATH,
             qemu_args=QEMU_ARGS,
             trace_path=path)
    rc = subprocess.Popen(command,
                          shell=True,
                          stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE,
                          encoding="utf-8")

    if CRASH_TOKEN is None:
        try:
            outs, _ = rc.communicate(timeout=5)
        except subprocess.TimeoutExpired:
            # The child is not killed here; the `timeout -s 9` wrapper in
            # the command line is what bounds its lifetime.
            print("subprocess.TimeoutExpired")
            return False

        output_lines = outs.splitlines()
        token = ""
        if len(output_lines) >= 2:
            token = " ".join(output_lines[-2].split()[0:3])
        if not token:
            # Fewer than two output lines used to raise IndexError, and a
            # blank second-to-last line produced an empty token that matches
            # every output line. Neither can identify a crash.
            print("Could not derive a crash token from the output; "
                  "set CRASH_TOKEN manually.")
            return False

        CRASH_TOKEN = token
        print("Identifying Crashes by this string: {}".format(CRASH_TOKEN))
        print(deduplication_note)
        return True

    # Stream the output so a verdict can be returned without waiting for the
    # process to finish.
    for line in iter(rc.stdout.readline, ""):
        # NOTE(review): "CLOSED" is presumably qtest's notice that its input
        # was closed; it is treated as "ran to the end without the crash".
        if "CLOSED" in line:
            return False
        if CRASH_TOKEN in line:
            return True

    print("\nWarning:")
    print(" There is no 'CLOSED'or CRASH_TOKEN in the stdout of subprocess.")
    print(" Usually this indicates a different type of crash.\n")
    return False
def minimize_trace(inpath, outpath):
    """Shrink the crashing qtest trace in ``inpath``; write it to ``outpath``.

    The unmodified trace is replayed first. If it does not crash the script
    exits; otherwise the global ``TIMEOUT`` is set to five times the observed
    run time. Each line is then visited in turn and three reductions are
    attempted, keeping a change only if the crash still reproduces:

    1. drop the line entirely;
    2. rewrite a write{b,w,l,q} command as a generic ``write addr len data``
       command (trying little- then big-endian data);
    3. split a multi-byte ``write`` into two adjacent writes.

    ``outpath`` doubles as the scratch file for every attempt; a final replay
    of the reduced trace leaves the result in it.

    Args:
        inpath: Path of the trace to minimize.
        outpath: Path the candidate and final traces are written to.
    """
    global TIMEOUT

    with open(inpath) as infile:
        trace = infile.readlines()

    # Time the original trace to calibrate the per-run timeout.
    began = time.time()
    if not check_if_trace_crashes(trace, outpath):
        sys.exit("The input qtest trace didn't cause a crash...")
    elapsed = time.time() - began
    print("Crashed in {} seconds".format(elapsed))
    TIMEOUT = elapsed * 5
    print("Setting the timeout for {} seconds".format(TIMEOUT))

    write_template = "write {addr} {size} 0x{data}\n"
    newtrace = list(trace)
    i = 0
    while i < len(newtrace):
        prior = newtrace[i]

        # 1.) Blank the line out. If the crash survives, the line was
        # unnecessary and we move on.
        print("Trying to remove {}".format(prior))
        newtrace[i] = ""
        if check_if_trace_crashes(newtrace, outpath):
            i += 1
            continue
        newtrace[i] = prior

        # 2.) Turn write{b,w,l,q} into "write addr len data" so that step 3
        # can trim it. The byte order of the data is not known, so both
        # little- and big-endian encodings are tried.
        if prior.startswith("write") and not prior.startswith("write "):
            fields = prior.split()
            suffix = fields[0][-1]
            assert(suffix in write_suffix_lookup)
            addr = int(fields[1], 16)
            value = int(fields[2], 16)
            width, code = write_suffix_lookup[suffix]
            for endianness in ("<", ">"):
                packed = struct.pack(endianness + code, value)
                newtrace[i] = write_template.format(addr=hex(addr),
                                                    size=hex(width),
                                                    data=packed.hex())
                if check_if_trace_crashes(newtrace, outpath):
                    break
            else:
                # Neither encoding reproduced the crash.
                newtrace[i] = prior

        # 3.) Split a "write addr len data" command into two writes. Start
        # with the pivot in the middle and move it left until the crash
        # reproduces or the left part is empty. This prunes unnecessary
        # bytes from long writes while accommodating arbitrary access sizes
        # and alignments.
        if newtrace[i].startswith("write "):
            fields = newtrace[i].split()
            addr = int(fields[1], 16)
            length = int(fields[2], 16)
            data = fields[3][2:]  # hex digits without the "0x" prefix
            if length > 1:
                leftlength = int(length / 2)
                rightlength = length - leftlength
                newtrace.insert(i + 1, "")
                while leftlength > 0:
                    cut = leftlength * 2  # two hex digits per byte
                    newtrace[i] = write_template.format(
                        addr=hex(addr),
                        size=hex(leftlength),
                        data=data[:cut])
                    newtrace[i + 1] = write_template.format(
                        addr=hex(addr + leftlength),
                        size=hex(rightlength),
                        data=data[cut:])
                    if check_if_trace_crashes(newtrace, outpath):
                        break
                    leftlength -= 1
                    rightlength += 1
                if check_if_trace_crashes(newtrace, outpath):
                    # Keep the split and revisit index i, which now holds
                    # the left half.
                    continue
                # No split worked: undo it.
                newtrace[i] = prior
                del newtrace[i + 1]
        i += 1

    # Replay once more so that outpath holds the final reduced trace.
    check_if_trace_crashes(newtrace, outpath)
if __name__ == '__main__':
    # Both the input and the output trace path must be given.
    if len(sys.argv) < 3:
        usage()

    # The QEMU binary and its arguments come from the environment; both are
    # mandatory.
    QEMU_PATH = os.environ.get("QEMU_PATH")
    QEMU_ARGS = os.environ.get("QEMU_ARGS")
    if None in (QEMU_PATH, QEMU_ARGS):
        usage()

    # Disabled in the original script:
    # if "accel" not in QEMU_ARGS:
    #     QEMU_ARGS += " -accel qtest"

    # Optional: a user-supplied string that identifies the crash.
    CRASH_TOKEN = os.environ.get("CRASH_TOKEN")

    QEMU_ARGS = QEMU_ARGS + " -qtest stdio -monitor none -serial none "
    minimize_trace(*sys.argv[1:3])