# Copyright (C) 2009 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

17 """A manager of caches."""
19 import atexit
20 import os
21 import shutil
22 import tempfile
23 import weakref
25 from bzrlib import lru_cache, trace
26 from bzrlib.plugins.fastimport import branch_mapper, helpers
class _Cleanup(object):
    """This class makes sure we clean up when CacheManager goes away.

    We use a helper class to ensure that we are never in a refcycle.
    """

    def __init__(self, disk_blobs):
        self.disk_blobs = disk_blobs
        self.tempdir = None
        self.small_blobs = None

    def __del__(self):
        self.finalize()

    def finalize(self):
        if self.disk_blobs is not None:
            for info in self.disk_blobs.itervalues():
                if info[-1] is not None:
                    os.unlink(info[-1])
            self.disk_blobs = None
        if self.small_blobs is not None:
            self.small_blobs.close()
            self.small_blobs = None
        if self.tempdir is not None:
            shutil.rmtree(self.tempdir)


class CacheManager(object):

    _small_blob_threshold = 25*1024
    _sticky_cache_size = 300*1024*1024
    _sticky_flushed_size = 100*1024*1024

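    # How these limits interact (summarising the logic below; the figures
    # are just the defaults above): sticky blobs are kept in memory until
    # the cache exceeds 300MB, at which point blobs are flushed to disk
    # until only 100MB remain. Blobs under 25KB are appended to one shared
    # temporary file; larger ones each get their own temp file.
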
    def __init__(self, info=None, verbose=False, inventory_cache_size=10):
        """Create a manager of caches.

        :param info: a ConfigObj holding the output from
            the --info processor, or None if no hints are available
        """
        self.verbose = verbose

        # dataref -> data. dataref is either :mark or the sha-1.
        # Sticky blobs are referenced more than once, and are saved until
        # their refcount goes to 0
        self._blobs = {}
        self._sticky_blobs = {}
        self._sticky_memory_bytes = 0
        # if we overflow our memory cache, then we will dump large blobs to
        # disk in this directory
        self._tempdir = None
        # id => (offset, n_bytes, fname)
        # if fname is None, then the content is stored in the small file
        self._disk_blobs = {}
        self._cleanup = _Cleanup(self._disk_blobs)

        # revision-id -> Inventory cache
        # these are large and we probably don't need too many as
        # most parents are recent in history
        self.inventories = lru_cache.LRUCache(inventory_cache_size)

        # import commit-ids -> revision-id lookup table
        # we need to keep all of these but they are small
        self.revision_ids = {}

        # (path, branch_ref) -> file-ids - as generated.
        # (Use store_file_id/fetch_fileid methods rather than direct access.)

        # Head tracking: last ref, last id per ref & map of commit ids to ref*s*
        self.last_ref = None
        self.last_ids = {}
        self.heads = {}

        # Work out the blobs to make sticky - None means all
        self._blob_ref_counts = {}
        if info is not None:
            try:
                blobs_by_counts = info['Blob reference counts']
                # The parser hands values back as lists, already parsed
                for count, blob_list in blobs_by_counts.items():
                    n = int(count)
                    for b in blob_list:
                        self._blob_ref_counts[b] = n
            except KeyError:
                # info not in file - possible when no blobs used
                pass

        # BranchMapper has no state (for now?), but we keep it around rather
        # than reinstantiate on every usage
        self.branch_mapper = branch_mapper.BranchMapper()

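    # A typical lifecycle, as driven by the fastimport processors (an
    # illustrative sketch - the ':1' mark and the data are made up):
    #
    #   cm = CacheManager(info=None, verbose=True)
    #   cm.store_blob(':1', 'file contents')   # on each BlobCommand
    #   parents = cm.track_heads(cmd)          # on each CommitCommand
    #   data = cm.fetch_blob(':1')             # when a filemodify uses :1
    #   cm.dump_stats()                        # once the import is done
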
    def dump_stats(self, note=trace.note):
        """Dump some statistics about what we cached."""
        # TODO: add in inventory statistics
        note("Cache statistics:")
        self._show_stats_for(self._sticky_blobs, "sticky blobs", note=note)
        self._show_stats_for(self.revision_ids, "revision-ids", note=note)
        # These aren't interesting so omit from the output, at least for now
        #self._show_stats_for(self._blobs, "other blobs", note=note)
        #self._show_stats_for(self.last_ids, "last-ids", note=note)
        #self._show_stats_for(self.heads, "heads", note=note)

    def _show_stats_for(self, dict, label, note=trace.note, tuple_key=False):
        """Dump statistics about a given dictionary.

        Both the key and value need to support len().
        """
        count = len(dict)
        if tuple_key:
            size = sum(map(len, (''.join(k) for k in dict.keys())))
        else:
            size = sum(map(len, dict.keys()))
        size += sum(map(len, dict.values()))
        size = size * 1.0 / 1024
        unit = 'K'
        if size > 1024:
            size = size / 1024
            unit = 'M'
        if size > 1024:
            size = size / 1024
            unit = 'G'
        note(" %-12s: %8.1f %s (%d %s)" % (label, size, unit, count,
            helpers.single_plural(count, "item", "items")))

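    # For example (figures made up), 456 sticky blobs totalling about
    # 13.2MB would be reported as:
    #    sticky blobs:     13.2 M (456 items)
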
    def clear_all(self):
        """Free up any memory used by the caches."""
        self._blobs.clear()
        self._sticky_blobs.clear()
        self.revision_ids.clear()
        self.last_ids.clear()
        self.heads.clear()
        self.inventories.clear()

    def _flush_blobs_to_disk(self):
        blobs = self._sticky_blobs.keys()
        sticky_blobs = self._sticky_blobs
        total_blobs = len(sticky_blobs)
        blobs.sort(key=lambda k: len(sticky_blobs[k]))
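        # Sorted ascending by size, so blobs.pop() in the loop below hands
        # back the largest blob still in memory - flushing big blobs first
        # frees the most memory per file written.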
        if self._tempdir is None:
            tempdir = tempfile.mkdtemp(prefix='bzr_fastimport_blobs-')
            self._tempdir = tempdir
            self._cleanup.tempdir = self._tempdir
            self._cleanup.small_blobs = tempfile.TemporaryFile(
                prefix='small-blobs-', dir=self._tempdir)
            small_blob_ref = weakref.ref(self._cleanup.small_blobs)
            # Even though we add it to _Cleanup it seems that the object can be
            # destroyed 'too late' for cleanup to actually occur. Probably a
            # combination of bzr's "die directly, don't clean up" and how
            # exceptions close the running stack.
            def exit_cleanup():
                small_blob = small_blob_ref()
                if small_blob is not None:
                    small_blob.close()
                shutil.rmtree(tempdir, ignore_errors=True)
            atexit.register(exit_cleanup)
        count = 0
        bytes = 0
        n_small_bytes = 0
        while self._sticky_memory_bytes > self._sticky_flushed_size:
            id = blobs.pop()
            blob = self._sticky_blobs.pop(id)
            n_bytes = len(blob)
            self._sticky_memory_bytes -= n_bytes
            if n_bytes < self._small_blob_threshold:
                f = self._cleanup.small_blobs
                f.seek(0, os.SEEK_END)
                self._disk_blobs[id] = (f.tell(), n_bytes, None)
                f.write(blob)
                n_small_bytes += n_bytes
            else:
                fd, name = tempfile.mkstemp(prefix='blob-', dir=self._tempdir)
                os.write(fd, blob)
                os.close(fd)
                self._disk_blobs[id] = (0, n_bytes, name)
            bytes += n_bytes
            del blob
            count += 1
        trace.note('flushed %d/%d blobs w/ %.1fMB (%.1fMB small) to disk'
                   % (count, total_blobs, bytes / 1024. / 1024,
                      n_small_bytes / 1024. / 1024))

    def store_blob(self, id, data):
        """Store a blob of data."""
        # Note: If we're not reference counting, everything has to be sticky
        if not self._blob_ref_counts or id in self._blob_ref_counts:
            self._sticky_blobs[id] = data
            self._sticky_memory_bytes += len(data)
            if self._sticky_memory_bytes > self._sticky_cache_size:
                self._flush_blobs_to_disk()
        elif data == '':
            # Empty data is always sticky
            self._sticky_blobs[id] = data
        else:
            self._blobs[id] = data

    def _decref(self, id, cache, fn):
        if not self._blob_ref_counts:
            return False
        count = self._blob_ref_counts.get(id, None)
        if count is not None:
            count -= 1
            if count <= 0:
                del cache[id]
                if fn is not None:
                    os.unlink(fn)
                del self._blob_ref_counts[id]
                return True
            else:
                self._blob_ref_counts[id] = count
        return False

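    # _decref returns True only when the refcount for id drops to zero, at
    # which point the blob is removed from the given cache (and its temp
    # file, if any, is unlinked). E.g. with _blob_ref_counts[':1'] == 2,
    # the first fetch leaves a count of 1 and the second evicts the blob.
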
    def fetch_blob(self, id):
        """Fetch a blob of data."""
        if id in self._blobs:
            return self._blobs.pop(id)
        if id in self._disk_blobs:
            (offset, n_bytes, fn) = self._disk_blobs[id]
            if fn is None:
                f = self._cleanup.small_blobs
                f.seek(offset)
                content = f.read(n_bytes)
            else:
                fp = open(fn, 'rb')
                try:
                    content = fp.read()
                finally:
                    fp.close()
            self._decref(id, self._disk_blobs, fn)
            return content
        content = self._sticky_blobs[id]
        if self._decref(id, self._sticky_blobs, None):
            self._sticky_memory_bytes -= len(content)
        return content

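    # Note the asymmetry in fetch_blob: non-sticky blobs are popped on
    # first use, while sticky blobs (in memory or on disk) survive until
    # _decref reports that their final reference has been consumed.
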
    def track_heads(self, cmd):
        """Track the repository heads given a CommitCommand.

        :param cmd: the CommitCommand
        :return: the list of parents in terms of commit-ids
        """
        # Get the true set of parents
        if cmd.from_ is not None:
            parents = [cmd.from_]
        else:
            last_id = self.last_ids.get(cmd.ref)
            if last_id is not None:
                parents = [last_id]
            else:
                parents = []
        parents.extend(cmd.merges)

        # Track the heads
        self.track_heads_for_ref(cmd.ref, cmd.id, parents)
        return parents

    def track_heads_for_ref(self, cmd_ref, cmd_id, parents=None):
        if parents is not None:
            for parent in parents:
                if parent in self.heads:
                    del self.heads[parent]
        self.heads.setdefault(cmd_id, set()).add(cmd_ref)
        self.last_ids[cmd_ref] = cmd_id
        self.last_ref = cmd_ref

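    # Worked example (marks and ref are made up): after commits :1 and :2
    # on 'refs/heads/master', where :2 has parent :1, track_heads_for_ref
    # drops :1 from self.heads, leaving
    #   heads == {':2': set(['refs/heads/master'])}
    #   last_ids == {'refs/heads/master': ':2'}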