throw the secondaryttl stuff out because it is arbitrary; try to get the chain from...
[rersyncrecent.git] / lib / File / Rsync / Mirror / Recentfile.pm
blob460ad6bd878a6b375afa5b64f854eed14cf07ac0
1 package File::Rsync::Mirror::Recentfile;
3 # use warnings;
4 use strict;
6 =encoding utf-8
8 =head1 NAME
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
12 =head1 VERSION
14 Version 0.0.1
16 =cut
18 my $HAVE = {};
19 for my $package (
20 "Data::Serializer",
21 "File::Rsync"
22 ) {
23 $HAVE->{$package} = eval qq{ require $package; };
25 use Config;
26 use File::Basename qw(basename dirname fileparse);
27 use File::Copy qw(cp);
28 use File::Path qw(mkpath);
29 use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
30 use File::Temp;
31 use List::Util qw(first min);
32 use Scalar::Util qw(reftype);
33 use Storable;
34 use Time::HiRes qw();
35 use YAML::Syck;
37 use version; our $VERSION = qv('0.0.1');
40 use constant MAX_INT => ~0>>1; # anything better?
41 use constant DEFAULT_PROTOCOL => 1;
43 # cf. interval_secs
44 my %seconds;
46 # maybe subclass if this mapping is bad?
47 my %serializers;
49 =head1 SYNOPSIS
51 B<!!!! PRE-ALPHA ALERT !!!!>
53 Nothing in here is believed to be stable, nothing yet intended for
54 public consumption. The plan is to provide a script in one of the next
55 releases that acts as a frontend for all the backend functionality.
56 Option and method names will very likely change.
58 For the rationale see the section BACKGROUND.
60 This is published only for developers of the (yet to be named)
61 script(s).
63 Writer (of a single file):
65 use File::Rsync::Mirror::Recentfile;
66 my $fr = File::Rsync::Mirror::Recentfile->new
68 interval => q(6h),
69 filenameroot => "RECENT",
70 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
71 localroot => "/home/ftp/pub/PAUSE/authors/",
72 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
74 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
76 Reader/mirrorer:
78 my $rf = File::Rsync::Mirror::Recentfile->new
80 filenameroot => "RECENT",
81 ignore_link_stat_errors => 1,
82 interval => q(6h),
83 localroot => "/home/ftp/pub/PAUSE/authors",
84 remote_dir => "",
85 remote_host => "pause.perl.org",
86 remote_module => "authors",
87 rsync_options => {
88 compress => 1,
89 'rsync-path' => '/usr/bin/rsync',
90 links => 1,
91 times => 1,
92 'omit-dir-times' => 1,
93 checksum => 1,
95 verbose => 1,
97 $rf->mirror;
99 Aggregator (usually the writer):
101 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
102 $rf->aggregate;
104 =head1 EXPORT
106 No exports.
108 =head1 CONSTRUCTORS / DESTRUCTOR
110 =head2 my $obj = CLASS->new(%hash)
112 Constructor. On every argument pair the key is a method name and the
113 value is an argument to that method name.
115 If a recentfile for this resource already exists, metadata that are
116 not defined by the constructor will be fetched from there as soon as
117 it is being read by recent_events().
119 =cut
# Constructor. Blesses an empty hash into $class, then treats the remaining
# arguments as (method name, value) pairs and calls each method with its
# value, in the order given. Afterwards protocol, filenameroot and
# serializer_suffix receive their defaults if they are still undefined.
sub new {
    my($class, @pairs) = @_;
    my $self = bless {}, $class;
    for (my $k = 0; $k < @pairs; $k += 2) {
        my $setter = $pairs[$k];
        $self->$setter($pairs[$k + 1]);
    }
    my @defaults = (
        [ protocol          => DEFAULT_PROTOCOL ],
        [ filenameroot      => "RECENT" ],
        [ serializer_suffix => ".yaml" ],
    );
    for my $default (@defaults) {
        my($accessor, $fallback) = @$default;
        next if defined $self->$accessor();
        $self->$accessor($fallback);
    }
    return $self;
}
140 =head2 my $obj = CLASS->new_from_file($file)
142 Constructor. $file is a I<recentfile>.
144 =cut
# Constructor from an existing recentfile.
#
# $file is read completely, symlinks are followed (only links pointing to
# a plain filename in the same directory are supported), and the
# serializer is chosen from the file's suffix. Every lowercase key of the
# deserialized "meta" hash is then applied to the new object through the
# method of the same name. Dies if the file cannot be opened, if a symlink
# target contains a slash, if the suffix is unknown, or if a non-YAML
# suffix is used without Data::Serializer being installed.
sub new_from_file {
    my($class, $file) = @_;
    require File::Spec; # catfile is used below; do not rely on others loading it
    my $self = bless {}, $class;
    $self->_rfile($file);
    #?# $self->lock;
    my $serialized = do {
        # three-argument open: the filename must never be parsed for a mode
        open my $fh, '<', $file or die "Could not open '$file': $!";
        local $/;
        <$fh>;
    };
    # XXX: we can skip this step when the metadata are sufficient, but
    # we cannot parse the file without some magic stuff about
    # serialized formats
    while (-l $file) {
        my(undef, $dir) = fileparse $file;
        my $symlink = readlink $file;
        if ($symlink =~ m|/|) {
            die "FIXME: filenames containing '/' not supported, got $symlink";
        }
        $file = File::Spec->catfile ( $dir, $symlink );
    }
    my(undef, $path, $suffix) = fileparse $file, keys %serializers;
    # validate before the suffix is handed to any accessor
    die "Could not determine file format from suffix" unless $suffix;
    $self->serializer_suffix($suffix);
    $self->localroot($path);
    my $deserialized;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        $deserialized = YAML::Syck::LoadFile($file);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $deserialized = $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    while (my($k,$v) = each %{$deserialized->{meta}}) {
        next if $k ne lc $k; # "Producers"
        $self->$k($v);
    }
    unless (defined $self->protocol) {
        $self->protocol(DEFAULT_PROTOCOL);
    }
    return $self;
}
191 =head2 DESTROY
193 A simple unlock.
195 =cut
# Destructor: calls unlock on the object that is going away.
sub DESTROY {
    my($self) = @_;
    $self->unlock;
}
198 =head1 ACCESSORS
200 =cut
# Names of all accessor methods of this class; filled at compile time by
# the BEGIN block below.
202 my @accessors;
# The private (underscore-prefixed) accessors are listed explicitly.
204 BEGIN {
205 @accessors = (
206 "_current_tempfile",
207 "_current_tempfile_fh",
208 "_delayed_operations",
209 "_done",
210 "_interval",
211 "_is_locked",
212 "_localroot",
213 "_merged",
214 "_pathdb",
215 "_remember_last_uptodate_call",
216 "_remote_dir",
217 "_remoteroot",
218 "_rfile",
219 "_rsync",
220 "_seeded",
221 "_uptodateness_ever_reached",
222 "_use_tempfile",
# The public accessor names are not repeated here: the heredoc started on
# the next statement swallows the POD that follows, up to its "=cut" line,
# and every line starting with "=item" contributes the rest of that line
# as an accessor name. Nothing may be inserted after that statement.
225 my @pod_lines =
226 split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
228 =over 4
230 =item aggregator
232 A list of interval specs that tell the aggregator which I<recentfile>s
233 are to be produced.
235 =item canonize
237 The name of a method to canonize the path before rsyncing. Only
238 supported value is C<naive_path_normalize>. Defaults to that.
240 =item comment
242 A comment about this tree and setup.
244 =item dirtymark
246 A timestamp. The dirtymark is updated whenever an out of band change
247 on the origin server is performed that violates the protocol. Say,
248 they add or remove files in the middle somewhere. Slaves must react
249 with a devaluation of their C<done> structure which then leads to a
250 full re-sync of all files.
252 =item filenameroot
254 The (prefix of the) filename we use for this I<recentfile>. Defaults to
255 C<RECENT>.
257 =item have_mirrored
259 Timestamp remembering when we mirrored this recentfile the last time.
260 Only relevant for slaves.
262 =item ignore_link_stat_errors
264 If set to true, rsync errors are ignored that complain about link stat
265 errors. These seem to happen only when there are files missing at the
266 origin. In race conditions this can always happen, so it is
267 recommended to set this value to true.
269 =item is_slave
271 If set to true, this object will fetch a new recentfile from remote
272 when the timespan between the last mirror (see have_mirrored) and now
273 is too large (currently hardcoded arbitrary 420 seconds).
275 =item locktimeout
277 After how many seconds shall we die if we cannot lock a I<recentfile>?
278 Defaults to 600 seconds.
280 =item loopinterval
282 When mirror_loop is called, this accessor can specify how much time
283 every loop shall at least take. If the work of a loop is done before
284 that time has gone, sleeps for the rest of the time. Defaults to
285 arbitrary 42 seconds.
287 =item max_files_per_connection
289 Maximum number of files that are transferred on a single rsync call.
290 Setting it higher means higher performance at the price of holding
291 connections longer and potentially disturbing other users in the pool.
292 Defaults to the arbitrary value 42.
294 =item max_rsync_errors
296 When rsync operations encounter that many errors without any resetting
297 success in between, then we die. Defaults to arbitrary 12. A value of
298 -1 means we run forever ignoring all rsync errors.
300 =item minmax
302 Hashref remembering when we read the recent_events from this file the
303 last time and what the timespan was.
305 =item protocol
307 When the RECENT file format changes, we increment the protocol. We try
308 to support older protocols in later releases.
310 =item remote_host
312 The host we are mirroring from. Leave empty for the local filesystem.
314 =item remote_module
316 Rsync servers have so called modules to separate directory trees from
317 each other. Put here the name of the module under which we are
318 mirroring. Leave empty for local filesystem.
320 =item rsync_options
322 Things like compress, links, times or checksums. Passed in to the
323 File::Rsync object used to run the mirror.
325 =item serializer_suffix
327 Mostly untested accessor. The only well tested format for
328 I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
329 Data::Serializer. But in principle other formats are supported as
330 well. See section SERIALIZERS below.
332 =item sleep_per_connection
334 Sleep that many seconds (floating point OK) after every chunk of rsyncing
335 has finished. Defaults to arbitrary 0.42.
337 =item ttl
339 Time to live. Number of seconds after which this recentfile must be
340 fetched again from the origin server. Only relevant for slaves.
341 Defaults to arbitrary 24.2 seconds.
343 =item verbose
345 Boolean to turn on a bit of verbosity.
347 =back
349 =cut
351 use accessors @accessors;
353 =head1 METHODS
355 =head2 (void) $obj->aggregate
357 Takes all intervals that are collected in the accessor called
358 aggregator. Sorts them by actual length of the interval.
359 Removes those that are shorter than our own interval. Then merges this
360 object into the next larger object. The merging continues upwards
361 as long as the next I<recentfile> is old enough to warrant a merge.
363 Whether a merge is warranted is decided according to the interval of
364 the previous (smaller) I<recentfile>, so that larger files are not
365 updated as often as smaller ones.
367 Here is an example to illustrate the behaviour. Given aggregators
369 1h 1d 1W 1M 1Q 1Y Z
371 then
373 1h updates 1d on every call to aggregate()
374 1d updates 1W earliest after 1h
375 1W updates 1M earliest after 1d
376 1M updates 1Q earliest after 1W
377 1Q updates 1Y earliest after 1M
378 1Y updates Z earliest after 1Q
380 Note that all but the smallest recentfile get updated at an arbitrary
381 rate and as such are quite useless on their own.
383 =cut
# Merges this object upwards through the chain of larger recentfiles.
#
# The own interval plus all intervals from the aggregator accessor are
# sorted by length; those shorter than the own interval are dropped. The
# first step (own file into the next larger one) always happens. Every
# further step happens only if the target recentfile does not exist yet or
# is older than the interval of the file two steps below it in the chain;
# the first step that is not due ends the whole run.
sub aggregate {
    my($self) = @_;
    my @candidates =
        map { +{ interval => $_, secs => $self->interval_secs($_) } }
            ($self->interval, @{ $self->aggregator || [] });
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
        grep { $_->{secs} >= $self->interval_secs } @candidates;
    $aggs[0]{object} = $self;
    for my $idx (0 .. $#aggs - 1) {
        my $smaller = $aggs[$idx]{object};
        my $larger  = $smaller->_sparse_clone;
        $larger->interval($aggs[$idx + 1]{interval});
        my $merge_due = 1;
        if ($idx > 0) {
            my $larger_rfile = $larger->rfile;
            if (-e $larger_rfile) {
                my $two_below = $aggs[$idx - 1]{object};
                local $^T = time; # make -M measure the age relative to now
                my $age_secs = 86400 * -M $larger_rfile;
                $merge_due = 0 unless $age_secs > $two_below->interval_secs;
            }
        }
        last unless $merge_due;
        $larger->merge($smaller);
        $aggs[$idx + 1]{object} = $larger;
    }
    return;
}
# Debugging aid: collects file name, size and mtime for every recentfile
# of this aggregate (own interval plus all aggregator intervals, sorted by
# length). Returns an arrayref of hashrefs with the keys rfile, size and
# mtime; size and mtime are undef for files that cannot be stat'ed.
sub _debug_aggregate {
    my($self) = @_;
    my @specs = ($self->interval, @{ $self->aggregator || [] });
    my @by_length = sort { $a->{secs} <=> $b->{secs} }
        map { +{ interval => $_, secs => $self->interval_secs($_) } } @specs;
    my @report;
    for my $agg (@by_length) {
        my $clone = Storable::dclone($self);
        $clone->interval($agg->{interval});
        my $rfile = $clone->rfile;
        my($size, $mtime) = (stat $rfile)[7, 9];
        push @report, { rfile => $rfile, size => $size, mtime => $mtime };
    }
    return \@report;
}
# (void) $self->_assert_symlink()
#
# Makes sure that "<filenameroot>.recent" in localroot refers to this
# object's recentfile. Where symlinks are available the link is created
# directly, or replaced through a temporary name plus rename when it
# points elsewhere. Without symlink support the recentfile is copied
# instead. Dies if any of the symlink/copy/rename steps fails.
sub _assert_symlink {
    my($self) = @_;
    my $recentrecentfile = File::Spec->catfile
        (
         $self->localroot,
         sprintf("%s.recent", $self->filenameroot),
        );
    # d_symlink is not guaranteed to be defined on every platform
    if (($Config{d_symlink} || "") eq "define") {
        my $howto_create_symlink; # 0=no need; 1=straight symlink; 2=rename symlink
        if (-l $recentrecentfile) {
            my $found_symlink = readlink $recentrecentfile;
            if ($found_symlink eq $self->rfilename) {
                return;
            } else {
                $howto_create_symlink = 2;
            }
        } else {
            $howto_create_symlink = 1;
        }
        if (1 == $howto_create_symlink) {
            symlink $self->rfilename, $recentrecentfile or die "Could not create symlink '$recentrecentfile': $!"
        } else {
            unlink "$recentrecentfile.$$"; # may fail
            symlink $self->rfilename, "$recentrecentfile.$$" or die "Could not create symlink '$recentrecentfile.$$': $!";
            rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
        }
    } else {
        warn "Warning: symlinks not supported on this system, doing a copy instead\n";
        unlink "$recentrecentfile.$$"; # may fail
        # A symlink target is resolved relative to the link's directory, so
        # the bare rfilename is right above. A copy needs the real path of
        # the source, otherwise it only works when cwd happens to be
        # localroot.
        cp $self->rfile, "$recentrecentfile.$$" or die "Could not copy to '$recentrecentfile.$$': $!";
        rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
    }
}
477 =head2 $hashref = $obj->delayed_operations
479 A hash of hashes containing unlink and rmdir operations which had to
480 wait until the recentfile got unhidden in order to not confuse
481 downstream mirrors (in case we have some).
483 =cut
# Returns the hashref of postponed unlink and rmdir operations, creating
# and storing an empty one ({ unlink => {}, rmdir => {} }) on first use.
sub delayed_operations {
    my($self) = @_;
    my $ops = $self->_delayed_operations;
    return $ops if defined $ops;
    $ops = { unlink => {}, rmdir => {} };
    $self->_delayed_operations($ops);
    return $ops;
}
498 =head2 $done = $obj->done
500 $done is a reference to a File::Rsync::Mirror::Recentfile::Done object
501 that keeps track of rsync activities. Only needed and used when we are
502 a mirroring slave.
504 =cut
# Returns the File::Rsync::Mirror::Recentfile::Done object of this
# recentfile. On first use the class is loaded, an instance is created,
# told our interval via _rfinterval, and remembered in _done.
sub done {
    my($self) = @_;
    my $tracker = $self->_done;
    return $tracker if $tracker;
    require File::Rsync::Mirror::Recentfile::Done;
    $tracker = File::Rsync::Mirror::Recentfile::Done->new();
    $tracker->_rfinterval ($self->interval);
    $self->_done ( $tracker );
    return $tracker;
}
518 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
520 Stores the remote I<recentfile> locally as a tempfile. The caller is
521 responsible to remove the file after use.
523 Note: if you're intending to act as an rsync server for other slaves,
524 then you must prefer this method over fetching that file with
525 get_remotefile(). Otherwise downstream mirrors would expect you to
526 have already mirrored all the files listed in the I<recentfile>
527 before you actually have them.
529 =cut
# Fetches the remote recentfile into a tempfile below localroot and
# returns the tempfile's name.
#
# In tempfile mode (_use_tempfile true) the current tempfile is returned
# untouched as long as ttl_reached is false, and reused as rsync target
# otherwise. When a new tempfile has to be created it is pre-seeded with a
# copy of the existing local recentfile, if there is one. rsync is tried
# up to three times; have_mirrored is only updated when it succeeded. The
# file is chmod'ed to 0644 in either case.
sub get_remote_recentfile_as_tempfile {
    my($self) = @_;
    mkpath $self->localroot;
    my $fh;
    if ($self->_use_tempfile()) {
        return $self->_current_tempfile unless $self->ttl_reached;
        $fh = $self->_current_tempfile_fh;
    }
    my $trfilename = $self->rfilename;

    my $dst;
    if ($fh) {
        $dst = $self->_current_tempfile;
    } else {
        # $fh stays in scope until we return, as in the tempfile-reuse case
        $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
        $dst = $fh->filename;
        $self->_current_tempfile ($dst);
        my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
        if (defined $rfile && -e $rfile) {
            # saving on bandwidth. Might need to be configurable
            # $self->bandwidth_is_cheap?
            cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
        }
    }
    my $src = join "/", $self->remoteroot, $trfilename;
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        my $display_dst = join "/", "...", basename(dirname($dst)), basename($dst);
        printf STDERR "%-4s %d (1/1/%s) temp %s ... ",
            $doing, time, $self->interval, $display_dst;
    }
    my $gaveup = 0;
    my $retried = 0;
    until ($self->rsync->exec(src => $src, dst => $dst)) {
        $self->register_rsync_error ($self->rsync->err);
        next if ++$retried < 3;
        warn "XXX giving up";
        $gaveup = 1;
        last;
    }
    if (not $gaveup) {
        $self->have_mirrored (Time::HiRes::time);
        $self->un_register_rsync_error ();
    }
    print STDERR "DONE\n" if $self->verbose;
    my $mode = 0644;
    chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
    return $dst;
}
# Creates the File::Temp object that receives the remote recentfile. The
# file lives in localroot, is named ".FRMRecent-<trfilename>-XXXX" plus
# the serializer suffix, and is unlinked on destruction only in tempfile
# mode. In tempfile mode the object is also stored in _current_tempfile_fh
# so that it outlives the caller.
sub _get_remote_rat_provide_tempfile_object {
    my($self, $trfilename) = @_;
    my @tempfile_args = (
        TEMPLATE => sprintf(".FRMRecent-%s-XXXX", $trfilename),
        DIR      => $self->localroot,
        SUFFIX   => $self->serializer_suffix,
        UNLINK   => $self->_use_tempfile,
    );
    my $fh = File::Temp->new(@tempfile_args);
    $self->_current_tempfile_fh ($fh) if $self->_use_tempfile; # delay self destruction
    return $fh;
}
615 =head2 $localpath = $obj->get_remotefile ( $relative_path )
617 Rsyncs one single remote file to local filesystem.
619 Note: no locking is done on this file. Any number of processes may
620 mirror this object.
622 Note II: do not use for recentfiles. If you are a cascading
623 slave/server combination, it would confuse other slaves. They would
624 expect the contents of these recentfiles to be available. Use
625 get_remote_recentfile_as_tempfile() instead.
627 =cut
# Rsyncs the single remote file $path (relative to the remote root) to
# the same relative path below localroot and returns the local path. The
# rsync call is repeated until it succeeds; every failure is reported to
# register_rsync_error.
sub get_remotefile {
    my($self, $path) = @_;
    my $dst = File::Spec->catfile($self->localroot, $path);
    mkpath dirname $dst;
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        printf STDERR "%-4s %d (1/1/%s) %s ... ",
            $doing, time, $self->interval, $path;
    }
    until ($self->rsync->exec(
        src => join("/", $self->remoteroot, $path),
        dst => $dst,
    )) {
        $self->register_rsync_error ($self->rsync->err);
    }
    $self->un_register_rsync_error ();
    print STDERR "DONE\n" if $self->verbose;
    return $dst;
}
659 =head2 $obj->interval ( $interval_spec )
661 Get/set accessor. $interval_spec is a string and described below in
662 the section INTERVAL SPEC.
664 =cut
# Get/set accessor for the interval spec. Setting a value also resets the
# cached _rfile. Confesses when no interval is defined, be it on get or
# after setting undef.
sub interval {
    my $self = shift;
    if (@_) {
        $self->_interval($_[0]);
        $self->_rfile(undef);
    }
    my $current = $self->_interval;
    return $current if defined $current;
    # do not ask the $self too much, it recurses!
    require Carp;
    Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
}
681 =head2 $secs = $obj->interval_secs ( $interval_spec )
683 $interval_spec is described below in the section INTERVAL SPEC. If
684 empty defaults to the inherent interval for this object.
686 =cut
# Translates an interval spec into seconds. Without an argument (or with
# a false one) the object's own interval is used. The spec is an optional
# count followed by one unit letter out of smhdWMQYZ; a bare "Z" yields
# MAX_INT, everything else is count times the %seconds entry of the unit.
# Dies on anything that does not fit.
sub interval_secs {
    my ($self, $interval) = @_;
    $interval ||= $self->interval;
    die "interval_secs() called without argument on an object without a declared one"
        unless defined $interval;
    my ($count, $unit) = $interval =~ /^(\d*)([smhdWMQYZ]$)/
        or die "Could not determine seconds from interval[$interval]";
    return MAX_INT if $interval eq "Z";
    return $seconds{$unit} * $count
        if exists $seconds{$unit} and $count =~ /^\d+$/;
    die "Invalid interval specification: n[$count]t[$unit]";
}
705 =head2 $obj->localroot ( $localroot )
707 Get/set accessor. The local root of the tree.
709 =cut
# Get/set accessor for the local root of the tree. Setting a value also
# resets the cached _rfile. Always returns the current value.
sub localroot {
    my $self = shift;
    if (@_) {
        my($localroot) = @_;
        $self->_localroot($localroot);
        $self->_rfile(undef);
    }
    my $current = $self->_localroot;
    return $current;
}
720 =head2 $ret = $obj->local_path($path_found_in_recentfile)
722 Combines the path to our local mirror and the path of an object found
723 in this I<recentfile>. In other words: the target of a mirror operation.
725 Implementation note: We split on slashes and then use
726 File::Spec::catfile to adjust to the local operating system.
728 =cut
# Maps a path as found in the recentfile to the corresponding path below
# localroot: the path is split on slashes and put together again with
# File::Spec->catfile. Without a path, localroot itself is returned.
sub local_path {
    my($self,$path) = @_;
    # seems like a degenerated case
    return $self->localroot unless defined $path;
    my @segments = split m|/|, $path;
    return File::Spec->catfile($self->localroot, @segments);
}
740 =head2 (void) $obj->lock
742 Locking is implemented with an C<mkdir> on a locking directory
743 (C<.lock> appended to $rfile).
745 =cut
# Acquires the lock for this recentfile by creating the directory
# "<rfile>.lock". Does nothing if the object already holds the lock.
# mkdir is retried every 0.01 seconds; once more than locktimeout seconds
# (default 600) have passed we die.
sub lock {
    my ($self) = @_;
    # not using flock because it locks on filehandles instead of
    # old school resources.
    return if $self->_is_locked;
    my $lockdir = $self->rfile . ".lock";
    # XXX need a way to allow breaking the lock
    my $began = time;
    my $locktimeout = $self->locktimeout || 600;
    until (mkdir $lockdir) {
        Time::HiRes::sleep 0.01;
        die "Could not acquire lockdirectory '$lockdir': $!"
            if time - $began > $locktimeout;
    }
    $self->_is_locked (1);
}
765 =head2 (void) $obj->merge ($other)
767 Bulk update of this object with another one. It's used to merge a
768 smaller and younger $other object into the current one. If this file
769 is a C<Z> file, then we do not merge in objects of type C<delete>. But
770 if we encounter an object of type delete we delete the corresponding
771 C<new> object if we have it.
773 If there is nothing to be merged, nothing is done.
775 =cut
# Merges the events of the smaller recentfile $other into this one.
#
# Both objects are locked for the duration. Own events older than the
# allowed time span are dropped from the end of the list; that span is
# only computed when this object carries a "merged" record. Events of
# $other that are older than that limit, or whose path was already seen,
# are skipped, and so are "delete" events when we are the Z file. The
# result is only written out (via _merge_something_done) if something
# changed: we were empty, own events were dropped, or $other carried an
# event newer than our newest one.
sub merge {
    my($self, $other) = @_;
    $self->_merge_sanitycheck ( $other );
    $other->lock;
    my $theirs = $other->recent_events || [];
    $self->lock;
    my $mine = $self->recent_events || [];

    # calculate the target time span
    my $newest_mine  = $mine->[0]   ? $mine->[0]{epoch}   : undef;
    my $newest_epoch = $theirs->[0] ? $theirs->[0]{epoch} : $newest_mine;
    my $cutoff = 0;
    my $changed = $mine->[0] ? undef : 1; # obstetrics
    if ($newest_epoch) {
        my $merged = $self->merged;
        if ($merged) {
            $cutoff = min($newest_epoch - $self->interval_secs(), $merged->{epoch});
        }
        # throw away outsiders
        # XXX _bigfloat!
        while (@$mine && $mine->[-1]{epoch} < $cutoff) {
            pop @$mine;
            $changed = 1;
        }
    }

    my(%have, @kept);
    for my $event (@$theirs) {
        my $event_epoch = $event->{epoch} || 0;
        next if $event_epoch < $cutoff;
        my $path = $event->{path};
        next if $have{$path}++;
        # the Z file does not record deletes
        next if $self->interval eq "Z" and $event->{type} eq "delete";
        $changed = 1 if !$newest_mine || $event_epoch > $newest_mine;
        push @kept, { epoch => $event->{epoch}, path => $path, type => $event->{type} };
    }
    if ($changed) {
        $self->_merge_something_done (\@kept, $mine, $theirs, $other, \%have, $newest_epoch);
    }
    $self->unlock;
    $other->unlock;
}
# Second half of merge(), run only when something changed. Appends those
# own events whose path is not yet in %$have to @$recent, takes over the
# dirtymark of $other if it is greater than ours, writes @$recent as our
# new content, and finally stamps $other with a "merged" record and
# rewrites it.
sub _merge_something_done {
    my($self, $recent, $my_recent, $other_recent, $other, $have, $epoch) = @_;
    for my $own_event (@$my_recent) {
        push @$recent, $own_event unless $have->{ $own_event->{path} }++;
    }
    $self->dirtymark ( $other->dirtymark )
        if _bigfloatgt($other->dirtymark, $self->dirtymark);
    $self->write_recent($recent);
    my %merge_record = (
        time          => Time::HiRes::time, # not used anywhere
        epoch         => $my_recent->[0]{epoch},
        into_interval => $self->interval,   # not used anywhere
    );
    $other->merged(\%merge_record);
    $other->write_recent($other_recent);
}
# Guards merge(): $other must cover a strictly smaller interval than
# $self, because only smaller files are merged into bigger ones. Dies
# otherwise, reporting the interval of the object that was to be merged
# ($other) as the bigger one and our own as the smaller one.
sub _merge_sanitycheck {
    my($self, $other) = @_;
    my $target_secs = $self->interval_secs;
    my $source_secs = $other->interval_secs;
    if ($target_secs <= $source_secs) {
        # argument order follows the wording of the message: the source
        # ($other) is the "bigger" one, the target ($self) the "smaller"
        die sprintf
            (
             "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
             $source_secs,
             $target_secs,
            );
    }
}
858 =head2 merged
860 Hashref denoting when this recentfile has been merged into some other
861 recentfile, and at which epoch.
863 =cut
# Get/set accessor for the "merged" record. A defined argument is stored
# first. If the record names an into_interval and we have an interval of
# our own, a warning with stack trace is issued when into_interval equals
# our interval or is shorter than it. Returns the stored record.
sub merged {
    my($self, $set) = @_;
    $self->_merged ($set) if defined $set;
    my $merged = $self->_merged;
    my $into = $merged && $merged->{into_interval};
    if ($into and defined $self->_interval) {
        my $complaint;
        if ($into eq $self->interval) {
            $complaint = sprintf
                (
                 "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
                 $into,
                 $self->interval,
                );
        } elsif ($self->interval_secs($into) < $self->interval_secs) {
            $complaint = sprintf
                (
                 "Warning: into_interval[%s] smaller than own interval[%s] on interval[%s]. Danger ahead.",
                 $self->interval_secs($into),
                 $self->interval_secs,
                 $self->interval,
                );
        }
        if (defined $complaint) {
            require Carp;
            Carp::cluck($complaint);
        }
    }
    return $merged;
}
895 =head2 $hashref = $obj->meta_data
897 Returns the hashref of metadata that the server has to add to the
898 I<recentfile>.
900 =cut
# Returns the metadata hashref for the recentfile. Starting from
# $self->{meta}, every listed accessor with a defined value overwrites the
# entry of the same name. Producers (package version, $0 and the current
# time) and dirtymark (current time) are only filled in when they have no
# true value yet.
sub meta_data {
    my($self) = @_;
    my $meta = $self->{meta};
    my @fields = qw(
        aggregator canonize comment dirtymark filenameroot
        merged interval protocol serializer_suffix
    );
    for my $field (@fields) {
        my $value = $self->$field;
        next unless defined $value;
        $meta->{$field} = $value;
    }
    # XXX need to reset the Producer if I am a writer, keep it when I
    # am a reader
    $meta->{Producers} ||= {
                            __PACKAGE__, "$VERSION", # stringified it looks better
                            '$0', $0,
                            'time', Time::HiRes::time,
                           };
    $meta->{dirtymark} ||= Time::HiRes::time;
    return $meta;
}
932 =head2 $success = $obj->mirror ( %options )
934 Mirrors the files in this I<recentfile> as reported by
935 C<recent_events>. Options named C<after>, C<before>, C<max>, and
936 C<skip-deletes> are passed through to the L<recent_events> call. The
937 boolean option C<piecemeal>, if true, causes C<mirror> to only rsync
938 C<max_files_per_connection> and keep track of the rsynced files so
939 that future calls will rsync different files until all files are
940 brought to sync.
942 =cut
# Mirrors the files listed in this recentfile.
#
# The remote recentfile is fetched as a tempfile and tempfile mode is
# switched on. The options before, after, max and skip-deletes are handed
# to recent_events; every event is then passed to _mirror_item, which
# queues paths in @xcollector. If an item asks for it through
# $status->{mustreturn} we return immediately with an empty result.
# Otherwise the remaining queue is flushed, the object is unseeded, the
# tempfile is renamed into place and the delayed unlink/rmdir operations
# are run. Returns true if no error was collected.
sub mirror {
    my($self, %options) = @_;
    my $trecentfile = $self->get_remote_recentfile_as_tempfile();
    $self->_use_tempfile (1);
    my @passed_on = qw(before after max skip-deletes);
    my %passthrough;
    @passthrough{@passed_on} = @options{@passed_on};
    my ($recent_events) = $self->recent_events(%passthrough);
    my(@error, @xcollector);
    my $last_item = $#$recent_events;
    my $done = $self->done;
    my $pathdb = $self->_pathdb;
    for my $i (0 .. $last_item) {
        my %status;
        $self->_mirror_item
            (
             $i,
             $recent_events,
             $last_item,
             $done,
             $pathdb,
             \@xcollector,
             \%options,
             \%status,
             \@error,
            );
        return if $status{mustreturn};
    }
    if (@xcollector) {
        my $success = eval { $self->_mirror_empty_xcollector (\@xcollector,$pathdb,$recent_events);};
        if (!$success || $@) {
            warn "Warning: Unknown error while mirroring: $@";
            push @error, $@;
            sleep 1;
        }
        print STDERR "DONE\n" if $self->verbose;
    }
    # once we've gone to the end we consider ourselves free of obligations
    $self->unseed;
    $self->_mirror_unhide_tempfile ($trecentfile);
    $self->_mirror_perform_delayed_ops;
    return !@error;
}
# Handles the single event $recent_events->[$i] on behalf of mirror().
#
# Nothing happens if $done already covers the event's epoch. With a
# $pathdb, an event is merely registered as done when the pathdb holds a
# greater recentepoch for the same path. "new" events are handed to
# _mirror_item_new. "delete" events are queued as delayed unlink or rmdir
# (unless skip-deletes is set or the target does not exist), registered as
# done and recorded in the pathdb. Any other type only yields a warning.
sub _mirror_item {
    my($self,
       $i,
       $recent_events,
       $last_item,
       $done,
       $pathdb,
       $xcollector,
       $options,
       $status,
       $error,
      ) = @_;
    my $recent_event = $recent_events->[$i];
    return if $done->covered ( $recent_event->{epoch} );
    if ($pathdb) {
        my $known = $pathdb->{$recent_event->{path}};
        if ($known && $known->{recentepoch}
            && _bigfloatgt( $known->{recentepoch}, $recent_event->{epoch} )) {
            $done->register ($recent_events, [$i]);
            return;
        }
    }
    my $dst = $self->local_path($recent_event->{path});
    my $type = $recent_event->{type};
    if ($type eq "new") {
        $self->_mirror_item_new
            (
             $dst,
             $i,
             $last_item,
             $recent_events,
             $recent_event,
             $xcollector,
             $pathdb,
             $status,
             $error,
             $options,
            );
    } elsif ($type eq "delete") {
        my $activity;
        if ($options->{'skip-deletes'}) {
            $activity = "skipped";
        } elsif (! -e $dst) {
            $activity = "not_found";
        } else {
            # symlinks and everything that is not a directory get unlinked
            my $operation = (-l $dst or not -d _) ? "unlink" : "rmdir";
            $self->delayed_operations->{$operation}{$dst}++;
            $activity = "deleted";
        }
        $done->register ($recent_events, [$i]);
        if ($pathdb) {
            $self->_mirror_register_path($pathdb,[$recent_event],$activity);
        }
    } else {
        warn "Warning: invalid upload type '$recent_event->{type}'";
    }
}
# Handles one event of type "new" on behalf of _mirror_item().
#
# The event is appended to the queue @$xcollector. As long as the queue
# holds fewer than max_files_per_connection entries (default 42) nothing
# else happens. Once it is full it is flushed with a single rsync call,
# followed by a pause of sleep_per_connection seconds (default 0.42). In
# piecemeal mode $status->{mustreturn} is then set and we return right
# away; otherwise a failed flush is warned about, appended to @$error and
# followed by a one-second pause.
sub _mirror_item_new {
    my($self,
       $dst,
       $i,
       $last_item,
       $recent_events,
       $recent_event,
       $xcollector,
       $pathdb,
       $status,
       $error,
       $options,
      ) = @_;
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        printf STDERR
            (
             "%-4s %d (%d/%d/%s) %s ... ",
             $doing,
             time,
             1+$i,
             1+$last_item,
             $self->interval,
             $recent_event->{path},
            );
    }
    my $max_files_per_connection = $self->max_files_per_connection || 42;
    if ($self->verbose) {
        print STDERR "\n";
    }
    push @$xcollector, { rev => $recent_event, i => $i };
    return if @$xcollector < $max_files_per_connection;

    my $success = eval {$self->_mirror_empty_xcollector ($xcollector,$pathdb,$recent_events);};
    # $@ is a global: save it before the calls below get a chance to
    # reset it
    my $eval_error = $@;
    my $sleep = $self->sleep_per_connection;
    $sleep = 0.42 unless defined $sleep;
    Time::HiRes::sleep $sleep;
    if ($options->{piecemeal}) {
        $status->{mustreturn} = 1;
        return;
    }
    if (!$success || $eval_error) {
        warn "Warning: Error while mirroring: $eval_error";
        push @$error, $eval_error;
        sleep 1;
    }
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
}
# Flushes the queue @$xcoll: all queued paths are mirrored with a single
# mirror_path call, recorded in the pathdb (if there is one) with the
# activity "rsync", and registered as done. The queue is emptied in place.
# Returns what mirror_path returned.
sub _mirror_empty_xcollector {
    my($self,$xcoll,$pathdb,$recent_events) = @_;
    my @paths = map { $_->{rev}{path} } @$xcoll;
    my $success = $self->mirror_path(\@paths);
    if ($pathdb) {
        my @events = map { $_->{rev} } @$xcoll;
        $self->_mirror_register_path($pathdb, \@events, "rsync");
    }
    my @indexes = map { $_->{i} } @$xcoll;
    $self->done->register($recent_events, \@indexes);
    @$xcoll = ();
    return $success;
}
# Records in %$pathdb, for every event in @$coll, the event's epoch (as
# recentepoch) and the current time under the key "<activity>_on". An
# existing entry for the same path is replaced.
sub _mirror_register_path {
    my($self,$pathdb,$coll,$activity) = @_;
    my $stamp = time;
    my $activity_key = $activity."_on";
    for my $event (@$coll) {
        my %entry = (
            recentepoch   => $event->{epoch},
            $activity_key => $stamp,
        );
        $pathdb->{ $event->{path} } = \%entry;
    }
}
# Makes the freshly mirrored recentfile visible: renames the tempfile
# $trecentfile to rfile (confessing on failure), switches tempfile mode
# off and, if a tempfile handle is stored, tells it not to unlink the
# file on destruction before forgetting it.
sub _mirror_unhide_tempfile {
    my($self, $trecentfile) = @_;
    my $rfile = $self->rfile;
    rename($trecentfile, $rfile) or do {
        require Carp;
        Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
    };
    $self->_use_tempfile (0);
    my $ctfh = $self->_current_tempfile_fh;
    if ($ctfh) {
        $ctfh->unlink_on_destroy (0);
        $self->_current_tempfile_fh (undef);
    }
}
# Runs the postponed unlink and rmdir operations. A failing operation only
# yields a warning with stack trace; every entry is removed from the queue
# whether it succeeded or not.
sub _mirror_perform_delayed_ops {
    my($self) = @_;
    my $delayed = $self->delayed_operations;
    for my $file (keys %{ $delayed->{unlink} }) {
        unlink $file or do {
            require Carp;
            Carp::cluck ( "Warning: Error while unlinking '$file': $!" );
        };
        delete $delayed->{unlink}{$file};
    }
    for my $dir (keys %{ $delayed->{rmdir} }) {
        rmdir $dir or do {
            require Carp;
            Carp::cluck ( "Warning: Error on rmdir '$dir': $!" );
        };
        delete $delayed->{rmdir}{$dir};
    }
}
1163 =head2 (void) $obj->mirror_loop
1165 Run mirror in an endless loop. See the accessor C<loopinterval>. XXX
1166 What happens/should happen if we miss the interval during a single loop?
1168 =cut
# Runs mirror() in a loop until SIGINT is received.
#
# Every iteration takes at least loopinterval seconds (default 42); the
# remainder is slept away. Each mirror() call gets the option "after",
# set to the epoch of the newest event seen so far. That epoch is only
# advanced after a mirror() call that reported success, so that a failed
# run is retried over the same range. The previous SIGINT handler is
# restored when the loop is left.
sub mirror_loop {
    my($self) = @_;
    my $Signal = 0;
    # local: do not leave our counting handler installed after returning
    local $SIG{INT} = sub { $Signal++ };
    my $loopinterval = $self->loopinterval || 42;
    my $after = -999999999;
    LOOP: while () {
        # measured per iteration, otherwise only the first one ever sleeps
        my $iteration_start = time;
        # mirror() takes named options, a bare value would be lost
        my $success = $self->mirror(after => $after);
        last LOOP if $Signal;
        if ($success) {
            my $re = $self->recent_events;
            $after = $re->[0]{epoch} if $re && @$re;
        }
        if ($self->verbose) {
            local $| = 1;
            print "($after)";
        }
        my $remaining = $iteration_start + $loopinterval - time;
        sleep $remaining if $remaining > 0;
        # an interrupt during the sleep ends the loop as well
        last LOOP if $Signal;
        if ($self->verbose) {
            local $| = 1;
            print "~";
        }
    }
}
1197 =head2 $success = $obj->mirror_path ( $arrref | $path )
1199 If the argument is a scalar it is treated as a path. The remote path
1200 is mirrored into the local copy. $path is the path found in the
1201 I<recentfile>, i.e. it is relative to the root directory of the
1202 mirror.
1204 If the argument is an array reference then all elements are treated as
1205 a path below the current tree and all are rsynced with a single
1206 command (and a single connection).
1208 =cut
# Mirrors one path (scalar) or several paths (array reference) from the
# remote root into the local tree.
#
# With an array reference all paths are written to a temporary file that
# is handed to rsync as 'files-from', so one connection serves them all;
# rsync is tried up to three times. With a scalar the single file is
# rsynced until it succeeds. Every failure is reported to
# register_rsync_error, except link_stat errors while
# ignore_link_stat_errors is set: those count as success.
#
# Returns 1 on success and 0 if the array variant gave up after the third
# failed attempt.
sub mirror_path {
    my($self,$path) = @_;
    # XXX simplify the two branches such that $path is treated as
    # [$path] maybe even demand the argument as an arrayref to
    # simplify docs and code. (rsync-over-recentfile-2.pl uses the
    # interface)
    if (ref $path and ref $path eq "ARRAY") {
        my $dst = $self->localroot;
        mkpath dirname $dst;
        my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
                                                      lc $self->filenameroot,
                                                     ),
                                  TMPDIR => 1,
                                  UNLINK => 0,
                                 );
        for my $p (@$path) {
            print $fh $p, "\n";
        }
        $fh->flush;
        $fh->unlink_on_destroy(1);
        my $gaveup = 0;
        my $retried = 0;
        while (!$self->rsync->exec
               (
                src => join("/",
                            $self->remoteroot,
                           ),
                dst => $dst,
                'files-from' => $fh->filename,
               )) {
            my($err) = $self->rsync->err;
            if ($self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x ) {
                if ($self->verbose) {
                    warn "Info: ignoring link_stat error '$err'";
                }
                return 1;
            }
            $self->register_rsync_error ($err);
            if (++$retried >= 3) {
                warn "XXX giving up.";
                $gaveup = 1;
                last;
            }
        }
        # giving up is a failure and must be visible to the caller
        return 0 if $gaveup;
        $self->un_register_rsync_error ();
    } else {
        my $dst = $self->local_path($path);
        mkpath dirname $dst;
        while (!$self->rsync->exec
               (
                src => join("/",
                            $self->remoteroot,
                            $path
                           ),
                dst => $dst,
               )) {
            my($err) = $self->rsync->err;
            if ($self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x ) {
                if ($self->verbose) {
                    warn "Info: ignoring link_stat error '$err'";
                }
                return 1;
            }
            $self->register_rsync_error ($err);
        }
        $self->un_register_rsync_error ();
    }
    return 1;
}
# Name of the file that currently holds our recent events: the current
# tempfile when _use_tempfile is true, the regular rfile otherwise.
sub _my_current_rfile {
    my($self) = @_;
    my $rfile = $self->_use_tempfile
        ? $self->_current_tempfile
        : $self->rfile;
    return $rfile;
}
1293 =head2 $path = $obj->naive_path_normalize ($path)
1295 Takes an absolute unix style path as argument and canonicalizes it to
1296 a shorter path if possible, removing things like double slashes or
1297 C</./> and removes references to C<../> directories to get a shorter
1298 unambiguous path. This is used to simplify the code that determines
1299 if a file passed to C<update()> is indeed below our C<localroot>.
1301 =cut
# Purely textual canonicalization of an absolute unix style path; the
# filesystem is never consulted.
#
#   - runs of slashes become a single slash
#   - "/./" segments and a trailing "/." are dropped
#   - "/name/../" pairs are folded away
#   - a trailing slash is removed (so "/" itself becomes "")
#
# Returns the normalized path.
sub naive_path_normalize {
    my($self,$path) = @_;
    $path =~ s|/+|/|g;
    # "." segments have to go before ".." is resolved, otherwise
    # "/a/./../b" would lose the "." instead of the "a".
    1 while $path =~ s|/\./|/|;
    $path =~ s|/\.$|/|;
    1 while $path =~ s|/[^/]+/\.\./|/|;
    $path =~ s|/$||;
    $path;
}
1311 =head2 $ret = $obj->read_recent_1 ( $data )
1313 Delegate of C<recent_events()> on protocol 1
1315 =cut
# Protocol 1 reader: the event list is stored under the "recent" key of
# the deserialized structure.
sub read_recent_1 {
    my $self = shift;
    my($data) = @_;
    return $data->{recent};
}
1322 =head2 $array_ref = $obj->recent_events ( %options )
1324 Note: the code relies on the resource being written atomically. We
1325 cannot lock because we may have no write access. If the caller has
1326 write access (eg. aggregate() or update()), it has to care for any
1327 necessary locking.
1329 If $options{after} is specified, only file events after this timestamp
1330 are returned.
1332 If $options{before} is specified, only file events before this
1333 timestamp are returned.
1335 If $options{'skip-deletes'} is specified, no files-to-be-deleted will
1336 be returned.
1338 If $options{max} is specified only this many events are returned.
1340 If $options{info} is specified, it must be a hashref. This hashref
1341 will be filled with metadata about the unfiltered recent_events of
1342 this object, in key C<first> there is the first item, in key C<last>
1343 is the last.
1345 =cut
# Read the current recentfile and return its events as an arrayref.
#
# A slave object first fetches the remote recentfile into a tempfile.
# An empty arrayref is returned when there is no file, when it cannot
# be deserialized, or when its content is not a reference. A top level
# array is taken as protocol 0, anything else is handed to
# _recent_events_protocol_x.
#
# %options: after, before, max, skip-deletes, info. Any of them being
# defined routes the list through _recent_events_handle_options. The
# original only looked at after/before/max, so 'skip-deletes' and
# 'info' on their own were silently ignored.
sub recent_events {
    my ($self, %options) = @_;
    if ($self->is_slave) {
        $self->get_remote_recentfile_as_tempfile;
    }
    my $rfile_or_tempfile = $self->_my_current_rfile or return [];
    -e $rfile_or_tempfile or return [];
    my $suffix = $self->serializer_suffix;
    my ($data) = eval {
        $self->_try_deserialize
            (
             $suffix,
             $rfile_or_tempfile,
            );
    };
    my $err = $@;
    # A plain scalar cannot be a recentfile; treat it like any other
    # unreadable file instead of dying below on a bad dereference.
    if ($err or !$data or !ref $data) {
        return [];
    }
    my $re;
    if (reftype $data eq 'ARRAY') { # protocol 0
        $re = $data;
    } else {
        $re = $self->_recent_events_protocol_x
            (
             $data,
             $rfile_or_tempfile,
            );
    }
    return $re unless grep {defined $options{$_}} qw(after before max skip-deletes info);
    $self->_recent_events_handle_options ($re, \%options);
}
# Apply the filtering options of recent_events to an event list.
#
#   $re       arrayref of events, newest first; it is left unmodified
#   $options  hashref with the optional keys
#               info          hashref that receives the unfiltered
#                             first and last event
#               after         keep events with epoch >  after
#               before        keep events with epoch <  before
#               skip-deletes  drop events whose type is "delete"
#               max           keep at most this many events
#
# Returns a new arrayref. Epochs are compared numerically here.
sub _recent_events_handle_options {
    my($self, $re, $options) = @_;
    my $info = $options->{info};
    if ($info) {
        $info->{first} = $re->[0];
        $info->{last} = $re->[-1];
    }
    my $first_item = 0;
    my $last_item = $#$re;
    # Nothing below may look at $re->[0] of an empty list, that would
    # autovivify a bogus event into it.
    if (@$re and defined $options->{after}) {
        if ($re->[0]{epoch} > $options->{after}) {
            my $f = first
                {$re->[$_]{epoch} <= $options->{after}}
                    0..$#$re;
            $last_item = $f-1 if defined $f;
        } else {
            $last_item = -1;
        }
    }
    if ($last_item >= 0
        and defined $options->{before}
        and $re->[0]{epoch} > $options->{before}) {
        my $f = first
            {$re->[$_]{epoch} < $options->{before}}
                0..$last_item;
        # No event older than the limit means an empty result; the
        # original fell back to index 0 and returned everything.
        $first_item = defined $f ? $f : $last_item + 1;
    }
    # a slice, not a splice: the caller's list stays intact
    my @rre = @{$re}[$first_item .. $last_item];
    if ($options->{'skip-deletes'}) {
        @rre = grep { $_->{type} ne "delete" } @rre;
    }
    if ($options->{max} && @rre > $options->{max}) {
        @rre = splice @rre, 0, $options->{max};
    }
    \@rre;
}
# Handle a deserialized recentfile of protocol >= 1.
#
# Picks the reader method "read_recent_<protocol>" from the meta part,
# copies every lowercase meta entry into the accessor of the same name
# unless that accessor already holds a defined value, and records the
# file's mtime plus the oldest and newest epoch via $self->minmax.
# Dies if the file cannot be stat'ed. Returns the event arrayref.
sub _recent_events_protocol_x {
    my($self, $data, $rfile_or_tempfile) = @_;
    my $reader = sprintf "read_recent_%d", $data->{meta}{protocol};
    # we may be reading meta for the first time
    for my $attr (keys %{$data->{meta}}) {
        next unless $attr eq lc $attr; # "Producers"
        next if defined $self->$attr;
        $self->$attr($data->{meta}{$attr});
    }
    my $re = $self->$reader ($data);
    my @stat = stat $rfile_or_tempfile or die "Cannot stat '$rfile_or_tempfile': $!";
    my %minmax = (mtime => $stat[9]);
    if (@$re) {
        # events are ordered newest first
        @minmax{qw(min max)} = ($re->[-1]{epoch}, $re->[0]{epoch});
    }
    $self->minmax ( \%minmax );
    return $re;
}
# Load and deserialize a recentfile.
#
#   $suffix             serializer suffix such as ".yaml"; ".yaml" is
#                       read with YAML::Syck, every other suffix goes
#                       through Data::Serializer using the %serializers
#                       mapping
#   $rfile_or_tempfile  file to read
#
# Returns whatever the deserializer returns. Dies when the file cannot
# be opened or when Data::Serializer is needed but not installed.
#
# NOTE(review): the file may have been fetched from a remote host and
# is handed to the deserializer as is -- confirm that is acceptable for
# every serializer listed in %serializers.
sub _try_deserialize {
    my($self,
       $suffix,
       $rfile_or_tempfile,
      ) = @_;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        YAML::Syck::LoadFile($rfile_or_tempfile);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        my $serialized = do
            {
                # three-argument open: the name is never parsed for a
                # mode or a pipe
                open my $fh, "<", $rfile_or_tempfile
                    or die "Could not open '$rfile_or_tempfile': $!";
                local $/;
                <$fh>;
            };
        $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
}
1472 =head2 $ret = $obj->rfilename
1474 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1475 a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
1477 =cut
# Basename of the recentfile: "<filenameroot>-<interval><serializer_suffix>".
sub rfilename {
    my($self) = @_;
    my @parts = (
                 $self->filenameroot,
                 $self->interval,
                 $self->serializer_suffix,
                );
    return sprintf "%s-%s%s", @parts;
}
1489 =head2 $str = $self->remote_dir
1491 The directory we are mirroring from.
1493 =cut
# Combined getter/setter for the remote directory. A defined argument
# is stored first. Every call, reading ones included, also switches
# is_slave on. Returns the stored directory.
sub remote_dir {
    my($self, $set) = @_;
    $self->_remote_dir ($set) if defined $set;
    my $dir = $self->_remote_dir;
    $self->is_slave (1);
    return $dir;
}
1505 =head2 $str = $obj->remoteroot
1507 =head2 (void) $obj->remoteroot ( $set )
1509 Get/Set the composed prefix needed when rsyncing from a remote module.
1510 If remote_host, remote_module, and remote_dir are set, it is composed
1511 from these.
1513 =cut
# Combined getter/setter for the rsync source prefix. A defined
# argument is stored. When nothing is stored yet the prefix is built as
# "<remote_host>::" . "<remote_module>/" . "<remote_dir>", each part
# being left out when undefined, and the result is cached.
sub remoteroot {
    my($self, $set) = @_;
    $self->_remoteroot($set) if defined $set;
    my $remoteroot = $self->_remoteroot;
    return $remoteroot if defined $remoteroot;
    my @pieces =
        (
         defined $self->remote_host   ? ($self->remote_host."::")  : "",
         defined $self->remote_module ? ($self->remote_module."/") : "",
         defined $self->remote_dir    ? $self->remote_dir          : "",
        );
    $remoteroot = sprintf "%s%s%s", @pieces;
    $self->_remoteroot($remoteroot);
    return $remoteroot;
}
1534 =head2 (void) $obj->resolve_recentfilename ( $recentfilename )
1536 Inverse method to L<rfilename>. $recentfilename is a plain filename of
1537 the pattern
1539 $filenameroot-$interval$serializer_suffix
1541 e.g.
1543 RECENT-1M.yaml
1545 This filename is split into its parts and the parts are fed to the
1546 object itself.
1548 =cut
# Split a recentfile name such as "RECENT-1M.yaml" into filenameroot,
# interval and serializer suffix and store the three parts in the
# object. Dies when the name does not match the pattern. Returns
# nothing.
sub resolve_recentfilename {
    my($self, $rfname) = @_;
    my($splitter) = qr(^(.+)-([^-\.]+)(\.[^\.]+));
    my @parts = $rfname =~ $splitter
        or die "Alert: cannot split '$rfname', doesn't match '$splitter'";
    my($f,$i,$s) = @parts;
    $self->filenameroot ($f);
    $self->interval ($i);
    $self->serializer_suffix ($s);
    return;
}
1563 =head2 my $rfile = $obj->rfile
1565 Returns the full path of the I<recentfile>
1567 =cut
# Full path of the recentfile. A value already stored in _rfile wins;
# otherwise the path is built from localroot and rfilename, cached in
# _rfile and returned.
sub rfile {
    my($self) = @_;
    my $rfile = $self->_rfile;
    return $rfile if defined $rfile;
    # Loaded here explicitly: none of the file's own use lines pulls in
    # File::Spec, so far it was only available because another module
    # happened to load it.
    require File::Spec;
    $rfile = File::Spec->catfile
        ($self->localroot,
         $self->rfilename,
        );
    $self->_rfile ($rfile);
    return $rfile;
}
1581 =head2 $rsync_obj = $obj->rsync
1583 The File::Rsync object that this object uses for communicating with an
1584 upstream server.
1586 =cut
# The File::Rsync object used for talking to the upstream server. It is
# created on first use from rsync_options (default: empty hash) and
# kept in _rsync. Dies when File::Rsync could not be loaded.
sub rsync {
    my($self) = @_;
    my $rsync = $self->_rsync;
    return $rsync if defined $rsync;
    my $rsync_options = $self->rsync_options || {};
    die "File::Rsync required for rsync operations. Cannot continue"
        unless $HAVE->{"File::Rsync"};
    $rsync = File::Rsync->new($rsync_options);
    $self->_rsync($rsync);
    return $rsync;
}
1603 =head2 (void) $obj->register_rsync_error($err)
1605 =head2 (void) $obj->un_register_rsync_error()
1607 Register_rsync_error is called whenever the File::Rsync object fails
1608 on an exec (say, connection doesn't succeed). It issues a warning and
1609 sleeps for an increasing amount of time. Un_register_rsync_error
1610 resets the error count. See also accessor C<max_rsync_errors>.
1612 =cut
{
# Failure bookkeeping; both variables are shared by all objects of this
# class.
my $no_success_count = 0;
my $no_success_time = 0;

# Record one failed rsync exec. Once the number of failures in a row
# reaches max_rsync_errors (12 when unset, never when negative) the
# program is aborted with a stack trace. Otherwise a warning is issued
# and the process sleeps 12 seconds per failure so far, capped at 120.
sub register_rsync_error {
    my($self, $err) = @_;
    chomp $err;
    $no_success_time = time;
    $no_success_count++;
    my $limit = $self->max_rsync_errors;
    defined $limit or $limit = 12;
    if ($limit >= 0 && $no_success_count >= $limit) {
        require Carp;
        my $alert = sprintf
            (
             "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
             $self->interval,
             $err,
             $no_success_count,
            );
        Carp::confess ($alert);
    }
    my $sleep = 12 * $no_success_count > 120 ? 120 : 12 * $no_success_count;
    require Carp;
    my $warning = sprintf
        (
         "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
         scalar(localtime($no_success_time)),
         $self->interval,
         $err,
         $sleep,
        );
    Carp::cluck ($warning);
    sleep $sleep
}

# Forget all recorded failures; called after a successful rsync.
sub un_register_rsync_error {
    my($self) = @_;
    ($no_success_time, $no_success_count) = (0, 0);
}
}
1657 =head2 $clone = $obj->_sparse_clone
1659 Clones just as much from itself that it does not hurt. Experimental
1660 method.
1662 Note: what fits better: sparse or shallow? Other suggestions?
1664 =cut
# Build a new object of the same class that carries over only a fixed
# set of attributes. Attribute values that are references are deep
# copied, so the clone shares no data structure with the original.
sub _sparse_clone {
    my($self) = @_;
    my $new = bless {}, ref $self;
    my @carried_over = qw(
                          _interval
                          _localroot
                          _remoteroot
                          _rfile
                          _use_tempfile
                          aggregator
                          dirtymark
                          filenameroot
                          is_slave
                          max_files_per_connection
                          protocol
                          rsync_options
                          serializer_suffix
                          sleep_per_connection
                          verbose
                         );
    for my $m (@carried_over) {
        my $o = $self->$m;
        $new->$m(ref $o ? Storable::dclone($o) : $o);
    }
    return $new;
}
1693 =head2 $boolean = OBJ->ttl_reached ()
1695 =cut
# True (1) when more than ttl seconds have passed since have_mirrored,
# false (0) otherwise. have_mirrored defaults to 0, ttl to 24.2.
sub ttl_reached {
    my($self) = @_;
    my $have_mirrored = $self->have_mirrored || 0;
    my $now = Time::HiRes::time;
    my $ttl = $self->ttl;
    defined $ttl or $ttl = 24.2;
    return $now > $have_mirrored + $ttl ? 1 : 0;
}
1709 =head2 (void) $obj->unlock()
1711 Unlocking is implemented with an C<rmdir> on a locking directory
1712 (C<.lock> appended to $rfile).
1714 =cut
# Release our lock: remove the directory "<rfile>.lock" and clear the
# _is_locked flag. Does nothing when we do not hold the lock. The
# result of rmdir is not checked.
sub unlock {
    my($self) = @_;
    return unless $self->_is_locked;
    my $lockdir = $self->rfile . ".lock";
    rmdir $lockdir;
    $self->_is_locked (0);
}
1724 =head2 unseed
1726 Sets this recentfile in the state of not 'seeded'.
1728 =cut
sub unseed {
    my $self = shift;
    # clearing the flag is delegated to the seeded() accessor
    $self->seeded(0);
}
1734 =head2 $ret = $obj->update ($path, $type)
1736 Enter one file into the local I<recentfile>. $path is the (usually
1737 absolute) path. If the path is outside the I<our> tree, then it is
1738 ignored.
1740 $type is one of C<new> or C<delete>.
1742 The new file event is unshifted to the array of recent_events and the
1743 array is shortened to the length of the timespan allowed. This is
1744 usually the timespan specified by the interval of this recentfile but
1745 as long as this recentfile has not been merged to another one, the
1746 timespan may grow without bounds.
1748 =cut
# Pick the epoch for a new event so that it is strictly greater than
# the epoch of the newest event recorded so far.
#
#   $epoch   candidate timestamp
#   $recent  arrayref of existing events, newest first
#
# Returns $epoch when it already is greater (or when there are no
# events yet), otherwise the newest existing epoch increased a bit.
sub _epoch_monotonically_increasing {
    my($self,$epoch,$recent) = @_;
    return $epoch unless @$recent; # the first one goes unoffended
    my $newest = $recent->[0]{epoch};
    return _bigfloatgt("".$epoch,$newest)
        ? $epoch
        : _increase_a_bit($newest);
}
# Register one filesystem event in the local recentfile.
#
#   $path  path of the changed file, usually absolute; it is normalized
#          with the method named by `canonize` (default
#          naive_path_normalize) and ignored unless it lies below
#          localroot
#   $type  exactly "new" or "delete"
#
# Dies on a missing argument or an illegal $type. Under the lock the
# current events are read, entries older than the allowed timespan and
# older entries for the same path are dropped, the new event is put in
# front and the file is written back.
sub update {
    my($self,$path,$type) = @_;
    die "update called without path argument" unless defined $path;
    die "update called without type argument" unless defined $type;
    # anchored: an unanchored match would let "renew" or "undelete" in
    die "update called with illegal type argument: $type" unless $type =~ /\A(?:new|delete)\z/;
    my $canonmeth = $self->canonize;
    unless ($canonmeth) {
        $canonmeth = "naive_path_normalize";
    }
    $path = $self->$canonmeth($path);
    my $lrd = $self->localroot;
    my $below_localroot = $path =~ s|^\Q$lrd\E||;
    # A bare prefix match is not enough: with localroot "/pub" the path
    # "/public/x" matches too. Unless localroot itself ends in a slash,
    # the remainder must be empty or start at a directory boundary.
    if ($below_localroot
        and $lrd !~ m|/$|
        and length $path
        and $path !~ m|^/|) {
        $below_localroot = 0;
    }
    if ($below_localroot) {
        $path =~ s|^/||;
        my $secs = $self->interval_secs();
        $self->lock;
        # you must calculate the time after having locked, of course
        my $epoch = Time::HiRes::time;
        my $recent = $self->recent_events;
        $epoch = $self->_epoch_monotonically_increasing($epoch,$recent);
        $recent ||= [];
        my $oldest_allowed = 0;
        if (my $merged = $self->merged) {
            # XXX _bigfloat!
            $oldest_allowed = min($epoch - $secs, $merged->{epoch});
        } else {
            # as long as we are not merged at all, no limits!
        }
      TRUNCATE: while (@$recent) {
            if ($recent->[-1]{epoch} < $oldest_allowed) { # XXX _bigfloatlt!
                pop @$recent;
            } else {
                last TRUNCATE;
            }
        }
        # remove older duplicates of this $path, irrespective of $type:
        $recent = [ grep { $_->{path} ne $path } @$recent ];

        unshift @$recent, { epoch => $epoch, path => $path, type => $type };
        $self->write_recent($recent);
        $self->_assert_symlink;
        $self->unlock;
    }
}
1803 =head2 seed
1805 Sets this recentfile in the state of 'seeded' which means it has to
1806 re-evaluate its uptodateness.
1808 =cut
sub seed {
    my $self = shift;
    # setting the flag is delegated to the seeded() accessor
    $self->seeded(1);
}
1814 =head2 seeded
1816 Tells if the recentfile is in the state 'seeded'.
1818 =cut
# Combined getter/setter for the 'seeded' flag kept in _seeded. A flag
# that was never set is initialized to 0 on first read.
sub seeded {
    my($self, $set) = @_;
    $self->_seeded ($set) if defined $set;
    my $x = $self->_seeded;
    return $x if defined $x;
    $self->_seeded (0);
    return 0;
}
1832 =head2 uptodate
1834 True if this object has mirrored the complete interval covered by the
1835 current recentfile.
1837 *** WIP ***
1839 =cut
# Decide whether this object has mirrored everything its recentfile
# covers. The first rule that applies wins:
#
#   1. uptodateness was reached before and we are not seeded -> 1
#   2. ttl_reached is true                                    -> 0
#   3. minmax has an mtime: a newer file on disk              -> 0,
#      otherwise whatever done->covered(max, min) says
#   4. nothing applied (or covered was undefined)             -> 0
#
# A true verdict is remembered and clears the seeded flag. The verdict
# and its reason are stored via _remember_last_uptodate_call.
sub uptodate {
    my($self) = @_;
    my($uptodate, $why);
  DECIDE: {
        if ($self->_uptodateness_ever_reached and not $self->seeded) {
            ($why, $uptodate) = ("saturated", 1);
            last DECIDE;
        }
        if ($self->ttl_reached){
            ($why, $uptodate) = ("ttl_reached returned true, so we are not uptodate", 0);
            last DECIDE;
        }
        # look if recentfile has unchanged timestamp
        my $minmax = $self->minmax;
        last DECIDE unless exists $minmax->{mtime};
        my $rfile = $self->_my_current_rfile;
        my $mtime = (stat $rfile)[9];
        if ($mtime > $minmax->{mtime}) {
            $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
            $uptodate = 0;
        } else {
            my $covered = $self->done->covered(@$minmax{qw(max min)});
            $why = "minmax covered[$covered], so we return that";
            $uptodate = $covered;
        }
    }
    unless (defined $uptodate) {
        ($why, $uptodate) = ("fallthrough, so not uptodate", 0);
    }
    if ($uptodate) {
        $self->_uptodateness_ever_reached(1);
        $self->unseed;
    }
    $self->_remember_last_uptodate_call({
                                         uptodate => $uptodate,
                                         why => $why,
                                        });
    return $uptodate;
}
1888 =head2 $obj->write_recent ($recent_files_arrayref)
1890 Writes a I<recentfile> based on the current reflection of the current
1891 state of the tree limited by the current interval.
1893 =cut
# Write the given event list by handing it to the writer that belongs
# to our protocol ("write_<protocol>"). Dies without an argument.
sub write_recent {
    my ($self,$recent) = @_;
    defined $recent or die "write_recent called without argument";
    my $writer = sprintf "write_%d", $self->protocol;
    return $self->$writer($recent);
}
1902 =head2 $obj->write_0 ($recent_files_arrayref)
1904 Delegate of C<write_recent()> on protocol 0
1906 =cut
# Protocol 0 writer: the bare event list is dumped as YAML into
# "<rfile>.new", which is then renamed onto the rfile. Dies when the
# rename fails.
sub write_0 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    my $tmp = "$rfile.new";
    YAML::Syck::DumpFile($tmp,$recent);
    rename $tmp, $rfile or die "Could not rename to '$rfile': $!";
}
1915 =head2 $obj->write_1 ($recent_files_arrayref)
1917 Delegate of C<write_recent()> on protocol 1
1919 =cut
# Protocol 1 writer: a hash with the keys "meta" (from meta_data) and
# "recent" (the event list) is serialized according to the serializer
# suffix, written to "<rfile>.new" and then renamed onto the rfile.
# ".yaml" is handled by YAML::Syck, every other suffix by
# Data::Serializer. Dies when Data::Serializer is needed but missing
# or when open, close or rename fail.
sub write_1 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    my $suffix = $self->serializer_suffix;
    my $payload = {
                   meta => $self->meta_data,
                   recent => $recent,
                  };
    my $serialized;
    if ($suffix eq ".yaml") {
        $serialized = YAML::Syck::Dump($payload);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'"
            unless $HAVE->{"Data::Serializer"};
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $serialized = $serializer->raw_serialize($payload);
    }
    # write next to the target, then rename onto it
    my $tmp = "$rfile.new";
    open my $fh, ">", $tmp or die "Could not open >'$rfile.new': $!";
    print $fh $serialized;
    close $fh or die "Could not close '$rfile.new': $!";
    rename $tmp, $rfile or die "Could not rename to '$rfile': $!";
}
# Fill %serializers at compile time from the documentation itself.
# The here-document opened below ends at the next line that consists of
# "=cut", so the POD that follows is both documentation and data: every
# line of the form "=item C<< ... >>" is reduced to the text between the
# C<< >> markers, that text is evaluated as Perl, and the resulting list
# is assigned to %serializers. The code after the here-document marker
# has to stay on the marker's line.
1945 BEGIN {
1946 my @pod_lines =
1947 split /\n/, <<'=cut'; %serializers = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
1949 =head1 THE ARCHITECTURE OF A COLLECTION OF RECENTFILES
1951 The idea is that we want to have a short file that records really
1952 recent changes. So that a fresh mirror can be kept fresh as long as
1953 the connectivity is given. Then we want longer files that record the
1954 history before. So when the mirror falls behind the update period
1955 reflected in the shortest file, it can complement the list of recent
1956 file events with the next one. And if this is not long enough we want
1957 another one, again a bit longer. And we want one that completes the
1958 history back to the oldest file. The index files do contain the
1959 complete list of current files. The larger an index file is the less
1960 often it is updated. For practical reasons adjacent files will often
1961 overlap a bit but this is neither necessary nor enforced. That's the
1962 basic idea. The following example represents a tree that has a few
1963 updates every day:
1965 RECENT.recent -> RECENT-1h.yaml
1966 RECENT-6h.yaml
1967 RECENT-1d.yaml
1968 RECENT-1M.yaml
1969 RECENT-1W.yaml
1970 RECENT-1Q.yaml
1971 RECENT-1Y.yaml
1972 RECENT-Z.yaml
1974 The first file is the principal file, in so far it is the one that is
1975 written first after a filesystem change. Usually a symlink links to it
1976 with a filename that has the same filenameroot and the suffix
1977 C<.recent>. On systems that do not support symlinks there is a plain
1978 copy maintained instead.
1980 The last file, the Z file, contains the complementary files that are
1981 in none of the other files. It never contains C<deletes>. Besides
1982 this it serves the role of a recovery mechanism or spill over pond.
1983 When things go wrong, it's a valuable controlling instance to hold the
1984 differences between the collection of limited interval files and the
1985 actual filesystem.
1987 =head2 A SINGLE RECENTFILE
1989 A I<recentfile> consists of a hash that has two keys: C<meta> and
1990 C<recent>. The C<meta> part has metadata and the C<recent> part has a
1991 list of fileobjects.
1993 =head2 THE META PART
1995 Here we find things that are pretty much self explaining: all
1996 lowercase attributes are accessors and as such explained somewhere
1997 above in this manpage. The uppercase attribute C<Producers> contains
1998 version information about involved software components. Nothing to
1999 worry about as I believe.
2001 =head2 THE RECENT PART
2003 This is the interesting part. Every entry refers to some filesystem
2004 change (with path, epoch, type). The epoch value is the point in time
2005 when some change was I<registered>. Do not be tempted to believe that
2006 the entry has a direct relation to something like modification time or
2007 change time on the filesystem level. The timestamp (I<epoch> element)
2008 is a floating point number and does practically never correspond
2009 exactly to the data recorded in the filesystem but rather to the time
2010 when some process succeeded to report to the I<recentfile> mechanism
2011 that something has changed. This is why many parts of the code refer
2012 to I<events>, because we merely try to record the I<event> of the
2013 discovery of a change, not the time of the change itself.
2015 All these entries can be divided into two types (denoted by the
2016 C<type> attribute): C<new>s and C<delete>s. Changes and creations are
2017 C<new>s. Deletes are C<delete>s.
2019 Another distinction is for objects with an epoch timestamp and others
2020 without. All files that were already existing on the filesystem before
2021 the I<recentfile> mechanism was installed, get recorded with a
2022 timestamp of zero.
2024 Besides an C<epoch> and a C<type> attribute we find a third one:
2025 C<path>. This path is relative to the directory we find the
2026 I<recentfile> in.
2028 The order of the entries in the I<recentfile> is by decreasing epoch
2029 attribute. These are either 0 or a unique floating point number. They
2030 are zero for events that were happening either before the time that
2031 the I<recentfile> mechanism was set up or were left undiscovered for a
2032 while and never handed over to update(). They are floating point
2033 numbers for all events being regularly handed to update(). And when
2034 the server has ntp running correctly, then the timestamps are
2035 actually decreasing and unique.
2037 =head1 CORRUPTION AND RECOVERY
2039 If the origin host breaks the promise to deliver consistent and
2040 complete I<recentfiles> then the way back to sanity shall be achieved
2041 through either the C<zloop> (still TBD) or traditional rsyncing
2042 between the hosts. For example, if the origin server forgets to deploy
2043 ntp and the clock on it jumps backwards some day, then this would
2044 probably go unnoticed for a while and many software components that
2045 rely on the time never running backwards will make wrong decisions.
2046 After some time this accident would probably still be found in one of
2047 the I<recentfiles> but would become meaningless as soon as a mirror
2048 has run through the sanitizing procedures. Same goes for origin hosts
2049 that forget to include or deliberately omit some files.
2051 =head1 SERIALIZERS
2053 The following suffixes are supported and trigger the use of these
2054 serializers:
2056 =over 4
2058 =item C<< ".yaml" => "YAML::Syck" >>
2060 =item C<< ".json" => "JSON" >>
2062 =item C<< ".sto" => "Storable" >>
2064 =item C<< ".dd" => "Data::Dumper" >>
2066 =back
2068 =cut
# Fill %seconds at compile time from the documentation itself.
# The here-document opened below ends at the next line that consists of
# "=cut", so the POD that follows is both documentation and data: every
# line of the form "=item C<< ... >>" is reduced to the text between the
# C<< >> markers, that text is evaluated as Perl, and the resulting list
# is assigned to %seconds. The code after the here-document marker has
# to stay on the marker's line.
2070 BEGIN {
2071 my @pod_lines =
2072 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
2074 =head1 INTERVAL SPEC
2076 An interval spec is a primitive way to express time spans. Normally it
2077 is composed from an integer and a letter.
2079 As a special case, a string that consists only of the single letter
2080 C<Z>, stands for unlimited time.
2082 The following letters express the specified number of seconds:
2084 =over 4
2086 =item C<< s => 1 >>
2088 =item C<< m => 60 >>
2090 =item C<< h => 60*60 >>
2092 =item C<< d => 60*60*24 >>
2094 =item C<< W => 60*60*24*7 >>
2096 =item C<< M => 60*60*24*30 >>
2098 =item C<< Q => 60*60*24*90 >>
2100 =item C<< Y => 60*60*24*365.25 >>
2102 =back
2104 =cut
2106 =head1 BACKGROUND
2108 This is about speeding up rsync operation on large trees to many
2109 places. Uses a small metadata cocktail and pull technology.
2111 =head2 NON-COMPETITORS
2113 File::Mirror JWU/File-Mirror/File-Mirror-0.10.tar.gz only local trees
2114 Mirror::YAML ADAMK/Mirror-YAML-0.03.tar.gz some sort of inner circle
2115 Net::DownloadMirror KNORR/Net-DownloadMirror-0.04.tar.gz FTP sites and stuff
2116 Net::MirrorDir KNORR/Net-MirrorDir-0.05.tar.gz ditto
2117 Net::UploadMirror KNORR/Net-UploadMirror-0.06.tar.gz ditto
2118 Pushmi::Mirror CLKAO/Pushmi-v1.0.0.tar.gz something SVK
2120 rsnapshot www.rsnapshot.org focus on backup
2121 csync www.csync.org more like unison
2123 =head2 COMPETITORS
2125 The problem to solve which clusters and ftp mirrors and otherwise
2126 replicated datasets like CPAN share: how to transfer only a minimum
2127 amount of data to determine the diff between two hosts.
2129 Normally it takes a long time to determine the diff itself before it
2130 can be transferred. Known solutions at the time of this writing are
2131 csync2, and rsync 3 batch mode.
2133 For many years the best solution was csync2 which solves the
2134 problem by maintaining a sqlite database on both ends and talking a
2135 highly sophisticated protocol to quickly determine which files to send
2136 and which to delete at any given point in time. Csync2 is often
2137 inconvenient because the act of syncing demands quite an intimate
2138 relationship between the sender and the receiver and suffers when the
2139 number of syncing sites is large or connections are unreliable.
2141 Rsync 3 batch mode works around these problems by providing rsync-able
2142 batch files which allow receiving nodes to replay the history of the
2143 other nodes. This reduces the need to have an incestuous relation but
2144 it has the disadvantage that these batch files replicate the contents
2145 of the involved files. This seems inappropriate when the nodes already
2146 have a means of communicating over rsync.
2148 rersyncrecent solves this problem with a couple of (usually 2-10)
2149 index files which cover different overlapping time intervals. The
2150 master writes these files and the clients can construct the full tree
2151 from the information contained in them. The most recent index file
2152 usually covers the last seconds or minutes or hours of the tree and
2153 depending on the needs, slaves can rsync every few seconds and then
2154 bring their trees in full sync.
2156 The rersyncrecent mode was developed for CPAN but I hope it is a
2157 convenient and economic general purpose solution. I'm looking forward
2158 to see a CPAN backbone that is only a few seconds behind PAUSE. And
2159 then ... the first FUSE based CPAN filesystem anyone?
2161 =head1 AUTHOR
2163 Andreas König
2165 =head1 BUGS
2167 Please report any bugs or feature requests through the web interface
2169 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2170 I will be notified, and then you'll automatically be notified of
2171 progress on your bug as I make changes.
2173 =head1 SUPPORT
2175 You can find documentation for this module with the perldoc command.
2177 perldoc File::Rsync::Mirror::Recentfile
2179 You can also look for information at:
2181 =over 4
2183 =item * RT: CPAN's request tracker
2185 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2187 =item * AnnoCPAN: Annotated CPAN documentation
2189 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2191 =item * CPAN Ratings
2193 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2195 =item * Search CPAN
2197 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2199 =back
2202 =head1 ACKNOWLEDGEMENTS
2204 Thanks to RJBS for module-starter.
2206 =head1 COPYRIGHT & LICENSE
2208 Copyright 2008 Andreas König.
2210 This program is free software; you can redistribute it and/or modify it
2211 under the same terms as Perl itself.
2214 =cut
2216 1; # End of File::Rsync::Mirror::Recentfile
2218 # Local Variables:
2219 # mode: cperl
2220 # cperl-indent-level: 4
2221 # End: