calculate a new dirtymark after a dirty_epoch operation
[rersyncrecent.git] / lib / File / Rsync / Mirror / Recentfile.pm
blob8b392a64807da5a738708580b3397cf7cfb7c873
1 package File::Rsync::Mirror::Recentfile;
3 # use warnings;
4 use strict;
6 =encoding utf-8
8 =head1 NAME
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
12 =head1 VERSION
14 Version 0.0.1
16 =cut
# Probe for optional modules. For each package name the result of a string
# "require" is stored in $HAVE, so later code can test $HAVE->{...} before
# using the module (new_from_file() does this for Data::Serializer).
18 my $HAVE = {};
19 for my $package (
20 "Data::Serializer",
21 "File::Rsync"
22 ) {
# a failing require makes the string eval return undef instead of dying
23 $HAVE->{$package} = eval qq{ require $package; };
25 use Config;
26 use File::Basename qw(basename dirname fileparse);
27 use File::Copy qw(cp);
28 use File::Path qw(mkpath);
29 use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
30 use File::Temp;
31 use List::Util qw(first max min);
32 use Scalar::Util qw(reftype);
33 use Storable;
34 use Time::HiRes qw();
35 use YAML::Syck;
37 use version; our $VERSION = qv('0.0.1');
# All bits set, shifted right by one: the largest positive signed native
# integer. interval_secs() returns it for the interval spec "Z".
40 use constant MAX_INT => ~0>>1; # anything better?
# Protocol number the constructors assign when none has been set.
41 use constant DEFAULT_PROTOCOL => 1;
43 # cf. interval_secs
# unit letter => number of seconds; not populated in this part of the file
44 my %seconds;
46 # maybe subclass if this mapping is bad?
# file suffix => serializer name as handed to Data::Serializer; not populated
# in this part of the file
47 my %serializers;
49 =head1 SYNOPSIS
51 B<!!!! PRE-ALPHA ALERT !!!!>
53 Nothing in here is believed to be stable, nothing yet intended for
54 public consumption. The plan is to provide a script in one of the next
55 releases that acts as a frontend for all the backend functionality.
56 Option and method names will very likely change.
58 For the rationale see the section BACKGROUND.
60 This is published only for developers of the (yet to be named)
61 script(s).
63 Writer (of a single file):
65 use File::Rsync::Mirror::Recentfile;
66 my $rf = File::Rsync::Mirror::Recentfile->new
68 interval => q(6h),
69 filenameroot => "RECENT",
70 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
71 localroot => "/home/ftp/pub/PAUSE/authors/",
72 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
74 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
76 Reader/mirrorer:
78 my $rf = File::Rsync::Mirror::Recentfile->new
80 filenameroot => "RECENT",
81 ignore_link_stat_errors => 1,
82 interval => q(6h),
83 localroot => "/home/ftp/pub/PAUSE/authors",
84 remote_dir => "",
85 remote_host => "pause.perl.org",
86 remote_module => "authors",
87 rsync_options => {
88 compress => 1,
89 'rsync-path' => '/usr/bin/rsync',
90 links => 1,
91 times => 1,
92 'omit-dir-times' => 1,
93 checksum => 1,
95 verbose => 1,
97 $rf->mirror;
99 Aggregator (usually the writer):
101 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
102 $rf->aggregate;
104 =head1 EXPORT
106 No exports.
108 =head1 CONSTRUCTORS / DESTRUCTOR
110 =head2 my $obj = CLASS->new(%hash)
112 Constructor. On every argument pair the key is a method name and the
113 value is an argument to that method name.
115 If a recentfile for this resource already exists, metadata that are
116 not defined by the constructor will be fetched from there as soon as
117 it is being read by recent_events().
119 =cut
# Constructor. Takes a flat list of (method name, value) pairs and calls
# each method with its value on the fresh object, in the order given.
# Afterwards protocol, filenameroot and serializer_suffix receive their
# defaults (DEFAULT_PROTOCOL, "RECENT", ".yaml") if they are still undefined.
# Returns the new object.
sub new {
    my $class = shift;
    my @pairs = @_;
    my $self  = bless {}, $class;
    while (my($method, $arg) = splice @pairs, 0, 2) {
        $self->$method($arg);
    }
    $self->protocol(DEFAULT_PROTOCOL)  if !defined $self->protocol;
    $self->filenameroot("RECENT")      if !defined $self->filenameroot;
    $self->serializer_suffix(".yaml")  if !defined $self->serializer_suffix;
    return $self;
}
140 =head2 my $obj = CLASS->new_from_file($file)
142 Constructor. $file is a I<recentfile>.
144 =cut
# Constructor: builds an object from an existing recentfile on disk.
#
#   $file   path to a recentfile, or to a symlink pointing to a recentfile
#           in the same directory
#
# Returns the new object. serializer_suffix and localroot are derived from
# the resolved file name; every all-lowercase key of the file's "meta"
# section is applied by calling the accessor of the same name.
#
# Dies if the file cannot be opened, if a symlink target contains a "/",
# if the file name does not end in a known serializer suffix, or if a
# non-YAML suffix is found while Data::Serializer is not installed.
sub new_from_file {
    my($class, $file) = @_;
    require File::Spec; # used below; not loaded explicitly anywhere else
    my $self = bless {}, $class;
    $self->_rfile($file);
    #?# $self->lock;
    # Three-argument open: with the two-argument form a file name starting
    # with ">" or ending in "|" would be taken as a mode or a command.
    my $serialized = do {
        open my $fh, "<", $file or die "Could not open '$file': $!";
        local $/;
        <$fh>;
    };
    # XXX: we can skip this step when the metadata are sufficient, but
    # we cannot parse the file without some magic stuff about
    # serialized formats
    while (-l $file) {
        my(undef, $path) = fileparse $file;
        my $symlink = readlink $file;
        if ($symlink =~ m|/|) {
            die "FIXME: filenames containing '/' not supported, got $symlink";
        }
        $file = File::Spec->catfile ( $path, $symlink );
    }
    # fileparse() treats its suffix arguments as regular expressions; quote
    # them so that the "." of ".yaml" only matches a literal dot and the
    # returned suffix is always a real key of %serializers.
    my($name, $path, $suffix)
        = fileparse($file, map { quotemeta $_ } keys %serializers);
    $self->serializer_suffix($suffix);
    $self->localroot($path);
    die "Could not determine file format from suffix" unless $suffix;
    my $deserialized;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        $deserialized = YAML::Syck::LoadFile($file);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $deserialized = $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    while (my($k,$v) = each %{$deserialized->{meta}}) {
        next if $k ne lc $k; # "Producers"
        $self->$k($v);
    }
    unless (defined $self->protocol) {
        $self->protocol(DEFAULT_PROTOCOL);
    }
    return $self;
}
191 =head2 DESTROY
193 A simple unlock.
195 =cut
# Destructor: calls unlock() on the object that is going away.
sub DESTROY {
    my($self) = @_;
    $self->unlock;
}
198 =head1 ACCESSORS
200 =cut
# Names of all accessor methods. The list is filled inside BEGIN, i.e. at
# compile time, so that it is already populated when the later
# "use accessors @accessors" statement is compiled.
202 my @accessors;
204 BEGIN {
# accessors with a leading underscore; they have no POD entry below
205 @accessors = (
206 "_current_tempfile",
207 "_current_tempfile_fh",
208 "_delayed_operations",
209 "_done",
210 "_interval",
211 "_is_locked",
212 "_localroot",
213 "_merged",
214 "_pathdb",
215 "_remember_last_uptodate_call",
216 "_remote_dir",
217 "_remoteroot",
218 "_rfile",
219 "_rsync",
220 "_seeded",
221 "_uptodateness_ever_reached",
222 "_use_tempfile",
# The here-document started below is terminated by the next "=cut" line, so
# its content is the POD item list that follows. Every line of the form
# "=item NAME" is reduced to NAME by the substitution and appended to
# @accessors; all other lines are dropped by the grep. Adding or renaming
# an "=item" in that POD therefore adds or renames an accessor.
225 my @pod_lines =
226 split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
228 =over 4
230 =item aggregator
232 A list of interval specs that tell the aggregator which I<recentfile>s
233 are to be produced.
235 =item canonize
237 The name of a method to canonize the path before rsyncing. Only
238 supported value is C<naive_path_normalize>. Defaults to that.
240 =item comment
242 A comment about this tree and setup.
244 =item dirtymark
246 A timestamp. The dirtymark is updated whenever an out of band change
247 on the origin server is performed that violates the protocol. Say,
248 they add or remove files in the middle somewhere. Slaves must react
249 with a devaluation of their C<done> structure which then leads to a
250 full re-sync of all files.
252 =item filenameroot
254 The (prefix of the) filename we use for this I<recentfile>. Defaults to
255 C<RECENT>.
257 =item have_mirrored
259 Timestamp remembering when we mirrored this recentfile the last time.
260 Only relevant for slaves.
262 =item ignore_link_stat_errors
264 If set to true, rsync errors are ignored that complain about link stat
265 errors. These seem to happen only when there are files missing at the
266 origin. In race conditions this can always happen, so it is
267 recommended to set this value to true.
269 =item is_slave
271 If set to true, this object will fetch a new recentfile from remote
272 when the timespan between the last mirror (see have_mirrored) and now
273 is too large (currently hardcoded arbitrary 420 seconds).
275 =item locktimeout
277 After how many seconds shall we die if we cannot lock a I<recentfile>?
278 Defaults to 600 seconds.
280 =item loopinterval
282 When mirror_loop is called, this accessor can specify how much time
283 every loop shall at least take. If the work of a loop is done before
284 that time has gone, sleeps for the rest of the time. Defaults to
285 arbitrary 42 seconds.
287 =item max_files_per_connection
289 Maximum number of files that are transferred on a single rsync call.
290 Setting it higher means higher performance at the price of holding
291 connections longer and potentially disturbing other users in the pool.
292 Defaults to the arbitrary value 42.
294 =item max_rsync_errors
296 When rsync operations encounter that many errors without any resetting
297 success in between, then we die. Defaults to unlimited. A value of
298 -1 means we run forever ignoring all rsync errors.
300 =item minmax
302 Hashref remembering when we read the recent_events from this file the
303 last time and what the timespan was.
305 =item protocol
307 When the RECENT file format changes, we increment the protocol. We try
308 to support older protocols in later releases.
310 =item remote_host
312 The host we are mirroring from. Leave empty for the local filesystem.
314 =item remote_module
316 Rsync servers have so called modules to separate directory trees from
317 each other. Put here the name of the module under which we are
318 mirroring. Leave empty for local filesystem.
320 =item rsync_options
322 Things like compress, links, times or checksums. Passed in to the
323 File::Rsync object used to run the mirror.
325 =item serializer_suffix
327 Mostly untested accessor. The only well tested format for
328 I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
329 Data::Serializer. But in principle other formats are supported as
330 well. See section SERIALIZERS below.
332 =item sleep_per_connection
334 Sleep that many seconds (floating point OK) after every chunk of rsyncing
335 has finished. Defaults to arbitrary 0.42.
337 =item ttl
339 Time to live. Number of seconds after which this recentfile must be
340 fetched again from the origin server. Only relevant for slaves.
341 Defaults to arbitrary 24.2 seconds.
343 =item verbose
345 Boolean to turn on a bit of verbosity.
347 =back
349 =cut
351 use accessors @accessors;
353 =head1 METHODS
355 =head2 (void) $obj->aggregate
357 Takes all intervals that are collected in the accessor called
358 aggregator. Sorts them by actual length of the interval.
359 Removes those that are shorter than our own interval. Then merges this
360 object into the next larger object. The merging continues upwards
361 as long as the next I<recentfile> is old enough to warrant a merge.
363 Whether a merge is warranted is decided according to the length of the
364 previous (next smaller) interval, so that larger files are updated less
365 often than smaller ones.
367 Here is an example to illustrate the behaviour. Given aggregators
369 1h 1d 1W 1M 1Q 1Y Z
371 then
373 1h updates 1d on every call to aggregate()
374 1d updates 1W earliest after 1h
375 1W updates 1M earliest after 1d
376 1M updates 1Q earliest after 1W
377 1Q updates 1Y earliest after 1M
378 1Y updates Z earliest after 1Q
380 Note that all but the smallest recentfile get updated at an arbitrary
381 rate and as such are quite useless on their own.
383 =cut
# Merges this recentfile upwards through the chain of aggregator intervals
# and stops at the first level that is not yet due for a merge.
# Takes no arguments besides the object; the return value is not meaningful.
385 sub aggregate {
386 my($self) = @_;
# Build one record {interval, secs} for our own interval and for every
# aggregator interval, keep those that are at least as long as our own
# interval, and sort them ascending by length in seconds.
387 my @aggs = sort { $a->{secs} <=> $b->{secs} }
388 grep { $_->{secs} >= $self->interval_secs }
389 map { { interval => $_, secs => $self->interval_secs($_)} }
390 $self->interval, @{$self->aggregator || []};
# the shortest record is represented by this object itself
391 $aggs[0]{object} = $self;
392 AGGREGATOR: for my $i (0..$#aggs-1) {
393 my $this = $aggs[$i]{object};
# the object for the next larger interval starts as a sparse clone of this one
394 my $next = $this->_sparse_clone;
395 $next->interval($aggs[$i+1]{interval});
396 my $want_merge = 0;
397 if ($i == 0) {
# the smallest level is always merged into its successor
398 $want_merge = 1;
399 } else {
400 my $next_rfile = $next->rfile;
401 if (-e $next_rfile) {
402 my $prev = $aggs[$i-1]{object};
# -M measures against $^T (program start); set it to now for this scope so
# that the age computed below is the file's current age in seconds
403 local $^T = time;
404 my $next_age = 86400 * -M $next_rfile;
# merge only when the target file is older than the interval of the level
# below the current one
405 if ($next_age > $prev->interval_secs) {
406 $want_merge = 1;
408 } else {
# the target recentfile does not exist yet
409 $want_merge = 1;
412 if ($want_merge) {
413 $next->merge($this);
# remember the merged object; it is the source of the next iteration
414 $aggs[$i+1]{object} = $next;
415 } else {
# nothing merged at this level, so no larger level is considered either
416 last AGGREGATOR;
# Debugging helper: reports file size and mtime for every recentfile that
# belongs to this aggregate (own interval plus all aggregator intervals,
# ordered by interval length). Returns an array reference of hashes with
# the keys rfile, size and mtime; size and mtime are undef when the file
# cannot be stat'ed.
sub _debug_aggregate {
    my($self) = @_;
    my @specs = ($self->interval, @{ $self->aggregator || [] });
    my @by_length =
        sort { $a->{secs} <=> $b->{secs} }
        map  { +{ interval => $_, secs => $self->interval_secs($_) } }
        @specs;
    my @report;
    for my $agg (@by_length) {
        # work on a deep copy so that the interval of $self stays untouched
        my $clone = Storable::dclone($self);
        $clone->interval($agg->{interval});
        my $rfile = $clone->rfile;
        my($size, $mtime) = (stat $rfile)[7, 9];
        push @report, { rfile => $rfile, size => $size, mtime => $mtime };
    }
    return \@report;
}
# (void) $self->_assert_symlink()
#
# Makes sure that "<filenameroot>.recent" inside localroot refers to this
# object's recentfile.
#
# On systems with symlink support the entry is a symlink whose target is
# the bare file name (rfilename), i.e. relative to the directory of the
# link. If the link already has that target nothing is done. If something
# else already exists under that name, the new symlink is created under a
# temporary name ("<name>.<pid>") and renamed over it.
#
# Without symlink support the recentfile is copied instead, again via the
# temporary name. The source of the copy is the full path (rfile): the bare
# rfilename would be resolved against the current working directory.
#
# Dies if the symlink, the copy or the rename fails.
sub _assert_symlink {
    my($self) = @_;
    require File::Spec;
    my $recentrecentfile = File::Spec->catfile(
        $self->localroot,
        sprintf("%s.recent", $self->filenameroot),
    );
    my $tmpname = "$recentrecentfile.$$";
    # d_symlink may be undef on platforms without symlinks
    if (($Config{d_symlink} || "") eq "define") {
        if (-l $recentrecentfile) {
            my $found_symlink = readlink $recentrecentfile;
            return if defined $found_symlink
                && $found_symlink eq $self->rfilename;
        }
        if (-l $recentrecentfile or -e $recentrecentfile) {
            # Something is in the way (a stale symlink, or a regular file
            # left by an earlier copy): replace it via rename.
            unlink $tmpname; # may fail
            symlink($self->rfilename, $tmpname)
                or die "Could not create symlink '$tmpname': $!";
            rename($tmpname, $recentrecentfile)
                or die "Could not rename '$tmpname' to $recentrecentfile: $!";
        } else {
            symlink($self->rfilename, $recentrecentfile)
                or die "Could not create symlink '$recentrecentfile': $!";
        }
    } else {
        warn "Warning: symlinks not supported on this system, doing a copy instead\n";
        unlink $tmpname; # may fail
        cp($self->rfile, $tmpname)
            or die "Could not copy to '$tmpname': $!";
        rename($tmpname, $recentrecentfile)
            or die "Could not rename '$tmpname' to $recentrecentfile: $!";
    }
    return;
}
477 =head2 $hashref = $obj->delayed_operations
479 A hash of hashes containing unlink and rmdir operations which had to
480 wait until the recentfile got unhidden in order to not confuse
481 downstream mirrors (in case we have some).
483 =cut
# Returns the hash of delayed operations, creating and storing an empty
# structure { unlink => {}, rmdir => {} } on first use.
sub delayed_operations {
    my($self) = @_;
    my $ops = $self->_delayed_operations;
    return $ops if defined $ops;
    $ops = { unlink => {}, rmdir => {} };
    $self->_delayed_operations($ops);
    return $ops;
}
498 =head2 $done = $obj->done
500 $done is a reference to a File::Rsync::Mirror::Recentfile::Done object
501 that keeps track of rsync activities. Only needed and used when we are
502 a mirroring slave.
504 =cut
# Returns the File::Rsync::Mirror::Recentfile::Done object of this
# recentfile. On first use the class is loaded, an object is created, its
# _rfinterval is set to our interval, and the object is stored.
sub done {
    my($self) = @_;
    my $existing = $self->_done;
    return $existing if $existing;
    require File::Rsync::Mirror::Recentfile::Done;
    my $fresh = File::Rsync::Mirror::Recentfile::Done->new();
    $fresh->_rfinterval($self->interval);
    $self->_done($fresh);
    return $fresh;
}
518 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
520 Stores the remote I<recentfile> locally as a tempfile. The caller is
521 responsible to remove the file after use.
523 Note: if you intend to act as an rsync server for other slaves, you
524 must use this method instead of fetching that file with
525 get_remotefile(). Otherwise downstream mirrors would expect you to
526 already have mirrored all the files that are in the I<recentfile>
527 before you have them mirrored.
529 =cut
# Fetches the remote recentfile via rsync into a temporary file below
# localroot and returns the name of that temporary file.
# While _use_tempfile is set and the ttl has not been reached, the name of
# the current tempfile is returned without contacting the remote side.
# Dies if copying the local recentfile or the final chmod fails.
531 sub get_remote_recentfile_as_tempfile {
532 my($self) = @_;
533 mkpath $self->localroot;
534 my $fh;
535 my $trfilename;
536 if ( $self->_use_tempfile() ) {
# still fresh: reuse the tempfile from the previous call
537 return $self->_current_tempfile if ! $self->ttl_reached;
538 $fh = $self->_current_tempfile_fh;
539 $trfilename = $self->rfilename;
540 } else {
541 $trfilename = $self->rfilename;
544 my $dst;
545 if ($fh) {
# a tempfile object is already held: rsync into that same file again
546 $dst = $self->_current_tempfile;
547 } else {
548 $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
549 $dst = $fh->filename;
550 $self->_current_tempfile ($dst);
551 my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
552 if (defined $rfile && -e $rfile) {
553 # saving on bandwidth. Might need to be configurable
554 # $self->bandwidth_is_cheap?
# seed the tempfile with the local copy so rsync only has to send changes
555 cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
558 my $src = join ("/",
559 $self->remoteroot,
560 $trfilename,
562 if ($self->verbose) {
563 my $doing = -e $dst ? "Sync" : "Get";
# shortened display form: ".../<parent dir>/<file name>"
564 my $display_dst = join "/", "...", basename(dirname($dst)), basename($dst);
565 printf STDERR
567 "%-4s %d (1/1/%s) temp %s ... ",
568 $doing,
569 time,
570 $self->interval,
571 $display_dst,
574 my $gaveup = 0;
575 my $retried = 0;
# try the transfer until it succeeds, but give up after three failures
576 while (!$self->rsync->exec(
577 src => $src,
578 dst => $dst,
579 )) {
580 $self->register_rsync_error ($self->rsync->err);
581 if (++$retried >= 3) {
582 warn "XXX giving up";
583 $gaveup = 1;
584 last;
587 if ($gaveup) {
# NOTE(review): this message has no trailing newline, and the code below
# still prints "DONE" (when verbose), chmods and returns $dst after giving up
588 printf STDERR "Warning: gave up mirroring %s, will try again later", $self->interval;
589 } else {
590 $self->_refresh_internals ($dst);
591 $self->have_mirrored (Time::HiRes::time);
592 $self->un_register_rsync_error ();
594 if ($self->verbose) {
595 print STDERR "DONE\n";
# NOTE(review): $mode interpolates as decimal 420 in the error message below
597 my $mode = 0644;
598 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
599 return $dst;
# Creates the File::Temp object that receives the remote recentfile.
# The file lives in localroot, is named ".FRMRecent-<trfilename>-XXXX" plus
# the serializer suffix, and is unlinked on destruction only when
# _use_tempfile is set. In that case the object is also stored in
# _current_tempfile_fh so that it outlives this call.
# Returns the File::Temp object.
sub _get_remote_rat_provide_tempfile_object {
    my($self, $trfilename) = @_;
    my $template = sprintf ".FRMRecent-%s-XXXX", $trfilename;
    my $tmp = File::Temp->new(
        TEMPLATE => $template,
        DIR      => $self->localroot,
        SUFFIX   => $self->serializer_suffix,
        UNLINK   => $self->_use_tempfile,
    );
    # keep a reference around: delays the self destruction of the tempfile
    $self->_current_tempfile_fh($tmp) if $self->_use_tempfile;
    return $tmp;
}
618 =head2 $localpath = $obj->get_remotefile ( $relative_path )
620 Rsyncs one single remote file to local filesystem.
622 Note: no locking is done on this file. Any number of processes may
623 mirror this object.
625 Note II: do not use for recentfiles. If you are a cascading
626 slave/server combination, it would confuse other slaves. They would
627 expect the contents of these recentfiles to be available. Use
628 get_remote_recentfile_as_tempfile() instead.
630 =cut
# Rsyncs the single remote file $path (relative to remoteroot) to the same
# relative location below localroot, creating the local directory first.
# Failed rsync runs are passed to register_rsync_error and retried until
# one succeeds. Returns the local path of the file.
sub get_remotefile {
    my($self, $path) = @_;
    my $dst = File::Spec->catfile($self->localroot, $path);
    mkpath(dirname($dst));
    if ($self->verbose) {
        printf STDERR "%-4s %d (1/1/%s) %s ... ",
            (-e $dst ? "Sync" : "Get"),
            time,
            $self->interval,
            $path;
    }
    until ($self->rsync->exec(
               src => join("/", $self->remoteroot, $path),
               dst => $dst,
           )) {
        $self->register_rsync_error($self->rsync->err);
    }
    $self->un_register_rsync_error();
    print STDERR "DONE\n" if $self->verbose;
    return $dst;
}
662 =head2 $obj->interval ( $interval_spec )
664 Get/set accessor. $interval_spec is a string and described below in
665 the section INTERVAL SPEC.
667 =cut
# Get/set accessor for the interval spec. Setting a value (even undef)
# also resets the cached _rfile. Confesses if no interval is defined at
# the end of the call. Returns the interval spec.
sub interval {
    my $self = shift;
    if (@_) {
        my($new_interval) = @_;
        $self->_interval($new_interval);
        $self->_rfile(undef);
    }
    my $current = $self->_interval;
    return $current if defined $current;
    # do not ask the $self too much, it recurses!
    require Carp;
    Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
}
684 =head2 $secs = $obj->interval_secs ( $interval_spec )
686 $interval_spec is described below in the section INTERVAL SPEC. If
687 empty defaults to the inherent interval for this object.
689 =cut
# Converts an interval spec into a number of seconds.
#   $interval   interval spec; a false value selects the object's own interval
# Returns MAX_INT for the spec "Z", otherwise count times the length of the
# unit as found in %seconds. Dies if the spec cannot be parsed or has no
# count or an unknown unit.
sub interval_secs {
    my($self, $interval) = @_;
    $interval = $self->interval unless $interval;
    die "interval_secs() called without argument on an object without a declared one"
        unless defined $interval;
    my($n, $t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/;
    # the unit group is mandatory, so it is undefined only if nothing matched
    die "Could not determine seconds from interval[$interval]"
        unless defined $t;
    return MAX_INT if $interval eq "Z";
    return $seconds{$t} * $n
        if exists $seconds{$t} and $n =~ /^\d+$/;
    die "Invalid interval specification: n[$n]t[$t]";
}
708 =head2 $obj->localroot ( $localroot )
710 Get/set accessor. The local root of the tree.
712 =cut
# Get/set accessor for the local root of the tree. Setting a value (even
# undef) also resets the cached _rfile. Returns the current local root.
sub localroot {
    my $self = shift;
    if (@_) {
        my($new_root) = @_;
        $self->_localroot($new_root);
        $self->_rfile(undef);
    }
    my $current = $self->_localroot;
    return $current;
}
723 =head2 $ret = $obj->local_path($path_found_in_recentfile)
725 Combines the path to our local mirror and the path of an object found
726 in this I<recentfile>. In other words: the target of a mirror operation.
728 Implementation note: We split on slashes and then use
729 File::Spec::catfile to adjust to the local operating system.
731 =cut
# Maps a path as found in a recentfile (slash separated) to the matching
# path below localroot, in the notation of the local operating system.
# Without a defined $path (seems like a degenerated case) localroot itself
# is returned.
sub local_path {
    my($self, $path) = @_;
    return $self->localroot unless defined $path;
    my @segments = split m|/|, $path;
    return File::Spec->catfile($self->localroot, @segments);
}
743 =head2 (void) $obj->lock
745 Locking is implemented with an C<mkdir> on a locking directory
746 (C<.lock> appended to $rfile).
748 =cut
# Acquires the lock for this recentfile by creating the directory
# "<rfile>.lock". Does nothing if this object already holds the lock.
# Retries every 0.01 seconds; dies once more than locktimeout seconds
# (600 if unset or false) have passed. Marks the object as locked on success.
sub lock {
    my($self) = @_;
    # not using flock because it locks on filehandles instead of
    # old school resources.
    return if $self->_is_locked;
    my $lockdir = $self->rfile . ".lock";
    # XXX need a way to allow breaking the lock
    my $started = time;
    my $timeout = $self->locktimeout || 600;
    until (mkdir $lockdir) {
        Time::HiRes::sleep 0.01;
        next unless time - $started > $timeout;
        die "Could not acquire lockdirectory '$lockdir': $!";
    }
    $self->_is_locked(1);
}
768 =head2 (void) $obj->merge ($other)
770 Bulk update of this object with another one. It's used to merge a
771 smaller and younger $other object into the current one. If this file
772 is a C<Z> file, then we do not merge in objects of type C<delete>. But
773 if we encounter an object of type delete we delete the corresponding
774 C<new> object if we have it.
776 If there is nothing to be merged, nothing is done.
778 =cut
# Merges the events of the smaller, younger recentfile $other into this one.
# Both objects are locked for the duration (first $other, then $self) and
# unlocked at the end. Files are only written when something changed.
# Dies (in _merge_sanitycheck) if $other does not have a shorter interval.
780 sub merge {
781 my($self, $other) = @_;
782 $self->_merge_sanitycheck ( $other );
783 $other->lock;
784 my $other_recent = $other->recent_events || [];
785 $self->lock;
786 my $my_recent = $self->recent_events || [];
788 # calculate the target time span
# $myepoch: epoch of our first event (undef if we have none);
# $epoch: epoch of the first event of $other, falling back to $myepoch
789 my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
790 my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
# events older than this epoch are dropped; 0 means no limit
791 my $oldest_allowed = 0;
792 my $something_done;
793 unless ($my_recent->[0]) {
794 # obstetrics
# we have no events at all yet, so the file has to be written in any case
795 $something_done=1;
797 if ($epoch) {
798 if (my $merged = $self->merged) {
799 my $secs = $self->interval_secs();
# keep everything inside our own interval and everything that has not yet
# been merged further upwards
800 $oldest_allowed = min($epoch - $secs, $merged->{epoch});
# remove events from the end of our list while they are older than the limit
802 while (@$my_recent && _bigfloatlt($my_recent->[-1]{epoch}, $oldest_allowed)) {
803 pop @$my_recent;
804 $something_done=1;
# paths already seen; only the first event per path is taken over
808 my %have;
809 my $recent = [];
810 for my $oev (@$other_recent) {
811 my $oevepoch = $oev->{epoch} || 0;
812 next if _bigfloatlt($oevepoch, $oldest_allowed);
813 my $path = $oev->{path};
814 next if $have{$path}++;
815 if ( $self->interval eq "Z"
816 and $oev->{type} eq "delete") {
817 # do nothing
818 } else {
# an event newer than our newest one (or any event if we have none) means
# that there is something to write
819 if (!$myepoch || _bigfloatgt($oevepoch, $myepoch)) {
820 $something_done=1;
822 push @$recent, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
825 if ($something_done) {
826 $self->_merge_something_done ($recent, $my_recent, $other_recent, $other, \%have, $epoch);
828 $self->unlock;
829 $other->unlock;
# Second half of merge(): completes the merged event list and writes both
# recentfiles.
#   $recent        events taken over from $other so far (gets extended)
#   $my_recent     our own events; those whose path is not yet in $have
#                  are appended to $recent
#   $other_recent  events of $other, written back unchanged
#   $other         the object that was merged into us
#   $have          hashref counting the paths already present in $recent
#   $epoch         accepted for the caller's convenience, not used here
# Takes over the dirtymark of $other if it is the greater one and records
# in $other into which interval and up to which epoch it was merged.
sub _merge_something_done {
    my($self, $recent, $my_recent, $other_recent, $other, $have, $epoch) = @_;
    for my $event (@$my_recent) {
        next if $have->{ $event->{path} }++;
        push @$recent, $event;
    }
    $self->dirtymark($other->dirtymark)
        if _bigfloatgt($other->dirtymark, $self->dirtymark);
    $self->write_recent($recent);
    my %merge_info = (
        time          => Time::HiRes::time, # not used anywhere
        epoch         => $recent->[0]{epoch},
        into_interval => $self->interval,   # not used anywhere
    );
    $other->merged(\%merge_info);
    $other->write_recent($other_recent);
}
# Guards merge(): $other is about to be merged INTO $self, so $other must
# cover a strictly shorter interval than $self. Dies otherwise.
#
# The message names the interval of $other first ("bigger") and that of
# $self second ("smaller"); the previous argument order printed the two
# numbers the wrong way round.
sub _merge_sanitycheck {
    my($self, $other) = @_;
    if ($self->interval_secs <= $other->interval_secs) {
        die sprintf
            (
             "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
             $other->interval_secs,
             $self->interval_secs,
            );
    }
}
859 =head2 merged
861 Hashref denoting when this recentfile has been merged into some other
862 at which epoch.
864 =cut
# Get/set accessor for the hashref that records into which interval and up
# to which epoch this recentfile has been merged. A defined argument is
# stored first. If the stored hash has an into_interval and our own
# interval is defined, a warning (with stack trace) is issued when
# into_interval equals our interval or is shorter than it.
# Returns the stored value.
sub merged {
    my($self, $set) = @_;
    $self->_merged($set) if defined $set;
    my $merged = $self->_merged;
    return $merged unless $merged;
    my $into = $merged->{into_interval};
    return $merged unless $into and defined $self->_interval;
    if ($into eq $self->interval) {
        require Carp;
        Carp::cluck(
            sprintf "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
                $into,
                $self->interval
        );
    } elsif ($self->interval_secs($into) < $self->interval_secs) {
        require Carp;
        Carp::cluck(
            sprintf "Warning: into_interval[%s] smaller than own interval[%s] on interval[%s]. Danger ahead.",
                $self->interval_secs($into),
                $self->interval_secs,
                $self->interval
        );
    }
    return $merged;
}
896 =head2 $hashref = $obj->meta_data
898 Returns the hashref of metadata that the server has to add to the
899 I<recentfile>.
901 =cut
# Returns the hashref of metadata for the recentfile. Starting from the
# hash stored under the object's "meta" key, the defined values of the
# listed accessors are copied in. Producers (package, version, $0, current
# time) and dirtymark (current time) are filled in only when they are
# missing or false.
sub meta_data {
    my($self) = @_;
    my $ret = $self->{meta};
    my @fields = qw(
        aggregator
        canonize
        comment
        dirtymark
        filenameroot
        merged
        interval
        protocol
        serializer_suffix
    );
    for my $field (@fields) {
        my $value = $self->$field;
        next unless defined $value;
        $ret->{$field} = $value;
    }
    # XXX need to reset the Producer if I am a writer, keep it when I
    # am a reader
    unless ($ret->{Producers}) {
        $ret->{Producers} = {
            __PACKAGE__, "$VERSION", # stringified it looks better
            '$0', $0,
            'time', Time::HiRes::time,
        };
    }
    unless ($ret->{dirtymark}) {
        $ret->{dirtymark} = Time::HiRes::time;
    }
    return $ret;
}
933 =head2 $success = $obj->mirror ( %options )
935 Mirrors the files in this I<recentfile> as reported by
936 C<recent_events>. Options named C<after>, C<before>, C<max>, and
937 C<skip-deletes> are passed through to the L<recent_events> call. The
938 boolean option C<piecemeal>, if true, causes C<mirror> to only rsync
939 C<max_files_per_connection> and keep track of the rsynced files so
940 that future calls will rsync different files until all files are
941 brought to sync.
943 =cut
# Mirrors all files listed in the remote recentfile.
#   %options   before, after, max and skip-deletes are handed on to
#              recent_events(); the whole hash is handed on to _mirror_item()
# Returns true if no error was collected, false otherwise; returns the
# empty list / undef early when an item sets $status->{mustreturn}.
945 sub mirror {
946 my($self, %options) = @_;
# fetch the remote recentfile into a tempfile and keep working on that
# tempfile until it is unhidden at the end
947 my $trecentfile = $self->get_remote_recentfile_as_tempfile();
948 $self->_use_tempfile (1);
949 my %passthrough = map { ($_ => $options{$_}) } qw(before after max skip-deletes);
950 my ($recent_events) = $self->recent_events(%passthrough);
# @error collects error messages; @xcollector collects the events that are
# waiting for the next rsync call
951 my(@error, @xcollector);
952 my $first_item = 0;
953 my $last_item = $#$recent_events;
954 my $done = $self->done;
955 my $pathdb = $self->_pathdb;
956 ITEM: for my $i ($first_item..$last_item) {
# per-item scratch hash; _mirror_item_new() may set mustreturn in it
957 my $status = +{};
958 $self->_mirror_item
961 $recent_events,
962 $last_item,
963 $done,
964 $pathdb,
965 \@xcollector,
966 \%options,
967 $status,
968 \@error,
# NOTE(review): on the last item the loop is left before mustreturn is
# looked at, so the cleanup below runs in that case
970 last if $i == $last_item;
971 return if $status->{mustreturn};
# transfer whatever is still waiting in the collector
973 if (@xcollector) {
974 my $success = eval { $self->_mirror_empty_xcollector (\@xcollector,$pathdb,$recent_events);};
975 if (!$success || $@) {
976 warn "Warning: Unknown error while mirroring: $@";
977 push @error, $@;
978 sleep 1;
981 if ($self->verbose) {
982 print STDERR "DONE\n";
984 # once we've gone to the end we consider ourselves free of obligations
985 $self->unseed;
# move the tempfile to its final name, then run the postponed unlink and
# rmdir operations
986 $self->_mirror_unhide_tempfile ($trecentfile);
987 $self->_mirror_perform_delayed_ops;
988 return !@error;
# Handles one event of the recentfile on behalf of mirror().
# The parameters are those passed by mirror(); the body refers to $i (the
# index of the event in $recent_events), whose line in the parameter list
# is missing from this listing.
# "new" events are handed to _mirror_item_new(); "delete" events are
# recorded as delayed unlink/rmdir operations; any other type only
# produces a warning.
991 sub _mirror_item {
992 my($self,
994 $recent_events,
995 $last_item,
996 $done,
997 $pathdb,
998 $xcollector,
999 $options,
1000 $status,
1001 $error,
1002 ) = @_;
1003 my $recent_event = $recent_events->[$i];
# nothing to do if the done structure already covers this epoch
1004 return if $done->covered ( $recent_event->{epoch} );
1005 if ($pathdb) {
1006 my $rec = $pathdb->{$recent_event->{path}};
1007 if ($rec && $rec->{recentepoch}) {
# the path database holds a greater epoch for this path than this event
# has: just register the event as done
1008 if (_bigfloatgt
1009 ( $rec->{recentepoch}, $recent_event->{epoch} )){
1010 $done->register ($recent_events, [$i]);
1011 return;
1015 my $dst = $self->local_path($recent_event->{path});
1016 if ($recent_event->{type} eq "new"){
1017 $self->_mirror_item_new
1019 $dst,
1021 $last_item,
1022 $recent_events,
1023 $recent_event,
1024 $xcollector,
1025 $pathdb,
1026 $status,
1027 $error,
1028 $options,
1030 } elsif ($recent_event->{type} eq "delete") {
# $activity becomes part of the key written to the path database
# ("<activity>_on", see _mirror_register_path)
1031 my $activity;
1032 if ($options->{'skip-deletes'}) {
1033 $activity = "skipped";
1034 } else {
1035 if (! -e $dst) {
1036 $activity = "not_found";
# symlinks and everything that is not a directory are unlinked later ...
1037 } elsif (-l $dst or not -d _) {
1038 $self->delayed_operations->{unlink}{$dst}++;
1039 $activity = "deleted";
1040 } else {
# ... directories are removed later with rmdir
1041 $self->delayed_operations->{rmdir}{$dst}++;
1042 $activity = "deleted";
1045 $done->register ($recent_events, [$i]);
1046 if ($pathdb) {
1047 $self->_mirror_register_path($pathdb,[$recent_event],$activity);
1049 } else {
1050 warn "Warning: invalid upload type '$recent_event->{type}'";
# Handles one event of type "new": the event is queued in $xcollector and
# the queue is transferred with one rsync call as soon as it holds
# max_files_per_connection (default 42) entries.
# The parameters are those passed by _mirror_item(); the body refers to $i
# (the index of the event), whose line in the parameter list is missing
# from this listing.
1054 sub _mirror_item_new {
1055 my($self,
1056 $dst,
1058 $last_item,
1059 $recent_events,
1060 $recent_event,
1061 $xcollector,
1062 $pathdb,
1063 $status,
1064 $error,
1065 $options,
1066 ) = @_;
1067 if ($self->verbose) {
1068 my $doing = -e $dst ? "Sync" : "Get";
1069 printf STDERR
1071 "%-4s %d (%d/%d/%s) %s ... ",
1072 $doing,
1073 time,
1074 1+$i,
1075 1+$last_item,
1076 $self->interval,
1077 $recent_event->{path},
1080 my $max_files_per_connection = $self->max_files_per_connection || 42;
1081 my $success;
1082 if ($self->verbose) {
1083 print STDERR "\n";
# queue the event together with its index in $recent_events
1085 push @$xcollector, { rev => $recent_event, i => $i };
1086 if (@$xcollector >= $max_files_per_connection) {
# the queue is full: transfer it, then pause for sleep_per_connection
# seconds (0.42 if undefined)
1087 $success = eval {$self->_mirror_empty_xcollector ($xcollector,$pathdb,$recent_events);};
1088 my $sleep = $self->sleep_per_connection;
1089 $sleep = 0.42 unless defined $sleep;
1090 Time::HiRes::sleep $sleep;
1091 if ($options->{piecemeal}) {
# piecemeal mode: tell mirror() to return after this batch
# NOTE(review): this return skips the error check below, so a failure of
# the batch just transferred is not recorded in piecemeal mode
1092 $status->{mustreturn} = 1;
1093 return;
1095 } else {
# queue not full yet: nothing was transferred, nothing to check
1096 return;
1098 if (!$success || $@) {
1099 warn "Warning: Error while mirroring: $@";
1100 push @$error, $@;
1101 sleep 1;
1103 if ($self->verbose) {
1104 print STDERR "DONE\n";
# Transfers all queued events with a single mirror_path() call, records
# them in the path database (if there is one) with the activity "rsync",
# registers their indexes as done and empties the queue.
#   $xcoll          arrayref of { rev => $event, i => $index } entries
#   $pathdb         path database hashref, may be false
#   $recent_events  the event list the indexes refer to
# Returns the return value of mirror_path().
sub _mirror_empty_xcollector {
    my($self, $xcoll, $pathdb, $recent_events) = @_;
    my(@paths, @events, @indexes);
    for my $entry (@$xcoll) {
        push @paths,   $entry->{rev}{path};
        push @events,  $entry->{rev};
        push @indexes, $entry->{i};
    }
    my $success = $self->mirror_path(\@paths);
    $self->_mirror_register_path($pathdb, \@events, "rsync") if $pathdb;
    $self->done->register($recent_events, \@indexes);
    @$xcoll = ();
    return $success;
}
# Records in the path database what was done with each of the given events.
# For every event the entry for its path is replaced by a hash holding the
# event's epoch (recentepoch) and the current time under "<activity>_on".
#   $pathdb    hashref, path => record
#   $coll      arrayref of events (hashes with path and epoch)
#   $activity  name of what was done, e.g. "rsync" or "deleted"
sub _mirror_register_path {
    my($self, $pathdb, $coll, $activity) = @_;
    my $now = time;
    my $stamp_key = $activity . "_on";
    for my $event (@$coll) {
        my %record = (
            recentepoch => $event->{epoch},
            $stamp_key  => $now,
        );
        $pathdb->{ $event->{path} } = \%record;
    }
}
# Moves the temporary recentfile to its final name (rfile) and leaves
# tempfile mode. A File::Temp object held in _current_tempfile_fh is told
# not to unlink its (now renamed) file and is dropped.
# Confesses if the rename fails.
sub _mirror_unhide_tempfile {
    my($self, $trecentfile) = @_;
    my $rfile = $self->rfile;
    unless (rename $trecentfile, $rfile) {
        require Carp;
        Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
    }
    $self->_use_tempfile(0);
    my $ctfh = $self->_current_tempfile_fh;
    if ($ctfh) {
        $ctfh->unlink_on_destroy(0);
        $self->_current_tempfile_fh(undef);
    }
}
# Carries out the postponed deletions: first every path queued under
# "unlink" is unlinked, then every path queued under "rmdir" is removed
# with rmdir. A failure only produces a warning with stack trace; the path
# is taken off the queue in either case.
sub _mirror_perform_delayed_ops {
    my($self) = @_;
    my $delayed = $self->delayed_operations;
    my $files = $delayed->{unlink};
    for my $dst (keys %$files) {
        unlink $dst or do {
            require Carp;
            Carp::cluck ( "Warning: Error while unlinking '$dst': $!" );
        };
        delete $delayed->{unlink}{$dst};
    }
    my $dirs = $delayed->{rmdir};
    for my $dst (keys %$dirs) {
        rmdir $dst or do {
            require Carp;
            Carp::cluck ( "Warning: Error on rmdir '$dst': $!" );
        };
        delete $delayed->{rmdir}{$dst};
    }
}
1166 =head2 (void) $obj->mirror_loop
1168 Run mirror in an endless loop. See the accessor C<loopinterval>. XXX
1169 What happens/should happen if we miss the interval during a single loop?
1171 =cut
# Runs mirror() in a loop until a SIGINT arrives.
#
# Every iteration takes at least loopinterval seconds (default 42): if the
# work is done earlier, the rest of the time is slept. From the second
# iteration on, mirror() is asked for events after the newest epoch seen so
# far. With verbose set, "(<epoch>)" is printed after each mirror run and
# "~" after each sleep.
#
# Changes against the previous implementation:
#  - mirror() takes a hash of options; the epoch is now passed as
#    "after => $epoch" instead of as a lone scalar, which left the option
#    unset and caused an odd-sized hash assignment.
#  - the start time is taken per iteration; it used to be taken once before
#    the loop, so that no sleep happened any more once the first interval
#    had passed.
#  - the SIGINT handler is localized, so the handler that was active
#    before the call is back in place when the loop has ended.
#  - an empty list of recent events no longer overwrites $after with undef.
sub mirror_loop {
    my($self) = @_;
    my $Signal = 0;
    local $SIG{INT} = sub { $Signal++ };
    my $loopinterval = $self->loopinterval || 42;
    my $after = -999999999;
    LOOP: while () {
        my $iteration_start = time;
        $self->mirror(after => $after);
        last LOOP if $Signal;
        my $re = $self->recent_events;
        if ($re && @$re && defined $re->[0]{epoch}) {
            $after = $re->[0]{epoch};
        }
        if ($self->verbose) {
            local $| = 1;
            print "($after)";
        }
        # compute the rest once, so that a clock tick between two calls of
        # time cannot produce a negative sleep
        my $remaining = $iteration_start + $loopinterval - time;
        if ($remaining > 0) {
            sleep $remaining;
        }
        # a signal that interrupted the sleep ends the loop right away
        last LOOP if $Signal;
        if ($self->verbose) {
            local $| = 1;
            print "~";
        }
    }
    return;
}
1200 =head2 $success = $obj->mirror_path ( $arrref | $path )
1202 If the argument is a scalar it is treated as a path. The remote path
1203 is mirrored into the local copy. $path is the path found in the
1204 I<recentfile>, i.e. it is relative to the root directory of the
1205 mirror.
1207 If the argument is an array reference then all elements are treated as
1208 a path below the current tree and all are rsynced with a single
1209 command (and a single connection).
1211 =cut
# Mirror either a single path (scalar argument) or a batch of paths
# (array reference argument) from $self->remoteroot with the rsync
# object returned by $self->rsync. Every return statement visible in
# this sub returns 1.
1213 sub mirror_path {
1214 my($self,$path) = @_;
1215 # XXX simplify the two branches such that $path is treated as
1216 # [$path] maybe even demand the argument as an arrayref to
1217 # simplify docs and code. (rsync-over-recentfile-2.pl uses the
1218 # interface)
1219 if (ref $path and ref $path eq "ARRAY") {
# Batch mode: the destination is localroot itself and the list of
# paths is handed to rsync through a files-from temp file.
1220 my $dst = $self->localroot;
1221 mkpath dirname $dst;
1222 my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
1223 lc $self->filenameroot,
1225 TMPDIR => 1,
1226 UNLINK => 0,
# one path per line
1228 for my $p (@$path) {
1229 print $fh $p, "\n";
1231 $fh->flush;
# created with UNLINK => 0, automatic removal is only switched on here
1232 $fh->unlink_on_destroy(1);
1233 my $gaveup = 0;
1234 my $retried = 0;
# Retry until exec succeeds; the loop is left after the third failure.
1235 while (!$self->rsync->exec
1237 src => join("/",
1238 $self->remoteroot,
1240 dst => $dst,
1241 'files-from' => $fh->filename,
1242 )) {
1243 my(@err) = $self->rsync->err;
# a link_stat error counts as success when ignore_link_stat_errors is set
1244 if ($self->ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1245 if ($self->verbose) {
1246 warn "Info: ignoring link_stat error '@err'";
1248 return 1;
1250 $self->register_rsync_error (@err);
1251 if (++$retried >= 3) {
1252 warn "XXX giving up.";
1253 $gaveup = 1;
1254 last;
# NOTE(review): after giving up the error counter is left untouched but
# control still reaches the final "return 1" -- confirm that callers do
# not take this return value as proof of a successful transfer.
1257 unless ($gaveup) {
1258 $self->un_register_rsync_error ();
1260 } else {
# Single path mode: the destination is the local counterpart of $path.
1261 my $dst = $self->local_path($path);
1262 mkpath dirname $dst;
# No retry limit in this loop itself; each failure is passed to
# register_rsync_error.
1263 while (!$self->rsync->exec
1265 src => join("/",
1266 $self->remoteroot,
1267 $path
1269 dst => $dst,
1270 )) {
1271 my(@err) = $self->rsync->err;
1272 if ($self->ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1273 if ($self->verbose) {
1274 warn "Info: ignoring link_stat error '@err'";
1276 return 1;
1278 $self->register_rsync_error (@err);
1280 $self->un_register_rsync_error ();
1282 return 1;
# Name of the file to read events from: the current tempfile while
# _use_tempfile is true, the regular recentfile (rfile) otherwise.
sub _my_current_rfile {
    my($self) = @_;
    my $source = $self->_use_tempfile ? "_current_tempfile" : "rfile";
    my $rfile = $self->$source;
    return $rfile;
}
1296 =head2 $path = $obj->naive_path_normalize ($path)
1298 Takes an absolute unix style path as argument and canonicalizes it to
1299 a shorter path if possible, removing things like double slashes or
1300 C</./> and removes references to C<../> directories to get a shorter
1301 unambiguous path. This is used to make the code easier that determines
1302 if a file passed to C<update()> is indeed below our C<localroot>.
1304 =cut
# Purely textual canonicalization of an absolute unix style path; the
# filesystem is never consulted.
#
#   - runs of slashes are collapsed into a single slash
#   - "/./" segments and a trailing "/." are removed
#   - "/dir/../" is reduced to "/"
#   - one trailing slash is stripped
#
# Returns the normalized path.
sub naive_path_normalize {
    my($self,$path) = @_;
    $path =~ s|/+|/|g;
    # "." segments have to go first, otherwise "/a/./../b" would lose
    # the "." instead of the "a" in the next step
    1 while $path =~ s|/\./|/|;
    $path =~ s|/\.$|/|;
    1 while $path =~ s|/[^/]+/\.\./|/|;
    $path =~ s|/$||;
    $path;
}
1314 =head2 $ret = $obj->read_recent_1 ( $data )
1316 Delegate of C<recent_events()> on protocol 1
1318 =cut
# Protocol 1 reader: the event list is stored under the key "recent" of
# the deserialized data structure.
sub read_recent_1 {
    my($self, $data) = @_;
    my $events = $data->{recent};
    return $events;
}
1325 =head2 $array_ref = $obj->recent_events ( %options )
1327 Note: the code relies on the resource being written atomically. We
1328 cannot lock because we may have no write access. If the caller has
1329 write access (eg. aggregate() or update()), it has to care for any
1330 necessary locking and it MUST write atomically.
1332 If $options{after} is specified, only file events after this timestamp
1333 are returned.
1335 If $options{before} is specified, only file events before this
1336 timestamp are returned.
1338 If $options{'skip-deletes'} is specified, no files-to-be-deleted will
1339 be returned.
1341 If $options{max} is specified only a maximum of this many events is
1342 returned.
1344 If $options{contains} is specified the value must be a hash reference
1345 containing a query. The query may contain the keys C<epoch>, C<path>,
1346 and C<type>. Each represents a condition that must be met. If there is
1347 more than one such key, the conditions are ANDed.
1349 If $options{info} is specified, it must be a hashref. This hashref
1350 will be filled with metadata about the unfiltered recent_events of
1351 this object, in key C<first> there is the first item, in key C<last>
1352 is the last.
1354 =cut
# Read the events of this recentfile and return them as an array
# reference, newest first.
#
# A slave object fetches the remote recentfile into a tempfile first.
# An empty array reference is returned when there is no file to read or
# when it cannot be deserialized (the error is deliberately swallowed).
#
# %options may contain after, before, contains, info, max and
# skip-deletes as described in the POD above. The list is handed to
# _recent_events_handle_options as soon as any of them is defined;
# testing only for after/before/max would let a lone contains,
# skip-deletes or info request pass through silently unprocessed.
sub recent_events {
    my ($self, %options) = @_;
    if ($self->is_slave) {
        $self->get_remote_recentfile_as_tempfile;
    }
    my $rfile_or_tempfile = $self->_my_current_rfile or return [];
    -e $rfile_or_tempfile or return [];
    my $suffix = $self->serializer_suffix;
    my ($data) = eval {
        $self->_try_deserialize
            (
             $suffix,
             $rfile_or_tempfile,
            );
    };
    my $err = $@;
    if ($err or !$data) {
        return [];
    }
    my $re;
    if (reftype($data) eq 'ARRAY') { # protocol 0
        $re = $data;
    } else {
        $re = $self->_recent_events_protocol_x
            (
             $data,
             $rfile_or_tempfile,
            );
    }
    return $re unless grep {defined $options{$_}}
        qw(after before contains info max skip-deletes);
    $self->_recent_events_handle_options ($re, \%options);
}
# Apply the filter options of recent_events() to the event list $re.
# $re is expected to be ordered by decreasing epoch; it is modified in
# place and returned.
#
#   info          hashref, receives the first and the last item of the
#                 unfiltered list in the keys "first" and "last"
#   after         keep only events with an epoch greater than this
#   before        keep only events older than this
#   skip-deletes  drop events of type "delete"
#   contains      hashref with any of the keys epoch, path, type; an
#                 event must match all of them. Other keys are answered
#                 with Carp::confess.
#   max           keep at most this many events
#
# Epochs are compared with the native numeric operators here.
sub _recent_events_handle_options {
    my($self, $re, $options) = @_;
    if (my $info = $options->{info}) {
        $info->{first} = $re->[0];
        $info->{last} = $re->[-1];
    }
    # An empty list needs no windowing; looking at $re->[0]{epoch} would
    # autovivify a bogus element into it.
    if (@$re) {
        my $first_item = 0;
        my $last_item = $#$re;
        if (defined(my $after = $options->{after})) {
            # index of the first event that is too old; everything in
            # front of it qualifies. An index of 0 means no event is
            # new enough and leaves an empty window.
            my $f = first { $re->[$_]{epoch} <= $after } 0..$#$re;
            $last_item = $f-1 if defined $f;
        }
        if ($last_item >= 0 and defined(my $before = $options->{before})) {
            if ($re->[0]{epoch} > $before) {
                my $f = first { $re->[$_]{epoch} < $before } 0..$last_item;
                # no event is old enough: empty window
                $first_item = defined $f ? $f : $last_item+1;
            }
        }
        @$re = splice @$re, $first_item, 1+$last_item-$first_item;
    }
    if ($options->{'skip-deletes'}) {
        @$re = grep { $_->{type} ne "delete" } @$re;
    }
    if (my $contopt = $options->{contains}) {
        my $seen_allowed = 0;
        for my $allow (qw(epoch path type)) {
            if (exists $contopt->{$allow}) {
                $seen_allowed++;
                my $v = $contopt->{$allow};
                @$re = grep { $_->{$allow} eq $v } @$re;
            }
        }
        if (keys %$contopt > $seen_allowed) {
            require Carp;
            Carp::confess
                  (sprintf "unknown query: %s", join ", ", %$contopt);
        }
    }
    if ($options->{max} && @$re > $options->{max}) {
        @$re = splice @$re, 0, $options->{max};
    }
    $re;
}
# Extract the events from a deserialized recentfile of protocol 1 or
# later and refresh the object from it.
#
# All lowercase keys of the meta section whose accessor still returns
# undef are copied into the object. The events are fetched through the
# read_recent_N method matching the protocol number in the meta
# section. minmax is set to the mtime of the file plus, for a non-empty
# list, the epochs of the oldest (min) and newest (max) event.
#
# Dies when the file cannot be stat'ed. Returns the events.
sub _recent_events_protocol_x {
    my($self, $data, $rfile_or_tempfile) = @_;
    my $reader = sprintf "read_recent_%d", $data->{meta}{protocol};
    my $meta = $data->{meta};
    # we may be reading meta for the first time
    for my $attr (keys %$meta) {
        next unless $attr eq lc $attr; # "Producers"
        next if defined $self->$attr;
        $self->$attr($meta->{$attr});
    }
    my $re = $self->$reader ($data);
    my @stat = stat $rfile_or_tempfile
        or die "Cannot stat '$rfile_or_tempfile': $!";
    my %minmax = ( mtime => $stat[9] );
    if (@$re) {
        @minmax{qw(min max)} = ($re->[-1]{epoch}, $re->[0]{epoch});
    }
    $self->minmax ( \%minmax );
    return $re;
}
# Deserialize the file $rfile_or_tempfile according to $suffix and
# return the resulting data structure.
#
# ".yaml" is loaded with YAML::Syck. Every other suffix is looked up in
# %serializers and handed to Data::Serializer, which must be installed
# for that.
#
# Dies when the file cannot be opened or when Data::Serializer is
# needed but missing.
sub _try_deserialize {
    my($self,
       $suffix,
       $rfile_or_tempfile,
      ) = @_;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        YAML::Syck::LoadFile($rfile_or_tempfile);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        my $serialized = do
            {
                # three-argument open: with the two-argument form special
                # characters in the file name (leading ">" or "|",
                # trailing "|") would be taken as an open mode
                open my $fh, "<", $rfile_or_tempfile or die "Could not open: $!";
                local $/; # slurp
                <$fh>;
            };
        $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
}
# Bring this object in line with the recentfile found at $dst.
#
# A throwaway object is built from the file; _merged and minmax are
# taken over from it. If both objects have a dirtymark and the one in
# the file is greater, the done bookkeeping is reset, the new dirtymark
# is adopted and the object is marked as seeded.
sub _refresh_internals {
    my($self, $dst) = @_;
    my $peek = (ref $self)->new_from_file ($dst);
    $self->_merged ( $peek->_merged );
    $self->minmax ( $peek->minmax );
    my $ours = $self->dirtymark;
    my $theirs = $peek->dirtymark;
    if ($ours && $theirs && _bigfloatgt($theirs,$ours)) {
        $self->done->reset;
        $self->dirtymark ( $theirs );
        $self->seed;
    }
}
1517 =head2 $ret = $obj->rfilename
1519 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1520 a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
1522 =cut
# Compose the basename of the recentfile: filenameroot, a dash, the
# interval and the serializer suffix.
sub rfilename {
    my($self) = @_;
    my @parts = (
                 $self->filenameroot,
                 $self->interval,
                 $self->serializer_suffix,
                );
    return sprintf "%s-%s%s", @parts;
}
1534 =head2 $str = $self->remote_dir
1536 The directory we are mirroring from.
1538 =cut
# Getter/setter for the directory we are mirroring from. As a side
# effect every call, reading or writing, flags the object as a slave.
sub remote_dir {
    my($self, $set) = @_;
    $self->_remote_dir ($set) if defined $set;
    my $dir = $self->_remote_dir;
    $self->is_slave (1);
    return $dir;
}
1550 =head2 $str = $obj->remoteroot
1552 =head2 (void) $obj->remoteroot ( $set )
1554 Get/Set the composed prefix needed when rsyncing from a remote module.
1555 If remote_host, remote_module, and remote_dir are set, it is composed
1556 from these.
1558 =cut
# Getter/setter for the rsync source prefix.
#
# While no value has been set, the prefix is composed as
# "HOST::MODULE/DIR" from remote_host, remote_module and remote_dir
# (undefined parts are left out together with their separator) and the
# result is remembered in _remoteroot.
sub remoteroot {
    my($self, $set) = @_;
    $self->_remoteroot($set) if defined $set;
    my $remoteroot = $self->_remoteroot;
    return $remoteroot if defined $remoteroot;
    my $host = $self->remote_host;
    my $module = $self->remote_module;
    my $dir = $self->remote_dir;
    $remoteroot = join "",
        (defined $host ? $host . "::" : ""),
        (defined $module ? $module . "/" : ""),
        (defined $dir ? $dir : "");
    $self->_remoteroot($remoteroot);
    return $remoteroot;
}
1579 =head2 (void) $obj->resolve_recentfilename ( $recentfilename )
1581 Inverse method to L<rfilename>. $recentfilename is a plain filename of
1582 the pattern
1584 $filenameroot-$interval$serializer_suffix
1586 e.g.
1588 RECENT-1M.yaml
1590 This filename is split into its parts and the parts are fed to the
1591 object itself.
1593 =cut
# Split a recentfile basename like "RECENT-1M.yaml" into filenameroot,
# interval and serializer suffix and store the three parts in the
# object. Dies if the name does not match the pattern. Returns nothing.
sub resolve_recentfilename {
    my($self, $rfname) = @_;
    my($splitter) = qr(^(.+)-([^-\.]+)(\.[^\.]+));
    my @parts = $rfname =~ $splitter
        or die "Alert: cannot split '$rfname', doesn't match '$splitter'";
    my($root, $interval, $suffix) = @parts;
    $self->filenameroot ($root);
    $self->interval ($interval);
    $self->serializer_suffix ($suffix);
    return;
}
1608 =head2 my $rfile = $obj->rfile
1610 Returns the full path of the I<recentfile>
1612 =cut
# Full path of the recentfile. It is composed from localroot and
# rfilename on first use and cached in _rfile from then on.
sub rfile {
    my($self) = @_;
    my $rfile = $self->_rfile;
    unless (defined $rfile) {
        $rfile = File::Spec->catfile ($self->localroot, $self->rfilename);
        $self->_rfile ($rfile);
    }
    return $rfile;
}
1626 =head2 $rsync_obj = $obj->rsync
1628 The File::Rsync object that this object uses for communicating with an
1629 upstream server.
1631 =cut
# Return the File::Rsync object used for talking to the upstream
# server. It is created on first use from rsync_options (an empty hash
# when unset) and cached in _rsync. Dies if File::Rsync could not be
# loaded.
sub rsync {
    my($self) = @_;
    my $rsync = $self->_rsync;
    return $rsync if defined $rsync;
    my $rsync_options = $self->rsync_options || {};
    die "File::Rsync required for rsync operations. Cannot continue"
        unless $HAVE->{"File::Rsync"};
    $rsync = File::Rsync->new($rsync_options);
    $self->_rsync($rsync);
    return $rsync;
}
1648 =head2 (void) $obj->register_rsync_error(@err)
1650 =head2 (void) $obj->un_register_rsync_error()
1652 Register_rsync_error is called whenever the File::Rsync object fails
1653 on an exec (say, connection doesn't succeed). It issues a warning and
1654 sleeps for an increasing amount of time. Un_register_rsync_error
1655 resets the error count. See also accessor C<max_rsync_errors>.
1657 =cut
{
    # Failure bookkeeping shared by the two subs below; there is one
    # counter per process, not one per object.
    my $no_success_count = 0;
    my $no_success_time = 0;

    # Note one more failed rsync run. Confesses once the number of
    # consecutive failures has reached max_rsync_errors (a negative
    # value disables the limit, undef stands for MAX_INT). Otherwise a
    # warning is issued and the process sleeps 12 seconds per counted
    # failure, 300 seconds at most.
    sub register_rsync_error {
        my($self, @err) = @_;
        chomp @err;
        $no_success_time = time;
        $no_success_count++;
        my $limit = $self->max_rsync_errors;
        $limit = MAX_INT unless defined $limit;
        if ($limit >= 0 && $no_success_count >= $limit) {
            require Carp;
            my $alert = sprintf
                (
                 "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
                 $self->interval,
                 join(" ",@err),
                 $no_success_count,
                );
            Carp::confess($alert);
        }
        my $sleep = 12 * $no_success_count;
        $sleep = 300 if $sleep > 300;
        require Carp;
        my $warning = sprintf
            (
             "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
             scalar(localtime($no_success_time)),
             $self->interval,
             join(" ",@err),
             $sleep,
            );
        Carp::cluck($warning);
        sleep $sleep
    }

    # Forget all failures noted so far.
    sub un_register_rsync_error {
        my($self) = @_;
        $no_success_time = 0;
        $no_success_count = 0;
    }
}
1702 =head2 $clone = $obj->_sparse_clone
1704 Clones just as much from itself that it does not hurt. Experimental
1705 method.
1707 Note: what fits better: sparse or shallow? Other suggestions?
1709 =cut
# Build a new object of the same class that carries over only a fixed
# list of attributes. Attribute values that are references are deep
# copied with Storable::dclone so that the clone shares no structure
# with the original.
sub _sparse_clone {
    my($self) = @_;
    my @attributes = qw(
                        _interval
                        _localroot
                        _remoteroot
                        _rfile
                        _use_tempfile
                        aggregator
                        dirtymark
                        filenameroot
                        is_slave
                        max_files_per_connection
                        protocol
                        rsync_options
                        serializer_suffix
                        sleep_per_connection
                        verbose
                       );
    my $clone = bless {}, ref $self;
    for my $attr (@attributes) {
        my $value = $self->$attr;
        $clone->$attr(ref $value ? Storable::dclone($value) : $value);
    }
    $clone;
}
1738 =head2 $boolean = OBJ->ttl_reached ()
1740 =cut
# True (1) when more than ttl seconds have passed since have_mirrored,
# false (0) otherwise. An unset have_mirrored counts as 0, an undefined
# ttl as 24.2 seconds.
sub ttl_reached {
    my($self) = @_;
    my $last_mirrored = $self->have_mirrored || 0;
    my $now = Time::HiRes::time;
    my $ttl = $self->ttl;
    $ttl = 24.2 unless defined $ttl;
    return $now > $last_mirrored + $ttl ? 1 : 0;
}
1754 =head2 (void) $obj->unlock()
1756 Unlocking is implemented with an C<rmdir> on a locking directory
1757 (C<.lock> appended to $rfile).
1759 =cut
# Give up the lock if we hold it: remove the lock directory (rfile plus
# ".lock"; the result of rmdir is not checked) and clear the _is_locked
# flag. Does nothing when the object is not locked.
sub unlock {
    my($self) = @_;
    return unless $self->_is_locked;
    my $lockdir = $self->rfile() . ".lock";
    rmdir $lockdir;
    $self->_is_locked (0);
}
1769 =head2 unseed
1771 Sets this recentfile in the state of not 'seeded'.
1773 =cut
# Clear the 'seeded' state of this recentfile.
sub unseed {
    my $self = shift;
    return $self->seeded(0);
}
1779 =head2 $ret = $obj->update ($path, $type)
1781 =head2 $ret = $obj->update ($path, "new", $dirty_epoch)
1783 Enter one file into the local I<recentfile>. $path is the (usually
1784 absolute) path. If the path is outside I<our> tree, then it is
1785 ignored.
1787 $type is one of C<new> or C<delete>.
1789 Events of type C<new> may set $dirty_epoch. $dirty_epoch is normally
1790 not used and the epoch is calculated by the update() routine itself
1791 based on current time. But if there is the demand to insert a
1792 not-so-current file into the dataset, then the caller sets
1793 $dirty_epoch. This causes the epoch of the registered event to become
1794 $dirty_epoch or -- if the exact value given is already taken -- a tiny
1795 bit more. As compensation the dirtymark of the whole dataset is set to
1796 the current epoch.
1798 The new file event is unshifted (or, if dirty_epoch is set, inserted
1799 at the place it belongs to, according to the rule to have a sequence
1800 of strictly decreasing timestamps) to the array of recent_events and
1801 the array is shortened to the length of the timespan allowed. This is
1802 usually the timespan specified by the interval of this recentfile but
1803 as long as this recentfile has not been merged to another one, the
1804 timespan may grow without bounds.
1806 =cut
# Choose the epoch for a new event that goes to the head of $recent.
# $epoch itself is returned when the list is empty or when $epoch is
# greater than the epoch of the newest event; otherwise the newest
# epoch, increased a bit, is returned.
sub _epoch_monotonically_increasing {
    my($self,$epoch,$recent) = @_;
    return $epoch unless @$recent; # the first one goes unoffended
    my $newest = $recent->[0]{epoch};
    return _bigfloatgt("".$epoch,$newest)
        ? $epoch
        : _increase_a_bit($newest);
}
# Register one file event in the local recentfile.
#
#   $path         path of the file, usually absolute. It is normalized
#                 with the method named by $self->canonize (default
#                 naive_path_normalize). A path that does not lie below
#                 localroot is silently ignored.
#   $type         exactly "new" or "delete"
#   $dirty_epoch  optional, only allowed together with type "new":
#                 register the event at this epoch instead of now
#
# The recentfile is locked during the update. Events older than the
# allowed timespan are dropped (only once this recentfile has been
# merged), older events for the same path are removed, the new event is
# inserted and the file is written. A dirty epoch additionally moves
# the dirtymark to the current time or to the event's epoch, whichever
# is greater.
#
# Dies on missing or illegal arguments.
sub update {
    my($self,$path,$type,$dirty_epoch) = @_;
    die "update called without path argument" unless defined $path;
    die "update called without type argument" unless defined $type;
    # anchored: an unanchored match would also accept e.g. "renewed"
    die "update called with illegal type argument: $type" unless $type =~ /\A(?:new|delete)\z/;
    die "update called with \$type=$type and \$dirty_epoch=$dirty_epoch; ".
        "dirty_epoch only allowed with type=new" if $dirty_epoch and $type ne "new";
    my $canonmeth = $self->canonize;
    unless ($canonmeth) {
        $canonmeth = "naive_path_normalize";
    }
    $path = $self->$canonmeth($path);
    my $lrd = $self->localroot;
    # The prefix must end at a directory boundary: either localroot
    # itself ends in a slash, or a slash follows, or the path ends
    # here. Otherwise "/root-old/file" would pass as being below
    # "/root".
    if ($path =~ s{^\Q$lrd\E(?:(?<=/)|(?=/)|\z)}{}) {
        $path =~ s|^/||;
        my $secs = $self->interval_secs();
        $self->lock;
        # you must calculate the time after having locked, of course
        my $now = Time::HiRes::time;
        my $recent = $self->recent_events;

        my $epoch;
        if ($dirty_epoch) {
            $epoch = $dirty_epoch;
        } else {
            $epoch = $self->_epoch_monotonically_increasing($now,$recent);
        }

        $recent ||= [];
        my $oldest_allowed = 0;
        if (my $merged = $self->merged) {
            my $virtualnow = max($now,$epoch);
            # for the lower bound could we need big math?
            $oldest_allowed = min($virtualnow - $secs, $merged->{epoch}, $epoch);
        } else {
            # as long as we are not merged at all, no limits!
        }
      TRUNCATE: while (@$recent) {
            if (_bigfloatlt($recent->[-1]{epoch}, $oldest_allowed)) {
                pop @$recent;
            } else {
                last TRUNCATE;
            }
        }
        my $splicepos;
        # remove the older duplicates of this $path, irrespective of $type:
        if ($dirty_epoch) {
            my $ctx = $self->_update_with_dirty_epoch($path,$recent,$epoch);
            $recent    = $ctx->{recent};
            $splicepos = $ctx->{splicepos};
            $epoch     = $ctx->{epoch};
            my $new_dm = $now;
            if (_bigfloatgt($epoch, $now)) {
                $new_dm = $epoch;
            }
            $self->dirtymark($new_dm);
        } else {
            $recent = [ grep { $_->{path} ne $path } @$recent ];
            $splicepos = 0;
        }
        if (defined $splicepos) {
            splice @$recent, $splicepos, 0, { epoch => $epoch, path => $path, type => $type };
        }

        $self->write_recent($recent);
        $self->_assert_symlink;
        $self->unlock;
    }
}
# Helper of update() for events that come with a caller supplied
# ("dirty") epoch.
#
#   $path    path of the event, relative to localroot
#   $recent  array reference of events, ordered by decreasing epoch;
#            modified in place
#   $epoch   requested epoch of the new event
#
# Existing events for $path are removed from $recent, unless one of
# them already carries exactly $epoch; in that case the list is left
# untouched. Then the position is determined at which the new event has
# to be spliced in to keep the epochs decreasing. If $epoch is already
# taken by another event it is increased a bit.
#
# NOTE(review): when an event with the same path and epoch exists the
# list is kept as is ("nothing to do"), yet a splice position is still
# computed below -- confirm whether update() is meant to insert in
# that case.
#
# Returns a hash reference with the keys recent (the list), splicepos
# (index for splice) and epoch (possibly adjusted).
sub _update_with_dirty_epoch {
    my($self,$path,$recent,$epoch) = @_;
    my $splicepos;
    my $new_recent = [];
    # Filtering is only needed if $path is known at all. Testing for
    # "ne" here would skip the removal exactly when every event in the
    # list belongs to $path.
    if (grep { $_->{path} eq $path } @$recent) {
        my $cancel = 0;
      KNOWN_EVENT: for my $i (0..$#$recent) {
            if ($recent->[$i]{path} eq $path) {
                if ($recent->[$i]{epoch} eq $epoch) {
                    # nothing to do
                    $cancel = 1;
                    last KNOWN_EVENT;
                }
            } else {
                push @$new_recent, $recent->[$i];
            }
        }
        @$recent = @$new_recent unless $cancel;
    }
    if (!exists $recent->[0] or _bigfloatgt($epoch,$recent->[0]{epoch})) {
        # newer than everything we have: goes to the front
        $splicepos = 0;
    } elsif (_bigfloatlt($epoch,$recent->[-1]{epoch})) {
        # older than the oldest event: goes to the end. The comparison
        # must be made against the last element; comparing against the
        # first would append every epoch below the newest one and break
        # the ordering.
        $splicepos = @$recent;
    } else {
      RECENT: for my $i (0..$#$recent) {
            if ($epoch eq $recent->[$i]{epoch}) {
                $epoch = _increase_a_bit($epoch, $i ? $recent->[$i-1]{epoch} : undef);
            }
            if (_bigfloatgt($epoch,$recent->[$i]{epoch})) {
                $splicepos = $i;
                last RECENT;
            }
        }
    }
    return {
            recent => $recent,
            splicepos => $splicepos,
            epoch => $epoch,
           };
}
1930 =head2 seed
1932 Sets this recentfile in the state of 'seeded' which means it has to
1933 re-evaluate its uptodateness.
1935 =cut
# Put this recentfile into the 'seeded' state.
sub seed {
    my $self = shift;
    return $self->seeded(1);
}
1941 =head2 seeded
1943 Tells if the recentfile is in the state 'seeded'.
1945 =cut
# Getter/setter for the 'seeded' state. A state that has never been set
# is initialized to 0 on first read.
sub seeded {
    my($self, $set) = @_;
    $self->_seeded ($set) if defined $set;
    my $state = $self->_seeded;
    return $state if defined $state;
    $state = 0;
    $self->_seeded ($state);
    return $state;
}
1959 =head2 uptodate
1961 True if this object has mirrored the complete interval covered by the
1962 current recentfile.
1964 *** WIP ***
1966 =cut
# Decide whether this object has mirrored everything its recentfile
# covers. The checks are tried in this order:
#
#   1. uptodateness was reached before and the object is not seeded:
#      uptodate
#   2. ttl_reached is true: not uptodate
#   3. minmax has an mtime: not uptodate if the file on disk is newer
#      than that, otherwise whatever done->covered says about the
#      max/min range of minmax
#   4. nothing of the above came to a decision: not uptodate
#
# A positive answer is remembered and unseeds the object. Result and
# reason are recorded through _remember_last_uptodate_call.
sub uptodate {
    my($self) = @_;
    my($uptodate, $why);
    if ($self->_uptodateness_ever_reached and not $self->seeded) {
        ($uptodate, $why) = (1, "saturated");
    } elsif ($self->ttl_reached) {
        ($uptodate, $why) = (0, "ttl_reached returned true, so we are not uptodate");
    } else {
        # look if recentfile has unchanged timestamp
        my $minmax = $self->minmax;
        if (exists $minmax->{mtime}) {
            my $rfile = $self->_my_current_rfile;
            my $mtime = (stat $rfile)[9];
            if ($mtime > $minmax->{mtime}) {
                $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
                $uptodate = 0;
            } else {
                my $covered = $self->done->covered(@$minmax{qw(max min)});
                $why = "minmax covered[$covered], so we return that";
                $uptodate = $covered;
            }
        }
    }
    unless (defined $uptodate) {
        $why = "fallthrough, so not uptodate";
        $uptodate = 0;
    }
    if ($uptodate) {
        $self->_uptodateness_ever_reached(1);
        $self->unseed;
    }
    $self->_remember_last_uptodate_call({
                                         uptodate => $uptodate,
                                         why => $why,
                                        });
    return $uptodate;
}
2015 =head2 $obj->write_recent ($recent_files_arrayref)
2017 Writes a I<recentfile> based on the current reflection of the current
2018 state of the tree limited by the current interval.
2020 =cut
# Write the events in $recent to the recentfile by delegating to the
# write_N method that matches the protocol of this object. Dies when
# called without an argument.
sub write_recent {
    my ($self,$recent) = @_;
    defined $recent or die "write_recent called without argument";
    my $writer = "write_" . sprintf("%d", $self->protocol);
    $self->$writer($recent);
}
2029 =head2 $obj->write_0 ($recent_files_arrayref)
2031 Delegate of C<write_recent()> on protocol 0
2033 =cut
# Protocol 0 writer: dump the bare event list as YAML into
# "<rfile>.new" and rename that file over the recentfile.
sub write_0 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    my $tmpfile = "$rfile.new";
    YAML::Syck::DumpFile($tmpfile,$recent);
    rename $tmpfile, $rfile or die "Could not rename to '$rfile': $!";
}
2042 =head2 $obj->write_1 ($recent_files_arrayref)
2044 Delegate of C<write_recent()> on protocol 1
2046 =cut
# Protocol 1 writer: serialize a hash with the keys meta (taken from
# meta_data) and recent (the events) in the format selected by
# serializer_suffix, write it to "<rfile>.new" and rename that file
# over the recentfile.
#
# Dies if a serializer other than YAML is wanted but Data::Serializer
# is not installed, or if opening, closing or renaming the file fails.
sub write_1 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    my $suffix = $self->serializer_suffix;
    my $data = {
                meta => $self->meta_data,
                recent => $recent,
               };
    my $serialized;
    if ($suffix eq ".yaml") {
        $serialized = YAML::Syck::Dump($data);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $serialized = $serializer->raw_serialize($data);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    my $tmpfile = "$rfile.new";
    open my $fh, ">", $tmpfile or die "Could not open >'$tmpfile': $!";
    print {$fh} $serialized;
    close $fh or die "Could not close '$tmpfile': $!";
    rename $tmpfile, $rfile or die "Could not rename to '$rfile': $!";
}
BEGIN {
    # The POD that follows doubles as data. It is swallowed as a
    # here-document ending at the next "=cut" line; every
    # "=item C<< ... >>" line in it is reduced to the text between the
    # angle brackets, and that text is evaluated as Perl to yield one
    # suffix => serializer pair for %serializers.
    my @pod_lines = split /\n/, <<'=cut'; %serializers = map { eval $_ } grep { s/^=item\s+C<<(.+)>>$/$1/ } @pod_lines; }
2076 =head1 THE ARCHITECTURE OF A COLLECTION OF RECENTFILES
2078 The idea is that we want to have a short file that records really
2079 recent changes. So that a fresh mirror can be kept fresh as long as
2080 the connectivity is given. Then we want longer files that record the
2081 history before. So when the mirror falls behind the update period
2082 reflected in the shortest file, it can complement the list of recent
2083 file events with the next one. And if this is not long enough we want
2084 another one, again a bit longer. And we want one that completes the
2085 history back to the oldest file. The index files do contain the
2086 complete list of current files. The larger an index file is the less
2087 often it is updated. For practical reasons adjacent files will often
2088 overlap a bit but this is neither necessary nor enforced. That's the
2089 basic idea. The following example represents a tree that has a few
2090 updates every day:
2092 RECENT.recent -> RECENT-1h.yaml
2093 RECENT-6h.yaml
2094 RECENT-1d.yaml
2095 RECENT-1M.yaml
2096 RECENT-1W.yaml
2097 RECENT-1Q.yaml
2098 RECENT-1Y.yaml
2099 RECENT-Z.yaml
2101 The first file is the principal file, in so far it is the one that is
2102 written first after a filesystem change. Usually a symlink links to it
2103 with a filename that has the same filenameroot and the suffix
2104 C<.recent>. On systems that do not support symlinks there is a plain
2105 copy maintained instead.
2107 The last file, the Z file, contains the complementary files that are
2108 in none of the other files. It never contains C<deletes>. Besides
2109 this it serves the role of a recovery mechanism or spill over pond.
2110 When things go wrong, it's a valuable controlling instance to hold the
2111 differences between the collection of limited interval files and the
2112 actual filesystem.
2114 =head2 A SINGLE RECENTFILE
2116 A I<recentfile> consists of a hash that has two keys: C<meta> and
2117 C<recent>. The C<meta> part has metadata and the C<recent> part has a
2118 list of fileobjects.
2120 =head2 THE META PART
2122 Here we find things that are pretty much self explaining: all
2123 lowercase attributes are accessors and as such explained somewhere
2124 above in this manpage. The uppercase attribute C<Producers> contains
2125 version information about involved software components. Nothing to
2126 worry about as I believe.
2128 =head2 THE RECENT PART
2130 This is the interesting part. Every entry refers to some filesystem
2131 change (with path, epoch, type). The epoch value is the point in time
2132 when some change was I<registered>. Do not be tempted to believe that
2133 the entry has a direct relation to something like modification time or
2134 change time on the filesystem level. The timestamp (I<epoch> element)
2135 is a floating point number and does practically never correspond
2136 exactly to the data recorded in the filesystem but rather to the time
2137 when some process succeeded to report to the I<recentfile> mechanism
2138 that something has changed. This is why many parts of the code refer
2139 to I<events>, because we merely try to record the I<event> of the
2140 discovery of a change, not the time of the change itself.
2142 All these entries can be divided into two types (denoted by the
2143 C<type> attribute): C<new>s and C<delete>s. Changes and creations are
2144 C<new>s. Deletes are C<delete>s.
2146 Another distinction is for objects with an epoch timestamp and others
2147 without. All files that were already existing on the filesystem before
2148 the I<recentfile> mechanism was installed, get recorded with a
2149 timestamp of zero.
2151 Besides an C<epoch> and a C<type> attribute we find a third one:
2152 C<path>. This path is relative to the directory we find the
2153 I<recentfile> in.
2155 The order of the entries in the I<recentfile> is by decreasing epoch
2156 attribute. These are either 0 or a unique floating point number. They
2157 are zero for events that were happening either before the time that
2158 the I<recentfile> mechanism was set up or were left undiscovered for a
2159 while and never handed over to update(). They are floating point
2160 numbers for all events being regularly handed to update(). And when
2161 the server has ntp running correctly, then the timestamps are
2162 actually decreasing and unique.
2164 =head1 CORRUPTION AND RECOVERY
2166 If the origin host breaks the promise to deliver consistent and
2167 complete I<recentfiles> then the way back to sanity shall be achieved
2168 through either the C<zloop> (still TBD) or traditional rsyncing
2169 between the hosts. For example, if the origin server forgets to deploy
2170 ntp and the clock on it jumps backwards some day, then this would
2171 probably go unnoticed for a while and many software components that
2172 rely on the time never running backwards will make wrong decisions.
2173 After some time this accident would probably still be found in one of
2174 the I<recentfiles> but would become meaningless as soon as a mirror
2175 has run through the sanitizing procedures. Same goes for origin hosts
2176 that forget to include or deliberately omit some files.
2178 =head1 SERIALIZERS
2180 The following suffixes are supported and trigger the use of these
2181 serializers:
2183 =over 4
2185 =item C<< ".yaml" => "YAML::Syck" >>
2187 =item C<< ".json" => "JSON" >>
2189 =item C<< ".sto" => "Storable" >>
2191 =item C<< ".dd" => "Data::Dumper" >>
2193 =back
2195 =cut
BEGIN {
    # Same technique as for %serializers: the POD that follows is read
    # as a here-document ending at the next "=cut" line, and the text
    # inside every "=item C<< ... >>" line is evaluated as Perl to
    # yield one letter => seconds pair for %seconds.
    my @pod_lines = split /\n/, <<'=cut'; %seconds = map { eval $_ } grep { s/^=item\s+C<<(.+)>>$/$1/ } @pod_lines; }
2201 =head1 INTERVAL SPEC
2203 An interval spec is a primitive way to express time spans. Normally it
2204 is composed from an integer and a letter.
2206 As a special case, a string that consists only of the single letter
2207 C<Z>, stands for unlimited time.
2209 The following letters express the specified number of seconds:
2211 =over 4
2213 =item C<< s => 1 >>
2215 =item C<< m => 60 >>
2217 =item C<< h => 60*60 >>
2219 =item C<< d => 60*60*24 >>
2221 =item C<< W => 60*60*24*7 >>
2223 =item C<< M => 60*60*24*30 >>
2225 =item C<< Q => 60*60*24*90 >>
2227 =item C<< Y => 60*60*24*365.25 >>
2229 =back
2231 =cut
2233 =head1 BACKGROUND
2235 This is about speeding up rsync operation on large trees to many
2236 places. Uses a small metadata cocktail and pull technology.
2238 =head2 NON-COMPETITORS
2240 File::Mirror JWU/File-Mirror/File-Mirror-0.10.tar.gz only local trees
2241 Mirror::YAML ADAMK/Mirror-YAML-0.03.tar.gz some sort of inner circle
2242 Net::DownloadMirror KNORR/Net-DownloadMirror-0.04.tar.gz FTP sites and stuff
2243 Net::MirrorDir KNORR/Net-MirrorDir-0.05.tar.gz dito
2244 Net::UploadMirror KNORR/Net-UploadMirror-0.06.tar.gz dito
2245 Pushmi::Mirror CLKAO/Pushmi-v1.0.0.tar.gz something SVK
2247 rsnapshot www.rsnapshot.org focus on backup
2248 csync www.csync.org more like unison
2249 multi-rsync sourceforge 167893 lan push to many
2251 =head2 COMPETITORS
2253 The problem to solve which clusters and ftp mirrors and otherwise
2254 replicated datasets like CPAN share: how to transfer only a minimum
2255 amount of data to determine the diff between two hosts.
2257 Normally it takes a long time to determine the diff itself before it
2258 can be transferred. Known solutions at the time of this writing are
2259 csync2, and rsync 3 batch mode.
2261 For many years the best solution was csync2 which solves the
2262 problem by maintaining a sqlite database on both ends and talking a
2263 highly sophisticated protocol to quickly determine which files to send
2264 and which to delete at any given point in time. Csync2 is often
2265 inconvenient because the act of syncing demands quite an intimate
2266 relationship between the sender and the receiver and suffers when the
2267 number of syncing sites is large or connections are unreliable.
2269 Rsync 3 batch mode works around these problems by providing rsync-able
2270 batch files which allow receiving nodes to replay the history of the
2271 other nodes. This reduces the need to have an incestuous relation but
2272 it has the disadvantage that these batch files replicate the contents
2273 of the involved files. This seems inappropriate when the nodes already
2274 have a means of communicating over rsync.
2276 rersyncrecent solves this problem with a couple of (usually 2-10)
2277 index files which cover different overlapping time intervals. The
2278 master writes these files and the clients can construct the full tree
2279 from the information contained in them. The most recent index file
2280 usually covers the last seconds or minutes or hours of the tree and
2281 depending on the needs, slaves can rsync every few seconds and then
2282 bring their trees in full sync.
2284 The rersyncrecent mode was developed for CPAN but I hope it is a
2285 convenient and economic general purpose solution. I'm looking forward
2286 to see a CPAN backbone that is only a few seconds behind PAUSE. And
2287 then ... the first FUSE based CPAN filesystem anyone?
2289 =head1 SEE ALSO
2291 Barbie is providing a database of release dates. See
2292 http://use.perl.org/~barbie/journal/37907
2294 =head1 AUTHOR
2296 Andreas König
2298 =head1 BUGS
2300 Please report any bugs or feature requests through the web interface
2302 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2303 I will be notified, and then you'll automatically be notified of
2304 progress on your bug as I make changes.
2306 =head1 SUPPORT
2308 You can find documentation for this module with the perldoc command.
2310 perldoc File::Rsync::Mirror::Recentfile
2312 You can also look for information at:
2314 =over 4
2316 =item * RT: CPAN's request tracker
2318 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2320 =item * AnnoCPAN: Annotated CPAN documentation
2322 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2324 =item * CPAN Ratings
2326 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2328 =item * Search CPAN
2330 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2332 =back
2335 =head1 ACKNOWLEDGEMENTS
2337 Thanks to RJBS for module-starter.
2339 =head1 COPYRIGHT & LICENSE
2341 Copyright 2008 Andreas König.
2343 This program is free software; you can redistribute it and/or modify it
2344 under the same terms as Perl itself.
2347 =cut
2349 1; # End of File::Rsync::Mirror::Recentfile
2351 # Local Variables:
2352 # mode: cperl
2353 # cperl-indent-level: 4
2354 # End: