failover: Remove primary_dev member
[qemu.git] / scripts / oss-fuzz / minimize_qtest_trace.py
blob5e405a0d5f0a9452255472fb391dbc225ff51c8a
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
4 """
5 This takes a crashing qtest trace and tries to remove superfluous operations
6 """
8 import sys
9 import os
10 import subprocess
11 import time
12 import struct
# Extra QEMU command-line arguments; set from the QEMU_ARGS environment
# variable when the file is run as a script.
QEMU_ARGS = None
# QEMU binary to run; set from the QEMU_PATH environment variable when the
# file is run as a script.
QEMU_PATH = None
# Seconds a single QEMU run may take before it is killed. minimize_trace()
# replaces this with five times the duration of the first crashing run.
TIMEOUT = 5
# Substring of QEMU's output that identifies the crash. When left as None,
# check_if_trace_crashes() derives it from the second-to-last output line.
CRASH_TOKEN = None

# Maps the suffix of a write{b,w,l,q} command to
# (access size in bytes, struct format character for that size).
write_suffix_lookup = {
    "b": (1, "B"),
    "w": (2, "H"),
    "l": (4, "L"),
    "q": (8, "Q"),
}
def usage():
    """Terminate the script with the command-line help text.

    The help text is passed to sys.exit() as the exit message, with the
    script name (sys.argv[0]) substituted into the usage line.

    Raises:
        SystemExit: always.
    """
    sys.exit("""\
Usage: QEMU_PATH="/path/to/qemu" QEMU_ARGS="args" {} input_trace output_trace
By default, will try to use the second-to-last line in the output to identify
whether the crash occurred. Optionally, manually set a string that identifies
the crash by setting CRASH_TOKEN=
""".format(sys.argv[0]))
def check_if_trace_crashes(trace, path):
    """Replay a candidate trace through QEMU and report whether it crashes.

    The trace is written to ``path`` and that file is then used as QEMU's
    standard input. QEMU is started through the shell under ``timeout`` with
    the module-level QEMU_PATH, QEMU_ARGS and TIMEOUT settings, and its
    stdout and stderr are captured together.

    If the module-level CRASH_TOKEN is still None, it is set to the
    second-to-last line of this run's output, so the first successful call
    defines what later calls look for.

    Args:
        trace: list of strings making up the trace; they are concatenated
            as-is, so removed lines may be represented by empty strings.
        path: file the trace is written to (overwritten on every call).

    Returns:
        True if CRASH_TOKEN occurs in QEMU's output. False if the run timed
        out, produced fewer than two lines of output, or did not print the
        token.
    """
    global CRASH_TOKEN
    with open(path, "w") as tracefile:
        tracefile.write("".join(trace))

    # QEMU_ARGS is a shell-style argument string, so the command still goes
    # through the shell. The trace file is attached as stdin directly instead
    # of via a "< path" redirection, so paths containing spaces or shell
    # metacharacters work.
    command = "timeout -s 9 {timeout}s {qemu_path} {qemu_args}".format(
        timeout=TIMEOUT,
        qemu_path=QEMU_PATH,
        qemu_args=QEMU_ARGS)
    with open(path, "rb") as trace_input:
        result = subprocess.run(command,
                                shell=True,
                                stdin=trace_input,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)

    # Decode leniently: the output is only searched for a token, and stray
    # bytes or backslashes in it must not abort the minimization.
    output = result.stdout.decode("utf-8", errors="replace")
    if result.returncode == 137:  # Timed Out
        return False

    lines = output.splitlines()
    if len(lines) < 2:
        return False

    if CRASH_TOKEN is None:
        CRASH_TOKEN = lines[-2]

    return CRASH_TOKEN in output
def minimize_trace(inpath, outpath):
    """Shrink a crashing qtest trace while keeping the crash reproducible.

    The input trace is first replayed once to confirm that it crashes and to
    time the crash; the module-level TIMEOUT is then set to five times that
    duration. Afterwards every line is processed in order:

    1. Try to drop the line entirely.
    2. If it is a write{b,w,l,q} command, try to rewrite it as an equivalent
       ``write addr len data`` command (little-endian first, then
       big-endian) so that it can be trimmed in step 3.
    3. If it is a ``write addr len data`` command longer than one byte, try
       to split it into two writes, moving the split point left until the
       crash reproduces. On success the left half is processed again.

    Every candidate is checked with check_if_trace_crashes(), which writes
    it to ``outpath``; the final call leaves the minimized trace there.

    Args:
        inpath: path of the crashing trace to read.
        outpath: path used for every candidate trace and the final result.

    Raises:
        SystemExit: if the input trace does not reproduce a crash.
        ValueError: if a line starts with "write" but is neither a
            ``write`` command nor one of write{b,w,l,q}.
    """
    global TIMEOUT
    with open(inpath) as f:
        trace = f.readlines()
    start = time.time()
    if not check_if_trace_crashes(trace, outpath):
        sys.exit("The input qtest trace didn't cause a crash...")
    end = time.time()
    print("Crashed in {} seconds".format(end-start))
    TIMEOUT = (end-start)*5
    print("Setting the timeout for {} seconds".format(TIMEOUT))
    print("Identifying Crashes by this string: {}".format(CRASH_TOKEN))

    i = 0
    newtrace = trace[:]
    # For each line
    while i < len(newtrace):
        # 1.) Try to remove it completely and reproduce the crash. If it
        # works, we're done with this line.
        prior = newtrace[i]
        print("Trying to remove {}".format(newtrace[i]))
        # Removed lines stay in the list as empty strings so that the
        # indices of the remaining lines do not shift.
        newtrace[i] = ""
        if check_if_trace_crashes(newtrace, outpath):
            i += 1
            continue
        newtrace[i] = prior

        # 2.) Try to replace write{bwlq} commands with a write addr, len
        # command. Since this can require swapping endianness, try both LE
        # and BE options. We do this, so we can "trim" the writes in (3)
        if (newtrace[i].startswith("write") and not
                newtrace[i].startswith("write ")):
            fields = newtrace[i].split()
            suffix = fields[0][-1]
            if suffix not in write_suffix_lookup:
                raise ValueError(
                    "Unrecognized write command: {}".format(fields[0]))
            size, fmt = write_suffix_lookup[suffix]
            addr = int(fields[1], 16)
            value = int(fields[2], 16)
            for endianness in ['<', '>']:
                data = struct.pack(endianness + fmt, value)
                newtrace[i] = "write {addr} {size} 0x{data}\n".format(
                    addr=hex(addr),
                    size=hex(size),
                    data=data.hex())
                if check_if_trace_crashes(newtrace, outpath):
                    break
            else:
                # Neither byte order reproduced the crash.
                newtrace[i] = prior

        # 3.) If it is a qtest write command: write addr len data, try to
        # split it into two separate write commands. If splitting the write
        # down the middle does not work, try to move the pivot "left" and
        # retry, until there is no space left. The idea is to prune
        # unnecessary bytes from long writes, while accommodating arbitrary
        # MemoryRegion access sizes and alignments.
        if newtrace[i].startswith("write "):
            fields = newtrace[i].split()
            addr = int(fields[1], 16)
            length = int(fields[2], 16)
            data = fields[3][2:]  # hex digits without the "0x" prefix
            if length > 1:
                leftlength = length // 2
                rightlength = length - leftlength
                newtrace.insert(i+1, "")
                split_crashes = False
                while leftlength > 0:
                    newtrace[i] = "write {addr} {size} 0x{data}\n".format(
                        addr=hex(addr),
                        size=hex(leftlength),
                        data=data[:leftlength*2])
                    newtrace[i+1] = "write {addr} {size} 0x{data}\n".format(
                        addr=hex(addr+leftlength),
                        size=hex(rightlength),
                        data=data[leftlength*2:])
                    if check_if_trace_crashes(newtrace, outpath):
                        split_crashes = True
                        break
                    leftlength -= 1
                    rightlength += 1
                if split_crashes:
                    # Stay on the same index so the left half is itself
                    # tried for removal and further splitting.
                    continue
                # No split point worked: undo the split and restore the
                # line as it was at the start of this iteration.
                newtrace[i] = prior
                del newtrace[i+1]
        i += 1
    # Run once more so that outpath holds the final minimized trace rather
    # than the last candidate that was tried.
    check_if_trace_crashes(newtrace, outpath)
if __name__ == '__main__':
    # Script entry point: needs the input and output trace paths as
    # arguments and the QEMU settings in the environment.
    if len(sys.argv) < 3:
        usage()

    QEMU_PATH = os.getenv("QEMU_PATH")
    QEMU_ARGS = os.getenv("QEMU_ARGS")
    # An empty QEMU_PATH can never start QEMU, so treat it like a missing
    # one. An empty QEMU_ARGS is still accepted.
    if not QEMU_PATH or QEMU_ARGS is None:
        usage()
    # Left disabled: the accelerator is not added automatically.
    # if "accel" not in QEMU_ARGS:
    #     QEMU_ARGS += " -accel qtest"
    # Optional override; when unset the token is derived from the first run.
    CRASH_TOKEN = os.getenv("CRASH_TOKEN")
    # The trace is fed to QEMU over stdin, so qtest must listen on stdio.
    QEMU_ARGS += " -qtest stdio -monitor none -serial none "
    minimize_trace(sys.argv[1], sys.argv[2])