introduce max_error_count
[rersyncrecent.git] / lib / File / Rsync / Mirror / Recentfile.pm
blob673ebac052e3b59438aa2f4afaf23fcf352239c7
1 package File::Rsync::Mirror::Recentfile;
3 # use warnings;
4 use strict;
6 =encoding utf-8
8 =head1 NAME
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
12 =head1 VERSION
14 Version 0.0.1
16 =cut
18 package File::Rsync::Mirror::Recentfile;
# Optional dependencies: try to load each module once at startup and
# remember in $HAVE whether the require succeeded, so later code can
# test $HAVE->{"Module::Name"} instead of dying at compile time.
20 my $HAVE = {};
21 for my $package (qw( Data::Serializer File::Rsync )) {
# string eval: a failing require yields a false value instead of an exception
22 $HAVE->{$package} = eval qq{ require $package; };
24 use File::Basename qw(dirname fileparse);
25 use File::Copy qw(cp);
26 use File::Path qw(mkpath);
27 use File::Temp;
28 use List::Util qw(first);
29 use Scalar::Util qw(reftype);
30 use Storable;
31 use Time::HiRes qw();
32 use YAML::Syck;
34 use version; our $VERSION = qv('0.0.1');
37 use constant MAX_INT => ~0>>1; # anything better?
39 # cf. interval_secs
40 my %seconds;
42 # maybe subclass if this mapping is bad?
43 my %serializers;
45 =head1 SYNOPSIS
47 B<!!!! PRE-ALPHA ALERT !!!!>
49 Nothing in here is believed to be stable, nothing yet intended for
50 public consumption. The plan is to provide a script in one of the next
51 releases that acts as a frontend for all the backend functionality.
52 Option and method names will very likely change.
54 For the rationale see the section BACKGROUND.
56 This is published only for developers of the (yet to be named)
57 script(s).
59 Writer (of a single file):
61 use File::Rsync::Mirror::Recentfile;
62 my $rf = File::Rsync::Mirror::Recentfile->new
64 interval => q(6h),
65 filenameroot => "RECENT",
66 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
67 localroot => "/home/ftp/pub/PAUSE/authors/",
68 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
70 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
72 Reader/mirrorer:
74 my $rf = File::Rsync::Mirror::Recentfile->new
76 filenameroot => "RECENT",
77 ignore_link_stat_errors => 1,
78 interval => q(6h),
79 localroot => "/home/ftp/pub/PAUSE/authors",
80 remote_dir => "",
81 remote_host => "pause.perl.org",
82 remote_module => "authors",
83 rsync_options => {
84 compress => 1,
85 'rsync-path' => '/usr/bin/rsync',
86 links => 1,
87 times => 1,
88 'omit-dir-times' => 1,
89 checksum => 1,
91 verbose => 1,
93 $rf->mirror;
95 Aggregator (usually the writer):
97 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
98 $rf->aggregate;
100 =head1 EXPORT
102 No exports.
104 =head1 CONSTRUCTORS
106 =head2 my $obj = CLASS->new(%hash)
108 Constructor. On every argument pair the key is a method name and the
109 value is an argument to that method name.
111 =cut
# Constructor. @args is a flat list of method-name/argument pairs; each
# method is called on the fresh object with its paired argument.
# Afterwards protocol, filenameroot and serializer_suffix receive their
# defaults (1, "RECENT", ".yaml") wherever they are still undefined.
sub new {
    my($class, @args) = @_;
    my $self = bless {}, $class;
    # splice returns an empty list once @args is exhausted, ending the loop
    while (my($method, $arg) = splice @args, 0, 2) {
        $self->$method($arg);
    }
    my %default_for = (
        protocol          => 1,
        filenameroot      => "RECENT",
        serializer_suffix => ".yaml",
    );
    for my $attr (qw(protocol filenameroot serializer_suffix)) {
        next if defined $self->$attr;
        $self->$attr($default_for{$attr});
    }
    return $self;
}
132 =head2 my $obj = CLASS->new_from_file($file)
134 Constructor. $file is a I<recentfile>.
136 =cut
# Constructor. Builds an object from an existing recentfile.
#
# $file is the path of the recentfile. Its suffix (one of the keys of
# %serializers) selects the deserializer, and its directory becomes the
# localroot. Every lowercase key found in the file's "meta" hash is
# called as an accessor with the stored value. Dies if the file cannot
# be opened, the suffix is unknown, or Data::Serializer is missing for
# a non-YAML suffix.
sub new_from_file {
    my($class, $file) = @_;
    my $self = bless {}, $class;
    $self->_rfile($file);
    #?# $self->lock;
    my $serialized = do {
        # three-arg open: a filename starting with ">" or "|" must
        # never be interpreted as an open mode
        open my $fh, '<', $file or die "Could not open '$file': $!";
        local $/;
        <$fh>;
    };
    # fileparse treats its suffix arguments as regular expressions, so
    # escape them; otherwise the "." in ".yaml" would match any character
    my($name,$path,$suffix) = fileparse $file, map { quotemeta } keys %serializers;
    $self->serializer_suffix($suffix);
    $self->localroot($path);
    die "Could not determine file format from suffix" unless $suffix;
    my $deserialized;
    if ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            (
             serializer => $serializers{$suffix},
             secret => undef,
             compress => 0,
             digest => 0,
             portable => 0,
             encoding => "raw",
            );
        $deserialized = $serializer->deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'"
            unless $suffix eq ".yaml";
        require YAML::Syck;
        $deserialized = YAML::Syck::LoadFile($file);
    }
    # NOTE(review): keys from the file are used as method names; a
    # recentfile from an untrusted source can call any lowercase method.
    while (my($k,$v) = each %{$deserialized->{meta}}) {
        next if $k ne lc $k; # "Producers"
        $self->$k($v);
    }
    unless (defined $self->protocol) {
        $self->protocol(1);
    }
    return $self;
}
179 =head1 ACCESSORS
181 =cut
183 my @accessors;
# Assemble the list of accessor names at compile time. The private
# (underscore-prefixed) names are listed literally; the public names
# are harvested from the POD that follows.
185 BEGIN {
186 @accessors = (
187 "_current_tempfile",
188 "_interval",
189 "_is_locked",
190 "_localroot",
191 "_remotebase",
192 "_rfile",
193 "_rsync",
194 "_use_tempfile",
# The here-document below ends at the next "=cut" line, so the POD text
# itself is the string being split. Every "=item NAME" line becomes an
# accessor name once the "=item " prefix is stripped by the s///.
197 my @pod_lines =
198 split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
200 =over 4
202 =item aggregator
204 A list of interval specs that tell the aggregator which I<recentfile>s
205 are to be produced.
207 =item canonize
209 The name of a method to canonize the path before rsyncing. Only
210 supported value is C<naive_path_normalize>. Defaults to that.
212 =item comment
214 A comment about this tree and setup.
216 =item filenameroot
218 The (prefix of the) filename we use for this I<recentfile>. Defaults to
219 C<RECENT>.
221 =item ignore_link_stat_errors
223 If set to true, rsync errors are ignored that complain about link stat
224 errors. These seem to happen only when there are files missing at the
225 origin. In race conditions this can always happen, so it is
226 recommended to set this value to true.
228 =item locktimeout
230 After how many seconds shall we die if we cannot lock a I<recentfile>?
231 Defaults to 600 seconds.
233 =item loopinterval
235 When mirror_loop is called, this accessor can specify how much time
236 every loop shall at least take. If the work of a loop is done before
237 that time has gone, sleeps for the rest of the time. Defaults to
238 arbitrary 42 seconds.
240 =item max_files_per_connection
242 Maximum number of files that are transferred on a single rsync call.
243 Setting it higher means higher performance at the price of holding
244 connections longer and potentially disturbing other users in the pool.
245 Defaults to the arbitrary value 42.
247 =item max_rsync_errors
249 When rsync operations encounter that many errors without any resetting
250 success in between, then we die. Defaults to -1 which means we run
251 forever ignoring all rsync errors.
253 =item protocol
255 When the RECENT file format changes, we increment the protocol. We try
256 to support older protocols in later releases.
258 =item remote_dir
260 The directory we are mirroring from.
262 =item remote_host
264 The host we are mirroring from. Leave empty for the local filesystem.
266 =item remote_module
268 Rsync servers have so called modules to separate directory trees from
269 each other. Put here the name of the module under which we are
270 mirroring. Leave empty for local filesystem.
272 =item rsync_options
274 Things like compress, links, times or checksums. Passed in to the
275 File::Rsync object used to run the mirror.
277 =item serializer_suffix
279 Untested accessor. The only tested format for I<recentfile>s at the
280 moment is YAML. It is used with YAML::Syck via Data::Serializer. But
281 in principle other formats are supported as well. See section
282 SERIALIZERS below.
284 =item sleep_per_connection
286 Sleep that many seconds (floating point OK) after every chunk of rsyncing
287 has finished. Defaults to arbitrary 0.42.
289 =item verbose
291 Boolean to turn on a bit of verbosity.
293 =back
295 =cut
297 use accessors @accessors;
299 =head1 METHODS
301 =head2 (void) $obj->aggregate
303 Takes all intervals that are collected in the accessor called
304 aggregator. Sorts them by actual length of the interval.
305 Removes those that are shorter than our own interval. Then merges this
306 object into the next larger object. The merging continues upwards
307 as long as the next I<recentfile> is old enough to warrant a merge.
309 Whether a merge is warranted is decided according to the interval of
310 the previous I<recentfile>, so that larger files are updated less often
311 than smaller ones.
313 Here is an example to illustrate the behaviour. Given aggregators
315 1h 1d 1W 1M 1Q 1Y Z
317 then
319 1h updates 1d on every call to aggregate()
320 1d updates 1W earliest after 1h
321 1W updates 1M earliest after 1d
322 1M updates 1Q earliest after 1W
323 1Q updates 1Y earliest after 1M
324 1Y updates Z earliest after 1Q
326 Note that all but the smallest recentfile get updated at an arbitrary
327 rate and as such are quite useless on their own.
329 =cut
# Merges this recentfile upwards through the chain of aggregator
# intervals. The intervals (own interval plus those in the aggregator
# accessor) are sorted by length and those shorter than our own are
# dropped. The first merge always happens; each later merge happens
# only if the next file does not exist yet or is older than the
# interval two steps below it. The chain stops at the first step that
# is not merged.
sub aggregate {
    my($self) = @_;
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
        grep { $_->{secs} >= $self->interval_secs }
            map { +{ interval => $_, secs => $self->interval_secs($_) } }
                $self->interval, @{$self->aggregator || []};
    $aggs[0]{object} = $self;
  AGGREGATOR: for my $i (0..$#aggs-1) {
        my $this = $aggs[$i]{object};
        # the next larger file is modelled as a clone with another interval
        my $next = Storable::dclone $this;
        $next->interval($aggs[$i+1]{interval});
        my $want_merge = 1;
        if ($i > 0) {
            my $next_rfile = $next->rfile;
            if (-e $next_rfile) {
                my $prev = $aggs[$i-1]{object};
                # -M is relative to $^T, so pin that to "now"
                local $^T = time;
                my $next_age = 86400 * -M $next_rfile;
                $want_merge = 0 unless $next_age > $prev->interval_secs;
            }
        }
        last AGGREGATOR unless $want_merge;
        $next->merge($this);
        $aggs[$i+1]{object} = $next;
    }
}
# Debugging helper. For our own interval and every aggregator interval,
# sorted by length, reports the recentfile path together with fields 7
# and 9 of its stat() result (size and mtime), or the string "undef"
# where a field is missing or false. Returns an arrayref of
# [path, size, mtime] triples.
sub _debug_aggregate {
    my($self) = @_;
    my @specs = sort { $a->{secs} <=> $b->{secs} }
        map { +{ interval => $_, secs => $self->interval_secs($_) } }
            $self->interval, @{$self->aggregator || []};
    my @report;
    for my $spec (@specs) {
        my $clone = Storable::dclone $self;
        $clone->interval($spec->{interval});
        my $rfile = $clone->rfile;
        my @stat = stat $rfile;
        my @fields = map { $stat[$_] || "undef" } 7, 9;
        push @report, [$rfile, @fields];
    }
    return \@report;
}
383 # (void) $self->_assert_symlink()
# Makes sure that "<filenameroot>.recent" in localroot is a symlink to
# the basename of our recentfile. An existing symlink with the right
# target is left alone; one with the wrong target is replaced through a
# temporary link and a rename; a missing one is created directly. Dies
# if symlink or rename fails.
sub _assert_symlink {
    my($self) = @_;
    my $link = File::Spec->catfile
        (
         $self->localroot,
         sprintf("%s.recent", $self->filenameroot),
        );
    my $target = $self->recentfile_basename;
    if (-l $link) {
        return if readlink($link) eq $target;
        # wrong target: build the new link beside the old one, then rename over it
        my $tmplink = "$link.$$";
        unlink $tmplink; # may fail
        symlink $target, $tmplink or die "Could not create symlink '$tmplink': $!";
        rename $tmplink, $link or die "Could not rename '$tmplink' to $link: $!";
    } else {
        symlink $target, $link or die "Could not create symlink '$link': $!";
    }
}
414 =head2 $success = $obj->full_mirror
416 (TBD) Mirrors the whole remote site, starting with the smallest I<recentfile>,
417 switching to larger ones ...
419 =cut
# Placeholder: only emits a "Not yet implemented" warning.
sub full_mirror {
    my $self = shift;
    warn "Not yet implemented";
}
426 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile
428 Stores the remote I<recentfile> locally as a tempfile
430 =cut
# Rsyncs the remote recentfile into a fresh temporary file below
# localroot and returns that file's name. If a local recentfile already
# exists it is copied into the tempfile before rsync runs. The rsync
# call is repeated until it succeeds; every failure is passed to
# register_rsync_error. The tempfile is chmod'ed to 0644 and remembered
# in _current_tempfile.
sub get_remote_recentfile_as_tempfile {
    my($self) = @_;
    mkpath $self->localroot;
    my $tmp = File::Temp->new
        (
         TEMPLATE => sprintf(".%s-XXXX", $self->recentfile_basename),
         DIR      => $self->localroot,
         SUFFIX   => $self->serializer_suffix,
         UNLINK   => 0,
        );
    my $trecentfile = $tmp->filename;
    my $rfile = $self->rfile;
    if (-e $rfile) {
        # saving on bandwidth. Might need to be configurable
        # $self->bandwidth_is_cheap?
        cp $rfile, $trecentfile or die "Could not copy '$rfile' to '$trecentfile': $!"
    }
    until ($self->rsync->exec
           (
            src => join("/", $self->remotebase, $self->recentfile_basename),
            dst => $trecentfile,
           )) {
        $self->register_rsync_error ($self->rsync->err);
    }
    $self->un_register_rsync_error ();
    my $mode = 0644;
    chmod $mode, $trecentfile or die "Could not chmod $mode '$trecentfile': $!";
    $self->_current_tempfile ($trecentfile);
    return $trecentfile;
}
464 =head2 $localpath = $obj->get_remotefile ( $relative_path )
466 Rsyncs one single remote file to local filesystem.
468 =cut
# Rsyncs the single remote file $path (relative to remotebase) to the
# same relative location below localroot, creating the parent directory
# first. The rsync call is repeated until it succeeds. Returns the
# local path.
sub get_remotefile {
    my($self, $path) = @_;
    my $lfile = File::Spec->catfile($self->localroot, $path);
    mkpath dirname $lfile;
    until ($self->rsync->exec
           (
            src => join("/", $self->remotebase, $path),
            dst => $lfile,
           )) {
        $self->register_rsync_error ($self->rsync->err);
    }
    $self->un_register_rsync_error ();
    return $lfile;
}
486 =head2 $obj->interval ( $interval_spec )
488 Get/set accessor. $interval_spec is a string and described below in
489 the section INTERVAL SPEC.
491 =cut
# Get/set accessor for the interval spec. Setting it also clears the
# cached recentfile path (_rfile). Dies if no interval is defined.
sub interval {
    my $self = shift;
    if (@_) {
        $self->_interval($_[0]);
        $self->_rfile(undef);
    }
    my $current = $self->_interval;
    # do not ask the $self too much, it recurses!
    defined $current
        or die "Alert: interval undefined for '".$self."'. Cannot continue.";
    return $current;
}
507 =head2 $secs = $obj->interval_secs ( $interval_spec )
509 $interval_spec is described below in the section INTERVAL SPEC. If
510 empty defaults to the inherent interval for this object.
512 =cut
# Converts an interval spec into seconds. Without an argument (or with
# a false one) the object's own interval is used. "Z" maps to MAX_INT;
# otherwise the count is multiplied by the %seconds entry for the unit
# letter. Dies on a spec that cannot be parsed.
sub interval_secs {
    my ($self, $interval) = @_;
    $interval ||= $self->interval;
    defined $interval
        or die "interval_secs() called without argument on an object without a declared one";
    my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/
        or die "Could not determine seconds from interval[$interval]";
    return MAX_INT if $interval eq "Z";
    return $seconds{$t}*$n if exists $seconds{$t} and $n =~ /^\d+$/;
    die "Invalid interval specification: n[$n]t[$t]";
}
531 =head2 $obj->localroot ( $localroot )
533 Get/set accessor. The local root of the tree.
535 =cut
# Get/set accessor for the local root of the tree. Setting it also
# clears the cached recentfile path (_rfile). Returns the current value.
sub localroot {
    my $self = shift;
    if (@_) {
        $self->_localroot($_[0]);
        $self->_rfile(undef);
    }
    my $root = $self->_localroot;
    return $root;
}
546 =head2 $ret = $obj->local_event_path
548 Misnomer, deprecated. Use local_path instead
550 =cut
# Deprecated alias kept for old callers: warns with a stack trace, then
# joins localroot and the slash-separated $path into a local file path.
sub local_event_path {
    my($self,$path) = @_;
    require Carp;
    Carp::cluck("Deprecated method local_event_path called. Please use local_path instead");
    # rsync paths are always slash-separated
    my @segments = split m|/|, $path;
    return File::Spec->catfile($self->localroot, @segments);
}
560 =head2 $ret = $obj->local_path($path_found_in_recentfile)
562 Combines the path to our local mirror and the path of an object found
563 in this I<recentfile>. In other words: the target of a mirror operation.
565 =cut
# Maps a path found in the recentfile to the corresponding path below
# localroot. Without a path, returns localroot itself.
sub local_path {
    my($self,$path) = @_;
    return $self->localroot unless defined $path;
    # rsync paths are always slash-separated
    my @segments = split m|/|, $path;
    return File::Spec->catfile($self->localroot, @segments);
}
576 =head2 (void) $obj->lock
578 Locking is implemented with an C<mkdir> on a locking directory
579 (C<.lock> appended to $rfile).
581 =cut
# Acquires the lock for our recentfile by creating the directory
# "<rfile>.lock". Does nothing if this object already holds the lock.
# Retries every 0.01 seconds and dies once locktimeout (default 600)
# seconds have passed.
sub lock {
    my ($self) = @_;
    # not using flock because it locks on filehandles instead of
    # old school resources.
    return if $self->_is_locked;
    my $lockdir = $self->rfile . ".lock";
    # XXX need a way to allow breaking the lock
    my $start = time;
    my $locktimeout = $self->locktimeout || 600;
    until (mkdir $lockdir) {
        Time::HiRes::sleep 0.01;
        die "Could not acquire lockdirectory '$lockdir': $!"
            if time - $start > $locktimeout;
    }
    $self->_is_locked (1);
}
601 =head2 $ret = $obj->merge ($other)
603 Bulk update of this object with another one. It's intended (but not
604 enforced) to only merge smaller and younger $other objects into the
605 current one. If this file is a C<Z> file, then we do not merge in
606 objects of type C<delete>. But if we encounter an object of type
607 delete we delete the corresponding C<add> object.
609 =cut
# Bulk-merges the events of $other into this recentfile, under lock.
# The newest epoch (taken from $other's first event, else from our own)
# minus our interval is the cutoff; our own events older than that are
# dropped from the tail. $other's events come first in the result, each
# path only once; our own events follow for paths not yet seen. In a
# "Z" file delete events are not stored, but they still hide our own
# older entry for that path.
sub merge {
    my($self,$other) = @_;
    my $theirs = $other->recent_events || [];
    $self->lock;
    my $is_z = $self->interval eq "Z";
    my $secs = $self->interval_secs();
    my $mine = $self->recent_events || [];

    # calculate the target time span
    my $epoch;
    if ($theirs->[0]) {
        $epoch = $theirs->[0]{epoch};
    } elsif ($mine->[0]) {
        $epoch = $mine->[0]{epoch};
    }
    if ($epoch) {
        my $oldest_allowed = $epoch - $secs;
        # throw away outsiders
        pop @$mine while @$mine && $mine->[-1]{epoch} < $oldest_allowed;
    }

    my %seen;
    my @merged;
    for my $ev (@$theirs) {
        my $path = $ev->{path};
        next if $seen{$path}++;
        next if $is_z and $ev->{type} eq "delete";
        push @merged, { epoch => $ev->{epoch}, path => $path, type => $ev->{type} };
    }
    push @merged, grep { !$seen{$_->{path}}++ } @$mine;
    $self->recent_events(\@merged);
    $self->write_recent(\@merged);
    $self->unlock;
}
646 =head2 $hashref = $obj->meta_data
648 Returns the hashref of metadata that the server has to add to the
649 I<recentfile>.
651 =cut
# Returns the hashref of metadata that is written into the recentfile.
# Starts from $self->{meta}, overwrites the keys aggregator, canonize,
# comment, filenameroot, interval and protocol with the current
# accessor values, and adds a Producers entry (package name => version)
# unless one is already present.
sub meta_data {
    my($self) = @_;
    my $meta = $self->{meta};
    for my $attr (qw(aggregator canonize comment filenameroot interval protocol)) {
        $meta->{$attr} = $self->$attr;
    }
    # XXX need to reset the Producer if I am a writer, keep it when I
    # am a reader
    unless ($meta->{Producers}) {
        # stringified it looks better
        $meta->{Producers} = { __PACKAGE__, "$VERSION" };
    }
    return $meta;
}
674 =head2 $success = $obj->mirror ( %options )
676 Mirrors the files in this I<recentfile>. If $options{after} is
677 specified, only file events after this timestamp are being mirrored.
679 =cut
# Mirrors the files listed in the remote recentfile.
#
# Fetches the remote recentfile into a tempfile, reads its events and
# walks events 0..$last_item. "new" events are rsynced, either one by
# one or collected into batches of max_files_per_connection paths;
# "delete" events remove the local file or directory. At the end the
# tempfile is renamed over our recentfile. Returns true if no mirroring
# error was recorded.
#
# %options: "after" restricts the run to events whose epoch is larger
# than the given timestamp.
681 sub mirror {
682 my($self, %options) = @_;
683 my $trecentfile = $self->get_remote_recentfile_as_tempfile();
684 my ($recent_data) = $self->recent_events_from_tempfile();
# NOTE(review): this $i is never used; the loop below declares its own.
685 my $i = 0;
686 my @error;
687 my @collector;
688 my $last_item = $#$recent_data;
# Cut the list at the first event that is not newer than "after".
# presumably the events are ordered newest first -- confirm with the writer side
689 if (defined $options{after}) {
690 if ($recent_data->[0]{epoch} > $options{after}) {
691 if (
692 my $f = first
693 {$recent_data->[$_]{epoch} <= $options{after}}
694 0..$#$recent_data
696 $last_item = $f-1;
698 } else {
# even the first event is not newer than "after": nothing to do
699 $last_item = -1;
702 ITEM: for my $i (0..$last_item) {
703 my $recent_event = $recent_data->[$i];
704 my $dst = $self->local_path($recent_event->{path});
705 if ($recent_event->{type} eq "new"){
706 if ($self->verbose) {
707 my $doing = -e $dst ? "Syncing" : "Getting";
708 printf STDERR
710 "%s (%d/%d) %s ... ",
711 $doing,
712 1+$i,
713 1+$last_item,
714 $recent_event->{path},
717 my $max_files_per_connection = $self->max_files_per_connection || 42;
718 my $success;
719 if ($max_files_per_connection == 1) {
720 # old code path may go away when the collector has
721 # proved useful...
722 $success = eval { $self->mirror_path($recent_event->{path}) };
723 } else {
724 if ($self->verbose) {
725 print STDERR "\n";
# Batch mode: queue the path and only call rsync once the batch is full.
727 push @collector, $recent_event->{path};
728 if (@collector == $max_files_per_connection) {
729 $success = eval { $self->mirror_path(\@collector) };
730 @collector = ();
731 my $sleep = $self->sleep_per_connection;
732 $sleep = 0.42 unless defined $sleep;
733 Time::HiRes::sleep $sleep;
734 } else {
# batch not full yet: skip the error check and the DONE message
735 next ITEM;
738 if (!$success || $@) {
739 warn "error while mirroring: $@";
740 push @error, $@;
741 sleep 1;
743 if ($self->verbose) {
744 print STDERR "DONE\n";
746 } elsif ($recent_event->{type} eq "delete") {
# "_" reuses the stat data collected by the preceding -l test
747 if (-l $dst or not -d _) {
748 unlink $dst or warn "error while unlinking '$dst': $!";
749 } else {
750 rmdir $dst or warn "error on rmdir '$dst': $!";
752 } else {
753 warn "Warning: invalid upload type '$recent_event->{type}'";
# Flush whatever is left in the last, incomplete batch.
756 if (@collector) {
757 my $success = eval { $self->mirror_path(\@collector) };
758 if (!$success || $@) {
759 warn "Warning: Unknown error while mirroring: $@";
760 push @error, $@;
761 sleep 1;
763 if ($self->verbose) {
764 print STDERR "DONE\n";
# NOTE(review): the result of this rename is not checked.
767 rename $trecentfile, $self->rfile;
768 return !@error;
771 =head2 (void) $obj->mirror_loop
773 Run mirror in an endless loop. See the accessor C<loopinterval>. XXX
774 What happens/should happen if we miss the interval during a single loop?
776 =cut
# Runs mirror() in an endless loop until SIGINT arrives.
#
# Every iteration mirrors the events newer than the newest event seen
# in the previous iteration, then sleeps so that one iteration takes at
# least loopinterval seconds (default 42). With verbose set, prints
# "(epoch)" after each mirror run and "~" after each sleep.
sub mirror_loop {
    my($self) = @_;

    my $Signal = 0;
    # local: restore the caller's SIGINT handler when the loop ends
    local $SIG{INT} = sub { $Signal++ };
    my $loopinterval = $self->loopinterval || 42;
    my $after = -999999999;
  LOOP: while () {
        # measure every iteration on its own; a start time taken once
        # before the loop would disable the sleep after the first pass
        my $iteration_start = time;
        # mirror() takes named options, so the timestamp needs its key
        $self->mirror(after => $after);
        last LOOP if $Signal;
        my $re = $self->recent_events;
        # keep the previous mark when the recentfile yields no events
        if ($re && @$re && defined $re->[0]{epoch}) {
            $after = $re->[0]{epoch};
        }
        if ($self->verbose) {
            local $| = 1;
            print "($after)";
        }
        my $remaining = $iteration_start + $loopinterval - time;
        sleep $remaining if $remaining > 0;
        # an interrupt during the sleep must not trigger another mirror run
        last LOOP if $Signal;
        if ($self->verbose) {
            local $| = 1;
            print "~";
        }
    }
}
805 =head2 $success = $obj->mirror_path ( $arrref | $path )
807 If the argument is a scalar it is treated as a path. The remote path
808 is mirrored into the local copy. $path is the path found in the
809 I<recentfile>, i.e. it is relative to the root directory of the
810 mirror.
812 If the argument is an array reference then all elements are treated as
813 a path below the current tree and all are rsynced with a single
814 command (and a single connection).
816 =cut
# Rsyncs one path or a list of paths from the remote tree.
#
# $path is either a single path relative to the mirror root or an
# arrayref of such paths. A list is transferred with one rsync call via
# a temporary files-from list. Each rsync call is repeated until it
# succeeds; failures are passed to register_rsync_error. If
# ignore_link_stat_errors is set, an rsync link_stat error ends the
# call at once with a true return value. Returns 1.
818 sub mirror_path {
819 my($self,$path) = @_;
820 # XXX simplify the two branches such that $path is treated as
821 # [$path] maybe even demand the argument as an arrayref to
822 # simplify docs and code. (rsync-over-recentfile-2.pl uses the
823 # interface)
824 if (ref $path and ref $path eq "ARRAY") {
# without an argument local_path returns localroot itself
825 my $dst = $self->local_path();
826 mkpath dirname $dst;
827 my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
828 lc $self->filenameroot,
830 TMPDIR => 1,
831 UNLINK => 0,
# one path per line: this file is handed to rsync as 'files-from'
833 for my $p (@$path) {
834 print $fh $p, "\n";
836 $fh->flush;
# created with UNLINK => 0 above; from here on remove it with the object
837 $fh->unlink_on_destroy(1);
838 while (!$self->rsync->exec
840 src => join("/",
841 $self->remotebase,
843 dst => $dst,
844 'files-from' => $fh->filename,
845 )) {
846 my($err) = $self->rsync->err;
847 if ($self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x ) {
848 if ($self->verbose) {
849 warn "Info: ignoring link_stat error '$err'";
# treated as success: no retry and no error registration
851 return 1;
853 $self->register_rsync_error ($err);
855 $self->un_register_rsync_error ();
856 } else {
# single path: same retry loop as above, without the files-from list
857 my $dst = $self->local_path($path);
858 mkpath dirname $dst;
859 while (!$self->rsync->exec
861 src => join("/",
862 $self->remotebase,
863 $path
865 dst => $dst,
866 )) {
867 my($err) = $self->rsync->err;
868 if ($self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x ) {
869 if ($self->verbose) {
870 warn "Info: ignoring link_stat error '$err'";
872 return 1;
874 $self->register_rsync_error ($err);
876 $self->un_register_rsync_error ();
878 return 1;
881 =head2 $path = $obj->naive_path_normalize ($path)
883 Takes an absolute unix style path as argument and canonicalizes it to
884 a shorter path if possible, removing things like double slashes or
885 C</./> and removes references to C<../> directories to get a shorter
886 unambiguous path. This simplifies the code that determines whether
887 a file passed to C<update()> is indeed below our C<localroot>.
889 =cut
# Canonicalizes an absolute unix style path without touching the
# filesystem: collapses runs of slashes, removes "/./" segments,
# resolves "name/../" pairs and strips one trailing slash. Returns the
# normalized path.
sub naive_path_normalize {
    my($self,$path) = @_;
    $path =~ s|/+|/|g;
    # "/./" refers to the same directory; the documentation promises
    # its removal
    1 while $path =~ s|/\./|/|;
    1 while $path =~ s|/[^/]+/\.\./|/|;
    $path =~ s|/$||;
    return $path;
}
899 =head2 $ret = $obj->read_recent_1 ( $recent_data )
901 Delegate of C<recent_events()> on protocol 1
903 =cut
# Protocol 1 reader: the event list is stored under the "recent" key of
# the deserialized recentfile.
sub read_recent_1 {
    my $self = shift;
    my $data = shift;
    return $data->{recent};
}
910 =head2 $array_ref = $obj->recent_events
912 Note: the code relies on the resource being written atomically. We
913 cannot lock because we may have no write access.
915 =cut
# Loads the events from our recentfile (see rfile) with YAML::Syck.
# Returns an empty arrayref if loading fails or yields nothing. A file
# whose top level is an array is protocol 0 and is returned as is;
# otherwise the read_recent_<protocol> method named by the file's meta
# data extracts the list.
sub recent_events {
    my ($self) = @_;
    my $rfile = $self->rfile;
    my ($data) = eval { YAML::Syck::LoadFile($rfile) };
    return [] if $@ or !$data;
    return $data if reftype($data) eq 'ARRAY'; # protocol 0
    my $reader = sprintf "read_recent_%d", $data->{meta}{protocol};
    return $self->$reader($data);
}
933 =head2 $array_ref = $obj->recent_events_from_tempfile
935 Reads the file-events in the temporary local mirror of the remote file.
937 =cut
# Reads the events from the current tempfile instead of the regular
# recentfile by switching _use_tempfile on around the recent_events
# call.
sub recent_events_from_tempfile {
    my $self = shift;
    $self->_use_tempfile(1);
    my $events = $self->recent_events;
    $self->_use_tempfile(0);
    return $events;
}
947 =head2 $ret = $obj->recentfile
949 deprecated, use rfile instead
951 =cut
# Deprecated: warns with a stack trace, then returns localroot joined
# with the recentfile basename.
sub recentfile {
    my $self = shift;
    require Carp;
    Carp::cluck("deprecated method recentfile called. Please use rfile instead!");
    return File::Spec->catfile($self->localroot, $self->recentfile_basename);
}
964 =head2 $ret = $obj->recentfile_basename
966 Just the basename of our I<recentfile>, composed from C<filenameroot>,
967 C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
969 =cut
# Composes the basename of our recentfile as
# "<filenameroot>-<interval><serializer_suffix>".
sub recentfile_basename {
    my $self = shift;
    my @parts = (
        $self->filenameroot,
        $self->interval,
        $self->serializer_suffix,
    );
    return sprintf "%s-%s%s", @parts;
}
981 =head2 $str = $obj->remotebase
983 =head2 (void) $obj->remotebase ( $set )
985 Get/Set the composed prefix needed when rsyncing from a remote module.
986 If remote_host, remote_module, and remote_dir are set, it is composed
987 from these.
989 =cut
# Get/set accessor for the rsync source prefix. If no value has been
# set, it is composed as "<remote_host>::<remote_module>/<remote_dir>",
# leaving out each part that is undefined, and the result is cached.
sub remotebase {
    my($self, $set) = @_;
    $self->_remotebase($set) if defined $set;
    my $base = $self->_remotebase;
    return $base if defined $base;
    my $host   = defined $self->remote_host   ? ($self->remote_host."::")  : "";
    my $module = defined $self->remote_module ? ($self->remote_module."/") : "";
    my $dir    = defined $self->remote_dir    ? $self->remote_dir          : "";
    $base = sprintf "%s%s%s", $host, $module, $dir;
    $self->_remotebase($base);
    return $base;
}
1010 =head2 my $rfile = $obj->rfile
1012 Returns the full path of the I<recentfile>
1014 =cut
# Returns the full path of the recentfile. While _use_tempfile is set,
# this is the current tempfile; otherwise it is the cached _rfile value
# or, if none is cached, localroot joined with the recentfile basename
# (which is then cached).
sub rfile {
    my($self) = @_;
    return $self->_current_tempfile if $self->_use_tempfile;
    my $cached = $self->_rfile;
    return $cached if defined $cached;
    my $rfile = File::Spec->catfile
        (
         $self->localroot,
         $self->recentfile_basename,
        );
    $self->_rfile ($rfile);
    return $rfile;
}
1032 =head2 $rsync_obj = $obj->rsync
1034 The File::Rsync object that this object uses for communicating with an
1035 upstream server.
1037 =cut
# Returns the File::Rsync object used for all transfers, creating and
# caching it on first use from rsync_options. Dies if File::Rsync could
# not be loaded.
sub rsync {
    my($self) = @_;
    my $cached = $self->_rsync;
    return $cached if defined $cached;
    my $rsync_options = $self->rsync_options || {};
    die "File::Rsync required for rsync operations. Cannot continue"
        unless $HAVE->{"File::Rsync"};
    my $rsync = File::Rsync->new($rsync_options);
    $self->_rsync($rsync);
    return $rsync;
}
1054 =head2 (void) $obj->register_rsync_error($err)
1056 =head2 (void) $obj->un_register_rsync_error()
1058 Register_rsync_error is called whenever the File::Rsync object fails
1059 on an exec (say, connection doesn't succeed). It issues a warning and
1060 sleeps for an increasing amount of time. Un_register_rsync_error
1061 resets the error count. See also accessor C<max_rsync_errors>.
1063 =cut
# Error bookkeeping shared by both subs below; the two counters live in
# this enclosing block and are therefore common to all objects.
{
    my $no_success_count = 0;
    my $no_success_time = 0;

    # Records one failed rsync call. Dies once max_rsync_errors
    # (default -1, meaning never) consecutive failures are reached;
    # otherwise warns and sleeps 12 seconds per failure so far, capped
    # at 120 seconds.
    sub register_rsync_error {
        my($self, $err) = @_;
        chomp $err;
        $no_success_time = time;
        $no_success_count++;
        my $limit = $self->max_rsync_errors;
        $limit = -1 unless defined $limit;
        if ($limit >= 0 && $no_success_count >= $limit) {
            die sprintf
                (
                 "Alert: Error while rsyncing: '%s', error count: %d, exiting now,",
                 $err,
                 $no_success_count,
                );
        }
        my $backoff = 12 * $no_success_count;
        my $sleep = $backoff > 120 ? 120 : $backoff;
        warn sprintf
            (
             "Warning: %s, Error while rsyncing: '%s', sleeping %d",
             scalar(localtime($no_success_time)),
             $err,
             $sleep,
            );
        sleep $sleep
    }

    # Resets the failure bookkeeping after a successful rsync call.
    sub un_register_rsync_error {
        my($self) = @_;
        $no_success_time = 0;
        $no_success_count = 0;
    }
}
1101 =head2 (void) $obj->unlock()
1103 Unlocking is implemented with an C<rmdir> on a locking directory
1104 (C<.lock> appended to $rfile).
1106 =cut
# Releases the lock taken by lock(): removes the directory
# "<rfile>.lock" and clears the _is_locked flag. Does nothing if this
# object does not hold the lock.
sub unlock {
    my $self = shift;
    $self->_is_locked or return;
    my $lockdir = $self->rfile . ".lock";
    rmdir $lockdir;
    $self->_is_locked (0);
}
1116 =head2 $ret = $obj->update ($path, $type)
1118 Enter one file into the local I<recentfile>. $path is the (usually
1119 absolute) path. If the path is outside I<our> tree, then it is
1120 ignored.
1122 $type is one of C<new> or C<delete>.
1124 =cut
# Enters one file event into the local recentfile.
#
# $path is the (usually absolute) path of the file; it is canonicalized
# with the method named by the canonize accessor (default
# naive_path_normalize). Paths outside localroot are ignored. $type
# must be exactly "new" or "delete". Under lock, events older than our
# interval are dropped from the tail, older entries for the same path
# are removed, and the new event is put at the front; the file is then
# written and the ".recent" symlink is checked. Dies on missing or
# illegal arguments; an error while writing is rethrown after the lock
# has been released.
sub update {
    my($self,$path,$type) = @_;
    die "update called without path argument" unless defined $path;
    die "update called without type argument" unless defined $type;
    # anchored: a type like "renewal" or "deleted" must not pass
    die "update called with illegal type argument: $type"
        unless $type =~ /\A(?:new|delete)\z/;
    my $canonmeth = $self->canonize || "naive_path_normalize";
    $path = $self->$canonmeth($path);
    my $lrd = $self->localroot;
    my $inside = $path =~ s|^\Q$lrd\E||;
    # A bare string prefix would also accept siblings such as
    # "/root2/x" for localroot "/root", so the match must end on a
    # directory boundary.
    if ($inside
        and length $lrd and $lrd !~ m|/\z|
        and length $path and $path !~ m|^/|) {
        $inside = 0;
    }
    return unless $inside;
    $path =~ s|^/||;
    my $secs = $self->interval_secs();
    my $epoch = Time::HiRes::time;
    my $oldest_allowed = $epoch-$secs;

    $self->lock;
    # eval: the lock directory must not survive a failure while writing
    my $ok = eval {
        my $recent = $self->recent_events || [];
        pop @$recent while @$recent && $recent->[-1]{epoch} < $oldest_allowed;
        # remove older duplicates of this $path, irrespective of $type:
        $recent = [ grep { $_->{path} ne $path } @$recent ];

        unshift @$recent, { epoch => $epoch, path => $path, type => $type };
        # sort?
        $self->write_recent($recent);
        $self->_assert_symlink;
        1;
    };
    my $err = $@;
    my $ret = $self->unlock;
    die $err unless $ok;
    return $ret;
}
1165 =head2 $obj->write_recent ($recent_files_arrayref)
1167 Writes a I<recentfile> based on the current reflection of the current
1168 state of the tree limited by the current interval.
1170 =cut
# Writes the given event list to the recentfile by dispatching to the
# write_<protocol> method for the object's protocol. Dies if no list is
# given.
sub write_recent {
    my $self = shift;
    my $recent = shift;
    defined $recent or die "write_recent called without argument";
    my $writer = sprintf "write_%d", $self->protocol;
    return $self->$writer($recent);
}
1179 =head2 $obj->write_0 ($recent_files_arrayref)
1181 Delegate of C<write_recent()> on protocol 0
1183 =cut
# Protocol 0 writer: dumps the bare event list as YAML into
# "<rfile>.new" and renames that file over the recentfile. Dies if the
# rename fails.
sub write_0 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    my $tmpfile = "$rfile.new";
    YAML::Syck::DumpFile($tmpfile, $recent);
    rename $tmpfile, $rfile or die "Could not rename to '$rfile': $!";
}
1192 =head2 $obj->write_1 ($recent_files_arrayref)
1194 Delegate of C<write_recent()> on protocol 1
1196 =cut
# Protocol 1 writer: dumps a hash with the keys "meta" (see meta_data)
# and "recent" (the event list) as YAML into "<rfile>.new" and renames
# that file over the recentfile. Dies if the rename fails.
sub write_1 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    my $tmpfile = "$rfile.new";
    my %document = (
        meta   => $self->meta_data,
        recent => $recent,
    );
    YAML::Syck::DumpFile($tmpfile, \%document);
    rename $tmpfile, $rfile or die "Could not rename to '$rfile': $!";
}
1208 BEGIN {
1209 my @pod_lines =
1210 split /\n/, <<'=cut'; %serializers = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
1212 =head1 THE ARCHITECTURE OF A COLLECTION OF RECENTFILES
1214 The idea is that we want to have a short file that records really
1215 recent changes. So that a fresh mirror can be kept fresh as long as
1216 the connectivity is given. Then we want longer files that record the
1217 history before. So when the mirror falls behind the update period
1218 reflected in the shortest file, it can switch to the next one. And if
1219 this is not long enough we want another one, again a bit longer. And
1220 we want one that completes the history back to the oldest file. For
1221 practical reasons the timespans of these files must overlap a bit and
1222 to keep the bandwidth necessities low they must not be
1223 updated too frequently. That's the basic idea. The following
1224 example represents a tree that has a few updates every day:
1226 RECENT-1h.yaml
1227 RECENT-6h.yaml
1228 RECENT-1d.yaml
1229 RECENT-1M.yaml
1230 RECENT-1W.yaml
1231 RECENT-1Q.yaml
1232 RECENT-1Y.yaml
1233 RECENT-Z.yaml
1235 The last file, the Z file, contains the complementary files that are
1236 in none of the other files. It never contains C<deletes>. Besides
1237 this it serves the role of a recovery mechanism or spill over pond.
1238 When things go wrong, it's a valuable controlling instance to hold the
1239 differences between the collection of limited interval files and the
1240 actual filesystem.
1242 =head2 A SINGLE RECENTFILE
1244 A I<recentfile> consists of a hash that has two keys: C<meta> and
1245 C<recent>. The C<meta> part has metadata and the C<recent> part has a
1246 list of fileobjects.
1248 =head2 THE META PART
1250 Here we find things that are pretty much self-explanatory: all
1251 lowercase attributes are accessors and as such explained somewhere
1252 above in this manpage. The uppercase attribute C<Producers> contains
1253 version information about involved software components. Nothing to
1254 worry about, I believe.
1256 =head2 THE RECENT PART
1258 This is the interesting part. Every entry refers to some filesystem
1259 change (with path, epoch, type). The epoch value is the point in time
1260 when some change was I<registered>. Do not be tempted to believe that
1261 the entry has a direct relation to something like modification time or
1262 change time on the filesystem level. The timestamp (I<epoch> element)
1263 is a floating point number and practically never corresponds
1264 exactly to the data recorded in the filesystem but rather to the time
1265 when some process succeeded in reporting to the I<recentfile> mechanism
1266 that something has changed. This is why many parts of the code refer
1267 to I<events>, because we merely try to record the I<event> of the
1268 discovery of a change, not the time of the change itself.
1270 All these entries can be divided into two types (denoted by the
1271 C<type> attribute): C<new>s and C<delete>s. Changes and creations are
1272 C<new>s. Deletes are C<delete>s.
1274 Another distinction is for objects with an epoch timestamp and others
1275 without. All files that were already existing on the filesystem before
1276 the I<recentfile> mechanism was installed, get recorded with a
1277 timestamp of zero.
1279 Besides an C<epoch> and a C<type> attribute we find a third one:
1280 C<path>. This path is relative to the directory we find the
1281 I<recentfile> in.
1283 The order of the entries in the I<recentfile> is by decreasing epoch
1284 attribute. These are either 0 or a unique floating point number. They
1285 are zero for events that were happening either before the time that
1286 the I<recentfile> mechanism was set up or were left undiscovered for a
1287 while and never handed over to update(). They are floating point
1288 numbers for all events being regularly handed to update(). And when
1289 the server has ntp running correctly, then the timestamps are
1290 actually decreasing and unique.
1292 =head1 CORRUPTION AND RECOVERY
1294 If the origin host breaks the promise to deliver consistent and
1295 complete I<recentfiles> then the way back to sanity shall be achieved
1296 through either the C<zloop> (still TBD) or traditional rsyncing
1297 between the hosts. For example, if the origin server forgets to deploy
1298 ntp and the clock on it jumps backwards some day, then this would
1299 probably go unnoticed for a while and many software components that
1300 rely on the time never running backwards will make wrong decisions.
1301 After some time this accident would probably still be found in one of
1302 the I<recentfiles> but would become meaningless as soon as a mirror
1303 has run through the sanitizing procedures. Same goes for origin hosts
1304 that forget to include or deliberately omit some files.
1306 =head1 SERIALIZERS
1308 The following suffixes are supported and trigger the use of these
1309 serializers:
1311 =over 4
1313 =item C<< ".yaml" => "YAML::Syck" >>
1315 =item C<< ".json" => "JSON" >>
1317 =item C<< ".sto" => "Storable" >>
1319 =item C<< ".dd" => "Data::Dumper" >>
1321 =back
1323 =cut
# Fill %seconds at compile time from the documentation that follows.
#
# The text from the next line down to the terminating "=cut" line is read
# as a non-interpolating here-document and split into lines. grep keeps
# only the lines that look like "=item C<< ... >>" and, through the s///,
# reduces each of them to the text between the angle brackets. Each
# remaining string is then evaluated as Perl (string eval) and the
# resulting list becomes the key/value pairs of %seconds.
#
# Everything after <<'=cut' has to stay on the same physical line as the
# marker, because the here-document body starts on the very next line.
1325 BEGIN {
1326 my @pod_lines =
1327 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
1329 =head1 INTERVAL SPEC
1331 An interval spec is a primitive way to express time spans. Normally it
1332 is composed from an integer and a letter.
1334 As a special case, a string that consists only of the single letter
1335 C<Z>, stands for unlimited time.
1337 The following letters express the specified number of seconds:
1339 =over 4
1341 =item C<< s => 1 >>
1343 =item C<< m => 60 >>
1345 =item C<< h => 60*60 >>
1347 =item C<< d => 60*60*24 >>
1349 =item C<< W => 60*60*24*7 >>
1351 =item C<< M => 60*60*24*30 >>
1353 =item C<< Q => 60*60*24*90 >>
1355 =item C<< Y => 60*60*24*365.25 >>
1357 =back
1359 =cut
1361 =head1 BACKGROUND
1363 This is about speeding up rsync operation on large trees to many
1364 places. Uses a small metadata cocktail and pull technology.
1366 =head2 NON-COMPETITORS
1368 File::Mirror JWU/File-Mirror/File-Mirror-0.10.tar.gz only local trees
1369 Mirror::YAML ADAMK/Mirror-YAML-0.03.tar.gz some sort of inner circle
1370 Net::DownloadMirror KNORR/Net-DownloadMirror-0.04.tar.gz FTP sites and stuff
1371 Net::MirrorDir KNORR/Net-MirrorDir-0.05.tar.gz ditto
1372 Net::UploadMirror KNORR/Net-UploadMirror-0.06.tar.gz ditto
1373 Pushmi::Mirror CLKAO/Pushmi-v1.0.0.tar.gz something SVK
1375 =head2 COMPETITORS
1377 The problem to solve, which clusters, ftp mirrors, and otherwise
1378 replicated datasets like CPAN all share, is how to transfer only a minimum
1379 amount of data to determine the diff between two hosts.
1381 Normally it takes a long time to determine the diff itself before it
1382 can be transferred. Known solutions at the time of this writing are
1383 csync2, and rsync 3 batch mode.
1385 For many years the best solution was csync2 which solves the
1386 problem by maintaining a sqlite database on both ends and talking a
1387 highly sophisticated protocol to quickly determine which files to send
1388 and which to delete at any given point in time. Csync2 is often
1389 inconvenient because the act of syncing demands quite an intimate
1390 relationship between the sender and the receiver and suffers when the
1391 number of syncing sites is large or connections are unreliable.
1393 Rsync 3 batch mode works around these problems by providing rsync-able
1394 batch files which allow receiving nodes to replay the history of the
1395 other nodes. This reduces the need to have an incestuous relation but
1396 it has the disadvantage that these batch files replicate the contents
1397 of the involved files. This seems inappropriate when the nodes already
1398 have a means of communicating over rsync.
1400 rersyncrecent solves this problem with a couple of (usually 2-10)
1401 index files which cover different overlapping time intervals. The
1402 master writes these files and the clients can construct the full tree
1403 from the information contained in them. The most recent index file
1404 usually covers the last seconds or minutes or hours of the tree and
1405 depending on the needs, slaves can rsync every few seconds and then
1406 bring their trees in full sync.
1408 The rersyncrecent mode was developed for CPAN but I hope it is a
1409 convenient and economic general purpose solution. I'm looking forward
1410 to seeing a CPAN backbone that is only a few seconds behind PAUSE. And
1411 then ... the first FUSE based CPAN filesystem anyone?
1413 =head1 AUTHOR
1415 Andreas König
1417 =head1 BUGS
1419 Please report any bugs or feature requests through the web interface at
1421 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
1422 I will be notified, and then you'll automatically be notified of
1423 progress on your bug as I make changes.
1425 =head1 SUPPORT
1427 You can find documentation for this module with the perldoc command.
1429 perldoc File::Rsync::Mirror::Recentfile
1431 You can also look for information at:
1433 =over 4
1435 =item * RT: CPAN's request tracker
1437 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
1439 =item * AnnoCPAN: Annotated CPAN documentation
1441 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
1443 =item * CPAN Ratings
1445 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
1447 =item * Search CPAN
1449 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
1451 =back
1454 =head1 ACKNOWLEDGEMENTS
1456 Thanks to RJBS for module-starter.
1458 =head1 COPYRIGHT & LICENSE
1460 Copyright 2008 Andreas König, all rights reserved.
1462 This program is free software; you can redistribute it and/or modify it
1463 under the same terms as Perl itself.
1466 =cut
1468 1; # End of File::Rsync::Mirror::Recentfile
1470 # Local Variables:
1471 # mode: cperl
1472 # cperl-indent-level: 4
1473 # End: