honour any change in the dirtymark, be it an increase or a decrease: we want them...
[rersyncrecent.git] / lib / File / Rsync / Mirror / Recentfile.pm
blobc124b9bbc3db154d5bf888209a51b62e4e53aa47
1 package File::Rsync::Mirror::Recentfile;
3 # use warnings;
4 use strict;
6 =encoding utf-8
8 =head1 NAME
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
12 =cut
# Probe the optional dependencies once at load time. A true value in
# $HAVE->{Module} means the module could be required.
my $HAVE = {};
foreach my $optional_module (qw(Data::Serializer File::Rsync)) {
    $HAVE->{$optional_module} = eval qq{ require $optional_module; };
}
21 use Config;
22 use File::Basename qw(basename dirname fileparse);
23 use File::Copy qw(cp);
24 use File::Path qw(mkpath);
25 use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
26 use File::Temp;
27 use List::Util qw(first max min);
28 use Scalar::Util qw(reftype);
29 use Storable;
30 use Time::HiRes qw();
31 use YAML::Syck;
33 use version; our $VERSION = qv('0.0.1');
35 use constant MAX_INT => ~0>>1; # anything better?
36 use constant DEFAULT_PROTOCOL => 1;
38 # cf. interval_secs
39 my %seconds;
41 # maybe subclass if this mapping is bad?
42 my %serializers;
44 =head1 SYNOPSIS
46 Writer (of a single file):
48 use File::Rsync::Mirror::Recentfile;
49 my $rf = File::Rsync::Mirror::Recentfile->new
51 interval => q(6h),
52 filenameroot => "RECENT",
53 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
54 localroot => "/home/ftp/pub/PAUSE/authors/",
55 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
57 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
59 Reader/mirrorer:
61 my $rf = File::Rsync::Mirror::Recentfile->new
63 filenameroot => "RECENT",
64 ignore_link_stat_errors => 1,
65 interval => q(6h),
66 localroot => "/home/ftp/pub/PAUSE/authors",
67 remote_dir => "",
68 remote_host => "pause.perl.org",
69 remote_module => "authors",
70 rsync_options => {
71 compress => 1,
72 'rsync-path' => '/usr/bin/rsync',
73 links => 1,
74 times => 1,
75 'omit-dir-times' => 1,
76 checksum => 1,
78 verbose => 1,
80 $rf->mirror;
82 Aggregator (usually the writer):
84 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
85 $rf->aggregate;
87 =head1 DESCRIPTION
89 Lower level than F:R:M:Recent. Handles only one recentfile whereas a
90 tree always is composed of several recentfiles. The single recentfile
91 has to do the bookkeeping for a timeslice.
93 =head1 EXPORT
95 No exports.
97 =head1 CONSTRUCTORS / DESTRUCTOR
99 =head2 my $obj = CLASS->new(%hash)
101 Constructor. On every argument pair the key is a method name and the
102 value is an argument to that method name.
104 If a recentfile for this resource already exists, metadata that are
105 not defined by the constructor will be fetched from there as soon as
106 it is being read by recent_events().
108 =cut
# Constructor: every key/value pair in the argument list is applied by
# calling the method named by the key with the value as its argument.
# Afterwards protocol, filenameroot and serializer_suffix receive their
# defaults if they are still undefined.
sub new {
    my $class = shift;
    my @pairs = @_;
    my $self = bless {}, $class;
    while (my($method, $value) = splice @pairs, 0, 2) {
        $self->$method($value);
    }
    my @defaults = (
                    [ protocol          => DEFAULT_PROTOCOL ],
                    [ filenameroot      => "RECENT" ],
                    [ serializer_suffix => ".yaml" ],
                   );
    for my $default (@defaults) {
        my($accessor, $fallback) = @$default;
        next if defined $self->$accessor;
        $self->$accessor($fallback);
    }
    return $self;
}
129 =head2 my $obj = CLASS->new_from_file($file)
131 Constructor. $file is a I<recentfile>.
133 =cut
# Constructor that initialises the object from an existing recentfile.
#
# $file is the path of the recentfile; symlinks are followed (only links
# without a directory part are supported). The suffix of the resolved
# file decides how it is deserialized: ".yaml" goes through
# YAML::Syck::LoadFile, anything else known to %serializers goes through
# Data::Serializer (if installed). All lower-case keys of the "meta"
# section are then applied to the object via the accessor of that name.
#
# Dies if the file cannot be opened, if a symlink target contains a "/",
# if the suffix is unknown, or if Data::Serializer is needed but missing.
sub new_from_file {
    my($class, $file) = @_;
    require File::Spec; # used below; do not rely on other modules loading it
    my $self = bless {}, $class;
    $self->_rfile($file);
    #?# $self->lock;
    # Three-argument open: characters such as ">" or "|" in $file must
    # never be interpreted as an open mode.
    my $serialized = do {
        open my $fh, "<", $file or die "Could not open '$file': $!";
        local $/;
        <$fh>;
    };
    # XXX: we can skip this step when the metadata are sufficient, but
    # we cannot parse the file without some magic stuff about
    # serialized formats
    while (-l $file) {
        my($name,$path) = fileparse $file;
        my $symlink = readlink $file;
        if ($symlink =~ m|/|) {
            die "FIXME: filenames containing '/' not supported, got $symlink";
        }
        $file = File::Spec->catfile ( $path, $symlink );
    }
    my($name,$path,$suffix) = fileparse $file, keys %serializers;
    $self->serializer_suffix($suffix);
    $self->localroot($path);
    die "Could not determine file format from suffix" unless $suffix;
    my $deserialized;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        $deserialized = YAML::Syck::LoadFile($file);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $deserialized = $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    while (my($k,$v) = each %{$deserialized->{meta}}) {
        next if $k ne lc $k; # skip mixed-case keys such as "Producers"
        $self->$k($v);
    }
    unless (defined $self->protocol) {
        $self->protocol(DEFAULT_PROTOCOL);
    }
    return $self;
}
180 =head2 DESTROY
182 A simple unlock.
184 =cut
# Destructor: releases the lock by delegating to unlock().
sub DESTROY {
    my($self) = @_;
    $self->unlock;
}
187 =head1 ACCESSORS
189 =cut
my @accessors;

BEGIN {
    # Private accessors, listed explicitly.
    @accessors = qw(
                    _current_tempfile
                    _current_tempfile_fh
                    _delayed_operations
                    _done
                    _interval
                    _is_locked
                    _localroot
                    _merged
                    _pathdb
                    _remember_last_uptodate_call
                    _remote_dir
                    _remoteroot
                    _rfile
                    _rsync
                    _seeded
                    _uptodateness_ever_reached
                    _use_tempfile
                   );

    # The here-document below is terminated by the "=cut" line that ends
    # the following POD section. Every "=item NAME" line found in that POD
    # contributes NAME as a further accessor, so the documentation doubles
    # as the list of public accessors. The closing brace of this BEGIN
    # block has to stay on the line that opens the here-document.
    my @pod_lines =
        split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
217 =over 4
219 =item aggregator
221 A list of interval specs that tell the aggregator which I<recentfile>s
222 are to be produced.
224 =item canonize
226 The name of a method to canonize the path before rsyncing. Only
227 supported value is C<naive_path_normalize>. Defaults to that.
229 =item comment
231 A comment about this tree and setup.
233 =item dirtymark
235 A timestamp. The dirtymark is updated whenever an out of band change
236 on the origin server is performed that violates the protocol. Say,
237 they add or remove files in the middle somewhere. Slaves must react
238 with a devaluation of their C<done> structure which then leads to a
239 full re-sync of all files. Implementation note: dirtymark may increase
240 or decrease.
242 =item filenameroot
244 The (prefix of the) filename we use for this I<recentfile>. Defaults to
245 C<RECENT>. The string must not contain a directory separator.
247 =item have_mirrored
249 Timestamp remembering when we mirrored this recentfile the last time.
250 Only relevant for slaves.
252 =item ignore_link_stat_errors
254 If set to true, rsync errors are ignored that complain about link stat
255 errors. These seem to happen only when there are files missing at the
256 origin. In race conditions this can always happen, so it is
257 recommended to set this value to true.
259 =item is_slave
261 If set to true, this object will fetch a new recentfile from remote
262 when the timespan between the last mirror (see have_mirrored) and now
263 is too large (see C<ttl>).
265 =item keep_delete_objects_forever
267 The default for delete events is that they are passed through the
268 collection of recentfile objects until they reach the Z file. There
269 they get dropped so that the associated file object ceases to exist at
270 all. By setting C<keep_delete_objects_forever> the delete objects are
271 kept forever. This makes the Z file larger but has the advantage that
272 slaves that have interrupted mirroring for a long time still can clean
273 up their copy.
275 =item locktimeout
277 After how many seconds shall we die if we cannot lock a I<recentfile>?
278 Defaults to 600 seconds.
280 =item loopinterval
282 When mirror_loop is called, this accessor can specify how much time
283 every loop shall at least take. If the work of a loop is done before
284 that time has gone, sleeps for the rest of the time. Defaults to
285 arbitrary 42 seconds.
287 =item max_files_per_connection
289 Maximum number of files that are transferred on a single rsync call.
290 Setting it higher means higher performance at the price of holding
291 connections longer and potentially disturbing other users in the pool.
292 Defaults to the arbitrary value 42.
294 =item max_rsync_errors
296 When rsync operations encounter that many errors without any resetting
297 success in between, then we die. Defaults to unlimited. A value of
298 -1 means we run forever ignoring all rsync errors.
300 =item minmax
302 Hashref remembering when we read the recent_events from this file the
303 last time and what the timespan was.
305 =item protocol
307 When the RECENT file format changes, we increment the protocol. We try
308 to support older protocols in later releases.
310 =item remote_host
312 The host we are mirroring from. Leave empty for the local filesystem.
314 =item remote_module
316 Rsync servers have so called modules to separate directory trees from
317 each other. Put here the name of the module under which we are
318 mirroring. Leave empty for local filesystem.
320 =item rsync_options
322 Things like compress, links, times or checksums. Passed in to the
323 File::Rsync object used to run the mirror.
325 =item serializer_suffix
327 Mostly untested accessor. The only well tested format for
328 I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
329 Data::Serializer. But in principle other formats are supported as
330 well. See section SERIALIZERS below.
332 =item sleep_per_connection
334 Sleep that many seconds (floating point OK) after every chunk of rsyncing
335 has finished. Defaults to arbitrary 0.42.
337 =item ttl
339 Time to live. Number of seconds after which this recentfile must be
340 fetched again from the origin server. Only relevant for slaves.
341 Defaults to arbitrary 24.2 seconds.
343 =item verbose
345 Boolean to turn on a bit of verbosity.
347 =back
349 =cut
351 use accessors @accessors;
353 =head1 METHODS
355 =head2 (void) $obj->aggregate( %options )
357 Takes all intervals that are collected in the accessor called
358 aggregator. Sorts them by actual length of the interval.
359 Removes those that are shorter than our own interval. Then merges this
360 object into the next larger object. The merging continues upwards
361 as long as the next I<recentfile> is old enough to warrant a merge.
363 Whether a merge is warranted is decided according to the length of the
364 previous (next smaller) interval so that larger files are not updated as
365 often as smaller ones. If $options{force} is true, all files get updated.
367 Here is an example to illustrate the behaviour. Given aggregators
369 1h 1d 1W 1M 1Q 1Y Z
371 then
373 1h updates 1d on every call to aggregate()
374 1d updates 1W earliest after 1h
375 1W updates 1M earliest after 1d
376 1M updates 1Q earliest after 1W
377 1Q updates 1Y earliest after 1M
378 1Y updates Z earliest after 1Q
380 Note that all but the smallest recentfile get updated at an arbitrary
381 rate and as such are quite useless on their own.
383 =cut
# Merge this recentfile upwards through the chain of larger intervals.
#
# The own interval plus all intervals listed in aggregator() are sorted by
# length; those shorter than the own interval are dropped. After calling
# update(), each recentfile is merged into the next larger one. The first
# merge always happens; later ones happen when $option{force} is true,
# when the target file does not exist yet, or when the target file is
# older than the interval two steps below it. The first step that does
# not warrant a merge ends the walk.
sub aggregate {
    my($self, %option) = @_;
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
        grep { $_->{secs} >= $self->interval_secs }
            map { +{ interval => $_, secs => $self->interval_secs($_) } }
                $self->interval, @{$self->aggregator || []};
    $self->update;
    $aggs[0]{object} = $self;
  AGGREGATOR: for my $i (0..$#aggs-1) {
        my $smaller = $aggs[$i]{object};
        my $larger = $smaller->_sparse_clone;
        $larger->interval($aggs[$i+1]{interval});
        my $want_merge = 1;
        unless ($option{force} || $i == 0) {
            my $larger_rfile = $larger->rfile;
            if (-e $larger_rfile) {
                my $two_below = $aggs[$i-1]{object};
                # -M is relative to $^T, so pin that to "now" for this check
                local $^T = time;
                my $age_in_secs = 86400 * -M $larger_rfile;
                $want_merge = $age_in_secs > $two_below->interval_secs ? 1 : 0;
            }
        }
        last AGGREGATOR unless $want_merge;
        $larger->merge($smaller);
        $aggs[$i+1]{object} = $larger;
    }
}
# Debugging helper: returns an arrayref with one hashref per interval (own
# interval plus aggregator(), sorted by length) holding the recentfile
# name, its size and its mtime as reported by stat.
sub _debug_aggregate {
    my($self) = @_;
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
        map { +{ interval => $_, secs => $self->interval_secs($_) } }
            $self->interval, @{$self->aggregator || []};
    my @report;
    for my $agg (@aggs) {
        # work on a deep copy so that $self keeps its own interval
        my $clone = Storable::dclone $self;
        $clone->interval($agg->{interval});
        my $rfile = $clone->rfile;
        my($size, $mtime) = (stat $rfile)[7,9];
        push @report, {rfile => $rfile, size => $size, mtime => $mtime};
    }
    return \@report;
}
# (void) $self->_assert_symlink()
#
# Makes sure that "<filenameroot>.recent" in localroot refers to
# rfilename. Where symlinks are available a missing link is created
# directly and a wrong one is replaced through a temporary link plus
# rename; a correct link is left alone. Without symlink support the file
# is copied to a temporary name and renamed into place.
sub _assert_symlink {
    my($self) = @_;
    my $root = $self->localroot;
    my $recentrecentfile = File::Spec->catfile
        ($root, sprintf("%s.recent", $self->filenameroot));
    if ($Config{d_symlink} ne "define") {
        warn "Warning: symlinks not supported on this system, doing a copy instead\n";
        unlink "$recentrecentfile.$$"; # may fail
        cp $self->rfilename, "$recentrecentfile.$$" or die "Could not copy to '$recentrecentfile.$$': $!";
        rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
    } elsif (! -l $recentrecentfile) {
        # no link yet: a straight symlink is enough
        symlink $self->rfilename, $recentrecentfile or die "Could not create symlink '$recentrecentfile': $!"
    } elsif (readlink($recentrecentfile) eq $self->rfilename) {
        # link already points to the right file
        return;
    } else {
        # link points elsewhere: build the new one aside and rename it over
        unlink "$recentrecentfile.$$"; # may fail
        symlink $self->rfilename, "$recentrecentfile.$$" or die "Could not create symlink '$recentrecentfile.$$': $!";
        rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
    }
}
478 =head2 $hashref = $obj->delayed_operations
480 A hash of hashes containing unlink and rmdir operations which had to
481 wait until the recentfile got unhidden in order to not confuse
482 downstream mirrors (in case we have some).
484 =cut
# Returns the hash of pending unlink/rmdir operations, creating and
# storing an empty one on first use.
sub delayed_operations {
    my($self) = @_;
    my $pending = $self->_delayed_operations;
    return $pending if defined $pending;
    $pending = {
                unlink => {},
                rmdir => {},
               };
    $self->_delayed_operations ($pending);
    return $pending;
}
499 =head2 $done = $obj->done
501 C<$done> is a reference to a L<File::Rsync::Mirror::Recentfile::Done>
502 object that keeps track of rsync activities. Only needed and used when
503 we are a mirroring slave.
505 =cut
# Returns the File::Rsync::Mirror::Recentfile::Done object of this
# recentfile. It is created lazily, told our interval, and cached.
sub done {
    my($self) = @_;
    my $tracker = $self->_done;
    return $tracker if $tracker;
    require File::Rsync::Mirror::Recentfile::Done;
    $tracker = File::Rsync::Mirror::Recentfile::Done->new();
    $tracker->_rfinterval ($self->interval);
    $self->_done ( $tracker );
    return $tracker;
}
519 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
521 Stores the remote I<recentfile> locally as a tempfile. The caller is
522 responsible to remove the file after use.
524 Note: if you're intending to act as an rsync server for other slaves,
525 then you must prefer this method over get_remotefile() when fetching
526 the I<recentfile>. Otherwise downstream mirrors would expect you to
527 already have mirrored all the files that are in the I<recentfile>
528 before you have them mirrored.
530 =cut
# Fetches the remote recentfile into a tempfile below localroot and
# returns the name of that tempfile.
#
# While _use_tempfile is set, a still fresh tempfile (ttl not reached) is
# returned as is, and an existing tempfile handle is re-synced instead of
# creating a new file. A newly created tempfile is pre-seeded with a copy
# of the local recentfile (if there is one) to save bandwidth. rsync is
# tried up to three times; every failure is passed to
# register_rsync_error(). On success the internals are refreshed from the
# tempfile and have_mirrored is updated. The tempfile is always chmod'ed
# to 0644 before returning; dies if the copy or the chmod fails.
sub get_remote_recentfile_as_tempfile {
    my($self) = @_;
    mkpath $self->localroot;
    my $fh;
    if ( $self->_use_tempfile() ) {
        return $self->_current_tempfile if ! $self->ttl_reached;
        $fh = $self->_current_tempfile_fh;
    }
    my $trfilename = $self->rfilename;

    my $dst;
    if ($fh) {
        $dst = $self->_current_tempfile;
    } else {
        $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
        $dst = $fh->filename;
        $self->_current_tempfile ($dst);
        my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
        if (defined $rfile && -e $rfile) {
            # saving on bandwidth. Might need to be configurable
            # $self->bandwidth_is_cheap?
            cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!";
        }
    }
    my $src = join ("/",
                    $self->remoteroot,
                    $trfilename,
                   );
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        my $display_dst = join "/", "...", basename(dirname($dst)), basename($dst);
        printf STDERR
            (
             "%-4s %d (1/1/%s) temp %s ... ",
             $doing,
             time,
             $self->interval,
             $display_dst,
            );
    }
    my $gaveup = 0;
    my $retried = 0;
    while (!$self->rsync->exec(
                               src => $src,
                               dst => $dst,
                              )) {
        $self->register_rsync_error ($self->rsync->err);
        if (++$retried >= 3) {
            warn "XXX giving up";
            $gaveup = 1;
            last;
        }
    }
    if ($gaveup) {
        # terminate the line so that following output starts on a fresh one
        printf STDERR "Warning: gave up mirroring %s, will try again later\n", $self->interval;
    } else {
        $self->_refresh_internals ($dst);
        $self->have_mirrored (Time::HiRes::time());
        $self->un_register_rsync_error ();
    }
    $self->unseed;
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
    my $mode = 0644;
    # report the mode in octal; interpolating $mode would print "420"
    chmod $mode, $dst
        or die sprintf "Could not chmod %04o '%s': %s", $mode, $dst, $!;
    return $dst;
}
# Creates the File::Temp object that receives the remote recentfile. The
# file lives in localroot, is named ".FRMRecent-<rfilename>-XXXX" plus the
# serializer suffix and is unlinked on destruction only while
# _use_tempfile is set; in that case the handle is also remembered on the
# object so that it stays alive.
sub _get_remote_rat_provide_tempfile_object {
    my($self, $trfilename) = @_;
    my $template = sprintf ".FRMRecent-%s-XXXX", $trfilename;
    my @temp_args = (
                     TEMPLATE => $template,
                     DIR => $self->localroot,
                     SUFFIX => $self->serializer_suffix,
                     UNLINK => $self->_use_tempfile,
                    );
    my $fh = File::Temp->new(@temp_args);
    $self->_current_tempfile_fh ($fh) # delay self destruction
        if $self->_use_tempfile;
    return $fh;
}
620 =head2 $localpath = $obj->get_remotefile ( $relative_path )
622 Rsyncs one single remote file to local filesystem.
624 Note: no locking is done on this file. Any number of processes may
625 mirror this object.
627 Note II: do not use for recentfiles. If you are a cascading
628 slave/server combination, it would confuse other slaves. They would
629 expect the contents of these recentfiles to be available. Use
630 get_remote_recentfile_as_tempfile() instead.
632 =cut
# Rsyncs the single remote file $path (relative to the tree root) below
# localroot and returns the local filename. The rsync call is repeated
# until it succeeds; each failure is handed to register_rsync_error().
sub get_remotefile {
    my($self, $path) = @_;
    my $dst = File::Spec->catfile($self->localroot, $path);
    mkpath dirname $dst;
    my $verbose = $self->verbose;
    if ($verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        printf STDERR
            (
             "%-4s %d (1/1/%s) %s ... ",
             $doing,
             time,
             $self->interval,
             $path,
            );
    }
    until ($self->rsync->exec(
                              src => join("/", $self->remoteroot, $path),
                              dst => $dst,
                             )) {
        $self->register_rsync_error ($self->rsync->err);
    }
    $self->un_register_rsync_error ();
    print STDERR "DONE\n" if $self->verbose;
    return $dst;
}
664 =head2 $obj->interval ( $interval_spec )
666 Get/set accessor. $interval_spec is a string and described below in
667 the section INTERVAL SPEC.
669 =cut
# Get/set accessor for the interval spec. Setting it also invalidates the
# cached rfile. Confesses when no interval is known.
sub interval {
    my $self = shift;
    if (@_) {
        $self->_interval($_[0]);
        $self->_rfile(undef);
    }
    my $current = $self->_interval;
    return $current if defined $current;
    # do not ask the $self too much, it recurses!
    require Carp;
    Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
}
686 =head2 $secs = $obj->interval_secs ( $interval_spec )
688 $interval_spec is described below in the section INTERVAL SPEC. If
689 empty defaults to the inherent interval for this object.
691 =cut
# Translates an interval spec (default: the object's own interval) into
# seconds. "Z" maps to MAX_INT; otherwise the spec must be a number
# followed by a unit known to %seconds. Dies on anything else.
sub interval_secs {
    my ($self, $interval) = @_;
    $interval ||= $self->interval;
    die "interval_secs() called without argument on an object without a declared one"
        unless defined $interval;
    my ($count, $unit) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
        die "Could not determine seconds from interval[$interval]";
    return MAX_INT if $interval eq "Z";
    unless (exists $seconds{$unit} and $count =~ /^\d+$/) {
        die "Invalid interval specification: n[$count]t[$unit]";
    }
    return $seconds{$unit}*$count;
}
710 =head2 $obj->localroot ( $localroot )
712 Get/set accessor. The local root of the tree.
714 =cut
# Get/set accessor for the local root of the tree. Setting it also
# invalidates the cached rfile. Returns the current value.
sub localroot {
    my $self = shift;
    if (@_) {
        $self->_localroot($_[0]);
        $self->_rfile(undef);
    }
    my $current = $self->_localroot;
    return $current;
}
725 =head2 $ret = $obj->local_path($path_found_in_recentfile)
727 Combines the path to our local mirror and the path of an object found
728 in this I<recentfile>. In other words: the target of a mirror operation.
730 Implementation note: We split on slashes and then use
731 File::Spec::catfile to adjust to the local operating system.
733 =cut
# Maps a path found in the recentfile to the corresponding local filename
# below localroot. Without a path the localroot itself is returned.
sub local_path {
    my($self,$path) = @_;
    # seems like a degenerated case
    return $self->localroot unless defined $path;
    # split on slashes and let File::Spec glue with the local separator
    my @segments = split m|/|, $path;
    return File::Spec->catfile($self->localroot, @segments);
}
745 =head2 (void) $obj->lock
747 Locking is implemented with an C<mkdir> on a locking directory
748 (C<.lock> appended to $rfile).
750 =cut
# Acquires the lock for this recentfile by creating the directory
# "<rfile>.lock". Does nothing if we already hold the lock. Retries every
# 0.01 seconds and dies once locktimeout (default 600) seconds have
# passed.
sub lock {
    my ($self) = @_;
    # not using flock because it locks on filehandles instead of
    # old school resources.
    return if $self->_is_locked;
    my $rfile = $self->rfile;
    # XXX need a way to allow breaking the lock
    my $started = time;
    my $locktimeout = $self->locktimeout || 600;
    until (mkdir "$rfile.lock") {
        Time::HiRes::sleep 0.01;
        next unless time - $started > $locktimeout;
        die "Could not acquire lockdirectory '$rfile.lock': $!";
    }
    $self->_is_locked (1);
}
770 =head2 (void) $obj->merge ($other)
772 Bulk update of this object with another one. It's used to merge a
773 smaller and younger $other object into the current one. If this file
774 is a C<Z> file, then we normally do not merge in objects of type
775 C<delete>; this can be overridden by setting
776 keep_delete_objects_forever. But if we encounter an object of type
777 delete we delete the corresponding C<new> object if we have it.
779 If there is nothing to be merged, nothing is done.
781 =cut
# Merges the events of the smaller/younger recentfile $other into this
# one. Both objects are locked for the duration of the merge.
#
# First the oldest epoch that may stay in this file is determined and
# older events are dropped from our own list. Then the events of $other
# are filtered: too old ones, repeated paths and (for a "Z" file without
# keep_delete_objects_forever) delete events are skipped. Only if
# something changed is the actual zipping and writing delegated to
# _merge_something_done().
sub merge {
    my($self, $other) = @_;
    $self->_merge_sanitycheck ( $other );
    $other->lock;
    my $other_recent = $other->recent_events || [];
    # $DB::single++ if $other->interval_secs eq "2" and grep {$_->{epoch} eq "999.999"} @$other_recent;
    $self->lock;
    my $my_recent = $self->recent_events || [];

    # calculate the target time span
    my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
    my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
    my $oldest_allowed = 0;
    # an empty file of our own always needs to be written ("obstetrics")
    my $something_done = $my_recent->[0] ? undef : 1;
    if ($epoch) {
        my $other_mark = $other->dirtymark || 0;
        my $own_mark = $self->dirtymark || 0;
        if ($other_mark ne $own_mark) {
            # dirtymark differs: keep everything and force a rewrite
            $oldest_allowed = 0;
            $something_done = 1;
        } elsif (my $merged = $self->merged) {
            my $secs = $self->interval_secs();
            $oldest_allowed = min($epoch - $secs, $merged->{epoch}||0);
            if (@$other_recent) {
                my $other_oldest = $other_recent->[-1]{epoch};
                $oldest_allowed = $other_oldest
                    if _bigfloatlt($other_oldest, $oldest_allowed);
            }
        }
        # drop own events that fell out of the time span
        while (@$my_recent && _bigfloatlt($my_recent->[-1]{epoch}, $oldest_allowed)) {
            pop @$my_recent;
            $something_done = 1;
        }
    }

    my %have_path;
    my $other_recent_filtered = [];
  OTHER_EVENT: for my $oev (@$other_recent) {
        my $oevepoch = $oev->{epoch} || 0;
        next OTHER_EVENT if _bigfloatlt($oevepoch, $oldest_allowed);
        my $path = $oev->{path};
        next OTHER_EVENT if $have_path{$path}++;
        next OTHER_EVENT
            if $self->interval eq "Z"
                and $oev->{type} eq "delete"
                    and ! $self->keep_delete_objects_forever;
        $something_done = 1
            if !$myepoch || _bigfloatgt($oevepoch, $myepoch);
        push @$other_recent_filtered, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
    }
    $self->_merge_something_done ($other_recent_filtered, $my_recent, $other_recent, $other, \%have_path, $epoch)
        if $something_done;
    $self->unlock;
    $other->unlock;
}
# Second half of merge(): zips the filtered events of $other and our own
# events into one list, writes it, and records on $other when and where
# it was merged.
#
# Own events whose path was already seen (per %$have_path) are dropped.
# If two neighbouring events ended up with the same epoch, epochs are
# made unique by raising duplicates with _increase_a_bit(), walking from
# the oldest event to the newest. The dirtymark of $other is taken over
# if ours is unset or differs.
sub _merge_something_done {
    my($self, $other_recent_filtered, $my_recent, $other_recent, $other, $have_path, $epoch) = @_;
    my $recent = [];
    my $epoch_conflict = 0;
    my $last_epoch;
 ZIP: while (@$other_recent_filtered || @$my_recent) {
        my $take_other = !@$my_recent
            || (@$other_recent_filtered
                && _bigfloatge($other_recent_filtered->[0]{epoch},$my_recent->[0]{epoch}));
        my $event;
        if ($take_other) {
            $event = shift @$other_recent_filtered;
        } else {
            $event = shift @$my_recent;
            next ZIP if $have_path->{$event->{path}}++;
        }
        if (defined $last_epoch && $event->{epoch} eq $last_epoch) {
            $epoch_conflict = 1;
        }
        $last_epoch = $event->{epoch};
        push @$recent, $event;
    }
    if ($epoch_conflict) {
        my %have_epoch;
        # oldest first, so that older events keep their epoch
        for my $event (reverse @$recent) {
            my $candidate = $event->{epoch};
            next unless $have_epoch{$candidate}++;
            $candidate = _increase_a_bit($candidate) while $have_epoch{$candidate};
            $event->{epoch} = $candidate;
            $have_epoch{$candidate}++;
        }
    }
    unless ($self->dirtymark && $other->dirtymark eq $self->dirtymark) {
        $self->dirtymark ( $other->dirtymark );
    }
    $self->write_recent($recent);
    my %merged_info = (
                       time => Time::HiRes::time(), # not used anywhere
                       epoch => $recent->[0]{epoch},
                       into_interval => $self->interval, # not used anywhere
                      );
    $other->merged(\%merged_info);
    $other->write_recent($other_recent);
}
# Guards merge(): $other must cover a strictly smaller interval than
# $self, because $other is the object that gets merged into $self. Dies
# otherwise.
sub _merge_sanitycheck {
    my($self, $other) = @_;
    if ($self->interval_secs <= $other->interval_secs) {
        # $other is the (too big) source, $self the (too small) target;
        # report them in the order the message names them
        die sprintf
            (
             "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
             $other->interval_secs,
             $self->interval_secs,
            );
    }
}
901 =head2 merged
903 Hashref denoting when this recentfile has been merged into some other
904 at which epoch.
906 =cut
# Get/set accessor for the "merged" hashref. When the stored hashref
# names an into_interval and our own interval is known, a warning with
# stack trace is issued if that target interval equals ours or is
# shorter than ours. Returns the stored hashref.
sub merged {
    my($self, $set) = @_;
    $self->_merged ($set) if defined $set;
    my $merged = $self->_merged;
    my $into;
    if ($merged and $into = $merged->{into_interval} and defined $self->_interval) {
        # sanity checks
        my $complaint;
        if ($into eq $self->interval) {
            $complaint = sprintf
                (
                 "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
                 $into,
                 $self->interval,
                );
        } elsif ($self->interval_secs($into) < $self->interval_secs) {
            $complaint = sprintf
                (
                 "Warning: into_interval_secs[%s] smaller than own interval_secs[%s] on interval[%s]. Danger ahead.",
                 $self->interval_secs($into),
                 $self->interval_secs,
                 $self->interval,
                );
        }
        if (defined $complaint) {
            require Carp;
            Carp::cluck($complaint);
        }
    }
    return $merged;
}
939 =head2 $hashref = $obj->meta_data
941 Returns the hashref of metadata that the server has to add to the
942 I<recentfile>.
944 =cut
# Returns the hashref of metadata for the recentfile. Starting from
# $self->{meta}, every listed accessor with a defined value is copied in;
# "Producers" and "dirtymark" are filled in if they are still false.
sub meta_data {
    my($self) = @_;
    my $ret = $self->{meta};
    my @exported = qw(
                      aggregator
                      canonize
                      comment
                      dirtymark
                      filenameroot
                      interval
                      merged
                      minmax
                      protocol
                      serializer_suffix
                     );
    for my $field (@exported) {
        my $value = $self->$field;
        next unless defined $value;
        $ret->{$field} = $value;
    }
    # XXX need to reset the Producer if I am a writer, keep it when I
    # am a reader
    $ret->{Producers} ||= {
                           __PACKAGE__, "$VERSION", # stringified it looks better
                           '$0', $0,
                           'time', Time::HiRes::time(),
                          };
    $ret->{dirtymark} ||= Time::HiRes::time();
    return $ret;
}
977 =head2 $success = $obj->mirror ( %options )
979 Mirrors the files in this I<recentfile> as reported by
980 C<recent_events>. Options named C<after>, C<before>, C<max>, and
981 C<skip-deletes> are passed through to the C<recent_events> call. The
982 boolean option C<piecemeal>, if true, causes C<mirror> to only rsync
983 C<max_files_per_connection> and keep track of the rsynced files so
984 that future calls will rsync different files until all files are
985 brought to sync.
987 =cut
# Mirrors the files listed in this recentfile.
#
# The remote recentfile is fetched as a tempfile, its events are read
# (options before/after/max/skip-deletes are passed through to
# recent_events) and each event is handed to _mirror_item(). Downloads
# are collected and transferred in batches; a leftover batch is flushed
# at the end. Returns early (without a value) when an item asks for it
# via $status->{mustreturn}. Otherwise the tempfile is renamed into
# place, delayed unlink/rmdir operations are performed, and true is
# returned if no error was collected.
sub mirror {
    my($self, %options) = @_;
    my $trecentfile = $self->get_remote_recentfile_as_tempfile();
    $self->_use_tempfile (1);
    my @passed_on = qw(before after max skip-deletes);
    my %passthrough;
    @passthrough{@passed_on} = @options{@passed_on};
    my ($recent_events) = $self->recent_events(%passthrough);
    my(@error, @dlcollector); # download-collector: array containing paths we need
    my $last_item = $#$recent_events;
    my $done = $self->done;
    my $pathdb = $self->_pathdb;
  ITEM: for my $i (0..$last_item) {
        my $status = {};
        my @item_args = (
                         $i,
                         $recent_events,
                         $last_item,
                         $done,
                         $pathdb,
                         \@dlcollector,
                         \%options,
                         $status,
                         \@error,
                        );
        $self->_mirror_item(@item_args);
        last ITEM if $i == $last_item;
        return if $status->{mustreturn};
    }
    if (@dlcollector) {
        my $success = eval { $self->_mirror_dlcollector (\@dlcollector,$pathdb,$recent_events);};
        if (!$success || $@) {
            warn "Warning: Unknown error while mirroring: $@";
            push @error, $@;
            sleep 1;
        }
    }
    print STDERR "DONE\n" if $self->verbose;
    # once we've gone to the end we consider ourselves free of obligations
    $self->unseed;
    $self->_mirror_unhide_tempfile ($trecentfile);
    $self->_mirror_perform_delayed_ops;
    return !@error;
}
# Handles the single event $recent_events->[$i] on behalf of mirror().
#
# Events already covered by $done, and events for which $pathdb records a
# newer epoch, are skipped. "new" events are delegated to
# _mirror_item_new(). "delete" events are not executed here: the local
# path is queued in delayed_operations (unlink for symlinks and
# non-directories, rmdir for directories) unless $options->{'skip-deletes'}
# is set, and the event is registered as done. Unknown types only
# produce a warning.
sub _mirror_item {
    my($self,
       $i,
       $recent_events,
       $last_item,
       $done,
       $pathdb,
       $dlcollector,
       $options,
       $status,
       $error,
      ) = @_;
    my $recent_event = $recent_events->[$i];
    return if $done->covered ( $recent_event->{epoch} );
    if ($pathdb) {
        my $rec = $pathdb->{$recent_event->{path}};
        if ($rec && $rec->{recentepoch}) {
            if (_bigfloatgt
                ( $rec->{recentepoch}, $recent_event->{epoch} )){
                $done->register ($recent_events, [$i]);
                return;
            }
        }
    }
    my $dst = $self->local_path($recent_event->{path});
    if ($recent_event->{type} eq "new"){
        $self->_mirror_item_new
            (
             $dst,
             $i,
             $last_item,
             $recent_events,
             $recent_event,
             $dlcollector,
             $pathdb,
             $status,
             $error,
             $options,
            );
    } elsif ($recent_event->{type} eq "delete") {
        my $activity;
        if ($options->{'skip-deletes'}) {
            $activity = "skipped";
        } elsif (-l $dst) {
            # Check for a symlink first: -e follows links, so a dangling
            # symlink would otherwise look like "not found" and stay
            # behind forever.
            $self->delayed_operations->{unlink}{$dst}++;
            $activity = "deleted";
        } elsif (! -e $dst) {
            $activity = "not_found";
        } elsif (-d _) {
            # "_" reuses the stat buffer filled by the -e test above
            $self->delayed_operations->{rmdir}{$dst}++;
            $activity = "deleted";
        } else {
            $self->delayed_operations->{unlink}{$dst}++;
            $activity = "deleted";
        }
        $done->register ($recent_events, [$i]);
        if ($pathdb) {
            $self->_mirror_register_path($pathdb,[$recent_event],$activity);
        }
    } else {
        warn "Warning: invalid upload type '$recent_event->{type}'";
    }
}
# Handles a "new" event on behalf of _mirror_item(): the event is queued
# in @$dlcollector and, once max_files_per_connection (default 42) items
# are queued, the batch is transferred via _mirror_dlcollector(), followed
# by a pause of sleep_per_connection (default 0.42) seconds. With
# $options->{piecemeal} the caller is then told to return via
# $status->{mustreturn}. A failed batch is reported with a warning and
# recorded in @$error.
sub _mirror_item_new {
    my($self,
       $dst,
       $i,
       $last_item,
       $recent_events,
       $recent_event,
       $dlcollector,
       $pathdb,
       $status,
       $error,
       $options,
      ) = @_;
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        printf STDERR
            (
             "%-4s %d (%d/%d/%s) %s ... ",
             $doing,
             time,
             1+$i,
             1+$last_item,
             $self->interval,
             $recent_event->{path},
            );
    }
    my $batch_size = $self->max_files_per_connection || 42;
    print STDERR "\n" if $self->verbose;
    push @$dlcollector, { rev => $recent_event, i => $i };
    # batch not full yet: nothing more to do for this item
    return unless @$dlcollector >= $batch_size;

    my $success = eval {$self->_mirror_dlcollector ($dlcollector,$pathdb,$recent_events);};
    my $pause = $self->sleep_per_connection;
    $pause = 0.42 unless defined $pause;
    Time::HiRes::sleep $pause;
    if ($options->{piecemeal}) {
        $status->{mustreturn} = 1;
        return;
    }
    if (!$success || $@) {
        warn "Warning: Error while mirroring: $@";
        push @$error, $@;
        sleep 1;
    }
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
}
# Transfers all collected downloads with one mirror_path() call, records
# them in $pathdb (if given) and in the done object, empties the
# collector and returns the result of mirror_path().
sub _mirror_dlcollector {
    my($self,$xcoll,$pathdb,$recent_events) = @_;
    my @paths = map { $_->{rev}{path} } @$xcoll;
    my $success = $self->mirror_path(\@paths);
    if ($pathdb) {
        my @events = map { $_->{rev} } @$xcoll;
        $self->_mirror_register_path($pathdb, \@events, "rsync");
    }
    my @indexes = map { $_->{i} } @$xcoll;
    $self->done->register($recent_events, \@indexes);
    @$xcoll = ();
    return $success;
}
# Records in $pathdb, for every event in @$coll, the epoch of the event
# and the current time under the key "<activity>_on".
sub _mirror_register_path {
    my($self,$pathdb,$coll,$activity) = @_;
    my $now = time;
    my $stamp_key = $activity."_on";
    for my $event (@$coll) {
        my %record = (
                      recentepoch => $event->{epoch},
                      $stamp_key => $now,
                     );
        $pathdb->{$event->{path}} = \%record;
    }
}
# Moves the fetched tempfile $trecentfile into place as our rfile and
# leaves tempfile mode: _use_tempfile is reset, and a remembered tempfile
# handle is told not to unlink on destruction before it is forgotten.
# Confesses if the rename fails.
sub _mirror_unhide_tempfile {
    my($self, $trecentfile) = @_;
    my $rfile = $self->rfile;
    unless (rename $trecentfile, $rfile) {
        # save the error text first: loading Carp may overwrite $!
        my $rename_error = "$!";
        require Carp;
        Carp::confess("Could not rename '$trecentfile' to '$rfile': $rename_error");
    }
    $self->_use_tempfile (0);
    if (my $ctfh = $self->_current_tempfile_fh) {
        $ctfh->unlink_on_destroy (0);
        $self->_current_tempfile_fh (undef);
    }
}
# Carry out the postponed removals registered in delayed_operations:
# first every file under {unlink}, then every directory under {rmdir}.
# A failing removal only produces a warning with stack trace; the entry
# is dropped from the registry in either case.
sub _mirror_perform_delayed_ops {
    my($self) = @_;
    my $delayed = $self->delayed_operations;
    my $files = $delayed->{unlink} ||= {};
    for my $dst (keys %$files) {
        unlink $dst
            or do {
                require Carp;
                Carp::cluck ( "Warning: Error while unlinking '$dst': $!" );
            };
        delete $files->{$dst};
    }
    my $dirs = $delayed->{rmdir} ||= {};
    for my $dst (keys %$dirs) {
        rmdir $dst
            or do {
                require Carp;
                Carp::cluck ( "Warning: Error on rmdir '$dst': $!" );
            };
        delete $dirs->{$dst};
    }
}
1210 =head2 (void) $obj->mirror_loop
1212 Run mirror in an endless loop. See the accessor C<loopinterval>. XXX
1213 What happens/should happen if we miss the interval during a single loop?
1215 =cut
# Call mirror() over and over, at most once per loopinterval seconds
# (default 42), until SIGINT arrives. After every pass the epoch of the
# newest known event is handed to the next mirror() call.
sub mirror_loop {
    my($self) = @_;

    my $Signal = 0;
    # localized: the handler that was active before is restored as soon
    # as the loop is left
    local $SIG{INT} = sub { $Signal++ };
    my $loopinterval = $self->loopinterval || 42;
    my $after = -999999999;
  LOOP: while () {
        # measured per pass; a start time taken only once before the loop
        # would disable the sleep for good after the first interval
        my $iteration_start = time;
        $self->mirror($after);
        last LOOP if $Signal;
        my $re = $self->recent_events;
        # keep the previous value when there are no events at all
        $after = $re->[0]{epoch} if @$re;
        if ($self->verbose) {
            local $| = 1;
            print "($after)";
        }
        # a pass that took longer than the interval is followed
        # immediately by the next one
        my $remaining = $iteration_start + $loopinterval - time;
        sleep $remaining if $remaining > 0;
        if ($self->verbose) {
            local $| = 1;
            print "~";
        }
    }
    return;
}
1244 =head2 $success = $obj->mirror_path ( $arrref | $path )
1246 If the argument is a scalar it is treated as a path. The remote path
1247 is mirrored into the local copy. $path is the path found in the
1248 I<recentfile>, i.e. it is relative to the root directory of the
1249 mirror.
1251 If the argument is an array reference then all elements are treated as
1252 a path below the current tree and all are rsynced with a single
1253 command (and a single connection).
1255 =cut
# Mirror one path (scalar argument) or a batch of paths (array reference)
# from the remote side with rsync. Every return statement in this sub
# returns 1.
1257 sub mirror_path {
1258 my($self,$path) = @_;
1259 # XXX simplify the two branches such that $path is treated as
1260 # [$path] maybe even demand the argument as an arrayref to
1261 # simplify docs and code. (rsync-over-recentfile-2.pl uses the
1262 # interface)
# Batch form: all paths are fetched with one rsync call into localroot.
1263 if (ref $path and ref $path eq "ARRAY") {
1264 my $dst = $self->localroot;
1265 mkpath dirname $dst;
# The list of paths is written, one per line, to a temporary file whose
# name is passed to rsync as 'files-from'.
1266 my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
1267 lc $self->filenameroot,
1269 TMPDIR => 1,
1270 UNLINK => 0,
1272 for my $p (@$path) {
1273 print $fh $p, "\n";
1275 $fh->flush;
# Created with UNLINK => 0 above; from here on the file is removed when
# $fh is destroyed.
1276 $fh->unlink_on_destroy(1);
1277 my $gaveup = 0;
1278 my $retried = 0;
1279 while (!$self->rsync->exec
1281 src => join("/",
1282 $self->remoteroot,
1284 dst => $dst,
1285 'files-from' => $fh->filename,
1286 )) {
1287 my(@err) = $self->rsync->err;
# A link_stat error counts as success when ignore_link_stat_errors is set.
1288 if ($self->ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1289 if ($self->verbose) {
1290 warn "Info: ignoring link_stat error '@err'";
1292 return 1;
1294 $self->register_rsync_error (@err);
# The batch is abandoned after the third failed attempt.
1295 if (++$retried >= 3) {
1296 warn "XXX giving up.";
1297 $gaveup = 1;
1298 last;
# The error state is only reset when the batch was not abandoned.
1301 unless ($gaveup) {
1302 $self->un_register_rsync_error ();
1304 } else {
# Single-path form: this loop has no retry limit of its own; it repeats
# until exec succeeds or a link_stat error is ignored.
1305 my $dst = $self->local_path($path);
1306 mkpath dirname $dst;
1307 while (!$self->rsync->exec
1309 src => join("/",
1310 $self->remoteroot,
1311 $path
1313 dst => $dst,
1314 )) {
1315 my(@err) = $self->rsync->err;
1316 if ($self->ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1317 if ($self->verbose) {
1318 warn "Info: ignoring link_stat error '@err'";
1320 return 1;
1322 $self->register_rsync_error (@err);
1324 $self->un_register_rsync_error ();
# NOTE(review): 1 is returned even when the batch branch gave up --
# confirm that no caller relies on the return value to detect failure.
1326 return 1;
# Name of the file to read from: the current tempfile while the object is
# in tempfile mode, the regular recentfile otherwise.
sub _my_current_rfile {
    my($self) = @_;
    my $current = $self->_use_tempfile
        ? $self->_current_tempfile
        : $self->rfile;
    return $current;
}
1340 =head2 $path = $obj->naive_path_normalize ($path)
1342 Takes an absolute unix style path as argument and canonicalizes it to
1343 a shorter path if possible, removing things like double slashes or
1344 C</./> and removes references to C<../> directories to get a shorter
1345 unambiguous path. This is used to simplify the code that determines
1346 whether a file passed to C<update()> is indeed below our C<localroot>.
1348 =cut
# Purely textual normalization of an absolute unix style path: collapses
# runs of slashes, drops "." segments, resolves "segment/.." pairs (also
# at the end of the path) and removes a trailing slash. The filesystem is
# never consulted, so symlinks are not taken into account. Returns the
# normalized path; the root directory comes out as the empty string.
sub naive_path_normalize {
    my($self,$path) = @_;
    $path =~ s|/+|/|g;
    # "." segments have to go first, otherwise "/a/./../b" would lose the
    # "." instead of the "a"
    $path =~ s{/\.(?=/|\z)}{}g;
    # repeatedly drop the leftmost "segment/.." pair; the lookahead also
    # covers a ".." at the very end of the path
    1 while $path =~ s{/[^/]+/\.\.(?=/|\z)}{};
    $path =~ s|/$||;
    $path;
}
1358 =head2 $ret = $obj->read_recent_1 ( $data )
1360 Delegate of C<recent_events()> on protocol 1
1362 =cut
# Protocol 1 keeps the list of events under the key "recent".
sub read_recent_1 {
    my $self = shift;
    my($data) = @_;
    my $events = $data->{recent};
    return $events;
}
1369 =head2 $array_ref = $obj->recent_events ( %options )
1371 Note: the code relies on the resource being written atomically. We
1372 cannot lock because we may have no write access. If the caller has
1373 write access (eg. aggregate() or update()), it has to care for any
1374 necessary locking and it MUST write atomically.
1376 If C<$options{after}> is specified, only file events after this
1377 timestamp are returned.
1379 If C<$options{before}> is specified, only file events before this
1380 timestamp are returned.
1382 If C<$options{'skip-deletes'}> is specified, no files-to-be-deleted
1383 will be returned.
1385 If C<$options{max}> is specified only a maximum of this many events is
1386 returned.
1388 If C<$options{contains}> is specified the value must be a hash
1389 reference containing a query. The query may contain the keys C<epoch>,
1390 C<path>, and C<type>. Each represents a condition that must be met. If
1391 there is more than one such key, the conditions are ANDed.
1393 If C<$options{info}> is specified, it must be a hashref. This hashref
1394 will be filled with metadata about the unfiltered recent_events of
1395 this object, in key C<first> there is the first item, in key C<last>
1396 is the last.
1398 =cut
# Read the recentfile (as a slave: fetch the remote one into a tempfile
# first) and return its events as an array reference, filtered according
# to %options. An empty array reference is returned when there is no
# file, or when it cannot be deserialized (the error is swallowed here;
# see the POD note on atomic writes).
sub recent_events {
    my ($self, %options) = @_;
    if ($self->is_slave) {
        # XXX seems dubious, might produce tempfiles without removing them?
        $self->get_remote_recentfile_as_tempfile;
    }
    my $rfile_or_tempfile = $self->_my_current_rfile or return [];
    -e $rfile_or_tempfile or return [];
    my $suffix = $self->serializer_suffix;
    my ($data) = eval {
        $self->_try_deserialize
            (
             $suffix,
             $rfile_or_tempfile,
            );
    };
    my $err = $@;
    if ($err or !$data) {
        return [];
    }
    my $re;
    if (reftype($data) eq 'ARRAY') { # protocol 0
        $re = $data;
    } else {
        $re = $self->_recent_events_protocol_x
            (
             $data,
             $rfile_or_tempfile,
            );
    }
    # Every documented option has to reach the option handler; formerly
    # 'skip-deletes' and 'info' were ignored when given on their own.
    return $re unless grep {defined $options{$_}}
        qw(after before contains info max skip-deletes);
    $self->_recent_events_handle_options ($re, \%options);
}
1435 # File::Rsync::Mirror::Recentfile::_recent_events_handle_options
# Apply the options of recent_events() to the event list @$re (newest
# first). The array is modified in place and its reference is returned.
#
#   info         => hashref, receives the first and last unfiltered event
#   after        => keep only events with epoch >  after
#   before       => keep only events with epoch <  before
#   skip-deletes => drop events of type "delete"
#   contains     => hashref with keys out of epoch, path, type (ANDed,
#                   string comparison); any other key is fatal
#   max          => keep at most that many events
#
# Epochs are compared numerically here, as in the original code.
sub _recent_events_handle_options {
    my($self, $re, $options) = @_;
    if (my $info = $options->{info}) {
        $info->{first} = $re->[0];
        $info->{last} = $re->[-1];
    }
    # The guard keeps $re->[0]{epoch} from autovivifying a bogus event in
    # an empty list.
    if (@$re) {
        my $first_item = 0;
        my $last_item = $#$re;
        my $after = $options->{after};
        if (defined $after) {
            if ($re->[0]{epoch} > $after) {
                my $f = first {$re->[$_]{epoch} <= $after} 0..$#$re;
                $last_item = $f-1 if defined $f;
            } else {
                # even the newest event is not newer than $after
                $last_item = -1;
            }
        }
        my $before = $options->{before};
        if (defined $before && $last_item >= 0) {
            if ($re->[0]{epoch} > $before) {
                my $f = first {$re->[$_]{epoch} < $before} 0..$last_item;
                # nothing older than $before: the window is empty
                $first_item = defined $f ? $f : $last_item+1;
            }
            # NOTE(review): when the newest event is not newer than
            # $before, everything is kept, including an event whose epoch
            # equals $before -- unchanged from the original, confirm.
        }
        if ($first_item > $last_item) {
            # Formerly last_item == -1 doubled as "nothing to cut", so an
            # empty window returned the complete list.
            @$re = ();
        } elsif ($first_item > 0 || $last_item < $#$re) {
            @$re = @{$re}[$first_item..$last_item];
        }
    }
    if ($options->{'skip-deletes'}) {
        @$re = grep { $_->{type} ne "delete" } @$re;
    }
    if (my $contopt = $options->{contains}) {
        my $seen_allowed = 0;
        for my $allow (qw(epoch path type)) {
            if (exists $contopt->{$allow}) {
                $seen_allowed++;
                my $v = $contopt->{$allow};
                @$re = grep { $_->{$allow} eq $v } @$re;
            }
        }
        if (keys %$contopt > $seen_allowed) {
            require Carp;
            Carp::confess
                  (sprintf "unknown query: %s", join ", ", %$contopt);
        }
    }
    if ($options->{max} && @$re > $options->{max}) {
        @$re = splice @$re, 0, $options->{max};
    }
    $re;
}
# Handle a deserialized recentfile of protocol 1 or later: copy every
# lowercase meta field the object has no value for yet into the
# accessor of the same name, let read_recent_<protocol> extract the
# events, and remember mtime of the file plus oldest/newest epoch in
# minmax. Returns the array reference of events.
sub _recent_events_protocol_x {
    my($self, $data, $rfile_or_tempfile) = @_;
    my $reader = sprintf "read_recent_%d", $data->{meta}{protocol};
    my $meta = $data->{meta};
    # we may be reading meta for the first time
    for my $field (keys %$meta) {
        next unless $field eq lc $field; # "Producers"
        next if defined $self->$field;
        $self->$field($meta->{$field});
    }
    my $re = $self->$reader ($data);
    my @stat = stat $rfile_or_tempfile;
    die "Cannot stat '$rfile_or_tempfile': $!" unless @stat;
    my %minmax = (mtime => $stat[9]);
    if (@$re) {
        @minmax{qw(min max)} = ($re->[-1]{epoch}, $re->[0]{epoch});
    }
    $self->minmax ( \%minmax );
    return $re;
}
# Load the data structure stored in $rfile_or_tempfile. Files with the
# suffix ".yaml" are read with YAML::Syck; every other suffix is mapped
# through %serializers and needs Data::Serializer. Dies when the file
# cannot be opened or when Data::Serializer is required but missing.
sub _try_deserialize {
    my($self,
       $suffix,
       $rfile_or_tempfile,
      ) = @_;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        return YAML::Syck::LoadFile($rfile_or_tempfile);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        my $serialized = do
            {
                # three-argument open: the file name must never be able
                # to select a mode or start a command
                open my $fh, "<", $rfile_or_tempfile or die "Could not open: $!";
                local $/;
                <$fh>;
            };
        return $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
}
# Re-read the recentfile at $dst through a throw-away object of our own
# class and take over its _merged and minmax values. When both objects
# carry a dirtymark and the two differ, the "done" bookkeeping is reset,
# the new dirtymark is adopted and the object is marked as seeded.
sub _refresh_internals {
    my($self, $dst) = @_;
    my $class = ref $self;
    my $rfpeek = $class->new_from_file ($dst);
    for my $acc (qw(
                    _merged
                    minmax
                   )) {
        $self->$acc ( $rfpeek->$acc );
    }
    my $old_dirtymark = $self->dirtymark;
    my $new_dirtymark = $rfpeek->dirtymark;
    # Any change counts, be it an increase or a decrease; formerly only a
    # greater dirtymark was honoured.
    if ($old_dirtymark && $new_dirtymark
        && (   _bigfloatgt($new_dirtymark,$old_dirtymark)
            || _bigfloatlt($new_dirtymark,$old_dirtymark))) {
        $self->done->reset;
        $self->dirtymark ( $new_dirtymark );
        $self->seed;
    }
}
1563 =head2 $ret = $obj->rfilename
1565 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1566 a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
1568 =cut
sub rfilename {
    my($self) = @_;
    # <filenameroot>-<interval><serializer_suffix>
    my @parts = (
                 $self->filenameroot,
                 "-",
                 $self->interval,
                 $self->serializer_suffix,
                );
    return join "", @parts;
}
1580 =head2 $str = $self->remote_dir
1582 The directory we are mirroring from.
1584 =cut
sub remote_dir {
    my($self, $set) = @_;
    $self->_remote_dir ($set) if defined $set;
    my $dir = $self->_remote_dir;
    # every call, reads included, marks the object as a slave
    $self->is_slave (1);
    return $dir;
}
1596 =head2 $str = $obj->remoteroot
1598 =head2 (void) $obj->remoteroot ( $set )
1600 Get/Set the composed prefix needed when rsyncing from a remote module.
1601 If remote_host, remote_module, and remote_dir are set, it is composed
1602 from these.
1604 =cut
sub remoteroot {
    my($self, $set) = @_;
    $self->_remoteroot($set) if defined $set;
    my $cached = $self->_remoteroot;
    return $cached if defined $cached;
    # not set explicitly: compose host::module/dir from whatever is known
    my $host   = $self->remote_host;
    my $module = $self->remote_module;
    my $dir    = $self->remote_dir;
    my $composed = join "",
        (defined $host   ? $host."::"  : ""),
        (defined $module ? $module."/" : ""),
        (defined $dir    ? $dir        : "");
    $self->_remoteroot($composed);
    return $composed;
}
1625 =head2 (void) $obj->resolve_recentfilename ( $recentfilename )
1627 Inverse method to C<rfilename>. C<$recentfilename> is a plain filename
1628 of the pattern
1630 $filenameroot-$interval$serializer_suffix
1632 e.g.
1634 RECENT-1M.yaml
1636 This filename is split into its parts and the parts are fed to the
1637 object itself.
1639 =cut
sub resolve_recentfilename {
    my($self, $rfname) = @_;
    my($splitter) = qr(^(.+)-([^-\.]+)(\.[^\.]+));
    # list assignment in scalar context: 0 when the name does not match
    my @parts = $rfname =~ $splitter
        or die "Alert: cannot split '$rfname', doesn't match '$splitter'";
    my($root, $interval, $suffix) = @parts;
    $self->filenameroot ($root);
    $self->interval ($interval);
    $self->serializer_suffix ($suffix);
    return;
}
1654 =head2 my $rfile = $obj->rfile
1656 Returns the full path of the I<recentfile>
1658 =cut
sub rfile {
    my($self) = @_;
    my $cached = $self->_rfile;
    if (defined $cached) {
        return $cached;
    }
    # first call: localroot/rfilename, remembered for later calls
    my @components = ($self->localroot, $self->rfilename);
    my $path = File::Spec->catfile(@components);
    $self->_rfile ($path);
    return $path;
}
1672 =head2 $rsync_obj = $obj->rsync
1674 The File::Rsync object that this object uses for communicating with an
1675 upstream server.
1677 =cut
sub rsync {
    my($self) = @_;
    my $cached = $self->_rsync;
    return $cached if defined $cached;
    # created on first use from rsync_options and kept in _rsync
    my $rsync_options = $self->rsync_options || {};
    die "File::Rsync required for rsync operations. Cannot continue"
        unless $HAVE->{"File::Rsync"};
    my $fresh = File::Rsync->new($rsync_options);
    $self->_rsync($fresh);
    return $fresh;
}
1694 =head2 (void) $obj->register_rsync_error(@err)
1696 =head2 (void) $obj->un_register_rsync_error()
1698 Register_rsync_error is called whenever the File::Rsync object fails
1699 on an exec (say, connection doesn't succeed). It issues a warning and
1700 sleeps for an increasing amount of time. Un_register_rsync_error
1701 resets the error count. See also accessor C<max_rsync_errors>.
1703 =cut
{
    # failure bookkeeping shared by the two subs below
    my $no_success_count = 0;
    my $no_success_time = 0;

    # Count one more failed rsync call. Dies with a stack trace as soon
    # as the count reaches max_rsync_errors (no limit when that accessor
    # is undefined, never when it is negative); otherwise warns and
    # sleeps 12 seconds per counted failure, 300 at most.
    sub register_rsync_error {
        my($self, @err) = @_;
        chomp @err;
        $no_success_time = time;
        $no_success_count++;
        my $limit = $self->max_rsync_errors;
        $limit = MAX_INT unless defined $limit;
        require Carp;
        if ($limit>=0 && $no_success_count >= $limit) {
            my $alert = sprintf
                (
                 "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
                 $self->interval,
                 join(" ",@err),
                 $no_success_count,
                );
            Carp::confess($alert);
        }
        my $sleep = min(300, 12 * $no_success_count);
        my $warning = sprintf
            (
             "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
             scalar(localtime($no_success_time)),
             $self->interval,
             join(" ",@err),
             $sleep,
            );
        Carp::cluck($warning);
        sleep $sleep
    }

    # Forget all counted failures.
    sub un_register_rsync_error {
        my($self) = @_;
        $no_success_time = 0;
        $no_success_count = 0;
    }
}
1748 =head2 $clone = $obj->_sparse_clone
1750 Clones just as much from itself that it does not hurt. Experimental
1751 method.
1753 Note: what fits better: sparse or shallow? Other suggestions?
1755 =cut
sub _sparse_clone {
    my($self) = @_;
    my $clone = bless {}, ref $self;
    # only these accessors are carried over; reference values are
    # deep-copied so that the clone shares no data with the original
    my @accessors = qw(
                       _interval
                       _localroot
                       _remoteroot
                       _rfile
                       _use_tempfile
                       aggregator
                       filenameroot
                       is_slave
                       max_files_per_connection
                       protocol
                       rsync_options
                       serializer_suffix
                       sleep_per_connection
                       verbose
                      );
    for my $acc (@accessors) {
        my $value = $self->$acc;
        if (ref $value) {
            $value = Storable::dclone $value;
        }
        $clone->$acc($value);
    }
    return $clone;
}
1783 =head2 $boolean = OBJ->ttl_reached ()
1785 =cut
sub ttl_reached {
    my($self) = @_;
    my $last_mirrored = $self->have_mirrored || 0;
    my $now = Time::HiRes::time();
    my $ttl = $self->ttl;
    # 24.2 seconds when no ttl is configured
    $ttl = 24.2 unless defined $ttl;
    my $expires = $last_mirrored + $ttl;
    return $now > $expires ? 1 : 0;
}
1799 =head2 (void) $obj->unlock()
1801 Unlocking is implemented with an C<rmdir> on a locking directory
1802 (C<.lock> appended to $rfile).
1804 =cut
sub unlock {
    my($self) = @_;
    return unless $self->_is_locked;
    # the lock is the directory "<rfile>.lock"; the result of rmdir is
    # not checked
    my $lockdir = $self->rfile . ".lock";
    rmdir $lockdir;
    return $self->_is_locked (0);
}
1814 =head2 unseed
1816 Sets this recentfile in the state of not 'seeded'.
1818 =cut
sub unseed {
    my $self = shift;
    # thin wrapper around the seeded() accessor
    return $self->seeded(0);
}
1824 =head2 $ret = $obj->update ($path, $type)
1826 =head2 $ret = $obj->update ($path, "new", $dirty_epoch)
1828 =head2 $ret = $obj->update ()
1830 Enter one file into the local I<recentfile>. $path is the (usually
1831 absolute) path. If the path is outside I<our> tree, then it is
1832 ignored.
1834 C<$type> is one of C<new> or C<delete>.
1836 Events of type C<new> may set $dirty_epoch. $dirty_epoch is normally
1837 not used and the epoch is calculated by the update() routine itself
1838 based on current time. But if there is the demand to insert a
1839 not-so-current file into the dataset, then the caller sets
1840 $dirty_epoch. This causes the epoch of the registered event to become
1841 $dirty_epoch or -- if the exact value given is already taken -- a tiny
1842 bit more. As compensation the dirtymark of the whole dataset is set to
1843 now or the current epoch, whichever is higher. Note: setting the
1844 dirty_epoch to the future is prohibited as it's very unlikely to be
1845 intended: it definitely might wreak havoc with the index files.
1847 The new file event is unshifted (or, if dirty_epoch is set, inserted
1848 at the place it belongs to, according to the rule to have a sequence
1849 of strictly decreasing timestamps) to the array of recent_events and
1850 the array is shortened to the length of the timespan allowed. This is
1851 usually the timespan specified by the interval of this recentfile but
1852 as long as this recentfile has not been merged to another one, the
1853 timespan may grow without bounds.
1855 The third form runs an update without inserting a new file. This may
1856 be desired to truncate a recentfile.
1858 =cut
# Return an epoch for a new event at the head of @$recent: $epoch itself
# when the list is empty or when $epoch is greater than the epoch of the
# first event, otherwise the result of _increase_a_bit() for that first
# epoch.
sub _epoch_monotonically_increasing {
    my($self,$epoch,$recent) = @_;
    return $epoch unless @$recent; # the first one goes unoffended
    my $newest = $recent->[0]{epoch};
    return _bigfloatgt("".$epoch,$newest)
        ? $epoch
        : _increase_a_bit($newest);
}
# Register one event in the local recentfile, or -- when called without
# arguments -- only truncate it. See the POD above for the contract.
#
#   $path        path of the file (normalized with the canonize method,
#                naive_path_normalize by default); ignored unless it is
#                below localroot
#   $type        exactly "new" or "delete"
#   $dirty_epoch optional epoch for an event that is to be inserted at
#                its place in the past instead of at the head
#
# Dies on a missing or illegal argument. The recentfile is locked for
# the duration of the update.
sub update {
    my($self,$path,$type,$dirty_epoch) = @_;
    if (defined $path or defined $type or defined $dirty_epoch) {
        die "update called without path argument" unless defined $path;
        die "update called without type argument" unless defined $type;
        # anchored: formerly any string merely containing "new" or
        # "delete" was accepted
        die "update called with illegal type argument: $type" unless $type =~ /\A(?:new|delete)\z/;
        # since we have keep_delete_objects_forever we must let them inject delete objects too:
        #die "update called with \$type=$type and \$dirty_epoch=$dirty_epoch; ".
        #    "dirty_epoch only allowed with type=new" if defined $dirty_epoch and $type ne "new";
        my $canonmeth = $self->canonize;
        unless ($canonmeth) {
            $canonmeth = "naive_path_normalize";
        }
        $path = $self->$canonmeth($path);
    }
    my $lrd = $self->localroot;
    $self->lock;
    # you must calculate the time after having locked, of course
    my $now = Time::HiRes::time();
    my $interval = $self->interval;
    my $secs = $self->interval_secs();
    my $recent = $self->recent_events;

    my $epoch;
    if (defined $dirty_epoch && _bigfloatgt($now,$dirty_epoch)) {
        $epoch = $dirty_epoch;
    } else {
        $epoch = $self->_epoch_monotonically_increasing($now,$recent);
    }

    $recent ||= [];
    my $oldest_allowed = 0;
    my $merged = $self->merged;
    if ($merged->{epoch}) {
        my $virtualnow = _bigfloatmax($now,$epoch);
        # for the lower bound I think we need no big math, we calc already
        $oldest_allowed = min($virtualnow - $secs, $merged->{epoch}, $epoch);
    } else {
        # as long as we are not merged at all, no limits!
    }
    my $something_done = 0;
  TRUNCATE: while (@$recent) {
        # $DB::single++ unless defined $oldest_allowed;
        if (_bigfloatlt($recent->[-1]{epoch}, $oldest_allowed)) {
            pop @$recent;
            $something_done = 1;
        } else {
            last TRUNCATE;
        }
    }
    # The prefix has to end at a path separator, otherwise a sibling such
    # as "/root/treeX/file" would pass for a file below "/root/tree".
    my $lrd_prefix = defined $lrd ? $lrd : "";
    $lrd_prefix =~ s|/+\z||;
    if (defined $path && $path =~ s{^\Q$lrd_prefix\E(?=/)}{}) {
        $path =~ s|^/||;
        my $splicepos;
        # remove the older duplicates of this $path, irrespective of $type:
        if (defined $dirty_epoch) {
            my $ctx = $self->_update_with_dirty_epoch($path,$recent,$epoch);
            $recent    = $ctx->{recent};
            $splicepos = $ctx->{splicepos};
            $epoch     = $ctx->{epoch};
            my $new_dm = $now;
            if (_bigfloatgt($epoch, $now)) { # just in case we had to increase it
                $new_dm = $epoch;
            }
            $self->dirtymark($new_dm);
            my $merged = $self->merged;
            if (not defined $merged->{epoch} or _bigfloatlt($epoch,$merged->{epoch})) {
                $self->merged(+{});
            }
        } else {
            $recent = [ grep { $_->{path} ne $path } @$recent ];
            $splicepos = 0;
        }
        if (defined $splicepos) {
            splice @$recent, $splicepos, 0, { epoch => $epoch, path => $path, type => $type };
        }
        $something_done = 1;
    }

    $self->write_recent($recent) if $something_done;
    $self->_assert_symlink;
    $self->unlock;
}
# Prepare the insertion of an event for $path with the caller-supplied
# $epoch into @$recent. Returns a hash reference with the keys recent
# (the list), splicepos (index to insert at; stays undef when the scan
# below finds no position) and epoch (possibly adjusted).
1952 sub _update_with_dirty_epoch {
1953 my($self,$path,$recent,$epoch) = @_;
1954 my $splicepos;
1955 my $new_recent = [];
# Only entered when the list holds at least one event for another path.
# NOTE(review): a list made up solely of events for $path therefore keeps
# its old entries -- confirm that this is intended.
1956 if (grep { $_->{path} ne $path } @$recent) {
1957 my $cancel = 0;
# Events for other paths are kept, older events for $path are dropped --
# unless one of them already carries exactly $epoch (string comparison),
# in which case the list is left untouched.
1958 KNOWN_EVENT: for my $i (0..$#$recent) {
1959 if ($recent->[$i]{path} eq $path) {
1960 if ($recent->[$i]{epoch} eq $epoch) {
1961 # nothing to do
1962 $cancel = 1;
1963 last KNOWN_EVENT;
1965 } else {
1966 push @$new_recent, $recent->[$i];
1969 @$recent = @$new_recent unless $cancel;
# Insert position: the head when the list is empty or $epoch is greater
# than the first epoch, the tail when it is less than the last epoch,
# otherwise found by scanning from the head.
# NOTE(review): after a cancel above the code still goes on to compute a
# position here -- verify against the "nothing to do" comment.
1971 if (!exists $recent->[0] or _bigfloatgt($epoch,$recent->[0]{epoch})) {
1972 $splicepos = 0;
1973 } elsif (_bigfloatlt($epoch,$recent->[-1]{epoch})) {
1974 $splicepos = @$recent;
1975 } else {
1976 RECENT: for my $i (0..$#$recent) {
# NOTE(review): $ev is assigned but not used.
1977 my $ev = $recent->[$i];
# The epoch is already taken: replace it by the result of
# _increase_a_bit, which also gets the epoch of the preceding event
# (undef at index 0) -- presumably as an upper bound, verify in
# FakeBigFloat.
1978 if ($epoch eq $recent->[$i]{epoch}) {
1979 $epoch = _increase_a_bit($epoch, $i ? $recent->[$i-1]{epoch} : undef);
1981 if (_bigfloatgt($epoch,$recent->[$i]{epoch})) {
1982 $splicepos = $i;
1983 last RECENT;
1987 return {
1988 recent => $recent,
1989 splicepos => $splicepos,
1990 epoch => $epoch,
1994 =head2 seed
1996 Sets this recentfile in the state of 'seeded' which means it has to
1997 re-evaluate its uptodateness.
1999 =cut
sub seed {
    my $self = shift;
    # thin wrapper around the seeded() accessor
    return $self->seeded(1);
}
2005 =head2 seeded
2007 Tells if the recentfile is in the state 'seeded'.
2009 =cut
sub seeded {
    my($self, $set) = @_;
    $self->_seeded ($set) if defined $set;
    my $state = $self->_seeded;
    return $state if defined $state;
    # never set so far: store and report 0
    $self->_seeded (0);
    return 0;
}
2023 =head2 uptodate
2025 True if this object has mirrored the complete interval covered by the
2026 current recentfile.
2028 =cut
sub uptodate {
    my($self) = @_;
    my($uptodate, $why);
    if ($self->_uptodateness_ever_reached and not $self->seeded) {
        ($uptodate, $why) = (1, "saturated");
    }
    # A check of ttl_reached used to sit here; it was switched off on
    # 2009-03-22 because it is too easy to misconfigure ttl and related
    # timings and then never reach uptodateness.
    if (!defined $uptodate) {
        # look if recentfile has unchanged timestamp
        my $minmax = $self->minmax;
        if (exists $minmax->{mtime}) {
            my $rfile = $self->_my_current_rfile;
            my $mtime = (stat $rfile)[9];
            my $file_is_newer = defined $mtime
                && defined $minmax->{mtime}
                && $mtime > $minmax->{mtime};
            if ($file_is_newer) {
                $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
                $uptodate = 0;
            } else {
                my $covered = $self->done->covered(@$minmax{qw(max min)});
                $why = sprintf "minmax covered[%s], so we return that", defined $covered ? $covered : "UNDEF";
                $uptodate = $covered;
            }
        }
    }
    if (!defined $uptodate) {
        ($uptodate, $why) = (0, "fallthrough, so not uptodate");
    }
    $self->_uptodateness_ever_reached(1) if $uptodate;
    # keep verdict and reason available for later inspection
    $self->_remember_last_uptodate_call
        ({
          uptodate => $uptodate,
          why => $why,
         });
    return $uptodate;
}
2078 =head2 $obj->write_recent ($recent_files_arrayref)
2080 Writes a I<recentfile> based on the current reflection of the current
2081 state of the tree limited by the current interval.
2083 =cut
# Sort the array behind the reference given as first argument in place,
# by descending epoch. Returns nothing.
sub _resort {
    my($self, $recent) = @_;
    my @sorted = sort { _bigfloatcmp($b->{epoch},$a->{epoch}) } @$recent;
    @$recent = @sorted;
    return;
}
# Check that the epochs in @$recent are strictly decreasing (dies with a
# stack trace otherwise), update minmax and hand the list to the writer
# for the current protocol (write_<protocol>). Dies when called without
# argument.
sub write_recent {
    my ($self,$recent) = @_;
    die "write_recent called without argument" unless defined $recent;
    my $Last_epoch;
 SANITYCHECK: for my $i (0..$#$recent) {
        if (defined($Last_epoch) and _bigfloatge($recent->[$i]{epoch},$Last_epoch)) {
            require Carp;
            Carp::confess(sprintf "Warning: disorder '%s'>='%s', re-sorting %s\n",
                          $recent->[$i]{epoch}, $Last_epoch, $self->interval);
            # you may want to:
            # $self->_resort($recent);
            # last SANITYCHECK;
        }
        $Last_epoch = $recent->[$i]{epoch};
    }
    my $minmax = $self->minmax;
    $minmax ||= {};
    # Without the guard, $recent->[0]{epoch} would autovivify an empty
    # hash in an empty list and that bogus event would be written out.
    if (@$recent) {
        if (!defined $minmax->{max} || _bigfloatlt($minmax->{max},$recent->[0]{epoch})) {
            $minmax->{max} = $recent->[0]{epoch};
        }
        # NOTE(review): min is only ever raised here (same comparison as
        # for max) -- kept as in the original, confirm that an older
        # event is not meant to lower it.
        if (!defined $minmax->{min} || _bigfloatlt($minmax->{min},$recent->[-1]{epoch})) {
            $minmax->{min} = $recent->[-1]{epoch};
        }
    }
    $self->minmax($minmax);
    my $meth = sprintf "write_%d", $self->protocol;
    $self->$meth($recent);
}
2116 =head2 $obj->write_0 ($recent_files_arrayref)
2118 Delegate of C<write_recent()> on protocol 0
2120 =cut
sub write_0 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    # dump next to the target, then rename over it
    my $staging = "$rfile.new";
    YAML::Syck::DumpFile($staging,$recent);
    rename $staging, $rfile or die "Could not rename to '$rfile': $!";
}
2129 =head2 $obj->write_1 ($recent_files_arrayref)
2131 Delegate of C<write_recent()> on protocol 1
2133 =cut
sub write_1 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    my $suffix = $self->serializer_suffix;
    my %payload = (
                   meta => $self->meta_data,
                   recent => $recent,
                  );
    my $data = \%payload;
    # ".yaml" goes through YAML::Syck, every other suffix through
    # Data::Serializer with the serializer registered for that suffix
    my $serialized;
    if ($suffix eq ".yaml") {
        $serialized = YAML::Syck::Dump($data);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'"
            unless $HAVE->{"Data::Serializer"};
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $serialized = $serializer->raw_serialize($data);
    }
    # write next to the target, then rename over it
    my $staging = "$rfile.new";
    open my $fh, ">", $staging or die "Could not open >'$rfile.new': $!";
    print $fh $serialized;
    close $fh or die "Could not close '$rfile.new': $!";
    rename $staging, $rfile or die "Could not rename to '$rfile': $!";
}
# Fill %serializers at compile time from the POD section that follows:
# the text up to the next "=cut" line is read as a here-document, every
# line of the form "=item C<< ... >>" is reduced to the text between the
# brackets, and that text is eval'ed to yield one suffix => serializer
# pair. The POD below is therefore data as well as documentation.
2159 BEGIN {
2160 my @pod_lines =
2161 split /\n/, <<'=cut'; %serializers = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
2163 =head1 SERIALIZERS
2165 The following suffixes are supported and trigger the use of these
2166 serializers:
2168 =over 4
2170 =item C<< ".yaml" => "YAML::Syck" >>
2172 =item C<< ".json" => "JSON" >>
2174 =item C<< ".sto" => "Storable" >>
2176 =item C<< ".dd" => "Data::Dumper" >>
2178 =back
2180 =cut
# Fill %seconds at compile time from the POD section that follows: the
# text up to the next "=cut" line is read as a here-document, every line
# of the form "=item C<< ... >>" is reduced to the text between the
# brackets, and that text is eval'ed to yield one letter => seconds
# pair. The POD below is therefore data as well as documentation.
2182 BEGIN {
2183 my @pod_lines =
2184 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
2186 =head1 INTERVAL SPEC
2188 An interval spec is a primitive way to express time spans. Normally it
2189 is composed from an integer and a letter.
2191 As a special case, a string that consists only of the single letter
2192 C<Z>, stands for unlimited time.
2194 The following letters express the specified number of seconds:
2196 =over 4
2198 =item C<< s => 1 >>
2200 =item C<< m => 60 >>
2202 =item C<< h => 60*60 >>
2204 =item C<< d => 60*60*24 >>
2206 =item C<< W => 60*60*24*7 >>
2208 =item C<< M => 60*60*24*30 >>
2210 =item C<< Q => 60*60*24*90 >>
2212 =item C<< Y => 60*60*24*365.25 >>
2214 =back
2216 =cut
2218 =head1 SEE ALSO
2220 L<File::Rsync::Mirror::Recent>,
2221 L<File::Rsync::Mirror::Recentfile::Done>,
2222 L<File::Rsync::Mirror::Recentfile::FakeBigFloat>
2224 =head1 BUGS
2226 Please report any bugs or feature requests through the web interface
2228 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2229 I will be notified, and then you'll automatically be notified of
2230 progress on your bug as I make changes.
2232 =head1 KNOWN BUGS
2234 Memory hungry: it seems all memory is allocated during the initial
2235 rsync where a list of all files is maintained in memory.
2237 =head1 SUPPORT
2239 You can find documentation for this module with the perldoc command.
2241 perldoc File::Rsync::Mirror::Recentfile
2243 You can also look for information at:
2245 =over 4
2247 =item * RT: CPAN's request tracker
2249 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2251 =item * AnnoCPAN: Annotated CPAN documentation
2253 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2255 =item * CPAN Ratings
2257 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2259 =item * Search CPAN
2261 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2263 =back
2266 =head1 ACKNOWLEDGEMENTS
2268 Thanks to RJBS for module-starter.
2270 =head1 AUTHOR
2272 Andreas König
2274 =head1 COPYRIGHT & LICENSE
2276 Copyright 2008,2009 Andreas König.
2278 This program is free software; you can redistribute it and/or modify it
2279 under the same terms as Perl itself.
2282 =cut
2284 1; # End of File::Rsync::Mirror::Recentfile
2286 # Local Variables:
2287 # mode: cperl
2288 # cperl-indent-level: 4
2289 # End: