Fix mailman shell processing of $PYTHONSTARTUP.
[mailman.git] / src / mailman / testing / helpers.py
blob3edd6647950133235c481f8f50f83d00429b08e4
1 # Copyright (C) 2008-2016 by the Free Software Foundation, Inc.
3 # This file is part of GNU Mailman.
5 # GNU Mailman is free software: you can redistribute it and/or modify it under
6 # the terms of the GNU General Public License as published by the Free
7 # Software Foundation, either version 3 of the License, or (at your option)
8 # any later version.
10 # GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 # more details.
15 # You should have received a copy of the GNU General Public License along with
16 # GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
18 """Various test helpers."""
20 import os
21 import json
22 import time
23 import uuid
24 import errno
25 import shutil
26 import signal
27 import socket
28 import logging
29 import smtplib
30 import datetime
31 import threading
33 from base64 import b64encode
34 from contextlib import contextmanager
35 from email import message_from_string
36 from httplib2 import Http
37 from lazr.config import as_timedelta
38 from mailman import public
39 from mailman.bin.master import Loop as Master
40 from mailman.config import config
41 from mailman.database.transaction import transaction
42 from mailman.email.message import Message
43 from mailman.interfaces.member import MemberRole
44 from mailman.interfaces.messages import IMessageStore
45 from mailman.interfaces.styles import IStyleManager
46 from mailman.interfaces.usermanager import IUserManager
47 from mailman.runners.digest import DigestRunner
48 from mailman.utilities.mailbox import Mailbox
49 from unittest import mock
50 from urllib.error import HTTPError
51 from urllib.parse import urlencode
52 from zope import event
53 from zope.component import getUtility
# Line separator used when joining generated multi-line text.
NL = '\n'
@public
def make_testable_runner(runner_class, name=None, predicate=None):
    """Build a runner instance that stops itself once its work is done.

    :param runner_class: The runner class to specialize.
    :type runner_class: class
    :param name: Optional queue name.  When omitted, it is derived from the
        class name by stripping the trailing ``Runner`` and lower-casing.
    :type name: string or None
    :param predicate: Optional callable deciding when the runner should stop.
        When None (the default) the runner stops as soon as its queue is
        empty.
    :type predicate: callable that gets one argument, the queue runner.
    :return: A runner instance.
    """
    if name is None:
        class_name = runner_class.__name__
        assert class_name.endswith('Runner'), (
            'Unparseable runner class name: %s' % class_name)
        # Drop the 'Runner' suffix to get the queue name.
        name = class_name[:-len('Runner')].lower()

    class EmptyingRunner(runner_class):
        """A runner that stops processing when the queue is empty."""

        def __init__(self, *args, **kws):
            super().__init__(*args, **kws)
            # Masquerade as the wrapped class so that the log files show
            # the real runner's name instead of 'EmptyingRunner'.
            self.__class__.__name__ = runner_class.__name__

        def _do_periodic(self):
            """Set the stop flag from the predicate or the queue state."""
            if predicate is not None:
                self._stop = predicate(self)
            else:
                self._stop = not self.switchboard.files

    return EmptyingRunner(name)
class _Bag:
    """A simple attribute container built from keyword arguments."""

    def __init__(self, **kws):
        # Expose every keyword argument as an instance attribute.
        for attribute in kws:
            setattr(self, attribute, kws[attribute])
@public
def get_queue_messages(queue_name, sort_on=None, expected_count=None):
    """Drain the named queue and return everything that was in it.

    :param queue_name: A string naming a queue.
    :param sort_on: The message header to sort on.  If None (the default),
        no sorting is performed.
    :param expected_count: If given and there aren't exactly this number of
        messages in the queue, raise an AssertionError.
    :return: A list of items, each exposing the message as `.msg` and the
        message metadata as `.msgdata`.
    """
    switchboard = config.switchboards[queue_name]
    items = []
    for filebase in switchboard.files:
        msg, msgdata = switchboard.dequeue(filebase)
        items.append(_Bag(msg=msg, msgdata=msgdata))
        # Mark the entry as handled so that it is removed from the queue.
        switchboard.finish(filebase)
    if expected_count is not None:
        found = len(items)
        assert found == expected_count, 'Wanted {}, got {}'.format(
            expected_count, found)
    if sort_on is not None:
        # Header values are stringified so that they compare uniformly.
        items.sort(key=lambda item: str(item.msg[sort_on]))
    return items
@public
def digest_mbox(mlist):
    """Open the mailing list's pending digest file as a mailbox.

    :param mlist: The mailing list.
    :return: A `Mailbox` on `digest.mmdf` in the list's data directory.
    """
    return Mailbox(os.path.join(mlist.data_path, 'digest.mmdf'))
# Remember, Master is mailman.bin.master.Loop.
@public
class TestableMaster(Master):
    """A testable master loop watcher."""

    def __init__(self, start_check=None):
        """Create a testable master loop watcher.

        :param start_check: Optional callable used to check whether everything
            is running as the test expects.  Called in `loop()` in the
            subthread before the event is set.  The callback should block
            until the pass condition is set.
        :type start_check: Callable taking no arguments, returning nothing.
        """
        super().__init__(restartable=False, config_file=config.filename)
        self.start_check = start_check
        # Set by loop() once all children have been seen running.
        self.event = threading.Event()
        self.thread = threading.Thread(target=self.loop)
        self.thread.daemon = True
        # Snapshot of the child pids, filled in by loop().
        self._started_kids = None

    def _pause(self):
        """See `Master`."""
        # No-op this because the tests generally do not signal the master,
        # which would mean the signal.pause() never exits.
        pass

    def start(self, *runners):
        """Start the master.

        :param runners: The runners to start; passed to `start_runners()`.
        """
        self.start_runners(runners)
        self.thread.start()
        # Wait until all the children are definitely started.
        self.event.wait()

    def stop(self):
        """Stop the master by killing all the children."""
        for pid in self.runner_pids:
            os.kill(pid, signal.SIGTERM)
        self.cleanup()
        self.thread.join()

    def loop(self):
        """Wait until all the runners are actually running before looping.

        :raises OSError: if probing a child fails for any reason other than
            the process not existing (ESRCH).
        """
        starting_kids = set(self._kids)
        while starting_kids:
            # Only probe the children not yet seen; iterate over a snapshot
            # because the set is modified inside the loop.
            for pid in list(starting_kids):
                try:
                    # Signal 0 only checks whether the process exists.
                    os.kill(pid, 0)
                except OSError as error:
                    # ESRCH means the child has not yet started, so it is
                    # retried on the next pass.  Anything else is a real
                    # error and must propagate.
                    if error.errno != errno.ESRCH:
                        raise
                else:
                    starting_kids.discard(pid)
        # Keeping a copy of all the started child processes for use by the
        # testing environment, even after all have exited.
        self._started_kids = set(self._kids)
        # If there are extra conditions to check, do it now.
        if self.start_check is not None:
            self.start_check()
        # Let the blocking thread know everything's running.
        self.event.set()
        super().loop()

    @property
    def runner_pids(self):
        """The pids of all the child runner processes.

        Only usable after `loop()` has recorded the started children.
        """
        yield from self._started_kids
class LMTP(smtplib.SMTP):
    """An SMTP client subclass that can issue the LMTP `lhlo` greeting."""

    def lhlo(self, name=''):
        """Send the `lhlo` command and record the server's reply.

        :param name: The host name to announce; when empty, the client's
            `local_hostname` is used instead.
        :return: A 2-tuple of the reply code and the reply message.
        """
        self.putcmd('lhlo', name or self.local_hostname)
        reply = self.getreply()
        # Index 1 of the reply is the message; keep it like helo() would.
        self.helo_resp = reply[1]
        return reply[0], reply[1]
@public
def get_lmtp_client(quiet=False):
    """Return a connected LMTP client.

    Connection attempts are retried every 0.1 seconds while the server
    refuses them, for up to `config.devmode.wait`.

    :param quiet: When true, do not print the server's connect response.
    :return: The connected `LMTP` client.
    :raises RuntimeError: if the server is still refusing connections when
        the wait period runs out.
    """
    # It's possible the process has started but is not yet accepting
    # connections.  Wait a little while.
    client = LMTP()
    # client.debuglevel = 1
    deadline = datetime.datetime.now() + as_timedelta(config.devmode.wait)
    while datetime.datetime.now() < deadline:
        try:
            greeting = client.connect(
                config.mta.lmtp_host, int(config.mta.lmtp_port))
        except IOError as error:
            # Only a refused connection is worth retrying.
            if error.errno != errno.ECONNREFUSED:
                raise
            time.sleep(0.1)
            continue
        if not quiet:
            print(greeting)
        return client
    raise RuntimeError('Connection refused')
@public
def get_nntp_server(cleanups):
    """Create and start an NNTP server mock.

    `nntplib.NNTP` is patched with a mock, and the callable that undoes the
    patch is appended to `cleanups`.

    :param cleanups: A list which receives the patcher's stop callable.
    :return: An object whose `get_message()` returns the message that was
        passed to the mocked server's `post()`.
    """
    nntp_patch = mock.patch('nntplib.NNTP')
    mocked_class = nntp_patch.start()
    cleanups.append(nntp_patch.stop)
    mocked_server = mocked_class()

    # A class for more convenient access to the posted message.
    class NNTPProxy:                                            # noqa
        def get_message(self):
            # call_args[0] holds the positional arguments of the last
            # post() call; the first one is a readable file-like object.
            posted = mocked_server.post.call_args[0][0]
            return specialized_message_from_string(posted.read())

    return NNTPProxy()
@public
def wait_for_webservice():
    """Wait for the REST server to start serving requests.

    A TCP connection to the configured web service host and port is
    attempted every 0.1 seconds while it is refused, for up to
    `config.devmode.wait`.

    :raises RuntimeError: if the server is still refusing connections when
        the wait period runs out.
    """
    until = datetime.datetime.now() + as_timedelta(config.devmode.wait)
    while datetime.datetime.now() < until:
        try:
            # The context manager closes the probe socket on every attempt,
            # whether or not the connection succeeded.
            with socket.socket() as probe:
                probe.connect((config.webservice.hostname,
                               int(config.webservice.port)))
        except IOError as error:
            if error.errno == errno.ECONNREFUSED:
                time.sleep(0.1)
            else:
                raise
        else:
            break
    else:
        raise RuntimeError('Connection refused')
@public
def call_api(url, data=None, method=None, username=None, password=None):
    """Call a URL with a given HTTP method and return the resulting object.

    The object will have been JSON decoded.

    :param url: The url to open, read, and print.
    :type url: string
    :param data: Data to use to POST to a URL.
    :type data: dict
    :param method: Alternative HTTP method to use.  When None, GET is used
        if there is no data, otherwise POST.
    :type method: str
    :param username: The HTTP Basic Auth user name.  None means use the value
        from the configuration.
    :type username: str
    :param password: The HTTP Basic Auth password.  None means use the value
        from the configuration.
    :type password: str
    :return: A 2-tuple containing the JSON decoded content (if there is any,
        else None) and the response object.
    :rtype: 2-tuple of (dict, response)
    :raises HTTPError: when a non-2xx return code is received.
    """
    headers = {}
    if data is not None:
        data = urlencode(data, doseq=True)
        headers['Content-Type'] = 'application/x-www-form-urlencoded'
    if method is None:
        method = ('GET' if data is None else 'POST')
    method = method.upper()
    # Body-carrying methods always get a form-encoded body, even if empty.
    if data is None and method in ('POST', 'PUT', 'PATCH'):
        data = urlencode({}, doseq=True)
    user = (config.webservice.admin_user if username is None else username)
    secret = (config.webservice.admin_pass if password is None else password)
    credentials = '{0}:{1}'.format(user, secret)
    # b64encode() requires a bytes, but the header value must be str.  Do the
    # necessary conversion dances.
    encoded = b64encode(credentials.encode('utf-8')).decode('ascii')
    headers['Authorization'] = 'Basic ' + encoded
    response, content = Http().request(url, method, data, headers)
    # If we did not get a 2xx status code, make this look like a urllib2
    # exception, for backward compatibility with existing doctests.
    if response.status // 100 != 2:
        raise HTTPError(url, response.status, content, response, None)
    if not len(content):
        return None, response
    # XXX Workaround http://bugs.python.org/issue10038
    return json.loads(content.decode('utf-8')), response
@public
@contextmanager
def event_subscribers(*subscribers):
    """Temporarily extend the Zope event subscribers list.

    The previous contents of the list are put back when the context exits,
    even if the body raises.

    :param subscribers: A sequence of event subscribers.
    :type subscribers: sequence of callables, each receiving one argument, the
        event.
    """
    saved = list(event.subscribers)
    event.subscribers.extend(subscribers)
    try:
        yield
    finally:
        # Restore in place so that the list object itself is kept.
        event.subscribers[:] = saved
@public
class configuration:
    """A decorator/context manager for temporarily setting configurations.

    On entry, a configuration containing a single section with the given
    key/value pairs is pushed onto the global `config`; on exit, it is
    popped again.
    """

    def __init__(self, section, **kws):
        """Remember the section and values to apply.

        :param section: The name of the configuration section to override.
        :param kws: The keys and values to set in that section.  The special
            keyword `_configname`, if given, is removed from the values and
            used as the name of the pushed configuration.
        """
        self._section = section
        # Most tests don't care about the name given to the temporary
        # configuration.  Usually we'll just craft a random one, but some
        # tests do care, so give them a hook to set it.
        if '_configname' in kws:
            self._uuid = kws.pop('_configname')
        else:
            self._uuid = uuid.uuid4().hex
        self._values = kws.copy()

    def _apply(self):
        """Push the temporary configuration."""
        lines = ['[{0}]'.format(self._section)]
        for key, value in self._values.items():
            lines.append('{0}: {1}'.format(key, value))
        config.push(self._uuid, NL.join(lines))

    def _remove(self):
        """Pop the temporary configuration."""
        config.pop(self._uuid)

    def __enter__(self):
        self._apply()

    def __exit__(self, *exc_info):
        self._remove()
        # Do not suppress exceptions.
        return False

    def __call__(self, func):
        """Decorate `func` so that it runs with the configuration applied.

        :param func: The callable to wrap.
        :return: A wrapper which applies the configuration, calls `func`,
            and always removes the configuration afterward.
        """
        from functools import wraps

        # Preserve the wrapped callable's name and docstring.
        @wraps(func)
        def wrapper(*args, **kws):
            self._apply()
            try:
                return func(*args, **kws)
            finally:
                self._remove()
        return wrapper
@public
@contextmanager
def temporary_db(db):
    """Temporarily install `db` as `config.db`.

    The previous database is put back when the context exits, even if the
    body raises.

    :param db: The database to use for the duration of the context.
    """
    saved_db, config.db = config.db, db
    try:
        yield
    finally:
        config.db = saved_db
@public
class chdir:
    """Context manager that changes the working directory temporarily.

    The directory that was current on entry is restored on exit.
    """

    def __init__(self, directory):
        self._directory = directory
        # Filled in on entry with the directory to return to.
        self._curdir = None

    def __enter__(self):
        self._curdir = os.getcwd()
        os.chdir(self._directory)

    def __exit__(self, exc_type, exc_val, exc_tb):
        os.chdir(self._curdir)
        # A false return value lets any exception propagate.
        return False
@public
def subscribe(mlist, first_name, role=MemberRole.member, email=None):
    """Subscribe a sample person to a mailing list.

    An existing user or address matching the email is reused; otherwise a new
    user is created.

    :param mlist: The mailing list to subscribe to.
    :param first_name: The person's first name, used to build the full name
        "<first_name> Person" and, if needed, the default email address.
    :param role: The role to subscribe with.
    :param email: The email address to subscribe.  When None, the address is
        "<x>person@example.com", where <x> is the lower-cased first letter
        of `first_name`.
    :return: The newly created member object.
    """
    user_manager = getUtility(IUserManager)
    if email is None:
        email = '{}person@example.com'.format(first_name[0].lower())
    full_name = '{} Person'.format(first_name)
    with transaction():
        address = None
        person = user_manager.get_user(email)
        if person is None:
            # No user is linked to this email; maybe a bare address exists.
            address = user_manager.get_address(email)
            if address is None:
                person = user_manager.create_user(email, full_name)
        if address is None:
            # Subscribe the first address of the found or created user.
            subscription_address = list(person.addresses)[0]
        else:
            subscription_address = address
        mlist.subscribe(subscription_address, role)
    return mlist.get_roster(role).get_member(email)
@public
def reset_the_world():
    """Reset everything:

    * Clear out the database
    * Remove all residual queue and digest files
    * Clear the message store
    * Reset the global style manager
    * Flush the dynamic header-match rules

    This should be as thorough a reset of the system as necessary to keep
    tests isolated.
    """
    # Reset the database between tests.
    config.db._reset()
    # Remove any digest files and members.txt file (for the file-recips
    # handler) in the lists' data directories.
    for directory, _, names in os.walk(config.LIST_DATA_DIR):
        for name in names:
            if name == 'members.txt' or name.endswith('.mmdf'):
                os.remove(os.path.join(directory, name))
    # Remove all residual queue files.
    for directory, _, names in os.walk(config.QUEUE_DIR):
        for name in names:
            os.remove(os.path.join(directory, name))
    # Clear out messages in the message store.
    store = getUtility(IMessageStore)
    with transaction():
        for stored in store.messages:
            store.delete_message(stored['message-id'])
    # Delete any other residual messages.
    for directory, _, names in os.walk(config.MESSAGES_DIR):
        for name in names:
            os.remove(os.path.join(directory, name))
        # Once its files are gone, remove the directory itself.
        shutil.rmtree(directory)
    # Reset the global style manager.
    getUtility(IStyleManager).populate()
    # Remove all dynamic header-match rules.
    config.chains['header-match'].flush()
@public
def specialized_message_from_string(unicode_text):
    """Parse text into a message object.

    This is specialized in the sense that an instance of Mailman's own Message
    object is returned, and this message object has an attribute
    `original_size` which is set to the length of the given text.

    Also, the text must be ASCII-only unicode.

    :param unicode_text: The text of the message.
    :return: The parsed message.
    """
    # This mimics what Switchboard.dequeue() does when parsing a message from
    # text into a Message instance.
    parsed = message_from_string(unicode_text, Message)
    parsed.original_size = len(unicode_text)
    return parsed
@public
class LogFileMark:
    """Remember the current end of a log file, to read what comes after.

    The log file is the one named by the `filename` attribute of the first
    handler of the given logger.
    """

    def __init__(self, log_name):
        self._log = logging.getLogger(log_name)
        self._filename = self._log.handlers[0].filename
        # The file's size right now is where later reads start from.
        self._filepos = os.stat(self._filename).st_size

    def readline(self):
        """Return the first line written after the mark."""
        with open(self._filename) as log_file:
            log_file.seek(self._filepos)
            line = log_file.readline()
        return line

    def read(self):
        """Return everything written after the mark."""
        with open(self._filename) as log_file:
            log_file.seek(self._filepos)
            text = log_file.read()
        return text
@public
def make_digest_messages(mlist, msg=None):
    """Feed a message to the list's digest and run the digest runner.

    :param mlist: The mailing list.
    :param msg: The message to add to the digest.  When None, a small sample
        message addressed to the list is used.
    """
    if msg is None:
        sample_text = """\
From: anne@example.org
To: {listname}
Message-ID: <testing>

message triggering a digest
""".format(listname=mlist.fqdn_listname)
        msg = specialized_message_from_string(sample_text)
    digest_path = os.path.join(mlist.data_path, 'digest.mmdf')
    config.handlers['to-digest'].process(mlist, msg, {})
    config.switchboards['digest'].enqueue(
        msg,
        listname=mlist.fqdn_listname,
        digest_path=digest_path,
        volume=1, digest_number=1)
    # Process the digest queue until it is empty.
    make_testable_runner(DigestRunner, 'digest').run()
@public
def set_preferred(user):
    """Verify the user's first address and make it the preferred one.

    :param user: The user whose preferred address is set.
    :return: The address that is now preferred.
    """
    # Avoid circular imports.
    from mailman.utilities.datetime import now
    first_address = list(user.addresses)[0]
    first_address.verified_on = now()
    user.preferred_address = first_address
    return first_address
@public
@contextmanager
def hackenv(envar, new_value):
    """Hack the environment temporarily, then reset it.

    :param envar: The name of the environment variable.
    :param new_value: The value to set for the duration of the context, or
        None to make sure the variable is unset.
    """
    saved_value = os.environ.get(envar)
    if new_value is not None:
        os.environ[envar] = new_value
    else:
        os.environ.pop(envar, None)
    try:
        yield
    finally:
        # Put back the previous value, or unset the variable if it had none.
        if saved_value is not None:
            os.environ[envar] = saved_value
        else:
            os.environ.pop(envar, None)