Refine restrictions on test doubles to catch more errors.
[dput.git] / test / test_changesfile.py
blob151289c60b7fc3a7e13bc01dea6a7ac6c02f8a60
1 # -*- coding: utf-8; -*-
3 # test/test_changesfile.py
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # Copyright © 2015 Ben Finney <ben+python@benfinney.id.au>
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the GNU General Public License as published by the
10 # Free Software Foundation; version 3 of that license or any later version.
11 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
13 """ Unit tests for Debian upload control (‘*.changes’) files. """
15 from __future__ import (absolute_import, unicode_literals)
17 import sys
18 import os
19 import os.path
20 import tempfile
21 import textwrap
22 import doctest
23 import email.message
24 import collections
26 import testtools
27 import testscenarios
29 __package__ = str("test")
30 __import__(__package__)
31 sys.path.insert(1, os.path.dirname(os.path.dirname(__file__)))
32 import dput.dput
33 from dput.helper import dputhelper
35 from .helper import (
36 StringIO,
37 mock,
38 patch_system_interfaces,
39 make_unique_slug,
40 FakeSystemExit,
41 EXIT_STATUS_FAILURE,
42 make_fake_file_scenarios,
43 get_file_doubles_from_fake_file_scenarios,
44 setup_file_double_behaviour,
45 patch_os_stat,
46 FileDouble,
48 from .test_configfile import (
49 set_config,
50 patch_runtime_config_options,
53 __metaclass__ = type
56 class _FieldsMapping:
57 """ A mapping to stand in for the `dict` of an `email.message.Message`. """
59 def __init__(self, *args, **kwargs):
60 try:
61 self._message = kwargs.pop('_message')
62 except KeyError:
63 raise TypeError("no ‘_message’ specified for this mapping")
64 super(_FieldsMapping, self).__init__(*args, **kwargs)
66 def __len__(self, *args, **kwargs):
67 return self._message.__len__(*args, **kwargs)
69 def __contains__(self, *args, **kwargs):
70 return self._message.__contains__(*args, **kwargs)
72 def __getitem__(self, *args, **kwargs):
73 return self._message.__getitem__(*args, **kwargs)
75 def __setitem__(self, *args, **kwargs):
76 self._message.__setitem__(*args, **kwargs)
78 def __delitem__(self, *args, **kwargs):
79 self._message.__delitem__(*args, **kwargs)
81 def has_key(self, *args, **kwargs):
82 return self._message.has_key(*args, **kwargs)
84 def keys(self, *args, **kwargs):
85 return self._message.keys(*args, **kwargs)
87 def values(self, *args, **kwargs):
88 return self._message.values(*args, **kwargs)
90 def items(self, *args, **kwargs):
91 return self._message.items(*args, **kwargs)
93 def get(self, *args, **kwargs):
94 return self._message.get(*args, **kwargs)
97 class FakeMessage(email.message.Message, object):
98 """ A fake RFC 2822 message that mocks the obsolete `rfc822.Message`. """
100 def __init__(self, *args, **kwargs):
101 super(FakeMessage, self).__init__(*args, **kwargs)
102 self.dict = _FieldsMapping(_message=self)
105 def make_fake_message(fields):
106 """ Make a fake message instance. """
107 message = FakeMessage()
108 for (name, value) in fields.items():
109 message.add_header(name, value)
110 return message
113 def make_files_field_value(params_by_name):
114 """ Make a value for “Files” field of a changes document. """
115 result = "\n".join(
116 " ".join(params)
117 for (file_name, params) in params_by_name.items())
118 return result
121 def make_upload_files_params(checksums_by_file_name, sizes_by_file_name):
122 """ Make a mapping of upload parameters for files. """
123 params_by_name = {
124 file_name: [
125 checksums_by_file_name[file_name],
126 str(sizes_by_file_name[file_name]),
127 "foo", "bar", file_name]
128 for file_name in checksums_by_file_name}
129 return params_by_name
132 def make_changes_document(fields, upload_params_by_name=None):
133 """ Make a changes document from field values.
135 :param fields: Sequence of (name, value) tuples for fields.
136 :param upload_params_by_name: Mapping from filename to upload
137 parameters for each file.
138 :return: The changes document as an RFC 822 formatted text.
141 document_fields = fields.copy()
142 if upload_params_by_name is not None:
143 files_field_text = make_files_field_value(upload_params_by_name)
144 document_fields.update({'files': files_field_text})
145 document = make_fake_message(document_fields)
147 return document
150 def make_changes_file_scenarios():
151 """ Make fake Debian upload control (‘*.changes’) scenarios. """
152 file_path = make_changes_file_path()
154 fake_file_empty = StringIO()
155 fake_file_no_format = StringIO(textwrap.dedent("""\
158 Files:
159 Lorem ipsum dolor sit amet
160 """))
161 fake_file_with_signature = StringIO(textwrap.dedent("""\
162 -----BEGIN PGP SIGNED MESSAGE-----
163 Hash: SHA1
167 Files:
168 Lorem ipsum dolor sit amet
170 -----BEGIN PGP SIGNATURE-----
171 Version: 0.0
172 Comment: Proin ac massa at orci sagittis fermentum.
174 gibberishgibberishgibberishgibberishgibberishgibberish
175 gibberishgibberishgibberishgibberishgibberishgibberish
176 gibberishgibberishgibberishgibberishgibberishgibberish
177 -----END PGP SIGNATURE-----
178 """))
179 fake_file_with_format = StringIO(textwrap.dedent("""\
180 Format: FOO
181 Files:
182 Lorem ipsum dolor sit amet
183 """))
184 fake_file_invalid = StringIO(textwrap.dedent("""\
185 Format: FOO
186 Files:
187 FOO BAR
188 """))
190 scenarios = [
191 ('no-format', {
192 'file_double': FileDouble(
193 path=file_path,
194 fake_file=fake_file_no_format),
195 'expected_result': make_changes_document({
196 'files': "Lorem ipsum dolor sit amet",
199 ('with-pgp-signature', {
200 'file_double': FileDouble(
201 path=file_path,
202 fake_file=fake_file_with_signature),
203 'expected_result': make_changes_document({
204 'files': "Lorem ipsum dolor sit amet",
207 ('with-format', {
208 'file_double': FileDouble(
209 path=file_path,
210 fake_file=fake_file_with_format),
211 'expected_result': make_changes_document({
212 'files': "Lorem ipsum dolor sit amet",
215 ('error empty', {
216 'file_double': FileDouble(
217 path=file_path, fake_file=fake_file_empty),
218 'expected_error': KeyError,
220 ('error invalid', {
221 'file_double': FileDouble(
222 path=file_path,
223 fake_file=fake_file_invalid),
224 'expected_error': FakeSystemExit,
228 for (scenario_name, scenario) in scenarios:
229 scenario['changes_file_scenario_name'] = scenario_name
231 return scenarios
234 def set_fake_upload_file_paths(testcase):
235 """ Set the fake upload file paths. """
236 testcase.fake_upload_file_paths = [
237 os.path.join(
238 os.path.dirname(testcase.changes_file_double.path),
239 os.path.basename(tempfile.mktemp()))
240 for __ in range(10)]
242 required_suffixes = [".dsc", ".tar.xz"]
243 suffixes = required_suffixes + getattr(
244 testcase, 'additional_file_suffixes', [])
245 file_path_base = testcase.fake_upload_file_paths.pop()
246 for suffix in suffixes:
247 file_path = file_path_base + suffix
248 testcase.fake_upload_file_paths.insert(0, file_path)
251 def set_file_checksums(testcase):
252 """ Set the fake file checksums for the test case. """
253 testcase.fake_checksum_by_file = {
254 os.path.basename(file_path): make_unique_slug(testcase)
255 for file_path in testcase.fake_upload_file_paths}
258 def set_file_sizes(testcase):
259 """ Set the fake file sizes for the test case. """
260 testcase.fake_size_by_file = {
261 os.path.basename(file_path): testcase.getUniqueInteger()
262 for file_path in testcase.fake_upload_file_paths}
265 def set_file_doubles(testcase):
266 """ Set the file doubles for the test case. """
267 for file_path in testcase.fake_upload_file_paths:
268 file_double = FileDouble(file_path)
269 file_double.set_os_stat_scenario('okay')
270 file_double.stat_result = file_double.stat_result._replace(
271 st_size=testcase.fake_size_by_file[
272 os.path.basename(file_path)],
274 file_double.register_for_testcase(testcase)
277 def setup_upload_file_fixtures(testcase):
278 """ Set fixtures for fake files to upload for the test case. """
279 set_fake_upload_file_paths(testcase)
280 set_file_checksums(testcase)
281 set_file_sizes(testcase)
282 set_file_doubles(testcase)
285 def make_changes_file_path(file_dir_path=None):
286 """ Make a filesystem path for the changes file. """
287 if file_dir_path is None:
288 file_dir_path = tempfile.mktemp()
289 file_name = os.path.basename(
290 "{base}.changes".format(base=tempfile.mktemp()))
291 file_path = os.path.join(file_dir_path, file_name)
293 return file_path
296 def setup_changes_file_fixtures(testcase):
297 """ Set up fixtures for changes file doubles. """
298 file_path = make_changes_file_path()
300 scenarios = make_fake_file_scenarios(file_path)
301 testcase.changes_file_scenarios = scenarios
303 file_doubles = get_file_doubles_from_fake_file_scenarios(
304 scenarios.values())
305 setup_file_double_behaviour(testcase, file_doubles)
308 def set_changes_file_scenario(testcase, name):
309 """ Set the changes file scenario for this test case. """
310 scenario = dict(testcase.changes_file_scenarios)[name]
311 testcase.changes_file_scenario = scenario
312 testcase.changes_file_double = scenario['file_double']
313 testcase.changes_file_double.register_for_testcase(testcase)
316 class parse_changes_TestCase(
317 testscenarios.WithScenarios,
318 testtools.TestCase):
319 """ Base for test cases for `parse_changes` function. """
321 scenarios = NotImplemented
323 def setUp(self):
324 """ Set up test fixtures. """
325 super(parse_changes_TestCase, self).setUp()
326 patch_system_interfaces(self)
328 self.test_infile = StringIO()
331 class parse_changes_SuccessTestCase(parse_changes_TestCase):
332 """ Success test cases for `parse_changes` function. """
334 scenarios = list(
335 (name, scenario)
336 for (name, scenario) in make_changes_file_scenarios()
337 if not name.startswith('error'))
339 def test_gives_expected_result_for_infile(self):
340 """ Should give the expected result for specified input file. """
341 result = dput.dput.parse_changes(self.file_double.fake_file)
342 normalised_result_set = set(
343 (key.lower(), value.strip())
344 for (key, value) in result.items())
345 self.assertEqual(
346 set(self.expected_result.items()), normalised_result_set)
349 class parse_changes_ErrorTestCase(parse_changes_TestCase):
350 """ Error test cases for `parse_changes` function. """
352 scenarios = list(
353 (name, scenario)
354 for (name, scenario) in make_changes_file_scenarios()
355 if name.startswith('error'))
357 def test_raises_expected_exception_for_infile(self):
358 """ Should raise the expected exception for specified input file. """
359 with testtools.ExpectedException(self.expected_error):
360 dput.dput.parse_changes(self.file_double.fake_file)
363 class check_upload_variant_TestCase(
364 testscenarios.WithScenarios,
365 testtools.TestCase):
366 """ Test cases for `check_upload_variant` function. """
368 scenarios = [
369 ('simple', {
370 'fields': {
371 'architecture': "foo bar baz",
373 'expected_result': True,
375 ('arch-missing', {
376 'fields': {
377 'spam': "Lorem ipsum dolor sit amet",
379 'expected_result': False,
381 ('source-only', {
382 'fields': {
383 'architecture': "source",
385 'expected_result': False,
387 ('source-and-others', {
388 'fields': {
389 'architecture': "foo source bar",
391 'expected_result': False,
395 def setUp(self):
396 """ Set up test fixtures. """
397 super(check_upload_variant_TestCase, self).setUp()
398 patch_system_interfaces(self)
400 self.set_changes_document(self.fields)
401 self.set_test_args()
403 def set_changes_document(self, fields):
404 """ Set the package changes document based on specified fields. """
405 self.test_changes_document = make_changes_document(fields)
407 def set_test_args(self):
408 """ Set the arguments for the test call to the function. """
409 self.test_args = {
410 'changes': self.test_changes_document,
411 'debug': False,
414 def test_returns_expected_result_for_changes_document(self):
415 """ Should return expected result for specified changes document. """
416 result = dput.dput.check_upload_variant(**self.test_args)
417 self.assertEqual(self.expected_result, result)
419 def test_emits_debug_message_showing_architecture(self):
420 """ Should emit a debug message for the specified architecture. """
421 if 'architecture' not in self.fields:
422 self.skipTest("Architecture field not in this scenario")
423 self.test_args['debug'] = True
424 dput.dput.check_upload_variant(**self.test_args)
425 expected_output = textwrap.dedent("""\
426 D: Architecture: {arch}
427 """).format(arch=self.fields['architecture'])
428 self.assertIn(expected_output, sys.stdout.getvalue())
430 def test_emits_debug_message_for_binary_upload(self):
431 """ Should emit a debug message for the specified architecture. """
432 triggers_binaryonly = bool(self.expected_result)
433 if not triggers_binaryonly:
434 self.skipTest("Scenario does not trigger binary-only upload")
435 self.test_args['debug'] = True
436 dput.dput.check_upload_variant(**self.test_args)
437 expected_output = textwrap.dedent("""\
438 D: Doing a binary upload only.
439 """)
440 self.assertIn(expected_output, sys.stdout.getvalue())
443 SourceCheckResult = collections.namedtuple(
444 'SourceCheckResult', ['include_orig', 'include_tar'])
447 class source_check_TestCase(
448 testscenarios.WithScenarios,
449 testtools.TestCase):
450 """ Test cases for `source_check` function. """
452 default_expected_result = SourceCheckResult(
453 include_orig=False, include_tar=False)
455 scenarios = [
456 ('no-version', {
457 'expected_result': default_expected_result,
459 ('no-epoch native-version', {
460 'upstream_version': "1.2",
461 'expected_result': SourceCheckResult(
462 include_orig=False, include_tar=True),
464 ('epoch native-version', {
465 'epoch': "3",
466 'upstream_version': "1.2",
467 'expected_result': SourceCheckResult(
468 include_orig=False, include_tar=True),
470 ('no-epoch debian-release', {
471 'upstream_version': "1.2",
472 'release': "5",
473 'expected_result': SourceCheckResult(
474 include_orig=False, include_tar=True),
476 ('epoch debian-release', {
477 'epoch': "3",
478 'upstream_version': "1.2",
479 'release': "5",
480 'expected_result': SourceCheckResult(
481 include_orig=False, include_tar=True),
483 ('no-epoch new-upstream-version', {
484 'upstream_version': "1.2",
485 'release': "1",
486 'expected_result': SourceCheckResult(
487 include_orig=True, include_tar=False),
489 ('epoch new_upstream-version', {
490 'epoch': "3",
491 'upstream_version': "1.2",
492 'release': "1",
493 'expected_result': SourceCheckResult(
494 include_orig=True, include_tar=False),
496 ('no-epoch nmu', {
497 'upstream_version': "1.2",
498 'release': "4.5",
499 'expected_result': SourceCheckResult(
500 include_orig=False, include_tar=True),
502 ('epoch nmu', {
503 'epoch': "3",
504 'upstream_version': "1.2",
505 'release': "4.5",
506 'expected_result': SourceCheckResult(
507 include_orig=False, include_tar=True),
509 ('no-epoch nmu before-first-release', {
510 'upstream_version': "1.2",
511 'release': "0.1",
512 'expected_result': SourceCheckResult(
513 include_orig=True, include_tar=False),
515 ('epoch nmu before-first-release', {
516 'epoch': "3",
517 'upstream_version': "1.2",
518 'release': "0.1",
519 'expected_result': SourceCheckResult(
520 include_orig=True, include_tar=False),
522 ('no-epoch nmu after-first-release', {
523 'upstream_version': "1.2",
524 'release': "1.1",
525 'expected_result': SourceCheckResult(
526 include_orig=True, include_tar=False),
528 ('epoch nmu after-first-release', {
529 'epoch': "3",
530 'upstream_version': "1.2",
531 'release': "1.1",
532 'expected_result': SourceCheckResult(
533 include_orig=True, include_tar=False),
537 for (scenario_name, scenario) in scenarios:
538 fields = {}
539 if 'upstream_version' in scenario:
540 version_string = scenario['upstream_version']
541 if 'epoch' in scenario:
542 version_string = "{epoch}:{version}".format(
543 epoch=scenario['epoch'], version=version_string)
544 if 'release' in scenario:
545 version_string = "{version}-{release}".format(
546 version=version_string, release=scenario['release'])
547 fields.update({'version': version_string})
548 scenario['version'] = version_string
549 scenario['changes_document'] = make_changes_document(fields)
550 del scenario_name, scenario
551 del fields, version_string
553 def setUp(self):
554 """ Set up test fixtures. """
555 super(source_check_TestCase, self).setUp()
556 patch_system_interfaces(self)
558 self.test_args = {
559 'changes': self.changes_document,
560 'debug': False,
563 def test_returns_expected_result_for_changes_document(self):
564 """ Should return expected result for specified changes document. """
565 result = dput.dput.source_check(**self.test_args)
566 self.assertEqual(self.expected_result, result)
568 def test_emits_version_string_debug_message_only_if_version(self):
569 """ Should emit message for version only if has version. """
570 self.test_args['debug'] = True
571 version = getattr(self, 'version', None)
572 message_lead = "D: Package Version:"
573 expected_output = textwrap.dedent("""\
574 {lead} {version}
575 """).format(
576 lead=message_lead, version=version)
577 dput.dput.source_check(**self.test_args)
578 if hasattr(self, 'version'):
579 self.assertIn(expected_output, sys.stdout.getvalue())
580 else:
581 self.assertNotIn(message_lead, sys.stdout.getvalue())
583 def test_emits_epoch_debug_message_only_if_epoch(self):
584 """ Should emit message for epoch only if has an epoch. """
585 self.test_args['debug'] = True
586 dput.dput.source_check(**self.test_args)
587 expected_output = textwrap.dedent("""\
588 D: Epoch found
589 """)
590 dput.dput.source_check(**self.test_args)
591 if (hasattr(self, 'epoch') and hasattr(self, 'release')):
592 self.assertIn(expected_output, sys.stdout.getvalue())
593 else:
594 self.assertNotIn(expected_output, sys.stdout.getvalue())
596 def test_emits_upstream_version_debug_message_only_if_nonnative(self):
597 """ Should emit message for upstream version only if non-native. """
598 self.test_args['debug'] = True
599 upstream_version = getattr(self, 'upstream_version', None)
600 message_lead = "D: Upstream Version:"
601 expected_output = textwrap.dedent("""\
602 {lead} {version}
603 """).format(
604 lead=message_lead, version=upstream_version)
605 dput.dput.source_check(**self.test_args)
606 if hasattr(self, 'release'):
607 self.assertIn(expected_output, sys.stdout.getvalue())
608 else:
609 self.assertNotIn(message_lead, sys.stdout.getvalue())
611 def test_emits_debian_release_debug_message_only_if_nonnative(self):
612 """ Should emit message for Debian release only if non-native. """
613 self.test_args['debug'] = True
614 debian_release = getattr(self, 'release', None)
615 message_lead = "D: Debian Version:"
616 expected_output = textwrap.dedent("""\
617 {lead} {version}
618 """).format(
619 lead=message_lead, version=debian_release)
620 dput.dput.source_check(**self.test_args)
621 if hasattr(self, 'release'):
622 self.assertIn(expected_output, sys.stdout.getvalue())
623 else:
624 self.assertNotIn(message_lead, sys.stdout.getvalue())
627 class verify_files_TestCase(
628 testscenarios.WithScenarios,
629 testtools.TestCase):
630 """ Test cases for `verify_files` function. """
632 default_args = dict(
633 host="foo",
634 check_only=None,
635 check_version=None,
636 unsigned_upload=None,
637 debug=None,
640 scenarios = [
641 ('default', {}),
642 ('binary-only', {
643 'check_upload_variant_return_value': False,
645 ('include foo.tar.gz', {
646 'additional_file_suffixes': [".tar.gz"],
647 'source_check_result': SourceCheckResult(
648 include_orig=False, include_tar=True),
650 ('include foo.orig.tar.gz', {
651 'additional_file_suffixes': [".orig.tar.gz"],
652 'source_check_result': SourceCheckResult(
653 include_orig=True, include_tar=False),
655 ('unexpected foo.tar.gz', {
656 'additional_file_suffixes': [".tar.gz"],
657 'expected_rejection_message': (
658 "Package includes a .tar.gz file although"),
660 ('unexpected foo.orig.tar.gz', {
661 'additional_file_suffixes': [".orig.tar.gz"],
662 'expected_rejection_message': (
663 "Package includes an .orig.tar.gz file although"),
665 ('no distribution', {
666 'test_distribution': None,
670 def setUp(self):
671 """ Set up test fixtures. """
672 super(verify_files_TestCase, self).setUp()
673 patch_system_interfaces(self)
675 self.file_double_by_path = {}
676 set_config(self, 'exist-simple')
677 patch_runtime_config_options(self)
679 self.set_test_args()
681 setup_changes_file_fixtures(self)
682 set_changes_file_scenario(self, 'exist-minimal')
683 self.test_args.update(dict(
684 path=os.path.dirname(self.changes_file_double.path),
685 filename=os.path.basename(self.changes_file_double.path),
688 patch_os_stat(self)
690 setup_upload_file_fixtures(self)
691 self.set_expected_files_to_upload()
693 self.patch_checksum_test()
694 self.patch_parse_changes()
695 self.patch_check_upload_variant()
696 self.set_expected_binary_upload()
697 self.set_expected_source_control_file_path()
698 self.patch_version_check()
699 self.patch_verify_signature()
700 self.patch_source_check()
702 def set_expected_files_to_upload(self):
703 """ Set the expected `files_to_upload` result for this test case. """
704 self.expected_files_to_upload = set(
705 path for path in self.fake_upload_file_paths)
706 self.expected_files_to_upload.add(self.changes_file_double.path)
708 def patch_checksum_test(self):
709 """ Patch `checksum_test` function for this test case. """
710 func_patcher = mock.patch.object(
711 dput.dput, "checksum_test", autospec=True)
712 mock_func = func_patcher.start()
713 self.addCleanup(func_patcher.stop)
715 def get_checksum_for_file(path, hash):
716 return self.fake_checksum_by_file[os.path.basename(path)]
717 mock_func.side_effect = get_checksum_for_file
719 def set_changes_document(self):
720 """ Set the changes document for this test case. """
721 self.changes_document = make_changes_document(
722 fields={},
723 upload_params_by_name=self.upload_params_by_name)
724 self.test_distribution = getattr(self, 'test_distribution', "lorem")
725 if self.test_distribution is not None:
726 self.changes_document.add_header(
727 'distribution', self.test_distribution)
728 self.runtime_config_parser.set(
729 self.test_args['host'], 'allowed_distributions',
730 self.test_distribution)
732 dput.dput.parse_changes.return_value = self.changes_document
734 def patch_parse_changes(self):
735 """ Patch `parse_changes` function for this test case. """
736 func_patcher = mock.patch.object(
737 dput.dput, "parse_changes", autospec=True)
738 func_patcher.start()
739 self.addCleanup(func_patcher.stop)
741 self.upload_params_by_name = make_upload_files_params(
742 self.fake_checksum_by_file,
743 self.fake_size_by_file)
744 self.set_changes_document()
746 def patch_check_upload_variant(self):
747 """ Patch `check_upload_variant` function for this test case. """
748 if not hasattr(self, 'check_upload_variant_return_value'):
749 self.check_upload_variant_return_value = True
751 func_patcher = mock.patch.object(
752 dput.dput, "check_upload_variant", autospec=True,
753 return_value=self.check_upload_variant_return_value)
754 func_patcher.start()
755 self.addCleanup(func_patcher.stop)
757 def patch_version_check(self):
758 """ Patch `version_check` function for this test case. """
759 func_patcher = mock.patch.object(
760 dput.dput, "version_check", autospec=True)
761 func_patcher.start()
762 self.addCleanup(func_patcher.stop)
764 def patch_verify_signature(self):
765 """ Patch `verify_signature` function for this test case. """
766 func_patcher = mock.patch.object(
767 dput.dput, "verify_signature", autospec=True)
768 func_patcher.start()
769 self.addCleanup(func_patcher.stop)
771 def patch_source_check(self):
772 """ Patch `source_check` function for this test case. """
773 func_patcher = mock.patch.object(
774 dput.dput, "source_check", autospec=True)
775 mock_func = func_patcher.start()
776 self.addCleanup(func_patcher.stop)
778 source_check_result = getattr(
779 self, 'source_check_result', SourceCheckResult(
780 include_orig=False, include_tar=False))
781 mock_func.return_value = source_check_result
783 def set_test_args(self):
784 """ Set test args for this test case. """
785 extra_args = getattr(self, 'extra_args', {})
786 self.test_args = self.default_args.copy()
787 self.test_args['config'] = self.runtime_config_parser
788 self.test_args.update(extra_args)
790 def set_expected_binary_upload(self):
791 """ Set expected value for `binary_upload` flag. """
792 self.expected_binary_upload = self.check_upload_variant_return_value
794 def set_expected_source_control_file_path(self):
795 """ Set expected value for source control file path. """
796 file_name = next(
797 os.path.basename(file_path)
798 for file_path in self.fake_upload_file_paths
799 if file_path.endswith(".dsc"))
800 if not self.expected_binary_upload:
801 self.expected_source_control_file_path = os.path.join(
802 os.path.dirname(self.changes_file_double.path), file_name)
803 else:
804 self.expected_source_control_file_path = ""
806 def test_emits_changes_file_path_debug_message(self):
807 """ Should emit debug message for changes file path. """
808 self.test_args['debug'] = True
809 dput.dput.verify_files(**self.test_args)
810 expected_output = textwrap.dedent("""\
811 D: Validating contents of changes file {path}
812 """).format(path=self.changes_file_double.path)
813 self.assertIn(expected_output, sys.stdout.getvalue())
815 def test_calls_sys_exit_if_input_read_denied(self):
816 """ Should call `sys.exit` if input file read access is denied. """
817 set_changes_file_scenario(self, 'error-read-denied')
818 with testtools.ExpectedException(FakeSystemExit):
819 dput.dput.verify_files(**self.test_args)
820 expected_output = textwrap.dedent("""\
821 Can't open {path}
822 """).format(path=self.changes_file_double.path)
823 self.assertIn(expected_output, sys.stdout.getvalue())
824 sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
826 def test_calls_parse_changes_with_changes_files(self):
827 """ Should call `parse_changes` with changes file. """
828 dput.dput.verify_files(**self.test_args)
829 dput.dput.parse_changes.assert_called_with(
830 self.changes_file_double.fake_file)
832 def test_calls_check_upload_variant_with_changes_document(self):
833 """ Should call `check_upload_variant` with changes document. """
834 dput.dput.verify_files(**self.test_args)
835 dput.dput.check_upload_variant.assert_called_with(
836 self.changes_document, mock.ANY)
838 def test_emits_upload_dsc_file_debug_message(self):
839 """ Should emit debug message for ‘*.dsc’ file. """
840 if getattr(self, 'check_upload_variant_return_value', True):
841 self.skipTest("Binary package upload for this scenario")
842 self.test_args['debug'] = True
843 dput.dput.verify_files(**self.test_args)
844 dsc_file_path = next(
845 os.path.basename(file_path)
846 for file_path in self.fake_upload_file_paths
847 if file_path.endswith(".dsc"))
848 expected_output = textwrap.dedent("""\
849 D: dsc-File: {path}
850 """).format(path=dsc_file_path)
851 self.assertThat(
852 sys.stdout.getvalue(),
853 testtools.matchers.Contains(expected_output))
855 def test_calls_sys_exit_when_source_upload_omits_dsc_file(self):
856 """ Should call `sys.exit` when source upload omits ‘*.dsc’ file. """
857 if getattr(self, 'check_upload_variant_return_value', True):
858 self.skipTest("Binary package upload for this scenario")
859 self.fake_checksum_by_file = dict(
860 (file_path, checksum)
861 for (file_path, checksum)
862 in self.fake_checksum_by_file.items()
863 if not file_path.endswith(".dsc"))
864 self.patch_parse_changes()
865 with testtools.ExpectedException(FakeSystemExit):
866 dput.dput.verify_files(**self.test_args)
867 expected_output = textwrap.dedent("""\
868 Error: no dsc file found in sourceful upload
869 """)
870 self.assertThat(
871 sys.stderr.getvalue(),
872 testtools.matchers.Contains(expected_output))
873 sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
875 def test_calls_version_check_when_specified_in_config(self):
876 """ Should call `version_check` when specified in config. """
877 self.runtime_config_parser.set(
878 self.test_args['host'], 'check_version', "true")
879 dput.dput.verify_files(**self.test_args)
880 dput.dput.version_check.assert_called_with(
881 os.path.dirname(self.changes_file_double.path),
882 self.changes_document,
883 self.test_args['debug'])
885 def test_calls_version_check_when_specified_in_args(self):
886 """ Should call `version_check` when specified in arguments. """
887 self.test_args['check_version'] = True
888 dput.dput.verify_files(**self.test_args)
889 dput.dput.version_check.assert_called_with(
890 os.path.dirname(self.changes_file_double.path),
891 self.changes_document,
892 self.test_args['debug'])
894 def test_calls_sys_exit_when_host_section_not_in_config(self):
895 """ Should call `sys.exit` when specified host not in config. """
896 self.runtime_config_parser.remove_section(self.test_args['host'])
897 with testtools.ExpectedException(FakeSystemExit):
898 dput.dput.verify_files(**self.test_args)
899 expected_output = textwrap.dedent("""\
900 Error in config file:
901 No section: ...
902 """)
903 self.assertThat(
904 sys.stderr.getvalue(),
905 testtools.matchers.DocTestMatches(
906 expected_output, doctest.ELLIPSIS))
907 sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
909 def test_calls_verify_signature_with_expected_args(self):
910 """ Should call `verify_signature` with expected args. """
911 dput.dput.verify_files(**self.test_args)
912 dput.dput.verify_signature.assert_called_with(
913 self.test_args['host'],
914 self.changes_file_double.path,
915 self.expected_source_control_file_path,
916 self.runtime_config_parser,
917 self.test_args['check_only'],
918 self.test_args['unsigned_upload'], mock.ANY,
919 self.test_args['debug'])
921 def test_calls_source_check_with_changes_document(self):
922 """ Should call `source_check` with changes document. """
923 dput.dput.verify_files(**self.test_args)
924 dput.dput.source_check.assert_called_with(
925 self.changes_document, self.test_args['debug'])
927 def test_emits_upload_file_path_debug_message(self):
928 """ Should emit debug message for each upload file path. """
929 self.test_args['debug'] = True
930 dput.dput.verify_files(**self.test_args)
931 for file_path in self.fake_upload_file_paths:
932 expected_output = textwrap.dedent("""\
933 D: File to upload: {path}
934 """).format(path=file_path)
935 self.expectThat(
936 sys.stdout.getvalue(),
937 testtools.matchers.Contains(expected_output))
939 def test_calls_checksum_test_with_upload_files(self):
940 """ Should call `checksum_test` with each upload file path. """
941 dput.dput.verify_files(**self.test_args)
942 expected_calls = [
943 mock.call(file_path, mock.ANY)
944 for file_path in self.fake_upload_file_paths]
945 dput.dput.checksum_test.assert_has_calls(
946 expected_calls, any_order=True)
948 def set_bogus_file_checksums(self):
949 """ Set bogus file checksums that will not match. """
950 self.fake_checksum_by_file = {
951 file_name: self.getUniqueString()
952 for file_name in self.fake_checksum_by_file}
954 def test_emits_checksum_okay_debug_message(self):
955 """ Should emit debug message checksum okay for each file. """
956 self.test_args['debug'] = True
957 dput.dput.verify_files(**self.test_args)
958 for file_path in self.fake_upload_file_paths:
959 expected_output = textwrap.dedent("""\
960 D: Checksum for {path} is fine
961 """).format(path=file_path)
962 self.expectThat(
963 sys.stdout.getvalue(),
964 testtools.matchers.Contains(expected_output))
966 def test_emits_checksum_mismatch_debug_message(self):
967 """ Should emit debug message when a checksum does not match. """
968 self.test_args['debug'] = True
969 self.set_bogus_file_checksums()
970 with testtools.ExpectedException(FakeSystemExit):
971 dput.dput.verify_files(**self.test_args)
972 expected_output = textwrap.dedent("""\
974 D: Checksum from .changes: ...
975 D: Generated Checksum: ...
977 """)
978 self.assertThat(
979 sys.stdout.getvalue(),
980 testtools.matchers.DocTestMatches(
981 expected_output, doctest.ELLIPSIS))
983 def test_calls_sys_exit_when_checksum_mismatch(self):
984 """ Should call `sys.exit` when a checksum does not match. """
985 specified_checksum_by_file = self.fake_checksum_by_file
986 self.set_bogus_file_checksums()
987 with testtools.ExpectedException(FakeSystemExit):
988 dput.dput.verify_files(**self.test_args)
990 expected_output_for_files = [
991 textwrap.dedent("""\
992 Checksum doesn't match for {file_name}
993 """).format(
994 file_name=os.path.join(
995 os.path.dirname(self.changes_file_double.path),
996 file_name),
997 specified_hash=specified_hash,
998 computed_hash=self.fake_checksum_by_file[file_name])
999 for (file_name, specified_hash)
1000 in specified_checksum_by_file.items()]
1001 self.assertThat(
1002 sys.stdout.getvalue(),
1003 testtools.matchers.MatchesAny(*[
1004 testtools.matchers.Contains(expected_output)
1005 for expected_output in expected_output_for_files]))
1006 sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
1008 def test_calls_os_stat_with_upload_files(self):
1009 """ Should call `os.stat` with each upload file path. """
1010 dput.dput.verify_files(**self.test_args)
1011 expected_calls = [
1012 mock.call(file_path)
1013 for file_path in self.fake_upload_file_paths]
1014 os.stat.assert_has_calls(expected_calls, any_order=True)
1016 def set_bogus_file_sizes(self):
1017 """ Set bogus file sizes that will not match. """
1018 file_double_registry = FileDouble.get_registry_for_testcase(self)
1019 for file_name in self.fake_size_by_file:
1020 bogus_size = self.getUniqueInteger()
1021 self.fake_size_by_file[file_name] = bogus_size
1022 file_path = os.path.join(
1023 os.path.dirname(self.changes_file_double.path),
1024 file_name)
1025 file_double = file_double_registry[file_path]
1026 file_double.stat_result = file_double.stat_result._replace(
1027 st_size=bogus_size)
1029 def test_emits_size_mismatch_debug_message(self):
1030 """ Should emit debug message when a size does not match. """
1031 self.test_args['debug'] = True
1032 self.set_bogus_file_sizes()
1033 dput.dput.verify_files(**self.test_args)
1034 expected_output = textwrap.dedent("""\
1036 D: size from .changes: ...
1037 D: calculated size: ...
1039 """)
1040 self.assertThat(
1041 sys.stdout.getvalue(),
1042 testtools.matchers.DocTestMatches(
1043 expected_output, doctest.ELLIPSIS))
1045 def test_emits_size_mismatch_message_for_each_file(self):
1046 """ Should emit error message for each file with size mismatch. """
1047 self.set_bogus_file_sizes()
1048 dput.dput.verify_files(**self.test_args)
1049 for file_path in self.fake_upload_file_paths:
1050 expected_output = textwrap.dedent("""\
1051 size doesn't match for {path}
1052 """).format(path=file_path)
1053 self.expectThat(
1054 sys.stdout.getvalue(),
1055 testtools.matchers.Contains(expected_output))
1057 def test_emits_rejection_warning_when_unexpected_tarball(self):
1058 """ Should emit warning of rejection when unexpected tarball. """
1059 if not hasattr(self, 'expected_rejection_message'):
1060 self.skipTest("No rejection message expected")
1061 dput.dput.verify_files(**self.test_args)
1062 sys.stderr.write("calls: {calls!r}\n".format(
1063 calls=sys.stdout.write.mock_calls))
1064 self.assertThat(
1065 sys.stdout.getvalue(),
1066 testtools.matchers.Contains(self.expected_rejection_message))
1068 def test_raises_error_when_distribution_mismatch(self):
1069 """ Should raise error when distribution mismatch against allowed. """
1070 if not getattr(self, 'test_distribution', None):
1071 self.skipTest("No distribution set for this test case")
1072 self.runtime_config_parser.set(
1073 self.test_args['host'], 'allowed_distributions',
1074 "dolor sit amet")
1075 with testtools.ExpectedException(dputhelper.DputUploadFatalException):
1076 dput.dput.verify_files(**self.test_args)
1078 def test_emits_changes_file_upload_debug_message(self):
1079 """ Should emit debug message for upload of changes file. """
1080 self.test_args['debug'] = True
1081 dput.dput.verify_files(**self.test_args)
1082 expected_output = textwrap.dedent("""\
1083 D: File to upload: {path}
1084 """).format(path=self.changes_file_double.path)
1085 self.assertIn(expected_output, sys.stdout.getvalue())
1087 def test_returns_expected_files_to_upload_collection(self):
1088 """ Should return expected `files_to_upload` collection value. """
1089 result = dput.dput.verify_files(**self.test_args)
1090 expected_result = self.expected_files_to_upload
1091 self.assertEqual(expected_result, set(result))
1094 class guess_upload_host_TestCase(
1095 testscenarios.WithScenarios,
1096 testtools.TestCase):
1097 """ Test cases for `guess_upload_host` function. """
1099 changes_file_scenarios = [
1100 ('no-distribution', {
1101 'fake_file': StringIO(textwrap.dedent("""\
1102 Files:
1103 Lorem ipsum dolor sit amet
1104 """)),
1106 ('distribution-spam', {
1107 'fake_file': StringIO(textwrap.dedent("""\
1108 Distribution: spam
1109 Files:
1110 Lorem ipsum dolor sit amet
1111 """)),
1113 ('distribution-beans', {
1114 'fake_file': StringIO(textwrap.dedent("""\
1115 Distribution: beans
1116 Files:
1117 Lorem ipsum dolor sit amet
1118 """)),
1122 scenarios = [
1123 ('distribution-found-of-one', {
1124 'changes_file_scenario_name': "distribution-spam",
1125 'test_distribution': "spam",
1126 'config_scenario_name': "exist-distribution-one",
1127 'expected_host': "foo",
1129 ('distribution-notfound-of-one', {
1130 'changes_file_scenario_name': "distribution-beans",
1131 'test_distribution': "beans",
1132 'config_scenario_name': "exist-distribution-one",
1133 'expected_host': "ftp-master",
1135 ('distribution-first-of-three', {
1136 'changes_file_scenario_name': "distribution-spam",
1137 'test_distribution': "spam",
1138 'config_scenario_name': "exist-distribution-three",
1139 'expected_host': "foo",
1141 ('distribution-last-of-three', {
1142 'changes_file_scenario_name': "distribution-beans",
1143 'test_distribution': "beans",
1144 'config_scenario_name': "exist-distribution-three",
1145 'expected_host': "foo",
1147 ('no-configured-distribution', {
1148 'changes_file_scenario_name': "distribution-beans",
1149 'config_scenario_name': "exist-distribution-none",
1150 'expected_host': "ftp-master",
1152 ('no-distribution', {
1153 'changes_file_scenario_name': "no-distribution",
1154 'config_scenario_name': "exist-simple",
1155 'expected_host': "ftp-master",
1157 ('default-distribution', {
1158 'config_scenario_name': "exist-default-distribution-only",
1159 'config_default_default_host_main': "consecteur",
1160 'expected_host': "consecteur",
1164 def setUp(self):
1165 """ Set up test fixtures. """
1166 super(guess_upload_host_TestCase, self).setUp()
1167 patch_system_interfaces(self)
1169 set_config(
1170 self,
1171 getattr(self, 'config_scenario_name', 'exist-minimal'))
1172 patch_runtime_config_options(self)
1174 self.setup_changes_file_fixtures()
1175 set_changes_file_scenario(
1176 self,
1177 getattr(self, 'changes_file_scenario_name', 'no-distribution'))
1179 self.set_test_args()
1181 def set_test_args(self):
1182 """ Set the arguments for the test call to the function. """
1183 self.test_args = dict(
1184 path=os.path.dirname(self.changes_file_double.path),
1185 filename=os.path.basename(self.changes_file_double.path),
1186 config=self.runtime_config_parser,
1189 def setup_changes_file_fixtures(self):
1190 """ Set up fixtures for fake changes file. """
1191 file_path = make_changes_file_path()
1193 scenarios = [s for (__, s) in self.changes_file_scenarios]
1194 for scenario in scenarios:
1195 scenario['file_double'] = FileDouble(
1196 file_path, scenario['fake_file'])
1197 setup_file_double_behaviour(
1198 self,
1199 get_file_doubles_from_fake_file_scenarios(scenarios))
1201 def test_calls_sys_exit_if_read_denied(self):
1202 """ Should call `sys.exit` if read permission denied. """
1203 self.changes_file_double.set_os_access_scenario('denied')
1204 self.changes_file_double.set_open_scenario('read_denied')
1205 with testtools.ExpectedException(FakeSystemExit):
1206 dput.dput.guess_upload_host(**self.test_args)
1207 expected_output = textwrap.dedent("""\
1208 Can't open {path}
1209 """).format(path=self.changes_file_double.path)
1210 self.assertIn(expected_output, sys.stdout.getvalue())
1211 sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
1213 def test_returns_expected_host(self):
1214 """ Should return expected host value. """
1215 result = dput.dput.guess_upload_host(**self.test_args)
1216 self.assertEqual(self.expected_host, result)
1218 @mock.patch.object(dput.dput, 'debug', True)
1219 def test_emits_debug_message_for_host(self):
1220 """ Should emit a debug message for the discovered host. """
1221 config_parser = self.runtime_config_parser
1222 if not (
1223 config_parser.has_section(self.expected_host)
1224 and config_parser.get(self.expected_host, 'distributions')):
1225 self.skipTest("No distributions specified")
1226 dput.dput.guess_upload_host(**self.test_args)
1227 expected_output = textwrap.dedent("""\
1228 D: guessing host {host} based on distribution {dist}
1229 """).format(
1230 host=self.expected_host, dist=self.test_distribution)
1231 self.assertIn(expected_output, sys.stdout.getvalue())
1234 # Local variables:
1235 # coding: utf-8
1236 # mode: python
1237 # End:
1238 # vim: fileencoding=utf-8 filetype=python :