Fix GitLab pipeline status badge link.
[mailman.git] / src / mailman / testing / helpers.py
blob 87f591f29a53f4a6caf228564561e7006a9f6992
1 # Copyright (C) 2008-2023 by the Free Software Foundation, Inc.
3 # This file is part of GNU Mailman.
5 # GNU Mailman is free software: you can redistribute it and/or modify it under
6 # the terms of the GNU General Public License as published by the Free
7 # Software Foundation, either version 3 of the License, or (at your option)
8 # any later version.
10 # GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 # more details.
15 # You should have received a copy of the GNU General Public License along with
16 # GNU Mailman. If not, see <https://www.gnu.org/licenses/>.
18 """Various test helpers."""
20 import os
21 import sys
22 import time
23 import uuid
24 import shutil
25 import signal
26 import socket
27 import logging
28 import smtplib
29 import datetime
30 import threading
32 from contextlib import contextmanager, suppress
33 from email import message_from_bytes, message_from_string
34 from lazr.config import as_timedelta
35 from mailman.bin.master import Loop as Master
36 from mailman.config import config
37 from mailman.database.transaction import transaction
38 from mailman.email.message import Message
39 from mailman.interfaces.action import Action
40 from mailman.interfaces.member import DeliveryStatus, MemberRole
41 from mailman.interfaces.messages import IMessageStore
42 from mailman.interfaces.styles import IStyleManager
43 from mailman.interfaces.usermanager import IUserManager
44 from mailman.runners.digest import DigestRunner
45 from mailman.utilities.mailbox import Mailbox
46 from public import public
47 from requests import request
48 from unittest import mock
49 from urllib.error import HTTPError
50 from zope import event
51 from zope.component import getUtility
# Newline separator; used in this module to join the lines of a temporary
# configuration section before it is pushed onto the configuration stack.
NL = '\n'
@public
def make_testable_runner(runner_class, name=None, predicate=None):
    """Build a runner instance that stops itself once its work is done.

    :param runner_class: The runner class to specialize.
    :type runner_class: class
    :param name: Optional queue name.  When omitted, it is derived from the
        class name by dropping the trailing `Runner` and lower-casing the
        remainder.
    :type name: string or None
    :param predicate: Optional callable deciding when the runner stops.
        When None (the default), the runner stops when its queue is empty,
        or right away if it has no switchboard at all.
    :type predicate: callable that gets one argument, the queue runner.
    :return: A runner instance.
    """
    if name is None:
        class_name = runner_class.__name__
        assert class_name.endswith('Runner'), (
            'Unparseable runner class name: %s' % class_name)
        name = class_name[:-len('Runner')].lower()

    class EmptyingRunner(runner_class):
        """A runner that halts as soon as there is nothing left to do."""

        def __init__(self, *args, **kws):
            super().__init__(*args, **kws)
            # Masquerade as the wrapped runner class so that the log files
            # show the real runner's name rather than 'EmptyingRunner'.
            self.__class__.__name__ = runner_class.__name__

        def _do_periodic(self):
            """Do the normal periodic work, then decide whether to stop."""
            super()._do_periodic()
            if predicate is not None:
                # The caller supplied its own stopping rule.
                self._stop = predicate(self)
            elif self.switchboard is None:
                # No queue to drain, so there is nothing to wait for.
                self._stop = True
            else:
                self._stop = (len(self.switchboard.files) == 0)

    return EmptyingRunner(name)
99 class _Bag:
100 def __init__(self, **kws):
101 for key, value in kws.items():
102 setattr(self, key, value)
@public
def get_queue_messages(queue_name, sort_on=None, expected_count=None):
    """Drain the named queue and return everything that was in it.

    :param queue_name: A string naming a queue.
    :param sort_on: The message header to sort on.  If None (the default),
        no sorting is performed.
    :param expected_count: If given and the queue did not hold exactly this
        many messages, every drained message is printed to stderr and an
        AssertionError is raised.
    :return: A list of items, each with a `msg` attribute (the message) and
        a `msgdata` attribute (the message metadata).
    """
    switchboard = config.switchboards[queue_name]
    drained = []
    for filebase in switchboard.files:
        msg, msgdata = switchboard.dequeue(filebase)
        drained.append(_Bag(msg=msg, msgdata=msgdata))
        switchboard.finish(filebase)
    if expected_count is not None and len(drained) != expected_count:
        # Dump what was actually found to make the failure debuggable.
        for entry in drained:
            print(entry.msg, file=sys.stderr)
        raise AssertionError('Wanted {}, got {}'.format(
            expected_count, len(drained)))
    if sort_on is not None:
        def header_value(entry):
            # Header values are compared as strings.
            return str(entry.msg[sort_on])
        drained.sort(key=header_value)
    return drained
@public
def digest_mbox(mlist):
    """Open the mailing list's pending digest file as a mailbox.

    :param mlist: The mailing list.
    :return: A `Mailbox` for the `digest.mmdf` file in the list's data
        directory.
    """
    return Mailbox(os.path.join(mlist.data_path, 'digest.mmdf'))
# Remember, Master is mailman.bin.master.Loop.
@public
class TestableMaster(Master):
    """A testable master loop watcher."""

    def __init__(self, start_check=None):
        """Create a testable master loop watcher.

        :param start_check: Optional callable used to check whether everything
            is running as the test expects.  Called in `loop()` in the
            subthread before the event is set.  The callback should block
            until the pass condition is set.
        :type start_check: Callable taking no arguments, returning nothing.
        """
        super().__init__(
            restartable=False,
            config_file=os.environ.get('MAILMAN_CONFIG_FILE', config.filename))
        self.start_check = start_check
        # Set by `loop()` once all children have been seen running;
        # `start()` blocks on it.
        self.event = threading.Event()
        self.thread = threading.Thread(target=self.loop)
        self.thread.daemon = True
        # Populated by `loop()`; stays None until then.
        self._started_kids = None

    def _pause(self):
        """See `Master`."""
        # No-op this because the tests generally do not signal the master,
        # which would mean the signal.pause() never exits.
        pass

    def start(self, *runners):
        """Start the master.

        :param runners: The runners to start; passed through to
            `start_runners()`.
        """
        self.start_runners(runners)
        self.thread.start()
        # Wait until all the children are definitely started.
        self.event.wait()

    def stop(self):
        """Stop the master by killing all the children."""
        for pid in self.runner_pids:
            os.kill(pid, signal.SIGTERM)
        self.cleanup()
        self.thread.join()

    def loop(self):
        """Wait until all the runners are actually running before looping."""
        pending = set(self._kids)
        while pending:
            # Probe only the children not yet confirmed.  Iterate over a
            # snapshot because the set shrinks as children are confirmed;
            # re-probing an already confirmed pid and removing it a second
            # time would otherwise raise KeyError.
            for pid in list(pending):
                # Ignore the exception which gets raised when the child has
                # not yet started.
                with suppress(ProcessLookupError):
                    os.kill(pid, 0)
                    pending.discard(pid)
        # Keeping a copy of all the started child processes for use by the
        # testing environment, even after all have exited.
        self._started_kids = set(self._kids)
        # If there are extra conditions to check, do it now.
        if self.start_check is not None:
            self.start_check()
        # Let the blocking thread know everything's running.
        self.event.set()
        super().loop()

    @property
    def runner_pids(self):
        """The pids of all the child runner processes."""
        yield from self._started_kids
class LMTP(smtplib.SMTP):
    """An SMTP client that greets the server with LHLO, as LMTP requires."""

    def lhlo(self, name=''):
        """Send the LHLO command and return the server's reply.

        :param name: The host name to announce; when empty, the client's
            local host name is used.
        :return: A 2-tuple of the reply code and the reply message.
        """
        announced = name or self.local_hostname
        self.putcmd('lhlo', announced)
        status, reply = self.getreply()
        # Remember the greeting response, as is done for HELO.
        self.helo_resp = reply
        return status, reply
@public
def get_lmtp_client(quiet=False):
    """Return a connected LMTP client.

    :param quiet: When false (the default), the server's connect response
        is printed.
    :return: The connected `LMTP` client.
    :raises RuntimeError: when the connection is still being refused after
        the configured devmode wait period.
    """
    client = LMTP()
    # client.debuglevel = 1
    # The server process may have started without accepting connections
    # yet, so keep retrying for a little while.
    deadline = datetime.datetime.now() + as_timedelta(config.devmode.wait)
    while datetime.datetime.now() < deadline:
        try:
            response = client.connect(
                config.mta.lmtp_host, int(config.mta.lmtp_port))
        except ConnectionRefusedError:
            time.sleep(0.1)
            continue
        if not quiet:
            print(response)
        return client
    raise RuntimeError('Connection refused')
@public
def get_nntp_server(cleanups):
    """Patch `nntplib.NNTP` with a mock and return a proxy onto it.

    The proxy can be used to retrieve the posted message for verification.

    :param cleanups: An object whose `callback()` method is used to register
        the un-patching of `nntplib.NNTP`.
    :return: An object with a `get_message()` method.
    """
    nntp_patch = mock.patch('nntplib.NNTP')
    mocked_class = nntp_patch.start()
    cleanups.callback(nntp_patch.stop)
    nntpd = mocked_class()

    class NNTPProxy:
        """Convenient access to the message given to the mocked post()."""

        def get_message(self):
            # The first positional argument of the most recent post() call
            # is a file-like object holding the message bytes.
            posted_file = nntpd.post.call_args[0][0]
            return message_from_bytes(posted_file.read())

    return NNTPProxy()
@public
def wait_for_webservice(hostname=None, port=None):
    """Wait for the REST server to start serving requests.

    :param hostname: The host to probe.  None means use the value from the
        configuration.
    :param port: The port to probe.  None means use the value from the
        configuration.
    :raises RuntimeError: when the connection is still being refused after
        the configured devmode wait period.
    """
    hostname = config.webservice.hostname if hostname is None else hostname
    port = int(config.webservice.port) if port is None else port
    until = datetime.datetime.now() + as_timedelta(config.devmode.wait)
    while datetime.datetime.now() < until:
        try:
            # Use the socket as a context manager so that the probe is closed
            # whether or not the connection attempt succeeds; otherwise every
            # attempt would leave an open descriptor behind.
            with socket.socket() as probe:
                probe.connect((hostname, port))
        except ConnectionRefusedError:
            time.sleep(0.1)
        else:
            break
    else:
        # The deadline passed without a single successful connection.
        raise RuntimeError('Connection refused')
@public
def call_api(url, data=None, method=None, username=None, password=None,
             headers=None, json=None):
    """Call a URL with a given HTTP method and return the resulting object.

    The object will have been JSON decoded.

    :param url: The url to open, read, and print.
    :type url: string
    :param data: Data to use to POST to a URL.
    :type data: dict
    :param method: Alternative HTTP method to use.  None means GET when
        neither `data` nor `json` is given, and POST otherwise.
    :type method: str
    :param username: The HTTP Basic Auth user name.  None means use the value
        from the configuration.
    :type username: str
    :param password: The HTTP Basic Auth password.  None means use the value
        from the configuration.
    :type password: str
    :param headers: List of additional headers to be included with the request.
    :type headers: A dictionary of {'Header': 'Value'}
    :param json: JSON body for the request.
    :type json: dict
    :return: A 2-tuple containing the JSON decoded content (if there is any,
        else None) and the response object.
    :rtype: 2-tuple of (dict, response)
    :raises HTTPError: when a non-2xx return code is received.
    """
    if method is None:
        if data is None and json is None:
            method = 'GET'
        else:
            method = 'POST'
    method = method.upper()
    basic_auth = (
        (config.webservice.admin_user if username is None else username),
        (config.webservice.admin_pass if password is None else password))
    response = request(
        method, url, data=data, auth=basic_auth, headers=headers, json=json)
    # For backward compatibility with existing doctests, turn non-2xx response
    # codes into a urllib.error exceptions.
    if response.status_code // 100 != 2:
        # An error response may carry no Content-Type header at all; fall
        # back to the empty string so the membership test below cannot fail
        # with a TypeError that would hide the real HTTP error.
        content_type = response.headers.get('content-type') or ''
        if 'application/json' in content_type and response.content:
            content = response.json().get('description', None)
        else:
            content = response.text
        raise HTTPError(
            url, response.status_code, content, response, None)
    if len(response.content) == 0:
        return None, response
    return response.json(), response
@public
@contextmanager
def event_subscribers(*subscribers):
    """Add Zope event subscribers for the duration of the `with` block.

    On exit, the subscribers list is restored to what it held on entry.

    :param subscribers: A sequence of event subscribers.
    :type subscribers: sequence of callables, each receiving one argument, the
        event.
    """
    saved = list(event.subscribers)
    event.subscribers.extend(subscribers)
    try:
        yield
    finally:
        # Restore in place so the list object itself is left untouched.
        event.subscribers[:] = saved
@public
class configuration:
    """A decorator/context manager for temporarily setting configurations."""

    def __init__(self, section, **kws):
        """Remember which section and values to push.

        :param section: The name of the configuration section to override.
        :param kws: The `key: value` settings for that section.  The special
            keyword `_configname` is not a setting; it names the pushed
            configuration.
        """
        self._section = section
        # Most tests don't care about the name given to the temporary
        # configuration.  Usually we'll just craft a random one, but some
        # tests do care, so give them a hook to set it.
        if '_configname' in kws:
            self._uuid = kws.pop('_configname')
        else:
            self._uuid = uuid.uuid4().hex
        self._values = kws.copy()

    def _apply(self):
        """Push the section and its settings onto the configuration."""
        lines = ['[{}]'.format(self._section)]
        for key, value in self._values.items():
            lines.append('{}: {}'.format(key, value))
        config.push(self._uuid, NL.join(lines))

    def _remove(self):
        """Pop the configuration pushed by `_apply()`."""
        config.pop(self._uuid)

    def __enter__(self):
        self._apply()
        return self

    def __exit__(self, *exc_info):
        self._remove()
        # Do not suppress exceptions.
        return False

    def __call__(self, func):
        """Decorate `func` so it runs with the temporary configuration."""
        # Imported here to keep this helper self-contained.
        from functools import wraps

        # Preserve the wrapped function's name, docstring and attributes so
        # that anything inspecting the decorated function still sees them.
        @wraps(func)
        def wrapper(*args, **kws):
            self._apply()
            try:
                return func(*args, **kws)
            finally:
                self._remove()
        return wrapper
@public
@contextmanager
def temporary_db(db):
    """Swap in `db` as `config.db` for the duration of the `with` block.

    :param db: The object to install as `config.db`.
    """
    original_db = config.db
    config.db = db
    try:
        yield
    finally:
        # Always put back whatever was installed on entry.
        config.db = original_db
@public
class chdir:
    """Context manager that changes directory and changes back on exit."""

    def __init__(self, directory):
        # The directory to switch to on entry.
        self._directory = directory
        # Where we came from; recorded on entry.
        self._curdir = None

    def __enter__(self):
        # Record the current directory first so it can be restored.
        self._curdir = os.getcwd()
        os.chdir(self._directory)

    def __exit__(self, exc_type, exc_val, exc_tb):
        os.chdir(self._curdir)
        # A false return value lets any exception propagate.
        return False
@public
def subscribe(mlist, first_name, role=MemberRole.member, email=None,
              as_user=False):
    """Subscribe a sample person to a mailing list.

    :param mlist: The mailing list to subscribe to.
    :param first_name: The person's first name.  The display name becomes
        '<first_name> Person'.
    :param role: The role to subscribe with.
    :param email: The email address to use.  None means derive it from the
        lower-cased first letter of `first_name`, e.g. 'aperson@example.com'.
    :param as_user: When true, subscribe the user object; otherwise subscribe
        an address.
    :return: The newly created member object.
    """
    user_manager = getUtility(IUserManager)
    if email is None:
        email = '{}person@example.com'.format(first_name[0].lower())
    full_name = '{} Person'.format(first_name)
    with transaction():
        person = user_manager.get_user(email)
        if person is not None:
            # A user already exists for this email.
            if as_user:
                if person.preferred_address is None:
                    set_preferred(person)
                subscription_object = person
            else:
                subscription_object = list(person.addresses)[0]
        else:
            address = user_manager.get_address(email)
            if address is None:
                # Neither a user nor an address exists yet; create both.
                person = user_manager.create_user(email, full_name)
                set_preferred(person)
                subscription_object = (
                    person if as_user else person.preferred_address)
            elif as_user:
                # Only the address exists; create a user and link them.
                person = user_manager.create_user(display_name=full_name)
                person.link(address)
                set_preferred(person)
                subscription_object = person
            else:
                subscription_object = address
        mlist.subscribe(subscription_object, role)
    roster = mlist.get_roster(role)
    return roster.get_member(email)
@public
def reset_the_world():
    """Reset everything:

    * Clear out the database
    * Remove all residual queue and digest files
    * Clear the message store
    * Reset the global style manager

    This should be as thorough a reset of the system as necessary to keep
    tests isolated.
    """
    # Start each test from a freshly reset database.
    config.db._reset()
    # In the lists' data directories, delete digest files and the
    # members.txt file (used by the file-recips handler).
    for folder, _subdirs, names in os.walk(config.LIST_DATA_DIR):
        for name in names:
            if name.endswith('.mmdf') or name == 'members.txt':
                os.remove(os.path.join(folder, name))
    # Delete every file left over in the queue directories.
    for folder, _subdirs, names in os.walk(config.QUEUE_DIR):
        for name in names:
            os.remove(os.path.join(folder, name))
    # Empty the message store.
    store = getUtility(IMessageStore)
    with transaction():
        for stored in store.messages:
            store.delete_message(stored['message-id'])
    # Delete whatever message files still remain on disk.
    for folder, _subdirs, names in os.walk(config.MESSAGES_DIR):
        for name in names:
            os.remove(os.path.join(folder, name))
        shutil.rmtree(folder)
    # Recursively delete each entry of the cache directory.
    for entry in os.listdir(config.CACHE_DIR):
        shutil.rmtree(os.path.join(config.CACHE_DIR, entry))
    # Re-populate the global style manager.
    getUtility(IStyleManager).populate()
    # Drop all dynamically added header-match rules.
    config.chains['header-match'].flush()
    # Delete the cached organizational domain suffix file, if there is one.
    from mailman.rules.dmarc import LOCAL_FILE_NAME
    with suppress(FileNotFoundError):
        os.remove(os.path.join(config.VAR_DIR, LOCAL_FILE_NAME))
@public
def specialized_message_from_string(unicode_text):
    """Parse text into an instance of Mailman's own Message class.

    The returned message additionally carries an `original_size` attribute,
    set to the length of the text it was parsed from.

    The text must be ASCII-only unicode.

    :param unicode_text: The text of the message.
    :return: The parsed message.
    """
    # This mimics what Switchboard.dequeue() does when parsing a message from
    # text into a Message instance.
    size = len(unicode_text)
    parsed = message_from_string(unicode_text, Message)
    parsed.original_size = size
    return parsed
@public
class LogFileMark:
    """Remember the current end of a log file and read what follows it."""

    def __init__(self, log_name):
        """Mark the current size of the named logger's file.

        :param log_name: The name of the logger.  The file is taken from the
            `filename` attribute of the logger's first handler.
        """
        self._log = logging.getLogger(log_name)
        self._filename = self._log.handlers[0].filename
        # Anything written after this offset is new since the mark.
        self._filepos = os.path.getsize(self._filename)

    def readline(self):
        """Return the first line written to the file after the mark."""
        with open(self._filename) as log_file:
            log_file.seek(self._filepos)
            return log_file.readline()

    def read(self):
        """Return everything written to the file after the mark."""
        with open(self._filename) as log_file:
            log_file.seek(self._filepos)
            return log_file.read()
@public
def make_digest_messages(mlist, msg=None):
    """Feed a message through digesting and run the digest runner.

    :param mlist: The mailing list.
    :param msg: The message to digest.  None means use a small canned
        message addressed to the list.
    """
    if msg is None:
        msg = specialized_message_from_string("""\
From: anne@example.org
To: {listname}
Message-ID: <testing>

message triggering a digest
""".format(listname=mlist.fqdn_listname))
    digest_path = os.path.join(mlist.data_path, 'digest.mmdf')
    # Let the to-digest handler process the message first ...
    config.handlers['to-digest'].process(mlist, msg, {})
    # ... then queue it for the digest runner and run that runner until its
    # queue is empty.
    config.switchboards['digest'].enqueue(
        msg,
        listname=mlist.fqdn_listname,
        digest_path=digest_path,
        volume=1, digest_number=1)
    make_testable_runner(DigestRunner, 'digest').run()
@public
def set_preferred(user):
    """Verify the user's first address and make it their preferred one.

    :param user: The user to update.
    :return: The address that is now preferred.
    """
    # Imported here to avoid circular imports.
    from mailman.utilities.datetime import now
    first_address = list(user.addresses)[0]
    first_address.verified_on = now()
    user.preferred_address = first_address
    return first_address
@public
@contextmanager
def hackenv(envar, new_value):
    """Temporarily set or unset an environment variable.

    :param envar: The name of the environment variable.
    :param new_value: The value to set for the duration of the `with` block;
        None means the variable is removed instead.
    """
    saved = os.getenv(envar)
    if new_value is not None:
        os.environ[envar] = new_value
    else:
        os.environ.pop(envar, None)
    try:
        yield
    finally:
        # Put the variable back the way it was found: either its previous
        # value, or absent.
        if saved is not None:
            os.environ[envar] = saved
        else:
            os.environ.pop(envar, None)
def nose2_start_test_run_callback(plugin):
    """Switch the test layers into testing mode at the start of a run.

    :param plugin: An object whose `stderr` attribute requests logging to
        stderr.  The same is requested by a non-blank `MM_VERBOSE_TESTLOG`
        environment variable.
    """
    from mailman.testing.layers import ConfigLayer, MockAndMonkeyLayer
    MockAndMonkeyLayer.testing_mode = True
    verbose_setting = os.environ.get('MM_VERBOSE_TESTLOG', '').strip()
    if plugin.stderr or verbose_setting:
        ConfigLayer.stderr = True
def set_moderation(mlist, subscriber, action):
    """Set the moderation action for a subscriber of the mailing list.

    :param mlist: The mailing list.
    :param subscriber: Identifies the member; passed to the members roster's
        `get_member()`.
    :param action: The name of an `Action` enum item.
    """
    membership = mlist.members.get_member(subscriber)
    membership.moderation_action = Action[action]
def set_delivery(mlist, subscriber, delivery_status):
    """Set the delivery status preference for a subscriber of the list.

    :param mlist: The mailing list.
    :param subscriber: Identifies the member; passed to the members roster's
        `get_member()`.
    :param delivery_status: The name of a `DeliveryStatus` enum item.
    """
    membership = mlist.members.get_member(subscriber)
    membership.preferences.delivery_status = DeliveryStatus[delivery_status]