1 # -*- coding: utf-8 -*-
2 ###########################################################################
3 # Copyright (C) 2008 by Andrew Mahone
4 # <andrew.mahone@gmail.com>
6 # Copyright: See COPYING file that comes with this distribution
8 ###########################################################################
12 from types
import FunctionType
, GeneratorType
13 from twisted
.internet
import threads
, defer
14 from twisted
.python
import failure
16 if 'twisted.internet.reactor' not in sys
.modules
:
17 for reactor
in ('kqreactor','epollreactor','pollreactor','selectreactor'):
19 r
= __import__('twisted.internet.' + reactor
, fromlist
=[reactor
])
22 except ImportError: pass
24 from twisted
.internet
import reactor
27 if reactor
.running
: reactor
.stop()
29 atexit
.register(cleanup
)
34 def __new__(cls
,*args
,**kw
):
37 class BaseTask(object):
38 def __init__(self
, target
=None, args
=(), kwargs
=(), background
=True):
42 self
.kwargs
= dict(kwargs
)
43 self
.background
= background
52 class FuncTask(BaseTask
):
53 def __init__(self
, target
=None, args
=(), kwargs
=(), background
=True):
54 BaseTask
.__init
__(self
, target
, args
, kwargs
, background
)
58 self
.target(*self
.args
, **self
.kwargs
)
59 self
.deferred
= defer
.succeed(self
)
62 self
.deferred
= defer
.failure(failure
.Failure())
66 self
.deferred
= threads
.deferToThread(self
.target
, *self
.args
, **self
.kwargs
)
69 class CLITask(BaseTask
):
70 def __init__(self
, target
=None, args
=(), stdin
=None, stdout
=None, stderr
=None, kwargs
=(), background
=True):
74 BaseTask
.__init
__(self
, target
, args
, background
=background
)
80 if isinstance(self
.stdin
, basestring
):
81 self
.stdin
= file(self
.stdin
, 'rb')
82 if isinstance(self
.stdout
, basestring
):
83 self
.stdout
= file(self
.stdout
, 'wb')
84 if isinstance(self
.stderr
, basestring
):
85 self
.stderr
= file(self
.stderr
, 'wb')
86 self
.proc
= Popen(executable
=self
.target
, args
=self
.args
, stdin
=self
.stdin
, stderr
=self
.stderr
, stdout
=self
.stdout
)
96 if not hasattr(self
,'proc'):
98 ret
= self
.proc
.wait()
100 raise CalledProcessError(ret
,self
.args
)
102 for f
in (self
.stdin
, self
.stdout
, self
.stderr
):
103 if isinstance(f
,int) and f
>2:
105 if isinstance(f
,file) and f
not in (sys
.stdin
, sys
.stdout
, sys
.stderr
):
111 if not hasattr(self
,'proc'):
113 ret
= self
.proc
.poll()
117 raise CalledProcessError(ret
)
119 class TaskSet(FuncTask
):
120 def __init__(self
, tasks
=(), background
=True):
121 FuncTask
.__init
__(self
, background
=background
)
122 self
.tasks
= list(tasks
)
126 t
= self
.bgprocs
.pop()
132 if isinstance(t
, BaseTask
):
134 if isinstance(t
, BaseTask
):
140 __all__
= ['FuncTask','CLITask','TaskSet','reactor']