bugfix where intervals were not collapsed: solved with overlapping intervals instead...
[rersyncrecent.git] / lib / File / Rsync / Mirror / Recentfile.pm
blobf45f6e517b2facd4b822f933d33beba47895d7f2
1 package File::Rsync::Mirror::Recentfile;
3 # use warnings;
4 use strict;
6 =encoding utf-8
8 =head1 NAME
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
12 =head1 VERSION
14 Version 0.0.1
16 =cut
# Optional dependencies: each module name maps to a true value iff it
# could be loaded when this file was compiled.
my $HAVE = {};
for my $optional (qw(Data::Serializer File::Rsync)) {
    # string eval: require needs a bareword to turn the package name
    # into a file name
    my $loaded = eval qq{ require $optional; };
    $HAVE->{$optional} = $loaded;
}
25 use Config;
26 use File::Basename qw(dirname fileparse);
27 use File::Copy qw(cp);
28 use File::Path qw(mkpath);
29 use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
30 use File::Temp;
31 use List::Util qw(first min);
32 use Scalar::Util qw(reftype);
33 use Storable;
34 use Time::HiRes qw();
35 use YAML::Syck;
37 use version; our $VERSION = qv('0.0.1');
40 use constant MAX_INT => ~0>>1; # anything better?
41 use constant DEFAULT_PROTOCOL => 1;
43 # cf. interval_secs
44 my %seconds;
46 # maybe subclass if this mapping is bad?
47 my %serializers;
49 =head1 SYNOPSIS
51 B<!!!! PRE-ALPHA ALERT !!!!>
53 Nothing in here is believed to be stable, nothing yet intended for
54 public consumption. The plan is to provide a script in one of the next
55 releases that acts as a frontend for all the backend functionality.
56 Option and method names will very likely change.
58 For the rationale see the section BACKGROUND.
60 This is published only for developers of the (yet to be named)
61 script(s).
63 Writer (of a single file):
65 use File::Rsync::Mirror::Recentfile;
66 my $fr = File::Rsync::Mirror::Recentfile->new
68 interval => q(6h),
69 filenameroot => "RECENT",
70 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
71 localroot => "/home/ftp/pub/PAUSE/authors/",
72 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
74 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
76 Reader/mirrorer:
78 my $rf = File::Rsync::Mirror::Recentfile->new
80 filenameroot => "RECENT",
81 ignore_link_stat_errors => 1,
82 interval => q(6h),
83 localroot => "/home/ftp/pub/PAUSE/authors",
84 remote_dir => "",
85 remote_host => "pause.perl.org",
86 remote_module => "authors",
87 rsync_options => {
88 compress => 1,
89 'rsync-path' => '/usr/bin/rsync',
90 links => 1,
91 times => 1,
92 'omit-dir-times' => 1,
93 checksum => 1,
95 verbose => 1,
97 $rf->mirror;
99 Aggregator (usually the writer):
101 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
102 $rf->aggregate;
104 =head1 EXPORT
106 No exports.
108 =head1 CONSTRUCTORS / DESTRUCTOR
110 =head2 my $obj = CLASS->new(%hash)
112 Constructor. On every argument pair the key is a method name and the
113 value is an argument to that method name.
115 If a recentfile for this resource already exists, metadata that are
116 not defined by the constructor will be fetched from there as soon as
117 it is being read by recent_events().
119 =cut
sub new {
    my ($class, @args) = @_;
    my $self = bless {}, $class;
    # Arguments come in pairs: accessor name, value to pass to it.
    while (my ($accessor, $value) = splice @args, 0, 2) {
        $self->$accessor($value);
    }
    # Fill in defaults for whatever the caller left undefined.
    my @defaults = (
        [ protocol          => DEFAULT_PROTOCOL ],
        [ filenameroot      => "RECENT"         ],
        [ serializer_suffix => ".yaml"          ],
    );
    for my $pair (@defaults) {
        my ($accessor, $fallback) = @$pair;
        next if defined $self->$accessor;
        $self->$accessor($fallback);
    }
    return $self;
}
140 =head2 my $obj = CLASS->new_from_file($file)
142 Constructor. $file is a I<recentfile>.
144 =cut
sub new_from_file {
    my($class, $file) = @_;
    my $self = bless {}, $class;
    $self->_rfile($file);
    #?# $self->lock;
    # Slurp the raw content; the Data::Serializer branch below needs it.
    my $serialized = do {
        open my $fh, '<', $file or die "Could not open '$file': $!";
        local $/;
        <$fh>;
    };
    # XXX: we can skip this step when the metadata are sufficient, but
    # we cannot parse the file without some magic stuff about
    # serialized formats
    # Follow symlinks to the real file: its suffix selects the serializer.
    while (-l $file) {
        my(undef, $dir) = fileparse $file;
        my $symlink = readlink $file;
        defined $symlink or die "Could not readlink '$file': $!";
        if ($symlink =~ m|/|) {
            die "FIXME: filenames containing '/' not supported, got $symlink";
        }
        $file = File::Spec->catfile ( $dir, $symlink );
    }
    my(undef, $path, $suffix) = fileparse $file, keys %serializers;
    # bail out before any state is set from an unusable file name
    die "Could not determine file format from suffix" unless $suffix;
    $self->serializer_suffix($suffix);
    $self->localroot($path);
    my $deserialized;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        $deserialized = YAML::Syck::LoadFile($file);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $deserialized = $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    # Copy the stored metadata into the accessors. Sorted keys give a
    # deterministic setter order (e.g. "interval" is set before "merged",
    # whose sanity checks read the interval). A missing meta section is
    # treated as empty.
    my $meta = ($deserialized && $deserialized->{meta}) || {};
    for my $k (sort keys %$meta) {
        next if $k ne lc $k; # "Producers"
        $self->$k($meta->{$k});
    }
    unless (defined $self->protocol) {
        $self->protocol(DEFAULT_PROTOCOL);
    }
    return $self;
}
191 =head2 DESTROY
193 A simple unlock.
195 =cut
sub DESTROY {
    my ($self) = @_;
    # release the lock directory if we still hold it
    $self->unlock;
}
198 =head1 ACCESSORS
200 =cut
my @accessors;

BEGIN {
    # Internal accessors backing the public methods; they are not part
    # of the documented interface.
    @accessors = qw(
        _current_tempfile
        _current_tempfile_fh
        _done
        _interval
        _is_locked
        _localroot
        _merged
        _pathdb
        _remember_last_uptodate_call
        _remote_dir
        _remoteroot
        _rfile
        _rsync
        _use_tempfile
    );
    # The POD that follows doubles as the body of this here-document:
    # every "=item NAME" line in it contributes the public accessor NAME.
    push @accessors, map { m/^=item\s+(.*)/ ? $1 : () } split m/\n/, <<'=cut'; }
225 =over 4
227 =item aggregator
229 A list of interval specs that tell the aggregator which I<recentfile>s
230 are to be produced.
232 =item canonize
234 The name of a method to canonize the path before rsyncing. Only
235 supported value is C<naive_path_normalize>. Defaults to that.
237 =item comment
239 A comment about this tree and setup.
241 =item filenameroot
243 The (prefix of the) filename we use for this I<recentfile>. Defaults to
244 C<RECENT>.
246 =item have_mirrored
248 Timestamp remembering when we mirrored this recentfile the last time.
249 Only relevant for slaves.
251 =item ignore_link_stat_errors
253 If set to true, rsync errors are ignored that complain about link stat
254 errors. These seem to happen only when there are files missing at the
255 origin. In race conditions this can always happen, so it is
256 recommended to set this value to true.
258 =item is_slave
260 If set to true, this object will fetch a new recentfile from remote
261 when the timespan between the last mirror (see have_mirrored) and now
262 is too large (currently hardcoded arbitrary 420 seconds).
264 =item locktimeout
266 After how many seconds shall we die if we cannot lock a I<recentfile>?
267 Defaults to 600 seconds.
269 =item loopinterval
271 When mirror_loop is called, this accessor can specify how much time
272 every loop shall at least take. If the work of a loop is done before
273 that time has gone, sleeps for the rest of the time. Defaults to
274 arbitrary 42 seconds.
276 =item max_files_per_connection
278 Maximum number of files that are transferred on a single rsync call.
279 Setting it higher means higher performance at the price of holding
280 connections longer and potentially disturbing other users in the pool.
281 Defaults to the arbitrary value 42.
283 =item max_rsync_errors
285 When rsync operations encounter that many errors without any resetting
286 success in between, then we die. Defaults to arbitrary 12. A value of
287 -1 means we run forever ignoring all rsync errors.
289 =item minmax
291 Hashref remembering when we read the recent_events from this file the
292 last time and what the timespan was.
294 =item protocol
296 When the RECENT file format changes, we increment the protocol. We try
297 to support older protocols in later releases.
299 =item remote_host
301 The host we are mirroring from. Leave empty for the local filesystem.
303 =item remote_module
305 Rsync servers have so called modules to separate directory trees from
306 each other. Put here the name of the module under which we are
307 mirroring. Leave empty for local filesystem.
309 =item rsync_options
311 Things like compress, links, times or checksums. Passed in to the
312 File::Rsync object used to run the mirror.
314 =item serializer_suffix
316 Mostly untested accessor. The only well tested format for
317 I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
318 Data::Serializer. But in principle other formats are supported as
319 well. See section SERIALIZERS below.
321 =item sleep_per_connection
323 Sleep that many seconds (floating point OK) after every chunk of rsyncing
324 has finished. Defaults to arbitrary 0.42.
326 =item ttl
328 Time to live. Number of seconds after which this recentfile must be
329 fetched again from the origin server. Only relevant for slaves.
330 Defaults to arbitrary 24.2 seconds.
332 =item verbose
334 Boolean to turn on a bit of verbosity.
336 =back
338 =cut
340 use accessors @accessors;
342 =head1 METHODS
344 =head2 (void) $obj->aggregate
346 Takes all intervals that are collected in the accessor called
347 aggregator. Sorts them by actual length of the interval.
348 Removes those that are shorter than our own interval. Then merges this
349 object into the next larger object. The merging continues upwards
350 as long as the next I<recentfile> is old enough to warrant a merge.
352 Whether a merge is warranted is decided according to the interval of the
353 previous recentfile, so that larger files are updated less often than
354 smaller ones.
356 Here is an example to illustrate the behaviour. Given aggregators
358 1h 1d 1W 1M 1Q 1Y Z
360 then
362 1h updates 1d on every call to aggregate()
363 1d updates 1W earliest after 1h
364 1W updates 1M earliest after 1d
365 1M updates 1Q earliest after 1W
366 1Q updates 1Y earliest after 1M
367 1Y updates Z earliest after 1Q
369 Note that all but the smallest recentfile get updated at an arbitrary
370 rate and as such are quite useless on their own.
372 =cut
sub aggregate {
    my($self) = @_;
    my $own_secs = $self->interval_secs;
    # Collect our own interval plus the aggregator intervals, drop those
    # shorter than ours and collapse duplicates (e.g. when our own interval
    # also appears in the aggregator list). A duplicate would make us merge
    # a file into one of equal size, which _merge_sanitycheck() rejects by
    # dying.
    my %seen_secs;
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
               grep { $_->{secs} >= $own_secs && !$seen_secs{$_->{secs}}++ }
               map  { +{ interval => $_, secs => $self->interval_secs($_) } }
               $self->interval, @{$self->aggregator || []};
    # Our own interval is first in the input list, so it survives the
    # deduplication and, being the shortest, sorts to the front.
    $aggs[0]{object} = $self;
    AGGREGATOR: for my $i (0..$#aggs-1) {
        my $this = $aggs[$i]{object};
        my $next = $this->_sparse_clone;
        $next->interval($aggs[$i+1]{interval});
        my $want_merge = 0;
        if ($i == 0) {
            # the smallest file is always merged upward
            $want_merge = 1;
        } else {
            my $next_rfile = $next->rfile;
            if (-e $next_rfile) {
                # merge only if the next file is older than the interval
                # of the file before this one
                my $prev = $aggs[$i-1]{object};
                # -M is relative to $^T; pin it to "now" to get the age
                local $^T = time;
                my $next_age = 86400 * -M $next_rfile;
                if ($next_age > $prev->interval_secs) {
                    $want_merge = 1;
                }
            } else {
                $want_merge = 1;
            }
        }
        if ($want_merge) {
            $next->merge($this);
            $aggs[$i+1]{object} = $next;
        } else {
            last AGGREGATOR;
        }
    }
}
# Debug helper: report path, size and mtime of the recentfile belonging
# to each interval of this aggregate, shortest interval first.
sub _debug_aggregate {
    my($self) = @_;
    my @ordered = sort { $a->{secs} <=> $b->{secs} }
                  map  { +{ interval => $_, secs => $self->interval_secs($_) } }
                  $self->interval, @{$self->aggregator || []};
    my @report = map {
        # a deep copy, so that changing the interval leaves $self alone
        my $probe = Storable::dclone($self);
        $probe->interval($_->{interval});
        my $rfile = $probe->rfile;
        my @st = stat $rfile;
        +{ rfile => $rfile, size => $st[7], mtime => $st[9] };
    } @ordered;
    return \@report;
}
# (void) $self->_assert_symlink()
#
# Ensure "<localroot>/<filenameroot>.recent" is a symlink to rfilename(),
# or, where symlinks are unsupported, a copy of that file.
sub _assert_symlink {
    my($self) = @_;
    my $rfilename = $self->rfilename;
    my $recentrecentfile = File::Spec->catfile
        (
         $self->localroot,
         sprintf("%s.recent", $self->filenameroot),
        );
    my $tmpfile = "$recentrecentfile.$$";
    if ($Config{d_symlink} eq "define") {
        if (-l $recentrecentfile) {
            my $found_symlink = readlink $recentrecentfile;
            # already pointing at the right file: nothing to do
            return if defined $found_symlink && $found_symlink eq $rfilename;
            # points elsewhere: create the new link under a temporary name
            # and rename it over the old one, replacing it in one step
            unlink $tmpfile; # may fail
            symlink $rfilename, $tmpfile or die "Could not create symlink '$tmpfile': $!";
            rename $tmpfile, $recentrecentfile or die "Could not rename '$tmpfile' to $recentrecentfile: $!";
        } else {
            symlink $rfilename, $recentrecentfile or die "Could not create symlink '$recentrecentfile': $!";
        }
    } else {
        warn "Warning: symlinks not supported on this system, doing a copy instead\n";
        # rfilename() names the file inside localroot (the symlink branch
        # uses it as a target relative to localroot), so the copy source
        # must be resolved against localroot, not the current directory.
        my $source = File::Spec->catfile($self->localroot, $rfilename);
        unlink $tmpfile; # may fail
        cp $source, $tmpfile or die "Could not copy to '$tmpfile': $!";
        rename $tmpfile, $recentrecentfile or die "Could not rename '$tmpfile' to $recentrecentfile: $!";
    }
}
466 =head2 $done = $obj->done
468 $done is a reference to a File::Rsync::Mirror::Recentfile::Done object
469 that keeps track of rsync activities. Only needed and used when we are
470 a mirroring slave.
472 =cut
sub done {
    my($self) = @_;
    # reuse the tracker once it exists
    my $cached = $self->_done;
    return $cached if $cached;
    # first use: load the class on demand and remember the new instance
    require File::Rsync::Mirror::Recentfile::Done;
    my $fresh = File::Rsync::Mirror::Recentfile::Done->new();
    $self->_done ( $fresh );
    return $fresh;
}
485 =head2 $success = $obj->full_mirror
487 (TBD) Mirrors the whole remote site, starting with the smallest I<recentfile>,
488 switching to larger ones ...
490 =cut
sub full_mirror {
    # not implemented yet: refuse loudly instead of silently doing nothing
    die "FIXME: Not yet implemented";
}
497 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
499 Stores the remote I<recentfile> locally as a tempfile. The caller is
500 responsible for removing the file after use.
502 Note: if you're intending to act as an rsync server for other slaves,
503 then you must prefer this method over fetching that file with
504 get_remotefile(). Otherwise downstream mirrors would expect you to
505 already have mirrored all the files that are in the I<recentfile>
506 before you actually have mirrored them.
508 =cut
# Fetch the remote recentfile into a local tempfile (in localroot) and
# return the tempfile's name. In tempfile mode, a tempfile that has not
# reached its ttl is returned unchanged; otherwise it is refreshed.
sub get_remote_recentfile_as_tempfile {
    my($self) = @_;
    mkpath $self->localroot;
    my $fh;
    if ( $self->_use_tempfile() ) {
        # still fresh: reuse what we fetched earlier
        return $self->_current_tempfile if ! $self->ttl_reached;
        $fh = $self->_current_tempfile_fh;
    }
    my $trfilename = $self->rfilename;

    my $dst;
    if ($fh) {
        $dst = $self->_current_tempfile;
    } else {
        $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
        $dst = $fh->filename;
        $self->_current_tempfile ($dst);
        my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
        if (defined $rfile && -e $rfile) {
            # saving on bandwidth. Might need to be configurable
            # $self->bandwidth_is_cheap?
            # (seed the tempfile with our copy so rsync only sends deltas)
            cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
        }
    }
    my $src = join ("/", $self->remoteroot, $trfilename);
    if ($self->verbose) {
        my $doing = -e $dst ? "Syncing" : "Getting";
        printf STDERR "%s (1/1) temporary %s ... ", $doing, $dst;
    }
    # at most three attempts; every failure is reported to
    # register_rsync_error()
    my $gaveup = 0;
    my $retried = 0;
    while (!$self->rsync->exec(
                               src => $src,
                               dst => $dst,
                              )) {
        $self->register_rsync_error ($self->rsync->err);
        if (++$retried >= 3) {
            warn "XXX giving up";
            $gaveup = 1;
            last;
        }
    }
    unless ($gaveup) {
        $self->have_mirrored (Time::HiRes::time);
        $self->un_register_rsync_error ();
    }
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
    my $mode = 0644;
    # report the mode in octal (interpolating $mode would print 420)
    chmod $mode, $dst or die sprintf "Could not chmod %o '%s': %s", $mode, $dst, $!;
    return $dst;
}
sub _get_remote_rat_provide_tempfile_object {
    my($self, $trfilename) = @_;
    # The tempfile is created inside localroot; mirror() later renames
    # it to the real recentfile.
    my %tempfile_spec = (
        TEMPLATE => ".FRMRecent-$trfilename-XXXX",
        DIR      => $self->localroot,
        SUFFIX   => $self->serializer_suffix,
        UNLINK   => $self->_use_tempfile,
    );
    my $tempfile = File::Temp->new(%tempfile_spec);
    # In tempfile mode, keep the object alive: destroying it would
    # unlink the file (UNLINK above).
    $self->_current_tempfile_fh ($tempfile) if $self->_use_tempfile;
    return $tempfile;
}
591 =head2 $localpath = $obj->get_remotefile ( $relative_path )
593 Rsyncs one single remote file to local filesystem.
595 Note: no locking is done on this file. Any number of processes may
596 mirror this object.
598 Note II: do not use for recentfiles. If you are a cascading
599 slave/server combination, it would confuse other slaves. They would
600 expect the contents of these recentfiles to be available. Use
601 get_remote_recentfile_as_tempfile() instead.
603 =cut
sub get_remotefile {
    my($self, $path) = @_;
    my $dst = File::Spec->catfile($self->localroot, $path);
    mkpath dirname $dst;
    if ($self->verbose) {
        printf STDERR "%s (1/1) %s ... ", (-e $dst ? "Syncing" : "Getting"), $path;
    }
    # retry until rsync succeeds; each failure goes to register_rsync_error()
    until ($self->rsync->exec(src => join("/", $self->remoteroot, $path), dst => $dst)) {
        $self->register_rsync_error ($self->rsync->err);
    }
    $self->un_register_rsync_error ();
    print STDERR "DONE\n" if $self->verbose;
    return $dst;
}
633 =head2 $obj->interval ( $interval_spec )
635 Get/set accessor. $interval_spec is a string and described below in
636 the section INTERVAL SPEC.
638 =cut
sub interval {
    my $self = shift;
    if (@_) {
        my ($new_interval) = @_;
        $self->_interval($new_interval);
        # reset _rfile so a value computed for the old interval is not reused
        $self->_rfile(undef);
    }
    my $current = $self->_interval;
    return $current if defined $current;
    # do not ask the $self too much, it recurses!
    require Carp;
    Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
}
655 =head2 $secs = $obj->interval_secs ( $interval_spec )
657 $interval_spec is described below in the section INTERVAL SPEC. If
658 empty defaults to the inherent interval for this object.
660 =cut
sub interval_secs {
    my ($self, $interval) = @_;
    # no explicit spec: use the object's own interval
    $interval ||= $self->interval;
    defined $interval
        or die "interval_secs() called without argument on an object without a declared one";
    # a spec is an optional count followed by a single unit letter
    my ($n, $t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/
        or die "Could not determine seconds from interval[$interval]";
    return MAX_INT if $interval eq "Z";
    return $seconds{$t} * $n if exists $seconds{$t} and $n =~ /^\d+$/;
    die "Invalid interval specification: n[$n]t[$t]";
}
679 =head2 $obj->localroot ( $localroot )
681 Get/set accessor. The local root of the tree.
683 =cut
sub localroot {
    my $self = shift;
    if (@_) {
        my ($new_root) = @_;
        $self->_localroot($new_root);
        # reset _rfile so a value computed for the old root is not reused
        $self->_rfile(undef);
    }
    my $root = $self->_localroot;
    return $root;
}
694 =head2 $ret = $obj->local_path($path_found_in_recentfile)
696 Combines the path to our local mirror and the path of an object found
697 in this I<recentfile>. In other words: the target of a mirror operation.
699 Implementation note: We split on slashes and then use
700 File::Spec::catfile to adjust to the local operating system.
702 =cut
sub local_path {
    my ($self, $path) = @_;
    # seems like a degenerated case
    return $self->localroot unless defined $path;
    # recentfile paths use "/"; rebuild them with the local separator
    return File::Spec->catfile($self->localroot, split m{/}, $path);
}
714 =head2 (void) $obj->lock
716 Locking is implemented with an C<mkdir> on a locking directory
717 (C<.lock> appended to $rfile).
719 =cut
sub lock {
    my ($self) = @_;
    # not using flock because it locks on filehandles instead of
    # old school resources.
    return if $self->_is_locked;
    my $rfile = $self->rfile;
    my $lockdir = "$rfile.lock";
    # XXX need a way to allow breaking the lock
    my $start = time;
    my $locktimeout = $self->locktimeout || 600;
    # mkdir is atomic: exactly one process can create the lock directory
    until (mkdir $lockdir) {
        # keep the reason mkdir failed; later calls may overwrite $!
        my $mkdir_error = "$!";
        Time::HiRes::sleep 0.01;
        if (time - $start > $locktimeout) {
            die "Could not acquire lockdirectory '$lockdir': $mkdir_error";
        }
    }
    $self->_is_locked (1);
}
739 =head2 (void) $obj->merge ($other)
741 Bulk update of this object with another one. It's used to merge a
742 smaller and younger $other object into the current one. If this file
743 is a C<Z> file, then we do not merge in objects of type C<delete>. But
744 if we encounter an object of type delete we delete the corresponding
745 C<new> object.
747 If there is nothing to be merged, nothing is done.
749 =cut
# Merge the events of the smaller recentfile $other into $self (see the
# merge POD section). $other is locked first, then $self; both are
# unlocked at the end.
sub merge {
    my($self, $other) = @_;
    # dies unless $self has the strictly larger interval
    $self->_merge_sanitycheck ( $other );
    $other->lock;
    my $other_recent = $other->recent_events || [];
    $self->lock;
    my $my_recent = $self->recent_events || [];

    # calculate the target time span
    # NOTE(review): element 0 is used as the newest event and element -1
    # as the oldest, i.e. this assumes recent_events() returns events
    # newest-first -- confirm.
    my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
    my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
    my $oldest_allowed = 0;
    my $something_done;
    unless ($my_recent->[0]) {
        # obstetrics
        # (our own file has no events yet, so it has to be written)
        $something_done=1;
    }
    if ($epoch) {
        # $self->merged is the record left when $self itself was merged
        # into a larger file (the $other->merged call below writes such a
        # record). Keep events back to whichever is older: one interval
        # before $epoch, or that last upward merge.
        if (my $merged = $self->merged) {
            my $secs = $self->interval_secs();
            $oldest_allowed = min($epoch - $secs, $merged->{epoch});
        }
        # throw away outsiders
        # XXX _bigfloat!
        # (epochs are compared as plain numbers here)
        while (@$my_recent && $my_recent->[-1]{epoch} < $oldest_allowed) {
            pop @$my_recent;
            $something_done=1;
        }
    }

    # %have collects the paths taken from $other; the grep below then
    # drops our own events for those paths, so $other's event wins.
    my %have;
    my $recent = [];
    for my $oev (@$other_recent) {
        my $oevepoch = $oev->{epoch} || 0;
        next if $oevepoch < $oldest_allowed;
        my $path = $oev->{path};
        next if $have{$path}++;
        if ( $self->interval eq "Z"
             and $oev->{type} eq "delete") {
            # do nothing
            # (the Z file keeps no delete events; because the path is now
            # in %have, our own entry for it is dropped as well)
        } else {
            # an event newer than our newest one means we must rewrite
            if (!$myepoch || $oevepoch > $myepoch) {
                $something_done=1;
            }
            push @$recent, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
        }
    }
    if ($something_done) {
        # append our own events for paths not superseded by $other
        push @$recent, grep { !$have{$_->{path}}++ } @$my_recent;
        $self->write_recent($recent);
        $other->merged({
                        time => Time::HiRes::time, # not used anywhere
                        epoch => $epoch, # used in oldest_allowed
                        into_interval => $self->interval, # checked by merged() for sanity warnings
                       });
        # presumably write_recent() persists meta_data(), which includes
        # "merged", so the record above is saved -- confirm
        $other->write_recent($other_recent);
    }
    $self->unlock;
    $other->unlock;
}
# Die unless $self (the merge target) has a strictly larger interval
# than $other (the source being merged in).
sub _merge_sanitycheck {
    my($self, $other) = @_;
    if ($self->interval_secs <= $other->interval_secs) {
        die sprintf
            (
             "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
             # the bigger one is the source $other, the smaller the target $self
             $other->interval_secs,
             $self->interval_secs,
            );
    }
}
824 =head2 merged
826 Hashref recording when, and at which epoch, this recentfile was merged
827 into another (larger) one.
829 =cut
sub merged {
    my($self, $set) = @_;
    $self->_merged ($set) if defined $set;
    my $merged = $self->_merged;
    my $into = $merged ? $merged->{into_interval} : undef;
    # nothing to cross-check without a target interval or our own interval
    if (!$into || !defined $self->_interval) {
        return $merged;
    }
    if ($into eq $self->interval) {
        require Carp;
        Carp::cluck(sprintf(
            "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
            $into,
            $self->interval,
        ));
    } elsif ($self->interval_secs($into) < $self->interval_secs) {
        require Carp;
        Carp::cluck(sprintf(
            "Warning: into_interval[%s] smaller than own interval[%s] on interval[%s]. Danger ahead.",
            $self->interval_secs($into),
            $self->interval_secs,
            $self->interval,
        ));
    }
    return $merged;
}
861 =head2 $hashref = $obj->meta_data
863 Returns the hashref of metadata that the server has to add to the
864 I<recentfile>.
866 =cut
sub meta_data {
    my($self) = @_;
    my $meta = $self->{meta};
    my @fields = qw(
        aggregator
        canonize
        comment
        filenameroot
        merged
        interval
        protocol
        serializer_suffix
    );
    # snapshot every accessor that currently has a defined value
    my %current = map {
        my $value = $self->$_;
        defined $value ? ($_ => $value) : ();
    } @fields;
    @{$meta}{keys %current} = values %current;
    # XXX need to reset the Producer if I am a writer, keep it when I
    # am a reader
    $meta->{Producers} ||= {
        __PACKAGE__, "$VERSION", # stringified it looks better
        '$0', $0,
        'time', Time::HiRes::time,
    };
    return $meta;
}
896 =head2 $success = $obj->mirror ( %options )
898 Mirrors the files in this I<recentfile> as reported by
899 C<recent_events>. Options named C<after>, C<before>, C<max>, and
900 C<skip-deletes> are passed through to the L<recent_events> call. The
901 boolean option C<piecemeal>, if true, causes C<mirror> to only rsync
902 C<max_files_per_connection> and keep track of the rsynced files so
903 that future calls will rsync different files until all files are
904 brought to sync.
906 =cut
sub mirror {
    my($self, %options) = @_;
    my $trecentfile = $self->get_remote_recentfile_as_tempfile();
    # from here on recent_events() reads the freshly fetched tempfile
    $self->_use_tempfile (1);
    my @passthrough_keys = qw(before after max skip-deletes);
    my %passthrough;
    @passthrough{@passthrough_keys} = @options{@passthrough_keys};
    my ($recent_events) = $self->recent_events(%passthrough);
    my (@error, @xcollector);
    my $last_item = $#$recent_events;
    my $done = $self->done;
    my $pathdb = $self->_pathdb;
    for my $i (0 .. $last_item) {
        my $status = {};
        $self->_mirror_item(
            $i,
            $recent_events,
            $last_item,
            $done,
            $pathdb,
            \@xcollector,
            \%options,
            $status,
            \@error,
        );
        # piecemeal mode: stop after one batch and keep the tempfile
        return if $status->{mustreturn};
    }
    if (@xcollector) {
        # flush the last, partially filled batch
        my $success = eval { $self->_empty_xcollector(\@xcollector, $pathdb, $recent_events) };
        if (!$success || $@) {
            warn "Warning: Unknown error while mirroring: $@";
            push @error, $@;
            sleep 1;
        }
        print STDERR "DONE\n" if $self->verbose;
    }
    # promote the tempfile to be our recentfile
    my $rfile = $self->rfile;
    rename $trecentfile, $rfile or do {
        require Carp;
        Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
    };
    $self->_use_tempfile (0);
    my $ctfh = $self->_current_tempfile_fh;
    if ($ctfh) {
        # the file lives on under its final name; keep File::Temp from
        # deleting it when the object goes away
        $ctfh->unlink_on_destroy (0);
        $self->_current_tempfile_fh (undef);
    }
    return !@error;
}
# Process event number $i of $recent_events: skip it if already covered,
# collect "new" events for rsync via _mirror_item_new(), and apply
# "delete" events to the local tree.
sub _mirror_item {
    my($self,
       $i,
       $recent_events,
       $last_item,
       $done,
       $pathdb,
       $xcollector,
       $options,
       $status,
       $error,
      ) = @_;
    my $recent_event = $recent_events->[$i];
    return if $done->covered ( $recent_event->{epoch} );
    if ($pathdb) {
        my $rec = $pathdb->{$recent_event->{path}};
        # the path database already holds a newer event for this path:
        # this one is obsolete, just mark it done
        if ($rec && $rec->{recentepoch}
            && _bigfloatgt( $rec->{recentepoch}, $recent_event->{epoch} )) {
            $done->register ($recent_events, [$i]);
            return;
        }
    }
    my $dst = $self->local_path($recent_event->{path});
    if ($recent_event->{type} eq "new"){
        $self->_mirror_item_new
            (
             $dst,
             $i,
             $last_item,
             $recent_events,
             $recent_event,
             $xcollector,
             $pathdb,
             $status,
             $error,
             $options,
            );
    } elsif ($recent_event->{type} eq "delete") {
        my $activity = $options->{'skip-deletes'}
            ? "skipped"
            : _mirror_item_delete_local($dst);
        $done->register ($recent_events, [$i]);
        if ($pathdb) {
            $self->_register_path($pathdb,[$recent_event],$activity);
        }
    } else {
        warn "Warning: invalid upload type '$recent_event->{type}'";
    }
}

# Remove the local copy $dst of a path deleted upstream and return the
# activity label for the path database ("deleted" or "not_found").
# Symlinks, including dangling ones, are unlinked without being followed:
# a plain -e test follows the link, reports a dangling link as missing,
# and the link would be left behind. Failures are warned about, not fatal.
sub _mirror_item_delete_local {
    my($dst) = @_;
    return "not_found" if !-l $dst && !-e $dst;
    if (-l $dst or not -d $dst) {
        unless (unlink $dst) {
            require Carp;
            Carp::cluck ( "Warning: Error while unlinking '$dst': $!" );
        }
    } else {
        unless (rmdir $dst) {
            require Carp;
            Carp::cluck ( "Warning: Error on rmdir '$dst': $!" );
        }
    }
    return "deleted";
}
# Queue a "new" event in $xcollector. Once max_files_per_connection
# events have been collected, rsync the batch via _empty_xcollector(),
# sleep sleep_per_connection seconds, record any error in @$error, and,
# in piecemeal mode, set $status->{mustreturn} to stop the caller.
sub _mirror_item_new {
    my($self,
       $dst,
       $i,
       $last_item,
       $recent_events,
       $recent_event,
       $xcollector,
       $pathdb,
       $status,
       $error,
       $options,
      ) = @_;
    if ($self->verbose) {
        my $doing = -e $dst ? "Syncing" : "Getting";
        printf STDERR
            (
             "%s (%d/%d/%s) %s ... ",
             $doing,
             1+$i,
             1+$last_item,
             $self->interval,
             $recent_event->{path},
            );
    }
    my $max_files_per_connection = $self->max_files_per_connection || 42;
    if ($self->verbose) {
        print STDERR "\n";
    }
    push @$xcollector, { rev => $recent_event, i => $i };
    # keep collecting until the batch is full
    return if @$xcollector < $max_files_per_connection;

    my $success = eval {$self->_empty_xcollector ($xcollector,$pathdb,$recent_events);};
    # read $@ now, before later calls can reset it
    my $err = $@;
    my $sleep = $self->sleep_per_connection;
    $sleep = 0.42 unless defined $sleep;
    Time::HiRes::sleep $sleep;
    # report failures before a piecemeal return, which would otherwise
    # drop them silently
    if (!$success || $err) {
        warn "Warning: Error while mirroring: $err";
        push @$error, $err;
        sleep 1;
    }
    if ($options->{piecemeal}) {
        $status->{mustreturn} = 1;
        return;
    }
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
}
# rsync all collected paths in one go, record them in the path database
# (if any) and in the done tracker, then empty the collector.
sub _empty_xcollector {
    my($self,$xcoll,$pathdb,$recent_events) = @_;
    my @paths   = map { $_->{rev}{path} } @$xcoll;
    my $success = $self->mirror_path(\@paths);
    if ($pathdb) {
        my @events = map { $_->{rev} } @$xcoll;
        $self->_register_path($pathdb, \@events, "rsync");
    }
    my @indices = map { $_->{i} } @$xcoll;
    $self->done->register($recent_events, \@indices);
    @$xcoll = ();
    return $success;
}
# Record in $db, keyed by path, the epoch of each event in $coll and the
# time of $activity (stored under "<activity>_on"). Existing records are
# replaced.
sub _register_path {
    my($self, $db, $coll, $activity) = @_;
    my $now = time;
    for my $event (@$coll) {
        my %record = (recentepoch => $event->{epoch});
        $record{"${activity}_on"} = $now;
        $db->{ $event->{path} } = \%record;
    }
}
1104 =head2 (void) $obj->mirror_loop
1106 Run mirror in an endless loop. See the accessor C<loopinterval>. XXX
1107 What happens/should happen if we miss the interval during a single loop?
1109 =cut
# Mirror repeatedly until SIGINT. Each pass takes at least
# loopinterval seconds (default 42) and only asks for events newer than
# the newest one seen in the previous pass.
sub mirror_loop {
    my($self) = @_;
    my $Signal = 0;
    # local: restore the caller's handler once the loop ends
    local $SIG{INT} = sub { $Signal++ };
    my $loopinterval = $self->loopinterval || 42;
    my $after = -999999999;
  LOOP: while () {
        # measure every iteration from its own start; measuring from the
        # start of the first one stopped the pacing after loopinterval
        my $iteration_start = time;
        $self->mirror(after => $after);
        last LOOP if $Signal;
        my $re = $self->recent_events;
        # keep the previous boundary when there are no events
        $after = $re->[0]{epoch} if $re && @$re;
        if ($self->verbose) {
            local $| = 1;
            print "($after)";
        }
        my $remaining = $iteration_start + $loopinterval - time;
        sleep $remaining if $remaining > 0;
        last LOOP if $Signal;
        if ($self->verbose) {
            local $| = 1;
            print "~";
        }
    }
}
1138 =head2 $success = $obj->mirror_path ( $arrref | $path )
1140 If the argument is a scalar it is treated as a path. The remote path
1141 is mirrored into the local copy. $path is the path found in the
1142 I<recentfile>, i.e. it is relative to the root directory of the
1143 mirror.
1145 If the argument is an array reference then all elements are treated as
1146 a path below the current tree and all are rsynced with a single
1147 command (and a single connection).
1149 =cut
sub mirror_path {
    my($self,$path) = @_;
    # XXX simplify the two branches such that $path is treated as
    # [$path] maybe even demand the argument as an arrayref to
    # simplify docs and code. (rsync-over-recentfile-2.pl uses the
    # interface)
    if (ref $path and ref $path eq "ARRAY") {
        my $dst = $self->localroot;
        mkpath dirname $dst;
        # rsync reads the list of paths from this file (files-from)
        my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
                                                      lc $self->filenameroot,
                                                     ),
                                  TMPDIR => 1,
                                  UNLINK => 0,
                                 );
        for my $p (@$path) {
            print {$fh} $p, "\n";
        }
        $fh->flush;
        $fh->unlink_on_destroy(1);
        my $retried = 0;
        while (!$self->rsync->exec
               (
                src => join("/",
                            $self->remoteroot,
                           ),
                dst => $dst,
                'files-from' => $fh->filename,
               )) {
            my($err) = $self->rsync->err;
            return 1 if _mirror_path_ignorable_error($self, $err);
            $self->register_rsync_error ($err);
            if (++$retried >= 3) {
                warn "XXX giving up.";
                # report the failure instead of claiming success; the
                # error counter is not reset
                return 0;
            }
        }
        $self->un_register_rsync_error ();
    } else {
        my $dst = $self->local_path($path);
        mkpath dirname $dst;
        # retries without a local limit; presumably register_rsync_error()
        # dies once max_rsync_errors is exceeded -- TODO confirm
        while (!$self->rsync->exec
               (
                src => join("/",
                            $self->remoteroot,
                            $path
                           ),
                dst => $dst,
               )) {
            my($err) = $self->rsync->err;
            return 1 if _mirror_path_ignorable_error($self, $err);
            $self->register_rsync_error ($err);
        }
        $self->un_register_rsync_error ();
    }
    return 1;
}

# True if $err is an rsync "link_stat" error and ignore_link_stat_errors
# is set on $self; in verbose mode it also warns that the error is ignored.
sub _mirror_path_ignorable_error {
    my($self, $err) = @_;
    return 0 unless $self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x;
    if ($self->verbose) {
        warn "Info: ignoring link_stat error '$err'";
    }
    return 1;
}
# The file recent_events() should read: the fetched tempfile while we are
# in tempfile mode, the regular recentfile otherwise.
sub _my_current_rfile {
    my($self) = @_;
    my $current = $self->_use_tempfile
        ? $self->_current_tempfile
        : $self->rfile;
    return $current;
}
1234 =head2 $path = $obj->naive_path_normalize ($path)
1236 Takes an absolute unix style path as argument and canonicalizes it to
1237 a shorter path if possible, removing things like double slashes or
1238 C</./> and removes references to C<../> directories to get a shorter
1239 unambiguos path. This is used to make the code easier that determines
1240 if a file passed to C<upgrade()> is indeed below our C<localroot>.
1242 =cut
1244 sub naive_path_normalize {
1245 my($self,$path) = @_;
1246 $path =~ s|/+|/|g;
1247 1 while $path =~ s|/[^/]+/\.\./|/|;
1248 $path =~ s|/$||;
1249 $path;
1252 =head2 $ret = $obj->read_recent_1 ( $data )
1254 Delegate of C<recent_events()> on protocol 1
1256 =cut
1258 sub read_recent_1 {
1259 my($self, $data) = @_;
1260 return $data->{recent};
1263 =head2 $array_ref = $obj->recent_events ( %options )
1265 Note: the code relies on the resource being written atomically. We
1266 cannot lock because we may have no write access. If the caller has
1267 write access (eg. aggregate() or update()), it has to care for any
1268 necessary locking.
1270 If $options{after} is specified, only file events after this timestamp
1271 are returned.
1273 If $options{before} is specified, only file events before this
1274 timestamp are returned.
1276 IF $options{'skip-deletes'} is specified, no files-to-be-deleted will
1277 be returned.
1279 If $options{max} is specified only this many events are returned.
1281 If $options{info} is specified, it must be a hashref. This hashref
1282 will be filled with metadata about the unfiltered recent_events of
1283 this object, in key C<first> there is the first item, in key C<last>
1284 is the last.
1286 =cut
1288 sub recent_events {
1289 my ($self, %options) = @_;
1290 my $info = $options{info};
1291 if ($self->is_slave) {
1292 $self->get_remote_recentfile_as_tempfile;
1294 my $rfile_or_tempfile = $self->_my_current_rfile or return [];
1295 -e $rfile_or_tempfile or return [];
1296 my $suffix = $self->serializer_suffix;
1297 my ($data) = eval {
1298 $self->_try_deserialize
1300 $suffix,
1301 $rfile_or_tempfile,
1304 my $err = $@;
1305 if ($err or !$data) {
1306 return [];
1308 my $re;
1309 if (reftype $data eq 'ARRAY') { # protocol 0
1310 $re = $data;
1311 } else {
1312 $re = $self->_recent_events_protocol_x
1314 $data,
1315 $rfile_or_tempfile,
1318 return $re unless defined $options{after}; # XXX same for before and max
1319 my $last_item = $#$re;
1320 if ($info) {
1321 $info->{first} = $re->[0];
1322 $info->{last} = $re->[-1];
1324 if (defined $options{after}) {
1325 if ($re->[0]{epoch} > $options{after}) {
1326 if (
1327 my $f = first
1328 {$re->[$_]{epoch} <= $options{after}}
1329 0..$#$re
1331 $last_item = $f-1;
1333 } else {
1334 $last_item = -1;
1337 my $first_item = 0;
1338 if (defined $options{before}) {
1339 if ($re->[0]{epoch} > $options{before}) {
1340 if (
1341 my $f = first
1342 {$re->[$_]{epoch} < $options{before}}
1343 0..$last_item
1345 $first_item = $f;
1347 } else {
1348 $first_item = 0;
1351 my @rre = splice @$re, $first_item, 1+$last_item-$first_item;
1352 if ($options{'skip-deletes'}) {
1353 @rre = grep { $_->{type} ne "delete" } @rre;
1355 if ($options{max} && @rre > $options{max}) {
1356 @rre = splice @rre, 0, $options{max};
1358 \@rre;
1361 sub _recent_events_protocol_x {
1362 my($self,
1363 $data,
1364 $rfile_or_tempfile,
1365 ) = @_;
1366 my $meth = sprintf "read_recent_%d", $data->{meta}{protocol};
1367 # we may be reading meta for the first time
1368 while (my($k,$v) = each %{$data->{meta}}) {
1369 next if $k ne lc $k; # "Producers"
1370 next if defined $self->$k;
1371 $self->$k($v);
1373 my $re = $self->$meth ($data);
1374 my @stat = stat $rfile_or_tempfile or die "Cannot stat '$rfile_or_tempfile': $!";
1375 my $minmax = { mtime => $stat[9] };
1376 if (@$re) {
1377 $minmax->{min} = $re->[-1]{epoch};
1378 $minmax->{max} = $re->[0]{epoch};
1380 $self->minmax ( $minmax );
1381 return $re;
1384 sub _try_deserialize {
1385 my($self,
1386 $suffix,
1387 $rfile_or_tempfile,
1388 ) = @_;
1389 if ($suffix eq ".yaml") {
1390 require YAML::Syck;
1391 YAML::Syck::LoadFile($rfile_or_tempfile);
1392 } elsif ($HAVE->{"Data::Serializer"}) {
1393 my $serializer = Data::Serializer->new
1394 ( serializer => $serializers{$suffix} );
1395 my $serialized = do
1397 open my $fh, $rfile_or_tempfile or die "Could not open: $!";
1398 local $/;
1399 <$fh>;
1401 $serializer->raw_deserialize($serialized);
1402 } else {
1403 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
1407 =head2 $ret = $obj->rfilename
1409 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1410 a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
1412 =cut
1414 sub rfilename {
1415 my($self) = @_;
1416 my $file = sprintf("%s-%s%s",
1417 $self->filenameroot,
1418 $self->interval,
1419 $self->serializer_suffix,
1421 return $file;
1424 =head2 $str = $self->remote_dir
1426 The directory we are mirroring from.
1428 =cut
1430 sub remote_dir {
1431 my($self, $set) = @_;
1432 if (defined $set) {
1433 $self->_remote_dir ($set);
1435 my $x = $self->_remote_dir;
1436 $self->is_slave (1);
1437 return $x;
1440 =head2 $str = $obj->remoteroot
1442 =head2 (void) $obj->remoteroot ( $set )
1444 Get/Set the composed prefix needed when rsyncing from a remote module.
1445 If remote_host, remote_module, and remote_dir are set, it is composed
1446 from these.
1448 =cut
1450 sub remoteroot {
1451 my($self, $set) = @_;
1452 if (defined $set) {
1453 $self->_remoteroot($set);
1455 my $remoteroot = $self->_remoteroot;
1456 unless (defined $remoteroot) {
1457 $remoteroot = sprintf
1459 "%s%s%s",
1460 defined $self->remote_host ? ($self->remote_host."::") : "",
1461 defined $self->remote_module ? ($self->remote_module."/") : "",
1462 defined $self->remote_dir ? $self->remote_dir : "",
1464 $self->_remoteroot($remoteroot);
1466 return $remoteroot;
1469 =head2 (void) $obj->resolve_recentfilename ( $recentfilename )
1471 Inverse method to L<rfilename>. $recentfilename is a plain filename of
1472 the pattern
1474 $filenameroot-$interval$serializer_suffix
1476 e.g.
1478 RECENT-1M.yaml
1480 This filename is split into its parts and the parts are fed to the
1481 object itself.
1483 =cut
1485 sub resolve_recentfilename {
1486 my($self, $rfname) = @_;
1487 my($splitter) = qr(^(.+)-([^-\.]+)(\.[^\.]+));
1488 if (my($f,$i,$s) = $rfname =~ $splitter) {
1489 $self->filenameroot ($f);
1490 $self->interval ($i);
1491 $self->serializer_suffix ($s);
1492 } else {
1493 die "Alert: cannot split '$rfname', doesn't match '$splitter'";
1495 return;
1498 =head2 my $rfile = $obj->rfile
1500 Returns the full path of the I<recentfile>
1502 =cut
1504 sub rfile {
1505 my($self) = @_;
1506 my $rfile = $self->_rfile;
1507 return $rfile if defined $rfile;
1508 $rfile = File::Spec->catfile
1509 ($self->localroot,
1510 $self->rfilename,
1512 $self->_rfile ($rfile);
1513 return $rfile;
1516 =head2 $rsync_obj = $obj->rsync
1518 The File::Rsync object that this object uses for communicating with an
1519 upstream server.
1521 =cut
1523 sub rsync {
1524 my($self) = @_;
1525 my $rsync = $self->_rsync;
1526 unless (defined $rsync) {
1527 my $rsync_options = $self->rsync_options || {};
1528 if ($HAVE->{"File::Rsync"}) {
1529 $rsync = File::Rsync->new($rsync_options);
1530 $self->_rsync($rsync);
1531 } else {
1532 die "File::Rsync required for rsync operations. Cannot continue";
1535 return $rsync;
1538 =head2 (void) $obj->register_rsync_error($err)
1540 =head2 (void) $obj->un_register_rsync_error()
1542 Register_rsync_error is called whenever the File::Rsync object fails
1543 on an exec (say, connection doesn't succeed). It issues a warning and
1544 sleeps for an increasing amount of time. Un_register_rsync_error
1545 resets the error count. See also accessor C<max_rsync_errors>.
1547 =cut
1550 my $no_success_count = 0;
1551 my $no_success_time = 0;
1552 sub register_rsync_error {
1553 my($self, $err) = @_;
1554 chomp $err;
1555 $no_success_time = time;
1556 $no_success_count++;
1557 my $max_rsync_errors = $self->max_rsync_errors;
1558 $max_rsync_errors = 12 unless defined $max_rsync_errors;
1559 if ($max_rsync_errors>=0 && $no_success_count >= $max_rsync_errors) {
1560 require Carp;
1561 Carp::confess
1563 sprintf
1565 "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
1566 $self->interval,
1567 $err,
1568 $no_success_count,
1571 my $sleep = 12 * $no_success_count;
1572 $sleep = 120 if $sleep > 120;
1573 require Carp;
1574 Carp::cluck
1575 (sprintf
1577 "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
1578 scalar(localtime($no_success_time)),
1579 $self->interval,
1580 $err,
1581 $sleep,
1583 sleep $sleep
1585 sub un_register_rsync_error {
1586 my($self) = @_;
1587 $no_success_time = 0;
1588 $no_success_count = 0;
1592 =head2 $clone = $obj->_sparse_clone
1594 Clones just as much from itself that it does not hurt. Experimental
1595 method.
1597 Note: what fits better: sparse or shallow? Other suggestions?
1599 =cut
1601 sub _sparse_clone {
1602 my($self) = @_;
1603 my $new = bless {}, ref $self;
1604 for my $m (qw(
1605 _interval
1606 _localroot
1607 _remoteroot
1608 _rfile
1609 _use_tempfile
1610 aggregator
1611 filenameroot
1612 is_slave
1613 max_files_per_connection
1614 protocol
1615 rsync_options
1616 serializer_suffix
1617 sleep_per_connection
1618 verbose
1619 )) {
1620 my $o = $self->$m;
1621 $o = Storable::dclone $o if ref $o;
1622 $new->$m($o);
1624 $new;
1627 =head2 $boolean = OBJ->ttl_reached ()
1629 =cut
1631 sub ttl_reached {
1632 my($self) = @_;
1633 my $have_mirrored = $self->have_mirrored || 0;
1634 my $now = Time::HiRes::time;
1635 my $ttl = $self->ttl;
1636 $ttl = 24.2 unless defined $ttl;
1637 if ($now > $have_mirrored + $ttl) {
1638 return 1;
1640 return 0;
1643 =head2 (void) $obj->unlock()
1645 Unlocking is implemented with an C<rmdir> on a locking directory
1646 (C<.lock> appended to $rfile).
1648 =cut
1650 sub unlock {
1651 my($self) = @_;
1652 return unless $self->_is_locked;
1653 my $rfile = $self->rfile;
1654 rmdir "$rfile.lock";
1655 $self->_is_locked (0);
1658 =head2 $ret = $obj->update ($path, $type)
1660 Enter one file into the local I<recentfile>. $path is the (usually
1661 absolute) path. If the path is outside the I<our> tree, then it is
1662 ignored.
1664 $type is one of C<new> or C<delete>.
1666 The new file event is uhshifted to the array of recent_events and the
1667 array is shortened to the length of the timespan allowed. This is
1668 usually the timespan specified by the interval of this recentfile but
1669 as long as this recentfile has not been merged to another one, the
1670 timespan may grow without bounds.
1672 =cut
1673 sub _epoch_monotonically_increasing {
1674 my($self,$epoch,$recent) = @_;
1675 return $epoch unless @$recent; # the first one goes unoffended
1676 if (_bigfloatgt("".$epoch,$recent->[0]{epoch})) {
1677 return $epoch;
1678 } else {
1679 return _increase_a_bit($recent->[0]{epoch});
1682 sub update {
1683 my($self,$path,$type) = @_;
1684 die "update called without path argument" unless defined $path;
1685 die "update called without type argument" unless defined $type;
1686 die "update called with illegal type argument: $type" unless $type =~ /(new|delete)/;
1687 my $canonmeth = $self->canonize;
1688 unless ($canonmeth) {
1689 $canonmeth = "naive_path_normalize";
1691 $path = $self->$canonmeth($path);
1692 my $lrd = $self->localroot;
1693 if ($path =~ s|^\Q$lrd\E||) {
1694 $path =~ s|^/||;
1695 my $interval = $self->interval;
1696 my $secs = $self->interval_secs();
1697 $self->lock;
1698 # you must calculate the time after having locked, of course
1699 my $epoch = Time::HiRes::time;
1700 my $recent = $self->recent_events;
1701 $epoch = $self->_epoch_monotonically_increasing($epoch,$recent);
1702 $recent ||= [];
1703 my $oldest_allowed = 0;
1704 if (my $merged = $self->merged) {
1705 # XXX _bigfloat!
1706 $oldest_allowed = min($epoch - $secs, $merged->{epoch});
1707 } else {
1708 # as long as we are not merged at all, no limits!
1710 TRUNCATE: while (@$recent) {
1711 if ($recent->[-1]{epoch} < $oldest_allowed) { # XXX _bigfloatlt!
1712 pop @$recent;
1713 } else {
1714 last TRUNCATE;
1717 # remove older duplicates of this $path, irrespective of $type:
1718 $recent = [ grep { $_->{path} ne $path } @$recent ];
1720 unshift @$recent, { epoch => $epoch, path => $path, type => $type };
1721 $self->write_recent($recent);
1722 $self->_assert_symlink;
1723 $self->unlock;
1727 =head2 uptodate
1729 True if this object has mirrored the complete interval covered by the
1730 current recentfile.
1732 *** WIP ***
1734 =cut
1736 sub uptodate {
1737 my($self) = @_;
1738 my $uptodate;
1739 my $why;
1740 if ($self->ttl_reached){
1741 $why = "ttl_reached returned true, so we are not uptodate";
1742 $uptodate = 0 ;
1745 unless (defined $uptodate) {
1746 # look if recentfile has unchanged timestamp
1747 my $minmax = $self->minmax;
1748 if (exists $minmax->{mtime}) {
1749 my $rfile = $self->_my_current_rfile;
1750 my @stat = stat $rfile;
1751 my $mtime = $stat[9];
1752 if ($mtime > $minmax->{mtime}) {
1753 $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
1754 $uptodate = 0;
1755 } else {
1756 my $covered = $self->done->covered(@$minmax{qw(max min)});
1757 $why = "minmax covered[$covered], so we return that";
1758 $uptodate = $covered;
1762 unless (defined $uptodate) {
1763 $why = "fallthrough, so not uptodate";
1764 $uptodate = 0;
1766 my $remember =
1768 uptodate => $uptodate,
1769 why => $why,
1771 $self->_remember_last_uptodate_call($remember);
1772 return $uptodate;
1775 =head2 $obj->write_recent ($recent_files_arrayref)
1777 Writes a I<recentfile> based on the current reflection of the current
1778 state of the tree limited by the current interval.
1780 =cut
1782 sub write_recent {
1783 my ($self,$recent) = @_;
1784 die "write_recent called without argument" unless defined $recent;
1785 my $meth = sprintf "write_%d", $self->protocol;
1786 $self->$meth($recent);
1789 =head2 $obj->write_0 ($recent_files_arrayref)
1791 Delegate of C<write_recent()> on protocol 0
1793 =cut
1795 sub write_0 {
1796 my ($self,$recent) = @_;
1797 my $rfile = $self->rfile;
1798 YAML::Syck::DumpFile("$rfile.new",$recent);
1799 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
1802 =head2 $obj->write_1 ($recent_files_arrayref)
1804 Delegate of C<write_recent()> on protocol 1
1806 =cut
1808 sub write_1 {
1809 my ($self,$recent) = @_;
1810 my $rfile = $self->rfile;
1811 my $suffix = $self->serializer_suffix;
1812 my $data = {
1813 meta => $self->meta_data,
1814 recent => $recent,
1816 my $serialized;
1817 if ($suffix eq ".yaml") {
1818 $serialized = YAML::Syck::Dump($data);
1819 } elsif ($HAVE->{"Data::Serializer"}) {
1820 my $serializer = Data::Serializer->new
1821 ( serializer => $serializers{$suffix} );
1822 $serialized = $serializer->raw_serialize($data);
1823 } else {
1824 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
1826 open my $fh, ">", "$rfile.new" or die "Could not open >'$rfile.new': $!";
1827 print $fh $serialized;
1828 close $fh or die "Could not close '$rfile.new': $!";
1829 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
1832 BEGIN {
1833 my @pod_lines =
1834 split /\n/, <<'=cut'; %serializers = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
1836 =head1 THE ARCHITECTURE OF A COLLECTION OF RECENTFILES
1838 The idea is that we want to have a short file that records really
1839 recent changes. So that a fresh mirror can be kept fresh as long as
1840 the connectivity is given. Then we want longer files that record the
1841 history before. So when the mirror falls behind the update period
1842 reflected in the shortest file, it can complement the list of recent
1843 file events with the next one. And if this is not long enough we want
1844 another one, again a bit longer. And we want one that completes the
1845 history back to the oldest file. The index files do contain the
1846 complete list of current files. The larger an index file is the less
1847 often it is updated. For practical reasons adjacent files will often
1848 overlap a bit but this is neither necessary nor enforced. That's the
1849 basic idea. The following example represents a tree that has a few
1850 updates every day:
1852 RECENT.recent -> RECENT-1h.yaml
1853 RECENT-6h.yaml
1854 RECENT-1d.yaml
1855 RECENT-1M.yaml
1856 RECENT-1W.yaml
1857 RECENT-1Q.yaml
1858 RECENT-1Y.yaml
1859 RECENT-Z.yaml
1861 The first file is the principal file, in so far it is the one that is
1862 written first after a filesystem change. Usually a symlink links to it
1863 with a filename that has the same filenameroot and the suffix
1864 C<.recent>. On systems that do not support symlinks there is a plain
1865 copy maintained instead.
1867 The last file, the Z file, contains the complementary files that are
1868 in none of the other files. It does never contain C<deletes>. Besides
1869 this it serves the role of a recovery mechanism or spill over pond.
1870 When things go wrong, it's a valuable controlling instance to hold the
1871 differences between the collection of limited interval files and the
1872 actual filesystem.
1874 =head2 A SINGLE RECENTFILE
1876 A I<recentfile> consists of a hash that has two keys: C<meta> and
1877 C<recent>. The C<meta> part has metadata and the C<recent> part has a
1878 list of fileobjects.
1880 =head2 THE META PART
1882 Here we find things that are pretty much self explaining: all
1883 lowercase attributes are accessors and as such explained somewhere
1884 above in this manpage. The uppercase attribute C<Producers> contains
1885 version information about involved software components. Nothing to
1886 worry about as I believe.
1888 =head2 THE RECENT PART
1890 This is the interesting part. Every entry refers to some filesystem
1891 change (with path, epoch, type). The epoch value is the point in time
1892 when some change was I<registered>. Do not be tempted to believe that
1893 the entry has a direct relation to something like modification time or
1894 change time on the filesystem level. The timestamp (I<epoch> element)
1895 is a floating point number and does practically never correspond
1896 exactly to the data recorded in the filesystem but rather to the time
1897 when some process succeeded to report to the I<recentfile> mechanism
1898 that something has changed. This is why many parts of the code refer
1899 to I<events>, because we merely try to record the I<event> of the
1900 discovery of a change, not the time of the change itself.
1902 All these entries can be devided into two types (denoted by the
1903 C<type> attribute): C<new>s and C<delete>s. Changes and creations are
1904 C<new>s. Deletes are C<delete>s.
1906 Another distinction is for objects with an epoch timestamp and others
1907 without. All files that were already existing on the filesystem before
1908 the I<recentfile> mechanism was installed, get recorded with a
1909 timestamp of zero.
1911 Besides an C<epoch> and a C<type> attribute we find a third one:
1912 C<path>. This path is relative to the directory we find the
1913 I<recentfile> in.
1915 The order of the entries in the I<recentfile> is by decreasing epoch
1916 attribute. These are either 0 or a unique floating point number. They
1917 are zero for events that were happening either before the time that
1918 the I<recentfile> mechanism was set up or were left undiscovered for a
1919 while and never handed over to update(). They are floating point
1920 numbers for all events being regularly handed to update(). And when
1921 the server has ntp running correctly, then the timestamps are
1922 actually decreasing and unique.
1924 =head1 CORRUPTION AND RECOVERY
1926 If the origin host breaks the promise to deliver consistent and
1927 complete I<recentfiles> then the way back to sanity shall be achieved
1928 through either the C<zloop> (still TBD) or traditional rsyncing
1929 between the hosts. For example, if the origin server forgets to deploy
1930 ntp and the clock on it jumps backwards some day, then this would
1931 probably go unnoticed for a while and many software components that
1932 rely on the time never running backwards will make wrong decisions.
1933 After some time this accident would probably still be found in one of
1934 the I<recentfiles> but would become meaningless as soon as a mirror
1935 has run through the sanitizing procedures. Same goes for origin hosts
1936 that forget to include or deliberately omit some files.
1938 =head1 SERIALIZERS
1940 The following suffixes are supported and trigger the use of these
1941 serializers:
1943 =over 4
1945 =item C<< ".yaml" => "YAML::Syck" >>
1947 =item C<< ".json" => "JSON" >>
1949 =item C<< ".sto" => "Storable" >>
1951 =item C<< ".dd" => "Data::Dumper" >>
1953 =back
1955 =cut
1957 BEGIN {
1958 my @pod_lines =
1959 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
1961 =head1 INTERVAL SPEC
1963 An interval spec is a primitive way to express time spans. Normally it
1964 is composed from an integer and a letter.
1966 As a special case, a string that consists only of the single letter
1967 C<Z>, stands for unlimited time.
1969 The following letters express the specified number of seconds:
1971 =over 4
1973 =item C<< s => 1 >>
1975 =item C<< m => 60 >>
1977 =item C<< h => 60*60 >>
1979 =item C<< d => 60*60*24 >>
1981 =item C<< W => 60*60*24*7 >>
1983 =item C<< M => 60*60*24*30 >>
1985 =item C<< Q => 60*60*24*90 >>
1987 =item C<< Y => 60*60*24*365.25 >>
1989 =back
1991 =cut
1993 =head1 BACKGROUND
1995 This is about speeding up rsync operation on large trees to many
1996 places. Uses a small metadata cocktail and pull technology.
1998 =head2 NON-COMPETITORS
2000 File::Mirror JWU/File-Mirror/File-Mirror-0.10.tar.gz only local trees
2001 Mirror::YAML ADAMK/Mirror-YAML-0.03.tar.gz some sort of inner circle
2002 Net::DownloadMirror KNORR/Net-DownloadMirror-0.04.tar.gz FTP sites and stuff
2003 Net::MirrorDir KNORR/Net-MirrorDir-0.05.tar.gz dito
2004 Net::UploadMirror KNORR/Net-UploadMirror-0.06.tar.gz dito
2005 Pushmi::Mirror CLKAO/Pushmi-v1.0.0.tar.gz something SVK
2007 rsnapshot www.rsnapshot.org focus on backup
2008 csync www.csync.org more like unison
2010 =head2 COMPETITORS
2012 The problem to solve which clusters and ftp mirrors and otherwise
2013 replicated datasets like CPAN share: how to transfer only a minimum
2014 amount of data to determine the diff between two hosts.
2016 Normally it takes a long time to determine the diff itself before it
2017 can be transferred. Known solutions at the time of this writing are
2018 csync2, and rsync 3 batch mode.
2020 For many years the best solution was csync2 which solves the
2021 problem by maintining a sqlite database on both ends and talking a
2022 highly sophisticated protocol to quickly determine which files to send
2023 and which to delete at any given point in time. Csync2 is often
2024 inconvenient because the act of syncing demands quite an intimate
2025 relationship between the sender and the receiver and suffers when the
2026 number of syncing sites is large or connections are unreliable.
2028 Rsync 3 batch mode works around these problems by providing rsync-able
2029 batch files which allow receiving nodes to replay the history of the
2030 other nodes. This reduces the need to have an incestuous relation but
2031 it has the disadvantage that these batch files replicate the contents
2032 of the involved files. This seems inappropriate when the nodes already
2033 have a means of communicating over rsync.
2035 rersyncrecent solves this problem with a couple of (usually 2-10)
2036 index files which cover different overlapping time intervals. The
2037 master writes these files and the clients can construct the full tree
2038 from the information contained in them. The most recent index file
2039 usually covers the last seconds or minutes or hours of the tree and
2040 depending on the needs, slaves can rsync every few seconds and then
2041 bring their trees in full sync.
2043 The rersyncrecent mode was developed for CPAN but I hope it is a
2044 convenient and economic general purpose solution. I'm looking forward
2045 to see a CPAN backbone that is only a few seconds behind PAUSE. And
2046 then ... the first FUSE based CPAN filesystem anyone?
2048 =head1 AUTHOR
2050 Andreas König
2052 =head1 BUGS
2054 Please report any bugs or feature requests through the web interface
2056 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2057 I will be notified, and then you'll automatically be notified of
2058 progress on your bug as I make changes.
2060 =head1 SUPPORT
2062 You can find documentation for this module with the perldoc command.
2064 perldoc File::Rsync::Mirror::Recentfile
2066 You can also look for information at:
2068 =over 4
2070 =item * RT: CPAN's request tracker
2072 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2074 =item * AnnoCPAN: Annotated CPAN documentation
2076 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2078 =item * CPAN Ratings
2080 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2082 =item * Search CPAN
2084 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2086 =back
2089 =head1 ACKNOWLEDGEMENTS
2091 Thanks to RJBS for module-starter.
2093 =head1 COPYRIGHT & LICENSE
2095 Copyright 2008 Andreas König.
2097 This program is free software; you can redistribute it and/or modify it
2098 under the same terms as Perl itself.
2101 =cut
2103 1; # End of File::Rsync::Mirror::Recentfile
2105 # Local Variables:
2106 # mode: cperl
2107 # cperl-indent-level: 4
2108 # End: