package File::Rsync::Mirror::Recentfile;
File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
$HAVE->{$package} = eval qq{ require $package; };
use File::Basename qw(basename dirname fileparse);
use File::Copy qw(cp);
use File::Path qw(mkpath);
use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
use List::Util qw(first max min);
use Scalar::Util qw(reftype);

use version; our $VERSION = qv('0.0.1');

use constant MAX_INT => ~0>>1; # anything better?
use constant DEFAULT_PROTOCOL => 1;
# maybe subclass if this mapping is bad?
B<!!!! PRE-ALPHA ALERT !!!!>

Nothing in here is believed to be stable, nothing yet intended for
public consumption. The plan is to provide scripts that act as
frontends for all the backend functionality. Option and method names
will very likely change.

For the rationale see the section BACKGROUND.

This is published only for developers of the (yet to be named)
Writer (of a single file):

    use File::Rsync::Mirror::Recentfile;
    my $rf = File::Rsync::Mirror::Recentfile->new
      (
       filenameroot => "RECENT",
       comment      => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
       localroot    => "/home/ftp/pub/PAUSE/authors/",
       aggregator   => [qw(1d 1W 1M 1Q 1Y Z)],
      );
    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
    my $rf = File::Rsync::Mirror::Recentfile->new
      (
       filenameroot            => "RECENT",
       ignore_link_stat_errors => 1,
       localroot               => "/home/ftp/pub/PAUSE/authors",
       remote_host             => "pause.perl.org",
       remote_module           => "authors",
       rsync_options           => {
                                   'rsync-path'     => '/usr/bin/rsync',
                                   'omit-dir-times' => 1,
                                  },
      );
Aggregator (usually the writer):

    my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
=head1 CONSTRUCTORS / DESTRUCTOR

=head2 my $obj = CLASS->new(%hash)

Constructor. In every argument pair the key is a method name and the
value is the argument passed to that method.

If a recentfile for this resource already exists, metadata that are
not defined by the constructor will be fetched from there as soon as
it is read by recent_events().
    my($class, @args) = @_;
    my $self = bless {}, $class;
    while (@args) {
        my($method,$arg) = splice @args, 0, 2;
        $self->$method($arg);
    }
    unless (defined $self->protocol) {
        $self->protocol(DEFAULT_PROTOCOL);
    }
    unless (defined $self->filenameroot) {
        $self->filenameroot("RECENT");
    }
    unless (defined $self->serializer_suffix) {
        $self->serializer_suffix(".yaml");
    }
=head2 my $obj = CLASS->new_from_file($file)

Constructor. $file is a I<recentfile>.
    my($class, $file) = @_;
    my $self = bless {}, $class;
    $self->_rfile($file);
    my $serialized = do { open my $fh, $file or die "Could not open '$file': $!";
                          local $/;
                          <$fh>;
                        };
    # XXX: we can skip this step when the metadata are sufficient, but
    # we cannot parse the file without some magic stuff about
    my($name,$path) = fileparse $file;
    my $symlink = readlink $file;
    if ($symlink =~ m|/|) {
        die "FIXME: filenames containing '/' not supported, got $symlink";
    }
    $file = File::Spec->catfile ( $path, $symlink );
    my($name,$path,$suffix) = fileparse $file, keys %serializers;
    $self->serializer_suffix($suffix);
    $self->localroot($path);
    die "Could not determine file format from suffix" unless $suffix;
    my $deserialized;
    if ($suffix eq ".yaml") {
        $deserialized = YAML::Syck::LoadFile($file);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $deserialized = $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    while (my($k,$v) = each %{$deserialized->{meta}}) {
        next if $k ne lc $k; # "Producers"
        $self->$k($v);
    }
    unless (defined $self->protocol) {
        $self->protocol(DEFAULT_PROTOCOL);
    }
sub DESTROY { shift->unlock }
                 "_current_tempfile_fh",
                 "_delayed_operations",
                 "_remember_last_uptodate_call",
                 "_uptodateness_ever_reached",

BEGIN { my @pod_lines =
    split /\n/, <<'=cut'; push @accessors, grep {s/^=item\s+//} @pod_lines; }
=item aggregator

A list of interval specs that tell the aggregator which I<recentfile>s
are to be produced.

=item canonize

The name of a method to canonicalize the path before rsyncing. The only
supported value is C<naive_path_normalize>. Defaults to that.

=item comment

A comment about this tree and setup.

=item dirtymark

A timestamp. The dirtymark is updated whenever an out-of-band change
on the origin server is performed that violates the protocol, say,
when files are added or removed somewhere in the middle. Slaves must
react with a devaluation of their C<done> structure, which then leads
to a full re-sync of all files.

=item filenameroot

The (prefix of the) filename we use for this I<recentfile>. Defaults to
C<RECENT>. The string must not contain a directory separator.

=item have_mirrored

Timestamp remembering when we mirrored this recentfile the last time.
Only relevant for slaves.
=item ignore_link_stat_errors

If set to true, rsync errors complaining about link stat errors are
ignored. These seem to happen only when files are missing at the
origin. Since this can always happen in race conditions, it is
recommended to set this value to true.
=item is_slave

If set to true, this object will fetch a new recentfile from remote
when the timespan between the last mirror (see have_mirrored) and now
is too large (currently hardcoded).

=item locktimeout

After how many seconds shall we die if we cannot lock a I<recentfile>?
Defaults to 600 seconds.

=item loopinterval

When mirror_loop is called, this accessor can specify how much time
every loop shall at least take. If the work of a loop is done before
that time has passed, we sleep for the rest of the time. Defaults to
an arbitrary 42 seconds.
=item max_files_per_connection

Maximum number of files that are transferred in a single rsync call.
Setting it higher means higher performance at the price of holding
connections longer and potentially disturbing other users in the pool.
Defaults to the arbitrary value 42.

=item max_rsync_errors

When rsync operations encounter that many errors without any
intervening success, we die. Defaults to unlimited. A value of
-1 means we run forever, ignoring all rsync errors.
=item minmax

Hashref remembering when we read the recent_events from this file the
last time and what the timespan was.

=item protocol

When the RECENT file format changes, we increment the protocol. We try
to support older protocols in later releases.

=item remote_host

The host we are mirroring from. Leave empty for the local filesystem.

=item remote_module

Rsync servers have so-called modules to separate directory trees from
each other. Put here the name of the module under which we are
mirroring. Leave empty for the local filesystem.

=item rsync_options

Things like compress, links, times, or checksums. Passed in to the
File::Rsync object used to run the mirror.
=item serializer_suffix

Mostly untested accessor. The only well-tested format for
I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
Data::Serializer. But in principle other formats are supported as
well. See the section SERIALIZERS below.

=item sleep_per_connection

Sleep that many seconds (floating point OK) after every chunk of
rsyncing has finished. Defaults to an arbitrary 0.42.

=item ttl

Time to live. Number of seconds after which this recentfile must be
fetched again from the origin server. Only relevant for slaves.
Defaults to an arbitrary 24.2 seconds.

=item verbose

Boolean to turn on a bit of verbosity.
use accessors @accessors;
=head2 (void) $obj->aggregate( %options )

Takes all intervals that are collected in the accessor called
aggregator. Sorts them by the actual length of the interval.
Removes those that are shorter than our own interval. Then merges this
object into the next larger object. The merging continues upwards
as long as the next I<recentfile> is old enough to warrant a merge.

Whether a merge is warranted is decided according to the interval of
the previous file, so that larger files are updated less often than
smaller ones. If $options{force} is true, all files get updated.

Here is an example to illustrate the behaviour. Given aggregators

  1h updates 1d on every call to aggregate()
  1d updates 1W earliest after 1h
  1W updates 1M earliest after 1d
  1M updates 1Q earliest after 1W
  1Q updates 1Y earliest after 1M
  1Y updates Z  earliest after 1Q

Note that all but the smallest recentfile get updated at an arbitrary
rate and as such are quite useless on their own.
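The cascade above can be sketched as a standalone toy (a hypothetical model, not this module's code; the interval names, second counts, and file ages are made-up illustration values):

```perl
use strict;
use warnings;

# Hypothetical aggregates, smallest first; "age" is the age in seconds
# of the next-larger recentfile on disk (made-up numbers).
my @aggs = (
    { interval => "1h", secs =>   3600 },
    { interval => "1d", secs =>  86400, age => 7200 },
    { interval => "1W", secs => 604800, age => 2000 },
);

# Is merging $aggs->[$i] into $aggs->[$i+1] warranted?
sub merge_warranted {
    my($aggs, $i) = @_;
    return 1 if $i == 0; # the smallest file is merged on every call
    # otherwise the next file must be older than the previous interval
    return $aggs->[$i+1]{age} > $aggs->[$i-1]{secs} ? 1 : 0;
}

for my $i (0 .. $#aggs - 1) {
    printf "%s -> %s: %s\n",
        $aggs[$i]{interval}, $aggs[$i+1]{interval},
        merge_warranted(\@aggs, $i) ? "merge" : "skip";
}
# 1h -> 1d: merge
# 1d -> 1W: skip  (the 1W file is only 2000s old, younger than 1h)
```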
    my($self, %option) = @_;
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
        grep { $_->{secs} >= $self->interval_secs }
            map { { interval => $_, secs => $self->interval_secs($_)} }
                $self->interval, @{$self->aggregator || []};
    $aggs[0]{object} = $self;
  AGGREGATOR: for my $i (0..$#aggs-1) {
        my $this = $aggs[$i]{object};
        my $next = $this->_sparse_clone;
        $next->interval($aggs[$i+1]{interval});
        my $want_merge = 0;
        if ($option{force} || $i == 0) {
            $want_merge = 1;
        } else {
            my $next_rfile = $next->rfile;
            if (-e $next_rfile) {
                my $prev = $aggs[$i-1]{object};
                my $next_age = 86400 * -M $next_rfile;
                if ($next_age > $prev->interval_secs) {
                    $want_merge = 1;
                }
            } else {
                $want_merge = 1;
            }
        }
        last AGGREGATOR unless $want_merge;
        $next->merge($this);
        $aggs[$i+1]{object} = $next;
    }
# collect file size and mtime for all files of this aggregate
sub _debug_aggregate {
    my($self) = @_;
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
        map { { interval => $_, secs => $self->interval_secs($_)} }
            $self->interval, @{$self->aggregator || []};
    my $report = [];
    for my $i (0..$#aggs) {
        my $this = Storable::dclone $self;
        $this->interval($aggs[$i]{interval});
        my $rfile = $this->rfile;
        my @stat = stat $rfile;
        push @$report, {rfile => $rfile, size => $stat[7], mtime => $stat[9]};
    }
    $report;
}
# (void) $self->_assert_symlink()
sub _assert_symlink {
    my($self) = @_;
    my $recentrecentfile = File::Spec->catfile
    if ($Config{d_symlink} eq "define") {
        my $howto_create_symlink; # 0=no need; 1=straight symlink; 2=rename symlink
        if (-l $recentrecentfile) {
            my $found_symlink = readlink $recentrecentfile;
            if ($found_symlink eq $self->rfilename) {
                $howto_create_symlink = 0;
            } else {
                $howto_create_symlink = 2;
            }
        } else {
            $howto_create_symlink = 1;
        }
        if (1 == $howto_create_symlink) {
            symlink $self->rfilename, $recentrecentfile or die "Could not create symlink '$recentrecentfile': $!";
        } elsif (2 == $howto_create_symlink) {
            unlink "$recentrecentfile.$$"; # may fail
            symlink $self->rfilename, "$recentrecentfile.$$" or die "Could not create symlink '$recentrecentfile.$$': $!";
            rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
        }
    } else {
        warn "Warning: symlinks not supported on this system, doing a copy instead\n";
        unlink "$recentrecentfile.$$"; # may fail
        cp $self->rfilename, "$recentrecentfile.$$" or die "Could not copy to '$recentrecentfile.$$': $!";
        rename "$recentrecentfile.$$", $recentrecentfile or die "Could not rename '$recentrecentfile.$$' to $recentrecentfile: $!";
    }
}
=head2 $hashref = $obj->delayed_operations

A hash of hashes containing unlink and rmdir operations which had to
wait until the recentfile got unhidden, so as not to confuse
downstream mirrors (in case we have some).

sub delayed_operations {
    my($self) = @_;
    my $x = $self->_delayed_operations;
    unless (defined $x) {
        $x = {
              unlink => {},
              rmdir  => {},
             };
        $self->_delayed_operations ($x);
    }
    $x;
}
=head2 $done = $obj->done

$done is a reference to a File::Rsync::Mirror::Recentfile::Done object
that keeps track of rsync activities. Only needed and used when we are
a mirroring slave.

    my $done = $self->_done;
    unless ($done) {
        require File::Rsync::Mirror::Recentfile::Done;
        $done = File::Rsync::Mirror::Recentfile::Done->new();
        $done->_rfinterval ($self->interval);
        $self->_done ( $done );
    }
    $done;
=head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()

Stores the remote I<recentfile> locally as a tempfile. The caller is
responsible for removing the file after use.

Note: if you intend to act as an rsync server for other slaves, you
must prefer this method over get_remotefile() for fetching that file.
Otherwise downstream mirrors would expect you to have already mirrored
all the files that are in the I<recentfile> before you actually have
mirrored them.
sub get_remote_recentfile_as_tempfile {
    my($self) = @_;
    mkpath $self->localroot;
    my($fh, $trfilename);
    if ( $self->_use_tempfile() ) {
        return $self->_current_tempfile if ! $self->ttl_reached;
        $fh = $self->_current_tempfile_fh;
        $trfilename = $self->rfilename;
    } else {
        $trfilename = $self->rfilename;
    }

    my $dst;
    if ($fh) {
        $dst = $self->_current_tempfile;
    } else {
        $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
        $dst = $fh->filename;
        $self->_current_tempfile ($dst);
        my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
        if (defined $rfile && -e $rfile) {
            # saving on bandwidth. Might need to be configurable
            # $self->bandwidth_is_cheap?
            cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!";
        }
    }
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        my $display_dst = join "/", "...", basename(dirname($dst)), basename($dst);
        printf STDERR
            "%-4s %d (1/1/%s) temp %s ... ",
            $doing,
            time,
            $self->interval,
            $display_dst;
    }
    my $gaveup = 0;
    my $retried = 0;
    while (!$self->rsync->exec(
                               src => join("/",
                                           $self->remoteroot,
                                           $trfilename),
                               dst => $dst,
                              )) {
        $self->register_rsync_error ($self->rsync->err);
        if (++$retried >= 3) {
            warn "XXX giving up";
            $gaveup = 1;
            last;
        }
    }
    if ($gaveup) {
        printf STDERR "Warning: gave up mirroring %s, will try again later", $self->interval;
    } else {
        $self->_refresh_internals ($dst);
        $self->have_mirrored (Time::HiRes::time);
        $self->un_register_rsync_error ();
    }
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
    my $mode = 0644;
    chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
    return $dst;
}
sub _get_remote_rat_provide_tempfile_object {
    my($self, $trfilename) = @_;
    my $fh = File::Temp->new
        (TEMPLATE => sprintf(".FRMRecent-%s-XXXX",
                             $trfilename,
                            ),
         DIR => $self->localroot,
         SUFFIX => $self->serializer_suffix,
         UNLINK => $self->_use_tempfile,
        );
    if ($self->_use_tempfile) {
        $self->_current_tempfile_fh ($fh); # delay self destruction
    }
    return $fh;
}
=head2 $localpath = $obj->get_remotefile ( $relative_path )

Rsyncs a single remote file to the local filesystem.

Note: no locking is done on this file. Any number of processes may
mirror this object at the same time.

Note II: do not use this for I<recentfile>s. If you are a cascading
slave/server combination, it would confuse other slaves. They would
expect the contents of these recentfiles to be available. Use
get_remote_recentfile_as_tempfile() instead.
    my($self, $path) = @_;
    my $dst = File::Spec->catfile($self->localroot, $path);
    mkpath dirname $dst;
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        printf STDERR
            "%-4s %d (1/1/%s) %s ... ",
            $doing,
            time,
            $self->interval,
            $path;
    }
    while (!$self->rsync->exec(
                               src => join("/",
                                           $self->remoteroot,
                                           $path),
                               dst => $dst,
                              )) {
        $self->register_rsync_error ($self->rsync->err);
    }
    $self->un_register_rsync_error ();
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
    return $dst;
=head2 $obj->interval ( $interval_spec )

Get/set accessor. $interval_spec is a string and described below in
the section INTERVAL SPEC.

    my ($self, $interval) = @_;
    if (@_ >= 2) {
        $self->_interval($interval);
        $self->_rfile(undef);
    }
    $interval = $self->_interval;
    unless (defined $interval) {
        # do not ask the $self too much, it recurses!
        require Carp;
        Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
    }
    return $interval;
=head2 $secs = $obj->interval_secs ( $interval_spec )

$interval_spec is described below in the section INTERVAL SPEC. If
empty, defaults to the inherent interval of this object.

    my ($self, $interval) = @_;
    $interval ||= $self->interval;
    unless (defined $interval) {
        die "interval_secs() called without argument on an object without a declared one";
    }
    my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
        die "Could not determine seconds from interval[$interval]";
    if ($interval eq "Z") {
        return MAX_INT;
    } elsif (exists $seconds{$t} and $n =~ /^\d+$/) {
        return $seconds{$t}*$n;
    } else {
        die "Invalid interval specification: n[$n]t[$t]";
    }
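The parsing above can be sketched in isolation. This is a standalone approximation, not the module's code: the C<%seconds> table here is an assumption (common calendar values), and C<interval_secs_sketch> is a hypothetical name.

```perl
use strict;
use warnings;

# Assumed seconds-per-unit table; the real module's values may differ.
my %seconds = (
    s => 1,
    m => 60,
    h => 60 * 60,
    d => 60 * 60 * 24,
    W => 60 * 60 * 24 * 7,
    M => 60 * 60 * 24 * 30,
    Q => 60 * 60 * 24 * 90,
    Y => 60 * 60 * 24 * 365,
);

sub interval_secs_sketch {
    my($interval) = @_;
    # an optional count followed by exactly one unit letter
    my($n, $t) = $interval =~ /^(\d*)([smhdWMQYZ])$/
        or die "Could not determine seconds from interval[$interval]";
    return ~0 >> 1 if $interval eq "Z";   # "Z" means "no limit"
    return $seconds{$t} * $n if $n =~ /^\d+$/;
    die "Invalid interval specification: n[$n]t[$t]";
}

print interval_secs_sketch("6h"), "\n";   # 21600
print interval_secs_sketch("1d"), "\n";   # 86400
```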
=head2 $obj->localroot ( $localroot )

Get/set accessor. The local root of the tree.

    my ($self, $localroot) = @_;
    if (@_ >= 2) {
        $self->_localroot($localroot);
        $self->_rfile(undef);
    }
    $localroot = $self->_localroot;
=head2 $ret = $obj->local_path($path_found_in_recentfile)

Combines the path to our local mirror and the path of an object found
in this I<recentfile>. In other words: the target of a mirror operation.

Implementation note: we split on slashes and then use
File::Spec::catfile to adjust to the local operating system.

    my($self,$path) = @_;
    unless (defined $path) {
        # seems like a degenerate case
        return $self->localroot;
    }
    my @p = split m|/|, $path;
    File::Spec->catfile($self->localroot,@p);
=head2 (void) $obj->lock

Locking is implemented with an C<mkdir> on a locking directory
(C<.lock> appended to $rfile).

    # not using flock because it locks on filehandles instead of
    # old school resources
    my $locked = $self->_is_locked and return;
    my $rfile = $self->rfile;
    # XXX need a way to allow breaking the lock
    my $start = time;
    my $locktimeout = $self->locktimeout || 600;
    while (not mkdir "$rfile.lock") {
        Time::HiRes::sleep 0.01;
        if (time - $start > $locktimeout) {
            die "Could not acquire lockdirectory '$rfile.lock': $!";
        }
    }
    $self->_is_locked (1);
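The mkdir-locking idiom works standalone as well: C<mkdir> is atomic, so whichever process creates the directory holds the lock. A minimal sketch with hypothetical function names (not this module's API):

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);
use Time::HiRes ();

my $dir   = tempdir(CLEANUP => 1);
my $rfile = "$dir/RECENT-6h.yaml";   # illustrative path

# Acquire: whoever manages to mkdir owns the lock.
sub acquire_lock {
    my($rfile, $locktimeout) = @_;
    my $start = time;
    while (not mkdir "$rfile.lock") {
        Time::HiRes::sleep 0.01;
        die "Could not acquire lockdirectory '$rfile.lock': $!"
            if time - $start > $locktimeout;
    }
}

# Release: removing the directory frees the lock for other processes.
sub release_lock {
    my($rfile) = @_;
    rmdir "$rfile.lock" or warn "Could not remove lockdirectory: $!";
}

acquire_lock($rfile, 600);
print -d "$rfile.lock" ? "locked\n" : "unlocked\n";   # locked
release_lock($rfile);
print -d "$rfile.lock" ? "locked\n" : "unlocked\n";   # unlocked
```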
=head2 (void) $obj->merge ($other)

Bulk update of this object with another one. It is used to merge a
smaller and younger $other object into the current one. If this file
is a C<Z> file, we do not merge in objects of type C<delete>; but if
we encounter an object of type C<delete>, we delete the corresponding
C<new> object if we have it.

If there is nothing to be merged, nothing is done.
    my($self, $other) = @_;
    $self->_merge_sanitycheck ( $other );
    my $other_recent = $other->recent_events || [];
    # $DB::single++ if $other->interval_secs eq "2" and grep {$_->{epoch} eq "999.999"} @$other_recent;
    my $my_recent = $self->recent_events || [];

    # calculate the target time span
    my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
    my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
    my $oldest_allowed = 0;
    my $something_done;
    unless ($my_recent->[0]) {
        $something_done = 1; # obviously the first time we merge
    }
    if (_bigfloatgt($other->dirtymark, $self->dirtymark||0)) {
        $something_done = 1;
    } elsif (my $merged = $self->merged) {
        my $secs = $self->interval_secs();
        $oldest_allowed = min($epoch - $secs, $merged->{epoch}||0);
        if (@$other_recent and
            _bigfloatlt($other_recent->[-1]{epoch}, $oldest_allowed)
           ) {
            $oldest_allowed = $other_recent->[-1]{epoch};
        }
    }
    while (@$my_recent && _bigfloatlt($my_recent->[-1]{epoch}, $oldest_allowed)) {
        pop @$my_recent;
        $something_done = 1;
    }

    my %have_path;
    my $other_recent_filtered = [];
    for my $oev (@$other_recent) {
        my $oevepoch = $oev->{epoch} || 0;
        next if _bigfloatlt($oevepoch, $oldest_allowed);
        my $path = $oev->{path};
        next if $have_path{$path}++;
        if (   $self->interval eq "Z"
            and $oev->{type} eq "delete") {
            # no deletes in the Z file
        } else {
            if (!$myepoch || _bigfloatgt($oevepoch, $myepoch)) {
                $something_done = 1;
            }
            push @$other_recent_filtered, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
        }
    }
    if ($something_done) {
        $self->_merge_something_done ($other_recent_filtered, $my_recent, $other_recent, $other, \%have_path, $epoch);
    }
sub _merge_something_done {
    my($self, $other_recent_filtered, $my_recent, $other_recent, $other, $have_path, $epoch) = @_;
    my $recent = [];
    my $epoch_conflict = 0;
    my $last_epoch;
  ZIP: while (@$other_recent_filtered || @$my_recent) {
        my $event;
        if (!@$my_recent ||
            @$other_recent_filtered && _bigfloatge($other_recent_filtered->[0]{epoch},$my_recent->[0]{epoch})) {
            $event = shift @$other_recent_filtered;
        } else {
            $event = shift @$my_recent;
        }
        next ZIP if $have_path->{$event->{path}}++;
        $epoch_conflict=1 if defined $last_epoch && $event->{epoch} eq $last_epoch;
        $last_epoch = $event->{epoch};
        push @$recent, $event;
    }
    if ($epoch_conflict) {
        my %have_epoch;
        for (my $i = $#$recent;$i>=0;$i--) {
            my $epoch = $recent->[$i]{epoch};
            if ($have_epoch{$epoch}++) {
                while ($have_epoch{$epoch}) {
                    $epoch = _increase_a_bit($epoch);
                }
                $recent->[$i]{epoch} = $epoch;
                $have_epoch{$epoch}++;
            }
        }
    }
    if (!$self->dirtymark || _bigfloatgt($other->dirtymark, $self->dirtymark)) {
        $self->dirtymark ( $other->dirtymark );
    }
    $self->write_recent($recent);
    $other->merged({
                    time => Time::HiRes::time, # not used anywhere
                    epoch => $recent->[0]{epoch},
                    into_interval => $self->interval, # not used anywhere
                   });
    $other->write_recent($other_recent);
}
sub _merge_sanitycheck {
    my($self, $other) = @_;
    if ($self->interval_secs <= $other->interval_secs) {
        die sprintf
            "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
            $self->interval_secs,
            $other->interval_secs;
    }
}
Hashref denoting when this recentfile has been merged into some other
recentfile.

    my($self, $set) = @_;
    if (defined $set) {
        $self->_merged ($set);
    }
    my $merged = $self->_merged;
    my $into;
    if ($merged and $into = $merged->{into_interval} and defined $self->_interval) {
        # sanity checks
        if ($into eq $self->interval) {
            warn sprintf
                "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
                $into,
                $self->interval;
        } elsif ($self->interval_secs($into) < $self->interval_secs) {
            warn sprintf
                "Warning: into_interval_secs[%s] smaller than own interval_secs[%s] on interval[%s]. Danger ahead.",
                $self->interval_secs($into),
                $self->interval_secs,
                $self->interval;
        }
    }
    $merged;
=head2 $hashref = $obj->meta_data

Returns the hashref of metadata that the server has to add to the
I<recentfile>.

    my $ret = $self->{meta};
    # XXX need to reset the Producer if I am a writer, keep it when I
    # am a reader
    $ret->{Producers} ||= {
                           __PACKAGE__, "$VERSION", # stringified it looks better
                           'time', Time::HiRes::time,
                          };
    $ret->{dirtymark} ||= Time::HiRes::time;
    return $ret;
=head2 $success = $obj->mirror ( %options )

Mirrors the files in this I<recentfile> as reported by
C<recent_events>. Options named C<after>, C<before>, C<max>, and
C<skip-deletes> are passed through to the L<recent_events> call. The
boolean option C<piecemeal>, if true, causes C<mirror> to only rsync
C<max_files_per_connection> files and keep track of the rsynced files
so that future calls will rsync different files until all files are
done.

    my($self, %options) = @_;
    my $trecentfile = $self->get_remote_recentfile_as_tempfile();
    $self->_use_tempfile (1);
    my %passthrough = map { ($_ => $options{$_}) } qw(before after max skip-deletes);
    my ($recent_events) = $self->recent_events(%passthrough);
    my(@error, @dlcollector); # download-collector: array containing paths we need
    my $first_item = 0;
    my $last_item = $#$recent_events;
    my $done = $self->done;
    my $pathdb = $self->_pathdb;
  ITEM: for my $i ($first_item..$last_item) {
        my $status = +{};
        $self->_mirror_item
            (
             $i,
             $recent_events,
             $last_item,
             $done,
             $pathdb,
             \@dlcollector,
             \%options,
             $status,
             \@error,
            );
        last if $i == $last_item;
        return if $status->{mustreturn};
    }
    if (@dlcollector) {
        my $success = eval { $self->_mirror_dlcollector (\@dlcollector,$pathdb,$recent_events);};
        if (!$success || $@) {
            warn "Warning: Unknown error while mirroring: $@";
        }
    }
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
    # once we've gone to the end we consider ourselves free of obligations
    $self->_mirror_unhide_tempfile ($trecentfile);
    $self->_mirror_perform_delayed_ops;
    return !@error;
sub _mirror_item {
    my($self,
       $i,
       $recent_events,
       $last_item,
       $done,
       $pathdb,
       $dlcollector,
       $options,
       $status,
       $error,
      ) = @_;
    my $recent_event = $recent_events->[$i];
    return if $done->covered ( $recent_event->{epoch} );
    if ($pathdb) {
        my $rec = $pathdb->{$recent_event->{path}};
        if ($rec && $rec->{recentepoch}) {
            if (_bigfloatgt
                ( $rec->{recentepoch}, $recent_event->{epoch} )){
                $done->register ($recent_events, [$i]);
                return;
            }
        }
    }
    my $dst = $self->local_path($recent_event->{path});
    if ($recent_event->{type} eq "new"){
        $self->_mirror_item_new
            ($dst, $i, $last_item, $recent_event, $recent_events,
             $dlcollector, $pathdb, $options, $status);
    } elsif ($recent_event->{type} eq "delete") {
        my $activity;
        if ($options->{'skip-deletes'}) {
            $activity = "skipped";
        } else {
            if (! -e $dst) {
                $activity = "not_found";
            } elsif (-l $dst or not -d _) {
                $self->delayed_operations->{unlink}{$dst}++;
                $activity = "deleted";
            } else {
                $self->delayed_operations->{rmdir}{$dst}++;
                $activity = "deleted";
            }
        }
        $done->register ($recent_events, [$i]);
        if ($pathdb) {
            $self->_mirror_register_path($pathdb,[$recent_event],$activity);
        }
    } else {
        warn "Warning: invalid upload type '$recent_event->{type}'";
    }
}
sub _mirror_item_new {
    my($self, $dst, $i, $last_item, $recent_event, $recent_events,
       $dlcollector, $pathdb, $options, $status) = @_;
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        printf STDERR
            "%-4s %d (%d/%d/%s) %s ... ",
            $doing,
            time,
            1+$i,
            1+$last_item,
            $self->interval,
            $recent_event->{path};
    }
    my $max_files_per_connection = $self->max_files_per_connection || 42;
    my $success;
    push @$dlcollector, { rev => $recent_event, i => $i };
    if (@$dlcollector >= $max_files_per_connection) {
        $success = eval {$self->_mirror_dlcollector ($dlcollector,$pathdb,$recent_events);};
        my $sleep = $self->sleep_per_connection;
        $sleep = 0.42 unless defined $sleep;
        Time::HiRes::sleep $sleep;
        if ($options->{piecemeal}) {
            $status->{mustreturn} = 1;
        }
    } else {
        return;
    }
    if (!$success || $@) {
        warn "Warning: Error while mirroring: $@";
    }
    if ($self->verbose) {
        print STDERR "DONE\n";
    }
}
sub _mirror_dlcollector {
    my($self,$xcoll,$pathdb,$recent_events) = @_;
    my $success = $self->mirror_path([map {$_->{rev}{path}} @$xcoll]);
    if ($pathdb) {
        $self->_mirror_register_path($pathdb,[map {$_->{rev}} @$xcoll],"rsync");
    }
    $self->done->register($recent_events, [map {$_->{i}} @$xcoll]);
    @$xcoll = ();
    return $success;
}
sub _mirror_register_path {
    my($self,$pathdb,$coll,$activity) = @_;
    my $time = time;
    for my $item (@$coll) {
        $pathdb->{$item->{path}} =
            {
             recentepoch => $item->{epoch},
             ($activity."_on") => $time,
            };
    }
}
sub _mirror_unhide_tempfile {
    my($self, $trecentfile) = @_;
    my $rfile = $self->rfile;
    if (rename $trecentfile, $rfile) {
        # warn "DEBUG: renamed '$trecentfile' to '$rfile'";
    } else {
        require Carp;
        Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
    }
    $self->_use_tempfile (0);
    if (my $ctfh = $self->_current_tempfile_fh) {
        $ctfh->unlink_on_destroy (0);
        $self->_current_tempfile_fh (undef);
    }
}
sub _mirror_perform_delayed_ops {
    my($self) = @_;
    my $delayed = $self->delayed_operations;
    for my $dst (keys %{$delayed->{unlink}}) {
        unless (unlink $dst) {
            require Carp;
            Carp::cluck ( "Warning: Error while unlinking '$dst': $!" );
        }
        delete $delayed->{unlink}{$dst};
    }
    for my $dst (keys %{$delayed->{rmdir}}) {
        unless (rmdir $dst) {
            require Carp;
            Carp::cluck ( "Warning: Error on rmdir '$dst': $!" );
        }
        delete $delayed->{rmdir}{$dst};
    }
}
=head2 (void) $obj->mirror_loop

Run mirror in an endless loop. See the accessor C<loopinterval>. XXX
What happens/should happen if we miss the interval during a single loop?

    my($self) = @_;
    my $iteration_start = time;
    my $Signal = 0;
    $SIG{INT} = sub { $Signal++ };
    my $loopinterval = $self->loopinterval || 42;
    my $after = -999999999;
  LOOP: while () {
        $self->mirror($after);
        last LOOP if $Signal;
        my $re = $self->recent_events;
        $after = $re->[0]{epoch};
        if ($self->verbose) {
        }
        if (time - $iteration_start < $loopinterval) {
            sleep $iteration_start + $loopinterval - time;
        }
        if ($self->verbose) {
        }
        $iteration_start = time;
    }
=head2 $success = $obj->mirror_path ( $arrref | $path )

If the argument is a scalar, it is treated as a path. The remote path
is mirrored into the local copy. $path is the path found in the
I<recentfile>, i.e. it is relative to the root directory of the
mirror.

If the argument is an array reference, then all elements are treated
as paths below the current tree and all are rsynced with a single
command (and a single connection).

    my($self,$path) = @_;
    # XXX simplify the two branches such that $path is treated as
    # [$path] maybe even demand the argument as an arrayref to
    # simplify docs and code. (rsync-over-recentfile-2.pl uses the
    if (ref $path and ref $path eq "ARRAY") {
        my $dst = $self->localroot;
        mkpath dirname $dst;
        my($fh) = File::Temp->new(TEMPLATE => sprintf(".%s-XXXX",
                                                      lc $self->filenameroot,
                                                     ),
                                 );
        for my $p (@$path) {
            print $fh $p, "\n";
        }
        $fh->flush;
        $fh->unlink_on_destroy(1);
        my $gaveup = 0;
        my $retried = 0;
        while (!$self->rsync->exec
               (
                src => join("/", $self->remoteroot),
                dst => $dst,
                'files-from' => $fh->filename,
               )) {
            my(@err) = $self->rsync->err;
            if ($self->ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
                if ($self->verbose) {
                    warn "Info: ignoring link_stat error '@err'";
                }
                last;
            }
            $self->register_rsync_error (@err);
            if (++$retried >= 3) {
                warn "XXX giving up.";
                $gaveup = 1;
                last;
            }
        }
        unless ($gaveup) {
            $self->un_register_rsync_error ();
        }
    } else {
        my $dst = $self->local_path($path);
        mkpath dirname $dst;
        while (!$self->rsync->exec
               (
                src => join("/", $self->remoteroot, $path),
                dst => $dst,
               )) {
            my(@err) = $self->rsync->err;
            if ($self->ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
                if ($self->verbose) {
                    warn "Info: ignoring link_stat error '@err'";
                }
                last;
            }
            $self->register_rsync_error (@err);
        }
        $self->un_register_rsync_error ();
    }
    return 1;
sub _my_current_rfile {
    my($self) = @_;
    my $rfile;
    if ($self->_use_tempfile) {
        $rfile = $self->_current_tempfile;
    } else {
        $rfile = $self->rfile;
    }
    return $rfile;
}
=head2 $path = $obj->naive_path_normalize ($path)

Takes an absolute unix-style path as argument and canonicalizes it to
a shorter path if possible, removing things like double slashes and
C</./>, and removing references to C<../> directories, to get a
shorter, unambiguous path. This is used to simplify the code that
determines whether a file passed to C<upgrade()> is indeed below our
C<localroot>.

sub naive_path_normalize {
    my($self,$path) = @_;
    1 while $path =~ s|/[^/]+/\.\./|/|;
    $path;
}
=head2 $ret = $obj->read_recent_1 ( $data )

Delegate of C<recent_events()> on protocol 1.

    my($self, $data) = @_;
    return $data->{recent};
=head2 $array_ref = $obj->recent_events ( %options )

Note: the code relies on the resource being written atomically. We
cannot lock because we may have no write access. If the caller has
write access (e.g. aggregate() or update()), it has to take care of
any necessary locking, and it MUST write atomically.

If $options{after} is specified, only file events after this timestamp
are returned.

If $options{before} is specified, only file events before this
timestamp are returned.

If $options{'skip-deletes'} is specified, no files-to-be-deleted are
returned.

If $options{max} is specified, only a maximum of this many events is
returned.

If $options{contains} is specified, the value must be a hash reference
containing a query. The query may contain the keys C<epoch>, C<path>,
and C<type>. Each represents a condition that must be met. If there is
more than one such key, the conditions are ANDed.

If $options{info} is specified, it must be a hashref. This hashref
will be filled with metadata about the unfiltered recent_events of
this object: in key C<first> there is the first item, in key C<last>
the last.
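The option semantics can be modelled with a toy filter over an in-memory event list (a sketch only, with made-up event data and a hypothetical function name; the real method additionally handles C<contains> and C<info> and reads the events from the file):

```perl
use strict;
use warnings;

my $events = [   # newest first, as in a recentfile
    { epoch => 400, path => "id/D", type => "delete" },
    { epoch => 300, path => "id/C", type => "new"    },
    { epoch => 200, path => "id/B", type => "new"    },
    { epoch => 100, path => "id/A", type => "new"    },
];

sub filter_events {
    my($re, %options) = @_;
    my @out = @$re;
    @out = grep { $_->{epoch} > $options{after} }  @out if defined $options{after};
    @out = grep { $_->{epoch} < $options{before} } @out if defined $options{before};
    @out = grep { $_->{type} ne "delete" } @out if $options{'skip-deletes'};
    splice @out, $options{max} if $options{max} && @out > $options{max};
    return \@out;
}

my $got = filter_events($events, after => 100, 'skip-deletes' => 1, max => 2);
print join(",", map { $_->{path} } @$got), "\n";  # id/C,id/B
```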
    my ($self, %options) = @_;
    my $info = $options{info};
    if ($self->is_slave) {
        # XXX seems dubious, might produce tempfiles without removing them?
        $self->get_remote_recentfile_as_tempfile;
    }
    my $rfile_or_tempfile = $self->_my_current_rfile or return [];
    -e $rfile_or_tempfile or return [];
    my $suffix = $self->serializer_suffix;
    my ($data) = eval {
        $self->_try_deserialize
            (
             $suffix,
             $rfile_or_tempfile,
            );
    };
    my $err = $@;
    if ($err or !$data) {
        return [];
    }
    my $re;
    if (reftype $data eq 'ARRAY') { # protocol 0
        $re = $data;
    } else {
        $re = $self->_recent_events_protocol_x
            (
             $data,
             $rfile_or_tempfile,
            );
    }
    return $re unless grep {defined $options{$_}} qw(after before max);
    $self->_recent_events_handle_options ($re, \%options);
sub _recent_events_handle_options {
    my($self, $re, $options) = @_;
    my $last_item = $#$re;
    my $info = $options->{info};
    if ($info) {
        $info->{first} = $re->[0];
        $info->{last} = $re->[-1];
    }
    my $first_item = 0;
    if (defined $options->{after}) {
        if ($re->[0]{epoch} > $options->{after}) {
            if (my $f = first
                {$re->[$_]{epoch} <= $options->{after}}
                0..$#$re) {
                $last_item = $f-1;
            }
        } else {
            $last_item = -1;
        }
    }
    if (defined $options->{before}) {
        if ($re->[0]{epoch} > $options->{before}) {
            if (my $f = first
                {$re->[$_]{epoch} < $options->{before}}
                0..$#$re) {
                $first_item = $f;
            }
        }
    }
    if (0 != $first_item || -1 != $last_item) {
        @$re = splice @$re, $first_item, 1+$last_item-$first_item;
    }
    if ($options->{'skip-deletes'}) {
        @$re = grep { $_->{type} ne "delete" } @$re;
    }
    if (my $contopt = $options->{contains}) {
        my $seen_allowed = 0;
        for my $allow (qw(epoch path type)) {
            if (exists $contopt->{$allow}) {
                $seen_allowed++;
                my $v = $contopt->{$allow};
                @$re = grep { $_->{$allow} eq $v } @$re;
            }
        }
        if (keys %$contopt > $seen_allowed) {
            require Carp;
            Carp::confess
                (sprintf "unknown query: %s", join ", ", %$contopt);
        }
    }
    if ($options->{max} && @$re > $options->{max}) {
        @$re = splice @$re, 0, $options->{max};
    }
    $re;
}
sub _recent_events_protocol_x {
    my $meth = sprintf "read_recent_%d", $data->{meta}{protocol};
    # we may be reading meta for the first time
    while (my($k,$v) = each %{$data->{meta}}) {
        next if $k ne lc $k; # "Producers"
        next if defined $self->$k;
    my $re = $self->$meth ($data);
    my @stat = stat $rfile_or_tempfile or die "Cannot stat '$rfile_or_tempfile': $!";
    my $minmax = { mtime => $stat[9] };
        $minmax->{min} = $re->[-1]{epoch};
        $minmax->{max} = $re->[0]{epoch};
    $self->minmax ( $minmax );
sub _try_deserialize {
    if ($suffix eq ".yaml") {
        YAML::Syck::LoadFile($rfile_or_tempfile);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        open my $fh, $rfile_or_tempfile or die "Could not open: $!";
        $serializer->raw_deserialize($serialized);
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
sub _refresh_internals {
    my($self, $dst) = @_;
    my $class = ref $self;
    my $rfpeek = $class->new_from_file ($dst);
        $self->$acc ( $rfpeek->$acc );
    my $old_dirtymark = $self->dirtymark;
    my $new_dirtymark = $rfpeek->dirtymark;
    if ($old_dirtymark && $new_dirtymark && _bigfloatgt($new_dirtymark,$old_dirtymark)) {
        $self->dirtymark ( $new_dirtymark );
=head2 $ret = $obj->rfilename

Just the basename of our I<recentfile>, composed from C<filenameroot>,
a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>

    my $file = sprintf("%s-%s%s",
                       $self->filenameroot,
                       $self->serializer_suffix,
=head2 $str = $self->remote_dir

The directory we are mirroring from.

    my($self, $set) = @_;
        $self->_remote_dir ($set);
    my $x = $self->_remote_dir;
    $self->is_slave (1);
=head2 $str = $obj->remoteroot

=head2 (void) $obj->remoteroot ( $set )

Get/Set the composed prefix needed when rsyncing from a remote module.
If remote_host, remote_module, and remote_dir are set, it is composed
from these.
    my($self, $set) = @_;
        $self->_remoteroot($set);
    my $remoteroot = $self->_remoteroot;
    unless (defined $remoteroot) {
        $remoteroot = sprintf
             defined $self->remote_host ? ($self->remote_host."::") : "",
             defined $self->remote_module ? ($self->remote_module."/") : "",
             defined $self->remote_dir ? $self->remote_dir : "",
        $self->_remoteroot($remoteroot);
=head2 (void) $obj->resolve_recentfilename ( $recentfilename )

Inverse method to L<rfilename>. $recentfilename is a plain filename
of the pattern

    $filenameroot-$interval$serializer_suffix

This filename is split into its parts and the parts are fed to the
object itself.
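For example (a sketch; the filename is made up):

    $rf->resolve_recentfilename("RECENT-1M.yaml");
    # now filenameroot is "RECENT", interval is "1M",
    # and serializer_suffix is ".yaml"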
sub resolve_recentfilename {
    my($self, $rfname) = @_;
    my($splitter) = qr(^(.+)-([^-\.]+)(\.[^\.]+));
    if (my($f,$i,$s) = $rfname =~ $splitter) {
        $self->filenameroot ($f);
        $self->interval ($i);
        $self->serializer_suffix ($s);
        die "Alert: cannot split '$rfname', doesn't match '$splitter'";
=head2 my $rfile = $obj->rfile

Returns the full path of the I<recentfile>

    my $rfile = $self->_rfile;
    return $rfile if defined $rfile;
    $rfile = File::Spec->catfile
    $self->_rfile ($rfile);
=head2 $rsync_obj = $obj->rsync

The File::Rsync object that this object uses for communicating with
an upstream node.
    my $rsync = $self->_rsync;
    unless (defined $rsync) {
        my $rsync_options = $self->rsync_options || {};
        if ($HAVE->{"File::Rsync"}) {
            $rsync = File::Rsync->new($rsync_options);
            $self->_rsync($rsync);
            die "File::Rsync required for rsync operations. Cannot continue";
=head2 (void) $obj->register_rsync_error(@err)

=head2 (void) $obj->un_register_rsync_error()

Register_rsync_error is called whenever the File::Rsync object fails
on an exec (say, the connection doesn't succeed). It issues a warning
and sleeps for an increasing amount of time. Un_register_rsync_error
resets the error count. See also accessor C<max_rsync_errors>.
    my $no_success_count = 0;
    my $no_success_time = 0;
    sub register_rsync_error {
        my($self, @err) = @_;
        $no_success_time = time;
        $no_success_count++;
        my $max_rsync_errors = $self->max_rsync_errors;
        $max_rsync_errors = MAX_INT unless defined $max_rsync_errors;
        if ($max_rsync_errors>=0 && $no_success_count >= $max_rsync_errors) {
                 "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
        my $sleep = 12 * $no_success_count;
        $sleep = 300 if $sleep > 300;
                 "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
                 scalar(localtime($no_success_time)),
    sub un_register_rsync_error {
        $no_success_time = 0;
        $no_success_count = 0;
=head2 $clone = $obj->_sparse_clone

Clones just as much from itself as does not hurt. Experimental
method.

Note: what fits better: sparse or shallow? Other suggestions?
    my $new = bless {}, ref $self;
                     max_files_per_connection
                     sleep_per_connection
        $o = Storable::dclone $o if ref $o;
=head2 $boolean = OBJ->ttl_reached ()

    my $have_mirrored = $self->have_mirrored || 0;
    my $now = Time::HiRes::time;
    my $ttl = $self->ttl;
    $ttl = 24.2 unless defined $ttl;
    if ($now > $have_mirrored + $ttl) {
=head2 (void) $obj->unlock()

Unlocking is implemented with an C<rmdir> on a locking directory
(C<.lock> appended to $rfile).

    return unless $self->_is_locked;
    my $rfile = $self->rfile;
    rmdir "$rfile.lock";
    $self->_is_locked (0);

Sets this recentfile in the state of not 'seeded'.
=head2 $ret = $obj->update ($path, $type)

=head2 $ret = $obj->update ($path, "new", $dirty_epoch)

=head2 $ret = $obj->update ()

Enter one file into the local I<recentfile>. $path is the (usually
absolute) path. If the path is outside I<our> tree, it is ignored.

$type is one of C<new> or C<delete>.

Events of type C<new> may set $dirty_epoch. $dirty_epoch is normally
not used and the epoch is calculated by the update() routine itself
based on the current time. But if there is demand to insert a
not-so-current file into the dataset, then the caller sets
$dirty_epoch. This causes the epoch of the registered event to become
$dirty_epoch or -- if the exact value given is already taken -- a
tiny bit more. As compensation, the dirtymark of the whole dataset is
set to the current epoch. Note: setting the dirty_epoch to a point in
the future is prohibited, as it is very unlikely to be intended and
might wreak havoc with the index files.

The new file event is unshifted (or, if dirty_epoch is set, inserted
at the place it belongs to, according to the rule of a sequence of
strictly decreasing timestamps) onto the array of recent_events, and
the array is shortened to the length of the timespan allowed. This is
usually the timespan specified by the interval of this recentfile,
but as long as this recentfile has not been merged into another one,
the timespan may grow without bounds.

The third form runs an update without inserting a new file. This may
be desired to truncate a recentfile.
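A typical writer calls update() once per filesystem change, as in
this sketch (paths and the dirty epoch are made up):

    # register a new file with an epoch derived from the current time
    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AA/AAA/Foo-1.01.tar.gz", "new");

    # register a removal
    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AA/AAA/Foo-1.00.tar.gz", "delete");

    # backfill an old file; sets the dirtymark as described above
    $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AA/AAA/Old-0.01.tar.gz",
                "new", 1199145600.0);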
sub _epoch_monotonically_increasing {
    my($self,$epoch,$recent) = @_;
    return $epoch unless @$recent; # the first one goes unoffended
    if (_bigfloatgt("".$epoch,$recent->[0]{epoch})) {
        return _increase_a_bit($recent->[0]{epoch});
    my($self,$path,$type,$dirty_epoch) = @_;
    if (defined $path or defined $type or defined $dirty_epoch) {
        die "update called without path argument" unless defined $path;
        die "update called without type argument" unless defined $type;
        die "update called with illegal type argument: $type" unless $type =~ /(new|delete)/;
        die "update called with \$type=$type and \$dirty_epoch=$dirty_epoch; ".
            "dirty_epoch only allowed with type=new" if defined $dirty_epoch and $type ne "new";
        my $canonmeth = $self->canonize;
        unless ($canonmeth) {
            $canonmeth = "naive_path_normalize";
        $path = $self->$canonmeth($path);
    my $lrd = $self->localroot;
        # you must calculate the time after having locked, of course
        my $now = Time::HiRes::time;
        my $interval = $self->interval;
        my $secs = $self->interval_secs();
        my $recent = $self->recent_events;
        if (defined $dirty_epoch && _bigfloatgt($now,$dirty_epoch)) {
            $epoch = $dirty_epoch;
            $epoch = $self->_epoch_monotonically_increasing($now,$recent);
        my $oldest_allowed = 0;
        my $merged = $self->merged;
        if ($merged->{epoch}) {
            my $virtualnow = max($now,$epoch);
            # for the lower bound could we need big math?
            $oldest_allowed = min($virtualnow - $secs, $merged->{epoch}, $epoch);
            # as long as we are not merged at all, no limits!
        my $something_done = 0;
      TRUNCATE: while (@$recent) {
            # $DB::single++ unless defined $oldest_allowed;
            if (_bigfloatlt($recent->[-1]{epoch}, $oldest_allowed)) {
                $something_done = 1;
        if (defined $path && $path =~ s|^\Q$lrd\E||) {
            # remove the older duplicates of this $path, irrespective of $type:
            if (defined $dirty_epoch) {
                my $ctx = $self->_update_with_dirty_epoch($path,$recent,$epoch);
                $recent = $ctx->{recent};
                $splicepos = $ctx->{splicepos};
                $epoch = $ctx->{epoch};
                my $dirtymark = $self->dirtymark;
                if (_bigfloatgt($epoch, $now)) {
                    $self->dirtymark($new_dm);
                    my $merged = $self->merged;
                    if (not defined $merged->{epoch} or _bigfloatlt($epoch,$merged->{epoch})) {
                $recent = [ grep { $_->{path} ne $path } @$recent ];
            if (defined $splicepos) {
                splice @$recent, $splicepos, 0, { epoch => $epoch, path => $path, type => $type };
            $something_done = 1;
        $self->write_recent($recent) if $something_done;
        $self->_assert_symlink;
sub _update_with_dirty_epoch {
    my($self,$path,$recent,$epoch) = @_;
    my $new_recent = [];
    if (grep { $_->{path} eq $path } @$recent) {
      KNOWN_EVENT: for my $i (0..$#$recent) {
            if ($recent->[$i]{path} eq $path) {
                if ($recent->[$i]{epoch} eq $epoch) {
                push @$new_recent, $recent->[$i];
        @$recent = @$new_recent unless $cancel;
    if (!exists $recent->[0] or _bigfloatgt($epoch,$recent->[0]{epoch})) {
    } elsif (_bigfloatlt($epoch,$recent->[-1]{epoch})) {
        $splicepos = @$recent;
      RECENT: for my $i (0..$#$recent) {
            my $ev = $recent->[$i];
            if ($epoch eq $recent->[$i]{epoch}) {
                $epoch = _increase_a_bit($epoch, $i ? $recent->[$i-1]{epoch} : undef);
            if (_bigfloatgt($epoch,$recent->[$i]{epoch})) {
             splicepos => $splicepos,
Sets this recentfile in the state of 'seeded', which means it has to
re-evaluate its uptodateness.

Tells if the recentfile is in the state 'seeded'.

    my($self, $set) = @_;
        $self->_seeded ($set);
    my $x = $self->_seeded;
    unless (defined $x) {
        $self->_seeded ($x);
True if this object has mirrored the complete interval covered by the
current recentfile.
    if ($self->_uptodateness_ever_reached and not $self->seeded) {
    # it's too easy to misconfigure ttl and related timings and then
    # never reach uptodateness, so disabled 2009-03-22
    if (0 and not defined $uptodate) {
        if ($self->ttl_reached){
            $why = "ttl_reached returned true, so we are not uptodate";
    unless (defined $uptodate) {
        # look if recentfile has unchanged timestamp
        my $minmax = $self->minmax;
        if (exists $minmax->{mtime}) {
            my $rfile = $self->_my_current_rfile;
            my @stat = stat $rfile;
            my $mtime = $stat[9];
            if (defined $mtime && defined $minmax->{mtime} && $mtime > $minmax->{mtime}) {
                $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
                my $covered = $self->done->covered(@$minmax{qw(max min)});
                $why = sprintf "minmax covered[%s], so we return that", defined $covered ? $covered : "UNDEF";
                $uptodate = $covered;
    unless (defined $uptodate) {
        $why = "fallthrough, so not uptodate";
        $self->_uptodateness_ever_reached(1);
                     uptodate => $uptodate,
    $self->_remember_last_uptodate_call($remember);
=head2 $obj->write_recent ($recent_files_arrayref)

Writes a I<recentfile> based on the current reflection of the state
of the tree, limited by the current interval.
    @{$_[1]} = sort { _bigfloatcmp($b->{epoch},$a->{epoch}) } @{$_[1]};

    my ($self,$recent) = @_;
    die "write_recent called without argument" unless defined $recent;
  SANITYCHECK: for my $i (0..$#$recent) {
        if (defined($Last_epoch) and _bigfloatge($recent->[$i]{epoch},$Last_epoch)) {
            Carp::confess(sprintf "Warning: disorder '%s'>='%s', re-sorting %s\n",
                          $recent->[$i]{epoch}, $Last_epoch, $self->interval);
            # $self->_resort($recent);
        $Last_epoch = $recent->[$i]{epoch};
    my $minmax = $self->minmax;
    if (!defined $minmax->{max} || _bigfloatlt($minmax->{max},$recent->[0]{epoch})) {
        $minmax->{max} = $recent->[0]{epoch};
    if (!defined $minmax->{min} || _bigfloatlt($minmax->{min},$recent->[-1]{epoch})) {
        $minmax->{min} = $recent->[-1]{epoch};
    $self->minmax($minmax);
    my $meth = sprintf "write_%d", $self->protocol;
    $self->$meth($recent);
=head2 $obj->write_0 ($recent_files_arrayref)

Delegate of C<write_recent()> on protocol 0

    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    YAML::Syck::DumpFile("$rfile.new",$recent);
    rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
=head2 $obj->write_1 ($recent_files_arrayref)

Delegate of C<write_recent()> on protocol 1

    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    my $suffix = $self->serializer_suffix;
         meta => $self->meta_data,
    if ($suffix eq ".yaml") {
        $serialized = YAML::Syck::Dump($data);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $serialized = $serializer->raw_serialize($data);
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    open my $fh, ">", "$rfile.new" or die "Could not open >'$rfile.new': $!";
    print $fh $serialized;
    close $fh or die "Could not close '$rfile.new': $!";
    rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";

        split /\n/, <<'=cut'; %serializers = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
=head1 THE ARCHITECTURE OF A COLLECTION OF RECENTFILES

The idea is that we want to have a short file that records really
recent changes, so that a fresh mirror can be kept fresh as long as
the connectivity is given. Then we want longer files that record the
history before that. So when the mirror falls behind the update
period reflected in the shortest file, it can complement the list of
recent file events with the next one. And if this is not long enough
we want another one, again a bit longer. And we want one that
completes the history back to the oldest file. The index files do
contain the complete list of current files. The further back the
period covered by an index file lies, the less often that file is
updated. For practical reasons adjacent files will often overlap a
bit, but this is neither necessary nor enforced. That's the basic
idea. The following example represents a tree that has a few updates
every day:
    RECENT.recent -> RECENT-1h.yaml
The first file is the principal file, insofar as it is the one that
is written first after a filesystem change. Usually a symlink links
to it with a filename that has the same filenameroot and the suffix
C<.recent>. On systems that do not support symlinks there is a plain
copy maintained instead.

The last file, the Z file, contains the complementary files that are
in none of the other files. It never contains C<deletes>. Besides
this it serves the role of a recovery mechanism or spill-over pond.
When things go wrong, it's a valuable controlling instance to hold
the differences between the collection of limited interval files and
=head2 THE INDIVIDUAL RECENTFILE

A I<recentfile> consists of a hash that has two keys: C<meta> and
C<recent>. The C<meta> part has metadata and the C<recent> part has a
list of fileobjects.
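Schematically, a protocol 1 I<recentfile> in YAML serialization looks
like this (all values are invented for illustration and not all meta
fields are shown):

    ---
    meta:
      filenameroot: RECENT
      interval: 1h
      protocol: 1
    recent:
      - epoch: 1222222222.2
        path: id/A/AN/ANDK/CPAN-1.92_63.tar.gz
        type: new
      - epoch: 1222222000.1
        path: id/A/AN/ANDK/CPAN-1.92_62.tar.gz
        type: delete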
=head2 THE META PART

Here we find things that are pretty much self-explanatory: all
lowercase attributes are accessors and as such explained somewhere
above in this manpage. The uppercase attribute C<Producers> contains
version information about involved software components. Nothing to
worry about, I believe.
=head2 THE RECENT PART

This is the interesting part. Every entry refers to some filesystem
change (with path, epoch, type). The epoch value is the point in time
when some change was I<registered>. Do not be tempted to believe that
the entry has a direct relation to something like modification time
or change time on the filesystem level. The timestamp (I<epoch>
element) is a floating point number and practically never corresponds
exactly to the data recorded in the filesystem, but rather to the
time when some process succeeded in reporting some filesystem change
to the I<recentfile> mechanism. This is why many parts of the code
refer to I<events>: we merely try to record the I<event> of the
discovery of a change, not the time of the change itself.
All these entries can be divided into two types (denoted by the
C<type> attribute): C<new>s and C<delete>s. Changes and creations are
C<new>s. Deletes are C<delete>s.

Besides an C<epoch> and a C<type> attribute we find a third one:
C<path>. This path is relative to the directory we find the
I<recentfile> in.

The order of the entries in the I<recentfile> is by decreasing epoch
attribute. These are unique floating point numbers. When the server
has ntp running correctly, the timestamps usually reflect a real
epoch. If time is running backwards, we trump the system epoch with
strictly monotonically increasing floating point timestamps and
guarantee they are unique.
=head1 CORRUPTION AND RECOVERY

If the origin host breaks the promise to deliver consistent and
complete I<recentfiles>, then the way back to sanity shall be
achieved through traditional rsyncing between the hosts. But don't
forget to report it as a bug :)
The following suffixes are supported and trigger the use of these
serializers:

=item C<< ".yaml" => "YAML::Syck" >>

=item C<< ".json" => "JSON" >>

=item C<< ".sto" => "Storable" >>

=item C<< ".dd" => "Data::Dumper" >>
        split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
=head1 INTERVAL SPEC

An interval spec is a primitive way to express time spans. Normally
it is composed of an integer and a letter.

As a special case, a string that consists only of the single letter
C<Z> stands for unlimited time.

The following letters express the specified number of seconds:

=item C<< m => 60 >>

=item C<< h => 60*60 >>

=item C<< d => 60*60*24 >>

=item C<< W => 60*60*24*7 >>

=item C<< M => 60*60*24*30 >>

=item C<< Q => 60*60*24*90 >>

=item C<< Y => 60*60*24*365.25 >>
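So C<30m> denotes 1800 seconds and C<2d> two days. The arithmetic can
be sketched like this (a hypothetical helper, not part of the
module's API):

    my %seconds = (m => 60, h => 60*60, d => 60*60*24, W => 60*60*24*7,
                   M => 60*60*24*30, Q => 60*60*24*90, Y => 60*60*24*365.25);
    sub spec_secs {
        my($spec) = @_;
        return 9**9**9 if $spec eq "Z";  # effectively unlimited
        my($n,$t) = $spec =~ /^([0-9]*)([mhdWMQY])$/
            or die "illegal interval spec '$spec'";
        return ($n || 1) * $seconds{$t};
    }
    # spec_secs("6h") == 21600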
This is about speeding up rsync operation on large trees. Uses a
small metadata cocktail and pull technology.

=head2 NON-COMPETITORS

    File::Mirror        JWU/File-Mirror/File-Mirror-0.10.tar.gz only local trees
    Mirror::YAML        ADAMK/Mirror-YAML-0.03.tar.gz some sort of inner circle
    Net::DownloadMirror KNORR/Net-DownloadMirror-0.04.tar.gz FTP sites and stuff
    Net::MirrorDir      KNORR/Net-MirrorDir-0.05.tar.gz ditto
    Net::UploadMirror   KNORR/Net-UploadMirror-0.06.tar.gz ditto
    Pushmi::Mirror      CLKAO/Pushmi-v1.0.0.tar.gz something SVK

    rsnapshot           www.rsnapshot.org focus on backup
    csync               www.csync.org more like unison
    multi-rsync         sourceforge 167893 lan push to many
The problem to solve, which clusters and FTP mirrors and otherwise
replicated datasets like CPAN share: how to transfer only a minimum
amount of data to determine the diff between two hosts.

Normally it takes a long time to determine the diff itself before it
can be transferred. Known solutions at the time of this writing are
csync2 and rsync 3 batch mode.

For many years the best solution was csync2, which solves the problem
by maintaining an SQLite database on both ends and talking a highly
sophisticated protocol to quickly determine which files to send and
which to delete at any given point in time. Csync2 is often
inconvenient because it is push technology and the act of syncing
demands quite an intimate relationship between the sender and the
receiver. This is hard to achieve in an environment of loosely
coupled sites where the number of sites is large, connections are
unreliable, or network topology is changing.
Rsync 3 batch mode works around these problems by providing rsync-able
batch files which allow receiving nodes to replay the history of the
other nodes. This reduces the need to have an incestuous relation,
but it has the disadvantage that these batch files replicate the
contents of the involved files. This seems inappropriate when the
nodes already have a means of communicating over rsync.

rersyncrecent solves this problem with a couple of (usually 2-10)
index files which cover different overlapping time intervals. The
master writes these files and the clients/slaves can construct the
full tree from the information contained in them. The most recent
index file usually covers the last seconds or minutes or hours of the
tree, and depending on their needs, slaves can rsync every few
seconds or minutes and then bring their trees into full sync.

The rersyncrecent mode was developed for CPAN but I hope it is a
convenient and economic general-purpose solution. I'm looking forward
to seeing a CPAN backbone that is only a few seconds behind PAUSE.
And then ... the first FUSE-based CPAN filesystem, anyone?
=head1 FUTURE DIRECTIONS

Currently the origin server must keep track of injected and removed
files. Should be supported by an inotify-based assistant.

Barbie is providing a database of release dates. See
http://use.perl.org/~barbie/journal/37907

Please report any bugs or feature requests through the web interface
L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
I will be notified, and then you'll automatically be notified of
progress on your bug as I make changes.

Memory hungry: it seems all memory is allocated during the initial
rsync where a list of all files is maintained in memory.

You can find documentation for this module with the perldoc command.

    perldoc File::Rsync::Mirror::Recentfile

You can also look for information at:

=item * RT: CPAN's request tracker

L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>

=item * AnnoCPAN: Annotated CPAN documentation

L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>

=item * CPAN Ratings

L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>

L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>

=head1 ACKNOWLEDGEMENTS

Thanks to RJBS for module-starter.

=head1 COPYRIGHT & LICENSE

Copyright 2008,2009 Andreas König.

This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.

1; # End of File::Rsync::Mirror::Recentfile

# cperl-indent-level: 4