1 import os
, re
, shutil
, signal
, subprocess
, errno
, time
, heapq
, traceback
3 from autotest_lib
.client
.common_lib
import error
, global_config
4 from autotest_lib
.scheduler
import email_manager
, drone_utility
, drones
5 from autotest_lib
.scheduler
import scheduler_config
# Results on drones are placed under the drone installation directory, in a
# subdirectory with this name (see absolute_path()).
_DRONE_RESULTS_DIR_SUFFIX = 'results'

# Unique sentinel: any item of a command list that is this object gets replaced
# by the absolute working directory (see execute_command()).
WORKING_DIRECTORY = object()

# Pidfile names that can be passed as pidfile_name to execute_command().
AUTOSERV_PID_FILE = '.autoserv_execute'
CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
PARSER_PID_FILE = '.parser_execute'
ARCHIVER_PID_FILE = '.archiver_execute'
20 ALL_PIDFILE_NAMES
= (AUTOSERV_PID_FILE
, CRASHINFO_PID_FILE
, PARSER_PID_FILE
,
24 class DroneManagerError(Exception):
28 class CustomEquals(object):
30 raise NotImplementedError
33 def __eq__(self
, other
):
34 if not isinstance(other
, type(self
)):
36 return self
._id
() == other
._id
()
39 def __ne__(self
, other
):
40 return not self
== other
44 return hash(self
._id
())
47 class Process(CustomEquals
):
48 def __init__(self
, hostname
, pid
, ppid
=None):
49 self
.hostname
= hostname
54 return (self
.hostname
, self
.pid
)
58 return '%s/%s' % (self
.hostname
, self
.pid
)
62 return super(Process
, self
).__repr
__() + '<%s>' % self
65 class PidfileId(CustomEquals
):
66 def __init__(self
, path
):
78 class _PidfileInfo(object):
83 class PidfileContents(object):
86 num_tests_failed
= None
93 return self
.process
and not self
.exit_status
96 class InvalidPidfile(object):
97 def __init__(self
, error
):
101 def is_invalid(self
):
105 def is_running(self
):
113 class _DroneHeapWrapper(object):
114 """Wrapper to compare drones based on used_capacity().
116 These objects can be used to keep a heap of drones by capacity.
118 def __init__(self
, drone
):
def __cmp__(self, other):
    """
    Order two wrappers by the used_capacity() of their drones.

    @param other: another _DroneHeapWrapper (asserted).

    @returns: the cmp() result for the two capacities, this wrapper's first.
    """
    assert isinstance(other, _DroneHeapWrapper)
    own_capacity = self.drone.used_capacity()
    other_capacity = other.drone.used_capacity()
    return cmp(own_capacity, other_capacity)
127 class DroneManager(object):
129 This class acts as an interface from the scheduler to drones, whether it be
130 only a single "drone" for localhost or multiple remote drones.
132 All paths going into and out of this class are relative to the full results
133 directory, except for those returned by absolute_path().
136 # absolute path of base results dir
137 self
._results
_dir
= None
138 # holds Process objects
139 self
._process
_set
= set()
140 # maps PidfileId to PidfileContents
143 self
._pidfiles
_second
_read
= {}
144 # maps PidfileId to _PidfileInfo
145 self
._registered
_pidfile
_info
= {}
146 # used to generate unique temporary paths
147 self
._temporary
_path
_counter
= 0
148 # maps hostname to Drone object
150 self
._results
_drone
= None
151 # maps results dir to dict mapping file path to contents
152 self
._attached
_files
= {}
153 # heapq of _DroneHeapWrappers
154 self
._drone
_queue
= []
157 def initialize(self
, base_results_dir
, drone_hostnames
,
158 results_repository_hostname
):
159 self
._results
_dir
= base_results_dir
161 for hostname
in drone_hostnames
:
162 self
._add
_drone
(hostname
)
165 # all drones failed to initialize
166 raise DroneManagerError('No valid drones found')
168 self
.refresh_drone_configs()
170 logging
.info('Using results repository on %s',
171 results_repository_hostname
)
172 self
._results
_drone
= drones
.get_drone(results_repository_hostname
)
173 results_installation_dir
= global_config
.global_config
.get_config_value(
174 scheduler_config
.CONFIG_SECTION
,
175 'results_host_installation_directory', default
=None)
176 if results_installation_dir
:
177 self
._results
_drone
.set_autotest_install_dir(
178 results_installation_dir
)
179 # don't initialize() the results drone - we don't want to clear out any
180 # directories and we don't need to kill any processes
def reinitialize_drones(self):
    """
    Send the 'initialize' call, with the base results directory as its
    argument, to every drone via _call_all_drones().
    """
    base_results_dir = self._results_dir
    self._call_all_drones('initialize', base_results_dir)
188 for drone
in self
.get_drones():
def _get_max_pidfile_refreshes(self):
    """
    Read the 'max_pidfile_refreshes' option from the scheduler config section.

    Normally refresh() is called on every monitor_db.Dispatcher.tick().

    @returns: The number of refresh() calls before we forget a pidfile
            (an int, 2000 when the option is not set).
    """
    return global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
            type=int, default=2000)
204 def _add_drone(self
, hostname
):
205 logging
.info('Adding drone %s' % hostname
)
206 drone
= drones
.get_drone(hostname
)
208 self
._drones
[drone
.hostname
] = drone
209 drone
.call('initialize', self
.absolute_path(''))
212 def _remove_drone(self
, hostname
):
213 self
._drones
.pop(hostname
, None)
def refresh_drone_configs(self):
    """
    Reread global config options for all drones.

    The config file is re-parsed, then for each drone in self._drones the
    per-hostname options '<hostname>_disabled', '<hostname>_max_processes'
    and '<hostname>_users' are applied to the drone's enabled,
    max_processes and allowed_users attributes.
    """
    config = global_config.global_config
    section = scheduler_config.CONFIG_SECTION
    config.parse_config_file()

    def lookup(option, **options):
        # every per-drone option lives in the scheduler section
        return config.get_config_value(section, option, **options)

    for hostname, drone in self._drones.iteritems():
        # any non-empty value of the option disables the drone
        drone.enabled = not lookup('%s_disabled' % hostname, default='')

        drone.max_processes = lookup(
                '%s_max_processes' % hostname, type=int,
                default=scheduler_config.config.max_processes_per_drone)

        # whitespace-separated user list; None when the option is absent
        users = lookup('%s_users' % hostname, default=None)
        if users is not None:
            users = set(users.split())
        drone.allowed_users = users

    self._reorder_drone_queue() # max_processes may have changed
def get_drones(self):
    """
    @returns: an iterator over the drone objects stored in self._drones.
    """
    drones_by_hostname = self._drones
    return drones_by_hostname.itervalues()
245 def _get_drone_for_process(self
, process
):
246 return self
._drones
[process
.hostname
]
249 def _get_drone_for_pidfile_id(self
, pidfile_id
):
250 pidfile_contents
= self
.get_pidfile_contents(pidfile_id
)
251 assert pidfile_contents
.process
is not None
252 return self
._get
_drone
_for
_process
(pidfile_contents
.process
)
255 def _drop_old_pidfiles(self
):
256 # use items() since the dict is modified in unregister_pidfile()
257 for pidfile_id
, info
in self
._registered
_pidfile
_info
.items():
258 if info
.age
> self
._get
_max
_pidfile
_refreshes
():
259 logging
.warning('dropping leaked pidfile %s', pidfile_id
)
260 self
.unregister_pidfile(pidfile_id
)
266 self
._process
_set
= set()
268 self
._pidfiles
_second
_read
= {}
269 self
._drone
_queue
= []
272 def _call_all_drones(self
, method
, *args
, **kwargs
):
274 for drone
in self
.get_drones():
275 all_results
[drone
] = drone
.call(method
, *args
, **kwargs
)
279 def _parse_pidfile(self
, drone
, raw_contents
):
280 contents
= PidfileContents()
283 lines
= raw_contents
.splitlines()
285 return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
289 contents
.process
= Process(drone
.hostname
, pid
)
290 # if len(lines) == 2, assume we caught Autoserv between writing
291 # exit_status and num_failed_tests, so just ignore it and wait for
294 contents
.exit_status
= int(lines
[1])
295 contents
.num_tests_failed
= int(lines
[2])
296 except ValueError, exc
:
297 return InvalidPidfile('Corrupt pid file: ' + str(exc
.args
))
def _process_pidfiles(self, drone, pidfiles, store_in_dict):
    """
    Parse the pidfiles reported by a drone and store the parsed results.

    @param drone: drone the pidfiles came from; passed to _parse_pidfile().
    @param pidfiles: dict mapping pidfile path to raw pidfile contents.
    @param store_in_dict: dict updated in place; each parsed result is
            stored under the PidfileId built from its path.
    """
    for path, raw_contents in pidfiles.iteritems():
        key = PidfileId(path)
        store_in_dict[key] = self._parse_pidfile(drone, raw_contents)
def _add_process(self, drone, process_info):
    """
    Add a process reported by a drone to self._process_set.

    @param drone: drone the process was reported by; its hostname is
            recorded on the Process.
    @param process_info: mapping whose 'pid' and 'ppid' entries are
            converted with int().
    """
    hostname = drone.hostname
    pid = int(process_info['pid'])
    ppid = int(process_info['ppid'])
    self._process_set.add(Process(hostname, pid, ppid))
315 def _add_autoserv_process(self
, drone
, process_info
):
316 assert process_info
['comm'] == 'autoserv'
317 # only root autoserv processes have pgid == pid
318 if process_info
['pgid'] != process_info
['pid']:
320 self
._add
_process
(drone
, process_info
)
def _enqueue_drone(self, drone):
    """
    Push drone, wrapped in a _DroneHeapWrapper, onto the self._drone_queue
    heap.
    """
    queue = self._drone_queue
    heapq.heappush(queue, _DroneHeapWrapper(drone))
327 def _reorder_drone_queue(self
):
328 heapq
.heapify(self
._drone
_queue
)
331 def _compute_active_processes(self
, drone
):
332 drone
.active_processes
= 0
333 for pidfile_id
, contents
in self
._pidfiles
.iteritems():
334 is_running
= contents
.exit_status
is None
335 on_this_drone
= (contents
.process
336 and contents
.process
.hostname
== drone
.hostname
)
337 if is_running
and on_this_drone
:
338 info
= self
._registered
_pidfile
_info
[pidfile_id
]
339 if info
.num_processes
is not None:
340 drone
.active_processes
+= info
.num_processes
345 Called at the beginning of a scheduler cycle to refresh all process
349 self
._drop
_old
_pidfiles
()
350 pidfile_paths
= [pidfile_id
.path
351 for pidfile_id
in self
._registered
_pidfile
_info
]
352 all_results
= self
._call
_all
_drones
('refresh', pidfile_paths
)
354 for drone
, results_list
in all_results
.iteritems():
355 results
= results_list
[0]
357 for process_info
in results
['autoserv_processes']:
358 self
._add
_autoserv
_process
(drone
, process_info
)
359 for process_info
in results
['parse_processes']:
360 self
._add
_process
(drone
, process_info
)
362 self
._process
_pidfiles
(drone
, results
['pidfiles'], self
._pidfiles
)
363 self
._process
_pidfiles
(drone
, results
['pidfiles_second_read'],
364 self
._pidfiles
_second
_read
)
366 self
._compute
_active
_processes
(drone
)
368 self
._enqueue
_drone
(drone
)
371 def execute_actions(self
):
373 Called at the end of a scheduler cycle to execute all queued actions
376 for drone
in self
._drones
.values():
377 drone
.execute_queued_calls()
380 self
._results
_drone
.execute_queued_calls()
381 except error
.AutoservError
:
382 warning
= ('Results repository failed to execute calls:\n' +
383 traceback
.format_exc())
384 email_manager
.manager
.enqueue_notify_email(
385 'Results repository error', warning
)
386 self
._results
_drone
.clear_call_queue()
def get_orphaned_autoserv_processes(self):
    """
    Returns a set of Process objects for orphaned processes only.

    A process in self._process_set counts as orphaned when its ppid is 1.
    """
    orphans = set()
    for process in self._process_set:
        if process.ppid == 1:
            orphans.add(process)
    return orphans
def kill_process(self, process):
    """
    Kill the given process.

    The kill is queued as a 'kill_process' call on the drone that
    _get_drone_for_process() returns for the process.
    """
    logging.info('killing %s', process)
    owner = self._get_drone_for_process(process)
    owner.queue_call('kill_process', process)
406 def _ensure_directory_exists(self
, path
):
407 if not os
.path
.exists(path
):
def total_running_processes(self):
    """
    @returns: the sum of active_processes over all drones from get_drones().
    """
    total = 0
    for drone in self.get_drones():
        total += drone.active_processes
    return total
415 def max_runnable_processes(self
, username
, drone_hostnames_allowed
):
417 Return the maximum number of processes that can be run (in a single
418 execution) given the current load on drones.
419 @param username: login of user to run a process. may be None.
420 @param drone_hostnames_allowed: list of drones that can be used. May be
423 usable_drone_wrappers
= [wrapper
for wrapper
in self
._drone
_queue
424 if wrapper
.drone
.usable_by(username
) and
425 (drone_hostnames_allowed
is None or
426 wrapper
.drone
.hostname
in
427 drone_hostnames_allowed
)]
428 if not usable_drone_wrappers
:
429 # all drones disabled or inaccessible
431 runnable_processes
= [
432 wrapper
.drone
.max_processes
- wrapper
.drone
.active_processes
433 for wrapper
in usable_drone_wrappers
]
434 return max([0] + runnable_processes
)
437 def _least_loaded_drone(self
, drones
):
438 drone_to_use
= drones
[0]
439 for drone
in drones
[1:]:
440 if drone
.used_capacity() < drone_to_use
.used_capacity():
445 def _choose_drone_for_execution(self
, num_processes
, username
,
446 drone_hostnames_allowed
):
447 # cycle through drones in order of increasing used capacity until
448 # we find one that can handle these processes
452 while self
._drone
_queue
:
453 drone
= heapq
.heappop(self
._drone
_queue
).drone
454 checked_drones
.append(drone
)
455 logging
.info('Checking drone %s', drone
.hostname
)
456 if not drone
.usable_by(username
):
459 drone_allowed
= (drone_hostnames_allowed
is None
460 or drone
.hostname
in drone_hostnames_allowed
)
461 if not drone_allowed
:
462 logging
.debug('Drone %s not allowed: ', drone
.hostname
)
465 usable_drones
.append(drone
)
467 if drone
.active_processes
+ num_processes
<= drone
.max_processes
:
470 logging
.info('Drone %s has %d active + %s requested > %s max',
471 drone
.hostname
, drone
.active_processes
, num_processes
,
474 if not drone_to_use
and usable_drones
:
475 drone_summary
= ','.join('%s %s/%s' % (drone
.hostname
,
476 drone
.active_processes
,
478 for drone
in usable_drones
)
479 logging
.error('No drone has capacity to handle %d processes (%s) '
480 'for user %s', num_processes
, drone_summary
, username
)
481 drone_to_use
= self
._least
_loaded
_drone
(usable_drones
)
483 # refill _drone_queue
484 for drone
in checked_drones
:
485 self
._enqueue
_drone
(drone
)
490 def _substitute_working_directory_into_command(self
, command
,
492 for i
, item
in enumerate(command
):
493 if item
is WORKING_DIRECTORY
:
494 command
[i
] = working_directory
497 def execute_command(self
, command
, working_directory
, pidfile_name
,
498 num_processes
, log_file
=None, paired_with_pidfile
=None,
499 username
=None, drone_hostnames_allowed
=None):
501 Execute the given command, taken as an argv list.
503 @param command: command to execute as a list. if any item is
504 WORKING_DIRECTORY, the absolute path to the working directory
505 will be substituted for it.
506 @param working_directory: directory in which the pidfile will be written
507 @param pidfile_name: name of the pidfile this process will write
508 @param num_processes: number of processes to account for from this
510 @param log_file (optional): path (in the results repository) to hold
512 @param paired_with_pidfile (optional): a PidfileId for an
513 already-executed process; the new process will execute on the
514 same drone as the previous process.
515 @param username (optional): login of the user responsible for this
517 @param drone_hostnames_allowed (optional): hostnames of the drones that
518 this command is allowed to
521 abs_working_directory
= self
.absolute_path(working_directory
)
523 log_file
= self
.get_temporary_path('execute')
524 log_file
= self
.absolute_path(log_file
)
526 self
._substitute
_working
_directory
_into
_command
(command
,
527 abs_working_directory
)
529 if paired_with_pidfile
:
530 drone
= self
._get
_drone
_for
_pidfile
_id
(paired_with_pidfile
)
532 drone
= self
._choose
_drone
_for
_execution
(num_processes
, username
,
533 drone_hostnames_allowed
)
536 raise DroneManagerError('command failed; no drones available: %s'
539 logging
.info("command = %s" % command
)
540 logging
.info('log file = %s:%s' % (drone
.hostname
, log_file
))
541 self
._write
_attached
_files
(working_directory
, drone
)
542 drone
.queue_call('execute_command', command
, abs_working_directory
,
543 log_file
, pidfile_name
)
544 drone
.active_processes
+= num_processes
545 self
._reorder
_drone
_queue
()
547 pidfile_path
= os
.path
.join(abs_working_directory
, pidfile_name
)
548 pidfile_id
= PidfileId(pidfile_path
)
549 self
.register_pidfile(pidfile_id
)
550 self
._registered
_pidfile
_info
[pidfile_id
].num_processes
= num_processes
def get_pidfile_id_from(self, execution_tag, pidfile_name):
    """
    Build the PidfileId for pidfile_name inside the absolute path of
    execution_tag.
    """
    directory = self.absolute_path(execution_tag)
    return PidfileId(os.path.join(directory, pidfile_name))
def register_pidfile(self, pidfile_id):
    """
    Indicate that the DroneManager should track the given pidfile.

    A pidfile that is not yet registered gets a fresh _PidfileInfo. In
    either case its age is reset afterwards.
    """
    registered = self._registered_pidfile_info
    if pidfile_id not in registered:
        logging.info('monitoring pidfile %s', pidfile_id)
        registered[pidfile_id] = _PidfileInfo()
    self._reset_pidfile_age(pidfile_id)
570 def _reset_pidfile_age(self
, pidfile_id
):
571 if pidfile_id
in self
._registered
_pidfile
_info
:
572 self
._registered
_pidfile
_info
[pidfile_id
].age
= 0
def unregister_pidfile(self, pidfile_id):
    """
    Stop tracking pidfile_id.

    Pidfiles that are not registered are ignored.
    """
    registered = self._registered_pidfile_info
    if pidfile_id not in registered:
        return
    logging.info('forgetting pidfile %s', pidfile_id)
    del registered[pidfile_id]
def declare_process_count(self, pidfile_id, num_processes):
    """
    Record num_processes on the registration info of pidfile_id.

    Raises KeyError when pidfile_id is not registered.
    """
    info = self._registered_pidfile_info[pidfile_id]
    info.num_processes = num_processes
585 def get_pidfile_contents(self
, pidfile_id
, use_second_read
=False):
587 Retrieve a PidfileContents object for the given pidfile_id. If
588 use_second_read is True, use results that were read after the processes
589 were checked, instead of before.
591 self
._reset
_pidfile
_age
(pidfile_id
)
593 pidfile_map
= self
._pidfiles
_second
_read
595 pidfile_map
= self
._pidfiles
596 return pidfile_map
.get(pidfile_id
, PidfileContents())
def is_process_running(self, process):
    """
    Check if the given process is in the running process list
    (self._process_set).
    """
    running = self._process_set
    return process in running
def get_temporary_path(self, base_name):
    """
    Get a new temporary path guaranteed to be unique across all drones
    for this scheduler execution.

    Each call increments self._temporary_path_counter and appends the new
    value to base_name, under drone_utility._TEMPORARY_DIRECTORY.
    """
    self._temporary_path_counter += 1
    temporary_dir = drone_utility._TEMPORARY_DIRECTORY
    file_name = '%s.%s' % (base_name, self._temporary_path_counter)
    return os.path.join(temporary_dir, file_name)
616 def absolute_path(self
, path
, on_results_repository
=False):
617 if on_results_repository
:
618 base_dir
= self
._results
_dir
620 base_dir
= os
.path
.join(drones
.AUTOTEST_INSTALL_DIR
,
621 _DRONE_RESULTS_DIR_SUFFIX
)
622 return os
.path
.join(base_dir
, path
)
625 def _copy_results_helper(self
, process
, source_path
, destination_path
,
626 to_results_repository
=False):
627 full_source
= self
.absolute_path(source_path
)
628 full_destination
= self
.absolute_path(
629 destination_path
, on_results_repository
=to_results_repository
)
630 source_drone
= self
._get
_drone
_for
_process
(process
)
631 if to_results_repository
:
632 source_drone
.send_file_to(self
._results
_drone
, full_source
,
633 full_destination
, can_fail
=True)
635 source_drone
.queue_call('copy_file_or_directory', full_source
,
def copy_to_results_repository(self, process, source_path,
                               destination_path=None):
    """
    Copy results from the given process at source_path to destination_path
    in the results repository.

    @param destination_path: where to copy to; when None, source_path is
            used.
    """
    target_path = destination_path
    if target_path is None:
        target_path = source_path
    self._copy_results_helper(process, source_path, target_path,
                              to_results_repository=True)
def copy_results_on_drone(self, process, source_path, destination_path):
    """
    Copy a results directory from one place to another on the drone.

    Delegates to _copy_results_helper() without targeting the results
    repository.
    """
    copy_helper = self._copy_results_helper
    copy_helper(process, source_path, destination_path)
658 def _write_attached_files(self
, results_dir
, drone
):
659 attached_files
= self
._attached
_files
.pop(results_dir
, {})
660 for file_path
, contents
in attached_files
.iteritems():
661 drone
.queue_call('write_to_file', self
.absolute_path(file_path
),
665 def attach_file_to_execution(self
, results_dir
, file_contents
,
668 When the process for the results directory is executed, the given file
669 contents will be placed in a file on the drone. Returns the path at
670 which the file will be placed.
673 file_path
= self
.get_temporary_path('attach')
674 files_for_execution
= self
._attached
_files
.setdefault(results_dir
, {})
675 assert file_path
not in files_for_execution
676 files_for_execution
[file_path
] = file_contents
680 def write_lines_to_file(self
, file_path
, lines
, paired_with_process
=None):
682 Write the given lines (as a list of strings) to a file. If
683 paired_with_process is given, the file will be written on the drone
684 running the given Process. Otherwise, the file will be written to the
687 file_contents
= '\n'.join(lines
) + '\n'
688 if paired_with_process
:
689 drone
= self
._get
_drone
_for
_process
(paired_with_process
)
690 on_results_repository
= False
692 drone
= self
._results
_drone
693 on_results_repository
= True
694 full_path
= self
.absolute_path(
695 file_path
, on_results_repository
=on_results_repository
)
696 drone
.queue_call('write_to_file', full_path
, file_contents
)
702 if _the_instance
is None:
703 _set_instance(DroneManager())
707 def _set_instance(instance
): # usable for testing
709 _the_instance
= instance