2 # A test of `multiprocessing.Pool` class
4 # Copyright (c) 2006-2008, R Oudkerk
14 # Functions used by test code
17 def calculate(func
, args
):
19 return '%s says that %s%s = %s' % (
20 multiprocessing
.current_process().name
,
21 func
.__name
__, args
, result
24 def calculatestar(args
):
25 return calculate(*args
)
28 time
.sleep(0.5*random
.random())
32 time
.sleep(0.5*random
.random())
49 print('cpu_count() = %d\n' % multiprocessing
.cpu_count())
56 print('Creating pool with %d processes\n' % PROCESSES
)
57 pool
= multiprocessing
.Pool(PROCESSES
)
58 print('pool = %s' % pool
)
65 TASKS
= [(mul
, (i
, 7)) for i
in range(10)] + \
66 [(plus
, (i
, 8)) for i
in range(10)]
68 results
= [pool
.apply_async(calculate
, t
) for t
in TASKS
]
69 imap_it
= pool
.imap(calculatestar
, TASKS
)
70 imap_unordered_it
= pool
.imap_unordered(calculatestar
, TASKS
)
72 print('Ordered results using pool.apply_async():')
77 print('Ordered results using pool.imap():')
82 print('Unordered results using pool.imap_unordered():')
83 for x
in imap_unordered_it
:
87 print('Ordered results using pool.map() --- will block till complete:')
88 for x
in pool
.map(calculatestar
, TASKS
):
97 print('def pow3(x): return x**3')
100 A
= list(map(pow3
, range(N
)))
101 print('\tmap(pow3, range(%d)):\n\t\t%s seconds' % \
102 (N
, time
.time() - t
))
105 B
= pool
.map(pow3
, range(N
))
106 print('\tpool.map(pow3, range(%d)):\n\t\t%s seconds' % \
107 (N
, time
.time() - t
))
110 C
= list(pool
.imap(pow3
, range(N
), chunksize
=N
//8))
111 print('\tlist(pool.imap(pow3, range(%d), chunksize=%d)):\n\t\t%s' \
112 ' seconds' % (N
, N
//8, time
.time() - t
))
114 assert A
== B
== C
, (len(A
), len(B
), len(C
))
118 print('def noop(x): pass')
119 print('L = [None] * 1000000')
122 A
= list(map(noop
, L
))
123 print('\tmap(noop, L):\n\t\t%s seconds' % \
127 B
= pool
.map(noop
, L
)
128 print('\tpool.map(noop, L):\n\t\t%s seconds' % \
132 C
= list(pool
.imap(noop
, L
, chunksize
=len(L
)//8))
133 print('\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
134 (len(L
)//8, time
.time() - t
))
136 assert A
== B
== C
, (len(A
), len(B
), len(C
))
142 # Test error handling
145 print('Testing error handling:')
148 print(pool
.apply(f
, (5,)))
149 except ZeroDivisionError:
150 print('\tGot ZeroDivisionError as expected from pool.apply()')
152 raise AssertionError('expected ZeroDivisionError')
155 print(pool
.map(f
, list(range(10))))
156 except ZeroDivisionError:
157 print('\tGot ZeroDivisionError as expected from pool.map()')
159 raise AssertionError('expected ZeroDivisionError')
162 print(list(pool
.imap(f
, list(range(10)))))
163 except ZeroDivisionError:
164 print('\tGot ZeroDivisionError as expected from list(pool.imap())')
166 raise AssertionError('expected ZeroDivisionError')
168 it
= pool
.imap(f
, list(range(10)))
172 except ZeroDivisionError:
175 except StopIteration:
179 raise AssertionError('expected ZeroDivisionError')
182 print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
189 print('Testing ApplyResult.get() with timeout:', end
=' ')
190 res
= pool
.apply_async(calculate
, TASKS
[0])
194 sys
.stdout
.write('\n\t%s' % res
.get(0.02))
196 except multiprocessing
.TimeoutError
:
197 sys
.stdout
.write('.')
201 print('Testing IMapIterator.next() with timeout:', end
=' ')
202 it
= pool
.imap(calculatestar
, TASKS
)
206 sys
.stdout
.write('\n\t%s' % it
.next(0.02))
207 except StopIteration:
209 except multiprocessing
.TimeoutError
:
210 sys
.stdout
.write('.')
218 print('Testing callback:')
221 B
= [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
223 r
= pool
.apply_async(mul
, (7, 8), callback
=A
.append
)
226 r
= pool
.map_async(pow3
, list(range(10)), callback
=A
.extend
)
230 print('\tcallbacks succeeded\n')
232 print('\t*** callbacks failed\n\t\t%s != %s\n' % (A
, B
))
235 # Check there are no outstanding tasks
238 assert not pool
._cache
, 'cache = %r' % pool
._cache
241 # Check close() methods
244 print('Testing close():')
246 for worker
in pool
._pool
:
247 assert worker
.is_alive()
249 result
= pool
.apply_async(time
.sleep
, [0.5])
253 assert result
.get() is None
255 for worker
in pool
._pool
:
256 assert not worker
.is_alive()
258 print('\tclose() succeeded\n')
261 # Check terminate() method
264 print('Testing terminate():')
266 pool
= multiprocessing
.Pool(2)
268 ignore
= pool
.apply(pow3
, [2])
269 results
= [pool
.apply_async(time
.sleep
, [DELTA
]) for i
in range(100)]
273 for worker
in pool
._pool
:
274 assert not worker
.is_alive()
276 print('\tterminate() succeeded\n')
279 # Check garbage collection
282 print('Testing garbage collection:')
284 pool
= multiprocessing
.Pool(2)
286 processes
= pool
._pool
287 ignore
= pool
.apply(pow3
, [2])
288 results
= [pool
.apply_async(time
.sleep
, [DELTA
]) for i
in range(100)]
290 results
= pool
= None
292 time
.sleep(DELTA
* 2)
294 for worker
in processes
:
295 assert not worker
.is_alive()
297 print('\tgarbage collection succeeded\n')
300 if __name__
== '__main__':
301 multiprocessing
.freeze_support()
303 assert len(sys
.argv
) in (1, 2)
305 if len(sys
.argv
) == 1 or sys
.argv
[1] == 'processes':
306 print(' Using processes '.center(79, '-'))
307 elif sys
.argv
[1] == 'threads':
308 print(' Using threads '.center(79, '-'))
309 import multiprocessing
.dummy
as multiprocessing
311 print('Usage:\n\t%s [processes | threads]' % sys
.argv
[0])