2 # Module implementing synchronization primitives
4 # multiprocessing/synchronize.py
6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
10 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
17 from time
import time
as _time
, sleep
as _sleep
19 import _multiprocessing
20 from multiprocessing
.process
import current_process
21 from multiprocessing
.util
import Finalize
, register_after_fork
, debug
22 from multiprocessing
.forking
import assert_spawning
, Popen
24 # Try to import the mp.synchronize module cleanly, if it fails
25 # raise ImportError for platforms lacking a working sem_open implementation.
28 from _multiprocessing
import SemLock
30 raise ImportError("This platform lacks a functioning sem_open" +
31 " implementation, therefore, the required" +
32 " synchronization primitives needed will not" +
33 " function, see issue 3770.")
# Values passed as the `kind` argument of `_multiprocessing.SemLock`:
# the two names are bound to the consecutive integers 0 and 1.
RECURSIVE_MUTEX = 0
SEMAPHORE = 1

# Upper bound for a semaphore value, read from `_multiprocessing.SemLock`.
SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
43 # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
46 class SemLock(object):
48 def __init__(self
, kind
, value
, maxvalue
):
49 sl
= self
._semlock
= _multiprocessing
.SemLock(kind
, value
, maxvalue
)
50 debug('created semlock with handle %s' % sl
.handle
)
53 if sys
.platform
!= 'win32':
55 obj
._semlock
._after
_fork
()
56 register_after_fork(self
, _after_fork
)
58 def _make_methods(self
):
59 self
.acquire
= self
._semlock
.acquire
60 self
.release
= self
._semlock
.release
63 return self
._semlock
.__enter
__()
65 def __exit__(self
, *args
):
66 return self
._semlock
.__exit
__(*args
)
68 def __getstate__(self
):
71 return (Popen
.duplicate_for_child(sl
.handle
), sl
.kind
, sl
.maxvalue
)
73 def __setstate__(self
, state
):
74 self
._semlock
= _multiprocessing
.SemLock
._rebuild
(*state
)
75 debug('recreated blocker with handle %r' % state
[0])
82 class Semaphore(SemLock
):
def __init__(self, value=1):
    """Initialise the base `SemLock` as a semaphore.

    `value` (default 1) is passed through as the SemLock value, and the
    module-level `SEM_VALUE_MAX` is used as the maximum value.
    """
    SemLock.__init__(self, kind=SEMAPHORE, value=value,
                     maxvalue=SEM_VALUE_MAX)
88 return self
._semlock
._get
_value
()
92 value
= self
._semlock
._get
_value
()
95 return '<Semaphore(value=%s)>' % value
101 class BoundedSemaphore(Semaphore
):
def __init__(self, value=1):
    """Initialise the base `SemLock` as a semaphore bounded by `value`.

    `value` (default 1) is passed both as the SemLock value and as its
    maximum value.
    """
    SemLock.__init__(self, kind=SEMAPHORE, value=value, maxvalue=value)
108 value
= self
._semlock
._get
_value
()
111 return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \
112 (value
, self
._semlock
.maxvalue
)
121 SemLock
.__init
__(self
, SEMAPHORE
, 1, 1)
125 if self
._semlock
._is
_mine
():
126 name
= current_process().name
127 if threading
.current_thread().name
!= 'MainThread':
128 name
+= '|' + threading
.current_thread().name
129 elif self
._semlock
._get
_value
() == 1:
131 elif self
._semlock
._count
() > 0:
132 name
= 'SomeOtherThread'
134 name
= 'SomeOtherProcess'
137 return '<Lock(owner=%s)>' % name
143 class RLock(SemLock
):
146 SemLock
.__init
__(self
, RECURSIVE_MUTEX
, 1, 1)
150 if self
._semlock
._is
_mine
():
151 name
= current_process().name
152 if threading
.current_thread().name
!= 'MainThread':
153 name
+= '|' + threading
.current_thread().name
154 count
= self
._semlock
._count
()
155 elif self
._semlock
._get
_value
() == 1:
156 name
, count
= 'None', 0
157 elif self
._semlock
._count
() > 0:
158 name
, count
= 'SomeOtherThread', 'nonzero'
160 name
, count
= 'SomeOtherProcess', 'nonzero'
162 name
, count
= 'unknown', 'unknown'
163 return '<RLock(%s, %s)>' % (name
, count
)
169 class Condition(object):
171 def __init__(self
, lock
=None):
172 self
._lock
= lock
or RLock()
173 self
._sleeping
_count
= Semaphore(0)
174 self
._woken
_count
= Semaphore(0)
175 self
._wait
_semaphore
= Semaphore(0)
def __getstate__(self):
    """Return the state tuple used when this condition is pickled.

    `assert_spawning(self)` is called first.
    NOTE(review): presumably it refuses pickling outside of process
    spawning -- confirm in `multiprocessing.forking`.

    The tuple holds, in order, `_lock`, `_sleeping_count`,
    `_woken_count` and `_wait_semaphore`; `__setstate__` unpacks the
    same four fields in the same order.
    """
    assert_spawning(self)
    state = (
        self._lock,
        self._sleeping_count,
        self._woken_count,
        self._wait_semaphore,
    )
    return state
