Merge branch 'alias' into 'master'
[mailman.git] / src / mailman / core / runner.py
blob19686d101b5566dce132fa9cae19ac84049805ad
1 # Copyright (C) 2001-2019 by the Free Software Foundation, Inc.
3 # This file is part of GNU Mailman.
5 # GNU Mailman is free software: you can redistribute it and/or modify it under
6 # the terms of the GNU General Public License as published by the Free
7 # Software Foundation, either version 3 of the License, or (at your option)
8 # any later version.
10 # GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 # more details.
15 # You should have received a copy of the GNU General Public License along with
16 # GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
18 """The process runner base class."""
20 import time
21 import signal
22 import logging
23 import traceback
25 from contextlib import suppress
26 from io import StringIO
27 from lazr.config import as_boolean, as_timedelta
28 from mailman.config import config
29 from mailman.core.i18n import _
30 from mailman.core.logging import reopen
31 from mailman.core.switchboard import Switchboard
32 from mailman.interfaces.languages import ILanguageManager
33 from mailman.interfaces.listmanager import IListManager
34 from mailman.interfaces.runner import (
35 IRunner, RunnerCrashEvent, RunnerInterrupt)
36 from mailman.utilities.string import expand
37 from public import public
38 from zope.component import getUtility
39 from zope.event import notify
40 from zope.interface import implementer
# Module-level loggers shared by every method of the runner below.
rlog = logging.getLogger('mailman.runner')    # signal/lifecycle messages
elog = logging.getLogger('mailman.error')     # failures and tracebacks
dlog = logging.getLogger('mailman.debug')     # per-file progress tracing
48 @public
49 @implementer(IRunner)
50 class Runner:
51 is_queue_runner = True
def __init__(self, name, slice=None):
    """Create a runner.

    :param name: The runner's name.  It selects the `runner.<name>`
        configuration section and, for queue runners, is handed to the
        switchboard and made available as the `name` path substitution.
    :type name: str
    :param slice: The slice number for this runner.  This is passed
        directly to the underlying `ISwitchboard` object.  This is ignored
        for runners that don't manage a queue.
    :type slice: int or None
    """
    self.name = name
    # Grab the configuration section.
    section = getattr(config, 'runner.' + name)
    # Work on a private copy so that adding 'name' can never leak into
    # whatever mapping `config.paths` hands back.
    substitutions = dict(config.paths)
    substitutions['name'] = name
    numslices = int(section.instances)
    # Check whether the runner is queue runner or not; non-queue runner
    # should not have queue_directory or switchboard instance.
    if self.is_queue_runner:
        self.queue_directory = expand(section.path, None, substitutions)
        self.switchboard = Switchboard(
            name, self.queue_directory, slice, numslices, True)
    else:
        self.queue_directory = None
        self.switchboard = None
    self.sleep_time = as_timedelta(section.sleep_time)
    # sleep_time is a timedelta; time.sleep() wants float seconds.
    self.sleep_float = self.sleep_time.total_seconds()
    self.max_restarts = int(section.max_restarts)
    self.start = as_boolean(section.start)
    # Stop flag, set by stop() and read by run() and _short_circuit().
    self._stop = False
    # Set to the signal number by signal_handler() when a stop signal
    # arrives; zero until then.
    self.status = 0
86 def __repr__(self):
87 return '<{} at {:#x}>'.format(self.__class__.__name__, id(self))
def signal_handler(self, signum, frame):                # pragma: nocover
    """Handle a signal delivered to the runner process.

    SIGHUP reopens the logs and lets the runner carry on.  SIGTERM, SIGINT
    and SIGUSR1 call `stop()`, record the signal number in `self.status`
    and raise `RunnerInterrupt`.  Any other signal is ignored.

    :param signum: The number of the delivered signal.
    :param frame: The interrupted stack frame; not used.
    :raises RunnerInterrupt: After a SIGTERM, SIGINT or SIGUSR1.
    """
    if signum == signal.SIGHUP:
        reopen()
        rlog.info('{} runner caught SIGHUP. Reopening logs.'.format(
            self.name))
        return
    # The signals that stop the runner, with the names used when logging.
    stop_signals = {
        signal.SIGTERM: 'SIGTERM',
        signal.SIGINT: 'SIGINT',
        signal.SIGUSR1: 'SIGUSR1',
        }
    if signum not in stop_signals:
        return
    self.stop()
    self.status = signum
    rlog.info('{} runner caught {}. Stopping.'.format(
        self.name, stop_signals[signum]))
    # As of Python 3.5, PEP 475 gets in our way.  A runner sleeping for a
    # long time in _snooze() (e.g. the retry runner) has its time.sleep()
    # system call automatically retried at the C layer, so merely setting
    # the stop flag would not wake it up.  The only reliable way to break
    # out is to raise an exception from the signal handler.  The standard
    # run() method suppresses this exception, which ends its loop -- just
    # what we want.  Runners which implement their own run() method must
    # be prepared to catch RunnerInterrupt, usually also ignoring it.
    raise RunnerInterrupt
def set_signals(self):
    """Install `signal_handler` for SIGHUP, SIGINT, SIGTERM and SIGUSR1."""
    handled = (
        signal.SIGHUP,
        signal.SIGINT,
        signal.SIGTERM,
        signal.SIGUSR1,
        )
    for signum in handled:
        signal.signal(signum, self.signal_handler)
def stop(self):
    """Request that the runner stop by setting its stop flag.

    This only sets `self._stop`; `run()` and `_short_circuit()` are the
    places where the flag is read.
    """
    self._stop = True
def run(self):
    """Run the main loop until the stop flag is set, then clean up.

    Each pass processes the queue once, does the subclass's periodic work
    and, unless a stop was requested, lets the runner snooze.  A
    `KeyboardInterrupt` or `RunnerInterrupt` raised inside the loop ends
    it quietly.  `_clean_up()` is called once the loop is over.
    """
    with suppress(KeyboardInterrupt, RunnerInterrupt):
        stopping = False
        while not stopping:
            # One trip through all the files in the queue directory.
            processed = self._one_iteration()
            # Periodic work supplied by the subclass.
            self._do_periodic()
            # Sample the stop flag only after the work above, so at least
            # one full pass always happens.
            stopping = self._stop
            if not stopping:
                # Let the runner nap; it gets the file count so it can
                # decide whether to go straight back to work instead.
                self._snooze(processed)
    self._clean_up()
def _one_iteration(self):
    """Make one pass over the files currently listed by the switchboard.

    Every file is dequeued, handed to `_process_one_file()` and finished.
    A file that cannot be dequeued is skipped and preserved; a file whose
    processing raises is moved to the shunt queue (or preserved if even
    that fails).  After each file the periodic hook runs, the database
    transaction is committed and the short-circuit check may end the pass.

    :return: The number of files the switchboard listed for this pass,
        whether or not the pass was cut short.
    """
    runner = self.__class__.__name__
    dlog.debug('[%s] starting oneloop', runner)
    # The switchboard is guaranteed to list the queue files in FIFO order.
    filebases = self.switchboard.files
    for filebase in filebases:
        dlog.debug('[%s] processing filebase: %s', runner, filebase)
        try:
            # Fetch the message and metadata objects stored under this
            # queue file.
            msg, msgdata = self.switchboard.dequeue(filebase)
        except Exception as error:
            # Parsing and unpickling can fail in many ways (not only with
            # message parse errors).  The runner must not die, so log the
            # problem, skip the entry and keep it around for analysis.
            self._log(error)
            elog.error('Skipping and preserving unparseable message: %s',
                       filebase)
            self.switchboard.finish(filebase, preserve=True)
            config.db.abort()
            continue
        try:
            dlog.debug('[%s] processing onefile', runner)
            self._process_one_file(msg, msgdata)
            dlog.debug('[%s] finishing filebase: %s', runner, filebase)
            self.switchboard.finish(filebase)
        except Exception as error:
            # Runners implementing _dispose() are supposed to deal with
            # their own exceptions, but an infrastructure bug must not
            # lose mail: anything uncaught sends the message to the shunt
            # queue for human intervention.
            self._log(error)
            # Remember the originating queue so the message can be
            # unshunted later.
            msgdata['whichq'] = self.switchboard.name
            # Shunting itself may fail (permissions, MemoryError on a
            # huge message, ...), so be graceful about that too.
            try:
                new_filebase = config.switchboards['shunt'].enqueue(
                    msg, msgdata)
                elog.error('SHUNTING: %s', new_filebase)
                self.switchboard.finish(filebase)
            except Exception as shunt_error:
                # Could not shunt: log why and preserve the original
                # queue entry for possible analysis.
                self._log(shunt_error)
                elog.error(
                    'SHUNTING FAILED, preserving original entry: %s',
                    filebase)
                self.switchboard.finish(filebase, preserve=True)
            config.db.abort()
        # Housekeeping done after every file.
        dlog.debug('[%s] doing periodic', runner)
        self._do_periodic()
        dlog.debug('[%s] committing transaction', runner)
        config.db.commit()
        dlog.debug('[%s] checking short circuit', runner)
        if self._short_circuit():
            dlog.debug('[%s] short circuiting', runner)
            break
    total = len(filebases)
    dlog.debug('[%s] ending oneloop: %s', runner, total)
    return total
