1 # -*- coding: utf-8; -*-
3 # test/test_dputhelper.py
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # This is free software, and you are welcome to redistribute it under
7 # certain conditions; see the end of this file for copyright
8 # information, grant of license, and disclaimer of warranty.
10 """ Unit tests for ‘dput.helper.dputhelper’ module. """
12 from __future__
import (absolute_import
, unicode_literals
)
25 import testtools
.matchers
29 from dput
.helper
import dputhelper
35 patch_system_interfaces
,
37 EXIT_STATUS_SUCCESS
, EXIT_STATUS_FAILURE
,
38 EXIT_STATUS_COMMAND_NOT_FOUND
,
39 patch_subprocess_check_call
,
44 class check_call_TestCase(
45 testscenarios
.WithScenarios
,
47 """ Test cases for `check_call` function. """
49 default_args
= collections
.OrderedDict([
50 ('args', ["arg-{}".format(n
) for n
in range(5)]),
55 'test_args': default_args
.copy(),
56 'subprocess_check_call_scenario_name': 'success',
57 'expected_exit_status': EXIT_STATUS_SUCCESS
,
60 'test_args': default_args
.copy(),
61 'subprocess_check_call_scenario_name': 'failure',
62 'expected_exit_status': EXIT_STATUS_FAILURE
,
63 'expected_output': textwrap
.dedent("""\
64 Warning: The execution of '...' as
66 returned a nonzero exit code.
70 'test_args': default_args
.copy(),
71 'subprocess_check_call_scenario_name': 'not_found',
72 'expected_exit_status': EXIT_STATUS_COMMAND_NOT_FOUND
,
73 'expected_output': textwrap
.dedent("""\
74 Error: Failed to execute '...'.
75 The file may not exist or not be executable.
81 """ Set up test fixtures. """
82 super(check_call_TestCase
, self
).setUp()
83 patch_system_interfaces(self
)
85 patch_subprocess_check_call(self
)
87 self
.set_subprocess_double()
def set_subprocess_double(self):
    """ Create and register the subprocess double for this test case.

        The double is built from the `args` entry of `self.test_args`:
        the first element is used as the command file path, and the
        whole list as the command's argument vector. The resulting
        double is stored as `self.subprocess_double`.
        """
    argv = self.test_args['args']
    program_path = argv[0]
    subprocess_double = SubprocessDouble(program_path, argv)
    subprocess_double.register_for_testcase(self)
    # The scenario name comes from the current test scenario.
    scenario_name = self.subprocess_check_call_scenario_name
    subprocess_double.set_subprocess_check_call_scenario(scenario_name)
    self.subprocess_double = subprocess_double
def test_calls_os_spawnv_with_specified_args(self):
    """ Should pass the specified arguments to `subprocess.check_call`. """
    # The same positional arguments are given to the function under
    # test and then expected on the `subprocess.check_call` double.
    call_args = list(self.test_args.values())
    dputhelper.check_call(*call_args)
    subprocess.check_call.assert_called_with(*call_args)
def test_returns_expected_exit_status(self):
    """ Should return the exit status expected for this scenario. """
    call_args = self.test_args.values()
    returned_status = dputhelper.check_call(*call_args)
    self.assertEqual(self.expected_exit_status, returned_status)
109 def test_emits_expected_output(self
):
110 """ Should emit the expected output messages. """
111 if not hasattr(self
, 'expected_output'):
112 self
.expected_output
= ""
113 dputhelper
.check_call(*self
.test_args
.values())
115 sys
.stderr
.getvalue(),
116 testtools
.matchers
.DocTestMatches(
117 self
.expected_output
, flags
=doctest
.ELLIPSIS
))
120 class TimestampFile_TestCase(testtools
.TestCase
):
121 """ Base for test cases for the `TimestampFile` class. """
123 scenarios
= NotImplemented
126 """ Set up test fixtures. """
127 super(TimestampFile_TestCase
, self
).setUp()
129 patch_time_time(self
, itertools
.count(1))
131 self
.test_file
= StringIO()
132 self
.instance
= dputhelper
.TimestampFile(self
.test_file
)
135 class TimestampFile_InstanceTestCase(
136 testscenarios
.WithScenarios
,
137 TimestampFile_TestCase
):
138 """ Test cases for `TimestampFile` instance creation. """
def test_has_specified_file(self):
    """ Should expose the specified file object as the `f` attribute. """
    expected_file = self.test_file
    actual_file = self.instance.f
    # Identity, not equality: the very same object must be kept.
    self.assertIs(expected_file, actual_file)
148 def test_has_attributes_from_component_file(self
):
149 """ Should have attributes directly from component file. """
152 'mode', 'name', 'encoding',
153 'readable', 'seekable', 'writable',
154 'read', 'seek', 'tell',
156 for attr_name
in attr_names
:
157 expected_attr_value
= getattr(self
.test_file
, attr_name
, None)
159 getattr(self
.instance
, attr_name
, None),
160 testtools
.matchers
.Equals(expected_attr_value
))
163 class TimestampFile_write_TestCase(
164 testscenarios
.WithScenarios
,
165 TimestampFile_TestCase
):
166 """ Test cases for `TimestampFile.write` method. """
171 'expected_lines': [],
174 'test_output': textwrap
.dedent("""\
175 Lorem ipsum, dolor sit amet.
178 "1: Lorem ipsum, dolor sit amet.",
183 'test_output': textwrap
.dedent("""\
184 Lorem ipsum, dolor sit amet,
185 consectetur adipiscing elit.
186 Integer non pulvinar risus, sed malesuada diam.
189 "1: Lorem ipsum, dolor sit amet,",
190 "2: consectetur adipiscing elit.",
191 "3: Integer non pulvinar risus, sed malesuada diam.",
195 ('lines-two-with-trail', {
196 'test_output': textwrap
.dedent("""\
197 Lorem ipsum, dolor sit amet,
198 consectetur adipiscing elit.
199 Integer non pulvinar risus"""),
201 "1: Lorem ipsum, dolor sit amet,",
202 "2: consectetur adipiscing elit.",
203 "3: Integer non pulvinar risus",
208 def test_has_expected_content_for_output(self
):
209 """ Should have expected content for specified `write` output. """
210 self
.instance
.write(self
.test_output
)
211 expected_lines
= self
.expected_lines
212 if self
.expected_lines
:
213 if self
.expected_lines
[-1]:
214 # Expecting an unterminated final line.
215 expected_lines
= self
.expected_lines
[:-1]
216 expected_lines
.append("")
218 # Expecting no output following newline.
219 expected_lines
= self
.expected_lines
220 expected_content
= "\n".join(expected_lines
)
221 self
.assertEqual(expected_content
, self
.instance
.f
.getvalue())
class TimestampFile_close_TestCase(
        testscenarios.WithScenarios,
        TimestampFile_TestCase):
    """ Test cases for `TimestampFile.close` method. """

    # Re-use the scenarios (`test_output`, `expected_lines`) defined
    # for the `write` test cases.
    scenarios = TimestampFile_write_TestCase.scenarios

    @testtools.skip("TimestampFile.close method is broken")
    def test_has_expected_final_line(self):
        """ Should have expected final line. """
        self.instance.write(self.test_output)
        self.instance.f.seek(0)
        self.instance.close()
        # NOTE(review): the shared scenarios include an empty
        # `expected_lines` list, for which this lookup would raise
        # `IndexError`; revisit when this test is un-skipped.
        expected_content = self.expected_lines[-1]
        self.assertEqual(expected_content, self.instance.f.getvalue())
241 class FileWithProgress_TestCase(
242 testscenarios
.WithScenarios
,
244 """ Base for test cases for the `FileWithProgress` class. """
248 'progressf': sys
.__stdout
__,
254 """ Set up test fixtures. """
255 super(FileWithProgress_TestCase
, self
).setUp()
256 patch_system_interfaces(self
)
258 self
.test_file
= StringIO(
259 getattr(self
, 'content', ""))
264 def set_test_args(self
):
265 """ Set the arguments for the test instance constructor. """
266 self
.test_args
= dict(
269 if hasattr(self
, 'test_ptype'):
270 self
.test_args
['ptype'] = self
.test_ptype
271 if hasattr(self
, 'test_progressf'):
272 self
.test_args
['progressf'] = self
.test_progressf
273 if hasattr(self
, 'test_size'):
274 self
.test_args
['size'] = self
.test_size
275 if hasattr(self
, 'test_step'):
276 self
.test_args
['step'] = self
.test_step
def make_instance(self):
    """ Create the `FileWithProgress` instance under test.

        The constructor receives `self.test_args` as keyword
        arguments; the result is stored as `self.instance`.
        """
    constructor_kwargs = self.test_args
    self.instance = dputhelper.FileWithProgress(**constructor_kwargs)
283 class FileWithProgress_ArgsTestCase(FileWithProgress_TestCase
):
284 """ Test cases for constructor arguments for `FileWithProgress` class. """
290 'test_progressf': StringIO(),
def test_has_specified_file(self):
    """ Should expose the specified file object as the `f` attribute. """
    expected_file = self.test_file
    actual_file = self.instance.f
    # Identity, not equality: the very same object must be kept.
    self.assertIs(expected_file, actual_file)
def test_has_specified_ptype(self):
    """ Should expose the specified progress type as `ptype`. """
    # The scenario's `test_ptype` wins when present; otherwise the
    # class default applies. The default is looked up unconditionally.
    fallback_ptype = self.default_args['ptype']
    wanted_ptype = getattr(self, 'test_ptype', fallback_ptype)
    self.assertEqual(wanted_ptype, self.instance.ptype)
def test_has_specified_progressf(self):
    """ Should expose the specified progress file as `progressf`. """
    # The scenario's `test_progressf` wins when present; otherwise the
    # class default applies. The default is looked up unconditionally.
    fallback_progressf = self.default_args['progressf']
    wanted_progressf = getattr(self, 'test_progressf', fallback_progressf)
    self.assertEqual(wanted_progressf, self.instance.progressf)
def test_has_specified_size(self):
    """ Should expose the specified size value as `size`. """
    # The scenario's `test_size` wins when present; otherwise the
    # class default applies. The default is looked up unconditionally.
    fallback_size = self.default_args['size']
    wanted_size = getattr(self, 'test_size', fallback_size)
    self.assertEqual(wanted_size, self.instance.size)
def test_has_specified_step(self):
    """ Should expose the specified step value as `step`. """
    # The scenario's `test_step` wins when present; otherwise the
    # class default applies. The default is looked up unconditionally.
    fallback_step = self.default_args['step']
    wanted_step = getattr(self, 'test_step', fallback_step)
    self.assertEqual(wanted_step, self.instance.step)
324 def test_has_attributes_from_component_file(self
):
325 """ Should have attributes directly from component file. """
328 'mode', 'name', 'encoding',
329 'readable', 'seekable', 'writable',
330 'seek', 'tell', 'write',
332 for attr_name
in attr_names
:
333 expected_attr_value
= getattr(self
.test_file
, attr_name
, None)
335 getattr(self
.instance
, attr_name
, None),
336 testtools
.matchers
.Equals(expected_attr_value
))
339 class FileWithProgress_OutputTestCase(FileWithProgress_TestCase
):
340 """ Test cases for progress output for `FileWithProgress` class. """
342 content_scenarios
= [
347 'content': "0123456789\n" * 1000,
349 ('10 000 000 chars', {
350 'content': "0123456789\n" * 1000000,
356 ('ptype 0', {'test_ptype': 0}),
357 ('ptype 1', {'test_ptype': 1}),
358 ('ptype 2', {'test_ptype': 2}),
363 ('step 5', {'test_step': 5}),
364 ('step 500', {'test_step': 500}),
365 ('step 50 000', {'test_step': 50000}),
368 scenarios
= testscenarios
.multiply_scenarios(
369 content_scenarios
, ptype_scenarios
, step_scenarios
)
372 """ Set up test fixtures. """
373 super(FileWithProgress_OutputTestCase
, self
).setUp()
375 self
.test_file
= StringIO(self
.content
)
376 self
.test_size
= len(self
.content
)
377 self
.test_progressf
= StringIO()
381 self
.set_expected_output()
383 def set_expected_output(self
):
384 """ Set the expected output for this test case. """
385 ptype
= getattr(self
, 'test_ptype', self
.default_args
['ptype'])
387 self
.expected_output
= "/"
389 step
= getattr(self
, 'test_step', 1024)
390 total_bytes
= len(self
.content
)
391 total_hunks
= int(total_bytes
/ step
)
392 total_hunks_text
= "{size}k".format(size
=total_hunks
)
394 (total_bytes
+ step
- 1) / step
)
395 total_steps_text
= "{size}k".format(size
=total_steps
)
396 progress_text
= "{hunks}/{steps}".format(
397 hunks
=total_hunks_text
, steps
=total_steps_text
)
398 self
.expected_output
= progress_text
400 # `ptype == 0` specifies no progress output.
401 self
.expected_output
= ""
404 # No progress output for an empty file.
405 self
.expected_output
= ""
407 def test_emits_expected_output_for_content(self
):
408 """ Should emit expected output for file content. """
410 output_stream_content
= self
.test_progressf
.getvalue()
412 self
.expected_output
, output_stream_content
)
414 def test_clears_output_on_close(self
):
415 """ Should clear progress output when closed. """
417 self
.instance
.close()
420 + len(self
.expected_output
) * "\b"
421 + len(self
.expected_output
) * " "
422 + len(self
.expected_output
) * "\b"
424 output_stream_content
= self
.test_progressf
.getvalue()
425 self
.assertEqual(expected_output
, output_stream_content
)
428 def patch_filewithprogress(testcase
):
429 """ Patch the `FileWithProgress` class for the test case. """
430 if not hasattr(testcase
, 'fake_filewithprogress'):
431 testcase
.fake_filewithprogress
= mock
.MagicMock(
432 spec
=dputhelper
.FileWithProgress
, name
="FileWithProgress")
434 def fake_filewithprogress_factory(
435 f
, ptype
=0, progressf
=sys
.stdout
, size
=-1, step
=1024):
436 result
= testcase
.fake_filewithprogress
439 result
.progressf
= progressf
444 func_patcher
= mock
.patch
.object(
445 dputhelper
, "FileWithProgress", autospec
=True,
446 side_effect
=fake_filewithprogress_factory
)
448 testcase
.addCleanup(func_patcher
.stop
)
451 class make_text_stream_TestCase(
452 testscenarios
.WithScenarios
,
454 """ Test cases for `make_text_stream` function. """
456 fake_preferred_encoding
= str("johab")
459 ('text-stream-no-encoding', {
460 'fake_file_params': {
462 'content': u
"Lorem ipsum",
465 'expected_encoding': None,
466 'expected_content': u
"Lorem ipsum",
469 'fake_file_params': {
470 'type': io
.TextIOWrapper
,
471 'content': u
"Lorem ipsum",
472 'encoding': str("utf-8"),
474 'expected_encoding': "utf-8",
475 'expected_content': u
"Lorem ipsum",
478 'fake_file_params': {
480 'content': u
"Lorem ipsum".encode(fake_preferred_encoding
),
482 'expected_encoding': fake_preferred_encoding
,
483 'expected_content': u
"Lorem ipsum",
487 if sys
.version_info
>= (3, 0):
488 # This version of Python does not have brute `file` type, so
489 # we don't need to test for streams of that type.
492 # Streams are brute `file` objects in Python < 3, so we need
493 # to test that type also.
496 'fake_file_params': {
498 'content': u
"Lorem ipsum".encode("utf-8"),
500 'expected_encoding': fake_preferred_encoding
,
501 'expected_content': u
"Lorem ipsum",
506 """ Set up test fixtures. """
507 super(make_text_stream_TestCase
, self
).setUp()
509 self
.patch_locale_getpreferredencoding()
515 def set_test_args(self
):
516 """ Set the arguments for the test call to the function. """
517 self
.test_args
= dict(
518 stream
=self
.fake_file
,
521 def patch_locale_getpreferredencoding(self
):
522 """ Patch the `locale.getpreferredencoding` function. """
523 func_patcher
= mock
.patch
.object(
524 locale
, "getpreferredencoding", autospec
=True,
525 return_value
=self
.fake_preferred_encoding
)
527 self
.addCleanup(func_patcher
.stop
)
529 def set_fake_file(self
):
530 """ Set the fake file for this test case. """
531 file_params
= self
.fake_file_params
532 file_type
= file_params
['type']
534 content
= file_params
['content']
535 if file_params
.get('encoding', None) is not None:
536 content_bytestream
= io
.BytesIO(
537 content
.encode(file_params
['encoding']))
538 elif isinstance(content
, bytes
):
539 content_bytestream
= io
.BytesIO(content
)
541 content_bytestream
= None
543 if isinstance(file_type
, type):
544 if issubclass(file_type
, io
.TextIOWrapper
):
545 fake_file
= file_type(
546 content_bytestream
, encoding
=file_params
['encoding'])
548 fake_file
= file_type(content
)
550 # Not actually a type, but a factory function.
551 fake_file
= file_type()
552 fake_file
.write(file_params
['content'])
555 self
.fake_file
= fake_file
def test_result_is_specified_stream_if_has_encoding(self):
    """ Result should be the very same stream when it is a text stream. """
    stream_is_text = isinstance(self.fake_file, io.TextIOBase)
    if not stream_is_text:
        # Only text streams are relevant to this test.
        self.skipTest("Specified stream is not text")
    returned_stream = dputhelper.make_text_stream(**self.test_args)
    self.assertIs(self.fake_file, returned_stream)
def test_result_has_expected_encoding(self):
    """ Result should report the expected `encoding` attribute. """
    returned_stream = dputhelper.make_text_stream(**self.test_args)
    actual_encoding = returned_stream.encoding
    self.assertEqual(self.expected_encoding, actual_encoding)
569 def test_result_emits_expected_content(self
):
570 """ Result should emit the expected content. """
571 result
= dputhelper
.make_text_stream(**self
.test_args
)
572 if isinstance(result
, io
.BufferedRandom
):
574 result
.name
, mode
='r',
575 encoding
=self
.fake_file
.encoding
) as infile
:
576 content
= infile
.read()
578 content
= result
.read()
579 self
.assertEqual(self
.expected_content
, content
)
# Expected return value of `getopt`: the parsed option list, then the
# remaining non-option arguments.
GetoptResult = collections.namedtuple('GetoptResult', 'optlist args')
585 class getopt_SuccessTestCase(
586 testscenarios
.WithScenarios
,
588 """ Success test cases for `getopt` function. """
592 'test_argv': [object()],
593 'expected_result': GetoptResult(
594 optlist
=[], args
=[]),
597 'test_argv': [object(), "foo", "bar", "baz"],
598 'expected_result': GetoptResult(
599 optlist
=[], args
=["foo", "bar", "baz"]),
601 ('only short opts', {
602 'test_argv': [object(), "-a", "-b", "-c"],
603 'test_shortopts': "axbycz",
604 'expected_result': GetoptResult(
613 'test_argv': [object(), "--alpha", "--beta", "--gamma"],
615 "wibble", "alpha", "wobble",
616 "beta", "wubble", "gamma",
618 'expected_result': GetoptResult(
626 ('long opt prefix', {
627 'test_argv': [object(), "--al", "--be", "--ga"],
629 "wibble", "alpha", "wobble",
630 "beta", "wubble", "gamma",
632 'expected_result': GetoptResult(
640 ('short opt cluster', {
641 'test_argv': [object(), "-abc"],
642 'test_shortopts': "abc",
643 'expected_result': GetoptResult(
651 ('short with args', {
652 'test_argv': [object(), "-a", "-b", "eggs", "-cbeans"],
653 'test_shortopts': "ab:c:",
654 'expected_result': GetoptResult(
669 "wibble", "alpha", "wobble",
670 "beta=", "wubble", "gamma=",
672 'expected_result': GetoptResult(
676 ('--gamma', "beans"),
680 ('long with optional args', {
687 "wibble", "alpha", "wobble",
688 "beta==", "wubble", "gamma==",
690 'expected_result': GetoptResult(
698 ('single hyphen arg', {
699 'test_argv': [object(), "-a", "-b", "-c", "-"],
700 'test_shortopts': "axbycz",
701 'expected_result': GetoptResult(
709 ('explicit end of opts', {
717 "wibble", "alpha", "wobble",
718 "beta", "wubble", "gamma",
720 'expected_result': GetoptResult(
def test_returns_expected_result_for_argv(self):
    """ Should return the expected result for the specified argv. """
    # Option specifications are optional per scenario; both default
    # to an empty string.
    short_spec = getattr(self, 'test_shortopts', "")
    long_spec = getattr(self, 'test_longopts', "")
    # The first element of `test_argv` is not passed to `getopt`.
    cli_args = self.test_argv[1:]
    actual_result = dputhelper.getopt(cli_args, short_spec, long_spec)
    self.assertEqual(self.expected_result, actual_result)
738 class getopt_ErrorTestCase(
739 testscenarios
.WithScenarios
,
741 """ Error test cases for `getopt` function. """
744 ('short opt unknown', {
745 'test_argv': [object(), "-a", "-b", "-z", "-c"],
746 'test_shortopts': "abc",
747 'expected_error': dputhelper
.DputException
,
749 ('short missing arg', {
750 'test_argv': [object(), "-a", "-b", "-c"],
751 'test_shortopts': "abc:",
752 'expected_error': dputhelper
.DputException
,
754 ('long opt unknown', {
756 object(), "--alpha", "--beta", "--zeta", "--gamma"],
758 "alpha", "beta", "gamma"],
759 'expected_error': dputhelper
.DputException
,
761 ('long ambiguous prefix', {
763 object(), "--alpha", "--be", "--gamma"],
765 "alpha", "beta", "bettong", "bertha", "gamma"],
766 'expected_error': dputhelper
.DputException
,
768 ('long missing arg', {
769 'test_argv': [object(), "--alpha", "--beta", "--gamma"],
771 "alpha", "beta", "gamma="],
772 'expected_error': dputhelper
.DputException
,
774 ('long unexpected arg', {
776 object(), "--alpha", "--beta=beans", "--gamma"],
778 "alpha", "beta", "gamma"],
779 'expected_error': dputhelper
.DputException
,
783 def test_raises_expected_error_for_argv(self
):
784 """ Should raise expected error for specified argv. """
785 shortopts
= getattr(self
, 'test_shortopts', "")
786 longopts
= getattr(self
, 'test_longopts', "")
787 with testtools
.ExpectedException(self
.expected_error
):
789 self
.test_argv
[1:], shortopts
, longopts
)
792 def patch_getopt(testcase
):
793 """ Patch the `getopt` function for the specified test case. """
794 def fake_getopt(args
, shortopts
, longopts
):
795 result
= (testcase
.getopt_opts
, testcase
.getopt_args
)
798 func_patcher
= mock
.patch
.object(
799 dputhelper
, "getopt", autospec
=True,
800 side_effect
=fake_getopt
)
802 testcase
.addCleanup(func_patcher
.stop
)
805 class get_progname_TestCase(
806 testscenarios
.WithScenarios
,
808 """ Test cases for `get_progname` function. """
810 command_name_scenarios
= [
813 'expected_progname': "amet",
815 ('command-relative', {
816 'argv_zero': "lorem/ipsum/dolor/sit/amet",
817 'expected_progname': "amet",
819 ('command-absolute', {
820 'argv_zero': "/lorem/ipsum/dolor/sit/amet",
821 'expected_progname': "amet",
825 subsequent_args_scenarios
= [
830 'argv_remain': ["spam"],
832 ('args-three-words', {
833 'argv_remain': ["spam", "beans", "eggs"],
835 ('args-one-option', {
836 'argv_remain': ["--spam"],
840 scenarios
= testscenarios
.multiply_scenarios(
841 command_name_scenarios
, subsequent_args_scenarios
)
844 """ Set up test fixtures. """
845 super(get_progname_TestCase
, self
).setUp()
847 self
.test_argv
= [self
.argv_zero
] + self
.argv_remain
def test_returns_expected_progname(self):
    """ Should return the expected progname for the command line. """
    command_line = self.test_argv
    actual_progname = dputhelper.get_progname(command_line)
    self.assertEqual(self.expected_progname, actual_progname)
854 def test_queries_sys_argv_if_argv_unspecified(self
):
855 """ Should query `sys.argv` if no `argv` specified. """
856 self
.sys_argv
= self
.test_argv
858 result
= dputhelper
.get_progname()
859 self
.assertEqual(self
.expected_progname
, result
)
862 def patch_pkg_resources_get_distribution(testcase
):
863 """ Patch `pkg_resources.get_distribution` for the test case. """
864 if not hasattr(testcase
, 'fake_distribution'):
865 testcase
.fake_distribution
= mock
.MagicMock(pkg_resources
.Distribution
)
866 func_patcher
= mock
.patch
.object(
867 pkg_resources
, "get_distribution", autospec
=True,
868 return_value
=testcase
.fake_distribution
)
870 testcase
.addCleanup(func_patcher
.stop
)
873 class get_distribution_version_TestCase(
874 testscenarios
.WithScenarios
,
876 """ Test cases for `get_distribution_version` function. """
880 'fake_distribution': mock
.MagicMock(
881 project_name
="lorem", version
="42.23"),
886 """ Set up test fixtures. """
887 super(get_distribution_version_TestCase
, self
).setUp()
889 patch_pkg_resources_get_distribution(self
)
def test_returns_expected_result(self):
    """ Should return the version of the fake distribution. """
    actual_version = dputhelper.get_distribution_version()
    # The expected value is the `version` of the scenario's
    # `fake_distribution`.
    self.assertEqual(self.fake_distribution.version, actual_version)
898 # Copyright © 2015–2016 Ben Finney <bignose@debian.org>
900 # This is free software: you may copy, modify, and/or distribute this work
901 # under the terms of the GNU General Public License as published by the
902 # Free Software Foundation; version 3 of that license or any later version.
903 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
910 # vim: fileencoding=utf-8 filetype=python :