2 # Module implementing synchronization primitives
4 # multiprocessing/synchronize.py
6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
10 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
17 from time
import time
as _time
, sleep
as _sleep
19 import _multiprocessing
20 from multiprocessing
.process
import current_process
21 from multiprocessing
.util
import Finalize
, register_after_fork
, debug
22 from multiprocessing
.forking
import assert_spawning
, Popen
28 RECURSIVE_MUTEX
, SEMAPHORE
= range(2)
29 SEM_VALUE_MAX
= _multiprocessing
.SemLock
.SEM_VALUE_MAX
32 # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
35 class SemLock(object):
37 def __init__(self
, kind
, value
, maxvalue
):
38 sl
= self
._semlock
= _multiprocessing
.SemLock(kind
, value
, maxvalue
)
39 debug('created semlock with handle %s' % sl
.handle
)
42 if sys
.platform
!= 'win32':
44 obj
._semlock
._after
_fork
()
45 register_after_fork(self
, _after_fork
)
47 def _make_methods(self
):
48 self
.acquire
= self
._semlock
.acquire
49 self
.release
= self
._semlock
.release
50 self
.__enter
__ = self
._semlock
.__enter
__
51 self
.__exit
__ = self
._semlock
.__exit
__
53 def __getstate__(self
):
56 return (Popen
.duplicate_for_child(sl
.handle
), sl
.kind
, sl
.maxvalue
)
58 def __setstate__(self
, state
):
59 self
._semlock
= _multiprocessing
.SemLock
._rebuild
(*state
)
60 debug('recreated blocker with handle %r' % state
[0])
67 class Semaphore(SemLock
):
69 def __init__(self
, value
=1):
70 SemLock
.__init
__(self
, SEMAPHORE
, value
, SEM_VALUE_MAX
)
73 return self
._semlock
._get
_value
()
77 value
= self
._semlock
._get
_value
()
80 return '<Semaphore(value=%s)>' % value
86 class BoundedSemaphore(Semaphore
):
88 def __init__(self
, value
=1):
89 SemLock
.__init
__(self
, SEMAPHORE
, value
, value
)
93 value
= self
._semlock
._get
_value
()
96 return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \
97 (value
, self
._semlock
.maxvalue
)
106 SemLock
.__init
__(self
, SEMAPHORE
, 1, 1)
110 if self
._semlock
._is
_mine
():
111 name
= current_process().get_name()
112 if threading
.current_thread().get_name() != 'MainThread':
113 name
+= '|' + threading
.current_thread().get_name()
114 elif self
._semlock
._get
_value
() == 1:
116 elif self
._semlock
._count
() > 0:
117 name
= 'SomeOtherThread'
119 name
= 'SomeOtherProcess'
122 return '<Lock(owner=%s)>' % name
128 class RLock(SemLock
):
131 SemLock
.__init
__(self
, RECURSIVE_MUTEX
, 1, 1)
135 if self
._semlock
._is
_mine
():
136 name
= current_process().get_name()
137 if threading
.current_thread().get_name() != 'MainThread':
138 name
+= '|' + threading
.current_thread().get_name()
139 count
= self
._semlock
._count
()
140 elif self
._semlock
._get
_value
() == 1:
141 name
, count
= 'None', 0
142 elif self
._semlock
._count
() > 0:
143 name
, count
= 'SomeOtherThread', 'nonzero'
145 name
, count
= 'SomeOtherProcess', 'nonzero'
147 name
, count
= 'unknown', 'unknown'
148 return '<RLock(%s, %s)>' % (name
, count
)
154 class Condition(object):
156 def __init__(self
, lock
=None):
157 self
._lock
= lock
or RLock()
158 self
._sleeping
_count
= Semaphore(0)
159 self
._woken
_count
= Semaphore(0)
160 self
._wait
_semaphore
= Semaphore(0)
163 def __getstate__(self
):
164 assert_spawning(self
)
165 return (self
._lock
, self
._sleeping
_count
,
166 self
._woken
_count
, self
._wait
_semaphore
)
168 def __setstate__(self
, state
):
169 (self
._lock
, self
._sleeping
_count
,
170 self
._woken
_count
, self
._wait
_semaphore
) = state
173 def _make_methods(self
):
174 self
.acquire
= self
._lock
.acquire
175 self
.release
= self
._lock
.release
176 self
.__enter
__ = self
._lock
.__enter
__
177 self
.__exit
__ = self
._lock
.__exit
__
181 num_waiters
= (self
._sleeping
_count
._semlock
._get
_value
() -
182 self
._woken
_count
._semlock
._get
_value
())
184 num_waiters
= 'unkown'
185 return '<Condition(%s, %s)>' % (self
._lock
, num_waiters
)
187 def wait(self
, timeout
=None):
188 assert self
._lock
._semlock
._is
_mine
(), \
189 'must acquire() condition before using wait()'
191 # indicate that this thread is going to sleep
192 self
._sleeping
_count
.release()
195 count
= self
._lock
._semlock
._count
()
196 for i
in xrange(count
):
200 # wait for notification or timeout
201 self
._wait
_semaphore
.acquire(True, timeout
)
203 # indicate that this thread has woken
204 self
._woken
_count
.release()
207 for i
in xrange(count
):
211 assert self
._lock
._semlock
._is
_mine
(), 'lock is not owned'
212 assert not self
._wait
_semaphore
.acquire(False)
214 # to take account of timeouts since last notify() we subtract
215 # woken_count from sleeping_count and rezero woken_count
216 while self
._woken
_count
.acquire(False):
217 res
= self
._sleeping
_count
.acquire(False)
220 if self
._sleeping
_count
.acquire(False): # try grabbing a sleeper
221 self
._wait
_semaphore
.release() # wake up one sleeper
222 self
._woken
_count
.acquire() # wait for the sleeper to wake
224 # rezero _wait_semaphore in case a timeout just happened
225 self
._wait
_semaphore
.acquire(False)
227 def notify_all(self
):
228 assert self
._lock
._semlock
._is
_mine
(), 'lock is not owned'
229 assert not self
._wait
_semaphore
.acquire(False)
231 # to take account of timeouts since last notify*() we subtract
232 # woken_count from sleeping_count and rezero woken_count
233 while self
._woken
_count
.acquire(False):
234 res
= self
._sleeping
_count
.acquire(False)
238 while self
._sleeping
_count
.acquire(False):
239 self
._wait
_semaphore
.release() # wake up one sleeper
243 for i
in xrange(sleepers
):
244 self
._woken
_count
.acquire() # wait for a sleeper to wake
246 # rezero wait_semaphore in case some timeouts just happened
247 while self
._wait
_semaphore
.acquire(False):
257 self
._cond
= Condition(Lock())
258 self
._flag
= Semaphore(0)
263 if self
._flag
.acquire(False):
273 self
._flag
.acquire(False)
275 self
._cond
.notify_all()
282 self
._flag
.acquire(False)
286 def wait(self
, timeout
=None):
289 if self
._flag
.acquire(False):
292 self
._cond
.wait(timeout
)