client: pass run_test() timeout parameter to harness
[autotest-zwu.git] / client / bin / job.py
blob97412c0009d8b8ddf9192add07c0ea4f1236c519
1 """The main job wrapper
3 This is the core infrastructure.
5 Copyright Andy Whitcroft, Martin J. Bligh 2006
6 """
import copy, functools, os, platform, re, shutil, sys, time, traceback, types, glob
import logging, getpass, errno, weakref
import cPickle as pickle
from autotest_lib.client.bin import client_logging_config
from autotest_lib.client.bin import utils, parallel, kernel, xen
from autotest_lib.client.bin import profilers, boottool, harness
from autotest_lib.client.bin import config, sysinfo, test, local_host
from autotest_lib.client.bin import partition as partition_lib
from autotest_lib.client.common_lib import base_job
from autotest_lib.client.common_lib import error, barrier, log, logging_manager
from autotest_lib.client.common_lib import base_packages, packages
from autotest_lib.client.common_lib import global_config
# Sentinel passed to reboot() meaning "reuse the most recently booted tag".
LAST_BOOT_TAG = object()

# Code prepended to every control file so it can use error and utils names
# without importing them explicitly.
JOB_PREAMBLE = """
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.bin.utils import *
"""
class StepError(error.AutotestError):
    """Raised by the step engine, e.g. for an invalid next-step definition."""
    pass
class NotAvailableError(error.AutotestError):
    """Raised when a required tool is missing (e.g. gcc in require_gcc)."""
    pass
37 def _run_test_complete_on_exit(f):
38 """Decorator for job methods that automatically calls
39 self.harness.run_test_complete when the method exits, if appropriate."""
40 def wrapped(self, *args, **dargs):
41 try:
42 return f(self, *args, **dargs)
43 finally:
44 if self._logger.global_filename == 'status':
45 self.harness.run_test_complete()
46 if self.drop_caches:
47 logging.debug("Dropping caches")
48 utils.drop_caches()
49 wrapped.__name__ = f.__name__
50 wrapped.__doc__ = f.__doc__
51 wrapped.__dict__.update(f.__dict__)
52 return wrapped
class status_indenter(base_job.status_indenter):
    """Provide a status indenter that is backed by job._record_prefix."""
    def __init__(self, job):
        self.job = weakref.proxy(job)   # avoid a circular reference

    @property
    def indent(self):
        # The indent level lives on the job so it persists across reboots.
        return self.job._record_indent

    def increment(self):
        self.job._record_indent += 1

    def decrement(self):
        self.job._record_indent -= 1
class base_client_job(base_job.base_job):
    """The client-side concrete implementation of base_job.

    Optional properties provided by this implementation:
        control
        bootloader
        harness
    """

    # Seconds slept when enabling/disabling warnings so in-flight warnings
    # settle around the state change.
    _WARNING_DISABLE_DELAY = 5

    # _record_indent is a persistent property, but only on the client
    _job_state = base_job.base_job._job_state
    _record_indent = _job_state.property_factory(
        '_state', '_record_indent', 0, namespace='client')
    _max_disk_usage_rate = _job_state.property_factory(
        '_state', '_max_disk_usage_rate', 0.0, namespace='client')
    def __init__(self, control, options, drop_caches=True,
                 extra_copy_cmdline=None):
        """
        Prepare a client side job object.

        @param control: The control file (pathname of).
        @param options: an object which includes:
                jobtag: The job tag string (eg "default").
                cont: If this is the continuation of this job.
                harness_type: An alternative server harness.  [None]
                use_external_logging: If true, the enable_external_logging
                        method will be called during construction.  [False]
        @param drop_caches: If true, utils.drop_caches() is called before and
                between all tests.  [True]
        @param extra_copy_cmdline: list of additional /proc/cmdline arguments to
                copy from the running kernel to all the installed kernels with
                this job
        """
        super(base_client_job, self).__init__(options=options)
        # Init is split in two: _pre_record_init sets up just enough for
        # self.record() to work, so any failure in the rest can be recorded.
        self._pre_record_init(control, options)
        try:
            self._post_record_init(control, options, drop_caches,
                                   extra_copy_cmdline)
        except Exception, err:
            self.record(
                    'ABORT', None, None,'client.bin.job.__init__ failed: %s' %
                    str(err))
            raise
123 @classmethod
124 def _get_environ_autodir(cls):
125 return os.environ['AUTODIR']
    @classmethod
    def _find_base_directories(cls):
        """
        Determine locations of autodir and clientdir (which are the same)
        using os.environ. Serverdir does not exist in this context.
        """
        autodir = clientdir = cls._get_environ_autodir()
        return autodir, clientdir, None
138 @classmethod
139 def _parse_args(cls, args):
140 return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args)
    def _find_resultdir(self, options):
        """
        Determine the directory for storing results. On a client this is
        always <autodir>/results/<tag>, where tag is passed in on the command
        line as an option.
        """
        return os.path.join(self.autodir, 'results', options.tag)
    def _get_status_logger(self):
        """Return a reference to the status logger."""
        return self._logger
    def _pre_record_init(self, control, options):
        """
        Initialization function that should perform ONLY the required
        setup so that the self.record() method works.

        As of now self.record() needs self.resultdir, self._group_level,
        self.harness and of course self._logger.
        """
        if not options.cont:
            self._cleanup_debugdir_files()
            self._cleanup_results_dir()

        logging_manager.configure_logging(
            client_logging_config.ClientLoggingConfig(),
            results_dir=self.resultdir,
            verbose=options.verbose)
        logging.info('Writing results to %s', self.resultdir)

        # init_group_level needs the state
        self.control = os.path.realpath(control)
        self._is_continuation = options.cont
        self._current_step_ancestry = []
        self._next_step_index = 0
        self._load_state()

        # harness selection may come from the command line or, on a
        # continuation, from the persisted state of the previous run
        _harness = self.handle_persistent_option(options, 'harness')
        _harness_args = self.handle_persistent_option(options, 'harness_args')

        self.harness = harness.select(_harness, self, _harness_args)

        # set up the status logger
        def client_job_record_hook(entry):
            msg_tag = ''
            if '.' in self._logger.global_filename:
                msg_tag = self._logger.global_filename.split('.', 1)[1]
            # send the entry to the job harness
            message = '\n'.join([entry.message] + entry.extra_message_lines)
            rendered_entry = self._logger.render_entry(entry)
            self.harness.test_status_detail(entry.status_code, entry.subdir,
                                            entry.operation, message, msg_tag,
                                            entry.fields)
            self.harness.test_status(rendered_entry, msg_tag)
            # send the entry to stdout, if it's enabled
            logging.info(rendered_entry)
        self._logger = base_job.status_logger(
            self, status_indenter(self), record_hook=client_job_record_hook,
            tap_writer=self._tap)
    def _post_record_init(self, control, options, drop_caches,
                          extra_copy_cmdline):
        """
        Perform job initialization not required by self.record().
        """
        self._init_drop_caches(drop_caches)

        self._init_packages()

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self._load_sysinfo_state()

        if not options.cont:
            download = os.path.join(self.testdir, 'download')
            if not os.path.exists(download):
                os.mkdir(download)
            # keep a copy of the control file with the results
            shutil.copyfile(self.control,
                            os.path.join(self.resultdir, 'control'))
        self.control = control

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        self.logging.start_logging()

        self._config = config.config(self)
        self.profilers = profilers.profilers(self)

        self._init_bootloader()

        self.machines = [options.hostname]
        self.hosts = set([local_host.LocalHost(hostname=options.hostname,
                                               bootloader=self.bootloader)])

        self.args = []
        if options.args:
            self.args = self._parse_args(options.args)

        if options.user:
            self.user = options.user
        else:
            self.user = getpass.getuser()

        self.sysinfo.log_per_reboot_data()

        if not options.cont:
            self.record('START', None, None)

        self.harness.run_start()

        if options.log:
            self.enable_external_logging()

        self._init_cmdline(extra_copy_cmdline)

        self.num_tests_run = None
        self.num_tests_failed = None

        self.warning_loggers = None
        self.warning_manager = None
    def _init_drop_caches(self, drop_caches):
        """
        Perform the drop caches initialization.
        """
        # between-iterations behavior comes from global_config; the
        # between-tests behavior from the constructor argument
        self.drop_caches_between_iterations = (
                global_config.global_config.get_config_value('CLIENT',
                        'drop_caches_between_iterations',
                        type=bool, default=True))
        self.drop_caches = drop_caches
        if self.drop_caches:
            logging.debug("Dropping caches")
            utils.drop_caches()
    def _init_bootloader(self):
        """
        Perform boottool initialization.
        """
        # an explicit boottool path may be configured; None lets
        # boottool pick its default
        tool = self.config_get('boottool.executable')
        self.bootloader = boottool.boottool(tool)
    def _init_packages(self):
        """
        Perform the packages support initialization.
        """
        # package fetch/install commands get a one-hour timeout
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':3600})
    def _init_cmdline(self, extra_copy_cmdline):
        """
        Initialize default cmdline for booted kernels in this job.

        @param extra_copy_cmdline: extra /proc/cmdline parameter names to
                carry over into installed kernels, besides 'console'.
        """
        copy_cmdline = set(['console'])
        if extra_copy_cmdline is not None:
            copy_cmdline.update(extra_copy_cmdline)

        # extract console= and other args from cmdline and add them into the
        # base args that we use for all kernels we install
        cmdline = utils.read_one_line('/proc/cmdline')
        kernel_args = []
        for karg in cmdline.split():
            for param in copy_cmdline:
                # match either the bare parameter or a param=value form
                if karg.startswith(param) and \
                    (len(param) == len(karg) or karg[len(param)] == '='):
                    kernel_args.append(karg)
        self.config_set('boot.default_args', ' '.join(kernel_args))
318 def _cleanup_results_dir(self):
319 """Delete everything in resultsdir"""
320 assert os.path.exists(self.resultdir)
321 list_files = glob.glob('%s/*' % self.resultdir)
322 for f in list_files:
323 if os.path.isdir(f):
324 shutil.rmtree(f)
325 elif os.path.isfile(f):
326 os.remove(f)
329 def _cleanup_debugdir_files(self):
331 Delete any leftover debugdir files
333 list_files = glob.glob("/tmp/autotest_results_dir.*")
334 for f in list_files:
335 os.remove(f)
    def disable_warnings(self, warning_type):
        """Record a request to stop logging warnings of the given type.

        Sleeps _WARNING_DISABLE_DELAY seconds after recording so that
        in-flight warnings settle before the disable takes effect.
        """
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})
        time.sleep(self._WARNING_DISABLE_DELAY)
    def enable_warnings(self, warning_type):
        """Record a request to resume logging warnings of the given type.

        Sleeps _WARNING_DISABLE_DELAY seconds before recording, mirroring
        disable_warnings().
        """
        time.sleep(self._WARNING_DISABLE_DELAY)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})
    def monitor_disk_usage(self, max_rate):
        """\
        Signal that the job should monitor disk space usage on /
        and generate a warning if a test uses up disk space at a
        rate exceeding 'max_rate'.

        Parameters:
             max_rate - the maximum allowed rate of disk consumption
                        during a test, in MB/hour, or 0 to indicate
                        no limit.
        """
        # persisted via the _max_disk_usage_rate job-state property
        self._max_disk_usage_rate = max_rate
    def relative_path(self, path):
        """\
        Return a path relative to the job results directory
        """
        head = len(self.resultdir) + 1     # remove the / inbetween
        return path[head:]
    def control_get(self):
        """Return the path of the current control file."""
        return self.control
    def control_set(self, control):
        """Set the current control file, normalized to an absolute path."""
        self.control = os.path.abspath(control)
    def harness_select(self, which, harness_args):
        """Switch the job to the named harness implementation."""
        self.harness = harness.select(which, self, harness_args)
    def config_set(self, name, value):
        """Store a value in the job's config under the given name."""
        self._config.set(name, value)
    def config_get(self, name):
        """Retrieve a value from the job's config by name."""
        return self._config.get(name)
    def setup_dirs(self, results_dir, tmp_dir):
        """Prepare (and create if needed) build results/tmp directories.

        @param results_dir: desired results dir, or falsy to auto-name one
                under self.resultdir.
        @param tmp_dir: desired tmp dir, or falsy to use <tmpdir>/build.
        @return: (results_dir, tmp_dir) tuple of the directories to use.
        """
        if not tmp_dir:
            tmp_dir = os.path.join(self.tmpdir, 'build')
        if not os.path.exists(tmp_dir):
            os.mkdir(tmp_dir)
        if not os.path.isdir(tmp_dir):
            e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir
            raise ValueError(e_msg)

        # We label the first build "build" and then subsequent ones
        # as "build.2", "build.3", etc. Whilst this is a little bit
        # inconsistent, 99.9% of jobs will only have one build
        # (that's not done as kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's compromises.
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)
    def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
                            kjob = None ):
        """Summon a xen object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'xen'
        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
                       leave, kjob)
    def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
        """Summon a kernel object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'linux'
        return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir,
                                  build_dir, leave)
    def barrier(self, *args, **kwds):
        """Create a barrier object"""
        return barrier.barrier(*args, **kwds)
    def install_pkg(self, name, pkg_type, install_dir):
        """
        This method is a simple wrapper around the actual package
        installation method in the Packager class. This is used
        internally by the profilers, deps and tests code.

        name : name of the package (ex: sleeptest, dbench etc.)
        pkg_type : Type of the package (ex: test, dep etc.)
        install_dir : The directory in which the source is actually
                      untarred into. (ex: client/profilers/<name> for profilers)
        """
        # silently does nothing when no package repositories are configured
        if self.pkgmgr.repositories:
            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)
    def add_repository(self, repo_urls):
        """
        Adds the repository locations to the job so that packages
        can be fetched from them when needed. The repository list
        needs to be a string list
        Ex: job.add_repository(['http://blah1','http://blah2'])
        """
        for repo_url in repo_urls:
            self.pkgmgr.add_repository(repo_url)

        # Fetch the packages' checksum file that contains the checksums
        # of all the packages if it is not already fetched. The checksum
        # is always fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              base_packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(base_packages.CHECKSUM_FILE,
                                  checksum_file_path, use_checksum=False)
        except error.PackageFetchError:
            # packaging system might not be working in this case
            # Silently fall back to the normal case
            pass
    def require_gcc(self):
        """
        Test whether gcc is installed on the machine.

        @raise NotAvailableError: if gcc cannot be found in PATH.
        """
        # check if gcc is installed on the system.
        try:
            utils.system('which gcc')
        except error.CmdError, e:
            raise NotAvailableError('gcc is required by this job and is '
                                    'not available on the system')
    def setup_dep(self, deps):
        """Set up the dependencies for this test.

        deps is a list of libraries required for this test.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except error.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            os.chdir(dep_dir)
            # execfile runs the dep's build script; it returns None on
            # completion, so success is logged whenever it finishes without
            # raising
            if execfile('%s.py' % dep, {}) is None:
                logging.info('Dependency %s successfuly built', dep)
    def _runtest(self, url, tag, timeout, args, dargs):
        """Run the test identified by url in a forked subprocess.

        @param timeout: seconds to wait for the forked child before timing
                it out; a falsy value waits indefinitely.
        """
        try:
            l = lambda : test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)

            if timeout:
                logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
                parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
            else:
                parallel.fork_waitfor(self.resultdir, pid)

        except error.TestBaseException:
            # These are already classified with an error type (exit_status)
            raise
        except error.JobError:
            raise  # Caught further up and turned into an ABORT.
        except Exception, e:
            # Converts all other exceptions thrown by the test regardless
            # of phase into a TestError(TestBaseException) subclass that
            # reports them with their full stack trace.
            raise error.UnhandledTestError(e)
    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.
        @param timeout An optional keyword argument: seconds allowed for the
            forked test before it is timed out (popped from dargs here).

        @returns True if the test passes, False otherwise.
        """
        group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        outputdir = self._make_test_outputdir(subdir)

        # timeout is consumed here so it is not passed through to the test
        timeout = dargs.pop('timeout', None)
        if timeout:
            logging.debug('Test has timeout: %d sec.', timeout)

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)
        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, timeout, args, dargs)
            except error.TestBaseException, detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.
    def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        timeout:
                if set, recorded in the START entry's optional fields
        *args:
                arguments for the function

        Returns the result of the passed in function
        """

        try:
            optional_fields = None
            if timeout:
                optional_fields = {}
                optional_fields['timeout'] = timeout
            self.record('START', subdir, testname,
                        optional_fields=optional_fields)

            # mark the group in persistent state so an unexpected reboot
            # mid-group can be detected on continuation
            self._state.set('client', 'unexpected_reboot', (subdir, testname))
            try:
                result = function(*args, **dargs)
                self.record('END GOOD', subdir, testname)
                return result
            except error.TestBaseException, e:
                self.record('END %s' % e.exit_status, subdir, testname)
                raise
            except error.JobError, e:
                self.record('END ABORT', subdir, testname)
                raise
            except Exception, e:
                # This should only ever happen due to a bug in the given
                # function's code.  The common case of being called by
                # run_test() will never reach this.  If a control file called
                # run_group() itself, bugs in its function will be caught
                # here.
                err_msg = str(e) + '\n' + traceback.format_exc()
                self.record('END ERROR', subdir, testname, err_msg)
                raise
        finally:
            self._state.discard('client', 'unexpected_reboot')
    def run_group(self, function, tag=None, **dargs):
        """
        Run a function nested within a group level.

        function:
                Callable to run.
        tag:
                An optional tag name for the group.  If None (default)
                function.__name__ will be used.
        **dargs:
                Named arguments for the function.
        """
        if tag:
            name = tag
        else:
            name = function.__name__

        try:
            return self._rungroup(subdir=None, testname=name,
                                  function=function, timeout=None, **dargs)
        except (SystemExit, error.TestBaseException):
            raise
        # If there was a different exception, turn it into a TestError.
        # It will be caught by step_engine or _run_step_fn.
        except Exception, e:
            raise error.UnhandledTestError(e)
    def cpu_count(self):
        """Return the total number of CPUs on the system."""
        return utils.count_cpus()  # use total system count
    def start_reboot(self):
        """Record the start of a reboot sequence in the status log."""
        self.record('START', None, 'reboot')
        self.record('GOOD', None, 'reboot.start')
    def _record_reboot_failure(self, subdir, operation, status,
                               running_id=None):
        """Record an aborted reboot, tagging the END entry with the kernel.

        @param running_id: kernel ident; detected from the running OS when
                not given.
        """
        self.record("ABORT", subdir, operation, status)
        if not running_id:
            running_id = utils.running_os_ident()
        kernel = {"kernel": running_id.split("::")[0]}
        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)
    def _check_post_reboot(self, subdir, running_id=None):
        """
        Function to perform post boot checks such as if the system configuration
        has changed across reboots (specifically, CPUs and partitions).

        @param subdir: The subdir to use in the job.record call.
        @param running_id: An optional running_id to include in the reboot
            failure log message

        @raise JobError: Raised if the current configuration does not match the
            pre-reboot configuration.
        """
        # check to see if any partitions have changed
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = partition_lib.get_mount_info(partition_list)
        # compared against the values saved by reboot_setup()
        old_mount_info = self._state.get('client', 'mount_info')
        if mount_info != old_mount_info:
            new_entries = mount_info - old_mount_info
            old_entries = old_mount_info - mount_info
            description = ("mounted partitions are different after reboot "
                           "(old entries: %s, new entries: %s)" %
                           (old_entries, new_entries))
            self._record_reboot_failure(subdir, "reboot.verify_config",
                                        description, running_id=running_id)
            raise error.JobError("Reboot failed: %s" % description)

        # check to see if any CPUs have changed
        cpu_count = utils.count_cpus()
        old_count = self._state.get('client', 'cpu_count')
        if cpu_count != old_count:
            description = ('Number of CPUs changed after reboot '
                           '(old count: %d, new count: %d)' %
                           (old_count, cpu_count))
            self._record_reboot_failure(subdir, 'reboot.verify_config',
                                        description, running_id=running_id)
            raise error.JobError('Reboot failed: %s' % description)
    def end_reboot(self, subdir, kernel, patches, running_id=None):
        """Verify the post-reboot state and record the end of the reboot."""
        self._check_post_reboot(subdir, running_id=running_id)

        # strip ::<timestamp> from the kernel version if present
        kernel = kernel.split("::")[0]
        kernel_info = {"kernel": kernel}
        for i, patch in enumerate(patches):
            kernel_info["patch%d" % i] = patch
        self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info)
    def end_reboot_and_verify(self, expected_when, expected_id, subdir,
                              type='src', patches=[]):
        """ Check the passed kernel identifier against the command line
            and the running kernel, abort the job on mismatch. """
        # NOTE(review): patches=[] is a mutable default; it is only iterated
        # (in end_reboot), never mutated, so it is harmless here.

        logging.info("POST BOOT: checking booted kernel "
                     "mark=%d identity='%s' type='%s'",
                     expected_when, expected_id, type)

        running_id = utils.running_os_ident()

        cmdline = utils.read_one_line("/proc/cmdline")

        # the boot mark is embedded on the command line as IDENT=<n>
        find_sum = re.compile(r'.*IDENT=(\d+)')
        m = find_sum.match(cmdline)
        cmdline_when = -1
        if m:
            cmdline_when = int(m.groups()[0])

        # We have all the facts, see if they indicate we
        # booted the requested kernel or not.
        bad = False
        if (type == 'src' and expected_id != running_id or
            type == 'rpm' and
            not running_id.startswith(expected_id + '::')):
            logging.error("Kernel identifier mismatch")
            bad = True
        if expected_when != cmdline_when:
            logging.error("Kernel command line mismatch")
            bad = True

        if bad:
            logging.error(" Expected Ident: " + expected_id)
            logging.error(" Running Ident: " + running_id)
            logging.error(" Expected Mark: %d", expected_when)
            logging.error("Command Line Mark: %d", cmdline_when)
            logging.error(" Command Line: " + cmdline)

            self._record_reboot_failure(subdir, "reboot.verify", "boot failure",
                                        running_id=running_id)
            raise error.JobError("Reboot returned with the wrong kernel")

        self.record('GOOD', subdir, 'reboot.verify',
                    utils.running_os_full_version())
        self.end_reboot(subdir, expected_id, patches, running_id=running_id)
    def partition(self, device, loop_size=0, mountpoint=None):
        """
        Work with a machine partition

        @param device: e.g. /dev/sda2, /dev/sdb1 etc...
        @param mountpoint: Specify a directory to mount to. If not specified
            autotest tmp directory will be used.
        @param loop_size: Size of loopback device (in MB). Defaults to 0.

        @return: A L{client.bin.partition.partition} object
        """

        if not mountpoint:
            mountpoint = self.tmpdir
        return partition_lib.partition(self, device, loop_size, mountpoint)
    @utils.deprecated
    def filesystem(self, device, mountpoint=None, loop_size=0):
        """ Same as partition

        @deprecated: Use partition method instead
        """
        return self.partition(device, loop_size, mountpoint)
    def enable_external_logging(self):
        """No-op in the base client job; hook for subclasses."""
        pass
    def disable_external_logging(self):
        """No-op in the base client job; hook for subclasses."""
        pass
    def reboot_setup(self):
        # save the partition list and mount points, as well as the cpu count,
        # so _check_post_reboot can verify the configuration is unchanged
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = partition_lib.get_mount_info(partition_list)
        self._state.set('client', 'mount_info', mount_info)
        self._state.set('client', 'cpu_count', utils.count_cpus())
    def reboot(self, tag=LAST_BOOT_TAG):
        """Reboot the machine into the kernel identified by tag.

        The default LAST_BOOT_TAG sentinel reuses the previously booted tag.
        Does not return: ends by raising JobContinue via self.quit().
        """
        if tag == LAST_BOOT_TAG:
            tag = self.last_boot_tag
        else:
            self.last_boot_tag = tag

        self.reboot_setup()
        self.harness.run_reboot()
        default = self.config_get('boot.set_default')
        if default:
            self.bootloader.set_default(tag)
        else:
            self.bootloader.boot_once(tag)

        # HACK: using this as a module sometimes hangs shutdown, so if it's
        # installed unload it first
        utils.system("modprobe -r netconsole", ignore_status=True)

        # sync first, so that a sync during shutdown doesn't time out
        utils.system("sync; sync", ignore_status=True)

        utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
        self.quit()
844 def noop(self, text):
845 logging.info("job: noop: " + text)
    @_run_test_complete_on_exit
    def parallel(self, *tasklist):
        """Run tasks in parallel"""

        pids = []
        # each subtask logs to '<status file>.<i>'; merged back in below
        old_log_filename = self._logger.global_filename
        for i, task in enumerate(tasklist):
            assert isinstance(task, (tuple, list))
            self._logger.global_filename = old_log_filename + (".%d" % i)
            def task_func():
                # stub out _record_indent with a process-local one
                base_record_indent = self._record_indent
                proc_local = self._job_state.property_factory(
                    '_state', '_record_indent.%d' % os.getpid(),
                    base_record_indent, namespace='client')
                self.__class__._record_indent = proc_local
                task[0](*task[1:])
            # task_func is forked immediately, so the closure over 'task'
            # is bound before the loop variable changes
            pids.append(parallel.fork_start(self.resultdir, task_func))

        old_log_path = os.path.join(self.resultdir, old_log_filename)
        old_log = open(old_log_path, "a")
        exceptions = []
        for i, pid in enumerate(pids):
            # wait for the task to finish
            try:
                parallel.fork_waitfor(self.resultdir, pid)
            except Exception, e:
                exceptions.append(e)
            # copy the logs from the subtask into the main log
            new_log_path = old_log_path + (".%d" % i)
            if os.path.exists(new_log_path):
                new_log = open(new_log_path)
                old_log.write(new_log.read())
                new_log.close()
                old_log.flush()
                os.remove(new_log_path)
        old_log.close()

        self._logger.global_filename = old_log_filename

        # handle any exceptions raised by the parallel tasks
        if exceptions:
            msg = "%d task(s) failed in job.parallel" % len(exceptions)
            raise error.JobError(msg)
    def quit(self):
        """Pause the harness and exit the control file via JobContinue."""
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")
    def complete(self, status):
        """Write pending TAP reports, clean up, and exit"""
        # write out TAP reports
        if self._tap.do_tap_report:
            self._tap.write()
            self._tap._write_tap_archive()

        # We are about to exit 'complete' so clean up the control file.
        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
        shutil.move(self._state_file, dest)

        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)
    def _load_state(self):
        # grab any initial state and set up $CONTROL.state as the backing file
        init_state_file = self.control + '.init.state'
        self._state_file = self.control + '.state'
        if os.path.exists(init_state_file):
            shutil.move(init_state_file, self._state_file)
        self._state.set_backing_file(self._state_file)

        # initialize the state engine, if necessary
        has_steps = self._state.has('client', 'steps')
        if not self._is_continuation and has_steps:
            raise RuntimeError('Loaded state can only contain client.steps if '
                               'this is a continuation')

        if not has_steps:
            logging.info('Initializing the state engine')
            self._state.set('client', 'steps', [])
    def handle_persistent_option(self, options, option_name):
        """
        Select option from command line or persistent state.
        Store selected option to allow standalone client to continue
        after reboot with previously selected options.
        Priority:
        1. explicitly specified via command line
        2. stored in state file (if continuing job '-c')
        3. default == None
        """
        option = None
        cmd_line_option = getattr(options, option_name)
        if cmd_line_option:
            option = cmd_line_option
            self._state.set('client', option_name, option)
        else:
            stored_option = self._state.get('client', option_name, None)
            if stored_option:
                option = stored_option
        logging.debug('Persistent option %s now set to %s', option_name, option)
        return option
    def __create_step_tuple(self, fn, args, dargs):
        """Normalize a step definition into an (ancestry, name, args, dargs)
        tuple suitable for pickling into the persistent step list."""
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name.  Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self._current_step_ancestry)
        return (ancestry, fn, args, dargs)
    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self._state.get('client', 'steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self._state.set('client', 'steps', steps)
    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self._state.get('client', 'steps')
        steps.insert(self._next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)
    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self._state.get('client', 'steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        # keep the insertion point for next_step() past the new head
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)
    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""

        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        try:
            # the step function only exists by name inside local_vars,
            # hence the exec-based call
            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
            return local_vars['__ret']
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, fn, str(detail))
        except Exception, detail:
            raise error.UnhandledJobError(detail)
    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at end end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""

        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)
1058 def _add_step_init(self, local_vars, current_function):
1059 """If the function returned a dictionary that includes a
1060 function named 'step_init', prepend it to our list of steps.
1061 This will only get run the first time a function with a nested
1062 use of the step engine is run."""
1064 if (isinstance(local_vars, dict) and
1065 'step_init' in local_vars and
1066 callable(local_vars['step_init'])):
1067 # The init step is a child of the function
1068 # we were just running.
1069 self._current_step_ancestry.append(current_function)
1070 self.next_step_prepend('step_init')
    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
        """
        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self,
                               'args': self.args}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            # Control file declared itself not applicable: record it and
            # fall through to run whatever steps are already queued.
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)
        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])
        else:
            # if last job failed due to unexpected reboot, record it as fail
            # so harness gets called
            last_job = self._state.get('client', 'unexpected_reboot', None)
            if last_job:
                subdir, testname = last_job
                self.record('FAIL', subdir, testname, 'unexpected reboot')
                self.record('END FAIL', subdir, testname)
        # Iterate through the steps. If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self._state.get('client', 'steps')) > 0:
            # Pop the head step and persist the remainder before running
            # it, so after a reboot the job resumes with the next step.
            steps = self._state.get('client', 'steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            self._state.set('client', 'steps', steps)
            # Steps queued by the running step insert starting at index 0.
            self._next_step_index = 0
            # Rebuild the step's defining environment, run it, and queue
            # any nested step_init it returned.
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self._current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)
1125 def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
1126 self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
1127 on_every_test)
1130 def add_sysinfo_logfile(self, file, on_every_test=False):
1131 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
1134 def _add_sysinfo_loggable(self, loggable, on_every_test):
1135 if on_every_test:
1136 self.sysinfo.test_loggables.add(loggable)
1137 else:
1138 self.sysinfo.boot_loggables.add(loggable)
1139 self._save_sysinfo_state()
1142 def _load_sysinfo_state(self):
1143 state = self._state.get('client', 'sysinfo', None)
1144 if state:
1145 self.sysinfo.deserialize(state)
1148 def _save_sysinfo_state(self):
1149 state = self.sysinfo.serialize()
1150 self._state.set('client', 'sysinfo', state)
class disk_usage_monitor:
    """Monitors free disk space on a device between start() and stop()
    and calls logging_func with a warning when space was consumed
    faster than max_mb_per_hour. A false/zero max_mb_per_hour disables
    the warning entirely."""
    def __init__(self, logging_func, device, max_mb_per_hour):
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        """Snapshot free space and the current time; call before the
        monitored work begins."""
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        """Measure usage since start() and warn via the logging function
        if the consumption rate exceeded max_mb_per_hour."""
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    # Fix: propagate the wrapped function's return value
                    # (it was previously discarded by the wrapper).
                    return func(*args, **dargs)
                finally:
                    monitor.stop()
            # Preserve identity for logging/introspection, following the
            # same convention as _run_test_complete_on_exit above.
            watched_func.__name__ = func.__name__
            watched_func.__doc__ = func.__doc__
            return watched_func
        return decorator
def runjob(control, drop_caches, options):
    """
    Run a job using the given control file.

    This is the main interface to this module.

    @param control: path to the control file to execute.
    @param drop_caches: whether to drop OS caches between tests.
    @param options: parsed command-line options (uses options.cont).

    @see base_job.__init__ for parameter info.
    """
    control = os.path.abspath(control)
    state = control + '.state'
    # Ensure state file is cleaned up before the job starts to run if autotest
    # is not running with the --continue flag
    if not options.cont and os.path.isfile(state):
        logging.debug('Cleaning up previously found state file')
        os.remove(state)

    # instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid
        if not os.path.exists(control):
            raise error.JobError(control + ": control file not found")

        # When continuing, the job is complete when there is no
        # state file, ensure we don't try and continue.
        if options.cont and not os.path.exists(state):
            raise error.JobComplete("all done")

        myjob = job(control=control, drop_caches=drop_caches, options=options)

        # Load in the users control file, may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except error.JobContinue:
        # Exit code 5 signals the caller to re-invoke and continue the job.
        sys.exit(5)

    except error.JobComplete:
        sys.exit(1)

    except error.JobError, instance:
        logging.error("JOB ERROR: " + str(instance))
        if myjob:
            # The optional second JobError arg names the failing command.
            command = None
            if len(instance.args) > 1:
                command = instance.args[1]
            myjob.record('ABORT', None, command, str(instance))
            myjob.record('END ABORT', None, None, str(instance))
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    except Exception, e:
        # NOTE: job._run_step_fn and job.step_engine will turn things into
        # a JobError for us. If we get here, its likely an autotest bug.
        msg = str(e) + '\n' + traceback.format_exc()
        logging.critical("JOB ERROR (autotest bug?): " + msg)
        if myjob:
            myjob.record('END ABORT', None, None, msg)
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob.record('END GOOD', None, None)
    assert myjob._record_indent == 0

    myjob.complete(0)
# Allow a site-specific module to customize the client job class.
# import_site_class looks for autotest_lib.client.bin.site_job; the final
# argument base_client_job is presumably the fallback when no site module
# exists -- TODO confirm against utils.import_site_class.
site_job = utils.import_site_class(
    __file__, "autotest_lib.client.bin.site_job", "site_job", base_client_job)
class job(site_job):
    """Concrete client job class used by runjob(): identical to
    site_job, so any site-specific customizations are inherited
    unchanged."""
    pass