only use -fno-strict-aliasing when needed by compiler
[python/dscho.git] / Doc / includes / mp_pool.py
blob e360703bd1e78178015d2d97bc350ac849277903
2 # A test of `multiprocessing.Pool` class
4 # Copyright (c) 2006-2008, R Oudkerk
5 # All rights reserved.
8 import multiprocessing
9 import time
10 import random
11 import sys
14 # Functions used by test code
def calculate(func, args):
    """Call ``func(*args)`` and return a one-line report of the call.

    The report names the current process (as given by
    ``multiprocessing.current_process().name``), the function's
    ``__name__``, the argument tuple and the computed value.
    """
    outcome = func(*args)
    worker_name = multiprocessing.current_process().name
    report_fields = (worker_name, func.__name__, args, outcome)
    return '%s says that %s%s = %s' % report_fields
def calculatestar(args):
    """Adapter for pool methods that pass a single argument per task.

    *args* is an iterable that is unpacked into the positional
    arguments of `calculate` (in this file, a ``(func, args)`` pair).
    """
    packed_task = tuple(args)
    return calculate(*packed_task)
def mul(a, b):
    """Return ``a * b`` after sleeping for a random delay below 0.5 s."""
    delay = 0.5 * random.random()
    time.sleep(delay)
    product = a * b
    return product
def plus(a, b):
    """Return ``a + b`` after sleeping for a random delay below 0.5 s."""
    delay = 0.5 * random.random()
    time.sleep(delay)
    total = a + b
    return total
def f(x):
    """Return ``1.0 / (x - 5.0)``.

    Raises ZeroDivisionError when ``x`` equals 5; the error-handling
    checks in `test` rely on that.
    """
    offset = x - 5.0
    return 1.0 / offset
def pow3(x):
    """Return *x* raised to the third power."""
    return pow(x, 3)
def noop(x):
    """Accept one argument, do nothing with it, and return None."""
    return None
45 # Test code
def test():
    """Exercise the `Pool` API and print a report of each check.

    The function uses whatever the module-level name ``multiprocessing``
    is bound to when it is called, so the script entry point can rebind
    that name to run the same checks against a different backend.

    Sections, in order: ordered/unordered result retrieval, simple
    benchmarks, error propagation, timeouts, callbacks, and the
    ``close()``, ``terminate()`` and garbage-collection shutdown paths.

    Raises:
        AssertionError: if any of the checks fails.

    Note: several checks inspect the private attributes ``pool._cache``
    and ``pool._pool``.
    """
    print('cpu_count() = %d\n' % multiprocessing.cpu_count())

    #
    # Create pool
    #

    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)
    pool = multiprocessing.Pool(PROCESSES)
    print('pool = %s' % pool)
    print()

    #
    # Tests
    #

    # Each task is a (function, argument-tuple) pair.
    TASKS = ([(mul, (i, 7)) for i in range(10)] +
             [(plus, (i, 8)) for i in range(10)])

    # Submit all three batches up front, then consume them one by one.
    results = [pool.apply_async(calculate, t) for t in TASKS]
    imap_it = pool.imap(calculatestar, TASKS)
    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

    print('Ordered results using pool.apply_async():')
    for r in results:
        print('\t', r.get())
    print()

    print('Ordered results using pool.imap():')
    for x in imap_it:
        print('\t', x)
    print()

    print('Unordered results using pool.imap_unordered():')
    for x in imap_unordered_it:
        print('\t', x)
    print()

    print('Ordered results using pool.map() --- will block till complete:')
    for x in pool.map(calculatestar, TASKS):
        print('\t', x)
    print()

    #
    # Simple benchmarks
    #

    N = 100000
    print('def pow3(x): return x**3')

    t = time.time()
    A = list(map(pow3, range(N)))
    print('\tmap(pow3, range(%d)):\n\t\t%s seconds' %
          (N, time.time() - t))

    t = time.time()
    B = pool.map(pow3, range(N))
    print('\tpool.map(pow3, range(%d)):\n\t\t%s seconds' %
          (N, time.time() - t))

    t = time.time()
    C = list(pool.imap(pow3, range(N), chunksize=N//8))
    print('\tlist(pool.imap(pow3, range(%d), chunksize=%d)):\n\t\t%s'
          ' seconds' % (N, N//8, time.time() - t))

    # The serial and both pooled runs must produce identical results.
    assert A == B == C, (len(A), len(B), len(C))
    print()

    L = [None] * 1000000
    print('def noop(x): pass')
    print('L = [None] * 1000000')

    t = time.time()
    A = list(map(noop, L))
    print('\tmap(noop, L):\n\t\t%s seconds' %
          (time.time() - t))

    t = time.time()
    B = pool.map(noop, L)
    print('\tpool.map(noop, L):\n\t\t%s seconds' %
          (time.time() - t))

    t = time.time()
    C = list(pool.imap(noop, L, chunksize=len(L)//8))
    print('\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' %
          (len(L)//8, time.time() - t))

    assert A == B == C, (len(A), len(B), len(C))
    print()

    # Release the large benchmark lists before continuing.
    del A, B, C, L

    #
    # Test error handling
    #

    print('Testing error handling:')

    # f(5) divides by zero; each retrieval style must surface that error.
    try:
        print(pool.apply(f, (5,)))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from pool.apply()')
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print(pool.map(f, list(range(10))))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from pool.map()')
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print(list(pool.imap(f, list(range(10)))))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from list(pool.imap())')
    else:
        raise AssertionError('expected ZeroDivisionError')

    it = pool.imap(f, list(range(10)))
    for i in range(10):
        try:
            next(it)
        except ZeroDivisionError:
            # Only the item for x == 5 may fail; an error at any other
            # position is a genuine failure and must not be swallowed.
            if i != 5:
                raise
        except StopIteration:
            break
        else:
            if i == 5:
                raise AssertionError('expected ZeroDivisionError')

    # The iterator must have kept going after the failing item.
    assert i == 9
    print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
    print()

    #
    # Testing timeouts
    #

    # Poll with a short timeout, printing a dot for every timeout hit.
    print('Testing ApplyResult.get() with timeout:', end=' ')
    res = pool.apply_async(calculate, TASKS[0])
    while True:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % res.get(0.02))
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print()
    print()

    print('Testing IMapIterator.next() with timeout:', end=' ')
    it = pool.imap(calculatestar, TASKS)
    while True:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % it.next(0.02))
        except StopIteration:
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print()
    print()

    #
    # Testing callback
    #

    print('Testing callback:')

    A = []
    # Expected: mul(7, 8) followed by pow3(0) .. pow3(9).
    B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

    r = pool.apply_async(mul, (7, 8), callback=A.append)
    r.wait()

    r = pool.map_async(pow3, list(range(10)), callback=A.extend)
    r.wait()

    if A == B:
        print('\tcallbacks succeeded\n')
    else:
        print('\t*** callbacks failed\n\t\t%s != %s\n' % (A, B))

    #
    # Check there are no outstanding tasks
    #

    assert not pool._cache, 'cache = %r' % pool._cache

    #
    # Check close() methods
    #

    print('Testing close():')

    for worker in pool._pool:
        assert worker.is_alive()

    # A task submitted before close() must still complete.
    result = pool.apply_async(time.sleep, [0.5])
    pool.close()
    pool.join()

    assert result.get() is None

    for worker in pool._pool:
        assert not worker.is_alive()

    print('\tclose() succeeded\n')

    #
    # Check terminate() method
    #

    print('Testing terminate():')

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    # Run one task synchronously before queueing the backlog.
    pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for _ in range(100)]
    pool.terminate()
    pool.join()

    for worker in pool._pool:
        assert not worker.is_alive()

    print('\tterminate() succeeded\n')

    #
    # Check garbage collection
    #

    print('Testing garbage collection:')

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    # Keep the worker list so the workers can be inspected after the
    # pool itself is no longer referenced.
    processes = pool._pool
    pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for _ in range(100)]

    # Drop this function's references to the pool and its pending
    # results; the check below expects the workers to have exited by
    # the time the sleep finishes.
    results = pool = None

    time.sleep(DELTA * 2)

    for worker in processes:
        assert not worker.is_alive()

    print('\tgarbage collection succeeded\n')
# Script entry point: run the checks with real processes (the default)
# or, when the single argument is 'threads', with multiprocessing.dummy.
if __name__ == '__main__':
    multiprocessing.freeze_support()

    # Validate the command line explicitly rather than with `assert`:
    # asserts are removed under `python -O`, and a bad invocation should
    # print the usage message instead of raising AssertionError.
    mode = sys.argv[1] if len(sys.argv) == 2 else 'processes'
    if len(sys.argv) > 2 or mode not in ('processes', 'threads'):
        print('Usage:\n\t%s [processes | threads]' % sys.argv[0])
        raise SystemExit(2)

    print((' Using %s ' % mode).center(79, '-'))
    if mode == 'threads':
        # Rebind the module-level name that test() looks up, so the
        # same checks run against the multiprocessing.dummy API.
        import multiprocessing.dummy as multiprocessing

    test()