183 def __setstate__(self
, state
):
184 (self
._lock
, self
._sleeping
_count
,
185 self
._woken
_count
, self
._wait
_semaphore
) = state
189 return self
._lock
.__enter
__()
191 def __exit__(self
, *args
):
192 return self
._lock
.__exit
__(*args
)
194 def _make_methods(self
):
195 self
.acquire
= self
._lock
.acquire
196 self
.release
= self
._lock
.release
200 num_waiters
= (self
._sleeping
_count
._semlock
._get
_value
() -
201 self
._woken
_count
._semlock
._get
_value
())
203 num_waiters
= 'unkown'
204 return '<Condition(%s, %s)>' % (self
._lock
, num_waiters
)
206 def wait(self
, timeout
=None):
207 assert self
._lock
._semlock
._is
_mine
(), \
208 'must acquire() condition before using wait()'
210 # indicate that this thread is going to sleep
211 self
._sleeping
_count
.release()
214 count
= self
._lock
._semlock
._count
()
215 for i
in xrange(count
):
219 # wait for notification or timeout
220 self
._wait
_semaphore
.acquire(True, timeout
)
222 # indicate that this thread has woken
223 self
._woken
_count
.release()
226 for i
in xrange(count
):
230 assert self
._lock
._semlock
._is
_mine
(), 'lock is not owned'
231 assert not self
._wait
_semaphore
.acquire(False)
233 # to take account of timeouts since last notify() we subtract
234 # woken_count from sleeping_count and rezero woken_count
235 while self
._woken
_count
.acquire(False):
236 res
= self
._sleeping
_count
.acquire(False)
239 if self
._sleeping
_count
.acquire(False): # try grabbing a sleeper
240 self
._wait
_semaphore
.release() # wake up one sleeper
241 self
._woken
_count
.acquire() # wait for the sleeper to wake
243 # rezero _wait_semaphore in case a timeout just happened
244 self
._wait
_semaphore
.acquire(False)
246 def notify_all(self
):
247 assert self
._lock
._semlock
._is
_mine
(), 'lock is not owned'
248 assert not self
._wait
_semaphore
.acquire(False)
250 # to take account of timeouts since last notify*() we subtract
251 # woken_count from sleeping_count and rezero woken_count
252 while self
._woken
_count
.acquire(False):
253 res
= self
._sleeping
_count
.acquire(False)
257 while self
._sleeping
_count
.acquire(False):
258 self
._wait
_semaphore
.release() # wake up one sleeper
262 for i
in xrange(sleepers
):
263 self
._woken
_count
.acquire() # wait for a sleeper to wake
265 # rezero wait_semaphore in case some timeouts just happened
266 while self
._wait
_semaphore
.acquire(False):
276 self
._cond
= Condition(Lock())
277 self
._flag
= Semaphore(0)
282 if self
._flag
.acquire(False):
292 self
._flag
.acquire(False)
294 self
._cond
.notify_all()
301 self
._flag
.acquire(False)
305 def wait(self
, timeout
=None):
308 if self
._flag
.acquire(False):
311 self
._cond
.wait(timeout
)
313 if self
._flag
.acquire(False):