#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
#

import threading
import Queue
import itertools
import collections
import time

from multiprocessing import Process, cpu_count, TimeoutError
from multiprocessing.util import Finalize, debug

#
# Constants representing the state of a pool
#

RUN = 0
CLOSE = 1
TERMINATE = 2

job_counter = itertools.count()

def mapstar(args):
    return map(*args)

#
# Code run by worker processes
#

def worker(inqueue, outqueue, initializer=None, initargs=()):
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    while 1:
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            result = (False, e)
        put((job, i, result))
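
# A task read from `inqueue` is the tuple (job, i, func, args, kwds) and the
# reply written to `outqueue` is (job, i, (success, value)).  Illustrative
# sketch only (not part of the original module); `run_one_task` is a
# hypothetical helper mirroring one iteration of the loop above:
#
#     def run_one_task(task, put):
#         job, i, func, args, kwds = task
#         try:
#             result = (True, func(*args, **kwds))
#         except Exception as e:
#             result = (False, e)
#         put((job, i, result))
#
#     results = []
#     run_one_task((7, 0, pow, (2, 10), {}), results.append)
#     # results == [(7, 0, (True, 1024))]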

#
# Class representing a process pool
#

class Pool(object):
    '''
    Class which supports an async version of the `apply()` builtin
    '''
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=()):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        self._state = RUN

        if processes is None:
            try:
                processes = cpu_count()
            except NotImplementedError:
                processes = 1

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        self._pool = []
        for i in range(processes):
            w = self.Process(
                target=worker,
                args=(self._inqueue, self._outqueue, initializer, initargs)
                )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._task_handler, self._result_handler, self._cache),
            exitpriority=15
            )

    def _setup_queues(self):
        from .queues import SimpleQueue
        self._inqueue = SimpleQueue()
        self._outqueue = SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        '''
        Equivalent of `map()` builtin
        '''
        assert self._state == RUN
        return self.map_async(func, iterable, chunksize).get()
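
    # Illustrative usage sketch (not part of the original module): the
    # synchronous methods block until the result is ready.  `square` is a
    # hypothetical module-level function; callables sent to worker processes
    # must be picklable, and the pool should be created under an
    # `if __name__ == '__main__'` guard.
    #
    #     from multiprocessing import Pool
    #
    #     def square(x):
    #         return x * x
    #
    #     if __name__ == '__main__':
    #         pool = Pool(processes=4)
    #         print pool.apply(square, (3,))      # 9
    #         print pool.map(square, range(10))   # [0, 1, 4, ..., 81]
    #         pool.close()
    #         pool.join()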

    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                                  for i, x in enumerate(iterable)),
                                 result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                                  for i, x in enumerate(task_batches)),
                                 result._set_length))
            return (item for chunk in result for item in chunk)

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                                  for i, x in enumerate(iterable)),
                                 result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                                  for i, x in enumerate(task_batches)),
                                 result._set_length))
            return (item for chunk in result for item in chunk)
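
    # Illustrative sketch (not part of the original module), reusing the
    # hypothetical `pool` and `square` from the sketch above: `imap()` yields
    # results lazily and in submission order, while `imap_unordered()` yields
    # each result as soon as any worker produces it.
    #
    #     for value in pool.imap(square, xrange(5)):
    #         print value                  # 0, 1, 4, 9, 16 in order
    #
    #     for value in pool.imap_unordered(square, xrange(5), chunksize=2):
    #         print value                  # same values, arbitrary order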

    def apply_async(self, func, args=(), kwds={}, callback=None):
        '''
        Asynchronous equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result
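
    # Illustrative sketch (not part of the original module), reusing the
    # hypothetical `pool` and `square` from above: `apply_async()` returns an
    # ApplyResult immediately; `get()` blocks (optionally with a timeout) and
    # re-raises any exception raised in the worker.  The callback runs in the
    # result-handler thread, so it should be quick and must not block.
    #
    #     results = []
    #     res = pool.apply_async(square, (5,), callback=results.append)
    #     print res.get(timeout=10)        # 25, or raises TimeoutError
    #     # results == [25] once the callback has run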

    def map_async(self, func, iterable, chunksize=None, callback=None):
        '''
        Asynchronous equivalent of `map()` builtin
        '''
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback)
        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                              for i, x in enumerate(task_batches)), None))
        return result
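
    # Worked example of the default chunksize above (illustrative, not part of
    # the original module): with 100 items and 4 worker processes,
    # divmod(100, 4 * 4) == (6, 4); since extra is non-zero the chunksize
    # becomes 7, giving 15 task batches of at most 7 items each.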

    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool):
        thread = threading.current_thread()

        for taskseq, set_length in iter(taskqueue.get, None):
            i = -1
            for i, task in enumerate(taskseq):
                if thread._state:
                    debug('task handler found thread._state != RUN')
                    break
                try:
                    put(task)
                except IOError:
                    debug('could not put task on queue')
                    break
            else:
                if set_length:
                    debug('doing set_length()')
                    set_length(i+1)
                continue
            break
        else:
            debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except IOError:
            debug('task handler got IOError when sending sentinels')

        debug('task handler exiting')

    @staticmethod
    def _handle_results(outqueue, get, cache):
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if thread._state:
                assert thread._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if task is None:
                debug('result handler ignoring extra sentinel')
                continue

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        if hasattr(outqueue, '_reader'):
            debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block.  There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (IOError, EOFError):
                pass

        debug('result handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), thread._state)

    @staticmethod
    def _get_tasks(func, it, size):
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)
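
    # Illustrative sketch (not part of the original module): `_get_tasks()`
    # slices the iterable into (func, batch) tuples, which `mapstar` later
    # unpacks in the worker.  With the hypothetical `square` from above:
    #
    #     list(Pool._get_tasks(square, range(5), 2))
    #     # -> [(square, (0, 1)), (square, (2, 3)), (square, (4,))]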

    def __reduce__(self):
        raise NotImplementedError(
              'pool objects cannot be passed between processes or pickled'
              )

    def close(self):
        debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._taskqueue.put(None)

    def terminate(self):
        debug('terminating pool')
        self._state = TERMINATE
        self._terminate()

    def join(self):
        debug('joining pool')
        assert self._state in (CLOSE, TERMINATE)
        self._task_handler.join()
        self._result_handler.join()
        for p in self._pool:
            p.join()

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                        task_handler, result_handler, cache):
        # this is guaranteed to only be called once
        debug('finalizing pool')

        task_handler._state = TERMINATE
        taskqueue.put(None)                 # sentinel

        debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        assert result_handler.is_alive() or len(cache) == 0

        result_handler._state = TERMINATE
        outqueue.put(None)                  # sentinel

        if pool and hasattr(pool[0], 'terminate'):
            debug('terminating workers')
            for p in pool:
                p.terminate()

        debug('joining task handler')
        task_handler.join(1e100)

        debug('joining result handler')
        result_handler.join(1e100)

        if pool and hasattr(pool[0], 'terminate'):
            debug('joining pool workers')
            for p in pool:
                p.join()

#
# Class whose instances are returned by `Pool.apply_async()`
#

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self

    def successful(self):
        assert self._ready
        return self._success

    def wait(self, timeout=None):
        self._cond.acquire()
        try:
            if not self._ready:
                self._cond.wait(timeout)
        finally:
            self._cond.release()

    def get(self, timeout=None):
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

#
# Class whose instances are returned by `Pool.map_async()`
#

class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._ready = True
        else:
            self._number_left = length//chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        success, result = success_result
        if success:
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
            self._number_left -= 1
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._cond.acquire()
                try:
                    self._ready = True
                    self._cond.notify()
                finally:
                    self._cond.release()
        else:
            self._success = False
            self._value = result
            del self._cache[self._job]
            self._cond.acquire()
            try:
                self._ready = True
                self._cond.notify()
            finally:
                self._cond.release()
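
# Worked example of the `_number_left` bookkeeping above (illustrative, not
# part of the original module): for length == 10 and chunksize == 3,
# 10 // 3 + bool(10 % 3) == 3 + 1 == 4, so the map result becomes ready only
# after _set() has been called for all four chunks.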

#
# Class whose instances are returned by `Pool.imap()`
#

class IMapIterator(object):

    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        self._cond.acquire()
        try:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration
                    raise TimeoutError
        finally:
            self._cond.release()

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

    def _set_length(self, length):
        self._cond.acquire()
        try:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]
        finally:
            self._cond.release()
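
# Illustrative sketch (not part of the original module): chunks that arrive
# ahead of the next expected index are parked in `_unsorted` until the gap is
# filled, which keeps `next()` yielding items in submission order.
#
#     it = IMapIterator(cache={})
#     it._set_length(3)
#     it._set(1, (True, 'b'))          # buffered in _unsorted
#     it._set(0, (True, 'a'))          # releases index 0, then index 1
#     print it.next(), it.next()       # a b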

#
# Class whose instances are returned by `Pool.imap_unordered()`
#

class IMapUnorderedIterator(IMapIterator):

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

class ThreadPool(Pool):

    from .dummy import Process

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        self._inqueue = Queue.Queue()
        self._outqueue = Queue.Queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # put sentinels at head of inqueue to make workers finish
        inqueue.not_empty.acquire()
        try:
            inqueue.queue.clear()
            inqueue.queue.extend([None] * size)
            inqueue.not_empty.notify_all()
        finally:
            inqueue.not_empty.release()
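
# Illustrative usage sketch (not part of the original module): ThreadPool has
# the same interface as Pool but runs tasks in threads of the current process,
# so plain Queue.Queue objects replace the pipes used by Pool and the
# callables do not need to be picklable.
#
#     tp = ThreadPool(processes=4)
#     print tp.map(len, ['a', 'bb', 'ccc'])    # [1, 2, 3]
#     tp.close()
#     tp.join()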