1 package File::Rsync::Mirror::Recentfile;
3 # use warnings;
4 use strict;
6 =encoding utf-8
8 =head1 NAME
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
12 =cut
14 my $HAVE = {};
15 for my $package (
16 "Data::Serializer",
17 "File::Rsync"
18 ) {
19 $HAVE->{$package} = eval qq{ require $package; };
21 use Config;
22 use File::Basename qw(basename dirname fileparse);
23 use File::Copy qw(cp);
24 use File::Path qw(mkpath);
25 use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
26 use File::Temp;
27 use List::Util qw(first max min);
28 use Scalar::Util qw(reftype);
29 use Storable;
30 use Time::HiRes qw();
31 use YAML::Syck;
33 use version; our $VERSION = qv('0.0.1');
35 use constant MAX_INT => ~0>>1; # anything better?
36 use constant DEFAULT_PROTOCOL => 1;
38 # cf. interval_secs
39 my %seconds;
41 # maybe subclass if this mapping is bad?
42 my %serializers;
44 =head1 SYNOPSIS
46 Writer (of a single file):
    use File::Rsync::Mirror::Recentfile;
    my $rf = File::Rsync::Mirror::Recentfile->new
      (
       interval => q(6h),
       filenameroot => "RECENT",
       comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
       localroot => "/home/ftp/pub/PAUSE/authors/",
       aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
      );
    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
59 Reader/mirrorer:
    my $rf = File::Rsync::Mirror::Recentfile->new
      (
       filenameroot => "RECENT",
       ignore_link_stat_errors => 1,
       interval => q(6h),
       localroot => "/home/ftp/pub/PAUSE/authors",
       remote_dir => "",
       remote_host => "pause.perl.org",
       remote_module => "authors",
       rsync_options => {
                         compress => 1,
                         'rsync-path' => '/usr/bin/rsync',
                         links => 1,
                         times => 1,
                         'omit-dir-times' => 1,
                         checksum => 1,
                        },
       verbose => 1,
      );
    $rf->mirror;
82 Aggregator (usually the writer):
84 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
85 $rf->aggregate;
87 =head1 DESCRIPTION
Lower level than F:R:M:Recent, this class handles a single recentfile.
A tree is always composed of several recentfiles and is controlled by
the F:R:M:Recent object; the Recentfile object does the bookkeeping
for a single timeslice.
94 =head1 EXPORT
96 No exports.
98 =head1 CONSTRUCTORS / DESTRUCTOR
100 =head2 my $obj = CLASS->new(%hash)
102 Constructor. On every argument pair the key is a method name and the
103 value is an argument to that method name.
If a recentfile for this resource already exists, metadata that are
not defined by the constructor will be fetched from there as soon as
it is read by recent_events().
109 =cut
111 sub new {
112 my($class, @args) = @_;
113 my $self = bless {}, $class;
114 while (@args) {
115 my($method,$arg) = splice @args, 0, 2;
116 $self->$method($arg);
118 unless (defined $self->protocol) {
119 $self->protocol(DEFAULT_PROTOCOL);
121 unless (defined $self->filenameroot) {
122 $self->filenameroot("RECENT");
124 unless (defined $self->serializer_suffix) {
125 $self->serializer_suffix(".yaml");
127 return $self;
130 =head2 my $obj = CLASS->new_from_file($file)
132 Constructor. $file is a I<recentfile>.
134 =cut
136 sub new_from_file {
137 my($class, $file) = @_;
138 my $self = bless {}, $class;
139 $self->_rfile($file);
140 #?# $self->lock;
141 my $serialized = do { open my $fh, $file or die "Could not open '$file': $!";
142 local $/;
143 <$fh>;
145 # XXX: we can skip this step when the metadata are sufficient, but
146 # we cannot parse the file without some magic stuff about
147 # serialized formats
148 while (-l $file) {
149 my($name,$path) = fileparse $file;
150 my $symlink = readlink $file;
151 if ($symlink =~ m|/|) {
152 die "FIXME: filenames containing '/' not supported, got $symlink";
154 $file = File::Spec->catfile ( $path, $symlink );
156 my($name,$path,$suffix) = fileparse $file, keys %serializers;
157 $self->serializer_suffix($suffix);
158 $self->localroot($path);
159 die "Could not determine file format from suffix" unless $suffix;
160 my $deserialized;
161 if ($suffix eq ".yaml") {
162 require YAML::Syck;
163 $deserialized = YAML::Syck::LoadFile($file);
164 } elsif ($HAVE->{"Data::Serializer"}) {
165 my $serializer = Data::Serializer->new
166 ( serializer => $serializers{$suffix} );
167 $deserialized = $serializer->raw_deserialize($serialized);
168 } else {
169 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
171 while (my($k,$v) = each %{$deserialized->{meta}}) {
172 next if $k ne lc $k; # "Producers"
173 $self->$k($v);
175 unless (defined $self->protocol) {
176 $self->protocol(DEFAULT_PROTOCOL);
178 return $self;
181 =head2 DESTROY
A simple unlock; if a leftover tempfile exists, it is removed as well.
185 =cut
186 sub DESTROY {
187 my $self = shift;
188 $self->unlock;
189 unless ($self->_current_tempfile_fh) {
190 if (my $tempfile = $self->_current_tempfile) {
191 if (-e $tempfile) {
192 unlink $tempfile; # may fail in global destruction
198 =head1 ACCESSORS
200 =cut
202 my @accessors;
204 BEGIN {
205 @accessors = (
206 "_current_tempfile",
207 "_current_tempfile_fh",
208 "_delayed_operations",
209 "_done",
210 "_interval",
211 "_is_locked",
212 "_localroot",
213 "_merged",
214 "_pathdb",
215 "_remember_last_uptodate_call",
216 "_remote_dir",
217 "_remoteroot",
218 "_rfile",
219 "_rsync",
220 "__verified_tempdir",
221 "_seeded",
222 "_uptodateness_ever_reached",
223 "_use_tempfile",
226 my @pod_lines =
227 split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
229 =over 4
231 =item aggregator
233 A list of interval specs that tell the aggregator which I<recentfile>s
234 are to be produced.
236 =item canonize
238 The name of a method to canonize the path before rsyncing. Only
239 supported value is C<naive_path_normalize>. Defaults to that.
241 =item comment
243 A comment about this tree and setup.
245 =item dirtymark
247 A timestamp. The dirtymark is updated whenever an out of band change
248 on the origin server is performed that violates the protocol. Say,
249 they add or remove files in the middle somewhere. Slaves must react
250 with a devaluation of their C<done> structure which then leads to a
251 full re-sync of all files. Implementation note: dirtymark may increase
252 or decrease.
254 =item filenameroot
256 The (prefix of the) filename we use for this I<recentfile>. Defaults to
257 C<RECENT>. The string must not contain a directory separator.
259 =item have_mirrored
261 Timestamp remembering when we mirrored this recentfile the last time.
262 Only relevant for slaves.
264 =item ignore_link_stat_errors
If set to true, rsync errors that complain about link stat errors are
ignored. These seem to happen only when files are missing at the
origin. Under race conditions this can always happen, so it is
recommended to set this value to true.
271 =item is_slave
273 If set to true, this object will fetch a new recentfile from remote
274 when the timespan between the last mirror (see have_mirrored) and now
275 is too large (see C<ttl>).
277 =item keep_delete_objects_forever
279 The default for delete events is that they are passed through the
280 collection of recentfile objects until they reach the Z file. There
281 they get dropped so that the associated file object ceases to exist at
282 all. By setting C<keep_delete_objects_forever> the delete objects are
283 kept forever. This makes the Z file larger but has the advantage that
slaves that have interrupted mirroring for a long time can still clean
up their copy.
287 =item locktimeout
289 After how many seconds shall we die if we cannot lock a I<recentfile>?
290 Defaults to 600 seconds.
292 =item loopinterval
When mirror_loop is called, this accessor specifies how much time
every loop shall at least take. If the work of a loop is done before
that time has passed, the loop sleeps for the rest of the time.
Defaults to an arbitrary 42 seconds.
299 =item max_files_per_connection
301 Maximum number of files that are transferred on a single rsync call.
302 Setting it higher means higher performance at the price of holding
303 connections longer and potentially disturbing other users in the pool.
304 Defaults to the arbitrary value 42.
306 =item max_rsync_errors
308 When rsync operations encounter that many errors without any resetting
309 success in between, then we die. Defaults to unlimited. A value of
310 -1 means we run forever ignoring all rsync errors.
312 =item minmax
314 Hashref remembering when we read the recent_events from this file the
315 last time and what the timespan was.
317 =item protocol
319 When the RECENT file format changes, we increment the protocol. We try
320 to support older protocols in later releases.
322 =item remote_host
324 The host we are mirroring from. Leave empty for the local filesystem.
326 =item remote_module
328 Rsync servers have so called modules to separate directory trees from
329 each other. Put here the name of the module under which we are
330 mirroring. Leave empty for local filesystem.
332 =item rsync_options
334 Things like compress, links, times or checksums. Passed in to the
335 File::Rsync object used to run the mirror.
337 =item serializer_suffix
339 Mostly untested accessor. The only well tested format for
340 I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
341 Data::Serializer. But in principle other formats are supported as
342 well. See section SERIALIZERS below.
344 =item sleep_per_connection
346 Sleep that many seconds (floating point OK) after every chunk of rsyncing
347 has finished. Defaults to arbitrary 0.42.
349 =item tempdir
351 Directory to write temporary files to. Must allow rename operations
352 into the tree which usually means it must live on the same partition
353 as the target directory. Defaults to C<< $self->localroot >>.
355 =item ttl
357 Time to live. Number of seconds after which this recentfile must be
358 fetched again from the origin server. Only relevant for slaves.
359 Defaults to arbitrary 24.2 seconds.
361 =item verbose
Boolean to turn on a bit of verbosity.
365 =back
367 =cut
369 use accessors @accessors;
371 =head1 METHODS
373 =head2 (void) $obj->aggregate( %options )
Takes all intervals that are collected in the accessor called
aggregator. Sorts them by the actual length of the interval.
Removes those that are shorter than our own interval. Then merges this
object into the next larger object. The merging continues upwards
as long as the next I<recentfile> is old enough to warrant a merge.

Whether a merge is warranted is decided according to the interval of
the previous I<recentfile>, so that larger files are updated less
often than smaller ones. If $options{force} is true, all files get updated.
385 Here is an example to illustrate the behaviour. Given aggregators
387 1h 1d 1W 1M 1Q 1Y Z
389 then
391 1h updates 1d on every call to aggregate()
392 1d updates 1W earliest after 1h
393 1W updates 1M earliest after 1d
394 1M updates 1Q earliest after 1W
395 1Q updates 1Y earliest after 1M
396 1Y updates Z earliest after 1Q
398 Note that all but the smallest recentfile get updated at an arbitrary
399 rate and as such are quite useless on their own.
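Here is a sketch of how a writer would typically drive this (the file
name and option values are only for illustration):

    my $rf = File::Rsync::Mirror::Recentfile->new_from_file("RECENT-1h.yaml");
    $rf->aggregate;             # merge upwards where the intervals are due
    $rf->aggregate(force => 1); # or force all aggregated recentfiles to update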
401 =cut
403 sub aggregate {
404 my($self, %option) = @_;
405 my @aggs = sort { $a->{secs} <=> $b->{secs} }
406 grep { $_->{secs} >= $self->interval_secs }
407 map { { interval => $_, secs => $self->interval_secs($_)} }
408 $self->interval, @{$self->aggregator || []};
409 $self->update;
410 $aggs[0]{object} = $self;
411 AGGREGATOR: for my $i (0..$#aggs-1) {
412 my $this = $aggs[$i]{object};
413 my $next = $this->_sparse_clone;
414 $next->interval($aggs[$i+1]{interval});
415 my $want_merge = 0;
416 if ($option{force} || $i == 0) {
417 $want_merge = 1;
418 } else {
419 my $next_rfile = $next->rfile;
420 if (-e $next_rfile) {
421 my $prev = $aggs[$i-1]{object};
422 local $^T = time;
423 my $next_age = 86400 * -M $next_rfile;
424 if ($next_age > $prev->interval_secs) {
425 $want_merge = 1;
427 } else {
428 $want_merge = 1;
431 if ($want_merge) {
432 $next->merge($this);
433 $aggs[$i+1]{object} = $next;
434 } else {
435 last AGGREGATOR;
440 # collect file size and mtime for all files of this aggregate
441 sub _debug_aggregate {
442 my($self) = @_;
443 my @aggs = sort { $a->{secs} <=> $b->{secs} }
444 map { { interval => $_, secs => $self->interval_secs($_)} }
445 $self->interval, @{$self->aggregator || []};
446 my $report = [];
447 for my $i (0..$#aggs) {
448 my $this = Storable::dclone $self;
449 $this->interval($aggs[$i]{interval});
450 my $rfile = $this->rfile;
451 my @stat = stat $rfile;
452 push @$report, {rfile => $rfile, size => $stat[7], mtime => $stat[9]};
454 $report;
457 # (void) $self->_assert_symlink()
458 sub _assert_symlink {
459 my($self) = @_;
460 my $recentrecentfile = File::Spec->catfile
462 $self->localroot,
463 sprintf
465 "%s.recent",
466 $self->filenameroot
469 if ($Config{d_symlink} eq "define") {
470 my $howto_create_symlink; # 0=no need; 1=straight symlink; 2=rename symlink
471 if (-l $recentrecentfile) {
472 my $found_symlink = readlink $recentrecentfile;
473 if ($found_symlink eq $self->rfilename) {
474 return;
475 } else {
476 $howto_create_symlink = 2;
478 } else {
479 $howto_create_symlink = 1;
481 if (1 == $howto_create_symlink) {
482 symlink $self->rfilename, $recentrecentfile or die "Could not create symlink '$recentrecentfile': $!"
483 } else {
484 unlink "$recentrecentfile.$$"; # may fail
485 symlink $self->rfilename, "$recentrecentfile.$$" or die "Could not create symlink '$recentrecentfile.$$': $!";
486 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
488 } else {
489 warn "Warning: symlinks not supported on this system, doing a copy instead\n";
490 unlink "$recentrecentfile.$$"; # may fail
491 cp $self->rfilename, "$recentrecentfile.$$" or die "Could not copy to '$recentrecentfile.$$': $!";
492 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
496 =head2 $hashref = $obj->delayed_operations
498 A hash of hashes containing unlink and rmdir operations which had to
499 wait until the recentfile got unhidden in order to not confuse
500 downstream mirrors (in case we have some).
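A sketch of the structure, with made-up paths:

    {
      unlink => { "/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/gone.tar.gz" => 1 },
      rmdir  => { "/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/gonedir"     => 1 },
    }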
502 =cut
504 sub delayed_operations {
505 my($self) = @_;
506 my $x = $self->_delayed_operations;
507 unless (defined $x) {
508 $x = {
509 unlink => {},
510 rmdir => {},
512 $self->_delayed_operations ($x);
514 return $x;
517 =head2 $done = $obj->done
519 C<$done> is a reference to a L<File::Rsync::Mirror::Recentfile::Done>
520 object that keeps track of rsync activities. Only needed and used when
521 we are a mirroring slave.
523 =cut
525 sub done {
526 my($self) = @_;
527 my $done = $self->_done;
528 if (!$done) {
529 require File::Rsync::Mirror::Recentfile::Done;
530 $done = File::Rsync::Mirror::Recentfile::Done->new();
531 $done->_rfinterval ($self->interval);
532 $self->_done ( $done );
534 return $done;
537 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
Stores the remote I<recentfile> locally as a tempfile. The caller is
responsible for removing the file after use.

Note: if you intend to act as an rsync server for other slaves, then
you must prefer this method over get_remotefile() for fetching that
file. Otherwise downstream mirrors would expect you to have already
mirrored all the files listed in the I<recentfile> before you actually
have them.
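A minimal sketch of the calling pattern on a slave (error handling
omitted):

    my $tempfile = $rf->get_remote_recentfile_as_tempfile;
    # ... read the events from $tempfile ...
    unlink $tempfile or warn "Could not unlink '$tempfile': $!";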
548 =cut
550 sub get_remote_recentfile_as_tempfile {
551 my($self) = @_;
552 mkpath $self->localroot;
553 my $fh;
554 my $trfilename;
555 if ( $self->_use_tempfile() ) {
556 if ($self->ttl_reached) {
557 $fh = $self->_current_tempfile_fh;
558 $trfilename = $self->rfilename;
559 } else {
560 return $self->_current_tempfile;
562 } else {
563 $trfilename = $self->rfilename;
566 my $dst;
567 if ($fh) {
568 $dst = $self->_current_tempfile;
569 } else {
570 $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
571 $dst = $fh->filename;
572 $self->_current_tempfile ($dst);
573 my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
574 if (defined $rfile && -e $rfile) {
575 # saving on bandwidth. Might need to be configurable
576 # $self->bandwidth_is_cheap?
577 cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
580 my $src = join ("/",
581 $self->remoteroot,
582 $trfilename,
584 if ($self->verbose) {
585 my $doing = -e $dst ? "Sync" : "Get";
586 my $display_dst = join "/", "...", basename(dirname($dst)), basename($dst);
587 printf STDERR
589 "%-4s %d (1/1/%s) temp %s ... ",
590 $doing,
591 time,
592 $self->interval,
593 $display_dst,
596 my $gaveup = 0;
597 my $retried = 0;
598 while (!$self->rsync->exec(
599 src => $src,
600 dst => $dst,
601 )) {
602 $self->register_rsync_error ($self->rsync->err);
603 if (++$retried >= 3) {
604 warn "XXX giving up";
605 $gaveup = 1;
606 last;
609 if ($gaveup) {
610 printf STDERR "Warning: gave up mirroring %s, will try again later", $self->interval;
611 } else {
612 $self->_refresh_internals ($dst);
613 $self->have_mirrored (Time::HiRes::time);
614 $self->un_register_rsync_error ();
616 $self->unseed;
617 if ($self->verbose) {
618 print STDERR "DONE\n";
620 my $mode = 0644;
621 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
622 return $dst;
625 sub _verified_tempdir {
626 my($self) = @_;
627 my $tempdir = $self->__verified_tempdir();
628 return $tempdir if defined $tempdir;
629 unless ($tempdir = $self->tempdir) {
630 $tempdir = $self->localroot;
632 unless (-d $tempdir) {
633 mkpath $tempdir;
635 $self->__verified_tempdir($tempdir);
636 return $tempdir;
639 sub _get_remote_rat_provide_tempfile_object {
640 my($self, $trfilename) = @_;
641 my $_verified_tempdir = $self->_verified_tempdir;
642 my $fh = File::Temp->new
643 (TEMPLATE => sprintf(".FRMRecent-%s-XXXX",
644 $trfilename,
646 DIR => $_verified_tempdir,
647 SUFFIX => $self->serializer_suffix,
648 UNLINK => $self->_use_tempfile,
650 my $mode = 0644;
651 my $dst = $fh->filename;
652 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
653 if ($self->_use_tempfile) {
654 $self->_current_tempfile_fh ($fh); # delay self destruction
656 return $fh;
659 =head2 $localpath = $obj->get_remotefile ( $relative_path )
661 Rsyncs one single remote file to local filesystem.
663 Note: no locking is done on this file. Any number of processes may
664 mirror this object.
666 Note II: do not use for recentfiles. If you are a cascading
667 slave/server combination, it would confuse other slaves. They would
668 expect the contents of these recentfiles to be available. Use
669 get_remote_recentfile_as_tempfile() instead.
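For illustration, with a made-up path relative to the remote root:

    my $local = $rf->get_remotefile("id/A/AN/ANDK/CPAN-1.92_63.tar.gz");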
671 =cut
673 sub get_remotefile {
674 my($self, $path) = @_;
675 my $dst = File::Spec->catfile($self->localroot, $path);
676 mkpath dirname $dst;
677 if ($self->verbose) {
678 my $doing = -e $dst ? "Sync" : "Get";
679 printf STDERR
681 "%-4s %d (1/1/%s) %s ... ",
682 $doing,
683 time,
684 $self->interval,
685 $path,
688 while (!$self->rsync->exec(
689 src => join("/",
690 $self->remoteroot,
691 $path),
692 dst => $dst,
693 )) {
694 $self->register_rsync_error ($self->rsync->err);
696 $self->un_register_rsync_error ();
697 if ($self->verbose) {
698 print STDERR "DONE\n";
700 return $dst;
703 =head2 $obj->interval ( $interval_spec )
705 Get/set accessor. $interval_spec is a string and described below in
706 the section INTERVAL SPEC.
708 =cut
710 sub interval {
711 my ($self, $interval) = @_;
712 if (@_ >= 2) {
713 $self->_interval($interval);
714 $self->_rfile(undef);
716 $interval = $self->_interval;
717 unless (defined $interval) {
718 # do not ask the $self too much, it recurses!
719 require Carp;
720 Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
722 return $interval;
725 =head2 $secs = $obj->interval_secs ( $interval_spec )
$interval_spec is described below in the section INTERVAL SPEC. If
empty, it defaults to the inherent interval of this object.
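A few conversions for illustration (see the section INTERVAL SPEC
below):

    $rf->interval_secs("30s"); # 30
    $rf->interval_secs("6h");  # 21600 (6*60*60)
    $rf->interval_secs("1W");  # 604800 (60*60*24*7)
    $rf->interval_secs("Z");   # MAX_INT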
730 =cut
732 sub interval_secs {
733 my ($self, $interval) = @_;
734 $interval ||= $self->interval;
735 unless (defined $interval) {
736 die "interval_secs() called without argument on an object without a declared one";
738 my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
739 die "Could not determine seconds from interval[$interval]";
740 if ($interval eq "Z") {
741 return MAX_INT;
742 } elsif (exists $seconds{$t} and $n =~ /^\d+$/) {
743 return $seconds{$t}*$n;
744 } else {
745 die "Invalid interval specification: n[$n]t[$t]";
749 =head2 $obj->localroot ( $localroot )
751 Get/set accessor. The local root of the tree.
753 =cut
755 sub localroot {
756 my ($self, $localroot) = @_;
757 if (@_ >= 2) {
758 $self->_localroot($localroot);
759 $self->_rfile(undef);
761 $localroot = $self->_localroot;
764 =head2 $ret = $obj->local_path($path_found_in_recentfile)
766 Combines the path to our local mirror and the path of an object found
767 in this I<recentfile>. In other words: the target of a mirror operation.
769 Implementation note: We split on slashes and then use
770 File::Spec::catfile to adjust to the local operating system.
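A made-up example:

    # with localroot "/home/ftp/pub/PAUSE/authors"
    my $dst = $rf->local_path("id/A/AN/ANDK/CPAN-1.92_63.tar.gz");
    # => "/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz"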
772 =cut
774 sub local_path {
775 my($self,$path) = @_;
776 unless (defined $path) {
777 # seems like a degenerated case
778 return $self->localroot;
780 my @p = split m|/|, $path;
781 File::Spec->catfile($self->localroot,@p);
784 =head2 (void) $obj->lock
786 Locking is implemented with an C<mkdir> on a locking directory
787 (C<.lock> appended to $rfile).
789 =cut
791 sub lock {
792 my ($self) = @_;
793 # not using flock because it locks on filehandles instead of
794 # old school ressources.
795 my $locked = $self->_is_locked and return;
796 my $rfile = $self->rfile;
797 # XXX need a way to allow breaking the lock
798 my $start = time;
799 my $locktimeout = $self->locktimeout || 600;
800 while (not mkdir "$rfile.lock") {
801 Time::HiRes::sleep 0.01;
802 if (time - $start > $locktimeout) {
803 die "Could not acquire lockdirectory '$rfile.lock': $!";
806 $self->_is_locked (1);
809 =head2 (void) $obj->merge ($other)
811 Bulk update of this object with another one. It's used to merge a
812 smaller and younger $other object into the current one. If this file
813 is a C<Z> file, then we normally do not merge in objects of type
814 C<delete>; this can be overridden by setting
815 keep_delete_objects_forever. But if we encounter an object of type
816 delete we delete the corresponding C<new> object if we have it.
818 If there is nothing to be merged, nothing is done.
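A sketch of the typical direction of a merge, assuming two objects for
the same tree (normally aggregate() arranges this for you):

    # merge the smaller/younger 1h recentfile into the 1d recentfile
    $rf_1d->merge($rf_1h);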
820 =cut
822 sub merge {
823 my($self, $other) = @_;
824 $self->_merge_sanitycheck ( $other );
825 $other->lock;
826 my $other_recent = $other->recent_events || [];
827 # $DB::single++ if $other->interval_secs eq "2" and grep {$_->{epoch} eq "999.999"} @$other_recent;
828 $self->lock;
829 $self->_merge_locked ( $other, $other_recent );
830 $self->unlock;
831 $other->unlock;
834 sub _merge_locked {
835 my($self, $other, $other_recent) = @_;
836 my $my_recent = $self->recent_events || [];
838 # calculate the target time span
839 my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
840 my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
841 my $oldest_allowed = 0;
842 my $something_done;
843 unless ($my_recent->[0]) {
844 # obstetrics
845 $something_done = 1;
847 if ($epoch) {
848 if (($other->dirtymark||0) ne ($self->dirtymark||0)) {
849 $oldest_allowed = 0;
850 $something_done = 1;
851 } elsif (my $merged = $self->merged) {
852 my $secs = $self->interval_secs();
853 $oldest_allowed = min($epoch - $secs, $merged->{epoch}||0);
854 if (@$other_recent and
855 _bigfloatlt($other_recent->[-1]{epoch}, $oldest_allowed)
857 $oldest_allowed = $other_recent->[-1]{epoch};
860 while (@$my_recent && _bigfloatlt($my_recent->[-1]{epoch}, $oldest_allowed)) {
861 pop @$my_recent;
862 $something_done = 1;
866 my %have_path;
867 my $other_recent_filtered = [];
868 for my $oev (@$other_recent) {
869 my $oevepoch = $oev->{epoch} || 0;
870 next if _bigfloatlt($oevepoch, $oldest_allowed);
871 my $path = $oev->{path};
872 next if $have_path{$path}++;
873 if ( $self->interval eq "Z"
874 and $oev->{type} eq "delete"
875 and ! $self->keep_delete_objects_forever
877 # do nothing
878 } else {
879 if (!$myepoch || _bigfloatgt($oevepoch, $myepoch)) {
880 $something_done = 1;
882 push @$other_recent_filtered, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
885 if ($something_done) {
886 $self->_merge_something_done ($other_recent_filtered, $my_recent, $other_recent, $other, \%have_path, $epoch);
890 sub _merge_something_done {
891 my($self, $other_recent_filtered, $my_recent, $other_recent, $other, $have_path, $epoch) = @_;
892 my $recent = [];
893 my $epoch_conflict = 0;
894 my $last_epoch;
895 ZIP: while (@$other_recent_filtered || @$my_recent) {
896 my $event;
897 if (!@$my_recent ||
898 @$other_recent_filtered && _bigfloatge($other_recent_filtered->[0]{epoch},$my_recent->[0]{epoch})) {
899 $event = shift @$other_recent_filtered;
900 } else {
901 $event = shift @$my_recent;
902 next ZIP if $have_path->{$event->{path}}++;
904 $epoch_conflict=1 if defined $last_epoch && $event->{epoch} eq $last_epoch;
905 $last_epoch = $event->{epoch};
906 push @$recent, $event;
908 if ($epoch_conflict) {
909 my %have_epoch;
910 for (my $i = $#$recent;$i>=0;$i--) {
911 my $epoch = $recent->[$i]{epoch};
912 if ($have_epoch{$epoch}++) {
913 while ($have_epoch{$epoch}) {
914 $epoch = _increase_a_bit($epoch);
916 $recent->[$i]{epoch} = $epoch;
917 $have_epoch{$epoch}++;
921 if (!$self->dirtymark || $other->dirtymark ne $self->dirtymark) {
922 $self->dirtymark ( $other->dirtymark );
924 $self->write_recent($recent);
925 $other->merged({
926 time => Time::HiRes::time, # not used anywhere
927 epoch => $recent->[0]{epoch},
928 into_interval => $self->interval, # not used anywhere
930 $other->write_recent($other_recent);
933 sub _merge_sanitycheck {
934 my($self, $other) = @_;
935 if ($self->interval_secs <= $other->interval_secs) {
936 die sprintf
938 "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
939 $self->interval_secs,
940 $other->interval_secs,
945 =head2 merged
Hashref denoting when and at which epoch this recentfile has been
merged into some other one.
950 =cut
952 sub merged {
953 my($self, $set) = @_;
954 if (defined $set) {
955 $self->_merged ($set);
957 my $merged = $self->_merged;
958 my $into;
959 if ($merged and $into = $merged->{into_interval} and defined $self->_interval) {
960 # sanity checks
961 if ($into eq $self->interval) {
962 require Carp;
963 Carp::cluck(sprintf
965 "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
966 $into,
967 $self->interval,
969 } elsif ($self->interval_secs($into) < $self->interval_secs) {
970 require Carp;
971 Carp::cluck(sprintf
973 "Warning: into_interval_secs[%s] smaller than own interval_secs[%s] on interval[%s]. Danger ahead.",
974 $self->interval_secs($into),
975 $self->interval_secs,
976 $self->interval,
980 $merged;
983 =head2 $hashref = $obj->meta_data
985 Returns the hashref of metadata that the server has to add to the
986 I<recentfile>.
988 =cut
990 sub meta_data {
991 my($self) = @_;
992 my $ret = $self->{meta};
993 for my $m (
994 "aggregator",
995 "canonize",
996 "comment",
997 "dirtymark",
998 "filenameroot",
999 "interval",
1000 "merged",
1001 "minmax",
1002 "protocol",
1003 "serializer_suffix",
1005 my $v = $self->$m;
1006 if (defined $v) {
1007 $ret->{$m} = $v;
1010 # XXX need to reset the Producer if I am a writer, keep it when I
1011 # am a reader
1012 $ret->{Producers} ||= {
1013 __PACKAGE__, "$VERSION", # stringified it looks better
1014 '$0', $0,
1015 'time', Time::HiRes::time,
1017 $ret->{dirtymark} ||= Time::HiRes::time;
1018 return $ret;
1021 =head2 $success = $obj->mirror ( %options )
1023 Mirrors the files in this I<recentfile> as reported by
1024 C<recent_events>. Options named C<after>, C<before>, C<max>, and
1025 C<skip-deletes> are passed through to the C<recent_events> call. The
1026 boolean option C<piecemeal>, if true, causes C<mirror> to only rsync
1027 C<max_files_per_connection> and keep track of the rsynced files so
1028 that future calls will rsync different files until all files are
1029 brought to sync.
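A sketch of a piecemeal mirroring call (option values are made up):

    $rf->mirror
        (
         after          => "1216706557.1297", # only events after this epoch
         'skip-deletes' => 1,
         piecemeal      => 1, # at most max_files_per_connection per call
        );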
1031 =cut
1033 sub mirror {
1034 my($self, %options) = @_;
1035 my $trecentfile = $self->get_remote_recentfile_as_tempfile();
1036 $self->_use_tempfile (1);
1037 my %passthrough = map { ($_ => $options{$_}) } qw(before after max skip-deletes);
1038 my ($recent_events) = $self->recent_events(%passthrough);
1039 my(@error, @dlcollector); # download-collector: array containing paths we need
1040 my $first_item = 0;
1041 my $last_item = $#$recent_events;
1042 my $done = $self->done;
1043 my $pathdb = $self->_pathdb;
1044 ITEM: for my $i ($first_item..$last_item) {
1045 my $status = +{};
1046 $self->_mirror_item
1049 $recent_events,
1050 $last_item,
1051 $done,
1052 $pathdb,
1053 \@dlcollector,
1054 \%options,
1055 $status,
1056 \@error,
1058 last if $i == $last_item;
1059 if ($status->{mustreturn}){
1060 if ($self->_current_tempfile && ! $self->_current_tempfile_fh) {
1061 # looks like a bug somewhere else
1062 my $t = $self->_current_tempfile;
1063 unlink $t or die "Could not unlink '$t': $!";
1064 $self->_current_tempfile(undef);
1066 return;
1069 if (@dlcollector) {
1070 my $success = eval { $self->_mirror_dlcollector (\@dlcollector,$pathdb,$recent_events);};
1071 if (!$success || $@) {
1072 warn "Warning: Unknown error while mirroring: $@";
1073 push @error, $@;
1074 sleep 1;
1077 if ($self->verbose) {
1078 print STDERR "DONE\n";
1080 # once we've gone to the end we consider ourselves free of obligations
1081 $self->unseed;
1082 $self->_mirror_unhide_tempfile ($trecentfile);
1083 $self->_mirror_perform_delayed_ops;
1084 return !@error;
1087 sub _mirror_item {
1088 my($self,
1090 $recent_events,
1091 $last_item,
1092 $done,
1093 $pathdb,
1094 $dlcollector,
1095 $options,
1096 $status,
1097 $error,
1098 ) = @_;
1099 my $recent_event = $recent_events->[$i];
1100 return if $done->covered ( $recent_event->{epoch} );
1101 if ($pathdb) {
1102 my $rec = $pathdb->{$recent_event->{path}};
1103 if ($rec && $rec->{recentepoch}) {
1104 if (_bigfloatgt
1105 ( $rec->{recentepoch}, $recent_event->{epoch} )){
1106 $done->register ($recent_events, [$i]);
1107 return;
1111 my $dst = $self->local_path($recent_event->{path});
1112 if ($recent_event->{type} eq "new"){
1113 $self->_mirror_item_new
1115 $dst,
1117 $last_item,
1118 $recent_events,
1119 $recent_event,
1120 $dlcollector,
1121 $pathdb,
1122 $status,
1123 $error,
1124 $options,
1126 } elsif ($recent_event->{type} eq "delete") {
1127 my $activity;
1128 if ($options->{'skip-deletes'}) {
1129 $activity = "skipped";
1130 } else {
1131 if (! -e $dst) {
1132 $activity = "not_found";
1133 } elsif (-l $dst or not -d _) {
1134 $self->delayed_operations->{unlink}{$dst}++;
1135 $activity = "deleted";
1136 } else {
1137 $self->delayed_operations->{rmdir}{$dst}++;
1138 $activity = "deleted";
1141 $done->register ($recent_events, [$i]);
1142 if ($pathdb) {
1143 $self->_mirror_register_path($pathdb,[$recent_event],$activity);
1145 } else {
1146 warn "Warning: invalid upload type '$recent_event->{type}'";
1150 sub _mirror_item_new {
1151 my($self,
1152 $dst,
1154 $last_item,
1155 $recent_events,
1156 $recent_event,
1157 $dlcollector,
1158 $pathdb,
1159 $status,
1160 $error,
1161 $options,
1162 ) = @_;
1163 if ($self->verbose) {
1164 my $doing = -e $dst ? "Sync" : "Get";
1165 printf STDERR
1167 "%-4s %d (%d/%d/%s) %s ... ",
1168 $doing,
1169 time,
1170 1+$i,
1171 1+$last_item,
1172 $self->interval,
1173 $recent_event->{path},
1176 my $max_files_per_connection = $self->max_files_per_connection || 42;
1177 my $success;
1178 if ($self->verbose) {
1179 print STDERR "\n";
1181 push @$dlcollector, { rev => $recent_event, i => $i };
1182 if (@$dlcollector >= $max_files_per_connection) {
1183 $success = eval {$self->_mirror_dlcollector ($dlcollector,$pathdb,$recent_events);};
1184 my $sleep = $self->sleep_per_connection;
1185 $sleep = 0.42 unless defined $sleep;
1186 Time::HiRes::sleep $sleep;
1187 if ($options->{piecemeal}) {
1188 $status->{mustreturn} = 1;
1189 return;
1191 } else {
1192 return;
1194 if (!$success || $@) {
1195 warn "Warning: Error while mirroring: $@";
1196 push @$error, $@;
1197 sleep 1;
1199 if ($self->verbose) {
1200 print STDERR "DONE\n";
1204 sub _mirror_dlcollector {
1205 my($self,$xcoll,$pathdb,$recent_events) = @_;
1206 my $success = $self->mirror_path([map {$_->{rev}{path}} @$xcoll]);
1207 if ($pathdb) {
1208 $self->_mirror_register_path($pathdb,[map {$_->{rev}} @$xcoll],"rsync");
1210 $self->done->register($recent_events, [map {$_->{i}} @$xcoll]);
1211 @$xcoll = ();
1212 return $success;
1215 sub _mirror_register_path {
1216 my($self,$pathdb,$coll,$activity) = @_;
1217 my $time = time;
1218 for my $item (@$coll) {
1219 $pathdb->{$item->{path}} =
1221 recentepoch => $item->{epoch},
1222 ($activity."_on") => $time,
1227 sub _mirror_unhide_tempfile {
1228 my($self, $trecentfile) = @_;
1229 my $rfile = $self->rfile;
1230 if (rename $trecentfile, $rfile) {
1231 # warn "DEBUG: renamed '$trecentfile' to '$rfile'";
1232 } else {
1233 require Carp;
1234 Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
1236 $self->_use_tempfile (0);
1237 if (my $ctfh = $self->_current_tempfile_fh) {
1238 $ctfh->unlink_on_destroy (0);
1239 $self->_current_tempfile_fh (undef);
1243 sub _mirror_perform_delayed_ops {
1244 my($self) = @_;
1245 my $delayed = $self->delayed_operations;
1246 for my $dst (keys %{$delayed->{unlink}}) {
1247 unless (unlink $dst) {
1248 require Carp;
1249 Carp::cluck ( "Warning: Error while unlinking '$dst': $!" );
1251 delete $delayed->{unlink}{$dst};
1253 for my $dst (keys %{$delayed->{rmdir}}) {
1254 unless (rmdir $dst) {
1255 require Carp;
1256 Carp::cluck ( "Warning: Error on rmdir '$dst': $!" );
1258 delete $delayed->{rmdir}{$dst};
1262 =head2 (void) $obj->mirror_loop
1264 Run mirror in an endless loop. See the accessor C<loopinterval>. XXX
1265 What happens/should happen if we miss the interval during a single loop?
1267 =cut
1269 sub mirror_loop {
1270 my($self) = @_;
1271 my $iteration_start = time;
1273 my $Signal = 0;
1274 $SIG{INT} = sub { $Signal++ };
1275 my $loopinterval = $self->loopinterval || 42;
1276 my $after = -999999999;
1277 LOOP: while () {
        $self->mirror(after => $after);
1279 last LOOP if $Signal;
1280 my $re = $self->recent_events;
1281 $after = $re->[0]{epoch};
1282 if ($self->verbose) {
1283 local $| = 1;
1284 print "($after)";
1286 if (time - $iteration_start < $loopinterval) {
1287 sleep $iteration_start + $loopinterval - time;
1289 if ($self->verbose) {
1290 local $| = 1;
1291 print "~";
1296 =head2 $success = $obj->mirror_path ( $arrref | $path )
1298 If the argument is a scalar it is treated as a path. The remote path
1299 is mirrored into the local copy. $path is the path found in the
1300 I<recentfile>, i.e. it is relative to the root directory of the
1301 mirror.
1303 If the argument is an array reference then all elements are treated as
1304 a path below the current tree and all are rsynced with a single
1305 command (and a single connection).
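Two made-up invocations, one per calling convention:

    $rf->mirror_path("id/A/AN/ANDK/CPAN-1.92_63.tar.gz");
    $rf->mirror_path([ "id/A/AN/ANDK/CHECKSUMS",
                       "id/A/AN/ANDK/CPAN-1.92_63.tar.gz" ]); # one rsync call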
1307 =cut
1309 sub mirror_path {
1310 my($self,$path) = @_;
1311 # XXX simplify the two branches such that $path is treated as
1312 # [$path] maybe even demand the argument as an arrayref to
1313 # simplify docs and code. (rsync-over-recentfile-2.pl uses the
1314 # interface)
1315 if (ref $path and ref $path eq "ARRAY") {
1316 my $dst = $self->localroot;
1317 mkpath dirname $dst;
1318 my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
1319 lc $self->filenameroot,
1321 TMPDIR => 1,
1322 UNLINK => 0,
1324 for my $p (@$path) {
1325 print $fh $p, "\n";
1327 $fh->flush;
1328 $fh->unlink_on_destroy(1);
1329 my $gaveup = 0;
1330 my $retried = 0;
1331 while (!$self->rsync->exec
1333 src => join("/",
1334 $self->remoteroot,
1336 dst => $dst,
1337 'files-from' => $fh->filename,
1338 )) {
1339 my(@err) = $self->rsync->err;
1340 if ($self->ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1341 if ($self->verbose) {
1342 warn "Info: ignoring link_stat error '@err'";
1344 return 1;
1346 $self->register_rsync_error (@err);
1347 if (++$retried >= 3) {
1348 warn "XXX giving up.";
1349 $gaveup = 1;
1350 last;
1353 unless ($gaveup) {
1354 $self->un_register_rsync_error ();
1356 } else {
1357 my $dst = $self->local_path($path);
1358 mkpath dirname $dst;
1359 while (!$self->rsync->exec
1361 src => join("/",
1362 $self->remoteroot,
1363 $path
1365 dst => $dst,
1366 )) {
1367 my(@err) = $self->rsync->err;
1368 if ($self->ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1369 if ($self->verbose) {
1370 warn "Info: ignoring link_stat error '@err'";
1372 return 1;
1374 $self->register_rsync_error (@err);
1376 $self->un_register_rsync_error ();
1378 return 1;
1381 sub _my_current_rfile {
1382 my($self) = @_;
1383 my $rfile;
1384 if ($self->_use_tempfile) {
1385 $rfile = $self->_current_tempfile;
1386 } else {
1387 $rfile = $self->rfile;
1389 return $rfile;
1392 =head2 $path = $obj->naive_path_normalize ($path)
Takes an absolute unix style path as argument and canonicalizes it to
a shorter path if possible, removing things like double slashes or
C</./>, and resolving references to C<../> directories, to get a
shorter unambiguous path. This is used to simplify the code that
determines whether a file passed to C<update()> is indeed below our
C<localroot>.
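For example:

    $rf->naive_path_normalize("/home//ftp/pub/PAUSE/authors/../authors/id/");
    # => "/home/ftp/pub/PAUSE/authors/id"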
1400 =cut
1402 sub naive_path_normalize {
1403 my($self,$path) = @_;
1404 $path =~ s|/+|/|g;
1405 1 while $path =~ s|/[^/]+/\.\./|/|;
1406 $path =~ s|/$||;
1407 $path;
1410 =head2 $ret = $obj->read_recent_1 ( $data )
1412 Delegate of C<recent_events()> on protocol 1
1414 =cut
1416 sub read_recent_1 {
1417 my($self, $data) = @_;
1418 return $data->{recent};
1421 =head2 $array_ref = $obj->recent_events ( %options )
Note: the code relies on the resource being written atomically. We
cannot lock because we may have no write access. If the caller has
write access (e.g. aggregate() or update()), it has to take care of
any necessary locking and it MUST write atomically.
1428 If C<$options{after}> is specified, only file events after this
1429 timestamp are returned.
1431 If C<$options{before}> is specified, only file events before this
1432 timestamp are returned.
If C<$options{'skip-deletes'}> is specified, no files-to-be-deleted
will be returned.
1437 If C<$options{max}> is specified only a maximum of this many events is
1438 returned.
1440 If C<$options{contains}> is specified the value must be a hash
1441 reference containing a query. The query may contain the keys C<epoch>,
1442 C<path>, and C<type>. Each represents a condition that must be met. If
1443 there is more than one such key, the conditions are ANDed.
1445 If C<$options{info}> is specified, it must be a hashref. This hashref
1446 will be filled with metadata about the unfiltered recent_events of
1447 this object, in key C<first> there is the first item, in key C<last>
1448 is the last.
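A sketch of a filtered query (the epoch value is made up):

    my $recent_events = $rf->recent_events
        (
         after    => "1216706557.1297",
         max      => 100,
         contains => { type => "new" },
        );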
1450 =cut
1452 sub recent_events {
1453 my ($self, %options) = @_;
1454 my $info = $options{info};
1455 if ($self->is_slave) {
1456 # XXX seems dubious, might produce tempfiles without removing them?
1457 $self->get_remote_recentfile_as_tempfile;
1459 my $rfile_or_tempfile = $self->_my_current_rfile or return [];
1460 -e $rfile_or_tempfile or return [];
1461 my $suffix = $self->serializer_suffix;
1462 my ($data) = eval {
1463 $self->_try_deserialize
1465 $suffix,
1466 $rfile_or_tempfile,
1469 my $err = $@;
1470 if ($err or !$data) {
1471 return [];
1473 my $re;
1474 if (reftype $data eq 'ARRAY') { # protocol 0
1475 $re = $data;
1476 } else {
1477 $re = $self->_recent_events_protocol_x
1479 $data,
1480 $rfile_or_tempfile,
1483 return $re unless grep {defined $options{$_}} qw(after before contains max);
1484 $self->_recent_events_handle_options ($re, \%options);
1487 # File::Rsync::Mirror::Recentfile::_recent_events_handle_options
1488 sub _recent_events_handle_options {
1489 my($self, $re, $options) = @_;
1490 my $last_item = $#$re;
1491 my $info = $options->{info};
1492 if ($info) {
1493 $info->{first} = $re->[0];
1494 $info->{last} = $re->[-1];
1496 if (defined $options->{after}) {
1497 if ($re->[0]{epoch} > $options->{after}) {
1498 if (
1499 my $f = first
1500 {$re->[$_]{epoch} <= $options->{after}}
1501 0..$#$re
1503 $last_item = $f-1;
1505 } else {
1506 $last_item = -1;
1509 my $first_item = 0;
1510 if (defined $options->{before}) {
1511 if ($re->[0]{epoch} > $options->{before}) {
1512 if (
1513 my $f = first
1514 {$re->[$_]{epoch} < $options->{before}}
1515 0..$last_item
1517 $first_item = $f;
1519 } else {
1520 $first_item = 0;
1523 if (0 != $first_item || -1 != $last_item) {
1524 @$re = splice @$re, $first_item, 1+$last_item-$first_item;
1526 if ($options->{'skip-deletes'}) {
1527 @$re = grep { $_->{type} ne "delete" } @$re;
1529 if (my $contopt = $options->{contains}) {
1530 my $seen_allowed = 0;
1531 for my $allow (qw(epoch path type)) {
1532 if (exists $contopt->{$allow}) {
1533 $seen_allowed++;
1534 my $v = $contopt->{$allow};
1535 @$re = grep { $_->{$allow} eq $v } @$re;
1538 if (keys %$contopt > $seen_allowed) {
1539 require Carp;
1540 Carp::confess
1541 (sprintf "unknown query: %s", join ", ", %$contopt);
1544 if ($options->{max} && @$re > $options->{max}) {
1545 @$re = splice @$re, 0, $options->{max};
1547 $re;
1550 sub _recent_events_protocol_x {
1551 my($self,
1552 $data,
1553 $rfile_or_tempfile,
1554 ) = @_;
1555 my $meth = sprintf "read_recent_%d", $data->{meta}{protocol};
1556 # we may be reading meta for the first time
1557 while (my($k,$v) = each %{$data->{meta}}) {
1558 next if $k ne lc $k; # "Producers"
1559 next if defined $self->$k;
1560 $self->$k($v);
1562 my $re = $self->$meth ($data);
1563 my @stat = stat $rfile_or_tempfile or die "Cannot stat '$rfile_or_tempfile': $!";
1564 my $minmax = { mtime => $stat[9] };
1565 if (@$re) {
1566 $minmax->{min} = $re->[-1]{epoch};
1567 $minmax->{max} = $re->[0]{epoch};
1569 $self->minmax ( $minmax );
1570 return $re;
1573 sub _try_deserialize {
1574 my($self,
1575 $suffix,
1576 $rfile_or_tempfile,
1577 ) = @_;
1578 if ($suffix eq ".yaml") {
1579 require YAML::Syck;
1580 YAML::Syck::LoadFile($rfile_or_tempfile);
1581 } elsif ($HAVE->{"Data::Serializer"}) {
1582 my $serializer = Data::Serializer->new
1583 ( serializer => $serializers{$suffix} );
1584 my $serialized = do
1586 open my $fh, $rfile_or_tempfile or die "Could not open: $!";
1587 local $/;
1588 <$fh>;
1590 $serializer->raw_deserialize($serialized);
1591 } else {
1592 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
1596 sub _refresh_internals {
1597 my($self, $dst) = @_;
1598 my $class = ref $self;
1599 my $rfpeek = $class->new_from_file ($dst);
1600 for my $acc (qw(
1601 _merged
1602 minmax
1603 )) {
1604 $self->$acc ( $rfpeek->$acc );
1606 my $old_dirtymark = $self->dirtymark;
1607 my $new_dirtymark = $rfpeek->dirtymark;
1608 if ($old_dirtymark && $new_dirtymark && $new_dirtymark ne $old_dirtymark) {
1609 $self->done->reset;
1610 $self->dirtymark ( $new_dirtymark );
1611 $self->seed;
1615 =head2 $ret = $obj->rfilename
1617 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1618 a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
1620 =cut
1622 sub rfilename {
1623 my($self) = @_;
1624 my $file = sprintf("%s-%s%s",
1625 $self->filenameroot,
1626 $self->interval,
1627 $self->serializer_suffix,
1629 return $file;
1632 =head2 $str = $self->remote_dir
1634 The directory we are mirroring from.
1636 =cut
1638 sub remote_dir {
1639 my($self, $set) = @_;
1640 if (defined $set) {
1641 $self->_remote_dir ($set);
1643 my $x = $self->_remote_dir;
1644 $self->is_slave (1);
1645 return $x;
1648 =head2 $str = $obj->remoteroot
1650 =head2 (void) $obj->remoteroot ( $set )
1652 Get/Set the composed prefix needed when rsyncing from a remote module.
1653 If remote_host, remote_module, and remote_dir are set, it is composed
1654 from these.
1656 =cut
1658 sub remoteroot {
1659 my($self, $set) = @_;
1660 if (defined $set) {
1661 $self->_remoteroot($set);
1663 my $remoteroot = $self->_remoteroot;
1664 unless (defined $remoteroot) {
1665 $remoteroot = sprintf
1667 "%s%s%s",
1668 defined $self->remote_host ? ($self->remote_host."::") : "",
1669 defined $self->remote_module ? ($self->remote_module."/") : "",
1670 defined $self->remote_dir ? $self->remote_dir : "",
1672 $self->_remoteroot($remoteroot);
1674 return $remoteroot;
1677 =head2 (void) $obj->resolve_recentfilename ( $recentfilename )
1679 Inverse method to C<rfilename>. C<$recentfilename> is a plain filename
1680 of the pattern
1682 $filenameroot-$interval$serializer_suffix
1684 e.g.
1686 RECENT-1M.yaml
1688 This filename is split into its parts and the parts are fed to the
1689 object itself.
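For example:

    $rf->resolve_recentfilename("RECENT-1M.yaml");
    # filenameroot "RECENT", interval "1M", serializer_suffix ".yaml"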
1691 =cut
1693 sub resolve_recentfilename {
1694 my($self, $rfname) = @_;
1695 my($splitter) = qr(^(.+)-([^-\.]+)(\.[^\.]+));
1696 if (my($f,$i,$s) = $rfname =~ $splitter) {
1697 $self->filenameroot ($f);
1698 $self->interval ($i);
1699 $self->serializer_suffix ($s);
1700 } else {
1701 die "Alert: cannot split '$rfname', doesn't match '$splitter'";
1703 return;
1706 =head2 my $rfile = $obj->rfile
1708 Returns the full path of the I<recentfile>
1710 =cut
1712 sub rfile {
1713 my($self) = @_;
1714 my $rfile = $self->_rfile;
1715 return $rfile if defined $rfile;
1716 $rfile = File::Spec->catfile
1717 ($self->localroot,
1718 $self->rfilename,
1720 $self->_rfile ($rfile);
1721 return $rfile;
1724 =head2 $rsync_obj = $obj->rsync
1726 The File::Rsync object that this object uses for communicating with an
1727 upstream server.
1729 =cut
1731 sub rsync {
1732 my($self) = @_;
1733 my $rsync = $self->_rsync;
1734 unless (defined $rsync) {
1735 my $rsync_options = $self->rsync_options || {};
1736 if ($HAVE->{"File::Rsync"}) {
1737 $rsync = File::Rsync->new($rsync_options);
1738 $self->_rsync($rsync);
1739 } else {
1740 die "File::Rsync required for rsync operations. Cannot continue";
1743 return $rsync;
1746 =head2 (void) $obj->register_rsync_error(@err)
1748 =head2 (void) $obj->un_register_rsync_error()
1750 Register_rsync_error is called whenever the File::Rsync object fails
1751 on an exec (say, connection doesn't succeed). It issues a warning and
1752 sleeps for an increasing amount of time. Un_register_rsync_error
1753 resets the error count. See also accessor C<max_rsync_errors>.
1755 =cut
1758 my $no_success_count = 0;
1759 my $no_success_time = 0;
1760 sub register_rsync_error {
1761 my($self, @err) = @_;
1762 chomp @err;
1763 $no_success_time = time;
1764 $no_success_count++;
1765 my $max_rsync_errors = $self->max_rsync_errors;
1766 $max_rsync_errors = MAX_INT unless defined $max_rsync_errors;
1767 if ($max_rsync_errors>=0 && $no_success_count >= $max_rsync_errors) {
1768 require Carp;
1769 Carp::confess
1771 sprintf
1773 "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
1774 $self->interval,
1775 join(" ",@err),
1776 $no_success_count,
1779 my $sleep = 12 * $no_success_count;
1780 $sleep = 300 if $sleep > 300;
1781 require Carp;
1782 Carp::cluck
1783 (sprintf
1785 "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
1786 scalar(localtime($no_success_time)),
1787 $self->interval,
1788 join(" ",@err),
1789 $sleep,
1791 sleep $sleep
1793 sub un_register_rsync_error {
1794 my($self) = @_;
1795 $no_success_time = 0;
1796 $no_success_count = 0;
1800 =head2 $clone = $obj->_sparse_clone
Clones just as much of itself as it safely can. Experimental method.
1805 Note: what fits better: sparse or shallow? Other suggestions?
1807 =cut
1809 sub _sparse_clone {
1810 my($self) = @_;
1811 my $new = bless {}, ref $self;
1812 for my $m (qw(
1813 _interval
1814 _localroot
1815 _remoteroot
1816 _rfile
1817 _use_tempfile
1818 aggregator
1819 filenameroot
1820 is_slave
1821 max_files_per_connection
1822 protocol
1823 rsync_options
1824 serializer_suffix
1825 sleep_per_connection
1826 tempdir
1827 verbose
1828 )) {
1829 my $o = $self->$m;
1830 $o = Storable::dclone $o if ref $o;
1831 $new->$m($o);
1833 $new;
1836 =head2 $boolean = OBJ->ttl_reached ()
1838 =cut
1840 sub ttl_reached {
1841 my($self) = @_;
1842 my $have_mirrored = $self->have_mirrored || 0;
1843 my $now = Time::HiRes::time;
1844 my $ttl = $self->ttl;
1845 $ttl = 24.2 unless defined $ttl;
1846 if ($now > $have_mirrored + $ttl) {
1847 return 1;
1849 return 0;
1852 =head2 (void) $obj->unlock()
1854 Unlocking is implemented with an C<rmdir> on a locking directory
1855 (C<.lock> appended to $rfile).
1857 =cut
1859 sub unlock {
1860 my($self) = @_;
1861 return unless $self->_is_locked;
1862 my $rfile = $self->rfile;
1863 rmdir "$rfile.lock";
1864 $self->_is_locked (0);
1867 =head2 unseed
1869 Sets this recentfile in the state of not 'seeded'.
1871 =cut
1872 sub unseed {
1873 my($self) = @_;
1874 $self->seeded(0);
1877 =head2 $ret = $obj->update ($path, $type)
1879 =head2 $ret = $obj->update ($path, "new", $dirty_epoch)
1881 =head2 $ret = $obj->update ()
1883 Enter one file into the local I<recentfile>. $path is the (usually
1884 absolute) path. If the path is outside I<our> tree, then it is
1885 ignored.
1887 C<$type> is one of C<new> or C<delete>.
1889 Events of type C<new> may set $dirty_epoch. $dirty_epoch is normally
1890 not used and the epoch is calculated by the update() routine itself
1891 based on current time. But if there is the demand to insert a
1892 not-so-current file into the dataset, then the caller sets
1893 $dirty_epoch. This causes the epoch of the registered event to become
1894 $dirty_epoch or -- if the exact value given is already taken -- a tiny
1895 bit more. As compensation the dirtymark of the whole dataset is set to
now or the current epoch, whichever is higher. Note: setting the
dirty_epoch to the future is prohibited as it is very unlikely to be
intended: it might well wreak havoc with the index files.
The new file event is unshifted onto the array of recent_events (or,
if dirty_epoch is set, inserted at the place it belongs to, according
to the rule that timestamps form a strictly decreasing sequence), and
the array is shortened to the length of the timespan allowed. This is
usually the timespan specified by the interval of this recentfile, but
as long as this recentfile has not been merged into another one, the
timespan may grow without bounds.
1908 The third form runs an update without inserting a new file. This may
1909 be desired to truncate a recentfile.
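Three made-up calls corresponding to the three forms above:

    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz", "new");
    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_62.tar.gz", "delete");
    $rf->update(); # no event, just truncate according to the interval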
1911 =cut
1912 sub _epoch_monotonically_increasing {
1913 my($self,$epoch,$recent) = @_;
1914 return $epoch unless @$recent; # the first one goes unoffended
1915 if (_bigfloatgt("".$epoch,$recent->[0]{epoch})) {
1916 return $epoch;
1917 } else {
1918 return _increase_a_bit($recent->[0]{epoch});
1921 sub update {
1922 my($self,$path,$type,$dirty_epoch) = @_;
1923 if (defined $path or defined $type or defined $dirty_epoch) {
1924 die "update called without path argument" unless defined $path;
1925 die "update called without type argument" unless defined $type;
1926 die "update called with illegal type argument: $type" unless $type =~ /(new|delete)/;
1927 # since we have keep_delete_objects_forever we must let them inject delete objects too:
1928 #die "update called with \$type=$type and \$dirty_epoch=$dirty_epoch; ".
1929 # "dirty_epoch only allowed with type=new" if defined $dirty_epoch and $type ne "new";
1930 my $canonmeth = $self->canonize;
1931 unless ($canonmeth) {
1932 $canonmeth = "naive_path_normalize";
1934 $path = $self->$canonmeth($path);
1936 my $lrd = $self->localroot;
1937 $self->lock;
1938 # you must calculate the time after having locked, of course
1939 my $now = Time::HiRes::time;
1940 my $interval = $self->interval;
1941 my $secs = $self->interval_secs();
1942 my $recent = $self->recent_events;
1944 my $epoch;
1945 if (defined $dirty_epoch && _bigfloatgt($now,$dirty_epoch)) {
1946 $epoch = $dirty_epoch;
1947 } else {
1948 $epoch = $self->_epoch_monotonically_increasing($now,$recent);
1951 $recent ||= [];
1952 my $oldest_allowed = 0;
1953 my $merged = $self->merged;
1954 if ($merged->{epoch}) {
1955 my $virtualnow = _bigfloatmax($now,$epoch);
1956 # for the lower bound I think we need no big math, we calc already
1957 $oldest_allowed = min($virtualnow - $secs, $merged->{epoch}, $epoch);
1958 } else {
1959 # as long as we are not merged at all, no limits!
1961 my $something_done = 0;
1962 TRUNCATE: while (@$recent) {
1963 # $DB::single++ unless defined $oldest_allowed;
1964 if (_bigfloatlt($recent->[-1]{epoch}, $oldest_allowed)) {
1965 pop @$recent;
1966 $something_done = 1;
1967 } else {
1968 last TRUNCATE;
1971 if (defined $path && $path =~ s|^\Q$lrd\E||) {
1972 $path =~ s|^/||;
1973 my $splicepos;
1974 # remove the older duplicates of this $path, irrespective of $type:
1975 if (defined $dirty_epoch) {
1976 my $ctx = $self->_update_with_dirty_epoch($path,$recent,$epoch);
1977 $recent = $ctx->{recent};
1978 $splicepos = $ctx->{splicepos};
1979 $epoch = $ctx->{epoch};
1980 my $dirtymark = $self->dirtymark;
1981 my $new_dm = $now;
1982 if (_bigfloatgt($epoch, $now)) { # just in case we had to increase it
1983 $new_dm = $epoch;
1985 $self->dirtymark($new_dm);
1986 my $merged = $self->merged;
1987 if (not defined $merged->{epoch} or _bigfloatlt($epoch,$merged->{epoch})) {
1988 $self->merged(+{});
1990 } else {
1991 $recent = [ grep { $_->{path} ne $path } @$recent ];
1992 $splicepos = 0;
1994 if (defined $splicepos) {
1995 splice @$recent, $splicepos, 0, { epoch => $epoch, path => $path, type => $type };
1997 $something_done = 1;
2000 $self->write_recent($recent) if $something_done;
2001 $self->_assert_symlink;
2002 $self->unlock;
2005 sub _update_with_dirty_epoch {
2006 my($self,$path,$recent,$epoch) = @_;
2007 my $splicepos;
2008 my $new_recent = [];
2009 if (grep { $_->{path} ne $path } @$recent) {
2010 my $cancel = 0;
2011 KNOWN_EVENT: for my $i (0..$#$recent) {
2012 if ($recent->[$i]{path} eq $path) {
2013 if ($recent->[$i]{epoch} eq $epoch) {
2014 # nothing to do
2015 $cancel = 1;
2016 last KNOWN_EVENT;
2018 } else {
2019 push @$new_recent, $recent->[$i];
2022 @$recent = @$new_recent unless $cancel;
2024 if (!exists $recent->[0] or _bigfloatgt($epoch,$recent->[0]{epoch})) {
2025 $splicepos = 0;
2026 } elsif (_bigfloatlt($epoch,$recent->[-1]{epoch})) {
2027 $splicepos = @$recent;
2028 } else {
2029 RECENT: for my $i (0..$#$recent) {
2030 my $ev = $recent->[$i];
2031 if ($epoch eq $recent->[$i]{epoch}) {
2032 $epoch = _increase_a_bit($epoch, $i ? $recent->[$i-1]{epoch} : undef);
2034 if (_bigfloatgt($epoch,$recent->[$i]{epoch})) {
2035 $splicepos = $i;
2036 last RECENT;
2040 return {
2041 recent => $recent,
2042 splicepos => $splicepos,
2043 epoch => $epoch,
2047 =head2 seed
2049 Sets this recentfile in the state of 'seeded' which means it has to
2050 re-evaluate its uptodateness.
2052 =cut
2053 sub seed {
2054 my($self) = @_;
2055 $self->seeded(1);
2058 =head2 seeded
2060 Tells if the recentfile is in the state 'seeded'.
2062 =cut
2063 sub seeded {
2064 my($self, $set) = @_;
2065 if (defined $set) {
2066 $self->_seeded ($set);
2068 my $x = $self->_seeded;
2069 unless (defined $x) {
2070 $x = 0;
2071 $self->_seeded ($x);
2073 return $x;
2076 =head2 uptodate
2078 True if this object has mirrored the complete interval covered by the
2079 current recentfile.
2081 =cut
2082 sub uptodate {
2083 my($self) = @_;
2084 my $uptodate;
2085 my $why;
2086 if ($self->_uptodateness_ever_reached and not $self->seeded) {
2087 $why = "saturated";
2088 $uptodate = 1;
2090 # it's too easy to misconfigure ttl and related timings and then
2091 # never reach uptodateness, so disabled 2009-03-22
2092 if (0 and not defined $uptodate) {
2093 if ($self->ttl_reached){
2094 $why = "ttl_reached returned true, so we are not uptodate";
2095 $uptodate = 0 ;
2098 unless (defined $uptodate) {
2099 # look if recentfile has unchanged timestamp
2100 my $minmax = $self->minmax;
2101 if (exists $minmax->{mtime}) {
2102 my $rfile = $self->_my_current_rfile;
2103 my @stat = stat $rfile;
2104 my $mtime = $stat[9];
2105 if (defined $mtime && defined $minmax->{mtime} && $mtime > $minmax->{mtime}) {
2106 $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
2107 $uptodate = 0;
2108 } else {
2109 my $covered = $self->done->covered(@$minmax{qw(max min)});
2110 $why = sprintf "minmax covered[%s], so we return that", defined $covered ? $covered : "UNDEF";
2111 $uptodate = $covered;
2115 unless (defined $uptodate) {
2116 $why = "fallthrough, so not uptodate";
2117 $uptodate = 0;
2119 if ($uptodate) {
2120 $self->_uptodateness_ever_reached(1);
2122 my $remember =
2124 uptodate => $uptodate,
2125 why => $why,
2127 $self->_remember_last_uptodate_call($remember);
2128 return $uptodate;
2131 =head2 $obj->write_recent ($recent_files_arrayref)
Writes a I<recentfile> based on the current reflection of the state of
the tree, limited by the current interval.
2136 =cut
2137 sub _resort {
2138 my($self) = @_;
2139 @{$_[1]} = sort { _bigfloatcmp($b->{epoch},$a->{epoch}) } @{$_[1]};
2140 return;
2142 sub write_recent {
2143 my ($self,$recent) = @_;
2144 die "write_recent called without argument" unless defined $recent;
2145 my $Last_epoch;
2146 SANITYCHECK: for my $i (0..$#$recent) {
2147 if (defined($Last_epoch) and _bigfloatge($recent->[$i]{epoch},$Last_epoch)) {
2148 require Carp;
2149 Carp::confess(sprintf "Warning: disorder '%s'>='%s', re-sorting %s\n",
2150 $recent->[$i]{epoch}, $Last_epoch, $self->interval);
2151 # you may want to:
2152 # $self->_resort($recent);
2153 # last SANITYCHECK;
2155 $Last_epoch = $recent->[$i]{epoch};
2157 my $minmax = $self->minmax;
2158 if (!defined $minmax->{max} || _bigfloatlt($minmax->{max},$recent->[0]{epoch})) {
2159 $minmax->{max} = $recent->[0]{epoch};
2161 if (!defined $minmax->{min} || _bigfloatlt($minmax->{min},$recent->[-1]{epoch})) {
2162 $minmax->{min} = $recent->[-1]{epoch};
2164 $self->minmax($minmax);
2165 my $meth = sprintf "write_%d", $self->protocol;
2166 $self->$meth($recent);
2169 =head2 $obj->write_0 ($recent_files_arrayref)
2171 Delegate of C<write_recent()> on protocol 0
2173 =cut
2175 sub write_0 {
2176 my ($self,$recent) = @_;
2177 my $rfile = $self->rfile;
2178 YAML::Syck::DumpFile("$rfile.new",$recent);
2179 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2182 =head2 $obj->write_1 ($recent_files_arrayref)
2184 Delegate of C<write_recent()> on protocol 1
2186 =cut
2188 sub write_1 {
2189 my ($self,$recent) = @_;
2190 my $rfile = $self->rfile;
2191 my $suffix = $self->serializer_suffix;
2192 my $data = {
2193 meta => $self->meta_data,
2194 recent => $recent,
2196 my $serialized;
2197 if ($suffix eq ".yaml") {
2198 $serialized = YAML::Syck::Dump($data);
2199 } elsif ($HAVE->{"Data::Serializer"}) {
2200 my $serializer = Data::Serializer->new
2201 ( serializer => $serializers{$suffix} );
2202 $serialized = $serializer->raw_serialize($data);
2203 } else {
2204 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
2206 open my $fh, ">", "$rfile.new" or die "Could not open >'$rfile.new': $!";
2207 print $fh $serialized;
2208 close $fh or die "Could not close '$rfile.new': $!";
2209 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2212 BEGIN {
2213 my $nq = qr/[^"]+/; # non-quotes
2214 my @pod_lines =
2215 split /\n/, <<'=cut'; %serializers = map { my @x = /"($nq)"\s+=>\s+"($nq)"/; @x } grep {s/^=item\s+C<<\s+(.+)\s+>>$/$1/} @pod_lines; }
2217 =head1 SERIALIZERS
2219 The following suffixes are supported and trigger the use of these
2220 serializers:
2222 =over 4
2224 =item C<< ".yaml" => "YAML::Syck" >>
2226 =item C<< ".json" => "JSON" >>
2228 =item C<< ".sto" => "Storable" >>
2230 =item C<< ".dd" => "Data::Dumper" >>
2232 =back
2234 =cut
2236 BEGIN {
2237 my @pod_lines =
2238 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
2240 =head1 INTERVAL SPEC
2242 An interval spec is a primitive way to express time spans. Normally it
2243 is composed from an integer and a letter.
2245 As a special case, a string that consists only of the single letter
2246 C<Z>, stands for MAX_INT seconds.
2248 The following letters express the specified number of seconds:
2250 =over 4
2252 =item C<< s => 1 >>
2254 =item C<< m => 60 >>
2256 =item C<< h => 60*60 >>
2258 =item C<< d => 60*60*24 >>
2260 =item C<< W => 60*60*24*7 >>
2262 =item C<< M => 60*60*24*30 >>
2264 =item C<< Q => 60*60*24*90 >>
2266 =item C<< Y => 60*60*24*365.25 >>
2268 =back
2270 =cut
2272 =head1 SEE ALSO
2274 L<File::Rsync::Mirror::Recent>,
2275 L<File::Rsync::Mirror::Recentfile::Done>,
2276 L<File::Rsync::Mirror::Recentfile::FakeBigFloat>
2278 =head1 BUGS
2280 Please report any bugs or feature requests through the web interface
2282 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2283 I will be notified, and then you'll automatically be notified of
2284 progress on your bug as I make changes.
2286 =head1 KNOWN BUGS
2288 Memory hungry: it seems all memory is allocated during the initial
2289 rsync where a list of all files is maintained in memory.
2291 =head1 SUPPORT
2293 You can find documentation for this module with the perldoc command.
2295 perldoc File::Rsync::Mirror::Recentfile
2297 You can also look for information at:
2299 =over 4
2301 =item * RT: CPAN's request tracker
2303 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2305 =item * AnnoCPAN: Annotated CPAN documentation
2307 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2309 =item * CPAN Ratings
2311 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2313 =item * Search CPAN
2315 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2317 =back
2320 =head1 ACKNOWLEDGEMENTS
2322 Thanks to RJBS for module-starter.
2324 =head1 AUTHOR
2326 Andreas König
2328 =head1 COPYRIGHT & LICENSE
2330 Copyright 2008,2009 Andreas König.
2332 This program is free software; you can redistribute it and/or modify it
2333 under the same terms as Perl itself.
2336 =cut
2338 1; # End of File::Rsync::Mirror::Recentfile
2340 # Local Variables:
2341 # mode: cperl
2342 # cperl-indent-level: 4
2343 # End: