2 # A test of `multiprocessing.Pool` class
4 # Copyright (c) 2006-2008, R Oudkerk
14 # Functions used by test code
def calculate(func, args):
    """Apply ``func`` to ``args`` and describe the outcome as a string.

    func -- callable to invoke; its ``__name__`` appears in the message.
    args -- sequence of positional arguments, unpacked into ``func``.

    Returns a string of the form
    ``'<process name> says that <func name><args> = <result>'``, where the
    process name is that of the process executing this call.
    """
    # The result must be computed before formatting; the message reports it.
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )
def calculatestar(args):
    """Single-argument adapter around ``calculate``.

    ``args`` is unpacked into the positional arguments of ``calculate``,
    so callers that hand over one item per task (such as ``pool.imap``
    iterating over ``TASKS``) can drive it directly.
    """
    outcome = calculate(*args)
    return outcome
28 time
.sleep(0.5*random
.random())
32 time
.sleep(0.5*random
.random())
49 print 'cpu_count() = %d\n' % multiprocessing
.cpu_count()
56 print 'Creating pool with %d processes\n' % PROCESSES
57 pool
= multiprocessing
.Pool(PROCESSES
)
58 print 'pool = %s' % pool
65 TASKS
= [(mul
, (i
, 7)) for i
in range(10)] + \
66 [(plus
, (i
, 8)) for i
in range(10)]
68 results
= [pool
.apply_async(calculate
, t
) for t
in TASKS
]
69 imap_it
= pool
.imap(calculatestar
, TASKS
)
70 imap_unordered_it
= pool
.imap_unordered(calculatestar
, TASKS
)
72 print 'Ordered results using pool.apply_async():'
77 print 'Ordered results using pool.imap():'
82 print 'Unordered results using pool.imap_unordered():'
83 for x
in imap_unordered_it
:
87 print 'Ordered results using pool.map() --- will block till complete:'
88 for x
in pool
.map(calculatestar
, TASKS
):
97 print 'def pow3(x): return x**3'
100 A
= map(pow3
, xrange(N
))
101 print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
105 B
= pool
.map(pow3
, xrange(N
))
106 print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
110 C
= list(pool
.imap(pow3
, xrange(N
), chunksize
=N
//8))
111 print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
112 ' seconds' % (N
, N
//8, time
.time() - t
)
114 assert A
== B
== C
, (len(A
), len(B
), len(C
))
118 print 'def noop(x): pass'
119 print 'L = [None] * 1000000'
123 print '\tmap(noop, L):\n\t\t%s seconds' % \
127 B
= pool
.map(noop
, L
)
128 print '\tpool.map(noop, L):\n\t\t%s seconds' % \
132 C
= list(pool
.imap(noop
, L
, chunksize
=len(L
)//8))
133 print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
134 (len(L
)//8, time
.time() - t
)
136 assert A
== B
== C
, (len(A
), len(B
), len(C
))
142 # Test error handling
145 print 'Testing error handling:'
148 print pool
.apply(f
, (5,))
149 except ZeroDivisionError:
150 print '\tGot ZeroDivisionError as expected from pool.apply()'
152 raise AssertionError('expected ZeroDivisionError')
155 print pool
.map(f
, range(10))
156 except ZeroDivisionError:
157 print '\tGot ZeroDivisionError as expected from pool.map()'
159 raise AssertionError('expected ZeroDivisionError')
162 print list(pool
.imap(f
, range(10)))
163 except ZeroDivisionError:
164 print '\tGot ZeroDivisionError as expected from list(pool.imap())'
166 raise AssertionError('expected ZeroDivisionError')
168 it
= pool
.imap(f
, range(10))
172 except ZeroDivisionError:
175 except StopIteration:
179 raise AssertionError('expected ZeroDivisionError')
182 print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
189 print 'Testing ApplyResult.get() with timeout:',
190 res
= pool
.apply_async(calculate
, TASKS
[0])
194 sys
.stdout
.write('\n\t%s' % res
.get(0.02))
196 except multiprocessing
.TimeoutError
:
197 sys
.stdout
.write('.')
201 print 'Testing IMapIterator.next() with timeout:',
202 it
= pool
.imap(calculatestar
, TASKS
)
206 sys
.stdout
.write('\n\t%s' % it
.next(0.02))
207 except StopIteration:
209 except multiprocessing
.TimeoutError
:
210 sys
.stdout
.write('.')
218 print 'Testing callback:'
221 B
= [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
223 r
= pool
.apply_async(mul
, (7, 8), callback
=A
.append
)
226 r
= pool
.map_async(pow3
, range(10), callback
=A
.extend
)
230 print '\tcallbacks succeeded\n'
232 print '\t*** callbacks failed\n\t\t%s != %s\n' % (A
, B
)
235 # Check there are no outstanding tasks
238 assert not pool
._cache
, 'cache = %r' % pool
._cache
241 # Check close() methods
244 print 'Testing close():'
246 for worker
in pool
._pool
:
247 assert worker
.is_alive()
249 result
= pool
.apply_async(time
.sleep
, [0.5])
253 assert result
.get() is None
255 for worker
in pool
._pool
:
256 assert not worker
.is_alive()
258 print '\tclose() succeeded\n'
261 # Check terminate() method
264 print 'Testing terminate():'
266 pool
= multiprocessing
.Pool(2)
268 ignore
= pool
.apply(pow3
, [2])
269 results
= [pool
.apply_async(time
.sleep
, [DELTA
]) for i
in range(100)]
273 for worker
in pool
._pool
:
274 assert not worker
.is_alive()
276 print '\tterminate() succeeded\n'
279 # Check garbage collection
282 print 'Testing garbage collection:'
284 pool
= multiprocessing
.Pool(2)
286 processes
= pool
._pool
287 ignore
= pool
.apply(pow3
, [2])
288 results
= [pool
.apply_async(time
.sleep
, [DELTA
]) for i
in range(100)]
290 results
= pool
= None
292 time
.sleep(DELTA
* 2)
294 for worker
in processes
:
295 assert not worker
.is_alive()
297 print '\tgarbage collection succeeded\n'
300 if __name__
== '__main__':
301 multiprocessing
.freeze_support()
303 assert len(sys
.argv
) in (1, 2)
305 if len(sys
.argv
) == 1 or sys
.argv
[1] == 'processes':
306 print ' Using processes '.center(79, '-')
307 elif sys
.argv
[1] == 'threads':
308 print ' Using threads '.center(79, '-')
309 import multiprocessing
.dummy
as multiprocessing
311 print 'Usage:\n\t%s [processes | threads]' % sys
.argv
[0]