package File::Rsync::Mirror::Recentfile;

# use warnings;
use strict;

=encoding utf-8

=head1 NAME

File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient

=head1 VERSION

Version 0.0.1

=cut

my $HAVE = {};
for my $package (
                 "Data::Serializer",
                 "File::Rsync"
                ) {
    $HAVE->{$package} = eval qq{ require $package; };
}

use Config;
use File::Basename qw(dirname fileparse);
use File::Copy qw(cp);
use File::Path qw(mkpath);
use File::Spec; # File::Spec->catfile is used throughout
use File::Temp;
use List::Util qw(first min);
use Scalar::Util qw(reftype);
use Storable;
use Time::HiRes qw();
use YAML::Syck;

use version; our $VERSION = qv('0.0.1');

use constant MAX_INT => ~0>>1; # anything better?
use constant DEFAULT_PROTOCOL => 1;

# cf. interval_secs
my %seconds;

# maybe subclass if this mapping is bad?
my %serializers;

=head1 SYNOPSIS

B<!!!! PRE-ALPHA ALERT !!!!>

Nothing in here is believed to be stable, nothing yet intended for
public consumption. The plan is to provide a script in one of the next
releases that acts as a frontend for all the backend functionality.
Option and method names will very likely change.

For the rationale see the section BACKGROUND.

This is published only for developers of the (yet to be named)
script(s).

Writer (of a single file):

    use File::Rsync::Mirror::Recentfile;
    my $rf = File::Rsync::Mirror::Recentfile->new
      (
       interval => q(6h),
       filenameroot => "RECENT",
       comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
       localroot => "/home/ftp/pub/PAUSE/authors/",
       aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
      );
    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");

Reader/mirrorer:

    my $rf = File::Rsync::Mirror::Recentfile->new
      (
       filenameroot => "RECENT",
       ignore_link_stat_errors => 1,
       interval => q(6h),
       localroot => "/home/ftp/pub/PAUSE/authors",
       remote_dir => "",
       remote_host => "pause.perl.org",
       remote_module => "authors",
       rsync_options => {
                         compress => 1,
                         'rsync-path' => '/usr/bin/rsync',
                         links => 1,
                         times => 1,
                         'omit-dir-times' => 1,
                         checksum => 1,
                        },
       verbose => 1,
      );
    $rf->mirror;

Aggregator (usually the writer):

    my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
    $rf->aggregate;

=head1 EXPORT

No exports.

=head1 CONSTRUCTORS / DESTRUCTOR

=head2 my $obj = CLASS->new(%hash)

Constructor. On every argument pair the key is a method name and the
value is an argument to that method name.

If a recentfile for this resource already exists, metadata that are
not defined by the constructor will be fetched from there as soon as
it is being read by recent_events().

=cut

sub new {
    my($class, @args) = @_;
    my $self = bless {}, $class;
    while (@args) {
        my($method,$arg) = splice @args, 0, 2;
        $self->$method($arg);
    }
    unless (defined $self->protocol) {
        $self->protocol(DEFAULT_PROTOCOL);
    }
    unless (defined $self->filenameroot) {
        $self->filenameroot("RECENT");
    }
    unless (defined $self->serializer_suffix) {
        $self->serializer_suffix(".yaml");
    }
    return $self;
}

=head2 my $obj = CLASS->new_from_file($file)

Constructor. $file is a I<recentfile>.

=cut

sub new_from_file {
    my($class, $file) = @_;
    my $self = bless {}, $class;
    $self->_rfile($file);
    #?# $self->lock;
    my $serialized = do { open my $fh, $file or die "Could not open '$file': $!";
                          local $/;
                          <$fh>;
                        };
    # XXX: we can skip this step when the metadata are sufficient, but
    # we cannot parse the file without some magic stuff about
    # serialized formats
    while (-l $file) {
        my($name,$path) = fileparse $file;
        my $symlink = readlink $file;
        if ($symlink =~ m|/|) {
            die "FIXME: filenames containing '/' not supported, got $symlink";
        }
        $file = File::Spec->catfile ( $path, $symlink );
    }
    my($name,$path,$suffix) = fileparse $file, keys %serializers;
    $self->serializer_suffix($suffix);
    $self->localroot($path);
    die "Could not determine file format from suffix" unless $suffix;
    my $deserialized;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        $deserialized = YAML::Syck::LoadFile($file);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $deserialized = $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    while (my($k,$v) = each %{$deserialized->{meta}}) {
        next if $k ne lc $k; # "Producers"
        $self->$k($v);
    }
    unless (defined $self->protocol) {
        $self->protocol(DEFAULT_PROTOCOL);
    }
    return $self;
}

=head2 DESTROY

A simple unlock.

=cut
sub DESTROY { shift->unlock }

=head1 ACCESSORS

=cut

my @accessors;

BEGIN {
    @accessors = (
                  "_current_tempfile",
                  "_current_tempfile_fh",
                  "_done",
                  "_interval",
                  "_is_locked",
                  "_localroot",
                  "_merged",
                  "_pathdb",
                  "_remote_dir",
                  "_remoteroot",
                  "_rfile",
                  "_rsync",
                  "_use_tempfile",
                 );

    my @pod_lines =
        split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }

=over 4

=item aggregator

A list of interval specs that tell the aggregator which I<recentfile>s
are to be produced.

=item canonize

The name of a method to canonize the path before rsyncing. The only
supported value is C<naive_path_normalize>, which is also the default.

=item comment

A comment about this tree and setup.

=item filenameroot

The (prefix of the) filename we use for this I<recentfile>. Defaults to
C<RECENT>.

=item have_mirrored

Timestamp remembering when we mirrored this recentfile the last time.
Only relevant for slaves.

=item ignore_link_stat_errors

If set to true, rsync errors that complain about link stat errors are
ignored. These seem to happen only when there are files missing at the
origin. In race conditions this can always happen, so it is
recommended to set this value to true.

=item is_slave

If set to true, this object will fetch a new recentfile from remote
when the timespan between the last mirror (see have_mirrored) and now
is too large (currently hardcoded to an arbitrary 420 seconds).

=item locktimeout

After how many seconds shall we die if we cannot lock a I<recentfile>?
Defaults to 600 seconds.

=item loopinterval

When mirror_loop is called, this accessor can specify how much time
every loop shall at least take. If the work of a loop is done before
that time has passed, it sleeps for the rest of the time. Defaults to
an arbitrary 42 seconds.

=item max_files_per_connection

Maximum number of files that are transferred on a single rsync call.
Setting it higher means higher performance at the price of holding
connections longer and potentially disturbing other users in the pool.
Defaults to the arbitrary value 42.

=item max_rsync_errors

When rsync operations encounter that many errors without any resetting
success in between, then we die. Defaults to an arbitrary 12. A value
of -1 means we run forever, ignoring all rsync errors.

=item minmax

Hashref remembering when we read the recent_events from this file the
last time and what the timespan was.

=item protocol

When the RECENT file format changes, we increment the protocol. We try
to support older protocols in later releases.

=item remote_host

The host we are mirroring from. Leave empty for the local filesystem.

=item remote_module

Rsync servers have so-called modules to separate directory trees from
each other. Put here the name of the module under which we are
mirroring. Leave empty for the local filesystem.

=item rsync_options

Things like compress, links, times or checksums. Passed in to the
File::Rsync object used to run the mirror.

=item serializer_suffix

Mostly untested accessor. The only well tested format for
I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
Data::Serializer. But in principle other formats are supported as
well. See section SERIALIZERS below.

=item sleep_per_connection

Sleep that many seconds (floating point OK) after every chunk of rsyncing
has finished. Defaults to an arbitrary 0.42.

=item ttl

Time to live. Number of seconds after which this recentfile must be
fetched again from the origin server. Only relevant for slaves.
Defaults to an arbitrary 24.2 seconds.

=item verbose

Boolean to turn on a bit of verbosity.

=back

=cut

use accessors @accessors;

=head1 METHODS

=head2 (void) $obj->aggregate

Takes all intervals that are collected in the accessor called
aggregator. Sorts them by the actual length of the interval.
Removes those that are shorter than our own interval. Then merges this
object into the next larger object. The merging continues upwards
as long as the next I<recentfile> is old enough to warrant a merge.

Whether a merge is warranted is decided according to the interval of
the previous I<recentfile>, so that larger files are not updated as
often as smaller ones.

Here is an example to illustrate the behaviour. Given aggregators

  1h 1d 1W 1M 1Q 1Y Z

then

  1h updates 1d on every call to aggregate()
  1d updates 1W earliest after 1h
  1W updates 1M earliest after 1d
  1M updates 1Q earliest after 1W
  1Q updates 1Y earliest after 1M
  1Y updates Z  earliest after 1Q

Note that all but the smallest recentfile get updated at an arbitrary
rate and as such are quite useless on their own.
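
A hedged usage sketch of one possible writer-side flow (paths as in
the SYNOPSIS):

    my $rf = File::Rsync::Mirror::Recentfile->new
      (
       interval   => q(1h),
       localroot  => "/home/ftp/pub/PAUSE/authors/",
       aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
      );
    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
    $rf->aggregate;   # 1h merges into 1d on every call, 1d into 1W only when old enough, ...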

=cut

sub aggregate {
    my($self) = @_;
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
        grep { $_->{secs} >= $self->interval_secs }
            map { { interval => $_, secs => $self->interval_secs($_)} }
                $self->interval, @{$self->aggregator || []};
    $aggs[0]{object} = $self;
  AGGREGATOR: for my $i (0..$#aggs-1) {
        my $this = $aggs[$i]{object};
        my $next = $this->_sparse_clone;
        $next->interval($aggs[$i+1]{interval});
        my $want_merge = 0;
        if ($i == 0) {
            $want_merge = 1;
        } else {
            my $next_rfile = $next->rfile;
            if (-e $next_rfile) {
                my $prev = $aggs[$i-1]{object};
                local $^T = time;
                my $next_age = 86400 * -M $next_rfile;
                if ($next_age > $prev->interval_secs) {
                    $want_merge = 1;
                }
            } else {
                $want_merge = 1;
            }
        }
        if ($want_merge) {
            $next->merge($this);
            $aggs[$i+1]{object} = $next;
        } else {
            last AGGREGATOR;
        }
    }
}

# collect file size and mtime for all files of this aggregate
sub _debug_aggregate {
    my($self) = @_;
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
        map { { interval => $_, secs => $self->interval_secs($_)} }
            $self->interval, @{$self->aggregator || []};
    my $report = [];
    for my $i (0..$#aggs) {
        my $this = Storable::dclone $self;
        $this->interval($aggs[$i]{interval});
        my $rfile = $this->rfile;
        my @stat = stat $rfile;
        push @$report, {rfile => $rfile, size => $stat[7], mtime => $stat[9]};
    }
    $report;
}

# (void) $self->_assert_symlink()
sub _assert_symlink {
    my($self) = @_;
    my $recentrecentfile = File::Spec->catfile
        (
         $self->localroot,
         sprintf
         (
          "%s.recent",
          $self->filenameroot
         )
        );
    if ($Config{d_symlink} eq "define") {
        my $howto_create_symlink; # 0=no need; 1=straight symlink; 2=rename symlink
        if (-l $recentrecentfile) {
            my $found_symlink = readlink $recentrecentfile;
            if ($found_symlink eq $self->rfilename) {
                return;
            } else {
                $howto_create_symlink = 2;
            }
        } else {
            $howto_create_symlink = 1;
        }
        if (1 == $howto_create_symlink) {
            symlink $self->rfilename, $recentrecentfile or die "Could not create symlink '$recentrecentfile': $!";
        } else {
            unlink "$recentrecentfile.$$"; # may fail
            symlink $self->rfilename, "$recentrecentfile.$$" or die "Could not create symlink '$recentrecentfile.$$': $!";
            rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
        }
    } else {
        warn "Warning: symlinks not supported on this system, doing a copy instead\n";
        unlink "$recentrecentfile.$$"; # may fail
        cp $self->rfilename, "$recentrecentfile.$$" or die "Could not copy to '$recentrecentfile.$$': $!";
        rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
    }
}

=head2 $done = $obj->done

$done is a reference to a File::Rsync::Mirror::Recentfile::Done object
that keeps track of rsync activities. Only used/needed when we are a
mirroring slave.

=cut

sub done {
    my($self) = @_;
    my $done = $self->_done;
    if (!$done) {
        require File::Rsync::Mirror::Recentfile::Done;
        $done = File::Rsync::Mirror::Recentfile::Done->new();
        $self->_done ( $done );
    }
    return $done;
}

=head2 $success = $obj->full_mirror

(TBD) Mirrors the whole remote site, starting with the smallest I<recentfile>,
switching to larger ones ...

=cut

sub full_mirror {
    my($self) = @_;
    die "FIXME: Not yet implemented";
}

=head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ($rfilename)

=head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()

Stores the remote I<recentfile> locally as a tempfile. $rfilename must
be a plain filename without path separators. The second form fetches
the file with the default name. The caller is responsible for removing
the file after use.

Note: if you intend to act as an rsync server for other slaves, you
must prefer this method over get_remotefile() for mirroring (and
reading) recentfiles. Otherwise downstream mirrors would expect you to
have files that you do not have yet.

Note: currently we have an arbitrary brake built into the method: if
less than 4.42 seconds have passed since the last download, we return
without downloading. XXX

=cut

sub get_remote_recentfile_as_tempfile {
    my($self, $trfilename) = @_;
    mkpath $self->localroot;
    my $fh;
    if ($trfilename) {
        $self->_use_tempfile (1); # why?
    } elsif ( $self->_use_tempfile() ) {
        return $self->_current_tempfile if ! $self->ttl_reached;
        $fh = $self->_current_tempfile_fh;
        $trfilename = $self->rfilename;
    } else {
        $trfilename = $self->rfilename;
    }

    return $trfilename
        if (!$trfilename
            && $self->have_mirrored
            && Time::HiRes::time-$self->have_mirrored < 4.42
           );

    die "Alert: illegal filename[$trfilename] contains a slash" if $trfilename =~ m|/|;
    my $dst;
    if ($fh) {
        $dst = $self->_current_tempfile;
    } else {
        $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
        $dst = $fh->filename;
        $self->_current_tempfile ($dst);
        my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
        if (defined $rfile && -e $rfile) {
            # saving on bandwidth. Might need to be configurable
            # $self->bandwidth_is_cheap?
            cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
        }
    }
    my $src = join ("/",
                    $self->remoteroot,
                    $trfilename,
                   );
    if ($self->verbose) {
        my $doing = -e $dst ? "Syncing" : "Getting";
        printf STDERR
            (
             "%s (1/1) temporary %s ... ",
             $doing,
             $dst,
            );
    }
    my $gaveup = 0;
    my $retried = 0;
    while (!$self->rsync->exec(
                               src => $src,
                               dst => $dst,
                              )) {
        $self->register_rsync_error ($self->rsync->err);
        if (++$retried >= 3) {
            warn "XXX giving up";
            $gaveup = 1;
            last;
        }
    }
    unless ($gaveup) {
        $self->have_mirrored (Time::HiRes::time);
        $self->un_register_rsync_error ();
    }
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
    my $mode = 0644;
    chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
    return $dst;
}

sub _get_remote_rat_provide_tempfile_object {
    my($self, $trfilename) = @_;
    my $fh = File::Temp->new
        (TEMPLATE => sprintf(".%s-XXXX",
                             $trfilename,
                            ),
         DIR => $self->localroot,
         SUFFIX => $self->serializer_suffix,
         UNLINK => $self->_use_tempfile,
        );
    if ($self->_use_tempfile) {
        $self->_current_tempfile_fh ($fh); # delay self destruction
    }
    return $fh;
}

=head2 $localpath = $obj->get_remotefile ( $relative_path )

Rsyncs one single remote file to the local filesystem.

Note: no locking is done on this file. Any number of processes may
mirror this object.

Note II: do not use this for recentfiles. If you are a cascading
slave/server combination, it would confuse other slaves. They would
expect the contents of these recentfiles to be available. Use
get_remote_recentfile_as_tempfile() instead.

=cut

sub get_remotefile {
    my($self, $path) = @_;
    my $dst = File::Spec->catfile($self->localroot, $path);
    mkpath dirname $dst;
    if ($self->verbose) {
        my $doing = -e $dst ? "Syncing" : "Getting";
        printf STDERR
            (
             "%s (1/1) %s ... ",
             $doing,
             $path,
            );
    }
    while (!$self->rsync->exec(
                               src => join("/",
                                           $self->remoteroot,
                                           $path),
                               dst => $dst,
                              )) {
        $self->register_rsync_error ($self->rsync->err);
    }
    $self->un_register_rsync_error ();
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
    return $dst;
}

=head2 $obj->interval ( $interval_spec )

Get/set accessor. $interval_spec is a string and is described below in
the section INTERVAL SPEC.

=cut

sub interval {
    my ($self, $interval) = @_;
    if (@_ >= 2) {
        $self->_interval($interval);
        $self->_rfile(undef);
    }
    $interval = $self->_interval;
    unless (defined $interval) {
        # do not ask the $self too much, it recurses!
        require Carp;
        Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
    }
    return $interval;
}

=head2 $secs = $obj->interval_secs ( $interval_spec )

$interval_spec is described below in the section INTERVAL SPEC. If
empty, it defaults to the inherent interval of this object.

=cut

sub interval_secs {
    my ($self, $interval) = @_;
    $interval ||= $self->interval;
    unless (defined $interval) {
        die "interval_secs() called without argument on an object without a declared one";
    }
    my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
        die "Could not determine seconds from interval[$interval]";
    if ($interval eq "Z") {
        return MAX_INT;
    } elsif (exists $seconds{$t} and $n =~ /^\d+$/) {
        return $seconds{$t}*$n;
    } else {
        die "Invalid interval specification: n[$n]t[$t]";
    }
}

=head2 $obj->localroot ( $localroot )

Get/set accessor. The local root of the tree.

=cut

sub localroot {
    my ($self, $localroot) = @_;
    if (@_ >= 2) {
        $self->_localroot($localroot);
        $self->_rfile(undef);
    }
    $localroot = $self->_localroot;
}

=head2 $ret = $obj->local_path($path_found_in_recentfile)

Combines the path to our local mirror and the path of an object found
in this I<recentfile>. In other words: the target of a mirror operation.

Implementation note: We split on slashes and then use
File::Spec->catfile to adjust to the local operating system.

=cut

sub local_path {
    my($self,$path) = @_;
    unless (defined $path) {
        # seems like a degenerate case
        return $self->localroot;
    }
    my @p = split m|/|, $path;
    File::Spec->catfile($self->localroot,@p);
}

=head2 (void) $obj->lock

Locking is implemented with an C<mkdir> on a locking directory
(C<.lock> appended to $rfile).

=cut

sub lock {
    my ($self) = @_;
    # not using flock because it locks on filehandles instead of
    # old school resources.
    my $locked = $self->_is_locked and return;
    my $rfile = $self->rfile;
    # XXX need a way to allow breaking the lock
    my $start = time;
    my $locktimeout = $self->locktimeout || 600;
    while (not mkdir "$rfile.lock") {
        Time::HiRes::sleep 0.01;
        if (time - $start > $locktimeout) {
            die "Could not acquire lockdirectory '$rfile.lock': $!";
        }
    }
    $self->_is_locked (1);
}

=head2 (void) $obj->merge ($other)

Bulk update of this object with another one. It's used to merge a
smaller and younger $other object into the current one. If this file
is a C<Z> file, then we do not merge in objects of type C<delete>. But
if we encounter an object of type delete we delete the corresponding
C<new> object.

If there is nothing to be merged, nothing is done.

=cut

sub merge {
    my($self, $other) = @_;
    $self->_merge_sanitycheck ( $other );
    $other->lock;
    my $other_recent = $other->recent_events || [];
    $self->lock;
    my $my_recent = $self->recent_events || [];

    # calculate the target time span
    my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
    my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
    my $oldest_allowed = 0;
    my $something_done;
    unless ($my_recent->[0]) {
        # obstetrics
        $something_done=1;
    }
    if ($epoch) {
        if (my $merged = $self->merged) {
            my $secs = $self->interval_secs();
            $oldest_allowed = min($epoch - $secs, $merged->{epoch});
        }
        # throw away outsiders
        # XXX _bigfloat!
        while (@$my_recent && $my_recent->[-1]{epoch} < $oldest_allowed) {
            pop @$my_recent;
            $something_done=1;
        }
    }

    my %have;
    my $recent = [];
    for my $oev (@$other_recent) {
        my $oevepoch = $oev->{epoch} || 0;
        next if $oevepoch < $oldest_allowed;
        my $path = $oev->{path};
        next if $have{$path}++;
        if (    $self->interval eq "Z"
            and $oev->{type} eq "delete") {
            # do nothing
        } else {
            if (!$myepoch || $oevepoch > $myepoch) {
                $something_done=1;
            }
            push @$recent, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
        }
    }
    if ($something_done) {
        push @$recent, grep { !$have{$_->{path}}++ } @$my_recent;
        $self->write_recent($recent);
        $other->merged({
                        time => Time::HiRes::time, # not used anywhere
                        epoch => $epoch,           # used in oldest_allowed
                        into_interval => $self->interval, # not used anywhere
                       });
        $other->write_recent($other_recent);
    }
    $self->unlock;
    $other->unlock;
}

sub _merge_sanitycheck {
    my($self, $other) = @_;
    if ($self->interval_secs <= $other->interval_secs) {
        die sprintf
            (
             "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
             $self->interval_secs,
             $other->interval_secs,
            );
    }
}

=head2 merged

Hashref denoting when this recentfile has been merged into some other
one and at which epoch.

=cut

sub merged {
    my($self, $set) = @_;
    if (defined $set) {
        $self->_merged ($set);
    }
    my $merged = $self->_merged;
    my $into;
    if ($merged and $into = $merged->{into_interval} and defined $self->_interval) {
        if ($into eq $self->interval) {
            warn sprintf
                (
                 "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
                 $into,
                 $self->interval,
                );
        } elsif ($self->interval_secs($into) < $self->interval_secs) {
            warn sprintf
                (
                 "Warning: into_interval[%s] smaller than own interval[%s] on interval[%s]. Danger ahead.",
                 $self->interval_secs($into),
                 $self->interval_secs,
                 $self->interval,
                );
        }
    }
    $merged;
}

=head2 $hashref = $obj->meta_data

Returns the hashref of metadata that the server has to add to the
I<recentfile>.

=cut

sub meta_data {
    my($self) = @_;
    my $ret = $self->{meta};
    for my $m (
               "aggregator",
               "canonize",
               "comment",
               "filenameroot",
               "merged",
               "interval",
               "protocol",
               "serializer_suffix",
              ) {
        my $v = $self->$m;
        if (defined $v) {
            $ret->{$m} = $v;
        }
    }
    # XXX need to reset the Producer if I am a writer, keep it when I
    # am a reader
    $ret->{Producers} ||= {
                           __PACKAGE__, "$VERSION", # stringified it looks better
                           '$0', $0,
                           'time', Time::HiRes::time,
                          };
    return $ret;
}

=head2 $success = $obj->mirror ( %options )

Mirrors the files in this I<recentfile> as reported by
C<recent_events>. Options named C<after>, C<before>, C<max>, and
C<skip-deletes> are passed through to the C<recent_events> call. The
boolean option C<piecemeal>, if true, causes C<mirror> to rsync only
C<max_files_per_connection> files and keep track of the rsynced files
so that future calls will rsync different files until all files are
brought into sync.
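
A hedged sketch of a slave invocation (assuming a reader object C<$rf>
configured as in the SYNOPSIS; C<$last_epoch_seen> is a hypothetical
bookkeeping variable of the caller and the numbers are arbitrary):

    # mirror a bounded batch per call; with piecemeal set, repeated
    # calls continue where the previous one left off
    $rf->mirror
      (
       after          => $last_epoch_seen,
       'skip-deletes' => 1,
       max            => 1000,
       piecemeal      => 1,
      );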

=cut

sub mirror {
    my($self, %options) = @_;
    my $trecentfile = $self->get_remote_recentfile_as_tempfile();
    $self->_use_tempfile (1);
    my %passthrough = map { ($_ => $options{$_}) } qw(before after max skip-deletes);
    my ($recent_events) = $self->recent_events(%passthrough);
    my(@error, @xcollector);
    my $first_item = 0;
    my $last_item = $#$recent_events;
    my $done = $self->done;
    my $pathdb = $self->_pathdb;
  ITEM: for my $i ($first_item..$last_item) {
        my $status = +{};
        $self->_mirror_item
            (
             $i,
             $recent_events,
             $last_item,
             $done,
             $pathdb,
             \@xcollector,
             \%options,
             $status,
             \@error,
            );
        return if $status->{mustreturn};
    }
    if (@xcollector) {
        my $success = eval { $self->_empty_xcollector (\@xcollector,$pathdb,$recent_events);};
        if (!$success || $@) {
            warn "Warning: Unknown error while mirroring: $@";
            push @error, $@;
            sleep 1;
        }
        if ($self->verbose) {
            print STDERR "DONE\n";
        }
    }
    my $rfile = $self->rfile;
    unless (rename $trecentfile, $rfile) {
        require Carp;
        Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
    }
    $self->_use_tempfile (0);
    if (my $ctfh = $self->_current_tempfile_fh) {
        $ctfh->unlink_on_destroy (0);
        $self->_current_tempfile_fh (undef);
    }
    return !@error;
}

sub _mirror_item {
    my($self,
       $i,
       $recent_events,
       $last_item,
       $done,
       $pathdb,
       $xcollector,
       $options,
       $status,
       $error,
      ) = @_;
    my $recent_event = $recent_events->[$i];
    return if $done->covered ( $recent_event->{epoch} );
    if ($pathdb) {
        my $rec = $pathdb->{$recent_event->{path}};
        if ($rec && $rec->{recentepoch}) {
            if (File::Rsync::Mirror::Recentfile::Done::_bigfloatgt
                ( $rec->{recentepoch}, $recent_event->{epoch} )){
                $done->register ($recent_events, [$i]);
                return;
            }
        }
    }
    my $dst = $self->local_path($recent_event->{path});
    if ($recent_event->{type} eq "new"){
        $self->_mirror_item_new
            (
             $dst,
             $i,
             $last_item,
             $recent_events,
             $recent_event,
             $xcollector,
             $pathdb,
             $status,
             $error,
             $options,
            );
    } elsif ($recent_event->{type} eq "delete") {
        my $activity;
        if ($options->{'skip-deletes'}) {
            $activity = "skipp";
        } else {
            if (-l $dst or not -d _) {
                unless (unlink $dst) {
                    require Carp;
                    Carp::cluck ( "Warning: Error while unlinking '$dst': $!" );
                }
            } else {
                unless (rmdir $dst) {
                    require Carp;
                    Carp::cluck ( "Warning: Error on rmdir '$dst': $!" );
                }
            }
            $activity = "delet";
        }
        $done->register ($recent_events, [$i]);
        if ($pathdb) {
            $self->_register_path($pathdb,[$recent_event],$activity);
        }
    } else {
        warn "Warning: invalid upload type '$recent_event->{type}'";
    }
}

sub _mirror_item_new {
    my($self,
       $dst,
       $i,
       $last_item,
       $recent_events,
       $recent_event,
       $xcollector,
       $pathdb,
       $status,
       $error,
       $options,
      ) = @_;
    if ($self->verbose) {
        my $doing = -e $dst ? "Syncing" : "Getting";
        printf STDERR
            (
             "%s (%d/%d/%s) %s ... ",
             $doing,
             1+$i,
             1+$last_item,
             $self->interval,
             $recent_event->{path},
            );
    }
    my $max_files_per_connection = $self->max_files_per_connection || 42;
    my $success;
    if ($self->verbose) {
        print STDERR "\n";
    }
    push @$xcollector, { rev => $recent_event, i => $i };
    if (@$xcollector >= $max_files_per_connection) {
        $success = eval {$self->_empty_xcollector ($xcollector,$pathdb,$recent_events);};
        my $sleep = $self->sleep_per_connection;
        $sleep = 0.42 unless defined $sleep;
        Time::HiRes::sleep $sleep;
        if ($options->{piecemeal}) {
            $status->{mustreturn} = 1;
            return;
        }
    } else {
        return;
    }
    if (!$success || $@) {
        warn "Warning: Error while mirroring: $@";
        push @$error, $@;
        sleep 1;
    }
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
}

sub _empty_xcollector {
    my($self,$xcoll,$pathdb,$recent_events) = @_;
    my $success = $self->mirror_path([map {$_->{rev}{path}} @$xcoll]);
    if ($pathdb) {
        $self->_register_path($pathdb,[map {$_->{rev}} @$xcoll],"rsync");
    }
    $self->done->register($recent_events, [map {$_->{i}} @$xcoll]);
    @$xcoll = ();
    return $success;
}

sub _register_path {
    my($self,$db,$coll,$act) = @_;
    my $time = time;
    for my $item (@$coll) {
        $db->{$item->{path}} =
            {
             recentepoch => $item->{epoch},
             ($act."edon") => $time,
            };
    }
}

=head2 (void) $obj->mirror_loop

Run mirror in an endless loop. See the accessor C<loopinterval>. XXX
What happens/should happen if we miss the interval during a single loop?

=cut

sub mirror_loop {
    my($self) = @_;
    my $iteration_start = time;

    my $Signal = 0;
    $SIG{INT} = sub { $Signal++ };
    my $loopinterval = $self->loopinterval || 42;
    my $after = -999999999;
  LOOP: while () {
        $self->mirror($after);
        last LOOP if $Signal;
        my $re = $self->recent_events;
        $after = $re->[0]{epoch};
        if ($self->verbose) {
            local $| = 1;
            print "($after)";
        }
        if (time - $iteration_start < $loopinterval) {
            sleep $iteration_start + $loopinterval - time;
        }
        if ($self->verbose) {
            local $| = 1;
            print "~";
        }
    }
}

=head2 $success = $obj->mirror_path ( $arrref | $path )

If the argument is a scalar, it is treated as a path. The remote path
is mirrored into the local copy. $path is the path found in the
I<recentfile>, i.e. it is relative to the root directory of the
mirror.

If the argument is an array reference, then all elements are treated
as paths below the current tree and all are rsynced with a single
command (and a single connection).

=cut

sub mirror_path {
    my($self,$path) = @_;
    # XXX simplify the two branches such that $path is treated as
    # [$path] maybe even demand the argument as an arrayref to
    # simplify docs and code. (rsync-over-recentfile-2.pl uses the
    # interface)
    if (ref $path and ref $path eq "ARRAY") {
        my $dst = $self->localroot;
        mkpath dirname $dst;
        my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
                                                      lc $self->filenameroot,
                                                     ),
                                  TMPDIR => 1,
                                  UNLINK => 0,
                                 );
        for my $p (@$path) {
            print $fh $p, "\n";
        }
        $fh->flush;
        $fh->unlink_on_destroy(1);
        my $gaveup = 0;
        my $retried = 0;
        while (!$self->rsync->exec
               (
                src => join("/",
                            $self->remoteroot,
                           ),
                dst => $dst,
                'files-from' => $fh->filename,
               )) {
            my($err) = $self->rsync->err;
            if ($self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x ) {
                if ($self->verbose) {
                    warn "Info: ignoring link_stat error '$err'";
                }
                return 1;
            }
            $self->register_rsync_error ($err);
            if (++$retried >= 3) {
                warn "XXX giving up.";
                $gaveup = 1;
                last;
            }
        }
        unless ($gaveup) {
            $self->un_register_rsync_error ();
        }
    } else {
        my $dst = $self->local_path($path);
        mkpath dirname $dst;
        while (!$self->rsync->exec
               (
                src => join("/",
                            $self->remoteroot,
                            $path
                           ),
                dst => $dst,
               )) {
            my($err) = $self->rsync->err;
            if ($self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x ) {
                if ($self->verbose) {
                    warn "Info: ignoring link_stat error '$err'";
                }
                return 1;
            }
            $self->register_rsync_error ($err);
        }
        $self->un_register_rsync_error ();
    }
    return 1;
}

sub _my_current_rfile {
    my($self) = @_;
    my $rfile;
    if ($self->_use_tempfile) {
        $rfile = $self->_current_tempfile;
    } else {
        $rfile = $self->rfile;
    }
    return $rfile;
}

=head2 $path = $obj->naive_path_normalize ($path)

Takes an absolute unix style path as argument and canonicalizes it to
a shorter path if possible, removing things like double slashes or
C</./> and removing references to C<../> directories to get a shorter
unambiguous path. This is used to simplify the code that determines
whether a file passed to C<update()> is indeed below our C<localroot>.

=cut

sub naive_path_normalize {
    my($self,$path) = @_;
    $path =~ s|/+|/|g;
    1 while $path =~ s|/[^/]+/\.\./|/|;
    $path =~ s|/$||;
    $path;
}

=head2 $ret = $obj->read_recent_1 ( $data )

Delegate of C<recent_events()> on protocol 1

=cut

sub read_recent_1 {
    my($self, $data) = @_;
    return $data->{recent};
}

=head2 $array_ref = $obj->recent_events ( %options )

Note: the code relies on the resource being written atomically. We
cannot lock because we may have no write access. If the caller has
write access (e.g. aggregate() or update()), it has to take care of
any necessary locking.

If $options{after} is specified, only file events after this timestamp
are returned.

If $options{before} is specified, only file events before this
timestamp are returned.

If $options{'skip-deletes'} is specified, no files-to-be-deleted will
be returned.

If $options{max} is specified, only this many events are returned.

If $options{info} is specified, it must be a hashref. This hashref
will be filled with metadata about the unfiltered recent_events of
this object: key C<first> holds the first item, key C<last> the last.
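
A short usage sketch (assuming C<$rf> is a I<recentfile> object as
constructed above; the epoch value is an arbitrary example):

    my $info = {};
    my $events = $rf->recent_events
      (
       after          => 1216706016.43456,
       'skip-deletes' => 1,
       max            => 100,
       info           => $info,
      );
    for my $ev (@$events) {
        printf "%s %s %s\n", $ev->{epoch}, $ev->{type}, $ev->{path};
    }
    # $info->{first} and $info->{last} frame the unfiltered list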

=cut

sub recent_events {
    my ($self, %options) = @_;
    my $info = $options{info};
    if ($self->is_slave) {
        $self->get_remote_recentfile_as_tempfile;
    }
    my $rfile_or_tempfile = $self->_my_current_rfile or return [];
    -e $rfile_or_tempfile or return [];
    my $suffix = $self->serializer_suffix;
    my ($data) = eval {
        $self->_try_deserialize
            (
             $suffix,
             $rfile_or_tempfile,
            );
    };
    my $err = $@;
    if ($err or !$data) {
        return [];
    }
    my $re;
    if (reftype $data eq 'ARRAY') { # protocol 0
        $re = $data;
    } else {
        $re = $self->_recent_events_protocol_x
            (
             $data,
             $rfile_or_tempfile,
            );
    }
    return $re unless defined $options{after}; # XXX same for before and max
    my $last_item = $#$re;
    if ($info) {
        $info->{first} = $re->[0];
        $info->{last} = $re->[-1];
    }
    if (defined $options{after}) {
        if ($re->[0]{epoch} > $options{after}) {
            if (
                my $f = first
                    {$re->[$_]{epoch} <= $options{after}}
                        0..$#$re
               ) {
                $last_item = $f-1;
            }
        } else {
            $last_item = -1;
        }
    }
    my $first_item = 0;
    if (defined $options{before}) {
        if ($re->[0]{epoch} > $options{before}) {
            if (
                my $f = first
                    {$re->[$_]{epoch} < $options{before}}
                        0..$last_item
               ) {
                $first_item = $f;
            }
        } else {
            $first_item = 0;
        }
    }
    my @rre = splice @$re, $first_item, 1+$last_item-$first_item;
    if ($options{'skip-deletes'}) {
        @rre = grep { $_->{type} ne "delete" } @rre;
    }
    if ($options{max} && @rre > $options{max}) {
        @rre = splice @rre, 0, $options{max};
    }
    \@rre;
}

sub _recent_events_protocol_x {
    my($self,
       $data,
       $rfile_or_tempfile,
      ) = @_;
    my $meth = sprintf "read_recent_%d", $data->{meta}{protocol};
    # we may be reading meta for the first time
    while (my($k,$v) = each %{$data->{meta}}) {
        next if $k ne lc $k; # "Producers"
        next if defined $self->$k;
        $self->$k($v);
    }
    my $re = $self->$meth ($data);
    my @stat = stat $rfile_or_tempfile or die "Cannot stat '$rfile_or_tempfile': $!";
    my $minmax = { mtime => $stat[9] };
    if (@$re) {
        $minmax->{min} = $re->[-1]{epoch};
        $minmax->{max} = $re->[0]{epoch};
    }
    $self->minmax ( $minmax );
    return $re;
}

sub _try_deserialize {
    my($self,
       $suffix,
       $rfile_or_tempfile,
      ) = @_;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        YAML::Syck::LoadFile($rfile_or_tempfile);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        my $serialized = do
            {
                open my $fh, $rfile_or_tempfile or die "Could not open: $!";
                local $/;
                <$fh>;
            };
        $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
}

=head2 $ret = $obj->rfilename

Just the basename of our I<recentfile>, composed from C<filenameroot>,
a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>.

=cut

sub rfilename {
    my($self) = @_;
    my $file = sprintf("%s-%s%s",
                       $self->filenameroot,
                       $self->interval,
                       $self->serializer_suffix,
                      );
    return $file;
}

=head2 $str = $self->remote_dir

The directory we are mirroring from.

=cut

sub remote_dir {
    my($self, $set) = @_;
    if (defined $set) {
        $self->_remote_dir ($set);
    }
    my $x = $self->_remote_dir;
    $self->is_slave (1);
    return $x;
}

=head2 $str = $obj->remoteroot

=head2 (void) $obj->remoteroot ( $set )

Get/set the composed prefix needed when rsyncing from a remote module.
If remote_host, remote_module, and remote_dir are set, it is composed
from these.

=cut

sub remoteroot {
    my($self, $set) = @_;
    if (defined $set) {
        $self->_remoteroot($set);
    }
    my $remoteroot = $self->_remoteroot;
    unless (defined $remoteroot) {
        $remoteroot = sprintf
            (
             "%s%s%s",
             defined $self->remote_host ? ($self->remote_host."::") : "",
             defined $self->remote_module ? ($self->remote_module."/") : "",
             defined $self->remote_dir ? $self->remote_dir : "",
            );
        $self->_remoteroot($remoteroot);
    }
    return $remoteroot;
}

=head2 my $rfile = $obj->rfile

Returns the full path of the I<recentfile>.

=cut

sub rfile {
    my($self) = @_;
    my $rfile = $self->_rfile;
    return $rfile if defined $rfile;
    $rfile = File::Spec->catfile
        ($self->localroot,
         $self->rfilename,
        );
    $self->_rfile ($rfile);
    return $rfile;
}

=head2 $rsync_obj = $obj->rsync

The File::Rsync object that this object uses for communicating with an
upstream server.

=cut

sub rsync {
    my($self) = @_;
    my $rsync = $self->_rsync;
    unless (defined $rsync) {
        my $rsync_options = $self->rsync_options || {};
        if ($HAVE->{"File::Rsync"}) {
            $rsync = File::Rsync->new($rsync_options);
            $self->_rsync($rsync);
        } else {
            die "File::Rsync required for rsync operations. Cannot continue";
        }
    }
    return $rsync;
}

=head2 (void) $obj->register_rsync_error($err)

=head2 (void) $obj->un_register_rsync_error()

register_rsync_error() is called whenever the File::Rsync object fails
on an exec (say, a connection doesn't succeed). It issues a warning
and sleeps for an increasing amount of time. un_register_rsync_error()
resets the error count. See also the accessor C<max_rsync_errors>.

=cut

{
    my $no_success_count = 0;
    my $no_success_time = 0;
    sub register_rsync_error {
        my($self, $err) = @_;
        chomp $err;
        $no_success_time = time;
        $no_success_count++;
        my $max_rsync_errors = $self->max_rsync_errors;
        $max_rsync_errors = 12 unless defined $max_rsync_errors;
        if ($max_rsync_errors>=0 && $no_success_count >= $max_rsync_errors) {
            require Carp;
            Carp::confess
                (
                 sprintf
                 (
                  "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
                  $self->interval,
                  $err,
                  $no_success_count,
                 ));
        }
        my $sleep = 12 * $no_success_count;
        $sleep = 120 if $sleep > 120;
        require Carp;
        Carp::cluck
            (sprintf
             (
              "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
              scalar(localtime($no_success_time)),
              $self->interval,
              $err,
              $sleep,
             ));
        sleep $sleep;
    }
    sub un_register_rsync_error {
        my($self) = @_;
        $no_success_time = 0;
        $no_success_count = 0;
    }
}

=head2 $clone = $obj->_sparse_clone

Clones just as much from itself as does not hurt. Experimental method.

=cut

sub _sparse_clone {
    my($self) = @_;
    my $new = bless {}, ref $self;
    for my $m (qw(
                  _interval
                  _localroot
                  _remoteroot
                  _rfile
                  _use_tempfile
                  aggregator
                  filenameroot
                  is_slave
                  max_files_per_connection
                  protocol
                  rsync_options
                  serializer_suffix
                  sleep_per_connection
                  verbose
                 )) {
        my $o = $self->$m;
        $o = Storable::dclone $o if ref $o;
        $new->$m($o);
    }
    $new;
}

=head2 $boolean = OBJ->ttl_reached ()

=cut

sub ttl_reached {
    my($self) = @_;
    my $have_mirrored = $self->have_mirrored;
    my $now = Time::HiRes::time;
    my $ttl = $self->ttl;
    $ttl = 24.2 unless defined $ttl;
    if ($now > $have_mirrored + $ttl) {
        return 1;
    }
    return 0;
}

=head2 (void) $obj->unlock()

Unlocking is implemented with an C<rmdir> on a locking directory
(C<.lock> appended to $rfile).

=cut

sub unlock {
    my($self) = @_;
    return unless $self->_is_locked;
    my $rfile = $self->rfile;
    rmdir "$rfile.lock";
    $self->_is_locked (0);
}

=head2 $ret = $obj->update ($path, $type)

Enter one file into the local I<recentfile>. $path is the (usually
absolute) path. If the path is outside of I<our> tree, it is ignored.

$type is one of C<new> or C<delete>.

The new file event is unshifted onto the array of recent_events and the
array is shortened to the length of the timespan allowed. This is
usually the timespan specified by the interval of this recentfile, but
as long as this recentfile has not been merged into another one, the
timespan may grow without bounds.
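
A brief sketch (assuming a writer object C<$rf> whose localroot is
C</home/ftp/pub/PAUSE/authors/> as in the SYNOPSIS; the filenames are
placeholders):

    # register a new or changed file
    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
    # register a removal; a path outside localroot would be ignored
    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_62.tar.gz","delete");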

=cut

sub update {
    my($self,$path,$type) = @_;
    die "update called without path argument" unless defined $path;
    die "update called without type argument" unless defined $type;
    die "update called with illegal type argument: $type" unless $type =~ /(new|delete)/;
    my $canonmeth = $self->canonize;
    unless ($canonmeth) {
        $canonmeth = "naive_path_normalize";
    }
    $path = $self->$canonmeth($path);
    my $lrd = $self->localroot;
    if ($path =~ s|^\Q$lrd\E||) {
        $path =~ s|^/||;
        my $interval = $self->interval;
        my $secs = $self->interval_secs();
        $self->lock;
        # you must calculate the time after having locked, of course
        my $epoch = Time::HiRes::time;
        my $recent = $self->recent_events;
        $recent ||= [];
        my $oldest_allowed = 0;
        if (my $merged = $self->merged) {
            # XXX _bigfloat!
            $oldest_allowed = min($epoch - $secs, $merged->{epoch});
        } else {
            # as long as we are not merged at all, no limits!
        }
      TRUNCATE: while (@$recent) {
            if ($recent->[-1]{epoch} < $oldest_allowed) { # XXX _bigfloatlt!
                pop @$recent;
            } else {
                last TRUNCATE;
            }
        }
        # remove older duplicates of this $path, irrespective of $type:
        $recent = [ grep { $_->{path} ne $path } @$recent ];

        unshift @$recent, { epoch => $epoch, path => $path, type => $type };
        $self->write_recent($recent);
        $self->_assert_symlink;
        $self->unlock;
    }
}

=head2 uptodate

True if this object has mirrored the complete interval covered by the
current recentfile.

*** WIP ***

=cut

sub uptodate {
    my($self, $debug) = @_;
    if ($self->ttl_reached){
        if ($debug) {
            warn "ttl_reached returned true, so we are not uptodate";
        }
        return 0 ;
    }

    # look if recentfile has unchanged timestamp
    my $minmax = $self->minmax;
    if (exists $minmax->{mtime}) {
        my $rfile = $self->_my_current_rfile;
        my @stat = stat $rfile;
        my $mtime = $stat[9];
        if ($mtime > $minmax->{mtime}) {
            if ($debug) {
                warn "$mtime > $minmax->{mtime}, so we are not uptodate";
            }
            return 0;
        } else {
            my $covered = $self->done->covered(@$minmax{qw(max min)});
            if ($debug) {
                warn "minmax covered[$covered], so we return that";
            }
            return $covered;
        }
    }
    if ($debug) {
        warn "fallthrough, so not uptodate";
    }
    return 0;
}

=head2 $obj->write_recent ($recent_files_arrayref)

Writes a I<recentfile> reflecting the current state of the tree,
limited by the current interval.

=cut

sub write_recent {
    my ($self,$recent) = @_;
    die "write_recent called without argument" unless defined $recent;
    my $meth = sprintf "write_%d", $self->protocol;
    $self->$meth($recent);
}

=head2 $obj->write_0 ($recent_files_arrayref)

Delegate of C<write_recent()> on protocol 0

=cut

sub write_0 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    YAML::Syck::DumpFile("$rfile.new",$recent);
    rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
}

=head2 $obj->write_1 ($recent_files_arrayref)

Delegate of C<write_recent()> on protocol 1

=cut

sub write_1 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    my $suffix = $self->serializer_suffix;
    my $data = {
                meta => $self->meta_data,
                recent => $recent,
               };
    my $serialized;
    if ($suffix eq ".yaml") {
        $serialized = YAML::Syck::Dump($data);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $serialized = $serializer->raw_serialize($data);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    open my $fh, ">", "$rfile.new" or die "Could not open >'$rfile.new': $!";
    print $fh $serialized;
    close $fh or die "Could not close '$rfile.new': $!";
    rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
}

BEGIN {
    my @pod_lines =
        split /\n/, <<'=cut'; %serializers = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }

=head1 THE ARCHITECTURE OF A COLLECTION OF RECENTFILES

The idea is that we want to have a short file that records really
recent changes, so that a fresh mirror can be kept fresh as long as
the connectivity is given. Then we want longer files that record the
history before. So when the mirror falls behind the update period
reflected in the shortest file, it can complement the list of recent
file events with the next one. And if this is not long enough we want
another one, again a bit longer. And we want one that completes the
history back to the oldest file. The index files do contain the
complete list of current files. The larger an index file is, the less
often it is updated. For practical reasons adjacent files will often
overlap a bit, but this is neither necessary nor enforced. That's the
basic idea. The following example represents a tree that has a few
updates every day:

    RECENT.recent -> RECENT-1h.yaml
    RECENT-6h.yaml
    RECENT-1d.yaml
    RECENT-1W.yaml
    RECENT-1M.yaml
    RECENT-1Q.yaml
    RECENT-1Y.yaml
    RECENT-Z.yaml

The first file is the principal file, in so far as it is the one that
is written first after a filesystem change. Usually a symlink links to
it with a filename that has the same filenameroot and the suffix
C<.recent>. On systems that do not support symlinks there is a plain
copy maintained instead.

The last file, the Z file, contains the complementary files that are
in none of the other files. It never contains C<deletes>. Besides
this it serves the role of a recovery mechanism or spill-over pond.
When things go wrong, it's a valuable controlling instance to hold the
differences between the collection of limited interval files and the
actual filesystem.
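
The following is only an illustrative sketch, not part of the API: a
slave that has fallen behind could walk from the smallest to the
larger I<recentfile>s until one of them reaches back past the last
epoch it knows it has mirrored (the construction mirrors the SYNOPSIS;
C<$last_mirrored_epoch> is a hypothetical bookkeeping variable of the
caller):

    for my $interval (qw(1h 6h 1d 1W 1M 1Q 1Y Z)) {
        my $rf = File::Rsync::Mirror::Recentfile->new
          (
           interval      => $interval,
           localroot     => "/home/ftp/pub/PAUSE/authors",
           remote_dir    => "",
           remote_host   => "pause.perl.org",
           remote_module => "authors",
          );
        my $events = $rf->recent_events; # fetches the remote recentfile in slave mode
        $rf->mirror;
        # stop as soon as this recentfile covers our gap
        last if @$events && $events->[-1]{epoch} <= $last_mirrored_epoch;
    }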

=head2 A SINGLE RECENTFILE

A I<recentfile> consists of a hash that has two keys: C<meta> and
C<recent>. The C<meta> part has metadata and the C<recent> part has a
list of fileobjects.

=head2 THE META PART

Here we find things that are pretty much self explanatory: all
lowercase attributes are accessors and as such are explained somewhere
above in this manpage. The uppercase attribute C<Producers> contains
version information about involved software components. Nothing to
worry about, I believe.

=head2 THE RECENT PART

This is the interesting part. Every entry refers to some filesystem
change (with path, epoch, type). The epoch value is the point in time
when some change was I<registered>. Do not be tempted to believe that
the entry has a direct relation to something like modification time or
change time on the filesystem level. The timestamp (the I<epoch>
element) is a floating point number and practically never corresponds
exactly to the data recorded in the filesystem but rather to the time
when some process succeeded in reporting to the I<recentfile> mechanism
that something has changed. This is why many parts of the code refer
to I<events>, because we merely try to record the I<event> of the
discovery of a change, not the time of the change itself.

All these entries can be divided into two types (denoted by the
C<type> attribute): C<new>s and C<delete>s. Changes and creations are
C<new>s. Deletes are C<delete>s.

Another distinction is between objects with an epoch timestamp and
objects without one. All files that already existed on the filesystem
before the I<recentfile> mechanism was installed are recorded with a
timestamp of zero.

Besides an C<epoch> and a C<type> attribute we find a third one:
C<path>. This path is relative to the directory we find the
I<recentfile> in.
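
As an illustration, a single entry of the C<recent> array could look
like this when written down as a Perl structure (the concrete values
are made up):

    {
      epoch => 1216706016.43456,   # floating point time of discovery
      path  => "id/A/AN/ANDK/CPAN-1.92_63.tar.gz",
      type  => "new",              # or "delete"
    }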

The order of the entries in the I<recentfile> is by decreasing epoch
attribute. These are either 0 or a unique floating point number. They
are zero for events that happened either before the I<recentfile>
mechanism was set up or that were left undiscovered for a while and
never handed over to update(). They are floating point numbers for all
events being regularly handed to update(). And when the server has ntp
running correctly, then the timestamps are actually decreasing and
unique.

=head1 CORRUPTION AND RECOVERY

If the origin host breaks the promise to deliver consistent and
complete I<recentfiles>, then the way back to sanity shall be achieved
through either the C<zloop> (still TBD) or traditional rsyncing
between the hosts. For example, if the origin server forgets to deploy
ntp and the clock on it jumps backwards some day, then this would
probably go unnoticed for a while and many software components that
rely on the time never running backwards will make wrong decisions.
After some time this accident would probably still be found in one of
the I<recentfiles> but would become meaningless as soon as a mirror
has run through the sanitizing procedures. The same goes for origin
hosts that forget to include or deliberately omit some files.

=head1 SERIALIZERS

The following suffixes are supported and trigger the use of these
serializers (see the example after the list):

=over 4

=item C<< ".yaml" => "YAML::Syck" >>

=item C<< ".json" => "JSON" >>

=item C<< ".sto" => "Storable" >>

=item C<< ".dd" => "Data::Dumper" >>

=back
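
For example (a hedged sketch, assuming Data::Serializer and the
respective backend module are installed), choosing one of the
alternative serializers is just a matter of the suffix handed to the
constructor:

    # Storable-based recentfile instead of the default YAML
    my $rf = File::Rsync::Mirror::Recentfile->new
      (
       interval          => q(6h),
       localroot         => "/home/ftp/pub/PAUSE/authors/",
       serializer_suffix => ".sto",
      );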

=cut

BEGIN {
    my @pod_lines =
        split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }

=head1 INTERVAL SPEC

An interval spec is a primitive way to express time spans. Normally it
is composed from an integer and a letter.

As a special case, a string that consists only of the single letter
C<Z> stands for unlimited time.

The following letters express the specified number of seconds (see the
example after the list):

=over 4

=item C<< s => 1 >>

=item C<< m => 60 >>

=item C<< h => 60*60 >>

=item C<< d => 60*60*24 >>

=item C<< W => 60*60*24*7 >>

=item C<< M => 60*60*24*30 >>

=item C<< Q => 60*60*24*90 >>

=item C<< Y => 60*60*24*365.25 >>

=back
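
A few illustrative conversions using interval_secs() (assuming an
object C<$rf> whose own interval is C<6h>):

    $rf->interval_secs;        # 21600, the object's own interval
    $rf->interval_secs("2d");  # 172800
    $rf->interval_secs("1W");  # 604800
    $rf->interval_secs("Z");   # MAX_INT, i.e. unlimited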

=cut

=head1 BACKGROUND

This is about speeding up rsync operation on large trees to many
places. Uses a small metadata cocktail and pull technology.

=head2 NON-COMPETITORS

 File::Mirror        JWU/File-Mirror/File-Mirror-0.10.tar.gz only local trees
 Mirror::YAML        ADAMK/Mirror-YAML-0.03.tar.gz           some sort of inner circle
 Net::DownloadMirror KNORR/Net-DownloadMirror-0.04.tar.gz    FTP sites and stuff
 Net::MirrorDir      KNORR/Net-MirrorDir-0.05.tar.gz         ditto
 Net::UploadMirror   KNORR/Net-UploadMirror-0.06.tar.gz      ditto
 Pushmi::Mirror      CLKAO/Pushmi-v1.0.0.tar.gz              something SVK

 rsnapshot           www.rsnapshot.org                       focus on backup
 csync               www.csync.org                           more like unison

=head2 COMPETITORS

The problem to solve which clusters and ftp mirrors and otherwise
replicated datasets like CPAN share: how to transfer only a minimum
amount of data to determine the diff between two hosts.

Normally it takes a long time to determine the diff itself before it
can be transferred. Known solutions at the time of this writing are
csync2, and rsync 3 batch mode.

For many years the best solution was csync2, which solves the
problem by maintaining a sqlite database on both ends and talking a
highly sophisticated protocol to quickly determine which files to send
and which to delete at any given point in time. Csync2 is often
inconvenient because the act of syncing demands quite an intimate
relationship between the sender and the receiver and suffers when the
number of syncing sites is large or connections are unreliable.

Rsync 3 batch mode works around these problems by providing rsync-able
batch files which allow receiving nodes to replay the history of the
other nodes. This reduces the need to have an incestuous relation but
it has the disadvantage that these batch files replicate the contents
of the involved files. This seems inappropriate when the nodes already
have a means of communicating over rsync.

rersyncrecent solves this problem with a couple of (usually 2-10)
index files which cover different overlapping time intervals. The
master writes these files and the clients can construct the full tree
from the information contained in them. The most recent index file
usually covers the last seconds or minutes or hours of the tree and,
depending on the needs, slaves can rsync every few seconds and then
bring their trees in full sync.

The rersyncrecent mode was developed for CPAN but I hope it is a
convenient and economic general purpose solution. I'm looking forward
to seeing a CPAN backbone that is only a few seconds behind PAUSE. And
then ... the first FUSE based CPAN filesystem anyone?

=head1 AUTHOR

Andreas König

=head1 BUGS

Please report any bugs or feature requests through the web interface at
L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
I will be notified, and then you'll automatically be notified of
progress on your bug as I make changes.

=head1 SUPPORT

You can find documentation for this module with the perldoc command.

    perldoc File::Rsync::Mirror::Recentfile

You can also look for information at:

=over 4

=item * RT: CPAN's request tracker

L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>

=item * AnnoCPAN: Annotated CPAN documentation

L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>

=item * CPAN Ratings

L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>

=item * Search CPAN

L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>

=back

=head1 ACKNOWLEDGEMENTS

Thanks to RJBS for module-starter.

=head1 COPYRIGHT & LICENSE

Copyright 2008 Andreas König.

This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.

=cut

1; # End of File::Rsync::Mirror::Recentfile

# Local Variables:
# mode: cperl
# cperl-indent-level: 4
# End: