KVM test: tests_base.cfg. sample: Fix test dependencies
[autotest-zwu.git] / scheduler / drone_manager.py
blob e094f14f1fb080ef9108b90b287b0b46f49e78ae
1 import os, re, shutil, signal, subprocess, errno, time, heapq, traceback
2 import common, logging
3 from autotest_lib.client.common_lib import error, global_config
4 from autotest_lib.scheduler import email_manager, drone_utility, drones
5 from autotest_lib.scheduler import scheduler_config
# Results on each drone live beneath the drone's installation directory, inside
# a subdirectory with this name (see DroneManager.absolute_path()).
_DRONE_RESULTS_DIR_SUFFIX = 'results'

# Sentinel for execute_command(): any argv item that *is* this object gets
# replaced with the absolute working directory before the command is queued.
WORKING_DIRECTORY = object()

# Well-known pidfile names.
AUTOSERV_PID_FILE = '.autoserv_execute'
CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
PARSER_PID_FILE = '.parser_execute'
ARCHIVER_PID_FILE = '.archiver_execute'

ALL_PIDFILE_NAMES = (
        AUTOSERV_PID_FILE,
        CRASHINFO_PID_FILE,
        PARSER_PID_FILE,
        ARCHIVER_PID_FILE,
)
class DroneManagerError(Exception):
    """Raised by DroneManager when no usable drone is available for a request."""
class CustomEquals(object):
    """
    Mixin giving value-based equality and hashing.

    Subclasses implement _id() to return a hashable key. An instance equals
    `other` when `other` is an instance of this object's type and both keys
    are equal; the hash is the hash of the key.
    """
    def _id(self):
        """Return the hashable identity key; subclasses must override this."""
        raise NotImplementedError


    def __eq__(self, other):
        if isinstance(other, type(self)):
            return self._id() == other._id()
        # let Python try the reflected comparison / fall back to identity
        return NotImplemented


    def __ne__(self, other):
        equal = (self == other)
        return not equal


    def __hash__(self):
        key = self._id()
        return hash(key)
class Process(CustomEquals):
    """A process on a host; identity (equality/hash) is (hostname, pid)."""
    def __init__(self, hostname, pid, ppid=None):
        self.hostname, self.pid, self.ppid = hostname, pid, ppid


    def _id(self):
        # ppid is not part of the identity
        return (self.hostname, self.pid)


    def __str__(self):
        return '%s/%s' % (self.hostname, self.pid)


    def __repr__(self):
        base = super(Process, self).__repr__()
        return '%s<%s>' % (base, self)
class PidfileId(CustomEquals):
    """Hashable handle for a pidfile; two ids are equal iff their paths are."""
    def __init__(self, path):
        self.path = path


    def _id(self):
        return self.path


    def __str__(self):
        text = str(self.path)
        return text
78 class _PidfileInfo(object):
79 age = 0
80 num_processes = None
class PidfileContents(object):
    """
    Contents of a pidfile that parsed successfully (possibly still empty).

    Each attribute stays None until the corresponding line has been written
    to the pidfile and parsed.
    """
    # Process that wrote the pidfile, or None if no pid has been read yet.
    process = None
    # Exit status of that process, or None until it has been recorded.
    exit_status = None
    # Number of failed tests reported alongside the exit status, or None.
    num_tests_failed = None

    def is_invalid(self):
        """Return False: these contents were parsed without error."""
        return False


    def is_running(self):
        """
        Return True if a process is recorded and no exit status exists yet.

        An exit status of 0 means the process finished successfully, so the
        check must be "exit_status is None" rather than a truthiness test
        (which would report a successfully-exited process as running).
        """
        return self.process is not None and self.exit_status is None
class InvalidPidfile(object):
    """
    Result of parsing a corrupt pidfile.

    Exposes the same data attributes as PidfileContents (all None) so code
    that inspects every parsed pidfile, e.g. reading `.process` or
    `.exit_status`, does not fail with AttributeError on a corrupt one.
    """
    process = None
    exit_status = None
    num_tests_failed = None

    def __init__(self, error):
        # human-readable description of why the pidfile is invalid
        self.error = error


    def is_invalid(self):
        """Return True: the pidfile could not be parsed."""
        return True


    def is_running(self):
        """Return False: nothing can be inferred from a corrupt pidfile."""
        return False


    def __str__(self):
        return str(self.error)
113 class _DroneHeapWrapper(object):
114 """Wrapper to compare drones based on used_capacity().
116 These objects can be used to keep a heap of drones by capacity.
118 def __init__(self, drone):
119 self.drone = drone
122 def __cmp__(self, other):
123 assert isinstance(other, _DroneHeapWrapper)
124 return cmp(self.drone.used_capacity(), other.drone.used_capacity())
class DroneManager(object):
    """
    This class acts as an interface from the scheduler to drones, whether it be
    only a single "drone" for localhost or multiple remote drones.

    All paths going into and out of this class are relative to the full results
    directory, except for those returned by absolute_path().
    """
    def __init__(self):
        # absolute path of base results dir (on the results repository)
        self._results_dir = None
        # holds Process objects
        self._process_set = set()
        # maps PidfileId to PidfileContents (read before processes were checked)
        self._pidfiles = {}
        # same as _pidfiles, but read after processes were checked
        self._pidfiles_second_read = {}
        # maps PidfileId to _PidfileInfo
        self._registered_pidfile_info = {}
        # used to generate unique temporary paths
        self._temporary_path_counter = 0
        # maps hostname to Drone object
        self._drones = {}
        # drone for the results repository; set by initialize()
        self._results_drone = None
        # maps results dir to dict mapping file path to contents
        self._attached_files = {}
        # heapq of _DroneHeapWrappers, lowest used capacity first
        self._drone_queue = []


    def initialize(self, base_results_dir, drone_hostnames,
                   results_repository_hostname):
        """
        Set up the drones and the results repository.

        @param base_results_dir: absolute path of the base results directory
                on the results repository.
        @param drone_hostnames: hostnames of the drones to add.
        @param results_repository_hostname: hostname of the results repository.
        @raises DroneManagerError: if no drone could be added.
        """
        self._results_dir = base_results_dir

        for hostname in drone_hostnames:
            self._add_drone(hostname)

        if not self._drones:
            # all drones failed to initialize
            raise DroneManagerError('No valid drones found')

        self.refresh_drone_configs()

        logging.info('Using results repository on %s',
                     results_repository_hostname)
        self._results_drone = drones.get_drone(results_repository_hostname)
        results_installation_dir = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'results_host_installation_directory', default=None)
        if results_installation_dir:
            self._results_drone.set_autotest_install_dir(
                    results_installation_dir)
        # don't initialize() the results drone - we don't want to clear out any
        # directories and we don't need to kill any processes


    def reinitialize_drones(self):
        """Call 'initialize' on every drone again."""
        # NOTE(review): _add_drone() initializes drones with
        # self.absolute_path(''), while this passes self._results_dir (the
        # results-repository path) -- confirm the difference is intended.
        self._call_all_drones('initialize', self._results_dir)


    def shutdown(self):
        """Shut down every drone."""
        for drone in self.get_drones():
            drone.shutdown()


    def _get_max_pidfile_refreshes(self):
        """
        Normally refresh() is called on every monitor_db.Dispatcher.tick().

        @returns: The number of refresh() calls before we forget a pidfile
                (config option 'max_pidfile_refreshes', default 2000).
        """
        pidfile_timeout = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
                type=int, default=2000)
        return pidfile_timeout


    def _add_drone(self, hostname):
        """Create the drone for hostname (if possible) and initialize it."""
        logging.info('Adding drone %s', hostname)
        drone = drones.get_drone(hostname)
        if drone:
            self._drones[drone.hostname] = drone
            drone.call('initialize', self.absolute_path(''))


    def _remove_drone(self, hostname):
        """Forget the drone for hostname; no-op if it is unknown."""
        self._drones.pop(hostname, None)


    def refresh_drone_configs(self):
        """
        Reread global config options for all drones.

        Per drone, reads '<hostname>_disabled' (any non-empty value disables
        the drone), '<hostname>_max_processes' and '<hostname>_users' (a
        whitespace-separated list; unset means no user restriction is set).
        """
        config = global_config.global_config
        section = scheduler_config.CONFIG_SECTION
        config.parse_config_file()
        for hostname, drone in self._drones.items():
            disabled = config.get_config_value(
                    section, '%s_disabled' % hostname, default='')
            drone.enabled = not bool(disabled)

            drone.max_processes = config.get_config_value(
                    section, '%s_max_processes' % hostname, type=int,
                    default=scheduler_config.config.max_processes_per_drone)

            allowed_users = config.get_config_value(
                    section, '%s_users' % hostname, default=None)
            if allowed_users is not None:
                allowed_users = set(allowed_users.split())
            drone.allowed_users = allowed_users

        self._reorder_drone_queue() # max_processes may have changed


    def get_drones(self):
        """
        Return an iterator over all drones.

        The iterator walks a snapshot, so drones may be added or removed while
        a caller is iterating.
        """
        return iter(list(self._drones.values()))


    def _get_drone_for_process(self, process):
        return self._drones[process.hostname]


    def _get_drone_for_pidfile_id(self, pidfile_id):
        """Return the drone running the process recorded in the pidfile."""
        pidfile_contents = self.get_pidfile_contents(pidfile_id)
        # a corrupt pidfile names no process to pair with
        assert not pidfile_contents.is_invalid()
        assert pidfile_contents.process is not None
        return self._get_drone_for_process(pidfile_contents.process)


    def _drop_old_pidfiles(self):
        """
        Age every registered pidfile by one refresh, unregistering (with a
        warning) those older than _get_max_pidfile_refreshes().
        """
        max_refreshes = self._get_max_pidfile_refreshes()
        # iterate over a copy since the dict is modified in unregister_pidfile()
        for pidfile_id, info in list(self._registered_pidfile_info.items()):
            if info.age > max_refreshes:
                logging.warning('dropping leaked pidfile %s', pidfile_id)
                self.unregister_pidfile(pidfile_id)
            else:
                info.age += 1


    def _reset(self):
        """Clear the per-refresh process, pidfile and drone-queue state."""
        self._process_set = set()
        self._pidfiles = {}
        self._pidfiles_second_read = {}
        self._drone_queue = []


    def _call_all_drones(self, method, *args, **kwargs):
        """Call `method` on every drone; return a dict of drone -> result."""
        all_results = {}
        for drone in self.get_drones():
            all_results[drone] = drone.call(method, *args, **kwargs)
        return all_results


    def _parse_pidfile(self, drone, raw_contents):
        """
        Parse raw pidfile text read from `drone`.

        Lines are: pid, exit status, number of failed tests.
        @returns: PidfileContents (empty if raw_contents is empty), or
                InvalidPidfile if there are more than 3 lines or a line is not
                an integer.
        """
        contents = PidfileContents()
        if not raw_contents:
            return contents
        lines = raw_contents.splitlines()
        if len(lines) > 3:
            return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
                                  (len(lines), lines))
        try:
            pid = int(lines[0])
            contents.process = Process(drone.hostname, pid)
            # if len(lines) == 2, assume we caught Autoserv between writing
            # exit_status and num_failed_tests, so just ignore it and wait for
            # the next cycle
            if len(lines) == 3:
                contents.exit_status = int(lines[1])
                contents.num_tests_failed = int(lines[2])
        except ValueError:
            # sys.exc_info() keeps this valid on old Python 2 and on Python 3
            import sys
            exc = sys.exc_info()[1]
            return InvalidPidfile('Corrupt pid file: ' + str(exc.args))

        return contents


    def _process_pidfiles(self, drone, pidfiles, store_in_dict):
        """Parse each path -> raw text entry and store it keyed by PidfileId."""
        for pidfile_path, raw_contents in pidfiles.items():
            pidfile_id = PidfileId(pidfile_path)
            store_in_dict[pidfile_id] = self._parse_pidfile(drone,
                                                            raw_contents)


    def _add_process(self, drone, process_info):
        process = Process(drone.hostname, int(process_info['pid']),
                          int(process_info['ppid']))
        self._process_set.add(process)


    def _add_autoserv_process(self, drone, process_info):
        assert process_info['comm'] == 'autoserv'
        # only root autoserv processes have pgid == pid
        if process_info['pgid'] != process_info['pid']:
            return
        self._add_process(drone, process_info)


    def _enqueue_drone(self, drone):
        heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))


    def _reorder_drone_queue(self):
        # restore the heap invariant after drone capacities changed in place
        heapq.heapify(self._drone_queue)


    def _compute_active_processes(self, drone):
        """
        Set drone.active_processes to the sum of the declared process counts of
        all pidfiles whose process is on `drone` and has no exit status yet.
        """
        drone.active_processes = 0
        for pidfile_id, contents in self._pidfiles.items():
            if contents.is_invalid():
                # a corrupt pidfile names no process and no exit status
                continue
            is_running = contents.exit_status is None
            on_this_drone = (contents.process
                             and contents.process.hostname == drone.hostname)
            if is_running and on_this_drone:
                info = self._registered_pidfile_info.get(pidfile_id)
                if info is not None and info.num_processes is not None:
                    drone.active_processes += info.num_processes


    def refresh(self):
        """
        Called at the beginning of a scheduler cycle to refresh all process
        information.
        """
        self._reset()
        self._drop_old_pidfiles()
        pidfile_paths = [pidfile_id.path
                         for pidfile_id in self._registered_pidfile_info]
        all_results = self._call_all_drones('refresh', pidfile_paths)

        for drone, results_list in all_results.items():
            results = results_list[0]

            for process_info in results['autoserv_processes']:
                self._add_autoserv_process(drone, process_info)
            for process_info in results['parse_processes']:
                self._add_process(drone, process_info)

            self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
            self._process_pidfiles(drone, results['pidfiles_second_read'],
                                   self._pidfiles_second_read)

            self._compute_active_processes(drone)
            if drone.enabled:
                self._enqueue_drone(drone)


    def execute_actions(self):
        """
        Called at the end of a scheduler cycle to execute all queued actions
        on drones.

        An AutoservError from the results repository is emailed as a
        notification and that drone's call queue is cleared; errors from the
        regular drones propagate.
        """
        for drone in self._drones.values():
            drone.execute_queued_calls()

        try:
            self._results_drone.execute_queued_calls()
        except error.AutoservError:
            warning = ('Results repository failed to execute calls:\n' +
                       traceback.format_exc())
            email_manager.manager.enqueue_notify_email(
                'Results repository error', warning)
            self._results_drone.clear_call_queue()


    def get_orphaned_autoserv_processes(self):
        """
        Returns a set of Process objects for orphaned processes only
        (ppid == 1). Parser processes are tracked in the same set, so orphaned
        ones are included as well.
        """
        return set(process for process in self._process_set
                   if process.ppid == 1)


    def kill_process(self, process):
        """
        Kill the given process (queued on its drone).
        """
        logging.info('killing %s', process)
        drone = self._get_drone_for_process(process)
        drone.queue_call('kill_process', process)


    def _ensure_directory_exists(self, path):
        if not os.path.exists(path):
            os.makedirs(path)


    def total_running_processes(self):
        """Sum of active_processes over all drones."""
        return sum(drone.active_processes for drone in self.get_drones())


    def max_runnable_processes(self, username, drone_hostnames_allowed):
        """
        Return the maximum number of processes that can be run (in a single
        execution) given the current load on drones.
        @param username: login of user to run a process. may be None.
        @param drone_hostnames_allowed: list of drones that can be used. May be
                                        None
        """
        usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
                                 if wrapper.drone.usable_by(username) and
                                 (drone_hostnames_allowed is None or
                                  wrapper.drone.hostname in
                                  drone_hostnames_allowed)]
        if not usable_drone_wrappers:
            # all drones disabled or inaccessible
            return 0
        runnable_processes = [
                wrapper.drone.max_processes - wrapper.drone.active_processes
                for wrapper in usable_drone_wrappers]
        return max([0] + runnable_processes)


    def _least_loaded_drone(self, drones):
        """Return the first drone with the lowest used_capacity()."""
        drone_to_use = drones[0]
        for drone in drones[1:]:
            if drone.used_capacity() < drone_to_use.used_capacity():
                drone_to_use = drone
        return drone_to_use


    def _choose_drone_for_execution(self, num_processes, username,
                                    drone_hostnames_allowed):
        """
        Pick a drone for num_processes new processes.

        Drones are popped from _drone_queue lowest used capacity first; the
        first one usable by username, allowed by drone_hostnames_allowed and
        with enough free slots wins. If none has room, an error is logged and
        the least loaded usable drone is returned anyway. Returns None if no
        drone is usable. Every popped drone is pushed back before returning.
        """
        checked_drones = []
        usable_drones = []
        drone_to_use = None
        while self._drone_queue:
            drone = heapq.heappop(self._drone_queue).drone
            checked_drones.append(drone)
            logging.info('Checking drone %s', drone.hostname)
            if not drone.usable_by(username):
                continue

            drone_allowed = (drone_hostnames_allowed is None
                             or drone.hostname in drone_hostnames_allowed)
            if not drone_allowed:
                logging.debug('Drone %s not allowed: ', drone.hostname)
                continue

            usable_drones.append(drone)

            if drone.active_processes + num_processes <= drone.max_processes:
                drone_to_use = drone
                break
            logging.info('Drone %s has %d active + %s requested > %s max',
                         drone.hostname, drone.active_processes, num_processes,
                         drone.max_processes)

        if not drone_to_use and usable_drones:
            drone_summary = ','.join('%s %s/%s' % (drone.hostname,
                                                   drone.active_processes,
                                                   drone.max_processes)
                                     for drone in usable_drones)
            logging.error('No drone has capacity to handle %d processes (%s) '
                          'for user %s', num_processes, drone_summary, username)
            drone_to_use = self._least_loaded_drone(usable_drones)

        # refill _drone_queue
        for drone in checked_drones:
            self._enqueue_drone(drone)

        return drone_to_use


    def _substitute_working_directory_into_command(self, command,
                                                   working_directory):
        """Replace, in place, every WORKING_DIRECTORY item of command."""
        for i, item in enumerate(command):
            if item is WORKING_DIRECTORY:
                command[i] = working_directory


    def execute_command(self, command, working_directory, pidfile_name,
                        num_processes, log_file=None, paired_with_pidfile=None,
                        username=None, drone_hostnames_allowed=None):
        """
        Execute the given command, taken as an argv list.

        @param command: command to execute as a list. if any item is
                WORKING_DIRECTORY, the absolute path to the working directory
                will be substituted for it. The caller's list is not modified.
        @param working_directory: directory in which the pidfile will be written
        @param pidfile_name: name of the pidfile this process will write
        @param num_processes: number of processes to account for from this
                execution
        @param log_file (optional): path (in the results repository) to hold
                command output.
        @param paired_with_pidfile (optional): a PidfileId for an
                already-executed process; the new process will execute on the
                same drone as the previous process.
        @param username (optional): login of the user responsible for this
                process.
        @param drone_hostnames_allowed (optional): hostnames of the drones that
                                                   this command is allowed to
                                                   execute on
        @returns: the PidfileId of the new process's pidfile (registered).
        @raises DroneManagerError: if no drone is available.
        """
        abs_working_directory = self.absolute_path(working_directory)
        if not log_file:
            log_file = self.get_temporary_path('execute')
        log_file = self.absolute_path(log_file)

        # work on a copy so the caller's argv list is left untouched
        command = list(command)
        self._substitute_working_directory_into_command(command,
                                                        abs_working_directory)

        if paired_with_pidfile:
            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
        else:
            drone = self._choose_drone_for_execution(num_processes, username,
                                                     drone_hostnames_allowed)

        if not drone:
            raise DroneManagerError('command failed; no drones available: %s'
                                    % command)

        logging.info('command = %s', command)
        logging.info('log file = %s:%s', drone.hostname, log_file)
        self._write_attached_files(working_directory, drone)
        drone.queue_call('execute_command', command, abs_working_directory,
                         log_file, pidfile_name)
        drone.active_processes += num_processes
        self._reorder_drone_queue()

        pidfile_path = os.path.join(abs_working_directory, pidfile_name)
        pidfile_id = PidfileId(pidfile_path)
        self.register_pidfile(pidfile_id)
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
        return pidfile_id


    def get_pidfile_id_from(self, execution_tag, pidfile_name):
        path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
        return PidfileId(path)


    def register_pidfile(self, pidfile_id):
        """
        Indicate that the DroneManager should look for the given pidfile when
        refreshing.
        """
        if pidfile_id not in self._registered_pidfile_info:
            logging.info('monitoring pidfile %s', pidfile_id)
            self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
        self._reset_pidfile_age(pidfile_id)


    def _reset_pidfile_age(self, pidfile_id):
        if pidfile_id in self._registered_pidfile_info:
            self._registered_pidfile_info[pidfile_id].age = 0


    def unregister_pidfile(self, pidfile_id):
        if pidfile_id in self._registered_pidfile_info:
            logging.info('forgetting pidfile %s', pidfile_id)
            del self._registered_pidfile_info[pidfile_id]


    def declare_process_count(self, pidfile_id, num_processes):
        self._registered_pidfile_info[pidfile_id].num_processes = num_processes


    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
        """
        Retrieve a PidfileContents object for the given pidfile_id. If
        use_second_read is True, use results that were read after the processes
        were checked, instead of before. Unknown pidfiles yield an empty
        PidfileContents. Reading a pidfile resets its age.
        """
        self._reset_pidfile_age(pidfile_id)
        if use_second_read:
            pidfile_map = self._pidfiles_second_read
        else:
            pidfile_map = self._pidfiles
        contents = pidfile_map.get(pidfile_id)
        if contents is None:
            # only build the empty default when it is actually needed
            contents = PidfileContents()
        return contents


    def is_process_running(self, process):
        """
        Check if the given process is in the running process list.
        """
        return process in self._process_set


    def get_temporary_path(self, base_name):
        """
        Get a new temporary path guaranteed to be unique across all drones
        for this scheduler execution.
        """
        self._temporary_path_counter += 1
        return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
                            '%s.%s' % (base_name, self._temporary_path_counter))


    def absolute_path(self, path, on_results_repository=False):
        """
        Join path onto the results base directory: the results repository's
        (_results_dir) if on_results_repository, else the drones'
        <AUTOTEST_INSTALL_DIR>/results.
        """
        if on_results_repository:
            base_dir = self._results_dir
        else:
            base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
                                    _DRONE_RESULTS_DIR_SUFFIX)
        return os.path.join(base_dir, path)


    def _copy_results_helper(self, process, source_path, destination_path,
                             to_results_repository=False):
        full_source = self.absolute_path(source_path)
        full_destination = self.absolute_path(
                destination_path, on_results_repository=to_results_repository)
        source_drone = self._get_drone_for_process(process)
        if to_results_repository:
            source_drone.send_file_to(self._results_drone, full_source,
                                      full_destination, can_fail=True)
        else:
            source_drone.queue_call('copy_file_or_directory', full_source,
                                    full_destination)


    def copy_to_results_repository(self, process, source_path,
                                   destination_path=None):
        """
        Copy results from the given process at source_path to destination_path
        in the results repository (destination defaults to source_path).
        """
        if destination_path is None:
            destination_path = source_path
        self._copy_results_helper(process, source_path, destination_path,
                                  to_results_repository=True)


    def copy_results_on_drone(self, process, source_path, destination_path):
        """
        Copy a results directory from one place to another on the drone.
        """
        self._copy_results_helper(process, source_path, destination_path)


    def _write_attached_files(self, results_dir, drone):
        """Queue writes of (and forget) the files attached to results_dir."""
        attached_files = self._attached_files.pop(results_dir, {})
        for file_path, contents in attached_files.items():
            drone.queue_call('write_to_file', self.absolute_path(file_path),
                             contents)


    def attach_file_to_execution(self, results_dir, file_contents,
                                 file_path=None):
        """
        When the process for the results directory is executed, the given file
        contents will be placed in a file on the drone. Returns the path at
        which the file will be placed.
        """
        if not file_path:
            file_path = self.get_temporary_path('attach')
        files_for_execution = self._attached_files.setdefault(results_dir, {})
        assert file_path not in files_for_execution
        files_for_execution[file_path] = file_contents
        return file_path


    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
        """
        Write the given lines (as a list of strings) to a file. If
        paired_with_process is given, the file will be written on the drone
        running the given Process. Otherwise, the file will be written to the
        results repository.
        """
        file_contents = '\n'.join(lines) + '\n'
        if paired_with_process:
            drone = self._get_drone_for_process(paired_with_process)
            on_results_repository = False
        else:
            drone = self._results_drone
            on_results_repository = True
        full_path = self.absolute_path(
                file_path, on_results_repository=on_results_repository)
        drone.queue_call('write_to_file', full_path, file_contents)
# Module-wide DroneManager returned by instance(); created lazily.
_the_instance = None


def instance():
    """Return the shared DroneManager, constructing it on the first call."""
    if _the_instance is not None:
        return _the_instance
    _set_instance(DroneManager())
    return _the_instance


def _set_instance(instance): # usable for testing
    """Replace the shared DroneManager returned by instance()."""
    global _the_instance
    _the_instance = instance