make file closing more robust
[python/dscho.git] / Lib / test / fork_wait.py
blob03a4d6f984e5f818207bba4cb3186afef7c36af7
1 """This test case provides support for checking forking and wait behavior.
3 To test different wait behavior, overrise the wait_impl method.
5 We want fork1() semantics -- only the forking thread survives in the
6 child after a fork().
8 On some systems (e.g. Solaris without posix threads) we find that all
9 active threads survive in the child after a fork(); this is an error.
10 """
12 import os, sys, time, _thread, unittest
14 LONGSLEEP = 2
15 SHORTSLEEP = 0.5
16 NUM_THREADS = 4
18 class ForkWait(unittest.TestCase):
20 def setUp(self):
21 self.alive = {}
22 self.stop = 0
24 def f(self, id):
25 while not self.stop:
26 self.alive[id] = os.getpid()
27 try:
28 time.sleep(SHORTSLEEP)
29 except IOError:
30 pass
32 def wait_impl(self, cpid):
33 for i in range(10):
34 # waitpid() shouldn't hang, but some of the buildbots seem to hang
35 # in the forking tests. This is an attempt to fix the problem.
36 spid, status = os.waitpid(cpid, os.WNOHANG)
37 if spid == cpid:
38 break
39 time.sleep(2 * SHORTSLEEP)
41 self.assertEquals(spid, cpid)
42 self.assertEquals(status, 0, "cause = %d, exit = %d" % (status&0xff, status>>8))
44 def test_wait(self):
45 for i in range(NUM_THREADS):
46 _thread.start_new(self.f, (i,))
48 time.sleep(LONGSLEEP)
50 a = sorted(self.alive.keys())
51 self.assertEquals(a, list(range(NUM_THREADS)))
53 prefork_lives = self.alive.copy()
55 if sys.platform in ['unixware7']:
56 cpid = os.fork1()
57 else:
58 cpid = os.fork()
60 if cpid == 0:
61 # Child
62 time.sleep(LONGSLEEP)
63 n = 0
64 for key in self.alive:
65 if self.alive[key] != prefork_lives[key]:
66 n += 1
67 os._exit(n)
68 else:
69 # Parent
70 self.wait_impl(cpid)
71 # Tell threads to die
72 self.stop = 1
73 time.sleep(2*SHORTSLEEP) # Wait for threads to die