1 """The main job wrapper
3 This is the core infrastructure.
5 Copyright Andy Whitcroft, Martin J. Bligh 2006
8 import copy
, os
, platform
, re
, shutil
, sys
, time
, traceback
, types
, glob
9 import logging
, getpass
, errno
, weakref
10 import cPickle
as pickle
11 from autotest_lib
.client
.bin
import client_logging_config
12 from autotest_lib
.client
.bin
import utils
, parallel
, kernel
, xen
13 from autotest_lib
.client
.bin
import profilers
, boottool
, harness
14 from autotest_lib
.client
.bin
import config
, sysinfo
, test
, local_host
15 from autotest_lib
.client
.bin
import partition
as partition_lib
16 from autotest_lib
.client
.common_lib
import base_job
17 from autotest_lib
.client
.common_lib
import error
, barrier
, log
, logging_manager
18 from autotest_lib
.client
.common_lib
import base_packages
, packages
19 from autotest_lib
.client
.common_lib
import global_config
22 LAST_BOOT_TAG
= object()
24 from autotest_lib.client.common_lib.error import *
25 from autotest_lib.client.bin.utils import *
29 class StepError(error
.AutotestError
):
32 class NotAvailableError(error
.AutotestError
):
37 def _run_test_complete_on_exit(f
):
38 """Decorator for job methods that automatically calls
39 self.harness.run_test_complete when the method exits, if appropriate."""
40 def wrapped(self
, *args
, **dargs
):
42 return f(self
, *args
, **dargs
)
44 if self
._logger
.global_filename
== 'status':
45 self
.harness
.run_test_complete()
47 logging
.debug("Dropping caches")
49 wrapped
.__name
__ = f
.__name
__
50 wrapped
.__doc
__ = f
.__doc
__
51 wrapped
.__dict
__.update(f
.__dict
__)
55 class status_indenter(base_job
.status_indenter
):
56 """Provide a status indenter that is backed by job._record_prefix."""
57 def __init__(self
, job
):
58 self
.job
= weakref
.proxy(job
) # avoid a circular reference
63 return self
.job
._record
_indent
67 self
.job
._record
_indent
+= 1
71 self
.job
._record
_indent
-= 1
74 class base_client_job(base_job
.base_job
):
75 """The client-side concrete implementation of base_job.
77 Optional properties provided by this implementation:
83 _WARNING_DISABLE_DELAY
= 5
85 # _record_indent is a persistent property, but only on the client
86 _job_state
= base_job
.base_job
._job
_state
87 _record_indent
= _job_state
.property_factory(
88 '_state', '_record_indent', 0, namespace
='client')
89 _max_disk_usage_rate
= _job_state
.property_factory(
90 '_state', '_max_disk_usage_rate', 0.0, namespace
='client')
93 def __init__(self
, control
, options
, drop_caches
=True,
94 extra_copy_cmdline
=None):
96 Prepare a client side job object.
98 @param control: The control file (pathname of).
99 @param options: an object which includes:
100 jobtag: The job tag string (eg "default").
101 cont: If this is the continuation of this job.
102 harness_type: An alternative server harness. [None]
103 use_external_logging: If true, the enable_external_logging
104 method will be called during construction. [False]
105 @param drop_caches: If true, utils.drop_caches() is called before and
106 between all tests. [True]
107 @param extra_copy_cmdline: list of additional /proc/cmdline arguments to
108 copy from the running kernel to all the installed kernels with
111 super(base_client_job
, self
).__init
__(options
=options
)
112 self
._pre
_record
_init
(control
, options
)
114 self
._post
_record
_init
(control
, options
, drop_caches
,
116 except Exception, err
:
118 'ABORT', None, None,'client.bin.job.__init__ failed: %s' %
124 def _get_environ_autodir(cls
):
125 return os
.environ
['AUTODIR']
129 def _find_base_directories(cls
):
131 Determine locations of autodir and clientdir (which are the same)
132 using os.environ. Serverdir does not exist in this context.
134 autodir
= clientdir
= cls
._get
_environ
_autodir
()
135 return autodir
, clientdir
, None
139 def _parse_args(cls
, args
):
140 return re
.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args
)
143 def _find_resultdir(self
, options
):
145 Determine the directory for storing results. On a client this is
146 always <autodir>/results/<tag>, where tag is passed in on the command
149 return os
.path
.join(self
.autodir
, 'results', options
.tag
)
152 def _get_status_logger(self
):
153 """Return a reference to the status logger."""
157 def _pre_record_init(self
, control
, options
):
159 Initialization function that should peform ONLY the required
160 setup so that the self.record() method works.
162 As of now self.record() needs self.resultdir, self._group_level,
163 self.harness and of course self._logger.
166 self
._cleanup
_debugdir
_files
()
167 self
._cleanup
_results
_dir
()
169 logging_manager
.configure_logging(
170 client_logging_config
.ClientLoggingConfig(),
171 results_dir
=self
.resultdir
,
172 verbose
=options
.verbose
)
173 logging
.info('Writing results to %s', self
.resultdir
)
175 # init_group_level needs the state
176 self
.control
= os
.path
.realpath(control
)
177 self
._is
_continuation
= options
.cont
178 self
._current
_step
_ancestry
= []
179 self
._next
_step
_index
= 0
182 _harness
= self
.handle_persistent_option(options
, 'harness')
183 _harness_args
= self
.handle_persistent_option(options
, 'harness_args')
185 self
.harness
= harness
.select(_harness
, self
, _harness_args
)
187 # set up the status logger
188 def client_job_record_hook(entry
):
190 if '.' in self
._logger
.global_filename
:
191 msg_tag
= self
._logger
.global_filename
.split('.', 1)[1]
192 # send the entry to the job harness
193 message
= '\n'.join([entry
.message
] + entry
.extra_message_lines
)
194 rendered_entry
= self
._logger
.render_entry(entry
)
195 self
.harness
.test_status_detail(entry
.status_code
, entry
.subdir
,
196 entry
.operation
, message
, msg_tag
,
198 self
.harness
.test_status(rendered_entry
, msg_tag
)
199 # send the entry to stdout, if it's enabled
200 logging
.info(rendered_entry
)
201 self
._logger
= base_job
.status_logger(
202 self
, status_indenter(self
), record_hook
=client_job_record_hook
,
203 tap_writer
=self
._tap
)
205 def _post_record_init(self
, control
, options
, drop_caches
,
208 Perform job initialization not required by self.record().
210 self
._init
_drop
_caches
(drop_caches
)
212 self
._init
_packages
()
214 self
.sysinfo
= sysinfo
.sysinfo(self
.resultdir
)
215 self
._load
_sysinfo
_state
()
218 download
= os
.path
.join(self
.testdir
, 'download')
219 if not os
.path
.exists(download
):
222 shutil
.copyfile(self
.control
,
223 os
.path
.join(self
.resultdir
, 'control'))
225 self
.control
= control
227 self
.logging
= logging_manager
.get_logging_manager(
228 manage_stdout_and_stderr
=True, redirect_fds
=True)
229 self
.logging
.start_logging()
231 self
._config
= config
.config(self
)
232 self
.profilers
= profilers
.profilers(self
)
234 self
._init
_bootloader
()
236 self
.machines
= [options
.hostname
]
237 self
.hosts
= set([local_host
.LocalHost(hostname
=options
.hostname
,
238 bootloader
=self
.bootloader
)])
242 self
.args
= self
._parse
_args
(options
.args
)
245 self
.user
= options
.user
247 self
.user
= getpass
.getuser()
249 self
.sysinfo
.log_per_reboot_data()
252 self
.record('START', None, None)
254 self
.harness
.run_start()
257 self
.enable_external_logging()
259 self
._init
_cmdline
(extra_copy_cmdline
)
261 self
.num_tests_run
= None
262 self
.num_tests_failed
= None
264 self
.warning_loggers
= None
265 self
.warning_manager
= None
268 def _init_drop_caches(self
, drop_caches
):
270 Perform the drop caches initialization.
272 self
.drop_caches_between_iterations
= (
273 global_config
.global_config
.get_config_value('CLIENT',
274 'drop_caches_between_iterations',
275 type=bool, default
=True))
276 self
.drop_caches
= drop_caches
278 logging
.debug("Dropping caches")
def _init_bootloader(self):
    """
    Perform boottool initialization.

    Reads the configured boottool executable path (may be None) and
    stores a boottool wrapper on self.bootloader.
    """
    executable = self.config_get('boottool.executable')
    self.bootloader = boottool.boottool(executable)
def _init_packages(self):
    """
    Perform the packages support initialization.

    Creates the PackageManager rooted at self.autodir with a one-hour
    timeout for its run functions.
    """
    run_dargs = {'timeout': 3600}
    self.pkgmgr = packages.PackageManager(self.autodir,
                                          run_function_dargs=run_dargs)
298 def _init_cmdline(self
, extra_copy_cmdline
):
300 Initialize default cmdline for booted kernels in this job.
302 copy_cmdline
= set(['console'])
303 if extra_copy_cmdline
is not None:
304 copy_cmdline
.update(extra_copy_cmdline
)
306 # extract console= and other args from cmdline and add them into the
307 # base args that we use for all kernels we install
308 cmdline
= utils
.read_one_line('/proc/cmdline')
310 for karg
in cmdline
.split():
311 for param
in copy_cmdline
:
312 if karg
.startswith(param
) and \
313 (len(param
) == len(karg
) or karg
[len(param
)] == '='):
314 kernel_args
.append(karg
)
315 self
.config_set('boot.default_args', ' '.join(kernel_args
))
318 def _cleanup_results_dir(self
):
319 """Delete everything in resultsdir"""
320 assert os
.path
.exists(self
.resultdir
)
321 list_files
= glob
.glob('%s/*' % self
.resultdir
)
325 elif os
.path
.isfile(f
):
329 def _cleanup_debugdir_files(self
):
331 Delete any leftover debugdir files
333 list_files
= glob
.glob("/tmp/autotest_results_dir.*")
338 def disable_warnings(self
, warning_type
):
339 self
.record("INFO", None, None,
340 "disabling %s warnings" % warning_type
,
341 {"warnings.disable": warning_type
})
342 time
.sleep(self
._WARNING
_DISABLE
_DELAY
)
345 def enable_warnings(self
, warning_type
):
346 time
.sleep(self
._WARNING
_DISABLE
_DELAY
)
347 self
.record("INFO", None, None,
348 "enabling %s warnings" % warning_type
,
349 {"warnings.enable": warning_type
})
def monitor_disk_usage(self, max_rate):
    """
    Signal that the job should monitor disk space usage on /
    and generate a warning if a test uses up disk space at a
    rate exceeding 'max_rate'.

    @param max_rate: the maximum allowed rate of disk consumption
            during a test, in MB/hour, or 0 to indicate no limit.
    """
    self._max_disk_usage_rate = max_rate
366 def relative_path(self
, path
):
368 Return a patch relative to the job results directory
370 head
= len(self
.resultdir
) + 1 # remove the / inbetween
374 def control_get(self
):
378 def control_set(self
, control
):
379 self
.control
= os
.path
.abspath(control
)
def harness_select(self, which, harness_args):
    """Select and install a new harness for this job.

    @param which: name of the harness to select (may be None for the
            default harness).
    @param harness_args: harness-specific argument string.
    """
    self.harness = harness.select(which, self, harness_args)
386 def config_set(self
, name
, value
):
387 self
._config
.set(name
, value
)
390 def config_get(self
, name
):
391 return self
._config
.get(name
)
394 def setup_dirs(self
, results_dir
, tmp_dir
):
396 tmp_dir
= os
.path
.join(self
.tmpdir
, 'build')
397 if not os
.path
.exists(tmp_dir
):
399 if not os
.path
.isdir(tmp_dir
):
400 e_msg
= "Temp dir (%s) is not a dir - args backwards?" % self
.tmpdir
401 raise ValueError(e_msg
)
403 # We label the first build "build" and then subsequent ones
404 # as "build.2", "build.3", etc. Whilst this is a little bit
405 # inconsistent, 99.9% of jobs will only have one build
406 # (that's not done as kernbench, sparse, or buildtest),
407 # so it works out much cleaner. One of life's comprimises.
409 results_dir
= os
.path
.join(self
.resultdir
, 'build')
411 while os
.path
.exists(results_dir
):
412 results_dir
= os
.path
.join(self
.resultdir
, 'build.%d' % i
)
414 if not os
.path
.exists(results_dir
):
415 os
.mkdir(results_dir
)
417 return (results_dir
, tmp_dir
)
420 def xen(self
, base_tree
, results_dir
= '', tmp_dir
= '', leave
= False, \
422 """Summon a xen object"""
423 (results_dir
, tmp_dir
) = self
.setup_dirs(results_dir
, tmp_dir
)
425 return xen
.xen(self
, base_tree
, results_dir
, tmp_dir
, build_dir
,
429 def kernel(self
, base_tree
, results_dir
= '', tmp_dir
= '', leave
= False):
430 """Summon a kernel object"""
431 (results_dir
, tmp_dir
) = self
.setup_dirs(results_dir
, tmp_dir
)
433 return kernel
.auto_kernel(self
, base_tree
, results_dir
, tmp_dir
,
def barrier(self, *args, **kwds):
    """Create a barrier object.

    All positional and keyword arguments are forwarded unchanged to
    barrier.barrier.
    """
    factory = barrier.barrier
    return factory(*args, **kwds)
442 def install_pkg(self
, name
, pkg_type
, install_dir
):
444 This method is a simple wrapper around the actual package
445 installation method in the Packager class. This is used
446 internally by the profilers, deps and tests code.
447 name : name of the package (ex: sleeptest, dbench etc.)
448 pkg_type : Type of the package (ex: test, dep etc.)
449 install_dir : The directory in which the source is actually
450 untarred into. (ex: client/profilers/<name> for profilers)
452 if self
.pkgmgr
.repositories
:
453 self
.pkgmgr
.install_pkg(name
, pkg_type
, self
.pkgdir
, install_dir
)
456 def add_repository(self
, repo_urls
):
458 Adds the repository locations to the job so that packages
459 can be fetched from them when needed. The repository list
460 needs to be a string list
461 Ex: job.add_repository(['http://blah1','http://blah2'])
463 for repo_url
in repo_urls
:
464 self
.pkgmgr
.add_repository(repo_url
)
466 # Fetch the packages' checksum file that contains the checksums
467 # of all the packages if it is not already fetched. The checksum
468 # is always fetched whenever a job is first started. This
469 # is not done in the job's constructor as we don't have the list of
470 # the repositories there (and obviously don't care about this file
471 # if we are not using the repos)
473 checksum_file_path
= os
.path
.join(self
.pkgmgr
.pkgmgr_dir
,
474 base_packages
.CHECKSUM_FILE
)
475 self
.pkgmgr
.fetch_pkg(base_packages
.CHECKSUM_FILE
,
476 checksum_file_path
, use_checksum
=False)
477 except error
.PackageFetchError
:
478 # packaging system might not be working in this case
479 # Silently fall back to the normal case
def require_gcc(self):
    """
    Test whether gcc is installed on the machine.

    @raise NotAvailableError: if 'which gcc' fails, i.e. gcc cannot be
            found on the system.
    """
    # Probe for gcc via the shell; utils.system raises CmdError on a
    # non-zero exit status.
    try:
        utils.system('which gcc')
    except error.CmdError:
        raise NotAvailableError('gcc is required by this job and is '
                                'not available on the system')
495 def setup_dep(self
, deps
):
496 """Set up the dependencies for this test.
497 deps is a list of libraries required for this test.
499 # Fetch the deps from the repositories and set them up.
501 dep_dir
= os
.path
.join(self
.autodir
, 'deps', dep
)
502 # Search for the dependency in the repositories if specified,
503 # else check locally.
505 self
.install_pkg(dep
, 'dep', dep_dir
)
506 except error
.PackageInstallError
:
507 # see if the dep is there locally
510 # dep_dir might not exist if it is not fetched from the repos
511 if not os
.path
.exists(dep_dir
):
512 raise error
.TestError("Dependency %s does not exist" % dep
)
515 if execfile('%s.py' % dep
, {}) is None:
516 logging
.info('Dependency %s successfuly built', dep
)
519 def _runtest(self
, url
, tag
, timeout
, args
, dargs
):
521 l
= lambda : test
.runtest(self
, url
, tag
, args
, dargs
)
522 pid
= parallel
.fork_start(self
.resultdir
, l
)
525 logging
.debug('Waiting for pid %d for %d seconds', pid
, timeout
)
526 parallel
.fork_waitfor_timed(self
.resultdir
, pid
, timeout
)
528 parallel
.fork_waitfor(self
.resultdir
, pid
)
530 except error
.TestBaseException
:
531 # These are already classified with an error type (exit_status)
533 except error
.JobError
:
534 raise # Caught further up and turned into an ABORT.
536 # Converts all other exceptions thrown by the test regardless
537 # of phase into a TestError(TestBaseException) subclass that
538 # reports them with their full stack trace.
539 raise error
.UnhandledTestError(e
)
542 @_run_test_complete_on_exit
543 def run_test(self
, url
, *args
, **dargs
):
545 Summon a test object and run it.
547 @param url A url that identifies the test to run.
548 @param tag An optional keyword argument that will be added to the
549 test and subdir name.
550 @param subdir_tag An optional keyword argument that will be added
553 @returns True if the test passes, False otherwise.
555 group
, testname
= self
.pkgmgr
.get_package_name(url
, 'test')
556 testname
, subdir
, tag
= self
._build
_tagged
_test
_name
(testname
, dargs
)
557 outputdir
= self
._make
_test
_outputdir
(subdir
)
559 timeout
= dargs
.pop('timeout', None)
561 logging
.debug('Test has timeout: %d sec.', timeout
)
563 def log_warning(reason
):
564 self
.record("WARN", subdir
, testname
, reason
)
565 @disk_usage_monitor.watch(log_warning
, "/", self
._max
_disk
_usage
_rate
)
568 self
._runtest
(url
, tag
, timeout
, args
, dargs
)
569 except error
.TestBaseException
, detail
:
570 # The error is already classified, record it properly.
571 self
.record(detail
.exit_status
, subdir
, testname
, str(detail
))
574 self
.record('GOOD', subdir
, testname
, 'completed successfully')
577 self
._rungroup
(subdir
, testname
, group_func
, timeout
)
579 except error
.TestBaseException
:
581 # Any other exception here will be given to the caller
583 # NOTE: The only exception possible from the control file here
584 # is error.JobError as _runtest() turns all others into an
585 # UnhandledTestError that is caught above.
588 def _rungroup(self
, subdir
, testname
, function
, timeout
, *args
, **dargs
):
593 name of the test to run, or support step
597 arguments for the function
599 Returns the result of the passed in function
603 optional_fields
= None
606 optional_fields
['timeout'] = timeout
607 self
.record('START', subdir
, testname
,
608 optional_fields
=optional_fields
)
610 self
._state
.set('client', 'unexpected_reboot', (subdir
, testname
))
612 result
= function(*args
, **dargs
)
613 self
.record('END GOOD', subdir
, testname
)
615 except error
.TestBaseException
, e
:
616 self
.record('END %s' % e
.exit_status
, subdir
, testname
)
618 except error
.JobError
, e
:
619 self
.record('END ABORT', subdir
, testname
)
622 # This should only ever happen due to a bug in the given
623 # function's code. The common case of being called by
624 # run_test() will never reach this. If a control file called
625 # run_group() itself, bugs in its function will be caught
627 err_msg
= str(e
) + '\n' + traceback
.format_exc()
628 self
.record('END ERROR', subdir
, testname
, err_msg
)
631 self
._state
.discard('client', 'unexpected_reboot')
634 def run_group(self
, function
, tag
=None, **dargs
):
636 Run a function nested within a group level.
641 An optional tag name for the group. If None (default)
642 function.__name__ will be used.
644 Named arguments for the function.
649 name
= function
.__name
__
652 return self
._rungroup
(subdir
=None, testname
=name
,
653 function
=function
, timeout
=None, **dargs
)
654 except (SystemExit, error
.TestBaseException
):
656 # If there was a different exception, turn it into a TestError.
657 # It will be caught by step_engine or _run_step_fn.
659 raise error
.UnhandledTestError(e
)
663 return utils
.count_cpus() # use total system count
666 def start_reboot(self
):
667 self
.record('START', None, 'reboot')
668 self
.record('GOOD', None, 'reboot.start')
671 def _record_reboot_failure(self
, subdir
, operation
, status
,
673 self
.record("ABORT", subdir
, operation
, status
)
675 running_id
= utils
.running_os_ident()
676 kernel
= {"kernel": running_id
.split("::")[0]}
677 self
.record("END ABORT", subdir
, 'reboot', optional_fields
=kernel
)
680 def _check_post_reboot(self
, subdir
, running_id
=None):
682 Function to perform post boot checks such as if the system configuration
683 has changed across reboots (specifically, CPUs and partitions).
685 @param subdir: The subdir to use in the job.record call.
686 @param running_id: An optional running_id to include in the reboot
689 @raise JobError: Raised if the current configuration does not match the
690 pre-reboot configuration.
692 # check to see if any partitions have changed
693 partition_list
= partition_lib
.get_partition_list(self
,
695 mount_info
= partition_lib
.get_mount_info(partition_list
)
696 old_mount_info
= self
._state
.get('client', 'mount_info')
697 if mount_info
!= old_mount_info
:
698 new_entries
= mount_info
- old_mount_info
699 old_entries
= old_mount_info
- mount_info
700 description
= ("mounted partitions are different after reboot "
701 "(old entries: %s, new entries: %s)" %
702 (old_entries
, new_entries
))
703 self
._record
_reboot
_failure
(subdir
, "reboot.verify_config",
704 description
, running_id
=running_id
)
705 raise error
.JobError("Reboot failed: %s" % description
)
707 # check to see if any CPUs have changed
708 cpu_count
= utils
.count_cpus()
709 old_count
= self
._state
.get('client', 'cpu_count')
710 if cpu_count
!= old_count
:
711 description
= ('Number of CPUs changed after reboot '
712 '(old count: %d, new count: %d)' %
713 (old_count
, cpu_count
))
714 self
._record
_reboot
_failure
(subdir
, 'reboot.verify_config',
715 description
, running_id
=running_id
)
716 raise error
.JobError('Reboot failed: %s' % description
)
719 def end_reboot(self
, subdir
, kernel
, patches
, running_id
=None):
720 self
._check
_post
_reboot
(subdir
, running_id
=running_id
)
722 # strip ::<timestamp> from the kernel version if present
723 kernel
= kernel
.split("::")[0]
724 kernel_info
= {"kernel": kernel
}
725 for i
, patch
in enumerate(patches
):
726 kernel_info
["patch%d" % i
] = patch
727 self
.record("END GOOD", subdir
, "reboot", optional_fields
=kernel_info
)
730 def end_reboot_and_verify(self
, expected_when
, expected_id
, subdir
,
731 type='src', patches
=[]):
732 """ Check the passed kernel identifier against the command line
733 and the running kernel, abort the job on missmatch. """
735 logging
.info("POST BOOT: checking booted kernel "
736 "mark=%d identity='%s' type='%s'",
737 expected_when
, expected_id
, type)
739 running_id
= utils
.running_os_ident()
741 cmdline
= utils
.read_one_line("/proc/cmdline")
743 find_sum
= re
.compile(r
'.*IDENT=(\d+)')
744 m
= find_sum
.match(cmdline
)
747 cmdline_when
= int(m
.groups()[0])
749 # We have all the facts, see if they indicate we
750 # booted the requested kernel or not.
752 if (type == 'src' and expected_id
!= running_id
or
754 not running_id
.startswith(expected_id
+ '::')):
755 logging
.error("Kernel identifier mismatch")
757 if expected_when
!= cmdline_when
:
758 logging
.error("Kernel command line mismatch")
762 logging
.error(" Expected Ident: " + expected_id
)
763 logging
.error(" Running Ident: " + running_id
)
764 logging
.error(" Expected Mark: %d", expected_when
)
765 logging
.error("Command Line Mark: %d", cmdline_when
)
766 logging
.error(" Command Line: " + cmdline
)
768 self
._record
_reboot
_failure
(subdir
, "reboot.verify", "boot failure",
769 running_id
=running_id
)
770 raise error
.JobError("Reboot returned with the wrong kernel")
772 self
.record('GOOD', subdir
, 'reboot.verify',
773 utils
.running_os_full_version())
774 self
.end_reboot(subdir
, expected_id
, patches
, running_id
=running_id
)
def partition(self, device, loop_size=0, mountpoint=None):
    """
    Work with a machine partition

    @param device: e.g. /dev/sda2, /dev/sdb1 etc...
    @param mountpoint: Specify a directory to mount to. If not specified
            autotest tmp directory will be used.
    @param loop_size: Size of loopback device (in MB). Defaults to 0.

    @return: A L{client.bin.partition.partition} object
    """
    # Fall back to the job's tmp directory when no mountpoint is given.
    if not mountpoint:
        mountpoint = self.tmpdir
    return partition_lib.partition(self, device, loop_size, mountpoint)
794 def filesystem(self
, device
, mountpoint
=None, loop_size
=0):
795 """ Same as partition
797 @deprecated: Use partition method instead
799 return self
.partition(device
, loop_size
, mountpoint
)
802 def enable_external_logging(self
):
806 def disable_external_logging(self
):
810 def reboot_setup(self
):
811 # save the partition list and mount points, as well as the cpu count
812 partition_list
= partition_lib
.get_partition_list(self
,
814 mount_info
= partition_lib
.get_mount_info(partition_list
)
815 self
._state
.set('client', 'mount_info', mount_info
)
816 self
._state
.set('client', 'cpu_count', utils
.count_cpus())
819 def reboot(self
, tag
=LAST_BOOT_TAG
):
820 if tag
== LAST_BOOT_TAG
:
821 tag
= self
.last_boot_tag
823 self
.last_boot_tag
= tag
826 self
.harness
.run_reboot()
827 default
= self
.config_get('boot.set_default')
829 self
.bootloader
.set_default(tag
)
831 self
.bootloader
.boot_once(tag
)
833 # HACK: using this as a module sometimes hangs shutdown, so if it's
834 # installed unload it first
835 utils
.system("modprobe -r netconsole", ignore_status
=True)
837 # sync first, so that a sync during shutdown doesn't time out
838 utils
.system("sync; sync", ignore_status
=True)
840 utils
.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
844 def noop(self
, text
):
845 logging
.info("job: noop: " + text
)
848 @_run_test_complete_on_exit
849 def parallel(self
, *tasklist
):
850 """Run tasks in parallel"""
853 old_log_filename
= self
._logger
.global_filename
854 for i
, task
in enumerate(tasklist
):
855 assert isinstance(task
, (tuple, list))
856 self
._logger
.global_filename
= old_log_filename
+ (".%d" % i
)
858 # stub out _record_indent with a process-local one
859 base_record_indent
= self
._record
_indent
860 proc_local
= self
._job
_state
.property_factory(
861 '_state', '_record_indent.%d' % os
.getpid(),
862 base_record_indent
, namespace
='client')
863 self
.__class
__._record
_indent
= proc_local
865 pids
.append(parallel
.fork_start(self
.resultdir
, task_func
))
867 old_log_path
= os
.path
.join(self
.resultdir
, old_log_filename
)
868 old_log
= open(old_log_path
, "a")
870 for i
, pid
in enumerate(pids
):
871 # wait for the task to finish
873 parallel
.fork_waitfor(self
.resultdir
, pid
)
876 # copy the logs from the subtask into the main log
877 new_log_path
= old_log_path
+ (".%d" % i
)
878 if os
.path
.exists(new_log_path
):
879 new_log
= open(new_log_path
)
880 old_log
.write(new_log
.read())
883 os
.remove(new_log_path
)
886 self
._logger
.global_filename
= old_log_filename
888 # handle any exceptions raised by the parallel tasks
890 msg
= "%d task(s) failed in job.parallel" % len(exceptions
)
891 raise error
.JobError(msg
)
895 # XXX: should have a better name.
896 self
.harness
.run_pause()
897 raise error
.JobContinue("more to come")
900 def complete(self
, status
):
901 """Write pending TAP reports, clean up, and exit"""
902 # write out TAP reports
903 if self
._tap
.do_tap_report
:
905 self
._tap
._write
_tap
_archive
()
907 # We are about to exit 'complete' so clean up the control file.
908 dest
= os
.path
.join(self
.resultdir
, os
.path
.basename(self
._state
_file
))
909 shutil
.move(self
._state
_file
, dest
)
911 self
.harness
.run_complete()
912 self
.disable_external_logging()
916 def _load_state(self
):
917 # grab any initial state and set up $CONTROL.state as the backing file
918 init_state_file
= self
.control
+ '.init.state'
919 self
._state
_file
= self
.control
+ '.state'
920 if os
.path
.exists(init_state_file
):
921 shutil
.move(init_state_file
, self
._state
_file
)
922 self
._state
.set_backing_file(self
._state
_file
)
924 # initialize the state engine, if necessary
925 has_steps
= self
._state
.has('client', 'steps')
926 if not self
._is
_continuation
and has_steps
:
927 raise RuntimeError('Loaded state can only contain client.steps if '
928 'this is a continuation')
931 logging
.info('Initializing the state engine')
932 self
._state
.set('client', 'steps', [])
935 def handle_persistent_option(self
, options
, option_name
):
937 Select option from command line or persistent state.
938 Store selected option to allow standalone client to continue
939 after reboot with previously selected options.
941 1. explicitly specified via command line
942 2. stored in state file (if continuing job '-c')
946 cmd_line_option
= getattr(options
, option_name
)
948 option
= cmd_line_option
949 self
._state
.set('client', option_name
, option
)
951 stored_option
= self
._state
.get('client', option_name
, None)
953 option
= stored_option
954 logging
.debug('Persistent option %s now set to %s', option_name
, option
)
958 def __create_step_tuple(self
, fn
, args
, dargs
):
959 # Legacy code passes in an array where the first arg is
960 # the function or its name.
961 if isinstance(fn
, list):
962 assert(len(args
) == 0)
963 assert(len(dargs
) == 0)
966 # Pickling actual functions is hairy, thus we have to call
967 # them by name. Unfortunately, this means only functions
968 # defined globally can be used as a next step.
971 if not isinstance(fn
, types
.StringTypes
):
972 raise StepError("Next steps must be functions or "
973 "strings containing the function name")
974 ancestry
= copy
.copy(self
._current
_step
_ancestry
)
975 return (ancestry
, fn
, args
, dargs
)
def next_step_append(self, fn, *args, **dargs):
    """Define the next step and place it at the end"""
    queue = self._state.get('client', 'steps')
    queue.append(self.__create_step_tuple(fn, args, dargs))
    self._state.set('client', 'steps', queue)
def next_step(self, fn, *args, **dargs):
    """Create a new step and place it after any steps added
    while running the current step but before any steps added in
    earlier steps."""
    queue = self._state.get('client', 'steps')
    entry = self.__create_step_tuple(fn, args, dargs)
    queue.insert(self._next_step_index, entry)
    # Keep the insertion point moving so subsequent next_step calls
    # from the same running step preserve their relative order.
    self._next_step_index += 1
    self._state.set('client', 'steps', queue)
def next_step_prepend(self, fn, *args, **dargs):
    """Insert a new step, executing first"""
    queue = self._state.get('client', 'steps')
    queue.insert(0, self.__create_step_tuple(fn, args, dargs))
    # The front insertion shifts every queued step, so bump the index
    # used by next_step() to keep its insertion point consistent.
    self._next_step_index += 1
    self._state.set('client', 'steps', queue)
1005 def _run_step_fn(self
, local_vars
, fn
, args
, dargs
):
1006 """Run a (step) function within the given context"""
1008 local_vars
['__args'] = args
1009 local_vars
['__dargs'] = dargs
1011 exec('__ret = %s(*__args, **__dargs)' % fn
, local_vars
, local_vars
)
1012 return local_vars
['__ret']
1014 raise # Send error.JobContinue and JobComplete on up to runjob.
1015 except error
.TestNAError
, detail
:
1016 self
.record(detail
.exit_status
, None, fn
, str(detail
))
1017 except Exception, detail
:
1018 raise error
.UnhandledJobError(detail
)
1021 def _create_frame(self
, global_vars
, ancestry
, fn_name
):
1022 """Set up the environment like it would have been when this
1023 function was first defined.
1025 Child step engine 'implementations' must have 'return locals()'
1026 at end end of their steps. Because of this, we can call the
1027 parent function and get back all child functions (i.e. those
1030 Unfortunately, the call stack of the function calling
1031 job.next_step might have been deeper than the function it
1032 added. In order to make sure that the environment is what it
1033 should be, we need to then pop off the frames we built until
1034 we find the frame where the function was first defined."""
1036 # The copies ensure that the parent frames are not modified
1037 # while building child frames. This matters if we then
1038 # pop some frames in the next part of this function.
1039 current_frame
= copy
.copy(global_vars
)
1040 frames
= [current_frame
]
1041 for steps_fn_name
in ancestry
:
1042 ret
= self
._run
_step
_fn
(current_frame
, steps_fn_name
, [], {})
1043 current_frame
= copy
.copy(ret
)
1044 frames
.append(current_frame
)
1046 # Walk up the stack frames until we find the place fn_name was defined.
1047 while len(frames
) > 2:
1048 if fn_name
not in frames
[-2]:
1050 if frames
[-2][fn_name
] != frames
[-1][fn_name
]:
1055 return (frames
[-1], ancestry
)
1058 def _add_step_init(self
, local_vars
, current_function
):
1059 """If the function returned a dictionary that includes a
1060 function named 'step_init', prepend it to our list of steps.
1061 This will only get run the first time a function with a nested
1062 use of the step engine is run."""
1064 if (isinstance(local_vars
, dict) and
1065 'step_init' in local_vars
and
1066 callable(local_vars
['step_init'])):
1067 # The init step is a child of the function
1068 # we were just running.
1069 self
._current
_step
_ancestry
.append(current_function
)
1070 self
.next_step_prepend('step_init')
def step_engine(self):
    """The multi-run engine used when the control file defines step_init.

    Interprets the control file, then repeatedly pops and runs steps
    from the persisted step list until it is empty.
    """
    # Set up the environment and then interpret the control file.
    # Some control files will have code outside of functions,
    # which means we need to have our state engine initialized
    # before reading in the file.
    global_control_vars = {'job': self,
                           'args': self.args}
    exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
    try:
        execfile(self.control, global_control_vars, global_control_vars)
    except error.TestNAError, detail:
        # A TestNAError at control-file top level is recorded, not fatal.
        self.record(detail.exit_status, None, self.control, str(detail))
    except SystemExit:
        raise  # Send error.JobContinue and JobComplete on up to runjob.
    except Exception, detail:
        # Syntax errors or other general Python exceptions coming out of
        # the top level of the control file itself go through here.
        raise error.UnhandledJobError(detail)

    # If we loaded in a mid-job state file, then we presumably
    # know what steps we have yet to run.
    if not self._is_continuation:
        if 'step_init' in global_control_vars:
            self.next_step(global_control_vars['step_init'])
    else:
        # if last job failed due to unexpected reboot, record it as fail
        # so harness gets called
        last_job = self._state.get('client', 'unexpected_reboot', None)
        if last_job:
            subdir, testname = last_job
            self.record('FAIL', subdir, testname, 'unexpected reboot')
            self.record('END FAIL', subdir, testname)

    # Iterate through the steps. If we reboot, we'll simply
    # continue iterating on the next step.
    while len(self._state.get('client', 'steps')) > 0:
        steps = self._state.get('client', 'steps')
        (ancestry, fn_name, args, dargs) = steps.pop(0)
        # Persist the shortened list immediately so a reboot resumes
        # at the next step rather than re-running this one.
        self._state.set('client', 'steps', steps)

        self._next_step_index = 0
        ret = self._create_frame(global_control_vars, ancestry, fn_name)
        local_vars, self._current_step_ancestry = ret
        local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
        self._add_step_init(local_vars, fn_name)
def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
    """Register a shell command whose output is collected as sysinfo.

    @param command: the shell command to run.
    @param logfile: optional name of the log file to store the output in.
    @param on_every_test: when True, collect after every test instead of
        once per boot.
    """
    loggable = sysinfo.command(command, logf=logfile)
    self._add_sysinfo_loggable(loggable, on_every_test)
def add_sysinfo_logfile(self, file, on_every_test=False):
    """Register an existing log file to be collected as sysinfo.

    @param file: path of the log file to collect.
    @param on_every_test: when True, collect after every test instead of
        once per boot.
    """
    loggable = sysinfo.logfile(file)
    self._add_sysinfo_loggable(loggable, on_every_test)
def _add_sysinfo_loggable(self, loggable, on_every_test):
    """Add a loggable to the sysinfo collector and persist the setting.

    @param loggable: a sysinfo loggable object to register.
    @param on_every_test: when True the loggable is collected per test;
        otherwise it is collected per boot.
    """
    if on_every_test:
        self.sysinfo.test_loggables.add(loggable)
    else:
        self.sysinfo.boot_loggables.add(loggable)
    # Save immediately so the registration survives job restarts/reboots.
    self._save_sysinfo_state()
def _load_sysinfo_state(self):
    """Restore sysinfo collection settings from the persisted job state.

    No-op when no sysinfo state was previously saved.
    """
    state = self._state.get('client', 'sysinfo', None)
    if state:
        self.sysinfo.deserialize(state)
def _save_sysinfo_state(self):
    """Serialize the current sysinfo settings into the job state file."""
    self._state.set('client', 'sysinfo', self.sysinfo.serialize())
class disk_usage_monitor:
    """Watches free space on a device between start() and stop(), and
    reports via a logging callback when the consumption rate exceeds a
    configured MB/hour threshold."""

    def __init__(self, logging_func, device, max_mb_per_hour):
        # logging_func: callable taking a single message string.
        # device: the device/path whose free space is measured.
        # max_mb_per_hour: warning threshold; falsy disables warnings.
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour

    def start(self):
        """Snapshot the free space and the wall-clock start time."""
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()

    def stop(self):
        """Compare free space against the start() snapshot and warn via
        the logging callback if usage exceeded max_mb_per_hour."""
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)

    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                # NOTE(review): try/finally reconstructed -- stop() is
                # presumably expected to run even if func raises; confirm
                # against upstream autotest.
                try:
                    func(*args, **dargs)
                finally:
                    monitor.stop()
            return watched_func
        return decorator
def runjob(control, drop_caches, options):
    """
    Run a job using the given control file.

    This is the main interface to this module.

    @see base_job.__init__ for parameter info.
    """
    control = os.path.abspath(control)
    state = control + '.state'
    # Ensure state file is cleaned up before the job starts to run if autotest
    # is not running with the --continue flag
    if not options.cont and os.path.isfile(state):
        logging.debug('Cleaning up previously found state file')
        os.remove(state)

    # instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid
        if not os.path.exists(control):
            raise error.JobError(control + ": control file not found")

        # When continuing, the job is complete when there is no
        # state file, ensure we don't try and continue.
        if options.cont and not os.path.exists(state):
            raise error.JobComplete("all done")

        myjob = job(control=control, drop_caches=drop_caches, options=options)

        # Load in the users control file, may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except error.JobContinue:
        # Job wants a reboot/continuation; distinct exit code for the
        # wrapper to detect.
        sys.exit(5)

    except error.JobComplete:
        sys.exit(1)

    except error.JobError, instance:
        logging.error("JOB ERROR: " + str(instance))
        if myjob:
            command = None
            if len(instance.args) > 1:
                command = instance.args[1]
                myjob.record('ABORT', None, command, str(instance))
            myjob.record('END ABORT', None, None, str(instance))
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    except Exception, e:
        # NOTE: job._run_step_fn and job.step_engine will turn things into
        # a JobError for us. If we get here, its likely an autotest bug.
        msg = str(e) + '\n' + traceback.format_exc()
        logging.critical("JOB ERROR (autotest bug?): " + msg)
        if myjob:
            myjob.record('END ABORT', None, None, msg)
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob.record('END GOOD', None, None)
    assert myjob._record_indent == 0

    myjob.complete(0)
# Pick up a site-specific job class if one is provided; presumably falls
# back to base_client_job when no site_job module exists -- TODO confirm
# against utils.import_site_class.
site_job = utils.import_site_class(
    __file__, "autotest_lib.client.bin.site_job", "site_job", base_client_job)
1283 class job(site_job
):