utils.external_packages: Introduce minimum_version
[autotest-zwu.git] / scheduler / scheduler_models_unittest.py
blobdf7fe35955ec015760e58458bbcba988feed03a7
1 #!/usr/bin/python
3 import datetime
4 import common
5 from autotest_lib.frontend import setup_django_environment
6 from autotest_lib.frontend.afe import frontend_test_utils
7 from autotest_lib.client.common_lib.test_utils import mock
8 from autotest_lib.client.common_lib.test_utils import unittest
9 from autotest_lib.database import database_connection
10 from autotest_lib.frontend.afe import models, model_attributes
11 from autotest_lib.scheduler import monitor_db_functional_test
12 from autotest_lib.scheduler import scheduler_models
14 _DEBUG = False
17 class BaseSchedulerModelsTest(unittest.TestCase,
18 frontend_test_utils.FrontendTestMixin):
19 _config_section = 'AUTOTEST_WEB'
21 def _do_query(self, sql):
22 self._database.execute(sql)
25 def _set_monitor_stubs(self):
26 # Clear the instance cache as this is a brand new database.
27 scheduler_models.DBObject._clear_instance_cache()
29 self._database = (
30 database_connection.TranslatingDatabase.get_test_database(
31 translators=monitor_db_functional_test._DB_TRANSLATORS))
32 self._database.connect(db_type='django')
33 self._database.debug = _DEBUG
35 self.god.stub_with(scheduler_models, '_db', self._database)
38 def setUp(self):
39 self._frontend_common_setup()
40 self._set_monitor_stubs()
43 def tearDown(self):
44 self._database.disconnect()
45 self._frontend_common_teardown()
48 def _update_hqe(self, set, where=''):
49 query = 'UPDATE afe_host_queue_entries SET ' + set
50 if where:
51 query += ' WHERE ' + where
52 self._do_query(query)
55 class DelayedCallTaskTest(unittest.TestCase):
56 def setUp(self):
57 self.god = mock.mock_god()
60 def tearDown(self):
61 self.god.unstub_all()
64 def test_delayed_call(self):
65 test_time = self.god.create_mock_function('time')
66 test_time.expect_call().and_return(33)
67 test_time.expect_call().and_return(34.01)
68 test_time.expect_call().and_return(34.99)
69 test_time.expect_call().and_return(35.01)
70 def test_callback():
71 test_callback.calls += 1
72 test_callback.calls = 0
73 delay_task = scheduler_models.DelayedCallTask(
74 delay_seconds=2, callback=test_callback,
75 now_func=test_time) # time 33
76 self.assertEqual(35, delay_task.end_time)
77 delay_task.poll() # activates the task and polls it once, time 34.01
78 self.assertEqual(0, test_callback.calls, "callback called early")
79 delay_task.poll() # time 34.99
80 self.assertEqual(0, test_callback.calls, "callback called early")
81 delay_task.poll() # time 35.01
82 self.assertEqual(1, test_callback.calls)
83 self.assert_(delay_task.is_done())
84 self.assert_(delay_task.success)
85 self.assert_(not delay_task.aborted)
86 self.god.check_playback()
89 def test_delayed_call_abort(self):
90 delay_task = scheduler_models.DelayedCallTask(
91 delay_seconds=987654, callback=lambda : None)
92 delay_task.abort()
93 self.assert_(delay_task.aborted)
94 self.assert_(delay_task.is_done())
95 self.assert_(not delay_task.success)
96 self.god.check_playback()
99 class DBObjectTest(BaseSchedulerModelsTest):
100 def test_compare_fields_in_row(self):
101 host = scheduler_models.Host(id=1)
102 fields = list(host._fields)
103 row_data = [getattr(host, fieldname) for fieldname in fields]
104 self.assertEqual({}, host._compare_fields_in_row(row_data))
105 row_data[fields.index('hostname')] = 'spam'
106 self.assertEqual({'hostname': ('host1', 'spam')},
107 host._compare_fields_in_row(row_data))
108 row_data[fields.index('id')] = 23
109 self.assertEqual({'hostname': ('host1', 'spam'), 'id': (1, 23)},
110 host._compare_fields_in_row(row_data))
113 def test_compare_fields_in_row_datetime_ignores_microseconds(self):
114 datetime_with_us = datetime.datetime(2009, 10, 07, 12, 34, 56, 7890)
115 datetime_without_us = datetime.datetime(2009, 10, 07, 12, 34, 56, 0)
116 class TestTable(scheduler_models.DBObject):
117 _table_name = 'test_table'
118 _fields = ('id', 'test_datetime')
119 tt = TestTable(row=[1, datetime_without_us])
120 self.assertEqual({}, tt._compare_fields_in_row([1, datetime_with_us]))
123 def test_always_query(self):
124 host_a = scheduler_models.Host(id=2)
125 self.assertEqual(host_a.hostname, 'host2')
126 self._do_query('UPDATE afe_hosts SET hostname="host2-updated" '
127 'WHERE id=2')
128 host_b = scheduler_models.Host(id=2, always_query=True)
129 self.assert_(host_a is host_b, 'Cached instance not returned.')
130 self.assertEqual(host_a.hostname, 'host2-updated',
131 'Database was not re-queried')
133 # If either of these are called, a query was made when it shouldn't be.
134 host_a._compare_fields_in_row = lambda _: self.fail('eek! a query!')
135 host_a._update_fields_from_row = host_a._compare_fields_in_row
136 host_c = scheduler_models.Host(id=2, always_query=False)
137 self.assert_(host_a is host_c, 'Cached instance not returned')
140 def test_delete(self):
141 host = scheduler_models.Host(id=3)
142 host.delete()
143 host = self.assertRaises(scheduler_models.DBError, scheduler_models.Host, id=3,
144 always_query=False)
145 host = self.assertRaises(scheduler_models.DBError, scheduler_models.Host, id=3,
146 always_query=True)
148 def test_save(self):
149 # Dummy Job to avoid creating a one in the HostQueueEntry __init__.
150 class MockJob(object):
151 def __init__(self, id):
152 pass
153 def tag(self):
154 return 'MockJob'
155 self.god.stub_with(scheduler_models, 'Job', MockJob)
156 hqe = scheduler_models.HostQueueEntry(
157 new_record=True,
158 row=[0, 1, 2, 'Queued', None, 0, 0, 0, '.', None, False, None])
159 hqe.save()
160 new_id = hqe.id
161 # Force a re-query and verify that the correct data was stored.
162 scheduler_models.DBObject._clear_instance_cache()
163 hqe = scheduler_models.HostQueueEntry(id=new_id)
164 self.assertEqual(hqe.id, new_id)
165 self.assertEqual(hqe.job_id, 1)
166 self.assertEqual(hqe.host_id, 2)
167 self.assertEqual(hqe.status, 'Queued')
168 self.assertEqual(hqe.meta_host, None)
169 self.assertEqual(hqe.active, False)
170 self.assertEqual(hqe.complete, False)
171 self.assertEqual(hqe.deleted, False)
172 self.assertEqual(hqe.execution_subdir, '.')
173 self.assertEqual(hqe.atomic_group_id, None)
174 self.assertEqual(hqe.started_on, None)
177 class HostTest(BaseSchedulerModelsTest):
178 def test_cmp_for_sort(self):
179 expected_order = [
180 'alice', 'Host1', 'host2', 'host3', 'host09', 'HOST010',
181 'host10', 'host11', 'yolkfolk']
182 hostname_idx = list(scheduler_models.Host._fields).index('hostname')
183 row = [None] * len(scheduler_models.Host._fields)
184 hosts = []
185 for hostname in expected_order:
186 row[hostname_idx] = hostname
187 hosts.append(scheduler_models.Host(row=row, new_record=True))
189 host1 = hosts[expected_order.index('Host1')]
190 host010 = hosts[expected_order.index('HOST010')]
191 host10 = hosts[expected_order.index('host10')]
192 host3 = hosts[expected_order.index('host3')]
193 alice = hosts[expected_order.index('alice')]
194 self.assertEqual(0, scheduler_models.Host.cmp_for_sort(host10, host10))
195 self.assertEqual(1, scheduler_models.Host.cmp_for_sort(host10, host010))
196 self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host010, host10))
197 self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host1, host10))
198 self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host1, host010))
199 self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host3, host10))
200 self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host3, host010))
201 self.assertEqual(1, scheduler_models.Host.cmp_for_sort(host3, host1))
202 self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host1, host3))
203 self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(alice, host3))
204 self.assertEqual(1, scheduler_models.Host.cmp_for_sort(host3, alice))
205 self.assertEqual(0, scheduler_models.Host.cmp_for_sort(alice, alice))
207 hosts.sort(cmp=scheduler_models.Host.cmp_for_sort)
208 self.assertEqual(expected_order, [h.hostname for h in hosts])
210 hosts.reverse()
211 hosts.sort(cmp=scheduler_models.Host.cmp_for_sort)
212 self.assertEqual(expected_order, [h.hostname for h in hosts])
215 class HostQueueEntryTest(BaseSchedulerModelsTest):
216 def _create_hqe(self, dependency_labels=(), **create_job_kwargs):
217 job = self._create_job(**create_job_kwargs)
218 for label in dependency_labels:
219 job.dependency_labels.add(label)
220 hqes = list(scheduler_models.HostQueueEntry.fetch(where='job_id=%d' % job.id))
221 self.assertEqual(1, len(hqes))
222 return hqes[0]
225 def _check_hqe_labels(self, hqe, expected_labels):
226 expected_labels = set(expected_labels)
227 label_names = set(label.name for label in hqe.get_labels())
228 self.assertEqual(expected_labels, label_names)
231 def test_get_labels_empty(self):
232 hqe = self._create_hqe(hosts=[1])
233 labels = list(hqe.get_labels())
234 self.assertEqual([], labels)
237 def test_get_labels_metahost(self):
238 hqe = self._create_hqe(metahosts=[2])
239 self._check_hqe_labels(hqe, ['label2'])
242 def test_get_labels_dependancies(self):
243 hqe = self._create_hqe(dependency_labels=(self.label3, self.label4),
244 metahosts=[1])
245 self._check_hqe_labels(hqe, ['label1', 'label3', 'label4'])
248 class JobTest(BaseSchedulerModelsTest):
249 def setUp(self):
250 super(JobTest, self).setUp()
252 def _mock_create(**kwargs):
253 task = models.SpecialTask(**kwargs)
254 task.save()
255 self._task = task
256 self.god.stub_with(models.SpecialTask.objects, 'create', _mock_create)
259 def _test_pre_job_tasks_helper(self):
261 Calls HQE._do_schedule_pre_job_tasks() and returns the created special
262 task
264 self._task = None
265 queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
266 queue_entry._do_schedule_pre_job_tasks()
267 return self._task
270 def test_job_request_abort(self):
271 django_job = self._create_job(hosts=[5, 6], atomic_group=1)
272 job = scheduler_models.Job(django_job.id)
273 job.request_abort()
274 django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
275 for hqe in django_hqes:
276 self.assertTrue(hqe.aborted)
279 def test__atomic_and_has_started__on_atomic(self):
280 self._create_job(hosts=[5, 6], atomic_group=1)
281 job = scheduler_models.Job.fetch('id = 1')[0]
282 self.assertFalse(job._atomic_and_has_started())
284 self._update_hqe("status='Pending'")
285 self.assertFalse(job._atomic_and_has_started())
286 self._update_hqe("status='Verifying'")
287 self.assertFalse(job._atomic_and_has_started())
288 self.assertFalse(job._atomic_and_has_started())
289 self._update_hqe("status='Failed'")
290 self.assertFalse(job._atomic_and_has_started())
291 self._update_hqe("status='Stopped'")
292 self.assertFalse(job._atomic_and_has_started())
294 self._update_hqe("status='Starting'")
295 self.assertTrue(job._atomic_and_has_started())
296 self._update_hqe("status='Completed'")
297 self.assertTrue(job._atomic_and_has_started())
298 self._update_hqe("status='Aborted'")
301 def test__atomic_and_has_started__not_atomic(self):
302 self._create_job(hosts=[1, 2])
303 job = scheduler_models.Job.fetch('id = 1')[0]
304 self.assertFalse(job._atomic_and_has_started())
305 self._update_hqe("status='Starting'")
306 self.assertFalse(job._atomic_and_has_started())
309 def _check_special_task(self, task, task_type, queue_entry_id=None):
310 self.assertEquals(task.task, task_type)
311 self.assertEquals(task.host.id, 1)
312 if queue_entry_id:
313 self.assertEquals(task.queue_entry.id, queue_entry_id)
316 def test_run_asynchronous(self):
317 self._create_job(hosts=[1, 2])
319 task = self._test_pre_job_tasks_helper()
321 self._check_special_task(task, models.SpecialTask.Task.VERIFY, 1)
324 def test_run_asynchronous_skip_verify(self):
325 job = self._create_job(hosts=[1, 2])
326 job.run_verify = False
327 job.save()
329 task = self._test_pre_job_tasks_helper()
331 self.assertEquals(task, None)
334 def test_run_synchronous_verify(self):
335 self._create_job(hosts=[1, 2], synchronous=True)
337 task = self._test_pre_job_tasks_helper()
339 self._check_special_task(task, models.SpecialTask.Task.VERIFY, 1)
342 def test_run_synchronous_skip_verify(self):
343 job = self._create_job(hosts=[1, 2], synchronous=True)
344 job.run_verify = False
345 job.save()
347 task = self._test_pre_job_tasks_helper()
349 self.assertEquals(task, None)
352 def test_run_atomic_group_already_started(self):
353 self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
354 self._update_hqe("status='Starting', execution_subdir=''")
356 job = scheduler_models.Job.fetch('id = 1')[0]
357 queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
358 assert queue_entry.job is job
359 self.assertEqual(None, job.run(queue_entry))
361 self.god.check_playback()
364 def test_reboot_before_always(self):
365 job = self._create_job(hosts=[1])
366 job.reboot_before = model_attributes.RebootBefore.ALWAYS
367 job.save()
369 task = self._test_pre_job_tasks_helper()
371 self._check_special_task(task, models.SpecialTask.Task.CLEANUP)
374 def _test_reboot_before_if_dirty_helper(self, expect_reboot):
375 job = self._create_job(hosts=[1])
376 job.reboot_before = model_attributes.RebootBefore.IF_DIRTY
377 job.save()
379 task = self._test_pre_job_tasks_helper()
380 if expect_reboot:
381 task_type = models.SpecialTask.Task.CLEANUP
382 else:
383 task_type = models.SpecialTask.Task.VERIFY
384 self._check_special_task(task, task_type)
387 def test_reboot_before_if_dirty(self):
388 models.Host.smart_get(1).update_object(dirty=True)
389 self._test_reboot_before_if_dirty_helper(True)
392 def test_reboot_before_not_dirty(self):
393 models.Host.smart_get(1).update_object(dirty=False)
394 self._test_reboot_before_if_dirty_helper(False)
397 def test_next_group_name(self):
398 django_job = self._create_job(metahosts=[1])
399 job = scheduler_models.Job(id=django_job.id)
400 self.assertEqual('group0', job._next_group_name())
402 for hqe in django_job.hostqueueentry_set.filter():
403 hqe.execution_subdir = 'my_rack.group0'
404 hqe.save()
405 self.assertEqual('my_rack.group1', job._next_group_name('my/rack'))
408 if __name__ == '__main__':
409 unittest.main()