package File::Rsync::Mirror::Recentfile;
File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
$HAVE->{$package} = eval qq{ require $package; };
use File::Basename qw(basename dirname fileparse);
use File::Copy qw(cp);
use File::Path qw(mkpath);
use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
use List::Util qw(first max min);
use Scalar::Util qw(reftype);
use version; our $VERSION = qv('0.0.1');
use constant MAX_INT => ~0>>1; # anything better?
use constant DEFAULT_PROTOCOL => 1;
# maybe subclass if this mapping is bad?
B<!!!! PRE-ALPHA ALERT !!!!>
Nothing in here is believed to be stable, nothing yet intended for
public consumption. The plan is to provide scripts that act as
frontends for all the backend functionality. Option and method names
will very likely change.
For the rationale see the section BACKGROUND.
This is published only for developers of the (yet to be named)
Writer (of a single file):
use File::Rsync::Mirror::Recentfile;
my $rf = File::Rsync::Mirror::Recentfile->new
filenameroot => "RECENT",
comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
localroot => "/home/ftp/pub/PAUSE/authors/",
aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
$rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
my $rf = File::Rsync::Mirror::Recentfile->new
filenameroot => "RECENT",
ignore_link_stat_errors => 1,
localroot => "/home/ftp/pub/PAUSE/authors",
remote_host => "pause.perl.org",
remote_module => "authors",
'rsync-path' => '/usr/bin/rsync',
'omit-dir-times' => 1,
Aggregator (usually the writer):
my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
=head1 CONSTRUCTORS / DESTRUCTOR
=head2 my $obj = CLASS->new(%hash)
Constructor. On every argument pair the key is a method name and the
value is an argument to that method name.
If a recentfile for this resource already exists, metadata that are
not defined by the constructor will be fetched from there as soon as
it is read by recent_events().
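The key/value dispatch described above can be sketched in isolation. This is a minimal illustration, not the module's code: `Demo::Recent` is a hypothetical class, its hand-rolled accessors stand in for the ones the module generates with the `accessors` pragma, and the loop over `@args` is assumed because the excerpt only shows its body. The defaults (`RECENT`, `.yaml`) follow the constructor code.

```perl
use strict;
use warnings;

# Hypothetical class illustrating "key is a method name, value is its argument".
package Demo::Recent;

sub new {
    my ($class, @args) = @_;
    my $self = bless {}, $class;
    while (@args) {
        my ($method, $arg) = splice @args, 0, 2;
        $self->$method($arg);   # dies if the class has no such method
    }
    # fall back to defaults for anything the caller did not set
    $self->filenameroot("RECENT")     unless defined $self->filenameroot;
    $self->serializer_suffix(".yaml") unless defined $self->serializer_suffix;
    return $self;
}

# simple get/set accessors (stand-ins for the module's generated ones)
for my $name (qw(filenameroot serializer_suffix localroot)) {
    no strict 'refs';
    *{"Demo::Recent::$name"} = sub {
        my $self = shift;
        $self->{$name} = shift if @_;
        return $self->{$name};
    };
}

package main;

my $obj = Demo::Recent->new(localroot => "/tmp/mirror");
print $obj->filenameroot, " ", $obj->localroot, "\n";
```

An unknown key such as `colour => 1` makes the constructor die with a "Can't locate object method" error. This is a cheap way to catch typos in option names.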
my($class, @args) = @_;
my $self = bless {}, $class;
my($method,$arg) = splice @args, 0, 2;
$self->$method($arg);
unless (defined $self->protocol) {
$self->protocol(DEFAULT_PROTOCOL);
unless (defined $self->filenameroot) {
$self->filenameroot("RECENT");
unless (defined $self->serializer_suffix) {
$self->serializer_suffix(".yaml");
=head2 my $obj = CLASS->new_from_file($file)
Constructor. $file is a I<recentfile>.
my($class, $file) = @_;
my $self = bless {}, $class;
$self->_rfile($file);
my $serialized = do { open my $fh, $file or die "Could not open '$file': $!";
# XXX: we can skip this step when the metadata are sufficient, but
# we cannot parse the file without some magic stuff about
my($name,$path) = fileparse $file;
my $symlink = readlink $file;
if ($symlink =~ m|/|) {
die "FIXME: filenames containing '/' not supported, got $symlink";
$file = File::Spec->catfile ( $path, $symlink );
my($name,$path,$suffix) = fileparse $file, keys %serializers;
$self->serializer_suffix($suffix);
$self->localroot($path);
die "Could not determine file format from suffix" unless $suffix;
if ($suffix eq ".yaml") {
$deserialized = YAML::Syck::LoadFile($file);
} elsif ($HAVE->{"Data::Serializer"}) {
my $serializer = Data::Serializer->new
( serializer => $serializers{$suffix} );
$deserialized = $serializer->raw_deserialize($serialized);
die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
while (my($k,$v) = each %{$deserialized->{meta}}) {
next if $k ne lc $k; # "Producers"
unless (defined $self->protocol) {
$self->protocol(DEFAULT_PROTOCOL);
sub DESTROY { shift->unlock }
"_current_tempfile_fh",
"_delayed_operations",
"_remember_last_uptodate_call",
"_uptodateness_ever_reached",
split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
A list of interval specs that tell the aggregator which I<recentfile>s
The name of a method to canonize the path before rsyncing. The only
supported value is C<naive_path_normalize>, which is also the default.
A comment about this tree and setup.
A timestamp. The dirtymark is updated whenever an out-of-band change
that violates the protocol is performed on the origin server, for
example when files are added or removed somewhere in the middle.
Slaves must react by devaluing their C<done> structure, which then
leads to a full re-sync of all files.
The (prefix of the) filename we use for this I<recentfile>. Defaults to
C<RECENT>. The string must not contain a directory separator.
Timestamp remembering when we mirrored this recentfile the last time.
Only relevant for slaves.
=item ignore_link_stat_errors
If set to true, rsync errors that complain about link stat errors are
ignored. These seem to happen only when files are missing at the
origin. In race conditions this can always happen, so it is
recommended to set this value to true.
If set to true, this object will fetch a new recentfile from remote
when the timespan between the last mirror (see have_mirrored) and now
is too large (currently hardcoded to an arbitrary 420 seconds).
After how many seconds shall we die if we cannot lock a I<recentfile>?
Defaults to 600 seconds.
When mirror_loop is called, this accessor specifies the minimum time
each iteration of the loop takes. If the work of an iteration is done
before that time has elapsed, the loop sleeps for the rest of it.
Defaults to an arbitrary 42 seconds.
=item max_files_per_connection
Maximum number of files that are transferred on a single rsync call.
Setting it higher means higher performance at the price of holding
connections longer and potentially disturbing other users in the pool.
Defaults to the arbitrary value 42.
=item max_rsync_errors
When rsync operations encounter that many errors without any resetting
success in between, we die. Defaults to unlimited. A value of
-1 means we run forever ignoring all rsync errors.
Hashref remembering when we read the recent_events from this file the
last time and what the timespan was.
When the RECENT file format changes, we increment the protocol. We try
to support older protocols in later releases.
The host we are mirroring from. Leave empty for the local filesystem.
Rsync servers have so-called modules to separate directory trees from
each other. Put here the name of the module under which we are
mirroring. Leave empty for the local filesystem.
Things like compress, links, times or checksums. Passed in to the
File::Rsync object used to run the mirror.
=item serializer_suffix
Mostly untested accessor. The only well tested format for
I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
Data::Serializer. But in principle other formats are supported as
well. See section SERIALIZERS below.
=item sleep_per_connection
Sleep that many seconds (floating point OK) after every chunk of rsyncing
has finished. Defaults to an arbitrary 0.42.
Time to live. Number of seconds after which this recentfile must be
fetched again from the origin server. Only relevant for slaves.
Defaults to an arbitrary 24.2 seconds.
Boolean to turn on a bit of verbosity.
use accessors @accessors;
=head2 (void) $obj->aggregate( %options )
Takes all intervals that are collected in the accessor called
aggregator. Sorts them by actual length of the interval.
Removes those that are shorter than our own interval. Then merges this
object into the next larger object. The merging continues upwards
as long as the next I<recentfile> is old enough to warrant a merge.
Whether a merge is warranted is decided according to the interval of
the previous recentfile, so that larger files are not updated as often
as smaller ones. If $options{force} is true, all files get updated.
Here is an example to illustrate the behaviour. Given aggregators
1h updates 1d on every call to aggregate()
1d updates 1W earliest after 1h
1W updates 1M earliest after 1d
1M updates 1Q earliest after 1W
1Q updates 1Y earliest after 1M
1Y updates Z earliest after 1Q
Note that all but the smallest recentfile get updated at an arbitrary
rate and as such are quite useless on their own.
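The cascade rule above can be sketched as a pure function over a list of levels. This is an illustration under stated assumptions, not the module's implementation. File ages are passed in rather than derived from `-M`. A missing file (`age` undef) is assumed to always warrant a merge. The climb is assumed to stop at the first level that is still too young. `levels_to_update` is a hypothetical name.

```perl
use strict;
use warnings;

# Each level: { name => ..., secs => interval length, age => seconds since
# the file was last written, or undef if it does not exist yet }.
# Levels must be sorted ascending by secs, smallest (our own) first.
sub levels_to_update {
    my ($levels, $force) = @_;
    my @updated;
    for my $i (0 .. $#$levels - 1) {
        my $next = $levels->[$i+1];
        my $warranted = $force
            || $i == 0                                   # smallest always merges up
            || !defined $next->{age}                     # target file missing
            || $next->{age} > $levels->[$i-1]{secs};     # older than previous interval
        last unless $warranted;                          # stop climbing the cascade
        push @updated, $next->{name};
    }
    return @updated;
}

my @levels = (
    { name => "1h", secs => 3600 },
    { name => "1d", secs => 86400,   age => 100 },
    { name => "1W", secs => 604800,  age => 7200 },
    { name => "1M", secs => 2592000, age => 50000 },
);
print join(",", levels_to_update(\@levels)), "\n";      # 1d,1W
print join(",", levels_to_update(\@levels, 1)), "\n";   # 1d,1W,1M
```

The 1M file is skipped in the first call because it is only 50000 seconds old, which is less than the 1d interval of the level before 1W.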
my($self, %option) = @_;
my @aggs = sort { $a->{secs} <=> $b->{secs} }
grep { $_->{secs} >= $self->interval_secs }
map { { interval => $_, secs => $self->interval_secs($_)} }
$self->interval, @{$self->aggregator || []};
$aggs[0]{object} = $self;
AGGREGATOR: for my $i (0..$#aggs-1) {
my $this = $aggs[$i]{object};
my $next = $this->_sparse_clone;
$next->interval($aggs[$i+1]{interval});
if ($option{force} || $i == 0) {
my $next_rfile = $next->rfile;
if (-e $next_rfile) {
my $prev = $aggs[$i-1]{object};
my $next_age = 86400 * -M $next_rfile;
if ($next_age > $prev->interval_secs) {
$aggs[$i+1]{object} = $next;
# collect file size and mtime for all files of this aggregate
sub _debug_aggregate {
my @aggs = sort { $a->{secs} <=> $b->{secs} }
map { { interval => $_, secs => $self->interval_secs($_)} }
$self->interval, @{$self->aggregator || []};
for my $i (0..$#aggs) {
my $this = Storable::dclone $self;
$this->interval($aggs[$i]{interval});
my $rfile = $this->rfile;
my @stat = stat $rfile;
push @$report, {rfile => $rfile, size => $stat[7], mtime => $stat[9]};
# (void) $self->_assert_symlink()
sub _assert_symlink {
my $recentrecentfile = File::Spec->catfile
if ($Config{d_symlink} eq "define") {
my $howto_create_symlink; # 0=no need; 1=straight symlink; 2=rename symlink
if (-l $recentrecentfile) {
my $found_symlink = readlink $recentrecentfile;
if ($found_symlink eq $self->rfilename) {
$howto_create_symlink = 2;
$howto_create_symlink = 1;
if (1 == $howto_create_symlink) {
symlink $self->rfilename, $recentrecentfile or die "Could not create symlink '$recentrecentfile': $!"
unlink "$recentrecentfile.$$"; # may fail
symlink $self->rfilename, "$recentrecentfile.$$" or die "Could not create symlink '$recentrecentfile.$$': $!";
rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
warn "Warning: symlinks not supported on this system, doing a copy instead\n";
unlink "$recentrecentfile.$$"; # may fail
cp $self->rfilename, "$recentrecentfile.$$" or die "Could not copy to '$recentrecentfile.$$': $!";
rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
=head2 $hashref = $obj->delayed_operations
A hash of hashes containing unlink and rmdir operations which had to
wait until the recentfile got unhidden in order to not confuse
downstream mirrors (in case we have some).
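The deferral pattern can be sketched with a plain hash of hashes. Deletions are only queued while the new recentfile is still hidden, and they are performed afterwards. `queue_delete` and `perform_delayed_ops` are hypothetical names. The unlink/rmdir split and the "warn and forget" error handling follow the module's `mirror` and `_mirror_perform_delayed_ops` code, but the rest is illustrative.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# Pending deletions, keyed by operation and then by path.
my %delayed = (unlink => {}, rmdir => {});

# Symlinks and plain files get unlinked, real directories get rmdir'ed.
sub queue_delete {
    my ($dst) = @_;
    if (-l $dst or not -d $dst) { $delayed{unlink}{$dst}++ }
    else                        { $delayed{rmdir}{$dst}++ }
}

# Run once the recentfile is in place. Failures only warn, and every
# entry is removed from the queue either way.
sub perform_delayed_ops {
    for my $dst (keys %{ $delayed{unlink} }) {
        unlink $dst or warn "Warning: Error while unlinking '$dst': $!";
        delete $delayed{unlink}{$dst};
    }
    for my $dst (keys %{ $delayed{rmdir} }) {
        rmdir $dst or warn "Warning: Error on rmdir '$dst': $!";
        delete $delayed{rmdir}{$dst};
    }
}

my $dir = tempdir(CLEANUP => 1);
open my $fh, ">", "$dir/old.tar.gz" or die "open: $!";
close $fh;
mkdir "$dir/emptydir" or die "mkdir: $!";
queue_delete("$dir/old.tar.gz");
queue_delete("$dir/emptydir");
# ... at this point the module would rename the tempfile to the recentfile
perform_delayed_ops();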
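```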
sub delayed_operations {
my $x = $self->_delayed_operations;
unless (defined $x) {
$self->_delayed_operations ($x);
=head2 $done = $obj->done
$done is a reference to a File::Rsync::Mirror::Recentfile::Done object
that keeps track of rsync activities. Only needed and used when we are
my $done = $self->_done;
require File::Rsync::Mirror::Recentfile::Done;
$done = File::Rsync::Mirror::Recentfile::Done->new();
$done->_rfinterval ($self->interval);
$self->_done ( $done );
=head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
Stores the remote I<recentfile> locally as a tempfile. The caller is
responsible for removing the file after use.
Note: if you intend to act as an rsync server for other slaves,
you must prefer this method over fetching that file with
get_remotefile(). Otherwise downstream mirrors would expect you to
already have mirrored all the files that are in the I<recentfile>
before you have them mirrored.
sub get_remote_recentfile_as_tempfile {
mkpath $self->localroot;
if ( $self->_use_tempfile() ) {
return $self->_current_tempfile if ! $self->ttl_reached;
$fh = $self->_current_tempfile_fh;
$trfilename = $self->rfilename;
$trfilename = $self->rfilename;
$dst = $self->_current_tempfile;
$fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
$dst = $fh->filename;
$self->_current_tempfile ($dst);
my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
if (defined $rfile && -e $rfile) {
# saving on bandwidth. Might need to be configurable
# $self->bandwidth_is_cheap?
cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
if ($self->verbose) {
my $doing = -e $dst ? "Sync" : "Get";
my $display_dst = join "/", "...", basename(dirname($dst)), basename($dst);
"%-4s %d (1/1/%s) temp %s ... ",
while (!$self->rsync->exec(
$self->register_rsync_error ($self->rsync->err);
if (++$retried >= 3) {
warn "XXX giving up";
printf STDERR "Warning: gave up mirroring %s, will try again later\n", $self->interval;
$self->_refresh_internals ($dst);
$self->have_mirrored (Time::HiRes::time);
$self->un_register_rsync_error ();
if ($self->verbose) {
print STDERR "DONE\n";
chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
sub _get_remote_rat_provide_tempfile_object {
my($self, $trfilename) = @_;
my $fh = File::Temp->new
(TEMPLATE => sprintf(".FRMRecent-%s-XXXX",
DIR => $self->localroot,
SUFFIX => $self->serializer_suffix,
UNLINK => $self->_use_tempfile,
if ($self->_use_tempfile) {
$self->_current_tempfile_fh ($fh); # delay self destruction
=head2 $localpath = $obj->get_remotefile ( $relative_path )
Rsyncs one single remote file to the local filesystem.
Note: no locking is done on this file. Any number of processes may
Note II: do not use for recentfiles. If you are a cascading
slave/server combination, it would confuse other slaves. They would
expect the contents of these recentfiles to be available. Use
get_remote_recentfile_as_tempfile() instead.
my($self, $path) = @_;
my $dst = File::Spec->catfile($self->localroot, $path);
if ($self->verbose) {
my $doing = -e $dst ? "Sync" : "Get";
"%-4s %d (1/1/%s) %s ... ",
while (!$self->rsync->exec(
$self->register_rsync_error ($self->rsync->err);
$self->un_register_rsync_error ();
if ($self->verbose) {
print STDERR "DONE\n";
=head2 $obj->interval ( $interval_spec )
Get/set accessor. $interval_spec is a string and described below in
the section INTERVAL SPEC.
my ($self, $interval) = @_;
$self->_interval($interval);
$self->_rfile(undef);
$interval = $self->_interval;
unless (defined $interval) {
# do not ask the $self too much, it recurses!
Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
=head2 $secs = $obj->interval_secs ( $interval_spec )
$interval_spec is described below in the section INTERVAL SPEC. If
empty, it defaults to the inherent interval for this object.
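A standalone sketch of the interval-spec parsing, following the regex and error messages in the method body. The module's own `%seconds` table is not shown here. The month, quarter and year lengths below (30, 90 and 365 days) are assumptions for illustration. Returning `~0 >> 1` for `Z` mirrors the module's `MAX_INT` constant and is likewise assumed.

```perl
use strict;
use warnings;

# Assumed unit lengths in seconds; M, Q and Y are illustrative approximations.
my %seconds = (
    s => 1, m => 60, h => 60*60, d => 24*60*60,
    W => 7*24*60*60, M => 30*24*60*60,
    Q => 90*24*60*60, Y => 365*24*60*60,
);

sub interval_secs {
    my ($interval) = @_;
    die "interval_secs() called without an interval" unless defined $interval;
    my ($n, $t) = $interval =~ /^(\d*)([smhdWMQYZ])$/
        or die "Could not determine seconds from interval[$interval]";
    return ~0 >> 1 if $interval eq "Z";   # "Z" is unbounded
    die "Invalid interval specification: n[$n]t[$t]"
        unless exists $seconds{$t} and $n =~ /^\d+$/;
    return $seconds{$t} * $n;
}

printf "%s => %d\n", $_, interval_secs($_) for qw(30m 6h 1d 1W);
```

A bare unit such as `h` is rejected because the count is mandatory for every unit except `Z`.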
my ($self, $interval) = @_;
$interval ||= $self->interval;
unless (defined $interval) {
die "interval_secs() called without argument on an object without a declared one";
my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
die "Could not determine seconds from interval[$interval]";
if ($interval eq "Z") {
} elsif (exists $seconds{$t} and $n =~ /^\d+$/) {
return $seconds{$t}*$n;
die "Invalid interval specification: n[$n]t[$t]";
=head2 $obj->localroot ( $localroot )
Get/set accessor. The local root of the tree.
my ($self, $localroot) = @_;
$self->_localroot($localroot);
$self->_rfile(undef);
$localroot = $self->_localroot;
=head2 $ret = $obj->local_path($path_found_in_recentfile)
Combines the path to our local mirror and the path of an object found
in this I<recentfile>. In other words: the target of a mirror operation.
Implementation note: We split on slashes and then use
File::Spec::catfile to adjust to the local operating system.
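The split-then-catfile approach can be shown as a plain function. The local root is passed in explicitly instead of being read from the `localroot` accessor, and `local_path` here is a standalone stand-in for the method.

```perl
use strict;
use warnings;
use File::Spec;

# Recentfiles always use "/" as separator; File::Spec picks the
# separator of the running operating system when joining.
sub local_path {
    my ($localroot, $path) = @_;
    return $localroot unless defined $path;   # degenerate case
    my @p = split m|/|, $path;
    return File::Spec->catfile($localroot, @p);
}

print local_path("/home/ftp/pub/PAUSE/authors", "id/A/AN/ANDK/CHECKSUMS"), "\n";
```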
my($self,$path) = @_;
unless (defined $path) {
# seems like a degenerate case
return $self->localroot;
my @p = split m|/|, $path;
File::Spec->catfile($self->localroot,@p);
=head2 (void) $obj->lock
Locking is implemented with an C<mkdir> on a locking directory
(C<.lock> appended to $rfile).
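The mkdir-based lock works because `mkdir` either creates the directory or fails if it already exists. Exactly one contender wins. The sketch below follows the polling loop and timeout in the method body. `acquire_lock` is a hypothetical name. `release_lock`, which uses `rmdir`, is an assumed counterpart, since the module's `unlock` is not shown in this excerpt.

```perl
use strict;
use warnings;
use Time::HiRes ();
use File::Temp qw(tempdir);

# Poll every 10 ms until "$rfile.lock" can be created; give up after
# $timeout seconds (default 600, as in the module).
sub acquire_lock {
    my ($rfile, $timeout) = @_;
    $timeout ||= 600;
    my $start = time;
    until (mkdir "$rfile.lock") {
        die "Could not acquire lockdirectory '$rfile.lock': $!"
            if time - $start > $timeout;
        Time::HiRes::sleep(0.01);
    }
    return 1;
}

# Assumed counterpart: removing the directory releases the lock.
sub release_lock {
    my ($rfile) = @_;
    rmdir "$rfile.lock" or die "Could not remove '$rfile.lock': $!";
}

my $dir   = tempdir(CLEANUP => 1);
my $rfile = "$dir/RECENT-1h.yaml";
acquire_lock($rfile);
print -d "$rfile.lock" ? "locked\n" : "not locked\n";
release_lock($rfile);
```

Unlike `flock`, this lock is tied to a path rather than an open filehandle. That matches the "old school resources" comment in the method. A process that crashes while holding the lock leaves the directory behind, so the next locker will time out.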
# not using flock because it locks on filehandles instead of
# old school resources.
my $locked = $self->_is_locked and return;
my $rfile = $self->rfile;
# XXX need a way to allow breaking the lock
my $locktimeout = $self->locktimeout || 600;
while (not mkdir "$rfile.lock") {
Time::HiRes::sleep 0.01;
if (time - $start > $locktimeout) {
die "Could not acquire lockdirectory '$rfile.lock': $!";
$self->_is_locked (1);
=head2 (void) $obj->merge ($other)
Bulk update of this object with another one. It's used to merge a
smaller and younger $other object into the current one. If this file
is a C<Z> file, then we do not merge in objects of type C<delete>. But
if we encounter an object of type delete we delete the corresponding
C<new> object if we have it.
If there is nothing to be merged, nothing is done.
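The core of the merge can be sketched as a zip of two newest-first event lists. It keeps only the newest event per path. For a `Z` target, incoming `delete` events are dropped but still mark their path as seen, so the older `new` entry for that path falls out. This is a simplified illustration. Epochs are compared as plain numbers (the module uses big-float string comparisons), and the `oldest_allowed` window, dirtymark handling and epoch-conflict resolution are all omitted. `merge_events` is a hypothetical name.

```perl
use strict;
use warnings;

# $mine and $other: array refs of { epoch, path, type }, newest first.
sub merge_events {
    my ($mine, $other, $is_z) = @_;
    my %seen;
    my @incoming;
    for my $ev (@$other) {
        next if $seen{ $ev->{path} }++;             # newest event per path wins
        next if $is_z && $ev->{type} eq "delete";   # Z keeps no delete events,
        push @incoming, $ev;                        # but the path stays marked seen
    }
    my @merged;
    my @m = @$mine;
    while (@incoming || @m) {
        my $ev;
        if (@incoming && (!@m || $incoming[0]{epoch} >= $m[0]{epoch})) {
            $ev = shift @incoming;
        } else {
            $ev = shift @m;
            next if $seen{ $ev->{path} }++;         # superseded by a newer event
        }
        push @merged, $ev;
    }
    return \@merged;
}

my $mine  = [ { epoch => 5, path => "a", type => "new" },
              { epoch => 3, path => "b", type => "new" } ];
my $other = [ { epoch => 7, path => "b", type => "delete" },
              { epoch => 6, path => "c", type => "new" } ];
print join(" ", map { "$_->{path}:$_->{type}" } @{ merge_events($mine, $other, 0) }), "\n";
print join(" ", map { "$_->{path}:$_->{type}" } @{ merge_events($mine, $other, 1) }), "\n";
```

The first call prints `b:delete c:new a:new`. The second, for a `Z` target, prints `c:new a:new`: the deletion of `b` removes its old `new` entry without being recorded itself.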
my($self, $other) = @_;
$self->_merge_sanitycheck ( $other );
my $other_recent = $other->recent_events || [];
# $DB::single++ if $other->interval_secs eq "2" and grep {$_->{epoch} eq "999.999"} @$other_recent;
my $my_recent = $self->recent_events || [];
# calculate the target time span
my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
my $oldest_allowed = 0;
unless ($my_recent->[0]) {
if (_bigfloatgt($other->dirtymark, $self->dirtymark||0)) {
} elsif (my $merged = $self->merged) {
my $secs = $self->interval_secs();
$oldest_allowed = min($epoch - $secs, $merged->{epoch}||0);
if (@$other_recent and
_bigfloatlt($other_recent->[-1]{epoch}, $oldest_allowed)
$oldest_allowed = $other_recent->[-1]{epoch};
while (@$my_recent && _bigfloatlt($my_recent->[-1]{epoch}, $oldest_allowed)) {
my $other_recent_filtered = [];
for my $oev (@$other_recent) {
my $oevepoch = $oev->{epoch} || 0;
next if _bigfloatlt($oevepoch, $oldest_allowed);
my $path = $oev->{path};
next if $have_path{$path}++;
if ( $self->interval eq "Z"
and $oev->{type} eq "delete") {
if (!$myepoch || _bigfloatgt($oevepoch, $myepoch)) {
push @$other_recent_filtered, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
if ($something_done) {
$self->_merge_something_done ($other_recent_filtered, $my_recent, $other_recent, $other, \%have_path, $epoch);
sub _merge_something_done {
my($self, $other_recent_filtered, $my_recent, $other_recent, $other, $have_path, $epoch) = @_;
my $epoch_conflict = 0;
ZIP: while (@$other_recent_filtered || @$my_recent) {
@$other_recent_filtered && _bigfloatge($other_recent_filtered->[0]{epoch},$my_recent->[0]{epoch})) {
$event = shift @$other_recent_filtered;
$event = shift @$my_recent;
next ZIP if $have_path->{$event->{path}}++;
$epoch_conflict=1 if defined $last_epoch && $event->{epoch} eq $last_epoch;
$last_epoch = $event->{epoch};
push @$recent, $event;
if ($epoch_conflict) {
for (my $i = $#$recent;$i>=0;$i--) {
my $epoch = $recent->[$i]{epoch};
if ($have_epoch{$epoch}++) {
while ($have_epoch{$epoch}) {
$epoch = _increase_a_bit($epoch);
$recent->[$i]{epoch} = $epoch;
$have_epoch{$epoch}++;
if (!$self->dirtymark || _bigfloatgt($other->dirtymark, $self->dirtymark)) {
$self->dirtymark ( $other->dirtymark );
$self->write_recent($recent);
time => Time::HiRes::time, # not used anywhere
epoch => $recent->[0]{epoch},
into_interval => $self->interval, # not used anywhere
$other->write_recent($other_recent);
sub _merge_sanitycheck {
my($self, $other) = @_;
if ($self->interval_secs <= $other->interval_secs) {
"Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
$self->interval_secs,
$other->interval_secs,
Hashref denoting when this recentfile has been merged into some other
my($self, $set) = @_;
$self->_merged ($set);
my $merged = $self->_merged;
if ($merged and $into = $merged->{into_interval} and defined $self->_interval) {
if ($into eq $self->interval) {
"Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
} elsif ($self->interval_secs($into) < $self->interval_secs) {
"Warning: into_interval_secs[%s] smaller than own interval_secs[%s] on interval[%s]. Danger ahead.",
$self->interval_secs($into),
$self->interval_secs,
=head2 $hashref = $obj->meta_data
Returns the hashref of metadata that the server has to add to the
my $ret = $self->{meta};
# XXX need to reset the Producer if I am a writer, keep it when I
$ret->{Producers} ||= {
__PACKAGE__, "$VERSION", # stringified it looks better
'time', Time::HiRes::time,
$ret->{dirtymark} ||= Time::HiRes::time;
=head2 $success = $obj->mirror ( %options )
Mirrors the files in this I<recentfile> as reported by
C<recent_events>. Options named C<after>, C<before>, C<max>, and
C<skip-deletes> are passed through to the L<recent_events> call. The
boolean option C<piecemeal>, if true, causes C<mirror> to only rsync
C<max_files_per_connection> files and keep track of the rsynced files so
that future calls will rsync different files until all files are
my($self, %options) = @_;
my $trecentfile = $self->get_remote_recentfile_as_tempfile();
$self->_use_tempfile (1);
my %passthrough = map { ($_ => $options{$_}) } qw(before after max skip-deletes);
my ($recent_events) = $self->recent_events(%passthrough);
my(@error, @xcollector);
my $last_item = $#$recent_events;
my $done = $self->done;
my $pathdb = $self->_pathdb;
ITEM: for my $i ($first_item..$last_item) {
last if $i == $last_item;
return if $status->{mustreturn};
my $success = eval { $self->_mirror_empty_xcollector (\@xcollector,$pathdb,$recent_events);};
if (!$success || $@) {
warn "Warning: Unknown error while mirroring: $@";
if ($self->verbose) {
print STDERR "DONE\n";
# once we've gone to the end we consider ourselves free of obligations
$self->_mirror_unhide_tempfile ($trecentfile);
$self->_mirror_perform_delayed_ops;
my $recent_event = $recent_events->[$i];
return if $done->covered ( $recent_event->{epoch} );
my $rec = $pathdb->{$recent_event->{path}};
if ($rec && $rec->{recentepoch}) {
( $rec->{recentepoch}, $recent_event->{epoch} )){
$done->register ($recent_events, [$i]);
my $dst = $self->local_path($recent_event->{path});
if ($recent_event->{type} eq "new"){
$self->_mirror_item_new
} elsif ($recent_event->{type} eq "delete") {
if ($options->{'skip-deletes'}) {
$activity = "skipped";
$activity = "not_found";
} elsif (-l $dst or not -d _) {
$self->delayed_operations->{unlink}{$dst}++;
$activity = "deleted";
$self->delayed_operations->{rmdir}{$dst}++;
$activity = "deleted";
$done->register ($recent_events, [$i]);
$self->_mirror_register_path($pathdb,[$recent_event],$activity);
warn "Warning: invalid upload type '$recent_event->{type}'";
sub _mirror_item_new {
if ($self->verbose) {
my $doing = -e $dst ? "Sync" : "Get";
"%-4s %d (%d/%d/%s) %s ... ",
$recent_event->{path},
my $max_files_per_connection = $self->max_files_per_connection || 42;
if ($self->verbose) {
push @$xcollector, { rev => $recent_event, i => $i };
if (@$xcollector >= $max_files_per_connection) {
$success = eval {$self->_mirror_empty_xcollector ($xcollector,$pathdb,$recent_events);};
my $sleep = $self->sleep_per_connection;
$sleep = 0.42 unless defined $sleep;
Time::HiRes::sleep $sleep;
if ($options->{piecemeal}) {
$status->{mustreturn} = 1;
if (!$success || $@) {
warn "Warning: Error while mirroring: $@";
if ($self->verbose) {
print STDERR "DONE\n";
sub _mirror_empty_xcollector {
my($self,$xcoll,$pathdb,$recent_events) = @_;
my $success = $self->mirror_path([map {$_->{rev}{path}} @$xcoll]);
$self->_mirror_register_path($pathdb,[map {$_->{rev}} @$xcoll],"rsync");
$self->done->register($recent_events, [map {$_->{i}} @$xcoll]);
sub _mirror_register_path {
my($self,$pathdb,$coll,$activity) = @_;
for my $item (@$coll) {
$pathdb->{$item->{path}} =
recentepoch => $item->{epoch},
($activity."_on") => $time,
sub _mirror_unhide_tempfile {
my($self, $trecentfile) = @_;
my $rfile = $self->rfile;
if (rename $trecentfile, $rfile) {
# warn "DEBUG: renamed '$trecentfile' to '$rfile'";
Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
$self->_use_tempfile (0);
if (my $ctfh = $self->_current_tempfile_fh) {
$ctfh->unlink_on_destroy (0);
$self->_current_tempfile_fh (undef);
sub _mirror_perform_delayed_ops {
my $delayed = $self->delayed_operations;
for my $dst (keys %{$delayed->{unlink}}) {
unless (unlink $dst) {
Carp::cluck( "Warning: Error while unlinking '$dst': $!" );
delete $delayed->{unlink}{$dst};
for my $dst (keys %{$delayed->{rmdir}}) {
unless (rmdir $dst) {
Carp::cluck( "Warning: Error on rmdir '$dst': $!" );
delete $delayed->{rmdir}{$dst};
=head2 (void) $obj->mirror_loop
Run mirror in an endless loop. See the accessor C<loopinterval>. XXX
What happens/should happen if we miss the interval during a single loop?
my $iteration_start = time;
$SIG{INT} = sub { $Signal++ };
my $loopinterval = $self->loopinterval || 42;
my $after = -999999999;
$self->mirror(after => $after);
last LOOP if $Signal;
my $re = $self->recent_events;
$after = $re->[0]{epoch};
if ($self->verbose) {
if (time - $iteration_start < $loopinterval) {
sleep $iteration_start + $loopinterval - time;
if ($self->verbose) {
=head2 $success = $obj->mirror_path ( $arrref | $path )
If the argument is a scalar it is treated as a path. The remote path
is mirrored into the local copy. $path is the path found in the
I<recentfile>, i.e. it is relative to the root directory of the
If the argument is an array reference then all elements are treated as
a path below the current tree and all are rsynced with a single
command (and a single connection).
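Batching many paths into one connection relies on rsync's `--files-from` option. The module writes the path list to a temp file and hands it to `File::Rsync` as `'files-from'`. The sketch below assumes plain `rsync` command-line arguments instead and does not run them. It builds the list file and the argument vector for an rsync daemon source (`host::module/`). `build_batch_command` is a hypothetical helper, and the `-a` flag is an illustrative choice, not the module's rsync configuration.

```perl
use strict;
use warnings;
use File::Temp ();

# Write one relative path per line to a temp file and build an rsync
# command that fetches all of them in a single invocation.
sub build_batch_command {
    my ($remote_host, $remote_module, $localroot, @paths) = @_;
    my $fh = File::Temp->new(TEMPLATE => ".recent-batch-XXXXXX", TMPDIR => 1);
    print {$fh} "$_\n" for @paths;
    close $fh or die "Could not close '" . $fh->filename . "': $!";
    my @cmd = ("rsync", "-a", "--files-from=" . $fh->filename,
               "${remote_host}::${remote_module}/", "$localroot/");
    return ($fh, @cmd);   # keep $fh alive: the file is unlinked when it is destroyed
}

my ($list, @cmd) = build_batch_command("pause.perl.org", "authors",
    "/home/ftp/pub/PAUSE/authors",
    "id/A/AN/ANDK/CHECKSUMS", "id/A/AN/ANDK/CPAN-1.92_63.tar.gz");
print "@cmd\n";
# system(@cmd) == 0 or die "rsync failed: $?";   # not executed in this sketch
```

Returning the `File::Temp` object together with the command keeps the list file alive until the caller has run rsync. This serves the same purpose as the module's "delay self destruction" trick with `_current_tempfile_fh`.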
my($self,$path) = @_;
# XXX simplify the two branches such that $path is treated as
# [$path] maybe even demand the argument as an arrayref to
# simplify docs and code. (rsync-over-recentfile-2.pl uses the
if (ref $path and ref $path eq "ARRAY") {
my $dst = $self->localroot;
mkpath dirname $dst;
my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
lc $self->filenameroot,
for my $p (@$path) {
$fh->unlink_on_destroy(1);
while (!$self->rsync->exec
'files-from' => $fh->filename,
my(@err) = $self->rsync->err;
if ($self->ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
if ($self->verbose) {
warn "Info: ignoring link_stat error '@err'";
$self->register_rsync_error (@err);
if (++$retried >= 3) {
warn "XXX giving up.";
$self->un_register_rsync_error ();
my $dst = $self->local_path($path);
mkpath dirname $dst;
while (!$self->rsync->exec
my(@err) = $self->rsync->err;
if ($self->ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
if ($self->verbose) {
warn "Info: ignoring link_stat error '@err'";
$self->register_rsync_error (@err);
$self->un_register_rsync_error ();
sub _my_current_rfile {
if ($self->_use_tempfile) {
$rfile = $self->_current_tempfile;
$rfile = $self->rfile;
=head2 $path = $obj->naive_path_normalize ($path)
Takes an absolute unix style path as argument and canonicalizes it to
a shorter path if possible, removing things like double slashes or
C</./> and removing references to C<../> directories to get a shorter
unambiguous path. This simplifies the code that determines whether
a file passed to C<update()> is indeed below our C<localroot>.
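The method body in this excerpt only shows the `dir/../` substitution. The two other steps below (collapsing `//` and dropping `/./`) are assumptions based on the description. This is a standalone sketch, not the module's exact code.

```perl
use strict;
use warnings;

sub naive_path_normalize {
    my ($path) = @_;
    $path =~ s|/+|/|g;                     # "//"       -> "/"  (assumed step)
    1 while $path =~ s|/\./|/|;            # "/./"      -> "/"  (assumed step)
    1 while $path =~ s|/[^/]+/\.\./|/|;    # "/dir/../" -> "/"  (as in the module)
    return $path;
}

print naive_path_normalize("/home/ftp//pub/./PAUSE/tmp/../authors/"), "\n";
```

It is "naive" because it works purely on the string. Symlinks are not resolved, and a trailing `/..` without a following slash is left alone.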
sub naive_path_normalize {
my($self,$path) = @_;
1 while $path =~ s|/[^/]+/\.\./|/|;
=head2 $ret = $obj->read_recent_1 ( $data )
Delegate of C<recent_events()> on protocol 1
my($self, $data) = @_;
return $data->{recent};
=head2 $array_ref = $obj->recent_events ( %options )
Note: the code relies on the resource being written atomically. We
cannot lock because we may have no write access. If the caller has
write access (e.g. aggregate() or update()), it has to take care of any
necessary locking and it MUST write atomically.
If $options{after} is specified, only file events after this timestamp
If $options{before} is specified, only file events before this
timestamp are returned.
If $options{'skip-deletes'} is specified, no files-to-be-deleted will
If $options{max} is specified only a maximum of this many events is
If $options{contains} is specified the value must be a hash reference
containing a query. The query may contain the keys C<epoch>, C<path>,
and C<type>. Each represents a condition that must be met. If there is
more than one such key, the conditions are ANDed.
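The filtering options can be sketched over an in-memory, newest-first list of events. This simplified stand-in uses `grep` where the module scans for index boundaries, and it assumes epochs compare as plain numbers. `filter_events` is a hypothetical name. The semantics follow the documentation and the comparisons in `_recent_events_handle_options`: `after` is exclusive, `before` is exclusive, `contains` conditions are ANDed, and unknown query keys die.

```perl
use strict;
use warnings;

sub filter_events {
    my ($re, %opt) = @_;
    my @ev = @$re;
    @ev = grep { $_->{epoch} > $opt{after}  } @ev if defined $opt{after};
    @ev = grep { $_->{epoch} < $opt{before} } @ev if defined $opt{before};
    @ev = grep { $_->{type} ne "delete" } @ev     if $opt{'skip-deletes'};
    if (my $q = $opt{contains}) {
        for my $k (keys %$q) {
            die "unknown query: $k" unless $k =~ /^(?:epoch|path|type)$/;
            @ev = grep { $_->{$k} eq $q->{$k} } @ev;   # conditions are ANDed
        }
    }
    splice @ev, $opt{max} if $opt{max} && @ev > $opt{max};   # keep the newest $max
    return \@ev;
}

my $events = [
    { epoch => 1300, path => "id/A/x.tar.gz", type => "new" },
    { epoch => 1200, path => "id/B/y.tar.gz", type => "delete" },
    { epoch => 1100, path => "id/C/z.tar.gz", type => "new" },
];
print scalar @{ filter_events($events, after => 1100) }, "\n";   # 2
```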
If $options{info} is specified, it must be a hashref. This hashref
will be filled with metadata about the unfiltered recent_events of
this object, in key C<first> there is the first item, in key C<last>
my ($self, %options) = @_;
my $info = $options{info};
if ($self->is_slave) {
$self->get_remote_recentfile_as_tempfile;
my $rfile_or_tempfile = $self->_my_current_rfile or return [];
-e $rfile_or_tempfile or return [];
my $suffix = $self->serializer_suffix;
$self->_try_deserialize
if ($err or !$data) {
if (reftype $data eq 'ARRAY') { # protocol 0
$re = $self->_recent_events_protocol_x
return $re unless grep {defined $options{$_}} qw(after before max skip-deletes contains info);
$self->_recent_events_handle_options ($re, \%options);
1424 sub _recent_events_handle_options
{
1425 my($self, $re, $options) = @_;
1426 my $last_item = $#$re;
1427 my $info = $options->{info
};
1429 $info->{first
} = $re->[0];
1430 $info->{last} = $re->[-1];
1432 if (defined $options->{after
}) {
1433 if ($re->[0]{epoch
} > $options->{after
}) {
1436 {$re->[$_]{epoch
} <= $options->{after
}}
1446 if (defined $options->{before
}) {
1447 if ($re->[0]{epoch
} > $options->{before
}) {
1450 {$re->[$_]{epoch
} < $options->{before
}}
1459 if (0 != $first_item || -1 != $last_item) {
1460 @
$re = splice @
$re, $first_item, 1+$last_item-$first_item;
1462 if ($options->{'skip-deletes'}) {
1463 @
$re = grep { $_->{type
} ne "delete" } @
$re;
1465 if (my $contopt = $options->{contains
}) {
1466 my $seen_allowed = 0;
1467 for my $allow (qw(epoch path type)) {
1468 if (exists $contopt->{$allow}) {
1470 my $v = $contopt->{$allow};
1471 @$re = grep { $_->{$allow} eq $v } @$re;
1474 if (keys %$contopt > $seen_allowed) {
1477 (sprintf "unknown query: %s", join ", ", %$contopt);
1480 if ($options->{max} && @$re > $options->{max}) {
1481 @$re = splice @$re, 0, $options->{max};
1486 sub _recent_events_protocol_x {
1491 my $meth = sprintf "read_recent_%d", $data->{meta}{protocol};
1492 # we may be reading meta for the first time
1493 while (my($k,$v) = each %{$data->{meta}}) {
1494 next if $k ne lc $k; # "Producers"
1495 next if defined $self->$k;
1498 my $re = $self->$meth ($data);
1499 my @stat = stat $rfile_or_tempfile or die "Cannot stat '$rfile_or_tempfile': $!";
1500 my $minmax = { mtime => $stat[9] };
1502 $minmax->{min} = $re->[-1]{epoch};
1503 $minmax->{max} = $re->[0]{epoch};
1505 $self->minmax ( $minmax );
1509 sub _try_deserialize {
1514 if ($suffix eq ".yaml") {
1516 YAML::Syck::LoadFile($rfile_or_tempfile);
1517 } elsif ($HAVE->{"Data::Serializer"}) {
1518 my $serializer = Data::Serializer->new
1519 ( serializer => $serializers{$suffix} );
1522 open my $fh, $rfile_or_tempfile or die "Could not open: $!";
1526 $serializer->raw_deserialize($serialized);
1528 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
1532 sub _refresh_internals {
1533 my($self, $dst) = @_;
1534 my $class = ref $self;
1535 my $rfpeek = $class->new_from_file ($dst);
1540 $self->$acc ( $rfpeek->$acc );
1542 my $old_dirtymark = $self->dirtymark;
1543 my $new_dirtymark = $rfpeek->dirtymark;
1544 if ($old_dirtymark && $new_dirtymark && _bigfloatgt($new_dirtymark,$old_dirtymark)) {
1546 $self->dirtymark ( $new_dirtymark );
1551 =head2 $ret = $obj->rfilename
1553 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1554 a dash, C<interval>, and C<serializer_suffix>, for example C<RECENT-6h.yaml>.
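The composition and its inverse, C<resolve_recentfilename>, can be sketched as two plain subroutines. The names C<compose_rfilename> and C<split_rfilename> are hypothetical stand-ins for the methods. The splitting pattern is the one used by C<resolve_recentfilename> further down.

```perl
use strict;
use warnings;

# Hypothetical stand-in for rfilename(): root, dash, interval, suffix.
sub compose_rfilename {
    my ($filenameroot, $interval, $suffix) = @_;
    return sprintf "%s-%s%s", $filenameroot, $interval, $suffix;
}

# Hypothetical stand-in for resolve_recentfilename(), same pattern.
sub split_rfilename {
    my ($name) = @_;
    my ($root, $interval, $suffix) = $name =~ qr(^(.+)-([^-\.]+)(\.[^\.]+))
        or die "Cannot split '$name'\n";
    return ($root, $interval, $suffix);
}

my $name = compose_rfilename("RECENT", "6h", ".yaml");
print "$name\n";                                 # prints RECENT-6h.yaml
print join("|", split_rfilename($name)), "\n";   # prints RECENT|6h|.yaml
```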
1560 my $file = sprintf("%s-%s%s",
1561 $self->filenameroot,
1563 $self->serializer_suffix,
1568 =head2 $str = $self->remote_dir
1570 The directory we are mirroring from.
1575 my($self, $set) = @_;
1577 $self->_remote_dir ($set);
1579 my $x = $self->_remote_dir;
1580 $self->is_slave (1);
1584 =head2 $str = $obj->remoteroot
1586 =head2 (void) $obj->remoteroot ( $set )
1588 Get/Set the composed prefix needed when rsyncing from a remote module.
1589 If remote_host, remote_module, and remote_dir are set, it is composed
1595 my($self, $set) = @_;
1597 $self->_remoteroot($set);
1599 my $remoteroot = $self->_remoteroot;
1600 unless (defined $remoteroot) {
1601 $remoteroot = sprintf
1604 defined $self->remote_host ? ($self->remote_host."::") : "",
1605 defined $self->remote_module ? ($self->remote_module."/") : "",
1606 defined $self->remote_dir ? $self->remote_dir : "",
1608 $self->_remoteroot($remoteroot);
1613 =head2 (void) $obj->resolve_recentfilename ( $recentfilename )
1615 Inverse method to C<rfilename>. $recentfilename is a plain filename of
1618 $filenameroot-$interval$serializer_suffix
1624 This filename is split into its parts and the parts are fed to the
1629 sub resolve_recentfilename {
1630 my($self, $rfname) = @_;
1631 my($splitter) = qr(^(.+)-([^-\.]+)(\.[^\.]+));
1632 if (my($f,$i,$s) = $rfname =~ $splitter) {
1633 $self->filenameroot ($f);
1634 $self->interval ($i);
1635 $self->serializer_suffix ($s);
1637 die "Alert: cannot split '$rfname', doesn't match '$splitter'";
1642 =head2 my $rfile = $obj->rfile
1644 Returns the full path of the I<recentfile>
1650 my $rfile = $self->_rfile;
1651 return $rfile if defined $rfile;
1652 $rfile = File::Spec->catfile
1656 $self->_rfile ($rfile);
1660 =head2 $rsync_obj = $obj->rsync
1662 The File::Rsync object that this object uses for communicating with an
1669 my $rsync = $self->_rsync;
1670 unless (defined $rsync) {
1671 my $rsync_options = $self->rsync_options || {};
1672 if ($HAVE->{"File::Rsync"}) {
1673 $rsync = File::Rsync->new($rsync_options);
1674 $self->_rsync($rsync);
1676 die "File::Rsync required for rsync operations. Cannot continue";
1682 =head2 (void) $obj->register_rsync_error(@err)
1684 =head2 (void) $obj->un_register_rsync_error()
1686 C<register_rsync_error> is called whenever the File::Rsync object fails
1687 on an exec (for example, when a connection does not succeed). It issues a warning and
1688 sleeps for an increasing amount of time. C<un_register_rsync_error>
1689 resets the error count. See also accessor C<max_rsync_errors>.
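A sketch of the back-off policy visible in the excerpted code below. Each consecutive failure adds 12 seconds of sleep, capped at 300 seconds. Reaching C<max_rsync_errors> gives up, while an undefined or negative limit means no limit. The closure-based C<make_backoff> is a hypothetical illustration: it returns the delay instead of sleeping, and its C<reset> plays the role of C<un_register_rsync_error>.

```perl
use strict;
use warnings;

# Hypothetical illustration of the policy, not the module's code.
sub make_backoff {
    my ($max_errors) = @_;
    my $count = 0;
    return {
        fail => sub {
            $count++;
            if (defined $max_errors && $max_errors >= 0 && $count >= $max_errors) {
                die "error count $count reached max_rsync_errors, giving up\n";
            }
            my $sleep = 12 * $count;      # grows with every failure ...
            $sleep = 300 if $sleep > 300; # ... but never beyond 5 minutes
            return $sleep;                # the caller would sleep() this long
        },
        reset => sub { $count = 0 },
    };
}

my $backoff = make_backoff(30);
my @delays  = map { $backoff->{fail}->() } 1 .. 3;
print "@delays\n";   # prints "12 24 36"
```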
1694 my $no_success_count = 0;
1695 my $no_success_time = 0;
1696 sub register_rsync_error {
1697 my($self, @err) = @_;
1699 $no_success_time = time;
1700 $no_success_count++;
1701 my $max_rsync_errors = $self->max_rsync_errors;
1702 $max_rsync_errors = MAX_INT unless defined $max_rsync_errors;
1703 if ($max_rsync_errors>=0 && $no_success_count >= $max_rsync_errors) {
1709 "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
1715 my $sleep = 12 * $no_success_count;
1716 $sleep = 300 if $sleep > 300;
1721 "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
1722 scalar(localtime($no_success_time)),
1729 sub un_register_rsync_error {
1731 $no_success_time = 0;
1732 $no_success_count = 0;
1736 =head2 $clone = $obj->_sparse_clone
1738 Clones only as much of itself as is safe to clone. Experimental
1741 Note: what fits better: sparse or shallow? Other suggestions?
1747 my $new = bless {}, ref $self;
1757 max_files_per_connection
1761 sleep_per_connection
1765 $o = Storable::dclone $o if ref $o;
1771 =head2 $boolean = OBJ->ttl_reached ()
1777 my $have_mirrored = $self->have_mirrored || 0;
1778 my $now = Time::HiRes::time;
1779 my $ttl = $self->ttl;
1780 $ttl = 24.2 unless defined $ttl;
1781 if ($now > $have_mirrored + $ttl) {
1787 =head2 (void) $obj->unlock()
1789 Unlocking is implemented with an C<rmdir> on a locking directory
1790 (C<.lock> appended to $rfile).
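The C<rmdir> suggests that the matching lock is taken with C<mkdir>. C<mkdir> fails when the directory already exists, which makes a directory a convenient lock on a local filesystem. The sketch below assumes this pairing. C<try_lock> and C<unlock_dir> are hypothetical helpers, not this module's locking methods.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# Hypothetical helpers: mkdir fails if "$rfile.lock" already exists, so on
# a local filesystem at most one process can hold the lock at a time.
sub try_lock {
    my ($rfile) = @_;
    return mkdir "$rfile.lock";   # true on success, false if already held
}

sub unlock_dir {
    my ($rfile) = @_;
    return rmdir "$rfile.lock";
}

my $dir   = tempdir(CLEANUP => 1);
my $rfile = "$dir/RECENT-1h.yaml";
my $first  = try_lock($rfile) ? 1 : 0;   # we got the lock
my $second = try_lock($rfile) ? 1 : 0;   # already locked
unlock_dir($rfile);
my $third  = try_lock($rfile) ? 1 : 0;   # free again after unlocking
unlock_dir($rfile);
print "$first $second $third\n";         # prints "1 0 1"
```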
1796 return unless $self->_is_locked;
1797 my $rfile = $self->rfile;
1798 rmdir "$rfile.lock";
1799 $self->_is_locked (0);
1804 Puts this recentfile into the state of not being 'seeded'.
1812 =head2 $ret = $obj->update ($path, $type)
1814 =head2 $ret = $obj->update ($path, "new", $dirty_epoch)
1816 =head2 $ret = $obj->update ()
1818 Enter one file into the local I<recentfile>. $path is the (usually
1819 absolute) path. If the path is outside I<our> tree, then it is
1822 $type is one of C<new> or C<delete>.
1824 Events of type C<new> may set $dirty_epoch. $dirty_epoch is normally
1825 not used and the epoch is calculated by the update() routine itself
1826 based on the current time. But if there is a need to insert a
1827 not-so-current file into the dataset, then the caller sets
1828 $dirty_epoch. This causes the epoch of the registered event to become
1829 $dirty_epoch or, if the exact value given is already taken, a tiny
1830 bit more. As compensation, the dirtymark of the whole dataset is set to
1833 The new file event is unshifted onto the array of recent_events (or,
1834 if dirty_epoch is set, inserted at the position where it belongs
1835 according to the rule that timestamps must strictly decrease), and
1836 the array is shortened to the length of the timespan allowed. This is
1837 usually the timespan specified by the interval of this recentfile, but
1838 as long as this recentfile has not been merged into another one, the
1839 timespan may grow without bounds.
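A simplified sketch of this insertion and truncation. It uses native floating point numbers, whereas the module uses its string-based big-float helpers. C<insert_event> is hypothetical, and the "tiny bit more" is approximated by adding 1e-6. C<$oldest_allowed> stands in for the lower bound that C<update()> derives from the interval and the merged epoch.

```perl
use strict;
use warnings;

# Hypothetical sketch. @$recent is kept in strictly decreasing epoch
# order, newest first.
sub insert_event {
    my ($recent, $event, $oldest_allowed) = @_;
    my %ev = %$event;
    # remove older entries for the same path, irrespective of type
    @$recent = grep { $_->{path} ne $ev{path} } @$recent;
    # if the epoch is already taken, move it up a tiny bit
    $ev{epoch} += 1e-6 while grep { $_->{epoch} == $ev{epoch} } @$recent;
    # skip all newer events, then insert (position 0 is an unshift)
    my $pos = 0;
    $pos++ while $pos < @$recent && $recent->[$pos]{epoch} > $ev{epoch};
    splice @$recent, $pos, 0, \%ev;
    # shorten the array to the timespan allowed
    pop @$recent while @$recent && $recent->[-1]{epoch} < $oldest_allowed;
    return $recent;
}

my @recent = (
    { epoch => 300, path => "a", type => "new" },
    { epoch => 200, path => "b", type => "new" },
    { epoch => 100, path => "c", type => "new" },
);
# a "dirty" event whose epoch collides with an existing one
insert_event(\@recent, { epoch => 200, path => "d", type => "new" }, 150);
print join(" ", map { $_->{path} } @recent), "\n";   # prints "a d b"
```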
1841 The third form runs an update without inserting a new file. This may
1842 be desired to truncate a recentfile.
1845 sub _epoch_monotonically_increasing {
1846 my($self,$epoch,$recent) = @_;
1847 return $epoch unless @$recent; # the first one goes unoffended
1848 if (_bigfloatgt("".$epoch,$recent->[0]{epoch})) {
1851 return _increase_a_bit($recent->[0]{epoch});
1855 my($self,$path,$type,$dirty_epoch) = @_;
1856 if (defined $path or defined $type or defined $dirty_epoch) {
1857 die "update called without path argument" unless defined $path;
1858 die "update called without type argument" unless defined $type;
1859 die "update called with illegal type argument: $type" unless $type =~ /^(?:new|delete)\z/;
1860 die "update called with \$type=$type and \$dirty_epoch=$dirty_epoch; ".
1861 "dirty_epoch only allowed with type=new" if $dirty_epoch and $type ne "new";
1862 my $canonmeth = $self->canonize;
1863 unless ($canonmeth) {
1864 $canonmeth = "naive_path_normalize";
1866 $path = $self->$canonmeth($path);
1868 my $lrd = $self->localroot;
1870 # you must calculate the time after having locked, of course
1871 my $now = Time::HiRes::time;
1872 my $interval = $self->interval;
1873 my $secs = $self->interval_secs();
1874 my $recent = $self->recent_events;
1878 $epoch = $dirty_epoch;
1880 $epoch = $self->_epoch_monotonically_increasing($now,$recent);
1884 my $oldest_allowed = 0;
1885 my $merged = $self->merged;
1886 if ($merged->{epoch}) {
1887 my $virtualnow = max($now,$epoch);
1888 # for the lower bound could we need big math?
1889 $oldest_allowed = min($virtualnow - $secs, $merged->{epoch}, $epoch);
1891 # as long as we are not merged at all, no limits!
1893 my $something_done = 0;
1894 TRUNCATE: while (@$recent) {
1895 # $DB::single++ unless defined $oldest_allowed;
1896 if (_bigfloatlt($recent->[-1]{epoch}, $oldest_allowed)) {
1898 $something_done = 1;
1903 if (defined $path && $path =~ s|^\Q$lrd\E||) {
1906 # remove the older duplicates of this $path, irrespective of $type:
1908 my $ctx = $self->_update_with_dirty_epoch($path,$recent,$epoch);
1909 $recent = $ctx->{recent};
1910 $splicepos = $ctx->{splicepos};
1911 $epoch = $ctx->{epoch};
1912 my $dirtymark = $self->dirtymark;
1914 if (_bigfloatgt($epoch, $now)) {
1917 $self->dirtymark($new_dm);
1918 my $merged = $self->merged;
1919 if (not defined $merged->{epoch} or _bigfloatlt($epoch,$merged->{epoch})) {
1923 $recent = [ grep { $_->{path} ne $path } @$recent ];
1926 if (defined $splicepos) {
1927 splice @$recent, $splicepos, 0, { epoch => $epoch, path => $path, type => $type };
1929 $something_done = 1;
1932 $self->write_recent($recent) if $something_done;
1933 $self->_assert_symlink;
1937 sub _update_with_dirty_epoch {
1938 my($self,$path,$recent,$epoch) = @_;
1940 my $new_recent = [];
1941 if (grep { $_->{path} ne $path } @$recent) {
1943 KNOWN_EVENT: for my $i (0..$#$recent) {
1944 if ($recent->[$i]{path} eq $path) {
1945 if ($recent->[$i]{epoch} eq $epoch) {
1951 push @$new_recent, $recent->[$i];
1954 @$recent = @$new_recent unless $cancel;
1956 if (!exists $recent->[0] or _bigfloatgt($epoch,$recent->[0]{epoch})) {
1958 } elsif (_bigfloatlt($epoch,$recent->[-1]{epoch})) {
1959 $splicepos = @$recent;
1961 RECENT: for my $i (0..$#$recent) {
1962 my $ev = $recent->[$i];
1963 if ($epoch eq $recent->[$i]{epoch}) {
1964 $epoch = _increase_a_bit($epoch, $i ? $recent->[$i-1]{epoch} : undef);
1966 if (_bigfloatgt($epoch,$recent->[$i]{epoch})) {
1974 splicepos => $splicepos,
1981 Puts this recentfile into the 'seeded' state, which means it has to
1982 re-evaluate its uptodateness.
1992 Tells if the recentfile is in the state 'seeded'.
1996 my($self, $set) = @_;
1998 $self->_seeded ($set);
2000 my $x = $self->_seeded;
2001 unless (defined $x) {
2003 $self->_seeded ($x);
2010 True if this object has mirrored the complete interval covered by the
2020 if ($self->_uptodateness_ever_reached and not $self->seeded) {
2024 # it's too easy to misconfigure ttl and related timings and then
2025 # never reach uptodateness, so disabled 2009-03-22
2026 if (0 and not defined $uptodate) {
2027 if ($self->ttl_reached){
2028 $why = "ttl_reached returned true, so we are not uptodate";
2032 unless (defined $uptodate) {
2033 # look if recentfile has unchanged timestamp
2034 my $minmax = $self->minmax;
2035 if (exists $minmax->{mtime}) {
2036 my $rfile = $self->_my_current_rfile;
2037 my @stat = stat $rfile;
2038 my $mtime = $stat[9];
2039 if ($mtime > $minmax->{mtime}) {
2040 $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
2043 my $covered = $self->done->covered(@$minmax{qw(max min)});
2044 $why = "minmax covered[$covered], so we return that";
2045 $uptodate = $covered;
2049 unless (defined $uptodate) {
2050 $why = "fallthrough, so not uptodate";
2054 $self->_uptodateness_ever_reached(1);
2059 uptodate => $uptodate,
2062 $self->_remember_last_uptodate_call($remember);
2066 =head2 $obj->write_recent ($recent_files_arrayref)
2068 Writes a I<recentfile> based on the current reflection of the current
2069 state of the tree limited by the current interval.
2074 @{$_[1]} = sort { _bigfloatcmp($b->{epoch},$a->{epoch}) } @{$_[1]};
2078 my ($self,$recent) = @_;
2079 die "write_recent called without argument" unless defined $recent;
2081 SANITYCHECK: for my $i (0..$#$recent) {
2082 if (defined($Last_epoch) and _bigfloatge($recent->[$i]{epoch},$Last_epoch)) {
2084 Carp::confess(sprintf "Warning: disorder '%s'>='%s', re-sorting %s\n",
2085 $recent->[$i]{epoch}, $Last_epoch, $self->interval);
2087 # $self->_resort($recent);
2090 $Last_epoch = $recent->[$i]{epoch};
2092 my $meth = sprintf "write_%d", $self->protocol;
2093 $self->$meth($recent);
2096 =head2 $obj->write_0 ($recent_files_arrayref)
2098 Delegate of C<write_recent()> on protocol 0
2103 my ($self,$recent) = @_;
2104 my $rfile = $self->rfile;
2105 YAML::Syck::DumpFile("$rfile.new",$recent);
2106 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2109 =head2 $obj->write_1 ($recent_files_arrayref)
2111 Delegate of C<write_recent()> on protocol 1
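Both delegates first serialize into C<$rfile.new> and then C<rename> that file over C<$rfile>. On POSIX filesystems C<rename> replaces the target in one step, so a concurrent reader (for example, an rsync client) sees either the old or the new I<recentfile>, never a half-written one. The following is a minimal sketch of that pattern; C<write_atomically> is a hypothetical name.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# Hypothetical helper: write everything to "$file.new" first, then rename
# it over $file so readers never observe a partially written file.
sub write_atomically {
    my ($file, $content) = @_;
    my $tmp = "$file.new";
    open my $fh, ">", $tmp or die "Could not open >'$tmp': $!";
    print {$fh} $content or die "Could not write '$tmp': $!";
    close $fh or die "Could not close '$tmp': $!";
    rename $tmp, $file or die "Could not rename to '$file': $!";
    return 1;
}

my $dir = tempdir(CLEANUP => 1);
write_atomically("$dir/RECENT-1h.yaml", "--- first\n");
write_atomically("$dir/RECENT-1h.yaml", "--- second\n");
open my $in, "<", "$dir/RECENT-1h.yaml" or die "Could not open: $!";
my $got = do { local $/; <$in> };
print $got;   # prints "--- second"
```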
2116 my ($self,$recent) = @_;
2117 my $rfile = $self->rfile;
2118 my $suffix = $self->serializer_suffix;
2120 meta => $self->meta_data,
2124 if ($suffix eq ".yaml") {
2125 $serialized = YAML::Syck::Dump($data);
2126 } elsif ($HAVE->{"Data::Serializer"}) {
2127 my $serializer = Data::Serializer->new
2128 ( serializer => $serializers{$suffix} );
2129 $serialized = $serializer->raw_serialize($data);
2131 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
2133 open my $fh, ">", "$rfile.new" or die "Could not open >'$rfile.new': $!";
2134 print $fh $serialized;
2135 close $fh or die "Could not close '$rfile.new': $!";
2136 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2141 split /\n/, <<'=cut'; %serializers = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
2143 =head1 THE ARCHITECTURE OF A COLLECTION OF RECENTFILES
2145 The idea is that we want to have a short file that records very
2146 recent changes, so that a fresh mirror can be kept fresh as long as
2147 connectivity is given. Then we want longer files that record the
2148 history before that. When the mirror falls behind the update period
2149 reflected in the shortest file, it can complement the list of recent
2150 file events with the next one. If this is not long enough, we want
2151 another one, again a bit longer. Finally, we want one that completes the
2152 history back to the oldest file. Together, the index files contain the
2153 complete list of current files. The longer the period covered by an
2154 index file, the less often that index file is updated. For
2155 practical reasons adjacent files will often overlap a bit, but this is
2156 neither necessary nor enforced. That's the basic idea. The following
2157 example represents a tree that has a few updates every day:
2159 RECENT.recent -> RECENT-1h.yaml
2168 The first file is the principal file, insofar as it is the one that is
2169 written first after a filesystem change. Usually a symlink links to it
2170 with a filename that has the same filenameroot and the suffix
2171 C<.recent>. On systems that do not support symlinks there is a plain
2172 copy maintained instead.
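C<update()> calls a private C<_assert_symlink> method whose body is not part of this excerpt. The following independent sketch shows one way to maintain C<RECENT.recent>: a relative symlink where supported, a plain copy otherwise, replaced via C<rename> so the name never disappears. C<refresh_recent_link> is hypothetical. The C<eval { symlink("", ""); 1 }> probe is the idiom documented in L<perlfunc/symlink>.

```perl
use strict;
use warnings;
use File::Copy qw(cp);
use File::Temp qw(tempdir);

# Hypothetical helper: make "$root.recent" in $dir refer to $principal,
# via a relative symlink if supported, via a plain copy otherwise.
sub refresh_recent_link {
    my ($dir, $root, $principal) = @_;   # e.g. "RECENT", "RECENT-1h.yaml"
    my $link = "$dir/$root.recent";
    my $tmp  = "$link.tmp";
    unlink $tmp;
    if (eval { symlink("", ""); 1 }) {   # dies only if symlinks are unsupported
        symlink $principal, $tmp or die "Could not symlink '$tmp': $!";
    } else {
        cp "$dir/$principal", $tmp or die "Could not copy to '$tmp': $!";
    }
    # replace the old link or copy in one step
    rename $tmp, $link or die "Could not rename to '$link': $!";
    return $link;
}

my $dir = tempdir(CLEANUP => 1);
open my $fh, ">", "$dir/RECENT-1h.yaml" or die "Could not open: $!";
print {$fh} "--- []\n";
close $fh or die "Could not close: $!";
my $link = refresh_recent_link($dir, "RECENT", "RECENT-1h.yaml");
```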
2174 The last file, the Z file, contains the complementary files that are
2175 in none of the other files. It never contains C<delete> events. Besides
2176 this, it serves as a recovery mechanism or spill-over pond.
2177 When things go wrong, it's a valuable controlling instance to hold the
2178 differences between the collection of limited interval files and the
2181 =head2 THE INDIVIDUAL RECENTFILE
2183 A I<recentfile> consists of a hash that has two keys: C<meta> and
2184 C<recent>. The C<meta> part has metadata and the C<recent> part has a
2185 list of fileobjects.
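To make the shape concrete, here is an illustrative I<recentfile> built as a Perl data structure and round-tripped through L<JSON::PP>. JSON::PP is a core module and serves here as a stand-in for the C<JSON> serializer behind the C<.json> suffix. The meta keys other than C<protocol> and all values are made up for illustration. The event paths follow the layout used in the SYNOPSIS.

```perl
use strict;
use warnings;
use JSON::PP ();

# Illustrative contents only; real meta data comes from meta_data().
my $data = {
    meta => {
        protocol     => 1,
        filenameroot => "RECENT",
        interval     => "1h",
    },
    recent => [    # newest event first
        { epoch => "1234567892.5", path => "id/A/AN/ANDK/CPAN-1.92_63.tar.gz", type => "new" },
        { epoch => "1234567880.1", path => "id/A/AN/ANDK/CPAN-1.92_62.tar.gz", type => "delete" },
    ],
};

my $json = JSON::PP->new->canonical->pretty;
my $text = $json->encode($data);   # what a ".json" recentfile could hold
my $back = $json->decode($text);   # and read back again
print scalar @{ $back->{recent} }, " events, newest: $back->{recent}[0]{path}\n";
# prints "2 events, newest: id/A/AN/ANDK/CPAN-1.92_63.tar.gz"
```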
2187 =head2 THE META PART
2189 Here we find things that are pretty much self-explanatory: all
2190 lowercase attributes are accessors and as such are explained somewhere
2191 above in this manpage. The uppercase attribute C<Producers> contains
2192 version information about involved software components. Nothing to
2193 worry about, I believe.
2195 =head2 THE RECENT PART
2197 This is the interesting part. Every entry refers to some filesystem
2198 change (with path, epoch, type). The epoch value is the point in time
2199 when some change was I<registered>. Do not be tempted to believe that
2200 the entry has a direct relation to something like modification time or
2201 change time on the filesystem level. The timestamp (I<epoch> element)
2202 is a floating point number and practically never corresponds
2203 exactly to the data recorded in the filesystem, but rather to the time
2204 when some process succeeded in reporting some filesystem change to the
2205 I<recentfile> mechanism. This is why many parts of the code refer to
2206 I<events>, because we merely try to record the I<event> of the
2207 discovery of a change, not the time of the change itself.
2209 All these entries can be divided into two types (denoted by the
2210 C<type> attribute): C<new>s and C<delete>s. Changes and creations are
2211 C<new>s. Deletes are C<delete>s.
2213 Besides an C<epoch> and a C<type> attribute we find a third one:
2214 C<path>. This path is relative to the directory we find the
2217 The entries in the I<recentfile> are ordered by decreasing epoch
2218 attribute. These are unique floating point numbers. When the server
2219 has ntp running correctly, the timestamps usually reflect
2220 a real epoch. If time is running backwards, we trump the system epoch
2221 with strictly monotonically increasing floating point timestamps and
2222 guarantee they are unique.
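A sketch of the ordering guarantee using native floats. An epoch is taken from the clock unless it is not larger than the newest recorded epoch, in which case it is moved just past that epoch. C<next_epoch> is hypothetical. The module performs the equivalent step with its big-float helpers, such as C<_increase_a_bit>.

```perl
use strict;
use warnings;

# Hypothetical helper: use the clock value if it is newer than the newest
# recorded epoch, otherwise a value just above that epoch, so epochs stay
# unique and strictly increasing even when the clock stalls or goes back.
sub next_epoch {
    my ($now, $newest) = @_;
    return $now if !defined $newest || $now > $newest;
    my $step = 1e-6;
    # make sure the step is not lost to floating point rounding
    $step *= 2 until $newest + $step > $newest;
    return $newest + $step;
}

my @epochs;
# the clock stalls (100.5 twice), then runs backwards (99.0)
for my $clock (100.5, 100.5, 99.0, 101.0) {
    push @epochs, next_epoch($clock, @epochs ? $epochs[-1] : undef);
}
printf "%.6f\n", $_ for @epochs;
```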
2224 =head1 CORRUPTION AND RECOVERY
2226 If the origin host breaks the promise to deliver consistent and
2227 complete I<recentfiles>, then the way back to sanity shall be achieved
2228 through traditional rsyncing between the hosts. But don't forget to
2229 report it as a bug :)
2233 The following suffixes are supported and trigger the use of these
2238 =item C<< ".yaml" => "YAML::Syck" >>
2240 =item C<< ".json" => "JSON" >>
2242 =item C<< ".sto" => "Storable" >>
2244 =item C<< ".dd" => "Data::Dumper" >>
2252 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
2254 =head1 INTERVAL SPEC
2256 An interval spec is a primitive way to express time spans. Normally it
2257 is composed from an integer and a letter.
2259 As a special case, a string that consists only of the single letter
2260 C<Z> stands for unlimited time.
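The following is a hypothetical parser for interval specs, using the letter values from the list that follows. It is an illustration only, not this module's C<interval_secs()>, and it handles only the letters shown in that list.

```perl
use strict;
use warnings;

# Letter values as given in the list below; "Z" means unlimited.
my %secs_per_letter = (
    m => 60,
    h => 60*60,
    d => 60*60*24,
    W => 60*60*24*7,
    M => 60*60*24*30,
    Q => 60*60*24*90,
    Y => 60*60*24*365.25,
);

# Hypothetical parser: seconds for "<integer><letter>", undef for "Z".
sub spec_to_secs {
    my ($spec) = @_;
    return undef if $spec eq "Z";
    my ($n, $letter) = $spec =~ /^(\d+)([A-Za-z])$/
        or die "Cannot parse interval spec '$spec'\n";
    my $unit = $secs_per_letter{$letter};
    die "Unknown interval letter '$letter'\n" unless defined $unit;
    return $n * $unit;
}

print spec_to_secs("6h"), "\n";   # prints 21600
```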
2262 The following letters express the specified number of seconds:
2268 =item C<< m => 60 >>
2270 =item C<< h => 60*60 >>
2272 =item C<< d => 60*60*24 >>
2274 =item C<< W => 60*60*24*7 >>
2276 =item C<< M => 60*60*24*30 >>
2278 =item C<< Q => 60*60*24*90 >>
2280 =item C<< Y => 60*60*24*365.25 >>
2288 This is about speeding up rsync operations on large trees. It uses a small
2289 metadata cocktail and pull technology.
2291 =head2 NON-COMPETITORS
2293 File::Mirror JWU/File-Mirror/File-Mirror-0.10.tar.gz only local trees
2294 Mirror::YAML ADAMK/Mirror-YAML-0.03.tar.gz some sort of inner circle
2295 Net::DownloadMirror KNORR/Net-DownloadMirror-0.04.tar.gz FTP sites and stuff
2296 Net::MirrorDir KNORR/Net-MirrorDir-0.05.tar.gz ditto
2297 Net::UploadMirror KNORR/Net-UploadMirror-0.06.tar.gz ditto
2298 Pushmi::Mirror CLKAO/Pushmi-v1.0.0.tar.gz something SVK
2300 rsnapshot www.rsnapshot.org focus on backup
2301 csync www.csync.org more like unison
2302 multi-rsync sourceforge 167893 lan push to many
2306 The problem to solve, which clusters, ftp mirrors, and otherwise
2307 replicated datasets like CPAN share, is how to transfer only a minimal
2308 amount of data to determine the diff between two hosts.
2310 Normally it takes a long time to determine the diff itself before it
2311 can be transferred. Known solutions at the time of this writing are
2312 csync2 and rsync 3 batch mode.
2314 For many years the best solution was csync2 which solves the problem
2315 by maintaining a sqlite database on both ends and talking a highly
2316 sophisticated protocol to quickly determine which files to send and
2317 which to delete at any given point in time. Csync2 is often
2318 inconvenient because it is push technology and the act of syncing
2319 demands quite an intimate relationship between the sender and the
2320 receiver. This is hard to achieve in an environment of loosely coupled
2321 sites where the number of sites is large or connections are
2322 unreliable or network topology is changing.
2324 Rsync 3 batch mode works around these problems by providing rsync-able
2325 batch files which allow receiving nodes to replay the history of the
2326 other nodes. This reduces the need for an intimate relationship, but
2327 it has the disadvantage that these batch files replicate the contents
2328 of the involved files. This seems inappropriate when the nodes already
2329 have a means of communicating over rsync.
2331 rersyncrecent solves this problem with a couple of (usually 2-10)
2332 index files which cover different overlapping time intervals. The
2333 master writes these files and the clients/slaves can construct the
2334 full tree from the information contained in them. The most recent
2335 index file usually covers the last seconds or minutes or hours of the
2336 tree and depending on the needs, slaves can rsync every few seconds or
2337 minutes and then bring their trees into full sync.
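To illustrate how a slave can derive the state of the tree from the index files, the following simplified sketch folds together the events of several I<recentfiles>. For every path only the event with the largest epoch counts, and a winning C<delete> means the file should not exist. C<current_tree> and its flat input format are assumptions. A real client also has to fetch the files via rsync and track what it has already mirrored.

```perl
use strict;
use warnings;

# Hypothetical sketch: fold the events of several recentfiles into the
# sorted list of paths that should exist; the newest event per path wins.
sub current_tree {
    my (@recentfiles) = @_;
    my %newest;
    for my $events (@recentfiles) {
        for my $ev (@$events) {
            my $seen = $newest{ $ev->{path} };
            $newest{ $ev->{path} } = $ev
                if !$seen || $ev->{epoch} > $seen->{epoch};
        }
    }
    # keep only paths whose newest event is a "new"
    return [ sort grep { $newest{$_}{type} eq "new" } keys %newest ];
}

my $short = [ { epoch => 500, path => "b", type => "delete" } ];
my $long  = [
    { epoch => 400, path => "c", type => "new" },
    { epoch => 300, path => "b", type => "new" },
    { epoch => 100, path => "a", type => "new" },
];
print "@{ current_tree($short, $long) }\n";   # prints "a c"
```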
2339 The rersyncrecent mode was developed for CPAN, but I hope it is a
2340 convenient and economical general-purpose solution. I'm looking forward
2341 to seeing a CPAN backbone that is only a few seconds behind PAUSE. And
2342 then... the first FUSE-based CPAN filesystem, anyone?
2344 =head1 FUTURE DIRECTIONS
2346 Currently the origin server must keep track of injected and removed
2347 files. This should be supported by an inotify-based assistant.
2351 Barbie is providing a database of release dates. See
2352 http://use.perl.org/~barbie/journal/37907
2360 Please report any bugs or feature requests through the web interface
2362 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2363 I will be notified, and then you'll automatically be notified of
2364 progress on your bug as I make changes.
2368 Memory hungry: it seems all memory is allocated during the initial
2369 rsync where a list of all files is maintained in memory.
2373 You can find documentation for this module with the perldoc command.
2375 perldoc File::Rsync::Mirror::Recentfile
2377 You can also look for information at:
2381 =item * RT: CPAN's request tracker
2383 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2385 =item * AnnoCPAN: Annotated CPAN documentation
2387 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2389 =item * CPAN Ratings
2391 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2395 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2400 =head1 ACKNOWLEDGEMENTS
2402 Thanks to RJBS for module-starter.
2404 =head1 COPYRIGHT & LICENSE
2406 Copyright 2008,2009 Andreas König.
2408 This program is free software; you can redistribute it and/or modify it
2409 under the same terms as Perl itself.
2414 1; # End of File::Rsync::Mirror::Recentfile
2418 # cperl-indent-level: 4