2 from test
import test_support
3 from contextlib
import closing
10 import sys
, os
, time
, errno
12 if sys
.platform
[:3] in ('win', 'os2') or sys
.platform
== 'riscos':
13 raise unittest
.SkipTest("Can't test signal on %s" % \
17 class HandlerBCalled(Exception):
21 def exit_subprocess():
22 """Use os._exit(0) to exit the current subprocess.
24 Otherwise, the test catches the SystemExit and continues executing
25 in parallel with the original test, so you wind up with an
26 exponential number of tests running concurrently.
31 def ignoring_eintr(__func
, *args
, **kwargs
):
33 return __func(*args
, **kwargs
)
34 except EnvironmentError as e
:
35 if e
.errno
!= errno
.EINTR
:
40 class InterProcessSignalTests(unittest
.TestCase
):
41 MAX_DURATION
= 20 # Entire test should last at most 20 sec.
44 self
.using_gc
= gc
.isenabled()
def format_frame(self, frame, limit=None):
    """Return the call stack ending at *frame* as a single string.

    *limit* is handed straight to traceback.format_stack(); the
    formatted entries it returns are concatenated without a separator.
    """
    stack_entries = traceback.format_stack(frame, limit=limit)
    return ''.join(stack_entries)
54 def handlerA(self
, signum
, frame
):
56 if test_support
.verbose
:
57 print "handlerA invoked from signal %s at:\n%s" % (
58 signum
, self
.format_frame(frame
, limit
=1))
60 def handlerB(self
, signum
, frame
):
62 if test_support
.verbose
:
63 print "handlerB invoked from signal %s at:\n%s" % (
64 signum
, self
.format_frame(frame
, limit
=1))
65 raise HandlerBCalled(signum
, self
.format_frame(frame
))
67 def wait(self
, child
):
68 """Wait for child to finish, ignoring EINTR."""
74 if e
.errno
!= errno
.EINTR
:
78 # Install handlers. This function runs in a sub-process, so we
79 # don't worry about re-setting the default handlers.
80 signal
.signal(signal
.SIGHUP
, self
.handlerA
)
81 signal
.signal(signal
.SIGUSR1
, self
.handlerB
)
82 signal
.signal(signal
.SIGUSR2
, signal
.SIG_IGN
)
83 signal
.signal(signal
.SIGALRM
, signal
.default_int_handler
)
85 # Variables the signals will modify:
89 # Let the sub-processes know who to send signals to.
91 if test_support
.verbose
:
92 print "test runner's pid is", pid
94 child
= ignoring_eintr(subprocess
.Popen
, ['kill', '-HUP', str(pid
)])
98 time
.sleep(1) # Give the signal time to be delivered.
99 self
.assertTrue(self
.a_called
)
100 self
.assertFalse(self
.b_called
)
101 self
.a_called
= False
103 # Make sure the signal isn't delivered while the previous
104 # Popen object is being destroyed, because __del__ swallows
108 child
= subprocess
.Popen(['kill', '-USR1', str(pid
)])
109 # This wait should be interrupted by the signal's exception.
111 time
.sleep(1) # Give the signal time to be delivered.
112 self
.fail('HandlerBCalled exception not thrown')
113 except HandlerBCalled
:
114 self
.assertTrue(self
.b_called
)
115 self
.assertFalse(self
.a_called
)
116 if test_support
.verbose
:
117 print "HandlerBCalled exception caught"
119 child
= ignoring_eintr(subprocess
.Popen
, ['kill', '-USR2', str(pid
)])
121 self
.wait(child
) # Nothing should happen.
125 # The race condition in pause doesn't matter in this case,
126 # since alarm is going to raise a KeyboardInterrupt, which
127 # will skip the call.
129 # But if another signal arrives before the alarm, pause
132 except KeyboardInterrupt:
133 if test_support
.verbose
:
134 print "KeyboardInterrupt (the alarm() went off)"
136 self
.fail("Some other exception woke us from pause: %s" %
137 traceback
.format_exc())
139 self
.fail("pause returned of its own accord, and the signal"
140 " didn't arrive after another second.")
143 # This function spawns a child process to insulate the main
144 # test-running process from all the signals. It then
145 # communicates with that child process over a pipe and
146 # re-raises information about any exceptions the child
147 # throws. The real work happens in self.run_test().
148 os_done_r
, os_done_w
= os
.pipe()
149 with
closing(os
.fdopen(os_done_r
)) as done_r
, \
150 closing(os
.fdopen(os_done_w
, 'w')) as done_w
:
153 # In the child process; run the test and report results
157 # Have to close done_w again here because
158 # exit_subprocess() will skip the enclosing with block.
159 with
closing(done_w
):
163 pickle
.dump(traceback
.format_exc(), done_w
)
165 pickle
.dump(None, done_w
)
167 print 'Uh oh, raised from pickle.'
168 traceback
.print_exc()
173 # Block for up to MAX_DURATION seconds for the test to finish.
174 r
, w
, x
= select
.select([done_r
], [], [], self
.MAX_DURATION
)
176 tb
= pickle
.load(done_r
)
180 os
.kill(child
, signal
.SIGKILL
)
181 self
.fail('Test deadlocked after %d seconds.' %
185 class BasicSignalTests(unittest
.TestCase
):
186 def trivial_signal_handler(self
, *args
):
189 def test_out_of_range_signal_number_raises_error(self
):
190 self
.assertRaises(ValueError, signal
.getsignal
, 4242)
192 self
.assertRaises(ValueError, signal
.signal
, 4242,
193 self
.trivial_signal_handler
)
195 def test_setting_signal_handler_to_none_raises_error(self
):
196 self
.assertRaises(TypeError, signal
.signal
,
197 signal
.SIGUSR1
, None)
199 def test_getsignal(self
):
200 hup
= signal
.signal(signal
.SIGHUP
, self
.trivial_signal_handler
)
201 self
.assertEquals(signal
.getsignal(signal
.SIGHUP
),
202 self
.trivial_signal_handler
)
203 signal
.signal(signal
.SIGHUP
, hup
)
204 self
.assertEquals(signal
.getsignal(signal
.SIGHUP
), hup
)
207 class WakeupSignalTests(unittest
.TestCase
):
211 def test_wakeup_fd_early(self
):
215 before_time
= time
.time()
216 # We attempt to get a signal during the sleep,
217 # before select is called
218 time
.sleep(self
.TIMEOUT_FULL
)
219 mid_time
= time
.time()
220 self
.assertTrue(mid_time
- before_time
< self
.TIMEOUT_HALF
)
221 select
.select([self
.read
], [], [], self
.TIMEOUT_FULL
)
222 after_time
= time
.time()
223 self
.assertTrue(after_time
- mid_time
< self
.TIMEOUT_HALF
)
225 def test_wakeup_fd_during(self
):
229 before_time
= time
.time()
230 # We attempt to get a signal during the select call
231 self
.assertRaises(select
.error
, select
.select
,
232 [self
.read
], [], [], self
.TIMEOUT_FULL
)
233 after_time
= time
.time()
234 self
.assertTrue(after_time
- before_time
< self
.TIMEOUT_HALF
)
239 self
.alrm
= signal
.signal(signal
.SIGALRM
, lambda x
,y
:None)
240 self
.read
, self
.write
= os
.pipe()
241 flags
= fcntl
.fcntl(self
.write
, fcntl
.F_GETFL
, 0)
242 flags
= flags | os
.O_NONBLOCK
243 fcntl
.fcntl(self
.write
, fcntl
.F_SETFL
, flags
)
244 self
.old_wakeup
= signal
.set_wakeup_fd(self
.write
)
247 signal
.set_wakeup_fd(self
.old_wakeup
)
250 signal
.signal(signal
.SIGALRM
, self
.alrm
)
252 class SiginterruptTest(unittest
.TestCase
):
253 signum
= signal
.SIGUSR1
254 def readpipe_interrupted(self
, cb
):
259 oldhandler
= signal
.signal(self
.signum
, lambda x
,y
: None)
262 # child code: sleep, kill, sleep. and then exit,
263 # which closes the pipe from which the parent process reads
266 os
.kill(ppid
, self
.signum
)
278 if err
.errno
!= errno
.EINTR
:
282 signal
.signal(self
.signum
, oldhandler
)
285 def test_without_siginterrupt(self
):
286 i
=self
.readpipe_interrupted(lambda: None)
287 self
.assertEquals(i
, True)
289 def test_siginterrupt_on(self
):
290 i
=self
.readpipe_interrupted(lambda: signal
.siginterrupt(self
.signum
, 1))
291 self
.assertEquals(i
, True)
293 def test_siginterrupt_off(self
):
294 i
=self
.readpipe_interrupted(lambda: signal
.siginterrupt(self
.signum
, 0))
295 self
.assertEquals(i
, False)
297 class ItimerTest(unittest
.TestCase
):
299 self
.hndl_called
= False
302 self
.old_alarm
= signal
.signal(signal
.SIGALRM
, self
.sig_alrm
)
305 signal
.signal(signal
.SIGALRM
, self
.old_alarm
)
306 if self
.itimer
is not None: # test_itimer_exc doesn't change this attr
307 # just ensure that itimer is stopped
308 signal
.setitimer(self
.itimer
, 0)
def sig_alrm(self, *args):
    """SIGALRM handler: record the call and, in verbose mode, print *args*."""
    self.hndl_called = True
    if not test_support.verbose:
        return
    print("SIGALRM handler invoked", args)
315 def sig_vtalrm(self
, *args
):
316 self
.hndl_called
= True
318 if self
.hndl_count
> 3:
319 # it shouldn't be here, because it should have been disabled.
320 raise signal
.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
322 elif self
.hndl_count
== 3:
323 # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
324 signal
.setitimer(signal
.ITIMER_VIRTUAL
, 0)
325 if test_support
.verbose
:
326 print("last SIGVTALRM handler call")
330 if test_support
.verbose
:
331 print("SIGVTALRM handler invoked", args
)
def sig_prof(self, *args):
    """SIGPROF handler: record the call and stop the profiling timer.

    In verbose mode the handler arguments are printed as well.
    """
    self.hndl_called = True
    # A value of 0 stops ITIMER_PROF, so this handler is not re-entered.
    signal.setitimer(signal.ITIMER_PROF, 0)

    if not test_support.verbose:
        return
    print("SIGPROF handler invoked", args)
340 def test_itimer_exc(self
):
341 # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
343 self
.assertRaises(signal
.ItimerError
, signal
.setitimer
, -1, 0)
344 # Negative times are treated as zero on some platforms.
346 self
.assertRaises(signal
.ItimerError
,
347 signal
.setitimer
, signal
.ITIMER_REAL
, -1)
349 def test_itimer_real(self
):
350 self
.itimer
= signal
.ITIMER_REAL
351 signal
.setitimer(self
.itimer
, 1.0)
352 if test_support
.verbose
:
353 print("\ncall pause()...")
356 self
.assertEqual(self
.hndl_called
, True)
def test_itimer_virtual(self):
    """ITIMER_VIRTUAL delivers SIGVTALRM until the handler stops it.

    Installs self.sig_vtalrm for SIGVTALRM, arms the virtual timer
    (first expiry 0.3s, then every 0.2s) and burns CPU until
    getitimer() reports the timer as stopped.
    """
    self.itimer = signal.ITIMER_VIRTUAL
    signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
    signal.setitimer(self.itimer, 0.3, 0.2)

    for _ in xrange(100000000):
        # use up some virtual time by doing real work
        pow(12345, 67890, 10000019)
        if signal.getitimer(self.itimer) == (0.0, 0.0):
            break  # sig_vtalrm handler stopped this itimer

    # virtual itimer should be (0.0, 0.0) now
    self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
    # and the handler should have been called
    self.assertEqual(self.hndl_called, True)
def test_itimer_prof(self):
    """ITIMER_PROF delivers SIGPROF and the handler stops the timer.

    Installs self.sig_prof for SIGPROF, arms the profiling timer
    (first expiry 0.2s, then every 0.2s) and spins until getitimer()
    reports the timer as stopped.
    """
    self.itimer = signal.ITIMER_PROF
    signal.signal(signal.SIGPROF, self.sig_prof)
    signal.setitimer(self.itimer, 0.2, 0.2)

    for _ in xrange(100000000):
        if signal.getitimer(self.itimer) == (0.0, 0.0):
            break  # sig_prof handler stopped this itimer

    # profiling itimer should be (0.0, 0.0) now
    self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
    # and the handler should have been called
    self.assertEqual(self.hndl_called, True)
389 test_support
.run_unittest(BasicSignalTests
, InterProcessSignalTests
,
390 WakeupSignalTests
, SiginterruptTest
, ItimerTest
)
393 if __name__
== "__main__":