Cleanups, fixes, use decorator lib for argspec-preserving decorators.
[audiomangler.git] / audiomangler / task.py
blobd273f0e9229d18774068691cec139c391fee2070
1 # -*- coding: utf-8 -*-
2 ###########################################################################
3 # Copyright (C) 2008 by Andrew Mahone
4 # <andrew.mahone@gmail.com>
6 # Copyright: See COPYING file that comes with this distribution
8 ###########################################################################
9 #pylint: disable=E1101,E1103
10 import os
11 import sys
12 import inspect
13 from types import FunctionType, GeneratorType
14 from twisted.internet import defer, protocol, error, fdesc
15 from twisted.python import failure
16 from functools import wraps
17 from multiprocessing import cpu_count
18 from audiomangler.config import Config
19 from audiomangler.util import ClassInitMeta
20 from audiomangler.logging import err
21 from decorator import decorator
23 if 'twisted.internet.reactor' not in sys.modules:
24 for reactor in 'kqreactor', 'epollreactor', 'pollreactor', 'selectreactor':
25 try:
26 r = __import__('twisted.internet.' + reactor, fromlist=[reactor])
27 r.install()
28 break
29 except ImportError: pass
31 from twisted.internet import reactor
33 @decorator
34 def background_task(func, self, *args, **kwargs):
35 self._register()
36 reactor.callWhenRunning(func, self, *args, **kwargs)
37 if not reactor.running:
38 reactor.run()
40 #def background_task(func):
41 #argspec = inspect.getargspec(func)
42 #env = {}
43 #code = """
44 #def decorate(func):
45 #def proxy%s:
46 #self._register()
47 #reactor.callWhenRunning(%s)
48 #if not reactor.running:
49 #reactor.run()
50 #return proxy""" % (inspect.formatargspec(*argspec), ', '.join(['func', 'self'] + ['%s=%s' % (arg, arg) for arg in argspec.args[1:]]))
51 #exec code in globals(), env
52 #return wraps(func)(env['decorate'](func))
55 def chain(f):
56 @wraps(f)
57 def proxy(out):
58 f(out)
59 return out
60 return proxy
62 def chainDeferreds(d1, d2):
63 d1.addCallbacks(chain(d2.callback), chain(d2.errback))
65 class BaseTask(object):
66 "Base class for other Task types, providing handling for Task registration and cleanup, and ensuring that the first task is started inside the reactor."
67 __metaclass__ = ClassInitMeta
68 @classmethod
69 def __classinit__(cls, name, bases, cls_dict):
70 run = getattr(cls, 'run', None)
71 if run:
72 cls._run = run
73 run = background_task(run.im_func)
74 if not run.__doc__:
75 run.__doc__ = "Start the task, returning status via <task>.deferred callbacks when the task completes."
76 cls.run = run
78 __slots__ = 'deferred', 'args', 'parent'
79 __bg_tasks = set()
80 def __init__(self, *args):
81 self.args = args
82 self.deferred = defer.Deferred()
83 self.deferred.addBoth(self._complete)
85 def _register(self):
86 self.__class__.__bg_tasks.add(self)
88 def _complete(self, out):
89 if self.deferred.callbacks:
90 self.deferred.addBoth(self._complete)
91 return out
92 parent = getattr(self, 'parent', None)
93 try:
94 if parent:
95 parent.complete_sub(out, self)
96 finally:
97 if self in self.__bg_tasks:
98 self.__bg_tasks.remove(self)
99 if not self.__bg_tasks:
100 reactor.stop()
101 return out
103 class CLIProcessProtocol(protocol.ProcessProtocol):
104 "Support class for CLITask, saving output from the spawned process and triggering task callbacks on exit."
105 def __init__(self, task):
106 self._out = []
107 self._err = []
108 self.task = task
110 def outReceived(self, data):
111 self._out.append(data)
113 def errReceived(self, data):
114 self._err.append(data)
116 def processEnded(self, reason):
117 (self.task.out, self.task.err) = map(''.join, (self._out, self._err))
118 if reason.check(error.ProcessDone) and not (reason.value.status or reason.value.signal):
119 self.task.deferred.callback((self.task.out, self.task.err))
120 else:
121 self.task.deferred.errback(reason)
123 class CLITask(BaseTask):
124 "Task subclass to spawn subprocesses, with the executable and arguments passed on initialization. The keyword arguments stdin, stdout, and stderr may be used to provide a file or file descriptor for the stdin of the first process in the pipeline, or the stdout and stderr of the last process."
125 __slots__ = 'proc', 'out', 'err', 'exit', 'stdin', 'stdout', 'stderr'
126 def __init__(self, *args, **kwargs):
127 for arg in 'stdin', 'stdout', 'stderr':
128 if arg in kwargs:
129 setattr(self, arg, kwargs[arg])
130 super(CLITask, self).__init__(*args)
132 def run(self, stdin=None, stdout=None, stderr=None):
133 "Start the task, returning status via <task>.deferred callbacks when the task completes. The keyword arguments stdin, stdout, and stderr may be used to override the ones provided at initialization."
134 childFDs = {}
135 closeFDs = []
136 if stdin is not None:
137 childFDs[0] = stdin
138 else:
139 childFDs[0] = getattr(self, 'stdin', 'w')
140 if stdout is not None:
141 childFDs[1] = stdout
142 else:
143 childFDs[1] = getattr(self, 'stdout', 'r')
144 if stderr is not None:
145 childFDs[2] = stderr
146 else:
147 childFDs[2] = getattr(self, 'stderr', 'r')
148 for key, value in childFDs.items():
149 if isinstance(value, basestring) and (value.startswith('w:') or value.startswith('r:')):
150 mode, path = value.split(':', 1)
151 mode = os.O_WRONLY|os.O_CREAT if mode == 'w' else os.O_RDONLY
152 closeFDs.append(os.open(path, mode))
153 childFDs[key] = closeFDs[-1]
154 self.proc = reactor.spawnProcess(CLIProcessProtocol(self), executable=self.args[0], args=self.args, childFDs = childFDs)
155 for fd in closeFDs:
156 os.close(fd)
158 class BaseSetTask(BaseTask):
159 "Base class for Tasks that run a set of other Tasks."
160 slots = 'subs', 'main'
161 def __init__(self, *args, **kwargs):
162 super(BaseSetTask, self).__init__()
163 self.subs = set()
164 if 'main' in kwargs:
165 main = kwargs.pop('main')
166 assert isinstance(main, (int, BaseTask))
167 if isinstance(main, int):
168 main = args[main]
169 self.main = main
170 if args and isinstance(args[0], GeneratorType):
171 args = args[0]
172 self.args = args
174 def run_sub(self, sub, *args, **kwargs):
175 self.subs.add(sub)
176 sub.parent = self
177 sub.run(*args, **kwargs)
179 def complete_sub(self, out, sub):
180 self.subs.remove(sub)
181 return out
183 class CLIPipelineTask(BaseSetTask):
184 "Task comprised of a series of subprocesses, with stdout of each connected to stdin of the previous one. The keyword arguments stdin, stdout, and stderr may be used to provide a file or file descriptor for the stdin of the first process in the pipeline, or the stdout and stderr of the last process."
185 __slots__ = 'tasks', 'stdin', 'stdout', 'stderr'
186 def __init__(self, *args, **kwargs):
187 self.tasks = []
188 for arg in 'stdin', 'stdout', 'stderr':
189 if arg in kwargs:
190 setattr(self, arg, kwargs[arg])
191 super(CLIPipelineTask, self).__init__(*args)
193 def run(self, stdin=None, stdout=None, stderr=None):
194 "Start the task, returning status via <task>.deferred callbacks when the task completes. The keyword arguments stdin, stdout, and stderr may be used to override the ones provided at initialization."
195 if stdin is None:
196 stdin = getattr(self, 'stdin', None)
197 if stdout is None:
198 stdout = getattr(self, 'stdout', None)
199 if stderr is None:
200 stderr = getattr(self, 'stderr', None)
201 fd = stdin
202 prev = None
203 for task in self.args:
204 if prev:
205 self.run_sub(prev, stdin=fd)
206 fd = prev.proc.pipes.pop(1)
207 fdesc.setBlocking(fd)
208 fd.stopReading()
209 fd = fd.fileno()
210 prev = task
211 if prev:
212 self.run_sub(prev, stdin=fd, stdout=stdout, stderr=stderr)
213 else:
214 self.deferred.callback(None)
216 def complete_sub(self, out, sub):
217 super(CLIPipelineTask, self).complete_sub(out, sub)
218 if not self.subs:
219 task = getattr(self, 'main', self.tasks[-1])
220 chainDeferreds(task.deferred, self.deferred)
222 def run_sub(self, sub, *args, **kwargs):
223 super(CLIPipelineTask, self).run_sub(sub, *args, **kwargs)
224 self.tasks.append(sub)
226 @decorator
227 def generator_task(func, *args, **kwargs):
228 "Decorator function wrapping a generator that yields Tasks in a GeneratorTask."
229 gen = func(*args, **kwargs)
230 return GeneratorTask(gen)
232 class GeneratorTask(BaseSetTask):
233 "Task that runs subtasks produced by a generator, passing their output back via the generator's send method. If the generator yields a value that is not a Task, that value will be passed to the GeneratorTask's callback."
234 def __init__(self, gen):
235 assert(isinstance(gen, GeneratorType))
236 super(GeneratorTask, self).__init__(gen)
238 def run(self):
239 assert(isinstance(self.args, GeneratorType))
240 try:
241 task = self.args.next()
242 self.run_sub(task)
243 except StopIteration:
244 self.deferred.callback(None)
245 except:
246 err(failure.Failure())
248 def complete_sub(self, out, sub):
249 super(GeneratorTask, self).complete_sub(out, sub)
250 try:
251 newout = self.args.send(out)
252 if isinstance(newout, BaseTask):
253 self.run_sub(newout)
254 else:
255 self.deferred.callback(out)
256 except StopIteration:
257 self.deferred.callback(out)
258 except:
259 err(failure.Failure())
261 class GroupTask(BaseSetTask):
262 "Task that starts a group of tasks and waits for them to complete before firing its callback."
263 def run(self):
264 for task in self.args:
265 self.run_sub(task)
266 if not self.subs:
267 self.deferred.callback(None)
269 def complete_sub(self, out, sub):
270 out = super(GroupTask, self).complete_sub(out, sub)
271 if not(self.subs):
272 self.deferred.callback(None)
274 class PoolTask(BaseSetTask):
275 "Task that runs at most Config['jobs'] tasks at a time from its arguments until out of tasks, then fires its callback with None. A suitable number of jobs will be chosen if Config does not specify one, and the sub-Tasks are not connected to each other in any way."
276 __slots__ = 'max_tasks'
277 def __init__(self, *args):
278 self.max_tasks = int(Config.get('jobs', cpu_count()))
279 if args and isinstance(args[0], GeneratorType):
280 args = args[0]
281 args = iter(args)
282 super(PoolTask, self).__init__(args)
284 def run(self):
285 try:
286 while len(self.subs) < self.max_tasks:
287 next_task = self.args.next()
288 self.run_sub(next_task)
289 except StopIteration:
290 pass
291 if not self.subs:
292 self.deferred.callback(None)
294 def complete_sub(self, out, sub):
295 out = super(PoolTask, self).complete_sub(out, sub)
296 self.run()
297 return out
300 FuncTask=None
301 TaskSet=None
302 __all__ = []