# This file is part of Indico.
# Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN).
#
# Indico is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# Indico is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Indico; if not, see <http://www.gnu.org/licenses/>.
"""
This script starts an Indico Scheduler instance, forking it off as a background
process.
"""
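
# Example invocations (assuming the script is executed directly, e.g. as
# ``python scheduler.py``; the subcommands and options are the ones defined
# in main() below):
#
#     python scheduler.py start --standalone --log DEBUG
#     python scheduler.py show status
#     python scheduler.py stop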

import argparse
import ConfigParser
import logging
import logging.handlers
import multiprocessing
import os
import socket
import sys
import time
import traceback
import warnings

from logging.handlers import SMTPHandler

from indico.modules.scheduler import Scheduler, SchedulerModule, Client, base
from indico.web.flask.app import make_app

from indico.core.config import Config
from indico.core.db import DBMgr


class SchedulerApp(object):

    def __init__(self, args):
        super(SchedulerApp, self).__init__()

    def run(self):
        config = Config.getInstance()
        worker = config.getWorkerName()

        # figure out where error notifications should be mailed to
        cp = ConfigParser.ConfigParser()
        logging_conf_file = os.path.join(config.getConfigurationDir(), "logging.conf")
        cp.read(logging_conf_file)

        if cp.has_option('handler_smtp', 'args'):
            # get e-mail from logging config file
            log_mail = eval(cp.get('handler_smtp', 'args'))[2]
        else:
            log_mail = config.getSupportEmail()

        # mail ERROR (and worse) log records to that address
        self.mailer = SMTPHandler(config.getSmtpServer(),
                                  'scheduler@%s' % worker,
                                  log_mail,
                                  "[indico_scheduler] Problem at %s" % worker)
        self.mailer.setLevel(logging.ERROR)

        root_logger = logging.getLogger('')
        root_logger.addHandler(self.mailer)

        logger = logging.getLogger('daemon')

        # options passed on to the Scheduler itself
        config = {}
        if Config.getInstance().getDebug():
            config['sleep_interval'] = 1

        try:
            Scheduler(**config).run()
        except base.SchedulerQuitException:
            logger.info("Daemon shut down successfully")
        except Exception:
            logger.exception("Daemon terminated for unknown reason ")
def _setup(args):
    cfg = Config.getInstance()

    handler = logging.handlers.TimedRotatingFileHandler(os.path.join(cfg.getLogDir(), 'scheduler.log'), 'midnight')
    handler.setFormatter(logging.Formatter(
        "%(asctime)s %(process)s %(name)s: %(levelname)-8s %(message)s"))

    if 'log' not in args.__dict__:
        level = logging.INFO
    else:
        level = getattr(logging, args.log)

    root_logger = logging.getLogger('')
    root_logger.addHandler(handler)
    root_logger.setLevel(level)

    mp_logger = multiprocessing.get_logger()
    mp_logger.setLevel(level)
    mp_logger.addHandler(handler)
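

# _check_running() reports whether the scheduler is alive.  By default it only
# consults the state flag kept in the DB; with check_process=True it also
# verifies that the recorded PID still exists under /proc (Linux-like systems).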
def _check_running(check_process=False):
    with DBMgr.getInstance().global_connection():
        status = Client().getStatus()

        if not check_process:
            return status['state']

        if status['pid'] is None:
            # no PID recorded, so there is no process we could check
            return False

        return os.path.isdir('/proc/{0}/'.format(status['pid']))


def _start(args):
    _setup(args)

    running = _check_running()

    if not args.force and running:
        raise Exception("The daemon seems to be already running (consider -f?)")

    if hasattr(args, 'standalone') and args.standalone:
        SchedulerApp(args).run()
    else:
        # discard the cached DB connection before the daemon run
        DBMgr.setInstance(None)
        SchedulerApp(args).run()


def _stop(args):
    running = _check_running()

    if not args.force and not running:
        raise Exception("The daemon doesn't seem to be running (consider -f?)")

    dbi = DBMgr.getInstance()
    dbi.startRequest()
    c = Client()
    c.shutdown(msg="Daemon script")
    dbi.commit()

    print "Waiting for death confirmation... "
    for i in range(0, 20):
        if not c.getStatus()['state']:
            break
        dbi.sync()
        time.sleep(1)

    dbi.endRequest()


def _restart(args):
    with DBMgr.getInstance().global_connection():
        status = Client().getStatus()
        if status['hostname'] is not None and status['hostname'] != socket.getfqdn() and not args.force:
            raise Exception('The daemon is running on another machine ({0[hostname]}) (consider -f?)'.format(status))

    _stop(args)
    _start(args)


def _check(args):
    if not os.path.isdir('/proc'):
        raise Exception('This command only works on systems that have /proc/')

    with DBMgr.getInstance().global_connection():
        status = Client().getStatus()
        if status['hostname'] is not None and status['hostname'] != socket.getfqdn():
            print >>sys.stderr, 'The daemon is running on another machine ({0[hostname]})'.format(status)

        db_running = _check_running(False)
        os_running = _check_running(True)

        if not args.quiet:
            print >>sys.stderr, 'Database status: running={1}, host={0[hostname]}, pid={0[pid]}'.format(
                status, db_running)
            print >>sys.stderr, 'Process status: running={0}'.format(os_running)

        if db_running and os_running:
            # DB record and process agree: the scheduler is up
            pass
        elif not db_running and not os_running:
            # DB record and process agree: the scheduler is down
            pass
        elif db_running and not os_running:
            # the process died without clearing its DB state; fix the record
            if not args.quiet:
                print >>sys.stderr, 'Marking dead scheduler as not running'
            SchedulerModule.getDBInstance().setSchedulerRunningStatus(False)
            DBMgr.getInstance().commit()
        else:
            print >>sys.stderr, 'Unexpected state! Process is running, but scheduler is not marked as running'


def _show(args):
    dbi = DBMgr.getInstance()
    dbi.startRequest()
    c = Client()

    if args.field == "status":
        status = c.getStatus()
        if status['state']:
            print 'Scheduler is currently running on {0[hostname]} with pid {0[pid]}'.format(status)
        else:
            print 'Scheduler is currently NOT running'
        print """
Spooled commands: %(spooled)s

- Waiting: %(waiting)s
- Running: %(running)s
- Finished: %(finished)s
""" % status
    elif args.field == "spool":
        for op, obj in c.getSpool():
            if op in ['add', 'del']:
                print "%s %s" % (op, obj)

    dbi.endRequest()


def _cmd(args):
    dbi = DBMgr.getInstance()
    dbi.startRequest()
    c = Client()

    if args.command == "clear_spool":
        print "%s operations removed" % c.clearSpool()

    dbi.endRequest()


def _run(args):
    # log to the console, since this command runs a task in the foreground
    formatter = logging.Formatter("%(asctime)s %(name)s - %(levelname)s %(filename)s:%(lineno)s: %(message)s")

    root = logging.getLogger('')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    root.addHandler(handler)

    dbi = DBMgr.getInstance(max_disconnect_poll=40)
    dbi.startRequest()

    sm = SchedulerModule.getDBInstance()
    t = sm.getTaskById(args.taskid)

    t.plugLogger(logging.getLogger('console.run/%s' % args.taskid))

    # run the task inside the Flask application context
    with make_app(True).app_context():
        t.run()

    dbi.endRequest()


def main():
    parser = argparse.ArgumentParser(description=sys.modules[__name__].__doc__)
    subparsers = parser.add_subparsers(help="the action to be performed")

    parser_start = subparsers.add_parser('start', help="start the daemon")
    parser_stop = subparsers.add_parser('stop', help="stop the daemon")
    parser_restart = subparsers.add_parser('restart', help="restart the daemon")
    parser_check = subparsers.add_parser('check', help="check and sync status")
    parser_show = subparsers.add_parser('show', help="show information")
    parser_cmd = subparsers.add_parser('cmd', help="execute a command")
    parser_run = subparsers.add_parser('run', help="run a task, from this process")

    parser.add_argument("-p", "--fork-processes", dest="fork_processes",
                        action="store_true", required=False,
                        help="obsolete, the scheduler always uses processes")
    parser.add_argument("-f", "--force", dest="force",
                        action="store_const", const=True,
                        default=False, required=False,
                        help="ignores the information in the DB about scheduler status")

    parser_start.add_argument("-s", "--standalone",
                              action="store_const", const=True, default=False, required=False,
                              help="forces standalone mode - process doesn't go to background")

    parser_start.add_argument("--log", type=str, default="INFO", required=False, help="set different logging mode")

    parser_start.set_defaults(func=_start)
    parser_stop.set_defaults(func=_stop)
    parser_restart.set_defaults(func=_restart)

    parser_check.set_defaults(func=_check)
    parser_check.add_argument('-q', '--quiet', dest='quiet', action='store_true', help='Suppress console output')

    parser_show.add_argument("field", choices=['status', 'spool'], type=str, help="information to be shown")
    parser_show.set_defaults(func=_show)

    parser_cmd.add_argument("command", choices=['clear_spool'], type=str, help="command to be executed")
    parser_cmd.set_defaults(func=_cmd)

    parser_run.add_argument("taskid", type=int, help="task to be executed (id)")
    parser_run.set_defaults(func=_run)

    args = parser.parse_args()

    if args.fork_processes:
        warnings.warn('The scheduler always uses processes so -p/--fork-processes is not needed anymore',
                      DeprecationWarning, 2)

    try:
        return args.func(args)
    except Exception:
        traceback.print_exc()
        return -1


if __name__ == "__main__":
    sys.exit(main())