Update copyright notices.
[dput.git] / test / test_changesfile.py
blob8130f3f45783b928a61aff2ee215ffa178d114b3
1 # -*- coding: utf-8; -*-
3 # test/test_changesfile.py
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # Copyright © 2015–2016 Ben Finney <ben+python@benfinney.id.au>
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the GNU General Public License as published by the
10 # Free Software Foundation; version 3 of that license or any later version.
11 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
13 """ Unit tests for Debian upload control (‘*.changes’) files. """
15 from __future__ import (absolute_import, unicode_literals)
17 import sys
18 import os
19 import os.path
20 import tempfile
21 import textwrap
22 import doctest
23 import email.message
24 import collections
26 import testtools
27 import testscenarios
29 __package__ = str("test")
30 __import__(__package__)
31 sys.path.insert(1, os.path.dirname(os.path.dirname(__file__)))
32 import dput.dput
33 from dput.helper import dputhelper
35 from .helper import (
36 StringIO,
37 mock,
38 patch_system_interfaces,
39 make_unique_slug,
40 FakeSystemExit,
41 EXIT_STATUS_FAILURE,
42 make_fake_file_scenarios,
43 get_file_doubles_from_fake_file_scenarios,
44 setup_file_double_behaviour,
45 patch_os_stat,
46 FileDouble,
48 from .test_configfile import (
49 set_config,
50 patch_runtime_config_options,
53 __metaclass__ = type
56 class _FieldsMapping:
57 """ A mapping to stand in for the `dict` of an `email.message.Message`. """
59 def __init__(self, *args, **kwargs):
60 try:
61 self._message = kwargs.pop('_message')
62 except KeyError:
63 raise TypeError("no ‘_message’ specified for this mapping")
64 super(_FieldsMapping, self).__init__(*args, **kwargs)
66 def __len__(self, *args, **kwargs):
67 return self._message.__len__(*args, **kwargs)
69 def __contains__(self, *args, **kwargs):
70 return self._message.__contains__(*args, **kwargs)
72 def __getitem__(self, *args, **kwargs):
73 return self._message.__getitem__(*args, **kwargs)
75 def __setitem__(self, *args, **kwargs):
76 self._message.__setitem__(*args, **kwargs)
78 def __delitem__(self, *args, **kwargs):
79 self._message.__delitem__(*args, **kwargs)
81 def has_key(self, *args, **kwargs):
82 return self._message.has_key(*args, **kwargs)
84 def keys(self, *args, **kwargs):
85 return self._message.keys(*args, **kwargs)
87 def values(self, *args, **kwargs):
88 return self._message.values(*args, **kwargs)
90 def items(self, *args, **kwargs):
91 return self._message.items(*args, **kwargs)
93 def get(self, *args, **kwargs):
94 return self._message.get(*args, **kwargs)
class FakeMessage(email.message.Message, object):
    """ A fake RFC 2822 message that mocks the obsolete `rfc822.Message`.

        This is an ordinary `email.message.Message` with one addition:
        a `dict` attribute, which is a `_FieldsMapping` view onto this
        message's own fields.

        """

    def __init__(self, *args, **kwargs):
        super(FakeMessage, self).__init__(*args, **kwargs)
        fields_view = _FieldsMapping(_message=self)
        self.dict = fields_view
def make_fake_message(fields):
    """ Make a fake message instance.

        :param fields: Mapping of field name to field value; each item
            is added to the message as one header.
        :return: The new `FakeMessage` instance.

        """
    fake_message = FakeMessage()
    for (field_name, field_value) in fields.items():
        fake_message.add_header(field_name, field_value)
    return fake_message
def make_files_field_value(params_by_name):
    """ Make a value for “Files” field of a changes document.

        :param params_by_name: Mapping from file name to the sequence
            of text parameters for that file.
        :return: Text with one line per file; each line is that file's
            parameters joined by a single space.

        """
    # Only the parameter sequences are needed; the keys are not used.
    result = "\n".join(
        " ".join(params)
        for params in params_by_name.values())
    return result
def make_upload_files_params(checksums_by_file_name, sizes_by_file_name):
    """ Make a mapping of upload parameters for files.

        :param checksums_by_file_name: Mapping from file name to
            checksum text. Its keys determine which files are included.
        :param sizes_by_file_name: Mapping from file name to size; each
            size is converted with `str`.
        :return: Mapping from file name to the list
            ``[checksum, size, "foo", "bar", file_name]``.

        """
    params_by_name = {}
    for file_name in checksums_by_file_name:
        checksum = checksums_by_file_name[file_name]
        size_text = str(sizes_by_file_name[file_name])
        params_by_name[file_name] = [
            checksum, size_text, "foo", "bar", file_name]
    return params_by_name
def make_changes_document(fields, upload_params_by_name=None):
    """ Make a changes document from field values.

        :param fields: Mapping of field name to value. It is copied, so
            the caller's mapping is not modified.
        :param upload_params_by_name: Mapping from filename to upload
            parameters for each file. If not ``None``, a ‘files’ field
            is set from it via `make_files_field_value`.
        :return: The changes document, as made by `make_fake_message`.

        """
    document_fields = fields.copy()
    if upload_params_by_name is not None:
        document_fields['files'] = make_files_field_value(
            upload_params_by_name)
    return make_fake_message(document_fields)
150 def make_changes_file_scenarios():
151 """ Make fake Debian upload control (‘*.changes’) scenarios. """
152 file_path = make_changes_file_path()
154 fake_file_empty = StringIO()
155 fake_file_no_format = StringIO(textwrap.dedent("""\
158 Files:
159 Lorem ipsum dolor sit amet
160 """))
161 fake_file_with_signature = StringIO(textwrap.dedent("""\
162 -----BEGIN PGP SIGNED MESSAGE-----
163 Hash: SHA1
167 Files:
168 Lorem ipsum dolor sit amet
170 -----BEGIN PGP SIGNATURE-----
171 Version: 0.0
172 Comment: Proin ac massa at orci sagittis fermentum.
174 gibberishgibberishgibberishgibberishgibberishgibberish
175 gibberishgibberishgibberishgibberishgibberishgibberish
176 gibberishgibberishgibberishgibberishgibberishgibberish
177 -----END PGP SIGNATURE-----
178 """))
179 fake_file_with_format = StringIO(textwrap.dedent("""\
180 Format: FOO
181 Files:
182 Lorem ipsum dolor sit amet
183 """))
184 fake_file_invalid = StringIO(textwrap.dedent("""\
185 Format: FOO
186 Files:
187 FOO BAR
188 """))
190 scenarios = [
191 ('no-format', {
192 'file_double': FileDouble(
193 path=file_path,
194 fake_file=fake_file_no_format),
195 'expected_result': make_changes_document({
196 'files': "Lorem ipsum dolor sit amet",
199 ('with-pgp-signature', {
200 'file_double': FileDouble(
201 path=file_path,
202 fake_file=fake_file_with_signature),
203 'expected_result': make_changes_document({
204 'files': "Lorem ipsum dolor sit amet",
207 ('with-format', {
208 'file_double': FileDouble(
209 path=file_path,
210 fake_file=fake_file_with_format),
211 'expected_result': make_changes_document({
212 'files': "Lorem ipsum dolor sit amet",
215 ('error empty', {
216 'file_double': FileDouble(
217 path=file_path, fake_file=fake_file_empty),
218 'expected_error': KeyError,
220 ('error invalid', {
221 'file_double': FileDouble(
222 path=file_path,
223 fake_file=fake_file_invalid),
224 'expected_error': FakeSystemExit,
228 for (scenario_name, scenario) in scenarios:
229 scenario['changes_file_scenario_name'] = scenario_name
231 return scenarios
def set_fake_upload_file_paths(testcase):
    """ Set the fake upload file paths.

        Sets `testcase.fake_upload_file_paths` to a list of paths, all
        in the same directory as the test case's changes file double.
        Ten random names are generated; the last one is removed and
        used as a base name, to which each suffix (‘.dsc’, ‘.tar.xz’,
        then any in `testcase.additional_file_suffixes`) is appended.
        Each suffixed path is inserted at the front of the list.

        """
    upload_dir_path = os.path.dirname(testcase.changes_file_double.path)
    upload_paths = []
    for __ in range(10):
        # `mktemp` is used only to get a unique name; no file is made.
        random_name = os.path.basename(tempfile.mktemp())
        upload_paths.append(os.path.join(upload_dir_path, random_name))
    testcase.fake_upload_file_paths = upload_paths

    suffixes = [".dsc", ".tar.xz"] + getattr(
        testcase, 'additional_file_suffixes', [])
    file_path_base = upload_paths.pop()
    for suffix in suffixes:
        upload_paths.insert(0, file_path_base + suffix)
def set_file_checksums(testcase):
    """ Set the fake file checksums for the test case.

        Sets `testcase.fake_checksum_by_file` to a mapping from the
        base name of each fake upload file path to a value obtained
        from `make_unique_slug`.

        """
    checksum_by_file = {}
    for file_path in testcase.fake_upload_file_paths:
        file_name = os.path.basename(file_path)
        checksum_by_file[file_name] = make_unique_slug(testcase)
    testcase.fake_checksum_by_file = checksum_by_file
def set_file_sizes(testcase):
    """ Set the fake file sizes for the test case.

        Sets `testcase.fake_size_by_file` to a mapping from the base
        name of each fake upload file path to a value obtained from
        `testcase.getUniqueInteger`.

        """
    size_by_file = {}
    for file_path in testcase.fake_upload_file_paths:
        file_name = os.path.basename(file_path)
        size_by_file[file_name] = testcase.getUniqueInteger()
    testcase.fake_size_by_file = size_by_file
def set_file_doubles(testcase):
    """ Set the file doubles for the test case.

        For each fake upload file path, a `FileDouble` is created with
        the ‘okay’ `os.stat` scenario, its stat result is given the
        fake size recorded for that file name, and the double is
        registered for `testcase`.

        """
    for file_path in testcase.fake_upload_file_paths:
        file_double = FileDouble(file_path)
        file_double.set_os_stat_scenario('okay')
        file_name = os.path.basename(file_path)
        fake_stat_result = file_double.stat_result._replace(
            st_size=testcase.fake_size_by_file[file_name])
        file_double.stat_result = fake_stat_result
        file_double.register_for_testcase(testcase)
def setup_upload_file_fixtures(testcase):
    """ Set fixtures for fake files to upload for the test case.

        Applies, in this order: the upload file paths, the checksums,
        the sizes, then the file doubles. Each later step reads
        attributes that an earlier step set on `testcase`.

        """
    fixture_setters = (
        set_fake_upload_file_paths,
        set_file_checksums,
        set_file_sizes,
        set_file_doubles,
    )
    for set_fixture in fixture_setters:
        set_fixture(testcase)
def make_changes_file_path(file_dir_path=None):
    """ Make a filesystem path for the changes file.

        :param file_dir_path: Directory path for the file. If ``None``,
            a random path from `tempfile.mktemp` is used.
        :return: Path of a randomly-named ‘*.changes’ file in that
            directory. No file or directory is created.

        """
    if file_dir_path is None:
        file_dir_path = tempfile.mktemp()
    random_base_name = os.path.basename(tempfile.mktemp())
    changes_file_name = "{base}.changes".format(base=random_base_name)
    return os.path.join(file_dir_path, changes_file_name)
def setup_changes_file_fixtures(testcase):
    """ Set up fixtures for changes file doubles.

        Makes fake file scenarios for a new changes file path, stores
        them as `testcase.changes_file_scenarios`, and sets up the
        behaviour of the file doubles from those scenarios.

        """
    changes_file_path = make_changes_file_path()
    testcase.changes_file_scenarios = make_fake_file_scenarios(
        changes_file_path)

    scenario_values = testcase.changes_file_scenarios.values()
    file_doubles = get_file_doubles_from_fake_file_scenarios(
        scenario_values)
    setup_file_double_behaviour(testcase, file_doubles)
def set_changes_file_scenario(testcase, name):
    """ Set the changes file scenario for this test case.

        :param testcase: The test case; its `changes_file_scenarios`
            attribute is looked up by `name`.
        :param name: Name of the scenario to select.

        Sets `testcase.changes_file_scenario` and
        `testcase.changes_file_double`, then registers that file double
        for the test case.

        """
    scenarios_by_name = dict(testcase.changes_file_scenarios)
    chosen_scenario = scenarios_by_name[name]
    file_double = chosen_scenario['file_double']
    testcase.changes_file_scenario = chosen_scenario
    testcase.changes_file_double = file_double
    file_double.register_for_testcase(testcase)
class parse_changes_TestCase(
        testscenarios.WithScenarios,
        testtools.TestCase):
    """ Base for test cases for `parse_changes` function.

        Subclasses are expected to replace `scenarios` with the actual
        scenarios to run.

        """

    # Placeholder; each concrete subclass defines its own scenarios.
    scenarios = NotImplemented

    def setUp(self):
        """ Set up test fixtures. """
        super(parse_changes_TestCase, self).setUp()
        patch_system_interfaces(self)

        self.test_infile = StringIO()
class parse_changes_SuccessTestCase(parse_changes_TestCase):
    """ Success test cases for `parse_changes` function.

        Runs every changes file scenario whose name does not start
        with ‘error’.

        """

    # A generator expression (not a list comprehension) keeps the
    # loop variables out of the class namespace on Python 2.
    scenarios = list(
        (name, scenario)
        for (name, scenario) in make_changes_file_scenarios()
        if not name.startswith('error'))

    def test_gives_expected_result_for_infile(self):
        """ Should give the expected result for specified input file. """
        result = dput.dput.parse_changes(self.file_double.fake_file)
        # Compare case-insensitively on field name, ignoring
        # whitespace around each value.
        actual_fields = {
            (key.lower(), value.strip())
            for (key, value) in result.items()}
        expected_fields = set(self.expected_result.items())
        self.assertEqual(expected_fields, actual_fields)
class parse_changes_ErrorTestCase(parse_changes_TestCase):
    """ Error test cases for `parse_changes` function.

        Runs every changes file scenario whose name starts with
        ‘error’.

        """

    # A generator expression (not a list comprehension) keeps the
    # loop variables out of the class namespace on Python 2.
    scenarios = list(
        (name, scenario)
        for (name, scenario) in make_changes_file_scenarios()
        if name.startswith('error'))

    def test_raises_expected_exception_for_infile(self):
        """ Should raise the expected exception for specified input file. """
        infile = self.file_double.fake_file
        with testtools.ExpectedException(self.expected_error):
            dput.dput.parse_changes(infile)
class check_upload_variant_TestCase(
        testscenarios.WithScenarios,
        testtools.TestCase):
    """ Test cases for `check_upload_variant` function.

        Each scenario gives the `fields` of a changes document and the
        `expected_result` of calling the function with that document.

        """

    scenarios = [
        ('simple', {
            'fields': {
                'architecture': "foo bar baz",
                },
            'expected_result': True,
            }),
        ('arch-missing', {
            'fields': {
                'spam': "Lorem ipsum dolor sit amet",
                },
            'expected_result': False,
            }),
        ('source-only', {
            'fields': {
                'architecture': "source",
                },
            'expected_result': False,
            }),
        ('source-and-others', {
            'fields': {
                'architecture': "foo source bar",
                },
            'expected_result': False,
            }),
        ]

    def setUp(self):
        """ Set up test fixtures. """
        super(check_upload_variant_TestCase, self).setUp()
        patch_system_interfaces(self)

        self.set_changes_document(self.fields)
        self.set_test_args()

    def set_changes_document(self, fields):
        """ Set the package changes document based on specified fields. """
        self.test_changes_document = make_changes_document(fields)

    def set_test_args(self):
        """ Set the arguments for the test call to the function. """
        self.test_args = dict(
            changes=self.test_changes_document,
            debug=False,
            )

    def test_returns_expected_result_for_changes_document(self):
        """ Should return expected result for specified changes document. """
        result = dput.dput.check_upload_variant(**self.test_args)
        self.assertEqual(self.expected_result, result)

    def test_emits_debug_message_showing_architecture(self):
        """ Should emit a debug message for the specified architecture. """
        if 'architecture' not in self.fields:
            self.skipTest("Architecture field not in this scenario")
        architecture = self.fields['architecture']
        expected_output = textwrap.dedent("""\
            D: Architecture: {arch}
            """).format(arch=architecture)
        self.test_args['debug'] = True
        dput.dput.check_upload_variant(**self.test_args)
        self.assertIn(expected_output, sys.stdout.getvalue())

    def test_emits_debug_message_for_binary_upload(self):
        """ Should emit a debug message for a binary-only upload. """
        # Only scenarios that expect a true result are binary-only.
        if not self.expected_result:
            self.skipTest("Scenario does not trigger binary-only upload")
        expected_output = textwrap.dedent("""\
            D: Doing a binary upload only.
            """)
        self.test_args['debug'] = True
        dput.dput.check_upload_variant(**self.test_args)
        self.assertIn(expected_output, sys.stdout.getvalue())
# Named pair of the two flags in a `source_check` result; used by the
# test cases for expected values and as the mocked return value.
_source_check_result_fields = ['include_orig', 'include_tar']
SourceCheckResult = collections.namedtuple(
    'SourceCheckResult', _source_check_result_fields)
class source_check_TestCase(
        testscenarios.WithScenarios,
        testtools.TestCase):
    """ Test cases for `source_check` function.

        Each scenario may specify `epoch`, `upstream_version` and
        `release`. When `upstream_version` is present, the class body
        composes them into a `version` string and puts that in the
        ‘version’ field of the scenario's `changes_document`.

        """

    default_expected_result = SourceCheckResult(
        include_orig=False, include_tar=False)

    scenarios = [
        ('no-version', {
            'expected_result': default_expected_result,
            }),
        ('no-epoch native-version', {
            'upstream_version': "1.2",
            'expected_result': SourceCheckResult(
                include_orig=False, include_tar=True),
            }),
        ('epoch native-version', {
            'epoch': "3",
            'upstream_version': "1.2",
            'expected_result': SourceCheckResult(
                include_orig=False, include_tar=True),
            }),
        ('no-epoch debian-release', {
            'upstream_version': "1.2",
            'release': "5",
            'expected_result': SourceCheckResult(
                include_orig=False, include_tar=True),
            }),
        ('epoch debian-release', {
            'epoch': "3",
            'upstream_version': "1.2",
            'release': "5",
            'expected_result': SourceCheckResult(
                include_orig=False, include_tar=True),
            }),
        ('no-epoch new-upstream-version', {
            'upstream_version': "1.2",
            'release': "1",
            'expected_result': SourceCheckResult(
                include_orig=True, include_tar=False),
            }),
        ('epoch new_upstream-version', {
            'epoch': "3",
            'upstream_version': "1.2",
            'release': "1",
            'expected_result': SourceCheckResult(
                include_orig=True, include_tar=False),
            }),
        ('no-epoch nmu', {
            'upstream_version': "1.2",
            'release': "4.5",
            'expected_result': SourceCheckResult(
                include_orig=False, include_tar=True),
            }),
        ('epoch nmu', {
            'epoch': "3",
            'upstream_version': "1.2",
            'release': "4.5",
            'expected_result': SourceCheckResult(
                include_orig=False, include_tar=True),
            }),
        ('no-epoch nmu before-first-release', {
            'upstream_version': "1.2",
            'release': "0.1",
            'expected_result': SourceCheckResult(
                include_orig=True, include_tar=False),
            }),
        ('epoch nmu before-first-release', {
            'epoch': "3",
            'upstream_version': "1.2",
            'release': "0.1",
            'expected_result': SourceCheckResult(
                include_orig=True, include_tar=False),
            }),
        ('no-epoch nmu after-first-release', {
            'upstream_version': "1.2",
            'release': "1.1",
            'expected_result': SourceCheckResult(
                include_orig=True, include_tar=False),
            }),
        ('epoch nmu after-first-release', {
            'epoch': "3",
            'upstream_version': "1.2",
            'release': "1.1",
            'expected_result': SourceCheckResult(
                include_orig=True, include_tar=False),
            }),
        ]

    # Derive the ‘version’ field and the changes document for each
    # scenario. A scenario with no upstream version gets a document
    # with no fields, and no `version` item.
    for (scenario_name, scenario) in scenarios:
        fields = {}
        if 'upstream_version' in scenario:
            version_string = scenario['upstream_version']
            if 'epoch' in scenario:
                version_string = "{epoch}:{version}".format(
                    epoch=scenario['epoch'], version=version_string)
            if 'release' in scenario:
                version_string = "{version}-{release}".format(
                    version=version_string, release=scenario['release'])
            fields.update({'version': version_string})
            scenario['version'] = version_string
        scenario['changes_document'] = make_changes_document(fields)
    # Remove the loop's working names so they do not remain as class
    # attributes.
    del scenario_name, scenario
    del fields, version_string

    def setUp(self):
        """ Set up test fixtures. """
        super(source_check_TestCase, self).setUp()
        patch_system_interfaces(self)

        self.test_args = {
            'changes': self.changes_document,
            'debug': False,
            }

    def test_returns_expected_result_for_changes_document(self):
        """ Should return expected result for specified changes document. """
        result = dput.dput.source_check(**self.test_args)
        self.assertEqual(self.expected_result, result)

    def test_emits_version_string_debug_message_only_if_version(self):
        """ Should emit message for version only if has version. """
        self.test_args['debug'] = True
        version = getattr(self, 'version', None)
        message_lead = "D: Package Version:"
        expected_output = textwrap.dedent("""\
            {lead} {version}
            """).format(
                lead=message_lead, version=version)
        dput.dput.source_check(**self.test_args)
        if hasattr(self, 'version'):
            self.assertIn(expected_output, sys.stdout.getvalue())
        else:
            self.assertNotIn(message_lead, sys.stdout.getvalue())

    def test_emits_epoch_debug_message_only_if_epoch(self):
        """ Should emit message for epoch only if has an epoch. """
        self.test_args['debug'] = True
        expected_output = textwrap.dedent("""\
            D: Epoch found
            """)
        # One call is enough to produce the output under test.
        dput.dput.source_check(**self.test_args)
        # The message is expected only for scenarios that have both
        # an epoch and a release.
        if (hasattr(self, 'epoch') and hasattr(self, 'release')):
            self.assertIn(expected_output, sys.stdout.getvalue())
        else:
            self.assertNotIn(expected_output, sys.stdout.getvalue())

    def test_emits_upstream_version_debug_message_only_if_nonnative(self):
        """ Should emit message for upstream version only if non-native. """
        self.test_args['debug'] = True
        upstream_version = getattr(self, 'upstream_version', None)
        message_lead = "D: Upstream Version:"
        expected_output = textwrap.dedent("""\
            {lead} {version}
            """).format(
                lead=message_lead, version=upstream_version)
        dput.dput.source_check(**self.test_args)
        # A scenario counts as non-native when it has a release.
        if hasattr(self, 'release'):
            self.assertIn(expected_output, sys.stdout.getvalue())
        else:
            self.assertNotIn(message_lead, sys.stdout.getvalue())

    def test_emits_debian_release_debug_message_only_if_nonnative(self):
        """ Should emit message for Debian release only if non-native. """
        self.test_args['debug'] = True
        debian_release = getattr(self, 'release', None)
        message_lead = "D: Debian Version:"
        expected_output = textwrap.dedent("""\
            {lead} {version}
            """).format(
                lead=message_lead, version=debian_release)
        dput.dput.source_check(**self.test_args)
        # A scenario counts as non-native when it has a release.
        if hasattr(self, 'release'):
            self.assertIn(expected_output, sys.stdout.getvalue())
        else:
            self.assertNotIn(message_lead, sys.stdout.getvalue())
627 class verify_files_TestCase(
628 testscenarios.WithScenarios,
629 testtools.TestCase):
630 """ Test cases for `verify_files` function. """
632 default_args = dict(
633 host="foo",
634 check_only=None,
635 check_version=None,
636 unsigned_upload=None,
637 debug=None,
640 scenarios = [
641 ('default', {}),
642 ('binary-only', {
643 'check_upload_variant_return_value': False,
645 ('include foo.tar.gz', {
646 'additional_file_suffixes': [".tar.gz"],
647 'source_check_result': SourceCheckResult(
648 include_orig=False, include_tar=True),
650 ('include foo.orig.tar.gz', {
651 'additional_file_suffixes': [".orig.tar.gz"],
652 'source_check_result': SourceCheckResult(
653 include_orig=True, include_tar=False),
655 ('unexpected foo.tar.gz', {
656 'additional_file_suffixes': [".tar.gz"],
657 'expected_rejection_message': (
658 "Package includes a .tar.gz file although"),
660 ('unexpected foo.orig.tar.gz', {
661 'additional_file_suffixes': [".orig.tar.gz"],
662 'expected_rejection_message': (
663 "Package includes an .orig.tar.gz file although"),
665 ('no distribution', {
666 'test_distribution': None,
def setUp(self):
    """ Set up test fixtures.

        The steps are order-dependent: the configuration must exist
        before the test args are set, the changes file double before
        the upload file fixtures, and the upload fixtures before the
        functions of `dput.dput` are patched.

        """
    super(verify_files_TestCase, self).setUp()
    patch_system_interfaces(self)

    self.file_double_by_path = {}
    set_config(self, 'exist-simple')
    patch_runtime_config_options(self)

    self.set_test_args()

    setup_changes_file_fixtures(self)
    set_changes_file_scenario(self, 'exist-minimal')
    # The function under test takes the changes file location as a
    # separate directory path and file name.
    changes_file_path = self.changes_file_double.path
    self.test_args.update(dict(
        path=os.path.dirname(changes_file_path),
        filename=os.path.basename(changes_file_path),
        ))

    patch_os_stat(self)

    setup_upload_file_fixtures(self)
    self.set_expected_files_to_upload()

    self.patch_checksum_test()
    self.patch_parse_changes()
    self.patch_check_upload_variant()
    self.set_expected_binary_upload()
    self.set_expected_source_control_file_path()
    self.patch_version_check()
    self.patch_verify_signature()
    self.patch_source_check()
def set_expected_files_to_upload(self):
    """ Set the expected `files_to_upload` result for this test case.

        The expected value is the set of all fake upload file paths,
        plus the path of the changes file double.

        """
    expected_paths = set(self.fake_upload_file_paths)
    expected_paths.add(self.changes_file_double.path)
    self.expected_files_to_upload = expected_paths
def patch_checksum_test(self):
    """ Patch `checksum_test` function for this test case.

        The mock returns the fake checksum recorded for the base name
        of the requested path. The lookup happens at call time, so a
        test can replace `self.fake_checksum_by_file` afterward.

        """
    patcher = mock.patch.object(
        dput.dput, "checksum_test", autospec=True)
    mock_checksum_test = patcher.start()
    self.addCleanup(patcher.stop)

    def fake_checksum_test(path, hash_name):
        file_name = os.path.basename(path)
        return self.fake_checksum_by_file[file_name]

    mock_checksum_test.side_effect = fake_checksum_test
def set_changes_document(self):
    """ Set the changes document for this test case.

        Builds the document from `self.upload_params_by_name`. Unless
        the scenario sets `test_distribution` to ``None``, the
        distribution (default ‘lorem’) is added to the document and
        set as ‘allowed_distributions’ for the host in the config.
        The document becomes the return value of the patched
        `parse_changes`.

        """
    document = make_changes_document(
        fields={},
        upload_params_by_name=self.upload_params_by_name)
    self.changes_document = document
    self.test_distribution = getattr(self, 'test_distribution', "lorem")
    distribution = self.test_distribution
    if distribution is not None:
        document.add_header('distribution', distribution)
        self.runtime_config_parser.set(
            self.test_args['host'], 'allowed_distributions',
            distribution)

    dput.dput.parse_changes.return_value = document
def set_upload_params(self):
    """ Set the upload parameters for this test case.

        Sets `self.upload_params_by_name` from the current fake
        checksums and fake sizes.

        """
    checksums = self.fake_checksum_by_file
    sizes = self.fake_size_by_file
    self.upload_params_by_name = make_upload_files_params(checksums, sizes)
def patch_parse_changes(self):
    """ Patch `parse_changes` function for this test case.

        After patching, the upload parameters and the changes document
        are set, so the mock returns that document.

        """
    patcher = mock.patch.object(
        dput.dput, "parse_changes", autospec=True)
    patcher.start()
    self.addCleanup(patcher.stop)

    # The patch must be in place first: `set_changes_document` sets
    # the return value on `dput.dput.parse_changes`.
    self.set_upload_params()
    self.set_changes_document()
def patch_check_upload_variant(self):
    """ Patch `check_upload_variant` function for this test case.

        The mock returns `self.check_upload_variant_return_value`,
        which is set to ``True`` when the scenario does not specify it.

        """
    self.check_upload_variant_return_value = getattr(
        self, 'check_upload_variant_return_value', True)

    patcher = mock.patch.object(
        dput.dput, "check_upload_variant", autospec=True,
        return_value=self.check_upload_variant_return_value)
    patcher.start()
    self.addCleanup(patcher.stop)
def patch_version_check(self):
    """ Patch `version_check` function for this test case.

        The patch is stopped during test case cleanup.

        """
    patcher = mock.patch.object(
        dput.dput, "version_check", autospec=True)
    patcher.start()
    self.addCleanup(patcher.stop)
def patch_verify_signature(self):
    """ Patch `verify_signature` function for this test case.

        The patch is stopped during test case cleanup.

        """
    patcher = mock.patch.object(
        dput.dput, "verify_signature", autospec=True)
    patcher.start()
    self.addCleanup(patcher.stop)
def patch_source_check(self):
    """ Patch `source_check` function for this test case.

        The mock returns the scenario's `source_check_result` if it
        has one; otherwise a result with both flags false.

        """
    patcher = mock.patch.object(
        dput.dput, "source_check", autospec=True)
    mock_source_check = patcher.start()
    self.addCleanup(patcher.stop)

    default_result = SourceCheckResult(
        include_orig=False, include_tar=False)
    mock_source_check.return_value = getattr(
        self, 'source_check_result', default_result)
def set_test_args(self):
    """ Set test args for this test case.

        Starts from a copy of `default_args`, adds the runtime config
        parser as ‘config’, then applies the scenario's `extra_args`
        (if any) on top.

        """
    extra_args = getattr(self, 'extra_args', {})
    test_args = self.default_args.copy()
    test_args['config'] = self.runtime_config_parser
    test_args.update(extra_args)
    self.test_args = test_args
def set_expected_binary_upload(self):
    """ Set expected value for `binary_upload` flag.

        The expected flag is whatever the patched
        `check_upload_variant` is set to return.

        """
    variant_result = self.check_upload_variant_return_value
    self.expected_binary_upload = variant_result
def set_expected_source_control_file_path(self):
    """ Set expected value for source control file path.

        For a binary upload the expected path is the empty string.
        Otherwise it is the first ‘*.dsc’ upload file's name, in the
        directory of the changes file.

        """
    # Looked up unconditionally, as a ‘*.dsc’ path is always expected
    # among the fake upload files.
    dsc_file_name = next(
        os.path.basename(file_path)
        for file_path in self.fake_upload_file_paths
        if file_path.endswith(".dsc"))
    if self.expected_binary_upload:
        expected_path = ""
    else:
        changes_dir_path = os.path.dirname(self.changes_file_double.path)
        expected_path = os.path.join(changes_dir_path, dsc_file_name)
    self.expected_source_control_file_path = expected_path
def test_emits_changes_file_path_debug_message(self):
    """ Should emit debug message for changes file path. """
    changes_file_path = self.changes_file_double.path
    expected_output = textwrap.dedent("""\
        D: Validating contents of changes file {path}
        """).format(path=changes_file_path)
    self.test_args['debug'] = True
    dput.dput.verify_files(**self.test_args)
    self.assertIn(expected_output, sys.stdout.getvalue())
def test_calls_sys_exit_if_input_read_denied(self):
    """ Should call `sys.exit` if input file read access is denied. """
    # Switch to the scenario whose file double denies read access.
    set_changes_file_scenario(self, 'error-read-denied')
    with testtools.ExpectedException(FakeSystemExit):
        dput.dput.verify_files(**self.test_args)
    denied_path = self.changes_file_double.path
    expected_output = textwrap.dedent("""\
        Can't open {path}
        """).format(path=denied_path)
    self.assertIn(expected_output, sys.stdout.getvalue())
    sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
def test_calls_parse_changes_with_changes_files(self):
    """ Should call `parse_changes` with changes file. """
    dput.dput.verify_files(**self.test_args)
    # The function should pass the opened changes file, which is the
    # fake file of the changes file double.
    expected_infile = self.changes_file_double.fake_file
    dput.dput.parse_changes.assert_called_with(expected_infile)
def test_calls_check_upload_variant_with_changes_document(self):
    """ Should call `check_upload_variant` with changes document. """
    dput.dput.verify_files(**self.test_args)
    # The second argument is not under test here, so any value
    # matches.
    expected_args = (self.changes_document, mock.ANY)
    dput.dput.check_upload_variant.assert_called_with(*expected_args)
def test_emits_upload_dsc_file_debug_message(self):
    """ Should emit debug message for ‘*.dsc’ file. """
    if getattr(self, 'check_upload_variant_return_value', True):
        self.skipTest("Binary package upload for this scenario")
    self.test_args['debug'] = True
    dput.dput.verify_files(**self.test_args)
    # The message is expected to show the file's base name only.
    dsc_file_name = next(
        os.path.basename(file_path)
        for file_path in self.fake_upload_file_paths
        if file_path.endswith(".dsc"))
    expected_output = textwrap.dedent("""\
        D: dsc-File: {path}
        """).format(path=dsc_file_name)
    emitted_output = sys.stdout.getvalue()
    self.assertThat(
        emitted_output,
        testtools.matchers.Contains(expected_output))
def test_calls_sys_exit_when_source_upload_omits_dsc_file(self):
    """ Should call `sys.exit` when source upload omits ‘*.dsc’ file. """
    if getattr(self, 'check_upload_variant_return_value', True):
        self.skipTest("Binary package upload for this scenario")
    # Drop the ‘*.dsc’ entry, then rebuild the upload parameters and
    # the changes document so that they omit the file too.
    self.fake_checksum_by_file = {
        file_name: checksum
        for (file_name, checksum)
        in self.fake_checksum_by_file.items()
        if not file_name.endswith(".dsc")}
    self.set_upload_params()
    self.set_changes_document()
    with testtools.ExpectedException(FakeSystemExit):
        dput.dput.verify_files(**self.test_args)
    expected_output = textwrap.dedent("""\
        Error: no dsc file found in sourceful upload
        """)
    emitted_output = sys.stderr.getvalue()
    self.assertThat(
        emitted_output,
        testtools.matchers.Contains(expected_output))
    sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
def test_calls_version_check_when_specified_in_config(self):
    """ Should call `version_check` when specified in config. """
    host = self.test_args['host']
    self.runtime_config_parser.set(host, 'check_version', "true")
    dput.dput.verify_files(**self.test_args)
    changes_dir_path = os.path.dirname(self.changes_file_double.path)
    dput.dput.version_check.assert_called_with(
        changes_dir_path,
        self.changes_document,
        self.test_args['debug'])
def test_calls_version_check_when_specified_in_args(self):
    """ Should call `version_check` when specified in arguments. """
    self.test_args['check_version'] = True
    dput.dput.verify_files(**self.test_args)
    changes_dir_path = os.path.dirname(self.changes_file_double.path)
    dput.dput.version_check.assert_called_with(
        changes_dir_path,
        self.changes_document,
        self.test_args['debug'])
def test_calls_sys_exit_when_host_section_not_in_config(self):
    """ Should call `sys.exit` when specified host not in config. """
    host = self.test_args['host']
    self.runtime_config_parser.remove_section(host)
    with testtools.ExpectedException(FakeSystemExit):
        dput.dput.verify_files(**self.test_args)
    # The ellipsis stands for the rest of the ‘No section’ line.
    expected_output = textwrap.dedent("""\
        Error in config file:
        No section: ...
        """)
    output_matcher = testtools.matchers.DocTestMatches(
        expected_output, doctest.ELLIPSIS)
    self.assertThat(sys.stderr.getvalue(), output_matcher)
    sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
def test_calls_verify_signature_with_expected_args(self):
    """ Should call `verify_signature` with expected args. """
    dput.dput.verify_files(**self.test_args)
    # The seventh argument is not under test here, so any value
    # matches.
    expected_args = (
        self.test_args['host'],
        self.changes_file_double.path,
        self.expected_source_control_file_path,
        self.runtime_config_parser,
        self.test_args['check_only'],
        self.test_args['unsigned_upload'],
        mock.ANY,
        self.test_args['debug'],
    )
    dput.dput.verify_signature.assert_called_with(*expected_args)
def test_calls_source_check_with_changes_document(self):
    """ Should call `source_check` with changes document. """
    dput.dput.verify_files(**self.test_args)
    expected_args = (self.changes_document, self.test_args['debug'])
    dput.dput.source_check.assert_called_with(*expected_args)
def test_emits_upload_file_path_debug_message(self):
    """ Should emit debug message for each upload file path. """
    self.test_args['debug'] = True
    dput.dput.verify_files(**self.test_args)
    message_template = textwrap.dedent("""\
        D: File to upload: {path}
        """)
    emitted_output = sys.stdout.getvalue()
    # `expectThat` records a failure and continues, so every file
    # path is checked.
    for file_path in self.fake_upload_file_paths:
        expected_output = message_template.format(path=file_path)
        self.expectThat(
            emitted_output,
            testtools.matchers.Contains(expected_output))
def test_calls_checksum_test_with_upload_files(self):
    """ Should call `checksum_test` with each upload file path. """
    dput.dput.verify_files(**self.test_args)
    # One call is expected per upload file, with any second argument
    # and in any order.
    expected_calls = []
    for file_path in self.fake_upload_file_paths:
        expected_calls.append(mock.call(file_path, mock.ANY))
    dput.dput.checksum_test.assert_has_calls(
        expected_calls, any_order=True)
def set_bogus_file_checksums(self):
    """ Set bogus file checksums that will not match.

        Rebinds `self.fake_checksum_by_file` to a new mapping with the
        same file names, each mapped to a value from
        `self.getUniqueString`.

        """
    # Build a new mapping; do not mutate the existing one, because a
    # caller may still hold a reference to the original checksums.
    bogus_checksum_by_file = {}
    for file_name in self.fake_checksum_by_file:
        bogus_checksum_by_file[file_name] = self.getUniqueString()
    self.fake_checksum_by_file = bogus_checksum_by_file
def test_emits_checksum_okay_debug_message(self):
    """ Should emit debug message checksum okay for each file. """
    self.test_args['debug'] = True
    dput.dput.verify_files(**self.test_args)
    message_template = textwrap.dedent("""\
        D: Checksum for {path} is fine
        """)
    emitted_output = sys.stdout.getvalue()
    # `expectThat` records a failure and continues, so every file
    # path is checked.
    for file_path in self.fake_upload_file_paths:
        expected_output = message_template.format(path=file_path)
        self.expectThat(
            emitted_output,
            testtools.matchers.Contains(expected_output))
def test_emits_checksum_mismatch_debug_message(self):
    """ Should show both checksums in debug output on a mismatch. """
    self.test_args['debug'] = True
    self.set_bogus_file_checksums()
    with testtools.ExpectedException(FakeSystemExit):
        dput.dput.verify_files(**self.test_args)
    # Under `doctest.ELLIPSIS` each `...` stands for arbitrary text, so
    # only the two debug lines are pinned, not the checksum values.
    expected_output = textwrap.dedent("""\
            ...
            D: Checksum from .changes: ...
            D: Generated Checksum: ...
            ...
            """)
    output_matcher = testtools.matchers.DocTestMatches(
            expected_output, doctest.ELLIPSIS)
    self.assertThat(sys.stdout.getvalue(), output_matcher)
def test_calls_sys_exit_when_checksum_mismatch(self):
    """ Should call `sys.exit` when a checksum does not match.

        The mismatch message for at least one of the upload files must
        appear on standard output, and `sys.exit` must be called with
        the failure exit status.

        """
    # Capture the file names before the checksum mapping is replaced.
    upload_file_names = list(self.fake_checksum_by_file)
    self.set_bogus_file_checksums()
    with testtools.ExpectedException(FakeSystemExit):
        dput.dput.verify_files(**self.test_args)

    # The message template names only the file path, so the path is the
    # only value to format in.
    expected_output_for_files = [
            textwrap.dedent("""\
                Checksum doesn't match for {file_name}
                """).format(
                    file_name=os.path.join(
                        os.path.dirname(self.changes_file_double.path),
                        file_name))
            for file_name in upload_file_names]
    # `MatchesAny`: a message for any one of the files is sufficient.
    self.assertThat(
            sys.stdout.getvalue(),
            testtools.matchers.MatchesAny(*[
                testtools.matchers.Contains(expected_output)
                for expected_output in expected_output_for_files]))
    sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
def test_calls_os_stat_with_upload_files(self):
    """ Should call `os.stat` for every upload file path. """
    dput.dput.verify_files(**self.test_args)
    expected_calls = []
    for upload_path in self.fake_upload_file_paths:
        expected_calls.append(mock.call(upload_path))
    # The order in which the files are examined is not significant.
    os.stat.assert_has_calls(expected_calls, any_order=True)
def set_bogus_file_sizes(self):
    """ Assign a freshly generated size to every fake upload file.

        The new size is stored in `fake_size_by_file` and also written
        into the `st_size` of the matching file double's stat result.

        """
    registry = FileDouble.get_registry_for_testcase(self)
    for file_name in self.fake_size_by_file:
        fake_size = self.getUniqueInteger()
        self.fake_size_by_file[file_name] = fake_size
        # File doubles are looked up by path beside the changes file.
        upload_dir = os.path.dirname(self.changes_file_double.path)
        double = registry[os.path.join(upload_dir, file_name)]
        double.stat_result = double.stat_result._replace(
                st_size=fake_size)
def test_emits_size_mismatch_debug_message(self):
    """ Should show both sizes in debug output on a size mismatch. """
    self.test_args['debug'] = True
    self.set_bogus_file_sizes()
    dput.dput.verify_files(**self.test_args)
    # Under `doctest.ELLIPSIS` each `...` stands for arbitrary text, so
    # only the two debug lines are pinned, not the size values.
    expected_output = textwrap.dedent("""\
            ...
            D: size from .changes: ...
            D: calculated size: ...
            ...
            """)
    output_matcher = testtools.matchers.DocTestMatches(
            expected_output, doctest.ELLIPSIS)
    self.assertThat(sys.stdout.getvalue(), output_matcher)
def test_emits_size_mismatch_message_for_each_file(self):
    """ Should report a size mismatch for every affected file. """
    self.set_bogus_file_sizes()
    message_template = textwrap.dedent("""\
            size doesn't match for {path}
            """)
    dput.dput.verify_files(**self.test_args)
    # Check every path; `expectThat` defers failure to the test's end.
    for upload_path in self.fake_upload_file_paths:
        expected_message = message_template.format(path=upload_path)
        self.expectThat(
                sys.stdout.getvalue(),
                testtools.matchers.Contains(expected_message))
def test_emits_rejection_warning_when_unexpected_tarball(self):
    """ Should emit warning of rejection when unexpected tarball.

        Skipped for scenarios that do not define an
        `expected_rejection_message` attribute.

        """
    if not hasattr(self, 'expected_rejection_message'):
        self.skipTest("No rejection message expected")
    dput.dput.verify_files(**self.test_args)
    # The warning is expected on standard output, not standard error.
    self.assertThat(
            sys.stdout.getvalue(),
            testtools.matchers.Contains(self.expected_rejection_message))
def test_raises_error_when_distribution_mismatch(self):
    """ Should raise a fatal error for a distribution not allowed. """
    if not getattr(self, 'test_distribution', None):
        self.skipTest("No distribution set for this test case")
    # Restrict the host to a pattern the test distribution should not
    # satisfy.
    self.runtime_config_parser.set(
            self.test_args['host'],
            'allowed_distributions',
            "dolor sit amet")
    expected_error = dputhelper.DputUploadFatalException
    with testtools.ExpectedException(expected_error):
        dput.dput.verify_files(**self.test_args)
def test_emits_changes_file_upload_debug_message(self):
    """ Should report the changes file itself as a file to upload. """
    self.test_args['debug'] = True
    dput.dput.verify_files(**self.test_args)
    changes_path = self.changes_file_double.path
    expected_output = textwrap.dedent("""\
            D: File to upload: {path}
            """).format(path=changes_path)
    emitted_output = sys.stdout.getvalue()
    self.assertIn(expected_output, emitted_output)
def test_returns_expected_files_to_upload_collection(self):
    """ Should return the expected collection of files to upload. """
    returned_files = dput.dput.verify_files(**self.test_args)
    # Compared as a set: order and duplicates in the result are ignored.
    self.assertEqual(self.expected_files_to_upload, set(returned_files))
class guess_upload_host_TestCase(
        testscenarios.WithScenarios,
        testtools.TestCase):
    """ Test cases for the `guess_upload_host` function.

        Each scenario names a fake changes file and a configuration
        scenario, and states the host expected from the function.

        """

    # Fake changes files, selected by name from the test scenarios.
    changes_file_scenarios = [
            ('no-distribution', {
                'fake_file': StringIO(textwrap.dedent("""\
                    Files:
                        Lorem ipsum dolor sit amet
                    """)),
                }),
            ('distribution-spam', {
                'fake_file': StringIO(textwrap.dedent("""\
                    Distribution: spam
                    Files:
                        Lorem ipsum dolor sit amet
                    """)),
                }),
            ('distribution-beans', {
                'fake_file': StringIO(textwrap.dedent("""\
                    Distribution: beans
                    Files:
                        Lorem ipsum dolor sit amet
                    """)),
                }),
            ]

    scenarios = [
            ('distribution-found-of-one', {
                'changes_file_scenario_name': "distribution-spam",
                'test_distribution': "spam",
                'config_scenario_name': "exist-distribution-one",
                'expected_host': "foo",
                }),
            ('distribution-notfound-of-one', {
                'changes_file_scenario_name': "distribution-beans",
                'test_distribution': "beans",
                'config_scenario_name': "exist-distribution-one",
                'expected_host': "ftp-master",
                }),
            ('distribution-first-of-three', {
                'changes_file_scenario_name': "distribution-spam",
                'test_distribution': "spam",
                'config_scenario_name': "exist-distribution-three",
                'expected_host': "foo",
                }),
            ('distribution-last-of-three', {
                'changes_file_scenario_name': "distribution-beans",
                'test_distribution': "beans",
                'config_scenario_name': "exist-distribution-three",
                'expected_host': "foo",
                }),
            ('no-configured-distribution', {
                'changes_file_scenario_name': "distribution-beans",
                'config_scenario_name': "exist-distribution-none",
                'expected_host': "ftp-master",
                }),
            ('no-distribution', {
                'changes_file_scenario_name': "no-distribution",
                'config_scenario_name': "exist-simple",
                'expected_host': "ftp-master",
                }),
            ('default-distribution', {
                'config_scenario_name': "exist-default-distribution-only",
                'config_default_default_host_main': "consecteur",
                'expected_host': "consecteur",
                }),
            ]

    def setUp(self):
        """ Prepare the configuration, changes file, and call arguments. """
        super(guess_upload_host_TestCase, self).setUp()
        patch_system_interfaces(self)

        # Scenarios that name no configuration use the minimal one.
        config_scenario_name = getattr(
                self, 'config_scenario_name', 'exist-minimal')
        set_config(self, config_scenario_name)
        patch_runtime_config_options(self)

        self.setup_changes_file_fixtures()
        # Scenarios that name no changes file use the one without a
        # ‘Distribution’ field.
        changes_scenario_name = getattr(
                self, 'changes_file_scenario_name', 'no-distribution')
        set_changes_file_scenario(self, changes_scenario_name)

        self.set_test_args()

    def set_test_args(self):
        """ Build the keyword arguments for calling the function. """
        changes_path = self.changes_file_double.path
        self.test_args = dict(
                path=os.path.dirname(changes_path),
                filename=os.path.basename(changes_path),
                config=self.runtime_config_parser,
                )

    def setup_changes_file_fixtures(self):
        """ Create a file double for each fake changes file scenario. """
        changes_path = make_changes_file_path()

        # All the fake changes files share the one filesystem path.
        fixture_scenarios = []
        for (__, scenario) in self.changes_file_scenarios:
            scenario['file_double'] = FileDouble(
                    changes_path, scenario['fake_file'])
            fixture_scenarios.append(scenario)
        file_doubles = get_file_doubles_from_fake_file_scenarios(
                fixture_scenarios)
        setup_file_double_behaviour(self, file_doubles)

    def test_calls_sys_exit_if_read_denied(self):
        """ Should call `sys.exit` when the changes file is unreadable. """
        changes_file_double = self.changes_file_double
        changes_file_double.set_os_access_scenario('denied')
        changes_file_double.set_open_scenario('read_denied')
        with testtools.ExpectedException(FakeSystemExit):
            dput.dput.guess_upload_host(**self.test_args)
        expected_output = textwrap.dedent("""\
                Can't open {path}
                """).format(path=self.changes_file_double.path)
        emitted_output = sys.stdout.getvalue()
        self.assertIn(expected_output, emitted_output)
        sys.exit.assert_called_with(EXIT_STATUS_FAILURE)

    def test_returns_expected_host(self):
        """ Should return the host expected for this scenario. """
        guessed_host = dput.dput.guess_upload_host(**self.test_args)
        self.assertEqual(self.expected_host, guessed_host)

    @mock.patch.object(dput.dput, 'debug', True)
    def test_emits_debug_message_for_host(self):
        """ Should emit a debug message naming the guessed host. """
        parser = self.runtime_config_parser
        host_name = self.expected_host
        # Skip unless the expected host has a non-empty ‘distributions’
        # setting in the configuration.
        has_distributions = (
                parser.has_section(host_name)
                and parser.get(host_name, 'distributions'))
        if not has_distributions:
            self.skipTest("No distributions specified")
        dput.dput.guess_upload_host(**self.test_args)
        expected_output = textwrap.dedent("""\
                D: guessing host {host} based on distribution {dist}
                """).format(
                    host=self.expected_host, dist=self.test_distribution)
        emitted_output = sys.stdout.getvalue()
        self.assertIn(expected_output, emitted_output)
1239 # Local variables:
1240 # coding: utf-8
1241 # mode: python
1242 # End:
1243 # vim: fileencoding=utf-8 filetype=python :