Merge remote-tracking branch 'remotes/mst/tags/for_upstream' into staging
[qemu/ar7.git] / tests / image-fuzzer / runner.py
blob2fc010fd9d7b2548e2dac58a3b1295d217347baf
1 #!/usr/bin/env python3
3 # Tool for running fuzz tests
5 # Copyright (C) 2014 Maria Kustova <maria.k@catit.be>
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 2 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 import sys
22 import os
23 import signal
24 import subprocess
25 import random
26 import shutil
27 from itertools import count
28 import time
29 import getopt
30 import io
31 import resource
# Prefer the standard-library JSON module and fall back to the external
# 'simplejson' package; without either one only a warning is printed.
try:
    import json
except ImportError:
    try:
        import simplejson as json
    except ImportError:
        print(
            "Warning: Module for JSON processing is not found.\n"
            "'--config' and '--command' options are not supported.",
            file=sys.stderr,
        )

# Bounds of the backing file size, in MB
MIN_BACKING_FILE_SIZE = 1
MAX_BACKING_FILE_SIZE = 10
def multilog(msg, *output):
    """Send one message to several log destinations.

    'msg' is written to, and then flushed on, each of the file-like objects
    passed in 'output', in the order they are given.
    """
    for stream in output:
        stream.write(msg)
        stream.flush()
def str_signal(sig):
    """Convert a numeric value of a system signal to its symbolic name.

    The lookup goes through the 'signal.Signals' enumeration, so only real
    signal names (e.g. 'SIGKILL') can be returned. Other constants of the
    'signal' module that merely share the numeric value (SIG_IGN, ITIMER_*,
    SIG_BLOCK, ...) are never picked up.

    Return the signal name, or the decimal number as a string if the
    platform defines no name for 'sig'.
    """
    try:
        return signal.Signals(sig).name
    except ValueError:
        # No symbolic name on this platform (e.g. a real-time signal)
        return str(int(sig))
def run_app(fd, q_args, timeout=600):
    """Start an application with specified arguments and return its exit code
    or kill signal depending on the result of execution.

    fd       -- file-like object that receives the captured stdout and
                stderr of the application (or the timeout notice)
    q_args   -- command line of the application as a list of strings
    timeout  -- number of seconds the application is allowed to run

    Return the exit status reported by 'subprocess' (negative if the
    application was terminated by a signal). If the application does not
    finish in time it is killed with SIGKILL and '-SIGKILL' is returned.

    OSError is raised if the application cannot be started.
    """
    term_signal = signal.SIGKILL
    # Both context managers guarantee cleanup on every path: the null device
    # is closed, the pipes are closed and the child is reaped.
    with open(os.devnull, 'r+') as devnull, \
            subprocess.Popen(q_args, stdin=devnull,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             errors='replace') as process:
        try:
            out, err = process.communicate(timeout=timeout)
        except subprocess.TimeoutExpired:
            process.send_signal(term_signal)
            fd.write('The command was terminated by timeout.\n')
            fd.flush()
            return -term_signal

        fd.write(out)
        fd.write(err)
        fd.flush()
        return process.returncode
class TestException(Exception):
    """Error raised by TestEnv objects when a test cannot be set up or run."""
class TestEnv(object):

    """Test object.

    The class sets up test environment, generates backing and test images
    and executes application under tests with specified arguments and a test
    image provided.

    All logs are collected.

    The summary log will contain short descriptions and statuses of tests in
    a run.

    The test log will include application (e.g. 'qemu-img') logs besides info
    sent to the summary log.
    """

    # Image formats used both for backing files and as conversion targets.
    # The order is significant: it determines what random.choice() returns
    # for a given seed.
    _IMAGE_FORMATS = ('raw', 'vmdk', 'vdi', 'qcow2', 'file', 'qed', 'vpc')

    def __init__(self, test_id, seed, work_dir, run_log,
                 cleanup=True, log_all=False):
        """Set test environment in a specified work directory.

        Path to qemu-img and qemu-io will be retrieved from 'QEMU_IMG' and
        'QEMU_IO' environment variables.

        test_id   -- string appended to 'test-' to name the test directory
        seed      -- seed for the random generator; if None, a new one is
                     generated
        work_dir  -- directory in which the test directory is created
        run_log   -- path of the summary log, opened for appending
        cleanup   -- remove the test directory in finish() if the test
                     has not failed
        log_all   -- also keep application logs of passed commands

        TestException is raised if the test directory cannot be created.
        """
        if seed is not None:
            self.seed = seed
        else:
            self.seed = str(random.randint(0, sys.maxsize))
        random.seed(self.seed)

        self.init_path = os.getcwd()
        self.work_dir = work_dir
        self.current_dir = os.path.join(work_dir, 'test-' + test_id)
        self.qemu_img = self._tool_command('QEMU_IMG', 'qemu-img')
        self.qemu_io = self._tool_command('QEMU_IO', 'qemu-io')
        self.commands = [['qemu-img', 'check', '-f', 'qcow2', '$test_img'],
                         ['qemu-img', 'info', '-f', 'qcow2', '$test_img'],
                         ['qemu-io', '$test_img', '-c', 'read $off $len'],
                         ['qemu-io', '$test_img', '-c', 'write $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'aio_read $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'aio_write $off $len'],
                         ['qemu-io', '$test_img', '-c', 'flush'],
                         ['qemu-io', '$test_img', '-c',
                          'discard $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'truncate $off']]
        for fmt in self._IMAGE_FORMATS:
            self.commands.append(
                ['qemu-img', 'convert', '-f', 'qcow2', '-O', fmt,
                 '$test_img', 'converted_image.' + fmt])

        try:
            os.makedirs(self.current_dir)
        except OSError as e:
            print("Error: The working directory '%s' cannot be used. Reason: %s"
                  % (self.work_dir, e.strerror), file=sys.stderr)
            raise TestException from e
        self.log = open(os.path.join(self.current_dir, "test.log"), "w")
        self.parent_log = open(run_log, "a")
        self.failed = False
        self.cleanup = cleanup
        self.log_all = log_all

    @staticmethod
    def _tool_command(env_var, default):
        """Return the command prefix of a tool as a list of arguments.

        The value of the 'env_var' environment variable is split on any
        run of whitespace, so repeated spaces do not produce empty
        arguments. An unset or blank variable yields ['default'].
        """
        return os.environ.get(env_var, default).split() or [default]

    def _create_backing_file(self):
        """Create a backing file in the current directory.

        Return a tuple of a backing file name and format, or (None, None)
        if the file could not be created.

        Format of a backing file is randomly chosen from all formats supported
        by 'qemu-img create'.
        """
        backing_file_fmt = random.choice(self._IMAGE_FORMATS)
        backing_file_name = 'backing_img.' + backing_file_fmt
        backing_file_size = random.randint(MIN_BACKING_FILE_SIZE,
                                           MAX_BACKING_FILE_SIZE) * (1 << 20)
        cmd = self.qemu_img + ['create', '-f', backing_file_fmt,
                               backing_file_name, str(backing_file_size)]
        # The buffer is closed even if the application cannot be started
        with io.StringIO() as temp_log:
            retcode = run_app(temp_log, cmd)
            if retcode == 0:
                return (backing_file_name, backing_file_fmt)

            multilog("Warning: The %s backing file was not created.\n\n"
                     % backing_file_fmt, sys.stderr, self.log, self.parent_log)
            self.log.write("Log for the failure:\n" + temp_log.getvalue() +
                           '\n\n')
            return (None, None)

    def execute(self, input_commands=None, fuzz_config=None):
        """ Execute a test.

        The method creates backing and test images, runs test app and analyzes
        its exit status. If the application was killed by a signal, the test
        is marked as failed.

        input_commands -- list of commands to run instead of the default
                          ones; each command is a list of strings starting
                          with the 'qemu-img' or 'qemu-io' alias
        fuzz_config    -- passed through to the image generator

        The image generator is taken from the module-level name
        'image_generator', which has to be set before this method is called.

        TestException is raised if an application cannot be started.
        """
        if input_commands is None:
            commands = self.commands
        else:
            commands = input_commands

        os.chdir(self.current_dir)
        backing_file_name, backing_file_fmt = self._create_backing_file()
        img_size = image_generator.create_image(
            'test.img', backing_file_name, backing_file_fmt, fuzz_config)
        for item in commands:
            # Every command works on a fresh copy of the fuzzed image
            shutil.copy('test.img', 'copy.img')
            try:
                self._run_command(item, img_size, backing_file_name)
            finally:
                # Drop the copy on every path, including skipped commands
                # and start failures
                os.remove('copy.img')

    def _run_command(self, item, img_size, backing_file_name):
        """Run one test command on 'copy.img' and log its result.

        item              -- command as a list of strings, see execute()
        img_size          -- upper bound for the generated offsets, as
                             returned by the image generator
        backing_file_name -- reported in the log header only

        TestException is raised if the application cannot be started.
        """
        # 'off' and 'len' are multiple of the sector size.
        # They are drawn before the alias check so that the random sequence
        # for a given seed does not depend on which commands are skipped.
        sector_size = 512
        start = random.randrange(0, img_size + 1, sector_size)
        end = random.randrange(start, img_size + 1, sector_size)

        if item[0] == 'qemu-img':
            current_cmd = list(self.qemu_img)
        elif item[0] == 'qemu-io':
            current_cmd = list(self.qemu_io)
        else:
            multilog("Warning: test command '%s' is not defined.\n"
                     % item[0], sys.stderr, self.log, self.parent_log)
            return
        # Replace all placeholders with their real values
        for v in item[1:]:
            c = (v
                 .replace('$test_img', 'copy.img')
                 .replace('$off', str(start))
                 .replace('$len', str(end - start)))
            current_cmd.append(c)

        # Log string with the test header
        test_summary = "Seed: %s\nCommand: %s\nTest directory: %s\n" \
                       "Backing file: %s\n" \
                       % (self.seed, " ".join(current_cmd),
                          self.current_dir, backing_file_name)
        with io.StringIO() as temp_log:
            try:
                retcode = run_app(temp_log, current_cmd)
            except OSError as e:
                multilog("%sError: Start of '%s' failed. Reason: %s\n\n"
                         % (test_summary, os.path.basename(current_cmd[0]),
                            e.strerror),
                         sys.stderr, self.log, self.parent_log)
                raise TestException from e

            if retcode < 0:
                # A negative status means the application died by a signal
                self.log.write(temp_log.getvalue())
                multilog("%sFAIL: Test terminated by signal %s\n\n"
                         % (test_summary, str_signal(-retcode)),
                         sys.stderr, self.log, self.parent_log)
                self.failed = True
            else:
                if self.log_all:
                    self.log.write(temp_log.getvalue())
                    multilog("%sPASS: Application exited with the code "
                             "'%d'\n\n" % (test_summary, retcode),
                             sys.stdout, self.log, self.parent_log)

    def finish(self):
        """Restore the test environment after a test execution."""
        self.log.close()
        self.parent_log.close()
        os.chdir(self.init_path)
        # Directories of failed tests are always kept for investigation
        if self.cleanup and not self.failed:
            shutil.rmtree(self.current_dir)
if __name__ == '__main__':

    def usage():
        """Print the command line help to stdout."""
        print("""
        Usage: runner.py [OPTION...] TEST_DIR IMG_GENERATOR

        Set up test environment in TEST_DIR and run a test in it. A module for
        test image generation should be specified via IMG_GENERATOR.

        Example:
          runner.py -c '[["qemu-img", "info", "$test_img"]]' /tmp/test qcow2

        Optional arguments:
          -h, --help                    display this help and exit
          -d, --duration=NUMBER         finish tests after NUMBER of seconds
          -c, --command=JSON            run tests for all commands specified in
                                        the JSON array
          -s, --seed=STRING             seed for a test image generation,
                                        by default will be generated randomly
          --config=JSON                 take fuzzer configuration from the JSON
                                        array
          -k, --keep_passed             don't remove folders of passed tests
          -v, --verbose                 log information about passed tests

        JSON:

        '--command' accepts a JSON array of commands. Each command presents
        an application under test with all its parameters as a list of strings,
        e.g. ["qemu-io", "$test_img", "-c", "write $off $len"].

        Supported application aliases: 'qemu-img' and 'qemu-io'.

        Supported argument aliases: $test_img for the fuzzed image, $off
        for an offset, $len for length.

        Values for $off and $len will be generated based on the virtual disk
        size of the fuzzed image.

        Paths to 'qemu-img' and 'qemu-io' are retrevied from 'QEMU_IMG' and
        'QEMU_IO' environment variables.

        '--config' accepts a JSON array of fields to be fuzzed, e.g.
        '[["header"], ["header", "version"]]'.

        Each of the list elements can consist of a complex image element only
        as ["header"] or ["feature_name_table"] or an exact field as
        ["header", "version"]. In the first case random portion of the element
        fields will be fuzzed, in the second one the specified field will be
        fuzzed always.

        If '--config' argument is specified, fields not listed in
        the configuration array will not be fuzzed.
        """)

    def run_test(test_id, seed, work_dir, run_log, cleanup, log_all,
                 command, fuzz_config):
        """Setup environment for one test and execute this test.

        The runner exits with status 1 if the environment cannot be set up
        or an application under test cannot be started. The test
        environment is always restored once it has been created.
        """
        try:
            test = TestEnv(test_id, seed, work_dir, run_log, cleanup,
                           log_all)
        except TestException:
            sys.exit(1)

        try:
            test.execute(command, fuzz_config)
        except TestException:
            sys.exit(1)
        finally:
            test.finish()

    def should_continue(duration, start_time):
        """Return True if a new test can be started and False otherwise.

        With no duration given tests are started without a time limit.
        """
        current_time = int(time.time())
        return (duration is None) or (current_time - start_time < duration)

    try:
        opts, args = getopt.gnu_getopt(sys.argv[1:], 'c:hs:kvd:',
                                       ['command=', 'help', 'seed=', 'config=',
                                        'keep_passed', 'verbose', 'duration='])
    except getopt.error as e:
        print("Error: %s\n\nTry 'runner.py --help' for more information" % e,
              file=sys.stderr)
        sys.exit(1)

    command = None
    cleanup = True
    log_all = False
    seed = None
    config = None
    duration = None
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage()
            sys.exit()
        elif opt in ('-c', '--command'):
            try:
                command = json.loads(arg)
            # NameError covers the case when no JSON module could be imported
            except (TypeError, ValueError, NameError) as e:
                print("Error: JSON array of test commands cannot be loaded.\n"
                      "Reason: %s" % e, file=sys.stderr)
                sys.exit(1)
        elif opt in ('-k', '--keep_passed'):
            cleanup = False
        elif opt in ('-v', '--verbose'):
            log_all = True
        elif opt in ('-s', '--seed'):
            seed = arg
        elif opt in ('-d', '--duration'):
            # Report a malformed number like any other usage error instead
            # of failing with a traceback
            try:
                duration = int(arg)
            except ValueError:
                print("Error: Duration '%s' is not an integer number of "
                      "seconds.\n\nTry 'runner.py --help' for more "
                      "information" % arg, file=sys.stderr)
                sys.exit(1)
        elif opt == '--config':
            try:
                config = json.loads(arg)
            except (TypeError, ValueError, NameError) as e:
                print("Error: JSON array with the fuzzer configuration cannot"
                      " be loaded\nReason: %s" % e, file=sys.stderr)
                sys.exit(1)

    if len(args) != 2:
        print("Expected two parameters\nTry 'runner.py --help'"
              " for more information.", file=sys.stderr)
        sys.exit(1)

    work_dir = os.path.realpath(args[0])
    # run_log is created in 'main', because multiple tests are expected to
    # log in it
    run_log = os.path.join(work_dir, 'run.log')

    # Add the path to the image generator module to sys.path
    sys.path.append(os.path.realpath(os.path.dirname(args[1])))
    # Remove a script extension from image generator module if any
    generator_name = os.path.splitext(os.path.basename(args[1]))[0]

    # The module is bound to a module-level name on purpose:
    # TestEnv.execute() looks 'image_generator' up as a global.
    try:
        image_generator = __import__(generator_name)
    except ImportError as e:
        print("Error: The image generator '%s' cannot be imported.\n"
              "Reason: %s" % (generator_name, e), file=sys.stderr)
        sys.exit(1)

    # Enable core dumps. This is best effort: raising the limit is refused
    # when the hard limit is lower, and that must not prevent fuzzing.
    try:
        resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
    except (ValueError, OSError) as e:
        print("Warning: Core dumps cannot be enabled. Reason: %s" % e,
              file=sys.stderr)
    # If a seed is specified, only one test will be executed.
    # Otherwise runner will terminate after a keyboard interruption
    start_time = int(time.time())
    test_id = count(1)
    while should_continue(duration, start_time):
        try:
            run_test(str(next(test_id)), seed, work_dir, run_log, cleanup,
                     log_all, command, config)
        except (KeyboardInterrupt, SystemExit):
            sys.exit(1)

        if seed is not None:
            break