Pass SQLAlchemy objects cleanly to Celery tasks
[cds-indico.git] / indico / core / celery / core.py
blob1924d33ad606794d6129d97b5c06f8c43419fc7d
1 # This file is part of Indico.
2 # Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN).
4 # Indico is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License as
6 # published by the Free Software Foundation; either version 3 of the
7 # License, or (at your option) any later version.
9 # Indico is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with Indico; if not, see <http://www.gnu.org/licenses/>.
17 from __future__ import unicode_literals
19 import os
21 from celery import Celery
22 from celery.beat import PersistentScheduler
23 from celery.signals import before_task_publish
24 from sqlalchemy import inspect
26 from indico.core.celery import CELERY_IMPORTS
27 from indico.core.config import Config
28 from indico.core.db import DBMgr, db
29 from indico.util.string import return_ascii
class IndicoCelery(Celery):
    """Celery sweetened with some Indico/Flask-related sugar"""

    def __init__(self, *args, **kwargs):
        super(IndicoCelery, self).__init__(*args, **kwargs)
        self.flask_app = None  # assigned by init_app()
        self._patch_task()

    def init_app(self, app):
        """Configure Celery from the Indico config and bind the Flask app.

        :param app: the Flask application; its ``config['TESTING']`` and
                    ``debug`` attributes are consulted
        :raises ValueError: if no broker URL is configured and the app
                            is not in testing mode
        :raises AssertionError: if an app has already been bound
        """
        cfg = Config.getInstance()
        broker_url = cfg.getCeleryBroker()
        if not broker_url and not app.config['TESTING']:
            raise ValueError('Celery broker URL is not set')
        self.conf['BROKER_URL'] = broker_url
        # Fall back to the broker when no dedicated result backend is set
        self.conf['CELERY_RESULT_BACKEND'] = cfg.getCeleryResultBackend() or broker_url
        self.conf['CELERYBEAT_SCHEDULER'] = IndicoPersistentScheduler
        self.conf['CELERYBEAT_SCHEDULE_FILENAME'] = os.path.join(cfg.getTempDir(), 'celerybeat-schedule')
        self.conf['CELERYD_HIJACK_ROOT_LOGGER'] = False
        self.conf['CELERY_TIMEZONE'] = cfg.getDefaultTimezone()
        self.conf['CELERY_IGNORE_RESULT'] = True
        self.conf['CELERY_STORE_ERRORS_EVEN_IF_IGNORED'] = True
        self.conf['CELERY_REDIRECT_STDOUTS'] = not app.debug
        self.conf['CELERY_IMPORTS'] = CELERY_IMPORTS
        # Pickle isn't pretty but that way we can pass along all types (tz-aware datetimes, sets, etc.)
        self.conf['CELERY_RESULT_SERIALIZER'] = 'pickle'
        self.conf['CELERY_TASK_SERIALIZER'] = 'pickle'
        self.conf['CELERY_ACCEPT_CONTENT'] = ['pickle']
        # Send emails about failed tasks
        self.conf['CELERY_SEND_TASK_ERROR_EMAILS'] = True
        self.conf['ADMINS'] = [('Admin', cfg.getSupportEmail())]
        self.conf['SERVER_EMAIL'] = 'Celery <{}>'.format(cfg.getNoReplyEmail())
        self.conf['EMAIL_HOST'] = cfg.getSmtpServer()[0]
        self.conf['EMAIL_PORT'] = cfg.getSmtpServer()[1]
        self.conf['EMAIL_USE_TLS'] = cfg.getSmtpUseTLS()
        self.conf['EMAIL_HOST_USER'] = cfg.getSmtpLogin() or None
        # The setting name is EMAIL_HOST_PASSWORD; a misspelled key is
        # stored but never read, so the SMTP password would be ignored.
        self.conf['EMAIL_HOST_PASSWORD'] = cfg.getSmtpPassword() or None
        # Allow indico.conf to override settings
        self.conf.update(cfg.getCeleryConfig())
        assert self.flask_app is None
        self.flask_app = app

    def periodic_task(self, *args, **kwargs):
        """Decorator to register a periodic task.

        This behaves like the :meth:`task` decorator, but automatically
        schedules the task to execute periodically, using extra kwargs
        as described in the Celery documentation:
        http://celery.readthedocs.org/en/latest/userguide/periodic-tasks.html#available-fields

        The ``run_every`` kwarg is mandatory and becomes the entry's
        ``schedule``; ``args``, ``kwargs``, ``options`` and ``relative``
        are optional.  All remaining arguments are passed to
        :meth:`task`, with ``ignore_result`` defaulting to ``True``.
        """
        def decorator(f):
            # Pull the schedule-related kwargs out so that only genuine
            # task options are forwarded to `self.task` below.
            entry = {
                'schedule': kwargs.pop('run_every'),
                'args': kwargs.pop('args', ()),
                'kwargs': kwargs.pop('kwargs', {}),
                'options': kwargs.pop('options', {}),
                'relative': kwargs.pop('relative', False)
            }
            kwargs.setdefault('ignore_result', True)
            task = self.task(f, *args, **kwargs)
            entry['task'] = task.name
            self.conf['CELERYBEAT_SCHEDULE'][task.name] = entry
            return task

        return decorator

    def _patch_task(self):
        """Patches the `task` decorator to run tasks inside the indico environment"""
        class IndicoTask(self.Task):
            """Abstract task base that sets up the Indico environment.

            The task body runs inside the Flask app context and a global
            `DBMgr` connection, and any `_CelerySAWrapper` arguments are
            replaced with the objects they refer to.
            """
            abstract = True

            # `s` is the task instance; `self` stays bound to the
            # enclosing IndicoCelery app via the closure.
            def __call__(s, *args, **kwargs):
                with self.flask_app.app_context():
                    with DBMgr.getInstance().global_connection():
                        args = _CelerySAWrapper.unwrap_args(args)
                        kwargs = _CelerySAWrapper.unwrap_kwargs(kwargs)
                        return super(IndicoTask, s).__call__(*args, **kwargs)

        self.Task = IndicoTask
class IndicoPersistentScheduler(PersistentScheduler):
    """Persistent beat scheduler whose entries can be overridden from indico.conf"""

    def setup_schedule(self):
        """Apply the configured task overrides, then set up the schedule.

        For each item returned by ``getScheduledTaskOverride()``:

        - an unknown task name is logged as an error and skipped
        - a falsy value removes the task from the schedule
        - a dict is merged into the existing schedule entry
        - any other value replaces the entry's ``schedule``
        """
        overrides = Config.getInstance().getScheduledTaskOverride()
        for name, override in overrides.iteritems():
            schedule = self.app.conf['CELERYBEAT_SCHEDULE']
            if name not in schedule:
                self.logger.error('Invalid entry in ScheduledTaskOverride: ' + name)
                continue
            if not override:
                # Falsy override: the task is disabled altogether
                del schedule[name]
                continue
            if isinstance(override, dict):
                # An override may tweak an entry but never rename its task
                assert override.get('task') in {None, name}
                schedule[name].update(override)
            else:
                schedule[name]['schedule'] = override
        super(IndicoPersistentScheduler, self).setup_schedule()
class _CelerySAWrapper(object):
    """Wrapper to safely pass SQLAlchemy objects to tasks.

    This is achieved by passing only the identity key (the model class
    and its PK values) through the Celery serializer and then fetching
    the actual objects again when executing the task.
    """

    __slots__ = ('identity_key',)

    def __init__(self, obj):
        """Remember the identity key of `obj`.

        :param obj: a SQLAlchemy-mapped object
        :raises ValueError: if `obj` has no identity key, in which case
                            it could not be fetched again in the task
        """
        identity_key = inspect(obj).identity_key
        if identity_key is None:
            # Fail at wrap time instead of letting the worker crash with
            # an opaque TypeError when it tries to index `None`.
            raise ValueError('Cannot pass non-persistent object to Celery. Did you forget to flush?')
        self.identity_key = identity_key

    @property
    def object(self):
        """The object referenced by the stored identity key.

        :raises ValueError: if the lookup returns ``None``
        """
        obj = self.identity_key[0].get(self.identity_key[1])
        if obj is None:
            raise ValueError('Object not in DB: {}'.format(self))
        return obj

    @return_ascii
    def __repr__(self):
        # Index instead of unpacking, like `object` does, so that a key
        # with extra trailing elements does not break the repr.
        model, args = self.identity_key[0], self.identity_key[1]
        return '<{}: {}>'.format(model.__name__, ','.join(map(repr, args)))

    @classmethod
    def wrap_args(cls, args):
        """Return `args` as a tuple with each `db.Model` instance wrapped."""
        return tuple(cls(x) if isinstance(x, db.Model) else x for x in args)

    @classmethod
    def wrap_kwargs(cls, kwargs):
        """Return a copy of `kwargs` with each `db.Model` value wrapped."""
        return {k: cls(v) if isinstance(v, db.Model) else v for k, v in kwargs.iteritems()}

    @classmethod
    def unwrap_args(cls, args):
        """Return `args` as a tuple with each wrapper replaced by its object."""
        return tuple(x.object if isinstance(x, cls) else x for x in args)

    @classmethod
    def unwrap_kwargs(cls, kwargs):
        """Return a copy of `kwargs` with each wrapper replaced by its object."""
        return {k: v.object if isinstance(v, cls) else v for k, v in kwargs.iteritems()}
@before_task_publish.connect
def before_task_publish_signal(*args, **kwargs):
    """Wrap SQLAlchemy objects in a task message before it is published.

    Connected to Celery's `before_task_publish` signal.  The ``args``
    and ``kwargs`` entries of the message ``body`` are replaced in
    place with versions run through `_CelerySAWrapper`.
    """
    message = kwargs['body']
    wrappers = (
        ('args', _CelerySAWrapper.wrap_args),
        ('kwargs', _CelerySAWrapper.wrap_kwargs),
    )
    for key, wrap in wrappers:
        message[key] = wrap(message[key])