"""The main job wrapper

This is the core infrastructure.

Copyright Andy Whitcroft, Martin J. Bligh 2006
"""
import copy, os, platform, re, shutil, sys, time, traceback, types, glob
import logging, getpass, errno, weakref
import functools
import cPickle as pickle
from autotest_lib.client.bin import client_logging_config
from autotest_lib.client.bin import utils, parallel, kernel, xen
from autotest_lib.client.bin import profilers, boottool, harness
from autotest_lib.client.bin import config, sysinfo, test, local_host
from autotest_lib.client.bin import partition as partition_lib
from autotest_lib.client.common_lib import base_job
from autotest_lib.client.common_lib import error, barrier, log, logging_manager
from autotest_lib.client.common_lib import base_packages, packages
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.tools import html_report
# Sentinel meaning "reboot into whatever kernel booted last time".
LAST_BOOT_TAG = object()

# Preamble exec()'d into every control file's namespace by step_engine so
# control files can use the error and utils helpers without importing them.
JOB_PREAMBLE = """
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.bin.utils import *
"""
class StepError(error.AutotestError):
    """Raised when a step-engine step is malformed (e.g. not callable)."""
    pass
class NotAvailableError(error.AutotestError):
    """Raised when a required tool (e.g. gcc) is missing on the machine."""
    pass
38 def _run_test_complete_on_exit(f
):
39 """Decorator for job methods that automatically calls
40 self.harness.run_test_complete when the method exits, if appropriate."""
41 def wrapped(self
, *args
, **dargs
):
43 return f(self
, *args
, **dargs
)
45 if self
._logger
.global_filename
== 'status':
46 self
.harness
.run_test_complete()
48 logging
.debug("Dropping caches")
50 wrapped
.__name
__ = f
.__name
__
51 wrapped
.__doc
__ = f
.__doc
__
52 wrapped
.__dict
__.update(f
.__dict
__)
class status_indenter(base_job.status_indenter):
    """Provide a status indenter that is backed by job._record_prefix."""
    # NOTE(review): the def/@property lines were reconstructed from the
    # visible bodies; confirm against upstream autotest job.py.

    def __init__(self, job):
        # weakref.proxy avoids a circular job <-> indenter reference
        self.job = weakref.proxy(job)

    @property
    def indent(self):
        return self.job._record_indent

    def increment(self):
        self.job._record_indent += 1

    def decrement(self):
        self.job._record_indent -= 1
class base_client_job(base_job.base_job):
    """The client-side concrete implementation of base_job.

    Optional properties provided by this implementation:
        control
        harness
    """

    # seconds to wait after toggling a warning type, so the change takes
    # effect before/after the surrounding test activity
    _WARNING_DISABLE_DELAY = 5

    # _record_indent is a persistent property, but only on the client
    _job_state = base_job.base_job._job_state
    _record_indent = _job_state.property_factory(
        '_state', '_record_indent', 0, namespace='client')
    _max_disk_usage_rate = _job_state.property_factory(
        '_state', '_max_disk_usage_rate', 0.0, namespace='client')
94 def __init__(self
, control
, options
, drop_caches
=True,
95 extra_copy_cmdline
=None):
97 Prepare a client side job object.
99 @param control: The control file (pathname of).
100 @param options: an object which includes:
101 jobtag: The job tag string (eg "default").
102 cont: If this is the continuation of this job.
103 harness_type: An alternative server harness. [None]
104 use_external_logging: If true, the enable_external_logging
105 method will be called during construction. [False]
106 @param drop_caches: If true, utils.drop_caches() is called before and
107 between all tests. [True]
108 @param extra_copy_cmdline: list of additional /proc/cmdline arguments to
109 copy from the running kernel to all the installed kernels with
112 super(base_client_job
, self
).__init
__(options
=options
)
113 self
._pre
_record
_init
(control
, options
)
115 self
._post
_record
_init
(control
, options
, drop_caches
,
117 except Exception, err
:
119 'ABORT', None, None,'client.bin.job.__init__ failed: %s' %
125 def _get_environ_autodir(cls
):
126 return os
.environ
['AUTODIR']
130 def _find_base_directories(cls
):
132 Determine locations of autodir and clientdir (which are the same)
133 using os.environ. Serverdir does not exist in this context.
135 autodir
= clientdir
= cls
._get
_environ
_autodir
()
136 return autodir
, clientdir
, None
140 def _parse_args(cls
, args
):
141 return re
.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args
)
144 def _find_resultdir(self
, options
):
146 Determine the directory for storing results. On a client this is
147 always <autodir>/results/<tag>, where tag is passed in on the command
150 return os
.path
.join(self
.autodir
, 'results', options
.tag
)
153 def _get_status_logger(self
):
154 """Return a reference to the status logger."""
158 def _pre_record_init(self
, control
, options
):
160 Initialization function that should peform ONLY the required
161 setup so that the self.record() method works.
163 As of now self.record() needs self.resultdir, self._group_level,
164 self.harness and of course self._logger.
167 self
._cleanup
_debugdir
_files
()
168 self
._cleanup
_results
_dir
()
170 logging_manager
.configure_logging(
171 client_logging_config
.ClientLoggingConfig(),
172 results_dir
=self
.resultdir
,
173 verbose
=options
.verbose
)
174 logging
.info('Writing results to %s', self
.resultdir
)
176 # init_group_level needs the state
177 self
.control
= os
.path
.realpath(control
)
178 self
._is
_continuation
= options
.cont
179 self
._current
_step
_ancestry
= []
180 self
._next
_step
_index
= 0
183 _harness
= self
.handle_persistent_option(options
, 'harness')
184 _harness_args
= self
.handle_persistent_option(options
, 'harness_args')
186 self
.harness
= harness
.select(_harness
, self
, _harness_args
)
188 # set up the status logger
189 def client_job_record_hook(entry
):
191 if '.' in self
._logger
.global_filename
:
192 msg_tag
= self
._logger
.global_filename
.split('.', 1)[1]
193 # send the entry to the job harness
194 message
= '\n'.join([entry
.message
] + entry
.extra_message_lines
)
195 rendered_entry
= self
._logger
.render_entry(entry
)
196 self
.harness
.test_status_detail(entry
.status_code
, entry
.subdir
,
197 entry
.operation
, message
, msg_tag
,
199 self
.harness
.test_status(rendered_entry
, msg_tag
)
200 # send the entry to stdout, if it's enabled
201 logging
.info(rendered_entry
)
202 self
._logger
= base_job
.status_logger(
203 self
, status_indenter(self
), record_hook
=client_job_record_hook
,
204 tap_writer
=self
._tap
)
206 def _post_record_init(self
, control
, options
, drop_caches
,
209 Perform job initialization not required by self.record().
211 self
._init
_drop
_caches
(drop_caches
)
213 self
._init
_packages
()
215 self
.sysinfo
= sysinfo
.sysinfo(self
.resultdir
)
216 self
._load
_sysinfo
_state
()
219 download
= os
.path
.join(self
.testdir
, 'download')
220 if not os
.path
.exists(download
):
223 shutil
.copyfile(self
.control
,
224 os
.path
.join(self
.resultdir
, 'control'))
226 self
.control
= control
228 self
.logging
= logging_manager
.get_logging_manager(
229 manage_stdout_and_stderr
=True, redirect_fds
=True)
230 self
.logging
.start_logging()
232 self
._config
= config
.config(self
)
233 self
.profilers
= profilers
.profilers(self
)
235 self
._init
_bootloader
()
237 self
.machines
= [options
.hostname
]
238 self
.hosts
= set([local_host
.LocalHost(hostname
=options
.hostname
,
239 bootloader
=self
.bootloader
)])
243 self
.args
= self
._parse
_args
(options
.args
)
246 self
.user
= options
.user
248 self
.user
= getpass
.getuser()
250 self
.sysinfo
.log_per_reboot_data()
253 self
.record('START', None, None)
255 self
.harness
.run_start()
258 self
.enable_external_logging()
260 self
._init
_cmdline
(extra_copy_cmdline
)
262 self
.num_tests_run
= None
263 self
.num_tests_failed
= None
265 self
.warning_loggers
= None
266 self
.warning_manager
= None
def _init_drop_caches(self, drop_caches):
    """
    Perform the drop caches initialization.
    """
    # between-iteration behaviour comes from the global config; the
    # between-test behaviour comes from the constructor argument
    self.drop_caches_between_iterations = (
            global_config.global_config.get_config_value('CLIENT',
                'drop_caches_between_iterations', type=bool, default=True))
    self.drop_caches = drop_caches
    if self.drop_caches:
        logging.debug("Dropping caches")
        utils.drop_caches()
def _init_bootloader(self):
    """
    Perform boottool initialization.
    """
    executable = self.config_get('boottool.executable')
    self.bootloader = boottool.boottool(executable)
def _init_packages(self):
    """
    Perform the packages support initialization.
    """
    # one-hour timeout for any package-manager run_function invocation
    self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':3600})
def _init_cmdline(self, extra_copy_cmdline):
    """
    Initialize default cmdline for booted kernels in this job.
    """
    copy_cmdline = set(['console'])
    if extra_copy_cmdline is not None:
        copy_cmdline.update(extra_copy_cmdline)

    # extract console= and other args from cmdline and add them into the
    # base args that we use for all kernels we install
    cmdline = utils.read_one_line('/proc/cmdline')
    kernel_args = []
    for karg in cmdline.split():
        for param in copy_cmdline:
            # match "param" exactly or a "param=value" form
            if karg.startswith(param) and \
                    (len(param) == len(karg) or karg[len(param)] == '='):
                kernel_args.append(karg)
    self.config_set('boot.default_args', ' '.join(kernel_args))
319 def _cleanup_results_dir(self
):
320 """Delete everything in resultsdir"""
321 assert os
.path
.exists(self
.resultdir
)
322 list_files
= glob
.glob('%s/*' % self
.resultdir
)
326 elif os
.path
.isfile(f
):
def _cleanup_debugdir_files(self):
    """
    Delete any leftover debugdir files
    """
    for leftover in glob.glob("/tmp/autotest_results_dir.*"):
        os.remove(leftover)
def disable_warnings(self, warning_type):
    """Record that warning_type warnings are disabled, then pause briefly
    so the change takes effect before the next test activity."""
    self.record("INFO", None, None,
                "disabling %s warnings" % warning_type,
                {"warnings.disable": warning_type})
    time.sleep(self._WARNING_DISABLE_DELAY)
def enable_warnings(self, warning_type):
    """Pause briefly, then record that warning_type warnings are enabled
    again (mirror image of disable_warnings)."""
    time.sleep(self._WARNING_DISABLE_DELAY)
    self.record("INFO", None, None,
                "enabling %s warnings" % warning_type,
                {"warnings.enable": warning_type})
def monitor_disk_usage(self, max_rate):
    """
    Signal that the job should monitor disk space usage on /
    and generate a warning if a test uses up disk space at a
    rate exceeding 'max_rate'.

    @param max_rate: the maximum allowed rate of disk consumption
        during a test, in MB/hour, or 0 to indicate no limit.
    """
    self._max_disk_usage_rate = max_rate
def relative_path(self, path):
    """
    Return a path relative to the job results directory
    """
    prefix_len = len(self.resultdir) + 1     # remove the / inbetween
    return path[prefix_len:]
def control_get(self):
    # accessor for the path of the control file being run
    return self.control
def control_set(self, control):
    # always store the control file path in absolute form
    self.control = os.path.abspath(control)
def harness_select(self, which, harness_args):
    """Switch this job over to the named harness implementation."""
    self.harness = harness.select(which, self, harness_args)
def config_set(self, name, value):
    """Store a value under name in the job's config."""
    self._config.set(name, value)
def config_get(self, name):
    """Look up name in the job's config and return its value."""
    return self._config.get(name)
def setup_dirs(self, results_dir, tmp_dir):
    """Resolve (and create if needed) the results and tmp build dirs.

    Empty arguments are replaced by defaults under self.tmpdir and
    self.resultdir respectively. Returns (results_dir, tmp_dir).
    """
    if not tmp_dir:
        tmp_dir = os.path.join(self.tmpdir, 'build')
    if not os.path.exists(tmp_dir):
        os.mkdir(tmp_dir)
    if not os.path.isdir(tmp_dir):
        e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir
        raise ValueError(e_msg)

    # We label the first build "build" and then subsequent ones
    # as "build.2", "build.3", etc. Whilst this is a little bit
    # inconsistent, 99.9% of jobs will only have one build
    # (that's not done as kernbench, sparse, or buildtest),
    # so it works out much cleaner. One of life's comprimises.
    if not results_dir:
        results_dir = os.path.join(self.resultdir, 'build')
        i = 2
        while os.path.exists(results_dir):
            results_dir = os.path.join(self.resultdir, 'build.%d' % i)
            i += 1
    if not os.path.exists(results_dir):
        os.mkdir(results_dir)

    return (results_dir, tmp_dir)
421 def xen(self
, base_tree
, results_dir
= '', tmp_dir
= '', leave
= False, \
423 """Summon a xen object"""
424 (results_dir
, tmp_dir
) = self
.setup_dirs(results_dir
, tmp_dir
)
426 return xen
.xen(self
, base_tree
, results_dir
, tmp_dir
, build_dir
,
430 def kernel(self
, base_tree
, results_dir
= '', tmp_dir
= '', leave
= False):
431 """Summon a kernel object"""
432 (results_dir
, tmp_dir
) = self
.setup_dirs(results_dir
, tmp_dir
)
434 return kernel
.auto_kernel(self
, base_tree
, results_dir
, tmp_dir
,
def barrier(self, *args, **kwds):
    """Create a barrier object"""
    # NOTE: inside this method the name `barrier` resolves to the
    # module-level import, not this method, so the delegation is safe.
    return barrier.barrier(*args, **kwds)
def install_pkg(self, name, pkg_type, install_dir):
    """
    This method is a simple wrapper around the actual package
    installation method in the Packager class. This is used
    internally by the profilers, deps and tests code.

    name : name of the package (ex: sleeptest, dbench etc.)
    pkg_type : Type of the package (ex: test, dep etc.)
    install_dir : The directory in which the source is actually
                  untarred into. (ex: client/profilers/<name> for profilers)
    """
    # no repositories configured means there is nothing to fetch from
    if self.pkgmgr.repositories:
        self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)
def add_repository(self, repo_urls):
    """
    Adds the repository locations to the job so that packages
    can be fetched from them when needed. The repository list
    needs to be a string list
    Ex: job.add_repository(['http://blah1','http://blah2'])
    """
    for repo_url in repo_urls:
        self.pkgmgr.add_repository(repo_url)

    # Fetch the packages' checksum file that contains the checksums
    # of all the packages if it is not already fetched. The checksum
    # is always fetched whenever a job is first started. This
    # is not done in the job's constructor as we don't have the list of
    # the repositories there (and obviously don't care about this file
    # if we are not using the repos)
    try:
        checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                          base_packages.CHECKSUM_FILE)
        self.pkgmgr.fetch_pkg(base_packages.CHECKSUM_FILE,
                              checksum_file_path, use_checksum=False)
    except error.PackageFetchError:
        # packaging system might not be working in this case
        # Silently fall back to the normal case
        pass
def require_gcc(self):
    """
    Test whether gcc is installed on the machine.

    @raise NotAvailableError: if `which gcc` fails, i.e. gcc is not on PATH.
    """
    # check if gcc is installed on the system.
    try:
        utils.system('which gcc')
    # the bound exception variable was unused and the `except X, e` form is
    # Python-2-only syntax; a bare handler is equivalent and portable
    except error.CmdError:
        raise NotAvailableError('gcc is required by this job and is '
                                'not available on the system')
496 def setup_dep(self
, deps
):
497 """Set up the dependencies for this test.
498 deps is a list of libraries required for this test.
500 # Fetch the deps from the repositories and set them up.
502 dep_dir
= os
.path
.join(self
.autodir
, 'deps', dep
)
503 # Search for the dependency in the repositories if specified,
504 # else check locally.
506 self
.install_pkg(dep
, 'dep', dep_dir
)
507 except error
.PackageInstallError
:
508 # see if the dep is there locally
511 # dep_dir might not exist if it is not fetched from the repos
512 if not os
.path
.exists(dep_dir
):
513 raise error
.TestError("Dependency %s does not exist" % dep
)
516 if execfile('%s.py' % dep
, {}) is None:
517 logging
.info('Dependency %s successfuly built', dep
)
520 def _runtest(self
, url
, tag
, timeout
, args
, dargs
):
522 l
= lambda : test
.runtest(self
, url
, tag
, args
, dargs
)
523 pid
= parallel
.fork_start(self
.resultdir
, l
)
526 logging
.debug('Waiting for pid %d for %d seconds', pid
, timeout
)
527 parallel
.fork_waitfor_timed(self
.resultdir
, pid
, timeout
)
529 parallel
.fork_waitfor(self
.resultdir
, pid
)
531 except error
.TestBaseException
:
532 # These are already classified with an error type (exit_status)
534 except error
.JobError
:
535 raise # Caught further up and turned into an ABORT.
537 # Converts all other exceptions thrown by the test regardless
538 # of phase into a TestError(TestBaseException) subclass that
539 # reports them with their full stack trace.
540 raise error
.UnhandledTestError(e
)
543 def _run_test_base(self
, url
, *args
, **dargs
):
545 Prepares arguments and run functions to run_test and run_test_detail.
547 @param url A url that identifies the test to run.
548 @param tag An optional keyword argument that will be added to the
549 test and subdir name.
550 @param subdir_tag An optional keyword argument that will be added
554 subdir: Test subdirectory
556 group_func: Actual test run function
557 timeout: Test timeout
559 group
, testname
= self
.pkgmgr
.get_package_name(url
, 'test')
560 testname
, subdir
, tag
= self
._build
_tagged
_test
_name
(testname
, dargs
)
561 outputdir
= self
._make
_test
_outputdir
(subdir
)
563 timeout
= dargs
.pop('timeout', None)
565 logging
.debug('Test has timeout: %d sec.', timeout
)
567 def log_warning(reason
):
568 self
.record("WARN", subdir
, testname
, reason
)
569 @disk_usage_monitor.watch(log_warning
, "/", self
._max
_disk
_usage
_rate
)
572 self
._runtest
(url
, tag
, timeout
, args
, dargs
)
573 except error
.TestBaseException
, detail
:
574 # The error is already classified, record it properly.
575 self
.record(detail
.exit_status
, subdir
, testname
, str(detail
))
578 self
.record('GOOD', subdir
, testname
, 'completed successfully')
580 return (subdir
, testname
, group_func
, timeout
)
583 @_run_test_complete_on_exit
584 def run_test(self
, url
, *args
, **dargs
):
586 Summon a test object and run it.
588 @param url A url that identifies the test to run.
589 @param tag An optional keyword argument that will be added to the
590 test and subdir name.
591 @param subdir_tag An optional keyword argument that will be added
594 @returns True if the test passes, False otherwise.
596 (subdir
, testname
, group_func
, timeout
) = self
._run
_test
_base
(url
,
600 self
._rungroup
(subdir
, testname
, group_func
, timeout
)
602 except error
.TestBaseException
:
604 # Any other exception here will be given to the caller
606 # NOTE: The only exception possible from the control file here
607 # is error.JobError as _runtest() turns all others into an
608 # UnhandledTestError that is caught above.
611 @_run_test_complete_on_exit
612 def run_test_detail(self
, url
, *args
, **dargs
):
614 Summon a test object and run it, returning test status.
616 @param url A url that identifies the test to run.
617 @param tag An optional keyword argument that will be added to the
618 test and subdir name.
619 @param subdir_tag An optional keyword argument that will be added
623 @see: client/common_lib/error.py, exit_status
625 (subdir
, testname
, group_func
, timeout
) = self
._run
_test
_base
(url
,
629 self
._rungroup
(subdir
, testname
, group_func
, timeout
)
631 except error
.TestBaseException
, detail
:
632 return detail
.exit_status
635 def _rungroup(self
, subdir
, testname
, function
, timeout
, *args
, **dargs
):
640 name of the test to run, or support step
644 arguments for the function
646 Returns the result of the passed in function
650 optional_fields
= None
653 optional_fields
['timeout'] = timeout
654 self
.record('START', subdir
, testname
,
655 optional_fields
=optional_fields
)
657 self
._state
.set('client', 'unexpected_reboot', (subdir
, testname
))
659 result
= function(*args
, **dargs
)
660 self
.record('END GOOD', subdir
, testname
)
662 except error
.TestBaseException
, e
:
663 self
.record('END %s' % e
.exit_status
, subdir
, testname
)
665 except error
.JobError
, e
:
666 self
.record('END ABORT', subdir
, testname
)
669 # This should only ever happen due to a bug in the given
670 # function's code. The common case of being called by
671 # run_test() will never reach this. If a control file called
672 # run_group() itself, bugs in its function will be caught
674 err_msg
= str(e
) + '\n' + traceback
.format_exc()
675 self
.record('END ERROR', subdir
, testname
, err_msg
)
678 self
._state
.discard('client', 'unexpected_reboot')
681 def run_group(self
, function
, tag
=None, **dargs
):
683 Run a function nested within a group level.
688 An optional tag name for the group. If None (default)
689 function.__name__ will be used.
691 Named arguments for the function.
696 name
= function
.__name
__
699 return self
._rungroup
(subdir
=None, testname
=name
,
700 function
=function
, timeout
=None, **dargs
)
701 except (SystemExit, error
.TestBaseException
):
703 # If there was a different exception, turn it into a TestError.
704 # It will be caught by step_engine or _run_step_fn.
706 raise error
.UnhandledTestError(e
)
710 return utils
.count_cpus() # use total system count
def start_reboot(self):
    """Open a 'reboot' group in the status log and mark its start."""
    self.record('START', None, 'reboot')
    self.record('GOOD', None, 'reboot.start')
718 def _record_reboot_failure(self
, subdir
, operation
, status
,
720 self
.record("ABORT", subdir
, operation
, status
)
722 running_id
= utils
.running_os_ident()
723 kernel
= {"kernel": running_id
.split("::")[0]}
724 self
.record("END ABORT", subdir
, 'reboot', optional_fields
=kernel
)
def _check_post_reboot(self, subdir, running_id=None):
    """
    Function to perform post boot checks such as if the system configuration
    has changed across reboots (specifically, CPUs and partitions).

    @param subdir: The subdir to use in the job.record call.
    @param running_id: An optional running_id to include in the reboot
        failure log message

    @raise JobError: Raised if the current configuration does not match the
        pre-reboot configuration.
    """
    # check to see if any partitions have changed
    partition_list = partition_lib.get_partition_list(self,
                                                      exclude_swap=False)
    mount_info = partition_lib.get_mount_info(partition_list)
    old_mount_info = self._state.get('client', 'mount_info')
    if mount_info != old_mount_info:
        new_entries = mount_info - old_mount_info
        old_entries = old_mount_info - mount_info
        description = ("mounted partitions are different after reboot "
                       "(old entries: %s, new entries: %s)" %
                       (old_entries, new_entries))
        self._record_reboot_failure(subdir, "reboot.verify_config",
                                    description, running_id=running_id)
        raise error.JobError("Reboot failed: %s" % description)

    # check to see if any CPUs have changed
    cpu_count = utils.count_cpus()
    old_count = self._state.get('client', 'cpu_count')
    if cpu_count != old_count:
        description = ('Number of CPUs changed after reboot '
                       '(old count: %d, new count: %d)' %
                       (old_count, cpu_count))
        self._record_reboot_failure(subdir, 'reboot.verify_config',
                                    description, running_id=running_id)
        raise error.JobError('Reboot failed: %s' % description)
def end_reboot(self, subdir, kernel, patches, running_id=None):
    """Verify the post-reboot state and close the 'reboot' status group."""
    self._check_post_reboot(subdir, running_id=running_id)

    # strip ::<timestamp> from the kernel version if present
    kernel = kernel.split("::")[0]
    kernel_info = {"kernel": kernel}
    for i, patch in enumerate(patches):
        kernel_info["patch%d" % i] = patch
    self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info)
777 def end_reboot_and_verify(self
, expected_when
, expected_id
, subdir
,
778 type='src', patches
=[]):
779 """ Check the passed kernel identifier against the command line
780 and the running kernel, abort the job on missmatch. """
782 logging
.info("POST BOOT: checking booted kernel "
783 "mark=%d identity='%s' type='%s'",
784 expected_when
, expected_id
, type)
786 running_id
= utils
.running_os_ident()
788 cmdline
= utils
.read_one_line("/proc/cmdline")
790 find_sum
= re
.compile(r
'.*IDENT=(\d+)')
791 m
= find_sum
.match(cmdline
)
794 cmdline_when
= int(m
.groups()[0])
796 # We have all the facts, see if they indicate we
797 # booted the requested kernel or not.
799 if (type == 'src' and expected_id
!= running_id
or
801 not running_id
.startswith(expected_id
+ '::')):
802 logging
.error("Kernel identifier mismatch")
804 if expected_when
!= cmdline_when
:
805 logging
.error("Kernel command line mismatch")
809 logging
.error(" Expected Ident: " + expected_id
)
810 logging
.error(" Running Ident: " + running_id
)
811 logging
.error(" Expected Mark: %d", expected_when
)
812 logging
.error("Command Line Mark: %d", cmdline_when
)
813 logging
.error(" Command Line: " + cmdline
)
815 self
._record
_reboot
_failure
(subdir
, "reboot.verify", "boot failure",
816 running_id
=running_id
)
817 raise error
.JobError("Reboot returned with the wrong kernel")
819 self
.record('GOOD', subdir
, 'reboot.verify',
820 utils
.running_os_full_version())
821 self
.end_reboot(subdir
, expected_id
, patches
, running_id
=running_id
)
def partition(self, device, loop_size=0, mountpoint=None):
    """
    Work with a machine partition

    @param device: e.g. /dev/sda2, /dev/sdb1 etc...
    @param mountpoint: Specify a directory to mount to. If not specified
        autotest tmp directory will be used.
    @param loop_size: Size of loopback device (in MB). Defaults to 0.

    @return: A L{client.bin.partition.partition} object
    """
    if not mountpoint:
        mountpoint = self.tmpdir
    return partition_lib.partition(self, device, loop_size, mountpoint)
def filesystem(self, device, mountpoint=None, loop_size=0):
    """ Same as partition

    @deprecated: Use partition method instead
    """
    return self.partition(device, loop_size, mountpoint)
def enable_external_logging(self):
    # hook for subclasses; the base client job does no external logging
    pass
def disable_external_logging(self):
    # hook for subclasses; the base client job does no external logging
    pass
def reboot_setup(self):
    # save the partition list and mount points, as well as the cpu count,
    # so _check_post_reboot can compare against them after the reboot
    partition_list = partition_lib.get_partition_list(self,
                                                      exclude_swap=False)
    mount_info = partition_lib.get_mount_info(partition_list)
    self._state.set('client', 'mount_info', mount_info)
    self._state.set('client', 'cpu_count', utils.count_cpus())
866 def reboot(self
, tag
=LAST_BOOT_TAG
):
867 if tag
== LAST_BOOT_TAG
:
868 tag
= self
.last_boot_tag
870 self
.last_boot_tag
= tag
873 self
.harness
.run_reboot()
874 default
= self
.config_get('boot.set_default')
876 self
.bootloader
.set_default(tag
)
878 self
.bootloader
.boot_once(tag
)
880 # HACK: using this as a module sometimes hangs shutdown, so if it's
881 # installed unload it first
882 utils
.system("modprobe -r netconsole", ignore_status
=True)
884 # sync first, so that a sync during shutdown doesn't time out
885 utils
.system("sync; sync", ignore_status
=True)
887 utils
.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
def noop(self, text):
    """Do nothing except log the given text; useful as a trivial step."""
    logging.info("job: noop: " + text)
895 @_run_test_complete_on_exit
896 def parallel(self
, *tasklist
):
897 """Run tasks in parallel"""
900 old_log_filename
= self
._logger
.global_filename
901 for i
, task
in enumerate(tasklist
):
902 assert isinstance(task
, (tuple, list))
903 self
._logger
.global_filename
= old_log_filename
+ (".%d" % i
)
905 # stub out _record_indent with a process-local one
906 base_record_indent
= self
._record
_indent
907 proc_local
= self
._job
_state
.property_factory(
908 '_state', '_record_indent.%d' % os
.getpid(),
909 base_record_indent
, namespace
='client')
910 self
.__class
__._record
_indent
= proc_local
912 pids
.append(parallel
.fork_start(self
.resultdir
, task_func
))
914 old_log_path
= os
.path
.join(self
.resultdir
, old_log_filename
)
915 old_log
= open(old_log_path
, "a")
917 for i
, pid
in enumerate(pids
):
918 # wait for the task to finish
920 parallel
.fork_waitfor(self
.resultdir
, pid
)
923 # copy the logs from the subtask into the main log
924 new_log_path
= old_log_path
+ (".%d" % i
)
925 if os
.path
.exists(new_log_path
):
926 new_log
= open(new_log_path
)
927 old_log
.write(new_log
.read())
930 os
.remove(new_log_path
)
933 self
._logger
.global_filename
= old_log_filename
935 # handle any exceptions raised by the parallel tasks
937 msg
= "%d task(s) failed in job.parallel" % len(exceptions
)
938 raise error
.JobError(msg
)
942 # XXX: should have a better name.
943 self
.harness
.run_pause()
944 raise error
.JobContinue("more to come")
947 def complete(self
, status
):
948 """Write pending TAP reports, clean up, and exit"""
949 # write out TAP reports
950 if self
._tap
.do_tap_report
:
952 self
._tap
._write
_tap
_archive
()
954 # write out a job HTML report
956 html_report
.create_report(self
.resultdir
)
958 logging
.error("Error writing job HTML report: %s", e
)
960 # We are about to exit 'complete' so clean up the control file.
961 dest
= os
.path
.join(self
.resultdir
, os
.path
.basename(self
._state
_file
))
962 shutil
.move(self
._state
_file
, dest
)
964 self
.harness
.run_complete()
965 self
.disable_external_logging()
def _load_state(self):
    # grab any initial state and set up $CONTROL.state as the backing file
    init_state_file = self.control + '.init.state'
    self._state_file = self.control + '.state'
    if os.path.exists(init_state_file):
        shutil.move(init_state_file, self._state_file)
    self._state.set_backing_file(self._state_file)

    # initialize the state engine, if necessary
    has_steps = self._state.has('client', 'steps')
    if not self._is_continuation and has_steps:
        raise RuntimeError('Loaded state can only contain client.steps if '
                           'this is a continuation')

    if not has_steps:
        logging.info('Initializing the state engine')
        self._state.set('client', 'steps', [])
def handle_persistent_option(self, options, option_name):
    """
    Select option from command line or persistent state.
    Store selected option to allow standalone client to continue
    after reboot with previously selected options.
    Priority:
    1. explicitly specified via command line
    2. stored in state file (if continuing job '-c')
    3. default == None
    """
    option = None
    cmd_line_option = getattr(options, option_name)
    if cmd_line_option:
        option = cmd_line_option
        self._state.set('client', option_name, option)
    else:
        stored_option = self._state.get('client', option_name, None)
        if stored_option:
            option = stored_option
    logging.debug('Persistent option %s now set to %s', option_name, option)
    return option
1011 def __create_step_tuple(self
, fn
, args
, dargs
):
1012 # Legacy code passes in an array where the first arg is
1013 # the function or its name.
1014 if isinstance(fn
, list):
1015 assert(len(args
) == 0)
1016 assert(len(dargs
) == 0)
1019 # Pickling actual functions is hairy, thus we have to call
1020 # them by name. Unfortunately, this means only functions
1021 # defined globally can be used as a next step.
1024 if not isinstance(fn
, types
.StringTypes
):
1025 raise StepError("Next steps must be functions or "
1026 "strings containing the function name")
1027 ancestry
= copy
.copy(self
._current
_step
_ancestry
)
1028 return (ancestry
, fn
, args
, dargs
)
def next_step_append(self, fn, *args, **dargs):
    """Define the next step and place it at the end"""
    steps = self._state.get('client', 'steps')
    steps.append(self.__create_step_tuple(fn, args, dargs))
    self._state.set('client', 'steps', steps)
def next_step(self, fn, *args, **dargs):
    """Create a new step and place it after any steps added
    while running the current step but before any steps added in
    the future.
    """
    steps = self._state.get('client', 'steps')
    steps.insert(self._next_step_index,
                 self.__create_step_tuple(fn, args, dargs))
    # subsequent next_step calls from this step land after this one
    self._next_step_index += 1
    self._state.set('client', 'steps', steps)
def next_step_prepend(self, fn, *args, **dargs):
    """Insert a new step, executing first"""
    steps = self._state.get('client', 'steps')
    steps.insert(0, self.__create_step_tuple(fn, args, dargs))
    # keep the insertion point for next_step stable relative to this step
    self._next_step_index += 1
    self._state.set('client', 'steps', steps)
1058 def _run_step_fn(self
, local_vars
, fn
, args
, dargs
):
1059 """Run a (step) function within the given context"""
1061 local_vars
['__args'] = args
1062 local_vars
['__dargs'] = dargs
1064 exec('__ret = %s(*__args, **__dargs)' % fn
, local_vars
, local_vars
)
1065 return local_vars
['__ret']
1067 raise # Send error.JobContinue and JobComplete on up to runjob.
1068 except error
.TestNAError
, detail
:
1069 self
.record(detail
.exit_status
, None, fn
, str(detail
))
1070 except Exception, detail
:
1071 raise error
.UnhandledJobError(detail
)
    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""

        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        # Re-run each ancestor to rebuild the nested namespaces, outermost
        # first; each ancestor's returned locals become the next frame.
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            # fn_name exists unchanged one level up, so the innermost
            # frame is deeper than where the function was defined: drop it.
            frames.pop()
            ancestry.pop()

        # Return the frame to execute in, plus the (possibly trimmed)
        # ancestry that produced it.
        return (frames[-1], ancestry)
1111 def _add_step_init(self
, local_vars
, current_function
):
1112 """If the function returned a dictionary that includes a
1113 function named 'step_init', prepend it to our list of steps.
1114 This will only get run the first time a function with a nested
1115 use of the step engine is run."""
1117 if (isinstance(local_vars
, dict) and
1118 'step_init' in local_vars
and
1119 callable(local_vars
['step_init'])):
1120 # The init step is a child of the function
1121 # we were just running.
1122 self
._current
_step
_ancestry
.append(current_function
)
1123 self
.next_step_prepend('step_init')
    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Interprets the control file, then repeatedly pops and runs steps
        from the persisted step queue until it is empty.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self,
                               'args': self.args}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])
        else:
            # if last job failed due to unexpected reboot, record it as fail
            # so harness gets called
            last_job = self._state.get('client', 'unexpected_reboot', None)
            if last_job:
                subdir, testname = last_job
                self.record('FAIL', subdir, testname, 'unexpected reboot')
                self.record('END FAIL', subdir, testname)

        # Iterate through the steps.  If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self._state.get('client', 'steps')) > 0:
            # Pop the next step and persist the shortened queue before
            # running it, so a reboot mid-step does not re-run it.
            steps = self._state.get('client', 'steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            self._state.set('client', 'steps', steps)

            self._next_step_index = 0
            # Rebuild the namespace the step function was defined in.
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self._current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)
1178 def add_sysinfo_command(self
, command
, logfile
=None, on_every_test
=False):
1179 self
._add
_sysinfo
_loggable
(sysinfo
.command(command
, logf
=logfile
),
1183 def add_sysinfo_logfile(self
, file, on_every_test
=False):
1184 self
._add
_sysinfo
_loggable
(sysinfo
.logfile(file), on_every_test
)
1187 def _add_sysinfo_loggable(self
, loggable
, on_every_test
):
1189 self
.sysinfo
.test_loggables
.add(loggable
)
1191 self
.sysinfo
.boot_loggables
.add(loggable
)
1192 self
._save
_sysinfo
_state
()
1195 def _load_sysinfo_state(self
):
1196 state
= self
._state
.get('client', 'sysinfo', None)
1198 self
.sysinfo
.deserialize(state
)
1201 def _save_sysinfo_state(self
):
1202 state
= self
.sysinfo
.serialize()
1203 self
._state
.set('client', 'sysinfo', state
)
class disk_usage_monitor:
    """Watches free space on a device between start() and stop() and calls
    a logging function if it was consumed faster than a configured rate."""

    def __init__(self, logging_func, device, max_mb_per_hour):
        # Function used to emit the warning message (takes a single string).
        self.func = logging_func
        # Device/path to monitor, as passed to utils.freespace.
        self.device = device
        # Usage-rate threshold; a falsy value disables warnings entirely.
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        """Snapshot the current free space and the current time."""
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        """Compare free space against the start() snapshot and warn via
        the logging function if the usage rate exceeded max_mb_per_hour."""
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    func(*args, **dargs)
                finally:
                    # Always evaluate usage, even if the call raised.
                    monitor.stop()
            return watched_func
        return decorator
def runjob(control, drop_caches, options):
    """
    Run a job using the given control file.

    This is the main interface to this module.

    @see base_job.__init__ for parameter info.
    """
    control = os.path.abspath(control)
    state = control + '.state'
    # Ensure state file is cleaned up before the job starts to run if autotest
    # is not running with the --continue flag
    if not options.cont and os.path.isfile(state):
        logging.debug('Cleaning up previously found state file')
        os.remove(state)

    # instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid
        if not os.path.exists(control):
            raise error.JobError(control + ": control file not found")

        # When continuing, the job is complete when there is no
        # state file, ensure we don't try and continue.
        if options.cont and not os.path.exists(state):
            raise error.JobComplete("all done")

        myjob = job(control=control, drop_caches=drop_caches, options=options)

        # Load in the users control file, may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()
    except error.JobContinue:
        # NOTE(review): exit code 5 presumably signals the harness to
        # continue the job after a reboot -- confirm against the wrapper.
        sys.exit(5)
    except error.JobComplete:
        sys.exit(1)
    except error.JobError, instance:
        logging.error("JOB ERROR: " + str(instance))
        if myjob:
            command = None
            if len(instance.args) > 1:
                command = instance.args[1]
                myjob.record('ABORT', None, command, str(instance))
            myjob.record('END ABORT', None, None, str(instance))
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)
    except Exception, e:
        # NOTE: job._run_step_fn and job.step_engine will turn things into
        # a JobError for us.  If we get here, its likely an autotest bug.
        msg = str(e) + '\n' + traceback.format_exc()
        logging.critical("JOB ERROR (autotest bug?): " + msg)
        if myjob:
            myjob.record('END ABORT', None, None, msg)
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob.record('END GOOD', None, None)
    assert myjob._record_indent == 0

    myjob.complete(0)
# Pick up a site-specific job class if one is provided; presumably falls
# back to base_client_job when autotest_lib.client.bin.site_job does not
# define site_job -- verify against utils.import_site_class.
site_job = utils.import_site_class(
    __file__, "autotest_lib.client.bin.site_job", "site_job", base_client_job)
1336 class job(site_job
):