fix bug that calculated the timespan of recentfiles wrong (too long); add a test...
[rersyncrecent.git] / lib / File / Rsync / Mirror / Recentfile.pm
blob8613409260db4fc7af5373dabb48b07154460d1c
1 package File::Rsync::Mirror::Recentfile;
3 # use warnings;
4 use strict;
6 =encoding utf-8
8 =head1 NAME
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
12 =head1 VERSION
14 Version 0.0.1
16 =cut
18 package File::Rsync::Mirror::Recentfile;
20 use Data::Serializer;
21 use File::Basename qw(dirname fileparse);
22 use File::Copy qw(cp);
23 use File::Path qw(mkpath);
24 use File::Rsync;
25 use File::Temp;
26 use List::Util qw(first);
27 use Scalar::Util qw(reftype);
28 use Storable;
29 use Time::HiRes qw();
30 use YAML::Syck;
32 use version; our $VERSION = qv('0.0.1');
35 use constant MAX_INT => ~0>>1; # anything better?
37 # cf. interval_secs
38 my %seconds;
40 # maybe subclass if this mapping is bad?
41 my %serializers;
43 =head1 SYNOPSIS
45 B<!!!! PRE-ALPHA ALERT !!!!>
47 Nothing in here is believed to be stable, nothing yet intended for
48 public consumption. The plan is to provide a script in one of the next
49 releases that acts as a frontend for all the backend functionality.
50 Option and method names will very likely change.
52 For the rationale see the section BACKGROUND.
54 This is published only for developers of the (yet to be named)
55 script(s).
57 Writer (of a single file):
59 use File::Rsync::Mirror::Recentfile;
60 my $rf = File::Rsync::Mirror::Recentfile->new
62 interval => q(6h),
63 filenameroot => "RECENT",
64 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
65 localroot => "/home/ftp/pub/PAUSE/authors/",
66 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
68 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
70 Reader/mirrorer:
72 my $rf = File::Rsync::Mirror::Recentfile->new
74 filenameroot => "RECENT",
75 ignore_link_stat_errors => 1,
76 interval => q(6h),
77 localroot => "/home/ftp/pub/PAUSE/authors",
78 remote_dir => "",
79 remote_host => "pause.perl.org",
80 remote_module => "authors",
81 rsync_options => {
82 compress => 1,
83 'rsync-path' => '/usr/bin/rsync',
84 links => 1,
85 times => 1,
86 'omit-dir-times' => 1,
87 checksum => 1,
89 verbose => 1,
91 $rf->mirror;
93 Aggregator (usually the writer):
95 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
96 $rf->aggregate;
98 =head1 EXPORT
100 No exports.
102 =head1 CONSTRUCTORS
104 =head2 my $obj = CLASS->new(%hash)
106 Constructor. On every argument pair the key is a method name and the
107 value is an argument to that method name.
109 =cut
# Constructor: CLASS->new(%hash).
# Each key of the argument list is used as a method name and is called with
# the corresponding value. Afterwards protocol, filenameroot and
# serializer_suffix receive their defaults if they are still undefined.
sub new {
    my $class  = shift;
    my @pairs  = @_;
    my $object = bless {}, $class;

    # Apply the caller's settings in the order they were given.
    while (my ($setter, $value) = splice @pairs, 0, 2) {
        $object->$setter($value);
    }

    # Fill in whatever the caller left undefined.
    my @defaults = (
        protocol          => 1,
        filenameroot      => "RECENT",
        serializer_suffix => ".yaml",
    );
    while (my ($attr, $default) = splice @defaults, 0, 2) {
        $object->$attr($default) unless defined $object->$attr();
    }

    return $object;
}
130 =head2 my $obj = CLASS->new_from_file($file)
132 Constructor. $file is a I<recentfile>.
134 =cut
# Constructor: CLASS->new_from_file($file).
# Reads the recentfile $file, picks the serializer from the file's suffix
# (which must be one of the keys of %serializers) and initialises the object
# from the "meta" section of the deserialized data. Only all-lowercase meta
# keys are treated as accessor names. Dies if the file cannot be opened or if
# the suffix is not a known one.
sub new_from_file {
    my($class, $file) = @_;
    my $self = bless {}, $class;
    $self->_rfile($file);
    #?# $self->lock;
    my $serialized = do {
        # three-argument open: the filename can never be taken for a mode
        open my $fh, "<", $file or die "Could not open '$file': $!";
        local $/;
        <$fh>;
    };
    # fileparse() treats its suffix arguments as regular expressions, so the
    # literal suffixes must be quoted (otherwise ".yaml" also matches "xyaml").
    my($name,$path,$suffix) = fileparse($file, map { quotemeta } keys %serializers);
    die "Could not determine file format from suffix" unless $suffix;
    $self->serializer_suffix($suffix);
    $self->localroot($path);
    my $serializer = Data::Serializer->new
        (
         serializer => $serializers{$suffix},
         secret => undef,
         compress => 0,
         digest => 0,
         portable => 0,
         encoding => "raw",
        );
    my $deserialized = $serializer->deserialize($serialized);
    while (my($k,$v) = each %{$deserialized->{meta}}) {
        next if $k ne lc $k; # "Producers"
        $self->$k($v);
    }
    unless (defined $self->protocol) {
        $self->protocol(1);
    }
    return $self;
}
169 =head1 ACCESSORS
171 =cut
# Names of all accessor methods that are handed to "use accessors" after the
# POD below. The underscore-prefixed ones are listed literally; the others are
# harvested from the "=item" lines of the POD that follows.
173 my @accessors;
175 BEGIN {
# Accessors with a leading underscore, listed by hand.
176 @accessors = (
177 "_current_tempfile",
178 "_interval",
179 "_is_locked",
180 "_localroot",
181 "_remotebase",
182 "_rfile",
183 "_rsync",
184 "_use_tempfile",
# The here-document started below swallows everything up to the next "=cut"
# line; each "=item NAME" line in it contributes NAME to @accessors (the grep
# strips the "=item " prefix in place and keeps only the lines it matched).
187 my @pod_lines =
188 split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
190 =over 4
192 =item aggregator
194 A list of interval specs that tell the aggregator which I<recentfile>s
195 are to be produced.
197 =item canonize
199 The name of a method to canonize the path before rsyncing. Only
200 supported value is C<naive_path_normalize>. Defaults to that.
202 =item comment
204 A comment about this tree and setup.
206 =item filenameroot
208 The (prefix of the) filename we use for this I<recentfile>. Defaults to
209 C<RECENT>.
211 =item ignore_link_stat_errors
213 If set to true, rsync errors are ignored that complain about link stat
214 errors. These seem to happen only when there are files missing at the
215 origin. In race conditions this can always happen, so it is
216 recommended to set this value to true.
218 =item locktimeout
220 After how many seconds shall we die if we cannot lock a I<recentfile>?
221 Defaults to 600 seconds.
223 =item loopinterval
225 When mirror_loop is called, this accessor can specify how much time
226 every loop shall at least take. If the work of a loop is done before
227 that time has gone, sleeps for the rest of the time. Defaults to
228 arbitrary 42 seconds.
230 =item max_files_per_connection
232 Maximum number of files that are transferred on a single rsync call.
233 Setting it higher means higher performance at the price of holding
234 connections longer and potentially disturbing other users in the pool.
235 Defaults to the arbitrary value 42.
237 =item protocol
239 When the RECENT file format changes, we increment the protocol. We try
240 to support older protocols in later releases.
242 =item remote_dir
244 The directory we are mirroring from.
246 =item remote_host
248 The host we are mirroring from. Leave empty for the local filesystem.
250 =item remote_module
252 Rsync servers have so called modules to separate directory trees from
253 each other. Put here the name of the module under which we are
254 mirroring. Leave empty for local filesystem.
256 =item rsync_options
258 Things like compress, links, times or checksums. Passed in to the
259 File::Rsync object used to run the mirror.
261 =item serializer_suffix
263 Untested accessor. The only tested format for I<recentfile>s at the
264 moment is YAML. It is used with YAML::Syck via Data::Serializer. But
265 in principle other formats are supported as well. See section
266 SERIALIZERS below.
268 =item sleep_per_connection
270 Sleep that many seconds (floating point OK) after every chunk of rsyncing
271 has finished. Defaults to arbitrary 0.42.
273 =item verbose
275 Boolean to turn on a bit of verbosity.
277 =back
279 =cut
281 use accessors @accessors;
283 =head1 METHODS
285 =head2 (void) $obj->aggregate
287 Takes all intervals that are collected in the accessor called
288 aggregator. Sorts them numerically by actual length of the interval.
289 Removes those that are shorter than our own interval. Then merges this
290 object into the next larger object. The merging continues upwards
291 as long as the next I<recentfile> is old enough to warrant a merge.
293 Whether a merge is warranted is decided according to the length of the
294 previous interval, so that larger files are not updated as often as
295 smaller ones.
297 Here is an example to illustrate the behaviour. Given aggregators
299 1h 1d 1W 1M 1Q 1Y Z
301 then
303 1h updates 1d on every call to aggregate()
304 1d updates 1W earliest after 1h
305 1W updates 1M earliest after 1d
306 1M updates 1Q earliest after 1W
307 1Q updates 1Y earliest after 1M
308 1Y updates Z earliest after 1Q
310 =cut
# (void) $obj->aggregate
# Builds the list of intervals (our own plus those in the aggregator
# accessor) that are at least as long as our own, sorted by length, and then
# merges each one into a clone configured for the next longer interval. The
# first merge always happens; a later one happens only if the target file is
# missing or is older than the length of the interval two steps below it.
# The first step that is not warranted ends the run.
sub aggregate {
    my $self = shift;

    my $own_secs = $self->interval_secs;
    my @ladder;
    for my $spec ($self->interval, @{ $self->aggregator || [] }) {
        my $secs = $self->interval_secs($spec);
        next if $secs < $own_secs;
        push @ladder, { interval => $spec, secs => $secs };
    }
    @ladder = sort { $a->{secs} <=> $b->{secs} } @ladder;
    $ladder[0]{object} = $self;

  STEP: for my $idx (0 .. $#ladder - 1) {
        my $smaller = $ladder[$idx]{object};
        my $larger  = Storable::dclone($smaller);
        $larger->interval($ladder[$idx + 1]{interval});

        if ($idx > 0) {
            my $larger_file = $larger->rfile;
            if (-e $larger_file) {
                # -M is relative to $^T, so pin that to "now" for the test
                local $^T = time;
                my $age_secs = 86400 * -M $larger_file;
                last STEP
                    unless $age_secs > $ladder[$idx - 1]{object}->interval_secs;
            }
        }

        $larger->merge($smaller);
        $ladder[$idx + 1]{object} = $larger;
    }
}
# $report = $obj->_debug_aggregate
# Debugging aid: for our own interval and every interval in the aggregator
# accessor (sorted by length) returns [ $rfile, $size, $mtime ] as taken from
# stat(); a false stat value is reported as the string "undef".
sub _debug_aggregate {
    my $self = shift;

    my @specs  = ($self->interval, @{ $self->aggregator || [] });
    my @ladder = sort { $a->{secs} <=> $b->{secs} }
                 map  { +{ interval => $_, secs => $self->interval_secs($_) } }
                 @specs;

    my @report;
    for my $rung (@ladder) {
        my $clone = Storable::dclone($self);
        $clone->interval($rung->{interval});
        my $rfile = $clone->rfile;
        my @stat  = stat $rfile;
        # stat fields 7 and 9 are size and mtime
        push @report, [ $rfile, map { $stat[$_] || "undef" } 7, 9 ];
    }
    return \@report;
}
# (void) $self->_assert_symlink()
# Makes sure that "<filenameroot>.recent" in localroot is a symlink pointing
# at our recentfile_basename. A missing link is created directly; a link with
# a different target is replaced by creating "<link>.<pid>" and renaming it
# over the old one. Dies if symlink or rename fails.
sub _assert_symlink {
    my $self   = shift;
    my $target = $self->recentfile_basename;
    my $link   = File::Spec->catfile($self->localroot,
                                     $self->filenameroot . ".recent");

    unless (-l $link) {
        # nothing there yet (or not a symlink): plain creation
        symlink $target, $link or die "Could not create symlink '$link': $!";
        return 1;
    }

    # already pointing at the right file: nothing to do
    return if readlink($link) eq $target;

    # wrong target: build the new link under a temporary name, then rename
    my $scratch = "$link.$$";
    unlink $scratch; # may fail
    symlink $target, $scratch or die "Could not create symlink '$scratch': $!";
    rename $scratch, $link or die "Could not rename '$scratch' to $link: $!";
}
395 =head2 $success = $obj->full_mirror
397 (TBD) Mirrors the whole remote site, starting with the smallest I<recentfile>,
398 switching to larger ones ...
400 =cut
# $success = $obj->full_mirror
# Placeholder: only emits a warning that the method is not implemented yet.
sub full_mirror {
    my $self = shift;
    warn "Not yet implemented";
}
407 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile
409 Stores the remote I<recentfile> locally as a tempfile
411 =cut
# $tempfilename = $obj->get_remote_recentfile_as_tempfile
# Creates a tempfile in localroot, seeds it with the current local recentfile
# if there is one, then rsyncs the remote recentfile onto it, retrying (with
# register_rsync_error in between) until rsync succeeds. The tempfile is made
# mode 0644, remembered in _current_tempfile and its name is returned.
sub get_remote_recentfile_as_tempfile {
    my $self     = shift;
    my $root     = $self->localroot;
    my $basename = $self->recentfile_basename;

    mkpath $root;
    my($fh) = File::Temp->new(
        TEMPLATE => sprintf(".%s-XXXX", $basename),
        DIR      => $root,
        SUFFIX   => $self->serializer_suffix,
        UNLINK   => 0,
    );
    my($trecentfile) = $fh->filename;

    my $rfile = $self->rfile;
    if (-e $rfile) {
        # saving on bandwidth. Might need to be configurable
        # $self->bandwidth_is_cheap?
        cp($rfile, $trecentfile)
            or die "Could not copy '$rfile' to '$trecentfile': $!";
    }

    my $src = join "/", $self->remotebase, $basename;
    until ($self->rsync->exec(src => $src, dst => $trecentfile)) {
        $self->register_rsync_error($self->rsync->err);
    }
    $self->un_register_rsync_error();

    my $mode = 0644;
    chmod $mode, $trecentfile or die "Could not chmod $mode '$trecentfile': $!";
    $self->_current_tempfile($trecentfile);
    return $trecentfile;
}
445 =head2 $obj->interval ( $interval_spec )
447 Get/set accessor. $interval_spec is a string and described below in
448 the section INTERVAL SPEC.
450 =cut
# $obj->interval ( $interval_spec )
# Get/set accessor. Setting also clears the cached _rfile. Dies when no
# interval is set at the time of reading.
sub interval {
    my $self = shift;
    if (@_) {
        $self->_interval($_[0]);
        $self->_rfile(undef);
    }
    my $current = $self->_interval;
    return $current if defined $current;
    # do not ask the $self too much, it recurses!
    die "Alert: interval undefined for '".$self."'. Cannot continue.";
}
466 =head2 $secs = $obj->interval_secs ( $interval_spec )
468 $interval_spec is described below in the section INTERVAL SPEC. If
469 empty defaults to the inherent interval for this object.
471 =cut
# $secs = $obj->interval_secs ( $interval_spec )
# Converts an interval spec (optional count followed by one unit letter) into
# seconds, using the file-level %seconds table. A false or missing argument
# means our own interval. The spec "Z" yields MAX_INT. Dies on anything that
# cannot be parsed or has no count.
sub interval_secs {
    my ($self, $interval) = @_;
    $interval = $self->interval unless $interval;
    die "interval_secs() called without argument on an object without a declared one"
        unless defined $interval;

    my ($count, $unit) = $interval =~ /^(\d*)([smhdWMQYZ]$)/
        or die "Could not determine seconds from interval[$interval]";

    return MAX_INT if $interval eq "Z";
    return $seconds{$unit}*$count
        if exists $seconds{$unit} and $count =~ /^\d+$/;
    die "Invalid interval specification: n[$count]t[$unit]";
}
490 =head2 $obj->localroot ( $localroot )
492 Get/set accessor. The local root of the tree.
494 =cut
# $obj->localroot ( $localroot )
# Get/set accessor for the local root of the tree. Setting also clears the
# cached _rfile. Returns the current value.
sub localroot {
    my $self = shift;
    if (@_) {
        $self->_localroot($_[0]);
        $self->_rfile(undef);
    }
    return $self->_localroot;
}
505 =head2 $ret = $obj->local_event_path
507 Misnomer, deprecated. Use local_path instead
509 =cut
# $ret = $obj->local_event_path($path)
# Deprecated: complains via Carp::cluck, then joins localroot with the
# slash-separated components of $path.
sub local_event_path {
    my $self = shift;
    my $path = shift;
    require Carp;
    Carp::cluck("Deprecated method local_event_path called. Please use local_path instead");
    # rsync paths are always slash-separated
    return File::Spec->catfile($self->localroot, split(m|/|, $path));
}
519 =head2 $ret = $obj->local_path($path_found_in_recentfile)
521 Combines the path to our local mirror and the path of an object found
522 in this I<recentfile>. In other words: the target of a mirror operation.
524 =cut
# $ret = $obj->local_path($path_found_in_recentfile)
# Without a defined $path returns localroot itself; otherwise joins localroot
# with the slash-separated components of $path.
sub local_path {
    my $self = shift;
    my $path = shift;
    return $self->localroot unless defined $path;
    # rsync paths are always slash-separated
    return File::Spec->catfile($self->localroot, split(m|/|, $path));
}
535 =head2 (void) $obj->lock
537 Locking is implemented with an C<mkdir> on a locking directory
538 (C<.lock> appended to $rfile).
540 =cut
# (void) $obj->lock
# Acquires the lock by creating the directory "$rfile.lock". Returns at once
# if this object already holds the lock. While the directory exists (somebody
# else holds the lock) it polls every 0.01 seconds and dies after locktimeout
# seconds (default 600). Any other mkdir failure (missing parent directory,
# no permission, ...) cannot clear by waiting, so it dies immediately.
sub lock {
    my ($self) = @_;
    # not using flock because it locks on filehandles instead of
    # old school resources.
    return if $self->_is_locked;
    my $lockdir = $self->rfile . ".lock";
    # XXX need a way to allow breaking the lock
    my $start = time;
    my $locktimeout = $self->locktimeout || 600;
    until (mkdir $lockdir) {
        my $reason = "$!";      # remember it before anything can clobber errno
        die "Could not acquire lockdirectory '$lockdir': $reason"
            unless $!{EEXIST};
        if (time - $start > $locktimeout) {
            die "Could not acquire lockdirectory '$lockdir': $reason";
        }
        Time::HiRes::sleep(0.01);
    }
    $self->_is_locked (1);
}
560 =head2 $ret = $obj->merge ($other)
562 Bulk update of this object with another one. It's intended (but not
563 enforced) to only merge smaller and younger $other objects into the
564 current one. If this object is a C<Z> file, then we do not merge in
565 objects of type C<delete>. But if we encounter an object of type
566 C<delete> we remove the corresponding C<new> object.
568 =cut
# $ret = $obj->merge ($other)
# Bulk update of this object's recentfile with the events of $other.
# Events of $other are processed oldest first. For each one, entries at the
# tail of our list that are older than (event epoch - our interval length)
# are dropped, any entry with the same (canonized) path is removed, and the
# event is put at the front. If we are a "Z" file, "delete" events are not
# stored. The result is written with write_recent.
# The lock is released even when something dies in between; the exception is
# then re-thrown. Returns whatever unlock returns.
sub merge {
    my($self,$other) = @_;
    my $canonmeth = $self->canonize || "naive_path_normalize";
    my $other_recent_events = $other->recent_events || [];
    $self->lock;
    my $done = eval {
        my $secs   = $self->interval_secs();
        my $is_z   = $self->interval eq "Z";
        my $recent = $self->recent_events || [];
        # reverse combined with unshift smells. This can be done by
        # starting with hashifying both lists, concatenation, and removing
        # the duplicates. Need to write better tests to make sure I get it
        # right
        for my $ev (reverse @$other_recent_events) {
            my $path = $self->$canonmeth($ev->{path});
            my $oldest_allowed = $ev->{epoch} - $secs;
            # smells of inefficiency
            while (@$recent && $recent->[-1]{epoch} < $oldest_allowed) {
                pop @$recent;
            }
            # more smells:
            $recent = [ grep { $_->{path} ne $path } @$recent ];
            # a Z file has no deletes, only living objects
            next if $is_z and $ev->{type} eq "delete";
            unshift @$recent, { epoch => $ev->{epoch}, path => $path, type => $ev->{type} };
        }
        # sort?
        $self->write_recent($recent);
        1;
    };
    my $err = $@;
    my $ret = $self->unlock;    # never leave the lock directory behind
    die $err unless $done;
    return $ret;
}
614 =head2 $hashref = $obj->meta_data
616 Returns the hashref of metadata that the server has to add to the
617 I<recentfile>.
619 =cut
# $hashref = $obj->meta_data
# Returns the metadata hash for the recentfile: starts from $self->{meta},
# overwrites the listed accessor values with their current state and adds a
# "Producers" entry (package name => version string) unless one is present.
sub meta_data {
    my $self = shift;
    my $meta = $self->{meta};
    for my $field (qw(aggregator canonize comment filenameroot interval protocol)) {
        $meta->{$field} = $self->$field();
    }
    # XXX need to reset the Producer if I am a writer, keep it when I
    # am a reader
    unless ($meta->{Producers}) {
        # stringified it looks better
        $meta->{Producers} = { __PACKAGE__, "$VERSION" };
    }
    return $meta;
}
642 =head2 $success = $obj->mirror ( %options )
644 Mirrors the files in this I<recentfile>. If $options{after} is
645 specified, only file events after this timestamp are being mirrored.
647 =cut
# $success = $obj->mirror ( %options )
# Fetches the remote recentfile into a tempfile and replays its events
# (newest first): "new" paths are fetched via mirror_path, either one at a
# time (max_files_per_connection == 1) or collected into batches of
# max_files_per_connection (default 42) with a pause of sleep_per_connection
# (default 0.42s) after each full batch; "delete" paths are unlinked (or
# rmdir'ed when they are real directories). With $options{after} only events
# newer than that epoch are replayed. Finally the tempfile is renamed over
# our rfile. Returns true if no error was recorded, including a failed
# rename.
sub mirror {
    my($self, %options) = @_;
    my $trecentfile = $self->get_remote_recentfile_as_tempfile();
    my ($recent_data) = $self->recent_events_from_tempfile();
    $recent_data ||= [];
    my @error;
    my @collector;
    my $last_item = $#$recent_data;
    # the @$recent_data guard keeps ->[0] from autovivifying an empty list
    if (defined $options{after} and @$recent_data) {
        if ($recent_data->[0]{epoch} > $options{after}) {
            # index of the first event that is not newer than "after";
            # it cannot be 0 because element 0 is known to be newer
            my $f = first
                {$recent_data->[$_]{epoch} <= $options{after}}
                    0..$#$recent_data;
            $last_item = $f-1 if $f;
        } else {
            $last_item = -1;
        }
    }
  ITEM: for my $i (0..$last_item) {
        my $recent_event = $recent_data->[$i];
        my $dst = $self->local_path($recent_event->{path});
        if ($recent_event->{type} eq "new"){
            if ($self->verbose) {
                my $doing = -e $dst ? "Syncing" : "Getting";
                printf STDERR
                    (
                     "%s (%d/%d) %s ... ",
                     $doing,
                     1+$i,
                     1+$last_item,
                     $recent_event->{path},
                    );
            }
            my $max_files_per_connection = $self->max_files_per_connection || 42;
            my $success;
            if ($max_files_per_connection == 1) {
                # old code path may go away when the collector has
                # proved useful...
                $success = eval { $self->mirror_path($recent_event->{path}) };
            } else {
                if ($self->verbose) {
                    print STDERR "\n";
                }
                push @collector, $recent_event->{path};
                if (@collector == $max_files_per_connection) {
                    $success = eval { $self->mirror_path(\@collector) };
                    @collector = ();
                    my $sleep = $self->sleep_per_connection;
                    $sleep = 0.42 unless defined $sleep;
                    Time::HiRes::sleep($sleep);
                } else {
                    # batch not full yet: nothing to report for this item
                    next ITEM;
                }
            }
            if (!$success || $@) {
                warn "error while mirroring: $@";
                push @error, $@;
                sleep 1;
            }
            if ($self->verbose) {
                print STDERR "DONE\n";
            }
        } elsif ($recent_event->{type} eq "delete") {
            # "_" reuses the lstat done by -l
            if (-l $dst or not -d _) {
                unlink $dst or warn "error while unlinking '$dst': $!";
            } else {
                rmdir $dst or warn "error on rmdir '$dst': $!";
            }
        } else {
            warn "Warning: invalid upload type '$recent_event->{type}'";
        }
    }
    # flush the last, incomplete batch
    if (@collector) {
        my $success = eval { $self->mirror_path(\@collector) };
        if (!$success || $@) {
            warn "Warning: Unknown error while mirroring: $@";
            push @error, $@;
            sleep 1;
        }
        if ($self->verbose) {
            print STDERR "DONE\n";
        }
    }
    # resolve the target first so that nothing can clobber $! after rename
    my $rfile = $self->rfile;
    unless (rename $trecentfile, $rfile) {
        my $msg = "Could not rename '$trecentfile' to '$rfile': $!";
        warn "Warning: $msg";
        push @error, $msg;
    }
    return !@error;
}
739 =head2 (void) $obj->mirror_loop
741 Run mirror in an endless loop. See the accessor loopinterval. XXX What
742 happens if we miss the interval during a single loop?
744 =cut
# (void) $obj->mirror_loop
# Runs mirror() in an endless loop until SIGINT is received. Each pass only
# asks for events newer than the newest epoch seen so far, and each pass is
# stretched to at least loopinterval seconds (default 42) by sleeping for the
# remainder. The previous SIGINT handler is restored when the loop ends.
sub mirror_loop {
    my($self) = @_;
    my $Signal = 0;
    local $SIG{INT} = sub { $Signal++ };
    my $loopinterval = $self->loopinterval || 42;
    my $after = -999999999;
  LOOP: while () {
        # per-pass start time; otherwise only the first pass would ever sleep
        my $iteration_start = time;
        # mirror() takes a hash of options, not a bare timestamp
        $self->mirror(after => $after);
        last LOOP if $Signal;
        my $re = $self->recent_events;
        # keep the previous mark when the list is empty or unreadable
        if ($re and @$re and defined $re->[0]{epoch}) {
            $after = $re->[0]{epoch};
        }
        if ($self->verbose) {
            local $| = 1;
            print "($after)";
        }
        my $remaining = $iteration_start + $loopinterval - time;
        sleep $remaining if $remaining > 0;
        # a signal may have cut the sleep short
        last LOOP if $Signal;
        if ($self->verbose) {
            local $| = 1;
            print "~";
        }
    }
}
773 =head2 $success = $obj->mirror_path ( $arrref | $path )
775 If the argument is a scalar, fetches a remote path into the local
776 copy. $path is the path found in the I<recentfile>, i.e. it is relative
777 to the root directory of the mirror.
779 If $path is an array reference then all elements are treated as a path
780 below the current tree and all are rsynced with a single command (and
781 a single connection).
783 =cut
# $success = $obj->mirror_path ( $arrref | $path )
# Fetches one path (scalar) or a batch of paths (array reference) from the
# remote side. For a batch the paths are written to a temporary list file
# that is handed to rsync as 'files-from', with localroot as destination.
# rsync is retried (register_rsync_error in between) until it succeeds. If
# ignore_link_stat_errors is set, a link_stat error ends the attempt at once
# with a true return value, without un_register_rsync_error being called.
# Always returns 1.
sub mirror_path {
    my($self,$path) = @_;
    my($dst, $list_fh, @exec_args);

    if (ref $path and ref $path eq "ARRAY") {
        $dst = $self->local_path();
        mkpath(dirname($dst));
        ($list_fh) = File::Temp->new(
            TEMPLATE => sprintf(".%s-XXXX", lc $self->filenameroot),
            TMPDIR   => 1,
            UNLINK   => 0,
        );
        print {$list_fh} $_, "\n" for @$path;
        $list_fh->flush;
        $list_fh->unlink_on_destroy(1);
        @exec_args = (
            src          => join("/", $self->remotebase),
            dst          => $dst,
            'files-from' => $list_fh->filename,
        );
    } else {
        $dst = $self->local_path($path);
        mkpath(dirname($dst));
        @exec_args = (
            src => join("/", $self->remotebase, $path),
            dst => $dst,
        );
    }

    until ($self->rsync->exec(@exec_args)) {
        my($err) = $self->rsync->err;
        if ($self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x ) {
            warn "Info: ignoring link_stat error '$err'" if $self->verbose;
            return 1;
        }
        $self->register_rsync_error ($err);
    }
    $self->un_register_rsync_error ();
    return 1;
}
844 =head2 $path = $obj->naive_path_normalize ($path)
846 Takes an absolute unix style path as argument and canonicalizes it to
847 a shorter path if possible, removing things like double slashes or
848 C</./> and removes references to C<../> directories to get a shorter
849 unambiguous path. This is used to make the code easier that determines
850 if a file passed to C<update()> is indeed below our C<localroot>.
852 =cut
# $path = $obj->naive_path_normalize ($path)
# Purely textual clean-up of a unix style path: runs of slashes become one
# slash, "/./" segments are dropped, "/name/../" is collapsed to "/", and one
# trailing slash is removed. Nothing is looked up on the filesystem.
sub naive_path_normalize {
    my($self,$path) = @_;
    $path =~ s|/+|/|g;
    # repeated because consecutive "/./" segments share a slash
    1 while $path =~ s|/\./|/|;
    1 while $path =~ s|/[^/]+/\.\./|/|;
    $path =~ s|/$||;
    $path;
}
862 =head2 $ret = $obj->read_recent_1 ( $recent_data )
864 Delegate of C<recent_events()> on protocol 1
866 =cut
# $ret = $obj->read_recent_1 ( $recent_data )
# Protocol 1 reader: the events are the value of the "recent" key.
sub read_recent_1 {
    my $self = shift;
    my $data = shift;
    return $data->{recent};
}
873 =head2 $array_ref = $obj->recent_events
875 Note: the code relies on the resource being written atomically. We
876 cannot lock because we may have no write access.
878 =cut
# $array_ref = $obj->recent_events
# Loads our rfile with YAML::Syck. Returns an empty array reference if the
# file cannot be loaded or is empty. A top-level array is returned as is
# (protocol 0); otherwise the work is delegated to read_recent_<protocol>,
# with the protocol number taken from the file's meta section.
sub recent_events {
    my $self  = shift;
    my $rfile = $self->rfile;
    my ($data) = eval { YAML::Syck::LoadFile($rfile) };
    return [] if $@ or !$data;
    return $data if reftype($data) eq 'ARRAY'; # protocol 0
    my $reader = sprintf "read_recent_%d", $data->{meta}{protocol};
    return $self->$reader($data);
}
896 =head2 $array_ref = $obj->recent_events_from_tempfile
898 Reads the file-events in the temporary local mirror of the remote file.
900 =cut
# $array_ref = $obj->recent_events_from_tempfile
# Returns recent_events() as read with _use_tempfile switched on. The flag is
# switched off again even if recent_events dies, so the object is never left
# in tempfile mode; the exception is then re-thrown.
sub recent_events_from_tempfile {
    my ($self) = @_;
    $self->_use_tempfile(1);
    my $ret;
    my $done = eval { $ret = $self->recent_events; 1 };
    my $err = $@;
    $self->_use_tempfile(0);
    die $err unless $done;
    return $ret;
}
910 =head2 $ret = $obj->recentfile
912 deprecated, use rfile instead
914 =cut
# $ret = $obj->recentfile
# Deprecated: complains via Carp::cluck, then returns localroot joined with
# recentfile_basename.
sub recentfile {
    my $self = shift;
    require Carp;
    Carp::cluck("deprecated method recentfile called. Please use rfile instead!");
    return File::Spec->catfile($self->localroot, $self->recentfile_basename);
}
927 =head2 $ret = $obj->recentfile_basename
929 Just the basename of our I<recentfile>, composed from C<filenameroot>,
930 C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
932 =cut
# $ret = $obj->recentfile_basename
# Composes "<filenameroot>-<interval><serializer_suffix>".
sub recentfile_basename {
    my $self = shift;
    my @parts = ($self->filenameroot, $self->interval, $self->serializer_suffix);
    return sprintf "%s-%s%s", @parts;
}
944 =head2 $str = $obj->remotebase
946 Returns the composed prefix needed when rsyncing from a remote module.
948 =cut
# $str = $obj->remotebase
# Returns the rsync source prefix, "<remote_host>::<remote_module>/<remote_dir>",
# where each part (with its separator) is left out if its accessor is
# undefined. The result is cached in _remotebase.
sub remotebase {
    my $self   = shift;
    my $cached = $self->_remotebase;
    return $cached if defined $cached;

    my $host   = defined $self->remote_host   ? $self->remote_host . "::"  : "";
    my $module = defined $self->remote_module ? $self->remote_module . "/" : "";
    my $dir    = defined $self->remote_dir    ? $self->remote_dir          : "";

    my $base = sprintf "%s%s%s", $host, $module, $dir;
    $self->_remotebase($base);
    return $base;
}
966 =head2 my $rfile = $obj->rfile
968 Returns the full path of the I<recentfile>
970 =cut
# my $rfile = $obj->rfile
# Full path of the recentfile. In tempfile mode (_use_tempfile true) this is
# the current tempfile; otherwise the cached _rfile, which is computed from
# localroot and recentfile_basename on first use.
sub rfile {
    my $self = shift;
    return $self->_current_tempfile if $self->_use_tempfile;

    my $cached = $self->_rfile;
    return $cached if defined $cached;

    my $fresh = File::Spec->catfile($self->localroot, $self->recentfile_basename);
    $self->_rfile($fresh);
    return $fresh;
}
988 =head2 $rsync_obj = $obj->rsync
990 The File::Rsync object that this object uses for communicating with an
991 upstream server.
993 =cut
# $rsync_obj = $obj->rsync
# Returns the File::Rsync object of this object, creating it from
# rsync_options (or an empty hash) and caching it in _rsync on first use.
sub rsync {
    my $self   = shift;
    my $cached = $self->_rsync;
    return $cached if defined $cached;

    my $fresh = File::Rsync->new($self->rsync_options || {});
    $self->_rsync($fresh);
    return $fresh;
}
1006 =head2 (void) $obj->register_rsync_error($err)
1008 =head2 (void) $obj->un_register_rsync_error()
1010 Register_rsync_error is called whenever the File::Rsync object fails
1011 on an exec (say, connection doesn't succeed). It issues a warning and
1012 sleeps for an increasing amount of time. Un_register_rsync_error
1013 resets the sleep time.
1015 =cut
{
    # State shared by the two subs below: the number of consecutive failures
    # and the time of the most recent one.
    my $failures     = 0;
    my $last_failure = 0;

    # (void) $obj->register_rsync_error($err)
    # Counts one more failure, warns about it and sleeps 12 seconds per
    # failure counted so far, capped at 120 seconds.
    sub register_rsync_error {
        my($self, $err) = @_;
        $last_failure = time;
        $failures++;
        my $pause = $failures > 10 ? 120 : 12 * $failures;
        warn sprintf(
            "Warning: %s, Error while rsyncing: '%s', sleeping %d",
            scalar(localtime($last_failure)),
            $err,
            $pause,
        );
        sleep $pause
    }

    # (void) $obj->un_register_rsync_error()
    # Resets the failure count and time.
    sub un_register_rsync_error {
        my($self) = @_;
        $last_failure = 0;
        $failures     = 0;
    }
}
1041 =head2 (void) $obj->unlock
1043 Unlocking is implemented with an C<rmdir> on a locking directory
1044 (C<.lock> appended to $rfile).
1046 =cut
# (void) $obj->unlock
# If this object holds the lock, removes the directory "$rfile.lock" and
# clears the _is_locked flag; otherwise does nothing.
sub unlock {
    my $self = shift;
    unless ($self->_is_locked) {
        return;
    }
    rmdir($self->rfile . ".lock");
    $self->_is_locked(0);
}
1056 =head2 $ret = $obj->update ($path, $type)
1058 Enter one file into the local I<recentfile>. $path is the (usually
1059 absolute) path. If the path is outside the I<our> tree, then it is
1060 ignored.
1062 $type is one of C<new> or C<delete>.
1064 =cut
# $ret = $obj->update ($path, $type)
# Records one event for $path in the local recentfile. $type must be exactly
# "new" or "delete". The path is canonized first (canonize accessor, default
# naive_path_normalize); if it is not localroot itself or below localroot
# (compared on whole path components) nothing is recorded and an empty value
# is returned. Otherwise, under the lock, entries at the tail that are older
# than the interval are dropped, older entries for the same path are removed,
# the new event is put at the front, the file is written and the ".recent"
# symlink is checked. The lock is released even if one of these steps dies;
# the exception is then re-thrown. Returns 1 when the event was recorded.
sub update {
    my($self,$path,$type) = @_;
    die "update called without path argument" unless defined $path;
    die "update called without type argument" unless defined $type;
    die "update called with illegal type argument: $type"
        unless $type =~ /\A(?:new|delete)\z/;
    my $meth = $self->canonize || "naive_path_normalize";
    $path = $self->$meth($path);

    my $lrd = $self->localroot;
    $lrd = "" unless defined $lrd;
    $lrd =~ s|/+\z||;
    if (length $lrd) {
        # the prefix must end on a path boundary, so that a localroot of
        # ".../authors" does not claim files in ".../authors2"
        return unless $path =~ s{\A\Q$lrd\E(?=/|\z)}{};
    }
    $path =~ s|^/||;

    my $secs = $self->interval_secs();
    $self->lock;
    my $done = eval {
        # take the timestamp while holding the lock, so that waiting for the
        # lock cannot produce entries that are out of order
        my $epoch = Time::HiRes::time;
        my $oldest_allowed = $epoch-$secs;
        my $recent = $self->recent_events || [];
        pop @$recent while @$recent && $recent->[-1]{epoch} < $oldest_allowed;
        # remove older duplicates of this $path, irrespective of $type:
        $recent = [ grep { $_->{path} ne $path } @$recent ];
        unshift @$recent, { epoch => $epoch, path => $path, type => $type };
        # sort?
        $self->write_recent($recent);
        $self->_assert_symlink;
        1;
    };
    my $err = $@;
    $self->unlock;              # never leave the lock directory behind
    die $err unless $done;
    return 1;
}
1105 =head2 $obj->write_recent ($recent_files_arrayref)
1107 Writes a I<recentfile> based on the current reflection of the current
1108 state of the tree limited by the current interval.
1110 =cut
# $obj->write_recent ($recent_files_arrayref)
# Dies without an argument; otherwise hands the list to the writer for our
# protocol, write_<protocol>.
sub write_recent {
    my $self   = shift;
    my $recent = shift;
    defined $recent or die "write_recent called without argument";
    my $writer = sprintf "write_%d", $self->protocol;
    return $self->$writer($recent);
}
1119 =head2 $obj->write_0 ($recent_files_arrayref)
1121 Delegate of C<write_recent()> on protocol 0
1123 =cut
# $obj->write_0 ($recent_files_arrayref)
# Protocol 0 writer: dumps the bare list to "<rfile>.new" and renames that
# over rfile. Dies if the rename fails.
sub write_0 {
    my $self   = shift;
    my $recent = shift;
    my $target = $self->rfile;
    my $fresh  = "$target.new";
    YAML::Syck::DumpFile($fresh, $recent);
    rename $fresh, $target or die "Could not rename to '$target': $!";
}
1132 =head2 $obj->write_1 ($recent_files_arrayref)
1134 Delegate of C<write_recent()> on protocol 1
1136 =cut
# $obj->write_1 ($recent_files_arrayref)
# Protocol 1 writer: dumps a hash with the keys "meta" (from meta_data) and
# "recent" (the list) to "<rfile>.new" and renames that over rfile. Dies if
# the rename fails.
sub write_1 {
    my $self   = shift;
    my $recent = shift;
    my $target = $self->rfile;
    my $fresh  = "$target.new";
    my %document = (
        meta   => $self->meta_data,
        recent => $recent,
    );
    YAML::Syck::DumpFile($fresh, \%document);
    rename $fresh, $target or die "Could not rename to '$target': $!";
}
1148 BEGIN {
1149 my @pod_lines =
1150 split /\n/, <<'=cut'; %serializers = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
1152 =head1 THE ARCHITECTURE OF A COLLECTION OF RECENTFILES
1154 The idea is that we want to have a short file that records really
1155 recent changes. So that a fresh mirror can be kept fresh as long as
1156 the connectivity is given. Then we want longer files that record the
1157 history before. So when the mirror falls behind the update period
1158 reflected in the shortest file, it can switch to the next one. And if
1159 this is not long enough we want another one, again a bit longer. And
1160 we want one that completes the history back to the oldest file. For
1161 practical reasons the timespans of these files must overlap a bit and
1162 to keep the bandwidth necessities low they must not be
1163 updated too frequently. That's the basic idea. The following
1164 example represents a tree that has a few updates every day:
1166 RECENT-1h.yaml
1167 RECENT-6h.yaml
1168 RECENT-1d.yaml
1169 RECENT-1W.yaml
1170 RECENT-1M.yaml
1171 RECENT-1Q.yaml
1172 RECENT-1Y.yaml
1173 RECENT-Z.yaml
1175 The last file, the Z file, contains the complementary files that are
1176 in none of the other files. It does never contain C<deletes>. Besides
1177 this it serves the role of a recovery mechanism or spill over pond.
1178 When things go wrong, it's a valuable controlling instance to hold the
1179 differences between the collection of limited interval files and the
1180 actual filesystem.
1182 =head2 A SINGLE RECENTFILE
1184 A I<recentfile> consists of a hash that has two keys: C<meta> and
1185 C<recent>. The C<meta> part has metadata and the C<recent> part has a
1186 list of fileobjects.
1188 =head2 THE META PART
1190 Here we find things that are pretty much self explaining: all
1191 lowercase attributes are accessors and as such explained somewhere
1192 above in this manpage. The uppercase attribute C<Producers> contains
1193 version information about involved software components. Nothing to
1194 worry about as I believe.
1196 =head2 THE RECENT PART
1198 This is the interesting part. Every entry refers to some filesystem
1199 change (with path, epoch, type). The epoch value is the point in time
1200 when some change was I<registered>. Do not be tempted to believe that
1201 the entry has a direct relation to something like modification time or
1202 change time on the filesystem level. The timestamp (I<epoch> element)
1203 is a floating point number and does practically never correspond
1204 exactly to the data recorded in the filesystem but rather to the time
1205 when some process succeeded to report to the I<recentfile> mechanism
1206 that something has changed. This is why many parts of the code refer
1207 to I<events>, because we merely try to record the I<event> of the
1208 discovery of a change, not the time of the change itself.
1210 All these entries can be divided into two types (denoted by the
1211 C<type> attribute): C<new>s and C<delete>s. Changes and creations are
1212 C<new>s. Deletes are C<delete>s.
1214 Another distinction is for objects with an epoch timestamp and others
1215 without. All files that already existed on the filesystem before
1216 the I<recentfile> mechanism was installed get recorded with a
1217 timestamp of zero.
1219 Besides an C<epoch> and a C<type> attribute we find a third one:
1220 C<path>. This path is relative to the directory we find the
1221 I<recentfile> in.
1223 The order of the entries in the I<recentfile> is by decreasing epoch
1224 attribute. These are either 0 or a unique floating point number. They
1225 are zero for events that were happening either before the time that
1226 the I<recentfile> mechanism was set up or were left undiscovered for a
1227 while and never handed over to update(). They are floating point
1228 numbers for all events being regularly handed to update(). And when
1229 the server has ntp running correctly, then the timestamps are
1230 actually decreasing and unique.
1232 =head1 CORRUPTION AND RECOVERY
1234 If the origin host breaks the promise to deliver consistent and
1235 complete I<recentfiles> then the way back to sanity shall be achieved
1236 through either the C<zloop> (still TBD) or traditional rsyncing
1237 between the hosts. For example, if the origin server forgets to deploy
1238 ntp and the clock on it jumps backwards some day, then this would
1239 probably go unnoticed for a while and many software components that
1240 rely on the time never running backwards will make wrong decisions.
1241 After some time this accident would probably still be found in one of
1242 the I<recentfiles> but would become meaningless as soon as a mirror
1243 has run through the sanitizing procedures. Same goes for origin hosts
1244 that forget to include or deliberately omit some files.
1246 =head1 SERIALIZERS
1248 The following suffixes are supported and trigger the use of these
1249 serializers:
1251 =over 4
1253 =item C<< ".yaml" => "YAML::Syck" >>
1255 =item C<< ".json" => "JSON" >>
1257 =item C<< ".sto" => "Storable" >>
1259 =item C<< ".dd" => "Data::Dumper" >>
1261 =back
1263 =cut
# Fill %seconds at compile time from the "=item C<< letter => seconds >>"
# lines of the INTERVAL SPEC pod section that follows. That pod text is the
# only place where the unit table is written down.
1265 BEGIN {
# The here-document opened below ends at the pod's own "=cut" line, so the
# pod section is read as one single-quoted string and split into lines.
1266 my @pod_lines =
# The rest of the statement has to stay on the here-doc opener's line,
# because every following line up to "=cut" is here-doc body. grep keeps
# only the "=item C<<...>>" lines and rewrites each one, in place, to the
# text between "C<<" and ">>". map then string-evals that text (for
# example " s => 1 ") into a key/value pair for %seconds.
1267 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
1269 =head1 INTERVAL SPEC
1271 An interval spec is a primitive way to express time spans. Normally it
1272 is composed from an integer and a letter.
1274 As a special case, a string that consists only of the single letter
1275 C<Z>, stands for unlimited time.
1277 The following letters express the specified number of seconds:
1279 =over 4
1281 =item C<< s => 1 >>
1283 =item C<< m => 60 >>
1285 =item C<< h => 60*60 >>
1287 =item C<< d => 60*60*24 >>
1289 =item C<< W => 60*60*24*7 >>
1291 =item C<< M => 60*60*24*30 >>
1293 =item C<< Q => 60*60*24*90 >>
1295 =item C<< Y => 60*60*24*365.25 >>
1297 =back
1299 =cut
1301 =head1 BACKGROUND
1303 This is about speeding up rsync operation on large trees to many
1304 places. Uses a small metadata cocktail and pull technology.
1306 =head2 NON-COMPETITORS
1308 File::Mirror JWU/File-Mirror/File-Mirror-0.10.tar.gz only local trees
1309 Mirror::YAML ADAMK/Mirror-YAML-0.03.tar.gz some sort of inner circle
1310 Net::DownloadMirror KNORR/Net-DownloadMirror-0.04.tar.gz FTP sites and stuff
1311 Net::MirrorDir KNORR/Net-MirrorDir-0.05.tar.gz ditto
1312 Net::UploadMirror KNORR/Net-UploadMirror-0.06.tar.gz ditto
1313 Pushmi::Mirror CLKAO/Pushmi-v1.0.0.tar.gz something SVK
1315 =head2 COMPETITORS
1317 The problem to solve, which clusters, ftp mirrors and otherwise
1318 replicated datasets like CPAN share, is how to transfer only a minimum
1319 amount of data to determine the diff between two hosts.
1321 Normally it takes a long time to determine the diff itself before it
1322 can be transferred. Known solutions at the time of this writing are
1323 csync2, and rsync 3 batch mode.
1325 For many years the best solution was csync2 which solves the
1326 problem by maintaining a sqlite database on both ends and talking a
1327 highly sophisticated protocol to quickly determine which files to send
1328 and which to delete at any given point in time. Csync2 is often
1329 inconvenient because the act of syncing demands quite an intimate
1330 relationship between the sender and the receiver and suffers when the
1331 number of syncing sites is large or connections are unreliable.
1333 Rsync 3 batch mode works around these problems by providing rsync-able
1334 batch files which allow receiving nodes to replay the history of the
1335 other nodes. This reduces the need to have an incestuous relation but
1336 it has the disadvantage that these batch files replicate the contents
1337 of the involved files. This seems inappropriate when the nodes already
1338 have a means of communicating over rsync.
1340 rersyncrecent solves this problem with a couple of (usually 2-10)
1341 index files which cover different overlapping time intervals. The
1342 master writes these files and the clients can construct the full tree
1343 from the information contained in them. The most recent index file
1344 usually covers the last seconds or minutes or hours of the tree and
1345 depending on the needs, slaves can rsync every few seconds and then
1346 bring their trees in full sync.
1348 The rersyncrecent mode was developed for CPAN but I hope it is a
1349 convenient and economical general purpose solution. I'm looking forward
1350 to seeing a CPAN backbone that is only a few seconds behind PAUSE. And
1351 then ... the first FUSE based CPAN filesystem anyone?
1353 =head1 AUTHOR
1355 Andreas König
1357 =head1 BUGS
1359 Please report any bugs or feature requests through the web interface at
1361 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
1362 I will be notified, and then you'll automatically be notified of
1363 progress on your bug as I make changes.
1365 =head1 SUPPORT
1367 You can find documentation for this module with the perldoc command.
1369 perldoc File::Rsync::Mirror::Recentfile
1371 You can also look for information at:
1373 =over 4
1375 =item * RT: CPAN's request tracker
1377 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
1379 =item * AnnoCPAN: Annotated CPAN documentation
1381 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
1383 =item * CPAN Ratings
1385 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
1387 =item * Search CPAN
1389 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
1391 =back
1394 =head1 ACKNOWLEDGEMENTS
1396 Thanks to RJBS for module-starter.
1398 =head1 COPYRIGHT & LICENSE
1400 Copyright 2008 Andreas König, all rights reserved.
1402 This program is free software; you can redistribute it and/or modify it
1403 under the same terms as Perl itself.
1406 =cut
1408 1; # End of File::Rsync::Mirror::Recentfile; a module file must end with a true value so that require/use succeeds