2 # Module implementing queues
4 # multiprocessing/queues.py
6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
9 __all__
= ['Queue', 'SimpleQueue', 'JoinableQueue']
19 from Queue
import Empty
, Full
20 import _multiprocessing
21 from multiprocessing
import Pipe
22 from multiprocessing
.synchronize
import Lock
, BoundedSemaphore
, Semaphore
, Condition
23 from multiprocessing
.util
import debug
, info
, Finalize
, register_after_fork
24 from multiprocessing
.forking
import assert_spawning
27 # Queue type using a pipe, buffer and thread
32 def __init__(self
, maxsize
=0):
34 maxsize
= _multiprocessing
.SemLock
.SEM_VALUE_MAX
35 self
._maxsize
= maxsize
36 self
._reader
, self
._writer
= Pipe(duplex
=False)
38 self
._opid
= os
.getpid()
39 if sys
.platform
== 'win32':
43 self
._sem
= BoundedSemaphore(maxsize
)
47 if sys
.platform
!= 'win32':
48 register_after_fork(self
, Queue
._after
_fork
)
50 def __getstate__(self
):
52 return (self
._maxsize
, self
._reader
, self
._writer
,
53 self
._rlock
, self
._wlock
, self
._sem
, self
._opid
)
55 def __setstate__(self
, state
):
56 (self
._maxsize
, self
._reader
, self
._writer
,
57 self
._rlock
, self
._wlock
, self
._sem
, self
._opid
) = state
60 def _after_fork(self
):
61 debug('Queue._after_fork()')
62 self
._notempty
= threading
.Condition(threading
.Lock())
63 self
._buffer
= collections
.deque()
65 self
._jointhread
= None
66 self
._joincancelled
= False
69 self
._send
= self
._writer
.send
70 self
._recv
= self
._reader
.recv
71 self
._poll
= self
._reader
.poll
73 def put(self
, obj
, block
=True, timeout
=None):
74 assert not self
._closed
75 if not self
._sem
.acquire(block
, timeout
):
78 self
._notempty
.acquire()
80 if self
._thread
is None:
82 self
._buffer
.append(obj
)
83 self
._notempty
.notify()
85 self
._notempty
.release()
87 def get(self
, block
=True, timeout
=None):
88 if block
and timeout
is None:
99 deadline
= time
.time() + timeout
100 if not self
._rlock
.acquire(block
, timeout
):
103 if not self
._poll
(block
and (deadline
-time
.time()) or 0.0):
109 self
._rlock
.release()
112 # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
113 return self
._maxsize
- self
._sem
._semlock
._get
_value
()
116 return not self
._poll
()
119 return self
._sem
._semlock
._is
_zero
()
def get_nowait(self):
    """Fetch an item without blocking.

    Thin convenience wrapper: forwards to get() with a false ``block``
    argument and returns whatever get() returns.
    """
    block = False
    return self.get(block)
def put_nowait(self, obj):
    """Enqueue *obj* without blocking.

    Thin convenience wrapper: forwards to put() with a false ``block``
    argument and returns whatever put() returns.
    """
    block = False
    return self.put(obj, block)
133 def join_thread(self
):
134 debug('Queue.join_thread()')
139 def cancel_join_thread(self
):
140 debug('Queue.cancel_join_thread()')
141 self
._joincancelled
= True
143 self
._jointhread
.cancel()
144 except AttributeError:
147 def _start_thread(self
):
148 debug('Queue._start_thread()')
150 # Start thread which transfers data from buffer to pipe
152 self
._thread
= threading
.Thread(
154 args
=(self
._buffer
, self
._notempty
, self
._send
,
155 self
._wlock
, self
._writer
.close
),
156 name
='QueueFeederThread'
158 self
._thread
.daemon
= True
160 debug('doing self._thread.start()')
162 debug('... done self._thread.start()')
164 # On process exit we will wait for data to be flushed to pipe.
166 # However, if this process created the queue then all
167 # processes which use the queue will be descendants of this
168 # process. Therefore waiting for the queue to be flushed
169 # is pointless once all the child processes have been joined.
170 created_by_this_process
= (self
._opid
== os
.getpid())
171 if not self
._joincancelled
and not created_by_this_process
:
172 self
._jointhread
= Finalize(
173 self
._thread
, Queue
._finalize
_join
,
174 [weakref
.ref(self
._thread
)],
178 # Send sentinel to the thread queue object when garbage collected
179 self
._close
= Finalize(
180 self
, Queue
._finalize
_close
,
181 [self
._buffer
, self
._notempty
],
186 def _finalize_join(twr
):
187 debug('joining queue thread')
189 if thread
is not None:
191 debug('... queue thread joined')
193 debug('... queue thread already dead')
196 def _finalize_close(buffer, notempty
):
197 debug('telling queue thread to quit')
200 buffer.append(_sentinel
)
206 def _feed(buffer, notempty
, send
, writelock
, close
):
207 debug('starting thread to feed data to pipe')
208 from .util
import is_exiting
210 nacquire
= notempty
.acquire
211 nrelease
= notempty
.release
212 nwait
= notempty
.wait
213 bpopleft
= buffer.popleft
215 if sys
.platform
!= 'win32':
216 wacquire
= writelock
.acquire
217 wrelease
= writelock
.release
233 debug('feeder thread got sentinel -- exiting')
248 # Since this runs in a daemon thread the resources it uses
249 # may become unusable while the process is cleaning up.
250 # We ignore errors which happen after the process has
251 # started to cleanup.
254 info('error in queue thread: %s', e
)
257 traceback
.print_exc()
264 # A queue type which also supports join() and task_done() methods
266 # Note that if you do not call task_done() for each finished task then
267 # eventually the counter's semaphore may overflow causing Bad Things
271 class JoinableQueue(Queue
):
def __init__(self, maxsize=0):
    """Initialise the base queue plus the unfinished-task bookkeeping.

    maxsize is handed to Queue.__init__ unchanged.
    """
    Queue.__init__(self, maxsize)
    # Counter of unfinished tasks; it starts at zero and put() releases
    # it once for every item that is enqueued.
    unfinished = Semaphore(0)
    self._unfinished_tasks = unfinished
    # Condition that is notified once the counter is back at zero.
    self._cond = Condition()
def __getstate__(self):
    """Return the picklable state: the base Queue state tuple followed by
    the condition and the unfinished-task semaphore (in that order).
    """
    base_state = Queue.__getstate__(self)
    # Keep this order in sync with __setstate__, which reads the last two
    # entries back as (_cond, _unfinished_tasks).
    extra_state = (self._cond, self._unfinished_tasks)
    return base_state + extra_state
def __setstate__(self, state):
    """Restore the queue from a tuple produced by __getstate__.

    The last two entries are this class's own fields; everything before
    them is passed on to Queue.__setstate__.
    """
    base_state = state[:-2]
    extra_state = state[-2:]
    Queue.__setstate__(self, base_state)
    self._cond, self._unfinished_tasks = extra_state
def put(self, item, block=True, timeout=None):
    """Enqueue *item* and count it as one more unfinished task.

    block and timeout control how long to wait for a free slot; they are
    passed straight to the slot semaphore's acquire().

    Raises Full when no slot could be acquired.

    The item is appended to the buffer and the unfinished-task counter is
    incremented while both the buffer lock and the join condition are
    held.  Incrementing only after the item had already been handed over
    (as a plain "Queue.put(); release()" sequence does) lets a fast
    consumer fetch the item and call task_done() before the counter is
    bumped, which then fails with 'task_done() called too many times'.
    """
    assert not self._closed
    if not self._sem.acquire(block, timeout):
        raise Full

    self._notempty.acquire()
    self._cond.acquire()
    try:
        # The feeder thread is created lazily on the first put.
        if self._thread is None:
            self._start_thread()
        self._buffer.append(item)
        # Count the task before the feeder thread can be woken up to
        # forward the item to a consumer.
        self._unfinished_tasks.release()
        self._notempty.notify()
    finally:
        self._cond.release()
        self._notempty.release()
292 if not self
._unfinished
_tasks
.acquire(False):
293 raise ValueError('task_done() called too many times')
294 if self
._unfinished
_tasks
._semlock
._is
_zero
():
295 self
._cond
.notify_all()
302 if not self
._unfinished
_tasks
._semlock
._is
_zero
():
308 # Simplified Queue type -- really just a locked pipe
311 class SimpleQueue(object):
314 self
._reader
, self
._writer
= Pipe(duplex
=False)
316 if sys
.platform
== 'win32':
323 return not self
._reader
.poll()
def __getstate__(self):
    """Return the picklable state as (reader, writer, rlock, wlock).

    assert_spawning() is called first; presumably it rejects pickling
    outside of process spawning -- confirm against multiprocessing.forking.
    """
    assert_spawning(self)
    pipe_ends = (self._reader, self._writer)
    locks = (self._rlock, self._wlock)
    return pipe_ends + locks
329 def __setstate__(self
, state
):
330 (self
._reader
, self
._writer
, self
._rlock
, self
._wlock
) = state
333 def _make_methods(self
):
334 recv
= self
._reader
.recv
335 racquire
, rrelease
= self
._rlock
.acquire
, self
._rlock
.release
344 if self
._wlock
is None:
345 # writes to a message oriented win32 pipe are atomic
346 self
.put
= self
._writer
.send
348 send
= self
._writer
.send
349 wacquire
, wrelease
= self
._wlock
.acquire
, self
._wlock
.release