import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback, urllib
import itertools, logging, weakref, gc

import django.db  # used by Dispatcher.tick() via django.db.reset_queries()

from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.scheduler import scheduler_models

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server.  Full results may not be available.  Sorry.
"""

_db = None
_shutdown = False
_testing_mode = False
_drone_manager = None

_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')


def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')

_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
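
# Site hook: import_site_function() looks for an optional site_monitor_db
# module and falls back to _parser_path_default when no site customization is
# installed, so _parser_path always resolves to a usable tko/parse path.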


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60
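
# _get_pidfile_timeout_secs() converts the config value from minutes to
# seconds; PidfileRunMonitor._handle_no_process() compares it against the time
# elapsed since run() when deciding whether a process failed to write its
# pidfile.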


def _site_init_monitor_db_dummy():
    pass


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        main_without_exception_handling()
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    _drone_manager.shutdown()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
        logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            to run the job on.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    autoserv_argv = [_autoserv_path, '-p',
                     '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        autoserv_argv += ['-m', machines]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args
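
# Illustrative only: for machines='host1,host2', extra_args=['--foo'] and a
# job owned by 'joe' named 'nightly', the list built above comes out roughly as
#   [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY,
#    '-m', 'host1,host2', '-u', 'joe', '-l', 'nightly', '--verbose', '--foo']
# ('--foo' is a made-up extra argument here, not a real autoserv flag).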


class Dispatcher(object):
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
            global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'gc_stats_interval_mins', type=int, default=6*60))


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def tick(self):
        self._garbage_collection()
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._schedule_new_jobs()
        self._handle_agents()
        self._host_scheduler.tick()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
        django.db.reset_queries()
        self._tick_count += 1
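        # Ordering note: the scheduling methods above only queue up Agents and
        # database changes; _handle_agents() then starts as many of them as the
        # throttling limits allow, and _drone_manager.execute_actions() carries
        # out the accumulated drone operations at the end of the tick.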


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
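        # _host_agents and _queue_entry_agents are reverse indexes (id -> set
        # of Agents); host_has_agent() and get_agents_for_entry() consult them
        # so the abort path and the "is this host busy" checks do not need to
        # scan the full self._agents list every tick.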


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()

        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
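        # Recovery order matters: pidfiles must be registered before the
        # refresh() so the drone manager reads them, and the reverify step runs
        # last so any active host not claimed by a recovered task or a pending
        # special task gets a fresh Cleanup queued.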


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
            where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
            is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
            '_get_agent_task_for_queue_entry got entry with '
            'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                'While scheduling %s, host %s already has a host agent %s'
                % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
            'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
            where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                task__in=(models.SpecialTask.Task.CLEANUP,
                          models.SpecialTask.Task.VERIFY),
                queue_entry__id=queue_entry.id,
                is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                '%d unrecovered verifying host queue entries:\n%s' %
                (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
            queued_tasks, 'afe_host_queue_entries', 'host_id',
            join_condition='afe_host_queue_entries.active',
            join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
            where=['(afe_host_queue_entries.id IS NULL OR '
                   'afe_host_queue_entries.id = '
                   'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)
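        # sorted() with index() as the key simply groups the queued tasks as
        # Repair, then Cleanup, then Verify; Python's sort is stable, so tasks
        # of the same type keep their database ordering relative to each other.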


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
            "status IN ('Repairing', 'Verifying', 'Cleaning')",
            print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                task=models.SpecialTask.Task.CLEANUP,
                host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list.  Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
            queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            is_unassigned_atomic_group = (
                queue_entry.atomic_group_id is not None
                and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
            agent.task.owner_username,
            agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
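        # In short, three throttles stack here: a per-drone/per-user capacity
        # check via the drone manager, a per-tick start budget
        # (max_processes_started_per_cycle), and a "stop starting new work once
        # any agent has been refused" flag that keeps large agents from being
        # starved by a stream of small ones.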


    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.task.num_processes
            agent.tick()
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)
            except Exception, ex:
                logging.exception(ex)

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0:  # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None, drone_hostnames_allowed=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            num_processes=num_processes, log_file=log_file,
            paired_with_pidfile=paired_with_pidfile, username=username,
            drone_hostnames_allowed=drone_hostnames_allowed)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
                                   num_processes=None):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
            pid=None, exit_status=None if autoserv has not yet run
            pid!=None, exit_status=None if autoserv is running
            pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile.  In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """@returns The number of tests that failed or -1 if unknown."""
        self._get_pidfile_info()
        if self._state.num_tests_failed is None:
            return -1
        return self._state.num_tests_failed


    def try_copy_results_on_drone(self, **kwargs):
        if self.has_process():
            # copy results logs into the normal place for job results
            _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)


    def try_copy_to_results_repository(self, source, **kwargs):
        if self.has_process():
            _drone_manager.copy_to_results_repository(self.get_process(),
                                                      source, **kwargs)
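
# Typical PidfileRunMonitor lifecycle, as used by the AgentTasks below: run()
# (or attach_to_existing_process() during recovery) records the pidfile id,
# then each scheduler tick polls exit_code(); None means the process is still
# running or has not written its pidfile yet, and a lost process is reported
# as exit status 1 via on_lost_process().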


class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested.  The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: A task as described in the class docstring.
        """
        self.task = task

        # This is filled in by Dispatcher.add_agent()
        self.dispatcher = None

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        self.started = True
        if not self.finished:
            self.task.poll()
            if self.task.is_done():
                self.finished = True


    def is_done(self):
        return self.finished


    def abort(self):
        if self.task:
            self.task.abort()
            if self.task.aborted:
                # tasks can choose to ignore aborts
                self.finished = True
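
# A hypothetical minimal task object satisfying the interface described in the
# Agent docstring (the names here are illustrative, not part of the scheduler):
#
#     class NoopTask(object):
#         queue_entry_ids = []
#         host_ids = []
#         num_processes = 0
#         success = None
#         aborted = False
#
#         def poll(self):
#             self.success = True
#
#         def is_done(self):
#             return self.success is not None
#
#         def abort(self):
#             self.aborted = True
#
#     dispatcher.add_agent_task(NoopTask())
#
# In practice Dispatcher only ever wraps AgentTask subclasses, which also
# provide num_processes, owner_username and the other hooks used for
# throttling and drone selection.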


class AgentTask(object):
    class _NullMonitor(object):
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        if self.done:
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)


    def start(self):
        if not self.started:
            self.prolog()
            self.run()
        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                        execution_entry,
                                        first_execution_path,
                                        execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        @param execution_entries: list of objects with execution_path() method
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _archive_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)


    def _command_line(self):
        """
        Return the command line to run.  Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this AgentTask's process.  It
        may only be approximate.  To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this AgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task.  May be None.  Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this AgentTask's process executes.  Must be
        overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this AgentTask's process uses.  To be
        overridden if necessary.
        """
        return drone_manager.AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        if not self._paired_with_monitor().has_process():
            email_manager.manager.enqueue_notify_email(
                'No paired results in task',
                'No paired results in task %s at %s'
                % (self, self._paired_with_monitor().pidfile_id))
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        assert not self.monitor
        self.monitor = PidfileRunMonitor()


    def run(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
            self._command_line(), self._working_directory(),
            num_processes=self.num_processes,
            nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
            pidfile_name=self._pidfile_name(),
            paired_with_pidfile=self._paired_with_monitor().pidfile_id,
            username=self.owner_username,
            drone_hostnames_allowed=self.get_drone_hostnames_allowed())
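        # run() is the template method tying the subclass hooks together:
        # _command_line(), _working_directory(), _log_file(), _pidfile_name(),
        # _paired_with_monitor() and owner_username are all overridable, so
        # each concrete task only describes what to execute while the
        # PidfileRunMonitor/drone manager pair decides where and how.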


    def get_drone_hostnames_allowed(self):
        if not models.DroneSet.drone_sets_enabled():
            return None

        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
        if not hqes:
            # Only special tasks could be missing host queue entries
            assert isinstance(self, SpecialAgentTask)
            return self._user_or_global_default_drone_set(
                self.task, self.task.requested_by)

        job_ids = hqes.values_list('job', flat=True).distinct()
        assert job_ids.count() == 1, ("AgentTask's queue entries "
                                      "span multiple jobs")

        job = models.Job.objects.get(id=job_ids[0])
        drone_set = job.drone_set
        if not drone_set:
            return self._user_or_global_default_drone_set(job, job.user())

        return drone_set.get_drone_hostnames()


    def _user_or_global_default_drone_set(self, obj_with_owner, user):
        """
        Returns the user's default drone set, if present.

        Otherwise, returns the global default drone set.
        """
        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
        if not user:
            logging.warn('%s had no owner; using default drone set',
                         obj_with_owner)
            return default_hostnames
        if not user.drone_set:
            logging.warn('User %s has no default drone set, using global '
                         'default', user.login)
            return default_hostnames
        return user.drone_set.get_drone_hostnames()
):
1329 pidfile_id
= _drone_manager
.get_pidfile_id_from(
1330 self
._working
_directory
(), self
._pidfile
_name
())
1331 _drone_manager
.register_pidfile(pidfile_id
)
1333 paired_pidfile_id
= self
._paired
_with
_monitor
().pidfile_id
1334 if paired_pidfile_id
:
1335 _drone_manager
.register_pidfile(paired_pidfile_id
)
1339 if not self
._check
_paired
_results
_exist
():
1342 self
._create
_monitor
()
1343 self
.monitor
.attach_to_existing_process(
1344 self
._working
_directory
(), pidfile_name
=self
._pidfile
_name
(),
1345 num_processes
=self
.num_processes
)
1346 if not self
.monitor
.has_process():
1347 # no process to recover; wait to be started normally
1352 logging
.info('Recovering process %s for %s at %s'
1353 % (self
.monitor
.get_process(), type(self
).__name
__,
1354 self
._working
_directory
()))


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                raise host_scheduler.SchedulerError(
                    '%s attempting to start entry with invalid status %s: '
                    '%s' % (class_name, entry.status, entry))
            invalid_host_status = (
                allowed_host_statuses is not None
                and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                raise host_scheduler.SchedulerError(
                    '%s attempting to start on queue entry with invalid '
                    'host status %s: %s'
                    % (class_name, entry.host.status, entry))


class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._working_directory(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        all_labels = [urllib.quote(label) for label in all_labels]
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    TASK_TYPE = None


    def __init__(self, task, extra_command_args):
        super(SpecialAgentTask, self).__init__()

        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = scheduler_models.Host(id=task.host.id)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = scheduler_models.HostQueueEntry(
                id=task.queue_entry.id)

        self.task = task
        self._extra_command_args = extra_command_args


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        return _autoserv_command_line(self.host.hostname,
                                      self._extra_command_args,
                                      queue_entry=self.queue_entry)


    def _working_directory(self):
        return self.task.execution_path()


    @property
    def owner_username(self):
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return  # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return  # entry has been aborted

        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
            source_path=self._working_directory() + '/',
            destination_path=self.queue_entry.execution_path() + '/')

        pidfile_id = _drone_manager.get_pidfile_id_from(
            self.queue_entry.execution_path(),
            pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        _drone_manager.register_pidfile(pidfile_id)

        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        else:
            self._archive_results([self.queue_entry])


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)


class RepairTask(SpecialAgentTask):
    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(
            task.host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        super(RepairTask, self).__init__(
            task, ['-R', '--host-protection', protection])

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if self.success:
            self.host.set_status(models.Host.Status.READY)
        else:
            self.host.set_status(models.Host.Status.REPAIR_FAILED)
            if self.queue_entry:
                self._fail_queue_entry()


class PreJobTask(SpecialAgentTask):
    def _copy_to_results_repository(self):
        if not self.queue_entry or self.queue_entry.meta_host:
            return

        self.queue_entry.set_execution_subdir()
        log_name = os.path.basename(self.task.execution_path())
        source = os.path.join(self.task.execution_path(), 'debug',
                              'autoserv.DEBUG')
        destination = os.path.join(
            self.queue_entry.execution_path(), log_name)

        self.monitor.try_copy_to_results_repository(
            source, destination_path=destination)


    def epilog(self):
        super(PreJobTask, self).epilog()

        if self.success:
            return

        self._copy_to_results_repository()

        if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
            # effectively ignore failure for these hosts
            self.success = True
            return

        if self.queue_entry:
            self.queue_entry.requeue()

            if models.SpecialTask.objects.filter(
                    task=models.SpecialTask.Task.REPAIR,
                    queue_entry__id=self.queue_entry.id):
                self.host.set_status(models.Host.Status.REPAIR_FAILED)
                self._fail_queue_entry()
                return

            queue_entry = models.HostQueueEntry.objects.get(
                id=self.queue_entry.id)
        else:
            queue_entry = None

        models.SpecialTask.objects.create(
            host=models.Host.objects.get(id=self.host.id),
            task=models.SpecialTask.Task.REPAIR,
            queue_entry=queue_entry,
            requested_by=self.task.requested_by)


class VerifyTask(PreJobTask):
    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task):
        super(VerifyTask, self).__init__(task, ['-v'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # Delete any queued manual reverifies for this host.  One verify will do
        # and there's no need to keep records of other requests.
        queued_verifies = models.SpecialTask.objects.filter(
            host__id=self.host.id,
            task=models.SpecialTask.Task.VERIFY,
            is_active=False, is_complete=False, queue_entry=None)
        queued_verifies = queued_verifies.exclude(id=self.task.id)
        queued_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if self.success:
            if self.queue_entry:
                self.queue_entry.on_pending()
            else:
                self.host.set_status(models.Host.Status.READY)


class CleanupTask(PreJobTask):
    # note this can also run post-job, but when it does, it's running standalone
    # against the host (not related to the job), so it's not considered a
    # PostJobTask

    TASK_TYPE = models.SpecialTask.Task.CLEANUP


    def __init__(self, task, recover_run_monitor=None):
        super(CleanupTask, self).__init__(task, ['--cleanup'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status(models.Host.Status.CLEANING)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)


    def _finish_epilog(self):
        if not self.queue_entry or not self.success:
            return

        do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
        should_run_verify = (
            self.queue_entry.job.run_verify
            and self.host.protection != do_not_verify_protection)
        if should_run_verify:
            entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
            models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=self.host.id),
                queue_entry=entry,
                task=models.SpecialTask.Task.VERIFY)
        else:
            self.queue_entry.on_pending()


    def epilog(self):
        super(CleanupTask, self).epilog()

        if self.success:
            self.host.update_field('dirty', 0)
            self.host.set_status(models.Host.Status.READY)

        self._finish_epilog()


class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
    """
    Common functionality for QueueTask and HostlessQueueTask
    """
    def __init__(self, queue_entries):
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _write_control_file(self, execution_path):
        control_path = _drone_manager.attach_file_to_execution(
            execution_path, self.job.control_file)
        return control_path


    def _command_line(self):
        execution_path = self.queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        hostnames = ','.join(entry.host.hostname
                             for entry in self.queue_entries
                             if not entry.is_hostless())

        execution_tag = self.queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self.job, verbose=False)

        if not self.job.is_server_job():
            params.append('-c')

        return params


    @property
    def num_processes(self):
        return len(self.queue_entries)


    @property
    def owner_username(self):
        return self.job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def prolog(self):
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time
        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()


    def _write_lost_process_error_file(self):
        error_file_path = os.path.join(self._working_directory(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()


    def _write_status_comment(self, comment):
        _drone_manager.write_lines_to_file(
            os.path.join(self._working_directory(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        # TODO(showard): this conditional is now obsolete, we just need to leave
        # it in temporarily for backwards compatibility over upgrades.  delete
        # soon.
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(AbstractQueueTask, self).epilog()
        self._finish_task()


class QueueTask(AbstractQueueTask):
    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        self._check_queue_entry_statuses(
            self.queue_entries,
            allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING),
            allowed_host_statuses=(models.Host.Status.PENDING,
                                   models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1 and len(self.queue_entries) == 1:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
            queue_entry.host.set_status(models.Host.Status.RUNNING)


class HostlessQueueTask(AbstractQueueTask):
    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        self.queue_entries[0].update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()
        self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)


class PostJobTask(AgentTask):
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
            self._working_directory())


    def _command_line(self):
        return self._generate_command(
            _drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None:  # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted):  # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                    'Inconsistent abort state',
                    'Queue entries have inconsistent abort state:\n' +
                    '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task.  post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]


class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
            queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
            self.queue_entries,
            allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
            allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0

        reboot_after = self._job.reboot_after
        do_reboot = (
            # always reboot after aborted jobs
            self._final_status() == models.HostQueueEntry.Status.ABORTED
            or reboot_after == model_attributes.RebootAfter.ALWAYS
            or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                and final_success and num_tests_failed == 0))

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                    host=models.Host.objects.get(id=queue_entry.host.id),
                    task=models.SpecialTask.Task.CLEANUP,
                    requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)


class SelfThrottledPostJobTask(PostJobTask):
    """
    Special AgentTask subclass that maintains its own global process limit.
    """
    _num_running_processes = 0


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    def _try_starting_process(self):
        if not self._can_run_new_process():
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()
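
# Note on the counter above: _increment_running_processes() writes through
# cls, and cls is the concrete subclass at call time, so FinalReparseTask and
# ArchiveResultsTask each end up tracking their own running-process count and
# comparing it against their own _max_processes() limit.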


class FinalReparseTask(SelfThrottledPostJobTask):
    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]


    @property
    def num_processes(self):
        return 0  # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
            self.queue_entries,
            allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)


class ArchiveResultsTask(SelfThrottledPostJobTask):
    _ARCHIVING_FAILED_FILE = '.archiver_failed'


    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    def _generate_command(self, results_dir):
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
            self.queue_entries,
            allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            _drone_manager.write_lines_to_file(
                failed_file, ['Archiving failed with exit code %s'
                              % self.monitor.exit_code()],
                paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())


if __name__ == '__main__':
    main()