From 4ee2af01de0ecf5d2e72592c8d2fffc676df0e82 Mon Sep 17 00:00:00 2001
From: Adrian Moennich
Date: Fri, 29 May 2015 19:17:18 +0200
Subject: [PATCH] Remove the old scheduler

---
 doc/dev/source/images/scheduler_arch.svg            | 520 ---------------------
 doc/dev/source/installation.rst                     |   4 +-
 doc/guides/source/AdminGuide/AdminGuide.rst         |   5 -
 indico/MaKaC/common/pendingQueues.py                | 229 +--------
 indico/MaKaC/conference.py                          |   1 -
 indico/MaKaC/consoleScripts/installBase.py          |   6 +-
 indico/MaKaC/services/implementation/conference.py  |   1 -
 indico/MaKaC/services/implementation/scheduler.py   | 141 ------
 indico/MaKaC/services/interface/rpc/handlers.py     |   1 -
 indico/MaKaC/webinterface/pages/admins.py           |  32 +-
 indico/MaKaC/webinterface/rh/conferenceModif.py     |   1 -
 indico/MaKaC/webinterface/rh/taskManager.py         |  32 --
 indico/MaKaC/webinterface/tpls/TaskManager.tpl      |   7 -
 indico/MaKaC/webinterface/urlHandlers.py            |   4 -
 indico/core/db/migration.py                         |  10 -
 indico/htdocs/js/indico/Admin/Scheduler.js          | 372 ---------------
 indico/modules/base.py                              |   9 +-
 indico/modules/scheduler/__init__.py                |  30 --
 indico/modules/scheduler/base.py                    | 145 ------
 indico/modules/scheduler/client.py                  | 102 ----
 indico/modules/scheduler/daemon_script.py           | 347 --------------
 indico/modules/scheduler/fossils.py                 |  69 ---
 indico/modules/scheduler/module.py                  | 250 ----------
 indico/modules/scheduler/server.py                  | 483 -------------------
 indico/modules/scheduler/slave.py                   | 160 -------
 indico/modules/scheduler/tasks/__init__.py          | 365 ---------------
 indico/modules/scheduler/tasks/periodic.py          | 164 -------
 indico/util/counter.py                              | 193 --------
 indico/util/struct/queue.py                         | 161 -------
 indico/web/assets/bundles.py                        |   1 -
 indico/web/flask/app.py                             |   2 +-
 indico/web/flask/blueprints/admin.py                |   7 +-
 requirements.txt                                    |   1 -
 setup.py                                            |   1 -
 34 files changed, 34 insertions(+), 3822 deletions(-)
 delete mode 100644 doc/dev/source/images/scheduler_arch.svg
 delete mode 100644 indico/MaKaC/services/implementation/scheduler.py
 delete mode 100644 indico/MaKaC/webinterface/rh/taskManager.py
 delete mode 100644 indico/MaKaC/webinterface/tpls/TaskManager.tpl
 delete mode 100644 indico/htdocs/js/indico/Admin/Scheduler.js
 delete mode 100644 indico/modules/scheduler/__init__.py
 delete mode 100644 indico/modules/scheduler/base.py
 delete mode 100644 indico/modules/scheduler/client.py
 delete mode 100644 indico/modules/scheduler/daemon_script.py
 delete mode 100644 indico/modules/scheduler/fossils.py
 delete mode 100644 indico/modules/scheduler/module.py
 delete mode 100644 indico/modules/scheduler/server.py
 delete mode 100644 indico/modules/scheduler/slave.py
 delete mode 100644 indico/modules/scheduler/tasks/__init__.py
 delete mode 100644 indico/modules/scheduler/tasks/periodic.py
 delete mode 100644 indico/util/counter.py
 delete mode 100644 indico/util/struct/queue.py

diff --git a/doc/dev/source/images/scheduler_arch.svg b/doc/dev/source/images/scheduler_arch.svg
deleted file mode 100644
index 53dab0f23..000000000
--- a/doc/dev/source/images/scheduler_arch.svg
+++ /dev/null
@@ -1,520 +0,0 @@
[520 deleted lines of SVG markup; only the diagram's text labels survive: Client, Scheduler, Daemon, Web Server Process, Web server, Database Client]
diff --git a/doc/dev/source/installation.rst b/doc/dev/source/installation.rst
index 3e93648de..a8e55c0c4 100644
--- a/doc/dev/source/installation.rst
+++ b/doc/dev/source/installation.rst
@@ -313,11 +313,11 
@@ From v1.2 on, the URLs will be shorter, alike ``http://my.indico.srv/event/2413/ Post-install tasks ================== -If you wish to use the scheduler daemon (replaces old ``taskDaemon``), then you should run: +For background tasks you need to run the Celery scheduler daemon: .. code-block:: console - # sudo -u apache indico_scheduler start + # indico celery worker -l INFO -B If you have changed your server host name for some reason you may need to delete ``/opt/indico/tmp/vars.js.tpl.tmp``. diff --git a/doc/guides/source/AdminGuide/AdminGuide.rst b/doc/guides/source/AdminGuide/AdminGuide.rst index 432264189..6b4b14cd6 100644 --- a/doc/guides/source/AdminGuide/AdminGuide.rst +++ b/doc/guides/source/AdminGuide/AdminGuide.rst @@ -323,11 +323,6 @@ The calendar notification contains the following information about the event: This plugin is enabled by default for each user but the user can disable it from the *User Personal Preferences* page. The Synchronization is run at a specific interval by the indico-scheduler. -To run the scheduler task for synchronizing events which the calendar, you have to run following commands in *indico_shell*:: - - from indico.ext.calendaring.outlook.tasks import OutlookTaskRegistry - OutlookTaskRegistry.register() - |image14| diff --git a/indico/MaKaC/common/pendingQueues.py b/indico/MaKaC/common/pendingQueues.py index 6edcecc1d..935920c1c 100644 --- a/indico/MaKaC/common/pendingQueues.py +++ b/indico/MaKaC/common/pendingQueues.py @@ -14,14 +14,10 @@ # You should have received a copy of the GNU General Public License # along with Indico; if not, see . -from datetime import timedelta - from persistent import Persistent from MaKaC.common import indexes, mail from MaKaC.common.info import HelperMaKaCInfo -from MaKaC.common.timezoneUtils import nowutc -from MaKaC.user import AvatarHolder from MaKaC.webinterface import urlHandlers from MaKaC.webinterface.mail import GenericNotification @@ -29,8 +25,6 @@ from indico.core import signals from indico.core.config import Config from indico.core.logger import Logger from indico.modules.auth.util import url_for_login -from indico.modules.scheduler import Scheduler -from indico.modules.scheduler.tasks import OneShotTask logger = Logger.get('pending') @@ -45,30 +39,21 @@ class PendingHolder(object): def __init__(self): """Index by email of all the request and all the tasks with the reminders""" - self._id="" - self._idx = None # All the pendign users - self._tasksIdx=None # Tasks which send reminder emails periodically asking for the creation of one indico account - self._reminder=PendingReminder + self._id = "" + self._idx = None # All the pending users def getPendingByEmail(self, email): return self._idx.matchPendingUser(email) def removePending(self, sb): - """Remove the pendant from the queue, and if it's the last one then remove the task""" + """Remove the pendant from the queue""" self._idx.unindexPendingUser(sb) - email=sb.getEmail().lower().strip() - if self.getPendingByEmail(email)==[]: - self._removeTask(email) - def addPending(self, sb, sendEmail=True, sendPeriodicEmail=False): - """Add a new user to the index, and a new task only the first time""" + def addPending(self, sb, sendEmail=True): + """Add a new user to the index""" self._idx.indexPendingUser(sb) - email=sb.getEmail().lower().strip() - sendEmail=sendEmail and not self._hasTask(email) if sendEmail: self._sendReminderEmail(sb) - if sendPeriodicEmail: - self._addTask(email) def grantRights(self, av): """We must implement this method in order to grant the 
specific rights to the new user""" @@ -79,39 +64,6 @@ class PendingHolder(object): pass - def _getTasksIdx(self): - return self._tasksIdx - - def _addTask(self, email): - """Creating a task in order to send reminder emails periodically. - It's mandatory to implement this method for the specific pending queues""" - # ------ Creating a task in order to send reminder emails periodically ------ - if not self._hasTask(email): - # Create the object which send the email - pedingReminder = self._reminder(email) - pedingReminder.setId("ReminderPending%s-%s" % (self._id,email)) - # Create the task - t=task() - t.addObj(pedingReminder) - t.setInterval(timedelta(7)) # Remind each 7 days - nw=nowutc() - t.setLastDate(nw) # start in 7 days cos the first email was already sent - t.setEndDate(nw+timedelta(15)) # keep trying during 15 days - self._tasksIdx.indexTask(email, t) - - Scheduler.addTask(t) - - - def _removeTask(self, email): - if self._hasTask(email): - t=self._getTasksIdx().matchTask(email)[0] - Scheduler.removeTask(t) - self._tasksIdx.unindexTask(email, t) - - def _hasTask(self, email): - return self._getTasksIdx().matchTask(email) != [] - - class _PendingNotification(GenericNotification): def __init__(self, psList): @@ -146,39 +98,6 @@ class _PendingNotification(GenericNotification): return d -class PendingReminder(OneShotTask): - def __init__(self, email, **kwargs): - super(PendingReminder, self).__init__(**kwargs) - self._email = email - - def run(self): - """Mandatory to implement for the specific queues. - It runs the task of sending an email to the users in order to - ask them for the creation of the account. - - It returns: - - TRUE if the pending user has already created the account. - - FALSE if not.""" - ah = AvatarHolder() - results=ah.match({"email":self._email}, exact=1) - av=None - for i in results: - if i.getEmail().lower().strip()==self._email.lower().strip(): - av=i - break - if av is not None and av.isActivated(): - ph=PendingQueuesHolder() - ph.grantRights(av) - return True - return False - - def tearDown(self): - '''Inheriting classes must implement this method''' - raise Exception('Unimplemented tearDown') - - - def getEmail(self): - return self._email #---END GENERAL @@ -196,8 +115,6 @@ class PendingConfSubmittersHolder(PendingHolder): """Index by email of all the request to add Chairpersons or Speakers""" self._id="ConfSubmitters" self._idx = indexes.IndexesHolder().getById("pendingConfSubmitters") # All the Conference Chairpersons/Speakers - self._tasksIdx=indexes.IndexesHolder().getById("pendingConfSubmittersTasks") # Tasks which send reminder emails periodically asking for the creation of one indico account - self._reminder=PendingConfSubmitterReminder def grantRights(self, av): l = self.getPendingByEmail(av.getEmail()) @@ -250,24 +167,6 @@ class _PendingConfSubmitterNotification(_PendingNotification): return participations -class PendingConfSubmitterReminder(PendingReminder): - - def run(self): - if not PendingReminder.run(self): - psl = PendingConfSubmittersHolder().getPendingByEmail(self._email) - if psl: - mail.GenericMailer.send(_PendingConfSubmitterNotification(psl)) - - def tearDown(self): - for submitter in PendingConfSubmittersHolder().getPendingByEmail(self._email): - submitter.getConference().getPendingQueuesMgr().removePendingConfSubmitter(submitter) - - def getPendings(self): - try: - return PendingConfSubmittersHolder().getPendingByEmail(self._email) - except: - return None - class PendingSubmittersHolder(PendingHolder): """ This is an index that holds 
all the requests to add Authors and Speakers in the @@ -280,8 +179,6 @@ class PendingSubmittersHolder(PendingHolder): """Index by email of all the request to add Authors or Speakers""" self._id="Submitters" self._idx = indexes.IndexesHolder().getById("pendingSubmitters") # All the ContributionParticipants - self._tasksIdx=indexes.IndexesHolder().getById("pendingSubmittersTasks") # Tasks which send reminder emails periodically asking for the creation of one indico account - self._reminder=PendingSubmitterReminder def grantRights(self, av): l = self.getPendingByEmail(av.getEmail()) @@ -353,29 +250,6 @@ class _PendingSubmitterNotification(_PendingNotification): participations+="\t\t\t\t - Access: %s\n" % accessURL return participations -class PendingSubmitterReminder(PendingReminder): - - def run(self): - hasAccount=PendingReminder.run(self) - if not hasAccount: - psh=PendingSubmittersHolder() - psl=psh.getPendingByEmail(self._email) - if psl != [] and psl is not None: - notif = _PendingSubmitterNotification( psl ) - mail.GenericMailer.send( notif ) - - def tearDown(self): - psh=PendingSubmittersHolder() - psl=psh.getPendingByEmail(self._email) - for e in psl: - e.getConference().getPendingQueuesMgr().removePendingSubmitter(e) - - def getPendings(self): - psh=PendingSubmittersHolder() - try: - return psh.getPendingByEmail(self._email) - except: - return None #---END SUBMITTERS @@ -392,9 +266,6 @@ class PendingManagersHolder(PendingHolder): """Index by email of all the requests""" self._id="Managers" self._idx = indexes.IndexesHolder().getById("pendingManagers") # All the pending managers - self._tasksIdx=indexes.IndexesHolder().getById("pendingManagersTasks") # Tasks which send reminder emails periodically asking - # for the creation of one indico account - self._reminder=PendingManagerReminder def grantRights(self, av): l=self.getPendingByEmail(av.getEmail()) @@ -455,26 +326,6 @@ class _PendingManagerNotification(_PendingNotification): return participations -class PendingManagerReminder(PendingReminder): - - def run(self): - hasAccount=PendingReminder.run(self) - if not hasAccount: - psh=PendingManagersHolder() - psl=psh.getPendingByEmail(self._email) - if psl != [] and psl is not None: - notif = _PendingManagerNotification( psl ) - mail.GenericMailer.send( notif ) - - def tearDown(self): - psh = PendingManagersHolder() - psl = psh.getPendingByEmail(self._email) - for e in psl: - e.getConference().getPendingQueuesMgr().removePendingManager(e) - - def getPendings(self): - psh=PendingManagersHolder() - return psh.getPendingByEmail(self._email) #---END MANAGERS @@ -491,9 +342,6 @@ class PendingConfManagersHolder(PendingHolder): """Index by email of all the requests""" self._id="ConfManagers" self._idx = indexes.IndexesHolder().getById("pendingConfManagers") # All the pending managers - self._tasksIdx=indexes.IndexesHolder().getById("pendingConfManagersTasks") # Tasks which send reminder emails periodically asking - # for the creation of one indico account - self._reminder=PendingConfManagerReminder def grantRights(self, av): l=self.getPendingByEmail(av.getEmail()) @@ -549,25 +397,6 @@ class _PendingConfManagerNotification(_PendingNotification): return participations -class PendingConfManagerReminder(PendingReminder): - def run(self): - hasAccount=PendingReminder.run(self) - if not hasAccount: - psh=PendingConfManagersHolder() - psl=psh.getPendingByEmail(self._email) - if psl != [] and psl is not None: - notif = _PendingConfManagerNotification( psl ) - mail.GenericMailer.send( notif ) - - def 
tearDown(self): - psh = PendingConfManagersHolder() - psl = psh.getPendingByEmail(self._email) - for e in psl: - e.getConference().getPendingQueuesMgr().removePendingConfManager(e) - - def getPendings(self): - psh = PendingManagersHolder() - return psh.getPendingByEmail(self._email) #---END MANAGERS #---PENDING COORDINATORS @@ -583,8 +412,6 @@ class PendingCoordinatorsHolder(PendingHolder): """Index by email of all the requests""" self._id="Coordinators" self._idx = indexes.IndexesHolder().getById("pendingCoordinators") # All the pending coordinators - self._tasksIdx=indexes.IndexesHolder().getById("pendingCoordinatorsTasks") # Tasks which send reminder emails periodically asking for the creation of one indico account - self._reminder=PendingCoordinatorReminder def grantRights(self, av): l=self.getPendingByEmail(av.getEmail()) @@ -645,25 +472,6 @@ class _PendingCoordinatorNotification(_PendingNotification): return participations -class PendingCoordinatorReminder(PendingReminder): - def run(self): - hasAccount=PendingReminder.run(self) - if not hasAccount: - psh = PendingCoordinatorsHolder() - psl = psh.getPendingByEmail(self._email) - if psl != [] and psl is not None: - notif = _PendingCoordinatorNotification(psl) - mail.GenericMailer.send(notif) - - def tearDown(self): - psh = PendingCoordinatorsHolder() - psl = psh.getPendingByEmail(self._email) - for e in psl: - e.getConference().getPendingQueuesMgr().removePendingCoordinator(e) - - def getPendings(self): - psh = PendingCoordinatorsHolder() - return psh.getPendingByEmail(self._email) #---END COORDINATORS @@ -795,10 +603,10 @@ class ConfPendingQueuesMgr(Persistent): keys=self.getPendingConfSubmitters().keys() return keys - def addSubmitter(self, ps, owner, sendEmail=True, sendPeriodicEmail=False): + def addSubmitter(self, ps, owner, sendEmail=True): from MaKaC.conference import Conference if isinstance(owner, Conference): - self.addPendingConfSubmitter(ps, sendEmail=True, sendPeriodicEmail=False) + self.addPendingConfSubmitter(ps, sendEmail=True) mail.GenericMailer.sendAndLog(_PendingConfSubmitterNotification([ps]), owner) def removeSubmitter(self, ps, owner): @@ -806,7 +614,7 @@ class ConfPendingQueuesMgr(Persistent): if isinstance(owner, Conference): self.removePendingConfSubmitter(ps) - def addPendingConfManager(self, ps, sendEmail=True, sendPeriodicEmail=False): + def addPendingConfManager(self, ps, sendEmail=True): email=ps.getEmail().lower().strip() if self.getPendingConfManagers().has_key(email): if not ps in self._pendingConfManagers[email]: @@ -814,7 +622,7 @@ class ConfPendingQueuesMgr(Persistent): else: self._pendingConfManagers[email] = [ps] pendings=PendingConfManagersHolder() - pendings.addPending(ps, sendEmail, sendPeriodicEmail) + pendings.addPending(ps, sendEmail) self.notifyModification() def removePendingConfManager(self, ps): @@ -828,7 +636,7 @@ class ConfPendingQueuesMgr(Persistent): del self._pendingConfManagers[email] self.notifyModification() - def addPendingConfSubmitter(self, ps, sendEmail=True, sendPeriodicEmail=False): + def addPendingConfSubmitter(self, ps, sendEmail=True): email=ps.getEmail().lower().strip() if self.getPendingConfSubmitters().has_key(email): if not ps in self._pendingConfSubmitters[email]: @@ -836,7 +644,7 @@ class ConfPendingQueuesMgr(Persistent): else: self._pendingConfSubmitters[email] = [ps] pendings=PendingConfSubmittersHolder() - pendings.addPending(ps, sendEmail, sendPeriodicEmail) + pendings.addPending(ps, sendEmail) self.notifyModification() def removePendingConfSubmitter(self, ps): 
@@ -898,7 +706,7 @@ class ConfPendingQueuesMgr(Persistent): del self._pendingSubmitters[email] self.notifyModification() - def addPendingSubmitter(self, ps, sendEmail=True, sendPeriodicEmail=False): + def addPendingSubmitter(self, ps, sendEmail=True): # Used only from contributions. # TODO: when refactoring, this method should be renamed and called only from self.addSubmitter email=ps.getEmail().lower().strip() @@ -908,7 +716,7 @@ class ConfPendingQueuesMgr(Persistent): else: self._pendingSubmitters[email] = [ps] pendings=PendingSubmittersHolder() - pendings.addPending(ps, sendEmail, sendPeriodicEmail) + pendings.addPending(ps, sendEmail) self.notifyModification() def getPendingSubmittersByEmail(self, email): @@ -943,7 +751,7 @@ class ConfPendingQueuesMgr(Persistent): keys=self.getPendingManagers().keys() return keys - def addPendingManager(self, ps, sendEmail=True, sendPeriodicEmail=False): + def addPendingManager(self, ps, sendEmail=True): email=ps.getEmail().lower().strip() if self.getPendingManagers().has_key(email): if not ps in self._pendingManagers[email]: @@ -951,7 +759,7 @@ class ConfPendingQueuesMgr(Persistent): else: self._pendingManagers[email] = [ps] pendings=PendingManagersHolder() - pendings.addPending(ps, sendEmail, sendPeriodicEmail) + pendings.addPending(ps, sendEmail) self.notifyModification() def removePendingManager(self, ps): @@ -998,7 +806,7 @@ class ConfPendingQueuesMgr(Persistent): keys=self.getPendingCoordinators().keys() return keys - def addPendingCoordinator(self, ps, sendEmail=True, sendPeriodicEmail=False): + def addPendingCoordinator(self, ps, sendEmail=True): email=ps.getEmail().lower().strip() if self.getPendingCoordinators().has_key(email): if not ps in self._pendingCoordinators[email]: @@ -1006,7 +814,7 @@ class ConfPendingQueuesMgr(Persistent): else: self._pendingCoordinators[email] = [ps] pendings=PendingCoordinatorsHolder() - pendings.addPending(ps, sendEmail, sendPeriodicEmail) + pendings.addPending(ps, sendEmail) self.notifyModification() def removePendingCoordinator(self, ps): @@ -1038,8 +846,7 @@ class ConfPendingQueuesMgr(Persistent): @signals.users.registered.connect def _on_user_register(user, **kwargs): - """Remove newly-added users from pending lists - """ + """Remove newly-added users from pending lists""" pending_submitter = PendingSubmittersHolder().getPendingByEmail(user.email) if pending_submitter: principal = pending_submitter[0] diff --git a/indico/MaKaC/conference.py b/indico/MaKaC/conference.py index def0c0507..890c0219f 100644 --- a/indico/MaKaC/conference.py +++ b/indico/MaKaC/conference.py @@ -98,7 +98,6 @@ from indico.core.logger import Logger from MaKaC.common.contextManager import ContextManager import zope.interface -from indico.modules.scheduler import Client, tasks from indico.core import signals from indico.util.date_time import utc_timestamp, format_datetime from indico.core.index import IIndexableByStartDateTime, IUniqueIdProvider, Catalog diff --git a/indico/MaKaC/consoleScripts/installBase.py b/indico/MaKaC/consoleScripts/installBase.py index 3822ce31f..b9e130b2c 100644 --- a/indico/MaKaC/consoleScripts/installBase.py +++ b/indico/MaKaC/consoleScripts/installBase.py @@ -586,12 +586,14 @@ For information on how to configure Apache HTTPD, take a look at: http://indico.readthedocs.org/en/latest/installation/#configuring-the-web-server -Please do not forget to start the scheduler in order to use alarms, creation -of off-line websites, reminders, etc. You can manage it with the 'indico_scheduler' command. 
+Please do not forget to start the Celery worker in order to use background tasks +such as event reminders and periodic cleanups. You can run it using this command: +$ indico celery worker -l INFO -B %s """ % (targetDirs['etc'], targetDirs['bin'], targetDirs['doc'], targetDirs['etc'], targetDirs['htdocs'], _databaseText(targetDirs['etc'])) + def _databaseText(cfgPrefix): return """If you are running ZODB on this host: - Review %s/zodb.conf and %s/zdctl.conf to make sure everything is ok. diff --git a/indico/MaKaC/services/implementation/conference.py b/indico/MaKaC/services/implementation/conference.py index ed2f8a670..d87d0b08c 100644 --- a/indico/MaKaC/services/implementation/conference.py +++ b/indico/MaKaC/services/implementation/conference.py @@ -59,7 +59,6 @@ from MaKaC.services.interface.rpc.common import (HTMLSecurityError, NoReportErro # indico imports -from indico.modules.scheduler import tasks from indico.modules.users.legacy import AvatarUserWrapper from indico.modules.users.util import get_user_by_email from indico.util.user import principal_from_fossil diff --git a/indico/MaKaC/services/implementation/scheduler.py b/indico/MaKaC/services/implementation/scheduler.py deleted file mode 100644 index 5116800db..000000000 --- a/indico/MaKaC/services/implementation/scheduler.py +++ /dev/null @@ -1,141 +0,0 @@ -# This file is part of Indico. -# Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN). -# -# Indico is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 3 of the -# License, or (at your option) any later version. -# -# Indico is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Indico; if not, see . 
- -""" -AJAX Services for Scheduler (admin) -""" - -from MaKaC.services.implementation.base import AdminService -from MaKaC.services.interface.rpc.common import ServiceError - -from indico.modules import ModuleHolder -from indico.modules.scheduler import Client -from indico.util import fossilize, date_time - - -class SchedulerModuleAdminService(AdminService): - - def _checkParams(self): - AdminService._checkParams(self) - self.schedModule = ModuleHolder().getById('scheduler') - - -class GetSchedulerSummary(SchedulerModuleAdminService): - """ - Returns a summary of the status of the scheduler daemon - """ - - def _getAnswer(self): - return fossilize.fossilize(self.schedModule.getStatus()) - - -class GetWaitingTaskList(SchedulerModuleAdminService): - """ - Returns the list of tasks currently waiting to be executed - """ - - def _getAnswer(self): - return fossilize.fossilize([v for k, v in self.schedModule.getWaitingQueue()]) - - -class GetRunningTaskList(SchedulerModuleAdminService): - """ - Returns the list of tasks currently running - """ - - def _getAnswer(self): - return fossilize.fossilize(self.schedModule.getRunningList()) - - -class GetFailedTaskList(SchedulerModuleAdminService): - """ - Returns the list of tasks currently running - """ - - def _getAnswer(self): - return fossilize.fossilize(x for x in reversed(self.schedModule.getFailedIndex().values()) - if not hasattr(x, '__Broken_state__') and - not hasattr(getattr(x, '_task', None), '__Broken_state__')) - - -class GetFinishedTaskList(SchedulerModuleAdminService): - """ - Returns the list of tasks that have successfully finished their execution - """ - - def _checkParams(self): - SchedulerModuleAdminService._checkParams(self) - self._criteria = dict((k,v) for (k,v) in - self._params.iteritems() if k in ['ndays']) - - def _getAnswer(self): - - idx = self.schedModule.getFinishedIndex() - - if len(idx) > 0: - # 1 day - ts = idx.maxKey() - 24*60*60 - - return fossilize.fossilize(x for x in idx.itervalues(ts) - if not hasattr(getattr(x, '_task', None), '__Broken_state__')) - else: - return {} - - -class TaskAdminService(SchedulerModuleAdminService): - """ - Common to all tasks - """ - - def _checkParams(self): - SchedulerModuleAdminService._checkParams(self) - tid = self._params.get('id', None) - - if tid == None: - raise ServiceError('ERR-T0','Task Id not provided') - - self._client = Client() - self._task = self._client.getTask(tid) - - -class DeleteTask(TaskAdminService): - """ - Deletes a task - """ - - def _getAnswer(self): - self._client.dequeue(self._task) - - -class RunFailedTask(TaskAdminService): - """ - Runs a (failed) task now - """ - - def _getAnswer(self): - - self._task.setStartOn(date_time.nowutc()) - self._client.startFailedTask(self._task) - -methodMap = { - "summary": GetSchedulerSummary, - "tasks.listWaiting": GetWaitingTaskList, - "tasks.listRunning": GetRunningTaskList, - "tasks.listFailed": GetFailedTaskList, - "tasks.listFinished": GetFinishedTaskList, - "tasks.delete" : DeleteTask, - "tasks.runFailed" : RunFailedTask - } diff --git a/indico/MaKaC/services/interface/rpc/handlers.py b/indico/MaKaC/services/interface/rpc/handlers.py index 3390bdb0a..14faffd0d 100644 --- a/indico/MaKaC/services/interface/rpc/handlers.py +++ b/indico/MaKaC/services/interface/rpc/handlers.py @@ -77,7 +77,6 @@ endpointMap = { "category": importModule('MaKaC.services.implementation.category'), "upcomingEvents": importModule('MaKaC.services.implementation.upcoming'), "timezone": importModule('MaKaC.services.implementation.timezone'), - 
"scheduler": importModule('MaKaC.services.implementation.scheduler'), "abstractReviewing": importModule('MaKaC.services.implementation.abstractReviewing'), "abstract": importModule('MaKaC.services.implementation.abstract'), "abstracts": importModule('MaKaC.services.implementation.abstracts'), diff --git a/indico/MaKaC/webinterface/pages/admins.py b/indico/MaKaC/webinterface/pages/admins.py index 6a24a4850..f2dd16843 100644 --- a/indico/MaKaC/webinterface/pages/admins.py +++ b/indico/MaKaC/webinterface/pages/admins.py @@ -855,12 +855,10 @@ class WPAdminsSystemBase(WPAdminsBase): def _createTabCtrl(self): self._tabCtrl = wcomponents.TabControl() - self._subTabConfiguration = self._tabCtrl.newTab("configuration", _("Configuration"), \ - urlHandlers.UHAdminsSystem.getURL()) - self._subTabTaskManager = self._tabCtrl.newTab("tasks", _("Task Manager"), \ - urlHandlers.UHTaskManager.getURL()) - self._subTabMaintenance = self._tabCtrl.newTab("maintenance", _("Maintenance"), \ - urlHandlers.UHMaintenance.getURL()) + self._subTabConfiguration = self._tabCtrl.newTab("configuration", _("Configuration"), + urlHandlers.UHAdminsSystem.getURL()) + self._subTabMaintenance = self._tabCtrl.newTab("maintenance", _("Maintenance"), + urlHandlers.UHMaintenance.getURL()) def _getPageContent(self, params): return wcomponents.WTabControl(self._tabCtrl, self._getAW()).getHTML(self._getTabContent(params)) @@ -987,28 +985,6 @@ class WPMaintenancePack(WPMaintenanceBase): """ % wc.getHTML(msg, url, {}) -class WPTaskManagerBase(WPAdminsSystemBase): - - def __init__(self, rh): - WPAdminsBase.__init__(self, rh) - - def _setActiveTab(self): - self._subTabTaskManager.setActive() - - -class WPTaskManager(WPTaskManagerBase): - - def _getTabContent(self, params): - wc = WTaskManager() - - pars = {} - return wc.getHTML(pars) - - -class WTaskManager(wcomponents.WTemplated): - pass - - class WPIPBasedACL( WPServicesCommon ): def __init__( self, rh ): diff --git a/indico/MaKaC/webinterface/rh/conferenceModif.py b/indico/MaKaC/webinterface/rh/conferenceModif.py index 88092a835..74b868728 100644 --- a/indico/MaKaC/webinterface/rh/conferenceModif.py +++ b/indico/MaKaC/webinterface/rh/conferenceModif.py @@ -65,7 +65,6 @@ from MaKaC.review import AbstractStatusSubmitted, AbstractStatusProposedToAccept import MaKaC.webinterface.pages.abstracts as abstracts from MaKaC.fossils.conference import ISessionBasicFossil -from indico.modules.scheduler import Client from indico.util import json from indico.web.http_api.metadata.serializer import Serializer from indico.web.flask.util import send_file, url_for diff --git a/indico/MaKaC/webinterface/rh/taskManager.py b/indico/MaKaC/webinterface/rh/taskManager.py deleted file mode 100644 index 2427d24c8..000000000 --- a/indico/MaKaC/webinterface/rh/taskManager.py +++ /dev/null @@ -1,32 +0,0 @@ -# This file is part of Indico. -# Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN). -# -# Indico is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 3 of the -# License, or (at your option) any later version. -# -# Indico is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Indico; if not, see . 
- -import MaKaC.webinterface.rh.admins as admins -import MaKaC.webinterface.urlHandlers as urlHandlers -from MaKaC.webinterface.pages import admins as adminPages - - -class RHTaskManagerBase(admins.RHAdminBase): - pass - - -class RHTaskManager(RHTaskManagerBase): - _uh = urlHandlers.UHTaskManager - - def _process(self): - - p = adminPages.WPTaskManager(self) - return p.display() diff --git a/indico/MaKaC/webinterface/tpls/TaskManager.tpl b/indico/MaKaC/webinterface/tpls/TaskManager.tpl deleted file mode 100644 index 7064e05b1..000000000 --- a/indico/MaKaC/webinterface/tpls/TaskManager.tpl +++ /dev/null @@ -1,7 +0,0 @@ -
- [TaskManager.tpl markup stripped during extraction]
- - diff --git a/indico/MaKaC/webinterface/urlHandlers.py b/indico/MaKaC/webinterface/urlHandlers.py index 14b36ecf4..3cf6736ae 100644 --- a/indico/MaKaC/webinterface/urlHandlers.py +++ b/indico/MaKaC/webinterface/urlHandlers.py @@ -1881,10 +1881,6 @@ class UHConfModFullMaterialPackagePerform(URLHandler): _endpoint = 'event_mgmt.confModifTools-performMatPkg' -class UHTaskManager(URLHandler): - _endpoint = 'admin.taskManager' - - class UHUpdateNews(URLHandler): _endpoint = 'admin.updateNews' diff --git a/indico/core/db/migration.py b/indico/core/db/migration.py index 9c2173e17..59602f7c7 100644 --- a/indico/core/db/migration.py +++ b/indico/core/db/migration.py @@ -34,16 +34,6 @@ globalname_dict = { "PersistentList": ("persistent.list", None), 'SlotSchedule': ('MaKaC.schedule', 'SlotSchedule'), 'PosterSlotSchedule': ('MaKaC.schedule', 'PosterSlotSchedule'), - "PeriodicTask": ("indico.modules.scheduler.tasks.periodic", None), - "PeriodicUniqueTask": ("indico.modules.scheduler.tasks.periodic", None), - "TaskOccurrence": ("indico.modules.scheduler.tasks.periodic", None), - "CategoryStatisticsUpdaterTask": ("indico.modules.scheduler.tasks.periodic", None), - "FoundationSyncTask": ("indico.modules.scheduler.tasks.periodic", None), - "SamplePeriodicTask": ("indico.modules.scheduler.tasks.periodic", None), - 'RoomReservationTask': ('indico.modules.scheduler.tasks', 'DeletedTask'), - 'RoomReservationEndTask': ('indico.modules.scheduler.tasks', 'DeletedTask'), - 'FoundationSyncTask': ('indico.modules.scheduler.tasks', 'DeletedTask'), - 'LiveSyncUpdateTask': ('indico.modules.scheduler.tasks', 'DeletedTask'), 'UHConferenceInstantMessaging': ('MaKaC.webinterface.urlHandlers', 'URLHandler'), 'Avatar': ('indico.modules.users.legacy', 'AvatarUserWrapper'), 'Group': ('indico.modules.groups.legacy', 'LocalGroupWrapper'), diff --git a/indico/htdocs/js/indico/Admin/Scheduler.js b/indico/htdocs/js/indico/Admin/Scheduler.js deleted file mode 100644 index 2a0d61107..000000000 --- a/indico/htdocs/js/indico/Admin/Scheduler.js +++ /dev/null @@ -1,372 +0,0 @@ -/* This file is part of Indico. - * Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN). - * - * Indico is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License as - * published by the Free Software Foundation; either version 3 of the - * License, or (at your option) any later version. - * - * Indico is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Indico; if not, see . - */ - -type('TaskActions', [], - { - _request: function(method, args, success) { - var killProgress = - IndicoUI.Dialogs.Util.progress($T("Executing Operation...")); - - indicoRequest(method, args, - function(result,error){ - killProgress(); - if(error) { - IndicoUtil.errorReport(error); - } else { - success(result); - } - }); - }, - - deleteTask: function(task) { - this._request('scheduler.tasks.delete', - {id: task.id}, - function(result) { - // add some kind of decent notification bar? - new AlertPopup($T("Success"), $T("Task deleted.")).open(); - this._table.refresh(); - }); - }, - - runFailedTask: function(task) { - this._request('scheduler.tasks.runFailed', - {id: task.id}, - function(result) { - // add some kind of decent notification bar? 
- new AlertPopup($T("Success"), $T("Task failed ran.")).open(); - this._table.refresh(); - }); - } - - }, - function(table) { - this._table = table; - }); - -type('TaskTable', ['RemoteTableWidget'], - { - draw: function() { - return Html.div({}, - this._drawToolbar(), - RemoteTableWidget.prototype.draw.call(this) - ); - }, - - _drawItem: function(pair) { - var task = pair.get(); - - // an 'element' is a tuple (ok, list) [timestamp, {...}], - // where the second element is an object with the task attributes - return concat(this._taskLine(task), - [Html.td({}, - this._drawLineOptions(task))]); - }, - - _formatDateTime: function(dateTime, format){ - return Util.DateTime.friendlyDateTime(dateTime, format); - }, - - _taskLine: function(task) { - return [Html.td({}, task.id), - Html.td({}, task.typeId), - Html.td({}, this._formatDateTime(task.startOn, - IndicoDateTimeFormats.Default)), - Html.td({}, this._formatDateTime(task.createdOn, - IndicoDateTimeFormats.Default))]; - }, - - _drawToolbar: function() { - var self = this; - - var refreshCount = $B(Html.span("smallGrey"), - this._timeLeft, - function(val) { - return val + " " + $T("sec. left"); - }); - - var chooser = new Chooser( - { - 'stopMode': command(function() { - chooser.set('resumeMode'); - clearTimeout(self._timeout); - }, $T('Stop monitoring')), - 'resumeMode': command(function() { - chooser.set('stopMode'); - self._timeout = setTimeout( - function() { - self._countToRefresh(self._refreshTime); - }, 1000); - }, $T('Resume monitoring')) - }); - - chooser.set('stopMode'); - - return Html.div({style: {height: '50px'}}, - Html.ul({className: "horizontalMenu", - style: {cssFloat: 'left', height: '20px'}}, - Html.li({}, - Widget.link(chooser)), - Html.li({}, Widget.link(command(function() { - // do the actual refresh - self.source.refresh(); - - // reset refresh time counter - self._timeLeft.set(self._refreshTime); - }, $T('Refresh now')))), - Html.li({}, refreshCount)), - Html.div({style:{cssFloat: 'left', marginTop: '4px', - padding:'4px'}}, - this._progress)); - }, - - _drawLineOptions: function() { - return ''; - }, - - // counting function - _countToRefresh: function(refreshTime) { - var self = this; - var val = this._timeLeft.get(); - if (val > 1) { - this._timeLeft.set(val - 1) - } else { - this._timeLeft.set(0); - this.refresh(); - this._timeLeft.set(refreshTime); - } - // store the timeout reference, so that we can cancel it - // when we switch tabs - this._timeout = setTimeout(function() { - self._countToRefresh(refreshTime); - }, 1000); - }, - - _runIndicator: function() { - this._progress.dom.style.display = 'inline'; - }, - - _stopIndicator: function() { - this._progress.dom.style.display = 'none'; - }, - - refresh: function() { - this.source.refresh(); - } - - }, - function(method, args, refreshTime) { - this._progress = progressIndicator(true); - - this._refreshTime = refreshTime || 5; - this.RemoteTableWidget("schedTaskList", method, args || {}); - this._actions = new TaskActions(this); - - // counter till the next refresh - this._timeLeft = new WatchValue(this._refreshTime); - - var self = this; - this._timeout = setTimeout(function() { - self._countToRefresh(self._refreshTime) - }, 1000); - - }); - -type('FinishedTaskTable', ['TaskTable'], - { - - _getHeader: function() { - return Html.tr({}, - Html.th({},'ID'), - Html.th({},'Task type'), - Html.th({},'Started on'), - Html.th({},'Ended on'), - Html.th({},'Creation date'), - Html.th({}, 'Options')); - }, - - _taskLine: function(task) { - if (task._fossil == 
'taskOccurrence') { - return [Html.td({}, task.task.id + ":" + task.id), - Html.td({}, task.task.typeId), - Html.td({}, this._formatDateTime( - task.startedOn, - IndicoDateTimeFormats.Default)), - Html.td({}, this._formatDateTime( - task.endedOn, - IndicoDateTimeFormats.Default)), - Html.td({}, this._formatDateTime( - task.task.createdOn, - IndicoDateTimeFormats.Default))]; - } else { - return [Html.td({}, task.id), - Html.td({}, task.typeId), - Html.td({}, this._formatDateTime( - task.startedOn, - IndicoDateTimeFormats.Default)), - Html.td({}, this._formatDateTime( - task.endedOn, - IndicoDateTimeFormats.Default)), - Html.td({}, this._formatDateTime( - task.createdOn, - IndicoDateTimeFormats.Default))]; - } - } - }, - function(method, criteria) { - this.TaskTable(method, criteria, 30); - }); - - -type('FailedTaskTable', ['FinishedTaskTable'], - { - _runTask: function(task) { - this._actions.runFailedTask(task); - }, - - _drawLineOptions: function(task) { - var self = this; - - return task._fossil == 'taskOccurrence' ? '' : - Html.div({}, Widget.link( - command( - function() { - self._runTask(task); - }, "Run now"))); - } - }, - function(method, criteria) { - this.FinishedTaskTable(method, criteria); - }); - - -type('SchedulerSummaryWidget', ['RemoteWidget'], - { - drawContent: function() { - var data = this.source.get(); - - var state = Html.tr({}, - Html.td({}, $T('State')), - Html.td({}, data.state ? - Html.span({}, $T('Enabled')) : - Html.span({}, $T('Disabled')))); - - var hostname = Html.tr({}, - Html.td({}, $T('Hostname')), - Html.td({}, Html.span({}, data.state ? data.hostname : 'n/a'))); - var pid = Html.tr({}, - Html.td({}, $T('PID')), - Html.td({}, Html.span({}, data.state ? data.pid : 'n/a'))); - - - var spool = Html.tr({}, Html.td({}, $T('Spooled commands')), - Html.td({}, data.spooled)); - var queue = Html.tr({}, Html.td({}, $T('Queued tasks')), - Html.td({}, data.waiting)); - var running = Html.tr({}, Html.td({}, $T('Running tasks')), - Html.td({}, data.running)); - var failed = Html.tr({}, Html.td({}, $T('Failed tasks')), - Html.td({}, data.failed)); - var finished = Html.tr({}, Html.td({}, $T('Finished tasks')), - Html.td({}, data.finished)); - - return Html.table({}, - Html.tbody({}, state, hostname, pid, spool, queue, running, failed, finished)); - } - }, - function(method) { - this.RemoteWidget(method, {}); - }); - -type('WaitingTaskTable', ['TaskTable'], - { - _getHeader: function() { - return Html.tr({}, - Html.th({},'ID'), - Html.th({},'Task type'), - Html.th({},'Scheduled execution'), - Html.th({},'Creation date'), - Html.th({}, 'Options')); - }, - - _deleteTask: function(task) { - this._actions.deleteTask(task); - }, - - _drawLineOptions: function(task) { - var self = this; - - return Html.div({}, Widget.link( - command( - function() { - self._deleteTask(task); - }, "Delete"))); - } - - }, - function(method) { - this.TaskTable(method, {}, 10); - }); - -type('SchedulerPanel', ['JLookupTabWidget'], - { - _notifyTabChange: function() { - // each time a tab changes, cancel the timeout events - // (otherwise we'd have a circus of AJAX requests) - if(this._currentWidget) { - clearTimeout(this._currentWidget._timeout); - } - }, - - _summary: function() { - return new SchedulerSummaryWidget('scheduler.summary'); - }, - - _running: function() { - return new TaskTable("scheduler.tasks.listRunning"); - }, - - _waiting: function() { - return new WaitingTaskTable("scheduler.tasks.listWaiting"); - }, - - _history: function() { - return new 
FinishedTaskTable("scheduler.tasks.listFinished", - {'number': 10}); - }, - - _failed: function() { - return new FailedTaskTable("scheduler.tasks.listFailed", - {'number': 10}); - }, - - _keepStatusWrapper: function(func) { - var self = this; - return function() { - self._currentWidget = func(); - return self._currentWidget.draw() - } - } - - }, function() { - this.JLookupTabWidget([ - [$T('Summary'), this._keepStatusWrapper(this._summary)], - [$T('Running Tasks'), this._keepStatusWrapper(this._running)], - [$T('Waiting queue'), this._keepStatusWrapper(this._waiting)], - [$T('Failed Tasks'), this._keepStatusWrapper(this._failed)], - [$T('History'), this._keepStatusWrapper(this._history)] - ]); - }); diff --git a/indico/modules/base.py b/indico/modules/base.py index c1be762c4..9a8a93dd8 100644 --- a/indico/modules/base.py +++ b/indico/modules/base.py @@ -25,7 +25,6 @@ behind to ensure the persistence. from MaKaC.common.ObjectHolders import ObjectHolder from MaKaC.errors import MaKaCError -from MaKaC.i18n import _ from persistent import Persistent @@ -45,13 +44,11 @@ class ModuleHolder( ObjectHolder ): from indico.modules import news from indico.modules import cssTpls from indico.modules import upcoming - from indico.modules import scheduler ModuleHolder._availableModules = { - news.NewsModule.id : news.NewsModule, - cssTpls.CssTplsModule.id : cssTpls.CssTplsModule, - upcoming.UpcomingEventsModule.id : upcoming.UpcomingEventsModule, - scheduler.SchedulerModule.id : scheduler.SchedulerModule + news.NewsModule.id: news.NewsModule, + cssTpls.CssTplsModule.id: cssTpls.CssTplsModule, + upcoming.UpcomingEventsModule.id: upcoming.UpcomingEventsModule, } def _newId( self ): diff --git a/indico/modules/scheduler/__init__.py b/indico/modules/scheduler/__init__.py deleted file mode 100644 index f827f0ed0..000000000 --- a/indico/modules/scheduler/__init__.py +++ /dev/null @@ -1,30 +0,0 @@ -# This file is part of Indico. -# Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN). -# -# Indico is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 3 of the -# License, or (at your option) any later version. -# -# Indico is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Indico; if not, see . - -""" -The ``scheduler`` module provides Indico with a scheduling API that allows specific jobs -(tasks to be run at given times, with a certain repeatibility, if needed). -""" - -class TaskDelayed(Exception): - def __init__(self, seconds): - self.delaySeconds = seconds - -from indico.modules.scheduler.module import SchedulerModule -from indico.modules.scheduler.server import Scheduler -from indico.modules.scheduler.client import Client -from indico.modules.scheduler.tasks import OneShotTask -from indico.modules.scheduler.tasks.periodic import PeriodicTask diff --git a/indico/modules/scheduler/base.py b/indico/modules/scheduler/base.py deleted file mode 100644 index 43c05204d..000000000 --- a/indico/modules/scheduler/base.py +++ /dev/null @@ -1,145 +0,0 @@ -# This file is part of Indico. -# Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN). 
-# -# Indico is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 3 of the -# License, or (at your option) any later version. -# -# Indico is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Indico; if not, see . - -import time -import types -from ZODB.POSException import ConflictError - -from indico.util.date_time import nowutc - -TASK_STATUS_NONE, TASK_STATUS_SPOOLED, TASK_STATUS_QUEUED, \ -TASK_STATUS_RUNNING, TASK_STATUS_FAILED, TASK_STATUS_ABORTED, \ -TASK_STATUS_FINISHED, TASK_STATUS_TERMINATED = range(0,8) - - -def status(num): - return ['NN', 'SP', 'QD', 'RN', 'FA', 'AB', - 'FI', 'TD'][num] - - -CONFLICTERROR_MAX_RETRIES = 10 - - -class OperationManager(object): - """ - Takes care of synchronizing resources - """ - - def __init__(self, f): - self._f = f - - def __get__(self, obj, ownerClass=None): - return types.MethodType(self, obj) - - def __call__(self, zelf, *args, **kwargs): - # some magic introspection - logger = zelf._logger - dbi = zelf._dbi - sync = False - - logger.debug("START Critical section around %s" % self._f.__name__) - - for i in range(CONFLICTERROR_MAX_RETRIES): - if sync: - dbi.sync() - retValue = self._f(zelf, *args, **kwargs) - try: - dbi.commit() - except Exception, e: - logger.exception("Commit failed (%d)" % i) - if isinstance(e, ConflictError): - sync = True - else: - raise - else: - break - else: - logger.error("Commit failed %d consecutive times. " - "Something bad must be going on..." % - CONFLICTERROR_MAX_RETRIES) - - logger.debug("END Critical section") - - return retValue - - -## Time Sources - -class TimeSource(object): - """ - A class that represents a time source (a reference clock) - This abstraction may look a bit of an overkill, but it is very - useful for testing purposes - """ - - @classmethod - def get(cls): - return cls._source - - @classmethod - def set(cls, source): - cls._source = source - - def __init__(self): - super(TimeSource, self).__init__() - - def sleep(self, amount): - time.sleep(amount) - - def getCurrentTime(self): - """ - Returns the current datetime - """ - raise Exception('Undefined method! Should be overloaded.') - - -class UTCTimeSource(TimeSource): - - def getCurrentTime(self): - return nowutc() - -TimeSource._source = UTCTimeSource() - - -## Exceptions - -class SchedulerException(Exception): - pass - - -class TaskStillRunningException(SchedulerException): - def __init__(self, task): - SchedulerException.__init__(self, '%s is currently running' % task) - - -class TaskNotFoundException(SchedulerException): - pass - - -class TaskInconsistentStatusException(SchedulerException): - pass - - -class SchedulerQuitException(SchedulerException): - pass - - -class SchedulerUnknownOperationException(SchedulerException): - pass - - -class SchedulerConfigurationException(SchedulerException): - pass diff --git a/indico/modules/scheduler/client.py b/indico/modules/scheduler/client.py deleted file mode 100644 index 537d50af3..000000000 --- a/indico/modules/scheduler/client.py +++ /dev/null @@ -1,102 +0,0 @@ -# This file is part of Indico. -# Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN). 
-# -# Indico is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 3 of the -# License, or (at your option) any later version. -# -# Indico is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Indico; if not, see . - -from indico.modules.scheduler import SchedulerModule, base -from indico.util.date_time import int_timestamp - -class Client(object): - - """ - :py:class:`~indico.modules.scheduler.Client` provices a transparent scheduler - client, that allows Indico client processes to interact with the Scheduler - without the need for a lot of code. - - It acts as a remote proxy. - """ - - def __init__(self): - super(Client, self).__init__() - self._schedMod = SchedulerModule.getDBInstance() - - def enqueue(self, task): - """ - Schedules a task for execution - """ - - return self._schedMod.spool('add', task) - - def dequeue(self, task): - """ - Schedules a task for deletion - """ - - return self._schedMod.spool('del', task) - - def moveTask(self, task, newDate): - oldTS = int_timestamp(task.getStartOn()) - task.setStartOn(newDate) - return self._schedMod.spool('change', (oldTS, task)) - - def shutdown(self, msg=""): - """ - Shuts down the scheduler. `msg` is an optional paramater that provides - an information message that will be written in the logs - """ - - return self._schedMod.spool('shutdown', msg) - - def clearSpool(self): - """ - Clears the spool, returning the number of removed elements - """ - return self._schedMod.clearSpool() - - def getSpool(self): - """ - Returns the spool - """ - return self._schedMod.getSpool() - - def getStatus(self): - """ - Returns status information (dictionary), containing the lengths (tasks) of: - * spool; - * waiting queue; - * running queue; - * finished task index; - * failed task index; - - As well as if the scheduler is running (`state`) - """ - - return self._schedMod.getStatus() - - def getTask(self, tid): - """ - Returns a :py:class:`task ` object, - given its task id - """ - - return self._schedMod.getTaskIndex()[tid] - - def startFailedTask(self, task): - """ - Starts a failed task - """ - - self._schedMod.moveTask(task, - base.TASK_STATUS_FAILED, - base.TASK_STATUS_QUEUED) diff --git a/indico/modules/scheduler/daemon_script.py b/indico/modules/scheduler/daemon_script.py deleted file mode 100644 index b7f2e518b..000000000 --- a/indico/modules/scheduler/daemon_script.py +++ /dev/null @@ -1,347 +0,0 @@ -# This file is part of Indico. -# Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN). -# -# Indico is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 3 of the -# License, or (at your option) any later version. -# -# Indico is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Indico; if not, see . 
- -""" -This script starts an Indico Scheduler instance, forking it off as a background -process. -""" - -import argparse -import ConfigParser -import logging -import multiprocessing -import os -import socket -import sys -import time -import warnings -from logging.handlers import SMTPHandler - -from indico.modules.scheduler import Scheduler, SchedulerModule, Client, base -from indico.web.flask.app import make_app - -# legacy import -from indico.core.config import Config -from indico.core.db import DBMgr - - -class SchedulerApp(object): - - def __init__(self, args): - super(SchedulerApp, self).__init__() - self.args = args - config = Config.getInstance() - worker = config.getWorkerName() - - cp = ConfigParser.ConfigParser() - logging_conf_file = os.path.join(config.getConfigurationDir(), "logging.conf") - cp.read(logging_conf_file) - - if cp.has_option('handler_smtp', 'args'): - # get e-mail from logging config file - log_mail = eval(cp.get('handler_smtp', 'args'))[2] - else: - log_mail = config.getSupportEmail() - - self.mailer = SMTPHandler(config.getSmtpServer(), - 'scheduler@%s' % worker, - log_mail, - "[indico_scheduler] Problem at %s" % worker) - - self.mailer.setLevel(logging.ERROR) - - def run(self): - root_logger = logging.getLogger('') - root_logger.addHandler(self.mailer) - - logger = logging.getLogger('daemon') - config = {} - if Config.getInstance().getDebug(): - config['sleep_interval'] = 1 - try: - Scheduler(**config).run() - return_val = 0 - except base.SchedulerQuitException: - logger.info("Daemon shut down successfully") - return_val = 0 - except: - logger.exception("Daemon terminated for unknown reason ") - return_val = -1 - finally: - return return_val - - -def _setup(args): - - cfg = Config.getInstance() - - # logging setup - handler = logging.handlers.TimedRotatingFileHandler(os.path.join(cfg.getLogDir(), 'scheduler.log'), 'midnight') - handler.setFormatter( - logging.Formatter( - "%(asctime)s %(process)s %(name)s: %(levelname)-8s %(message)s")) - - if 'log' not in args.__dict__: - args.log = 'INFO' - - level = getattr(logging, args.log) - - root_logger = logging.getLogger('') - root_logger.addHandler(handler) - root_logger.setLevel(level) - - mp_logger = multiprocessing.get_logger() - mp_logger.setLevel(level) - mp_logger.addHandler(handler) - - -def _check_running(check_process=False): - - with DBMgr.getInstance().global_connection(): - status = Client().getStatus() - - if not check_process: - return status['state'] - - if status['pid'] is None: - return False - return os.path.isdir('/proc/{0}/'.format(status['pid'])) - - -def _start(args): - - _setup(args) - - running = _check_running() - - if not args.force and running: - raise Exception("The daemon seems to be already running (consider -f?)") - if hasattr(args, 'standalone') and args.standalone: - SchedulerApp(args).run() - else: - pid = os.fork() - if pid: - print pid - return - else: - DBMgr.setInstance(None) - SchedulerApp(args).run() - - return 0 - - -def _stop(args): - - _setup(args) - - running = _check_running() - - if not args.force and not running: - raise Exception("The daemon doesn't seem to be running (consider -f?)") - - dbi = DBMgr.getInstance() - dbi.startRequest() - c = Client() - c.shutdown(msg="Daemon script") - dbi.commit() - - print "Waiting for death confirmation... " - for i in range(0, 20): - if not c.getStatus()['state']: - break - else: - time.sleep(1) - dbi.sync() - else: - print "FAILED!" - - print "DONE!" 
- dbi.endRequest() - - -def _restart(args): - with DBMgr.getInstance().global_connection(): - status = Client().getStatus() - if status['hostname'] is not None and status['hostname'] != socket.getfqdn() and not args.force: - raise Exception('The daemon is running on another machine ({0[hostname]}) (consider -f?)'.format(status)) - - _stop(args) - _start(args) - - -def _check(args): - if not os.path.isdir('/proc'): - raise Exception('This command only works on systems that have /proc/') - - with DBMgr.getInstance().global_connection(): - status = Client().getStatus() - if status['hostname'] is not None and status['hostname'] != socket.getfqdn(): - print >>sys.stderr, 'The daemon is running on another machine ({0[hostname]})'.format(status) - sys.exit(2) - - db_running = _check_running(False) - os_running = _check_running(True) - - if not args.quiet: - print >>sys.stderr, 'Database status: running={1}, host={0[hostname]}, pid={0[pid]}'.format( - status, db_running) - print >>sys.stderr, 'Process status: running={0}'.format(os_running) - - if db_running and os_running: - print status['pid'] - sys.exit(0) - elif not db_running and not os_running: - sys.exit(1) - elif db_running and not os_running: - if not args.quiet: - print >>sys.stderr, 'Marking dead scheduler as not running' - SchedulerModule.getDBInstance().setSchedulerRunningStatus(False) - DBMgr.getInstance().commit() - sys.exit(1) - else: - print >>sys.stderr, 'Unexpected state! Process is running, but scheduler is not marked as running' - sys.exit(2) - - -def _show(args): - - dbi = DBMgr.getInstance() - - dbi.startRequest() - c = Client() - - if args.field == "status": - status = c.getStatus() - - if status['state']: - print 'Scheduler is currently running on {0[hostname]} with pid {0[pid]}'.format(status) - else: - print 'Scheduler is currently NOT running' - print """ -Spooled commands: %(spooled)s - -Tasks: - - Waiting: %(waiting)s - - Running: %(running)s - - Failed: %(failed)s - - Finished: %(finished)s -""" % status - elif args.field == "spool": - - for op, obj in c.getSpool(): - if op in ['add', 'del']: - print "%s %s" % (op, obj) - else: - print op - - dbi.endRequest() - - -def _cmd(args): - - dbi = DBMgr.getInstance() - - dbi.startRequest() - c = Client() - - if args.command == "clear_spool": - print "%s operations removed" % c.clearSpool() - - dbi.endRequest() - - -def _run(args): - _setup(args) - - formatter = logging.Formatter("%(asctime)s %(name)s - %(levelname)s %(filename)s:%(lineno)s: %(message)s") - - root = logging.getLogger('') - handler = logging.StreamHandler() - handler.setFormatter(formatter) - root.addHandler(handler) - - dbi = DBMgr.getInstance(max_disconnect_poll=40) - dbi.startRequest() - - sm = SchedulerModule.getDBInstance() - t = sm.getTaskById(args.taskid) - - t.plugLogger(logging.getLogger('console.run/%s' % args.taskid)) - - with make_app(True).app_context(): - t.run() - - dbi.endRequest() - - -def main(): - - parser = argparse.ArgumentParser(description=sys.modules[__name__].__doc__) - subparsers = parser.add_subparsers(help="the action to be performed") - - parser_start = subparsers.add_parser('start', help="start the daemon") - parser_stop = subparsers.add_parser('stop', help="stop the daemon") - parser_restart = subparsers.add_parser('restart', help="restart the daemon") - parser_check = subparsers.add_parser('check', help="check and sync status") - parser_show = subparsers.add_parser('show', help="show information") - parser_cmd = subparsers.add_parser('cmd', help="execute a command") - parser_run 
= subparsers.add_parser('run', help="run a task, from this process") - - parser.add_argument("-p", "--fork-processes", dest="fork_processes", - action="store_true", required=False, - help="obsolete, the scheduler always uses processes") - parser.add_argument("-f", "--force", dest="force", - action="store_const", const=True, - default=False, required=False, - help="ignores the information in the DB about scheduler status") - - parser_start.add_argument("-s", "--standalone", - action="store_const", const=True, default=False, required=False, - help="forces standalone mode - process doesn't go to background") - - parser_start.add_argument("--log", type=str, default="INFO", required=False, help="set different logging mode") - - parser_start.set_defaults(func=_start) - parser_stop.set_defaults(func=_stop) - parser_restart.set_defaults(func=_restart) - - parser_check.set_defaults(func=_check) - parser_check.add_argument('-q', '--quiet', dest='quiet', action='store_true', help='Suppress console output') - - parser_show.add_argument("field", choices=['status', 'spool'], type=str, help="information to be shown") - parser_show.set_defaults(func=_show) - - parser_cmd.add_argument("command", choices=['clear_spool'], type=str, help="command to be executed") - parser_cmd.set_defaults(func=_cmd) - - parser_run.add_argument("taskid", type=int, help="task to be executed (id)") - parser_run.set_defaults(func=_run) - - args = parser.parse_args() - if args.fork_processes: - warnings.warn('The scheduler always uses processes so -p/--fork-processes is not needed anymore', - DeprecationWarning, 2) - - try: - return args.func(args) - except Exception: - import traceback - traceback.print_exc() - return -1 - - -if __name__ == "__main__": - main() diff --git a/indico/modules/scheduler/fossils.py b/indico/modules/scheduler/fossils.py deleted file mode 100644 index b865fc110..000000000 --- a/indico/modules/scheduler/fossils.py +++ /dev/null @@ -1,69 +0,0 @@ -# This file is part of Indico. -# Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN). -# -# Indico is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 3 of the -# License, or (at your option) any later version. -# -# Indico is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Indico; if not, see . 
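For reference, the dispatch pattern the deleted daemon_script relied on is plain argparse: every subcommand registers its handler with set_defaults(func=...) and main() simply calls args.func(args). A minimal, self-contained sketch of that pattern follows; the `status` command and its handler are hypothetical, not part of the removed file.

import argparse
import sys


def _status(args):
    # hypothetical handler -- the real ones opened the ZODB and queried the scheduler
    print('quiet' if args.quiet else 'status: ok')
    return 0


def main(argv=None):
    parser = argparse.ArgumentParser(description='subcommand dispatch sketch')
    subparsers = parser.add_subparsers(help='the action to be performed')

    parser_status = subparsers.add_parser('status', help='show status')
    parser_status.add_argument('-q', '--quiet', action='store_true')
    parser_status.set_defaults(func=_status)

    args = parser.parse_args(argv)
    return args.func(args)


if __name__ == '__main__':
    # equivalent to running `sketch.py status -q` from a shell
    sys.exit(main(['status', '-q']))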
- -""" -Fossils for tasks -""" - -from indico.util.fossilize import IFossil -from indico.util.fossilize.conversion import Conversion - -class ITaskFossil(IFossil): - """ - A fossil representing a scheduler task - """ - def getId(): - pass - - def getTypeId(): - pass - - def getStartOn(): - pass - getStartOn.convert = Conversion.datetime - - def getStartedOn(self): - pass - getStartedOn.convert = Conversion.datetime - - def getEndedOn(): - pass - getEndedOn.convert = Conversion.datetime - - def getCreatedOn(): - pass - getCreatedOn.convert = Conversion.datetime - - -class ITaskOccurrenceFossil(IFossil): - """ - A task occurrence - """ - - def getId(): - pass - - def getStartedOn(self): - pass - getStartedOn.convert = Conversion.datetime - - def getEndedOn(self): - pass - getEndedOn.convert = Conversion.datetime - - def getTask(self): - pass - getTask.result = ITaskFossil diff --git a/indico/modules/scheduler/module.py b/indico/modules/scheduler/module.py deleted file mode 100644 index 28bf47bd2..000000000 --- a/indico/modules/scheduler/module.py +++ /dev/null @@ -1,250 +0,0 @@ -# This file is part of Indico. -# Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN). -# -# Indico is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 3 of the -# License, or (at your option) any later version. -# -# Indico is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Indico; if not, see . 
- - -import logging -import os -import socket - -from BTrees.IOBTree import IOBTree -from BTrees.Length import Length - -from indico.modules import Module -from indico.modules.scheduler import base -from indico.util.struct.queue import PersistentWaitingQueue -from indico.util.date_time import int_timestamp -from indico.core.index import IOIndex, IIndexableByArbitraryDateTime - -from zc.queue import Queue - -class SchedulerModule(Module): - id = "scheduler" - - def __init__(self): - # logging.getLogger('scheduler') = logging.getLogger('scheduler') - # logging.getLogger('scheduler').warning('Creating incomingQueue and runningList..') - self._waitingQueue = PersistentWaitingQueue() - self._runningList = [] - - # Failed tasks (there is an object still in the DB) - self._failedIndex = IOIndex(IIndexableByArbitraryDateTime) - - # Finished tasks (no object data, just metadata) - self._finishedIndex = IOIndex(IIndexableByArbitraryDateTime) - - # Stores all tasks - self._taskIdx = IOBTree() - self._taskCounter = Length(0) - - # Is the scheduler running - self._schedulerStatus = False - self._hostname = None - self._pid = None - - # Temporary area where all the tasks stay before being - # added to the waiting list - self._taskSpool = Queue() - - def _assertTaskStatus(self, task, status): - """ - Confirm the status of this task - """ - - if task.status != status: - raise base.TaskInconsistentStatusException( - "%s status is not %s" % - (task, base.status(status))) - - if status == base.TASK_STATUS_RUNNING and \ - task not in self._runningList: - raise base.TaskInconsistentStatusException( - 'task %s was not found in the running task list' % task) - - # TODO: remaining elifs - - def _indexTask(self, task): - """ - Provide the task with an id and add it to the - task index - """ - - # give it a serial id - task.initialize(self._taskCounter(), base.TASK_STATUS_SPOOLED) - - # index it and increase the count - self._taskIdx[task.id] = task - self._taskCounter.change(1) - - logging.getLogger('scheduler').debug( - 'Added %s to index..' % task) - - - ## These are all interface methods, called by different modules - - def getStatus(self): - """ - Returns some basic info - """ - - return { - 'state': self._schedulerStatus, - 'hostname': getattr(self, '_hostname', None), - 'pid': getattr(self, '_pid', None), - 'waiting': len(self._waitingQueue), - 'running': len(self._runningList), - 'spooled': len(self._taskSpool), - 'failed': self._failedIndex._num_objs() , - 'finished': self._finishedIndex._num_objs() - } - - def getTaskById(self, taskId): - return self._taskIdx[taskId] - - def getSpool(self): - return self._taskSpool - - def clearSpool(self): - i = 0 - - try: - while(self._taskSpool.pull()): - i += 1 - except IndexError: - pass - - return i - - def spool(self, op, obj): - """ - Adds an 'instruction' to the spool, in the form (op, obj) - """ - - self._taskSpool.put((op, obj)) - - logging.getLogger('scheduler').debug( - 'Added instruction %s to spool..' 
% ((op, obj),)) - - return True - - def removeRunningTask(self, task): - """ - Remove a task from the running list - """ - try: - self._runningList.remove(task) - self._p_changed = True - except ValueError: - logging.getLogger('scheduler').exception("Problem removing running task: %s" % self._runningList) - - def moveTask(self, task, moveFrom, status, occurrence=None, nocheck=False): - """ - Move a task somewhere - """ - if not occurrence: - occurrence = task - - if not nocheck: - self._assertTaskStatus(task, moveFrom) - - if moveFrom == base.TASK_STATUS_RUNNING: - # actually remove it from list - self.removeRunningTask(task) - - elif moveFrom == base.TASK_STATUS_QUEUED: - idx_timestamp = int_timestamp(task.getStartOn()) - self._waitingQueue.dequeue(idx_timestamp, task) - elif moveFrom == base.TASK_STATUS_FAILED: - self._failedIndex.unindex_obj(task) - - # index it either in finished or failed - # (or queue it up again) - if status == base.TASK_STATUS_FINISHED: - self._finishedIndex.index_obj(occurrence) - elif status in [base.TASK_STATUS_FAILED, - base.TASK_STATUS_TERMINATED]: - self._failedIndex.index_obj(occurrence) - elif status == base.TASK_STATUS_QUEUED: - self.addTaskToWaitingQueue(occurrence) - - def changeTaskStartDate(self, oldTS, task): - - newTS = int_timestamp(task.getStartOn()) - - # enqueue-dequeue - try: - self._waitingQueue.dequeue(oldTS, task) - except: - logging.getLogger('scheduler').error( - "%s was supposed to be changed but it was not " - "found in the waiting queue!" % task) - return - - self._waitingQueue.enqueue(newTS, task) - - logging.getLogger('scheduler').info( - '%s moved from bin %s to %s...' % (task, oldTS, newTS)) - - def addTaskToWaitingQueue(self, task, index=False): - - if index: - self._indexTask(task) - - # get an int timestamp - if task.getStartOn(): - timestamp = int_timestamp(task.getStartOn()) - - self._waitingQueue.enqueue(timestamp, task) - - # make it "officially" queued - task.setStatus(base.TASK_STATUS_QUEUED) - - logging.getLogger('scheduler').debug( - 'Added %s to waitingQueue..' % task) - - def popNextWaitingTask(self): - return self._waitingQueue.pop() - - def peekNextWaitingTask(self): - return self._waitingQueue.peek() - - def removeWaitingTask(self, timestamp, task): - return self._waitingQueue.dequeue(timestamp, task) - - def getRunningList(self): - return self._runningList - - def getWaitingQueue(self): - return self._waitingQueue - - def getFailedIndex(self): - return self._failedIndex - - def getFinishedIndex(self): - return self._finishedIndex - - def getTaskIndex(self): - return self._taskIdx - - def setSchedulerRunningStatus(self, status): - self._schedulerStatus = status - self._hostname = socket.getfqdn() if status else None - self._pid = os.getpid() if status else None - - def addTaskToRunningList(self, task): - - logging.getLogger('scheduler').debug('Added task %s to runningList..' % task.id) - self._runningList.append(task) - self._p_changed = True diff --git a/indico/modules/scheduler/server.py b/indico/modules/scheduler/server.py deleted file mode 100644 index a4aa5aef1..000000000 --- a/indico/modules/scheduler/server.py +++ /dev/null @@ -1,483 +0,0 @@ -# This file is part of Indico. -# Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN). -# -# Indico is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 3 of the -# License, or (at your option) any later version. 
-# -# Indico is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Indico; if not, see . - -import logging -import random - -from indico.core.db import DBMgr - - -from indico.modules.scheduler import SchedulerModule, base -from indico.modules.scheduler.tasks.periodic import PeriodicTask, TaskOccurrence -from indico.modules.scheduler.slave import ProcessWorker -from indico.util.date_time import int_timestamp - - -class Scheduler(object): - """ - A :py:class:`~indico.modules.scheduler.Scheduler` object provides a job scheduler - based on a waiting queue, that communicates with its clients through the database. - Things have been done in a way that the probability of conflict is minimized, and - operations are repeated in case one happens. - - The entry point of the process consists of a 'spooler' that periodically takes - tasks out of a `conflict-safe` FIFO (spool) and adds them to an ``IOBTree``-based - waiting queue. The waiting queue is then checked periodically for the next task, - and when the time comes the task is executed. - - Tasks are executed in different processes. - - The :py:class:`~indico.modules.scheduler.Client` class works as a transparent - remote proxy for this class. - """ - - __instance = None - - # configuration options - _options = { - # time to wait between cycles - 'sleep_interval': 10, - - # AWOL = Absent Without Leave - # [0.0, 1.0) probability that after a Scheduler tick it will check for AWOL - # tasks in the runningList the lower the number the lower the number of checks - 'awol_tasks_check_probability': 0.3, - - # Number of times to try to run a task before aborting (min 1) - 'task_max_tries': 5 - } - - def __init__(self, **config): - """ - config is a dictionary containing configuration parameters - """ - - self._readConfig(config) - - self._logger = logging.getLogger('scheduler') - - self._dbi = DBMgr.getInstance() - - self._dbi.startRequest() - self._schedModule = SchedulerModule.getDBInstance() - - self._runningWorkers = {} - - ## DB access - surrounded by commit retry cycle - - @base.OperationManager - def _db_popFromSpool(self): - """ - Get the the element at the top of the spool - """ - spool = self._schedModule.getSpool() - - try: - pair = spool.pull() - except IndexError: - pair = None - - return pair - - @base.OperationManager - def _db_setRunningStatus(self, value): - self._schedModule.setSchedulerRunningStatus(value) - - @base.OperationManager - def _db_moveTask(self, task, moveFrom, status, occurrence=None, - nocheck = False, setStatus = False): - self._schedModule.moveTask(task, moveFrom, status, - occurrence=occurrence, nocheck=nocheck) - if setStatus: - task.setStatus(status) - - @base.OperationManager - def _db_changeTaskStartDate(self, oldTS, task): - self._schedModule.changeTaskStartDate(oldTS, task) - - @base.OperationManager - def _db_addTaskToQueue(self, task, index = True): - """ - Submits a new task - """ - SchedulerModule.getDBInstance().addTaskToWaitingQueue(task, - index = index) - self._logger.info("Task %s queued for execution" % task) - - @base.OperationManager - def _db_setTaskRunning(self, timestamp, curTask): - # remove task from waiting list - self._schedModule.removeWaitingTask(timestamp, curTask) - - # mark the task as being in the running list - 
curTask.setOnRunningListSince(self._getCurrentDateTime()) - - # add it to the running list - self._schedModule.addTaskToRunningList(curTask) - - @base.OperationManager - def _db_notifyTaskStatus(self, task, status): - """ - Called by a task when it's done. If a task doesn't notify us - after AWOLThresold seconds we assume it went AWOL and - we notify the admins - """ - - if status == base.TASK_STATUS_FINISHED: - self._logger.info('Task %s says it has finished...' % task) - elif status == base.TASK_STATUS_FAILED: - self._logger.error('Task %s says it has failed..' % task) - else: - raise Exception('Impossible task/slave state') - - # clean up the mess - task.tearDown() - - # task forcefully terminated? - if task.getStatus() == base.TASK_STATUS_TERMINATED: - # well, we have a final status already, and the task is - # by now properly indexed - self._logger.warning("%s finished after being terminated, with status %s" % (task, base.status(status))) - - # If the task has been left running - if task in self._schedModule.getRunningList(): - self._schedModule.removeRunningTask(task) - # We end here! - return - # else... - - task.setStatus(status) - - # if it's a periodic task, do some extra things - if isinstance(task, PeriodicTask): - # prepare an "occurrence" object - - occurrence = TaskOccurrence(task) - - task.addOccurrence(occurrence) - - # if the task is supposed to be run again - if task.shouldComeBack(): - - # reset "ended on" - task.setEndedOn(None) - - # calculate next occurrence - task.setNextOccurrence() - - # move the occurrence to the correct place - self._schedModule.moveTask(task, - base.TASK_STATUS_RUNNING, - status, - occurrence = occurrence, - nocheck = True) - - # do not index the task again - self._db_addTaskToQueue(task, index = False) - - self._logger.info('Task %s rescheduled for %s' % - (task, task.getStartOn())) - - else: - # move the task to the correct place - self._schedModule.moveTask(task, - base.TASK_STATUS_RUNNING, - status, - nocheck = True) - #### - - def _getCurrentDateTime(self): - return base.TimeSource.get().getCurrentTime() - - def _readConfig(self, config): - """ - Reads the config dictionary and verifies the parameters are ok. - If it's the case, it sets them. - """ - - class DummyType(object): - pass - - self._config = DummyType() - self._config.__dict__ = dict(Scheduler._options) - - for name, value in config.iteritems(): - if name not in Scheduler._options: - raise base.SchedulerConfigurationException( - 'Option %s is not supported!') - else: - setattr(self._config, name, value) - - def _relaunchRunningListItems(self): - # During startup any item in runningList will have died prematurely - # (except for AWOL tasks), so we relaunch them. - - for task in self._schedModule.getRunningList(): - self._logger.warning('Task %s found in runningList on startup. Relaunching..' 
% task.id) - task.tearDown() - try: - self._db_moveTask(task, - base.TASK_STATUS_RUNNING, - base.TASK_STATUS_QUEUED) - except base.TaskInconsistentStatusException: - self._logger.exception("Problem relaunching task %s - " - "setting it as failed" % task) - self._db_moveTask(task, - base.TASK_STATUS_RUNNING, - base.TASK_STATUS_FAILED, - nocheck=True) - - self._dbi.commit() - - def _iterateTasks(self): - """ - Iterate over all the tasks in the waiting queue, blocking - in case there are none - """ - while True: - - currentTimestamp = int_timestamp(self._getCurrentDateTime()) - - # this will basically abort the transaction, so, make sure - # everything important before this was committed - self._readFromDb() - - # print a status message (only in debug mode) - self._printStatus(mode = 'debug') - - # move the tasks in the spool to the waiting queue - try: - self._processSpool() - finally: - # this `finally` makes sure finished tasks are handled - # even if a shutdown order is sent - - # process tasks that have finished meanwhile - # (tasks have been running in different processes, so, the sync - # thas was done above won't hurt) - self._checkFinishedTasks() - - # get the next task in queue - res = self._schedModule.peekNextWaitingTask() - - if res: - # it's actually a timestamp, task tuple - nextTS, nextTask = res - - self._logger.debug((nextTS, currentTimestamp, self._getCurrentDateTime())) - - # if it's time to execute the task - if (nextTS <= currentTimestamp): - yield nextTS, nextTask - - # don't sleep, jump back to the beginning of the cycle - continue - - # we assume the task cycle has been completed (if there was one) - # so, we can just reset the db status (sync) - self._readFromDb() - - # we also check AWOL tasks from time to time - if random.random() < self._config.awol_tasks_check_probability: - self._checkAWOLTasks() - - # if we get here, we have nothing else to do... - self._sleep('Nothing to do. Sleeping for %d secs...' % - self._config.sleep_interval) - - # read from DB again after sleeping - self._readFromDb() - - def _checkFinishedTasks(self): - """ - Check if there are any tasks that have finished recently, and - need to be moved to the correct places - """ - - self._logger.debug("Checking finished tasks") - - for taskId, process in self._runningWorkers.items(): - - # the process is dead? 
good, it's finished - if not process.isAlive(): - task = self._schedModule._taskIdx[taskId] - - # let's check if it was successful or not - # and write it in the db - - if process.getResult() == True: - self._db_notifyTaskStatus(task, base.TASK_STATUS_FINISHED) - elif process.getResult() == False: - self._db_notifyTaskStatus(task, base.TASK_STATUS_FAILED) - else: - # something weird happened - self._logger.warning("task %s finished, but the return value " - "was %s" % - (task, process.getResult())) - - # delete the entry from the dictionary - del self._runningWorkers[taskId] - process.join() - - def _printStatus(self, mode='debug'): - """ - Print an informative message with some run-time parameters - """ - status = self._schedModule.getStatus() - - if mode == 'debug': - func = self._logger.debug - else: - func = self._logger.info - - func("Status: waiting: [%(waiting)d] | " - "running: [%(running)d] | " - "spooled: [%(spooled)d]" % status) - - def run(self): - """ - Main loop, should only be called from scheduler - """ - - try: - self._db_setRunningStatus(True) - - self._logger.info('**** Scheduler started') - self._printStatus() - - # relaunch items that were running in the last session - self._relaunchRunningListItems() - - - # iterate over the tasks in the waiting queue - # that should be running - for timestamp, curTask in self._iterateTasks(): - # execute the "task cycle" for each new task - self._taskCycle(timestamp, curTask) - - except base.SchedulerQuitException, e: - self._logger.warning('Scheduler was shut down: %s' % e) - return 0 - except: - self._logger.exception('Unexpected error') - raise - finally: - self._logger.info('Setting running status as False') - - self._db_setRunningStatus(False) - - def _taskCycle(self, timestamp, curTask): - - self._db_setTaskRunning(timestamp, curTask) - - # Start a worker subprocess and add it to the worker dict - delay = int_timestamp(self._getCurrentDateTime()) - timestamp - self._runningWorkers[curTask.id] = ProcessWorker(curTask.id, self._config, delay) - self._runningWorkers[curTask.id].start() - - def _processSpool(self): - """ - Adds all the tasks in the spool to the waiting list - """ - # pop the first one - pair = self._db_popFromSpool() - - while pair: - try: - op, obj = pair - - if op == 'add': - self._db_addTaskToQueue(obj) - elif op == 'change': - # pass oldTS and task - self._db_changeTaskStartDate(*obj) - elif op == 'del': - self._db_deleteTaskFromQueue(obj) - elif op == 'shutdown': - raise base.SchedulerQuitException(obj) - else: - raise base.SchedulerUnknownOperationException(op) - except Exception, e: - self._logger.exception('Exception in task %s: %s' % (obj, e)) - raise - pair = self._db_popFromSpool() - - def _sleep(self, msg): - self._logger.debug(msg) - base.TimeSource.get().sleep(self._config.sleep_interval) - - def _readFromDb(self): - self._logger.debug('_readFromDb()..') - self._dbi.sync() - - def _abortDb(self): - self._logger.debug('_abortDb()..') - self._dbi.abort() - - def _db_deleteTaskFromQueue(self, task): - """ - """ - - if isinstance(task, PeriodicTask): - # don't let periodic tasks respawn - task.dontComeBack() - self._dbi.commit() - - oldStatus = task.getStatus() - - self._logger.info("dequeueing %s from status %s" % \ - (task, base.status(oldStatus))) - - # it doesn't matter if the task is already running again, - # get rid of it - self._db_moveTask(task, - oldStatus, - base.TASK_STATUS_NONE) - - - def _checkAWOLTasks(self): - - self._logger.debug('Checking AWOL tasks...') - - for task in 
self._schedModule.getRunningList()[:]: - if not task.getOnRunningListSince(): - self._logger.warning("Task %s is in the runningList but has no " - "onRunningListSince value! Removing from runningList " - "and relaunching..." % (task.id)) - task.tearDown() - - # relaunch it - self._db_moveTask( - task, - base.TASK_STATUS_RUNNING, - base.TASK_STATUS_QUEUED) - else: - runForSecs = int_timestamp(self._getCurrentDateTime()) - \ - int_timestamp(task.getOnRunningListSince()) - - if runForSecs > task._AWOLThresold: - self._logger.warning("Task %s has been running for %d secs. " - "Assuming it has died abnormally and forcibly " - "calling its tearDown()..." % (task.id, runForSecs)) - task.tearDown() - - self._db_moveTask( - task, - base.TASK_STATUS_RUNNING, - base.TASK_STATUS_TERMINATED, - setStatus=True) - - self._logger.info("Task %s terminated." % (task.id)) diff --git a/indico/modules/scheduler/slave.py b/indico/modules/scheduler/slave.py deleted file mode 100644 index fba391f3a..000000000 --- a/indico/modules/scheduler/slave.py +++ /dev/null @@ -1,160 +0,0 @@ -# This file is part of Indico. -# Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN). -# -# Indico is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 3 of the -# License, or (at your option) any later version. -# -# Indico is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Indico; if not, see . 
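Schematically, the removed server.py drains a spool of (op, obj) instructions into a timestamp-keyed waiting queue and then executes whatever is due. The plain-Python sketch below shows that shape with illustrative names only; the real scheduler handed each due task to a ProcessWorker instead of calling it inline, and persisted everything in ZODB.

import heapq
import itertools
import time
from collections import deque

spool = deque()     # clients append ('add', (run_at, callable)) instructions
waiting = []        # heap of (run_at, seq, callable), cf. the waiting queue
_seq = itertools.count()


def process_spool():
    # move spooled instructions into the waiting queue (cf. _processSpool)
    while spool:
        op, obj = spool.popleft()
        if op == 'add':
            run_at, fn = obj
            heapq.heappush(waiting, (run_at, next(_seq), fn))
        elif op == 'shutdown':
            raise SystemExit(obj)


def run_due(now=None):
    # pop and run everything whose timestamp has passed (cf. _taskCycle)
    now = time.time() if now is None else now
    while waiting and waiting[0][0] <= now:
        _, _, fn = heapq.heappop(waiting)
        fn()


spool.append(('add', (time.time(), lambda: None)))
process_spool()
run_due()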
- -import time -import logging -import multiprocessing -import threading -import os - -import transaction -from ZEO.Exceptions import ClientDisconnected -from ZODB.POSException import ConflictError - -from indico.core.db import DBMgr -from MaKaC.common.mail import GenericMailer -from indico.modules.scheduler import SchedulerModule, TaskDelayed -from indico.util import fossilize -from indico.core.db.util import flush_after_commit_queue - - -class _Worker(object): - - def __init__(self, taskId, configData, delay): - super(_Worker, self).__init__() - - self._logger = logging.getLogger('worker/%s' % taskId) - self._taskId = taskId - self._config = configData - self._executionDelay = delay - self._app = None - - def _prepare(self): - """ - This acts as a second 'constructor', that is executed in the - context of the thread (due to database reasons) - """ - self._prepareDB() - self._dbi.startRequest() - self._delayed = False - - with self._dbi.transaction(): - schedMod = SchedulerModule.getDBInstance() - self._task = schedMod.getTaskById(self._taskId) - - # open a logging channel - self._task.plugLogger(self._logger) - - # XXX: potentially conflict-prone - with self._dbi.transaction(sync=True): - self._task.prepare() - - def _prepare_retry(self): - self._dbi.abort() - self._dbi.sync() - flush_after_commit_queue(False) - GenericMailer.flushQueue(False) - self._task.plugLogger(self._logger) - - def _prepareDB(self): - self._dbi = DBMgr.getInstance() - - def _process_task(self): - with self._dbi.transaction(): - with self._app.app_context(): - fossilize.clearCache() - self._task.start(self._executionDelay) - transaction.commit() - - def run(self): - # Import it here to avoid circular import - from indico.web.flask.app import make_app - self._app = make_app(True) - - self._prepare() - self._logger.info('Running task {}.. (delay: {})'.format(self._task.id, self._executionDelay)) - - try: - for i, retry in enumerate(transaction.attempts(self._config.task_max_tries)): - with retry: - self._logger.info('Task attempt #{}'.format(i)) - if i > 0: - self._prepare_retry() - try: - self._process_task() - break - except ConflictError: - transaction.abort() - except ClientDisconnected: - self._logger.warning("Retrying for the {}th time in {} secs..".format(i + 1, i * 10)) - transaction.abort() - time.sleep(i * 10) - except TaskDelayed, e: - self._logger.info("{} delayed by {} seconds".format(self._task, e.delaySeconds)) - self._delayed = True - self._executionDelay = 0 - time.sleep(e.delaySeconds) - flush_after_commit_queue(True) - GenericMailer.flushQueue(True) - - except Exception as e: - self._logger.exception("{} failed with exception '{}'".format(self._task, e)) - transaction.abort() - - finally: - self._logger.info('{} ended on: {}'.format(self._task, self._task.endedOn)) - # task successfully finished - if self._task.endedOn: - with self._dbi.transaction(): - self._setResult(True) - if i > (1 + int(self._delayed)): - self._logger.warning("{} failed {} times before " - "finishing correctly".format(self._task, i - int(self._delayed) - 1)) - # task failed - else: - with self._dbi.transaction(): - self._setResult(False) - self._logger.error("{} failed too many ({}) times. 
Aborting its execution..".format(self._task, i)) - self._logger.info("exiting") - - self._dbi.endRequest() - - -class ProcessWorker(_Worker, multiprocessing.Process): - - def __init__(self, tid, configData, delay): - super(ProcessWorker, self).__init__(tid, configData, delay) - self._result = multiprocessing.Value('i', 0) - - def _prepareDB(self): - # since the DBMgr instance will be replicated across objects, - # we just set it as None for this one. - - # first, store the server address - this wouldn't normally be needed, - # but the tests won't work otherwise (as the DB is _not_ the default one) - hostname, port = DBMgr._instances[os.getppid()]._db.storage._addr - - DBMgr.setInstance(DBMgr(hostname, port)) - self._dbi = DBMgr.getInstance() - - def isAlive(self): - return self.is_alive() - - def _setResult(self, res): - self._result.value = res - - def getResult(self): - return self._result.value diff --git a/indico/modules/scheduler/tasks/__init__.py b/indico/modules/scheduler/tasks/__init__.py deleted file mode 100644 index 1d5f90652..000000000 --- a/indico/modules/scheduler/tasks/__init__.py +++ /dev/null @@ -1,365 +0,0 @@ -# This file is part of Indico. -# Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN). -# -# Indico is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 3 of the -# License, or (at your option) any later version. -# -# Indico is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Indico; if not, see . - -import logging -import urllib - -import zope.interface -from persistent import Persistent -from flask import render_template - -from indico.util.fossilize import fossilizes, Fossilizable -from indico.util.date_time import int_timestamp, format_datetime -from indico.modules.scheduler.fossils import ITaskFossil -from indico.modules.scheduler import base -from indico.modules.users.legacy import AvatarUserWrapper -from indico.core.db import db -from indico.core.db.sqlalchemy.util.session import update_session_options -from indico.core.index import IUniqueIdProvider, IIndexableByArbitraryDateTime -from indico.core.config import Config -from indico.web.flask.util import url_for -from indico.util.i18n import _ - - -# Defines base classes for tasks, and some specific tasks as well - - -class TimedEvent(Persistent, Fossilizable): - - zope.interface.implements(IUniqueIdProvider, - IIndexableByArbitraryDateTime) - - def getIndexingDateTime(self): - return int_timestamp(self._getCurrentDateTime()) - - def _getCurrentDateTime(self): - # just get current date/time - return base.TimeSource.get().getCurrentTime() - - def __conform__(self, proto): - - if proto == IIndexableByArbitraryDateTime: - return self.getIndexingDateTime() - else: - return None - - -class BaseTask(TimedEvent): - """ - A base class for tasks. - `expiryDate` is the last point in time when the task can run. 
A task will - refuse to run if current time is past `expiryDate` - """ - - DISABLE_ZODB_HOOK = False - - fossilizes(ITaskFossil) - - # seconds to consider a task AWOL - _AWOLThresold = 6000 - - def __init__(self, expiryDate=None): - self.createdOn = self._getCurrentDateTime() - self.expiryDate = expiryDate - self.typeId = self.__class__.__name__ - self.id = None - self.reset() - self.status = 0 - - self.startedOn = None - self.endedOn = None - - def __cmp__(self, obj): - from indico.modules.scheduler.tasks.periodic import TaskOccurrence - if obj is None: - return 1 - elif isinstance(obj, TaskOccurrence): - task_cmp = cmp(self, obj.getTask()) - if task_cmp == 0: - return -1 - else: - return task_cmp - # This condition will mostlike never happen - elif not isinstance(obj, BaseTask) or (self.id == obj.id and self.id is None): - return cmp(hash(self), hash(obj)) - # This is the 'default case' where we are comparing 2 Tasks - else: - return cmp(self.id, obj.id) - - def reset(self): - '''Resets a task to its state before being run''' - - self.running = False - self.onRunningListSince = None - - # Time methods - - def getCreatedOn(self): - return self.createdOn - - def getEndedOn(self): - return self.endedOn - - def setEndedOn(self, dateTime): - self.endedOn = dateTime - - def getStartedOn(self): - return self.startedOn - - def setStartedOn(self, dateTime): - self.startedOn = dateTime - - def setOnRunningListSince(self, sometime): - self.onRunningListSince = sometime - self._p_changed = 1 - - def getOnRunningListSince(self): - return self.onRunningListSince - - def setStatus(self, newstatus): - self.getLogger().info("%s set status %s" % (self, base.status(newstatus))) - self.status = newstatus - - def getStatus(self): - return self.status - - def getId(self): - return self.id - - def getUniqueId(self): - return "task%s" % self.id - - def getTypeId(self): - return self.typeId - - def initialize(self, newid, newstatus): - self.id = newid - self.setStatus(newstatus) - - def plugLogger(self, logger): - self._v_logger = logger - - def getLogger(self): - if not hasattr(self, '_v_logger') or not self._v_logger: - self._v_logger = logging.getLogger('task/%s' % self.typeId) - return self._v_logger - - def prepare(self): - """ - This information will be saved regardless of the task being repeated or not - """ - - curTime = self._getCurrentDateTime() - tsDiff = int_timestamp(self.getStartOn()) - int_timestamp(curTime) - - if tsDiff > 0: - self.getLogger().debug('Task %s will wait for some time. 
(%s) > (%s)' % \ - (self.id, self.getStartOn(), curTime)) - base.TimeSource.get().sleep(tsDiff) - - if self.expiryDate and curTime > self.expiryDate: - self.getLogger().warning( - 'Task %s will not be executed, expiryDate (%s) < current time (%s)' % \ - (self.id, self.expiryDate, curTime)) - return False - - self.startedOn = curTime - self.running = True - self.setStatus(base.TASK_STATUS_RUNNING) - - def start(self, delay): - if self.DISABLE_ZODB_HOOK: - update_session_options(db) - self._executionDelay = delay - try: - self.run() - self.endedOn = self._getCurrentDateTime() - finally: - self.running = False - - def tearDown(self): - """ - If a task needs to do something once it has run and been removed - from runningList, overload this method - """ - pass - - def __str__(self): - return "[%s:%s|%s]" % (self.typeId, self.id, - base.status(self.status)) - - -class OneShotTask(BaseTask): - """ - Tasks that are executed only once - """ - - def __init__(self, startDateTime, expiryDate = None): - super(OneShotTask, self).__init__(expiryDate = expiryDate) - self.startDateTime = startDateTime - - def getStartOn(self): - return self.startDateTime - - def setStartOn(self, newtime): - self.startDateTime = newtime - - def suicide(self): - self.setStatus(base.TASK_STATUS_TERMINATED) - self.setEndedOn(self._getCurrentDateTime()) - - -class SendMailTask(OneShotTask): - """ - """ - def __init__(self, startDateTime): - super(SendMailTask, self).__init__(startDateTime) - self.fromAddr = "" - self.toAddr = [] - self.toUser = [] - self.ccAddr = [] - self.subject = "" - self.text = "" - self.smtpServer = Config.getInstance().getSmtpServer() - - def _prepare(self, check): - """ - Overloaded by descendants - """ - - def run(self, check=True): - import smtplib - from MaKaC.webinterface.mail import GenericMailer, GenericNotification - - # prepare the mail - send = self._prepare(check=check) - - # _prepare decided we shouldn't send the mail? 
- if not send: - return - - # just in case some ill-behaved code generates empty addresses - addrs = list(smtplib.quoteaddr(x) for x in self.toAddr if x) - ccaddrs = list(smtplib.quoteaddr(x) for x in self.ccAddr if x) - - if len(addrs) + len(ccaddrs) == 0: - self.getLogger().warning("Attention: no recipients, mail won't be sent") - else: - self.getLogger().info("Sending mail To: %s, CC: %s" % (addrs, ccaddrs)) - - for user in self.toUser: - addrs.append(smtplib.quoteaddr(user.getEmail())) - - if addrs or ccaddrs: - GenericMailer.send(GenericNotification({"fromAddr": self.fromAddr, - "toList": addrs, - "ccList": ccaddrs, - "subject": self.subject, - "body": self.text })) - - def setFromAddr(self, addr): - self.fromAddr = addr - self._p_changed = 1 - - def getFromAddr(self): - return self.fromAddr - - def addToAddr(self, addr): - if not addr in self.toAddr: - self.toAddr.append(addr) - self._p_changed = 1 - - def addCcAddr(self, addr): - if not addr in self.ccAddr: - self.ccAddr.append(addr) - self._p_changed = 1 - - def removeToAddr(self, addr): - if addr in self.toAddr: - self.toAddr.remove(addr) - self._p_changed = 1 - - def setToAddrList(self, addrList): - """Params: addrList -- addresses of type : list of str.""" - self.toAddr = addrList - self._p_changed = 1 - - def getToAddrList(self): - return self.toAddr - - def setCcAddrList(self, addrList): - """Params: addrList -- addresses of type : list of str.""" - self.ccAddr = addrList - self._p_changed = 1 - - def getCcAddrList(self): - return self.ccAddr - - def addToUser(self, user): - if not user in self.toUser: - self.toUser.append(user) - self._p_changed = 1 - - def removeToUser(self, user): - if user in self.toUser: - self.toUser.remove(user) - self._p_changed = 1 - - def getToUserList(self): - return self.toUser - - def setSubject(self, subject): - self.subject = subject - - def getSubject(self): - return self.subject - - def setText(self, text): - self.text = text - - def getText(self): - return self.text - - -class HTTPTask(OneShotTask): - def __init__(self, url, data=None): - super(HTTPTask, self).__init__(base.TimeSource.get().getCurrentTime()) - self._url = url - self._data = data - - def run(self): - method = 'POST' if self._data is not None else 'GET' - self.getLogger().info('Executing HTTP task: %s %s' % (method, self._url)) - data = urllib.urlencode(self._data) if self._data is not None else None - req = urllib.urlopen(self._url, data) - req.close() - - -class SampleOneShotTask(OneShotTask): - def run(self): - self.getLogger().debug('Now i shall sleeeeeeeep!') - base.TimeSource.get().sleep(1) - self.getLogger().debug('%s executed' % self.__class__.__name__) - - -class DeletedTask(BaseTask): - """ZODB migration class to avoid broken objects for deleted tasks""" - - def getStartOn(self): - try: - return self._nextOccurrence - except AttributeError: - return self.startDateTime diff --git a/indico/modules/scheduler/tasks/periodic.py b/indico/modules/scheduler/tasks/periodic.py deleted file mode 100644 index a9b6cf628..000000000 --- a/indico/modules/scheduler/tasks/periodic.py +++ /dev/null @@ -1,164 +0,0 @@ -# This file is part of Indico. -# Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN). -# -# Indico is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 3 of the -# License, or (at your option) any later version. 
-# -# Indico is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Indico; if not, see . - -from dateutil import rrule -from datetime import timedelta - -from BTrees.IOBTree import IOBTree - -from indico.util.fossilize import fossilizes -from indico.modules.scheduler import base -from indico.modules.scheduler.tasks import BaseTask, TimedEvent -from indico.modules.scheduler.fossils import ITaskOccurrenceFossil - - -class PeriodicTask(BaseTask): - """ - Tasks that should be executed at regular intervals - """ - - def __init__(self, frequency, **kwargs): - """ - :param frequency: a valid dateutil frequency specifier (DAILY, HOURLY, etc...) - """ - super(PeriodicTask, self).__init__() - - self._nextOccurrence = None - self._lastFinishedOn = None - self._occurrences = IOBTree() - self._occurrenceCount = 0 - self._repeat = True - - if 'dtstart' not in kwargs: - kwargs['dtstart'] = self._getCurrentDateTime() - - self._rule = rrule.rrule( - frequency, - **kwargs - ) - - self._nextOccurrence = self._rule.after( - kwargs['dtstart'] - timedelta(seconds=1), - inc=True) - - def start(self, delay): - super(PeriodicTask, self).start(delay) - - def tearDown(self): - super(PeriodicTask, self).tearDown() - - def setNextOccurrence(self, dateAfter=None): - - if not self._nextOccurrence: - # if it's None already, it means there's no "future" - return - - if not dateAfter: - dateAfter = self._getCurrentDateTime() - - # find next date after - nextOcc = self._rule.after(max(self._nextOccurrence, dateAfter), - inc=False) - - # repeat process till a satisfactory date is found - # or there is nothing left to check - #while nextOcc and nextOcc < dateAfter: - # nextOcc = self._rule.after(nextOcc, - # inc = False) - - self._nextOccurrence = nextOcc - return nextOcc - - def getStartOn(self): - return self._nextOccurrence - - def getLastFinishedOn(self): - return self._lastFinishedOn - - def addOccurrence(self, occurrence): - occurrence.setId(self._occurrenceCount) - self._occurrences[self._occurrenceCount] = occurrence - self._occurrenceCount += 1 - - def dontComeBack(self): - self._repeat = False - - def shouldComeBack(self): - return self._repeat - - -class PeriodicUniqueTask(PeriodicTask): - """ - Singleton periodic tasks: no two or more PeriodicUniqueTask of this - class will be queued or running at the same time (TODO) - """ - # TODO: implement this - - -class TaskOccurrence(TimedEvent): - """ - Wraps around a PeriodicTask object, and represents an occurrence of this task - """ - - fossilizes(ITaskOccurrenceFossil) - - def __init__(self, task): - self._task = task - self._startedOn = task.getStartedOn() - self._endedOn = task.getEndedOn() - self._id = None - - def __cmp__(self, obj): - if obj is None: - return 1 - elif isinstance(obj, BaseTask): - task_cmp = cmp(self.getTask(), obj) - if task_cmp == 0: - return 1 - else: - return task_cmp - elif not isinstance(obj, TaskOccurrence) or (self._id == obj._id and self._id is None): - return cmp(hash(self), hash(obj)) - - occ_task_cmp = cmp(self.getTask(), obj.getTask()) - if occ_task_cmp == 0: - return cmp(self.getId(), obj.getId()) - else: - return occ_task_cmp - - def getId(self): - return self._id - - def getUniqueId(self): - return "%s:%s" % (self._task.getUniqueId(), self._id) - - def setId(self, occId): - 
self._id = occId - - def getStartedOn(self): - return self._startedOn - - def getEndedOn(self): - return self._endedOn - - def getTask(self): - return self._task - - -class SamplePeriodicTask(PeriodicTask): - def run(self): - base.TimeSource.get().sleep(1) - self.getLogger().debug('%s executed' % self.__class__.__name__) diff --git a/indico/util/counter.py b/indico/util/counter.py deleted file mode 100644 index 0f2b2e8db..000000000 --- a/indico/util/counter.py +++ /dev/null @@ -1,193 +0,0 @@ -from operator import itemgetter -from heapq import nlargest -from itertools import repeat, ifilter - - -# from http://code.activestate.com/recipes/576611-counter-class/ -# remove once we drop python 2.6 support - -class Counter(dict): - '''Dict subclass for counting hashable objects. Sometimes called a bag - or multiset. Elements are stored as dictionary keys and their counts - are stored as dictionary values. - - >>> Counter('zyzygy') - Counter({'y': 3, 'z': 2, 'g': 1}) - - ''' - - def __init__(self, iterable=None, **kwds): - '''Create a new, empty Counter object. And if given, count elements - from an input iterable. Or, initialize the count from another mapping - of elements to their counts. - - >>> c = Counter() # a new, empty counter - >>> c = Counter('gallahad') # a new counter from an iterable - >>> c = Counter({'a': 4, 'b': 2}) # a new counter from a mapping - >>> c = Counter(a=4, b=2) # a new counter from keyword args - - ''' - self.update(iterable, **kwds) - - def __missing__(self, key): - return 0 - - def most_common(self, n=None): - '''List the n most common elements and their counts from the most - common to the least. If n is None, then list all element counts. - - >>> Counter('abracadabra').most_common(3) - [('a', 5), ('r', 2), ('b', 2)] - - ''' - if n is None: - return sorted(self.iteritems(), key=itemgetter(1), reverse=True) - return nlargest(n, self.iteritems(), key=itemgetter(1)) - - def elements(self): - '''Iterator over elements repeating each as many times as its count. - - >>> c = Counter('ABCABC') - >>> sorted(c.elements()) - ['A', 'A', 'B', 'B', 'C', 'C'] - - If an element's count has been set to zero or is a negative number, - elements() will ignore it. - - ''' - for elem, count in self.iteritems(): - for _ in repeat(None, count): - yield elem - - # Override dict methods where the meaning changes for Counter objects. - - @classmethod - def fromkeys(cls, iterable, v=None): - raise NotImplementedError( - 'Counter.fromkeys() is undefined. Use Counter(iterable) instead.') - - def update(self, iterable=None, **kwds): - '''Like dict.update() but add counts instead of replacing them. - - Source can be an iterable, a dictionary, or another Counter instance. - - >>> c = Counter('which') - >>> c.update('witch') # add elements from another iterable - >>> d = Counter('watch') - >>> c.update(d) # add elements from another counter - >>> c['h'] # four 'h' in which, witch, and watch - 4 - - ''' - if iterable is not None: - if hasattr(iterable, 'iteritems'): - if self: - self_get = self.get - for elem, count in iterable.iteritems(): - self[elem] = self_get(elem, 0) + count - else: - dict.update(self, iterable) # fast path when counter is empty - else: - self_get = self.get - for elem in iterable: - self[elem] = self_get(elem, 0) + 1 - if kwds: - self.update(kwds) - - def copy(self): - 'Like dict.copy() but returns a Counter instance instead of a dict.' - return Counter(self) - - def __delitem__(self, elem): - 'Like dict.__delitem__() but does not raise KeyError for missing values.' 
- if elem in self: - dict.__delitem__(self, elem) - - def __repr__(self): - if not self: - return '%s()' % self.__class__.__name__ - items = ', '.join(map('%r: %r'.__mod__, self.most_common())) - return '%s({%s})' % (self.__class__.__name__, items) - - # Multiset-style mathematical operations discussed in: - # Knuth TAOCP Volume II section 4.6.3 exercise 19 - # and at http://en.wikipedia.org/wiki/Multiset - # - # Outputs guaranteed to only include positive counts. - # - # To strip negative and zero counts, add-in an empty counter: - # c += Counter() - - def __add__(self, other): - '''Add counts from two counters. - - >>> Counter('abbb') + Counter('bcc') - Counter({'b': 4, 'c': 2, 'a': 1}) - - - ''' - if not isinstance(other, Counter): - return NotImplemented - result = Counter() - for elem in set(self) | set(other): - newcount = self[elem] + other[elem] - if newcount > 0: - result[elem] = newcount - return result - - def __sub__(self, other): - ''' Subtract count, but keep only results with positive counts. - - >>> Counter('abbbc') - Counter('bccd') - Counter({'b': 2, 'a': 1}) - - ''' - if not isinstance(other, Counter): - return NotImplemented - result = Counter() - for elem in set(self) | set(other): - newcount = self[elem] - other[elem] - if newcount > 0: - result[elem] = newcount - return result - - def __or__(self, other): - '''Union is the maximum of value in either of the input counters. - - >>> Counter('abbb') | Counter('bcc') - Counter({'b': 3, 'c': 2, 'a': 1}) - - ''' - if not isinstance(other, Counter): - return NotImplemented - _max = max - result = Counter() - for elem in set(self) | set(other): - newcount = _max(self[elem], other[elem]) - if newcount > 0: - result[elem] = newcount - return result - - def __and__(self, other): - ''' Intersection is the minimum of corresponding counts. - - >>> Counter('abbb') & Counter('bcc') - Counter({'b': 1}) - - ''' - if not isinstance(other, Counter): - return NotImplemented - _min = min - result = Counter() - if len(self) < len(other): - self, other = other, self - for elem in ifilter(self.__contains__, other): - newcount = _min(self[elem], other[elem]) - if newcount > 0: - result[elem] = newcount - return result - - -if __name__ == '__main__': - import doctest - print doctest.testmod() diff --git a/indico/util/struct/queue.py b/indico/util/struct/queue.py deleted file mode 100644 index 268e5715a..000000000 --- a/indico/util/struct/queue.py +++ /dev/null @@ -1,161 +0,0 @@ -# This file is part of Indico. -# Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN). -# -# Indico is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 3 of the -# License, or (at your option) any later version. -# -# Indico is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Indico; if not, see . 
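A short usage sketch of the Counter recipe deleted above; on Python 2.7 and later the stdlib collections.Counter behaves the same way, which is why the backport was only kept around for 2.6.

from collections import Counter  # stdlib equivalent of the deleted recipe

tally = Counter('abracadabra')
tally.update('alakazam')                # adds counts instead of replacing them
print(tally.most_common(1))             # [('a', 9)]
print((tally - Counter('aaaa'))['a'])   # 5 -- subtraction keeps positive counts only
print(tally + Counter('zzz'))           # multiset addition, counts are summed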
- -""" -Queue-style data structures -""" - -from BTrees.IOBTree import IOBTree -from BTrees.OOBTree import OOTreeSet -from BTrees.Length import Length -from persistent import Persistent - -class DuplicateElementException(Exception): - """ - Tried to insert the same element twice in the queue - """ - - -class PersistentWaitingQueue(Persistent): - """ - A Waiting queue, implemented using a map structure (BTree...) - It is persistent, but very vulnerable to conflicts. This is due to the - fact that sets are used as container, and there can happen a situation - where two different sets are assigned to the same timestamp. This will - for sure result in conflict. - - That said, the commits of objects like these have to be carefully - synchronized. See `indico.modules.scheduler.controllers` for more info - (particularly the way we use the 'spool'). - """ - - def __init__(self): - super(PersistentWaitingQueue, self).__init__() - self._reset() - - def _reset(self): - # this counter keeps the number of elements - self._elem_counter = Length(0) - self._container = IOBTree() - - def _gc_bin(self, t): - """ - 'garbage-collect' bins - """ - if len(self._container[t]) == 0: - del self._container[t] - - def _check_gc_consistency(self): - """ - 'check that there are no empty bins' - """ - for t in self._container: - if len(self._container[t]) == 0: - return False - - return True - - def enqueue(self, t, obj): - """ - Add an element to the queue - """ - - if t not in self._container: - self._container[t] = OOTreeSet() - - if obj in self._container[t]: - raise DuplicateElementException(obj) - - self._container[t].add(obj) - self._elem_counter.change(1) - - def dequeue(self, t, obj): - """ - Remove an element from the queue - """ - self._container[t].remove(obj) - self._gc_bin(t) - self._elem_counter.change(-1) - - def _next_timestamp(self): - """ - Return the next 'priority' to be served - """ - i = iter(self._container) - - try: - t = i.next() - return t - except StopIteration: - return None - - def peek(self): - """ - Return the next element - """ - t = self._next_timestamp() - if t: - # just to be sure - assert(len(self._container[t]) != 0) - - # find the next element - i = iter(self._container[t]) - # store it - elem = i.next() - - # return the element - return t, elem - else: - return None - - def pop(self): - """ - Remove and return the next set of elements to be processed - """ - pair = self.peek() - if pair: - self.dequeue(*pair) - - # return the element - return pair - else: - return None - - def nbins(self): - """ - Return the number of 'bins' (map entries) currently used - """ - # get 'real' len() - return len(self._container) - - def __len__(self): - return self._elem_counter() - - - def __getitem__(self, param): - return self._container.__getitem__(param) - - def __iter__(self): - - # tree iterator - for tstamp in iter(self._container): - cur_set = self._container[tstamp] - try: - # set iterator - for elem in cur_set: - yield tstamp, elem - except StopIteration: - pass diff --git a/indico/web/assets/bundles.py b/indico/web/assets/bundles.py index be18c1041..a5ab3fa08 100644 --- a/indico/web/assets/bundles.py +++ b/indico/web/assets/bundles.py @@ -142,7 +142,6 @@ indico_admin = rjs_bundle( *namespace('js/indico/Admin', 'News.js', - 'Scheduler.js', 'Upcoming.js')) indico_timetable = rjs_bundle( diff --git a/indico/web/flask/app.py b/indico/web/flask/app.py index db7727848..79acf47e6 100644 --- a/indico/web/flask/app.py +++ b/indico/web/flask/app.py @@ -232,7 +232,7 @@ ASSETS_REGISTERED = False def 
setup_assets():
     global ASSETS_REGISTERED
     if ASSETS_REGISTERED:
-        # Avoid errors when forking after creating an app (e.g. in scheduler tests)
+        # Avoid errors when forking after creating an app
         return
     ASSETS_REGISTERED = True
     register_all_js(core_env)
diff --git a/indico/web/flask/blueprints/admin.py b/indico/web/flask/blueprints/admin.py
index 13b96aff6..af6371bee 100644
--- a/indico/web/flask/blueprints/admin.py
+++ b/indico/web/flask/blueprints/admin.py
@@ -14,8 +14,8 @@
 # You should have received a copy of the GNU General Public License
 # along with Indico; if not, see <http://www.gnu.org/licenses/>.
 
-from MaKaC.webinterface.rh import (admins, announcement, taskManager, maintenance, domains, templates,
-                                   conferenceModif, services, oauth)
+from MaKaC.webinterface.rh import (admins, announcement, maintenance, domains, templates, conferenceModif, services,
+                                   oauth)
 
 from indico.web.flask.wrappers import IndicoBlueprint
 
@@ -42,9 +42,6 @@ admin.add_url_rule('/news', 'updateNews', admins.RHUpdateNews)
 # Upcoming events
 admin.add_url_rule('/upcoming-events', 'adminUpcomingEvents', admins.RHConfigUpcoming)
 
-# Task manager
-admin.add_url_rule('/tasks', 'taskManager', taskManager.RHTaskManager)
-
 # Maintenance
 admin.add_url_rule('/maintenance/', 'adminMaintenance', maintenance.RHMaintenance)
 admin.add_url_rule('/maintenance/pack-db', 'adminMaintenance-pack', maintenance.RHMaintenancePack,
diff --git a/requirements.txt b/requirements.txt
index 8d5347df3..eb83ec266 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,7 +4,6 @@ Mako==0.9.1
 MarkupSafe==0.15
 ZConfig==2.9.2
 ZODB3==3.10.5
-argparse==1.2.1
 cds-indico-extras==0.2
 cssmin==0.1.4
 icalendar==3.8.2
diff --git a/setup.py b/setup.py
index 1e609b334..964811039 100644
--- a/setup.py
+++ b/setup.py
@@ -327,7 +327,6 @@ if __name__ == '__main__':
         license="http://www.gnu.org/licenses/gpl-3.0.txt",
         entry_points="""
             [console_scripts]
-            indico_scheduler = indico.modules.scheduler.daemon_script:main
             indico_initial_setup = MaKaC.consoleScripts.indicoInitialSetup:main
             indico_ctl = MaKaC.consoleScripts.indicoCtl:main
             indico = indico.cli.manage:main
-- 
2.11.4.GIT
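The worker/result handshake removed with slave.py (run the task in a child process and report success or failure through a shared multiprocessing.Value that the parent checks via isAlive()/getResult()) can be sketched independently of Indico as below. SketchWorker and _noop are invented names, and the sketch joins the child instead of polling it the way the scheduler loop did.

import multiprocessing


def _noop():
    pass


class SketchWorker(multiprocessing.Process):
    def __init__(self, fn):
        super(SketchWorker, self).__init__()
        self._fn = fn
        self._result = multiprocessing.Value('i', 0)   # shared int, 0 = failed

    def run(self):
        # executed in the child process; any exception counts as failure
        try:
            self._fn()
            self._result.value = 1
        except Exception:
            self._result.value = 0

    def getResult(self):
        return self._result.value


if __name__ == '__main__':
    worker = SketchWorker(_noop)
    worker.start()
    worker.join()
    print('task succeeded' if worker.getResult() else 'task failed')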