Remove obsolete test code from setup.py
[cds-indico.git] / indico / modules / scheduler / daemon_script.py
blobb7f2e518b4b11f8ff8cc99331979cc31dac3ef89
1 # This file is part of Indico.
2 # Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN).
4 # Indico is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License as
6 # published by the Free Software Foundation; either version 3 of the
7 # License, or (at your option) any later version.
9 # Indico is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with Indico; if not, see <http://www.gnu.org/licenses/>.
17 """
18 This script starts an Indico Scheduler instance, forking it off as a background
19 process.
20 """
22 import argparse
23 import ConfigParser
24 import logging
25 import multiprocessing
26 import os
27 import socket
28 import sys
29 import time
30 import warnings
31 from logging.handlers import SMTPHandler
33 from indico.modules.scheduler import Scheduler, SchedulerModule, Client, base
34 from indico.web.flask.app import make_app
36 # legacy import
37 from indico.core.config import Config
38 from indico.core.db import DBMgr
class SchedulerApp(object):
    """Wrapper that wires up e-mail error reporting and runs the Scheduler.

    On construction it builds an ``SMTPHandler`` that mails ERROR-level log
    records to the address found in ``logging.conf`` (``handler_smtp``
    section) or, failing that, to the support e-mail from the Indico config.
    """

    def __init__(self, args):
        super(SchedulerApp, self).__init__()
        self.args = args
        config = Config.getInstance()
        worker = config.getWorkerName()

        cp = ConfigParser.ConfigParser()
        logging_conf_file = os.path.join(config.getConfigurationDir(), "logging.conf")
        cp.read(logging_conf_file)

        if cp.has_option('handler_smtp', 'args'):
            # get e-mail from logging config file.
            # NOTE(security): eval() of an admin-controlled config value is kept
            # for compatibility; ast.literal_eval would be the safer choice.
            log_mail = eval(cp.get('handler_smtp', 'args'))[2]
        else:
            log_mail = config.getSupportEmail()

        self.mailer = SMTPHandler(config.getSmtpServer(),
                                  'scheduler@%s' % worker,
                                  log_mail,
                                  "[indico_scheduler] Problem at %s" % worker)
        # only mail out actual errors, not routine log traffic
        self.mailer.setLevel(logging.ERROR)

    def run(self):
        """Run the scheduler loop; return 0 on clean shutdown, -1 otherwise."""
        root_logger = logging.getLogger('')
        root_logger.addHandler(self.mailer)

        logger = logging.getLogger('daemon')
        config = {}
        if Config.getInstance().getDebug():
            # poll much more frequently when debugging
            config['sleep_interval'] = 1
        # Fix: the original used `finally: return return_val`, which silently
        # swallows any exception raised inside the handlers (and is flagged by
        # linters as W0150/B012).  Returning from each branch is equivalent on
        # the happy paths and no longer masks secondary failures.
        try:
            Scheduler(**config).run()
            return 0
        except base.SchedulerQuitException:
            logger.info("Daemon shut down successfully")
            return 0
        except:
            # deliberately broad: this is the daemon's outermost boundary and
            # any termination cause should be logged (and mailed) before exit
            logger.exception("Daemon terminated for unknown reason ")
            return -1
def _setup(args):
    """Configure file-based and multiprocessing logging.

    Attaches a midnight-rotating ``TimedRotatingFileHandler`` to both the
    root logger and the ``multiprocessing`` logger, at the level named by
    ``args.log`` (sub-commands without a ``--log`` option default to INFO).
    """
    cfg = Config.getInstance()

    # logging setup
    # NOTE: `logging.handlers` is reachable only because the top-level
    # `from logging.handlers import SMTPHandler` binds the submodule.
    handler = logging.handlers.TimedRotatingFileHandler(
        os.path.join(cfg.getLogDir(), 'scheduler.log'), 'midnight')
    handler.setFormatter(
        logging.Formatter(
            "%(asctime)s %(process)s %(name)s: %(levelname)-8s %(message)s"))

    # Fix: not every sub-parser defines --log; use getattr with a default
    # instead of probing and mutating args.__dict__ directly.
    level = getattr(logging, getattr(args, 'log', 'INFO'))

    root_logger = logging.getLogger('')
    root_logger.addHandler(handler)
    root_logger.setLevel(level)

    # the scheduler forks worker processes; give them the same handler/level
    mp_logger = multiprocessing.get_logger()
    mp_logger.setLevel(level)
    mp_logger.addHandler(handler)
def _check_running(check_process=False):
    """Report whether the scheduler is running.

    Without ``check_process``, simply returns the state flag recorded in the
    DB.  With it, returns True only when the PID stored in the DB corresponds
    to an existing entry under /proc on this machine.
    """
    with DBMgr.getInstance().global_connection():
        db_status = Client().getStatus()

    if not check_process:
        return db_status['state']

    pid = db_status['pid']
    if pid is None:
        return False
    return os.path.isdir('/proc/{0}/'.format(pid))
def _start(args):
    """Start the scheduler, forking to the background unless --standalone.

    Refuses to start when the DB already marks the scheduler as running,
    unless -f/--force was given.
    """
    _setup(args)
    running = _check_running()
    if not args.force and running:
        raise Exception("The daemon seems to be already running (consider -f?)")
    if hasattr(args, 'standalone') and args.standalone:
        # foreground mode: run in this very process
        SchedulerApp(args).run()
    else:
        # daemonize via fork: the parent prints the child's PID and returns
        pid = os.fork()
        if pid:
            # NOTE(review): the parent returns None here while the standalone
            # path falls through to `return 0`; callers ignore the value
            print pid
            return
        else:
            # child process: drop the DB connection inherited from the parent
            # so SchedulerApp opens a fresh one of its own
            DBMgr.setInstance(None)
            SchedulerApp(args).run()
    return 0
def _stop(args):
    """Ask the scheduler to shut down and wait up to ~20s for confirmation.

    Refuses to act when the DB says it's not running, unless -f/--force.
    """
    _setup(args)
    running = _check_running()
    if not args.force and not running:
        raise Exception("The daemon doesn't seem to be running (consider -f?)")
    dbi = DBMgr.getInstance()
    dbi.startRequest()
    c = Client()
    # enqueue a shutdown command for the running scheduler to pick up
    c.shutdown(msg="Daemon script")
    dbi.commit()
    print "Waiting for death confirmation... "
    for i in range(0, 20):
        if not c.getStatus()['state']:
            break
        else:
            time.sleep(1)
            # refresh our view of the DB so we can observe the state change
            dbi.sync()
    else:
        # for/else: loop finished without break -> still marked running
        # NOTE(review): "DONE!" below is printed even after "FAILED!"
        print "FAILED!"
    print "DONE!"
    dbi.endRequest()
def _restart(args):
    """Stop and then start the daemon, refusing cross-host restarts."""
    with DBMgr.getInstance().global_connection():
        db_status = Client().getStatus()
        remote_host = db_status['hostname']
        # restarting from a different machine than the one running the daemon
        # would fork a second instance; require -f/--force to override
        if not args.force and remote_host is not None and remote_host != socket.getfqdn():
            raise Exception('The daemon is running on another machine ({0[hostname]}) (consider -f?)'.format(db_status))

    _stop(args)
    _start(args)
def _check(args):
    """Reconcile the DB's scheduler status with the actual OS process state.

    Exit codes: 0 = running (PID printed to stdout), 1 = not running,
    2 = cannot tell (daemon on another host, or inconsistent state).
    """
    if not os.path.isdir('/proc'):
        raise Exception('This command only works on systems that have /proc/')
    with DBMgr.getInstance().global_connection():
        status = Client().getStatus()
        # PID checks via /proc are only meaningful on the daemon's own host
        if status['hostname'] is not None and status['hostname'] != socket.getfqdn():
            print >>sys.stderr, 'The daemon is running on another machine ({0[hostname]})'.format(status)
            sys.exit(2)
        db_running = _check_running(False)
        os_running = _check_running(True)
        if not args.quiet:
            # diagnostics go to stderr so stdout stays parseable (PID only)
            print >>sys.stderr, 'Database status: running={1}, host={0[hostname]}, pid={0[pid]}'.format(
                status, db_running)
            print >>sys.stderr, 'Process status: running={0}'.format(os_running)
        if db_running and os_running:
            print status['pid']
            sys.exit(0)
        elif not db_running and not os_running:
            sys.exit(1)
        elif db_running and not os_running:
            # stale DB flag: the process died without clearing its status
            if not args.quiet:
                print >>sys.stderr, 'Marking dead scheduler as not running'
            SchedulerModule.getDBInstance().setSchedulerRunningStatus(False)
            DBMgr.getInstance().commit()
            sys.exit(1)
        else:
            print >>sys.stderr, 'Unexpected state! Process is running, but scheduler is not marked as running'
            sys.exit(2)
def _show(args):
    """Print scheduler information: overall status or the spooled commands."""
    dbi = DBMgr.getInstance()
    dbi.startRequest()
    c = Client()
    if args.field == "status":
        status = c.getStatus()
        if status['state']:
            print 'Scheduler is currently running on {0[hostname]} with pid {0[pid]}'.format(status)
        else:
            print 'Scheduler is currently NOT running'
        # task counters are filled straight from the status dict
        print """
Spooled commands: %(spooled)s
Tasks:
- Waiting: %(waiting)s
- Running: %(running)s
- Failed: %(failed)s
- Finished: %(finished)s
""" % status
    elif args.field == "spool":
        # each spool entry is an (operation, object) pair
        for op, obj in c.getSpool():
            if op in ['add', 'del']:
                print "%s %s" % (op, obj)
            else:
                print op
    dbi.endRequest()
253 def _cmd(args):
255 dbi = DBMgr.getInstance()
257 dbi.startRequest()
258 c = Client()
260 if args.command == "clear_spool":
261 print "%s operations removed" % c.clearSpool()
263 dbi.endRequest()
def _run(args):
    """Run a single task synchronously in this process (no daemon involved).

    Intended for debugging a task: output is echoed to the console in
    addition to the rotating log file set up by _setup().
    """
    _setup(args)
    formatter = logging.Formatter("%(asctime)s %(name)s - %(levelname)s %(filename)s:%(lineno)s: %(message)s")
    # mirror all logging to the console on top of the file handler
    root = logging.getLogger('')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    root.addHandler(handler)
    # presumably allows longer DB reconnection attempts for long tasks —
    # TODO confirm max_disconnect_poll semantics against DBMgr
    dbi = DBMgr.getInstance(max_disconnect_poll=40)
    dbi.startRequest()
    sm = SchedulerModule.getDBInstance()
    t = sm.getTaskById(args.taskid)
    # give the task a dedicated, identifiable logger
    t.plugLogger(logging.getLogger('console.run/%s' % args.taskid))
    # tasks expect an active Flask application context
    with make_app(True).app_context():
        t.run()
    dbi.endRequest()
def main():
    """Parse command-line arguments and dispatch to the chosen sub-command.

    Returns the sub-command's return value, or -1 if it raised.
    """
    parser = argparse.ArgumentParser(description=sys.modules[__name__].__doc__)
    subparsers = parser.add_subparsers(help="the action to be performed")
    # one sub-command per daemon operation
    parser_start = subparsers.add_parser('start', help="start the daemon")
    parser_stop = subparsers.add_parser('stop', help="stop the daemon")
    parser_restart = subparsers.add_parser('restart', help="restart the daemon")
    parser_check = subparsers.add_parser('check', help="check and sync status")
    parser_show = subparsers.add_parser('show', help="show information")
    parser_cmd = subparsers.add_parser('cmd', help="execute a command")
    parser_run = subparsers.add_parser('run', help="run a task, from this process")
    # global options, valid for every sub-command
    parser.add_argument("-p", "--fork-processes", dest="fork_processes",
                        action="store_true", required=False,
                        help="obsolete, the scheduler always uses processes")
    parser.add_argument("-f", "--force", dest="force",
                        action="store_const", const=True,
                        default=False, required=False,
                        help="ignores the information in the DB about scheduler status")
    # options specific to `start`
    parser_start.add_argument("-s", "--standalone",
                              action="store_const", const=True, default=False, required=False,
                              help="forces standalone mode - process doesn't go to background")
    parser_start.add_argument("--log", type=str, default="INFO", required=False, help="set different logging mode")
    # each sub-command dispatches through its `func` default
    parser_start.set_defaults(func=_start)
    parser_stop.set_defaults(func=_stop)
    parser_restart.set_defaults(func=_restart)
    parser_check.set_defaults(func=_check)
    parser_check.add_argument('-q', '--quiet', dest='quiet', action='store_true', help='Suppress console output')
    parser_show.add_argument("field", choices=['status', 'spool'], type=str, help="information to be shown")
    parser_show.set_defaults(func=_show)
    parser_cmd.add_argument("command", choices=['clear_spool'], type=str, help="command to be executed")
    parser_cmd.set_defaults(func=_cmd)
    parser_run.add_argument("taskid", type=int, help="task to be executed (id)")
    parser_run.set_defaults(func=_run)
    args = parser.parse_args()
    if args.fork_processes:
        # kept only for backwards compatibility with old init scripts
        warnings.warn('The scheduler always uses processes so -p/--fork-processes is not needed anymore',
                      DeprecationWarning, 2)
    try:
        return args.func(args)
    except Exception:
        # show the traceback but exit gracefully with an error code
        import traceback
        traceback.print_exc()
        return -1
if __name__ == "__main__":
    # Fix: main() returns -1 on failure, but that value was discarded, so the
    # process always exited with status 0 even on error — misleading for init
    # scripts and shells.  Propagate it as the process exit status instead
    # (sys.exit(None) still exits 0 on success).
    sys.exit(main())