Update sdk/platform-tools to version 26.0.0.
[android_tools.git] / sdk / platform-tools / systrace / catapult / devil / devil / utils / cmd_helper_test.py
blob783c4137c8d0b15e87a81f76f2351a30b29af7ef
1 #!/usr/bin/env python
2 # Copyright 2013 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
6 """Tests for the cmd_helper module."""
8 import unittest
9 import subprocess
10 import time
12 from devil import devil_env
13 from devil.utils import cmd_helper
15 with devil_env.SysPath(devil_env.PYMOCK_PATH):
16 import mock # pylint: disable=import-error
19 class CmdHelperSingleQuoteTest(unittest.TestCase):
21 def testSingleQuote_basic(self):
22 self.assertEquals('hello',
23 cmd_helper.SingleQuote('hello'))
25 def testSingleQuote_withSpaces(self):
26 self.assertEquals("'hello world'",
27 cmd_helper.SingleQuote('hello world'))
29 def testSingleQuote_withUnsafeChars(self):
30 self.assertEquals("""'hello'"'"'; rm -rf /'""",
31 cmd_helper.SingleQuote("hello'; rm -rf /"))
33 def testSingleQuote_dontExpand(self):
34 test_string = 'hello $TEST_VAR'
35 cmd = 'TEST_VAR=world; echo %s' % cmd_helper.SingleQuote(test_string)
36 self.assertEquals(test_string,
37 cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())
40 class CmdHelperDoubleQuoteTest(unittest.TestCase):
42 def testDoubleQuote_basic(self):
43 self.assertEquals('hello',
44 cmd_helper.DoubleQuote('hello'))
46 def testDoubleQuote_withSpaces(self):
47 self.assertEquals('"hello world"',
48 cmd_helper.DoubleQuote('hello world'))
50 def testDoubleQuote_withUnsafeChars(self):
51 self.assertEquals('''"hello\\"; rm -rf /"''',
52 cmd_helper.DoubleQuote('hello"; rm -rf /'))
54 def testSingleQuote_doExpand(self):
55 test_string = 'hello $TEST_VAR'
56 cmd = 'TEST_VAR=world; echo %s' % cmd_helper.DoubleQuote(test_string)
57 self.assertEquals('hello world',
58 cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())
61 class CmdHelperShinkToSnippetTest(unittest.TestCase):
63 def testShrinkToSnippet_noArgs(self):
64 self.assertEquals('foo',
65 cmd_helper.ShrinkToSnippet(['foo'], 'a', 'bar'))
66 self.assertEquals("'foo foo'",
67 cmd_helper.ShrinkToSnippet(['foo foo'], 'a', 'bar'))
68 self.assertEquals('"$a"\' bar\'',
69 cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'foo'))
70 self.assertEquals('\'foo \'"$a"',
71 cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'bar'))
72 self.assertEquals('foo"$a"',
73 cmd_helper.ShrinkToSnippet(['foobar'], 'a', 'bar'))
75 def testShrinkToSnippet_singleArg(self):
76 self.assertEquals("foo ''",
77 cmd_helper.ShrinkToSnippet(['foo', ''], 'a', 'bar'))
78 self.assertEquals("foo foo",
79 cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'bar'))
80 self.assertEquals('"$a" "$a"',
81 cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'foo'))
82 self.assertEquals('foo "$a""$a"',
83 cmd_helper.ShrinkToSnippet(['foo', 'barbar'], 'a', 'bar'))
84 self.assertEquals('foo "$a"\' \'"$a"',
85 cmd_helper.ShrinkToSnippet(['foo', 'bar bar'], 'a', 'bar'))
86 self.assertEquals('foo "$a""$a"\' \'',
87 cmd_helper.ShrinkToSnippet(['foo', 'barbar '], 'a', 'bar'))
88 self.assertEquals('foo \' \'"$a""$a"\' \'',
89 cmd_helper.ShrinkToSnippet(['foo', ' barbar '], 'a', 'bar'))
92 _DEFAULT = 'DEFAULT'
95 class _ProcessOutputEvent(object):
97 def __init__(self, select_fds=_DEFAULT, read_contents=None, ts=_DEFAULT):
98 self.select_fds = select_fds
99 self.read_contents = read_contents
100 self.ts = ts
103 class _MockProcess(object):
105 def __init__(self, output_sequence=None, return_value=0):
107 # Arbitrary.
108 fake_stdout_fileno = 25
110 self.mock_proc = mock.MagicMock(spec=subprocess.Popen)
111 self.mock_proc.stdout = mock.MagicMock()
112 self.mock_proc.stdout.fileno = mock.MagicMock(
113 return_value=fake_stdout_fileno)
114 self.mock_proc.returncode = None
116 self._return_value = return_value
118 # This links the behavior of os.read, select.select, time.time, and
119 # <process>.poll. The output sequence can be thought of as a list of
120 # return values for select.select with corresponding return values for
121 # the other calls at any time between that select call and the following
122 # one. We iterate through the sequence only on calls to select.select.
124 # os.read is a special case, though, where we only return a given chunk
125 # of data *once* after a given call to select.
127 if not output_sequence:
128 output_sequence = []
130 # Use an leading element to make the iteration logic work.
131 initial_seq_element = _ProcessOutputEvent(
132 _DEFAULT, '',
133 output_sequence[0].ts if output_sequence else _DEFAULT)
134 output_sequence.insert(0, initial_seq_element)
136 for o in output_sequence:
137 if o.select_fds == _DEFAULT:
138 if o.read_contents is None:
139 o.select_fds = []
140 else:
141 o.select_fds = [fake_stdout_fileno]
142 if o.ts == _DEFAULT:
143 o.ts = time.time()
144 self._output_sequence = output_sequence
146 self._output_seq_index = 0
147 self._read_flags = [False] * len(output_sequence)
149 def read_side_effect(*_args, **_kwargs):
150 if self._read_flags[self._output_seq_index]:
151 return None
152 self._read_flags[self._output_seq_index] = True
153 return self._output_sequence[self._output_seq_index].read_contents
155 def select_side_effect(*_args, **_kwargs):
156 if self._output_seq_index is None:
157 self._output_seq_index = 0
158 else:
159 self._output_seq_index += 1
160 return (self._output_sequence[self._output_seq_index].select_fds,
161 None, None)
163 def time_side_effect(*_args, **_kwargs):
164 return self._output_sequence[self._output_seq_index].ts
166 def poll_side_effect(*_args, **_kwargs):
167 if self._output_seq_index >= len(self._output_sequence) - 1:
168 self.mock_proc.returncode = self._return_value
169 return self.mock_proc.returncode
171 mock_read = mock.MagicMock(side_effect=read_side_effect)
172 mock_select = mock.MagicMock(side_effect=select_side_effect)
173 mock_time = mock.MagicMock(side_effect=time_side_effect)
174 self.mock_proc.poll = mock.MagicMock(side_effect=poll_side_effect)
176 # Set up but *do not start* the mocks.
177 self._mocks = [
178 mock.patch('fcntl.fcntl'),
179 mock.patch('os.read', new=mock_read),
180 mock.patch('select.select', new=mock_select),
181 mock.patch('time.time', new=mock_time),
184 def __enter__(self):
185 for m in self._mocks:
186 m.__enter__()
187 return self.mock_proc
189 def __exit__(self, exc_type, exc_val, exc_tb):
190 for m in reversed(self._mocks):
191 m.__exit__(exc_type, exc_val, exc_tb)
194 class CmdHelperIterCmdOutputLinesTest(unittest.TestCase):
195 """Test IterCmdOutputLines with some calls to the unix 'seq' command."""
197 # This calls _IterCmdOutputLines rather than IterCmdOutputLines s.t. it
198 # can mock the process.
199 # pylint: disable=protected-access
201 _SIMPLE_OUTPUT_SEQUENCE = [
202 _ProcessOutputEvent(read_contents='1\n2\n'),
205 def testIterCmdOutputLines_success(self):
206 with _MockProcess(
207 output_sequence=self._SIMPLE_OUTPUT_SEQUENCE) as mock_proc:
208 for num, line in enumerate(
209 cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
210 self.assertEquals(num, int(line))
212 def testIterCmdOutputLines_exitStatusFail(self):
213 with self.assertRaises(subprocess.CalledProcessError):
214 with _MockProcess(output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
215 return_value=1) as mock_proc:
216 for num, line in enumerate(
217 cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
218 self.assertEquals(num, int(line))
219 # after reading all the output we get an exit status of 1
221 def testIterCmdOutputLines_exitStatusIgnored(self):
222 with _MockProcess(output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
223 return_value=1) as mock_proc:
224 for num, line in enumerate(
225 cmd_helper._IterCmdOutputLines(
226 mock_proc, 'mock_proc', check_status=False),
228 self.assertEquals(num, int(line))
230 def testIterCmdOutputLines_exitStatusSkipped(self):
231 with _MockProcess(output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
232 return_value=1) as mock_proc:
233 for num, line in enumerate(
234 cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
235 self.assertEquals(num, int(line))
236 # no exception will be raised because we don't attempt to read past
237 # the end of the output and, thus, the status never gets checked
238 if num == 2:
239 break
241 def testIterCmdOutputLines_delay(self):
242 output_sequence = [
243 _ProcessOutputEvent(read_contents='1\n2\n', ts=1),
244 _ProcessOutputEvent(read_contents=None, ts=2),
245 _ProcessOutputEvent(read_contents='Awake', ts=10),
247 with _MockProcess(output_sequence=output_sequence) as mock_proc:
248 for num, line in enumerate(
249 cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc',
250 iter_timeout=5), 1):
251 if num <= 2:
252 self.assertEquals(num, int(line))
253 elif num == 3:
254 self.assertEquals(None, line)
255 elif num == 4:
256 self.assertEquals('Awake', line)
257 else:
258 self.fail()
261 if __name__ == '__main__':
262 unittest.main()