sanity check and tempfile cleanup when we return early from mirror() due to mustreturn
[rersyncrecent.git] / lib / File / Rsync / Mirror / Recentfile.pm
blob7c539d810a1a289b7e20d6faa7c13bc0b73cd7e7
1 package File::Rsync::Mirror::Recentfile;
3 # use warnings;
4 use strict;
6 =encoding utf-8
8 =head1 NAME
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
12 =cut
# Optional dependencies. For each module, $HAVE->{Module} holds the result
# of trying to load it: true on success, undef when the require failed.
my $HAVE = {};
for my $package (qw(Data::Serializer File::Rsync)) {
    $HAVE->{$package} = eval qq{ require $package; };
}
21 use Config;
22 use File::Basename qw(basename dirname fileparse);
23 use File::Copy qw(cp);
24 use File::Path qw(mkpath);
25 use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
26 use File::Temp;
27 use List::Util qw(first max min);
28 use Scalar::Util qw(reftype);
29 use Storable;
30 use Time::HiRes qw();
31 use YAML::Syck;
33 use version; our $VERSION = qv('0.0.1');
35 use constant MAX_INT => ~0>>1; # anything better?
36 use constant DEFAULT_PROTOCOL => 1;
38 # cf. interval_secs
39 my %seconds;
41 # maybe subclass if this mapping is bad?
42 my %serializers;
44 =head1 SYNOPSIS
46 Writer (of a single file):
48 use File::Rsync::Mirror::Recentfile;
49 my $rf = File::Rsync::Mirror::Recentfile->new
51 interval => q(6h),
52 filenameroot => "RECENT",
53 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
54 localroot => "/home/ftp/pub/PAUSE/authors/",
55 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
57 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
59 Reader/mirrorer:
61 my $rf = File::Rsync::Mirror::Recentfile->new
63 filenameroot => "RECENT",
64 ignore_link_stat_errors => 1,
65 interval => q(6h),
66 localroot => "/home/ftp/pub/PAUSE/authors",
67 remote_dir => "",
68 remote_host => "pause.perl.org",
69 remote_module => "authors",
70 rsync_options => {
71 compress => 1,
72 'rsync-path' => '/usr/bin/rsync',
73 links => 1,
74 times => 1,
75 'omit-dir-times' => 1,
76 checksum => 1,
78 verbose => 1,
80 $rf->mirror;
82 Aggregator (usually the writer):
84 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
85 $rf->aggregate;
87 =head1 DESCRIPTION
89 Lower level than F:R:M:Recent. Handles only one recentfile whereas a
90 tree always is composed of several recentfiles. The single recentfile
91 has to do the bookkeeping for a timeslice.
93 =head1 EXPORT
95 No exports.
97 =head1 CONSTRUCTORS / DESTRUCTOR
99 =head2 my $obj = CLASS->new(%hash)
101 Constructor. On every argument pair the key is a method name and the
102 value is an argument to that method name.
104 If a recentfile for this resource already exists, metadata that are
105 not defined by the constructor will be fetched from there as soon as
106 it is being read by recent_events().
108 =cut
# Constructor. @args is consumed pairwise: for every (key, value) pair the
# method named by the key is called with the value. Afterwards protocol,
# filenameroot and serializer_suffix receive their defaults if they are
# still undefined.
sub new {
    my($class, @args) = @_;
    my $self = bless {}, $class;
    while (my($method, $arg) = splice @args, 0, 2) {
        $self->$method($arg);
    }
    my %default = (
        protocol          => DEFAULT_PROTOCOL,
        filenameroot      => "RECENT",
        serializer_suffix => ".yaml",
    );
    for my $attr (qw(protocol filenameroot serializer_suffix)) {
        $self->$attr($default{$attr}) unless defined $self->$attr;
    }
    return $self;
}
129 =head2 my $obj = CLASS->new_from_file($file)
131 Constructor. $file is a I<recentfile>.
133 =cut
# Constructor that reads an existing recentfile.
#
# $file is the path of a recentfile (or a symlink to one in the same
# directory). The serializer suffix and the localroot are derived from the
# resolved file name; every lower-case key found in the file's "meta"
# section is applied to the new object through the method of the same name.
# Dies when the file cannot be opened, when a symlink target contains a
# "/", when the suffix is unknown, or when a non-YAML suffix is requested
# without Data::Serializer being installed.
sub new_from_file {
    my($class, $file) = @_;
    my $self = bless {}, $class;
    $self->_rfile($file);
    #?# $self->lock;
    my $serialized = do {
        # three-arg open: the file name must never be interpreted as a mode
        open my $fh, "<", $file or die "Could not open '$file': $!";
        local $/;
        <$fh>;
    };
    # XXX: we can skip this step when the metadata are sufficient, but
    # we cannot parse the file without some magic stuff about
    # serialized formats
    require File::Spec;
    while (-l $file) {
        my($name,$path) = fileparse $file;
        my $symlink = readlink $file;
        defined $symlink or die "Could not readlink '$file': $!";
        if ($symlink =~ m|/|) {
            die "FIXME: filenames containing '/' not supported, got $symlink";
        }
        $file = File::Spec->catfile ( $path, $symlink );
    }
    my($name,$path,$suffix) = fileparse $file, keys %serializers;
    $self->serializer_suffix($suffix);
    $self->localroot($path);
    die "Could not determine file format from suffix" unless $suffix;
    my $deserialized;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        $deserialized = YAML::Syck::LoadFile($file);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $deserialized = $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    while (my($k,$v) = each %{$deserialized->{meta}}) {
        next if $k ne lc $k; # "Producers"
        $self->$k($v);
    }
    unless (defined $self->protocol) {
        $self->protocol(DEFAULT_PROTOCOL);
    }
    return $self;
}
180 =head2 DESTROY
182 A simple unlock.
184 =cut
# Destructor: releases the lock. Because a destructor can run at any point
# (including while an exception is propagating), the global status variables
# are localized so that whatever unlock() does cannot leak into the caller.
sub DESTROY {
    my($self) = @_;
    local($., $@, $!, $^E, $?);
    $self->unlock;
}
187 =head1 ACCESSORS
189 =cut
my @accessors;

BEGIN {
    # Private accessors, not mentioned in the documentation.
    @accessors = qw(
        _current_tempfile
        _current_tempfile_fh
        _delayed_operations
        _done
        _interval
        _is_locked
        _localroot
        _merged
        _pathdb
        _remember_last_uptodate_call
        _remote_dir
        _remoteroot
        _rfile
        _rsync
        _seeded
        _uptodateness_ever_reached
        _use_tempfile
    );
    # Public accessors are harvested from the POD that follows: the
    # heredoc swallows everything up to the next "=cut" line and every
    # "=item NAME" line in it contributes NAME to the list.
    my @pod_lines =
        split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
217 =over 4
219 =item aggregator
221 A list of interval specs that tell the aggregator which I<recentfile>s
222 are to be produced.
224 =item canonize
226 The name of a method to canonize the path before rsyncing. Only
227 supported value is C<naive_path_normalize>. Defaults to that.
229 =item comment
231 A comment about this tree and setup.
233 =item dirtymark
235 A timestamp. The dirtymark is updated whenever an out of band change
236 on the origin server is performed that violates the protocol. Say,
237 they add or remove files in the middle somewhere. Slaves must react
238 with a devaluation of their C<done> structure which then leads to a
239 full re-sync of all files. Implementation note: dirtymark may increase
240 or decrease.
242 =item filenameroot
244 The (prefix of the) filename we use for this I<recentfile>. Defaults to
245 C<RECENT>. The string must not contain a directory separator.
247 =item have_mirrored
249 Timestamp remembering when we mirrored this recentfile the last time.
250 Only relevant for slaves.
252 =item ignore_link_stat_errors
254 If set to true, rsync errors are ignored that complain about link stat
255 errors. These seem to happen only when there are files missing at the
256 origin. In race conditions this can always happen, so it is
257 recommended to set this value to true.
259 =item is_slave
261 If set to true, this object will fetch a new recentfile from remote
262 when the timespan between the last mirror (see have_mirrored) and now
263 is too large (see C<ttl>).
265 =item keep_delete_objects_forever
267 The default for delete events is that they are passed through the
268 collection of recentfile objects until they reach the Z file. There
269 they get dropped so that the associated file object ceases to exist at
270 all. By setting C<keep_delete_objects_forever> the delete objects are
271 kept forever. This makes the Z file larger but has the advantage that
272 slaves that have interrupted mirroring for a long time still can clean
273 up their copy.
275 =item locktimeout
277 After how many seconds shall we die if we cannot lock a I<recentfile>?
278 Defaults to 600 seconds.
280 =item loopinterval
282 When mirror_loop is called, this accessor can specify how much time
283 every loop shall at least take. If the work of a loop is done before
284 that time has gone, sleeps for the rest of the time. Defaults to
285 arbitrary 42 seconds.
287 =item max_files_per_connection
289 Maximum number of files that are transferred on a single rsync call.
290 Setting it higher means higher performance at the price of holding
291 connections longer and potentially disturbing other users in the pool.
292 Defaults to the arbitrary value 42.
294 =item max_rsync_errors
296 When rsync operations encounter that many errors without any resetting
297 success in between, then we die. Defaults to unlimited. A value of
298 -1 means we run forever ignoring all rsync errors.
300 =item minmax
302 Hashref remembering when we read the recent_events from this file the
303 last time and what the timespan was.
305 =item protocol
307 When the RECENT file format changes, we increment the protocol. We try
308 to support older protocols in later releases.
310 =item remote_host
312 The host we are mirroring from. Leave empty for the local filesystem.
314 =item remote_module
316 Rsync servers have so called modules to separate directory trees from
317 each other. Put here the name of the module under which we are
318 mirroring. Leave empty for local filesystem.
320 =item rsync_options
322 Things like compress, links, times or checksums. Passed in to the
323 File::Rsync object used to run the mirror.
325 =item serializer_suffix
327 Mostly untested accessor. The only well tested format for
328 I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
329 Data::Serializer. But in principle other formats are supported as
330 well. See section SERIALIZERS below.
332 =item sleep_per_connection
334 Sleep that many seconds (floating point OK) after every chunk of rsyncing
335 has finished. Defaults to arbitrary 0.42.
337 =item ttl
339 Time to live. Number of seconds after which this recentfile must be
340 fetched again from the origin server. Only relevant for slaves.
341 Defaults to arbitrary 24.2 seconds.
343 =item verbose
345 Boolean to turn on a bit of verbosity.
347 =back
349 =cut
351 use accessors @accessors;
353 =head1 METHODS
355 =head2 (void) $obj->aggregate( %options )
357 Takes all intervals that are collected in the accessor called
358 aggregator. Sorts them by actual length of the interval.
359 Removes those that are shorter than our own interval. Then merges this
360 object into the next larger object. The merging continues upwards
361 as long as the next I<recentfile> is old enough to warrant a merge.
363 Whether a merge is warranted is decided according to the interval of
364 the previous (next smaller) recentfile, so that larger files are not
365 updated as often as smaller ones. If $options{force} is true, all files get updated.
367 Here is an example to illustrate the behaviour. Given aggregators
369 1h 1d 1W 1M 1Q 1Y Z
371 then
373 1h updates 1d on every call to aggregate()
374 1d updates 1W earliest after 1h
375 1W updates 1M earliest after 1d
376 1M updates 1Q earliest after 1W
377 1Q updates 1Y earliest after 1M
378 1Y updates Z earliest after 1Q
380 Note that all but the smallest recentfile get updated at an arbitrary
381 rate and as such are quite useless on their own.
383 =cut
# Merge this recentfile upwards through the configured aggregator chain.
#
# The own interval plus all aggregator intervals are sorted by length and
# those shorter than the own interval are discarded. After update() has
# been called on this object, each level is merged into a sparse clone set
# to the next larger interval. The first merge always happens; later ones
# happen when $option{force} is true, when the target file does not exist
# yet, or when the target file is older than the interval of the level
# before the current one. The first level that needs no merge ends the walk.
sub aggregate {
    my($self, %option) = @_;
    my @specs      = ($self->interval, @{ $self->aggregator || [] });
    my @candidates = map { +{ interval => $_, secs => $self->interval_secs($_) } } @specs;
    my @aggs       = sort { $a->{secs} <=> $b->{secs} }
                     grep { $_->{secs} >= $self->interval_secs } @candidates;
    $self->update;
    $aggs[0]{object} = $self;
  AGGREGATOR: for my $i (0 .. $#aggs - 1) {
        my $this = $aggs[$i]{object};
        my $next = $this->_sparse_clone;
        $next->interval($aggs[$i+1]{interval});
        my $want_merge = ($option{force} || $i == 0) ? 1 : 0;
        unless ($want_merge) {
            my $next_rfile = $next->rfile;
            if (-e $next_rfile) {
                my $prev = $aggs[$i-1]{object};
                # -M is relative to $^T; pin it to "now" for this measurement
                local $^T = time;
                my $next_age = 86400 * -M $next_rfile;
                $want_merge = 1 if $next_age > $prev->interval_secs;
            } else {
                $want_merge = 1;
            }
        }
        last AGGREGATOR unless $want_merge;
        $next->merge($this);
        $aggs[$i+1]{object} = $next;
    }
}
# Debugging helper: for the own interval and every aggregator interval
# (sorted by length) report the recentfile path together with its size and
# mtime. Returns an arrayref of { rfile, size, mtime } hashes; size and
# mtime are undef when the file cannot be stat'ed.
sub _debug_aggregate {
    my($self) = @_;
    my @specs = ($self->interval, @{ $self->aggregator || [] });
    my @aggs  = sort { $a->{secs} <=> $b->{secs} }
                map { +{ interval => $_, secs => $self->interval_secs($_) } } @specs;
    my @report;
    for my $agg (@aggs) {
        my $clone = Storable::dclone($self);
        $clone->interval($agg->{interval});
        my $rfile = $clone->rfile;
        my($size, $mtime) = (stat $rfile)[7, 9];
        push @report, { rfile => $rfile, size => $size, mtime => $mtime };
    }
    return \@report;
}
# (void) $self->_assert_symlink()
#
# Makes sure that "<filenameroot>.recent" in localroot refers to this
# object's recentfile. On systems with symlink support a symlink pointing
# to rfilename is created, or atomically replaced via a temporary name and
# rename when an existing link points elsewhere. On systems without
# symlinks the recentfile is copied instead. Dies when symlink, copy or
# rename fails.
sub _assert_symlink {
    my($self) = @_;
    require File::Spec;
    my $recentrecentfile = File::Spec->catfile(
        $self->localroot,
        sprintf("%s.recent", $self->filenameroot),
    );
    my $tmpname = "$recentrecentfile.$$";
    if (($Config{d_symlink} || "") eq "define") {
        if (-l $recentrecentfile) {
            my $found_symlink = readlink $recentrecentfile;
            # already pointing at the right file: nothing to do
            return if defined $found_symlink && $found_symlink eq $self->rfilename;
            # replace the stale link atomically
            unlink $tmpname; # may fail
            symlink $self->rfilename, $tmpname
                or die "Could not create symlink '$tmpname': $!";
            rename $tmpname, $recentrecentfile
                or die "Could not rename '$tmpname' to $recentrecentfile: $!";
        } else {
            symlink $self->rfilename, $recentrecentfile
                or die "Could not create symlink '$recentrecentfile': $!";
        }
    } else {
        warn "Warning: symlinks not supported on this system, doing a copy instead\n";
        unlink $tmpname; # may fail
        # Copy from the full path (rfile); rfilename is only the bare file
        # name and would be resolved against the current directory.
        cp($self->rfile, $tmpname)
            or die "Could not copy to '$tmpname': $!";
        rename $tmpname, $recentrecentfile
            or die "Could not rename '$tmpname' to $recentrecentfile: $!";
    }
    return;
}
478 =head2 $hashref = $obj->delayed_operations
480 A hash of hashes containing unlink and rmdir operations which had to
481 wait until the recentfile got unhidden in order to not confuse
482 downstream mirrors (in case we have some).
484 =cut
# Returns the hashref of pending operations, creating and storing an empty
# { unlink => {}, rmdir => {} } structure on first use.
sub delayed_operations {
    my($self) = @_;
    my $ops = $self->_delayed_operations;
    return $ops if defined $ops;
    $ops = { unlink => {}, rmdir => {} };
    $self->_delayed_operations($ops);
    return $ops;
}
499 =head2 $done = $obj->done
501 C<$done> is a reference to a L<File::Rsync::Mirror::Recentfile::Done>
502 object that keeps track of rsync activities. Only needed and used when
503 we are a mirroring slave.
505 =cut
# Returns the File::Rsync::Mirror::Recentfile::Done object for this
# recentfile. It is created lazily, told our interval, and cached.
sub done {
    my($self) = @_;
    my $done = $self->_done;
    return $done if $done;
    require File::Rsync::Mirror::Recentfile::Done;
    $done = File::Rsync::Mirror::Recentfile::Done->new();
    $done->_rfinterval($self->interval);
    $self->_done($done);
    return $done;
}
519 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
521 Stores the remote I<recentfile> locally as a tempfile. The caller is
522 responsible to remove the file after use.
524 Note: if you're intending to act as an rsync server for other slaves,
525 then you must prefer this method to fetch that file with
526 get_remotefile(). Otherwise downstream mirrors would expect you to
527 already have mirrored all the files that are in the I<recentfile>
528 before you have them mirrored.
530 =cut
# Fetches the remote recentfile into a tempfile below localroot and returns
# the tempfile's name.
#
# While _use_tempfile is set and the ttl has not been reached, the current
# tempfile is returned without any network activity. Otherwise an existing
# tempfile handle is reused or a new tempfile is created (pre-seeded with a
# copy of the local recentfile, if there is one, so rsync only has to
# transfer differences). rsync is tried up to three times; on success the
# internals are refreshed from the new file, on failure a warning is
# printed and the caller gets the (possibly stale) tempfile anyway.
# Dies when the seed copy or the final chmod fails.
sub get_remote_recentfile_as_tempfile {
    my($self) = @_;
    mkpath $self->localroot;
    my $fh;
    if ( $self->_use_tempfile() ) {
        return $self->_current_tempfile if ! $self->ttl_reached;
        $fh = $self->_current_tempfile_fh;
    }
    my $trfilename = $self->rfilename;

    my $dst;
    if ($fh) {
        $dst = $self->_current_tempfile;
    } else {
        $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
        $dst = $fh->filename;
        $self->_current_tempfile ($dst);
        my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
        if (defined $rfile && -e $rfile) {
            # saving on bandwidth. Might need to be configurable
            # $self->bandwidth_is_cheap?
            cp($rfile, $dst) or die "Could not copy '$rfile' to '$dst': $!";
        }
    }
    my $src = join ("/",
                    $self->remoteroot,
                    $trfilename,
                   );
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        my $display_dst = join "/", "...", basename(dirname($dst)), basename($dst);
        printf STDERR
            (
             "%-4s %d (1/1/%s) temp %s ... ",
             $doing,
             time,
             $self->interval,
             $display_dst,
            );
    }
    my $gaveup = 0;
    my $retried = 0;
    while (!$self->rsync->exec(
                               src => $src,
                               dst => $dst,
                              )) {
        $self->register_rsync_error ($self->rsync->err);
        if (++$retried >= 3) {
            warn "XXX giving up";
            $gaveup = 1;
            last;
        }
    }
    if ($gaveup) {
        # terminate the line so the message does not run into later output
        printf STDERR "Warning: gave up mirroring %s, will try again later\n", $self->interval;
    } else {
        $self->_refresh_internals ($dst);
        $self->have_mirrored (Time::HiRes::time);
        $self->un_register_rsync_error ();
    }
    $self->unseed;
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
    my $mode = 0644;
    # report the mode in octal; plain interpolation would print 420
    chmod $mode, $dst
        or die sprintf "Could not chmod %04o '%s': %s", $mode, $dst, $!;
    return $dst;
}
# Creates the File::Temp object that will receive the remote recentfile.
# The file lives in localroot, is named ".FRMRecent-<rfilename>-XXXX" plus
# the serializer suffix, and is unlinked on destruction only when
# _use_tempfile is set. In that case the handle is also stored on the
# object so that it outlives the caller's scope.
sub _get_remote_rat_provide_tempfile_object {
    my($self, $trfilename) = @_;
    my $template = sprintf ".FRMRecent-%s-XXXX", $trfilename;
    my $fh = File::Temp->new(
        TEMPLATE => $template,
        DIR      => $self->localroot,
        SUFFIX   => $self->serializer_suffix,
        UNLINK   => $self->_use_tempfile,
    );
    # delay self destruction
    $self->_current_tempfile_fh ($fh) if $self->_use_tempfile;
    return $fh;
}
620 =head2 $localpath = $obj->get_remotefile ( $relative_path )
622 Rsyncs one single remote file to local filesystem.
624 Note: no locking is done on this file. Any number of processes may
625 mirror this object.
627 Note II: do not use for recentfiles. If you are a cascading
628 slave/server combination, it would confuse other slaves. They would
629 expect the contents of these recentfiles to be available. Use
630 get_remote_recentfile_as_tempfile() instead.
632 =cut
# Rsyncs the single remote file $path (relative to remoteroot) to the same
# relative location below localroot and returns the local path. The
# transfer is repeated until rsync reports success; every failure is handed
# to register_rsync_error.
sub get_remotefile {
    my($self, $path) = @_;
    my $dst = File::Spec->catfile($self->localroot, $path);
    mkpath dirname $dst;
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        printf STDERR
            (
             "%-4s %d (1/1/%s) %s ... ",
             $doing,
             time,
             $self->interval,
             $path,
            );
    }
    until ($self->rsync->exec(
                              src => join("/", $self->remoteroot, $path),
                              dst => $dst,
                             )) {
        $self->register_rsync_error ($self->rsync->err);
    }
    $self->un_register_rsync_error ();
    print STDERR "DONE\n" if $self->verbose;
    return $dst;
}
664 =head2 $obj->interval ( $interval_spec )
666 Get/set accessor. $interval_spec is a string and described below in
667 the section INTERVAL SPEC.
669 =cut
# Get/set accessor for the interval spec. Setting it also invalidates the
# cached _rfile. Confesses when no interval is defined after the optional
# assignment.
sub interval {
    my $self = shift;
    if (@_) {
        $self->_interval($_[0]);
        $self->_rfile(undef);
    }
    my $interval = $self->_interval;
    return $interval if defined $interval;
    # do not ask the $self too much, it recurses!
    require Carp;
    Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
}
686 =head2 $secs = $obj->interval_secs ( $interval_spec )
688 $interval_spec is described below in the section INTERVAL SPEC. If
689 empty defaults to the inherent interval for this object.
691 =cut
# Converts an interval spec into seconds. Without an argument (or with a
# false one) the object's own interval is used. "Z" maps to MAX_INT; any
# other spec must be a count followed by a unit known to %seconds.
# Dies on anything it cannot parse.
sub interval_secs {
    my ($self, $interval) = @_;
    $interval ||= $self->interval;
    defined $interval
        or die "interval_secs() called without argument on an object without a declared one";
    my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
        die "Could not determine seconds from interval[$interval]";
    return MAX_INT if $interval eq "Z";
    return $seconds{$t}*$n if exists $seconds{$t} && $n =~ /^\d+$/;
    die "Invalid interval specification: n[$n]t[$t]";
}
710 =head2 $obj->localroot ( $localroot )
712 Get/set accessor. The local root of the tree.
714 =cut
# Get/set accessor for the local root directory. Setting it also
# invalidates the cached _rfile. Returns the current value.
sub localroot {
    my $self = shift;
    if (@_) {
        $self->_localroot($_[0]);
        $self->_rfile(undef);
    }
    my $localroot = $self->_localroot;
    return $localroot;
}
725 =head2 $ret = $obj->local_path($path_found_in_recentfile)
727 Combines the path to our local mirror and the path of an object found
728 in this I<recentfile>. In other words: the target of a mirror operation.
730 Implementation note: We split on slashes and then use
731 File::Spec::catfile to adjust to the local operating system.
733 =cut
# Maps a slash-separated path from a recentfile to the corresponding path
# below localroot, using File::Spec so the result suits the local system.
# Without a path, localroot itself is returned.
sub local_path {
    my($self,$path) = @_;
    # seems like a degenerated case
    return $self->localroot unless defined $path;
    my @segments = split m|/|, $path;
    return File::Spec->catfile($self->localroot, @segments);
}
745 =head2 (void) $obj->lock
747 Locking is implemented with an C<mkdir> on a locking directory
748 (C<.lock> appended to $rfile).
750 =cut
# Acquires the lock for this recentfile by creating the directory
# "<rfile>.lock", polling every 0.01 seconds. Does nothing when the object
# already holds the lock. Dies once more than locktimeout seconds (default
# 600) have passed without success.
sub lock {
    my ($self) = @_;
    # not using flock because it locks on filehandles instead of
    # old school resources.
    return if $self->_is_locked;
    my $rfile = $self->rfile;
    # XXX need a way to allow breaking the lock
    my $start = time;
    my $locktimeout = $self->locktimeout || 600;
    until (mkdir "$rfile.lock") {
        Time::HiRes::sleep 0.01;
        die "Could not acquire lockdirectory '$rfile.lock': $!"
            if time - $start > $locktimeout;
    }
    $self->_is_locked (1);
}
770 =head2 (void) $obj->merge ($other)
772 Bulk update of this object with another one. It's used to merge a
773 smaller and younger $other object into the current one. If this file
774 is a C<Z> file, then we normally do not merge in objects of type
775 C<delete>; this can be overridden by setting
776 keep_delete_objects_forever. But if we encounter an object of type
777 delete we delete the corresponding C<new> object if we have it.
779 If there is nothing to be merged, nothing is done.
781 =cut
# Merges the smaller/younger recentfile $other into this one. After the
# sanity check, $other is locked and its events are read, then this object
# is locked and the real work is delegated to _merge_locked. Both locks are
# released afterwards, our own first.
sub merge {
    my($self, $other) = @_;
    $self->_merge_sanitycheck ( $other );
    $other->lock;
    my $incoming = $other->recent_events || [];
    $self->lock;
    $self->_merge_locked ( $other, $incoming );
    $self->unlock;
    $other->unlock;
}
# Core of merge(), to be called with both objects locked.
#
# Determines the oldest epoch that may stay in this file, drops our own
# events that are older, and filters $other_recent down to the events that
# qualify (not too old, first occurrence of their path, and no "delete"
# events for a Z file unless keep_delete_objects_forever is set). When any
# of this produced a change, _merge_something_done writes the result.
sub _merge_locked {
    my($self, $other, $other_recent) = @_;
    my $my_recent = $self->recent_events || [];

    # calculate the target time span
    my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
    my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
    my $oldest_allowed = 0;
    # obstetrics: a file without any event always has to be written
    my $something_done = $my_recent->[0] ? undef : 1;
    if ($epoch) {
        if (($other->dirtymark||0) ne ($self->dirtymark||0)) {
            # differing dirtymarks: keep everything and force a rewrite
            $oldest_allowed = 0;
            $something_done = 1;
        } elsif (my $merged = $self->merged) {
            $oldest_allowed = min($epoch - $self->interval_secs(), $merged->{epoch}||0);
            if (@$other_recent and
                _bigfloatlt($other_recent->[-1]{epoch}, $oldest_allowed)) {
                $oldest_allowed = $other_recent->[-1]{epoch};
            }
        }
        while (@$my_recent && _bigfloatlt($my_recent->[-1]{epoch}, $oldest_allowed)) {
            pop @$my_recent;
            $something_done = 1;
        }
    }

    my %have_path;
    my $other_recent_filtered = [];
    for my $oev (@$other_recent) {
        my $oevepoch = $oev->{epoch} || 0;
        next if _bigfloatlt($oevepoch, $oldest_allowed);
        my $path = $oev->{path};
        next if $have_path{$path}++;
        # delete events are not carried into a Z file by default
        next if $self->interval eq "Z"
            and $oev->{type} eq "delete"
            and ! $self->keep_delete_objects_forever;
        $something_done = 1
            if !$myepoch || _bigfloatgt($oevepoch, $myepoch);
        push @$other_recent_filtered, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
    }
    $self->_merge_something_done ($other_recent_filtered, $my_recent, $other_recent, $other, \%have_path, $epoch)
        if $something_done;
}
# Second half of a merge: zips the filtered foreign events and our own
# events into one list ordered by descending epoch, skipping own events
# whose path has already been seen. Events that ended up with identical
# epochs are made unique via _increase_a_bit. The dirtymark is taken over
# from $other when ours is unset or different, the merged list is written,
# and $other records (and writes) into which interval and at which epoch it
# has been merged.
sub _merge_something_done {
    my($self, $other_recent_filtered, $my_recent, $other_recent, $other, $have_path, $epoch) = @_;
    my $recent = [];
    my $epoch_conflict = 0;
    my $last_epoch;
  ZIP: while (@$other_recent_filtered || @$my_recent) {
        my $take_other = !@$my_recent
            || @$other_recent_filtered
               && _bigfloatge($other_recent_filtered->[0]{epoch},$my_recent->[0]{epoch});
        my $event;
        if ($take_other) {
            $event = shift @$other_recent_filtered;
        } else {
            $event = shift @$my_recent;
            next ZIP if $have_path->{$event->{path}}++;
        }
        $epoch_conflict=1 if defined $last_epoch && $event->{epoch} eq $last_epoch;
        $last_epoch = $event->{epoch};
        push @$recent, $event;
    }
    if ($epoch_conflict) {
        my %have_epoch;
        # walk from the oldest event to the newest
        for my $i (reverse 0 .. $#$recent) {
            my $candidate = $recent->[$i]{epoch};
            next unless $have_epoch{$candidate}++;
            $candidate = _increase_a_bit($candidate) while $have_epoch{$candidate};
            $recent->[$i]{epoch} = $candidate;
            $have_epoch{$candidate}++;
        }
    }
    if (!$self->dirtymark || $other->dirtymark ne $self->dirtymark) {
        $self->dirtymark ( $other->dirtymark );
    }
    $self->write_recent($recent);
    $other->merged({
                    time => Time::HiRes::time,
                    epoch => $recent->[0]{epoch},
                    into_interval => $self->interval, # checked by merged()
                   });
    $other->write_recent($other_recent);
}
# Dies unless $self covers a strictly larger interval than $other, i.e.
# unless $other may legally be merged into $self. The message names the
# bigger interval ($other's) first and the smaller one ($self's) second.
sub _merge_sanitycheck {
    my($self, $other) = @_;
    my $own_secs   = $self->interval_secs;
    my $other_secs = $other->interval_secs;
    if ($own_secs <= $other_secs) {
        die sprintf
            (
             "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
             $other_secs,
             $own_secs,
            );
    }
}
906 =head2 merged
908 Hashref denoting when this recentfile has been merged into some other
909 at which epoch.
911 =cut
# Get/set accessor for the "merged" hashref. When the stored hashref names
# an into_interval and our own interval is known, a warning with stack
# trace is issued if into_interval equals our interval or is shorter than
# it. Returns the stored hashref.
sub merged {
    my($self, $set) = @_;
    $self->_merged ($set) if defined $set;
    my $merged = $self->_merged;
    my $into = $merged && $merged->{into_interval};
    if ($into and defined $self->_interval) {
        # sanity checks
        my $complaint;
        if ($into eq $self->interval) {
            $complaint = sprintf
                (
                 "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
                 $into,
                 $self->interval,
                );
        } elsif ($self->interval_secs($into) < $self->interval_secs) {
            $complaint = sprintf
                (
                 "Warning: into_interval_secs[%s] smaller than own interval_secs[%s] on interval[%s]. Danger ahead.",
                 $self->interval_secs($into),
                 $self->interval_secs,
                 $self->interval,
                );
        }
        if (defined $complaint) {
            require Carp;
            Carp::cluck($complaint);
        }
    }
    return $merged;
}
944 =head2 $hashref = $obj->meta_data
946 Returns the hashref of metadata that the server has to add to the
947 I<recentfile>.
949 =cut
# Returns the metadata hashref for this recentfile. Starting from
# $self->{meta}, every listed accessor with a defined value overrides the
# entry of the same name. A Producers entry and a dirtymark are filled in
# when they are missing.
sub meta_data {
    my($self) = @_;
    my $ret = $self->{meta};
    my @fields = qw(
        aggregator canonize comment dirtymark filenameroot
        interval merged minmax protocol serializer_suffix
    );
    for my $m (@fields) {
        my $v = $self->$m;
        $ret->{$m} = $v if defined $v;
    }
    # XXX need to reset the Producer if I am a writer, keep it when I
    # am a reader
    $ret->{Producers} ||= {
        __PACKAGE__, "$VERSION", # stringified it looks better
        '$0', $0,
        'time', Time::HiRes::time,
    };
    $ret->{dirtymark} ||= Time::HiRes::time;
    return $ret;
}
982 =head2 $success = $obj->mirror ( %options )
984 Mirrors the files in this I<recentfile> as reported by
985 C<recent_events>. Options named C<after>, C<before>, C<max>, and
986 C<skip-deletes> are passed through to the C<recent_events> call. The
987 boolean option C<piecemeal>, if true, causes C<mirror> to only rsync
988 C<max_files_per_connection> and keep track of the rsynced files so
989 that future calls will rsync different files until all files are
990 brought to sync.
992 =cut
# Mirrors the files listed in this recentfile.
#
# The remote recentfile is fetched into a tempfile, the events selected by
# the options before/after/max/skip-deletes are walked through
# _mirror_item, and whatever is left in the download collector is
# transferred at the end. Only after that is the tempfile renamed into
# place and the delayed unlink/rmdir operations carried out.
#
# When an item asks for an early return (piecemeal mode) before the last
# item has been reached, the method returns empty-handed; a tempfile name
# that is no longer backed by a File::Temp handle is removed first.
#
# Returns true when no error was collected, false otherwise.
sub mirror {
    my($self, %options) = @_;
    my $trecentfile = $self->get_remote_recentfile_as_tempfile();
    $self->_use_tempfile (1);
    my %passthrough = map { ($_ => $options{$_}) } qw(before after max skip-deletes);
    my ($recent_events) = $self->recent_events(%passthrough);
    my(@error, @dlcollector); # download-collector: array containing paths we need
    my $first_item = 0;
    my $last_item = $#$recent_events;
    my $done = $self->done;
    my $pathdb = $self->_pathdb;
  ITEM: for my $i ($first_item..$last_item) {
        my $status = +{};
        $self->_mirror_item
            (
             $i,
             $recent_events,
             $last_item,
             $done,
             $pathdb,
             \@dlcollector,
             \%options,
             $status,
             \@error,
            );
        last if $i == $last_item;
        if ($status->{mustreturn}){
            if ($self->_current_tempfile && ! $self->_current_tempfile_fh) {
                # looks like a bug somewhere else
                my $t = $self->_current_tempfile;
                unlink $t or die "Could not unlink '$t': $!";
                $self->_current_tempfile(undef);
            }
            return;
        }
    }
    if (@dlcollector) {
        my $success = eval { $self->_mirror_dlcollector (\@dlcollector,$pathdb,$recent_events);};
        # keep a private copy: warn (via a __WARN__ handler) may clobber $@
        my $err = $@;
        if (!$success || $err) {
            warn "Warning: Unknown error while mirroring: $err";
            push @error, $err;
            sleep 1;
        }
    }
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
    # once we've gone to the end we consider ourselves free of obligations
    $self->unseed;
    $self->_mirror_unhide_tempfile ($trecentfile);
    $self->_mirror_perform_delayed_ops;
    return !@error;
}
# Handles event number $i of $recent_events during mirror().
#
# Nothing happens when the event's epoch is already covered by $done. When
# $pathdb knows a younger epoch for the path, the event is only registered
# as done. "new" events are passed on to _mirror_item_new. "delete" events
# are queued as delayed unlink or rmdir (unless skip-deletes is set or the
# target does not exist), registered as done and, if there is a $pathdb,
# recorded there. Any other type yields a warning.
sub _mirror_item {
    my($self, $i, $recent_events, $last_item, $done, $pathdb,
       $dlcollector, $options, $status, $error) = @_;
    my $recent_event = $recent_events->[$i];
    return if $done->covered ( $recent_event->{epoch} );
    if ($pathdb) {
        my $rec = $pathdb->{$recent_event->{path}};
        if ($rec && $rec->{recentepoch}
            && _bigfloatgt( $rec->{recentepoch}, $recent_event->{epoch} )) {
            $done->register ($recent_events, [$i]);
            return;
        }
    }
    my $dst = $self->local_path($recent_event->{path});
    if ($recent_event->{type} eq "new"){
        $self->_mirror_item_new
            (
             $dst,
             $i,
             $last_item,
             $recent_events,
             $recent_event,
             $dlcollector,
             $pathdb,
             $status,
             $error,
             $options,
            );
    } elsif ($recent_event->{type} eq "delete") {
        my $activity;
        if ($options->{'skip-deletes'}) {
            $activity = "skipped";
        } elsif (! -e $dst) {
            $activity = "not_found";
        } else {
            # symlinks and everything that is not a directory get unlinked
            my $op = (-l $dst or not -d _) ? "unlink" : "rmdir";
            $self->delayed_operations->{$op}{$dst}++;
            $activity = "deleted";
        }
        $done->register ($recent_events, [$i]);
        $self->_mirror_register_path($pathdb,[$recent_event],$activity)
            if $pathdb;
    } else {
        warn "Warning: invalid upload type '$recent_event->{type}'";
    }
}
# Handles a "new" event during mirror(): the event is appended to the
# download collector, and once the collector holds
# max_files_per_connection entries (default 42) the batch is transferred
# through _mirror_dlcollector, followed by a pause of sleep_per_connection
# seconds (default 0.42).
#
# A failed batch is reported with a warning and pushed onto @$error; this
# also happens in piecemeal mode, where $status->{mustreturn} is set
# afterwards so that mirror() stops after this batch.
sub _mirror_item_new {
    my($self,
       $dst,
       $i,
       $last_item,
       $recent_events,
       $recent_event,
       $dlcollector,
       $pathdb,
       $status,
       $error,
       $options,
      ) = @_;
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        printf STDERR
            (
             "%-4s %d (%d/%d/%s) %s ... ",
             $doing,
             time,
             1+$i,
             1+$last_item,
             $self->interval,
             $recent_event->{path},
            );
    }
    my $max_files_per_connection = $self->max_files_per_connection || 42;
    if ($self->verbose) {
        print STDERR "\n";
    }
    push @$dlcollector, { rev => $recent_event, i => $i };
    # batch not full yet: nothing to transfer
    return if @$dlcollector < $max_files_per_connection;

    my $success = eval {$self->_mirror_dlcollector ($dlcollector,$pathdb,$recent_events);};
    # save the error right away, the calls below could reset $@
    my $err = $@;
    my $sleep = $self->sleep_per_connection;
    $sleep = 0.42 unless defined $sleep;
    Time::HiRes::sleep $sleep;
    if (!$success || $err) {
        warn "Warning: Error while mirroring: $err";
        push @$error, $err;
        sleep 1;
    }
    if ($options->{piecemeal}) {
        $status->{mustreturn} = 1;
        return;
    }
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
}
# Transfers all paths gathered in the collector $xcoll with one mirror_path
# call, records the events in $pathdb (if there is one) as "rsync"
# activity, registers their indexes as done and empties the collector.
# Returns what mirror_path returned.
sub _mirror_dlcollector {
    my($self,$xcoll,$pathdb,$recent_events) = @_;
    my @paths = map { $_->{rev}{path} } @$xcoll;
    my $success = $self->mirror_path(\@paths);
    if ($pathdb) {
        my @events = map { $_->{rev} } @$xcoll;
        $self->_mirror_register_path($pathdb, \@events, "rsync");
    }
    my @indexes = map { $_->{i} } @$xcoll;
    $self->done->register($recent_events, \@indexes);
    @$xcoll = ();
    return $success;
}
# Records in $pathdb, for every event in @$coll, the event's epoch under
# "recentepoch" and the current time under "<activity>_on". An existing
# entry for the same path is replaced.
sub _mirror_register_path {
    my($self,$pathdb,$coll,$activity) = @_;
    my $now = time;
    my $stamp_key = $activity."_on";
    for my $event (@$coll) {
        my %entry = (recentepoch => $event->{epoch}, $stamp_key => $now);
        $pathdb->{$event->{path}} = \%entry;
    }
}
# Move the temporary recentfile $trecentfile to its final name (rfile),
# switch the object back from tempfile mode and detach the tempfile
# handle so that it no longer unlinks the (now renamed) file on destroy.
# Confesses when the rename fails.
sub _mirror_unhide_tempfile {
    my($self, $trecentfile) = @_;
    my $rfile = $self->rfile;
    unless (rename $trecentfile, $rfile) {
        require Carp;
        Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
    }
    # else: warn "DEBUG: renamed '$trecentfile' to '$rfile'";
    $self->_use_tempfile (0);
    my $ctfh = $self->_current_tempfile_fh;
    if ($ctfh) {
        $ctfh->unlink_on_destroy (0);
        $self->_current_tempfile_fh (undef);
    }
}
# Carry out the deletions that were postponed during mirroring: first all
# queued unlinks, then all queued rmdirs. A failing operation only
# produces a warning with stack trace; its queue entry is removed either
# way.
sub _mirror_perform_delayed_ops {
    my($self) = @_;
    my $delayed = $self->delayed_operations;
    foreach my $file (keys %{$delayed->{unlink}}) {
        unlink $file or do {
            require Carp;
            Carp::cluck ( "Warning: Error while unlinking '$file': $!" );
        };
        delete $delayed->{unlink}{$file};
    }
    foreach my $dir (keys %{$delayed->{rmdir}}) {
        rmdir $dir or do {
            require Carp;
            Carp::cluck ( "Warning: Error on rmdir '$dir': $!" );
        };
        delete $delayed->{rmdir}{$dir};
    }
}
1223 =head2 (void) $obj->mirror_loop
1225 Run mirror in an endless loop. See the accessor C<loopinterval>. XXX
1226 What happens/should happen if we miss the interval during a single loop?
1228 =cut
# Call mirror() repeatedly until SIGINT arrives, starting a new round at
# most once every loopinterval seconds (default 42).
#
# - The start time is taken at the top of every round. Taking it only
#   once before the loop meant that after the first interval had elapsed
#   the loop never slept again.
# - The INT handler is installed with local, so the caller's handler is
#   back in place when the loop ends.
# - The signal flag is also checked after the sleep, so an interrupt that
#   arrives while sleeping ends the loop without another mirror run.
sub mirror_loop {
    my($self) = @_;
    my $Signal = 0;
    local $SIG{INT} = sub { $Signal++ };
    my $loopinterval = $self->loopinterval || 42;
    my $after = -999999999;
  LOOP: while () {
        my $iteration_start = time;
        $self->mirror($after);
        last LOOP if $Signal;
        my $re = $self->recent_events;
        $after = $re->[0]{epoch};
        if ($self->verbose) {
            local $| = 1;
            print "($after)";
        }
        # compute the rest of the interval once, so it cannot go negative
        # between the test and the sleep
        my $remaining = $iteration_start + $loopinterval - time;
        sleep $remaining if $remaining > 0;
        last LOOP if $Signal;
        if ($self->verbose) {
            local $| = 1;
            print "~";
        }
    }
}
1257 =head2 $success = $obj->mirror_path ( $arrref | $path )
1259 If the argument is a scalar it is treated as a path. The remote path
1260 is mirrored into the local copy. $path is the path found in the
1261 I<recentfile>, i.e. it is relative to the root directory of the
1262 mirror.
1264 If the argument is an array reference then all elements are treated as
1265 a path below the current tree and all are rsynced with a single
1266 command (and a single connection).
1268 =cut
# Mirror one path (scalar argument) or a list of paths (array reference)
# from remoteroot into the local tree using the rsync object.
#
# Array form: the paths are written to a temporary file that is handed to
# rsync as 'files-from'; source is remoteroot, destination is localroot.
# A failing rsync run is registered via register_rsync_error and retried;
# after the third failure the loop gives up.
#
# Scalar form: source is remoteroot/$path, destination is local_path($path).
# A failing run is registered and retried without a retry limit in this
# sub (register_rsync_error is called on every failure).
#
# In both forms a link_stat error ends the attempt with a return value of
# 1 when ignore_link_stat_errors is set.
#
# NOTE(review): every return statement in this sub returns 1, including
# the path where the array form gave up after three failures -- confirm
# that callers do not rely on the return value to detect that case.
1270 sub mirror_path {
1271 my($self,$path) = @_;
1272 # XXX simplify the two branches such that $path is treated as
1273 # [$path] maybe even demand the argument as an arrayref to
1274 # simplify docs and code. (rsync-over-recentfile-2.pl uses the
1275 # interface)
1276 if (ref $path and ref $path eq "ARRAY") {
1277 my $dst = $self->localroot;
1278 mkpath dirname $dst;
# The list of paths goes into a tempfile; UNLINK is off at creation and
# switched on again after the file has been written and flushed.
1279 my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
1280 lc $self->filenameroot,
1282 TMPDIR => 1,
1283 UNLINK => 0,
1285 for my $p (@$path) {
1286 print $fh $p, "\n";
1288 $fh->flush;
1289 $fh->unlink_on_destroy(1);
1290 my $gaveup = 0;
1291 my $retried = 0;
# Retry loop: runs as long as rsync->exec returns false.
1292 while (!$self->rsync->exec
1294 src => join("/",
1295 $self->remoteroot,
1297 dst => $dst,
1298 'files-from' => $fh->filename,
1299 )) {
1300 my(@err) = $self->rsync->err;
1301 if ($self->ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1302 if ($self->verbose) {
1303 warn "Info: ignoring link_stat error '@err'";
1305 return 1;
1307 $self->register_rsync_error (@err);
1308 if (++$retried >= 3) {
1309 warn "XXX giving up.";
1310 $gaveup = 1;
1311 last;
# The error counter is reset only when the loop ended without giving up.
1314 unless ($gaveup) {
1315 $self->un_register_rsync_error ();
1317 } else {
1318 my $dst = $self->local_path($path);
1319 mkpath dirname $dst;
1320 while (!$self->rsync->exec
1322 src => join("/",
1323 $self->remoteroot,
1324 $path
1326 dst => $dst,
1327 )) {
1328 my(@err) = $self->rsync->err;
1329 if ($self->ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1330 if ($self->verbose) {
1331 warn "Info: ignoring link_stat error '@err'";
1333 return 1;
1335 $self->register_rsync_error (@err);
1337 $self->un_register_rsync_error ();
1339 return 1;
# Name of the file to read events from: the current tempfile while the
# object is in tempfile mode, the regular recentfile otherwise.
sub _my_current_rfile {
    my($self) = @_;
    my $rfile = $self->_use_tempfile
        ? $self->_current_tempfile
        : $self->rfile;
    return $rfile;
}
1353 =head2 $path = $obj->naive_path_normalize ($path)
1355 Takes an absolute unix style path as argument and canonicalizes it to
1356 a shorter path if possible, removing things like double slashes or
1357 C</./> and removes references to C<../> directories to get a shorter
1358 unambiguous path. This is used to simplify the code that determines
1359 if a file passed to C<update()> is indeed below our C<localroot>.
1361 =cut
# Purely textual canonicalization of an absolute unix style path; the
# file system is never consulted. Steps, in this order:
#   1. runs of slashes are collapsed into one slash
#   2. "/./" segments are removed
#   3. "/name/../" is reduced to "/" until none is left
#   4. one trailing slash is removed
# Step 2 is documented for this method but was missing from the code.
# It has to run before step 3, otherwise "/a/./../b" would lose the "."
# segment instead of "a".
sub naive_path_normalize {
    my($self,$path) = @_;
    $path =~ s|/+|/|g;
    # "1 while": neighbouring "/././" segments share a slash, so a single
    # global substitution would leave every second one behind
    1 while $path =~ s|/\./|/|;
    1 while $path =~ s|/[^/]+/\.\./|/|;
    $path =~ s|/$||;
    $path;
}
1371 =head2 $ret = $obj->read_recent_1 ( $data )
1373 Delegate of C<recent_events()> on protocol 1
1375 =cut
# Protocol 1 reader: the event list is stored under the key "recent".
sub read_recent_1 {
    my $self = shift;
    my $data = shift;
    my $events = $data->{recent};
    return $events;
}
1382 =head2 $array_ref = $obj->recent_events ( %options )
1384 Note: the code relies on the resource being written atomically. We
1385 cannot lock because we may have no write access. If the caller has
1386 write access (eg. aggregate() or update()), it has to care for any
1387 necessary locking and it MUST write atomically.
1389 If C<$options{after}> is specified, only file events after this
1390 timestamp are returned.
1392 If C<$options{before}> is specified, only file events before this
1393 timestamp are returned.
1395 If C<$options{'skip-deletes'}> is specified, no files-to-be-deleted
1396 will be returned.
1398 If C<$options{max}> is specified only a maximum of this many events is
1399 returned.
1401 If C<$options{contains}> is specified the value must be a hash
1402 reference containing a query. The query may contain the keys C<epoch>,
1403 C<path>, and C<type>. Each represents a condition that must be met. If
1404 there is more than one such key, the conditions are ANDed.
1406 If C<$options{info}> is specified, it must be a hashref. This hashref
1407 will be filled with metadata about the unfiltered recent_events of
1408 this object, in key C<first> there is the first item, in key C<last>
1409 is the last.
1411 =cut
# Read the current recentfile (or tempfile) and return its events as an
# array reference, newest first.
#
# Returns an empty array reference when there is no file to read, when
# the file does not exist, when deserializing dies or yields nothing.
#
# Options are passed on to _recent_events_handle_options. That step is
# skipped only when none of the documented options is set. 'skip-deletes'
# and 'info' were missing from that check, so they had no effect unless
# one of after/before/contains/max was given as well.
sub recent_events {
    my ($self, %options) = @_;
    if ($self->is_slave) {
        # XXX seems dubious, might produce tempfiles without removing them?
        $self->get_remote_recentfile_as_tempfile;
    }
    my $rfile_or_tempfile = $self->_my_current_rfile or return [];
    -e $rfile_or_tempfile or return [];
    my $suffix = $self->serializer_suffix;
    my ($data) = eval {
        $self->_try_deserialize
            (
             $suffix,
             $rfile_or_tempfile,
            );
    };
    my $err = $@;
    if ($err or !$data) {
        return [];
    }
    my $re;
    if (reftype $data eq 'ARRAY') { # protocol 0
        $re = $data;
    } else {
        $re = $self->_recent_events_protocol_x
            (
             $data,
             $rfile_or_tempfile,
            );
    }
    return $re
        unless grep {defined $options{$_}} qw(after before contains info max skip-deletes);
    $self->_recent_events_handle_options ($re, \%options);
}
# File::Rsync::Mirror::Recentfile::_recent_events_handle_options
#
# Apply the options of recent_events() to the event list $re (newest
# first). $re is modified in place and returned.
#
#   info          hashref, receives the first and last unfiltered event
#   after         keep only events with epoch > after
#   before        keep only events with epoch < before; when the newest
#                 event is not newer than "before" the list is left
#                 untouched by this option (unchanged legacy behaviour)
#   skip-deletes  drop events of type "delete"
#   contains      hashref with any of epoch/path/type, compared with eq;
#                 any other key makes the method confess
#   max           truncate the result to this many events
#
# Fixed in this version:
# - an empty list is no longer touched by after/before (the element
#   lookups used to autovivify a bogus empty event)
# - when no event is newer than "after", or none is older than "before",
#   the result is empty instead of the complete list
sub _recent_events_handle_options {
    my($self, $re, $options) = @_;
    my $info = $options->{info};
    if ($info) {
        $info->{first} = $re->[0];
        $info->{last} = $re->[-1];
    }
    if (@$re) {
        my $last_item = $#$re;
        if (defined $options->{after}) {
            if ($re->[0]{epoch} > $options->{after}) {
                # index of the first event that is too old, if any; it
                # cannot be 0 because the newest event is newer than "after"
                my $f = first
                    {$re->[$_]{epoch} <= $options->{after}}
                        0..$#$re;
                $last_item = $f-1 if defined $f;
            } else {
                $last_item = -1; # nothing is new enough
            }
        }
        my $first_item = 0;
        if (defined $options->{before}
            and $last_item >= 0
            and $re->[0]{epoch} > $options->{before}) {
            my $f = first
                {$re->[$_]{epoch} < $options->{before}}
                    0..$last_item;
            # no event old enough: start behind the last candidate so
            # that the splice below yields nothing
            $first_item = defined $f ? $f : $last_item + 1;
        }
        @$re = splice @$re, $first_item, 1+$last_item-$first_item;
    }
    if ($options->{'skip-deletes'}) {
        @$re = grep { $_->{type} ne "delete" } @$re;
    }
    if (my $contopt = $options->{contains}) {
        my $seen_allowed = 0;
        for my $allow (qw(epoch path type)) {
            if (exists $contopt->{$allow}) {
                $seen_allowed++;
                my $v = $contopt->{$allow};
                @$re = grep { $_->{$allow} eq $v } @$re;
            }
        }
        if (keys %$contopt > $seen_allowed) {
            require Carp;
            Carp::confess
                    (sprintf "unknown query: %s", join ", ", %$contopt);
        }
    }
    if ($options->{max} && @$re > $options->{max}) {
        @$re = splice @$re, 0, $options->{max};
    }
    $re;
}
# Reader for protocol >= 1 data. Copies every all-lowercase meta key into
# the accessor of the same name unless that accessor already holds a
# defined value, extracts the events with read_recent_<protocol>, and
# stores the file's mtime plus the oldest/newest epoch in minmax.
# Dies when the file cannot be stat'ed. Returns the event list.
sub _recent_events_protocol_x {
    my($self,
       $data,
       $rfile_or_tempfile,
      ) = @_;
    my $meth = sprintf "read_recent_%d", $data->{meta}{protocol};
    # we may be reading meta for the first time
    for my $key (keys %{$data->{meta}}) {
        next unless $key eq lc $key; # skips e.g. "Producers"
        next if defined $self->$key;
        $self->$key($data->{meta}{$key});
    }
    my $re = $self->$meth ($data);
    my @stat = stat $rfile_or_tempfile or die "Cannot stat '$rfile_or_tempfile': $!";
    my %minmax = ( mtime => $stat[9] );
    if (@$re) {
        @minmax{qw(min max)} = ($re->[-1]{epoch}, $re->[0]{epoch});
    }
    $self->minmax ( \%minmax );
    return $re;
}
# Load $rfile_or_tempfile and return the deserialized data structure.
# ".yaml" files are read with YAML::Syck; every other suffix is mapped
# through %serializers to a Data::Serializer backend. Dies when the file
# cannot be opened or when Data::Serializer is needed but not installed.
sub _try_deserialize {
    my($self,
       $suffix,
       $rfile_or_tempfile,
      ) = @_;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        YAML::Syck::LoadFile($rfile_or_tempfile);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        my $serialized = do
            {
                # three-argument open: with the two-argument form, mode
                # characters or whitespace in the file name would be
                # interpreted by open
                open my $fh, "<", $rfile_or_tempfile
                    or die "Could not open '$rfile_or_tempfile': $!";
                local $/;
                <$fh>;
            };
        $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
}
# Re-read the recentfile at $dst through a fresh object of our own class
# and take over its _merged and minmax values. When both the old and the
# new dirtymark are true and they differ, the done-register is reset, the
# new dirtymark is stored and the object is marked as seeded.
sub _refresh_internals {
    my($self, $dst) = @_;
    my $rfpeek = (ref $self)->new_from_file ($dst);
    $self->_merged ( $rfpeek->_merged );
    $self->minmax ( $rfpeek->minmax );
    my $old_dirtymark = $self->dirtymark;
    my $new_dirtymark = $rfpeek->dirtymark;
    if ($old_dirtymark && $new_dirtymark && $new_dirtymark ne $old_dirtymark) {
        $self->done->reset;
        $self->dirtymark ( $new_dirtymark );
        $self->seed;
    }
}
1576 =head2 $ret = $obj->rfilename
1578 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1579 a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
1581 =cut
# Basename of the recentfile: filenameroot, a dash, interval and
# serializer_suffix, e.g. "RECENT-6h.yaml".
sub rfilename {
    my($self) = @_;
    my @parts = (
                 $self->filenameroot,
                 $self->interval,
                 $self->serializer_suffix,
                );
    my $file = sprintf "%s-%s%s", @parts;
    return $file;
}
1593 =head2 $str = $self->remote_dir
1595 The directory we are mirroring from.
1597 =cut
# Accessor for the remote directory. A defined argument is stored first.
# Every call, reading or writing, also switches the object to slave mode.
sub remote_dir {
    my($self, $set) = @_;
    $self->_remote_dir ($set) if defined $set;
    my $dir = $self->_remote_dir;
    $self->is_slave (1);
    return $dir;
}
1609 =head2 $str = $obj->remoteroot
1611 =head2 (void) $obj->remoteroot ( $set )
1613 Get/Set the composed prefix needed when rsyncing from a remote module.
1614 If remote_host, remote_module, and remote_dir are set, it is composed
1615 from these.
1617 =cut
# Get/set the rsync source prefix. Without a stored value it is composed
# as "<remote_host>::<remote_module>/<remote_dir>", leaving out every
# part whose accessor is undefined, and the result is cached.
sub remoteroot {
    my($self, $set) = @_;
    $self->_remoteroot($set) if defined $set;
    my $remoteroot = $self->_remoteroot;
    return $remoteroot if defined $remoteroot;
    my $host   = defined $self->remote_host   ? ($self->remote_host."::")  : "";
    my $module = defined $self->remote_module ? ($self->remote_module."/") : "";
    my $dir    = defined $self->remote_dir    ? $self->remote_dir          : "";
    $remoteroot = sprintf "%s%s%s", $host, $module, $dir;
    $self->_remoteroot($remoteroot);
    return $remoteroot;
}
1638 =head2 (void) $obj->resolve_recentfilename ( $recentfilename )
1640 Inverse method to C<rfilename>. C<$recentfilename> is a plain filename
1641 of the pattern
1643 $filenameroot-$interval$serializer_suffix
1645 e.g.
1647 RECENT-1M.yaml
1649 This filename is split into its parts and the parts are fed to the
1650 object itself.
1652 =cut
# Split a recentfile name such as "RECENT-1M.yaml" into filenameroot,
# interval and serializer suffix and store the parts in the object.
# Dies when the name does not match the expected pattern.
sub resolve_recentfilename {
    my($self, $rfname) = @_;
    my($splitter) = qr(^(.+)-([^-\.]+)(\.[^\.]+));
    my @parts = $rfname =~ $splitter;
    unless (@parts) {
        die "Alert: cannot split '$rfname', doesn't match '$splitter'";
    }
    my($root, $interval, $suffix) = @parts;
    $self->filenameroot ($root);
    $self->interval ($interval);
    $self->serializer_suffix ($suffix);
    return;
}
1667 =head2 my $rfile = $obj->rfile
1669 Returns the full path of the I<recentfile>
1671 =cut
# Full path of the recentfile: localroot joined with rfilename. The
# result is computed once and then served from the _rfile accessor.
sub rfile {
    my($self) = @_;
    my $rfile = $self->_rfile;
    unless (defined $rfile) {
        $rfile = File::Spec->catfile
            ($self->localroot,
             $self->rfilename,
            );
        $self->_rfile ($rfile);
    }
    return $rfile;
}
1685 =head2 $rsync_obj = $obj->rsync
1687 The File::Rsync object that this object uses for communicating with an
1688 upstream server.
1690 =cut
# Return the File::Rsync object of this recentfile. It is built on first
# use from rsync_options (default: no options) and cached in _rsync.
# Dies when File::Rsync could not be loaded.
sub rsync {
    my($self) = @_;
    my $rsync = $self->_rsync;
    return $rsync if defined $rsync;
    my $rsync_options = $self->rsync_options || {};
    unless ($HAVE->{"File::Rsync"}) {
        die "File::Rsync required for rsync operations. Cannot continue";
    }
    $rsync = File::Rsync->new($rsync_options);
    $self->_rsync($rsync);
    return $rsync;
}
1707 =head2 (void) $obj->register_rsync_error(@err)
1709 =head2 (void) $obj->un_register_rsync_error()
1711 Register_rsync_error is called whenever the File::Rsync object fails
1712 on an exec (say, connection doesn't succeed). It issues a warning and
1713 sleeps for an increasing amount of time. Un_register_rsync_error
1714 resets the error count. See also accessor C<max_rsync_errors>.
1716 =cut
{
    # State shared by the two subs below: number of consecutive rsync
    # failures and the time of the most recent one.
    my $failures = 0;
    my $failed_at = 0;

    # Count one more rsync failure. Confesses once the count has reached
    # max_rsync_errors (MAX_INT when that accessor is undefined; a
    # negative value disables the limit). Otherwise clucks a warning and
    # sleeps 12 seconds per counted failure, at most 300.
    sub register_rsync_error {
        my($self, @err) = @_;
        chomp @err;
        $failed_at = time;
        $failures++;
        my $limit = $self->max_rsync_errors;
        $limit = MAX_INT unless defined $limit;
        if ($limit>=0 && $failures >= $limit) {
            require Carp;
            my $alert = sprintf
                (
                 "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
                 $self->interval,
                 join(" ",@err),
                 $failures,
                );
            Carp::confess($alert);
        }
        my $sleep = 12 * $failures;
        $sleep = 300 if $sleep > 300;
        require Carp;
        my $warning = sprintf
            (
             "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
             scalar(localtime($failed_at)),
             $self->interval,
             join(" ",@err),
             $sleep,
            );
        Carp::cluck($warning);
        sleep $sleep
    }

    # Forget all counted failures (called after a successful rsync run).
    sub un_register_rsync_error {
        my($self) = @_;
        $failed_at = 0;
        $failures = 0;
    }
}
1761 =head2 $clone = $obj->_sparse_clone
1763 Clones just as much from itself that it does not hurt. Experimental
1764 method.
1766 Note: what fits better: sparse or shallow? Other suggestions?
1768 =cut
# Build a new object of the same class carrying over only a fixed list
# of attributes. Reference values are deep-copied with Storable::dclone,
# so the clone shares no data structure with the original.
sub _sparse_clone {
    my($self) = @_;
    my @cloned_accessors = qw(
                              _interval
                              _localroot
                              _remoteroot
                              _rfile
                              _use_tempfile
                              aggregator
                              filenameroot
                              is_slave
                              max_files_per_connection
                              protocol
                              rsync_options
                              serializer_suffix
                              sleep_per_connection
                              verbose
                             );
    my $new = bless {}, ref $self;
    foreach my $accessor (@cloned_accessors) {
        my $value = $self->$accessor;
        if (ref $value) {
            $value = Storable::dclone $value;
        }
        $new->$accessor($value);
    }
    $new;
}
1796 =head2 $boolean = OBJ->ttl_reached ()
1798 =cut
# True (1) when more than ttl seconds (default 24.2) have passed since
# have_mirrored (default 0), false (0) otherwise.
sub ttl_reached {
    my($self) = @_;
    my $have_mirrored = $self->have_mirrored || 0;
    my $now = Time::HiRes::time;
    my $ttl = $self->ttl;
    $ttl = 24.2 unless defined $ttl;
    my $expires = $have_mirrored + $ttl;
    return $now > $expires ? 1 : 0;
}
1812 =head2 (void) $obj->unlock()
1814 Unlocking is implemented with an C<rmdir> on a locking directory
1815 (C<.lock> appended to $rfile).
1817 =cut
# Release the lock: remove the directory "<rfile>.lock" and clear the
# locked flag. Does nothing when the object does not hold the lock.
sub unlock {
    my($self) = @_;
    $self->_is_locked or return;
    my $lockdir = $self->rfile . ".lock";
    rmdir $lockdir;
    $self->_is_locked (0);
}
1827 =head2 unseed
1829 Sets this recentfile in the state of not 'seeded'.
1831 =cut
# Clear the 'seeded' state of this recentfile.
sub unseed {
    my $self = shift;
    $self->seeded(0);
}
1837 =head2 $ret = $obj->update ($path, $type)
1839 =head2 $ret = $obj->update ($path, "new", $dirty_epoch)
1841 =head2 $ret = $obj->update ()
1843 Enter one file into the local I<recentfile>. $path is the (usually
1844 absolute) path. If the path is outside I<our> tree, then it is
1845 ignored.
1847 C<$type> is one of C<new> or C<delete>.
1849 Events of type C<new> may set $dirty_epoch. $dirty_epoch is normally
1850 not used and the epoch is calculated by the update() routine itself
1851 based on current time. But if there is the demand to insert a
1852 not-so-current file into the dataset, then the caller sets
1853 $dirty_epoch. This causes the epoch of the registered event to become
1854 $dirty_epoch or -- if the exact value given is already taken -- a tiny
1855 bit more. As compensation the dirtymark of the whole dataset is set to
1856 now or the current epoch, whichever is higher. Note: setting the
1857 dirty_epoch to the future is prohibited as it's very unlikely to be
1858 intended: it definitely might wreak havoc with the index files.
1860 The new file event is unshifted (or, if dirty_epoch is set, inserted
1861 at the place it belongs to, according to the rule to have a sequence
1862 of strictly decreasing timestamps) to the array of recent_events and
1863 the array is shortened to the length of the timespan allowed. This is
1864 usually the timespan specified by the interval of this recentfile but
1865 as long as this recentfile has not been merged to another one, the
1866 timespan may grow without bounds.
1868 The third form runs an update without inserting a new file. This may
1869 be desired to truncate a recentfile.
1871 =cut
# Pick an epoch for a new event that is strictly greater than the epoch
# of the newest existing event: $epoch itself when it already is greater
# (or when there are no events yet), otherwise the newest epoch increased
# a bit.
sub _epoch_monotonically_increasing {
    my($self,$epoch,$recent) = @_;
    return $epoch unless @$recent; # the first one goes unoffended
    my $newest = $recent->[0]{epoch};
    return $epoch if _bigfloatgt("".$epoch,$newest);
    return _increase_a_bit($newest);
}
# Register one file event in the local recentfile, or -- when called
# without arguments -- just truncate the recentfile.
#
#   $path         file the event is about; canonicalized with the method
#                 named by canonize (default naive_path_normalize); the
#                 event is only recorded when the path starts with
#                 localroot
#   $type         "new" or "delete", exactly
#   $dirty_epoch  optional epoch for events that are not current; only
#                 used as the event's epoch when it lies in the past
#
# Dies when only some of the arguments are given or when $type is not one
# of the two allowed values. The type check is anchored; the previous
# unanchored pattern accepted any string merely containing "new" or
# "delete".
#
# The work between lock and unlock: read the events, drop those older
# than the allowed timespan (no limit while the file has never been
# merged), insert the new event, write the file if anything changed.
sub update {
    my($self,$path,$type,$dirty_epoch) = @_;
    if (defined $path or defined $type or defined $dirty_epoch) {
        die "update called without path argument" unless defined $path;
        die "update called without type argument" unless defined $type;
        die "update called with illegal type argument: $type" unless $type =~ /\A(?:new|delete)\z/;
        # since we have keep_delete_objects_forever we must let them inject delete objects too:
        #die "update called with \$type=$type and \$dirty_epoch=$dirty_epoch; ".
        #    "dirty_epoch only allowed with type=new" if defined $dirty_epoch and $type ne "new";
        my $canonmeth = $self->canonize;
        unless ($canonmeth) {
            $canonmeth = "naive_path_normalize";
        }
        $path = $self->$canonmeth($path);
    }
    my $lrd = $self->localroot;
    $self->lock;
    # you must calculate the time after having locked, of course
    my $now = Time::HiRes::time;
    my $secs = $self->interval_secs();
    my $recent = $self->recent_events;

    my $epoch;
    if (defined $dirty_epoch && _bigfloatgt($now,$dirty_epoch)) {
        $epoch = $dirty_epoch;
    } else {
        $epoch = $self->_epoch_monotonically_increasing($now,$recent);
    }

    $recent ||= [];
    my $oldest_allowed = 0;
    my $merged = $self->merged;
    if ($merged->{epoch}) {
        my $virtualnow = _bigfloatmax($now,$epoch);
        # for the lower bound I think we need no big math, we calc already
        $oldest_allowed = min($virtualnow - $secs, $merged->{epoch}, $epoch);
    } else {
        # as long as we are not merged at all, no limits!
    }
    my $something_done = 0;
  TRUNCATE: while (@$recent) {
        # $DB::single++ unless defined $oldest_allowed;
        if (_bigfloatlt($recent->[-1]{epoch}, $oldest_allowed)) {
            pop @$recent;
            $something_done = 1;
        } else {
            last TRUNCATE;
        }
    }
    if (defined $path && $path =~ s|^\Q$lrd\E||) {
        $path =~ s|^/||;
        my $splicepos;
        # remove the older duplicates of this $path, irrespective of $type:
        if (defined $dirty_epoch) {
            my $ctx = $self->_update_with_dirty_epoch($path,$recent,$epoch);
            $recent    = $ctx->{recent};
            $splicepos = $ctx->{splicepos};
            $epoch     = $ctx->{epoch};
            my $dirtymark = $self->dirtymark;
            my $new_dm = $now;
            if (_bigfloatgt($epoch, $now)) { # just in case we had to increase it
                $new_dm = $epoch;
            }
            $self->dirtymark($new_dm);
            my $merged = $self->merged;
            if (not defined $merged->{epoch} or _bigfloatlt($epoch,$merged->{epoch})) {
                $self->merged(+{});
            }
        } else {
            $recent = [ grep { $_->{path} ne $path } @$recent ];
            $splicepos = 0;
        }
        if (defined $splicepos) {
            splice @$recent, $splicepos, 0, { epoch => $epoch, path => $path, type => $type };
        }
        $something_done = 1;
    }

    $self->write_recent($recent) if $something_done;
    $self->_assert_symlink;
    $self->unlock;
}
# Helper of update() for events that carry a dirty epoch.
#
# Removes existing events for $path from @$recent (in place) and works
# out where in the list, which is ordered by decreasing epoch, the new
# event belongs. Returns a hashref with
#   recent    => the (possibly shortened) event list
#   splicepos => index at which to insert the new event; stays undef when
#                the search loop at the end finds no position
#   epoch     => the epoch to use, possibly increased a bit to avoid a
#                collision with an existing event
1965 sub _update_with_dirty_epoch {
1966 my($self,$path,$recent,$epoch) = @_;
1967 my $splicepos;
1968 my $new_recent = [];
# NOTE(review): this guard is true when at least one event has a
# DIFFERENT path, so a list consisting only of events for $path is not
# filtered at all -- confirm whether "eq" was intended here.
1969 if (grep { $_->{path} ne $path } @$recent) {
1970 my $cancel = 0;
1971 KNOWN_EVENT: for my $i (0..$#$recent) {
1972 if ($recent->[$i]{path} eq $path) {
1973 if ($recent->[$i]{epoch} eq $epoch) {
1974 # nothing to do
1975 $cancel = 1;
1976 last KNOWN_EVENT;
1978 } else {
1979 push @$new_recent, $recent->[$i];
# The filtered list replaces the original unless an event with the same
# path and the same epoch was found.
1982 @$recent = @$new_recent unless $cancel;
# Newer than everything (or list empty): insert at the front. Older than
# everything: append. Otherwise walk the list to find the first event
# that is older than $epoch.
1984 if (!exists $recent->[0] or _bigfloatgt($epoch,$recent->[0]{epoch})) {
1985 $splicepos = 0;
1986 } elsif (_bigfloatlt($epoch,$recent->[-1]{epoch})) {
1987 $splicepos = @$recent;
1988 } else {
1989 RECENT: for my $i (0..$#$recent) {
1990 my $ev = $recent->[$i];
# An exact collision is resolved by nudging $epoch upwards; the epoch of
# the next newer event (if any) is passed along as second argument.
1991 if ($epoch eq $recent->[$i]{epoch}) {
1992 $epoch = _increase_a_bit($epoch, $i ? $recent->[$i-1]{epoch} : undef);
1994 if (_bigfloatgt($epoch,$recent->[$i]{epoch})) {
1995 $splicepos = $i;
1996 last RECENT;
2000 return {
2001 recent => $recent,
2002 splicepos => $splicepos,
2003 epoch => $epoch,
2007 =head2 seed
2009 Sets this recentfile in the state of 'seeded' which means it has to
2010 re-evaluate its uptodateness.
2012 =cut
# Put this recentfile into the 'seeded' state.
sub seed {
    my $self = shift;
    $self->seeded(1);
}
2018 =head2 seeded
2020 Tells if the recentfile is in the state 'seeded'.
2022 =cut
# Get/set the 'seeded' flag. A defined argument is stored first. An
# undefined stored value is initialized to 0 before it is returned.
sub seeded {
    my($self, $set) = @_;
    $self->_seeded ($set) if defined $set;
    my $state = $self->_seeded;
    return $state if defined $state;
    $state = 0;
    $self->_seeded ($state);
    return $state;
}
2036 =head2 uptodate
2038 True if this object has mirrored the complete interval covered by the
2039 current recentfile.
2041 =cut
# Decide whether this object has mirrored everything the current
# recentfile covers. The answer and the reason for it are stored via
# _remember_last_uptodate_call; a true answer is additionally recorded
# in _uptodateness_ever_reached.
#
# Decision order:
#   1. uptodateness was reached before and the object is not seeded:
#      uptodate ("saturated")
#   2. (disabled ttl check)
#   3. if minmax has an mtime: a recentfile with a newer mtime on disk
#      means not uptodate; otherwise the answer is whatever the
#      done-register reports for the range max..min
#   4. nothing decided so far: not uptodate
2042 sub uptodate {
2043 my($self) = @_;
2044 my $uptodate;
2045 my $why;
2046 if ($self->_uptodateness_ever_reached and not $self->seeded) {
2047 $why = "saturated";
2048 $uptodate = 1;
2050 # it's too easy to misconfigure ttl and related timings and then
2051 # never reach uptodateness, so disabled 2009-03-22
# The leading "0 and" keeps this branch from ever running.
2052 if (0 and not defined $uptodate) {
2053 if ($self->ttl_reached){
2054 $why = "ttl_reached returned true, so we are not uptodate";
2055 $uptodate = 0 ;
2058 unless (defined $uptodate) {
2059 # look if recentfile has unchanged timestamp
2060 my $minmax = $self->minmax;
2061 if (exists $minmax->{mtime}) {
2062 my $rfile = $self->_my_current_rfile;
2063 my @stat = stat $rfile;
2064 my $mtime = $stat[9];
2065 if (defined $mtime && defined $minmax->{mtime} && $mtime > $minmax->{mtime}) {
2066 $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
2067 $uptodate = 0;
2068 } else {
# covered() may leave $uptodate undefined; the fallthrough below then
# turns that into 0.
2069 my $covered = $self->done->covered(@$minmax{qw(max min)});
2070 $why = sprintf "minmax covered[%s], so we return that", defined $covered ? $covered : "UNDEF";
2071 $uptodate = $covered;
2075 unless (defined $uptodate) {
2076 $why = "fallthrough, so not uptodate";
2077 $uptodate = 0;
2079 if ($uptodate) {
2080 $self->_uptodateness_ever_reached(1);
2082 my $remember =
2084 uptodate => $uptodate,
2085 why => $why,
2087 $self->_remember_last_uptodate_call($remember);
2088 return $uptodate;
2091 =head2 $obj->write_recent ($recent_files_arrayref)
2093 Writes a I<recentfile> based on the current reflection of the current
2094 state of the tree limited by the current interval.
2096 =cut
# Sort the given event list in place by decreasing epoch.
sub _resort {
    my($self, $recent) = @_;
    my @sorted = sort { _bigfloatcmp($b->{epoch},$a->{epoch}) } @$recent;
    @$recent = @sorted;
    return;
}
# Write the given event list through the writer of the current protocol
# (write_0, write_1, ...) after two preparations:
#
# - sanity check: the epochs have to be strictly decreasing, otherwise
#   the method confesses
# - minmax bookkeeping: max and min are raised to the newest and the
#   oldest epoch of the list when they are undefined or smaller
#
# Dies when called without an event list. An empty list is written as an
# empty list; previously the minmax lookups autovivified a bogus empty
# event into it before it was written.
sub write_recent {
    my ($self,$recent) = @_;
    die "write_recent called without argument" unless defined $recent;
    my $Last_epoch;
 SANITYCHECK: for my $i (0..$#$recent) {
        if (defined($Last_epoch) and _bigfloatge($recent->[$i]{epoch},$Last_epoch)) {
            require Carp;
            Carp::confess(sprintf "Warning: disorder '%s'>='%s', re-sorting %s\n",
                          $recent->[$i]{epoch}, $Last_epoch, $self->interval);
            # you may want to:
            # $self->_resort($recent);
            # last SANITYCHECK;
        }
        $Last_epoch = $recent->[$i]{epoch};
    }
    my $minmax = $self->minmax;
    $minmax = {} unless defined $minmax;
    if (@$recent) {
        if (!defined $minmax->{max} || _bigfloatlt($minmax->{max},$recent->[0]{epoch})) {
            $minmax->{max} = $recent->[0]{epoch};
        }
        if (!defined $minmax->{min} || _bigfloatlt($minmax->{min},$recent->[-1]{epoch})) {
            $minmax->{min} = $recent->[-1]{epoch};
        }
    }
    $self->minmax($minmax);
    my $meth = sprintf "write_%d", $self->protocol;
    $self->$meth($recent);
}
2129 =head2 $obj->write_0 ($recent_files_arrayref)
2131 Delegate of C<write_recent()> on protocol 0
2133 =cut
# Protocol 0 writer: dump the bare event list as YAML into "<rfile>.new"
# and rename that file over the recentfile. Dies when the rename fails.
sub write_0 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    my $tmpfile = "$rfile.new";
    YAML::Syck::DumpFile($tmpfile,$recent);
    rename $tmpfile, $rfile or die "Could not rename to '$rfile': $!";
}
2142 =head2 $obj->write_1 ($recent_files_arrayref)
2144 Delegate of C<write_recent()> on protocol 1
2146 =cut
# Protocol 1 writer: serialize { meta => ..., recent => ... } according
# to serializer_suffix (YAML::Syck for ".yaml", a Data::Serializer
# backend for everything else), write the result to "<rfile>.new" and
# rename that file over the recentfile. Dies when the serializer is not
# available or when open, close or rename fails.
sub write_1 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    my $suffix = $self->serializer_suffix;
    my %payload = (
                   meta => $self->meta_data,
                   recent => $recent,
                  );
    my $serialized;
    if ($suffix eq ".yaml") {
        $serialized = YAML::Syck::Dump(\%payload);
    } else {
        unless ($HAVE->{"Data::Serializer"}) {
            die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
        }
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $serialized = $serializer->raw_serialize(\%payload);
    }
    my $tmpfile = "$rfile.new";
    open my $fh, ">", $tmpfile or die "Could not open >'$rfile.new': $!";
    print $fh $serialized;
    close $fh or die "Could not close '$rfile.new': $!";
    rename $tmpfile, $rfile or die "Could not rename to '$rfile': $!";
}
# Fill %serializers at compile time from the POD that follows: the
# here-document runs up to the next "=cut" line, so the SERIALIZERS
# section is both documentation and data. Only lines of the form
#   =item C<< "SUFFIX" => "MODULE" >>
# are used; the two quoted strings become key and value.
2172 BEGIN {
2173 my $nq = qr/[^"]+/; # non-quotes
2174 my @pod_lines =
2175 split /\n/, <<'=cut'; %serializers = map { my @x = /"($nq)"\s+=>\s+"($nq)"/; @x } grep {s/^=item\s+C<<\s+(.+)\s+>>$/$1/} @pod_lines; }
2177 =head1 SERIALIZERS
2179 The following suffixes are supported and trigger the use of these
2180 serializers:
2182 =over 4
2184 =item C<< ".yaml" => "YAML::Syck" >>
2186 =item C<< ".json" => "JSON" >>
2188 =item C<< ".sto" => "Storable" >>
2190 =item C<< ".dd" => "Data::Dumper" >>
2192 =back
2194 =cut
# Fill %seconds at compile time from the POD that follows (up to the
# next "=cut" line). The payload of every
#   =item C<< LETTER => EXPRESSION >>
# line is passed to eval, so e.g. "h => 60*60" yields the pair
# (h => 3600).
2196 BEGIN {
2197 my @pod_lines =
2198 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
2200 =head1 INTERVAL SPEC
2202 An interval spec is a primitive way to express time spans. Normally it
2203 is composed from an integer and a letter.
2205 As a special case, a string that consists only of the single letter
2206 C<Z>, stands for unlimited time.
2208 The following letters express the specified number of seconds:
2210 =over 4
2212 =item C<< s => 1 >>
2214 =item C<< m => 60 >>
2216 =item C<< h => 60*60 >>
2218 =item C<< d => 60*60*24 >>
2220 =item C<< W => 60*60*24*7 >>
2222 =item C<< M => 60*60*24*30 >>
2224 =item C<< Q => 60*60*24*90 >>
2226 =item C<< Y => 60*60*24*365.25 >>
2228 =back
2230 =cut
2232 =head1 SEE ALSO
2234 L<File::Rsync::Mirror::Recent>,
2235 L<File::Rsync::Mirror::Recentfile::Done>,
2236 L<File::Rsync::Mirror::Recentfile::FakeBigFloat>
2238 =head1 BUGS
2240 Please report any bugs or feature requests through the web interface
2242 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2243 I will be notified, and then you'll automatically be notified of
2244 progress on your bug as I make changes.
2246 =head1 KNOWN BUGS
2248 Memory hungry: it seems all memory is allocated during the initial
2249 rsync where a list of all files is maintained in memory.
2251 =head1 SUPPORT
2253 You can find documentation for this module with the perldoc command.
2255 perldoc File::Rsync::Mirror::Recentfile
2257 You can also look for information at:
2259 =over 4
2261 =item * RT: CPAN's request tracker
2263 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2265 =item * AnnoCPAN: Annotated CPAN documentation
2267 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2269 =item * CPAN Ratings
2271 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2273 =item * Search CPAN
2275 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2277 =back
2280 =head1 ACKNOWLEDGEMENTS
2282 Thanks to RJBS for module-starter.
2284 =head1 AUTHOR
2286 Andreas König
2288 =head1 COPYRIGHT & LICENSE
2290 Copyright 2008,2009 Andreas König.
2292 This program is free software; you can redistribute it and/or modify it
2293 under the same terms as Perl itself.
2296 =cut
2298 1; # End of File::Rsync::Mirror::Recentfile
2300 # Local Variables:
2301 # mode: cperl
2302 # cperl-indent-level: 4
2303 # End: