1 # Run the _testcapi module tests (tests for the Python/C API): by defn,
2 # these are all functions _testcapi exports whose name begins with 'test_'.
4 from __future__
import with_statement
9 from test
import test_support
16 @unittest.skipUnless(threading
, 'Threading required for this test.')
17 class TestPendingCalls(unittest
.TestCase
):
19 def pendingcalls_submit(self
, l
, n
):
21 #this function can be interrupted by thread switching so let's
22 #use an atomic operation
26 time
.sleep(random
.random()*0.02) #0.01 secs on average
27 #try submitting callback until successful.
28 #rely on regular interrupt to flush queue if we are
31 if _testcapi
._pending
_threadfunc
(callback
):
34 def pendingcalls_wait(self
, l
, n
, context
= None):
35 #now, stick around until l[0] has grown to 10
38 #this busy loop is where we expect to be interrupted to
39 #run our callbacks. Note that callbacks are only run on the
41 if False and test_support
.verbose
:
42 print "(%i)"%(len(l
),),
43 for i
in xrange(1000):
45 if context
and not context
.event
.is_set():
48 self
.assertTrue(count
< 10000,
49 "timeout waiting for %i callbacks, got %i"%(n
, len(l
)))
50 if False and test_support
.verbose
:
51 print "(%i)"%(len(l
),)
53 def test_pendingcalls_threaded(self
):
54 #do every callback on a separate thread
55 n
= 32 #total callbacks
57 class foo(object):pass
60 context
.n
= 2 #submits per thread
61 context
.nThreads
= n
// context
.n
63 context
.lock
= threading
.Lock()
64 context
.event
= threading
.Event()
66 for i
in range(context
.nThreads
):
67 t
= threading
.Thread(target
=self
.pendingcalls_thread
, args
= (context
,))
71 self
.pendingcalls_wait(context
.l
, n
, context
)
76 def pendingcalls_thread(self
, context
):
78 self
.pendingcalls_submit(context
.l
, context
.n
)
81 context
.nFinished
+= 1
82 nFinished
= context
.nFinished
83 if False and test_support
.verbose
:
84 print "finished threads: ", nFinished
85 if nFinished
== context
.nThreads
:
88 def test_pendingcalls_non_threaded(self
):
89 #again, just using the main thread, likely they will all be dispathced at
90 #once. It is ok to ask for too many, because we loop until we find a slot.
91 #the loop can be interrupted to dispatch.
92 #there are only 32 dispatch slots, so we go for twice that!
95 self
.pendingcalls_submit(l
, n
)
96 self
.pendingcalls_wait(l
, n
)
101 for name
in dir(_testcapi
):
102 if name
.startswith('test_'):
103 test
= getattr(_testcapi
, name
)
104 if test_support
.verbose
:
105 print "internal", name
108 except _testcapi
.error
:
109 raise test_support
.TestFailed
, sys
.exc_info()[1]
111 # some extra thread-state tests driven via _testcapi
112 def TestThreadState():
113 if test_support
.verbose
:
114 print "auto-thread-state"
119 idents
.append(thread
.get_ident())
121 _testcapi
._test
_thread
_state
(callback
)
124 # Check our main thread is in the list exactly 3 times.
125 if idents
.count(thread
.get_ident()) != 3:
126 raise test_support
.TestFailed
, \
127 "Couldn't find main thread correctly in the list"
133 t
=threading
.Thread(target
=TestThreadState
)
137 test_support
.run_unittest(TestPendingCalls
)
139 if __name__
== "__main__":