# Copyright (C) 2001-2023 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman.  If not, see <https://www.gnu.org/licenses/>.
import logging

from datetime import datetime
from lazr.config import as_timedelta
from mailman.app.bounces import PENDABLE_LIFETIME
from mailman.config import config
from mailman.core.runner import Runner
from mailman.database.transaction import dbconnection, transactional
from mailman.interfaces.cache import ICacheManager
from mailman.interfaces.messages import IMessageStore
from mailman.interfaces.pending import IPendings
from mailman.interfaces.workflow import IWorkflowStateManager
from mailman.model.bounce import BounceEvent
from mailman.model.requests import _Request
from mailman.utilities.datetime import now
from public import public
from zope.component import getUtility
# Module-level loggers: `dlog` receives the debugging trace emitted when the
# periodic tasks start, `tlog` records what the task runner evicted or deleted.
dlog = logging.getLogger('mailman.debug')
tlog = logging.getLogger('mailman.task')
@public
class TaskRunner(Runner):
    """The task runner.

    This runner is not a queue runner (``is_queue_runner`` is False).  Its
    periodic hook performs housekeeping instead: it evicts expired pendings,
    orphaned workflow states, orphaned requests, orphaned stored messages,
    expired bounce events and expired cache entries.
    """

    is_queue_runner = False

    def __init__(self, name, slice=None):
        """Create the runner.

        :param name: The runner name, passed through to the super class.
        :param slice: The optional slice, passed through to the super class.
        """
        super().__init__(name, slice)
        # Start from the earliest representable time so that the very first
        # periodic invocation is never skipped by the delay check.
        self.lastrun = datetime.min
        self.delay = as_timedelta(config.mailman.run_tasks_every)

    # NOTE(review): `transactional` is imported by this module; applying it
    # here is assumed to be the intended commit boundary for the deletions
    # below -- confirm against the project history.
    @transactional
    def _do_periodic(self):
        """Invoked periodically by the run() method in the super class."""
        # Read the clock once so the comparison and the recorded run time
        # agree with each other.
        right_now = datetime.now()
        if self.lastrun + self.delay > right_now:
            return                                  # pragma: nocover
        self.lastrun = right_now
        dlog.debug('Running task runner periodic tasks')
        self._evict_pendings()
        self._evict_expired_bounce_events()
        self._evict_cache()

    @dbconnection
    def _get_requests(self, store):
        """Yield an (id, token) pair for every request in the database.

        :param store: The database store; callers omit it, so it has to be
            supplied by the `dbconnection` decorator.
        """
        # The rows are fetched eagerly so callers may delete requests while
        # they consume the pairs.
        results = store.query(_Request).all()
        yield from [(result.id, result.data_hash) for result in results]

    @dbconnection
    def _delete_request(self, store, id):
        """Delete the request with id = id, if there is one.

        :param store: The database store; callers omit it, so it has to be
            supplied by the `dbconnection` decorator.
        :param id: The id of the request to delete.
        """
        request = store.query(_Request).get(id)
        if request is not None:
            store.delete(request)

    def _evict_pendings(self):
        """Evict expired pendings and everything orphaned by their removal.

        In order: expired pendings, workflow states whose token no longer
        confirms, requests whose token no longer confirms, and stored
        messages that no remaining pendable refers to.  The number of items
        removed at each step is logged to the task log.
        """
        pendings = getUtility(IPendings)
        wfmanager = getUtility(IWorkflowStateManager)
        # Remember the sizes beforehand; the eviction counts are reported as
        # before/after differences.
        opendings = pendings.count()
        owflows = wfmanager.count
        pendings.evict()
        count = opendings - pendings.count()
        tlog.info('Task runner evicted %d expired pendings', count)
        # Now delete any orphaned workflow states.
        for token in wfmanager.get_all_tokens():
            if pendings.confirm(token, expunge=False) is None:
                wfmanager.discard(token)
        count = owflows - wfmanager.count
        tlog.info('Task runner deleted %d orphaned workflows', count)
        # Likewise, delete any requests whose pending is gone.
        count = 0
        for id, token in self._get_requests():
            if pendings.confirm(token, expunge=False) is None:
                self._delete_request(id)
                count += 1
        tlog.info('Task runner deleted %d orphaned requests', count)
        # Also, delete any orphaned messages from the message store.  First
        # collect the message ids that the remaining pendables refer to.
        saved_mids = set()
        for token, pendable in pendings:
            mid = pendable.get('_mod_message_id')
            if mid is None:
                # This pendable does not refer to a stored message.
                continue                            # pragma: nocover
            saved_mids.add(mid)
        count = 0
        messages = getUtility(IMessageStore)
        for msg in messages.messages:
            # msg can be None if file is already removed.
            if msg is None:
                continue
            mid = msg.get('message-id')
            if mid not in saved_mids:
                messages.delete_message(mid)
                count += 1
        tlog.info('Task runner deleted %d orphaned messages', count)

    @dbconnection
    def _evict_expired_bounce_events(self, store):
        """Delete processed bounce events older than PENDABLE_LIFETIME.

        :param store: The database store; callers omit it, so it has to be
            supplied by the `dbconnection` decorator.
        """
        # Events with a timestamp later than this cutoff are kept.
        cutoff = now() - as_timedelta(PENDABLE_LIFETIME)
        count = 0
        for entry in store.query(BounceEvent).all():
            if not entry.processed:
                # Never discard an event that has not been processed yet.
                continue
            if entry.timestamp > cutoff:
                continue
            store.delete(entry)
            count += 1
        tlog.info('Task runner evicted %d expired bounce events', count)

    def _evict_cache(self):
        """Ask the cache manager to evict its expired entries."""
        getUtility(ICacheManager).evict_expired()
        tlog.info('Task runner evicted expired cache entries')