Fixed a wrong apostrophe
[python.git] / Lib / multiprocessing / sharedctypes.py
blob76b5e94b65bd3cf0140614beaa2ded9cbce0a9c1
# Module which supports allocation of ctypes objects from shared memory
# multiprocessing/sharedctypes.py
# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
9 import sys
10 import ctypes
11 import weakref
13 from multiprocessing import heap, RLock
14 from multiprocessing.forking import assert_spawning, ForkingPickler
# Names exported by `from multiprocessing.sharedctypes import *`.
__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
# Maps one-character type codes to the ctypes type they stand for.
# RawValue() and RawArray() look their first argument up here and fall
# back to using the argument itself when it is not one of these keys.
typecode_to_type = {
    'c': ctypes.c_char,  'u': ctypes.c_wchar,
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int,   'I': ctypes.c_uint,
    'l': ctypes.c_long,  'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double
    }
def _new_value(type_):
    '''
    Return a new instance of the ctypes type `type_` whose storage is a
    freshly allocated `heap.BufferWrapper` of `ctypes.sizeof(type_)` bytes.

    The contents of the buffer are not initialised here.
    '''
    size = ctypes.sizeof(type_)
    wrapper = heap.BufferWrapper(size)
    # length=None: `type_` is already the complete type to instantiate
    return rebuild_ctype(type_, wrapper, None)
def RawValue(typecode_or_type, *args):
    '''
    Returns a ctypes object allocated from shared memory

    `typecode_or_type` is either a key of `typecode_to_type` or a ctypes
    type; `*args` are passed on to the object's `__init__`.
    '''
    type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
    obj = _new_value(type_)
    # Zero the whole buffer before initialising, so that anything which
    # __init__ does not set reads as zero.
    ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
    obj.__init__(*args)
    return obj
def RawArray(typecode_or_type, size_or_initializer):
    '''
    Returns a ctypes array allocated from shared memory

    `typecode_or_type` is either a key of `typecode_to_type` or a ctypes
    type giving the element type.  If `size_or_initializer` is an int it
    is the length of the array, which is returned zero-filled; otherwise
    it is a sequence whose items initialise an array of the same length.
    '''
    type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
    if isinstance(size_or_initializer, int):
        type_ = type_ * size_or_initializer
        obj = _new_value(type_)
        # _new_value() does not initialise the buffer, so zero it
        # explicitly -- the same thing RawValue() does for scalars.
        ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
        return obj
    else:
        type_ = type_ * len(size_or_initializer)
        result = _new_value(type_)
        # every element is supplied by the initializer, so no zeroing needed
        result.__init__(*size_or_initializer)
        return result
def Value(typecode_or_type, *args, **kwds):
    '''
    Return a synchronization wrapper for a Value

    The only keyword argument accepted is `lock`:
      - False: return the bare object from RawValue(), unsynchronized;
      - True or None (the default): wrap it using a new RLock();
      - anything else: used as the lock; must have an `acquire` attribute.

    Raises ValueError for any other keyword argument and AttributeError
    if the supplied lock has no `acquire` attribute.
    '''
    lock = kwds.pop('lock', None)
    if kwds:
        raise ValueError('unrecognized keyword argument(s): %s' % list(kwds))
    if lock is False:
        return RawValue(typecode_or_type, *args)
    # Settle on a lock before allocating, so that a bad `lock` argument
    # fails without first grabbing a block of shared memory.
    if lock in (True, None):
        lock = RLock()
    if not hasattr(lock, 'acquire'):
        # wrap in a 1-tuple so that a tuple `lock` formats as one value
        raise AttributeError("'%r' has no method 'acquire'" % (lock,))
    obj = RawValue(typecode_or_type, *args)
    return synchronized(obj, lock)
def Array(typecode_or_type, size_or_initializer, **kwds):
    '''
    Return a synchronization wrapper for a RawArray

    The only keyword argument accepted is `lock`:
      - False: return the bare array from RawArray(), unsynchronized;
      - True or None (the default): wrap it using a new RLock();
      - anything else: used as the lock; must have an `acquire` attribute.

    Raises ValueError for any other keyword argument and AttributeError
    if the supplied lock has no `acquire` attribute.
    '''
    lock = kwds.pop('lock', None)
    if kwds:
        raise ValueError('unrecognized keyword argument(s): %s' % list(kwds))
    if lock is False:
        return RawArray(typecode_or_type, size_or_initializer)
    # Settle on a lock before allocating, so that a bad `lock` argument
    # fails without first grabbing a block of shared memory.
    if lock in (True, None):
        lock = RLock()
    if not hasattr(lock, 'acquire'):
        # wrap in a 1-tuple so that a tuple `lock` formats as one value
        raise AttributeError("'%r' has no method 'acquire'" % (lock,))
    obj = RawArray(typecode_or_type, size_or_initializer)
    return synchronized(obj, lock)
def copy(obj):
    '''
    Return a new object of the same ctypes type as `obj`, allocated with
    _new_value() and holding a copy of `obj`'s contents
    '''
    new_obj = _new_value(type(obj))
    # storing through a pointer to new_obj copies obj into its buffer
    ctypes.pointer(new_obj)[0] = obj
    return new_obj
def synchronized(obj, lock=None):
    '''
    Return a wrapper for the ctypes object `obj` which uses `lock` to
    synchronize access

    The wrapper class depends on the kind of object:
      - simple ctypes values get a `Synchronized`;
      - arrays of c_char get a `SynchronizedString`;
      - other arrays get a `SynchronizedArray`;
      - anything else is expected to have `_fields_`; a SynchronizedBase
        subclass with one locking property per field is generated and
        remembered in `class_cache`.
    '''
    assert not isinstance(obj, SynchronizedBase), 'object already synchronized'

    if isinstance(obj, ctypes._SimpleCData):
        return Synchronized(obj, lock)
    elif isinstance(obj, ctypes.Array):
        if obj._type_ is ctypes.c_char:
            return SynchronizedString(obj, lock)
        return SynchronizedArray(obj, lock)
    else:
        cls = type(obj)
        try:
            scls = class_cache[cls]
        except KeyError:
            # first time this type is seen: build its wrapper class
            names = [field[0] for field in cls._fields_]
            d = dict((name, make_property(name)) for name in names)
            classname = 'Synchronized' + cls.__name__
            scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
        return scls(obj, lock)
# Functions for pickling/unpickling
def reduce_ctype(obj):
    '''
    Reduction function registered with ForkingPickler by rebuild_ctype()

    Returns `rebuild_ctype` plus the arguments needed to recreate `obj`
    on top of the same `_wrapper` buffer.
    '''
    # NOTE(review): presumably this raises unless a child process is
    # currently being spawned -- confirm in multiprocessing.forking.
    assert_spawning(obj)
    if isinstance(obj, ctypes.Array):
        # send element type and length; rebuild_ctype() re-creates the
        # array type from them
        return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
    else:
        return rebuild_ctype, (type(obj), obj._wrapper, None)
def rebuild_ctype(type_, wrapper, length):
    '''
    Create a ctypes object located at the address of `wrapper`

    If `length` is not None then `type_` is an element type and the
    object created is an array of `length` such elements; otherwise the
    object is a `type_`.
    '''
    if length is not None:
        type_ = type_ * length
    # make sure objects of this (possibly just created) type are pickled
    # using reduce_ctype()
    ForkingPickler.register(type_, reduce_ctype)
    obj = type_.from_address(wrapper.get_address())
    # reduce_ctype() reads this attribute back when pickling the object
    obj._wrapper = wrapper
    return obj
# Function to create properties
def make_property(name):
    '''
    Return a property which reads and writes `self._obj.<name>`, calling
    `self.acquire()` before and `self.release()` after each access

    The property for a given name is created once and then reused from
    `prop_cache`.
    '''
    try:
        return prop_cache[name]
    except KeyError:
        d = {}
        # Use the call form of exec: it is accepted by Python 2 as well as
        # Python 3, where the `exec ... in d` statement is a syntax error.
        exec(template % ((name,)*7), d)
        prop_cache[name] = d[name]
        return d[name]

# Source code of the getter, the setter and the property built from them.
# All seven %s slots are filled with the attribute name.
template = '''
def get%s(self):
    self.acquire()
    try:
        return self._obj.%s
    finally:
        self.release()
def set%s(self, value):
    self.acquire()
    try:
        self._obj.%s = value
    finally:
        self.release()
%s = property(get%s, set%s)
'''

# attribute name -> property created by make_property()
prop_cache = {}
# ctypes class -> wrapper class generated by synchronized(); the keys are
# held weakly
class_cache = weakref.WeakKeyDictionary()
# Synchronized wrappers
class SynchronizedBase(object):
    '''
    Base class of the wrappers returned by synchronized()

    Holds the wrapped object and a lock, and exposes the lock's `acquire`
    and `release` methods as attributes of the wrapper.  The wrapper can
    also be used in a `with` statement to hold the lock for a block.
    '''

    def __init__(self, obj, lock=None):
        self._obj = obj
        # a false `lock` (None by default) means: create a new RLock
        self._lock = lock or RLock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __enter__(self):
        # Built on acquire()/release() rather than the lock's own context
        # manager methods, because Value() and Array() only check that a
        # supplied lock has an `acquire` attribute.
        return self.acquire()

    def __exit__(self, *exc_info):
        # returns None, so an exception raised in the block propagates
        self.release()

    def __reduce__(self):
        # NOTE(review): presumably this raises unless a child process is
        # currently being spawned -- confirm in multiprocessing.forking.
        assert_spawning(self)
        return synchronized, (self._obj, self._lock)

    def get_obj(self):
        '''Return the wrapped object'''
        return self._obj

    def get_lock(self):
        '''Return the lock used for synchronization'''
        return self._lock

    def __repr__(self):
        return '<%s wrapper for %s>' % (type(self).__name__, self._obj)
class Synchronized(SynchronizedBase):
    '''
    Wrapper used by synchronized() for simple ctypes values
    '''
    # reads and writes self._obj.value between acquire() and release()
    value = make_property('value')
class SynchronizedArray(SynchronizedBase):
    '''
    Wrapper used by synchronized() for ctypes arrays

    Item and slice access is forwarded to the wrapped array between
    acquire() and release(); len() is forwarded without locking.
    '''

    def __len__(self):
        return len(self._obj)

    def __getitem__(self, i):
        self.acquire()
        try:
            return self._obj[i]
        finally:
            self.release()

    def __setitem__(self, i, value):
        self.acquire()
        try:
            self._obj[i] = value
        finally:
            self.release()

    # __getslice__ and __setslice__ are the Python 2 hooks for simple
    # `a[start:stop]` slicing.

    def __getslice__(self, start, stop):
        self.acquire()
        try:
            return self._obj[start:stop]
        finally:
            self.release()

    def __setslice__(self, start, stop, values):
        self.acquire()
        try:
            self._obj[start:stop] = values
        finally:
            self.release()
class SynchronizedString(SynchronizedArray):
    '''
    Wrapper used by synchronized() for arrays of c_char

    Adds locked access to the array's `value` and `raw` attributes.
    '''
    value = make_property('value')
    raw = make_property('raw')