virt.virt_test_utils: run_autotest - 'tar' needs relative paths to strip the leading '/'
[autotest-zwu.git] / client / bin / parallel.py
blobc4269684f8a8694be340452719fa6ac1515217ff
1 """ Parallel execution management """
3 __author__ = """Copyright Andy Whitcroft 2006"""
5 import sys, logging, os, pickle, traceback, gc, time
6 from autotest_lib.client.common_lib import error, utils
def fork_start(tmp, l):
    """
    Fork a child process that runs the callable l.

    In the parent this returns immediately with the child's pid.  In the
    child, l() is invoked; the child then terminates through os._exit()
    with status 0 when l() returned normally, or status 1 when it raised
    an Exception.  On failure the exception object is pickled to
    <tmp>/debug/error-<child pid> so that the parent can pick it up.

    @param tmp: directory under which the 'debug' error directory lives.
    @param l: zero-argument callable to execute in the child.
    @return: pid of the child (parent side only).
    """
    # Flush before forking so buffered output is not emitted twice.
    sys.stdout.flush()
    sys.stderr.flush()
    pid = os.fork()
    if pid:
        # Parent
        return pid

    # Child.  The active exception is fetched with sys.exc_info() rather
    # than bound in the except clause, which keeps the syntax valid on
    # every Python version.
    try:
        try:
            l()
        except error.AutotestError:
            raise
        except Exception:
            raise error.UnhandledTestError(sys.exc_info()[1])
    except Exception:
        detail = sys.exc_info()[1]
        try:
            try:
                logging.error('child process failed')
                # logging.exception() uses ERROR level, but we want DEBUG for
                # the traceback
                logging.debug(traceback.format_exc())
            finally:
                # note that exceptions originating in this block won't make it
                # to the logs
                output_dir = os.path.join(tmp, 'debug')
                try:
                    os.makedirs(output_dir)
                except OSError:
                    # Either the directory already exists or a sibling
                    # child created it concurrently; only a genuinely
                    # missing directory is an error.
                    if not os.path.isdir(output_dir):
                        raise
                ename = os.path.join(output_dir, "error-%d" % os.getpid())
                # Close explicitly: os._exit() below skips interpreter
                # cleanup, so nothing else guarantees the data is flushed.
                error_file = open(ename, "wb")
                try:
                    pickle.dump(detail, error_file)
                finally:
                    error_file.close()

                sys.stdout.flush()
                sys.stderr.flush()
        finally:
            try:
                # clear exception information to allow garbage collection of
                # objects referenced by the exception's traceback
                # (sys.exc_clear() only exists on Python 2)
                clear_exc = getattr(sys, 'exc_clear', None)
                if clear_exc is not None:
                    clear_exc()
                gc.collect()
            finally:
                # Must always be reached, otherwise the child would unwind
                # into the code of the parent that called us.
                os._exit(1)
    else:
        try:
            sys.stdout.flush()
            sys.stderr.flush()
        finally:
            os._exit(0)
def _check_for_subprocess_exception(temp_dir, pid):
    """
    Re-raise the exception a child process recorded, if there is one.

    Looks for <temp_dir>/debug/error-<pid>.  When the file is absent the
    function returns None.  Otherwise the pickled exception is loaded, the
    file is renamed to the first unused error-<pid>-<n> name, and the
    exception is raised in the caller.

    @param temp_dir: directory containing the 'debug' error directory.
    @param pid: pid of the child process to check.
    @raise: whatever exception object was stored in the error file.
    """
    ename = temp_dir + "/debug/error-%d" % pid
    if not os.path.exists(ename):
        return

    # NOTE: unpickling can execute arbitrary code.  This is acceptable only
    # because the file is expected to have been written by fork_start() in
    # one of our own children; never point this at untrusted data.
    error_file = open(ename, 'rb')
    try:
        e = pickle.load(error_file)
    finally:
        error_file.close()

    # rename the error-pid file so that they do not affect later child
    # processes that use recycled pids.
    i = 0
    pename = ename + ('-%d' % i)
    while os.path.exists(pename):
        i += 1
        pename = ename + ('-%d' % i)
    os.rename(ename, pename)
    raise e
def fork_waitfor(tmp, pid):
    """
    Block until the child pid exits and report its failure, if any.

    An exception recorded by the child under tmp is re-raised first;
    failing that, a non-zero wait status raises error.TestError.

    @param tmp: directory containing the 'debug' error directory.
    @param pid: pid handed to os.waitpid().
    """
    reaped_pid, wait_status = os.waitpid(pid, 0)

    # The exception file is keyed on the pid that waitpid() reported.
    _check_for_subprocess_exception(tmp, reaped_pid)

    if not wait_status:
        return
    raise error.TestError("Test subprocess failed rc=%d" % wait_status)
def fork_waitfor_timed(tmp, pid, timeout):
    """
    Waits for pid until it terminates or timeout expires.
    If timeout expires, test subprocess is killed.

    The child is polled right away and then every poll_time seconds; the
    last sleep is shortened so the total wait does not exceed timeout.

    @param tmp: directory containing the 'debug' error directory.
    @param pid: pid of the child process to wait for.
    @param timeout: maximum number of seconds to wait.
    @raise error.TestError: on timeout, or when the child ended with a
            non-zero wait status.  An exception recorded by the child
            under tmp is re-raised in preference to the latter.
    """
    timer_expired = True
    poll_time = 2
    time_passed = 0
    while True:
        # Poll before sleeping so an already finished child is noticed
        # without waiting a full poll interval.
        (child_pid, status) = os.waitpid(pid, os.WNOHANG)
        if (child_pid, status) != (0, 0):
            timer_expired = False
            break
        if time_passed >= timeout:
            break
        # Never sleep past the deadline.
        nap = min(poll_time, timeout - time_passed)
        time.sleep(nap)
        time_passed = time_passed + nap

    if timer_expired:
        logging.info('Timer expired (%d sec.), nuking pid %d', timeout, pid)
        utils.nuke_pid(pid)
        (child_pid, status) = os.waitpid(pid, 0)
        raise error.TestError("Test timeout expired, rc=%d" % (status))
    else:
        _check_for_subprocess_exception(tmp, pid)

    if status:
        raise error.TestError("Test subprocess failed rc=%d" % (status))
def fork_nuke_subprocess(tmp, pid):
    """
    Kill the child pid, then re-raise any exception it recorded under tmp.

    @param tmp: directory containing the 'debug' error directory.
    @param pid: pid of the child process to kill.
    """
    utils.nuke_pid(pid)
    _check_for_subprocess_exception(tmp, pid)