2 The main job wrapper for the server side.
4 This is the core infrastructure. Derived from the client side job.py
6 Copyright Martin J. Bligh, Andy Whitcroft 2007
9 import getpass
, os
, sys
, re
, stat
, tempfile
, time
, select
, subprocess
, platform
10 import traceback
, shutil
, warnings
, fcntl
, pickle
, logging
, itertools
, errno
11 from autotest_lib
.client
.bin
import sysinfo
12 from autotest_lib
.client
.common_lib
import base_job
13 from autotest_lib
.client
.common_lib
import error
, log
, utils
, packages
14 from autotest_lib
.client
.common_lib
import logging_manager
15 from autotest_lib
.server
import test
, subcommand
, profilers
16 from autotest_lib
.server
.hosts
import abstract_ssh
17 from autotest_lib
.tko
import db
as tko_db
, status_lib
, utils
as tko_utils
20 def _control_segment_path(name
):
21 """Get the pathname of the named control segment file."""
22 server_dir
= os
.path
.dirname(os
.path
.abspath(__file__
))
23 return os
.path
.join(server_dir
, "control_segments", name
)
# Filenames used for control files written into the results directory.
CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

# Pre-resolved paths to the standard control segment scripts that implement
# the client wrapper, crash collection, install/cleanup and verify/repair
# phases of a server job.
CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
INSTALL_CONTROL_FILE = _control_segment_path('install')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')

VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')
40 # by default provide a stub that generates no site data
41 def _get_site_job_data_dummy(job
):
# load up site-specific code for generating site-specific job data
# (falls back to the no-op stub above when no site implementation exists)
get_site_job_data = utils.import_site_function(__file__,
    "autotest_lib.server.site_server_job", "get_site_job_data",
    _get_site_job_data_dummy)
51 class status_indenter(base_job
.status_indenter
):
52 """Provide a simple integer-backed status indenter."""
70 def get_context(self
):
71 """Returns a context object for use by job.get_record_context."""
72 class context(object):
73 def __init__(self
, indenter
, indent
):
74 self
._indenter
= indenter
77 self
._indenter
._indent
= self
._indent
78 return context(self
, self
._indent
)
81 class server_job_record_hook(object):
82 """The job.record hook for server job. Used to inject WARN messages from
83 the console or vlm whenever new logs are written, and to echo any logs
84 to INFO level logging. Implemented as a class so that it can use state to
85 block recursive calls, so that the hook can call job.record itself to
88 Depends on job._read_warnings and job._logger.
90 def __init__(self
, job
):
92 self
._being
_called
= False
95 def __call__(self
, entry
):
96 """A wrapper around the 'real' record hook, the _hook method, which
97 prevents recursion. This isn't making any effort to be threadsafe,
98 the intent is to outright block infinite recursion via a
99 job.record->_hook->job.record->_hook->job.record... chain."""
100 if self
._being
_called
:
102 self
._being
_called
= True
104 self
._hook
(self
._job
, entry
)
106 self
._being
_called
= False
110 def _hook(job
, entry
):
111 """The core hook, which can safely call job.record."""
113 # poll all our warning loggers for new warnings
114 for timestamp
, msg
in job
._read
_warnings
():
115 warning_entry
= base_job
.status_log_entry(
116 'WARN', None, None, msg
, {}, timestamp
=timestamp
)
117 entries
.append(warning_entry
)
118 job
.record_entry(warning_entry
)
119 # echo rendered versions of all the status logs to info
120 entries
.append(entry
)
121 for entry
in entries
:
122 rendered_entry
= job
._logger
.render_entry(entry
)
123 logging
.info(rendered_entry
)
124 job
._parse
_status
(rendered_entry
)
127 class base_server_job(base_job
.base_job
):
128 """The server-side concrete implementation of base_job.
130 Optional properties provided by this implementation:
143 def __init__(self
, control
, args
, resultdir
, label
, user
, machines
,
144 client
=False, parse_job
='',
145 ssh_user
='root', ssh_port
=22, ssh_pass
='',
146 group_name
='', tag
='',
147 control_filename
=SERVER_CONTROL_FILENAME
):
149 Create a server side job object.
151 @param control: The pathname of the control file.
152 @param args: Passed to the control file.
153 @param resultdir: Where to throw the results.
154 @param label: Description of the job.
155 @param user: Username for the job (email address).
156 @param client: True if this is a client-side control file.
157 @param parse_job: string, if supplied it is the job execution tag that
158 the results will be passed through to the TKO parser with.
159 @param ssh_user: The SSH username. [root]
160 @param ssh_port: The SSH port number. [22]
161 @param ssh_pass: The SSH passphrase, if needed.
162 @param group_name: If supplied, this will be written out as
163 host_group_name in the keyvals file for the parser.
164 @param tag: The job execution tag from the scheduler. [optional]
165 @param control_filename: The filename where the server control file
166 should be written in the results directory.
168 super(base_server_job
, self
).__init
__(resultdir
=resultdir
)
170 path
= os
.path
.dirname(__file__
)
171 self
.control
= control
172 self
._uncollected
_log
_file
= os
.path
.join(self
.resultdir
,
174 debugdir
= os
.path
.join(self
.resultdir
, 'debug')
175 if not os
.path
.exists(debugdir
):
181 self
.user
= getpass
.getuser()
184 self
.machines
= machines
185 self
._client
= client
186 self
.warning_loggers
= set()
187 self
.warning_manager
= warning_manager()
188 self
._ssh
_user
= ssh_user
189 self
._ssh
_port
= ssh_port
190 self
._ssh
_pass
= ssh_pass
192 self
.last_boot_tag
= None
194 self
.drop_caches
= False
195 self
.drop_caches_between_iterations
= False
196 self
._control
_filename
= control_filename
198 self
.logging
= logging_manager
.get_logging_manager(
199 manage_stdout_and_stderr
=True, redirect_fds
=True)
200 subcommand
.logging_manager_object
= self
.logging
202 self
.sysinfo
= sysinfo
.sysinfo(self
.resultdir
)
203 self
.profilers
= profilers
.profilers(self
)
205 job_data
= {'label' : label
, 'user' : user
,
206 'hostname' : ','.join(machines
),
207 'drone' : platform
.node(),
208 'status_version' : str(self
._STATUS
_VERSION
),
209 'job_started' : str(int(time
.time()))}
211 job_data
['host_group_name'] = group_name
213 # only write these keyvals out on the first job in a resultdir
214 if 'job_started' not in utils
.read_keyval(self
.resultdir
):
215 job_data
.update(get_site_job_data(self
))
216 utils
.write_keyval(self
.resultdir
, job_data
)
218 self
._parse
_job
= parse_job
219 self
._using
_parser
= (self
._parse
_job
and len(machines
) <= 1)
220 self
.pkgmgr
= packages
.PackageManager(
221 self
.autodir
, run_function_dargs
={'timeout':600})
222 self
.num_tests_run
= 0
223 self
.num_tests_failed
= 0
225 self
._register
_subcommand
_hooks
()
227 # these components aren't usable on the server
228 self
.bootloader
= None
231 # set up the status logger
232 self
._indenter
= status_indenter()
233 self
._logger
= base_job
.status_logger(
234 self
, self
._indenter
, 'status.log', 'status.log',
235 record_hook
=server_job_record_hook(self
))
239 def _find_base_directories(cls
):
241 Determine locations of autodir, clientdir and serverdir. Assumes
242 that this file is located within serverdir and uses __file__ along
243 with relative paths to resolve the location.
245 serverdir
= os
.path
.abspath(os
.path
.dirname(__file__
))
246 autodir
= os
.path
.normpath(os
.path
.join(serverdir
, '..'))
247 clientdir
= os
.path
.join(autodir
, 'client')
248 return autodir
, clientdir
, serverdir
251 def _find_resultdir(self
, resultdir
):
253 Determine the location of resultdir. For server jobs we expect one to
254 always be explicitly passed in to __init__, so just return that.
257 return os
.path
.normpath(resultdir
)
262 def _get_status_logger(self
):
263 """Return a reference to the status logger."""
268 def _load_control_file(path
):
271 control_file
= f
.read()
274 return re
.sub('\r', '', control_file
)
277 def _register_subcommand_hooks(self
):
279 Register some hooks into the subcommand modules that allow us
280 to properly clean up self.hosts created in forked subprocesses.
283 self
._existing
_hosts
_on
_fork
= set(self
.hosts
)
285 new_hosts
= self
.hosts
- self
._existing
_hosts
_on
_fork
286 for host
in new_hosts
:
288 subcommand
.subcommand
.register_fork_hook(on_fork
)
289 subcommand
.subcommand
.register_join_hook(on_join
)
292 def init_parser(self
):
294 Start the continuous parsing of self.resultdir. This sets up
295 the database connection and inserts the basic job object into
296 the database if necessary.
298 if not self
._using
_parser
:
300 # redirect parser debugging to .parse.log
301 parse_log
= os
.path
.join(self
.resultdir
, '.parse.log')
302 parse_log
= open(parse_log
, 'w', 0)
303 tko_utils
.redirect_parser_debugging(parse_log
)
304 # create a job model object and set up the db
305 self
.results_db
= tko_db
.db(autocommit
=True)
306 self
.parser
= status_lib
.parser(self
._STATUS
_VERSION
)
307 self
.job_model
= self
.parser
.make_job(self
.resultdir
)
308 self
.parser
.start(self
.job_model
)
309 # check if a job already exists in the db and insert it if
311 job_idx
= self
.results_db
.find_job(self
._parse
_job
)
313 self
.results_db
.insert_job(self
._parse
_job
, self
.job_model
)
315 machine_idx
= self
.results_db
.lookup_machine(self
.job_model
.machine
)
316 self
.job_model
.index
= job_idx
317 self
.job_model
.machine_idx
= machine_idx
320 def cleanup_parser(self
):
322 This should be called after the server job is finished
323 to carry out any remaining cleanup (e.g. flushing any
324 remaining test results to the results db)
326 if not self
._using
_parser
:
328 final_tests
= self
.parser
.end()
329 for test
in final_tests
:
330 self
.__insert
_test
(test
)
331 self
._using
_parser
= False
335 if not self
.machines
:
336 raise error
.AutoservError('No machines specified to verify')
338 os
.chdir(self
.resultdir
)
340 namespace
= {'machines' : self
.machines
, 'job' : self
,
341 'ssh_user' : self
._ssh
_user
,
342 'ssh_port' : self
._ssh
_port
,
343 'ssh_pass' : self
._ssh
_pass
}
344 self
._execute
_code
(VERIFY_CONTROL_FILE
, namespace
, protect
=False)
346 msg
= ('Verify failed\n' + str(e
) + '\n' + traceback
.format_exc())
347 self
.record('ABORT', None, None, msg
)
351 def repair(self
, host_protection
):
352 if not self
.machines
:
353 raise error
.AutoservError('No machines specified to repair')
355 os
.chdir(self
.resultdir
)
356 namespace
= {'machines': self
.machines
, 'job': self
,
357 'ssh_user': self
._ssh
_user
, 'ssh_port': self
._ssh
_port
,
358 'ssh_pass': self
._ssh
_pass
,
359 'protection_level': host_protection
}
361 self
._execute
_code
(REPAIR_CONTROL_FILE
, namespace
, protect
=False)
366 perform any additional checks in derived classes.
371 def enable_external_logging(self
):
373 Start or restart external logging mechanism.
378 def disable_external_logging(self
):
380 Pause or stop external logging mechanism.
385 def use_external_logging(self
):
387 Return True if external logging should be used.
392 def _make_parallel_wrapper(self
, function
, machines
, log
):
393 """Wrap function as appropriate for calling by parallel_simple."""
394 is_forking
= not (len(machines
) == 1 and self
.machines
== machines
)
395 if self
._parse
_job
and is_forking
and log
:
396 def wrapper(machine
):
397 self
._parse
_job
+= "/" + machine
398 self
._using
_parser
= True
399 self
.machines
= [machine
]
400 self
.push_execution_context(machine
)
401 os
.chdir(self
.resultdir
)
402 utils
.write_keyval(self
.resultdir
, {"hostname": machine
})
404 result
= function(machine
)
405 self
.cleanup_parser()
407 elif len(machines
) > 1 and log
:
408 def wrapper(machine
):
409 self
.push_execution_context(machine
)
410 os
.chdir(self
.resultdir
)
411 machine_data
= {'hostname' : machine
,
412 'status_version' : str(self
._STATUS
_VERSION
)}
413 utils
.write_keyval(self
.resultdir
, machine_data
)
414 result
= function(machine
)
421 def parallel_simple(self
, function
, machines
, log
=True, timeout
=None,
422 return_results
=False):
424 Run 'function' using parallel_simple, with an extra wrapper to handle
425 the necessary setup for continuous parsing, if possible. If continuous
426 parsing is already properly initialized then this should just work.
428 @param function: A callable to run in parallel given each machine.
429 @param machines: A list of machine names to be passed one per subcommand
430 invocation of function.
431 @param log: If True, output will be written to output in a subdirectory
432 named after each machine.
433 @param timeout: Seconds after which the function call should timeout.
434 @param return_results: If True instead of an AutoServError being raised
435 on any error a list of the results|exceptions from the function
436 called on each arg is returned. [default: False]
438 @raises error.AutotestError: If any of the functions failed.
440 wrapper
= self
._make
_parallel
_wrapper
(function
, machines
, log
)
441 return subcommand
.parallel_simple(wrapper
, machines
,
442 log
=log
, timeout
=timeout
,
443 return_results
=return_results
)
446 def parallel_on_machines(self
, function
, machines
, timeout
=None):
448 @param function: Called in parallel with one machine as its argument.
449 @param machines: A list of machines to call function(machine) on.
450 @param timeout: Seconds after which the function call should timeout.
452 @returns A list of machines on which function(machine) returned
453 without raising an exception.
455 results
= self
.parallel_simple(function
, machines
, timeout
=timeout
,
457 success_machines
= []
458 for result
, machine
in itertools
.izip(results
, machines
):
459 if not isinstance(result
, Exception):
460 success_machines
.append(machine
)
461 return success_machines
    # Opaque sentinel: passed as control_file_dir to request that a fresh
    # temporary directory be used for the control files (see run_control).
    _USE_TEMP_DIR = object()
465 def run(self
, cleanup
=False, install_before
=False, install_after
=False,
466 collect_crashdumps
=True, namespace
={}, control
=None,
467 control_file_dir
=None, only_collect_crashinfo
=False):
468 # for a normal job, make sure the uncollected logs file exists
469 # for a crashinfo-only run it should already exist, bail out otherwise
470 created_uncollected_logs
= False
471 if self
.resultdir
and not os
.path
.exists(self
._uncollected
_log
_file
):
472 if only_collect_crashinfo
:
473 # if this is a crashinfo-only run, and there were no existing
474 # uncollected logs, just bail out early
475 logging
.info("No existing uncollected logs, "
476 "skipping crashinfo collection")
479 log_file
= open(self
._uncollected
_log
_file
, "w")
480 pickle
.dump([], log_file
)
482 created_uncollected_logs
= True
484 # use a copy so changes don't affect the original dictionary
485 namespace
= namespace
.copy()
486 machines
= self
.machines
488 if self
.control
is None:
491 control
= self
._load
_control
_file
(self
.control
)
492 if control_file_dir
is None:
493 control_file_dir
= self
.resultdir
496 namespace
['machines'] = machines
497 namespace
['args'] = self
.args
498 namespace
['job'] = self
499 namespace
['ssh_user'] = self
._ssh
_user
500 namespace
['ssh_port'] = self
._ssh
_port
501 namespace
['ssh_pass'] = self
._ssh
_pass
502 test_start_time
= int(time
.time())
505 os
.chdir(self
.resultdir
)
506 # touch status.log so that the parser knows a job is running here
507 open(self
.get_status_log_path(), 'a').close()
508 self
.enable_external_logging()
510 collect_crashinfo
= True
511 temp_control_file_dir
= None
514 if install_before
and machines
:
515 self
._execute
_code
(INSTALL_CONTROL_FILE
, namespace
)
517 if only_collect_crashinfo
:
520 # determine the dir to write the control files to
521 cfd_specified
= (control_file_dir
522 and control_file_dir
is not self
._USE
_TEMP
_DIR
)
524 temp_control_file_dir
= None
526 temp_control_file_dir
= tempfile
.mkdtemp(
527 suffix
='temp_control_file_dir')
528 control_file_dir
= temp_control_file_dir
529 server_control_file
= os
.path
.join(control_file_dir
,
530 self
._control
_filename
)
531 client_control_file
= os
.path
.join(control_file_dir
,
532 CLIENT_CONTROL_FILENAME
)
534 namespace
['control'] = control
535 utils
.open_write_close(client_control_file
, control
)
536 shutil
.copyfile(CLIENT_WRAPPER_CONTROL_FILE
,
539 utils
.open_write_close(server_control_file
, control
)
540 logging
.info("Processing control file")
541 self
._execute
_code
(server_control_file
, namespace
)
542 logging
.info("Finished processing control file")
544 # no error occured, so we don't need to collect crashinfo
545 collect_crashinfo
= False
549 'Exception escaped control file, job aborting:')
550 self
.record('INFO', None, None, str(e
),
551 {'job_abort_reason': str(e
)})
553 pass # don't let logging exceptions here interfere
556 if temp_control_file_dir
:
557 # Clean up temp directory used for copies of the control files
559 shutil
.rmtree(temp_control_file_dir
)
561 logging
.warn('Could not remove temp directory %s: %s',
562 temp_control_file_dir
, e
)
564 if machines
and (collect_crashdumps
or collect_crashinfo
):
565 namespace
['test_start_time'] = test_start_time
566 if collect_crashinfo
:
567 # includes crashdumps
568 self
._execute
_code
(CRASHINFO_CONTROL_FILE
, namespace
)
570 self
._execute
_code
(CRASHDUMPS_CONTROL_FILE
, namespace
)
571 if self
._uncollected
_log
_file
and created_uncollected_logs
:
572 os
.remove(self
._uncollected
_log
_file
)
573 self
.disable_external_logging()
574 if cleanup
and machines
:
575 self
._execute
_code
(CLEANUP_CONTROL_FILE
, namespace
)
576 if install_after
and machines
:
577 self
._execute
_code
(INSTALL_CONTROL_FILE
, namespace
)
580 def run_test(self
, url
, *args
, **dargs
):
582 Summon a test object and run it.
585 tag to add to testname
587 url of the test to run
589 group
, testname
= self
.pkgmgr
.get_package_name(url
, 'test')
590 testname
, subdir
, tag
= self
._build
_tagged
_test
_name
(testname
, dargs
)
591 outputdir
= self
._make
_test
_outputdir
(subdir
)
595 test
.runtest(self
, url
, tag
, args
, dargs
)
596 except error
.TestBaseException
, e
:
597 self
.record(e
.exit_status
, subdir
, testname
, str(e
))
600 info
= str(e
) + "\n" + traceback
.format_exc()
601 self
.record('FAIL', subdir
, testname
, info
)
604 self
.record('GOOD', subdir
, testname
, 'completed successfully')
606 result
, exc_info
= self
._run
_group
(testname
, subdir
, group_func
)
607 if exc_info
and isinstance(exc_info
[1], error
.TestBaseException
):
610 raise exc_info
[0], exc_info
[1], exc_info
[2]
615 def _run_group(self
, name
, subdir
, function
, *args
, **dargs
):
617 Underlying method for running something inside of a group.
619 result
, exc_info
= None, None
621 self
.record('START', subdir
, name
)
622 result
= function(*args
, **dargs
)
623 except error
.TestBaseException
, e
:
624 self
.record("END %s" % e
.exit_status
, subdir
, name
)
625 exc_info
= sys
.exc_info()
627 err_msg
= str(e
) + '\n'
628 err_msg
+= traceback
.format_exc()
629 self
.record('END ABORT', subdir
, name
, err_msg
)
630 raise error
.JobError(name
+ ' failed\n' + traceback
.format_exc())
632 self
.record('END GOOD', subdir
, name
)
634 return result
, exc_info
637 def run_group(self
, function
, *args
, **dargs
):
642 arguments for the function
645 name
= function
.__name
__
647 # Allow the tag for the group to be specified.
648 tag
= dargs
.pop('tag', None)
652 return self
._run
_group
(name
, None, function
, *args
, **dargs
)[0]
655 def run_reboot(self
, reboot_func
, get_kernel_func
):
657 A specialization of run_group meant specifically for handling
658 a reboot. Includes support for capturing the kernel version
661 reboot_func: a function that carries out the reboot
663 get_kernel_func: a function that returns a string
664 representing the kernel version.
667 self
.record('START', None, 'reboot')
670 err_msg
= str(e
) + '\n' + traceback
.format_exc()
671 self
.record('END FAIL', None, 'reboot', err_msg
)
674 kernel
= get_kernel_func()
675 self
.record('END GOOD', None, 'reboot',
676 optional_fields
={"kernel": kernel
})
679 def run_control(self
, path
):
680 """Execute a control file found at path (relative to the autotest
681 path). Intended for executing a control file within a control file,
682 not for running the top-level job control file."""
683 path
= os
.path
.join(self
.autodir
, path
)
684 control_file
= self
._load
_control
_file
(path
)
685 self
.run(control
=control_file
, control_file_dir
=self
._USE
_TEMP
_DIR
)
688 def add_sysinfo_command(self
, command
, logfile
=None, on_every_test
=False):
689 self
._add
_sysinfo
_loggable
(sysinfo
.command(command
, logf
=logfile
),
693 def add_sysinfo_logfile(self
, file, on_every_test
=False):
694 self
._add
_sysinfo
_loggable
(sysinfo
.logfile(file), on_every_test
)
697 def _add_sysinfo_loggable(self
, loggable
, on_every_test
):
699 self
.sysinfo
.test_loggables
.add(loggable
)
701 self
.sysinfo
.boot_loggables
.add(loggable
)
704 def _read_warnings(self
):
705 """Poll all the warning loggers and extract any new warnings that have
706 been logged. If the warnings belong to a category that is currently
707 disabled, this method will discard them and they will no longer be
710 Returns a list of (timestamp, message) tuples, where timestamp is an
711 integer epoch timestamp."""
714 # pull in a line of output from every logger that has
715 # output ready to be read
716 loggers
, _
, _
= select
.select(self
.warning_loggers
, [], [], 0)
717 closed_loggers
= set()
718 for logger
in loggers
:
719 line
= logger
.readline()
720 # record any broken pipes (aka line == empty)
722 closed_loggers
.add(logger
)
724 # parse out the warning
725 timestamp
, msgtype
, msg
= line
.split('\t', 2)
726 timestamp
= int(timestamp
)
727 # if the warning is valid, add it to the results
728 if self
.warning_manager
.is_valid(timestamp
, msgtype
):
729 warnings
.append((timestamp
, msg
.strip()))
731 # stop listening to loggers that are closed
732 self
.warning_loggers
-= closed_loggers
734 # stop if none of the loggers have any output left
738 # sort into timestamp order
743 def _unique_subdirectory(self
, base_subdirectory_name
):
744 """Compute a unique results subdirectory based on the given name.
746 Appends base_subdirectory_name with a number as necessary to find a
747 directory name that doesn't already exist.
749 subdirectory
= base_subdirectory_name
751 while os
.path
.exists(os
.path
.join(self
.resultdir
, subdirectory
)):
752 subdirectory
= base_subdirectory_name
+ '.' + str(counter
)
757 def get_record_context(self
):
758 """Returns an object representing the current job.record context.
760 The object returned is an opaque object with a 0-arg restore method
761 which can be called to restore the job.record context (i.e. indentation)
762 to the current level. The intention is that it should be used when
763 something external which generate job.record calls (e.g. an autotest
764 client) can fail catastrophically and the server job record state
765 needs to be reset to its original "known good" state.
767 @return: A context object with a 0-arg restore() method."""
768 return self
._indenter
.get_context()
771 def record_summary(self
, status_code
, test_name
, reason
='', attributes
=None,
772 distinguishing_attributes
=(), child_test_ids
=None):
773 """Record a summary test result.
775 @param status_code: status code string, see
776 common_lib.log.is_valid_status()
777 @param test_name: name of the test
778 @param reason: (optional) string providing detailed reason for test
780 @param attributes: (optional) dict of string keyvals to associate with
782 @param distinguishing_attributes: (optional) list of attribute names
783 that should be used to distinguish identically-named test
784 results. These attributes should be present in the attributes
785 parameter. This is used to generate user-friendly subdirectory
787 @param child_test_ids: (optional) list of test indices for test results
788 used in generating this result.
790 subdirectory_name_parts
= [test_name
]
791 for attribute
in distinguishing_attributes
:
793 assert attribute
in attributes
, '%s not in %s' % (attribute
,
795 subdirectory_name_parts
.append(attributes
[attribute
])
796 base_subdirectory_name
= '.'.join(subdirectory_name_parts
)
798 subdirectory
= self
._unique
_subdirectory
(base_subdirectory_name
)
799 subdirectory_path
= os
.path
.join(self
.resultdir
, subdirectory
)
800 os
.mkdir(subdirectory_path
)
802 self
.record(status_code
, subdirectory
, test_name
,
803 status
=reason
, optional_fields
={'is_summary': True})
806 utils
.write_keyval(subdirectory_path
, attributes
)
809 ids_string
= ','.join(str(test_id
) for test_id
in child_test_ids
)
810 summary_data
= {'child_test_ids': ids_string
}
811 utils
.write_keyval(os
.path
.join(subdirectory_path
, 'summary_data'),
815 def disable_warnings(self
, warning_type
):
816 self
.warning_manager
.disable_warnings(warning_type
)
817 self
.record("INFO", None, None,
818 "disabling %s warnings" % warning_type
,
819 {"warnings.disable": warning_type
})
822 def enable_warnings(self
, warning_type
):
823 self
.warning_manager
.enable_warnings(warning_type
)
824 self
.record("INFO", None, None,
825 "enabling %s warnings" % warning_type
,
826 {"warnings.enable": warning_type
})
829 def get_status_log_path(self
, subdir
=None):
830 """Return the path to the job status log.
832 @param subdir - Optional paramter indicating that you want the path
833 to a subdirectory status log.
835 @returns The path where the status log should be.
839 return os
.path
.join(self
.resultdir
, subdir
, "status.log")
841 return os
.path
.join(self
.resultdir
, "status.log")
846 def _update_uncollected_logs_list(self
, update_func
):
847 """Updates the uncollected logs list in a multi-process safe manner.
849 @param update_func - a function that updates the list of uncollected
850 logs. Should take one parameter, the list to be updated.
852 if self
._uncollected
_log
_file
:
853 log_file
= open(self
._uncollected
_log
_file
, "r+")
854 fcntl
.flock(log_file
, fcntl
.LOCK_EX
)
856 uncollected_logs
= pickle
.load(log_file
)
857 update_func(uncollected_logs
)
860 pickle
.dump(uncollected_logs
, log_file
)
863 fcntl
.flock(log_file
, fcntl
.LOCK_UN
)
867 def add_client_log(self
, hostname
, remote_path
, local_path
):
868 """Adds a new set of client logs to the list of uncollected logs,
869 to allow for future log recovery.
871 @param host - the hostname of the machine holding the logs
872 @param remote_path - the directory on the remote machine holding logs
873 @param local_path - the local directory to copy the logs into
875 def update_func(logs_list
):
876 logs_list
.append((hostname
, remote_path
, local_path
))
877 self
._update
_uncollected
_logs
_list
(update_func
)
880 def remove_client_log(self
, hostname
, remote_path
, local_path
):
881 """Removes a set of client logs from the list of uncollected logs,
882 to allow for future log recovery.
884 @param host - the hostname of the machine holding the logs
885 @param remote_path - the directory on the remote machine holding logs
886 @param local_path - the local directory to copy the logs into
888 def update_func(logs_list
):
889 logs_list
.remove((hostname
, remote_path
, local_path
))
890 self
._update
_uncollected
_logs
_list
(update_func
)
893 def get_client_logs(self
):
894 """Retrieves the list of uncollected logs, if it exists.
896 @returns A list of (host, remote_path, local_path) tuples. Returns
897 an empty list if no uncollected logs file exists.
899 log_exists
= (self
._uncollected
_log
_file
and
900 os
.path
.exists(self
._uncollected
_log
_file
))
902 return pickle
.load(open(self
._uncollected
_log
_file
))
907 def _fill_server_control_namespace(self
, namespace
, protect
=True):
909 Prepare a namespace to be used when executing server control files.
911 This sets up the control file API by importing modules and making them
912 available under the appropriate names within namespace.
914 For use by _execute_code().
917 namespace: The namespace dictionary to fill in.
918 protect: Boolean. If True (the default) any operation that would
919 clobber an existing entry in namespace will cause an error.
921 error.AutoservError: When a name would be clobbered by import.
923 def _import_names(module_name
, names
=()):
925 Import a module and assign named attributes into namespace.
928 module_name: The string module name.
929 names: A limiting list of names to import from module_name. If
930 empty (the default), all names are imported from the module
931 similar to a "from foo.bar import *" statement.
933 error.AutoservError: When a name being imported would clobber
934 a name already in namespace.
936 module
= __import__(module_name
, {}, {}, names
)
938 # No names supplied? Import * from the lowest level module.
939 # (Ugh, why do I have to implement this part myself?)
941 for submodule_name
in module_name
.split('.')[1:]:
942 module
= getattr(module
, submodule_name
)
943 if hasattr(module
, '__all__'):
944 names
= getattr(module
, '__all__')
948 # Install each name into namespace, checking to make sure it
949 # doesn't override anything that already exists.
951 # Check for conflicts to help prevent future problems.
952 if name
in namespace
and protect
:
953 if namespace
[name
] is not getattr(module
, name
):
954 raise error
.AutoservError('importing name '
955 '%s from %s %r would override %r' %
956 (name
, module_name
, getattr(module
, name
),
959 # Encourage cleanliness and the use of __all__ for a
960 # more concrete API with less surprises on '*' imports.
961 warnings
.warn('%s (%r) being imported from %s for use '
962 'in server control files is not the '
963 'first occurrance of that import.' %
964 (name
, namespace
[name
], module_name
))
966 namespace
[name
] = getattr(module
, name
)
969 # This is the equivalent of prepending a bunch of import statements to
970 # the front of the control script.
971 namespace
.update(os
=os
, sys
=sys
, logging
=logging
)
972 _import_names('autotest_lib.server',
973 ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler',
974 'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel'))
975 _import_names('autotest_lib.server.subcommand',
976 ('parallel', 'parallel_simple', 'subcommand'))
977 _import_names('autotest_lib.server.utils',
978 ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
979 _import_names('autotest_lib.client.common_lib.error')
980 _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))
982 # Inject ourself as the job object into other classes within the API.
983 # (Yuck, this injection is a gross thing be part of a public API. -gps)
985 # XXX Base & SiteAutotest do not appear to use .job. Who does?
986 namespace
['autotest'].Autotest
.job
= self
987 # server.hosts.base_classes.Host uses .job.
988 namespace
['hosts'].Host
.job
= self
989 namespace
['hosts'].factory
.ssh_user
= self
._ssh
_user
990 namespace
['hosts'].factory
.ssh_port
= self
._ssh
_port
991 namespace
['hosts'].factory
.ssh_pass
= self
._ssh
_pass
def _execute_code(self, code_file, namespace, protect=True):
    """
    Execute code using a copy of namespace as a server control script.

    Unless protect_namespace is explicitly set to False, the dict will not
    be modified.

    Args:
      code_file: The filename of the control file to execute.
      namespace: A dict containing names to make available during execution.
      protect: Boolean. If True (the default) a copy of the namespace dict
          is used during execution to prevent the code from modifying its
          contents outside of this function. If False the raw dict is
          passed in and modifications will be allowed.
    """
    if protect:
        namespace = namespace.copy()
    self._fill_server_control_namespace(namespace, protect=protect)
    # TODO: Simplify and get rid of the special cases for only 1 machine.
    if len(self.machines) > 1:
        machines_text = '\n'.join(self.machines) + '\n'
        # Only rewrite the file if it does not match our machine list.
        try:
            machines_f = open(MACHINES_FILENAME, 'r')
            # Close the handle even if read() fails; the open file was
            # previously leaked here.
            try:
                existing_machines_text = machines_f.read()
            finally:
                machines_f.close()
        except EnvironmentError:
            # Missing/unreadable file just means "no existing list".
            existing_machines_text = None
        if machines_text != existing_machines_text:
            utils.open_write_close(MACHINES_FILENAME, machines_text)
    execfile(code_file, namespace, namespace)
def _parse_status(self, new_line):
    """Feed one status log line to the TKO parser and record any tests
    that the parser reports as completed."""
    if not self._using_parser:
        return
    for parsed_test in self.parser.process_lines([new_line]):
        self.__insert_test(parsed_test)
def __insert_test(self, test):
    """
    An internal method to insert a new test result into the
    database. This method will not raise an exception, even if an
    error occurs during the insert, to avoid failing a test
    simply because of unexpected database issues."""
    # Bump counters first so they are correct even if the insert fails.
    self.num_tests_run += 1
    test_failed = status_lib.is_worse_than_or_equal_to(test.status, 'FAIL')
    if test_failed:
        self.num_tests_failed += 1
    try:
        self.results_db.insert_test(self.job_model, test)
    except Exception:
        # Deliberate catch-all: a database hiccup must not fail the job.
        msg = ("WARNING: An unexpected error occured while "
               "inserting test results into the database. "
               "Ignoring error.\n" + traceback.format_exc())
        print >> sys.stderr, msg
def preprocess_client_state(self):
    """
    Produce a state file for initializing the state of a client job.

    Creates a new client state file with all the current server state, as
    well as some pre-set client state.

    @returns The path of the file the state was written into.
    """
    # initialize the sysinfo state
    self._state.set('client', 'sysinfo', self.sysinfo.serialize())

    # dump the state out to a tempfile
    fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
    # mkstemp returns an already-open descriptor we never use; close it
    # so it is not leaked.
    os.close(fd)

    # write_to_file doesn't need locking, we exclusively own file_path
    self._state.write_to_file(file_path)
    # Callers need the path, per the docstring contract above.
    return file_path
def postprocess_client_state(self, state_path):
    """
    Update the state of this job with the state from a client job.

    Updates the state of the server side of a job with the final state
    of a client job that was run. Updates the non-client-specific state,
    pulls in some specific bits from the client-specific state, and then
    discards the rest. Removes the state file afterwards

    @param state_file A path to the state file from the client.
    """
    # update the on-disk state
    try:
        self._state.read_from_file(state_path)
        os.remove(state_path)
    except OSError as err:
        # ignore file-not-found errors; anything else is a real failure
        if err.errno == errno.ENOENT:
            logging.debug('Client state file %s not found', state_path)
        else:
            raise

    # update the sysinfo state
    if self._state.has('client', 'sysinfo'):
        self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))

    # drop all the client-specific state
    self._state.discard_namespace('client')
def clear_all_known_hosts(self):
    """Clears known hosts files for all AbstractSSHHosts."""
    ssh_hosts = (h for h in self.hosts
                 if isinstance(h, abstract_ssh.AbstractSSHHost))
    for ssh_host in ssh_hosts:
        ssh_host.clear_known_hosts()
# Pick up a site-specific server_job specialization when one is available;
# otherwise fall through to the base implementation unchanged.
site_server_job = utils.import_site_class(
    __file__, "autotest_lib.server.site_server_job", "site_server_job",
    base_server_job)


class server_job(site_server_job):
    pass
class warning_manager(object):
    """Class for controlling warning logs. Manages the enabling and disabling
    of warnings on a per-type basis."""
    def __init__(self):
        # a map of warning types to a list of disabled time intervals
        self.disabled_warnings = {}


    def is_valid(self, timestamp, warning_type):
        """Indicates if a warning (based on the time it occured and its type)
        is a valid warning. A warning is considered "invalid" if this type of
        warning was marked as "disabled" at the time the warning occured."""
        for begin, finish in self.disabled_warnings.get(warning_type, []):
            # an open interval (finish is None) extends to the present
            still_open = finish is None
            if timestamp >= begin and (still_open or timestamp < finish):
                return False
        return True


    def disable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, disables all further warnings of this type."""
        history = self.disabled_warnings.setdefault(warning_type, [])
        # only start a new interval if the last one has already been closed
        if not history or history[-1][1] is not None:
            history.append((int(current_time_func()), None))


    def enable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, enables all further warnings of this type."""
        history = self.disabled_warnings.get(warning_type, [])
        # close the currently-open interval, if any
        if history and history[-1][1] is None:
            begin, _ = history[-1]
            history[-1] = (begin, int(current_time_func()))