1 package File::Rsync::Mirror::Recentfile;
3 # use warnings;
4 use strict;
6 =encoding utf-8
8 =head1 NAME
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
12 =cut
14 my $HAVE = {};
15 for my $package (
16 "Data::Serializer",
17 "File::Rsync"
18 ) {
19 $HAVE->{$package} = eval qq{ require $package; };
21 use Config;
22 use File::Basename qw(basename dirname fileparse);
23 use File::Copy qw(cp);
24 use File::Path qw(mkpath);
25 use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
26 use File::Temp;
27 use List::Util qw(first max min);
28 use Scalar::Util qw(reftype);
29 use Storable;
30 use Time::HiRes qw();
31 use YAML::Syck;
33 use version; our $VERSION = qv('0.0.1');
35 use constant MAX_INT => ~0>>1; # anything better?
36 use constant DEFAULT_PROTOCOL => 1;
38 # cf. interval_secs
39 my %seconds;
41 # maybe subclass if this mapping is bad?
42 my %serializers;
44 =head1 SYNOPSIS
46 Writer (of a single file):
48 use File::Rsync::Mirror::Recentfile;
49     my $rf = File::Rsync::Mirror::Recentfile->new
51 interval => q(6h),
52 filenameroot => "RECENT",
53 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
54 localroot => "/home/ftp/pub/PAUSE/authors/",
55 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
57 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
59 Reader/mirrorer:
61 my $rf = File::Rsync::Mirror::Recentfile->new
63 filenameroot => "RECENT",
64 ignore_link_stat_errors => 1,
65 interval => q(6h),
66 localroot => "/home/ftp/pub/PAUSE/authors",
67 remote_dir => "",
68 remote_host => "pause.perl.org",
69 remote_module => "authors",
70 rsync_options => {
71 compress => 1,
72 'rsync-path' => '/usr/bin/rsync',
73 links => 1,
74 times => 1,
75 'omit-dir-times' => 1,
76 checksum => 1,
78 verbose => 1,
80 $rf->mirror;
82 Aggregator (usually the writer):
84 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
85 $rf->aggregate;
87 =head1 DESCRIPTION
89 Lower level than F:R:M:Recent, this class handles one recentfile. A
90 tree is always composed of several recentfiles, controlled by the
91 F:R:M:Recent object; the Recentfile object does the bookkeeping
92 for a single timeslice.
94 =head1 EXPORT
96 No exports.
98 =head1 CONSTRUCTORS / DESTRUCTOR
100 =head2 my $obj = CLASS->new(%hash)
102 Constructor. On every argument pair the key is a method name and the
103 value is an argument to that method name.
105 If a recentfile for this resource already exists, metadata that are
106 not defined by the constructor will be fetched from there as soon as
107 it is read by recent_events().
109 =cut
111 sub new {
112 my($class, @args) = @_;
113 my $self = bless {}, $class;
114 while (@args) {
115 my($method,$arg) = splice @args, 0, 2;
116 $self->$method($arg);
118 unless (defined $self->protocol) {
119 $self->protocol(DEFAULT_PROTOCOL);
121 unless (defined $self->filenameroot) {
122 $self->filenameroot("RECENT");
124 unless (defined $self->serializer_suffix) {
125 $self->serializer_suffix(".yaml");
127 return $self;
130 =head2 my $obj = CLASS->new_from_file($file)
132 Constructor. $file is a I<recentfile>.
134 =cut
136 sub new_from_file {
137 my($class, $file) = @_;
138 my $self = bless {}, $class;
139 $self->_rfile($file);
140 #?# $self->lock;
141 my $serialized = do { open my $fh, $file or die "Could not open '$file': $!";
142 local $/;
143 <$fh>;
145 # XXX: we can skip this step when the metadata are sufficient, but
146 # we cannot parse the file without some magic stuff about
147 # serialized formats
148 while (-l $file) {
149 my($name,$path) = fileparse $file;
150 my $symlink = readlink $file;
151 if ($symlink =~ m|/|) {
152 die "FIXME: filenames containing '/' not supported, got $symlink";
154 $file = File::Spec->catfile ( $path, $symlink );
156 my($name,$path,$suffix) = fileparse $file, keys %serializers;
157 $self->serializer_suffix($suffix);
158 $self->localroot($path);
159 die "Could not determine file format from suffix" unless $suffix;
160 my $deserialized;
161 if ($suffix eq ".yaml") {
162 require YAML::Syck;
163 $deserialized = YAML::Syck::LoadFile($file);
164 } elsif ($HAVE->{"Data::Serializer"}) {
165 my $serializer = Data::Serializer->new
166 ( serializer => $serializers{$suffix} );
167 $deserialized = $serializer->raw_deserialize($serialized);
168 } else {
169 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
171 while (my($k,$v) = each %{$deserialized->{meta}}) {
172 next if $k ne lc $k; # "Producers"
173 $self->$k($v);
175 unless (defined $self->protocol) {
176 $self->protocol(DEFAULT_PROTOCOL);
178 return $self;
181 =head2 DESTROY
183 A simple unlock.
185 =cut
186 sub DESTROY {
187 my $self = shift;
188 $self->unlock;
189 unless ($self->_current_tempfile_fh) {
190 if (my $tempfile = $self->_current_tempfile) {
191 if (-e $tempfile) {
192 unlink $tempfile; # may fail in global destruction
198 =head1 ACCESSORS
200 =cut
202 my @accessors;
204 BEGIN {
205 @accessors = (
206 "_current_tempfile",
207 "_current_tempfile_fh",
208 "_delayed_operations",
209 "_done",
210 "_interval",
211 "_is_locked",
212 "_localroot",
213 "_merged",
214 "_pathdb",
215 "_remember_last_uptodate_call",
216 "_remote_dir",
217 "_remoteroot",
218 "_rfile",
219 "_rsync",
220 "__verified_tempdir",
221 "_seeded",
222 "_uptodateness_ever_reached",
223 "_use_tempfile",
226 my @pod_lines =
227 split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
229 =over 4
231 =item aggregator
233 A list of interval specs that tell the aggregator which I<recentfile>s
234 are to be produced.
236 =item canonize
238 The name of a method to canonize the path before rsyncing. Only
239 supported value is C<naive_path_normalize>. Defaults to that.
241 =item comment
243 A comment about this tree and setup.
245 =item dirtymark
247 A timestamp. The dirtymark is updated whenever an out of band change
248 on the origin server is performed that violates the protocol. Say,
249 they add or remove files in the middle somewhere. Slaves must react
250 with a devaluation of their C<done> structure which then leads to a
251 full re-sync of all files. Implementation note: dirtymark may increase
252 or decrease.
254 =item filenameroot
256 The (prefix of the) filename we use for this I<recentfile>. Defaults to
257 C<RECENT>. The string must not contain a directory separator.
259 =item have_mirrored
261 Timestamp remembering when we mirrored this recentfile the last time.
262 Only relevant for slaves.
264 =item ignore_link_stat_errors
266 If set to true, rsync errors that complain about link stat errors are
267 ignored. These seem to happen only when files are missing at the
268 origin, which can always happen in race conditions, so it is
269 recommended to set this value to true.
271 =item is_slave
273 If set to true, this object will fetch a new recentfile from remote
274 when the timespan between the last mirror (see have_mirrored) and now
275 is too large (see C<ttl>).
277 =item keep_delete_objects_forever
279 The default for delete events is that they are passed through the
280 collection of recentfile objects until they reach the Z file. There
281 they get dropped so that the associated file object ceases to exist at
282 all. By setting C<keep_delete_objects_forever> the delete objects are
283 kept forever. This makes the Z file larger but has the advantage that
284 slaves that have interrupted mirroring for a long time still can clean
285 up their copy.
287 =item locktimeout
289 After how many seconds shall we die if we cannot lock a I<recentfile>?
290 Defaults to 600 seconds.
292 =item loopinterval
294 When mirror_loop is called, this accessor can specify how much time
295 every loop shall at least take. If the work of a loop is done before
296 that time has gone, sleeps for the rest of the time. Defaults to
297 arbitrary 42 seconds.
299 =item max_files_per_connection
301 Maximum number of files that are transferred on a single rsync call.
302 Setting it higher means higher performance at the price of holding
303 connections longer and potentially disturbing other users in the pool.
304 Defaults to the arbitrary value 42.
306 =item max_rsync_errors
308 When rsync operations encounter that many errors without an
309 intervening success, we die. Defaults to unlimited. A value of
310 -1 means we run forever, ignoring all rsync errors.
312 =item minmax
314 Hashref remembering when we read the recent_events from this file the
315 last time and what the timespan was.
317 =item protocol
319 When the RECENT file format changes, we increment the protocol. We try
320 to support older protocols in later releases.
322 =item remote_host
324 The host we are mirroring from. Leave empty for the local filesystem.
326 =item remote_module
328 Rsync servers have so called modules to separate directory trees from
329 each other. Put here the name of the module under which we are
330 mirroring. Leave empty for local filesystem.
332 =item rsync_options
334 Things like compress, links, times or checksums. Passed in to the
335 File::Rsync object used to run the mirror.
337 =item serializer_suffix
339 Mostly untested accessor. The only well tested format for
340 I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
341 Data::Serializer. But in principle other formats are supported as
342 well. See section SERIALIZERS below.
344 =item sleep_per_connection
346 Sleep that many seconds (floating point OK) after every chunk of rsyncing
347 has finished. Defaults to arbitrary 0.42.
349 =item tempdir
351 Directory to write temporary files to. Must allow rename operations
352 into the tree which usually means it must live on the same partition
353 as the target directory. Defaults to C<< $self->localroot >>.
355 =item ttl
357 Time to live. Number of seconds after which this recentfile must be
358 fetched again from the origin server. Only relevant for slaves.
359 Defaults to arbitrary 24.2 seconds.
361 =item verbose
363 Boolean to turn on a bit verbosity.
365 =back
367 =cut
369 use accessors @accessors;
371 =head1 METHODS
373 =head2 (void) $obj->aggregate( %options )
375 Takes all intervals that are collected in the accessor called
376 aggregator. Sorts them by actual length of the interval.
377 Removes those that are shorter than our own interval. Then merges this
378 object into the next larger object. The merging continues upwards
379 as long as the next I<recentfile> is old enough to warrant a merge.
381 Whether a merge is warranted is decided according to the interval of
382 the previous recentfile, so that larger files are updated less often
383 than smaller ones. If $options{force} is true, all files get updated.
385 Here is an example to illustrate the behaviour. Given aggregators
387 1h 1d 1W 1M 1Q 1Y Z
389 then
391 1h updates 1d on every call to aggregate()
392 1d updates 1W earliest after 1h
393 1W updates 1M earliest after 1d
394 1M updates 1Q earliest after 1W
395 1Q updates 1Y earliest after 1M
396 1Y updates Z earliest after 1Q
398 Note that all but the smallest recentfile get updated at an arbitrary
399 rate and as such are quite useless on their own.
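
For illustration, here is a minimal writer-side sketch (the paths are
made up) that registers one upload and lets the cascade above do its
work:

    use File::Rsync::Mirror::Recentfile;
    my $rf = File::Rsync::Mirror::Recentfile->new
      (
       interval     => q(1h),
       filenameroot => "RECENT",
       localroot    => "/home/ftp/pub/PAUSE/authors/",
       aggregator   => [qw(1d 1W 1M 1Q 1Y Z)],
      );
    # register the new file in the 1h recentfile ...
    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AA/AAA/Foo-1.00.tar.gz", "new");
    # ... and merge upwards; force => 1 would update every interval
    $rf->aggregate;
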
401 =cut
403 sub aggregate {
404 my($self, %option) = @_;
405 my @aggs = sort { $a->{secs} <=> $b->{secs} }
406 grep { $_->{secs} >= $self->interval_secs }
407 map { { interval => $_, secs => $self->interval_secs($_)} }
408 $self->interval, @{$self->aggregator || []};
409 $self->update;
410 $aggs[0]{object} = $self;
411 AGGREGATOR: for my $i (0..$#aggs-1) {
412 my $this = $aggs[$i]{object};
413 my $next = $this->_sparse_clone;
414 $next->interval($aggs[$i+1]{interval});
415 my $want_merge = 0;
416 if ($option{force} || $i == 0) {
417 $want_merge = 1;
418 } else {
419 my $next_rfile = $next->rfile;
420 if (-e $next_rfile) {
421 my $prev = $aggs[$i-1]{object};
422 local $^T = time;
423 my $next_age = 86400 * -M $next_rfile;
424 if ($next_age > $prev->interval_secs) {
425 $want_merge = 1;
427 } else {
428 $want_merge = 1;
431 if ($want_merge) {
432 $next->merge($this);
433 $aggs[$i+1]{object} = $next;
434 } else {
435 last AGGREGATOR;
440 # collect file size and mtime for all files of this aggregate
441 sub _debug_aggregate {
442 my($self) = @_;
443 my @aggs = sort { $a->{secs} <=> $b->{secs} }
444 map { { interval => $_, secs => $self->interval_secs($_)} }
445 $self->interval, @{$self->aggregator || []};
446 my $report = [];
447 for my $i (0..$#aggs) {
448 my $this = Storable::dclone $self;
449 $this->interval($aggs[$i]{interval});
450 my $rfile = $this->rfile;
451 my @stat = stat $rfile;
452 push @$report, {rfile => $rfile, size => $stat[7], mtime => $stat[9]};
454 $report;
457 # (void) $self->_assert_symlink()
458 sub _assert_symlink {
459 my($self) = @_;
460 my $recentrecentfile = File::Spec->catfile
462 $self->localroot,
463 sprintf
465 "%s.recent",
466 $self->filenameroot
469 if ($Config{d_symlink} eq "define") {
470 my $howto_create_symlink; # 0=no need; 1=straight symlink; 2=rename symlink
471 if (-l $recentrecentfile) {
472 my $found_symlink = readlink $recentrecentfile;
473 if ($found_symlink eq $self->rfilename) {
474 return;
475 } else {
476 $howto_create_symlink = 2;
478 } else {
479 $howto_create_symlink = 1;
481 if (1 == $howto_create_symlink) {
482 symlink $self->rfilename, $recentrecentfile or die "Could not create symlink '$recentrecentfile': $!"
483 } else {
484 unlink "$recentrecentfile.$$"; # may fail
485 symlink $self->rfilename, "$recentrecentfile.$$" or die "Could not create symlink '$recentrecentfile.$$': $!";
486 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
488 } else {
489 warn "Warning: symlinks not supported on this system, doing a copy instead\n";
490 unlink "$recentrecentfile.$$"; # may fail
491 cp $self->rfilename, "$recentrecentfile.$$" or die "Could not copy to '$recentrecentfile.$$': $!";
492 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
496 =head2 $hashref = $obj->delayed_operations
498 A hash of hashes containing unlink and rmdir operations which had to
499 wait until the recentfile got unhidden in order to not confuse
500 downstream mirrors (in case we have some).
502 =cut
504 sub delayed_operations {
505 my($self) = @_;
506 my $x = $self->_delayed_operations;
507 unless (defined $x) {
508 $x = {
509 unlink => {},
510 rmdir => {},
512 $self->_delayed_operations ($x);
514 return $x;
517 =head2 $done = $obj->done
519 C<$done> is a reference to a L<File::Rsync::Mirror::Recentfile::Done>
520 object that keeps track of rsync activities. Only needed and used when
521 we are a mirroring slave.
523 =cut
525 sub done {
526 my($self) = @_;
527 my $done = $self->_done;
528 if (!$done) {
529 require File::Rsync::Mirror::Recentfile::Done;
530 $done = File::Rsync::Mirror::Recentfile::Done->new();
531 $done->_rfinterval ($self->interval);
532 $self->_done ( $done );
534 return $done;
537 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
539 Stores the remote I<recentfile> locally as a tempfile. The caller is
540 responsible for removing the file after use.
542 Note: if you intend to act as an rsync server for other slaves, you
543 must prefer this method over get_remotefile() for fetching that
544 file. Otherwise downstream mirrors would expect you to have already
545 mirrored all the files listed in the I<recentfile> before you
546 actually have them.
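
A short usage sketch (host and module are taken from the SYNOPSIS, the
paths are made up); the caller removes the tempfile when done:

    my $rf = File::Rsync::Mirror::Recentfile->new
      (
       filenameroot  => "RECENT",
       interval      => q(6h),
       localroot     => "/home/ftp/pub/PAUSE/authors",
       remote_host   => "pause.perl.org",
       remote_module => "authors",
      );
    my $tempfile = $rf->get_remote_recentfile_as_tempfile;
    # ... serve or inspect the tempfile, then clean up:
    unlink $tempfile or warn "Could not unlink '$tempfile': $!";
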
548 =cut
550 sub get_remote_recentfile_as_tempfile {
551 my($self) = @_;
552 mkpath $self->localroot;
553 my $fh;
554 my $trfilename;
555 if ( $self->_use_tempfile() ) {
556 if ($self->ttl_reached) {
557 $fh = $self->_current_tempfile_fh;
558 $trfilename = $self->rfilename;
559 } else {
560 return $self->_current_tempfile;
562 } else {
563 $trfilename = $self->rfilename;
566 my $dst;
567 if ($fh) {
568 $dst = $self->_current_tempfile;
569 } else {
570 $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
571 $dst = $fh->filename;
572 $self->_current_tempfile ($dst);
573 my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
574 if (defined $rfile && -e $rfile) {
575 # saving on bandwidth. Might need to be configurable
576 # $self->bandwidth_is_cheap?
577 cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
580 my $src = join ("/",
581 $self->remoteroot,
582 $trfilename,
584 if ($self->verbose) {
585 my $doing = -e $dst ? "Sync" : "Get";
586 my $display_dst = join "/", "...", basename(dirname($dst)), basename($dst);
587 printf STDERR
589 "%-4s %d (1/1/%s) temp %s ... ",
590 $doing,
591 time,
592 $self->interval,
593 $display_dst,
596 my $gaveup = 0;
597 my $retried = 0;
598 while (!$self->rsync->exec(
599 src => $src,
600 dst => $dst,
601 )) {
602 $self->register_rsync_error ($self->rsync->err);
603 if (++$retried >= 3) {
604 warn "XXX giving up";
605 $gaveup = 1;
606 last;
609 if ($gaveup) {
610 printf STDERR "Warning: gave up mirroring %s, will try again later", $self->interval;
611 } else {
612 $self->_refresh_internals ($dst);
613 $self->have_mirrored (Time::HiRes::time);
614 $self->un_register_rsync_error ();
616 $self->unseed;
617 if ($self->verbose) {
618 print STDERR "DONE\n";
620 my $mode = 0644;
621 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
622 return $dst;
625 sub _verified_tempdir {
626 my($self) = @_;
627 my $tempdir = $self->__verified_tempdir();
628 return $tempdir if defined $tempdir;
629 unless ($tempdir = $self->tempdir) {
630 $tempdir = $self->localroot;
632 unless (-d $tempdir) {
633 mkpath $tempdir;
635 $self->__verified_tempdir($tempdir);
636 return $tempdir;
639 sub _get_remote_rat_provide_tempfile_object {
640 my($self, $trfilename) = @_;
641 my $_verified_tempdir = $self->_verified_tempdir;
642 my $fh = File::Temp->new
643 (TEMPLATE => sprintf(".FRMRecent-%s-XXXX",
644 $trfilename,
646 DIR => $_verified_tempdir,
647 SUFFIX => $self->serializer_suffix,
648 UNLINK => $self->_use_tempfile,
650 my $mode = 0644;
651 my $dst = $fh->filename;
652 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
653 if ($self->_use_tempfile) {
654 $self->_current_tempfile_fh ($fh); # delay self destruction
656 return $fh;
659 =head2 $localpath = $obj->get_remotefile ( $relative_path )
661 Rsyncs one single remote file to local filesystem.
663 Note: no locking is done on this file. Any number of processes may
664 mirror this object.
666 Note II: do not use for recentfiles. If you are a cascading
667 slave/server combination, it would confuse other slaves. They would
668 expect the contents of these recentfiles to be available. Use
669 get_remote_recentfile_as_tempfile() instead.
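
For example (the path is made up), fetching one ordinary file that was
reported in a recentfile:

    my $localpath = $rf->get_remotefile("id/A/AA/AAA/Foo-1.00.tar.gz");
    print "mirrored to $localpath\n";
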
671 =cut
673 sub get_remotefile {
674 my($self, $path) = @_;
675 my $dst = File::Spec->catfile($self->localroot, $path);
676 mkpath dirname $dst;
677 if ($self->verbose) {
678 my $doing = -e $dst ? "Sync" : "Get";
679 printf STDERR
681 "%-4s %d (1/1/%s) %s ... ",
682 $doing,
683 time,
684 $self->interval,
685 $path,
688 while (!$self->rsync->exec(
689 src => join("/",
690 $self->remoteroot,
691 $path),
692 dst => $dst,
693 )) {
694 $self->register_rsync_error ($self->rsync->err);
696 $self->un_register_rsync_error ();
697 if ($self->verbose) {
698 print STDERR "DONE\n";
700 return $dst;
703 =head2 $obj->interval ( $interval_spec )
705 Get/set accessor. $interval_spec is a string and described below in
706 the section INTERVAL SPEC.
708 =cut
710 sub interval {
711 my ($self, $interval) = @_;
712 if (@_ >= 2) {
713 $self->_interval($interval);
714 $self->_rfile(undef);
716 $interval = $self->_interval;
717 unless (defined $interval) {
718 # do not ask the $self too much, it recurses!
719 require Carp;
720 Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
722 return $interval;
725 =head2 $secs = $obj->interval_secs ( $interval_spec )
727 $interval_spec is described below in the section INTERVAL SPEC. If
728 empty defaults to the inherent interval for this object.
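
For example:

    my $rf = File::Rsync::Mirror::Recentfile->new( interval => q(6h) );
    print $rf->interval_secs, "\n";        # 21600
    print $rf->interval_secs("1d"), "\n";  # 86400
    print $rf->interval_secs("Z"), "\n";   # MAX_INT, the Z file never expires
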
730 =cut
732 sub interval_secs {
733 my ($self, $interval) = @_;
734 $interval ||= $self->interval;
735 unless (defined $interval) {
736 die "interval_secs() called without argument on an object without a declared one";
738 my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
739 die "Could not determine seconds from interval[$interval]";
740 if ($interval eq "Z") {
741 return MAX_INT;
742 } elsif (exists $seconds{$t} and $n =~ /^\d+$/) {
743 return $seconds{$t}*$n;
744 } else {
745 die "Invalid interval specification: n[$n]t[$t]";
749 =head2 $obj->localroot ( $localroot )
751 Get/set accessor. The local root of the tree.
753 =cut
755 sub localroot {
756 my ($self, $localroot) = @_;
757 if (@_ >= 2) {
758 $self->_localroot($localroot);
759 $self->_rfile(undef);
761 $localroot = $self->_localroot;
764 =head2 $ret = $obj->local_path($path_found_in_recentfile)
766 Combines the path to our local mirror and the path of an object found
767 in this I<recentfile>. In other words: the target of a mirror operation.
769 Implementation note: We split on slashes and then use
770 File::Spec::catfile to adjust to the local operating system.
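
For example, assuming C<localroot> is set to
F</home/ftp/pub/PAUSE/authors>:

    my $dst = $rf->local_path("id/A/AN/ANDK/CPAN-1.92_63.tar.gz");
    # on Unix: /home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz
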
772 =cut
774 sub local_path {
775 my($self,$path) = @_;
776 unless (defined $path) {
777 # seems like a degenerated case
778 return $self->localroot;
780 my @p = split m|/|, $path;
781 File::Spec->catfile($self->localroot,@p);
784 =head2 (void) $obj->lock
786 Locking is implemented with an C<mkdir> on a locking directory
787 (C<.lock> appended to $rfile).
789 =cut
791 sub lock {
792 my ($self) = @_;
793 # not using flock because it locks on filehandles instead of
794     # old school resources.
795 my $locked = $self->_is_locked and return;
796 my $rfile = $self->rfile;
797 # XXX need a way to allow breaking the lock
798 my $start = time;
799 my $locktimeout = $self->locktimeout || 600;
800 while (not mkdir "$rfile.lock") {
801 Time::HiRes::sleep 0.01;
802 if (time - $start > $locktimeout) {
803 die "Could not acquire lockdirectory '$rfile.lock': $!";
806 $self->_is_locked (1);
809 =head2 (void) $obj->merge ($other)
811 Bulk update of this object with another one. It's used to merge a
812 smaller and younger $other object into the current one. If this file
813 is a C<Z> file, then we normally do not merge in objects of type
814 C<delete>; this can be overridden by setting
815 keep_delete_objects_forever. But if we encounter an object of type
816 delete we delete the corresponding C<new> object if we have it.
818 If there is nothing to be merged, nothing is done.
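
For illustration, a sketch of what aggregate() does one step at a
time (the filenames are made up and must exist for new_from_file to
succeed): merging the smaller, younger 1h recentfile into the 1d one:

    my $rf_1d = File::Rsync::Mirror::Recentfile->new_from_file("RECENT-1d.yaml");
    my $rf_1h = File::Rsync::Mirror::Recentfile->new_from_file("RECENT-1h.yaml");
    $rf_1d->merge($rf_1h);  # locks both, merges, writes both files if anything changed
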
820 =cut
822 sub merge {
823 my($self, $other) = @_;
824 $self->_merge_sanitycheck ( $other );
825 $other->lock;
826 my $other_recent = $other->recent_events || [];
827 # $DB::single++ if $other->interval_secs eq "2" and grep {$_->{epoch} eq "999.999"} @$other_recent;
828 $self->lock;
829 $self->_merge_locked ( $other, $other_recent );
830 $self->unlock;
831 $other->unlock;
834 sub _merge_locked {
835 my($self, $other, $other_recent) = @_;
836 my $my_recent = $self->recent_events || [];
838 # calculate the target time span
839 my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
840 my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
841 my $oldest_allowed = 0;
842 my $something_done;
843 unless ($my_recent->[0]) {
844 # obstetrics
845 $something_done = 1;
847 if ($epoch) {
848 if (($other->dirtymark||0) ne ($self->dirtymark||0)) {
849 $oldest_allowed = 0;
850 $something_done = 1;
851 } elsif (my $merged = $self->merged) {
852 my $secs = $self->interval_secs();
853 $oldest_allowed = min($epoch - $secs, $merged->{epoch}||0);
854 if (@$other_recent and
855 _bigfloatlt($other_recent->[-1]{epoch}, $oldest_allowed)
857 $oldest_allowed = $other_recent->[-1]{epoch};
860 while (@$my_recent && _bigfloatlt($my_recent->[-1]{epoch}, $oldest_allowed)) {
861 pop @$my_recent;
862 $something_done = 1;
866 my %have_path;
867 my $other_recent_filtered = [];
868 for my $oev (@$other_recent) {
869 my $oevepoch = $oev->{epoch} || 0;
870 next if _bigfloatlt($oevepoch, $oldest_allowed);
871 my $path = $oev->{path};
872 next if $have_path{$path}++;
873 if ( $self->interval eq "Z"
874 and $oev->{type} eq "delete"
875 and ! $self->keep_delete_objects_forever
877 # do nothing
878 } else {
879 if (!$myepoch || _bigfloatgt($oevepoch, $myepoch)) {
880 $something_done = 1;
882 push @$other_recent_filtered, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
885 if ($something_done) {
886 $self->_merge_something_done ($other_recent_filtered, $my_recent, $other_recent, $other, \%have_path, $epoch);
890 sub _merge_something_done {
891 my($self, $other_recent_filtered, $my_recent, $other_recent, $other, $have_path, $epoch) = @_;
892 my $recent = [];
893 my $epoch_conflict = 0;
894 my $last_epoch;
895 ZIP: while (@$other_recent_filtered || @$my_recent) {
896 my $event;
897 if (!@$my_recent ||
898 @$other_recent_filtered && _bigfloatge($other_recent_filtered->[0]{epoch},$my_recent->[0]{epoch})) {
899 $event = shift @$other_recent_filtered;
900 } else {
901 $event = shift @$my_recent;
902 next ZIP if $have_path->{$event->{path}}++;
904 $epoch_conflict=1 if defined $last_epoch && $event->{epoch} eq $last_epoch;
905 $last_epoch = $event->{epoch};
906 push @$recent, $event;
908 if ($epoch_conflict) {
909 my %have_epoch;
910 for (my $i = $#$recent;$i>=0;$i--) {
911 my $epoch = $recent->[$i]{epoch};
912 if ($have_epoch{$epoch}++) {
913 while ($have_epoch{$epoch}) {
914 $epoch = _increase_a_bit($epoch);
916 $recent->[$i]{epoch} = $epoch;
917 $have_epoch{$epoch}++;
921 if (!$self->dirtymark || $other->dirtymark ne $self->dirtymark) {
922 $self->dirtymark ( $other->dirtymark );
924 $self->write_recent($recent);
925 $other->merged({
926 time => Time::HiRes::time, # not used anywhere
927 epoch => $recent->[0]{epoch},
928 into_interval => $self->interval, # not used anywhere
930 $other->write_recent($other_recent);
933 sub _merge_sanitycheck {
934 my($self, $other) = @_;
935 if ($self->interval_secs <= $other->interval_secs) {
936 die sprintf
938 "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
939 $self->interval_secs,
940 $other->interval_secs,
945 =head2 merged
947 Hashref denoting when, and at which epoch, this recentfile has been
948 merged into some other one.
950 =cut
952 sub merged {
953 my($self, $set) = @_;
954 if (defined $set) {
955 $self->_merged ($set);
957 my $merged = $self->_merged;
958 my $into;
959 if ($merged and $into = $merged->{into_interval} and defined $self->_interval) {
960 # sanity checks
961 if ($into eq $self->interval) {
962 require Carp;
963 Carp::cluck(sprintf
965 "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
966 $into,
967 $self->interval,
969 } elsif ($self->interval_secs($into) < $self->interval_secs) {
970 require Carp;
971 Carp::cluck(sprintf
973 "Warning: into_interval_secs[%s] smaller than own interval_secs[%s] on interval[%s]. Danger ahead.",
974 $self->interval_secs($into),
975 $self->interval_secs,
976 $self->interval,
980 $merged;
983 =head2 $hashref = $obj->meta_data
985 Returns the hashref of metadata that the server has to add to the
986 I<recentfile>.
988 =cut
990 sub meta_data {
991 my($self) = @_;
992 my $ret = $self->{meta};
993 for my $m (
994 "aggregator",
995 "canonize",
996 "comment",
997 "dirtymark",
998 "filenameroot",
999 "interval",
1000 "merged",
1001 "minmax",
1002 "protocol",
1003 "serializer_suffix",
1005 my $v = $self->$m;
1006 if (defined $v) {
1007 $ret->{$m} = $v;
1010 # XXX need to reset the Producer if I am a writer, keep it when I
1011 # am a reader
1012 $ret->{Producers} ||= {
1013 __PACKAGE__, "$VERSION", # stringified it looks better
1014 '$0', $0,
1015 'time', Time::HiRes::time,
1017 $ret->{dirtymark} ||= Time::HiRes::time;
1018 return $ret;
1021 =head2 $success = $obj->mirror ( %options )
1023 Mirrors the files in this I<recentfile> as reported by
1024 C<recent_events>. Options named C<after>, C<before>, C<max>, and
1025 C<skip-deletes> are passed through to the C<recent_events> call. The
1026 boolean option C<piecemeal>, if true, causes C<mirror> to only rsync
1027 C<max_files_per_connection> and keep track of the rsynced files so
1028 that future calls will rsync different files until all files are
1029 brought to sync.
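
A slave-side sketch; the options are the documented ones, and the loop
is just one plausible way to drive piecemeal mirroring (normally the
controlling F:R:M:Recent object does this for you):

    my $ok = $rf->mirror( "skip-deletes" => 1, max => 512 );

    # or piecemeal, a chunk of max_files_per_connection files per call:
    $rf->mirror( piecemeal => 1 ) until $rf->uptodate;
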
1031 =cut
1033 sub mirror {
1034 my($self, %options) = @_;
1035 my $trecentfile = $self->get_remote_recentfile_as_tempfile();
1036 $self->_use_tempfile (1);
1037 my %passthrough = map { ($_ => $options{$_}) } qw(before after max skip-deletes);
1038 my ($recent_events) = $self->recent_events(%passthrough);
1039 my(@error, @dlcollector); # download-collector: array containing paths we need
1040 my $first_item = 0;
1041 my $last_item = $#$recent_events;
1042 my $done = $self->done;
1043 my $pathdb = $self->_pathdb;
1044 ITEM: for my $i ($first_item..$last_item) {
1045 my $status = +{};
1046 $self->_mirror_item
1049 $recent_events,
1050 $last_item,
1051 $done,
1052 $pathdb,
1053 \@dlcollector,
1054 \%options,
1055 $status,
1056 \@error,
1058 last if $i == $last_item;
1059 if ($status->{mustreturn}){
1060 if ($self->_current_tempfile && ! $self->_current_tempfile_fh) {
1061 # looks like a bug somewhere else
1062 my $t = $self->_current_tempfile;
1063 unlink $t or die "Could not unlink '$t': $!";
1064 $self->_current_tempfile(undef);
1066 return;
1069 if (@dlcollector) {
1070 my $success = eval { $self->_mirror_dlcollector (\@dlcollector,$pathdb,$recent_events);};
1071 if (!$success || $@) {
1072 warn "Warning: Unknown error while mirroring: $@";
1073 push @error, $@;
1074 sleep 1;
1077 if ($self->verbose) {
1078 print STDERR "DONE\n";
1080 # once we've gone to the end we consider ourselves free of obligations
1081 $self->unseed;
1082 $self->_mirror_unhide_tempfile ($trecentfile);
1083 $self->_mirror_perform_delayed_ops;
1084 return !@error;
1087 sub _mirror_item {
1088 my($self,
1090 $recent_events,
1091 $last_item,
1092 $done,
1093 $pathdb,
1094 $dlcollector,
1095 $options,
1096 $status,
1097 $error,
1098 ) = @_;
1099 my $recent_event = $recent_events->[$i];
1100 return if $done->covered ( $recent_event->{epoch} );
1101 if ($pathdb) {
1102 my $rec = $pathdb->{$recent_event->{path}};
1103 if ($rec && $rec->{recentepoch}) {
1104 if (_bigfloatgt
1105 ( $rec->{recentepoch}, $recent_event->{epoch} )){
1106 $done->register ($recent_events, [$i]);
1107 return;
1111 my $dst = $self->local_path($recent_event->{path});
1112 if ($recent_event->{type} eq "new"){
1113 $self->_mirror_item_new
1115 $dst,
1117 $last_item,
1118 $recent_events,
1119 $recent_event,
1120 $dlcollector,
1121 $pathdb,
1122 $status,
1123 $error,
1124 $options,
1126 } elsif ($recent_event->{type} eq "delete") {
1127 my $activity;
1128 if ($options->{'skip-deletes'}) {
1129 $activity = "skipped";
1130 } else {
1131 if (! -e $dst) {
1132 $activity = "not_found";
1133 } elsif (-l $dst or not -d _) {
1134 $self->delayed_operations->{unlink}{$dst}++;
1135 $activity = "deleted";
1136 } else {
1137 $self->delayed_operations->{rmdir}{$dst}++;
1138 $activity = "deleted";
1141 $done->register ($recent_events, [$i]);
1142 if ($pathdb) {
1143 $self->_mirror_register_path($pathdb,[$recent_event],$activity);
1145 } else {
1146 warn "Warning: invalid upload type '$recent_event->{type}'";
1150 sub _mirror_item_new {
1151 my($self,
1152 $dst,
1154 $last_item,
1155 $recent_events,
1156 $recent_event,
1157 $dlcollector,
1158 $pathdb,
1159 $status,
1160 $error,
1161 $options,
1162 ) = @_;
1163 if ($self->verbose) {
1164 my $doing = -e $dst ? "Sync" : "Get";
1165 printf STDERR
1167 "%-4s %d (%d/%d/%s) %s ... ",
1168 $doing,
1169 time,
1170 1+$i,
1171 1+$last_item,
1172 $self->interval,
1173 $recent_event->{path},
1176 my $max_files_per_connection = $self->max_files_per_connection || 42;
1177 my $success;
1178 if ($self->verbose) {
1179 print STDERR "\n";
1181 push @$dlcollector, { rev => $recent_event, i => $i };
1182 if (@$dlcollector >= $max_files_per_connection) {
1183 $success = eval {$self->_mirror_dlcollector ($dlcollector,$pathdb,$recent_events);};
1184 my $sleep = $self->sleep_per_connection;
1185 $sleep = 0.42 unless defined $sleep;
1186 Time::HiRes::sleep $sleep;
1187 if ($options->{piecemeal}) {
1188 $status->{mustreturn} = 1;
1189 return;
1191 } else {
1192 return;
1194 if (!$success || $@) {
1195 warn "Warning: Error while mirroring: $@";
1196 push @$error, $@;
1197 sleep 1;
1199 if ($self->verbose) {
1200 print STDERR "DONE\n";
1204 sub _mirror_dlcollector {
1205 my($self,$xcoll,$pathdb,$recent_events) = @_;
1206 my $success = $self->mirror_path([map {$_->{rev}{path}} @$xcoll]);
1207 if ($pathdb) {
1208 $self->_mirror_register_path($pathdb,[map {$_->{rev}} @$xcoll],"rsync");
1210 $self->done->register($recent_events, [map {$_->{i}} @$xcoll]);
1211 @$xcoll = ();
1212 return $success;
1215 sub _mirror_register_path {
1216 my($self,$pathdb,$coll,$activity) = @_;
1217 my $time = time;
1218 for my $item (@$coll) {
1219 $pathdb->{$item->{path}} =
1221 recentepoch => $item->{epoch},
1222 ($activity."_on") => $time,
1227 sub _mirror_unhide_tempfile {
1228 my($self, $trecentfile) = @_;
1229 my $rfile = $self->rfile;
1230 if (rename $trecentfile, $rfile) {
1231 # warn "DEBUG: renamed '$trecentfile' to '$rfile'";
1232 } else {
1233 require Carp;
1234 Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
1236 $self->_use_tempfile (0);
1237 if (my $ctfh = $self->_current_tempfile_fh) {
1238 $ctfh->unlink_on_destroy (0);
1239 $self->_current_tempfile_fh (undef);
1243 sub _mirror_perform_delayed_ops {
1244 my($self) = @_;
1245 my $delayed = $self->delayed_operations;
1246 for my $dst (keys %{$delayed->{unlink}}) {
1247 unless (unlink $dst) {
1248 require Carp;
1249 Carp::cluck ( "Warning: Error while unlinking '$dst': $!" );
1251 delete $delayed->{unlink}{$dst};
1253 for my $dst (keys %{$delayed->{rmdir}}) {
1254 unless (rmdir $dst) {
1255 require Carp;
1256 Carp::cluck ( "Warning: Error on rmdir '$dst': $!" );
1258 delete $delayed->{rmdir}{$dst};
1262 =head2 $success = $obj->mirror_path ( $arrref | $path )
1264 If the argument is a scalar it is treated as a path. The remote path
1265 is mirrored into the local copy. $path is the path found in the
1266 I<recentfile>, i.e. it is relative to the root directory of the
1267 mirror.
1269 If the argument is an array reference then all elements are treated as
1270 a path below the current tree and all are rsynced with a single
1271 command (and a single connection).
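
For example (the paths are made up):

    # a single path
    $rf->mirror_path("id/A/AA/AAA/Foo-1.00.tar.gz");

    # several paths over a single rsync connection
    $rf->mirror_path([ "id/A/AA/AAA/Foo-1.00.tar.gz",
                       "id/A/AA/AAA/CHECKSUMS",
                     ]);
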
1273 =cut
1275 sub mirror_path {
1276 my($self,$path) = @_;
1277 # XXX simplify the two branches such that $path is treated as
1278 # [$path] maybe even demand the argument as an arrayref to
1279 # simplify docs and code. (rsync-over-recentfile-2.pl uses the
1280 # interface)
1281 if (ref $path and ref $path eq "ARRAY") {
1282 my $dst = $self->localroot;
1283 mkpath dirname $dst;
1284 my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
1285 lc $self->filenameroot,
1287 TMPDIR => 1,
1288 UNLINK => 0,
1290 for my $p (@$path) {
1291 print $fh $p, "\n";
1293 $fh->flush;
1294 $fh->unlink_on_destroy(1);
1295 my $gaveup = 0;
1296 my $retried = 0;
1297 while (!$self->rsync->exec
1299 src => join("/",
1300 $self->remoteroot,
1302 dst => $dst,
1303 'files-from' => $fh->filename,
1304 )) {
1305 my(@err) = $self->rsync->err;
1306 if ($self->ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1307 if ($self->verbose) {
1308 warn "Info: ignoring link_stat error '@err'";
1310 return 1;
1312 $self->register_rsync_error (@err);
1313 if (++$retried >= 3) {
1314 warn "XXX giving up.";
1315 $gaveup = 1;
1316 last;
1319 unless ($gaveup) {
1320 $self->un_register_rsync_error ();
1322 } else {
1323 my $dst = $self->local_path($path);
1324 mkpath dirname $dst;
1325 while (!$self->rsync->exec
1327 src => join("/",
1328 $self->remoteroot,
1329 $path
1331 dst => $dst,
1332 )) {
1333 my(@err) = $self->rsync->err;
1334 if ($self->ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1335 if ($self->verbose) {
1336 warn "Info: ignoring link_stat error '@err'";
1338 return 1;
1340 $self->register_rsync_error (@err);
1342 $self->un_register_rsync_error ();
1344 return 1;
1347 sub _my_current_rfile {
1348 my($self) = @_;
1349 my $rfile;
1350 if ($self->_use_tempfile) {
1351 $rfile = $self->_current_tempfile;
1352 } else {
1353 $rfile = $self->rfile;
1355 return $rfile;
1358 =head2 $path = $obj->naive_path_normalize ($path)
1360 Takes an absolute unix style path as argument and canonicalizes it to
1361 a shorter path if possible, removing double slashes and references
1362 to C<../> directories to get a shorter, unambiguous path. This is
1363 used to simplify the code that determines whether a file passed to
1364 C<update()> is indeed below our C<localroot>.
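
For example:

    print $rf->naive_path_normalize("/home/ftp//pub/PAUSE/authors/id/../id/A/");
    # prints: /home/ftp/pub/PAUSE/authors/id/A
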
1366 =cut
1368 sub naive_path_normalize {
1369 my($self,$path) = @_;
1370 $path =~ s|/+|/|g;
1371 1 while $path =~ s|/[^/]+/\.\./|/|;
1372 $path =~ s|/$||;
1373 $path;
1376 =head2 $ret = $obj->read_recent_1 ( $data )
1378 Delegate of C<recent_events()> on protocol 1
1380 =cut
1382 sub read_recent_1 {
1383 my($self, $data) = @_;
1384 return $data->{recent};
1387 =head2 $array_ref = $obj->recent_events ( %options )
1389 Note: the code relies on the resource being written atomically. We
1390 cannot lock because we may have no write access. If the caller has
1391 write access (eg. aggregate() or update()), it has to care for any
1392 necessary locking and it MUST write atomically.
1394 If C<$options{after}> is specified, only file events after this
1395 timestamp are returned.
1397 If C<$options{before}> is specified, only file events before this
1398 timestamp are returned.
1400 If C<$options{'skip-deletes'}> is specified, no files-to-be-deleted
1401 will be returned.
1403 If C<$options{max}> is specified only a maximum of this many events is
1404 returned.
1406 If C<$options{contains}> is specified the value must be a hash
1407 reference containing a query. The query may contain the keys C<epoch>,
1408 C<path>, and C<type>. Each represents a condition that must be met. If
1409 there is more than one such key, the conditions are ANDed.
1411 If C<$options{info}> is specified, it must be a hashref. This hashref
1412 will be filled with metadata about the unfiltered recent_events of
1413 this object, in key C<first> there is the first item, in key C<last>
1414 is the last.
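
For example, to fetch the C<new> events of the last hour (epochs are
hi-res unix timestamps):

    my $info   = {};
    my $events = $rf->recent_events
      (
       after          => time - 3600,
       "skip-deletes" => 1,
       info           => $info,
      );
    for my $ev (@$events) {
        printf "%s %s %s\n", $ev->{epoch}, $ev->{type}, $ev->{path};
    }
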
1416 =cut
1418 sub recent_events {
1419 my ($self, %options) = @_;
1420 my $info = $options{info};
1421 if ($self->is_slave) {
1422 # XXX seems dubious, might produce tempfiles without removing them?
1423 $self->get_remote_recentfile_as_tempfile;
1425 my $rfile_or_tempfile = $self->_my_current_rfile or return [];
1426 -e $rfile_or_tempfile or return [];
1427 my $suffix = $self->serializer_suffix;
1428 my ($data) = eval {
1429 $self->_try_deserialize
1431 $suffix,
1432 $rfile_or_tempfile,
1435 my $err = $@;
1436 if ($err or !$data) {
1437 return [];
1439 my $re;
1440 if (reftype $data eq 'ARRAY') { # protocol 0
1441 $re = $data;
1442 } else {
1443 $re = $self->_recent_events_protocol_x
1445 $data,
1446 $rfile_or_tempfile,
1449 return $re unless grep {defined $options{$_}} qw(after before contains max);
1450 $self->_recent_events_handle_options ($re, \%options);
1453 # File::Rsync::Mirror::Recentfile::_recent_events_handle_options
1454 sub _recent_events_handle_options {
1455 my($self, $re, $options) = @_;
1456 my $last_item = $#$re;
1457 my $info = $options->{info};
1458 if ($info) {
1459 $info->{first} = $re->[0];
1460 $info->{last} = $re->[-1];
1462 if (defined $options->{after}) {
1463 if ($re->[0]{epoch} > $options->{after}) {
1464 if (
1465 my $f = first
1466 {$re->[$_]{epoch} <= $options->{after}}
1467 0..$#$re
1469 $last_item = $f-1;
1471 } else {
1472 $last_item = -1;
1475 my $first_item = 0;
1476 if (defined $options->{before}) {
1477 if ($re->[0]{epoch} > $options->{before}) {
1478 if (
1479 my $f = first
1480 {$re->[$_]{epoch} < $options->{before}}
1481 0..$last_item
1483 $first_item = $f;
1485 } else {
1486 $first_item = 0;
1489 if (0 != $first_item || -1 != $last_item) {
1490 @$re = splice @$re, $first_item, 1+$last_item-$first_item;
1492 if ($options->{'skip-deletes'}) {
1493 @$re = grep { $_->{type} ne "delete" } @$re;
1495 if (my $contopt = $options->{contains}) {
1496 my $seen_allowed = 0;
1497 for my $allow (qw(epoch path type)) {
1498 if (exists $contopt->{$allow}) {
1499 $seen_allowed++;
1500 my $v = $contopt->{$allow};
1501 @$re = grep { $_->{$allow} eq $v } @$re;
1504 if (keys %$contopt > $seen_allowed) {
1505 require Carp;
1506 Carp::confess
1507 (sprintf "unknown query: %s", join ", ", %$contopt);
1510 if ($options->{max} && @$re > $options->{max}) {
1511 @$re = splice @$re, 0, $options->{max};
1513 $re;
1516 sub _recent_events_protocol_x {
1517 my($self,
1518 $data,
1519 $rfile_or_tempfile,
1520 ) = @_;
1521 my $meth = sprintf "read_recent_%d", $data->{meta}{protocol};
1522 # we may be reading meta for the first time
1523 while (my($k,$v) = each %{$data->{meta}}) {
1524 next if $k ne lc $k; # "Producers"
1525 next if defined $self->$k;
1526 $self->$k($v);
1528 my $re = $self->$meth ($data);
1529 my @stat = stat $rfile_or_tempfile or die "Cannot stat '$rfile_or_tempfile': $!";
1530 my $minmax = { mtime => $stat[9] };
1531 if (@$re) {
1532 $minmax->{min} = $re->[-1]{epoch};
1533 $minmax->{max} = $re->[0]{epoch};
1535 $self->minmax ( $minmax );
1536 return $re;
1539 sub _try_deserialize {
1540 my($self,
1541 $suffix,
1542 $rfile_or_tempfile,
1543 ) = @_;
1544 if ($suffix eq ".yaml") {
1545 require YAML::Syck;
1546 YAML::Syck::LoadFile($rfile_or_tempfile);
1547 } elsif ($HAVE->{"Data::Serializer"}) {
1548 my $serializer = Data::Serializer->new
1549 ( serializer => $serializers{$suffix} );
1550 my $serialized = do
1552 open my $fh, $rfile_or_tempfile or die "Could not open: $!";
1553 local $/;
1554 <$fh>;
1556 $serializer->raw_deserialize($serialized);
1557 } else {
1558 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
1562 sub _refresh_internals {
1563 my($self, $dst) = @_;
1564 my $class = ref $self;
1565 my $rfpeek = $class->new_from_file ($dst);
1566 for my $acc (qw(
1567 _merged
1568 minmax
1569 )) {
1570 $self->$acc ( $rfpeek->$acc );
1572 my $old_dirtymark = $self->dirtymark;
1573 my $new_dirtymark = $rfpeek->dirtymark;
1574 if ($old_dirtymark && $new_dirtymark && $new_dirtymark ne $old_dirtymark) {
1575 $self->done->reset;
1576 $self->dirtymark ( $new_dirtymark );
1577 $self->seed;
1581 =head2 $ret = $obj->rfilename
1583 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1584 a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
1586 =cut
1588 sub rfilename {
1589 my($self) = @_;
1590 my $file = sprintf("%s-%s%s",
1591 $self->filenameroot,
1592 $self->interval,
1593 $self->serializer_suffix,
1595 return $file;
1598 =head2 $str = $self->remote_dir
1600 The directory we are mirroring from.
1602 =cut
1604 sub remote_dir {
1605 my($self, $set) = @_;
1606 if (defined $set) {
1607 $self->_remote_dir ($set);
1609 my $x = $self->_remote_dir;
1610 $self->is_slave (1);
1611 return $x;
1614 =head2 $str = $obj->remoteroot
1616 =head2 (void) $obj->remoteroot ( $set )
1618 Get/Set the composed prefix needed when rsyncing from a remote module.
1619 If remote_host, remote_module, and remote_dir are set, it is composed
1620 from these.
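
For example, with the reader settings from the SYNOPSIS (remote_host
C<pause.perl.org>, remote_module C<authors>, remote_dir empty):

    print $rf->remoteroot, "\n";   # pause.perl.org::authors/
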
1622 =cut
1624 sub remoteroot {
1625 my($self, $set) = @_;
1626 if (defined $set) {
1627 $self->_remoteroot($set);
1629 my $remoteroot = $self->_remoteroot;
1630 unless (defined $remoteroot) {
1631 $remoteroot = sprintf
1633 "%s%s%s",
1634 defined $self->remote_host ? ($self->remote_host."::") : "",
1635 defined $self->remote_module ? ($self->remote_module."/") : "",
1636 defined $self->remote_dir ? $self->remote_dir : "",
1638 $self->_remoteroot($remoteroot);
1640 return $remoteroot;
1643 =head2 (void) $obj->split_rfilename ( $recentfilename )
1645 Inverse method to C<rfilename>. C<$recentfilename> is a plain filename
1646 of the pattern
1648 $filenameroot-$interval$serializer_suffix
1650 e.g.
1652 RECENT-1M.yaml
1654 This filename is split into its parts and the parts are fed to the
1655 object itself.
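
For example:

    my $rf = File::Rsync::Mirror::Recentfile->new;
    $rf->split_rfilename("RECENT-1M.yaml");
    # now filenameroot is "RECENT", interval is "1M",
    # and serializer_suffix is ".yaml"
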
1657 =cut
1659 sub split_rfilename {
1660 my($self, $rfname) = @_;
1661 my($splitter) = qr(^(.+)-([^-\.]+)(\.[^\.]+));
1662 if (my($f,$i,$s) = $rfname =~ $splitter) {
1663 $self->filenameroot ($f);
1664 $self->interval ($i);
1665 $self->serializer_suffix ($s);
1666 } else {
1667 die "Alert: cannot split '$rfname', doesn't match '$splitter'";
1669 return;
1672 =head2 my $rfile = $obj->rfile
1674 Returns the full path of the I<recentfile>
1676 =cut
1678 sub rfile {
1679 my($self) = @_;
1680 my $rfile = $self->_rfile;
1681 return $rfile if defined $rfile;
1682 $rfile = File::Spec->catfile
1683 ($self->localroot,
1684 $self->rfilename,
1686 $self->_rfile ($rfile);
1687 return $rfile;
1690 =head2 $rsync_obj = $obj->rsync
1692 The File::Rsync object that this object uses for communicating with an
1693 upstream server.
1695 =cut
1697 sub rsync {
1698 my($self) = @_;
1699 my $rsync = $self->_rsync;
1700 unless (defined $rsync) {
1701 my $rsync_options = $self->rsync_options || {};
1702 if ($HAVE->{"File::Rsync"}) {
1703 $rsync = File::Rsync->new($rsync_options);
1704 $self->_rsync($rsync);
1705 } else {
1706 die "File::Rsync required for rsync operations. Cannot continue";
1709 return $rsync;
1712 =head2 (void) $obj->register_rsync_error(@err)
1714 =head2 (void) $obj->un_register_rsync_error()
1716 Register_rsync_error is called whenever the File::Rsync object fails
1717 on an exec (say, connection doesn't succeed). It issues a warning and
1718 sleeps for an increasing amount of time. Un_register_rsync_error
1719 resets the error count. See also accessor C<max_rsync_errors>.
1721 =cut
1724 my $no_success_count = 0;
1725 my $no_success_time = 0;
1726 sub register_rsync_error {
1727 my($self, @err) = @_;
1728 chomp @err;
1729 $no_success_time = time;
1730 $no_success_count++;
1731 my $max_rsync_errors = $self->max_rsync_errors;
1732 $max_rsync_errors = MAX_INT unless defined $max_rsync_errors;
1733 if ($max_rsync_errors>=0 && $no_success_count >= $max_rsync_errors) {
1734 require Carp;
1735 Carp::confess
1737 sprintf
1739 "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
1740 $self->interval,
1741 join(" ",@err),
1742 $no_success_count,
1745 my $sleep = 12 * $no_success_count;
1746 $sleep = 300 if $sleep > 300;
1747 require Carp;
1748 Carp::cluck
1749 (sprintf
1751 "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
1752 scalar(localtime($no_success_time)),
1753 $self->interval,
1754 join(" ",@err),
1755 $sleep,
1757 sleep $sleep
1759 sub un_register_rsync_error {
1760 my($self) = @_;
1761 $no_success_time = 0;
1762 $no_success_count = 0;
1766 =head2 $clone = $obj->_sparse_clone
1768 Clones just as much of itself as does not hurt. Experimental
1769 method.
1771 Note: what fits better: sparse or shallow? Other suggestions?
1773 =cut
1775 sub _sparse_clone {
1776 my($self) = @_;
1777 my $new = bless {}, ref $self;
1778 for my $m (qw(
1779 _interval
1780 _localroot
1781 _remoteroot
1782 _rfile
1783 _use_tempfile
1784 aggregator
1785 filenameroot
1786 is_slave
1787 max_files_per_connection
1788 protocol
1789 rsync_options
1790 serializer_suffix
1791 sleep_per_connection
1792 tempdir
1793 verbose
1794 )) {
1795 my $o = $self->$m;
1796 $o = Storable::dclone $o if ref $o;
1797 $new->$m($o);
1799 $new;
1802 =head2 $boolean = OBJ->ttl_reached ()
1804 =cut
1806 sub ttl_reached {
1807 my($self) = @_;
1808 my $have_mirrored = $self->have_mirrored || 0;
1809 my $now = Time::HiRes::time;
1810 my $ttl = $self->ttl;
1811 $ttl = 24.2 unless defined $ttl;
1812 if ($now > $have_mirrored + $ttl) {
1813 return 1;
1815 return 0;
1818 =head2 (void) $obj->unlock()
1820 Unlocking is implemented with an C<rmdir> on a locking directory
1821 (C<.lock> appended to $rfile).
1823 =cut
1825 sub unlock {
1826 my($self) = @_;
1827 return unless $self->_is_locked;
1828 my $rfile = $self->rfile;
1829 rmdir "$rfile.lock";
1830 $self->_is_locked (0);
1833 =head2 unseed
1835 Sets this recentfile in the state of not 'seeded'.
1837 =cut
1838 sub unseed {
1839 my($self) = @_;
1840 $self->seeded(0);
1843 =head2 $ret = $obj->update ($path, $type)
1845 =head2 $ret = $obj->update ($path, "new", $dirty_epoch)
1847 =head2 $ret = $obj->update ()
1849 Enter one file into the local I<recentfile>. $path is the (usually
1850 absolute) path. If the path is outside I<our> tree, then it is
1851 ignored.
1853 C<$type> is one of C<new> or C<delete>.
1855 Events of type C<new> may set $dirty_epoch. $dirty_epoch is normally
1856 not used and the epoch is calculated by the update() routine itself
1857 based on current time. But if there is the demand to insert a
1858 not-so-current file into the dataset, then the caller sets
1859 $dirty_epoch. This causes the epoch of the registered event to become
1860 $dirty_epoch or -- if the exact value given is already taken -- a tiny
1861 bit more. As compensation the dirtymark of the whole dataset is set to
1862 now or the current epoch, whichever is higher. Note: setting the
1863 dirty_epoch to the future is prohibited as it's very unlikely to be
1864 intended: it might well wreak havoc with the index files.
1866 The new file event is unshifted (or, if dirty_epoch is set, inserted
1867 at the place it belongs to, according to the rule to have a sequence
1868 of strictly decreasing timestamps) to the array of recent_events and
1869 the array is shortened to the length of the timespan allowed. This is
1870 usually the timespan specified by the interval of this recentfile but
1871 as long as this recentfile has not been merged to another one, the
1872 timespan may grow without bounds.
1874 The third form runs an update without inserting a new file. This may
1875 be desired to truncate a recentfile.
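
For example, on a writer (the paths and the dirty epoch are made up):

    # ordinary upload; the epoch is derived from the current time
    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AA/AAA/Foo-1.00.tar.gz", "new");

    # deletion
    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AA/AAA/Foo-0.99.tar.gz", "delete");

    # inject an older file with an explicit epoch; this also sets the dirtymark
    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AA/AAA/Old-0.01.tar.gz",
                "new", 1228258911.265);

    # run the truncation housekeeping without registering anything
    $rf->update;
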
1877 =cut
1878 sub _epoch_monotonically_increasing {
1879 my($self,$epoch,$recent) = @_;
1880 return $epoch unless @$recent; # the first one goes unoffended
1881 if (_bigfloatgt("".$epoch,$recent->[0]{epoch})) {
1882 return $epoch;
1883 } else {
1884 return _increase_a_bit($recent->[0]{epoch});
1887 sub update {
1888 my($self,$path,$type,$dirty_epoch) = @_;
1889 if (defined $path or defined $type or defined $dirty_epoch) {
1890 die "update called without path argument" unless defined $path;
1891 die "update called without type argument" unless defined $type;
1892 die "update called with illegal type argument: $type" unless $type =~ /(new|delete)/;
1893 # since we have keep_delete_objects_forever we must let them inject delete objects too:
1894 #die "update called with \$type=$type and \$dirty_epoch=$dirty_epoch; ".
1895 # "dirty_epoch only allowed with type=new" if defined $dirty_epoch and $type ne "new";
1896 my $canonmeth = $self->canonize;
1897 unless ($canonmeth) {
1898 $canonmeth = "naive_path_normalize";
1900 $path = $self->$canonmeth($path);
1902 my $lrd = $self->localroot;
1903 $self->lock;
1904 # you must calculate the time after having locked, of course
1905 my $now = Time::HiRes::time;
1906 my $interval = $self->interval;
1907 my $secs = $self->interval_secs();
1908 my $recent = $self->recent_events;
1910 my $epoch;
1911 if (defined $dirty_epoch && _bigfloatgt($now,$dirty_epoch)) {
1912 $epoch = $dirty_epoch;
1913 } else {
1914 $epoch = $self->_epoch_monotonically_increasing($now,$recent);
1917 $recent ||= [];
1918 my $oldest_allowed = 0;
1919 my $merged = $self->merged;
1920 if ($merged->{epoch}) {
1921 my $virtualnow = _bigfloatmax($now,$epoch);
1922 # for the lower bound I think we need no big math, we calc already
1923 $oldest_allowed = min($virtualnow - $secs, $merged->{epoch}, $epoch);
1924 } else {
1925 # as long as we are not merged at all, no limits!
1927 my $something_done = 0;
1928 TRUNCATE: while (@$recent) {
1929 # $DB::single++ unless defined $oldest_allowed;
1930 if (_bigfloatlt($recent->[-1]{epoch}, $oldest_allowed)) {
1931 pop @$recent;
1932 $something_done = 1;
1933 } else {
1934 last TRUNCATE;
1937 if (defined $path && $path =~ s|^\Q$lrd\E||) {
1938 $path =~ s|^/||;
1939 my $splicepos;
1940 # remove the older duplicates of this $path, irrespective of $type:
1941 if (defined $dirty_epoch) {
1942 my $ctx = $self->_update_with_dirty_epoch($path,$recent,$epoch);
1943 $recent = $ctx->{recent};
1944 $splicepos = $ctx->{splicepos};
1945 $epoch = $ctx->{epoch};
1946 my $dirtymark = $self->dirtymark;
1947 my $new_dm = $now;
1948 if (_bigfloatgt($epoch, $now)) { # just in case we had to increase it
1949 $new_dm = $epoch;
1951 $self->dirtymark($new_dm);
1952 my $merged = $self->merged;
1953 if (not defined $merged->{epoch} or _bigfloatlt($epoch,$merged->{epoch})) {
1954 $self->merged(+{});
1956 } else {
1957 $recent = [ grep { $_->{path} ne $path } @$recent ];
1958 $splicepos = 0;
1960 if (defined $splicepos) {
1961 splice @$recent, $splicepos, 0, { epoch => $epoch, path => $path, type => $type };
1963 $something_done = 1;
1966 $self->write_recent($recent) if $something_done;
1967 $self->_assert_symlink;
1968 $self->unlock;
1971 sub _update_with_dirty_epoch {
1972 my($self,$path,$recent,$epoch) = @_;
1973 my $splicepos;
1974 my $new_recent = [];
1975 if (grep { $_->{path} ne $path } @$recent) {
1976 my $cancel = 0;
1977 KNOWN_EVENT: for my $i (0..$#$recent) {
1978 if ($recent->[$i]{path} eq $path) {
1979 if ($recent->[$i]{epoch} eq $epoch) {
1980 # nothing to do
1981 $cancel = 1;
1982 last KNOWN_EVENT;
1984 } else {
1985 push @$new_recent, $recent->[$i];
1988 @$recent = @$new_recent unless $cancel;
1990 if (!exists $recent->[0] or _bigfloatgt($epoch,$recent->[0]{epoch})) {
1991 $splicepos = 0;
1992 } elsif (_bigfloatlt($epoch,$recent->[-1]{epoch})) {
1993 $splicepos = @$recent;
1994 } else {
1995 RECENT: for my $i (0..$#$recent) {
1996 my $ev = $recent->[$i];
1997 if ($epoch eq $recent->[$i]{epoch}) {
1998 $epoch = _increase_a_bit($epoch, $i ? $recent->[$i-1]{epoch} : undef);
2000 if (_bigfloatgt($epoch,$recent->[$i]{epoch})) {
2001 $splicepos = $i;
2002 last RECENT;
2006 return {
2007 recent => $recent,
2008 splicepos => $splicepos,
2009 epoch => $epoch,
2013 =head2 seed
2015 Sets this recentfile in the state of 'seeded' which means it has to
2016 re-evaluate its uptodateness.
2018 =cut
2019 sub seed {
2020 my($self) = @_;
2021 $self->seeded(1);
2024 =head2 seeded
2026 Tells if the recentfile is in the state 'seeded'.
2028 =cut
2029 sub seeded {
2030 my($self, $set) = @_;
2031 if (defined $set) {
2032 $self->_seeded ($set);
2034 my $x = $self->_seeded;
2035 unless (defined $x) {
2036 $x = 0;
2037 $self->_seeded ($x);
2039 return $x;
2042 =head2 uptodate
2044 True if this object has mirrored the complete interval covered by the
2045 current recentfile.
2047 =cut
2048 sub uptodate {
2049 my($self) = @_;
2050 my $uptodate;
2051 my $why;
2052 if ($self->_uptodateness_ever_reached and not $self->seeded) {
2053 $why = "saturated";
2054 $uptodate = 1;
2056 # it's too easy to misconfigure ttl and related timings and then
2057 # never reach uptodateness, so disabled 2009-03-22
2058 if (0 and not defined $uptodate) {
2059 if ($self->ttl_reached){
2060 $why = "ttl_reached returned true, so we are not uptodate";
2061 $uptodate = 0 ;
2064 unless (defined $uptodate) {
2065 # look if recentfile has unchanged timestamp
2066 my $minmax = $self->minmax;
2067 if (exists $minmax->{mtime}) {
2068 my $rfile = $self->_my_current_rfile;
2069 my @stat = stat $rfile;
2070 my $mtime = $stat[9];
2071 if (defined $mtime && defined $minmax->{mtime} && $mtime > $minmax->{mtime}) {
2072 $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
2073 $uptodate = 0;
2074 } else {
2075 my $covered = $self->done->covered(@$minmax{qw(max min)});
2076 $why = sprintf "minmax covered[%s], so we return that", defined $covered ? $covered : "UNDEF";
2077 $uptodate = $covered;
2081 unless (defined $uptodate) {
2082 $why = "fallthrough, so not uptodate";
2083 $uptodate = 0;
2085 if ($uptodate) {
2086 $self->_uptodateness_ever_reached(1);
2088 my $remember =
2090 uptodate => $uptodate,
2091 why => $why,
2093 $self->_remember_last_uptodate_call($remember);
2094 return $uptodate;
2097 =head2 $obj->write_recent ($recent_files_arrayref)
2099 Writes a I<recentfile> that reflects the current state of the tree,
2100 limited by the current interval.
2102 =cut
2103 sub _resort {
2104 my($self) = @_;
2105 @{$_[1]} = sort { _bigfloatcmp($b->{epoch},$a->{epoch}) } @{$_[1]};
2106 return;
2108 sub write_recent {
2109 my ($self,$recent) = @_;
2110 die "write_recent called without argument" unless defined $recent;
2111 my $Last_epoch;
2112 SANITYCHECK: for my $i (0..$#$recent) {
2113 if (defined($Last_epoch) and _bigfloatge($recent->[$i]{epoch},$Last_epoch)) {
2114 require Carp;
2115 Carp::confess(sprintf "Warning: disorder '%s'>='%s', re-sorting %s\n",
2116 $recent->[$i]{epoch}, $Last_epoch, $self->interval);
2117 # you may want to:
2118 # $self->_resort($recent);
2119 # last SANITYCHECK;
2121 $Last_epoch = $recent->[$i]{epoch};
2123 my $minmax = $self->minmax;
2124 if (!defined $minmax->{max} || _bigfloatlt($minmax->{max},$recent->[0]{epoch})) {
2125 $minmax->{max} = $recent->[0]{epoch};
2127 if (!defined $minmax->{min} || _bigfloatlt($minmax->{min},$recent->[-1]{epoch})) {
2128 $minmax->{min} = $recent->[-1]{epoch};
2130 $self->minmax($minmax);
2131 my $meth = sprintf "write_%d", $self->protocol;
2132 $self->$meth($recent);
2135 =head2 $obj->write_0 ($recent_files_arrayref)
2137 Delegate of C<write_recent()> on protocol 0
2139 =cut
2141 sub write_0 {
2142 my ($self,$recent) = @_;
2143 my $rfile = $self->rfile;
2144 YAML::Syck::DumpFile("$rfile.new",$recent);
2145 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2148 =head2 $obj->write_1 ($recent_files_arrayref)
2150 Delegate of C<write_recent()> on protocol 1
2152 =cut
2154 sub write_1 {
2155 my ($self,$recent) = @_;
2156 my $rfile = $self->rfile;
2157 my $suffix = $self->serializer_suffix;
2158 my $data = {
2159 meta => $self->meta_data,
2160 recent => $recent,
2162 my $serialized;
2163 if ($suffix eq ".yaml") {
2164 $serialized = YAML::Syck::Dump($data);
2165 } elsif ($HAVE->{"Data::Serializer"}) {
2166 my $serializer = Data::Serializer->new
2167 ( serializer => $serializers{$suffix} );
2168 $serialized = $serializer->raw_serialize($data);
2169 } else {
2170 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
2172 open my $fh, ">", "$rfile.new" or die "Could not open >'$rfile.new': $!";
2173 print $fh $serialized;
2174 close $fh or die "Could not close '$rfile.new': $!";
2175 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2178 BEGIN {
2179 my $nq = qr/[^"]+/; # non-quotes
2180 my @pod_lines =
2181 split /\n/, <<'=cut'; %serializers = map { my @x = /"($nq)"\s+=>\s+"($nq)"/; @x } grep {s/^=item\s+C<<\s+(.+)\s+>>$/$1/} @pod_lines; }
2183 =head1 SERIALIZERS
2185 The following suffixes are supported and trigger the use of these
2186 serializers:
2188 =over 4
2190 =item C<< ".yaml" => "YAML::Syck" >>
2192 =item C<< ".json" => "JSON" >>
2194 =item C<< ".sto" => "Storable" >>
2196 =item C<< ".dd" => "Data::Dumper" >>
2198 =back
2200 =cut
2202 BEGIN {
2203 my @pod_lines =
2204 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
2206 =head1 INTERVAL SPEC
2208 An interval spec is a primitive way to express time spans. Normally it
2209 is composed from an integer and a letter.
2211 As a special case, a string that consists only of the single letter
2212 C<Z>, stands for MAX_INT seconds.
2214 The following letters express the specified number of seconds:
2216 =over 4
2218 =item C<< s => 1 >>
2220 =item C<< m => 60 >>
2222 =item C<< h => 60*60 >>
2224 =item C<< d => 60*60*24 >>
2226 =item C<< W => 60*60*24*7 >>
2228 =item C<< M => 60*60*24*30 >>
2230 =item C<< Q => 60*60*24*90 >>
2232 =item C<< Y => 60*60*24*365.25 >>
2234 =back
2236 =cut
2238 =head1 SEE ALSO
2240 L<File::Rsync::Mirror::Recent>,
2241 L<File::Rsync::Mirror::Recentfile::Done>,
2242 L<File::Rsync::Mirror::Recentfile::FakeBigFloat>
2244 =head1 BUGS
2246 Please report any bugs or feature requests through the web interface
2248 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2249 I will be notified, and then you'll automatically be notified of
2250 progress on your bug as I make changes.
2252 =head1 KNOWN BUGS
2254 Memory hungry: it seems all memory is allocated during the initial
2255 rsync where a list of all files is maintained in memory.
2257 =head1 SUPPORT
2259 You can find documentation for this module with the perldoc command.
2261 perldoc File::Rsync::Mirror::Recentfile
2263 You can also look for information at:
2265 =over 4
2267 =item * RT: CPAN's request tracker
2269 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2271 =item * AnnoCPAN: Annotated CPAN documentation
2273 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2275 =item * CPAN Ratings
2277 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2279 =item * Search CPAN
2281 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2283 =back
2286 =head1 ACKNOWLEDGEMENTS
2288 Thanks to RJBS for module-starter.
2290 =head1 AUTHOR
2292 Andreas König
2294 =head1 COPYRIGHT & LICENSE
2296 Copyright 2008,2009 Andreas König.
2298 This program is free software; you can redistribute it and/or modify it
2299 under the same terms as Perl itself.
2302 =cut
2304 1; # End of File::Rsync::Mirror::Recentfile
2306 # Local Variables:
2307 # mode: cperl
2308 # cperl-indent-level: 4
2309 # End: