3 # Tool for running fuzz tests
5 # Copyright (C) 2014 Maria Kustova <maria.k@catit.be>
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 2 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
27 from itertools
import count
37 import simplejson
as json
40 "Warning: Module for JSON processing is not found.\n" \
41 "'--config' and '--command' options are not supported."
43 # Backing file sizes in MB
44 MAX_BACKING_FILE_SIZE
= 10
45 MIN_BACKING_FILE_SIZE
= 1
48 def multilog(msg
, *output
):
49 """ Write an object to all of specified file descriptors."""
56 """ Convert a numeric value of a system signal to the string one
57 defined by the current operational system.
59 for k
, v
in signal
.__dict
__.items():
64 def run_app(fd
, q_args
):
65 """Start an application with specified arguments and return its exit code
66 or kill signal depending on the result of execution.
69 class Alarm(Exception):
70 """Exception for signal.alarm events."""
74 """Notify that an alarm event occurred."""
77 signal
.signal(signal
.SIGALRM
, handler
)
79 term_signal
= signal
.SIGKILL
80 devnull
= open('/dev/null', 'r+')
81 process
= subprocess
.Popen(q_args
, stdin
=devnull
,
82 stdout
=subprocess
.PIPE
,
83 stderr
=subprocess
.PIPE
)
85 out
, err
= process
.communicate()
90 return process
.returncode
93 os
.kill(process
.pid
, term_signal
)
94 fd
.write('The command was terminated by timeout.\n')
99 class TestException(Exception):
100 """Exception for errors risen by TestEnv objects."""
104 class TestEnv(object):
108 The class sets up test environment, generates backing and test images
109 and executes application under tests with specified arguments and a test
112 All logs are collected.
114 The summary log will contain short descriptions and statuses of tests in
117 The test log will include application (e.g. 'qemu-img') logs besides info
118 sent to the summary log.
121 def __init__(self
, test_id
, seed
, work_dir
, run_log
,
122 cleanup
=True, log_all
=False):
123 """Set test environment in a specified work directory.
125 Path to qemu-img and qemu-io will be retrieved from 'QEMU_IMG' and
126 'QEMU_IO' environment variables.
131 self
.seed
= str(random
.randint(0, sys
.maxint
))
132 random
.seed(self
.seed
)
134 self
.init_path
= os
.getcwd()
135 self
.work_dir
= work_dir
136 self
.current_dir
= os
.path
.join(work_dir
, 'test-' + test_id
)
137 self
.qemu_img
= os
.environ
.get('QEMU_IMG', 'qemu-img')\
139 self
.qemu_io
= os
.environ
.get('QEMU_IO', 'qemu-io').strip().split(' ')
140 self
.commands
= [['qemu-img', 'check', '-f', 'qcow2', '$test_img'],
141 ['qemu-img', 'info', '-f', 'qcow2', '$test_img'],
142 ['qemu-io', '$test_img', '-c', 'read $off $len'],
143 ['qemu-io', '$test_img', '-c', 'write $off $len'],
144 ['qemu-io', '$test_img', '-c',
145 'aio_read $off $len'],
146 ['qemu-io', '$test_img', '-c',
147 'aio_write $off $len'],
148 ['qemu-io', '$test_img', '-c', 'flush'],
149 ['qemu-io', '$test_img', '-c',
150 'discard $off $len'],
151 ['qemu-io', '$test_img', '-c',
153 for fmt
in ['raw', 'vmdk', 'vdi', 'cow', 'qcow2', 'file',
155 self
.commands
.append(
156 ['qemu-img', 'convert', '-f', 'qcow2', '-O', fmt
,
157 '$test_img', 'converted_image.' + fmt
])
160 os
.makedirs(self
.current_dir
)
162 print >>sys
.stderr
, \
163 "Error: The working directory '%s' cannot be used. Reason: %s"\
164 % (self
.work_dir
, e
[1])
166 self
.log
= open(os
.path
.join(self
.current_dir
, "test.log"), "w")
167 self
.parent_log
= open(run_log
, "a")
169 self
.cleanup
= cleanup
170 self
.log_all
= log_all
172 def _create_backing_file(self
):
173 """Create a backing file in the current directory.
175 Return a tuple of a backing file name and format.
177 Format of a backing file is randomly chosen from all formats supported
178 by 'qemu-img create'.
180 # All formats supported by the 'qemu-img create' command.
181 backing_file_fmt
= random
.choice(['raw', 'vmdk', 'vdi', 'cow', 'qcow2',
182 'file', 'qed', 'vpc'])
183 backing_file_name
= 'backing_img.' + backing_file_fmt
184 backing_file_size
= random
.randint(MIN_BACKING_FILE_SIZE
,
185 MAX_BACKING_FILE_SIZE
) * (1 << 20)
186 cmd
= self
.qemu_img
+ ['create', '-f', backing_file_fmt
,
187 backing_file_name
, str(backing_file_size
)]
188 temp_log
= StringIO
.StringIO()
189 retcode
= run_app(temp_log
, cmd
)
192 return (backing_file_name
, backing_file_fmt
)
194 multilog("Warning: The %s backing file was not created.\n\n"
195 % backing_file_fmt
, sys
.stderr
, self
.log
, self
.parent_log
)
196 self
.log
.write("Log for the failure:\n" + temp_log
.getvalue() +
201 def execute(self
, input_commands
=None, fuzz_config
=None):
204 The method creates backing and test images, runs test app and analyzes
205 its exit status. If the application was killed by a signal, the test
208 if input_commands
is None:
209 commands
= self
.commands
211 commands
= input_commands
213 os
.chdir(self
.current_dir
)
214 backing_file_name
, backing_file_fmt
= self
._create
_backing
_file
()
215 img_size
= image_generator
.create_image('test.img',
219 for item
in commands
:
220 shutil
.copy('test.img', 'copy.img')
221 # 'off' and 'len' are multiple of the sector size
223 start
= random
.randrange(0, img_size
+ 1, sector_size
)
224 end
= random
.randrange(start
, img_size
+ 1, sector_size
)
226 if item
[0] == 'qemu-img':
227 current_cmd
= list(self
.qemu_img
)
228 elif item
[0] == 'qemu-io':
229 current_cmd
= list(self
.qemu_io
)
231 multilog("Warning: test command '%s' is not defined.\n" \
232 % item
[0], sys
.stderr
, self
.log
, self
.parent_log
)
234 # Replace all placeholders with their real values
237 .replace('$test_img', 'copy.img')
238 .replace('$off', str(start
))
239 .replace('$len', str(end
- start
)))
240 current_cmd
.append(c
)
242 # Log string with the test header
243 test_summary
= "Seed: %s\nCommand: %s\nTest directory: %s\n" \
244 "Backing file: %s\n" \
245 % (self
.seed
, " ".join(current_cmd
),
246 self
.current_dir
, backing_file_name
)
248 temp_log
= StringIO
.StringIO()
250 retcode
= run_app(temp_log
, current_cmd
)
252 multilog(test_summary
+ "Error: Start of '%s' failed. " \
253 "Reason: %s\n\n" % (os
.path
.basename(
254 current_cmd
[0]), e
[1]),
255 sys
.stderr
, self
.log
, self
.parent_log
)
259 self
.log
.write(temp_log
.getvalue())
260 multilog(test_summary
+ "FAIL: Test terminated by signal " +
261 "%s\n\n" % str_signal(-retcode
), sys
.stderr
, self
.log
,
266 self
.log
.write(temp_log
.getvalue())
267 multilog(test_summary
+ "PASS: Application exited with" +
268 " the code '%d'\n\n" % retcode
, sys
.stdout
,
269 self
.log
, self
.parent_log
)
271 os
.remove('copy.img')
274 """Restore the test environment after a test execution."""
276 self
.parent_log
.close()
277 os
.chdir(self
.init_path
)
278 if self
.cleanup
and not self
.failed
:
279 shutil
.rmtree(self
.current_dir
)
281 if __name__
== '__main__':
285 Usage: runner.py [OPTION...] TEST_DIR IMG_GENERATOR
287 Set up test environment in TEST_DIR and run a test in it. A module for
288 test image generation should be specified via IMG_GENERATOR.
290 runner.py -c '[["qemu-img", "info", "$test_img"]]' /tmp/test qcow2
293 -h, --help display this help and exit
294 -d, --duration=NUMBER finish tests after NUMBER of seconds
295 -c, --command=JSON run tests for all commands specified in
297 -s, --seed=STRING seed for a test image generation,
298 by default will be generated randomly
299 --config=JSON take fuzzer configuration from the JSON
301 -k, --keep_passed don't remove folders of passed tests
302 -v, --verbose log information about passed tests
306 '--command' accepts a JSON array of commands. Each command presents
307 an application under test with all its paramaters as a list of strings,
309 ["qemu-io", "$test_img", "-c", "write $off $len"]
311 Supported application aliases: 'qemu-img' and 'qemu-io'.
312 Supported argument aliases: $test_img for the fuzzed image, $off
313 for an offset, $len for length.
315 Values for $off and $len will be generated based on the virtual disk
316 size of the fuzzed image
317 Paths to 'qemu-img' and 'qemu-io' are retrevied from 'QEMU_IMG' and
318 'QEMU_IO' environment variables
320 '--config' accepts a JSON array of fields to be fuzzed, e.g.
321 '[["header"], ["header", "version"]]'
322 Each of the list elements can consist of a complex image element only
323 as ["header"] or ["feature_name_table"] or an exact field as
324 ["header", "version"]. In the first case random portion of the element
325 fields will be fuzzed, in the second one the specified field will be
328 If '--config' argument is specified, fields not listed in
329 the configuration array will not be fuzzed.
332 def run_test(test_id
, seed
, work_dir
, run_log
, cleanup
, log_all
,
333 command
, fuzz_config
):
334 """Setup environment for one test and execute this test."""
336 test
= TestEnv(test_id
, seed
, work_dir
, run_log
, cleanup
,
338 except TestException
:
341 # Python 2.4 doesn't support 'finally' and 'except' in the same 'try'
345 test
.execute(command
, fuzz_config
)
346 except TestException
:
351 def should_continue(duration
, start_time
):
352 """Return True if a new test can be started and False otherwise."""
353 current_time
= int(time
.time())
354 return (duration
is None) or (current_time
- start_time
< duration
)
357 opts
, args
= getopt
.gnu_getopt(sys
.argv
[1:], 'c:hs:kvd:',
358 ['command=', 'help', 'seed=', 'config=',
359 'keep_passed', 'verbose', 'duration='])
360 except getopt
.error
, e
:
361 print >>sys
.stderr
, \
362 "Error: %s\n\nTry 'runner.py --help' for more information" % e
372 for opt
, arg
in opts
:
373 if opt
in ('-h', '--help'):
376 elif opt
in ('-c', '--command'):
378 command
= json
.loads(arg
)
379 except (TypeError, ValueError, NameError), e
:
380 print >>sys
.stderr
, \
381 "Error: JSON array of test commands cannot be loaded.\n" \
384 elif opt
in ('-k', '--keep_passed'):
386 elif opt
in ('-v', '--verbose'):
388 elif opt
in ('-s', '--seed'):
390 elif opt
in ('-d', '--duration'):
392 elif opt
== '--config':
394 config
= json
.loads(arg
)
395 except (TypeError, ValueError, NameError), e
:
396 print >>sys
.stderr
, \
397 "Error: JSON array with the fuzzer configuration cannot" \
398 " be loaded\nReason: %s" % e
401 if not len(args
) == 2:
402 print >>sys
.stderr
, \
403 "Expected two parameters\nTry 'runner.py --help'" \
404 " for more information."
407 work_dir
= os
.path
.realpath(args
[0])
408 # run_log is created in 'main', because multiple tests are expected to
410 run_log
= os
.path
.join(work_dir
, 'run.log')
412 # Add the path to the image generator module to sys.path
413 sys
.path
.append(os
.path
.realpath(os
.path
.dirname(args
[1])))
414 # Remove a script extension from image generator module if any
415 generator_name
= os
.path
.splitext(os
.path
.basename(args
[1]))[0]
418 image_generator
= __import__(generator_name
)
419 except ImportError, e
:
420 print >>sys
.stderr
, \
421 "Error: The image generator '%s' cannot be imported.\n" \
422 "Reason: %s" % (generator_name
, e
)
426 resource
.setrlimit(resource
.RLIMIT_CORE
, (-1, -1))
427 # If a seed is specified, only one test will be executed.
428 # Otherwise runner will terminate after a keyboard interruption
429 start_time
= int(time
.time())
431 while should_continue(duration
, start_time
):
433 run_test(str(test_id
.next()), seed
, work_dir
, run_log
, cleanup
,
434 log_all
, command
, config
)
435 except (KeyboardInterrupt, SystemExit):