"""
Autotest scheduling utility.
"""


import logging

from autotest_lib.client.common_lib import global_config, utils
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import metahost_scheduler, scheduler_config
from autotest_lib.scheduler import scheduler_models


get_site_metahost_schedulers = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
        'get_metahost_schedulers', lambda : ())


class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""


class BaseHostScheduler(metahost_scheduler.HostSchedulingUtility):
    """Handles the logic for choosing when to run jobs and on which hosts.

    This class makes several queries to the database on each tick, building up
    some auxiliary data structures and using them to determine which hosts are
    eligible to run which jobs, taking into account all the various factors
    that affect that.

    In the past this was done with one or two very large, complex database
    queries.  It has proven much simpler and faster to build these auxiliary
    data structures and perform the logic in Python.
    """
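    # Auxiliary structures rebuilt by refresh() on every scheduling cycle:
    #   _hosts_available  - {host id: Host} for unlocked, Ready, inactive hosts
    #   _job_acls         - {job id: set of ACL group ids}
    #   _ineligible_hosts - {job id: set of ineligible host ids}
    #   _job_dependencies - {job id: set of dependency label ids}
    #   _host_acls        - {host id: set of ACL group ids}
    #   _label_hosts      - {label id: set of host ids}
    #   _host_labels      - {host id: set of label ids}
    #   _labels           - {label id: Label}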
    def __init__(self, db):
        self._db = db
        self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()

        # load site-specific schedulers selected in global_config
        site_schedulers_str = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
                default='')
        site_schedulers = set(site_schedulers_str.split(','))
        for scheduler in get_site_metahost_schedulers():
            if type(scheduler).__name__ in site_schedulers:
                # always prepend, so site schedulers take precedence
                self._metahost_schedulers = (
                        [scheduler] + self._metahost_schedulers)
        logging.info('Metahost schedulers: %s',
                     ', '.join(type(scheduler).__name__ for scheduler
                               in self._metahost_schedulers))


    def _get_ready_hosts(self):
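        """Return a dict of {host id: Host} for every host that can currently
        be scheduled: unlocked, with a NULL or 'Ready' status, and with no
        active queue entry against it."""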
        # avoid any host with a currently active queue entry against it
        hosts = scheduler_models.Host.fetch(
            joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
                  'ON (afe_hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                  "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    def _get_sql_id_list(self, id_list):
        return ','.join(str(item_id) for item_id in id_list)


    def _get_many2many_dict(self, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= self._get_sql_id_list(id_list)
        rows = self._db.execute(query)
        return self._process_many2many_dict(rows, flip)


    def _process_many2many_dict(self, rows, flip=False):
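        """Group two-column rows into a dict mapping the first column to a set
        of values from the second.

        For example, rows [(1, 10), (1, 11), (2, 10)] become
        {1: set([10, 11]), 2: set([10])}; with flip=True the two columns are
        swapped before grouping.
        """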
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    def _get_job_acl_groups(self, job_ids):
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    def _get_job_ineligible_hosts(self, job_ids):
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    def _get_job_dependencies(self, job_ids):
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    def _get_host_acls(self, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return self._get_many2many_dict(query, host_ids)


    def _get_label_hosts(self, host_ids):
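        """Return a (labels_to_hosts, hosts_to_labels) pair of dicts of id
        sets, both built from the same afe_hosts_labels rows for the given
        hosts."""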
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % self._get_sql_id_list(host_ids)
        rows = self._db.execute(query)
        labels_to_hosts = self._process_many2many_dict(rows)
        hosts_to_labels = self._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    def _get_labels(self):
        return dict((label.id, label) for label
                    in scheduler_models.Label.fetch())


    def recovery_on_startup(self):
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.recovery_on_startup()


    def refresh(self, pending_queue_entries):
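        """Rebuild the per-cycle caches: ready hosts, the ACL, ineligibility
        and dependency data for the jobs behind pending_queue_entries, and the
        ACL and label data for the available hosts."""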
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def tick(self):
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.tick()


    def hosts_in_label(self, label_id):
        return set(self._label_hosts.get(label_id, ()))


    def remove_host_from_label(self, host_id, label_id):
        self._label_hosts[label_id].remove(host_id)


    def pop_host(self, host_id):
        return self._hosts_available.pop(host_id)


    def ineligible_hosts_for_entry(self, queue_entry):
        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))


    def _is_acl_accessible(self, host_id, queue_entry):
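        """True if the host shares at least one ACL group with the owner of
        the entry's job."""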
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels, if any, or None otherwise.  Logs an error if more than one
        atomic group is found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self.is_host_usable(host_id)
                   and self.is_host_eligible_for_job(host_id, queue_entry))


    def is_host_eligible_for_job(self, host_id, queue_entry):
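        """True if the host passes the ACL, dependency-label, only_if_needed
        and atomic group checks for this queue entry.  Invalid hosts are
        treated as one-time hosts and bypass the checks entirely."""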
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def schedule_entry(self, queue_entry):
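        """Assign a host to queue_entry.  Entries with an explicit host are
        scheduled directly; metahost entries are handed to the first metahost
        scheduler that accepts them.  Raises SchedulerError if no metahost
        scheduler can handle the entry."""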
        if queue_entry.host_id is not None:
            return self._schedule_non_metahost(queue_entry)

        for scheduler in self._metahost_schedulers:
            if scheduler.can_schedule_metahost(queue_entry):
                scheduler.schedule_metahost(queue_entry, self)
                return None

        raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d.  Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
        ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)

        # Look in each label associated with atomic_group until we find one
        # with enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self.hosts_in_label(atomic_label_id))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []


site_host_scheduler = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_host_scheduler',
        'site_host_scheduler', BaseHostScheduler)


class HostScheduler(site_host_scheduler):
    pass