1 # -*- coding: utf-8; -*-
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # This is free software, and you are welcome to redistribute it under
7 # certain conditions; see the end of this file for copyright
8 # information, grant of license, and disclaimer of warranty.
10 """ Unit tests for ‘dput’ module. """
12 from __future__
import (absolute_import
, unicode_literals
)
27 import testtools
.matchers
32 from dput
.helper
import dputhelper
42 patch_system_interfaces
,
43 make_fake_file_scenarios
,
44 setup_fake_file_fixtures
,
45 set_fake_file_scenario
,
46 setup_file_double_behaviour
,
52 patch_subprocess_popen
,
53 patch_subprocess_call
,
54 patch_subprocess_check_call
,
56 from .test_configfile
import (
58 patch_runtime_config_options
,
60 from .test_changesfile
import (
61 set_fake_upload_file_paths
,
62 setup_upload_file_fixtures
,
63 make_upload_files_params
,
64 make_changes_document
,
65 make_changes_file_scenarios
,
66 setup_changes_file_fixtures
,
67 set_changes_file_scenario
,
71 class hexify_string_TestCase(
72 testscenarios
.WithScenarios
,
74 """ Success test cases for `hexify_string` function. """
79 'expected_result': str(""),
82 'input_bytes': b
"\x00",
83 'expected_result': str("00"),
87 b
"Lorem ipsum dolor sit amet"
89 'expected_result': str(
90 "4c6f72656d20697073756d20646f6c6f722073697420616d6574"),
92 ('UTF-8 encoded text', {
93 'input_bytes': b
"::".join(text
.encode('utf-8') for text
in [
106 'expected_result': str("3a3a").join([
107 "d8a7d984d8b3d991d984d8a7d98520d8b9d984d98ad983d985",
108 "e2a093e2a091e2a087e2a087e2a095",
111 "ce93ceb5ceb9ceac20cf83ceb1cf82",
113 "e0a4a8e0a4aee0a4b8e0a58de0a4a4e0a587",
114 "e38193e38293e381abe381a1e381af",
115 "ec9588eb8595ed9598ec84b8ec9a94",
116 "d097d0b4d180d0b0cc81d0b2d181d182d0b2d183d0b9d182d0b5",
117 "e0aeb5e0aea3e0ae95e0af8de0ae95e0aeaee0af8d",
122 def test_returns_expected_result_for_input(self
):
123 """ Should return expected result for input. """
124 result
= dput
.dput
.hexify_string(self
.input_bytes
)
125 self
.assertEqual(self
.expected_result
, result
)
128 class checksum_test_TestCase(
129 testscenarios
.WithScenarios
,
131 """ Base for test cases for `checksum_test` function. """
133 scenarios
= NotImplemented
136 """ Set up test fixtures. """
137 super(checksum_test_TestCase
, self
).setUp()
138 patch_system_interfaces(self
)
140 setup_fake_file_fixtures(self
)
141 set_fake_file_scenario(self
, self
.fake_file_scenario_name
)
143 file_content
= self
.file_double
.fake_file
.getvalue().encode('utf-8')
144 self
.file_double
.fake_file
= io
.BytesIO(file_content
)
148 def set_test_args(self
):
149 """ Set the arguments for the test call to the function. """
150 self
.test_args
= dict(
151 filename
=self
.file_double
.path
,
152 hash_name
=self
.hash_name
,
156 class checksum_test_SuccessTestCase(checksum_test_TestCase
):
157 """ Success test cases for `checksum_test` function. """
162 'test_hash_func': hashlib
.md5
,
166 'test_hash_func': hashlib
.sha1
,
170 fake_file_scenarios
= list(
172 for (name
, scenario
) in make_fake_file_scenarios().items()
173 if not name
.startswith('error'))
175 scenarios
= testscenarios
.multiply_scenarios(
176 hash_scenarios
, fake_file_scenarios
)
178 def test_returns_expected_result_for_input(self
):
179 """ Should return expected result for specified inputs. """
180 expected_hash
= self
.test_hash_func(
181 self
.file_double
.fake_file
.getvalue())
182 expected_result
= expected_hash
.hexdigest()
183 result
= dput
.dput
.checksum_test(**self
.test_args
)
184 self
.assertEqual(expected_result
, result
)
187 class checksum_test_ErrorTestCase(checksum_test_TestCase
):
188 """ Error test cases for `checksum_test` function. """
196 fake_file_scenarios
= list(
198 for (name
, scenario
) in make_fake_file_scenarios().items()
199 if name
.startswith('error'))
201 scenarios
= testscenarios
.multiply_scenarios(
202 hash_scenarios
, fake_file_scenarios
)
204 def test_calls_sys_exit_if_error_reading_file(self
):
205 """ Should call `sys.exit` if unable to read the file. """
206 set_fake_file_scenario(self
, 'error-read-denied')
207 with testtools
.ExpectedException(FakeSystemExit
):
208 dput
.dput
.checksum_test(**self
.test_args
)
209 expected_output
= textwrap
.dedent("""\
211 """).format(path
=self
.file_double
.path
)
212 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
213 sys
.exit
.assert_called_with(EXIT_STATUS_FAILURE
)
216 class verify_signature_TestCase(
217 testscenarios
.WithScenarios
,
219 """ Base for test cases for `verify_signature` function. """
221 scenarios
= NotImplemented
226 unsigned_upload
=False,
232 """ Set up test fixtures. """
233 super(verify_signature_TestCase
, self
).setUp()
234 patch_system_interfaces(self
)
238 getattr(self
, 'config_scenario_name', 'exist-simple'))
239 patch_runtime_config_options(self
)
241 self
.set_changes_file_scenario('okay')
242 self
.set_dsc_file_scenario('okay')
245 setup_fake_file_fixtures(self
)
246 set_fake_file_scenario(self
, 'exist-minimal')
248 self
.patch_check_file_signature()
250 def patch_check_file_signature(self
):
251 """ Patch the `check_file_signature` function for this test case. """
252 func_patcher
= mock
.patch
.object(
253 dput
.crypto
, "check_file_signature", autospec
=True)
255 self
.addCleanup(func_patcher
.stop
)
257 def set_changes_file_scenario(self
, name
):
258 """ Set the changes file scenario for this test case. """
259 file_double
= FileDouble(tempfile
.mktemp())
260 file_double
.set_open_scenario(name
)
261 file_double
.register_for_testcase(self
)
262 self
.changes_file_double
= file_double
264 def set_dsc_file_scenario(self
, name
):
265 """ Set the source control file scenario for this test case. """
266 file_double
= FileDouble(tempfile
.mktemp())
267 file_double
.set_open_scenario(name
)
268 file_double
.register_for_testcase(self
)
269 self
.dsc_file_double
= file_double
271 def set_test_args(self
):
272 """ Set test args for this test case. """
273 extra_args
= getattr(self
, 'extra_args', {})
274 self
.test_args
= self
.default_args
.copy()
275 self
.test_args
['config'] = self
.runtime_config_parser
276 self
.test_args
['changes_file_path'] = self
.changes_file_double
.path
277 self
.test_args
['dsc_file_path'] = self
.dsc_file_double
.path
278 self
.test_args
.update(extra_args
)
281 class verify_signature_DebugMessageTestCase(verify_signature_TestCase
):
282 """ Test cases for `verify_signature` debug messages. """
288 def test_emits_debug_message_showing_files(self
):
289 """ Should emit a debug message for the specified files. """
290 self
.test_args
['debug'] = True
291 dput
.dput
.verify_signature(**self
.test_args
)
292 expected_output
= textwrap
.dedent("""\
293 D: upload control file: {changes_path}
294 D: source control file: {dsc_path}
296 changes_path
=self
.test_args
['changes_file_path'],
297 dsc_path
=self
.test_args
['dsc_file_path'])
298 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
301 class verify_signature_ChecksTestCase(verify_signature_TestCase
):
302 """ Test cases for `verify_signature` checks. """
306 'expected_checks': ['changes', 'dsc'],
308 ('source unsigned', {
310 'binary_upload': False,
311 'unsigned_upload': True,
313 'expected_checks': [],
315 ('source check-only', {
317 'binary_upload': False,
320 'expected_checks': ['changes', 'dsc'],
322 ('source allow_unsigned_uploads', {
324 'binary_upload': False,
326 'config_scenario_name': 'exist-default-not-unsigned',
327 'expected_checks': ['changes', 'dsc'],
329 ('binary unsigned', {
331 'binary_upload': True,
332 'unsigned_upload': True,
334 'expected_checks': [],
336 ('binary check-only', {
338 'binary_upload': True,
341 'expected_checks': ['changes'],
343 ('binary allow_unsigned_uploads', {
345 'binary_upload': True,
347 'config_scenario_name': 'exist-default-not-unsigned',
348 'expected_checks': ['changes'],
352 def test_checks_changes_signature_only_when_expected(self
):
353 """ Should only check the changes document signature if expected. """
354 dput
.dput
.verify_signature(**self
.test_args
)
355 check_call
= mock
.call(self
.changes_file_double
.fake_file
)
356 if 'changes' in self
.expected_checks
:
359 dput
.crypto
.check_file_signature
.call_args_list
)
363 dput
.crypto
.check_file_signature
.call_args_list
)
365 def test_calls_sys_exit_when_check_file_signature_raises_error(self
):
366 """ Should call `sys.exit` when `check_file_signature` error. """
367 if not self
.expected_checks
:
368 self
.skipTest("No signature checks requested")
369 dput
.crypto
.check_file_signature
.side_effect
= gpgme
.GpgmeError
370 with testtools
.ExpectedException(FakeSystemExit
):
371 dput
.dput
.verify_signature(**self
.test_args
)
372 sys
.exit
.assert_called_with(EXIT_STATUS_FAILURE
)
374 def test_checks_dsc_signature_only_when_expected(self
):
375 """ Should only check the ‘dsc’ document signature if expected. """
376 dput
.dput
.verify_signature(**self
.test_args
)
377 check_call
= mock
.call(self
.dsc_file_double
.fake_file
)
378 if 'dsc' in self
.expected_checks
:
381 dput
.crypto
.check_file_signature
.call_args_list
)
385 dput
.crypto
.check_file_signature
.call_args_list
)
388 class create_upload_file_TestCase(
389 testscenarios
.WithScenarios
,
391 """ Test cases for `create_upload_file` function. """
397 'test_fqdn': "localhost",
399 ('log file exists', {
400 'fake_file_scenario_name': 'exist-empty',
401 'file_access_scenario_name': 'okay',
402 'expected_open_mode': 'a',
407 """ Set up test fixtures. """
408 super(create_upload_file_TestCase
, self
).setUp()
409 patch_system_interfaces(self
)
411 if not hasattr(self
, 'test_host'):
412 self
.test_host
= make_unique_slug(self
)
413 self
.test_fqdn
= make_unique_slug(self
)
415 setup_changes_file_fixtures(self
)
416 set_changes_file_scenario(
418 getattr(self
, 'changes_file_scenario_name', 'exist-minimal'))
420 set_fake_upload_file_paths(self
)
423 setup_fake_file_fixtures(self
)
424 self
.set_fake_file_scenario(
425 getattr(self
, 'fake_file_scenario_name', 'not-found'))
426 if not hasattr(self
, 'expected_open_mode'):
427 self
.expected_open_mode
= 'w'
429 self
.set_upload_log_file_double()
430 self
.set_upload_log_file_params()
432 patch_os_access(self
)
434 def set_test_args(self
):
435 """ Set the arguments for the test call to the function. """
436 changes_file_name
= os
.path
.basename(self
.changes_file_double
.path
)
437 self
.package_file_name
= (
438 os
.path
.splitext(changes_file_name
)[0] + ".source")
439 self
.test_args
= dict(
440 package
=self
.package_file_name
,
443 path
=os
.path
.dirname(self
.changes_file_double
.path
),
444 files_to_upload
=self
.fake_upload_file_paths
,
448 def set_upload_log_file_double(self
):
449 """ Set the fake upload log file. """
450 changes_file_name
= os
.path
.basename(self
.changes_file_double
.path
)
451 changes_file_name_base
= os
.path
.splitext(changes_file_name
)[0]
452 upload_log_file_name
= "{base}.{host}.upload".format(
453 base
=changes_file_name_base
, host
=self
.test_host
)
454 self
.file_double
.path
= os
.path
.join(
455 os
.path
.dirname(self
.changes_file_double
.path
),
456 upload_log_file_name
)
457 self
.upload_log_file_double
= self
.file_double
458 self
.upload_log_file_double
.set_os_access_scenario(
460 self
, 'file_access_scenario_name', 'not_exist'))
461 self
.upload_log_file_double
.register_for_testcase(self
)
463 def set_upload_log_file_params(self
):
464 """ Set the parameters for the upload log file. """
465 self
.upload_log_file_double
.set_open_scenario('nonexist')
467 def set_fake_file_scenario(self
, name
):
468 """ Set the output file scenario for this test case. """
469 self
.fake_file_scenario
= self
.fake_file_scenarios
[name
]
470 self
.file_double
= self
.fake_file_scenario
['file_double']
472 def test_emits_upload_log_debug_message(self
):
473 """ Should emit debug message for creation of upload log file. """
474 self
.test_args
['debug'] = True
475 dput
.dput
.create_upload_file(**self
.test_args
)
476 expected_output
= textwrap
.dedent("""\
477 D: Writing logfile: {path}
478 """).format(path
=self
.upload_log_file_double
.path
)
479 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
481 def test_opens_log_file_with_expected_mode(self
):
482 """ Should open log file with expected write mode. """
483 dput
.dput
.create_upload_file(**self
.test_args
)
484 builtins
.open.assert_called_with(
485 self
.upload_log_file_double
.path
, self
.expected_open_mode
)
487 def test_writes_expected_message_for_each_file(self
):
488 """ Should write log message for each upload file path. """
489 with mock
.patch
.object(
490 self
.upload_log_file_double
.fake_file
, "close", autospec
=True):
491 dput
.dput
.create_upload_file(**self
.test_args
)
492 expected_host
= self
.test_host
493 expected_fqdn
= self
.test_fqdn
494 for file_path
in self
.fake_upload_file_paths
:
496 "Successfully uploaded {filename}"
497 " to {fqdn} for {host}.\n").format(
498 filename
=os
.path
.basename(file_path
),
499 fqdn
=expected_fqdn
, host
=expected_host
)
501 self
.upload_log_file_double
.fake_file
.getvalue(),
502 testtools
.matchers
.Contains(expected_message
))
504 def test_calls_sys_exit_if_write_denied(self
):
505 """ Should call `sys.exit` if write permission denied. """
506 self
.upload_log_file_double
.set_os_access_scenario('read_only')
507 self
.upload_log_file_double
.set_open_scenario('write_denied')
508 with testtools
.ExpectedException(FakeSystemExit
):
509 dput
.dput
.create_upload_file(**self
.test_args
)
510 expected_output
= textwrap
.dedent("""\
511 Could not write {path}
512 """).format(path
=self
.upload_log_file_double
.path
)
513 self
.assertIn(expected_output
, sys
.stderr
.getvalue())
514 sys
.exit
.assert_called_with(EXIT_STATUS_FAILURE
)
517 class run_lintian_test_TestCase(testtools
.TestCase
):
518 """ Test cases for `run_lintian_test` function. """
525 """ Set up test fixtures. """
526 super(run_lintian_test_TestCase
, self
).setUp()
527 patch_system_interfaces(self
)
529 patch_signal_signal(self
)
530 patch_os_access(self
)
532 setup_changes_file_fixtures(self
)
533 set_changes_file_scenario(
535 getattr(self
, 'changes_file_scenario_name', 'exist-minimal'))
539 self
.patch_lintian_program_file()
540 self
.lintian_program_file_double
.set_os_access_scenario('okay')
542 patch_subprocess_check_call(self
)
543 self
.set_lintian_subprocess_double()
545 def set_test_args(self
):
546 """ Set the arguments for the test call to the function. """
547 self
.test_args
= dict(
548 changes_file
=self
.changes_file_double
.path
,
551 def patch_lintian_program_file(self
):
552 """ Patch the Lintian program file for this test case. """
553 file_path
= '/usr/bin/lintian'
554 file_double
= FileDouble(file_path
)
555 file_double
.register_for_testcase(self
)
556 self
.lintian_program_file_double
= file_double
558 def set_lintian_subprocess_double(self
):
559 """ Set the test double for the Lintian subprocess. """
561 os
.path
.basename(self
.lintian_program_file_double
.path
),
563 double
= SubprocessDouble(
564 self
.lintian_program_file_double
.path
, argv
=argv
)
565 double
.register_for_testcase(self
)
566 double
.set_subprocess_check_call_scenario('not_found')
567 self
.lintian_subprocess_double
= double
569 def test_calls_sys_exit_if_read_denied(self
):
570 """ Should call `sys.exit` if read permission denied. """
571 self
.changes_file_double
.set_os_access_scenario('denied')
572 self
.changes_file_double
.set_open_scenario('read_denied')
573 with testtools
.ExpectedException(FakeSystemExit
):
574 dput
.dput
.run_lintian_test(**self
.test_args
)
575 expected_output
= textwrap
.dedent("""\
577 """).format(path
=self
.changes_file_double
.path
)
578 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
579 sys
.exit
.assert_called_with(EXIT_STATUS_FAILURE
)
581 def test_emits_message_when_lintian_access_denied(self
):
582 """ Should emit an explanatory message when Lintian access denied. """
583 self
.lintian_program_file_double
.set_os_access_scenario('denied')
584 dput
.dput
.run_lintian_test(**self
.test_args
)
585 expected_output
= textwrap
.dedent("""\
586 lintian is not installed, skipping package test.
588 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
590 def test_omits_check_call_when_lintian_access_denied(self
):
591 """ Should omit `subprocess.check_call` when Lintian access denied. """
592 self
.lintian_program_file_double
.set_os_access_scenario('denied')
593 dput
.dput
.run_lintian_test(**self
.test_args
)
594 self
.assertFalse(subprocess
.check_call
.called
)
596 def test_sets_default_signal_handler_for_pipe_signal(self
):
597 """ Should set default signal handler for PIPE signal. """
598 with testtools
.ExpectedException(FakeSystemExit
):
599 dput
.dput
.run_lintian_test(**self
.test_args
)
600 signal
.signal
.assert_called_with(signal
.SIGPIPE
, signal
.SIG_DFL
)
602 def test_calls_check_call_with_expected_args(self
):
603 """ Should call `subprocess.check_call` with expected arguments. """
604 lintian_command_argv
= [
605 os
.path
.basename(self
.lintian_program_file_double
.path
),
606 "-i", self
.changes_file_double
.path
]
607 with testtools
.ExpectedException(FakeSystemExit
):
608 dput
.dput
.run_lintian_test(**self
.test_args
)
609 subprocess
.check_call
.assert_called_with(lintian_command_argv
)
611 def test_resets_signal_handler_for_pipe_signal_when_lintian_success(self
):
612 """ Should reset signal handler for PIPE when Lintian succeeds. """
613 fake_orig_pipe_signal
= self
.getUniqueInteger()
614 signal
.signal
.return_value
= fake_orig_pipe_signal
615 subprocess_double
= self
.lintian_subprocess_double
616 subprocess_double
.set_subprocess_check_call_scenario('success')
617 dput
.dput
.run_lintian_test(**self
.test_args
)
619 mock
.call(signal
.SIGPIPE
, signal
.SIG_DFL
),
620 mock
.call(signal
.SIGPIPE
, fake_orig_pipe_signal
),
622 signal
.signal
.assert_has_calls(expected_calls
)
624 def test_calls_sys_exit_when_lintian_failure(self
):
625 """ Should call `sys.exit` when Lintian reports failure. """
626 with testtools
.ExpectedException(FakeSystemExit
):
627 dput
.dput
.run_lintian_test(**self
.test_args
)
628 sys
.exit
.assert_called_with(EXIT_STATUS_FAILURE
)
630 def test_emits_message_when_lintian_failure(self
):
631 """ Should emit explanatory message when Lintian reports failure. """
632 with testtools
.ExpectedException(FakeSystemExit
):
633 dput
.dput
.run_lintian_test(**self
.test_args
)
634 expected_output
= textwrap
.dedent("""\
636 Lintian says this package is not compliant with ...
640 sys
.stdout
.getvalue(),
641 testtools
.matchers
.DocTestMatches(
642 expected_output
, doctest
.ELLIPSIS
))
645 class dinstall_caller_TestCase(
647 """ Test cases for `dinstall_caller` function. """
650 """ Set up test fixtures. """
651 super(dinstall_caller_TestCase
, self
).setUp()
652 patch_system_interfaces(self
)
654 patch_subprocess_check_call(self
)
658 self
.patch_ssh_program_file()
659 self
.set_subprocess_double()
660 self
.set_expected_command_argv()
662 def patch_ssh_program_file(self
):
663 """ Patch the SSH program file for this test case. """
664 file_path
= '/usr/bin/ssh'
665 file_double
= FileDouble(file_path
)
666 file_double
.register_for_testcase(self
)
667 self
.ssh_program_file_double
= file_double
669 def set_subprocess_double(self
):
670 """ Set the test double for the subprocess. """
672 os
.path
.basename(self
.ssh_program_file_double
.path
),
676 "dinstall", "-n", ARG_ANY
]
677 double
= SubprocessDouble(self
.ssh_program_file_double
.path
, argv
=argv
)
678 double
.register_for_testcase(self
)
679 double
.set_subprocess_check_call_scenario('success')
680 self
.ssh_subprocess_double
= double
682 def set_test_args(self
):
683 """ Set the arguments for the test call to the function. """
684 self
.test_args
= dict(
685 filename
=tempfile
.mktemp(),
686 host
=make_unique_slug(self
),
687 fqdn
=make_unique_slug(self
),
688 login
=make_unique_slug(self
),
689 incoming
=tempfile
.mktemp(),
693 def set_expected_command_argv(self
):
694 """ Set the expected argv for the command. """
695 expected_fqdn
= self
.test_args
['fqdn']
696 self
.expected_host_spec
= "{user}@{host}".format(
697 user
=self
.test_args
['login'], host
=expected_fqdn
)
698 self
.expected_command_argv
= [
699 os
.path
.basename(self
.ssh_program_file_double
.path
),
700 self
.expected_host_spec
,
701 "cd", self
.test_args
['incoming'], ";",
702 "dinstall", "-n", self
.test_args
['filename'],
705 def test_calls_check_call_with_expected_args(self
):
706 """ Should call `subprocess.check_call` with expected arguments. """
707 dput
.dput
.dinstall_caller(**self
.test_args
)
708 subprocess
.check_call
.assert_called_with(self
.expected_command_argv
)
710 def test_emits_remote_command_debug_message(self
):
711 """ Should emit a debug message for the remote command. """
712 self
.test_args
['debug'] = True
713 dput
.dput
.dinstall_caller(**self
.test_args
)
714 expected_output
= textwrap
.dedent("""\
715 D: Logging into {user}@{host}:{incoming_path}
716 D: dinstall -n {file_path}
718 user
=self
.test_args
['login'], host
=self
.test_args
['host'],
719 incoming_path
=self
.test_args
['incoming'],
720 file_path
=self
.test_args
['filename'])
722 sys
.stdout
.getvalue(),
723 testtools
.matchers
.DocTestMatches(
724 expected_output
, doctest
.ELLIPSIS
))
726 def test_calls_sys_exit_when_dinstall_failure(self
):
727 """ Should call `sys.exit` when Dinstall reports failure. """
728 self
.ssh_subprocess_double
.set_subprocess_check_call_scenario(
730 with testtools
.ExpectedException(FakeSystemExit
):
731 dput
.dput
.dinstall_caller(**self
.test_args
)
732 sys
.exit
.assert_called_with(EXIT_STATUS_FAILURE
)
734 def test_emits_message_when_dinstall_failure(self
):
735 """ Should emit explanatory message when Dinstall reports failure. """
736 self
.ssh_subprocess_double
.set_subprocess_check_call_scenario(
738 with testtools
.ExpectedException(FakeSystemExit
):
739 dput
.dput
.dinstall_caller(**self
.test_args
)
740 expected_output
= textwrap
.dedent("""\
741 Error occured while trying to connect, or while attempting ...
744 sys
.stdout
.getvalue(),
745 testtools
.matchers
.DocTestMatches(
746 expected_output
, doctest
.ELLIPSIS
))
749 class execute_command_TestCase(
751 """ Test cases for `execute_command` function. """
754 """ Set up test fixtures. """
755 super(execute_command_TestCase
, self
).setUp()
756 patch_system_interfaces(self
)
758 self
.test_position
= make_unique_slug(self
)
759 self
.set_program_file_double()
760 self
.test_command
= self
.getUniqueString()
764 patch_subprocess_call(self
)
765 self
.set_upload_command_scenario('simple')
767 def set_program_file_double(self
):
768 """ Set the file double for the command program. """
769 self
.upload_command_program_file_double
= FileDouble()
771 def set_test_args(self
):
772 """ Set the arguments for the test call to the function. """
773 self
.test_args
= dict(
774 command
=self
.test_command
,
775 position
=self
.test_position
,
779 def set_upload_command_scenario(self
, name
):
780 """ Set the scenario for the upload command behaviour. """
781 double
= SubprocessDouble(
782 self
.upload_command_program_file_double
.path
,
783 argv
=self
.test_command
.split())
784 double
.register_for_testcase(self
)
785 double
.set_subprocess_call_scenario('success')
786 self
.upload_command_subprocess_double
= double
788 def test_emits_debug_message_for_command(self
):
789 """ Should emit a debug message for the specified command. """
790 self
.test_args
['debug'] = True
791 dput
.dput
.execute_command(**self
.test_args
)
792 expected_output
= textwrap
.dedent("""\
793 D: Command: {command}
794 """).format(command
=self
.test_command
)
795 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
797 def test_calls_subprocess_call_with_expected_args(self
):
798 """ Should call `subprocess.call` with expected arguments. """
799 dput
.dput
.execute_command(**self
.test_args
)
800 subprocess
.call
.assert_called_with(self
.test_command
, shell
=True)
802 def test_raises_error_when_command_failure(self
):
803 """ Should raise error when command exits with failure. """
804 double
= self
.upload_command_subprocess_double
805 double
.set_subprocess_call_scenario('failure')
806 with testtools
.ExpectedException(dputhelper
.DputUploadFatalException
):
807 dput
.dput
.execute_command(**self
.test_args
)
809 def test_raises_error_when_command_not_found(self
):
810 """ Should raise error when command not found. """
811 double
= self
.upload_command_subprocess_double
812 double
.set_subprocess_call_scenario('not_found')
813 with testtools
.ExpectedException(dputhelper
.DputUploadFatalException
):
814 dput
.dput
.execute_command(**self
.test_args
)
817 class version_check_TestCase(
818 testscenarios
.WithScenarios
,
820 """ Base for test cases for `version_check` function. """
822 package_file_scenarios
= [
823 ('package-arch {arch}'.format(arch
=arch
), {
824 'file_double': FileDouble(),
825 'field_output': textwrap
.dedent("""\
827 """).format(arch
=arch
),
829 for arch
in ["all", "foo", "bar", "baz"]]
831 version_scenarios
= [
833 'upload_version': "lorem",
836 'upload_version': "lorem",
837 'installed_version': "lorem",
839 ('version unequal', {
840 'upload_version': "lorem",
841 'installed_version': "ipsum",
845 scenarios
= NotImplemented
848 """ Set up test fixtures. """
849 super(version_check_TestCase
, self
).setUp()
850 patch_system_interfaces(self
)
852 self
.patch_dpkg_program_file()
854 if not hasattr(self
, 'test_architectures'):
855 self
.test_architectures
= []
857 setup_changes_file_fixtures(self
)
858 self
.set_changes_file_scenario('no-format')
860 self
.set_upload_files()
861 self
.set_changes_document()
865 patch_subprocess_popen(self
)
866 self
.set_dpkg_print_architecture_scenario('simple')
867 self
.set_dpkg_field_scenario('simple')
868 self
.set_dpkg_status_scenario('simple')
870 def set_changes_file_scenario(self
, name
):
871 """ Set the package changes document based on scenario name. """
872 scenarios
= make_changes_file_scenarios()
873 scenario
= dict(scenarios
)[name
]
874 self
.changes_file_double
= scenario
['file_double']
876 def set_upload_files(self
):
877 """ Set the files marked for upload in this scenario. """
878 package_file_suffix
= ".deb"
879 file_suffix_by_arch
= {
880 arch
: "_{arch}{suffix}".format(
881 arch
=arch
, suffix
=package_file_suffix
)
882 for arch
in self
.test_architectures
}
883 self
.additional_file_suffixes
= list(file_suffix_by_arch
.values())
885 setup_upload_file_fixtures(self
)
887 registry
= FileDouble
.get_registry_for_testcase(self
)
888 for path
in self
.fake_upload_file_paths
:
889 for (arch
, suffix
) in file_suffix_by_arch
.items():
890 if path
.endswith(suffix
):
891 file_double
= registry
[path
]
892 package_file_scenario
= dict(self
.package_file_scenarios
)[
893 "package-arch {arch}".format(arch
=arch
)]
894 package_file_scenario
['file_double'] = file_double
896 def set_changes_document(self
):
897 """ Set the changes document for this test case. """
898 upload_params_by_name
= make_upload_files_params(
899 self
.fake_checksum_by_file
, self
.fake_size_by_file
)
900 self
.changes_document
= make_changes_document(
902 'architecture': " ".join(self
.test_architectures
),
904 upload_params_by_name
=upload_params_by_name
)
906 def set_test_args(self
):
907 """ Set the arguments for the test call to the function. """
908 self
.test_args
= dict(
909 path
=os
.path
.dirname(self
.changes_file_double
.path
),
910 changes
=self
.changes_document
,
914 def patch_dpkg_program_file(self
):
915 """ Patch the Dpkg program file for this test case. """
916 file_path
= '/usr/bin/dpkg'
917 file_double
= FileDouble(file_path
)
918 file_double
.register_for_testcase(self
)
919 self
.dpkg_program_file_double
= file_double
921 def set_dpkg_print_architecture_scenario(self
, name
):
922 """ Set the scenario for ‘dpkg --print-architecture’ behaviour. """
924 os
.path
.basename(self
.dpkg_program_file_double
.path
),
925 "--print-architecture")
926 double
= SubprocessDouble(
927 self
.dpkg_program_file_double
.path
, argv
=argv
)
928 double
.register_for_testcase(self
)
929 double
.set_subprocess_popen_scenario('success')
930 double
.set_stdout_content(self
.host_architecture
)
931 self
.dpkg_print_architecture_subprocess_double
= double
933 def set_dpkg_field_scenario(self
, name
):
934 """ Set the scenario for ‘dpkg --field’ behaviour. """
935 for arch
in self
.test_architectures
:
936 package_file_scenario
= dict(self
.package_file_scenarios
)[
937 'package-arch {arch}'.format(arch
=arch
)]
938 upload_file_path
= package_file_scenario
['file_double'].path
940 os
.path
.basename(self
.dpkg_program_file_double
.path
),
941 "--field", upload_file_path
)
942 double
= SubprocessDouble(
943 self
.dpkg_program_file_double
.path
, argv
=argv
)
944 double
.register_for_testcase(self
)
945 double
.set_subprocess_popen_scenario('success')
947 package_file_scenario
['package_name'] = make_unique_slug(self
)
948 package_file_scenario
['package_version'] = self
.upload_version
949 field_output
= package_file_scenario
['field_output']
950 field_output
+= textwrap
.dedent("""\
954 name
=package_file_scenario
['package_name'],
955 version
=package_file_scenario
['package_version'])
956 double
.set_stdout_content(field_output
)
958 def set_dpkg_status_scenario(self
, name
):
959 """ Set the scenario for ‘dpkg -s’ behaviour. """
960 for arch
in self
.test_architectures
:
961 package_file_scenario
= dict(self
.package_file_scenarios
)[
962 'package-arch {arch}'.format(arch
=arch
)]
964 os
.path
.basename(self
.dpkg_program_file_double
.path
),
965 "-s", package_file_scenario
['package_name'])
966 double
= SubprocessDouble(
967 self
.dpkg_program_file_double
.path
, argv
=argv
)
968 double
.register_for_testcase(self
)
969 double
.set_subprocess_popen_scenario('success')
972 if hasattr(self
, 'installed_version'):
973 version_field
= "Version: {version}\n".format(
974 version
=self
.installed_version
)
975 status_output
= textwrap
.dedent("""\
976 {version}""").format(version
=version_field
)
977 double
.set_stdout_content(status_output
)
979 def get_subprocess_doubles_matching_argv_prefix(self
, argv_prefix
):
980 """ Get subprocess doubles for this test case matching the argv. """
981 subprocess_registry
= SubprocessDouble
.get_registry_for_testcase(self
)
982 subprocess_doubles
= [
983 double
for double
in subprocess_registry
.values()
984 if double
.argv
[:len(argv_prefix
)] == argv_prefix
986 return subprocess_doubles
989 class version_check_ArchitectureMatchTestCase(version_check_TestCase
):
990 """ Test cases for `version_check` when host architecture matches. """
992 host_architecture_scenarios
= [
994 'host_architecture': "foo",
998 package_architecture_scenarios
= [
1000 'test_architectures': ["foo"],
1002 ('three binaries', {
1003 'test_architectures': ["foo", "bar", "baz"],
1005 ('all-arch binary', {
1006 'test_architectures': ["all"],
1010 scenarios
= testscenarios
.multiply_scenarios(
1011 host_architecture_scenarios
,
1012 package_architecture_scenarios
,
1013 version_check_TestCase
.version_scenarios
)
1015 def test_emits_debug_message_showing_architecture(self
):
1016 """ Should emit a debug message for the specified architecture. """
1017 test_architecture
= self
.getUniqueString()
1018 subprocess_double
= self
.dpkg_print_architecture_subprocess_double
1019 subprocess_double
.set_stdout_content(
1020 "{arch}\n".format(arch
=test_architecture
))
1021 self
.test_args
['debug'] = True
1023 dput
.dput
.version_check(**self
.test_args
)
1024 except FakeSystemExit
:
1026 expected_output
= textwrap
.dedent("""\
1027 D: detected architecture: '{arch}'
1028 """).format(arch
=test_architecture
)
1029 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
1031 def test_omits_stderr_output_message_when_not_debug(self
):
1032 """ Should omit any debug messages for `stderr` output. """
1033 doubles
= self
.get_subprocess_doubles_matching_argv_prefix(
1034 ("dpkg", "--print-architecture"))
1035 if self
.test_architectures
:
1037 self
.get_subprocess_doubles_matching_argv_prefix(
1038 ("dpkg", "--field")))
1040 self
.get_subprocess_doubles_matching_argv_prefix(
1042 for double
in doubles
:
1043 double
.set_stderr_content(self
.getUniqueString())
1044 self
.test_args
['debug'] = False
1046 dput
.dput
.version_check(**self
.test_args
)
1047 except FakeSystemExit
:
1049 message_snippet
= " stderr output:"
1050 self
.assertNotIn(message_snippet
, sys
.stdout
.getvalue())
1052 def test_emits_debug_message_for_architecture_stderr_output(self
):
1053 """ Should emit debug message for Dpkg architecture `stderr`. """
1054 subprocess_double
= self
.dpkg_print_architecture_subprocess_double
1055 test_output
= self
.getUniqueString()
1056 subprocess_double
.set_stderr_content(test_output
)
1057 self
.test_args
['debug'] = True
1059 dput
.dput
.version_check(**self
.test_args
)
1060 except FakeSystemExit
:
1062 expected_output
= textwrap
.dedent("""\
1063 D: dpkg-architecture stderr output: {output!r}
1064 """).format(output
=test_output
)
1065 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
1067 def test_emits_debug_message_for_field_stderr_output(self
):
1068 """ Should emit debug message for Dpkg fields `stderr`. """
1069 test_output
= self
.getUniqueString()
1070 for double
in self
.get_subprocess_doubles_matching_argv_prefix(
1071 ("dpkg", "--field")):
1072 double
.set_stderr_content(test_output
)
1073 self
.test_args
['debug'] = True
1075 dput
.dput
.version_check(**self
.test_args
)
1076 except FakeSystemExit
:
1078 expected_output
= textwrap
.dedent("""\
1079 D: dpkg stderr output: {output!r}
1080 """).format(output
=test_output
)
1081 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
1083 def test_emits_debug_message_for_status_stderr_output(self
):
1084 """ Should emit debug message for Dpkg status `stderr`. """
1085 test_output
= self
.getUniqueString()
1086 for double
in self
.get_subprocess_doubles_matching_argv_prefix(
1088 double
.set_stderr_content(test_output
)
1089 self
.test_args
['debug'] = True
1091 dput
.dput
.version_check(**self
.test_args
)
1092 except FakeSystemExit
:
1094 expected_output
= textwrap
.dedent("""\
1095 D: dpkg stderr output: {output!r}
1096 """).format(output
=test_output
)
1097 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
1099 def test_emits_expected_debug_message_for_installed_version(self
):
1100 """ Should emit debug message for package installed version. """
1101 self
.test_args
['debug'] = True
1102 message_lead
= "D: Installed-Version:"
1103 if hasattr(self
, 'installed_version'):
1104 dput
.dput
.version_check(**self
.test_args
)
1105 expected_output
= "{lead} {version}".format(
1106 lead
=message_lead
, version
=self
.installed_version
)
1107 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
1109 with testtools
.ExpectedException(FakeSystemExit
):
1110 dput
.dput
.version_check(**self
.test_args
)
1111 self
.assertNotIn(message_lead
, sys
.stdout
.getvalue())
1113 def test_emits_expected_debug_message_for_upload_version(self
):
1114 """ Should emit debug message for package upload version. """
1115 self
.test_args
['debug'] = True
1116 message_lead
= "D: Check-Version:"
1117 if hasattr(self
, 'installed_version'):
1118 dput
.dput
.version_check(**self
.test_args
)
1119 expected_output
= "{lead} {version}".format(
1120 lead
=message_lead
, version
=self
.upload_version
)
1121 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
1123 with testtools
.ExpectedException(FakeSystemExit
):
1124 dput
.dput
.version_check(**self
.test_args
)
1125 self
.assertNotIn(message_lead
, sys
.stdout
.getvalue())
1128 class version_check_ArchitectureMismatchTestCase(version_check_TestCase
):
1129 """ Test cases for `version_check` when no match to host architecture. """
1131 host_architecture_scenarios
= [
1132 ('host-arch spam', {
1133 'host_architecture': "spam",
1137 package_architecture_scenarios
= [
1139 'test_architectures': ["foo"],
1143 scenarios
= testscenarios
.multiply_scenarios(
1144 host_architecture_scenarios
,
1145 package_architecture_scenarios
,
1146 version_check_TestCase
.version_scenarios
)
1148 def test_emits_debug_message_stating_arch_mismatch(self
):
1149 """ Should emit a debug message stating the architecture mismatch. """
1150 self
.test_args
['debug'] = True
1151 dput
.dput
.version_check(**self
.test_args
)
1152 file_scenarios
= dict(self
.package_file_scenarios
)
1153 for arch
in self
.test_architectures
:
1154 file_scenario_name
= "package-arch {arch}".format(arch
=arch
)
1155 file_scenario
= file_scenarios
[file_scenario_name
]
1156 file_double
= file_scenario
['file_double']
1157 expected_output
= textwrap
.dedent("""\
1158 D: not install-checking {path} due to arch mismatch
1159 """).format(path
=file_double
.path
)
1161 sys
.stdout
.getvalue(),
1162 testtools
.matchers
.Contains(expected_output
))
1165 class version_check_PackageNotInstalledTestCase(version_check_TestCase
):
1166 """ Test cases for `version_check` when package is not installed. """
1168 host_architecture_scenarios
= [
1170 'host_architecture': "foo",
1174 package_architecture_scenarios
= [
1176 'test_architectures': ["foo"],
1180 version_scenarios
= [
1182 'upload_version': "lorem",
1186 scenarios
= testscenarios
.multiply_scenarios(
1187 host_architecture_scenarios
,
1188 package_architecture_scenarios
,
1191 def test_emits_message_stating_package_not_installed(self
):
1192 """ Should emit message stating package is not installed. """
1193 with testtools
.ExpectedException(FakeSystemExit
):
1194 dput
.dput
.version_check(**self
.test_args
)
1195 expected_output
= textwrap
.dedent("""\
1196 Uninstalled Package. Test it before uploading it.
1198 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
1200 def test_calls_sys_exit_with_error_status(self
):
1201 """ Should call `sys.exit` with exit status indicating error. """
1202 with testtools
.ExpectedException(FakeSystemExit
):
1203 dput
.dput
.version_check(**self
.test_args
)
1204 sys
.exit
.assert_called_with(EXIT_STATUS_FAILURE
)
1207 class check_upload_logfile_TestCase(
1208 testscenarios
.WithScenarios
,
1209 testtools
.TestCase
):
1210 """ Base for test cases for `check_upload_logfile` function. """
1212 scenarios
= NotImplemented
1215 """ Set up test fixtures. """
1216 super(check_upload_logfile_TestCase
, self
).setUp()
1217 patch_system_interfaces(self
)
1219 set_config(self
, 'exist-simple')
1220 patch_runtime_config_options(self
)
1222 setup_changes_file_fixtures(self
)
1223 self
.set_changes_file_scenario('no-format')
1225 self
.set_test_args()
1227 patch_os_path_exists(self
)
1229 self
.set_upload_log_file_double()
1231 setup_file_double_behaviour(
1233 [self
.changes_file_double
, self
.upload_log_file_double
])
1235 def set_changes_file_scenario(self
, name
):
1236 """ Set the package changes file based on scenario name. """
1237 scenarios
= make_changes_file_scenarios()
1238 scenario
= dict(scenarios
)[name
]
1239 self
.changes_file_double
= scenario
['file_double']
1241 def set_test_args(self
):
1242 """ Set the arguments for the test call to the function. """
1243 self
.test_args
= dict(
1244 changes_file
=self
.changes_file_double
.path
,
1245 host
=self
.test_host
,
1246 fqdn
=self
.runtime_config_parser
.get(self
.test_host
, 'fqdn'),
1252 custom_args
= getattr(self
, 'custom_args', {})
1253 self
.test_args
.update(custom_args
)
1255 def set_upload_log_file_double(self
):
1256 """ Set the upload log file double. """
1257 (file_dir_path
, changes_file_name
) = os
.path
.split(
1258 self
.changes_file_double
.path
)
1259 (file_basename
, __
) = os
.path
.splitext(changes_file_name
)
1260 file_name
= "{basename}.{host}.upload".format(
1261 basename
=file_basename
, host
=self
.test_host
)
1262 file_path
= os
.path
.join(file_dir_path
, file_name
)
1264 double
= FileDouble(file_path
)
1265 double
.set_os_path_exists_scenario(
1266 getattr(self
, 'path_exists_scenario_name', 'exist'))
1267 double
.set_open_scenario(
1268 getattr(self
, 'open_scenario_name', 'okay'))
1270 file_content
= getattr(self
, 'log_content', "")
1271 double
.fake_file
= StringIO(file_content
)
1273 self
.upload_log_file_double
= double
1276 class check_upload_logfile_SuccessTestCase(check_upload_logfile_TestCase
):
1277 """ Success test cases for `check_upload_logfile` function. """
1282 'path_exists_scenario_name': 'not_exist',
1283 'open_scenario_name': 'nonexist',
1292 'force_upload': True,
1295 ('check_only and force_upload', {
1298 'force_upload': True,
1301 ('method-ftp not-uploaded', {
1302 'config_method': "ftp",
1303 'config_fqdn': "quux.example.com",
1304 'log_content': "foo lorem-ipsum bar\n",
1306 ('method-local not-uploaded', {
1307 'config_method': "local",
1308 'log_content': "foo lorem-ipsum bar\n",
1312 for (scenario_name
, scenario
) in scenarios
:
1313 scenario
['expected_result'] = None
1314 del scenario_name
, scenario
1316 def test_returns_expected_result(self
):
1317 """ Should return expected result for the scenario. """
1319 result
= dput
.dput
.check_upload_logfile(**self
.test_args
)
1320 except FakeSystemExit
:
1322 self
.assertEqual(self
.expected_result
, result
)
1325 class check_upload_logfile_ExitTestCase(check_upload_logfile_TestCase
):
1326 """ Exit test cases for `check_upload_logfile` function. """
1330 'config_method': "ftp",
1331 'config_fqdn': "quux.example.com",
1332 'log_content': "foo quux.example.com bar\n",
1333 'expected_exit_status': 0,
1337 def test_calls_sys_exit_with_expected_exit_status(self
):
1338 """ Should call `sys.exit` with expected exit status. """
1339 with testtools
.ExpectedException(FakeSystemExit
):
1340 dput
.dput
.check_upload_logfile(**self
.test_args
)
1341 sys
.exit
.assert_called_with(self
.expected_exit_status
)
1344 class check_upload_logfile_ErrorTestCase(check_upload_logfile_TestCase
):
1345 """ Error test cases for `check_upload_logfile` function. """
1347 log_file_scenarios
= [
1349 'path_exists_scenario_name': 'exist',
1350 'open_scenario_name': 'read_denied',
1351 'expected_exit_status': 1,
1355 scenarios
= log_file_scenarios
1357 def test_calls_sys_exit_with_expected_exit_status(self
):
1358 """ Should call `sys.exit` with expected exit status. """
1359 with testtools
.ExpectedException(FakeSystemExit
):
1360 dput
.dput
.check_upload_logfile(**self
.test_args
)
1361 sys
.exit
.assert_called_with(self
.expected_exit_status
)
1364 # Copyright © 2015–2016 Ben Finney <bignose@debian.org>
1366 # This is free software: you may copy, modify, and/or distribute this work
1367 # under the terms of the GNU General Public License as published by the
1368 # Free Software Foundation; version 3 of that license or any later version.
1369 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
1376 # vim: fileencoding=utf-8 filetype=python :