2 # Copyright 2013 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
6 """Tests for the cmd_helper module."""
12 from devil
import devil_env
13 from devil
.utils
import cmd_helper
15 with devil_env
.SysPath(devil_env
.PYMOCK_PATH
):
16 import mock
# pylint: disable=import-error
19 class CmdHelperSingleQuoteTest(unittest
.TestCase
):
21 def testSingleQuote_basic(self
):
22 self
.assertEquals('hello',
23 cmd_helper
.SingleQuote('hello'))
25 def testSingleQuote_withSpaces(self
):
26 self
.assertEquals("'hello world'",
27 cmd_helper
.SingleQuote('hello world'))
29 def testSingleQuote_withUnsafeChars(self
):
30 self
.assertEquals("""'hello'"'"'; rm -rf /'""",
31 cmd_helper
.SingleQuote("hello'; rm -rf /"))
33 def testSingleQuote_dontExpand(self
):
34 test_string
= 'hello $TEST_VAR'
35 cmd
= 'TEST_VAR=world; echo %s' % cmd_helper
.SingleQuote(test_string
)
36 self
.assertEquals(test_string
,
37 cmd_helper
.GetCmdOutput(cmd
, shell
=True).rstrip())
40 class CmdHelperDoubleQuoteTest(unittest
.TestCase
):
42 def testDoubleQuote_basic(self
):
43 self
.assertEquals('hello',
44 cmd_helper
.DoubleQuote('hello'))
46 def testDoubleQuote_withSpaces(self
):
47 self
.assertEquals('"hello world"',
48 cmd_helper
.DoubleQuote('hello world'))
50 def testDoubleQuote_withUnsafeChars(self
):
51 self
.assertEquals('''"hello\\"; rm -rf /"''',
52 cmd_helper
.DoubleQuote('hello"; rm -rf /'))
54 def testSingleQuote_doExpand(self
):
55 test_string
= 'hello $TEST_VAR'
56 cmd
= 'TEST_VAR=world; echo %s' % cmd_helper
.DoubleQuote(test_string
)
57 self
.assertEquals('hello world',
58 cmd_helper
.GetCmdOutput(cmd
, shell
=True).rstrip())
61 class CmdHelperShinkToSnippetTest(unittest
.TestCase
):
63 def testShrinkToSnippet_noArgs(self
):
64 self
.assertEquals('foo',
65 cmd_helper
.ShrinkToSnippet(['foo'], 'a', 'bar'))
66 self
.assertEquals("'foo foo'",
67 cmd_helper
.ShrinkToSnippet(['foo foo'], 'a', 'bar'))
68 self
.assertEquals('"$a"\' bar\'',
69 cmd_helper
.ShrinkToSnippet(['foo bar'], 'a', 'foo'))
70 self
.assertEquals('\'foo \'"$a"',
71 cmd_helper
.ShrinkToSnippet(['foo bar'], 'a', 'bar'))
72 self
.assertEquals('foo"$a"',
73 cmd_helper
.ShrinkToSnippet(['foobar'], 'a', 'bar'))
75 def testShrinkToSnippet_singleArg(self
):
76 self
.assertEquals("foo ''",
77 cmd_helper
.ShrinkToSnippet(['foo', ''], 'a', 'bar'))
78 self
.assertEquals("foo foo",
79 cmd_helper
.ShrinkToSnippet(['foo', 'foo'], 'a', 'bar'))
80 self
.assertEquals('"$a" "$a"',
81 cmd_helper
.ShrinkToSnippet(['foo', 'foo'], 'a', 'foo'))
82 self
.assertEquals('foo "$a""$a"',
83 cmd_helper
.ShrinkToSnippet(['foo', 'barbar'], 'a', 'bar'))
84 self
.assertEquals('foo "$a"\' \'"$a"',
85 cmd_helper
.ShrinkToSnippet(['foo', 'bar bar'], 'a', 'bar'))
86 self
.assertEquals('foo "$a""$a"\' \'',
87 cmd_helper
.ShrinkToSnippet(['foo', 'barbar '], 'a', 'bar'))
88 self
.assertEquals('foo \' \'"$a""$a"\' \'',
89 cmd_helper
.ShrinkToSnippet(['foo', ' barbar '], 'a', 'bar'))
95 class _ProcessOutputEvent(object):
97 def __init__(self
, select_fds
=_DEFAULT
, read_contents
=None, ts
=_DEFAULT
):
98 self
.select_fds
= select_fds
99 self
.read_contents
= read_contents
103 class _MockProcess(object):
105 def __init__(self
, output_sequence
=None, return_value
=0):
108 fake_stdout_fileno
= 25
110 self
.mock_proc
= mock
.MagicMock(spec
=subprocess
.Popen
)
111 self
.mock_proc
.stdout
= mock
.MagicMock()
112 self
.mock_proc
.stdout
.fileno
= mock
.MagicMock(
113 return_value
=fake_stdout_fileno
)
114 self
.mock_proc
.returncode
= None
116 self
._return
_value
= return_value
118 # This links the behavior of os.read, select.select, time.time, and
119 # <process>.poll. The output sequence can be thought of as a list of
120 # return values for select.select with corresponding return values for
121 # the other calls at any time between that select call and the following
122 # one. We iterate through the sequence only on calls to select.select.
124 # os.read is a special case, though, where we only return a given chunk
125 # of data *once* after a given call to select.
127 if not output_sequence
:
130 # Use an leading element to make the iteration logic work.
131 initial_seq_element
= _ProcessOutputEvent(
133 output_sequence
[0].ts
if output_sequence
else _DEFAULT
)
134 output_sequence
.insert(0, initial_seq_element
)
136 for o
in output_sequence
:
137 if o
.select_fds
== _DEFAULT
:
138 if o
.read_contents
is None:
141 o
.select_fds
= [fake_stdout_fileno
]
144 self
._output
_sequence
= output_sequence
146 self
._output
_seq
_index
= 0
147 self
._read
_flags
= [False] * len(output_sequence
)
149 def read_side_effect(*_args
, **_kwargs
):
150 if self
._read
_flags
[self
._output
_seq
_index
]:
152 self
._read
_flags
[self
._output
_seq
_index
] = True
153 return self
._output
_sequence
[self
._output
_seq
_index
].read_contents
155 def select_side_effect(*_args
, **_kwargs
):
156 if self
._output
_seq
_index
is None:
157 self
._output
_seq
_index
= 0
159 self
._output
_seq
_index
+= 1
160 return (self
._output
_sequence
[self
._output
_seq
_index
].select_fds
,
163 def time_side_effect(*_args
, **_kwargs
):
164 return self
._output
_sequence
[self
._output
_seq
_index
].ts
166 def poll_side_effect(*_args
, **_kwargs
):
167 if self
._output
_seq
_index
>= len(self
._output
_sequence
) - 1:
168 self
.mock_proc
.returncode
= self
._return
_value
169 return self
.mock_proc
.returncode
171 mock_read
= mock
.MagicMock(side_effect
=read_side_effect
)
172 mock_select
= mock
.MagicMock(side_effect
=select_side_effect
)
173 mock_time
= mock
.MagicMock(side_effect
=time_side_effect
)
174 self
.mock_proc
.poll
= mock
.MagicMock(side_effect
=poll_side_effect
)
176 # Set up but *do not start* the mocks.
178 mock
.patch('fcntl.fcntl'),
179 mock
.patch('os.read', new
=mock_read
),
180 mock
.patch('select.select', new
=mock_select
),
181 mock
.patch('time.time', new
=mock_time
),
185 for m
in self
._mocks
:
187 return self
.mock_proc
189 def __exit__(self
, exc_type
, exc_val
, exc_tb
):
190 for m
in reversed(self
._mocks
):
191 m
.__exit
__(exc_type
, exc_val
, exc_tb
)
194 class CmdHelperIterCmdOutputLinesTest(unittest
.TestCase
):
195 """Test IterCmdOutputLines with some calls to the unix 'seq' command."""
197 # This calls _IterCmdOutputLines rather than IterCmdOutputLines s.t. it
198 # can mock the process.
199 # pylint: disable=protected-access
201 _SIMPLE_OUTPUT_SEQUENCE
= [
202 _ProcessOutputEvent(read_contents
='1\n2\n'),
205 def testIterCmdOutputLines_success(self
):
207 output_sequence
=self
._SIMPLE
_OUTPUT
_SEQUENCE
) as mock_proc
:
208 for num
, line
in enumerate(
209 cmd_helper
._IterCmdOutputLines
(mock_proc
, 'mock_proc'), 1):
210 self
.assertEquals(num
, int(line
))
212 def testIterCmdOutputLines_exitStatusFail(self
):
213 with self
.assertRaises(subprocess
.CalledProcessError
):
214 with
_MockProcess(output_sequence
=self
._SIMPLE
_OUTPUT
_SEQUENCE
,
215 return_value
=1) as mock_proc
:
216 for num
, line
in enumerate(
217 cmd_helper
._IterCmdOutputLines
(mock_proc
, 'mock_proc'), 1):
218 self
.assertEquals(num
, int(line
))
219 # after reading all the output we get an exit status of 1
221 def testIterCmdOutputLines_exitStatusIgnored(self
):
222 with
_MockProcess(output_sequence
=self
._SIMPLE
_OUTPUT
_SEQUENCE
,
223 return_value
=1) as mock_proc
:
224 for num
, line
in enumerate(
225 cmd_helper
._IterCmdOutputLines
(
226 mock_proc
, 'mock_proc', check_status
=False),
228 self
.assertEquals(num
, int(line
))
230 def testIterCmdOutputLines_exitStatusSkipped(self
):
231 with
_MockProcess(output_sequence
=self
._SIMPLE
_OUTPUT
_SEQUENCE
,
232 return_value
=1) as mock_proc
:
233 for num
, line
in enumerate(
234 cmd_helper
._IterCmdOutputLines
(mock_proc
, 'mock_proc'), 1):
235 self
.assertEquals(num
, int(line
))
236 # no exception will be raised because we don't attempt to read past
237 # the end of the output and, thus, the status never gets checked
241 def testIterCmdOutputLines_delay(self
):
243 _ProcessOutputEvent(read_contents
='1\n2\n', ts
=1),
244 _ProcessOutputEvent(read_contents
=None, ts
=2),
245 _ProcessOutputEvent(read_contents
='Awake', ts
=10),
247 with
_MockProcess(output_sequence
=output_sequence
) as mock_proc
:
248 for num
, line
in enumerate(
249 cmd_helper
._IterCmdOutputLines
(mock_proc
, 'mock_proc',
252 self
.assertEquals(num
, int(line
))
254 self
.assertEquals(None, line
)
256 self
.assertEquals('Awake', line
)
261 if __name__
== '__main__':