describing the need of and a first implementation of the merged accessor and a test
[rersyncrecent.git] / lib / File / Rsync / Mirror / Recentfile.pm
blobb30a99ec603a50d7d4d1831bd49b95b293e2025f
1 package File::Rsync::Mirror::Recentfile;
3 # use warnings;
4 use strict;
6 =encoding utf-8
8 =head1 NAME
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
12 =head1 VERSION
14 Version 0.0.1
16 =cut
package File::Rsync::Mirror::Recentfile;

# Optional dependencies: remember which ones could be loaded so that the
# code can degrade gracefully (new_from_file() falls back to YAML::Syck,
# rsync() dies with a clear message when File::Rsync is missing).
my $HAVE = {};
for my $package (qw( Data::Serializer File::Rsync )) {
    # Block eval + file-name require instead of a string eval: same
    # outcome (true on success, undef on failure) without compiling code
    # at runtime.
    (my $module_file = "$package.pm") =~ s{::}{/}g;
    $HAVE->{$package} = eval { require $module_file; 1 };
}
24 use File::Basename qw(dirname fileparse);
25 use File::Copy qw(cp);
26 use File::Path qw(mkpath);
27 use File::Temp;
28 use List::Util qw(first min);
29 use Scalar::Util qw(reftype);
30 use Storable;
31 use Time::HiRes qw();
32 use YAML::Syck;
34 use version; our $VERSION = qv('0.0.1');
# Largest native integer with the top bit cleared; interval_secs() uses it
# as the length of the open-ended "Z" interval.
use constant MAX_INT => ~0 >> 1; # anything better?

# Seconds per interval unit letter, consulted by interval_secs().
# NOTE(review): populated elsewhere in this file -- confirm.
my %seconds;

# Serializer for each recentfile suffix. Filled at compile time by the
# BEGIN block that evaluates the "=item C<<...>>" lines of the SERIALIZERS
# POD. Deliberately declared without an initialiser: a runtime assignment
# here would wipe what that BEGIN block stored.
# maybe subclass if this mapping is bad?
my %serializers;
45 =head1 SYNOPSIS
47 B<!!!! PRE-ALPHA ALERT !!!!>
49 Nothing in here is believed to be stable, nothing yet intended for
50 public consumption. The plan is to provide a script in one of the next
51 releases that acts as a frontend for all the backend functionality.
52 Option and method names will very likely change.
54 For the rationale see the section BACKGROUND.
56 This is published only for developers of the (yet to be named)
57 script(s).
Writer (of a single file):

    use File::Rsync::Mirror::Recentfile;
    my $rf = File::Rsync::Mirror::Recentfile->new
      (
       interval => q(6h),
       filenameroot => "RECENT",
       comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
       localroot => "/home/ftp/pub/PAUSE/authors/",
       aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
      );
    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");

Reader/mirrorer:

    my $rf = File::Rsync::Mirror::Recentfile->new
      (
       filenameroot => "RECENT",
       ignore_link_stat_errors => 1,
       interval => q(6h),
       localroot => "/home/ftp/pub/PAUSE/authors",
       remote_dir => "",
       remote_host => "pause.perl.org",
       remote_module => "authors",
       rsync_options => {
                         compress => 1,
                         'rsync-path' => '/usr/bin/rsync',
                         links => 1,
                         times => 1,
                         'omit-dir-times' => 1,
                         checksum => 1,
                        },
       verbose => 1,
      );
    $rf->mirror;

95 Aggregator (usually the writer):
97 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
98 $rf->aggregate;
100 =head1 EXPORT
102 No exports.
104 =head1 CONSTRUCTORS
106 =head2 my $obj = CLASS->new(%hash)
108 Constructor. On every argument pair the key is a method name and the
109 value is an argument to that method name.
111 =cut
sub new {
    my($class, @args) = @_;
    my $self = bless {}, $class;
    # Each key/value pair is a method call: the key names the method,
    # the value is its single argument.
    while (my($method, $arg) = splice @args, 0, 2) {
        $self->$method($arg);
    }
    # Fill in defaults for whatever the caller left undefined.
    my @defaults = (
                    protocol          => 1,
                    filenameroot      => "RECENT",
                    serializer_suffix => ".yaml",
                   );
    while (my($accessor, $default) = splice @defaults, 0, 2) {
        $self->$accessor($default) unless defined $self->$accessor;
    }
    return $self;
}
132 =head2 my $obj = CLASS->new_from_file($file)
134 Constructor. $file is a I<recentfile>.
136 =cut
sub new_from_file {
    my($class, $file) = @_;
    my $self = bless {}, $class;
    # NOTE(review): the localroot() and interval() setters called below
    # reset _rfile again, so this value does not survive construction;
    # rfile() later recomputes the path from localroot and
    # recentfile_basename.
    $self->_rfile($file);
    #?# $self->lock;
    my $serialized = do {
        # three-arg open: a file name can never be mistaken for a mode
        open my $fh, '<', $file or die "Could not open '$file': $!";
        local $/;
        <$fh>;
    };
    # fileparse() treats suffixes as regular expressions; quote them so
    # that ".yaml" matches only a literal ".yaml" and the matched suffix is
    # usable as a key into %serializers.
    my($name,$path,$suffix) = fileparse $file, map { quotemeta } keys %serializers;
    die "Could not determine file format from suffix" unless $suffix;
    $self->serializer_suffix($suffix);
    $self->localroot($path);
    my $deserialized;
    if ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            (
             serializer => $serializers{$suffix},
             secret     => undef,
             compress   => 0,
             digest     => 0,
             portable   => 0,
             encoding   => "raw",
            );
        $deserialized = $serializer->deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'"
            unless $suffix eq ".yaml";
        require YAML::Syck;
        $deserialized = YAML::Syck::LoadFile($file);
    }
    # Every lowercase meta key is replayed as an accessor call.
    # NOTE(review): a crafted file can thereby call any lowercase method
    # of this class; only use this constructor on trusted recentfiles.
    while (my($k,$v) = each %{$deserialized->{meta}}) {
        next if $k ne lc $k; # "Producers"
        $self->$k($v);
    }
    unless (defined $self->protocol) {
        $self->protocol(1);
    }
    return $self;
}
179 =head1 ACCESSORS
181 =cut
my @accessors;

BEGIN {
    # Private accessors for internal state; they are not documented in
    # the POD below.
    @accessors = (
                  "_current_tempfile",
                  "_interval",
                  "_is_locked",
                  "_localroot",
                  "_remotebase",
                  "_rfile",
                  "_rsync",
                  "_use_tempfile",
                 );

    # The public accessors are harvested from the POD that follows: the
    # heredoc runs up to the "=cut" line and every "=item NAME" line in it
    # contributes NAME to @accessors, so the documentation and the list of
    # accessor names cannot drift apart.
    my @pod_lines =
        split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }

=over 4

=item aggregator

A list of interval specs that tell the aggregator which I<recentfile>s
are to be produced.

=item canonize

The name of a method that update() uses to canonize a path before it
is recorded. Only supported value is C<naive_path_normalize>. Defaults
to that.

=item comment

A comment about this tree and setup.

=item filenameroot

The (prefix of the) filename we use for this I<recentfile>. Defaults to
C<RECENT>.

=item ignore_link_stat_errors

If set to true, rsync errors are ignored that complain about link stat
errors. These seem to happen only when there are files missing at the
origin. In race conditions this can always happen, so it is
recommended to set this value to true.

=item locktimeout

After how many seconds shall we die if we cannot lock a I<recentfile>?
Defaults to 600 seconds.

=item loopinterval

When mirror_loop is called, this accessor can specify how much time
every loop shall at least take. If the work of a loop is done before
that time has gone, sleeps for the rest of the time. Defaults to
arbitrary 42 seconds.

=item max_files_per_connection

Maximum number of files that are transferred on a single rsync call.
Setting it higher means higher performance at the price of holding
connections longer and potentially disturbing other users in the pool.
Defaults to the arbitrary value 42.

=item max_rsync_errors

When rsync operations encounter that many errors without any resetting
success in between, then we die. Defaults to -1 which means we run
forever ignoring all rsync errors.

=item merged

Hashref recording when this recentfile was last merged into another
one: the epoch of the newest merged event, the time of the merge and
the interval of the target.

=item protocol

When the RECENT file format changes, we increment the protocol. We try
to support older protocols in later releases.

=item remote_dir

The directory we are mirroring from.

=item remote_host

The host we are mirroring from. Leave empty for the local filesystem.

=item remote_module

Rsync servers have so called modules to separate directory trees from
each other. Put here the name of the module under which we are
mirroring. Leave empty for local filesystem.

=item rsync_options

Things like compress, links, times or checksums. Passed in to the
File::Rsync object used to run the mirror.

=item serializer_suffix

Untested accessor. The only tested format for I<recentfile>s at the
moment is YAML. It is used with YAML::Syck via Data::Serializer. But
in principle other formats are supported as well. See section
SERIALIZERS below.

=item sleep_per_connection

Sleep that many seconds (floating point OK) after every chunk of rsyncing
has finished. Defaults to arbitrary 0.42.

=item verbose

Boolean to turn on a bit of verbosity.

=back

=cut

