The task runner now deletes old processed bounce events.
[mailman.git] / src / mailman / runners / task.py
blob 39a2a1ecf3eb399bf67896f5df74a5ec91a047cf
1 # Copyright (C) 2001-2023 by the Free Software Foundation, Inc.
3 # This file is part of GNU Mailman.
5 # GNU Mailman is free software: you can redistribute it and/or modify it under
6 # the terms of the GNU General Public License as published by the Free
7 # Software Foundation, either version 3 of the License, or (at your option)
8 # any later version.
10 # GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 # more details.
15 # You should have received a copy of the GNU General Public License along with
16 # GNU Mailman. If not, see <https://www.gnu.org/licenses/>.
18 """Task runner."""
20 import logging
22 from datetime import datetime
23 from lazr.config import as_timedelta
24 from mailman.app.bounces import PENDABLE_LIFETIME
25 from mailman.config import config
26 from mailman.core.runner import Runner
27 from mailman.database.transaction import dbconnection, transactional
28 from mailman.interfaces.cache import ICacheManager
29 from mailman.interfaces.messages import IMessageStore
30 from mailman.interfaces.pending import IPendings
31 from mailman.interfaces.workflow import IWorkflowStateManager
32 from mailman.model.bounce import BounceEvent
33 from mailman.model.requests import _Request
34 from mailman.utilities.datetime import now
35 from public import public
36 from zope.component import getUtility
# Module-level loggers: 'mailman.task' receives the runner's eviction
# reports, 'mailman.debug' receives the per-pass debug trace.
tlog = logging.getLogger('mailman.task')
dlog = logging.getLogger('mailman.debug')
@public
class TaskRunner(Runner):
    """The task runner.

    This is not a queue runner.  Each time the configured
    ``run_tasks_every`` interval has elapsed, it evicts expired pendings
    (and then the workflow states, requests and stored messages that no
    pending refers to any more), deletes old processed bounce events, and
    evicts expired cache entries.
    """

    is_queue_runner = False

    def __init__(self, name, slice=None):
        """Initialize the runner and schedule the first periodic pass.

        :param name: The runner's name, passed through to `Runner`.
        :param slice: Optional slice, passed through to `Runner`.
        """
        super().__init__(name, slice)
        # Starting from datetime.min makes the first periodic check due
        # immediately.
        self.lastrun = datetime.min
        self.delay = as_timedelta(config.mailman.run_tasks_every)

    @transactional
    def _do_periodic(self):
        """Invoked periodically by the run() method in the super class."""
        # Read the clock once so that the due check and the recorded run
        # time agree with each other.
        current = datetime.now()
        if self.lastrun + self.delay > current:
            return                                          # pragma: nocover
        self.lastrun = current
        dlog.debug('Running task runner periodic tasks')
        self._evict_pendings()
        self._evict_expired_bounce_events()
        self._evict_cache()

    @dbconnection
    def _get_requests(self, store):
        """Yield an (id, data_hash) pair for every stored request.

        All rows are loaded with ``.all()`` before the first pair is
        yielded.
        """
        # Get (id, token) for all requests.
        results = store.query(_Request).all()
        yield from [(result.id, result.data_hash) for result in results]

    @dbconnection
    def _delete_request(self, store, id):
        """Delete the request whose primary key is `id`, if it exists."""
        request = store.query(_Request).get(id)
        if request is not None:
            store.delete(request)

    def _evict_pendings(self):
        """Evict expired pendings and everything they leave orphaned.

        In order: expired pendings are evicted; workflow states whose token
        has no pending are discarded; requests whose data hash has no
        pending are deleted; stored messages whose Message-ID is not named
        by any pendable's ``_mod_message_id`` are deleted.  Each step logs
        how many items it removed.
        """
        pendings = getUtility(IPendings)
        wfmanager = getUtility(IWorkflowStateManager)
        # Snapshot both counts before anything is removed so that the log
        # lines can report differences.
        opendings = pendings.count()
        owflows = wfmanager.count
        pendings.evict()
        count = opendings - pendings.count()
        tlog.info('Task runner evicted %d expired pendings', count)
        # Now delete any orphaned workflow states.
        # NOTE(review): expunge=False presumably makes confirm() a pure
        # lookup that leaves the pending in place -- confirm in IPendings.
        for token in wfmanager.get_all_tokens():
            if pendings.confirm(token, expunge=False) is None:
                wfmanager.discard(token)
        count = owflows - wfmanager.count
        tlog.info('Task runner deleted %d orphaned workflows', count)
        # Likewise for requests whose token no longer has a pending.
        count = 0
        for id, token in self._get_requests():
            if pendings.confirm(token, expunge=False) is None:
                self._delete_request(id)
                count += 1
        tlog.info('Task runner deleted %d orphaned requests', count)
        # Also, delete any orphaned messages from the message store.
        # First collect the Message-IDs that some pendable still refers to.
        referenced = set()
        for token, pendable in pendings:
            if not pendable:
                continue                                    # pragma: nocover
            mid = pendable.get('_mod_message_id')
            if mid:
                referenced.add(mid)
        count = 0
        messages = getUtility(IMessageStore)
        for msg in messages.messages:
            # msg can be None if file is already removed.
            if msg is not None:
                mid = msg.get('message-id')
                if mid not in referenced:
                    messages.delete_message(mid)
                    count += 1
        tlog.info('Task runner deleted %d orphaned messages', count)

    @dbconnection
    def _evict_expired_bounce_events(self, store):
        """Delete processed bounce events older than PENDABLE_LIFETIME.

        Unprocessed events are always kept, whatever their age.
        """
        # The cutoff does not depend on the row, so compute it once; every
        # event in this pass is then judged against the same instant.
        cutoff = now() - as_timedelta(PENDABLE_LIFETIME)
        count = 0
        for entry in store.query(BounceEvent).all():
            if not entry.processed:
                continue
            if entry.timestamp > cutoff:
                continue
            store.delete(entry)
            count += 1
        tlog.info('Task runner evicted %d expired bounce events', count)

    def _evict_cache(self):
        """Ask the cache manager to evict its expired entries."""
        getUtility(ICacheManager).evict_expired()
        tlog.info('Task runner evicted expired cache entries')