1 __author__
= """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
3 import sys
, os
, subprocess
, time
, signal
, cPickle
, logging
5 from autotest_lib
.client
.common_lib
import error
, utils
8 # entry points that use subcommand must set this to their logging manager
9 # to get log redirection for subcommands
10 logging_manager_object
= None
13 def parallel(tasklist
, timeout
=None, return_results
=False):
15 Run a set of predefined subcommands in parallel.
17 @param tasklist: A list of subcommand instances to execute.
18 @param timeout: Number of seconds after which the commands should timeout.
19 @param return_results: If True instead of an AutoServError being raised
20 on any error a list of the results|exceptions from the tasks is
21 returned. [default: False]
27 remaining_timeout
= None
29 endtime
= time
.time() + timeout
34 remaining_timeout
= max(endtime
- time
.time(), 1)
36 status
= task
.fork_waitfor(timeout
=remaining_timeout
)
37 except error
.AutoservSubcommandError
:
43 results
.append(cPickle
.load(task
.result_pickle
))
44 task
.result_pickle
.close()
49 message
= 'One or more subcommands failed:\n'
50 for task
, result
in zip(tasklist
, results
):
51 message
+= 'task: %s returned/raised: %r\n' % (task
, result
)
52 raise error
.AutoservError(message
)
55 def parallel_simple(function
, arglist
, log
=True, timeout
=None,
56 return_results
=False):
58 Each element in the arglist used to create a subcommand object,
59 where that arg is used both as a subdir name, and a single argument
60 to pass to "function".
62 We create a subcommand object for each element in the list,
63 then execute those subcommand objects in parallel.
65 NOTE: As an optimization, if len(arglist) == 1 a subcommand is not used.
67 @param function: A callable to run in parallel once per arg in arglist.
68 @param arglist: A list of single arguments to be used one per subcommand;
69 typically a list of machine names.
70 @param log: If True, output will be written to output in a subdirectory
71 named after each subcommand's arg.
72 @param timeout: Number of seconds after which the commands should timeout.
73 @param return_results: If True instead of an AutoServError being raised
74 on any error a list of the results|exceptions from the function
75 called on each arg is returned. [default: False]
77 @returns None or a list of results/exceptions.
80 logging
.warn('parallel_simple was called with an empty arglist, '
81 'did you forget to pass in a list of machines?')
82 # Bypass the multithreading if only one machine.
87 result
= function(arg
)
102 subcommands
.append(subcommand(function
, args
, subdir
))
103 return parallel(subcommands
, timeout
, return_results
=return_results
)
106 class subcommand(object):
107 fork_hooks
, join_hooks
= [], []
109 def __init__(self
, func
, args
, subdir
= None):
110 # func(args) - the subcommand to run
111 # subdir - the subdirectory to log results in
113 self
.subdir
= os
.path
.abspath(subdir
)
114 if not os
.path
.exists(self
.subdir
):
115 os
.mkdir(self
.subdir
)
116 self
.debug
= os
.path
.join(self
.subdir
, 'debug')
117 if not os
.path
.exists(self
.debug
):
125 self
.lambda_function
= lambda: func(*args
)
127 self
.returncode
= None
131 return str('subcommand(func=%s, args=%s, subdir=%s)' %
132 (self
.func
, self
.args
, self
.subdir
))
def register_fork_hook(cls, hook):
    """
    Add a hook to the class-wide list of fork hooks.

    The hook is appended to cls.fork_hooks, the list that fork_start
    iterates over in the child process before it calls the subcommand's
    function.

    @param cls: The class whose fork_hooks list receives the hook.
    @param hook: The object to register; presumably a callable -- the way
            it is invoked is not visible here, confirm in fork_start.
    """
    registered = cls.fork_hooks
    registered.append(hook)
def register_join_hook(cls, hook):
    """
    Add a hook to the class-wide list of join hooks.

    Join hooks are meant to be called from the child process just before
    the child process terminates (joins to the parent).

    @param cls: The class whose join_hooks list receives the hook.
    @param hook: The object to register; presumably a callable -- the way
            it is invoked is not visible here, confirm in fork_start.
    """
    registered = cls.join_hooks
    registered.append(hook)
def redirect_output(self):
    """
    Tee this subcommand's output into its debug directory.

    Does nothing unless the subcommand has a subdir and the module-level
    logging_manager_object has been set. Otherwise the logging manager's
    tee_redirect_debug_dir is called with self.debug, tagged with the last
    path component of self.subdir.
    """
    # Check subdir first so the logging manager is only consulted when a
    # results directory actually exists.
    if not self.subdir or not logging_manager_object:
        return
    subdir_name = os.path.basename(self.subdir)
    logging_manager_object.tee_redirect_debug_dir(self.debug,
                                                  tag=subdir_name)
155 def fork_start(self
):
159 self
.returncode
= None
162 if self
.pid
: # I am the parent
164 self
.result_pickle
= os
.fdopen(r
, 'r')
169 # We are the child from this point on. Never return.
170 signal
.signal(signal
.SIGTERM
, signal
.SIG_DFL
) # clear handler
172 os
.chdir(self
.subdir
)
173 self
.redirect_output()
176 for hook
in self
.fork_hooks
:
178 result
= self
.lambda_function()
179 os
.write(w
, cPickle
.dumps(result
, cPickle
.HIGHEST_PROTOCOL
))
182 logging
.exception('function failed')
184 os
.write(w
, cPickle
.dumps(e
, cPickle
.HIGHEST_PROTOCOL
))
189 for hook
in self
.join_hooks
:
197 def _handle_exitstatus(self
, sts
):
199 This is partially borrowed from subprocess.Popen.
201 if os
.WIFSIGNALED(sts
):
202 self
.returncode
= -os
.WTERMSIG(sts
)
203 elif os
.WIFEXITED(sts
):
204 self
.returncode
= os
.WEXITSTATUS(sts
)
206 # Should never happen
207 raise RuntimeError("Unknown child exit status!")
209 if self
.returncode
!= 0:
210 print "subcommand failed pid %d" % self
.pid
211 print "%s" % (self
.func
,)
212 print "rc=%d" % self
.returncode
215 stderr_file
= os
.path
.join(self
.debug
, 'autoserv.stderr')
216 if os
.path
.exists(stderr_file
):
217 for line
in open(stderr_file
).readlines():
219 print "\n--------------------------------------------\n"
220 raise error
.AutoservSubcommandError(self
.func
, self
.returncode
)
225 This is borrowed from subprocess.Popen.
227 if self
.returncode
is None:
229 pid
, sts
= os
.waitpid(self
.pid
, os
.WNOHANG
)
231 self
._handle
_exitstatus
(sts
)
234 return self
.returncode
239 This is borrowed from subprocess.Popen.
241 if self
.returncode
is None:
242 pid
, sts
= os
.waitpid(self
.pid
, 0)
243 self
._handle
_exitstatus
(sts
)
244 return self
.returncode
247 def fork_waitfor(self
, timeout
=None):
251 end_time
= time
.time() + timeout
252 while time
.time() <= end_time
:
253 returncode
= self
.poll()
254 if returncode
is not None:
258 utils
.nuke_pid(self
.pid
)
259 print "subcommand failed pid %d" % self
.pid
260 print "%s" % (self
.func
,)
261 print "timeout after %ds" % timeout