2 from test
import test_support
3 from contextlib
import closing
10 import sys
, os
, time
, errno
12 if sys
.platform
[:3] in ('win', 'os2') or sys
.platform
== 'riscos':
13 raise unittest
.SkipTest("Can't test signal on %s" % \
17 class HandlerBCalled(Exception):
21 def exit_subprocess():
22 """Use os._exit(0) to exit the current subprocess.
24 Otherwise, the test catches the SystemExit and continues executing
25 in parallel with the original test, so you wind up with an
26 exponential number of tests running concurrently.
31 def ignoring_eintr(__func
, *args
, **kwargs
):
33 return __func(*args
, **kwargs
)
34 except EnvironmentError as e
:
35 if e
.errno
!= errno
.EINTR
:
40 class InterProcessSignalTests(unittest
.TestCase
):
41 MAX_DURATION
= 20 # Entire test should last at most 20 sec.
44 self
.using_gc
= gc
.isenabled()
51 def format_frame(self
, frame
, limit
=None):
52 return ''.join(traceback
.format_stack(frame
, limit
=limit
))
54 def handlerA(self
, signum
, frame
):
56 if test_support
.verbose
:
57 print "handlerA invoked from signal %s at:\n%s" % (
58 signum
, self
.format_frame(frame
, limit
=1))
60 def handlerB(self
, signum
, frame
):
62 if test_support
.verbose
:
63 print "handlerB invoked from signal %s at:\n%s" % (
64 signum
, self
.format_frame(frame
, limit
=1))
65 raise HandlerBCalled(signum
, self
.format_frame(frame
))
67 def wait(self
, child
):
68 """Wait for child to finish, ignoring EINTR."""
74 if e
.errno
!= errno
.EINTR
:
78 # Install handlers. This function runs in a sub-process, so we
79 # don't worry about re-setting the default handlers.
80 signal
.signal(signal
.SIGHUP
, self
.handlerA
)
81 signal
.signal(signal
.SIGUSR1
, self
.handlerB
)
82 signal
.signal(signal
.SIGUSR2
, signal
.SIG_IGN
)
83 signal
.signal(signal
.SIGALRM
, signal
.default_int_handler
)
85 # Variables the signals will modify:
89 # Let the sub-processes know who to send signals to.
91 if test_support
.verbose
:
92 print "test runner's pid is", pid
94 child
= ignoring_eintr(subprocess
.Popen
, ['kill', '-HUP', str(pid
)])
98 time
.sleep(1) # Give the signal time to be delivered.
99 self
.assertTrue(self
.a_called
)
100 self
.assertFalse(self
.b_called
)
101 self
.a_called
= False
103 # Make sure the signal isn't delivered while the previous
104 # Popen object is being destroyed, because __del__ swallows
108 child
= subprocess
.Popen(['kill', '-USR1', str(pid
)])
109 # This wait should be interrupted by the signal's exception.
111 time
.sleep(1) # Give the signal time to be delivered.
112 self
.fail('HandlerBCalled exception not thrown')
113 except HandlerBCalled
:
114 self
.assertTrue(self
.b_called
)
115 self
.assertFalse(self
.a_called
)
116 if test_support
.verbose
:
117 print "HandlerBCalled exception caught"
119 child
= ignoring_eintr(subprocess
.Popen
, ['kill', '-USR2', str(pid
)])
121 self
.wait(child
) # Nothing should happen.
125 # The race condition in pause doesn't matter in this case,
126 # since alarm is going to raise a KeyboardException, which
127 # will skip the call.
129 # But if another signal arrives before the alarm, pause
132 except KeyboardInterrupt:
133 if test_support
.verbose
:
134 print "KeyboardInterrupt (the alarm() went off)"
136 self
.fail("Some other exception woke us from pause: %s" %
137 traceback
.format_exc())
139 self
.fail("pause returned of its own accord, and the signal"
140 " didn't arrive after another second.")
142 # Issue 3864. Unknown if this affects earlier versions of freebsd also.
143 @unittest.skipIf(sys
.platform
=='freebsd6',
144 'inter process signals not reliable (do not mix well with threading) '
147 # This function spawns a child process to insulate the main
148 # test-running process from all the signals. It then
149 # communicates with that child process over a pipe and
150 # re-raises information about any exceptions the child
151 # throws. The real work happens in self.run_test().
152 os_done_r
, os_done_w
= os
.pipe()
153 with
closing(os
.fdopen(os_done_r
)) as done_r
, \
154 closing(os
.fdopen(os_done_w
, 'w')) as done_w
:
157 # In the child process; run the test and report results
161 # Have to close done_w again here because
162 # exit_subprocess() will skip the enclosing with block.
163 with
closing(done_w
):
167 pickle
.dump(traceback
.format_exc(), done_w
)
169 pickle
.dump(None, done_w
)
171 print 'Uh oh, raised from pickle.'
172 traceback
.print_exc()
177 # Block for up to MAX_DURATION seconds for the test to finish.
178 r
, w
, x
= select
.select([done_r
], [], [], self
.MAX_DURATION
)
180 tb
= pickle
.load(done_r
)
184 os
.kill(child
, signal
.SIGKILL
)
185 self
.fail('Test deadlocked after %d seconds.' %
189 class BasicSignalTests(unittest
.TestCase
):
190 def trivial_signal_handler(self
, *args
):
193 def test_out_of_range_signal_number_raises_error(self
):
194 self
.assertRaises(ValueError, signal
.getsignal
, 4242)
196 self
.assertRaises(ValueError, signal
.signal
, 4242,
197 self
.trivial_signal_handler
)
199 def test_setting_signal_handler_to_none_raises_error(self
):
200 self
.assertRaises(TypeError, signal
.signal
,
201 signal
.SIGUSR1
, None)
203 def test_getsignal(self
):
204 hup
= signal
.signal(signal
.SIGHUP
, self
.trivial_signal_handler
)
205 self
.assertEquals(signal
.getsignal(signal
.SIGHUP
),
206 self
.trivial_signal_handler
)
207 signal
.signal(signal
.SIGHUP
, hup
)
208 self
.assertEquals(signal
.getsignal(signal
.SIGHUP
), hup
)
211 class WakeupSignalTests(unittest
.TestCase
):
215 def test_wakeup_fd_early(self
):
219 before_time
= time
.time()
220 # We attempt to get a signal during the sleep,
221 # before select is called
222 time
.sleep(self
.TIMEOUT_FULL
)
223 mid_time
= time
.time()
224 self
.assertTrue(mid_time
- before_time
< self
.TIMEOUT_HALF
)
225 select
.select([self
.read
], [], [], self
.TIMEOUT_FULL
)
226 after_time
= time
.time()
227 self
.assertTrue(after_time
- mid_time
< self
.TIMEOUT_HALF
)
229 def test_wakeup_fd_during(self
):
233 before_time
= time
.time()
234 # We attempt to get a signal during the select call
235 self
.assertRaises(select
.error
, select
.select
,
236 [self
.read
], [], [], self
.TIMEOUT_FULL
)
237 after_time
= time
.time()
238 self
.assertTrue(after_time
- before_time
< self
.TIMEOUT_HALF
)
243 self
.alrm
= signal
.signal(signal
.SIGALRM
, lambda x
,y
:None)
244 self
.read
, self
.write
= os
.pipe()
245 flags
= fcntl
.fcntl(self
.write
, fcntl
.F_GETFL
, 0)
246 flags
= flags | os
.O_NONBLOCK
247 fcntl
.fcntl(self
.write
, fcntl
.F_SETFL
, flags
)
248 self
.old_wakeup
= signal
.set_wakeup_fd(self
.write
)
251 signal
.set_wakeup_fd(self
.old_wakeup
)
254 signal
.signal(signal
.SIGALRM
, self
.alrm
)
256 class SiginterruptTest(unittest
.TestCase
):
257 signum
= signal
.SIGUSR1
260 """Install a no-op signal handler that can be set to allow
261 interrupts or not, and arrange for the original signal handler to be
262 re-installed when the test is finished.
264 oldhandler
= signal
.signal(self
.signum
, lambda x
,y
: None)
265 self
.addCleanup(signal
.signal
, self
.signum
, oldhandler
)
267 def readpipe_interrupted(self
):
268 """Perform a read during which a signal will arrive. Return True if the
269 read is interrupted by the signal and raises an exception. Return False
270 if it returns normally.
272 # Create a pipe that can be used for the read. Also clean it up
273 # when the test is over, since nothing else will (but see below for
276 self
.addCleanup(os
.close
, r
)
278 # Create another process which can send a signal to this one to try
279 # to interrupt the read.
284 # Child code: sleep to give the parent enough time to enter the
285 # read() call (there's a race here, but it's really tricky to
286 # eliminate it); then signal the parent process. Also, sleep
287 # again to make it likely that the signal is delivered to the
288 # parent process before the child exits. If the child exits
289 # first, the write end of the pipe will be closed and the test
293 os
.kill(ppid
, self
.signum
)
296 # No matter what, just exit as fast as possible now.
300 # Make sure the child is eventually reaped, else it'll be a
301 # zombie for the rest of the test suite run.
302 self
.addCleanup(os
.waitpid
, pid
, 0)
304 # Close the write end of the pipe. The child has a copy, so
305 # it's not really closed until the child exits. We need it to
306 # close when the child exits so that in the non-interrupt case
307 # the read eventually completes, otherwise we could just close
308 # it *after* the test.
311 # Try the read and report whether it is interrupted or not to
317 if err
.errno
!= errno
.EINTR
:
321 def test_without_siginterrupt(self
):
322 """If a signal handler is installed and siginterrupt is not called
323 at all, when that signal arrives, it interrupts a syscall that's in
326 i
= self
.readpipe_interrupted()
328 # Arrival of the signal shouldn't have changed anything.
329 i
= self
.readpipe_interrupted()
332 def test_siginterrupt_on(self
):
333 """If a signal handler is installed and siginterrupt is called with
334 a true value for the second argument, when that signal arrives, it
335 interrupts a syscall that's in progress.
337 signal
.siginterrupt(self
.signum
, 1)
338 i
= self
.readpipe_interrupted()
340 # Arrival of the signal shouldn't have changed anything.
341 i
= self
.readpipe_interrupted()
344 def test_siginterrupt_off(self
):
345 """If a signal handler is installed and siginterrupt is called with
346 a false value for the second argument, when that signal arrives, it
347 does not interrupt a syscall that's in progress.
349 signal
.siginterrupt(self
.signum
, 0)
350 i
= self
.readpipe_interrupted()
352 # Arrival of the signal shouldn't have changed anything.
353 i
= self
.readpipe_interrupted()
358 class ItimerTest(unittest
.TestCase
):
360 self
.hndl_called
= False
363 self
.old_alarm
= signal
.signal(signal
.SIGALRM
, self
.sig_alrm
)
366 signal
.signal(signal
.SIGALRM
, self
.old_alarm
)
367 if self
.itimer
is not None: # test_itimer_exc doesn't change this attr
368 # just ensure that itimer is stopped
369 signal
.setitimer(self
.itimer
, 0)
371 def sig_alrm(self
, *args
):
372 self
.hndl_called
= True
373 if test_support
.verbose
:
374 print("SIGALRM handler invoked", args
)
376 def sig_vtalrm(self
, *args
):
377 self
.hndl_called
= True
379 if self
.hndl_count
> 3:
380 # it shouldn't be here, because it should have been disabled.
381 raise signal
.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
383 elif self
.hndl_count
== 3:
384 # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
385 signal
.setitimer(signal
.ITIMER_VIRTUAL
, 0)
386 if test_support
.verbose
:
387 print("last SIGVTALRM handler call")
391 if test_support
.verbose
:
392 print("SIGVTALRM handler invoked", args
)
394 def sig_prof(self
, *args
):
395 self
.hndl_called
= True
396 signal
.setitimer(signal
.ITIMER_PROF
, 0)
398 if test_support
.verbose
:
399 print("SIGPROF handler invoked", args
)
401 def test_itimer_exc(self
):
402 # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
404 self
.assertRaises(signal
.ItimerError
, signal
.setitimer
, -1, 0)
405 # Negative times are treated as zero on some platforms.
407 self
.assertRaises(signal
.ItimerError
,
408 signal
.setitimer
, signal
.ITIMER_REAL
, -1)
410 def test_itimer_real(self
):
411 self
.itimer
= signal
.ITIMER_REAL
412 signal
.setitimer(self
.itimer
, 1.0)
413 if test_support
.verbose
:
414 print("\ncall pause()...")
417 self
.assertEqual(self
.hndl_called
, True)
419 # Issue 3864. Unknown if this affects earlier versions of freebsd also.
420 @unittest.skipIf(sys
.platform
=='freebsd6',
421 'itimer not reliable (does not mix well with threading) on freebsd6')
422 def test_itimer_virtual(self
):
423 self
.itimer
= signal
.ITIMER_VIRTUAL
424 signal
.signal(signal
.SIGVTALRM
, self
.sig_vtalrm
)
425 signal
.setitimer(self
.itimer
, 0.3, 0.2)
427 start_time
= time
.time()
428 while time
.time() - start_time
< 60.0:
429 # use up some virtual time by doing real work
430 _
= pow(12345, 67890, 10000019)
431 if signal
.getitimer(self
.itimer
) == (0.0, 0.0):
432 break # sig_vtalrm handler stopped this itimer
434 self
.skipTest("timeout: likely cause: machine too slow or load too "
437 # virtual itimer should be (0.0, 0.0) now
438 self
.assertEquals(signal
.getitimer(self
.itimer
), (0.0, 0.0))
439 # and the handler should have been called
440 self
.assertEquals(self
.hndl_called
, True)
442 # Issue 3864. Unknown if this affects earlier versions of freebsd also.
443 @unittest.skipIf(sys
.platform
=='freebsd6',
444 'itimer not reliable (does not mix well with threading) on freebsd6')
445 def test_itimer_prof(self
):
446 self
.itimer
= signal
.ITIMER_PROF
447 signal
.signal(signal
.SIGPROF
, self
.sig_prof
)
448 signal
.setitimer(self
.itimer
, 0.2, 0.2)
450 start_time
= time
.time()
451 while time
.time() - start_time
< 60.0:
453 _
= pow(12345, 67890, 10000019)
454 if signal
.getitimer(self
.itimer
) == (0.0, 0.0):
455 break # sig_prof handler stopped this itimer
457 self
.skipTest("timeout: likely cause: machine too slow or load too "
460 # profiling itimer should be (0.0, 0.0) now
461 self
.assertEquals(signal
.getitimer(self
.itimer
), (0.0, 0.0))
462 # and the handler should have been called
463 self
.assertEqual(self
.hndl_called
, True)
466 test_support
.run_unittest(BasicSignalTests
, InterProcessSignalTests
,
467 WakeupSignalTests
, SiginterruptTest
, ItimerTest
)
470 if __name__
== "__main__":