1 package File
::Rsync
::Mirror
::Recentfile
;
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
19 $HAVE->{$package} = eval qq{ require $package; };
22 use File
::Basename
qw(basename dirname fileparse);
23 use File
::Copy
qw(cp);
24 use File
::Path
qw(mkpath);
25 use File
::Rsync
::Mirror
::Recentfile
::FakeBigFloat
qw(:all);
27 use List
::Util
qw(first max min);
28 use Scalar
::Util
qw(reftype);
33 use version
; our $VERSION = qv
('0.0.8');
35 use constant MAX_INT
=> ~0>>1; # anything better?
36 use constant DEFAULT_PROTOCOL
=> 1;
41 # maybe subclass if this mapping is bad?
46 Writer (of a single file):
48 use File::Rsync::Mirror::Recentfile;
49 my $fr = File::Rsync::Mirror::Recentfile->new
52 filenameroot => "RECENT",
53 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
54 localroot => "/home/ftp/pub/PAUSE/authors/",
55 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
57 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
61 my $rf = File::Rsync::Mirror::Recentfile->new
63 filenameroot => "RECENT",
65 localroot => "/home/ftp/pub/PAUSE/authors",
67 remote_host => "pause.perl.org",
68 remote_module => "authors",
71 'rsync-path' => '/usr/bin/rsync',
74 'omit-dir-times' => 1,
81 Aggregator (usually the writer):
83 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
88 Lower level than F:R:M:Recent, handles one recentfile. Whereas a tree
89 is always composed of several recentfiles, controlled by the
90 F:R:M:Recent object, the Recentfile object has to do the bookkeeping
91 for a single timeslice.
97 =head1 CONSTRUCTORS / DESTRUCTOR
99 =head2 my $obj = CLASS->new(%hash)
101 Constructor. On every argument pair the key is a method name and the
102 value is an argument to that method.
104 If a recentfile for this resource already exists, metadata that are
105 not defined by the constructor will be fetched from there as soon as
106 it is being read by recent_events().
111 my($class, @args) = @_;
112 my $self = bless {}, $class;
114 my($method,$arg) = splice @args, 0, 2;
115 $self->$method($arg);
117 unless (defined $self->protocol) {
118 $self->protocol(DEFAULT_PROTOCOL
);
120 unless (defined $self->filenameroot) {
121 $self->filenameroot("RECENT");
123 unless (defined $self->serializer_suffix) {
124 $self->serializer_suffix(".yaml");
129 =head2 my $obj = CLASS->new_from_file($file)
131 Constructor. $file is a I<recentfile>.
136 my($class, $file) = @_;
137 my $self = bless {}, $class;
138 $self->_rfile($file);
140 my $serialized = do { open my $fh, $file or die "Could not open '$file': $!";
144 # XXX: we can skip this step when the metadata are sufficient, but
145 # we cannot parse the file without some magic stuff about
148 my($name,$path) = fileparse
$file;
149 my $symlink = readlink $file;
150 if ($symlink =~ m
|/|) {
151 die "FIXME: filenames containing '/' not supported, got $symlink";
153 $file = File
::Spec
->catfile ( $path, $symlink );
155 my($name,$path,$suffix) = fileparse
$file, keys %serializers;
156 $self->serializer_suffix($suffix);
157 $self->localroot($path);
158 die "Could not determine file format from suffix" unless $suffix;
160 if ($suffix eq ".yaml") {
162 $deserialized = YAML
::Syck
::LoadFile
($file);
163 } elsif ($HAVE->{"Data::Serializer"}) {
164 my $serializer = Data
::Serializer
->new
165 ( serializer
=> $serializers{$suffix} );
166 $deserialized = $serializer->raw_deserialize($serialized);
168 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
170 while (my($k,$v) = each %{$deserialized->{meta
}}) {
171 next if $k ne lc $k; # "Producers"
174 unless (defined $self->protocol) {
175 $self->protocol(DEFAULT_PROTOCOL
);
188 unless ($self->_current_tempfile_fh) {
189 if (my $tempfile = $self->_current_tempfile) {
191 # unlink $tempfile; # may fail in global destruction
206 "_current_tempfile_fh",
207 "_delayed_operations",
214 "_remember_last_uptodate_call",
220 "__verified_tempdir",
222 "_uptodateness_ever_reached",
227 split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
233 A list of interval specs that tell the aggregator which I<recentfile>s
238 The name of a method to canonicalize the path before rsyncing. The
239 only supported value is C<naive_path_normalize>. Defaults to that.
243 A comment about this tree and setup.
247 A timestamp. The dirtymark is updated whenever an out of band change
248 on the origin server is performed that violates the protocol. Say,
249 they add or remove files in the middle somewhere. Slaves must react
250 with a devaluation of their C<done> structure which then leads to a
251 full re-sync of all files. Implementation note: dirtymark may increase
256 The (prefix of the) filename we use for this I<recentfile>. Defaults to
257 C<RECENT>. The string must not contain a directory separator.
261 Timestamp remembering when we mirrored this recentfile the last time.
262 Only relevant for slaves.
264 =item ignore_link_stat_errors
266 If set to true, rsync errors that complain about link stat
267 errors are ignored. These seem to happen only when there are files missing at the
268 origin. In race conditions this can always happen, so it defaults to
273 If set to true, this object will fetch a new recentfile from remote
274 when the timespan between the last mirror (see have_mirrored) and now
275 is too large (see C<ttl>).
277 =item keep_delete_objects_forever
279 The default for delete events is that they are passed through the
280 collection of recentfile objects until they reach the Z file. There
281 they get dropped so that the associated file object ceases to exist at
282 all. By setting C<keep_delete_objects_forever> the delete objects are
283 kept forever. This makes the Z file larger but has the advantage that
284 slaves that have interrupted mirroring for a long time still can clean
289 After how many seconds shall we die if we cannot lock a I<recentfile>?
290 Defaults to 600 seconds.
294 When mirror_loop is called, this accessor can specify how much time
295 every loop shall at least take. If the work of a loop is done before
296 that time has elapsed, the loop sleeps for the rest of the time. Defaults to
297 an arbitrary 42 seconds.
299 =item max_files_per_connection
301 Maximum number of files that are transferred on a single rsync call.
302 Setting it higher means higher performance at the price of holding
303 connections longer and potentially disturbing other users in the pool.
304 Defaults to the arbitrary value 42.
306 =item max_rsync_errors
308 When rsync operations encounter that many errors without any resetting
309 success in between, then we die. Defaults to unlimited. A value of
310 -1 means we run forever ignoring all rsync errors.
314 Hashref remembering when we read the recent_events from this file the
315 last time and what the timespan was.
319 When the RECENT file format changes, we increment the protocol. We try
320 to support older protocols in later releases.
324 The host we are mirroring from. Leave empty for the local filesystem.
328 Rsync servers have so called modules to separate directory trees from
329 each other. Put here the name of the module under which we are
330 mirroring. Leave empty for local filesystem.
334 Things like compress, links, times or checksums. Passed in to the
335 File::Rsync object used to run the mirror.
337 =item serializer_suffix
339 Mostly untested accessor. The only well tested format for
340 I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
341 Data::Serializer. But in principle other formats are supported as
342 well. See section SERIALIZERS below.
344 =item sleep_per_connection
346 Sleep that many seconds (floating point OK) after every chunk of rsyncing
347 has finished. Defaults to arbitrary 0.42.
351 Directory to write temporary files to. Must allow rename operations
352 into the tree which usually means it must live on the same partition
353 as the target directory. Defaults to C<< $self->localroot >>.
357 Time to live. Number of seconds after which this recentfile must be
358 fetched again from the origin server. Only relevant for slaves.
359 Defaults to arbitrary 24.2 seconds.
363 Boolean to turn on a bit of verbosity.
367 Path to the logfile to write verbose progress information to. This is
368 a primitive stopgap solution to get simple verbose logging working.
369 Switching to Log4perl or similar is probably the way to go.
375 use accessors
@accessors;
379 =head2 (void) $obj->aggregate( %options )
381 Takes all intervals that are collected in the accessor called
382 aggregator. Sorts them by actual length of the interval.
383 Removes those that are shorter than our own interval. Then merges this
384 object into the next larger object. The merging continues upwards
385 as long as the next I<recentfile> is old enough to warrant a merge.
387 Whether a merge is warranted is decided according to the length of the
388 previous interval, so that larger files are not updated as often as
389 smaller ones. If $options{force} is true, all files get updated.
391 Here is an example to illustrate the behaviour. Given aggregators
397 1h updates 1d on every call to aggregate()
398 1d updates 1W earliest after 1h
399 1W updates 1M earliest after 1d
400 1M updates 1Q earliest after 1W
401 1Q updates 1Y earliest after 1M
402 1Y updates Z earliest after 1Q
404 Note that all but the smallest recentfile get updated at an arbitrary
405 rate and as such are quite useless on their own.
410 my($self, %option) = @_;
412 my @aggs = sort { $a->{secs
} <=> $b->{secs
} }
413 grep { !$seen_interval{$_->{interval
}}++ && $_->{secs
} >= $self->interval_secs }
414 map { { interval
=> $_, secs
=> $self->interval_secs($_)} }
415 $self->interval, @
{$self->aggregator || []};
417 $aggs[0]{object
} = $self;
418 AGGREGATOR
: for my $i (0..$#aggs-1) {
419 my $this = $aggs[$i]{object
};
420 my $next = $this->_sparse_clone;
421 $next->interval($aggs[$i+1]{interval
});
423 if ($option{force
} || $i == 0) {
426 my $next_rfile = $next->rfile;
427 if (-e
$next_rfile) {
428 my $prev = $aggs[$i-1]{object
};
430 my $next_age = 86400 * -M
$next_rfile;
431 if ($next_age > $prev->interval_secs) {
440 $aggs[$i+1]{object
} = $next;
447 # collect file size and mtime for all files of this aggregate
448 sub _debug_aggregate
{
450 my @aggs = sort { $a->{secs
} <=> $b->{secs
} }
451 map { { interval
=> $_, secs
=> $self->interval_secs($_)} }
452 $self->interval, @
{$self->aggregator || []};
454 for my $i (0..$#aggs) {
455 my $this = Storable
::dclone
$self;
456 $this->interval($aggs[$i]{interval
});
457 my $rfile = $this->rfile;
458 my @stat = stat $rfile;
459 push @
$report, {rfile
=> $rfile, size
=> $stat[7], mtime
=> $stat[9]};
464 # (void) $self->_assert_symlink()
465 sub _assert_symlink
{
467 my $recentrecentfile = File
::Spec
->catfile
476 if ($Config{d_symlink
} eq "define") {
477 my $howto_create_symlink; # 0=no need; 1=straight symlink; 2=rename symlink
478 if (-l
$recentrecentfile) {
479 my $found_symlink = readlink $recentrecentfile;
480 if ($found_symlink eq $self->rfilename) {
483 $howto_create_symlink = 2;
486 $howto_create_symlink = 1;
488 if (1 == $howto_create_symlink) {
489 symlink $self->rfilename, $recentrecentfile or die "Could not create symlink '$recentrecentfile': $!"
491 unlink "$recentrecentfile.$$"; # may fail
492 symlink $self->rfilename, "$recentrecentfile.$$" or die "Could not create symlink '$recentrecentfile.$$': $!";
493 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
496 warn "Warning: symlinks not supported on this system, doing a copy instead\n";
497 unlink "$recentrecentfile.$$"; # may fail
498 cp
$self->rfilename, "$recentrecentfile.$$" or die "Could not copy to '$recentrecentfile.$$': $!";
499 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
503 =head2 $hashref = $obj->delayed_operations
505 A hash of hashes containing unlink and rmdir operations which had to
506 wait until the recentfile got unhidden in order to not confuse
507 downstream mirrors (in case we have some).
511 sub delayed_operations
{
513 my $x = $self->_delayed_operations;
514 unless (defined $x) {
519 $self->_delayed_operations ($x);
524 =head2 $done = $obj->done
526 C<$done> is a reference to a L<File::Rsync::Mirror::Recentfile::Done>
527 object that keeps track of rsync activities. Only needed and used when
528 we are a mirroring slave.
534 my $done = $self->_done;
536 require File
::Rsync
::Mirror
::Recentfile
::Done
;
537 $done = File
::Rsync
::Mirror
::Recentfile
::Done
->new();
538 $done->_rfinterval ($self->interval);
539 $self->_done ( $done );
544 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
546 Stores the remote I<recentfile> locally as a tempfile. The caller is
547 responsible for removing the file after use.
549 Note: if you're intending to act as an rsync server for other slaves,
550 then you must prefer this method over fetching that file with
551 get_remotefile(). Otherwise downstream mirrors would expect you to
552 already have mirrored all the files that are in the I<recentfile>
553 before you have them mirrored.
557 sub get_remote_recentfile_as_tempfile
{
559 mkpath
$self->localroot;
562 if ( $self->_use_tempfile() ) {
563 if ($self->ttl_reached) {
564 $fh = $self->_current_tempfile_fh;
565 $trfilename = $self->rfilename;
567 return $self->_current_tempfile;
570 $trfilename = $self->rfilename;
575 $dst = $self->_current_tempfile;
577 $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
578 $dst = $fh->filename;
579 $self->_current_tempfile ($dst);
580 my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
581 if (defined $rfile && -e
$rfile) {
582 # saving on bandwidth. Might need to be configurable
583 # $self->bandwidth_is_cheap?
584 cp
$rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
591 if ($self->verbose) {
592 my $doing = -e
$dst ?
"Sync" : "Get";
593 my $display_dst = join "/", "...", basename
(dirname
($dst)), basename
($dst);
594 my $LFH = $self->_logfilehandle;
597 "%-4s %d (1/1/%s) temp %s ... ",
606 local($ENV{LANG
}) = "C";
607 while (!$self->rsync->exec(
611 $self->register_rsync_error ($self->rsync->err);
612 if (++$retried >= 3) {
613 warn "XXX giving up";
619 my $LFH = $self->_logfilehandle;
620 printf $LFH "Warning: gave up mirroring %s, will try again later", $self->interval;
622 $self->_refresh_internals ($dst);
623 $self->have_mirrored (Time
::HiRes
::time);
624 $self->un_register_rsync_error ();
627 if ($self->verbose) {
628 my $LFH = $self->_logfilehandle;
632 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
636 sub _verified_tempdir
{
638 my $tempdir = $self->__verified_tempdir();
639 return $tempdir if defined $tempdir;
640 unless ($tempdir = $self->tempdir) {
641 $tempdir = $self->localroot;
643 unless (-d
$tempdir) {
646 $self->__verified_tempdir($tempdir);
650 sub _get_remote_rat_provide_tempfile_object
{
651 my($self, $trfilename) = @_;
652 my $_verified_tempdir = $self->_verified_tempdir;
653 my $fh = File
::Temp
->new
654 (TEMPLATE
=> sprintf(".FRMRecent-%s-XXXX",
657 DIR
=> $_verified_tempdir,
658 SUFFIX
=> $self->serializer_suffix,
659 UNLINK
=> $self->_use_tempfile,
662 my $dst = $fh->filename;
663 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
664 if ($self->_use_tempfile) {
665 $self->_current_tempfile_fh ($fh); # delay self destruction
673 if (my $vl = $self->verboselog) {
674 open $fh, ">>", $vl or die "Could not open >> '$vl': $!";
681 =head2 $localpath = $obj->get_remotefile ( $relative_path )
683 Rsyncs one single remote file to local filesystem.
685 Note: no locking is done on this file. Any number of processes may
688 Note II: do not use for recentfiles. If you are a cascading
689 slave/server combination, it would confuse other slaves. They would
690 expect the contents of these recentfiles to be available. Use
691 get_remote_recentfile_as_tempfile() instead.
696 my($self, $path) = @_;
697 my $dst = File
::Spec
->catfile($self->localroot, $path);
699 if ($self->verbose) {
700 my $doing = -e
$dst ?
"Sync" : "Get";
701 my $LFH = $self->_logfilehandle;
704 "%-4s %d (1/1/%s) %s ... ",
711 local($ENV{LANG
}) = "C";
712 my $remoteroot = $self->remoteroot or die "Alert: missing remoteroot. Cannot continue";
713 while (!$self->rsync->exec(
719 $self->register_rsync_error ($self->rsync->err);
721 $self->un_register_rsync_error ();
722 if ($self->verbose) {
723 my $LFH = $self->_logfilehandle;
729 =head2 $obj->interval ( $interval_spec )
731 Get/set accessor. $interval_spec is a string and is described below in
732 the section INTERVAL SPEC.
737 my ($self, $interval) = @_;
739 $self->_interval($interval);
740 $self->_rfile(undef);
742 $interval = $self->_interval;
743 unless (defined $interval) {
744 # do not ask the $self too much, it recurses!
746 Carp
::confess
("Alert: interval undefined for '".$self."'. Cannot continue.");
751 =head2 $secs = $obj->interval_secs ( $interval_spec )
753 $interval_spec is described below in the section INTERVAL SPEC. If
754 empty, defaults to the inherent interval for this object.
759 my ($self, $interval) = @_;
760 $interval ||= $self->interval;
761 unless (defined $interval) {
762 die "interval_secs() called without argument on an object without a declared one";
764 my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
765 die "Could not determine seconds from interval[$interval]";
766 if ($interval eq "Z") {
768 } elsif (exists $seconds{$t} and $n =~ /^\d+$/) {
769 return $seconds{$t}*$n;
771 die "Invalid interval specification: n[$n]t[$t]";
775 =head2 $obj->localroot ( $localroot )
777 Get/set accessor. The local root of the tree. Guaranteed without
783 my ($self, $localroot) = @_;
785 $localroot =~ s
|/$||;
786 $self->_localroot($localroot);
787 $self->_rfile(undef);
789 $localroot = $self->_localroot;
792 =head2 $ret = $obj->local_path($path_found_in_recentfile)
794 Combines the path to our local mirror and the path of an object found
795 in this I<recentfile>. In other words: the target of a mirror operation.
797 Implementation note: We split on slashes and then use
798 File::Spec::catfile to adjust to the local operating system.
803 my($self,$path) = @_;
804 unless (defined $path) {
805 # seems like a degenerated case
806 return $self->localroot;
808 my @p = split m
|/|, $path;
809 File
::Spec
->catfile($self->localroot,@p);
812 =head2 (void) $obj->lock
814 Locking is implemented with an C<mkdir> on a locking directory
815 (C<.lock> appended to $rfile).
821 # not using flock because it locks on filehandles instead of
822 # old school resources.
823 my $locked = $self->_is_locked and return;
824 my $rfile = $self->rfile;
825 # XXX need a way to allow breaking the lock
827 my $locktimeout = $self->locktimeout || 600;
829 my $lockdir = "$rfile.lock";
830 my $procfile = "$lockdir/process";
831 GETLOCK
: while (not mkdir $lockdir) {
832 if (open my $fh, "<", $procfile) {
833 chomp(my $process = <$fh>);
835 } elsif ($process !~ /^\d+$/) {
836 warn "Warning: unknown process holds a lock in '$lockdir', waiting..." unless $have_warned{unknown
}++;
837 } elsif ($$ == $process) {
839 } elsif (kill 0, $process) {
840 warn "Warning: process $process holds a lock in '$lockdir', waiting..." unless $have_warned{$process}++;
842 warn "Warning: breaking lock held by process $process";
847 warn "Warning: unknown process holds a lock in '$lockdir', waiting..." unless $have_warned{unknown
}++;
849 Time
::HiRes
::sleep 0.01;
850 if (time - $start > $locktimeout) {
851 die "Could not acquire lockdirectory '$rfile.lock': $!";
854 open my $fh, ">", $procfile or die "Could not open >$procfile\: $!";
856 close $fh or die "Could not close: $!";
857 $self->_is_locked (1);
860 =head2 (void) $obj->merge ($other)
862 Bulk update of this object with another one. It's used to merge a
863 smaller and younger $other object into the current one. If this file
864 is a C<Z> file, then we normally do not merge in objects of type
865 C<delete>; this can be overridden by setting
866 keep_delete_objects_forever. But if we encounter an object of type
867 delete we delete the corresponding C<new> object if we have it.
869 If there is nothing to be merged, nothing is done.
874 my($self, $other) = @_;
875 $self->_merge_sanitycheck ( $other );
877 my $other_recent = $other->recent_events || [];
879 $self->_merge_locked ( $other, $other_recent );
885 my($self, $other, $other_recent) = @_;
886 my $my_recent = $self->recent_events || [];
888 # calculate the target time span
889 my $myepoch = $my_recent->[0] ?
$my_recent->[0]{epoch
} : undef;
890 my $epoch = $other_recent->[0] ?
$other_recent->[0]{epoch
} : $myepoch;
891 my $oldest_allowed = 0;
893 unless ($my_recent->[0]) {
898 if (($other->dirtymark||0) ne ($self->dirtymark||0)) {
901 } elsif (my $merged = $self->merged) {
902 my $secs = $self->interval_secs();
903 $oldest_allowed = min
($epoch - $secs, $merged->{epoch
}||0);
904 if (@
$other_recent and
905 _bigfloatlt
($other_recent->[-1]{epoch
}, $oldest_allowed)
907 $oldest_allowed = $other_recent->[-1]{epoch
};
910 while (@
$my_recent && _bigfloatlt
($my_recent->[-1]{epoch
}, $oldest_allowed)) {
917 my $other_recent_filtered = [];
918 for my $oev (@
$other_recent) {
919 my $oevepoch = $oev->{epoch
} || 0;
920 next if _bigfloatlt
($oevepoch, $oldest_allowed);
921 my $path = $oev->{path
};
922 next if $have_path{$path}++;
923 if ( $self->interval eq "Z"
924 and $oev->{type
} eq "delete"
925 and ! $self->keep_delete_objects_forever
929 if (!$myepoch || _bigfloatgt
($oevepoch, $myepoch)) {
932 push @
$other_recent_filtered, { epoch
=> $oev->{epoch
}, path
=> $path, type
=> $oev->{type
} };
935 if ($something_done) {
936 $self->_merge_something_done ($other_recent_filtered, $my_recent, $other_recent, $other, \
%have_path, $epoch);
940 sub _merge_something_done
{
941 my($self, $other_recent_filtered, $my_recent, $other_recent, $other, $have_path, $epoch) = @_;
943 my $epoch_conflict = 0;
945 ZIP
: while (@
$other_recent_filtered || @
$my_recent) {
948 @
$other_recent_filtered && _bigfloatge
($other_recent_filtered->[0]{epoch
},$my_recent->[0]{epoch
})) {
949 $event = shift @
$other_recent_filtered;
951 $event = shift @
$my_recent;
952 next ZIP
if $have_path->{$event->{path
}}++;
954 $epoch_conflict=1 if defined $last_epoch && $event->{epoch
} eq $last_epoch;
955 $last_epoch = $event->{epoch
};
956 push @
$recent, $event;
958 if ($epoch_conflict) {
960 for (my $i = $#$recent;$i>=0;$i--) {
961 my $epoch = $recent->[$i]{epoch
};
962 if ($have_epoch{$epoch}++) {
963 while ($have_epoch{$epoch}) {
964 $epoch = _increase_a_bit
($epoch);
966 $recent->[$i]{epoch
} = $epoch;
967 $have_epoch{$epoch}++;
971 if (!$self->dirtymark || $other->dirtymark ne $self->dirtymark) {
972 $self->dirtymark ( $other->dirtymark );
974 $self->write_recent($recent);
976 time => Time
::HiRes
::time, # not used anywhere
977 epoch
=> $recent->[0]{epoch
},
978 into_interval
=> $self->interval, # not used anywhere
980 $other->write_recent($other_recent);
983 sub _merge_sanitycheck
{
984 my($self, $other) = @_;
985 if ($self->interval_secs <= $other->interval_secs) {
990 "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
991 $self->interval_secs,
992 $other->interval_secs,
999 Hashref denoting when this recentfile has been merged into some other
1005 my($self, $set) = @_;
1007 $self->_merged ($set);
1009 my $merged = $self->_merged;
1011 if ($merged and $into = $merged->{into_interval
} and defined $self->_interval) {
1013 if ($into eq $self->interval) {
1017 "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
1021 } elsif ($self->interval_secs($into) < $self->interval_secs) {
1025 "Warning: into_interval_secs[%s] smaller than own interval_secs[%s] on interval[%s]. Danger ahead.",
1026 $self->interval_secs($into),
1027 $self->interval_secs,
1035 =head2 $hashref = $obj->meta_data
1037 Returns the hashref of metadata that the server has to add to the
1044 my $ret = $self->{meta
};
1055 "serializer_suffix",
1062 # XXX need to reset the Producer if I am a writer, keep it when I
1064 $ret->{Producers
} ||= {
1065 __PACKAGE__
, "$VERSION", # stringified it looks better
1067 'time', Time
::HiRes
::time,
1069 $ret->{dirtymark
} ||= Time
::HiRes
::time;
1073 =head2 $success = $obj->mirror ( %options )
1075 Mirrors the files in this I<recentfile> as reported by
1076 C<recent_events>. Options named C<after>, C<before>, C<max> are passed
1077 through to the C<recent_events> call. The boolean option C<piecemeal>,
1078 if true, causes C<mirror> to only rsync C<max_files_per_connection> files
1079 and keep track of the rsynced files so that future calls will rsync
1080 different files until all files are brought to sync.
1085 my($self, %options) = @_;
1086 my $trecentfile = $self->get_remote_recentfile_as_tempfile();
1087 $self->_use_tempfile (1);
1088 # skip-deletes is inadequate for passthrough within mirror. We
1089 # would never reach uptodateness when a delete were on a
1091 my %passthrough = map { ($_ => $options{$_}) } qw(before after max);
1092 my ($recent_events) = $self->recent_events(%passthrough);
1093 my(@error, @dlcollector); # download-collector: array containing paths we need
1095 my $last_item = $#$recent_events;
1096 my $done = $self->done;
1097 my $pathdb = $self->_pathdb;
1098 ITEM
: for my $i ($first_item..$last_item) {
1112 last if $i == $last_item;
1113 if ($status->{mustreturn
}){
1114 if ($self->_current_tempfile && ! $self->_current_tempfile_fh) {
1115 # looks like a bug somewhere else
1116 my $t = $self->_current_tempfile;
1117 unlink $t or die "Could not unlink '$t': $!";
1118 $self->_current_tempfile(undef);
1119 $self->_use_tempfile(0);
1125 my $success = eval { $self->_mirror_dlcollector (\
@dlcollector,$pathdb,$recent_events);};
1126 if (!$success || $@
) {
1127 warn "Warning: Unknown error while mirroring: $@";
1132 if ($self->verbose) {
1133 my $LFH = $self->_logfilehandle;
1134 print $LFH "DONE\n";
1136 # once we've gone to the end we consider ourselves free of obligations
1138 $self->_mirror_unhide_tempfile ($trecentfile);
1139 $self->_mirror_perform_delayed_ops(\
%options);
1155 my $recent_event = $recent_events->[$i];
1156 return if $done->covered ( $recent_event->{epoch
} );
1158 my $rec = $pathdb->{$recent_event->{path
}};
1159 if ($rec && $rec->{recentepoch
}) {
1161 ( $rec->{recentepoch
}, $recent_event->{epoch
} )){
1162 $done->register ($recent_events, [$i]);
1167 my $dst = $self->local_path($recent_event->{path
});
1168 if ($recent_event->{type
} eq "new"){
1169 $self->_mirror_item_new
1182 } elsif ($recent_event->{type
} eq "delete") {
1184 if ($options->{'skip-deletes'}) {
1185 $activity = "skipped";
1188 $activity = "not_found";
1189 } elsif (-l
$dst or not -d _
) {
1190 $self->delayed_operations->{unlink}{$dst}++;
1191 $activity = "deleted";
1193 $self->delayed_operations->{rmdir}{$dst}++;
1194 $activity = "deleted";
1197 $done->register ($recent_events, [$i]);
1199 $self->_mirror_register_path($pathdb,[$recent_event],$activity);
1202 warn "Warning: invalid upload type '$recent_event->{type}'";
1206 sub _mirror_item_new
{
1219 if ($self->verbose) {
1220 my $doing = -e
$dst ?
"Sync" : "Get";
1221 my $LFH = $self->_logfilehandle;
1224 "%-4s %d (%d/%d/%s) %s ... ",
1230 $recent_event->{path
},
1233 my $max_files_per_connection = $self->max_files_per_connection || 42;
1235 if ($self->verbose) {
1236 my $LFH = $self->_logfilehandle;
1239 push @
$dlcollector, { rev
=> $recent_event, i
=> $i };
1240 if (@
$dlcollector >= $max_files_per_connection) {
1241 $success = eval {$self->_mirror_dlcollector ($dlcollector,$pathdb,$recent_events);};
1242 my $sleep = $self->sleep_per_connection;
1243 $sleep = 0.42 unless defined $sleep;
1244 Time
::HiRes
::sleep $sleep;
1245 if ($options->{piecemeal
}) {
1246 $status->{mustreturn
} = 1;
1252 if (!$success || $@
) {
1253 warn "Warning: Error while mirroring: $@";
1257 if ($self->verbose) {
1258 my $LFH = $self->_logfilehandle;
1259 print $LFH "DONE\n";
1263 sub _mirror_dlcollector
{
1264 my($self,$xcoll,$pathdb,$recent_events) = @_;
1265 my $success = $self->mirror_path([map {$_->{rev
}{path
}} @
$xcoll]);
1267 $self->_mirror_register_path($pathdb,[map {$_->{rev
}} @
$xcoll],"rsync");
1269 $self->done->register($recent_events, [map {$_->{i
}} @
$xcoll]);
1274 sub _mirror_register_path
{
1275 my($self,$pathdb,$coll,$activity) = @_;
1277 for my $item (@
$coll) {
1278 $pathdb->{$item->{path
}} =
1280 recentepoch
=> $item->{epoch
},
1281 ($activity."_on") => $time,
1286 sub _mirror_unhide_tempfile
{
1287 my($self, $trecentfile) = @_;
1288 my $rfile = $self->rfile;
1289 if (rename $trecentfile, $rfile) {
1290 # warn "DEBUG: renamed '$trecentfile' to '$rfile'";
1293 Carp
::confess
("Could not rename '$trecentfile' to '$rfile': $!");
1295 $self->_use_tempfile (0);
1296 if (my $ctfh = $self->_current_tempfile_fh) {
1297 $ctfh->unlink_on_destroy (0);
1298 $self->_current_tempfile_fh (undef);
1302 sub _mirror_perform_delayed_ops
{
1303 my($self,$options) = @_;
1304 my $delayed = $self->delayed_operations;
1305 for my $dst (keys %{$delayed->{unlink}}) {
1306 unless (unlink $dst) {
1308 Carp
::cluck
( "Warning: Error while unlinking '$dst': $!" ) if $options->{verbose
};
1310 if ($self->verbose) {
1312 my $LFH = $self->_logfilehandle;
1315 "%-4s %d (%s) %s DONE\n",
1321 delete $delayed->{unlink}{$dst};
1324 for my $dst (sort {length($b) <=> length($a)} keys %{$delayed->{rmdir}}) {
1325 unless (rmdir $dst) {
1327 Carp
::cluck
( "Warning: Error on rmdir '$dst': $!" ) if $options->{verbose
};
1329 if ($self->verbose) {
1331 my $LFH = $self->_logfilehandle;
1334 "%-4s %d (%s) %s DONE\n",
1340 delete $delayed->{rmdir}{$dst};
1345 =head2 $success = $obj->mirror_path ( $arrref | $path )
1347 If the argument is a scalar it is treated as a path. The remote path
1348 is mirrored into the local copy. $path is the path found in the
1349 I<recentfile>, i.e. it is relative to the root directory of the
1352 If the argument is an array reference then all elements are treated as
1353 paths below the current tree and all are rsynced with a single
1354 command (and a single connection).
1359 my($self,$path) = @_;
1360 # XXX simplify the two branches such that $path is treated as
1361 # [$path] maybe even demand the argument as an arrayref to
1362 # simplify docs and code. (rsync-over-recentfile-2.pl uses the
1364 if (ref $path and ref $path eq "ARRAY") {
1365 my $dst = $self->localroot;
1366 mkpath dirname
$dst;
1367 my($fh) = File
::Temp
->new(TEMPLATE
=> sprintf(".%s-XXXX",
1368 lc $self->filenameroot,
1373 for my $p (@
$path) {
1377 $fh->unlink_on_destroy(1);
1380 local($ENV{LANG
}) = "C";
1381 while (!$self->rsync->exec
1387 'files-from' => $fh->filename,
1389 my(@err) = $self->rsync->err;
1390 if ($self->_my_ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1391 if ($self->verbose) {
1392 my $LFH = $self->_logfilehandle;
1393 print $LFH "Info: ignoring link_stat error '@err'";
1397 $self->register_rsync_error (@err);
1398 if (++$retried >= 3) {
1399 my $batchsize = @
$path;
1400 warn "The number of rsync retries now reached 3 within a batch of size $batchsize. Error was '@err'. Giving up now, will retry later, ";
1407 $self->un_register_rsync_error ();
1410 my $dst = $self->local_path($path);
1411 mkpath dirname
$dst;
1412 local($ENV{LANG
}) = "C";
1413 while (!$self->rsync->exec
1421 my(@err) = $self->rsync->err;
1422 if ($self->_my_ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1423 if ($self->verbose) {
1424 my $LFH = $self->_logfilehandle;
1425 print $LFH "Info: ignoring link_stat error '@err'";
1429 $self->register_rsync_error (@err);
1431 $self->un_register_rsync_error ();
1436 sub _my_ignore_link_stat_errors
{
1438 my $x = $self->ignore_link_stat_errors;
1439 $x = 1 unless defined $x;
1443 sub _my_current_rfile
{
    # Chooses which file to read: the current tempfile when _use_tempfile
    # is true, otherwise the regular rfile.
1446 if ($self->_use_tempfile) {
1447 $rfile = $self->_current_tempfile;
    # -s is false for a missing or zero-size file, so an unset, vanished
    # or empty tempfile also falls back to $self->rfile.
1449 unless ($rfile && -s
$rfile) {
1450 $rfile = $self->rfile;
1455 =head2 $path = $obj->naive_path_normalize ($path)
1457 Takes an absolute unix style path as argument and canonicalizes it to
1458 a shorter path if possible, removing things like double slashes or
1459 C</./> and removes references to C<../> directories to get a shorter
1460 unambiguous path. This is used to simplify the code that determines
1461 if a file passed to C<update()> is indeed below our C<localroot>.
1465 sub naive_path_normalize
{
1466 my($self,$path) = @_;
    # Loop until the substitution no longer matches: each pass collapses
    # one "/<segment>/../" sequence into a single "/".
1468 1 while $path =~ s
|/[^/]+/\.\./|/|;
1473 =head2 $ret = $obj->read_recent_1 ( $data )
1475 Delegate of C<recent_events()> on protocol 1
1480 my($self, $data) = @_;
1481 return $data->{recent
};
1484 =head2 $array_ref = $obj->recent_events ( %options )
1486 Note: the code relies on the resource being written atomically. We
1487 cannot lock because we may have no write access. If the caller has
1488 write access (eg. aggregate() or update()), it has to care for any
1489 necessary locking and it MUST write atomically.
1491 If C<$options{after}> is specified, only file events after this
1492 timestamp are returned.
1494 If C<$options{before}> is specified, only file events before this
1495 timestamp are returned.
1497 If C<$options{max}> is specified only a maximum of this many most
1498 recent events is returned.
1500 If C<$options{'skip-deletes'}> is specified, no files-to-be-deleted
1503 If C<$options{contains}> is specified the value must be a hash
1504 reference containing a query. The query may contain the keys C<epoch>,
1505 C<path>, and C<type>. Each represents a condition that must be met. If
1506 there is more than one such key, the conditions are ANDed.
1508 If C<$options{info}> is specified, it must be a hashref. This hashref
1509 will be filled with metadata about the unfiltered recent_events of
1510 this object, in key C<first> there is the first item, in key C<last>
1516 my ($self, %options) = @_;
1517 my $info = $options{info
};
1518 if ($self->is_slave) {
1519 # XXX seems dubious, might produce tempfiles without removing them?
1520 $self->get_remote_recentfile_as_tempfile;
1522 my $rfile_or_tempfile = $self->_my_current_rfile or return [];
1523 -e
$rfile_or_tempfile or return [];
1524 my $suffix = $self->serializer_suffix;
1526 $self->_try_deserialize
1533 if ($err or !$data) {
1537 if (reftype
$data eq 'ARRAY') { # protocol 0
1540 $re = $self->_recent_events_protocol_x
1546 return $re unless grep {defined $options{$_}} qw(after before contains max skip-deletes);
1547 $self->_recent_events_handle_options ($re, \
%options);
1550 # File::Rsync::Mirror::Recentfile::_recent_events_handle_options
1551 sub _recent_events_handle_options
{
    # Applies the filtering options to the event list $re in place, in this
    # order: after/before window, skip-deletes, contains, max.
    # $options is a hashref; the keys read here are info, after, before,
    # 'skip-deletes', contains and max.
1552 my($self, $re, $options) = @_;
1553 my $last_item = $#$re;
1554 my $info = $options->{info
};
    # The caller-supplied info hashref receives the first and last element
    # of the still unfiltered list.
    # NOTE(review): the guard around these two assignments is outside this
    # excerpt -- presumably they only run when info was passed; confirm.
1556 $info->{first
} = $re->[0];
1557 $info->{last} = $re->[-1];
1559 if (defined $options->{after
}) {
    # Only relevant when the first event is newer than 'after'; the search
    # block below matches indexes whose epoch is <= after.
    # NOTE(review): how the match is turned into $last_item is not visible.
1560 if ($re->[0]{epoch
} > $options->{after
}) {
1563 {$re->[$_]{epoch
} <= $options->{after
}}
1573 if (defined $options->{before
}) {
    # Same idea for 'before': the search block matches indexes whose epoch
    # is strictly < before.
1574 if ($re->[0]{epoch
} > $options->{before
}) {
1577 {$re->[$_]{epoch
} < $options->{before
}}
1586 if (0 != $first_item || -1 != $last_item) {
    # Cut the list down to the [$first_item .. $last_item] window in one
    # splice.
1587 @
$re = splice @
$re, $first_item, 1+$last_item-$first_item;
1589 if ($options->{'skip-deletes'}) {
    # Drop every event whose type is "delete".
1590 @
$re = grep { $_->{type
} ne "delete" } @
$re;
1592 if (my $contopt = $options->{contains
}) {
1593 my $seen_allowed = 0;
    # Each of the three allowed query keys that is present filters the list
    # by string equality (eq); several keys are therefore ANDed.
1594 for my $allow (qw(epoch path type)) {
1595 if (exists $contopt->{$allow}) {
1597 my $v = $contopt->{$allow};
1598 @
$re = grep { $_->{$allow} eq $v } @
$re;
1601 if (keys %$contopt > $seen_allowed) {
    # More keys in the query than allowed keys counted: the query contains
    # something unknown and is reported with all its keys and values.
1604 (sprintf "unknown query: %s", join ", ", %$contopt);
1607 if ($options->{max
} && @
$re > $options->{max
}) {
    # Keep only the first 'max' elements of the list.
1608 @
$re = splice @
$re, 0, $options->{max
};
1613 sub _recent_events_protocol_x
{
1618 my $meth = sprintf "read_recent_%d", $data->{meta
}{protocol
};
1619 # we may be reading meta for the first time
1620 while (my($k,$v) = each %{$data->{meta
}}) {
1621 if ($k ne lc $k){ # "Producers"
1622 $self->{ORIG
}{$k} = $v;
1625 next if defined $self->$k;
1628 my $re = $self->$meth ($data);
1630 if (my @stat = stat $rfile_or_tempfile) {
1631 $minmax = { mtime
=> $stat[9] };
1633 # defensive because ABH encountered:
1635 #### Sync 1239828608 (1/1/Z) temp .../authors/.FRMRecent-RECENT-Z.yaml-
1636 #### Ydr_.yaml ... DONE
1637 #### Cannot stat '/mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-
1638 #### Ydr_.yaml': No such file or directory at /usr/lib/perl5/site_perl/
1639 #### 5.8.8/File/Rsync/Mirror/Recentfile.pm line 1558.
1640 #### unlink0: /mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-Ydr_.yaml is
1641 #### gone already at cpan-pause.pl line 0
1643 my $LFH = $self->_logfilehandle;
1644 print $LFH "Warning (maybe harmless): Cannot stat '$rfile_or_tempfile': $!"
1647 $minmax->{min
} = $re->[-1]{epoch
};
1648 $minmax->{max
} = $re->[0]{epoch
};
1650 $self->minmax ( $minmax );
1654 sub _try_deserialize
{
    # Deserializes $rfile_or_tempfile according to $suffix.
    # NOTE(review): argument unpacking and the handling of the result are
    # outside this excerpt.
1659 if ($suffix eq ".yaml") {
    # YAML files are loaded directly with YAML::Syck.
1661 YAML
::Syck
::LoadFile
($rfile_or_tempfile);
1662 } elsif ($HAVE->{"Data::Serializer"}) {
    # Any other suffix needs Data::Serializer; the backend name is looked
    # up in %serializers by suffix.
1663 my $serializer = Data
::Serializer
->new
1664 ( serializer
=> $serializers{$suffix} );
    # NOTE(review): two-argument open -- the mode is taken from the file
    # name itself. A three-argument open with an explicit '<' mode would be
    # safer; the error message could also name the file.
1667 open my $fh, $rfile_or_tempfile or die "Could not open: $!";
1671 $serializer->raw_deserialize($serialized);
    # Neither YAML nor an available Data::Serializer: give up.
1673 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
1677 sub _refresh_internals
{
    # Re-syncs this object's state from the recentfile at $dst: a throwaway
    # object of the same class is built from that file and selected
    # accessor values are copied over.
1678 my($self, $dst) = @_;
1679 my $class = ref $self;
1680 my $rfpeek = $class->new_from_file ($dst);
    # NOTE(review): the list of accessors iterated as $acc is outside this
    # excerpt.
1685 $self->$acc ( $rfpeek->$acc );
1687 my $old_dirtymark = $self->dirtymark;
1688 my $new_dirtymark = $rfpeek->dirtymark;
    # Only when both dirtymarks are true and differ (string comparison):
    # adopt the new one and clear the "uptodateness ever reached" flag.
1689 if ($old_dirtymark && $new_dirtymark && $new_dirtymark ne $old_dirtymark) {
1691 $self->dirtymark ( $new_dirtymark );
1692 $self->_uptodateness_ever_reached(0);
1697 =head2 $ret = $obj->rfilename
1699 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1700 a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
1706 my $file = sprintf("%s-%s%s",
1707 $self->filenameroot,
1709 $self->serializer_suffix,
1714 =head2 $str = $self->remote_dir
1716 The directory we are mirroring from.
1721 my($self, $set) = @_;
1723 $self->_remote_dir ($set);
1725 my $x = $self->_remote_dir;
1726 $self->is_slave (1);
1730 =head2 $str = $obj->remoteroot
1732 =head2 (void) $obj->remoteroot ( $set )
1734 Get/Set the composed prefix needed when rsyncing from a remote module.
1735 If remote_host, remote_module, and remote_dir are set, it is composed
1741 my($self, $set) = @_;
1743 $self->_remoteroot($set);
1745 my $remoteroot = $self->_remoteroot;
1746 unless (defined $remoteroot) {
1747 $remoteroot = sprintf
1750 defined $self->remote_host ?
($self->remote_host."::") : "",
1751 defined $self->remote_module ?
($self->remote_module."/") : "",
1752 defined $self->remote_dir ?
$self->remote_dir : "",
1754 $self->_remoteroot($remoteroot);
1759 =head2 (void) $obj->split_rfilename ( $recentfilename )
1761 Inverse method to C<rfilename>. C<$recentfilename> is a plain filename
1764 $filenameroot-$interval$serializer_suffix
1770 This filename is split into its parts and the parts are fed to the
1775 sub split_rfilename
{
    # Splits a recentfile name of the form
    # <filenameroot>-<interval><serializer_suffix> and feeds the three
    # parts to the accessors of the same names.
1776 my($self, $rfname) = @_;
    # Three captures: everything before a dash (greedy), then a run
    # without dash or dot, then a dot followed by non-dot characters.
    # The pattern is anchored at the start only.
1777 my($splitter) = qr
(^(.+)-([^-\
.]+)(\
.[^\
.]+));
1778 if (my($f,$i,$s) = $rfname =~ $splitter) {
1779 $self->filenameroot ($f);
1780 $self->interval ($i);
1781 $self->serializer_suffix ($s);
    # NOTE(review): presumably the else branch (branch keyword not visible
    # in this excerpt) -- a name that does not match is fatal.
1783 die "Alert: cannot split '$rfname', doesn't match '$splitter'";
1788 =head2 my $rfile = $obj->rfile
1790 Returns the full path of the I<recentfile>
1796 my $rfile = $self->_rfile;
1797 return $rfile if defined $rfile;
1798 $rfile = File
::Spec
->catfile
1802 $self->_rfile ($rfile);
1806 =head2 $rsync_obj = $obj->rsync
1808 The File::Rsync object that this object uses for communicating with an
1815 my $rsync = $self->_rsync;
1816 unless (defined $rsync) {
1817 my $rsync_options = $self->rsync_options || {};
1818 if ($HAVE->{"File::Rsync"}) {
1819 $rsync = File
::Rsync
->new($rsync_options);
1820 $self->_rsync($rsync);
1822 die "File::Rsync required for rsync operations. Cannot continue";
1828 =head2 (void) $obj->register_rsync_error(@err)
1830 =head2 (void) $obj->un_register_rsync_error()
1832 Register_rsync_error is called whenever the File::Rsync object fails
1833 on an exec (say, connection doesn't succeed). It issues a warning and
1834 sleeps for an increasing amount of time. Un_register_rsync_error
1835 resets the error count. See also accessor C<max_rsync_errors>.
1840 my $no_success_count = 0;
1841 my $no_success_time = 0;
# The two lexicals above are shared by register_rsync_error and
# un_register_rsync_error; they are not stored per object.
1842 sub register_rsync_error
{
    # Records one failed rsync exec: remembers the current time and bumps
    # the failure counter.
1843 my($self, @err) = @_;
1845 $no_success_time = time;
1846 $no_success_count++;
    # An undefined max_rsync_errors means "practically unlimited"
    # (MAX_INT).
1847 my $max_rsync_errors = $self->max_rsync_errors;
1848 $max_rsync_errors = MAX_INT
unless defined $max_rsync_errors;
    # A negative limit disables the check; otherwise reaching the limit
    # triggers the "exiting now" alert.
    # NOTE(review): the statement that emits the alert is outside this
    # excerpt.
1849 if ($max_rsync_errors>=0 && $no_success_count >= $max_rsync_errors) {
1855 "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
1861 my $sleep = 12 * $no_success_count;
1862 $sleep = 300 if $sleep > 300;
    # Back-off grows by 12 per accumulated failure and is capped at 300;
    # the value is reported in the warning below.
1867 "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
1868 scalar(localtime($no_success_time)),
1875 sub un_register_rsync_error
{
    # Resets both shared counters after a success.
1877 $no_success_time = 0;
1878 $no_success_count = 0;
1882 =head2 $clone = $obj->_sparse_clone
1884 Clones just as much from itself that it does not hurt. Experimental
1887 Note: what fits better: sparse or shallow? Other suggestions?
1893 my $new = bless {}, ref $self;
1902 ignore_link_stat_errors
1904 max_files_per_connection
1908 sleep_per_connection
1913 $o = Storable
::dclone
$o if ref $o;
1919 =head2 $boolean = OBJ->ttl_reached ()
1925 my $have_mirrored = $self->have_mirrored || 0;
1926 my $now = Time
::HiRes
::time;
1927 my $ttl = $self->ttl;
1928 $ttl = 24.2 unless defined $ttl;
1929 if ($now > $have_mirrored + $ttl) {
1935 =head2 (void) $obj->unlock()
1937 Unlocking is implemented with an C<rmdir> on a locking directory
1938 (C<.lock> appended to $rfile).
1944 return unless $self->_is_locked;
1945 my $rfile = $self->rfile;
1946 unlink "$rfile.lock/process" or warn "Could not unlink lockfile '$rfile.lock/process': $!";
1947 rmdir "$rfile.lock" or warn "Could not rmdir lockdir '$rfile.lock': $!";;
1948 $self->_is_locked (0);
1953 Sets this recentfile in the state of not 'seeded'.
1961 =head2 $ret = $obj->update ($path, $type)
1963 =head2 $ret = $obj->update ($path, "new", $dirty_epoch)
1965 =head2 $ret = $obj->update ()
1967 Enter one file into the local I<recentfile>. $path is the (usually
1968 absolute) path. If the path is outside I<our> tree, then it is
1971 C<$type> is one of C<new> or C<delete>.
1973 Events of type C<new> may set $dirty_epoch. $dirty_epoch is normally
1974 not used and the epoch is calculated by the update() routine itself
1975 based on current time. But if there is the demand to insert a
1976 not-so-current file into the dataset, then the caller sets
1977 $dirty_epoch. This causes the epoch of the registered event to become
1978 $dirty_epoch or -- if the exact value given is already taken -- a tiny
1979 bit more. As compensation the dirtymark of the whole dataset is set to
1980 now or the current epoch, whichever is higher. Note: setting the
1981 dirty_epoch to the future is prohibited as it's very unlikely to be
1982 intended: it definitely might wreak havoc with the index files.
1984 The new file event is unshifted (or, if dirty_epoch is set, inserted
1985 at the place it belongs to, according to the rule to have a sequence
1986 of strictly decreasing timestamps) to the array of recent_events and
1987 the array is shortened to the length of the timespan allowed. This is
1988 usually the timespan specified by the interval of this recentfile but
1989 as long as this recentfile has not been merged to another one, the
1990 timespan may grow without bounds.
1992 The third form runs an update without inserting a new file. This may
1993 be desired to truncate a recentfile.
1996 sub _epoch_monotonically_increasing
{
    # Picks an epoch for a new event relative to the event at the head of
    # the list ($recent->[0]). An empty list imposes no constraint.
1997 my($self,$epoch,$recent) = @_;
1998 return $epoch unless @
$recent; # the first one goes unoffended
    # $epoch is stringified ("".$epoch) before the big-float comparison
    # with the head epoch.
    # NOTE(review): the body of this branch is outside this excerpt;
    # presumably $epoch is returned unchanged when it is already greater.
1999 if (_bigfloatgt
("".$epoch,$recent->[0]{epoch
})) {
2002 return _increase_a_bit
($recent->[0]{epoch
});
2006 my($self,$path,$type,$dirty_epoch) = @_;
    # All three arguments may be omitted together; as soon as any of them
    # is defined, both $path and $type become mandatory.
2007 if (defined $path or defined $type or defined $dirty_epoch) {
2008 die "update called without path argument" unless defined $path;
2009 die "update called without type argument" unless defined $type;
    # NOTE(review): the pattern below is not anchored, so any string that
    # merely contains "new" or "delete" (e.g. "renew", "deleted") passes.
    # Anchoring it (\A(?:new|delete)\z) would match the documented
    # contract.
2010 die "update called with illegal type argument: $type" unless $type =~ /(new|delete)/;
    # The single event is handed to the batch machinery as a one-element
    # batch; $dirty_epoch travels in the 'epoch' key.
2013 my $ctx = $self->_locked_batch_update([{path
=>$path,type
=>$type,epoch
=>$dirty_epoch}]);
    # The recentfile is rewritten only when the batch reported a change.
2014 $self->write_recent($ctx->{recent
}) if $ctx->{something_done
};
2015 $self->_assert_symlink;
2019 =head2 $obj->batch_update($batch)
2021 Like update but for many files. $batch is an arrayref containing
2022 hashrefs with the structure
2034 my($self,$batch) = @_;
2036 my $ctx = $self->_locked_batch_update($batch);
2037 $self->write_recent($ctx->{recent
}) if $ctx->{something_done
};
2038 $self->_assert_symlink;
2041 sub _locked_batch_update
{
2042 my($self,$batch) = @_;
2043 my $something_done = 0;
2044 my $recent = $self->recent_events;
2045 unless ($recent->[0]) {
2047 $something_done = 1;
2049 my %paths_in_recent = map { $_->{path
} => undef } @
$recent;
2050 my $interval = $self->interval;
2051 my $canonmeth = $self->canonize;
2052 unless ($canonmeth) {
2053 $canonmeth = "naive_path_normalize";
2055 my $oldest_allowed = 0;
2056 my $setting_new_dirty_mark = 0;
2058 if ($self->verbose && @
$batch > 1) {
2059 eval {require Time
::Progress
};
2060 warn "dollarat[$@]" if $@
;
2062 $console = new Time
::Progress
;
2063 $console->attr( min
=> 1, max
=> scalar @
$batch );
2068 ITEM
: for my $item (sort {($b->{epoch
}||0) <=> ($a->{epoch
}||0)} @
$batch) {
2070 print $console->report( "\rdone %p elapsed: %L (%l sec), ETA %E (%e sec)", $i ) if $console and not $i % 50;
2071 my $ctx = $self->_update_batch_item($item,$canonmeth,$recent,$setting_new_dirty_mark,$oldest_allowed,$something_done,\
%paths_in_recent,$memo_splicepos);
2072 $something_done = $ctx->{something_done
};
2073 $oldest_allowed = $ctx->{oldest_allowed
};
2074 $setting_new_dirty_mark = $ctx->{setting_new_dirty_mark
};
2075 $recent = $ctx->{recent
};
2076 $memo_splicepos = $ctx->{memo_splicepos
};
2078 print "\n" if $console;
2079 if ($setting_new_dirty_mark) {
2080 $oldest_allowed = 0;
2082 TRUNCATE
: while (@
$recent) {
2083 if (_bigfloatlt
($recent->[-1]{epoch
}, $oldest_allowed)) {
2085 $something_done = 1;
2090 return {something_done
=>$something_done,recent
=>$recent};
2092 sub _update_batch_item
{
2093 my($self,$item,$canonmeth,$recent,$setting_new_dirty_mark,$oldest_allowed,$something_done,$paths_in_recent,$memo_splicepos) = @_;
2094 my($path,$type,$dirty_epoch) = @
{$item}{qw(path type epoch)};
2095 if (defined $path or defined $type or defined $dirty_epoch) {
2096 $path = $self->$canonmeth($path);
2098 # you must calculate the time after having locked, of course
2099 my $now = Time
::HiRes
::time;
2102 if (defined $dirty_epoch && _bigfloatgt
($now,$dirty_epoch)) {
2103 $epoch = $dirty_epoch;
2105 $epoch = $self->_epoch_monotonically_increasing($now,$recent);
2108 my $merged = $self->merged;
2109 if ($merged->{epoch
} && !$setting_new_dirty_mark) {
2110 my $virtualnow = _bigfloatmax
($now,$epoch);
2111 # for the lower bound I think we need no big math, we calc already
2112 my $secs = $self->interval_secs();
2113 $oldest_allowed = min
($virtualnow - $secs, $merged->{epoch
}, $epoch);
2115 # as long as we are not merged at all, no limits!
2117 my $lrd = $self->localroot;
2118 if (defined $path && $path =~ s
|^\Q
$lrd\E
||) {
2121 # remove the older duplicates of this $path, irrespective of $type:
2122 if (defined $dirty_epoch) {
2123 my $ctx = $self->_update_with_dirty_epoch($path,$recent,$epoch,$paths_in_recent,$memo_splicepos);
2124 $recent = $ctx->{recent
};
2125 $splicepos = $ctx->{splicepos
};
2126 $epoch = $ctx->{epoch
};
2127 my $dirtymark = $self->dirtymark;
2129 if (_bigfloatgt
($epoch, $now)) { # just in case we had to increase it
2132 $self->dirtymark($new_dm);
2133 $setting_new_dirty_mark = 1;
2134 if (not defined $merged->{epoch
} or _bigfloatlt
($epoch,$merged->{epoch
})) {
2138 $recent = [ grep { $_->{path
} ne $path } @
$recent ];
2141 if (defined $splicepos) {
2142 splice @
$recent, $splicepos, 0, { epoch
=> $epoch, path
=> $path, type
=> $type };
2143 $paths_in_recent->{$path} = undef;
2145 $memo_splicepos = $splicepos;
2146 $something_done = 1;
2150 something_done
=> $something_done,
2151 oldest_allowed
=> $oldest_allowed,
2152 setting_new_dirty_mark
=> $setting_new_dirty_mark,
2154 memo_splicepos
=> $memo_splicepos,
2157 sub _update_with_dirty_epoch
{
2158 my($self,$path,$recent,$epoch,$paths_in_recent,$memo_splicepos) = @_;
2160 my $new_recent = [];
2161 if (exists $paths_in_recent->{$path}) {
2163 KNOWN_EVENT
: for my $i (0..$#$recent) {
2164 if ($recent->[$i]{path
} eq $path) {
2165 if ($recent->[$i]{epoch
} eq $epoch) {
2171 push @
$new_recent, $recent->[$i];
2174 @
$recent = @
$new_recent unless $cancel;
2176 if (!exists $recent->[0] or _bigfloatgt
($epoch,$recent->[0]{epoch
})) {
2178 } elsif (_bigfloatlt
($epoch,$recent->[-1]{epoch
})) {
2179 $splicepos = @
$recent;
2182 if (_bigfloatgt
($memo_splicepos<=$#$recent && $epoch, $recent->[$memo_splicepos]{epoch})) {
2185 $startingpoint = $memo_splicepos;
2187 RECENT
: for my $i ($startingpoint..$#$recent) {
2188 my $ev = $recent->[$i];
2189 if ($epoch eq $recent->[$i]{epoch
}) {
2190 $epoch = _increase_a_bit
($epoch, $i ?
$recent->[$i-1]{epoch
} : undef);
2192 if (_bigfloatgt
($epoch,$recent->[$i]{epoch
})) {
2200 splicepos
=> $splicepos,
2207 Sets this recentfile in the state of 'seeded' which means it has to
2208 re-evaluate its uptodateness.
2218 Tells if the recentfile is in the state 'seeded'.
2222 my($self, $set) = @_;
2224 $self->_seeded ($set);
2226 my $x = $self->_seeded;
2227 unless (defined $x) {
2229 $self->_seeded ($x);
2236 True if this object has mirrored the complete interval covered by the
2244 if ($self->_uptodateness_ever_reached and not $self->seeded) {
2248 # it's too easy to misconfigure ttl and related timings and then
2249 # never reach uptodateness, so disabled 2009-03-22
2250 if (0 and not defined $uptodate) {
2251 if ($self->ttl_reached){
2252 $why = "ttl_reached returned true, so we are not uptodate";
2256 unless (defined $uptodate) {
2257 # look if recentfile has unchanged timestamp
2258 my $minmax = $self->minmax;
2259 if (exists $minmax->{mtime
}) {
2260 my $rfile = $self->_my_current_rfile;
2261 my @stat = stat $rfile;
2263 my $mtime = $stat[9];
2264 if (defined $mtime && defined $minmax->{mtime
} && $mtime > $minmax->{mtime
}) {
2265 $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
2268 my $covered = $self->done->covered(@
$minmax{qw(max min)});
2269 $why = sprintf "minmax covered[%s], so we return that", defined $covered ?
$covered : "UNDEF";
2270 $uptodate = $covered;
2274 $why = "Could not stat '$rfile': $!";
2280 unless (defined $uptodate) {
2281 $why = "fallthrough, so not uptodate";
2285 $self->_uptodateness_ever_reached(1);
2289 uptodate
=> $uptodate,
2292 $self->_remember_last_uptodate_call($remember);
2296 =head2 $obj->write_recent ($recent_files_arrayref)
2298 Writes a I<recentfile> based on the current reflection of the current
2299 state of the tree limited by the current interval.
2304 @
{$_[1]} = sort { _bigfloatcmp
($b->{epoch
},$a->{epoch
}) } @
{$_[1]};
2308 my ($self,$recent) = @_;
2309 die "write_recent called without argument" unless defined $recent;
2311 SANITYCHECK
: for my $i (0..$#$recent) {
2312 if (defined($Last_epoch) and _bigfloatge
($recent->[$i]{epoch
},$Last_epoch)) {
2314 Carp
::confess
(sprintf "Warning: disorder '%s'>='%s', re-sorting %s\n",
2315 $recent->[$i]{epoch
}, $Last_epoch, $self->interval);
2317 # $self->_resort($recent);
2320 $Last_epoch = $recent->[$i]{epoch
};
2322 my $minmax = $self->minmax;
2323 if (!defined $minmax->{max
} || _bigfloatlt
($minmax->{max
},$recent->[0]{epoch
})) {
2324 $minmax->{max
} = @
$recent && exists $recent->[0]{epoch
} ?
$recent->[0]{epoch
} : undef;
2326 if (!defined $minmax->{min
} || _bigfloatlt
($minmax->{min
},$recent->[-1]{epoch
})) {
2327 $minmax->{min
} = @
$recent && exists $recent->[-1]{epoch
} ?
$recent->[-1]{epoch
} : undef;
2329 $self->minmax($minmax);
2330 my $meth = sprintf "write_%d", $self->protocol;
2331 $self->$meth($recent);
2334 =head2 $obj->write_0 ($recent_files_arrayref)
2336 Delegate of C<write_recent()> on protocol 0
2341 my ($self,$recent) = @_;
2342 my $rfile = $self->rfile;
2343 YAML
::Syck
::DumpFile
("$rfile.new",$recent);
2344 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2347 =head2 $obj->write_1 ($recent_files_arrayref)
2349 Delegate of C<write_recent()> on protocol 1
2354 my ($self,$recent) = @_;
2355 my $rfile = $self->rfile;
2356 my $suffix = $self->serializer_suffix;
2358 meta
=> $self->meta_data,
2362 if ($suffix eq ".yaml") {
2363 $serialized = YAML
::Syck
::Dump
($data);
2364 } elsif ($HAVE->{"Data::Serializer"}) {
2365 my $serializer = Data
::Serializer
->new
2366 ( serializer
=> $serializers{$suffix} );
2367 $serialized = $serializer->raw_serialize($data);
2369 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
2371 open my $fh, ">", "$rfile.new" or die "Could not open >'$rfile.new': $!";
2372 print $fh $serialized;
2373 close $fh or die "Could not close '$rfile.new': $!";
2374 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2378 my $nq = qr/[^"]+/; # non-quotes
2380 split /\n/, <<'=cut'; %serializers = map { my @x = /"($nq)"\s+=>\s+"($nq)"/; @x } grep {s/^=item\s+C<<\s+(.+)\s+>>$/$1/} @pod_lines; }
2384 The following suffixes are supported and trigger the use of these
2389 =item C<< ".yaml" => "YAML::Syck" >>
2391 =item C<< ".json" => "JSON" >>
2393 =item C<< ".sto" => "Storable" >>
2395 =item C<< ".dd" => "Data::Dumper" >>
2403 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
2405 =head1 INTERVAL SPEC
2407 An interval spec is a primitive way to express time spans. Normally it
2408 is composed from an integer and a letter.
2410 As a special case, a string that consists only of the single letter
2411 C<Z>, stands for MAX_INT seconds.
2413 The following letters express the specified number of seconds:
2419 =item C<< m => 60 >>
2421 =item C<< h => 60*60 >>
2423 =item C<< d => 60*60*24 >>
2425 =item C<< W => 60*60*24*7 >>
2427 =item C<< M => 60*60*24*30 >>
2429 =item C<< Q => 60*60*24*90 >>
2431 =item C<< Y => 60*60*24*365.25 >>
2439 L<File::Rsync::Mirror::Recent>,
2440 L<File::Rsync::Mirror::Recentfile::Done>,
2441 L<File::Rsync::Mirror::Recentfile::FakeBigFloat>
2445 Please report any bugs or feature requests through the web interface
2447 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2448 I will be notified, and then you'll automatically be notified of
2449 progress on your bug as I make changes.
2453 Memory hungry: it seems all memory is allocated during the initial
2454 rsync where a list of all files is maintained in memory.
2458 You can find documentation for this module with the perldoc command.
2460 perldoc File::Rsync::Mirror::Recentfile
2462 You can also look for information at:
2466 =item * RT: CPAN's request tracker
2468 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2470 =item * AnnoCPAN: Annotated CPAN documentation
2472 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2474 =item * CPAN Ratings
2476 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2480 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2485 =head1 ACKNOWLEDGEMENTS
2487 Thanks to RJBS for module-starter.
2493 =head1 COPYRIGHT & LICENSE
2495 Copyright 2008,2009 Andreas König.
2497 This program is free software; you can redistribute it and/or modify it
2498 under the same terms as Perl itself.
2503 1; # End of File::Rsync::Mirror::Recentfile
2507 # cperl-indent-level: 4