1 # Run the _testcapi module tests (tests for the Python/C API): by definition,
2 # these are all functions _testcapi exports whose name begins with 'test_'.
4 from __future__
import with_statement
10 from test
import test_support
13 class TestPendingCalls(unittest
.TestCase
):
15 def pendingcalls_submit(self
, l
, n
):
17 #this function can be interrupted by thread switching so let's
18 #use an atomic operation
22 time
.sleep(random
.random()*0.02) #0.01 secs on average
23 #try submitting callback until successful.
24 #rely on regular interrupt to flush queue if we are
27 if _testcapi
._pending
_threadfunc
(callback
):
30 def pendingcalls_wait(self
, l
, n
, context
= None):
31 #now, stick around until l has grown to n entries (one per callback)
34 #this busy loop is where we expect to be interrupted to
35 #run our callbacks. Note that callbacks are only run on the
37 if False and test_support
.verbose
:
38 print "(%i)"%(len(l
),),
39 for i
in xrange(1000):
41 if context
and not context
.event
.is_set():
44 self
.assertTrue(count
< 10000,
45 "timeout waiting for %i callbacks, got %i"%(n
, len(l
)))
46 if False and test_support
.verbose
:
47 print "(%i)"%(len(l
),)
49 def test_pendingcalls_threaded(self
):
51 #do every callback on a separate thread
52 n
= 32 #total callbacks
54 class foo(object):pass
57 context
.n
= 2 #submits per thread
58 context
.nThreads
= n
/ context
.n
60 context
.lock
= threading
.Lock()
61 context
.event
= threading
.Event()
63 for i
in range(context
.nThreads
):
64 t
= threading
.Thread(target
=self
.pendingcalls_thread
, args
= (context
,))
68 self
.pendingcalls_wait(context
.l
, n
, context
)
73 def pendingcalls_thread(self
, context
):
75 self
.pendingcalls_submit(context
.l
, context
.n
)
78 context
.nFinished
+= 1
79 nFinished
= context
.nFinished
80 if False and test_support
.verbose
:
81 print "finished threads: ", nFinished
82 if nFinished
== context
.nThreads
:
85 def test_pendingcalls_non_threaded(self
):
86 #again, just using the main thread, likely they will all be dispatched at
87 #once. It is ok to ask for too many, because we loop until we find a slot.
88 #the loop can be interrupted to dispatch.
89 #there are only 32 dispatch slots, so we go for twice that!
92 self
.pendingcalls_submit(l
, n
)
93 self
.pendingcalls_wait(l
, n
)
98 for name
in dir(_testcapi
):
99 if name
.startswith('test_'):
100 test
= getattr(_testcapi
, name
)
101 if test_support
.verbose
:
102 print "internal", name
105 except _testcapi
.error
:
106 raise test_support
.TestFailed
, sys
.exc_info()[1]
108 # some extra thread-state tests driven via _testcapi
109 def TestThreadState():
110 if test_support
.verbose
:
111 print "auto-thread-state"
116 idents
.append(thread
.get_ident())
118 _testcapi
._test
_thread
_state
(callback
)
121 # Check our main thread is in the list exactly 3 times.
122 if idents
.count(thread
.get_ident()) != 3:
123 raise test_support
.TestFailed
, \
124 "Couldn't find main thread correctly in the list"
127 _testcapi
._test
_thread
_state
128 have_thread_state
= True
129 except AttributeError:
130 have_thread_state
= False
132 if have_thread_state
:
137 t
=threading
.Thread(target
=TestThreadState
)
141 test_support
.run_unittest(TestPendingCalls
)
143 if __name__
== "__main__":