Issue #7042: Use a better mechanism for testing timers in test_signal.
[python.git] / Lib / test / test_signal.py
blobc4b8e4eb55cc2a1b03d6b7b847fc50d4af4fd284
1 import unittest
2 from test import test_support
3 from contextlib import closing
4 import gc
5 import pickle
6 import select
7 import signal
8 import subprocess
9 import traceback
10 import sys, os, time, errno
# Skip the whole module on Windows, OS/2 and RISC OS.
if sys.platform.startswith(('win', 'os2')) or sys.platform == 'riscos':
    raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
class HandlerBCalled(Exception):
    """Raised by ``InterProcessSignalTests.handlerB`` when it is invoked.

    The handler passes the signal number and a formatted stack trace as
    the exception arguments.
    """
def exit_subprocess():
    """Terminate the current (forked) process immediately via os._exit(0).

    A regular exit raises SystemExit, which the test machinery catches;
    the child would then keep running the remaining tests in parallel
    with its parent, and the number of concurrently running test
    processes would grow exponentially.
    """
    os._exit(0)
def ignoring_eintr(__func, *args, **kwargs):
    """Call ``__func(*args, **kwargs)`` and return its result.

    An EnvironmentError whose errno is EINTR is swallowed and None is
    returned instead; the call is not retried.  Any other exception
    propagates to the caller unchanged.
    """
    try:
        result = __func(*args, **kwargs)
    except EnvironmentError as err:
        if err.errno == errno.EINTR:
            return None
        raise
    return result
class InterProcessSignalTests(unittest.TestCase):
    """Check delivery of signals sent to this process by other processes.

    The actual checks live in run_test(), which test_main() executes in a
    forked child so that the signals never reach the main test runner.
    """

    MAX_DURATION = 20   # Entire test should last at most 20 sec.

    def setUp(self):
        # Remember whether the collector was on so tearDown can restore it.
        # NOTE(review): presumably gc is disabled so that objects are not
        # finalized at unpredictable points while signals are in flight
        # (see the __del__ remark in run_test) -- confirm.
        self.using_gc = gc.isenabled()
        gc.disable()

    def tearDown(self):
        if self.using_gc:
            gc.enable()

    def format_frame(self, frame, limit=None):
        """Return the stack of *frame* as one string (at most *limit* entries)."""
        return ''.join(traceback.format_stack(frame, limit=limit))

    def handlerA(self, signum, frame):
        """Signal handler that only records that it was called."""
        self.a_called = True
        if test_support.verbose:
            print("handlerA invoked from signal %s at:\n%s" % (
                signum, self.format_frame(frame, limit=1)))

    def handlerB(self, signum, frame):
        """Signal handler that records the call and raises HandlerBCalled."""
        self.b_called = True
        if test_support.verbose:
            print("handlerB invoked from signal %s at:\n%s" % (
                signum, self.format_frame(frame, limit=1)))
        raise HandlerBCalled(signum, self.format_frame(frame))

    def wait(self, child):
        """Wait for child to finish, ignoring EINTR."""
        while True:
            try:
                child.wait()
                return
            except OSError as e:
                if e.errno != errno.EINTR:
                    raise

    def run_test(self):
        """Send HUP, USR1, USR2 and ALRM to this process and check each result."""
        # Install handlers. This function runs in a sub-process, so we
        # don't worry about re-setting the default handlers.
        signal.signal(signal.SIGHUP, self.handlerA)
        signal.signal(signal.SIGUSR1, self.handlerB)
        signal.signal(signal.SIGUSR2, signal.SIG_IGN)
        signal.signal(signal.SIGALRM, signal.default_int_handler)

        # Variables the signals will modify:
        self.a_called = False
        self.b_called = False

        # Let the sub-processes know who to send signals to.
        pid = os.getpid()
        if test_support.verbose:
            print("%s %s" % ("test runner's pid is", pid))

        # SIGHUP: handlerA must run, handlerB must not.
        child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
        if child:
            self.wait(child)
            if not self.a_called:
                time.sleep(1)  # Give the signal time to be delivered.
        self.assertTrue(self.a_called)
        self.assertFalse(self.b_called)
        self.a_called = False

        # Make sure the signal isn't delivered while the previous
        # Popen object is being destroyed, because __del__ swallows
        # exceptions.
        del child
        try:
            child = subprocess.Popen(['kill', '-USR1', str(pid)])
            # This wait should be interrupted by the signal's exception.
            self.wait(child)
            time.sleep(1)  # Give the signal time to be delivered.
            self.fail('HandlerBCalled exception not thrown')
        except HandlerBCalled:
            self.assertTrue(self.b_called)
            self.assertFalse(self.a_called)
            if test_support.verbose:
                print("HandlerBCalled exception caught")

        # SIGUSR2 is ignored.
        child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
        if child:
            self.wait(child)  # Nothing should happen.

        try:
            signal.alarm(1)
            # The race condition in pause doesn't matter in this case,
            # since alarm is going to raise a KeyboardInterrupt, which
            # will skip the call.
            signal.pause()
            # But if another signal arrives before the alarm, pause
            # may return early.
            time.sleep(1)
        except KeyboardInterrupt:
            if test_support.verbose:
                print("KeyboardInterrupt (the alarm() went off)")
        except:
            # Deliberately broad: anything else that wakes us is a failure
            # and is reported with its traceback.
            self.fail("Some other exception woke us from pause: %s" %
                      traceback.format_exc())
        else:
            self.fail("pause returned of its own accord, and the signal"
                      " didn't arrive after another second.")

    def test_main(self):
        # This function spawns a child process to insulate the main
        # test-running process from all the signals. It then
        # communicates with that child process over a pipe and
        # re-raises information about any exceptions the child
        # throws. The real work happens in self.run_test().
        os_done_r, os_done_w = os.pipe()
        with closing(os.fdopen(os_done_r)) as done_r, \
             closing(os.fdopen(os_done_w, 'w')) as done_w:
            child = os.fork()
            if child == 0:
                # In the child process; run the test and report results
                # through the pipe.
                try:
                    done_r.close()
                    # Have to close done_w again here because
                    # exit_subprocess() will skip the enclosing with block.
                    with closing(done_w):
                        try:
                            self.run_test()
                        except:
                            # Deliberately broad: every failure, including
                            # KeyboardInterrupt, must be reported to the
                            # parent rather than escape in the child.
                            pickle.dump(traceback.format_exc(), done_w)
                        else:
                            pickle.dump(None, done_w)
                except:
                    print('Uh oh, raised from pickle.')
                    traceback.print_exc()
                finally:
                    exit_subprocess()

            # Only the parent gets here.
            done_w.close()
            try:
                # Block for up to MAX_DURATION seconds for the test to
                # finish.
                r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
                if done_r in r:
                    tb = pickle.load(done_r)
                    if tb:
                        self.fail(tb)
                else:
                    os.kill(child, signal.SIGKILL)
                    self.fail('Test deadlocked after %d seconds.' %
                              self.MAX_DURATION)
            finally:
                # Read the exit status so the forked child is reaped
                # instead of being left behind as a zombie process.
                ignoring_eintr(os.waitpid, child, 0)
class BasicSignalTests(unittest.TestCase):
    """Argument validation and handler bookkeeping of the signal module."""

    def trivial_signal_handler(self, *args):
        """Do-nothing handler used as a recognizable handler object."""
        pass

    def test_out_of_range_signal_number_raises_error(self):
        self.assertRaises(ValueError, signal.getsignal, 4242)

        self.assertRaises(ValueError, signal.signal, 4242,
                          self.trivial_signal_handler)

    def test_setting_signal_handler_to_none_raises_error(self):
        self.assertRaises(TypeError, signal.signal,
                          signal.SIGUSR1, None)

    def test_getsignal(self):
        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
        try:
            self.assertEqual(signal.getsignal(signal.SIGHUP),
                             self.trivial_signal_handler)
        finally:
            # Put the previous handler back even when the check above
            # fails, so a failure here cannot affect later tests.
            signal.signal(signal.SIGHUP, hup)
        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
207 class WakeupSignalTests(unittest.TestCase):
208 TIMEOUT_FULL = 10
209 TIMEOUT_HALF = 5
211 def test_wakeup_fd_early(self):
212 import select
214 signal.alarm(1)
215 before_time = time.time()
216 # We attempt to get a signal during the sleep,
217 # before select is called
218 time.sleep(self.TIMEOUT_FULL)
219 mid_time = time.time()
220 self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
221 select.select([self.read], [], [], self.TIMEOUT_FULL)
222 after_time = time.time()
223 self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)
225 def test_wakeup_fd_during(self):
226 import select
228 signal.alarm(1)
229 before_time = time.time()
230 # We attempt to get a signal during the select call
231 self.assertRaises(select.error, select.select,
232 [self.read], [], [], self.TIMEOUT_FULL)
233 after_time = time.time()
234 self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
236 def setUp(self):
237 import fcntl
239 self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
240 self.read, self.write = os.pipe()
241 flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
242 flags = flags | os.O_NONBLOCK
243 fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
244 self.old_wakeup = signal.set_wakeup_fd(self.write)
246 def tearDown(self):
247 signal.set_wakeup_fd(self.old_wakeup)
248 os.close(self.read)
249 os.close(self.write)
250 signal.signal(signal.SIGALRM, self.alrm)
class SiginterruptTest(unittest.TestCase):
    """Check how signal.siginterrupt() affects a blocking os.read()."""

    signum = signal.SIGUSR1

    def readpipe_interrupted(self, cb):
        """Do a blocking pipe read during which a signal arrives.

        A child is forked that sends ``self.signum`` to this process and
        then exits, which closes the pipe the parent is reading from.
        *cb* is called with no arguments after the handler is installed;
        because that happens after the fork, it runs in both processes.

        Returns True if the read failed with EINTR and False if it
        returned normally.  Any other OSError propagates.
        """
        r, w = os.pipe()
        ppid = os.getpid()
        pid = os.fork()

        oldhandler = signal.signal(self.signum, lambda x,y: None)
        cb()
        if pid == 0:
            # child code: sleep, kill, sleep. and then exit,
            # which closes the pipe from which the parent process reads
            try:
                time.sleep(0.2)
                os.kill(ppid, self.signum)
                time.sleep(0.2)
            finally:
                exit_subprocess()

        try:
            os.close(w)

            try:
                os.read(r, 1)
                return False
            except OSError as err:
                if err.errno != errno.EINTR:
                    raise
                return True
        finally:
            try:
                signal.signal(self.signum, oldhandler)
                os.waitpid(pid, 0)
            finally:
                # Nothing else closes the read end; without this every
                # call leaks one file descriptor.
                os.close(r)

    def test_without_siginterrupt(self):
        i = self.readpipe_interrupted(lambda: None)
        self.assertEqual(i, True)

    def test_siginterrupt_on(self):
        i = self.readpipe_interrupted(
            lambda: signal.siginterrupt(self.signum, 1))
        self.assertEqual(i, True)

    def test_siginterrupt_off(self):
        i = self.readpipe_interrupted(
            lambda: signal.siginterrupt(self.signum, 0))
        self.assertEqual(i, False)
class ItimerTest(unittest.TestCase):
    """Tests for signal.setitimer() / signal.getitimer()."""

    def setUp(self):
        self.hndl_called = False
        self.hndl_count = 0
        self.itimer = None
        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)

    def tearDown(self):
        # Stop the timer before touching the handler: if the previous
        # SIGALRM handler were restored while the timer could still fire,
        # the signal would be handled by whatever was installed before.
        if self.itimer is not None: # test_itimer_exc doesn't change this attr
            # just ensure that itimer is stopped
            signal.setitimer(self.itimer, 0)
        signal.signal(signal.SIGALRM, self.old_alarm)

    def _install_handler(self, signum, handler):
        """Install *handler* for *signum*; restore the old handler on cleanup.

        Cleanups run after tearDown(), i.e. after the timer was stopped.
        """
        previous = signal.signal(signum, handler)
        # None means the previous handler was not installed from Python;
        # it cannot be passed back to signal.signal(), so leave it alone.
        if previous is not None:
            self.addCleanup(signal.signal, signum, previous)

    def sig_alrm(self, *args):
        self.hndl_called = True
        if test_support.verbose:
            print("SIGALRM handler invoked", args)

    def sig_vtalrm(self, *args):
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)
            if test_support.verbose:
                print("last SIGVTALRM handler call")

        self.hndl_count += 1

        if test_support.verbose:
            print("SIGVTALRM handler invoked", args)

    def sig_prof(self, *args):
        self.hndl_called = True
        # Disable the profiling timer on its first expiry.
        signal.setitimer(signal.ITIMER_PROF, 0)

        if test_support.verbose:
            print("SIGPROF handler invoked", args)

    def test_itimer_exc(self):
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # Negative times are treated as zero on some platforms.
        # The check below is therefore deliberately disabled.
        if 0:
            self.assertRaises(signal.ItimerError,
                              signal.setitimer, signal.ITIMER_REAL, -1)

    def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        if test_support.verbose:
            print("\ncall pause()...")
        signal.pause()

        self.assertEqual(self.hndl_called, True)

    def test_itimer_virtual(self):
        self.itimer = signal.ITIMER_VIRTUAL
        self._install_handler(signal.SIGVTALRM, self.sig_vtalrm)
        signal.setitimer(self.itimer, 0.3, 0.2)

        start_time = time.time()
        while time.time() - start_time < 5.0:
            # use up some virtual time by doing real work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_vtalrm handler stopped this itimer
        else:
            self.fail('timeout waiting for sig_vtalrm signal')

        # virtual itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)

    def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        self._install_handler(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        start_time = time.time()
        while time.time() - start_time < 5.0:
            # do some work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_prof handler stopped this itimer
        else:
            self.fail('timeout waiting for sig_prof signal')

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)
def test_main():
    """Run every test case class of this module through test_support."""
    test_cases = (
        BasicSignalTests,
        InterProcessSignalTests,
        WakeupSignalTests,
        SiginterruptTest,
        ItimerTest,
    )
    test_support.run_unittest(*test_cases)
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()