KVM test: tests_base.cfg. sample: Fix test dependencies
[autotest-zwu.git] / scheduler / drone_manager_unittest.py
blob6d9949d04f923cd9c838d4fb443af6b89c8ba28e
1 #!/usr/bin/python
3 import os, unittest
4 import common
5 from autotest_lib.client.common_lib import global_config
6 from autotest_lib.client.common_lib.test_utils import mock
7 from autotest_lib.scheduler import drone_manager, drone_utility, drones
8 from autotest_lib.scheduler import scheduler_config
10 class MockDrone(drones._AbstractDrone):
11 def __init__(self, name, active_processes=0, max_processes=10,
12 allowed_users=None):
13 super(MockDrone, self).__init__()
14 self.name = name
15 self.hostname = name
16 self.active_processes = active_processes
17 self.max_processes = max_processes
18 self.allowed_users = allowed_users
19 # maps method names list of tuples containing method arguments
20 self._recorded_calls = {'queue_call': [],
21 'send_file_to': []}
24 def queue_call(self, method, *args, **kwargs):
25 self._recorded_calls['queue_call'].append((method, args, kwargs))
28 def call(self, method, *args, **kwargs):
29 # don't bother differentiating between call() and queue_call()
30 return self.queue_call(method, *args, **kwargs)
33 def send_file_to(self, drone, source_path, destination_path,
34 can_fail=False):
35 self._recorded_calls['send_file_to'].append(
36 (drone, source_path, destination_path))
39 # method for use by tests
40 def _check_for_recorded_call(self, method_name, arguments):
41 recorded_arg_list = self._recorded_calls[method_name]
42 was_called = arguments in recorded_arg_list
43 if not was_called:
44 print 'Recorded args:', recorded_arg_list
45 print 'Expected:', arguments
46 return was_called
49 def was_call_queued(self, method, *args, **kwargs):
50 return self._check_for_recorded_call('queue_call',
51 (method, args, kwargs))
54 def was_file_sent(self, drone, source_path, destination_path):
55 return self._check_for_recorded_call('send_file_to',
56 (drone, source_path,
57 destination_path))
60 class DroneManager(unittest.TestCase):
61 _DRONE_INSTALL_DIR = '/drone/install/dir'
62 _DRONE_RESULTS_DIR = os.path.join(_DRONE_INSTALL_DIR, 'results')
63 _RESULTS_DIR = '/results/dir'
64 _SOURCE_PATH = 'source/path'
65 _DESTINATION_PATH = 'destination/path'
66 _WORKING_DIRECTORY = 'working/directory'
67 _USERNAME = 'my_user'
69 def setUp(self):
70 self.god = mock.mock_god()
71 self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR',
72 self._DRONE_INSTALL_DIR)
73 self.manager = drone_manager.DroneManager()
74 self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR)
76 # we don't want this to ever actually get called
77 self.god.stub_function(drones, 'get_drone')
78 # we don't want the DroneManager to go messing with global config
79 def do_nothing():
80 pass
81 self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing)
83 # set up some dummy drones
84 self.mock_drone = MockDrone('mock_drone')
85 self.manager._drones[self.mock_drone.name] = self.mock_drone
86 self.results_drone = MockDrone('results_drone', 0, 10)
87 self.manager._results_drone = self.results_drone
89 self.mock_drone_process = drone_manager.Process(self.mock_drone.name, 0)
92 def tearDown(self):
93 self.god.unstub_all()
96 def _test_choose_drone_for_execution_helper(self, processes_info_list,
97 requested_processes):
98 for index, process_info in enumerate(processes_info_list):
99 active_processes, max_processes = process_info
100 self.manager._enqueue_drone(MockDrone(index, active_processes,
101 max_processes))
103 return self.manager._choose_drone_for_execution(requested_processes,
104 self._USERNAME, None)
107 def test_choose_drone_for_execution(self):
108 drone = self._test_choose_drone_for_execution_helper([(1, 2), (0, 2)],
110 self.assertEquals(drone.name, 1)
113 def test_choose_drone_for_execution_some_full(self):
114 drone = self._test_choose_drone_for_execution_helper([(0, 1), (1, 3)],
116 self.assertEquals(drone.name, 1)
119 def test_choose_drone_for_execution_all_full(self):
120 drone = self._test_choose_drone_for_execution_helper([(2, 1), (3, 2)],
122 self.assertEquals(drone.name, 1)
125 def test_choose_drone_for_execution_all_full_same_percentage_capacity(self):
126 drone = self._test_choose_drone_for_execution_helper([(5, 3), (10, 6)],
128 self.assertEquals(drone.name, 1)
131 def test_user_restrictions(self):
132 # this drone is restricted to a different user
133 self.manager._enqueue_drone(MockDrone(1, max_processes=10,
134 allowed_users=['fakeuser']))
135 # this drone is allowed but has lower capacity
136 self.manager._enqueue_drone(MockDrone(2, max_processes=2,
137 allowed_users=[self._USERNAME]))
139 self.assertEquals(2,
140 self.manager.max_runnable_processes(self._USERNAME,
141 None))
142 drone = self.manager._choose_drone_for_execution(
143 1, username=self._USERNAME, drone_hostnames_allowed=None)
144 self.assertEquals(drone.name, 2)
147 def test_user_restrictions_with_full_drone(self):
148 # this drone is restricted to a different user
149 self.manager._enqueue_drone(MockDrone(1, max_processes=10,
150 allowed_users=['fakeuser']))
151 # this drone is allowed but is full
152 self.manager._enqueue_drone(MockDrone(2, active_processes=3,
153 max_processes=2,
154 allowed_users=[self._USERNAME]))
156 self.assertEquals(0,
157 self.manager.max_runnable_processes(self._USERNAME,
158 None))
159 drone = self.manager._choose_drone_for_execution(
160 1, username=self._USERNAME, drone_hostnames_allowed=None)
161 self.assertEquals(drone.name, 2)
164 def _setup_test_drone_restrictions(self, active_processes=0):
165 self.manager._enqueue_drone(MockDrone(
166 1, active_processes=active_processes, max_processes=10))
167 self.manager._enqueue_drone(MockDrone(
168 2, active_processes=active_processes, max_processes=5))
169 self.manager._enqueue_drone(MockDrone(
170 3, active_processes=active_processes, max_processes=2))
173 def test_drone_restrictions_allow_any(self):
174 self._setup_test_drone_restrictions()
175 self.assertEquals(10,
176 self.manager.max_runnable_processes(self._USERNAME,
177 None))
178 drone = self.manager._choose_drone_for_execution(
179 1, username=self._USERNAME, drone_hostnames_allowed=None)
180 self.assertEqual(drone.name, 1)
183 def test_drone_restrictions_under_capacity(self):
184 self._setup_test_drone_restrictions()
185 drone_hostnames_allowed = (2, 3)
186 self.assertEquals(
187 5, self.manager.max_runnable_processes(self._USERNAME,
188 drone_hostnames_allowed))
189 drone = self.manager._choose_drone_for_execution(
190 1, username=self._USERNAME,
191 drone_hostnames_allowed=drone_hostnames_allowed)
193 self.assertEqual(drone.name, 2)
196 def test_drone_restrictions_over_capacity(self):
197 self._setup_test_drone_restrictions(active_processes=6)
198 drone_hostnames_allowed = (2, 3)
199 self.assertEquals(
200 0, self.manager.max_runnable_processes(self._USERNAME,
201 drone_hostnames_allowed))
202 drone = self.manager._choose_drone_for_execution(
203 7, username=self._USERNAME,
204 drone_hostnames_allowed=drone_hostnames_allowed)
205 self.assertEqual(drone.name, 2)
208 def test_drone_restrictions_allow_none(self):
209 self._setup_test_drone_restrictions()
210 drone_hostnames_allowed = ()
211 self.assertEquals(
212 0, self.manager.max_runnable_processes(self._USERNAME,
213 drone_hostnames_allowed))
214 drone = self.manager._choose_drone_for_execution(
215 1, username=self._USERNAME,
216 drone_hostnames_allowed=drone_hostnames_allowed)
217 self.assertEqual(drone, None)
220 def test_initialize(self):
221 results_hostname = 'results_repo'
222 results_install_dir = '/results/install'
223 global_config.global_config.override_config_value(
224 scheduler_config.CONFIG_SECTION,
225 'results_host_installation_directory', results_install_dir)
227 (drones.get_drone.expect_call(self.mock_drone.name)
228 .and_return(self.mock_drone))
230 results_drone = MockDrone('results_drone')
231 self.god.stub_function(results_drone, 'set_autotest_install_dir')
232 drones.get_drone.expect_call(results_hostname).and_return(results_drone)
233 results_drone.set_autotest_install_dir.expect_call(results_install_dir)
235 self.manager.initialize(base_results_dir=self._RESULTS_DIR,
236 drone_hostnames=[self.mock_drone.name],
237 results_repository_hostname=results_hostname)
239 self.assert_(self.mock_drone.was_call_queued(
240 'initialize', self._DRONE_RESULTS_DIR + '/'))
241 self.god.check_playback()
244 def test_execute_command(self):
245 self.manager._enqueue_drone(self.mock_drone)
247 pidfile_name = 'my_pidfile'
248 log_file = 'log_file'
250 pidfile_id = self.manager.execute_command(
251 command=['test', drone_manager.WORKING_DIRECTORY],
252 working_directory=self._WORKING_DIRECTORY,
253 pidfile_name=pidfile_name,
254 num_processes=1,
255 log_file=log_file)
257 full_working_directory = os.path.join(self._DRONE_RESULTS_DIR,
258 self._WORKING_DIRECTORY)
259 self.assertEquals(pidfile_id.path,
260 os.path.join(full_working_directory, pidfile_name))
261 self.assert_(self.mock_drone.was_call_queued(
262 'execute_command', ['test', full_working_directory],
263 full_working_directory,
264 os.path.join(self._DRONE_RESULTS_DIR, log_file), pidfile_name))
267 def test_attach_file_to_execution(self):
268 self.manager._enqueue_drone(self.mock_drone)
270 contents = 'my\ncontents'
271 attached_path = self.manager.attach_file_to_execution(
272 self._WORKING_DIRECTORY, contents)
273 self.manager.execute_command(command=['test'],
274 working_directory=self._WORKING_DIRECTORY,
275 pidfile_name='mypidfile',
276 num_processes=1,
277 drone_hostnames_allowed=None)
279 self.assert_(self.mock_drone.was_call_queued(
280 'write_to_file',
281 os.path.join(self._DRONE_RESULTS_DIR, attached_path),
282 contents))
285 def test_copy_results_on_drone(self):
286 self.manager.copy_results_on_drone(self.mock_drone_process,
287 self._SOURCE_PATH,
288 self._DESTINATION_PATH)
289 self.assert_(self.mock_drone.was_call_queued(
290 'copy_file_or_directory',
291 os.path.join(self._DRONE_RESULTS_DIR, self._SOURCE_PATH),
292 os.path.join(self._DRONE_RESULTS_DIR, self._DESTINATION_PATH)))
295 def test_copy_to_results_repository(self):
296 self.manager.copy_to_results_repository(self.mock_drone_process,
297 self._SOURCE_PATH)
298 self.assert_(self.mock_drone.was_file_sent(
299 self.results_drone,
300 os.path.join(self._DRONE_RESULTS_DIR, self._SOURCE_PATH),
301 os.path.join(self._RESULTS_DIR, self._SOURCE_PATH)))
304 def test_write_lines_to_file(self):
305 file_path = 'file/path'
306 lines = ['line1', 'line2']
307 written_data = 'line1\nline2\n'
309 # write to results repository
310 self.manager.write_lines_to_file(file_path, lines)
311 self.assert_(self.results_drone.was_call_queued(
312 'write_to_file', os.path.join(self._RESULTS_DIR, file_path),
313 written_data))
315 # write to a drone
316 self.manager.write_lines_to_file(
317 file_path, lines, paired_with_process=self.mock_drone_process)
318 self.assert_(self.mock_drone.was_call_queued(
319 'write_to_file',
320 os.path.join(self._DRONE_RESULTS_DIR, file_path), written_data))
323 def test_pidfile_expiration(self):
324 self.god.stub_with(self.manager, '_get_max_pidfile_refreshes',
325 lambda: 0)
326 pidfile_id = self.manager.get_pidfile_id_from('tag', 'name')
327 self.manager.register_pidfile(pidfile_id)
328 self.manager._drop_old_pidfiles()
329 self.manager._drop_old_pidfiles()
330 self.assertFalse(self.manager._registered_pidfile_info)
333 if __name__ == '__main__':
334 unittest.main()