3 import gc
, logging
, time
5 from autotest_lib
.frontend
import setup_django_environment
6 from autotest_lib
.frontend
.afe
import frontend_test_utils
7 from autotest_lib
.client
.common_lib
.test_utils
import mock
8 from autotest_lib
.client
.common_lib
.test_utils
import unittest
9 from autotest_lib
.database
import database_connection
10 from autotest_lib
.frontend
.afe
import models
11 from autotest_lib
.scheduler
import monitor_db
, drone_manager
, email_manager
12 from autotest_lib
.scheduler
import scheduler_config
, gc_stats
, host_scheduler
13 from autotest_lib
.scheduler
import monitor_db_functional_test
14 from autotest_lib
.scheduler
import scheduler_models
19 class DummyAgentTask(object):
21 owner_username
= 'my_user'
23 def get_drone_hostnames_allowed(self
):
27 class DummyAgent(object):
34 self
.task
= DummyAgentTask()
45 def set_done(self
, done
):
49 class IsRow(mock
.argument_comparator
):
50 def __init__(self
, row_id
):
54 def is_satisfied_by(self
, parameter
):
55 return list(parameter
)[0] == self
.row_id
59 return 'row with id %s' % self
.row_id
62 class IsAgentWithTask(mock
.argument_comparator
):
63 def __init__(self
, task
):
67 def is_satisfied_by(self
, parameter
):
68 if not isinstance(parameter
, monitor_db
.Agent
):
70 tasks
= list(parameter
.queue
.queue
)
73 return tasks
[0] == self
._task
76 def _set_host_and_qe_ids(agent_or_task
, id_list
=None):
79 agent_or_task
.host_ids
= agent_or_task
.queue_entry_ids
= id_list
82 class BaseSchedulerTest(unittest
.TestCase
,
83 frontend_test_utils
.FrontendTestMixin
):
84 _config_section
= 'AUTOTEST_WEB'
def _do_query(self, sql):
    """Execute a raw SQL statement against the test database."""
    self._database.execute(sql)
90 def _set_monitor_stubs(self
):
91 # Clear the instance cache as this is a brand new database.
92 scheduler_models
.DBObject
._clear
_instance
_cache
()
95 database_connection
.TranslatingDatabase
.get_test_database(
96 translators
=monitor_db_functional_test
._DB
_TRANSLATORS
))
97 self
._database
.connect(db_type
='django')
98 self
._database
.debug
= _DEBUG
100 self
.god
.stub_with(monitor_db
, '_db', self
._database
)
101 self
.god
.stub_with(scheduler_models
, '_db', self
._database
)
102 self
.god
.stub_with(drone_manager
.instance(), '_results_dir',
104 self
.god
.stub_with(drone_manager
.instance(), '_temporary_directory',
107 monitor_db
.initialize_globals()
108 scheduler_models
.initialize_globals()
112 self
._frontend
_common
_setup
()
113 self
._set
_monitor
_stubs
()
114 self
._dispatcher
= monitor_db
.Dispatcher()
118 self
._database
.disconnect()
119 self
._frontend
_common
_teardown
()
122 def _update_hqe(self
, set, where
=''):
123 query
= 'UPDATE afe_host_queue_entries SET ' + set
125 query
+= ' WHERE ' + where
126 self
._do
_query
(query
)
129 class DispatcherSchedulingTest(BaseSchedulerTest
):
134 super(DispatcherSchedulingTest
, self
).tearDown()
137 def _set_monitor_stubs(self
):
138 super(DispatcherSchedulingTest
, self
)._set
_monitor
_stubs
()
140 def hqe__do_schedule_pre_job_tasks_stub(queue_entry
):
141 """Called by HostQueueEntry.run()."""
142 self
._record
_job
_scheduled
(queue_entry
.job
.id, queue_entry
.host
.id)
143 queue_entry
.set_status('Starting')
145 self
.god
.stub_with(scheduler_models
.HostQueueEntry
,
146 '_do_schedule_pre_job_tasks',
147 hqe__do_schedule_pre_job_tasks_stub
)
149 def hqe_queue_log_record_stub(self
, log_line
):
150 """No-Op to avoid calls down to the _drone_manager during tests."""
152 self
.god
.stub_with(scheduler_models
.HostQueueEntry
, 'queue_log_record',
153 hqe_queue_log_record_stub
)
156 def _record_job_scheduled(self
, job_id
, host_id
):
157 record
= (job_id
, host_id
)
158 self
.assert_(record
not in self
._jobs
_scheduled
,
159 'Job %d scheduled on host %d twice' %
161 self
._jobs
_scheduled
.append(record
)
def _assert_job_scheduled_on(self, job_id, host_id):
    """Assert the (job, host) pairing was recorded, then consume it.

    Consuming the record lets _check_for_extra_schedulings() verify that
    nothing unexpected remains afterwards.
    """
    expected = (job_id, host_id)
    failure_message = ('Job %d not scheduled on host %d as expected\n'
                       'Jobs scheduled: %s' %
                       (job_id, host_id, self._jobs_scheduled))
    self.assert_(expected in self._jobs_scheduled, failure_message)
    self._jobs_scheduled.remove(expected)
173 def _assert_job_scheduled_on_number_of(self
, job_id
, host_ids
, number
):
174 """Assert job was scheduled on exactly number hosts out of a set."""
176 for host_id
in host_ids
:
177 record
= (job_id
, host_id
)
178 if record
in self
._jobs
_scheduled
:
180 self
._jobs
_scheduled
.remove(record
)
181 if len(found
) < number
:
182 self
.fail('Job %d scheduled on fewer than %d hosts in %s.\n'
183 'Jobs scheduled: %s' % (job_id
, number
, host_ids
, found
))
184 elif len(found
) > number
:
185 self
.fail('Job %d scheduled on more than %d hosts in %s.\n'
186 'Jobs scheduled: %s' % (job_id
, number
, host_ids
, found
))
def _check_for_extra_schedulings(self):
    """Fail if any recorded scheduling was never asserted on."""
    leftovers = self._jobs_scheduled
    if leftovers:
        self.fail('Extra jobs scheduled: ' + str(leftovers))
def _convert_jobs_to_metahosts(self, *job_ids):
    """Rewrite the given jobs' host assignments into metahost requests."""
    id_csv = ','.join(str(job_id) for job_id in job_ids)
    sql = ('UPDATE afe_host_queue_entries SET '
           'meta_host=host_id, host_id=NULL '
           'WHERE job_id IN (' + id_csv + ')')
    self._do_query(sql)
202 def _lock_host(self
, host_id
):
203 self
._do
_query
('UPDATE afe_hosts SET locked=1 WHERE id=' +
208 super(DispatcherSchedulingTest
, self
).setUp()
209 self
._jobs
_scheduled
= []
def _run_scheduler(self):
    """Run enough dispatcher cycles for all scheduling to settle."""
    for _ in xrange(2): # metahost scheduling can take two cycles
        self._dispatcher._schedule_new_jobs()
def _test_basic_scheduling_helper(self, use_metahosts):
    """Two one-host jobs each get scheduled on their own host."""
    self._create_job_simple([1], use_metahosts)
    self._create_job_simple([2], use_metahosts)
    self._run_scheduler()
    self._assert_job_scheduled_on(1, 1)
    self._assert_job_scheduled_on(2, 2)
    self._check_for_extra_schedulings()
def _test_priorities_helper(self, use_metahosts):
    """Higher-priority jobs win a host; ties go to the earlier job."""
    self._create_job_simple([1], use_metahosts)
    self._create_job_simple([2], use_metahosts)
    self._create_job_simple([1,2], use_metahosts)
    self._create_job_simple([1], use_metahosts, priority=1)
    self._run_scheduler()
    self._assert_job_scheduled_on(4, 1) # higher priority
    self._assert_job_scheduled_on(2, 2) # earlier job over later
    self._check_for_extra_schedulings()
239 def _test_hosts_ready_helper(self
, use_metahosts
):
241 Only hosts that are status=Ready, unlocked and not invalid get
244 self
._create
_job
_simple
([1], use_metahosts
)
245 self
._do
_query
('UPDATE afe_hosts SET status="Running" WHERE id=1')
246 self
._run
_scheduler
()
247 self
._check
_for
_extra
_schedulings
()
249 self
._do
_query
('UPDATE afe_hosts SET status="Ready", locked=1 '
251 self
._run
_scheduler
()
252 self
._check
_for
_extra
_schedulings
()
254 self
._do
_query
('UPDATE afe_hosts SET locked=0, invalid=1 '
256 self
._run
_scheduler
()
257 if not use_metahosts
:
258 self
._assert
_job
_scheduled
_on
(1, 1)
259 self
._check
_for
_extra
_schedulings
()
def _test_hosts_idle_helper(self, use_metahosts):
    'Only idle hosts get scheduled'
    # host 1 is already busy with an active job, so the second job
    # must not be scheduled anywhere.
    self._create_job(hosts=[1], active=True)
    self._create_job_simple([1], use_metahosts)
    self._run_scheduler()
    self._check_for_extra_schedulings()
def _test_obey_ACLs_helper(self, use_metahosts):
    """With host 1 removed from all ACL groups, nothing gets scheduled."""
    self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
    self._create_job_simple([1], use_metahosts)
    self._run_scheduler()
    self._check_for_extra_schedulings()
def test_basic_scheduling(self):
    """Non-metahost flavor of the basic scheduling scenario."""
    self._test_basic_scheduling_helper(False)
def test_priorities(self):
    """Non-metahost flavor of the priority-ordering scenario."""
    self._test_priorities_helper(False)
def test_hosts_ready(self):
    """Non-metahost flavor of the hosts-ready scenario."""
    self._test_hosts_ready_helper(False)
def test_hosts_idle(self):
    """Non-metahost flavor of the hosts-idle scenario."""
    self._test_hosts_idle_helper(False)
def test_obey_ACLs(self):
    """Non-metahost flavor of the ACL-enforcement scenario."""
    self._test_obey_ACLs_helper(False)
def test_one_time_hosts_ignore_ACLs(self):
    """A job on an invalid (one-time-style) host schedules despite ACLs."""
    self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
    self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=1')
    self._create_job_simple([1])
    self._run_scheduler()
    self._assert_job_scheduled_on(1, 1)
    self._check_for_extra_schedulings()
306 def test_non_metahost_on_invalid_host(self
):
308 Non-metahost entries can get scheduled on invalid hosts (this is how
309 one-time hosts work).
311 self
._do
_query
('UPDATE afe_hosts SET invalid=1')
312 self
._test
_basic
_scheduling
_helper
(False)
315 def test_metahost_scheduling(self
):
317 Basic metahost scheduling
319 self
._test
_basic
_scheduling
_helper
(True)
def test_metahost_priorities(self):
    """Metahost flavor of the priority-ordering scenario."""
    self._test_priorities_helper(True)
def test_metahost_hosts_ready(self):
    """Metahost flavor of the hosts-ready scenario."""
    self._test_hosts_ready_helper(True)
def test_metahost_hosts_idle(self):
    """Metahost flavor of the hosts-idle scenario."""
    self._test_hosts_idle_helper(True)
def test_metahost_obey_ACLs(self):
    """Metahost flavor of the ACL-enforcement scenario."""
    self._test_obey_ACLs_helper(True)
def _setup_test_only_if_needed_labels(self):
    """Tag host1 with label3 and return a metahost job for host 1."""
    # apply only_if_needed label3 to host1
    models.Host.smart_get('host1').labels.add(self.label3)
    return self._create_job_simple([1], use_metahost=True)
def test_only_if_needed_labels_avoids_host(self):
    """A host with an only_if_needed label is skipped by unrelated jobs.

    Fix: the returned job object was bound to an unused local; the
    assignment has been dropped.
    """
    self._setup_test_only_if_needed_labels()
    # if the job doesn't depend on label3, there should be no scheduling
    self._run_scheduler()
    self._check_for_extra_schedulings()
def test_only_if_needed_labels_schedules(self):
    """A job that depends on the only_if_needed label does schedule."""
    job = self._setup_test_only_if_needed_labels()
    job.dependency_labels.add(self.label3)
    self._run_scheduler()
    self._assert_job_scheduled_on(1, 1)
    self._check_for_extra_schedulings()
def test_only_if_needed_labels_via_metahost(self):
    """Using the only_if_needed label as the metahost also schedules."""
    job = self._setup_test_only_if_needed_labels()
    job.dependency_labels.add(self.label3)
    # should also work if the metahost is the only_if_needed label
    self._do_query('DELETE FROM afe_jobs_dependency_labels')
    self._create_job(metahosts=[3])
    self._run_scheduler()
    self._assert_job_scheduled_on(2, 1)
    self._check_for_extra_schedulings()
370 def test_nonmetahost_over_metahost(self
):
372 Non-metahost entries should take priority over metahost entries
375 self
._create
_job
(metahosts
=[1])
376 self
._create
_job
(hosts
=[1])
377 self
._run
_scheduler
()
378 self
._assert
_job
_scheduled
_on
(2, 1)
379 self
._check
_for
_extra
_schedulings
()
382 def test_metahosts_obey_blocks(self
):
384 Metahosts can't get scheduled on hosts already scheduled for
387 self
._create
_job
(metahosts
=[1], hosts
=[1])
388 # make the nonmetahost entry complete, so the metahost can try
390 self
._update
_hqe
(set='complete = 1', where
='host_id=1')
391 self
._run
_scheduler
()
392 self
._check
_for
_extra
_schedulings
()
395 # TODO(gps): These should probably live in their own TestCase class
396 # specific to testing HostScheduler methods directly. It was convenient
397 # to put it here for now to share existing test environment setup code.
398 def test_HostScheduler_check_atomic_group_labels(self
):
399 normal_job
= self
._create
_job
(metahosts
=[0])
400 atomic_job
= self
._create
_job
(atomic_group
=1)
401 # Indirectly initialize the internal state of the host scheduler.
402 self
._dispatcher
._refresh
_pending
_queue
_entries
()
404 atomic_hqe
= scheduler_models
.HostQueueEntry
.fetch(where
='job_id=%d' %
406 normal_hqe
= scheduler_models
.HostQueueEntry
.fetch(where
='job_id=%d' %
409 host_scheduler
= self
._dispatcher
._host
_scheduler
410 self
.assertTrue(host_scheduler
._check
_atomic
_group
_labels
(
411 [self
.label4
.id], atomic_hqe
))
412 self
.assertFalse(host_scheduler
._check
_atomic
_group
_labels
(
413 [self
.label4
.id], normal_hqe
))
414 self
.assertFalse(host_scheduler
._check
_atomic
_group
_labels
(
415 [self
.label5
.id, self
.label6
.id, self
.label7
.id], normal_hqe
))
416 self
.assertTrue(host_scheduler
._check
_atomic
_group
_labels
(
417 [self
.label4
.id, self
.label6
.id], atomic_hqe
))
418 self
.assertTrue(host_scheduler
._check
_atomic
_group
_labels
(
419 [self
.label4
.id, self
.label5
.id],
423 def test_HostScheduler_get_host_atomic_group_id(self
):
424 job
= self
._create
_job
(metahosts
=[self
.label6
.id])
425 queue_entry
= scheduler_models
.HostQueueEntry
.fetch(
426 where
='job_id=%d' % job
.id)[0]
427 # Indirectly initialize the internal state of the host scheduler.
428 self
._dispatcher
._refresh
_pending
_queue
_entries
()
430 # Test the host scheduler
431 host_scheduler
= self
._dispatcher
._host
_scheduler
433 # Two labels each in a different atomic group. This should log an
434 # error and continue.
435 orig_logging_error
= logging
.error
436 def mock_logging_error(message
, *args
):
437 mock_logging_error
._num
_calls
+= 1
438 # Test the logging call itself, we just wrapped it to count it.
439 orig_logging_error(message
, *args
)
440 mock_logging_error
._num
_calls
= 0
441 self
.god
.stub_with(logging
, 'error', mock_logging_error
)
442 self
.assertNotEquals(None, host_scheduler
._get
_host
_atomic
_group
_id
(
443 [self
.label4
.id, self
.label8
.id], queue_entry
))
444 self
.assertTrue(mock_logging_error
._num
_calls
> 0)
445 self
.god
.unstub(logging
, 'error')
447 # Two labels both in the same atomic group, this should not raise an
448 # error, it will merely cause the job to schedule on the intersection.
449 self
.assertEquals(1, host_scheduler
._get
_host
_atomic
_group
_id
(
450 [self
.label4
.id, self
.label5
.id]))
452 self
.assertEquals(None, host_scheduler
._get
_host
_atomic
_group
_id
([]))
453 self
.assertEquals(None, host_scheduler
._get
_host
_atomic
_group
_id
(
454 [self
.label3
.id, self
.label7
.id, self
.label6
.id]))
455 self
.assertEquals(1, host_scheduler
._get
_host
_atomic
_group
_id
(
456 [self
.label4
.id, self
.label7
.id, self
.label6
.id]))
457 self
.assertEquals(1, host_scheduler
._get
_host
_atomic
_group
_id
(
458 [self
.label7
.id, self
.label5
.id]))
def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self):
    """Atomic-group hosts are off-limits to plain metahost jobs."""
    # Create a job scheduled to run on label6.
    self._create_job(metahosts=[self.label6.id])
    self._run_scheduler()
    # label6 only has hosts that are in atomic groups associated with it,
    # there should be no scheduling.
    self._check_for_extra_schedulings()
def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self):
    """Same block applies when the metahost is itself an atomic label."""
    # Create a job scheduled to run on label5.  This is an atomic group
    # label but this job does not request atomic group scheduling.
    self._create_job(metahosts=[self.label5.id])
    self._run_scheduler()
    # label5 only has hosts that are in atomic groups associated with it,
    # there should be no scheduling.
    self._check_for_extra_schedulings()
480 def test_atomic_group_scheduling_basics(self
):
481 # Create jobs scheduled to run on an atomic group.
482 job_a
= self
._create
_job
(synchronous
=True, metahosts
=[self
.label4
.id],
484 job_b
= self
._create
_job
(synchronous
=True, metahosts
=[self
.label5
.id],
486 self
._run
_scheduler
()
487 # atomic_group.max_number_of_machines was 2 so we should run on 2.
488 self
._assert
_job
_scheduled
_on
_number
_of
(job_a
.id, (5, 6, 7), 2)
489 self
._assert
_job
_scheduled
_on
(job_b
.id, 8) # label5
490 self
._assert
_job
_scheduled
_on
(job_b
.id, 9) # label5
491 self
._check
_for
_extra
_schedulings
()
493 # The three host label4 atomic group still has one host available.
494 # That means a job with a synch_count of 1 asking to be scheduled on
495 # the atomic group can still use the final machine.
497 # This may seem like a somewhat odd use case. It allows the use of an
498 # atomic group as a set of machines to run smaller jobs within (a set
499 # of hosts configured for use in network tests with eachother perhaps?)
500 onehost_job
= self
._create
_job
(atomic_group
=1)
501 self
._run
_scheduler
()
502 self
._assert
_job
_scheduled
_on
_number
_of
(onehost_job
.id, (5, 6, 7), 1)
503 self
._check
_for
_extra
_schedulings
()
505 # No more atomic groups have hosts available, no more jobs should
507 self
._create
_job
(atomic_group
=1)
508 self
._run
_scheduler
()
509 self
._check
_for
_extra
_schedulings
()
def test_atomic_group_scheduling_obeys_acls(self):
    """An atomic-group request on ACL-denied hosts schedules nothing.

    Fix: the job object returned by _create_job was bound to an unused
    local; the assignment has been dropped.
    """
    # Request scheduling on a specific atomic label but be denied by ACLs.
    self._do_query('DELETE FROM afe_acl_groups_hosts '
                   'WHERE host_id in (8,9)')
    self._create_job(metahosts=[self.label5.id], atomic_group=1)
    self._run_scheduler()
    self._check_for_extra_schedulings()
def test_atomic_group_scheduling_dependency_label_exclude(self):
    """A dependency label matching no group hosts prevents scheduling."""
    # A dependency label that matches no hosts in the atomic group.
    job_a = self._create_job(atomic_group=1)
    job_a.dependency_labels.add(self.label3)
    self._run_scheduler()
    self._check_for_extra_schedulings()
529 def test_atomic_group_scheduling_metahost_dependency_label_exclude(self
):
530 # A metahost and dependency label that excludes too many hosts.
531 job_b
= self
._create
_job
(synchronous
=True, metahosts
=[self
.label4
.id],
533 job_b
.dependency_labels
.add(self
.label7
)
534 self
._run
_scheduler
()
535 self
._check
_for
_extra
_schedulings
()
def test_atomic_group_scheduling_dependency_label_match(self):
    """Scheduling lands in the group label whose hosts carry label7."""
    # A dependency label that exists on enough atomic group hosts in only
    # one of the two atomic group labels.
    job_c = self._create_job(synchronous=True, atomic_group=1)
    job_c.dependency_labels.add(self.label7)
    self._run_scheduler()
    self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2)
    self._check_for_extra_schedulings()
def test_atomic_group_scheduling_no_metahost(self):
    """An atomic-group job with no metahost picks a group by itself."""
    # Force it to schedule on the other group for a reliable test.
    self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
    # An atomic job without a metahost.
    job = self._create_job(synchronous=True, atomic_group=1)
    self._run_scheduler()
    self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2)
    self._check_for_extra_schedulings()
558 def test_atomic_group_scheduling_partial_group(self
):
559 # Make one host in labels[3] unavailable so that there are only two
560 # hosts left in the group.
561 self
._do
_query
('UPDATE afe_hosts SET status="Repair Failed" WHERE id=5')
562 job
= self
._create
_job
(synchronous
=True, metahosts
=[self
.label4
.id],
564 self
._run
_scheduler
()
565 # Verify that it was scheduled on the 2 ready hosts in that group.
566 self
._assert
_job
_scheduled
_on
(job
.id, 6)
567 self
._assert
_job
_scheduled
_on
(job
.id, 7)
568 self
._check
_for
_extra
_schedulings
()
571 def test_atomic_group_scheduling_not_enough_available(self
):
572 # Mark some hosts in each atomic group label as not usable.
573 # One host running, another invalid in the first group label.
574 self
._do
_query
('UPDATE afe_hosts SET status="Running" WHERE id=5')
575 self
._do
_query
('UPDATE afe_hosts SET invalid=1 WHERE id=6')
576 # One host invalid in the second group label.
577 self
._do
_query
('UPDATE afe_hosts SET invalid=1 WHERE id=9')
578 # Nothing to schedule when no group label has enough (2) good hosts..
579 self
._create
_job
(atomic_group
=1, synchronous
=True)
580 self
._run
_scheduler
()
581 # There are not enough hosts in either atomic group,
582 # No more scheduling should occur.
583 self
._check
_for
_extra
_schedulings
()
585 # Now create an atomic job that has a synch count of 1. It should
586 # schedule on exactly one of the hosts.
587 onehost_job
= self
._create
_job
(atomic_group
=1)
588 self
._run
_scheduler
()
589 self
._assert
_job
_scheduled
_on
_number
_of
(onehost_job
.id, (7, 8), 1)
592 def test_atomic_group_scheduling_no_valid_hosts(self
):
593 self
._do
_query
('UPDATE afe_hosts SET invalid=1 WHERE id in (8,9)')
594 self
._create
_job
(synchronous
=True, metahosts
=[self
.label5
.id],
596 self
._run
_scheduler
()
597 # no hosts in the selected group and label are valid. no schedulings.
598 self
._check
_for
_extra
_schedulings
()
def test_atomic_group_scheduling_metahost_works(self):
    """Atomic-group scheduling still honors the metahost restriction."""
    # Test that atomic group scheduling also obeys metahosts.
    self._create_job(metahosts=[0], atomic_group=1)
    self._run_scheduler()
    # There are no atomic group hosts that also have that metahost.
    self._check_for_extra_schedulings()

    job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1)
    self._run_scheduler()
    self._assert_job_scheduled_on(job_b.id, 8)
    self._assert_job_scheduled_on(job_b.id, 9)
    self._check_for_extra_schedulings()
def test_atomic_group_skips_ineligible_hosts(self):
    """IneligibleHostQueue rows exclude hosts from atomic scheduling."""
    # Test hosts marked ineligible for this job are not eligible.
    # How would this ever happen anyways?
    job = self._create_job(metahosts=[self.label4.id], atomic_group=1)
    models.IneligibleHostQueue.objects.create(job=job, host_id=5)
    models.IneligibleHostQueue.objects.create(job=job, host_id=6)
    models.IneligibleHostQueue.objects.create(job=job, host_id=7)
    self._run_scheduler()
    # No scheduling should occur as all desired hosts were ineligible.
    self._check_for_extra_schedulings()
627 def test_atomic_group_scheduling_fail(self
):
628 # If synch_count is > the atomic group number of machines, the job
629 # should be aborted immediately.
630 model_job
= self
._create
_job
(synchronous
=True, atomic_group
=1)
631 model_job
.synch_count
= 4
633 job
= scheduler_models
.Job(id=model_job
.id)
634 self
._run
_scheduler
()
635 self
._check
_for
_extra
_schedulings
()
636 queue_entries
= job
.get_host_queue_entries()
637 self
.assertEqual(1, len(queue_entries
))
638 self
.assertEqual(queue_entries
[0].status
,
639 models
.HostQueueEntry
.Status
.ABORTED
)
642 def test_atomic_group_no_labels_no_scheduling(self
):
643 # Never schedule on atomic groups marked invalid.
644 job
= self
._create
_job
(metahosts
=[self
.label5
.id], synchronous
=True,
646 # Deleting an atomic group via the frontend marks it invalid and
647 # removes all label references to the group. The job now references
648 # an invalid atomic group with no labels associated with it.
649 self
.label5
.atomic_group
.invalid
= True
650 self
.label5
.atomic_group
.save()
651 self
.label5
.atomic_group
= None
654 self
._run
_scheduler
()
655 self
._check
_for
_extra
_schedulings
()
def test_schedule_directly_on_atomic_group_host_fail(self):
    """A plain host-targeted job must not run on an atomic-group host.

    Fix: the job object was bound to an unused local; the assignment
    has been dropped.
    """
    # Scheduling a job directly on hosts in an atomic group must
    # fail to avoid users inadvertently holding up the use of an
    # entire atomic group by using the machines individually.
    self._create_job(hosts=[5])
    self._run_scheduler()
    self._check_for_extra_schedulings()
def test_schedule_directly_on_atomic_group_host(self):
    """Direct host scheduling works when the HQE names the atomic group."""
    # Scheduling a job directly on one host in an atomic group will
    # work when the atomic group is listed on the HQE in addition
    # to the host (assuming the sync count is 1).
    job = self._create_job(hosts=[5], atomic_group=1)
    self._run_scheduler()
    self._assert_job_scheduled_on(job.id, 5)
    self._check_for_extra_schedulings()
def test_schedule_directly_on_atomic_group_hosts_sync2(self):
    """A synchronous 2-host job lands on both named atomic-group hosts."""
    job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True)
    self._run_scheduler()
    self._assert_job_scheduled_on(job.id, 5)
    self._assert_job_scheduled_on(job.id, 8)
    self._check_for_extra_schedulings()
def test_schedule_directly_on_atomic_group_hosts_wrong_group(self):
    """Hosts named with a mismatched atomic group are never scheduled.

    Fix: the job object was bound to an unused local; the assignment
    has been dropped.
    """
    self._create_job(hosts=[5,8], atomic_group=2, synchronous=True)
    self._run_scheduler()
    self._check_for_extra_schedulings()
def test_only_schedule_queued_entries(self):
    """An HQE already marked active is never scheduled again."""
    self._create_job(metahosts=[1])
    self._update_hqe(set='active=1, host_id=2')
    self._run_scheduler()
    self._check_for_extra_schedulings()
def test_no_ready_hosts(self):
    """Nothing schedules when every host is in Repair Failed."""
    self._create_job(hosts=[1])
    self._do_query('UPDATE afe_hosts SET status="Repair Failed"')
    self._run_scheduler()
    self._check_for_extra_schedulings()
705 def test_garbage_collection(self
):
706 self
.god
.stub_with(self
._dispatcher
, '_seconds_between_garbage_stats',
708 self
.god
.stub_function(gc
, 'collect')
709 self
.god
.stub_function(gc_stats
, '_log_garbage_collector_stats')
710 gc
.collect
.expect_call().and_return(0)
711 gc_stats
._log
_garbage
_collector
_stats
.expect_call()
712 # Force a garbage collection run
713 self
._dispatcher
._last
_garbage
_stats
_time
= 0
714 self
._dispatcher
._garbage
_collection
()
715 # The previous call should have reset the time, it won't do anything
716 # the second time. If it does, we'll get an unexpected call.
717 self
._dispatcher
._garbage
_collection
()
721 class DispatcherThrottlingTest(BaseSchedulerTest
):
723 Test that the dispatcher throttles:
724 * total number of running processes
725 * number of processes started per cycle
731 super(DispatcherThrottlingTest
, self
).setUp()
732 scheduler_config
.config
.max_processes_per_drone
= self
._MAX
_RUNNING
733 scheduler_config
.config
.max_processes_started_per_cycle
= (
736 def fake_max_runnable_processes(fake_self
, username
,
737 drone_hostnames_allowed
):
738 running
= sum(agent
.task
.num_processes
739 for agent
in self
._agents
740 if agent
.started
and not agent
.is_done())
741 return self
._MAX
_RUNNING
- running
742 self
.god
.stub_with(drone_manager
.DroneManager
, 'max_runnable_processes',
743 fake_max_runnable_processes
)
def _setup_some_agents(self, num_agents):
    """Create num_agents DummyAgents and hand a copy of the list to the
    dispatcher, keeping our own reference for assertions."""
    agents = []
    for _ in xrange(num_agents):
        agents.append(DummyAgent())
    self._agents = agents
    self._dispatcher._agents = list(agents)
751 def _run_a_few_cycles(self
):
753 self
._dispatcher
._handle
_agents
()
756 def _assert_agents_started(self
, indexes
, is_started
=True):
758 self
.assert_(self
._agents
[i
].started
== is_started
,
759 'Agent %d %sstarted' %
760 (i
, is_started
and 'not ' or ''))
def _assert_agents_not_started(self, indexes):
    """Convenience negation of _assert_agents_started."""
    self._assert_agents_started(indexes, False)
def test_throttle_total(self):
    """Only as many agents start as the total process cap allows."""
    self._setup_some_agents(4)
    self._run_a_few_cycles()
    self._assert_agents_started([0, 1, 2])
    self._assert_agents_not_started([3])
def test_throttle_per_cycle(self):
    """A single cycle starts no more than the per-cycle cap."""
    self._setup_some_agents(3)
    self._dispatcher._handle_agents()
    self._assert_agents_started([0, 1])
    self._assert_agents_not_started([2])
def test_throttle_with_synchronous(self):
    """A 3-process agent consumes the cap, blocking the next agent."""
    self._setup_some_agents(2)
    self._agents[0].task.num_processes = 3
    self._run_a_few_cycles()
    self._assert_agents_started([0])
    self._assert_agents_not_started([1])
789 def test_large_agent_starvation(self
):
791 Ensure large agents don't get starved by lower-priority agents.
793 self
._setup
_some
_agents
(3)
794 self
._agents
[1].task
.num_processes
= 3
795 self
._run
_a
_few
_cycles
()
796 self
._assert
_agents
_started
([0])
797 self
._assert
_agents
_not
_started
([1, 2])
799 self
._agents
[0].set_done(True)
800 self
._run
_a
_few
_cycles
()
801 self
._assert
_agents
_started
([1])
802 self
._assert
_agents
_not
_started
([2])
def test_zero_process_agent(self):
    """An agent needing zero processes starts even when the cap is hit."""
    self._setup_some_agents(5)
    self._agents[4].task.num_processes = 0
    self._run_a_few_cycles()
    self._assert_agents_started([0, 1, 2, 4])
    self._assert_agents_not_started([3])
813 class PidfileRunMonitorTest(unittest
.TestCase
):
814 execution_tag
= 'test_tag'
816 process
= drone_manager
.Process('myhost', pid
)
820 self
.god
= mock
.mock_god()
821 self
.mock_drone_manager
= self
.god
.create_mock_class(
822 drone_manager
.DroneManager
, 'drone_manager')
823 self
.god
.stub_with(monitor_db
, '_drone_manager',
824 self
.mock_drone_manager
)
825 self
.god
.stub_function(email_manager
.manager
, 'enqueue_notify_email')
826 self
.god
.stub_with(monitor_db
, '_get_pidfile_timeout_secs',
827 self
._mock
_get
_pidfile
_timeout
_secs
)
829 self
.pidfile_id
= object()
831 (self
.mock_drone_manager
.get_pidfile_id_from
832 .expect_call(self
.execution_tag
,
833 pidfile_name
=drone_manager
.AUTOSERV_PID_FILE
)
834 .and_return(self
.pidfile_id
))
836 self
.monitor
= monitor_db
.PidfileRunMonitor()
837 self
.monitor
.attach_to_existing_process(self
.execution_tag
)
840 self
.god
.unstub_all()
843 def _mock_get_pidfile_timeout_secs(self
):
847 def setup_pidfile(self
, pid
=None, exit_code
=None, tests_failed
=None,
848 use_second_read
=False):
849 contents
= drone_manager
.PidfileContents()
851 contents
.process
= drone_manager
.Process('myhost', pid
)
852 contents
.exit_status
= exit_code
853 contents
.num_tests_failed
= tests_failed
854 self
.mock_drone_manager
.get_pidfile_contents
.expect_call(
855 self
.pidfile_id
, use_second_read
=use_second_read
).and_return(
859 def set_not_yet_run(self
):
863 def set_empty_pidfile(self
):
def set_running(self, use_second_read=False):
    """Queue a pidfile read showing a live process (no exit status)."""
    self.setup_pidfile(self.pid, use_second_read=use_second_read)
def set_complete(self, error_code, use_second_read=False):
    """Queue a pidfile read showing a finished process with error_code."""
    self.setup_pidfile(self.pid, error_code, self.num_tests_failed,
                       use_second_read=use_second_read)
876 def _check_monitor(self
, expected_pid
, expected_exit_status
,
877 expected_num_tests_failed
):
878 if expected_pid
is None:
879 self
.assertEquals(self
.monitor
._state
.process
, None)
881 self
.assertEquals(self
.monitor
._state
.process
.pid
, expected_pid
)
882 self
.assertEquals(self
.monitor
._state
.exit_status
, expected_exit_status
)
883 self
.assertEquals(self
.monitor
._state
.num_tests_failed
,
884 expected_num_tests_failed
)
887 self
.god
.check_playback()
def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,
                              expected_num_tests_failed):
    """Read the pidfile and verify the monitor's resulting state."""
    self.monitor._read_pidfile()
    self._check_monitor(expected_pid, expected_exit_status,
                        expected_num_tests_failed)
, expected_exit_status
):
898 if expected_exit_status
is None:
899 expected_tests_failed
= None
901 expected_tests_failed
= self
.num_tests_failed
902 return expected_tests_failed
905 def test_read_pidfile(self
):
906 self
.set_not_yet_run()
907 self
._test
_read
_pidfile
_helper
(None, None, None)
909 self
.set_empty_pidfile()
910 self
._test
_read
_pidfile
_helper
(None, None, None)
913 self
._test
_read
_pidfile
_helper
(self
.pid
, None, None)
915 self
.set_complete(123)
916 self
._test
_read
_pidfile
_helper
(self
.pid
, 123, self
.num_tests_failed
)
def test_read_pidfile_error(self):
    """An InvalidPidfile from the drone manager raises _PidfileException."""
    self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=False).and_return(
            drone_manager.InvalidPidfile('error'))
    self.assertRaises(monitor_db.PidfileRunMonitor._PidfileException,
                      self.monitor._read_pidfile)
    self.god.check_playback()
def setup_is_running(self, is_running):
    """Queue an is_process_running() answer on the mock drone manager."""
    self.mock_drone_manager.is_process_running.expect_call(
            self.process).and_return(is_running)
def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,
                                  expected_num_tests_failed):
    """Run _get_pidfile_info and verify the monitor's resulting state."""
    self.monitor._get_pidfile_info()
    self._check_monitor(expected_pid, expected_exit_status,
                        expected_num_tests_failed)
def test_get_pidfile_info(self):
    """
    normal cases for get_pidfile_info
    """
    # NOTE(review): docstring quotes and the set_running() calls were lost
    # in extraction and restored — verify against upstream autotest.
    # running
    self.set_running()
    self.setup_is_running(True)
    self._test_get_pidfile_info_helper(self.pid, None, None)

    # exited during check
    self.set_running()
    self.setup_is_running(False)
    self.set_complete(123, use_second_read=True) # pidfile gets read again
    self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)

    # completed
    self.set_complete(123)
    self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)
def test_get_pidfile_info_running_no_proc(self):
    """
    pidfile shows process running, but no proc exists
    """
    # NOTE(review): docstring quotes and the first set_running() call were
    # lost in extraction and restored — verify against upstream autotest.
    # running but no proc
    self.set_running()
    self.setup_is_running(False)
    self.set_running(use_second_read=True)
    # A lost process triggers a notification email with arbitrary strings.
    email_manager.manager.enqueue_notify_email.expect_call(
            mock.is_string_comparator(), mock.is_string_comparator())
    self._test_get_pidfile_info_helper(self.pid, 1, 0)
    self.assertTrue(self.monitor.lost_process)
def test_get_pidfile_info_not_yet_run(self):
    """
    pidfile hasn't been written yet
    """
    # NOTE(review): docstring delimiters were lost in extraction; restored.
    self.set_not_yet_run()
    self._test_get_pidfile_info_helper(None, None, None)
def test_process_failed_to_write_pidfile(self):
    """A start time older than the pidfile timeout marks the process lost."""
    self.set_not_yet_run()
    email_manager.manager.enqueue_notify_email.expect_call(
            mock.is_string_comparator(), mock.is_string_comparator())
    # Back-date the monitor's start time past the timeout window.
    self.monitor._start_time = (time.time() -
                                monitor_db._get_pidfile_timeout_secs() - 1)
    self._test_get_pidfile_info_helper(None, 1, 0)
    self.assertTrue(self.monitor.lost_process)
class AgentTest(unittest.TestCase):
    """Tests for monitor_db.Agent driving a (mocked) AgentTask."""
    # NOTE(review): the setUp/tearDown 'def' lines and the mock-class name
    # argument were lost in extraction and restored — verify upstream.
    def setUp(self):
        self.god = mock.mock_god()
        self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
                                                      'dispatcher')


    def tearDown(self):
        self.god.unstub_all()
def _create_mock_task(self, name):
    """Create a mock AgentTask with one process and stub host/queue ids."""
    task = self.god.create_mock_class(monitor_db.AgentTask, name)
    task.num_processes = 1
    _set_host_and_qe_ids(task)
    # NOTE(review): 'return task' was lost in extraction and restored.
    return task
def _create_agent(self, task):
    """Wrap a task in an Agent attached to the mock dispatcher."""
    agent = monitor_db.Agent(task)
    agent.dispatcher = self._dispatcher
    # NOTE(review): 'return agent' was lost in extraction and restored.
    return agent
def _finish_agent(self, agent):
    """Tick the agent until it reports completion."""
    while not agent.is_done():
        # NOTE(review): loop body was lost in extraction; restored as the
        # conventional agent.tick() — verify against upstream autotest.
        agent.tick()
def test_agent_abort(self):
    """Aborting a started agent polls once, then aborts the task."""
    # NOTE(review): several statements were lost in extraction and
    # restored (task.aborted, agent.tick()/abort()) — verify upstream.
    task = self._create_mock_task('task')
    task.poll.expect_call()
    task.is_done.expect_call().and_return(False)
    task.abort.expect_call()
    task.aborted = True

    agent = self._create_agent(task)
    agent.tick()
    agent.abort()
    self._finish_agent(agent)
    self.god.check_playback()
def _test_agent_abort_before_started_helper(self, ignore_abort=False):
    """Abort an agent before its first tick; optionally the task ignores it.

    When ignore_abort is True the task clears its aborted flag, so the
    agent must still poll it to completion; otherwise the abort finishes
    the agent immediately.
    """
    # NOTE(review): the if/else structure was lost in extraction and
    # restored — verify against upstream autotest.
    task = self._create_mock_task('task')
    task.abort.expect_call()
    if ignore_abort:
        task.aborted = False
        task.poll.expect_call()
        task.is_done.expect_call().and_return(True)
        task.success = True
    else:
        task.aborted = True

    agent = self._create_agent(task)
    agent.abort()
    self._finish_agent(agent)
    self.god.check_playback()
def test_agent_abort_before_started(self):
    """Run the abort-before-started scenario both with and without
    the task honoring the abort."""
    for ignore in (False, True):
        self._test_agent_abort_before_started_helper(ignore)
class JobSchedulingTest(BaseSchedulerTest):
    """End-to-end scheduling tests for Job/HostQueueEntry state machines."""
    def _test_run_helper(self, expect_agent=True, expect_starting=False,
                         expect_pending=False):
        """Run job 1 via run_if_ready() and verify HQE status and agent.

        Returns the scheduled agent's task (or None when no agent is
        expected).
        """
        # NOTE(review): the 'if expect_starting:'/'else:' lines and the
        # return statements were lost in extraction and restored.
        if expect_starting:
            expected_status = models.HostQueueEntry.Status.STARTING
        elif expect_pending:
            expected_status = models.HostQueueEntry.Status.PENDING
        else:
            expected_status = models.HostQueueEntry.Status.VERIFYING
        job = scheduler_models.Job.fetch('id = 1')[0]
        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
        assert queue_entry.job is job
        job.run_if_ready(queue_entry)

        self.god.check_playback()

        self._dispatcher._schedule_delay_tasks()
        self._dispatcher._schedule_running_host_queue_entries()
        agent = self._dispatcher._agents[0]

        actual_status = models.HostQueueEntry.smart_get(1).status
        self.assertEquals(expected_status, actual_status)

        if not expect_agent:
            self.assertEquals(agent, None)
            return None

        self.assert_(isinstance(agent, monitor_db.Agent))
        self.assert_(agent.task)
        return agent.task
def test_run_if_ready_delays(self):
    """Exercise the ready-delay path for atomic group jobs."""
    # NOTE(review): this block was heavily corrupted in extraction; a
    # number of statements were reconstructed from context — verify the
    # restored lines against upstream autotest before relying on them.
    # Also tests Job.run_with_ready_delay() on atomic group jobs.
    django_job = self._create_job(hosts=[5, 6], atomic_group=1)
    job = scheduler_models.Job(django_job.id)
    self.assertEqual(1, job.synch_count)
    django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
    self.assertEqual(2, len(django_hqes))
    self.assertEqual(2, django_hqes[0].atomic_group.max_number_of_machines)

    def set_hqe_status(django_hqe, status):
        # Keep the django row, the scheduler object and the host in sync.
        django_hqe.status = status
        django_hqe.save()
        scheduler_models.HostQueueEntry(django_hqe.id).host.set_status(status)

    # An initial state, our synch_count is 1
    set_hqe_status(django_hqes[0], models.HostQueueEntry.Status.VERIFYING)
    set_hqe_status(django_hqes[1], models.HostQueueEntry.Status.PENDING)

    # So that we don't depend on the config file value during the test.
    self.assert_(scheduler_config.config
                 .secs_to_wait_for_atomic_group_hosts is not None)
    self.god.stub_with(scheduler_config.config,
                       'secs_to_wait_for_atomic_group_hosts', 123456)

    # Get the pending one as a scheduler_models.HostQueueEntry object.
    hqe = scheduler_models.HostQueueEntry(django_hqes[1].id)
    self.assert_(not job._delay_ready_task)
    self.assertTrue(job.is_ready())

    # Ready with one pending, one verifying and an atomic group should
    # result in a DelayCallTask to re-check if we're ready a while later.
    job.run_if_ready(hqe)
    self.assertEquals('Waiting', hqe.status)
    self._dispatcher._schedule_delay_tasks()
    self.assertEquals('Pending', hqe.status)
    agent = self._dispatcher._agents[0]
    self.assert_(job._delay_ready_task)
    self.assert_(isinstance(agent, monitor_db.Agent))
    self.assert_(agent.task)
    delay_task = agent.task
    self.assert_(isinstance(delay_task, scheduler_models.DelayedCallTask))
    self.assert_(not delay_task.is_done())

    self.god.stub_function(delay_task, 'abort')

    self.god.stub_function(job, 'run')

    self.god.stub_function(job, '_pending_count')
    self.god.stub_with(job, 'synch_count', 9)
    self.god.stub_function(job, 'request_abort')

    # Test that the DelayedCallTask's callback queued up above does the
    # correct thing and does not call run if there are not enough hosts
    # in pending after the delay.
    job._pending_count.expect_call().and_return(0)
    job.request_abort.expect_call()
    delay_task._callback()
    self.god.check_playback()

    # Test that the DelayedCallTask's callback queued up above does the
    # correct thing and returns the Agent returned by job.run() if
    # there are still enough hosts pending after the delay.
    job.run.expect_call(hqe)
    job._pending_count.expect_call().and_return(4)
    delay_task._callback()
    self.god.check_playback()

    job._pending_count.expect_call().and_return(4)

    # Adjust the delay deadline so that enough time has passed.
    job._delay_ready_task.end_time = time.time() - 111111
    job.run.expect_call(hqe)
    # ...the delay_expired condition should cause us to call run()
    self._dispatcher._handle_agents()
    self.god.check_playback()
    delay_task.success = False

    # Adjust the delay deadline back so that enough time has not passed.
    job._delay_ready_task.end_time = time.time() + 111111
    self._dispatcher._handle_agents()
    self.god.check_playback()

    # Now max_number_of_machines HQEs are in pending state.  Remaining
    # delay will now be ignored.
    other_hqe = scheduler_models.HostQueueEntry(django_hqes[0].id)
    self.god.unstub(job, 'run')
    self.god.unstub(job, '_pending_count')
    self.god.unstub(job, 'synch_count')
    self.god.unstub(job, 'request_abort')
    # ...the over_max_threshold test should cause us to call run()
    delay_task.abort.expect_call()
    other_hqe.on_pending()
    self.assertEquals('Starting', other_hqe.status)
    self.assertEquals('Starting', hqe.status)
    self.god.stub_function(job, 'run')
    self.god.unstub(delay_task, 'abort')

    hqe.set_status('Pending')
    other_hqe.set_status('Pending')
    # Now we're not over the max for the atomic group.  But all assigned
    # hosts are in pending state.  over_max_threshold should make us run().
    hqe.atomic_group.max_number_of_machines += 1
    hqe.atomic_group.save()
    job.run.expect_call(hqe)
    hqe.on_pending()
    self.god.check_playback()
    hqe.atomic_group.max_number_of_machines -= 1
    hqe.atomic_group.save()

    other_hqe = scheduler_models.HostQueueEntry(django_hqes[0].id)
    self.assertTrue(hqe.job is other_hqe.job)
    # DBObject classes should reuse instances so these should be the same.
    self.assertEqual(job, other_hqe.job)
    self.assertEqual(other_hqe.job, hqe.job)
    # Be sure our delay was not lost during the other_hqe construction.
    self.assertEqual(job._delay_ready_task, delay_task)
    self.assert_(job._delay_ready_task)
    self.assertFalse(job._delay_ready_task.is_done())
    self.assertFalse(job._delay_ready_task.aborted)

    # We want the real run() to be called below.
    self.god.unstub(job, 'run')

    # We pass in the other HQE this time the same way it would happen
    # for real when one host finishes verifying and enters pending.
    job.run_if_ready(other_hqe)

    # The delayed task must be aborted by the actual run() call above.
    self.assertTrue(job._delay_ready_task.aborted)
    self.assertFalse(job._delay_ready_task.success)
    self.assertTrue(job._delay_ready_task.is_done())

    # Check that job run() and _finish_run() were called by the above:
    self._dispatcher._schedule_running_host_queue_entries()
    agent = self._dispatcher._agents[0]
    self.assert_(agent.task)
    task = agent.task
    self.assert_(isinstance(task, monitor_db.QueueTask))
    # Requery these hqes in order to verify the status from the DB.
    django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
    for entry in django_hqes:
        self.assertEqual(models.HostQueueEntry.Status.STARTING,
                         entry.status)

    # We're already running, but more calls to run_with_ready_delay can
    # continue to come in due to straggler hosts enter Pending.  Make
    # sure we don't do anything.
    self.god.stub_function(job, 'run')
    job.run_with_ready_delay(hqe)
    self.god.check_playback()
    self.god.unstub(job, 'run')
def test_run_synchronous_atomic_group_ready(self):
    """A pending synchronous atomic-group job starts a QueueTask."""
    self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
    self._update_hqe("status='Pending', execution_subdir=''")

    queue_task = self._test_run_helper(expect_starting=True)

    self.assert_(isinstance(queue_task, monitor_db.QueueTask))
    # Atomic group jobs that do not depend on a specific label in the
    # atomic group will use the atomic group name as their group name.
    # NOTE(review): expected value was lost in extraction; restored from
    # the fixture's atomic group name — verify against the test fixtures.
    self.assertEquals(queue_task.queue_entries[0].get_group_name(),
                      'atomic1')
def test_run_synchronous_atomic_group_with_label_ready(self):
    """An atomic-group job with a dependency label groups by that label."""
    job = self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
    job.dependency_labels.add(self.label4)
    self._update_hqe("status='Pending', execution_subdir=''")

    queue_task = self._test_run_helper(expect_starting=True)

    self.assert_(isinstance(queue_task, monitor_db.QueueTask))
    # Atomic group jobs that also specify a label in the atomic group
    # will use the label name as their group name.
    # NOTE(review): expected value was lost in extraction; restored from
    # the label added above (self.label4) — verify against the fixtures.
    self.assertEquals(queue_task.queue_entries[0].get_group_name(),
                      'label4')
def test_run_synchronous_ready(self):
    """A pending synchronous job starts a QueueTask spanning both HQEs."""
    self._create_job(hosts=[1, 2], synchronous=True)
    self._update_hqe("status='Pending', execution_subdir=''")

    queue_task = self._test_run_helper(expect_starting=True)

    self.assert_(isinstance(queue_task, monitor_db.QueueTask))
    self.assertEquals(queue_task.job.id, 1)
    scheduled_ids = [entry.id for entry in queue_task.queue_entries]
    self.assertEquals(scheduled_ids, [1, 2])
def test_schedule_running_host_queue_entries_fail(self):
    """Scheduling onto a host that already has an agent raises SchedulerError."""
    # NOTE(review): the dummy-agent registration argument was lost in
    # extraction and restored — verify against upstream autotest.
    self._create_job(hosts=[2])
    self._update_hqe("status='%s', execution_subdir=''" %
                     models.HostQueueEntry.Status.PENDING)
    job = scheduler_models.Job.fetch('id = 1')[0]
    queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
    assert queue_entry.job is job
    job.run_if_ready(queue_entry)
    self.assertEqual(queue_entry.status,
                     models.HostQueueEntry.Status.STARTING)
    self.assert_(queue_entry.execution_subdir)
    self.god.check_playback()

    class dummy_test_agent(object):
        task = 'dummy_test_agent'
    self._dispatcher._register_agent_for_ids(
            self._dispatcher._host_agents, [queue_entry.host.id],
            dummy_test_agent)

    # Attempted to schedule on a host that already has an agent.
    self.assertRaises(host_scheduler.SchedulerError,
                      self._dispatcher._schedule_running_host_queue_entries)
def test_schedule_hostless_job(self):
    """A hostless job is started once and not re-scheduled on later passes."""
    # NOTE(review): the 'hqe = hqe_query[0]' binding was lost in
    # extraction and restored — verify against upstream autotest.
    job = self._create_job(hostless=True)
    self.assertEqual(1, job.hostqueueentry_set.count())
    hqe_query = scheduler_models.HostQueueEntry.fetch(
            'id = %s' % job.hostqueueentry_set.all()[0].id)
    self.assertEqual(1, len(hqe_query))
    hqe = hqe_query[0]

    self.assertEqual(models.HostQueueEntry.Status.QUEUED, hqe.status)
    self.assertEqual(0, len(self._dispatcher._agents))

    self._dispatcher._schedule_new_jobs()

    self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
    self.assertEqual(1, len(self._dispatcher._agents))

    self._dispatcher._schedule_new_jobs()

    # No change to previously schedule hostless job, and no additional agent
    self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
    self.assertEqual(1, len(self._dispatcher._agents))
class TopLevelFunctionsTest(unittest.TestCase):
    """Tests for module-level helpers in monitor_db."""
    # NOTE(review): the setUp/tearDown 'def' lines were lost in extraction
    # and restored — verify against upstream autotest.
    def setUp(self):
        self.god = mock.mock_god()


    def tearDown(self):
        self.god.unstub_all()
def test_autoserv_command_line(self):
    """_autoserv_command_line builds the expected argument set, with and
    without a queue entry."""
    # NOTE(review): FakeJob attributes and the first command_line
    # assignment were partially lost in extraction and restored — verify
    # against upstream autotest.
    machines = 'abcd12,efgh34'
    extra_args = ['-Z', 'hello']
    expected_command_line_base = set((monitor_db._autoserv_path, '-p',
                                      '-m', machines, '-r',
                                      drone_manager.WORKING_DIRECTORY))

    expected_command_line = expected_command_line_base.union(
            ['--verbose']).union(extra_args)
    command_line = set(
            monitor_db._autoserv_command_line(machines, extra_args))
    self.assertEqual(expected_command_line, command_line)

    class FakeJob(object):
        owner = 'Bob'
        name = 'fake job name'

    class FakeHQE(object):
        job = FakeJob

    expected_command_line = expected_command_line_base.union(
            ['-u', FakeJob.owner, '-l', FakeJob.name])
    command_line = set(monitor_db._autoserv_command_line(
            machines, extra_args=[], queue_entry=FakeHQE, verbose=False))
    self.assertEqual(expected_command_line, command_line)
class AgentTaskTest(unittest.TestCase,
                    frontend_test_utils.FrontendTestMixin):
    """Tests for AgentTask drone-set resolution logic."""
    # NOTE(review): the setUp/tearDown 'def' lines were lost in extraction
    # and restored — verify against upstream autotest.
    def setUp(self):
        self._frontend_common_setup()


    def tearDown(self):
        self._frontend_common_teardown()
def _setup_drones(self):
    """Create four drones split across three drone sets and four jobs.

    Jobs 1-3 use drone sets 1-3 respectively (set 3 has no drones);
    job 4 has no drone set at all. Returns the four jobs' HQEs and a
    fresh AgentTask.
    """
    # NOTE(review): the drones-list construction loop and job_4.save()
    # were lost in extraction and restored — verify against upstream.
    self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
    models.DroneSet.drone_sets_enabled.expect_call().and_return(True)

    drones = []
    for x in xrange(4):
        drones.append(models.Drone.objects.create(hostname=str(x)))

    drone_set_1 = models.DroneSet.objects.create(name='1')
    drone_set_1.drones.add(*drones[0:2])
    drone_set_2 = models.DroneSet.objects.create(name='2')
    drone_set_2.drones.add(*drones[2:4])
    drone_set_3 = models.DroneSet.objects.create(name='3')

    job_1 = self._create_job_simple([self.hosts[0].id],
                                    drone_set=drone_set_1)
    job_2 = self._create_job_simple([self.hosts[0].id],
                                    drone_set=drone_set_2)
    job_3 = self._create_job_simple([self.hosts[0].id],
                                    drone_set=drone_set_3)

    job_4 = self._create_job_simple([self.hosts[0].id])
    job_4.drone_set = None
    job_4.save()

    hqe_1 = job_1.hostqueueentry_set.all()[0]
    hqe_2 = job_2.hostqueueentry_set.all()[0]
    hqe_3 = job_3.hostqueueentry_set.all()[0]
    hqe_4 = job_4.hostqueueentry_set.all()[0]

    return (hqe_1, hqe_2, hqe_3, hqe_4), monitor_db.AgentTask()
def test_get_drone_hostnames_allowed_no_drones_in_set(self):
    """A drone set with no drones yields an empty allowed-hostname set."""
    hqes, task = self._setup_drones()
    # hqes[2] belongs to drone set 3, which was created with no drones.
    task.queue_entry_ids = (hqes[2].id,)
    self.assertEqual(set(), task.get_drone_hostnames_allowed())
    self.god.check_playback()
def test_get_drone_hostnames_allowed_no_drone_set(self):
    """A job without a drone set falls back to the user/global default."""
    # NOTE(review): 'hqe = hqes[3]' and 'result = object()' were lost in
    # extraction and restored — verify against upstream autotest.
    hqes, task = self._setup_drones()
    hqe = hqes[3]
    task.queue_entry_ids = (hqe.id,)

    result = object()

    self.god.stub_function(task, '_user_or_global_default_drone_set')
    task._user_or_global_default_drone_set.expect_call(
            hqe.job, hqe.job.user()).and_return(result)

    self.assertEqual(result, task.get_drone_hostnames_allowed())
    self.god.check_playback()
def test_get_drone_hostnames_allowed_success(self):
    """The allowed hostnames are exactly those of the job's drone set."""
    hqes, task = self._setup_drones()
    # hqes[0] belongs to drone set 1, which holds drones '0' and '1'.
    task.queue_entry_ids = (hqes[0].id,)
    self.assertEqual(set(('0', '1')), task.get_drone_hostnames_allowed())
    self.god.check_playback()
def test_get_drone_hostnames_allowed_multiple_jobs(self):
    """Queue entries spanning two different jobs trigger an assertion."""
    hqes, task = self._setup_drones()
    task.queue_entry_ids = (hqes[0].id, hqes[1].id)
    self.assertRaises(AssertionError,
                      task.get_drone_hostnames_allowed)
    self.god.check_playback()
def test_get_drone_hostnames_allowed_no_hqe(self):
    """A special task with no HQEs resolves drones via its requester."""
    # NOTE(review): the '__init__' body ('pass') and 'result = object()'
    # were lost in extraction and restored — verify against upstream.
    class MockSpecialTask(object):
        requested_by = object()

    class MockSpecialAgentTask(monitor_db.SpecialAgentTask):
        task = MockSpecialTask()
        queue_entry_ids = []
        def __init__(self, *args, **kwargs):
            pass

    task = MockSpecialAgentTask()
    self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
    self.god.stub_function(task, '_user_or_global_default_drone_set')

    result = object()
    models.DroneSet.drone_sets_enabled.expect_call().and_return(True)
    task._user_or_global_default_drone_set.expect_call(
            task.task, MockSpecialTask.requested_by).and_return(result)

    self.assertEqual(result, task.get_drone_hostnames_allowed())
    self.god.check_playback()
def _setup_test_user_or_global_default_drone_set(self):
    """Stub DroneSet.get_default() and return the sentinel its mock
    drone set will yield from get_drone_hostnames()."""
    # NOTE(review): the sentinel object creation and both 'return' lines
    # were lost in extraction and restored — verify against upstream.
    result = object()
    class MockDroneSet(object):
        def get_drone_hostnames(self):
            return result

    self.god.stub_function(models.DroneSet, 'get_default')
    models.DroneSet.get_default.expect_call().and_return(MockDroneSet())
    return result
def test_user_or_global_default_drone_set(self):
    """A user with a drone set wins over the global default."""
    # NOTE(review): 'expected = object()', the inner 'return expected'
    # and the call arguments were lost in extraction and restored.
    expected = object()
    class MockDroneSet(object):
        def get_drone_hostnames(self):
            return expected
    class MockUser(object):
        drone_set = MockDroneSet()

    self._setup_test_user_or_global_default_drone_set()

    actual = monitor_db.AgentTask()._user_or_global_default_drone_set(
            None, MockUser())

    self.assertEqual(expected, actual)
    self.god.check_playback()
def test_user_or_global_default_drone_set_no_user(self):
    """With no user at all, the global default drone set is used."""
    # NOTE(review): the call arguments were lost in extraction and
    # restored as (None, None) — verify against upstream autotest.
    expected = self._setup_test_user_or_global_default_drone_set()
    actual = monitor_db.AgentTask()._user_or_global_default_drone_set(
            None, None)

    self.assertEqual(expected, actual)
    self.god.check_playback()
def test_user_or_global_default_drone_set_no_user_drone_set(self):
    """A user without a drone set falls back to the global default."""
    # NOTE(review): 'drone_set = None' and the call arguments were lost
    # in extraction and restored — verify against upstream autotest.
    class MockUser(object):
        drone_set = None

    expected = self._setup_test_user_or_global_default_drone_set()
    actual = monitor_db.AgentTask()._user_or_global_default_drone_set(
            None, MockUser())

    self.assertEqual(expected, actual)
    self.god.check_playback()
1518 if __name__
== '__main__':