Silence some py3k warnings claiming to affect _pyio
[python.git] / Lib / test / test_capi.py
blob4dc3104aeea905677573b2ae73e110757491052d
1 # Run the _testcapi module tests (tests for the Python/C API): by defn,
2 # these are all functions _testcapi exports whose name begins with 'test_'.
4 from __future__ import with_statement
5 import sys
6 import time
7 import random
8 import unittest
9 import threading
10 from test import test_support
11 import _testcapi
13 class TestPendingCalls(unittest.TestCase):
15 def pendingcalls_submit(self, l, n):
16 def callback():
17 #this function can be interrupted by thread switching so let's
18 #use an atomic operation
19 l.append(None)
21 for i in range(n):
22 time.sleep(random.random()*0.02) #0.01 secs on average
23 #try submitting callback until successful.
24 #rely on regular interrupt to flush queue if we are
25 #unsuccessful.
26 while True:
27 if _testcapi._pending_threadfunc(callback):
28 break;
30 def pendingcalls_wait(self, l, n, context = None):
31 #now, stick around until l[0] has grown to 10
32 count = 0;
33 while len(l) != n:
34 #this busy loop is where we expect to be interrupted to
35 #run our callbacks. Note that callbacks are only run on the
36 #main thread
37 if False and test_support.verbose:
38 print "(%i)"%(len(l),),
39 for i in xrange(1000):
40 a = i*i
41 if context and not context.event.is_set():
42 continue
43 count += 1
44 self.assertTrue(count < 10000,
45 "timeout waiting for %i callbacks, got %i"%(n, len(l)))
46 if False and test_support.verbose:
47 print "(%i)"%(len(l),)
49 def test_pendingcalls_threaded(self):
51 #do every callback on a separate thread
52 n = 32 #total callbacks
53 threads = []
54 class foo(object):pass
55 context = foo()
56 context.l = []
57 context.n = 2 #submits per thread
58 context.nThreads = n / context.n
59 context.nFinished = 0
60 context.lock = threading.Lock()
61 context.event = threading.Event()
63 for i in range(context.nThreads):
64 t = threading.Thread(target=self.pendingcalls_thread, args = (context,))
65 t.start()
66 threads.append(t)
68 self.pendingcalls_wait(context.l, n, context)
70 for t in threads:
71 t.join()
73 def pendingcalls_thread(self, context):
74 try:
75 self.pendingcalls_submit(context.l, context.n)
76 finally:
77 with context.lock:
78 context.nFinished += 1
79 nFinished = context.nFinished
80 if False and test_support.verbose:
81 print "finished threads: ", nFinished
82 if nFinished == context.nThreads:
83 context.event.set()
85 def test_pendingcalls_non_threaded(self):
86 #again, just using the main thread, likely they will all be dispathced at
87 #once. It is ok to ask for too many, because we loop until we find a slot.
88 #the loop can be interrupted to dispatch.
89 #there are only 32 dispatch slots, so we go for twice that!
90 l = []
91 n = 64
92 self.pendingcalls_submit(l, n)
93 self.pendingcalls_wait(l, n)
96 def test_main():
98 for name in dir(_testcapi):
99 if name.startswith('test_'):
100 test = getattr(_testcapi, name)
101 if test_support.verbose:
102 print "internal", name
103 try:
104 test()
105 except _testcapi.error:
106 raise test_support.TestFailed, sys.exc_info()[1]
108 # some extra thread-state tests driven via _testcapi
109 def TestThreadState():
110 if test_support.verbose:
111 print "auto-thread-state"
113 idents = []
115 def callback():
116 idents.append(thread.get_ident())
118 _testcapi._test_thread_state(callback)
119 a = b = callback
120 time.sleep(1)
121 # Check our main thread is in the list exactly 3 times.
122 if idents.count(thread.get_ident()) != 3:
123 raise test_support.TestFailed, \
124 "Couldn't find main thread correctly in the list"
126 try:
127 _testcapi._test_thread_state
128 have_thread_state = True
129 except AttributeError:
130 have_thread_state = False
132 if have_thread_state:
133 import thread
134 import time
135 TestThreadState()
136 import threading
137 t=threading.Thread(target=TestThreadState)
138 t.start()
139 t.join()
141 test_support.run_unittest(TestPendingCalls)
143 if __name__ == "__main__":
144 test_main()