2 Autotest scheduling utility.
8 from autotest_lib
.client
.common_lib
import global_config
, utils
9 from autotest_lib
.frontend
.afe
import models
10 from autotest_lib
.scheduler
import metahost_scheduler
, scheduler_config
11 from autotest_lib
.scheduler
import scheduler_models
# Optional site hook that supplies extra metahost schedulers.  The trailing
# lambda is the fallback handed to import_site_function; it yields an empty
# tuple, i.e. no site schedulers.
# NOTE(review): presumably the fallback is used when the site module cannot
# be imported -- confirm against utils.import_site_function.
get_site_metahost_schedulers = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
        'get_metahost_schedulers', lambda : ())
class SchedulerError(Exception):
    """Signals that HostScheduler ran into an inconsistent state."""
23 class BaseHostScheduler(metahost_scheduler
.HostSchedulingUtility
):
"""Handles the logic for choosing when to run jobs and on which hosts.

This class makes several queries to the database on each tick, building up
some auxiliary data structures and using them to determine which hosts are
eligible to run which jobs, taking into account all the various factors that
affect eligibility.

In the past this was done with one or two very large, complex database
queries.  It has proven much simpler and faster to build these auxiliary
data structures and perform the logic in Python.
"""
def __init__(self, db):
    """
    Assemble the ordered list of metahost schedulers for this instance.

    The default schedulers come from metahost_scheduler; site-provided
    schedulers whose class name is listed in the 'site_metahost_schedulers'
    config option are prepended to that list.

    @param db: Database handle.  It is stored on the instance and the
            query helpers call its execute() method.
    """
    # NOTE(review): this assignment is absent from the listing under review,
    # but the query helpers read self._db and nothing else sets it --
    # confirm against the upstream file.
    self._db = db
    self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()

    # load site-specific scheduler selected in global_config
    # NOTE(review): the final argument line of this call is missing from the
    # listing; default='' is assumed so that an unset option selects no site
    # schedulers -- confirm against the upstream file.
    site_schedulers_str = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
            default='')
    # Tolerate whitespace around the comma-separated class names so that
    # "A, B" selects both A and B.
    site_schedulers = set(name.strip()
                          for name in site_schedulers_str.split(','))
    for scheduler in get_site_metahost_schedulers():
        if type(scheduler).__name__ in site_schedulers:
            # always prepend, so site schedulers take precedence
            self._metahost_schedulers = (
                    [scheduler] + self._metahost_schedulers)
    logging.info('Metahost schedulers: %s',
                 ', '.join(type(scheduler).__name__ for scheduler
                           in self._metahost_schedulers))
def _get_ready_hosts(self):
    """
    Fetch the hosts that may be handed out during this scheduling pass.

    @returns A dict mapping host id to the fetched Host object for each
            unlocked host whose status is NULL or 'Ready' and for which the
            LEFT JOIN on active_hqe produced no row.
    """
    # avoid any host with a currently active queue entry against it
    # NOTE(review): the tail of the join condition is missing from the
    # listing under review (the "ON (" is never closed); 'active_hqe.active)'
    # was restored to close it -- confirm against the upstream file.
    hosts = scheduler_models.Host.fetch(
        joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
              'ON (afe_hosts.id = active_hqe.host_id AND '
              'active_hqe.active)',
        where="active_hqe.host_id IS NULL "
              "AND NOT afe_hosts.locked "
              "AND (afe_hosts.status IS NULL "
              "OR afe_hosts.status = 'Ready')")
    return dict((host.id, host) for host in hosts)
67 def _get_sql_id_list(self
, id_list
):
68 return ','.join(str(item_id
) for item_id
in id_list
)
71 def _get_many2many_dict(self
, query
, id_list
, flip
=False):
74 query
%= self
._get
_sql
_id
_list
(id_list
)
75 rows
= self
._db
.execute(query
)
76 return self
._process
_many
2many
_dict
(rows
, flip
)
79 def _process_many2many_dict(self
, rows
, flip
=False):
82 left_id
, right_id
= int(row
[0]), int(row
[1])
84 left_id
, right_id
= right_id
, left_id
85 result
.setdefault(left_id
, set()).add(right_id
)
89 def _get_job_acl_groups(self
, job_ids
):
91 SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
93 INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
94 INNER JOIN afe_acl_groups_users ON
95 afe_acl_groups_users.user_id = afe_users.id
96 WHERE afe_jobs.id IN (%s)
98 return self
._get
_many
2many
_dict
(query
, job_ids
)
101 def _get_job_ineligible_hosts(self
, job_ids
):
103 SELECT job_id, host_id
104 FROM afe_ineligible_host_queues
107 return self
._get
_many
2many
_dict
(query
, job_ids
)
110 def _get_job_dependencies(self
, job_ids
):
112 SELECT job_id, label_id
113 FROM afe_jobs_dependency_labels
116 return self
._get
_many
2many
_dict
(query
, job_ids
)
119 def _get_host_acls(self
, host_ids
):
121 SELECT host_id, aclgroup_id
122 FROM afe_acl_groups_hosts
123 WHERE host_id IN (%s)
125 return self
._get
_many
2many
_dict
(query
, host_ids
)
128 def _get_label_hosts(self
, host_ids
):
132 SELECT label_id, host_id
133 FROM afe_hosts_labels
134 WHERE host_id IN (%s)
135 """ % self
._get
_sql
_id
_list
(host_ids
)
136 rows
= self
._db
.execute(query
)
137 labels_to_hosts
= self
._process
_many
2many
_dict
(rows
)
138 hosts_to_labels
= self
._process
_many
2many
_dict
(rows
, flip
=True)
139 return labels_to_hosts
, hosts_to_labels
def _get_labels(self):
    """
    Fetch every Label and index the results by id.

    @returns A dict mapping label id to the Label object returned by
            scheduler_models.Label.fetch().
    """
    labels_by_id = {}
    for label in scheduler_models.Label.fetch():
        labels_by_id[label.id] = label
    return labels_by_id
def recovery_on_startup(self):
    """Call recovery_on_startup() on each metahost scheduler, in order."""
    for scheduler in self._metahost_schedulers:
        scheduler.recovery_on_startup()
def refresh(self, pending_queue_entries):
    """
    Rebuild the cached lookup tables used for scheduling decisions.

    @param pending_queue_entries: Iterable of queue entries; only their
            job_id attribute is read here.
    """
    self._hosts_available = self._get_ready_hosts()

    # Job-side tables, restricted to the jobs that have pending entries.
    job_ids = [entry.job_id for entry in pending_queue_entries]
    self._job_acls = self._get_job_acl_groups(job_ids)
    self._ineligible_hosts = self._get_job_ineligible_hosts(job_ids)
    self._job_dependencies = self._get_job_dependencies(job_ids)

    # Host-side tables, restricted to the hosts that are ready.
    ready_ids = self._hosts_available.keys()
    self._host_acls = self._get_host_acls(ready_ids)
    label_hosts, host_labels = self._get_label_hosts(ready_ids)
    self._label_hosts = label_hosts
    self._host_labels = host_labels

    self._labels = self._get_labels()
def tick(self):
    """Call tick() on each metahost scheduler, in order."""
    # NOTE(review): the "def" line for this loop is missing from the listing
    # under review (the embedded numbering skips 166-168, the same gap that
    # separates every other pair of methods).  The name "tick" is inferred
    # from the call in the body, mirroring recovery_on_startup -- confirm
    # against the upstream file.
    for scheduler in self._metahost_schedulers:
        scheduler.tick()
def hosts_in_label(self, label_id):
    """
    Return the cached host ids for a label.

    @param label_id: Id of the label to look up.

    @returns A new set of host ids; empty when the label has no cached
            entry.  Mutating it does not touch the cache.
    """
    members = self._label_hosts.get(label_id, ())
    return set(members)
def remove_host_from_label(self, host_id, label_id):
    """
    Drop a host from the cached membership of a label.

    @param host_id: Id of the host to remove.
    @param label_id: Id of the label whose cached membership is edited.
            A label with no cached entry raises KeyError.
    """
    members = self._label_hosts[label_id]
    members.remove(host_id)
def pop_host(self, host_id):
    """
    Remove a host from the available-host cache and return it.

    @param host_id: Id of the host to take; an id that is not cached
            raises KeyError.

    @returns The cached value stored under host_id.
    """
    available = self._hosts_available
    return available.pop(host_id)
def ineligible_hosts_for_entry(self, queue_entry):
    """
    Return the cached ineligible host ids for a queue entry's job.

    @param queue_entry: Entry whose job_id is looked up.

    @returns A new set of host ids; empty when the job has no cached entry.
    """
    blocked = self._ineligible_hosts.get(queue_entry.job_id, ())
    return set(blocked)
189 def _is_acl_accessible(self
, host_id
, queue_entry
):
190 job_acls
= self
._job
_acls
.get(queue_entry
.job_id
, set())
191 host_acls
= self
._host
_acls
.get(host_id
, set())
192 return len(host_acls
.intersection(job_acls
)) > 0
195 def _check_job_dependencies(self
, job_dependencies
, host_labels
):
196 missing
= job_dependencies
- host_labels
197 return len(missing
) == 0
200 def _check_only_if_needed_labels(self
, job_dependencies
, host_labels
,
202 if not queue_entry
.meta_host
:
203 # bypass only_if_needed labels when a specific host is selected
206 for label_id
in host_labels
:
207 label
= self
._labels
[label_id
]
208 if not label
.only_if_needed
:
209 # we don't care about non-only_if_needed labels
211 if queue_entry
.meta_host
== label_id
:
212 # if the label was requested in a metahost it's OK
214 if label_id
not in job_dependencies
:
219 def _check_atomic_group_labels(self
, host_labels
, queue_entry
):
221 Determine if the given HostQueueEntry's atomic group settings are okay
222 to schedule on a host with the given labels.
224 @param host_labels: A list of label ids that the host has.
225 @param queue_entry: The HostQueueEntry being considered for the host.
227 @returns True if atomic group settings are okay, False otherwise.
229 return (self
._get
_host
_atomic
_group
_id
(host_labels
, queue_entry
) ==
230 queue_entry
.atomic_group_id
)
233 def _get_host_atomic_group_id(self
, host_labels
, queue_entry
=None):
235 Return the atomic group label id for a host with the given set of
236 labels if any, or None otherwise. Raises an exception if more than
237 one atomic group are found in the set of labels.
239 @param host_labels: A list of label ids that the host has.
240 @param queue_entry: The HostQueueEntry we're testing. Only used for
241 extra info in a potential logged error message.
243 @returns The id of the atomic group found on a label in host_labels
244 or None if no atomic group label is found.
246 atomic_labels
= [self
._labels
[label_id
] for label_id
in host_labels
247 if self
._labels
[label_id
].atomic_group_id
is not None]
248 atomic_ids
= set(label
.atomic_group_id
for label
in atomic_labels
)
251 if len(atomic_ids
) > 1:
252 logging
.error('More than one Atomic Group on HQE "%s" via: %r',
253 queue_entry
, atomic_labels
)
254 return atomic_ids
.pop()
257 def _get_atomic_group_labels(self
, atomic_group_id
):
259 Lookup the label ids that an atomic_group is associated with.
261 @param atomic_group_id - The id of the AtomicGroup to look up.
263 @returns A generator yeilding Label ids for this atomic group.
265 return (id for id, label
in self
._labels
.iteritems()
266 if label
.atomic_group_id
== atomic_group_id
267 and not label
.invalid
)
270 def _get_eligible_host_ids_in_group(self
, group_hosts
, queue_entry
):
272 @param group_hosts - A sequence of Host ids to test for usability
273 and eligibility against the Job associated with queue_entry.
274 @param queue_entry - The HostQueueEntry that these hosts are being
275 tested for eligibility against.
277 @returns A subset of group_hosts Host ids that are eligible for the
278 supplied queue_entry.
280 return set(host_id
for host_id
in group_hosts
281 if self
.is_host_usable(host_id
)
282 and self
.is_host_eligible_for_job(host_id
, queue_entry
))
def is_host_eligible_for_job(self, host_id, queue_entry):
    """
    Tell whether a host may run the job behind a queue entry.

    @param host_id: Id of the host being considered.
    @param queue_entry: The HostQueueEntry being scheduled.

    @returns True for an invalid (one-time) host; otherwise the result of
            the ACL, dependency, only_if_needed and atomic group checks,
            which must all pass.
    """
    if self._is_host_invalid(host_id):
        # if an invalid host is scheduled for a job, it's a one-time host
        # and it therefore bypasses eligibility checks. note this can only
        # happen for non-metahosts, because invalid hosts have their label
        # relationships cleared.
        # NOTE(review): this return is missing from the listing under
        # review; it is what the comment above describes.
        return True

    job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
    host_labels = self._host_labels.get(host_id, set())

    return (self._is_acl_accessible(host_id, queue_entry) and
            self._check_job_dependencies(job_dependencies, host_labels) and
            self._check_only_if_needed_labels(
                job_dependencies, host_labels, queue_entry) and
            self._check_atomic_group_labels(host_labels, queue_entry))
303 def _is_host_invalid(self
, host_id
):
304 host_object
= self
._hosts
_available
.get(host_id
, None)
305 return host_object
and host_object
.invalid
308 def _schedule_non_metahost(self
, queue_entry
):
309 if not self
.is_host_eligible_for_job(queue_entry
.host_id
, queue_entry
):
311 return self
._hosts
_available
.pop(queue_entry
.host_id
, None)
def is_host_usable(self, host_id):
    """
    Tell whether a host can still be picked for a metahost entry.

    @param host_id: Id of the host being considered.

    @returns False when the host is no longer in the available-host cache
            or is marked invalid, True otherwise.
    """
    # NOTE(review): all three return statements are missing from the
    # listing under review; they were restored from the surviving comments.
    if host_id not in self._hosts_available:
        # host was already used during this scheduling cycle
        return False
    if self._hosts_available[host_id].invalid:
        # Invalid hosts cannot be used for metahosts.  They're included in
        # the original query because they can be used by non-metahosts.
        return False
    return True
def schedule_entry(self, queue_entry):
    """
    Schedule one queue entry.

    @param queue_entry: The HostQueueEntry to schedule.

    @returns For an entry with a specific host, the result of
            _schedule_non_metahost.  For a metahost entry, None after the
            first scheduler that accepts it has been asked to schedule it.

    @raises SchedulerError if no metahost scheduler accepts the entry.
    """
    if queue_entry.host_id is not None:
        return self._schedule_non_metahost(queue_entry)

    for scheduler in self._metahost_schedulers:
        if scheduler.can_schedule_metahost(queue_entry):
            scheduler.schedule_metahost(queue_entry, self)
            # NOTE(review): this return is missing from the listing under
            # review; without it the error below would be raised even
            # after a scheduler handled the entry.
            return None

    raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)
def find_eligible_atomic_group(self, queue_entry):
    """
    Given an atomic group host queue entry, locate an appropriate group
    of hosts for the associated job to run on.

    The caller is responsible for creating new HQEs for the additional
    hosts returned in order to run the actual job on them.

    @param queue_entry: The atomic group HostQueueEntry to schedule; its
            atomic_group_id must not be None and its job must have a
            positive synch_count.

    @returns A list of Host instances in a ready state to satisfy this
            atomic group scheduling.  Hosts will all belong to the same
            atomic group label as specified by the queue_entry.
            An empty list will be returned if no suitable atomic
            group could be found.

    TODO(gps): what is responsible for kicking off any attempted repairs on
    a group of hosts?  not this function, but something needs to.  We do
    not communicate that reason for returning [] outside of here...
    For now, we'll just be unschedulable if enough hosts within one group
    enter Repair Failed state.
    """
    # NOTE(review): the listing under review is missing the logging.error(
    # opener, the "continue", the host_list initialisation and all three
    # return statements.  They were restored from the surviving comments
    # and the documented return contract -- confirm against upstream.
    assert queue_entry.atomic_group_id is not None
    job = queue_entry.job
    assert job.synch_count and job.synch_count > 0
    atomic_group = queue_entry.atomic_group
    if job.synch_count > atomic_group.max_number_of_machines:
        # Such a Job and HostQueueEntry should never be possible to
        # create using the frontend.  Regardless, we can't process it.
        # Abort it immediately and log an error on the scheduler.
        queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
        logging.error(
            'Error: job %d synch_count=%d > requested atomic_group %d '
            'max_number_of_machines=%d. Aborted host_queue_entry %d.',
            job.id, job.synch_count, atomic_group.id,
            atomic_group.max_number_of_machines, queue_entry.id)
        return []
    hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
    ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)

    # Look in each label associated with atomic_group until we find one with
    # enough hosts to satisfy the job.
    for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
        group_hosts = set(self.hosts_in_label(atomic_label_id))
        if queue_entry.meta_host is not None:
            # If we have a metahost label, only allow its hosts.
            group_hosts.intersection_update(hosts_in_label)
        group_hosts -= ineligible_host_ids
        eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                group_hosts, queue_entry)

        # Job.synch_count is treated as "minimum synch count" when
        # scheduling for an atomic group of hosts.  The atomic group
        # number of machines is the maximum to pick out of a single
        # atomic group label for scheduling at one time.
        min_hosts = job.synch_count
        max_hosts = atomic_group.max_number_of_machines

        if len(eligible_host_ids_in_group) < min_hosts:
            # Not enough eligible hosts in this atomic group label.
            continue

        eligible_hosts_in_group = [self._hosts_available[host_id]
                                   for host_id in eligible_host_ids_in_group]
        # So that they show up in a sane order when viewing the job.
        # (The cmp= keyword exists on Python 2 only; Python 3 would need
        # functools.cmp_to_key.)
        eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)

        # Limit ourselves to scheduling the atomic group size.
        if len(eligible_hosts_in_group) > max_hosts:
            eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

        # Remove the selected hosts from our cached internal state
        # of available hosts in order to return the Host objects.
        host_list = []
        for host in eligible_hosts_in_group:
            hosts_in_label.discard(host.id)
            self._hosts_available.pop(host.id)
            host_list.append(host)
        return host_list

    return []
# Base class for the HostScheduler class defined next: the result of
# import_site_class for 'site_host_scheduler', with BaseHostScheduler passed
# as the last argument.
# NOTE(review): presumably BaseHostScheduler is what gets returned when no
# site module is present -- confirm against utils.import_site_class.
site_host_scheduler = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_host_scheduler',
        'site_host_scheduler', BaseHostScheduler)
423 class HostScheduler(site_host_scheduler
):