#1153769: document PEP 237 changes to string formatting.
# [python.git] / Lib / test / test_signal.py
# blob 64c9cdaa01e874546523bd494fa8a8c5b2724ca3
import unittest
from test import test_support
from contextlib import closing, nested
import gc
import pickle
import select
import signal
import subprocess
import traceback
import sys, os, time, errno
# These tests cannot run on the platforms below; skip the whole module.
if sys.platform == 'riscos' or sys.platform[:3] in ('win', 'os2'):
    raise test_support.TestSkipped("Can't test signal on %s" % sys.platform)
class HandlerBCalled(Exception):
    """Raised by InterProcessSignalTests.handlerB when its signal arrives."""
def exit_subprocess():
    """Leave the current subprocess immediately through os._exit(0).

    An ordinary exit raises SystemExit, which the test catches; the child
    would then keep executing in parallel with the original test, and the
    number of tests running concurrently would grow exponentially.
    """
    os._exit(0)
def ignoring_eintr(__func, *args, **kwargs):
    """Call __func(*args, **kwargs), turning an EINTR failure into None.

    Returns whatever the call returns.  An EnvironmentError whose errno is
    EINTR is swallowed and None is returned instead; any other
    EnvironmentError propagates unchanged.  The call is not retried.
    """
    try:
        result = __func(*args, **kwargs)
    except EnvironmentError as e:
        if e.errno == errno.EINTR:
            return None
        raise
    return result
class InterProcessSignalTests(unittest.TestCase):
    """Exercise signal delivery between processes.

    test_main() forks a child that runs run_test() and reports the outcome
    to the parent through a pipe, so the signals never reach the main
    test-running process.
    """

    MAX_DURATION = 20   # Entire test should last at most 20 sec.

    def setUp(self):
        # Remember the collector state so tearDown() can restore it.
        self.using_gc = gc.isenabled()
        gc.disable()

    def tearDown(self):
        if self.using_gc:
            gc.enable()

    def format_frame(self, frame, limit=None):
        """Return the formatted stack of `frame` joined into one string."""
        return ''.join(traceback.format_stack(frame, limit=limit))

    def handlerA(self, signum, frame):
        """Signal handler that only records that it was called."""
        self.a_called = True
        if test_support.verbose:
            print("handlerA invoked from signal %s at:\n%s" % (
                signum, self.format_frame(frame, limit=1)))

    def handlerB(self, signum, frame):
        """Signal handler that records the call, then raises HandlerBCalled."""
        self.b_called = True
        if test_support.verbose:
            print("handlerB invoked from signal %s at:\n%s" % (
                signum, self.format_frame(frame, limit=1)))
        raise HandlerBCalled(signum, self.format_frame(frame))

    def wait(self, child):
        """Wait for child to finish, ignoring EINTR."""
        while True:
            try:
                child.wait()
                return
            except OSError as e:
                if e.errno != errno.EINTR:
                    raise

    def run_test(self):
        """Send HUP, USR1, USR2 and ALRM to this process and check each effect.

        Meant to run in the child forked by test_main().
        """
        # Install handlers. This function runs in a sub-process, so we
        # don't worry about re-setting the default handlers.
        signal.signal(signal.SIGHUP, self.handlerA)
        signal.signal(signal.SIGUSR1, self.handlerB)
        signal.signal(signal.SIGUSR2, signal.SIG_IGN)
        signal.signal(signal.SIGALRM, signal.default_int_handler)

        # Variables the signals will modify:
        self.a_called = False
        self.b_called = False

        # Let the sub-processes know who to send signals to.
        pid = os.getpid()
        if test_support.verbose:
            # Same output as the two-argument print statement form.
            print("%s %s" % ("test runner's pid is", pid))

        child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
        if child:
            self.wait(child)
            if not self.a_called:
                time.sleep(1)  # Give the signal time to be delivered.
        self.assertTrue(self.a_called)
        self.assertFalse(self.b_called)
        self.a_called = False

        # Make sure the signal isn't delivered while the previous
        # Popen object is being destroyed, because __del__ swallows
        # exceptions.
        del child
        try:
            child = subprocess.Popen(['kill', '-USR1', str(pid)])
            # This wait should be interrupted by the signal's exception.
            self.wait(child)
            time.sleep(1)  # Give the signal time to be delivered.
            self.fail('HandlerBCalled exception not thrown')
        except HandlerBCalled:
            self.assertTrue(self.b_called)
            self.assertFalse(self.a_called)
            if test_support.verbose:
                print("HandlerBCalled exception caught")

        child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
        if child:
            self.wait(child)  # Nothing should happen.

        try:
            signal.alarm(1)
            # The race condition in pause doesn't matter in this case,
            # since alarm is going to raise a KeyboardInterrupt, which
            # will skip the call.
            signal.pause()
            # But if another signal arrives before the alarm, pause
            # may return early.
            time.sleep(1)
        except KeyboardInterrupt:
            if test_support.verbose:
                print("KeyboardInterrupt (the alarm() went off)")
        except:
            # Deliberately broad: anything else that woke us is a failure.
            self.fail("Some other exception woke us from pause: %s" %
                      traceback.format_exc())
        else:
            self.fail("pause returned of its own accord, and the signal"
                      " didn't arrive after another second.")

    def test_main(self):
        # This function spawns a child process to insulate the main
        # test-running process from all the signals. It then
        # communicates with that child process over a pipe and
        # re-raises information about any exceptions the child
        # throws. The real work happens in self.run_test().
        os_done_r, os_done_w = os.pipe()
        with nested(closing(os.fdopen(os_done_r)),
                    closing(os.fdopen(os_done_w, 'w'))) as (done_r, done_w):
            child = os.fork()
            if child == 0:
                # In the child process; run the test and report results
                # through the pipe.
                try:
                    done_r.close()
                    # Have to close done_w again here because
                    # exit_subprocess() will skip the enclosing with block.
                    with closing(done_w):
                        try:
                            self.run_test()
                        except:
                            # Deliberately broad: every failure is shipped
                            # to the parent as a formatted traceback.
                            pickle.dump(traceback.format_exc(), done_w)
                        else:
                            pickle.dump(None, done_w)
                except:
                    print('Uh oh, raised from pickle.')
                    traceback.print_exc()
                finally:
                    exit_subprocess()

            done_w.close()
            try:
                # Block for up to MAX_DURATION seconds for the test to finish.
                r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
                if done_r in r:
                    # The pickle comes from our own forked child.
                    tb = pickle.load(done_r)
                    if tb:
                        self.fail(tb)
                else:
                    os.kill(child, signal.SIGKILL)
                    self.fail('Test deadlocked after %d seconds.' %
                              self.MAX_DURATION)
            finally:
                # Reap the child so it does not linger as a zombie.
                ignoring_eintr(os.waitpid, child, 0)
class BasicSignalTests(unittest.TestCase):
    """Argument checking and handler bookkeeping of signal.signal/getsignal."""

    def trivial_signal_handler(self, *args):
        """Do nothing; used only as a valid handler object."""
        pass

    def test_out_of_range_signal_number_raises_error(self):
        self.assertRaises(ValueError, signal.getsignal, 4242)

        self.assertRaises(ValueError, signal.signal, 4242,
                          self.trivial_signal_handler)

    def test_setting_signal_handler_to_none_raises_error(self):
        self.assertRaises(TypeError, signal.signal,
                          signal.SIGUSR1, None)

    def test_getsignal(self):
        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
        try:
            self.assertEqual(signal.getsignal(signal.SIGHUP),
                             self.trivial_signal_handler)
        finally:
            # Put the previous handler back even if the check above fails,
            # so the test handler never leaks into later tests.
            signal.signal(signal.SIGHUP, hup)
        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
class WakeupSignalTests(unittest.TestCase):
    """Timing checks for select() on a pipe set up with signal.set_wakeup_fd().

    Each test arms a one-second alarm and asserts that the blocking calls
    return in less than TIMEOUT_HALF seconds, although they were asked to
    block for TIMEOUT_FULL seconds.
    """

    TIMEOUT_FULL = 10
    TIMEOUT_HALF = 5

    def setUp(self):
        import fcntl

        self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
        self.read, self.write = os.pipe()
        # Switch the write end to non-blocking mode before registering it
        # as the wakeup fd.
        flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
        flags = flags | os.O_NONBLOCK
        fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
        self.old_wakeup = signal.set_wakeup_fd(self.write)

    def tearDown(self):
        signal.set_wakeup_fd(self.old_wakeup)
        os.close(self.read)
        os.close(self.write)
        signal.signal(signal.SIGALRM, self.alrm)

    def test_wakeup_fd_early(self):
        signal.alarm(1)
        before_time = time.time()
        # We attempt to get a signal during the sleep,
        # before select is called
        time.sleep(self.TIMEOUT_FULL)
        mid_time = time.time()
        self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
        select.select([self.read], [], [], self.TIMEOUT_FULL)
        after_time = time.time()
        self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)

    def test_wakeup_fd_during(self):
        signal.alarm(1)
        before_time = time.time()
        # We attempt to get a signal during the select call
        self.assertRaises(select.error, select.select,
                          [self.read], [], [], self.TIMEOUT_FULL)
        after_time = time.time()
        self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
class SiginterruptTest(unittest.TestCase):
    """Check how signal.siginterrupt() affects a blocking os.read()."""

    signum = signal.SIGUSR1

    def readpipe_interrupted(self, cb):
        """Report whether a blocking pipe read is interrupted by a signal.

        Forks a child that sends self.signum to this process and then
        exits.  `cb` is called, in both processes, right after a no-op
        handler for self.signum has been installed.

        Returns True if os.read() fails with EINTR, False if it returns
        normally.  Any other OSError propagates.
        """
        r, w = os.pipe()
        ppid = os.getpid()
        pid = os.fork()

        oldhandler = signal.signal(self.signum, lambda x,y: None)
        cb()
        if pid == 0:
            # child code: sleep, kill, sleep. and then exit,
            # which closes the pipe from which the parent process reads
            try:
                time.sleep(0.2)
                os.kill(ppid, self.signum)
                time.sleep(0.2)
            finally:
                exit_subprocess()

        try:
            os.close(w)

            try:
                os.read(r, 1)
                return False
            except OSError as err:
                if err.errno != errno.EINTR:
                    raise
                return True
        finally:
            signal.signal(self.signum, oldhandler)
            # Release the read end too; it used to stay open in the parent.
            os.close(r)
            os.waitpid(pid, 0)

    def test_without_siginterrupt(self):
        i = self.readpipe_interrupted(lambda: None)
        self.assertEqual(i, True)

    def test_siginterrupt_on(self):
        i = self.readpipe_interrupted(
            lambda: signal.siginterrupt(self.signum, 1))
        self.assertEqual(i, True)

    def test_siginterrupt_off(self):
        i = self.readpipe_interrupted(
            lambda: signal.siginterrupt(self.signum, 0))
        self.assertEqual(i, False)
class ItimerTest(unittest.TestCase):
    """Tests for signal.setitimer() and signal.getitimer()."""

    def setUp(self):
        self.hndl_called = False
        self.hndl_count = 0
        self.itimer = None
        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)

    def tearDown(self):
        # Stop the timer before restoring the previous SIGALRM handler, so
        # a late expiry cannot be delivered to the restored handler.
        if self.itimer is not None: # test_itimer_exc doesn't change this attr
            signal.setitimer(self.itimer, 0)
        signal.signal(signal.SIGALRM, self.old_alarm)

    def sig_alrm(self, *args):
        """SIGALRM handler: record that it was called."""
        self.hndl_called = True
        if test_support.verbose:
            print("SIGALRM handler invoked", args)

    def sig_vtalrm(self, *args):
        """SIGVTALRM handler: count calls and disable the timer on the 4th."""
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)
            if test_support.verbose:
                print("last SIGVTALRM handler call")

        self.hndl_count += 1

        if test_support.verbose:
            print("SIGVTALRM handler invoked", args)

    def sig_prof(self, *args):
        """SIGPROF handler: record the call and disable ITIMER_PROF."""
        self.hndl_called = True
        signal.setitimer(signal.ITIMER_PROF, 0)

        if test_support.verbose:
            print("SIGPROF handler invoked", args)

    def test_itimer_exc(self):
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # Negative times are treated as zero on some platforms.
        if 0:
            self.assertRaises(signal.ItimerError,
                              signal.setitimer, signal.ITIMER_REAL, -1)

    def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        if test_support.verbose:
            print("\ncall pause()...")
        signal.pause()

        self.assertEqual(self.hndl_called, True)

    def test_itimer_virtual(self):
        self.itimer = signal.ITIMER_VIRTUAL
        signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
        signal.setitimer(self.itimer, 0.3, 0.2)

        # Busy loop: the handlers fire while this loop is running.
        for i in xrange(100000000):
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_vtalrm handler stopped this itimer

        # virtual itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)

    def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        signal.signal(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        # Busy loop: the handler fires while this loop is running.
        for i in xrange(100000000):
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_prof handler stopped this itimer

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)
def test_main():
    """Run this module's test case classes via test_support.run_unittest."""
    test_classes = (BasicSignalTests, InterProcessSignalTests,
                    WakeupSignalTests, SiginterruptTest, ItimerTest)
    test_support.run_unittest(*test_classes)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    test_main()