1 package File
::Rsync
::Mirror
::Recentfile
;
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
19 $HAVE->{$package} = eval qq{ require $package; };
22 use File
::Basename
qw(basename dirname fileparse);
23 use File
::Copy
qw(cp);
24 use File
::Path
qw(mkpath);
25 use File
::Rsync
::Mirror
::Recentfile
::FakeBigFloat
qw(:all);
27 use List
::Util
qw(first max min);
28 use Scalar
::Util
qw(reftype);
33 use version
; our $VERSION = qv
('0.0.1');
35 use constant MAX_INT
=> ~0>>1; # anything better?
36 use constant DEFAULT_PROTOCOL
=> 1;
41 # maybe subclass if this mapping is bad?
46 Writer (of a single file):
48 use File::Rsync::Mirror::Recentfile;
49 my $rf = File::Rsync::Mirror::Recentfile->new
52 filenameroot => "RECENT",
53 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
54 localroot => "/home/ftp/pub/PAUSE/authors/",
55 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
57 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
61 my $rf = File::Rsync::Mirror::Recentfile->new
63 filenameroot => "RECENT",
64 ignore_link_stat_errors => 1,
66 localroot => "/home/ftp/pub/PAUSE/authors",
68 remote_host => "pause.perl.org",
69 remote_module => "authors",
72 'rsync-path' => '/usr/bin/rsync',
75 'omit-dir-times' => 1,
82 Aggregator (usually the writer):
84 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
89 Lower level than F:R:M:Recent, handles one recentfile. Whereas a tree
90 is always composed of several recentfiles, controlled by the
91 F:R:M:Recent object, the Recentfile object has to do the bookkeeping
92 for a single timeslice.
98 =head1 CONSTRUCTORS / DESTRUCTOR
100 =head2 my $obj = CLASS->new(%hash)
102 Constructor. On every argument pair the key is a method name and the
103 value is an argument to that method.
105 If a recentfile for this resource already exists, metadata that are
106 not defined by the constructor will be fetched from there as soon as
107 it is being read by recent_events().
112 my($class, @args) = @_;
113 my $self = bless {}, $class;
115 my($method,$arg) = splice @args, 0, 2;
116 $self->$method($arg);
118 unless (defined $self->protocol) {
119 $self->protocol(DEFAULT_PROTOCOL
);
121 unless (defined $self->filenameroot) {
122 $self->filenameroot("RECENT");
124 unless (defined $self->serializer_suffix) {
125 $self->serializer_suffix(".yaml");
130 =head2 my $obj = CLASS->new_from_file($file)
132 Constructor. $file is a I<recentfile>.
137 my($class, $file) = @_;
138 my $self = bless {}, $class;
139 $self->_rfile($file);
141 my $serialized = do { open my $fh, $file or die "Could not open '$file': $!";
145 # XXX: we can skip this step when the metadata are sufficient, but
146 # we cannot parse the file without some magic stuff about
149 my($name,$path) = fileparse
$file;
150 my $symlink = readlink $file;
151 if ($symlink =~ m
|/|) {
152 die "FIXME: filenames containing '/' not supported, got $symlink";
154 $file = File
::Spec
->catfile ( $path, $symlink );
156 my($name,$path,$suffix) = fileparse
$file, keys %serializers;
157 $self->serializer_suffix($suffix);
158 $self->localroot($path);
159 die "Could not determine file format from suffix" unless $suffix;
161 if ($suffix eq ".yaml") {
163 $deserialized = YAML
::Syck
::LoadFile
($file);
164 } elsif ($HAVE->{"Data::Serializer"}) {
165 my $serializer = Data
::Serializer
->new
166 ( serializer
=> $serializers{$suffix} );
167 $deserialized = $serializer->raw_deserialize($serialized);
169 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
171 while (my($k,$v) = each %{$deserialized->{meta
}}) {
172 next if $k ne lc $k; # "Producers"
175 unless (defined $self->protocol) {
176 $self->protocol(DEFAULT_PROTOCOL
);
189 unless ($self->_current_tempfile_fh) {
190 if (my $tempfile = $self->_current_tempfile) {
192 unlink $tempfile; # may fail in global destruction
207 "_current_tempfile_fh",
208 "_delayed_operations",
215 "_remember_last_uptodate_call",
220 "__verified_tempdir",
222 "_uptodateness_ever_reached",
227 split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
233 A list of interval specs that tell the aggregator which I<recentfile>s
238 The name of a method to canonize the path before rsyncing. Only
239 supported value is C<naive_path_normalize>. Defaults to that.
243 A comment about this tree and setup.
247 A timestamp. The dirtymark is updated whenever an out of band change
248 on the origin server is performed that violates the protocol. Say,
249 they add or remove files in the middle somewhere. Slaves must react
250 with a devaluation of their C<done> structure which then leads to a
251 full re-sync of all files. Implementation note: dirtymark may increase
256 The (prefix of the) filename we use for this I<recentfile>. Defaults to
257 C<RECENT>. The string must not contain a directory separator.
261 Timestamp remembering when we mirrored this recentfile the last time.
262 Only relevant for slaves.
264 =item ignore_link_stat_errors
266 If set to true, rsync errors are ignored that complain about link stat
267 errors. These seem to happen only when there are files missing at the
268 origin. In race conditions this can always happen, so it is
269 recommended to set this value to true.
273 If set to true, this object will fetch a new recentfile from remote
274 when the timespan between the last mirror (see have_mirrored) and now
275 is too large (see C<ttl>).
277 =item keep_delete_objects_forever
279 The default for delete events is that they are passed through the
280 collection of recentfile objects until they reach the Z file. There
281 they get dropped so that the associated file object ceases to exist at
282 all. By setting C<keep_delete_objects_forever> the delete objects are
283 kept forever. This makes the Z file larger but has the advantage that
284 slaves that have interrupted mirroring for a long time still can clean
289 After how many seconds shall we die if we cannot lock a I<recentfile>?
290 Defaults to 600 seconds.
294 When mirror_loop is called, this accessor can specify how much time
295 every loop shall at least take. If the work of a loop is done before
296 that time has gone, sleeps for the rest of the time. Defaults to
297 arbitrary 42 seconds.
299 =item max_files_per_connection
301 Maximum number of files that are transferred on a single rsync call.
302 Setting it higher means higher performance at the price of holding
303 connections longer and potentially disturbing other users in the pool.
304 Defaults to the arbitrary value 42.
306 =item max_rsync_errors
308 When rsync operations encounter that many errors without any resetting
309 success in between, then we die. Defaults to unlimited. A value of
310 -1 means we run forever ignoring all rsync errors.
314 Hashref remembering when we read the recent_events from this file the
315 last time and what the timespan was.
319 When the RECENT file format changes, we increment the protocol. We try
320 to support older protocols in later releases.
324 The host we are mirroring from. Leave empty for the local filesystem.
328 Rsync servers have so called modules to separate directory trees from
329 each other. Put here the name of the module under which we are
330 mirroring. Leave empty for local filesystem.
334 Things like compress, links, times or checksums. Passed in to the
335 File::Rsync object used to run the mirror.
337 =item serializer_suffix
339 Mostly untested accessor. The only well tested format for
340 I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
341 Data::Serializer. But in principle other formats are supported as
342 well. See section SERIALIZERS below.
344 =item sleep_per_connection
346 Sleep that many seconds (floating point OK) after every chunk of rsyncing
347 has finished. Defaults to arbitrary 0.42.
351 Directory to write temporary files to. Must allow rename operations
352 into the tree which usually means it must live on the same partition
353 as the target directory. Defaults to C<< $self->localroot >>.
357 Time to live. Number of seconds after which this recentfile must be
358 fetched again from the origin server. Only relevant for slaves.
359 Defaults to arbitrary 24.2 seconds.
363 Boolean to turn on a bit of verbosity.
369 use accessors
@accessors;
373 =head2 (void) $obj->aggregate( %options )
375 Takes all intervals that are collected in the accessor called
376 aggregator. Sorts them by actual length of the interval.
377 Removes those that are shorter than our own interval. Then merges this
378 object into the next larger object. The merging continues upwards
379 as long as the next I<recentfile> is old enough to warrant a merge.
381 Whether a merge is warranted is decided according to the length of the
382 previous interval so that larger files are not updated as often as
383 smaller ones. If $options{force} is true, all files get updated.
385 Here is an example to illustrate the behaviour. Given aggregators
391 1h updates 1d on every call to aggregate()
392 1d updates 1W earliest after 1h
393 1W updates 1M earliest after 1d
394 1M updates 1Q earliest after 1W
395 1Q updates 1Y earliest after 1M
396 1Y updates Z earliest after 1Q
398 Note that all but the smallest recentfile get updated at an arbitrary
399 rate and as such are quite useless on their own.
404 my($self, %option) = @_;
405 my @aggs = sort { $a->{secs
} <=> $b->{secs
} }
406 grep { $_->{secs
} >= $self->interval_secs }
407 map { { interval
=> $_, secs
=> $self->interval_secs($_)} }
408 $self->interval, @
{$self->aggregator || []};
410 $aggs[0]{object
} = $self;
411 AGGREGATOR
: for my $i (0..$#aggs-1) {
412 my $this = $aggs[$i]{object
};
413 my $next = $this->_sparse_clone;
414 $next->interval($aggs[$i+1]{interval
});
416 if ($option{force
} || $i == 0) {
419 my $next_rfile = $next->rfile;
420 if (-e
$next_rfile) {
421 my $prev = $aggs[$i-1]{object
};
423 my $next_age = 86400 * -M
$next_rfile;
424 if ($next_age > $prev->interval_secs) {
433 $aggs[$i+1]{object
} = $next;
440 # collect file size and mtime for all files of this aggregate
441 sub _debug_aggregate
{
443 my @aggs = sort { $a->{secs
} <=> $b->{secs
} }
444 map { { interval
=> $_, secs
=> $self->interval_secs($_)} }
445 $self->interval, @
{$self->aggregator || []};
447 for my $i (0..$#aggs) {
448 my $this = Storable
::dclone
$self;
449 $this->interval($aggs[$i]{interval
});
450 my $rfile = $this->rfile;
451 my @stat = stat $rfile;
452 push @
$report, {rfile
=> $rfile, size
=> $stat[7], mtime
=> $stat[9]};
457 # (void) $self->_assert_symlink()
458 sub _assert_symlink
{
460 my $recentrecentfile = File
::Spec
->catfile
469 if ($Config{d_symlink
} eq "define") {
470 my $howto_create_symlink; # 0=no need; 1=straight symlink; 2=rename symlink
471 if (-l
$recentrecentfile) {
472 my $found_symlink = readlink $recentrecentfile;
473 if ($found_symlink eq $self->rfilename) {
476 $howto_create_symlink = 2;
479 $howto_create_symlink = 1;
481 if (1 == $howto_create_symlink) {
482 symlink $self->rfilename, $recentrecentfile or die "Could not create symlink '$recentrecentfile': $!"
484 unlink "$recentrecentfile.$$"; # may fail
485 symlink $self->rfilename, "$recentrecentfile.$$" or die "Could not create symlink '$recentrecentfile.$$': $!";
486 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
489 warn "Warning: symlinks not supported on this system, doing a copy instead\n";
490 unlink "$recentrecentfile.$$"; # may fail
491 cp
$self->rfilename, "$recentrecentfile.$$" or die "Could not copy to '$recentrecentfile.$$': $!";
492 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
496 =head2 $hashref = $obj->delayed_operations
498 A hash of hashes containing unlink and rmdir operations which had to
499 wait until the recentfile got unhidden in order to not confuse
500 downstream mirrors (in case we have some).
504 sub delayed_operations
{
506 my $x = $self->_delayed_operations;
507 unless (defined $x) {
512 $self->_delayed_operations ($x);
517 =head2 $done = $obj->done
519 C<$done> is a reference to a L<File::Rsync::Mirror::Recentfile::Done>
520 object that keeps track of rsync activities. Only needed and used when
521 we are a mirroring slave.
527 my $done = $self->_done;
529 require File
::Rsync
::Mirror
::Recentfile
::Done
;
530 $done = File
::Rsync
::Mirror
::Recentfile
::Done
->new();
531 $done->_rfinterval ($self->interval);
532 $self->_done ( $done );
537 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
539 Stores the remote I<recentfile> locally as a tempfile. The caller is
540 responsible to remove the file after use.
542 Note: if you're intending to act as an rsync server for other slaves,
543 then you must prefer this method over fetching that file with
544 get_remotefile(). Otherwise downstream mirrors would expect you to
545 already have mirrored all the files that are in the I<recentfile>
546 before you have them mirrored.
550 sub get_remote_recentfile_as_tempfile
{
552 mkpath
$self->localroot;
555 if ( $self->_use_tempfile() ) {
556 if ($self->ttl_reached) {
557 $fh = $self->_current_tempfile_fh;
558 $trfilename = $self->rfilename;
560 return $self->_current_tempfile;
563 $trfilename = $self->rfilename;
568 $dst = $self->_current_tempfile;
570 $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
571 $dst = $fh->filename;
572 $self->_current_tempfile ($dst);
573 my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
574 if (defined $rfile && -e
$rfile) {
575 # saving on bandwidth. Might need to be configurable
576 # $self->bandwidth_is_cheap?
577 cp
$rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
584 if ($self->verbose) {
585 my $doing = -e
$dst ?
"Sync" : "Get";
586 my $display_dst = join "/", "...", basename
(dirname
($dst)), basename
($dst);
589 "%-4s %d (1/1/%s) temp %s ... ",
598 while (!$self->rsync->exec(
602 $self->register_rsync_error ($self->rsync->err);
603 if (++$retried >= 3) {
604 warn "XXX giving up";
610 printf STDERR
"Warning: gave up mirroring %s, will try again later", $self->interval;
612 $self->_refresh_internals ($dst);
613 $self->have_mirrored (Time
::HiRes
::time);
614 $self->un_register_rsync_error ();
617 if ($self->verbose) {
618 print STDERR
"DONE\n";
621 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
625 sub _verified_tempdir
{
627 my $tempdir = $self->__verified_tempdir();
628 return $tempdir if defined $tempdir;
629 unless ($tempdir = $self->tempdir) {
630 $tempdir = $self->localroot;
632 unless (-d
$tempdir) {
635 $self->__verified_tempdir($tempdir);
639 sub _get_remote_rat_provide_tempfile_object
{
640 my($self, $trfilename) = @_;
641 my $_verified_tempdir = $self->_verified_tempdir;
642 my $fh = File
::Temp
->new
643 (TEMPLATE
=> sprintf(".FRMRecent-%s-XXXX",
646 DIR
=> $_verified_tempdir,
647 SUFFIX
=> $self->serializer_suffix,
648 UNLINK
=> $self->_use_tempfile,
651 my $dst = $fh->filename;
652 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
653 if ($self->_use_tempfile) {
654 $self->_current_tempfile_fh ($fh); # delay self destruction
659 =head2 $localpath = $obj->get_remotefile ( $relative_path )
661 Rsyncs one single remote file to local filesystem.
663 Note: no locking is done on this file. Any number of processes may
666 Note II: do not use for recentfiles. If you are a cascading
667 slave/server combination, it would confuse other slaves. They would
668 expect the contents of these recentfiles to be available. Use
669 get_remote_recentfile_as_tempfile() instead.
674 my($self, $path) = @_;
675 my $dst = File
::Spec
->catfile($self->localroot, $path);
677 if ($self->verbose) {
678 my $doing = -e
$dst ?
"Sync" : "Get";
681 "%-4s %d (1/1/%s) %s ... ",
688 while (!$self->rsync->exec(
694 $self->register_rsync_error ($self->rsync->err);
696 $self->un_register_rsync_error ();
697 if ($self->verbose) {
698 print STDERR
"DONE\n";
703 =head2 $obj->interval ( $interval_spec )
705 Get/set accessor. $interval_spec is a string and described below in
706 the section INTERVAL SPEC.
711 my ($self, $interval) = @_;
713 $self->_interval($interval);
714 $self->_rfile(undef);
716 $interval = $self->_interval;
717 unless (defined $interval) {
718 # do not ask the $self too much, it recurses!
720 Carp
::confess
("Alert: interval undefined for '".$self."'. Cannot continue.");
725 =head2 $secs = $obj->interval_secs ( $interval_spec )
727 $interval_spec is described below in the section INTERVAL SPEC. If
728 empty defaults to the inherent interval for this object.
733 my ($self, $interval) = @_;
734 $interval ||= $self->interval;
735 unless (defined $interval) {
736 die "interval_secs() called without argument on an object without a declared one";
738 my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
739 die "Could not determine seconds from interval[$interval]";
740 if ($interval eq "Z") {
742 } elsif (exists $seconds{$t} and $n =~ /^\d+$/) {
743 return $seconds{$t}*$n;
745 die "Invalid interval specification: n[$n]t[$t]";
749 =head2 $obj->localroot ( $localroot )
751 Get/set accessor. The local root of the tree.
756 my ($self, $localroot) = @_;
758 $self->_localroot($localroot);
759 $self->_rfile(undef);
761 $localroot = $self->_localroot;
764 =head2 $ret = $obj->local_path($path_found_in_recentfile)
766 Combines the path to our local mirror and the path of an object found
767 in this I<recentfile>. In other words: the target of a mirror operation.
769 Implementation note: We split on slashes and then use
770 File::Spec::catfile to adjust to the local operating system.
775 my($self,$path) = @_;
776 unless (defined $path) {
777 # seems like a degenerated case
778 return $self->localroot;
780 my @p = split m
|/|, $path;
781 File
::Spec
->catfile($self->localroot,@p);
784 =head2 (void) $obj->lock
786 Locking is implemented with an C<mkdir> on a locking directory
787 (C<.lock> appended to $rfile).
793 # not using flock because it locks on filehandles instead of
794 # old school resources.
795 my $locked = $self->_is_locked and return;
796 my $rfile = $self->rfile;
797 # XXX need a way to allow breaking the lock
799 my $locktimeout = $self->locktimeout || 600;
800 while (not mkdir "$rfile.lock") {
801 Time
::HiRes
::sleep 0.01;
802 if (time - $start > $locktimeout) {
803 die "Could not acquire lockdirectory '$rfile.lock': $!";
806 $self->_is_locked (1);
809 =head2 (void) $obj->merge ($other)
811 Bulk update of this object with another one. It's used to merge a
812 smaller and younger $other object into the current one. If this file
813 is a C<Z> file, then we normally do not merge in objects of type
814 C<delete>; this can be overridden by setting
815 keep_delete_objects_forever. But if we encounter an object of type
816 delete we delete the corresponding C<new> object if we have it.
818 If there is nothing to be merged, nothing is done.
823 my($self, $other) = @_;
824 $self->_merge_sanitycheck ( $other );
826 my $other_recent = $other->recent_events || [];
827 # $DB::single++ if $other->interval_secs eq "2" and grep {$_->{epoch} eq "999.999"} @$other_recent;
829 $self->_merge_locked ( $other, $other_recent );
835 my($self, $other, $other_recent) = @_;
836 my $my_recent = $self->recent_events || [];
838 # calculate the target time span
839 my $myepoch = $my_recent->[0] ?
$my_recent->[0]{epoch
} : undef;
840 my $epoch = $other_recent->[0] ?
$other_recent->[0]{epoch
} : $myepoch;
841 my $oldest_allowed = 0;
843 unless ($my_recent->[0]) {
848 if (($other->dirtymark||0) ne ($self->dirtymark||0)) {
851 } elsif (my $merged = $self->merged) {
852 my $secs = $self->interval_secs();
853 $oldest_allowed = min
($epoch - $secs, $merged->{epoch
}||0);
854 if (@
$other_recent and
855 _bigfloatlt
($other_recent->[-1]{epoch
}, $oldest_allowed)
857 $oldest_allowed = $other_recent->[-1]{epoch
};
860 while (@
$my_recent && _bigfloatlt
($my_recent->[-1]{epoch
}, $oldest_allowed)) {
867 my $other_recent_filtered = [];
868 for my $oev (@
$other_recent) {
869 my $oevepoch = $oev->{epoch
} || 0;
870 next if _bigfloatlt
($oevepoch, $oldest_allowed);
871 my $path = $oev->{path
};
872 next if $have_path{$path}++;
873 if ( $self->interval eq "Z"
874 and $oev->{type
} eq "delete"
875 and ! $self->keep_delete_objects_forever
879 if (!$myepoch || _bigfloatgt
($oevepoch, $myepoch)) {
882 push @
$other_recent_filtered, { epoch
=> $oev->{epoch
}, path
=> $path, type
=> $oev->{type
} };
885 if ($something_done) {
886 $self->_merge_something_done ($other_recent_filtered, $my_recent, $other_recent, $other, \
%have_path, $epoch);
890 sub _merge_something_done
{
891 my($self, $other_recent_filtered, $my_recent, $other_recent, $other, $have_path, $epoch) = @_;
893 my $epoch_conflict = 0;
895 ZIP
: while (@
$other_recent_filtered || @
$my_recent) {
898 @
$other_recent_filtered && _bigfloatge
($other_recent_filtered->[0]{epoch
},$my_recent->[0]{epoch
})) {
899 $event = shift @
$other_recent_filtered;
901 $event = shift @
$my_recent;
902 next ZIP
if $have_path->{$event->{path
}}++;
904 $epoch_conflict=1 if defined $last_epoch && $event->{epoch
} eq $last_epoch;
905 $last_epoch = $event->{epoch
};
906 push @
$recent, $event;
908 if ($epoch_conflict) {
910 for (my $i = $#$recent;$i>=0;$i--) {
911 my $epoch = $recent->[$i]{epoch
};
912 if ($have_epoch{$epoch}++) {
913 while ($have_epoch{$epoch}) {
914 $epoch = _increase_a_bit
($epoch);
916 $recent->[$i]{epoch
} = $epoch;
917 $have_epoch{$epoch}++;
921 if (!$self->dirtymark || $other->dirtymark ne $self->dirtymark) {
922 $self->dirtymark ( $other->dirtymark );
924 $self->write_recent($recent);
926 time => Time
::HiRes
::time, # not used anywhere
927 epoch
=> $recent->[0]{epoch
},
928 into_interval
=> $self->interval, # not used anywhere
930 $other->write_recent($other_recent);
933 sub _merge_sanitycheck
{
934 my($self, $other) = @_;
935 if ($self->interval_secs <= $other->interval_secs) {
938 "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
939 $self->interval_secs,
940 $other->interval_secs,
947 Hashref denoting when this recentfile has been merged into some other
953 my($self, $set) = @_;
955 $self->_merged ($set);
957 my $merged = $self->_merged;
959 if ($merged and $into = $merged->{into_interval
} and defined $self->_interval) {
961 if ($into eq $self->interval) {
965 "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
969 } elsif ($self->interval_secs($into) < $self->interval_secs) {
973 "Warning: into_interval_secs[%s] smaller than own interval_secs[%s] on interval[%s]. Danger ahead.",
974 $self->interval_secs($into),
975 $self->interval_secs,
983 =head2 $hashref = $obj->meta_data
985 Returns the hashref of metadata that the server has to add to the
992 my $ret = $self->{meta
};
1003 "serializer_suffix",
1010 # XXX need to reset the Producer if I am a writer, keep it when I
1012 $ret->{Producers
} ||= {
1013 __PACKAGE__
, "$VERSION", # stringified it looks better
1015 'time', Time
::HiRes
::time,
1017 $ret->{dirtymark
} ||= Time
::HiRes
::time;
1021 =head2 $success = $obj->mirror ( %options )
1023 Mirrors the files in this I<recentfile> as reported by
1024 C<recent_events>. Options named C<after>, C<before>, C<max>, and
1025 C<skip-deletes> are passed through to the C<recent_events> call. The
1026 boolean option C<piecemeal>, if true, causes C<mirror> to only rsync
1027 C<max_files_per_connection> and keep track of the rsynced files so
1028 that future calls will rsync different files until all files are
1034 my($self, %options) = @_;
1035 my $trecentfile = $self->get_remote_recentfile_as_tempfile();
1036 $self->_use_tempfile (1);
1037 my %passthrough = map { ($_ => $options{$_}) } qw(before after max skip-deletes);
1038 my ($recent_events) = $self->recent_events(%passthrough);
1039 my(@error, @dlcollector); # download-collector: array containing paths we need
1041 my $last_item = $#$recent_events;
1042 my $done = $self->done;
1043 my $pathdb = $self->_pathdb;
1044 ITEM
: for my $i ($first_item..$last_item) {
1058 last if $i == $last_item;
1059 if ($status->{mustreturn
}){
1060 if ($self->_current_tempfile && ! $self->_current_tempfile_fh) {
1061 # looks like a bug somewhere else
1062 my $t = $self->_current_tempfile;
1063 unlink $t or die "Could not unlink '$t': $!";
1064 $self->_current_tempfile(undef);
1070 my $success = eval { $self->_mirror_dlcollector (\
@dlcollector,$pathdb,$recent_events);};
1071 if (!$success || $@
) {
1072 warn "Warning: Unknown error while mirroring: $@";
1077 if ($self->verbose) {
1078 print STDERR
"DONE\n";
1080 # once we've gone to the end we consider ourselves free of obligations
1082 $self->_mirror_unhide_tempfile ($trecentfile);
1083 $self->_mirror_perform_delayed_ops;
1099 my $recent_event = $recent_events->[$i];
1100 return if $done->covered ( $recent_event->{epoch
} );
1102 my $rec = $pathdb->{$recent_event->{path
}};
1103 if ($rec && $rec->{recentepoch
}) {
1105 ( $rec->{recentepoch
}, $recent_event->{epoch
} )){
1106 $done->register ($recent_events, [$i]);
1111 my $dst = $self->local_path($recent_event->{path
});
1112 if ($recent_event->{type
} eq "new"){
1113 $self->_mirror_item_new
1126 } elsif ($recent_event->{type
} eq "delete") {
1128 if ($options->{'skip-deletes'}) {
1129 $activity = "skipped";
1132 $activity = "not_found";
1133 } elsif (-l
$dst or not -d _
) {
1134 $self->delayed_operations->{unlink}{$dst}++;
1135 $activity = "deleted";
1137 $self->delayed_operations->{rmdir}{$dst}++;
1138 $activity = "deleted";
1141 $done->register ($recent_events, [$i]);
1143 $self->_mirror_register_path($pathdb,[$recent_event],$activity);
1146 warn "Warning: invalid upload type '$recent_event->{type}'";
1150 sub _mirror_item_new
{
1163 if ($self->verbose) {
1164 my $doing = -e
$dst ?
"Sync" : "Get";
1167 "%-4s %d (%d/%d/%s) %s ... ",
1173 $recent_event->{path
},
1176 my $max_files_per_connection = $self->max_files_per_connection || 42;
1178 if ($self->verbose) {
1181 push @
$dlcollector, { rev
=> $recent_event, i
=> $i };
1182 if (@
$dlcollector >= $max_files_per_connection) {
1183 $success = eval {$self->_mirror_dlcollector ($dlcollector,$pathdb,$recent_events);};
1184 my $sleep = $self->sleep_per_connection;
1185 $sleep = 0.42 unless defined $sleep;
1186 Time
::HiRes
::sleep $sleep;
1187 if ($options->{piecemeal
}) {
1188 $status->{mustreturn
} = 1;
1194 if (!$success || $@
) {
1195 warn "Warning: Error while mirroring: $@";
1199 if ($self->verbose) {
1200 print STDERR
"DONE\n";
1204 sub _mirror_dlcollector
{
1205 my($self,$xcoll,$pathdb,$recent_events) = @_;
1206 my $success = $self->mirror_path([map {$_->{rev
}{path
}} @
$xcoll]);
1208 $self->_mirror_register_path($pathdb,[map {$_->{rev
}} @
$xcoll],"rsync");
1210 $self->done->register($recent_events, [map {$_->{i
}} @
$xcoll]);
1215 sub _mirror_register_path
{
1216 my($self,$pathdb,$coll,$activity) = @_;
1218 for my $item (@
$coll) {
1219 $pathdb->{$item->{path
}} =
1221 recentepoch
=> $item->{epoch
},
1222 ($activity."_on") => $time,
1227 sub _mirror_unhide_tempfile
{
1228 my($self, $trecentfile) = @_;
1229 my $rfile = $self->rfile;
1230 if (rename $trecentfile, $rfile) {
1231 # warn "DEBUG: renamed '$trecentfile' to '$rfile'";
1234 Carp
::confess
("Could not rename '$trecentfile' to '$rfile': $!");
1236 $self->_use_tempfile (0);
1237 if (my $ctfh = $self->_current_tempfile_fh) {
1238 $ctfh->unlink_on_destroy (0);
1239 $self->_current_tempfile_fh (undef);
1243 sub _mirror_perform_delayed_ops
{
1245 my $delayed = $self->delayed_operations;
1246 for my $dst (keys %{$delayed->{unlink}}) {
1247 unless (unlink $dst) {
1249 Carp
::cluck
( "Warning: Error while unlinking '$dst': $!" );
1251 delete $delayed->{unlink}{$dst};
1253 for my $dst (keys %{$delayed->{rmdir}}) {
1254 unless (rmdir $dst) {
1256 Carp
::cluck
( "Warning: Error on rmdir '$dst': $!" );
1258 delete $delayed->{rmdir}{$dst};
1262 =head2 (void) $obj->mirror_loop
1264 Run mirror in an endless loop. See the accessor C<loopinterval>. XXX
1265 What happens/should happen if we miss the interval during a single loop?
1271 my $iteration_start = time;
1274 $SIG{INT
} = sub { $Signal++ };
1275 my $loopinterval = $self->loopinterval || 42;
1276 my $after = -999999999;
1278 $self->mirror($after);
1279 last LOOP
if $Signal;
1280 my $re = $self->recent_events;
1281 $after = $re->[0]{epoch
};
1282 if ($self->verbose) {
1286 if (time - $iteration_start < $loopinterval) {
1287 sleep $iteration_start + $loopinterval - time;
1289 if ($self->verbose) {
1296 =head2 $success = $obj->mirror_path ( $arrref | $path )
1298 If the argument is a scalar it is treated as a path. The remote path
1299 is mirrored into the local copy. $path is the path found in the
1300 I<recentfile>, i.e. it is relative to the root directory of the
1303 If the argument is an array reference then all elements are treated as
1304 a path below the current tree and all are rsynced with a single
1305 command (and a single connection).
1310 my($self,$path) = @_;
1311 # XXX simplify the two branches such that $path is treated as
1312 # [$path] maybe even demand the argument as an arrayref to
1313 # simplify docs and code. (rsync-over-recentfile-2.pl uses the
1315 if (ref $path and ref $path eq "ARRAY") {
1316 my $dst = $self->localroot;
1317 mkpath dirname
$dst;
1318 my($fh) = File
::Temp
->new(TEMPLATE
=> sprintf(".%s-XXXX",
1319 lc $self->filenameroot,
1324 for my $p (@
$path) {
1328 $fh->unlink_on_destroy(1);
1331 while (!$self->rsync->exec
1337 'files-from' => $fh->filename,
1339 my(@err) = $self->rsync->err;
1340 if ($self->ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1341 if ($self->verbose) {
1342 warn "Info: ignoring link_stat error '@err'";
1346 $self->register_rsync_error (@err);
1347 if (++$retried >= 3) {
1348 warn "XXX giving up.";
1354 $self->un_register_rsync_error ();
1357 my $dst = $self->local_path($path);
1358 mkpath dirname
$dst;
1359 while (!$self->rsync->exec
1367 my(@err) = $self->rsync->err;
1368 if ($self->ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1369 if ($self->verbose) {
1370 warn "Info: ignoring link_stat error '@err'";
1374 $self->register_rsync_error (@err);
1376 $self->un_register_rsync_error ();
1381 sub _my_current_rfile
{
1384 if ($self->_use_tempfile) {
1385 $rfile = $self->_current_tempfile;
1387 $rfile = $self->rfile;
1392 =head2 $path = $obj->naive_path_normalize ($path)
1394 Takes an absolute unix style path as argument and canonicalizes it to
1395 a shorter path if possible, removing things like double slashes or
1396 C</./> and removes references to C<../> directories to get a shorter
1397 unambiguous path. This is used to make the code easier that determines
1398 if a file passed to C<update()> is indeed below our C<localroot>.
1402 sub naive_path_normalize
{
1403 my($self,$path) = @_;
1405 1 while $path =~ s
|/[^/]+/\.\./|/|;
1410 =head2 $ret = $obj->read_recent_1 ( $data )
1412 Delegate of C<recent_events()> on protocol 1
1417 my($self, $data) = @_;
1418 return $data->{recent
};
1421 =head2 $array_ref = $obj->recent_events ( %options )
1423 Note: the code relies on the resource being written atomically. We
1424 cannot lock because we may have no write access. If the caller has
1425 write access (eg. aggregate() or update()), it has to care for any
1426 necessary locking and it MUST write atomically.
1428 If C<$options{after}> is specified, only file events after this
1429 timestamp are returned.
1431 If C<$options{before}> is specified, only file events before this
1432 timestamp are returned.
1434 If C<$options{'skip-deletes'}> is specified, no files-to-be-deleted
1437 If C<$options{max}> is specified only a maximum of this many events is
1440 If C<$options{contains}> is specified the value must be a hash
1441 reference containing a query. The query may contain the keys C<epoch>,
1442 C<path>, and C<type>. Each represents a condition that must be met. If
1443 there is more than one such key, the conditions are ANDed.
1445 If C<$options{info}> is specified, it must be a hashref. This hashref
1446 will be filled with metadata about the unfiltered recent_events of
1447 this object, in key C<first> there is the first item, in key C<last>
# %options carries the filter/query settings; $options{info} is an optional
# hashref that the option handler fills with metadata.
1453 my ($self, %options) = @_;
1454 my $info = $options{info
};
# A slave first fetches the remote recentfile as a tempfile.
1455 if ($self->is_slave) {
1456 # XXX seems dubious, might produce tempfiles without removing them?
1457 $self->get_remote_recentfile_as_tempfile;
# No current file name, or the file does not exist: return an empty list.
1459 my $rfile_or_tempfile = $self->_my_current_rfile or return [];
1460 -e
$rfile_or_tempfile or return [];
1461 my $suffix = $self->serializer_suffix;
1463 $self->_try_deserialize
1470 if ($err or !$data) {
# A bare array at top level is treated as protocol 0 data.
1474 if (reftype
$data eq 'ARRAY') { # protocol 0
1477 $re = $self->_recent_events_protocol_x
# Without any of the listed options the unfiltered list is returned as is.
# NOTE(review): 'skip-deletes' (and 'info') are not in this list, so when
# they are the only options given, _recent_events_handle_options is not
# called and they appear to have no effect -- confirm whether intended.
1483 return $re unless grep {defined $options{$_}} qw(after before contains max);
1484 $self->_recent_events_handle_options ($re, \
%options);
1487 # File::Rsync::Mirror::Recentfile::_recent_events_handle_options
1488 sub _recent_events_handle_options
{
# Narrow the event list $re (array ref, modified in place through @$re)
# according to the keys of the $options hash ref: after, before,
# skip-deletes, contains and max.  $options->{info}, if given, receives
# the first and last event of the list as it was passed in.
1489 my($self, $re, $options) = @_;
1490 my $last_item = $#$re;
1491 my $info = $options->{info
};
# Remember both ends of the list before any filtering happens.
1493 $info->{first
} = $re->[0];
1494 $info->{last} = $re->[-1];
# "after": only relevant when the first event is newer than the threshold;
# the visible predicate matches indexes whose epoch is <= the threshold.
1496 if (defined $options->{after
}) {
1497 if ($re->[0]{epoch
} > $options->{after
}) {
1500 {$re->[$_]{epoch
} <= $options->{after
}}
# "before": same idea; the visible predicate matches indexes whose epoch
# is strictly less than the threshold.
1510 if (defined $options->{before
}) {
1511 if ($re->[0]{epoch
} > $options->{before
}) {
1514 {$re->[$_]{epoch
} < $options->{before
}}
# Reduce the list to the index window $first_item .. $last_item.
1523 if (0 != $first_item || -1 != $last_item) {
1524 @
$re = splice @
$re, $first_item, 1+$last_item-$first_item;
# "skip-deletes": drop every event whose type is "delete".
1526 if ($options->{'skip-deletes'}) {
1527 @
$re = grep { $_->{type
} ne "delete" } @
$re;
# "contains": for each of the keys epoch, path and type present in the
# query, keep only the events whose field is string-equal (eq) to it.
1529 if (my $contopt = $options->{contains
}) {
1530 my $seen_allowed = 0;
1531 for my $allow (qw(epoch path type)) {
1532 if (exists $contopt->{$allow}) {
1534 my $v = $contopt->{$allow};
1535 @
$re = grep { $_->{$allow} eq $v } @
$re;
# More keys in the query than the counter of recognized ones: an
# "unknown query" message is built.  Note that the join over %$contopt
# lists keys and values interleaved, not only the offending keys.
1538 if (keys %$contopt > $seen_allowed) {
1541 (sprintf "unknown query: %s", join ", ", %$contopt);
# "max": keep only the leading $options->{max} events.
1544 if ($options->{max
} && @
$re > $options->{max
}) {
1545 @
$re = splice @
$re, 0, $options->{max
};
1550 sub _recent_events_protocol_x
{
# The reader method is chosen by name from the protocol number stored in
# the file's meta section, e.g. protocol 1 gives "read_recent_1".
1555 my $meth = sprintf "read_recent_%d", $data->{meta
}{protocol
};
1556 # we may be reading meta for the first time
# Walk the meta entries: keys that are not all-lowercase are skipped, and
# so are keys for which the accessor of the same name already returns a
# defined value.
1557 while (my($k,$v) = each %{$data->{meta
}}) {
1558 next if $k ne lc $k; # "Producers"
1559 next if defined $self->$k;
# Read the events through the protocol-specific method.
1562 my $re = $self->$meth ($data);
# Record the file's mtime (stat field 9) together with the epochs of the
# last event (min) and the first event (max), and store that as minmax.
1563 my @stat = stat $rfile_or_tempfile or die "Cannot stat '$rfile_or_tempfile': $!";
1564 my $minmax = { mtime
=> $stat[9] };
1566 $minmax->{min
} = $re->[-1]{epoch
};
1567 $minmax->{max
} = $re->[0]{epoch
};
1569 $self->minmax ( $minmax );
1573 sub _try_deserialize
{
# Deserialize the recentfile according to its suffix: ".yaml" is loaded
# with YAML::Syck::LoadFile, any other suffix goes through Data::Serializer
# (if that module could be loaded) using the backend registered for the
# suffix in %serializers.
1578 if ($suffix eq ".yaml") {
1580 YAML
::Syck
::LoadFile
($rfile_or_tempfile);
1581 } elsif ($HAVE->{"Data::Serializer"}) {
1582 my $serializer = Data
::Serializer
->new
1583 ( serializer
=> $serializers{$suffix} );
# NOTE(review): two-argument open; the mode is taken from the file name
# itself.  A three-argument open with an explicit "<" would be safer.
1586 open my $fh, $rfile_or_tempfile or die "Could not open: $!";
1590 $serializer->raw_deserialize($serialized);
# Neither YAML suffix nor Data::Serializer available: give up.
1592 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
1596 sub _refresh_internals
{
# Build a second object of the same class from the file $dst and take
# selected values over from it.
1597 my($self, $dst) = @_;
1598 my $class = ref $self;
1599 my $rfpeek = $class->new_from_file ($dst);
# Copy the accessor value $acc from the peeked object (the list of
# accessor names iterated over is not visible here).
1604 $self->$acc ( $rfpeek->$acc );
# When both dirtymarks are true and differ (string comparison), the
# dirtymark of the peeked file is adopted.
1606 my $old_dirtymark = $self->dirtymark;
1607 my $new_dirtymark = $rfpeek->dirtymark;
1608 if ($old_dirtymark && $new_dirtymark && $new_dirtymark ne $old_dirtymark) {
1610 $self->dirtymark ( $new_dirtymark );
1615 =head2 $ret = $obj->rfilename
1617 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1618 a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
# Compose the file name as "<first>-<second><third>"; filenameroot is the
# first and serializer_suffix the last visible argument.
1624 my $file = sprintf("%s-%s%s",
1625 $self->filenameroot,
1627 $self->serializer_suffix,
1632 =head2 $str = $self->remote_dir
1634 The directory we are mirroring from.
# Combined getter/setter: $set, if passed, is stored through _remote_dir.
1639 my($self, $set) = @_;
1641 $self->_remote_dir ($set);
# Read the stored value back; the object is also flagged as a slave here
# (the condition guarding these statements is not visible).
1643 my $x = $self->_remote_dir;
1644 $self->is_slave (1);
1648 =head2 $str = $obj->remoteroot
1650 =head2 (void) $obj->remoteroot ( $set )
1652 Get/Set the composed prefix needed when rsyncing from a remote module.
1653 If remote_host, remote_module, and remote_dir are set, it is composed
# Combined getter/setter: $set, if passed, is stored through _remoteroot.
1659 my($self, $set) = @_;
1661 $self->_remoteroot($set);
# If no value is stored yet, compose one from the parts that are defined:
# remote_host followed by "::", remote_module followed by "/", and
# remote_dir; undefined parts contribute the empty string.
1663 my $remoteroot = $self->_remoteroot;
1664 unless (defined $remoteroot) {
1665 $remoteroot = sprintf
1668 defined $self->remote_host ?
($self->remote_host."::") : "",
1669 defined $self->remote_module ?
($self->remote_module."/") : "",
1670 defined $self->remote_dir ?
$self->remote_dir : "",
# Cache the composed value.
1672 $self->_remoteroot($remoteroot);
1677 =head2 (void) $obj->resolve_recentfilename ( $recentfilename )
1679 Inverse method to C<rfilename>. C<$recentfilename> is a plain filename
1682 $filenameroot-$interval$serializer_suffix
1688 This filename is split into its parts and the parts are fed to the
1693 sub resolve_recentfilename
{
# Split $rfname into three captures: everything before the last suitable
# dash, a run of characters containing neither dash nor dot, and a dot
# followed by non-dot characters.
1694 my($self, $rfname) = @_;
1695 my($splitter) = qr
(^(.+)-([^-\
.]+)(\
.[^\
.]+));
# On a match the captures are fed to filenameroot, interval and
# serializer_suffix, in that order.
1696 if (my($f,$i,$s) = $rfname =~ $splitter) {
1697 $self->filenameroot ($f);
1698 $self->interval ($i);
1699 $self->serializer_suffix ($s);
# No match: abort and show both the name and the pattern.
1701 die "Alert: cannot split '$rfname', doesn't match '$splitter'";
1706 =head2 my $rfile = $obj->rfile
1708 Returns the full path of the I<recentfile>
# Return the cached path if there is one; otherwise build it with
# File::Spec->catfile (arguments not visible here) and cache it in _rfile.
1714 my $rfile = $self->_rfile;
1715 return $rfile if defined $rfile;
1716 $rfile = File
::Spec
->catfile
1720 $self->_rfile ($rfile);
1724 =head2 $rsync_obj = $obj->rsync
1726 The File::Rsync object that this object uses for communicating with an
# Lazily create the File::Rsync object: reuse the cached one if defined,
# otherwise construct it from rsync_options (an empty hash ref when none
# are set) and cache it in _rsync.
1733 my $rsync = $self->_rsync;
1734 unless (defined $rsync) {
1735 my $rsync_options = $self->rsync_options || {};
1736 if ($HAVE->{"File::Rsync"}) {
1737 $rsync = File
::Rsync
->new($rsync_options);
1738 $self->_rsync($rsync);
# File::Rsync could not be loaded: nothing we can do.
1740 die "File::Rsync required for rsync operations. Cannot continue";
1746 =head2 (void) $obj->register_rsync_error(@err)
1748 =head2 (void) $obj->un_register_rsync_error()
1750 Register_rsync_error is called whenever the File::Rsync object fails
1751 on an exec (say, connection doesn't succeed). It issues a warning and
1752 sleeps for an increasing amount of time. Un_register_rsync_error
1753 resets the error count. See also accessor C<max_rsync_errors>.
# Failure bookkeeping shared by register_rsync_error and
# un_register_rsync_error: number of registered failures and the time of
# the most recent one.
1758 my $no_success_count = 0;
1759 my $no_success_time = 0;
1760 sub register_rsync_error
{
# @err holds the error output of the failed rsync run.
1761 my($self, @err) = @_;
1763 $no_success_time = time;
1764 $no_success_count++;
# An undefined max_rsync_errors is treated as MAX_INT.
1765 my $max_rsync_errors = $self->max_rsync_errors;
1766 $max_rsync_errors = MAX_INT
unless defined $max_rsync_errors;
# A non-negative limit that has been reached leads to the "exiting now"
# alert; a negative limit never triggers this branch.
1767 if ($max_rsync_errors>=0 && $no_success_count >= $max_rsync_errors) {
1773 "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
# Back-off time: 12 seconds per registered failure, capped at 300.
1779 my $sleep = 12 * $no_success_count;
1780 $sleep = 300 if $sleep > 300;
1785 "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
1786 scalar(localtime($no_success_time)),
1793 sub un_register_rsync_error
{
# Reset the failure bookkeeping: both the timestamp and the counter of
# unsuccessful rsync runs go back to zero.
1795 $no_success_time = 0;
1796 $no_success_count = 0;
1800 =head2 $clone = $obj->_sparse_clone
1802 Clones just as much from itself that it does not hurt. Experimental
1805 Note: what fits better: sparse or shallow? Other suggestions?
# Start from an empty object blessed into the same class as $self.
1811 my $new = bless {}, ref $self;
1821 max_files_per_connection
1825 sleep_per_connection
# Reference values are deep-copied with Storable::dclone; plain scalars
# are left untouched.
1830 $o = Storable
::dclone
$o if ref $o;
1836 =head2 $boolean = OBJ->ttl_reached ()
# Compare the current high-resolution time against the time of the last
# mirror run plus the ttl.  have_mirrored defaults to 0 and an undefined
# ttl defaults to 24.2.
1842 my $have_mirrored = $self->have_mirrored || 0;
1843 my $now = Time
::HiRes
::time;
1844 my $ttl = $self->ttl;
1845 $ttl = 24.2 unless defined $ttl;
1846 if ($now > $have_mirrored + $ttl) {
1852 =head2 (void) $obj->unlock()
1854 Unlocking is implemented with an C<rmdir> on a locking directory
1855 (C<.lock> appended to $rfile).
# Nothing to do unless this object believes it holds the lock.
1861 return unless $self->_is_locked;
# The lock is the directory "<rfile>.lock"; remove it and clear the flag.
# NOTE(review): the result of rmdir is not checked, so the flag is cleared
# even if the directory could not be removed -- confirm this is intended.
1862 my $rfile = $self->rfile;
1863 rmdir "$rfile.lock";
1864 $self->_is_locked (0);
1869 Sets this recentfile in the state of not 'seeded'.
1877 =head2 $ret = $obj->update ($path, $type)
1879 =head2 $ret = $obj->update ($path, "new", $dirty_epoch)
1881 =head2 $ret = $obj->update ()
1883 Enter one file into the local I<recentfile>. $path is the (usually
1884 absolute) path. If the path is outside I<our> tree, then it is
1887 C<$type> is one of C<new> or C<delete>.
1889 Events of type C<new> may set $dirty_epoch. $dirty_epoch is normally
1890 not used and the epoch is calculated by the update() routine itself
1891 based on current time. But if there is the demand to insert a
1892 not-so-current file into the dataset, then the caller sets
1893 $dirty_epoch. This causes the epoch of the registered event to become
1894 $dirty_epoch or -- if the exact value given is already taken -- a tiny
1895 bit more. As compensation the dirtymark of the whole dataset is set to
1896 now or the current epoch, whichever is higher. Note: setting the
1897 dirty_epoch to the future is prohibited as it's very unlikely to be
1898 intended: it definitely might wreak havoc with the index files.
1900 The new file event is unshifted (or, if dirty_epoch is set, inserted
1901 at the place it belongs to, according to the rule to have a sequence
1902 of strictly decreasing timestamps) to the array of recent_events and
1903 the array is shortened to the length of the timespan allowed. This is
1904 usually the timespan specified by the interval of this recentfile but
1905 as long as this recentfile has not been merged to another one, the
1906 timespan may grow without bounds.
1908 The third form runs an update without inserting a new file. This may
1909 be desired to truncate a recentfile.
1912 sub _epoch_monotonically_increasing
{
# $epoch is the candidate timestamp, $recent the array ref of existing
# events whose first element is compared against.
1913 my($self,$epoch,$recent) = @_;
1914 return $epoch unless @
$recent; # the first one goes unoffended
# The candidate is stringified ("".$epoch) before it is compared, as a big
# float, with the epoch of the first event.
1915 if (_bigfloatgt
("".$epoch,$recent->[0]{epoch
})) {
# On the other path the result is derived from the first event's epoch,
# increased by a small amount via _increase_a_bit.
1918 return _increase_a_bit
($recent->[0]{epoch
});
# Arguments: path of the file, event type, and an optional explicit epoch.
1922 my($self,$path,$type,$dirty_epoch) = @_;
# If any argument is given, path and type are both mandatory.
1923 if (defined $path or defined $type or defined $dirty_epoch) {
1924 die "update called without path argument" unless defined $path;
1925 die "update called without type argument" unless defined $type;
# NOTE(review): the pattern is not anchored, so any string merely
# containing "new" or "delete" passes this check -- confirm whether an
# exact match was intended.
1926 die "update called with illegal type argument: $type" unless $type =~ /(new|delete)/;
1927 # since we have keep_delete_objects_forever we must let them inject delete objects too:
1928 #die "update called with \$type=$type and \$dirty_epoch=$dirty_epoch; ".
1929 # "dirty_epoch only allowed with type=new" if defined $dirty_epoch and $type ne "new";
# Canonicalize the path with the configured method, falling back to
# naive_path_normalize when none is configured.
1930 my $canonmeth = $self->canonize;
1931 unless ($canonmeth) {
1932 $canonmeth = "naive_path_normalize";
1934 $path = $self->$canonmeth($path);
1936 my $lrd = $self->localroot;
1938 # you must calculate the time after having locked, of course
1939 my $now = Time
::HiRes
::time;
1940 my $interval = $self->interval;
1941 my $secs = $self->interval_secs();
1942 my $recent = $self->recent_events;
# A dirty epoch that lies before "now" is used as given; the other visible
# assignment derives the epoch from "now" and the newest existing event.
1945 if (defined $dirty_epoch && _bigfloatgt
($now,$dirty_epoch)) {
1946 $epoch = $dirty_epoch;
1948 $epoch = $self->_epoch_monotonically_increasing($now,$recent);
# Lower bound for events to keep.  It is only computed once the file has
# a merged epoch; otherwise it stays 0.
1952 my $oldest_allowed = 0;
1953 my $merged = $self->merged;
1954 if ($merged->{epoch
}) {
1955 my $virtualnow = _bigfloatmax
($now,$epoch);
1956 # for the lower bound I think we need no big math, we calc already
1957 $oldest_allowed = min
($virtualnow - $secs, $merged->{epoch
}, $epoch);
1959 # as long as we are not merged at all, no limits!
# Look at the oldest event (end of the list) while it is older than the
# lower bound; any change is flagged in $something_done.
1961 my $something_done = 0;
1962 TRUNCATE
: while (@
$recent) {
1963 # $DB::single++ unless defined $oldest_allowed;
1964 if (_bigfloatlt
($recent->[-1]{epoch
}, $oldest_allowed)) {
1966 $something_done = 1;
# Only paths that start with localroot are registered; the prefix is
# stripped (quoted with \Q..\E) so that the stored path is relative.
1971 if (defined $path && $path =~ s
|^\Q
$lrd\E
||) {
1974 # remove the older duplicates of this $path, irrespective of $type:
1975 if (defined $dirty_epoch) {
1976 my $ctx = $self->_update_with_dirty_epoch($path,$recent,$epoch);
1977 $recent = $ctx->{recent
};
1978 $splicepos = $ctx->{splicepos
};
1979 $epoch = $ctx->{epoch
};
1980 my $dirtymark = $self->dirtymark;
1982 if (_bigfloatgt
($epoch, $now)) { # just in case we had to increase it
1985 $self->dirtymark($new_dm);
1986 my $merged = $self->merged;
1987 if (not defined $merged->{epoch
} or _bigfloatlt
($epoch,$merged->{epoch
})) {
1991 $recent = [ grep { $_->{path
} ne $path } @
$recent ];
# Insert the new event at the computed position without removing anything.
1994 if (defined $splicepos) {
1995 splice @
$recent, $splicepos, 0, { epoch
=> $epoch, path
=> $path, type
=> $type };
1997 $something_done = 1;
# Write the file only if something changed.
2000 $self->write_recent($recent) if $something_done;
2001 $self->_assert_symlink;
2005 sub _update_with_dirty_epoch
{
# Work out where an event for $path with the caller-supplied $epoch belongs
# in $recent (array ref, modified in place).  The visible tail of the
# returned structure carries a "splicepos" entry.
2006 my($self,$path,$recent,$epoch) = @_;
2008 my $new_recent = [];
# NOTE(review): this guard is true when at least one event has a
# DIFFERENT path (ne).  If the list holds only events for $path, the
# duplicate removal below is skipped -- confirm whether "eq" was meant.
2009 if (grep { $_->{path
} ne $path } @
$recent) {
2011 KNOWN_EVENT
: for my $i (0..$#$recent) {
2012 if ($recent->[$i]{path
} eq $path) {
2013 if ($recent->[$i]{epoch
} eq $epoch) {
2019 push @
$new_recent, $recent->[$i];
# Replace the list by the collected events unless $cancel was set.
2022 @
$recent = @
$new_recent unless $cancel;
# Position: an empty list or an epoch newer than the first event is one
# case; an epoch older than the last event puts the position at the end
# of the list (index equal to the list length).
2024 if (!exists $recent->[0] or _bigfloatgt
($epoch,$recent->[0]{epoch
})) {
2026 } elsif (_bigfloatlt
($epoch,$recent->[-1]{epoch
})) {
2027 $splicepos = @
$recent;
# Otherwise scan the list.  An epoch that is string-equal to an existing
# one is bumped with _increase_a_bit, passing the preceding event's epoch
# (or undef at index 0) as second argument.
2029 RECENT
: for my $i (0..$#$recent) {
2030 my $ev = $recent->[$i];
2031 if ($epoch eq $recent->[$i]{epoch
}) {
2032 $epoch = _increase_a_bit
($epoch, $i ?
$recent->[$i-1]{epoch
} : undef);
2034 if (_bigfloatgt
($epoch,$recent->[$i]{epoch
})) {
2042 splicepos
=> $splicepos,
2049 Sets this recentfile in the state of 'seeded' which means it has to
2050 re-evaluate its uptodateness.
2060 Tells if the recentfile is in the state 'seeded'.
# Combined getter/setter: $set, if passed, is stored through _seeded.
2064 my($self, $set) = @_;
2066 $self->_seeded ($set);
# An undefined stored value is replaced by a default (assigned on a line
# not visible here) and written back.
2068 my $x = $self->_seeded;
2069 unless (defined $x) {
2071 $self->_seeded ($x);
2078 True if this object has mirrored the complete interval covered by the
# First check: uptodateness was reached before and the object has not been
# re-seeded since.
2086 if ($self->_uptodateness_ever_reached and not $self->seeded) {
2090 # it's too easy to misconfigure ttl and related timings and then
2091 # never reach uptodateness, so disabled 2009-03-22
# The leading "0 and" makes this branch dead code on purpose (see above).
2092 if (0 and not defined $uptodate) {
2093 if ($self->ttl_reached){
2094 $why = "ttl_reached returned true, so we are not uptodate";
2098 unless (defined $uptodate) {
2099 # look if recentfile has unchanged timestamp
2100 my $minmax = $self->minmax;
2101 if (exists $minmax->{mtime
}) {
# Compare the file's current mtime (stat field 9) with the remembered one.
2102 my $rfile = $self->_my_current_rfile;
2103 my @stat = stat $rfile;
2104 my $mtime = $stat[9];
2105 if (defined $mtime && defined $minmax->{mtime
} && $mtime > $minmax->{mtime
}) {
2106 $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
# Ask the "done" object whether the span max..min is covered and take
# its answer as the result.
2109 my $covered = $self->done->covered(@
$minmax{qw(max min)});
2110 $why = sprintf "minmax covered[%s], so we return that", defined $covered ?
$covered : "UNDEF";
2111 $uptodate = $covered;
# Nothing above produced a verdict.
2115 unless (defined $uptodate) {
2116 $why = "fallthrough, so not uptodate";
2120 $self->_uptodateness_ever_reached(1);
2124 uptodate
=> $uptodate,
# Keep the structure describing this call for later inspection.
2127 $self->_remember_last_uptodate_call($remember);
2131 =head2 $obj->write_recent ($recent_files_arrayref)
2133 Writes a I<recentfile> based on the current reflection of the current
2134 state of the tree limited by the current interval.
# Sort the array referenced by the second argument in place, by epoch in
# descending order ($b before $a), using the big-float comparison.
2139 @
{$_[1]} = sort { _bigfloatcmp
($b->{epoch
},$a->{epoch
}) } @
{$_[1]};
# $recent is the array ref of events to be written; it is mandatory.
2143 my ($self,$recent) = @_;
2144 die "write_recent called without argument" unless defined $recent;
# Sanity check: every epoch must be strictly smaller than its predecessor.
# A violation ends in Carp::confess; although the message mentions
# "re-sorting", the _resort call below is commented out.
2146 SANITYCHECK
: for my $i (0..$#$recent) {
2147 if (defined($Last_epoch) and _bigfloatge
($recent->[$i]{epoch
},$Last_epoch)) {
2149 Carp
::confess
(sprintf "Warning: disorder '%s'>='%s', re-sorting %s\n",
2150 $recent->[$i]{epoch
}, $Last_epoch, $self->interval);
2152 # $self->_resort($recent);
2155 $Last_epoch = $recent->[$i]{epoch
};
# Update the remembered max from the first event when it is newer.
2157 my $minmax = $self->minmax;
2158 if (!defined $minmax->{max
} || _bigfloatlt
($minmax->{max
},$recent->[0]{epoch
})) {
2159 $minmax->{max
} = $recent->[0]{epoch
};
# NOTE(review): min is replaced when the stored min is LESS than the last
# event's epoch, i.e. min only ever moves upwards -- confirm intended.
2161 if (!defined $minmax->{min
} || _bigfloatlt
($minmax->{min
},$recent->[-1]{epoch
})) {
2162 $minmax->{min
} = $recent->[-1]{epoch
};
2164 $self->minmax($minmax);
# Dispatch to the writer for the configured protocol, e.g. "write_1".
2165 my $meth = sprintf "write_%d", $self->protocol;
2166 $self->$meth($recent);
2169 =head2 $obj->write_0 ($recent_files_arrayref)
2171 Delegate of C<write_recent()> on protocol 0
# Dump the bare event list as YAML into "<rfile>.new", then rename that
# file over the real recentfile.
2176 my ($self,$recent) = @_;
2177 my $rfile = $self->rfile;
2178 YAML
::Syck
::DumpFile
("$rfile.new",$recent);
2179 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2182 =head2 $obj->write_1 ($recent_files_arrayref)
2184 Delegate of C<write_recent()> on protocol 1
# Serialize the data structure (its "meta" entry comes from meta_data)
# and write it to the recentfile via a ".new" file.
2189 my ($self,$recent) = @_;
2190 my $rfile = $self->rfile;
2191 my $suffix = $self->serializer_suffix;
2193 meta
=> $self->meta_data,
# ".yaml" is dumped with YAML::Syck; any other suffix goes through
# Data::Serializer with the backend registered in %serializers.
2197 if ($suffix eq ".yaml") {
2198 $serialized = YAML
::Syck
::Dump
($data);
2199 } elsif ($HAVE->{"Data::Serializer"}) {
2200 my $serializer = Data
::Serializer
->new
2201 ( serializer
=> $serializers{$suffix} );
2202 $serialized = $serializer->raw_serialize($data);
2204 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
# Write to "<rfile>.new", check the close, then rename over the target.
2206 open my $fh, ">", "$rfile.new" or die "Could not open >'$rfile.new': $!";
2207 print $fh $serialized;
2208 close $fh or die "Could not close '$rfile.new': $!";
2209 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2213 my $nq = qr/[^"]+/; # non-quotes
2215 split /\n/, <<'=cut'; %serializers = map { my @x = /"($nq)"\s+=>\s+"($nq)"/; @x } grep {s/^=item\s+C<<\s+(.+)\s+>>$/$1/} @pod_lines; }
2219 The following suffixes are supported and trigger the use of these
2224 =item C<< ".yaml" => "YAML::Syck" >>
2226 =item C<< ".json" => "JSON" >>
2228 =item C<< ".sto" => "Storable" >>
2230 =item C<< ".dd" => "Data::Dumper" >>
2238 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
2240 =head1 INTERVAL SPEC
2242 An interval spec is a primitive way to express time spans. Normally it
2243 is composed from an integer and a letter.
2245 As a special case, a string that consists only of the single letter
2246 C<Z>, stands for MAX_INT seconds.
2248 The following letters express the specified number of seconds:
2254 =item C<< m => 60 >>
2256 =item C<< h => 60*60 >>
2258 =item C<< d => 60*60*24 >>
2260 =item C<< W => 60*60*24*7 >>
2262 =item C<< M => 60*60*24*30 >>
2264 =item C<< Q => 60*60*24*90 >>
2266 =item C<< Y => 60*60*24*365.25 >>
2274 L<File::Rsync::Mirror::Recent>,
2275 L<File::Rsync::Mirror::Recentfile::Done>,
2276 L<File::Rsync::Mirror::Recentfile::FakeBigFloat>
2280 Please report any bugs or feature requests through the web interface
2282 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2283 I will be notified, and then you'll automatically be notified of
2284 progress on your bug as I make changes.
2288 Memory hungry: it seems all memory is allocated during the initial
2289 rsync where a list of all files is maintained in memory.
2293 You can find documentation for this module with the perldoc command.
2295 perldoc File::Rsync::Mirror::Recentfile
2297 You can also look for information at:
2301 =item * RT: CPAN's request tracker
2303 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2305 =item * AnnoCPAN: Annotated CPAN documentation
2307 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2309 =item * CPAN Ratings
2311 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2315 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2320 =head1 ACKNOWLEDGEMENTS
2322 Thanks to RJBS for module-starter.
2328 =head1 COPYRIGHT & LICENSE
2330 Copyright 2008,2009 Andreas König.
2332 This program is free software; you can redistribute it and/or modify it
2333 under the same terms as Perl itself.
2338 1; # End of File::Rsync::Mirror::Recentfile
2342 # cperl-indent-level: 4