# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Import processor that supports all Bazaar repository formats."""

import time

from bzrlib import (
    debug,
    delta,
    errors,
    osutils,
    progress,
    )
try:
    from bzrlib.repofmt.knitpack_repo import KnitPackRepository
except ImportError:
    from bzrlib.repofmt.pack_repo import KnitPackRepository
from bzrlib.trace import (
    mutter,
    note,
    warning,
    )
try:
    import bzrlib.util.configobj.configobj as configobj
except ImportError:
    import configobj
from bzrlib.plugins.fastimport import (
    branch_updater,
    cache_manager,
    helpers,
    idmapfile,
    marks_file,
    revision_store,
    )
from fastimport import (
    commands,
    errors as plugin_errors,
    processor,
    )

# How many commits before automatically reporting progress
_DEFAULT_AUTO_PROGRESS = 1000

# How many commits before automatically checkpointing
_DEFAULT_AUTO_CHECKPOINT = 10000

# How many checkpoints before automatically packing
_DEFAULT_AUTO_PACK = 4

# How many inventories to cache
_DEFAULT_INV_CACHE_SIZE = 1
_DEFAULT_CHK_INV_CACHE_SIZE = 1

class GenericProcessor(processor.ImportProcessor):
    """An import processor that handles basic imports.

    Current features supported:

    * blobs are cached in memory
    * commits of files and symlinks are supported
    * checkpoints automatically happen at a configurable frequency
      over and above the stream requested checkpoints
    * timestamped progress reporting, both automatic and stream requested
    * some basic statistics are dumped on completion.

    At checkpoints and on completion, the commit-id -> revision-id map is
    saved to a file called 'fastimport-id-map'. If the import crashes
    or is interrupted, it can be started again and this file will be
    used to skip over already loaded revisions. The format of each line
    is "commit-id revision-id" so commit-ids cannot include spaces.

    Here are the supported parameters:

    * info - name of a hints file holding the analysis generated
      by running the fast-import-info processor in verbose mode. When
      importing large repositories, this parameter is needed so
      that the importer knows what blobs to intelligently cache.

    * trees - update the working trees before completing.
      By default, the importer updates the repository
      and branches and the user needs to run 'bzr update' for the
      branches of interest afterwards.

    * count - only import this many commits then exit. If not set
      or negative, all commits are imported.

    * checkpoint - automatically checkpoint every n commits over and
      above any checkpoints contained in the import stream.
      The default is 10000.

    * autopack - pack every n checkpoints. The default is 4.

    * inv-cache - number of inventories to cache.
      If not set, the default is 1.

    * mode - import algorithm to use: default, experimental or classic.

    * import-marks - name of file to read to load mark information from

    * export-marks - name of file to write to save mark information to
    """

    known_params = [
        'info',
        'trees',
        'count',
        'checkpoint',
        'autopack',
        'inv-cache',
        'mode',
        'import-marks',
        'export-marks',
        ]
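
    # A sketch of typical construction and use (not part of this module;
    # the parser import and the stream argument are assumptions about the
    # calling command):
    #
    #   from fastimport import parser
    #   p = parser.ImportParser(stream, verbose=verbose)
    #   proc = GenericProcessor(bzrdir, params={'trees': True})
    #   proc.process(p.iter_commands)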

    def __init__(self, bzrdir, params=None, verbose=False, outf=None,
            prune_empty_dirs=True):
        processor.ImportProcessor.__init__(self, params, verbose)
        self.prune_empty_dirs = prune_empty_dirs
        self.bzrdir = bzrdir
        try:
            # Might be inside a branch
            (self.working_tree, self.branch) = bzrdir._get_tree_branch()
            self.repo = self.branch.repository
        except errors.NotBranchError:
            # Must be inside a repository
            self.working_tree = None
            self.branch = None
            self.repo = bzrdir.open_repository()

    def pre_process(self):
        self._start_time = time.time()
        self._load_info_and_params()
        if self.total_commits:
            self.note("Starting import of %d commits ..." %
                (self.total_commits,))
        else:
            self.note("Starting import ...")
        self.cache_mgr = cache_manager.CacheManager(self.info, self.verbose,
            self.inventory_cache_size)

        if self.params.get("import-marks") is not None:
            mark_info = marks_file.import_marks(self.params.get("import-marks"))
            if mark_info is not None:
                self.cache_mgr.marks = mark_info
            self.skip_total = False
            self.first_incremental_commit = True
        else:
            self.first_incremental_commit = False
            self.skip_total = self._init_id_map()
            if self.skip_total:
                self.note("Found %d commits already loaded - "
                    "skipping over these ...", self.skip_total)
        self._revision_count = 0

        # mapping of tag name to revision_id
        self.tags = {}

        # Create the revision store to use for committing, if any
        self.rev_store = self._revision_store_factory()

        # Disable autopacking if the repo format supports it.
        # THIS IS A HACK - there is no sanctioned way of doing this yet.
        if isinstance(self.repo, KnitPackRepository):
            self._original_max_pack_count = \
                self.repo._pack_collection._max_pack_count
            def _max_pack_count_for_import(total_revisions):
                return total_revisions + 1
            self.repo._pack_collection._max_pack_count = \
                _max_pack_count_for_import
        else:
            self._original_max_pack_count = None

        # Make groupcompress use the fast algorithm during importing.
        # We want to repack at the end anyhow when more information
        # is available to do a better job of saving space.
        try:
            from bzrlib import groupcompress
            groupcompress._FAST = True
        except ImportError:
            pass

        # Create a write group. This is committed at the end of the import.
        # Checkpointing closes the current one and starts a new one.
        self.repo.start_write_group()

    def _load_info_and_params(self):
        from bzrlib.plugins.fastimport import bzr_commit_handler
        self._mode = self.params.get('mode', 'default')
        self._experimental = self._mode == 'experimental'

        # This is currently hard-coded but might be configurable via
        # parameters one day if that's needed
        repo_transport = self.repo.control_files._transport
        self.id_map_path = repo_transport.local_abspath("fastimport-id-map")

        # Load the info file, if any
        info_path = self.params.get('info')
        if info_path is not None:
            self.info = configobj.ConfigObj(info_path)
        else:
            self.info = None
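
        # An illustrative excerpt of such a hints file (it is produced by
        # the fast-import-info processor; only the key read further below
        # is shown, and the number is made up):
        #
        #   [Command counts]
        #   commit = 12345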

        # Decide which CommitHandler to use
        self.supports_chk = getattr(self.repo._format, 'supports_chks', False)
        if self.supports_chk and self._mode == 'classic':
            note("Cannot use classic algorithm on CHK repositories"
                " - using default one instead")
            self._mode = 'default'
        if self._mode == 'classic':
            self.commit_handler_factory = \
                bzr_commit_handler.InventoryCommitHandler
        else:
            self.commit_handler_factory = \
                bzr_commit_handler.InventoryDeltaCommitHandler

        # Decide how often to automatically report progress
        # (not a parameter yet)
        self.progress_every = _DEFAULT_AUTO_PROGRESS
        if self.verbose:
            self.progress_every = self.progress_every / 10

        # Decide how often (# of commits) to automatically checkpoint
        self.checkpoint_every = int(self.params.get('checkpoint',
            _DEFAULT_AUTO_CHECKPOINT))

        # Decide how often (# of checkpoints) to automatically pack
        self.checkpoint_count = 0
        self.autopack_every = int(self.params.get('autopack',
            _DEFAULT_AUTO_PACK))

        # Decide how big to make the inventory cache
        cache_size = int(self.params.get('inv-cache', -1))
        if cache_size == -1:
            if self.supports_chk:
                cache_size = _DEFAULT_CHK_INV_CACHE_SIZE
            else:
                cache_size = _DEFAULT_INV_CACHE_SIZE
        self.inventory_cache_size = cache_size

        # Find the maximum number of commits to import (None means all)
        # and prepare progress reporting. Just in case the info file
        # has an outdated count of commits, we store the max counts
        # at which we need to terminate separately to the total used
        # for progress tracking.
        try:
            self.max_commits = int(self.params['count'])
            if self.max_commits < 0:
                self.max_commits = None
        except KeyError:
            self.max_commits = None
        if self.info is not None:
            self.total_commits = int(self.info['Command counts']['commit'])
            if (self.max_commits is not None and
                self.total_commits > self.max_commits):
                self.total_commits = self.max_commits
        else:
            self.total_commits = self.max_commits

    def _revision_store_factory(self):
        """Make a RevisionStore based on what the repository supports."""
        new_repo_api = hasattr(self.repo, 'revisions')
        if new_repo_api:
            return revision_store.RevisionStore2(self.repo)
        elif not self._experimental:
            return revision_store.RevisionStore1(self.repo)
        else:
            def fulltext_when(count):
                total = self.total_commits
                if total is not None and count == total:
                    fulltext = True
                else:
                    # Create an inventory fulltext every 200 revisions
                    fulltext = count % 200 == 0
                if fulltext:
                    self.note("%d commits - storing inventory as full-text",
                        count)
                return fulltext

            return revision_store.ImportRevisionStore1(
                self.repo, self.inventory_cache_size,
                fulltext_when=fulltext_when)

    def process(self, command_iter):
        """Import data into Bazaar by processing a stream of commands.

        :param command_iter: an iterator providing commands
        """
        if self.working_tree is not None:
            self.working_tree.lock_write()
        elif self.branch is not None:
            self.branch.lock_write()
        elif self.repo is not None:
            self.repo.lock_write()
        try:
            super(GenericProcessor, self)._process(command_iter)
        finally:
            # If an unhandled exception occurred, abort the write group
            if self.repo is not None and self.repo.is_in_write_group():
                self.repo.abort_write_group()
            # Release the locks
            if self.working_tree is not None:
                self.working_tree.unlock()
            elif self.branch is not None:
                self.branch.unlock()
            elif self.repo is not None:
                self.repo.unlock()

    def _process(self, command_iter):
        # if anything goes wrong, abort the write group if any
        try:
            processor.ImportProcessor._process(self, command_iter)
        except:
            if self.repo is not None and self.repo.is_in_write_group():
                self.repo.abort_write_group()
            raise

    def post_process(self):
        # Commit the current write group and checkpoint the id map
        self.repo.commit_write_group()
        self._save_id_map()

        if self.params.get("export-marks") is not None:
            marks_file.export_marks(self.params.get("export-marks"),
                self.cache_mgr.marks)

        if self.cache_mgr.reftracker.last_ref is None:
            # Nothing to refresh
            return

        # Update the branches
        self.note("Updating branch information ...")
        updater = branch_updater.BranchUpdater(self.repo, self.branch,
            self.cache_mgr, helpers.invert_dictset(
                self.cache_mgr.reftracker.heads),
            self.cache_mgr.reftracker.last_ref, self.tags)
        branches_updated, branches_lost = updater.update()
        self._branch_count = len(branches_updated)

        # Tell the user about branches that were not created
        if branches_lost:
            if not self.repo.is_shared():
                self.warning("Cannot import multiple branches into "
                    "a standalone branch")
            self.warning("Not creating branches for these head revisions:")
            for lost_info in branches_lost:
                head_revision = lost_info[1]
                branch_name = lost_info[0]
                self.note("\t %s = %s", head_revision, branch_name)

        # Update the working trees as requested
        self._tree_count = 0
        remind_about_update = True
        if self._branch_count == 0:
            self.note("no branches to update")
            self.note("no working trees to update")
            remind_about_update = False
        elif self.params.get('trees', False):
            trees = self._get_working_trees(branches_updated)
            if trees:
                self._update_working_trees(trees)
                remind_about_update = False
            else:
                self.warning("No working trees available to update")
        else:
            # Update just the trunk. (This is always the first branch
            # returned by the branch updater.)
            trunk_branch = branches_updated[0]
            trees = self._get_working_trees([trunk_branch])
            if trees:
                self._update_working_trees(trees)
                remind_about_update = self._branch_count > 1

        # Dump the cache stats now because we clear it before the final pack
        if self.verbose:
            self.cache_mgr.dump_stats()
        if self._original_max_pack_count:
            # We earlier disabled autopacking, creating one pack every
            # checkpoint instead. We now pack the repository to optimise
            # how data is stored.
            self.cache_mgr.clear_all()
            self._pack_repository()

        # Finish up by dumping stats & telling the user what to do next.
        self.dump_stats()
        if remind_about_update:
            # This message is explicitly not timestamped.
            note("To refresh the working tree for other branches, "
                "use 'bzr update' inside that branch.")

    def _update_working_trees(self, trees):
        if self.verbose:
            reporter = delta._ChangeReporter()
        else:
            reporter = None
        for wt in trees:
            self.note("Updating the working tree for %s ...", wt.basedir)
            wt.update(reporter)
            self._tree_count += 1

    def _pack_repository(self, final=True):
        # Before packing, free whatever memory we can and ensure
        # that groupcompress is configured to optimise disk space
        import gc
        if final:
            try:
                from bzrlib import groupcompress
            except ImportError:
                pass
            else:
                groupcompress._FAST = False
        gc.collect()
        self.note("Packing repository ...")
        self.repo.pack()

        # To be conservative, packing puts the old packs and
        # indices in obsolete_packs. We err on the side of
        # optimism and clear out that directory to save space.
        self.note("Removing obsolete packs ...")
        # TODO: Use a public API for this once one exists
        repo_transport = self.repo._pack_collection.transport
        repo_transport.clone('obsolete_packs').delete_multi(
            repo_transport.list_dir('obsolete_packs'))

        # If we're not done, free whatever memory we can
        if not final:
            gc.collect()

    def _get_working_trees(self, branches):
        """Get the working trees for branches in the repository."""
        result = []
        wt_expected = self.repo.make_working_trees()
        for br in branches:
            if br is None:
                continue
            elif br == self.branch:
                if self.working_tree:
                    result.append(self.working_tree)
            elif wt_expected:
                try:
                    result.append(br.bzrdir.open_workingtree())
                except errors.NoWorkingTree:
                    self.warning("No working tree for branch %s", br)
        return result

    def dump_stats(self):
        time_required = progress.str_tdelta(time.time() - self._start_time)
        rc = self._revision_count - self.skip_total
        bc = self._branch_count
        wtc = self._tree_count
        self.note("Imported %d %s, updating %d %s and %d %s in %s",
            rc, helpers.single_plural(rc, "revision", "revisions"),
            bc, helpers.single_plural(bc, "branch", "branches"),
            wtc, helpers.single_plural(wtc, "tree", "trees"),
            time_required)

    def _init_id_map(self):
        """Load the id-map and check it matches the repository.

        :return: the number of entries in the map
        """
        # Currently, we just check the size. In the future, we might
        # decide to be more paranoid and check that the revision-ids
        # are identical as well.
        self.cache_mgr.marks, known = idmapfile.load_id_map(
            self.id_map_path)
        existing_count = len(self.repo.all_revision_ids())
        if existing_count < known:
            raise plugin_errors.BadRepositorySize(known, existing_count)
        return known

    def _save_id_map(self):
        """Save the id-map."""
        # Save the whole lot every time. If this proves a problem, we can
        # change to 'append just the new ones' at a later time.
        idmapfile.save_id_map(self.id_map_path, self.cache_mgr.marks)

    def blob_handler(self, cmd):
        """Process a BlobCommand."""
        if cmd.mark is not None:
            dataref = cmd.id
        else:
            dataref = osutils.sha_strings(cmd.data)
        self.cache_mgr.store_blob(dataref, cmd.data)

    def checkpoint_handler(self, cmd):
        """Process a CheckpointCommand."""
        # Commit the current write group and start a new one
        self.repo.commit_write_group()
        self._save_id_map()
        # track the number of automatic checkpoints done
        if cmd is None:
            self.checkpoint_count += 1
            if self.checkpoint_count % self.autopack_every == 0:
                self._pack_repository(final=False)
        self.repo.start_write_group()

    def commit_handler(self, cmd):
        """Process a CommitCommand."""
        mark = cmd.id.lstrip(':')
        if self.skip_total and self._revision_count < self.skip_total:
            self.cache_mgr.reftracker.track_heads(cmd)
            # Check that we really do know about this commit-id
            if not self.cache_mgr.marks.has_key(mark):
                raise plugin_errors.BadRestart(mark)
            self.cache_mgr._blobs = {}
            self._revision_count += 1
            if cmd.ref.startswith('refs/tags/'):
                tag_name = cmd.ref[len('refs/tags/'):]
                self._set_tag(tag_name, cmd.id)
            return
        if self.first_incremental_commit:
            self.first_incremental_commit = None
            parents = self.cache_mgr.reftracker.track_heads(cmd)

        # 'Commit' the revision and report progress
        handler = self.commit_handler_factory(cmd, self.cache_mgr,
            self.rev_store, verbose=self.verbose,
            prune_empty_dirs=self.prune_empty_dirs)
        try:
            handler.process()
        except:
            print "ABORT: exception occurred processing commit %s" % (cmd.id)
            raise
        self.cache_mgr.add_mark(mark, handler.revision_id)
        self._revision_count += 1
        self.report_progress("(%s)" % cmd.id.lstrip(':'))

        if cmd.ref.startswith('refs/tags/'):
            tag_name = cmd.ref[len('refs/tags/'):]
            self._set_tag(tag_name, cmd.id)

        # Check if we should finish up or automatically checkpoint
        if (self.max_commits is not None and
            self._revision_count >= self.max_commits):
            self.note("Stopping after reaching requested count of commits")
            self.finished = True
        elif self._revision_count % self.checkpoint_every == 0:
            self.note("%d commits - automatic checkpoint triggered",
                self._revision_count)
            self.checkpoint_handler(None)

    def report_progress(self, details=''):
        if self._revision_count % self.progress_every == 0:
            if self.total_commits is not None:
                counts = "%d/%d" % (self._revision_count, self.total_commits)
            else:
                counts = "%d" % (self._revision_count,)
            minutes = (time.time() - self._start_time) / 60
            revisions_added = self._revision_count - self.skip_total
            rate = revisions_added * 1.0 / minutes
            if rate > 10:
                rate_str = "at %.0f/minute " % rate
            else:
                rate_str = "at %.1f/minute " % rate
            self.note("%s commits processed %s%s" % (counts, rate_str, details))

    def progress_handler(self, cmd):
        """Process a ProgressCommand."""
        # Most progress messages embedded in streams are annoying.
        # Ignore them unless in verbose mode.
        if self.verbose:
            self.note("progress %s" % (cmd.message,))

    def reset_handler(self, cmd):
        """Process a ResetCommand."""
        if cmd.ref.startswith('refs/tags/'):
            tag_name = cmd.ref[len('refs/tags/'):]
            if cmd.from_ is not None:
                self._set_tag(tag_name, cmd.from_)
            elif self.verbose:
                self.warning("ignoring reset refs/tags/%s - no from clause"
                    % tag_name)
            return

        if cmd.from_ is not None:
            self.cache_mgr.reftracker.track_heads_for_ref(cmd.ref, cmd.from_)

    def tag_handler(self, cmd):
        """Process a TagCommand."""
        if cmd.from_ is not None:
            self._set_tag(cmd.id, cmd.from_)
        else:
            self.warning("ignoring tag %s - no from clause" % cmd.id)

    def _set_tag(self, name, from_):
        """Define a tag given a name and import 'from' reference."""
        bzr_tag_name = name.decode('utf-8', 'replace')
        bzr_rev_id = self.cache_mgr.lookup_committish(from_)
        self.tags[bzr_tag_name] = bzr_rev_id

    def feature_handler(self, cmd):
        """Process a FeatureCommand."""
        feature = cmd.feature_name
        if feature not in commands.FEATURE_NAMES:
            raise plugin_errors.UnknownFeature(feature)

    def debug(self, msg, *args):
        """Output a debug message if the appropriate -D option was given."""
        if "fast-import" in debug.debug_flags:
            msg = "%s DEBUG: %s" % (self._time_of_day(), msg)
            mutter(msg, *args)

    def note(self, msg, *args):
        """Output a note but timestamp it."""
        msg = "%s %s" % (self._time_of_day(), msg)
        note(msg, *args)

    def warning(self, msg, *args):
        """Output a warning but timestamp it."""
        msg = "%s WARNING: %s" % (self._time_of_day(), msg)
        warning(msg, *args)