Implemented dmarc_addresses feature.
[mailman.git] / src / mailman / utilities / tests / test_import.py
blob6e0594049afdb7bceef3f152ac91dc4012fbb67f
1 # Copyright (C) 2010-2023 by the Free Software Foundation, Inc.
3 # This file is part of GNU Mailman.
5 # GNU Mailman is free software: you can redistribute it and/or modify it under
6 # the terms of the GNU General Public License as published by the Free
7 # Software Foundation, either version 3 of the License, or (at your option)
8 # any later version.
10 # GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 # more details.
15 # You should have received a copy of the GNU General Public License along with
16 # GNU Mailman. If not, see <https://www.gnu.org/licenses/>.
18 """Tests for config.pck imports."""
20 import os
21 import unittest
23 from contextlib import ExitStack, redirect_stderr
24 from datetime import datetime, timedelta
25 from enum import Enum
26 from importlib_resources import open_binary
27 from io import StringIO
28 from mailman.app.lifecycle import create_list
29 from mailman.config import config
30 from mailman.database.helpers import is_mysql
31 from mailman.handlers.decorate import decorate
32 from mailman.interfaces.action import Action, FilterAction
33 from mailman.interfaces.address import InvalidEmailAddressError
34 from mailman.interfaces.archiver import ArchivePolicy
35 from mailman.interfaces.autorespond import ResponseAction
36 from mailman.interfaces.bans import IBanManager
37 from mailman.interfaces.bounce import UnrecognizedBounceDisposition
38 from mailman.interfaces.domain import IDomainManager
39 from mailman.interfaces.languages import ILanguageManager
40 from mailman.interfaces.mailinglist import (
41 DMARCMitigateAction,
42 IAcceptableAliasSet,
43 SubscriptionPolicy,
45 from mailman.interfaces.member import DeliveryMode, DeliveryStatus
46 from mailman.interfaces.nntp import NewsgroupModeration
47 from mailman.interfaces.template import ITemplateLoader, ITemplateManager
48 from mailman.interfaces.usermanager import IUserManager
49 from mailman.model.roster import RosterVisibility
50 from mailman.testing.helpers import LogFileMark
51 from mailman.testing.layers import ConfigLayer
52 from mailman.utilities.filesystem import makedirs
53 from mailman.utilities.i18n import search
54 from mailman.utilities.importer import (
55 check_language_code,
56 Import21Error,
57 import_config_pck,
59 from pickle import load
60 from shutil import rmtree
61 from unittest import mock
62 from urllib.error import URLError
63 from zope.component import getUtility
66 NL = '\n'
69 class DummyEnum(Enum):
70 # For testing purposes
71 val = 42
74 def list_to_string(data):
75 return NL.join(data).encode('utf-8')
78 class TestBasicImport(unittest.TestCase):
79 layer = ConfigLayer
80 maxDiff = None
82 def setUp(self):
83 self._mlist = create_list('blank@example.com')
84 self._pckdict = load(open_binary('mailman.testing', 'config.pck'))
86 def _import(self):
87 import_config_pck(self._mlist, self._pckdict)
89 def test_display_name(self):
90 # The mlist.display_name gets set from the old list's real_name.
91 self.assertEqual(self._mlist.display_name, 'Blank')
92 self._import()
93 self.assertEqual(self._mlist.display_name, 'Test')
95 def test_mail_host_invariant(self):
96 # The mlist.mail_host must not be updated when importing (it will
97 # change the list_id property, which is supposed to be read-only).
98 self.assertEqual(self._mlist.mail_host, 'example.com')
99 self._import()
100 self.assertEqual(self._mlist.mail_host, 'example.com')
102 def test_rfc2369_headers(self):
103 self._mlist.allow_list_posts = False
104 self._mlist.include_rfc2369_headers = False
105 self._import()
106 self.assertTrue(self._mlist.allow_list_posts)
107 self.assertTrue(self._mlist.include_rfc2369_headers)
109 def test_no_overwrite_rosters(self):
110 # The mlist.members and mlist.digest_members rosters must not be
111 # overwritten.
112 for rname in ('members', 'digest_members'):
113 roster = getattr(self._mlist, rname)
114 self.assertFalse(isinstance(roster, dict))
115 # Suppress warning messages in test output.
116 with mock.patch('sys.stderr'):
117 self._import()
118 self.assertFalse(
119 isinstance(roster, dict),
120 'The %s roster has been overwritten by the import' % rname)
122 def test_last_post_time(self):
123 # last_post_time -> last_post_at
124 self._pckdict['last_post_time'] = 1270420800.274485
125 self.assertEqual(self._mlist.last_post_at, None)
126 self._import()
127 # convert 1270420800.2744851 to datetime
128 expected = datetime(2010, 4, 4, 22, 40, 0, 274485)
129 self.assertEqual(self._mlist.last_post_at, expected)
131 def test_autoresponse_grace_period(self):
132 # autoresponse_graceperiod -> autoresponse_grace_period
133 # must be a timedelta, not an int
134 self._mlist.autoresponse_grace_period = timedelta(days=42)
135 self._import()
136 self.assertTrue(
137 isinstance(self._mlist.autoresponse_grace_period, timedelta))
138 self.assertEqual(self._mlist.autoresponse_grace_period,
139 timedelta(days=90))
141 def test_autoresponse_admin_to_owner(self):
142 # admin -> owner
143 self._mlist.autorespond_owner = DummyEnum.val
144 self._mlist.autoresponse_owner_text = 'DUMMY'
145 self._import()
146 self.assertEqual(self._mlist.autorespond_owner, ResponseAction.none)
147 self.assertEqual(self._mlist.autoresponse_owner_text, '')
149 def test_autoresponse_owner_yes(self):
150 # Yes -> ResponseAction.respond_and_continue
151 self._mlist.autorespond_owner = DummyEnum.val
152 self._mlist.autoresponse_owner_text = 'DUMMY'
153 self._pckdict['autorespond_admin'] = 1
154 self._pckdict['autoresponse_admin_text'] = 'Autoresponse'
155 self._import()
156 self.assertEqual(self._mlist.autorespond_owner,
157 ResponseAction.respond_and_continue)
158 self.assertEqual(self._mlist.autoresponse_owner_text, 'Autoresponse')
160 def test_autoresponse_post_yes(self):
161 # Yes -> ResponseAction.respond_and_continue
162 self._mlist.autorespond_postings = DummyEnum.val
163 self._mlist.autoresponse_postings_text = 'DUMMY'
164 self._pckdict['autorespond_postings'] = 1
165 self._pckdict['autoresponse_postings_text'] = 'Autoresponse'
166 self._import()
167 self.assertEqual(self._mlist.autorespond_postings,
168 ResponseAction.respond_and_continue)
169 self.assertEqual(self._mlist.autoresponse_postings_text,
170 'Autoresponse')
172 def test_autoresponse_post_no(self):
173 # No -> ResponseAction.none
174 self._mlist.autorespond_postings = DummyEnum.val
175 self._mlist.autoresponse_postings_text = 'DUMMY'
176 self._pckdict['autorespond_postings'] = 0
177 self._import()
178 self.assertEqual(self._mlist.autorespond_postings,
179 ResponseAction.none)
180 self.assertEqual(self._mlist.autoresponse_postings_text, '')
182 def test_autoresponse_request_continue(self):
183 # Yes, w/forward -> ResponseAction.respond_and_continue
184 self._mlist.autorespond_requests = DummyEnum.val
185 self._mlist.autoresponse_request_text = 'DUMMY'
186 self._pckdict['autorespond_requests'] = 2
187 self._pckdict['autoresponse_request_text'] = 'Autoresponse'
188 self._import()
189 self.assertEqual(self._mlist.autorespond_requests,
190 ResponseAction.respond_and_continue)
191 self.assertEqual(self._mlist.autoresponse_request_text,
192 'Autoresponse')
194 def test_autoresponse_request_discard(self):
195 # Yes, w/discard -> ResponseAction.respond_and_discard
196 self._mlist.autorespond_requests = DummyEnum.val
197 self._mlist.autoresponse_request_text = 'DUMMY'
198 self._pckdict['autorespond_requests'] = 1
199 self._pckdict['autoresponse_request_text'] = 'Autoresponse'
200 self._import()
201 self.assertEqual(self._mlist.autorespond_requests,
202 ResponseAction.respond_and_discard)
203 self.assertEqual(self._mlist.autoresponse_request_text,
204 'Autoresponse')
206 def test_autoresponse_request_no(self):
207 # No -> ResponseAction.none
208 self._mlist.autorespond_requests = DummyEnum.val
209 self._mlist.autoresponse_request_text = 'DUMMY'
210 self._pckdict['autorespond_requests'] = 0
211 self._import()
212 self.assertEqual(self._mlist.autorespond_requests,
213 ResponseAction.none)
214 self.assertEqual(self._mlist.autoresponse_request_text, '')
216 def test_administrativia(self):
217 self._mlist.administrivia = None
218 self._import()
219 self.assertTrue(self._mlist.administrivia)
221 def test_filter_pass_renames(self):
222 # mime_types -> types
223 # filename_extensions -> extensions
224 self._mlist.filter_types = ['dummy']
225 self._mlist.pass_types = ['dummy']
226 self._mlist.filter_extensions = ['dummy']
227 self._mlist.pass_extensions = ['dummy']
228 self._import()
229 self.assertEqual(list(self._mlist.filter_types), [])
230 self.assertEqual(list(self._mlist.filter_extensions),
231 ['exe', 'bat', 'cmd', 'com', 'pif',
232 'scr', 'vbs', 'cpl'])
233 self.assertEqual(
234 list(self._mlist.pass_types),
235 ['multipart/mixed', 'multipart/alternative', 'text/plain'])
236 self.assertEqual(list(self._mlist.pass_extensions), [])
238 def test_process_bounces(self):
239 # bounce_processing -> process_bounces
240 self._mlist.process_bounces = None
241 self._import()
242 self.assertTrue(self._mlist.process_bounces)
244 def test_forward_unrecognized_bounces_to(self):
245 # bounce_unrecognized_goes_to_list_owner
246 # -> forward_unrecognized_bounces_to
247 self._mlist.forward_unrecognized_bounces_to = DummyEnum.val
248 self._import()
249 self.assertEqual(self._mlist.forward_unrecognized_bounces_to,
250 UnrecognizedBounceDisposition.administrators)
252 def test_moderator_password(self):
253 # mod_password -> moderator_password
254 self._mlist.moderator_password = b'TESTDATA'
255 self._import()
256 self.assertEqual(self._mlist.moderator_password, None)
258 def test_moderator_password_str(self):
259 # moderator_password must not be unicode
260 self._pckdict['mod_password'] = b'TESTVALUE'
261 self._import()
262 self.assertNotIsInstance(self._mlist.moderator_password, str)
263 self.assertEqual(self._mlist.moderator_password, b'TESTVALUE')
265 def test_newsgroup_moderation(self):
266 # news_moderation -> newsgroup_moderation
267 # news_prefix_subject_too -> nntp_prefix_subject_too
268 self._mlist.newsgroup_moderation = DummyEnum.val
269 self._mlist.nntp_prefix_subject_too = None
270 self._import()
271 self.assertEqual(self._mlist.newsgroup_moderation,
272 NewsgroupModeration.none)
273 self.assertTrue(self._mlist.nntp_prefix_subject_too)
275 def test_msg_to_message(self):
276 # send_welcome_msg -> send_welcome_message
277 # send_goodbye_msg -> send_goodbye_message
278 self._mlist.send_welcome_message = None
279 self._mlist.send_goodbye_message = None
280 self._import()
281 self.assertTrue(self._mlist.send_welcome_message)
282 self.assertTrue(self._mlist.send_goodbye_message)
284 def test_dmarc_zero_from_is_list(self):
285 self._mlist.dmarc_mitigate_action = DummyEnum.val
286 self._mlist.dmarc_mitigate_unconditionally = True
287 self._pckdict['from_is_list'] = 0
288 self._pckdict['dmarc_moderation_action'] = 1
289 self._import()
290 self.assertFalse(self._mlist.dmarc_mitigate_unconditionally)
291 self.assertEqual(self._mlist.dmarc_mitigate_action,
292 DMARCMitigateAction.munge_from)
294 def test_dmarc_zero_dmarc_moderation_action(self):
295 self._mlist.dmarc_mitigate_action = DummyEnum.val
296 self._mlist.dmarc_mitigate_unconditionally = False
297 self._pckdict['from_is_list'] = 1
298 self._pckdict['dmarc_moderation_action'] = 0
299 self._import()
300 self.assertTrue(self._mlist.dmarc_mitigate_unconditionally)
301 self.assertEqual(self._mlist.dmarc_mitigate_action,
302 DMARCMitigateAction.munge_from)
304 def test_dmarc_nonzero_actions_fil(self):
305 self._mlist.dmarc_mitigate_action = DummyEnum.val
306 self._mlist.dmarc_mitigate_unconditionally = False
307 self._pckdict['from_is_list'] = 2
308 self._pckdict['dmarc_moderation_action'] = 1
309 self._import()
310 self.assertTrue(self._mlist.dmarc_mitigate_unconditionally)
311 self.assertEqual(self._mlist.dmarc_mitigate_action,
312 DMARCMitigateAction.wrap_message)
314 def test_dmarc_nonzero_actions_dma(self):
315 self._mlist.dmarc_mitigate_action = DummyEnum.val
316 self._mlist.dmarc_mitigate_unconditionally = True
317 self._pckdict['from_is_list'] = 1
318 self._pckdict['dmarc_moderation_action'] = 2
319 self._import()
320 self.assertFalse(self._mlist.dmarc_mitigate_unconditionally)
321 self.assertEqual(self._mlist.dmarc_mitigate_action,
322 DMARCMitigateAction.wrap_message)
324 def test_dmarc_messages(self):
325 self._pckdict['dmarc_moderation_notice'] = b'This is a notice.\n'
326 self._pckdict['dmarc_wrapped_message_text'] = b'This is text.\n'
327 self._import()
328 self.assertEqual('This is a notice.\n',
329 self._mlist.dmarc_moderation_notice)
330 self.assertEqual('This is text.\n',
331 self._mlist.dmarc_wrapped_message_text)
333 def test_dmarc_addresses(self):
334 self._pckdict['dmarc_moderation_addresses'] = [
335 b'user@example.com',
336 b'^.*@gmail.com',
338 self._import()
339 self.assertEqual(['user@example.com', '^.*@gmail.com'],
340 self._mlist.dmarc_addresses)
342 def test_ban_list(self):
343 banned = [
344 ('anne@example.net', 'anne@example.net'),
345 ('^.*@example.edu', 'bob@example.edu'),
346 ('non-ascii-\xe8@example.com', 'non-ascii-\ufffd@example.com'),
348 self._pckdict['ban_list'] = [b[0].encode('iso-8859-1') for b in banned]
349 self._import()
350 for _pattern, addr in banned:
351 self.assertTrue(IBanManager(self._mlist).is_banned(addr))
353 def test_acceptable_aliases(self):
354 # This used to be a plain-text field (values are newline-separated)
355 # but values were interpreted as regexps even without '^' so we need
356 # to add the '^'.
357 # Also check that the list name local part is added.
358 aliases = ['alias1@example.com',
359 'alias2@exemple.com',
360 'non-ascii-\xe8@example.com',
362 new_aliases = ['^alias1@example.com',
363 '^alias2@exemple.com',
364 '^blank@',
365 '^non-ascii-\xe8@example.com',
367 self._pckdict['acceptable_aliases'] = list_to_string(aliases)
368 self._import()
369 alias_set = IAcceptableAliasSet(self._mlist)
370 self.assertEqual(sorted(alias_set.aliases), new_aliases)
372 def test_acceptable_aliases_invalid(self):
373 # Values without an '@' sign used to be matched against the local
374 # part, now we need to add the '^' sign to indicate it's a regexp.
375 aliases = ['invalid-value']
376 new_aliases = ['^blank@', '^invalid-value']
377 self._pckdict['acceptable_aliases'] = list_to_string(aliases)
378 self._import()
379 alias_set = IAcceptableAliasSet(self._mlist)
380 self.assertEqual(sorted(alias_set.aliases), new_aliases)
382 def test_acceptable_aliases_as_list(self):
383 # In some versions of the pickle, this can be a list, not a string
384 # (seen in the wild). We still need to add the '^'.
385 aliases = [b'alias1@example.com', b'alias2@exemple.com']
386 new_aliases = ['^alias1@example.com', '^alias2@exemple.com', '^blank@']
387 self._pckdict['acceptable_aliases'] = aliases
388 self._import()
389 alias_set = IAcceptableAliasSet(self._mlist)
390 self.assertEqual(sorted(alias_set.aliases), new_aliases)
392 def test_dont_add_caret_if_present(self):
393 # The 2.1 alias could have had a leading '^' even though not required.
394 aliases = ['^alias1@example.com',
395 '^alias2@.*',
397 new_aliases = ['^alias1@example.com',
398 '^alias2@.*',
399 '^blank@',
401 self._pckdict['acceptable_aliases'] = list_to_string(aliases)
402 self._import()
403 alias_set = IAcceptableAliasSet(self._mlist)
404 self.assertEqual(sorted(alias_set.aliases), new_aliases)
406 def test_info_non_ascii(self):
407 # info can contain non-ascii characters.
408 info = 'O idioma aceito \xe9 somente Portugu\xeas do Brasil'
409 self._pckdict['info'] = info.encode('utf-8')
410 self._import()
411 self.assertEqual(self._mlist.info, info,
412 'Encoding to UTF-8 is not handled')
413 # Test fallback to ascii with replace.
414 self._pckdict['info'] = info.encode('iso-8859-1')
415 # Suppress warning messages in test output.
416 with mock.patch('sys.stderr'):
417 self._import()
418 self.assertEqual(
419 self._mlist.info,
420 self._pckdict['info'].decode('ascii', 'replace'),
421 "We don't fall back to replacing non-ascii chars")
423 def test_preferred_language(self):
424 self._pckdict['preferred_language'] = b'ja'
425 english = getUtility(ILanguageManager).get('en')
426 japanese = getUtility(ILanguageManager).get('ja')
427 self.assertEqual(self._mlist.preferred_language, english)
428 self._import()
429 self.assertEqual(self._mlist.preferred_language, japanese)
431 def test_preferred_language_unknown_previous(self):
432 # When the previous language is unknown, it should not fail.
433 self._mlist._preferred_language = 'xx'
434 self._import()
435 english = getUtility(ILanguageManager).get('en')
436 self.assertEqual(self._mlist.preferred_language, english)
438 def test_new_language(self):
439 self._pckdict['preferred_language'] = b'xx_XX'
440 try:
441 self._import()
442 except Import21Error as error:
443 # Check the message.
444 self.assertIn('[language.xx_XX]', str(error))
445 else:
446 self.fail('Import21Error was not raised')
448 def test_encode_ascii_prefixes(self):
449 self._pckdict['encode_ascii_prefixes'] = 2
450 self.assertEqual(self._mlist.encode_ascii_prefixes, False)
451 self._import()
452 self.assertEqual(self._mlist.encode_ascii_prefixes, True)
454 def test_subscription_policy_open(self):
455 self._mlist.subscription_policy = SubscriptionPolicy.confirm
456 self._pckdict['subscribe_policy'] = 0
457 self._import()
458 self.assertEqual(self._mlist.subscription_policy,
459 SubscriptionPolicy.open)
461 def test_subscription_policy_confirm(self):
462 self._mlist.subscription_policy = SubscriptionPolicy.open
463 self._pckdict['subscribe_policy'] = 1
464 self._import()
465 self.assertEqual(self._mlist.subscription_policy,
466 SubscriptionPolicy.confirm)
468 def test_subscription_policy_moderate(self):
469 self._mlist.subscription_policy = SubscriptionPolicy.open
470 self._pckdict['subscribe_policy'] = 2
471 self._import()
472 self.assertEqual(self._mlist.subscription_policy,
473 SubscriptionPolicy.moderate)
475 def test_subscription_policy_confirm_then_moderate(self):
476 self._mlist.subscription_policy = SubscriptionPolicy.open
477 self._pckdict['subscribe_policy'] = 3
478 self._import()
479 self.assertEqual(self._mlist.subscription_policy,
480 SubscriptionPolicy.confirm_then_moderate)
482 def test_header_matches(self):
483 # This test containes real cases of header_filter_rules.
484 self._pckdict['header_filter_rules'] = [
485 ('X\\-Spam\\-Status\\: Yes.*', 3, False),
486 ('^X-Spam-Status: Yes\r\n\r\n', 2, False),
487 ('^X-Spam-Level: \\*\\*\\*.*$', 3, False),
488 ('^X-Spam-Level:.\\*\\*\r\n^X-Spam:.Yes', 3, False),
489 ('Subject: \\[SPAM\\].*', 3, False),
490 ('^Subject: .*loan.*', 3, False),
491 ('Original-Received: from *linkedin.com*\r\n', 3, False),
492 ('X-Git-Module: rhq.*git', 6, False),
493 ('Approved: verysecretpassword', 6, False),
494 ('^Subject: dev-\r\n^Subject: staging-', 3, False),
495 ('from: .*info@aolanchem.com\r\nfrom: .*@jw-express.com',
496 2, False),
497 ('^Subject:.*\\Wwas:\\W', 3, False),
498 ('^Received: from smtp-.*\\.fedoraproject\\.org\r\n'
499 '^Received: from mx.*\\.redhat.com\r\n'
500 '^Resent-date:\r\n'
501 '^Resent-from:\r\n'
502 '^Resent-Message-ID:\r\n'
503 '^Resent-to:\r\n'
504 '^Subject: [^mtv]\r\n',
505 7, False),
506 ('^Received: from fedorahosted\\.org.*by fedorahosted\\.org\r\n'
507 '^Received: from hosted.*\\.fedoraproject.org.*by '
508 'hosted.*\\.fedoraproject\\.org\r\n'
509 '^Received: from hosted.*\\.fedoraproject.org.*by '
510 'fedoraproject\\.org\r\n'
511 '^Received: from hosted.*\\.fedoraproject.org.*by '
512 'fedorahosted\\.org',
513 6, False),
515 error_log = LogFileMark('mailman.error')
516 self._import()
517 self.assertListEqual(
518 [(hm.header, hm.pattern, hm.chain)
519 for hm in self._mlist.header_matches], [
520 ('x-spam-status', 'Yes.*', 'discard'),
521 ('x-spam-status', 'Yes', 'reject'),
522 ('x-spam-level', '\\*\\*\\*.*$', 'discard'),
523 ('x-spam-level', '\\*\\*', 'discard'),
524 ('x-spam', 'Yes', 'discard'),
525 ('subject', '\\[SPAM\\].*', 'discard'),
526 ('subject', '.*loan.*', 'discard'),
527 ('original-received', 'from *linkedin.com*', 'discard'),
528 ('x-git-module', 'rhq.*git', 'accept'),
529 ('approved', 'verysecretpassword', 'accept'),
530 ('subject', 'dev-', 'discard'),
531 ('subject', 'staging-', 'discard'),
532 ('from', '.*info@aolanchem.com', 'reject'),
533 ('from', '.*@jw-express.com', 'reject'),
534 ('subject', '\\Wwas:\\W', 'discard'),
535 ('received', 'from smtp-.*\\.fedoraproject\\.org', 'hold'),
536 ('received', 'from mx.*\\.redhat.com', 'hold'),
537 ('resent-date', '.*', 'hold'),
538 ('resent-from', '.*', 'hold'),
539 ('resent-message-id', '.*', 'hold'),
540 ('resent-to', '.*', 'hold'),
541 ('subject', '[^mtv]', 'hold'),
542 ('received', 'from fedorahosted\\.org.*by fedorahosted\\.org',
543 'accept'),
544 ('received',
545 'from hosted.*\\.fedoraproject.org.*by '
546 'hosted.*\\.fedoraproject\\.org', 'accept'),
547 ('received',
548 'from hosted.*\\.fedoraproject.org.*by '
549 'fedoraproject\\.org', 'accept'),
550 ('received',
551 'from hosted.*\\.fedoraproject.org.*by '
552 'fedorahosted\\.org', 'accept'),
554 loglines = error_log.read().strip()
555 self.assertEqual(len(loglines), 0)
557 def test_header_matches_header_only(self):
558 # Check that an empty pattern is skipped.
559 self._pckdict['header_filter_rules'] = [
560 ('SomeHeaderName', 3, False),
562 error_log = LogFileMark('mailman.error')
563 self._import()
564 self.assertListEqual(self._mlist.header_matches, [])
565 self.assertIn('Unsupported header_filter_rules pattern',
566 error_log.readline())
568 def test_header_matches_anything(self):
569 # Check that a wild card header pattern is skipped.
570 self._pckdict['header_filter_rules'] = [
571 ('.*', 7, False),
573 error_log = LogFileMark('mailman.error')
574 self._import()
575 self.assertListEqual(self._mlist.header_matches, [])
576 self.assertIn('Unsupported header_filter_rules pattern',
577 error_log.readline())
579 def test_header_matches_invalid_re(self):
580 # Check that an invalid regular expression pattern is skipped.
581 self._pckdict['header_filter_rules'] = [
582 ('SomeHeaderName: *invalid-re', 3, False),
584 error_log = LogFileMark('mailman.error')
585 self._import()
586 self.assertListEqual(self._mlist.header_matches, [])
587 self.assertIn('Skipping header_filter rule because of an invalid '
588 'regular expression', error_log.readline())
590 def test_header_matches_defer(self):
591 # Check that a defer action is properly converted.
592 self._pckdict['header_filter_rules'] = [
593 ('^X-Spam-Status: Yes', 0, False),
595 self._import()
596 self.assertListEqual(
597 [(hm.header, hm.pattern, hm.chain)
598 for hm in self._mlist.header_matches],
599 [('x-spam-status', 'Yes', None)]
602 def test_header_matches_unsupported_action(self):
603 # Check that unsupported actions are skipped.
604 for action_num in (1, 4, 5):
605 self._pckdict['header_filter_rules'] = [
606 ('HeaderName: test-re', action_num, False),
608 error_log = LogFileMark('mailman.error')
609 self._import()
610 self.assertListEqual(self._mlist.header_matches, [])
611 self.assertIn('Unsupported header_filter_rules action',
612 error_log.readline())
613 # Avoid a useless warning.
614 for member in self._mlist.members.members:
615 member.unsubscribe()
616 for member in self._mlist.owners.members:
617 member.unsubscribe()
619 def test_header_matches_duplicate(self):
620 # Check that duplicate patterns don't cause tracebacks.
621 self._pckdict['header_filter_rules'] = [
622 ('SomeHeaderName: test-pattern', 3, False),
623 ('SomeHeaderName: test-pattern', 2, False),
625 error_log = LogFileMark('mailman.error')
626 self._import()
627 self.assertListEqual(
628 [(hm.header, hm.pattern, hm.chain)
629 for hm in self._mlist.header_matches],
630 [('someheadername', 'test-pattern', 'discard')]
632 self.assertIn('Skipping duplicate header_filter rule',
633 error_log.readline())
635 def test_long_saunicode(self):
636 # Long SAUnicode fields should truncate for MySql with warning only.
637 long_desc = (
638 'A very long description exceeding 255 ckaracters to test '
639 'truncation of SAUnicode field data for MySQL. just add some '
640 'dots .......................................................'
641 '............................................................'
642 '... and a bit more Thats 255 ending at more')
643 self._pckdict['description'] = long_desc
644 self._import()
645 if is_mysql(config.db.engine):
646 self.assertEqual(long_desc[:255], self._mlist.description)
647 self.assertTrue(self._mlist.description.endswith('a bit more'))
648 else:
649 self.assertEqual(long_desc, self._mlist.description)
652 class TestArchiveImport(unittest.TestCase):
653 """Test conversion of the archive policies.
655 Mailman 2.1 had two variables `archive` and `archive_private`. Now
656 there's just a single `archive_policy` enum.
658 layer = ConfigLayer
660 def setUp(self):
661 self._mlist = create_list('blank@example.com')
662 self._mlist.archive_policy = DummyEnum.val
664 def _do_test(self, pckdict, expected):
665 import_config_pck(self._mlist, pckdict)
666 self.assertEqual(self._mlist.archive_policy, expected)
668 def test_public(self):
669 self._do_test(dict(archive=True, archive_private=False),
670 ArchivePolicy.public)
672 def test_private(self):
673 self._do_test(dict(archive=True, archive_private=True),
674 ArchivePolicy.private)
676 def test_no_archive(self):
677 self._do_test(dict(archive=False, archive_private=False),
678 ArchivePolicy.never)
680 def test_bad_state(self):
681 # For some reason, the old list has the invalid archiving state where
682 # `archive` is False and `archive_private` is True. It doesn't matter
683 # because this still collapses to the same enum value.
684 self._do_test(dict(archive=False, archive_private=True),
685 ArchivePolicy.never)
687 def test_missing_archive_key(self):
688 # For some reason, the old list didn't have an `archive` key. We
689 # treat this as if no archiving is done.
690 self._do_test(dict(archive_private=False), ArchivePolicy.never)
692 def test_missing_archive_key_archive_public(self):
693 # For some reason, the old list didn't have an `archive` key, and it
694 # has weird value for archive_private. We treat this as if no
695 # archiving is done.
696 self._do_test(dict(archive_private=True), ArchivePolicy.never)
698 def test_missing_archive_private_key(self):
699 # For some reason, the old list was missing an `archive_private` key.
700 # For maximum safety, we treat this as private archiving.
701 self._do_test(dict(archive=True), ArchivePolicy.private)
704 class TestFilterActionImport(unittest.TestCase):
705 # The mlist.filter_action enum values have changed. In Mailman 2.1 the
706 # order was 'Discard', 'Reject', 'Forward to List Owner', 'Preserve'.
708 layer = ConfigLayer
710 def setUp(self):
711 self._mlist = create_list('blank@example.com')
712 self._mlist.filter_action = DummyEnum.val
714 def _do_test(self, original, expected):
715 import_config_pck(self._mlist, dict(filter_action=original))
716 self.assertEqual(self._mlist.filter_action, expected)
718 def test_discard(self):
719 self._do_test(0, FilterAction.discard)
721 def test_reject(self):
722 self._do_test(1, FilterAction.reject)
724 def test_forward(self):
725 self._do_test(2, FilterAction.forward)
727 def test_preserve(self):
728 self._do_test(3, FilterAction.preserve)
731 class TestMemberActionImport(unittest.TestCase):
732 # The mlist.default_member_action and mlist.default_nonmember_action enum
733 # values are different in Mailman 2.1; they have been merged into a
734 # single enum in Mailman 3.
736 # For default_member_action, which used to be called
737 # member_moderation_action, the values were:
738 # 0==Hold, 1=Reject, 2==Discard
740 # For default_nonmember_action, which used to be called
741 # generic_nonmember_action, the values were:
742 # 0==Accept, 1==Hold, 2==Reject, 3==Discard
744 layer = ConfigLayer
746 def setUp(self):
747 self._mlist = create_list('blank@example.com')
748 self._mlist.default_member_action = DummyEnum.val
749 self._mlist.default_nonmember_action = DummyEnum.val
750 self._pckdict = dict(
751 member_moderation_action=DummyEnum.val,
752 generic_nonmember_action=DummyEnum.val,
755 def _do_test(self, expected):
756 # Suppress warning messages in the test output.
757 with mock.patch('sys.stderr'):
758 import_config_pck(self._mlist, self._pckdict)
759 for key, value in expected.items():
760 self.assertEqual(getattr(self._mlist, key), value)
762 def test_member_defer(self):
763 # If default_member_moderation is not set, the member_moderation_action
764 # value is meaningless.
765 self._pckdict['default_member_moderation'] = 0
766 for mmaval in range(3):
767 self._pckdict['member_moderation_action'] = mmaval
768 self._do_test(dict(default_member_action=Action.defer))
770 def test_member_hold(self):
771 self._pckdict['default_member_moderation'] = 1
772 self._pckdict['member_moderation_action'] = 0
773 self._do_test(dict(default_member_action=Action.hold))
775 def test_member_reject(self):
776 self._pckdict['default_member_moderation'] = 1
777 self._pckdict['member_moderation_action'] = 1
778 self._do_test(dict(default_member_action=Action.reject))
780 def test_member_discard(self):
781 self._pckdict['default_member_moderation'] = 1
782 self._pckdict['member_moderation_action'] = 2
783 self._do_test(dict(default_member_action=Action.discard))
785 def test_nonmember_accept(self):
786 self._pckdict['generic_nonmember_action'] = 0
787 self._do_test(dict(default_nonmember_action=Action.defer))
789 def test_nonmember_hold(self):
790 self._pckdict['generic_nonmember_action'] = 1
791 self._do_test(dict(default_nonmember_action=Action.hold))
793 def test_nonmember_reject(self):
794 self._pckdict['generic_nonmember_action'] = 2
795 self._do_test(dict(default_nonmember_action=Action.reject))
797 def test_nonmember_discard(self):
798 self._pckdict['generic_nonmember_action'] = 3
799 self._do_test(dict(default_nonmember_action=Action.discard))
802 class TestConvertToURI(unittest.TestCase):
803 # The following values were plain text, and are now URIs in Mailman 3:
804 # - welcome_message
805 # - goodbye_message
806 # - msg_header
807 # - msg_footer
808 # - digest_header
809 # - digest_footer
811 # We intentionally don't do welcome_message because it doesn't map well
812 # from MM 2.1
814 # The templates contain variables that must be replaced:
815 # - %(real_name)s -> %(display_name)s
816 # - %(real_name)s@%(host_name)s -> %(fqdn_listname)s
817 # - %(web_page_url)slistinfo%(cgiext)s/%(_internal_name)s
818 # -> %(listinfo_uri)s
820 layer = ConfigLayer
821 maxDiff = None
823 def setUp(self):
824 self._mlist = create_list('blank@example.com')
825 self._conf_mapping = dict(
826 goodbye_msg='list:user:notice:goodbye',
827 msg_header='list:member:regular:header',
828 msg_footer='list:member:regular:footer',
829 digest_header='list:member:digest:header',
830 digest_footer='list:member:digest:footer',
832 self._pckdict = dict()
833 # Remove any residual template files.
834 with ExitStack() as resources:
835 filepath = list(search(resources, '', self._mlist))[0]
836 rmtree(filepath, ignore_errors=True)
838 def test_text_to_uri(self):
839 for oldvar, newvar in self._conf_mapping.items():
840 self._pckdict[str(oldvar)] = b'TEST VALUE'
841 import_config_pck(self._mlist, self._pckdict)
842 text = decorate(newvar, self._mlist)
843 self.assertEqual(
844 text, 'TEST VALUE',
845 'Old variable %s was not properly imported to %s'
846 % (oldvar, newvar))
848 def test_substitutions(self):
849 test_text = ('UNIT TESTING %(real_name)s mailing list\n'
850 '%(real_name)s@%(host_name)s')
851 expected_text = ('UNIT TESTING ${display_name} mailing list '
852 '-- ${listname}\n'
853 'To unsubscribe send an email to '
854 '${short_listname}-leave@${domain}')
855 for oldvar, newvar in self._conf_mapping.items():
856 self._pckdict[str(oldvar)] = str(test_text)
857 import_config_pck(self._mlist, self._pckdict)
858 text = getUtility(ITemplateLoader).get(newvar, self._mlist)
859 self.assertEqual(
860 text, expected_text,
861 'Old variables were not converted for %s' % newvar)
863 def test_more_substitutions(self):
864 test_text = ('UNIT TESTING %(real_name)s some list\n'
865 'The listname is %(list_name)s\n'
866 'The description is %(description)s\n'
867 'Info is %(info)s\n'
868 'User address to is %(user_address)s\n'
869 'User delivered is %(user_delivered_to)s\n'
870 'User password: %(user_password)s\n'
871 'User name %(user_name)s\n')
872 expected_text = ('UNIT TESTING ${display_name} some list\n'
873 'The listname is ${listname}\n'
874 'The description is ${description}\n'
875 'Info is ${info}\n'
876 'User address to is ${user_email}\n'
877 'User delivered is ${user_delivered_to}\n'
878 'User password: \n'
879 'User name ${user_name}\n')
880 for oldvar, newvar in self._conf_mapping.items():
881 self._pckdict[str(oldvar)] = str(test_text)
882 import_config_pck(self._mlist, self._pckdict)
883 text = getUtility(ITemplateLoader).get(newvar, self._mlist)
884 self.assertEqual(
885 text, expected_text,
886 'Old variables were not converted for %s' % newvar)
888 def test_cgiext_dropped(self):
889 test_text = ('UNIT TESTING %(real_name)s mailing list\n'
890 'https://example.com/mailman/listinfo%(cgiext)s')
891 expected_text = ('UNIT TESTING ${display_name} mailing list '
892 '-- ${listname}\n'
893 'https://example.com/mailman/listinfo')
894 for oldvar, newvar in self._conf_mapping.items():
895 self._pckdict[str(oldvar)] = str(test_text)
896 import_config_pck(self._mlist, self._pckdict)
897 text = getUtility(ITemplateLoader).get(newvar, self._mlist)
898 self.assertEqual(
899 text, expected_text,
900 'Old variables were not converted for %s' % newvar)
902 def test_unconverted_issues_message(self):
903 test_text = ('UNIT TESTING %(real_name)s mailing list\n'
904 '%(web_page_url)slistinfo/')
905 expected_text = ('UNIT TESTING ${display_name} mailing list '
906 '-- ${listname}\n'
907 '%(web_page_url)slistinfo/')
908 for oldvar, newvar in self._conf_mapping.items():
909 serr = StringIO()
910 with redirect_stderr(serr):
911 self._pckdict[str(oldvar)] = str(test_text)
912 import_config_pck(self._mlist, self._pckdict)
913 text = getUtility(ITemplateLoader).get(newvar, self._mlist)
914 self.assertEqual(
915 text, expected_text,
916 'Old variables were not converted for %s' % newvar)
917 self.assertEqual(serr.getvalue(),
918 'Unable to convert mailing list attribute: '
919 f'{oldvar} with value "UNIT TESTING '
920 '${display_name} mailing list -- ${listname}\n'
921 '%(web_page_url)slistinfo/"\n')
922 serr.close()
924 def test_no_unconverted_issues_no_message(self):
925 test_text = ('UNIT TESTING %(real_name)s mailing list\n'
926 'List description: %(description)s')
927 expected_text = ('UNIT TESTING ${display_name} mailing list '
928 '-- ${listname}\n'
929 'List description: ${description}')
930 for oldvar, newvar in self._conf_mapping.items():
931 serr = StringIO()
932 with redirect_stderr(serr):
933 self._pckdict[str(oldvar)] = str(test_text)
934 import_config_pck(self._mlist, self._pckdict)
935 text = getUtility(ITemplateLoader).get(newvar, self._mlist)
936 self.assertEqual(
937 text, expected_text,
938 'Old variables were not converted for %s' % newvar)
939 self.assertEqual(serr.getvalue(), '')
940 serr.close()
942 def test_drop_listinfo_if_contains_crs(self):
943 self._pckdict = dict(msg_footer=(
944 '_______________________________________________\r\n'
945 'This is a message footer\r\n'
946 '%(web_page_url)slistinfo%(cgiext)s/%(_internal_name)s\r\n'
948 expected_text = (
949 '_______________________________________________\n'
950 'This is a message footer\n'
952 import_config_pck(self._mlist, self._pckdict)
953 newvar = 'list:member:regular:footer'
954 text = getUtility(ITemplateLoader).get(newvar, self._mlist)
955 self.assertEqual(
956 text, expected_text, 'Listinfo URL not dropped')
958 def test_keep_default(self):
959 # If the value was not changed from MM2.1's default, don't import it.
960 default_msg_footer = (
961 '_______________________________________________\n'
962 '%(real_name)s mailing list\n'
963 '%(real_name)s@%(host_name)s\n'
964 '%(web_page_url)slistinfo%(cgiext)s/%(_internal_name)s\n'
966 loader = getUtility(ITemplateLoader)
967 for oldvar in ('msg_footer', 'digest_footer'):
968 newvar = self._conf_mapping[oldvar]
969 self._pckdict[str(oldvar)] = str(default_msg_footer)
970 try:
971 old_value = loader.get(newvar, self._mlist)
972 except URLError:
973 old_value = None
974 import_config_pck(self._mlist, self._pckdict)
975 try:
976 new_value = loader.get(newvar, self._mlist)
977 except URLError:
978 new_value = None
979 self.assertEqual(
980 old_value, new_value,
981 '{} changed unexpectedly: {} != {}'.format(
982 newvar, old_value, new_value))
984 def test_keep_default_with_crs(self):
985 # If the value was not changed from MM2.1's default, don't import it.
986 default_msg_footer = (
987 '_______________________________________________\n'
988 '%(real_name)s mailing list\n'
989 '%(real_name)s@%(host_name)s\n'
990 '%(web_page_url)slistinfo%(cgiext)s/%(_internal_name)s\n'
992 loader = getUtility(ITemplateLoader)
993 for oldvar in ('msg_footer', 'digest_footer'):
994 newvar = self._conf_mapping[oldvar]
995 self._pckdict[str(oldvar)] = str(default_msg_footer).replace(
996 '\n', '\r\n')
997 try:
998 old_value = loader.get(newvar, self._mlist)
999 except URLError:
1000 old_value = None
1001 import_config_pck(self._mlist, self._pckdict)
1002 try:
1003 new_value = loader.get(newvar, self._mlist)
1004 except URLError:
1005 new_value = None
1006 self.assertEqual(
1007 old_value, new_value,
1008 '{} changed unexpectedly: {} != {}'.format(
1009 newvar, old_value, new_value))
1011 def test_keep_default_if_fqdn_changed(self):
1012 # Use case: importing the old a@ex.com into b@ex.com. We can't check
1013 # if it changed from the default so don't import. We may do more harm
1014 # than good and it's easy to change if needed.
1015 test_value = b'TEST-VALUE'
1016 # We need an IDomain for this mail_host.
1017 getUtility(IDomainManager).add('test.example.com')
1018 manager = getUtility(ITemplateManager)
1019 for oldvar, newvar in self._conf_mapping.items():
1020 self._mlist.mail_host = 'example.com'
1021 self._pckdict['mail_host'] = b'test.example.com'
1022 self._pckdict[str(oldvar)] = test_value
1023 try:
1024 old_value = manager.get(newvar, 'blank.example.com')
1025 except URLError:
1026 old_value = None
1027 # Suppress warning messages in the test output.
1028 with mock.patch('sys.stderr'):
1029 import_config_pck(self._mlist, self._pckdict)
1030 try:
1031 new_value = manager.get(newvar, 'test.example.com')
1032 except URLError:
1033 new_value = None
1034 self.assertEqual(
1035 old_value, new_value,
1036 '{} changed unexpectedly: {} != {}'.format(
1037 newvar, old_value, new_value))
1039 def test_unicode(self):
1040 # non-ascii templates
1041 for oldvar in self._conf_mapping:
1042 self._pckdict[str(oldvar)] = b'Ol\xe1!'
1043 import_config_pck(self._mlist, self._pckdict)
1044 for oldvar, newvar in self._conf_mapping.items():
1045 text = decorate(newvar, self._mlist)
1046 expected = u'Ol\ufffd!'
1047 self.assertEqual(
1048 text, expected,
1049 '{} -> {} did not get converted'.format(oldvar, newvar))
1051 def test_unicode_in_default(self):
1052 # What if the default template is already in UTF-8? For example, if
1053 # you import it twice.
1054 footer = b'\xe4\xb8\xad $listinfo_uri'
1055 footer_path = os.path.join(
1056 config.VAR_DIR, 'templates', 'lists',
1057 'blank@example.com', 'en', 'footer.txt')
1058 makedirs(os.path.dirname(footer_path))
1059 with open(footer_path, 'wb') as fp:
1060 fp.write(footer)
1061 self._pckdict['msg_footer'] = b'NEW-VALUE'
1062 import_config_pck(self._mlist, self._pckdict)
1063 text = decorate('list:member:regular:footer', self._mlist)
1064 self.assertEqual(text, 'NEW-VALUE')
1067 class TestRosterImport(unittest.TestCase):
1068 """Test that rosters are imported correctly."""
1070 layer = ConfigLayer
1072 def setUp(self):
1073 self._mlist = create_list('blank@example.com')
1074 self._pckdict = {
1075 'members': {
1076 'anne@example.com': 0,
1077 'bob@example.com': b'bob@ExampLe.Com',
1079 'digest_members': {
1080 'cindy@example.com': 0,
1081 'dave@example.com': b'dave@ExampLe.Com',
1083 'passwords': {
1084 'anne@example.com': b'annepass',
1085 'bob@example.com': b'bobpass',
1086 'cindy@example.com': b'cindypass',
1087 'dave@example.com': b'davepass',
1089 'language': {
1090 'anne@example.com': b'fr',
1091 'bob@example.com': b'de',
1092 'cindy@example.com': b'es',
1093 'dave@example.com': b'it',
1095 # Usernames are unicode strings in the pickle
1096 'usernames': {
1097 'anne@example.com': 'Anne',
1098 'bob@example.com': 'Bob',
1099 'cindy@example.com': 'Cindy',
1100 'dave@example.com': 'Dave',
1102 'owner': [
1103 'anne@example.com',
1104 'emily@example.com',
1106 'moderator': [
1107 'bob@example.com',
1108 'fred@example.com',
1110 'accept_these_nonmembers': [
1111 'gene@example.com',
1112 '^gene-.*@example.com',
1114 'hold_these_nonmembers': [
1115 'homer@example.com',
1116 '^homer-.*@example.com',
1118 'reject_these_nonmembers': [
1119 'iris@example.com',
1120 '^iris-.*@example.com',
1122 'discard_these_nonmembers': [
1123 'kenny@example.com',
1124 '^kenny-.*@example.com',
1127 self._usermanager = getUtility(IUserManager)
1128 language_manager = getUtility(ILanguageManager)
1129 for code in self._pckdict['language'].values():
1130 if isinstance(code, bytes):
1131 code = code.decode('utf-8')
1132 if code not in language_manager.codes:
1133 language_manager.add(code, 'utf-8', code)
1135 def test_member(self):
1136 import_config_pck(self._mlist, self._pckdict)
1137 for name in ('anne', 'bob', 'cindy', 'dave'):
1138 addr = '%s@example.com' % name
1139 self.assertIn(addr,
1140 [a.email for a in self._mlist.members.addresses],
1141 'Address %s was not imported' % addr)
1142 self.assertIn('anne@example.com',
1143 [a.email for a in self._mlist.regular_members.addresses])
1144 self.assertIn('bob@example.com',
1145 [a.email for a in self._mlist.regular_members.addresses])
1146 self.assertIn('cindy@example.com',
1147 [a.email for a in self._mlist.digest_members.addresses])
1148 self.assertIn('dave@example.com',
1149 [a.email for a in self._mlist.digest_members.addresses])
1151 def test_original_email(self):
1152 import_config_pck(self._mlist, self._pckdict)
1153 bob = self._usermanager.get_address('bob@example.com')
1154 self.assertEqual(bob.original_email, 'bob@ExampLe.Com')
1155 dave = self._usermanager.get_address('dave@example.com')
1156 self.assertEqual(dave.original_email, 'dave@ExampLe.Com')
1158 def test_language(self):
1159 import_config_pck(self._mlist, self._pckdict)
1160 for name in ('anne', 'bob', 'cindy', 'dave'):
1161 addr = '%s@example.com' % name
1162 member = self._mlist.members.get_member(addr)
1163 self.assertIsNotNone(member, 'Address %s was not imported' % addr)
1164 code = self._pckdict['language'][addr]
1165 if isinstance(code, bytes):
1166 code = code.decode('utf-8')
1167 self.assertEqual(member.preferred_language.code, code)
1169 def test_new_language(self):
1170 self._pckdict['language']['anne@example.com'] = b'xx_XX'
1171 try:
1172 import_config_pck(self._mlist, self._pckdict)
1173 except Import21Error as error:
1174 self.assertIn('[language.xx_XX]', str(error))
1175 else:
1176 self.fail('Import21Error was not raised')
1178 def test_username(self):
1179 import_config_pck(self._mlist, self._pckdict)
1180 for name in ('anne', 'bob', 'cindy', 'dave'):
1181 addr = '%s@example.com' % name
1182 user = self._usermanager.get_user(addr)
1183 address = self._usermanager.get_address(addr)
1184 self.assertIsNotNone(user, 'User %s was not imported' % addr)
1185 self.assertIsNotNone(address, 'Address %s was not imported' % addr)
1186 display_name = self._pckdict['usernames'][addr]
1187 self.assertEqual(
1188 user.display_name, display_name,
1189 'The display name was not set for User %s' % addr)
1190 self.assertEqual(
1191 address.display_name, display_name,
1192 'The display name was not set for Address %s' % addr)
1194 def test_owner(self):
1195 import_config_pck(self._mlist, self._pckdict)
1196 for name in ('anne', 'emily'):
1197 addr = '%s@example.com' % name
1198 self.assertIn(addr,
1199 [a.email for a in self._mlist.owners.addresses],
1200 'Address %s was not imported as owner' % addr)
1201 self.assertNotIn(
1202 'emily@example.com',
1203 [a.email for a in self._mlist.members.addresses],
1204 'Address emily@ was wrongly added to the members list')
1206 def test_moderator(self):
1207 import_config_pck(self._mlist, self._pckdict)
1208 for name in ('bob', 'fred'):
1209 addr = '%s@example.com' % name
1210 self.assertIn(addr,
1211 [a.email for a in self._mlist.moderators.addresses],
1212 'Address %s was not imported as moderator' % addr)
1213 self.assertNotIn('fred@example.com',
1214 [a.email for a in self._mlist.members.addresses],
1215 'Address fred@ was wrongly added to the members list')
1217 # Commented out because password importing has been disabled.
1218 # def test_password(self):
1219 # # self.anne.password = config.password_context.encrypt('abc123')
1220 # import_config_pck(self._mlist, self._pckdict)
1221 # for name in ('anne', 'bob', 'cindy', 'dave'):
1222 # addr = '%s@example.com' % name
1223 # user = self._usermanager.get_user(addr)
1224 # self.assertIsNotNone(user, 'Address %s was not imported' % addr)
1225 # self.assertEqual(
1226 # user.password, '{plaintext}%spass' % name,
1227 # 'Password for %s was not imported' % addr)
1229 def test_same_user(self):
1230 # Adding the address of an existing User must not create another user.
1231 user = self._usermanager.create_user('anne@example.com', 'Anne')
1232 user.register('bob@example.com') # secondary email
1233 import_config_pck(self._mlist, self._pckdict)
1234 member = self._mlist.members.get_member('bob@example.com')
1235 self.assertEqual(member.user, user)
1237 def test_owner_and_moderator_delivery_enabled(self):
1238 # If an owner or moderator is a member with delivery disabled, the
1239 # imported owner/moderator must have delivery enabled.
1240 # Set anne and bob's delivery status disabled by user.
1241 self._pckdict['delivery_status'] = {
1242 'anne@example.com': (2, 1612366744.399534),
1243 'bob@example.com': (2, 1612366744.399534)}
1244 import_config_pck(self._mlist, self._pckdict)
1245 self.assertEqual(
1246 self._mlist.owners.get_member('anne@example.com').delivery_status,
1247 DeliveryStatus.enabled)
1248 self.assertEqual(
1249 self._mlist.moderators.get_member('bob@example.com').
1250 delivery_status, DeliveryStatus.enabled)
1252 def test_owner_and_moderator_not_lowercase(self):
1253 # In the v2.1 pickled dict, the owner and moderator lists are not
1254 # necessarily lowercased already.
1255 self._pckdict['owner'] = [b'Anne@example.com']
1256 self._pckdict['moderator'] = [b'Anne@example.com']
1257 import_config_pck(self._mlist, self._pckdict)
1258 self.assertIn('anne@example.com',
1259 [a.email for a in self._mlist.owners.addresses])
1260 self.assertIn('anne@example.com',
1261 [a.email for a in self._mlist.moderators.addresses])
1263 def test_address_already_exists_but_no_user(self):
1264 # An address already exists, but it is not linked to a user nor
1265 # subscribed.
1266 anne_addr = self._usermanager.create_address(
1267 'anne@example.com', 'Anne')
1268 import_config_pck(self._mlist, self._pckdict)
1269 anne = self._usermanager.get_user('anne@example.com')
1270 self.assertTrue(anne.controls('anne@example.com'))
1271 self.assertIn(anne_addr, self._mlist.regular_members.addresses)
1273 def test_address_already_subscribed_but_no_user(self):
1274 # An address is already subscribed, but it is not linked to a user.
1275 anne_addr = self._usermanager.create_address(
1276 'anne@example.com', 'Anne')
1277 self._mlist.subscribe(anne_addr)
1278 # Suppress warning messages in test output.
1279 with mock.patch('sys.stderr'):
1280 import_config_pck(self._mlist, self._pckdict)
1281 anne = self._usermanager.get_user('anne@example.com')
1282 self.assertTrue(anne.controls('anne@example.com'))
1284 def test_invalid_original_email(self):
1285 # When the member has an original email address (i.e. the
1286 # case-preserved version) that is invalid, their new address record's
1287 # original_email attribute will only be the case insensitive version.
1288 self._pckdict['members']['anne@example.com'] = b'invalid email address'
1289 try:
1290 import_config_pck(self._mlist, self._pckdict)
1291 except InvalidEmailAddressError as error:
1292 self.fail(error)
1293 self.assertIn('anne@example.com',
1294 [a.email for a in self._mlist.members.addresses])
1295 anne = self._usermanager.get_address('anne@example.com')
1296 self.assertEqual(anne.original_email, 'anne@example.com')
1298 def test_invalid_email(self):
1299 # When a member's email address is invalid, that member is skipped
1300 # during the import.
1301 self._pckdict['members'] = {
1302 'anne@example.com': 0,
1303 'invalid email address': b'invalid email address'
1305 self._pckdict['digest_members'] = {}
1306 try:
1307 import_config_pck(self._mlist, self._pckdict)
1308 except InvalidEmailAddressError as error:
1309 self.fail(error)
1310 self.assertEqual(['anne@example.com'],
1311 [a.email for a in self._mlist.members.addresses])
1313 def test_no_email_sent(self):
1314 # No welcome message is sent to newly imported members.
1315 self.assertTrue(self._mlist.send_welcome_message)
1316 import_config_pck(self._mlist, self._pckdict)
1317 self.assertIn('anne@example.com',
1318 [a.email for a in self._mlist.members.addresses])
1319 # There are no messages in any of the queues.
1320 for queue, switchboard in config.switchboards.items():
1321 file_count = len(switchboard.files)
1322 self.assertEqual(file_count, 0,
1323 "Unexpected queue '{}' file count: {}".format(
1324 queue, file_count))
1325 self.assertTrue(self._mlist.send_welcome_message)
1327 def test_nonmembers(self):
1328 import_config_pck(self._mlist, self._pckdict)
1329 expected = {
1330 'gene': Action.defer,
1331 'homer': Action.hold,
1332 'iris': Action.reject,
1333 'kenny': Action.discard,
1335 for name, action in expected.items():
1336 self.assertIn('{}@example.com'.format(name),
1337 [a.email for a in self._mlist.nonmembers.addresses],
1338 'Address {} was not imported'.format(name))
1339 member = self._mlist.nonmembers.get_member(
1340 '{}@example.com'.format(name))
1341 self.assertEqual(member.moderation_action, action)
1342 # Action.defer maps from accept; map it back to get the name.
1343 if action == Action.defer:
1344 action = Action.accept
1345 # Only regexps should remain in the list property.
1346 list_prop = getattr(
1347 self._mlist,
1348 '{}_these_nonmembers'.format(action.name))
1349 self.assertEqual(len(list_prop), 1)
1350 self.assertTrue(all(addr.startswith('^') for addr in list_prop))
1352 def test_nonmember_following_member(self):
1353 self._pckdict['hold_these_nonmembers'] = [
1354 'linda@example.com',
1355 'homer@example.com',
1357 self._pckdict['members']['linda@example.com'] = 0
1358 self._pckdict['user_options'] = {'linda@example.com': 1}
1359 import_config_pck(self._mlist, self._pckdict)
1360 member = self._mlist.nonmembers.get_member('linda@example.com')
1361 self.assertEqual(member.moderation_action, Action.defer)
1362 member = self._mlist.nonmembers.get_member('homer@example.com')
1363 self.assertEqual(member.moderation_action, Action.hold)
1365 def test_no_import_banned_address(self):
1366 # Banned addresses should not be imported with any role.
1367 self._pckdict['ban_list'] = [b'^.*example.com']
1368 import_config_pck(self._mlist, self._pckdict)
1369 self.assertEqual([], list(self._mlist.owners.addresses))
1370 self.assertEqual([], list(self._mlist.moderators.addresses))
1371 self.assertEqual([], list(self._mlist.members.addresses))
1372 self.assertEqual([], list(self._mlist.nonmembers.addresses))
1375 class TestRosterVisibilityImport(unittest.TestCase):
1376 """Test that member_roster_visibility is imported correctly.
1378 Mailman 2.1 lists have a private_roster attribute to control roster
1379 visibility with values 0==public, 1==members, 2==admins
1380 These correspond to the Mailman 3 member_roster_visibility values
1381 RosterVisibility.public, RosterVisibility.members and
1382 RosterVisibility.moderators
1384 layer = ConfigLayer
1386 def setUp(self):
1387 self._mlist = create_list('blank@example.com')
1388 self._mlist.member_roster_visibility = DummyEnum.val
1390 def _do_test(self, original, expected):
1391 import_config_pck(self._mlist, dict(private_roster=original))
1392 self.assertEqual(self._mlist.member_roster_visibility, expected)
1394 def test_roster_visibility_public(self):
1395 self._do_test(0, RosterVisibility.public)
1397 def test_roster_visibility_members(self):
1398 self._do_test(1, RosterVisibility.members)
1400 def test_roster_visibility_moderators(self):
1401 self._do_test(2, RosterVisibility.moderators)
1403 def test_roster_visibility_bad(self):
1404 self._do_test(3, DummyEnum.val)
1407 class TestPreferencesImport(unittest.TestCase):
1408 """Preferences get imported too."""
1410 layer = ConfigLayer
1412 def setUp(self):
1413 self._mlist = create_list('blank@example.com')
1414 self._pckdict = dict(
1415 members={'anne@example.com': 0},
1416 user_options=dict(),
1417 delivery_status=dict(),
1419 self._usermanager = getUtility(IUserManager)
1421 def _do_test(self, oldvalue, expected):
1422 self._pckdict['user_options']['anne@example.com'] = oldvalue
1423 import_config_pck(self._mlist, self._pckdict)
1424 user = self._usermanager.get_user('anne@example.com')
1425 self.assertIsNotNone(user, 'User was not imported')
1426 member = self._mlist.members.get_member('anne@example.com')
1427 self.assertIsNotNone(member, 'Address was not subscribed')
1428 for exp_name, exp_val in expected.items():
1429 try:
1430 currentval = getattr(member, exp_name)
1431 except AttributeError:
1432 # hide_address has no direct getter
1433 currentval = getattr(member.preferences, exp_name)
1434 self.assertEqual(
1435 currentval, exp_val,
1436 'Preference %s was not imported' % exp_name)
1437 # XXX: should I check that other params are still equal to
1438 # mailman.core.constants.system_preferences?
1440 def test_acknowledge_posts(self):
1441 # AcknowledgePosts
1442 self._do_test(4, dict(acknowledge_posts=True))
1444 def test_hide_address(self):
1445 # ConcealSubscription
1446 self._do_test(16, dict(hide_address=True))
1448 def test_receive_own_postings(self):
1449 # DontReceiveOwnPosts
1450 self._do_test(2, dict(receive_own_postings=False))
1452 def test_receive_list_copy(self):
1453 # DontReceiveDuplicates
1454 self._do_test(256, dict(receive_list_copy=False))
1456 def test_digest_plain(self):
1457 # Digests & DisableMime
1458 self._pckdict['digest_members'] = self._pckdict['members'].copy()
1459 self._pckdict['members'] = dict()
1460 self._do_test(8, dict(delivery_mode=DeliveryMode.plaintext_digests))
1462 def test_digest_mime(self):
1463 # Digests & not DisableMime
1464 self._pckdict['digest_members'] = self._pckdict['members'].copy()
1465 self._pckdict['members'] = dict()
1466 self._do_test(0, dict(delivery_mode=DeliveryMode.mime_digests))
1468 def test_delivery_status(self):
1469 # Look for the pckdict['delivery_status'] key which will look like
1470 # (status, time) where status is among the following:
1471 # ENABLED = 0 # enabled
1472 # UNKNOWN = 1 # legacy disabled
1473 # BYUSER = 2 # disabled by user choice
1474 # BYADMIN = 3 # disabled by admin choice
1475 # BYBOUNCE = 4 # disabled by bounces
1476 for oldval, expected in enumerate((
1477 DeliveryStatus.enabled,
1478 DeliveryStatus.unknown, DeliveryStatus.by_user,
1479 DeliveryStatus.by_moderator, DeliveryStatus.by_bounces)):
1480 self._pckdict['delivery_status']['anne@example.com'] = (oldval, 0)
1481 import_config_pck(self._mlist, self._pckdict)
1482 member = self._mlist.members.get_member('anne@example.com')
1483 self.assertIsNotNone(member, 'Address was not subscribed')
1484 self.assertEqual(member.delivery_status, expected)
1485 member.unsubscribe()
1487 def test_moderate_hold(self):
1488 # Option flag Moderate is translated to the action set in
1489 # member_moderation_action.
1490 self._pckdict['member_moderation_action'] = 0
1491 self._do_test(128, dict(moderation_action=Action.hold))
1493 def test_moderate_reject(self):
1494 # Option flag Moderate is translated to the action set in
1495 # member_moderation_action.
1496 self._pckdict['member_moderation_action'] = 1
1497 self._do_test(128, dict(moderation_action=Action.reject))
1499 def test_moderate_hold_discard(self):
1500 # Option flag Moderate is translated to the action set in
1501 # member_moderation_action.
1502 self._pckdict['member_moderation_action'] = 2
1503 self._do_test(128, dict(moderation_action=Action.discard))
1505 def test_no_moderate(self):
1506 # If the option flag Moderate is not set, the action is defer.
1507 # See: https://gitlab.com/mailman/mailman/merge_requests/100
1508 self._pckdict['member_moderation_action'] = 1 # reject
1509 self._do_test(0, dict(moderation_action=Action.defer))
1511 def test_multiple_options(self):
1512 # DontReceiveDuplicates & DisableMime & SuppressPasswordReminder
1513 # Keys might be Python 2 str/bytes or unicode.
1514 members = self._pckdict['members']
1515 self._pckdict['digest_members'] = members.copy()
1516 self._pckdict['members'] = dict()
1517 self._do_test(296, dict(
1518 receive_list_copy=False,
1519 delivery_mode=DeliveryMode.plaintext_digests,
1522 def test_language_code_none(self):
1523 self.assertIsNone(check_language_code(None))