# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Import processor that supports all Bazaar repository formats."""
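
# A minimal usage sketch, assuming a BzrDir `target` and an open fast-import
# file `stream` (the names here are illustrative, not a supported API; the
# parser entry point lives elsewhere in the fastimport package):
#
#   from bzrlib.plugins.fastimport.parser import ImportParser
#   proc = GenericProcessor(target, params={'trees': True}, verbose=True)
#   proc.process(ImportParser(stream).iter_commands)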

import time

from bzrlib import (
    delta,
    errors,
    osutils,
    progress,
    )
from bzrlib.repofmt import pack_repo
from bzrlib.trace import note, mutter
import bzrlib.util.configobj.configobj as configobj
from bzrlib.plugins.fastimport import (
    branch_updater,
    bzr_commit_handler,
    cache_manager,
    commands,
    errors as plugin_errors,
    helpers,
    idmapfile,
    marks_file,
    processor,
    revision_store,
    )


# How many commits before automatically reporting progress
_DEFAULT_AUTO_PROGRESS = 1000

# How many commits before automatically checkpointing
_DEFAULT_AUTO_CHECKPOINT = 10000

# How many checkpoints before automatically packing
_DEFAULT_AUTO_PACK = 4

# How many inventories to cache
_DEFAULT_INV_CACHE_SIZE = 1
_DEFAULT_CHK_INV_CACHE_SIZE = 1
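
# For example, with the defaults above a 100000-commit stream reports
# progress every 1000 commits, checkpoints automatically at commits
# 10000, 20000, ... and, since packing happens every 4 checkpoints,
# packs the repository after the checkpoints at commits 40000 and 80000.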


class GenericProcessor(processor.ImportProcessor):
    """An import processor that handles basic imports.

    Current features supported:

    * blobs are cached in memory
    * files and symlinks commits are supported
    * checkpoints automatically happen at a configurable frequency
      over and above the stream requested checkpoints
    * timestamped progress reporting, both automatic and stream requested
    * some basic statistics are dumped on completion.

    At checkpoints and on completion, the commit-id -> revision-id map is
    saved to a file called 'fastimport-id-map'. If the import crashes
    or is interrupted, it can be started again and this file will be
    used to skip over already loaded revisions. The format of each line
    is "commit-id revision-id" so commit-ids cannot include spaces.
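
    For example, an entry mapping the (hypothetical) mark :100 to the
    Bazaar revision it created might read:

      :100 jane@example.com-20080801123456-0123456789abcdef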

    Here are the supported parameters:

    * info - name of a hints file holding the analysis generated
      by running the fast-import-info processor in verbose mode. When
      importing large repositories, this parameter is needed so
      that the importer knows what blobs to intelligently cache.

    * trees - update the working trees before completing.
      By default, the importer updates the repository
      and branches and the user needs to run 'bzr update' for the
      branches of interest afterwards.

    * count - only import this many commits then exit. If not set
      or negative, all commits are imported.

    * checkpoint - automatically checkpoint every n commits over and
      above any checkpoints contained in the import stream.
      The default is 10000.

    * autopack - pack every n checkpoints. The default is 4.

    * inv-cache - number of inventories to cache.
      If not set, the default is 1.

    * mode - import algorithm to use: default, experimental or classic.

    * import-marks - name of file to read to load mark information from

    * export-marks - name of file to write to save mark information to
    """

    known_params = [
        'info',
        'trees',
        'count',
        'checkpoint',
        'autopack',
        'inv-cache',
        'mode',
        'import-marks',
        'export-marks',
        ]

    def __init__(self, bzrdir, params=None, verbose=False, outf=None,
            prune_empty_dirs=True):
        processor.ImportProcessor.__init__(self, bzrdir, params, verbose)
        self.prune_empty_dirs = prune_empty_dirs

    def pre_process(self):
        self._start_time = time.time()
        self._load_info_and_params()
        if self.total_commits:
            self.note("Starting import of %d commits ..." %
                (self.total_commits,))
        else:
            self.note("Starting import ...")
        self.cache_mgr = cache_manager.CacheManager(self.info, self.verbose,
            self.inventory_cache_size)

        if self.params.get("import-marks") is not None:
            mark_info = marks_file.import_marks(self.params.get("import-marks"))
            if mark_info is not None:
                self.cache_mgr.revision_ids = mark_info[0]
            self.skip_total = False
            self.first_incremental_commit = True
        else:
            self.first_incremental_commit = False
            self.skip_total = self._init_id_map()
            if self.skip_total:
                self.note("Found %d commits already loaded - "
                    "skipping over these ...", self.skip_total)
        self._revision_count = 0

        # mapping of tag name to revision_id
        self.tags = {}

        # Create the revision store to use for committing, if any
        self.rev_store = self._revision_store_factory()

        # Disable autopacking if the repo format supports it.
        # THIS IS A HACK - there is no sanctioned way of doing this yet.
        if isinstance(self.repo, pack_repo.KnitPackRepository):
            self._original_max_pack_count = \
                self.repo._pack_collection._max_pack_count
            def _max_pack_count_for_import(total_revisions):
                return total_revisions + 1
            self.repo._pack_collection._max_pack_count = \
                _max_pack_count_for_import
        else:
            self._original_max_pack_count = None

        # Make groupcompress use the fast algorithm during importing.
        # We want to repack at the end anyhow when more information
        # is available to do a better job of saving space.
        try:
            from bzrlib import groupcompress
            groupcompress._FAST = True
        except ImportError:
            pass

        # Create a write group. This is committed at the end of the import.
        # Checkpointing closes the current one and starts a new one.
        self.repo.start_write_group()

    def _load_info_and_params(self):
        self._mode = self.params.get('mode', 'default')
        self._experimental = self._mode == 'experimental'

        # This is currently hard-coded but might be configurable via
        # parameters one day if that's needed
        repo_transport = self.repo.control_files._transport
        self.id_map_path = repo_transport.local_abspath("fastimport-id-map")

        # Load the info file, if any
        info_path = self.params.get('info')
        if info_path is not None:
            self.info = configobj.ConfigObj(info_path)
        else:
            self.info = None

        # Decide which CommitHandler to use
        self.supports_chk = getattr(self.repo._format, 'supports_chks', False)
        if self.supports_chk and self._mode == 'classic':
            note("Cannot use classic algorithm on CHK repositories"
                " - using default one instead")
            self._mode = 'default'
        if self._mode == 'classic':
            self.commit_handler_factory = \
                bzr_commit_handler.InventoryCommitHandler
        else:
            self.commit_handler_factory = \
                bzr_commit_handler.InventoryDeltaCommitHandler

        # Decide how often to automatically report progress
        # (not a parameter yet)
        self.progress_every = _DEFAULT_AUTO_PROGRESS
        if self.verbose:
            self.progress_every = self.progress_every / 10

        # Decide how often (# of commits) to automatically checkpoint
        self.checkpoint_every = int(self.params.get('checkpoint',
            _DEFAULT_AUTO_CHECKPOINT))

        # Decide how often (# of checkpoints) to automatically pack
        self.checkpoint_count = 0
        self.autopack_every = int(self.params.get('autopack',
            _DEFAULT_AUTO_PACK))

        # Decide how big to make the inventory cache
        cache_size = int(self.params.get('inv-cache', -1))
        if cache_size == -1:
            if self.supports_chk:
                cache_size = _DEFAULT_CHK_INV_CACHE_SIZE
            else:
                cache_size = _DEFAULT_INV_CACHE_SIZE
        self.inventory_cache_size = cache_size

        # Find the maximum number of commits to import (None means all)
        # and prepare progress reporting. Just in case the info file
        # has an outdated count of commits, we store the max counts
        # at which we need to terminate separately to the total used
        # for progress tracking.
        try:
            self.max_commits = int(self.params['count'])
            if self.max_commits < 0:
                self.max_commits = None
        except KeyError:
            self.max_commits = None
        if self.info is not None:
            self.total_commits = int(self.info['Command counts']['commit'])
            if (self.max_commits is not None and
                    self.total_commits > self.max_commits):
                self.total_commits = self.max_commits
        else:
            self.total_commits = self.max_commits

    def _revision_store_factory(self):
        """Make a RevisionStore based on what the repository supports."""
        new_repo_api = hasattr(self.repo, 'revisions')
        if new_repo_api:
            return revision_store.RevisionStore2(self.repo)
        elif not self._experimental:
            return revision_store.RevisionStore1(self.repo)
        else:
            def fulltext_when(count):
                total = self.total_commits
                if total is not None and count == total:
                    fulltext = True
                else:
                    # Create an inventory fulltext every 200 revisions
                    fulltext = count % 200 == 0
                if fulltext:
                    self.note("%d commits - storing inventory as full-text",
                        count)
                return fulltext

            return revision_store.ImportRevisionStore1(
                self.repo, self.inventory_cache_size,
                fulltext_when=fulltext_when)

    def _process(self, command_iter):
        # if anything goes wrong, abort the write group if any
        try:
            processor.ImportProcessor._process(self, command_iter)
        except:
            if self.repo is not None and self.repo.is_in_write_group():
                self.repo.abort_write_group()
            raise

    def post_process(self):
        # Commit the current write group and checkpoint the id map
        self.repo.commit_write_group()
        self._save_id_map()

        if self.params.get("export-marks") is not None:
            marks_file.export_marks(self.params.get("export-marks"),
                self.cache_mgr.revision_ids)

        if self.cache_mgr.last_ref is None:
            # Nothing to refresh
            return

        # Update the branches
        self.note("Updating branch information ...")
        updater = branch_updater.BranchUpdater(self.repo, self.branch,
            self.cache_mgr, helpers.invert_dictset(self.cache_mgr.heads),
            self.cache_mgr.last_ref, self.tags)
        branches_updated, branches_lost = updater.update()
        self._branch_count = len(branches_updated)

        # Tell the user about branches that were not created
        if branches_lost:
            if not self.repo.is_shared():
                self.warning("Cannot import multiple branches into "
                    "a standalone branch")
            self.warning("Not creating branches for these head revisions:")
            for lost_info in branches_lost:
                head_revision = lost_info[1]
                branch_name = lost_info[0]
                self.note("\t %s = %s", head_revision, branch_name)

        # Update the working trees as requested
        self._tree_count = 0
        remind_about_update = True
        if self._branch_count == 0:
            self.note("no branches to update")
            self.note("no working trees to update")
            remind_about_update = False
        elif self.params.get('trees', False):
            trees = self._get_working_trees(branches_updated)
            if trees:
                self._update_working_trees(trees)
                remind_about_update = False
            else:
                self.warning("No working trees available to update")
        else:
            # Update just the trunk. (This is always the first branch
            # returned by the branch updater.)
            trunk_branch = branches_updated[0]
            trees = self._get_working_trees([trunk_branch])
            if trees:
                self._update_working_trees(trees)
                remind_about_update = self._branch_count > 1

        # Dump the cache stats now because we clear it before the final pack
        if self.verbose:
            self.cache_mgr.dump_stats()
        if self._original_max_pack_count:
            # We earlier disabled autopacking, creating one pack every
            # checkpoint instead. We now pack the repository to optimise
            # how data is stored.
            self.cache_mgr.clear_all()
            self._pack_repository()

        # Finish up by dumping stats & telling the user what to do next.
        self.dump_stats()
        if remind_about_update:
            # This message is explicitly not timestamped.
            note("To refresh the working tree for other branches, "
                "use 'bzr update' inside that branch.")

    def _update_working_trees(self, trees):
        if self.verbose:
            reporter = delta._ChangeReporter()
        else:
            reporter = None
        for wt in trees:
            self.note("Updating the working tree for %s ...", wt.basedir)
            wt.update(reporter)
            self._tree_count += 1

    def _pack_repository(self, final=True):
        # Before packing, free whatever memory we can and ensure
        # that groupcompress is configured to optimise disk space
        import gc
        if final:
            try:
                from bzrlib import groupcompress
            except ImportError:
                pass
            else:
                groupcompress._FAST = False
        gc.collect()
        self.note("Packing repository ...")
        self.repo.pack()

        # To be conservative, packing puts the old packs and
        # indices in obsolete_packs. We err on the side of
        # optimism and clear out that directory to save space.
        self.note("Removing obsolete packs ...")
        # TODO: Use a public API for this once one exists
        repo_transport = self.repo._pack_collection.transport
        repo_transport.clone('obsolete_packs').delete_multi(
            repo_transport.list_dir('obsolete_packs'))

        # If we're not done, free whatever memory we can
        if not final:
            gc.collect()

    def _get_working_trees(self, branches):
        """Get the working trees for branches in the repository."""
        result = []
        wt_expected = self.repo.make_working_trees()
        for br in branches:
            if br is None:
                continue
            elif br == self.branch:
                if self.working_tree:
                    result.append(self.working_tree)
            elif wt_expected:
                try:
                    result.append(br.bzrdir.open_workingtree())
                except errors.NoWorkingTree:
                    self.warning("No working tree for branch %s", br)
        return result

    def dump_stats(self):
        time_required = progress.str_tdelta(time.time() - self._start_time)
        rc = self._revision_count - self.skip_total
        bc = self._branch_count
        wtc = self._tree_count
        self.note("Imported %d %s, updating %d %s and %d %s in %s",
            rc, helpers.single_plural(rc, "revision", "revisions"),
            bc, helpers.single_plural(bc, "branch", "branches"),
            wtc, helpers.single_plural(wtc, "tree", "trees"),
            time_required)
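        # An illustrative result line (numbers hypothetical):
        #   Imported 28839 revisions, updating 2 branches and 1 tree in 0:45:12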

    def _init_id_map(self):
        """Load the id-map and check it matches the repository.

        :return: the number of entries in the map
        """
        # Currently, we just check the size. In the future, we might
        # decide to be more paranoid and check that the revision-ids
        # are identical as well.
        self.cache_mgr.revision_ids, known = idmapfile.load_id_map(
            self.id_map_path)
        existing_count = len(self.repo.all_revision_ids())
        if existing_count < known:
            raise plugin_errors.BadRepositorySize(known, existing_count)
        return known

    def _save_id_map(self):
        """Save the id-map."""
        # Save the whole lot every time. If this proves a problem, we can
        # change to 'append just the new ones' at a later time.
        idmapfile.save_id_map(self.id_map_path, self.cache_mgr.revision_ids)

    def blob_handler(self, cmd):
        """Process a BlobCommand."""
        if cmd.mark is not None:
            dataref = cmd.id
        else:
            dataref = osutils.sha_strings(cmd.data)
        self.cache_mgr.store_blob(dataref, cmd.data)

    def checkpoint_handler(self, cmd):
        """Process a CheckpointCommand."""
        # Commit the current write group and start a new one
        self.repo.commit_write_group()
        self._save_id_map()
        # track the number of automatic checkpoints done
        if cmd is None:
            self.checkpoint_count += 1
            if self.checkpoint_count % self.autopack_every == 0:
                self._pack_repository(final=False)
        self.repo.start_write_group()

    def commit_handler(self, cmd):
        """Process a CommitCommand."""
        if self.skip_total and self._revision_count < self.skip_total:
            self.cache_mgr.track_heads(cmd)
            # Check that we really do know about this commit-id
            if not self.cache_mgr.revision_ids.has_key(cmd.id):
                raise plugin_errors.BadRestart(cmd.id)
            # Consume the file commands and free any non-sticky blobs
            for fc in cmd.file_iter():
                pass
            self.cache_mgr._blobs = {}
            self._revision_count += 1
            if cmd.ref.startswith('refs/tags/'):
                tag_name = cmd.ref[len('refs/tags/'):]
                self._set_tag(tag_name, cmd.id)
            return
        if self.first_incremental_commit:
            self.first_incremental_commit = None
            parents = self.cache_mgr.track_heads(cmd)

        # 'Commit' the revision and report progress
        handler = self.commit_handler_factory(cmd, self.cache_mgr,
            self.rev_store, verbose=self.verbose,
            prune_empty_dirs=self.prune_empty_dirs)
        try:
            handler.process()
        except:
            print "ABORT: exception occurred processing commit %s" % (cmd.id,)
            raise
        self.cache_mgr.revision_ids[cmd.id] = handler.revision_id
        self._revision_count += 1
        self.report_progress("(%s)" % cmd.id)

        if cmd.ref.startswith('refs/tags/'):
            tag_name = cmd.ref[len('refs/tags/'):]
            self._set_tag(tag_name, cmd.id)

        # Check if we should finish up or automatically checkpoint
        if (self.max_commits is not None and
                self._revision_count >= self.max_commits):
            self.note("Stopping after reaching requested count of commits")
            self.finished = True
        elif self._revision_count % self.checkpoint_every == 0:
            self.note("%d commits - automatic checkpoint triggered",
                self._revision_count)
            self.checkpoint_handler(None)

    def report_progress(self, details=''):
        if self._revision_count % self.progress_every == 0:
            if self.total_commits is not None:
                counts = "%d/%d" % (self._revision_count, self.total_commits)
            else:
                counts = "%d" % (self._revision_count,)
            minutes = (time.time() - self._start_time) / 60
            revisions_added = self._revision_count - self.skip_total
            rate = revisions_added * 1.0 / minutes
            if rate > 10:
                rate_str = "at %.0f/minute " % rate
            else:
                rate_str = "at %.1f/minute " % rate
            self.note("%s commits processed %s%s" % (counts, rate_str, details))
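            # An illustrative line built above (numbers hypothetical):
            #   1000/28839 commits processed at 1426/minute (:1000)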

    def progress_handler(self, cmd):
        """Process a ProgressCommand."""
        # Most progress messages embedded in streams are annoying.
        # Ignore them unless in verbose mode.
        if self.verbose:
            self.note("progress %s" % (cmd.message,))

    def reset_handler(self, cmd):
        """Process a ResetCommand."""
        if cmd.ref.startswith('refs/tags/'):
            tag_name = cmd.ref[len('refs/tags/'):]
            if cmd.from_ is not None:
                self._set_tag(tag_name, cmd.from_)
            else:
                self.warning("ignoring reset refs/tags/%s - no from clause"
                    % tag_name)
            return

        if cmd.from_ is not None:
            self.cache_mgr.track_heads_for_ref(cmd.ref, cmd.from_)

    def tag_handler(self, cmd):
        """Process a TagCommand."""
        if cmd.from_ is not None:
            self._set_tag(cmd.id, cmd.from_)
        else:
            self.warning("ignoring tag %s - no from clause" % cmd.id)

    def _set_tag(self, name, from_):
        """Define a tag given a name and import 'from' reference."""
        bzr_tag_name = name.decode('utf-8', 'replace')
        bzr_rev_id = self.cache_mgr.revision_ids[from_]
        self.tags[bzr_tag_name] = bzr_rev_id

    def feature_handler(self, cmd):
        """Process a FeatureCommand."""
        feature = cmd.feature_name
        if feature not in commands.FEATURE_NAMES:
            raise plugin_errors.UnknownFeature(feature)