KVM test: installer: Fix KojiInstaller bug
[autotest-zwu.git] / scheduler / monitor_db_cleanup.py
blob d8bab116c6d145c647106780d5f0144269765ae9
1 """
2 Autotest AFE Cleanup used by the scheduler
3 """
6 import datetime, time, logging, random
7 from autotest_lib.database import database_connection
8 from autotest_lib.frontend.afe import models
9 from autotest_lib.scheduler import email_manager, scheduler_config
10 from autotest_lib.client.common_lib import host_protections
class PeriodicCleanup(object):
    """Base class for cleanup work that repeats on a fixed interval.

    Subclasses provide the actual work by overriding _cleanup().
    """


    def __init__(self, db, clean_interval, run_at_initialize=False):
        """Remember the settings and start the interval clock.

        db: database handle, stored as self._db for subclasses to use.
        clean_interval: time between cleanups; it is multiplied by 60 before
                being compared against time.time(), i.e. it is in minutes.
        run_at_initialize: when true, initialize() performs one cleanup.
        """
        self._db = db
        self.clean_interval = clean_interval
        self._run_at_initialize = run_at_initialize
        self._last_clean_time = time.time()


    def initialize(self):
        """Run one cleanup immediately if it was requested at construction.

        The interval clock is left untouched by this call.
        """
        if not self._run_at_initialize:
            return
        self._cleanup()


    def run_cleanup_maybe(self):
        """Run _cleanup() when more than clean_interval minutes have elapsed
        since the last recorded cleanup time, then restart the clock.
        """
        next_due = self._last_clean_time + self.clean_interval * 60
        if time.time() <= next_due:
            # Not strictly past the due time yet; nothing to do.
            return
        self._cleanup()
        self._last_clean_time = time.time()


    def _cleanup(self):
        """Abstract cleanup method; subclasses must override it."""
        raise NotImplementedError
class UserCleanup(PeriodicCleanup):
    """User cleanup that is controlled by the global config variable
    clean_interval in the SCHEDULER section.

    Each run aborts overdue jobs, removes stale host blocks, clears
    relationships that point at invalid objects and schedules verification
    of hosts whose repair failed.
    """


    def __init__(self, db, clean_interval_minutes):
        """Set up the periodic cleanup and start the reverify clock.

        db: database handle passed through to PeriodicCleanup.
        clean_interval_minutes: minutes between cleanup runs.
        """
        super(UserCleanup, self).__init__(db, clean_interval_minutes)
        self._last_reverify_time = time.time()


    def _cleanup(self):
        """Run every cleanup step once, in a fixed order."""
        logging.info('Running periodic cleanup')
        steps = (
            self._abort_timed_out_jobs,
            self._abort_jobs_past_max_runtime,
            self._clear_inactive_blocks,
            self._check_for_db_inconsistencies,
            self._reverify_dead_hosts,
        )
        for step in steps:
            step()


    def _abort_timed_out_jobs(self):
        """Abort jobs with incomplete queue entries whose timeout (in hours,
        counted from creation) has already passed.
        """
        logging.info(
                'Aborting all jobs that have timed out and are not complete')
        incomplete_jobs = models.Job.objects.filter(
                hostqueueentry__complete=False)
        timed_out_jobs = incomplete_jobs.extra(
                where=['created_on + INTERVAL timeout HOUR < NOW()'])
        for timed_out_job in timed_out_jobs.distinct():
            logging.warning('Aborting job %d due to job timeout',
                            timed_out_job.id)
            timed_out_job.abort()


    def _abort_jobs_past_max_runtime(self):
        """
        Abort executions that have started and are past the job's max runtime.
        """
        logging.info('Aborting all jobs that have passed maximum runtime')
        overdue_rows = self._db.execute("""
            SELECT hqe.id
            FROM afe_host_queue_entries AS hqe
            INNER JOIN afe_jobs ON (hqe.job_id = afe_jobs.id)
            WHERE NOT hqe.complete AND NOT hqe.aborted AND
            hqe.started_on + INTERVAL afe_jobs.max_runtime_hrs HOUR < NOW()""")
        overdue_ids = [overdue_row[0] for overdue_row in overdue_rows]
        overdue_entries = models.HostQueueEntry.objects.filter(
                id__in=overdue_ids)
        for overdue_entry in overdue_entries.distinct():
            logging.warning('Aborting entry %s due to max runtime',
                            overdue_entry)
            overdue_entry.abort()


    def _check_for_db_inconsistencies(self):
        """Find and clean the inconsistencies this class knows how to fix."""
        logging.info('Cleaning db inconsistencies')
        self._check_all_invalid_related_objects()


    def _check_invalid_related_objects_one_way(self, first_model,
                                               relation_field, second_model):
        """Clear relations from invalid first_model objects to second_model.

        Returns a list with one message per invalid object that still had
        related objects; the list is empty when first_model has no 'invalid'
        field.
        """
        if 'invalid' not in first_model.get_field_dict():
            return []

        invalid_objects = list(first_model.objects.filter(invalid=True))
        first_model.objects.populate_relationships(
                invalid_objects, second_model, 'related_objects')

        messages = []
        for candidate in invalid_objects:
            if not candidate.related_objects:
                continue
            related_names = ', '.join(map(str, candidate.related_objects))
            messages.append('Invalid %s %s is related to %ss: %s'
                            % (first_model.__name__, candidate,
                               second_model.__name__, related_names))
            # Drop the offending relationships now that they are recorded.
            getattr(candidate, relation_field).clear()
        return messages


    def _check_invalid_related_objects(self, first_model, first_field,
                                       second_model, second_field):
        """Check one relationship in both directions; return all messages."""
        forward_errors = self._check_invalid_related_objects_one_way(
                first_model, first_field, second_model)
        backward_errors = self._check_invalid_related_objects_one_way(
                second_model, second_field, first_model)
        return forward_errors + backward_errors


    def _check_all_invalid_related_objects(self):
        """Check every known model pair, then log and email what was found."""
        model_pairs = (
            (models.Host, 'labels', models.Label, 'host_set'),
            (models.AclGroup, 'hosts', models.Host, 'aclgroup_set'),
            (models.AclGroup, 'users', models.User, 'aclgroup_set'),
            (models.Test, 'dependency_labels', models.Label, 'test_set'),
        )
        errors = []
        for model_pair in model_pairs:
            errors.extend(self._check_invalid_related_objects(*model_pair))

        if not errors:
            return

        subject = ('%s relationships to invalid models, cleaned all' %
                   len(errors))
        message = '\n'.join(errors)
        logging.warning(subject)
        logging.warning(message)
        email_manager.manager.enqueue_notify_email(subject, message)


    def _clear_inactive_blocks(self):
        """Delete ineligible-host-queue rows for jobs that have no incomplete
        queue entries left.
        """
        logging.info('Clear out blocks for all completed jobs.')
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        self._db.execute("""
            DELETE ihq FROM afe_ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM afe_host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")


    def _should_reverify_hosts_now(self):
        """Return True once the configured reverify period has elapsed.

        A configured period of zero always yields False.
        """
        period_minutes = scheduler_config.config.reverify_period_minutes
        period_seconds = period_minutes * 60
        if period_seconds == 0:
            return False
        next_reverify = self._last_reverify_time + period_seconds
        return next_reverify <= time.time()


    def _choose_subset_of_hosts_to_reverify(self, hosts):
        """Given hosts needing verification, return a subset to reverify.

        When a positive limit is configured and exceeded, a random sample of
        that size is returned; otherwise all hosts are returned sorted.
        """
        limit = scheduler_config.config.reverify_max_hosts_at_once
        if limit <= 0 or len(hosts) <= limit:
            return sorted(hosts)
        return random.sample(hosts, limit)


    def _reverify_dead_hosts(self):
        """Schedule a verify task for repair-failed hosts when it is time.

        Locked hosts, invalid hosts and hosts protected with DO_NOT_VERIFY
        are left alone.
        """
        if not self._should_reverify_hosts_now():
            return

        self._last_reverify_time = time.time()
        logging.info('Checking for dead hosts to reverify')
        dead_hosts = list(
                models.Host.objects.filter(
                        status=models.Host.Status.REPAIR_FAILED,
                        locked=False,
                        invalid=False
                ).exclude(
                        protection=host_protections.Protection.DO_NOT_VERIFY))
        if not dead_hosts:
            return

        total_hosts = len(dead_hosts)
        chosen_hosts = self._choose_subset_of_hosts_to_reverify(dead_hosts)
        hostnames = ', '.join(chosen.hostname for chosen in chosen_hosts)
        logging.info('Reverifying dead hosts (%d of %d) %s',
                     len(chosen_hosts), total_hosts, hostnames)
        for chosen in chosen_hosts:
            models.SpecialTask.schedule_special_task(
                    host=chosen, task=models.SpecialTask.Task.VERIFY)
class TwentyFourHourUpkeep(PeriodicCleanup):
    """Cleanup that runs at the startup of monitor_db and every subsequent
    twenty four hours.
    """


    def __init__(self, db, run_at_initialize=True):
        """Configure a 24 hour cleanup interval.

        db: database handle passed through to PeriodicCleanup.
        run_at_initialize: when true (the default), initialize() runs one
                cleanup straight away.
        """
        clean_interval = 24 * 60 # 24 hours, expressed in minutes
        super(TwentyFourHourUpkeep, self).__init__(
                db, clean_interval, run_at_initialize=run_at_initialize)


    def _cleanup(self):
        """Purge expired sessions, then report inconsistencies."""
        logging.info('Running 24 hour clean up')
        self._django_session_cleanup()
        self._check_for_uncleanable_db_inconsistencies()


    def _django_session_cleanup(self):
        """Clean up django_session since django doesn't for us.
        http://www.djangoproject.com/documentation/0.96/sessions/
        """
        logging.info('Deleting old sessions from django_session')
        sql = 'DELETE FROM django_session WHERE expire_date < NOW()'
        self._db.execute(sql)


    def _check_for_uncleanable_db_inconsistencies(self):
        """Run every check that can only report, not repair, a problem."""
        logging.info('Checking for uncleanable DB inconsistencies')
        self._check_for_active_and_complete_queue_entries()
        self._check_for_multiple_platform_hosts()
        self._check_for_no_platform_hosts()
        self._check_for_multiple_atomic_group_hosts()


    def _check_for_active_and_complete_queue_entries(self):
        """Report queue entries flagged both active and complete."""
        query = models.HostQueueEntry.objects.filter(active=True, complete=True)
        # Count once so the number tested and the number reported agree and
        # only a single COUNT query is issued.
        entry_count = query.count()
        if entry_count == 0:
            return
        subject = ('%d queue entries found with active=complete=1'
                   % entry_count)
        lines = [str(entry.get_object_dict()) for entry in query]
        self._send_inconsistency_message(subject, lines)


    def _check_for_multiple_platform_hosts(self):
        """Report hosts carrying more than one platform label."""
        rows = self._db.execute("""
            SELECT afe_hosts.id, hostname, COUNT(1) AS platform_count,
                   GROUP_CONCAT(afe_labels.name)
            FROM afe_hosts
            INNER JOIN afe_hosts_labels ON
                    afe_hosts.id = afe_hosts_labels.host_id
            INNER JOIN afe_labels ON afe_hosts_labels.label_id = afe_labels.id
            WHERE afe_labels.platform
            GROUP BY afe_hosts.id
            HAVING platform_count > 1
            ORDER BY hostname""")
        if rows:
            subject = '%s hosts with multiple platforms' % self._db.rowcount
            lines = [' '.join(str(item) for item in row)
                     for row in rows]
            self._send_inconsistency_message(subject, lines)


    def _check_for_no_platform_hosts(self):
        """Log a warning naming valid hosts that have no platform label.

        Unlike the other checks this one only logs; no email is enqueued.
        """
        rows = self._db.execute("""
            SELECT hostname
            FROM afe_hosts
            LEFT JOIN afe_hosts_labels
              ON afe_hosts.id = afe_hosts_labels.host_id
              AND afe_hosts_labels.label_id IN (SELECT id FROM afe_labels
                                                WHERE platform)
            WHERE NOT afe_hosts.invalid AND afe_hosts_labels.host_id IS NULL""")
        if rows:
            # logging.warn is a deprecated alias; use logging.warning like
            # the rest of this module.
            logging.warning('%s hosts with no platform\n%s',
                            self._db.rowcount,
                            ', '.join(row[0] for row in rows))


    def _check_for_multiple_atomic_group_hosts(self):
        """Report valid hosts whose valid labels span more than one distinct
        atomic group.
        """
        rows = self._db.execute("""
            SELECT afe_hosts.id, hostname,
                   COUNT(DISTINCT afe_atomic_groups.name) AS atomic_group_count,
                   GROUP_CONCAT(afe_labels.name),
                   GROUP_CONCAT(afe_atomic_groups.name)
            FROM afe_hosts
            INNER JOIN afe_hosts_labels ON
                    afe_hosts.id = afe_hosts_labels.host_id
            INNER JOIN afe_labels ON afe_hosts_labels.label_id = afe_labels.id
            INNER JOIN afe_atomic_groups ON
                       afe_labels.atomic_group_id = afe_atomic_groups.id
            WHERE NOT afe_hosts.invalid AND NOT afe_labels.invalid
            GROUP BY afe_hosts.id
            HAVING atomic_group_count > 1
            ORDER BY hostname""")
        if rows:
            subject = '%s hosts with multiple atomic groups' % self._db.rowcount
            lines = [' '.join(str(item) for item in row)
                     for row in rows]
            self._send_inconsistency_message(subject, lines)


    def _send_inconsistency_message(self, subject, lines):
        """Log subject as an error and enqueue a notification email.

        subject: one-line summary, used for the log and the email subject.
        lines: iterable of strings joined with newlines to form the body;
                bodies longer than 5000 characters are cut and marked
                as truncated.
        """
        logging.error(subject)
        message = '\n'.join(lines)
        if len(message) > 5000:
            message = message[:5000] + '\n(truncated)\n'
        email_manager.manager.enqueue_notify_email(subject, message)