KVM test: installer: Fix KojiInstaller bug
[autotest-zwu.git] / scheduler / scheduler_models.py
blob870914d5de0b2a3d3aa76d211b15058762ac059d
1 """Database model classes for the scheduler.
3 Contains model classes abstracting the various DB tables used by the scheduler.
4 These overlap the Django models in basic functionality, but were written before
5 the Django models existed and have not yet been phased out. Some of them
6 (particularly HostQueueEntry and Job) have considerable scheduler-specific logic
7 which would probably be ill-suited for inclusion in the general Django model
8 classes.
10 Globals:
11 _notify_email_statuses: list of HQE statuses. each time a single HQE reaches
12 one of these statuses, an email will be sent to the job's email_list.
13 comes from global_config.
14 _base_url: URL to the local AFE server, used to construct URLs for emails.
15 _db: DatabaseConnection for this module.
16 _drone_manager: reference to global DroneManager instance.
17 """
19 import datetime, itertools, logging, os, re, sys, time, weakref
20 from django.db import connection
21 from autotest_lib.client.common_lib import global_config, host_protections
22 from autotest_lib.client.common_lib import global_config, utils
23 from autotest_lib.frontend.afe import models, model_attributes
24 from autotest_lib.database import database_connection
25 from autotest_lib.scheduler import drone_manager, email_manager
26 from autotest_lib.scheduler import scheduler_config
# Module-level state.  initialize() fills in _db, _notify_email_statuses and
# _base_url; initialize_globals() fills in _drone_manager.
_notify_email_statuses = list()
_base_url = None

_db = _drone_manager = None
def initialize():
    """
    Populate the module globals used by the scheduler models.

    Connects _db to the AUTOTEST_WEB database (django mode), parses the
    scheduler's notify_email_statuses option into _notify_email_statuses,
    computes _base_url, and finally calls initialize_globals().  Exits the
    process with status 1 if no base_url is configured and the [SERVER]
    hostname is empty.
    """
    global _db, _notify_email_statuses, _base_url

    _db = database_connection.DatabaseConnection('AUTOTEST_WEB')
    _db.connect(db_type='django')

    config = global_config.global_config

    raw_statuses = config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    # Entries may be separated by whitespace, commas, semicolons or colons;
    # empty pieces produced by consecutive separators are dropped.
    _notify_email_statuses = [piece for piece in
                              re.split(r'[\s,;:]', raw_statuses.lower())
                              if piece]

    # An explicit base_url in the scheduler section overrides the whole URL.
    url = config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if not url:
        # For the common case of everything running on a single server the
        # hostname only needs to be set in one place in the config file.
        hostname = config.get_config_value('SERVER', 'hostname')
        if not hostname:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        url = 'http://%s/afe/' % hostname
    _base_url = url

    initialize_globals()
def initialize_globals():
    """Point _drone_manager at the global DroneManager instance."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
class DelayedCallTask(object):
    """
    A task object, run by an Agent like an AgentTask, that waits until the
    specified amount of time has elapsed and then calls the supplied
    callback exactly once and finishes.

    The callback's return value is discarded: poll() only invokes it and
    then marks the task successful.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now after which this
                task will call the supplied callback and be done.  Must be
                greater than zero.
        @param callback: A callable invoked once, with no arguments, after at
                least delay_seconds have elapsed.  Its return value is
                ignored.
        @param now_func: A time.time like function. Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # These attributes are required by Agent.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        """Invoke the callback once end_time has passed, unless already done."""
        if not self.is_done() and self._now_func() >= self.end_time:
            self._callback()
            self.success = True


    def is_done(self):
        """True once the callback has run or the task has been aborted."""
        return self.success or self.aborted


    def abort(self):
        """Mark the task aborted; the callback will then never be called."""
        self.aborted = True
class DBError(Exception):
    """Signals that a DBObject's SELECT by id returned no row."""
class DBObject(object):
    """
    A miniature object relational model for the database.

    Subclasses name their table in _table_name and list its columns in
    _fields, in SELECT * order with 'id' first.  Each field becomes an
    instance attribute.  Objects loaded from an existing row are cached per
    (type, id) for as long as something else keeps them alive.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id. If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance is not None:
                return instance
        # object.__new__ takes only the class; the constructor arguments are
        # meant for __init__.  Forwarding them gives a DeprecationWarning on
        # Python 2 and a TypeError on Python 3 since __new__ is overridden.
        return super(DBObject, cls).__new__(cls)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id: Primary key of the row to load.  Optional if row is given.
        @param row: A sequence of column values in _fields order.  When
                omitted the row is fetched from the database by id.
        @param new_record: True for an object not yet stored in the
                database.  Such objects are not cached, and save() INSERTs
                them.
        @param always_query: When False, an already initialized (cached)
                instance is left untouched instead of being re-read.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warning(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """
        Return the row of this object's table whose id is row_id.

        @raises DBError: if no such row exists.
        """
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        """Assert that row has exactly one value per entry in _fields."""
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields. Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                the class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).  Datetime
                values appear as formatted strings.
        """
        self._assert_row_length(row)
        differences = {}
        datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S'  # Leave off the microseconds.
        for field, row_value in zip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(datetime_cmp_fmt)
                row_value = row_value.strftime(datetime_cmp_fmt)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        Also records every field except 'id' in _valid_fields, the set of
        fields update_field() may modify.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in zip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read this object's row and overwrite its field attributes."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        """
        Return the number of rows in table matching the SQL condition where.

        @param where: Raw SQL condition, interpolated into the query as-is.
        @param table: Table to count in; defaults to this object's table.
        """
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
                """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """
        Store value in column field of this row and in the same attribute.

        No query is issued when the attribute already equals value.  field
        must be one of _fields other than 'id'.
        """
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """
        INSERT this object when it was built with new_record=True, then set
        self.id to the id the database assigned.  Objects loaded from an
        existing row are not written.
        """
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join(str(key) for key in keys)
            # Let the database driver quote and convert each value (None
            # becomes NULL) instead of pasting str(value) inside double
            # quotes, which breaks on embedded quotes and renders booleans
            # as the strings "True"/"False".
            placeholders = ','.join(['%s'] * len(keys))
            values = tuple(getattr(self, key) for key in keys)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, placeholders))
            _db.execute(query, values)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row and drop this object from the instance cache."""
        # Key on self.id (not the builtin id) so the cached entry for this
        # row is actually removed.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return prefix + string, or string unchanged when it is empty."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @param where: Optional SQL condition (without the WHERE keyword).
        @param params: Values for %s placeholders in where/joins.
        @param joins: Optional raw JOIN clause(s).
        @param order_by: Optional ORDER BY expression (without the keyword).

        @returns A list with one class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
class IneligibleHostQueue(DBObject):
    """
    One row of afe_ineligible_host_queues, pairing a job with a host it is
    blocked from (rows are created by HostQueueEntry.block_host).
    """
    _fields = ('id', 'job_id', 'host_id')
    _table_name = 'afe_ineligible_host_queues'
class AtomicGroup(DBObject):
    """One row of the afe_atomic_groups table."""
    _fields = ('id', 'name', 'description',
               'max_number_of_machines', 'invalid')
    _table_name = 'afe_atomic_groups'
class Label(DBObject):
    """One row of the afe_labels table."""
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
    _table_name = 'afe_labels'


    def __repr__(self):
        shown = (self.name, self.id, self.atomic_group_id)
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % shown
class Host(DBObject):
    """One row of the afe_hosts table."""
    _table_name = 'afe_hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def set_status(self, status):
        """Log the transition and persist the new status."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).

        Label names are sorted by name.  platform_name is None when none of
        the host's labels is a platform; if several are, the last one in
        name order is returned.
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This splits off any trailing numeric digits and compares hostnames
        by the leading name and then the trailing digits as a number, so
        leading zeros do not matter.  If either hostname does not match this
        pattern, they are simply compared as lower case strings.

        Example of how hostnames will be sorted:

          alice, host0, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfies most people's hostname sorting needs
        regardless of their exact naming schemes. Nobody sane should have
        both a host10 and host010 (but the algorithm works regardless).

        @returns A negative, zero or positive int, like cmp().
        """
        def three_way(x, y):
            # Same result as Python 2's cmp(), but also valid on Python 3.
            return (x > y) - (x < y)

        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if not (match_a and match_b):
            return three_way(lower_a, lower_b)

        name_a, number_a_str = match_a.groups()
        name_b, number_b_str = match_b.groups()
        # int() already ignores leading zeros.  Stripping them first turned
        # an all-zero suffix such as 'host0' into int(''), a ValueError.
        result = three_way((name_a, int(number_a_str)),
                           (name_b, int(number_b_str)))
        if result == 0 and lower_a != lower_b:
            # If they compared equal above but the lower case names are
            # indeed different, don't report equality. abc012 != abc12.
            return three_way(lower_a, lower_b)
        return result
class HostQueueEntry(DBObject):
    """
    Scheduler-side model for one row of afe_host_queue_entries.

    Construction also attaches the related objects: self.job (always),
    self.host (None when host_id is unset) and self.atomic_group (None when
    atomic_group_id is unset).
    """
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')


    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None

        # Path of this entry's queue log, under the job's tag() directory.
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """Return the AFE URL that shows this entry's job."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label). The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """
        Assign host to this entry (blocking it for the job), or, when host is
        falsy, release the current host and clear host_id.
        """
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def queue_log_record(self, log_line):
        """Append a timestamped log_line to this entry's queue log."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Insert an IneligibleHostQueue row for this job and host_id."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Delete every IneligibleHostQueue row for this job and host_id."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
                'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set execution_subdir; defaults to the assigned host's hostname."""
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        """Return the assigned host's hostname, or 'no host'."""
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
                                    self.status, flags_str)


    def set_status(self, status):
        """
        Persist status and the derived active/complete flags, run completion
        handling when status is a complete status, and send any configured
        notification email.
        """
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)
        self.update_field('complete', complete)

        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)


    def _on_complete(self, status):
        """
        Completion handling: stop the job if necessary (unless aborted) and
        unregister any pidfiles under this entry's execution path.
        """
        # Compare by value, not identity: an equal status string need not be
        # the very same object as the Status constant.
        if status != models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()

        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        """Email the job's email_list about this entry reaching status."""
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """Once the whole job is finished, email a per-host status summary."""
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                           (queue_entry._get_hostname(),
                            queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status_name)
                           for status_name, count
                           in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        """Log the scheduling decision and start the pre-job tasks."""
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        # Every host goes thru the Verifying stage (which may or may not
        # actually do anything as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        """
        Put this entry back to Queued, clearing started_on and
        execution_subdir; meta-host entries also release their host.
        """
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        """Login of the user who aborted this entry, or None."""
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        """When this entry was aborted, or None."""
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job (once per instance). """
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify. If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        """
        Abort this entry.  Entries in post-job states are left for their
        post-job tasks; otherwise the host is released (or a cleanup task is
        requested when verifying) and the entry is marked Aborted.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
                           Status.WAITING):
            assert not dispatcher.get_agents_for_entry(self)
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()


    def get_group_name(self):
        """Return the atomic group name for this entry, or '' if none."""
        atomic_group = self.atomic_group
        if not atomic_group:
            return ''

        # Look at any meta_host and dependency labels and pick the first
        # one that also specifies this atomic group. Use that label name
        # as the group name if possible (it is more specific).
        for label in self.get_labels():
            if label.atomic_group_id:
                assert label.atomic_group_id == atomic_group.id
                return label.name
        return atomic_group.name


    def execution_tag(self):
        """Return '<job tag>/<execution_subdir>'."""
        assert self.execution_subdir
        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        """Same as execution_tag()."""
        return self.execution_tag()


    def set_started_on_now(self):
        """Record the current local time as started_on."""
        self.update_field('started_on', datetime.datetime.now())


    def is_hostless(self):
        """True when no host, meta_host or atomic group is set."""
        return (self.host_id is None
                and self.meta_host is None
                and self.atomic_group_id is None)
803 class Job(DBObject):
804 _table_name = 'afe_jobs'
805 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
806 'control_type', 'created_on', 'synch_count', 'timeout',
807 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
808 'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
809 'parameterized_job_id')
811 # This does not need to be a column in the DB. The delays are likely to
812 # be configured short. If the scheduler is stopped and restarted in
813 # the middle of a job's delay cycle, the delay cycle will either be
814 # repeated or skipped depending on the number of Pending machines found
815 # when the restarted scheduler recovers to track it. Not a problem.
817 # A reference to the DelayedCallTask that will wake up the job should
818 # no other HQEs change state in time. Its end_time attribute is used
819 # by our run_with_ready_delay() method to determine if the wait is over.
820 _delay_ready_task = None
822 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
823 # all status='Pending' atomic group HQEs incase a delay was running when the
824 # scheduler was restarted and no more hosts ever successfully exit Verify.
826 def __init__(self, id=None, row=None, **kwargs):
827 assert id or row
828 super(Job, self).__init__(id=id, row=row, **kwargs)
829 self._owner_model = None # caches model instance of owner
832 def model(self):
833 return models.Job.objects.get(id=self.id)
836 def owner_model(self):
837 # work around the fact that the Job owner field is a string, not a
838 # foreign key
839 if not self._owner_model:
840 self._owner_model = models.User.objects.get(login=self.owner)
841 return self._owner_model
844 def is_server_job(self):
845 return self.control_type != 2
848 def tag(self):
849 return "%s-%s" % (self.id, self.owner)
852 def get_host_queue_entries(self):
853 rows = _db.execute("""
854 SELECT * FROM afe_host_queue_entries
855 WHERE job_id= %s
856 """, (self.id,))
857 entries = [HostQueueEntry(row=i) for i in rows]
859 assert len(entries)>0
861 return entries
864 def get_execution_details(self):
866 Get test execution details for this job.
868 @return: Dictionary with test execution details
870 def _find_test_jobs(rows):
872 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
873 Those are autotest 'internal job' tests, so they should not be
874 counted when evaluating the test stats.
876 @param rows: List of rows (matrix) with database results.
878 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
879 n_test_jobs = 0
880 for r in rows:
881 test_name = r[0]
882 if job_test_pattern.match(test_name):
883 n_test_jobs += 1
885 return n_test_jobs
887 stats = {}
889 rows = _db.execute("""
890 SELECT t.test, s.word, t.reason
891 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
892 WHERE t.job_idx = j.job_idx
893 AND s.status_idx = t.status
894 AND j.afe_job_id = %s
895 ORDER BY t.reason
896 """ % self.id)
898 failed_rows = [r for r in rows if not 'GOOD' in r]
900 n_test_jobs = _find_test_jobs(rows)
901 n_test_jobs_failed = _find_test_jobs(failed_rows)
903 total_executed = len(rows) - n_test_jobs
904 total_failed = len(failed_rows) - n_test_jobs_failed
906 if total_executed > 0:
907 success_rate = 100 - ((total_failed / float(total_executed)) * 100)
908 else:
909 success_rate = 0
911 stats['total_executed'] = total_executed
912 stats['total_failed'] = total_failed
913 stats['total_passed'] = total_executed - total_failed
914 stats['success_rate'] = success_rate
916 status_header = ("Test Name", "Status", "Reason")
917 if failed_rows:
918 stats['failed_rows'] = utils.matrix_to_string(failed_rows,
919 status_header)
920 else:
921 stats['failed_rows'] = ''
923 time_row = _db.execute("""
924 SELECT started_time, finished_time
925 FROM tko_jobs
926 WHERE afe_job_id = %s
927 """ % self.id)
929 if time_row:
930 t_begin, t_end = time_row[0]
931 try:
932 delta = t_end - t_begin
933 minutes, seconds = divmod(delta.seconds, 60)
934 hours, minutes = divmod(minutes, 60)
935 stats['execution_time'] = ("%02d:%02d:%02d" %
936 (hours, minutes, seconds))
937 # One of t_end or t_begin are None
938 except TypeError:
939 stats['execution_time'] = '(could not determine)'
940 else:
941 stats['execution_time'] = '(none)'
943 return stats
946 def set_status(self, status, update_queues=False):
947 self.update_field('status',status)
949 if update_queues:
950 for queue_entry in self.get_host_queue_entries():
951 queue_entry.set_status(status)
954 def keyval_dict(self):
955 return self.model().keyval_dict()
958 def _atomic_and_has_started(self):
960 @returns True if any of the HostQueueEntries associated with this job
961 have entered the Status.STARTING state or beyond.
963 atomic_entries = models.HostQueueEntry.objects.filter(
964 job=self.id, atomic_group__isnull=False)
965 if atomic_entries.count() <= 0:
966 return False
968 # These states may *only* be reached if Job.run() has been called.
969 started_statuses = (models.HostQueueEntry.Status.STARTING,
970 models.HostQueueEntry.Status.RUNNING,
971 models.HostQueueEntry.Status.COMPLETED)
973 started_entries = atomic_entries.filter(status__in=started_statuses)
974 return started_entries.count() > 0
977 def _hosts_assigned_count(self):
978 """The number of HostQueueEntries assigned a Host for this job."""
979 entries = models.HostQueueEntry.objects.filter(job=self.id,
980 host__isnull=False)
981 return entries.count()
984 def _pending_count(self):
985 """The number of HostQueueEntries for this job in the Pending state."""
986 pending_entries = models.HostQueueEntry.objects.filter(
987 job=self.id, status=models.HostQueueEntry.Status.PENDING)
988 return pending_entries.count()
991 def _max_hosts_needed_to_run(self, atomic_group):
993 @param atomic_group: The AtomicGroup associated with this job that we
994 are using to set an upper bound on the threshold.
995 @returns The maximum number of HostQueueEntries assigned a Host before
996 this job can run.
998 return min(self._hosts_assigned_count(),
999 atomic_group.max_number_of_machines)
1002 def _min_hosts_needed_to_run(self):
1003 """Return the minumum number of hsots needed to run this job."""
1004 return self.synch_count
1007 def is_ready(self):
1008 # NOTE: Atomic group jobs stop reporting ready after they have been
1009 # started to avoid launching multiple copies of one atomic job.
1010 # Only possible if synch_count is less than than half the number of
1011 # machines in the atomic group.
1012 pending_count = self._pending_count()
1013 atomic_and_has_started = self._atomic_and_has_started()
1014 ready = (pending_count >= self.synch_count
1015 and not atomic_and_has_started)
1017 if not ready:
1018 logging.info(
1019 'Job %s not ready: %s pending, %s required '
1020 '(Atomic and started: %s)',
1021 self, pending_count, self.synch_count,
1022 atomic_and_has_started)
1024 return ready
1027 def num_machines(self, clause = None):
1028 sql = "job_id=%s" % self.id
1029 if clause:
1030 sql += " AND (%s)" % clause
1031 return self.count(sql, table='afe_host_queue_entries')
def num_queued(self):
    """Count this job's queue entries that are not yet complete."""
    incomplete_clause = 'not complete'
    return self.num_machines(incomplete_clause)
def num_active(self):
    """Count this job's queue entries that are flagged active."""
    active_clause = 'active'
    return self.num_machines(active_clause)
def num_complete(self):
    """Count this job's queue entries that are flagged complete."""
    complete_clause = 'complete'
    return self.num_machines(complete_clause)
def is_finished(self):
    """Report whether every queue entry of this job has completed."""
    completed = self.num_complete()
    total = self.num_machines()
    return completed == total
def _not_yet_run_entries(self, include_verifying=True):
    """Query this job's HostQueueEntries that have not started running.

    @param include_verifying: When True, entries in Verifying status are
            included alongside Queued and Pending ones.
    @returns The HostQueueEntry query filtered by this job and those statuses.
    """
    hqe_status = models.HostQueueEntry.Status
    wanted = [hqe_status.QUEUED, hqe_status.PENDING]
    if include_verifying:
        wanted.append(hqe_status.VERIFYING)
    return models.HostQueueEntry.objects.filter(job=self.id,
                                                status__in=wanted)
def _stop_all_entries(self):
    """Mark this job's Queued and Pending entries as Stopped.

    The host of each Pending entry is set back to Ready. Verifying entries
    are not touched.
    """
    to_stop = self._not_yet_run_entries(include_verifying=False)
    for entry in to_stop:
        assert not entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (entry.id, entry.status, entry.active, entry.complete))
        if entry.status == models.HostQueueEntry.Status.PENDING:
            # Release the host held by a Pending entry.
            entry.host.status = models.Host.Status.READY
            entry.host.save()
        entry.status = models.HostQueueEntry.Status.STOPPED
        entry.save()
def stop_if_necessary(self):
    """Stop the job's waiting entries if it can no longer reach synch_count.

    When fewer not-yet-run entries (Queued, Pending or Verifying) remain than
    synch_count, the Queued and Pending entries are stopped.
    """
    remaining = self._not_yet_run_entries().count()
    if remaining >= self.synch_count:
        return
    self._stop_all_entries()
def write_to_machines_file(self, queue_entry):
    """Write queue_entry's hostname to the job's '.machines' file.

    @param queue_entry: HostQueueEntry whose host's hostname is written via
            the drone manager to '<job tag>/.machines'.
    """
    lines = [queue_entry.host.hostname]
    machines_path = os.path.join(self.tag(), '.machines')
    _drone_manager.write_lines_to_file(machines_path, lines)
def _next_group_name(self, group_name=''):
    """Pick the results subdirectory name for this job's next host group.

    @param group_name: Optional prefix. Path separators become '_' and a
            leading '.' becomes '_' so the result is usable as a pathname.
    @returns '<prefix>.groupN', or 'groupN' without a prefix. N is one more
            than the largest N found at the start of this job's existing
            execution_subdir values with the same prefix, or 0 if none match.
    """
    prefix = group_name
    if prefix:
        # Sanitize for use as a pathname.
        prefix = prefix.replace(os.path.sep, '_')
        if prefix.startswith('.'):
            prefix = '_' + prefix[1:]
        # Separate the prefix from the 'group%d' part.
        prefix += '.'
    pattern = re.compile(r'%sgroup(\d+)' % re.escape(prefix))
    subdir_rows = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
    used_ids = []
    for row in subdir_rows:
        found = pattern.match(row['execution_subdir'])
        if found:
            used_ids.append(int(found.group(1)))
    next_id = max(used_ids) + 1 if used_ids else 0
    return '%sgroup%d' % (prefix, next_id)
def get_group_entries(self, queue_entry_from_group):
    """Fetch this job's HostQueueEntries in the same group as the given one.

    @param queue_entry_from_group: A HostQueueEntry instance whose group
            (execution_subdir) is looked up on this job.
    @returns A list of HostQueueEntry objects on this job whose
            execution_subdir equals queue_entry_from_group's.
    """
    subdir = queue_entry_from_group.execution_subdir
    matches = HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, subdir))
    return list(matches)
def _should_run_cleanup(self, queue_entry):
    """Decide whether a Cleanup must run on queue_entry's host first.

    @returns True if reboot_before is ALWAYS, the host's dirty flag if it is
            IF_DIRTY, and False otherwise.
    """
    reboot_policy = model_attributes.RebootBefore
    if self.reboot_before == reboot_policy.ALWAYS:
        return True
    if self.reboot_before == reboot_policy.IF_DIRTY:
        return queue_entry.host.dirty
    return False
def _should_run_verify(self, queue_entry):
    """Decide whether a Verify must run on queue_entry's host first.

    @returns False for hosts whose protection is DO_NOT_VERIFY, otherwise
            the job's run_verify setting.
    """
    protection = queue_entry.host.protection
    if protection == host_protections.Protection.DO_NOT_VERIFY:
        return False
    return self.run_verify
def schedule_pre_job_tasks(self, queue_entry):
    """Schedule the work needed before queue_entry may run this Job.

    If the host needs a Cleanup, a CLEANUP SpecialTask is created. Otherwise,
    if it needs a Verify, a VERIFY SpecialTask is created. If neither is
    needed, queue_entry.on_pending() is called directly so the job flow
    continues.

    @param queue_entry: The HostQueueEntry being prepared to run this job.
    @returns None; all work is scheduled as a side effect.
    """
    if self._should_run_cleanup(queue_entry):
        task = models.SpecialTask.Task.CLEANUP
    elif self._should_run_verify(queue_entry):
        task = models.SpecialTask.Task.VERIFY
    else:
        queue_entry.on_pending()
        return

    # Re-load the Django model instances by id for the SpecialTask row
    # (presumably the scheduler-side queue_entry object is not accepted by
    # the Django ORM here). A distinct name avoids shadowing the parameter.
    hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)
    models.SpecialTask.objects.create(
            host=models.Host.objects.get(id=hqe_model.host_id),
            queue_entry=hqe_model, task=task)
def _assign_new_group(self, queue_entries, group_name=''):
    """Give all queue_entries one shared execution subdirectory.

    A single entry uses its host's hostname as the subdirectory. Several
    entries get the next group name from _next_group_name().

    @param queue_entries: List of HostQueueEntry objects that run together.
    @param group_name: Optional prefix passed to _next_group_name().
    """
    if len(queue_entries) != 1:
        subdir = self._next_group_name(group_name)
        hostnames = [entry.host.hostname for entry in queue_entries]
        logging.info('Running synchronous job %d hosts %s as %s',
                     self.id, hostnames, subdir)
    else:
        subdir = queue_entries[0].host.hostname

    for entry in queue_entries:
        entry.set_execution_subdir(subdir)
def _choose_group_to_run(self, include_queue_entry):
    """Choose the HostQueueEntries that will run this Job together.

    include_queue_entry is always chosen. Other Pending entries of this job
    are sorted with Host.cmp_for_sort and added until the group holds at
    most atomic_group.max_number_of_machines entries (atomic group jobs) or
    synch_count entries (other jobs). The chosen entries are then given a
    shared execution subdir via _assign_new_group().

    NOTE: list.sort(cmp=...) is Python 2 only; switch to
    functools.cmp_to_key if this module is ported to Python 3.

    @param include_queue_entry: The HostQueueEntry that must be part of the
            group.
    @returns A list of the chosen HostQueueEntry instances. If fewer than
            synch_count entries could be chosen, an error is logged and
            emailed and an empty list is returned.
    """
    atomic_group = include_queue_entry.atomic_group
    chosen_entries = [include_queue_entry]
    if atomic_group:
        num_entries_wanted = atomic_group.max_number_of_machines
    else:
        num_entries_wanted = self.synch_count
    num_entries_wanted -= len(chosen_entries)

    if num_entries_wanted > 0:
        where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
        pending_entries = list(HostQueueEntry.fetch(
                where=where_clause,
                params=(self.id, include_queue_entry.id)))

        # Sort the candidate hosts by hostname before slicing.
        def cmp_queue_entries_by_hostname(entry_a, entry_b):
            return Host.cmp_for_sort(entry_a.host, entry_b.host)
        pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
        chosen_entries += pending_entries[:num_entries_wanted]

    # Sanity check. We'll only ever be called if this can be met.
    if len(chosen_entries) < self.synch_count:
        message = ('job %s got less than %s chosen entries: %s' % (
                self.id, self.synch_count, chosen_entries))
        logging.error(message)
        email_manager.manager.enqueue_notify_email(
                'Job not started, too few chosen entries', message)
        return []

    group_name = include_queue_entry.get_group_name()

    self._assign_new_group(chosen_entries, group_name=group_name)
    return chosen_entries
def run_if_ready(self, queue_entry):
    """Start this job if enough hosts are ready; otherwise tidy up.

    When is_ready() is False, stop_if_necessary() may move waiting entries to
    Stopped. When ready, atomic group jobs go through run_with_ready_delay()
    and all other jobs are started with run().

    @param queue_entry: The HostQueueEntry handed on to run() or
            run_with_ready_delay().
    """
    if self.is_ready():
        if queue_entry.atomic_group:
            self.run_with_ready_delay(queue_entry)
        else:
            self.run(queue_entry)
        return
    self.stop_if_necessary()
def run_with_ready_delay(self, queue_entry):
    """Run an atomic group job now, or mark it as waiting for more hosts.

    The job runs immediately in any of these cases:
      - the configured secs_to_wait_for_atomic_group_hosts is falsy (the
        delay is disabled);
      - the Pending count already reaches _max_hosts_needed_to_run();
      - an existing delay task has passed its end_time.
    Otherwise queue_entry is set to Waiting. Once set, a delay cannot be
    reset.

    @param queue_entry: The HostQueueEntry object to get atomic group
            info from and to pass to run() when the job is started.
    @returns None.
    """
    assert queue_entry.job_id == self.id
    assert queue_entry.atomic_group
    delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
    over_max_threshold = (self._pending_count() >=
            self._max_hosts_needed_to_run(queue_entry.atomic_group))
    delay_expired = (self._delay_ready_task and
                     time.time() >= self._delay_ready_task.end_time)

    # Delay is disabled or we already have enough? Do not wait to run.
    if not delay or over_max_threshold or delay_expired:
        self.run(queue_entry)
    else:
        queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
def request_abort(self):
    """Request that this Job be aborted on the next scheduler cycle.

    Calls abort() on the object returned by self.model().
    """
    job_model = self.model()
    job_model.abort()
def schedule_delayed_callback_task(self, queue_entry):
    """Create the delay task that runs this atomic group job later.

    queue_entry is always set back to Pending. If a delay task already
    exists, nothing else happens. Otherwise a DelayedCallTask is created.
    When it fires it runs the job with queue_entry, or requests an abort if
    fewer than _min_hosts_needed_to_run() hosts are still Pending.

    @param queue_entry: HostQueueEntry to reset to Pending and later pass to
            run().
    @returns The new DelayedCallTask, or None if one already existed.
    """
    queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

    if self._delay_ready_task:
        return None

    delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

    def run_job_after_delay():
        logging.info('Job %s done waiting for extra hosts.', self)
        # The job could have been aborted, or hosts could have gone away,
        # while we were waiting, so re-check before running.
        enough_hosts = (self._pending_count() >=
                        self._min_hosts_needed_to_run())
        if enough_hosts:
            return self.run(queue_entry)
        logging.info('Job %s had too few Pending hosts after waiting '
                     'for extras. Not running.', self)
        self.request_abort()
        return None

    logging.info('Job %s waiting up to %s seconds for more hosts.',
                 self.id, delay)
    task = DelayedCallTask(delay_seconds=delay,
                           callback=run_job_after_delay)
    self._delay_ready_task = task
    return task
def run(self, queue_entry):
    """Start this job on a group of hosts that includes queue_entry.

    If queue_entry has an atomic group and this atomic job has already
    started, an error is logged and nothing runs. Otherwise the group is
    chosen with _choose_group_to_run(). If any entries were chosen, they are
    started via _finish_run().

    @param queue_entry: The HostQueueEntry instance calling this method.
    """
    already_running = (queue_entry.atomic_group and
                       self._atomic_and_has_started())
    if already_running:
        logging.error('Job.run() called on running atomic Job %d '
                      'with HQE %s.', self.id, queue_entry)
        return
    chosen = self._choose_group_to_run(queue_entry)
    if not chosen:
        return
    self._finish_run(chosen)
def _finish_run(self, queue_entries):
    """Move each chosen entry to Starting, then cancel any pending delay task.

    @param queue_entries: The HostQueueEntry objects chosen to run the job.
    """
    for entry in queue_entries:
        entry.set_status(models.HostQueueEntry.Status.STARTING)
    self.abort_delay_ready_task()
def abort_delay_ready_task(self):
    """Abort the delayed task associated with this job, if any.

    The attribute itself is left set, so a later
    schedule_delayed_callback_task() call will not create a new delay.
    """
    task = self._delay_ready_task
    if not task:
        return
    # The job is already running: cancel the pending callback so that it
    # does not try to run the job again.
    task.abort()
def __str__(self):
    """Describe the job as '<id>-<owner>'."""
    job_id, owner = self.id, self.owner
    return '%s-%s' % (job_id, owner)