making contains testable with rrr-news; fixing bug: we lost the options with the...
[rersyncrecent.git] / lib / File / Rsync / Mirror / Recentfile.pm
blobeb09fd9e8eae847eb7887a3935e401b4136d5d57
1 package File::Rsync::Mirror::Recentfile;
3 # use warnings;
4 use strict;
6 =encoding utf-8
8 =head1 NAME
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
12 =head1 VERSION
14 Version 0.0.1
16 =cut
# Probe optional dependencies once at load time. $HAVE->{$package} is
# true when the package could be loaded, undef otherwise; callers such
# as new_from_file() use it to decide whether a code path is available.
my $HAVE = {};
for my $package ("Data::Serializer", "File::Rsync") {
    # Map "Foo::Bar" to "Foo/Bar.pm" so a block eval can be used
    # instead of a string eval.
    (my $module_file = "$package.pm") =~ s{::}{/}g;
    $HAVE->{$package} = eval { require $module_file; 1 };
}
25 use Config;
26 use File::Basename qw(basename dirname fileparse);
27 use File::Copy qw(cp);
28 use File::Path qw(mkpath);
29 use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
30 use File::Temp;
31 use List::Util qw(first min);
32 use Scalar::Util qw(reftype);
33 use Storable;
34 use Time::HiRes qw();
35 use YAML::Syck;
37 use version; our $VERSION = qv('0.0.1');
40 use constant MAX_INT => ~0>>1; # anything better?
41 use constant DEFAULT_PROTOCOL => 1;
43 # cf. interval_secs
44 my %seconds;
46 # maybe subclass if this mapping is bad?
47 my %serializers;
49 =head1 SYNOPSIS
51 B<!!!! PRE-ALPHA ALERT !!!!>
53 Nothing in here is believed to be stable, nothing yet intended for
54 public consumption. The plan is to provide a script in one of the next
55 releases that acts as a frontend for all the backend functionality.
56 Option and method names will very likely change.
58 For the rationale see the section BACKGROUND.
60 This is published only for developers of the (yet to be named)
61 script(s).
63 Writer (of a single file):
65 use File::Rsync::Mirror::Recentfile;
66 my $rf = File::Rsync::Mirror::Recentfile->new
68 interval => q(6h),
69 filenameroot => "RECENT",
70 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
71 localroot => "/home/ftp/pub/PAUSE/authors/",
72 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
74 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
76 Reader/mirrorer:
78 my $rf = File::Rsync::Mirror::Recentfile->new
80 filenameroot => "RECENT",
81 ignore_link_stat_errors => 1,
82 interval => q(6h),
83 localroot => "/home/ftp/pub/PAUSE/authors",
84 remote_dir => "",
85 remote_host => "pause.perl.org",
86 remote_module => "authors",
87 rsync_options => {
88 compress => 1,
89 'rsync-path' => '/usr/bin/rsync',
90 links => 1,
91 times => 1,
92 'omit-dir-times' => 1,
93 checksum => 1,
95 verbose => 1,
97 $rf->mirror;
99 Aggregator (usually the writer):
101 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
102 $rf->aggregate;
104 =head1 EXPORT
106 No exports.
108 =head1 CONSTRUCTORS / DESTRUCTOR
110 =head2 my $obj = CLASS->new(%hash)
112 Constructor. On every argument pair the key is a method name and the
113 value is an argument to that method name.
115 If a recentfile for this resource already exists, metadata that are
116 not defined by the constructor will be fetched from there as soon as
117 it is being read by recent_events().
119 =cut
sub new {
    my ($class, @args) = @_;
    my $self = bless {}, $class;
    # Arguments are (method => value) pairs, applied in the given order.
    for (my $idx = 0; $idx < @args; $idx += 2) {
        my $method = $args[$idx];
        $self->$method($args[$idx + 1]);
    }
    # Defaults for settings the caller did not provide.
    $self->protocol(DEFAULT_PROTOCOL) unless defined $self->protocol;
    $self->filenameroot("RECENT")     unless defined $self->filenameroot;
    $self->serializer_suffix(".yaml") unless defined $self->serializer_suffix;
    return $self;
}
140 =head2 my $obj = CLASS->new_from_file($file)
142 Constructor. $file is a I<recentfile>.
144 =cut
# Build an object from an existing recentfile: read and deserialize it,
# then apply every lowercase key of its "meta" section as a setter call.
# Dies if the file cannot be opened or its suffix is not a known
# serializer suffix.
sub new_from_file {
    my ($class, $file) = @_;
    my $self = bless {}, $class;
    $self->_rfile($file);
    #?# $self->lock;
    my $serialized = do {
        open my $fh, '<', $file or die "Could not open '$file': $!";
        local $/;
        <$fh>;
    };
    # XXX: we can skip this step when the metadata are sufficient, but
    # we cannot parse the file without some magic stuff about
    # serialized formats
    while (-l $file) {
        my (undef, $path) = fileparse $file;
        my $symlink = readlink $file;
        if ($symlink =~ m|/|) {
            die "FIXME: filenames containing '/' not supported, got $symlink";
        }
        $file = File::Spec->catfile($path, $symlink);
    }
    # fileparse treats suffixes as regexes; quote them so ".yaml"
    # only matches a literal dot.
    my ($name, $path, $suffix) = fileparse $file, map { quotemeta } keys %serializers;
    die "Could not determine file format from suffix" unless $suffix;
    $self->serializer_suffix($suffix);
    $self->localroot($path);
    my $deserialized;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        $deserialized = YAML::Syck::LoadFile($file);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new(serializer => $serializers{$suffix});
        $deserialized = $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    while (my ($k, $v) = each %{ $deserialized->{meta} }) {
        next if $k ne lc $k; # "Producers"
        $self->$k($v);
    }
    unless (defined $self->protocol) {
        $self->protocol(DEFAULT_PROTOCOL);
    }
    return $self;
}
191 =head2 DESTROY
193 A simple unlock.
195 =cut
# Release the lock directory (if we hold one) when the object goes away.
sub DESTROY {
    my ($self) = @_;
    $self->unlock;
}
198 =head1 ACCESSORS
200 =cut
my @accessors;

BEGIN {
    # Private accessors; they do not appear in the POD below.
    @accessors = qw(
        _current_tempfile
        _current_tempfile_fh
        _delayed_operations
        _done
        _interval
        _is_locked
        _localroot
        _merged
        _pathdb
        _remember_last_uptodate_call
        _remote_dir
        _remoteroot
        _rfile
        _rsync
        _seeded
        _uptodateness_ever_reached
        _use_tempfile
    );

    # Public accessors are harvested from the "=item NAME" lines of the
    # POD that follows, which doubles as a here-doc ending at "=cut".
    my @pod_lines =
        split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
228 =over 4
230 =item aggregator
232 A list of interval specs that tell the aggregator which I<recentfile>s
233 are to be produced.
235 =item canonize
237 The name of a method to canonize the path before rsyncing. Only
238 supported value is C<naive_path_normalize>. Defaults to that.
240 =item comment
242 A comment about this tree and setup.
244 =item dirtymark
246 A timestamp. The dirtymark is updated whenever an out of band change
247 on the origin server is performed that violates the protocol. Say,
248 they add or remove files in the middle somewhere. Slaves must react
249 with a devaluation of their C<done> structure which then leads to a
250 full re-sync of all files.
252 =item filenameroot
254 The (prefix of the) filename we use for this I<recentfile>. Defaults to
255 C<RECENT>.
257 =item have_mirrored
259 Timestamp remembering when we mirrored this recentfile the last time.
260 Only relevant for slaves.
262 =item ignore_link_stat_errors
264 If set to true, rsync errors are ignored that complain about link stat
265 errors. These seem to happen only when there are files missing at the
266 origin. In race conditions this can always happen, so it is
267 recommended to set this value to true.
269 =item is_slave
271 If set to true, this object will fetch a new recentfile from remote
272 when the timespan between the last mirror (see have_mirrored) and now
273 is too large (currently hardcoded arbitrary 420 seconds).
275 =item locktimeout
277 After how many seconds shall we die if we cannot lock a I<recentfile>?
278 Defaults to 600 seconds.
280 =item loopinterval
282 When mirror_loop is called, this accessor can specify how much time
283 every loop shall at least take. If the work of a loop is done before
284 that time has gone, sleeps for the rest of the time. Defaults to
285 arbitrary 42 seconds.
287 =item max_files_per_connection
289 Maximum number of files that are transferred on a single rsync call.
290 Setting it higher means higher performance at the price of holding
291 connections longer and potentially disturbing other users in the pool.
292 Defaults to the arbitrary value 42.
294 =item max_rsync_errors
296 When rsync operations encounter that many errors without any resetting
297 success in between, then we die. Defaults to unlimited. A value of
298 -1 means we run forever ignoring all rsync errors.
300 =item minmax
302 Hashref remembering when we read the recent_events from this file the
303 last time and what the timespan was.
305 =item protocol
307 When the RECENT file format changes, we increment the protocol. We try
308 to support older protocols in later releases.
310 =item remote_host
312 The host we are mirroring from. Leave empty for the local filesystem.
314 =item remote_module
316 Rsync servers have so called modules to separate directory trees from
317 each other. Put here the name of the module under which we are
318 mirroring. Leave empty for local filesystem.
320 =item rsync_options
322 Things like compress, links, times or checksums. Passed in to the
323 File::Rsync object used to run the mirror.
325 =item serializer_suffix
327 Mostly untested accessor. The only well tested format for
328 I<recentfile>s at the moment is YAML, which is read with YAML::Syck
329 directly; other formats go through Data::Serializer. In principle they are supported as
330 well. See section SERIALIZERS below.
332 =item sleep_per_connection
334 Sleep that many seconds (floating point OK) after every chunk of rsyncing
335 has finished. Defaults to arbitrary 0.42.
337 =item ttl
339 Time to live. Number of seconds after which this recentfile must be
340 fetched again from the origin server. Only relevant for slaves.
341 Defaults to arbitrary 24.2 seconds.
343 =item verbose
345 Boolean to turn on a bit of verbosity.
347 =back
349 =cut
351 use accessors @accessors;
353 =head1 METHODS
355 =head2 (void) $obj->aggregate
357 Takes all intervals that are collected in the accessor called
358 aggregator. Sorts them by actual length of the interval.
359 Removes those that are shorter than our own interval. Then merges this
360 object into the next larger object. The merging continues upwards
361 as long as the next I<recentfile> is old enough to warrant a merge.
363 Whether a merge is warranted is decided according to the interval of the
364 previous interval so that larger files are not so often updated as
365 smaller ones.
367 Here is an example to illustrate the behaviour. Given aggregators
369 1h 1d 1W 1M 1Q 1Y Z
371 then
373 1h updates 1d on every call to aggregate()
374 1d updates 1W earliest after 1h
375 1W updates 1M earliest after 1d
376 1M updates 1Q earliest after 1W
377 1Q updates 1Y earliest after 1M
378 1Y updates Z earliest after 1Q
380 Note that all but the smallest recentfile get updated at an arbitrary
381 rate and as such are quite useless on their own.
383 =cut
# Merge this recentfile upwards into the larger intervals listed in
# aggregator(), stopping at the first one that is not old enough.
sub aggregate {
    my ($self) = @_;
    my $own_secs = $self->interval_secs;
    my @aggs =
        sort { $a->{secs} <=> $b->{secs} }
        grep { $_->{secs} >= $own_secs }
        map  { +{ interval => $_, secs => $self->interval_secs($_) } }
        ($self->interval, @{ $self->aggregator || [] });
    $aggs[0]{object} = $self;
    AGGREGATOR: for my $i (0 .. $#aggs - 1) {
        my $this = $aggs[$i]{object};
        my $next = $this->_sparse_clone;
        $next->interval($aggs[$i + 1]{interval});
        # The first step always merges. Later steps merge only when the
        # target file is missing or older than the previous interval.
        my $want_merge = 1;
        if ($i > 0) {
            my $next_rfile = $next->rfile;
            if (-e $next_rfile) {
                my $prev = $aggs[$i - 1]{object};
                local $^T = time;
                my $next_age = 86400 * -M $next_rfile;
                $want_merge = $next_age > $prev->interval_secs ? 1 : 0;
            }
        }
        last AGGREGATOR unless $want_merge;
        $next->merge($this);
        $aggs[$i + 1]{object} = $next;
    }
    return;
}
# Collect file size and mtime of the recentfile for our own interval and
# for each aggregator interval, ordered from shortest to longest.
sub _debug_aggregate {
    my ($self) = @_;
    my @aggs =
        sort { $a->{secs} <=> $b->{secs} }
        map  { +{ interval => $_, secs => $self->interval_secs($_) } }
        ($self->interval, @{ $self->aggregator || [] });
    my @report;
    for my $agg (@aggs) {
        my $clone = Storable::dclone($self);
        $clone->interval($agg->{interval});
        my $rfile = $clone->rfile;
        my ($size, $mtime) = (stat $rfile)[7, 9];
        push @report, { rfile => $rfile, size => $size, mtime => $mtime };
    }
    return \@report;
}
# (void) $self->_assert_symlink()
#
# Make "<localroot>/<filenameroot>.recent" point at our rfilename.
# Where symlinks are supported, a symlink is created, or replaced via a
# temporary link plus rename. Otherwise the file is copied.
sub _assert_symlink {
    my ($self) = @_;
    my $recentrecentfile = File::Spec->catfile(
        $self->localroot,
        sprintf("%s.recent", $self->filenameroot),
    );
    my $tmpfile = "$recentrecentfile.$$";
    if ($Config{d_symlink} eq "define") {
        if (-l $recentrecentfile) {
            # already pointing at us: nothing to do
            return if readlink($recentrecentfile) eq $self->rfilename;
        }
        if (! -l $recentrecentfile && ! -e $recentrecentfile) {
            # nothing in the way: a straight symlink will do
            symlink $self->rfilename, $recentrecentfile
                or die "Could not create symlink '$recentrecentfile': $!";
        } else {
            # Something (a stale symlink or a plain copy from an earlier
            # fallback) is in the way: replace it through rename.
            unlink $tmpfile; # may fail
            symlink $self->rfilename, $tmpfile
                or die "Could not create symlink '$tmpfile': $!";
            rename $tmpfile, $recentrecentfile
                or die "Could not rename '$tmpfile' to $recentrecentfile: $!";
        }
    } else {
        warn "Warning: symlinks not supported on this system, doing a copy instead\n";
        # rfilename is relative to localroot (it is also used as a
        # relative symlink target there), so anchor it rather than
        # relying on the current working directory.
        my $source = File::Spec->catfile($self->localroot, $self->rfilename);
        unlink $tmpfile; # may fail
        cp $source, $tmpfile or die "Could not copy to '$tmpfile': $!";
        rename $tmpfile, $recentrecentfile
            or die "Could not rename '$tmpfile' to $recentrecentfile: $!";
    }
    return;
}
477 =head2 $hashref = $obj->delayed_operations
479 A hash of hashes containing unlink and rmdir operations which had to
480 wait until the recentfile got unhidden in order to not confuse
481 downstream mirrors (in case we have some).
483 =cut
# Lazily create and return the { unlink => {}, rmdir => {} } queue.
sub delayed_operations {
    my ($self) = @_;
    my $ops = $self->_delayed_operations;
    return $ops if defined $ops;
    $ops = { unlink => {}, rmdir => {} };
    $self->_delayed_operations($ops);
    return $ops;
}
498 =head2 $done = $obj->done
500 $done is a reference to a File::Rsync::Mirror::Recentfile::Done object
501 that keeps track of rsync activities. Only needed and used when we are
502 a mirroring slave.
504 =cut
# Lazily create the Done tracker, bound to our interval.
sub done {
    my ($self) = @_;
    my $done = $self->_done;
    return $done if $done;
    require File::Rsync::Mirror::Recentfile::Done;
    $done = File::Rsync::Mirror::Recentfile::Done->new();
    $done->_rfinterval($self->interval);
    $self->_done($done);
    return $done;
}
518 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
520 Stores the remote I<recentfile> locally as a tempfile. The caller is
521 responsible for removing the file after use.
523 Note: if you're intending to act as an rsync server for other slaves,
524 then you must prefer this method over fetching that file with
525 get_remotefile(). Otherwise downstream mirrors would expect you to
526 already have mirrored all the files that are in the I<recentfile>
527 before you have them mirrored.
529 =cut
# Rsync the remote recentfile into a local tempfile and return its path.
# While _use_tempfile is set and the ttl has not been reached, the
# current tempfile is returned without contacting the remote.
sub get_remote_recentfile_as_tempfile {
    my ($self) = @_;
    mkpath $self->localroot;
    my $fh;
    if ($self->_use_tempfile()) {
        return $self->_current_tempfile if !$self->ttl_reached;
        $fh = $self->_current_tempfile_fh;
    }
    my $trfilename = $self->rfilename;

    my $dst;
    if ($fh) {
        $dst = $self->_current_tempfile;
    } else {
        $fh = $self->_get_remote_rat_provide_tempfile_object($trfilename);
        $dst = $fh->filename;
        $self->_current_tempfile($dst);
        my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
        if (defined $rfile && -e $rfile) {
            # saving on bandwidth. Might need to be configurable
            # $self->bandwidth_is_cheap?
            cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!";
        }
    }
    my $src = join "/", $self->remoteroot, $trfilename;
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        my $display_dst = join "/", "...", basename(dirname($dst)), basename($dst);
        printf STDERR "%-4s %d (1/1/%s) temp %s ... ",
            $doing, time, $self->interval, $display_dst;
    }
    my $gaveup  = 0;
    my $retried = 0;
    while (!$self->rsync->exec(src => $src, dst => $dst)) {
        $self->register_rsync_error($self->rsync->err);
        if (++$retried >= 3) {
            warn "XXX giving up";
            $gaveup = 1;
            last;
        }
    }
    if ($gaveup) {
        printf STDERR "Warning: gave up mirroring %s, will try again later\n", $self->interval;
    } else {
        $self->_refresh_internals($dst);
        $self->have_mirrored(Time::HiRes::time());
        $self->un_register_rsync_error();
    }
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
    my $mode = 0644;
    # report the mode in octal; plain interpolation would print 420
    chmod $mode, $dst or die sprintf "Could not chmod %04o '%s': %s", $mode, $dst, $!;
    return $dst;
}
# Create the File::Temp object that receives the remote recentfile.
# When _use_tempfile is set, the object is also stored on $self.
sub _get_remote_rat_provide_tempfile_object {
    my ($self, $trfilename) = @_;
    my $template = sprintf ".FRMRecent-%s-XXXX", $trfilename;
    my $fh = File::Temp->new(
        TEMPLATE => $template,
        DIR      => $self->localroot,
        SUFFIX   => $self->serializer_suffix,
        UNLINK   => $self->_use_tempfile,
    );
    # keep a reference so the object (and file) outlives this call
    $self->_current_tempfile_fh($fh) if $self->_use_tempfile;
    return $fh;
}
618 =head2 $localpath = $obj->get_remotefile ( $relative_path )
620 Rsyncs one single remote file to local filesystem.
622 Note: no locking is done on this file. Any number of processes may
623 mirror this object.
625 Note II: do not use for recentfiles. If you are a cascading
626 slave/server combination, it would confuse other slaves. They would
627 expect the contents of these recentfiles to be available. Use
628 get_remote_recentfile_as_tempfile() instead.
630 =cut
# Rsync a single remote file below remoteroot to the same relative
# path below localroot. Every failed attempt is handed to
# register_rsync_error before retrying.
sub get_remotefile {
    my ($self, $path) = @_;
    my $dst = File::Spec->catfile($self->localroot, $path);
    mkpath dirname $dst;
    if ($self->verbose) {
        printf STDERR "%-4s %d (1/1/%s) %s ... ",
            (-e $dst ? "Sync" : "Get"), time, $self->interval, $path;
    }
    until ($self->rsync->exec(
        src => join("/", $self->remoteroot, $path),
        dst => $dst,
    )) {
        $self->register_rsync_error($self->rsync->err);
    }
    $self->un_register_rsync_error();
    print STDERR "DONE\n" if $self->verbose;
    return $dst;
}
662 =head2 $obj->interval ( $interval_spec )
664 Get/set accessor. $interval_spec is a string and described below in
665 the section INTERVAL SPEC.
667 =cut
# Get/set the interval spec. Setting also clears _rfile (presumably
# because the rfile name depends on the interval). Confesses when no
# interval is known.
sub interval {
    my ($self, $interval) = @_;
    if (@_ > 1) {
        $self->_interval($interval);
        $self->_rfile(undef);
    }
    my $current = $self->_interval;
    return $current if defined $current;
    # do not ask the $self too much, it recurses!
    require Carp;
    Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
}
684 =head2 $secs = $obj->interval_secs ( $interval_spec )
686 $interval_spec is described below in the section INTERVAL SPEC. If
687 empty defaults to the inherent interval for this object.
689 =cut
# Convert an interval spec (default: our own) into seconds. "Z" maps to
# MAX_INT. Otherwise the spec must be digits followed by a unit known
# in %seconds.
sub interval_secs {
    my ($self, $interval) = @_;
    $interval = $self->interval unless $interval;
    die "interval_secs() called without argument on an object without a declared one"
        unless defined $interval;
    my ($n, $t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/
        or die "Could not determine seconds from interval[$interval]";
    return MAX_INT if $interval eq "Z";
    return $seconds{$t} * $n if exists $seconds{$t} && $n =~ /^\d+$/;
    die "Invalid interval specification: n[$n]t[$t]";
}
708 =head2 $obj->localroot ( $localroot )
710 Get/set accessor. The local root of the tree.
712 =cut
# Get/set the local root of the tree. Setting also clears _rfile.
sub localroot {
    my ($self, $localroot) = @_;
    if (@_ > 1) {
        $self->_localroot($localroot);
        $self->_rfile(undef);
    }
    my $root = $self->_localroot;
    return $root;
}
723 =head2 $ret = $obj->local_path($path_found_in_recentfile)
725 Combines the path to our local mirror and the path of an object found
726 in this I<recentfile>. In other words: the target of a mirror operation.
728 Implementation note: We split on slashes and then use
729 File::Spec::catfile to adjust to the local operating system.
731 =cut
# Map a slash-separated path from the recentfile to a native path
# below localroot. An undefined path yields localroot itself.
sub local_path {
    my ($self, $path) = @_;
    # seems like a degenerated case
    return $self->localroot unless defined $path;
    return File::Spec->catfile($self->localroot, split m{/}, $path);
}
743 =head2 (void) $obj->lock
745 Locking is implemented with an C<mkdir> on a locking directory
746 (C<.lock> appended to $rfile).
748 =cut
# Acquire the "$rfile.lock" directory, polling every 10ms; die after
# locktimeout seconds (default 600). A no-op if we already hold it.
sub lock {
    my ($self) = @_;
    # not using flock because it locks on filehandles instead of
    # old school resources.
    return if $self->_is_locked;
    my $rfile   = $self->rfile;
    my $lockdir = "$rfile.lock";
    # XXX need a way to allow breaking the lock
    my $start       = time;
    my $locktimeout = $self->locktimeout || 600;
    until (mkdir $lockdir) {
        Time::HiRes::sleep(0.01);
        die "Could not acquire lockdirectory '$lockdir': $!"
            if time - $start > $locktimeout;
    }
    $self->_is_locked(1);
}
768 =head2 (void) $obj->merge ($other)
770 Bulk update of this object with another one. It's used to merge a
771 smaller and younger $other object into the current one. If this file
772 is a C<Z> file, then we do not merge in objects of type C<delete>. But
773 if we encounter an object of type delete we delete the corresponding
774 C<new> object if we have it.
776 If there is nothing to be merged, nothing is done.
778 =cut
# Merge the events of $other (a smaller interval) into $self. Both
# objects are locked for the duration. Files are only rewritten when
# something changed.
sub merge {
    my ($self, $other) = @_;
    $self->_merge_sanitycheck($other);
    $other->lock;
    my $other_recent = $other->recent_events || [];
    $self->lock;
    my $my_recent = $self->recent_events || [];

    # calculate the target time span
    my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
    my $epoch   = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
    my $oldest_allowed = 0;
    # obstetrics: an empty target always needs writing
    my $something_done = $my_recent->[0] ? undef : 1;
    if ($epoch) {
        if (my $merged = $self->merged) {
            my $secs = $self->interval_secs();
            $oldest_allowed = min($epoch - $secs, $merged->{epoch});
        }
        # throw away outsiders
        # XXX _bigfloat!
        while (@$my_recent && $my_recent->[-1]{epoch} < $oldest_allowed) {
            pop @$my_recent;
            $something_done = 1;
        }
    }

    my %have;
    my $recent = [];
    for my $oev (@$other_recent) {
        my $oevepoch = $oev->{epoch} || 0;
        next if $oevepoch < $oldest_allowed;
        my $path = $oev->{path};
        next if $have{$path}++;
        # a Z file does not take over delete events
        next if $self->interval eq "Z" and $oev->{type} eq "delete";
        $something_done = 1 if !$myepoch || $oevepoch > $myepoch;
        push @$recent, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
    }
    if ($something_done) {
        $self->_merge_something_done($recent, $my_recent, $other_recent, $other, \%have, $epoch);
    }
    $self->unlock;
    $other->unlock;
}
# Append our own events whose path is not yet in $recent and adopt a
# newer dirtymark from $other. Then write $self, and record in $other
# when and at which epoch it was merged before writing it too.
sub _merge_something_done {
    my ($self, $recent, $my_recent, $other_recent, $other, $have, $epoch) = @_;
    for my $ev (@$my_recent) {
        push @$recent, $ev unless $have->{ $ev->{path} }++;
    }
    if (_bigfloatgt($other->dirtymark, $self->dirtymark)) {
        $self->dirtymark($other->dirtymark);
    }
    $self->write_recent($recent);
    $other->merged({
        time          => Time::HiRes::time(), # not used anywhere
        epoch         => $recent->[0]{epoch},
        into_interval => $self->interval,     # not used anywhere
    });
    $other->write_recent($other_recent);
}
# Die unless $self has a strictly larger interval than $other.
sub _merge_sanitycheck {
    my ($self, $other) = @_;
    my $mine   = $self->interval_secs;
    my $theirs = $other->interval_secs;
    return if $mine > $theirs;
    die sprintf
        "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
        $mine,
        $theirs;
}
861 =head2 merged
863 Hashref denoting when this recentfile has been merged into some other
864 at which epoch.
866 =cut
# Get/set the merge record. When it names an into_interval that is equal
# to or smaller than our own interval, a stack trace warning is issued.
sub merged {
    my ($self, $set) = @_;
    $self->_merged($set) if defined $set;
    my $merged = $self->_merged;
    return $merged unless $merged;
    my $into = $merged->{into_interval};
    return $merged unless $into && defined $self->_interval;
    if ($into eq $self->interval) {
        require Carp;
        Carp::cluck(sprintf
            "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
            $into,
            $self->interval,
        );
    } elsif ($self->interval_secs($into) < $self->interval_secs) {
        require Carp;
        Carp::cluck(sprintf
            "Warning: into_interval[%s] smaller than own interval[%s] on interval[%s]. Danger ahead.",
            $self->interval_secs($into),
            $self->interval_secs,
            $self->interval,
        );
    }
    return $merged;
}
898 =head2 $hashref = $obj->meta_data
900 Returns the hashref of metadata that the server has to add to the
901 I<recentfile>.
903 =cut
# Build the "meta" section written into the recentfile. Fields are
# copied from accessors when defined, and Producers and dirtymark are
# filled in if still missing.
sub meta_data {
    my ($self) = @_;
    my $ret = $self->{meta};
    my @fields = qw(
        aggregator
        canonize
        comment
        dirtymark
        filenameroot
        merged
        interval
        protocol
        serializer_suffix
    );
    for my $field (@fields) {
        my $value = $self->$field;
        $ret->{$field} = $value if defined $value;
    }
    # XXX need to reset the Producer if I am a writer, keep it when I
    # am a reader
    $ret->{Producers} ||= {
        __PACKAGE__, "$VERSION", # stringified it looks better
        '$0'   => $0,
        'time' => Time::HiRes::time(),
    };
    $ret->{dirtymark} ||= Time::HiRes::time();
    return $ret;
}
935 =head2 $success = $obj->mirror ( %options )
937 Mirrors the files in this I<recentfile> as reported by
938 C<recent_events>. Options named C<after>, C<before>, C<max>, and
939 C<skip-deletes> are passed through to the L<recent_events> call. The
940 boolean option C<piecemeal>, if true, causes C<mirror> to only rsync
941 C<max_files_per_connection> files and keep track of the rsynced files so
942 that future calls will rsync different files until all files are
943 brought to sync.
945 =cut
# Mirror every event of the remote recentfile. Returns true when no
# error was collected. In piecemeal mode it may return early (undef)
# before the tempfile is unhidden.
sub mirror {
    my ($self, %options) = @_;
    my $trecentfile = $self->get_remote_recentfile_as_tempfile();
    $self->_use_tempfile(1);
    my @passthrough_keys = qw(before after max skip-deletes);
    my %passthrough;
    @passthrough{@passthrough_keys} = @options{@passthrough_keys};
    my ($recent_events) = $self->recent_events(%passthrough);
    my (@error, @xcollector);
    my $first_item = 0;
    my $last_item  = $#$recent_events;
    my $done   = $self->done;
    my $pathdb = $self->_pathdb;
    ITEM: for my $idx ($first_item .. $last_item) {
        my $status = {};
        $self->_mirror_item(
            $idx, $recent_events, $last_item, $done, $pathdb,
            \@xcollector, \%options, $status, \@error,
        );
        # the last item always proceeds to the cleanup below
        last ITEM if $idx == $last_item;
        return if $status->{mustreturn};
    }
    if (@xcollector) {
        my $success = eval { $self->_mirror_empty_xcollector(\@xcollector, $pathdb, $recent_events); };
        if (!$success || $@) {
            warn "Warning: Unknown error while mirroring: $@";
            push @error, $@;
            sleep 1;
        }
    }
    print STDERR "DONE\n" if $self->verbose;
    # once we've gone to the end we consider ourselves free of obligations
    $self->unseed;
    $self->_mirror_unhide_tempfile($trecentfile);
    $self->_mirror_perform_delayed_ops;
    return !@error;
}
# Handle one event: skip it when already covered, queue "new" events
# for rsync, and schedule "delete" events as delayed unlink/rmdir.
sub _mirror_item {
    my ($self, $i, $recent_events, $last_item, $done, $pathdb,
        $xcollector, $options, $status, $error) = @_;
    my $recent_event = $recent_events->[$i];
    return if $done->covered($recent_event->{epoch});
    if ($pathdb) {
        my $rec = $pathdb->{ $recent_event->{path} };
        if ($rec && $rec->{recentepoch}
            && _bigfloatgt($rec->{recentepoch}, $recent_event->{epoch})) {
            # a newer event for this path was already handled
            $done->register($recent_events, [$i]);
            return;
        }
    }
    my $dst  = $self->local_path($recent_event->{path});
    my $type = $recent_event->{type};
    if ($type eq "new") {
        $self->_mirror_item_new(
            $dst, $i, $last_item, $recent_events, $recent_event,
            $xcollector, $pathdb, $status, $error, $options,
        );
    } elsif ($type eq "delete") {
        my $activity;
        if ($options->{'skip-deletes'}) {
            $activity = "skipped";
        } elsif (! -e $dst) {
            $activity = "not_found";
        } else {
            my $op = (-l $dst or not -d _) ? "unlink" : "rmdir";
            $self->delayed_operations->{$op}{$dst}++;
            $activity = "deleted";
        }
        $done->register($recent_events, [$i]);
        $self->_mirror_register_path($pathdb, [$recent_event], $activity) if $pathdb;
    } else {
        warn "Warning: invalid upload type '$recent_event->{type}'";
    }
}
# Queue a "new" event for rsync. Once max_files_per_connection events
# are queued, the collector is flushed and we sleep sleep_per_connection
# seconds. Errors are recorded in @$error. In piecemeal mode
# $status->{mustreturn} is set after a flush.
sub _mirror_item_new {
    my ($self, $dst, $i, $last_item, $recent_events, $recent_event,
        $xcollector, $pathdb, $status, $error, $options) = @_;
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        printf STDERR "%-4s %d (%d/%d/%s) %s ... ",
            $doing, time, 1 + $i, 1 + $last_item, $self->interval, $recent_event->{path};
    }
    my $max_files_per_connection = $self->max_files_per_connection || 42;
    if ($self->verbose) {
        print STDERR "\n";
    }
    push @$xcollector, { rev => $recent_event, i => $i };
    return if @$xcollector < $max_files_per_connection;

    my $success = eval { $self->_mirror_empty_xcollector($xcollector, $pathdb, $recent_events); };
    my $err = $@;
    my $sleep = $self->sleep_per_connection;
    $sleep = 0.42 unless defined $sleep;
    Time::HiRes::sleep($sleep);
    # Report failures before any piecemeal early return; previously the
    # piecemeal path returned first and silently dropped them.
    if (!$success || $err) {
        warn "Warning: Error while mirroring: $err";
        push @$error, $err;
        sleep 1;
    }
    if ($options->{piecemeal}) {
        $status->{mustreturn} = 1;
        return;
    }
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
    return;
}
# Rsync all queued paths in one call, record them in the pathdb (if
# any) and in done, then clear the queue. Returns mirror_path's result.
sub _mirror_empty_xcollector {
    my ($self, $xcoll, $pathdb, $recent_events) = @_;
    my @events  = map { $_->{rev} } @$xcoll;
    my $success = $self->mirror_path([ map { $_->{path} } @events ]);
    $self->_mirror_register_path($pathdb, \@events, "rsync") if $pathdb;
    $self->done->register($recent_events, [ map { $_->{i} } @$xcoll ]);
    @$xcoll = ();
    return $success;
}
# Record each event in $pathdb as { recentepoch => epoch,
# "<activity>_on" => now }, replacing any previous entry for the path.
sub _mirror_register_path {
    my ($self, $pathdb, $coll, $activity) = @_;
    my $now       = time;
    my $stamp_key = $activity . "_on";
    for my $event (@$coll) {
        $pathdb->{ $event->{path} } = {
            recentepoch => $event->{epoch},
            $stamp_key  => $now,
        };
    }
    return;
}
# Move the downloaded tempfile to its final rfile name and stop treating
# it as a temporary file.
sub _mirror_unhide_tempfile {
    my ($self, $trecentfile) = @_;
    my $rfile = $self->rfile;
    unless (rename $trecentfile, $rfile) {
        require Carp;
        Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
    }
    $self->_use_tempfile(0);
    if (my $ctfh = $self->_current_tempfile_fh) {
        # File::Temp must not delete the file when the handle goes away
        $ctfh->unlink_on_destroy(0);
        $self->_current_tempfile_fh(undef);
    }
}
# Execute the queued unlink and rmdir operations. Failures only warn.
# Each entry is dequeued whether or not it succeeded.
sub _mirror_perform_delayed_ops {
    my ($self) = @_;
    my $delayed = $self->delayed_operations;
    for my $dst (keys %{ $delayed->{unlink} }) {
        unless (unlink $dst) {
            require Carp;
            Carp::cluck("Warning: Error while unlinking '$dst': $!");
        }
        delete $delayed->{unlink}{$dst};
    }
    # rmdir only removes empty directories, so handle longer paths first.
    # That way a subdirectory is removed before its parent is attempted.
    my @dirs = sort { length($b) <=> length($a) || $a cmp $b } keys %{ $delayed->{rmdir} };
    for my $dst (@dirs) {
        unless (rmdir $dst) {
            require Carp;
            Carp::cluck("Warning: Error on rmdir '$dst': $!");
        }
        delete $delayed->{rmdir}{$dst};
    }
    return;
}
1168 =head2 (void) $obj->mirror_loop
1170 Run mirror in an endless loop. See the accessor C<loopinterval>. XXX
1171 What happens/should happen if we miss the interval during a single loop?
1173 =cut
# Call mirror() repeatedly until SIGINT is received. Each iteration
# takes at least loopinterval seconds (default 42), and each call only
# asks for events after the newest epoch seen so far.
sub mirror_loop {
    my ($self) = @_;
    my $Signal = 0;
    # local: restore the previous handler once the loop ends
    local $SIG{INT} = sub { $Signal++ };
    my $loopinterval = $self->loopinterval || 42;
    my $after = -999999999;
    LOOP: while (1) {
        # measured per iteration, otherwise pacing stops after the first loop
        my $iteration_start = time;
        # mirror() takes named options; a bare positional $after was lost
        $self->mirror(after => $after);
        last LOOP if $Signal;
        my $re = $self->recent_events;
        $after = $re->[0]{epoch};
        if ($self->verbose) {
            local $| = 1;
            print "($after)";
        }
        my $remaining = $iteration_start + $loopinterval - time;
        if ($remaining > 0) {
            sleep $remaining;
        }
        if ($self->verbose) {
            local $| = 1;
            print "~";
        }
    }
    return;
}
1202 =head2 $success = $obj->mirror_path ( $arrref | $path )
1204 If the argument is a scalar it is treated as a path. The remote path
1205 is mirrored into the local copy. $path is the path found in the
1206 I<recentfile>, i.e. it is relative to the root directory of the
1207 mirror.
1209 If the argument is an array reference then all elements are treated as
1210 a path below the current tree and all are rsynced with a single
1211 command (and a single connection).
1213 =cut
# Rsync one path (scalar) or a batch of paths (array ref) from
# remoteroot into the local tree. A single path is retried until rsync
# succeeds; a batch goes through one files-from rsync call and is
# abandoned after three failures. link_stat errors count as success
# when ignore_link_stat_errors is set. Always returns 1.
sub mirror_path {
    my ($self, $path) = @_;
    # XXX simplify the two branches such that $path is treated as
    # [$path] maybe even demand the argument as an arrayref to
    # simplify docs and code. (rsync-over-recentfile-2.pl uses the
    # interface)
    my $is_batch = ref $path && ref $path eq "ARRAY";
    unless ($is_batch) {
        my $target = $self->local_path($path);
        mkpath dirname $target;
        until ($self->rsync->exec(
                   src => join("/", $self->remoteroot, $path),
                   dst => $target,
               )) {
            my ($err) = $self->rsync->err;
            if ($self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x ) {
                warn "Info: ignoring link_stat error '$err'" if $self->verbose;
                return 1;
            }
            $self->register_rsync_error($err);
        }
        $self->un_register_rsync_error();
        return 1;
    }

    # Batch: write all paths into a files-from list and fetch them with a
    # single rsync call (and a single connection) into localroot.
    my $target = $self->localroot;
    mkpath dirname $target;
    my $files_from = File::Temp->new(
        TEMPLATE => sprintf(".%s-XXXX", lc $self->filenameroot),
        TMPDIR   => 1,
        UNLINK   => 0,
    );
    print $files_from $_, "\n" for @$path;
    $files_from->flush;
    $files_from->unlink_on_destroy(1);
    my ($failures, $gaveup) = (0, 0);
    until ($self->rsync->exec(
               src          => join("/", $self->remoteroot),
               dst          => $target,
               'files-from' => $files_from->filename,
           )) {
        my ($err) = $self->rsync->err;
        if ($self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x ) {
            warn "Info: ignoring link_stat error '$err'" if $self->verbose;
            return 1;
        }
        $self->register_rsync_error($err);
        if (++$failures >= 3) {
            warn "XXX giving up.";
            $gaveup = 1;
            last;
        }
    }
    $self->un_register_rsync_error() unless $gaveup;
    return 1;
}
# The file recent_events() should read: the downloaded tempfile while
# _use_tempfile is on, the local recentfile otherwise.
sub _my_current_rfile {
    my ($self) = @_;
    my $current = $self->_use_tempfile
        ? $self->_current_tempfile
        : $self->rfile;
    return $current;
}
1298 =head2 $path = $obj->naive_path_normalize ($path)
1300 Takes an absolute unix style path as argument and canonicalizes it to
1301 a shorter path if possible, removing things like double slashes or
C</./> and removes references to C<../> directories to get a shorter
unambiguous path. This simplifies the code that determines whether a
file passed to C<update()> is indeed below our C<localroot>.
1306 =cut
# Purely textual canonicalization of an absolute unix path: collapses
# repeated slashes, drops "." segments, resolves "dir/.." pairs and
# strips a trailing slash. The filesystem is never consulted.
sub naive_path_normalize {
    my($self,$path) = @_;
    $path =~ s|/+|/|g;
    # A trailing "/." or "/.." is a directory reference as well; give it a
    # closing slash so that the substitutions below see it.
    $path .= "/" if $path =~ m{/\.\.?\z};
    # Drop "/./" first, otherwise the next rule takes "." for a parent
    # directory and turns "/a/./../b" into "/a/b" instead of "/b".
    1 while $path =~ s|/\./|/|;
    1 while $path =~ s|/[^/]+/\.\./|/|;
    $path =~ s|/$||;
    $path;
}
1316 =head2 $ret = $obj->read_recent_1 ( $data )
1318 Delegate of C<recent_events()> on protocol 1
1320 =cut
# Protocol 1 keeps the event list under the "recent" key.
sub read_recent_1 {
    my (undef, $data) = @_;
    my $events = $data->{recent};
    return $events;
}
1327 =head2 $array_ref = $obj->recent_events ( %options )
1329 Note: the code relies on the resource being written atomically. We
1330 cannot lock because we may have no write access. If the caller has
1331 write access (eg. aggregate() or update()), it has to care for any
1332 necessary locking and it MUST write atomically.
1334 If $options{after} is specified, only file events after this timestamp
1335 are returned.
1337 If $options{before} is specified, only file events before this
1338 timestamp are returned.
If $options{'skip-deletes'} is specified, no files-to-be-deleted will
1341 be returned.
1343 If $options{max} is specified only a maximum of this many events is
1344 returned.
1346 If $options{contains} is specified the value must be a hash reference
1347 containing a query. The query may contain the keys C<epoch>, C<path>,
1348 and C<type>. Each represents a condition that must be met. If there is
1349 more than one such key, the conditions are ANDed.
1351 If $options{info} is specified, it must be a hashref. This hashref
1352 will be filled with metadata about the unfiltered recent_events of
1353 this object, in key C<first> there is the first item, in key C<last>
1354 is the last.
1356 =cut
# Load the events of the current recentfile (downloading it first when
# we are a slave) and apply the filter options documented above.
# Returns [] when there is no readable or deserializable file.
sub recent_events {
    my ($self, %options) = @_;
    if ($self->is_slave) {
        $self->get_remote_recentfile_as_tempfile;
    }
    my $rfile_or_tempfile = $self->_my_current_rfile or return [];
    -e $rfile_or_tempfile or return [];
    my $suffix = $self->serializer_suffix;
    my ($data) = eval {
        $self->_try_deserialize
            (
             $suffix,
             $rfile_or_tempfile,
            );
    };
    my $err = $@;
    if ($err or !$data) {
        return [];
    }
    my $re;
    if ((reftype($data) || '') eq 'ARRAY') { # protocol 0
        $re = $data;
    } else {
        $re = $self->_recent_events_protocol_x
            (
             $data,
             $rfile_or_tempfile,
            );
    }
    # Every option understood by _recent_events_handle_options must be
    # listed here; anything missing would be silently ignored.
    return $re
        unless grep { defined $options{$_} } qw(after before max contains skip-deletes info);
    $self->_recent_events_handle_options ($re, \%options);
}
# Filter the event list $re (ordered by decreasing epoch) in place
# according to %$options (info, after, before, skip-deletes, contains,
# max) and return it.
sub _recent_events_handle_options {
    my($self, $re, $options) = @_;
    # info describes the unfiltered list, so it is filled before filtering
    my $info = $options->{info};
    if ($info) {
        $info->{first} = $re->[0];
        $info->{last} = $re->[-1];
    }
    # The wanted events form the contiguous slice $first_item..$last_item.
    # The searches never touch a nonexistent element, so an empty list
    # does not get an autovivified bogus entry.
    my $last_item = $#$re;
    if (defined $options->{after}) {
        my $after = $options->{after};
        # the first event not newer than 'after' ends the slice
        my $f = first { $re->[$_]{epoch} <= $after } 0..$#$re;
        $last_item = $f - 1 if defined $f;
    }
    my $first_item = 0;
    if (defined $options->{before}) {
        my $before = $options->{before};
        # the first event older than 'before' starts the slice; without
        # one, nothing qualifies
        my $f = first { $re->[$_]{epoch} < $before } 0..$last_item;
        $first_item = defined $f ? $f : $last_item + 1;
    }
    # Always splice: the old "0 != first || -1 != last" guard returned the
    # complete list when 'after' excluded every event.
    @$re = splice @$re, $first_item, 1+$last_item-$first_item;
    if ($options->{'skip-deletes'}) {
        @$re = grep { $_->{type} ne "delete" } @$re;
    }
    if (my $contopt = $options->{contains}) {
        my $seen_allowed = 0;
        for my $allow (qw(epoch path type)) {
            if (exists $contopt->{$allow}) {
                $seen_allowed++;
                my $v = $contopt->{$allow};
                @$re = grep { $_->{$allow} eq $v } @$re;
            }
        }
        if (keys %$contopt > $seen_allowed) {
            require Carp;
            Carp::confess
                (sprintf "unknown query: %s", join ", ", %$contopt);
        }
    }
    if ($options->{max} && @$re > $options->{max}) {
        @$re = splice @$re, 0, $options->{max};
    }
    $re;
}
# Decode a protocol >= 1 recentfile structure: adopt lowercase meta
# attributes we do not have yet, read the events with the matching
# read_recent_N method and record mtime/min/max in minmax.
sub _recent_events_protocol_x {
    my ($self, $data, $rfile_or_tempfile) = @_;
    my $reader = sprintf "read_recent_%d", $data->{meta}{protocol};
    my $meta = $data->{meta};
    # we may be reading meta for the first time; uppercase keys such as
    # "Producers" are informational, not accessors
    for my $key (grep { $_ eq lc $_ } keys %$meta) {
        next if defined $self->$key;
        $self->$key($meta->{$key});
    }
    my $events = $self->$reader($data);
    my @stat = stat $rfile_or_tempfile;
    @stat or die "Cannot stat '$rfile_or_tempfile': $!";
    my %minmax = (mtime => $stat[9]);
    if (@$events) {
        @minmax{qw(min max)} = ($events->[-1]{epoch}, $events->[0]{epoch});
    }
    $self->minmax(\%minmax);
    return $events;
}
# Deserialize the recentfile at $rfile_or_tempfile. ".yaml" goes through
# YAML::Syck; every other suffix needs Data::Serializer with the
# serializer registered for it in %serializers. Dies on failure.
sub _try_deserialize {
    my($self,
       $suffix,
       $rfile_or_tempfile,
      ) = @_;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        YAML::Syck::LoadFile($rfile_or_tempfile);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        my $serialized = do
            {
                # Three-argument open: the path must never be parsed as a
                # mode (a leading ">" or "|" would change the meaning).
                open my $fh, "<", $rfile_or_tempfile
                    or die "Could not open '$rfile_or_tempfile': $!";
                local $/;
                <$fh>;
            };
        $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
}
# Re-read the recentfile $dst through a fresh object of our own class
# and take over its merge state and epoch range. If its dirtymark is
# newer than ours, forget what was done and mark ourselves as seeded.
sub _refresh_internals {
    my ($self, $dst) = @_;
    my $peek = (ref $self)->new_from_file($dst);
    for my $accessor (qw(_merged minmax)) {
        $self->$accessor($peek->$accessor);
    }
    my ($mine, $theirs) = ($self->dirtymark, $peek->dirtymark);
    my $upstream_dirtied = $mine && $theirs && _bigfloatgt($theirs, $mine);
    if ($upstream_dirtied) {
        $self->done->reset;
        $self->dirtymark($theirs);
        $self->seed;
    }
}
1519 =head2 $ret = $obj->rfilename
1521 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1522 a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
1524 =cut
# "<filenameroot>-<interval><serializer_suffix>", e.g. RECENT-6h.yaml
sub rfilename {
    my ($self) = @_;
    return join "",
        $self->filenameroot, "-", $self->interval, $self->serializer_suffix;
}
1536 =head2 $str = $self->remote_dir
1538 The directory we are mirroring from.
1540 =cut
# Get/set the remote directory. Any call, including a plain read, marks
# this object as a slave.
sub remote_dir {
    my ($self, $set) = @_;
    $self->_remote_dir($set) if defined $set;
    my $dir = $self->_remote_dir;
    $self->is_slave(1);
    return $dir;
}
1552 =head2 $str = $obj->remoteroot
1554 =head2 (void) $obj->remoteroot ( $set )
1556 Get/Set the composed prefix needed when rsyncing from a remote module.
1557 If remote_host, remote_module, and remote_dir are set, it is composed
1558 from these.
1560 =cut
# Get/set the rsync source prefix. Unless set explicitly it is composed
# as "<remote_host>::<remote_module>/<remote_dir>" (undefined parts are
# left out) and cached.
sub remoteroot {
    my ($self, $set) = @_;
    $self->_remoteroot($set) if defined $set;
    my $root = $self->_remoteroot;
    return $root if defined $root;
    my $host   = $self->remote_host;
    my $module = $self->remote_module;
    my $dir    = $self->remote_dir;
    $root = (defined $host   ? $host . "::"  : "")
          . (defined $module ? $module . "/" : "")
          . (defined $dir    ? $dir          : "");
    $self->_remoteroot($root);
    return $root;
}
1581 =head2 (void) $obj->resolve_recentfilename ( $recentfilename )
1583 Inverse method to L<rfilename>. $recentfilename is a plain filename of
1584 the pattern
1586 $filenameroot-$interval$serializer_suffix
1588 e.g.
1590 RECENT-1M.yaml
1592 This filename is split into its parts and the parts are fed to the
1593 object itself.
1595 =cut
# Split "<root>-<interval><suffix>" and store the three parts in the
# filenameroot, interval and serializer_suffix accessors. Dies when the
# name does not fit the pattern.
sub resolve_recentfilename {
    my ($self, $rfname) = @_;
    my ($splitter) = qr(^(.+)-([^-\.]+)(\.[^\.]+));
    my ($root, $interval, $suffix) = $rfname =~ $splitter
        or die "Alert: cannot split '$rfname', doesn't match '$splitter'";
    $self->filenameroot($root);
    $self->interval($interval);
    $self->serializer_suffix($suffix);
    return;
}
1610 =head2 my $rfile = $obj->rfile
1612 Returns the full path of the I<recentfile>
1614 =cut
# Full path of the recentfile (localroot joined with rfilename),
# computed once and cached in _rfile.
sub rfile {
    my($self) = @_;
    my $rfile = $self->_rfile;
    return $rfile if defined $rfile;
    # Load File::Spec explicitly instead of relying on another module
    # having pulled it in.
    require File::Spec;
    $rfile = File::Spec->catfile
        ($self->localroot,
         $self->rfilename,
        );
    $self->_rfile ($rfile);
    return $rfile;
}
1628 =head2 $rsync_obj = $obj->rsync
1630 The File::Rsync object that this object uses for communicating with an
1631 upstream server.
1633 =cut
# Lazily built File::Rsync object (configured with rsync_options) that is
# used for all transfers from upstream. Dies if File::Rsync could not be
# loaded.
sub rsync {
    my ($self) = @_;
    my $rsync = $self->_rsync;
    return $rsync if defined $rsync;
    my $options = $self->rsync_options || {};
    $HAVE->{"File::Rsync"}
        or die "File::Rsync required for rsync operations. Cannot continue";
    $rsync = File::Rsync->new($options);
    $self->_rsync($rsync);
    return $rsync;
}
1650 =head2 (void) $obj->register_rsync_error($err)
1652 =head2 (void) $obj->un_register_rsync_error()
1654 Register_rsync_error is called whenever the File::Rsync object fails
1655 on an exec (say, connection doesn't succeed). It issues a warning and
1656 sleeps for an increasing amount of time. Un_register_rsync_error
1657 resets the error count. See also accessor C<max_rsync_errors>.
1659 =cut
{
    # Process-wide bookkeeping shared by all objects: how many rsync calls
    # failed in a row and when the most recent failure happened.
    my $no_success_count = 0;
    my $no_success_time = 0;

    # Record one failed rsync call: confess once max_rsync_errors is
    # reached, otherwise warn and back off 12s per consecutive failure
    # (capped at 300s).
    sub register_rsync_error {
        my ($self, $err) = @_;
        chomp $err;
        $no_success_time = time;
        $no_success_count++;
        my $limit = $self->max_rsync_errors;
        $limit = MAX_INT unless defined $limit;
        if ($limit >= 0 && $no_success_count >= $limit) {
            require Carp;
            my $alert = sprintf
                "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
                $self->interval, $err, $no_success_count;
            Carp::confess($alert);
        }
        my $sleep = $no_success_count >= 25 ? 300 : 12 * $no_success_count;
        require Carp;
        Carp::cluck(sprintf
            "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
            scalar(localtime($no_success_time)), $self->interval, $err, $sleep);
        sleep $sleep;
    }

    # Reset the failure bookkeeping after a successful transfer.
    sub un_register_rsync_error {
        my ($self) = @_;
        $no_success_time = 0;
        $no_success_count = 0;
    }
}
1704 =head2 $clone = $obj->_sparse_clone
1706 Clones just as much from itself that it does not hurt. Experimental
1707 method.
1709 Note: what fits better: sparse or shallow? Other suggestions?
1711 =cut
# Fresh object of the same class that carries over only the listed
# configuration attributes; reference values are deep-copied with
# Storable::dclone so the clone shares no data with the original.
sub _sparse_clone {
    my ($self) = @_;
    my $clone = bless {}, ref $self;
    my @copied = qw(
        _interval
        _localroot
        _remoteroot
        _rfile
        _use_tempfile
        aggregator
        dirtymark
        filenameroot
        is_slave
        max_files_per_connection
        protocol
        rsync_options
        serializer_suffix
        sleep_per_connection
        verbose
    );
    for my $accessor (@copied) {
        my $value = $self->$accessor;
        $clone->$accessor(ref $value ? Storable::dclone($value) : $value);
    }
    return $clone;
}
1740 =head2 $boolean = OBJ->ttl_reached ()
1742 =cut
# True (1) once more than ttl seconds (default 24.2) have passed since
# have_mirrored, 0 otherwise.
sub ttl_reached {
    my ($self) = @_;
    my $last_mirrored = $self->have_mirrored || 0;
    my $now = Time::HiRes::time();
    my $ttl = $self->ttl;
    $ttl = 24.2 unless defined $ttl;
    return $now > $last_mirrored + $ttl ? 1 : 0;
}
1756 =head2 (void) $obj->unlock()
1758 Unlocking is implemented with an C<rmdir> on a locking directory
1759 (C<.lock> appended to $rfile).
1761 =cut
# Release our lock by removing the "$rfile.lock" directory; a no-op
# (empty return) when we do not hold the lock.
sub unlock {
    my ($self) = @_;
    $self->_is_locked or return;
    my $lockdir = $self->rfile . ".lock";
    rmdir $lockdir;
    $self->_is_locked(0);
}
1771 =head2 unseed
1773 Sets this recentfile in the state of not 'seeded'.
1775 =cut
# Leave the 'seeded' state.
sub unseed {
    my $self = shift;
    return $self->seeded(0);
}
1781 =head2 $ret = $obj->update ($path, $type)
1783 Enter one file into the local I<recentfile>. $path is the (usually
absolute) path. If the path is outside I<our> tree, it is
ignored.
1787 $type is one of C<new> or C<delete>.
The new file event is unshifted onto the array of recent_events and the
1790 array is shortened to the length of the timespan allowed. This is
1791 usually the timespan specified by the interval of this recentfile but
1792 as long as this recentfile has not been merged to another one, the
1793 timespan may grow without bounds.
1795 =cut
# Return an epoch strictly greater than the newest one in $recent:
# $epoch itself if it already is, otherwise the newest epoch increased a
# bit.
sub _epoch_monotonically_increasing {
    my ($self, $epoch, $recent) = @_;
    # the first one goes unoffended
    return $epoch if !@$recent;
    my $newest = $recent->[0]{epoch};
    return _bigfloatgt("" . $epoch, $newest) ? $epoch : _increase_a_bit($newest);
}
# Record one file event ($type "new" or "delete") for $path in the
# recentfile if $path lies below localroot; paths outside are ignored.
# Returns the value of unlock() after a recorded event.
sub update {
    my($self,$path,$type) = @_;
    die "update called without path argument" unless defined $path;
    die "update called without type argument" unless defined $type;
    # Anchored: the former /(new|delete)/ also accepted "renew" etc.
    die "update called with illegal type argument: $type"
        unless $type =~ /\A(?:new|delete)\z/;
    my $canonmeth = $self->canonize || "naive_path_normalize";
    $path = $self->$canonmeth($path);
    # Compare against localroot without trailing slashes and demand a
    # slash right after it, so that localroot "/a/b" does not claim
    # "/a/bc/x".
    my $lrd = $self->localroot;
    (my $root = defined $lrd ? $lrd : "") =~ s|/+\z||;
    return unless $path =~ s|^\Q$root\E/+||;
    my $secs = $self->interval_secs();
    $self->lock;
    my $ok = eval {
        # you must calculate the time after having locked, of course
        my $epoch = Time::HiRes::time;
        my $recent = $self->recent_events || [];
        $epoch = $self->_epoch_monotonically_increasing($epoch,$recent);
        # as long as we are not merged at all, no limits!
        my $oldest_allowed = 0;
        if (my $merged = $self->merged) {
            $oldest_allowed = min($epoch - $secs, $merged->{epoch});
        }
        pop @$recent while @$recent && $recent->[-1]{epoch} < $oldest_allowed;
        # remove older duplicates of this $path, irrespective of $type:
        $recent = [ grep { $_->{path} ne $path } @$recent ];
        unshift @$recent, { epoch => $epoch, path => $path, type => $type };
        $self->write_recent($recent);
        $self->_assert_symlink;
        1;
    };
    my $err = $@;
    # Release the lock even when writing failed, then re-raise.
    my $ret = $self->unlock;
    die $err unless $ok;
    return $ret;
}
1849 =head2 seed
1851 Sets this recentfile in the state of 'seeded' which means it has to
1852 re-evaluate its uptodateness.
1854 =cut
# Enter the 'seeded' state: uptodateness must be re-evaluated.
sub seed {
    my $self = shift;
    return $self->seeded(1);
}
1860 =head2 seeded
1862 Tells if the recentfile is in the state 'seeded'.
1864 =cut
# Get/set the 'seeded' flag; an unset flag is initialized to 0.
sub seeded {
    my ($self, $set) = @_;
    $self->_seeded($set) if defined $set;
    my $state = $self->_seeded;
    return $state if defined $state;
    $self->_seeded(0);
    return 0;
}
1878 =head2 uptodate
1880 True if this object has mirrored the complete interval covered by the
1881 current recentfile.
1883 *** WIP ***
1885 =cut
# Decide whether the whole interval of the current recentfile has been
# mirrored. The first rule that applies wins: saturated (reached before
# and not seeded since), ttl expired, recentfile changed on disk,
# coverage of the minmax range by done(); otherwise not uptodate. The
# verdict and its reason are passed to _remember_last_uptodate_call.
sub uptodate {
    my ($self) = @_;
    my ($uptodate, $why);
    DECIDE: {
        if ($self->_uptodateness_ever_reached and not $self->seeded) {
            ($uptodate, $why) = (1, "saturated");
            last DECIDE;
        }
        if ($self->ttl_reached) {
            ($uptodate, $why) = (0, "ttl_reached returned true, so we are not uptodate");
            last DECIDE;
        }
        # look if recentfile has unchanged timestamp
        my $minmax = $self->minmax;
        if (exists $minmax->{mtime}) {
            my $rfile = $self->_my_current_rfile;
            my $mtime = (stat $rfile)[9];
            if ($mtime > $minmax->{mtime}) {
                ($uptodate, $why) = (0, "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate");
                last DECIDE;
            }
            my $covered = $self->done->covered(@$minmax{qw(max min)});
            $why = "minmax covered[$covered], so we return that";
            $uptodate = $covered;
            last DECIDE if defined $uptodate;
        }
        ($uptodate, $why) = (0, "fallthrough, so not uptodate");
    }
    if ($uptodate) {
        $self->_uptodateness_ever_reached(1);
        $self->unseed;
    }
    $self->_remember_last_uptodate_call({ uptodate => $uptodate, why => $why });
    return $uptodate;
}
1934 =head2 $obj->write_recent ($recent_files_arrayref)
1936 Writes a I<recentfile> based on the current reflection of the current
1937 state of the tree limited by the current interval.
1939 =cut
# Write the event list through the writer for our protocol (write_N).
sub write_recent {
    my ($self, $recent) = @_;
    defined $recent or die "write_recent called without argument";
    my $writer = sprintf "write_%d", $self->protocol;
    return $self->$writer($recent);
}
1948 =head2 $obj->write_0 ($recent_files_arrayref)
1950 Delegate of C<write_recent()> on protocol 0
1952 =cut
# Protocol 0: dump the bare event list as YAML to "$rfile.new", then
# rename it over the recentfile so readers never see a partial file.
sub write_0 {
    my ($self, $recent) = @_;
    my $rfile = $self->rfile;
    my $staging = "$rfile.new";
    YAML::Syck::DumpFile($staging, $recent);
    rename $staging, $rfile or die "Could not rename to '$rfile': $!";
}
1961 =head2 $obj->write_1 ($recent_files_arrayref)
1963 Delegate of C<write_recent()> on protocol 1
1965 =cut
# Protocol 1: serialize { meta, recent } with the serializer chosen by
# serializer_suffix, write it to "$rfile.new" and atomically rename it
# over the recentfile.
sub write_1 {
    my ($self, $recent) = @_;
    my $rfile   = $self->rfile;
    my $suffix  = $self->serializer_suffix;
    my $payload = { meta => $self->meta_data, recent => $recent };
    my $serialized;
    if ($suffix eq ".yaml") {
        $serialized = YAML::Syck::Dump($payload);
    }
    elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new(serializer => $serializers{$suffix});
        $serialized = $serializer->raw_serialize($payload);
    }
    else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    my $staging = "$rfile.new";
    open my $out, ">", $staging or die "Could not open >'$staging': $!";
    print $out $serialized;
    close $out or die "Could not close '$staging': $!";
    rename $staging, $rfile or die "Could not rename to '$rfile': $!";
}
# Fill %serializers at compile time from the POD that follows: the
# here-document below (terminated by "=cut") is the SERIALIZERS section,
# and each "=item C<< ... >>" line in it holds a "suffix => module" pair
# that is evaluated into the hash.
BEGIN {
    my $pod = <<'=cut'; my @pairs = map { eval } grep { s/^=item\s+C<<(.+)>>$/$1/ } split /\n/, $pod; %serializers = @pairs; }
1995 =head1 THE ARCHITECTURE OF A COLLECTION OF RECENTFILES
1997 The idea is that we want to have a short file that records really
recent changes, so that a fresh mirror can be kept fresh as long as
the connectivity is given. Then we want longer files that record the
2000 history before. So when the mirror falls behind the update period
2001 reflected in the shortest file, it can complement the list of recent
2002 file events with the next one. And if this is not long enough we want
2003 another one, again a bit longer. And we want one that completes the
2004 history back to the oldest file. The index files do contain the
2005 complete list of current files. The larger an index file is the less
2006 often it is updated. For practical reasons adjacent files will often
2007 overlap a bit but this is neither necessary nor enforced. That's the
2008 basic idea. The following example represents a tree that has a few
2009 updates every day:
2011 RECENT.recent -> RECENT-1h.yaml
2012 RECENT-6h.yaml
2013 RECENT-1d.yaml
2014 RECENT-1M.yaml
2015 RECENT-1W.yaml
2016 RECENT-1Q.yaml
2017 RECENT-1Y.yaml
2018 RECENT-Z.yaml
2020 The first file is the principal file, in so far it is the one that is
2021 written first after a filesystem change. Usually a symlink links to it
2022 with a filename that has the same filenameroot and the suffix
2023 C<.recent>. On systems that do not support symlinks there is a plain
2024 copy maintained instead.
2026 The last file, the Z file, contains the complementary files that are
in none of the other files. It never contains C<deletes>. Besides
2028 this it serves the role of a recovery mechanism or spill over pond.
2029 When things go wrong, it's a valuable controlling instance to hold the
2030 differences between the collection of limited interval files and the
2031 actual filesystem.
2033 =head2 A SINGLE RECENTFILE
2035 A I<recentfile> consists of a hash that has two keys: C<meta> and
2036 C<recent>. The C<meta> part has metadata and the C<recent> part has a
2037 list of fileobjects.
2039 =head2 THE META PART
Here we find things that are pretty much self-explanatory: all
2042 lowercase attributes are accessors and as such explained somewhere
2043 above in this manpage. The uppercase attribute C<Producers> contains
2044 version information about involved software components. Nothing to
2045 worry about as I believe.
2047 =head2 THE RECENT PART
2049 This is the interesting part. Every entry refers to some filesystem
2050 change (with path, epoch, type). The epoch value is the point in time
2051 when some change was I<registered>. Do not be tempted to believe that
2052 the entry has a direct relation to something like modification time or
2053 change time on the filesystem level. The timestamp (I<epoch> element)
2054 is a floating point number and does practically never correspond
2055 exactly to the data recorded in the filesystem but rather to the time
2056 when some process succeeded to report to the I<recentfile> mechanism
2057 that something has changed. This is why many parts of the code refer
2058 to I<events>, because we merely try to record the I<event> of the
2059 discovery of a change, not the time of the change itself.
All these entries can be divided into two types (denoted by the
2062 C<type> attribute): C<new>s and C<delete>s. Changes and creations are
2063 C<new>s. Deletes are C<delete>s.
2065 Another distinction is for objects with an epoch timestamp and others
2066 without. All files that were already existing on the filesystem before
2067 the I<recentfile> mechanism was installed, get recorded with a
2068 timestamp of zero.
2070 Besides an C<epoch> and a C<type> attribute we find a third one:
2071 C<path>. This path is relative to the directory we find the
2072 I<recentfile> in.
2074 The order of the entries in the I<recentfile> is by decreasing epoch
2075 attribute. These are either 0 or a unique floating point number. They
2076 are zero for events that were happening either before the time that
2077 the I<recentfile> mechanism was set up or were left undiscovered for a
2078 while and never handed over to update(). They are floating point
2079 numbers for all events being regularly handed to update(). And when
2080 the server has ntp running correctly, then the timestamps are
2081 actually decreasing and unique.
2083 =head1 CORRUPTION AND RECOVERY
2085 If the origin host breaks the promise to deliver consistent and
2086 complete I<recentfiles> then the way back to sanity shall be achieved
2087 through either the C<zloop> (still TBD) or traditional rsyncing
2088 between the hosts. For example, if the origin server forgets to deploy
2089 ntp and the clock on it jumps backwards some day, then this would
2090 probably go unnoticed for a while and many software components that
2091 rely on the time never running backwards will make wrong decisions.
2092 After some time this accident would probably still be found in one of
2093 the I<recentfiles> but would become meaningless as soon as a mirror
2094 has run through the sanitizing procedures. Same goes for origin hosts
2095 that forget to include or deliberately omit some files.
2097 =head1 SERIALIZERS
2099 The following suffixes are supported and trigger the use of these
2100 serializers:
2102 =over 4
2104 =item C<< ".yaml" => "YAML::Syck" >>
2106 =item C<< ".json" => "JSON" >>
2108 =item C<< ".sto" => "Storable" >>
2110 =item C<< ".dd" => "Data::Dumper" >>
2112 =back
2114 =cut
# Fill %seconds at compile time from the POD that follows: the
# here-document below (terminated by "=cut") is the INTERVAL SPEC
# section, and each "=item C<< ... >>" line in it holds a
# "letter => seconds" pair that is evaluated into the hash.
BEGIN {
    my $pod = <<'=cut'; my @pairs = map { eval } grep { s/^=item\s+C<<(.+)>>$/$1/ } split /\n/, $pod; %seconds = @pairs; }
2120 =head1 INTERVAL SPEC
2122 An interval spec is a primitive way to express time spans. Normally it
2123 is composed from an integer and a letter.
As a special case, a string that consists only of the single letter
C<Z> stands for unlimited time.
2128 The following letters express the specified number of seconds:
2130 =over 4
2132 =item C<< s => 1 >>
2134 =item C<< m => 60 >>
2136 =item C<< h => 60*60 >>
2138 =item C<< d => 60*60*24 >>
2140 =item C<< W => 60*60*24*7 >>
2142 =item C<< M => 60*60*24*30 >>
2144 =item C<< Q => 60*60*24*90 >>
2146 =item C<< Y => 60*60*24*365.25 >>
2148 =back
2150 =cut
2152 =head1 BACKGROUND
2154 This is about speeding up rsync operation on large trees to many
2155 places. Uses a small metadata cocktail and pull technology.
2157 =head2 NON-COMPETITORS
2159 File::Mirror JWU/File-Mirror/File-Mirror-0.10.tar.gz only local trees
2160 Mirror::YAML ADAMK/Mirror-YAML-0.03.tar.gz some sort of inner circle
2161 Net::DownloadMirror KNORR/Net-DownloadMirror-0.04.tar.gz FTP sites and stuff
2162 Net::MirrorDir KNORR/Net-MirrorDir-0.05.tar.gz dito
2163 Net::UploadMirror KNORR/Net-UploadMirror-0.06.tar.gz dito
2164 Pushmi::Mirror CLKAO/Pushmi-v1.0.0.tar.gz something SVK
2166 rsnapshot www.rsnapshot.org focus on backup
2167 csync www.csync.org more like unison
2168 multi-rsync sourceforge 167893 lan push to many
2170 =head2 COMPETITORS
2172 The problem to solve which clusters and ftp mirrors and otherwise
2173 replicated datasets like CPAN share: how to transfer only a minimum
2174 amount of data to determine the diff between two hosts.
2176 Normally it takes a long time to determine the diff itself before it
2177 can be transferred. Known solutions at the time of this writing are
2178 csync2, and rsync 3 batch mode.
2180 For many years the best solution was csync2 which solves the
problem by maintaining an SQLite database on both ends and talking a
2182 highly sophisticated protocol to quickly determine which files to send
2183 and which to delete at any given point in time. Csync2 is often
2184 inconvenient because the act of syncing demands quite an intimate
2185 relationship between the sender and the receiver and suffers when the
2186 number of syncing sites is large or connections are unreliable.
2188 Rsync 3 batch mode works around these problems by providing rsync-able
2189 batch files which allow receiving nodes to replay the history of the
2190 other nodes. This reduces the need to have an incestuous relation but
2191 it has the disadvantage that these batch files replicate the contents
2192 of the involved files. This seems inappropriate when the nodes already
2193 have a means of communicating over rsync.
2195 rersyncrecent solves this problem with a couple of (usually 2-10)
2196 index files which cover different overlapping time intervals. The
2197 master writes these files and the clients can construct the full tree
2198 from the information contained in them. The most recent index file
2199 usually covers the last seconds or minutes or hours of the tree and
2200 depending on the needs, slaves can rsync every few seconds and then
2201 bring their trees in full sync.
2203 The rersyncrecent mode was developed for CPAN but I hope it is a
convenient and economical general-purpose solution. I'm looking forward
to seeing a CPAN backbone that is only a few seconds behind PAUSE. And
2206 then ... the first FUSE based CPAN filesystem anyone?
2208 =head1 SEE ALSO
2210 Barbie is providing a database of release dates. See
2211 http://use.perl.org/~barbie/journal/37907
2213 =head1 AUTHOR
2215 Andreas König
2217 =head1 BUGS
2219 Please report any bugs or feature requests through the web interface
2221 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2222 I will be notified, and then you'll automatically be notified of
2223 progress on your bug as I make changes.
2225 =head1 SUPPORT
2227 You can find documentation for this module with the perldoc command.
2229 perldoc File::Rsync::Mirror::Recentfile
2231 You can also look for information at:
2233 =over 4
2235 =item * RT: CPAN's request tracker
2237 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2239 =item * AnnoCPAN: Annotated CPAN documentation
2241 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2243 =item * CPAN Ratings
2245 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2247 =item * Search CPAN
2249 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2251 =back
2254 =head1 ACKNOWLEDGEMENTS
2256 Thanks to RJBS for module-starter.
2258 =head1 COPYRIGHT & LICENSE
2260 Copyright 2008 Andreas König.
2262 This program is free software; you can redistribute it and/or modify it
2263 under the same terms as Perl itself.
2266 =cut
# A module file must end by evaluating to a true value for use/require.
1; # End of File::Rsync::Mirror::Recentfile
2270 # Local Variables:
2271 # mode: cperl
2272 # cperl-indent-level: 4
2273 # End: