again wrong algorithm
[rersyncrecent.git] / lib / File / Rsync / Mirror / Recentfile.pm
blobf78f4d46c657abc2fe40f01466ac77efd4925eae
1 package File::Rsync::Mirror::Recentfile;
3 # use warnings;
4 use strict;
6 =encoding utf-8
8 =head1 NAME
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
12 =head1 VERSION
14 Version 0.0.1
16 =cut
# Probe the optional dependencies once at load time. A successful
# require stores its true return value; a failing one leaves undef
# (the string eval swallows the error), so later code can consult
# $HAVE->{...} before relying on the module.
my $HAVE = {};
foreach my $optional (qw(Data::Serializer File::Rsync)) {
    $HAVE->{$optional} = eval qq{ require $optional; };
}
25 use Config;
26 use File::Basename qw(dirname fileparse);
27 use File::Copy qw(cp);
28 use File::Path qw(mkpath);
29 use File::Temp;
30 use List::Util qw(first min);
31 use Scalar::Util qw(reftype);
32 use Storable;
33 use Time::HiRes qw();
34 use YAML::Syck;
36 use version; our $VERSION = qv('0.0.1');
39 use constant MAX_INT => ~0>>1; # anything better?
40 use constant DEFAULT_PROTOCOL => 1;
42 # cf. interval_secs
43 my %seconds;
45 # maybe subclass if this mapping is bad?
46 my %serializers;
48 =head1 SYNOPSIS
50 B<!!!! PRE-ALPHA ALERT !!!!>
52 Nothing in here is believed to be stable, nothing yet intended for
53 public consumption. The plan is to provide a script in one of the next
54 releases that acts as a frontend for all the backend functionality.
55 Option and method names will very likely change.
57 For the rationale see the section BACKGROUND.
59 This is published only for developers of the (yet to be named)
60 script(s).
62 Writer (of a single file):
64 use File::Rsync::Mirror::Recentfile;
65 my $rf = File::Rsync::Mirror::Recentfile->new
67 interval => q(6h),
68 filenameroot => "RECENT",
69 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
70 localroot => "/home/ftp/pub/PAUSE/authors/",
71 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
73 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
75 Reader/mirrorer:
77 my $rf = File::Rsync::Mirror::Recentfile->new
79 filenameroot => "RECENT",
80 ignore_link_stat_errors => 1,
81 interval => q(6h),
82 localroot => "/home/ftp/pub/PAUSE/authors",
83 remote_dir => "",
84 remote_host => "pause.perl.org",
85 remote_module => "authors",
86 rsync_options => {
87 compress => 1,
88 'rsync-path' => '/usr/bin/rsync',
89 links => 1,
90 times => 1,
91 'omit-dir-times' => 1,
92 checksum => 1,
94 verbose => 1,
96 $rf->mirror;
98 Aggregator (usually the writer):
100 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
101 $rf->aggregate;
103 =head1 EXPORT
105 No exports.
107 =head1 CONSTRUCTORS
109 =head2 my $obj = CLASS->new(%hash)
111 Constructor. On every argument pair the key is a method name and the
112 value is an argument to that method name.
114 If a recentfile for this resource already exists, metadata that are
115 not defined by the constructor will be fetched from there as soon as
116 it is being read by recent_events().
118 =cut
sub new {
    my ($class, @args) = @_;
    my $obj = bless {}, $class;
    # Each key names a setter method and each value is its argument;
    # the pairs are applied in the order the caller gave them.
    while (my ($setter, $value) = splice @args, 0, 2) {
        $obj->$setter($value);
    }
    # Fill in whatever the caller left undefined.
    my @fallbacks = (
        [ protocol          => DEFAULT_PROTOCOL ],
        [ filenameroot      => "RECENT" ],
        [ serializer_suffix => ".yaml" ],
    );
    for my $pair (@fallbacks) {
        my ($accessor, $default) = @$pair;
        $obj->$accessor($default) unless defined $obj->$accessor;
    }
    return $obj;
}
139 =head2 my $obj = CLASS->new_from_file($file)
141 Constructor. $file is a I<recentfile>.
143 =cut
sub new_from_file {
    my($class, $file) = @_;
    my $self = bless {}, $class;
    $self->_rfile($file);
    #?# $self->lock;
    my $serialized = do {
        # three-argument open: the filename can never be taken as a mode
        open my $fh, '<', $file or die "Could not open '$file': $!";
        local $/;
        <$fh>;
    };
    # XXX: we can skip this step when the metadata are sufficient, but
    # we cannot parse the file without some magic stuff about
    # serialized formats
    # Follow symlinks (e.g. RECENT.recent) to the real file, whose name
    # carries the serializer suffix.
    while (-l $file) {
        my(undef, $path) = fileparse $file;
        my $symlink = readlink $file;
        defined $symlink or die "Could not readlink '$file': $!";
        if ($symlink =~ m|/|) {
            die "FIXME: filenames containing '/' not supported, got $symlink";
        }
        $file = File::Spec->catfile ( $path, $symlink );
    }
    my(undef, $path, $suffix) = fileparse $file, keys %serializers;
    # validate before storing anything derived from the suffix
    die "Could not determine file format from suffix" unless $suffix;
    $self->serializer_suffix($suffix);
    $self->localroot($path);
    my $deserialized;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        $deserialized = YAML::Syck::LoadFile($file);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $deserialized = $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    # Every lowercase meta key is replayed through its setter; mixed-case
    # keys such as "Producers" are informational only.
    my $meta = $deserialized->{meta} || {};
    for my $k (keys %$meta) {
        next if $k ne lc $k; # "Producers"
        $self->$k($meta->{$k});
    }
    unless (defined $self->protocol) {
        $self->protocol(DEFAULT_PROTOCOL);
    }
    return $self;
}
190 =head1 ACCESSORS
192 =cut
my @accessors;

BEGIN {
    # Internal accessors; not part of the documented interface.
    @accessors = qw(
        _current_tempfile
        _current_tempfile_fh
        _done
        _interval
        _is_locked
        _localroot
        _merged
        _pathdb
        _remote_dir
        _remoteroot
        _rfile
        _rsync
        _use_tempfile
    );

    # The public accessors are harvested from the =item lines of the POD
    # below. That POD doubles as the body of this here-document, so the
    # documentation and the generated methods cannot drift apart.
    my $pod = <<'=cut';

=over 4

=item aggregator

A list of interval specs that tell the aggregator which I<recentfile>s
are to be produced.

=item canonize

The name of a method to canonize the path before rsyncing. Only
supported value is C<naive_path_normalize>. Defaults to that.

=item comment

A comment about this tree and setup.

=item filenameroot

The (prefix of the) filename we use for this I<recentfile>. Defaults to
C<RECENT>.

=item have_mirrored

Timestamp remembering when we mirrored this recentfile the last time.
Only relevant for slaves.

=item ignore_link_stat_errors

If set to true, rsync errors are ignored that complain about link stat
errors. These seem to happen only when there are files missing at the
origin. In race conditions this can always happen, so it is
recommended to set this value to true.

=item is_slave

If set to true, this object will fetch a new recentfile from remote
when the timespan between the last mirror (see have_mirrored) and now
is too large (currently hardcoded arbitrary 420 seconds).

=item locktimeout

After how many seconds shall we die if we cannot lock a I<recentfile>?
Defaults to 600 seconds.

=item loopinterval

When mirror_loop is called, this accessor can specify how much time
every loop shall at least take. If the work of a loop is done before
that time has gone, sleeps for the rest of the time. Defaults to
arbitrary 42 seconds.

=item max_files_per_connection

Maximum number of files that are transferred on a single rsync call.
Setting it higher means higher performance at the price of holding
connections longer and potentially disturbing other users in the pool.
Defaults to the arbitrary value 42.

=item max_rsync_errors

When rsync operations encounter that many errors without any resetting
success in between, then we die. Defaults to arbitrary 12. A value of
-1 means we run forever ignoring all rsync errors.

=item minmax

Hashref remembering when we read the recent_events from this file the
last time and what the timespan was.

=item protocol

When the RECENT file format changes, we increment the protocol. We try
to support older protocols in later releases.

=item remote_host

The host we are mirroring from. Leave empty for the local filesystem.

=item remote_module

Rsync servers have so called modules to separate directory trees from
each other. Put here the name of the module under which we are
mirroring. Leave empty for local filesystem.

=item rsync_options

Things like compress, links, times or checksums. Passed in to the
File::Rsync object used to run the mirror.

=item serializer_suffix

Mostly untested accessor. The only well tested format for
I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
Data::Serializer. But in principle other formats are supported as
well. See section SERIALIZERS below.

=item sleep_per_connection

Sleep that many seconds (floating point OK) after every chunk of rsyncing
has finished. Defaults to arbitrary 0.42.

=item ttl

Time to live. Number of seconds after which this recentfile must be
fetched again from the origin server. Only relevant for slaves.
Defaults to arbitrary 24.2 seconds.

=item verbose

Boolean to turn on a bit verbosity.

=back

=cut
    for my $pod_line (split /\n/, $pod) {
        push @accessors, $1 if $pod_line =~ /^=item\s+(.*)/;
    }
}

use accessors @accessors;
333 =head1 METHODS
335 =head2 (void) $obj->aggregate
337 Takes all intervals that are collected in the accessor called
338 aggregator. Sorts them by actual length of the interval.
339 Removes those that are shorter than our own interval. Then merges this
340 object into the next larger object. The merging continues upwards
341 as long as the next I<recentfile> is old enough to warrant a merge.
343 Whether a merge is warranted is decided by comparing the age of the next
344 I<recentfile> with the interval of the previous one, so that larger files
345 are updated less often than smaller ones.
347 Here is an example to illustrate the behaviour. Given aggregators
349 1h 1d 1W 1M 1Q 1Y Z
351 then
353 1h updates 1d on every call to aggregate()
354 1d updates 1W earliest after 1h
355 1W updates 1M earliest after 1d
356 1M updates 1Q earliest after 1W
357 1Q updates 1Y earliest after 1M
358 1Y updates Z earliest after 1Q
360 Note that all but the smallest recentfile get updated at an arbitrary
361 rate and as such are quite useless on their own.
363 =cut
sub aggregate {
    my($self) = @_;
    my $own_secs = $self->interval_secs;
    # Candidates are our own interval followed by the configured
    # aggregators. Because our own interval comes first, a repetition of
    # it in the aggregator list (or another spec of the same length) is
    # what gets dropped: two files of equal length would make merge()
    # die with "illegal merge operation".
    my %seen_secs;
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
               grep { $_->{secs} >= $own_secs && !$seen_secs{$_->{secs}}++ }
               map  { +{ interval => $_, secs => $self->interval_secs($_) } }
               $self->interval, @{$self->aggregator || []};
    # all remaining candidates are strictly longer, so $aggs[0] is us
    $aggs[0]{object} = $self;
  AGGREGATOR: for my $i (0..$#aggs-1) {
        my $this = $aggs[$i]{object};
        my $next = $this->_sparse_clone;
        $next->interval($aggs[$i+1]{interval});
        my $want_merge = 0;
        if ($i == 0) {
            # the smallest file is merged upwards on every call
            $want_merge = 1;
        } else {
            my $next_rfile = $next->rfile;
            if (-e $next_rfile) {
                # merge only when the target is older than the interval
                # of the recentfile one level below $this
                my $prev = $aggs[$i-1]{object};
                my $next_age = time - (stat _)[9];
                $want_merge = 1 if $next_age > $prev->interval_secs;
            } else {
                # target does not exist yet: create it
                $want_merge = 1;
            }
        }
        last AGGREGATOR unless $want_merge;
        $next->merge($this);
        $aggs[$i+1]{object} = $next;
    }
}
# Debugging aid: for our own interval and every aggregator interval,
# ordered by length, report the recentfile path together with its size
# and mtime (both undef when the file cannot be stat()ed).
sub _debug_aggregate {
    my ($self) = @_;
    my @intervals = map  { $_->[0] }
                    sort { $a->[1] <=> $b->[1] }
                    map  { [ $_, $self->interval_secs($_) ] }
                    $self->interval, @{ $self->aggregator || [] };
    my @report;
    for my $interval (@intervals) {
        my $clone = Storable::dclone($self);
        $clone->interval($interval);
        my $rfile = $clone->rfile;
        my ($size, $mtime) = (stat $rfile)[7, 9];
        push @report, { rfile => $rfile, size => $size, mtime => $mtime };
    }
    return \@report;
}
# (void) $self->_assert_symlink()
#
# Makes "<localroot>/<filenameroot>.recent" refer to our rfilename: a
# symlink whose target is rfilename as-is where the platform supports
# symlinks, otherwise a copy of the file. Anything already sitting at
# that name (a stale symlink or a plain file from the copy fallback) is
# replaced via a temporary name plus rename().
sub _assert_symlink {
    my($self) = @_;
    my $rfilename = $self->rfilename;
    my $recentrecentfile = File::Spec->catfile
        (
         $self->localroot,
         sprintf("%s.recent", $self->filenameroot),
        );
    my $tmp = "$recentrecentfile.$$";
    if ($Config{d_symlink} eq "define") {
        if (-l $recentrecentfile) {
            my $found_symlink = readlink $recentrecentfile;
            return if defined $found_symlink && $found_symlink eq $rfilename;
        }
        if (-l $recentrecentfile or -e _) {
            # something is in the way; symlink() would fail with EEXIST
            unlink $tmp; # may fail
            symlink $rfilename, $tmp or die "Could not create symlink '$tmp': $!";
            rename $tmp, $recentrecentfile or die "Could not rename '$tmp' to $recentrecentfile: $!";
        } else {
            symlink $rfilename, $recentrecentfile or die "Could not create symlink '$recentrecentfile': $!";
        }
    } else {
        warn "Warning: symlinks not supported on this system, doing a copy instead\n";
        # rfilename is a bare filename inside localroot, not relative to
        # the current working directory
        my $source = File::Spec->catfile($self->localroot, $rfilename);
        unlink $tmp; # may fail
        cp $source, $tmp or die "Could not copy to '$tmp': $!";
        rename $tmp, $recentrecentfile or die "Could not rename '$tmp' to $recentrecentfile: $!";
    }
}
457 =head2 $done = $obj->done
459 $done is a reference to a File::Rsync::Mirror::Recentfile::Done object
460 that keeps track of rsync activities. Only used/needed when we are a
461 mirroring slave.
463 =cut
sub done {
    my ($self) = @_;
    my $tracker = $self->_done;
    # create the Done tracker lazily on first use and cache it
    return $tracker if $tracker;
    require File::Rsync::Mirror::Recentfile::Done;
    $tracker = File::Rsync::Mirror::Recentfile::Done->new();
    $self->_done($tracker);
    return $tracker;
}
476 =head2 $success = $obj->full_mirror
478 (TBD) Mirrors the whole remote site, starting with the smallest I<recentfile>,
479 switching to larger ones ...
481 =cut
# Placeholder for mirroring the complete remote tree; it always dies.
sub full_mirror {
    die "FIXME: Not yet implemented";
}
488 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ($rfilename)
490 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
492 Stores the remote I<recentfile> locally as a tempfile. $rfilename must
493 be a plain filename without path separators. The second form fetches
494 the file with the default name. The caller is responsible for removing
495 the file after use.
497 Note: if you're intending to act as an rsync server for other slaves,
498 then you must prefer this method to mirror (and read) recentfiles over
499 get_remotefile(). Otherwise downstream mirrors would expect you to
500 have files that you do not have yet.
502 Note: currently we have an arbitrary brake built into the method:
503 before 4.42 seconds are over since the last download we will return
504 without downloading. XXX
506 =cut
sub get_remote_recentfile_as_tempfile {
    my ($self, $rfilename) = @_;
    mkpath $self->localroot;
    my $fh;
    if ($rfilename) {
        $self->_use_tempfile(1); # why?
    }
    elsif ($self->_use_tempfile()) {
        # the tempfile we already have has not outlived its ttl yet
        return $self->_current_tempfile unless $self->ttl_reached;
        $fh        = $self->_current_tempfile_fh;
        $rfilename = $self->rfilename;
    }
    else {
        $rfilename = $self->rfilename;
    }

    # XXX arbitrary brake: skip downloading when we mirrored less than
    # 4.42 seconds ago. Only reachable when rfilename() gave a false value.
    return $rfilename
        if !$rfilename
        && $self->have_mirrored
        && Time::HiRes::time() - $self->have_mirrored < 4.42;
    die "Alert: illegal filename[$rfilename] contains a slash" if $rfilename =~ m|/|;

    my $dst;
    if ($fh) {
        $dst = $self->_current_tempfile;
    }
    else {
        $fh = File::Temp->new(
            TEMPLATE => sprintf(".%s-XXXX", $rfilename),
            DIR      => $self->localroot,
            SUFFIX   => $self->serializer_suffix,
            UNLINK   => $self->_use_tempfile,
        );
        # keep the object referenced so File::Temp does not delete it yet
        $self->_current_tempfile_fh($fh) if $self->_use_tempfile;
        $dst = $fh->filename;
        $self->_current_tempfile($dst);
        # may fail (RECENT.recent has no rfile)
        my $rfile = eval { $self->rfile };
        # saving on bandwidth by starting from our local copy. Might need
        # to be configurable: $self->bandwidth_is_cheap?
        if (defined $rfile && -e $rfile) {
            cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!";
        }
    }

    my $src = join "/", $self->remoteroot, $rfilename;
    if ($self->verbose) {
        my $doing = -e $dst ? "Syncing" : "Getting";
        printf STDERR "%s (1/1) temporary %s ... ", $doing, $dst;
    }
    # up to three rsync attempts, each failure is registered
    my ($attempts, $gaveup) = (0, 0);
    until ($self->rsync->exec(src => $src, dst => $dst)) {
        $self->register_rsync_error($self->rsync->err);
        next if ++$attempts < 3;
        warn "XXX giving up";
        $gaveup = 1;
        last;
    }
    if (!$gaveup) {
        $self->have_mirrored(Time::HiRes::time);
        $self->un_register_rsync_error();
    }
    print STDERR "DONE\n" if $self->verbose;
    my $mode = 0644;
    chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
    return $dst;
}
590 =head2 $localpath = $obj->get_remotefile ( $relative_path )
592 Rsyncs one single remote file to local filesystem.
594 Note: no locking is done on this file. Any number of processes may
595 mirror this object.
597 Note II: do not use for recentfiles. If you are a cascading
598 slave/server combination, it would confuse other slaves. They would
599 expect the contents of these recentfiles to be available. Use
600 get_remote_recentfile_as_tempfile() instead.
602 =cut
sub get_remotefile {
    my ($self, $path) = @_;
    my $target = File::Spec->catfile($self->localroot, $path);
    mkpath dirname $target;
    if ($self->verbose) {
        printf STDERR "%s (1/1) %s ... ",
            (-e $target ? "Syncing" : "Getting"), $path;
    }
    # Retry until rsync succeeds; whether we ever give up is left to
    # register_rsync_error() (cf. the max_rsync_errors accessor).
    until ($self->rsync->exec(
        src => join("/", $self->remoteroot, $path),
        dst => $target,
    )) {
        $self->register_rsync_error($self->rsync->err);
    }
    $self->un_register_rsync_error();
    print STDERR "DONE\n" if $self->verbose;
    return $target;
}
632 =head2 $obj->interval ( $interval_spec )
634 Get/set accessor. $interval_spec is a string and described below in
635 the section INTERVAL SPEC.
637 =cut
sub interval {
    my ($self, $interval) = @_;
    if (@_ > 1) {
        # the cached _rfile may depend on the interval, so drop it
        $self->_interval($interval);
        $self->_rfile(undef);
    }
    my $current = $self->_interval;
    return $current if defined $current;
    # do not ask the $self too much, it recurses!
    require Carp;
    Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
}
654 =head2 $secs = $obj->interval_secs ( $interval_spec )
656 $interval_spec is described below in the section INTERVAL SPEC. If
657 empty defaults to the inherent interval for this object.
659 =cut
sub interval_secs {
    my ($self, $interval) = @_;
    # an empty argument means "our own interval"
    $interval = $self->interval unless $interval;
    defined $interval
        or die "interval_secs() called without argument on an object without a declared one";
    # a spec is an optional count followed by a single unit letter
    my ($count, $unit) = $interval =~ /^(\d*)([smhdWMQYZ]$)/
        or die "Could not determine seconds from interval[$interval]";
    return MAX_INT if $interval eq "Z";
    die "Invalid interval specification: n[$count]t[$unit]"
        unless exists $seconds{$unit} and $count =~ /^\d+$/;
    return $seconds{$unit} * $count;
}
678 =head2 $obj->localroot ( $localroot )
680 Get/set accessor. The local root of the tree.
682 =cut
sub localroot {
    my ($self, $localroot) = @_;
    if (@_ > 1) {
        # the cached _rfile may depend on the root, so drop it
        $self->_localroot($localroot);
        $self->_rfile(undef);
    }
    my $root = $self->_localroot;
    return $root;
}
693 =head2 $ret = $obj->local_path($path_found_in_recentfile)
695 Combines the path to our local mirror and the path of an object found
696 in this I<recentfile>. In other words: the target of a mirror operation.
698 Implementation note: We split on slashes and then use
699 File::Spec::catfile to adjust to the local operating system.
701 =cut
sub local_path {
    my ($self, $path) = @_;
    # seems like a degenerated case
    return $self->localroot unless defined $path;
    # recentfile paths use "/" regardless of the local operating system
    return File::Spec->catfile($self->localroot, split m|/|, $path);
}
713 =head2 (void) $obj->lock
715 Locking is implemented with an C<mkdir> on a locking directory
716 (C<.lock> appended to $rfile).
718 =cut
sub lock {
    my ($self) = @_;
    # not using flock because it locks on filehandles instead of
    # old school resources.
    return if $self->_is_locked;
    my $rfile = $self->rfile;
    my $lockdir = "$rfile.lock";
    # XXX need a way to allow breaking the lock
    my $start = time;
    my $locktimeout = $self->locktimeout || 600;
    until (mkdir $lockdir) {
        # read errno right away; sleeping may overwrite $!
        my $err = "$!";
        # Only an existing lock directory can disappear if we wait. Any
        # other failure (missing parent directory, no permission) is
        # permanent, so report it at once instead of spinning until the
        # timeout.
        if (!$!{EEXIST} or time - $start > $locktimeout) {
            die "Could not acquire lockdirectory '$lockdir': $err";
        }
        Time::HiRes::sleep 0.01;
    }
    $self->_is_locked (1);
}
738 =head2 (void) $obj->merge ($other)
740 Bulk update of this object with another one. It's used to merge a
741 smaller and younger $other object into the current one. If this file
742 is a C<Z> file, then we do not merge in objects of type C<delete>. But
743 if we encounter an object of type delete we delete the corresponding
744 C<new> object.
746 If there is nothing to be merged, nothing is done.
748 =cut
sub merge {
    my($self, $other) = @_;
    # Validate before taking any lock. lock() creates lock directories,
    # and dying while holding them would leave them behind, blocking
    # every later lock() until its locktimeout.
    if ($self->interval_secs <= $other->interval_secs) {
        die sprintf
            (
             "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
             $self->interval_secs,
             $other->interval_secs,
            );
    }
    $other->lock;
    my $self_locked = 0;
    my $ok = eval {
        my $other_recent = $other->recent_events || [];
        $self->lock;
        $self_locked = 1;
        $self->_merge_under_lock($other, $other_recent);
        1;
    };
    my $err = $@;
    # release in the original order (ourselves first), even after errors
    $self->unlock if $self_locked;
    $other->unlock;
    die($err || "merge failed for an unknown reason\n") unless $ok;
    return;
}

# Body of merge(), run while both $self and $other are locked. Folds the
# events of $other into $self, drops events of $self that fell out of
# the allowed time span, and rewrites both recentfiles when anything
# changed.
sub _merge_under_lock {
    my($self, $other, $other_recent) = @_;
    my $my_recent = $self->recent_events || [];

    # calculate the target time span
    my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
    my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
    my $oldest_allowed = 0;
    # an empty target always has to be written out (obstetrics)
    my $something_done = $my_recent->[0] ? 0 : 1;
    if ($epoch) {
        if (my $merged = $self->merged) {
            my $secs = $self->interval_secs();
            $oldest_allowed = min($epoch - $secs, $merged->{epoch});
        }
        # throw away outsiders
        # XXX _bigfloat!
        while (@$my_recent && $my_recent->[-1]{epoch} < $oldest_allowed) {
            pop @$my_recent;
            $something_done = 1;
        }
    }

    my %have;
    my $recent = [];
    for my $oev (@$other_recent) {
        my $oevepoch = $oev->{epoch} || 0;
        next if $oevepoch < $oldest_allowed;
        my $path = $oev->{path};
        next if $have{$path}++;
        # The Z file keeps no delete events. %have still marks the path,
        # so its older entries in $my_recent are dropped below.
        next if $self->interval eq "Z" and $oev->{type} eq "delete";
        $something_done = 1 if !$myepoch || $oevepoch > $myepoch;
        push @$recent, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
    }
    if ($something_done) {
        push @$recent, grep { !$have{$_->{path}}++ } @$my_recent;
        $self->write_recent($recent);
        $other->merged({
                        time => Time::HiRes::time, # not used anywhere
                        epoch => $epoch, # used in oldest_allowed
                        into_interval => $self->interval, # not used anywhere
                       });
        $other->write_recent($other_recent);
    }
}
821 =head2 merged
823 Hashref recording when (C<time>) this recentfile was merged into another
824 one, up to which C<epoch>, and into which interval (C<into_interval>).
826 =cut
sub merged {
    my ($self, $set) = @_;
    $self->_merged($set) if defined $set;
    my $merged = $self->_merged;
    # sanity checks need both the target interval and our own one
    return $merged unless $merged;
    my $into = $merged->{into_interval};
    return $merged unless $into && defined $self->_interval;
    if ($into eq $self->interval) {
        warn sprintf(
            "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
            $into,
            $self->interval,
        );
    }
    elsif ($self->interval_secs($into) < $self->interval_secs) {
        warn sprintf(
            "Warning: into_interval[%s] smaller than own interval[%s] on interval[%s]. Danger ahead.",
            $self->interval_secs($into),
            $self->interval_secs,
            $self->interval,
        );
    }
    return $merged;
}
856 =head2 $hashref = $obj->meta_data
858 Returns the hashref of metadata that the server has to add to the
859 I<recentfile>.
861 =cut
sub meta_data {
    my ($self) = @_;
    # Updated in place when $self->{meta} exists, otherwise a fresh hash
    # is built; every defined setting below is copied into it.
    my $meta = $self->{meta};
    foreach my $field (qw(
        aggregator
        canonize
        comment
        filenameroot
        merged
        interval
        protocol
        serializer_suffix
    )) {
        my $value = $self->$field;
        next unless defined $value;
        $meta->{$field} = $value;
    }
    # XXX need to reset the Producer if I am a writer, keep it when I
    # am a reader
    $meta->{Producers} ||= {
        __PACKAGE__, "$VERSION", # stringified it looks better
        '$0',        $0,
        'time',      Time::HiRes::time,
    };
    return $meta;
}
891 =head2 $success = $obj->mirror ( %options )
893 Mirrors the files in this I<recentfile> as reported by
894 C<recent_events>. Options named C<after>, C<before>, C<max>, and
895 C<skip-deletes> are passed through to the L<recent_events> call. The
896 boolean option C<piecemeal>, if true, causes C<mirror> to only rsync
897 C<max_files_per_connection> and keep track of the rsynced files so
898 that future calls will rsync different files until all files are
899 brought to sync.
901 =cut
sub mirror {
    my ($self, %options) = @_;
    my $trecentfile = $self->get_remote_recentfile_as_tempfile();
    $self->_use_tempfile(1);
    # only the filtering options are handed on to recent_events()
    my @passthrough_keys = qw(before after max skip-deletes);
    my %passthrough;
    @passthrough{@passthrough_keys} = @options{@passthrough_keys};
    my ($recent_events) = $self->recent_events(%passthrough);
    my (@error, @xcollector);
    my $last_item = $#$recent_events;
    my $done      = $self->done;
    my $pathdb    = $self->_pathdb;
    for my $i (0 .. $last_item) {
        my %status;
        $self->_mirror_item($i, $recent_events, $last_item, $done, $pathdb,
                            \@xcollector, \%options, \%status, \@error);
        # piecemeal: stop after one batch, keeping the tempfile state
        return if $status{mustreturn};
    }
    if (@xcollector) {
        # flush the last, incomplete batch
        my $success = eval { $self->_empty_xcollector(\@xcollector, $pathdb, $recent_events) };
        if (!$success || $@) {
            warn "Warning: Unknown error while mirroring: $@";
            push @error, $@;
            sleep 1;
        }
        print STDERR "DONE\n" if $self->verbose;
    }
    # the downloaded recentfile becomes our local copy
    my $rfile = $self->rfile;
    if (!rename($trecentfile, $rfile)) {
        require Carp;
        Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
    }
    $self->_use_tempfile(0);
    my $ctfh = $self->_current_tempfile_fh;
    if ($ctfh) {
        $ctfh->unlink_on_destroy(0);
        $self->_current_tempfile_fh(undef);
    }
    return !@error;
}
# Handles event number $i of $recent_events. The event is skipped when
# the Done tracker already covers its epoch, or when the path database
# holds a larger recentepoch for the path (compared with
# Done::_bigfloatgt). Otherwise a "new" event is queued for rsync via
# _mirror_item_new() and a "delete" event is carried out locally.
sub _mirror_item {
    my($self,
       $i,
       $recent_events,
       $last_item,
       $done,
       $pathdb,
       $xcollector,
       $options,
       $status,
       $error,
      ) = @_;
    my $recent_event = $recent_events->[$i];
    return if $done->covered ( $recent_event->{epoch} );
    if ($pathdb) {
        my $rec = $pathdb->{$recent_event->{path}};
        if ($rec && $rec->{recentepoch}) {
            if (File::Rsync::Mirror::Recentfile::Done::_bigfloatgt
                ( $rec->{recentepoch}, $recent_event->{epoch} )){
                $done->register ($recent_events, [$i]);
                return;
            }
        }
    }
    my $dst = $self->local_path($recent_event->{path});
    if ($recent_event->{type} eq "new"){
        $self->_mirror_item_new
            (
             $dst,
             $i,
             $last_item,
             $recent_events,
             $recent_event,
             $xcollector,
             $pathdb,
             $status,
             $error,
             $options,
            );
    } elsif ($recent_event->{type} eq "delete") {
        my $activity;
        if ($options->{'skip-deletes'}) {
            $activity = "skipp";
        } else {
            # -l uses lstat, so a symlink is removed, never followed
            my $is_symlink = -l $dst;
            if (!$is_symlink && !-e _) {
                # Nothing left on disk (e.g. removed by an earlier run):
                # the delete is already in effect, so do not complain.
            } elsif ($is_symlink or not -d _) {
                unless (unlink $dst) {
                    require Carp;
                    Carp::cluck ( "Warning: Error while unlinking '$dst': $!" );
                }
            } else {
                unless (rmdir $dst) {
                    require Carp;
                    Carp::cluck ( "Warning: Error on rmdir '$dst': $!" );
                }
            }
            $activity = "delet";
        }
        $done->register ($recent_events, [$i]);
        if ($pathdb) {
            $self->_register_path($pathdb,[$recent_event],$activity);
        }
    } else {
        warn "Warning: invalid upload type '$recent_event->{type}'";
    }
}
# Queues the "new" event $recent_event (number $i) for rsync. Once
# max_files_per_connection paths are collected, the batch is transferred
# with _empty_xcollector(). Failures are recorded in @$error. In
# piecemeal mode the caller is told to stop ($status->{mustreturn}).
sub _mirror_item_new {
    my($self,
       $dst,
       $i,
       $last_item,
       $recent_events,
       $recent_event,
       $xcollector,
       $pathdb,
       $status,
       $error,
       $options,
      ) = @_;
    if ($self->verbose) {
        my $doing = -e $dst ? "Syncing" : "Getting";
        printf STDERR
            (
             "%s (%d/%d/%s) %s ... ",
             $doing,
             1+$i,
             1+$last_item,
             $self->interval,
             $recent_event->{path},
            );
    }
    my $max_files_per_connection = $self->max_files_per_connection || 42;
    if ($self->verbose) {
        print STDERR "\n";
    }
    push @$xcollector, { rev => $recent_event, i => $i };
    # keep collecting until one connection's worth of files is queued
    return if @$xcollector < $max_files_per_connection;

    my $success = eval { $self->_empty_xcollector ($xcollector,$pathdb,$recent_events); };
    # capture the error at once so the calls below cannot hide it
    my $err = $@;
    my $sleep = $self->sleep_per_connection;
    $sleep = 0.42 unless defined $sleep;
    Time::HiRes::sleep $sleep;
    # report failures before the piecemeal return so that a failed batch
    # is never dropped silently
    if (!$success || $err) {
        warn "Warning: Error while mirroring: $err";
        push @$error, $err;
        sleep 1;
    }
    if ($options->{piecemeal}) {
        $status->{mustreturn} = 1;
        return;
    }
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
}
# Transfers every queued event of @$xcoll with a single mirror_path()
# call. It then records the events in the path db (if any) and in the
# Done tracker, empties the queue, and returns mirror_path()'s result.
sub _empty_xcollector {
    my ($self, $xcoll, $pathdb, $recent_events) = @_;
    my @events  = map { $_->{rev} } @$xcoll;
    my @indices = map { $_->{i} }   @$xcoll;
    my $success = $self->mirror_path([ map { $_->{path} } @events ]);
    $self->_register_path($pathdb, \@events, "rsync") if $pathdb;
    $self->done->register($recent_events, \@indices);
    @$xcoll = ();
    return $success;
}
# For every event in $coll, stores in the path database $db the event's
# epoch and the time we acted on it. The timestamp key is derived from
# $act: "rsync" -> "rsyncedon", "delet" -> "deletedon",
# "skipp" -> "skippedon".
sub _register_path {
    my ($self, $db, $coll, $act) = @_;
    my $stamp_key = $act . "edon";
    my $now = time;
    for my $event (@$coll) {
        $db->{ $event->{path} } = {
            recentepoch => $event->{epoch},
            $stamp_key  => $now,
        };
    }
}
1096 =head2 (void) $obj->mirror_loop
1098 Run mirror in an endless loop. See the accessor C<loopinterval>. XXX
1099 What happens/should happen if we miss the interval during a single loop?
1101 =cut
sub mirror_loop {
    my($self) = @_;
    my $Signal = 0;
    # localized so that the caller's INT handling is restored on return
    local $SIG{INT} = sub { $Signal++ };
    my $loopinterval = $self->loopinterval || 42;
    my $after = -999999999;
  LOOP: while () {
        # Measured per round: with a single start time taken before the
        # loop, the sleep below would stop happening after the first round.
        my $iteration_start = time;
        # mirror() takes named options
        $self->mirror(after => $after);
        last LOOP if $Signal;
        my $re = $self->recent_events;
        # keep the previous threshold when no event could be read
        if ($re && @$re && defined $re->[0]{epoch}) {
            $after = $re->[0]{epoch};
        }
        if ($self->verbose) {
            local $| = 1;
            print "($after)";
        }
        my $elapsed = time - $iteration_start;
        if ($elapsed < $loopinterval) {
            sleep $loopinterval - $elapsed;
        }
        # an INT cuts the sleep short; do not start another round then
        last LOOP if $Signal;
        if ($self->verbose) {
            local $| = 1;
            print "~";
        }
    }
}
1130 =head2 $success = $obj->mirror_path ( $arrref | $path )
1132 If the argument is a scalar it is treated as a path. The remote path
1133 is mirrored into the local copy. $path is the path found in the
1134 I<recentfile>, i.e. it is relative to the root directory of the
1135 mirror.
1137 If the argument is an array reference then all elements are treated as
1138 a path below the current tree and all are rsynced with a single
1139 command (and a single connection).
1141 =cut
sub mirror_path {
    my($self,$path) = @_;
    # XXX simplify the two branches such that $path is treated as
    # [$path] maybe even demand the argument as an arrayref to
    # simplify docs and code. (rsync-over-recentfile-2.pl uses the
    # interface)
    if (ref $path and ref $path eq "ARRAY") {
        # Batch mode: one rsync run fed through --files-from, with at
        # most three attempts.
        my $dst = $self->localroot;
        mkpath dirname $dst;
        my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
                                                      lc $self->filenameroot,
                                                     ),
                                  TMPDIR => 1,
                                  UNLINK => 0,
                                 );
        for my $p (@$path) {
            print $fh $p, "\n";
        }
        $fh->flush;
        $fh->unlink_on_destroy(1);
        my $gaveup = 0;
        my $retried = 0;
        while (!$self->rsync->exec
               (
                src => join("/",
                            $self->remoteroot,
                           ),
                dst => $dst,
                'files-from' => $fh->filename,
               )) {
            my($err) = $self->rsync->err;
            if ($self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x ) {
                if ($self->verbose) {
                    warn "Info: ignoring link_stat error '$err'";
                }
                return 1;
            }
            $self->register_rsync_error ($err);
            if (++$retried >= 3) {
                warn "XXX giving up.";
                $gaveup = 1;
                last;
            }
        }
        # Report the failure: _empty_xcollector() passes our return value
        # on, and _mirror_item_new() treats a false one as an error.
        return 0 if $gaveup;
        $self->un_register_rsync_error ();
    } else {
        # Single path: retry until rsync succeeds (giving up is left to
        # register_rsync_error(), cf. max_rsync_errors).
        my $dst = $self->local_path($path);
        mkpath dirname $dst;
        while (!$self->rsync->exec
               (
                src => join("/",
                            $self->remoteroot,
                            $path
                           ),
                dst => $dst,
               )) {
            my($err) = $self->rsync->err;
            if ($self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x ) {
                if ($self->verbose) {
                    warn "Info: ignoring link_stat error '$err'";
                }
                return 1;
            }
            $self->register_rsync_error ($err);
        }
        $self->un_register_rsync_error ();
    }
    return 1;
}
# The file recent_events() should read: the downloaded tempfile while
# one is in use, our regular rfile otherwise.
sub _my_current_rfile {
    my ($self) = @_;
    my $current = $self->_use_tempfile ? $self->_current_tempfile : $self->rfile;
    return $current;
}
1226 =head2 $path = $obj->naive_path_normalize ($path)
1228 Takes an absolute unix style path as argument and canonicalizes it to
1229 a shorter path if possible, removing things like double slashes or
1230 C</./> and removes references to C<../> directories to get a shorter
1231 unambiguos path. This is used to make the code easier that determines
1232 if a file passed to C<upgrade()> is indeed below our C<localroot>.
1234 =cut
1236 sub naive_path_normalize {
1237 my($self,$path) = @_;
1238 $path =~ s|/+|/|g;
1239 1 while $path =~ s|/[^/]+/\.\./|/|;
1240 $path =~ s|/$||;
1241 $path;
1244 =head2 $ret = $obj->read_recent_1 ( $data )
1246 Delegate of C<recent_events()> on protocol 1
1248 =cut
1250 sub read_recent_1 {
1251 my($self, $data) = @_;
1252 return $data->{recent};
1255 =head2 $array_ref = $obj->recent_events ( %options )
1257 Note: the code relies on the resource being written atomically. We
1258 cannot lock because we may have no write access. If the caller has
1259 write access (eg. aggregate() or update()), it has to care for any
1260 necessary locking.
1262 If $options{after} is specified, only file events after this timestamp
1263 are returned.
1265 If $options{before} is specified, only file events before this
1266 timestamp are returned.
1268 IF $options{'skip-deletes'} is specified, no files-to-be-deleted will
1269 be returned.
1271 If $options{max} is specified only this many events are returned.
1273 If $options{info} is specified, it must be a hashref. This hashref
1274 will be filled with metadata about the unfiltered recent_events of
1275 this object, in key C<first> there is the first item, in key C<last>
1276 is the last.
1278 =cut
1280 sub recent_events {
1281 my ($self, %options) = @_;
1282 my $info = $options{info};
1283 if ($self->is_slave) {
1284 $self->get_remote_recentfile_as_tempfile;
1286 my $rfile_or_tempfile = $self->_my_current_rfile or return [];
1287 -e $rfile_or_tempfile or return [];
1288 my $suffix = $self->serializer_suffix;
1289 my ($data) = eval {
1290 $self->_try_deserialize
1292 $suffix,
1293 $rfile_or_tempfile,
1296 my $err = $@;
1297 if ($err or !$data) {
1298 return [];
1300 my $re;
1301 if (reftype $data eq 'ARRAY') { # protocol 0
1302 $re = $data;
1303 } else {
1304 $re = $self->_recent_events_protocol_x
1306 $data,
1307 $rfile_or_tempfile,
1310 return $re unless defined $options{after}; # XXX same for before and max
1311 my $last_item = $#$re;
1312 if ($info) {
1313 $info->{first} = $re->[0];
1314 $info->{last} = $re->[-1];
1316 if (defined $options{after}) {
1317 if ($re->[0]{epoch} > $options{after}) {
1318 if (
1319 my $f = first
1320 {$re->[$_]{epoch} <= $options{after}}
1321 0..$#$re
1323 $last_item = $f-1;
1325 } else {
1326 $last_item = -1;
1329 my $first_item = 0;
1330 if (defined $options{before}) {
1331 if ($re->[0]{epoch} > $options{before}) {
1332 if (
1333 my $f = first
1334 {$re->[$_]{epoch} < $options{before}}
1335 0..$last_item
1337 $first_item = $f;
1339 } else {
1340 $first_item = 0;
1343 my @rre = splice @$re, $first_item, 1+$last_item-$first_item;
1344 if ($options{'skip-deletes'}) {
1345 @rre = grep { $_->{type} ne "delete" } @rre;
1347 if ($options{max} && @rre > $options{max}) {
1348 @rre = splice @rre, 0, $options{max};
1350 \@rre;
1353 sub _recent_events_protocol_x {
1354 my($self,
1355 $data,
1356 $rfile_or_tempfile,
1357 ) = @_;
1358 my $meth = sprintf "read_recent_%d", $data->{meta}{protocol};
1359 # we may be reading meta for the first time
1360 while (my($k,$v) = each %{$data->{meta}}) {
1361 next if $k ne lc $k; # "Producers"
1362 next if defined $self->$k;
1363 $self->$k($v);
1365 my $re = $self->$meth ($data);
1366 my @stat = stat $rfile_or_tempfile or die "Cannot stat '$rfile_or_tempfile': $!";
1367 my $minmax = { mtime => $stat[9] };
1368 if (@$re) {
1369 $minmax->{min} = $re->[-1]{epoch};
1370 $minmax->{max} = $re->[0]{epoch};
1372 $self->minmax ( $minmax );
1373 return $re;
1376 sub _try_deserialize {
1377 my($self,
1378 $suffix,
1379 $rfile_or_tempfile,
1380 ) = @_;
1381 if ($suffix eq ".yaml") {
1382 require YAML::Syck;
1383 YAML::Syck::LoadFile($rfile_or_tempfile);
1384 } elsif ($HAVE->{"Data::Serializer"}) {
1385 my $serializer = Data::Serializer->new
1386 ( serializer => $serializers{$suffix} );
1387 my $serialized = do
1389 open my $fh, $rfile_or_tempfile or die "Could not open: $!";
1390 local $/;
1391 <$fh>;
1393 $serializer->raw_deserialize($serialized);
1394 } else {
1395 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
1399 =head2 $ret = $obj->rfilename
1401 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1402 a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
1404 =cut
1406 sub rfilename {
1407 my($self) = @_;
1408 my $file = sprintf("%s-%s%s",
1409 $self->filenameroot,
1410 $self->interval,
1411 $self->serializer_suffix,
1413 return $file;
1416 =head2 $str = $self->remote_dir
1418 The directory we are mirroring from.
1420 =cut
1422 sub remote_dir {
1423 my($self, $set) = @_;
1424 if (defined $set) {
1425 $self->_remote_dir ($set);
1427 my $x = $self->_remote_dir;
1428 $self->is_slave (1);
1429 return $x;
1432 =head2 $str = $obj->remoteroot
1434 =head2 (void) $obj->remoteroot ( $set )
1436 Get/Set the composed prefix needed when rsyncing from a remote module.
1437 If remote_host, remote_module, and remote_dir are set, it is composed
1438 from these.
1440 =cut
1442 sub remoteroot {
1443 my($self, $set) = @_;
1444 if (defined $set) {
1445 $self->_remoteroot($set);
1447 my $remoteroot = $self->_remoteroot;
1448 unless (defined $remoteroot) {
1449 $remoteroot = sprintf
1451 "%s%s%s",
1452 defined $self->remote_host ? ($self->remote_host."::") : "",
1453 defined $self->remote_module ? ($self->remote_module."/") : "",
1454 defined $self->remote_dir ? $self->remote_dir : "",
1456 $self->_remoteroot($remoteroot);
1458 return $remoteroot;
1461 =head2 my $rfile = $obj->rfile
1463 Returns the full path of the I<recentfile>
1465 =cut
1467 sub rfile {
1468 my($self) = @_;
1469 my $rfile = $self->_rfile;
1470 return $rfile if defined $rfile;
1471 $rfile = File::Spec->catfile
1472 ($self->localroot,
1473 $self->rfilename,
1475 $self->_rfile ($rfile);
1476 return $rfile;
1479 =head2 $rsync_obj = $obj->rsync
1481 The File::Rsync object that this object uses for communicating with an
1482 upstream server.
1484 =cut
1486 sub rsync {
1487 my($self) = @_;
1488 my $rsync = $self->_rsync;
1489 unless (defined $rsync) {
1490 my $rsync_options = $self->rsync_options || {};
1491 if ($HAVE->{"File::Rsync"}) {
1492 $rsync = File::Rsync->new($rsync_options);
1493 $self->_rsync($rsync);
1494 } else {
1495 die "File::Rsync required for rsync operations. Cannot continue";
1498 return $rsync;
1501 =head2 (void) $obj->register_rsync_error($err)
1503 =head2 (void) $obj->un_register_rsync_error()
1505 Register_rsync_error is called whenever the File::Rsync object fails
1506 on an exec (say, connection doesn't succeed). It issues a warning and
1507 sleeps for an increasing amount of time. Un_register_rsync_error
1508 resets the error count. See also accessor C<max_rsync_errors>.
1510 =cut
1513 my $no_success_count = 0;
1514 my $no_success_time = 0;
1515 sub register_rsync_error {
1516 my($self, $err) = @_;
1517 chomp $err;
1518 $no_success_time = time;
1519 $no_success_count++;
1520 my $max_rsync_errors = $self->max_rsync_errors;
1521 $max_rsync_errors = 12 unless defined $max_rsync_errors;
1522 if ($max_rsync_errors>=0 && $no_success_count >= $max_rsync_errors) {
1523 require Carp;
1524 Carp::confess
1526 sprintf
1528 "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
1529 $self->interval,
1530 $err,
1531 $no_success_count,
1534 my $sleep = 12 * $no_success_count;
1535 $sleep = 120 if $sleep > 120;
1536 require Carp;
1537 Carp::cluck
1538 (sprintf
1540 "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
1541 scalar(localtime($no_success_time)),
1542 $self->interval,
1543 $err,
1544 $sleep,
1546 sleep $sleep
1548 sub un_register_rsync_error {
1549 my($self) = @_;
1550 $no_success_time = 0;
1551 $no_success_count = 0;
1555 =head2 $clone = $obj->_sparse_clone
1557 Clones just as much from itself that it does not hurt. Experimental method.
1559 =cut
1561 sub _sparse_clone {
1562 my($self) = @_;
1563 my $new = bless {}, ref $self;
1564 for my $m (qw(
1565 _interval
1566 _localroot
1567 _remoteroot
1568 _rfile
1569 _use_tempfile
1570 aggregator
1571 filenameroot
1572 is_slave
1573 max_files_per_connection
1574 protocol
1575 rsync_options
1576 serializer_suffix
1577 sleep_per_connection
1578 verbose
1579 )) {
1580 my $o = $self->$m;
1581 $o = Storable::dclone $o if ref $o;
1582 $new->$m($o);
1584 $new;
1587 =head2 $boolean = OBJ->ttl_reached ()
1589 =cut
1591 sub ttl_reached {
1592 my($self) = @_;
1593 my $have_mirrored = $self->have_mirrored;
1594 my $now = Time::HiRes::time;
1595 my $ttl = $self->ttl;
1596 $ttl = 24.2 unless defined $ttl;
1597 if ($now > $have_mirrored + $ttl) {
1598 return 1;
1600 return 0;
1603 =head2 (void) $obj->unlock()
1605 Unlocking is implemented with an C<rmdir> on a locking directory
1606 (C<.lock> appended to $rfile).
1608 =cut
1610 sub unlock {
1611 my($self) = @_;
1612 return unless $self->_is_locked;
1613 my $rfile = $self->rfile;
1614 rmdir "$rfile.lock";
1615 $self->_is_locked (0);
1618 =head2 $ret = $obj->update ($path, $type)
1620 Enter one file into the local I<recentfile>. $path is the (usually
1621 absolute) path. If the path is outside the I<our> tree, then it is
1622 ignored.
1624 $type is one of C<new> or C<delete>.
1626 The new file event is uhshifted to the array of recent_events and the
1627 array is shortened to the length of the timespan allowed. This is
1628 usually the timespan specified by the interval of this recentfile but
1629 as long as this recentfile has not been merged to another one, the
1630 timespan may grow without bounds.
1632 =cut
1634 sub update {
1635 my($self,$path,$type) = @_;
1636 die "update called without path argument" unless defined $path;
1637 die "update called without type argument" unless defined $type;
1638 die "update called with illegal type argument: $type" unless $type =~ /(new|delete)/;
1639 my $canonmeth = $self->canonize;
1640 unless ($canonmeth) {
1641 $canonmeth = "naive_path_normalize";
1643 $path = $self->$canonmeth($path);
1644 my $lrd = $self->localroot;
1645 if ($path =~ s|^\Q$lrd\E||) {
1646 $path =~ s|^/||;
1647 my $interval = $self->interval;
1648 my $secs = $self->interval_secs();
1649 $self->lock;
1650 # you must calculate the time after having locked, of course
1651 my $epoch = Time::HiRes::time;
1652 my $recent = $self->recent_events;
1653 $recent ||= [];
1654 my $oldest_allowed = 0;
1655 if (my $merged = $self->merged) {
1656 # XXX _bigfloat!
1657 $oldest_allowed = min($epoch - $secs, $merged->{epoch});
1658 } else {
1659 # as long as we are not merged at all, no limits!
1661 TRUNCATE: while (@$recent) {
1662 if ($recent->[-1]{epoch} < $oldest_allowed) { # XXX _bigfloatlt!
1663 pop @$recent;
1664 } else {
1665 last TRUNCATE;
1668 # remove older duplicates of this $path, irrespective of $type:
1669 $recent = [ grep { $_->{path} ne $path } @$recent ];
1671 unshift @$recent, { epoch => $epoch, path => $path, type => $type };
1672 $self->write_recent($recent);
1673 $self->_assert_symlink;
1674 $self->unlock;
1678 =head2 uptodate
1680 True if this object has mirrored the complete interval covered by the
1681 current recentfile.
1683 *** WIP ***
1685 =cut
1687 sub uptodate {
1688 my($self, $debug) = @_;
1689 if ($self->ttl_reached){
1690 if ($debug) {
1691 warn "ttl_reached returned true, so we are not uptodate";
1693 return 0 ;
1696 # look if recentfile has unchanged timestamp
1697 my $minmax = $self->minmax;
1698 if (exists $minmax->{mtime}) {
1699 my $rfile = $self->_my_current_rfile;
1700 my @stat = stat $rfile;
1701 my $mtime = $stat[9];
1702 if ($mtime > $minmax->{mtime}) {
1703 if ($debug) {
1704 warn "$mtime > $minmax->{mtime}, so we are not uptodate";
1706 return 0;
1707 } else {
1708 my $covered = $self->done->covered(@$minmax{qw(max min)});
1709 if ($debug) {
1710 warn "minmax covered[$covered], so we return that";
1712 return $covered;
1715 if ($debug) {
1716 warn "fallthrough, so not uptodate";
1718 return 0;
1721 =head2 $obj->write_recent ($recent_files_arrayref)
1723 Writes a I<recentfile> based on the current reflection of the current
1724 state of the tree limited by the current interval.
1726 =cut
1728 sub write_recent {
1729 my ($self,$recent) = @_;
1730 die "write_recent called without argument" unless defined $recent;
1731 my $meth = sprintf "write_%d", $self->protocol;
1732 $self->$meth($recent);
1735 =head2 $obj->write_0 ($recent_files_arrayref)
1737 Delegate of C<write_recent()> on protocol 0
1739 =cut
1741 sub write_0 {
1742 my ($self,$recent) = @_;
1743 my $rfile = $self->rfile;
1744 YAML::Syck::DumpFile("$rfile.new",$recent);
1745 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
1748 =head2 $obj->write_1 ($recent_files_arrayref)
1750 Delegate of C<write_recent()> on protocol 1
1752 =cut
1754 sub write_1 {
1755 my ($self,$recent) = @_;
1756 my $rfile = $self->rfile;
1757 my $suffix = $self->serializer_suffix;
1758 my $data = {
1759 meta => $self->meta_data,
1760 recent => $recent,
1762 my $serialized;
1763 if ($suffix eq ".yaml") {
1764 $serialized = YAML::Syck::Dump($data);
1765 } elsif ($HAVE->{"Data::Serializer"}) {
1766 my $serializer = Data::Serializer->new
1767 ( serializer => $serializers{$suffix} );
1768 $serialized = $serializer->raw_serialize($data);
1769 } else {
1770 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
1772 open my $fh, ">", "$rfile.new" or die "Could not open >'$rfile.new': $!";
1773 print $fh $serialized;
1774 close $fh or die "Could not close '$rfile.new': $!";
1775 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
1778 BEGIN {
1779 my @pod_lines =
1780 split /\n/, <<'=cut'; %serializers = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
1782 =head1 THE ARCHITECTURE OF A COLLECTION OF RECENTFILES
1784 The idea is that we want to have a short file that records really
1785 recent changes. So that a fresh mirror can be kept fresh as long as
1786 the connectivity is given. Then we want longer files that record the
1787 history before. So when the mirror falls behind the update period
1788 reflected in the shortest file, it can switch to the next one. And if
1789 this is not long enough we want another one, again a bit longer. And
1790 we want one that completes the history back to the oldest file. For
1791 practical reasons the timespans of these files must overlap a bit and
1792 to keep the bandwidth necessities low they must not be
1793 updated too frequently. That's the basic idea. The following
1794 example represents a tree that has a few updates every day:
1796 RECENT.recent -> RECENT-1h.yaml
1797 RECENT-6h.yaml
1798 RECENT-1d.yaml
1799 RECENT-1M.yaml
1800 RECENT-1W.yaml
1801 RECENT-1Q.yaml
1802 RECENT-1Y.yaml
1803 RECENT-Z.yaml
1805 The first file is the principal file, in so far it is the one that is
1806 written first after a filesystem change. Usually a symlink links to it
1807 with a filename that has the same filenameroot and the suffix
1808 C<.recent>. On systems that do not support symlinks there is a plain
1809 copy maintained instead.
1811 The last file, the Z file, contains the complementary files that are
1812 in none of the other files. It does never contain C<deletes>. Besides
1813 this it serves the role of a recovery mechanism or spill over pond.
1814 When things go wrong, it's a valuable controlling instance to hold the
1815 differences between the collection of limited interval files and the
1816 actual filesystem.
1818 =head2 A SINGLE RECENTFILE
1820 A I<recentfile> consists of a hash that has two keys: C<meta> and
1821 C<recent>. The C<meta> part has metadata and the C<recent> part has a
1822 list of fileobjects.
1824 =head2 THE META PART
1826 Here we find things that are pretty much self explaining: all
1827 lowercase attributes are accessors and as such explained somewhere
1828 above in this manpage. The uppercase attribute C<Producers> contains
1829 version information about involved software components. Nothing to
1830 worry about as I believe.
1832 =head2 THE RECENT PART
1834 This is the interesting part. Every entry refers to some filesystem
1835 change (with path, epoch, type). The epoch value is the point in time
1836 when some change was I<registered>. Do not be tempted to believe that
1837 the entry has a direct relation to something like modification time or
1838 change time on the filesystem level. The timestamp (I<epoch> element)
1839 is a floating point number and does practically never correspond
1840 exactly to the data recorded in the filesystem but rather to the time
1841 when some process succeeded to report to the I<recentfile> mechanism
1842 that something has changed. This is why many parts of the code refer
1843 to I<events>, because we merely try to record the I<event> of the
1844 discovery of a change, not the time of the change itself.
1846 All these entries can be devided into two types (denoted by the
1847 C<type> attribute): C<new>s and C<delete>s. Changes and creations are
1848 C<new>s. Deletes are C<delete>s.
1850 Another distinction is for objects with an epoch timestamp and others
1851 without. All files that were already existing on the filesystem before
1852 the I<recentfile> mechanism was installed, get recorded with a
1853 timestamp of zero.
1855 Besides an C<epoch> and a C<type> attribute we find a third one:
1856 C<path>. This path is relative to the directory we find the
1857 I<recentfile> in.
1859 The order of the entries in the I<recentfile> is by decreasing epoch
1860 attribute. These are either 0 or a unique floating point number. They
1861 are zero for events that were happening either before the time that
1862 the I<recentfile> mechanism was set up or were left undiscovered for a
1863 while and never handed over to update(). They are floating point
1864 numbers for all events being regularly handed to update(). And when
1865 the server has ntp running correctly, then the timestamps are
1866 actually decreasing and unique.
1868 =head1 CORRUPTION AND RECOVERY
1870 If the origin host breaks the promise to deliver consistent and
1871 complete I<recentfiles> then the way back to sanity shall be achieved
1872 through either the C<zloop> (still TBD) or traditional rsyncing
1873 between the hosts. For example, if the origin server forgets to deploy
1874 ntp and the clock on it jumps backwards some day, then this would
1875 probably go unnoticed for a while and many software components that
1876 rely on the time never running backwards will make wrong decisions.
1877 After some time this accident would probably still be found in one of
1878 the I<recentfiles> but would become meaningless as soon as a mirror
1879 has run through the sanitizing procedures. Same goes for origin hosts
1880 that forget to include or deliberately omit some files.
1882 =head1 SERIALIZERS
1884 The following suffixes are supported and trigger the use of these
1885 serializers:
1887 =over 4
1889 =item C<< ".yaml" => "YAML::Syck" >>
1891 =item C<< ".json" => "JSON" >>
1893 =item C<< ".sto" => "Storable" >>
1895 =item C<< ".dd" => "Data::Dumper" >>
1897 =back
1899 =cut
1901 BEGIN {
1902 my @pod_lines =
1903 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
1905 =head1 INTERVAL SPEC
1907 An interval spec is a primitive way to express time spans. Normally it
1908 is composed from an integer and a letter.
1910 As a special case, a string that consists only of the single letter
1911 C<Z>, stands for unlimited time.
1913 The following letters express the specified number of seconds:
1915 =over 4
1917 =item C<< s => 1 >>
1919 =item C<< m => 60 >>
1921 =item C<< h => 60*60 >>
1923 =item C<< d => 60*60*24 >>
1925 =item C<< W => 60*60*24*7 >>
1927 =item C<< M => 60*60*24*30 >>
1929 =item C<< Q => 60*60*24*90 >>
1931 =item C<< Y => 60*60*24*365.25 >>
1933 =back
1935 =cut
1937 =head1 BACKGROUND
1939 This is about speeding up rsync operation on large trees to many
1940 places. Uses a small metadata cocktail and pull technology.
1942 =head2 NON-COMPETITORS
1944 File::Mirror JWU/File-Mirror/File-Mirror-0.10.tar.gz only local trees
1945 Mirror::YAML ADAMK/Mirror-YAML-0.03.tar.gz some sort of inner circle
1946 Net::DownloadMirror KNORR/Net-DownloadMirror-0.04.tar.gz FTP sites and stuff
1947 Net::MirrorDir KNORR/Net-MirrorDir-0.05.tar.gz dito
1948 Net::UploadMirror KNORR/Net-UploadMirror-0.06.tar.gz dito
1949 Pushmi::Mirror CLKAO/Pushmi-v1.0.0.tar.gz something SVK
1951 rsnapshot www.rsnapshot.org focus on backup
1952 csync www.csync.org more like unison
1954 =head2 COMPETITORS
1956 The problem to solve which clusters and ftp mirrors and otherwise
1957 replicated datasets like CPAN share: how to transfer only a minimum
1958 amount of data to determine the diff between two hosts.
1960 Normally it takes a long time to determine the diff itself before it
1961 can be transferred. Known solutions at the time of this writing are
1962 csync2, and rsync 3 batch mode.
1964 For many years the best solution was csync2 which solves the
1965 problem by maintining a sqlite database on both ends and talking a
1966 highly sophisticated protocol to quickly determine which files to send
1967 and which to delete at any given point in time. Csync2 is often
1968 inconvenient because the act of syncing demands quite an intimate
1969 relationship between the sender and the receiver and suffers when the
1970 number of syncing sites is large or connections are unreliable.
1972 Rsync 3 batch mode works around these problems by providing rsync-able
1973 batch files which allow receiving nodes to replay the history of the
1974 other nodes. This reduces the need to have an incestuous relation but
1975 it has the disadvantage that these batch files replicate the contents
1976 of the involved files. This seems inappropriate when the nodes already
1977 have a means of communicating over rsync.
1979 rersyncrecent solves this problem with a couple of (usually 2-10)
1980 index files which cover different overlapping time intervals. The
1981 master writes these files and the clients can construct the full tree
1982 from the information contained in them. The most recent index file
1983 usually covers the last seconds or minutes or hours of the tree and
1984 depending on the needs, slaves can rsync every few seconds and then
1985 bring their trees in full sync.
1987 The rersyncrecent mode was developed for CPAN but I hope it is a
1988 convenient and economic general purpose solution. I'm looking forward
1989 to see a CPAN backbone that is only a few seconds behind PAUSE. And
1990 then ... the first FUSE based CPAN filesystem anyone?
1992 =head1 AUTHOR
1994 Andreas König
1996 =head1 BUGS
1998 Please report any bugs or feature requests through the web interface
2000 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2001 I will be notified, and then you'll automatically be notified of
2002 progress on your bug as I make changes.
2004 =head1 SUPPORT
2006 You can find documentation for this module with the perldoc command.
2008 perldoc File::Rsync::Mirror::Recentfile
2010 You can also look for information at:
2012 =over 4
2014 =item * RT: CPAN's request tracker
2016 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2018 =item * AnnoCPAN: Annotated CPAN documentation
2020 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2022 =item * CPAN Ratings
2024 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2026 =item * Search CPAN
2028 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2030 =back
2033 =head1 ACKNOWLEDGEMENTS
2035 Thanks to RJBS for module-starter.
2037 =head1 COPYRIGHT & LICENSE
2039 Copyright 2008 Andreas König.
2041 This program is free software; you can redistribute it and/or modify it
2042 under the same terms as Perl itself.
2045 =cut
2047 1; # End of File::Rsync::Mirror::Recentfile
2049 # Local Variables:
2050 # mode: cperl
2051 # cperl-indent-level: 4
2052 # End: