1 """
2 The main job wrapper for the server side.
4 This is the core infrastructure. Derived from the client side job.py
6 Copyright Martin J. Bligh, Andy Whitcroft 2007
7 """
9 import getpass, os, sys, re, stat, tempfile, time, select, subprocess, platform
10 import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno
11 from autotest_lib.client.bin import sysinfo
12 from autotest_lib.client.common_lib import base_job
13 from autotest_lib.client.common_lib import error, log, utils, packages
14 from autotest_lib.client.common_lib import logging_manager
15 from autotest_lib.server import test, subcommand, profilers
16 from autotest_lib.server.hosts import abstract_ssh
17 from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
def _control_segment_path(name):
    """Get the pathname of the named control segment file."""
    server_dir = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(server_dir, "control_segments", name)


CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
INSTALL_CONTROL_FILE = _control_segment_path('install')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')

VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')


# by default provide a stub that generates no site data
def _get_site_job_data_dummy(job):
    return {}


# load up site-specific code for generating site-specific job data
get_site_job_data = utils.import_site_function(__file__,
    "autotest_lib.server.site_server_job", "get_site_job_data",
    _get_site_job_data_dummy)


class status_indenter(base_job.status_indenter):
    """Provide a simple integer-backed status indenter."""
    def __init__(self):
        self._indent = 0


    @property
    def indent(self):
        return self._indent


    def increment(self):
        self._indent += 1


    def decrement(self):
        self._indent -= 1


    def get_context(self):
        """Returns a context object for use by job.get_record_context."""
        class context(object):
            def __init__(self, indenter, indent):
                self._indenter = indenter
                self._indent = indent
            def restore(self):
                self._indenter._indent = self._indent
        return context(self, self._indent)


class server_job_record_hook(object):
    """The job.record hook for server job. Used to inject WARN messages from
    the console or vlm whenever new logs are written, and to echo any logs
    to INFO level logging. Implemented as a class so that it can use state to
    block recursive calls, so that the hook can call job.record itself to
    log WARN messages.

    Depends on job._read_warnings and job._logger.
    """
    def __init__(self, job):
        self._job = job
        self._being_called = False


    def __call__(self, entry):
        """A wrapper around the 'real' record hook, the _hook method, which
        prevents recursion. This isn't making any effort to be threadsafe,
        the intent is to outright block infinite recursion via a
        job.record->_hook->job.record->_hook->job.record... chain."""
        if self._being_called:
            return
        self._being_called = True
        try:
            self._hook(self._job, entry)
        finally:
            self._being_called = False


    @staticmethod
    def _hook(job, entry):
        """The core hook, which can safely call job.record."""
        entries = []
        # poll all our warning loggers for new warnings
        for timestamp, msg in job._read_warnings():
            warning_entry = base_job.status_log_entry(
                'WARN', None, None, msg, {}, timestamp=timestamp)
            entries.append(warning_entry)
            job.record_entry(warning_entry)
        # echo rendered versions of all the status logs to info
        entries.append(entry)
        for entry in entries:
            rendered_entry = job._logger.render_entry(entry)
            logging.info(rendered_entry)
            job._parse_status(rendered_entry)


class base_server_job(base_job.base_job):
    """The server-side concrete implementation of base_job.

    Optional properties provided by this implementation:
        serverdir
        conmuxdir

        num_tests_run
        num_tests_failed

        warning_manager
        warning_loggers
    """

    _STATUS_VERSION = 1

    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass='',
                 group_name='', tag='',
                 control_filename=SERVER_CONTROL_FILENAME):
        """
        Create a server side job object.

        @param control: The pathname of the control file.
        @param args: Passed to the control file.
        @param resultdir: Where to throw the results.
        @param label: Description of the job.
        @param user: Username for the job (email address).
        @param machines: A list of hostnames to run the job against.
        @param client: True if this is a client-side control file.
        @param parse_job: string, if supplied it is the job execution tag that
                the results will be passed through to the TKO parser with.
        @param ssh_user: The SSH username. [root]
        @param ssh_port: The SSH port number. [22]
        @param ssh_pass: The SSH passphrase, if needed.
        @param group_name: If supplied, this will be written out as
                host_group_name in the keyvals file for the parser.
        @param tag: The job execution tag from the scheduler. [optional]
        @param control_filename: The filename where the server control file
                should be written in the results directory.
        """
        super(base_server_job, self).__init__(resultdir=resultdir)

        path = os.path.dirname(__file__)
        self.control = control
        self._uncollected_log_file = os.path.join(self.resultdir,
                                                  'uncollected_logs')
        debugdir = os.path.join(self.resultdir, 'debug')
        if not os.path.exists(debugdir):
            os.mkdir(debugdir)

        if user:
            self.user = user
        else:
            self.user = getpass.getuser()

        self.args = args
        self.machines = machines
        self._client = client
        self.warning_loggers = set()
        self.warning_manager = warning_manager()
        self._ssh_user = ssh_user
        self._ssh_port = ssh_port
        self._ssh_pass = ssh_pass
        self.tag = tag
        self.last_boot_tag = None
        self.hosts = set()
        self.drop_caches = False
        self.drop_caches_between_iterations = False
        self._control_filename = control_filename

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        subcommand.logging_manager_object = self.logging

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self.profilers = profilers.profilers(self)

        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'drone' : platform.node(),
                    'status_version' : str(self._STATUS_VERSION),
                    'job_started' : str(int(time.time()))}
        if group_name:
            job_data['host_group_name'] = group_name

        # only write these keyvals out on the first job in a resultdir
        if 'job_started' not in utils.read_keyval(self.resultdir):
            job_data.update(get_site_job_data(self))
            utils.write_keyval(self.resultdir, job_data)

        self._parse_job = parse_job
        self._using_parser = (self._parse_job and len(machines) <= 1)
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.num_tests_run = 0
        self.num_tests_failed = 0

        self._register_subcommand_hooks()

        # these components aren't usable on the server
        self.bootloader = None
        self.harness = None

        # set up the status logger
        self._indenter = status_indenter()
        self._logger = base_job.status_logger(
            self, self._indenter, 'status.log', 'status.log',
            record_hook=server_job_record_hook(self))
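
    # Illustrative sketch (not from the original source): autoserv constructs
    # this object along roughly these lines; the paths and hostnames below are
    # made up for illustration only.
    #
    #     job = server_job(control='control.srv', args=[],
    #                      resultdir='/usr/local/autotest/results/default',
    #                      label='example', user='me@example.com',
    #                      machines=['host1', 'host2'])
    #     job.run(cleanup=True)
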
    @classmethod
    def _find_base_directories(cls):
        """
        Determine locations of autodir, clientdir and serverdir. Assumes
        that this file is located within serverdir and uses __file__ along
        with relative paths to resolve the location.
        """
        serverdir = os.path.abspath(os.path.dirname(__file__))
        autodir = os.path.normpath(os.path.join(serverdir, '..'))
        clientdir = os.path.join(autodir, 'client')
        return autodir, clientdir, serverdir


    def _find_resultdir(self, resultdir):
        """
        Determine the location of resultdir. For server jobs we expect one to
        always be explicitly passed in to __init__, so just return that.
        """
        if resultdir:
            return os.path.normpath(resultdir)
        else:
            return None


    def _get_status_logger(self):
        """Return a reference to the status logger."""
        return self._logger


    @staticmethod
    def _load_control_file(path):
        f = open(path)
        try:
            control_file = f.read()
        finally:
            f.close()
        return re.sub('\r', '', control_file)


    def _register_subcommand_hooks(self):
        """
        Register some hooks into the subcommand modules that allow us
        to properly clean up self.hosts created in forked subprocesses.
        """
        def on_fork(cmd):
            self._existing_hosts_on_fork = set(self.hosts)
        def on_join(cmd):
            new_hosts = self.hosts - self._existing_hosts_on_fork
            for host in new_hosts:
                host.close()
        subcommand.subcommand.register_fork_hook(on_fork)
        subcommand.subcommand.register_join_hook(on_join)


    def init_parser(self):
        """
        Start the continuous parsing of self.resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary.
        """
        if not self._using_parser:
            return
        # redirect parser debugging to .parse.log
        parse_log = os.path.join(self.resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self._STATUS_VERSION)
        self.job_model = self.parser.make_job(self.resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self._parse_job)
        if job_idx is None:
            self.results_db.insert_job(self._parse_job, self.job_model)
        else:
            machine_idx = self.results_db.lookup_machine(self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx


    def cleanup_parser(self):
        """
        This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)
        """
        if not self._using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self._using_parser = False


    def verify(self):
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        if self.resultdir:
            os.chdir(self.resultdir)
        try:
            namespace = {'machines' : self.machines, 'job' : self,
                         'ssh_user' : self._ssh_user,
                         'ssh_port' : self._ssh_port,
                         'ssh_pass' : self._ssh_pass}
            self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n' + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise


    def repair(self, host_protection):
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        if self.resultdir:
            os.chdir(self.resultdir)
        namespace = {'machines': self.machines, 'job': self,
                     'ssh_user': self._ssh_user, 'ssh_port': self._ssh_port,
                     'ssh_pass': self._ssh_pass,
                     'protection_level': host_protection}

        self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)


    def precheck(self):
        """
        perform any additional checks in derived classes.
        """
        pass


    def enable_external_logging(self):
        """
        Start or restart external logging mechanism.
        """
        pass


    def disable_external_logging(self):
        """
        Pause or stop external logging mechanism.
        """
        pass


    def use_external_logging(self):
        """
        Return True if external logging should be used.
        """
        return False


    def _make_parallel_wrapper(self, function, machines, log):
        """Wrap function as appropriate for calling by parallel_simple."""
        is_forking = not (len(machines) == 1 and self.machines == machines)
        if self._parse_job and is_forking and log:
            def wrapper(machine):
                self._parse_job += "/" + machine
                self._using_parser = True
                self.machines = [machine]
                self.push_execution_context(machine)
                os.chdir(self.resultdir)
                utils.write_keyval(self.resultdir, {"hostname": machine})
                self.init_parser()
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                self.push_execution_context(machine)
                os.chdir(self.resultdir)
                machine_data = {'hostname' : machine,
                                'status_version' : str(self._STATUS_VERSION)}
                utils.write_keyval(self.resultdir, machine_data)
                result = function(machine)
                return result
        else:
            wrapper = function
        return wrapper


    def parallel_simple(self, function, machines, log=True, timeout=None,
                        return_results=False):
        """
        Run 'function' using parallel_simple, with an extra wrapper to handle
        the necessary setup for continuous parsing, if possible. If continuous
        parsing is already properly initialized then this should just work.

        @param function: A callable to run in parallel given each machine.
        @param machines: A list of machine names to be passed one per
                subcommand invocation of function.
        @param log: If True, output will be written to a subdirectory named
                after each machine.
        @param timeout: Seconds after which the function call should timeout.
        @param return_results: If True, instead of an AutoservError being
                raised on any error, a list of the results|exceptions from the
                function called on each arg is returned. [default: False]

        @raises error.AutotestError: If any of the functions failed.
        """
        wrapper = self._make_parallel_wrapper(function, machines, log)
        return subcommand.parallel_simple(wrapper, machines,
                                          log=log, timeout=timeout,
                                          return_results=return_results)


    def parallel_on_machines(self, function, machines, timeout=None):
        """
        @param function: Called in parallel with one machine as its argument.
        @param machines: A list of machines to call function(machine) on.
        @param timeout: Seconds after which the function call should timeout.

        @returns A list of machines on which function(machine) returned
                without raising an exception.
        """
        results = self.parallel_simple(function, machines, timeout=timeout,
                                       return_results=True)
        success_machines = []
        for result, machine in itertools.izip(results, machines):
            if not isinstance(result, Exception):
                success_machines.append(machine)
        return success_machines
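
    # Illustrative sketch (not from the original source): a multi-machine
    # control file might use this to keep only the hosts that set up cleanly,
    # assuming an install_client() helper defined by that control file:
    #
    #     machines = job.parallel_on_machines(install_client, machines)
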
    _USE_TEMP_DIR = object()
    def run(self, cleanup=False, install_before=False, install_after=False,
            collect_crashdumps=True, namespace={}, control=None,
            control_file_dir=None, only_collect_crashinfo=False):
        # for a normal job, make sure the uncollected logs file exists
        # for a crashinfo-only run it should already exist, bail out otherwise
        created_uncollected_logs = False
        if self.resultdir and not os.path.exists(self._uncollected_log_file):
            if only_collect_crashinfo:
                # if this is a crashinfo-only run, and there were no existing
                # uncollected logs, just bail out early
                logging.info("No existing uncollected logs, "
                             "skipping crashinfo collection")
                return
            else:
                log_file = open(self._uncollected_log_file, "w")
                pickle.dump([], log_file)
                log_file.close()
                created_uncollected_logs = True

        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines
        if control is None:
            if self.control is None:
                control = ''
            else:
                control = self._load_control_file(self.control)
        if control_file_dir is None:
            control_file_dir = self.resultdir

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self._ssh_user
        namespace['ssh_port'] = self._ssh_port
        namespace['ssh_pass'] = self._ssh_pass
        test_start_time = int(time.time())

        if self.resultdir:
            os.chdir(self.resultdir)
            # touch status.log so that the parser knows a job is running here
            open(self.get_status_log_path(), 'a').close()
            self.enable_external_logging()

        collect_crashinfo = True
        temp_control_file_dir = None
        try:
            try:
                if install_before and machines:
                    self._execute_code(INSTALL_CONTROL_FILE, namespace)

                if only_collect_crashinfo:
                    return

                # determine the dir to write the control files to
                cfd_specified = (control_file_dir
                                 and control_file_dir is not self._USE_TEMP_DIR)
                if cfd_specified:
                    temp_control_file_dir = None
                else:
                    temp_control_file_dir = tempfile.mkdtemp(
                        suffix='temp_control_file_dir')
                    control_file_dir = temp_control_file_dir
                server_control_file = os.path.join(control_file_dir,
                                                   self._control_filename)
                client_control_file = os.path.join(control_file_dir,
                                                   CLIENT_CONTROL_FILENAME)
                if self._client:
                    namespace['control'] = control
                    utils.open_write_close(client_control_file, control)
                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
                                    server_control_file)
                else:
                    utils.open_write_close(server_control_file, control)
                logging.info("Processing control file")
                self._execute_code(server_control_file, namespace)
                logging.info("Finished processing control file")

                # no error occurred, so we don't need to collect crashinfo
                collect_crashinfo = False
            except Exception, e:
                try:
                    logging.exception(
                        'Exception escaped control file, job aborting:')
                    self.record('INFO', None, None, str(e),
                                {'job_abort_reason': str(e)})
                except:
                    pass # don't let logging exceptions here interfere
                raise
        finally:
            if temp_control_file_dir:
                # Clean up temp directory used for copies of the control files
                try:
                    shutil.rmtree(temp_control_file_dir)
                except Exception, e:
                    logging.warn('Could not remove temp directory %s: %s',
                                 temp_control_file_dir, e)

            if machines and (collect_crashdumps or collect_crashinfo):
                namespace['test_start_time'] = test_start_time
                if collect_crashinfo:
                    # includes crashdumps
                    self._execute_code(CRASHINFO_CONTROL_FILE, namespace)
                else:
                    self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)
            if self._uncollected_log_file and created_uncollected_logs:
                os.remove(self._uncollected_log_file)
            self.disable_external_logging()
            if cleanup and machines:
                self._execute_code(CLEANUP_CONTROL_FILE, namespace)
            if install_after and machines:
                self._execute_code(INSTALL_CONTROL_FILE, namespace)


    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url: url of the test to run.
        @param tag: optional tag (passed as a keyword argument) to add to the
                test name.
        """
        group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        outputdir = self._make_test_outputdir(subdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True
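
    # Illustrative sketch (not from the original source): a server control
    # file can call this directly, e.g.
    #
    #     if not job.run_test('sleeptest', tag='smoke'):
    #         logging.warning('sleeptest failed, continuing anyway')
    #
    # run_test() returns False when the test raised a TestBaseException and
    # True on success; any other exception propagates to the caller.
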
    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.
        """
        result, exc_info = None, None
        try:
            self.record('START', subdir, name)
            result = function(*args, **dargs)
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name)
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info


    def run_group(self, function, *args, **dargs):
        """\
        function:
                subroutine to run
        *args:
                arguments for the function
        """

        name = function.__name__

        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        return self._run_group(name, None, function, *args, **dargs)[0]


    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """
        try:
            self.record('START', None, 'reboot')
            reboot_func()
        except Exception, e:
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
            raise
        else:
            kernel = get_kernel_func()
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})
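
    # Illustrative sketch (not from the original source): with a host object in
    # hand, a control file could record a reboot via something like
    #
    #     job.run_reboot(host.reboot,
    #                    lambda: host.run('uname -r').stdout.strip())
    #
    # (host.reboot/host.run are the usual Host methods; the lambda is just one
    # illustrative way of obtaining a kernel version string.)
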
    def run_control(self, path):
        """Execute a control file found at path (relative to the autotest
        path). Intended for executing a control file within a control file,
        not for running the top-level job control file."""
        path = os.path.join(self.autodir, path)
        control_file = self._load_control_file(path)
        self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
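
    # Illustrative sketch (not from the original source): control files can
    # extend sysinfo collection with, e.g.,
    #
    #     job.add_sysinfo_command('lspci -v', logfile='lspci.txt',
    #                             on_every_test=True)
    #     job.add_sysinfo_logfile('/var/log/messages')
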
    def _read_warnings(self):
        """Poll all the warning loggers and extract any new warnings that have
        been logged. If the warnings belong to a category that is currently
        disabled, this method will discard them and they will no longer be
        retrievable.

        Returns a list of (timestamp, message) tuples, where timestamp is an
        integer epoch timestamp."""
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # parse out the warning
                timestamp, msgtype, msg = line.split('\t', 2)
                timestamp = int(timestamp)
                # if the warning is valid, add it to the results
                if self.warning_manager.is_valid(timestamp, msgtype):
                    warnings.append((timestamp, msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings
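
    # Note (added comment): as the split('\t', 2) above implies, each warning
    # logger is expected to emit tab-separated lines of the form
    #
    #     <integer epoch timestamp>\t<warning type>\t<message>\n
    #
    # e.g. "1300000000\tNETWORK\tdropped console connection" (the values here
    # are purely illustrative).
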
    def _unique_subdirectory(self, base_subdirectory_name):
        """Compute a unique results subdirectory based on the given name.

        Appends base_subdirectory_name with a number as necessary to find a
        directory name that doesn't already exist.
        """
        subdirectory = base_subdirectory_name
        counter = 1
        while os.path.exists(os.path.join(self.resultdir, subdirectory)):
            subdirectory = base_subdirectory_name + '.' + str(counter)
            counter += 1
        return subdirectory


    def get_record_context(self):
        """Returns an object representing the current job.record context.

        The object returned is an opaque object with a 0-arg restore method
        which can be called to restore the job.record context (i.e. indentation)
        to the current level. The intention is that it should be used when
        something external which generates job.record calls (e.g. an autotest
        client) can fail catastrophically and the server job record state
        needs to be reset to its original "known good" state.

        @return: A context object with a 0-arg restore() method."""
        return self._indenter.get_context()
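
    # Illustrative sketch (not from the original source): code that drives an
    # autotest client typically brackets it with
    #
    #     context = job.get_record_context()
    #     try:
    #         ...  # run the client, which generates its own job.record calls
    #     finally:
    #         context.restore()
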
    def record_summary(self, status_code, test_name, reason='', attributes=None,
                       distinguishing_attributes=(), child_test_ids=None):
        """Record a summary test result.

        @param status_code: status code string, see
                common_lib.log.is_valid_status()
        @param test_name: name of the test
        @param reason: (optional) string providing detailed reason for test
                outcome
        @param attributes: (optional) dict of string keyvals to associate with
                this result
        @param distinguishing_attributes: (optional) list of attribute names
                that should be used to distinguish identically-named test
                results. These attributes should be present in the attributes
                parameter. This is used to generate user-friendly subdirectory
                names.
        @param child_test_ids: (optional) list of test indices for test results
                used in generating this result.
        """
        subdirectory_name_parts = [test_name]
        for attribute in distinguishing_attributes:
            assert attributes
            assert attribute in attributes, '%s not in %s' % (attribute,
                                                              attributes)
            subdirectory_name_parts.append(attributes[attribute])
        base_subdirectory_name = '.'.join(subdirectory_name_parts)

        subdirectory = self._unique_subdirectory(base_subdirectory_name)
        subdirectory_path = os.path.join(self.resultdir, subdirectory)
        os.mkdir(subdirectory_path)

        self.record(status_code, subdirectory, test_name,
                    status=reason, optional_fields={'is_summary': True})

        if attributes:
            utils.write_keyval(subdirectory_path, attributes)

        if child_test_ids:
            ids_string = ','.join(str(test_id) for test_id in child_test_ids)
            summary_data = {'child_test_ids': ids_string}
            utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
                               summary_data)
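
    # Illustrative sketch (not from the original source): a suite-style control
    # file could roll its child results up with something like
    #
    #     job.record_summary('GOOD', 'netperf2', reason='all pairs passed',
    #                        attributes={'server': 'host1', 'client': 'host2'},
    #                        distinguishing_attributes=('server',),
    #                        child_test_ids=[101, 102])
    #
    # (the hostnames and test indices here are made up.)
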
    def disable_warnings(self, warning_type):
        self.warning_manager.disable_warnings(warning_type)
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})


    def enable_warnings(self, warning_type):
        self.warning_manager.enable_warnings(warning_type)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})


    def get_status_log_path(self, subdir=None):
        """Return the path to the job status log.

        @param subdir - Optional parameter indicating that you want the path
            to a subdirectory status log.

        @returns The path where the status log should be.
        """
        if self.resultdir:
            if subdir:
                return os.path.join(self.resultdir, subdir, "status.log")
            else:
                return os.path.join(self.resultdir, "status.log")
        else:
            return None


    def _update_uncollected_logs_list(self, update_func):
        """Updates the uncollected logs list in a multi-process safe manner.

        @param update_func - a function that updates the list of uncollected
            logs. Should take one parameter, the list to be updated.
        """
        if self._uncollected_log_file:
            log_file = open(self._uncollected_log_file, "r+")
            fcntl.flock(log_file, fcntl.LOCK_EX)
            try:
                uncollected_logs = pickle.load(log_file)
                update_func(uncollected_logs)
                log_file.seek(0)
                log_file.truncate()
                pickle.dump(uncollected_logs, log_file)
                log_file.flush()
            finally:
                fcntl.flock(log_file, fcntl.LOCK_UN)
                log_file.close()


    def add_client_log(self, hostname, remote_path, local_path):
        """Adds a new set of client logs to the list of uncollected logs,
        to allow for future log recovery.

        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.append((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)


    def remove_client_log(self, hostname, remote_path, local_path):
        """Removes a set of client logs from the list of uncollected logs,
        to allow for future log recovery.

        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.remove((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)


    def get_client_logs(self):
        """Retrieves the list of uncollected logs, if it exists.

        @returns A list of (host, remote_path, local_path) tuples. Returns
            an empty list if no uncollected logs file exists.
        """
        log_exists = (self._uncollected_log_file and
                      os.path.exists(self._uncollected_log_file))
        if log_exists:
            return pickle.load(open(self._uncollected_log_file))
        else:
            return []
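
    # Note (added comment): the uncollected-logs file managed above is just a
    # pickled list of (hostname, remote_path, local_path) tuples; updates go
    # through _update_uncollected_logs_list(), which serializes concurrent
    # writers with flock().
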
    def _fill_server_control_namespace(self, namespace, protect=True):
        """
        Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
          namespace: The namespace dictionary to fill in.
          protect: Boolean. If True (the default) any operation that would
              clobber an existing entry in namespace will cause an error.
        Raises:
          error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """
            Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name. If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)

            # No names supplied? Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with fewer surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrence of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys, logging=logging)
        _import_names('autotest_lib.server',
                      ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler',
                       'source_kernel', 'rpm_kernel', 'deb_kernel',
                       'git_kernel'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing to be part of a public
        # API. -gps)

        # XXX Base & SiteAutotest do not appear to use .job. Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self
        namespace['hosts'].factory.ssh_user = self._ssh_user
        namespace['hosts'].factory.ssh_port = self._ssh_port
        namespace['hosts'].factory.ssh_pass = self._ssh_pass
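
    # Note (added comment): the net effect is that a server control file sees
    # names such as hosts, autotest, parallel_simple, run, sh_escape, barrier
    # and the error classes as globals. An illustrative (not original) usage:
    #
    #     host = hosts.create_host(machines[0])
    #     autotest.Autotest(host).run_test('sleeptest')
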
    def _execute_code(self, code_file, namespace, protect=True):
        """
        Execute code using a copy of namespace as a server control script.

        Unless protect is explicitly set to False, the dict will not be
        modified.

        Args:
          code_file: The filename of the control file to execute.
          namespace: A dict containing names to make available during execution.
          protect: Boolean. If True (the default) a copy of the namespace dict
              is used during execution to prevent the code from modifying its
              contents outside of this function. If False the raw dict is
              passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        execfile(code_file, namespace, namespace)


    def _parse_status(self, new_line):
        if not self._using_parser:
            return
        new_tests = self.parser.process_lines([new_line])
        for test in new_tests:
            self.__insert_test(test)


    def __insert_test(self, test):
        """
        An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        self.num_tests_run += 1
        if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
            self.num_tests_failed += 1
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occurred while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg


    def preprocess_client_state(self):
        """
        Produce a state file for initializing the state of a client job.

        Creates a new client state file with all the current server state, as
        well as some pre-set client state.

        @returns The path of the file the state was written into.
        """
        # initialize the sysinfo state
        self._state.set('client', 'sysinfo', self.sysinfo.serialize())

        # dump the state out to a tempfile
        fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
        os.close(fd)

        # write_to_file doesn't need locking, we exclusively own file_path
        self._state.write_to_file(file_path)
        return file_path


    def postprocess_client_state(self, state_path):
        """
        Update the state of this job with the state from a client job.

        Updates the state of the server side of a job with the final state
        of a client job that was run. Updates the non-client-specific state,
        pulls in some specific bits from the client-specific state, and then
        discards the rest. Removes the state file afterwards.

        @param state_path A path to the state file from the client.
        """
        # update the on-disk state
        try:
            self._state.read_from_file(state_path)
            os.remove(state_path)
        except OSError, e:
            # ignore file-not-found errors
            if e.errno != errno.ENOENT:
                raise
            else:
                logging.debug('Client state file %s not found', state_path)

        # update the sysinfo state
        if self._state.has('client', 'sysinfo'):
            self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))

        # drop all the client-specific state
        self._state.discard_namespace('client')


    def clear_all_known_hosts(self):
        """Clears known hosts files for all AbstractSSHHosts."""
        for host in self.hosts:
            if isinstance(host, abstract_ssh.AbstractSSHHost):
                host.clear_known_hosts()


site_server_job = utils.import_site_class(
    __file__, "autotest_lib.server.site_server_job", "site_server_job",
    base_server_job)


class server_job(site_server_job):
    pass


class warning_manager(object):
    """Class for controlling warning logs. Manages the enabling and disabling
    of warnings."""
    def __init__(self):
        # a map of warning types to a list of disabled time intervals
        self.disabled_warnings = {}


    def is_valid(self, timestamp, warning_type):
        """Indicates if a warning (based on the time it occurred and its type)
        is a valid warning. A warning is considered "invalid" if this type of
        warning was marked as "disabled" at the time the warning occurred."""
        disabled_intervals = self.disabled_warnings.get(warning_type, [])
        for start, end in disabled_intervals:
            if timestamp >= start and (end is None or timestamp < end):
                return False
        return True


    def disable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, disables all further warnings of this type."""
        intervals = self.disabled_warnings.setdefault(warning_type, [])
        if not intervals or intervals[-1][1] is not None:
            intervals.append((int(current_time_func()), None))


    def enable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, enables all further warnings of this type."""
        intervals = self.disabled_warnings.get(warning_type, [])
        if intervals and intervals[-1][1] is None:
            intervals[-1] = (intervals[-1][0], int(current_time_func()))
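
# Note (added comment): disable_warnings()/enable_warnings() maintain half-open
# [start, end) intervals per warning type, which is what is_valid() checks.
# Illustrative example (not from the original source):
#
#     mgr = warning_manager()
#     mgr.disable_warnings('NETWORK', current_time_func=lambda: 100)
#     mgr.is_valid(150, 'NETWORK')   # False: inside the still-open interval
#     mgr.enable_warnings('NETWORK', current_time_func=lambda: 200)
#     mgr.is_valid(250, 'NETWORK')   # True: after the interval was closed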