scheduler/monitor_db.py
1 #!/usr/bin/python -u
3 """
4 Autotest scheduler
5 """
8 import common
9 import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
10 import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback, urllib
11 import itertools, logging, weakref, gc
13 import MySQLdb
15 from autotest_lib.scheduler import scheduler_logging_config
16 from autotest_lib.frontend import setup_django_environment
18 import django.db
20 from autotest_lib.client.common_lib import global_config, logging_manager
21 from autotest_lib.client.common_lib import host_protections, utils
22 from autotest_lib.database import database_connection
23 from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
24 from autotest_lib.frontend.afe import model_attributes
25 from autotest_lib.scheduler import drone_manager, drones, email_manager
26 from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
27 from autotest_lib.scheduler import status_server, scheduler_config
28 from autotest_lib.scheduler import scheduler_models
29 BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
30 PID_FILE_PREFIX = 'monitor_db'
32 RESULTS_DIR = '.'
33 AUTOSERV_NICE_LEVEL = 10
34 DB_CONFIG_SECTION = 'AUTOTEST_WEB'
35 AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
37 if 'AUTOTEST_DIR' in os.environ:
38 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
39 AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
40 AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
42 if AUTOTEST_SERVER_DIR not in sys.path:
43 sys.path.insert(0, AUTOTEST_SERVER_DIR)
45 # error message to leave in results dir when an autoserv process disappears
46 # mysteriously
47 _LOST_PROCESS_ERROR = """\
48 Autoserv failed abnormally during execution for this job, probably due to a
49 system error on the Autotest server. Full results may not be available. Sorry.
50 """
52 _db = None
53 _shutdown = False
54 _autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
55 _testing_mode = False
56 _drone_manager = None
58 def _parser_path_default(install_dir):
59 return os.path.join(install_dir, 'tko', 'parse')
60 _parser_path_func = utils.import_site_function(
61 __file__, 'autotest_lib.scheduler.site_monitor_db',
62 'parser_path', _parser_path_default)
63 _parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
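# Note: import_site_function() falls back to _parser_path_default when no
# autotest_lib.scheduler.site_monitor_db module supplies parser_path, so the
# parser defaults to <install_dir>/tko/parse unless a site override exists.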
66 def _get_pidfile_timeout_secs():
67 """@returns How long to wait for autoserv to write pidfile."""
68 pidfile_timeout_mins = global_config.global_config.get_config_value(
69 scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
70 return pidfile_timeout_mins * 60
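# Illustrative global_config entry read above (the value shown is hypothetical):
#   [SCHEDULER]
#   pidfile_timeout_mins: 5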
73 def _site_init_monitor_db_dummy():
74 return {}
77 def _verify_default_drone_set_exists():
78 if (models.DroneSet.drone_sets_enabled() and
79 not models.DroneSet.default_drone_set_name()):
80 raise host_scheduler.SchedulerError(
81 'Drone sets are enabled, but no default is set')
84 def _sanity_check():
85 """Make sure the configs are consistent before starting the scheduler"""
86 _verify_default_drone_set_exists()
89 def main():
90 try:
91 try:
92 main_without_exception_handling()
93 except SystemExit:
94 raise
95 except:
96 logging.exception('Exception escaping in monitor_db')
97 raise
98 finally:
99 utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
102 def main_without_exception_handling():
103 setup_logging()
105 usage = 'usage: %prog [options] results_dir'
106 parser = optparse.OptionParser(usage)
107 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
108 action='store_true')
109 parser.add_option('--test', help='Indicate that scheduler is under ' +
110 'test and should use dummy autoserv and no parsing',
111 action='store_true')
112 (options, args) = parser.parse_args()
113 if len(args) != 1:
114 parser.print_usage()
115 return
117 scheduler_enabled = global_config.global_config.get_config_value(
118 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
120 if not scheduler_enabled:
121 msg = ("Scheduler not enabled, set enable_scheduler to true in the "
122 "global_config's SCHEDULER section to enabled it. Exiting.")
123 logging.error(msg)
124 sys.exit(1)
126 global RESULTS_DIR
127 RESULTS_DIR = args[0]
129 site_init = utils.import_site_function(__file__,
130 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
131 _site_init_monitor_db_dummy)
132 site_init()
134 # Change the cwd while running to avoid issues in case we were launched from
135 # somewhere odd (such as a random NFS home directory of the person running
136 # sudo to launch us as the appropriate user).
137 os.chdir(RESULTS_DIR)
139 # This is helpful for debugging why stuff a scheduler launches is
140 # misbehaving.
141 logging.info('os.environ: %s', os.environ)
143 if options.test:
144 global _autoserv_path
145 _autoserv_path = 'autoserv_dummy'
146 global _testing_mode
147 _testing_mode = True
149 server = status_server.StatusServer()
150 server.start()
152 try:
153 initialize()
154 dispatcher = Dispatcher()
155 dispatcher.initialize(recover_hosts=options.recover_hosts)
157 while not _shutdown and not server._shutdown_scheduler:
158 dispatcher.tick()
159 time.sleep(scheduler_config.config.tick_pause_sec)
160 except:
161 email_manager.manager.log_stacktrace(
162 "Uncaught exception; terminating monitor_db")
164 email_manager.manager.send_queued_emails()
165 server.shutdown()
166 _drone_manager.shutdown()
167 _db.disconnect()
170 def setup_logging():
171 log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
172 log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
173 logging_manager.configure_logging(
174 scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
175 logfile_name=log_name)
178 def handle_sigint(signum, frame):
179 global _shutdown
180 _shutdown = True
181 logging.info("Shutdown request received.")
184 def initialize():
185 logging.info("%s> dispatcher starting", time.strftime("%X %x"))
186 logging.info("My PID is %d", os.getpid())
188 if utils.program_is_alive(PID_FILE_PREFIX):
189 logging.critical("monitor_db already running, aborting!")
190 sys.exit(1)
191 utils.write_pid(PID_FILE_PREFIX)
193 if _testing_mode:
194 global_config.global_config.override_config_value(
195 DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')
197 os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
198 global _db
199 _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
200 _db.connect(db_type='django')
202 # ensure Django connection is in autocommit
203 setup_django_environment.enable_autocommit()
204 # bypass the readonly connection
205 readonly_connection.ReadOnlyConnection.set_globally_disabled(True)
207 logging.info("Setting signal handler")
208 signal.signal(signal.SIGINT, handle_sigint)
210 initialize_globals()
211 scheduler_models.initialize()
213 drones = global_config.global_config.get_config_value(
214 scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
215 drone_list = [hostname.strip() for hostname in drones.split(',')]
216 results_host = global_config.global_config.get_config_value(
217 scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
218 _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
220 logging.info("Connected! Running...")
223 def initialize_globals():
224 global _drone_manager
225 _drone_manager = drone_manager.instance()
228 def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
229 verbose=True):
231 @returns The autoserv command line as a list of executable + parameters.
233 @param machines - string - A machine or comma separated list of machines
234 for the (-m) flag.
235 @param extra_args - list - Additional arguments to pass to autoserv.
236 @param job - Job object - If supplied, -u owner and -l name parameters
237 will be added.
238 @param queue_entry - A HostQueueEntry object - If supplied and no Job
239 object was supplied, this will be used to lookup the Job object.
241 autoserv_argv = [_autoserv_path, '-p',
242 '-r', drone_manager.WORKING_DIRECTORY]
243 if machines:
244 autoserv_argv += ['-m', machines]
245 if job or queue_entry:
246 if not job:
247 job = queue_entry.job
248 autoserv_argv += ['-u', job.owner, '-l', job.name]
249 if verbose:
250 autoserv_argv.append('--verbose')
251 return autoserv_argv + extra_args
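# Illustrative result (hypothetical machine/job values) for a verbose call with
# machines='host1,host2' and a job owned by 'jdoe' named 'sleeptest':
#   [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY,
#    '-m', 'host1,host2', '-u', 'jdoe', '-l', 'sleeptest', '--verbose'] + extra_args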
254 class Dispatcher(object):
255 def __init__(self):
256 self._agents = []
257 self._last_clean_time = time.time()
258 self._host_scheduler = host_scheduler.HostScheduler(_db)
259 user_cleanup_time = scheduler_config.config.clean_interval
260 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
261 _db, user_cleanup_time)
262 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
263 self._host_agents = {}
264 self._queue_entry_agents = {}
265 self._tick_count = 0
266 self._last_garbage_stats_time = time.time()
267 self._seconds_between_garbage_stats = 60 * (
268 global_config.global_config.get_config_value(
269 scheduler_config.CONFIG_SECTION,
270 'gc_stats_interval_mins', type=int, default=6*60))
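# gc_stats_interval_mins is read in minutes (default 6*60, i.e. 6 hours) and
# converted to seconds above.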
273 def initialize(self, recover_hosts=True):
274 self._periodic_cleanup.initialize()
275 self._24hr_upkeep.initialize()
277 # always recover processes
278 self._recover_processes()
280 if recover_hosts:
281 self._recover_hosts()
283 self._host_scheduler.recovery_on_startup()
286 def tick(self):
287 self._garbage_collection()
288 _drone_manager.refresh()
289 self._run_cleanup()
290 self._find_aborting()
291 self._process_recurring_runs()
292 self._schedule_delay_tasks()
293 self._schedule_running_host_queue_entries()
294 self._schedule_special_tasks()
295 self._schedule_new_jobs()
296 self._handle_agents()
297 self._host_scheduler.tick()
298 _drone_manager.execute_actions()
299 email_manager.manager.send_queued_emails()
300 django.db.reset_queries()
301 self._tick_count += 1
304 def _run_cleanup(self):
305 self._periodic_cleanup.run_cleanup_maybe()
306 self._24hr_upkeep.run_cleanup_maybe()
309 def _garbage_collection(self):
310 threshold_time = time.time() - self._seconds_between_garbage_stats
311 if threshold_time < self._last_garbage_stats_time:
312 # Don't generate these reports very often.
313 return
315 self._last_garbage_stats_time = time.time()
316 # Force a full collection across all generations (because we can, it
317 # doesn't hurt at this interval).
318 gc.collect()
319 logging.info('Logging garbage collector stats on tick %d.',
320 self._tick_count)
321 gc_stats._log_garbage_collector_stats()
324 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
325 for object_id in object_ids:
326 agent_dict.setdefault(object_id, set()).add(agent)
329 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
330 for object_id in object_ids:
331 assert object_id in agent_dict
332 agent_dict[object_id].remove(agent)
335 def add_agent_task(self, agent_task):
336 agent = Agent(agent_task)
337 self._agents.append(agent)
338 agent.dispatcher = self
339 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
340 self._register_agent_for_ids(self._queue_entry_agents,
341 agent.queue_entry_ids, agent)
344 def get_agents_for_entry(self, queue_entry):
346 Find agents corresponding to the specified queue_entry.
348 return list(self._queue_entry_agents.get(queue_entry.id, set()))
351 def host_has_agent(self, host):
353 Determine if there is currently an Agent present using this host.
355 return bool(self._host_agents.get(host.id, None))
358 def remove_agent(self, agent):
359 self._agents.remove(agent)
360 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
361 agent)
362 self._unregister_agent_for_ids(self._queue_entry_agents,
363 agent.queue_entry_ids, agent)
366 def _host_has_scheduled_special_task(self, host):
367 return bool(models.SpecialTask.objects.filter(host__id=host.id,
368 is_active=False,
369 is_complete=False))
372 def _recover_processes(self):
373 agent_tasks = self._create_recovery_agent_tasks()
374 self._register_pidfiles(agent_tasks)
375 _drone_manager.refresh()
376 self._recover_tasks(agent_tasks)
377 self._recover_pending_entries()
378 self._check_for_unrecovered_verifying_entries()
379 self._reverify_remaining_hosts()
380 # reinitialize drones after killing orphaned processes, since they can
381 # leave around files when they die
382 _drone_manager.execute_actions()
383 _drone_manager.reinitialize_drones()
386 def _create_recovery_agent_tasks(self):
387 return (self._get_queue_entry_agent_tasks()
388 + self._get_special_task_agent_tasks(is_active=True))
391 def _get_queue_entry_agent_tasks(self):
392 # host queue entry statuses handled directly by AgentTasks (Verifying is
393 # handled through SpecialTasks, so is not listed here)
394 statuses = (models.HostQueueEntry.Status.STARTING,
395 models.HostQueueEntry.Status.RUNNING,
396 models.HostQueueEntry.Status.GATHERING,
397 models.HostQueueEntry.Status.PARSING,
398 models.HostQueueEntry.Status.ARCHIVING)
399 status_list = ','.join("'%s'" % status for status in statuses)
400 queue_entries = scheduler_models.HostQueueEntry.fetch(
401 where='status IN (%s)' % status_list)
403 agent_tasks = []
404 used_queue_entries = set()
405 for entry in queue_entries:
406 if self.get_agents_for_entry(entry):
407 # already being handled
408 continue
409 if entry in used_queue_entries:
410 # already picked up by a synchronous job
411 continue
412 agent_task = self._get_agent_task_for_queue_entry(entry)
413 agent_tasks.append(agent_task)
414 used_queue_entries.update(agent_task.queue_entries)
415 return agent_tasks
418 def _get_special_task_agent_tasks(self, is_active=False):
419 special_tasks = models.SpecialTask.objects.filter(
420 is_active=is_active, is_complete=False)
421 return [self._get_agent_task_for_special_task(task)
422 for task in special_tasks]
425 def _get_agent_task_for_queue_entry(self, queue_entry):
427 Construct an AgentTask instance for the given active HostQueueEntry,
428 if one can currently run it.
429 @param queue_entry: a HostQueueEntry
430 @returns an AgentTask to run the queue entry
432 task_entries = queue_entry.job.get_group_entries(queue_entry)
433 self._check_for_duplicate_host_entries(task_entries)
435 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
436 models.HostQueueEntry.Status.RUNNING):
437 if queue_entry.is_hostless():
438 return HostlessQueueTask(queue_entry=queue_entry)
439 return QueueTask(queue_entries=task_entries)
440 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
441 return GatherLogsTask(queue_entries=task_entries)
442 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
443 return FinalReparseTask(queue_entries=task_entries)
444 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
445 return ArchiveResultsTask(queue_entries=task_entries)
447 raise host_scheduler.SchedulerError(
448 '_get_agent_task_for_queue_entry got entry with '
449 'invalid status %s: %s' % (queue_entry.status, queue_entry))
452 def _check_for_duplicate_host_entries(self, task_entries):
453 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
454 models.HostQueueEntry.Status.ARCHIVING)
455 for task_entry in task_entries:
456 using_host = (task_entry.host is not None
457 and task_entry.status not in non_host_statuses)
458 if using_host:
459 self._assert_host_has_no_agent(task_entry)
462 def _assert_host_has_no_agent(self, entry):
464 @param entry: a HostQueueEntry or a SpecialTask
466 if self.host_has_agent(entry.host):
467 agent = tuple(self._host_agents.get(entry.host.id))[0]
468 raise host_scheduler.SchedulerError(
469 'While scheduling %s, host %s already has a host agent %s'
470 % (entry, entry.host, agent.task))
473 def _get_agent_task_for_special_task(self, special_task):
475 Construct an AgentTask class to run the given SpecialTask and add it
476 to this dispatcher.
477 @param special_task: a models.SpecialTask instance
478 @returns an AgentTask to run this SpecialTask
480 self._assert_host_has_no_agent(special_task)
482 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
483 for agent_task_class in special_agent_task_classes:
484 if agent_task_class.TASK_TYPE == special_task.task:
485 return agent_task_class(task=special_task)
487 raise host_scheduler.SchedulerError(
488 'No AgentTask class for task', str(special_task))
491 def _register_pidfiles(self, agent_tasks):
492 for agent_task in agent_tasks:
493 agent_task.register_necessary_pidfiles()
496 def _recover_tasks(self, agent_tasks):
497 orphans = _drone_manager.get_orphaned_autoserv_processes()
499 for agent_task in agent_tasks:
500 agent_task.recover()
501 if agent_task.monitor and agent_task.monitor.has_process():
502 orphans.discard(agent_task.monitor.get_process())
503 self.add_agent_task(agent_task)
505 self._check_for_remaining_orphan_processes(orphans)
508 def _get_unassigned_entries(self, status):
509 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
510 % status):
511 if entry.status == status and not self.get_agents_for_entry(entry):
512 # The status can change during iteration, e.g., if job.run()
513 # sets a group of queue entries to Starting
514 yield entry
517 def _check_for_remaining_orphan_processes(self, orphans):
518 if not orphans:
519 return
520 subject = 'Unrecovered orphan autoserv processes remain'
521 message = '\n'.join(str(process) for process in orphans)
522 email_manager.manager.enqueue_notify_email(subject, message)
524 die_on_orphans = global_config.global_config.get_config_value(
525 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
527 if die_on_orphans:
528 raise RuntimeError(subject + '\n' + message)
531 def _recover_pending_entries(self):
532 for entry in self._get_unassigned_entries(
533 models.HostQueueEntry.Status.PENDING):
534 logging.info('Recovering Pending entry %s', entry)
535 entry.on_pending()
538 def _check_for_unrecovered_verifying_entries(self):
539 queue_entries = scheduler_models.HostQueueEntry.fetch(
540 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
541 unrecovered_hqes = []
542 for queue_entry in queue_entries:
543 special_tasks = models.SpecialTask.objects.filter(
544 task__in=(models.SpecialTask.Task.CLEANUP,
545 models.SpecialTask.Task.VERIFY),
546 queue_entry__id=queue_entry.id,
547 is_complete=False)
548 if special_tasks.count() == 0:
549 unrecovered_hqes.append(queue_entry)
551 if unrecovered_hqes:
552 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
553 raise host_scheduler.SchedulerError(
554 '%d unrecovered verifying host queue entries:\n%s' %
555 (len(unrecovered_hqes), message))
558 def _get_prioritized_special_tasks(self):
560 Returns all queued SpecialTasks prioritized for repair first, then
561 cleanup, then verify.
563 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
564 is_complete=False,
565 host__locked=False)
566 # exclude hosts with active queue entries unless the SpecialTask is for
567 # that queue entry
568 queued_tasks = models.SpecialTask.objects.add_join(
569 queued_tasks, 'afe_host_queue_entries', 'host_id',
570 join_condition='afe_host_queue_entries.active',
571 join_from_key='host_id', force_left_join=True)
572 queued_tasks = queued_tasks.extra(
573 where=['(afe_host_queue_entries.id IS NULL OR '
574 'afe_host_queue_entries.id = '
575 'afe_special_tasks.queue_entry_id)'])
577 # reorder tasks by priority
578 task_priority_order = [models.SpecialTask.Task.REPAIR,
579 models.SpecialTask.Task.CLEANUP,
580 models.SpecialTask.Task.VERIFY]
581 def task_priority_key(task):
582 return task_priority_order.index(task.task)
583 return sorted(queued_tasks, key=task_priority_key)
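# sorted() is stable, so tasks of equal priority keep the order in which the
# query returned them; only the Repair/Cleanup/Verify precedence changes.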
586 def _schedule_special_tasks(self):
588 Execute queued SpecialTasks that are ready to run on idle hosts.
590 for task in self._get_prioritized_special_tasks():
591 if self.host_has_agent(task.host):
592 continue
593 self.add_agent_task(self._get_agent_task_for_special_task(task))
596 def _reverify_remaining_hosts(self):
597 # recover active hosts that have not yet been recovered, although this
598 # should never happen
599 message = ('Recovering active host %s - this probably indicates a '
600 'scheduler bug')
601 self._reverify_hosts_where(
602 "status IN ('Repairing', 'Verifying', 'Cleaning')",
603 print_message=message)
606 def _reverify_hosts_where(self, where,
607 print_message='Reverifying host %s'):
608 full_where = 'locked = 0 AND invalid = 0 AND ' + where
609 for host in scheduler_models.Host.fetch(where=full_where):
610 if self.host_has_agent(host):
611 # host has already been recovered in some way
612 continue
613 if self._host_has_scheduled_special_task(host):
614 # host will have a special task scheduled on the next cycle
615 continue
616 if print_message:
617 logging.info(print_message, host.hostname)
618 models.SpecialTask.objects.create(
619 task=models.SpecialTask.Task.CLEANUP,
620 host=models.Host.objects.get(id=host.id))
623 def _recover_hosts(self):
624 # recover "Repair Failed" hosts
625 message = 'Reverifying dead host %s'
626 self._reverify_hosts_where("status = 'Repair Failed'",
627 print_message=message)
631 def _get_pending_queue_entries(self):
632 # prioritize by job priority, then non-metahost over metahost, then FIFO
633 return list(scheduler_models.HostQueueEntry.fetch(
634 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
635 where='NOT complete AND NOT active AND status="Queued"',
636 order_by='afe_jobs.priority DESC, meta_host, job_id'))
639 def _refresh_pending_queue_entries(self):
641 Lookup the pending HostQueueEntries and call our HostScheduler
642 refresh() method given that list. Return the list.
644 @returns A list of pending HostQueueEntries sorted in priority order.
646 queue_entries = self._get_pending_queue_entries()
647 if not queue_entries:
648 return []
650 self._host_scheduler.refresh(queue_entries)
652 return queue_entries
655 def _schedule_atomic_group(self, queue_entry):
657 Schedule the given queue_entry on an atomic group of hosts.
659 Returns immediately if there are insufficient available hosts.
661 Creates new HostQueueEntries based off of queue_entry for the
662 scheduled hosts and starts them all running.
664 # This is a virtual host queue entry representing an entire
665 # atomic group, find a group and schedule their hosts.
666 group_hosts = self._host_scheduler.find_eligible_atomic_group(
667 queue_entry)
668 if not group_hosts:
669 return
671 logging.info('Expanding atomic group entry %s with hosts %s',
672 queue_entry,
673 ', '.join(host.hostname for host in group_hosts))
675 for assigned_host in group_hosts[1:]:
676 # Create a new HQE for every additional assigned_host.
677 new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
678 new_hqe.save()
679 new_hqe.set_host(assigned_host)
680 self._run_queue_entry(new_hqe)
682 # The first assigned host uses the original HostQueueEntry
683 queue_entry.set_host(group_hosts[0])
684 self._run_queue_entry(queue_entry)
687 def _schedule_hostless_job(self, queue_entry):
688 self.add_agent_task(HostlessQueueTask(queue_entry))
689 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
692 def _schedule_new_jobs(self):
693 queue_entries = self._refresh_pending_queue_entries()
694 if not queue_entries:
695 return
697 for queue_entry in queue_entries:
698 is_unassigned_atomic_group = (
699 queue_entry.atomic_group_id is not None
700 and queue_entry.host_id is None)
702 if queue_entry.is_hostless():
703 self._schedule_hostless_job(queue_entry)
704 elif is_unassigned_atomic_group:
705 self._schedule_atomic_group(queue_entry)
706 else:
707 assigned_host = self._host_scheduler.schedule_entry(queue_entry)
708 if assigned_host and not self.host_has_agent(assigned_host):
709 assert assigned_host.id == queue_entry.host_id
710 self._run_queue_entry(queue_entry)
713 def _schedule_running_host_queue_entries(self):
714 for agent_task in self._get_queue_entry_agent_tasks():
715 self.add_agent_task(agent_task)
718 def _schedule_delay_tasks(self):
719 for entry in scheduler_models.HostQueueEntry.fetch(
720 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
721 task = entry.job.schedule_delayed_callback_task(entry)
722 if task:
723 self.add_agent_task(task)
726 def _run_queue_entry(self, queue_entry):
727 queue_entry.schedule_pre_job_tasks()
730 def _find_aborting(self):
731 jobs_to_stop = set()
732 for entry in scheduler_models.HostQueueEntry.fetch(
733 where='aborted and not complete'):
734 logging.info('Aborting %s', entry)
735 for agent in self.get_agents_for_entry(entry):
736 agent.abort()
737 entry.abort(self)
738 jobs_to_stop.add(entry.job)
739 for job in jobs_to_stop:
740 job.stop_if_necessary()
743 def _can_start_agent(self, agent, num_started_this_cycle,
744 have_reached_limit):
745 # always allow zero-process agents to run
746 if agent.task.num_processes == 0:
747 return True
748 # don't allow any nonzero-process agents to run after we've reached a
749 # limit (this avoids starvation of many-process agents)
750 if have_reached_limit:
751 return False
752 # total process throttling
753 max_runnable_processes = _drone_manager.max_runnable_processes(
754 agent.task.owner_username,
755 agent.task.get_drone_hostnames_allowed())
756 if agent.task.num_processes > max_runnable_processes:
757 return False
758 # if a single agent exceeds the per-cycle throttling, still allow it to
759 # run when it's the first agent in the cycle
760 if num_started_this_cycle == 0:
761 return True
762 # per-cycle throttling
763 if (num_started_this_cycle + agent.task.num_processes >
764 scheduler_config.config.max_processes_started_per_cycle):
765 return False
766 return True
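# Worked example (hypothetical limit): with max_processes_started_per_cycle=20,
# an agent needing 30 processes still starts if nothing else has started this
# cycle (num_started_this_cycle == 0); otherwise it is deferred,
# _handle_agents() sets have_reached_limit, and later nonzero-process agents
# wait for the next tick.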
769 def _handle_agents(self):
770 num_started_this_cycle = 0
771 have_reached_limit = False
772 # iterate over copy, so we can remove agents during iteration
773 for agent in list(self._agents):
774 if not agent.started:
775 if not self._can_start_agent(agent, num_started_this_cycle,
776 have_reached_limit):
777 have_reached_limit = True
778 continue
779 num_started_this_cycle += agent.task.num_processes
780 agent.tick()
781 if agent.is_done():
782 logging.info("agent finished")
783 self.remove_agent(agent)
784 logging.info('%d running processes',
785 _drone_manager.total_running_processes())
788 def _process_recurring_runs(self):
789 recurring_runs = models.RecurringRun.objects.filter(
790 start_date__lte=datetime.datetime.now())
791 for rrun in recurring_runs:
792 # Create job from template
793 job = rrun.job
794 info = rpc_utils.get_job_info(job)
795 options = job.get_object_dict()
797 host_objects = info['hosts']
798 one_time_hosts = info['one_time_hosts']
799 metahost_objects = info['meta_hosts']
800 dependencies = info['dependencies']
801 atomic_group = info['atomic_group']
803 for host in one_time_hosts or []:
804 this_host = models.Host.create_one_time_host(host.hostname)
805 host_objects.append(this_host)
807 try:
808 rpc_utils.create_new_job(owner=rrun.owner.login,
809 options=options,
810 host_objects=host_objects,
811 metahost_objects=metahost_objects,
812 atomic_group=atomic_group)
814 except Exception, ex:
815 logging.exception(ex)
816 #TODO send email
818 if rrun.loop_count == 1:
819 rrun.delete()
820 else:
821 if rrun.loop_count != 0: # if not infinite loop
822 # calculate new start_date
823 difference = datetime.timedelta(seconds=rrun.loop_period)
824 rrun.start_date = rrun.start_date + difference
825 rrun.loop_count -= 1
826 rrun.save()
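# loop_count semantics above: 1 means this was the final run (the template is
# deleted), 0 is treated as an infinite loop, and any other value is
# decremented after start_date is pushed forward by loop_period seconds.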
829 class PidfileRunMonitor(object):
831 Client must call either run() to start a new process or
832 attach_to_existing_process().
835 class _PidfileException(Exception):
837 Raised when there's some unexpected behavior with the pid file, but only
838 used internally (never allowed to escape this class).
842 def __init__(self):
843 self.lost_process = False
844 self._start_time = None
845 self.pidfile_id = None
846 self._state = drone_manager.PidfileContents()
849 def _add_nice_command(self, command, nice_level):
850 if not nice_level:
851 return command
852 return ['nice', '-n', str(nice_level)] + command
855 def _set_start_time(self):
856 self._start_time = time.time()
859 def run(self, command, working_directory, num_processes, nice_level=None,
860 log_file=None, pidfile_name=None, paired_with_pidfile=None,
861 username=None, drone_hostnames_allowed=None):
862 assert command is not None
863 if nice_level is not None:
864 command = ['nice', '-n', str(nice_level)] + command
865 self._set_start_time()
866 self.pidfile_id = _drone_manager.execute_command(
867 command, working_directory, pidfile_name=pidfile_name,
868 num_processes=num_processes, log_file=log_file,
869 paired_with_pidfile=paired_with_pidfile, username=username,
870 drone_hostnames_allowed=drone_hostnames_allowed)
873 def attach_to_existing_process(self, execution_path,
874 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
875 num_processes=None):
876 self._set_start_time()
877 self.pidfile_id = _drone_manager.get_pidfile_id_from(
878 execution_path, pidfile_name=pidfile_name)
879 if num_processes is not None:
880 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
883 def kill(self):
884 if self.has_process():
885 _drone_manager.kill_process(self.get_process())
888 def has_process(self):
889 self._get_pidfile_info()
890 return self._state.process is not None
893 def get_process(self):
894 self._get_pidfile_info()
895 assert self._state.process is not None
896 return self._state.process
899 def _read_pidfile(self, use_second_read=False):
900 assert self.pidfile_id is not None, (
901 'You must call run() or attach_to_existing_process()')
902 contents = _drone_manager.get_pidfile_contents(
903 self.pidfile_id, use_second_read=use_second_read)
904 if contents.is_invalid():
905 self._state = drone_manager.PidfileContents()
906 raise self._PidfileException(contents)
907 self._state = contents
910 def _handle_pidfile_error(self, error, message=''):
911 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
912 self._state.process, self.pidfile_id, message)
913 email_manager.manager.enqueue_notify_email(error, message)
914 self.on_lost_process(self._state.process)
917 def _get_pidfile_info_helper(self):
918 if self.lost_process:
919 return
921 self._read_pidfile()
923 if self._state.process is None:
924 self._handle_no_process()
925 return
927 if self._state.exit_status is None:
928 # double check whether or not autoserv is running
929 if _drone_manager.is_process_running(self._state.process):
930 return
932 # pid but no running process - maybe process *just* exited
933 self._read_pidfile(use_second_read=True)
934 if self._state.exit_status is None:
935 # autoserv exited without writing an exit code
936 # to the pidfile
937 self._handle_pidfile_error(
938 'autoserv died without writing exit code')
941 def _get_pidfile_info(self):
942 """\
943 After completion, self._state will contain:
944 pid=None, exit_status=None if autoserv has not yet run
945 pid!=None, exit_status=None if autoserv is running
946 pid!=None, exit_status!=None if autoserv has completed
948 try:
949 self._get_pidfile_info_helper()
950 except self._PidfileException, exc:
951 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
954 def _handle_no_process(self):
955 """\
956 Called when no pidfile is found or no pid is in the pidfile.
958 message = 'No pid found at %s' % self.pidfile_id
959 if time.time() - self._start_time > _get_pidfile_timeout_secs():
960 email_manager.manager.enqueue_notify_email(
961 'Process has failed to write pidfile', message)
962 self.on_lost_process()
965 def on_lost_process(self, process=None):
966 """\
967 Called when autoserv has exited without writing an exit status,
968 or we've timed out waiting for autoserv to write a pid to the
969 pidfile. In either case, we just return failure and the caller
970 should signal some kind of warning.
972 process is unimportant here, as it shouldn't be used by anyone.
974 self.lost_process = True
975 self._state.process = process
976 self._state.exit_status = 1
977 self._state.num_tests_failed = 0
980 def exit_code(self):
981 self._get_pidfile_info()
982 return self._state.exit_status
985 def num_tests_failed(self):
986 """@returns The number of tests that failed or -1 if unknown."""
987 self._get_pidfile_info()
988 if self._state.num_tests_failed is None:
989 return -1
990 return self._state.num_tests_failed
993 def try_copy_results_on_drone(self, **kwargs):
994 if self.has_process():
995 # copy results logs into the normal place for job results
996 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
999 def try_copy_to_results_repository(self, source, **kwargs):
1000 if self.has_process():
1001 _drone_manager.copy_to_results_repository(self.get_process(),
1002 source, **kwargs)
1005 class Agent(object):
1007 An agent for use by the Dispatcher class to perform a task.
1009 The following methods are required on all task objects:
1010 poll() - Called periodically to let the task check its status and
1011 update its internal state.
1012 is_done() - Returns True if the task is finished.
1013 abort() - Called when an abort has been requested. The task must
1014 set its aborted attribute to True if it actually aborted.
1016 The following attributes are required on all task objects:
1017 aborted - bool, True if this task was aborted.
1018 success - bool, True if this task succeeded.
1019 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1020 host_ids - A sequence of Host ids this task represents.
1024 def __init__(self, task):
1026 @param task: A task as described in the class docstring.
1028 self.task = task
1030 # This is filled in by Dispatcher.add_agent()
1031 self.dispatcher = None
1033 self.queue_entry_ids = task.queue_entry_ids
1034 self.host_ids = task.host_ids
1036 self.started = False
1037 self.finished = False
1040 def tick(self):
1041 self.started = True
1042 if not self.finished:
1043 self.task.poll()
1044 if self.task.is_done():
1045 self.finished = True
1048 def is_done(self):
1049 return self.finished
1052 def abort(self):
1053 if self.task:
1054 self.task.abort()
1055 if self.task.aborted:
1056 # tasks can choose to ignore aborts
1057 self.finished = True
1060 class AgentTask(object):
1061 class _NullMonitor(object):
1062 pidfile_id = None
1064 def has_process(self):
1065 return True
1068 def __init__(self, log_file_name=None):
1070 @param log_file_name: (optional) name of file to log command output to
1072 self.done = False
1073 self.started = False
1074 self.success = None
1075 self.aborted = False
1076 self.monitor = None
1077 self.queue_entry_ids = []
1078 self.host_ids = []
1079 self._log_file_name = log_file_name
1082 def _set_ids(self, host=None, queue_entries=None):
1083 if queue_entries and queue_entries != [None]:
1084 self.host_ids = [entry.host.id for entry in queue_entries]
1085 self.queue_entry_ids = [entry.id for entry in queue_entries]
1086 else:
1087 assert host
1088 self.host_ids = [host.id]
1091 def poll(self):
1092 if not self.started:
1093 self.start()
1094 if not self.done:
1095 self.tick()
1098 def tick(self):
1099 assert self.monitor
1100 exit_code = self.monitor.exit_code()
1101 if exit_code is None:
1102 return
1104 success = (exit_code == 0)
1105 self.finished(success)
1108 def is_done(self):
1109 return self.done
1112 def finished(self, success):
1113 if self.done:
1114 assert self.started
1115 return
1116 self.started = True
1117 self.done = True
1118 self.success = success
1119 self.epilog()
1122 def prolog(self):
1124 To be overridden.
1126 assert not self.monitor
1127 self.register_necessary_pidfiles()
1130 def _log_file(self):
1131 if not self._log_file_name:
1132 return None
1133 return os.path.join(self._working_directory(), self._log_file_name)
1136 def cleanup(self):
1137 log_file = self._log_file()
1138 if self.monitor and log_file:
1139 self.monitor.try_copy_to_results_repository(log_file)
1142 def epilog(self):
1144 To be overridden.
1146 self.cleanup()
1147 logging.info("%s finished with success=%s", type(self).__name__,
1148 self.success)
1152 def start(self):
1153 if not self.started:
1154 self.prolog()
1155 self.run()
1157 self.started = True
1160 def abort(self):
1161 if self.monitor:
1162 self.monitor.kill()
1163 self.done = True
1164 self.aborted = True
1165 self.cleanup()
1168 def _get_consistent_execution_path(self, execution_entries):
1169 first_execution_path = execution_entries[0].execution_path()
1170 for execution_entry in execution_entries[1:]:
1171 assert execution_entry.execution_path() == first_execution_path, (
1172 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1173 execution_entry,
1174 first_execution_path,
1175 execution_entries[0]))
1176 return first_execution_path
1179 def _copy_results(self, execution_entries, use_monitor=None):
1181 @param execution_entries: list of objects with execution_path() method
1183 if use_monitor is not None and not use_monitor.has_process():
1184 return
1186 assert len(execution_entries) > 0
1187 if use_monitor is None:
1188 assert self.monitor
1189 use_monitor = self.monitor
1190 assert use_monitor.has_process()
1191 execution_path = self._get_consistent_execution_path(execution_entries)
1192 results_path = execution_path + '/'
1193 use_monitor.try_copy_to_results_repository(results_path)
1196 def _parse_results(self, queue_entries):
1197 for queue_entry in queue_entries:
1198 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
1201 def _archive_results(self, queue_entries):
1202 for queue_entry in queue_entries:
1203 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
1206 def _command_line(self):
1208 Return the command line to run. Must be overridden.
1210 raise NotImplementedError
1213 @property
1214 def num_processes(self):
1216 Return the number of processes forked by this AgentTask's process. It
1217 may only be approximate. To be overridden if necessary.
1219 return 1
1222 def _paired_with_monitor(self):
1224 If this AgentTask's process must run on the same machine as some
1225 previous process, this method should be overridden to return a
1226 PidfileRunMonitor for that process.
1228 return self._NullMonitor()
1231 @property
1232 def owner_username(self):
1234 Return login of user responsible for this task. May be None. Must be
1235 overridden.
1237 raise NotImplementedError
1240 def _working_directory(self):
1242 Return the directory where this AgentTask's process executes. Must be
1243 overridden.
1245 raise NotImplementedError
1248 def _pidfile_name(self):
1250 Return the name of the pidfile this AgentTask's process uses. To be
1251 overridden if necessary.
1253 return drone_manager.AUTOSERV_PID_FILE
1256 def _check_paired_results_exist(self):
1257 if not self._paired_with_monitor().has_process():
1258 email_manager.manager.enqueue_notify_email(
1259 'No paired results in task',
1260 'No paired results in task %s at %s'
1261 % (self, self._paired_with_monitor().pidfile_id))
1262 self.finished(False)
1263 return False
1264 return True
1267 def _create_monitor(self):
1268 assert not self.monitor
1269 self.monitor = PidfileRunMonitor()
1272 def run(self):
1273 if not self._check_paired_results_exist():
1274 return
1276 self._create_monitor()
1277 self.monitor.run(
1278 self._command_line(), self._working_directory(),
1279 num_processes=self.num_processes,
1280 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1281 pidfile_name=self._pidfile_name(),
1282 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
1283 username=self.owner_username,
1284 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1287 def get_drone_hostnames_allowed(self):
1288 if not models.DroneSet.drone_sets_enabled():
1289 return None
1291 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1292 if not hqes:
1293 # Only special tasks could be missing host queue entries
1294 assert isinstance(self, SpecialAgentTask)
1295 return self._user_or_global_default_drone_set(
1296 self.task, self.task.requested_by)
1298 job_ids = hqes.values_list('job', flat=True).distinct()
1299 assert job_ids.count() == 1, ("AgentTask's queue entries "
1300 "span multiple jobs")
1302 job = models.Job.objects.get(id=job_ids[0])
1303 drone_set = job.drone_set
1304 if not drone_set:
1305 return self._user_or_global_default_drone_set(job, job.user())
1307 return drone_set.get_drone_hostnames()
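# Resolution order above: drone sets disabled -> None; a SpecialTask with no
# queue entries -> its requesting user's drone set (or the global default);
# otherwise the job's drone set if one is assigned, falling back to the job
# owner's drone set or the global default.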
1310 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1312 Returns the user's default drone set, if present.
1314 Otherwise, returns the global default drone set.
1316 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1317 if not user:
1318 logging.warn('%s had no owner; using default drone set',
1319 obj_with_owner)
1320 return default_hostnames
1321 if not user.drone_set:
1322 logging.warn('User %s has no default drone set, using global '
1323 'default', user.login)
1324 return default_hostnames
1325 return user.drone_set.get_drone_hostnames()
1328 def register_necessary_pidfiles(self):
1329 pidfile_id = _drone_manager.get_pidfile_id_from(
1330 self._working_directory(), self._pidfile_name())
1331 _drone_manager.register_pidfile(pidfile_id)
1333 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1334 if paired_pidfile_id:
1335 _drone_manager.register_pidfile(paired_pidfile_id)
1338 def recover(self):
1339 if not self._check_paired_results_exist():
1340 return
1342 self._create_monitor()
1343 self.monitor.attach_to_existing_process(
1344 self._working_directory(), pidfile_name=self._pidfile_name(),
1345 num_processes=self.num_processes)
1346 if not self.monitor.has_process():
1347 # no process to recover; wait to be started normally
1348 self.monitor = None
1349 return
1351 self.started = True
1352 logging.info('Recovering process %s for %s at %s'
1353 % (self.monitor.get_process(), type(self).__name__,
1354 self._working_directory()))
1357 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1358 allowed_host_statuses=None):
1359 class_name = self.__class__.__name__
1360 for entry in queue_entries:
1361 if entry.status not in allowed_hqe_statuses:
1362 raise host_scheduler.SchedulerError(
1363 '%s attempting to start entry with invalid status %s: '
1364 '%s' % (class_name, entry.status, entry))
1365 invalid_host_status = (
1366 allowed_host_statuses is not None
1367 and entry.host.status not in allowed_host_statuses)
1368 if invalid_host_status:
1369 raise host_scheduler.SchedulerError(
1370 '%s attempting to start on queue entry with invalid '
1371 'host status %s: %s'
1372 % (class_name, entry.host.status, entry))
1375 class TaskWithJobKeyvals(object):
1376 """AgentTask mixin providing functionality to help with job keyval files."""
1377 _KEYVAL_FILE = 'keyval'
1378 def _format_keyval(self, key, value):
1379 return '%s=%s' % (key, value)
1382 def _keyval_path(self):
1383 """Subclasses must override this"""
1384 raise NotImplementedError
1387 def _write_keyval_after_job(self, field, value):
1388 assert self.monitor
1389 if not self.monitor.has_process():
1390 return
1391 _drone_manager.write_lines_to_file(
1392 self._keyval_path(), [self._format_keyval(field, value)],
1393 paired_with_process=self.monitor.get_process())
1396 def _job_queued_keyval(self, job):
1397 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1400 def _write_job_finished(self):
1401 self._write_keyval_after_job("job_finished", int(time.time()))
1404 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1405 keyval_contents = '\n'.join(self._format_keyval(key, value)
1406 for key, value in keyval_dict.iteritems())
1407 # always end with a newline to allow additional keyvals to be written
1408 keyval_contents += '\n'
1409 _drone_manager.attach_file_to_execution(self._working_directory(),
1410 keyval_contents,
1411 file_path=keyval_path)
1414 def _write_keyvals_before_job(self, keyval_dict):
1415 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1418 def _write_host_keyvals(self, host):
1419 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
1420 host.hostname)
1421 platform, all_labels = host.platform_and_labels()
1422 all_labels = [urllib.quote(label) for label in all_labels]
1423 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1424 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
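# Illustrative contents (hypothetical hostname/labels) of the per-host keyval
# file written above at <working_directory>/host_keyvals/<hostname>:
#   platform=netbook
#   labels=netbook,rack1,bluetooth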
1427 class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
1429 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1432 TASK_TYPE = None
1433 host = None
1434 queue_entry = None
1436 def __init__(self, task, extra_command_args):
1437 super(SpecialAgentTask, self).__init__()
1439 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
1441 self.host = scheduler_models.Host(id=task.host.id)
1442 self.queue_entry = None
1443 if task.queue_entry:
1444 self.queue_entry = scheduler_models.HostQueueEntry(
1445 id=task.queue_entry.id)
1447 self.task = task
1448 self._extra_command_args = extra_command_args
1451 def _keyval_path(self):
1452 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1455 def _command_line(self):
1456 return _autoserv_command_line(self.host.hostname,
1457 self._extra_command_args,
1458 queue_entry=self.queue_entry)
1461 def _working_directory(self):
1462 return self.task.execution_path()
1465 @property
1466 def owner_username(self):
1467 if self.task.requested_by:
1468 return self.task.requested_by.login
1469 return None
1472 def prolog(self):
1473 super(SpecialAgentTask, self).prolog()
1474 self.task.activate()
1475 self._write_host_keyvals(self.host)
1478 def _fail_queue_entry(self):
1479 assert self.queue_entry
1481 if self.queue_entry.meta_host:
1482 return # don't fail metahost entries, they'll be reassigned
1484 self.queue_entry.update_from_database()
1485 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
1486 return # entry has been aborted
1488 self.queue_entry.set_execution_subdir()
1489 queued_key, queued_time = self._job_queued_keyval(
1490 self.queue_entry.job)
1491 self._write_keyval_after_job(queued_key, queued_time)
1492 self._write_job_finished()
1494 # copy results logs into the normal place for job results
1495 self.monitor.try_copy_results_on_drone(
1496 source_path=self._working_directory() + '/',
1497 destination_path=self.queue_entry.execution_path() + '/')
1499 pidfile_id = _drone_manager.get_pidfile_id_from(
1500 self.queue_entry.execution_path(),
1501 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
1502 _drone_manager.register_pidfile(pidfile_id)
1504 if self.queue_entry.job.parse_failed_repair:
1505 self._parse_results([self.queue_entry])
1506 else:
1507 self._archive_results([self.queue_entry])
1510 def cleanup(self):
1511 super(SpecialAgentTask, self).cleanup()
1513 # We will consider an aborted task to be "Failed"
1514 self.task.finish(bool(self.success))
1516 if self.monitor:
1517 if self.monitor.has_process():
1518 self._copy_results([self.task])
1519 if self.monitor.pidfile_id is not None:
1520 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
1523 class RepairTask(SpecialAgentTask):
1524 TASK_TYPE = models.SpecialTask.Task.REPAIR
1527 def __init__(self, task):
1528 """\
1529 queue_entry: queue entry to mark failed if this repair fails.
1531 protection = host_protections.Protection.get_string(
1532 task.host.protection)
1533 # normalize the protection name
1534 protection = host_protections.Protection.get_attr_name(protection)
1536 super(RepairTask, self).__init__(
1537 task, ['-R', '--host-protection', protection])
1539 # *don't* include the queue entry in IDs -- if the queue entry is
1540 # aborted, we want to leave the repair task running
1541 self._set_ids(host=self.host)
1544 def prolog(self):
1545 super(RepairTask, self).prolog()
1546 logging.info("repair_task starting")
1547 self.host.set_status(models.Host.Status.REPAIRING)
1550 def epilog(self):
1551 super(RepairTask, self).epilog()
1553 if self.success:
1554 self.host.set_status(models.Host.Status.READY)
1555 else:
1556 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1557 if self.queue_entry:
1558 self._fail_queue_entry()
1561 class PreJobTask(SpecialAgentTask):
1562 def _copy_to_results_repository(self):
1563 if not self.queue_entry or self.queue_entry.meta_host:
1564 return
1566 self.queue_entry.set_execution_subdir()
1567 log_name = os.path.basename(self.task.execution_path())
1568 source = os.path.join(self.task.execution_path(), 'debug',
1569 'autoserv.DEBUG')
1570 destination = os.path.join(
1571 self.queue_entry.execution_path(), log_name)
1573 self.monitor.try_copy_to_results_repository(
1574 source, destination_path=destination)
1577 def epilog(self):
1578 super(PreJobTask, self).epilog()
1580 if self.success:
1581 return
1583 self._copy_to_results_repository()
1585 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
1586 # effectively ignore failure for these hosts
1587 self.success = True
1588 return
1590 if self.queue_entry:
1591 self.queue_entry.requeue()
1593 if models.SpecialTask.objects.filter(
1594 task=models.SpecialTask.Task.REPAIR,
1595 queue_entry__id=self.queue_entry.id):
1596 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1597 self._fail_queue_entry()
1598 return
1600 queue_entry = models.HostQueueEntry.objects.get(
1601 id=self.queue_entry.id)
1602 else:
1603 queue_entry = None
1605 models.SpecialTask.objects.create(
1606 host=models.Host.objects.get(id=self.host.id),
1607 task=models.SpecialTask.Task.REPAIR,
1608 queue_entry=queue_entry,
1609 requested_by=self.task.requested_by)
1612 class VerifyTask(PreJobTask):
1613 TASK_TYPE = models.SpecialTask.Task.VERIFY
1616 def __init__(self, task):
1617 super(VerifyTask, self).__init__(task, ['-v'])
1618 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1621 def prolog(self):
1622 super(VerifyTask, self).prolog()
1624 logging.info("starting verify on %s", self.host.hostname)
1625 if self.queue_entry:
1626 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1627 self.host.set_status(models.Host.Status.VERIFYING)
1629 # Delete any queued manual reverifies for this host. One verify will do
1630 # and there's no need to keep records of other requests.
1631 queued_verifies = models.SpecialTask.objects.filter(
1632 host__id=self.host.id,
1633 task=models.SpecialTask.Task.VERIFY,
1634 is_active=False, is_complete=False, queue_entry=None)
1635 queued_verifies = queued_verifies.exclude(id=self.task.id)
1636 queued_verifies.delete()
1639 def epilog(self):
1640 super(VerifyTask, self).epilog()
1641 if self.success:
1642 if self.queue_entry:
1643 self.queue_entry.on_pending()
1644 else:
1645 self.host.set_status(models.Host.Status.READY)
1648 class CleanupTask(PreJobTask):
1649 # note this can also run post-job, but when it does, it's running standalone
1650 # against the host (not related to the job), so it's not considered a
1651 # PostJobTask
1653 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1656 def __init__(self, task, recover_run_monitor=None):
1657 super(CleanupTask, self).__init__(task, ['--cleanup'])
1658 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1661 def prolog(self):
1662 super(CleanupTask, self).prolog()
1663 logging.info("starting cleanup task for host: %s", self.host.hostname)
1664 self.host.set_status(models.Host.Status.CLEANING)
1665 if self.queue_entry:
1666 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1669 def _finish_epilog(self):
1670 if not self.queue_entry or not self.success:
1671 return
1673 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
1674 should_run_verify = (
1675 self.queue_entry.job.run_verify
1676 and self.host.protection != do_not_verify_protection)
1677 if should_run_verify:
1678 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1679 models.SpecialTask.objects.create(
1680 host=models.Host.objects.get(id=self.host.id),
1681 queue_entry=entry,
1682 task=models.SpecialTask.Task.VERIFY)
1683 else:
1684 self.queue_entry.on_pending()
1687 def epilog(self):
1688 super(CleanupTask, self).epilog()
1690 if self.success:
1691 self.host.update_field('dirty', 0)
1692 self.host.set_status(models.Host.Status.READY)
1694 self._finish_epilog()
1697 class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1699 Common functionality for QueueTask and HostlessQueueTask
1701 def __init__(self, queue_entries):
1702 super(AbstractQueueTask, self).__init__()
1703 self.job = queue_entries[0].job
1704 self.queue_entries = queue_entries
1707 def _keyval_path(self):
1708 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1711 def _write_control_file(self, execution_path):
1712 control_path = _drone_manager.attach_file_to_execution(
1713 execution_path, self.job.control_file)
1714 return control_path
1717 def _command_line(self):
1718 execution_path = self.queue_entries[0].execution_path()
1719 control_path = self._write_control_file(execution_path)
1720 hostnames = ','.join(entry.host.hostname
1721 for entry in self.queue_entries
1722 if not entry.is_hostless())
1724 execution_tag = self.queue_entries[0].execution_tag()
1725 params = _autoserv_command_line(
1726 hostnames,
1727 ['-P', execution_tag, '-n',
1728 _drone_manager.absolute_path(control_path)],
1729 job=self.job, verbose=False)
1731 if not self.job.is_server_job():
1732 params.append('-c')
1734 return params
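# Illustrative command (hypothetical tag and paths) assembled above for a
# client-side job:
#   autoserv -p -r <working dir> -m host1,host2 -u jdoe -l sleeptest
#       -P <execution tag> -n <attached control file path> -c
# ('-c' is appended only when the job is not a server job; verbose=False, so
# '--verbose' is omitted.)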
1737 @property
1738 def num_processes(self):
1739 return len(self.queue_entries)
1742 @property
1743 def owner_username(self):
1744 return self.job.owner
1747 def _working_directory(self):
1748 return self._get_consistent_execution_path(self.queue_entries)
1751 def prolog(self):
1752 queued_key, queued_time = self._job_queued_keyval(self.job)
1753 keyval_dict = self.job.keyval_dict()
1754 keyval_dict[queued_key] = queued_time
1755 group_name = self.queue_entries[0].get_group_name()
1756 if group_name:
1757 keyval_dict['host_group_name'] = group_name
1758 self._write_keyvals_before_job(keyval_dict)
1759 for queue_entry in self.queue_entries:
1760 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
1761 queue_entry.set_started_on_now()
1764 def _write_lost_process_error_file(self):
1765 error_file_path = os.path.join(self._working_directory(), 'job_failure')
1766 _drone_manager.write_lines_to_file(error_file_path,
1767 [_LOST_PROCESS_ERROR])
1770 def _finish_task(self):
1771 if not self.monitor:
1772 return
1774 self._write_job_finished()
1776 if self.monitor.lost_process:
1777 self._write_lost_process_error_file()
1780 def _write_status_comment(self, comment):
1781 _drone_manager.write_lines_to_file(
1782 os.path.join(self._working_directory(), 'status.log'),
1783 ['INFO\t----\t----\t' + comment],
1784 paired_with_process=self.monitor.get_process())
1787 def _log_abort(self):
1788 if not self.monitor or not self.monitor.has_process():
1789 return
1791 # build up sets of all the aborted_by and aborted_on values
1792 aborted_by, aborted_on = set(), set()
1793 for queue_entry in self.queue_entries:
1794 if queue_entry.aborted_by:
1795 aborted_by.add(queue_entry.aborted_by)
1796 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1797 aborted_on.add(t)
1799 # extract some actual, unique aborted by value and write it out
1800 # TODO(showard): this conditional is now obsolete, we just need to leave
1801 # it in temporarily for backwards compatibility over upgrades. delete
1802 # soon.
1803 assert len(aborted_by) <= 1
1804 if len(aborted_by) == 1:
1805 aborted_by_value = aborted_by.pop()
1806 aborted_on_value = max(aborted_on)
1807 else:
1808 aborted_by_value = 'autotest_system'
1809 aborted_on_value = int(time.time())
1811 self._write_keyval_after_job("aborted_by", aborted_by_value)
1812 self._write_keyval_after_job("aborted_on", aborted_on_value)
1814 aborted_on_string = str(datetime.datetime.fromtimestamp(
1815 aborted_on_value))
1816 self._write_status_comment('Job aborted by %s on %s' %
1817 (aborted_by_value, aborted_on_string))
1820 def abort(self):
1821 super(AbstractQueueTask, self).abort()
1822 self._log_abort()
1823 self._finish_task()
1826 def epilog(self):
1827 super(AbstractQueueTask, self).epilog()
1828 self._finish_task()
1831 class QueueTask(AbstractQueueTask):
1832 def __init__(self, queue_entries):
1833 super(QueueTask, self).__init__(queue_entries)
1834 self._set_ids(queue_entries=queue_entries)
1837 def prolog(self):
1838 self._check_queue_entry_statuses(
1839 self.queue_entries,
1840 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
1841 models.HostQueueEntry.Status.RUNNING),
1842 allowed_host_statuses=(models.Host.Status.PENDING,
1843 models.Host.Status.RUNNING))
1845 super(QueueTask, self).prolog()
1847 for queue_entry in self.queue_entries:
1848 self._write_host_keyvals(queue_entry.host)
1849 queue_entry.host.set_status(models.Host.Status.RUNNING)
1850 queue_entry.host.update_field('dirty', 1)
1851 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
1852 # TODO(gps): Remove this if nothing needs it anymore.
1853 # A potential user is: tko/parser
1854 self.job.write_to_machines_file(self.queue_entries[0])
1857 def _finish_task(self):
1858 super(QueueTask, self)._finish_task()
1860 for queue_entry in self.queue_entries:
1861 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
1862 queue_entry.host.set_status(models.Host.Status.RUNNING)
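# HostlessQueueTask is the single-entry variant for jobs with no hosts: it
# pins the execution subdirectory to 'hostless' and sends the entry straight
# to PARSING when the run finishes, since there are no hosts to gather logs
# from or reboot.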
1865 class HostlessQueueTask(AbstractQueueTask):
1866 def __init__(self, queue_entry):
1867 super(HostlessQueueTask, self).__init__([queue_entry])
1868 self.queue_entry_ids = [queue_entry.id]
1871 def prolog(self):
1872 self.queue_entries[0].update_field('execution_subdir', 'hostless')
1873 super(HostlessQueueTask, self).prolog()
1876 def _finish_task(self):
1877 super(HostlessQueueTask, self)._finish_task()
1878 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
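# PostJobTask is the common base for work that runs after the main autoserv
# process (crashinfo collection, final parsing, archiving). It attaches a
# PidfileRunMonitor to the job's execution directory so subclasses can read
# the original autoserv exit code when deciding the final queue entry status
# (ABORTED, COMPLETED or FAILED).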
1881 class PostJobTask(AgentTask):
1882 def __init__(self, queue_entries, log_file_name):
1883 super(PostJobTask, self).__init__(log_file_name=log_file_name)
1885 self.queue_entries = queue_entries
1887 self._autoserv_monitor = PidfileRunMonitor()
1888 self._autoserv_monitor.attach_to_existing_process(
1889 self._working_directory())
1892 def _command_line(self):
1893 if _testing_mode:
1894 return 'true'
1895 return self._generate_command(
1896 _drone_manager.absolute_path(self._working_directory()))
1899 def _generate_command(self, results_dir):
1900 raise NotImplementedError('Subclasses must override this')
1903 @property
1904 def owner_username(self):
1905 return self.queue_entries[0].job.owner
1908 def _working_directory(self):
1909 return self._get_consistent_execution_path(self.queue_entries)
1912 def _paired_with_monitor(self):
1913 return self._autoserv_monitor
1916 def _job_was_aborted(self):
1917 was_aborted = None
1918 for queue_entry in self.queue_entries:
1919 queue_entry.update_from_database()
1920 if was_aborted is None: # first queue entry
1921 was_aborted = bool(queue_entry.aborted)
1922 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1923 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
1924 for entry in self.queue_entries]
1925 email_manager.manager.enqueue_notify_email(
1926 'Inconsistent abort state',
1927 'Queue entries have inconsistent abort state:\n' +
1928 '\n'.join(entries))
1929 # don't crash here, just assume true
1930 return True
1931 return was_aborted
1934 def _final_status(self):
1935 if self._job_was_aborted():
1936 return models.HostQueueEntry.Status.ABORTED
1938 # we'll use a PidfileRunMonitor to read the autoserv exit status
1939 if self._autoserv_monitor.exit_code() == 0:
1940 return models.HostQueueEntry.Status.COMPLETED
1941 return models.HostQueueEntry.Status.FAILED
1944 def _set_all_statuses(self, status):
1945 for queue_entry in self.queue_entries:
1946 queue_entry.set_status(status)
1949 def abort(self):
1950 # override AgentTask.abort() to avoid killing the process and ending
1951 # the task. post-job tasks continue when the job is aborted.
1952 pass
1955 def _pidfile_label(self):
1956 # '.autoserv_execute' -> 'autoserv'
1957 return self._pidfile_name()[1:-len('_execute')]
1960 class GatherLogsTask(PostJobTask):
1961 """
1962 Task responsible for
1963 * gathering uncollected logs (if Autoserv crashed hard or was killed)
1964 * copying logs to the results repository
1965 * spawning CleanupTasks for hosts, if necessary
1966 * spawning a FinalReparseTask for the job
1967 """
1968 def __init__(self, queue_entries, recover_run_monitor=None):
1969 self._job = queue_entries[0].job
1970 super(GatherLogsTask, self).__init__(
1971 queue_entries, log_file_name='.collect_crashinfo.log')
1972 self._set_ids(queue_entries=queue_entries)
1975 def _generate_command(self, results_dir):
1976 host_list = ','.join(queue_entry.host.hostname
1977 for queue_entry in self.queue_entries)
1978 return [_autoserv_path, '-p',
1979 '--pidfile-label=%s' % self._pidfile_label(),
1980 '--use-existing-results', '--collect-crashinfo',
1981 '-m', host_list, '-r', results_dir]
1984 @property
1985 def num_processes(self):
1986 return len(self.queue_entries)
1989 def _pidfile_name(self):
1990 return drone_manager.CRASHINFO_PID_FILE
1993 def prolog(self):
1994 self._check_queue_entry_statuses(
1995 self.queue_entries,
1996 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
1997 allowed_host_statuses=(models.Host.Status.RUNNING,))
1999 super(GatherLogsTask, self).prolog()
2002 def epilog(self):
2003 super(GatherLogsTask, self).epilog()
2004 self._parse_results(self.queue_entries)
2005 self._reboot_hosts()
2008 def _reboot_hosts(self):
2009 if self._autoserv_monitor.has_process():
2010 final_success = (self._final_status() ==
2011 models.HostQueueEntry.Status.COMPLETED)
2012 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2013 else:
2014 final_success = False
2015 num_tests_failed = 0
2017 reboot_after = self._job.reboot_after
2018 do_reboot = (
2019 # always reboot after aborted jobs
2020 self._final_status() == models.HostQueueEntry.Status.ABORTED
2021 or reboot_after == model_attributes.RebootAfter.ALWAYS
2022 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
2023 and final_success and num_tests_failed == 0))
2025 for queue_entry in self.queue_entries:
2026 if do_reboot:
2027 # don't pass the queue entry to the CleanupTask. if the cleanup
2028 # fails, the job doesn't care -- it's over.
2029 models.SpecialTask.objects.create(
2030 host=models.Host.objects.get(id=queue_entry.host.id),
2031 task=models.SpecialTask.Task.CLEANUP,
2032 requested_by=self._job.owner_model())
2033 else:
2034 queue_entry.host.set_status(models.Host.Status.READY)
2037 def run(self):
2038 autoserv_exit_code = self._autoserv_monitor.exit_code()
2039 # only run if Autoserv exited due to some signal. if we have no exit
2040 # code, assume something bad (and signal-like) happened.
2041 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
2042 super(GatherLogsTask, self).run()
2043 else:
2044 self.finished(True)
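# Note: SelfThrottledPostJobTask subclasses only have to provide
# _pidfile_name(), _generate_command() and _max_processes(); the tick()/run()
# overrides below delay the actual launch until the class-wide process count
# drops back under that limit.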
2047 class SelfThrottledPostJobTask(PostJobTask):
2048 """
2049 Special AgentTask subclass that maintains its own global process limit.
2050 """
2051 _num_running_processes = 0
2054 @classmethod
2055 def _increment_running_processes(cls):
2056 cls._num_running_processes += 1
2059 @classmethod
2060 def _decrement_running_processes(cls):
2061 cls._num_running_processes -= 1
2064 @classmethod
2065 def _max_processes(cls):
2066 raise NotImplementedError
2069 @classmethod
2070 def _can_run_new_process(cls):
2071 return cls._num_running_processes < cls._max_processes()
2074 def _process_started(self):
2075 return bool(self.monitor)
2078 def tick(self):
2079 # override tick to keep trying to start until the process count goes
2080 # down and we can, at which point we revert to default behavior
2081 if self._process_started():
2082 super(SelfThrottledPostJobTask, self).tick()
2083 else:
2084 self._try_starting_process()
2087 def run(self):
2088 # override run() to not actually run unless we can
2089 self._try_starting_process()
2092 def _try_starting_process(self):
2093 if not self._can_run_new_process():
2094 return
2096 # actually run the command
2097 super(SelfThrottledPostJobTask, self).run()
2098 if self._process_started():
2099 self._increment_running_processes()
2102 def finished(self, success):
2103 super(SelfThrottledPostJobTask, self).finished(success)
2104 if self._process_started():
2105 self._decrement_running_processes()
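# FinalReparseTask is one such throttled task: it runs the results parser
# (_parser_path) over the job's results directory, capped globally by
# scheduler_config's max_parse_processes.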
2108 class FinalReparseTask(SelfThrottledPostJobTask):
2109 def __init__(self, queue_entries):
2110 super(FinalReparseTask, self).__init__(queue_entries,
2111 log_file_name='.parse.log')
2112 # don't use _set_ids, since we don't want to set the host_ids
2113 self.queue_entry_ids = [entry.id for entry in queue_entries]
2116 def _generate_command(self, results_dir):
2117 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
2118 results_dir]
2121 @property
2122 def num_processes(self):
2123 return 0 # don't include parser processes in accounting
2126 def _pidfile_name(self):
2127 return drone_manager.PARSER_PID_FILE
2130 @classmethod
2131 def _max_processes(cls):
2132 return scheduler_config.config.max_parse_processes
2135 def prolog(self):
2136 self._check_queue_entry_statuses(
2137 self.queue_entries,
2138 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
2140 super(FinalReparseTask, self).prolog()
2143 def epilog(self):
2144 super(FinalReparseTask, self).epilog()
2145 self._archive_results(self.queue_entries)
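# ArchiveResultsTask is the other throttled post-job step: it re-runs
# autoserv with the archive_results.control.srv control file against the
# existing results (capped by max_transfer_processes) and drops an
# '.archiver_failed' marker file into the results directory if archiving
# fails.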
2148 class ArchiveResultsTask(SelfThrottledPostJobTask):
2149 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2151 def __init__(self, queue_entries):
2152 super(ArchiveResultsTask, self).__init__(queue_entries,
2153 log_file_name='.archiving.log')
2154 # don't use _set_ids, since we don't want to set the host_ids
2155 self.queue_entry_ids = [entry.id for entry in queue_entries]
2158 def _pidfile_name(self):
2159 return drone_manager.ARCHIVER_PID_FILE
2162 def _generate_command(self, results_dir):
2163 return [_autoserv_path, '-p',
2164 '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
2165 '--use-existing-results', '--control-filename=control.archive',
2166 os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
2167 'archive_results.control.srv')]
2170 @classmethod
2171 def _max_processes(cls):
2172 return scheduler_config.config.max_transfer_processes
2175 def prolog(self):
2176 self._check_queue_entry_statuses(
2177 self.queue_entries,
2178 allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
2180 super(ArchiveResultsTask, self).prolog()
2183 def epilog(self):
2184 super(ArchiveResultsTask, self).epilog()
2185 if not self.success and self._paired_with_monitor().has_process():
2186 failed_file = os.path.join(self._working_directory(),
2187 self._ARCHIVING_FAILED_FILE)
2188 paired_process = self._paired_with_monitor().get_process()
2189 _drone_manager.write_lines_to_file(
2190 failed_file, ['Archiving failed with exit code %s'
2191 % self.monitor.exit_code()],
2192 paired_with_process=paired_process)
2193 self._set_all_statuses(self._final_status())
2196 if __name__ == '__main__':
2197 main()