1 # -*- coding: utf-8; -*-
3 # test/test_dputhelper.py
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # Copyright © 2015–2016 Ben Finney <ben+python@benfinney.id.au>
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the GNU General Public License as published by the
10 # Free Software Foundation; version 3 of that license or any later version.
11 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
13 """ Unit tests for ‘dput.helper.dputhelper’ module. """
15 from __future__
import (absolute_import
, unicode_literals
)
29 import testtools
.matchers
33 __package__
= str("test")
34 __import__(__package__
)
35 sys
.path
.insert(1, os
.path
.dirname(os
.path
.dirname(__file__
)))
36 from dput
.helper
import dputhelper
42 patch_system_interfaces
,
44 EXIT_STATUS_SUCCESS
, EXIT_STATUS_FAILURE
,
45 EXIT_STATUS_COMMAND_NOT_FOUND
,
46 make_fake_file_scenarios
,
48 patch_subprocess_check_call
,
53 class check_call_TestCase(
54 testscenarios
.WithScenarios
,
56 """ Test cases for `check_call` function. """
58 default_args
= collections
.OrderedDict([
59 ('args', ["arg-{}".format(n
) for n
in range(5)]),
64 'test_args': default_args
.copy(),
65 'subprocess_check_call_scenario_name': 'success',
66 'expected_exit_status': EXIT_STATUS_SUCCESS
,
69 'test_args': default_args
.copy(),
70 'subprocess_check_call_scenario_name': 'failure',
71 'expected_exit_status': EXIT_STATUS_FAILURE
,
72 'expected_output': textwrap
.dedent("""\
73 Warning: The execution of '...' as
75 returned a nonzero exit code.
79 'test_args': default_args
.copy(),
80 'subprocess_check_call_scenario_name': 'not_found',
81 'expected_exit_status': EXIT_STATUS_COMMAND_NOT_FOUND
,
82 'expected_output': textwrap
.dedent("""\
83 Error: Failed to execute '...'.
84 The file may not exist or not be executable.
90 """ Set up test fixtures. """
91 super(check_call_TestCase
, self
).setUp()
92 patch_system_interfaces(self
)
94 patch_subprocess_check_call(self
)
96 self
.set_subprocess_double()
98 def set_subprocess_double(self
):
99 """ Set the test double for the subprocess. """
100 command_file_path
= self
.test_args
['args'][0]
101 command_argv
= self
.test_args
['args']
102 double
= SubprocessDouble(command_file_path
, command_argv
)
103 double
.register_for_testcase(self
)
104 double
.set_subprocess_check_call_scenario(
105 self
.subprocess_check_call_scenario_name
)
106 self
.subprocess_double
= double
108 def test_calls_os_spawnv_with_specified_args(self
):
109 """ Should call `subprocess.check_call` with specified arguments. """
110 dputhelper
.check_call(*self
.test_args
.values())
111 subprocess
.check_call
.assert_called_with(*self
.test_args
.values())
113 def test_returns_expected_exit_status(self
):
114 """ Should return expected exit status for subprocess. """
115 exit_status
= dputhelper
.check_call(*self
.test_args
.values())
116 self
.assertEqual(self
.expected_exit_status
, exit_status
)
118 def test_emits_expected_output(self
):
119 """ Should emit the expected output messages. """
120 if not hasattr(self
, 'expected_output'):
121 self
.expected_output
= ""
122 dputhelper
.check_call(*self
.test_args
.values())
124 sys
.stderr
.getvalue(),
125 testtools
.matchers
.DocTestMatches(
126 self
.expected_output
, flags
=doctest
.ELLIPSIS
))
129 class TimestampFile_TestCase(testtools
.TestCase
):
130 """ Base for test cases for the `TimestampFile` class. """
132 scenarios
= NotImplemented
135 """ Set up test fixtures. """
136 super(TimestampFile_TestCase
, self
).setUp()
138 patch_time_time(self
, itertools
.count(1))
140 self
.test_file
= StringIO()
141 self
.instance
= dputhelper
.TimestampFile(self
.test_file
)
144 class TimestampFile_InstanceTestCase(
145 testscenarios
.WithScenarios
,
146 TimestampFile_TestCase
):
147 """ Test cases for `TimestampFile` instance creation. """
153 def test_has_specified_file(self
):
154 """ Should have specified file object as `f` attribute. """
155 self
.assertIs(self
.test_file
, self
.instance
.f
)
157 def test_has_attributes_from_component_file(self
):
158 """ Should have attributes directly from component file. """
161 'mode', 'name', 'encoding',
162 'readable', 'seekable', 'writable',
163 'read', 'seek', 'tell',
165 for attr_name
in attr_names
:
166 expected_attr_value
= getattr(self
.test_file
, attr_name
, None)
168 getattr(self
.instance
, attr_name
, None),
169 testtools
.matchers
.Equals(expected_attr_value
))
172 class TimestampFile_write_TestCase(
173 testscenarios
.WithScenarios
,
174 TimestampFile_TestCase
):
175 """ Test cases for `TimestampFile.write` method. """
180 'expected_lines': [],
183 'test_output': textwrap
.dedent("""\
184 Lorem ipsum, dolor sit amet.
187 "1: Lorem ipsum, dolor sit amet.",
192 'test_output': textwrap
.dedent("""\
193 Lorem ipsum, dolor sit amet,
194 consectetur adipiscing elit.
195 Integer non pulvinar risus, sed malesuada diam.
198 "1: Lorem ipsum, dolor sit amet,",
199 "2: consectetur adipiscing elit.",
200 "3: Integer non pulvinar risus, sed malesuada diam.",
204 ('lines-two-with-trail', {
205 'test_output': textwrap
.dedent("""\
206 Lorem ipsum, dolor sit amet,
207 consectetur adipiscing elit.
208 Integer non pulvinar risus"""),
210 "1: Lorem ipsum, dolor sit amet,",
211 "2: consectetur adipiscing elit.",
212 "3: Integer non pulvinar risus",
217 def test_has_expected_content_for_output(self
):
218 """ Should have expected content for specified `write` output. """
219 self
.instance
.write(self
.test_output
)
220 expected_lines
= self
.expected_lines
221 if self
.expected_lines
:
222 if self
.expected_lines
[-1]:
223 # Expecting an unterminated final line.
224 expected_lines
= self
.expected_lines
[:-1]
225 expected_lines
.append("")
227 # Expecting no output following newline.
228 expected_lines
= self
.expected_lines
229 expected_content
= "\n".join(expected_lines
)
230 self
.assertEqual(expected_content
, self
.instance
.f
.getvalue())
233 class TimestampFile_close_TestCase(
234 testscenarios
.WithScenarios
,
235 TimestampFile_TestCase
):
236 """ Test cases for `TimestampFile.write` method. """
238 scenarios
= TimestampFile_write_TestCase
.scenarios
240 @testtools.skip("TimestampFile.close method is broken")
241 def test_has_expected_final_line(self
):
242 """ Should have expected final line. """
243 self
.instance
.write(self
.test_output
)
244 self
.instance
.f
.seek(0)
245 self
.instance
.close()
246 expected_content
= self
.expected_lines
[-1]
247 self
.assertEqual(expected_content
, self
.instance
.f
.getvalue())
250 class FileWithProgress_TestCase(
251 testscenarios
.WithScenarios
,
253 """ Base for test cases for the `FileWithProgress` class. """
257 'progressf': sys
.__stdout
__,
263 """ Set up test fixtures. """
264 super(FileWithProgress_TestCase
, self
).setUp()
265 patch_system_interfaces(self
)
267 self
.test_file
= StringIO(
268 getattr(self
, 'content', ""))
273 def set_test_args(self
):
274 """ Set the arguments for the test instance constructor. """
275 self
.test_args
= dict(
278 if hasattr(self
, 'test_ptype'):
279 self
.test_args
['ptype'] = self
.test_ptype
280 if hasattr(self
, 'test_progressf'):
281 self
.test_args
['progressf'] = self
.test_progressf
282 if hasattr(self
, 'test_size'):
283 self
.test_args
['size'] = self
.test_size
284 if hasattr(self
, 'test_step'):
285 self
.test_args
['step'] = self
.test_step
287 def make_instance(self
):
288 """ Make the test instance of the class. """
289 self
.instance
= dputhelper
.FileWithProgress(**self
.test_args
)
292 class FileWithProgress_ArgsTestCase(FileWithProgress_TestCase
):
293 """ Test cases for constructor arguments for `FileWithProgress` class. """
299 'test_progressf': StringIO(),
305 def test_has_specified_file(self
):
306 """ Should have specified file object as `f` attribute. """
307 self
.assertIs(self
.test_file
, self
.instance
.f
)
309 def test_has_specified_ptype(self
):
310 """ Should have specified progress type value as `ptype` attribute. """
311 expected_ptype
= getattr(
312 self
, 'test_ptype', self
.default_args
['ptype'])
313 self
.assertEqual(expected_ptype
, self
.instance
.ptype
)
315 def test_has_specified_progressf(self
):
316 """ Should have specified progress file as `progressf` attribute. """
317 expected_progressf
= getattr(
318 self
, 'test_progressf', self
.default_args
['progressf'])
319 self
.assertEqual(expected_progressf
, self
.instance
.progressf
)
321 def test_has_specified_size(self
):
322 """ Should have specified size value as `size` attribute. """
323 expected_size
= getattr(
324 self
, 'test_size', self
.default_args
['size'])
325 self
.assertEqual(expected_size
, self
.instance
.size
)
327 def test_has_specified_step(self
):
328 """ Should have specified step value as `step` attribute. """
329 expected_step
= getattr(
330 self
, 'test_step', self
.default_args
['step'])
331 self
.assertEqual(expected_step
, self
.instance
.step
)
333 def test_has_attributes_from_component_file(self
):
334 """ Should have attributes directly from component file. """
337 'mode', 'name', 'encoding',
338 'readable', 'seekable', 'writable',
339 'seek', 'tell', 'write',
341 for attr_name
in attr_names
:
342 expected_attr_value
= getattr(self
.test_file
, attr_name
, None)
344 getattr(self
.instance
, attr_name
, None),
345 testtools
.matchers
.Equals(expected_attr_value
))
348 class FileWithProgress_OutputTestCase(FileWithProgress_TestCase
):
349 """ Test cases for progress output for `FileWithProgress` class. """
351 content_scenarios
= [
356 'content': "0123456789\n" * 1000,
358 ('10 000 000 chars', {
359 'content': "0123456789\n" * 1000000,
365 ('ptype 0', {'test_ptype': 0}),
366 ('ptype 1', {'test_ptype': 1}),
367 ('ptype 2', {'test_ptype': 2}),
372 ('step 5', {'test_step': 5}),
373 ('step 500', {'test_step': 500}),
374 ('step 50 000', {'test_step': 50000}),
377 scenarios
= testscenarios
.multiply_scenarios(
378 content_scenarios
, ptype_scenarios
, step_scenarios
)
381 """ Set up test fixtures. """
382 super(FileWithProgress_OutputTestCase
, self
).setUp()
384 self
.test_file
= StringIO(self
.content
)
385 self
.test_size
= len(self
.content
)
386 self
.test_progressf
= StringIO()
390 self
.set_expected_output()
392 def set_expected_output(self
):
393 """ Set the expected output for this test case. """
394 ptype
= getattr(self
, 'test_ptype', self
.default_args
['ptype'])
396 self
.expected_output
= "/"
398 step
= getattr(self
, 'test_step', 1024)
399 total_bytes
= len(self
.content
)
400 total_hunks
= int(total_bytes
/ step
)
401 total_hunks_text
= "{size}k".format(size
=total_hunks
)
403 (total_bytes
+ step
- 1) / step
)
404 total_steps_text
= "{size}k".format(size
=total_steps
)
405 progress_text
= "{hunks}/{steps}".format(
406 hunks
=total_hunks_text
, steps
=total_steps_text
)
407 self
.expected_output
= progress_text
409 # `ptype == 0` specifies no progress output.
410 self
.expected_output
= ""
413 # No progress output for an empty file.
414 self
.expected_output
= ""
416 def test_emits_expected_output_for_content(self
):
417 """ Should emit expected output for file content. """
419 output_stream_content
= self
.test_progressf
.getvalue()
421 self
.expected_output
, output_stream_content
)
423 def test_clears_output_on_close(self
):
424 """ Should clear progress output when closed. """
426 self
.instance
.close()
429 + len(self
.expected_output
) * "\b"
430 + len(self
.expected_output
) * " "
431 + len(self
.expected_output
) * "\b"
433 output_stream_content
= self
.test_progressf
.getvalue()
434 self
.assertEqual(expected_output
, output_stream_content
)
437 def patch_filewithprogress(testcase
):
438 """ Patch the `FileWithProgress` class for the test case. """
439 if not hasattr(testcase
, 'fake_filewithprogress'):
440 testcase
.fake_filewithprogress
= mock
.MagicMock(
441 spec
=dputhelper
.FileWithProgress
, name
="FileWithProgress")
443 def fake_filewithprogress_factory(
444 f
, ptype
=0, progressf
=sys
.stdout
, size
=-1, step
=1024):
445 result
= testcase
.fake_filewithprogress
448 result
.progressf
= progressf
453 func_patcher
= mock
.patch
.object(
454 dputhelper
, "FileWithProgress", autospec
=True,
455 side_effect
=fake_filewithprogress_factory
)
457 testcase
.addCleanup(func_patcher
.stop
)
460 class make_text_stream_TestCase(
461 testscenarios
.WithScenarios
,
463 """ Test cases for `make_text_stream` function. """
465 fake_preferred_encoding
= str("johab")
468 'bytes-file': io
.BytesIO(b
"Lorem ipsum"),
469 'text-file-no-encoding': io
.StringIO("Lorem ipsum"),
470 'text-file': io
.TextIOWrapper(
471 io
.BytesIO(b
"Lorem ipsum"),
476 ('text-stream-no-encoding', {
477 'fake_file_params': {
479 'content': u
"Lorem ipsum",
482 'expected_encoding': None,
483 'expected_content': u
"Lorem ipsum",
486 'fake_file_params': {
487 'type': io
.TextIOWrapper
,
488 'content': u
"Lorem ipsum",
489 'encoding': str("utf-8"),
491 'expected_encoding': "utf-8",
492 'expected_content': u
"Lorem ipsum",
495 'fake_file_params': {
497 'content': u
"Lorem ipsum".encode(fake_preferred_encoding
),
499 'expected_encoding': fake_preferred_encoding
,
500 'expected_content': u
"Lorem ipsum",
505 """ Set up test fixtures. """
506 super(make_text_stream_TestCase
, self
).setUp()
508 self
.patch_locale_getpreferredencoding()
514 def set_test_args(self
):
515 """ Set the arguments for the test call to the function. """
516 self
.test_args
= dict(
517 stream
=self
.fake_file
,
520 def patch_locale_getpreferredencoding(self
):
521 """ Patch the `locale.getpreferredencoding` function. """
522 func_patcher
= mock
.patch
.object(
523 locale
, "getpreferredencoding", autospec
=True,
524 return_value
=self
.fake_preferred_encoding
)
526 self
.addCleanup(func_patcher
.stop
)
528 def set_fake_file(self
):
529 """ Set the fake file for this test case. """
530 file_params
= self
.fake_file_params
531 file_type
= file_params
['type']
533 content
= file_params
['content']
534 if file_params
.get('encoding', None) is not None:
535 content_bytestream
= io
.BytesIO(
536 content
.encode(file_params
['encoding']))
537 elif isinstance(content
, bytes
):
538 content_bytestream
= io
.BytesIO(content
)
540 content_bytestream
= None
542 if issubclass(file_type
, io
.TextIOWrapper
):
543 fake_file
= file_type(
544 content_bytestream
, encoding
=file_params
['encoding'])
546 fake_file
= file_type(content
)
548 self
.fake_file
= fake_file
550 def test_result_is_specified_stream_if_has_encoding(self
):
551 """ Result should be the same stream if it has an encoding. """
552 if not isinstance(self
.fake_file
, io
.TextIOBase
):
553 self
.skipTest("Specified stream is not text")
554 result
= dputhelper
.make_text_stream(**self
.test_args
)
555 self
.assertIs(self
.fake_file
, result
)
557 def test_result_has_expected_encoding(self
):
558 """ Result should have the expected `encoding` attribute. """
559 result
= dputhelper
.make_text_stream(**self
.test_args
)
560 self
.assertEqual(self
.expected_encoding
, result
.encoding
)
562 def test_result_emits_expected_content(self
):
563 """ Result should emit the expected content. """
564 result
= dputhelper
.make_text_stream(**self
.test_args
)
565 content
= result
.read()
566 self
.assertEqual(self
.expected_content
, content
)
569 GetoptResult
= collections
.namedtuple('GetoptResult', ['optlist', 'args'])
572 class getopt_SuccessTestCase(
573 testscenarios
.WithScenarios
,
575 """ Success test cases for `getopt` function. """
579 'test_argv': [object()],
580 'expected_result': GetoptResult(
581 optlist
=[], args
=[]),
584 'test_argv': [object(), "foo", "bar", "baz"],
585 'expected_result': GetoptResult(
586 optlist
=[], args
=["foo", "bar", "baz"]),
588 ('only short opts', {
589 'test_argv': [object(), "-a", "-b", "-c"],
590 'test_shortopts': "axbycz",
591 'expected_result': GetoptResult(
600 'test_argv': [object(), "--alpha", "--beta", "--gamma"],
602 "wibble", "alpha", "wobble",
603 "beta", "wubble", "gamma",
605 'expected_result': GetoptResult(
613 ('long opt prefix', {
614 'test_argv': [object(), "--al", "--be", "--ga"],
616 "wibble", "alpha", "wobble",
617 "beta", "wubble", "gamma",
619 'expected_result': GetoptResult(
627 ('short opt cluster', {
628 'test_argv': [object(), "-abc"],
629 'test_shortopts': "abc",
630 'expected_result': GetoptResult(
638 ('short with args', {
639 'test_argv': [object(), "-a", "-b", "eggs", "-cbeans"],
640 'test_shortopts': "ab:c:",
641 'expected_result': GetoptResult(
656 "wibble", "alpha", "wobble",
657 "beta=", "wubble", "gamma=",
659 'expected_result': GetoptResult(
663 ('--gamma', "beans"),
667 ('long with optional args', {
674 "wibble", "alpha", "wobble",
675 "beta==", "wubble", "gamma==",
677 'expected_result': GetoptResult(
685 ('single hyphen arg', {
686 'test_argv': [object(), "-a", "-b", "-c", "-"],
687 'test_shortopts': "axbycz",
688 'expected_result': GetoptResult(
696 ('explicit end of opts', {
704 "wibble", "alpha", "wobble",
705 "beta", "wubble", "gamma",
707 'expected_result': GetoptResult(
716 def test_returns_expected_result_for_argv(self
):
717 """ Should return expected result for specified argv. """
718 shortopts
= getattr(self
, 'test_shortopts', "")
719 longopts
= getattr(self
, 'test_longopts', "")
720 result
= dputhelper
.getopt(
721 self
.test_argv
[1:], shortopts
, longopts
)
722 self
.assertEqual(self
.expected_result
, result
)
725 class getopt_ErrorTestCase(
726 testscenarios
.WithScenarios
,
728 """ Error test cases for `getopt` function. """
731 ('short opt unknown', {
732 'test_argv': [object(), "-a", "-b", "-z", "-c"],
733 'test_shortopts': "abc",
734 'expected_error': dputhelper
.DputException
,
736 ('short missing arg', {
737 'test_argv': [object(), "-a", "-b", "-c"],
738 'test_shortopts': "abc:",
739 'expected_error': dputhelper
.DputException
,
741 ('long opt unknown', {
743 object(), "--alpha", "--beta", "--zeta", "--gamma"],
745 "alpha", "beta", "gamma"],
746 'expected_error': dputhelper
.DputException
,
748 ('long ambiguous prefix', {
750 object(), "--alpha", "--be", "--gamma"],
752 "alpha", "beta", "bettong", "bertha", "gamma"],
753 'expected_error': dputhelper
.DputException
,
755 ('long missing arg', {
756 'test_argv': [object(), "--alpha", "--beta", "--gamma"],
758 "alpha", "beta", "gamma="],
759 'expected_error': dputhelper
.DputException
,
761 ('long unexpected arg', {
763 object(), "--alpha", "--beta=beans", "--gamma"],
765 "alpha", "beta", "gamma"],
766 'expected_error': dputhelper
.DputException
,
770 def test_raises_expected_error_for_argv(self
):
771 """ Should raise expected error for specified argv. """
772 shortopts
= getattr(self
, 'test_shortopts', "")
773 longopts
= getattr(self
, 'test_longopts', "")
774 with testtools
.ExpectedException(self
.expected_error
):
776 self
.test_argv
[1:], shortopts
, longopts
)
779 def patch_getopt(testcase
):
780 """ Patch the `getopt` function for the specified test case. """
781 def fake_getopt(args
, shortopts
, longopts
):
782 result
= (testcase
.getopt_opts
, testcase
.getopt_args
)
785 func_patcher
= mock
.patch
.object(
786 dputhelper
, "getopt", autospec
=True,
787 side_effect
=fake_getopt
)
789 testcase
.addCleanup(func_patcher
.stop
)
792 class get_progname_TestCase(
793 testscenarios
.WithScenarios
,
795 """ Test cases for `get_progname` function. """
797 command_name_scenarios
= [
800 'expected_progname': "amet",
802 ('command-relative', {
803 'argv_zero': "lorem/ipsum/dolor/sit/amet",
804 'expected_progname': "amet",
806 ('command-absolute', {
807 'argv_zero': "/lorem/ipsum/dolor/sit/amet",
808 'expected_progname': "amet",
812 subsequent_args_scenarios
= [
817 'argv_remain': ["spam"],
819 ('args-three-words', {
820 'argv_remain': ["spam", "beans", "eggs"],
822 ('args-one-option', {
823 'argv_remain': ["--spam"],
827 scenarios
= testscenarios
.multiply_scenarios(
828 command_name_scenarios
, subsequent_args_scenarios
)
831 """ Set up test fixtures. """
832 super(get_progname_TestCase
, self
).setUp()
834 self
.test_argv
= [self
.argv_zero
] + self
.argv_remain
836 def test_returns_expected_progname(self
):
837 """ Should return expected progname value for command line. """
838 result
= dputhelper
.get_progname(self
.test_argv
)
839 self
.assertEqual(self
.expected_progname
, result
)
841 def test_queries_sys_argv_if_argv_unspecified(self
):
842 """ Should query `sys.argv` if no `argv` specified. """
843 self
.sys_argv
= self
.test_argv
845 result
= dputhelper
.get_progname()
846 self
.assertEqual(self
.expected_progname
, result
)
849 def patch_pkg_resources_get_distribution(testcase
):
850 """ Patch `pkg_resources.get_distribution` for the test case. """
851 if not hasattr(testcase
, 'fake_distribution'):
852 testcase
.fake_distribution
= mock
.MagicMock(pkg_resources
.Distribution
)
853 func_patcher
= mock
.patch
.object(
854 pkg_resources
, "get_distribution", autospec
=True,
855 return_value
=testcase
.fake_distribution
)
857 testcase
.addCleanup(func_patcher
.stop
)
860 class get_distribution_version_TestCase(
861 testscenarios
.WithScenarios
,
863 """ Test cases for `get_distribution_version` function. """
867 'fake_distribution': mock
.MagicMock(
868 project_name
="lorem", version
="42.23"),
873 """ Set up test fixtures. """
874 super(get_distribution_version_TestCase
, self
).setUp()
876 patch_pkg_resources_get_distribution(self
)
878 def test_returns_expected_result(self
):
879 """ Should return expected version for the distribution. """
880 result
= dputhelper
.get_distribution_version()
881 expected_version
= self
.fake_distribution
.version
882 self
.assertEqual(expected_version
, result
)
889 # vim: fileencoding=utf-8 filetype=python :