put the bigfloat stuff into its own file so we can use it everywhere with less effort
[rersyncrecent.git] / lib / File / Rsync / Mirror / Recentfile.pm
blob9e0c006acf239a0d8a9690bee84f17d26458dd76
1 package File::Rsync::Mirror::Recentfile;
3 # use warnings;
4 use strict;
6 =encoding utf-8
8 =head1 NAME
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
12 =head1 VERSION
14 Version 0.0.1
16 =cut
# Probe optional modules once at load time; $HAVE->{$package} is true when
# the module could be loaded (e.g. non-YAML serializers need
# Data::Serializer, see new_from_file).
my $HAVE = {};
for my $package (
    "Data::Serializer",
    "File::Rsync"
) {
    # require on the file name inside a block eval: same effect as the
    # former string eval, without compiling Perl code at runtime
    (my $module_file = "$package.pm") =~ s{::}{/}g;
    $HAVE->{$package} = eval { require $module_file; 1 };
}
use Config;
use File::Basename qw(dirname fileparse);
use File::Copy qw(cp);
use File::Path qw(mkpath);
use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
use File::Spec;
use File::Temp;
use List::Util qw(first min);
use Scalar::Util qw(reftype);
use Storable;
use Time::HiRes qw();
use YAML::Syck;
use version; our $VERSION = qv('0.0.1');

# MAX_INT is the largest native signed integer; interval_secs() returns it
# for the unbounded "Z" interval. DEFAULT_PROTOCOL is what the constructors
# fall back to when no protocol is given or found.
use constant {
    MAX_INT          => ~0 >> 1, # anything better?
    DEFAULT_PROTOCOL => 1,
};

# unit letter => seconds per unit; cf. interval_secs
my %seconds;

# file suffix => Data::Serializer serializer name
# maybe subclass if this mapping is bad?
my %serializers;
49 =head1 SYNOPSIS
51 B<!!!! PRE-ALPHA ALERT !!!!>
53 Nothing in here is believed to be stable, nothing yet intended for
54 public consumption. The plan is to provide a script in one of the next
55 releases that acts as a frontend for all the backend functionality.
56 Option and method names will very likely change.
58 For the rationale see the section BACKGROUND.
60 This is published only for developers of the (yet to be named)
61 script(s).
63 Writer (of a single file):
65 use File::Rsync::Mirror::Recentfile;
66 my $rf = File::Rsync::Mirror::Recentfile->new
68 interval => q(6h),
69 filenameroot => "RECENT",
70 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
71 localroot => "/home/ftp/pub/PAUSE/authors/",
72 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
74 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
76 Reader/mirrorer:
78 my $rf = File::Rsync::Mirror::Recentfile->new
80 filenameroot => "RECENT",
81 ignore_link_stat_errors => 1,
82 interval => q(6h),
83 localroot => "/home/ftp/pub/PAUSE/authors",
84 remote_dir => "",
85 remote_host => "pause.perl.org",
86 remote_module => "authors",
87 rsync_options => {
88 compress => 1,
89 'rsync-path' => '/usr/bin/rsync',
90 links => 1,
91 times => 1,
92 'omit-dir-times' => 1,
93 checksum => 1,
95 verbose => 1,
97 $rf->mirror;
99 Aggregator (usually the writer):
101 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
102 $rf->aggregate;
104 =head1 EXPORT
106 No exports.
108 =head1 CONSTRUCTORS / DESTRUCTOR
110 =head2 my $obj = CLASS->new(%hash)
112 Constructor. On every argument pair the key is a method name and the
113 value is an argument to that method name.
115 If a recentfile for this resource already exists, metadata that are
116 not defined by the constructor will be fetched from there as soon as
117 it is being read by recent_events().
119 =cut
sub new {
    my ($class, @args) = @_;
    my $self = bless {}, $class;
    # every argument pair is "accessor name => value"
    while (my ($method, $arg) = splice @args, 0, 2) {
        $self->$method($arg);
    }
    # fill in whatever the caller left undefined, in this order
    my @fallbacks = (
        protocol          => DEFAULT_PROTOCOL,
        filenameroot      => "RECENT",
        serializer_suffix => ".yaml",
    );
    while (my ($accessor, $fallback) = splice @fallbacks, 0, 2) {
        $self->$accessor($fallback) unless defined $self->$accessor;
    }
    return $self;
}
140 =head2 my $obj = CLASS->new_from_file($file)
142 Constructor. $file is a I<recentfile>.
144 =cut
sub new_from_file {
    my ($class, $file) = @_;
    my $self = bless {}, $class;
    $self->_rfile($file);
    #?# $self->lock;
    # Slurp the raw text; the Data::Serializer branch below deserializes it.
    # Three-argument open, so characters in $file such as a leading '>' or
    # a trailing '|' can never be taken as an open mode.
    my $serialized = do {
        open my $fh, '<', $file or die "Could not open '$file': $!";
        local $/;
        <$fh>;
    };
    # XXX: we can skip this step when the metadata are sufficient, but
    # we cannot parse the file without some magic stuff about
    # serialized formats
    # Follow symlinks (such as the *.recent symlink) to the real file,
    # whose directory and suffix are used below.
    while (-l $file) {
        my ($name, $path) = fileparse $file;
        my $symlink = readlink $file;
        if ($symlink =~ m|/|) {
            die "FIXME: filenames containing '/' not supported, got $symlink";
        }
        $file = File::Spec->catfile($path, $symlink);
    }
    # fileparse() treats suffixes as regular expressions; quote the literal
    # serializer suffixes so the dot in ".yaml" matches only a dot.
    my ($name, $path, $suffix) = fileparse $file, map { quotemeta($_) } keys %serializers;
    die "Could not determine file format from suffix" unless $suffix;
    $self->serializer_suffix($suffix);
    $self->localroot($path);
    my $deserialized;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        $deserialized = YAML::Syck::LoadFile($file);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $deserialized = $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    # copy lower-case metadata keys into accessors; mixed-case keys such
    # as "Producers" are informational only
    while (my ($k, $v) = each %{$deserialized->{meta}}) {
        next if $k ne lc $k; # "Producers"
        $self->$k($v);
    }
    unless (defined $self->protocol) {
        $self->protocol(DEFAULT_PROTOCOL);
    }
    return $self;
}
191 =head2 DESTROY
193 A simple unlock.
195 =cut
sub DESTROY {
    my ($self) = @_;
    # Destructors also run while an exception unwinds the stack and during
    # global destruction; keep unlock() from clobbering the caller's error
    # and status variables (as recommended in perlobj).
    local ($., $@, $!, $^E, $?);
    $self->unlock;
}
198 =head1 ACCESSORS
200 =cut
my @accessors;

BEGIN {
    # private accessors, deliberately absent from the POD
    @accessors = (
        "_current_tempfile",
        "_current_tempfile_fh",
        "_done",
        "_interval",
        "_is_locked",
        "_localroot",
        "_merged",
        "_pathdb",
        "_remote_dir",
        "_remoteroot",
        "_rfile",
        "_rsync",
        "_use_tempfile",
    );
    # The heredoc below is also the ACCESSORS documentation seen by POD
    # tools; every "=item NAME" line in it becomes a public accessor.
    my $accessor_pod = <<'=cut';

=over 4

=item aggregator

A list of interval specs that tell the aggregator which I<recentfile>s
are to be produced.

=item canonize

The name of a method to canonize the path before rsyncing. Only
supported value is C<naive_path_normalize>. Defaults to that.

=item comment

A comment about this tree and setup.

=item filenameroot

The (prefix of the) filename we use for this I<recentfile>. Defaults to
C<RECENT>.

=item have_mirrored

Timestamp remembering when we mirrored this recentfile the last time.
Only relevant for slaves.

=item ignore_link_stat_errors

If set to true, rsync errors are ignored that complain about link stat
errors. These seem to happen only when there are files missing at the
origin. In race conditions this can always happen, so it is
recommended to set this value to true.

=item is_slave

If set to true, this object will fetch a new recentfile from remote
when the timespan between the last mirror (see have_mirrored) and now
is too large (currently hardcoded arbitrary 420 seconds).

=item locktimeout

After how many seconds shall we die if we cannot lock a I<recentfile>?
Defaults to 600 seconds.

=item loopinterval

When mirror_loop is called, this accessor can specify how much time
every loop shall at least take. If the work of a loop is done before
that time has gone, sleeps for the rest of the time. Defaults to
arbitrary 42 seconds.

=item max_files_per_connection

Maximum number of files that are transferred on a single rsync call.
Setting it higher means higher performance at the price of holding
connections longer and potentially disturbing other users in the pool.
Defaults to the arbitrary value 42.

=item max_rsync_errors

When rsync operations encounter that many errors without any resetting
success in between, then we die. Defaults to arbitrary 12. A value of
-1 means we run forever ignoring all rsync errors.

=item minmax

Hashref remembering when we read the recent_events from this file the
last time and what the timespan was.

=item protocol

When the RECENT file format changes, we increment the protocol. We try
to support older protocols in later releases.

=item remote_host

The host we are mirroring from. Leave empty for the local filesystem.

=item remote_module

Rsync servers have so called modules to separate directory trees from
each other. Put here the name of the module under which we are
mirroring. Leave empty for local filesystem.

=item rsync_options

Things like compress, links, times or checksums. Passed in to the
File::Rsync object used to run the mirror.

=item serializer_suffix

Mostly untested accessor. The only well tested format for
I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
Data::Serializer. But in principle other formats are supported as
well. See section SERIALIZERS below.

=item sleep_per_connection

Sleep that many seconds (floating point OK) after every chunk of rsyncing
has finished. Defaults to arbitrary 0.42.

=item ttl

Time to live. Number of seconds after which this recentfile must be
fetched again from the origin server. Only relevant for slaves.
Defaults to arbitrary 24.2 seconds.

=item verbose

Boolean to turn on a bit verbosity.

=back

=cut
    push @accessors,
        map { /^=item\s+(.*)/ ? $1 : () }
        split /\n/, $accessor_pod;
}

use accessors @accessors;
341 =head1 METHODS
343 =head2 (void) $obj->aggregate
345 Takes all intervals that are collected in the accessor called
346 aggregator. Sorts them by actual length of the interval.
347 Removes those that are shorter than our own interval. Then merges this
348 object into the next larger object. The merging continues upwards
349 as long as the next I<recentfile> is old enough to warrant a merge.
351 Whether a merge is warranted is decided by comparing the age of the
352 next larger I<recentfile> with the interval of the previous one, so
353 that larger files are updated less often than smaller ones.
355 Here is an example to illustrate the behaviour. Given aggregators
357 1h 1d 1W 1M 1Q 1Y Z
359 then
361 1h updates 1d on every call to aggregate()
362 1d updates 1W earliest after 1h
363 1W updates 1M earliest after 1d
364 1M updates 1Q earliest after 1W
365 1Q updates 1Y earliest after 1M
366 1Y updates Z earliest after 1Q
368 Note that all but the smallest recentfile get updated at an arbitrary
369 rate and as such are quite useless on their own.
371 =cut
sub aggregate {
    my ($self) = @_;
    my $own_secs = $self->interval_secs;
    # Each interval length may appear only once in the chain: merging a
    # recentfile into one of equal length dies in _merge_sanitycheck, and
    # an aggregator list may well contain our own interval (the POD example
    # does). Longer intervals are kept, sorted ascending; shorter ones and
    # repeats of an already seen length are dropped.
    my %seen_secs = ($own_secs => 1);
    my @aggs = (
        { interval => $self->interval, secs => $own_secs, object => $self },
        sort { $a->{secs} <=> $b->{secs} }
        grep { $_->{secs} > $own_secs && !$seen_secs{ $_->{secs} }++ }
        map  { +{ interval => $_, secs => $self->interval_secs($_) } }
        @{ $self->aggregator || [] }
    );
  AGGREGATOR: for my $i (0 .. $#aggs - 1) {
        my $this = $aggs[$i]{object};
        my $next = $this->_sparse_clone;
        $next->interval($aggs[$i + 1]{interval});
        my $want_merge = 0;
        if ($i == 0) {
            # the smallest file is always merged upwards
            $want_merge = 1;
        } else {
            my $next_rfile = $next->rfile;
            if (-e $next_rfile) {
                # merge only when the larger file is older than the
                # interval of the file below the current one
                my $prev = $aggs[$i - 1]{object};
                local $^T = time;
                my $next_age = 86400 * -M $next_rfile;
                if ($next_age > $prev->interval_secs) {
                    $want_merge = 1;
                }
            } else {
                $want_merge = 1;
            }
        }
        last AGGREGATOR unless $want_merge;
        $next->merge($this);
        $aggs[$i + 1]{object} = $next;
    }
}
# collect file size and mtime for all files of this aggregate
#
# Returns an arrayref of { rfile, size, mtime } hashrefs, one per interval
# (own interval plus aggregator), sorted by interval length. size and mtime
# are undef for files that do not exist.
sub _debug_aggregate {
    my ($self) = @_;
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
        map { +{ interval => $_, secs => $self->interval_secs($_) } }
        $self->interval, @{ $self->aggregator || [] };
    my @report;
    for my $agg (@aggs) {
        # Use _sparse_clone (as aggregate() does) rather than a deep copy:
        # Storable::dclone cannot store the File::Temp glob that may sit in
        # _current_tempfile_fh, and a full copy would also carry over
        # _is_locked, so the clone's DESTROY (which calls unlock) could
        # release a lock it never acquired.
        my $this = $self->_sparse_clone;
        $this->interval($agg->{interval});
        my $rfile = $this->rfile;
        my @stat = stat $rfile;
        push @report, { rfile => $rfile, size => $stat[7], mtime => $stat[9] };
    }
    return \@report;
}
# (void) $self->_assert_symlink()
#
# Make "<localroot>/<filenameroot>.recent" refer to this object's
# recentfile: a relative symlink to rfilename where the platform supports
# symlinks, a copy of the file otherwise. Replacements are first created
# under a "$$"-suffixed name and then renamed into place.
sub _assert_symlink {
    my ($self) = @_;
    my $recentrecentfile = File::Spec->catfile(
        $self->localroot,
        sprintf("%s.recent", $self->filenameroot),
    );
    if (($Config{d_symlink} || "") eq "define") {
        unless (-l $recentrecentfile) {
            symlink $self->rfilename, $recentrecentfile
                or die "Could not create symlink '$recentrecentfile': $!";
            return;
        }
        # already pointing to the right file
        return if readlink($recentrecentfile) eq $self->rfilename;
        # outdated symlink: build the new one beside it, then rename over it
        unlink "$recentrecentfile.$$"; # may fail
        symlink $self->rfilename, "$recentrecentfile.$$"
            or die "Could not create symlink '$recentrecentfile.$$': $!";
        rename "$recentrecentfile.$$", $recentrecentfile
            or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
    } else {
        warn "Warning: symlinks not supported on this system, doing a copy instead\n";
        unlink "$recentrecentfile.$$"; # may fail
        # rfilename is a bare file name (fine as a relative symlink target),
        # but as a copy source it would resolve against the current working
        # directory; copy from the full rfile path instead.
        my $rfile = $self->rfile;
        cp $rfile, "$recentrecentfile.$$"
            or die "Could not copy '$rfile' to '$recentrecentfile.$$': $!";
        rename "$recentrecentfile.$$", $recentrecentfile
            or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
    }
}
465 =head2 $done = $obj->done
467 $done is a reference to a File::Rsync::Mirror::Recentfile::Done object
468 that keeps track of rsync activities. Only used/needed when we are a
469 mirroring slave.
471 =cut
sub done {
    my ($self) = @_;
    my $tracker = $self->_done;
    return $tracker if $tracker;
    # created lazily: only mirroring slaves ever need it
    require File::Rsync::Mirror::Recentfile::Done;
    $tracker = File::Rsync::Mirror::Recentfile::Done->new();
    $self->_done($tracker);
    return $tracker;
}
484 =head2 $success = $obj->full_mirror
486 (TBD) Mirrors the whole remote site, starting with the smallest I<recentfile>,
487 switching to larger ones ...
489 =cut
sub full_mirror {
    my ($self) = @_;
    # placeholder: the smallest-to-largest walk is still to be written
    die "FIXME: Not yet implemented";
}
496 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ($rfilename)
498 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
500 Stores the remote I<recentfile> locally as a tempfile. $rfilename must
501 be a plain filename without path separators. The second form fetches
502 the file with the default name. The caller is responsible to remove
503 the file after use.
505 Note: if you're intending to act as an rsync server for other slaves,
506 then you must prefer this method to mirror (and read) recentfiles over
507 get_remotefile(). Otherwise downstream mirrors would expect you to
508 have files that you do not have yet.
510 Note: currently we have an arbitrary brake built into the method:
511 before 4.42 seconds are over since the last download we will return
512 without downloading. XXX
514 =cut
sub get_remote_recentfile_as_tempfile {
    my ($self, $trfilename) = @_;
    mkpath $self->localroot;
    my $fh;
    if ($trfilename) {
        $self->_use_tempfile(1); # why?
    } elsif ($self->_use_tempfile()) {
        # reuse the current tempfile until its ttl has been reached
        return $self->_current_tempfile if !$self->ttl_reached;
        $fh = $self->_current_tempfile_fh;
        $trfilename = $self->rfilename;
    } else {
        $trfilename = $self->rfilename;
    }
    # NOTE(review): every branch above leaves $trfilename set, so this
    # 4.42 second brake (documented in the POD) only fires if rfilename
    # itself is false -- it is effectively dead code. Left unchanged.
    return $trfilename
        if (!$trfilename
            && $self->have_mirrored
            && Time::HiRes::time() - $self->have_mirrored < 4.42
        );
    die "Alert: illegal filename[$trfilename] contains a slash" if $trfilename =~ m|/|;
    my $dst;
    if ($fh) {
        $dst = $self->_current_tempfile;
    } else {
        $fh = $self->_get_remote_rat_provide_tempfile_object($trfilename);
        $dst = $fh->filename;
        $self->_current_tempfile($dst);
        my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
        if (defined $rfile && -e $rfile) {
            # saving on bandwidth: seed the tempfile with the local copy so
            # rsync only has to transfer the differences.
            # Might need to be configurable
            # $self->bandwidth_is_cheap?
            cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
        }
    }
    my $src = join("/",
                   $self->remoteroot,
                   $trfilename,
                  );
    if ($self->verbose) {
        my $doing = -e $dst ? "Syncing" : "Getting";
        printf STDERR "%s (1/1) temporary %s ... ", $doing, $dst;
    }
    # at most three attempts; each failure is passed to register_rsync_error
    my $gaveup = 0;
    my $retried = 0;
    while (!$self->rsync->exec(
                               src => $src,
                               dst => $dst,
                              )) {
        $self->register_rsync_error($self->rsync->err);
        if (++$retried >= 3) {
            warn "XXX giving up";
            $gaveup = 1;
            last;
        }
    }
    unless ($gaveup) {
        $self->have_mirrored(Time::HiRes::time());
        $self->un_register_rsync_error();
    }
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
    my $mode = 0644;
    # report the mode in octal; interpolating $mode would print "420"
    chmod $mode, $dst
        or die sprintf("Could not chmod %04o '%s': %s", $mode, $dst, $!);
    return $dst;
}
sub _get_remote_rat_provide_tempfile_object {
    my ($self, $trfilename) = @_;
    # hidden tempfile next to the real recentfile, same suffix
    my %tempfile_args = (
        TEMPLATE => sprintf(".%s-XXXX", $trfilename),
        DIR      => $self->localroot,
        SUFFIX   => $self->serializer_suffix,
        UNLINK   => $self->_use_tempfile,
    );
    my $tempfile = File::Temp->new(%tempfile_args);
    # in tempfile mode keep a reference so the object is not destroyed
    # (and the file not unlinked) when we return
    $self->_current_tempfile_fh($tempfile) if $self->_use_tempfile;
    return $tempfile;
}
604 =head2 $localpath = $obj->get_remotefile ( $relative_path )
606 Rsyncs one single remote file to local filesystem.
608 Note: no locking is done on this file. Any number of processes may
609 mirror this object.
611 Note II: do not use for recentfiles. If you are a cascading
612 slave/server combination, it would confuse other slaves. They would
613 expect the contents of these recentfiles to be available. Use
614 get_remote_recentfile_as_tempfile() instead.
616 =cut
sub get_remotefile {
    my ($self, $path) = @_;
    my $dst = File::Spec->catfile($self->localroot, $path);
    mkpath dirname $dst;
    printf STDERR "%s (1/1) %s ... ", (-e $dst ? "Syncing" : "Getting"), $path
        if $self->verbose;
    # Retry until rsync succeeds; each failure goes to register_rsync_error,
    # which presumably dies once max_rsync_errors is exceeded (see that
    # accessor's POD).
    until ($self->rsync->exec(
        src => join("/", $self->remoteroot, $path),
        dst => $dst,
    )) {
        $self->register_rsync_error($self->rsync->err);
    }
    $self->un_register_rsync_error();
    print STDERR "DONE\n" if $self->verbose;
    return $dst;
}
646 =head2 $obj->interval ( $interval_spec )
648 Get/set accessor. $interval_spec is a string and described below in
649 the section INTERVAL SPEC.
651 =cut
sub interval {
    my ($self, $new_interval) = @_;
    if (@_ > 1) {
        # a new interval means a different file name: drop the cached rfile
        $self->_interval($new_interval);
        $self->_rfile(undef);
    }
    my $current = $self->_interval;
    return $current if defined $current;
    # do not ask the $self too much, it recurses!
    require Carp;
    Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
}
668 =head2 $secs = $obj->interval_secs ( $interval_spec )
670 $interval_spec is described below in the section INTERVAL SPEC. If
671 empty defaults to the inherent interval for this object.
673 =cut
sub interval_secs {
    my ($self, $interval) = @_;
    $interval ||= $self->interval;
    die "interval_secs() called without argument on an object without a declared one"
        unless defined $interval;
    # optional count followed by a single unit letter, e.g. "6h", "1W", "Z"
    my ($n, $t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/
        or die "Could not determine seconds from interval[$interval]";
    return MAX_INT if $interval eq "Z";
    return $seconds{$t} * $n if exists $seconds{$t} and $n =~ /^\d+$/;
    die "Invalid interval specification: n[$n]t[$t]";
}
692 =head2 $obj->localroot ( $localroot )
694 Get/set accessor. The local root of the tree.
696 =cut
698 sub localroot {
699 my ($self, $localroot) = @_;
700 if (@_ >= 2) {
701 $self->_localroot($localroot);
702 $self->_rfile(undef);
704 $localroot = $self->_localroot;
707 =head2 $ret = $obj->local_path($path_found_in_recentfile)
709 Combines the path to our local mirror and the path of an object found
710 in this I<recentfile>. In other words: the target of a mirror operation.
712 Implementation note: We split on slashes and then use
713 File::Spec::catfile to adjust to the local operating system.
715 =cut
sub local_path {
    my ($self, $path) = @_;
    # no path at all: just the mirror root (degenerate case)
    return $self->localroot unless defined $path;
    # recentfile paths are slash-separated; rejoin them the local way
    my @segments = split m{/}, $path;
    return File::Spec->catfile($self->localroot, @segments);
}
727 =head2 (void) $obj->lock
729 Locking is implemented with an C<mkdir> on a locking directory
730 (C<.lock> appended to $rfile).
732 =cut
sub lock {
    my ($self) = @_;
    # Not using flock because it locks filehandles; we want to lock the
    # resource itself, so an atomic mkdir of "$rfile.lock" is the mutex.
    return if $self->_is_locked;
    my $lockdir = $self->rfile . ".lock";
    # XXX need a way to allow breaking the lock
    my $started = time;
    my $timeout = $self->locktimeout || 600;
    until (mkdir $lockdir) {
        Time::HiRes::sleep(0.01);
        die "Could not acquire lockdirectory '$lockdir': $!"
            if time - $started > $timeout;
    }
    $self->_is_locked(1);
}
752 =head2 (void) $obj->merge ($other)
754 Bulk update of this object with another one. It's used to merge a
755 smaller and younger $other object into the current one. If this file
756 is a C<Z> file, then we do not merge in objects of type C<delete>. But
757 if we encounter an object of type delete we delete the corresponding
758 C<new> object.
760 If there is nothing to be merged, nothing is done.
762 =cut
# Merge the events of the smaller recentfile $other into $self (see POD).
# Both files are locked ($other first), both are rewritten only when
# something changed, and $other records in its "merged" metadata the epoch
# up to which it has been merged.
sub merge {
    my($self, $other) = @_;
    # dies unless $other has a strictly smaller interval than $self
    $self->_merge_sanitycheck ( $other );
    $other->lock;
    my $other_recent = $other->recent_events || [];
    $self->lock;
    my $my_recent = $self->recent_events || [];

    # calculate the target time span
    # Events are read as newest first: [0] is the newest, [-1] the oldest
    # (the trimming loop below relies on that order).
    my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
    my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
    my $oldest_allowed = 0;
    my $something_done;
    unless ($my_recent->[0]) {
        # obstetrics
        # an empty target file always gets written
        $something_done=1;
    }
    if ($epoch) {
        if (my $merged = $self->merged) {
            # keep one own interval's worth of history, but never drop
            # events newer than the point we were last merged upwards
            my $secs = $self->interval_secs();
            $oldest_allowed = min($epoch - $secs, $merged->{epoch});
        }
        # throw away outsiders
        # XXX _bigfloat!
        # (numeric comparison; note that for the "Z" interval $secs is
        # MAX_INT, so $oldest_allowed becomes a huge negative number)
        while (@$my_recent && $my_recent->[-1]{epoch} < $oldest_allowed) {
            pop @$my_recent;
            $something_done=1;
        }
    }

    # %have marks paths already taken from $other, so each path appears
    # once, with the newest event winning
    my %have;
    my $recent = [];
    for my $oev (@$other_recent) {
        my $oevepoch = $oev->{epoch} || 0;
        next if $oevepoch < $oldest_allowed;
        my $path = $oev->{path};
        next if $have{$path}++;
        if (    $self->interval eq "Z"
            and $oev->{type} eq "delete") {
            # do nothing
            # (the Z file keeps no delete events; since the path is already
            # marked in %have, its older "new" event is dropped below)
        } else {
            if (!$myepoch || $oevepoch > $myepoch) {
                $something_done=1;
            }
            push @$recent, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
        }
    }
    if ($something_done) {
        # append our own events for paths $other did not mention
        push @$recent, grep { !$have{$_->{path}}++ } @$my_recent;
        $self->write_recent($recent);
        $other->merged({
                        time => Time::HiRes::time, # not used anywhere
                        epoch => $epoch, # used in oldest_allowed
                        into_interval => $self->interval, # not used anywhere
                       });
        $other->write_recent($other_recent);
    }
    $self->unlock;
    $other->unlock;
}
# Die unless $other (the file being merged in) has a strictly smaller
# interval than $self (the target).
sub _merge_sanitycheck {
    my ($self, $other) = @_;
    if ($self->interval_secs <= $other->interval_secs) {
        # $other is the one being merged in, i.e. the "bigger" interval
        # here; list it first so the labels match the numbers
        die sprintf(
            "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
            $other->interval_secs,
            $self->interval_secs,
        );
    }
}
837 =head2 merged
839 Hashref denoting when this recentfile has been merged into some other
840 at which epoch.
842 =cut
sub merged {
    my ($self, $set) = @_;
    # setter: only a defined value replaces the stored hashref
    $self->_merged($set) if defined $set;
    my $merged = $self->_merged;
    # the sanity warnings need a recorded into_interval and our own interval
    return $merged unless $merged;
    my $into = $merged->{into_interval};
    return $merged unless $into and defined $self->_interval;
    if ($into eq $self->interval) {
        warn sprintf("Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
                     $into,
                     $self->interval);
    }
    elsif ($self->interval_secs($into) < $self->interval_secs) {
        warn sprintf("Warning: into_interval[%s] smaller than own interval[%s] on interval[%s]. Danger ahead.",
                     $self->interval_secs($into),
                     $self->interval_secs,
                     $self->interval);
    }
    return $merged;
}
872 =head2 $hashref = $obj->meta_data
874 Returns the hashref of metadata that the server has to add to the
875 I<recentfile>.
877 =cut
sub meta_data {
    my ($self) = @_;
    # start from already-known metadata (if any) and overlay every defined
    # accessor value
    my $ret = $self->{meta};
    for my $field (qw(aggregator canonize comment filenameroot merged interval protocol serializer_suffix)) {
        my $value = $self->$field;
        $ret->{$field} = $value if defined $value;
    }
    # XXX need to reset the Producer if I am a writer, keep it when I
    # am a reader
    # (plain commas: "__PACKAGE__ =>" would quote the bareword)
    $ret->{Producers} ||= {
        __PACKAGE__, "$VERSION", # stringified it looks better
        '$0',        $0,
        'time',      Time::HiRes::time(),
    };
    return $ret;
}
907 =head2 $success = $obj->mirror ( %options )
909 Mirrors the files in this I<recentfile> as reported by
910 C<recent_events>. Options named C<after>, C<before>, C<max>, and
911 C<skip-deletes> are passed through to the L<recent_events> call. The
912 boolean option C<piecemeal>, if true, causes C<mirror> to only rsync
913 C<max_files_per_connection> and keep track of the rsynced files so
914 that future calls will rsync different files until all files are
915 brought to sync.
917 =cut
# Mirror the files announced in the remote recentfile (see POD).
# The remote recentfile is first fetched into a tempfile and _use_tempfile
# is switched on. Only after all items are processed is that tempfile
# renamed over our real recentfile, so the local recentfile does not
# announce files before they are here (cf. the note on
# get_remote_recentfile_as_tempfile).
# Returns true if no errors were collected. In piecemeal mode it returns
# early (undef) after one batch, leaving the tempfile in place.
sub mirror {
    my($self, %options) = @_;
    my $trecentfile = $self->get_remote_recentfile_as_tempfile();
    $self->_use_tempfile (1);
    my %passthrough = map { ($_ => $options{$_}) } qw(before after max skip-deletes);
    my ($recent_events) = $self->recent_events(%passthrough);
    my(@error, @xcollector);
    my $first_item = 0;
    my $last_item = $#$recent_events;
    my $done = $self->done;
    my $pathdb = $self->_pathdb;
  ITEM: for my $i ($first_item..$last_item) {
        # per-item channel back from _mirror_item (mustreturn = piecemeal stop)
        my $status = +{};
        $self->_mirror_item
            (
             $i,
             $recent_events,
             $last_item,
             $done,
             $pathdb,
             \@xcollector,
             \%options,
             $status,
             \@error,
            );
        return if $status->{mustreturn};
    }
    # rsync whatever is left in the last, possibly partial, batch
    if (@xcollector) {
        my $success = eval { $self->_empty_xcollector (\@xcollector,$pathdb,$recent_events);};
        if (!$success || $@) {
            warn "Warning: Unknown error while mirroring: $@";
            push @error, $@;
            sleep 1;
        }
        if ($self->verbose) {
            print STDERR "DONE\n";
        }
    }
    # everything processed: promote the tempfile to the real recentfile
    my $rfile = $self->rfile;
    unless (rename $trecentfile, $rfile) {
        require Carp;
        Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
    }
    $self->_use_tempfile (0);
    # the file now lives under its final name; make sure the File::Temp
    # object does not delete it when it goes away
    if (my $ctfh = $self->_current_tempfile_fh) {
        $ctfh->unlink_on_destroy (0);
        $self->_current_tempfile_fh (undef);
    }
    return !@error;
}
# Process one event ($recent_events->[$i]) for mirror().
# "new" events are queued for a batched rsync; "delete" events are applied
# locally at once (unless skip-deletes). Errors are pushed onto @$error;
# $status->{mustreturn} may be set to ask mirror() to stop (piecemeal).
sub _mirror_item {
    my($self,
       $i,
       $recent_events,
       $last_item,
       $done,
       $pathdb,
       $xcollector,
       $options,
       $status,
       $error,
      ) = @_;
    my $recent_event = $recent_events->[$i];
    # already handled in an earlier run
    return if $done->covered ( $recent_event->{epoch} );
    if ($pathdb) {
        # the path db records the epoch of the last event we acted on for
        # this path; if that is newer than this event, this event is stale
        my $rec = $pathdb->{$recent_event->{path}};
        if ($rec && $rec->{recentepoch}) {
            if (_bigfloatgt
                ( $rec->{recentepoch}, $recent_event->{epoch} )){
                $done->register ($recent_events, [$i]);
                return;
            }
        }
    }
    my $dst = $self->local_path($recent_event->{path});
    if ($recent_event->{type} eq "new"){
        $self->_mirror_item_new
            (
             $dst,
             $i,
             $last_item,
             $recent_events,
             $recent_event,
             $xcollector,
             $pathdb,
             $status,
             $error,
             $options,
            );
    } elsif ($recent_event->{type} eq "delete") {
        # $activity + "edon" becomes the timestamp key in the path db
        my $activity;
        if ($options->{'skip-deletes'}) {
            $activity = "skipp";
        } else {
            # -l does an lstat and "_" reuses that buffer, so a symlink to
            # a directory is unlinked rather than rmdir'ed
            if (-l $dst or not -d _) {
                unless (unlink $dst) {
                    require Carp;
                    Carp::cluck ( "Warning: Error while unlinking '$dst': $!" );
                }
            } else {
                unless (rmdir $dst) {
                    require Carp;
                    Carp::cluck ( "Warning: Error on rmdir '$dst': $!" );
                }
            }
            $activity = "delet";
        }
        $done->register ($recent_events, [$i]);
        if ($pathdb) {
            $self->_register_path($pathdb,[$recent_event],$activity);
        }
    } else {
        warn "Warning: invalid upload type '$recent_event->{type}'";
    }
}
# Queue a "new" event for a batched rsync. When the batch reaches
# max_files_per_connection it is transferred, then we sleep
# sleep_per_connection seconds. Batch failures are reported via warn and
# pushed onto @$error. In piecemeal mode $status->{mustreturn} then tells
# mirror() to stop after this batch.
sub _mirror_item_new {
    my ($self,
        $dst,
        $i,
        $last_item,
        $recent_events,
        $recent_event,
        $xcollector,
        $pathdb,
        $status,
        $error,
        $options,
       ) = @_;
    if ($self->verbose) {
        my $doing = -e $dst ? "Syncing" : "Getting";
        printf STDERR
            "%s (%d/%d/%s) %s ... ",
            $doing,
            1 + $i,
            1 + $last_item,
            $self->interval,
            $recent_event->{path};
    }
    my $max_files_per_connection = $self->max_files_per_connection || 42;
    if ($self->verbose) {
        print STDERR "\n";
    }
    push @$xcollector, { rev => $recent_event, i => $i };
    # keep collecting until a full batch is ready
    return if @$xcollector < $max_files_per_connection;

    my $success = eval { $self->_empty_xcollector($xcollector, $pathdb, $recent_events); };
    my $err = $@; # capture before any further method call can reset $@
    my $sleep = $self->sleep_per_connection;
    $sleep = 0.42 unless defined $sleep;
    Time::HiRes::sleep $sleep;
    # Check the batch result before the piecemeal early return; previously
    # piecemeal runs returned first and silently dropped batch errors.
    if (!$success || $err) {
        warn "Warning: Error while mirroring: $err";
        push @$error, $err;
        sleep 1;
    }
    if ($options->{piecemeal}) {
        $status->{mustreturn} = 1;
        return;
    }
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
}
sub _empty_xcollector {
    my ($self, $xcoll, $pathdb, $recent_events) = @_;
    # unzip the queued { rev, i } entries into parallel lists
    my (@paths, @events, @indexes);
    for my $queued (@$xcoll) {
        push @paths,   $queued->{rev}{path};
        push @events,  $queued->{rev};
        push @indexes, $queued->{i};
    }
    # one rsync connection for the whole batch
    my $success = $self->mirror_path(\@paths);
    $self->_register_path($pathdb, \@events, "rsync") if $pathdb;
    $self->done->register($recent_events, \@indexes);
    @$xcoll = ();
    return $success;
}
# Record in path db $db, for each event in @$coll, its epoch and the time
# of activity $act (stored under the key "${act}edon", e.g. "rsyncedon").
sub _register_path {
    my ($self, $db, $coll, $act) = @_;
    my $now       = time;
    my $stamp_key = $act . "edon";
    $db->{ $_->{path} } = { recentepoch => $_->{epoch}, $stamp_key => $now }
        for @$coll;
}
1112 =head2 (void) $obj->mirror_loop
1114 Run mirror in an endless loop. See the accessor C<loopinterval>. XXX
1115 What happens/should happen if we miss the interval during a single loop?
1117 =cut
# Call mirror() repeatedly until SIGINT is received. Each pass takes at
# least loopinterval seconds (default 42). After each pass, only events
# newer than the newest epoch seen so far are requested.
sub mirror_loop {
    my ($self) = @_;
    my $Signal = 0;
    # localized so the handler does not outlive the loop
    local $SIG{INT} = sub { $Signal++ };
    my $loopinterval = $self->loopinterval || 42;
    my $after = -999999999;
  LOOP: while () {
        # Restart the clock on every pass; it used to be taken once before
        # the loop, so every pass after the first skipped the sleep.
        my $iteration_start = time;
        # named option: a bare $after would land in %options as a key
        $self->mirror(after => $after);
        last LOOP if $Signal;
        my $re = $self->recent_events;
        # keep the previous watermark if no events came back
        if ($re && @$re && defined $re->[0]{epoch}) {
            $after = $re->[0]{epoch};
        }
        if ($self->verbose) {
            local $| = 1;
            print "($after)";
        }
        my $remaining = $iteration_start + $loopinterval - time;
        if ($remaining > 0) {
            sleep $remaining;
        }
        if ($self->verbose) {
            local $| = 1;
            print "~";
        }
    }
}
1146 =head2 $success = $obj->mirror_path ( $arrref | $path )
1148 If the argument is a scalar it is treated as a path. The remote path
1149 is mirrored into the local copy. $path is the path found in the
1150 I<recentfile>, i.e. it is relative to the root directory of the
1151 mirror.
1153 If the argument is an array reference then all elements are treated as
1154 a path below the current tree and all are rsynced with a single
1155 command (and a single connection).
1157 =cut
# Rsync one path (scalar) or a batch of paths (arrayref) from the remote
# root into the local mirror. Returns true on success, including when a
# link_stat error is ignored via ignore_link_stat_errors. Returns false
# when the batch transfer is given up after three failed attempts.
sub mirror_path {
    my ($self, $path) = @_;
    # XXX simplify the two branches such that $path is treated as
    # [$path] maybe even demand the argument as an arrayref to
    # simplify docs and code. (rsync-over-recentfile-2.pl uses the
    # interface)
    if (ref $path and ref $path eq "ARRAY") {
        # batch: list all paths in a files-from file, one rsync call
        my $dst = $self->localroot;
        mkpath dirname $dst;
        my ($fh) = File::Temp->new(
            TEMPLATE => sprintf(".%s-XXXX", lc $self->filenameroot),
            TMPDIR   => 1,
            UNLINK   => 0,
        );
        for my $p (@$path) {
            print $fh $p, "\n";
        }
        $fh->flush;
        $fh->unlink_on_destroy(1);
        my $gaveup  = 0;
        my $retried = 0;
        while (!$self->rsync->exec(
            src          => join("/", $self->remoteroot),
            dst          => $dst,
            'files-from' => $fh->filename,
        )) {
            my ($err) = $self->rsync->err;
            if ($self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x) {
                if ($self->verbose) {
                    warn "Info: ignoring link_stat error '$err'";
                }
                return 1;
            }
            $self->register_rsync_error($err);
            if (++$retried >= 3) {
                warn "XXX giving up.";
                $gaveup = 1;
                last;
            }
        }
        # Report the failure instead of falling through to "return 1";
        # callers use the return value as $success.
        return 0 if $gaveup;
        $self->un_register_rsync_error();
    } else {
        # single path: retry until register_rsync_error gives up
        my $dst = $self->local_path($path);
        mkpath dirname $dst;
        while (!$self->rsync->exec(
            src => join("/", $self->remoteroot, $path),
            dst => $dst,
        )) {
            my ($err) = $self->rsync->err;
            if ($self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x) {
                if ($self->verbose) {
                    warn "Info: ignoring link_stat error '$err'";
                }
                return 1;
            }
            $self->register_rsync_error($err);
        }
        $self->un_register_rsync_error();
    }
    return 1;
}
sub _my_current_rfile {
    my ($self) = @_;
    # While working on a downloaded copy (_use_tempfile) that copy is the
    # file to read; otherwise it is our own recentfile.
    my $current = $self->_use_tempfile
        ? $self->_current_tempfile
        : $self->rfile;
    return $current;
}
=head2 $path = $obj->naive_path_normalize ($path)

Takes an absolute unix style path as argument and canonicalizes it to
a shorter path if possible, removing things like double slashes or
C</./> and removing references to C<../> directories to get a shorter,
unambiguous path. This simplifies the code in C<update()> that
determines whether a file passed to it is indeed below our
C<localroot>.

=cut
sub naive_path_normalize {
    my($self,$path) = @_;
    return $path unless defined $path;
    # Work on a slash-terminated copy so that a trailing "/." or "/.."
    # is handled by the same substitutions as inner segments; collapse
    # repeated slashes afterwards.
    $path .= "/";
    $path =~ s|/+|/|g;
    # "." is the directory itself. Drop it BEFORE resolving "..",
    # otherwise "/a/./../b" would cancel "." instead of "a".
    1 while $path =~ s|/\./|/|;
    # Resolve "dir/.." pairs; a ".." segment never counts as "dir".
    1 while $path =~ s|/(?!\.\./)[^/]+/\.\./|/|;
    # ".." directly below the root stays at the root.
    1 while $path =~ s|^/\.\./|/|;
    $path =~ s|/$||;
    return $path;
}
1260 =head2 $ret = $obj->read_recent_1 ( $data )
1262 Delegate of C<recent_events()> on protocol 1
1264 =cut
sub read_recent_1 {
    my (undef, $data) = @_;
    # Protocol 1 files keep the list of events under the "recent" key.
    my $events = $data->{recent};
    return $events;
}
=head2 $array_ref = $obj->recent_events ( %options )

Note: the code relies on the resource being written atomically. We
cannot lock because we may have no write access. If the caller has
write access (e.g. aggregate() or update()), it has to care for any
necessary locking.

If $options{after} is specified, only file events after this timestamp
are returned.

If $options{before} is specified, only file events before this
timestamp are returned.

If $options{'skip-deletes'} is specified, no files-to-be-deleted will
be returned.

If $options{max} is specified only this many events are returned.

If $options{info} is specified, it must be a hashref. This hashref
will be filled with metadata about the unfiltered recent_events of
this object: key C<first> holds the first item and key C<last> holds
the last one.

=cut
sub recent_events {
    my ($self, %options) = @_;
    my $info = $options{info};
    if ($self->is_slave) {
        $self->get_remote_recentfile_as_tempfile;
    }
    my $rfile_or_tempfile = $self->_my_current_rfile or return [];
    -e $rfile_or_tempfile or return [];
    my $suffix = $self->serializer_suffix;
    my ($data) = eval {
        $self->_try_deserialize
            (
             $suffix,
             $rfile_or_tempfile,
            );
    };
    my $err = $@;
    if ($err or !$data) {
        return [];
    }
    my $re;
    if (reftype($data) eq 'ARRAY') { # protocol 0
        $re = $data;
    } else {
        $re = $self->_recent_events_protocol_x
            (
             $data,
             $rfile_or_tempfile,
            );
    }
    # Without any filter or info request the unfiltered list is returned
    # as is. Previously only "after" triggered filtering, so "before",
    # "max", "skip-deletes" and "info" were silently ignored on their own.
    my $wants_filtering = grep { defined $options{$_} }
        qw(after before max skip-deletes info);
    return $re unless $wants_filtering;
    if ($info) {
        $info->{first} = $re->[0];
        $info->{last} = $re->[-1];
    }
    # Nothing to filter; also avoids autovivifying $re->[0] below.
    return [] unless @$re;

    # Entries are ordered by decreasing epoch: "after" cuts off the tail
    # of the list ...
    my $last_item = $#$re;
    if (defined $options{after}) {
        if ($re->[0]{epoch} > $options{after}) {
            my $f = first { $re->[$_]{epoch} <= $options{after} } 0..$#$re;
            $last_item = $f - 1 if defined $f;
        } else {
            $last_item = -1;
        }
    }
    # ... and "before" cuts off the head: only strictly older events
    # remain. If none is older, the result is empty.
    my $first_item = 0;
    if (defined $options{before}) {
        my $f = first { $re->[$_]{epoch} < $options{before} } 0..$last_item;
        $first_item = defined $f ? $f : $last_item + 1;
    }
    my @rre = splice @$re, $first_item, 1+$last_item-$first_item;
    if ($options{'skip-deletes'}) {
        @rre = grep { $_->{type} ne "delete" } @rre;
    }
    if ($options{max} && @rre > $options{max}) {
        @rre = splice @rre, 0, $options{max};
    }
    return \@rre;
}
sub _recent_events_protocol_x {
    my ($self, $data, $rfile_or_tempfile) = @_;
    # Dispatch to read_recent_N according to the file's protocol number.
    my $reader = sprintf "read_recent_%d", $data->{meta}{protocol};
    # We may be reading the meta part for the first time: adopt every
    # lowercase attribute that is still undefined on this object.
    # Mixed-case keys such as "Producers" are informational only.
    # NOTE(review): these key names come from the file contents (possibly
    # a remote recentfile) and are invoked as methods on $self; consider
    # restricting them to a known list of accessors.
    for my $attr (grep { $_ eq lc $_ } keys %{ $data->{meta} }) {
        next if defined $self->$attr;
        $self->$attr($data->{meta}{$attr});
    }
    my $events = $self->$reader($data);
    my @stat = stat $rfile_or_tempfile or die "Cannot stat '$rfile_or_tempfile': $!";
    # Events are ordered by decreasing epoch: the last one is the oldest.
    my %minmax = (mtime => $stat[9]);
    if (@$events) {
        @minmax{qw(min max)} = ($events->[-1]{epoch}, $events->[0]{epoch});
    }
    $self->minmax(\%minmax);
    return $events;
}
sub _try_deserialize {
    my($self,
       $suffix,
       $rfile_or_tempfile,
      ) = @_;
    # Returns whatever the deserializer for $suffix produces from the
    # file; dies if the file cannot be read or no deserializer is
    # available for a non-YAML suffix.
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        return YAML::Syck::LoadFile($rfile_or_tempfile);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        my $serialized = do
            {
                # Three-argument open: the file name must never be parsed
                # for a mode (leading "<", ">", "|") or have whitespace
                # trimmed, as the two-argument form would do.
                open my $fh, "<", $rfile_or_tempfile
                    or die "Could not open '$rfile_or_tempfile': $!";
                local $/;
                <$fh>;
            };
        return $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
}
1415 =head2 $ret = $obj->rfilename
1417 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1418 a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
1420 =cut
sub rfilename {
    my ($self) = @_;
    # "<filenameroot>-<interval><suffix>", e.g. RECENT-6h.yaml
    my @parts = ($self->filenameroot, $self->interval, $self->serializer_suffix);
    return sprintf "%s-%s%s", @parts;
}
1432 =head2 $str = $self->remote_dir
1434 The directory we are mirroring from.
1436 =cut
sub remote_dir {
    my ($self, $set) = @_;
    $self->_remote_dir($set) if defined $set;
    my $dir = $self->_remote_dir;
    # Every call, getter or setter, flags this object as a slave.
    # NOTE(review): this happens even when no remote dir is configured;
    # confirm that this is intended.
    $self->is_slave(1);
    return $dir;
}
1448 =head2 $str = $obj->remoteroot
1450 =head2 (void) $obj->remoteroot ( $set )
1452 Get/Set the composed prefix needed when rsyncing from a remote module.
1453 If remote_host, remote_module, and remote_dir are set, it is composed
1454 from these.
1456 =cut
sub remoteroot {
    my ($self, $set) = @_;
    $self->_remoteroot($set) if defined $set;
    my $remoteroot = $self->_remoteroot;
    return $remoteroot if defined $remoteroot;
    # Compose "host::module/dir" from whichever parts are configured and
    # cache the result for subsequent calls.
    $remoteroot = join "",
        (defined $self->remote_host   ? $self->remote_host . "::"  : ""),
        (defined $self->remote_module ? $self->remote_module . "/" : ""),
        (defined $self->remote_dir    ? $self->remote_dir          : "");
    $self->_remoteroot($remoteroot);
    return $remoteroot;
}
1477 =head2 my $rfile = $obj->rfile
1479 Returns the full path of the I<recentfile>
1481 =cut
sub rfile {
    my($self) = @_;
    # The full path is computed once and cached in _rfile.
    my $rfile = $self->_rfile;
    return $rfile if defined $rfile;
    # File::Spec is not imported at the top of this module; load it here
    # instead of relying on another module having pulled it in.
    require File::Spec;
    $rfile = File::Spec->catfile
        ($self->localroot,
         $self->rfilename,
        );
    $self->_rfile ($rfile);
    return $rfile;
}
1495 =head2 $rsync_obj = $obj->rsync
1497 The File::Rsync object that this object uses for communicating with an
1498 upstream server.
1500 =cut
sub rsync {
    my ($self) = @_;
    # Lazily build one File::Rsync object per recentfile and reuse it.
    my $rsync = $self->_rsync;
    return $rsync if defined $rsync;
    my $rsync_options = $self->rsync_options || {};
    die "File::Rsync required for rsync operations. Cannot continue"
        unless $HAVE->{"File::Rsync"};
    $rsync = File::Rsync->new($rsync_options);
    $self->_rsync($rsync);
    return $rsync;
}
=head2 (void) $obj->register_rsync_error($err)

=head2 (void) $obj->un_register_rsync_error()

Register_rsync_error is called whenever the File::Rsync object fails
on an exec (say, connection doesn't succeed). It issues a warning and
sleeps for an increasing amount of time (12 seconds per consecutive
error, at most 120 seconds). Once the error count reaches
C<max_rsync_errors> (default 12; a negative value means no limit), it
dies instead. Un_register_rsync_error resets the error count. See also
accessor C<max_rsync_errors>.

=cut
{
    # Failure state shared by all objects in this process: the number of
    # consecutive rsync failures and the time of the most recent one.
    my $no_success_count = 0;
    my $no_success_time = 0;

    # Record one failed rsync: die after too many consecutive failures,
    # otherwise warn and back off (12s per failure, capped at 120s).
    sub register_rsync_error {
        my ($self, $err) = @_;
        chomp $err;
        $no_success_time = time;
        $no_success_count++;
        my $max_rsync_errors = $self->max_rsync_errors;
        $max_rsync_errors = 12 unless defined $max_rsync_errors;
        require Carp;
        if ($max_rsync_errors >= 0 && $no_success_count >= $max_rsync_errors) {
            Carp::confess(sprintf(
                "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
                $self->interval,
                $err,
                $no_success_count,
            ));
        }
        my $sleep = 12 * $no_success_count;
        $sleep = 120 if $sleep > 120;
        Carp::cluck(sprintf(
            "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
            scalar(localtime($no_success_time)),
            $self->interval,
            $err,
            $sleep,
        ));
        sleep $sleep;
    }

    # Forget all previous failures after a successful rsync.
    sub un_register_rsync_error {
        my ($self) = @_;
        $no_success_time = 0;
        $no_success_count = 0;
    }
}
1571 =head2 $clone = $obj->_sparse_clone
1573 Clones just as much from itself that it does not hurt. Experimental method.
1575 =cut
sub _sparse_clone {
    my ($self) = @_;
    # Only these attributes are copied; reference values are deep-copied
    # so the clone never shares data structures with the original.
    my @cloneable = qw(
        _interval
        _localroot
        _remoteroot
        _rfile
        _use_tempfile
        aggregator
        filenameroot
        is_slave
        max_files_per_connection
        protocol
        rsync_options
        serializer_suffix
        sleep_per_connection
        verbose
    );
    my $clone = bless {}, ref $self;
    for my $accessor (@cloneable) {
        my $value = $self->$accessor;
        $clone->$accessor(ref $value ? Storable::dclone($value) : $value);
    }
    return $clone;
}
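=head2 $boolean = OBJ->ttl_reached ()

Returns true if more than C<ttl> seconds (default 24.2) have passed
since the time stored in C<have_mirrored>, false otherwise.

=cut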
sub ttl_reached {
    my ($self) = @_;
    my $have_mirrored = $self->have_mirrored;
    my $now = Time::HiRes::time;
    my $ttl = $self->ttl;
    $ttl = 24.2 unless defined $ttl;
    # 1 once the last mirror run is older than the ttl, else 0.
    return $now > $have_mirrored + $ttl ? 1 : 0;
}
1619 =head2 (void) $obj->unlock()
1621 Unlocking is implemented with an C<rmdir> on a locking directory
1622 (C<.lock> appended to $rfile).
1624 =cut
sub unlock {
    my ($self) = @_;
    # Nothing to do unless this object holds the lock.
    $self->_is_locked or return;
    # The lock is a directory next to the recentfile.
    my $lockdir = $self->rfile . ".lock";
    rmdir $lockdir;
    $self->_is_locked(0);
}
=head2 $ret = $obj->update ($path, $type)

Enter one file into the local I<recentfile>. $path is the (usually
absolute) path. If the path is outside I<our> tree, then it is
ignored.

$type is one of C<new> or C<delete>.

The new file event is unshifted to the array of recent_events and the
array is shortened to the length of the timespan allowed. This is
usually the timespan specified by the interval of this recentfile but
as long as this recentfile has not been merged to another one, the
timespan may grow without bounds.

=cut
sub update {
    my($self,$path,$type) = @_;
    die "update called without path argument" unless defined $path;
    die "update called without type argument" unless defined $type;
    # Anchored: only the exact words "new" and "delete" are legal. The
    # previous unanchored /(new|delete)/ also accepted e.g. "renewal".
    die "update called with illegal type argument: $type"
        unless $type =~ /\A(?:new|delete)\z/;
    my $canonmeth = $self->canonize || "naive_path_normalize";
    $path = $self->$canonmeth($path);
    my $lrd = $self->localroot;
    # Paths outside our localroot are ignored.
    if ($path =~ s|^\Q$lrd\E||) {
        $path =~ s|^/||;
        my $secs = $self->interval_secs();
        $self->lock;
        # Everything done under the lock runs inside eval so that a
        # failure (e.g. while writing) does not leave the lock directory
        # behind; the error is rethrown after unlocking.
        my $ok = eval {
            # you must calculate the time after having locked, of course
            my $epoch = Time::HiRes::time;
            my $recent = $self->recent_events || [];
            my $oldest_allowed = 0;
            if (my $merged = $self->merged) {
                # XXX _bigfloat!
                $oldest_allowed = min($epoch - $secs, $merged->{epoch});
            }
            # as long as we are not merged at all, no limits!
          TRUNCATE: while (@$recent) {
                last TRUNCATE unless $recent->[-1]{epoch} < $oldest_allowed; # XXX _bigfloatlt!
                pop @$recent;
            }
            # remove older duplicates of this $path, irrespective of $type:
            $recent = [ grep { $_->{path} ne $path } @$recent ];
            unshift @$recent, { epoch => $epoch, path => $path, type => $type };
            $self->write_recent($recent);
            $self->_assert_symlink;
            1;
        };
        my $err = $@;
        my $ret = $self->unlock;
        die $err unless $ok;
        return $ret;
    }
    return;
}
1694 =head2 uptodate
1696 True if this object has mirrored the complete interval covered by the
1697 current recentfile.
1699 *** WIP ***
1701 =cut
sub uptodate {
    my ($self, $debug) = @_;
    if ($self->ttl_reached) {
        warn "ttl_reached returned true, so we are not uptodate" if $debug;
        return 0;
    }
    # Without a recorded mtime of the recentfile we cannot tell.
    my $minmax = $self->minmax;
    unless (exists $minmax->{mtime}) {
        warn "fallthrough, so not uptodate" if $debug;
        return 0;
    }
    # A recentfile that changed since we last read it needs another look.
    my $rfile = $self->_my_current_rfile;
    my $mtime = (stat $rfile)[9];
    if ($mtime > $minmax->{mtime}) {
        warn "$mtime > $minmax->{mtime}, so we are not uptodate" if $debug;
        return 0;
    }
    # Otherwise: up to date iff the done-list covers the interval.
    my $covered = $self->done->covered(@$minmax{qw(max min)});
    warn "minmax covered[$covered], so we return that" if $debug;
    return $covered;
}
1737 =head2 $obj->write_recent ($recent_files_arrayref)
1739 Writes a I<recentfile> based on the current reflection of the current
1740 state of the tree limited by the current interval.
1742 =cut
sub write_recent {
    my ($self, $recent) = @_;
    defined $recent or die "write_recent called without argument";
    # Dispatch to write_N for the object's protocol number.
    my $writer = sprintf "write_%d", $self->protocol;
    return $self->$writer($recent);
}
1751 =head2 $obj->write_0 ($recent_files_arrayref)
1753 Delegate of C<write_recent()> on protocol 0
1755 =cut
sub write_0 {
    my ($self, $recent) = @_;
    my $rfile = $self->rfile;
    my $staging = "$rfile.new";
    # Protocol 0: the file is just the YAML dump of the event list. It is
    # written under a temporary name and then renamed over the target.
    YAML::Syck::DumpFile($staging, $recent);
    rename $staging, $rfile or die "Could not rename to '$rfile': $!";
}
1764 =head2 $obj->write_1 ($recent_files_arrayref)
1766 Delegate of C<write_recent()> on protocol 1
1768 =cut
sub write_1 {
    my ($self, $recent) = @_;
    my $rfile = $self->rfile;
    my $suffix = $self->serializer_suffix;
    # Protocol 1: a hash with the meta part and the event list.
    my $data = {
        meta   => $self->meta_data,
        recent => $recent,
    };
    my $serialized;
    if ($suffix eq ".yaml") {
        $serialized = YAML::Syck::Dump($data);
    } elsif ($HAVE->{"Data::Serializer"}) {
        $serialized = Data::Serializer
            ->new(serializer => $serializers{$suffix})
            ->raw_serialize($data);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    # Write under a temporary name, then rename over the target.
    my $staging = "$rfile.new";
    open my $fh, ">", $staging or die "Could not open >'$staging': $!";
    print $fh $serialized;
    close $fh or die "Could not close '$staging': $!";
    rename $staging, $rfile or die "Could not rename to '$rfile': $!";
}
BEGIN {
    # The SERIALIZERS section of the POD below doubles as the lookup
    # table: every "=item C<< SUFFIX => CLASS >>" line is evaluated into a
    # key/value pair of %serializers. The heredoc runs up to the next
    # line consisting solely of "=cut".
    my @pod_lines = split /\n/, <<'=cut'; my @pairs = grep { s/^=item\s+C<<(.+)>>$/$1/ } @pod_lines; %serializers = map { eval } @pairs; }
1798 =head1 THE ARCHITECTURE OF A COLLECTION OF RECENTFILES
1800 The idea is that we want to have a short file that records really
1801 recent changes. So that a fresh mirror can be kept fresh as long as
1802 the connectivity is given. Then we want longer files that record the
1803 history before. So when the mirror falls behind the update period
1804 reflected in the shortest file, it can complement the list of recent
1805 file events with the next one. And if this is not long enough we want
1806 another one, again a bit longer. And we want one that completes the
1807 history back to the oldest file. The index files do contain the
1808 complete list of current files. The larger an index file is the less
1809 often it is updated. For practical reasons adjacent files will often
1810 overlap a bit but this is neither necessary nor enforced. That's the
1811 basic idea. The following example represents a tree that has a few
1812 updates every day:
1814 RECENT.recent -> RECENT-1h.yaml
1815 RECENT-6h.yaml
1816 RECENT-1d.yaml
1817 RECENT-1M.yaml
1818 RECENT-1W.yaml
1819 RECENT-1Q.yaml
1820 RECENT-1Y.yaml
1821 RECENT-Z.yaml
1823 The first file is the principal file, in so far it is the one that is
1824 written first after a filesystem change. Usually a symlink links to it
1825 with a filename that has the same filenameroot and the suffix
1826 C<.recent>. On systems that do not support symlinks there is a plain
1827 copy maintained instead.
The last file, the Z file, contains the complementary files that are
in none of the other files. It never contains C<deletes>. Besides
this, it serves as a recovery mechanism or spill-over pond. When
things go wrong, it is a valuable controlling instance that holds the
differences between the collection of limited interval files and the
actual filesystem.
1836 =head2 A SINGLE RECENTFILE
1838 A I<recentfile> consists of a hash that has two keys: C<meta> and
1839 C<recent>. The C<meta> part has metadata and the C<recent> part has a
1840 list of fileobjects.
1842 =head2 THE META PART
Here we find things that are pretty much self-explanatory: all
lowercase attributes are accessors and as such explained somewhere
above in this manpage. The uppercase attribute C<Producers> contains
version information about involved software components. Nothing to
worry about, I believe.
1850 =head2 THE RECENT PART
This is the interesting part. Every entry refers to some filesystem
change (with path, epoch, type). The epoch value is the point in time
when some change was I<registered>. Do not be tempted to believe that
the entry has a direct relation to something like modification time or
change time on the filesystem level. The timestamp (I<epoch> element)
is a floating point number and practically never corresponds exactly
to the data recorded in the filesystem but rather to the time when
some process succeeded in reporting to the I<recentfile> mechanism
that something has changed. This is why many parts of the code refer
to I<events>, because we merely try to record the I<event> of the
discovery of a change, not the time of the change itself.
All these entries can be divided into two types (denoted by the
C<type> attribute): C<new>s and C<delete>s. Changes and creations are
C<new>s. Deletes are C<delete>s.
1868 Another distinction is for objects with an epoch timestamp and others
1869 without. All files that were already existing on the filesystem before
1870 the I<recentfile> mechanism was installed, get recorded with a
1871 timestamp of zero.
1873 Besides an C<epoch> and a C<type> attribute we find a third one:
1874 C<path>. This path is relative to the directory we find the
1875 I<recentfile> in.
1877 The order of the entries in the I<recentfile> is by decreasing epoch
1878 attribute. These are either 0 or a unique floating point number. They
1879 are zero for events that were happening either before the time that
1880 the I<recentfile> mechanism was set up or were left undiscovered for a
1881 while and never handed over to update(). They are floating point
1882 numbers for all events being regularly handed to update(). And when
1883 the server has ntp running correctly, then the timestamps are
1884 actually decreasing and unique.
1886 =head1 CORRUPTION AND RECOVERY
1888 If the origin host breaks the promise to deliver consistent and
1889 complete I<recentfiles> then the way back to sanity shall be achieved
1890 through either the C<zloop> (still TBD) or traditional rsyncing
1891 between the hosts. For example, if the origin server forgets to deploy
1892 ntp and the clock on it jumps backwards some day, then this would
1893 probably go unnoticed for a while and many software components that
1894 rely on the time never running backwards will make wrong decisions.
1895 After some time this accident would probably still be found in one of
1896 the I<recentfiles> but would become meaningless as soon as a mirror
1897 has run through the sanitizing procedures. Same goes for origin hosts
1898 that forget to include or deliberately omit some files.
1900 =head1 SERIALIZERS
1902 The following suffixes are supported and trigger the use of these
1903 serializers:
1905 =over 4
1907 =item C<< ".yaml" => "YAML::Syck" >>
1909 =item C<< ".json" => "JSON" >>
1911 =item C<< ".sto" => "Storable" >>
1913 =item C<< ".dd" => "Data::Dumper" >>
1915 =back
1917 =cut
BEGIN {
    # The INTERVAL SPEC section of the POD below doubles as the lookup
    # table: every "=item C<< LETTER => SECONDS >>" line is evaluated into
    # a key/value pair of %seconds. The heredoc runs up to the next line
    # consisting solely of "=cut".
    my @pod_lines = split /\n/, <<'=cut'; my @pairs = grep { s/^=item\s+C<<(.+)>>$/$1/ } @pod_lines; %seconds = map { eval } @pairs; }
1923 =head1 INTERVAL SPEC
1925 An interval spec is a primitive way to express time spans. Normally it
1926 is composed from an integer and a letter.
As a special case, a string that consists only of the single letter
C<Z> stands for unlimited time.
1931 The following letters express the specified number of seconds:
1933 =over 4
1935 =item C<< s => 1 >>
1937 =item C<< m => 60 >>
1939 =item C<< h => 60*60 >>
1941 =item C<< d => 60*60*24 >>
1943 =item C<< W => 60*60*24*7 >>
1945 =item C<< M => 60*60*24*30 >>
1947 =item C<< Q => 60*60*24*90 >>
1949 =item C<< Y => 60*60*24*365.25 >>
1951 =back
1953 =cut
1955 =head1 BACKGROUND
1957 This is about speeding up rsync operation on large trees to many
1958 places. Uses a small metadata cocktail and pull technology.
1960 =head2 NON-COMPETITORS
  File::Mirror          JWU/File-Mirror/File-Mirror-0.10.tar.gz   only local trees
  Mirror::YAML          ADAMK/Mirror-YAML-0.03.tar.gz             some sort of inner circle
  Net::DownloadMirror   KNORR/Net-DownloadMirror-0.04.tar.gz      FTP sites and stuff
  Net::MirrorDir        KNORR/Net-MirrorDir-0.05.tar.gz           ditto
  Net::UploadMirror     KNORR/Net-UploadMirror-0.06.tar.gz        ditto
  Pushmi::Mirror        CLKAO/Pushmi-v1.0.0.tar.gz                something SVK

  rsnapshot             www.rsnapshot.org                         focus on backup
  csync                 www.csync.org                             more like unison
1972 =head2 COMPETITORS
The problem shared by clusters, ftp mirrors, and other replicated
datasets like CPAN is how to transfer only a minimal amount of data
to determine the diff between two hosts.
1978 Normally it takes a long time to determine the diff itself before it
1979 can be transferred. Known solutions at the time of this writing are
1980 csync2, and rsync 3 batch mode.
For many years the best solution was csync2, which solves the
problem by maintaining a sqlite database on both ends and talking a
highly sophisticated protocol to quickly determine which files to send
and which to delete at any given point in time. Csync2 is often
inconvenient because the act of syncing demands quite an intimate
relationship between the sender and the receiver and suffers when the
number of syncing sites is large or connections are unreliable.
1990 Rsync 3 batch mode works around these problems by providing rsync-able
1991 batch files which allow receiving nodes to replay the history of the
1992 other nodes. This reduces the need to have an incestuous relation but
1993 it has the disadvantage that these batch files replicate the contents
1994 of the involved files. This seems inappropriate when the nodes already
1995 have a means of communicating over rsync.
1997 rersyncrecent solves this problem with a couple of (usually 2-10)
1998 index files which cover different overlapping time intervals. The
1999 master writes these files and the clients can construct the full tree
2000 from the information contained in them. The most recent index file
2001 usually covers the last seconds or minutes or hours of the tree and
2002 depending on the needs, slaves can rsync every few seconds and then
2003 bring their trees in full sync.
The rersyncrecent mode was developed for CPAN but I hope it is a
convenient and economic general purpose solution. I'm looking forward
to seeing a CPAN backbone that is only a few seconds behind PAUSE. And
then ... the first FUSE based CPAN filesystem anyone?
2010 =head1 AUTHOR
2012 Andreas König
2014 =head1 BUGS
2016 Please report any bugs or feature requests through the web interface
2018 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2019 I will be notified, and then you'll automatically be notified of
2020 progress on your bug as I make changes.
2022 =head1 SUPPORT
2024 You can find documentation for this module with the perldoc command.
2026 perldoc File::Rsync::Mirror::Recentfile
2028 You can also look for information at:
2030 =over 4
2032 =item * RT: CPAN's request tracker
2034 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2036 =item * AnnoCPAN: Annotated CPAN documentation
2038 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2040 =item * CPAN Ratings
2042 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2044 =item * Search CPAN
2046 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2048 =back
2051 =head1 ACKNOWLEDGEMENTS
2053 Thanks to RJBS for module-starter.
2055 =head1 COPYRIGHT & LICENSE
2057 Copyright 2008 Andreas König.
2059 This program is free software; you can redistribute it and/or modify it
2060 under the same terms as Perl itself.
2063 =cut
# A Perl module file has to end by returning a true value.
1; # End of File::Rsync::Mirror::Recentfile

# Local Variables:
# mode: cperl
# cperl-indent-level: 4
# End: