Added more cross-reference targets and tidied up list of useful handlers.
[python.git] / Doc / includes / mp_pool.py
blob9e89cbc607ff31af1f509da71f4b05504d9cd818
# A test of `multiprocessing.Pool` class
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
8 import multiprocessing
9 import time
10 import random
11 import sys
# Functions used by test code
def calculate(func, args):
    """Call ``func(*args)`` and describe the call in a one-line string.

    The string names the process that ran the call (taken from
    ``multiprocessing.current_process().name``), the function's
    ``__name__``, the argument tuple and the value it returned.
    """
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result,
    )
def calculatestar(args):
    """Unpack ``args`` into positional arguments for ``calculate``.

    Lets a single sequence item be handed to ``calculate``, which is how
    the pool's ``map``/``imap`` variants deliver each task.
    """
    description = calculate(*args)
    return description
def mul(a, b):
    """Return ``a * b`` after sleeping for a random time below 0.5 seconds."""
    delay = 0.5 * random.random()
    time.sleep(delay)
    product = a * b
    return product
def plus(a, b):
    """Return ``a + b`` after sleeping for a random time below 0.5 seconds."""
    pause = 0.5 * random.random()
    time.sleep(pause)
    total = a + b
    return total
def f(x):
    """Return ``1.0 / (x - 5.0)``.

    Raises ZeroDivisionError when ``x`` equals 5; the error-handling tests
    rely on that.
    """
    denominator = x - 5.0
    return 1.0 / denominator
def pow3(x):
    """Return ``x`` raised to the third power."""
    return pow(x, 3)
def noop(x):
    """Ignore ``x`` and return None; used to benchmark pure dispatch cost."""
    return None
# Test code
def test():
    """Exercise ``multiprocessing.Pool`` and report progress on stdout.

    Runs, in order: ordered and unordered result retrieval, simple
    benchmarks against the builtin ``map``, error propagation, timeouts,
    callbacks, ``close()``, ``terminate()`` and garbage collection of an
    unreferenced pool.

    Raises AssertionError as soon as one of the checks fails.
    """
    # Every print call below takes one pre-formatted string, so the text
    # written is the same whether print is a statement or a function.
    print('cpu_count() = %d\n' % multiprocessing.cpu_count())

    #
    # Create pool
    #

    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)
    pool = multiprocessing.Pool(PROCESSES)
    print('pool = %s' % pool)
    print('')

    #
    # Tests
    #

    TASKS = ([(mul, (i, 7)) for i in range(10)] +
             [(plus, (i, 8)) for i in range(10)])

    # Submit the same task list three ways before reading any results.
    results = [pool.apply_async(calculate, t) for t in TASKS]
    imap_it = pool.imap(calculatestar, TASKS)
    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

    print('Ordered results using pool.apply_async():')
    for r in results:
        print('\t%s' % (r.get(),))
    print('')

    print('Ordered results using pool.imap():')
    for x in imap_it:
        print('\t%s' % (x,))
    print('')

    print('Unordered results using pool.imap_unordered():')
    for x in imap_unordered_it:
        print('\t%s' % (x,))
    print('')

    print('Ordered results using pool.map() --- will block till complete:')
    for x in pool.map(calculatestar, TASKS):
        print('\t%s' % (x,))
    print('')

    #
    # Simple benchmarks
    #

    N = 100000
    print('def pow3(x): return x**3')

    t = time.time()
    A = map(pow3, xrange(N))
    print('\tmap(pow3, xrange(%d)):\n\t\t%s seconds' %
          (N, time.time() - t))

    t = time.time()
    B = pool.map(pow3, xrange(N))
    print('\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' %
          (N, time.time() - t))

    t = time.time()
    C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
    print('\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s'
          ' seconds' % (N, N//8, time.time() - t))

    # All three strategies must produce the same list.
    assert A == B == C, (len(A), len(B), len(C))
    print('')

    L = [None] * 1000000
    print('def noop(x): pass')
    print('L = [None] * 1000000')

    t = time.time()
    A = map(noop, L)
    print('\tmap(noop, L):\n\t\t%s seconds' %
          (time.time() - t))

    t = time.time()
    B = pool.map(noop, L)
    print('\tpool.map(noop, L):\n\t\t%s seconds' %
          (time.time() - t))

    t = time.time()
    C = list(pool.imap(noop, L, chunksize=len(L)//8))
    print('\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' %
          (len(L)//8, time.time() - t))

    assert A == B == C, (len(A), len(B), len(C))
    print('')

    # Release the large benchmark lists before the remaining tests.
    del A, B, C, L

    #
    # Test error handling
    #

    print('Testing error handling:')

    # f(5) divides by zero, so each call below is expected to raise.
    try:
        print(pool.apply(f, (5,)))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from pool.apply()')
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print(pool.map(f, range(10)))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from pool.map()')
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print(list(pool.imap(f, range(10))))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from list(pool.imap())')
    else:
        raise AssertionError('expected ZeroDivisionError')

    it = pool.imap(f, range(10))
    for i in range(10):
        try:
            it.next()
        except ZeroDivisionError:
            # f() only divides by zero for x == 5; an error at any other
            # index is a failure, not something to ignore.
            if i != 5:
                raise AssertionError(
                    'unexpected ZeroDivisionError at index %d' % i)
        except StopIteration:
            break
        else:
            if i == 5:
                raise AssertionError('expected ZeroDivisionError')

    # The iterator must have yielded all ten items, including the failure.
    assert i == 9
    print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
    print('')

    #
    # Testing timeouts
    #

    # Poll with a 0.02 second timeout, writing a dot for every timeout.
    sys.stdout.write('Testing ApplyResult.get() with timeout:')
    res = pool.apply_async(calculate, TASKS[0])
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % res.get(0.02))
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print('')
    print('')

    sys.stdout.write('Testing IMapIterator.next() with timeout:')
    it = pool.imap(calculatestar, TASKS)
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % it.next(0.02))
        except StopIteration:
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print('')
    print('')

    #
    # Testing callback
    #

    print('Testing callback:')

    # A collects what the callbacks receive; B is the expected content:
    # 7 * 8 followed by the cubes of 0..9.
    A = []
    B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

    r = pool.apply_async(mul, (7, 8), callback=A.append)
    r.wait()

    r = pool.map_async(pow3, range(10), callback=A.extend)
    r.wait()

    if A == B:
        print('\tcallbacks succeeded\n')
    else:
        print('\t*** callbacks failed\n\t\t%s != %s\n' % (A, B))

    #
    # Check there are no outstanding tasks
    #

    assert not pool._cache, 'cache = %r' % pool._cache

    #
    # Check close() methods
    #

    print('Testing close():')

    for worker in pool._pool:
        assert worker.is_alive()

    # A task submitted before close() must still run to completion.
    result = pool.apply_async(time.sleep, [0.5])
    pool.close()
    pool.join()

    assert result.get() is None

    for worker in pool._pool:
        assert not worker.is_alive()

    print('\tclose() succeeded\n')

    #
    # Check terminate() method
    #

    print('Testing terminate():')

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    pool.apply(pow3, [2])
    # Queue far more sleeping tasks than can finish before terminate().
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
    pool.terminate()
    pool.join()

    for worker in pool._pool:
        assert not worker.is_alive()

    print('\tterminate() succeeded\n')

    #
    # Check garbage collection
    #

    print('Testing garbage collection:')

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    # Keep the worker list so the workers can be inspected after the pool
    # object itself is gone.
    processes = pool._pool
    pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]

    # Drop every reference to the pool and its pending results; the
    # workers are expected to stop without close() or terminate().
    results = pool = None

    time.sleep(DELTA * 2)

    for worker in processes:
        assert not worker.is_alive()

    print('\tgarbage collection succeeded\n')
if __name__ == '__main__':
    multiprocessing.freeze_support()

    # Script entry point: an optional single argument selects real
    # processes (the default) or threads.  Anything else, including extra
    # arguments, gets the usage text and exit status 2; this is checked
    # explicitly rather than with assert so it still applies under -O.
    if sys.argv[1:] in ([], ['processes']):
        print(' Using processes '.center(79, '-'))
    elif sys.argv[1:] == ['threads']:
        print(' Using threads '.center(79, '-'))
        # Rebind the module-level name so test() and the helper functions
        # use the thread-backed implementation.
        import multiprocessing.dummy as multiprocessing
    else:
        print('Usage:\n\t%s [processes | threads]' % sys.argv[0])
        raise SystemExit(2)

    test()