image-fuzzer: Run using python3
[qemu.git] / tests / image-fuzzer / runner.py
blob07932348150a8280f04377568bac69b47a254f9c
1 #!/usr/bin/env python3
3 # Tool for running fuzz tests
5 # Copyright (C) 2014 Maria Kustova <maria.k@catit.be>
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 2 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 import sys
22 import os
23 import signal
24 import subprocess
25 import random
26 import shutil
27 from itertools import count
28 import time
29 import getopt
30 import io
31 import resource
# Prefer the standard 'json' module and fall back to 'simplejson'. When
# neither can be imported only a warning is printed: the '--config' and
# '--command' options are the ones that need JSON parsing.
try:
    import json
except ImportError:
    try:
        import simplejson as json
    except ImportError:
        print("Warning: Module for JSON processing is not found.\n"
              "'--config' and '--command' options are not supported.",
              file=sys.stderr)

# Bounds (in MB) of the randomly chosen backing file size
MAX_BACKING_FILE_SIZE = 10
MIN_BACKING_FILE_SIZE = 1
def multilog(msg, *output):
    """Write msg to every given file object, flushing each one after the write.

    msg    -- string to be written
    output -- any number of writable file objects
    """
    for stream in output:
        stream.write(msg)
        stream.flush()
def str_signal(sig):
    """Convert a numeric value of a system signal to its symbolic name.

    The lookup goes through the signal.Signals enumeration, so only real
    signals of the current operating system are considered. Other constants
    of the 'signal' module that merely share the numeric value (SIG_DFL,
    SIG_BLOCK, ITIMER_REAL, ...) are never returned.

    Return the name as a string (e.g. 'SIGKILL' for 9) or None if the value
    does not correspond to a known signal.
    """
    try:
        return signal.Signals(sig).name
    except ValueError:
        # Not a signal number on this platform
        return None
def run_app(fd, q_args, timeout=600):
    """Start an application with specified arguments and return its exit code
    or kill signal depending on the result of execution.

    fd      -- text file object receiving the captured stdout and stderr of
               the application
    q_args  -- argument list of the application (passed to subprocess.Popen)
    timeout -- number of seconds the application is allowed to run

    Return the exit code of the application. If the application does not
    finish within 'timeout' seconds, it is killed with SIGKILL, a note is
    written to 'fd' and the negated signal number is returned.

    OSError raised by subprocess.Popen (e.g. the executable is missing) is
    propagated to the caller.
    """
    # The timeout is enforced by communicate() itself, so no SIGALRM handler
    # is installed and nothing stays armed if the start of the process fails.
    process = subprocess.Popen(q_args, stdin=subprocess.DEVNULL,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    try:
        out, err = process.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        process.kill()
        # Reap the killed child and release its pipes
        process.communicate()
        fd.write('The command was terminated by timeout.\n')
        fd.flush()
        return -signal.SIGKILL

    # fd is a text file, so we need to decode the process output before
    # writing to it.
    fd.write(out.decode(errors='replace'))
    fd.write(err.decode(errors='replace'))
    fd.flush()
    return process.returncode
class TestException(Exception):
    """Exception for errors raised by TestEnv objects."""
class TestEnv(object):

    """Test object.

    The class sets up test environment, generates backing and test images
    and executes application under tests with specified arguments and a test
    image provided.

    All logs are collected.

    The summary log will contain short descriptions and statuses of tests in
    a run.

    The test log will include application (e.g. 'qemu-img') logs besides info
    sent to the summary log.
    """

    def __init__(self, test_id, seed, work_dir, run_log,
                 cleanup=True, log_all=False):
        """Set test environment in a specified work directory.

        test_id  -- string used as the suffix of the per-test directory
        seed     -- seed of the random generator; if None, a random one is
                    generated
        work_dir -- directory in which the 'test-<test_id>' directory is
                    created
        run_log  -- path of the summary log, opened in append mode
        cleanup  -- remove the test directory in finish() if the test did
                    not fail
        log_all  -- also store application logs of passed tests

        Path to qemu-img and qemu-io will be retrieved from 'QEMU_IMG' and
        'QEMU_IO' environment variables.

        Raise TestException if the test directory cannot be created.
        """
        if seed is not None:
            self.seed = seed
        else:
            self.seed = str(random.randint(0, sys.maxsize))
        random.seed(self.seed)

        self.init_path = os.getcwd()
        self.work_dir = work_dir
        self.current_dir = os.path.join(work_dir, 'test-' + test_id)
        self.qemu_img = \
            os.environ.get('QEMU_IMG', 'qemu-img').strip().split(' ')
        self.qemu_io = os.environ.get('QEMU_IO', 'qemu-io').strip().split(' ')
        # Default commands; the first item is an application alias and
        # '$...' placeholders are substituted in execute()
        self.commands = [['qemu-img', 'check', '-f', 'qcow2', '$test_img'],
                         ['qemu-img', 'info', '-f', 'qcow2', '$test_img'],
                         ['qemu-io', '$test_img', '-c', 'read $off $len'],
                         ['qemu-io', '$test_img', '-c', 'write $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'aio_read $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'aio_write $off $len'],
                         ['qemu-io', '$test_img', '-c', 'flush'],
                         ['qemu-io', '$test_img', '-c',
                          'discard $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'truncate $off']]
        for fmt in ['raw', 'vmdk', 'vdi', 'qcow2', 'file', 'qed', 'vpc']:
            self.commands.append(
                ['qemu-img', 'convert', '-f', 'qcow2', '-O', fmt,
                 '$test_img', 'converted_image.' + fmt])

        try:
            os.makedirs(self.current_dir)
        except OSError as e:
            # Exceptions are not subscriptable in Python 3, so the reason
            # is taken from the 'strerror' attribute
            print("Error: The working directory '%s' cannot be used. "
                  "Reason: %s" % (self.work_dir, e.strerror),
                  file=sys.stderr)
            raise TestException
        self.log = open(os.path.join(self.current_dir, "test.log"), "w")
        self.parent_log = open(run_log, "a")
        self.failed = False
        self.cleanup = cleanup
        self.log_all = log_all

    def _create_backing_file(self):
        """Create a backing file in the current directory.

        Return a tuple of a backing file name and format, or (None, None)
        if 'qemu-img create' exited with a non-zero code.

        Format of a backing file is randomly chosen from all formats supported
        by 'qemu-img create'.
        """
        # All formats supported by the 'qemu-img create' command.
        backing_file_fmt = random.choice(['raw', 'vmdk', 'vdi', 'qcow2',
                                          'file', 'qed', 'vpc'])
        backing_file_name = 'backing_img.' + backing_file_fmt
        backing_file_size = random.randint(MIN_BACKING_FILE_SIZE,
                                           MAX_BACKING_FILE_SIZE) * (1 << 20)
        cmd = self.qemu_img + ['create', '-f', backing_file_fmt,
                               backing_file_name, str(backing_file_size)]
        temp_log = io.StringIO()
        retcode = run_app(temp_log, cmd)
        if retcode == 0:
            temp_log.close()
            return (backing_file_name, backing_file_fmt)
        else:
            multilog("Warning: The %s backing file was not created.\n\n"
                     % backing_file_fmt, sys.stderr, self.log, self.parent_log)
            self.log.write("Log for the failure:\n" + temp_log.getvalue() +
                           '\n\n')
            temp_log.close()
            return (None, None)

    def execute(self, input_commands=None, fuzz_config=None):
        """ Execute a test.

        The method creates backing and test images, runs test app and analyzes
        its exit status. If the application was killed by a signal, the test
        is marked as failed.

        input_commands -- list of commands to run instead of the default
                          ones; each command is a list starting with the
                          'qemu-img' or 'qemu-io' alias
        fuzz_config    -- fuzzer configuration handed over to the image
                          generator

        The image generator is taken from the module-level name
        'image_generator'.

        Raise TestException if an application cannot be started.
        """
        if input_commands is None:
            commands = self.commands
        else:
            commands = input_commands

        os.chdir(self.current_dir)
        backing_file_name, backing_file_fmt = self._create_backing_file()
        img_size = image_generator.create_image(
            'test.img', backing_file_name, backing_file_fmt, fuzz_config)
        for item in commands:
            # Every command runs on a fresh copy of the generated image
            shutil.copy('test.img', 'copy.img')
            # 'off' and 'len' are multiple of the sector size
            sector_size = 512
            start = random.randrange(0, img_size + 1, sector_size)
            end = random.randrange(start, img_size + 1, sector_size)

            if item[0] == 'qemu-img':
                current_cmd = list(self.qemu_img)
            elif item[0] == 'qemu-io':
                current_cmd = list(self.qemu_io)
            else:
                multilog("Warning: test command '%s' is not defined.\n"
                         % item[0], sys.stderr, self.log, self.parent_log)
                continue
            # Replace all placeholders with their real values
            for v in item[1:]:
                c = (v
                     .replace('$test_img', 'copy.img')
                     .replace('$off', str(start))
                     .replace('$len', str(end - start)))
                current_cmd.append(c)

            # Log string with the test header
            test_summary = "Seed: %s\nCommand: %s\nTest directory: %s\n" \
                           "Backing file: %s\n" \
                           % (self.seed, " ".join(current_cmd),
                              self.current_dir, backing_file_name)
            temp_log = io.StringIO()
            try:
                retcode = run_app(temp_log, current_cmd)
            except OSError as e:
                # Exceptions are not subscriptable in Python 3, so the
                # reason is taken from the 'strerror' attribute
                multilog("%sError: Start of '%s' failed. Reason: %s\n\n"
                         % (test_summary, os.path.basename(current_cmd[0]),
                            e.strerror),
                         sys.stderr, self.log, self.parent_log)
                raise TestException

            # A negative code means the application was killed by a signal
            if retcode < 0:
                self.log.write(temp_log.getvalue())
                multilog("%sFAIL: Test terminated by signal %s\n\n"
                         % (test_summary, str_signal(-retcode)),
                         sys.stderr, self.log, self.parent_log)
                self.failed = True
            else:
                if self.log_all:
                    self.log.write(temp_log.getvalue())
                    multilog("%sPASS: Application exited with the code "
                             "'%d'\n\n" % (test_summary, retcode),
                             sys.stdout, self.log, self.parent_log)
            temp_log.close()
            os.remove('copy.img')

    def finish(self):
        """Restore the test environment after a test execution.

        Both logs are closed, the initial working directory is restored and
        the test directory is removed unless the test failed or the cleanup
        was disabled.
        """
        self.log.close()
        self.parent_log.close()
        os.chdir(self.init_path)
        if self.cleanup and not self.failed:
            shutil.rmtree(self.current_dir)
if __name__ == '__main__':

    def usage():
        """Print the help message of the runner."""
        print("""
        Usage: runner.py [OPTION...] TEST_DIR IMG_GENERATOR

        Set up test environment in TEST_DIR and run a test in it. A module for
        test image generation should be specified via IMG_GENERATOR.

        Example:
          runner.py -c '[["qemu-img", "info", "$test_img"]]' /tmp/test qcow2

        Optional arguments:
          -h, --help                    display this help and exit
          -d, --duration=NUMBER         finish tests after NUMBER of seconds
          -c, --command=JSON            run tests for all commands specified in
                                        the JSON array
          -s, --seed=STRING             seed for a test image generation,
                                        by default will be generated randomly
          --config=JSON                 take fuzzer configuration from the JSON
                                        array
          -k, --keep_passed             don't remove folders of passed tests
          -v, --verbose                 log information about passed tests

        JSON:

        '--command' accepts a JSON array of commands. Each command presents
        an application under test with all its parameters as a list of strings,
        e.g. ["qemu-io", "$test_img", "-c", "write $off $len"].

        Supported application aliases: 'qemu-img' and 'qemu-io'.

        Supported argument aliases: $test_img for the fuzzed image, $off
        for an offset, $len for length.

        Values for $off and $len will be generated based on the virtual disk
        size of the fuzzed image.

        Paths to 'qemu-img' and 'qemu-io' are retrevied from 'QEMU_IMG' and
        'QEMU_IO' environment variables.

        '--config' accepts a JSON array of fields to be fuzzed, e.g.
        '[["header"], ["header", "version"]]'.

        Each of the list elements can consist of a complex image element only
        as ["header"] or ["feature_name_table"] or an exact field as
        ["header", "version"]. In the first case random portion of the element
        fields will be fuzzed, in the second one the specified field will be
        fuzzed always.

        If '--config' argument is specified, fields not listed in
        the configuration array will not be fuzzed.
        """)

    def run_test(test_id, seed, work_dir, run_log, cleanup, log_all,
                 command, fuzz_config):
        """Setup environment for one test and execute this test.

        The runner exits with the code 1 if the environment cannot be set up
        or the test raises TestException. The environment of a started test
        is always restored via TestEnv.finish().
        """
        try:
            test = TestEnv(test_id, seed, work_dir, run_log, cleanup,
                           log_all)
        except TestException:
            sys.exit(1)

        try:
            test.execute(command, fuzz_config)
        except TestException:
            sys.exit(1)
        finally:
            test.finish()

    def should_continue(duration, start_time):
        """Return True if a new test can be started and False otherwise.

        duration   -- time limit in seconds or None for no limit
        start_time -- time (in seconds since the epoch) the run was started
        """
        current_time = int(time.time())
        return (duration is None) or (current_time - start_time < duration)

    try:
        opts, args = getopt.gnu_getopt(sys.argv[1:], 'c:hs:kvd:',
                                       ['command=', 'help', 'seed=', 'config=',
                                        'keep_passed', 'verbose', 'duration='])
    except getopt.error as e:
        print("Error: %s\n\nTry 'runner.py --help' for more information" % e,
              file=sys.stderr)
        sys.exit(1)

    command = None
    cleanup = True
    log_all = False
    seed = None
    config = None
    duration = None
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage()
            sys.exit()
        elif opt in ('-c', '--command'):
            try:
                command = json.loads(arg)
            # NameError is raised when no JSON module could be imported
            except (TypeError, ValueError, NameError) as e:
                print("Error: JSON array of test commands cannot be loaded.\n"
                      "Reason: %s" % e, file=sys.stderr)
                sys.exit(1)
        elif opt in ('-k', '--keep_passed'):
            cleanup = False
        elif opt in ('-v', '--verbose'):
            log_all = True
        elif opt in ('-s', '--seed'):
            seed = arg
        elif opt in ('-d', '--duration'):
            # Report a malformed value instead of dying with a traceback
            try:
                duration = int(arg)
            except ValueError:
                print("Error: Duration must be an integer number of "
                      "seconds, got '%s'" % arg, file=sys.stderr)
                sys.exit(1)
        elif opt == '--config':
            try:
                config = json.loads(arg)
            # NameError is raised when no JSON module could be imported
            except (TypeError, ValueError, NameError) as e:
                print("Error: JSON array with the fuzzer configuration cannot"
                      " be loaded\nReason: %s" % e, file=sys.stderr)
                sys.exit(1)

    if len(args) != 2:
        print("Expected two parameters\nTry 'runner.py --help'"
              " for more information.", file=sys.stderr)
        sys.exit(1)

    work_dir = os.path.realpath(args[0])
    # run_log is created in 'main', because multiple tests are expected to
    # log in it
    run_log = os.path.join(work_dir, 'run.log')

    # Add the path to the image generator module to sys.path
    sys.path.append(os.path.realpath(os.path.dirname(args[1])))
    # Remove a script extension from image generator module if any
    generator_name = os.path.splitext(os.path.basename(args[1]))[0]

    # The module is bound to a module-level name, because TestEnv.execute()
    # refers to it as 'image_generator'
    try:
        image_generator = __import__(generator_name)
    except ImportError as e:
        print("Error: The image generator '%s' cannot be imported.\n"
              "Reason: %s" % (generator_name, e), file=sys.stderr)
        sys.exit(1)

    # Enable core dumps
    resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
    # If a seed is specified, only one test will be executed.
    # Otherwise runner will terminate after a keyboard interruption
    start_time = int(time.time())
    test_id = count(1)
    while should_continue(duration, start_time):
        try:
            run_test(str(next(test_id)), seed, work_dir, run_log, cleanup,
                     log_all, command, config)
        except (KeyboardInterrupt, SystemExit):
            sys.exit(1)

        if seed is not None:
            break