Silence the DeprecationWarning raised by importing mimetools in BaseHTTPServer.
[python.git] / Lib / multiprocessing / synchronize.py
blob 1ebd7b62e64b0b2b5d3980477180b84f87e2798b
2 # Module implementing synchronization primitives
4 # multiprocessing/synchronize.py
6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
# Public names exported by ``from multiprocessing.synchronize import *``.
__all__ = [
    'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
    ]
13 import threading
14 import os
15 import sys
17 from time import time as _time, sleep as _sleep
19 import _multiprocessing
20 from multiprocessing.process import current_process
21 from multiprocessing.util import Finalize, register_after_fork, debug
22 from multiprocessing.forking import assert_spawning, Popen
25 # Constants
# Values passed as the ``kind`` argument of `_multiprocessing.SemLock`.
RECURSIVE_MUTEX = 0
SEMAPHORE = 1

# Largest value a semaphore may hold, as reported by the C extension.
SEM_VALUE_MAX = getattr(_multiprocessing.SemLock, 'SEM_VALUE_MAX')
32 # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
class SemLock(object):
    """Base class for semaphores and mutexes.

    Wraps a `_multiprocessing.SemLock` object (stored as ``self._semlock``)
    and forwards ``acquire``, ``release`` and the context-manager protocol
    to it.
    """

    def __init__(self, kind, value, maxvalue):
        """Create the underlying semlock.

        `kind`, `value` and `maxvalue` are handed unchanged to
        `_multiprocessing.SemLock`.
        """
        sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
        debug('created semlock with handle %s' % sl.handle)
        self._make_methods()

        if sys.platform != 'win32':
            # On non-Windows platforms, have the wrapped semlock's own
            # `_after_fork()` hook run via `register_after_fork`.
            def _after_fork(obj):
                obj._semlock._after_fork()
            register_after_fork(self, _after_fork)

    def _make_methods(self):
        """Bind the wrapped semlock's acquire/release onto this instance."""
        self.acquire = self._semlock.acquire
        self.release = self._semlock.release

    # __enter__/__exit__ must be defined on the class: the `with` statement
    # looks special methods up on the type, not the instance, so assigning
    # them as instance attributes makes `with lock:` fail.
    def __enter__(self):
        """Enter the wrapped semlock's context (delegates to `_semlock`)."""
        return self._semlock.__enter__()

    def __exit__(self, *args):
        """Leave the wrapped semlock's context (delegates to `_semlock`)."""
        return self._semlock.__exit__(*args)

    def __getstate__(self):
        """Return ``(handle_for_child, kind, maxvalue)`` for pickling.

        `assert_spawning(self)` is called first -- presumably it rejects
        pickling outside of process spawning; confirm in
        `multiprocessing.forking`.
        """
        assert_spawning(self)
        sl = self._semlock
        return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)

    def __setstate__(self, state):
        """Rebuild the wrapped semlock from the tuple made by __getstate__."""
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        debug('recreated blocker with handle %r' % state[0])
        self._make_methods()
64 # Semaphore
class Semaphore(SemLock):
    """Semaphore whose maximum value is `SEM_VALUE_MAX`."""

    def __init__(self, value=1):
        """Create a semaphore with initial counter `value`."""
        SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)

    def get_value(self):
        """Return the value reported by the wrapped semlock."""
        semlock = self._semlock
        return semlock._get_value()

    def __repr__(self):
        # Fall back to a placeholder when the value cannot be read.
        try:
            current = self._semlock._get_value()
        except Exception:
            current = 'unknown'
        text = '<Semaphore(value=%s)>' % current
        return text
83 # Bounded semaphore
class BoundedSemaphore(Semaphore):
    """Semaphore whose maximum value equals its initial value."""

    def __init__(self, value=1):
        """Create the semaphore with `value` as both initial and max value."""
        # Calls SemLock.__init__ directly so that maxvalue is `value`
        # rather than the SEM_VALUE_MAX used by Semaphore.__init__.
        SemLock.__init__(self, SEMAPHORE, value, value)

    def __repr__(self):
        # Fall back to a placeholder when the value cannot be read.
        try:
            current = self._semlock._get_value()
        except Exception:
            current = 'unknown'
        fields = (current, self._semlock.maxvalue)
        return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % fields
100 # Non-recursive lock
class Lock(SemLock):
    """Non-recursive lock: a semaphore with initial and max value 1."""

    def __init__(self):
        SemLock.__init__(self, SEMAPHORE, 1, 1)

    def __repr__(self):
        """Return ``<Lock(owner=...)>`` describing who holds the lock."""
        try:
            semlock = self._semlock
            if semlock._is_mine():
                # Held here: report the process name, plus the thread name
                # unless this is the main thread.
                owner = current_process().get_name()
                thread_name = threading.current_thread().get_name()
                if thread_name != 'MainThread':
                    owner = owner + '|' + thread_name
            elif semlock._get_value() == 1:
                owner = 'None'
            elif semlock._count() > 0:
                owner = 'SomeOtherThread'
            else:
                owner = 'SomeOtherProcess'
        except Exception:
            owner = 'unknown'
        return '<Lock(owner=%s)>' % owner
125 # Recursive lock
class RLock(SemLock):
    """Recursive lock: a RECURSIVE_MUTEX semlock with value and max 1."""

    def __init__(self):
        SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)

    def __repr__(self):
        """Return ``<RLock(owner, count)>`` describing the lock's holder."""
        try:
            semlock = self._semlock
            if semlock._is_mine():
                # Held here: report the process name, plus the thread name
                # unless this is the main thread, and the semlock's count.
                owner = current_process().get_name()
                thread_name = threading.current_thread().get_name()
                if thread_name != 'MainThread':
                    owner = owner + '|' + thread_name
                depth = semlock._count()
            elif semlock._get_value() == 1:
                owner, depth = 'None', 0
            elif semlock._count() > 0:
                owner, depth = 'SomeOtherThread', 'nonzero'
            else:
                owner, depth = 'SomeOtherProcess', 'nonzero'
        except Exception:
            owner, depth = 'unknown', 'unknown'
        return '<RLock(%s, %s)>' % (owner, depth)
151 # Condition variable
class Condition(object):
    """Condition variable built from a lock and three semaphores.

    ``_sleeping_count`` is released by each waiter before it sleeps,
    ``_woken_count`` is released by each waiter when it wakes, and
    ``_wait_semaphore`` is what waiters block on.
    """

    def __init__(self, lock=None):
        """Create the condition; a new `RLock` is used if `lock` is falsy."""
        self._lock = lock or RLock()
        self._sleeping_count = Semaphore(0)
        self._woken_count = Semaphore(0)
        self._wait_semaphore = Semaphore(0)
        self._make_methods()

    def __getstate__(self):
        """Return the lock and the three semaphores for pickling."""
        assert_spawning(self)
        return (self._lock, self._sleeping_count,
                self._woken_count, self._wait_semaphore)

    def __setstate__(self, state):
        """Restore the lock and semaphores saved by __getstate__."""
        (self._lock, self._sleeping_count,
         self._woken_count, self._wait_semaphore) = state
        self._make_methods()

    def _make_methods(self):
        """Bind the underlying lock's acquire/release onto this instance."""
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    # __enter__/__exit__ must be defined on the class: the `with` statement
    # looks special methods up on the type, not the instance, so assigning
    # them as instance attributes makes `with cond:` fail.
    def __enter__(self):
        """Enter the underlying lock's context."""
        return self._lock.__enter__()

    def __exit__(self, *args):
        """Leave the underlying lock's context."""
        return self._lock.__exit__(*args)

    def __repr__(self):
        """Return ``<Condition(lock, num_waiters)>``."""
        try:
            num_waiters = (self._sleeping_count._semlock._get_value() -
                           self._woken_count._semlock._get_value())
        except Exception:
            num_waiters = 'unknown'
        return '<Condition(%s, %s)>' % (self._lock, num_waiters)

    def wait(self, timeout=None):
        """Release the lock, block until notified or `timeout`, reacquire.

        The lock must be held by the caller (checked with an assert).
        """
        assert self._lock._semlock._is_mine(), \
               'must acquire() condition before using wait()'

        # indicate that this thread is going to sleep
        self._sleeping_count.release()

        # release lock
        count = self._lock._semlock._count()
        for i in xrange(count):
            self._lock.release()

        try:
            # wait for notification or timeout
            self._wait_semaphore.acquire(True, timeout)
        finally:
            # indicate that this thread has woken
            self._woken_count.release()

            # reacquire lock
            for i in xrange(count):
                self._lock.acquire()

    def notify(self):
        """Wake at most one waiter; the lock must be held by the caller."""
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(False)

        # to take account of timeouts since last notify() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res

        if self._sleeping_count.acquire(False): # try grabbing a sleeper
            self._wait_semaphore.release()      # wake up one sleeper
            self._woken_count.acquire()         # wait for the sleeper to wake

            # rezero _wait_semaphore in case a timeout just happened
            self._wait_semaphore.acquire(False)

    def notify_all(self):
        """Wake every current waiter; the lock must be held by the caller."""
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(False)

        # to take account of timeouts since last notify*() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res

        sleepers = 0
        while self._sleeping_count.acquire(False):
            self._wait_semaphore.release()        # wake up one sleeper
            sleepers += 1

        if sleepers:
            for i in xrange(sleepers):
                self._woken_count.acquire()       # wait for a sleeper to wake

            # rezero wait_semaphore in case some timeouts just happened
            while self._wait_semaphore.acquire(False):
                pass
251 # Event
class Event(object):
    """Event flag built from a `Condition` and a `Semaphore`.

    The flag counts as set when ``_flag`` can be acquired without
    blocking; ``_cond`` guards every access to it.
    """

    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = Semaphore(0)

    def is_set(self):
        """Return True if the flag is currently set, else False."""
        self._cond.acquire()
        try:
            # Probe the semaphore and put the unit back if we got it.
            if self._flag.acquire(False):
                self._flag.release()
                return True
            return False
        finally:
            self._cond.release()

    def set(self):
        """Set the flag and wake every thread blocked in wait()."""
        self._cond.acquire()
        try:
            # Drain first so repeated set() calls never push the
            # semaphore above 1.
            self._flag.acquire(False)
            self._flag.release()
            self._cond.notify_all()
        finally:
            self._cond.release()

    def clear(self):
        """Reset the flag."""
        self._cond.acquire()
        try:
            self._flag.acquire(False)
        finally:
            self._cond.release()

    def wait(self, timeout=None):
        """Block until the flag is set or `timeout` has passed.

        Returns True if the flag is set when the call returns and False
        otherwise, so callers can tell a timeout apart from the flag
        being set.
        """
        self._cond.acquire()
        try:
            if self._flag.acquire(False):
                self._flag.release()
            else:
                self._cond.wait(timeout)

            # Re-check after waking: the wait may have ended by timeout.
            if self._flag.acquire(False):
                self._flag.release()
                return True
            return False
        finally:
            self._cond.release()