2 # Module which supports allocation of memory from an mmap
4 # multiprocessing/heap.py
6 # Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
17 import _multiprocessing
18 from multiprocessing
.util
import Finalize
, info
19 from multiprocessing
.forking
import assert_spawning
21 __all__
= ['BufferWrapper']
24 # Inheirtable class which wraps an mmap, and from which blocks can be allocated
27 if sys
.platform
== 'win32':
29 from ._multiprocessing
import win32
33 _counter
= itertools
.count()
35 def __init__(self
, size
):
37 self
.name
= 'pym-%d-%d' % (os
.getpid(), Arena
._counter
.next())
38 self
.buffer = mmap
.mmap(-1, self
.size
, tagname
=self
.name
)
39 assert win32
.GetLastError() == 0, 'tagname already in use'
40 self
._state
= (self
.size
, self
.name
)
42 def __getstate__(self
):
46 def __setstate__(self
, state
):
47 self
.size
, self
.name
= self
._state
= state
48 self
.buffer = mmap
.mmap(-1, self
.size
, tagname
=self
.name
)
49 assert win32
.GetLastError() == win32
.ERROR_ALREADY_EXISTS
55 def __init__(self
, size
):
56 self
.buffer = mmap
.mmap(-1, size
)
61 # Class allowing allocation of chunks of memory from arenas
68 def __init__(self
, size
=mmap
.PAGESIZE
):
69 self
._lastpid
= os
.getpid()
70 self
._lock
= threading
.Lock()
74 self
._start
_to
_block
= {}
75 self
._stop
_to
_block
= {}
76 self
._allocated
_blocks
= set()
80 def _roundup(n
, alignment
):
81 # alignment must be a power of 2
83 return (n
+ mask
) & ~mask
85 def _malloc(self
, size
):
86 # returns a large enough block -- it might be much larger
87 i
= bisect
.bisect_left(self
._lengths
, size
)
88 if i
== len(self
._lengths
):
89 length
= self
._roundup
(max(self
._size
, size
), mmap
.PAGESIZE
)
91 info('allocating a new mmap of length %d', length
)
93 self
._arenas
.append(arena
)
94 return (arena
, 0, length
)
96 length
= self
._lengths
[i
]
97 seq
= self
._len
_to
_seq
[length
]
100 del self
._len
_to
_seq
[length
], self
._lengths
[i
]
102 (arena
, start
, stop
) = block
103 del self
._start
_to
_block
[(arena
, start
)]
104 del self
._stop
_to
_block
[(arena
, stop
)]
107 def _free(self
, block
):
108 # free location and try to merge with neighbours
109 (arena
, start
, stop
) = block
112 prev_block
= self
._stop
_to
_block
[(arena
, start
)]
116 start
, _
= self
._absorb
(prev_block
)
119 next_block
= self
._start
_to
_block
[(arena
, stop
)]
123 _
, stop
= self
._absorb
(next_block
)
125 block
= (arena
, start
, stop
)
126 length
= stop
- start
129 self
._len
_to
_seq
[length
].append(block
)
131 self
._len
_to
_seq
[length
] = [block
]
132 bisect
.insort(self
._lengths
, length
)
134 self
._start
_to
_block
[(arena
, start
)] = block
135 self
._stop
_to
_block
[(arena
, stop
)] = block
137 def _absorb(self
, block
):
138 # deregister this block so it can be merged with a neighbour
139 (arena
, start
, stop
) = block
140 del self
._start
_to
_block
[(arena
, start
)]
141 del self
._stop
_to
_block
[(arena
, stop
)]
143 length
= stop
- start
144 seq
= self
._len
_to
_seq
[length
]
147 del self
._len
_to
_seq
[length
]
148 self
._lengths
.remove(length
)
152 def free(self
, block
):
153 # free a block returned by malloc()
154 assert os
.getpid() == self
._lastpid
157 self
._allocated
_blocks
.remove(block
)
162 def malloc(self
, size
):
163 # return a block of right size (possibly rounded up)
164 assert 0 <= size
< sys
.maxint
165 if os
.getpid() != self
._lastpid
:
166 self
.__init
__() # reinitialize after fork
169 size
= self
._roundup
(max(size
,1), self
._alignment
)
170 (arena
, start
, stop
) = self
._malloc
(size
)
171 new_stop
= start
+ size
173 self
._free
((arena
, new_stop
, stop
))
174 block
= (arena
, start
, new_stop
)
175 self
._allocated
_blocks
.add(block
)
181 # Class representing a chunk of an mmap -- can be inherited
184 class BufferWrapper(object):
188 def __init__(self
, size
):
189 assert 0 <= size
< sys
.maxint
190 block
= BufferWrapper
._heap
.malloc(size
)
191 self
._state
= (block
, size
)
192 Finalize(self
, BufferWrapper
._heap
.free
, args
=(block
,))
194 def get_address(self
):
195 (arena
, start
, stop
), size
= self
._state
196 address
, length
= _multiprocessing
.address_of_buffer(arena
.buffer)
197 assert size
<= length
198 return address
+ start
201 return self
._state
[1]