KVM test: usb cdrom support
[autotest-zwu.git] / server / subcommand_unittest.py
blobd8e8b68ec3f17e6b721d5466bf07b4208d0a17a5
1 #!/usr/bin/python
2 # Copyright 2009 Google Inc. Released under the GPL v2
4 import unittest
6 import common
7 from autotest_lib.client.common_lib.test_utils import mock
8 from autotest_lib.server import subcommand
def _create_subcommand(func, args):
    """Return a subcommand object built without the real constructor.

    A throwaway subclass of subcommand.subcommand replaces __init__ so that
    the instance only gets the plain attributes set below.

    func: callable the subcommand wraps.
    args: positional arguments that lambda_function passes to func.
    """
    class wrapper(subcommand.subcommand):
        # to avoid the base class __init__
        def __init__(self, func, args):
            self.func, self.args = func, args
            self.subdir = self.debug = None
            self.pid = self.returncode = None
            self.lambda_function = lambda: func(*args)

    instance = wrapper(func, args)
    return instance
class subcommand_test(unittest.TestCase):
    """Tests for subcommand.subcommand driven through a mock_god.

    Each test stubs the functions it needs on the subcommand module (or on
    the command object), records the calls it expects, runs the method under
    test and finishes with check_playback().
    """

    def setUp(self):
        self.god = mock.mock_god()

    def tearDown(self):
        self.god.unstub_all()
        # cleanup the hooks
        subcommand.subcommand.fork_hooks = []
        subcommand.subcommand.join_hooks = []

    def test_create(self):
        # Build a subcommand with and without a subdir and check the
        # attributes found on the resulting object.
        def check_attributes(cmd, func, args, subdir=None, debug=None,
                             pid=None, returncode=None, fork_hooks=None,
                             join_hooks=None):
            # None means "expect no hooks"; a fresh list is created per call
            # instead of sharing one mutable default object.
            if fork_hooks is None:
                fork_hooks = []
            if join_hooks is None:
                join_hooks = []
            self.assertEqual(cmd.func, func)
            self.assertEqual(cmd.args, args)
            self.assertEqual(cmd.subdir, subdir)
            self.assertEqual(cmd.debug, debug)
            self.assertEqual(cmd.pid, pid)
            self.assertEqual(cmd.returncode, returncode)
            self.assertEqual(cmd.fork_hooks, fork_hooks)
            self.assertEqual(cmd.join_hooks, join_hooks)

        def func(arg1, arg2):
            pass

        cmd = subcommand.subcommand(func, (2, 3))
        check_attributes(cmd, func, (2, 3))
        self.god.check_playback()

        self.god.stub_function(subcommand.os.path, 'abspath')
        self.god.stub_function(subcommand.os.path, 'exists')
        self.god.stub_function(subcommand.os, 'mkdir')

        subcommand.os.path.abspath.expect_call('dir').and_return('/foo/dir')
        subcommand.os.path.exists.expect_call('/foo/dir').and_return(False)
        subcommand.os.mkdir.expect_call('/foo/dir')

        (subcommand.os.path.exists.expect_call('/foo/dir/debug')
                .and_return(False))
        subcommand.os.mkdir.expect_call('/foo/dir/debug')

        cmd = subcommand.subcommand(func, (2, 3), subdir='dir')
        check_attributes(cmd, func, (2, 3), subdir='/foo/dir',
                         debug='/foo/dir/debug')
        self.god.check_playback()

    def _setup_fork_start_parent(self):
        """Run fork_start() with os.fork stubbed to return pid 1000.

        Returns the command object after fork_start() has been called.
        """
        self.god.stub_function(subcommand.os, 'fork')

        subcommand.os.fork.expect_call().and_return(1000)
        func = self.god.create_mock_function('func')
        cmd = _create_subcommand(func, [])
        cmd.fork_start()

        return cmd

    def test_fork_start_parent(self):
        cmd = self._setup_fork_start_parent()

        self.assertEqual(cmd.pid, 1000)
        self.god.check_playback()

    def _setup_fork_start_child(self):
        """Stub every function the child-side fork_start tests expect."""
        self.god.stub_function(subcommand.os, 'pipe')
        self.god.stub_function(subcommand.os, 'fork')
        self.god.stub_function(subcommand.os, 'close')
        self.god.stub_function(subcommand.os, 'write')
        self.god.stub_function(subcommand.cPickle, 'dumps')
        self.god.stub_function(subcommand.os, '_exit')

    def test_fork_start_child(self):
        # Expected sequence when fork() returns 0: close fd 10, run the fork
        # hook, call func, pickle its result and write it to fd 20, close
        # fd 20, run the join hook and _exit(0).
        self._setup_fork_start_child()

        func = self.god.create_mock_function('func')
        fork_hook = self.god.create_mock_function('fork_hook')
        join_hook = self.god.create_mock_function('join_hook')

        subcommand.subcommand.register_fork_hook(fork_hook)
        subcommand.subcommand.register_join_hook(join_hook)
        cmd = _create_subcommand(func, (1, 2))

        subcommand.os.pipe.expect_call().and_return((10, 20))
        subcommand.os.fork.expect_call().and_return(0)
        subcommand.os.close.expect_call(10)
        fork_hook.expect_call(cmd)
        func.expect_call(1, 2).and_return(True)
        subcommand.cPickle.dumps.expect_call(
            True, subcommand.cPickle.HIGHEST_PROTOCOL).and_return('True')
        subcommand.os.write.expect_call(20, 'True')
        subcommand.os.close.expect_call(20)
        join_hook.expect_call(cmd)
        subcommand.os._exit.expect_call(0)

        cmd.fork_start()
        self.god.check_playback()

    def test_fork_start_child_error(self):
        # Same as the child test, but func raises: the exception is logged,
        # pickled in place of a result, and the exit status is 1.
        self._setup_fork_start_child()
        self.god.stub_function(subcommand.logging, 'exception')

        func = self.god.create_mock_function('func')
        cmd = _create_subcommand(func, (1, 2))
        error = Exception('some error')

        subcommand.os.pipe.expect_call().and_return((10, 20))
        subcommand.os.fork.expect_call().and_return(0)
        subcommand.os.close.expect_call(10)
        func.expect_call(1, 2).and_raises(error)
        subcommand.logging.exception.expect_call('function failed')
        subcommand.cPickle.dumps.expect_call(
            error, subcommand.cPickle.HIGHEST_PROTOCOL).and_return('error')
        subcommand.os.write.expect_call(20, 'error')
        subcommand.os.close.expect_call(20)
        subcommand.os._exit.expect_call(1)

        cmd.fork_start()
        self.god.check_playback()

    def _setup_poll(self):
        """Return a parent-side command (pid 1000) with os.waitpid stubbed."""
        cmd = self._setup_fork_start_parent()
        self.god.stub_function(subcommand.os, 'waitpid')
        return cmd

    def test_poll_running(self):
        # waitpid raising os.error is expected to make poll() return None.
        cmd = self._setup_poll()

        (subcommand.os.waitpid.expect_call(1000, subcommand.os.WNOHANG)
                .and_raises(subcommand.os.error('waitpid')))
        self.assertEqual(cmd.poll(), None)
        self.god.check_playback()

    def test_poll_finished_success(self):
        cmd = self._setup_poll()

        (subcommand.os.waitpid.expect_call(1000, subcommand.os.WNOHANG)
                .and_return((1000, 0)))
        self.assertEqual(cmd.poll(), 0)
        self.god.check_playback()

    def test_poll_finished_failure(self):
        # A non-zero status is handed to _handle_exitstatus, whose exception
        # must propagate out of poll().
        cmd = self._setup_poll()
        self.god.stub_function(cmd, '_handle_exitstatus')

        (subcommand.os.waitpid.expect_call(1000, subcommand.os.WNOHANG)
                .and_return((1000, 10)))
        cmd._handle_exitstatus.expect_call(10).and_raises(Exception('fail'))

        self.assertRaises(Exception, cmd.poll)
        self.god.check_playback()

    def test_wait_success(self):
        cmd = self._setup_poll()

        (subcommand.os.waitpid.expect_call(1000, 0)
                .and_return((1000, 0)))

        self.assertEqual(cmd.wait(), 0)
        self.god.check_playback()

    def test_wait_failure(self):
        cmd = self._setup_poll()
        self.god.stub_function(cmd, '_handle_exitstatus')

        (subcommand.os.waitpid.expect_call(1000, 0)
                .and_return((1000, 10)))

        cmd._handle_exitstatus.expect_call(10).and_raises(Exception('fail'))
        self.assertRaises(Exception, cmd.wait)
        self.god.check_playback()

    def _setup_fork_waitfor(self):
        """Return a parent-side command with wait/poll and timing stubbed.

        Stubs cmd.wait, cmd.poll, subcommand.time.time, subcommand.time.sleep
        and subcommand.utils.nuke_pid.
        """
        cmd = self._setup_fork_start_parent()
        self.god.stub_function(cmd, 'wait')
        self.god.stub_function(cmd, 'poll')
        self.god.stub_function(subcommand.time, 'time')
        self.god.stub_function(subcommand.time, 'sleep')
        self.god.stub_function(subcommand.utils, 'nuke_pid')

        return cmd

    def test_fork_waitfor_no_timeout(self):
        # Without a timeout, fork_waitfor() is expected to call wait() once.
        cmd = self._setup_fork_waitfor()

        cmd.wait.expect_call().and_return(0)

        self.assertEqual(cmd.fork_waitfor(), 0)
        self.god.check_playback()

    def test_fork_waitfor_success(self):
        cmd = self._setup_fork_waitfor()
        # NOTE(review): 'wait' is already stubbed by _setup_fork_waitfor;
        # this second stub looks redundant -- confirm before removing.
        self.god.stub_function(cmd, 'wait')
        timeout = 10

        subcommand.time.time.expect_call().and_return(1)
        for i in xrange(timeout):
            subcommand.time.time.expect_call().and_return(i + 1)
            cmd.poll.expect_call().and_return(None)
            subcommand.time.sleep.expect_call(1)
        # Clock reading after the loop, written in terms of timeout rather
        # than the leaked loop index (same value as the old "i + 2").
        subcommand.time.time.expect_call().and_return(timeout + 1)
        cmd.poll.expect_call().and_return(0)

        self.assertEqual(cmd.fork_waitfor(timeout=timeout), 0)
        self.god.check_playback()

    def test_fork_waitfor_failure(self):
        cmd = self._setup_fork_waitfor()
        # NOTE(review): 'wait' is already stubbed by _setup_fork_waitfor;
        # this second stub looks redundant -- confirm before removing.
        self.god.stub_function(cmd, 'wait')
        timeout = 10

        subcommand.time.time.expect_call().and_return(1)
        for i in xrange(timeout):
            subcommand.time.time.expect_call().and_return(i + 1)
            cmd.poll.expect_call().and_return(None)
            subcommand.time.sleep.expect_call(1)
        # Final clock reading (same value as the old "i + 3"); after it the
        # test expects the pid to be nuked and None to be returned.
        subcommand.time.time.expect_call().and_return(timeout + 2)
        subcommand.utils.nuke_pid.expect_call(cmd.pid)

        self.assertEqual(cmd.fork_waitfor(timeout=timeout), None)
        self.god.check_playback()
class parallel_test(unittest.TestCase):
    """Tests for subcommand.parallel() run against mocked subcommand tasks.

    subcommand.cPickle.load is stubbed for every test, and each task is a
    mock whose fork_start / fork_waitfor calls are recorded up front.
    """

    def setUp(self):
        self.god = mock.mock_god()
        self.god.stub_function(subcommand.cPickle, 'load')

    def tearDown(self):
        self.god.unstub_all()

    def _get_cmd(self, func, args):
        """Return a mock of a subcommand whose result_pickle is a mock file."""
        cmd = _create_subcommand(func, args)
        cmd.result_pickle = self.god.create_mock_class(file, 'file')
        return self.god.create_mock_class(cmd, 'subcommand')

    def _get_tasklist(self):
        """Return the two mocked tasks used by every test."""
        return [self._get_cmd(lambda x: x * 2, (3,)),
                self._get_cmd(lambda: None, [])]

    def _setup_common(self):
        """Build the task list and expect fork_start() once on each task."""
        tasklist = self._get_tasklist()

        for task in tasklist:
            task.fork_start.expect_call()

        return tasklist

    def _expect_task_join(self, task, timeout, returncode, result):
        """Record the calls expected while one task's result is collected.

        In order: fork_waitfor(timeout=timeout) returning returncode,
        cPickle.load on the task's result_pickle returning result, and
        close() on that result_pickle.
        """
        task.fork_waitfor.expect_call(timeout=timeout).and_return(returncode)
        (subcommand.cPickle.load.expect_call(task.result_pickle)
                .and_return(result))
        task.result_pickle.close.expect_call()

    def test_success(self):
        tasklist = self._setup_common()

        for task in tasklist:
            self._expect_task_join(task, None, 0, 6)

        subcommand.parallel(tasklist)
        self.god.check_playback()

    def test_failure(self):
        # A non-zero status from fork_waitfor is expected to surface as
        # AutoservError.
        tasklist = self._setup_common()

        for task in tasklist:
            self._expect_task_join(task, None, 1, 6)

        self.assertRaises(subcommand.error.AutoservError, subcommand.parallel,
                          tasklist)
        self.god.check_playback()

    def test_timeout(self):
        # fork_waitfor returning None for each task is expected to surface
        # as AutoservError.
        self.god.stub_function(subcommand.time, 'time')

        tasklist = self._setup_common()
        timeout = 10

        subcommand.time.time.expect_call().and_return(1)

        for task in tasklist:
            subcommand.time.time.expect_call().and_return(1)
            self._expect_task_join(task, timeout, None, 6)

        self.assertRaises(subcommand.error.AutoservError, subcommand.parallel,
                          tasklist, timeout=timeout)
        self.god.check_playback()

    def test_return_results(self):
        # With return_results=True the unpickled values, including an
        # exception object, come back as a list instead of being raised.
        tasklist = self._setup_common()

        self._expect_task_join(tasklist[0], None, 0, 6)

        error = Exception('fail')
        self._expect_task_join(tasklist[1], None, 1, error)

        self.assertEqual(subcommand.parallel(tasklist, return_results=True),
                         [6, error])
        self.god.check_playback()
class test_parallel_simple(unittest.TestCase):
    """Tests for subcommand.parallel_simple().

    subcommand.parallel and the subcommand.subcommand constructor are both
    replaced with mocks in setUp.
    """

    def setUp(self):
        self.god = mock.mock_god()
        self.god.stub_function(subcommand, 'parallel')
        ctor = self.god.create_mock_function('subcommand')
        self.god.stub_with(subcommand, 'subcommand', ctor)

    def tearDown(self):
        self.god.unstub_all()

    def test_simple_success(self):
        # With a single argument only func itself is expected to be called;
        # no expectations are recorded on parallel or the constructor.
        func = self.god.create_mock_function('func')

        func.expect_call(3)

        subcommand.parallel_simple(func, (3,))
        self.god.check_playback()

    def test_simple_failure(self):
        func = self.god.create_mock_function('func')

        error = Exception('fail')
        func.expect_call(3).and_raises(error)

        self.assertRaises(Exception, subcommand.parallel_simple, func, (3,))
        self.god.check_playback()

    def test_simple_return_value(self):
        func = self.god.create_mock_function('func')

        result = 1000
        func.expect_call(3).and_return(result)

        self.assertEqual(subcommand.parallel_simple(func, (3,),
                                                    return_results=True),
                         [result])
        self.god.check_playback()

    def _setup_many(self, count, log):
        """Record the expectations for a parallel_simple() call on count args.

        count: number of arguments to generate (1..count).
        log: when true, each subcommand is expected to be constructed with
             subdir=str(arg); otherwise with subdir=None.

        Returns (func, args) to pass to parallel_simple().
        """
        func = self.god.create_mock_function('func')

        args = []
        cmds = []
        for i in xrange(count):
            arg = i + 1
            args.append(arg)

            subdir = str(arg) if log else None

            # An opaque placeholder stands in for each constructed command.
            cmd = object()
            cmds.append(cmd)

            (subcommand.subcommand.expect_call(func, [arg], subdir)
                    .and_return(cmd))

        subcommand.parallel.expect_call(cmds, None, return_results=False)
        return func, args

    def test_passthrough(self):
        func, args = self._setup_many(4, True)

        subcommand.parallel_simple(func, args)
        self.god.check_playback()

    def test_nolog(self):
        func, args = self._setup_many(3, False)

        subcommand.parallel_simple(func, args, log=False)
        self.god.check_playback()
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()