1 package File
::Rsync
::Mirror
::Recentfile
;
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
19 $HAVE->{$package} = eval qq{ require $package; };
22 use File
::Basename
qw(basename dirname fileparse);
23 use File
::Copy
qw(cp);
24 use File
::Path
qw(mkpath);
25 use File
::Rsync
::Mirror
::Recentfile
::FakeBigFloat
qw(:all);
27 use List
::Util
qw(first max min);
28 use Scalar
::Util
qw(reftype);
33 use version
; our $VERSION = qv
('0.0.8');
35 use constant MAX_INT
=> ~0>>1; # anything better?
36 use constant DEFAULT_PROTOCOL
=> 1;
41 # maybe subclass if this mapping is bad?
46 Writer (of a single file):
48 use File::Rsync::Mirror::Recentfile;
49 my $fr = File::Rsync::Mirror::Recentfile->new
52 filenameroot => "RECENT",
53 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
54 localroot => "/home/ftp/pub/PAUSE/authors/",
55 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
57 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
61 my $rf = File::Rsync::Mirror::Recentfile->new
63 filenameroot => "RECENT",
65 localroot => "/home/ftp/pub/PAUSE/authors",
67 remote_host => "pause.perl.org",
68 remote_module => "authors",
71 'rsync-path' => '/usr/bin/rsync',
74 'omit-dir-times' => 1,
81 Aggregator (usually the writer):
83 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
88 Lower level than F:R:M:Recent, handles one recentfile. Whereas a tree
89 is always composed of several recentfiles, controlled by the
90 F:R:M:Recent object, the Recentfile object has to do the bookkeeping
91 for a single timeslice.
97 =head1 CONSTRUCTORS / DESTRUCTOR
99 =head2 my $obj = CLASS->new(%hash)
101 Constructor. On every argument pair the key is a method name and the
102 value is an argument to that method.
104 If a recentfile for this resource already exists, metadata that are
105 not defined by the constructor will be fetched from there as soon as
106 it is read by recent_events().
111 my($class, @args) = @_;
112 my $self = bless {}, $class;
114 my($method,$arg) = splice @args, 0, 2;
115 $self->$method($arg);
117 unless (defined $self->protocol) {
118 $self->protocol(DEFAULT_PROTOCOL
);
120 unless (defined $self->filenameroot) {
121 $self->filenameroot("RECENT");
123 unless (defined $self->serializer_suffix) {
124 $self->serializer_suffix(".yaml");
129 =head2 my $obj = CLASS->new_from_file($file)
131 Constructor. $file is a I<recentfile>.
136 my($class, $file) = @_;
137 my $self = bless {}, $class;
138 $self->_rfile($file);
140 my $serialized = do { open my $fh, $file or die "Could not open '$file': $!";
144 # XXX: we can skip this step when the metadata are sufficient, but
145 # we cannot parse the file without some magic stuff about
148 my($name,$path) = fileparse
$file;
149 my $symlink = readlink $file;
150 if ($symlink =~ m
|/|) {
151 die "FIXME: filenames containing '/' not supported, got $symlink";
153 $file = File
::Spec
->catfile ( $path, $symlink );
155 my($name,$path,$suffix) = fileparse
$file, keys %serializers;
156 $self->serializer_suffix($suffix);
157 $self->localroot($path);
158 die "Could not determine file format from suffix" unless $suffix;
160 if ($suffix eq ".yaml") {
162 $deserialized = YAML
::Syck
::LoadFile
($file);
163 } elsif ($HAVE->{"Data::Serializer"}) {
164 my $serializer = Data
::Serializer
->new
165 ( serializer
=> $serializers{$suffix} );
166 $deserialized = $serializer->raw_deserialize($serialized);
168 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
170 while (my($k,$v) = each %{$deserialized->{meta
}}) {
171 next if $k ne lc $k; # "Producers"
174 unless (defined $self->protocol) {
175 $self->protocol(DEFAULT_PROTOCOL
);
188 unless ($self->_current_tempfile_fh) {
189 if (my $tempfile = $self->_current_tempfile) {
191 unlink $tempfile; # may fail in global destruction
206 "_current_tempfile_fh",
207 "_delayed_operations",
214 "_remember_last_uptodate_call",
219 "__verified_tempdir",
221 "_uptodateness_ever_reached",
226 split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
232 A list of interval specs that tell the aggregator which I<recentfile>s
237 The name of a method to canonize the path before rsyncing. Only
238 supported value is C<naive_path_normalize>. Defaults to that.
242 A comment about this tree and setup.
246 A timestamp. The dirtymark is updated whenever an out of band change
247 on the origin server is performed that violates the protocol. Say,
248 they add or remove files in the middle somewhere. Slaves must react
249 with a devaluation of their C<done> structure which then leads to a
250 full re-sync of all files. Implementation note: dirtymark may increase
255 The (prefix of the) filename we use for this I<recentfile>. Defaults to
256 C<RECENT>. The string must not contain a directory separator.
260 Timestamp remembering when we mirrored this recentfile the last time.
261 Only relevant for slaves.
263 =item ignore_link_stat_errors
265 If set to true, rsync errors are ignored that complain about link stat
266 errors. These seem to happen only when there are files missing at the
267 origin. In race conditions this can always happen, so it defaults to
272 If set to true, this object will fetch a new recentfile from remote
273 when the timespan between the last mirror (see have_mirrored) and now
274 is too large (see C<ttl>).
276 =item keep_delete_objects_forever
278 The default for delete events is that they are passed through the
279 collection of recentfile objects until they reach the Z file. There
280 they get dropped so that the associated file object ceases to exist at
281 all. By setting C<keep_delete_objects_forever> the delete objects are
282 kept forever. This makes the Z file larger but has the advantage that
283 slaves that have interrupted mirroring for a long time still can clean
288 After how many seconds shall we die if we cannot lock a I<recentfile>?
289 Defaults to 600 seconds.
293 When mirror_loop is called, this accessor can specify how much time
294 every loop shall at least take. If the work of a loop is done before
295 that time has gone, sleeps for the rest of the time. Defaults to
296 arbitrary 42 seconds.
298 =item max_files_per_connection
300 Maximum number of files that are transferred on a single rsync call.
301 Setting it higher means higher performance at the price of holding
302 connections longer and potentially disturbing other users in the pool.
303 Defaults to the arbitrary value 42.
305 =item max_rsync_errors
307 When rsync operations encounter that many errors without any resetting
308 success in between, then we die. Defaults to unlimited. A value of
309 -1 means we run forever ignoring all rsync errors.
313 Hashref remembering when we read the recent_events from this file the
314 last time and what the timespan was.
318 When the RECENT file format changes, we increment the protocol. We try
319 to support older protocols in later releases.
323 The host we are mirroring from. Leave empty for the local filesystem.
327 Rsync servers have so called modules to separate directory trees from
328 each other. Put here the name of the module under which we are
329 mirroring. Leave empty for local filesystem.
333 Things like compress, links, times or checksums. Passed in to the
334 File::Rsync object used to run the mirror.
336 =item serializer_suffix
338 Mostly untested accessor. The only well tested format for
339 I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
340 Data::Serializer. But in principle other formats are supported as
341 well. See section SERIALIZERS below.
343 =item sleep_per_connection
345 Sleep that many seconds (floating point OK) after every chunk of rsyncing
346 has finished. Defaults to arbitrary 0.42.
350 Directory to write temporary files to. Must allow rename operations
351 into the tree which usually means it must live on the same partition
352 as the target directory. Defaults to C<< $self->localroot >>.
356 Time to live. Number of seconds after which this recentfile must be
357 fetched again from the origin server. Only relevant for slaves.
358 Defaults to arbitrary 24.2 seconds.
362 Boolean to turn on a bit of verbosity.
366 Path to the logfile to write verbose progress information to. This is
367 a primitive stop gap solution to get simple verbose logging working.
368 Switching to Log4perl or similar is probably the way to go.
374 use accessors
@accessors;
378 =head2 (void) $obj->aggregate( %options )
380 Takes all intervals that are collected in the accessor called
381 aggregator. Sorts them by actual length of the interval.
382 Removes those that are shorter than our own interval. Then merges this
383 object into the next larger object. The merging continues upwards
384 as long as the next I<recentfile> is old enough to warrant a merge.
386 Whether a merge is warranted is decided according to the interval of the
387 previous interval so that larger files are not so often updated as
388 smaller ones. If $options{force} is true, all files get updated.
390 Here is an example to illustrate the behaviour. Given aggregators
396 1h updates 1d on every call to aggregate()
397 1d updates 1W earliest after 1h
398 1W updates 1M earliest after 1d
399 1M updates 1Q earliest after 1W
400 1Q updates 1Y earliest after 1M
401 1Y updates Z earliest after 1Q
403 Note that all but the smallest recentfile get updated at an arbitrary
404 rate and as such are quite useless on their own.
409 my($self, %option) = @_;
410 my @aggs = sort { $a->{secs
} <=> $b->{secs
} }
411 grep { $_->{secs
} >= $self->interval_secs }
412 map { { interval
=> $_, secs
=> $self->interval_secs($_)} }
413 $self->interval, @
{$self->aggregator || []};
415 $aggs[0]{object
} = $self;
416 AGGREGATOR
: for my $i (0..$#aggs-1) {
417 my $this = $aggs[$i]{object
};
418 my $next = $this->_sparse_clone;
419 $next->interval($aggs[$i+1]{interval
});
421 if ($option{force
} || $i == 0) {
424 my $next_rfile = $next->rfile;
425 if (-e
$next_rfile) {
426 my $prev = $aggs[$i-1]{object
};
428 my $next_age = 86400 * -M
$next_rfile;
429 if ($next_age > $prev->interval_secs) {
438 $aggs[$i+1]{object
} = $next;
445 # collect file size and mtime for all files of this aggregate
446 sub _debug_aggregate
{
448 my @aggs = sort { $a->{secs
} <=> $b->{secs
} }
449 map { { interval
=> $_, secs
=> $self->interval_secs($_)} }
450 $self->interval, @
{$self->aggregator || []};
452 for my $i (0..$#aggs) {
453 my $this = Storable
::dclone
$self;
454 $this->interval($aggs[$i]{interval
});
455 my $rfile = $this->rfile;
456 my @stat = stat $rfile;
457 push @
$report, {rfile
=> $rfile, size
=> $stat[7], mtime
=> $stat[9]};
462 # (void) $self->_assert_symlink()
463 sub _assert_symlink
{
465 my $recentrecentfile = File
::Spec
->catfile
474 if ($Config{d_symlink
} eq "define") {
475 my $howto_create_symlink; # 0=no need; 1=straight symlink; 2=rename symlink
476 if (-l
$recentrecentfile) {
477 my $found_symlink = readlink $recentrecentfile;
478 if ($found_symlink eq $self->rfilename) {
481 $howto_create_symlink = 2;
484 $howto_create_symlink = 1;
486 if (1 == $howto_create_symlink) {
487 symlink $self->rfilename, $recentrecentfile or die "Could not create symlink '$recentrecentfile': $!"
489 unlink "$recentrecentfile.$$"; # may fail
490 symlink $self->rfilename, "$recentrecentfile.$$" or die "Could not create symlink '$recentrecentfile.$$': $!";
491 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
494 warn "Warning: symlinks not supported on this system, doing a copy instead\n";
495 unlink "$recentrecentfile.$$"; # may fail
496 cp
$self->rfilename, "$recentrecentfile.$$" or die "Could not copy to '$recentrecentfile.$$': $!";
497 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
501 =head2 $hashref = $obj->delayed_operations
503 A hash of hashes containing unlink and rmdir operations which had to
504 wait until the recentfile got unhidden in order to not confuse
505 downstream mirrors (in case we have some).
509 sub delayed_operations
{
511 my $x = $self->_delayed_operations;
512 unless (defined $x) {
517 $self->_delayed_operations ($x);
522 =head2 $done = $obj->done
524 C<$done> is a reference to a L<File::Rsync::Mirror::Recentfile::Done>
525 object that keeps track of rsync activities. Only needed and used when
526 we are a mirroring slave.
532 my $done = $self->_done;
534 require File
::Rsync
::Mirror
::Recentfile
::Done
;
535 $done = File
::Rsync
::Mirror
::Recentfile
::Done
->new();
536 $done->_rfinterval ($self->interval);
537 $self->_done ( $done );
542 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
544 Stores the remote I<recentfile> locally as a tempfile. The caller is
545 responsible to remove the file after use.
547 Note: if you're intending to act as an rsync server for other slaves,
548 then you must prefer this method over fetching that file with
549 get_remotefile(). Otherwise downstream mirrors would expect you to
550 already have mirrored all the files that are in the I<recentfile>
551 before you have them mirrored.
555 sub get_remote_recentfile_as_tempfile
{
557 mkpath
$self->localroot;
560 if ( $self->_use_tempfile() ) {
561 if ($self->ttl_reached) {
562 $fh = $self->_current_tempfile_fh;
563 $trfilename = $self->rfilename;
565 return $self->_current_tempfile;
568 $trfilename = $self->rfilename;
573 $dst = $self->_current_tempfile;
575 $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
576 $dst = $fh->filename;
577 $self->_current_tempfile ($dst);
578 my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
579 if (defined $rfile && -e
$rfile) {
580 # saving on bandwidth. Might need to be configurable
581 # $self->bandwidth_is_cheap?
582 cp
$rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
589 if ($self->verbose) {
590 my $doing = -e
$dst ?
"Sync" : "Get";
591 my $display_dst = join "/", "...", basename
(dirname
($dst)), basename
($dst);
592 my $LFH = $self->_logfilehandle;
595 "%-4s %d (1/1/%s) temp %s ... ",
604 local($ENV{LANG
}) = "C";
605 while (!$self->rsync->exec(
609 $self->register_rsync_error ($self->rsync->err);
610 if (++$retried >= 3) {
611 warn "XXX giving up";
617 my $LFH = $self->_logfilehandle;
618 printf $LFH "Warning: gave up mirroring %s, will try again later", $self->interval;
620 $self->_refresh_internals ($dst);
621 $self->have_mirrored (Time
::HiRes
::time);
622 $self->un_register_rsync_error ();
625 if ($self->verbose) {
626 my $LFH = $self->_logfilehandle;
630 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
634 sub _verified_tempdir
{
636 my $tempdir = $self->__verified_tempdir();
637 return $tempdir if defined $tempdir;
638 unless ($tempdir = $self->tempdir) {
639 $tempdir = $self->localroot;
641 unless (-d
$tempdir) {
644 $self->__verified_tempdir($tempdir);
648 sub _get_remote_rat_provide_tempfile_object
{
649 my($self, $trfilename) = @_;
650 my $_verified_tempdir = $self->_verified_tempdir;
651 my $fh = File
::Temp
->new
652 (TEMPLATE
=> sprintf(".FRMRecent-%s-XXXX",
655 DIR
=> $_verified_tempdir,
656 SUFFIX
=> $self->serializer_suffix,
657 UNLINK
=> $self->_use_tempfile,
660 my $dst = $fh->filename;
661 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
662 if ($self->_use_tempfile) {
663 $self->_current_tempfile_fh ($fh); # delay self destruction
671 if (my $vl = $self->verboselog) {
672 open $fh, ">>", $vl or die "Could not open >> '$vl': $!";
679 =head2 $localpath = $obj->get_remotefile ( $relative_path )
681 Rsyncs one single remote file to local filesystem.
683 Note: no locking is done on this file. Any number of processes may
686 Note II: do not use for recentfiles. If you are a cascading
687 slave/server combination, it would confuse other slaves. They would
688 expect the contents of these recentfiles to be available. Use
689 get_remote_recentfile_as_tempfile() instead.
694 my($self, $path) = @_;
695 my $dst = File
::Spec
->catfile($self->localroot, $path);
697 if ($self->verbose) {
698 my $doing = -e
$dst ?
"Sync" : "Get";
699 my $LFH = $self->_logfilehandle;
702 "%-4s %d (1/1/%s) %s ... ",
709 local($ENV{LANG
}) = "C";
710 my $remoteroot = $self->remoteroot or die "Alert: missing remoteroot. Cannot continue";
711 while (!$self->rsync->exec(
717 $self->register_rsync_error ($self->rsync->err);
719 $self->un_register_rsync_error ();
720 if ($self->verbose) {
721 my $LFH = $self->_logfilehandle;
727 =head2 $obj->interval ( $interval_spec )
729 Get/set accessor. $interval_spec is a string and described below in
730 the section INTERVAL SPEC.
735 my ($self, $interval) = @_;
737 $self->_interval($interval);
738 $self->_rfile(undef);
740 $interval = $self->_interval;
741 unless (defined $interval) {
742 # do not ask the $self too much, it recurses!
744 Carp
::confess
("Alert: interval undefined for '".$self."'. Cannot continue.");
749 =head2 $secs = $obj->interval_secs ( $interval_spec )
751 $interval_spec is described below in the section INTERVAL SPEC. If
752 empty defaults to the inherent interval for this object.
757 my ($self, $interval) = @_;
758 $interval ||= $self->interval;
759 unless (defined $interval) {
760 die "interval_secs() called without argument on an object without a declared one";
762 my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
763 die "Could not determine seconds from interval[$interval]";
764 if ($interval eq "Z") {
766 } elsif (exists $seconds{$t} and $n =~ /^\d+$/) {
767 return $seconds{$t}*$n;
769 die "Invalid interval specification: n[$n]t[$t]";
773 =head2 $obj->localroot ( $localroot )
775 Get/set accessor. The local root of the tree.
780 my ($self, $localroot) = @_;
782 $self->_localroot($localroot);
783 $self->_rfile(undef);
785 $localroot = $self->_localroot;
788 =head2 $ret = $obj->local_path($path_found_in_recentfile)
790 Combines the path to our local mirror and the path of an object found
791 in this I<recentfile>. In other words: the target of a mirror operation.
793 Implementation note: We split on slashes and then use
794 File::Spec::catfile to adjust to the local operating system.
799 my($self,$path) = @_;
800 unless (defined $path) {
801 # seems like a degenerated case
802 return $self->localroot;
804 my @p = split m
|/|, $path;
805 File
::Spec
->catfile($self->localroot,@p);
808 =head2 (void) $obj->lock
810 Locking is implemented with an C<mkdir> on a locking directory
811 (C<.lock> appended to $rfile).
817 # not using flock because it locks on filehandles instead of
818 # old school resources.
819 my $locked = $self->_is_locked and return;
820 my $rfile = $self->rfile;
821 # XXX need a way to allow breaking the lock
823 my $locktimeout = $self->locktimeout || 600;
824 while (not mkdir "$rfile.lock") {
825 Time
::HiRes
::sleep 0.01;
826 if (time - $start > $locktimeout) {
827 die "Could not acquire lockdirectory '$rfile.lock': $!";
830 $self->_is_locked (1);
833 =head2 (void) $obj->merge ($other)
835 Bulk update of this object with another one. It's used to merge a
836 smaller and younger $other object into the current one. If this file
837 is a C<Z> file, then we normally do not merge in objects of type
838 C<delete>; this can be overridden by setting
839 keep_delete_objects_forever. But if we encounter an object of type
840 delete we delete the corresponding C<new> object if we have it.
842 If there is nothing to be merged, nothing is done.
847 my($self, $other) = @_;
848 $self->_merge_sanitycheck ( $other );
850 my $other_recent = $other->recent_events || [];
851 # $DB::single++ if $other->interval_secs eq "2" and grep {$_->{epoch} eq "999.999"} @$other_recent;
853 $self->_merge_locked ( $other, $other_recent );
859 my($self, $other, $other_recent) = @_;
860 my $my_recent = $self->recent_events || [];
862 # calculate the target time span
863 my $myepoch = $my_recent->[0] ?
$my_recent->[0]{epoch
} : undef;
864 my $epoch = $other_recent->[0] ?
$other_recent->[0]{epoch
} : $myepoch;
865 my $oldest_allowed = 0;
867 unless ($my_recent->[0]) {
872 if (($other->dirtymark||0) ne ($self->dirtymark||0)) {
875 } elsif (my $merged = $self->merged) {
876 my $secs = $self->interval_secs();
877 $oldest_allowed = min
($epoch - $secs, $merged->{epoch
}||0);
878 if (@
$other_recent and
879 _bigfloatlt
($other_recent->[-1]{epoch
}, $oldest_allowed)
881 $oldest_allowed = $other_recent->[-1]{epoch
};
884 while (@
$my_recent && _bigfloatlt
($my_recent->[-1]{epoch
}, $oldest_allowed)) {
891 my $other_recent_filtered = [];
892 for my $oev (@
$other_recent) {
893 my $oevepoch = $oev->{epoch
} || 0;
894 next if _bigfloatlt
($oevepoch, $oldest_allowed);
895 my $path = $oev->{path
};
896 next if $have_path{$path}++;
897 if ( $self->interval eq "Z"
898 and $oev->{type
} eq "delete"
899 and ! $self->keep_delete_objects_forever
903 if (!$myepoch || _bigfloatgt
($oevepoch, $myepoch)) {
906 push @
$other_recent_filtered, { epoch
=> $oev->{epoch
}, path
=> $path, type
=> $oev->{type
} };
909 if ($something_done) {
910 $self->_merge_something_done ($other_recent_filtered, $my_recent, $other_recent, $other, \
%have_path, $epoch);
914 sub _merge_something_done
{
915 my($self, $other_recent_filtered, $my_recent, $other_recent, $other, $have_path, $epoch) = @_;
917 my $epoch_conflict = 0;
919 ZIP
: while (@
$other_recent_filtered || @
$my_recent) {
922 @
$other_recent_filtered && _bigfloatge
($other_recent_filtered->[0]{epoch
},$my_recent->[0]{epoch
})) {
923 $event = shift @
$other_recent_filtered;
925 $event = shift @
$my_recent;
926 next ZIP
if $have_path->{$event->{path
}}++;
928 $epoch_conflict=1 if defined $last_epoch && $event->{epoch
} eq $last_epoch;
929 $last_epoch = $event->{epoch
};
930 push @
$recent, $event;
932 if ($epoch_conflict) {
934 for (my $i = $#$recent;$i>=0;$i--) {
935 my $epoch = $recent->[$i]{epoch
};
936 if ($have_epoch{$epoch}++) {
937 while ($have_epoch{$epoch}) {
938 $epoch = _increase_a_bit
($epoch);
940 $recent->[$i]{epoch
} = $epoch;
941 $have_epoch{$epoch}++;
945 if (!$self->dirtymark || $other->dirtymark ne $self->dirtymark) {
946 $self->dirtymark ( $other->dirtymark );
948 $self->write_recent($recent);
950 time => Time
::HiRes
::time, # not used anywhere
951 epoch
=> $recent->[0]{epoch
},
952 into_interval
=> $self->interval, # not used anywhere
954 $other->write_recent($other_recent);
957 sub _merge_sanitycheck
{
958 my($self, $other) = @_;
959 if ($self->interval_secs <= $other->interval_secs) {
962 "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
963 $self->interval_secs,
964 $other->interval_secs,
971 Hashref denoting when this recentfile has been merged into some other
977 my($self, $set) = @_;
979 $self->_merged ($set);
981 my $merged = $self->_merged;
983 if ($merged and $into = $merged->{into_interval
} and defined $self->_interval) {
985 if ($into eq $self->interval) {
989 "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
993 } elsif ($self->interval_secs($into) < $self->interval_secs) {
997 "Warning: into_interval_secs[%s] smaller than own interval_secs[%s] on interval[%s]. Danger ahead.",
998 $self->interval_secs($into),
999 $self->interval_secs,
1007 =head2 $hashref = $obj->meta_data
1009 Returns the hashref of metadata that the server has to add to the
1016 my $ret = $self->{meta
};
1027 "serializer_suffix",
1034 # XXX need to reset the Producer if I am a writer, keep it when I
1036 $ret->{Producers
} ||= {
1037 __PACKAGE__
, "$VERSION", # stringified it looks better
1039 'time', Time
::HiRes
::time,
1041 $ret->{dirtymark
} ||= Time
::HiRes
::time;
1045 =head2 $success = $obj->mirror ( %options )
1047 Mirrors the files in this I<recentfile> as reported by
1048 C<recent_events>. Options named C<after>, C<before>, C<max> are passed
1049 through to the C<recent_events> call. The boolean option C<piecemeal>,
1050 if true, causes C<mirror> to only rsync C<max_files_per_connection>
1051 and keep track of the rsynced files so that future calls will rsync
1052 different files until all files are brought to sync.
1057 my($self, %options) = @_;
1058 my $trecentfile = $self->get_remote_recentfile_as_tempfile();
1059 $self->_use_tempfile (1);
1060 # skip-deletes is inadequate for passthrough within mirror. We
1061 # would never reach uptodateness when a delete were on a
1063 my %passthrough = map { ($_ => $options{$_}) } qw(before after max);
1064 my ($recent_events) = $self->recent_events(%passthrough);
1065 my(@error, @dlcollector); # download-collector: array containing paths we need
1067 my $last_item = $#$recent_events;
1068 my $done = $self->done;
1069 my $pathdb = $self->_pathdb;
1070 ITEM
: for my $i ($first_item..$last_item) {
1084 last if $i == $last_item;
1085 if ($status->{mustreturn
}){
1086 if ($self->_current_tempfile && ! $self->_current_tempfile_fh) {
1087 # looks like a bug somewhere else
1088 my $t = $self->_current_tempfile;
1089 unlink $t or die "Could not unlink '$t': $!";
1090 $self->_current_tempfile(undef);
1091 $self->_use_tempfile(0);
1097 my $success = eval { $self->_mirror_dlcollector (\
@dlcollector,$pathdb,$recent_events);};
1098 if (!$success || $@
) {
1099 warn "Warning: Unknown error while mirroring: $@";
1104 if ($self->verbose) {
1105 my $LFH = $self->_logfilehandle;
1106 print $LFH "DONE\n";
1108 # once we've gone to the end we consider ourselves free of obligations
1110 $self->_mirror_unhide_tempfile ($trecentfile);
1111 $self->_mirror_perform_delayed_ops;
1127 my $recent_event = $recent_events->[$i];
1128 return if $done->covered ( $recent_event->{epoch
} );
1130 my $rec = $pathdb->{$recent_event->{path
}};
1131 if ($rec && $rec->{recentepoch
}) {
1133 ( $rec->{recentepoch
}, $recent_event->{epoch
} )){
1134 $done->register ($recent_events, [$i]);
1139 my $dst = $self->local_path($recent_event->{path
});
1140 if ($recent_event->{type
} eq "new"){
1141 $self->_mirror_item_new
1154 } elsif ($recent_event->{type
} eq "delete") {
1156 if ($options->{'skip-deletes'}) {
1157 $activity = "skipped";
1160 $activity = "not_found";
1161 } elsif (-l
$dst or not -d _
) {
1162 $self->delayed_operations->{unlink}{$dst}++;
1163 $activity = "deleted";
1165 $self->delayed_operations->{rmdir}{$dst}++;
1166 $activity = "deleted";
1169 $done->register ($recent_events, [$i]);
1171 $self->_mirror_register_path($pathdb,[$recent_event],$activity);
1174 warn "Warning: invalid upload type '$recent_event->{type}'";
1178 sub _mirror_item_new
{
1191 if ($self->verbose) {
1192 my $doing = -e
$dst ?
"Sync" : "Get";
1193 my $LFH = $self->_logfilehandle;
1196 "%-4s %d (%d/%d/%s) %s ... ",
1202 $recent_event->{path
},
1205 my $max_files_per_connection = $self->max_files_per_connection || 42;
1207 if ($self->verbose) {
1208 my $LFH = $self->_logfilehandle;
1211 push @
$dlcollector, { rev
=> $recent_event, i
=> $i };
1212 if (@
$dlcollector >= $max_files_per_connection) {
1213 $success = eval {$self->_mirror_dlcollector ($dlcollector,$pathdb,$recent_events);};
1214 my $sleep = $self->sleep_per_connection;
1215 $sleep = 0.42 unless defined $sleep;
1216 Time
::HiRes
::sleep $sleep;
1217 if ($options->{piecemeal
}) {
1218 $status->{mustreturn
} = 1;
1224 if (!$success || $@
) {
1225 warn "Warning: Error while mirroring: $@";
1229 if ($self->verbose) {
1230 my $LFH = $self->_logfilehandle;
1231 print $LFH "DONE\n";
1235 sub _mirror_dlcollector
{
1236 my($self,$xcoll,$pathdb,$recent_events) = @_;
1237 my $success = $self->mirror_path([map {$_->{rev
}{path
}} @
$xcoll]);
1239 $self->_mirror_register_path($pathdb,[map {$_->{rev
}} @
$xcoll],"rsync");
1241 $self->done->register($recent_events, [map {$_->{i
}} @
$xcoll]);
1246 sub _mirror_register_path
{
1247 my($self,$pathdb,$coll,$activity) = @_;
1249 for my $item (@
$coll) {
1250 $pathdb->{$item->{path
}} =
1252 recentepoch
=> $item->{epoch
},
1253 ($activity."_on") => $time,
1258 sub _mirror_unhide_tempfile
{
1259 my($self, $trecentfile) = @_;
1260 my $rfile = $self->rfile;
1261 if (rename $trecentfile, $rfile) {
1262 # warn "DEBUG: renamed '$trecentfile' to '$rfile'";
1265 Carp
::confess
("Could not rename '$trecentfile' to '$rfile': $!");
1267 $self->_use_tempfile (0);
1268 if (my $ctfh = $self->_current_tempfile_fh) {
1269 $ctfh->unlink_on_destroy (0);
1270 $self->_current_tempfile_fh (undef);
1274 sub _mirror_perform_delayed_ops
{
1276 my $delayed = $self->delayed_operations;
1277 for my $dst (keys %{$delayed->{unlink}}) {
1278 unless (unlink $dst) {
1280 Carp
::cluck
( "Warning: Error while unlinking '$dst': $!" );
1282 if ($self->verbose) {
1284 my $LFH = $self->_logfilehandle;
1287 "%-4s %d (%s) %s DONE\n",
1293 delete $delayed->{unlink}{$dst};
1296 for my $dst (sort {length($b) <=> length($a)} keys %{$delayed->{rmdir}}) {
1297 unless (rmdir $dst) {
1299 Carp
::cluck
( "Warning: Error on rmdir '$dst': $!" );
1301 if ($self->verbose) {
1303 my $LFH = $self->_logfilehandle;
1306 "%-4s %d (%s) %s DONE\n",
1312 delete $delayed->{rmdir}{$dst};
1317 =head2 $success = $obj->mirror_path ( $arrref | $path )
1319 If the argument is a scalar it is treated as a path. The remote path
1320 is mirrored into the local copy. $path is the path found in the
1321 I<recentfile>, i.e. it is relative to the root directory of the
1324 If the argument is an array reference then all elements are treated as
1325 a path below the current tree and all are rsynced with a single
1326 command (and a single connection).
1331 my($self,$path) = @_;
1332 # XXX simplify the two branches such that $path is treated as
1333 # [$path] maybe even demand the argument as an arrayref to
1334 # simplify docs and code. (rsync-over-recentfile-2.pl uses the
1336 if (ref $path and ref $path eq "ARRAY") {
1337 my $dst = $self->localroot;
1338 mkpath dirname
$dst;
1339 my($fh) = File
::Temp
->new(TEMPLATE
=> sprintf(".%s-XXXX",
1340 lc $self->filenameroot,
1345 for my $p (@
$path) {
1349 $fh->unlink_on_destroy(1);
1352 local($ENV{LANG
}) = "C";
1353 while (!$self->rsync->exec
1359 'files-from' => $fh->filename,
1361 my(@err) = $self->rsync->err;
1362 if ($self->_my_ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1363 if ($self->verbose) {
1364 my $LFH = $self->_logfilehandle;
1365 print $LFH "Info: ignoring link_stat error '@err'";
1369 $self->register_rsync_error (@err);
1370 if (++$retried >= 3) {
1371 my $batchsize = @
$path;
1372 warn "The number of rsync retries now reached 3 within a batch of size $batchsize. Error was '@err'. Giving up now, will retry later, ";
1379 $self->un_register_rsync_error ();
1382 my $dst = $self->local_path($path);
1383 mkpath dirname
$dst;
1384 local($ENV{LANG
}) = "C";
1385 while (!$self->rsync->exec
1393 my(@err) = $self->rsync->err;
1394 if ($self->_my_ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1395 if ($self->verbose) {
1396 my $LFH = $self->_logfilehandle;
1397 print $LFH "Info: ignoring link_stat error '@err'";
1401 $self->register_rsync_error (@err);
1403 $self->un_register_rsync_error ();
1408 sub _my_ignore_link_stat_errors
{
1410 my $x = $self->ignore_link_stat_errors;
1411 $x = 1 unless defined $x;
1415 sub _my_current_rfile
{
1418 if ($self->_use_tempfile) {
1419 $rfile = $self->_current_tempfile;
1421 $rfile = $self->rfile;
1426 =head2 $path = $obj->naive_path_normalize ($path)
1428 Takes an absolute unix style path as argument and canonicalizes it to
1429 a shorter path if possible, removing things like double slashes or
1430 C</./> and removes references to C<../> directories to get a shorter
1431 unambiguous path. This is used to simplify the code that determines
1432 if a file passed to C<update()> is indeed below our C<localroot>.
1436 sub naive_path_normalize
{
1437 my($self,$path) = @_;
1439 1 while $path =~ s
|/[^/]+/\.\./|/|;
1444 =head2 $ret = $obj->read_recent_1 ( $data )
1446 Delegate of C<recent_events()> on protocol 1
1451 my($self, $data) = @_;
1452 return $data->{recent
};
1455 =head2 $array_ref = $obj->recent_events ( %options )
1457 Note: the code relies on the resource being written atomically. We
1458 cannot lock because we may have no write access. If the caller has
1459 write access (e.g. aggregate() or update()), it has to take care of any
1460 necessary locking and it MUST write atomically.
1462 If C<$options{after}> is specified, only file events after this
1463 timestamp are returned.
1465 If C<$options{before}> is specified, only file events before this
1466 timestamp are returned.
1468 If C<$options{max}> is specified only a maximum of this many events is
1471 If C<$options{'skip-deletes'}> is specified, no files-to-be-deleted
1474 If C<$options{contains}> is specified the value must be a hash
1475 reference containing a query. The query may contain the keys C<epoch>,
1476 C<path>, and C<type>. Each represents a condition that must be met. If
1477 there is more than one such key, the conditions are ANDed.
1479 If C<$options{info}> is specified, it must be a hashref. This hashref
1480 will be filled with metadata about the unfiltered recent_events of
1481 this object, in key C<first> there is the first item, in key C<last>
1487 my ($self, %options) = @_;
1488 my $info = $options{info
};
1489 if ($self->is_slave) {
1490 # XXX seems dubious, might produce tempfiles without removing them?
1491 $self->get_remote_recentfile_as_tempfile;
1493 my $rfile_or_tempfile = $self->_my_current_rfile or return [];
1494 -e
$rfile_or_tempfile or return [];
1495 my $suffix = $self->serializer_suffix;
1497 $self->_try_deserialize
1504 if ($err or !$data) {
1508 if (reftype
$data eq 'ARRAY') { # protocol 0
1511 $re = $self->_recent_events_protocol_x
1517 return $re unless grep {defined $options{$_}} qw(after before contains max skip-deletes);
1518 $self->_recent_events_handle_options ($re, \
%options);
1521 # File::Rsync::Mirror::Recentfile::_recent_events_handle_options
1522 sub _recent_events_handle_options
{
1523 my($self, $re, $options) = @_;
1524 my $last_item = $#$re;
1525 my $info = $options->{info
};
1527 $info->{first
} = $re->[0];
1528 $info->{last} = $re->[-1];
1530 if (defined $options->{after
}) {
1531 if ($re->[0]{epoch
} > $options->{after
}) {
1534 {$re->[$_]{epoch
} <= $options->{after
}}
1544 if (defined $options->{before
}) {
1545 if ($re->[0]{epoch
} > $options->{before
}) {
1548 {$re->[$_]{epoch
} < $options->{before
}}
1557 if (0 != $first_item || -1 != $last_item) {
1558 @
$re = splice @
$re, $first_item, 1+$last_item-$first_item;
1560 if ($options->{'skip-deletes'}) {
1561 @
$re = grep { $_->{type
} ne "delete" } @
$re;
1563 if (my $contopt = $options->{contains
}) {
1564 my $seen_allowed = 0;
1565 for my $allow (qw(epoch path type)) {
1566 if (exists $contopt->{$allow}) {
1568 my $v = $contopt->{$allow};
1569 @
$re = grep { $_->{$allow} eq $v } @
$re;
1572 if (keys %$contopt > $seen_allowed) {
1575 (sprintf "unknown query: %s", join ", ", %$contopt);
1578 if ($options->{max
} && @
$re > $options->{max
}) {
1579 @
$re = splice @
$re, 0, $options->{max
};
1584 sub _recent_events_protocol_x
{
1589 my $meth = sprintf "read_recent_%d", $data->{meta
}{protocol
};
1590 # we may be reading meta for the first time
1591 while (my($k,$v) = each %{$data->{meta
}}) {
1592 if ($k ne lc $k){ # "Producers"
1593 $self->{ORIG
}{$k} = $v;
1596 next if defined $self->$k;
1599 my $re = $self->$meth ($data);
1601 if (my @stat = stat $rfile_or_tempfile) {
1602 $minmax = { mtime
=> $stat[9] };
1604 # defensive because ABH encountered:
1606 #### Sync 1239828608 (1/1/Z) temp .../authors/.FRMRecent-RECENT-Z.yaml-
1607 #### Ydr_.yaml ... DONE
1608 #### Cannot stat '/mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-
1609 #### Ydr_.yaml': No such file or directory at /usr/lib/perl5/site_perl/
1610 #### 5.8.8/File/Rsync/Mirror/Recentfile.pm line 1558.
1611 #### unlink0: /mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-Ydr_.yaml is
1612 #### gone already at cpan-pause.pl line 0
1614 my $LFH = $self->_logfilehandle;
1615 print $LFH "Warning (maybe harmless): Cannot stat '$rfile_or_tempfile': $!"
1618 $minmax->{min
} = $re->[-1]{epoch
};
1619 $minmax->{max
} = $re->[0]{epoch
};
1621 $self->minmax ( $minmax );
1625 sub _try_deserialize
{
1630 if ($suffix eq ".yaml") {
1632 YAML
::Syck
::LoadFile
($rfile_or_tempfile);
1633 } elsif ($HAVE->{"Data::Serializer"}) {
1634 my $serializer = Data
::Serializer
->new
1635 ( serializer
=> $serializers{$suffix} );
1638 open my $fh, $rfile_or_tempfile or die "Could not open: $!";
1642 $serializer->raw_deserialize($serialized);
1644 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
1648 sub _refresh_internals
{
1649 my($self, $dst) = @_;
1650 my $class = ref $self;
1651 my $rfpeek = $class->new_from_file ($dst);
1656 $self->$acc ( $rfpeek->$acc );
1658 my $old_dirtymark = $self->dirtymark;
1659 my $new_dirtymark = $rfpeek->dirtymark;
1660 if ($old_dirtymark && $new_dirtymark && $new_dirtymark ne $old_dirtymark) {
1662 $self->dirtymark ( $new_dirtymark );
1663 $self->_uptodateness_ever_reached(0);
1668 =head2 $ret = $obj->rfilename
1670 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1671 a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
1677 my $file = sprintf("%s-%s%s",
1678 $self->filenameroot,
1680 $self->serializer_suffix,
1685 =head2 $str = $self->remote_dir
1687 The directory we are mirroring from.
1692 my($self, $set) = @_;
1694 $self->_remote_dir ($set);
1696 my $x = $self->_remote_dir;
1697 $self->is_slave (1);
1701 =head2 $str = $obj->remoteroot
1703 =head2 (void) $obj->remoteroot ( $set )
1705 Get/Set the composed prefix needed when rsyncing from a remote module.
1706 If remote_host, remote_module, and remote_dir are set, it is composed
1712 my($self, $set) = @_;
1714 $self->_remoteroot($set);
1716 my $remoteroot = $self->_remoteroot;
1717 unless (defined $remoteroot) {
1718 $remoteroot = sprintf
1721 defined $self->remote_host ?
($self->remote_host."::") : "",
1722 defined $self->remote_module ?
($self->remote_module."/") : "",
1723 defined $self->remote_dir ?
$self->remote_dir : "",
1725 $self->_remoteroot($remoteroot);
1730 =head2 (void) $obj->split_rfilename ( $recentfilename )
1732 Inverse method to C<rfilename>. C<$recentfilename> is a plain filename
1735 $filenameroot-$interval$serializer_suffix
1741 This filename is split into its parts and the parts are fed to the
1746 sub split_rfilename
{
1747 my($self, $rfname) = @_;
1748 my($splitter) = qr
(^(.+)-([^-\
.]+)(\
.[^\
.]+));
1749 if (my($f,$i,$s) = $rfname =~ $splitter) {
1750 $self->filenameroot ($f);
1751 $self->interval ($i);
1752 $self->serializer_suffix ($s);
1754 die "Alert: cannot split '$rfname', doesn't match '$splitter'";
1759 =head2 my $rfile = $obj->rfile
1761 Returns the full path of the I<recentfile>
1767 my $rfile = $self->_rfile;
1768 return $rfile if defined $rfile;
1769 $rfile = File
::Spec
->catfile
1773 $self->_rfile ($rfile);
1777 =head2 $rsync_obj = $obj->rsync
1779 The File::Rsync object that this object uses for communicating with an
1786 my $rsync = $self->_rsync;
1787 unless (defined $rsync) {
1788 my $rsync_options = $self->rsync_options || {};
1789 if ($HAVE->{"File::Rsync"}) {
1790 $rsync = File
::Rsync
->new($rsync_options);
1791 $self->_rsync($rsync);
1793 die "File::Rsync required for rsync operations. Cannot continue";
1799 =head2 (void) $obj->register_rsync_error(@err)
1801 =head2 (void) $obj->un_register_rsync_error()
1803 Register_rsync_error is called whenever the File::Rsync object fails
1804 on an exec (say, connection doesn't succeed). It issues a warning and
1805 sleeps for an increasing amount of time. Un_register_rsync_error
1806 resets the error count. See also accessor C<max_rsync_errors>.
1811 my $no_success_count = 0;
1812 my $no_success_time = 0;
1813 sub register_rsync_error
{
1814 my($self, @err) = @_;
1816 $no_success_time = time;
1817 $no_success_count++;
1818 my $max_rsync_errors = $self->max_rsync_errors;
1819 $max_rsync_errors = MAX_INT
unless defined $max_rsync_errors;
1820 if ($max_rsync_errors>=0 && $no_success_count >= $max_rsync_errors) {
1826 "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
1832 my $sleep = 12 * $no_success_count;
1833 $sleep = 300 if $sleep > 300;
1838 "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
1839 scalar(localtime($no_success_time)),
1846 sub un_register_rsync_error
{
1848 $no_success_time = 0;
1849 $no_success_count = 0;
1853 =head2 $clone = $obj->_sparse_clone
1855 Clones only as much of itself as can be copied without harm. Experimental
1858 Note: what fits better: sparse or shallow? Other suggestions?
1864 my $new = bless {}, ref $self;
1873 ignore_link_stat_errors
1875 max_files_per_connection
1879 sleep_per_connection
1884 $o = Storable
::dclone
$o if ref $o;
1890 =head2 $boolean = OBJ->ttl_reached ()
1896 my $have_mirrored = $self->have_mirrored || 0;
1897 my $now = Time
::HiRes
::time;
1898 my $ttl = $self->ttl;
1899 $ttl = 24.2 unless defined $ttl;
1900 if ($now > $have_mirrored + $ttl) {
1906 =head2 (void) $obj->unlock()
1908 Unlocking is implemented with an C<rmdir> on a locking directory
1909 (C<.lock> appended to $rfile).
1915 return unless $self->_is_locked;
1916 my $rfile = $self->rfile;
1917 rmdir "$rfile.lock";
1918 $self->_is_locked (0);
1923 Sets this recentfile in the state of not 'seeded'.
1931 =head2 $ret = $obj->update ($path, $type)
1933 =head2 $ret = $obj->update ($path, "new", $dirty_epoch)
1935 =head2 $ret = $obj->update ()
1937 Enter one file into the local I<recentfile>. $path is the (usually
1938 absolute) path. If the path is outside I<our> tree, then it is
1941 C<$type> is one of C<new> or C<delete>.
1943 Events of type C<new> may set $dirty_epoch. $dirty_epoch is normally
1944 not used and the epoch is calculated by the update() routine itself
1945 based on current time. But if there is the demand to insert a
1946 not-so-current file into the dataset, then the caller sets
1947 $dirty_epoch. This causes the epoch of the registered event to become
1948 $dirty_epoch or -- if the exact value given is already taken -- a tiny
1949 bit more. As compensation the dirtymark of the whole dataset is set to
1950 now or the current epoch, whichever is higher. Note: setting the
1951 dirty_epoch to the future is prohibited as it's very unlikely to be
1952 intended and could easily wreak havoc with the index files.
1954 The new file event is unshifted (or, if dirty_epoch is set, inserted
1955 at the place it belongs to, according to the rule to have a sequence
1956 of strictly decreasing timestamps) to the array of recent_events and
1957 the array is shortened to the length of the timespan allowed. This is
1958 usually the timespan specified by the interval of this recentfile but
1959 as long as this recentfile has not been merged to another one, the
1960 timespan may grow without bounds.
1962 The third form runs an update without inserting a new file. This may
1963 be desired to truncate a recentfile.
1966 sub _epoch_monotonically_increasing
{
1967 my($self,$epoch,$recent) = @_;
1968 return $epoch unless @
$recent; # the first one goes unoffended
1969 if (_bigfloatgt
("".$epoch,$recent->[0]{epoch
})) {
1972 return _increase_a_bit
($recent->[0]{epoch
});
1976 my($self,$path,$type,$dirty_epoch) = @_;
1978 my $ctx = $self->_locked_batch_update([{path
=>$path,type
=>$type,epoch
=>$dirty_epoch}]);
1979 $self->write_recent($ctx->{recent
}) if $ctx->{something_done
};
1980 $self->_assert_symlink;
1983 sub _locked_batch_update
{
1984 my($self,$batch) = @_;
1985 my($path,$type,$dirty_epoch) = @
{$batch->[0]}{qw(path type epoch)};
1986 if (defined $path or defined $type or defined $dirty_epoch) {
1987 die "update called without path argument" unless defined $path;
1988 die "update called without type argument" unless defined $type;
1989 die "update called with illegal type argument: $type" unless $type =~ /(new|delete)/;
1990 # since we have keep_delete_objects_forever we must let them inject delete objects too:
1991 #die "update called with \$type=$type and \$dirty_epoch=$dirty_epoch; ".
1992 # "dirty_epoch only allowed with type=new" if defined $dirty_epoch and $type ne "new";
1993 my $canonmeth = $self->canonize;
1994 unless ($canonmeth) {
1995 $canonmeth = "naive_path_normalize";
1997 $path = $self->$canonmeth($path);
1999 my $lrd = $self->localroot;
2000 # you must calculate the time after having locked, of course
2001 my $now = Time
::HiRes
::time;
2002 my $interval = $self->interval;
2003 my $secs = $self->interval_secs();
2004 my $recent = $self->recent_events;
2007 if (defined $dirty_epoch && _bigfloatgt
($now,$dirty_epoch)) {
2008 $epoch = $dirty_epoch;
2010 $epoch = $self->_epoch_monotonically_increasing($now,$recent);
2014 my $oldest_allowed = 0;
2015 my $merged = $self->merged;
2016 if ($merged->{epoch
}) {
2017 my $virtualnow = _bigfloatmax
($now,$epoch);
2018 # for the lower bound I think we need no big math, we calc already
2019 $oldest_allowed = min
($virtualnow - $secs, $merged->{epoch
}, $epoch);
2021 # as long as we are not merged at all, no limits!
2023 my $something_done = 0;
2024 TRUNCATE
: while (@
$recent) {
2025 # $DB::single++ unless defined $oldest_allowed;
2026 if (_bigfloatlt
($recent->[-1]{epoch
}, $oldest_allowed)) {
2028 $something_done = 1;
2033 if (defined $path && $path =~ s
|^\Q
$lrd\E
||) {
2036 # remove the older duplicates of this $path, irrespective of $type:
2037 if (defined $dirty_epoch) {
2038 my $ctx = $self->_update_with_dirty_epoch($path,$recent,$epoch);
2039 $recent = $ctx->{recent
};
2040 $splicepos = $ctx->{splicepos
};
2041 $epoch = $ctx->{epoch
};
2042 my $dirtymark = $self->dirtymark;
2044 if (_bigfloatgt
($epoch, $now)) { # just in case we had to increase it
2047 $self->dirtymark($new_dm);
2048 my $merged = $self->merged;
2049 if (not defined $merged->{epoch
} or _bigfloatlt
($epoch,$merged->{epoch
})) {
2053 $recent = [ grep { $_->{path
} ne $path } @
$recent ];
2056 if (defined $splicepos) {
2057 splice @
$recent, $splicepos, 0, { epoch
=> $epoch, path
=> $path, type
=> $type };
2059 $something_done = 1;
2061 return {something_done
=>$something_done,recent
=>$recent};
2064 sub _update_with_dirty_epoch
{
2065 my($self,$path,$recent,$epoch) = @_;
2067 my $new_recent = [];
2068 if (grep { $_->{path
} ne $path } @
$recent) {
2070 KNOWN_EVENT
: for my $i (0..$#$recent) {
2071 if ($recent->[$i]{path
} eq $path) {
2072 if ($recent->[$i]{epoch
} eq $epoch) {
2078 push @
$new_recent, $recent->[$i];
2081 @
$recent = @
$new_recent unless $cancel;
2083 if (!exists $recent->[0] or _bigfloatgt
($epoch,$recent->[0]{epoch
})) {
2085 } elsif (_bigfloatlt
($epoch,$recent->[-1]{epoch
})) {
2086 $splicepos = @
$recent;
2088 RECENT
: for my $i (0..$#$recent) {
2089 my $ev = $recent->[$i];
2090 if ($epoch eq $recent->[$i]{epoch
}) {
2091 $epoch = _increase_a_bit
($epoch, $i ?
$recent->[$i-1]{epoch
} : undef);
2093 if (_bigfloatgt
($epoch,$recent->[$i]{epoch
})) {
2101 splicepos
=> $splicepos,
2108 Sets this recentfile in the state of 'seeded' which means it has to
2109 re-evaluate its uptodateness.
2119 Tells if the recentfile is in the state 'seeded'.
2123 my($self, $set) = @_;
2125 $self->_seeded ($set);
2127 my $x = $self->_seeded;
2128 unless (defined $x) {
2130 $self->_seeded ($x);
2137 True if this object has mirrored the complete interval covered by the
2145 if ($self->_uptodateness_ever_reached and not $self->seeded) {
2149 # it's too easy to misconfigure ttl and related timings and then
2150 # never reach uptodateness, so disabled 2009-03-22
2151 if (0 and not defined $uptodate) {
2152 if ($self->ttl_reached){
2153 $why = "ttl_reached returned true, so we are not uptodate";
2157 unless (defined $uptodate) {
2158 # look if recentfile has unchanged timestamp
2159 my $minmax = $self->minmax;
2160 if (exists $minmax->{mtime
}) {
2161 my $rfile = $self->_my_current_rfile;
2162 my @stat = stat $rfile or die "Could not stat '$rfile': $!";
2163 my $mtime = $stat[9];
2164 if (defined $mtime && defined $minmax->{mtime
} && $mtime > $minmax->{mtime
}) {
2165 $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
2168 my $covered = $self->done->covered(@
$minmax{qw(max min)});
2169 $why = sprintf "minmax covered[%s], so we return that", defined $covered ?
$covered : "UNDEF";
2170 $uptodate = $covered;
2174 unless (defined $uptodate) {
2175 $why = "fallthrough, so not uptodate";
2179 $self->_uptodateness_ever_reached(1);
2183 uptodate
=> $uptodate,
2186 $self->_remember_last_uptodate_call($remember);
2190 =head2 $obj->write_recent ($recent_files_arrayref)
2192 Writes a I<recentfile> based on the current reflection of the current
2193 state of the tree limited by the current interval.
2198 @
{$_[1]} = sort { _bigfloatcmp
($b->{epoch
},$a->{epoch
}) } @
{$_[1]};
2202 my ($self,$recent) = @_;
2203 die "write_recent called without argument" unless defined $recent;
2205 SANITYCHECK
: for my $i (0..$#$recent) {
2206 if (defined($Last_epoch) and _bigfloatge
($recent->[$i]{epoch
},$Last_epoch)) {
2208 Carp
::confess
(sprintf "Warning: disorder '%s'>='%s', re-sorting %s\n",
2209 $recent->[$i]{epoch
}, $Last_epoch, $self->interval);
2211 # $self->_resort($recent);
2214 $Last_epoch = $recent->[$i]{epoch
};
2216 my $minmax = $self->minmax;
2217 if (!defined $minmax->{max
} || _bigfloatlt
($minmax->{max
},$recent->[0]{epoch
})) {
2218 $minmax->{max
} = $recent->[0]{epoch
};
2220 if (!defined $minmax->{min
} || _bigfloatlt
($minmax->{min
},$recent->[-1]{epoch
})) {
2221 $minmax->{min
} = $recent->[-1]{epoch
};
2223 $self->minmax($minmax);
2224 my $meth = sprintf "write_%d", $self->protocol;
2225 $self->$meth($recent);
2228 =head2 $obj->write_0 ($recent_files_arrayref)
2230 Delegate of C<write_recent()> on protocol 0
2235 my ($self,$recent) = @_;
2236 my $rfile = $self->rfile;
2237 YAML
::Syck
::DumpFile
("$rfile.new",$recent);
2238 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2241 =head2 $obj->write_1 ($recent_files_arrayref)
2243 Delegate of C<write_recent()> on protocol 1
2248 my ($self,$recent) = @_;
2249 my $rfile = $self->rfile;
2250 my $suffix = $self->serializer_suffix;
2252 meta
=> $self->meta_data,
2256 if ($suffix eq ".yaml") {
2257 $serialized = YAML
::Syck
::Dump
($data);
2258 } elsif ($HAVE->{"Data::Serializer"}) {
2259 my $serializer = Data
::Serializer
->new
2260 ( serializer
=> $serializers{$suffix} );
2261 $serialized = $serializer->raw_serialize($data);
2263 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
2265 open my $fh, ">", "$rfile.new" or die "Could not open >'$rfile.new': $!";
2266 print $fh $serialized;
2267 close $fh or die "Could not close '$rfile.new': $!";
2268 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2272 my $nq = qr/[^"]+/; # non-quotes
2274 split /\n/, <<'=cut'; %serializers = map { my @x = /"($nq)"\s+=>\s+"($nq)"/; @x } grep {s/^=item\s+C<<\s+(.+)\s+>>$/$1/} @pod_lines; }
2278 The following suffixes are supported and trigger the use of these
2283 =item C<< ".yaml" => "YAML::Syck" >>
2285 =item C<< ".json" => "JSON" >>
2287 =item C<< ".sto" => "Storable" >>
2289 =item C<< ".dd" => "Data::Dumper" >>
2297 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
2299 =head1 INTERVAL SPEC
2301 An interval spec is a primitive way to express time spans. Normally it
2302 is composed from an integer and a letter.
2304 As a special case, a string that consists only of the single letter
2305 C<Z>, stands for MAX_INT seconds.
2307 The following letters express the specified number of seconds:
2313 =item C<< m => 60 >>
2315 =item C<< h => 60*60 >>
2317 =item C<< d => 60*60*24 >>
2319 =item C<< W => 60*60*24*7 >>
2321 =item C<< M => 60*60*24*30 >>
2323 =item C<< Q => 60*60*24*90 >>
2325 =item C<< Y => 60*60*24*365.25 >>
2333 L<File::Rsync::Mirror::Recent>,
2334 L<File::Rsync::Mirror::Recentfile::Done>,
2335 L<File::Rsync::Mirror::Recentfile::FakeBigFloat>
2339 Please report any bugs or feature requests through the web interface
2341 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2342 I will be notified, and then you'll automatically be notified of
2343 progress on your bug as I make changes.
2347 Memory hungry: it seems all memory is allocated during the initial
2348 rsync where a list of all files is maintained in memory.
2352 You can find documentation for this module with the perldoc command.
2354 perldoc File::Rsync::Mirror::Recentfile
2356 You can also look for information at:
2360 =item * RT: CPAN's request tracker
2362 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2364 =item * AnnoCPAN: Annotated CPAN documentation
2366 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2368 =item * CPAN Ratings
2370 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2374 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2379 =head1 ACKNOWLEDGEMENTS
2381 Thanks to RJBS for module-starter.
2387 =head1 COPYRIGHT & LICENSE
2389 Copyright 2008,2009 Andreas König.
2391 This program is free software; you can redistribute it and/or modify it
2392 under the same terms as Perl itself.
2397 1; # End of File::Rsync::Mirror::Recentfile
2401 # cperl-indent-level: 4