Checkpointing.
[mailman.git] / src / mailman / app / tests / test_subscriptions.py
blob6b5d88026d239e06716227f467aad7545537e403
# Copyright (C) 2011-2015 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
"""Tests for the subscription service."""

# Public names exported by this test module.
__all__ = [
    'TestSubscriptionWorkflow',
    ]
25 import unittest
27 from mailman.app.lifecycle import create_list
28 from mailman.app.subscriptions import SubscriptionWorkflow
29 from mailman.interfaces.bans import IBanManager
30 from mailman.interfaces.member import MembershipIsBannedError
31 from mailman.interfaces.pending import IPendings
32 from mailman.interfaces.subscriptions import TokenOwner
33 from mailman.testing.helpers import LogFileMark, get_queue_messages
34 from mailman.testing.layers import ConfigLayer
35 from mailman.interfaces.mailinglist import SubscriptionPolicy
36 from mailman.interfaces.usermanager import IUserManager
37 from mailman.utilities.datetime import now
38 from unittest.mock import patch
39 from zope.component import getUtility
class TestSubscriptionWorkflow(unittest.TestCase):
    """Step-by-step tests of `SubscriptionWorkflow`.

    Each test creates a fresh mailing list in `setUp()` and then drives a
    workflow either up to a named step (`run_thru()`), one step at a time
    (`next()`), or to exhaustion (`list()`).
    """

    layer = ConfigLayer
    maxDiff = None

    def setUp(self):
        self._mlist = create_list('test@example.com')
        # Moderator notifications are off unless a test turns them on.
        self._mlist.admin_immed_notify = False
        self._anne = 'anne@example.com'
        self._user_manager = getUtility(IUserManager)

    # Private helpers shared by the tests below.

    def _assert_next_step(self, workflow, step_name):
        # Advance the workflow by one step and check that the step which
        # actually ran was `_step_<step_name>`, called without arguments.
        with patch.object(workflow, '_step_' + step_name) as step:
            next(workflow)
        step.assert_called_once_with()

    def _assert_subscribed(self, workflow, address):
        # The address is a regular member of the list, the workflow knows
        # about that member, and no further token is needed.
        member = self._mlist.regular_members.get_member(self._anne)
        self.assertEqual(member.address, address)
        self.assertEqual(workflow.member, member)
        self.assertIsNone(workflow.token)
        self.assertEqual(workflow.token_owner, TokenOwner.no_one)

    def _assert_not_subscribed(self, workflow):
        # Anne is not a regular member of the list, and the workflow has no
        # member recorded either.
        member = self._mlist.regular_members.get_member(self._anne)
        self.assertIsNone(member)
        self.assertIsNone(workflow.member)

    def _assert_confirmation_sent(self, workflow):
        # Exactly one message is in the virgin queue, and both its Subject
        # and From headers carry the workflow's current token.
        items = get_queue_messages('virgin')
        self.assertEqual(len(items), 1)
        message = items[0].msg
        token = workflow.token
        self.assertEqual(message['Subject'], 'confirm {}'.format(token))
        self.assertEqual(
            message['From'], 'test-confirm+{}@example.com'.format(token))

    # The tests.

    def test_start_state(self):
        # The workflow starts with no tokens or member.
        workflow = SubscriptionWorkflow(self._mlist)
        self.assertIsNone(workflow.token)
        self.assertEqual(workflow.token_owner, TokenOwner.no_one)
        self.assertIsNone(workflow.member)

    def test_pended_data(self):
        # There is a Pendable associated with the held request, and it has
        # some data associated with it.
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne)
        try:
            workflow.run_thru('send_confirmation')
        except StopIteration:
            # Running out of steps is fine here; only the pended data
            # checked below matters.
            pass
        self.assertIsNotNone(workflow.token)
        pendable = getUtility(IPendings).confirm(workflow.token, expunge=False)
        self.assertEqual(pendable['list_id'], 'test.example.com')
        self.assertEqual(pendable['address'], 'anne@example.com')
        self.assertEqual(pendable['hold_date'], '2005-08-01T07:49:23')
        self.assertEqual(pendable['token_owner'], 'subscriber')

    def test_user_or_address_required(self):
        # The `subscriber` attribute must be a user or address.
        workflow = SubscriptionWorkflow(self._mlist)
        self.assertRaises(AssertionError, list, workflow)

    def test_sanity_checks_address(self):
        # Ensure that the sanity check phase, when given an IAddress, ends up
        # with a linked user.
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne)
        self.assertIsNotNone(workflow.address)
        self.assertIsNone(workflow.user)
        workflow.run_thru('sanity_checks')
        self.assertIsNotNone(workflow.address)
        self.assertIsNotNone(workflow.user)
        self.assertEqual(list(workflow.user.addresses)[0].email, self._anne)

    def test_sanity_checks_user_with_preferred_address(self):
        # Ensure that the sanity check phase, when given an IUser with a
        # preferred address, ends up with an address.
        anne = self._user_manager.make_user(self._anne)
        address = list(anne.addresses)[0]
        address.verified_on = now()
        anne.preferred_address = address
        workflow = SubscriptionWorkflow(self._mlist, anne)
        # The constructor sets workflow.address because the user has a
        # preferred address.
        self.assertEqual(workflow.address, address)
        self.assertEqual(workflow.user, anne)
        workflow.run_thru('sanity_checks')
        self.assertEqual(workflow.address, address)
        self.assertEqual(workflow.user, anne)

    def test_sanity_checks_user_without_preferred_address(self):
        # Ensure that the sanity check phase, when given a user without a
        # preferred address, but with at least one linked address, gets an
        # address.
        anne = self._user_manager.make_user(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne)
        self.assertIsNone(workflow.address)
        self.assertEqual(workflow.user, anne)
        workflow.run_thru('sanity_checks')
        self.assertIsNotNone(workflow.address)
        self.assertEqual(workflow.user, anne)

    def test_sanity_checks_user_with_multiple_linked_addresses(self):
        # Ensure that the sanity check phase, when given a user without a
        # preferred address, but with multiple linked addresses, gets one of
        # those addresses (exactly which one is undefined).
        anne = self._user_manager.make_user(self._anne)
        anne.link(self._user_manager.create_address('anne@example.net'))
        anne.link(self._user_manager.create_address('anne@example.org'))
        workflow = SubscriptionWorkflow(self._mlist, anne)
        self.assertIsNone(workflow.address)
        self.assertEqual(workflow.user, anne)
        workflow.run_thru('sanity_checks')
        self.assertIn(workflow.address.email, ['anne@example.com',
                                               'anne@example.net',
                                               'anne@example.org'])
        self.assertEqual(workflow.user, anne)

    def test_sanity_checks_user_without_addresses(self):
        # It is an error to try to subscribe a user with no linked addresses.
        user = self._user_manager.create_user()
        workflow = SubscriptionWorkflow(self._mlist, user)
        self.assertRaises(AssertionError, workflow.run_thru, 'sanity_checks')

    def test_sanity_checks_globally_banned_address(self):
        # An exception is raised if the address is globally banned.
        anne = self._user_manager.create_address(self._anne)
        IBanManager(None).ban(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne)
        self.assertRaises(MembershipIsBannedError, list, workflow)

    def test_sanity_checks_banned_address(self):
        # An exception is raised if the address is banned by the mailing list.
        anne = self._user_manager.create_address(self._anne)
        IBanManager(self._mlist).ban(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne)
        self.assertRaises(MembershipIsBannedError, list, workflow)

    def test_verification_checks_with_verified_address(self):
        # When the address is already verified, we skip straight to the
        # confirmation checks.
        anne = self._user_manager.create_address(self._anne)
        anne.verified_on = now()
        workflow = SubscriptionWorkflow(self._mlist, anne)
        workflow.run_thru('verification_checks')
        self._assert_next_step(workflow, 'confirmation_checks')

    def test_verification_checks_with_pre_verified_address(self):
        # When the address is not yet verified, but the pre-verified flag is
        # passed to the workflow, we skip to the confirmation checks.
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
        workflow.run_thru('verification_checks')
        self._assert_next_step(workflow, 'confirmation_checks')
        # And now the address is verified.
        self.assertIsNotNone(anne.verified_on)

    def test_verification_checks_confirmation_needed(self):
        # The address is neither verified, nor is the pre-verified flag set.
        # A confirmation message must be sent to the user which will also
        # verify their address.
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne)
        workflow.run_thru('verification_checks')
        self._assert_next_step(workflow, 'send_confirmation')
        # The address still hasn't been verified.
        self.assertIsNone(anne.verified_on)

    def test_confirmation_checks_open_list(self):
        # A subscription to an open list does not need to be confirmed or
        # moderated.
        self._mlist.subscription_policy = SubscriptionPolicy.open
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
        workflow.run_thru('confirmation_checks')
        self._assert_next_step(workflow, 'do_subscription')

    def test_confirmation_checks_no_user_confirmation_needed(self):
        # A subscription to a list which does not need user confirmation skips
        # to the moderation checks.
        self._mlist.subscription_policy = SubscriptionPolicy.moderate
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
        workflow.run_thru('confirmation_checks')
        self._assert_next_step(workflow, 'moderation_checks')

    def test_confirmation_checks_confirm_pre_confirmed(self):
        # The subscription policy requires user confirmation, but their
        # subscription is pre-confirmed. Since moderation is not required,
        # the user will be immediately subscribed.
        self._mlist.subscription_policy = SubscriptionPolicy.confirm
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne,
                                        pre_verified=True,
                                        pre_confirmed=True)
        workflow.run_thru('confirmation_checks')
        self._assert_next_step(workflow, 'do_subscription')

    def test_confirmation_checks_confirm_then_moderate_pre_confirmed(self):
        # The subscription policy requires user confirmation, but their
        # subscription is pre-confirmed. Since moderation is required, that
        # check will be performed.
        self._mlist.subscription_policy = (
            SubscriptionPolicy.confirm_then_moderate)
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne,
                                        pre_verified=True,
                                        pre_confirmed=True)
        workflow.run_thru('confirmation_checks')
        self._assert_next_step(workflow, 'moderation_checks')

    def test_confirmation_checks_confirm_and_moderate_pre_confirmed(self):
        # The subscription policy requires user confirmation and moderation,
        # but their subscription is pre-confirmed.
        #
        # NOTE(review): this uses the same policy, the same flags and the
        # same assertion as
        # test_confirmation_checks_confirm_then_moderate_pre_confirmed, so it
        # adds no coverage as written -- confirm whether a different setup
        # was intended.
        self._mlist.subscription_policy = (
            SubscriptionPolicy.confirm_then_moderate)
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne,
                                        pre_verified=True,
                                        pre_confirmed=True)
        workflow.run_thru('confirmation_checks')
        self._assert_next_step(workflow, 'moderation_checks')

    def test_confirmation_checks_confirmation_needed(self):
        # The subscription policy requires confirmation and the subscription
        # is not pre-confirmed.
        self._mlist.subscription_policy = SubscriptionPolicy.confirm
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
        workflow.run_thru('confirmation_checks')
        self._assert_next_step(workflow, 'send_confirmation')

    def test_confirmation_checks_moderate_confirmation_needed(self):
        # The subscription policy requires confirmation and moderation, and the
        # subscription is not pre-confirmed.
        self._mlist.subscription_policy = (
            SubscriptionPolicy.confirm_then_moderate)
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
        workflow.run_thru('confirmation_checks')
        self._assert_next_step(workflow, 'send_confirmation')

    def test_moderation_checks_pre_approved(self):
        # The subscription is pre-approved by the moderator.
        self._mlist.subscription_policy = SubscriptionPolicy.moderate
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne,
                                        pre_verified=True,
                                        pre_approved=True)
        workflow.run_thru('moderation_checks')
        self._assert_next_step(workflow, 'do_subscription')

    def test_moderation_checks_approval_required(self):
        # The moderator must approve the subscription.
        self._mlist.subscription_policy = SubscriptionPolicy.moderate
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
        workflow.run_thru('moderation_checks')
        self._assert_next_step(workflow, 'get_moderator_approval')

    def test_do_subscription(self):
        # An open subscription policy plus a pre-verified address means the
        # user gets subscribed to the mailing list without any further
        # confirmations or approvals.
        self._mlist.subscription_policy = SubscriptionPolicy.open
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
        # Consume the entire state machine.
        list(workflow)
        # Anne is now a member of the mailing list, and no further token is
        # needed.
        self._assert_subscribed(workflow, anne)

    def test_do_subscription_pre_approved(self):
        # A moderation-requiring subscription policy plus a pre-verified and
        # pre-approved address means the user gets subscribed to the mailing
        # list without any further confirmations or approvals.
        self._mlist.subscription_policy = SubscriptionPolicy.moderate
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne,
                                        pre_verified=True,
                                        pre_approved=True)
        # Consume the entire state machine.
        list(workflow)
        # Anne is now a member of the mailing list, and no further token is
        # needed.
        self._assert_subscribed(workflow, anne)

    def test_do_subscription_pre_approved_pre_confirmed(self):
        # A policy requiring both confirmation and moderation, plus a
        # pre-verified, pre-confirmed and pre-approved address, means the
        # user gets subscribed to the mailing list without any further
        # confirmations or approvals.
        self._mlist.subscription_policy = (
            SubscriptionPolicy.confirm_then_moderate)
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne,
                                        pre_verified=True,
                                        pre_confirmed=True,
                                        pre_approved=True)
        # Consume the entire state machine.
        list(workflow)
        # Anne is now a member of the mailing list, and no further token is
        # needed.
        self._assert_subscribed(workflow, anne)

    def test_do_subscription_cleanups(self):
        # Once the user is subscribed, the token, and its associated pending
        # database record will be removed from the database.
        self._mlist.subscription_policy = SubscriptionPolicy.open
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne,
                                        pre_verified=True,
                                        pre_confirmed=True,
                                        pre_approved=True)
        # Cache the token.
        #
        # NOTE(review): the token is read before the workflow has run, so it
        # may still be None at this point (compare test_start_state). If it
        # is, the eviction and restore checks below never exercise a real
        # token -- confirm.
        token = workflow.token
        # Consume the entire state machine.
        list(workflow)
        # Anne is now a member of the mailing list. The workflow is done, so
        # it has no token.
        self._assert_subscribed(workflow, anne)
        # The pendable associated with the token has been evicted.
        self.assertIsNone(getUtility(IPendings).confirm(token, expunge=False))
        # There is no saved workflow associated with the token. This shows up
        # as an exception when we try to restore the workflow.
        new_workflow = SubscriptionWorkflow(self._mlist)
        new_workflow.token = token
        self.assertRaises(LookupError, new_workflow.restore)

    def test_moderator_approves(self):
        # The workflow runs until moderator approval is required, at which
        # point the workflow is saved. Once the moderator approves, the
        # workflow resumes and the user is subscribed.
        self._mlist.subscription_policy = SubscriptionPolicy.moderate
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne,
                                        pre_verified=True,
                                        pre_confirmed=True)
        # Consume the entire state machine.
        list(workflow)
        # The user is not currently subscribed to the mailing list.
        self._assert_not_subscribed(workflow)
        # The token is owned by the moderator.
        self.assertIsNotNone(workflow.token)
        self.assertEqual(workflow.token_owner, TokenOwner.moderator)
        # Create a new workflow with the previous workflow's save token, and
        # restore its state. This models an approved subscription and should
        # result in the user getting subscribed.
        approved_workflow = SubscriptionWorkflow(self._mlist)
        approved_workflow.token = workflow.token
        approved_workflow.restore()
        list(approved_workflow)
        # Now the user is subscribed to the mailing list, and no further
        # token is needed.
        self._assert_subscribed(approved_workflow, anne)

    def test_get_moderator_approval_log_on_hold(self):
        # When the subscription is held for moderator approval, a message is
        # logged.
        mark = LogFileMark('mailman.subscribe')
        self._mlist.subscription_policy = SubscriptionPolicy.moderate
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne,
                                        pre_verified=True,
                                        pre_confirmed=True)
        # Consume the entire state machine.
        list(workflow)
        self.assertIn(
            'test@example.com: held subscription request from anne@example.com',
            mark.readline())

    def test_get_moderator_approval_notifies_moderators(self):
        # When the subscription is held for moderator approval, and the list
        # is so configured, a notification is sent to the list moderators.
        self._mlist.admin_immed_notify = True
        self._mlist.subscription_policy = SubscriptionPolicy.moderate
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne,
                                        pre_verified=True,
                                        pre_confirmed=True)
        # Consume the entire state machine.
        list(workflow)
        items = get_queue_messages('virgin')
        self.assertEqual(len(items), 1)
        message = items[0].msg
        self.assertEqual(message['From'], 'test-owner@example.com')
        self.assertEqual(message['To'], 'test-owner@example.com')
        self.assertEqual(
            message['Subject'],
            'New subscription request to Test from anne@example.com')
        # NOTE(review): the whitespace inside this expected payload (leading
        # indentation and any runs of spaces on the "For:"/"List:" lines)
        # could not be recovered from the extracted source -- verify it
        # against the notification template before relying on it.
        self.assertEqual(message.get_payload(), """\
Your authorization is required for a mailing list subscription request
approval:

For: anne@example.com
List: test@example.com""")

    def test_get_moderator_approval_no_notifications(self):
        # When the subscription is held for moderator approval, but the list
        # is configured not to notify immediately, no notification is sent
        # to the list moderators.
        self._mlist.admin_immed_notify = False
        self._mlist.subscription_policy = SubscriptionPolicy.moderate
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne,
                                        pre_verified=True,
                                        pre_confirmed=True)
        # Consume the entire state machine.
        list(workflow)
        items = get_queue_messages('virgin')
        self.assertEqual(len(items), 0)

    def test_send_confirmation(self):
        # A confirmation message gets sent when the address is not verified.
        anne = self._user_manager.create_address(self._anne)
        self.assertIsNone(anne.verified_on)
        # Run the workflow to model the confirmation step.
        workflow = SubscriptionWorkflow(self._mlist, anne)
        list(workflow)
        self._assert_confirmation_sent(workflow)

    def test_send_confirmation_pre_confirmed(self):
        # A confirmation message gets sent when the address is not verified
        # but the subscription is pre-confirmed.
        anne = self._user_manager.create_address(self._anne)
        self.assertIsNone(anne.verified_on)
        # Run the workflow to model the confirmation step.
        workflow = SubscriptionWorkflow(self._mlist, anne, pre_confirmed=True)
        list(workflow)
        self._assert_confirmation_sent(workflow)

    def test_send_confirmation_pre_verified(self):
        # A confirmation message gets sent even when the address is
        # pre-verified, when the subscription must be confirmed.
        self._mlist.subscription_policy = SubscriptionPolicy.confirm
        anne = self._user_manager.create_address(self._anne)
        self.assertIsNone(anne.verified_on)
        # Run the workflow to model the confirmation step.
        workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
        list(workflow)
        self._assert_confirmation_sent(workflow)

    def test_do_confirm_verify_address(self):
        # The address is not yet verified, nor are we pre-verifying. A
        # confirmation message will be sent. When the user confirms their
        # subscription request, the address will end up being verified.
        anne = self._user_manager.create_address(self._anne)
        self.assertIsNone(anne.verified_on)
        # Run the workflow to model the confirmation step.
        workflow = SubscriptionWorkflow(self._mlist, anne)
        list(workflow)
        # The address is still not verified.
        self.assertIsNone(anne.verified_on)
        confirm_workflow = SubscriptionWorkflow(self._mlist)
        confirm_workflow.token = workflow.token
        confirm_workflow.restore()
        confirm_workflow.run_thru('do_confirm_verify')
        # The address is now verified.
        self.assertIsNotNone(anne.verified_on)

    def test_do_confirmation_subscribes_user(self):
        # Subscriptions to the mailing list must be confirmed. Once that's
        # done, the user's address (which is not initially verified) gets
        # subscribed to the mailing list.
        self._mlist.subscription_policy = SubscriptionPolicy.confirm
        anne = self._user_manager.create_address(self._anne)
        self.assertIsNone(anne.verified_on)
        workflow = SubscriptionWorkflow(self._mlist, anne)
        list(workflow)
        # Anne is not yet a member.
        self._assert_not_subscribed(workflow)
        # The token is owned by the subscriber.
        self.assertIsNotNone(workflow.token)
        self.assertEqual(workflow.token_owner, TokenOwner.subscriber)
        # Confirm.
        confirm_workflow = SubscriptionWorkflow(self._mlist)
        confirm_workflow.token = workflow.token
        confirm_workflow.restore()
        list(confirm_workflow)
        self.assertIsNotNone(anne.verified_on)
        # Anne is now a member, and no further token is needed.
        self._assert_subscribed(confirm_workflow, anne)

    def test_prevent_confirmation_replay_attacks(self):
        # Ensure that if the workflow requires two confirmations, e.g. first
        # the user confirming their subscription, and then the moderator
        # approving it, that different tokens are used in these two cases.
        self._mlist.subscription_policy = (
            SubscriptionPolicy.confirm_then_moderate)
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
        # Run the state machine up to the first confirmation, and cache the
        # confirmation token.
        list(workflow)
        token = workflow.token
        # Anne is not yet a member of the mailing list.
        self._assert_not_subscribed(workflow)
        # The token is owned by the subscriber.
        self.assertIsNotNone(workflow.token)
        self.assertEqual(workflow.token_owner, TokenOwner.subscriber)
        # Resume with the subscriber's token; the workflow then stops again
        # to wait for the moderator.
        moderator_workflow = SubscriptionWorkflow(self._mlist)
        moderator_workflow.token = token
        moderator_workflow.restore()
        list(moderator_workflow)
        # The token is owned by the moderator.
        self.assertIsNotNone(moderator_workflow.token)
        self.assertEqual(moderator_workflow.token_owner, TokenOwner.moderator)
        # While we wait for the moderator to approve the subscription, note
        # that there's a new token for the next steps.
        self.assertNotEqual(token, moderator_workflow.token)
        # The old token won't work.
        final_workflow = SubscriptionWorkflow(self._mlist)
        final_workflow.token = token
        self.assertRaises(LookupError, final_workflow.restore)
        # Running this workflow will fail.
        self.assertRaises(AssertionError, list, final_workflow)
        # Anne is still not subscribed.
        self._assert_not_subscribed(final_workflow)
        # However, if we use the new token, her subscription request will be
        # approved by the moderator.
        final_workflow.token = moderator_workflow.token
        final_workflow.restore()
        list(final_workflow)
        # And now Anne is a member.
        member = self._mlist.regular_members.get_member(self._anne)
        self.assertEqual(member.address.email, self._anne)
        self.assertEqual(final_workflow.member, member)
        # No further token is needed.
        self.assertIsNone(final_workflow.token)
        self.assertEqual(final_workflow.token_owner, TokenOwner.no_one)

    def test_confirmation_needed_and_pre_confirmed(self):
        # The subscription policy is 'confirm' but the subscription is
        # pre-confirmed (and pre-approved), so the workflow runs straight
        # through to the subscription.
        self._mlist.subscription_policy = SubscriptionPolicy.confirm
        anne = self._user_manager.create_address(self._anne)
        workflow = SubscriptionWorkflow(
            self._mlist, anne,
            pre_verified=True, pre_confirmed=True, pre_approved=True)
        list(workflow)
        # Anne was subscribed.
        self.assertIsNone(workflow.token)
        self.assertEqual(workflow.token_owner, TokenOwner.no_one)
        self.assertEqual(workflow.member.address, anne)