# Shell class for a test, inherited by all individual tests
#
# Methods:
#       __init__        initialise
#       initialize      run once for each job
#       setup           run once for each new version of the test installed
#       run             run the test (wrapped by job.run_test())
#
# Data:
#       job             backreference to the job this test instance is part of
#       outputdir       eg. results/<job>/<testname.tag>
#       resultsdir      eg. results/<job>/<testname.tag>/results
#       profdir         eg. results/<job>/<testname.tag>/profiling
#       debugdir        eg. results/<job>/<testname.tag>/debug
#       bindir          eg. tests/<test>
#       src             eg. tests/<test>/src
#       tmpdir          eg. tmp/<tempname>_<testname.tag>

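# A minimal sketch of what a concrete test subclass typically looks like
# (illustrative only -- 'sleeptest' and its 'seconds' parameter are
# hypothetical, not defined in this module):
#
#     class sleeptest(test.test):
#         version = 1
#
#         def run_once(self, seconds=1):
#             time.sleep(seconds)
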
import fcntl, getpass, os, re, sys, shutil, tarfile, tempfile, time, traceback
import warnings, logging, glob, resource

from autotest_lib.client.common_lib import error
from autotest_lib.client.bin import utils


class base_test(object):
    preserve_srcdir = False
    network_destabilizing = False

    def __init__(self, job, bindir, outputdir):
        self.job = job
        self.pkgmgr = job.pkgmgr
        self.autodir = job.autodir
        self.outputdir = outputdir
        self.tagged_testname = os.path.basename(self.outputdir)
        self.resultsdir = os.path.join(self.outputdir, 'results')
        os.mkdir(self.resultsdir)
        self.profdir = os.path.join(self.outputdir, 'profiling')
        os.mkdir(self.profdir)
        self.debugdir = os.path.join(self.outputdir, 'debug')
        os.mkdir(self.debugdir)
        if getpass.getuser() == 'root':
            self.configure_crash_handler()
        else:
            self.crash_handling_enabled = False
        self.bindir = bindir
        self.srcdir = os.path.join(self.bindir, 'src')
        self.tmpdir = tempfile.mkdtemp("_" + self.tagged_testname,
                                       dir=job.tmpdir)
        self._keyvals = []
        self._new_keyval = False
        self.failed_constraints = []
        self.iteration = 0
        self.before_iteration_hooks = []
        self.after_iteration_hooks = []


    def configure_crash_handler(self):
        pass


    def crash_handler_report(self):
        pass


    def assert_(self, expr, msg='Assertion failed.'):
        if not expr:
            raise error.TestError(msg)


    def write_test_keyval(self, attr_dict):
        utils.write_keyval(self.outputdir, attr_dict,
                           tap_report=self.job._tap)


    @staticmethod
    def _append_type_to_keys(dictionary, typename):
        new_dict = {}
        for key, value in dictionary.iteritems():
            new_key = "%s{%s}" % (key, typename)
            new_dict[new_key] = value
        return new_dict

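    # For example (illustrative values):
    #   _append_type_to_keys({'mb_per_sec': 102.7}, 'perf')
    # returns {'mb_per_sec{perf}': 102.7}.
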
    def write_perf_keyval(self, perf_dict):
        self.write_iteration_keyval({}, perf_dict,
                                    tap_report=self.job._tap)


    def write_attr_keyval(self, attr_dict):
        self.write_iteration_keyval(attr_dict, {},
                                    tap_report=self.job._tap)


    def write_iteration_keyval(self, attr_dict, perf_dict, tap_report=None):
        # append the dictionaries before they have the {perf} and {attr} added
        self._keyvals.append({'attr':attr_dict, 'perf':perf_dict})
        self._new_keyval = True

        if attr_dict:
            attr_dict = self._append_type_to_keys(attr_dict, "attr")
            utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr",
                               tap_report=tap_report)

        if perf_dict:
            perf_dict = self._append_type_to_keys(perf_dict, "perf")
            utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf",
                               tap_report=tap_report)

        keyval_path = os.path.join(self.resultsdir, "keyval")
        print >> open(keyval_path, "a"), ""


    def analyze_perf_constraints(self, constraints):
        if not self._new_keyval:
            return

        # create a dict from the keyvals suitable as an environment for eval
        keyval_env = self._keyvals[-1]['perf'].copy()
        keyval_env['__builtins__'] = None
        self._new_keyval = False
        failures = []

        # evaluate each constraint using the current keyvals
        for constraint in constraints:
            logging.info('___________________ constraint = %s', constraint)
            logging.info('___________________ keyvals = %s', keyval_env)

            try:
                if not eval(constraint, keyval_env):
                    failures.append('%s: constraint was not met' % constraint)
            except:
                failures.append('could not evaluate constraint: %s'
                                % constraint)

        # keep track of the errors for each iteration
        self.failed_constraints.append(failures)

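    # Constraints are plain Python expressions evaluated (with builtins
    # disabled) against the raw, unsuffixed perf keyvals of the latest
    # iteration. So a test that did write_perf_keyval({'throughput': 250})
    # could be checked with the illustrative constraint 'throughput > 200'.
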
    def process_failed_constraints(self):
        msg = ''
        for i, failures in enumerate(self.failed_constraints):
            if failures:
                msg += 'iteration %d:%s ' % (i, ','.join(failures))

        if msg:
            raise error.TestFail(msg)


    def register_before_iteration_hook(self, iteration_hook):
        """
        This is how we expect test writers to register a before_iteration_hook.
        This adds the method to the list of hooks which are executed
        before each iteration.

        @param iteration_hook: Method to run before each iteration. A valid
                               hook accepts a single argument which is the
                               test object.
        """
        self.before_iteration_hooks.append(iteration_hook)


    def register_after_iteration_hook(self, iteration_hook):
        """
        This is how we expect test writers to register an after_iteration_hook.
        This adds the method to the list of hooks which are executed
        after each iteration.

        @param iteration_hook: Method to run after each iteration. A valid
                               hook accepts a single argument which is the
                               test object.
        """
        self.after_iteration_hooks.append(iteration_hook)

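    # A minimal sketch of an iteration hook (illustrative; 'log_loadavg' is
    # hypothetical). A hook takes the test object as its only argument:
    #
    #     def log_loadavg(mytest):
    #         logging.info('iteration %d loadavg: %s', mytest.iteration,
    #                      open('/proc/loadavg').read().strip())
    #
    #     mytest.register_after_iteration_hook(log_loadavg)
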
    def initialize(self):
        pass


    def setup(self):
        pass


    def warmup(self, *args, **dargs):
        pass


    def drop_caches_between_iterations(self):
        if self.job.drop_caches_between_iterations:
            utils.drop_caches()


    def _call_run_once(self, constraints, profile_only,
                       postprocess_profiled_run, args, dargs):
        self.drop_caches_between_iterations()

        # execute iteration hooks
        for hook in self.before_iteration_hooks:
            hook(self)

        try:
            if profile_only:
                if not self.job.profilers.present():
                    self.job.record('WARN', None, None,
                                    'No profilers have been added but '
                                    'profile_only is set - nothing '
                                    'will be run')
                self.run_once_profiling(postprocess_profiled_run,
                                        *args, **dargs)
            else:
                self.before_run_once()
                self.run_once(*args, **dargs)
                self.after_run_once()

            self.postprocess_iteration()
            self.analyze_perf_constraints(constraints)
        finally:
            for hook in self.after_iteration_hooks:
                hook(self)


    def execute(self, iterations=None, test_length=None, profile_only=None,
                _get_time=time.time, postprocess_profiled_run=None,
                constraints=(), *args, **dargs):
        """
        This is the basic execute method for tests inherited from base_test.
        If you want to implement a benchmark test, it's better to implement
        the run_once function, to cope with the profiling infrastructure. For
        other tests, you can just override the default implementation.

        @param test_length: The minimum test length in seconds. We'll run the
                run_once function for a number of times large enough to cover
                the minimum test length.

        @param iterations: A number of iterations that we'll run the run_once
                function. This parameter is incompatible with test_length and
                will be silently ignored if you specify both.

        @param profile_only: If true run X iterations with profilers enabled.
                If false run X iterations and one with profiling if profilers
                are enabled. If None, default to the value of
                job.default_profile_only.

        @param _get_time: [time.time] Used for unit test time injection.

        @param postprocess_profiled_run: Run the postprocessing for the
                profiled run.
        """
        # For our special class of tests, the benchmarks, we don't want
        # profilers to run during the test iterations. Let's reserve only
        # the last iteration for profiling, if needed. So let's stop
        # all profilers if they are present and active.
        profilers = self.job.profilers
        if profilers.active():
            profilers.stop(self)

        if profile_only is None:
            profile_only = self.job.default_profile_only

        # If the user called this test in an odd way (specified both iterations
        # and test_length), let's warn them.
        if iterations and test_length:
            logging.debug('Iterations parameter ignored (timed execution)')

        if test_length:
            test_start = _get_time()
            time_elapsed = 0
            timed_counter = 0
            logging.debug('Test started. Specified %d s as the minimum test '
                          'length', test_length)
            while time_elapsed < test_length:
                timed_counter = timed_counter + 1
                if time_elapsed == 0:
                    logging.debug('Executing iteration %d', timed_counter)
                elif time_elapsed > 0:
                    logging.debug('Executing iteration %d, time_elapsed %d s',
                                  timed_counter, time_elapsed)
                self._call_run_once(constraints, profile_only,
                                    postprocess_profiled_run, args, dargs)
                test_iteration_finish = _get_time()
                time_elapsed = test_iteration_finish - test_start
            logging.debug('Test finished after %d iterations, '
                          'time elapsed: %d s', timed_counter, time_elapsed)
        else:
            if iterations is None:
                iterations = 1
            if iterations > 1:
                logging.debug('Test started. Specified %d iterations',
                              iterations)
            for self.iteration in xrange(1, iterations + 1):
                if iterations > 1:
                    logging.debug('Executing iteration %d of %d',
                                  self.iteration, iterations)
                self._call_run_once(constraints, profile_only,
                                    postprocess_profiled_run, args, dargs)

        if not profile_only:
            self.iteration += 1
            self.run_once_profiling(postprocess_profiled_run, *args, **dargs)

        # Do any postprocessing, normally extracting performance keyvals, etc
        self.postprocess()
        self.process_failed_constraints()

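    # Typical invocations (illustrative; assumes a test instance 'mytest'):
    #
    #     mytest.execute(iterations=5)    # five fixed iterations
    #     mytest.execute(test_length=60)  # iterate until 60 s have elapsed
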
    def run_once_profiling(self, postprocess_profiled_run, *args, **dargs):
        profilers = self.job.profilers
        # Do a profiling run if necessary
        if profilers.present():
            self.drop_caches_between_iterations()
            profilers.before_start(self)

            self.before_run_once()
            profilers.start(self)
            logging.debug('Profilers present. Profiling run started')

            try:
                self.run_once(*args, **dargs)

                # Priority to the run_once() argument over the attribute.
                postprocess_attribute = getattr(self,
                                                'postprocess_profiled_run',
                                                False)

                if (postprocess_profiled_run or
                        (postprocess_profiled_run is None and
                         postprocess_attribute)):
                    self.postprocess_iteration()

            finally:
                profilers.stop(self)
                profilers.report(self)

            self.after_run_once()


    def postprocess(self):
        pass


    def postprocess_iteration(self):
        pass


    def cleanup(self):
        pass


    def before_run_once(self):
        """
        Override in tests that need it, will be called before any run_once()
        call including the profiling run (when it's called before starting
        the profilers).
        """
        pass


    def after_run_once(self):
        """
        Called after every run_once (including from a profiled run when it's
        called after stopping the profilers).
        """
        pass


    def _exec(self, args, dargs):
        self.job.logging.tee_redirect_debug_dir(self.debugdir,
                                                log_name=self.tagged_testname)
        try:
            if self.network_destabilizing:
                self.job.disable_warnings("NETWORK")

            # write out the test attributes into a keyval
            dargs = dargs.copy()
            run_cleanup = dargs.pop('run_cleanup', self.job.run_test_cleanup)
            keyvals = dargs.pop('test_attributes', {}).copy()
            keyvals['version'] = self.version
            for i, arg in enumerate(args):
                keyvals['param-%d' % i] = repr(arg)
            for name, arg in dargs.iteritems():
                keyvals['param-%s' % name] = repr(arg)
            self.write_test_keyval(keyvals)

            _validate_args(args, dargs, self.initialize, self.setup,
                           self.execute, self.cleanup)

            try:
                # Initialize:
                _cherry_pick_call(self.initialize, *args, **dargs)

                lockfile = open(os.path.join(self.job.tmpdir, '.testlock'),
                                'w')
                try:
                    fcntl.flock(lockfile, fcntl.LOCK_EX)
                    # Setup: (compile and install the test, if needed)
                    p_args, p_dargs = _cherry_pick_args(self.setup, args,
                                                        dargs)
                    utils.update_version(self.srcdir, self.preserve_srcdir,
                                         self.version, self.setup,
                                         *p_args, **p_dargs)
                finally:
                    fcntl.flock(lockfile, fcntl.LOCK_UN)
                    lockfile.close()

                # Execute:
                os.chdir(self.outputdir)

                # call self.warmup cherry picking the arguments it accepts and
                # translate exceptions if needed
                _call_test_function(_cherry_pick_call, self.warmup,
                                    *args, **dargs)

                if hasattr(self, 'run_once'):
                    p_args, p_dargs = _cherry_pick_args(self.run_once,
                                                        args, dargs)
                    # pull in any non-* and non-** args from self.execute
                    for param in _get_nonstar_args(self.execute):
                        if param in dargs:
                            p_dargs[param] = dargs[param]
                else:
                    p_args, p_dargs = _cherry_pick_args(self.execute,
                                                        args, dargs)

                _call_test_function(self.execute, *p_args, **p_dargs)
            except Exception:
                try:
                    logging.exception('Exception escaping from test:')
                except:
                    pass # don't let logging exceptions here interfere

                # Save the exception while we run our cleanup() before
                # reraising it.
                exc_info = sys.exc_info()
                try:
                    try:
                        if run_cleanup:
                            _cherry_pick_call(self.cleanup, *args, **dargs)
                    except Exception:
                        logging.error('Ignoring exception during cleanup() '
                                      'phase:')
                        traceback.print_exc()
                        logging.error('Now raising the earlier %s error',
                                      exc_info[0])
                    self.crash_handler_report()
                finally:
                    self.job.logging.restore()
                    try:
                        raise exc_info[0], exc_info[1], exc_info[2]
                    finally:
                        # http://docs.python.org/library/sys.html#sys.exc_info
                        # Be nice and prevent a circular reference.
                        del exc_info
            else:
                try:
                    if run_cleanup:
                        _cherry_pick_call(self.cleanup, *args, **dargs)
                    self.crash_handler_report()
                finally:
                    self.job.logging.restore()
        except error.AutotestError:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")
            # Pass already-categorized errors on up.
            raise
        except Exception, e:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")
            # Anything else is an ERROR in our own code, not execute().
            raise error.UnhandledTestError(e)
        else:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")


    def runsubtest(self, url, *args, **dargs):
        """
        Execute another autotest test from inside the current test's scope.

        @param test: Parent test.
        @param url: Url of new test.
        @param tag: Tag added to test name.
        @param args: Args for subtest.
        @param dargs: Dictionary with args for subtest.
        @param iterations: Number of subtest iterations.
        @param profile_only: If true execute one profiled run.
        """
        dargs["profile_only"] = dargs.get("profile_only", False)
        test_basepath = self.outputdir[len(self.job.resultdir + "/"):]
        return self.job.run_test(url, master_testpath=test_basepath,
                                 *args, **dargs)

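# A hypothetical use of runsubtest() from inside another test's run_once()
# ('sleeptest' is illustrative, not guaranteed to be installed):
#
#     def run_once(self):
#         self.runsubtest('sleeptest', tag='nested', iterations=2)
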
def _get_nonstar_args(func):
    """Extract all the (normal) function parameter names.

    Given a function, returns a tuple of parameter names, specifically
    excluding the * and ** parameters, if the function accepts them.

    @param func: A callable that we want to choose arguments for.

    @return: A tuple of parameters accepted by the function.
    """
    return func.func_code.co_varnames[:func.func_code.co_argcount]


def _cherry_pick_args(func, args, dargs):
    """Sanitize positional and keyword arguments before calling a function.

    Given a callable (func), an argument tuple and a dictionary of keyword
    arguments, pick only those arguments which the function is prepared to
    accept and return a new argument tuple and keyword argument dictionary.

    Args:
      func: A callable that we want to choose arguments for.
      args: A tuple of positional arguments to consider passing to func.
      dargs: A dictionary of keyword arguments to consider passing to func.
    Returns:
      A tuple of: (args tuple, keyword arguments dictionary)
    """
    # Cherry pick args:
    if func.func_code.co_flags & 0x04:
        # func accepts *args (the CO_VARARGS flag), so return the entire args.
        p_args = args
    else:
        p_args = ()

    # Cherry pick dargs:
    if func.func_code.co_flags & 0x08:
        # func accepts **dargs (the CO_VARKEYWORDS flag), so return the
        # entire dargs.
        p_dargs = dargs
    else:
        # Only return the keyword arguments that func accepts.
        p_dargs = {}
        for param in _get_nonstar_args(func):
            if param in dargs:
                p_dargs[param] = dargs[param]

    return p_args, p_dargs

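# For example (illustrative): given "def f(a, b): pass", the call
# _cherry_pick_args(f, (1, 2), {'a': 1, 'c': 3}) returns ((), {'a': 1}),
# because f accepts neither *args (so the positionals are dropped) nor
# **dargs (so the unknown keyword 'c' is dropped).
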
def _cherry_pick_call(func, *args, **dargs):
    """Cherry picks arguments from args/dargs based on what "func" accepts
    and calls the function with the picked arguments."""
    p_args, p_dargs = _cherry_pick_args(func, args, dargs)
    return func(*p_args, **p_dargs)


def _validate_args(args, dargs, *funcs):
    """Verify that arguments are appropriate for at least one callable.

    Given a list of callables as additional parameters, verify that
    the proposed keyword arguments in dargs will each be accepted by at least
    one of the callables.

    NOTE: args is currently not supported and must be empty.

    Args:
      args: A tuple of proposed positional arguments.
      dargs: A dictionary of proposed keyword arguments.
      *funcs: Callables to be searched for acceptance of args and dargs.
    Raises:
      error.AutotestError: if an arg won't be accepted by any of *funcs.
    """
    all_co_flags = 0
    all_varnames = ()
    for func in funcs:
        all_co_flags |= func.func_code.co_flags
        all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount]

    # Check if the given args belong to at least one of the methods below.
    if len(args) > 0:
        # Current implementation doesn't allow the use of args.
        raise error.TestError('Unnamed arguments not accepted. Please '
                              'call job.run_test with named args only')

    # Check if the given dargs belong to at least one of the methods below.
    if len(dargs) > 0:
        if not all_co_flags & 0x08:
            # no func accepts **dargs, so:
            for param in dargs:
                if not param in all_varnames:
                    raise error.AutotestError('Unknown parameter: %s' % param)


def _installtest(job, url):
    (group, name) = job.pkgmgr.get_package_name(url, 'test')

    # Bail if the test is already installed
    group_dir = os.path.join(job.testdir, "download", group)
    if os.path.exists(os.path.join(group_dir, name)):
        return (group, name)

    # If the group directory is missing create it and add
    # an empty __init__.py so that sub-directories are
    # considered for import.
    if not os.path.exists(group_dir):
        os.makedirs(group_dir)
        f = file(os.path.join(group_dir, '__init__.py'), 'w+')
        f.close()

    logging.debug("%s: installing test url=%s", name, url)
    tarball = os.path.basename(url)
    tarball_path = os.path.join(group_dir, tarball)
    test_dir = os.path.join(group_dir, name)
    job.pkgmgr.fetch_pkg(tarball, tarball_path,
                         repo_url=os.path.dirname(url))

    # Create the directory for the test
    if not os.path.exists(test_dir):
        os.mkdir(os.path.join(group_dir, name))
    job.pkgmgr.untar_pkg(tarball_path, test_dir)

    os.remove(tarball_path)

    # For this 'sub-object' to be importable via the name
    # 'group.name' we need to provide an __init__.py,
    # so link the main entry point to this.
    os.symlink(name + '.py', os.path.join(group_dir, name,
                                          '__init__.py'))

    # The test is now installed.
    return (group, name)


def _call_test_function(func, *args, **dargs):
    """Calls a test function and translates exceptions so that errors
    inside test code are considered test failures."""
    try:
        return func(*args, **dargs)
    except error.AutotestError:
        # Pass already-categorized errors on up as is.
        raise
    except Exception, e:
        # Other exceptions must be treated as a FAIL when
        # raised during the test functions
        raise error.UnhandledTestFail(e)


def runtest(job, url, tag, args, dargs,
            local_namespace={}, global_namespace={},
            before_test_hook=None, after_test_hook=None,
            before_iteration_hook=None, after_iteration_hook=None):
    local_namespace = local_namespace.copy()
    global_namespace = global_namespace.copy()

    # if this is not a plain test name then download and install the
    # specified test
    if url.endswith('.tar.bz2'):
        (testgroup, testname) = _installtest(job, url)
        bindir = os.path.join(job.testdir, 'download', testgroup, testname)
        importdir = os.path.join(job.testdir, 'download')
        site_bindir = None
        modulename = '%s.%s' % (re.sub('/', '.', testgroup), testname)
        classname = '%s.%s' % (modulename, testname)
        path = testname
    else:
        # If the test is local, it may be under either testdir or site_testdir.
        # Tests in site_testdir override tests defined in testdir
        testname = path = url
        testgroup = ''
        path = re.sub(':', '/', testname)
        modulename = os.path.basename(path)
        classname = '%s.%s' % (modulename, modulename)

        # Try installing the test package.
        # The job object may be either a server side job or a client side job.
        # The 'install_pkg' method will be present only if it's a client side
        # job.
        if hasattr(job, 'install_pkg'):
            try:
                bindir = os.path.join(job.testdir, testname)
                job.install_pkg(testname, 'test', bindir)
            except error.PackageInstallError, e:
                # continue as a fall back mechanism and see if the test code
                # already exists on the machine
                pass

        bindir = testdir = None
        for dir in [job.testdir, getattr(job, 'site_testdir', None)]:
            if dir is not None and os.path.exists(os.path.join(dir, path)):
                testdir = dir
                importdir = bindir = os.path.join(dir, path)
        if not bindir:
            raise error.TestError(testname + ': test does not exist')

    subdir = os.path.join(dargs.pop('master_testpath', ""), testname)
    outputdir = os.path.join(job.resultdir, subdir)
    if tag:
        outputdir += '.' + tag

    local_namespace['job'] = job
    local_namespace['bindir'] = bindir
    local_namespace['outputdir'] = outputdir

    sys.path.insert(0, importdir)
    try:
        exec ('import %s' % modulename, local_namespace, global_namespace)
        exec ("mytest = %s(job, bindir, outputdir)" % classname,
              local_namespace, global_namespace)
    finally:
        sys.path.pop(0)

    pwd = os.getcwd()
    os.chdir(outputdir)

    try:
        mytest = global_namespace['mytest']
        if before_test_hook:
            before_test_hook(mytest)

        # use the register_*_iteration_hook methods to register the
        # passed-in hooks
        if before_iteration_hook:
            mytest.register_before_iteration_hook(before_iteration_hook)
        if after_iteration_hook:
            mytest.register_after_iteration_hook(after_iteration_hook)
        mytest._exec(args, dargs)
    finally:
        os.chdir(pwd)
        if after_test_hook:
            after_test_hook(mytest)
        shutil.rmtree(mytest.tmpdir, ignore_errors=True)