use accessors @accessors;
304 =head1 METHODS
306 =head2 (void) $obj->aggregate
308 Takes all intervals that are collected in the accessor called
309 aggregator. Sorts them by actual length of the interval.
310 Removes those that are shorter than our own interval. Then merges this
311 object into the next larger object. The merging continues upwards
312 as long as the next I<recentfile> is old enough to warrant a merge.
Whether a merge is warranted is decided by the interval of the
I<recentfile> that precedes the one being merged, so that larger files
are updated less often than smaller ones.
318 Here is an example to illustrate the behaviour. Given aggregators
320 1h 1d 1W 1M 1Q 1Y Z
322 then
324 1h updates 1d on every call to aggregate()
325 1d updates 1W earliest after 1h
326 1W updates 1M earliest after 1d
327 1M updates 1Q earliest after 1W
328 1Q updates 1Y earliest after 1M
329 1Y updates Z earliest after 1Q
331 Note that all but the smallest recentfile get updated at an arbitrary
332 rate and as such are quite useless on their own.
334 =cut
sub aggregate {
    my($self) = @_;
    # One record per interval: our own plus every configured aggregator.
    my @candidates = map { +{ interval => $_, secs => $self->interval_secs($_) } }
                     $self->interval, @{$self->aggregator || []};
    # Keep only intervals at least as long as ours, shortest first. The
    # sort is stable, so our own interval stays at index 0.
    my $own_secs = $self->interval_secs;
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
               grep { $_->{secs} >= $own_secs } @candidates;
    $aggs[0]{object} = $self;
    AGGREGATOR: for my $i (0 .. $#aggs - 1) {
        my $this = $aggs[$i]{object};
        my $next = Storable::dclone $this;
        $next->interval($aggs[$i+1]{interval});
        # The first step always merges. Later steps merge only when the
        # target file is missing or older than the interval of the file
        # preceding $this.
        my $want_merge = 1;
        if ($i > 0) {
            my $next_rfile = $next->rfile;
            if (-e $next_rfile) {
                my $prev = $aggs[$i-1]{object};
                local $^T = time;
                my $next_age = 86400 * -M $next_rfile;
                $want_merge = 0 unless $next_age > $prev->interval_secs;
            }
        }
        last AGGREGATOR unless $want_merge;
        $next->merge($this);
        $aggs[$i+1]{object} = $next;
    }
}
sub _debug_aggregate {
    my($self) = @_;
    # All intervals (ours plus aggregators), shortest first.
    my @intervals = map  { $_->{interval} }
                    sort { $a->{secs} <=> $b->{secs} }
                    map  { +{ interval => $_, secs => $self->interval_secs($_) } }
                    $self->interval, @{$self->aggregator || []};
    # Report [path, size, mtime] for each recentfile; missing stat fields
    # show up as the string "undef".
    my @report;
    for my $interval (@intervals) {
        my $probe = Storable::dclone $self;
        $probe->interval($interval);
        my $rfile = $probe->rfile;
        my @stat = stat $rfile;
        push @report, [$rfile, map { $stat[$_] || "undef" } 7, 9];
    }
    return \@report;
}
# (void) $self->_assert_symlink()
#
# Makes sure "<localroot>/<filenameroot>.recent" is a symlink pointing to
# recentfile_basename(). A symlink pointing elsewhere is replaced by
# creating "$symlink.$$" first and renaming it over the old one.
sub _assert_symlink {
    my($self) = @_;
    my $symlink = File::Spec->catfile
        ($self->localroot, sprintf("%s.recent", $self->filenameroot));
    if (-l $symlink) {
        my $current = readlink $symlink;
        return if $current eq $self->recentfile_basename;
        my $tmplink = "$symlink.$$";
        unlink $tmplink; # may fail
        symlink $self->recentfile_basename, $tmplink
            or die "Could not create symlink '$symlink.$$': $!";
        rename $tmplink, $symlink
            or die "Could not rename '$symlink.$$' to $symlink: $!";
    } else {
        symlink $self->recentfile_basename, $symlink
            or die "Could not create symlink '$symlink': $!";
    }
}
419 =head2 $success = $obj->full_mirror
421 (TBD) Mirrors the whole remote site, starting with the smallest I<recentfile>,
422 switching to larger ones ...
424 =cut
sub full_mirror {
    my($self) = @_;
    # Placeholder: always throws until a full mirror is implemented.
    die "FIXME: Not yet implemented";
}
431 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile
433 Stores the remote I<recentfile> locally as a tempfile
435 =cut
sub get_remote_recentfile_as_tempfile {
    my($self) = @_;
    my $localroot = $self->localroot;
    mkpath $localroot;
    # Hidden tempfile next to the real recentfile; kept on disk because
    # mirror() later renames it into place.
    my $tmp = File::Temp->new
        (
         TEMPLATE => sprintf(".%s-XXXX", $self->recentfile_basename),
         DIR      => $localroot,
         SUFFIX   => $self->serializer_suffix,
         UNLINK   => 0,
        );
    my $trecentfile = $tmp->filename;
    my $rfile = $self->rfile;
    if (-e $rfile) {
        # Seed with our current copy to save bandwidth. Might need to be
        # configurable ($self->bandwidth_is_cheap?).
        cp $rfile, $trecentfile or die "Could not copy '$rfile' to '$trecentfile': $!";
    }
    my $src = join "/", $self->remotebase, $self->recentfile_basename;
    # Retry until rsync succeeds; register_rsync_error() sleeps or dies.
    until ($self->rsync->exec(src => $src, dst => $trecentfile)) {
        $self->register_rsync_error ($self->rsync->err);
    }
    $self->un_register_rsync_error ();
    my $mode = 0644;
    chmod $mode, $trecentfile or die "Could not chmod $mode '$trecentfile': $!";
    $self->_current_tempfile ($trecentfile);
    return $trecentfile;
}
469 =head2 $localpath = $obj->get_remotefile ( $relative_path )
471 Rsyncs one single remote file to local filesystem.
473 =cut
sub get_remotefile {
    my($self, $path) = @_;
    my $lfile = File::Spec->catfile($self->localroot, $path);
    mkpath dirname $lfile;
    my $src = join "/", $self->remotebase, $path;
    # Retry until rsync succeeds; register_rsync_error() sleeps or dies.
    until ($self->rsync->exec(src => $src, dst => $lfile)) {
        $self->register_rsync_error ($self->rsync->err);
    }
    $self->un_register_rsync_error ();
    return $lfile;
}
491 =head2 $obj->interval ( $interval_spec )
493 Get/set accessor. $interval_spec is a string and described below in
494 the section INTERVAL SPEC.
496 =cut
sub interval {
    my ($self, $interval) = @_;
    if (@_ >= 2) {
        # The recentfile name contains the interval: drop the cached path.
        $self->_interval($interval);
        $self->_rfile(undef);
    }
    my $current = $self->_interval;
    return $current if defined $current;
    # do not ask the $self too much, it recurses!
    require Carp;
    Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
}
513 =head2 $secs = $obj->interval_secs ( $interval_spec )
515 $interval_spec is described below in the section INTERVAL SPEC. If
516 empty defaults to the inherent interval for this object.
518 =cut
sub interval_secs {
    my ($self, $interval) = @_;
    # Without an argument, fall back to this object's own interval.
    $interval = $self->interval unless $interval;
    die "interval_secs() called without argument on an object without a declared one"
        unless defined $interval;
    # Spec is an optional count followed by exactly one unit letter.
    my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
        die "Could not determine seconds from interval[$interval]";
    return MAX_INT if $interval eq "Z";
    return $seconds{$t}*$n if exists $seconds{$t} and $n =~ /^\d+$/;
    die "Invalid interval specification: n[$n]t[$t]";
}
537 =head2 $obj->localroot ( $localroot )
539 Get/set accessor. The local root of the tree.
541 =cut
sub localroot {
    my ($self, $localroot) = @_;
    if (@_ >= 2) {
        # The cached rfile path is derived from localroot: invalidate it.
        $self->_localroot($localroot);
        $self->_rfile(undef);
    }
    my $current = $self->_localroot;
    return $current;
}
552 =head2 $ret = $obj->local_event_path
554 Misnomer, deprecated. Use local_path instead
556 =cut
sub local_event_path {
    my($self,$path) = @_;
    require Carp;
    Carp::cluck("Deprecated method local_event_path called. Please use local_path instead");
    # rsync paths are always slash-separated; rebuild with the local separator
    return File::Spec->catfile($self->localroot, split(m|/|, $path));
}
566 =head2 $ret = $obj->local_path($path_found_in_recentfile)
568 Combines the path to our local mirror and the path of an object found
569 in this I<recentfile>. In other words: the target of a mirror operation.
571 =cut
sub local_path {
    my($self,$path) = @_;
    # No path given: the target is the mirror root itself.
    return $self->localroot unless defined $path;
    # rsync paths are always slash-separated; rebuild with the local separator
    return File::Spec->catfile($self->localroot, split(m|/|, $path));
}
582 =head2 (void) $obj->lock
584 Locking is implemented with an C<mkdir> on a locking directory
585 (C<.lock> appended to $rfile).
587 =cut
sub lock {
    my ($self) = @_;
    # Re-entrant: holding the lock already means there is nothing to do.
    return if $self->_is_locked;
    # not using flock because it locks on filehandles instead of
    # old school resources: mkdir is atomic and works for any process.
    my $rfile = $self->rfile;
    # XXX need a way to allow breaking the lock
    my $start = time;
    my $locktimeout = $self->locktimeout || 600;
    until (mkdir "$rfile.lock") {
        Time::HiRes::sleep 0.01;
        die "Could not acquire lockdirectory '$rfile.lock': $!"
            if time - $start > $locktimeout;
    }
    $self->_is_locked (1);
}
607 =head2 $ret = $obj->merge ($other)
609 Bulk update of this object with another one. It's intended (but not
610 enforced) to only merge smaller and younger $other objects into the
611 current one. If this file is a C<Z> file, then we do not merge in
612 objects of type C<delete>. But if we encounter an object of type
613 delete we delete the corresponding C<new> object.
615 =cut
sub merge {
    my($self,$other) = @_;
    # Hold both locks while reading and rewriting the two files.
    $other->lock;
    my $other_recent = $other->recent_events || [];
    $self->lock;
    my $my_recent = $self->recent_events || [];

    # calculate the target time span, anchored at the newest known event
    my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch}
              : $my_recent->[0]    ? $my_recent->[0]{epoch}
              :                      undef;
    my $oldest_allowed = 0;
    if ($epoch) {
        if (my $merged = $self->merged) {
            my $secs = $self->interval_secs();
            $oldest_allowed = min($epoch - $secs, $merged->{epoch});
        }
        # throw away outsiders (events are newest first)
        while (@$my_recent && $my_recent->[-1]{epoch} < $oldest_allowed) {
            pop @$my_recent;
        }
    }

    # Events from $other win over ours for the same path.
    my %have;
    my $recent = [];
    for my $ev (@$other_recent) {
        my $ev_epoch = $ev->{epoch} || 0;
        next if $ev_epoch < $oldest_allowed;
        my $path = $ev->{path};
        next if $have{$path}++;
        # The Z file never records deletes. Because the path is already
        # marked in %have, the older "new" entry is still dropped from
        # $my_recent below.
        next if $self->interval eq "Z" and $ev->{type} eq "delete";
        push @$recent, { epoch => $ev->{epoch}, path => $path, type => $ev->{type} };
    }
    push @$recent, grep { !$have{$_->{path}}++ } @$my_recent;
    $self->write_recent($recent);
    $self->unlock;
    $other->merged({
                    time     => Time::HiRes::time, # not used anywhere
                    epoch    => $epoch,            # used in oldest_allowed
                    interval => $self->interval,   # not used anywhere
                   });
    $other->write_recent($other_recent);
    $other->unlock;
}
665 =head2 $hashref = $obj->meta_data
667 Returns the hashref of metadata that the server has to add to the
668 I<recentfile>.
670 =cut
sub meta_data {
    my($self) = @_;
    # Update (in place) whatever meta section was loaded with this object.
    my $ret = $self->{meta};
    for my $m (qw(aggregator canonize comment filenameroot merged interval protocol)) {
        my $v = $self->$m;
        $ret->{$m} = $v if defined $v;
    }
    # XXX need to reset the Producer if I am a writer, keep it when I
    # am a reader
    $ret->{Producers} ||= { __PACKAGE__, "$VERSION" }; # stringified it looks better
    return $ret;
}
697 =head2 $success = $obj->mirror ( %options )
699 Mirrors the files in this I<recentfile>. If $options{after} is
700 specified, only file events after this timestamp are being mirrored.
702 =cut
sub mirror {
    my($self, %options) = @_;
    # Fetch the remote recentfile into a tempfile and read its events
    # (newest first).
    my $trecentfile = $self->get_remote_recentfile_as_tempfile();
    my ($recent_data) = $self->recent_events_from_tempfile();
    my @error;
    my @collector;
    my $last_item = $#$recent_data;
    if (defined $options{after}) {
        # Only events strictly newer than $options{after} are mirrored.
        # Checking for emptiness first avoids autovivifying [0].
        if (@$recent_data && $recent_data->[0]{epoch} > $options{after}) {
            my $f = first { $recent_data->[$_]{epoch} <= $options{after} }
                0..$#$recent_data;
            $last_item = $f-1 if defined $f;
        } else {
            $last_item = -1;
        }
    }
    ITEM: for my $i (0..$last_item) {
        my $recent_event = $recent_data->[$i];
        my $dst = $self->local_path($recent_event->{path});
        if ($recent_event->{type} eq "new"){
            if ($self->verbose) {
                my $doing = -e $dst ? "Syncing" : "Getting";
                printf STDERR "%s (%d/%d) %s ... ",
                    $doing, 1+$i, 1+$last_item, $recent_event->{path};
            }
            my $max_files_per_connection = $self->max_files_per_connection || 42;
            my $success;
            if ($max_files_per_connection == 1) {
                # old code path may go away when the collector has
                # proved useful...
                $success = eval { $self->mirror_path($recent_event->{path}) };
            } else {
                if ($self->verbose) {
                    print STDERR "\n";
                }
                # Batch paths; rsync only once a full batch is collected.
                push @collector, $recent_event->{path};
                if (@collector == $max_files_per_connection) {
                    $success = eval { $self->mirror_path(\@collector) };
                    @collector = ();
                    my $sleep = $self->sleep_per_connection;
                    $sleep = 0.42 unless defined $sleep;
                    Time::HiRes::sleep $sleep;
                } else {
                    next ITEM;
                }
            }
            if (!$success || $@) {
                warn "Warning: Error while mirroring: $@";
                push @error, $@;
                sleep 1;
            }
            if ($self->verbose) {
                print STDERR "DONE\n";
            }
        } elsif ($recent_event->{type} eq "delete") {
            # "_" reuses the lstat buffer filled by -l
            if (-l $dst or not -d _) {
                unlink $dst or warn "Warning: Error while unlinking '$dst': $!";
            } else {
                rmdir $dst or warn "Warning: Error on rmdir '$dst': $!";
            }
        } else {
            warn "Warning: invalid upload type '$recent_event->{type}'";
        }
    }
    # Flush a partially filled batch.
    if (@collector) {
        my $success = eval { $self->mirror_path(\@collector) };
        if (!$success || $@) {
            warn "Warning: Unknown error while mirroring: $@";
            push @error, $@;
            sleep 1;
        }
        if ($self->verbose) {
            print STDERR "DONE\n";
        }
    }
    # Promote the fetched copy to be our local recentfile; a failure here
    # used to go unnoticed and left the old file in place.
    my $rfile = $self->rfile;
    rename $trecentfile, $rfile
        or warn "Warning: Could not rename '$trecentfile' to '$rfile': $!";
    return !@error;
}
794 =head2 (void) $obj->mirror_loop
796 Run mirror in an endless loop. See the accessor C<loopinterval>. XXX
797 What happens/should happen if we miss the interval during a single loop?
799 =cut
sub mirror_loop {
    my($self) = @_;
    my $Signal = 0;
    # SIGINT ends the loop after the current round instead of killing us
    # mid-rsync; the previous handler is restored when we return.
    local $SIG{INT} = sub { $Signal++ };
    my $loopinterval = $self->loopinterval || 42;
    my $after = -999999999;
    LOOP: while () {
        # Timed per round so that every round lasts at least $loopinterval.
        my $iteration_start = time;
        # mirror() takes named options; pass "after" by name.
        $self->mirror(after => $after);
        last LOOP if $Signal;
        my $re = $self->recent_events;
        # Keep the previous high-water mark if the file has no events.
        $after = $re->[0]{epoch} if @$re;
        if ($self->verbose) {
            local $| = 1;
            print "($after)";
        }
        if (time - $iteration_start < $loopinterval) {
            sleep $iteration_start + $loopinterval - time;
        }
        if ($self->verbose) {
            local $| = 1;
            print "~";
        }
    }
}
828 =head2 $success = $obj->mirror_path ( $arrref | $path )
830 If the argument is a scalar it is treated as a path. The remote path
831 is mirrored into the local copy. $path is the path found in the
832 I<recentfile>, i.e. it is relative to the root directory of the
833 mirror.
835 If the argument is an array reference then all elements are treated as
836 a path below the current tree and all are rsynced with a single
837 command (and a single connection).
839 =cut
sub mirror_path {
    my($self,$path) = @_;
    # XXX simplify the two branches such that $path is treated as
    # [$path] maybe even demand the argument as an arrayref to
    # simplify docs and code. (rsync-over-recentfile-2.pl uses the
    # interface)

    # Run one rsync exec() with @exec_args until it succeeds. When
    # ignore_link_stat_errors is set, a link_stat error ends the retries
    # immediately without resetting the error counter.
    my $rsync_until_done = sub {
        my @exec_args = @_;
        until ($self->rsync->exec(@exec_args)) {
            my($err) = $self->rsync->err;
            if ($self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x ) {
                warn "Info: ignoring link_stat error '$err'" if $self->verbose;
                return;
            }
            $self->register_rsync_error ($err);
        }
        $self->un_register_rsync_error ();
        return;
    };

    if (ref $path and ref $path eq "ARRAY") {
        # Batch mode: one rsync call driven by a --files-from list.
        my $dst = $self->local_path();
        mkpath dirname $dst;
        my $list = File::Temp->new
            (
             TEMPLATE => sprintf(".%s-XXXX", lc $self->filenameroot),
             TMPDIR   => 1,
             UNLINK   => 0,
            );
        print $list $_, "\n" for @$path;
        $list->flush;
        $list->unlink_on_destroy(1);
        $rsync_until_done->(
                            src          => join("/", $self->remotebase),
                            dst          => $dst,
                            'files-from' => $list->filename,
                           );
    } else {
        # Single path mode.
        my $dst = $self->local_path($path);
        mkpath dirname $dst;
        $rsync_until_done->(
                            src => join("/", $self->remotebase, $path),
                            dst => $dst,
                           );
    }
    return 1;
}
904 =head2 $path = $obj->naive_path_normalize ($path)
906 Takes an absolute unix style path as argument and canonicalizes it to
907 a shorter path if possible, removing things like double slashes or
908 C</./> and removes references to C<../> directories to get a shorter
unambiguous path. This simplifies the code in C<update()> that
determines whether a file is indeed below our C<localroot>.
912 =cut
sub naive_path_normalize {
    my($self,$path) = @_;
    # collapse runs of slashes
    $path =~ s{/+}{/}g;
    # drop "/." components (the POD promises this; it was never done)
    $path =~ s{/\.(?=/|\z)}{}g;
    # collapse "NAME/.." pairs, including a trailing "/.."; a ".." is
    # never itself taken as the parent that gets cancelled
    1 while $path =~ s{/(?!\.\.(?:/|\z))[^/]+/\.\.(?=/|\z)}{};
    # strip a trailing slash
    $path =~ s{/\z}{};
    return $path;
}
922 =head2 $ret = $obj->read_recent_1 ( $recent_data )
924 Delegate of C<recent_events()> on protocol 1
926 =cut
sub read_recent_1 {
    my($self, $data) = @_;
    # Protocol 1 stores the event list under the "recent" key.
    my $events = $data->{recent};
    return $events;
}
933 =head2 $array_ref = $obj->recent_events
935 Note: the code relies on the resource being written atomically. We
936 cannot lock because we may have no write access. If the caller has
937 write access (eg. aggregate() or update()), it has to care for any
938 necessary locking.
940 =cut
sub recent_events {
    my ($self) = @_;
    my $rfile = $self->rfile;
    my ($data) = eval { YAML::Syck::LoadFile($rfile) };
    # Unreadable, unparsable or empty files count as "no events".
    return [] if $@ or !$data;
    # protocol 0: the file is a bare list of events
    return $data if reftype($data) eq 'ARRAY';
    # later protocols: dispatch to read_recent_<protocol>
    my $reader = sprintf "read_recent_%d", $data->{meta}{protocol};
    return $self->$reader($data);
}
958 =head2 $array_ref = $obj->recent_events_from_tempfile
960 Reads the file-events in the temporary local mirror of the remote file.
962 =cut
sub recent_events_from_tempfile {
    my ($self) = @_;
    # While _use_tempfile is set, rfile() resolves to the tempfile that
    # get_remote_recentfile_as_tempfile() fetched. Reset the flag even if
    # reading dies; otherwise lock() and write_recent() would keep
    # targeting the tempfile.
    $self->_use_tempfile(1);
    my $ret = eval { $self->recent_events };
    my $err = $@;
    $self->_use_tempfile(0);
    die $err if $err;
    return $ret;
}
972 =head2 $ret = $obj->recentfile
974 deprecated, use rfile instead
976 =cut
sub recentfile {
    my($self) = @_;
    require Carp;
    Carp::cluck("deprecated method recentfile called. Please use rfile instead!");
    # Same path rfile() computes, but without caching.
    my $recent = File::Spec->catfile($self->localroot, $self->recentfile_basename);
    return $recent;
}
989 =head2 $ret = $obj->recentfile_basename
991 Just the basename of our I<recentfile>, composed from C<filenameroot>,
992 C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
994 =cut
sub recentfile_basename {
    my($self) = @_;
    # <filenameroot>-<interval><suffix>, e.g. RECENT-6h.yaml
    my $basename = sprintf "%s-%s%s",
        $self->filenameroot, $self->interval, $self->serializer_suffix;
    return $basename;
}
1006 =head2 $str = $obj->remotebase
1008 =head2 (void) $obj->remotebase ( $set )
1010 Get/Set the composed prefix needed when rsyncing from a remote module.
1011 If remote_host, remote_module, and remote_dir are set, it is composed
1012 from these.
1014 =cut
sub remotebase {
    my($self, $set) = @_;
    $self->_remotebase($set) if defined $set;
    my $remotebase = $self->_remotebase;
    return $remotebase if defined $remotebase;
    # Compose "host::module/dir" from whichever parts are configured and
    # cache it for later calls.
    $remotebase = sprintf "%s%s%s",
        (defined $self->remote_host   ? $self->remote_host."::"  : ""),
        (defined $self->remote_module ? $self->remote_module."/" : ""),
        (defined $self->remote_dir    ? $self->remote_dir        : "");
    $self->_remotebase($remotebase);
    return $remotebase;
}
1035 =head2 my $rfile = $obj->rfile
1037 Returns the full path of the I<recentfile>
1039 =cut
sub rfile {
    my($self) = @_;
    # While reading a freshly fetched remote copy, point at that tempfile.
    return $self->_current_tempfile if $self->_use_tempfile;
    my $rfile = $self->_rfile;
    unless (defined $rfile) {
        # compute once and cache; setters of the inputs reset the cache
        $rfile = File::Spec->catfile($self->localroot, $self->recentfile_basename);
        $self->_rfile ($rfile);
    }
    return $rfile;
}
1057 =head2 $rsync_obj = $obj->rsync
1059 The File::Rsync object that this object uses for communicating with an
1060 upstream server.
1062 =cut
sub rsync {
    my($self) = @_;
    my $rsync = $self->_rsync;
    return $rsync if defined $rsync;
    # Lazily build (and cache) the File::Rsync object.
    my $rsync_options = $self->rsync_options || {};
    die "File::Rsync required for rsync operations. Cannot continue"
        unless $HAVE->{"File::Rsync"};
    $rsync = File::Rsync->new($rsync_options);
    $self->_rsync($rsync);
    return $rsync;
}
1079 =head2 (void) $obj->register_rsync_error($err)
1081 =head2 (void) $obj->un_register_rsync_error()
1083 Register_rsync_error is called whenever the File::Rsync object fails
1084 on an exec (say, connection doesn't succeed). It issues a warning and
1085 sleeps for an increasing amount of time. Un_register_rsync_error
1086 resets the error count. See also accessor C<max_rsync_errors>.
1088 =cut
{
    # Failure bookkeeping shared by all objects: number of consecutive
    # failed rsync calls and the time of the latest one.
    my $no_success_count = 0;
    my $no_success_time = 0;

    sub register_rsync_error {
        my($self, $err) = @_;
        chomp $err;
        $no_success_time = time;
        $no_success_count++;
        my $max_rsync_errors = $self->max_rsync_errors;
        $max_rsync_errors = -1 unless defined $max_rsync_errors;
        # A negative limit means: never give up.
        if ($max_rsync_errors >= 0 && $no_success_count >= $max_rsync_errors) {
            die sprintf("Alert: Error while rsyncing: '%s', error count: %d, exiting now,",
                        $err, $no_success_count);
        }
        # Back off 12 seconds per consecutive failure, capped at 120.
        my $sleep = 12 * $no_success_count;
        $sleep = 120 if $sleep > 120;
        warn sprintf("Warning: %s, Error while rsyncing: '%s', sleeping %d",
                     scalar(localtime($no_success_time)), $err, $sleep);
        sleep $sleep
    }

    sub un_register_rsync_error {
        my($self) = @_;
        $no_success_time = 0;
        $no_success_count = 0;
    }
}
1126 =head2 (void) $obj->unlock()
1128 Unlocking is implemented with an C<rmdir> on a locking directory
1129 (C<.lock> appended to $rfile).
1131 =cut
sub unlock {
    my($self) = @_;
    # Only the holder of the lock removes the lock directory.
    return unless $self->_is_locked;
    my $lockdir = $self->rfile . ".lock";
    rmdir $lockdir;
    $self->_is_locked (0);
}
1141 =head2 $ret = $obj->update ($path, $type)
Enter one file into the local I<recentfile>. $path is the (usually
absolute) path. If the path is outside I<our> tree, then it is
ignored.
1147 $type is one of C<new> or C<delete>.
1149 =cut
sub update {
    my($self,$path,$type) = @_;
    die "update called without path argument" unless defined $path;
    die "update called without type argument" unless defined $type;
    # Anchored: an unanchored match also accepted e.g. "renew".
    die "update called with illegal type argument: $type"
        unless $type =~ /\A(?:new|delete)\z/;
    my $canonmeth = $self->canonize || "naive_path_normalize";
    $path = $self->$canonmeth($path);
    # Strip localroot (with or without a trailing slash), but only at a
    # directory boundary, so that "/x/root-old/f" is not mistaken for a
    # file below "/x/root". Paths outside our tree are ignored.
    my $lrd = $self->localroot;
    $lrd =~ s|/+\z||;
    if ($path =~ s{^\Q$lrd\E(?:/|\z)}{}) {
        $path =~ s|^/||;
        my $secs = $self->interval_secs(); # also validates our interval
        my $epoch = Time::HiRes::time;
        # XXX next lines copy&paste from merge()
        my $oldest_allowed = 0;
        if (my $merged = $self->merged) {
            $oldest_allowed = min($epoch - $secs, $merged->{epoch});
        }

        $self->lock;
        my $recent = $self->recent_events || [];
        # drop events that fell out of our window (oldest are at the end)
        pop @$recent while @$recent && $recent->[-1]{epoch} < $oldest_allowed;
        # remove older duplicates of this $path, irrespective of $type:
        $recent = [ grep { $_->{path} ne $path } @$recent ];

        unshift @$recent, { epoch => $epoch, path => $path, type => $type };
        $self->write_recent($recent);
        $self->_assert_symlink;
        $self->unlock;
    }
}
1194 =head2 $obj->write_recent ($recent_files_arrayref)
Writes a I<recentfile> reflecting the current state of the tree,
limited by the current interval.
1199 =cut
sub write_recent {
    my ($self,$recent) = @_;
    die "write_recent called without argument" unless defined $recent;
    # Dispatch on the file format version: write_0, write_1, ...
    my $writer = sprintf "write_%d", $self->protocol;
    return $self->$writer($recent);
}
1208 =head2 $obj->write_0 ($recent_files_arrayref)
1210 Delegate of C<write_recent()> on protocol 0
1212 =cut
sub write_0 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    my $staging = "$rfile.new";
    # Protocol 0: the file is the bare event list. Writing beside the
    # target and renaming keeps readers from seeing a partial file.
    YAML::Syck::DumpFile($staging, $recent);
    rename $staging, $rfile or die "Could not rename to '$rfile': $!";
}
1221 =head2 $obj->write_1 ($recent_files_arrayref)
1223 Delegate of C<write_recent()> on protocol 1
1225 =cut
sub write_1 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    my $staging = "$rfile.new";
    # Protocol 1: a hash with "meta" and "recent" parts, written beside the
    # target and renamed into place atomically.
    my $document = {
                    meta   => $self->meta_data,
                    recent => $recent,
                   };
    YAML::Syck::DumpFile($staging, $document);
    rename $staging, $rfile or die "Could not rename to '$rfile': $!";
}
1237 BEGIN {
1238 my @pod_lines =
1239 split /\n/, <<'=cut'; %serializers = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
1241 =head1 THE ARCHITECTURE OF A COLLECTION OF RECENTFILES
The idea is that we want to have a short file that records really
recent changes, so that a fresh mirror can be kept fresh as long as
connectivity is available. Then we want longer files that record the
history before that. So when the mirror falls behind the update period
reflected in the shortest file, it can switch to the next one. And if
this is not long enough, we want another one, again a bit longer. And
we want one that completes the history back to the oldest file. For
practical reasons the timespans of these files must overlap a bit, and
to keep bandwidth requirements low they must not be
updated too frequently. That's the basic idea. The following
example represents a tree that has a few updates every day:
1255 RECENT-1h.yaml
1256 RECENT-6h.yaml
1257 RECENT-1d.yaml
    RECENT-1W.yaml
    RECENT-1M.yaml
1260 RECENT-1Q.yaml
1261 RECENT-1Y.yaml
1262 RECENT-Z.yaml
The last file, the Z file, contains the complementary files that are
in none of the other files. It never contains C<delete>s. Besides
this, it serves as a recovery mechanism or spill-over pond.
When things go wrong, it's a valuable controlling instance to hold the
differences between the collection of limited interval files and the
actual filesystem.
1271 =head2 A SINGLE RECENTFILE
1273 A I<recentfile> consists of a hash that has two keys: C<meta> and
1274 C<recent>. The C<meta> part has metadata and the C<recent> part has a
1275 list of fileobjects.
1277 =head2 THE META PART
Here we find things that are pretty much self-explanatory: all
lowercase attributes are accessors and as such are explained somewhere
above in this manpage. The uppercase attribute C<Producers> contains
version information about the software components involved. Nothing to
worry about, I believe.
1285 =head2 THE RECENT PART
This is the interesting part. Every entry refers to some filesystem
change (with path, epoch, type). The epoch value is the point in time
when some change was I<registered>. Do not be tempted to believe that
the entry has a direct relation to something like modification time or
change time on the filesystem level. The timestamp (I<epoch> element)
is a floating point number and practically never corresponds
exactly to the data recorded in the filesystem but rather to the time
when some process succeeded in reporting to the I<recentfile> mechanism
that something has changed. This is why many parts of the code refer
to I<events>, because we merely try to record the I<event> of the
discovery of a change, not the time of the change itself.
All these entries can be divided into two types (denoted by the
C<type> attribute): C<new>s and C<delete>s. Changes and creations are
C<new>s. Deletes are C<delete>s.
Another distinction is between objects with an epoch timestamp and
those without. All files that already existed on the filesystem before
the I<recentfile> mechanism was installed get recorded with a
timestamp of zero.
1308 Besides an C<epoch> and a C<type> attribute we find a third one:
1309 C<path>. This path is relative to the directory we find the
1310 I<recentfile> in.
1312 The order of the entries in the I<recentfile> is by decreasing epoch
1313 attribute. These are either 0 or a unique floating point number. They
1314 are zero for events that were happening either before the time that
1315 the I<recentfile> mechanism was set up or were left undiscovered for a
1316 while and never handed over to update(). They are floating point
1317 numbers for all events being regularly handed to update(). And when
1318 the server has ntp running correctly, then the timestamps are
1319 actually decreasing and unique.
1321 =head1 CORRUPTION AND RECOVERY
1323 If the origin host breaks the promise to deliver consistent and
1324 complete I<recentfiles> then the way back to sanity shall be achieved
1325 through either the C<zloop> (still TBD) or traditional rsyncing
1326 between the hosts. For example, if the origin server forgets to deploy
1327 ntp and the clock on it jumps backwards some day, then this would
1328 probably go unnoticed for a while and many software components that
1329 rely on the time never running backwards will make wrong decisions.
1330 After some time this accident would probably still be found in one of
1331 the I<recentfiles> but would become meaningless as soon as a mirror
1332 has run through the sanitizing procedures. Same goes for origin hosts
1333 that forget to include or deliberately omit some files.
1335 =head1 SERIALIZERS
1337 The following suffixes are supported and trigger the use of these
1338 serializers:
1340 =over 4
1342 =item C<< ".yaml" => "YAML::Syck" >>
1344 =item C<< ".json" => "JSON" >>
1346 =item C<< ".sto" => "Storable" >>
1348 =item C<< ".dd" => "Data::Dumper" >>
1350 =back
1352 =cut
BEGIN {
    # Fill %seconds (interval letter => number of seconds) at compile time
    # from the "=item C<< ... >>" lines of the INTERVAL SPEC POD section
    # that follows; the heredoc runs up to the next "=cut" and each
    # matching payload (e.g. h => 60*60) is eval'ed into a key/value pair.
    # Everything after <<'=cut' has to stay on this one line because the
    # heredoc body starts on the next.
    my @doc_lines = split /\n/, <<'=cut'; my @items = grep { s/^=item\s+C<<(.+)>>$/$1/ } @doc_lines; %seconds = map { eval } @items; }
1358 =head1 INTERVAL SPEC
1360 An interval spec is a primitive way to express time spans. Normally it
1361 is composed from an integer and a letter.
As a special case, a string that consists only of the single letter
C<Z> stands for unlimited time.
1366 The following letters express the specified number of seconds:
1368 =over 4
1370 =item C<< s => 1 >>
1372 =item C<< m => 60 >>
1374 =item C<< h => 60*60 >>
1376 =item C<< d => 60*60*24 >>
1378 =item C<< W => 60*60*24*7 >>
1380 =item C<< M => 60*60*24*30 >>
1382 =item C<< Q => 60*60*24*90 >>
1384 =item C<< Y => 60*60*24*365.25 >>
1386 =back
1388 =cut
1390 =head1 BACKGROUND
This is about speeding up rsync operations on large trees replicated to
many places. It uses a small metadata cocktail and pull technology.
1395 =head2 NON-COMPETITORS
    File::Mirror         JWU/File-Mirror/File-Mirror-0.10.tar.gz  only local trees
    Mirror::YAML         ADAMK/Mirror-YAML-0.03.tar.gz            some sort of inner circle
    Net::DownloadMirror  KNORR/Net-DownloadMirror-0.04.tar.gz     FTP sites and stuff
    Net::MirrorDir       KNORR/Net-MirrorDir-0.05.tar.gz          ditto
    Net::UploadMirror    KNORR/Net-UploadMirror-0.06.tar.gz       ditto
    Pushmi::Mirror       CLKAO/Pushmi-v1.0.0.tar.gz               something SVK
1404 =head2 COMPETITORS
The problem shared by clusters, FTP mirrors, and other replicated
datasets like CPAN is how to transfer only a minimum amount of data to
determine the diff between two hosts.
Normally it takes a long time to determine the diff itself before it
can be transferred. Known solutions at the time of this writing are
csync2 and rsync 3 batch mode.
For many years the best solution was csync2, which solves the
problem by maintaining an SQLite database on both ends and talking a
highly sophisticated protocol to quickly determine which files to send
and which to delete at any given point in time. Csync2 is often
inconvenient because the act of syncing demands quite an intimate
relationship between the sender and the receiver and suffers when the
number of syncing sites is large or connections are unreliable.
Rsync 3 batch mode works around these problems by providing rsync-able
batch files which allow receiving nodes to replay the history of the
other nodes. This reduces the need for such an intimate relationship,
but it has the disadvantage that these batch files replicate the
contents of the involved files. This seems inappropriate when the nodes
already have a means of communicating over rsync.
1429 rersyncrecent solves this problem with a couple of (usually 2-10)
1430 index files which cover different overlapping time intervals. The
1431 master writes these files and the clients can construct the full tree
1432 from the information contained in them. The most recent index file
1433 usually covers the last seconds or minutes or hours of the tree and
1434 depending on the needs, slaves can rsync every few seconds and then
1435 bring their trees in full sync.
The rersyncrecent mode was developed for CPAN but I hope it is a
convenient and economical general-purpose solution. I'm looking forward
to seeing a CPAN backbone that is only a few seconds behind PAUSE. And
then ... the first FUSE-based CPAN filesystem, anyone?
1442 =head1 AUTHOR
1444 Andreas König
1446 =head1 BUGS
Please report any bugs or feature requests through the web interface:
1450 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
1451 I will be notified, and then you'll automatically be notified of
1452 progress on your bug as I make changes.
1454 =head1 SUPPORT
1456 You can find documentation for this module with the perldoc command.
1458 perldoc File::Rsync::Mirror::Recentfile
1460 You can also look for information at:
1462 =over 4
1464 =item * RT: CPAN's request tracker
1466 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
1468 =item * AnnoCPAN: Annotated CPAN documentation
1470 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
1472 =item * CPAN Ratings
1474 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
1476 =item * Search CPAN
1478 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
1480 =back
1483 =head1 ACKNOWLEDGEMENTS
1485 Thanks to RJBS for module-starter.
1487 =head1 COPYRIGHT & LICENSE
1489 Copyright 2008 Andreas König, all rights reserved.
1491 This program is free software; you can redistribute it and/or modify it
1492 under the same terms as Perl itself.
1495 =cut
1497 1; # End of File::Rsync::Mirror::Recentfile
1499 # Local Variables:
1500 # mode: cperl
1501 # cperl-indent-level: 4
1502 # End: