1 # -*- coding: utf-8 -*-
2 ###########################################################################
3 # Copyright (C) 2008 by Andrew Mahone
4 # <andrew.mahone@gmail.com>
6 # Copyright: See COPYING file that comes with this distribution
8 ###########################################################################
9 #pylint: disable=E1101,E1103
13 from types
import FunctionType
, GeneratorType
14 from twisted
.internet
import defer
, protocol
, error
, fdesc
15 from twisted
.python
import failure
16 from functools
import wraps
17 from multiprocessing
import cpu_count
18 from audiomangler
.config
import Config
19 from audiomangler
.util
import ClassInitMeta
20 from audiomangler
.logging
import err
21 from decorator
import decorator
# Reactor selection. Only when 'twisted.internet.reactor' is not yet in
# sys.modules, the candidate reactor modules named below are imported by name
# (kqueue, epoll, poll, select, in that order); an ImportError is ignored.
# NOTE(review): the embedded numbering jumps 24 -> 26 -> 29 -> 31, so the
# `try:` line and whatever is done with the imported module `r` are not part
# of this view -- presumably the first importable reactor is installed and the
# loop stops; confirm against the full file.
23 if 'twisted.internet.reactor' not in sys
.modules
:
24 for reactor
in 'kqreactor', 'epollreactor', 'pollreactor', 'selectreactor':
26 r
= __import__('twisted.internet.' + reactor
, fromlist
=[reactor
])
29 except ImportError: pass
# Rebinds the module-level name `reactor` (used as the loop variable above) to
# twisted.internet.reactor; the rest of the file calls methods on this name.
31 from twisted
.internet
import reactor
# Schedules func(self, *args, **kwargs) on the twisted reactor through
# reactor.callWhenRunning, then tests whether the reactor is already running.
# NOTE(review): the embedded numbering jumps 34 -> 36 and stops at 37, so any
# decorator/docstring lines and the body of the final `if` are not part of this
# view; presumably the missing body starts the reactor -- confirm against the
# full file before relying on this.
34 def background_task(func
, self
, *args
, **kwargs
):
36 reactor
.callWhenRunning(func
, self
, *args
, **kwargs
)
37 if not reactor
.running
:
40 #def background_task(func):
41 #argspec = inspect.getargspec(func)
47 #reactor.callWhenRunning(%s)
48 #if not reactor.running:
50 #return proxy""" % (inspect.formatargspec(*argspec), ', '.join(['func', 'self'] + ['%s=%s' % (arg, arg) for arg in argspec.args[1:]]))
51 #exec code in globals(), env
52 #return wraps(func)(env['decorate'](func))
def chainDeferreds(d1, d2):
    """Hook deferred d2 onto the outcome of deferred d1.

    d2.callback is registered as d1's success handler and d2.errback as its
    failure handler, each first passed through the module-level ``chain``
    helper (defined outside this block).
    """
    on_success = chain(d2.callback)
    on_failure = chain(d2.errback)
    d1.addCallbacks(on_success, on_failure)
# Root class of the task hierarchy. Each instance owns a twisted Deferred
# (self.deferred) with _complete registered on both outcomes, and instances are
# added to / removed from a __bg_tasks collection reached through the class.
# NOTE(review): the embedded numbering shows that original lines 68, 71-72, 74,
# 76-77, 79, 81, 84-85, 87, 91, 93-94, 96 and 100-102 are absent from this
# view, so the conditions guarding several statements below cannot be seen.
65 class BaseTask(object):
66 "Base class for other Task types, providing handling for Task registration and cleanup, and ensuring that the first task is started inside the reactor."
67 __metaclass__
= ClassInitMeta
# Class-initialisation hook (the class uses ClassInitMeta as its metaclass):
# looks up `run` on the class, rebinds it to background_task(run.im_func) and
# assigns a docstring to it. `.im_func` only exists on Python 2 methods.
69 def __classinit__(cls
, name
, bases
, cls_dict
):
70 run
= getattr(cls
, 'run', None)
73 run
= background_task(run
.im_func
)
75 run
.__doc
__ = "Start the task, returning status via <task>.deferred callbacks when the task completes."
78 __slots__
= 'deferred', 'args', 'parent'
# Creates the Deferred, attaches _complete with addBoth (success and failure)
# and adds the instance to __bg_tasks. Where `args` is stored is not visible.
80 def __init__(self
, *args
):
82 self
.deferred
= defer
.Deferred()
83 self
.deferred
.addBoth(self
._complete
)
86 self
.__class
__.__bg
_tasks
.add(self
)
# Handler attached with addBoth. When the deferred still has callbacks queued it
# re-attaches itself; it reads `parent` (default None) and calls
# parent.complete_sub(out, self), then drops the instance from __bg_tasks.
# NOTE(review): the body of the trailing `if not self.__bg_tasks:` is missing
# here -- presumably it stops the reactor once no task is left; confirm.
88 def _complete(self
, out
):
89 if self
.deferred
.callbacks
:
90 self
.deferred
.addBoth(self
._complete
)
92 parent
= getattr(self
, 'parent', None)
95 parent
.complete_sub(out
, self
)
97 if self
in self
.__bg
_tasks
:
98 self
.__bg
_tasks
.remove(self
)
99 if not self
.__bg
_tasks
:
# twisted ProcessProtocol used by CLITask: collects the chunks the child writes
# to stdout/stderr in self._out / self._err and reports the result on the
# owning task's deferred when the process ends.
# NOTE(review): the __init__ body (original lines 106-109) and original line
# 120 are absent from this view; presumably __init__ stores `task` and creates
# the two lists, and line 120 is the `else:` for the errback below -- confirm.
103 class CLIProcessProtocol(protocol
.ProcessProtocol
):
104 "Support class for CLITask, saving output from the spawned process and triggering task callbacks on exit."
105 def __init__(self
, task
):
# Appends one received stdout chunk to self._out.
110 def outReceived(self
, data
):
111 self
._out
.append(data
)
# Appends one received stderr chunk to self._err.
113 def errReceived(self
, data
):
114 self
._err
.append(data
)
# Joins the collected chunks into task.out / task.err. The task's deferred gets
# callback((out, err)) when `reason` is error.ProcessDone and neither
# reason.value.status nor reason.value.signal is truthy; errback(reason) is the
# other visible outcome.
116 def processEnded(self
, reason
):
117 (self
.task
.out
, self
.task
.err
) = map(''.join
, (self
._out
, self
._err
))
118 if reason
.check(error
.ProcessDone
) and not (reason
.value
.status
or reason
.value
.signal
):
119 self
.task
.deferred
.callback((self
.task
.out
, self
.task
.err
))
121 self
.task
.deferred
.errback(reason
)
# Task that spawns one subprocess with reactor.spawnProcess; self.args is used
# both as the argument vector and (element 0) as the executable.
# NOTE(review): the embedded numbering shows original lines 128, 134-135,
# 137-138, 141-142, 145-146 and 155-157 are absent from this view, so the
# creation of `childFDs` / `closeFDs`, the branches between each `if ... is not
# None:` and the following assignment, and anything after spawnProcess (e.g.
# closing the descriptors collected in `closeFDs`) cannot be seen.
123 class CLITask(BaseTask
):
124 "Task subclass to spawn subprocesses, with the executable and arguments passed on initialization. The keyword arguments stdin, stdout, and stderr may be used to provide a file or file descriptor for the stdin of the first process in the pipeline, or the stdout and stderr of the last process."
125 __slots__
= 'proc', 'out', 'err', 'exit', 'stdin', 'stdout', 'stderr'
# Copies the stdin/stdout/stderr keyword arguments onto the instance and passes
# the positional arguments to the parent constructor. The guard that presumably
# sits on the missing line 128 (only set when the keyword is given) is not
# visible -- confirm.
126 def __init__(self
, *args
, **kwargs
):
127 for arg
in 'stdin', 'stdout', 'stderr':
129 setattr(self
, arg
, kwargs
[arg
])
130 super(CLITask
, self
).__init
__(*args
)
# Fills childFDs for descriptors 0, 1 and 2. The visible fallbacks read the
# attribute stored at construction, defaulting to 'w' for stdin and 'r' for
# stdout/stderr. String values starting with 'w:' or 'r:' are treated as
# "<mode>:<path>": the path is opened with os.open (O_WRONLY|O_CREAT for 'w',
# O_RDONLY otherwise), the descriptor is remembered in closeFDs and substituted
# into childFDs. Finally the process is spawned with a CLIProcessProtocol bound
# to this task and stored in self.proc.
132 def run(self
, stdin
=None, stdout
=None, stderr
=None):
133 "Start the task, returning status via <task>.deferred callbacks when the task completes. The keyword arguments stdin, stdout, and stderr may be used to override the ones provided at initialization."
136 if stdin
is not None:
139 childFDs
[0] = getattr(self
, 'stdin', 'w')
140 if stdout
is not None:
143 childFDs
[1] = getattr(self
, 'stdout', 'r')
144 if stderr
is not None:
147 childFDs
[2] = getattr(self
, 'stderr', 'r')
148 for key
, value
in childFDs
.items():
149 if isinstance(value
, basestring
) and (value
.startswith('w:') or value
.startswith('r:')):
150 mode
, path
= value
.split(':', 1)
151 mode
= os
.O_WRONLY|os
.O_CREAT
if mode
== 'w' else os
.O_RDONLY
152 closeFDs
.append(os
.open(path
, mode
))
153 childFDs
[key
] = closeFDs
[-1]
154 self
.proc
= reactor
.spawnProcess(CLIProcessProtocol(self
), executable
=self
.args
[0], args
=self
.args
, childFDs
= childFDs
)
# Common base for tasks that drive other tasks: run_sub starts a sub-task and
# complete_sub removes a finished one from self.subs.
# NOTE(review): the embedded numbering shows original lines 163-164, 168-169,
# 171-173, 175-176, 178 and 181 are absent from this view, so how self.subs,
# self.main and self.args are populated cannot be seen.
158 class BaseSetTask(BaseTask
):
159 "Base class for Tasks that run a set of other Tasks."
# NOTE(review): this attribute is spelled `slots`, whereas the sibling classes
# in this file declare `__slots__` -- looks like a typo; confirm.
160 slots
= 'subs', 'main'
# Calls the parent constructor without arguments, pops a `main` keyword that
# must be an int or a BaseTask, and special-cases a first positional argument
# that is a generator. The bodies of both `if` statements are not visible.
161 def __init__(self
, *args
, **kwargs
):
162 super(BaseSetTask
, self
).__init
__()
165 main
= kwargs
.pop('main')
166 assert isinstance(main
, (int, BaseTask
))
167 if isinstance(main
, int):
170 if args
and isinstance(args
[0], GeneratorType
):
# Starts `sub` by calling sub.run with the given arguments; the bookkeeping on
# the missing lines 175-176 (presumably parent/subs registration) is not
# visible -- confirm.
174 def run_sub(self
, sub
, *args
, **kwargs
):
177 sub
.run(*args
, **kwargs
)
# Invoked when `sub` has finished (BaseTask._complete calls
# parent.complete_sub(out, self)); removes it from self.subs.
179 def complete_sub(self
, out
, sub
):
180 self
.subs
.remove(sub
)
# Task that runs the tasks in self.args as a pipeline, handing pipe 1 of each
# started process to a following run_sub call as its stdin.
# NOTE(review): the embedded numbering shows original lines 187, 189, 192, 195,
# 197, 199, 201-202, 204, 208-211, 213, 215, 218, 221 and 225 are absent from
# this view; in particular the initialisation of `prev`, `fd` and self.tasks
# and the conditions around the getattr fallbacks cannot be seen.
# NOTE(review): the class docstring says each stdout is connected to the stdin
# of the *previous* process, but the visible loop pops pipe 1 of `prev` and
# passes it as stdin to a later run_sub call, i.e. to the *next* process --
# the wording looks inverted; confirm.
183 class CLIPipelineTask(BaseSetTask
):
184 "Task comprised of a series of subprocesses, with stdout of each connected to stdin of the previous one. The keyword arguments stdin, stdout, and stderr may be used to provide a file or file descriptor for the stdin of the first process in the pipeline, or the stdout and stderr of the last process."
185 __slots__
= 'tasks', 'stdin', 'stdout', 'stderr'
# Copies the stdin/stdout/stderr keyword arguments onto the instance and passes
# the positional arguments to the parent constructor.
186 def __init__(self
, *args
, **kwargs
):
188 for arg
in 'stdin', 'stdout', 'stderr':
190 setattr(self
, arg
, kwargs
[arg
])
191 super(CLIPipelineTask
, self
).__init
__(*args
)
# Falls back to the attributes stored at construction (default None) for the
# three streams, walks self.args starting sub-tasks with the previous pipe as
# stdin, switches each popped pipe descriptor to blocking mode with
# fdesc.setBlocking, and starts a final sub-task with the caller's
# stdout/stderr. The condition under which self.deferred.callback(None) fires
# is on a missing line -- presumably the empty-pipeline case; confirm.
193 def run(self
, stdin
=None, stdout
=None, stderr
=None):
194 "Start the task, returning status via <task>.deferred callbacks when the task completes. The keyword arguments stdin, stdout, and stderr may be used to override the ones provided at initialization."
196 stdin
= getattr(self
, 'stdin', None)
198 stdout
= getattr(self
, 'stdout', None)
200 stderr
= getattr(self
, 'stderr', None)
203 for task
in self
.args
:
205 self
.run_sub(prev
, stdin
=fd
)
206 fd
= prev
.proc
.pipes
.pop(1)
207 fdesc
.setBlocking(fd
)
212 self
.run_sub(prev
, stdin
=fd
, stdout
=stdout
, stderr
=stderr
)
214 self
.deferred
.callback(None)
# After the base-class bookkeeping, picks self.main (default: the last started
# task) and chains that task's deferred into this task's deferred. The guard on
# the missing line 218 is not visible.
216 def complete_sub(self
, out
, sub
):
217 super(CLIPipelineTask
, self
).complete_sub(out
, sub
)
219 task
= getattr(self
, 'main', self
.tasks
[-1])
220 chainDeferreds(task
.deferred
, self
.deferred
)
# Starts the sub-task through the base class and records it in self.tasks.
222 def run_sub(self
, sub
, *args
, **kwargs
):
223 super(CLIPipelineTask
, self
).run_sub(sub
, *args
, **kwargs
)
224 self
.tasks
.append(sub
)
def generator_task(func, *args, **kwargs):
    """Call ``func`` with the given arguments and wrap what it returns.

    ``func`` is expected to be a generator function yielding Tasks; the
    generator object it produces is handed to GeneratorTask, and that task is
    returned.
    """
    return GeneratorTask(func(*args, **kwargs))
# Task driven by a generator stored in self.args: the generator is advanced
# with .next() / .send(out) (Python 2 generator API), and results of finished
# sub-tasks are sent back into it.
# NOTE(review): the embedded numbering shows original lines 237-238, 240, 242,
# 245, 247, 250, 253-254 and 258 are absent from this view; the `def run` line,
# the `try:` lines, the handling of the task obtained from the generator and
# the generic `except` clauses around the err(failure.Failure()) calls cannot
# be seen.
232 class GeneratorTask(BaseSetTask
):
233 "Task that runs subtasks produced by a generator, passing their output back via the generator's send method. If the generator yields a value that is not a Task, that value will be passed to the GeneratorTask's callback."
# Requires `gen` to be a generator object and forwards it to the parent
# constructor.
234 def __init__(self
, gen
):
235 assert(isinstance(gen
, GeneratorType
))
236 super(GeneratorTask
, self
).__init
__(gen
)
# Start-up code (presumably the body of `run`, whose def line is missing):
# fetches the first item from the generator; an exhausted generator fires the
# deferred with None, and err(failure.Failure()) logs a failure on what is
# presumably a catch-all path.
239 assert(isinstance(self
.args
, GeneratorType
))
241 task
= self
.args
.next()
243 except StopIteration:
244 self
.deferred
.callback(None)
246 err(failure
.Failure())
# Sends the finished sub-task's result into the generator. A BaseTask yielded
# in return is handled on the missing lines 253-254; otherwise, and when the
# generator is exhausted, the deferred is fired.
# NOTE(review): the visible non-Task path fires callback(out), while the class
# docstring says the yielded value (here `newout`) is passed on -- possible
# mismatch; confirm against the full file.
248 def complete_sub(self
, out
, sub
):
249 super(GeneratorTask
, self
).complete_sub(out
, sub
)
251 newout
= self
.args
.send(out
)
252 if isinstance(newout
, BaseTask
):
255 self
.deferred
.callback(out
)
256 except StopIteration:
257 self
.deferred
.callback(out
)
259 err(failure
.Failure())
# Task that iterates over self.args and fires its deferred with None.
# NOTE(review): the embedded numbering shows original lines 263, 265-266, 268
# and 271 are absent from this view: the `def run` line, the loop body
# (presumably starting each task) and the conditions guarding both
# self.deferred.callback(None) calls (presumably "no sub-tasks left") cannot
# be seen -- confirm against the full file.
261 class GroupTask(BaseSetTask
):
262 "Task that starts a group of tasks and waits for them to complete before firing its callback."
264 for task
in self
.args
:
267 self
.deferred
.callback(None)
# Runs the base-class bookkeeping for the finished sub-task, rebinding `out` to
# its return value, then (under a condition that is not visible) fires the
# deferred with None.
269 def complete_sub(self
, out
, sub
):
270 out
= super(GroupTask
, self
).complete_sub(out
, sub
)
272 self
.deferred
.callback(None)
274 class PoolTask(BaseSetTask
):
275 "Task that runs at most Config['jobs'] tasks at a time from its arguments until out of tasks, then fires its callback with None. A suitable number of jobs will be chosen if Config does not specify one, and the sub-Tasks are not connected to each other in any way."
276 __slots__
= 'max_tasks'
277 def __init__(self
, *args
):
278 self
.max_tasks
= int(Config
.get('jobs', cpu_count()))
279 if args
and isinstance(args
[0], GeneratorType
):
282 super(PoolTask
, self
).__init
__(args
)
286 while len(self
.subs
) < self
.max_tasks
:
287 next_task
= self
.args
.next()
288 self
.run_sub(next_task
)
289 except StopIteration:
292 self
.deferred
.callback(None)
294 def complete_sub(self
, out
, sub
):
295 out
= super(PoolTask
, self
).complete_sub(out
, sub
)