focus on delete events and be more defensive when a recentfile cannot be stat-ed
[rersyncrecent.git] / lib / File / Rsync / Mirror / Recentfile.pm
blobcbc4dccfa55473d4e873da4c388d9af23e393b2a
1 package File::Rsync::Mirror::Recentfile;
3 # use warnings;
4 use strict;
6 =encoding utf-8
8 =head1 NAME
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
12 =cut
# Probe the optional modules once at load time. Each slot holds the result
# of the string-eval'ed require: true when the module loaded, undef otherwise.
my $HAVE = {};
$HAVE->{$_} = eval qq{ require $_; } for qw(Data::Serializer File::Rsync);
21 use Config;
22 use File::Basename qw(basename dirname fileparse);
23 use File::Copy qw(cp);
24 use File::Path qw(mkpath);
25 use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
26 use File::Temp;
27 use List::Util qw(first max min);
28 use Scalar::Util qw(reftype);
29 use Storable;
30 use Time::HiRes qw();
31 use YAML::Syck;
33 use version; our $VERSION = qv('0.0.5');
35 use constant MAX_INT => ~0>>1; # anything better?
36 use constant DEFAULT_PROTOCOL => 1;
38 # cf. interval_secs
39 my %seconds;
41 # maybe subclass if this mapping is bad?
42 my %serializers;
44 =head1 SYNOPSIS
46 Writer (of a single file):
48 use File::Rsync::Mirror::Recentfile;
49 my $rf = File::Rsync::Mirror::Recentfile->new
51 interval => q(6h),
52 filenameroot => "RECENT",
53 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
54 localroot => "/home/ftp/pub/PAUSE/authors/",
55 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
57 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
59 Reader/mirrorer:
61 my $rf = File::Rsync::Mirror::Recentfile->new
63 filenameroot => "RECENT",
64 ignore_link_stat_errors => 1,
65 interval => q(6h),
66 localroot => "/home/ftp/pub/PAUSE/authors",
67 remote_dir => "",
68 remote_host => "pause.perl.org",
69 remote_module => "authors",
70 rsync_options => {
71 compress => 1,
72 'rsync-path' => '/usr/bin/rsync',
73 links => 1,
74 times => 1,
75 'omit-dir-times' => 1,
76 checksum => 1,
78 verbose => 1,
80 $rf->mirror;
82 Aggregator (usually the writer):
84 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
85 $rf->aggregate;
87 =head1 DESCRIPTION
89 Lower level than F:R:M:Recent; handles a single recentfile. A tree is
90 always composed of several recentfiles, which are controlled by the
91 F:R:M:Recent object, whereas the Recentfile object does the bookkeeping
92 for a single timeslice.
94 =head1 EXPORT
96 No exports.
98 =head1 CONSTRUCTORS / DESTRUCTOR
100 =head2 my $obj = CLASS->new(%hash)
102 Constructor. On every argument pair the key is a method name and the
103 value is an argument to that method name.
105 If a recentfile for this resource already exists, metadata that are
106 not defined by the constructor will be fetched from there as soon as
107 it is being read by recent_events().
109 =cut
# Constructor: every key of the argument list names a method, the
# following value is passed to that method. Afterwards protocol,
# filenameroot and serializer_suffix receive defaults if still undefined.
sub new {
    my($class, @args) = @_;
    my $self = bless {}, $class;

    # consume the arguments pairwise
    while (my($method, $arg) = splice @args, 0, 2) {
        $self->$method($arg);
    }

    my @defaults = (
        [ protocol          => DEFAULT_PROTOCOL ],
        [ filenameroot      => "RECENT" ],
        [ serializer_suffix => ".yaml" ],
    );
    for my $default (@defaults) {
        my($accessor, $value) = @$default;
        $self->$accessor($value) unless defined $self->$accessor;
    }
    return $self;
}
130 =head2 my $obj = CLASS->new_from_file($file)
132 Constructor. $file is a I<recentfile>.
134 =cut
# Constructor from an existing recentfile.
#
# $file is the path of a recentfile (or of a symlink pointing to one in
# the same directory). The serializer suffix and the localroot are derived
# from the resolved path; every lowercase key of the file's "meta" hash is
# then fed to the accessor of the same name.
#
# Dies when the file cannot be opened, when a symlink target contains a
# '/', when the suffix is unknown, or when a non-YAML suffix is requested
# while Data::Serializer is not installed.
sub new_from_file {
    my($class, $file) = @_;
    require File::Spec;
    my $self = bless {}, $class;
    $self->_rfile($file);
    #?# $self->lock;
    my $serialized = do {
        # three-argument open: the filename can never be read as a mode
        open my $fh, "<", $file or die "Could not open '$file': $!";
        local $/;
        <$fh>;
    };
    # XXX: we can skip this step when the metadata are sufficient, but
    # we cannot parse the file without some magic stuff about
    # serialized formats
    while (-l $file) {
        my($name,$path) = fileparse $file;
        my $symlink = readlink $file;
        defined $symlink or die "Could not readlink '$file': $!";
        if ($symlink =~ m|/|) {
            die "FIXME: filenames containing '/' not supported, got $symlink";
        }
        $file = File::Spec->catfile ( $path, $symlink );
    }
    my($name,$path,$suffix) = fileparse $file, keys %serializers;
    $self->serializer_suffix($suffix);
    $self->localroot($path);
    die "Could not determine file format from suffix" unless $suffix;
    my $deserialized;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        $deserialized = YAML::Syck::LoadFile($file);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $deserialized = $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    # NOTE(review): the keys come from the file and are used as method
    # names; only mixed-case keys are filtered out. Confirm that the file
    # is always trusted here.
    while (my($k,$v) = each %{$deserialized->{meta}}) {
        next if $k ne lc $k; # "Producers"
        $self->$k($v);
    }
    unless (defined $self->protocol) {
        $self->protocol(DEFAULT_PROTOCOL);
    }
    return $self;
}
181 =head2 DESTROY
183 A simple unlock.
185 =cut
# Destructor: releases the lock and, when no tempfile object is being held
# (_current_tempfile_fh is false), removes a left-over tempfile by name.
sub DESTROY {
    my $self = shift;
    # A destructor can run at any time, e.g. while the caller is unwinding
    # from a die; keep the global status variables as we found them.
    local($., $@, $!, $^E, $?);
    $self->unlock;
    return if $self->_current_tempfile_fh;
    my $tempfile = $self->_current_tempfile;
    if ($tempfile && -e $tempfile) {
        unlink $tempfile; # may fail in global destruction
    }
    return;
}
198 =head1 ACCESSORS
200 =cut
202 my @accessors;
BEGIN {
    # Private accessors. The public ones are appended below: they are
    # harvested from the "=item" lines of the POD section that follows
    # this block (read here as a heredoc that ends at "=cut").
    @accessors = qw(
        _current_tempfile
        _current_tempfile_fh
        _delayed_operations
        _done
        _interval
        _is_locked
        _localroot
        _merged
        _pathdb
        _remember_last_uptodate_call
        _remote_dir
        _remoteroot
        _rfile
        _rsync
        __verified_tempdir
        _seeded
        _uptodateness_ever_reached
        _use_tempfile
    );

    my @pod_lines =
        split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
229 =over 4
231 =item aggregator
233 A list of interval specs that tell the aggregator which I<recentfile>s
234 are to be produced.
236 =item canonize
238 The name of a method to canonize the path before rsyncing. Only
239 supported value is C<naive_path_normalize>. Defaults to that.
241 =item comment
243 A comment about this tree and setup.
245 =item dirtymark
247 A timestamp. The dirtymark is updated whenever an out of band change
248 on the origin server is performed that violates the protocol. Say,
249 they add or remove files in the middle somewhere. Slaves must react
250 with a devaluation of their C<done> structure which then leads to a
251 full re-sync of all files. Implementation note: dirtymark may increase
252 or decrease.
254 =item filenameroot
256 The (prefix of the) filename we use for this I<recentfile>. Defaults to
257 C<RECENT>. The string must not contain a directory separator.
259 =item have_mirrored
261 Timestamp remembering when we mirrored this recentfile the last time.
262 Only relevant for slaves.
264 =item ignore_link_stat_errors
266 If set to true, rsync errors are ignored that complain about link stat
267 errors. These seem to happen only when there are files missing at the
268 origin. In race conditions this can always happen, so it is
269 recommended to set this value to true.
271 =item is_slave
273 If set to true, this object will fetch a new recentfile from remote
274 when the timespan between the last mirror (see have_mirrored) and now
275 is too large (see C<ttl>).
277 =item keep_delete_objects_forever
279 The default for delete events is that they are passed through the
280 collection of recentfile objects until they reach the Z file. There
281 they get dropped so that the associated file object ceases to exist at
282 all. By setting C<keep_delete_objects_forever> the delete objects are
283 kept forever. This makes the Z file larger but has the advantage that
284 slaves that have interrupted mirroring for a long time still can clean
285 up their copy.
287 =item locktimeout
289 After how many seconds shall we die if we cannot lock a I<recentfile>?
290 Defaults to 600 seconds.
292 =item loopinterval
294 When mirror_loop is called, this accessor can specify how much time
295 every loop shall at least take. If the work of a loop is done before
296 that time has gone, sleeps for the rest of the time. Defaults to
297 arbitrary 42 seconds.
299 =item max_files_per_connection
301 Maximum number of files that are transferred on a single rsync call.
302 Setting it higher means higher performance at the price of holding
303 connections longer and potentially disturbing other users in the pool.
304 Defaults to the arbitrary value 42.
306 =item max_rsync_errors
308 When rsync operations encounter that many errors without any resetting
309 success in between, then we die. Defaults to unlimited. A value of
310 -1 means we run forever ignoring all rsync errors.
312 =item minmax
314 Hashref remembering when we read the recent_events from this file the
315 last time and what the timespan was.
317 =item protocol
319 When the RECENT file format changes, we increment the protocol. We try
320 to support older protocols in later releases.
322 =item remote_host
324 The host we are mirroring from. Leave empty for the local filesystem.
326 =item remote_module
328 Rsync servers have so called modules to separate directory trees from
329 each other. Put here the name of the module under which we are
330 mirroring. Leave empty for local filesystem.
332 =item rsync_options
334 Things like compress, links, times or checksums. Passed in to the
335 File::Rsync object used to run the mirror.
337 =item serializer_suffix
339 Mostly untested accessor. The only well tested format for
340 I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
341 Data::Serializer. But in principle other formats are supported as
342 well. See section SERIALIZERS below.
344 =item sleep_per_connection
346 Sleep that many seconds (floating point OK) after every chunk of rsyncing
347 has finished. Defaults to arbitrary 0.42.
349 =item tempdir
351 Directory to write temporary files to. Must allow rename operations
352 into the tree which usually means it must live on the same partition
353 as the target directory. Defaults to C<< $self->localroot >>.
355 =item ttl
357 Time to live. Number of seconds after which this recentfile must be
358 fetched again from the origin server. Only relevant for slaves.
359 Defaults to arbitrary 24.2 seconds.
361 =item verbose
363 Boolean to turn on a bit verbosity.
365 =item verboselog
367 Path to the logfile to write verbose progress information to. This is
368 a primitive stop gap solution to get simple verbose logging working.
369 Switching to Log4perl or similar is probably the way to go.
371 =back
373 =cut
375 use accessors @accessors;
377 =head1 METHODS
379 =head2 (void) $obj->aggregate( %options )
381 Takes all intervals that are collected in the accessor called
382 aggregator. Sorts them by actual length of the interval.
383 Removes those that are shorter than our own interval. Then merges this
384 object into the next larger object. The merging continues upwards
385 as long as the next I<recentfile> is old enough to warrant a merge.
387 Whether a merge is warranted is decided according to the interval of
388 the previous recentfile, so that larger files are not updated as often
389 as smaller ones. If $options{force} is true, all files get updated.
391 Here is an example to illustrate the behaviour. Given aggregators
393 1h 1d 1W 1M 1Q 1Y Z
395 then
397 1h updates 1d on every call to aggregate()
398 1d updates 1W earliest after 1h
399 1W updates 1M earliest after 1d
400 1M updates 1Q earliest after 1W
401 1Q updates 1Y earliest after 1M
402 1Y updates Z earliest after 1Q
404 Note that all but the smallest recentfile get updated at an arbitrary
405 rate and as such are quite useless on their own.
407 =cut
# Merge this recentfile upwards through the configured aggregator chain.
# Options: force => true merges into every larger recentfile regardless
# of its age.
sub aggregate {
    my($self, %option) = @_;

    # own interval plus all aggregator intervals, each with its length
    my @candidates;
    for my $spec ($self->interval, @{$self->aggregator || []}) {
        push @candidates, +{ interval => $spec, secs => $self->interval_secs($spec) };
    }
    # drop everything shorter than ourselves, shortest first
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
               grep { $_->{secs} >= $self->interval_secs }
               @candidates;
    $self->update;
    $aggs[0]{object} = $self;
  AGGREGATOR: for my $i (0 .. $#aggs - 1) {
        my $this = $aggs[$i]{object};
        my $next = $this->_sparse_clone;
        $next->interval($aggs[$i+1]{interval});

        my $want_merge = 0;
        if ($option{force} || $i == 0) {
            $want_merge = 1;
        } else {
            my $next_rfile = $next->rfile;
            if (! -e $next_rfile) {
                # nothing there yet: create it
                $want_merge = 1;
            } else {
                # merge only if the target is older than the interval
                # of the recentfile one step below $this
                my $prev = $aggs[$i-1]{object};
                local $^T = time;
                my $next_age = 86400 * -M $next_rfile;
                $want_merge = 1 if $next_age > $prev->interval_secs;
            }
        }

        last AGGREGATOR unless $want_merge;
        $next->merge($this);
        $aggs[$i+1]{object} = $next;
    }
}
446 # collect file size and mtime for all files of this aggregate
# collect file size and mtime for all files of this aggregate; returns
# an arrayref of { rfile, size, mtime } hashes, shortest interval first
sub _debug_aggregate {
    my($self) = @_;
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
               map  { +{ interval => $_, secs => $self->interval_secs($_) } }
               $self->interval, @{$self->aggregator || []};
    my @report;
    for my $agg (@aggs) {
        my $clone = Storable::dclone($self);
        $clone->interval($agg->{interval});
        my $rfile = $clone->rfile;
        # size and mtime stay undef when the file cannot be stat-ed
        my($size, $mtime) = (stat $rfile)[7, 9];
        push @report, { rfile => $rfile, size => $size, mtime => $mtime };
    }
    return \@report;
}
463 # (void) $self->_assert_symlink()
# (void) $self->_assert_symlink()
#
# Makes sure that "<filenameroot>.recent" in localroot refers to this
# object's recentfile. Where symlinks are available a symlink pointing to
# rfilename is created or replaced; elsewhere the recentfile is copied.
# Replacement always goes through a "<name>.<pid>" scratch entry that is
# renamed into place. Dies when symlink, copy or rename fail.
sub _assert_symlink {
    my($self) = @_;
    require File::Spec;
    my $recentrecentfile = File::Spec->catfile
        (
         $self->localroot,
         sprintf("%s.recent", $self->filenameroot),
        );
    my $scratch = "$recentrecentfile.$$";
    if (defined $Config{d_symlink} && $Config{d_symlink} eq "define") {
        my $is_symlink = -l $recentrecentfile;
        if ($is_symlink) {
            my $found_symlink = readlink $recentrecentfile;
            # already pointing to the right file: nothing to do
            return if defined $found_symlink && $found_symlink eq $self->rfilename;
        }
        if ($is_symlink || -e $recentrecentfile) {
            # Something is in the way (a stale symlink or a plain file,
            # e.g. a copy made earlier): replace it via rename, because
            # symlink() refuses to overwrite an existing entry.
            unlink $scratch; # may fail
            symlink($self->rfilename, $scratch)
                or die "Could not create symlink '$scratch': $!";
            rename($scratch, $recentrecentfile)
                or die "Could not rename '$scratch' to $recentrecentfile: $!";
        } else {
            symlink($self->rfilename, $recentrecentfile)
                or die "Could not create symlink '$recentrecentfile': $!";
        }
    } else {
        warn "Warning: symlinks not supported on this system, doing a copy instead\n";
        unlink $scratch; # may fail
        # copy from the full path; rfilename alone is only valid as a
        # symlink target relative to the directory of the link
        cp($self->rfile, $scratch)
            or die "Could not copy to '$scratch': $!";
        rename($scratch, $recentrecentfile)
            or die "Could not rename '$scratch' to $recentrecentfile: $!";
    }
}
502 =head2 $hashref = $obj->delayed_operations
504 A hash of hashes containing unlink and rmdir operations which had to
505 wait until the recentfile got unhidden in order to not confuse
506 downstream mirrors (in case we have some).
508 =cut
# Returns the hash of pending operations, creating and storing an empty
# { unlink => {}, rmdir => {} } structure on first use.
sub delayed_operations {
    my($self) = @_;
    my $pending = $self->_delayed_operations;
    return $pending if defined $pending;

    $pending = { unlink => {}, rmdir => {} };
    $self->_delayed_operations ($pending);
    return $pending;
}
523 =head2 $done = $obj->done
525 C<$done> is a reference to a L<File::Rsync::Mirror::Recentfile::Done>
526 object that keeps track of rsync activities. Only needed and used when
527 we are a mirroring slave.
529 =cut
# Returns the Done object of this recentfile; on first use one is created,
# told our interval and stored.
sub done {
    my($self) = @_;
    if (my $existing = $self->_done) {
        return $existing;
    }
    require File::Rsync::Mirror::Recentfile::Done;
    my $done = File::Rsync::Mirror::Recentfile::Done->new();
    $done->_rfinterval ($self->interval);
    $self->_done ( $done );
    return $done;
}
543 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
545 Stores the remote I<recentfile> locally as a tempfile. The caller is
546 responsible to remove the file after use.
548 Note: if you intend to act as an rsync server for other slaves,
549 then you must prefer this method over fetching that file with
550 get_remotefile(). Otherwise downstream mirrors would expect you to
551 already have mirrored all the files that are in the I<recentfile>
552 before you have them mirrored.
554 =cut
# Fetches the remote recentfile into a local tempfile and returns the
# name of that tempfile.
#
# When a tempfile is already in use (_use_tempfile) and the ttl has not
# been reached, the current tempfile is returned without any network
# traffic. Otherwise the file is rsynced, with up to three attempts. On
# success the internals are refreshed from the new file and
# have_mirrored is updated; on failure a warning is logged and the old
# state is kept. Dies when the local copy or the final chmod fail.
sub get_remote_recentfile_as_tempfile {
    my($self) = @_;
    mkpath $self->localroot;
    my $fh;
    my $trfilename;
    if ( $self->_use_tempfile() ) {
        return $self->_current_tempfile unless $self->ttl_reached;
        $fh = $self->_current_tempfile_fh;
        $trfilename = $self->rfilename;
    } else {
        $trfilename = $self->rfilename;
    }

    my $dst;
    if ($fh) {
        $dst = $self->_current_tempfile;
    } else {
        $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
        $dst = $fh->filename;
        $self->_current_tempfile ($dst);
        my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
        if (defined $rfile && -e $rfile) {
            # saving on bandwidth. Might need to be configurable
            # $self->bandwidth_is_cheap?
            cp($rfile, $dst) or die "Could not copy '$rfile' to '$dst': $!";
        }
    }
    my $src = join ("/",
                    $self->remoteroot,
                    $trfilename,
                   );
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        my $display_dst = join "/", "...", basename(dirname($dst)), basename($dst);
        my $LFH = $self->_logfilehandle;
        printf {$LFH}
            "%-4s %d (1/1/%s) temp %s ... ",
            $doing,
            time,
            $self->interval,
            $display_dst;
    }
    my $gaveup = 0;
    my $retried = 0;
    while (!$self->rsync->exec(
                               src => $src,
                               dst => $dst,
                              )) {
        $self->register_rsync_error ($self->rsync->err);
        if (++$retried >= 3) {
            warn "XXX giving up";
            $gaveup = 1;
            last;
        }
    }
    if ($gaveup) {
        my $LFH = $self->_logfilehandle;
        # terminate the line so that later log output starts on its own line
        printf {$LFH} "Warning: gave up mirroring %s, will try again later\n", $self->interval;
    } else {
        $self->_refresh_internals ($dst);
        $self->have_mirrored (Time::HiRes::time());
        $self->un_register_rsync_error ();
    }
    $self->unseed;
    if ($self->verbose) {
        my $LFH = $self->_logfilehandle;
        print {$LFH} "DONE\n";
    }
    my $mode = 0644;
    # report the mode in octal, the way it is written in the code
    chmod $mode, $dst
        or die sprintf "Could not chmod %04o '%s': %s", $mode, $dst, $!;
    return $dst;
}
# Returns the directory for temporary files: tempdir if set, localroot
# otherwise. The directory is created if missing and the result is cached.
sub _verified_tempdir {
    my($self) = @_;
    my $cached = $self->__verified_tempdir();
    return $cached if defined $cached;

    my $tempdir = $self->tempdir || $self->localroot;
    mkpath $tempdir unless -d $tempdir;
    $self->__verified_tempdir($tempdir);
    return $tempdir;
}
# Creates the File::Temp object that receives the remote recentfile
# $trfilename. The file lives in the verified tempdir, carries the
# serializer suffix and is made mode 0644. When _use_tempfile is set, the
# object is unlinked on destruction and therefore kept alive in
# _current_tempfile_fh. Returns the File::Temp object; dies if chmod fails.
sub _get_remote_rat_provide_tempfile_object {
    my($self, $trfilename) = @_;
    my $_verified_tempdir = $self->_verified_tempdir;
    my $fh = File::Temp->new
        (TEMPLATE => sprintf(".FRMRecent-%s-XXXX",
                             $trfilename,
                            ),
         DIR => $_verified_tempdir,
         SUFFIX => $self->serializer_suffix,
         UNLINK => $self->_use_tempfile,
        );
    my $mode = 0644;
    my $dst = $fh->filename;
    # report the mode in octal, the way it is written in the code
    chmod $mode, $dst
        or die sprintf "Could not chmod %04o '%s': %s", $mode, $dst, $!;
    if ($self->_use_tempfile) {
        $self->_current_tempfile_fh ($fh); # delay self destruction
    }
    return $fh;
}
# Returns the handle verbose output goes to: a fresh append handle on the
# verboselog file if one is configured, STDERR otherwise. Dies if the
# logfile cannot be opened.
sub _logfilehandle {
    my($self) = @_;
    my $vl = $self->verboselog;
    return \*STDERR unless $vl;

    open my $fh, ">>", $vl or die "Could not open >> '$vl': $!";
    return $fh;
}
679 =head2 $localpath = $obj->get_remotefile ( $relative_path )
681 Rsyncs one single remote file to local filesystem.
683 Note: no locking is done on this file. Any number of processes may
684 mirror this object.
686 Note II: do not use for recentfiles. If you are a cascading
687 slave/server combination, it would confuse other slaves. They would
688 expect the contents of these recentfiles to be available. Use
689 get_remote_recentfile_as_tempfile() instead.
691 =cut
# Rsyncs the single file $path (relative to the remote root) below
# localroot and returns the local filename. Failed rsync calls are
# registered and retried until one succeeds or register_rsync_error
# puts an end to it.
sub get_remotefile {
    my($self, $path) = @_;
    my $dst = File::Spec->catfile($self->localroot, $path);
    mkpath(dirname($dst));
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        my $LFH = $self->_logfilehandle;
        printf {$LFH}
            "%-4s %d (1/1/%s) %s ... ",
            $doing,
            time,
            $self->interval,
            $path;
    }
    until ($self->rsync->exec(
                              src => join("/", $self->remoteroot, $path),
                              dst => $dst,
                             )) {
        $self->register_rsync_error ($self->rsync->err);
    }
    $self->un_register_rsync_error ();
    if ($self->verbose) {
        my $LFH = $self->_logfilehandle;
        print {$LFH} "DONE\n";
    }
    return $dst;
}
725 =head2 $obj->interval ( $interval_spec )
727 Get/set accessor. $interval_spec is a string and described below in
728 the section INTERVAL SPEC.
730 =cut
# Get/set accessor for the interval spec. Setting it also forgets the
# cached rfile. Confesses when no interval is defined.
sub interval {
    my ($self, $interval) = @_;
    my $is_setter = @_ >= 2;
    if ($is_setter) {
        $self->_interval($interval);
        $self->_rfile(undef);
    }
    my $current = $self->_interval;
    return $current if defined $current;

    # do not ask the $self too much, it recurses!
    require Carp;
    Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
}
747 =head2 $secs = $obj->interval_secs ( $interval_spec )
749 $interval_spec is described below in the section INTERVAL SPEC. If
750 empty defaults to the inherent interval for this object.
752 =cut
# Translates an interval spec into seconds. Without an argument (or with
# a false one) the object's own interval is used. "Z" yields MAX_INT.
# Dies on specs that cannot be parsed.
sub interval_secs {
    my ($self, $interval) = @_;
    $interval = $self->interval unless $interval;
    die "interval_secs() called without argument on an object without a declared one"
        unless defined $interval;
    my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/
        or die "Could not determine seconds from interval[$interval]";
    return MAX_INT if $interval eq "Z";
    unless (exists $seconds{$t} and $n =~ /^\d+$/) {
        die "Invalid interval specification: n[$n]t[$t]";
    }
    return $seconds{$t}*$n;
}
771 =head2 $obj->localroot ( $localroot )
773 Get/set accessor. The local root of the tree.
775 =cut
# Get/set accessor for the local root of the tree. Setting it also
# forgets the cached rfile.
sub localroot {
    my ($self, $localroot) = @_;
    my $is_setter = @_ >= 2;
    if ($is_setter) {
        $self->_localroot($localroot);
        $self->_rfile(undef);
    }
    return $self->_localroot;
}
786 =head2 $ret = $obj->local_path($path_found_in_recentfile)
788 Combines the path to our local mirror and the path of an object found
789 in this I<recentfile>. In other words: the target of a mirror operation.
791 Implementation note: We split on slashes and then use
792 File::Spec::catfile to adjust to the local operating system.
794 =cut
# Maps a path as found in the recentfile (slash separated) to the local
# filename below localroot. Without a path, localroot itself is returned.
sub local_path {
    my($self,$path) = @_;
    # seems like a degenerated case
    return $self->localroot unless defined $path;

    my @segments = split m|/|, $path;
    return File::Spec->catfile($self->localroot, @segments);
}
806 =head2 (void) $obj->lock
808 Locking is implemented with an C<mkdir> on a locking directory
809 (C<.lock> appended to $rfile).
811 =cut
# Acquires the lock for this recentfile by creating the directory
# "$rfile.lock". Returns immediately when this object already holds the
# lock. Retries every 10ms and dies once locktimeout (default 600)
# seconds have passed.
sub lock {
    my ($self) = @_;
    # not using flock because it locks on filehandles instead of
    # old school resources.
    return if $self->_is_locked;
    my $rfile = $self->rfile;
    my $lockdir = "$rfile.lock";
    # XXX need a way to allow breaking the lock
    my $start = time;
    my $locktimeout = $self->locktimeout || 600;
    until (mkdir $lockdir) {
        # remember why mkdir failed before the sleep can overwrite $!
        my $mkdir_error = "$!";
        Time::HiRes::sleep(0.01);
        if (time - $start > $locktimeout) {
            die "Could not acquire lockdirectory '$lockdir': $mkdir_error";
        }
    }
    $self->_is_locked (1);
}
831 =head2 (void) $obj->merge ($other)
833 Bulk update of this object with another one. It's used to merge a
834 smaller and younger $other object into the current one. If this file
835 is a C<Z> file, then we normally do not merge in objects of type
836 C<delete>; this can be overridden by setting
837 keep_delete_objects_forever. But if we encounter an object of type
838 delete we delete the corresponding C<new> object if we have it.
840 If there is nothing to be merged, nothing is done.
842 =cut
# Merges the smaller recentfile $other into this one. Both objects are
# locked while the work is done: $other first, then ourselves, released
# in reverse order.
sub merge {
    my($self, $other) = @_;
    $self->_merge_sanitycheck ( $other );

    $other->lock;
    my $incoming = $other->recent_events || [];
    # $DB::single++ if $other->interval_secs eq "2" and grep {$_->{epoch} eq "999.999"} @$incoming;

    $self->lock;
    $self->_merge_locked ( $other, $incoming );
    $self->unlock;

    $other->unlock;
}
# Second stage of merge(), called while both recentfiles are locked.
#
# $other        - the object being merged into $self
# $other_recent - arrayref with the events of $other
#
# Determines how old an event may be to stay in this file
# ($oldest_allowed), drops our own events that are older, filters the
# events of $other and hands over to _merge_something_done() if anything
# has to be written.
# NOTE(review): the code treats element [0] as the newest and [-1] as the
# oldest event of both lists -- confirm against recent_events().
856 sub _merge_locked {
857 my($self, $other, $other_recent) = @_;
858 my $my_recent = $self->recent_events || [];
860 # calculate the target time span
861 my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
862 my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
863 my $oldest_allowed = 0;
864 my $something_done;
865 unless ($my_recent->[0]) {
866 # obstetrics
# (we have no events yet, so whatever comes in must be written)
867 $something_done = 1;
869 if ($epoch) {
# differing dirtymarks: keep everything and force a write
870 if (($other->dirtymark||0) ne ($self->dirtymark||0)) {
871 $oldest_allowed = 0;
872 $something_done = 1;
873 } elsif (my $merged = $self->merged) {
# the window is our interval length counted back from $epoch, but it
# never starts later than the epoch recorded in our "merged" hash ...
874 my $secs = $self->interval_secs();
875 $oldest_allowed = min($epoch - $secs, $merged->{epoch}||0);
# ... and it is widened if the last event of $other is older still
876 if (@$other_recent and
877 _bigfloatlt($other_recent->[-1]{epoch}, $oldest_allowed)
879 $oldest_allowed = $other_recent->[-1]{epoch};
# drop our own events that fell out of the window
882 while (@$my_recent && _bigfloatlt($my_recent->[-1]{epoch}, $oldest_allowed)) {
883 pop @$my_recent;
884 $something_done = 1;
# %have_path remembers every path seen in $other; it is passed on so that
# our own events for the same paths can be skipped later
888 my %have_path;
889 my $other_recent_filtered = [];
890 for my $oev (@$other_recent) {
891 my $oevepoch = $oev->{epoch} || 0;
892 next if _bigfloatlt($oevepoch, $oldest_allowed);
893 my $path = $oev->{path};
# only the first event per path is taken
894 next if $have_path{$path}++;
# a Z file does not take delete events unless
# keep_delete_objects_forever is set
895 if ( $self->interval eq "Z"
896 and $oev->{type} eq "delete"
897 and ! $self->keep_delete_objects_forever
899 # do nothing
900 } else {
# anything newer than our newest event means there is work to do
901 if (!$myepoch || _bigfloatgt($oevepoch, $myepoch)) {
902 $something_done = 1;
904 push @$other_recent_filtered, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
907 if ($something_done) {
908 $self->_merge_something_done ($other_recent_filtered, $my_recent, $other_recent, $other, \%have_path, $epoch);
# Final stage of merge(): combines the filtered events of $other with our
# own events, resolves duplicate epochs, writes our recentfile, records
# the merge in $other and writes $other's recentfile as well.
#
# $other_recent_filtered - events of $other that passed the filter
# $my_recent             - our own remaining events
# $other_recent          - the unfiltered events of $other
# $other                 - the object merged into $self
# $have_path             - hashref of paths already seen in $other
# $epoch                 - not used in this sub
912 sub _merge_something_done {
913 my($self, $other_recent_filtered, $my_recent, $other_recent, $other, $have_path, $epoch) = @_;
914 my $recent = [];
915 my $epoch_conflict = 0;
916 my $last_epoch;
# zip both lists into $recent: always take the head with the larger epoch,
# on a tie the one from $other
917 ZIP: while (@$other_recent_filtered || @$my_recent) {
918 my $event;
919 if (!@$my_recent ||
920 @$other_recent_filtered && _bigfloatge($other_recent_filtered->[0]{epoch},$my_recent->[0]{epoch})) {
921 $event = shift @$other_recent_filtered;
922 } else {
923 $event = shift @$my_recent;
# skip our own event if the path was already seen
924 next ZIP if $have_path->{$event->{path}}++;
# two neighbours with the same epoch string need fixing below
926 $epoch_conflict=1 if defined $last_epoch && $event->{epoch} eq $last_epoch;
927 $last_epoch = $event->{epoch};
928 push @$recent, $event;
930 if ($epoch_conflict) {
# walk from the end of the list to the front and bump every epoch that
# was already seen until it is unique
931 my %have_epoch;
932 for (my $i = $#$recent;$i>=0;$i--) {
933 my $epoch = $recent->[$i]{epoch};
934 if ($have_epoch{$epoch}++) {
935 while ($have_epoch{$epoch}) {
936 $epoch = _increase_a_bit($epoch);
938 $recent->[$i]{epoch} = $epoch;
939 $have_epoch{$epoch}++;
# take over the dirtymark of $other if we have none or a different one
943 if (!$self->dirtymark || $other->dirtymark ne $self->dirtymark) {
944 $self->dirtymark ( $other->dirtymark );
946 $self->write_recent($recent);
# remember in $other that it has been merged, and up to which epoch
947 $other->merged({
948 time => Time::HiRes::time, # not used anywhere
949 epoch => $recent->[0]{epoch},
950 into_interval => $self->interval, # not used anywhere
952 $other->write_recent($other_recent);
# Dies unless $other covers a strictly smaller interval than $self:
# merging is only legal from a smaller recentfile into a bigger one.
sub _merge_sanitycheck {
    my($self, $other) = @_;
    if ($self->interval_secs <= $other->interval_secs) {
        # $other is the one being merged in, so its interval is the
        # "bigger" one of the message and ours the "smaller" one
        die sprintf
            (
             "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
             $other->interval_secs,
             $self->interval_secs,
            );
    }
}
967 =head2 merged
969 Hashref denoting when this recentfile has been merged into some other
970 at which epoch.
972 =cut
# Get/set accessor for the "merged" hashref. When the hashref names an
# into_interval and we know our own interval, two sanity checks are run
# which warn (with a stack trace) but never die.
sub merged {
    my($self, $set) = @_;
    $self->_merged ($set) if defined $set;

    my $merged = $self->_merged;
    return $merged unless $merged;
    my $into = $merged->{into_interval};
    return $merged unless $into and defined $self->_interval;

    # sanity checks
    my $complaint;
    if ($into eq $self->interval) {
        $complaint = sprintf
            (
             "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
             $into,
             $self->interval,
            );
    } elsif ($self->interval_secs($into) < $self->interval_secs) {
        $complaint = sprintf
            (
             "Warning: into_interval_secs[%s] smaller than own interval_secs[%s] on interval[%s]. Danger ahead.",
             $self->interval_secs($into),
             $self->interval_secs,
             $self->interval,
            );
    }
    if (defined $complaint) {
        require Carp;
        Carp::cluck($complaint);
    }
    return $merged;
}
1005 =head2 $hashref = $obj->meta_data
1007 Returns the hashref of metadata that the server has to add to the
1008 I<recentfile>.
1010 =cut
# Returns the hashref of metadata for the recentfile: the stored meta
# hash, refreshed with every defined value of the listed accessors, plus
# a Producers entry and a dirtymark if those are still missing.
sub meta_data {
    my($self) = @_;
    my $ret = $self->{meta};
    my @fields = qw(
        aggregator
        canonize
        comment
        dirtymark
        filenameroot
        interval
        merged
        minmax
        protocol
        serializer_suffix
    );
    for my $field (@fields) {
        my $value = $self->$field;
        next unless defined $value;
        $ret->{$field} = $value;
    }
    # XXX need to reset the Producer if I am a writer, keep it when I
    # am a reader
    $ret->{Producers} ||= {
                           __PACKAGE__, "$VERSION", # stringified it looks better
                           '$0', $0,
                           'time', Time::HiRes::time(),
                          };
    $ret->{dirtymark} ||= Time::HiRes::time();
    return $ret;
}
1043 =head2 $success = $obj->mirror ( %options )
1045 Mirrors the files in this I<recentfile> as reported by
1046 C<recent_events>. Options named C<after>, C<before>, C<max>, and
1047 C<skip-deletes> are passed through to the C<recent_events> call. The
1048 boolean option C<piecemeal>, if true, causes C<mirror> to only rsync
1049 C<max_files_per_connection> and keep track of the rsynced files so
1050 that future calls will rsync different files until all files are
1051 brought to sync.
1053 =cut
# Mirrors the files listed in this recentfile. The options before,
# after, max and skip-deletes are handed to recent_events(); piecemeal
# is evaluated further down the call chain. Returns true when no error
# was collected, and an empty return when an item asked for an early
# exit (piecemeal mode).
sub mirror {
    my($self, %options) = @_;
    my $trecentfile = $self->get_remote_recentfile_as_tempfile();
    $self->_use_tempfile (1);
    my %passthrough;
    $passthrough{$_} = $options{$_} for qw(before after max skip-deletes);
    my ($recent_events) = $self->recent_events(%passthrough);
    my(@error, @dlcollector); # download-collector: array containing paths we need
    my $last_item = $#$recent_events;
    my $done = $self->done;
    my $pathdb = $self->_pathdb;
  ITEM: for my $i (0 .. $last_item) {
        my $status = {};
        $self->_mirror_item
            (
             $i,
             $recent_events,
             $last_item,
             $done,
             $pathdb,
             \@dlcollector,
             \%options,
             $status,
             \@error,
            );
        last ITEM if $i == $last_item;
        next ITEM unless $status->{mustreturn};

        # early exit requested
        if ($self->_current_tempfile && ! $self->_current_tempfile_fh) {
            # looks like a bug somewhere else
            my $t = $self->_current_tempfile;
            unlink $t or die "Could not unlink '$t': $!";
            $self->_current_tempfile(undef);
            $self->_use_tempfile(0);
        }
        return;
    }
    # flush what is left in the collector
    if (@dlcollector) {
        my $success = eval { $self->_mirror_dlcollector (\@dlcollector,$pathdb,$recent_events);};
        if (!$success || $@) {
            warn "Warning: Unknown error while mirroring: $@";
            push @error, $@;
            sleep 1;
        }
    }
    if ($self->verbose) {
        my $LFH = $self->_logfilehandle;
        print {$LFH} "DONE\n";
    }
    # once we've gone to the end we consider ourselves free of obligations
    $self->unseed;
    $self->_mirror_unhide_tempfile ($trecentfile);
    $self->_mirror_perform_delayed_ops;
    return !@error;
}
# Handles event number $i of $recent_events during mirror().
#
# Events already covered by $done, or for which $pathdb holds a newer
# epoch, are skipped. "new" events are passed on to _mirror_item_new().
# "delete" events are scheduled in delayed_operations (unlink for files
# and symlinks, rmdir for directories) unless the option skip-deletes is
# set; they are then registered as done. Any other type triggers a warning.
sub _mirror_item {
    my($self,
       $i,
       $recent_events,
       $last_item,
       $done,
       $pathdb,
       $dlcollector,
       $options,
       $status,
       $error,
      ) = @_;
    my $recent_event = $recent_events->[$i];
    return if $done->covered ( $recent_event->{epoch} );
    if ($pathdb) {
        my $rec = $pathdb->{$recent_event->{path}};
        if ($rec && $rec->{recentepoch}) {
            if (_bigfloatgt
                ( $rec->{recentepoch}, $recent_event->{epoch} )){
                $done->register ($recent_events, [$i]);
                return;
            }
        }
    }
    my $dst = $self->local_path($recent_event->{path});
    if ($recent_event->{type} eq "new"){
        $self->_mirror_item_new
            (
             $dst,
             $i,
             $last_item,
             $recent_events,
             $recent_event,
             $dlcollector,
             $pathdb,
             $status,
             $error,
             $options,
            );
    } elsif ($recent_event->{type} eq "delete") {
        my $activity;
        if ($options->{'skip-deletes'}) {
            $activity = "skipped";
        } else {
            # lstat instead of stat: a symlink whose target is gone must
            # still be recognized as existing and be removed
            my @lstat = lstat $dst;
            if (! @lstat) {
                $activity = "not_found";
            } elsif (-l _ or not -d _) {
                $self->delayed_operations->{unlink}{$dst}++;
                $activity = "deleted";
            } else {
                $self->delayed_operations->{rmdir}{$dst}++;
                $activity = "deleted";
            }
        }
        $done->register ($recent_events, [$i]);
        if ($pathdb) {
            $self->_mirror_register_path($pathdb,[$recent_event],$activity);
        }
    } else {
        warn "Warning: invalid upload type '$recent_event->{type}'";
    }
}
# Queue one event of type "new" for download.  The event is appended to
# @$dlcollector; once the collector holds max_files_per_connection
# entries (default 42) the whole batch is mirrored via
# _mirror_dlcollector, followed by a pause of sleep_per_connection
# seconds (default 0.42).  In piecemeal mode $status->{mustreturn} is set
# after a batch and the sub returns immediately.
sub _mirror_item_new {
    my($self,
       $dst,
       $i,
       $last_item,
       $recent_events,
       $recent_event,
       $dlcollector,
       $pathdb,
       $status,
       $error,
       $options,
      ) = @_;
    if ($self->verbose) {
        my $verb = "Get";
        $verb = "Sync" if -e $dst;
        my $log = $self->_logfilehandle;
        my @fields = (
                      $verb,
                      time,
                      1+$i,
                      1+$last_item,
                      $self->interval,
                      $recent_event->{path},
                     );
        printf $log "%-4s %d (%d/%d/%s) %s ... ", @fields;
    }
    my $batch_limit = $self->max_files_per_connection || 42;
    if ($self->verbose) {
        my $log = $self->_logfilehandle;
        print $log "\n";
    }
    push @$dlcollector, { rev => $recent_event, i => $i };

    # batch not full yet: nothing more to do for this item
    return if @$dlcollector < $batch_limit;

    my $success = eval {$self->_mirror_dlcollector ($dlcollector,$pathdb,$recent_events);};
    my $pause = $self->sleep_per_connection;
    $pause = 0.42 unless defined $pause;
    Time::HiRes::sleep($pause);
    if ($options->{piecemeal}) {
        $status->{mustreturn} = 1;
        return;
    }
    if (!$success || $@) {
        warn "Warning: Error while mirroring: $@";
        push @$error, $@;
        sleep 1;
    }
    if ($self->verbose) {
        my $log = $self->_logfilehandle;
        print $log "DONE\n";
    }
}
# Mirror every path queued in @$xcoll with one mirror_path call, record
# the events in $pathdb (activity "rsync") when a pathdb is given,
# register their indexes as done and empty the collector.  Returns what
# mirror_path returned.
sub _mirror_dlcollector {
    my($self,$xcoll,$pathdb,$recent_events) = @_;
    my @paths = map { $_->{rev}{path} } @$xcoll;
    my $success = $self->mirror_path(\@paths);
    if ($pathdb) {
        my @events = map { $_->{rev} } @$xcoll;
        $self->_mirror_register_path($pathdb,\@events,"rsync");
    }
    my @indexes = map { $_->{i} } @$xcoll;
    $self->done->register($recent_events, \@indexes);
    @$xcoll = ();
    return $success;
}
# Record in $pathdb, for every event in @$coll, the event's epoch (key
# "recentepoch") and the current time under the key "<activity>_on".
sub _mirror_register_path {
    my($self,$pathdb,$coll,$activity) = @_;
    my $now = time;
    my $stamp_key = $activity."_on";
    for my $event (@$coll) {
        my %record = (
                      recentepoch => $event->{epoch},
                      $stamp_key  => $now,
                     );
        $pathdb->{$event->{path}} = \%record;
    }
}
# Rename the temporary recentfile $trecentfile onto rfile (confessing on
# failure), switch the object back to non-tempfile mode and tell a still
# registered tempfile handle not to unlink on destroy before forgetting it.
sub _mirror_unhide_tempfile {
    my($self, $trecentfile) = @_;
    my $target = $self->rfile;
    unless (rename $trecentfile, $target) {
        require Carp;
        Carp::confess("Could not rename '$trecentfile' to '$target': $!");
    }
    $self->_use_tempfile (0);
    my $temp_fh = $self->_current_tempfile_fh;
    if ($temp_fh) {
        $temp_fh->unlink_on_destroy (0);
        $self->_current_tempfile_fh (undef);
    }
}
# Execute the deletions queued in delayed_operations: first all unlinks,
# then all rmdirs, longest path first.  Failures only produce a
# Carp::cluck warning; every entry is dropped from the queue afterwards.
sub _mirror_perform_delayed_ops {
    my($self) = @_;
    my $pending = $self->delayed_operations;

    my $files = $pending->{unlink} ||= {};
    for my $file (keys %$files) {
        unless (unlink $file) {
            require Carp;
            Carp::cluck ( "Warning: Error while unlinking '$file': $!" );
        }
        delete $files->{$file};
    }

    my $dirs = $pending->{rmdir} ||= {};
    my @deepest_first = sort {length($b) <=> length($a)} keys %$dirs;
    for my $dir (@deepest_first) {
        unless (rmdir $dir) {
            require Carp;
            Carp::cluck ( "Warning: Error on rmdir '$dir': $!" );
        }
        delete $dirs->{$dir};
    }
}
1289 =head2 $success = $obj->mirror_path ( $arrref | $path )
1291 If the argument is a scalar it is treated as a path. The remote path
1292 is mirrored into the local copy. $path is the path found in the
1293 I<recentfile>, i.e. it is relative to the root directory of the
1294 mirror.
1296 If the argument is an array reference then all elements are treated as
1297 a path below the current tree and all are rsynced with a single
1298 command (and a single connection).
1300 =cut
# Mirror one path (scalar) or a batch of paths (array reference) from
# remoteroot into the local tree via rsync.
#
# Array form: the paths are written to a temporary file that is handed
# to rsync as 'files-from'; a failing rsync is retried, and after three
# failed attempts the sub gives up.
# Scalar form: rsync is repeated until it succeeds (register_rsync_error
# sleeps between attempts and may confess).
#
# Returns true on success, or when a link_stat error is ignored because
# ignore_link_stat_errors is set.  Returns false when the array form gave
# up, so that callers checking the result can record the failure.
sub mirror_path {
    my($self,$path) = @_;
    # XXX simplify the two branches such that $path is treated as
    # [$path] maybe even demand the argument as an arrayref to
    # simplify docs and code. (rsync-over-recentfile-2.pl uses the
    # interface)
    if (ref $path and ref $path eq "ARRAY") {
        my $dst = $self->localroot;
        mkpath dirname $dst;
        my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
                                                      lc $self->filenameroot,
                                                     ),
                                  TMPDIR => 1,
                                  UNLINK => 0,
                                 );
        for my $p (@$path) {
            print $fh $p, "\n";
        }
        $fh->flush;
        $fh->unlink_on_destroy(1);
        my $gaveup = 0;
        my $retried = 0;
        while (!$self->rsync->exec
               (
                src => join("/",
                            $self->remoteroot,
                           ),
                dst => $dst,
                'files-from' => $fh->filename,
               )) {
            my(@err) = $self->rsync->err;
            if ($self->ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
                if ($self->verbose) {
                    my $LFH = $self->_logfilehandle;
                    print $LFH "Info: ignoring link_stat error '@err'";
                }
                return 1;
            }
            $self->register_rsync_error (@err);
            if (++$retried >= 3) {
                warn "XXX giving up.";
                $gaveup = 1;
                last;
            }
        }
        # report the failure instead of pretending the batch was mirrored
        return if $gaveup;
        $self->un_register_rsync_error ();
    } else {
        my $dst = $self->local_path($path);
        mkpath dirname $dst;
        while (!$self->rsync->exec
               (
                src => join("/",
                            $self->remoteroot,
                            $path
                           ),
                dst => $dst,
               )) {
            my(@err) = $self->rsync->err;
            if ($self->ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
                if ($self->verbose) {
                    my $LFH = $self->_logfilehandle;
                    print $LFH "Info: ignoring link_stat error '@err'";
                }
                return 1;
            }
            $self->register_rsync_error (@err);
        }
        $self->un_register_rsync_error ();
    }
    return 1;
}
# Path of the file currently representing this recentfile: the current
# tempfile while _use_tempfile is true, otherwise rfile.
sub _my_current_rfile {
    my($self) = @_;
    my $current = $self->_use_tempfile
        ? $self->_current_tempfile
        : $self->rfile;
    return $current;
}
1387 =head2 $path = $obj->naive_path_normalize ($path)
1389 Takes an absolute unix style path as argument and canonicalizes it to
1390 a shorter path if possible, removing things like double slashes or
1391 C</./> and removes references to C<../> directories to get a shorter
1392 unambiguous path. This simplifies the code that determines
1393 if a file passed to C<update()> is indeed below our C<localroot>.
1395 =cut
# Canonicalize an absolute unix-style path purely textually (the
# filesystem is never consulted): collapse repeated slashes, drop "/./"
# segments, resolve "dir/../" and strip a trailing slash.
# Returns the normalized path.
sub naive_path_normalize {
    my($self,$path) = @_;
    $path =~ s|/+|/|g;                  # "//" -> "/"
    # the documentation promises removal of "/./"; do it before
    # resolving "..", so that "/a/b/./../c" becomes "/a/c"
    1 while $path =~ s|/\./|/|;
    1 while $path =~ s|/[^/]+/\.\./|/|; # "/dir/../" -> "/"
    $path =~ s|/$||;                    # no trailing slash
    return $path;
}
1405 =head2 $ret = $obj->read_recent_1 ( $data )
1407 Delegate of C<recent_events()> on protocol 1
1409 =cut
# Protocol 1 reader: the events are stored under the key "recent" of the
# deserialized structure.
sub read_recent_1 {
    my $self = shift;
    my($data) = @_;
    return $data->{recent};
}
1416 =head2 $array_ref = $obj->recent_events ( %options )
1418 Note: the code relies on the resource being written atomically. We
1419 cannot lock because we may have no write access. If the caller has
1420 write access (eg. aggregate() or update()), it has to care for any
1421 necessary locking and it MUST write atomically.
1423 If C<$options{after}> is specified, only file events after this
1424 timestamp are returned.
1426 If C<$options{before}> is specified, only file events before this
1427 timestamp are returned.
1429 If C<$options{max}> is specified only a maximum of this many events is
1430 returned.
1432 If C<$options{'skip-deletes'}> is specified, no files-to-be-deleted
1433 will be returned.
1435 If C<$options{contains}> is specified the value must be a hash
1436 reference containing a query. The query may contain the keys C<epoch>,
1437 C<path>, and C<type>. Each represents a condition that must be met. If
1438 there is more than one such key, the conditions are ANDed.
1440 If C<$options{info}> is specified, it must be a hashref. This hashref
1441 will be filled with metadata about the unfiltered recent_events of
1442 this object, in key C<first> there is the first item, in key C<last>
1443 is the last.
1445 =cut
# Read the events of this recentfile and return them as an array
# reference, optionally filtered according to %options (after, before,
# max, skip-deletes, contains, info; see the POD above).
#
# A slave first fetches the remote recentfile as a tempfile.  An empty
# array reference is returned when there is no file, when deserializing
# fails, or when the file does not contain a data structure.
sub recent_events {
    my ($self, %options) = @_;
    if ($self->is_slave) {
        # XXX seems dubious, might produce tempfiles without removing them?
        $self->get_remote_recentfile_as_tempfile;
    }
    my $rfile_or_tempfile = $self->_my_current_rfile or return [];
    -e $rfile_or_tempfile or return [];
    my $suffix = $self->serializer_suffix;
    my ($data) = eval {
        $self->_try_deserialize
            (
             $suffix,
             $rfile_or_tempfile,
            );
    };
    my $err = $@;
    if ($err or !$data) {
        return [];
    }
    # a plain scalar cannot be dereferenced below; treat it like any
    # other unreadable file
    return [] unless ref $data;
    my $re;
    if (reftype $data eq 'ARRAY') { # protocol 0
        $re = $data;
    } else {
        $re = $self->_recent_events_protocol_x
            (
             $data,
             $rfile_or_tempfile,
            );
    }
    # "info" is listed too: the documentation promises that the hashref
    # is filled even when no filtering option is given
    return $re unless grep {defined $options{$_}} qw(after before contains info max skip-deletes);
    $self->_recent_events_handle_options ($re, \%options);
}
1482 # File::Rsync::Mirror::Recentfile::_recent_events_handle_options
# Apply the filtering options of recent_events() to the event list $re
# (sorted by descending epoch) in place and return it.
#
#   info         - hashref, receives the unfiltered first and last event
#   after        - keep only events with epoch >  after
#   before       - drop leading events until one has epoch < before
#   skip-deletes - drop events of type "delete"
#   contains     - hashref with any of epoch/path/type; all given
#                  conditions must match (string comparison); unknown
#                  keys make the sub confess
#   max          - keep at most this many events
#
# Epochs are compared numerically here, as in the code this replaces.
sub _recent_events_handle_options {
    my($self, $re, $options) = @_;
    my $info = $options->{info};
    if ($info) {
        $info->{first} = $re->[0];
        $info->{last} = $re->[-1];
    }
    my $full_last = $#$re;
    my $last_item = $full_last;
    my $first_item = 0;
    # Only look at $re->[0]{epoch} when there is an event: on an empty
    # list the dereference would create a bogus empty event.
    if (@$re) {
        if (defined $options->{after}) {
            if ($re->[0]{epoch} > $options->{after}) {
                my $f = first
                    {$re->[$_]{epoch} <= $options->{after}}
                        0..$#$re;
                $last_item = $f-1 if $f;
            } else {
                # not even the newest event is after the limit
                $last_item = -1;
            }
        }
        if (defined $options->{before}) {
            if ($re->[0]{epoch} > $options->{before}) {
                my $f = first
                    {$re->[$_]{epoch} < $options->{before}}
                        0..$last_item;
                # no event before the limit: select nothing
                $first_item = $f ? $f : $last_item + 1;
            }
        }
    }
    # Compare against the real last index.  Comparing against -1 skipped
    # the splice exactly when nothing was selected and so returned all
    # events.
    if ($first_item != 0 || $last_item != $full_last) {
        @$re = splice @$re, $first_item, 1+$last_item-$first_item;
    }
    if ($options->{'skip-deletes'}) {
        @$re = grep { $_->{type} ne "delete" } @$re;
    }
    if (my $contopt = $options->{contains}) {
        my $seen_allowed = 0;
        for my $allow (qw(epoch path type)) {
            if (exists $contopt->{$allow}) {
                $seen_allowed++;
                my $v = $contopt->{$allow};
                @$re = grep { $_->{$allow} eq $v } @$re;
            }
        }
        if (keys %$contopt > $seen_allowed) {
            require Carp;
            Carp::confess
                  (sprintf "unknown query: %s", join ", ", %$contopt);
        }
    }
    if ($options->{max} && @$re > $options->{max}) {
        @$re = splice @$re, 0, $options->{max};
    }
    return $re;
}
# Extract the events from a deserialized recentfile of protocol >= 1.
#
# Lower-case keys of the "meta" section are fed to the accessor of the
# same name unless that accessor already returns a defined value.  The
# events are read with read_recent_<protocol>.  minmax is set from the
# file's mtime (when the file can be stat-ed) and from the epochs of the
# newest and oldest event.  Returns the array reference of events.
sub _recent_events_protocol_x {
    my($self,
       $data,
       $rfile_or_tempfile,
      ) = @_;
    my $meth = sprintf "read_recent_%d", $data->{meta}{protocol};
    # we may be reading meta for the first time
    for my $k (keys %{$data->{meta}}) {
        next if $k ne lc $k; # "Producers"
        next if defined $self->$k;
        $self->$k($data->{meta}{$k});
    }
    my $re = $self->$meth ($data);
    # always a hashref, so minmax is never set to undef
    my $minmax = {};
    if (my @stat = stat $rfile_or_tempfile) {
        $minmax->{mtime} = $stat[9];
    } else {
        # remember the reason before another call can overwrite $!
        my $stat_error = "$!";
        # defensive because ABH encountered:
        #
        #### Sync 1239828608 (1/1/Z) temp .../authors/.FRMRecent-RECENT-Z.yaml-
        #### Ydr_.yaml ... DONE
        #### Cannot stat '/mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-
        #### Ydr_.yaml': No such file or directory at /usr/lib/perl5/site_perl/
        #### 5.8.8/File/Rsync/Mirror/Recentfile.pm line 1558.
        #### unlink0: /mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-Ydr_.yaml is
        #### gone already at cpan-pause.pl line 0
        #
        my $LFH = $self->_logfilehandle;
        # newline added so the next log entry starts on its own line
        print $LFH "Warning (maybe harmless): Cannot stat '$rfile_or_tempfile': $stat_error\n";
    }
    if (@$re) {
        $minmax->{min} = $re->[-1]{epoch};
        $minmax->{max} = $re->[0]{epoch};
    }
    $self->minmax ( $minmax );
    return $re;
}
# Deserialize the file $rfile_or_tempfile according to $suffix and return
# the resulting data structure.  ".yaml" is read with YAML::Syck; every
# other suffix needs Data::Serializer with the serializer mapped to the
# suffix in %serializers.  Dies when the file cannot be opened or when
# Data::Serializer is needed but not installed.
sub _try_deserialize {
    my($self,
       $suffix,
       $rfile_or_tempfile,
      ) = @_;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        return YAML::Syck::LoadFile($rfile_or_tempfile);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        my $serialized = do
            {
                # three-argument open: the file name must never be
                # interpreted as a mode or a pipe
                open my $fh, "<", $rfile_or_tempfile or die "Could not open: $!";
                local $/;
                <$fh>;
            };
        return $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
}
# Re-read the recentfile at $dst into a scratch object of our own class
# and take over its _merged and minmax values.  When both objects have a
# dirtymark and the two differ, the done registry is reset, the new
# dirtymark adopted, "uptodateness ever reached" cleared and the object
# seeded again.
sub _refresh_internals {
    my($self, $dst) = @_;
    my $peek = (ref $self)->new_from_file ($dst);
    $self->_merged ( $peek->_merged );
    $self->minmax ( $peek->minmax );
    my $ours = $self->dirtymark;
    my $theirs = $peek->dirtymark;
    my $dirtymark_changed = $ours && $theirs && $theirs ne $ours;
    if ($dirtymark_changed) {
        $self->done->reset;
        $self->dirtymark ( $theirs );
        $self->_uptodateness_ever_reached(0);
        $self->seed;
    }
}
1626 =head2 $ret = $obj->rfilename
1628 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1629 a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
1631 =cut
# Basename of the recentfile: "<filenameroot>-<interval><serializer_suffix>".
sub rfilename {
    my($self) = @_;
    my @parts = (
                 $self->filenameroot,
                 $self->interval,
                 $self->serializer_suffix,
                );
    my $basename = sprintf "%s-%s%s", @parts;
    return $basename;
}
1643 =head2 $str = $self->remote_dir
1645 The directory we are mirroring from.
1647 =cut
# Get/set the remote directory.  Every call, read or write, marks the
# object as a slave.
sub remote_dir {
    my($self, $set) = @_;
    $self->_remote_dir ($set) if defined $set;
    my $dir = $self->_remote_dir;
    $self->is_slave (1);
    return $dir;
}
1659 =head2 $str = $obj->remoteroot
1661 =head2 (void) $obj->remoteroot ( $set )
1663 Get/Set the composed prefix needed when rsyncing from a remote module.
1664 If remote_host, remote_module, and remote_dir are set, it is composed
1665 from these.
1667 =cut
# Get/set the rsync source prefix.  When none has been set it is composed
# as "<remote_host>::<remote_module>/<remote_dir>" from whichever of the
# three parts are defined, and cached.
sub remoteroot {
    my($self, $set) = @_;
    $self->_remoteroot($set) if defined $set;
    my $root = $self->_remoteroot;
    unless (defined $root) {
        my $host   = defined $self->remote_host   ? ($self->remote_host."::")  : "";
        my $module = defined $self->remote_module ? ($self->remote_module."/") : "";
        my($dir)   = defined $self->remote_dir    ? $self->remote_dir          : "";
        $root = sprintf "%s%s%s", $host, $module, $dir;
        $self->_remoteroot($root);
    }
    return $root;
}
1688 =head2 (void) $obj->split_rfilename ( $recentfilename )
1690 Inverse method to C<rfilename>. C<$recentfilename> is a plain filename
1691 of the pattern
1693 $filenameroot-$interval$serializer_suffix
1695 e.g.
1697 RECENT-1M.yaml
1699 This filename is split into its parts and the parts are fed to the
1700 object itself.
1702 =cut
# Inverse of rfilename: split "<filenameroot>-<interval><suffix>" and
# store the three parts in the object.  Dies when the name does not match.
sub split_rfilename {
    my($self, $rfname) = @_;
    my $pattern = qr(^(.+)-([^-\.]+)(\.[^\.]+));
    my @parts = $rfname =~ $pattern
        or die "Alert: cannot split '$rfname', doesn't match '$pattern'";
    my($root, $interval, $suffix) = @parts;
    $self->filenameroot ($root);
    $self->interval ($interval);
    $self->serializer_suffix ($suffix);
    return;
}
1717 =head2 my $rfile = $obj->rfile
1719 Returns the full path of the I<recentfile>
1721 =cut
# Full path of the recentfile: localroot joined with rfilename.  The
# value is computed once and cached in _rfile.
sub rfile {
    my($self) = @_;
    my $rfile = $self->_rfile;
    return $rfile if defined $rfile;
    # File::Spec is not loaded explicitly anywhere in this file; do not
    # rely on another module having pulled it in
    require File::Spec;
    $rfile = File::Spec->catfile
        ($self->localroot,
         $self->rfilename,
        );
    $self->_rfile ($rfile);
    return $rfile;
}
1735 =head2 $rsync_obj = $obj->rsync
1737 The File::Rsync object that this object uses for communicating with an
1738 upstream server.
1740 =cut
# Return the File::Rsync object used for talking to upstream, creating it
# from rsync_options on first use.  Dies when File::Rsync is unavailable.
sub rsync {
    my($self) = @_;
    my $cached = $self->_rsync;
    return $cached if defined $cached;
    my $rsync_options = $self->rsync_options || {};
    die "File::Rsync required for rsync operations. Cannot continue"
        unless $HAVE->{"File::Rsync"};
    my $fresh = File::Rsync->new($rsync_options);
    $self->_rsync($fresh);
    return $fresh;
}
1757 =head2 (void) $obj->register_rsync_error(@err)
1759 =head2 (void) $obj->un_register_rsync_error()
1761 Register_rsync_error is called whenever the File::Rsync object fails
1762 on an exec (say, connection doesn't succeed). It issues a warning and
1763 sleeps for an increasing amount of time. Un_register_rsync_error
1764 resets the error count. See also accessor C<max_rsync_errors>.
1766 =cut
{
    # failure bookkeeping shared by the two subs below
    my $no_success_count = 0;
    my $no_success_time = 0;

    # Count one more failed rsync run.  Confesses once the count reaches
    # max_rsync_errors (MAX_INT when undefined; a negative value disables
    # the limit); otherwise warns with a stack trace and sleeps 12
    # seconds per counted failure, 300 at most.
    sub register_rsync_error {
        my($self, @err) = @_;
        chomp @err;
        $no_success_time = time;
        $no_success_count++;
        my $limit = $self->max_rsync_errors;
        $limit = MAX_INT unless defined $limit;
        if ($limit>=0 && $no_success_count >= $limit) {
            require Carp;
            my $alert = sprintf
                (
                 "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
                 $self->interval,
                 join(" ",@err),
                 $no_success_count,
                );
            Carp::confess($alert);
        }
        my $sleep = min(300, 12 * $no_success_count);
        require Carp;
        my $warning = sprintf
            (
             "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
             scalar(localtime($no_success_time)),
             $self->interval,
             join(" ",@err),
             $sleep,
            );
        Carp::cluck($warning);
        sleep $sleep
    }

    # Forget all failures counted so far.
    sub un_register_rsync_error {
        my($self) = @_;
        $no_success_time = 0;
        $no_success_count = 0;
    }
}
1811 =head2 $clone = $obj->_sparse_clone
1813 Clones only as much of itself as can be copied without doing harm.
1814 Experimental method.
1816 Note: what fits better: sparse or shallow? Other suggestions?
1818 =cut
# Create a new object of the same class carrying copies of a fixed set
# of attributes; reference values are deep-copied with Storable::dclone.
sub _sparse_clone {
    my($self) = @_;
    my $clone = bless {}, ref $self;
    my @cloned_accessors = qw(
                              _interval
                              _localroot
                              _remoteroot
                              _rfile
                              _use_tempfile
                              aggregator
                              filenameroot
                              is_slave
                              max_files_per_connection
                              protocol
                              rsync_options
                              serializer_suffix
                              sleep_per_connection
                              tempdir
                              verbose
                             );
    for my $accessor (@cloned_accessors) {
        my $value = $self->$accessor;
        if (ref $value) {
            $value = Storable::dclone($value);
        }
        $clone->$accessor($value);
    }
    return $clone;
}
1847 =head2 $boolean = OBJ->ttl_reached ()
1849 =cut
# True (1) when more than ttl seconds (24.2 when ttl is undefined) have
# passed since have_mirrored, false (0) otherwise.
sub ttl_reached {
    my($self) = @_;
    my $last_mirrored = $self->have_mirrored || 0;
    my $now = Time::HiRes::time;
    my $ttl = $self->ttl;
    $ttl = 24.2 unless defined $ttl;
    return $now > $last_mirrored + $ttl ? 1 : 0;
}
1863 =head2 (void) $obj->unlock()
1865 Unlocking is implemented with an C<rmdir> on a locking directory
1866 (C<.lock> appended to $rfile).
1868 =cut
# Release the lock: remove the directory "<rfile>.lock" and clear the
# _is_locked flag.  Does nothing when the object is not locked.
sub unlock {
    my($self) = @_;
    return unless $self->_is_locked;
    my $lockdir = $self->rfile . ".lock";
    rmdir $lockdir;
    $self->_is_locked (0);
}
1878 =head2 unseed
1880 Sets this recentfile in the state of not 'seeded'.
1882 =cut
# Put the recentfile into the state "not seeded".
sub unseed {
    my $self = shift;
    $self->seeded(0);
}
1888 =head2 $ret = $obj->update ($path, $type)
1890 =head2 $ret = $obj->update ($path, "new", $dirty_epoch)
1892 =head2 $ret = $obj->update ()
1894 Enter one file into the local I<recentfile>. $path is the (usually
1895 absolute) path. If the path is outside I<our> tree, then it is
1896 ignored.
1898 C<$type> is one of C<new> or C<delete>.
1900 Events of type C<new> may set $dirty_epoch. $dirty_epoch is normally
1901 not used and the epoch is calculated by the update() routine itself
1902 based on current time. But if there is the demand to insert a
1903 not-so-current file into the dataset, then the caller sets
1904 $dirty_epoch. This causes the epoch of the registered event to become
1905 $dirty_epoch or -- if the exact value given is already taken -- a tiny
1906 bit more. As compensation the dirtymark of the whole dataset is set to
1907 now or the current epoch, whichever is higher. Note: setting the
1908 dirty_epoch to the future is prohibited as it's very unlikely to be
1909 intended: it would most likely wreak havoc with the index files.
1911 The new file event is unshifted (or, if dirty_epoch is set, inserted
1912 at the place it belongs to, according to the rule to have a sequence
1913 of strictly decreasing timestamps) to the array of recent_events and
1914 the array is shortened to the length of the timespan allowed. This is
1915 usually the timespan specified by the interval of this recentfile but
1916 as long as this recentfile has not been merged to another one, the
1917 timespan may grow without bounds.
1919 The third form runs an update without inserting a new file. This may
1920 be desired to truncate a recentfile.
1922 =cut
# Return an epoch that is greater than the epoch of the newest event in
# @$recent: $epoch itself when it already is (or when there are no
# events), otherwise the newest epoch increased a bit.
sub _epoch_monotonically_increasing {
    my($self,$epoch,$recent) = @_;
    return $epoch unless @$recent; # the first one goes unoffended
    my $newest = $recent->[0]{epoch};
    return $epoch if _bigfloatgt("".$epoch,$newest);
    return _increase_a_bit($newest);
}
# Register one file event in the local recentfile, or, when called
# without arguments, only truncate it.
#
#   $path        - (usually absolute) path of the file; it is normalized
#                  with the method named by canonize (default
#                  naive_path_normalize) and ignored when not below
#                  localroot
#   $type        - exactly "new" or "delete"
#   $dirty_epoch - optional epoch for inserting a not-so-current event;
#                  only honoured when it lies in the past
#
# The recentfile is locked for the duration of the call.  Events older
# than the allowed timespan are dropped once the file has been merged
# into another one.  Dies on missing or illegal arguments.
sub update {
    my($self,$path,$type,$dirty_epoch) = @_;
    if (defined $path or defined $type or defined $dirty_epoch) {
        die "update called without path argument" unless defined $path;
        die "update called without type argument" unless defined $type;
        # anchored: the unanchored /(new|delete)/ also accepted types such
        # as "renew" or "deleted", which the mirroring code rejects later
        die "update called with illegal type argument: $type" unless $type =~ /\A(?:new|delete)\z/;
        # since we have keep_delete_objects_forever we must let them inject delete objects too:
        #die "update called with \$type=$type and \$dirty_epoch=$dirty_epoch; ".
        #    "dirty_epoch only allowed with type=new" if defined $dirty_epoch and $type ne "new";
        my $canonmeth = $self->canonize;
        unless ($canonmeth) {
            $canonmeth = "naive_path_normalize";
        }
        $path = $self->$canonmeth($path);
    }
    my $lrd = $self->localroot;
    $self->lock;
    # you must calculate the time after having locked, of course
    my $now = Time::HiRes::time;
    my $interval = $self->interval;
    my $secs = $self->interval_secs();
    my $recent = $self->recent_events;

    my $epoch;
    if (defined $dirty_epoch && _bigfloatgt($now,$dirty_epoch)) {
        $epoch = $dirty_epoch;
    } else {
        $epoch = $self->_epoch_monotonically_increasing($now,$recent);
    }

    $recent ||= [];
    my $oldest_allowed = 0;
    my $merged = $self->merged;
    if ($merged->{epoch}) {
        my $virtualnow = _bigfloatmax($now,$epoch);
        # for the lower bound I think we need no big math, we calc already
        $oldest_allowed = min($virtualnow - $secs, $merged->{epoch}, $epoch);
    } else {
        # as long as we are not merged at all, no limits!
    }
    my $something_done = 0;
  TRUNCATE: while (@$recent) {
        # $DB::single++ unless defined $oldest_allowed;
        if (_bigfloatlt($recent->[-1]{epoch}, $oldest_allowed)) {
            pop @$recent;
            $something_done = 1;
        } else {
            last TRUNCATE;
        }
    }
    if (defined $path && $path =~ s|^\Q$lrd\E||) {
        $path =~ s|^/||;
        my $splicepos;
        # remove the older duplicates of this $path, irrespective of $type:
        if (defined $dirty_epoch) {
            my $ctx = $self->_update_with_dirty_epoch($path,$recent,$epoch);
            $recent    = $ctx->{recent};
            $splicepos = $ctx->{splicepos};
            $epoch     = $ctx->{epoch};
            my $dirtymark = $self->dirtymark;
            my $new_dm = $now;
            if (_bigfloatgt($epoch, $now)) { # just in case we had to increase it
                $new_dm = $epoch;
            }
            $self->dirtymark($new_dm);
            my $merged = $self->merged;
            if (not defined $merged->{epoch} or _bigfloatlt($epoch,$merged->{epoch})) {
                $self->merged(+{});
            }
        } else {
            $recent = [ grep { $_->{path} ne $path } @$recent ];
            $splicepos = 0;
        }
        if (defined $splicepos) {
            splice @$recent, $splicepos, 0, { epoch => $epoch, path => $path, type => $type };
        }
        $something_done = 1;
    }

    $self->write_recent($recent) if $something_done;
    $self->_assert_symlink;
    $self->unlock;
}
# Helper of update() for events that carry a dirty epoch.
#
# If @$recent contains at least one event for a different path, the
# events for $path are removed from it, unless one of them already has
# exactly $epoch, in which case the list is left alone.  Then the
# position at which an event with $epoch has to be spliced in is
# determined; an $epoch that is already taken is increased a bit.
#
# Returns a hashref with the keys recent, splicepos (may be undef) and
# epoch.
sub _update_with_dirty_epoch {
    my($self,$path,$recent,$epoch) = @_;
    my $has_other_paths = grep { $_->{path} ne $path } @$recent;
    if ($has_other_paths) {
        my @kept;
        my $cancel = 0;
        for my $event (@$recent) {
            if ($event->{path} ne $path) {
                push @kept, $event;
                next;
            }
            if ($event->{epoch} eq $epoch) {
                # nothing to do
                $cancel = 1;
                last;
            }
        }
        @$recent = @kept unless $cancel;
    }
    my $splicepos;
    if (!exists $recent->[0] or _bigfloatgt($epoch,$recent->[0]{epoch})) {
        $splicepos = 0;
    } elsif (_bigfloatlt($epoch,$recent->[-1]{epoch})) {
        $splicepos = @$recent;
    } else {
        for my $i (0..$#$recent) {
            my $taken = $recent->[$i]{epoch};
            if ($epoch eq $taken) {
                $epoch = _increase_a_bit($epoch, $i ? $recent->[$i-1]{epoch} : undef);
            }
            if (_bigfloatgt($epoch,$taken)) {
                $splicepos = $i;
                last;
            }
        }
    }
    my %ctx = (
               recent => $recent,
               splicepos => $splicepos,
               epoch => $epoch,
              );
    return \%ctx;
}
2058 =head2 seed
2060 Sets this recentfile in the state of 'seeded' which means it has to
2061 re-evaluate its uptodateness.
2063 =cut
# Put the recentfile into the state "seeded".
sub seed {
    my $self = shift;
    $self->seeded(1);
}
2069 =head2 seeded
2071 Tells if the recentfile is in the state 'seeded'.
2073 =cut
# Get/set the "seeded" state.  A state that was never set is initialized
# to 0 on first read.
sub seeded {
    my($self, $set) = @_;
    $self->_seeded ($set) if defined $set;
    my $state = $self->_seeded;
    return $state if defined $state;
    $self->_seeded (0);
    return 0;
}
2087 =head2 uptodate
2089 True if this object has mirrored the complete interval covered by the
2090 current recentfile.
2092 =cut
# Decide whether this object has mirrored everything the current
# recentfile covers.  The verdict and its reason are handed to
# _remember_last_uptodate_call; a positive verdict is also recorded via
# _uptodateness_ever_reached.  Dies when minmax carries an mtime but the
# current recentfile cannot be stat-ed.
sub uptodate {
    my($self) = @_;
    my($verdict, $reason);
    if ($self->_uptodateness_ever_reached and not $self->seeded) {
        $reason = "saturated";
        $verdict = 1;
    }
    # it's too easy to misconfigure ttl and related timings and then
    # never reach uptodateness, so disabled 2009-03-22
    if (0 and not defined $verdict) {
        if ($self->ttl_reached){
            $reason = "ttl_reached returned true, so we are not uptodate";
            $verdict = 0 ;
        }
    }
    if (not defined $verdict) {
        # look if recentfile has unchanged timestamp
        my $minmax = $self->minmax;
        if (exists $minmax->{mtime}) {
            my $rfile = $self->_my_current_rfile;
            my @stat = stat $rfile or die "Could not stat '$rfile': $!";
            my $mtime = $stat[9];
            my $file_is_newer = defined $mtime
                && defined $minmax->{mtime}
                && $mtime > $minmax->{mtime};
            if ($file_is_newer) {
                $reason = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
                $verdict = 0;
            } else {
                my $covered = $self->done->covered(@$minmax{qw(max min)});
                $reason = sprintf "minmax covered[%s], so we return that", defined $covered ? $covered : "UNDEF";
                $verdict = $covered;
            }
        }
    }
    if (not defined $verdict) {
        $reason = "fallthrough, so not uptodate";
        $verdict = 0;
    }
    $self->_uptodateness_ever_reached(1) if $verdict;
    $self->_remember_last_uptodate_call({
                                         uptodate => $verdict,
                                         why => $reason,
                                        });
    return $verdict;
}
2142 =head2 $obj->write_recent ($recent_files_arrayref)
2144 Writes a I<recentfile> based on the current reflection of the current
2145 state of the tree limited by the current interval.
2147 =cut
# Sort the event list passed as second argument in place, newest epoch
# first.
sub _resort {
    my($self) = @_;
    my $events = $_[1];
    @$events = sort { _bigfloatcmp($b->{epoch},$a->{epoch}) } @$events;
    return;
}
# Write the events in @$recent to the recentfile using the writer of the
# current protocol (write_<protocol>).
#
# The events must be sorted by strictly decreasing epoch; a violation
# makes the sub confess.  minmax->{max} and minmax->{min} are updated
# from the newest and oldest event when the list is not empty.
# Dies when called without an argument.
sub write_recent {
    my ($self,$recent) = @_;
    die "write_recent called without argument" unless defined $recent;
    my $Last_epoch;
 SANITYCHECK: for my $i (0..$#$recent) {
        if (defined($Last_epoch) and _bigfloatge($recent->[$i]{epoch},$Last_epoch)) {
            require Carp;
            Carp::confess(sprintf "Warning: disorder '%s'>='%s', re-sorting %s\n",
                 $recent->[$i]{epoch}, $Last_epoch, $self->interval);
            # you may want to:
            # $self->_resort($recent);
            # last SANITYCHECK;
        }
        $Last_epoch = $recent->[$i]{epoch};
    }
    my $minmax = $self->minmax;
    # Only consult the events when there are any: $recent->[0]{epoch} on
    # an empty list would create an empty event that then got written to
    # the recentfile.
    if (@$recent) {
        if (!defined $minmax->{max} || _bigfloatlt($minmax->{max},$recent->[0]{epoch})) {
            $minmax->{max} = $recent->[0]{epoch};
        }
        if (!defined $minmax->{min} || _bigfloatlt($minmax->{min},$recent->[-1]{epoch})) {
            $minmax->{min} = $recent->[-1]{epoch};
        }
    }
    $self->minmax($minmax);
    my $meth = sprintf "write_%d", $self->protocol;
    $self->$meth($recent);
}
2180 =head2 $obj->write_0 ($recent_files_arrayref)
2182 Delegate of C<write_recent()> on protocol 0
2184 =cut
# Protocol 0 writer: dump the bare event list as YAML into "<rfile>.new"
# and rename that file onto rfile.
sub write_0 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    my $scratch = "$rfile.new";
    YAML::Syck::DumpFile($scratch,$recent);
    rename $scratch, $rfile or die "Could not rename to '$rfile': $!";
}
2193 =head2 $obj->write_1 ($recent_files_arrayref)
2195 Delegate of C<write_recent()> on protocol 1
2197 =cut
# Protocol 1 writer: serialize { meta => meta_data, recent => events }
# according to serializer_suffix, write the result to "<rfile>.new" and
# rename that file onto rfile.  Dies when the serializer is unavailable
# or when opening, closing or renaming fails.
sub write_1 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    my $suffix = $self->serializer_suffix;
    my $data = {
                meta => $self->meta_data,
                recent => $recent,
               };
    my $serialized;
    if ($suffix eq ".yaml") {
        $serialized = YAML::Syck::Dump($data);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $engine = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $serialized = $engine->raw_serialize($data);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    my $scratch = "$rfile.new";
    open my $out, ">", $scratch or die "Could not open >'$scratch': $!";
    print {$out} $serialized;
    close $out or die "Could not close '$scratch': $!";
    rename $scratch, $rfile or die "Could not rename to '$rfile': $!";
}
# Fill %serializers at compile time from the documentation itself: the
# here-document started below ends at the next "=cut" line, so its body
# is the POD that follows.  Every "=item C<< "suffix" => "Module" >>"
# line found there contributes one suffix => module pair.  The statement
# after the here-document operator must stay on the same line.
2223 BEGIN {
2224 my $nq = qr/[^"]+/; # non-quotes
2225 my @pod_lines =
2226 split /\n/, <<'=cut'; %serializers = map { my @x = /"($nq)"\s+=>\s+"($nq)"/; @x } grep {s/^=item\s+C<<\s+(.+)\s+>>$/$1/} @pod_lines; }
2228 =head1 SERIALIZERS
2230 The following suffixes are supported and trigger the use of these
2231 serializers:
2233 =over 4
2235 =item C<< ".yaml" => "YAML::Syck" >>
2237 =item C<< ".json" => "JSON" >>
2239 =item C<< ".sto" => "Storable" >>
2241 =item C<< ".dd" => "Data::Dumper" >>
2243 =back
2245 =cut
# Fill %seconds at compile time from the documentation itself: the
# here-document started below ends at the next "=cut" line, so its body
# is the POD that follows.  The content of every "=item C<< ... >>" line
# found there (e.g. "h => 60*60") is evaluated as Perl to yield one
# letter => seconds pair.
2247 BEGIN {
2248 my @pod_lines =
2249 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
2251 =head1 INTERVAL SPEC
2253 An interval spec is a primitive way to express time spans. Normally it
2254 is composed from an integer and a letter.
2256 As a special case, a string that consists only of the single letter
2257 C<Z>, stands for MAX_INT seconds.
2259 The following letters express the specified number of seconds:
2261 =over 4
2263 =item C<< s => 1 >>
2265 =item C<< m => 60 >>
2267 =item C<< h => 60*60 >>
2269 =item C<< d => 60*60*24 >>
2271 =item C<< W => 60*60*24*7 >>
2273 =item C<< M => 60*60*24*30 >>
2275 =item C<< Q => 60*60*24*90 >>
2277 =item C<< Y => 60*60*24*365.25 >>
2279 =back
2281 =cut
2283 =head1 SEE ALSO
2285 L<File::Rsync::Mirror::Recent>,
2286 L<File::Rsync::Mirror::Recentfile::Done>,
2287 L<File::Rsync::Mirror::Recentfile::FakeBigFloat>
2289 =head1 BUGS
2291 Please report any bugs or feature requests through the web interface
2293 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2294 I will be notified, and then you'll automatically be notified of
2295 progress on your bug as I make changes.
2297 =head1 KNOWN BUGS
2299 Memory hungry: it seems all memory is allocated during the initial
2300 rsync where a list of all files is maintained in memory.
2302 =head1 SUPPORT
2304 You can find documentation for this module with the perldoc command.
2306 perldoc File::Rsync::Mirror::Recentfile
2308 You can also look for information at:
2310 =over 4
2312 =item * RT: CPAN's request tracker
2314 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2316 =item * AnnoCPAN: Annotated CPAN documentation
2318 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2320 =item * CPAN Ratings
2322 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2324 =item * Search CPAN
2326 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2328 =back
2331 =head1 ACKNOWLEDGEMENTS
2333 Thanks to RJBS for module-starter.
2335 =head1 AUTHOR
2337 Andreas König
2339 =head1 COPYRIGHT & LICENSE
2341 Copyright 2008,2009 Andreas König.
2343 This program is free software; you can redistribute it and/or modify it
2344 under the same terms as Perl itself.
2347 =cut
2349 1; # End of File::Rsync::Mirror::Recentfile
2351 # Local Variables:
2352 # mode: cperl
2353 # cperl-indent-level: 4
2354 # End: