# Copyright (C) 2001-2019 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
18 """The process runner base class."""
import time
import signal
import logging
import traceback

from contextlib import suppress
from io import StringIO
from lazr.config import as_boolean, as_timedelta
from mailman.config import config
from mailman.core.i18n import _
from mailman.core.logging import reopen
from mailman.core.switchboard import Switchboard
from mailman.interfaces.languages import ILanguageManager
from mailman.interfaces.listmanager import IListManager
from mailman.interfaces.runner import (
    IRunner, RunnerCrashEvent, RunnerInterrupt)
from mailman.utilities.string import expand
from public import public
from zope.component import getUtility
from zope.event import notify
from zope.interface import implementer
# Module-level loggers: `dlog` traces the processing loop, `elog` records
# failures (unparseable, crashed or shunted messages) and `rlog` records the
# signals a runner receives.
dlog = logging.getLogger('mailman.debug')
elog = logging.getLogger('mailman.error')
rlog = logging.getLogger('mailman.runner')
# Whether this runner services a queue. __init__() only creates a queue
# directory and a switchboard when this is true.
is_queue_runner = True
def __init__(self, name, slice=None):
    """Create a runner.

    The runner's settings are read from the ``runner.<name>`` section of the
    configuration.

    :param name: The runner's name. It selects the configuration section
        and is passed on to the switchboard.
    :type name: str
    :param slice: The slice number for this runner. This is passed
        directly to the underlying `ISwitchboard` object. This is ignored
        for runners that don't manage a queue.
    :type slice: int or None
    """
    self.name = name
    # Grab the configuration section.
    section = getattr(config, 'runner.' + name)
    substitutions = config.paths
    substitutions['name'] = name
    numslices = int(section.instances)
    # Check whether the runner is queue runner or not; non-queue runner
    # should not have queue_directory or switchboard instance.
    if self.is_queue_runner:
        self.queue_directory = expand(section.path, None, substitutions)
        self.switchboard = Switchboard(
            name, self.queue_directory, slice, numslices, True)
    else:
        self.queue_directory = None
        self.switchboard = None
    self.sleep_time = as_timedelta(section.sleep_time)
    # sleep_time is a timedelta; turn it into a float for time.sleep().
    self.sleep_float = self.sleep_time.total_seconds()
    self.max_restarts = int(section.max_restarts)
    self.start = as_boolean(section.start)
    # Stop flag and exit status; initialised here so that both attributes
    # always exist on a freshly constructed runner.
    self._stop = False
    self.status = 0
def __repr__(self):
    """Return ``<ClassName at 0x...>``, built from the class name and id()."""
    return '<{} at {:#x}>'.format(self.__class__.__name__, id(self))
def signal_handler(self, signum, frame):                # pragma: nocover
    """Handle the signals that set_signals() routes to this runner.

    SIGHUP reopens the log files. SIGTERM, SIGINT and SIGUSR1 stop the
    runner, record the signal number in ``self.status`` and raise
    `RunnerInterrupt`. Any other signal number is ignored.

    :param signum: The number of the signal that was delivered.
    :param frame: The interrupted stack frame (unused).
    :raises RunnerInterrupt: for SIGTERM, SIGINT and SIGUSR1.
    """
    # Human readable name for the log message; unknown numbers are logged
    # as-is.
    signame = {
        signal.SIGTERM: 'SIGTERM',
        signal.SIGINT: 'SIGINT',
        signal.SIGUSR1: 'SIGUSR1',
        }.get(signum, signum)
    if signum == signal.SIGHUP:
        reopen()
        rlog.info('{} runner caught SIGHUP. Reopening logs.'.format(
                  self.name))
    elif signum in (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1):
        self.stop()
        self.status = signum
        rlog.info('{} runner caught {}. Stopping.'.format(
                  self.name, signame))
        # As of Python 3.5, PEP 475 gets in our way. Runners with long
        # time.sleep()'s in their _snooze() method (e.g. the retry runner)
        # will have their system call implemented time.sleep()
        # automatically retried at the C layer. The only reliable way to
        # prevent this is to raise an exception in the signal handler. The
        # standard run() method automatically suppresses this exception,
        # meaning, it's caught and ignored, but effectively breaks the
        # run() loop, which is just what we want. Runners which implement
        # their own run() method must be prepared to catch
        # RunnerInterrupts, usually also ignoring them.
        raise RunnerInterrupt
def set_signals(self):
    """Install self.signal_handler for SIGHUP, SIGINT, SIGTERM and SIGUSR1."""
    # Registration order is the same as the original sequence of calls.
    for signum in (signal.SIGHUP, signal.SIGINT,
                   signal.SIGTERM, signal.SIGUSR1):
        signal.signal(signum, self.signal_handler)
def stop(self):
    """Ask the run() loop to exit by setting the stop flag."""
    self._stop = True


def run(self):
    """Run the main loop until the stop flag is set or an interrupt arrives.

    Each pass processes the queue once, does the periodic work, and then
    snoozes. `KeyboardInterrupt` and `RunnerInterrupt` end the loop quietly;
    _clean_up() is called afterwards in either case.
    """
    # Start the main loop for this runner.
    with suppress(KeyboardInterrupt, RunnerInterrupt):
        while True:
            # Once through the loop that processes all the files in the
            # queue directory.
            filecnt = self._one_iteration()
            # Do the periodic work for the subclass.
            self._do_periodic()
            # If the stop flag is set, we're done.
            if self._stop:
                break
            # Give the runner an opportunity to snooze for a while, but
            # pass it the file count so it can decide whether to do more
            # work now or not.
            self._snooze(filecnt)
    self._clean_up()


def _clean_up(self):
    """Hook called once when run() finishes; does nothing by default."""
    pass
def _one_iteration(self):
    """Process every file currently in the queue, once.

    A file that cannot be dequeued is logged and preserved. A file whose
    processing raises is moved to the shunt queue, or preserved in place if
    shunting itself fails. After each file the periodic hook runs, the
    database transaction is committed and the short-circuit hook may end
    the pass early.

    :return: The number of files the switchboard listed for this pass.
    :rtype: int
    """
    me = self.__class__.__name__
    dlog.debug('[%s] starting oneloop', me)
    # List all the files in our queue directory. The switchboard is
    # guaranteed to hand us the files in FIFO order.
    files = self.switchboard.files
    for filebase in files:
        dlog.debug('[%s] processing filebase: %s', me, filebase)
        try:
            # Ask the switchboard for the message and metadata objects
            # associated with this queue file.
            msg, msgdata = self.switchboard.dequeue(filebase)
        except Exception as error:
            # This used to just catch email.Errors.MessageParseError, but
            # other problems can occur in message parsing, e.g.
            # ValueError, and exceptions can occur in unpickling too. We
            # don't want the runner to die, so we just log and skip this
            # entry, but preserve it for analysis.
            self._log(error)
            elog.error('Skipping and preserving unparseable message: %s',
                       filebase)
            self.switchboard.finish(filebase, preserve=True)
            config.db.abort()
            continue
        try:
            dlog.debug('[%s] processing onefile', me)
            self._process_one_file(msg, msgdata)
            dlog.debug('[%s] finishing filebase: %s', me, filebase)
            self.switchboard.finish(filebase)
        except Exception as error:
            # All runners that implement _dispose() must guarantee that
            # exceptions are caught and dealt with properly. Still, there
            # may be a bug in the infrastructure, and we do not want those
            # to cause messages to be lost. Any uncaught exceptions will
            # cause the message to be stored in the shunt queue for human
            # intervention.
            self._log(error)
            # Put a marker in the metadata for unshunting.
            msgdata['whichq'] = self.switchboard.name
            # It is possible that shunting can throw an exception, e.g. a
            # permissions problem or a MemoryError due to a really large
            # message. Try to be graceful.
            try:
                shunt = config.switchboards['shunt']
                new_filebase = shunt.enqueue(msg, msgdata)
                elog.error('SHUNTING: %s', new_filebase)
                self.switchboard.finish(filebase)
            except Exception as error:
                # The message wasn't successfully shunted. Log the
                # exception and try to preserve the original queue entry
                # for possible analysis.
                self._log(error)
                elog.error(
                    'SHUNTING FAILED, preserving original entry: %s',
                    filebase)
                self.switchboard.finish(filebase, preserve=True)
            config.db.abort()
        # Other work we want to do each time through the loop.
        dlog.debug('[%s] doing periodic', me)
        self._do_periodic()
        dlog.debug('[%s] committing transaction', me)
        config.db.commit()
        dlog.debug('[%s] checking short circuit', me)
        if self._short_circuit():
            dlog.debug('[%s] short circuiting', me)
            break
    dlog.debug('[%s] ending oneloop: %s', me, len(files))
    return len(files)
def _process_one_file(self, msg, msgdata):
    """Find the target mailing list and hand the message to _dispose().

    The list is looked up by the ``listid`` metadata key, falling back to
    ``listname``. If no list is found the message is enqueued on the shunt
    switchboard and nothing else happens. Otherwise _dispose() is called
    inside the appropriate language context, and the message is put back on
    this runner's queue when _dispose() returns a true value.

    :param msg: The message object taken from the queue.
    :param msgdata: The message metadata dictionary; ``lang`` is set on it.
    :raises Exception: whatever _dispose() raises, after a
        `RunnerCrashEvent` has been published.
    """
    # Do some common sanity checking on the message metadata. It's got to
    # be destined for a particular mailing list. This switchboard is used
    # to shunt off badly formatted messages. We don't want to just trash
    # them because they may be fixable with human intervention. Just get
    # them out of our sight.
    #
    # Find out which mailing list this message is destined for.
    mlist = None
    missing = object()
    # First try to dig out the target list by id. If there's no list-id
    # in the metadata, fall back to the fqdn list name for backward
    # compatibility.
    list_manager = getUtility(IListManager)
    list_id = msgdata.get('listid', missing)
    fqdn_listname = missing
    if list_id is missing:
        fqdn_listname = msgdata.get('listname', missing)
        # XXX Deprecate.
        if fqdn_listname is not missing:
            mlist = list_manager.get(fqdn_listname)
    else:
        mlist = list_manager.get_by_list_id(list_id)
    if mlist is None:
        # Report whichever key was actually used for the lookup. An absent
        # key is the `missing` sentinel, never None, so it must be compared
        # against the sentinel or the log shows '<object object at ...>'.
        if list_id is not missing:
            identifier = list_id
        elif fqdn_listname is not missing:
            identifier = fqdn_listname
        else:
            identifier = None
        elog.error(
            '%s runner "%s" shunting message for missing list: %s',
            msg['message-id'], self.name, identifier)
        config.switchboards['shunt'].enqueue(msg, msgdata)
        return
    # Now process this message. We also want to set up the language
    # context for this message. The context will be the preferred
    # language for the user if the sender is a member of the list, or it
    # will be the list's preferred language. However, we must take
    # special care to reset the defaults, otherwise subsequent messages
    # may be translated incorrectly.
    #
    # NOTE(review): with the early return above, the first branch cannot be
    # taken; it is kept unchanged rather than removed.
    if mlist is None:
        language_manager = getUtility(ILanguageManager)
        language = language_manager[config.mailman.default_language]
    elif msg.sender:
        member = mlist.members.get_member(msg.sender)
        language = (member.preferred_language
                    if member is not None
                    else mlist.preferred_language)
    else:
        language = mlist.preferred_language
    with _.using(language.code):
        msgdata['lang'] = language.code
        try:
            keepqueued = self._dispose(mlist, msg, msgdata)
        except Exception as error:
            # Trigger the Zope event and re-raise
            notify(RunnerCrashEvent(self, mlist, msg, msgdata, error))
            raise
    if keepqueued:
        self.switchboard.enqueue(msg, msgdata)
def _log(self, exc):
    """Write an uncaught exception and the current traceback to the error log.

    Must be called while the exception is being handled, because the
    traceback is taken from the exception currently in flight.

    :param exc: The exception instance to report.
    """
    elog.error('Uncaught runner exception: %s', exc)
    # Capture the traceback text so it goes through the logger.
    s = StringIO()
    traceback.print_exc(file=s)
    elog.error('%s', s.getvalue())
def _dispose(self, mlist, msg, msgdata):
    """Process one message; subclasses must override this.

    _process_one_file() re-enqueues the message when the return value is
    true.

    :param mlist: The mailing list the message is destined for.
    :param msg: The message object.
    :param msgdata: The message metadata dictionary.
    :raises NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
def _do_periodic(self):
    """Hook for periodic work in subclasses; does nothing by default."""
def _snooze(self, filecnt):
    """Sleep for ``self.sleep_float`` seconds if the last pass was idle.

    :param filecnt: The number of files found by the last pass over the
        queue.
    """
    # Don't sleep when there was work to do, or when the configured sleep
    # time is not positive.
    if filecnt or self.sleep_float <= 0:
        return
    time.sleep(self.sleep_float)
298 def _short_circuit(self
):