1 package File::Rsync::Mirror::Recentfile;
3 # use warnings;
4 use strict;
6 =encoding utf-8
8 =head1 NAME
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
12 =head1 VERSION
14 Version 0.0.1
16 =cut
18 my $HAVE = {};
19 for my $package (
20 "Data::Serializer",
21 "File::Rsync"
22 ) {
23 $HAVE->{$package} = eval qq{ require $package; };
25 use Config;
26 use File::Basename qw(basename dirname fileparse);
27 use File::Copy qw(cp);
28 use File::Path qw(mkpath);
29 use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
30 use File::Temp;
31 use List::Util qw(first min);
32 use Scalar::Util qw(reftype);
33 use Storable;
34 use Time::HiRes qw();
35 use YAML::Syck;
37 use version; our $VERSION = qv('0.0.1');
40 use constant MAX_INT => ~0>>1; # anything better?
41 use constant DEFAULT_PROTOCOL => 1;
43 # cf. interval_secs
44 my %seconds;
46 # maybe subclass if this mapping is bad?
47 my %serializers;
49 =head1 SYNOPSIS
51 B<!!!! PRE-ALPHA ALERT !!!!>
53 Nothing in here is believed to be stable, nothing yet intended for
54 public consumption. The plan is to provide a script in one of the next
55 releases that acts as a frontend for all the backend functionality.
56 Option and method names will very likely change.
58 For the rationale see the section BACKGROUND.
60 This is published only for developers of the (yet to be named)
61 script(s).
63 Writer (of a single file):
65 use File::Rsync::Mirror::Recentfile;
 66     my $rf = File::Rsync::Mirror::Recentfile->new
68 interval => q(6h),
69 filenameroot => "RECENT",
70 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
71 localroot => "/home/ftp/pub/PAUSE/authors/",
72 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
74 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
76 Reader/mirrorer:
78 my $rf = File::Rsync::Mirror::Recentfile->new
80 filenameroot => "RECENT",
81 ignore_link_stat_errors => 1,
82 interval => q(6h),
83 localroot => "/home/ftp/pub/PAUSE/authors",
84 remote_dir => "",
85 remote_host => "pause.perl.org",
86 remote_module => "authors",
87 rsync_options => {
88 compress => 1,
89 'rsync-path' => '/usr/bin/rsync',
90 links => 1,
91 times => 1,
92 'omit-dir-times' => 1,
93 checksum => 1,
95 verbose => 1,
97 $rf->mirror;
99 Aggregator (usually the writer):
101 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
102 $rf->aggregate;
104 =head1 EXPORT
106 No exports.
108 =head1 CONSTRUCTORS / DESTRUCTOR
110 =head2 my $obj = CLASS->new(%hash)
112 Constructor. For every argument pair, the key is taken as a method
113 name and the value as the argument to that method.
115 If a recentfile for this resource already exists, metadata that are
116 not defined by the constructor will be fetched from there as soon as
117 it is read by recent_events().
119 =cut
121 sub new {
122 my($class, @args) = @_;
123 my $self = bless {}, $class;
124 while (@args) {
125 my($method,$arg) = splice @args, 0, 2;
126 $self->$method($arg);
128 unless (defined $self->protocol) {
129 $self->protocol(DEFAULT_PROTOCOL);
131 unless (defined $self->filenameroot) {
132 $self->filenameroot("RECENT");
134 unless (defined $self->serializer_suffix) {
135 $self->serializer_suffix(".yaml");
137 return $self;
140 =head2 my $obj = CLASS->new_from_file($file)
142 Constructor. $file is a I<recentfile>.
144 =cut
146 sub new_from_file {
147 my($class, $file) = @_;
148 my $self = bless {}, $class;
149 $self->_rfile($file);
150 #?# $self->lock;
151 my $serialized = do { open my $fh, $file or die "Could not open '$file': $!";
152 local $/;
153 <$fh>;
155 # XXX: we can skip this step when the metadata are sufficient, but
156 # we cannot parse the file without some magic stuff about
157 # serialized formats
158 while (-l $file) {
159 my($name,$path) = fileparse $file;
160 my $symlink = readlink $file;
161 if ($symlink =~ m|/|) {
162 die "FIXME: filenames containing '/' not supported, got $symlink";
164 $file = File::Spec->catfile ( $path, $symlink );
166 my($name,$path,$suffix) = fileparse $file, keys %serializers;
167 $self->serializer_suffix($suffix);
168 $self->localroot($path);
169 die "Could not determine file format from suffix" unless $suffix;
170 my $deserialized;
171 if ($suffix eq ".yaml") {
172 require YAML::Syck;
173 $deserialized = YAML::Syck::LoadFile($file);
174 } elsif ($HAVE->{"Data::Serializer"}) {
175 my $serializer = Data::Serializer->new
176 ( serializer => $serializers{$suffix} );
177 $deserialized = $serializer->raw_deserialize($serialized);
178 } else {
179 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
181 while (my($k,$v) = each %{$deserialized->{meta}}) {
182 next if $k ne lc $k; # "Producers"
183 $self->$k($v);
185 unless (defined $self->protocol) {
186 $self->protocol(DEFAULT_PROTOCOL);
188 return $self;
191 =head2 DESTROY
193 A simple unlock.
195 =cut
196 sub DESTROY { shift->unlock }
198 =head1 ACCESSORS
200 =cut
202 my @accessors;
204 BEGIN {
205 @accessors = (
206 "_current_tempfile",
207 "_current_tempfile_fh",
208 "_delayed_operations",
209 "_done",
210 "_interval",
211 "_is_locked",
212 "_localroot",
213 "_merged",
214 "_pathdb",
215 "_remember_last_uptodate_call",
216 "_remote_dir",
217 "_remoteroot",
218 "_rfile",
219 "_rsync",
220 "_seeded",
221 "_uptodateness_ever_reached",
222 "_use_tempfile",
225 my @pod_lines =
226 split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
228 =over 4
230 =item aggregator
232 A list of interval specs that tell the aggregator which I<recentfile>s
233 are to be produced.
235 =item canonize
237 The name of a method to canonize the path before rsyncing. The only
238 supported value is C<naive_path_normalize>. Defaults to that.
240 =item comment
242 A comment about this tree and setup.
244 =item dirtymark
246 A timestamp. The dirtymark is updated whenever an out of band change
247 on the origin server is performed that violates the protocol. Say,
248 they add or remove files in the middle somewhere. Slaves must react
249 with a devaluation of their C<done> structure which then leads to a
250 full re-sync of all files.
252 =item filenameroot
254 The (prefix of the) filename we use for this I<recentfile>. Defaults to
255 C<RECENT>.
257 =item have_mirrored
259 Timestamp remembering when we mirrored this recentfile the last time.
260 Only relevant for slaves.
262 =item ignore_link_stat_errors
264 If set to true, rsync errors that complain about link stat problems
265 are ignored. These seem to happen only when files are missing at the
266 origin. In race conditions this can always happen, so it is
267 recommended to set this value to true.
269 =item is_slave
271 If set to true, this object will fetch a new recentfile from remote
272 when the timespan between the last mirror (see have_mirrored) and now
273 is too large (currently hardcoded arbitrary 420 seconds).
275 =item locktimeout
277 After how many seconds shall we die if we cannot lock a I<recentfile>?
278 Defaults to 600 seconds.
280 =item loopinterval
282 When mirror_loop is called, this accessor specifies the minimum time
283 every loop shall take. If the work of a loop is done before that
284 time has passed, we sleep for the rest of the time. Defaults to an
285 arbitrary 42 seconds.
287 =item max_files_per_connection
289 Maximum number of files that are transferred on a single rsync call.
290 Setting it higher means higher performance at the price of holding
291 connections longer and potentially disturbing other users in the pool.
292 Defaults to the arbitrary value 42.
294 =item max_rsync_errors
296 When rsync operations encounter that many errors without any
297 intervening success, we die. Defaults to unlimited. A value of
298 -1 means we run forever, ignoring all rsync errors.
300 =item minmax
302 Hashref remembering when we read the recent_events from this file the
303 last time and what the timespan was.
305 =item protocol
307 When the RECENT file format changes, we increment the protocol. We try
308 to support older protocols in later releases.
310 =item remote_host
312 The host we are mirroring from. Leave empty for the local filesystem.
314 =item remote_module
316 Rsync servers have so-called modules to separate directory trees
317 from each other. Put here the name of the module under which we are
318 mirroring. Leave empty for the local filesystem.
320 =item rsync_options
322 Things like compress, links, times or checksums. Passed in to the
323 File::Rsync object used to run the mirror.
325 =item serializer_suffix
327 Mostly untested accessor. The only well tested format for
328 I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
329 Data::Serializer. But in principle other formats are supported as
330 well. See section SERIALIZERS below.
332 =item sleep_per_connection
334 Sleep that many seconds (floating point OK) after every chunk of rsyncing
335 has finished. Defaults to arbitrary 0.42.
337 =item ttl
339 Time to live. Number of seconds after which this recentfile must be
340 fetched again from the origin server. Only relevant for slaves.
341 Defaults to arbitrary 24.2 seconds.
343 =item verbose
345 Boolean to turn on a bit of verbosity.
347 =back
349 =cut
351 use accessors @accessors;
353 =head1 METHODS
355 =head2 (void) $obj->aggregate
357 Takes all intervals that are collected in the accessor called
358 aggregator. Sorts them by actual length of the interval.
359 Removes those that are shorter than our own interval. Then merges this
360 object into the next larger object. The merging continues upwards
361 as long as the next I<recentfile> is old enough to warrant a merge.
363 Whether a merge is warranted is decided according to the interval of
364 the previous (smaller) recentfile, so that larger files are updated
365 less often than smaller ones.
367 Here is an example to illustrate the behaviour. Given aggregators
369 1h 1d 1W 1M 1Q 1Y Z
371 then
373 1h updates 1d on every call to aggregate()
374 1d updates 1W earliest after 1h
375 1W updates 1M earliest after 1d
376 1M updates 1Q earliest after 1W
377 1Q updates 1Y earliest after 1M
378 1Y updates Z earliest after 1Q
380 Note that all but the smallest recentfile get updated at an arbitrary
381 rate and as such are quite useless on their own.
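For illustration, a writer might combine C<update> and C<aggregate> like
this (the file and path are made up):

    my $rf = File::Rsync::Mirror::Recentfile->new_from_file
        ("/home/ftp/pub/PAUSE/authors/RECENT-1h.yaml");
    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
    $rf->aggregate; # merges upwards into 1d, 1W, ... as far as warranted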
383 =cut
385 sub aggregate {
386 my($self) = @_;
387 my @aggs = sort { $a->{secs} <=> $b->{secs} }
388 grep { $_->{secs} >= $self->interval_secs }
389 map { { interval => $_, secs => $self->interval_secs($_)} }
390 $self->interval, @{$self->aggregator || []};
391 $aggs[0]{object} = $self;
392 AGGREGATOR: for my $i (0..$#aggs-1) {
393 my $this = $aggs[$i]{object};
394 my $next = $this->_sparse_clone;
395 $next->interval($aggs[$i+1]{interval});
396 my $want_merge = 0;
397 if ($i == 0) {
398 $want_merge = 1;
399 } else {
400 my $next_rfile = $next->rfile;
401 if (-e $next_rfile) {
402 my $prev = $aggs[$i-1]{object};
403 local $^T = time;
404 my $next_age = 86400 * -M $next_rfile;
405 if ($next_age > $prev->interval_secs) {
406 $want_merge = 1;
408 } else {
409 $want_merge = 1;
412 if ($want_merge) {
413 $next->merge($this);
414 $aggs[$i+1]{object} = $next;
415 } else {
416 last AGGREGATOR;
421 # collect file size and mtime for all files of this aggregate
422 sub _debug_aggregate {
423 my($self) = @_;
424 my @aggs = sort { $a->{secs} <=> $b->{secs} }
425 map { { interval => $_, secs => $self->interval_secs($_)} }
426 $self->interval, @{$self->aggregator || []};
427 my $report = [];
428 for my $i (0..$#aggs) {
429 my $this = Storable::dclone $self;
430 $this->interval($aggs[$i]{interval});
431 my $rfile = $this->rfile;
432 my @stat = stat $rfile;
433 push @$report, {rfile => $rfile, size => $stat[7], mtime => $stat[9]};
435 $report;
438 # (void) $self->_assert_symlink()
439 sub _assert_symlink {
440 my($self) = @_;
441 my $recentrecentfile = File::Spec->catfile
443 $self->localroot,
444 sprintf
446 "%s.recent",
447 $self->filenameroot
450 if ($Config{d_symlink} eq "define") {
451 my $howto_create_symlink; # 0=no need; 1=straight symlink; 2=rename symlink
452 if (-l $recentrecentfile) {
453 my $found_symlink = readlink $recentrecentfile;
454 if ($found_symlink eq $self->rfilename) {
455 return;
456 } else {
457 $howto_create_symlink = 2;
459 } else {
460 $howto_create_symlink = 1;
462 if (1 == $howto_create_symlink) {
463 symlink $self->rfilename, $recentrecentfile or die "Could not create symlink '$recentrecentfile': $!"
464 } else {
465 unlink "$recentrecentfile.$$"; # may fail
466 symlink $self->rfilename, "$recentrecentfile.$$" or die "Could not create symlink '$recentrecentfile.$$': $!";
467 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
469 } else {
470 warn "Warning: symlinks not supported on this system, doing a copy instead\n";
471 unlink "$recentrecentfile.$$"; # may fail
472 cp $self->rfilename, "$recentrecentfile.$$" or die "Could not copy to '$recentrecentfile.$$': $!";
473 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
477 =head2 $hashref = $obj->delayed_operations
479 A hash of hashes containing unlink and rmdir operations which had to
480 wait until the recentfile got unhidden in order to not confuse
481 downstream mirrors (in case we have some).
483 =cut
485 sub delayed_operations {
486 my($self) = @_;
487 my $x = $self->_delayed_operations;
488 unless (defined $x) {
489 $x = {
490 unlink => {},
491 rmdir => {},
493 $self->_delayed_operations ($x);
495 return $x;
498 =head2 $done = $obj->done
500 $done is a reference to a File::Rsync::Mirror::Recentfile::Done object
501 that keeps track of rsync activities. Only needed and used when we are
502 a mirroring slave.
504 =cut
506 sub done {
507 my($self) = @_;
508 my $done = $self->_done;
509 if (!$done) {
510 require File::Rsync::Mirror::Recentfile::Done;
511 $done = File::Rsync::Mirror::Recentfile::Done->new();
512 $done->_rfinterval ($self->interval);
513 $self->_done ( $done );
515 return $done;
518 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
520 Stores the remote I<recentfile> locally as a tempfile. The caller is
521 responsible for removing the file after use.
523 Note: if you intend to act as an rsync server for other slaves, then
524 you must use this method rather than get_remotefile() to fetch that
525 file. Otherwise downstream mirrors would expect you to have already
526 mirrored all the files that are listed in the I<recentfile> before
527 you actually have them.
529 =cut
531 sub get_remote_recentfile_as_tempfile {
532 my($self) = @_;
533 mkpath $self->localroot;
534 my $fh;
535 my $trfilename;
536 if ( $self->_use_tempfile() ) {
537 return $self->_current_tempfile if ! $self->ttl_reached;
538 $fh = $self->_current_tempfile_fh;
539 $trfilename = $self->rfilename;
540 } else {
541 $trfilename = $self->rfilename;
544 my $dst;
545 if ($fh) {
546 $dst = $self->_current_tempfile;
547 } else {
548 $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
549 $dst = $fh->filename;
550 $self->_current_tempfile ($dst);
551 my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
552 if (defined $rfile && -e $rfile) {
553 # saving on bandwidth. Might need to be configurable
554 # $self->bandwidth_is_cheap?
555 cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
558 my $src = join ("/",
559 $self->remoteroot,
560 $trfilename,
562 if ($self->verbose) {
563 my $doing = -e $dst ? "Sync" : "Get";
564 my $display_dst = join "/", "...", basename(dirname($dst)), basename($dst);
565 printf STDERR
567 "%-4s %d (1/1/%s) temp %s ... ",
568 $doing,
569 time,
570 $self->interval,
571 $display_dst,
574 my $gaveup = 0;
575 my $retried = 0;
576 while (!$self->rsync->exec(
577 src => $src,
578 dst => $dst,
579 )) {
580 $self->register_rsync_error ($self->rsync->err);
581 if (++$retried >= 3) {
582 warn "XXX giving up";
583 $gaveup = 1;
584 last;
587 if ($gaveup) {
588 printf STDERR "Warning: gave up mirroring %s, will try again later", $self->interval;
589 } else {
590 $self->_refresh_internals ($dst);
591 $self->have_mirrored (Time::HiRes::time);
592 $self->un_register_rsync_error ();
594 if ($self->verbose) {
595 print STDERR "DONE\n";
597 my $mode = 0644;
598 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
599 return $dst;
602 sub _get_remote_rat_provide_tempfile_object {
603 my($self, $trfilename) = @_;
604 my $fh = File::Temp->new
605 (TEMPLATE => sprintf(".FRMRecent-%s-XXXX",
606 $trfilename,
608 DIR => $self->localroot,
609 SUFFIX => $self->serializer_suffix,
610 UNLINK => $self->_use_tempfile,
612 if ($self->_use_tempfile) {
613 $self->_current_tempfile_fh ($fh); # delay self destruction
615 return $fh;
618 =head2 $localpath = $obj->get_remotefile ( $relative_path )
620 Rsyncs one single remote file to local filesystem.
622 Note: no locking is done on this file. Any number of processes may
623 mirror this object.
625 Note II: do not use for recentfiles. If you are a cascading
626 slave/server combination, it would confuse other slaves. They would
627 expect the contents of these recentfiles to be available. Use
628 get_remote_recentfile_as_tempfile() instead.
630 =cut
632 sub get_remotefile {
633 my($self, $path) = @_;
634 my $dst = File::Spec->catfile($self->localroot, $path);
635 mkpath dirname $dst;
636 if ($self->verbose) {
637 my $doing = -e $dst ? "Sync" : "Get";
638 printf STDERR
640 "%-4s %d (1/1/%s) %s ... ",
641 $doing,
642 time,
643 $self->interval,
644 $path,
647 while (!$self->rsync->exec(
648 src => join("/",
649 $self->remoteroot,
650 $path),
651 dst => $dst,
652 )) {
653 $self->register_rsync_error ($self->rsync->err);
655 $self->un_register_rsync_error ();
656 if ($self->verbose) {
657 print STDERR "DONE\n";
659 return $dst;
662 =head2 $obj->interval ( $interval_spec )
664 Get/set accessor. $interval_spec is a string and described below in
665 the section INTERVAL SPEC.
667 =cut
669 sub interval {
670 my ($self, $interval) = @_;
671 if (@_ >= 2) {
672 $self->_interval($interval);
673 $self->_rfile(undef);
675 $interval = $self->_interval;
676 unless (defined $interval) {
677 # do not ask the $self too much, it recurses!
678 require Carp;
679 Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
681 return $interval;
684 =head2 $secs = $obj->interval_secs ( $interval_spec )
686 $interval_spec is described below in the section INTERVAL SPEC. If
687 empty, it defaults to the inherent interval of this object.
689 =cut
691 sub interval_secs {
692 my ($self, $interval) = @_;
693 $interval ||= $self->interval;
694 unless (defined $interval) {
695 die "interval_secs() called without argument on an object without a declared one";
697 my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
698 die "Could not determine seconds from interval[$interval]";
699 if ($interval eq "Z") {
700 return MAX_INT;
701 } elsif (exists $seconds{$t} and $n =~ /^\d+$/) {
702 return $seconds{$t}*$n;
703 } else {
704 die "Invalid interval specification: n[$n]t[$t]";
708 =head2 $obj->localroot ( $localroot )
710 Get/set accessor. The local root of the tree.
712 =cut
714 sub localroot {
715 my ($self, $localroot) = @_;
716 if (@_ >= 2) {
717 $self->_localroot($localroot);
718 $self->_rfile(undef);
720 $localroot = $self->_localroot;
723 =head2 $ret = $obj->local_path($path_found_in_recentfile)
725 Combines the path to our local mirror and the path of an object found
726 in this I<recentfile>. In other words: the target of a mirror operation.
728 Implementation note: We split on slashes and then use
729 File::Spec::catfile to adjust to the local operating system.
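A sketch of what this means in practice (localroot and path are made up):

    # with localroot "/home/ftp/pub/PAUSE/authors"
    my $dst = $rf->local_path("id/A/AN/ANDK/CPAN-1.92_63.tar.gz");
    # $dst is "/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz" on Unix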
731 =cut
733 sub local_path {
734 my($self,$path) = @_;
735 unless (defined $path) {
736 # seems like a degenerated case
737 return $self->localroot;
739 my @p = split m|/|, $path;
740 File::Spec->catfile($self->localroot,@p);
743 =head2 (void) $obj->lock
745 Locking is implemented with an C<mkdir> on a locking directory
746 (C<.lock> appended to $rfile).
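Writers are expected to bracket their modifications with lock/unlock,
along these lines (a sketch of the pattern used by update()):

    $rf->lock;
    my $recent = $rf->recent_events;
    # ... modify $recent ...
    $rf->write_recent($recent);
    $rf->unlock;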
748 =cut
750 sub lock {
751 my ($self) = @_;
752 # not using flock because it locks on filehandles instead of
753     # old school resources.
754 my $locked = $self->_is_locked and return;
755 my $rfile = $self->rfile;
756 # XXX need a way to allow breaking the lock
757 my $start = time;
758 my $locktimeout = $self->locktimeout || 600;
759 while (not mkdir "$rfile.lock") {
760 Time::HiRes::sleep 0.01;
761 if (time - $start > $locktimeout) {
762 die "Could not acquire lockdirectory '$rfile.lock': $!";
765 $self->_is_locked (1);
768 =head2 (void) $obj->merge ($other)
770 Bulk update of this object with another one. It's used to merge a
771 smaller and younger $other object into the current one. If this file
772 is a C<Z> file, then we do not merge in objects of type C<delete>. But
773 if we encounter an object of type delete we delete the corresponding
774 C<new> object if we have it.
776 If there is nothing to be merged, nothing is done.
778 =cut
780 sub merge {
781 my($self, $other) = @_;
782 $self->_merge_sanitycheck ( $other );
783 $other->lock;
784 my $other_recent = $other->recent_events || [];
785 $self->lock;
786 my $my_recent = $self->recent_events || [];
788 # calculate the target time span
789 my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
790 my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
791 my $oldest_allowed = 0;
792 my $something_done;
793 unless ($my_recent->[0]) {
794 # obstetrics
795 $something_done=1;
797 if ($epoch) {
798 if (my $merged = $self->merged) {
799 my $secs = $self->interval_secs();
800 $oldest_allowed = min($epoch - $secs, $merged->{epoch});
802 # throw away outsiders
803 # XXX _bigfloat!
804 while (@$my_recent && $my_recent->[-1]{epoch} < $oldest_allowed) {
805 pop @$my_recent;
806 $something_done=1;
810 my %have;
811 my $recent = [];
812 for my $oev (@$other_recent) {
813 my $oevepoch = $oev->{epoch} || 0;
814 next if $oevepoch < $oldest_allowed;
815 my $path = $oev->{path};
816 next if $have{$path}++;
817 if ( $self->interval eq "Z"
818 and $oev->{type} eq "delete") {
819 # do nothing
820 } else {
821 if (!$myepoch || $oevepoch > $myepoch) {
822 $something_done=1;
824 push @$recent, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
827 if ($something_done) {
828 $self->_merge_something_done ($recent, $my_recent, $other_recent, $other, \%have, $epoch);
830 $self->unlock;
831 $other->unlock;
834 sub _merge_something_done {
835 my($self, $recent, $my_recent, $other_recent, $other, $have, $epoch) = @_;
836 push @$recent, grep { !$have->{$_->{path}}++ } @$my_recent;
837 if (_bigfloatgt($other->dirtymark, $self->dirtymark)) {
838 $self->dirtymark ( $other->dirtymark );
840 $self->write_recent($recent);
841 $other->merged({
842 time => Time::HiRes::time, # not used anywhere
843 epoch => $recent->[0]{epoch},
844 into_interval => $self->interval, # not used anywhere
846 $other->write_recent($other_recent);
849 sub _merge_sanitycheck {
850 my($self, $other) = @_;
851 if ($self->interval_secs <= $other->interval_secs) {
852 die sprintf
854 "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
855 $self->interval_secs,
856 $other->interval_secs,
861 =head2 merged
863 Hashref denoting when and at which epoch this recentfile has last
864 been merged into some other one.
866 =cut
868 sub merged {
869 my($self, $set) = @_;
870 if (defined $set) {
871 $self->_merged ($set);
873 my $merged = $self->_merged;
874 my $into;
875 if ($merged and $into = $merged->{into_interval} and defined $self->_interval) {
876 if ($into eq $self->interval) {
877 require Carp;
878 Carp::cluck(sprintf
880 "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
881 $into,
882 $self->interval,
884 } elsif ($self->interval_secs($into) < $self->interval_secs) {
885 require Carp;
886 Carp::cluck(sprintf
888 "Warning: into_interval[%s] smaller than own interval[%s] on interval[%s]. Danger ahead.",
889 $self->interval_secs($into),
890 $self->interval_secs,
891 $self->interval,
895 $merged;
898 =head2 $hashref = $obj->meta_data
900 Returns the hashref of metadata that the server has to add to the
901 I<recentfile>.
903 =cut
905 sub meta_data {
906 my($self) = @_;
907 my $ret = $self->{meta};
908 for my $m (
909 "aggregator",
910 "canonize",
911 "comment",
912 "dirtymark",
913 "filenameroot",
914 "merged",
915 "interval",
916 "protocol",
917 "serializer_suffix",
919 my $v = $self->$m;
920 if (defined $v) {
921 $ret->{$m} = $v;
924 # XXX need to reset the Producer if I am a writer, keep it when I
925 # am a reader
926 $ret->{Producers} ||= {
927 __PACKAGE__, "$VERSION", # stringified it looks better
928 '$0', $0,
929 'time', Time::HiRes::time,
931 $ret->{dirtymark} ||= Time::HiRes::time;
932 return $ret;
935 =head2 $success = $obj->mirror ( %options )
937 Mirrors the files in this I<recentfile> as reported by
938 C<recent_events>. Options named C<after>, C<before>, C<max>, and
939 C<skip-deletes> are passed through to the L<recent_events> call. The
940 boolean option C<piecemeal>, if true, causes C<mirror> to only rsync
941 C<max_files_per_connection> and keep track of the rsynced files so
942 that future calls will rsync different files until all files are
943 brought to sync.
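For example, a slave that wants to sync in small chunks might call
something like this (the option values are made up):

    my $ok = $rf->mirror
        (
         after          => $last_seen_epoch, # only events newer than this
         piecemeal      => 1,                # stop after max_files_per_connection files
         'skip-deletes' => 1,                # do not delete local files
        );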
945 =cut
947 sub mirror {
948 my($self, %options) = @_;
949 my $trecentfile = $self->get_remote_recentfile_as_tempfile();
950 $self->_use_tempfile (1);
951 my %passthrough = map { ($_ => $options{$_}) } qw(before after max skip-deletes);
952 my ($recent_events) = $self->recent_events(%passthrough);
953 my(@error, @xcollector);
954 my $first_item = 0;
955 my $last_item = $#$recent_events;
956 my $done = $self->done;
957 my $pathdb = $self->_pathdb;
958 ITEM: for my $i ($first_item..$last_item) {
959 my $status = +{};
960 $self->_mirror_item
963 $recent_events,
964 $last_item,
965 $done,
966 $pathdb,
967 \@xcollector,
968 \%options,
969 $status,
970 \@error,
972 last if $i == $last_item;
973 return if $status->{mustreturn};
975 if (@xcollector) {
976 my $success = eval { $self->_mirror_empty_xcollector (\@xcollector,$pathdb,$recent_events);};
977 if (!$success || $@) {
978 warn "Warning: Unknown error while mirroring: $@";
979 push @error, $@;
980 sleep 1;
983 if ($self->verbose) {
984 print STDERR "DONE\n";
986     # once we've gone to the end we consider ourselves free of obligations
987 $self->unseed;
988 $self->_mirror_unhide_tempfile ($trecentfile);
989 $self->_mirror_perform_delayed_ops;
990 return !@error;
993 sub _mirror_item {
994 my($self,
996 $recent_events,
997 $last_item,
998 $done,
999 $pathdb,
1000 $xcollector,
1001 $options,
1002 $status,
1003 $error,
1004 ) = @_;
1005 my $recent_event = $recent_events->[$i];
1006 return if $done->covered ( $recent_event->{epoch} );
1007 if ($pathdb) {
1008 my $rec = $pathdb->{$recent_event->{path}};
1009 if ($rec && $rec->{recentepoch}) {
1010 if (_bigfloatgt
1011 ( $rec->{recentepoch}, $recent_event->{epoch} )){
1012 $done->register ($recent_events, [$i]);
1013 return;
1017 my $dst = $self->local_path($recent_event->{path});
1018 if ($recent_event->{type} eq "new"){
1019 $self->_mirror_item_new
1021 $dst,
1023 $last_item,
1024 $recent_events,
1025 $recent_event,
1026 $xcollector,
1027 $pathdb,
1028 $status,
1029 $error,
1030 $options,
1032 } elsif ($recent_event->{type} eq "delete") {
1033 my $activity;
1034 if ($options->{'skip-deletes'}) {
1035 $activity = "skipped";
1036 } else {
1037 if (! -e $dst) {
1038 $activity = "not_found";
1039 } elsif (-l $dst or not -d _) {
1040 $self->delayed_operations->{unlink}{$dst}++;
1041 $activity = "deleted";
1042 } else {
1043 $self->delayed_operations->{rmdir}{$dst}++;
1044 $activity = "deleted";
1047 $done->register ($recent_events, [$i]);
1048 if ($pathdb) {
1049 $self->_mirror_register_path($pathdb,[$recent_event],$activity);
1051 } else {
1052 warn "Warning: invalid upload type '$recent_event->{type}'";
1056 sub _mirror_item_new {
1057 my($self,
1058 $dst,
1060 $last_item,
1061 $recent_events,
1062 $recent_event,
1063 $xcollector,
1064 $pathdb,
1065 $status,
1066 $error,
1067 $options,
1068 ) = @_;
1069 if ($self->verbose) {
1070 my $doing = -e $dst ? "Sync" : "Get";
1071 printf STDERR
1073 "%-4s %d (%d/%d/%s) %s ... ",
1074 $doing,
1075 time,
1076 1+$i,
1077 1+$last_item,
1078 $self->interval,
1079 $recent_event->{path},
1082 my $max_files_per_connection = $self->max_files_per_connection || 42;
1083 my $success;
1084 if ($self->verbose) {
1085 print STDERR "\n";
1087 push @$xcollector, { rev => $recent_event, i => $i };
1088 if (@$xcollector >= $max_files_per_connection) {
1089 $success = eval {$self->_mirror_empty_xcollector ($xcollector,$pathdb,$recent_events);};
1090 my $sleep = $self->sleep_per_connection;
1091 $sleep = 0.42 unless defined $sleep;
1092 Time::HiRes::sleep $sleep;
1093 if ($options->{piecemeal}) {
1094 $status->{mustreturn} = 1;
1095 return;
1097 } else {
1098 return;
1100 if (!$success || $@) {
1101 warn "Warning: Error while mirroring: $@";
1102 push @$error, $@;
1103 sleep 1;
1105 if ($self->verbose) {
1106 print STDERR "DONE\n";
1110 sub _mirror_empty_xcollector {
1111 my($self,$xcoll,$pathdb,$recent_events) = @_;
1112 my $success = $self->mirror_path([map {$_->{rev}{path}} @$xcoll]);
1113 if ($pathdb) {
1114 $self->_mirror_register_path($pathdb,[map {$_->{rev}} @$xcoll],"rsync");
1116 $self->done->register($recent_events, [map {$_->{i}} @$xcoll]);
1117 @$xcoll = ();
1118 return $success;
1121 sub _mirror_register_path {
1122 my($self,$pathdb,$coll,$activity) = @_;
1123 my $time = time;
1124 for my $item (@$coll) {
1125 $pathdb->{$item->{path}} =
1127 recentepoch => $item->{epoch},
1128 ($activity."_on") => $time,
1133 sub _mirror_unhide_tempfile {
1134 my($self, $trecentfile) = @_;
1135 my $rfile = $self->rfile;
1136 if (rename $trecentfile, $rfile) {
1137 # warn "DEBUG: renamed '$trecentfile' to '$rfile'";
1138 } else {
1139 require Carp;
1140 Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
1142 $self->_use_tempfile (0);
1143 if (my $ctfh = $self->_current_tempfile_fh) {
1144 $ctfh->unlink_on_destroy (0);
1145 $self->_current_tempfile_fh (undef);
1149 sub _mirror_perform_delayed_ops {
1150 my($self) = @_;
1151 my $delayed = $self->delayed_operations;
1152 for my $dst (keys %{$delayed->{unlink}}) {
1153 unless (unlink $dst) {
1154 require Carp;
1155 Carp::cluck ( "Warning: Error while unlinking '$dst': $!" );
1157 delete $delayed->{unlink}{$dst};
1159 for my $dst (keys %{$delayed->{rmdir}}) {
1160 unless (rmdir $dst) {
1161 require Carp;
1162 Carp::cluck ( "Warning: Error on rmdir '$dst': $!" );
1164 delete $delayed->{rmdir}{$dst};
1168 =head2 (void) $obj->mirror_loop
1170 Run mirror in an endless loop. See the accessor C<loopinterval>. XXX
1171 What happens/should happen if we miss the interval during a single loop?
1173 =cut
1175 sub mirror_loop {
1176 my($self) = @_;
1177 my $iteration_start = time;
1179 my $Signal = 0;
1180 $SIG{INT} = sub { $Signal++ };
1181 my $loopinterval = $self->loopinterval || 42;
1182 my $after = -999999999;
1183 LOOP: while () {
1184 $self->mirror($after);
1185 last LOOP if $Signal;
1186 my $re = $self->recent_events;
1187 $after = $re->[0]{epoch};
1188 if ($self->verbose) {
1189 local $| = 1;
1190 print "($after)";
1192 if (time - $iteration_start < $loopinterval) {
1193 sleep $iteration_start + $loopinterval - time;
1195 if ($self->verbose) {
1196 local $| = 1;
1197 print "~";
1202 =head2 $success = $obj->mirror_path ( $arrref | $path )
1204 If the argument is a scalar it is treated as a path. The remote path
1205 is mirrored into the local copy. $path is the path found in the
1206 I<recentfile>, i.e. it is relative to the root directory of the
1207 mirror.
1209 If the argument is an array reference then all elements are treated as
1210 a path below the current tree and all are rsynced with a single
1211 command (and a single connection).
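A sketch of both calling styles (the paths are made up):

    $rf->mirror_path("id/A/AN/ANDK/CPAN-1.92_63.tar.gz");   # single file

    $rf->mirror_path(["id/A/AN/ANDK/CHECKSUMS",
                      "id/A/AN/ANDK/CPAN-1.92_63.tar.gz"]); # one rsync call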
1213 =cut
1215 sub mirror_path {
1216 my($self,$path) = @_;
1217 # XXX simplify the two branches such that $path is treated as
1218 # [$path] maybe even demand the argument as an arrayref to
1219 # simplify docs and code. (rsync-over-recentfile-2.pl uses the
1220 # interface)
1221 if (ref $path and ref $path eq "ARRAY") {
1222 my $dst = $self->localroot;
1223 mkpath dirname $dst;
1224 my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
1225 lc $self->filenameroot,
1227 TMPDIR => 1,
1228 UNLINK => 0,
1230 for my $p (@$path) {
1231 print $fh $p, "\n";
1233 $fh->flush;
1234 $fh->unlink_on_destroy(1);
1235 my $gaveup = 0;
1236 my $retried = 0;
1237 while (!$self->rsync->exec
1239 src => join("/",
1240 $self->remoteroot,
1242 dst => $dst,
1243 'files-from' => $fh->filename,
1244 )) {
1245 my($err) = $self->rsync->err;
1246 if ($self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x ) {
1247 if ($self->verbose) {
1248 warn "Info: ignoring link_stat error '$err'";
1250 return 1;
1252 $self->register_rsync_error ($err);
1253 if (++$retried >= 3) {
1254 warn "XXX giving up.";
1255 $gaveup = 1;
1256 last;
1259 unless ($gaveup) {
1260 $self->un_register_rsync_error ();
1262 } else {
1263 my $dst = $self->local_path($path);
1264 mkpath dirname $dst;
1265 while (!$self->rsync->exec
1267 src => join("/",
1268 $self->remoteroot,
1269 $path
1271 dst => $dst,
1272 )) {
1273 my($err) = $self->rsync->err;
1274 if ($self->ignore_link_stat_errors && $err =~ m{^ rsync: \s link_stat }x ) {
1275 if ($self->verbose) {
1276 warn "Info: ignoring link_stat error '$err'";
1278 return 1;
1280 $self->register_rsync_error ($err);
1282 $self->un_register_rsync_error ();
1284 return 1;
1287 sub _my_current_rfile {
1288 my($self) = @_;
1289 my $rfile;
1290 if ($self->_use_tempfile) {
1291 $rfile = $self->_current_tempfile;
1292 } else {
1293 $rfile = $self->rfile;
1295 return $rfile;
1298 =head2 $path = $obj->naive_path_normalize ($path)
1300 Takes an absolute unix style path as argument and canonicalizes it to
1301 a shorter path if possible, removing things like double slashes or
1302 C</./> and references to C<../> directories to get a shorter,
1303 unambiguous path. This is used to simplify the code that determines
1304 whether a file passed to C<update()> is indeed below our C<localroot>.
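For example (a sketch):

    $rf->naive_path_normalize("/home/ftp//pub/PAUSE/authors/id/../id/A/AN/ANDK/x.tgz");
    # returns "/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/x.tgz"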
1306 =cut
1308 sub naive_path_normalize {
1309 my($self,$path) = @_;
1310 $path =~ s|/+|/|g;
1311 1 while $path =~ s|/[^/]+/\.\./|/|;
1312 $path =~ s|/$||;
1313 $path;
1316 =head2 $ret = $obj->read_recent_1 ( $data )
1318 Delegate of C<recent_events()> on protocol 1
1320 =cut
1322 sub read_recent_1 {
1323 my($self, $data) = @_;
1324 return $data->{recent};
1327 =head2 $array_ref = $obj->recent_events ( %options )
1329 Note: the code relies on the resource being written atomically. We
1330 cannot lock because we may have no write access. If the caller has
1331 write access (e.g. aggregate() or update()), it has to take care of
1332 any necessary locking and it MUST write atomically.
1334 If $options{after} is specified, only file events after this timestamp
1335 are returned.
1337 If $options{before} is specified, only file events before this
1338 timestamp are returned.
1340 If $options{'skip-deletes'} is specified, no files-to-be-deleted will
1341 be returned.
1343 If $options{max} is specified only a maximum of this many events is
1344 returned.
1346 If $options{contains} is specified the value must be a hash reference
1347 containing a query. The query may contain the keys C<epoch>, C<path>,
1348 and C<type>. Each represents a condition that must be met. If there is
1349 more than one such key, the conditions are ANDed.
1351 If $options{info} is specified, it must be a hashref. This hashref
1352 will be filled with metadata about the unfiltered recent_events of
1353 this object: the key C<first> holds the first item, the key C<last>
1354 the last one.
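For example, to check whether a particular path was recently recorded
as C<new> (the path is made up):

    my $events = $rf->recent_events
        (
         contains => { path => "id/A/AN/ANDK/CPAN-1.92_63.tar.gz",
                       type => "new",
                     },
        );
    print "seen recently\n" if @$events;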
1356 =cut
1358 sub recent_events {
1359 my ($self, %options) = @_;
1360 my $info = $options{info};
1361 if ($self->is_slave) {
1362 $self->get_remote_recentfile_as_tempfile;
1364 my $rfile_or_tempfile = $self->_my_current_rfile or return [];
1365 -e $rfile_or_tempfile or return [];
1366 my $suffix = $self->serializer_suffix;
1367 my ($data) = eval {
1368 $self->_try_deserialize
1370 $suffix,
1371 $rfile_or_tempfile,
1374 my $err = $@;
1375 if ($err or !$data) {
1376 return [];
1378 my $re;
1379 if (reftype $data eq 'ARRAY') { # protocol 0
1380 $re = $data;
1381 } else {
1382 $re = $self->_recent_events_protocol_x
1384 $data,
1385 $rfile_or_tempfile,
1388     return $re unless grep {defined $options{$_}} qw(after before contains info max skip-deletes);
1389 $self->_recent_events_handle_options ($re, \%options);
1392 sub _recent_events_handle_options {
1393 my($self, $re, $options) = @_;
1394 my $last_item = $#$re;
1395 my $info = $options->{info};
1396 if ($info) {
1397 $info->{first} = $re->[0];
1398 $info->{last} = $re->[-1];
1400 if (defined $options->{after}) {
1401 if ($re->[0]{epoch} > $options->{after}) {
1402 if (
1403 my $f = first
1404 {$re->[$_]{epoch} <= $options->{after}}
1405 0..$#$re
1407 $last_item = $f-1;
1409 } else {
1410 $last_item = -1;
1413 my $first_item = 0;
1414 if (defined $options->{before}) {
1415 if ($re->[0]{epoch} > $options->{before}) {
1416 if (
1417 my $f = first
1418 {$re->[$_]{epoch} < $options->{before}}
1419 0..$last_item
1421 $first_item = $f;
1423 } else {
1424 $first_item = 0;
1427 if (0 != $first_item || -1 != $last_item) {
1428 @$re = splice @$re, $first_item, 1+$last_item-$first_item;
1430 if ($options->{'skip-deletes'}) {
1431 @$re = grep { $_->{type} ne "delete" } @$re;
1433 if (my $contopt = $options->{contains}) {
1434 for my $allow (qw(epoch path type)) {
1435 if (exists $contopt->{$allow}) {
1436 my $v = delete $contopt->{$allow};
1437 @$re = grep { $_->{$allow} eq $v } @$re;
1441 if ($options->{max} && @$re > $options->{max}) {
1442 @$re = splice @$re, 0, $options->{max};
1444 $re;
1447 sub _recent_events_protocol_x {
1448 my($self,
1449 $data,
1450 $rfile_or_tempfile,
1451 ) = @_;
1452 my $meth = sprintf "read_recent_%d", $data->{meta}{protocol};
1453 # we may be reading meta for the first time
1454 while (my($k,$v) = each %{$data->{meta}}) {
1455 next if $k ne lc $k; # "Producers"
1456 next if defined $self->$k;
1457 $self->$k($v);
1459 my $re = $self->$meth ($data);
1460 my @stat = stat $rfile_or_tempfile or die "Cannot stat '$rfile_or_tempfile': $!";
1461 my $minmax = { mtime => $stat[9] };
1462 if (@$re) {
1463 $minmax->{min} = $re->[-1]{epoch};
1464 $minmax->{max} = $re->[0]{epoch};
1466 $self->minmax ( $minmax );
1467 return $re;
1470 sub _try_deserialize {
1471 my($self,
1472 $suffix,
1473 $rfile_or_tempfile,
1474 ) = @_;
1475 if ($suffix eq ".yaml") {
1476 require YAML::Syck;
1477 YAML::Syck::LoadFile($rfile_or_tempfile);
1478 } elsif ($HAVE->{"Data::Serializer"}) {
1479 my $serializer = Data::Serializer->new
1480 ( serializer => $serializers{$suffix} );
1481 my $serialized = do
1483 open my $fh, $rfile_or_tempfile or die "Could not open: $!";
1484 local $/;
1485 <$fh>;
1487 $serializer->raw_deserialize($serialized);
1488 } else {
1489 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
1493 sub _refresh_internals {
1494 my($self, $dst) = @_;
1495 my $class = ref $self;
1496 my $rfpeek = $class->new_from_file ($dst);
1497 for my $acc (qw(
1498 _merged
1499 minmax
1500 )) {
1501 $self->$acc ( $rfpeek->$acc );
1503 my $old_dirtymark = $self->dirtymark;
1504 my $new_dirtymark = $rfpeek->dirtymark;
1505 if ($old_dirtymark && $new_dirtymark && _bigfloatgt($new_dirtymark,$old_dirtymark)) {
1506 $self->done->reset;
1507 $self->dirtymark ( $new_dirtymark );
1508 $self->seed;
1512 =head2 $ret = $obj->rfilename
1514 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1515 a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
1517 =cut
1519 sub rfilename {
1520 my($self) = @_;
1521 my $file = sprintf("%s-%s%s",
1522 $self->filenameroot,
1523 $self->interval,
1524 $self->serializer_suffix,
1526 return $file;
1529 =head2 $str = $self->remote_dir
1531 The directory we are mirroring from.
1533 =cut
1535 sub remote_dir {
1536 my($self, $set) = @_;
1537 if (defined $set) {
1538 $self->_remote_dir ($set);
1540 my $x = $self->_remote_dir;
1541 $self->is_slave (1);
1542 return $x;
1545 =head2 $str = $obj->remoteroot
1547 =head2 (void) $obj->remoteroot ( $set )
1549 Get/Set the composed prefix needed when rsyncing from a remote module.
1550 If remote_host, remote_module, and remote_dir are set, it is composed
1551 from these.
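For example, with remote_host C<pause.perl.org>, remote_module
C<authors>, and remote_dir set to the empty string, the composed
remoteroot is:

    pause.perl.org::authors/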
1553 =cut
1555 sub remoteroot {
1556 my($self, $set) = @_;
1557 if (defined $set) {
1558 $self->_remoteroot($set);
1560 my $remoteroot = $self->_remoteroot;
1561 unless (defined $remoteroot) {
1562 $remoteroot = sprintf
1564 "%s%s%s",
1565 defined $self->remote_host ? ($self->remote_host."::") : "",
1566 defined $self->remote_module ? ($self->remote_module."/") : "",
1567 defined $self->remote_dir ? $self->remote_dir : "",
1569 $self->_remoteroot($remoteroot);
1571 return $remoteroot;
1574 =head2 (void) $obj->resolve_recentfilename ( $recentfilename )
1576 Inverse method to L<rfilename>. $recentfilename is a plain filename of
1577 the pattern
1579 $filenameroot-$interval$serializer_suffix
1581 e.g.
1583 RECENT-1M.yaml
1585 This filename is split into its parts and the parts are fed to the
1586 object itself.
1588 =cut
1590 sub resolve_recentfilename {
1591 my($self, $rfname) = @_;
1592 my($splitter) = qr(^(.+)-([^-\.]+)(\.[^\.]+));
1593 if (my($f,$i,$s) = $rfname =~ $splitter) {
1594 $self->filenameroot ($f);
1595 $self->interval ($i);
1596 $self->serializer_suffix ($s);
1597 } else {
1598 die "Alert: cannot split '$rfname', doesn't match '$splitter'";
1600 return;
1603 =head2 my $rfile = $obj->rfile
1605 Returns the full path of the I<recentfile>
1607 =cut
1609 sub rfile {
1610 my($self) = @_;
1611 my $rfile = $self->_rfile;
1612 return $rfile if defined $rfile;
1613 $rfile = File::Spec->catfile
1614 ($self->localroot,
1615 $self->rfilename,
1617 $self->_rfile ($rfile);
1618 return $rfile;
1621 =head2 $rsync_obj = $obj->rsync
1623 The File::Rsync object that this object uses for communicating with an
1624 upstream server.
1626 =cut
1628 sub rsync {
1629 my($self) = @_;
1630 my $rsync = $self->_rsync;
1631 unless (defined $rsync) {
1632 my $rsync_options = $self->rsync_options || {};
1633 if ($HAVE->{"File::Rsync"}) {
1634 $rsync = File::Rsync->new($rsync_options);
1635 $self->_rsync($rsync);
1636 } else {
1637 die "File::Rsync required for rsync operations. Cannot continue";
1640 return $rsync;
1643 =head2 (void) $obj->register_rsync_error($err)
1645 =head2 (void) $obj->un_register_rsync_error()
1647 Register_rsync_error is called whenever the File::Rsync object fails
1648 on an exec (say, connection doesn't succeed). It issues a warning and
1649 sleeps for an increasing amount of time. Un_register_rsync_error
1650 resets the error count. See also accessor C<max_rsync_errors>.
1652 =cut
1655 my $no_success_count = 0;
1656 my $no_success_time = 0;
1657 sub register_rsync_error {
1658 my($self, $err) = @_;
1659 chomp $err;
1660 $no_success_time = time;
1661 $no_success_count++;
1662 my $max_rsync_errors = $self->max_rsync_errors;
1663 $max_rsync_errors = MAX_INT unless defined $max_rsync_errors;
1664 if ($max_rsync_errors>=0 && $no_success_count >= $max_rsync_errors) {
1665 require Carp;
1666 Carp::confess
1668 sprintf
1670 "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
1671 $self->interval,
1672 $err,
1673 $no_success_count,
1676 my $sleep = 12 * $no_success_count;
1677 $sleep = 300 if $sleep > 300;
1678 require Carp;
1679 Carp::cluck
1680 (sprintf
1682 "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
1683 scalar(localtime($no_success_time)),
1684 $self->interval,
1685 $err,
1686 $sleep,
1688 sleep $sleep
1690 sub un_register_rsync_error {
1691 my($self) = @_;
1692 $no_success_time = 0;
1693 $no_success_count = 0;
1697 =head2 $clone = $obj->_sparse_clone
1699 Clones only as much of itself as it safely can. Experimental
1700 method.
1702 Note: what fits better: sparse or shallow? Other suggestions?
1704 =cut
1706 sub _sparse_clone {
1707 my($self) = @_;
1708 my $new = bless {}, ref $self;
1709 for my $m (qw(
1710 _interval
1711 _localroot
1712 _remoteroot
1713 _rfile
1714 _use_tempfile
1715 aggregator
1716 dirtymark
1717 filenameroot
1718 is_slave
1719 max_files_per_connection
1720 protocol
1721 rsync_options
1722 serializer_suffix
1723 sleep_per_connection
1724 verbose
1725 )) {
1726 my $o = $self->$m;
1727 $o = Storable::dclone $o if ref $o;
1728 $new->$m($o);
1730 $new;
1733 =head2 $boolean = OBJ->ttl_reached ()
1735 =cut
1737 sub ttl_reached {
1738 my($self) = @_;
1739 my $have_mirrored = $self->have_mirrored || 0;
1740 my $now = Time::HiRes::time;
1741 my $ttl = $self->ttl;
1742 $ttl = 24.2 unless defined $ttl;
1743 if ($now > $have_mirrored + $ttl) {
1744 return 1;
1746 return 0;
1749 =head2 (void) $obj->unlock()
1751 Unlocking is implemented with an C<rmdir> on a locking directory
1752 (C<.lock> appended to $rfile).
1754 =cut
1756 sub unlock {
1757 my($self) = @_;
1758 return unless $self->_is_locked;
1759 my $rfile = $self->rfile;
1760 rmdir "$rfile.lock";
1761 $self->_is_locked (0);
1764 =head2 unseed
1766 Puts this recentfile into the state of not being 'seeded'.
1768 =cut
1769 sub unseed {
1770 my($self) = @_;
1771 $self->seeded(0);
1774 =head2 $ret = $obj->update ($path, $type)
1776 Enter one file into the local I<recentfile>. $path is the (usually
1777 absolute) path. If the path is outside of our tree, then it is
1778 ignored.
1780 $type is one of C<new> or C<delete>.
1782 The new file event is unshifted onto the array of recent_events and
1783 the array is shortened to the length of the timespan allowed. This is
1784 usually the timespan specified by the interval of this recentfile but
1785 as long as this recentfile has not been merged into another one, the
1786 timespan may grow without bounds.
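A typical writer call right after a file has been uploaded, and the
corresponding call when it is removed again (the path is made up):

    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz", "new");
    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz", "delete");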
1788 =cut
1789 sub _epoch_monotonically_increasing {
1790 my($self,$epoch,$recent) = @_;
1791 return $epoch unless @$recent; # the first one goes unoffended
1792 if (_bigfloatgt("".$epoch,$recent->[0]{epoch})) {
1793 return $epoch;
1794 } else {
1795 return _increase_a_bit($recent->[0]{epoch});
1798 sub update {
1799 my($self,$path,$type) = @_;
1800 die "update called without path argument" unless defined $path;
1801 die "update called without type argument" unless defined $type;
1802 die "update called with illegal type argument: $type" unless $type =~ /(new|delete)/;
1803 my $canonmeth = $self->canonize;
1804 unless ($canonmeth) {
1805 $canonmeth = "naive_path_normalize";
1807 $path = $self->$canonmeth($path);
1808 my $lrd = $self->localroot;
1809 if ($path =~ s|^\Q$lrd\E||) {
1810 $path =~ s|^/||;
1811 my $interval = $self->interval;
1812 my $secs = $self->interval_secs();
1813 $self->lock;
1814 # you must calculate the time after having locked, of course
1815 my $epoch = Time::HiRes::time;
1816 my $recent = $self->recent_events;
1817 $epoch = $self->_epoch_monotonically_increasing($epoch,$recent);
1818 $recent ||= [];
1819 my $oldest_allowed = 0;
1820 if (my $merged = $self->merged) {
1821 $oldest_allowed = min($epoch - $secs, $merged->{epoch});
1822 } else {
1823 # as long as we are not merged at all, no limits!
1825 TRUNCATE: while (@$recent) {
1826 if ($recent->[-1]{epoch} < $oldest_allowed) {
1827 pop @$recent;
1828 } else {
1829 last TRUNCATE;
1832 # remove older duplicates of this $path, irrespective of $type:
1833 $recent = [ grep { $_->{path} ne $path } @$recent ];
1835 unshift @$recent, { epoch => $epoch, path => $path, type => $type };
1836 $self->write_recent($recent);
1837 $self->_assert_symlink;
1838 $self->unlock;
1842 =head2 seed
1844 Puts this recentfile into the 'seeded' state, which means it has to
1845 re-evaluate its uptodateness.
1847 =cut
1848 sub seed {
1849 my($self) = @_;
1850 $self->seeded(1);
1853 =head2 seeded
1855 Tells if the recentfile is in the state 'seeded'.
1857 =cut
1858 sub seeded {
1859 my($self, $set) = @_;
1860 if (defined $set) {
1861 $self->_seeded ($set);
1863 my $x = $self->_seeded;
1864 unless (defined $x) {
1865 $x = 0;
1866 $self->_seeded ($x);
1868 return $x;
1871 =head2 uptodate
1873 True if this object has mirrored the complete interval covered by the
1874 current recentfile.
1876 *** WIP ***
1878 =cut
1879 sub uptodate {
1880 my($self) = @_;
1881 my $uptodate;
1882 my $why;
1883 if ($self->_uptodateness_ever_reached and not $self->seeded) {
1884 $why = "saturated";
1885 $uptodate = 1;
1887 unless (defined $uptodate) {
1888 if ($self->ttl_reached){
1889 $why = "ttl_reached returned true, so we are not uptodate";
1890 $uptodate = 0 ;
1893 unless (defined $uptodate) {
1894 # look if recentfile has unchanged timestamp
1895 my $minmax = $self->minmax;
1896 if (exists $minmax->{mtime}) {
1897 my $rfile = $self->_my_current_rfile;
1898 my @stat = stat $rfile;
1899 my $mtime = $stat[9];
1900 if ($mtime > $minmax->{mtime}) {
1901 $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
1902 $uptodate = 0;
1903 } else {
1904 my $covered = $self->done->covered(@$minmax{qw(max min)});
1905 $why = "minmax covered[$covered], so we return that";
1906 $uptodate = $covered;
1910 unless (defined $uptodate) {
1911 $why = "fallthrough, so not uptodate";
1912 $uptodate = 0;
1914 if ($uptodate) {
1915 $self->_uptodateness_ever_reached(1);
1916 $self->unseed;
1918 my $remember =
1920 uptodate => $uptodate,
1921 why => $why,
1923 $self->_remember_last_uptodate_call($remember);
1924 return $uptodate;
1927 =head2 $obj->write_recent ($recent_files_arrayref)
1929 Writes a I<recentfile> based on the current reflection of the state
1930 of the tree, limited by the current interval.
1932 =cut
1934 sub write_recent {
1935 my ($self,$recent) = @_;
1936 die "write_recent called without argument" unless defined $recent;
1937 my $meth = sprintf "write_%d", $self->protocol;
1938 $self->$meth($recent);
1941 =head2 $obj->write_0 ($recent_files_arrayref)
1943 Delegate of C<write_recent()> on protocol 0
1945 =cut
1947 sub write_0 {
1948 my ($self,$recent) = @_;
1949 my $rfile = $self->rfile;
1950 YAML::Syck::DumpFile("$rfile.new",$recent);
1951 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
1954 =head2 $obj->write_1 ($recent_files_arrayref)
1956 Delegate of C<write_recent()> on protocol 1
1958 =cut
1960 sub write_1 {
1961 my ($self,$recent) = @_;
1962 my $rfile = $self->rfile;
1963 my $suffix = $self->serializer_suffix;
1964 my $data = {
1965 meta => $self->meta_data,
1966 recent => $recent,
1968 my $serialized;
1969 if ($suffix eq ".yaml") {
1970 $serialized = YAML::Syck::Dump($data);
1971 } elsif ($HAVE->{"Data::Serializer"}) {
1972 my $serializer = Data::Serializer->new
1973 ( serializer => $serializers{$suffix} );
1974 $serialized = $serializer->raw_serialize($data);
1975 } else {
1976 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
1978 open my $fh, ">", "$rfile.new" or die "Could not open >'$rfile.new': $!";
1979 print $fh $serialized;
1980 close $fh or die "Could not close '$rfile.new': $!";
1981 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
1984 BEGIN {
1985 my @pod_lines =
1986 split /\n/, <<'=cut'; %serializers = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
1988 =head1 THE ARCHITECTURE OF A COLLECTION OF RECENTFILES
1990 The idea is that we want to have a short file that records really
1991 recent changes, so that a fresh mirror can be kept fresh as long as
1992 connectivity is given. Then we want longer files that record the
1993 history before. So when the mirror falls behind the update period
1994 reflected in the shortest file, it can complement the list of recent
1995 file events with the next one. And if this is not long enough we want
1996 another one, again a bit longer. And we want one that completes the
1997 history back to the oldest file. The index files do contain the
1998 complete list of current files. The larger an index file is, the less
1999 often it is updated. For practical reasons adjacent files will often
2000 overlap a bit but this is neither necessary nor enforced. That's the
2001 basic idea. The following example represents a tree that has a few
2002 updates every day:
2004 RECENT.recent -> RECENT-1h.yaml
2005 RECENT-6h.yaml
2006 RECENT-1d.yaml
2007 RECENT-1M.yaml
2008 RECENT-1W.yaml
2009 RECENT-1Q.yaml
2010 RECENT-1Y.yaml
2011 RECENT-Z.yaml
2013 The first file is the principal file, insofar as it is the one that is
2014 written first after a filesystem change. Usually a symlink links to it
2015 with a filename that has the same filenameroot and the suffix
2016 C<.recent>. On systems that do not support symlinks there is a plain
2017 copy maintained instead.
2019 The last file, the Z file, contains the complementary files that are
2020 in none of the other files. It never contains C<deletes>. Besides
2021 this it serves the role of a recovery mechanism or spill-over pond.
2022 When things go wrong, it's a valuable controlling instance to hold the
2023 differences between the collection of limited interval files and the
2024 actual filesystem.
2026 =head2 A SINGLE RECENTFILE
2028 A I<recentfile> consists of a hash that has two keys: C<meta> and
2029 C<recent>. The C<meta> part has metadata and the C<recent> part has a
2030 list of fileobjects.
2032 =head2 THE META PART
2034 Here we find things that are pretty much self explaining: all
2035 lowercase attributes are accessors and as such explained somewhere
2036 above in this manpage. The uppercase attribute C<Producers> contains
2037 version information about the involved software components. Nothing
2038 to worry about, I believe.
2040 =head2 THE RECENT PART
2042 This is the interesting part. Every entry refers to some filesystem
2043 change (with path, epoch, type). The epoch value is the point in time
2044 when some change was I<registered>. Do not be tempted to believe that
2045 the entry has a direct relation to something like modification time or
2046 change time on the filesystem level. The timestamp (I<epoch> element)
2047 is a floating point number that practically never corresponds
2048 exactly to the data recorded in the filesystem but rather to the time
2049 when some process succeeded in reporting to the I<recentfile> mechanism
2050 that something has changed. This is why many parts of the code refer
2051 to I<events>, because we merely try to record the I<event> of the
2052 discovery of a change, not the time of the change itself.
2054 All these entries can be divided into two types (denoted by the
2055 C<type> attribute): C<new>s and C<delete>s. Changes and creations are
2056 C<new>s. Deletes are C<delete>s.
2058 Another distinction is between objects with an epoch timestamp and
2059 those without. All files that already existed on the filesystem before
2060 the I<recentfile> mechanism was installed, get recorded with a
2061 timestamp of zero.
2063 Besides an C<epoch> and a C<type> attribute we find a third one:
2064 C<path>. This path is relative to the directory we find the
2065 I<recentfile> in.
2067 The order of the entries in the I<recentfile> is by decreasing epoch
2068 attribute. These are either 0 or a unique floating point number. They
2069 are zero for events that either happened before the time that
2070 the I<recentfile> mechanism was set up or were left undiscovered for a
2071 while and never handed over to update(). They are floating point
2072 numbers for all events being regularly handed to update(). And when
2073 the server has ntp running correctly, then the timestamps are
2074 actually decreasing and unique.
2076 =head1 CORRUPTION AND RECOVERY
2078 If the origin host breaks the promise to deliver consistent and
2079 complete I<recentfiles> then the way back to sanity shall be achieved
2080 through either the C<zloop> (still TBD) or traditional rsyncing
2081 between the hosts. For example, if the origin server forgets to deploy
2082 ntp and the clock on it jumps backwards some day, then this would
2083 probably go unnoticed for a while and many software components that
2084 rely on the time never running backwards will make wrong decisions.
2085 After some time this accident would probably still be found in one of
2086 the I<recentfiles> but would become meaningless as soon as a mirror
2087 has run through the sanitizing procedures. Same goes for origin hosts
2088 that forget to include or deliberately omit some files.
2090 =head1 SERIALIZERS
2092 The following suffixes are supported and trigger the use of these
2093 serializers:
2095 =over 4
2097 =item C<< ".yaml" => "YAML::Syck" >>
2099 =item C<< ".json" => "JSON" >>
2101 =item C<< ".sto" => "Storable" >>
2103 =item C<< ".dd" => "Data::Dumper" >>
2105 =back
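To use one of the non-YAML formats, pass the corresponding suffix to
the constructor. This requires Data::Serializer and the named backend
module; like the feature itself, this is a mostly untested sketch:

    my $rf = File::Rsync::Mirror::Recentfile->new
        (
         filenameroot      => "RECENT",
         interval          => q(6h),
         localroot         => "/home/ftp/pub/PAUSE/authors/",
         serializer_suffix => ".sto",
        );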
2107 =cut
2109 BEGIN {
2110 my @pod_lines =
2111 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
2113 =head1 INTERVAL SPEC
2115 An interval spec is a primitive way to express time spans. Normally it
2116 is composed of an integer and a letter.
2118 As a special case, a string that consists only of the single letter
2119 C<Z> stands for unlimited time.
2121 The following letters express the specified number of seconds:
2123 =over 4
2125 =item C<< s => 1 >>
2127 =item C<< m => 60 >>
2129 =item C<< h => 60*60 >>
2131 =item C<< d => 60*60*24 >>
2133 =item C<< W => 60*60*24*7 >>
2135 =item C<< M => 60*60*24*30 >>
2137 =item C<< Q => 60*60*24*90 >>
2139 =item C<< Y => 60*60*24*365.25 >>
2141 =back
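So, for example (a sketch of what interval_secs() returns for a few
specs):

    $rf->interval_secs("6h"); # 6*60*60    = 21600
    $rf->interval_secs("1W"); # 60*60*24*7 = 604800
    $rf->interval_secs("Z");  # MAX_INT, i.e. unlimited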
2143 =cut
2145 =head1 BACKGROUND
2147 This is about speeding up rsync operation on large trees to many
2148 places. Uses a small metadata cocktail and pull technology.
2150 =head2 NON-COMPETITORS
2152 File::Mirror JWU/File-Mirror/File-Mirror-0.10.tar.gz only local trees
2153 Mirror::YAML ADAMK/Mirror-YAML-0.03.tar.gz some sort of inner circle
2154 Net::DownloadMirror KNORR/Net-DownloadMirror-0.04.tar.gz FTP sites and stuff
2155 Net::MirrorDir KNORR/Net-MirrorDir-0.05.tar.gz ditto
2156 Net::UploadMirror KNORR/Net-UploadMirror-0.06.tar.gz ditto
2157 Pushmi::Mirror CLKAO/Pushmi-v1.0.0.tar.gz something SVK
2159 rsnapshot www.rsnapshot.org focus on backup
2160 csync www.csync.org more like unison
2161 multi-rsync sourceforge 167893 lan push to many
2163 =head2 COMPETITORS
2165 The problem to solve, which clusters, FTP mirrors, and otherwise
2166 replicated datasets like CPAN share: how to transfer only a minimal
2167 amount of data to determine the diff between two hosts.
2169 Normally it takes a long time to determine the diff itself before it
2170 can be transferred. Known solutions at the time of this writing are
2171 csync2, and rsync 3 batch mode.
2173 For many years the best solution was csync2 which solves the
2174 problem by maintaining an sqlite database on both ends and talking a
2175 highly sophisticated protocol to quickly determine which files to send
2176 and which to delete at any given point in time. Csync2 is often
2177 inconvenient because the act of syncing demands quite an intimate
2178 relationship between the sender and the receiver and suffers when the
2179 number of syncing sites is large or connections are unreliable.
2181 Rsync 3 batch mode works around these problems by providing rsync-able
2182 batch files which allow receiving nodes to replay the history of the
2183 other nodes. This reduces the need to have an incestuous relation but
2184 it has the disadvantage that these batch files replicate the contents
2185 of the involved files. This seems inappropriate when the nodes already
2186 have a means of communicating over rsync.
2188 rersyncrecent solves this problem with a couple of (usually 2-10)
2189 index files which cover different overlapping time intervals. The
2190 master writes these files and the clients can construct the full tree
2191 from the information contained in them. The most recent index file
2192 usually covers the last seconds or minutes or hours of the tree and
2193 depending on the needs, slaves can rsync every few seconds and then
2194 bring their trees into full sync.
2196 The rersyncrecent mode was developed for CPAN but I hope it is a
2197 convenient and economic general purpose solution. I'm looking forward
2198 to seeing a CPAN backbone that is only a few seconds behind PAUSE. And
2199 then ... the first FUSE based CPAN filesystem anyone?
2201 =head1 SEE ALSO
2203 Barbie is providing a database of release dates. See
2204 http://use.perl.org/~barbie/journal/37907
2206 =head1 AUTHOR
2208 Andreas König
2210 =head1 BUGS
2212 Please report any bugs or feature requests through the web interface
2214 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2215 I will be notified, and then you'll automatically be notified of
2216 progress on your bug as I make changes.
2218 =head1 SUPPORT
2220 You can find documentation for this module with the perldoc command.
2222 perldoc File::Rsync::Mirror::Recentfile
2224 You can also look for information at:
2226 =over 4
2228 =item * RT: CPAN's request tracker
2230 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2232 =item * AnnoCPAN: Annotated CPAN documentation
2234 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2236 =item * CPAN Ratings
2238 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2240 =item * Search CPAN
2242 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2244 =back
2247 =head1 ACKNOWLEDGEMENTS
2249 Thanks to RJBS for module-starter.
2251 =head1 COPYRIGHT & LICENSE
2253 Copyright 2008 Andreas König.
2255 This program is free software; you can redistribute it and/or modify it
2256 under the same terms as Perl itself.
2259 =cut
2261 1; # End of File::Rsync::Mirror::Recentfile
2263 # Local Variables:
2264 # mode: cperl
2265 # cperl-indent-level: 4
2266 # End: