# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA

"""Import processor that supports all Bazaar repository formats."""


import time
from bzrlib import (
    bzrdir,
    delta,
    errors,
    osutils,
    progress,
    )
from bzrlib.repofmt import pack_repo
from bzrlib.trace import note, mutter
try:
    import bzrlib.util.configobj.configobj as configobj
except ImportError:
    import configobj
from bzrlib.plugins.fastimport import (
    branch_updater,
    bzr_commit_handler,
    cache_manager,
    commands,
    errors as plugin_errors,
    helpers,
    idmapfile,
    marks_file,
    processor,
    revision_store,
    )


# How many commits before automatically reporting progress
_DEFAULT_AUTO_PROGRESS = 1000

# How many commits before automatically checkpointing
_DEFAULT_AUTO_CHECKPOINT = 10000

# How many checkpoints before automatically packing
_DEFAULT_AUTO_PACK = 4

# How many inventories to cache
_DEFAULT_INV_CACHE_SIZE = 1
_DEFAULT_CHK_INV_CACHE_SIZE = 1


class GenericProcessor(processor.ImportProcessor):
    """An import processor that handles basic imports.

    Current features supported:

    * blobs are cached in memory
    * files and symlinks in commits are supported
    * checkpoints automatically happen at a configurable frequency
      over and above the stream requested checkpoints
    * timestamped progress reporting, both automatic and stream requested
    * some basic statistics are dumped on completion.

    At checkpoints and on completion, the commit-id -> revision-id map is
    saved to a file called 'fastimport-id-map'. If the import crashes
    or is interrupted, it can be started again and this file will be
    used to skip over already loaded revisions. The format of each line
    is "commit-id revision-id" so commit-ids cannot include spaces.

    Here are the supported parameters:

    * info - name of a hints file holding the analysis generated
      by running the fast-import-info processor in verbose mode. When
      importing large repositories, this parameter is needed so
      that the importer knows what blobs to intelligently cache.

    * trees - update the working trees before completing.
      By default, the importer updates the repository
      and branches and the user needs to run 'bzr update' for the
      branches of interest afterwards.

    * count - only import this many commits then exit. If not set
      or negative, all commits are imported.

    * checkpoint - automatically checkpoint every n commits over and
      above any checkpoints contained in the import stream.
      The default is 10000.

    * autopack - pack every n checkpoints. The default is 4.

    * inv-cache - number of inventories to cache.
      If not set, the default is 1.

    * mode - import algorithm to use: default, experimental or classic.

    * import-marks - name of file to read to load mark information from.

    * export-marks - name of file to write to save mark information to.
    """

    known_params = [
        'info',
        'trees',
        'count',
        'checkpoint',
        'autopack',
        'inv-cache',
        'mode',
        'import-marks',
        'export-marks',
        ]

    def __init__(self, bzrdir, params=None, verbose=False, outf=None,
            prune_empty_dirs=True):
        processor.ImportProcessor.__init__(self, bzrdir, params, verbose)
        self.prune_empty_dirs = prune_empty_dirs

    def pre_process(self):
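        """Prepare for the import: load parameters, set up caches and the
        revision store, disable autopacking and open the first write group."""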
        self._start_time = time.time()
        self._load_info_and_params()
        if self.total_commits:
            self.note("Starting import of %d commits ..." %
                (self.total_commits,))
        else:
            self.note("Starting import ...")
        self.cache_mgr = cache_manager.CacheManager(self.info, self.verbose,
            self.inventory_cache_size)

        if self.params.get("import-marks") is not None:
            mark_info = marks_file.import_marks(self.params.get("import-marks"))
            if mark_info is not None:
                self.cache_mgr.revision_ids = mark_info[0]
            self.skip_total = False
            self.first_incremental_commit = True
        else:
            self.first_incremental_commit = False
            self.skip_total = self._init_id_map()
            if self.skip_total:
                self.note("Found %d commits already loaded - "
                    "skipping over these ...", self.skip_total)
        self._revision_count = 0

        # mapping of tag name to revision_id
        self.tags = {}

        # Create the revision store to use for committing, if any
        self.rev_store = self._revision_store_factory()

        # Disable autopacking if the repo format supports it.
        # THIS IS A HACK - there is no sanctioned way of doing this yet.
        if isinstance(self.repo, pack_repo.KnitPackRepository):
            self._original_max_pack_count = \
                self.repo._pack_collection._max_pack_count
            def _max_pack_count_for_import(total_revisions):
                return total_revisions + 1
            self.repo._pack_collection._max_pack_count = \
                _max_pack_count_for_import
        else:
            self._original_max_pack_count = None

        # Make groupcompress use the fast algorithm during importing.
        # We want to repack at the end anyhow when more information
        # is available to do a better job of saving space.
        try:
            from bzrlib import groupcompress
            groupcompress._FAST = True
        except ImportError:
            pass

        # Create a write group. This is committed at the end of the import.
        # Checkpointing closes the current one and starts a new one.
        self.repo.start_write_group()

    def _load_info_and_params(self):
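        """Load the optional info file and resolve all import parameters."""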
        self._mode = self.params.get('mode', 'default')
        self._experimental = self._mode == 'experimental'

        # This is currently hard-coded but might be configurable via
        # parameters one day if that's needed
        repo_transport = self.repo.control_files._transport
        self.id_map_path = repo_transport.local_abspath("fastimport-id-map")

        # Load the info file, if any
        info_path = self.params.get('info')
        if info_path is not None:
            self.info = configobj.ConfigObj(info_path)
        else:
            self.info = None

        # Decide which CommitHandler to use
        self.supports_chk = getattr(self.repo._format, 'supports_chks', False)
        if self.supports_chk and self._mode == 'classic':
            note("Cannot use classic algorithm on CHK repositories"
                " - using default one instead")
            self._mode = 'default'
        if self._mode == 'classic':
            self.commit_handler_factory = \
                bzr_commit_handler.InventoryCommitHandler
        else:
            self.commit_handler_factory = \
                bzr_commit_handler.InventoryDeltaCommitHandler

        # Decide how often to automatically report progress
        # (not a parameter yet)
        self.progress_every = _DEFAULT_AUTO_PROGRESS
        if self.verbose:
            self.progress_every = self.progress_every / 10

        # Decide how often (# of commits) to automatically checkpoint
        self.checkpoint_every = int(self.params.get('checkpoint',
            _DEFAULT_AUTO_CHECKPOINT))

        # Decide how often (# of checkpoints) to automatically pack
        self.checkpoint_count = 0
        self.autopack_every = int(self.params.get('autopack',
            _DEFAULT_AUTO_PACK))

        # Decide how big to make the inventory cache
        cache_size = int(self.params.get('inv-cache', -1))
        if cache_size == -1:
            if self.supports_chk:
                cache_size = _DEFAULT_CHK_INV_CACHE_SIZE
            else:
                cache_size = _DEFAULT_INV_CACHE_SIZE
        self.inventory_cache_size = cache_size

        # Find the maximum number of commits to import (None means all)
        # and prepare progress reporting. Just in case the info file
        # has an outdated count of commits, we store the max counts
        # at which we need to terminate separately to the total used
        # for progress tracking.
        try:
            self.max_commits = int(self.params['count'])
            if self.max_commits < 0:
                self.max_commits = None
        except KeyError:
            self.max_commits = None
        if self.info is not None:
            self.total_commits = int(self.info['Command counts']['commit'])
            if (self.max_commits is not None and
                self.total_commits > self.max_commits):
                self.total_commits = self.max_commits
        else:
            self.total_commits = self.max_commits

    def _revision_store_factory(self):
        """Make a RevisionStore based on what the repository supports."""
        new_repo_api = hasattr(self.repo, 'revisions')
        if new_repo_api:
            return revision_store.RevisionStore2(self.repo)
        elif not self._experimental:
            return revision_store.RevisionStore1(self.repo)
        else:
            def fulltext_when(count):
                total = self.total_commits
                if total is not None and count == total:
                    fulltext = True
                else:
                    # Create an inventory fulltext every 200 revisions
                    fulltext = count % 200 == 0
                if fulltext:
                    self.note("%d commits - storing inventory as full-text",
                        count)
                return fulltext

            return revision_store.ImportRevisionStore1(
                self.repo, self.inventory_cache_size,
                fulltext_when=fulltext_when)

    def _process(self, command_iter):
        # if anything goes wrong, abort the write group if any
        try:
            processor.ImportProcessor._process(self, command_iter)
        except:
            if self.repo is not None and self.repo.is_in_write_group():
                self.repo.abort_write_group()
            raise

    def post_process(self):
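        """Finish the import: save the id map, update branches and working
        trees, pack the repository and report statistics."""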
        # Commit the current write group and checkpoint the id map
        self.repo.commit_write_group()
        self._save_id_map()

        if self.params.get("export-marks") is not None:
            marks_file.export_marks(self.params.get("export-marks"),
                self.cache_mgr.revision_ids)

        if self.cache_mgr.last_ref is None:
            # Nothing to refresh
            return

        # Update the branches
        self.note("Updating branch information ...")
        updater = branch_updater.BranchUpdater(self.repo, self.branch,
            self.cache_mgr, helpers.invert_dictset(self.cache_mgr.heads),
            self.cache_mgr.last_ref, self.tags)
        branches_updated, branches_lost = updater.update()
        self._branch_count = len(branches_updated)

        # Tell the user about branches that were not created
        if branches_lost:
            if not self.repo.is_shared():
                self.warning("Cannot import multiple branches into "
                    "a standalone branch")
            self.warning("Not creating branches for these head revisions:")
            for lost_info in branches_lost:
                head_revision = lost_info[1]
                branch_name = lost_info[0]
                self.note("\t %s = %s", head_revision, branch_name)

        # Update the working trees as requested
        self._tree_count = 0
        remind_about_update = True
        if self._branch_count == 0:
            self.note("no branches to update")
            self.note("no working trees to update")
            remind_about_update = False
        elif self.params.get('trees', False):
            trees = self._get_working_trees(branches_updated)
            if trees:
                self._update_working_trees(trees)
                remind_about_update = False
            else:
                self.warning("No working trees available to update")
        else:
            # Update just the trunk. (This is always the first branch
            # returned by the branch updater.)
            trunk_branch = branches_updated[0]
            trees = self._get_working_trees([trunk_branch])
            if trees:
                self._update_working_trees(trees)
                remind_about_update = self._branch_count > 1

        # Dump the cache stats now because we clear it before the final pack
        if self.verbose:
            self.cache_mgr.dump_stats()
        if self._original_max_pack_count:
            # We earlier disabled autopacking, creating one pack every
            # checkpoint instead. We now pack the repository to optimise
            # how data is stored.
            self.cache_mgr.clear_all()
            self._pack_repository()

        # Finish up by dumping stats & telling the user what to do next.
        self.dump_stats()
        if remind_about_update:
            # This message is explicitly not timestamped.
            note("To refresh the working tree for other branches, "
                "use 'bzr update' inside that branch.")

    def _update_working_trees(self, trees):
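        """Update each of the given working trees, reporting changes if verbose."""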
        if self.verbose:
            reporter = delta._ChangeReporter()
        else:
            reporter = None
        for wt in trees:
            self.note("Updating the working tree for %s ...", wt.basedir)
            wt.update(reporter)
            self._tree_count += 1

    def _pack_repository(self, final=True):
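        """Pack the repository, then remove the obsolete packs to save space."""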
        # Before packing, free whatever memory we can and ensure
        # that groupcompress is configured to optimise disk space
        import gc
        if final:
            try:
                from bzrlib import groupcompress
            except ImportError:
                pass
            else:
                groupcompress._FAST = False
        gc.collect()
        self.note("Packing repository ...")
        self.repo.pack()

        # To be conservative, packing puts the old packs and
        # indices in obsolete_packs. We err on the side of
        # optimism and clear out that directory to save space.
        self.note("Removing obsolete packs ...")
        # TODO: Use a public API for this once one exists
        repo_transport = self.repo._pack_collection.transport
        repo_transport.clone('obsolete_packs').delete_multi(
            repo_transport.list_dir('obsolete_packs'))

        # If we're not done, free whatever memory we can
        if not final:
            gc.collect()

    def _get_working_trees(self, branches):
        """Get the working trees for branches in the repository."""
        result = []
        wt_expected = self.repo.make_working_trees()
        for br in branches:
            if br is None:
                continue
            elif br == self.branch:
                if self.working_tree:
                    result.append(self.working_tree)
            elif wt_expected:
                try:
                    result.append(br.bzrdir.open_workingtree())
                except errors.NoWorkingTree:
                    self.warning("No working tree for branch %s", br)
        return result

    def dump_stats(self):
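        """Report the revisions, branches and trees imported and the time taken."""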
        time_required = progress.str_tdelta(time.time() - self._start_time)
        rc = self._revision_count - self.skip_total
        bc = self._branch_count
        wtc = self._tree_count
        self.note("Imported %d %s, updating %d %s and %d %s in %s",
            rc, helpers.single_plural(rc, "revision", "revisions"),
            bc, helpers.single_plural(bc, "branch", "branches"),
            wtc, helpers.single_plural(wtc, "tree", "trees"),
            time_required)

    def _init_id_map(self):
        """Load the id-map and check it matches the repository.

        :return: the number of entries in the map
        """
        # Currently, we just check the size. In the future, we might
        # decide to be more paranoid and check that the revision-ids
        # are identical as well.
        self.cache_mgr.revision_ids, known = idmapfile.load_id_map(
            self.id_map_path)
        existing_count = len(self.repo.all_revision_ids())
        if existing_count < known:
            raise plugin_errors.BadRepositorySize(known, existing_count)
        return known

    def _save_id_map(self):
        """Save the id-map."""
        # Save the whole lot every time. If this proves a problem, we can
        # change to 'append just the new ones' at a later time.
        idmapfile.save_id_map(self.id_map_path, self.cache_mgr.revision_ids)

    def blob_handler(self, cmd):
        """Process a BlobCommand."""
        if cmd.mark is not None:
            dataref = cmd.id
        else:
            dataref = osutils.sha_strings(cmd.data)
        self.cache_mgr.store_blob(dataref, cmd.data)

    def checkpoint_handler(self, cmd):
        """Process a CheckpointCommand."""
        # Commit the current write group and start a new one
        self.repo.commit_write_group()
        self._save_id_map()
        # track the number of automatic checkpoints done
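        # (cmd is None when the checkpoint was triggered automatically by
        # commit_handler rather than by a checkpoint command in the stream)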
        if cmd is None:
            self.checkpoint_count += 1
            if self.checkpoint_count % self.autopack_every == 0:
                self._pack_repository(final=False)
        self.repo.start_write_group()

    def commit_handler(self, cmd):
        """Process a CommitCommand."""
        if self.skip_total and self._revision_count < self.skip_total:
            self.cache_mgr.track_heads(cmd)
            # Check that we really do know about this commit-id
            if cmd.id not in self.cache_mgr.revision_ids:
                raise plugin_errors.BadRestart(cmd.id)
            # Consume the file commands and free any non-sticky blobs
            for fc in cmd.file_iter():
                pass
            self.cache_mgr._blobs = {}
            self._revision_count += 1
            if cmd.ref.startswith('refs/tags/'):
                tag_name = cmd.ref[len('refs/tags/'):]
                self._set_tag(tag_name, cmd.id)
            return
        if self.first_incremental_commit:
            self.first_incremental_commit = None
            parents = self.cache_mgr.track_heads(cmd)

        # 'Commit' the revision and report progress
        handler = self.commit_handler_factory(cmd, self.cache_mgr,
            self.rev_store, verbose=self.verbose,
            prune_empty_dirs=self.prune_empty_dirs)
        try:
            handler.process()
        except:
            print "ABORT: exception occurred processing commit %s" % (cmd.id)
            raise
        self.cache_mgr.revision_ids[cmd.id] = handler.revision_id
        self._revision_count += 1
        self.report_progress("(%s)" % cmd.id)

        if cmd.ref.startswith('refs/tags/'):
            tag_name = cmd.ref[len('refs/tags/'):]
            self._set_tag(tag_name, cmd.id)

        # Check if we should finish up or automatically checkpoint
        if (self.max_commits is not None and
            self._revision_count >= self.max_commits):
            self.note("Stopping after reaching requested count of commits")
            self.finished = True
        elif self._revision_count % self.checkpoint_every == 0:
            self.note("%d commits - automatic checkpoint triggered",
                self._revision_count)
            self.checkpoint_handler(None)

    def report_progress(self, details=''):
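        """Note the commit count and import rate every progress_every commits."""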
        if self._revision_count % self.progress_every == 0:
            if self.total_commits is not None:
                counts = "%d/%d" % (self._revision_count, self.total_commits)
            else:
                counts = "%d" % (self._revision_count,)
            minutes = (time.time() - self._start_time) / 60
            revisions_added = self._revision_count - self.skip_total
            rate = revisions_added * 1.0 / minutes
            if rate > 10:
                rate_str = "at %.0f/minute " % rate
            else:
                rate_str = "at %.1f/minute " % rate
            self.note("%s commits processed %s%s" % (counts, rate_str, details))

    def progress_handler(self, cmd):
        """Process a ProgressCommand."""
        # Most progress messages embedded in streams are annoying.
        # Ignore them unless in verbose mode.
        if self.verbose:
            self.note("progress %s" % (cmd.message,))

    def reset_handler(self, cmd):
        """Process a ResetCommand."""
        if cmd.ref.startswith('refs/tags/'):
            tag_name = cmd.ref[len('refs/tags/'):]
            if cmd.from_ is not None:
                self._set_tag(tag_name, cmd.from_)
            elif self.verbose:
                self.warning("ignoring reset refs/tags/%s - no from clause"
                    % tag_name)
            return

        if cmd.from_ is not None:
            self.cache_mgr.track_heads_for_ref(cmd.ref, cmd.from_)

    def tag_handler(self, cmd):
        """Process a TagCommand."""
        if cmd.from_ is not None:
            self._set_tag(cmd.id, cmd.from_)
        else:
            self.warning("ignoring tag %s - no from clause" % cmd.id)

    def _set_tag(self, name, from_):
        """Define a tag given a name and import 'from' reference."""
        bzr_tag_name = name.decode('utf-8', 'replace')
        bzr_rev_id = self.cache_mgr.revision_ids[from_]
        self.tags[bzr_tag_name] = bzr_rev_id

    def feature_handler(self, cmd):
        """Process a FeatureCommand."""
        feature = cmd.feature_name
        if feature not in commands.FEATURE_NAMES:
            raise plugin_errors.UnknownFeature(feature)