This should finally fix #6896. Let's watch the buildbots.
[python.git] / Demo / metaclasses / Synch.py
blob80e52d9fd411ef4f1917ce161c390f394619f8f4
1 """Synchronization metaclass.
3 This metaclass makes it possible to declare synchronized methods.
5 """
7 import thread
# First we need to define a reentrant lock.
# This is generally useful and should probably be in a standard Python
# library module.  For now, we in-line it.
class Lock:

    """Reentrant lock.

    This is a mutex-like object which can be acquired by the same
    thread more than once.  It keeps a reference count of the number
    of times it has been acquired by the same thread.  Each acquire()
    call must be matched by a release() call and only the last
    release() call actually releases the lock for acquisition by
    another thread.

    The implementation uses two locks internally:

    __mutex is a short term lock used to protect the instance variables
    __wait is the lock for which other threads wait

    A thread intending to acquire both locks should acquire __wait
    first.

    The implementation uses two other instance variables, protected by
    locking __mutex:

    __tid is the thread ID of the thread that currently has the lock
    __count is the number of times the current thread has acquired it

    When the lock is released, __tid is None and __count is zero.

    """

    def __init__(self):
        """Constructor.  Initialize all instance variables."""
        self.__mutex = thread.allocate_lock()
        self.__wait = thread.allocate_lock()
        self.__tid = None
        self.__count = 0

    def acquire(self, flag=1):
        """Acquire the lock.

        If the optional flag argument is false, returns immediately
        when it cannot acquire the __wait lock without blocking (it
        may still block for a little while in order to acquire the
        __mutex lock).

        The return value is only relevant when the flag argument is
        false; it is 1 if the lock is acquired, 0 if not.

        """
        me = thread.get_ident()

        # Re-entrant case: the calling thread already owns the lock, so
        # only the count needs to be bumped.
        self.__mutex.acquire()
        try:
            if self.__tid == me:
                self.__count = self.__count + 1
                return 1
        finally:
            self.__mutex.release()

        # Not the owner: wait (or poll, when flag is false) on __wait.
        # __mutex is deliberately not held here, so that the owner can
        # still get at the instance variables in release().
        locked = self.__wait.acquire(flag)
        if not flag and not locked:
            return 0

        # Take __mutex *before* entering the try block, so the finally
        # clause never releases a mutex that was not actually acquired.
        self.__mutex.acquire()
        try:
            # Internal invariants: holding __wait means nobody owns us.
            assert self.__tid is None
            assert self.__count == 0
            self.__tid = me
            self.__count = 1
            return 1
        finally:
            self.__mutex.release()

    def release(self):
        """Release the lock.

        If this thread doesn't currently have the lock, an assertion
        error is raised.

        Only allow another thread to acquire the lock when the count
        reaches zero after decrementing it.

        """
        self.__mutex.acquire()
        try:
            # Raise explicitly instead of using the assert statement:
            # these checks are part of the documented contract and must
            # not disappear when Python runs with -O.
            if self.__tid != thread.get_ident():
                raise AssertionError(
                    "release() called by a thread that does not hold the lock")
            if self.__count <= 0:
                raise AssertionError(
                    "release() called more often than acquire()")
            self.__count = self.__count - 1
            if self.__count == 0:
                # Last matching release(): give up ownership and let a
                # waiting thread through.
                self.__tid = None
                self.__wait.release()
        finally:
            self.__mutex.release()
103 def _testLock():
105 done = []
107 def f2(lock, done=done):
108 lock.acquire()
109 print "f2 running in thread %d\n" % thread.get_ident(),
110 lock.release()
111 done.append(1)
113 def f1(lock, f2=f2, done=done):
114 lock.acquire()
115 print "f1 running in thread %d\n" % thread.get_ident(),
116 try:
117 f2(lock)
118 finally:
119 lock.release()
120 done.append(1)
122 lock = Lock()
123 lock.acquire()
124 f1(lock) # Adds 2 to done
125 lock.release()
127 lock.acquire()
129 thread.start_new_thread(f1, (lock,)) # Adds 2
130 thread.start_new_thread(f1, (lock, f1)) # Adds 3
131 thread.start_new_thread(f2, (lock,)) # Adds 1
132 thread.start_new_thread(f2, (lock,)) # Adds 1
134 lock.release()
135 import time
136 while len(done) < 9:
137 print len(done)
138 time.sleep(0.001)
139 print len(done)
# Now, the Locking metaclass is a piece of cake.
# As an example feature, methods whose name begins with exactly one
# underscore are not synchronized.
146 from Meta import MetaClass, MetaHelper, MetaMethodWrapper
class LockingMethodWrapper(MetaMethodWrapper):

    # Method wrapper that runs the wrapped function while holding the
    # instance's __lock__, except for methods whose name begins with
    # exactly one underscore, which are called without locking.
    #
    # NOTE(review): self.func, self.inst and self.__name__ are presumably
    # set up by MetaMethodWrapper in the Meta module -- confirm there.

    def __call__(self, *args, **kw):
        name = self.__name__
        # "Exactly one leading underscore": the first character is '_'
        # and the second one is not.  Compare name[1:2], not name[1:],
        # otherwise names such as __init__ would skip the lock as well.
        if name[:1] == '_' and name[1:2] != '_':
            return self.func(self.inst, *args, **kw)
        # Look the lock up once, so the object released is the object
        # that was acquired.
        lock = self.inst.__lock__
        lock.acquire()
        try:
            return self.func(self.inst, *args, **kw)
        finally:
            lock.release()
class LockingHelper(MetaHelper):

    # Helper that routes method calls through LockingMethodWrapper and
    # owns the lock which that wrapper acquires.
    # NOTE(review): how MetaHelper uses __methodwrapper__ is defined in
    # the Meta module, not here -- confirm there.
    __methodwrapper__ = LockingMethodWrapper

    def __helperinit__(self, formalclass):
        # Do the base class initialization first, then attach a fresh
        # reentrant Lock to this helper.
        MetaHelper.__helperinit__(self, formalclass)
        self.__lock__ = Lock()
class LockingMetaClass(MetaClass):

    # Metaclass that selects LockingHelper as its helper class.
    # NOTE(review): presumably MetaClass instantiates __helper__ for each
    # new instance -- confirm in the Meta module.
    __helper__ = LockingHelper

# Base class to inherit from in order to get synchronized methods
# (see the Buffer class in _test() for an example).
Locking = LockingMetaClass('Locking', (), {})
169 def _test():
170 # For kicks, take away the Locking base class and see it die
171 class Buffer(Locking):
172 def __init__(self, initialsize):
173 assert initialsize > 0
174 self.size = initialsize
175 self.buffer = [None]*self.size
176 self.first = self.last = 0
177 def put(self, item):
178 # Do we need to grow the buffer?
179 if (self.last+1) % self.size != self.first:
180 # Insert the new item
181 self.buffer[self.last] = item
182 self.last = (self.last+1) % self.size
183 return
184 # Double the buffer size
185 # First normalize it so that first==0 and last==size-1
186 print "buffer =", self.buffer
187 print "first = %d, last = %d, size = %d" % (
188 self.first, self.last, self.size)
189 if self.first <= self.last:
190 temp = self.buffer[self.first:self.last]
191 else:
192 temp = self.buffer[self.first:] + self.buffer[:self.last]
193 print "temp =", temp
194 self.buffer = temp + [None]*(self.size+1)
195 self.first = 0
196 self.last = self.size-1
197 self.size = self.size*2
198 print "Buffer size doubled to", self.size
199 print "new buffer =", self.buffer
200 print "first = %d, last = %d, size = %d" % (
201 self.first, self.last, self.size)
202 self.put(item) # Recursive call to test the locking
203 def get(self):
204 # Is the buffer empty?
205 if self.first == self.last:
206 raise EOFError # Avoid defining a new exception
207 item = self.buffer[self.first]
208 self.first = (self.first+1) % self.size
209 return item
211 def producer(buffer, wait, n=1000):
212 import time
213 i = 0
214 while i < n:
215 print "put", i
216 buffer.put(i)
217 i = i+1
218 print "Producer: done producing", n, "items"
219 wait.release()
221 def consumer(buffer, wait, n=1000):
222 import time
223 i = 0
224 tout = 0.001
225 while i < n:
226 try:
227 x = buffer.get()
228 if x != i:
229 raise AssertionError, \
230 "get() returned %s, expected %s" % (x, i)
231 print "got", i
232 i = i+1
233 tout = 0.001
234 except EOFError:
235 time.sleep(tout)
236 tout = tout*2
237 print "Consumer: done consuming", n, "items"
238 wait.release()
240 pwait = thread.allocate_lock()
241 pwait.acquire()
242 cwait = thread.allocate_lock()
243 cwait.acquire()
244 buffer = Buffer(1)
245 n = 1000
246 thread.start_new_thread(consumer, (buffer, cwait, n))
247 thread.start_new_thread(producer, (buffer, pwait, n))
248 pwait.acquire()
249 print "Producer done"
250 cwait.acquire()
251 print "All done"
252 print "buffer size ==", len(buffer.buffer)
# When run as a script, run the lock self-test followed by the
# producer/consumer demo.
if __name__ == '__main__':
    for demo in (_testLock, _test):
        demo()