2 Autotest AFE Cleanup used by the scheduler
6 import datetime
, time
, logging
, random
7 from autotest_lib
.database
import database_connection
8 from autotest_lib
.frontend
.afe
import models
9 from autotest_lib
.scheduler
import email_manager
, scheduler_config
10 from autotest_lib
.client
.common_lib
import host_protections
13 class PeriodicCleanup(object):
16 def __init__(self
, db
, clean_interval
, run_at_initialize
=False):
18 self
.clean_interval
= clean_interval
19 self
._last
_clean
_time
= time
.time()
20 self
._run
_at
_initialize
= run_at_initialize
24 if self
._run
_at
_initialize
:
28 def run_cleanup_maybe(self
):
29 should_cleanup
= (self
._last
_clean
_time
+ self
.clean_interval
* 60
33 self
._last
_clean
_time
= time
.time()
37 """Abstract cleanup method."""
38 raise NotImplementedError
41 class UserCleanup(PeriodicCleanup
):
42 """User cleanup that is controlled by the global config variable
43 clean_interval in the SCHEDULER section.
def __init__(self, db, clean_interval_minutes):
    """Initialize the user cleanup and start its reverify timer.

    db and clean_interval_minutes are handed unchanged to
    PeriodicCleanup.__init__, the latter as its clean_interval argument.
    run_at_initialize is left at the base-class default.
    """
    base = super(UserCleanup, self)
    base.__init__(db, clean_interval_minutes)
    # Timestamp compared against the reverify period by
    # _should_reverify_hosts_now() and reset by _reverify_dead_hosts().
    self._last_reverify_time = time.time()
53 logging
.info('Running periodic cleanup')
54 self
._abort
_timed
_out
_jobs
()
55 self
._abort
_jobs
_past
_max
_runtime
()
56 self
._clear
_inactive
_blocks
()
57 self
._check
_for
_db
_inconsistencies
()
58 self
._reverify
_dead
_hosts
()
61 def _abort_timed_out_jobs(self
):
62 msg
= 'Aborting all jobs that have timed out and are not complete'
64 query
= models
.Job
.objects
.filter(hostqueueentry__complete
=False).extra(
65 where
=['created_on + INTERVAL timeout HOUR < NOW()'])
66 for job
in query
.distinct():
67 logging
.warning('Aborting job %d due to job timeout', job
.id)
71 def _abort_jobs_past_max_runtime(self
):
73 Abort executions that have started and are past the job's max runtime.
75 logging
.info('Aborting all jobs that have passed maximum runtime')
76 rows
= self
._db
.execute("""
78 FROM afe_host_queue_entries AS hqe
79 INNER JOIN afe_jobs ON (hqe.job_id = afe_jobs.id)
80 WHERE NOT hqe.complete AND NOT hqe.aborted AND
81 hqe.started_on + INTERVAL afe_jobs.max_runtime_hrs HOUR < NOW()""")
82 query
= models
.HostQueueEntry
.objects
.filter(
83 id__in
=[row
[0] for row
in rows
])
84 for queue_entry
in query
.distinct():
85 logging
.warning('Aborting entry %s due to max runtime', queue_entry
)
def _check_for_db_inconsistencies(self):
    """Log the start of the inconsistency pass, then run it.

    The actual work is delegated to _check_all_invalid_related_objects().
    """
    logging.info('Cleaning db inconsistencies')
    self._check_all_invalid_related_objects()
94 def _check_invalid_related_objects_one_way(self
, first_model
,
95 relation_field
, second_model
):
96 if 'invalid' not in first_model
.get_field_dict():
98 invalid_objects
= list(first_model
.objects
.filter(invalid
=True))
99 first_model
.objects
.populate_relationships(invalid_objects
,
103 for invalid_object
in invalid_objects
:
104 if invalid_object
.related_objects
:
105 related_list
= ', '.join(str(related_object
) for related_object
106 in invalid_object
.related_objects
)
107 error_lines
.append('Invalid %s %s is related to %ss: %s'
108 % (first_model
.__name
__, invalid_object
,
109 second_model
.__name
__, related_list
))
110 related_manager
= getattr(invalid_object
, relation_field
)
111 related_manager
.clear()
115 def _check_invalid_related_objects(self
, first_model
, first_field
,
116 second_model
, second_field
):
117 errors
= self
._check
_invalid
_related
_objects
_one
_way
(
118 first_model
, first_field
, second_model
)
119 errors
.extend(self
._check
_invalid
_related
_objects
_one
_way
(
120 second_model
, second_field
, first_model
))
124 def _check_all_invalid_related_objects(self
):
125 model_pairs
= ((models
.Host
, 'labels', models
.Label
, 'host_set'),
126 (models
.AclGroup
, 'hosts', models
.Host
, 'aclgroup_set'),
127 (models
.AclGroup
, 'users', models
.User
, 'aclgroup_set'),
128 (models
.Test
, 'dependency_labels', models
.Label
,
131 for first_model
, first_field
, second_model
, second_field
in model_pairs
:
132 errors
.extend(self
._check
_invalid
_related
_objects
(
133 first_model
, first_field
, second_model
, second_field
))
136 subject
= ('%s relationships to invalid models, cleaned all' %
138 message
= '\n'.join(errors
)
139 logging
.warning(subject
)
140 logging
.warning(message
)
141 email_manager
.manager
.enqueue_notify_email(subject
, message
)
144 def _clear_inactive_blocks(self
):
145 msg
= 'Clear out blocks for all completed jobs.'
147 # this would be simpler using NOT IN (subquery), but MySQL
148 # treats all IN subqueries as dependent, so this optimizes much
151 DELETE ihq FROM afe_ineligible_host_queues ihq
152 LEFT JOIN (SELECT DISTINCT job_id FROM afe_host_queue_entries
153 WHERE NOT complete) hqe
154 USING (job_id) WHERE hqe.job_id IS NULL""")
157 def _should_reverify_hosts_now(self
):
158 reverify_period_sec
= (scheduler_config
.config
.reverify_period_minutes
160 if reverify_period_sec
== 0:
162 return (self
._last
_reverify
_time
+ reverify_period_sec
) <= time
.time()
165 def _choose_subset_of_hosts_to_reverify(self
, hosts
):
166 """Given hosts needing verification, return a subset to reverify."""
167 max_at_once
= scheduler_config
.config
.reverify_max_hosts_at_once
168 if (max_at_once
> 0 and len(hosts
) > max_at_once
):
169 return random
.sample(hosts
, max_at_once
)
173 def _reverify_dead_hosts(self
):
174 if not self
._should
_reverify
_hosts
_now
():
177 self
._last
_reverify
_time
= time
.time()
178 logging
.info('Checking for dead hosts to reverify')
179 hosts
= models
.Host
.objects
.filter(
180 status
=models
.Host
.Status
.REPAIR_FAILED
,
183 hosts
= hosts
.exclude(
184 protection
=host_protections
.Protection
.DO_NOT_VERIFY
)
189 total_hosts
= len(hosts
)
190 hosts
= self
._choose
_subset
_of
_hosts
_to
_reverify
(hosts
)
191 logging
.info('Reverifying dead hosts (%d of %d) %s', len(hosts
),
192 total_hosts
, ', '.join(host
.hostname
for host
in hosts
))
194 models
.SpecialTask
.schedule_special_task(
195 host
=host
, task
=models
.SpecialTask
.Task
.VERIFY
)
198 class TwentyFourHourUpkeep(PeriodicCleanup
):
199 """Cleanup that runs at the startup of monitor_db and every subsequent
def __init__(self, db, run_at_initialize=True):
    """Configure the cleanup with a fixed 24 hour interval.

    db and run_at_initialize are forwarded to PeriodicCleanup.__init__;
    unlike the base class, run_at_initialize defaults to True here.
    """
    # The interval is expressed in minutes: 24 hours * 60 minutes.
    minutes_per_day = 24 * 60
    super(TwentyFourHourUpkeep, self).__init__(
            db, minutes_per_day, run_at_initialize=run_at_initialize)
211 logging
.info('Running 24 hour clean up')
212 self
._django
_session
_cleanup
()
213 self
._check
_for
_uncleanable
_db
_inconsistencies
()
216 def _django_session_cleanup(self
):
217 """Clean up django_session since django doesn't do it for us.
218 http://www.djangoproject.com/documentation/0.96/sessions/
220 logging
.info('Deleting old sessions from django_session')
221 sql
= 'DELETE FROM django_session WHERE expire_date < NOW()'
222 self
._db
.execute(sql
)
def _check_for_uncleanable_db_inconsistencies(self):
    """Log the start of the pass and run each consistency check in turn.

    The checks run in a fixed order: active-and-complete queue entries,
    hosts with multiple platforms, hosts with no platform, and hosts in
    multiple atomic groups.
    """
    logging.info('Checking for uncleanable DB inconsistencies')

    # Queue entry state.
    self._check_for_active_and_complete_queue_entries()

    # Host platform labels.
    self._check_for_multiple_platform_hosts()
    self._check_for_no_platform_hosts()

    # Host atomic group membership.
    self._check_for_multiple_atomic_group_hosts()
233 def _check_for_active_and_complete_queue_entries(self
):
234 query
= models
.HostQueueEntry
.objects
.filter(active
=True, complete
=True)
235 if query
.count() != 0:
236 subject
= ('%d queue entries found with active=complete=1'
238 lines
= [str(entry
.get_object_dict()) for entry
in query
]
239 self
._send
_inconsistency
_message
(subject
, lines
)
242 def _check_for_multiple_platform_hosts(self
):
243 rows
= self
._db
.execute("""
244 SELECT afe_hosts.id, hostname, COUNT(1) AS platform_count,
245 GROUP_CONCAT(afe_labels.name)
247 INNER JOIN afe_hosts_labels ON
248 afe_hosts.id = afe_hosts_labels.host_id
249 INNER JOIN afe_labels ON afe_hosts_labels.label_id = afe_labels.id
250 WHERE afe_labels.platform
251 GROUP BY afe_hosts.id
252 HAVING platform_count > 1
253 ORDER BY hostname""")
255 subject
= '%s hosts with multiple platforms' % self
._db
.rowcount
256 lines
= [' '.join(str(item
) for item
in row
)
258 self
._send
_inconsistency
_message
(subject
, lines
)
261 def _check_for_no_platform_hosts(self
):
262 rows
= self
._db
.execute("""
265 LEFT JOIN afe_hosts_labels
266 ON afe_hosts.id = afe_hosts_labels.host_id
267 AND afe_hosts_labels.label_id IN (SELECT id FROM afe_labels
269 WHERE NOT afe_hosts.invalid AND afe_hosts_labels.host_id IS NULL""")
271 logging
.warn('%s hosts with no platform\n%s', self
._db
.rowcount
,
272 ', '.join(row
[0] for row
in rows
))
275 def _check_for_multiple_atomic_group_hosts(self
):
276 rows
= self
._db
.execute("""
277 SELECT afe_hosts.id, hostname,
278 COUNT(DISTINCT afe_atomic_groups.name) AS atomic_group_count,
279 GROUP_CONCAT(afe_labels.name),
280 GROUP_CONCAT(afe_atomic_groups.name)
282 INNER JOIN afe_hosts_labels ON
283 afe_hosts.id = afe_hosts_labels.host_id
284 INNER JOIN afe_labels ON afe_hosts_labels.label_id = afe_labels.id
285 INNER JOIN afe_atomic_groups ON
286 afe_labels.atomic_group_id = afe_atomic_groups.id
287 WHERE NOT afe_hosts.invalid AND NOT afe_labels.invalid
288 GROUP BY afe_hosts.id
289 HAVING atomic_group_count > 1
290 ORDER BY hostname""")
292 subject
= '%s hosts with multiple atomic groups' % self
._db
.rowcount
293 lines
= [' '.join(str(item
) for item
in row
)
295 self
._send
_inconsistency
_message
(subject
, lines
)
def _send_inconsistency_message(self, subject, lines):
    """Log an inconsistency report and queue it as a notification email.

    subject is logged at error level and reused as the email subject.
    lines is joined with newlines to form the email body; a body longer
    than 5000 characters is cut to its first 5000 characters and a
    '(truncated)' marker is appended.
    """
    max_body_chars = 5000

    logging.error(subject)

    body = '\n'.join(lines)
    if len(body) > max_body_chars:
        truncated = body[:max_body_chars]
        body = truncated + '\n(truncated)\n'

    email_manager.manager.enqueue_notify_email(subject, body)