1 """Database model classes for the scheduler.
3 Contains model classes abstracting the various DB tables used by the scheduler.
4 These overlap the Django models in basic functionality, but were written before
5 the Django models existed and have not yet been phased out. Some of them
6 (particularly HostQueueEntry and Job) have considerable scheduler-specific logic
7 which would probably be ill-suited for inclusion in the general Django model
11 _notify_email_statuses: list of HQE statuses. each time a single HQE reaches
12 one of these statuses, an email will be sent to the job's email_list.
13 comes from global_config.
14 _base_url: URL to the local AFE server, used to construct URLs for emails.
15 _db: DatabaseConnection for this module.
16 _drone_manager: reference to global DroneManager instance.

import datetime, itertools, logging, os, re, sys, time, weakref

from django.db import connection

from autotest_lib.client.common_lib import global_config, host_protections
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.database import database_connection
from autotest_lib.scheduler import drone_manager, email_manager
from autotest_lib.scheduler import scheduler_config


_notify_email_statuses = []
_base_url = None

_db = None
_drone_manager = None


def initialize():
    global _db
    _db = database_connection.DatabaseConnection('AUTOTEST_WEB')
    _db.connect(db_type='django')

    notify_statuses_list = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = global_config.global_config.get_config_value(
                'SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name
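
# Illustrative sketch (not from the original source): the values read above
# come from the scheduler section of global_config.ini.  Assuming that section
# is named "SCHEDULER" (the real name is whatever scheduler_config.CONFIG_SECTION
# resolves to), a minimal configuration could look like:
#
#   [SCHEDULER]
#   notify_email_statuses = failed, aborted
#   base_url = http://autotest.example.com/afe/
#
#   [SERVER]
#   hostname = autotest.example.com
#
# notify_email_statuses is split on whitespace, commas, semicolons and colons,
# so "failed, aborted" becomes ['failed', 'aborted'].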


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


class DelayedCallTask(object):
    """
    A task object like AgentTask for an Agent to run that waits for the
    specified amount of time to have elapsed before calling the supplied
    callback once and finishing.  If the callback returns anything, it is
    assumed to be a new Agent instance and will be added to the dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed.  It must return None
                or a new Agent instance.
        @param now_func: A time.time like function.  Default: time.time.
        """
        assert delay_seconds > 0
        assert callable(callback)
        if not now_func:
            now_func = time.time
        self._now_func = now_func
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # These attributes are required by Agent.
        self.aborted = False
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        if not self.is_done() and self._now_func() >= self.end_time:
            self._callback()
            self.success = True


    def is_done(self):
        return self.success or self.aborted


    def abort(self):
        self.aborted = True
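
    # Usage sketch (illustrative, not part of the original module): a
    # dispatcher-side caller would construct the task with a callback and
    # poll it periodically.  'make_agent' is a hypothetical callback.
    #
    #   task = DelayedCallTask(delay_seconds=30, callback=make_agent)
    #   ...
    #   task.poll()        # calls make_agent() once 30 seconds have passed
    #   task.is_done()     # True after the callback has run or on abort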


class DBError(Exception):
    """Raised by the DBObject constructor when its select fails."""


class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]

        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'

        if self._initialized and not always_query:
            return  # We've already been initialized.
        if id is None:
            id = row[0]
        # Tell future constructors to use us instead of re-querying while
        # this instance is still around.
        self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                        'initialized %s %s instance requery is updating: %s',
                        type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in-memory fields.  Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                the class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S'  # Leave off the microseconds.
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                    and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(datetime_cmp_fmt)
                row_value = row_value.strftime(datetime_cmp_fmt)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        self._valid_fields.remove('id')


    def update_from_database(self):
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        if not table:
            table = self.__table
        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))
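
    # Sketch of the intended write path (illustrative only): a new row is
    # created by passing new_record=True and then calling save(), which is
    # exactly what HostQueueEntry.block_host() does further down:
    #
    #   block = IneligibleHostQueue(row=[0, job_id, host_id], new_record=True)
    #   block.save()      # INSERT, then id is refreshed from the database
    #
    # job_id and host_id here are placeholder integers.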


    @staticmethod
    def _prefix_with(string, prefix):
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list with one class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
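
    # Usage sketch (illustrative, not part of the original module): fetch()
    # builds 'SELECT <table>.* FROM <table> <joins> <where> <order_by>', so a
    # caller can write, for example:
    #
    #   hosts = Host.fetch(where='locked = %s', params=(0,),
    #                      order_by='hostname')
    #
    # The WHERE clause shown is only an example; any SQL fragment valid for
    # the underlying table works.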


class IneligibleHostQueue(DBObject):
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')


class AtomicGroup(DBObject):
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')


class Label(DBObject):
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)


class Host(DBObject):
    _table_name = 'afe_hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def set_status(self, status):
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If the hostnames do not both match this pattern, they are
        simply compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfies most people's hostname sorting needs
        regardless of their exact naming schemes.  Nobody sane should have
        both a host10 and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            number_a = int(number_a_str.lstrip('0'))
            number_b = int(number_b_str.lstrip('0'))
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        return cmp(lower_a, lower_b)
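
    # Usage sketch (illustrative): under Python 2, list.sort() and sorted()
    # accept a comparison function, so hosts can be ordered naturally:
    #
    #   hosts = Host.fetch(where='invalid = 0')   # example WHERE clause
    #   hosts.sort(cmp=Host.cmp_for_sort)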


class HostQueueEntry(DBObject):
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')


    def __init__(self, id=None, row=None, **kwargs):
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)
        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None
        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None

        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def queue_log_record(self, log_line):
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
                'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        if subdir is None:
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return None


    def __str__(self):
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
                                    self.status, flags_str)


    def set_status(self, status):
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)
        self.update_field('complete', complete)

        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)


    def _on_complete(self, status):
        if status is not models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()

        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                           (queue_entry._get_hostname(),
                            queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        # Every host goes thru the Verifying stage (which may or may not
        # actually do anything as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')


    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        if hasattr(self, "_aborted_by"):
            return

        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                LEFT JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING.  Otherwise, it
        leaves them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are
        # getting stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
                           Status.WAITING):
            assert not dispatcher.get_agents_for_entry(self)
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()


    def get_group_name(self):
        atomic_group = self.atomic_group
        if not atomic_group:
            return ''

        # Look at any meta_host and dependency labels and pick the first
        # one that also specifies this atomic group.  Use that label name
        # as the group name if possible (it is more specific).
        for label in self.get_labels():
            if label.atomic_group_id:
                assert label.atomic_group_id == atomic_group.id
                return label.name
        return atomic_group.name


    def execution_tag(self):
        assert self.execution_subdir
        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        return self.execution_tag()


    def set_started_on_now(self):
        self.update_field('started_on', datetime.datetime.now())


    def is_hostless(self):
        return (self.host_id is None
                and self.meta_host is None
                and self.atomic_group_id is None)


class Job(DBObject):
    _table_name = 'afe_jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
               'parameterized_job_id')

    # This does not need to be a column in the DB.  The delays are likely to
    # be configured short.  If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it.  Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time.  Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending()
    # on all status='Pending' atomic group HQEs in case a delay was running
    # when the scheduler was restarted and no more hosts ever successfully
    # exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None  # caches model instance of owner


    def model(self):
        return models.Job.objects.get(id=self.id)


    def owner_model(self):
        # work around the fact that the Job owner field is a string, not a
        # foreign key
        if not self._owner_model:
            self._owner_model = models.User.objects.get(login=self.owner)
        return self._owner_model


    def is_server_job(self):
        return self.control_type != 2


    def tag(self):
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        rows = _db.execute("""
                SELECT * FROM afe_host_queue_entries
                WHERE job_id = %s
                """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries) > 0

        return entries


    def get_execution_details(self):
        """
        Get test execution details for this job.

        @return: Dictionary with test execution details
        """
        def _find_test_jobs(rows):
            """
            Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
            Those are autotest 'internal job' tests, so they should not be
            counted when evaluating the test stats.

            @param rows: List of rows (matrix) with database results.
            """
            job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
            n_test_jobs = 0
            for r in rows:
                test_name = r[0]
                if job_test_pattern.match(test_name):
                    n_test_jobs += 1
            return n_test_jobs

        stats = {}

        rows = _db.execute("""
                SELECT t.test, s.word, t.reason
                FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
                WHERE t.job_idx = j.job_idx
                AND s.status_idx = t.status
                AND j.afe_job_id = %s
                """, (self.id,))

        failed_rows = [r for r in rows if not 'GOOD' in r]

        n_test_jobs = _find_test_jobs(rows)
        n_test_jobs_failed = _find_test_jobs(failed_rows)

        total_executed = len(rows) - n_test_jobs
        total_failed = len(failed_rows) - n_test_jobs_failed

        if total_executed > 0:
            success_rate = 100 - ((total_failed / float(total_executed)) * 100)
        else:
            success_rate = 0

        stats['total_executed'] = total_executed
        stats['total_failed'] = total_failed
        stats['total_passed'] = total_executed - total_failed
        stats['success_rate'] = success_rate

        status_header = ("Test Name", "Status", "Reason")
        if failed_rows:
            stats['failed_rows'] = utils.matrix_to_string(failed_rows,
                                                          status_header)
        else:
            stats['failed_rows'] = ''

        time_row = _db.execute("""
                SELECT started_time, finished_time
                FROM tko_jobs
                WHERE afe_job_id = %s
                """, (self.id,))

        if time_row:
            t_begin, t_end = time_row[0]
            try:
                delta = t_end - t_begin
                minutes, seconds = divmod(delta.seconds, 60)
                hours, minutes = divmod(minutes, 60)
                stats['execution_time'] = ("%02d:%02d:%02d" %
                                           (hours, minutes, seconds))
            # One of t_end or t_begin is None
            except TypeError:
                stats['execution_time'] = '(could not determine)'
        else:
            stats['execution_time'] = '(none)'

        return stats
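
    # Illustrative sketch of the dictionary returned above (values invented):
    #
    #   {'total_executed': 10, 'total_passed': 9, 'total_failed': 1,
    #    'success_rate': 90.0, 'failed_rows': '...',
    #    'execution_time': '01:02:03'}
    #
    # _get_status_email_contents() consumes exactly these keys when it
    # formats the notification e-mail body.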


    def set_status(self, status, update_queues=False):
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def keyval_dict(self):
        return self.model().keyval_dict()


    def _atomic_and_has_started(self):
        """
        @returns True if any of the HostQueueEntries associated with this job
        have entered the Status.STARTING state or beyond.
        """
        atomic_entries = models.HostQueueEntry.objects.filter(
                job=self.id, atomic_group__isnull=False)
        if atomic_entries.count() <= 0:
            return False

        # These states may *only* be reached if Job.run() has been called.
        started_statuses = (models.HostQueueEntry.Status.STARTING,
                            models.HostQueueEntry.Status.RUNNING,
                            models.HostQueueEntry.Status.COMPLETED)

        started_entries = atomic_entries.filter(status__in=started_statuses)
        return started_entries.count() > 0


    def _hosts_assigned_count(self):
        """The number of HostQueueEntries assigned a Host for this job."""
        entries = models.HostQueueEntry.objects.filter(job=self.id,
                                                       host__isnull=False)
        return entries.count()


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def _max_hosts_needed_to_run(self, atomic_group):
        """
        @param atomic_group: The AtomicGroup associated with this job that we
                are using to set an upper bound on the threshold.
        @returns The maximum number of HostQueueEntries assigned a Host before
                this job can run.
        """
        return min(self._hosts_assigned_count(),
                   atomic_group.max_number_of_machines)


    def _min_hosts_needed_to_run(self):
        """Return the minimum number of hosts needed to run this job."""
        return self.synch_count


    def is_ready(self):
        # NOTE: Atomic group jobs stop reporting ready after they have been
        # started to avoid launching multiple copies of one atomic job.
        # Only possible if synch_count is less than half the number of
        # machines in the atomic group.
        pending_count = self._pending_count()
        atomic_and_has_started = self._atomic_and_has_started()
        ready = (pending_count >= self.synch_count
                 and not atomic_and_has_started)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required '
                    '(Atomic and started: %s)',
                    self, pending_count, self.synch_count,
                    atomic_and_has_started)

        return ready


    def num_machines(self, clause=None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='afe_host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        entries_to_stop = self._not_yet_run_entries(
                include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        hostname = queue_entry.host.hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self, group_name=''):
        """@returns a directory name to use for the next host group results."""
        if group_name:
            # Sanitize for use as a pathname.
            group_name = group_name.replace(os.path.sep, '_')
            if group_name.startswith('.'):
                group_name = '_' + group_name[1:]
            # Add a separator between the group name and 'group%d'.
            group_name += '.'
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
                job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)
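
    # Illustrative example (not from the original source): if this job's
    # entries already use execution subdirs 'group0' and 'group1', calling
    # _next_group_name() returns 'group2'.  With a name it would be, e.g.,
    # _next_group_name('rack1') -> 'rack1.group0' for the first such group
    # ('rack1' is a made-up label name).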


    def get_group_entries(self, queue_entry_from_group):
        """
        @param queue_entry_from_group: A HostQueueEntry instance to find other
                group entries on this job for.

        @returns A list of HostQueueEntry objects all executing this job as
                part of the same group as the one supplied (having the same
                execution_subdir).
        """
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
                where='job_id=%s AND execution_subdir=%s',
                params=(self.id, execution_subdir)))


    def _should_run_cleanup(self, queue_entry):
        if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
            return queue_entry.host.dirty
        return False


    def _should_run_verify(self, queue_entry):
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def schedule_pre_job_tasks(self, queue_entry):
        """
        Get a list of tasks to perform before the host_queue_entry
        may be used to run this Job (such as Cleanup & Verify).

        @returns A list of tasks to be done to the given queue_entry before
                it should be considered ready to run this job.  The last
                task in the list calls HostQueueEntry.on_pending(), which
                continues the flow of the job.
        """
        if self._should_run_cleanup(queue_entry):
            task = models.SpecialTask.Task.CLEANUP
        elif self._should_run_verify(queue_entry):
            task = models.SpecialTask.Task.VERIFY
        else:
            queue_entry.on_pending()
            return

        queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=queue_entry.host_id),
                queue_entry=queue_entry, task=task)


    def _assign_new_group(self, queue_entries, group_name=''):
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].host.hostname
        else:
            group_subdir_name = self._next_group_name(group_name)
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id,
                         [entry.host.hostname for entry in queue_entries],
                         group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A list of HostQueueEntry instances to be used to run this
                Job, or an empty list if too few entries are available.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                    where=where_clause,
                    params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                       self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries


    def run_if_ready(self, queue_entry):
        """
        Run this job by kicking its HQEs into status='Starting' if enough
        hosts are ready for it to run.

        Cleans up by kicking HQEs into status='Stopped' if this Job is not
        ready to run.
        """
        if not self.is_ready():
            self.stop_if_necessary()
        elif queue_entry.atomic_group:
            self.run_with_ready_delay(queue_entry)
        else:
            self.run(queue_entry)


    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, the delay cannot be reset.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run_if_ready when the delay is up.

        @returns An Agent to run the job as appropriate or None if a delay
                has already been set.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        over_max_threshold = (self._pending_count() >=
                self._max_hosts_needed_to_run(queue_entry.atomic_group))
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough?  Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            self.run(queue_entry)
        else:
            queue_entry.set_status(models.HostQueueEntry.Status.WAITING)


    def request_abort(self):
        """Request that this Job be aborted on the next scheduler cycle."""
        self.model().abort()


    def schedule_delayed_callback_task(self, queue_entry):
        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

        if self._delay_ready_task:
            return None

        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self)
            # Check to see if the job is still relevant.  It could have aborted
            # while we were waiting or hosts could have disappeared, etc.
            if self._pending_count() < self._min_hosts_needed_to_run():
                logging.info('Job %s had too few Pending hosts after waiting '
                             'for extras.  Not running.', self)
                self.request_abort()
                return
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)
        return self._delay_ready_task


    def run(self, queue_entry):
        """
        @param queue_entry: The HostQueueEntry instance calling this method.
        """
        if queue_entry.atomic_group and self._atomic_and_has_started():
            logging.error('Job.run() called on running atomic Job %d '
                          'with HQE %s.', self.id, queue_entry)
            return
        queue_entries = self._choose_group_to_run(queue_entry)
        if queue_entries:
            self._finish_run(queue_entries)


    def _finish_run(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
        self.abort_delay_ready_task()


    def abort_delay_ready_task(self):
        """Abort the delayed task associated with this job, if any."""
        if self._delay_ready_task:
            # Cancel any pending callback that would try to run again
            # as we are already running.
            self._delay_ready_task.abort()


    def __str__(self):
        return '%s-%s' % (self.id, self.owner)