epoch now guaranteed to be strict monotonically increasing
[rersyncrecent.git] / lib / File / Rsync / Mirror / Recentfile.pm
blob778cbb4ab3f4dca5734284cc487f7fbf957a1cdd
1 package File::Rsync::Mirror::Recentfile;
3 # use warnings;
4 use strict;
6 =encoding utf-8
8 =head1 NAME
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
12 =head1 VERSION
14 Version 0.0.1
16 =cut
# Optional dependencies: record which of them could be loaded so that
# later code can test $HAVE->{...} instead of dying at compile time.
my $HAVE = {};
for my $package (
                 "Data::Serializer",
                 "File::Rsync"
                ) {
    # A block eval around require-by-filename avoids compiling a string
    # of code; the trailing 1 makes the result true only on success.
    (my $file = "$package.pm") =~ s{::}{/}g;
    $HAVE->{$package} = eval { require $file; 1 };
}
25 use Config;
26 use File::Basename qw(dirname fileparse);
27 use File::Copy qw(cp);
28 use File::Path qw(mkpath);
29 use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
30 use File::Temp;
31 use List::Util qw(first min);
32 use Scalar::Util qw(reftype);
33 use Storable;
34 use Time::HiRes qw();
35 use YAML::Syck;
37 use version; our $VERSION = qv('0.0.1');
40 use constant MAX_INT => ~0>>1; # anything better?
41 use constant DEFAULT_PROTOCOL => 1;
43 # cf. interval_secs
44 my %seconds;
46 # maybe subclass if this mapping is bad?
47 my %serializers;
49 =head1 SYNOPSIS
51 B<!!!! PRE-ALPHA ALERT !!!!>
53 Nothing in here is believed to be stable, nothing yet intended for
54 public consumption. The plan is to provide a script in one of the next
55 releases that acts as a frontend for all the backend functionality.
56 Option and method names will very likely change.
58 For the rationale see the section BACKGROUND.
60 This is published only for developers of the (yet to be named)
61 script(s).
63 Writer (of a single file):
65 use File::Rsync::Mirror::Recentfile;
66 my $rf = File::Rsync::Mirror::Recentfile->new
68 interval => q(6h),
69 filenameroot => "RECENT",
70 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
71 localroot => "/home/ftp/pub/PAUSE/authors/",
72 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
74 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
76 Reader/mirrorer:
78 my $rf = File::Rsync::Mirror::Recentfile->new
80 filenameroot => "RECENT",
81 ignore_link_stat_errors => 1,
82 interval => q(6h),
83 localroot => "/home/ftp/pub/PAUSE/authors",
84 remote_dir => "",
85 remote_host => "pause.perl.org",
86 remote_module => "authors",
87 rsync_options => {
88 compress => 1,
89 'rsync-path' => '/usr/bin/rsync',
90 links => 1,
91 times => 1,
92 'omit-dir-times' => 1,
93 checksum => 1,
95 verbose => 1,
97 $rf->mirror;
99 Aggregator (usually the writer):
101 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
102 $rf->aggregate;
104 =head1 EXPORT
106 No exports.
108 =head1 CONSTRUCTORS / DESTRUCTOR
110 =head2 my $obj = CLASS->new(%hash)
112 Constructor. On every argument pair the key is a method name and the
113 value is an argument to that method name.
115 If a recentfile for this resource already exists, metadata that are
116 not defined by the constructor will be fetched from there as soon as
117 it is being read by recent_events().
119 =cut
# Constructor. Every key of the argument list is called as a method with
# the corresponding value as its single argument, in the order given.
# Afterwards protocol, filenameroot and serializer_suffix receive their
# defaults if they are still undefined.
sub new {
    my($class, @args) = @_;
    my $self = bless {}, $class;
    # consume the arguments pairwise; a trailing odd key gets undef
    while (my($method, $arg) = splice @args, 0, 2) {
        $self->$method($arg);
    }
    $self->protocol(DEFAULT_PROTOCOL) unless defined $self->protocol;
    $self->filenameroot("RECENT")     unless defined $self->filenameroot;
    $self->serializer_suffix(".yaml") unless defined $self->serializer_suffix;
    return $self;
}
140 =head2 my $obj = CLASS->new_from_file($file)
142 Constructor. $file is a I<recentfile>.
144 =cut
# Constructor. $file is the path of an existing recentfile (or a symlink
# to one in the same directory). The serializer suffix and localroot are
# derived from the resolved file name, and every lowercase key of the
# file's "meta" section is applied to the new object as a method call.
#
# Dies if the file cannot be opened, if a symlink target contains a "/",
# if the suffix is not a known serializer suffix, or if the required
# deserializer is not available.
sub new_from_file {
    my($class, $file) = @_;
    my $self = bless {}, $class;
    $self->_rfile($file);
    #?# $self->lock;
    my $serialized = do {
        # three-arg open: the file name can never be taken as a mode
        open my $fh, "<", $file or die "Could not open '$file': $!";
        local $/;
        <$fh>;
    };
    # XXX: we can skip this step when the metadata are sufficient, but
    # we cannot parse the file without some magic stuff about
    # serialized formats
    require File::Spec; # used below, not loaded explicitly by this file
    while (-l $file) {
        my $path = (fileparse($file))[1];
        my $symlink = readlink $file;
        if ($symlink =~ m|/|) {
            die "FIXME: filenames containing '/' not supported, got $symlink";
        }
        $file = File::Spec->catfile ( $path, $symlink );
    }
    # fileparse treats suffixes as regular expressions; quote them so
    # that the "." of ".yaml" only matches a literal dot
    my(undef, $path, $suffix) =
        fileparse($file, map { quotemeta($_) } keys %serializers);
    die "Could not determine file format from suffix" unless $suffix;
    $self->serializer_suffix($suffix);
    $self->localroot($path);
    my $deserialized;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        $deserialized = YAML::Syck::LoadFile($file);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $deserialized = $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    while (my($k,$v) = each %{$deserialized->{meta}}) {
        next if $k ne lc $k; # "Producers"
        $self->$k($v);
    }
    unless (defined $self->protocol) {
        $self->protocol(DEFAULT_PROTOCOL);
    }
    return $self;
}
191 =head2 DESTROY
193 A simple unlock.
195 =cut
# Destructor: releases the lock by calling unlock on the object.
sub DESTROY {
    my($self) = @_;
    $self->unlock;
}
198 =head1 ACCESSORS
200 =cut
my @accessors;

BEGIN {
    # private slots, not documented in the POD below
    @accessors = qw(
        _current_tempfile
        _current_tempfile_fh
        _done
        _interval
        _is_locked
        _localroot
        _merged
        _pathdb
        _remote_dir
        _remoteroot
        _rfile
        _rsync
        _use_tempfile
    );
    # The POD that follows is read as a here-document terminated by its
    # own "=cut" line; every "=item NAME" line contributes NAME as a
    # public accessor. The rest of this statement must stay on the line
    # that opens the here-document.
    my @pod_lines =
        split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
224 =over 4
226 =item aggregator
228 A list of interval specs that tell the aggregator which I<recentfile>s
229 are to be produced.
231 =item canonize
233 The name of a method to canonize the path before rsyncing. Only
234 supported value is C<naive_path_normalize>. Defaults to that.
236 =item comment
238 A comment about this tree and setup.
240 =item filenameroot
242 The (prefix of the) filename we use for this I<recentfile>. Defaults to
243 C<RECENT>.
245 =item have_mirrored
247 Timestamp remembering when we mirrored this recentfile the last time.
248 Only relevant for slaves.
250 =item ignore_link_stat_errors
252 If set to true, rsync errors are ignored that complain about link stat
253 errors. These seem to happen only when there are files missing at the
254 origin. In race conditions this can always happen, so it is
255 recommended to set this value to true.
257 =item is_slave
259 If set to true, this object will fetch a new recentfile from remote
260 when the timespan between the last mirror (see have_mirrored) and now
261 is too large (currently hardcoded arbitrary 420 seconds).
263 =item locktimeout
265 After how many seconds shall we die if we cannot lock a I<recentfile>?
266 Defaults to 600 seconds.
268 =item loopinterval
270 When mirror_loop is called, this accessor can specify how much time
271 every loop shall at least take. If the work of a loop is done before
272 that time has gone, sleeps for the rest of the time. Defaults to
273 arbitrary 42 seconds.
275 =item max_files_per_connection
277 Maximum number of files that are transferred on a single rsync call.
278 Setting it higher means higher performance at the price of holding
279 connections longer and potentially disturbing other users in the pool.
280 Defaults to the arbitrary value 42.
282 =item max_rsync_errors
284 When rsync operations encounter that many errors without any resetting
285 success in between, then we die. Defaults to arbitrary 12. A value of
286 -1 means we run forever ignoring all rsync errors.
288 =item minmax
290 Hashref remembering when we read the recent_events from this file the
291 last time and what the timespan was.
293 =item protocol
295 When the RECENT file format changes, we increment the protocol. We try
296 to support older protocols in later releases.
298 =item remote_host
300 The host we are mirroring from. Leave empty for the local filesystem.
302 =item remote_module
304 Rsync servers have so called modules to separate directory trees from
305 each other. Put here the name of the module under which we are
306 mirroring. Leave empty for local filesystem.
308 =item rsync_options
310 Things like compress, links, times or checksums. Passed in to the
311 File::Rsync object used to run the mirror.
313 =item serializer_suffix
315 Mostly untested accessor. The only well tested format for
316 I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
317 Data::Serializer. But in principle other formats are supported as
318 well. See section SERIALIZERS below.
320 =item sleep_per_connection
322 Sleep that many seconds (floating point OK) after every chunk of rsyncing
323 has finished. Defaults to arbitrary 0.42.
325 =item ttl
327 Time to live. Number of seconds after which this recentfile must be
328 fetched again from the origin server. Only relevant for slaves.
329 Defaults to arbitrary 24.2 seconds.
331 =item verbose
333 Boolean to turn on a bit of verbosity.
335 =back
337 =cut
339 use accessors @accessors;
341 =head1 METHODS
343 =head2 (void) $obj->aggregate
345 Takes all intervals that are collected in the accessor called
346 aggregator. Sorts them by actual length of the interval.
347 Removes those that are shorter than our own interval. Then merges this
348 object into the next larger object. The merging continues upwards
349 as long as the next I<recentfile> is old enough to warrant a merge.
351 Whether a merge is warranted is decided according to the interval of
352 the previous (next smaller) I<recentfile>, so that larger files are
353 not updated as often as smaller ones.
355 Here is an example to illustrate the behaviour. Given aggregators
357 1h 1d 1W 1M 1Q 1Y Z
359 then
361 1h updates 1d on every call to aggregate()
362 1d updates 1W earliest after 1h
363 1W updates 1M earliest after 1d
364 1M updates 1Q earliest after 1W
365 1Q updates 1Y earliest after 1M
366 1Y updates Z earliest after 1Q
368 Note that all but the smallest recentfile get updated at an arbitrary
369 rate and as such are quite useless on their own.
371 =cut
# Merges this object upwards through the chain of larger intervals named
# in the aggregator accessor. The first merge always happens; each later
# merge happens only if the target file does not exist yet or is older
# than the interval of the object two steps below it. The chain stops at
# the first target that does not warrant a merge.
sub aggregate {
    my($self) = @_;
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
        grep { $_->{secs} >= $self->interval_secs }
            map { +{ interval => $_, secs => $self->interval_secs($_) } }
                $self->interval, @{$self->aggregator || []};
    $aggs[0]{object} = $self;
  AGGREGATOR: for my $i (0..$#aggs-1) {
        my $this = $aggs[$i]{object};
        my $next = $this->_sparse_clone;
        $next->interval($aggs[$i+1]{interval});
        # merge by default; only an existing, young enough target file
        # above the first level can veto it
        my $want_merge = 1;
        if ($i != 0) {
            my $next_rfile = $next->rfile;
            if (-e $next_rfile) {
                my $prev = $aggs[$i-1]{object};
                local $^T = time; # make -M relative to now
                my $next_age = 86400 * -M $next_rfile;
                $want_merge = 0 unless $next_age > $prev->interval_secs;
            }
        }
        last AGGREGATOR unless $want_merge;
        $next->merge($this);
        $aggs[$i+1]{object} = $next;
    }
}
# Debugging helper: for our own interval and every aggregator interval
# (sorted by length) report the recentfile name together with its size
# and mtime as an arrayref of hashrefs. Size and mtime are undef for
# files that cannot be stat'ed.
sub _debug_aggregate {
    my($self) = @_;
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
        map { +{ interval => $_, secs => $self->interval_secs($_) } }
            $self->interval, @{$self->aggregator || []};
    my @report;
    for my $agg (@aggs) {
        # work on a deep copy so that our own interval stays untouched
        my $clone = Storable::dclone $self;
        $clone->interval($agg->{interval});
        my $rfile = $clone->rfile;
        my($size, $mtime) = (stat $rfile)[7,9];
        push @report, {rfile => $rfile, size => $size, mtime => $mtime};
    }
    return \@report;
}
# (void) $self->_assert_symlink()
#
# Makes sure that "<filenameroot>.recent" below localroot refers to our
# rfilename. Where symlinks are available a symlink is created, or an
# outdated one is replaced through a temporary name plus rename.
# Elsewhere the file is copied under a temporary name and renamed. Dies
# when one of the filesystem operations fails.
sub _assert_symlink {
    my($self) = @_;
    my $recentrecentfile = File::Spec->catfile
        (
         $self->localroot,
         sprintf("%s.recent", $self->filenameroot),
        );
    if ($Config{d_symlink} eq "define") {
        my $replace_existing = 0;
        if (-l $recentrecentfile) {
            my $found_symlink = readlink $recentrecentfile;
            # already pointing to the right file: nothing to do
            return if $found_symlink eq $self->rfilename;
            $replace_existing = 1;
        }
        if ($replace_existing) {
            unlink "$recentrecentfile.$$"; # may fail
            symlink($self->rfilename, "$recentrecentfile.$$")
                or die "Could not create symlink '$recentrecentfile.$$': $!";
            rename("$recentrecentfile.$$", $recentrecentfile)
                or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
        } else {
            symlink($self->rfilename, $recentrecentfile)
                or die "Could not create symlink '$recentrecentfile': $!"
        }
    } else {
        warn "Warning: symlinks not supported on this system, doing a copy instead\n";
        unlink "$recentrecentfile.$$"; # may fail
        cp($self->rfilename, "$recentrecentfile.$$")
            or die "Could not copy to '$recentrecentfile.$$': $!";
        rename("$recentrecentfile.$$", $recentrecentfile)
            or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
    }
}
465 =head2 $done = $obj->done
467 $done is a reference to a File::Rsync::Mirror::Recentfile::Done object
468 that keeps track of rsync activities. Only used/needed when we are a
469 mirroring slave.
471 =cut
# Returns the File::Rsync::Mirror::Recentfile::Done object of this
# recentfile, creating and remembering it on first use.
sub done {
    my($self) = @_;
    my $done = $self->_done;
    return $done if $done;
    require File::Rsync::Mirror::Recentfile::Done;
    $done = File::Rsync::Mirror::Recentfile::Done->new();
    $self->_done ( $done );
    return $done;
}
484 =head2 $success = $obj->full_mirror
486 (TBD) Mirrors the whole remote site, starting with the smallest I<recentfile>,
487 switching to larger ones ...
489 =cut
# Placeholder: mirroring the whole remote site is not implemented yet,
# so every call dies.
sub full_mirror {
    my $self = shift;
    die "FIXME: Not yet implemented";
}
496 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
498 Stores the remote I<recentfile> locally as a tempfile. The caller is
499 responsible to remove the file after use.
501 Note: if you're intending to act as an rsync server for other slaves,
502 then you must prefer this method over fetching that file with
503 get_remotefile(). Otherwise downstream mirrors would expect you to
504 already have mirrored all the files that are in the I<recentfile>
505 before you have them mirrored.
507 =cut
# Rsyncs the remote recentfile into a tempfile below localroot and
# returns the name of that tempfile.
#
# While _use_tempfile is set and the ttl has not been reached, the
# current tempfile is returned without any network activity. A newly
# created tempfile is seeded with a copy of the local recentfile (if
# there is one) so that rsync only has to transfer the difference.
# After three failed rsync attempts the method gives up with a warning
# and leaves have_mirrored untouched.
#
# Dies if the seed copy or the final chmod fails.
sub get_remote_recentfile_as_tempfile {
    my($self) = @_;
    mkpath $self->localroot;
    my $fh;
    my $trfilename;
    if ( $self->_use_tempfile() ) {
        return $self->_current_tempfile if ! $self->ttl_reached;
        $fh = $self->_current_tempfile_fh;
        $trfilename = $self->rfilename;
    } else {
        $trfilename = $self->rfilename;
    }

    my $dst;
    if ($fh) {
        $dst = $self->_current_tempfile;
    } else {
        $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
        $dst = $fh->filename;
        $self->_current_tempfile ($dst);
        my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
        if (defined $rfile && -e $rfile) {
            # saving on bandwidth. Might need to be configurable
            # $self->bandwidth_is_cheap?
            cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
        }
    }
    my $src = join ("/",
                    $self->remoteroot,
                    $trfilename,
                   );
    if ($self->verbose) {
        my $doing = -e $dst ? "Syncing" : "Getting";
        printf STDERR
            (
             "%s (1/1) temporary %s ... ",
             $doing,
             $dst,
            );
    }
    my $gaveup = 0;
    my $retried = 0;
    while (!$self->rsync->exec(
                               src => $src,
                               dst => $dst,
                              )) {
        $self->register_rsync_error ($self->rsync->err);
        if (++$retried >= 3) {
            warn "XXX giving up";
            $gaveup = 1;
            last;
        }
    }
    unless ($gaveup) {
        $self->have_mirrored (Time::HiRes::time);
        $self->un_register_rsync_error ();
    }
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
    my $mode = 0644;
    # format the mode as octal; plain interpolation would print "420"
    chmod $mode, $dst
        or die sprintf "Could not chmod %04o '%s': %s", $mode, $dst, $!;
    return $dst;
}
# Creates the File::Temp object that receives the remote recentfile
# named $trfilename. The file lives in localroot, carries our
# serializer suffix and is unlinked on destruction only while
# _use_tempfile is set; in that case the object is also stored in
# _current_tempfile_fh to keep it alive.
sub _get_remote_rat_provide_tempfile_object {
    my($self, $trfilename) = @_;
    my %tempfile_args =
        (
         TEMPLATE => sprintf(".FRMRecent-%s-XXXX", $trfilename),
         DIR      => $self->localroot,
         SUFFIX   => $self->serializer_suffix,
         UNLINK   => $self->_use_tempfile,
        );
    my $fh = File::Temp->new(%tempfile_args);
    $self->_current_tempfile_fh ($fh) # delay self destruction
        if $self->_use_tempfile;
    return $fh;
}
590 =head2 $localpath = $obj->get_remotefile ( $relative_path )
592 Rsyncs one single remote file to local filesystem.
594 Note: no locking is done on this file. Any number of processes may
595 mirror this object.
597 Note II: do not use for recentfiles. If you are a cascading
598 slave/server combination, it would confuse other slaves. They would
599 expect the contents of these recentfiles to be available. Use
600 get_remote_recentfile_as_tempfile() instead.
602 =cut
# Rsyncs the single remote file $path (relative to remoteroot) to the
# same relative location below localroot and returns the local path.
# Every failed attempt is reported to register_rsync_error and the
# transfer is tried again; nothing in this method itself ends the loop.
sub get_remotefile {
    my($self, $path) = @_;
    my $dst = File::Spec->catfile($self->localroot, $path);
    mkpath dirname $dst;
    if ($self->verbose) {
        my $doing = -e $dst ? "Syncing" : "Getting";
        printf STDERR ("%s (1/1) %s ... ", $doing, $path);
    }
    until ($self->rsync->exec(
                              src => join("/", $self->remoteroot, $path),
                              dst => $dst,
                             )) {
        $self->register_rsync_error ($self->rsync->err);
    }
    $self->un_register_rsync_error ();
    print STDERR "DONE\n" if $self->verbose;
    return $dst;
}
632 =head2 $obj->interval ( $interval_spec )
634 Get/set accessor. $interval_spec is a string and described below in
635 the section INTERVAL SPEC.
637 =cut
# Get/set accessor for the interval spec. Setting a value also clears
# the cached _rfile. Reading an undefined interval is fatal
# (Carp::confess).
sub interval {
    my $self = shift;
    if (@_) {
        $self->_interval($_[0]);
        $self->_rfile(undef);
    }
    my $current = $self->_interval;
    return $current if defined $current;
    # do not ask the $self too much, it recurses!
    require Carp;
    Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
}
654 =head2 $secs = $obj->interval_secs ( $interval_spec )
656 $interval_spec is described below in the section INTERVAL SPEC. If
657 empty defaults to the inherent interval for this object.
659 =cut
# Translates an interval spec (count plus unit letter, or "Z") into
# seconds. Without an argument the object's own interval is used. "Z"
# maps to MAX_INT. Dies on specs that cannot be parsed or that lack a
# count.
sub interval_secs {
    my ($self, $interval) = @_;
    $interval = $self->interval unless $interval;
    die "interval_secs() called without argument on an object without a declared one"
        unless defined $interval;
    my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
        die "Could not determine seconds from interval[$interval]";
    return MAX_INT if $interval eq "Z";
    return $seconds{$t}*$n
        if exists $seconds{$t} and $n =~ /^\d+$/;
    die "Invalid interval specification: n[$n]t[$t]";
}
678 =head2 $obj->localroot ( $localroot )
680 Get/set accessor. The local root of the tree.
682 =cut
# Get/set accessor for the local root of the tree. Setting a value also
# clears the cached _rfile.
sub localroot {
    my $self = shift;
    if (@_) {
        $self->_localroot($_[0]);
        $self->_rfile(undef);
    }
    my $current = $self->_localroot;
    return $current;
}
693 =head2 $ret = $obj->local_path($path_found_in_recentfile)
695 Combines the path to our local mirror and the path of an object found
696 in this I<recentfile>. In other words: the target of a mirror operation.
698 Implementation note: We split on slashes and then use
699 File::Spec::catfile to adjust to the local operating system.
701 =cut
# Maps a slash separated path found in a recentfile to the matching
# path below localroot, using File::Spec for the local conventions.
# Without a path the localroot itself is returned.
sub local_path {
    my($self,$path) = @_;
    # seems like a degenerated case
    return $self->localroot unless defined $path;
    my @components = split m|/|, $path;
    return File::Spec->catfile($self->localroot, @components);
}
713 =head2 (void) $obj->lock
715 Locking is implemented with an C<mkdir> on a locking directory
716 (C<.lock> appended to $rfile).
718 =cut
# Acquires the lock for our rfile by creating the directory
# "$rfile.lock". Does nothing if this object already holds the lock.
# Retries every 0.01 seconds and dies once locktimeout (default 600)
# seconds have passed without success.
sub lock {
    my ($self) = @_;
    # not using flock because it locks on filehandles instead of
    # old school resources.
    return if $self->_is_locked;
    my $rfile = $self->rfile;
    # XXX need a way to allow breaking the lock
    my $start = time;
    my $locktimeout = $self->locktimeout || 600;
    until (mkdir "$rfile.lock") {
        # remember why mkdir failed: the sleep below may change $!
        my $mkdir_error = "$!";
        Time::HiRes::sleep 0.01;
        if (time - $start > $locktimeout) {
            die "Could not acquire lockdirectory '$rfile.lock': $mkdir_error";
        }
    }
    $self->_is_locked (1);
}
738 =head2 (void) $obj->merge ($other)
740 Bulk update of this object with another one. It's used to merge a
741 smaller and younger $other object into the current one. If this file
742 is a C<Z> file, then we do not merge in objects of type C<delete>. But
743 if we encounter an object of type delete we delete the corresponding
744 C<new> object.
746 If there is nothing to be merged, nothing is done.
748 =cut
# Merges the events of the smaller and younger $other into this object
# and writes both files if anything changed. Events older than the
# allowed window are dropped from our own list, events of $other take
# precedence over ours for the same path, and a "Z" file does not take
# over events of type "delete". Both objects are locked for the
# duration of the merge.
sub merge {
    my($self, $other) = @_;
    $self->_merge_sanitycheck ( $other );
    $other->lock;
    my $other_recent = $other->recent_events || [];
    $self->lock;
    my $my_recent = $self->recent_events || [];

    # calculate the target time span
    my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
    my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
    my $oldest_allowed = 0;
    my $something_done;
    # obstetrics: we have no events yet, so a write is needed in any case
    $something_done = 1 unless $my_recent->[0];
    if ($epoch) {
        if (my $merged = $self->merged) {
            my $secs = $self->interval_secs();
            $oldest_allowed = min($epoch - $secs, $merged->{epoch});
        }
        # throw away outsiders
        # XXX _bigfloat!
        while (@$my_recent && $my_recent->[-1]{epoch} < $oldest_allowed) {
            pop @$my_recent;
            $something_done = 1;
        }
    }

    my %have;      # paths already taken over
    my $recent = [];
    for my $oev (@$other_recent) {
        my $oevepoch = $oev->{epoch} || 0;
        next if $oevepoch < $oldest_allowed;
        my $path = $oev->{path};
        next if $have{$path}++;
        # a Z file never records deletes
        next if $self->interval eq "Z"
            and $oev->{type} eq "delete";
        if (!$myepoch || $oevepoch > $myepoch) {
            $something_done = 1;
        }
        push @$recent, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
    }
    if ($something_done) {
        # append those of our own events that $other did not override
        push @$recent, grep { !$have{$_->{path}}++ } @$my_recent;
        $self->write_recent($recent);
        $other->merged({
                        time => Time::HiRes::time, # not used anywhere
                        epoch => $epoch, # used in oldest_allowed
                        into_interval => $self->interval, # not used anywhere
                       });
        $other->write_recent($other_recent);
    }
    $self->unlock;
    $other->unlock;
}
# Dies unless our own interval is strictly longer than that of $other,
# i.e. refuses to merge a bigger recentfile into a smaller one.
sub _merge_sanitycheck {
    my($self, $other) = @_;
    return unless $self->interval_secs <= $other->interval_secs;
    die sprintf
        (
         "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
         $self->interval_secs,
         $other->interval_secs,
        );
}
823 =head2 merged
825 Hashref denoting when this recentfile has been merged into some other
826 at which epoch.
828 =cut
# Get/set accessor for the hashref that records into which interval and
# at which epoch this recentfile has been merged. Warns when the
# recorded target interval equals our own interval or is shorter than
# it. Returns the stored hashref.
sub merged {
    my($self, $set) = @_;
    $self->_merged ($set) if defined $set;
    my $merged = $self->_merged;
    return $merged unless $merged;
    my $into = $merged->{into_interval};
    # plausibility checks need both the target and our own interval
    return $merged unless $into and defined $self->_interval;
    if ($into eq $self->interval) {
        warn sprintf
            (
             "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
             $into,
             $self->interval,
            );
    } elsif ($self->interval_secs($into) < $self->interval_secs) {
        warn sprintf
            (
             "Warning: into_interval[%s] smaller than own interval[%s] on interval[%s]. Danger ahead.",
             $self->interval_secs($into),
             $self->interval_secs,
             $self->interval,
            );
    }
    return $merged;
}
858 =head2 $hashref = $obj->meta_data
860 Returns the hashref of metadata that the server has to add to the
861 I<recentfile>.
863 =cut
# Returns the hashref of metadata for the recentfile: the stored "meta"
# hash, refreshed with every defined value of the listed accessors and
# given a "Producers" entry if it does not have one yet.
sub meta_data {
    my($self) = @_;
    my $ret = $self->{meta};
    my @meta_accessors = qw(
        aggregator
        canonize
        comment
        filenameroot
        merged
        interval
        protocol
        serializer_suffix
    );
    for my $m (@meta_accessors) {
        my $v = $self->$m;
        $ret->{$m} = $v if defined $v;
    }
    # XXX need to reset the Producer if I am a writer, keep it when I
    # am a reader
    $ret->{Producers} ||= {
                           __PACKAGE__, "$VERSION", # stringified it looks better
                           '$0', $0,
                           'time', Time::HiRes::time,
                          };
    return $ret;
}
893 =head2 $success = $obj->mirror ( %options )
895 Mirrors the files in this I<recentfile> as reported by
896 C<recent_events>. Options named C<after>, C<before>, C<max>, and
897 C<skip-deletes> are passed through to the L<recent_events> call. The
898 boolean option C<piecemeal>, if true, causes C<mirror> to only rsync
899 C<max_files_per_connection> and keep track of the rsynced files so
900 that future calls will rsync different files until all files are
901 brought to sync.
903 =cut
# Mirrors the files listed in this recentfile. The remote recentfile is
# fetched as a tempfile, its events are handed one by one to
# _mirror_item, a partially filled batch is flushed at the end, and the
# tempfile is finally renamed to the local recentfile.
#
# Options before, after, max and skip-deletes are passed on to
# recent_events. Returns true when no error was collected; returns
# early with nothing when _mirror_item sets the "mustreturn" status.
sub mirror {
    my($self, %options) = @_;
    my $trecentfile = $self->get_remote_recentfile_as_tempfile();
    $self->_use_tempfile (1);
    my %passthrough;
    $passthrough{$_} = $options{$_} for qw(before after max skip-deletes);
    my ($recent_events) = $self->recent_events(%passthrough);
    my(@error, @xcollector);
    my $last_item = $#$recent_events;
    my $done = $self->done;
    my $pathdb = $self->_pathdb;
  ITEM: for my $i (0..$last_item) {
        my $status = {};
        $self->_mirror_item
            (
             $i,
             $recent_events,
             $last_item,
             $done,
             $pathdb,
             \@xcollector,
             \%options,
             $status,
             \@error,
            );
        return if $status->{mustreturn};
    }
    # transfer whatever is left in an incomplete batch
    if (@xcollector) {
        my $success = eval { $self->_empty_xcollector (\@xcollector,$pathdb,$recent_events);};
        if (!$success || $@) {
            warn "Warning: Unknown error while mirroring: $@";
            push @error, $@;
            sleep 1;
        }
        print STDERR "DONE\n" if $self->verbose;
    }
    my $rfile = $self->rfile;
    unless (rename $trecentfile, $rfile) {
        require Carp;
        Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
    }
    $self->_use_tempfile (0);
    if (my $ctfh = $self->_current_tempfile_fh) {
        # the tempfile has just become the recentfile, keep it
        $ctfh->unlink_on_destroy (0);
        $self->_current_tempfile_fh (undef);
    }
    return !@error;
}
# Processes event number $i of $recent_events on behalf of mirror().
#
# Events already covered by $done are skipped, as are events for which
# $pathdb records a more recent epoch. "new" events are delegated to
# _mirror_item_new. "delete" events remove the local file, symlink or
# (empty) directory unless $options->{'skip-deletes'} is set; the
# outcome is registered in $done and, if present, in $pathdb. Failures
# of unlink/rmdir are reported with Carp::cluck but are not fatal.
# Other event types only trigger a warning.
sub _mirror_item {
    my($self,
       $i,
       $recent_events,
       $last_item,
       $done,
       $pathdb,
       $xcollector,
       $options,
       $status,
       $error,
      ) = @_;
    my $recent_event = $recent_events->[$i];
    return if $done->covered ( $recent_event->{epoch} );
    if ($pathdb) {
        my $rec = $pathdb->{$recent_event->{path}};
        if ($rec && $rec->{recentepoch}) {
            if (_bigfloatgt
                ( $rec->{recentepoch}, $recent_event->{epoch} )){
                $done->register ($recent_events, [$i]);
                return;
            }
        }
    }
    my $dst = $self->local_path($recent_event->{path});
    if ($recent_event->{type} eq "new"){
        $self->_mirror_item_new
            (
             $dst,
             $i,
             $last_item,
             $recent_events,
             $recent_event,
             $xcollector,
             $pathdb,
             $status,
             $error,
             $options,
            );
    } elsif ($recent_event->{type} eq "delete") {
        my $activity;
        if ($options->{'skip-deletes'}) {
            $activity = "skipped";
        } else {
            # -e follows symlinks, so a dangling symlink must be
            # recognized with -l or it would never be removed
            if (! -e $dst and ! -l $dst) {
                $activity = "not_found";
            } elsif (-l $dst or not -d _) {
                unless (unlink $dst) {
                    require Carp;
                    Carp::cluck ( "Warning: Error while unlinking '$dst': $!" );
                }
                $activity = "deleted";
            } else {
                unless (rmdir $dst) {
                    require Carp;
                    Carp::cluck ( "Warning: Error on rmdir '$dst': $!" );
                }
                $activity = "deleted";
            }
        }
        $done->register ($recent_events, [$i]);
        if ($pathdb) {
            $self->_register_path($pathdb,[$recent_event],$activity);
        }
    } else {
        warn "Warning: invalid upload type '$recent_event->{type}'";
    }
}
# Handles one event of type "new" on behalf of _mirror_item: the event
# is queued in $xcollector, and once max_files_per_connection (default
# 42) events are queued the whole batch is transferred through
# _empty_xcollector, followed by a pause of sleep_per_connection
# (default 0.42) seconds.
#
# A failed batch is reported with a warning and recorded in @$error,
# also in piecemeal mode. In piecemeal mode ($options->{piecemeal})
# $status->{mustreturn} is set after the batch so that the caller
# stops.
sub _mirror_item_new {
    my($self,
       $dst,
       $i,
       $last_item,
       $recent_events,
       $recent_event,
       $xcollector,
       $pathdb,
       $status,
       $error,
       $options,
      ) = @_;
    if ($self->verbose) {
        my $doing = -e $dst ? "Syncing" : "Getting";
        printf STDERR
            (
             "%s (%d/%d/%s) %s ... ",
             $doing,
             1+$i,
             1+$last_item,
             $self->interval,
             $recent_event->{path},
            );
    }
    my $max_files_per_connection = $self->max_files_per_connection || 42;
    if ($self->verbose) {
        print STDERR "\n";
    }
    push @$xcollector, { rev => $recent_event, i => $i };
    # nothing to transfer before the batch is full
    return if @$xcollector < $max_files_per_connection;

    my $success = eval {$self->_empty_xcollector ($xcollector,$pathdb,$recent_events);};
    # keep the error now: the calls below must not be able to reset $@
    my $eval_error = $@;
    my $sleep = $self->sleep_per_connection;
    $sleep = 0.42 unless defined $sleep;
    Time::HiRes::sleep $sleep;
    if (!$success || $eval_error) {
        warn "Warning: Error while mirroring: $eval_error";
        push @$error, $eval_error;
        sleep 1;
    }
    if ($options->{piecemeal}) {
        $status->{mustreturn} = 1;
        return;
    }
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
    return;
}
# Transfers all events queued in @$xcoll with one mirror_path call,
# registers them in $pathdb (if given) and in the Done object, empties
# the queue and returns what mirror_path returned.
sub _empty_xcollector {
    my($self,$xcoll,$pathdb,$recent_events) = @_;
    my @paths = map { $_->{rev}{path} } @$xcoll;
    my $success = $self->mirror_path(\@paths);
    if ($pathdb) {
        my @events = map { $_->{rev} } @$xcoll;
        $self->_register_path($pathdb, \@events, "rsync");
    }
    my @indexes = map { $_->{i} } @$xcoll;
    $self->done->register($recent_events, \@indexes);
    @$xcoll = ();
    return $success;
}
# Records in the hash $db, keyed by path, the epoch of every event in
# @$coll together with the current time under the key "<activity>_on".
# An existing record for the same path is replaced.
sub _register_path {
    my($self,$db,$coll,$activity) = @_;
    my $time = time;
    my $stamp_key = $activity."_on";
    for my $item (@$coll) {
        my %record = (
                      recentepoch => $item->{epoch},
                      $stamp_key  => $time,
                     );
        $db->{$item->{path}} = \%record;
    }
}
1101 =head2 (void) $obj->mirror_loop
1103 Run mirror in an endless loop. See the accessor C<loopinterval>. XXX
1104 What happens/should happen if we miss the interval during a single loop?
1106 =cut
# Runs mirror() in an endless loop until SIGINT arrives. Every
# iteration takes at least loopinterval (default 42) seconds; if the
# work is done earlier, the rest of the time is slept. After each
# iteration the epoch of the most recent event becomes the "after"
# option of the next mirror() call. The SIGINT handler is installed
# only for the duration of the loop.
sub mirror_loop {
    my($self) = @_;

    my $Signal = 0;
    # local: do not leave our handler behind once the loop has ended
    local $SIG{INT} = sub { $Signal++ };
    my $loopinterval = $self->loopinterval || 42;
    my $after = -999999999;
  LOOP: while () {
        # measure every iteration on its own, otherwise only the first
        # one would ever sleep
        my $iteration_start = time;
        # mirror() takes named options; a bare value would be taken as
        # an option name and "after" would never be set
        $self->mirror(after => $after);
        last LOOP if $Signal;
        my $re = $self->recent_events;
        # keep the previous mark when there are no events at all
        if ($re && @$re && defined $re->[0]{epoch}) {
            $after = $re->[0]{epoch};
        }
        if ($self->verbose) {
            local $| = 1;
            print "($after)";
        }
        my $remaining = $iteration_start + $loopinterval - time;
        if ($remaining > 0) {
            sleep $remaining;
        }
        if ($self->verbose) {
            local $| = 1;
            print "~";
        }
    }
}
1135 =head2 $success = $obj->mirror_path ( $arrref | $path )
1137 If the argument is a scalar it is treated as a path. The remote path
1138 is mirrored into the local copy. $path is the path found in the
1139 I<recentfile>, i.e. it is relative to the root directory of the
1140 mirror.
1142 If the argument is an array reference then all elements are treated as
1143 a path below the current tree and all are rsynced with a single
1144 command (and a single connection).
1146 =cut
# Mirrors one path (scalar) or a batch of paths (array reference) from
# remoteroot into the local tree. A batch is passed to rsync through a
# temporary files-from list and transferred with a single rsync call.
#
# Returns 1 on success, and also when a link_stat error is ignored
# because ignore_link_stat_errors is set. Returns 0 when the batch
# transfer was given up after three failed attempts. A single path is
# retried until it succeeds; only register_rsync_error can end that.
sub mirror_path {
    my($self,$path) = @_;
    # XXX simplify the two branches such that $path is treated as
    # [$path] maybe even demand the argument as an arrayref to
    # simplify docs and code. (rsync-over-recentfile-2.pl uses the
    # interface)
    if (ref $path and ref $path eq "ARRAY") {
        my $dst = $self->localroot;
        mkpath dirname $dst;
        my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
                                                      lc $self->filenameroot,
                                                     ),
                                  TMPDIR => 1,
                                  UNLINK => 0,
                                 );
        for my $p (@$path) {
            print $fh $p, "\n";
        }
        $fh->flush;
        $fh->unlink_on_destroy(1);
        my $gaveup = 0;
        my $retried = 0;
        while (!$self->rsync->exec
               (
                src => join("/",
                            $self->remoteroot,
                           ),
                dst => $dst,
                'files-from' => $fh->filename,
               )) {
            my($err) = $self->rsync->err;
            if ($self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x ) {
                if ($self->verbose) {
                    warn "Info: ignoring link_stat error '$err'";
                }
                return 1;
            }
            $self->register_rsync_error ($err);
            if (++$retried >= 3) {
                warn "XXX giving up.";
                $gaveup = 1;
                last;
            }
        }
        # tell the caller about the failure instead of claiming success
        return 0 if $gaveup;
        $self->un_register_rsync_error ();
    } else {
        my $dst = $self->local_path($path);
        mkpath dirname $dst;
        while (!$self->rsync->exec
               (
                src => join("/",
                            $self->remoteroot,
                            $path
                           ),
                dst => $dst,
               )) {
            my($err) = $self->rsync->err;
            if ($self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x ) {
                if ($self->verbose) {
                    warn "Info: ignoring link_stat error '$err'";
                }
                return 1;
            }
            $self->register_rsync_error ($err);
        }
        $self->un_register_rsync_error ();
    }
    return 1;
}
# Returns the file that currently holds our events: the current
# tempfile while _use_tempfile is set, the regular rfile otherwise.
sub _my_current_rfile {
    my($self) = @_;
    my $rfile = $self->_use_tempfile
        ? $self->_current_tempfile
        : $self->rfile;
    return $rfile;
}
1231 =head2 $path = $obj->naive_path_normalize ($path)
1233 Takes an absolute unix style path as argument and canonicalizes it to
1234 a shorter path if possible, removing things like double slashes or
1235 C</./> and removes references to C<../> directories to get a shorter
1236 unambiguous path. This simplifies the code that determines
1237 if a file passed to C<update()> is indeed below our C<localroot>.
1239 =cut
# Canonicalize an absolute unix style path by pure string rewriting
# (the filesystem is never consulted):
#   - runs of slashes are collapsed into a single slash,
#   - "/./" segments are dropped,
#   - "name/../" pairs are resolved,
#   - one trailing slash is removed.
# Returns the normalized path.
sub naive_path_normalize {
    my($self,$path) = @_;
    $path =~ s|/+|/|g;
    # Loop because adjacent "/./" segments share a slash and cannot all
    # be matched in a single /g pass.
    1 while $path =~ s|/\./|/|;
    # Loop so that nested "a/b/../../" constructs unwind step by step.
    1 while $path =~ s|/[^/]+/\.\./|/|;
    $path =~ s|/$||;
    return $path;
}
1249 =head2 $ret = $obj->read_recent_1 ( $data )
1251 Delegate of C<recent_events()> on protocol 1
1253 =cut
# Protocol 1 keeps the event list under the "recent" key of the
# deserialized structure; hand back exactly that value.
sub read_recent_1 {
    my ($self, $payload) = @_;
    my $events = $payload->{recent};
    return $events;
}
1260 =head2 $array_ref = $obj->recent_events ( %options )
1262 Note: the code relies on the resource being written atomically. We
1263 cannot lock because we may have no write access. If the caller has
1264 write access (eg. aggregate() or update()), it has to care for any
1265 necessary locking.
1267 If $options{after} is specified, only file events after this timestamp
1268 are returned.
1270 If $options{before} is specified, only file events before this
1271 timestamp are returned.
1273 If $options{'skip-deletes'} is specified, no files-to-be-deleted will
1274 be returned.
1276 If $options{max} is specified only this many events are returned.
1278 If $options{info} is specified, it must be a hashref. This hashref
1279 will be filled with metadata about the unfiltered recent_events of
1280 this object, in key C<first> there is the first item, in key C<last>
1281 is the last.
1283 =cut
# Read the recentfile (or its tempfile copy) and return an arrayref of
# events, optionally filtered.
#
# Options:
#   after        - keep only events with epoch >  this value
#   before       - keep only events with epoch <  this value
#   skip-deletes - drop events whose type is "delete"
#   max          - return at most this many events
#   info         - hashref; receives the first and last item of the
#                  unfiltered list in the keys "first" and "last"
#
# Returns [] when there is no file, or when it cannot be deserialized.
# The filters are applied whenever any of them is given; previously
# everything except "after" was ignored unless "after" was also set.
sub recent_events {
    my ($self, %options) = @_;
    my $info = $options{info};
    if ($self->is_slave) {
        $self->get_remote_recentfile_as_tempfile;
    }
    my $rfile_or_tempfile = $self->_my_current_rfile or return [];
    -e $rfile_or_tempfile or return [];
    my $suffix = $self->serializer_suffix;
    my ($data) = eval {
        $self->_try_deserialize
            (
             $suffix,
             $rfile_or_tempfile,
            );
    };
    my $err = $@;
    if ($err or !$data) {
        return [];
    }
    my $re;
    if (reftype($data) eq 'ARRAY') { # protocol 0
        $re = $data;
    } else {
        $re = $self->_recent_events_protocol_x
            (
             $data,
             $rfile_or_tempfile,
            );
    }
    # Metadata describes the unfiltered list, so record it before any
    # filtering and regardless of which other options were passed.
    if ($info) {
        $info->{first} = $re->[0];
        $info->{last}  = $re->[-1];
    }
    # Without any filter the deserialized list is returned as is.
    return $re
        unless grep { defined $options{$_} } qw(after before max skip-deletes);
    # Nothing to filter; also avoids looking at $re->[0] of an empty list.
    return [] unless @$re;

    # Events are ordered by decreasing epoch, so "after" cuts the tail
    # and "before" cuts the head of the list.
    my $last_item = $#$re;
    if (defined $options{after}) {
        if ($re->[0]{epoch} > $options{after}) {
            if (
                my $f = first
                        {$re->[$_]{epoch} <= $options{after}}
                        0..$#$re
               ) {
                $last_item = $f-1;
            }
        } else {
            # even the newest event is not newer than "after"
            $last_item = -1;
        }
    }
    my $first_item = 0;
    if (defined $options{before}) {
        if ($re->[0]{epoch} > $options{before}) {
            # NOTE(review): when no event is older than "before" the
            # head is left untouched and everything up to $last_item is
            # returned; confirm whether an empty result is intended.
            if (
                my $f = first
                        {$re->[$_]{epoch} < $options{before}}
                        0..$last_item
               ) {
                $first_item = $f;
            }
        }
    }
    my @rre = splice @$re, $first_item, 1+$last_item-$first_item;
    if ($options{'skip-deletes'}) {
        @rre = grep { $_->{type} ne "delete" } @rre;
    }
    if ($options{max} && @rre > $options{max}) {
        @rre = splice @rre, 0, $options{max};
    }
    return \@rre;
}
# Handles deserialized data of protocol >= 1:
#   1. adopts lowercase meta attributes this object has not set yet,
#   2. delegates to read_recent_<protocol> to extract the event list,
#   3. records the file's mtime plus the oldest/newest epoch in minmax.
# Dies when the file cannot be stat'ed. Returns the event list.
sub _recent_events_protocol_x {
    my ($self, $data, $rfile_or_tempfile) = @_;
    my $reader = sprintf "read_recent_%d", $data->{meta}{protocol};

    # we may be reading meta for the first time
    for my $attr (keys %{ $data->{meta} }) {
        next unless $attr eq lc $attr;    # skips mixed-case keys like "Producers"
        next if defined $self->$attr;     # never override a value already set
        $self->$attr($data->{meta}{$attr});
    }

    my $events = $self->$reader($data);
    my @stat = stat $rfile_or_tempfile or die "Cannot stat '$rfile_or_tempfile': $!";
    my %minmax = (mtime => $stat[9]);
    if (@$events) {
        # list is read newest first: last element is min, first is max
        @minmax{qw(min max)} = ($events->[-1]{epoch}, $events->[0]{epoch});
    }
    $self->minmax(\%minmax);
    return $events;
}
# Deserialize the given file according to its suffix and return the
# resulting data structure.
#   ".yaml"         - loaded through YAML::Syck::LoadFile
#   any other value - read completely and handed to Data::Serializer,
#                     using the serializer registered for the suffix
# Dies when the file cannot be opened or when Data::Serializer is
# needed but not installed.
sub _try_deserialize {
    my($self,
       $suffix,
       $rfile_or_tempfile,
      ) = @_;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        return YAML::Syck::LoadFile($rfile_or_tempfile);
    } elsif ($HAVE->{"Data::Serializer"}) {
        # NOTE(review): depending on the serializer chosen for the
        # suffix, deserializing may evaluate file content; presumably
        # the files come from a trusted upstream - confirm.
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        my $serialized = do
            {
                # three-argument open: the path is never interpreted
                # as a mode or a pipe
                open my $fh, "<", $rfile_or_tempfile or die "Could not open: $!";
                local $/;
                <$fh>;
            };
        return $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
}
1404 =head2 $ret = $obj->rfilename
1406 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1407 a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
1409 =cut
# Compose "<filenameroot>-<interval><serializer_suffix>".
sub rfilename {
    my ($self) = @_;
    my $root     = $self->filenameroot;
    my $interval = $self->interval;
    my $suffix   = $self->serializer_suffix;
    return join "", $root, "-", $interval, $suffix;
}
1421 =head2 $str = $self->remote_dir
1423 The directory we are mirroring from.
1425 =cut
# Combined getter/setter for the remote directory. Every call, reading
# included, also flags this object as a slave.
sub remote_dir {
    my ($self, $set) = @_;
    $self->_remote_dir($set) if defined $set;
    my $dir = $self->_remote_dir;
    $self->is_slave(1);
    return $dir;
}
1437 =head2 $str = $obj->remoteroot
1439 =head2 (void) $obj->remoteroot ( $set )
1441 Get/Set the composed prefix needed when rsyncing from a remote module.
1442 If remote_host, remote_module, and remote_dir are set, it is composed
1443 from these.
1445 =cut
# Getter/setter for the rsync source prefix. When no value is stored it
# is composed as "<remote_host>::<remote_module>/<remote_dir>", leaving
# out every part that is undefined, and the result is cached.
sub remoteroot {
    my ($self, $set) = @_;
    $self->_remoteroot($set) if defined $set;

    my $stored = $self->_remoteroot;
    return $stored if defined $stored;

    my $host   = $self->remote_host;
    my $module = $self->remote_module;
    my $dir    = $self->remote_dir;
    my $composed = join "",
        (defined $host   ? $host . "::"  : ""),
        (defined $module ? $module . "/" : ""),
        (defined $dir    ? $dir          : "");
    $self->_remoteroot($composed);
    return $composed;
}
1466 =head2 (void) $obj->resolve_recentfilename ( $recentfilename )
1468 Inverse method to L<rfilename>. $recentfilename is a plain filename of
1469 the pattern
1471 $filenameroot-$interval$serializer_suffix
1473 e.g.
1475 RECENT-1M.yaml
1477 This filename is split into its parts and the parts are fed to the
1478 object itself.
1480 =cut
# Split a recentfile basename into filenameroot, interval and
# serializer suffix and store the three parts on the object. Dies when
# the name does not match the pattern. Returns nothing.
sub resolve_recentfilename {
    my ($self, $rfname) = @_;
    my ($splitter) = qr(^(.+)-([^-\.]+)(\.[^\.]+));
    # a list assignment in scalar context is false when nothing matched
    my ($root, $interval, $suffix) = $rfname =~ $splitter
        or die "Alert: cannot split '$rfname', doesn't match '$splitter'";
    $self->filenameroot($root);
    $self->interval($interval);
    $self->serializer_suffix($suffix);
    return;
}
1495 =head2 my $rfile = $obj->rfile
1497 Returns the full path of the I<recentfile>
1499 =cut
# Full path of the recentfile: localroot joined with rfilename. The
# value is computed once and then cached in _rfile.
sub rfile {
    my($self) = @_;
    my $rfile = $self->_rfile;
    return $rfile if defined $rfile;
    # Load File::Spec explicitly instead of relying on some other
    # module having pulled it in.
    require File::Spec;
    $rfile = File::Spec->catfile
        ($self->localroot,
         $self->rfilename,
        );
    $self->_rfile ($rfile);
    return $rfile;
}
1513 =head2 $rsync_obj = $obj->rsync
1515 The File::Rsync object that this object uses for communicating with an
1516 upstream server.
1518 =cut
# Lazily construct and cache the File::Rsync object, built from
# rsync_options (or an empty hash). Dies when File::Rsync could not be
# loaded.
sub rsync {
    my ($self) = @_;
    my $cached = $self->_rsync;
    return $cached if defined $cached;

    my $rsync_options = $self->rsync_options || {};
    die "File::Rsync required for rsync operations. Cannot continue"
        unless $HAVE->{"File::Rsync"};
    my $rsync = File::Rsync->new($rsync_options);
    $self->_rsync($rsync);
    return $rsync;
}
1535 =head2 (void) $obj->register_rsync_error($err)
1537 =head2 (void) $obj->un_register_rsync_error()
1539 Register_rsync_error is called whenever the File::Rsync object fails
1540 on an exec (say, connection doesn't succeed). It issues a warning and
1541 sleeps for an increasing amount of time. Un_register_rsync_error
1542 resets the error count. See also accessor C<max_rsync_errors>.
1544 =cut
{
    # State shared by the two subs below: number of consecutive
    # failures and the time of the most recent one.
    my $no_success_count = 0;
    my $no_success_time = 0;

    # Count one more rsync failure. Once the count reaches
    # max_rsync_errors (12 when undefined; a negative value disables
    # the limit) the process dies with a stack trace. Otherwise a
    # warning is emitted and we sleep 12 seconds per failure so far,
    # but never longer than 120 seconds.
    sub register_rsync_error {
        my ($self, $err) = @_;
        chomp $err;
        $no_success_time = time;
        $no_success_count++;

        my $limit = $self->max_rsync_errors;
        defined $limit or $limit = 12;
        if ($limit >= 0 && $no_success_count >= $limit) {
            require Carp;
            my $alert = sprintf
                "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
                $self->interval, $err, $no_success_count;
            Carp::confess($alert);
        }

        my $sleep = 12 * $no_success_count;
        $sleep = 120 if $sleep > 120;
        require Carp;
        my $warning = sprintf
            "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
            scalar(localtime($no_success_time)), $self->interval, $err, $sleep;
        Carp::cluck($warning);
        sleep $sleep
    }

    # Forget all failures registered so far.
    sub un_register_rsync_error {
        my ($self) = @_;
        $no_success_time = 0;
        $no_success_count = 0;
    }
}
1589 =head2 $clone = $obj->_sparse_clone
1591 Clones just as much from itself that it does not hurt. Experimental
1592 method.
1594 Note: what fits better: sparse or shallow? Other suggestions?
1596 =cut
# Build a new object of the same class and copy a fixed set of
# attributes through their accessors. Reference values are deep-copied
# so the clone shares no structure with the original.
sub _sparse_clone {
    my ($self) = @_;
    my @attributes = qw(
                        _interval
                        _localroot
                        _remoteroot
                        _rfile
                        _use_tempfile
                        aggregator
                        filenameroot
                        is_slave
                        max_files_per_connection
                        protocol
                        rsync_options
                        serializer_suffix
                        sleep_per_connection
                        verbose
                       );
    my $clone = bless {}, ref $self;
    for my $attr (@attributes) {
        my $value = $self->$attr;
        $clone->$attr(ref($value) ? Storable::dclone($value) : $value);
    }
    return $clone;
}
1624 =head2 $boolean = OBJ->ttl_reached ()
1626 =cut
# True (1) when more than ttl seconds have passed since have_mirrored,
# false (0) otherwise. A missing have_mirrored counts as 0 and a
# missing ttl as 24.2.
sub ttl_reached {
    my ($self) = @_;
    my $last_mirrored = $self->have_mirrored || 0;
    my $now = Time::HiRes::time();
    my $ttl = $self->ttl;
    defined $ttl or $ttl = 24.2;
    return $now > $last_mirrored + $ttl ? 1 : 0;
}
1640 =head2 (void) $obj->unlock()
1642 Unlocking is implemented with an C<rmdir> on a locking directory
1643 (C<.lock> appended to $rfile).
1645 =cut
# Release the lock held by this object: remove the "<rfile>.lock"
# directory and clear the _is_locked flag. Does nothing when the object
# does not consider itself locked.
sub unlock {
    my ($self) = @_;
    if ($self->_is_locked) {
        my $lockdir = $self->rfile . ".lock";
        rmdir $lockdir;
        return $self->_is_locked(0);
    }
    return;
}
1655 =head2 $ret = $obj->update ($path, $type)
1657 Enter one file into the local I<recentfile>. $path is the (usually
1658 absolute) path. If the path is outside I<our> tree, then it is
1659 ignored.
1661 $type is one of C<new> or C<delete>.
1663 The new file event is unshifted to the array of recent_events and the
1664 array is shortened to the length of the timespan allowed. This is
1665 usually the timespan specified by the interval of this recentfile but
1666 as long as this recentfile has not been merged to another one, the
1667 timespan may grow without bounds.
1669 =cut
# Choose the epoch for a new event so that it is strictly greater than
# the epoch of the newest recorded event ($recent->[0]). The proposed
# $epoch is used when it already is greater (or when nothing has been
# recorded yet); otherwise the newest recorded epoch is increased a bit.
sub _epoch_monotonically_increasing {
    my ($self, $epoch, $recent) = @_;
    return $epoch unless @$recent; # the first one goes unoffended
    my $newest = $recent->[0]{epoch};
    return _bigfloatgt("".$epoch, $newest)
        ? $epoch
        : _increase_a_bit($newest);
}
# Record one file event in the local recentfile.
#
#   $path - path of the changed file; it is canonicalized with the
#           method named by canonize (default: naive_path_normalize)
#           and ignored unless it starts with localroot
#   $type - exactly "new" or "delete"
#
# Dies when an argument is missing or $type is anything else. The type
# check is anchored, so strings that merely contain "new" or "delete"
# (e.g. "renew") are rejected.
#
# NOTE(review): the lock is not released when something between lock
# and unlock dies - confirm whether callers clean up stale locks.
sub update {
    my($self,$path,$type) = @_;
    die "update called without path argument" unless defined $path;
    die "update called without type argument" unless defined $type;
    die "update called with illegal type argument: $type"
        unless $type =~ /\A(?:new|delete)\z/;
    my $canonmeth = $self->canonize;
    unless ($canonmeth) {
        $canonmeth = "naive_path_normalize";
    }
    $path = $self->$canonmeth($path);
    my $lrd = $self->localroot;
    if ($path =~ s|^\Q$lrd\E||) {
        $path =~ s|^/||;
        my $secs = $self->interval_secs();
        $self->lock;
        # you must calculate the time after having locked, of course
        my $epoch = Time::HiRes::time();
        my $recent = $self->recent_events;
        $epoch = $self->_epoch_monotonically_increasing($epoch,$recent);
        $recent ||= [];
        my $oldest_allowed = 0;
        if (my $merged = $self->merged) {
            # XXX _bigfloat!
            $oldest_allowed = min($epoch - $secs, $merged->{epoch});
        } else {
            # as long as we are not merged at all, no limits!
        }
        # drop events from the old end that fell out of the time window
        TRUNCATE: while (@$recent) {
            if ($recent->[-1]{epoch} < $oldest_allowed) { # XXX _bigfloatlt!
                pop @$recent;
            } else {
                last TRUNCATE;
            }
        }
        # remove older duplicates of this $path, irrespective of $type:
        $recent = [ grep { $_->{path} ne $path } @$recent ];

        unshift @$recent, { epoch => $epoch, path => $path, type => $type };
        $self->write_recent($recent);
        $self->_assert_symlink;
        $self->unlock;
    }
}
1724 =head2 uptodate
1726 True if this object has mirrored the complete interval covered by the
1727 current recentfile.
1729 *** WIP ***
1731 =cut
# Decide whether this object is up to date.
#   - 0 when ttl_reached is true
#   - 0 when minmax has no mtime
#   - 0 when the current file's mtime is newer than the recorded one
#   - otherwise whatever done->covered(max, min) returns
# With a true $debug every decision is explained via warn.
sub uptodate {
    my ($self, $debug) = @_;

    if ($self->ttl_reached) {
        warn "ttl_reached returned true, so we are not uptodate" if $debug;
        return 0;
    }

    # look if recentfile has unchanged timestamp
    my $minmax = $self->minmax;
    unless (exists $minmax->{mtime}) {
        warn "fallthrough, so not uptodate" if $debug;
        return 0;
    }

    my $current = $self->_my_current_rfile;
    my $mtime = (stat $current)[9];
    if ($mtime > $minmax->{mtime}) {
        warn "$mtime > $minmax->{mtime}, so we are not uptodate" if $debug;
        return 0;
    }

    my $covered = $self->done->covered(@$minmax{qw(max min)});
    warn "minmax covered[$covered], so we return that" if $debug;
    return $covered;
}
1767 =head2 $obj->write_recent ($recent_files_arrayref)
1769 Writes a I<recentfile> based on the current reflection of the current
1770 state of the tree limited by the current interval.
1772 =cut
# Dispatch to the writer for the configured protocol, i.e. the method
# named "write_<protocol>". Dies when called without an event list.
sub write_recent {
    my ($self, $recent) = @_;
    defined $recent or die "write_recent called without argument";
    my $writer = sprintf "write_%d", $self->protocol;
    return $self->$writer($recent);
}
1781 =head2 $obj->write_0 ($recent_files_arrayref)
1783 Delegate of C<write_recent()> on protocol 0
1785 =cut
# Protocol 0 writer: dump the bare event list as YAML into
# "<rfile>.new", then rename that file over the recentfile. Dies when
# the rename fails.
sub write_0 {
    my ($self, $recent) = @_;
    my $target  = $self->rfile;
    my $scratch = "$target.new";
    YAML::Syck::DumpFile($scratch, $recent);
    rename $scratch, $target or die "Could not rename to '$target': $!";
}
1794 =head2 $obj->write_1 ($recent_files_arrayref)
1796 Delegate of C<write_recent()> on protocol 1
1798 =cut
# Protocol 1 writer: wrap the event list together with meta_data,
# serialize it according to serializer_suffix, write the result to
# "<rfile>.new" and rename that file over the recentfile. Dies when no
# serializer is available or when open, close or rename fail.
sub write_1 {
    my ($self, $recent) = @_;
    my $target = $self->rfile;
    my $suffix = $self->serializer_suffix;
    my $payload = {
                   meta => $self->meta_data,
                   recent => $recent,
                  };

    my $serialized;
    if ($suffix eq ".yaml") {
        $serialized = YAML::Syck::Dump($payload);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'"
            unless $HAVE->{"Data::Serializer"};
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $serialized = $serializer->raw_serialize($payload);
    }

    # write next to the target first, then move into place
    my $scratch = "$target.new";
    open my $fh, ">", $scratch or die "Could not open >'$scratch': $!";
    print $fh $serialized;
    close $fh or die "Could not close '$scratch': $!";
    rename $scratch, $target or die "Could not rename to '$target': $!";
}
# Fill %serializers at compile time from the documentation that
# follows: the POD up to the next "=cut" is read as a here-document,
# and every line of the form "=item C<< KEY => VALUE >>" is reduced to
# its inner expression and eval'ed into a key/value pair. The
# here-document introducer must stay on the last line of this block.
BEGIN {
    %serializers =
        map  { eval }
        grep { s/^=item\s+C<<(.+)>>$/$1/ }
        split /\n/, <<'=cut'; }
1828 =head1 THE ARCHITECTURE OF A COLLECTION OF RECENTFILES
1830 The idea is that we want to have a short file that records really
1831 recent changes. So that a fresh mirror can be kept fresh as long as
1832 the connectivity is given. Then we want longer files that record the
1833 history before. So when the mirror falls behind the update period
1834 reflected in the shortest file, it can complement the list of recent
1835 file events with the next one. And if this is not long enough we want
1836 another one, again a bit longer. And we want one that completes the
1837 history back to the oldest file. The index files do contain the
1838 complete list of current files. The larger an index file is the less
1839 often it is updated. For practical reasons adjacent files will often
1840 overlap a bit but this is neither necessary nor enforced. That's the
1841 basic idea. The following example represents a tree that has a few
1842 updates every day:
1844 RECENT.recent -> RECENT-1h.yaml
1845 RECENT-6h.yaml
1846 RECENT-1d.yaml
1847 RECENT-1M.yaml
1848 RECENT-1W.yaml
1849 RECENT-1Q.yaml
1850 RECENT-1Y.yaml
1851 RECENT-Z.yaml
1853 The first file is the principal file, in so far it is the one that is
1854 written first after a filesystem change. Usually a symlink links to it
1855 with a filename that has the same filenameroot and the suffix
1856 C<.recent>. On systems that do not support symlinks there is a plain
1857 copy maintained instead.
1859 The last file, the Z file, contains the complementary files that are
1860 in none of the other files. It does never contain C<deletes>. Besides
1861 this it serves the role of a recovery mechanism or spill over pond.
1862 When things go wrong, it's a valuable controlling instance to hold the
1863 differences between the collection of limited interval files and the
1864 actual filesystem.
1866 =head2 A SINGLE RECENTFILE
1868 A I<recentfile> consists of a hash that has two keys: C<meta> and
1869 C<recent>. The C<meta> part has metadata and the C<recent> part has a
1870 list of fileobjects.
1872 =head2 THE META PART
1874 Here we find things that are pretty much self explaining: all
1875 lowercase attributes are accessors and as such explained somewhere
1876 above in this manpage. The uppercase attribute C<Producers> contains
1877 version information about involved software components. Nothing to
1878 worry about as I believe.
1880 =head2 THE RECENT PART
1882 This is the interesting part. Every entry refers to some filesystem
1883 change (with path, epoch, type). The epoch value is the point in time
1884 when some change was I<registered>. Do not be tempted to believe that
1885 the entry has a direct relation to something like modification time or
1886 change time on the filesystem level. The timestamp (I<epoch> element)
1887 is a floating point number and does practically never correspond
1888 exactly to the data recorded in the filesystem but rather to the time
1889 when some process succeeded to report to the I<recentfile> mechanism
1890 that something has changed. This is why many parts of the code refer
1891 to I<events>, because we merely try to record the I<event> of the
1892 discovery of a change, not the time of the change itself.
1894 All these entries can be divided into two types (denoted by the
1895 C<type> attribute): C<new>s and C<delete>s. Changes and creations are
1896 C<new>s. Deletes are C<delete>s.
1898 Another distinction is for objects with an epoch timestamp and others
1899 without. All files that were already existing on the filesystem before
1900 the I<recentfile> mechanism was installed, get recorded with a
1901 timestamp of zero.
1903 Besides an C<epoch> and a C<type> attribute we find a third one:
1904 C<path>. This path is relative to the directory we find the
1905 I<recentfile> in.
1907 The order of the entries in the I<recentfile> is by decreasing epoch
1908 attribute. These are either 0 or a unique floating point number. They
1909 are zero for events that were happening either before the time that
1910 the I<recentfile> mechanism was set up or were left undiscovered for a
1911 while and never handed over to update(). They are floating point
1912 numbers for all events being regularly handed to update(). And when
1913 the server has ntp running correctly, then the timestamps are
1914 actually decreasing and unique.
1916 =head1 CORRUPTION AND RECOVERY
1918 If the origin host breaks the promise to deliver consistent and
1919 complete I<recentfiles> then the way back to sanity shall be achieved
1920 through either the C<zloop> (still TBD) or traditional rsyncing
1921 between the hosts. For example, if the origin server forgets to deploy
1922 ntp and the clock on it jumps backwards some day, then this would
1923 probably go unnoticed for a while and many software components that
1924 rely on the time never running backwards will make wrong decisions.
1925 After some time this accident would probably still be found in one of
1926 the I<recentfiles> but would become meaningless as soon as a mirror
1927 has run through the sanitizing procedures. Same goes for origin hosts
1928 that forget to include or deliberately omit some files.
1930 =head1 SERIALIZERS
1932 The following suffixes are supported and trigger the use of these
1933 serializers:
1935 =over 4
1937 =item C<< ".yaml" => "YAML::Syck" >>
1939 =item C<< ".json" => "JSON" >>
1941 =item C<< ".sto" => "Storable" >>
1943 =item C<< ".dd" => "Data::Dumper" >>
1945 =back
1947 =cut
# Fill %seconds at compile time from the documentation that follows:
# the POD up to the next "=cut" is read as a here-document, and every
# line of the form "=item C<< LETTER => SECONDS >>" is reduced to its
# inner expression and eval'ed into a key/value pair. The here-document
# introducer must stay on the last line of this block.
BEGIN {
    %seconds =
        map  { eval }
        grep { s/^=item\s+C<<(.+)>>$/$1/ }
        split /\n/, <<'=cut'; }
1953 =head1 INTERVAL SPEC
1955 An interval spec is a primitive way to express time spans. Normally it
1956 is composed from an integer and a letter.
1958 As a special case, a string that consists only of the single letter
1959 C<Z>, stands for unlimited time.
1961 The following letters express the specified number of seconds:
1963 =over 4
1965 =item C<< s => 1 >>
1967 =item C<< m => 60 >>
1969 =item C<< h => 60*60 >>
1971 =item C<< d => 60*60*24 >>
1973 =item C<< W => 60*60*24*7 >>
1975 =item C<< M => 60*60*24*30 >>
1977 =item C<< Q => 60*60*24*90 >>
1979 =item C<< Y => 60*60*24*365.25 >>
1981 =back
1983 =cut
1985 =head1 BACKGROUND
1987 This is about speeding up rsync operation on large trees to many
1988 places. Uses a small metadata cocktail and pull technology.
1990 =head2 NON-COMPETITORS
1992 File::Mirror JWU/File-Mirror/File-Mirror-0.10.tar.gz only local trees
1993 Mirror::YAML ADAMK/Mirror-YAML-0.03.tar.gz some sort of inner circle
1994 Net::DownloadMirror KNORR/Net-DownloadMirror-0.04.tar.gz FTP sites and stuff
1995 Net::MirrorDir KNORR/Net-MirrorDir-0.05.tar.gz dito
1996 Net::UploadMirror KNORR/Net-UploadMirror-0.06.tar.gz dito
1997 Pushmi::Mirror CLKAO/Pushmi-v1.0.0.tar.gz something SVK
1999 rsnapshot www.rsnapshot.org focus on backup
2000 csync www.csync.org more like unison
2002 =head2 COMPETITORS
2004 The problem to solve which clusters and ftp mirrors and otherwise
2005 replicated datasets like CPAN share: how to transfer only a minimum
2006 amount of data to determine the diff between two hosts.
2008 Normally it takes a long time to determine the diff itself before it
2009 can be transferred. Known solutions at the time of this writing are
2010 csync2, and rsync 3 batch mode.
2012 For many years the best solution was csync2 which solves the
2013 problem by maintaining a sqlite database on both ends and talking a
2014 highly sophisticated protocol to quickly determine which files to send
2015 and which to delete at any given point in time. Csync2 is often
2016 inconvenient because the act of syncing demands quite an intimate
2017 relationship between the sender and the receiver and suffers when the
2018 number of syncing sites is large or connections are unreliable.
2020 Rsync 3 batch mode works around these problems by providing rsync-able
2021 batch files which allow receiving nodes to replay the history of the
2022 other nodes. This reduces the need to have an incestuous relation but
2023 it has the disadvantage that these batch files replicate the contents
2024 of the involved files. This seems inappropriate when the nodes already
2025 have a means of communicating over rsync.
2027 rersyncrecent solves this problem with a couple of (usually 2-10)
2028 index files which cover different overlapping time intervals. The
2029 master writes these files and the clients can construct the full tree
2030 from the information contained in them. The most recent index file
2031 usually covers the last seconds or minutes or hours of the tree and
2032 depending on the needs, slaves can rsync every few seconds and then
2033 bring their trees in full sync.
2035 The rersyncrecent mode was developed for CPAN but I hope it is a
2036 convenient and economic general purpose solution. I'm looking forward
2037 to see a CPAN backbone that is only a few seconds behind PAUSE. And
2038 then ... the first FUSE based CPAN filesystem anyone?
2040 =head1 AUTHOR
2042 Andreas König
2044 =head1 BUGS
2046 Please report any bugs or feature requests through the web interface
2048 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2049 I will be notified, and then you'll automatically be notified of
2050 progress on your bug as I make changes.
2052 =head1 SUPPORT
2054 You can find documentation for this module with the perldoc command.
2056 perldoc File::Rsync::Mirror::Recentfile
2058 You can also look for information at:
2060 =over 4
2062 =item * RT: CPAN's request tracker
2064 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2066 =item * AnnoCPAN: Annotated CPAN documentation
2068 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2070 =item * CPAN Ratings
2072 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2074 =item * Search CPAN
2076 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2078 =back
2081 =head1 ACKNOWLEDGEMENTS
2083 Thanks to RJBS for module-starter.
2085 =head1 COPYRIGHT & LICENSE
2087 Copyright 2008 Andreas König.
2089 This program is free software; you can redistribute it and/or modify it
2090 under the same terms as Perl itself.
2093 =cut
2095 1; # End of File::Rsync::Mirror::Recentfile
2097 # Local Variables:
2098 # mode: cperl
2099 # cperl-indent-level: 4
2100 # End: