Re-organise source for better maintenance.
[dput.git] / test / test_changesfile.py
bloba75c60142db167a0417cc375af68678149d8e96e
1 # -*- coding: utf-8; -*-
3 # test/test_changesfile.py
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # This is free software, and you are welcome to redistribute it under
7 # certain conditions; see the end of this file for copyright
8 # information, grant of license, and disclaimer of warranty.
10 """ Unit tests for Debian upload control (‘*.changes’) files. """
12 from __future__ import (absolute_import, unicode_literals)
14 import sys
15 import os
16 import os.path
17 import tempfile
18 import textwrap
19 import doctest
20 import email.message
21 import collections
23 import testtools
24 import testscenarios
26 __package__ = str("test")
27 __import__(__package__)
28 sys.path.insert(1, os.path.dirname(os.path.dirname(__file__)))
29 import dput.dput
30 from dput.helper import dputhelper
32 from .helper import (
33 StringIO,
34 mock,
35 patch_system_interfaces,
36 make_unique_slug,
37 FakeSystemExit,
38 EXIT_STATUS_FAILURE,
39 make_fake_file_scenarios,
40 get_file_doubles_from_fake_file_scenarios,
41 setup_file_double_behaviour,
42 patch_os_stat,
43 FileDouble,
45 from .test_configfile import (
46 set_config,
47 patch_runtime_config_options,
50 __metaclass__ = type
53 class _FieldsMapping:
54 """ A mapping to stand in for the `dict` of an `email.message.Message`. """
56 def __init__(self, *args, **kwargs):
57 try:
58 self._message = kwargs.pop('_message')
59 except KeyError:
60 raise TypeError("no ‘_message’ specified for this mapping")
61 super(_FieldsMapping, self).__init__(*args, **kwargs)
63 def __len__(self, *args, **kwargs):
64 return self._message.__len__(*args, **kwargs)
66 def __contains__(self, *args, **kwargs):
67 return self._message.__contains__(*args, **kwargs)
69 def __getitem__(self, *args, **kwargs):
70 return self._message.__getitem__(*args, **kwargs)
72 def __setitem__(self, *args, **kwargs):
73 self._message.__setitem__(*args, **kwargs)
75 def __delitem__(self, *args, **kwargs):
76 self._message.__delitem__(*args, **kwargs)
78 def has_key(self, *args, **kwargs):
79 return self._message.has_key(*args, **kwargs)
81 def keys(self, *args, **kwargs):
82 return self._message.keys(*args, **kwargs)
84 def values(self, *args, **kwargs):
85 return self._message.values(*args, **kwargs)
87 def items(self, *args, **kwargs):
88 return self._message.items(*args, **kwargs)
90 def get(self, *args, **kwargs):
91 return self._message.get(*args, **kwargs)
94 class FakeMessage(email.message.Message, object):
95 """ A fake RFC 2822 message that mocks the obsolete `rfc822.Message`. """
97 def __init__(self, *args, **kwargs):
98 super(FakeMessage, self).__init__(*args, **kwargs)
99 self.dict = _FieldsMapping(_message=self)
102 def make_fake_message(fields):
103 """ Make a fake message instance. """
104 message = FakeMessage()
105 for (name, value) in fields.items():
106 message.add_header(name, value)
107 return message
110 def make_files_field_value(params_by_name):
111 """ Make a value for “Files” field of a changes document. """
112 result = "\n".join(
113 " ".join(params)
114 for (file_name, params) in params_by_name.items())
115 return result
118 def make_upload_files_params(checksums_by_file_name, sizes_by_file_name):
119 """ Make a mapping of upload parameters for files. """
120 params_by_name = {
121 file_name: [
122 checksums_by_file_name[file_name],
123 str(sizes_by_file_name[file_name]),
124 "foo", "bar", file_name]
125 for file_name in checksums_by_file_name}
126 return params_by_name
129 def make_changes_document(fields, upload_params_by_name=None):
130 """ Make a changes document from field values.
132 :param fields: Sequence of (name, value) tuples for fields.
133 :param upload_params_by_name: Mapping from filename to upload
134 parameters for each file.
135 :return: The changes document as an RFC 822 formatted text.
138 document_fields = fields.copy()
139 if upload_params_by_name is not None:
140 files_field_text = make_files_field_value(upload_params_by_name)
141 document_fields.update({'files': files_field_text})
142 document = make_fake_message(document_fields)
144 return document
147 def make_changes_file_scenarios():
148 """ Make fake Debian upload control (‘*.changes’) scenarios. """
149 file_path = make_changes_file_path()
151 fake_file_empty = StringIO()
152 fake_file_no_format = StringIO(textwrap.dedent("""\
155 Files:
156 Lorem ipsum dolor sit amet
157 """))
158 fake_file_with_signature = StringIO(textwrap.dedent("""\
159 -----BEGIN PGP SIGNED MESSAGE-----
160 Hash: SHA1
164 Files:
165 Lorem ipsum dolor sit amet
167 -----BEGIN PGP SIGNATURE-----
168 Version: 0.0
169 Comment: Proin ac massa at orci sagittis fermentum.
171 gibberishgibberishgibberishgibberishgibberishgibberish
172 gibberishgibberishgibberishgibberishgibberishgibberish
173 gibberishgibberishgibberishgibberishgibberishgibberish
174 -----END PGP SIGNATURE-----
175 """))
176 fake_file_with_format = StringIO(textwrap.dedent("""\
177 Format: FOO
178 Files:
179 Lorem ipsum dolor sit amet
180 """))
181 fake_file_invalid = StringIO(textwrap.dedent("""\
182 Format: FOO
183 Files:
184 FOO BAR
185 """))
187 scenarios = [
188 ('no-format', {
189 'file_double': FileDouble(
190 path=file_path,
191 fake_file=fake_file_no_format),
192 'expected_result': make_changes_document({
193 'files': "Lorem ipsum dolor sit amet",
196 ('with-pgp-signature', {
197 'file_double': FileDouble(
198 path=file_path,
199 fake_file=fake_file_with_signature),
200 'expected_result': make_changes_document({
201 'files': "Lorem ipsum dolor sit amet",
204 ('with-format', {
205 'file_double': FileDouble(
206 path=file_path,
207 fake_file=fake_file_with_format),
208 'expected_result': make_changes_document({
209 'files': "Lorem ipsum dolor sit amet",
212 ('error empty', {
213 'file_double': FileDouble(
214 path=file_path, fake_file=fake_file_empty),
215 'expected_error': KeyError,
217 ('error invalid', {
218 'file_double': FileDouble(
219 path=file_path,
220 fake_file=fake_file_invalid),
221 'expected_error': FakeSystemExit,
225 for (scenario_name, scenario) in scenarios:
226 scenario['changes_file_scenario_name'] = scenario_name
228 return scenarios
231 def set_fake_upload_file_paths(testcase):
232 """ Set the fake upload file paths. """
233 testcase.fake_upload_file_paths = [
234 os.path.join(
235 os.path.dirname(testcase.changes_file_double.path),
236 os.path.basename(tempfile.mktemp()))
237 for __ in range(10)]
239 required_suffixes = [".dsc", ".tar.xz"]
240 suffixes = required_suffixes + getattr(
241 testcase, 'additional_file_suffixes', [])
242 file_path_base = testcase.fake_upload_file_paths.pop()
243 for suffix in suffixes:
244 file_path = file_path_base + suffix
245 testcase.fake_upload_file_paths.insert(0, file_path)
248 def set_file_checksums(testcase):
249 """ Set the fake file checksums for the test case. """
250 testcase.fake_checksum_by_file = {
251 os.path.basename(file_path): make_unique_slug(testcase)
252 for file_path in testcase.fake_upload_file_paths}
255 def set_file_sizes(testcase):
256 """ Set the fake file sizes for the test case. """
257 testcase.fake_size_by_file = {
258 os.path.basename(file_path): testcase.getUniqueInteger()
259 for file_path in testcase.fake_upload_file_paths}
262 def set_file_doubles(testcase):
263 """ Set the file doubles for the test case. """
264 for file_path in testcase.fake_upload_file_paths:
265 file_double = FileDouble(file_path)
266 file_double.set_os_stat_scenario('okay')
267 file_double.stat_result = file_double.stat_result._replace(
268 st_size=testcase.fake_size_by_file[
269 os.path.basename(file_path)],
271 file_double.register_for_testcase(testcase)
274 def setup_upload_file_fixtures(testcase):
275 """ Set fixtures for fake files to upload for the test case. """
276 set_fake_upload_file_paths(testcase)
277 set_file_checksums(testcase)
278 set_file_sizes(testcase)
279 set_file_doubles(testcase)
282 def make_changes_file_path(file_dir_path=None):
283 """ Make a filesystem path for the changes file. """
284 if file_dir_path is None:
285 file_dir_path = tempfile.mktemp()
286 file_name = os.path.basename(
287 "{base}.changes".format(base=tempfile.mktemp()))
288 file_path = os.path.join(file_dir_path, file_name)
290 return file_path
293 def setup_changes_file_fixtures(testcase):
294 """ Set up fixtures for changes file doubles. """
295 file_path = make_changes_file_path()
297 scenarios = make_fake_file_scenarios(file_path)
298 testcase.changes_file_scenarios = scenarios
300 file_doubles = get_file_doubles_from_fake_file_scenarios(
301 scenarios.values())
302 setup_file_double_behaviour(testcase, file_doubles)
305 def set_changes_file_scenario(testcase, name):
306 """ Set the changes file scenario for this test case. """
307 scenario = dict(testcase.changes_file_scenarios)[name]
308 testcase.changes_file_scenario = scenario
309 testcase.changes_file_double = scenario['file_double']
310 testcase.changes_file_double.register_for_testcase(testcase)
313 class parse_changes_TestCase(
314 testscenarios.WithScenarios,
315 testtools.TestCase):
316 """ Base for test cases for `parse_changes` function. """
318 scenarios = NotImplemented
320 def setUp(self):
321 """ Set up test fixtures. """
322 super(parse_changes_TestCase, self).setUp()
323 patch_system_interfaces(self)
325 self.test_infile = StringIO()
328 class parse_changes_SuccessTestCase(parse_changes_TestCase):
329 """ Success test cases for `parse_changes` function. """
331 scenarios = list(
332 (name, scenario)
333 for (name, scenario) in make_changes_file_scenarios()
334 if not name.startswith('error'))
336 def test_gives_expected_result_for_infile(self):
337 """ Should give the expected result for specified input file. """
338 result = dput.dput.parse_changes(self.file_double.fake_file)
339 normalised_result_set = set(
340 (key.lower(), value.strip())
341 for (key, value) in result.items())
342 self.assertEqual(
343 set(self.expected_result.items()), normalised_result_set)
346 class parse_changes_ErrorTestCase(parse_changes_TestCase):
347 """ Error test cases for `parse_changes` function. """
349 scenarios = list(
350 (name, scenario)
351 for (name, scenario) in make_changes_file_scenarios()
352 if name.startswith('error'))
354 def test_raises_expected_exception_for_infile(self):
355 """ Should raise the expected exception for specified input file. """
356 with testtools.ExpectedException(self.expected_error):
357 dput.dput.parse_changes(self.file_double.fake_file)
360 class check_upload_variant_TestCase(
361 testscenarios.WithScenarios,
362 testtools.TestCase):
363 """ Test cases for `check_upload_variant` function. """
365 scenarios = [
366 ('simple', {
367 'fields': {
368 'architecture': "foo bar baz",
370 'expected_result': True,
372 ('arch-missing', {
373 'fields': {
374 'spam': "Lorem ipsum dolor sit amet",
376 'expected_result': False,
378 ('source-only', {
379 'fields': {
380 'architecture': "source",
382 'expected_result': False,
384 ('source-and-others', {
385 'fields': {
386 'architecture': "foo source bar",
388 'expected_result': False,
392 def setUp(self):
393 """ Set up test fixtures. """
394 super(check_upload_variant_TestCase, self).setUp()
395 patch_system_interfaces(self)
397 self.set_changes_document(self.fields)
398 self.set_test_args()
400 def set_changes_document(self, fields):
401 """ Set the package changes document based on specified fields. """
402 self.test_changes_document = make_changes_document(fields)
404 def set_test_args(self):
405 """ Set the arguments for the test call to the function. """
406 self.test_args = {
407 'changes': self.test_changes_document,
408 'debug': False,
411 def test_returns_expected_result_for_changes_document(self):
412 """ Should return expected result for specified changes document. """
413 result = dput.dput.check_upload_variant(**self.test_args)
414 self.assertEqual(self.expected_result, result)
416 def test_emits_debug_message_showing_architecture(self):
417 """ Should emit a debug message for the specified architecture. """
418 if 'architecture' not in self.fields:
419 self.skipTest("Architecture field not in this scenario")
420 self.test_args['debug'] = True
421 dput.dput.check_upload_variant(**self.test_args)
422 expected_output = textwrap.dedent("""\
423 D: Architecture: {arch}
424 """).format(arch=self.fields['architecture'])
425 self.assertIn(expected_output, sys.stdout.getvalue())
427 def test_emits_debug_message_for_binary_upload(self):
428 """ Should emit a debug message for the specified architecture. """
429 triggers_binaryonly = bool(self.expected_result)
430 if not triggers_binaryonly:
431 self.skipTest("Scenario does not trigger binary-only upload")
432 self.test_args['debug'] = True
433 dput.dput.check_upload_variant(**self.test_args)
434 expected_output = textwrap.dedent("""\
435 D: Doing a binary upload only.
436 """)
437 self.assertIn(expected_output, sys.stdout.getvalue())
440 SourceCheckResult = collections.namedtuple(
441 'SourceCheckResult', ['include_orig', 'include_tar'])
444 class source_check_TestCase(
445 testscenarios.WithScenarios,
446 testtools.TestCase):
447 """ Test cases for `source_check` function. """
449 default_expected_result = SourceCheckResult(
450 include_orig=False, include_tar=False)
452 scenarios = [
453 ('no-version', {
454 'expected_result': default_expected_result,
456 ('no-epoch native-version', {
457 'upstream_version': "1.2",
458 'expected_result': SourceCheckResult(
459 include_orig=False, include_tar=True),
461 ('epoch native-version', {
462 'epoch': "3",
463 'upstream_version': "1.2",
464 'expected_result': SourceCheckResult(
465 include_orig=False, include_tar=True),
467 ('no-epoch debian-release', {
468 'upstream_version': "1.2",
469 'release': "5",
470 'expected_result': SourceCheckResult(
471 include_orig=False, include_tar=True),
473 ('epoch debian-release', {
474 'epoch': "3",
475 'upstream_version': "1.2",
476 'release': "5",
477 'expected_result': SourceCheckResult(
478 include_orig=False, include_tar=True),
480 ('no-epoch new-upstream-version', {
481 'upstream_version': "1.2",
482 'release': "1",
483 'expected_result': SourceCheckResult(
484 include_orig=True, include_tar=False),
486 ('epoch new_upstream-version', {
487 'epoch': "3",
488 'upstream_version': "1.2",
489 'release': "1",
490 'expected_result': SourceCheckResult(
491 include_orig=True, include_tar=False),
493 ('no-epoch nmu', {
494 'upstream_version': "1.2",
495 'release': "4.5",
496 'expected_result': SourceCheckResult(
497 include_orig=False, include_tar=True),
499 ('epoch nmu', {
500 'epoch': "3",
501 'upstream_version': "1.2",
502 'release': "4.5",
503 'expected_result': SourceCheckResult(
504 include_orig=False, include_tar=True),
506 ('no-epoch nmu before-first-release', {
507 'upstream_version': "1.2",
508 'release': "0.1",
509 'expected_result': SourceCheckResult(
510 include_orig=True, include_tar=False),
512 ('epoch nmu before-first-release', {
513 'epoch': "3",
514 'upstream_version': "1.2",
515 'release': "0.1",
516 'expected_result': SourceCheckResult(
517 include_orig=True, include_tar=False),
519 ('no-epoch nmu after-first-release', {
520 'upstream_version': "1.2",
521 'release': "1.1",
522 'expected_result': SourceCheckResult(
523 include_orig=True, include_tar=False),
525 ('epoch nmu after-first-release', {
526 'epoch': "3",
527 'upstream_version': "1.2",
528 'release': "1.1",
529 'expected_result': SourceCheckResult(
530 include_orig=True, include_tar=False),
534 for (scenario_name, scenario) in scenarios:
535 fields = {}
536 if 'upstream_version' in scenario:
537 version_string = scenario['upstream_version']
538 if 'epoch' in scenario:
539 version_string = "{epoch}:{version}".format(
540 epoch=scenario['epoch'], version=version_string)
541 if 'release' in scenario:
542 version_string = "{version}-{release}".format(
543 version=version_string, release=scenario['release'])
544 fields.update({'version': version_string})
545 scenario['version'] = version_string
546 scenario['changes_document'] = make_changes_document(fields)
547 del scenario_name, scenario
548 del fields, version_string
550 def setUp(self):
551 """ Set up test fixtures. """
552 super(source_check_TestCase, self).setUp()
553 patch_system_interfaces(self)
555 self.test_args = {
556 'changes': self.changes_document,
557 'debug': False,
560 def test_returns_expected_result_for_changes_document(self):
561 """ Should return expected result for specified changes document. """
562 result = dput.dput.source_check(**self.test_args)
563 self.assertEqual(self.expected_result, result)
565 def test_emits_version_string_debug_message_only_if_version(self):
566 """ Should emit message for version only if has version. """
567 self.test_args['debug'] = True
568 version = getattr(self, 'version', None)
569 message_lead = "D: Package Version:"
570 expected_output = textwrap.dedent("""\
571 {lead} {version}
572 """).format(
573 lead=message_lead, version=version)
574 dput.dput.source_check(**self.test_args)
575 if hasattr(self, 'version'):
576 self.assertIn(expected_output, sys.stdout.getvalue())
577 else:
578 self.assertNotIn(message_lead, sys.stdout.getvalue())
580 def test_emits_epoch_debug_message_only_if_epoch(self):
581 """ Should emit message for epoch only if has an epoch. """
582 self.test_args['debug'] = True
583 dput.dput.source_check(**self.test_args)
584 expected_output = textwrap.dedent("""\
585 D: Epoch found
586 """)
587 dput.dput.source_check(**self.test_args)
588 if (hasattr(self, 'epoch') and hasattr(self, 'release')):
589 self.assertIn(expected_output, sys.stdout.getvalue())
590 else:
591 self.assertNotIn(expected_output, sys.stdout.getvalue())
593 def test_emits_upstream_version_debug_message_only_if_nonnative(self):
594 """ Should emit message for upstream version only if non-native. """
595 self.test_args['debug'] = True
596 upstream_version = getattr(self, 'upstream_version', None)
597 message_lead = "D: Upstream Version:"
598 expected_output = textwrap.dedent("""\
599 {lead} {version}
600 """).format(
601 lead=message_lead, version=upstream_version)
602 dput.dput.source_check(**self.test_args)
603 if hasattr(self, 'release'):
604 self.assertIn(expected_output, sys.stdout.getvalue())
605 else:
606 self.assertNotIn(message_lead, sys.stdout.getvalue())
608 def test_emits_debian_release_debug_message_only_if_nonnative(self):
609 """ Should emit message for Debian release only if non-native. """
610 self.test_args['debug'] = True
611 debian_release = getattr(self, 'release', None)
612 message_lead = "D: Debian Version:"
613 expected_output = textwrap.dedent("""\
614 {lead} {version}
615 """).format(
616 lead=message_lead, version=debian_release)
617 dput.dput.source_check(**self.test_args)
618 if hasattr(self, 'release'):
619 self.assertIn(expected_output, sys.stdout.getvalue())
620 else:
621 self.assertNotIn(message_lead, sys.stdout.getvalue())
624 class verify_files_TestCase(
625 testscenarios.WithScenarios,
626 testtools.TestCase):
627 """ Test cases for `verify_files` function. """
629 default_args = dict(
630 host="foo",
631 check_only=None,
632 check_version=None,
633 unsigned_upload=None,
634 debug=None,
637 scenarios = [
638 ('default', {}),
639 ('binary-only', {
640 'check_upload_variant_return_value': False,
642 ('include foo.tar.gz', {
643 'additional_file_suffixes': [".tar.gz"],
644 'source_check_result': SourceCheckResult(
645 include_orig=False, include_tar=True),
647 ('include foo.orig.tar.gz', {
648 'additional_file_suffixes': [".orig.tar.gz"],
649 'source_check_result': SourceCheckResult(
650 include_orig=True, include_tar=False),
652 ('unexpected foo.tar.gz', {
653 'additional_file_suffixes': [".tar.gz"],
654 'expected_rejection_message': (
655 "Package includes a .tar.gz file although"),
657 ('unexpected foo.orig.tar.gz', {
658 'additional_file_suffixes': [".orig.tar.gz"],
659 'expected_rejection_message': (
660 "Package includes an .orig.tar.gz file although"),
662 ('no distribution', {
663 'test_distribution': None,
667 def setUp(self):
668 """ Set up test fixtures. """
669 super(verify_files_TestCase, self).setUp()
670 patch_system_interfaces(self)
672 self.file_double_by_path = {}
673 set_config(self, 'exist-simple')
674 patch_runtime_config_options(self)
676 self.set_test_args()
678 setup_changes_file_fixtures(self)
679 set_changes_file_scenario(self, 'exist-minimal')
680 self.test_args.update(dict(
681 path=os.path.dirname(self.changes_file_double.path),
682 filename=os.path.basename(self.changes_file_double.path),
685 patch_os_stat(self)
687 setup_upload_file_fixtures(self)
688 self.set_expected_files_to_upload()
690 self.patch_checksum_test()
691 self.patch_parse_changes()
692 self.patch_check_upload_variant()
693 self.set_expected_binary_upload()
694 self.set_expected_source_control_file_path()
695 self.patch_version_check()
696 self.patch_verify_signature()
697 self.patch_source_check()
699 def set_expected_files_to_upload(self):
700 """ Set the expected `files_to_upload` result for this test case. """
701 self.expected_files_to_upload = set(
702 path for path in self.fake_upload_file_paths)
703 self.expected_files_to_upload.add(self.changes_file_double.path)
705 def patch_checksum_test(self):
706 """ Patch `checksum_test` function for this test case. """
707 func_patcher = mock.patch.object(
708 dput.dput, "checksum_test", autospec=True)
709 mock_func = func_patcher.start()
710 self.addCleanup(func_patcher.stop)
712 def get_checksum_for_file(path, hash_name):
713 return self.fake_checksum_by_file[os.path.basename(path)]
714 mock_func.side_effect = get_checksum_for_file
716 def set_changes_document(self):
717 """ Set the changes document for this test case. """
718 self.changes_document = make_changes_document(
719 fields={},
720 upload_params_by_name=self.upload_params_by_name)
721 self.test_distribution = getattr(self, 'test_distribution', "lorem")
722 if self.test_distribution is not None:
723 self.changes_document.add_header(
724 'distribution', self.test_distribution)
725 self.runtime_config_parser.set(
726 self.test_args['host'], 'allowed_distributions',
727 self.test_distribution)
729 dput.dput.parse_changes.return_value = self.changes_document
731 def set_upload_params(self):
732 """ Set the upload parameters for this test case. """
733 self.upload_params_by_name = make_upload_files_params(
734 self.fake_checksum_by_file,
735 self.fake_size_by_file)
737 def patch_parse_changes(self):
738 """ Patch `parse_changes` function for this test case. """
739 func_patcher = mock.patch.object(
740 dput.dput, "parse_changes", autospec=True)
741 func_patcher.start()
742 self.addCleanup(func_patcher.stop)
744 self.set_upload_params()
745 self.set_changes_document()
747 def patch_check_upload_variant(self):
748 """ Patch `check_upload_variant` function for this test case. """
749 if not hasattr(self, 'check_upload_variant_return_value'):
750 self.check_upload_variant_return_value = True
752 func_patcher = mock.patch.object(
753 dput.dput, "check_upload_variant", autospec=True,
754 return_value=self.check_upload_variant_return_value)
755 func_patcher.start()
756 self.addCleanup(func_patcher.stop)
758 def patch_version_check(self):
759 """ Patch `version_check` function for this test case. """
760 func_patcher = mock.patch.object(
761 dput.dput, "version_check", autospec=True)
762 func_patcher.start()
763 self.addCleanup(func_patcher.stop)
765 def patch_verify_signature(self):
766 """ Patch `verify_signature` function for this test case. """
767 func_patcher = mock.patch.object(
768 dput.dput, "verify_signature", autospec=True)
769 func_patcher.start()
770 self.addCleanup(func_patcher.stop)
772 def patch_source_check(self):
773 """ Patch `source_check` function for this test case. """
774 func_patcher = mock.patch.object(
775 dput.dput, "source_check", autospec=True)
776 mock_func = func_patcher.start()
777 self.addCleanup(func_patcher.stop)
779 source_check_result = getattr(
780 self, 'source_check_result', SourceCheckResult(
781 include_orig=False, include_tar=False))
782 mock_func.return_value = source_check_result
784 def set_test_args(self):
785 """ Set test args for this test case. """
786 extra_args = getattr(self, 'extra_args', {})
787 self.test_args = self.default_args.copy()
788 self.test_args['config'] = self.runtime_config_parser
789 self.test_args.update(extra_args)
791 def set_expected_binary_upload(self):
792 """ Set expected value for `binary_upload` flag. """
793 self.expected_binary_upload = self.check_upload_variant_return_value
795 def set_expected_source_control_file_path(self):
796 """ Set expected value for source control file path. """
797 file_name = next(
798 os.path.basename(file_path)
799 for file_path in self.fake_upload_file_paths
800 if file_path.endswith(".dsc"))
801 if not self.expected_binary_upload:
802 self.expected_source_control_file_path = os.path.join(
803 os.path.dirname(self.changes_file_double.path), file_name)
804 else:
805 self.expected_source_control_file_path = ""
807 def test_emits_changes_file_path_debug_message(self):
808 """ Should emit debug message for changes file path. """
809 self.test_args['debug'] = True
810 dput.dput.verify_files(**self.test_args)
811 expected_output = textwrap.dedent("""\
812 D: Validating contents of changes file {path}
813 """).format(path=self.changes_file_double.path)
814 self.assertIn(expected_output, sys.stdout.getvalue())
816 def test_calls_sys_exit_if_input_read_denied(self):
817 """ Should call `sys.exit` if input file read access is denied. """
818 set_changes_file_scenario(self, 'error-read-denied')
819 with testtools.ExpectedException(FakeSystemExit):
820 dput.dput.verify_files(**self.test_args)
821 expected_output = textwrap.dedent("""\
822 Can't open {path}
823 """).format(path=self.changes_file_double.path)
824 self.assertIn(expected_output, sys.stdout.getvalue())
825 sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
827 def test_calls_parse_changes_with_changes_files(self):
828 """ Should call `parse_changes` with changes file. """
829 dput.dput.verify_files(**self.test_args)
830 dput.dput.parse_changes.assert_called_with(
831 self.changes_file_double.fake_file)
833 def test_calls_check_upload_variant_with_changes_document(self):
834 """ Should call `check_upload_variant` with changes document. """
835 dput.dput.verify_files(**self.test_args)
836 dput.dput.check_upload_variant.assert_called_with(
837 self.changes_document, mock.ANY)
839 def test_emits_upload_dsc_file_debug_message(self):
840 """ Should emit debug message for ‘*.dsc’ file. """
841 if getattr(self, 'check_upload_variant_return_value', True):
842 self.skipTest("Binary package upload for this scenario")
843 self.test_args['debug'] = True
844 dput.dput.verify_files(**self.test_args)
845 dsc_file_path = next(
846 os.path.basename(file_path)
847 for file_path in self.fake_upload_file_paths
848 if file_path.endswith(".dsc"))
849 expected_output = textwrap.dedent("""\
850 D: dsc-File: {path}
851 """).format(path=dsc_file_path)
852 self.assertThat(
853 sys.stdout.getvalue(),
854 testtools.matchers.Contains(expected_output))
856 def test_calls_sys_exit_when_source_upload_omits_dsc_file(self):
857 """ Should call `sys.exit` when source upload omits ‘*.dsc’ file. """
858 if getattr(self, 'check_upload_variant_return_value', True):
859 self.skipTest("Binary package upload for this scenario")
860 self.fake_checksum_by_file = dict(
861 (file_path, checksum)
862 for (file_path, checksum)
863 in self.fake_checksum_by_file.items()
864 if not file_path.endswith(".dsc"))
865 self.set_upload_params()
866 self.set_changes_document()
867 with testtools.ExpectedException(FakeSystemExit):
868 dput.dput.verify_files(**self.test_args)
869 expected_output = textwrap.dedent("""\
870 Error: no dsc file found in sourceful upload
871 """)
872 self.assertThat(
873 sys.stderr.getvalue(),
874 testtools.matchers.Contains(expected_output))
875 sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
877 def test_calls_version_check_when_specified_in_config(self):
878 """ Should call `version_check` when specified in config. """
879 self.runtime_config_parser.set(
880 self.test_args['host'], 'check_version', "true")
881 dput.dput.verify_files(**self.test_args)
882 dput.dput.version_check.assert_called_with(
883 os.path.dirname(self.changes_file_double.path),
884 self.changes_document,
885 self.test_args['debug'])
887 def test_calls_version_check_when_specified_in_args(self):
888 """ Should call `version_check` when specified in arguments. """
889 self.test_args['check_version'] = True
890 dput.dput.verify_files(**self.test_args)
891 dput.dput.version_check.assert_called_with(
892 os.path.dirname(self.changes_file_double.path),
893 self.changes_document,
894 self.test_args['debug'])
896 def test_calls_sys_exit_when_host_section_not_in_config(self):
897 """ Should call `sys.exit` when specified host not in config. """
898 self.runtime_config_parser.remove_section(self.test_args['host'])
899 with testtools.ExpectedException(FakeSystemExit):
900 dput.dput.verify_files(**self.test_args)
901 expected_output = textwrap.dedent("""\
902 Error in config file:
903 No section: ...
904 """)
905 self.assertThat(
906 sys.stderr.getvalue(),
907 testtools.matchers.DocTestMatches(
908 expected_output, doctest.ELLIPSIS))
909 sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
911 def test_calls_verify_signature_with_expected_args(self):
912 """ Should call `verify_signature` with expected args. """
913 dput.dput.verify_files(**self.test_args)
914 dput.dput.verify_signature.assert_called_with(
915 self.test_args['host'],
916 self.changes_file_double.path,
917 self.expected_source_control_file_path,
918 self.runtime_config_parser,
919 self.test_args['check_only'],
920 self.test_args['unsigned_upload'], mock.ANY,
921 self.test_args['debug'])
923 def test_calls_source_check_with_changes_document(self):
924 """ Should call `source_check` with changes document. """
925 dput.dput.verify_files(**self.test_args)
926 dput.dput.source_check.assert_called_with(
927 self.changes_document, self.test_args['debug'])
929 def test_emits_upload_file_path_debug_message(self):
930 """ Should emit debug message for each upload file path. """
931 self.test_args['debug'] = True
932 dput.dput.verify_files(**self.test_args)
933 for file_path in self.fake_upload_file_paths:
934 expected_output = textwrap.dedent("""\
935 D: File to upload: {path}
936 """).format(path=file_path)
937 self.expectThat(
938 sys.stdout.getvalue(),
939 testtools.matchers.Contains(expected_output))
941 def test_calls_checksum_test_with_upload_files(self):
942 """ Should call `checksum_test` with each upload file path. """
943 dput.dput.verify_files(**self.test_args)
944 expected_calls = [
945 mock.call(file_path, mock.ANY)
946 for file_path in self.fake_upload_file_paths]
947 dput.dput.checksum_test.assert_has_calls(
948 expected_calls, any_order=True)
950 def set_bogus_file_checksums(self):
951 """ Set bogus file checksums that will not match. """
952 self.fake_checksum_by_file = {
953 file_name: self.getUniqueString()
954 for file_name in self.fake_checksum_by_file}
956 def test_emits_checksum_okay_debug_message(self):
957 """ Should emit debug message checksum okay for each file. """
958 self.test_args['debug'] = True
959 dput.dput.verify_files(**self.test_args)
960 for file_path in self.fake_upload_file_paths:
961 expected_output = textwrap.dedent("""\
962 D: Checksum for {path} is fine
963 """).format(path=file_path)
964 self.expectThat(
965 sys.stdout.getvalue(),
966 testtools.matchers.Contains(expected_output))
968 def test_emits_checksum_mismatch_debug_message(self):
969 """ Should emit debug message when a checksum does not match. """
970 self.test_args['debug'] = True
971 self.set_bogus_file_checksums()
972 with testtools.ExpectedException(FakeSystemExit):
973 dput.dput.verify_files(**self.test_args)
974 expected_output = textwrap.dedent("""\
976 D: Checksum from .changes: ...
977 D: Generated Checksum: ...
979 """)
980 self.assertThat(
981 sys.stdout.getvalue(),
982 testtools.matchers.DocTestMatches(
983 expected_output, doctest.ELLIPSIS))
985 def test_calls_sys_exit_when_checksum_mismatch(self):
986 """ Should call `sys.exit` when a checksum does not match. """
987 specified_checksum_by_file = self.fake_checksum_by_file
988 self.set_bogus_file_checksums()
989 with testtools.ExpectedException(FakeSystemExit):
990 dput.dput.verify_files(**self.test_args)
992 expected_output_for_files = [
993 textwrap.dedent("""\
994 Checksum doesn't match for {file_name}
995 """).format(
996 file_name=os.path.join(
997 os.path.dirname(self.changes_file_double.path),
998 file_name),
999 specified_hash=specified_hash,
1000 computed_hash=self.fake_checksum_by_file[file_name])
1001 for (file_name, specified_hash)
1002 in specified_checksum_by_file.items()]
1003 self.assertThat(
1004 sys.stdout.getvalue(),
1005 testtools.matchers.MatchesAny(*[
1006 testtools.matchers.Contains(expected_output)
1007 for expected_output in expected_output_for_files]))
1008 sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
1010 def test_calls_os_stat_with_upload_files(self):
1011 """ Should call `os.stat` with each upload file path. """
1012 dput.dput.verify_files(**self.test_args)
1013 expected_calls = [
1014 mock.call(file_path)
1015 for file_path in self.fake_upload_file_paths]
1016 os.stat.assert_has_calls(expected_calls, any_order=True)
1018 def set_bogus_file_sizes(self):
1019 """ Set bogus file sizes that will not match. """
1020 file_double_registry = FileDouble.get_registry_for_testcase(self)
1021 for file_name in self.fake_size_by_file:
1022 bogus_size = self.getUniqueInteger()
1023 self.fake_size_by_file[file_name] = bogus_size
1024 file_path = os.path.join(
1025 os.path.dirname(self.changes_file_double.path),
1026 file_name)
1027 file_double = file_double_registry[file_path]
1028 file_double.stat_result = file_double.stat_result._replace(
1029 st_size=bogus_size)
1031 def test_emits_size_mismatch_debug_message(self):
1032 """ Should emit debug message when a size does not match. """
1033 self.test_args['debug'] = True
1034 self.set_bogus_file_sizes()
1035 dput.dput.verify_files(**self.test_args)
1036 expected_output = textwrap.dedent("""\
1038 D: size from .changes: ...
1039 D: calculated size: ...
1041 """)
1042 self.assertThat(
1043 sys.stdout.getvalue(),
1044 testtools.matchers.DocTestMatches(
1045 expected_output, doctest.ELLIPSIS))
1047 def test_emits_size_mismatch_message_for_each_file(self):
1048 """ Should emit error message for each file with size mismatch. """
1049 self.set_bogus_file_sizes()
1050 dput.dput.verify_files(**self.test_args)
1051 for file_path in self.fake_upload_file_paths:
1052 expected_output = textwrap.dedent("""\
1053 size doesn't match for {path}
1054 """).format(path=file_path)
1055 self.expectThat(
1056 sys.stdout.getvalue(),
1057 testtools.matchers.Contains(expected_output))
1059 def test_emits_rejection_warning_when_unexpected_tarball(self):
1060 """ Should emit warning of rejection when unexpected tarball. """
1061 if not hasattr(self, 'expected_rejection_message'):
1062 self.skipTest("No rejection message expected")
1063 dput.dput.verify_files(**self.test_args)
1064 sys.stderr.write("calls: {calls!r}\n".format(
1065 calls=sys.stdout.write.mock_calls))
1066 self.assertThat(
1067 sys.stdout.getvalue(),
1068 testtools.matchers.Contains(self.expected_rejection_message))
1070 def test_raises_error_when_distribution_mismatch(self):
1071 """ Should raise error when distribution mismatch against allowed. """
1072 if not getattr(self, 'test_distribution', None):
1073 self.skipTest("No distribution set for this test case")
1074 self.runtime_config_parser.set(
1075 self.test_args['host'], 'allowed_distributions',
1076 "dolor sit amet")
1077 with testtools.ExpectedException(dputhelper.DputUploadFatalException):
1078 dput.dput.verify_files(**self.test_args)
1080 def test_emits_changes_file_upload_debug_message(self):
1081 """ Should emit debug message for upload of changes file. """
1082 self.test_args['debug'] = True
1083 dput.dput.verify_files(**self.test_args)
1084 expected_output = textwrap.dedent("""\
1085 D: File to upload: {path}
1086 """).format(path=self.changes_file_double.path)
1087 self.assertIn(expected_output, sys.stdout.getvalue())
1089 def test_returns_expected_files_to_upload_collection(self):
1090 """ Should return expected `files_to_upload` collection value. """
1091 result = dput.dput.verify_files(**self.test_args)
1092 expected_result = self.expected_files_to_upload
1093 self.assertEqual(expected_result, set(result))
1096 class guess_upload_host_TestCase(
1097 testscenarios.WithScenarios,
1098 testtools.TestCase):
1099 """ Test cases for `guess_upload_host` function. """
1101 changes_file_scenarios = [
1102 ('no-distribution', {
1103 'fake_file': StringIO(textwrap.dedent("""\
1104 Files:
1105 Lorem ipsum dolor sit amet
1106 """)),
1108 ('distribution-spam', {
1109 'fake_file': StringIO(textwrap.dedent("""\
1110 Distribution: spam
1111 Files:
1112 Lorem ipsum dolor sit amet
1113 """)),
1115 ('distribution-beans', {
1116 'fake_file': StringIO(textwrap.dedent("""\
1117 Distribution: beans
1118 Files:
1119 Lorem ipsum dolor sit amet
1120 """)),
1124 scenarios = [
1125 ('distribution-found-of-one', {
1126 'changes_file_scenario_name': "distribution-spam",
1127 'test_distribution': "spam",
1128 'config_scenario_name': "exist-distribution-one",
1129 'expected_host': "foo",
1131 ('distribution-notfound-of-one', {
1132 'changes_file_scenario_name': "distribution-beans",
1133 'test_distribution': "beans",
1134 'config_scenario_name': "exist-distribution-one",
1135 'expected_host': "ftp-master",
1137 ('distribution-first-of-three', {
1138 'changes_file_scenario_name': "distribution-spam",
1139 'test_distribution': "spam",
1140 'config_scenario_name': "exist-distribution-three",
1141 'expected_host': "foo",
1143 ('distribution-last-of-three', {
1144 'changes_file_scenario_name': "distribution-beans",
1145 'test_distribution': "beans",
1146 'config_scenario_name': "exist-distribution-three",
1147 'expected_host': "foo",
1149 ('no-configured-distribution', {
1150 'changes_file_scenario_name': "distribution-beans",
1151 'config_scenario_name': "exist-distribution-none",
1152 'expected_host': "ftp-master",
1154 ('no-distribution', {
1155 'changes_file_scenario_name': "no-distribution",
1156 'config_scenario_name': "exist-simple",
1157 'expected_host': "ftp-master",
1159 ('default-distribution', {
1160 'config_scenario_name': "exist-default-distribution-only",
1161 'config_default_default_host_main': "consecteur",
1162 'expected_host': "consecteur",
1166 def setUp(self):
1167 """ Set up test fixtures. """
1168 super(guess_upload_host_TestCase, self).setUp()
1169 patch_system_interfaces(self)
1171 set_config(
1172 self,
1173 getattr(self, 'config_scenario_name', 'exist-minimal'))
1174 patch_runtime_config_options(self)
1176 self.setup_changes_file_fixtures()
1177 set_changes_file_scenario(
1178 self,
1179 getattr(self, 'changes_file_scenario_name', 'no-distribution'))
1181 self.set_test_args()
1183 def set_test_args(self):
1184 """ Set the arguments for the test call to the function. """
1185 self.test_args = dict(
1186 path=os.path.dirname(self.changes_file_double.path),
1187 filename=os.path.basename(self.changes_file_double.path),
1188 config=self.runtime_config_parser,
1191 def setup_changes_file_fixtures(self):
1192 """ Set up fixtures for fake changes file. """
1193 file_path = make_changes_file_path()
1195 scenarios = [s for (__, s) in self.changes_file_scenarios]
1196 for scenario in scenarios:
1197 scenario['file_double'] = FileDouble(
1198 file_path, scenario['fake_file'])
1199 setup_file_double_behaviour(
1200 self,
1201 get_file_doubles_from_fake_file_scenarios(scenarios))
1203 def test_calls_sys_exit_if_read_denied(self):
1204 """ Should call `sys.exit` if read permission denied. """
1205 self.changes_file_double.set_os_access_scenario('denied')
1206 self.changes_file_double.set_open_scenario('read_denied')
1207 with testtools.ExpectedException(FakeSystemExit):
1208 dput.dput.guess_upload_host(**self.test_args)
1209 expected_output = textwrap.dedent("""\
1210 Can't open {path}
1211 """).format(path=self.changes_file_double.path)
1212 self.assertIn(expected_output, sys.stdout.getvalue())
1213 sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
1215 def test_returns_expected_host(self):
1216 """ Should return expected host value. """
1217 result = dput.dput.guess_upload_host(**self.test_args)
1218 self.assertEqual(self.expected_host, result)
1220 @mock.patch.object(dput.dput, 'debug', True)
1221 def test_emits_debug_message_for_host(self):
1222 """ Should emit a debug message for the discovered host. """
1223 config_parser = self.runtime_config_parser
1224 if not (
1225 config_parser.has_section(self.expected_host)
1226 and config_parser.get(self.expected_host, 'distributions')):
1227 self.skipTest("No distributions specified")
1228 dput.dput.guess_upload_host(**self.test_args)
1229 expected_output = textwrap.dedent("""\
1230 D: guessing host {host} based on distribution {dist}
1231 """).format(
1232 host=self.expected_host, dist=self.test_distribution)
1233 self.assertIn(expected_output, sys.stdout.getvalue())
1236 # Copyright © 2015–2016 Ben Finney <bignose@debian.org>
1238 # This is free software: you may copy, modify, and/or distribute this work
1239 # under the terms of the GNU General Public License as published by the
1240 # Free Software Foundation; version 3 of that license or any later version.
1241 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
1244 # Local variables:
1245 # coding: utf-8
1246 # mode: python
1247 # End:
1248 # vim: fileencoding=utf-8 filetype=python :