Remove superfluous manipulation of import path.
[dput.git] / test / test_dput.py
blob8301f24b10494b38087c97278d26a7dacab9ece4
1 # -*- coding: utf-8; -*-
3 # test/test_dput.py
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # This is free software, and you are welcome to redistribute it under
7 # certain conditions; see the end of this file for copyright
8 # information, grant of license, and disclaimer of warranty.
10 """ Unit tests for ‘dput’ module. """
12 from __future__ import (absolute_import, unicode_literals)
14 import sys
15 import os
16 import os.path
17 import subprocess
18 import io
19 import tempfile
20 import signal
21 import textwrap
22 import doctest
23 import hashlib
25 import gpgme
26 import testtools
27 import testtools.matchers
28 import testscenarios
30 import dput.crypto
31 import dput.dput
32 from dput.helper import dputhelper
34 from .helper import (
35 builtins,
36 StringIO,
37 mock,
38 make_unique_slug,
39 patch_signal_signal,
40 FakeSystemExit,
41 EXIT_STATUS_FAILURE,
42 patch_system_interfaces,
43 make_fake_file_scenarios,
44 setup_fake_file_fixtures,
45 set_fake_file_scenario,
46 setup_file_double_behaviour,
47 patch_os_path_exists,
48 patch_os_access,
49 FileDouble,
50 ARG_ANY,
51 SubprocessDouble,
52 patch_subprocess_popen,
53 patch_subprocess_call,
54 patch_subprocess_check_call,
56 from .test_configfile import (
57 set_config,
58 patch_runtime_config_options,
60 from .test_changesfile import (
61 set_fake_upload_file_paths,
62 setup_upload_file_fixtures,
63 make_upload_files_params,
64 make_changes_document,
65 make_changes_file_scenarios,
66 setup_changes_file_fixtures,
67 set_changes_file_scenario,
71 class hexify_string_TestCase(
72 testscenarios.WithScenarios,
73 testtools.TestCase):
74 """ Success test cases for `hexify_string` function. """
76 scenarios = [
77 ('empty', {
78 'input_bytes': b"",
79 'expected_result': str(""),
80 }),
81 ('nul', {
82 'input_bytes': b"\x00",
83 'expected_result': str("00"),
84 }),
85 ('ASCII text', {
86 'input_bytes': (
87 b"Lorem ipsum dolor sit amet"
89 'expected_result': str(
90 "4c6f72656d20697073756d20646f6c6f722073697420616d6574"),
91 }),
92 ('UTF-8 encoded text', {
93 'input_bytes': b"::".join(text.encode('utf-8') for text in [
94 "السّلام عليكم",
95 "⠓⠑⠇⠇⠕",
96 "你好",
97 "Hello",
98 "Γειά σας",
99 "שלום",
100 "नमस्ते",
101 "こんにちは",
102 "안녕하세요",
103 "Здра́вствуйте",
104 "வணக்கம்",
106 'expected_result': str("3a3a").join([
107 "d8a7d984d8b3d991d984d8a7d98520d8b9d984d98ad983d985",
108 "e2a093e2a091e2a087e2a087e2a095",
109 "e4bda0e5a5bd",
110 "48656c6c6f",
111 "ce93ceb5ceb9ceac20cf83ceb1cf82",
112 "d7a9d79cd795d79d",
113 "e0a4a8e0a4aee0a4b8e0a58de0a4a4e0a587",
114 "e38193e38293e381abe381a1e381af",
115 "ec9588eb8595ed9598ec84b8ec9a94",
116 "d097d0b4d180d0b0cc81d0b2d181d182d0b2d183d0b9d182d0b5",
117 "e0aeb5e0aea3e0ae95e0af8de0ae95e0aeaee0af8d",
122 def test_returns_expected_result_for_input(self):
123 """ Should return expected result for input. """
124 result = dput.dput.hexify_string(self.input_bytes)
125 self.assertEqual(self.expected_result, result)
128 class checksum_test_TestCase(
129 testscenarios.WithScenarios,
130 testtools.TestCase):
131 """ Base for test cases for `checksum_test` function. """
133 scenarios = NotImplemented
135 def setUp(self):
136 """ Set up test fixtures. """
137 super(checksum_test_TestCase, self).setUp()
138 patch_system_interfaces(self)
140 setup_fake_file_fixtures(self)
141 set_fake_file_scenario(self, self.fake_file_scenario_name)
143 file_content = self.file_double.fake_file.getvalue().encode('utf-8')
144 self.file_double.fake_file = io.BytesIO(file_content)
146 self.set_test_args()
148 def set_test_args(self):
149 """ Set the arguments for the test call to the function. """
150 self.test_args = dict(
151 filename=self.file_double.path,
152 hash_name=self.hash_name,
156 class checksum_test_SuccessTestCase(checksum_test_TestCase):
157 """ Success test cases for `checksum_test` function. """
159 hash_scenarios = [
160 ('md5', {
161 'hash_name': "md5",
162 'test_hash_func': hashlib.md5,
164 ('sha1', {
165 'hash_name': "sha1",
166 'test_hash_func': hashlib.sha1,
170 fake_file_scenarios = list(
171 (name, scenario)
172 for (name, scenario) in make_fake_file_scenarios().items()
173 if not name.startswith('error'))
175 scenarios = testscenarios.multiply_scenarios(
176 hash_scenarios, fake_file_scenarios)
178 def test_returns_expected_result_for_input(self):
179 """ Should return expected result for specified inputs. """
180 expected_hash = self.test_hash_func(
181 self.file_double.fake_file.getvalue())
182 expected_result = expected_hash.hexdigest()
183 result = dput.dput.checksum_test(**self.test_args)
184 self.assertEqual(expected_result, result)
187 class checksum_test_ErrorTestCase(checksum_test_TestCase):
188 """ Error test cases for `checksum_test` function. """
190 hash_scenarios = [
191 ('md5', {
192 'hash_name': "md5",
196 fake_file_scenarios = list(
197 (name, scenario)
198 for (name, scenario) in make_fake_file_scenarios().items()
199 if name.startswith('error'))
201 scenarios = testscenarios.multiply_scenarios(
202 hash_scenarios, fake_file_scenarios)
204 def test_calls_sys_exit_if_error_reading_file(self):
205 """ Should call `sys.exit` if unable to read the file. """
206 set_fake_file_scenario(self, 'error-read-denied')
207 with testtools.ExpectedException(FakeSystemExit):
208 dput.dput.checksum_test(**self.test_args)
209 expected_output = textwrap.dedent("""\
210 Can't open {path}
211 """).format(path=self.file_double.path)
212 self.assertIn(expected_output, sys.stdout.getvalue())
213 sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
216 class verify_signature_TestCase(
217 testscenarios.WithScenarios,
218 testtools.TestCase):
219 """ Base for test cases for `verify_signature` function. """
221 scenarios = NotImplemented
223 default_args = dict(
224 host="foo",
225 check_only=False,
226 unsigned_upload=False,
227 binary_upload=False,
228 debug=None,
231 def setUp(self):
232 """ Set up test fixtures. """
233 super(verify_signature_TestCase, self).setUp()
234 patch_system_interfaces(self)
236 set_config(
237 self,
238 getattr(self, 'config_scenario_name', 'exist-simple'))
239 patch_runtime_config_options(self)
241 self.set_changes_file_scenario('okay')
242 self.set_dsc_file_scenario('okay')
243 self.set_test_args()
245 setup_fake_file_fixtures(self)
246 set_fake_file_scenario(self, 'exist-minimal')
248 self.patch_check_file_signature()
250 def patch_check_file_signature(self):
251 """ Patch the `check_file_signature` function for this test case. """
252 func_patcher = mock.patch.object(
253 dput.crypto, "check_file_signature", autospec=True)
254 func_patcher.start()
255 self.addCleanup(func_patcher.stop)
257 def set_changes_file_scenario(self, name):
258 """ Set the changes file scenario for this test case. """
259 file_double = FileDouble(tempfile.mktemp())
260 file_double.set_open_scenario(name)
261 file_double.register_for_testcase(self)
262 self.changes_file_double = file_double
264 def set_dsc_file_scenario(self, name):
265 """ Set the source control file scenario for this test case. """
266 file_double = FileDouble(tempfile.mktemp())
267 file_double.set_open_scenario(name)
268 file_double.register_for_testcase(self)
269 self.dsc_file_double = file_double
271 def set_test_args(self):
272 """ Set test args for this test case. """
273 extra_args = getattr(self, 'extra_args', {})
274 self.test_args = self.default_args.copy()
275 self.test_args['config'] = self.runtime_config_parser
276 self.test_args['changes_file_path'] = self.changes_file_double.path
277 self.test_args['dsc_file_path'] = self.dsc_file_double.path
278 self.test_args.update(extra_args)
281 class verify_signature_DebugMessageTestCase(verify_signature_TestCase):
282 """ Test cases for `verify_signature` debug messages. """
284 scenarios = [
285 ('default', {}),
288 def test_emits_debug_message_showing_files(self):
289 """ Should emit a debug message for the specified files. """
290 self.test_args['debug'] = True
291 dput.dput.verify_signature(**self.test_args)
292 expected_output = textwrap.dedent("""\
293 D: upload control file: {changes_path}
294 D: source control file: {dsc_path}
295 """).format(
296 changes_path=self.test_args['changes_file_path'],
297 dsc_path=self.test_args['dsc_file_path'])
298 self.assertIn(expected_output, sys.stdout.getvalue())
301 class verify_signature_ChecksTestCase(verify_signature_TestCase):
302 """ Test cases for `verify_signature` checks. """
304 scenarios = [
305 ('default', {
306 'expected_checks': ['changes', 'dsc'],
308 ('source unsigned', {
309 'extra_args': {
310 'binary_upload': False,
311 'unsigned_upload': True,
313 'expected_checks': [],
315 ('source check-only', {
316 'extra_args': {
317 'binary_upload': False,
318 'check_only': True,
320 'expected_checks': ['changes', 'dsc'],
322 ('source allow_unsigned_uploads', {
323 'extra_args': {
324 'binary_upload': False,
326 'config_scenario_name': 'exist-default-not-unsigned',
327 'expected_checks': ['changes', 'dsc'],
329 ('binary unsigned', {
330 'extra_args': {
331 'binary_upload': True,
332 'unsigned_upload': True,
334 'expected_checks': [],
336 ('binary check-only', {
337 'extra_args': {
338 'binary_upload': True,
339 'check_only': True,
341 'expected_checks': ['changes'],
343 ('binary allow_unsigned_uploads', {
344 'extra_args': {
345 'binary_upload': True,
347 'config_scenario_name': 'exist-default-not-unsigned',
348 'expected_checks': ['changes'],
352 def test_checks_changes_signature_only_when_expected(self):
353 """ Should only check the changes document signature if expected. """
354 dput.dput.verify_signature(**self.test_args)
355 check_call = mock.call(self.changes_file_double.fake_file)
356 if 'changes' in self.expected_checks:
357 self.assertIn(
358 check_call,
359 dput.crypto.check_file_signature.call_args_list)
360 else:
361 self.assertNotIn(
362 check_call,
363 dput.crypto.check_file_signature.call_args_list)
365 def test_calls_sys_exit_when_check_file_signature_raises_error(self):
366 """ Should call `sys.exit` when `check_file_signature` error. """
367 if not self.expected_checks:
368 self.skipTest("No signature checks requested")
369 dput.crypto.check_file_signature.side_effect = gpgme.GpgmeError
370 with testtools.ExpectedException(FakeSystemExit):
371 dput.dput.verify_signature(**self.test_args)
372 sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
374 def test_checks_dsc_signature_only_when_expected(self):
375 """ Should only check the ‘dsc’ document signature if expected. """
376 dput.dput.verify_signature(**self.test_args)
377 check_call = mock.call(self.dsc_file_double.fake_file)
378 if 'dsc' in self.expected_checks:
379 self.assertIn(
380 check_call,
381 dput.crypto.check_file_signature.call_args_list)
382 else:
383 self.assertNotIn(
384 check_call,
385 dput.crypto.check_file_signature.call_args_list)
388 class create_upload_file_TestCase(
389 testscenarios.WithScenarios,
390 testtools.TestCase):
391 """ Test cases for `create_upload_file` function. """
393 scenarios = [
394 ('simple', {}),
395 ('local', {
396 'test_host': "foo",
397 'test_fqdn': "localhost",
399 ('log file exists', {
400 'fake_file_scenario_name': 'exist-empty',
401 'file_access_scenario_name': 'okay',
402 'expected_open_mode': 'a',
406 def setUp(self):
407 """ Set up test fixtures. """
408 super(create_upload_file_TestCase, self).setUp()
409 patch_system_interfaces(self)
411 if not hasattr(self, 'test_host'):
412 self.test_host = make_unique_slug(self)
413 self.test_fqdn = make_unique_slug(self)
415 setup_changes_file_fixtures(self)
416 set_changes_file_scenario(
417 self,
418 getattr(self, 'changes_file_scenario_name', 'exist-minimal'))
420 set_fake_upload_file_paths(self)
421 self.set_test_args()
423 setup_fake_file_fixtures(self)
424 self.set_fake_file_scenario(
425 getattr(self, 'fake_file_scenario_name', 'not-found'))
426 if not hasattr(self, 'expected_open_mode'):
427 self.expected_open_mode = 'w'
429 self.set_upload_log_file_double()
430 self.set_upload_log_file_params()
432 patch_os_access(self)
434 def set_test_args(self):
435 """ Set the arguments for the test call to the function. """
436 changes_file_name = os.path.basename(self.changes_file_double.path)
437 self.package_file_name = (
438 os.path.splitext(changes_file_name)[0] + ".source")
439 self.test_args = dict(
440 package=self.package_file_name,
441 host=self.test_host,
442 fqdn=self.test_fqdn,
443 path=os.path.dirname(self.changes_file_double.path),
444 files_to_upload=self.fake_upload_file_paths,
445 debug=False,
448 def set_upload_log_file_double(self):
449 """ Set the fake upload log file. """
450 changes_file_name = os.path.basename(self.changes_file_double.path)
451 changes_file_name_base = os.path.splitext(changes_file_name)[0]
452 upload_log_file_name = "{base}.{host}.upload".format(
453 base=changes_file_name_base, host=self.test_host)
454 self.file_double.path = os.path.join(
455 os.path.dirname(self.changes_file_double.path),
456 upload_log_file_name)
457 self.upload_log_file_double = self.file_double
458 self.upload_log_file_double.set_os_access_scenario(
459 getattr(
460 self, 'file_access_scenario_name', 'not_exist'))
461 self.upload_log_file_double.register_for_testcase(self)
463 def set_upload_log_file_params(self):
464 """ Set the parameters for the upload log file. """
465 self.upload_log_file_double.set_open_scenario('nonexist')
467 def set_fake_file_scenario(self, name):
468 """ Set the output file scenario for this test case. """
469 self.fake_file_scenario = self.fake_file_scenarios[name]
470 self.file_double = self.fake_file_scenario['file_double']
472 def test_emits_upload_log_debug_message(self):
473 """ Should emit debug message for creation of upload log file. """
474 self.test_args['debug'] = True
475 dput.dput.create_upload_file(**self.test_args)
476 expected_output = textwrap.dedent("""\
477 D: Writing logfile: {path}
478 """).format(path=self.upload_log_file_double.path)
479 self.assertIn(expected_output, sys.stdout.getvalue())
481 def test_opens_log_file_with_expected_mode(self):
482 """ Should open log file with expected write mode. """
483 dput.dput.create_upload_file(**self.test_args)
484 builtins.open.assert_called_with(
485 self.upload_log_file_double.path, self.expected_open_mode)
487 def test_writes_expected_message_for_each_file(self):
488 """ Should write log message for each upload file path. """
489 with mock.patch.object(
490 self.upload_log_file_double.fake_file, "close", autospec=True):
491 dput.dput.create_upload_file(**self.test_args)
492 expected_host = self.test_host
493 expected_fqdn = self.test_fqdn
494 for file_path in self.fake_upload_file_paths:
495 expected_message = (
496 "Successfully uploaded {filename}"
497 " to {fqdn} for {host}.\n").format(
498 filename=os.path.basename(file_path),
499 fqdn=expected_fqdn, host=expected_host)
500 self.expectThat(
501 self.upload_log_file_double.fake_file.getvalue(),
502 testtools.matchers.Contains(expected_message))
504 def test_calls_sys_exit_if_write_denied(self):
505 """ Should call `sys.exit` if write permission denied. """
506 self.upload_log_file_double.set_os_access_scenario('read_only')
507 self.upload_log_file_double.set_open_scenario('write_denied')
508 with testtools.ExpectedException(FakeSystemExit):
509 dput.dput.create_upload_file(**self.test_args)
510 expected_output = textwrap.dedent("""\
511 Could not write {path}
512 """).format(path=self.upload_log_file_double.path)
513 self.assertIn(expected_output, sys.stderr.getvalue())
514 sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
517 class run_lintian_test_TestCase(testtools.TestCase):
518 """ Test cases for `run_lintian_test` function. """
520 scenarios = [
521 ('simple', {}),
524 def setUp(self):
525 """ Set up test fixtures. """
526 super(run_lintian_test_TestCase, self).setUp()
527 patch_system_interfaces(self)
529 patch_signal_signal(self)
530 patch_os_access(self)
532 setup_changes_file_fixtures(self)
533 set_changes_file_scenario(
534 self,
535 getattr(self, 'changes_file_scenario_name', 'exist-minimal'))
537 self.set_test_args()
539 self.patch_lintian_program_file()
540 self.lintian_program_file_double.set_os_access_scenario('okay')
542 patch_subprocess_check_call(self)
543 self.set_lintian_subprocess_double()
545 def set_test_args(self):
546 """ Set the arguments for the test call to the function. """
547 self.test_args = dict(
548 changes_file=self.changes_file_double.path,
551 def patch_lintian_program_file(self):
552 """ Patch the Lintian program file for this test case. """
553 file_path = '/usr/bin/lintian'
554 file_double = FileDouble(file_path)
555 file_double.register_for_testcase(self)
556 self.lintian_program_file_double = file_double
558 def set_lintian_subprocess_double(self):
559 """ Set the test double for the Lintian subprocess. """
560 argv = [
561 os.path.basename(self.lintian_program_file_double.path),
562 "-i", ARG_ANY]
563 double = SubprocessDouble(
564 self.lintian_program_file_double.path, argv=argv)
565 double.register_for_testcase(self)
566 double.set_subprocess_check_call_scenario('not_found')
567 self.lintian_subprocess_double = double
569 def test_calls_sys_exit_if_read_denied(self):
570 """ Should call `sys.exit` if read permission denied. """
571 self.changes_file_double.set_os_access_scenario('denied')
572 self.changes_file_double.set_open_scenario('read_denied')
573 with testtools.ExpectedException(FakeSystemExit):
574 dput.dput.run_lintian_test(**self.test_args)
575 expected_output = textwrap.dedent("""\
576 Can't read {path}
577 """).format(path=self.changes_file_double.path)
578 self.assertIn(expected_output, sys.stdout.getvalue())
579 sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
581 def test_emits_message_when_lintian_access_denied(self):
582 """ Should emit an explanatory message when Lintian access denied. """
583 self.lintian_program_file_double.set_os_access_scenario('denied')
584 dput.dput.run_lintian_test(**self.test_args)
585 expected_output = textwrap.dedent("""\
586 lintian is not installed, skipping package test.
587 """)
588 self.assertIn(expected_output, sys.stdout.getvalue())
590 def test_omits_check_call_when_lintian_access_denied(self):
591 """ Should omit `subprocess.check_call` when Lintian access denied. """
592 self.lintian_program_file_double.set_os_access_scenario('denied')
593 dput.dput.run_lintian_test(**self.test_args)
594 self.assertFalse(subprocess.check_call.called)
596 def test_sets_default_signal_handler_for_pipe_signal(self):
597 """ Should set default signal handler for PIPE signal. """
598 with testtools.ExpectedException(FakeSystemExit):
599 dput.dput.run_lintian_test(**self.test_args)
600 signal.signal.assert_called_with(signal.SIGPIPE, signal.SIG_DFL)
602 def test_calls_check_call_with_expected_args(self):
603 """ Should call `subprocess.check_call` with expected arguments. """
604 lintian_command_argv = [
605 os.path.basename(self.lintian_program_file_double.path),
606 "-i", self.changes_file_double.path]
607 with testtools.ExpectedException(FakeSystemExit):
608 dput.dput.run_lintian_test(**self.test_args)
609 subprocess.check_call.assert_called_with(lintian_command_argv)
611 def test_resets_signal_handler_for_pipe_signal_when_lintian_success(self):
612 """ Should reset signal handler for PIPE when Lintian succeeds. """
613 fake_orig_pipe_signal = self.getUniqueInteger()
614 signal.signal.return_value = fake_orig_pipe_signal
615 subprocess_double = self.lintian_subprocess_double
616 subprocess_double.set_subprocess_check_call_scenario('success')
617 dput.dput.run_lintian_test(**self.test_args)
618 expected_calls = [
619 mock.call(signal.SIGPIPE, signal.SIG_DFL),
620 mock.call(signal.SIGPIPE, fake_orig_pipe_signal),
622 signal.signal.assert_has_calls(expected_calls)
624 def test_calls_sys_exit_when_lintian_failure(self):
625 """ Should call `sys.exit` when Lintian reports failure. """
626 with testtools.ExpectedException(FakeSystemExit):
627 dput.dput.run_lintian_test(**self.test_args)
628 sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
630 def test_emits_message_when_lintian_failure(self):
631 """ Should emit explanatory message when Lintian reports failure. """
632 with testtools.ExpectedException(FakeSystemExit):
633 dput.dput.run_lintian_test(**self.test_args)
634 expected_output = textwrap.dedent("""\
636 Lintian says this package is not compliant with ...
638 """)
639 self.assertThat(
640 sys.stdout.getvalue(),
641 testtools.matchers.DocTestMatches(
642 expected_output, doctest.ELLIPSIS))
645 class dinstall_caller_TestCase(
646 testtools.TestCase):
647 """ Test cases for `dinstall_caller` function. """
649 def setUp(self):
650 """ Set up test fixtures. """
651 super(dinstall_caller_TestCase, self).setUp()
652 patch_system_interfaces(self)
654 patch_subprocess_check_call(self)
656 self.set_test_args()
658 self.patch_ssh_program_file()
659 self.set_subprocess_double()
660 self.set_expected_command_argv()
662 def patch_ssh_program_file(self):
663 """ Patch the SSH program file for this test case. """
664 file_path = '/usr/bin/ssh'
665 file_double = FileDouble(file_path)
666 file_double.register_for_testcase(self)
667 self.ssh_program_file_double = file_double
669 def set_subprocess_double(self):
670 """ Set the test double for the subprocess. """
671 argv = [
672 os.path.basename(self.ssh_program_file_double.path),
673 ARG_ANY,
674 "cd", ARG_ANY,
675 ";",
676 "dinstall", "-n", ARG_ANY]
677 double = SubprocessDouble(self.ssh_program_file_double.path, argv=argv)
678 double.register_for_testcase(self)
679 double.set_subprocess_check_call_scenario('success')
680 self.ssh_subprocess_double = double
682 def set_test_args(self):
683 """ Set the arguments for the test call to the function. """
684 self.test_args = dict(
685 filename=tempfile.mktemp(),
686 host=make_unique_slug(self),
687 fqdn=make_unique_slug(self),
688 login=make_unique_slug(self),
689 incoming=tempfile.mktemp(),
690 debug=False,
693 def set_expected_command_argv(self):
694 """ Set the expected argv for the command. """
695 expected_fqdn = self.test_args['fqdn']
696 self.expected_host_spec = "{user}@{host}".format(
697 user=self.test_args['login'], host=expected_fqdn)
698 self.expected_command_argv = [
699 os.path.basename(self.ssh_program_file_double.path),
700 self.expected_host_spec,
701 "cd", self.test_args['incoming'], ";",
702 "dinstall", "-n", self.test_args['filename'],
705 def test_calls_check_call_with_expected_args(self):
706 """ Should call `subprocess.check_call` with expected arguments. """
707 dput.dput.dinstall_caller(**self.test_args)
708 subprocess.check_call.assert_called_with(self.expected_command_argv)
710 def test_emits_remote_command_debug_message(self):
711 """ Should emit a debug message for the remote command. """
712 self.test_args['debug'] = True
713 dput.dput.dinstall_caller(**self.test_args)
714 expected_output = textwrap.dedent("""\
715 D: Logging into {user}@{host}:{incoming_path}
716 D: dinstall -n {file_path}
717 """).format(
718 user=self.test_args['login'], host=self.test_args['host'],
719 incoming_path=self.test_args['incoming'],
720 file_path=self.test_args['filename'])
721 self.assertThat(
722 sys.stdout.getvalue(),
723 testtools.matchers.DocTestMatches(
724 expected_output, doctest.ELLIPSIS))
726 def test_calls_sys_exit_when_dinstall_failure(self):
727 """ Should call `sys.exit` when Dinstall reports failure. """
728 self.ssh_subprocess_double.set_subprocess_check_call_scenario(
729 'failure')
730 with testtools.ExpectedException(FakeSystemExit):
731 dput.dput.dinstall_caller(**self.test_args)
732 sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
734 def test_emits_message_when_dinstall_failure(self):
735 """ Should emit explanatory message when Dinstall reports failure. """
736 self.ssh_subprocess_double.set_subprocess_check_call_scenario(
737 'failure')
738 with testtools.ExpectedException(FakeSystemExit):
739 dput.dput.dinstall_caller(**self.test_args)
740 expected_output = textwrap.dedent("""\
741 Error occured while trying to connect, or while attempting ...
742 """)
743 self.assertThat(
744 sys.stdout.getvalue(),
745 testtools.matchers.DocTestMatches(
746 expected_output, doctest.ELLIPSIS))
749 class execute_command_TestCase(
750 testtools.TestCase):
751 """ Test cases for `execute_command` function. """
753 def setUp(self):
754 """ Set up test fixtures. """
755 super(execute_command_TestCase, self).setUp()
756 patch_system_interfaces(self)
758 self.test_position = make_unique_slug(self)
759 self.set_program_file_double()
760 self.test_command = self.getUniqueString()
762 self.set_test_args()
764 patch_subprocess_call(self)
765 self.set_upload_command_scenario('simple')
767 def set_program_file_double(self):
768 """ Set the file double for the command program. """
769 self.upload_command_program_file_double = FileDouble()
771 def set_test_args(self):
772 """ Set the arguments for the test call to the function. """
773 self.test_args = dict(
774 command=self.test_command,
775 position=self.test_position,
776 debug=False,
779 def set_upload_command_scenario(self, name):
780 """ Set the scenario for the upload command behaviour. """
781 double = SubprocessDouble(
782 self.upload_command_program_file_double.path,
783 argv=self.test_command.split())
784 double.register_for_testcase(self)
785 double.set_subprocess_call_scenario('success')
786 self.upload_command_subprocess_double = double
788 def test_emits_debug_message_for_command(self):
789 """ Should emit a debug message for the specified command. """
790 self.test_args['debug'] = True
791 dput.dput.execute_command(**self.test_args)
792 expected_output = textwrap.dedent("""\
793 D: Command: {command}
794 """).format(command=self.test_command)
795 self.assertIn(expected_output, sys.stdout.getvalue())
797 def test_calls_subprocess_call_with_expected_args(self):
798 """ Should call `subprocess.call` with expected arguments. """
799 dput.dput.execute_command(**self.test_args)
800 subprocess.call.assert_called_with(self.test_command, shell=True)
802 def test_raises_error_when_command_failure(self):
803 """ Should raise error when command exits with failure. """
804 double = self.upload_command_subprocess_double
805 double.set_subprocess_call_scenario('failure')
806 with testtools.ExpectedException(dputhelper.DputUploadFatalException):
807 dput.dput.execute_command(**self.test_args)
809 def test_raises_error_when_command_not_found(self):
810 """ Should raise error when command not found. """
811 double = self.upload_command_subprocess_double
812 double.set_subprocess_call_scenario('not_found')
813 with testtools.ExpectedException(dputhelper.DputUploadFatalException):
814 dput.dput.execute_command(**self.test_args)
817 class version_check_TestCase(
818 testscenarios.WithScenarios,
819 testtools.TestCase):
820 """ Base for test cases for `version_check` function. """
822 package_file_scenarios = [
823 ('package-arch {arch}'.format(arch=arch), {
824 'file_double': FileDouble(),
825 'field_output': textwrap.dedent("""\
826 Architecture: {arch}
827 """).format(arch=arch),
829 for arch in ["all", "foo", "bar", "baz"]]
831 version_scenarios = [
832 ('version none', {
833 'upload_version': "lorem",
835 ('version equal', {
836 'upload_version': "lorem",
837 'installed_version': "lorem",
839 ('version unequal', {
840 'upload_version': "lorem",
841 'installed_version': "ipsum",
845 scenarios = NotImplemented
847 def setUp(self):
848 """ Set up test fixtures. """
849 super(version_check_TestCase, self).setUp()
850 patch_system_interfaces(self)
852 self.patch_dpkg_program_file()
854 if not hasattr(self, 'test_architectures'):
855 self.test_architectures = []
857 setup_changes_file_fixtures(self)
858 self.set_changes_file_scenario('no-format')
860 self.set_upload_files()
861 self.set_changes_document()
863 self.set_test_args()
865 patch_subprocess_popen(self)
866 self.set_dpkg_print_architecture_scenario('simple')
867 self.set_dpkg_field_scenario('simple')
868 self.set_dpkg_status_scenario('simple')
870 def set_changes_file_scenario(self, name):
871 """ Set the package changes document based on scenario name. """
872 scenarios = make_changes_file_scenarios()
873 scenario = dict(scenarios)[name]
874 self.changes_file_double = scenario['file_double']
876 def set_upload_files(self):
877 """ Set the files marked for upload in this scenario. """
878 package_file_suffix = ".deb"
879 file_suffix_by_arch = {
880 arch: "_{arch}{suffix}".format(
881 arch=arch, suffix=package_file_suffix)
882 for arch in self.test_architectures}
883 self.additional_file_suffixes = list(file_suffix_by_arch.values())
885 setup_upload_file_fixtures(self)
887 registry = FileDouble.get_registry_for_testcase(self)
888 for path in self.fake_upload_file_paths:
889 for (arch, suffix) in file_suffix_by_arch.items():
890 if path.endswith(suffix):
891 file_double = registry[path]
892 package_file_scenario = dict(self.package_file_scenarios)[
893 "package-arch {arch}".format(arch=arch)]
894 package_file_scenario['file_double'] = file_double
896 def set_changes_document(self):
897 """ Set the changes document for this test case. """
898 upload_params_by_name = make_upload_files_params(
899 self.fake_checksum_by_file, self.fake_size_by_file)
900 self.changes_document = make_changes_document(
901 fields={
902 'architecture': " ".join(self.test_architectures),
904 upload_params_by_name=upload_params_by_name)
906 def set_test_args(self):
907 """ Set the arguments for the test call to the function. """
908 self.test_args = dict(
909 path=os.path.dirname(self.changes_file_double.path),
910 changes=self.changes_document,
911 debug=False,
914 def patch_dpkg_program_file(self):
915 """ Patch the Dpkg program file for this test case. """
916 file_path = '/usr/bin/dpkg'
917 file_double = FileDouble(file_path)
918 file_double.register_for_testcase(self)
919 self.dpkg_program_file_double = file_double
921 def set_dpkg_print_architecture_scenario(self, name):
922 """ Set the scenario for ‘dpkg --print-architecture’ behaviour. """
923 argv = (
924 os.path.basename(self.dpkg_program_file_double.path),
925 "--print-architecture")
926 double = SubprocessDouble(
927 self.dpkg_program_file_double.path, argv=argv)
928 double.register_for_testcase(self)
929 double.set_subprocess_popen_scenario('success')
930 double.set_stdout_content(self.host_architecture)
931 self.dpkg_print_architecture_subprocess_double = double
933 def set_dpkg_field_scenario(self, name):
934 """ Set the scenario for ‘dpkg --field’ behaviour. """
935 for arch in self.test_architectures:
936 package_file_scenario = dict(self.package_file_scenarios)[
937 'package-arch {arch}'.format(arch=arch)]
938 upload_file_path = package_file_scenario['file_double'].path
939 argv = (
940 os.path.basename(self.dpkg_program_file_double.path),
941 "--field", upload_file_path)
942 double = SubprocessDouble(
943 self.dpkg_program_file_double.path, argv=argv)
944 double.register_for_testcase(self)
945 double.set_subprocess_popen_scenario('success')
947 package_file_scenario['package_name'] = make_unique_slug(self)
948 package_file_scenario['package_version'] = self.upload_version
949 field_output = package_file_scenario['field_output']
950 field_output += textwrap.dedent("""\
951 Package: {name}
952 Version: {version}
953 """).format(
954 name=package_file_scenario['package_name'],
955 version=package_file_scenario['package_version'])
956 double.set_stdout_content(field_output)
958 def set_dpkg_status_scenario(self, name):
959 """ Set the scenario for ‘dpkg -s’ behaviour. """
960 for arch in self.test_architectures:
961 package_file_scenario = dict(self.package_file_scenarios)[
962 'package-arch {arch}'.format(arch=arch)]
963 argv = (
964 os.path.basename(self.dpkg_program_file_double.path),
965 "-s", package_file_scenario['package_name'])
966 double = SubprocessDouble(
967 self.dpkg_program_file_double.path, argv=argv)
968 double.register_for_testcase(self)
969 double.set_subprocess_popen_scenario('success')
971 version_field = ""
972 if hasattr(self, 'installed_version'):
973 version_field = "Version: {version}\n".format(
974 version=self.installed_version)
975 status_output = textwrap.dedent("""\
976 {version}""").format(version=version_field)
977 double.set_stdout_content(status_output)
979 def get_subprocess_doubles_matching_argv_prefix(self, argv_prefix):
980 """ Get subprocess doubles for this test case matching the argv. """
981 subprocess_registry = SubprocessDouble.get_registry_for_testcase(self)
982 subprocess_doubles = [
983 double for double in subprocess_registry.values()
984 if double.argv[:len(argv_prefix)] == argv_prefix
986 return subprocess_doubles
989 class version_check_ArchitectureMatchTestCase(version_check_TestCase):
990 """ Test cases for `version_check` when host architecture matches. """
992 host_architecture_scenarios = [
993 ('host-arch foo', {
994 'host_architecture': "foo",
998 package_architecture_scenarios = [
999 ('one binary', {
1000 'test_architectures': ["foo"],
1002 ('three binaries', {
1003 'test_architectures': ["foo", "bar", "baz"],
1005 ('all-arch binary', {
1006 'test_architectures': ["all"],
1010 scenarios = testscenarios.multiply_scenarios(
1011 host_architecture_scenarios,
1012 package_architecture_scenarios,
1013 version_check_TestCase.version_scenarios)
1015 def test_emits_debug_message_showing_architecture(self):
1016 """ Should emit a debug message for the specified architecture. """
1017 test_architecture = self.getUniqueString()
1018 subprocess_double = self.dpkg_print_architecture_subprocess_double
1019 subprocess_double.set_stdout_content(
1020 "{arch}\n".format(arch=test_architecture))
1021 self.test_args['debug'] = True
1022 try:
1023 dput.dput.version_check(**self.test_args)
1024 except FakeSystemExit:
1025 pass
1026 expected_output = textwrap.dedent("""\
1027 D: detected architecture: '{arch}'
1028 """).format(arch=test_architecture)
1029 self.assertIn(expected_output, sys.stdout.getvalue())
1031 def test_omits_stderr_output_message_when_not_debug(self):
1032 """ Should omit any debug messages for `stderr` output. """
1033 doubles = self.get_subprocess_doubles_matching_argv_prefix(
1034 ("dpkg", "--print-architecture"))
1035 if self.test_architectures:
1036 doubles.extend(
1037 self.get_subprocess_doubles_matching_argv_prefix(
1038 ("dpkg", "--field")))
1039 doubles.extend(
1040 self.get_subprocess_doubles_matching_argv_prefix(
1041 ("dpkg", "-s")))
1042 for double in doubles:
1043 double.set_stderr_content(self.getUniqueString())
1044 self.test_args['debug'] = False
1045 try:
1046 dput.dput.version_check(**self.test_args)
1047 except FakeSystemExit:
1048 pass
1049 message_snippet = " stderr output:"
1050 self.assertNotIn(message_snippet, sys.stdout.getvalue())
1052 def test_emits_debug_message_for_architecture_stderr_output(self):
1053 """ Should emit debug message for Dpkg architecture `stderr`. """
1054 subprocess_double = self.dpkg_print_architecture_subprocess_double
1055 test_output = self.getUniqueString()
1056 subprocess_double.set_stderr_content(test_output)
1057 self.test_args['debug'] = True
1058 try:
1059 dput.dput.version_check(**self.test_args)
1060 except FakeSystemExit:
1061 pass
1062 expected_output = textwrap.dedent("""\
1063 D: dpkg-architecture stderr output: {output!r}
1064 """).format(output=test_output)
1065 self.assertIn(expected_output, sys.stdout.getvalue())
1067 def test_emits_debug_message_for_field_stderr_output(self):
1068 """ Should emit debug message for Dpkg fields `stderr`. """
1069 test_output = self.getUniqueString()
1070 for double in self.get_subprocess_doubles_matching_argv_prefix(
1071 ("dpkg", "--field")):
1072 double.set_stderr_content(test_output)
1073 self.test_args['debug'] = True
1074 try:
1075 dput.dput.version_check(**self.test_args)
1076 except FakeSystemExit:
1077 pass
1078 expected_output = textwrap.dedent("""\
1079 D: dpkg stderr output: {output!r}
1080 """).format(output=test_output)
1081 self.assertIn(expected_output, sys.stdout.getvalue())
1083 def test_emits_debug_message_for_status_stderr_output(self):
1084 """ Should emit debug message for Dpkg status `stderr`. """
1085 test_output = self.getUniqueString()
1086 for double in self.get_subprocess_doubles_matching_argv_prefix(
1087 ("dpkg", "-s")):
1088 double.set_stderr_content(test_output)
1089 self.test_args['debug'] = True
1090 try:
1091 dput.dput.version_check(**self.test_args)
1092 except FakeSystemExit:
1093 pass
1094 expected_output = textwrap.dedent("""\
1095 D: dpkg stderr output: {output!r}
1096 """).format(output=test_output)
1097 self.assertIn(expected_output, sys.stdout.getvalue())
1099 def test_emits_expected_debug_message_for_installed_version(self):
1100 """ Should emit debug message for package installed version. """
1101 self.test_args['debug'] = True
1102 message_lead = "D: Installed-Version:"
1103 if hasattr(self, 'installed_version'):
1104 dput.dput.version_check(**self.test_args)
1105 expected_output = "{lead} {version}".format(
1106 lead=message_lead, version=self.installed_version)
1107 self.assertIn(expected_output, sys.stdout.getvalue())
1108 else:
1109 with testtools.ExpectedException(FakeSystemExit):
1110 dput.dput.version_check(**self.test_args)
1111 self.assertNotIn(message_lead, sys.stdout.getvalue())
1113 def test_emits_expected_debug_message_for_upload_version(self):
1114 """ Should emit debug message for package upload version. """
1115 self.test_args['debug'] = True
1116 message_lead = "D: Check-Version:"
1117 if hasattr(self, 'installed_version'):
1118 dput.dput.version_check(**self.test_args)
1119 expected_output = "{lead} {version}".format(
1120 lead=message_lead, version=self.upload_version)
1121 self.assertIn(expected_output, sys.stdout.getvalue())
1122 else:
1123 with testtools.ExpectedException(FakeSystemExit):
1124 dput.dput.version_check(**self.test_args)
1125 self.assertNotIn(message_lead, sys.stdout.getvalue())
1128 class version_check_ArchitectureMismatchTestCase(version_check_TestCase):
1129 """ Test cases for `version_check` when no match to host architecture. """
1131 host_architecture_scenarios = [
1132 ('host-arch spam', {
1133 'host_architecture': "spam",
1137 package_architecture_scenarios = [
1138 ('one binary', {
1139 'test_architectures': ["foo"],
1143 scenarios = testscenarios.multiply_scenarios(
1144 host_architecture_scenarios,
1145 package_architecture_scenarios,
1146 version_check_TestCase.version_scenarios)
1148 def test_emits_debug_message_stating_arch_mismatch(self):
1149 """ Should emit a debug message stating the architecture mismatch. """
1150 self.test_args['debug'] = True
1151 dput.dput.version_check(**self.test_args)
1152 file_scenarios = dict(self.package_file_scenarios)
1153 for arch in self.test_architectures:
1154 file_scenario_name = "package-arch {arch}".format(arch=arch)
1155 file_scenario = file_scenarios[file_scenario_name]
1156 file_double = file_scenario['file_double']
1157 expected_output = textwrap.dedent("""\
1158 D: not install-checking {path} due to arch mismatch
1159 """).format(path=file_double.path)
1160 self.expectThat(
1161 sys.stdout.getvalue(),
1162 testtools.matchers.Contains(expected_output))
1165 class version_check_PackageNotInstalledTestCase(version_check_TestCase):
1166 """ Test cases for `version_check` when package is not installed. """
1168 host_architecture_scenarios = [
1169 ('host-arch foo', {
1170 'host_architecture': "foo",
1174 package_architecture_scenarios = [
1175 ('one binary', {
1176 'test_architectures': ["foo"],
1180 version_scenarios = [
1181 ('version none', {
1182 'upload_version': "lorem",
1186 scenarios = testscenarios.multiply_scenarios(
1187 host_architecture_scenarios,
1188 package_architecture_scenarios,
1189 version_scenarios)
1191 def test_emits_message_stating_package_not_installed(self):
1192 """ Should emit message stating package is not installed. """
1193 with testtools.ExpectedException(FakeSystemExit):
1194 dput.dput.version_check(**self.test_args)
1195 expected_output = textwrap.dedent("""\
1196 Uninstalled Package. Test it before uploading it.
1197 """)
1198 self.assertIn(expected_output, sys.stdout.getvalue())
1200 def test_calls_sys_exit_with_error_status(self):
1201 """ Should call `sys.exit` with exit status indicating error. """
1202 with testtools.ExpectedException(FakeSystemExit):
1203 dput.dput.version_check(**self.test_args)
1204 sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
1207 class check_upload_logfile_TestCase(
1208 testscenarios.WithScenarios,
1209 testtools.TestCase):
1210 """ Base for test cases for `check_upload_logfile` function. """
1212 scenarios = NotImplemented
1214 def setUp(self):
1215 """ Set up test fixtures. """
1216 super(check_upload_logfile_TestCase, self).setUp()
1217 patch_system_interfaces(self)
1219 set_config(self, 'exist-simple')
1220 patch_runtime_config_options(self)
1222 setup_changes_file_fixtures(self)
1223 self.set_changes_file_scenario('no-format')
1225 self.set_test_args()
1227 patch_os_path_exists(self)
1229 self.set_upload_log_file_double()
1231 setup_file_double_behaviour(
1232 self,
1233 [self.changes_file_double, self.upload_log_file_double])
1235 def set_changes_file_scenario(self, name):
1236 """ Set the package changes file based on scenario name. """
1237 scenarios = make_changes_file_scenarios()
1238 scenario = dict(scenarios)[name]
1239 self.changes_file_double = scenario['file_double']
1241 def set_test_args(self):
1242 """ Set the arguments for the test call to the function. """
1243 self.test_args = dict(
1244 changes_file=self.changes_file_double.path,
1245 host=self.test_host,
1246 fqdn=self.runtime_config_parser.get(self.test_host, 'fqdn'),
1247 check_only=False,
1248 call_lintian=False,
1249 force_upload=False,
1250 debug=False,
1252 custom_args = getattr(self, 'custom_args', {})
1253 self.test_args.update(custom_args)
1255 def set_upload_log_file_double(self):
1256 """ Set the upload log file double. """
1257 (file_dir_path, changes_file_name) = os.path.split(
1258 self.changes_file_double.path)
1259 (file_basename, __) = os.path.splitext(changes_file_name)
1260 file_name = "{basename}.{host}.upload".format(
1261 basename=file_basename, host=self.test_host)
1262 file_path = os.path.join(file_dir_path, file_name)
1264 double = FileDouble(file_path)
1265 double.set_os_path_exists_scenario(
1266 getattr(self, 'path_exists_scenario_name', 'exist'))
1267 double.set_open_scenario(
1268 getattr(self, 'open_scenario_name', 'okay'))
1270 file_content = getattr(self, 'log_content', "")
1271 double.fake_file = StringIO(file_content)
1273 self.upload_log_file_double = double
1276 class check_upload_logfile_SuccessTestCase(check_upload_logfile_TestCase):
1277 """ Success test cases for `check_upload_logfile` function. """
1279 scenarios = [
1280 ('simple', {}),
1281 ('not_exist', {
1282 'path_exists_scenario_name': 'not_exist',
1283 'open_scenario_name': 'nonexist',
1285 ('check_only', {
1286 'custom_args': {
1287 'check_only': True,
1290 ('force_upload', {
1291 'custom_args': {
1292 'force_upload': True,
1295 ('check_only and force_upload', {
1296 'custom_args': {
1297 'check_only': True,
1298 'force_upload': True,
1301 ('method-ftp not-uploaded', {
1302 'config_method': "ftp",
1303 'config_fqdn': "quux.example.com",
1304 'log_content': "foo lorem-ipsum bar\n",
1306 ('method-local not-uploaded', {
1307 'config_method': "local",
1308 'log_content': "foo lorem-ipsum bar\n",
1312 for (scenario_name, scenario) in scenarios:
1313 scenario['expected_result'] = None
1314 del scenario_name, scenario
1316 def test_returns_expected_result(self):
1317 """ Should return expected result for the scenario. """
1318 try:
1319 result = dput.dput.check_upload_logfile(**self.test_args)
1320 except FakeSystemExit:
1321 pass
1322 self.assertEqual(self.expected_result, result)
1325 class check_upload_logfile_ExitTestCase(check_upload_logfile_TestCase):
1326 """ Exit test cases for `check_upload_logfile` function. """
1328 scenarios = [
1329 ('uploaded', {
1330 'config_method': "ftp",
1331 'config_fqdn': "quux.example.com",
1332 'log_content': "foo quux.example.com bar\n",
1333 'expected_exit_status': 0,
1337 def test_calls_sys_exit_with_expected_exit_status(self):
1338 """ Should call `sys.exit` with expected exit status. """
1339 with testtools.ExpectedException(FakeSystemExit):
1340 dput.dput.check_upload_logfile(**self.test_args)
1341 sys.exit.assert_called_with(self.expected_exit_status)
1344 class check_upload_logfile_ErrorTestCase(check_upload_logfile_TestCase):
1345 """ Error test cases for `check_upload_logfile` function. """
1347 log_file_scenarios = [
1348 ('denied', {
1349 'path_exists_scenario_name': 'exist',
1350 'open_scenario_name': 'read_denied',
1351 'expected_exit_status': 1,
1355 scenarios = log_file_scenarios
1357 def test_calls_sys_exit_with_expected_exit_status(self):
1358 """ Should call `sys.exit` with expected exit status. """
1359 with testtools.ExpectedException(FakeSystemExit):
1360 dput.dput.check_upload_logfile(**self.test_args)
1361 sys.exit.assert_called_with(self.expected_exit_status)
1364 # Copyright © 2015–2016 Ben Finney <bignose@debian.org>
1366 # This is free software: you may copy, modify, and/or distribute this work
1367 # under the terms of the GNU General Public License as published by the
1368 # Free Software Foundation; version 3 of that license or any later version.
1369 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
1372 # Local variables:
1373 # coding: utf-8
1374 # mode: python
1375 # End:
1376 # vim: fileencoding=utf-8 filetype=python :