import logging, os, unittest
from autotest_lib.client.common_lib import enum, global_config, host_protections
from autotest_lib.database import database_connection
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils, models
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.scheduler import drone_manager, email_manager, host_scheduler
from autotest_lib.scheduler import monitor_db, scheduler_models


# translations necessary for scheduler queries to work with SQLite
_re_translator = database_connection.TranslatingDatabase.make_regexp_translator
_DB_TRANSLATORS = (
        _re_translator(r'NOW\(\)', 'time("now")'),
        _re_translator(r'LAST_INSERT_ID\(\)', 'LAST_INSERT_ROWID()'),
        # older SQLite doesn't support group_concat, so just don't bother until
        # it arises in an important query
        _re_translator(r'GROUP_CONCAT\((.*?)\)', r'\1'),
)
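
# The translators above rewrite MySQL-specific SQL into SQLite equivalents at
# query time: a scheduler query that uses NOW() runs against the test database
# as time("now"), LAST_INSERT_ID() becomes LAST_INSERT_ROWID(), and
# GROUP_CONCAT(x) degrades to plain x.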


HqeStatus = models.HostQueueEntry.Status
HostStatus = models.Host.Status


class NullMethodObject(object):
    _NULL_METHODS = ()

    def __init__(self):
        def null_method(*args, **kwargs):
            pass

        for method_name in self._NULL_METHODS:
            setattr(self, method_name, null_method)


class MockGlobalConfig(object):
    def __init__(self):
        self._config_info = {}


    def set_config_value(self, section, key, value):
        self._config_info[(section, key)] = value


    def get_config_value(self, section, key, type=str,
                         default=None, allow_blank=False):
        identifier = (section, key)
        if identifier not in self._config_info:
            raise RuntimeError('Unset global config value: %s' % (identifier,))
        return self._config_info[identifier]


# the SpecialTask names here must match the suffixes used on the SpecialTask
# results directories
_PidfileType = enum.Enum('verify', 'cleanup', 'repair', 'job', 'gather',
                         'parse', 'archive')


_PIDFILE_TO_PIDFILE_TYPE = {
        drone_manager.AUTOSERV_PID_FILE: _PidfileType.JOB,
        drone_manager.CRASHINFO_PID_FILE: _PidfileType.GATHER,
        drone_manager.PARSER_PID_FILE: _PidfileType.PARSE,
        drone_manager.ARCHIVER_PID_FILE: _PidfileType.ARCHIVE,
        }


_PIDFILE_TYPE_TO_PIDFILE = dict((value, key) for key, value
                                in _PIDFILE_TO_PIDFILE_TYPE.iteritems())
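
# The inverse map lets tests look up which pidfile name corresponds to a given
# _PidfileType (for example, _PidfileType.PARSE maps back to
# drone_manager.PARSER_PID_FILE); _ensure_post_job_process_is_paired() below
# relies on this.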


class MockDroneManager(NullMethodObject):
    """
    Public attributes:
    process_capacity: maximum number of processes the mock will report as
            runnable via max_runnable_processes(); tests can lower this to
            activate throttling.
    """
    _NULL_METHODS = ('reinitialize_drones', 'copy_to_results_repository',
                     'copy_results_on_drone')

    class _DummyPidfileId(object):
        """
        Object to represent pidfile IDs that is opaque to the scheduler code but
        still debugging-friendly for us.
        """
        def __init__(self, working_directory, pidfile_name, num_processes=None):
            self._working_directory = working_directory
            self._pidfile_name = pidfile_name
            self._num_processes = num_processes
            self._paired_with_pidfile = None


        def key(self):
            """Key for MockDroneManager._pidfile_index"""
            return (self._working_directory, self._pidfile_name)


        def __str__(self):
            return os.path.join(self._working_directory, self._pidfile_name)


        def __repr__(self):
            return '<_DummyPidfileId: %s>' % str(self)


    def __init__(self):
        super(MockDroneManager, self).__init__()
        self.process_capacity = 100

        # maps result_dir to set of tuples (file_path, file_contents)
        self._attached_files = {}
        # maps pidfile IDs to PidfileContents
        self._pidfiles = {}
        # pidfile IDs that haven't been created yet
        self._future_pidfiles = []
        # maps _PidfileType to the most recently created pidfile ID of that type
        self._last_pidfile_id = {}
        # maps (working_directory, pidfile_name) to pidfile IDs
        self._pidfile_index = {}
        # maps process to pidfile IDs
        self._process_index = {}
        # tracks pidfiles of processes that have been killed
        self._killed_pidfiles = set()
        # pidfile IDs that have just been unregistered (so will disappear on the
        # next refresh)
        self._unregistered_pidfiles = set()
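
    # Lifecycle of the bookkeeping above: execute_command() creates (or reuses)
    # a _DummyPidfileId and queues it in _future_pidfiles, execute_actions()
    # attaches an opaque process object to it, and the test helpers below write
    # an exit status into the matching PidfileContents to simulate completion.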


    # utility APIs for use by the test

    def finish_process(self, pidfile_type, exit_status=0):
        pidfile_id = self._last_pidfile_id[pidfile_type]
        self._set_pidfile_exit_status(pidfile_id, exit_status)


    def finish_specific_process(self, working_directory, pidfile_name):
        pidfile_id = self.pidfile_from_path(working_directory, pidfile_name)
        self._set_pidfile_exit_status(pidfile_id, 0)


    def _set_pidfile_exit_status(self, pidfile_id, exit_status):
        assert pidfile_id is not None
        contents = self._pidfiles[pidfile_id]
        contents.exit_status = exit_status
        contents.num_tests_failed = 0


    def was_last_process_killed(self, pidfile_type):
        pidfile_id = self._last_pidfile_id[pidfile_type]
        return pidfile_id in self._killed_pidfiles


    def nonfinished_pidfile_ids(self):
        return [pidfile_id for pidfile_id, pidfile_contents
                in self._pidfiles.iteritems()
                if pidfile_contents.exit_status is None]


    def running_pidfile_ids(self):
        return [pidfile_id for pidfile_id in self.nonfinished_pidfile_ids()
                if self._pidfiles[pidfile_id].process is not None]


    def pidfile_from_path(self, working_directory, pidfile_name):
        return self._pidfile_index[(working_directory, pidfile_name)]


    def attached_files(self, working_directory):
        """
        Return dict mapping path to contents for attached files with specified
        paths.
        """
        return dict((path, contents) for path, contents
                    in self._attached_files.get(working_directory, []))


    # DroneManager emulation APIs for use by monitor_db

    def get_orphaned_autoserv_processes(self):
        return set()


    def total_running_processes(self):
        return sum(pidfile_id._num_processes
                   for pidfile_id in self.nonfinished_pidfile_ids())


    def max_runnable_processes(self, username, drone_hostnames_allowed):
        return self.process_capacity - self.total_running_processes()


    def refresh(self):
        for pidfile_id in self._unregistered_pidfiles:
            # intentionally handle non-registered pidfiles silently
            self._pidfiles.pop(pidfile_id, None)
        self._unregistered_pidfiles = set()


    def execute_actions(self):
        # executing an "execute_command" causes a pidfile to be created
        for pidfile_id in self._future_pidfiles:
            # Process objects are opaque to monitor_db
            process = object()
            self._pidfiles[pidfile_id].process = process
            self._process_index[process] = pidfile_id
        self._future_pidfiles = []


    def attach_file_to_execution(self, result_dir, file_contents,
                                 file_path=None):
        self._attached_files.setdefault(result_dir, set()).add((file_path,
                                                                file_contents))
        return 'attach_path'


    def _initialize_pidfile(self, pidfile_id):
        if pidfile_id not in self._pidfiles:
            assert pidfile_id.key() not in self._pidfile_index
            self._pidfiles[pidfile_id] = drone_manager.PidfileContents()
            self._pidfile_index[pidfile_id.key()] = pidfile_id


    def _set_last_pidfile(self, pidfile_id, working_directory, pidfile_name):
        if working_directory.startswith('hosts/'):
            # such paths look like hosts/host1/1-verify, we'll grab the end
            type_string = working_directory.rsplit('-', 1)[1]
            pidfile_type = _PidfileType.get_value(type_string)
        else:
            pidfile_type = _PIDFILE_TO_PIDFILE_TYPE[pidfile_name]
        self._last_pidfile_id[pidfile_type] = pidfile_id


    def execute_command(self, command, working_directory, pidfile_name,
                        num_processes, log_file=None, paired_with_pidfile=None,
                        username=None, drone_hostnames_allowed=None):
        logging.debug('Executing %s in %s', command, working_directory)
        pidfile_id = self._DummyPidfileId(working_directory, pidfile_name)
        if pidfile_id.key() in self._pidfile_index:
            pidfile_id = self._pidfile_index[pidfile_id.key()]
        pidfile_id._num_processes = num_processes
        pidfile_id._paired_with_pidfile = paired_with_pidfile

        self._future_pidfiles.append(pidfile_id)
        self._initialize_pidfile(pidfile_id)
        self._pidfile_index[(working_directory, pidfile_name)] = pidfile_id
        self._set_last_pidfile(pidfile_id, working_directory, pidfile_name)
        return pidfile_id
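
    # The returned pidfile ID is treated as opaque by the scheduler, which
    # hands it back to register_pidfile() and get_pidfile_contents() below;
    # reusing any entry already present in _pidfile_index keeps repeated
    # launches in the same working directory tied to a single _DummyPidfileId.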


    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
        if pidfile_id not in self._pidfiles:
            logging.debug('Request for nonexistent pidfile %s' % pidfile_id)
        return self._pidfiles.get(pidfile_id, drone_manager.PidfileContents())


    def is_process_running(self, process):
        return True


    def register_pidfile(self, pidfile_id):
        self._initialize_pidfile(pidfile_id)


    def unregister_pidfile(self, pidfile_id):
        self._unregistered_pidfiles.add(pidfile_id)


    def declare_process_count(self, pidfile_id, num_processes):
        pidfile_id.num_processes = num_processes


    def absolute_path(self, path):
        return 'absolute/' + path


    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
        # the tests don't need the written contents recorded
        pass


    def get_pidfile_id_from(self, execution_tag, pidfile_name):
        default_pidfile = self._DummyPidfileId(execution_tag, pidfile_name,
                                               num_processes=0)
        return self._pidfile_index.get((execution_tag, pidfile_name),
                                       default_pidfile)


    def kill_process(self, process):
        pidfile_id = self._process_index[process]
        self._killed_pidfiles.add(pidfile_id)
        self._set_pidfile_exit_status(pidfile_id, 271)


class MockEmailManager(NullMethodObject):
    _NULL_METHODS = ('send_queued_emails', 'send_email')

    def enqueue_notify_email(self, subject, message):
        logging.warn('enqueue_notify_email: %s', subject)
        logging.warn(message)


class SchedulerFunctionalTest(unittest.TestCase,
                              frontend_test_utils.FrontendTestMixin):
    # some number of ticks after which the scheduler is presumed to have
    # stabilized, given no external changes
    _A_LOT_OF_TICKS = 10

    def setUp(self):
        self._frontend_common_setup()
        self._set_stubs()
        self._set_global_config_values()
        self._create_dispatcher()

        logging.basicConfig(level=logging.DEBUG)


    def _create_dispatcher(self):
        self.dispatcher = monitor_db.Dispatcher()


    def tearDown(self):
        self._database.disconnect()
        self._frontend_common_teardown()


    def _set_stubs(self):
        self.mock_config = MockGlobalConfig()
        self.god.stub_with(global_config, 'global_config', self.mock_config)

        self.mock_drone_manager = MockDroneManager()
        drone_manager._set_instance(self.mock_drone_manager)

        self.mock_email_manager = MockEmailManager()
        self.god.stub_with(email_manager, 'manager', self.mock_email_manager)

        self._database = (
                database_connection.TranslatingDatabase.get_test_database(
                        translators=_DB_TRANSLATORS))
        self._database.connect(db_type='django')
        self.god.stub_with(monitor_db, '_db', self._database)
        self.god.stub_with(scheduler_models, '_db', self._database)

        monitor_db.initialize_globals()
        scheduler_models.initialize_globals()
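
    # _set_stubs() swaps in the mocks defined above: global_config and
    # email_manager are stubbed via self.god, MockDroneManager is installed as
    # the drone manager instance, and both monitor_db and scheduler_models are
    # pointed at a SQLite-backed test database driven through _DB_TRANSLATORS.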


    def _set_global_config_values(self):
        self.mock_config.set_config_value('SCHEDULER', 'pidfile_timeout_mins',
                                          1)
        self.mock_config.set_config_value('SCHEDULER', 'gc_stats_interval_mins',
                                          999999)


    def _initialize_test(self):
        self.dispatcher.initialize()


    def _run_dispatcher(self):
        for _ in xrange(self._A_LOT_OF_TICKS):
            self.dispatcher.tick()


    def test_idle(self):
        self._initialize_test()
        self._run_dispatcher()
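
    # Tests drive the scheduler purely through tick(): _run_dispatcher() ticks
    # _A_LOT_OF_TICKS times so the dispatcher settles into a stable state
    # between the simulated process completions issued by each test.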


    def _assert_process_executed(self, working_directory, pidfile_name):
        process_was_executed = self.mock_drone_manager.was_process_executed(
                'hosts/host1/1-verify', drone_manager.AUTOSERV_PID_FILE)
        self.assert_(process_was_executed,
                     '%s/%s not executed' % (working_directory, pidfile_name))


    def _update_instance(self, model_instance):
        return type(model_instance).objects.get(pk=model_instance.pk)


    def _check_statuses(self, queue_entry, queue_entry_status,
                        host_status=None):
        self._check_entry_status(queue_entry, queue_entry_status)
        if host_status:
            self._check_host_status(queue_entry.host, host_status)


    def _check_entry_status(self, queue_entry, status):
        # refetch from the DB to pick up status changes made by the scheduler
        queue_entry = self._update_instance(queue_entry)
        self.assertEquals(queue_entry.status, status)


    def _check_host_status(self, host, status):
        # refetch from the DB to pick up status changes made by the scheduler
        host = self._update_instance(host)
        self.assertEquals(host.status, status)


    def _run_pre_job_verify(self, queue_entry):
        self._run_dispatcher() # launches verify
        self._check_statuses(queue_entry, HqeStatus.VERIFYING,
                             HostStatus.VERIFYING)
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)


    def test_simple_job(self):
        self._initialize_test()
        job, queue_entry = self._make_job_and_queue_entry()
        self._run_pre_job_verify(queue_entry)
        self._run_dispatcher() # launches job
        self._check_statuses(queue_entry, HqeStatus.RUNNING, HostStatus.RUNNING)
        self._finish_job(queue_entry)
        self._check_statuses(queue_entry, HqeStatus.COMPLETED, HostStatus.READY)
        self._assert_nothing_is_running()


    def _setup_for_pre_job_cleanup(self):
        self._initialize_test()
        job, queue_entry = self._make_job_and_queue_entry()
        job.reboot_before = model_attributes.RebootBefore.ALWAYS
        job.save()
        return queue_entry


    def _run_pre_job_cleanup_job(self, queue_entry):
        self._run_dispatcher() # cleanup
        self._check_statuses(queue_entry, HqeStatus.VERIFYING,
                             HostStatus.CLEANING)
        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)
        self._run_dispatcher() # verify
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher() # job
        self._finish_job(queue_entry)


    def test_pre_job_cleanup(self):
        queue_entry = self._setup_for_pre_job_cleanup()
        self._run_pre_job_cleanup_job(queue_entry)


    def _run_pre_job_cleanup_one_failure(self):
        queue_entry = self._setup_for_pre_job_cleanup()
        self._run_dispatcher() # cleanup
        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,
                                               exit_status=256)
        self._run_dispatcher() # repair
        self._check_statuses(queue_entry, HqeStatus.QUEUED,
                             HostStatus.REPAIRING)
        self.mock_drone_manager.finish_process(_PidfileType.REPAIR)
        return queue_entry
):
453 queue_entry
= self
._run
_pre
_job
_cleanup
_one
_failure
()
454 # from here the job should run as normal
455 self
._run
_pre
_job
_cleanup
_job
(queue_entry
)


    def test_pre_job_cleanup_double_failure(self):
        # TODO (showard): this test isn't perfect. in reality, when the second
        # cleanup fails, it copies its results over to the job directory using
        # copy_results_on_drone() and then parses them. since we don't handle
        # that, there appear to be no results at the job directory. the
        # scheduler handles this gracefully, parsing gets effectively skipped,
        # and this test passes as is. but we ought to properly test that
        # behavior.
        queue_entry = self._run_pre_job_cleanup_one_failure()
        self._run_dispatcher() # second cleanup
        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,
                                               exit_status=256)
        self._run_dispatcher()
        self._check_statuses(queue_entry, HqeStatus.FAILED,
                             HostStatus.REPAIR_FAILED)
        # nothing else should run
        self._assert_nothing_is_running()


    def _assert_nothing_is_running(self):
        self.assertEquals(self.mock_drone_manager.running_pidfile_ids(), [])


    def _setup_for_post_job_cleanup(self):
        self._initialize_test()
        job, queue_entry = self._make_job_and_queue_entry()
        job.reboot_after = model_attributes.RebootAfter.ALWAYS
        job.save()
        return queue_entry


    def _run_post_job_cleanup_failure_up_to_repair(self, queue_entry,
                                                   include_verify=True):
        if include_verify:
            self._run_pre_job_verify(queue_entry)
        self._run_dispatcher() # job
        self.mock_drone_manager.finish_process(_PidfileType.JOB)
        self._run_dispatcher() # parsing + cleanup
        self.mock_drone_manager.finish_process(_PidfileType.PARSE)
        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,
                                               exit_status=256)
        self._run_dispatcher() # repair, HQE unaffected
        self.mock_drone_manager.finish_process(_PidfileType.ARCHIVE)
        self._run_dispatcher()


    def test_post_job_cleanup_failure(self):
        queue_entry = self._setup_for_post_job_cleanup()
        self._run_post_job_cleanup_failure_up_to_repair(queue_entry)
        self._check_statuses(queue_entry, HqeStatus.COMPLETED,
                             HostStatus.REPAIRING)
        self.mock_drone_manager.finish_process(_PidfileType.REPAIR)
        self._run_dispatcher()
        self._check_statuses(queue_entry, HqeStatus.COMPLETED, HostStatus.READY)


    def test_post_job_cleanup_failure_repair_failure(self):
        queue_entry = self._setup_for_post_job_cleanup()
        self._run_post_job_cleanup_failure_up_to_repair(queue_entry)
        self.mock_drone_manager.finish_process(_PidfileType.REPAIR,
                                               exit_status=256)
        self._run_dispatcher()
        self._check_statuses(queue_entry, HqeStatus.COMPLETED,
                             HostStatus.REPAIR_FAILED)


    def _ensure_post_job_process_is_paired(self, queue_entry, pidfile_type):
        pidfile_name = _PIDFILE_TYPE_TO_PIDFILE[pidfile_type]
        queue_entry = self._update_instance(queue_entry)
        pidfile_id = self.mock_drone_manager.pidfile_from_path(
                queue_entry.execution_path(), pidfile_name)
        self.assert_(pidfile_id._paired_with_pidfile)


    def _finish_job(self, queue_entry):
        self.mock_drone_manager.finish_process(_PidfileType.JOB)
        self._run_dispatcher() # launches parsing + cleanup
        self._check_statuses(queue_entry, HqeStatus.PARSING,
                             HostStatus.CLEANING)
        self._ensure_post_job_process_is_paired(queue_entry, _PidfileType.PARSE)
        self._finish_parsing_and_cleanup(queue_entry)


    def _finish_parsing_and_cleanup(self, queue_entry):
        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)
        self.mock_drone_manager.finish_process(_PidfileType.PARSE)
        self._run_dispatcher()

        self._check_entry_status(queue_entry, HqeStatus.ARCHIVING)
        self.mock_drone_manager.finish_process(_PidfileType.ARCHIVE)
        self._run_dispatcher()


    def _create_reverify_request(self):
        host = self.hosts[0]
        models.SpecialTask.schedule_special_task(
                host=host, task=models.SpecialTask.Task.VERIFY)
        return host
):
560 host
= self
._create
_reverify
_request
()
561 self
._run
_dispatcher
()
562 self
._check
_host
_status
(host
, HostStatus
.VERIFYING
)
563 self
.mock_drone_manager
.finish_process(_PidfileType
.VERIFY
)
564 self
._run
_dispatcher
()
565 self
._check
_host
_status
(host
, HostStatus
.READY
)


    def test_requested_reverify_failure(self):
        host = self._create_reverify_request()
        self._run_dispatcher()
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY,
                                               exit_status=256)
        self._run_dispatcher() # repair
        self._check_host_status(host, HostStatus.REPAIRING)
        self.mock_drone_manager.finish_process(_PidfileType.REPAIR)
        self._run_dispatcher()
        self._check_host_status(host, HostStatus.READY)


    def _setup_for_do_not_verify(self):
        self._initialize_test()
        job, queue_entry = self._make_job_and_queue_entry()
        queue_entry.host.protection = host_protections.Protection.DO_NOT_VERIFY
        queue_entry.host.save()
        return queue_entry


    def test_do_not_verify_job(self):
        queue_entry = self._setup_for_do_not_verify()
        self._run_dispatcher() # runs job directly
        self._finish_job(queue_entry)


    def test_do_not_verify_job_with_cleanup(self):
        queue_entry = self._setup_for_do_not_verify()
        queue_entry.job.reboot_before = model_attributes.RebootBefore.ALWAYS
        queue_entry.job.save()

        self._run_dispatcher() # cleanup
        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)
        self._run_dispatcher() # job
        self._finish_job(queue_entry)


    def test_do_not_verify_pre_job_cleanup_failure(self):
        queue_entry = self._setup_for_do_not_verify()
        queue_entry.job.reboot_before = model_attributes.RebootBefore.ALWAYS
        queue_entry.job.save()

        self._run_dispatcher() # cleanup
        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,
                                               exit_status=256)
        self._run_dispatcher() # failure ignored; job runs
        self._finish_job(queue_entry)


    def test_do_not_verify_post_job_cleanup_failure(self):
        queue_entry = self._setup_for_do_not_verify()

        self._run_post_job_cleanup_failure_up_to_repair(queue_entry,
                                                        include_verify=False)
        # failure ignored, host still set to Ready
        self._check_statuses(queue_entry, HqeStatus.COMPLETED, HostStatus.READY)
        self._run_dispatcher() # nothing else runs
        self._assert_nothing_is_running()


    def test_do_not_verify_requested_reverify_failure(self):
        host = self._create_reverify_request()
        host.protection = host_protections.Protection.DO_NOT_VERIFY
        host.save()

        self._run_dispatcher()
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY,
                                               exit_status=256)
        self._run_dispatcher()
        self._check_host_status(host, HostStatus.READY) # ignore failure
        self._assert_nothing_is_running()


    def test_job_abort_in_verify(self):
        self._initialize_test()
        job = self._create_job(hosts=[1])
        self._run_dispatcher() # launches verify
        job.hostqueueentry_set.update(aborted=True)
        self._run_dispatcher() # kills verify, launches cleanup
        self.assert_(self.mock_drone_manager.was_last_process_killed(
                _PidfileType.VERIFY))
        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)
        self._run_dispatcher()


    def test_job_abort(self):
        self._initialize_test()
        job = self._create_job(hosts=[1])
        job.run_verify = False
        job.save()

        self._run_dispatcher() # launches job
        job.hostqueueentry_set.update(aborted=True)
        self._run_dispatcher() # kills job, launches gathering
        self.assert_(self.mock_drone_manager.was_last_process_killed(
                _PidfileType.JOB))
        self.mock_drone_manager.finish_process(_PidfileType.GATHER)
        self._run_dispatcher() # launches parsing + cleanup
        queue_entry = job.hostqueueentry_set.all()[0]
        self._finish_parsing_and_cleanup(queue_entry)


    def test_job_abort_queued_synchronous(self):
        self._initialize_test()
        job = self._create_job(hosts=[1,2])
        job.synch_count = 2
        job.save()

        job.hostqueueentry_set.update(aborted=True)
        self._run_dispatcher()
        for host_queue_entry in job.hostqueueentry_set.all():
            self.assertEqual(host_queue_entry.status,
                             HqeStatus.ABORTED)


    def test_no_pidfile_leaking(self):
        self._initialize_test()
        self.test_simple_job()
        self.assertEquals(self.mock_drone_manager._pidfiles, {})

        self.test_job_abort_in_verify()
        self.assertEquals(self.mock_drone_manager._pidfiles, {})

        self.test_job_abort()
        self.assertEquals(self.mock_drone_manager._pidfiles, {})


    def _make_job_and_queue_entry(self):
        job = self._create_job(hosts=[1])
        queue_entry = job.hostqueueentry_set.all()[0]
        return job, queue_entry


    def test_recover_running_no_process(self):
        # recovery should re-execute a Running HQE if no process is found
        _, queue_entry = self._make_job_and_queue_entry()
        queue_entry.status = HqeStatus.RUNNING
        queue_entry.execution_subdir = '1-myuser/host1'
        queue_entry.save()
        queue_entry.host.status = HostStatus.RUNNING
        queue_entry.host.save()

        self._initialize_test()
        self._run_dispatcher()
        self._finish_job(queue_entry)


    def test_recover_verifying_hqe_no_special_task(self):
        # recovery should fail on a Verifying HQE with no corresponding
        # Verify or Cleanup SpecialTask
        _, queue_entry = self._make_job_and_queue_entry()
        queue_entry.status = HqeStatus.VERIFYING
        queue_entry.save()

        # make some dummy SpecialTasks that shouldn't count
        models.SpecialTask.objects.create(
                host=queue_entry.host,
                task=models.SpecialTask.Task.VERIFY,
                requested_by=models.User.current_user())
        models.SpecialTask.objects.create(
                host=queue_entry.host,
                task=models.SpecialTask.Task.CLEANUP,
                queue_entry=queue_entry,
                is_complete=True,
                requested_by=models.User.current_user())

        self.assertRaises(host_scheduler.SchedulerError, self._initialize_test)


    def _test_recover_verifying_hqe_helper(self, task, pidfile_type):
        _, queue_entry = self._make_job_and_queue_entry()
        queue_entry.status = HqeStatus.VERIFYING
        queue_entry.save()

        special_task = models.SpecialTask.objects.create(
                host=queue_entry.host, task=task, queue_entry=queue_entry)

        self._initialize_test()
        self._run_dispatcher()
        self.mock_drone_manager.finish_process(pidfile_type)
        self._run_dispatcher()
        # don't bother checking the rest of the job execution, as long as the
        # recovery itself worked


    def test_recover_verifying_hqe_with_cleanup(self):
        # recover an HQE that was in pre-job cleanup
        self._test_recover_verifying_hqe_helper(models.SpecialTask.Task.CLEANUP,
                                                _PidfileType.CLEANUP)


    def test_recover_verifying_hqe_with_verify(self):
        # recover an HQE that was in pre-job verify
        self._test_recover_verifying_hqe_helper(models.SpecialTask.Task.VERIFY,
                                                _PidfileType.VERIFY)


    def test_recover_pending_hqes_with_group(self):
        # recover a group of HQEs that are in Pending, in the same group (e.g.,
        # in a job with atomic hosts)
        job = self._create_job(hosts=[1,2], atomic_group=1)
        job.save()

        job.hostqueueentry_set.all().update(status=HqeStatus.PENDING)

        self._initialize_test()
        for queue_entry in job.hostqueueentry_set.all():
            self.assertEquals(queue_entry.status, HqeStatus.STARTING)


    def test_recover_parsing(self):
        self._initialize_test()
        job, queue_entry = self._make_job_and_queue_entry()
        job.run_verify = False
        job.reboot_after = model_attributes.RebootAfter.NEVER
        job.save()

        self._run_dispatcher() # launches job
        self.mock_drone_manager.finish_process(_PidfileType.JOB)
        self._run_dispatcher() # launches parsing

        # now "restart" the scheduler
        self._create_dispatcher()
        self._initialize_test()
        self._run_dispatcher()
        self.mock_drone_manager.finish_process(_PidfileType.PARSE)
        self._run_dispatcher()


    def test_recover_parsing__no_process_already_aborted(self):
        _, queue_entry = self._make_job_and_queue_entry()
        queue_entry.execution_subdir = 'host1'
        queue_entry.status = HqeStatus.PARSING
        queue_entry.aborted = True
        queue_entry.save()

        self._initialize_test()
        self._run_dispatcher()


    def test_job_scheduled_just_after_abort(self):
        # test a pretty obscure corner case where a job is aborted while queued,
        # another job is ready to run, and throttling is active. the post-abort
        # cleanup must not be pre-empted by the second job.
        job1, queue_entry1 = self._make_job_and_queue_entry()
        job2, queue_entry2 = self._make_job_and_queue_entry()

        self.mock_drone_manager.process_capacity = 0
        self._run_dispatcher() # schedule job1, but won't start verify
        job1.hostqueueentry_set.update(aborted=True)
        self.mock_drone_manager.process_capacity = 100
        self._run_dispatcher() # cleanup must run here, not verify for job2
        self._check_statuses(queue_entry1, HqeStatus.ABORTED,
                             HostStatus.CLEANING)
        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)
        self._run_dispatcher() # now verify starts for job2
        self._check_statuses(queue_entry2, HqeStatus.VERIFYING,
                             HostStatus.VERIFYING)


    def test_reverify_interrupting_pre_job(self):
        # ensure things behave sanely if a reverify is scheduled in the middle
        # of pre-job actions
        _, queue_entry = self._make_job_and_queue_entry()

        self._run_dispatcher() # pre-job verify
        self._create_reverify_request()
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY,
                                               exit_status=256)
        self._run_dispatcher() # repair
        self.mock_drone_manager.finish_process(_PidfileType.REPAIR)
        self._run_dispatcher() # reverify runs now
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher() # pre-job verify
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher() # and job runs...
        self._check_statuses(queue_entry, HqeStatus.RUNNING, HostStatus.RUNNING)
        self._finish_job(queue_entry) # reverify has been deleted
        self._check_statuses(queue_entry, HqeStatus.COMPLETED,
                             HostStatus.READY)
        self._assert_nothing_is_running()


    def test_reverify_while_job_running(self):
        # once a job is running, a reverify must not be allowed to preempt
        # Gathering
        _, queue_entry = self._make_job_and_queue_entry()
        self._run_pre_job_verify(queue_entry)
        self._run_dispatcher() # job runs
        self._create_reverify_request()
        # make job end with a signal, so gathering will run
        self.mock_drone_manager.finish_process(_PidfileType.JOB,
                                               exit_status=271)
        self._run_dispatcher() # gathering must start
        self.mock_drone_manager.finish_process(_PidfileType.GATHER)
        self._run_dispatcher() # parsing and cleanup
        self._finish_parsing_and_cleanup(queue_entry)
        self._run_dispatcher() # now reverify runs
        self._check_statuses(queue_entry, HqeStatus.FAILED,
                             HostStatus.VERIFYING)
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher()
        self._check_host_status(queue_entry.host, HostStatus.READY)


    def test_reverify_while_host_pending(self):
        # ensure that if a reverify is scheduled while a host is in Pending, it
        # won't run until the host is actually free
        job = self._create_job(hosts=[1,2])
        queue_entry = job.hostqueueentry_set.get(host__hostname='host1')
        job.synch_count = 2
        job.save()

        host2 = self.hosts[1]
        host2.locked = True
        host2.save()

        self._run_dispatcher() # verify host1
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher() # host1 Pending
        self._check_statuses(queue_entry, HqeStatus.PENDING, HostStatus.PENDING)
        self._create_reverify_request()
        self._run_dispatcher() # nothing should happen here
        self._check_statuses(queue_entry, HqeStatus.PENDING, HostStatus.PENDING)

        # now let the job run
        host2.locked = False
        host2.save()
        self._run_dispatcher() # verify host2
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher() # run job
        self._finish_job(queue_entry)
        # need to explicitly finish host1's post-job cleanup
        self.mock_drone_manager.finish_specific_process(
                'hosts/host1/4-cleanup', drone_manager.AUTOSERV_PID_FILE)
        self._run_dispatcher()
        # the reverify should now be running
        self._check_statuses(queue_entry, HqeStatus.COMPLETED,
                             HostStatus.VERIFYING)
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher()
        self._check_host_status(queue_entry.host, HostStatus.READY)


    def test_throttling(self):
        job = self._create_job(hosts=[1,2,3])
        job.synch_count = 3
        job.save()

        queue_entries = list(job.hostqueueentry_set.all())
        def _check_hqe_statuses(*statuses):
            for queue_entry, status in zip(queue_entries, statuses):
                self._check_statuses(queue_entry, status)

        self.mock_drone_manager.process_capacity = 2
        self._run_dispatcher() # verify runs on 1 and 2
        _check_hqe_statuses(HqeStatus.VERIFYING, HqeStatus.VERIFYING,
                            HqeStatus.QUEUED)
        self.assertEquals(len(self.mock_drone_manager.running_pidfile_ids()), 2)

        self.mock_drone_manager.finish_specific_process(
                'hosts/host1/1-verify', drone_manager.AUTOSERV_PID_FILE)
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher() # verify runs on 3
        _check_hqe_statuses(HqeStatus.PENDING, HqeStatus.PENDING,
                            HqeStatus.VERIFYING)

        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher() # job won't run due to throttling
        _check_hqe_statuses(HqeStatus.STARTING, HqeStatus.STARTING,
                            HqeStatus.STARTING)
        self._assert_nothing_is_running()

        self.mock_drone_manager.process_capacity = 3
        self._run_dispatcher() # now job runs
        _check_hqe_statuses(HqeStatus.RUNNING, HqeStatus.RUNNING,
                            HqeStatus.RUNNING)

        self.mock_drone_manager.process_capacity = 2
        self.mock_drone_manager.finish_process(_PidfileType.JOB,
                                               exit_status=271)
        self._run_dispatcher() # gathering won't run due to throttling
        _check_hqe_statuses(HqeStatus.GATHERING, HqeStatus.GATHERING,
                            HqeStatus.GATHERING)
        self._assert_nothing_is_running()

        self.mock_drone_manager.process_capacity = 3
        self._run_dispatcher() # now gathering runs

        self.mock_drone_manager.process_capacity = 0
        self.mock_drone_manager.finish_process(_PidfileType.GATHER)
        self._run_dispatcher() # parsing runs despite throttling
        _check_hqe_statuses(HqeStatus.PARSING, HqeStatus.PARSING,
                            HqeStatus.PARSING)


    def test_abort_starting_while_throttling(self):
        self._initialize_test()
        job = self._create_job(hosts=[1,2], synchronous=True)
        queue_entry = job.hostqueueentry_set.all()[0]
        job.run_verify = False
        job.reboot_after = model_attributes.RebootAfter.NEVER
        job.save()

        self.mock_drone_manager.process_capacity = 0
        self._run_dispatcher() # go to starting, but don't start job
        self._check_statuses(queue_entry, HqeStatus.STARTING,
                             HostStatus.PENDING)

        job.hostqueueentry_set.update(aborted=True)
        self._run_dispatcher()
        self._check_statuses(queue_entry, HqeStatus.GATHERING,
                             HostStatus.RUNNING)

        self.mock_drone_manager.process_capacity = 5
        self._run_dispatcher()
        self._check_statuses(queue_entry, HqeStatus.ABORTED,
                             HostStatus.CLEANING)


    def test_simple_atomic_group_job(self):
        job = self._create_job(atomic_group=1)
        self._run_dispatcher() # expand + verify
        queue_entries = job.hostqueueentry_set.all()
        self.assertEquals(len(queue_entries), 2)
        self.assertEquals(queue_entries[0].host.hostname, 'host5')
        self.assertEquals(queue_entries[1].host.hostname, 'host6')

        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher() # delay task started waiting

        self.mock_drone_manager.finish_specific_process(
                'hosts/host6/1-verify', drone_manager.AUTOSERV_PID_FILE)
        self._run_dispatcher() # job starts now
        for entry in queue_entries:
            self._check_statuses(entry, HqeStatus.RUNNING, HostStatus.RUNNING)

        # rest of job proceeds normally


    def test_simple_metahost_assignment(self):
        job = self._create_job(metahosts=[1])
        self._run_dispatcher()
        entry = job.hostqueueentry_set.all()[0]
        self.assertEquals(entry.host.hostname, 'host1')
        self._check_statuses(entry, HqeStatus.VERIFYING, HostStatus.VERIFYING)
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher()
        self._check_statuses(entry, HqeStatus.RUNNING, HostStatus.RUNNING)
        # rest of job proceeds normally


    def test_metahost_fail_verify(self):
        self.hosts[1].labels.add(self.labels[0]) # put label1 also on host2
        job = self._create_job(metahosts=[1])
        self._run_dispatcher() # assigned to host1
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY,
                                               exit_status=256)
        self._run_dispatcher() # host1 failed, gets reassigned to host2
        entry = job.hostqueueentry_set.all()[0]
        self.assertEquals(entry.host.hostname, 'host2')
        self._check_statuses(entry, HqeStatus.VERIFYING, HostStatus.VERIFYING)
        self._check_host_status(self.hosts[0], HostStatus.REPAIRING)

        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher()
        self._check_statuses(entry, HqeStatus.RUNNING, HostStatus.RUNNING)


    def test_hostless_job(self):
        job = self._create_job(hostless=True)
        entry = job.hostqueueentry_set.all()[0]

        self._run_dispatcher()
        self._check_entry_status(entry, HqeStatus.RUNNING)

        self.mock_drone_manager.finish_process(_PidfileType.JOB)
        self._run_dispatcher()
        self._check_entry_status(entry, HqeStatus.PARSING)
        self.mock_drone_manager.finish_process(_PidfileType.PARSE)
        self._run_dispatcher()
        self._check_entry_status(entry, HqeStatus.ARCHIVING)
        self.mock_drone_manager.finish_process(_PidfileType.ARCHIVE)
        self._run_dispatcher()
        self._check_entry_status(entry, HqeStatus.COMPLETED)


    def test_pre_job_keyvals(self):
        job = self._create_job(hosts=[1])
        job.run_verify = False
        job.reboot_before = model_attributes.RebootBefore.NEVER
        job.save()
        models.JobKeyval.objects.create(job=job, key='mykey', value='myvalue')

        self._run_dispatcher()
        self._finish_job(job.hostqueueentry_set.all()[0])

        attached_files = self.mock_drone_manager.attached_files(
                '1-autotest_system/host1')
        job_keyval_path = '1-autotest_system/host1/keyval'
        self.assert_(job_keyval_path in attached_files, attached_files)
        keyval_contents = attached_files[job_keyval_path]
        keyval_dict = dict(line.strip().split('=', 1)
                           for line in keyval_contents.splitlines())
        self.assert_('job_queued' in keyval_dict, keyval_dict)
        self.assertEquals(keyval_dict['mykey'], 'myvalue')


if __name__ == '__main__':
    unittest.main()