# This file is part of Indico.
# Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN).
#
# Indico is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# Indico is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Indico; if not, see <http://www.gnu.org/licenses/>.

from __future__ import unicode_literals

import os

from celery import Celery
from celery.beat import PersistentScheduler
from celery.signals import before_task_publish
from sqlalchemy import inspect

from indico.core.celery import CELERY_IMPORTS
from indico.core.config import Config
from indico.core.db import DBMgr, db
from indico.util.string import return_ascii


class IndicoCelery(Celery):
    """Celery sweetened with some Indico/Flask-related sugar.

    The application is created unbound; :meth:`init_app` later loads the
    Celery settings from the Indico configuration and binds the Flask
    application that tasks are executed in.
    """

    def __init__(self, *args, **kwargs):
        """Create the Celery app; all arguments are passed on to ``Celery``."""
        super(IndicoCelery, self).__init__(*args, **kwargs)
        self.flask_app = None  # bound to the Flask app by init_app()
        # Install the IndicoTask base class so that every task created
        # through this app runs inside the Indico environment.
        self._patch_task()

    def init_app(self, app):
        """Configure Celery from the Indico config and bind the Flask app.

        :param app: the Flask application; its ``config['TESTING']`` flag
                    and ``debug`` attribute are consulted.
        :raises ValueError: if no broker URL is configured and the app is
                            not running in testing mode.
        """
        cfg = Config.getInstance()
        broker_url = cfg.getCeleryBroker()
        if not broker_url and not app.config['TESTING']:
            raise ValueError('Celery broker URL is not set')
        self.conf['BROKER_URL'] = broker_url
        # Fall back to the broker if no dedicated result backend is set
        self.conf['CELERY_RESULT_BACKEND'] = cfg.getCeleryResultBackend() or broker_url
        self.conf['CELERYBEAT_SCHEDULER'] = IndicoPersistentScheduler
        self.conf['CELERYBEAT_SCHEDULE_FILENAME'] = os.path.join(cfg.getTempDir(), 'celerybeat-schedule')
        self.conf['CELERYD_HIJACK_ROOT_LOGGER'] = False
        self.conf['CELERY_TIMEZONE'] = cfg.getDefaultTimezone()
        self.conf['CELERY_IGNORE_RESULT'] = True
        self.conf['CELERY_STORE_ERRORS_EVEN_IF_IGNORED'] = True
        self.conf['CELERY_REDIRECT_STDOUTS'] = not app.debug
        self.conf['CELERY_IMPORTS'] = CELERY_IMPORTS
        # Pickle isn't pretty but that way we can pass along all types (tz-aware datetimes, sets, etc.)
        self.conf['CELERY_RESULT_SERIALIZER'] = 'pickle'
        self.conf['CELERY_TASK_SERIALIZER'] = 'pickle'
        self.conf['CELERY_ACCEPT_CONTENT'] = ['pickle']
        # Send emails about failed tasks
        smtp_server = cfg.getSmtpServer()
        self.conf['CELERY_SEND_TASK_ERROR_EMAILS'] = True
        self.conf['ADMINS'] = [('Admin', cfg.getSupportEmail())]
        self.conf['SERVER_EMAIL'] = 'Celery <{}>'.format(cfg.getNoReplyEmail())
        self.conf['EMAIL_HOST'] = smtp_server[0]
        self.conf['EMAIL_PORT'] = smtp_server[1]
        self.conf['EMAIL_USE_TLS'] = cfg.getSmtpUseTLS()
        self.conf['EMAIL_HOST_USER'] = cfg.getSmtpLogin() or None
        # The setting is spelled EMAIL_HOST_PASSWORD; the previous key
        # ('EMAIL_HOST_PASWORD') was a typo, so the password never took effect.
        self.conf['EMAIL_HOST_PASSWORD'] = cfg.getSmtpPassword() or None
        # Allow indico.conf to override settings
        self.conf.update(cfg.getCeleryConfig())
        # The app may only be bound once
        assert self.flask_app is None
        # IndicoTask.__call__ needs the Flask app to push an app context
        self.flask_app = app

    def periodic_task(self, *args, **kwargs):
        """Decorator to register a periodic task.

        This behaves like the :meth:`task` decorator, but automatically
        schedules the task to execute periodically, using extra kwargs
        as described in the Celery documentation:
        http://celery.readthedocs.org/en/latest/userguide/periodic-tasks.html#available-fields

        The keyword ``run_every`` is required and becomes the entry's
        ``schedule``; ``args``, ``kwargs``, ``options`` and ``relative``
        are optional.  All remaining arguments are passed to :meth:`task`.
        ``ignore_result`` defaults to ``True`` for periodic tasks.
        """
        def decorator(f):
            # Pull the scheduling-related keywords out of kwargs so that
            # only genuine task options reach self.task() below.
            entry = {
                'schedule': kwargs.pop('run_every'),
                'args': kwargs.pop('args', ()),
                'kwargs': kwargs.pop('kwargs', {}),
                'options': kwargs.pop('options', {}),
                'relative': kwargs.pop('relative', False)
            }
            kwargs.setdefault('ignore_result', True)
            task = self.task(f, *args, **kwargs)
            entry['task'] = task.name
            self.conf['CELERYBEAT_SCHEDULE'][task.name] = entry
            return task

        return decorator

    def _patch_task(self):
        """Patches the `task` decorator to run tasks inside the indico environment"""
        class IndicoTask(self.Task):
            # Base class only; it must not be registered as a task itself
            abstract = True

            def __call__(s, *args, **kwargs):
                # `s` is the task instance; `self` is the IndicoCelery app
                # captured from the enclosing method.
                with self.flask_app.app_context():
                    with DBMgr.getInstance().global_connection():
                        # Turn wrapped SQLAlchemy references back into objects
                        args = _CelerySAWrapper.unwrap_args(args)
                        kwargs = _CelerySAWrapper.unwrap_kwargs(kwargs)
                        return super(IndicoTask, s).__call__(*args, **kwargs)

        self.Task = IndicoTask
class IndicoPersistentScheduler(PersistentScheduler):
    """Celery scheduler that allows indico.conf to override specific entries"""

    def setup_schedule(self):
        """Apply the ``ScheduledTaskOverride`` config, then set up the schedule.

        Each override is keyed by task name.  A dict is merged into the
        existing schedule entry (it must not change the task name); any
        other non-empty value replaces the entry's ``schedule``.  Names
        that are not in ``CELERYBEAT_SCHEDULE`` are logged and skipped.
        """
        schedule = self.app.conf['CELERYBEAT_SCHEDULE']
        for task_name, entry in Config.getInstance().getScheduledTaskOverride().iteritems():
            if task_name not in schedule:
                self.logger.error('Invalid entry in ScheduledTaskOverride: %s', task_name)
                # Nothing to override; continuing would raise a KeyError below
                continue
            # NOTE(review): an empty/None override is assumed to mean
            # "disable this periodic task" -- confirm the intended condition.
            if not entry:
                del schedule[task_name]
            elif isinstance(entry, dict):
                assert entry.get('task') in {None, task_name}  # make sure the task name is not changed
                schedule[task_name].update(entry)
            else:
                schedule[task_name]['schedule'] = entry
        super(IndicoPersistentScheduler, self).setup_schedule()
class _CelerySAWrapper(object):
    """Wrapper to safely pass SQLAlchemy objects to tasks.

    This is achieved by passing only the model class and its PK values
    (the SQLAlchemy identity key) through the Celery serializer and then
    fetching the actual objects again when executing the task.
    """

    __slots__ = ('identity_key',)

    def __init__(self, obj):
        """Remember the identity key of the mapped object `obj`."""
        self.identity_key = inspect(obj).identity_key

    @property
    def object(self):
        """The object referenced by the stored identity key.

        :raises ValueError: if the lookup returns ``None``.
        """
        # Only the first two elements (class, PK tuple) are used; newer
        # SQLAlchemy versions append an identity token to the key.
        model, pk = self.identity_key[:2]
        # presumably `get` is a model classmethod accepting the PK tuple;
        # verify against the model base class
        obj = model.get(pk)
        if obj is None:
            raise ValueError('Object not in DB: {}'.format(self))
        return obj

    @return_ascii
    def __repr__(self):
        model, args = self.identity_key[:2]
        return '<{}: {}>'.format(model.__name__, ','.join(map(repr, args)))

    @classmethod
    def wrap_args(cls, args):
        """Return `args` as a tuple with every ``db.Model`` instance wrapped."""
        return tuple(cls(x) if isinstance(x, db.Model) else x for x in args)

    @classmethod
    def wrap_kwargs(cls, kwargs):
        """Return a copy of `kwargs` with every ``db.Model`` value wrapped."""
        return {k: cls(v) if isinstance(v, db.Model) else v for k, v in kwargs.iteritems()}

    @classmethod
    def unwrap_args(cls, args):
        """Return `args` as a tuple with every wrapper replaced by its object."""
        return tuple(x.object if isinstance(x, cls) else x for x in args)

    @classmethod
    def unwrap_kwargs(cls, kwargs):
        """Return a copy of `kwargs` with every wrapper replaced by its object."""
        return {k: v.object if isinstance(v, cls) else v for k, v in kwargs.iteritems()}
@before_task_publish.connect
def before_task_publish_signal(*args, **kwargs):
    """Wrap SQLAlchemy objects in a task message before it is published.

    The ``body`` keyword passed by the signal is modified in place: its
    ``args`` and ``kwargs`` entries are replaced by versions in which
    every ``db.Model`` instance is wrapped in a `_CelerySAWrapper`.
    """
    message = kwargs['body']
    wrappers = (
        ('args', _CelerySAWrapper.wrap_args),
        ('kwargs', _CelerySAWrapper.wrap_kwargs),
    )
    # Positional arguments first, then keyword arguments
    for key, wrap in wrappers:
        message[key] = wrap(message[key])