move sections
[python/dscho.git] / Lib / test / test_signal.py
blobc9de8a45815cafc53c2c71f531755293f156ea44
1 import unittest
2 from test import test_support
3 from contextlib import closing
4 import gc
5 import pickle
6 import select
7 import signal
8 import subprocess
9 import traceback
10 import sys, os, time, errno
# The signal tests below cannot run on Windows, OS/2 or RISC OS, so skip
# the whole module on those platforms.
if sys.platform.startswith(('win', 'os2')) or sys.platform == 'riscos':
    raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
class HandlerBCalled(Exception):
    """Raised by InterProcessSignalTests.handlerB when its signal arrives."""
def exit_subprocess():
    """Leave the current subprocess immediately through os._exit(0).

    A regular exit raises SystemExit, which the test would catch; the
    subprocess would then carry on executing in parallel with the original
    test, and the number of tests running concurrently would grow
    exponentially.
    """
    success_status = 0
    os._exit(success_status)
def ignoring_eintr(__func, *args, **kwargs):
    """Call ``__func(*args, **kwargs)`` and return its result.

    An EnvironmentError whose errno is EINTR is swallowed and None is
    returned instead; any other EnvironmentError propagates unchanged.
    """
    try:
        result = __func(*args, **kwargs)
    except EnvironmentError as exc:
        if exc.errno == errno.EINTR:
            return None
        raise
    return result
class InterProcessSignalTests(unittest.TestCase):
    """Exercise delivery of signals sent by other processes.

    test_main() forks a child that runs run_test(), so the main
    test-running process is insulated from the signals; the child reports
    its outcome back over a pipe.
    """

    MAX_DURATION = 20   # Entire test should last at most 20 sec.

    def setUp(self):
        # Remember whether the collector was enabled so tearDown can put
        # it back the way it was.
        self.using_gc = gc.isenabled()
        gc.disable()

    def tearDown(self):
        if self.using_gc:
            gc.enable()

    def format_frame(self, frame, limit=None):
        """Return the formatted stack of `frame` joined into one string."""
        return ''.join(traceback.format_stack(frame, limit=limit))

    def handlerA(self, signum, frame):
        """Signal handler that records the call and returns normally."""
        self.a_called = True
        if test_support.verbose:
            print("handlerA invoked from signal %s at:\n%s" % (
                signum, self.format_frame(frame, limit=1)))

    def handlerB(self, signum, frame):
        """Signal handler that records the call and raises HandlerBCalled."""
        self.b_called = True
        if test_support.verbose:
            print("handlerB invoked from signal %s at:\n%s" % (
                signum, self.format_frame(frame, limit=1)))
        raise HandlerBCalled(signum, self.format_frame(frame))

    def wait(self, child):
        """Wait for child to finish, ignoring EINTR."""
        while True:
            try:
                child.wait()
                return
            except OSError as e:
                if e.errno != errno.EINTR:
                    raise

    def run_test(self):
        """Install the handlers and check each signal's effect in turn."""
        # Install handlers. This function runs in a sub-process, so we
        # don't worry about re-setting the default handlers.
        signal.signal(signal.SIGHUP, self.handlerA)
        signal.signal(signal.SIGUSR1, self.handlerB)
        signal.signal(signal.SIGUSR2, signal.SIG_IGN)
        signal.signal(signal.SIGALRM, signal.default_int_handler)

        # Variables the signals will modify:
        self.a_called = False
        self.b_called = False

        # Let the sub-processes know who to send signals to.
        pid = os.getpid()
        if test_support.verbose:
            print("test runner's pid is %s" % pid)

        child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
        if child:
            self.wait(child)
            if not self.a_called:
                time.sleep(1)  # Give the signal time to be delivered.
        self.assertTrue(self.a_called)
        self.assertFalse(self.b_called)
        self.a_called = False

        # Make sure the signal isn't delivered while the previous
        # Popen object is being destroyed, because __del__ swallows
        # exceptions.
        del child
        try:
            child = subprocess.Popen(['kill', '-USR1', str(pid)])
            # This wait should be interrupted by the signal's exception.
            self.wait(child)
            time.sleep(1)  # Give the signal time to be delivered.
            self.fail('HandlerBCalled exception not thrown')
        except HandlerBCalled:
            self.assertTrue(self.b_called)
            self.assertFalse(self.a_called)
            if test_support.verbose:
                print("HandlerBCalled exception caught")

        child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
        if child:
            self.wait(child)  # Nothing should happen.

        try:
            signal.alarm(1)
            # The race condition in pause doesn't matter in this case,
            # since alarm is going to raise a KeyboardInterrupt, which
            # will skip the call.
            signal.pause()
            # But if another signal arrives before the alarm, pause
            # may return early.
            time.sleep(1)
        except KeyboardInterrupt:
            if test_support.verbose:
                print("KeyboardInterrupt (the alarm() went off)")
        except:
            # Deliberately bare: whatever else woke us up is a failure and
            # gets reported with its traceback.
            self.fail("Some other exception woke us from pause: %s" %
                      traceback.format_exc())
        else:
            self.fail("pause returned of its own accord, and the signal"
                      " didn't arrive after another second.")

    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
    @unittest.skipIf(sys.platform=='freebsd6',
        'inter process signals not reliable (do not mix well with threading) '
        'on freebsd6')
    def test_main(self):
        # This function spawns a child process to insulate the main
        # test-running process from all the signals. It then
        # communicates with that child process over a pipe and
        # re-raises information about any exceptions the child
        # throws. The real work happens in self.run_test().
        os_done_r, os_done_w = os.pipe()
        with closing(os.fdopen(os_done_r)) as done_r, \
             closing(os.fdopen(os_done_w, 'w')) as done_w:
            child = os.fork()
            if child == 0:
                # In the child process; run the test and report results
                # through the pipe.
                try:
                    done_r.close()
                    # Have to close done_w again here because
                    # exit_subprocess() will skip the enclosing with block.
                    with closing(done_w):
                        try:
                            self.run_test()
                        except:
                            # Bare on purpose: any failure in the child is
                            # shipped to the parent as traceback text.
                            pickle.dump(traceback.format_exc(), done_w)
                        else:
                            pickle.dump(None, done_w)
                except:
                    print('Uh oh, raised from pickle.')
                    traceback.print_exc()
                finally:
                    exit_subprocess()

            # Parent only from here on.  Make sure the child is eventually
            # reaped, else it'll be a zombie for the rest of the test
            # suite run.
            self.addCleanup(ignoring_eintr, os.waitpid, child, 0)

            done_w.close()
            # Block for up to MAX_DURATION seconds for the test to finish.
            r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
            if done_r in r:
                tb = pickle.load(done_r)
                if tb:
                    self.fail(tb)
            else:
                os.kill(child, signal.SIGKILL)
                self.fail('Test deadlocked after %d seconds.' %
                          self.MAX_DURATION)
class BasicSignalTests(unittest.TestCase):
    """Argument validation and handler bookkeeping of the signal module."""

    def trivial_signal_handler(self, *args):
        """Do-nothing handler used as a recognisable callable."""
        pass

    def test_out_of_range_signal_number_raises_error(self):
        # 4242 is used as a signal number that is out of range.
        self.assertRaises(ValueError, signal.getsignal, 4242)

        self.assertRaises(ValueError, signal.signal, 4242,
                          self.trivial_signal_handler)

    def test_setting_signal_handler_to_none_raises_error(self):
        self.assertRaises(TypeError, signal.signal,
                          signal.SIGUSR1, None)

    def test_getsignal(self):
        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
        try:
            self.assertEqual(signal.getsignal(signal.SIGHUP),
                             self.trivial_signal_handler)
        finally:
            # Put the previous handler back even when the check above
            # fails, so later tests do not inherit the trivial handler.
            signal.signal(signal.SIGHUP, hup)
        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
class WakeupSignalTests(unittest.TestCase):
    """Check how a SIGALRM affects sleep() and select() with a wakeup fd set."""

    TIMEOUT_FULL = 10
    TIMEOUT_HALF = 5

    def setUp(self):
        import fcntl

        # Swap in a no-op SIGALRM handler, remembering the previous one.
        self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
        self.read, self.write = os.pipe()
        # Switch the write end of the pipe to non-blocking mode before
        # registering it as the wakeup fd.
        current_flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
        fcntl.fcntl(self.write, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
        self.old_wakeup = signal.set_wakeup_fd(self.write)

    def tearDown(self):
        signal.set_wakeup_fd(self.old_wakeup)
        os.close(self.read)
        os.close(self.write)
        signal.signal(signal.SIGALRM, self.alrm)

    def test_wakeup_fd_early(self):
        signal.alarm(1)
        before_time = time.time()
        # We attempt to get a signal during the sleep,
        # before select is called
        time.sleep(self.TIMEOUT_FULL)
        mid_time = time.time()
        slept_for = mid_time - before_time
        self.assertTrue(slept_for < self.TIMEOUT_HALF)
        select.select([self.read], [], [], self.TIMEOUT_FULL)
        after_time = time.time()
        selected_for = after_time - mid_time
        self.assertTrue(selected_for < self.TIMEOUT_HALF)

    def test_wakeup_fd_during(self):
        signal.alarm(1)
        before_time = time.time()
        # We attempt to get a signal during the select call
        self.assertRaises(select.error, select.select,
                          [self.read], [], [], self.TIMEOUT_FULL)
        elapsed = time.time() - before_time
        self.assertTrue(elapsed < self.TIMEOUT_HALF)
class SiginterruptTest(unittest.TestCase):
    """Check whether a signal interrupts a blocking read, depending on
    how (and whether) signal.siginterrupt() was called."""

    signum = signal.SIGUSR1

    def setUp(self):
        """Install a no-op signal handler that can be set to allow
        interrupts or not, and arrange for the original signal handler to be
        re-installed when the test is finished.
        """
        oldhandler = signal.signal(self.signum, lambda x,y: None)
        self.addCleanup(signal.signal, self.signum, oldhandler)

    def readpipe_interrupted(self):
        """Perform a read during which a signal will arrive.  Return True if the
        read is interrupted by the signal and raises an exception.  Return False
        if it returns normally.
        """
        # Create a pipe that can be used for the read.  Also clean it up
        # when the test is over, since nothing else will (but see below for
        # the write end).
        r, w = os.pipe()
        self.addCleanup(os.close, r)

        # Create another process which can send a signal to this one to try
        # to interrupt the read.
        ppid = os.getpid()
        pid = os.fork()

        if pid == 0:
            # Child code: sleep to give the parent enough time to enter the
            # read() call (there's a race here, but it's really tricky to
            # eliminate it); then signal the parent process.  Also, sleep
            # again to make it likely that the signal is delivered to the
            # parent process before the child exits.  If the child exits
            # first, the write end of the pipe will be closed and the test
            # is invalid.
            try:
                time.sleep(0.2)
                os.kill(ppid, self.signum)
                time.sleep(0.2)
            finally:
                # No matter what, just exit as fast as possible now.
                exit_subprocess()
        else:
            # Parent code.
            # Make sure the child is eventually reaped, else it'll be a
            # zombie for the rest of the test suite run.
            self.addCleanup(os.waitpid, pid, 0)

            # Close the write end of the pipe.  The child has a copy, so
            # it's not really closed until the child exits.  We need it to
            # close when the child exits so that in the non-interrupt case
            # the read eventually completes, otherwise we could just close
            # it *after* the test.
            os.close(w)

            # Try the read and report whether it is interrupted or not to
            # the caller.  The data read is irrelevant, only the outcome.
            try:
                os.read(r, 1)
                return False
            except OSError as err:
                if err.errno != errno.EINTR:
                    raise
                return True

    def test_without_siginterrupt(self):
        """If a signal handler is installed and siginterrupt is not called
        at all, when that signal arrives, it interrupts a syscall that's in
        progress.
        """
        i = self.readpipe_interrupted()
        self.assertTrue(i)
        # Arrival of the signal shouldn't have changed anything.
        i = self.readpipe_interrupted()
        self.assertTrue(i)

    def test_siginterrupt_on(self):
        """If a signal handler is installed and siginterrupt is called with
        a true value for the second argument, when that signal arrives, it
        interrupts a syscall that's in progress.
        """
        signal.siginterrupt(self.signum, 1)
        i = self.readpipe_interrupted()
        self.assertTrue(i)
        # Arrival of the signal shouldn't have changed anything.
        i = self.readpipe_interrupted()
        self.assertTrue(i)

    def test_siginterrupt_off(self):
        """If a signal handler is installed and siginterrupt is called with
        a false value for the second argument, when that signal arrives, it
        does not interrupt a syscall that's in progress.
        """
        signal.siginterrupt(self.signum, 0)
        i = self.readpipe_interrupted()
        self.assertFalse(i)
        # Arrival of the signal shouldn't have changed anything.
        i = self.readpipe_interrupted()
        self.assertFalse(i)
class ItimerTest(unittest.TestCase):
    """Tests for signal.setitimer() / signal.getitimer()."""

    def setUp(self):
        self.hndl_called = False    # set by every handler below
        self.hndl_count = 0         # number of completed sig_vtalrm calls
        self.itimer = None          # which timer the test armed, if any
        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)

    def tearDown(self):
        # Stop the timer before restoring the previous SIGALRM handler, so
        # a still-armed timer cannot fire into the restored handler.
        if self.itimer is not None: # test_itimer_exc doesn't change this attr
            # just ensure that itimer is stopped
            signal.setitimer(self.itimer, 0)
        signal.signal(signal.SIGALRM, self.old_alarm)

    def sig_alrm(self, *args):
        """SIGALRM handler: record that it was called."""
        self.hndl_called = True
        if test_support.verbose:
            print("SIGALRM handler invoked %r" % (args,))

    def sig_vtalrm(self, *args):
        """SIGVTALRM handler: disable the virtual timer on the call made
        when hndl_count is 3, and raise ItimerError if called after that."""
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)
            if test_support.verbose:
                print("last SIGVTALRM handler call")

        self.hndl_count += 1

        if test_support.verbose:
            print("SIGVTALRM handler invoked %r" % (args,))

    def sig_prof(self, *args):
        """SIGPROF handler: record the call and disable the profiling timer."""
        self.hndl_called = True
        signal.setitimer(signal.ITIMER_PROF, 0)

        if test_support.verbose:
            print("SIGPROF handler invoked %r" % (args,))

    def test_itimer_exc(self):
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # Negative times are treated as zero on some platforms, so this
        # check is deliberately kept disabled.
        if 0:
            self.assertRaises(signal.ItimerError,
                              signal.setitimer, signal.ITIMER_REAL, -1)

    def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        if test_support.verbose:
            print("\ncall pause()...")
        signal.pause()

        self.assertEqual(self.hndl_called, True)

    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
    @unittest.skipIf(sys.platform=='freebsd6',
        'itimer not reliable (does not mix well with threading) on freebsd6')
    def test_itimer_virtual(self):
        self.itimer = signal.ITIMER_VIRTUAL
        signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
        signal.setitimer(self.itimer, 0.3, 0.2)

        start_time = time.time()
        while time.time() - start_time < 60.0:
            # use up some virtual time by doing real work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_vtalrm handler stopped this itimer
        else: # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # virtual itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)

    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
    @unittest.skipIf(sys.platform=='freebsd6',
        'itimer not reliable (does not mix well with threading) on freebsd6')
    def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        signal.signal(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        start_time = time.time()
        while time.time() - start_time < 60.0:
            # do some work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_prof handler stopped this itimer
        else: # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)
def test_main():
    """Hand the signal test case classes to test_support.run_unittest()."""
    test_cases = (
        BasicSignalTests,
        InterProcessSignalTests,
        WakeupSignalTests,
        SiginterruptTest,
        ItimerTest,
    )
    test_support.run_unittest(*test_cases)
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()