Add NEWS entry as per RDM's suggestion (the bug was actually present
[python.git] / Lib / multiprocessing / heap.py
blob7e596ca70fa78e67576e38534f265c9b94112ba7
2 # Module which supports allocation of memory from an mmap
4 # multiprocessing/heap.py
6 # Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
9 import bisect
10 import mmap
11 import tempfile
12 import os
13 import sys
14 import threading
15 import itertools
17 import _multiprocessing
18 from multiprocessing.util import Finalize, info
19 from multiprocessing.forking import assert_spawning
21 __all__ = ['BufferWrapper']
24 # Inheirtable class which wraps an mmap, and from which blocks can be allocated
27 if sys.platform == 'win32':
29 from ._multiprocessing import win32
31 class Arena(object):
33 _counter = itertools.count()
35 def __init__(self, size):
36 self.size = size
37 self.name = 'pym-%d-%d' % (os.getpid(), Arena._counter.next())
38 self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
39 assert win32.GetLastError() == 0, 'tagname already in use'
40 self._state = (self.size, self.name)
42 def __getstate__(self):
43 assert_spawning(self)
44 return self._state
46 def __setstate__(self, state):
47 self.size, self.name = self._state = state
48 self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
49 assert win32.GetLastError() == win32.ERROR_ALREADY_EXISTS
51 else:
53 class Arena(object):
55 def __init__(self, size):
56 self.buffer = mmap.mmap(-1, size)
57 self.size = size
58 self.name = None
61 # Class allowing allocation of chunks of memory from arenas
64 class Heap(object):
66 _alignment = 8
68 def __init__(self, size=mmap.PAGESIZE):
69 self._lastpid = os.getpid()
70 self._lock = threading.Lock()
71 self._size = size
72 self._lengths = []
73 self._len_to_seq = {}
74 self._start_to_block = {}
75 self._stop_to_block = {}
76 self._allocated_blocks = set()
77 self._arenas = []
79 @staticmethod
80 def _roundup(n, alignment):
81 # alignment must be a power of 2
82 mask = alignment - 1
83 return (n + mask) & ~mask
85 def _malloc(self, size):
86 # returns a large enough block -- it might be much larger
87 i = bisect.bisect_left(self._lengths, size)
88 if i == len(self._lengths):
89 length = self._roundup(max(self._size, size), mmap.PAGESIZE)
90 self._size *= 2
91 info('allocating a new mmap of length %d', length)
92 arena = Arena(length)
93 self._arenas.append(arena)
94 return (arena, 0, length)
95 else:
96 length = self._lengths[i]
97 seq = self._len_to_seq[length]
98 block = seq.pop()
99 if not seq:
100 del self._len_to_seq[length], self._lengths[i]
102 (arena, start, stop) = block
103 del self._start_to_block[(arena, start)]
104 del self._stop_to_block[(arena, stop)]
105 return block
107 def _free(self, block):
108 # free location and try to merge with neighbours
109 (arena, start, stop) = block
111 try:
112 prev_block = self._stop_to_block[(arena, start)]
113 except KeyError:
114 pass
115 else:
116 start, _ = self._absorb(prev_block)
118 try:
119 next_block = self._start_to_block[(arena, stop)]
120 except KeyError:
121 pass
122 else:
123 _, stop = self._absorb(next_block)
125 block = (arena, start, stop)
126 length = stop - start
128 try:
129 self._len_to_seq[length].append(block)
130 except KeyError:
131 self._len_to_seq[length] = [block]
132 bisect.insort(self._lengths, length)
134 self._start_to_block[(arena, start)] = block
135 self._stop_to_block[(arena, stop)] = block
137 def _absorb(self, block):
138 # deregister this block so it can be merged with a neighbour
139 (arena, start, stop) = block
140 del self._start_to_block[(arena, start)]
141 del self._stop_to_block[(arena, stop)]
143 length = stop - start
144 seq = self._len_to_seq[length]
145 seq.remove(block)
146 if not seq:
147 del self._len_to_seq[length]
148 self._lengths.remove(length)
150 return start, stop
152 def free(self, block):
153 # free a block returned by malloc()
154 assert os.getpid() == self._lastpid
155 self._lock.acquire()
156 try:
157 self._allocated_blocks.remove(block)
158 self._free(block)
159 finally:
160 self._lock.release()
162 def malloc(self, size):
163 # return a block of right size (possibly rounded up)
164 assert 0 <= size < sys.maxint
165 if os.getpid() != self._lastpid:
166 self.__init__() # reinitialize after fork
167 self._lock.acquire()
168 try:
169 size = self._roundup(max(size,1), self._alignment)
170 (arena, start, stop) = self._malloc(size)
171 new_stop = start + size
172 if new_stop < stop:
173 self._free((arena, new_stop, stop))
174 block = (arena, start, new_stop)
175 self._allocated_blocks.add(block)
176 return block
177 finally:
178 self._lock.release()
181 # Class representing a chunk of an mmap -- can be inherited
184 class BufferWrapper(object):
186 _heap = Heap()
188 def __init__(self, size):
189 assert 0 <= size < sys.maxint
190 block = BufferWrapper._heap.malloc(size)
191 self._state = (block, size)
192 Finalize(self, BufferWrapper._heap.free, args=(block,))
194 def get_address(self):
195 (arena, start, stop), size = self._state
196 address, length = _multiprocessing.address_of_buffer(arena.buffer)
197 assert size <= length
198 return address + start
200 def get_size(self):
201 return self._state[1]