2 # A test file for the `multiprocessing` package
4 # Copyright (c) 2006-2008, R Oudkerk
8 import time
, sys
, random
9 from Queue
import Empty
11 import multiprocessing
# may get overwritten
16 def value_func(running
, mutex
):
18 time
.sleep(random
.random()*4)
21 print '\n\t\t\t' + str(multiprocessing
.current_process()) + ' has finished'
27 running
= multiprocessing
.Value('i', TASKS
)
28 mutex
= multiprocessing
.Lock()
30 for i
in range(TASKS
):
31 p
= multiprocessing
.Process(target
=value_func
, args
=(running
, mutex
))
34 while running
.value
> 0:
42 print 'No more running processes'
47 def queue_func(queue
):
49 time
.sleep(0.5 * random
.random())
54 q
= multiprocessing
.Queue()
56 p
= multiprocessing
.Process(target
=queue_func
, args
=(q
,))
62 o
= q
.get(timeout
=0.3)
73 def condition_func(cond
):
75 print '\t' + str(cond
)
77 print '\tchild is notifying'
78 print '\t' + str(cond
)
83 cond
= multiprocessing
.Condition()
85 p
= multiprocessing
.Process(target
=condition_func
, args
=(cond
,))
95 print 'main is waiting'
97 print 'main has woken up'
110 def semaphore_func(sema
, mutex
, running
):
115 print running
.value
, 'tasks are running'
119 time
.sleep(random
.random()*2)
123 print '%s has finished' % multiprocessing
.current_process()
128 def test_semaphore():
129 sema
= multiprocessing
.Semaphore(3)
130 mutex
= multiprocessing
.RLock()
131 running
= multiprocessing
.Value('i', 0)
134 multiprocessing
.Process(target
=semaphore_func
,
135 args
=(sema
, mutex
, running
))
146 #### TEST_JOIN_TIMEOUT
148 def join_timeout_func():
149 print '\tchild sleeping'
151 print '\n\tchild terminating'
153 def test_join_timeout():
154 p
= multiprocessing
.Process(target
=join_timeout_func
)
157 print 'waiting for process to finish'
169 def event_func(event
):
170 print '\t%r is waiting' % multiprocessing
.current_process()
172 print '\t%r has woken up' % multiprocessing
.current_process()
175 event
= multiprocessing
.Event()
177 processes
= [multiprocessing
.Process(target
=event_func
, args
=(event
,))
183 print 'main is sleeping'
186 print 'main is setting event'
193 #### TEST_SHAREDVALUES
195 def sharedvalues_func(values
, arrays
, shared_values
, shared_arrays
):
196 for i
in range(len(values
)):
198 sv
= shared_values
[i
].value
201 for i
in range(len(values
)):
203 sa
= list(shared_arrays
[i
][:])
208 def test_sharedvalues():
216 ('d', [0.25 * i
for i
in range(100)]),
220 shared_values
= [multiprocessing
.Value(id, v
) for id, v
in values
]
221 shared_arrays
= [multiprocessing
.Array(id, a
) for id, a
in arrays
]
223 p
= multiprocessing
.Process(
224 target
=sharedvalues_func
,
225 args
=(values
, arrays
, shared_values
, shared_arrays
)
230 assert p
.exitcode
== 0
235 def test(namespace
=multiprocessing
):
236 global multiprocessing
238 multiprocessing
= namespace
240 for func
in [ test_value
, test_queue
, test_condition
,
241 test_semaphore
, test_join_timeout
, test_event
,
244 print '\n\t######## %s\n' % func
.__name
__
247 ignore
= multiprocessing
.active_children() # cleanup any old processes
248 if hasattr(multiprocessing
, '_debug_info'):
249 info
= multiprocessing
._debug
_info
()
252 raise ValueError, 'there should be no positive refcounts left'
255 if __name__
== '__main__':
256 multiprocessing
.freeze_support()
258 assert len(sys
.argv
) in (1, 2)
260 if len(sys
.argv
) == 1 or sys
.argv
[1] == 'processes':
261 print ' Using processes '.center(79, '-')
262 namespace
= multiprocessing
263 elif sys
.argv
[1] == 'manager':
264 print ' Using processes and a manager '.center(79, '-')
265 namespace
= multiprocessing
.Manager()
266 namespace
.Process
= multiprocessing
.Process
267 namespace
.current_process
= multiprocessing
.current_process
268 namespace
.active_children
= multiprocessing
.active_children
269 elif sys
.argv
[1] == 'threads':
270 print ' Using threads '.center(79, '-')
271 import multiprocessing
.dummy
as namespace
273 print 'Usage:\n\t%s [processes | manager | threads]' % sys
.argv
[0]