1 package File
::Rsync
::Mirror
::Recentfile
;
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
19 $HAVE->{$package} = eval qq{ require $package; };
22 use File
::Basename
qw(basename dirname fileparse);
23 use File
::Copy
qw(cp);
24 use File
::Path
qw(mkpath);
25 use File
::Rsync
::Mirror
::Recentfile
::FakeBigFloat
qw(:all);
27 use List
::Util
qw(first max min);
28 use Scalar
::Util
qw(reftype);
33 use version
; our $VERSION = qv
('0.0.8');
35 use constant MAX_INT
=> ~0>>1; # anything better?
36 use constant DEFAULT_PROTOCOL
=> 1;
41 # maybe subclass if this mapping is bad?
46 Writer (of a single file):
48 use File::Rsync::Mirror::Recentfile;
49 my $fr = File::Rsync::Mirror::Recentfile->new
52 filenameroot => "RECENT",
53 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
54 localroot => "/home/ftp/pub/PAUSE/authors/",
55 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
57 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
61 my $rf = File::Rsync::Mirror::Recentfile->new
63 filenameroot => "RECENT",
65 localroot => "/home/ftp/pub/PAUSE/authors",
67 remote_host => "pause.perl.org",
68 remote_module => "authors",
71 'rsync-path' => '/usr/bin/rsync',
74 'omit-dir-times' => 1,
81 Aggregator (usually the writer):
83 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
88 Lower level than F:R:M:Recent; handles one recentfile. A tree
89 is always composed of several recentfiles, controlled by the
90 F:R:M:Recent object, whereas the Recentfile object does the bookkeeping
91 for a single timeslice.
97 =head1 CONSTRUCTORS / DESTRUCTOR
99 =head2 my $obj = CLASS->new(%hash)
101 Constructor. On every argument pair the key is a method name and the
102 value is an argument to that method name.
104 If a recentfile for this resource already exists, metadata that are
105 not defined by the constructor will be fetched from there as soon as
106 it is read by recent_events().
111 my($class, @args) = @_;
112 my $self = bless {}, $class;
114 my($method,$arg) = splice @args, 0, 2;
115 $self->$method($arg);
117 unless (defined $self->protocol) {
118 $self->protocol(DEFAULT_PROTOCOL
);
120 unless (defined $self->filenameroot) {
121 $self->filenameroot("RECENT");
123 unless (defined $self->serializer_suffix) {
124 $self->serializer_suffix(".yaml");
129 =head2 my $obj = CLASS->new_from_file($file)
131 Constructor. $file is a I<recentfile>.
136 my($class, $file) = @_;
137 my $self = bless {}, $class;
138 $self->_rfile($file);
140 my $serialized = do { open my $fh, $file or die "Could not open '$file': $!";
144 # XXX: we can skip this step when the metadata are sufficient, but
145 # we cannot parse the file without some magic stuff about
148 my($name,$path) = fileparse
$file;
149 my $symlink = readlink $file;
150 if ($symlink =~ m
|/|) {
151 die "FIXME: filenames containing '/' not supported, got $symlink";
153 $file = File
::Spec
->catfile ( $path, $symlink );
155 my($name,$path,$suffix) = fileparse
$file, keys %serializers;
156 $self->serializer_suffix($suffix);
157 $self->localroot($path);
158 die "Could not determine file format from suffix" unless $suffix;
160 if ($suffix eq ".yaml") {
162 $deserialized = YAML
::Syck
::LoadFile
($file);
163 } elsif ($HAVE->{"Data::Serializer"}) {
164 my $serializer = Data
::Serializer
->new
165 ( serializer
=> $serializers{$suffix} );
166 $deserialized = $serializer->raw_deserialize($serialized);
168 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
170 while (my($k,$v) = each %{$deserialized->{meta
}}) {
171 next if $k ne lc $k; # "Producers"
174 unless (defined $self->protocol) {
175 $self->protocol(DEFAULT_PROTOCOL
);
188 unless ($self->_current_tempfile_fh) {
189 if (my $tempfile = $self->_current_tempfile) {
191 # unlink $tempfile; # may fail in global destruction
206 "_current_tempfile_fh",
207 "_delayed_operations",
214 "_remember_last_uptodate_call",
219 "__verified_tempdir",
221 "_uptodateness_ever_reached",
226 split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
232 A list of interval specs that tell the aggregator which I<recentfile>s
237 The name of a method to canonize the path before rsyncing. Only
238 supported value is C<naive_path_normalize>. Defaults to that.
242 A comment about this tree and setup.
246 A timestamp. The dirtymark is updated whenever an out of band change
247 on the origin server is performed that violates the protocol. Say,
248 they add or remove files in the middle somewhere. Slaves must react
249 with a devaluation of their C<done> structure which then leads to a
250 full re-sync of all files. Implementation note: dirtymark may increase
255 The (prefix of the) filename we use for this I<recentfile>. Defaults to
256 C<RECENT>. The string must not contain a directory separator.
260 Timestamp remembering when we mirrored this recentfile the last time.
261 Only relevant for slaves.
263 =item ignore_link_stat_errors
265 If set to true, rsync errors are ignored that complain about link stat
266 errors. These seem to happen only when there are files missing at the
267 origin. In race conditions this can always happen, so it defaults to
272 If set to true, this object will fetch a new recentfile from remote
273 when the timespan between the last mirror (see have_mirrored) and now
274 is too large (see C<ttl>).
276 =item keep_delete_objects_forever
278 The default for delete events is that they are passed through the
279 collection of recentfile objects until they reach the Z file. There
280 they get dropped so that the associated file object ceases to exist at
281 all. By setting C<keep_delete_objects_forever> the delete objects are
282 kept forever. This makes the Z file larger but has the advantage that
283 slaves that have interrupted mirroring for a long time still can clean
288 After how many seconds shall we die if we cannot lock a I<recentfile>?
289 Defaults to 600 seconds.
293 When mirror_loop is called, this accessor specifies the minimum time
294 every loop shall take. If the work of a loop is done before
295 that time has elapsed, it sleeps for the rest of the time. Defaults to
296 an arbitrary 42 seconds.
298 =item max_files_per_connection
300 Maximum number of files that are transferred on a single rsync call.
301 Setting it higher means higher performance at the price of holding
302 connections longer and potentially disturbing other users in the pool.
303 Defaults to the arbitrary value 42.
305 =item max_rsync_errors
307 When rsync operations encounter that many errors without any resetting
308 success in between, then we die. Defaults to unlimited. A value of
309 -1 means we run forever ignoring all rsync errors.
313 Hashref remembering when we read the recent_events from this file the
314 last time and what the timespan was.
318 When the RECENT file format changes, we increment the protocol. We try
319 to support older protocols in later releases.
323 The host we are mirroring from. Leave empty for the local filesystem.
327 Rsync servers have so called modules to separate directory trees from
328 each other. Put here the name of the module under which we are
329 mirroring. Leave empty for local filesystem.
333 Things like compress, links, times or checksums. Passed in to the
334 File::Rsync object used to run the mirror.
336 =item serializer_suffix
338 Mostly untested accessor. The only well tested format for
339 I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
340 Data::Serializer. But in principle other formats are supported as
341 well. See section SERIALIZERS below.
343 =item sleep_per_connection
345 Sleep that many seconds (floating point OK) after every chunk of rsyncing
346 has finished. Defaults to arbitrary 0.42.
350 Directory to write temporary files to. Must allow rename operations
351 into the tree which usually means it must live on the same partition
352 as the target directory. Defaults to C<< $self->localroot >>.
356 Time to live. Number of seconds after which this recentfile must be
357 fetched again from the origin server. Only relevant for slaves.
358 Defaults to arbitrary 24.2 seconds.
362 Boolean to turn on a bit of verbosity.
366 Path to the logfile to write verbose progress information to. This is
367 a primitive stop gap solution to get simple verbose logging working.
368 Switching to Log4perl or similar is probably the way to go.
374 use accessors
@accessors;
378 =head2 (void) $obj->aggregate( %options )
380 Takes all intervals that are collected in the accessor called
381 aggregator. Sorts them by actual length of the interval.
382 Removes those that are shorter than our own interval. Then merges this
383 object into the next larger object. The merging continues upwards
384 as long as the next I<recentfile> is old enough to warrant a merge.
386 Whether a merge is warranted is decided according to the interval of the
387 previous I<recentfile>, so that larger files are updated less often than
388 smaller ones. If $options{force} is true, all files get updated.
390 Here is an example to illustrate the behaviour. Given aggregators
396 1h updates 1d on every call to aggregate()
397 1d updates 1W earliest after 1h
398 1W updates 1M earliest after 1d
399 1M updates 1Q earliest after 1W
400 1Q updates 1Y earliest after 1M
401 1Y updates Z earliest after 1Q
403 Note that all but the smallest recentfile get updated at an arbitrary
404 rate and as such are quite useless on their own.
409 my($self, %option) = @_;
411 my @aggs = sort { $a->{secs
} <=> $b->{secs
} }
412 grep { !$seen_interval{$_->{interval
}}++ && $_->{secs
} >= $self->interval_secs }
413 map { { interval
=> $_, secs
=> $self->interval_secs($_)} }
414 $self->interval, @
{$self->aggregator || []};
416 $aggs[0]{object
} = $self;
417 AGGREGATOR
: for my $i (0..$#aggs-1) {
418 my $this = $aggs[$i]{object
};
419 my $next = $this->_sparse_clone;
420 $next->interval($aggs[$i+1]{interval
});
422 if ($option{force
} || $i == 0) {
425 my $next_rfile = $next->rfile;
426 if (-e
$next_rfile) {
427 my $prev = $aggs[$i-1]{object
};
429 my $next_age = 86400 * -M
$next_rfile;
430 if ($next_age > $prev->interval_secs) {
439 $aggs[$i+1]{object
} = $next;
446 # collect file size and mtime for all files of this aggregate
447 sub _debug_aggregate
{
449 my @aggs = sort { $a->{secs
} <=> $b->{secs
} }
450 map { { interval
=> $_, secs
=> $self->interval_secs($_)} }
451 $self->interval, @
{$self->aggregator || []};
453 for my $i (0..$#aggs) {
454 my $this = Storable
::dclone
$self;
455 $this->interval($aggs[$i]{interval
});
456 my $rfile = $this->rfile;
457 my @stat = stat $rfile;
458 push @
$report, {rfile
=> $rfile, size
=> $stat[7], mtime
=> $stat[9]};
463 # (void) $self->_assert_symlink()
464 sub _assert_symlink
{
466 my $recentrecentfile = File
::Spec
->catfile
475 if ($Config{d_symlink
} eq "define") {
476 my $howto_create_symlink; # 0=no need; 1=straight symlink; 2=rename symlink
477 if (-l
$recentrecentfile) {
478 my $found_symlink = readlink $recentrecentfile;
479 if ($found_symlink eq $self->rfilename) {
482 $howto_create_symlink = 2;
485 $howto_create_symlink = 1;
487 if (1 == $howto_create_symlink) {
488 symlink $self->rfilename, $recentrecentfile or die "Could not create symlink '$recentrecentfile': $!"
490 unlink "$recentrecentfile.$$"; # may fail
491 symlink $self->rfilename, "$recentrecentfile.$$" or die "Could not create symlink '$recentrecentfile.$$': $!";
492 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
495 warn "Warning: symlinks not supported on this system, doing a copy instead\n";
496 unlink "$recentrecentfile.$$"; # may fail
497 cp
$self->rfilename, "$recentrecentfile.$$" or die "Could not copy to '$recentrecentfile.$$': $!";
498 rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
502 =head2 $hashref = $obj->delayed_operations
504 A hash of hashes containing unlink and rmdir operations which had to
505 wait until the recentfile got unhidden in order to not confuse
506 downstream mirrors (in case we have some).
510 sub delayed_operations
{
512 my $x = $self->_delayed_operations;
513 unless (defined $x) {
518 $self->_delayed_operations ($x);
523 =head2 $done = $obj->done
525 C<$done> is a reference to a L<File::Rsync::Mirror::Recentfile::Done>
526 object that keeps track of rsync activities. Only needed and used when
527 we are a mirroring slave.
533 my $done = $self->_done;
535 require File
::Rsync
::Mirror
::Recentfile
::Done
;
536 $done = File
::Rsync
::Mirror
::Recentfile
::Done
->new();
537 $done->_rfinterval ($self->interval);
538 $self->_done ( $done );
543 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
545 Stores the remote I<recentfile> locally as a tempfile. The caller is
546 responsible to remove the file after use.
548 Note: if you're intending to act as an rsync server for other slaves,
549 then you must prefer this method over fetching that file with
550 get_remotefile(). Otherwise downstream mirrors would expect you to
551 have already mirrored all the files that are listed in the I<recentfile>
552 before you actually have them mirrored.
556 sub get_remote_recentfile_as_tempfile
{
558 mkpath
$self->localroot;
561 if ( $self->_use_tempfile() ) {
562 if ($self->ttl_reached) {
563 $fh = $self->_current_tempfile_fh;
564 $trfilename = $self->rfilename;
566 return $self->_current_tempfile;
569 $trfilename = $self->rfilename;
574 $dst = $self->_current_tempfile;
576 $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
577 $dst = $fh->filename;
578 $self->_current_tempfile ($dst);
579 my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
580 if (defined $rfile && -e
$rfile) {
581 # saving on bandwidth. Might need to be configurable
582 # $self->bandwidth_is_cheap?
583 cp
$rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
590 if ($self->verbose) {
591 my $doing = -e
$dst ?
"Sync" : "Get";
592 my $display_dst = join "/", "...", basename
(dirname
($dst)), basename
($dst);
593 my $LFH = $self->_logfilehandle;
596 "%-4s %d (1/1/%s) temp %s ... ",
605 local($ENV{LANG
}) = "C";
606 while (!$self->rsync->exec(
610 $self->register_rsync_error ($self->rsync->err);
611 if (++$retried >= 3) {
612 warn "XXX giving up";
618 my $LFH = $self->_logfilehandle;
619 printf $LFH "Warning: gave up mirroring %s, will try again later", $self->interval;
621 $self->_refresh_internals ($dst);
622 $self->have_mirrored (Time
::HiRes
::time);
623 $self->un_register_rsync_error ();
626 if ($self->verbose) {
627 my $LFH = $self->_logfilehandle;
631 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
635 sub _verified_tempdir
{
637 my $tempdir = $self->__verified_tempdir();
638 return $tempdir if defined $tempdir;
639 unless ($tempdir = $self->tempdir) {
640 $tempdir = $self->localroot;
642 unless (-d
$tempdir) {
645 $self->__verified_tempdir($tempdir);
649 sub _get_remote_rat_provide_tempfile_object
{
650 my($self, $trfilename) = @_;
651 my $_verified_tempdir = $self->_verified_tempdir;
652 my $fh = File
::Temp
->new
653 (TEMPLATE
=> sprintf(".FRMRecent-%s-XXXX",
656 DIR
=> $_verified_tempdir,
657 SUFFIX
=> $self->serializer_suffix,
658 UNLINK
=> $self->_use_tempfile,
661 my $dst = $fh->filename;
662 chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
663 if ($self->_use_tempfile) {
664 $self->_current_tempfile_fh ($fh); # delay self destruction
672 if (my $vl = $self->verboselog) {
673 open $fh, ">>", $vl or die "Could not open >> '$vl': $!";
680 =head2 $localpath = $obj->get_remotefile ( $relative_path )
682 Rsyncs one single remote file to local filesystem.
684 Note: no locking is done on this file. Any number of processes may
687 Note II: do not use for recentfiles. If you are a cascading
688 slave/server combination, it would confuse other slaves. They would
689 expect the contents of these recentfiles to be available. Use
690 get_remote_recentfile_as_tempfile() instead.
695 my($self, $path) = @_;
696 my $dst = File
::Spec
->catfile($self->localroot, $path);
698 if ($self->verbose) {
699 my $doing = -e
$dst ?
"Sync" : "Get";
700 my $LFH = $self->_logfilehandle;
703 "%-4s %d (1/1/%s) %s ... ",
710 local($ENV{LANG
}) = "C";
711 my $remoteroot = $self->remoteroot or die "Alert: missing remoteroot. Cannot continue";
712 while (!$self->rsync->exec(
718 $self->register_rsync_error ($self->rsync->err);
720 $self->un_register_rsync_error ();
721 if ($self->verbose) {
722 my $LFH = $self->_logfilehandle;
728 =head2 $obj->interval ( $interval_spec )
730 Get/set accessor. $interval_spec is a string and described below in
731 the section INTERVAL SPEC.
736 my ($self, $interval) = @_;
738 $self->_interval($interval);
739 $self->_rfile(undef);
741 $interval = $self->_interval;
742 unless (defined $interval) {
743 # do not ask the $self too much, it recurses!
745 Carp
::confess
("Alert: interval undefined for '".$self."'. Cannot continue.");
750 =head2 $secs = $obj->interval_secs ( $interval_spec )
752 $interval_spec is described below in the section INTERVAL SPEC. If
753 empty defaults to the inherent interval for this object.
758 my ($self, $interval) = @_;
759 $interval ||= $self->interval;
760 unless (defined $interval) {
761 die "interval_secs() called without argument on an object without a declared one";
763 my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
764 die "Could not determine seconds from interval[$interval]";
765 if ($interval eq "Z") {
767 } elsif (exists $seconds{$t} and $n =~ /^\d+$/) {
768 return $seconds{$t}*$n;
770 die "Invalid interval specification: n[$n]t[$t]";
774 =head2 $obj->localroot ( $localroot )
776 Get/set accessor. The local root of the tree.
781 my ($self, $localroot) = @_;
783 $self->_localroot($localroot);
784 $self->_rfile(undef);
786 $localroot = $self->_localroot;
789 =head2 $ret = $obj->local_path($path_found_in_recentfile)
791 Combines the path to our local mirror and the path of an object found
792 in this I<recentfile>. In other words: the target of a mirror operation.
794 Implementation note: We split on slashes and then use
795 File::Spec::catfile to adjust to the local operating system.
800 my($self,$path) = @_;
801 unless (defined $path) {
802 # seems like a degenerated case
803 return $self->localroot;
805 my @p = split m
|/|, $path;
806 File
::Spec
->catfile($self->localroot,@p);
809 =head2 (void) $obj->lock
811 Locking is implemented with an C<mkdir> on a locking directory
812 (C<.lock> appended to $rfile).
818 # not using flock because it locks on filehandles instead of
819 # old school ressources.
820 my $locked = $self->_is_locked and return;
821 my $rfile = $self->rfile;
822 # XXX need a way to allow breaking the lock
824 my $locktimeout = $self->locktimeout || 600;
826 my $lockdir = "$rfile.lock";
827 my $procfile = "$lockdir/process";
828 GETLOCK
: while (not mkdir $lockdir) {
829 if (open my $fh, "<", $procfile) {
830 chomp(my $process = <$fh>);
832 } elsif ($$ == $process) {
834 } elsif (kill 0, $process) {
835 warn "Warning: process $process holds a lock in '$lockdir', waiting..." unless $have_warned{$process}++;
837 warn "Warning: breaking lock held by process $process";
842 warn "Warning: unknown process holds a lock in '$lockdir', waiting..." unless $have_warned{unknown
}++;
844 Time
::HiRes
::sleep 0.01;
845 if (time - $start > $locktimeout) {
846 die "Could not acquire lockdirectory '$rfile.lock': $!";
849 open my $fh, ">", $procfile or die "Could not open >$procfile\: $!";
851 close $fh or die "Could not close: $!";
852 $self->_is_locked (1);
855 =head2 (void) $obj->merge ($other)
857 Bulk update of this object with another one. It's used to merge a
858 smaller and younger $other object into the current one. If this file
859 is a C<Z> file, then we normally do not merge in objects of type
860 C<delete>; this can be overridden by setting
861 keep_delete_objects_forever. But if we encounter an object of type
862 delete we delete the corresponding C<new> object if we have it.
864 If there is nothing to be merged, nothing is done.
869 my($self, $other) = @_;
870 $self->_merge_sanitycheck ( $other );
872 my $other_recent = $other->recent_events || [];
874 $self->_merge_locked ( $other, $other_recent );
880 my($self, $other, $other_recent) = @_;
881 my $my_recent = $self->recent_events || [];
883 # calculate the target time span
884 my $myepoch = $my_recent->[0] ?
$my_recent->[0]{epoch
} : undef;
885 my $epoch = $other_recent->[0] ?
$other_recent->[0]{epoch
} : $myepoch;
886 my $oldest_allowed = 0;
888 unless ($my_recent->[0]) {
893 if (($other->dirtymark||0) ne ($self->dirtymark||0)) {
896 } elsif (my $merged = $self->merged) {
897 my $secs = $self->interval_secs();
898 $oldest_allowed = min
($epoch - $secs, $merged->{epoch
}||0);
899 if (@
$other_recent and
900 _bigfloatlt
($other_recent->[-1]{epoch
}, $oldest_allowed)
902 $oldest_allowed = $other_recent->[-1]{epoch
};
905 while (@
$my_recent && _bigfloatlt
($my_recent->[-1]{epoch
}, $oldest_allowed)) {
912 my $other_recent_filtered = [];
913 for my $oev (@
$other_recent) {
914 my $oevepoch = $oev->{epoch
} || 0;
915 next if _bigfloatlt
($oevepoch, $oldest_allowed);
916 my $path = $oev->{path
};
917 next if $have_path{$path}++;
918 if ( $self->interval eq "Z"
919 and $oev->{type
} eq "delete"
920 and ! $self->keep_delete_objects_forever
924 if (!$myepoch || _bigfloatgt
($oevepoch, $myepoch)) {
927 push @
$other_recent_filtered, { epoch
=> $oev->{epoch
}, path
=> $path, type
=> $oev->{type
} };
930 if ($something_done) {
931 $self->_merge_something_done ($other_recent_filtered, $my_recent, $other_recent, $other, \
%have_path, $epoch);
935 sub _merge_something_done
{
936 my($self, $other_recent_filtered, $my_recent, $other_recent, $other, $have_path, $epoch) = @_;
938 my $epoch_conflict = 0;
940 ZIP
: while (@
$other_recent_filtered || @
$my_recent) {
943 @
$other_recent_filtered && _bigfloatge
($other_recent_filtered->[0]{epoch
},$my_recent->[0]{epoch
})) {
944 $event = shift @
$other_recent_filtered;
946 $event = shift @
$my_recent;
947 next ZIP
if $have_path->{$event->{path
}}++;
949 $epoch_conflict=1 if defined $last_epoch && $event->{epoch
} eq $last_epoch;
950 $last_epoch = $event->{epoch
};
951 push @
$recent, $event;
953 if ($epoch_conflict) {
955 for (my $i = $#$recent;$i>=0;$i--) {
956 my $epoch = $recent->[$i]{epoch
};
957 if ($have_epoch{$epoch}++) {
958 while ($have_epoch{$epoch}) {
959 $epoch = _increase_a_bit
($epoch);
961 $recent->[$i]{epoch
} = $epoch;
962 $have_epoch{$epoch}++;
966 if (!$self->dirtymark || $other->dirtymark ne $self->dirtymark) {
967 $self->dirtymark ( $other->dirtymark );
969 $self->write_recent($recent);
971 time => Time
::HiRes
::time, # not used anywhere
972 epoch
=> $recent->[0]{epoch
},
973 into_interval
=> $self->interval, # not used anywhere
975 $other->write_recent($other_recent);
978 sub _merge_sanitycheck
{
979 my($self, $other) = @_;
980 if ($self->interval_secs <= $other->interval_secs) {
985 "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
986 $self->interval_secs,
987 $other->interval_secs,
994 Hashref denoting when this recentfile has been merged into some other
1000 my($self, $set) = @_;
1002 $self->_merged ($set);
1004 my $merged = $self->_merged;
1006 if ($merged and $into = $merged->{into_interval
} and defined $self->_interval) {
1008 if ($into eq $self->interval) {
1012 "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
1016 } elsif ($self->interval_secs($into) < $self->interval_secs) {
1020 "Warning: into_interval_secs[%s] smaller than own interval_secs[%s] on interval[%s]. Danger ahead.",
1021 $self->interval_secs($into),
1022 $self->interval_secs,
1030 =head2 $hashref = $obj->meta_data
1032 Returns the hashref of metadata that the server has to add to the
1039 my $ret = $self->{meta
};
1050 "serializer_suffix",
1057 # XXX need to reset the Producer if I am a writer, keep it when I
1059 $ret->{Producers
} ||= {
1060 __PACKAGE__
, "$VERSION", # stringified it looks better
1062 'time', Time
::HiRes
::time,
1064 $ret->{dirtymark
} ||= Time
::HiRes
::time;
1068 =head2 $success = $obj->mirror ( %options )
1070 Mirrors the files in this I<recentfile> as reported by
1071 C<recent_events>. Options named C<after>, C<before>, C<max> are passed
1072 through to the C<recent_events> call. The boolean option C<piecemeal>,
1073 if true, causes C<mirror> to rsync only C<max_files_per_connection>
1074 files per call and keep track of the rsynced files so that future calls will rsync
1075 different files until all files are brought in sync.
1080 my($self, %options) = @_;
1081 my $trecentfile = $self->get_remote_recentfile_as_tempfile();
1082 $self->_use_tempfile (1);
1083 # skip-deletes is inadequat for passthrough within mirror. We
1084 # would never reach uptodateness when a delete were on a
1086 my %passthrough = map { ($_ => $options{$_}) } qw(before after max);
1087 my ($recent_events) = $self->recent_events(%passthrough);
1088 my(@error, @dlcollector); # download-collector: array containing paths we need
1090 my $last_item = $#$recent_events;
1091 my $done = $self->done;
1092 my $pathdb = $self->_pathdb;
1093 ITEM
: for my $i ($first_item..$last_item) {
1107 last if $i == $last_item;
1108 if ($status->{mustreturn
}){
1109 if ($self->_current_tempfile && ! $self->_current_tempfile_fh) {
1110 # looks like a bug somewhere else
1111 my $t = $self->_current_tempfile;
1112 unlink $t or die "Could not unlink '$t': $!";
1113 $self->_current_tempfile(undef);
1114 $self->_use_tempfile(0);
1120 my $success = eval { $self->_mirror_dlcollector (\
@dlcollector,$pathdb,$recent_events);};
1121 if (!$success || $@
) {
1122 warn "Warning: Unknown error while mirroring: $@";
1127 if ($self->verbose) {
1128 my $LFH = $self->_logfilehandle;
1129 print $LFH "DONE\n";
1131 # once we've gone to the end we consider ourselves free of obligations
1133 $self->_mirror_unhide_tempfile ($trecentfile);
1134 $self->_mirror_perform_delayed_ops(\
%options);
1150 my $recent_event = $recent_events->[$i];
1151 return if $done->covered ( $recent_event->{epoch
} );
1153 my $rec = $pathdb->{$recent_event->{path
}};
1154 if ($rec && $rec->{recentepoch
}) {
1156 ( $rec->{recentepoch
}, $recent_event->{epoch
} )){
1157 $done->register ($recent_events, [$i]);
1162 my $dst = $self->local_path($recent_event->{path
});
1163 if ($recent_event->{type
} eq "new"){
1164 $self->_mirror_item_new
1177 } elsif ($recent_event->{type
} eq "delete") {
1179 if ($options->{'skip-deletes'}) {
1180 $activity = "skipped";
1183 $activity = "not_found";
1184 } elsif (-l
$dst or not -d _
) {
1185 $self->delayed_operations->{unlink}{$dst}++;
1186 $activity = "deleted";
1188 $self->delayed_operations->{rmdir}{$dst}++;
1189 $activity = "deleted";
1192 $done->register ($recent_events, [$i]);
1194 $self->_mirror_register_path($pathdb,[$recent_event],$activity);
1197 warn "Warning: invalid upload type '$recent_event->{type}'";
1201 sub _mirror_item_new
{
1214 if ($self->verbose) {
1215 my $doing = -e
$dst ?
"Sync" : "Get";
1216 my $LFH = $self->_logfilehandle;
1219 "%-4s %d (%d/%d/%s) %s ... ",
1225 $recent_event->{path
},
1228 my $max_files_per_connection = $self->max_files_per_connection || 42;
1230 if ($self->verbose) {
1231 my $LFH = $self->_logfilehandle;
1234 push @
$dlcollector, { rev
=> $recent_event, i
=> $i };
1235 if (@
$dlcollector >= $max_files_per_connection) {
1236 $success = eval {$self->_mirror_dlcollector ($dlcollector,$pathdb,$recent_events);};
1237 my $sleep = $self->sleep_per_connection;
1238 $sleep = 0.42 unless defined $sleep;
1239 Time
::HiRes
::sleep $sleep;
1240 if ($options->{piecemeal
}) {
1241 $status->{mustreturn
} = 1;
1247 if (!$success || $@
) {
1248 warn "Warning: Error while mirroring: $@";
1252 if ($self->verbose) {
1253 my $LFH = $self->_logfilehandle;
1254 print $LFH "DONE\n";
1258 sub _mirror_dlcollector
{
1259 my($self,$xcoll,$pathdb,$recent_events) = @_;
1260 my $success = $self->mirror_path([map {$_->{rev
}{path
}} @
$xcoll]);
1262 $self->_mirror_register_path($pathdb,[map {$_->{rev
}} @
$xcoll],"rsync");
1264 $self->done->register($recent_events, [map {$_->{i
}} @
$xcoll]);
1269 sub _mirror_register_path
{
1270 my($self,$pathdb,$coll,$activity) = @_;
1272 for my $item (@
$coll) {
1273 $pathdb->{$item->{path
}} =
1275 recentepoch
=> $item->{epoch
},
1276 ($activity."_on") => $time,
1281 sub _mirror_unhide_tempfile
{
1282 my($self, $trecentfile) = @_;
1283 my $rfile = $self->rfile;
1284 if (rename $trecentfile, $rfile) {
1285 # warn "DEBUG: renamed '$trecentfile' to '$rfile'";
1288 Carp
::confess
("Could not rename '$trecentfile' to '$rfile': $!");
1290 $self->_use_tempfile (0);
1291 if (my $ctfh = $self->_current_tempfile_fh) {
1292 $ctfh->unlink_on_destroy (0);
1293 $self->_current_tempfile_fh (undef);
1297 sub _mirror_perform_delayed_ops
{
1298 my($self,$options) = @_;
1299 my $delayed = $self->delayed_operations;
1300 for my $dst (keys %{$delayed->{unlink}}) {
1301 unless (unlink $dst) {
1303 Carp
::cluck
( "Warning: Error while unlinking '$dst': $!" ) if $options->{verbose
};
1305 if ($self->verbose) {
1307 my $LFH = $self->_logfilehandle;
1310 "%-4s %d (%s) %s DONE\n",
1316 delete $delayed->{unlink}{$dst};
1319 for my $dst (sort {length($b) <=> length($a)} keys %{$delayed->{rmdir}}) {
1320 unless (rmdir $dst) {
1322 Carp
::cluck
( "Warning: Error on rmdir '$dst': $!" ) if $options->{verbose
};
1324 if ($self->verbose) {
1326 my $LFH = $self->_logfilehandle;
1329 "%-4s %d (%s) %s DONE\n",
1335 delete $delayed->{rmdir}{$dst};
1340 =head2 $success = $obj->mirror_path ( $arrref | $path )
1342 If the argument is a scalar it is treated as a path. The remote path
1343 is mirrored into the local copy. $path is the path found in the
1344 I<recentfile>, i.e. it is relative to the root directory of the
1347 If the argument is an array reference then all elements are treated as
1348 a path below the current tree and all are rsynced with a single
1349 command (and a single connection).
1354 my($self,$path) = @_;
1355 # XXX simplify the two branches such that $path is treated as
1356 # [$path] maybe even demand the argument as an arrayref to
1357 # simplify docs and code. (rsync-over-recentfile-2.pl uses the
1359 if (ref $path and ref $path eq "ARRAY") {
1360 my $dst = $self->localroot;
1361 mkpath dirname
$dst;
1362 my($fh) = File
::Temp
->new(TEMPLATE
=> sprintf(".%s-XXXX",
1363 lc $self->filenameroot,
1368 for my $p (@
$path) {
1372 $fh->unlink_on_destroy(1);
1375 local($ENV{LANG
}) = "C";
1376 while (!$self->rsync->exec
1382 'files-from' => $fh->filename,
1384 my(@err) = $self->rsync->err;
1385 if ($self->_my_ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1386 if ($self->verbose) {
1387 my $LFH = $self->_logfilehandle;
1388 print $LFH "Info: ignoring link_stat error '@err'";
1392 $self->register_rsync_error (@err);
1393 if (++$retried >= 3) {
1394 my $batchsize = @
$path;
1395 warn "The number of rsync retries now reached 3 within a batch of size $batchsize. Error was '@err'. Giving up now, will retry later, ";
1402 $self->un_register_rsync_error ();
1405 my $dst = $self->local_path($path);
1406 mkpath dirname
$dst;
1407 local($ENV{LANG
}) = "C";
1408 while (!$self->rsync->exec
1416 my(@err) = $self->rsync->err;
1417 if ($self->_my_ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
1418 if ($self->verbose) {
1419 my $LFH = $self->_logfilehandle;
1420 print $LFH "Info: ignoring link_stat error '@err'";
1424 $self->register_rsync_error (@err);
1426 $self->un_register_rsync_error ();
1431 sub _my_ignore_link_stat_errors
{
1433 my $x = $self->ignore_link_stat_errors;
1434 $x = 1 unless defined $x;
1438 sub _my_current_rfile
{
1441 if ($self->_use_tempfile) {
1442 $rfile = $self->_current_tempfile;
1444 unless ($rfile && -s
$rfile) {
1445 $rfile = $self->rfile;
1450 =head2 $path = $obj->naive_path_normalize ($path)
1452 Takes an absolute unix style path as argument and canonicalizes it to
1453 a shorter path if possible, removing things like double slashes or
1454 C</./> and removes references to C<../> directories to get a shorter
1455 unambiguos path. This is used to make the code easier that determines
1456 if a file passed to C<upgrade()> is indeed below our C<localroot>.
1460 sub naive_path_normalize
{
1461 my($self,$path) = @_;
1463 1 while $path =~ s
|/[^/]+/\.\./|/|;
1468 =head2 $ret = $obj->read_recent_1 ( $data )
1470 Delegate of C<recent_events()> on protocol 1
1475 my($self, $data) = @_;
1476 return $data->{recent
};
1479 =head2 $array_ref = $obj->recent_events ( %options )
1481 Note: the code relies on the resource being written atomically. We
1482 cannot lock because we may have no write access. If the caller has
1483 write access (eg. aggregate() or update()), it has to care for any
1484 necessary locking and it MUST write atomically.
1486 If C<$options{after}> is specified, only file events after this
1487 timestamp are returned.
1489 If C<$options{before}> is specified, only file events before this
1490 timestamp are returned.
1492 If C<$options{max}> is specified only a maximum of this many most
1493 recent events is returned.
1495 If C<$options{'skip-deletes'}> is specified, no files-to-be-deleted
1498 If C<$options{contains}> is specified the value must be a hash
1499 reference containing a query. The query may contain the keys C<epoch>,
1500 C<path>, and C<type>. Each represents a condition that must be met. If
1501 there is more than one such key, the conditions are ANDed.
1503 If C<$options{info}> is specified, it must be a hashref. This hashref
1504 will be filled with metadata about the unfiltered recent_events of
1505 this object, in key C<first> there is the first item, in key C<last>
1511 my ($self, %options) = @_;
1512 my $info = $options{info
};
1513 if ($self->is_slave) {
1514 # XXX seems dubious, might produce tempfiles without removing them?
1515 $self->get_remote_recentfile_as_tempfile;
1517 my $rfile_or_tempfile = $self->_my_current_rfile or return [];
1518 -e
$rfile_or_tempfile or return [];
1519 my $suffix = $self->serializer_suffix;
1521 $self->_try_deserialize
1528 if ($err or !$data) {
1532 if (reftype
$data eq 'ARRAY') { # protocol 0
1535 $re = $self->_recent_events_protocol_x
1541 return $re unless grep {defined $options{$_}} qw(after before contains max skip-deletes);
1542 $self->_recent_events_handle_options ($re, \
%options);
1545 # File::Rsync::Mirror::Recentfile::_recent_events_handle_options
1546 sub _recent_events_handle_options
{
1547 my($self, $re, $options) = @_;
1548 my $last_item = $#$re;
1549 my $info = $options->{info
};
1551 $info->{first
} = $re->[0];
1552 $info->{last} = $re->[-1];
1554 if (defined $options->{after
}) {
1555 if ($re->[0]{epoch
} > $options->{after
}) {
1558 {$re->[$_]{epoch
} <= $options->{after
}}
1568 if (defined $options->{before
}) {
1569 if ($re->[0]{epoch
} > $options->{before
}) {
1572 {$re->[$_]{epoch
} < $options->{before
}}
1581 if (0 != $first_item || -1 != $last_item) {
1582 @
$re = splice @
$re, $first_item, 1+$last_item-$first_item;
1584 if ($options->{'skip-deletes'}) {
1585 @
$re = grep { $_->{type
} ne "delete" } @
$re;
1587 if (my $contopt = $options->{contains
}) {
1588 my $seen_allowed = 0;
1589 for my $allow (qw(epoch path type)) {
1590 if (exists $contopt->{$allow}) {
1592 my $v = $contopt->{$allow};
1593 @
$re = grep { $_->{$allow} eq $v } @
$re;
1596 if (keys %$contopt > $seen_allowed) {
1599 (sprintf "unknown query: %s", join ", ", %$contopt);
1602 if ($options->{max
} && @
$re > $options->{max
}) {
1603 @
$re = splice @
$re, 0, $options->{max
};
1608 sub _recent_events_protocol_x
{
1613 my $meth = sprintf "read_recent_%d", $data->{meta
}{protocol
};
1614 # we may be reading meta for the first time
1615 while (my($k,$v) = each %{$data->{meta
}}) {
1616 if ($k ne lc $k){ # "Producers"
1617 $self->{ORIG
}{$k} = $v;
1620 next if defined $self->$k;
1623 my $re = $self->$meth ($data);
1625 if (my @stat = stat $rfile_or_tempfile) {
1626 $minmax = { mtime
=> $stat[9] };
1628 # defensive because ABH encountered:
1630 #### Sync 1239828608 (1/1/Z) temp .../authors/.FRMRecent-RECENT-Z.yaml-
1631 #### Ydr_.yaml ... DONE
1632 #### Cannot stat '/mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-
1633 #### Ydr_.yaml': No such file or directory at /usr/lib/perl5/site_perl/
1634 #### 5.8.8/File/Rsync/Mirror/Recentfile.pm line 1558.
1635 #### unlink0: /mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-Ydr_.yaml is
1636 #### gone already at cpan-pause.pl line 0
1638 my $LFH = $self->_logfilehandle;
1639 print $LFH "Warning (maybe harmless): Cannot stat '$rfile_or_tempfile': $!"
1642 $minmax->{min
} = $re->[-1]{epoch
};
1643 $minmax->{max
} = $re->[0]{epoch
};
1645 $self->minmax ( $minmax );
1649 sub _try_deserialize
{
1654 if ($suffix eq ".yaml") {
1656 YAML
::Syck
::LoadFile
($rfile_or_tempfile);
1657 } elsif ($HAVE->{"Data::Serializer"}) {
1658 my $serializer = Data
::Serializer
->new
1659 ( serializer
=> $serializers{$suffix} );
1662 open my $fh, $rfile_or_tempfile or die "Could not open: $!";
1666 $serializer->raw_deserialize($serialized);
1668 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
1672 sub _refresh_internals
{
1673 my($self, $dst) = @_;
1674 my $class = ref $self;
1675 my $rfpeek = $class->new_from_file ($dst);
1680 $self->$acc ( $rfpeek->$acc );
1682 my $old_dirtymark = $self->dirtymark;
1683 my $new_dirtymark = $rfpeek->dirtymark;
1684 if ($old_dirtymark && $new_dirtymark && $new_dirtymark ne $old_dirtymark) {
1686 $self->dirtymark ( $new_dirtymark );
1687 $self->_uptodateness_ever_reached(0);
1692 =head2 $ret = $obj->rfilename
1694 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1695 a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
1701 my $file = sprintf("%s-%s%s",
1702 $self->filenameroot,
1704 $self->serializer_suffix,
1709 =head2 $str = $self->remote_dir
1711 The directory we are mirroring from.
1716 my($self, $set) = @_;
1718 $self->_remote_dir ($set);
1720 my $x = $self->_remote_dir;
1721 $self->is_slave (1);
1725 =head2 $str = $obj->remoteroot
1727 =head2 (void) $obj->remoteroot ( $set )
1729 Get/Set the composed prefix needed when rsyncing from a remote module.
1730 If remote_host, remote_module, and remote_dir are set, it is composed
1736 my($self, $set) = @_;
1738 $self->_remoteroot($set);
1740 my $remoteroot = $self->_remoteroot;
1741 unless (defined $remoteroot) {
1742 $remoteroot = sprintf
1745 defined $self->remote_host ?
($self->remote_host."::") : "",
1746 defined $self->remote_module ?
($self->remote_module."/") : "",
1747 defined $self->remote_dir ?
$self->remote_dir : "",
1749 $self->_remoteroot($remoteroot);
1754 =head2 (void) $obj->split_rfilename ( $recentfilename )
1756 Inverse method to C<rfilename>. C<$recentfilename> is a plain filename
1759 $filenameroot-$interval$serializer_suffix
1765 This filename is split into its parts and the parts are fed to the
1770 sub split_rfilename
{
1771 my($self, $rfname) = @_;
1772 my($splitter) = qr
(^(.+)-([^-\
.]+)(\
.[^\
.]+));
1773 if (my($f,$i,$s) = $rfname =~ $splitter) {
1774 $self->filenameroot ($f);
1775 $self->interval ($i);
1776 $self->serializer_suffix ($s);
1778 die "Alert: cannot split '$rfname', doesn't match '$splitter'";
1783 =head2 my $rfile = $obj->rfile
1785 Returns the full path of the I<recentfile>
1791 my $rfile = $self->_rfile;
1792 return $rfile if defined $rfile;
1793 $rfile = File
::Spec
->catfile
1797 $self->_rfile ($rfile);
1801 =head2 $rsync_obj = $obj->rsync
1803 The File::Rsync object that this object uses for communicating with an
1810 my $rsync = $self->_rsync;
1811 unless (defined $rsync) {
1812 my $rsync_options = $self->rsync_options || {};
1813 if ($HAVE->{"File::Rsync"}) {
1814 $rsync = File
::Rsync
->new($rsync_options);
1815 $self->_rsync($rsync);
1817 die "File::Rsync required for rsync operations. Cannot continue";
1823 =head2 (void) $obj->register_rsync_error(@err)
1825 =head2 (void) $obj->un_register_rsync_error()
1827 Register_rsync_error is called whenever the File::Rsync object fails
1828 on an exec (say, connection doesn't succeed). It issues a warning and
1829 sleeps for an increasing amount of time. Un_register_rsync_error
1830 resets the error count. See also accessor C<max_rsync_errors>.
1835 my $no_success_count = 0;
1836 my $no_success_time = 0;
1837 sub register_rsync_error
{
1838 my($self, @err) = @_;
1840 $no_success_time = time;
1841 $no_success_count++;
1842 my $max_rsync_errors = $self->max_rsync_errors;
1843 $max_rsync_errors = MAX_INT
unless defined $max_rsync_errors;
1844 if ($max_rsync_errors>=0 && $no_success_count >= $max_rsync_errors) {
1850 "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
1856 my $sleep = 12 * $no_success_count;
1857 $sleep = 300 if $sleep > 300;
1862 "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
1863 scalar(localtime($no_success_time)),
1870 sub un_register_rsync_error
{
1872 $no_success_time = 0;
1873 $no_success_count = 0;
1877 =head2 $clone = $obj->_sparse_clone
1879 Clones just as much from itself that it does not hurt. Experimental
1882 Note: what fits better: sparse or shallow? Other suggestions?
1888 my $new = bless {}, ref $self;
1897 ignore_link_stat_errors
1899 max_files_per_connection
1903 sleep_per_connection
1908 $o = Storable
::dclone
$o if ref $o;
1914 =head2 $boolean = OBJ->ttl_reached ()
1920 my $have_mirrored = $self->have_mirrored || 0;
1921 my $now = Time
::HiRes
::time;
1922 my $ttl = $self->ttl;
1923 $ttl = 24.2 unless defined $ttl;
1924 if ($now > $have_mirrored + $ttl) {
1930 =head2 (void) $obj->unlock()
1932 Unlocking is implemented with an C<rmdir> on a locking directory
1933 (C<.lock> appended to $rfile).
1939 return unless $self->_is_locked;
1940 my $rfile = $self->rfile;
1941 unlink "$rfile.lock/process" or warn "Could not unlink lockfile '$rfile.lock/process': $!";
1942 rmdir "$rfile.lock" or warn "Could not rmdir lockdir '$rfile.lock': $!";;
1943 $self->_is_locked (0);
1948 Sets this recentfile in the state of not 'seeded'.
1956 =head2 $ret = $obj->update ($path, $type)
1958 =head2 $ret = $obj->update ($path, "new", $dirty_epoch)
1960 =head2 $ret = $obj->update ()
1962 Enter one file into the local I<recentfile>. $path is the (usually
1963 absolute) path. If the path is outside I<our> tree, then it is
1966 C<$type> is one of C<new> or C<delete>.
1968 Events of type C<new> may set $dirty_epoch. $dirty_epoch is normally
1969 not used and the epoch is calculated by the update() routine itself
1970 based on current time. But if there is the demand to insert a
1971 not-so-current file into the dataset, then the caller sets
1972 $dirty_epoch. This causes the epoch of the registered event to become
1973 $dirty_epoch or -- if the exact value given is already taken -- a tiny
1974 bit more. As compensation the dirtymark of the whole dataset is set to
1975 now or the current epoch, whichever is higher. Note: setting the
1976 dirty_epoch to the future is prohibited as it's very unlikely to be
1977 intended: it definitely might wreak havoc with the index files.
1979 The new file event is unshifted (or, if dirty_epoch is set, inserted
1980 at the place it belongs to, according to the rule to have a sequence
1981 of strictly decreasing timestamps) to the array of recent_events and
1982 the array is shortened to the length of the timespan allowed. This is
1983 usually the timespan specified by the interval of this recentfile but
1984 as long as this recentfile has not been merged to another one, the
1985 timespan may grow without bounds.
1987 The third form runs an update without inserting a new file. This may
1988 be desired to truncate a recentfile.
1991 sub _epoch_monotonically_increasing
{
1992 my($self,$epoch,$recent) = @_;
1993 return $epoch unless @
$recent; # the first one goes unoffended
1994 if (_bigfloatgt
("".$epoch,$recent->[0]{epoch
})) {
1997 return _increase_a_bit
($recent->[0]{epoch
});
2001 my($self,$path,$type,$dirty_epoch) = @_;
2002 if (defined $path or defined $type or defined $dirty_epoch) {
2003 die "update called without path argument" unless defined $path;
2004 die "update called without type argument" unless defined $type;
2005 die "update called with illegal type argument: $type" unless $type =~ /(new|delete)/;
2008 my $ctx = $self->_locked_batch_update([{path
=>$path,type
=>$type,epoch
=>$dirty_epoch}]);
2009 $self->write_recent($ctx->{recent
}) if $ctx->{something_done
};
2010 $self->_assert_symlink;
2014 =head2 $obj->batch_update($batch)
2016 Like update but for many files. $batch is an arrayref containing
2017 hashrefs with the structure
2029 my($self,$batch) = @_;
2031 my $ctx = $self->_locked_batch_update($batch);
2032 $self->write_recent($ctx->{recent
}) if $ctx->{something_done
};
2033 $self->_assert_symlink;
2036 sub _locked_batch_update
{
2037 my($self,$batch) = @_;
2038 my $something_done = 0;
2039 my $recent = $self->recent_events;
2040 unless ($recent->[0]) {
2042 $something_done = 1;
2044 my %paths_in_recent = map { $_->{path
} => undef } @
$recent;
2045 my $interval = $self->interval;
2046 my $canonmeth = $self->canonize;
2047 unless ($canonmeth) {
2048 $canonmeth = "naive_path_normalize";
2050 my $oldest_allowed = 0;
2051 my $setting_new_dirty_mark = 0;
2053 if ($self->verbose && @
$batch > 1) {
2054 eval {require Time
::Progress
};
2055 warn "dollarat[$@]" if $@
;
2057 $console = new Time
::Progress
;
2058 $console->attr( min
=> 1, max
=> scalar @
$batch );
2063 ITEM
: for my $item (sort {($b->{epoch
}||0) <=> ($a->{epoch
}||0)} @
$batch) {
2065 print $console->report( "\rdone %p elapsed: %L (%l sec), ETA %E (%e sec)", $i ) if $console and not $i % 50;
2066 my $ctx = $self->_update_batch_item($item,$canonmeth,$recent,$setting_new_dirty_mark,$oldest_allowed,$something_done,\
%paths_in_recent,$memo_splicepos);
2067 $something_done = $ctx->{something_done
};
2068 $oldest_allowed = $ctx->{oldest_allowed
};
2069 $setting_new_dirty_mark = $ctx->{setting_new_dirty_mark
};
2070 $recent = $ctx->{recent
};
2071 $memo_splicepos = $ctx->{memo_splicepos
};
2073 print "\n" if $console;
2074 if ($setting_new_dirty_mark) {
2075 $oldest_allowed = 0;
2077 TRUNCATE
: while (@
$recent) {
2078 if (_bigfloatlt
($recent->[-1]{epoch
}, $oldest_allowed)) {
2080 $something_done = 1;
2085 return {something_done
=>$something_done,recent
=>$recent};
2087 sub _update_batch_item
{
2088 my($self,$item,$canonmeth,$recent,$setting_new_dirty_mark,$oldest_allowed,$something_done,$paths_in_recent,$memo_splicepos) = @_;
2089 my($path,$type,$dirty_epoch) = @
{$item}{qw(path type epoch)};
2090 if (defined $path or defined $type or defined $dirty_epoch) {
2091 $path = $self->$canonmeth($path);
2093 # you must calculate the time after having locked, of course
2094 my $now = Time
::HiRes
::time;
2097 if (defined $dirty_epoch && _bigfloatgt
($now,$dirty_epoch)) {
2098 $epoch = $dirty_epoch;
2100 $epoch = $self->_epoch_monotonically_increasing($now,$recent);
2103 my $merged = $self->merged;
2104 if ($merged->{epoch
} && !$setting_new_dirty_mark) {
2105 my $virtualnow = _bigfloatmax
($now,$epoch);
2106 # for the lower bound I think we need no big math, we calc already
2107 my $secs = $self->interval_secs();
2108 $oldest_allowed = min
($virtualnow - $secs, $merged->{epoch
}, $epoch);
2110 # as long as we are not merged at all, no limits!
2112 my $lrd = $self->localroot;
2113 if (defined $path && $path =~ s
|^\Q
$lrd\E
||) {
2116 # remove the older duplicates of this $path, irrespective of $type:
2117 if (defined $dirty_epoch) {
2118 my $ctx = $self->_update_with_dirty_epoch($path,$recent,$epoch,$paths_in_recent,$memo_splicepos);
2119 $recent = $ctx->{recent
};
2120 $splicepos = $ctx->{splicepos
};
2121 $epoch = $ctx->{epoch
};
2122 my $dirtymark = $self->dirtymark;
2124 if (_bigfloatgt
($epoch, $now)) { # just in case we had to increase it
2127 $self->dirtymark($new_dm);
2128 $setting_new_dirty_mark = 1;
2129 if (not defined $merged->{epoch
} or _bigfloatlt
($epoch,$merged->{epoch
})) {
2133 $recent = [ grep { $_->{path
} ne $path } @
$recent ];
2136 if (defined $splicepos) {
2137 splice @
$recent, $splicepos, 0, { epoch
=> $epoch, path
=> $path, type
=> $type };
2138 $paths_in_recent->{$path} = undef;
2140 $memo_splicepos = $splicepos;
2141 $something_done = 1;
2145 something_done
=> $something_done,
2146 oldest_allowed
=> $oldest_allowed,
2147 setting_new_dirty_mark
=> $setting_new_dirty_mark,
2149 memo_splicepos
=> $memo_splicepos,
2152 sub _update_with_dirty_epoch
{
2153 my($self,$path,$recent,$epoch,$paths_in_recent,$memo_splicepos) = @_;
2155 my $new_recent = [];
2156 if (exists $paths_in_recent->{$path}) {
2158 KNOWN_EVENT
: for my $i (0..$#$recent) {
2159 if ($recent->[$i]{path
} eq $path) {
2160 if ($recent->[$i]{epoch
} eq $epoch) {
2166 push @
$new_recent, $recent->[$i];
2169 @
$recent = @
$new_recent unless $cancel;
2171 if (!exists $recent->[0] or _bigfloatgt
($epoch,$recent->[0]{epoch
})) {
2173 } elsif (_bigfloatlt
($epoch,$recent->[-1]{epoch
})) {
2174 $splicepos = @
$recent;
2177 if (_bigfloatgt
($memo_splicepos<=$#$recent && $epoch, $recent->[$memo_splicepos]{epoch})) {
2180 $startingpoint = $memo_splicepos;
2182 RECENT
: for my $i ($startingpoint..$#$recent) {
2183 my $ev = $recent->[$i];
2184 if ($epoch eq $recent->[$i]{epoch
}) {
2185 $epoch = _increase_a_bit
($epoch, $i ?
$recent->[$i-1]{epoch
} : undef);
2187 if (_bigfloatgt
($epoch,$recent->[$i]{epoch
})) {
2195 splicepos
=> $splicepos,
2202 Sets this recentfile in the state of 'seeded' which means it has to
2203 re-evaluate its uptodateness.
2213 Tells if the recentfile is in the state 'seeded'.
2217 my($self, $set) = @_;
2219 $self->_seeded ($set);
2221 my $x = $self->_seeded;
2222 unless (defined $x) {
2224 $self->_seeded ($x);
2231 True if this object has mirrored the complete interval covered by the
2239 if ($self->_uptodateness_ever_reached and not $self->seeded) {
2243 # it's too easy to misconfigure ttl and related timings and then
2244 # never reach uptodateness, so disabled 2009-03-22
2245 if (0 and not defined $uptodate) {
2246 if ($self->ttl_reached){
2247 $why = "ttl_reached returned true, so we are not uptodate";
2251 unless (defined $uptodate) {
2252 # look if recentfile has unchanged timestamp
2253 my $minmax = $self->minmax;
2254 if (exists $minmax->{mtime
}) {
2255 my $rfile = $self->_my_current_rfile;
2256 my @stat = stat $rfile or die "Could not stat '$rfile': $!";
2257 my $mtime = $stat[9];
2258 if (defined $mtime && defined $minmax->{mtime
} && $mtime > $minmax->{mtime
}) {
2259 $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
2262 my $covered = $self->done->covered(@
$minmax{qw(max min)});
2263 $why = sprintf "minmax covered[%s], so we return that", defined $covered ?
$covered : "UNDEF";
2264 $uptodate = $covered;
2268 unless (defined $uptodate) {
2269 $why = "fallthrough, so not uptodate";
2273 $self->_uptodateness_ever_reached(1);
2277 uptodate
=> $uptodate,
2280 $self->_remember_last_uptodate_call($remember);
2284 =head2 $obj->write_recent ($recent_files_arrayref)
2286 Writes a I<recentfile> based on the current reflection of the current
2287 state of the tree limited by the current interval.
2292 @
{$_[1]} = sort { _bigfloatcmp
($b->{epoch
},$a->{epoch
}) } @
{$_[1]};
2296 my ($self,$recent) = @_;
2297 die "write_recent called without argument" unless defined $recent;
2299 SANITYCHECK
: for my $i (0..$#$recent) {
2300 if (defined($Last_epoch) and _bigfloatge
($recent->[$i]{epoch
},$Last_epoch)) {
2302 Carp
::confess
(sprintf "Warning: disorder '%s'>='%s', re-sorting %s\n",
2303 $recent->[$i]{epoch
}, $Last_epoch, $self->interval);
2305 # $self->_resort($recent);
2308 $Last_epoch = $recent->[$i]{epoch
};
2310 my $minmax = $self->minmax;
2311 if (!defined $minmax->{max
} || _bigfloatlt
($minmax->{max
},$recent->[0]{epoch
})) {
2312 $minmax->{max
} = @
$recent && exists $recent->[0]{epoch
} ?
$recent->[0]{epoch
} : undef;
2314 if (!defined $minmax->{min
} || _bigfloatlt
($minmax->{min
},$recent->[-1]{epoch
})) {
2315 $minmax->{min
} = @
$recent && exists $recent->[-1]{epoch
} ?
$recent->[-1]{epoch
} : undef;
2317 $self->minmax($minmax);
2318 my $meth = sprintf "write_%d", $self->protocol;
2319 $self->$meth($recent);
2322 =head2 $obj->write_0 ($recent_files_arrayref)
2324 Delegate of C<write_recent()> on protocol 0
2329 my ($self,$recent) = @_;
2330 my $rfile = $self->rfile;
2331 YAML
::Syck
::DumpFile
("$rfile.new",$recent);
2332 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2335 =head2 $obj->write_1 ($recent_files_arrayref)
2337 Delegate of C<write_recent()> on protocol 1
2342 my ($self,$recent) = @_;
2343 my $rfile = $self->rfile;
2344 my $suffix = $self->serializer_suffix;
2346 meta
=> $self->meta_data,
2350 if ($suffix eq ".yaml") {
2351 $serialized = YAML
::Syck
::Dump
($data);
2352 } elsif ($HAVE->{"Data::Serializer"}) {
2353 my $serializer = Data
::Serializer
->new
2354 ( serializer
=> $serializers{$suffix} );
2355 $serialized = $serializer->raw_serialize($data);
2357 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
2359 open my $fh, ">", "$rfile.new" or die "Could not open >'$rfile.new': $!";
2360 print $fh $serialized;
2361 close $fh or die "Could not close '$rfile.new': $!";
2362 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2366 my $nq = qr/[^"]+/; # non-quotes
2368 split /\n/, <<'=cut'; %serializers = map { my @x = /"($nq)"\s+=>\s+"($nq)"/; @x } grep {s/^=item\s+C<<\s+(.+)\s+>>$/$1/} @pod_lines; }
2372 The following suffixes are supported and trigger the use of these
2377 =item C<< ".yaml" => "YAML::Syck" >>
2379 =item C<< ".json" => "JSON" >>
2381 =item C<< ".sto" => "Storable" >>
2383 =item C<< ".dd" => "Data::Dumper" >>
2391 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
2393 =head1 INTERVAL SPEC
2395 An interval spec is a primitive way to express time spans. Normally it
2396 is composed from an integer and a letter.
2398 As a special case, a string that consists only of the single letter
2399 C<Z>, stands for MAX_INT seconds.
2401 The following letters express the specified number of seconds:
2407 =item C<< m => 60 >>
2409 =item C<< h => 60*60 >>
2411 =item C<< d => 60*60*24 >>
2413 =item C<< W => 60*60*24*7 >>
2415 =item C<< M => 60*60*24*30 >>
2417 =item C<< Q => 60*60*24*90 >>
2419 =item C<< Y => 60*60*24*365.25 >>
2427 L<File::Rsync::Mirror::Recent>,
2428 L<File::Rsync::Mirror::Recentfile::Done>,
2429 L<File::Rsync::Mirror::Recentfile::FakeBigFloat>
2433 Please report any bugs or feature requests through the web interface
2435 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2436 I will be notified, and then you'll automatically be notified of
2437 progress on your bug as I make changes.
2441 Memory hungry: it seems all memory is allocated during the initial
2442 rsync where a list of all files is maintained in memory.
2446 You can find documentation for this module with the perldoc command.
2448 perldoc File::Rsync::Mirror::Recentfile
2450 You can also look for information at:
2454 =item * RT: CPAN's request tracker
2456 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2458 =item * AnnoCPAN: Annotated CPAN documentation
2460 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2462 =item * CPAN Ratings
2464 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2468 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2473 =head1 ACKNOWLEDGEMENTS
2475 Thanks to RJBS for module-starter.
2481 =head1 COPYRIGHT & LICENSE
2483 Copyright 2008,2009 Andreas König.
2485 This program is free software; you can redistribute it and/or modify it
2486 under the same terms as Perl itself.
2491 1; # End of File::Rsync::Mirror::Recentfile
2495 # cperl-indent-level: 4