def _process_one_file(self, msg, msgdata):
    """Find the target mailing list and hand the message to `_dispose()`.

    The list is looked up by `msgdata['listid']`; when that key is absent
    the deprecated `msgdata['listname']` is tried instead.  If no list is
    found the message is enqueued on the shunt switchboard and nothing
    else is done.  Otherwise `_dispose()` is called inside the translation
    context of the sender's preferred language (when the sender is a
    member of the list) or of the list's preferred language, and the
    message is put back on this runner's switchboard when `_dispose()`
    returns a true value.

    :param msg: The message object to process.
    :param msgdata: The message metadata dictionary.  Its `lang` key is
        set to the code of the language chosen for processing.
    :raises Exception: Whatever `_dispose()` raises, re-raised after a
        `RunnerCrashEvent` has been sent.
    """
    # Sentinel that tells "key absent" apart from "key present but None".
    missing = object()
    list_manager = getUtility(IListManager)
    # First try to dig out the target list by id.  If there's no list-id
    # in the metadata, fall back to the fqdn list name for backward
    # compatibility.
    list_id = msgdata.get('listid', missing)
    fqdn_listname = missing
    mlist = None
    if list_id is not missing:
        mlist = list_manager.get_by_list_id(list_id)
    else:
        # XXX Deprecate.
        fqdn_listname = msgdata.get('listname', missing)
        if fqdn_listname is not missing:
            mlist = list_manager.get(fqdn_listname)
    if mlist is None:
        # Badly addressed messages are shunted rather than trashed since
        # they may be fixable with human intervention.  Report whichever
        # identifier the metadata really carried; the sentinel must never
        # reach the log.
        if list_id is not missing:
            identifier = list_id
        elif fqdn_listname is not missing:
            identifier = fqdn_listname
        else:
            identifier = None
        elog.error(
            '%s runner "%s" shunting message for missing list: %s',
            msg['message-id'], self.name, identifier)
        config.switchboards['shunt'].enqueue(msg, msgdata)
        return
    # Pick the language context: the sender's preferred language if the
    # sender is a member of the list, else the list's preferred language.
    # The context manager below resets the translation defaults on exit
    # so that subsequent messages are not translated incorrectly.
    member = (mlist.members.get_member(msg.sender)
              if msg.sender
              else None)
    language = (mlist.preferred_language
                if member is None
                else member.preferred_language)
    with _.using(language.code):
        msgdata['lang'] = language.code
        try:
            keepqueued = self._dispose(mlist, msg, msgdata)
        except Exception as error:
            # Trigger the Zope event and re-raise.
            notify(RunnerCrashEvent(self, mlist, msg, msgdata, error))
            raise
    if keepqueued:
        self.switchboard.enqueue(msg, msgdata)
def _log(self, exc):
    """Write an uncaught exception and a traceback to the error log.

    :param exc: The exception to report on the first log line.  The
        traceback on the second line is that of the exception currently
        being handled, so call this from inside an `except` block.
    """
    elog.error('Uncaught runner exception: %s', exc)
    elog.error('%s', traceback.format_exc())
280 def _clean_up(self):
281 """See `IRunner`."""
282 pass
284 def _dispose(self, mlist, msg, msgdata):
285 """See `IRunner`."""
286 raise NotImplementedError
288 def _do_periodic(self):
289 """See `IRunner`."""
290 pass
292 def _snooze(self, filecnt):
293 """See `IRunner`."""
294 if filecnt or self.sleep_float <= 0:
295 return
296 time.sleep(self.sleep_float)
298 def _short_circuit(self):
299 """See `IRunner`."""
300 return self._stop