prevent duplicate appearance of one interval when calculating aggregate()
[rersyncrecent.git] / lib / File / Rsync / Mirror / Recentfile.pm
blob183b1a19c66a786f0d67bf82b78a567f5df48a08
1 package File::Rsync::Mirror::Recentfile;
3 # use warnings;
4 use strict;
6 =encoding utf-8
8 =head1 NAME
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
12 =cut
# Optional dependencies: probe each module once at load time and remember
# whether it could be loaded, so callers can degrade gracefully.
my $HAVE = {};
for my $package ("Data::Serializer", "File::Rsync") {
    (my $module_file = "$package.pm") =~ s{::}{/}g;
    $HAVE->{$package} = eval { require $module_file; 1 };
}
21 use Config;
22 use File::Basename qw(basename dirname fileparse);
23 use File::Copy qw(cp);
24 use File::Path qw(mkpath);
25 use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
26 use File::Temp;
27 use List::Util qw(first max min);
28 use Scalar::Util qw(reftype);
29 use Storable;
30 use Time::HiRes qw();
31 use YAML::Syck;
use version; our $VERSION = qv('0.0.8');

use constant {
    # Largest native signed integer (all bits of ~0 except the top one);
    # anything better?
    MAX_INT          => ~0 >> 1,
    DEFAULT_PROTOCOL => 1,
};

# cf. interval_secs
my %seconds;

# maybe subclass if this mapping is bad?
my %serializers;
44 =head1 SYNOPSIS
46 Writer (of a single file):
48 use File::Rsync::Mirror::Recentfile;
    my $rf = File::Rsync::Mirror::Recentfile->new
51 interval => q(6h),
52 filenameroot => "RECENT",
53 comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
54 localroot => "/home/ftp/pub/PAUSE/authors/",
55 aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
57 $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
59 Reader/mirrorer:
61 my $rf = File::Rsync::Mirror::Recentfile->new
63 filenameroot => "RECENT",
64 interval => q(6h),
65 localroot => "/home/ftp/pub/PAUSE/authors",
66 remote_dir => "",
67 remote_host => "pause.perl.org",
68 remote_module => "authors",
69 rsync_options => {
70 compress => 1,
71 'rsync-path' => '/usr/bin/rsync',
72 links => 1,
73 times => 1,
74 'omit-dir-times' => 1,
75 checksum => 1,
77 verbose => 1,
79 $rf->mirror;
81 Aggregator (usually the writer):
83 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
84 $rf->aggregate;
86 =head1 DESCRIPTION
Lower level than F:R:M:Recent: handles a single recentfile. A tree is
always composed of several recentfiles and is controlled by an
F:R:M:Recent object; a Recentfile object only does the bookkeeping
for a single timeslice.
93 =head1 EXPORT
95 No exports.
97 =head1 CONSTRUCTORS / DESTRUCTOR
99 =head2 my $obj = CLASS->new(%hash)
101 Constructor. On every argument pair the key is a method name and the
102 value is an argument to that method name.
If a recentfile for this resource already exists, metadata that are
not defined by the constructor will be fetched from there as soon as
it is read by recent_events().
108 =cut
sub new {
    my ($class, @args) = @_;
    my $self = bless {}, $class;
    # Each argument pair is (method name, value): call the method with it.
    while (my ($method, $arg) = splice @args, 0, 2) {
        $self->$method($arg);
    }
    # Fill in defaults for anything the caller left undefined, in this order.
    my @defaults = (
        protocol          => DEFAULT_PROTOCOL,
        filenameroot      => "RECENT",
        serializer_suffix => ".yaml",
    );
    while (my ($accessor, $default) = splice @defaults, 0, 2) {
        $self->$accessor($default) unless defined $self->$accessor;
    }
    return $self;
}
129 =head2 my $obj = CLASS->new_from_file($file)
131 Constructor. $file is a I<recentfile>.
133 =cut
sub new_from_file {
    my ($class, $file) = @_;
    my $self = bless {}, $class;
    $self->_rfile($file);
    #?# $self->lock;
    # Slurp the raw file; only needed for the Data::Serializer path below.
    my $serialized = do {
        open my $fh, "<", $file or die "Could not open '$file': $!";
        local $/;
        <$fh>;
    };
    # XXX: we can skip this step when the metadata are sufficient, but
    # we cannot parse the file without some magic stuff about
    # serialized formats
    # Resolve symlinks (only same-directory targets are supported) so
    # that localroot and the suffix are derived from the real file.
    while (-l $file) {
        my ($name, $path) = fileparse $file;
        my $symlink = readlink $file;
        if ($symlink =~ m|/|) {
            die "FIXME: filenames containing '/' not supported, got $symlink";
        }
        $file = File::Spec->catfile($path, $symlink);
    }
    # fileparse() treats suffixes as regular expressions; quote them so
    # that e.g. the "." in ".yaml" only matches a literal dot.
    my ($name, $path, $suffix) = fileparse $file, map { quotemeta } keys %serializers;
    die "Could not determine file format from suffix" unless $suffix;
    $self->serializer_suffix($suffix);
    $self->localroot($path);
    my $deserialized;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        $deserialized = YAML::Syck::LoadFile($file);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $deserialized = $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    # Copy the stored metadata into accessors; mixed-case keys such as
    # "Producers" are informational only and skipped.
    my $meta = $deserialized->{meta} || {};
    for my $k (keys %$meta) {
        next if $k ne lc $k; # "Producers"
        $self->$k($meta->{$k});
    }
    unless (defined $self->protocol) {
        $self->protocol(DEFAULT_PROTOCOL);
    }
    return $self;
}
180 =head2 DESTROY
182 A simple unlock.
184 =cut
sub DESTROY {
    my ($self) = @_;
    $self->unlock;
    # A live tempfile object cleans up after itself; nothing more to do.
    return if $self->_current_tempfile_fh;
    my $tempfile = $self->_current_tempfile or return;
    if (-e $tempfile) {
        # Deliberately left in place:
        # unlink $tempfile; # may fail in global destruction
    }
    return;
}
197 =head1 ACCESSORS
199 =cut
my @accessors;

BEGIN {
    # Private accessors: internal state that is not documented as POD.
    @accessors = qw(
        _current_tempfile
        _current_tempfile_fh
        _delayed_operations
        _done
        _interval
        _is_locked
        _localroot
        _merged
        _pathdb
        _remember_last_uptodate_call
        _remote_dir
        _remoteroot
        _rfile
        _rsync
        __verified_tempdir
        _seeded
        _uptodateness_ever_reached
        _use_tempfile
    );
    # Public accessors are harvested from the "=item NAME" lines of the
    # here-document that follows (it runs up to the "=cut" line and doubles
    # as POD), so the documented list and the generated accessors match.
    push @accessors, map { /^=item\s+(.*)/ ? $1 : () } split /\n/, <<'=cut'; }
228 =over 4
230 =item aggregator
232 A list of interval specs that tell the aggregator which I<recentfile>s
233 are to be produced.
235 =item canonize
237 The name of a method to canonize the path before rsyncing. Only
238 supported value is C<naive_path_normalize>. Defaults to that.
240 =item comment
242 A comment about this tree and setup.
244 =item dirtymark
246 A timestamp. The dirtymark is updated whenever an out of band change
247 on the origin server is performed that violates the protocol. Say,
248 they add or remove files in the middle somewhere. Slaves must react
249 with a devaluation of their C<done> structure which then leads to a
250 full re-sync of all files. Implementation note: dirtymark may increase
251 or decrease.
253 =item filenameroot
255 The (prefix of the) filename we use for this I<recentfile>. Defaults to
256 C<RECENT>. The string must not contain a directory separator.
258 =item have_mirrored
260 Timestamp remembering when we mirrored this recentfile the last time.
261 Only relevant for slaves.
263 =item ignore_link_stat_errors
265 If set to true, rsync errors are ignored that complain about link stat
266 errors. These seem to happen only when there are files missing at the
267 origin. In race conditions this can always happen, so it defaults to
268 true.
270 =item is_slave
272 If set to true, this object will fetch a new recentfile from remote
273 when the timespan between the last mirror (see have_mirrored) and now
274 is too large (see C<ttl>).
276 =item keep_delete_objects_forever
278 The default for delete events is that they are passed through the
279 collection of recentfile objects until they reach the Z file. There
280 they get dropped so that the associated file object ceases to exist at
281 all. By setting C<keep_delete_objects_forever> the delete objects are
282 kept forever. This makes the Z file larger but has the advantage that
283 slaves that have interrupted mirroring for a long time still can clean
284 up their copy.
286 =item locktimeout
288 After how many seconds shall we die if we cannot lock a I<recentfile>?
289 Defaults to 600 seconds.
291 =item loopinterval
293 When mirror_loop is called, this accessor can specify how much time
294 every loop shall at least take. If the work of a loop is done before
295 that time has gone, sleeps for the rest of the time. Defaults to
296 arbitrary 42 seconds.
298 =item max_files_per_connection
300 Maximum number of files that are transferred on a single rsync call.
301 Setting it higher means higher performance at the price of holding
302 connections longer and potentially disturbing other users in the pool.
303 Defaults to the arbitrary value 42.
305 =item max_rsync_errors
307 When rsync operations encounter that many errors without any resetting
308 success in between, then we die. Defaults to unlimited. A value of
309 -1 means we run forever ignoring all rsync errors.
311 =item minmax
313 Hashref remembering when we read the recent_events from this file the
314 last time and what the timespan was.
316 =item protocol
318 When the RECENT file format changes, we increment the protocol. We try
319 to support older protocols in later releases.
321 =item remote_host
323 The host we are mirroring from. Leave empty for the local filesystem.
325 =item remote_module
327 Rsync servers have so called modules to separate directory trees from
328 each other. Put here the name of the module under which we are
329 mirroring. Leave empty for local filesystem.
331 =item rsync_options
333 Things like compress, links, times or checksums. Passed in to the
334 File::Rsync object used to run the mirror.
336 =item serializer_suffix
338 Mostly untested accessor. The only well tested format for
339 I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
340 Data::Serializer. But in principle other formats are supported as
341 well. See section SERIALIZERS below.
343 =item sleep_per_connection
345 Sleep that many seconds (floating point OK) after every chunk of rsyncing
346 has finished. Defaults to arbitrary 0.42.
348 =item tempdir
350 Directory to write temporary files to. Must allow rename operations
351 into the tree which usually means it must live on the same partition
352 as the target directory. Defaults to C<< $self->localroot >>.
354 =item ttl
356 Time to live. Number of seconds after which this recentfile must be
357 fetched again from the origin server. Only relevant for slaves.
358 Defaults to arbitrary 24.2 seconds.
360 =item verbose
Boolean to turn on a bit of verbosity.
364 =item verboselog
366 Path to the logfile to write verbose progress information to. This is
367 a primitive stop gap solution to get simple verbose logging working.
368 Switching to Log4perl or similar is probably the way to go.
370 =back
372 =cut
374 use accessors @accessors;
376 =head1 METHODS
378 =head2 (void) $obj->aggregate( %options )
Takes all intervals that are collected in the accessor called
aggregator. Sorts them by actual length of the interval.
Removes duplicates and those that are shorter than our own interval. Then merges this
383 object into the next larger object. The merging continues upwards
384 as long as the next I<recentfile> is old enough to warrant a merge.
Whether a merge is warranted is decided according to the interval of the
previous (next smaller) recentfile, so that larger files are updated less
often than smaller ones. If $options{force} is true, all files get updated.
390 Here is an example to illustrate the behaviour. Given aggregators
392 1h 1d 1W 1M 1Q 1Y Z
394 then
396 1h updates 1d on every call to aggregate()
397 1d updates 1W earliest after 1h
398 1W updates 1M earliest after 1d
399 1M updates 1Q earliest after 1W
400 1Q updates 1Y earliest after 1M
401 1Y updates Z earliest after 1Q
403 Note that all but the smallest recentfile get updated at an arbitrary
404 rate and as such are quite useless on their own.
406 =cut
# Merge this recentfile upwards through the chain of aggregator intervals.
# %option: force => true merges into every larger file regardless of age.
# Returns nothing.
sub aggregate {
    my ($self, %option) = @_;
    my $own_secs = $self->interval_secs;
    # Build (interval, secs) records for our own interval plus every
    # configured aggregator. Drop intervals shorter than ours, and drop any
    # interval whose length in seconds was already seen. Deduplicating by
    # length (not only by name) matters: two spellings of the same length
    # (say "1h" and "60m") would otherwise be merged into each other, which
    # _merge_sanitycheck rejects. Our own interval is listed first, so it
    # is the one that survives and ends up as $aggs[0].
    my %seen_secs;
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
               grep { $_->{secs} >= $own_secs && !$seen_secs{$_->{secs}}++ }
               map  { +{ interval => $_, secs => $self->interval_secs($_) } }
               $self->interval, @{ $self->aggregator || [] };
    $self->update;
    $aggs[0]{object} = $self;
    AGGREGATOR: for my $i (0 .. $#aggs - 1) {
        my $this = $aggs[$i]{object};
        my $next = $this->_sparse_clone;
        $next->interval($aggs[$i+1]{interval});
        my $want_merge = 0;
        if ($option{force} || $i == 0) {
            $want_merge = 1;
        } else {
            my $next_rfile = $next->rfile;
            if (-e $next_rfile) {
                # Only touch the bigger file once it is older than the
                # interval of the file before the current one.
                my $prev = $aggs[$i-1]{object};
                local $^T = time;
                my $next_age = 86400 * -M $next_rfile;
                $want_merge = 1 if $next_age > $prev->interval_secs;
            } else {
                $want_merge = 1;
            }
        }
        last AGGREGATOR unless $want_merge;
        $next->merge($this);
        $aggs[$i+1]{object} = $next;
    }
    return;
}
# Debug helper: report rfile path, size and mtime for every recentfile of
# this aggregate (own interval plus aggregators), shortest interval first.
# Sizes/mtimes are undef for files that do not exist.
sub _debug_aggregate {
    my ($self) = @_;
    my @intervals = map  { $_->[0] }
                    sort { $a->[1] <=> $b->[1] }
                    map  { [ $_, $self->interval_secs($_) ] }
                    $self->interval, @{ $self->aggregator || [] };
    my @report;
    for my $interval (@intervals) {
        # Work on a deep copy so our own interval stays untouched.
        my $clone = Storable::dclone($self);
        $clone->interval($interval);
        my $rfile = $clone->rfile;
        my ($size, $mtime) = (stat $rfile)[7, 9];
        push @report, { rfile => $rfile, size => $size, mtime => $mtime };
    }
    return \@report;
}
# (void) $self->_assert_symlink()
#
# Make "<localroot>/<filenameroot>.recent" point at this object's
# rfilename (a bare file name inside localroot). Where symlinks are
# supported, a correct link is left alone, a missing one is created
# directly, and anything else in the way (a stale link or a regular file)
# is replaced atomically via a temporary link plus rename. Without
# symlink support the target is copied instead.
sub _assert_symlink {
    my ($self) = @_;
    my $recentrecentfile = File::Spec->catfile(
        $self->localroot,
        sprintf("%s.recent", $self->filenameroot),
    );
    my $target = $self->rfilename;
    my $tmp    = "$recentrecentfile.$$";
    if ($Config{d_symlink} eq "define") {
        if (-l $recentrecentfile) {
            return if readlink($recentrecentfile) eq $target;
        } elsif (! -e $recentrecentfile) {
            symlink $target, $recentrecentfile
                or die "Could not create symlink '$recentrecentfile': $!";
            return;
        }
        # Stale symlink or a plain file: build the new link under a
        # temporary name and rename it over the old entry atomically.
        unlink $tmp; # may fail
        symlink $target, $tmp
            or die "Could not create symlink '$tmp': $!";
        rename $tmp, $recentrecentfile
            or die "Could not rename '$tmp' to $recentrecentfile: $!";
    } else {
        warn "Warning: symlinks not supported on this system, doing a copy instead\n";
        unlink $tmp; # may fail
        # rfilename is relative to localroot, not to the current directory.
        my $source = File::Spec->catfile($self->localroot, $target);
        cp $source, $tmp
            or die "Could not copy to '$tmp': $!";
        rename $tmp, $recentrecentfile
            or die "Could not rename '$tmp' to $recentrecentfile: $!";
    }
    return;
}
502 =head2 $hashref = $obj->delayed_operations
504 A hash of hashes containing unlink and rmdir operations which had to
505 wait until the recentfile got unhidden in order to not confuse
506 downstream mirrors (in case we have some).
508 =cut
sub delayed_operations {
    my ($self) = @_;
    my $ops = $self->_delayed_operations;
    return $ops if defined $ops;
    # First use: start with empty queues for both kinds of operation.
    $ops = { unlink => {}, rmdir => {} };
    $self->_delayed_operations($ops);
    return $ops;
}
523 =head2 $done = $obj->done
525 C<$done> is a reference to a L<File::Rsync::Mirror::Recentfile::Done>
526 object that keeps track of rsync activities. Only needed and used when
527 we are a mirroring slave.
529 =cut
sub done {
    my ($self) = @_;
    my $done = $self->_done;
    return $done if $done;
    # Loaded lazily: only mirroring slaves need a Done object.
    require File::Rsync::Mirror::Recentfile::Done;
    $done = File::Rsync::Mirror::Recentfile::Done->new();
    $done->_rfinterval($self->interval);
    $self->_done($done);
    return $done;
}
543 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
545 Stores the remote I<recentfile> locally as a tempfile. The caller is
546 responsible to remove the file after use.
548 Note: if you're intending to act as an rsync server for other slaves,
549 then you must prefer this method to fetch that file with
550 get_remotefile(). Otherwise downstream mirrors would expect you to
551 already have mirrored all the files that are in the I<recentfile>
552 before you have them mirrored.
554 =cut
# Rsync the remote recentfile into a local tempfile and return that
# tempfile's path. With _use_tempfile set and the TTL not yet reached, the
# current tempfile is returned without contacting the remote. After three
# failed rsync attempts it gives up, logs a warning line and still returns
# the (possibly stale) tempfile path.
sub get_remote_recentfile_as_tempfile {
    my ($self) = @_;
    mkpath $self->localroot;
    my $fh;
    my $trfilename;
    if ( $self->_use_tempfile() ) {
        return $self->_current_tempfile unless $self->ttl_reached;
        $fh = $self->_current_tempfile_fh;
        $trfilename = $self->rfilename;
    } else {
        $trfilename = $self->rfilename;
    }

    my $dst;
    if ($fh) {
        $dst = $self->_current_tempfile;
    } else {
        $fh = $self->_get_remote_rat_provide_tempfile_object ($trfilename);
        $dst = $fh->filename;
        $self->_current_tempfile ($dst);
        my $rfile = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
        if (defined $rfile && -e $rfile) {
            # saving on bandwidth. Might need to be configurable
            # $self->bandwidth_is_cheap?
            cp $rfile, $dst or die "Could not copy '$rfile' to '$dst': $!"
        }
    }
    my $src = join ("/",
                    $self->remoteroot,
                    $trfilename,
                   );
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        my $display_dst = join "/", "...", basename(dirname($dst)), basename($dst);
        my $LFH = $self->_logfilehandle;
        printf $LFH
            (
             "%-4s %d (1/1/%s) temp %s ... ",
             $doing,
             time,
             $self->interval,
             $display_dst,
            );
    }
    my $gaveup = 0;
    my $retried = 0;
    local($ENV{LANG}) = "C";
    while (!$self->rsync->exec(
                               src => $src,
                               dst => $dst,
                              )) {
        $self->register_rsync_error ($self->rsync->err);
        if (++$retried >= 3) {
            warn "XXX giving up";
            $gaveup = 1;
            last;
        }
    }
    if ($gaveup) {
        # Terminate the line so later log output does not run into it.
        my $LFH = $self->_logfilehandle;
        printf $LFH "Warning: gave up mirroring %s, will try again later\n", $self->interval;
    } else {
        $self->_refresh_internals ($dst);
        $self->have_mirrored (Time::HiRes::time);
        $self->un_register_rsync_error ();
    }
    $self->unseed;
    if ($self->verbose && !$gaveup) {
        # Only report success when the transfer actually succeeded.
        my $LFH = $self->_logfilehandle;
        print $LFH "DONE\n";
    }
    my $mode = 0644;
    chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
    return $dst;
}
# Return the directory for temporary files: tempdir if set, otherwise
# localroot. It is created on first use and cached in __verified_tempdir.
sub _verified_tempdir {
    my ($self) = @_;
    my $cached = $self->__verified_tempdir();
    return $cached if defined $cached;
    my $dir = $self->tempdir || $self->localroot;
    mkpath $dir unless -d $dir;
    $self->__verified_tempdir($dir);
    return $dir;
}
# Create a File::Temp object named after $trfilename inside the verified
# tempdir, make it world-readable, and keep the object alive on $self
# while tempfile mode is active.
sub _get_remote_rat_provide_tempfile_object {
    my ($self, $trfilename) = @_;
    my $dir = $self->_verified_tempdir;
    my %spec = (
        TEMPLATE => sprintf(".FRMRecent-%s-XXXX", $trfilename),
        DIR      => $dir,
        SUFFIX   => $self->serializer_suffix,
        UNLINK   => $self->_use_tempfile,
    );
    my $fh = File::Temp->new(%spec);
    my $mode = 0644;
    my $dst = $fh->filename;
    chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
    # delay self destruction
    $self->_current_tempfile_fh($fh) if $self->_use_tempfile;
    return $fh;
}
# Where verbose output goes: an append handle on verboselog if set,
# otherwise STDERR.
sub _logfilehandle {
    my ($self) = @_;
    my $path = $self->verboselog or return \*STDERR;
    open my $fh, ">>", $path or die "Could not open >> '$path': $!";
    return $fh;
}
680 =head2 $localpath = $obj->get_remotefile ( $relative_path )
682 Rsyncs one single remote file to local filesystem.
684 Note: no locking is done on this file. Any number of processes may
685 mirror this object.
687 Note II: do not use for recentfiles. If you are a cascading
688 slave/server combination, it would confuse other slaves. They would
689 expect the contents of these recentfiles to be available. Use
690 get_remote_recentfile_as_tempfile() instead.
692 =cut
sub get_remotefile {
    my ($self, $path) = @_;
    my $dst = File::Spec->catfile($self->localroot, $path);
    mkpath dirname $dst;
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        my $LFH = $self->_logfilehandle;
        printf {$LFH} "%-4s %d (1/1/%s) %s ... ",
            $doing, time, $self->interval, $path;
    }
    local $ENV{LANG} = "C";
    my $remoteroot = $self->remoteroot or die "Alert: missing remoteroot. Cannot continue";
    my $src = join "/", $remoteroot, $path;
    # Retry until rsync succeeds; each failure is handed to
    # register_rsync_error.
    until ($self->rsync->exec(src => $src, dst => $dst)) {
        $self->register_rsync_error($self->rsync->err);
    }
    $self->un_register_rsync_error();
    if ($self->verbose) {
        my $LFH = $self->_logfilehandle;
        print {$LFH} "DONE\n";
    }
    return $dst;
}
728 =head2 $obj->interval ( $interval_spec )
730 Get/set accessor. $interval_spec is a string and described below in
731 the section INTERVAL SPEC.
733 =cut
sub interval {
    my ($self, @new_interval) = @_;
    if (@new_interval) {
        $self->_interval($new_interval[0]);
        # The remembered rfile depends on the interval: reset it.
        $self->_rfile(undef);
    }
    my $interval = $self->_interval;
    return $interval if defined $interval;
    # do not ask the $self too much, it recurses!
    require Carp;
    Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
}
750 =head2 $secs = $obj->interval_secs ( $interval_spec )
752 $interval_spec is described below in the section INTERVAL SPEC. If
753 empty defaults to the inherent interval for this object.
755 =cut
757 sub interval_secs {
758 my ($self, $interval) = @_;
759 $interval ||= $self->interval;
760 unless (defined $interval) {
761 die "interval_secs() called without argument on an object without a declared one";
763 my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
764 die "Could not determine seconds from interval[$interval]";
765 if ($interval eq "Z") {
766 return MAX_INT;
767 } elsif (exists $seconds{$t} and $n =~ /^\d+$/) {
768 return $seconds{$t}*$n;
769 } else {
770 die "Invalid interval specification: n[$n]t[$t]";
774 =head2 $obj->localroot ( $localroot )
776 Get/set accessor. The local root of the tree.
778 =cut
sub localroot {
    my ($self, @new_root) = @_;
    if (@new_root) {
        $self->_localroot($new_root[0]);
        # As in interval(): the remembered rfile depends on the root.
        $self->_rfile(undef);
    }
    my $root = $self->_localroot;
    return $root;
}
789 =head2 $ret = $obj->local_path($path_found_in_recentfile)
791 Combines the path to our local mirror and the path of an object found
792 in this I<recentfile>. In other words: the target of a mirror operation.
794 Implementation note: We split on slashes and then use
795 File::Spec::catfile to adjust to the local operating system.
797 =cut
sub local_path {
    my ($self, $path) = @_;
    # seems like a degenerated case: no path means the root itself
    return $self->localroot unless defined $path;
    # Paths in recentfiles are "/"-separated; rebuild them with the local
    # operating system's separator.
    return File::Spec->catfile($self->localroot, split m|/|, $path);
}
809 =head2 (void) $obj->lock
811 Locking is implemented with an C<mkdir> on a locking directory
812 (C<.lock> appended to $rfile).
814 =cut
sub lock {
    my ($self) = @_;
    # Not using flock because it locks on filehandles instead of
    # old-school resources; a directory "<rfile>.lock" is the lock.
    return if $self->_is_locked;
    my $rfile = $self->rfile;
    # XXX need a way to allow breaking the lock
    my $started_at  = time;
    my $locktimeout = $self->locktimeout || 600;
    my %have_warned;
    GETLOCK: until (mkdir "$rfile.lock") {
        if (open my $pidfh, "<", "$rfile.lock/process") {
            chomp(my $process = <$pidfh>);
            # The lock directory records our own pid: we already own it.
            last GETLOCK if $$ == $process;
            if (kill 0, $process) {
                warn "Warning: process $process holds a lock, waiting..."
                    unless $have_warned{$process}++;
            } else {
                # kill 0 failed: treat the owner as gone and take over.
                warn "Warning: breaking lock held by process $process";
                sleep 1;
                last GETLOCK;
            }
        }
        Time::HiRes::sleep 0.01;
        if (time - $started_at > $locktimeout) {
            die "Could not acquire lockdirectory '$rfile.lock': $!";
        }
    }
    open my $fh, ">", "$rfile.lock/process" or die "Could not open >$rfile.lock/process\: $!";
    print $fh $$, "\n";
    close $fh or die "Could not close: $!";
    $self->_is_locked (1);
}
851 =head2 (void) $obj->merge ($other)
853 Bulk update of this object with another one. It's used to merge a
854 smaller and younger $other object into the current one. If this file
855 is a C<Z> file, then we normally do not merge in objects of type
856 C<delete>; this can be overridden by setting
857 keep_delete_objects_forever. But if we encounter an object of type
858 delete we delete the corresponding C<new> object if we have it.
860 If there is nothing to be merged, nothing is done.
862 =cut
# Merge the smaller $other recentfile into $self while holding both locks.
# Locks taken here are released again even if reading $other or the merge
# itself dies; the original error is then re-thrown. Returns nothing.
sub merge {
    my ($self, $other) = @_;
    $self->_merge_sanitycheck ( $other );
    $other->lock;
    my $ok = eval {
        my $other_recent = $other->recent_events || [];
        $self->lock;
        my $merged_ok = eval { $self->_merge_locked ( $other, $other_recent ); 1 };
        my $merge_error = $@;
        $self->unlock;
        die $merge_error unless $merged_ok;
        1;
    };
    my $error = $@;
    $other->unlock;
    die $error unless $ok;
    return;
}
# Called by merge() with both recentfiles locked. Trims our own events
# below a computed lower bound, filters the incoming events, and hands
# both lists to _merge_something_done when anything needs writing.
# Event lists are newest-first (index 0 is the most recent).
sub _merge_locked {
    my ($self, $other, $other_recent) = @_;
    my $my_recent = $self->recent_events || [];

    my $myepoch = $my_recent->[0]    ? $my_recent->[0]{epoch}    : undef;
    my $epoch   = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
    my $oldest_allowed = 0;
    my $something_done;
    # obstetrics: an empty file always gets written
    $something_done = 1 unless $my_recent->[0];
    if ($epoch) {
        if (($other->dirtymark||0) ne ($self->dirtymark||0)) {
            # Dirtymarks disagree: no lower bound, and force a rewrite.
            $oldest_allowed = 0;
            $something_done = 1;
        } elsif (my $merged = $self->merged) {
            # Lower bound: the older of (newest incoming epoch minus our
            # interval) and the epoch recorded when we were merged upwards.
            my $secs = $self->interval_secs();
            $oldest_allowed = min($epoch - $secs, $merged->{epoch}||0);
            # If the incoming list reaches further back, lower the bound
            # to its oldest entry.
            if (@$other_recent
                and _bigfloatlt($other_recent->[-1]{epoch}, $oldest_allowed)) {
                $oldest_allowed = $other_recent->[-1]{epoch};
            }
        }
        # Expire our own events that are older than the bound.
        while (@$my_recent && _bigfloatlt($my_recent->[-1]{epoch}, $oldest_allowed)) {
            pop @$my_recent;
            $something_done = 1;
        }
    }

    # Keep incoming events within the bound, one per path (the first and
    # newest wins). The Z file drops deletes unless told otherwise.
    my %have_path;
    my @filtered;
    OEV: for my $oev (@$other_recent) {
        my $oevepoch = $oev->{epoch} || 0;
        next OEV if _bigfloatlt($oevepoch, $oldest_allowed);
        my $path = $oev->{path};
        next OEV if $have_path{$path}++;
        next OEV if $self->interval eq "Z"
            and $oev->{type} eq "delete"
            and ! $self->keep_delete_objects_forever;
        $something_done = 1 if !$myepoch || _bigfloatgt($oevepoch, $myepoch);
        push @filtered, { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
    }
    if ($something_done) {
        $self->_merge_something_done (\@filtered, $my_recent, $other_recent, $other, \%have_path, $epoch);
    }
}
# Zip the filtered incoming events and our own events (both newest-first)
# into one newest-first list, make epochs unique, then write both files and
# record on $other where it was merged into.
sub _merge_something_done {
    my ($self, $other_recent_filtered, $my_recent, $other_recent, $other, $have_path, $epoch) = @_;
    my @recent;
    my $epoch_conflict = 0;
    my $last_epoch;
    ZIP: while (@$other_recent_filtered || @$my_recent) {
        my $take_other = !@$my_recent
            || (@$other_recent_filtered
                && _bigfloatge($other_recent_filtered->[0]{epoch}, $my_recent->[0]{epoch}));
        my $event;
        if ($take_other) {
            $event = shift @$other_recent_filtered;
        } else {
            $event = shift @$my_recent;
            # Skip our own events whose path was already taken.
            next ZIP if $have_path->{$event->{path}}++;
        }
        $epoch_conflict = 1 if defined $last_epoch && $event->{epoch} eq $last_epoch;
        $last_epoch = $event->{epoch};
        push @recent, $event;
    }
    if ($epoch_conflict) {
        # Walk from the oldest entry to the newest and nudge every repeated
        # epoch upwards until it is unique.
        my %have_epoch;
        for my $i (reverse 0 .. $#recent) {
            my $e = $recent[$i]{epoch};
            next unless $have_epoch{$e}++;
            $e = _increase_a_bit($e) while $have_epoch{$e};
            $recent[$i]{epoch} = $e;
            $have_epoch{$e}++;
        }
    }
    if (!$self->dirtymark || $other->dirtymark ne $self->dirtymark) {
        $self->dirtymark ( $other->dirtymark );
    }
    $self->write_recent(\@recent);
    $other->merged({
        time => Time::HiRes::time, # not used anywhere
        epoch => $recent[0]{epoch},
        into_interval => $self->interval, # not used anywhere
    });
    $other->write_recent($other_recent);
}
sub _merge_sanitycheck {
    my ($self, $other) = @_;
    # Only a strictly longer interval may absorb a shorter one.
    return if $self->interval_secs > $other->interval_secs;
    require Carp;
    Carp::confess(sprintf(
        "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
        $self->interval_secs,
        $other->interval_secs,
    ));
}
989 =head2 merged
991 Hashref denoting when this recentfile has been merged into some other
992 at which epoch.
994 =cut
sub merged {
    my ($self, $set) = @_;
    $self->_merged($set) if defined $set;
    my $merged = $self->_merged;
    my $into = $merged ? $merged->{into_interval} : undef;
    # Sanity checks: warn when the recorded target interval is not larger
    # than our own.
    if ($into and defined $self->_interval) {
        if ($into eq $self->interval) {
            require Carp;
            Carp::cluck(sprintf(
                "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
                $into,
                $self->interval,
            ));
        } elsif ($self->interval_secs($into) < $self->interval_secs) {
            require Carp;
            Carp::cluck(sprintf(
                "Warning: into_interval_secs[%s] smaller than own interval_secs[%s] on interval[%s]. Danger ahead.",
                $self->interval_secs($into),
                $self->interval_secs,
                $self->interval,
            ));
        }
    }
    return $merged;
}
1027 =head2 $hashref = $obj->meta_data
1029 Returns the hashref of metadata that the server has to add to the
1030 I<recentfile>.
1032 =cut
sub meta_data {
    my ($self) = @_;
    my $ret = $self->{meta};
    # Copy every defined metadata accessor into the result.
    for my $accessor (qw(
        aggregator canonize comment dirtymark filenameroot
        interval merged minmax protocol serializer_suffix
    )) {
        my $value = $self->$accessor;
        next unless defined $value;
        $ret->{$accessor} = $value;
    }
    # XXX need to reset the Producer if I am a writer, keep it when I
    # am a reader
    # (plain commas on purpose: "__PACKAGE__ =>" would quote the name)
    $ret->{Producers} ||= {
        __PACKAGE__, "$VERSION", # stringified it looks better
        '$0', $0,
        'time', Time::HiRes::time,
    };
    $ret->{dirtymark} ||= Time::HiRes::time;
    return $ret;
}
1065 =head2 $success = $obj->mirror ( %options )
1067 Mirrors the files in this I<recentfile> as reported by
1068 C<recent_events>. Options named C<after>, C<before>, C<max> are passed
1069 through to the C<recent_events> call. The boolean option C<piecemeal>,
1070 if true, causes C<mirror> to only rsync C<max_files_per_connection>
1071 and keep track of the rsynced files so that future calls will rsync
1072 different files until all files are brought to sync.
1074 =cut
sub mirror {
    my ($self, %options) = @_;
    my $trecentfile = $self->get_remote_recentfile_as_tempfile();
    $self->_use_tempfile(1);
    # skip-deletes is inadequat for passthrough within mirror. We
    # would never reach uptodateness when a delete were on a
    # borderline
    my %passthrough;
    @passthrough{qw(before after max)} = @options{qw(before after max)};
    my ($recent_events) = $self->recent_events(%passthrough);
    my (@error, @dlcollector); # download-collector: array containing paths we need
    my $last_item = $#$recent_events;
    my $done      = $self->done;
    my $pathdb    = $self->_pathdb;
    ITEM: for my $i (0 .. $last_item) {
        my %status;
        $self->_mirror_item(
            $i, $recent_events, $last_item, $done, $pathdb,
            \@dlcollector, \%options, \%status, \@error,
        );
        last ITEM if $i == $last_item;
        next ITEM unless $status{mustreturn};
        if ($self->_current_tempfile && ! $self->_current_tempfile_fh) {
            # looks like a bug somewhere else
            my $t = $self->_current_tempfile;
            unlink $t or die "Could not unlink '$t': $!";
            $self->_current_tempfile(undef);
            $self->_use_tempfile(0);
        }
        return;
    }
    if (@dlcollector) {
        my $success = eval { $self->_mirror_dlcollector(\@dlcollector, $pathdb, $recent_events); };
        if (!$success || $@) {
            warn "Warning: Unknown error while mirroring: $@";
            push @error, $@;
            sleep 1;
        }
    }
    if ($self->verbose) {
        my $LFH = $self->_logfilehandle;
        print $LFH "DONE\n";
    }
    # once we've gone to the end we consider ourselves free of obligations
    $self->unseed;
    $self->_mirror_unhide_tempfile($trecentfile);
    $self->_mirror_perform_delayed_ops(\%options);
    return !@error;
}
# Handle event number $i: skip it when already covered (or superseded
# according to $pathdb), hand "new" events to _mirror_item_new, and queue
# "delete" events as delayed unlink/rmdir operations.
sub _mirror_item {
    my ($self,
        $i,
        $recent_events,
        $last_item,
        $done,
        $pathdb,
        $dlcollector,
        $options,
        $status,
        $error,
       ) = @_;
    my $recent_event = $recent_events->[$i];
    return if $done->covered($recent_event->{epoch});
    if ($pathdb) {
        my $rec = $pathdb->{$recent_event->{path}};
        if ($rec && $rec->{recentepoch}
            && _bigfloatgt($rec->{recentepoch}, $recent_event->{epoch})) {
            # A newer event for this path was already handled.
            $done->register($recent_events, [$i]);
            return;
        }
    }
    my $dst  = $self->local_path($recent_event->{path});
    my $type = $recent_event->{type};
    if ($type eq "new") {
        $self->_mirror_item_new(
            $dst, $i, $last_item, $recent_events, $recent_event,
            $dlcollector, $pathdb, $status, $error, $options,
        );
    } elsif ($type eq "delete") {
        my $activity = "skipped";
        unless ($options->{'skip-deletes'}) {
            if (! -e $dst) {
                $activity = "not_found";
            } else {
                # Symlinks and non-directories get unlinked, directories rmdir'ed.
                my $operation = (-l $dst or not -d _) ? "unlink" : "rmdir";
                $self->delayed_operations->{$operation}{$dst}++;
                $activity = "deleted";
            }
        }
        $done->register($recent_events, [$i]);
        $self->_mirror_register_path($pathdb, [$recent_event], $activity) if $pathdb;
    } else {
        warn "Warning: invalid upload type '$recent_event->{type}'";
    }
}
# Queue the "new" event $recent_event (number $i) for download.
# Paths are collected in @$dlcollector. Once max_files_per_connection
# entries (default 42) have accumulated, they are rsynced as one batch
# via _mirror_dlcollector(). A leftover partial batch is flushed by
# mirror() itself. After a batch the method sleeps sleep_per_connection
# seconds (default 0.42). Failures are warned about and pushed onto
# @$error. With $options->{piecemeal}, $status->{mustreturn} is set after
# each batch so that the caller stops early.
sub _mirror_item_new {
    my($self,
       $dst,
       $i,
       $last_item,
       $recent_events,
       $recent_event,
       $dlcollector,
       $pathdb,
       $status,
       $error,
       $options,
      ) = @_;
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        my $LFH = $self->_logfilehandle;
        printf $LFH
            (
             "%-4s %d (%d/%d/%s) %s ... ",
             $doing,
             time,
             1+$i,
             1+$last_item,
             $self->interval,
             $recent_event->{path},
            );
    }
    my $max_files_per_connection = $self->max_files_per_connection || 42;
    if ($self->verbose) {
        my $LFH = $self->_logfilehandle;
        print $LFH "\n";
    }
    push @$dlcollector, { rev => $recent_event, i => $i };
    # keep collecting until a batch is full
    return if @$dlcollector < $max_files_per_connection;

    my $success = eval {$self->_mirror_dlcollector ($dlcollector,$pathdb,$recent_events);};
    # capture the error right away, before any later code can reset $@
    my $err = $@;
    my $sleep = $self->sleep_per_connection;
    $sleep = 0.42 unless defined $sleep;
    Time::HiRes::sleep $sleep;
    # report failures in piecemeal mode too; they used to be dropped there
    if (!$success || $err) {
        warn "Warning: Error while mirroring: $err";
        push @$error, $err;
        sleep 1;
    }
    if ($options->{piecemeal}) {
        $status->{mustreturn} = 1;
        return;
    }
    if ($self->verbose) {
        my $LFH = $self->_logfilehandle;
        print $LFH "DONE\n";
    }
}
# Rsync all queued downloads of @$xcoll (entries { rev => EVENT, i => INDEX })
# with one mirror_path() call. Record the events in $pathdb (if given) and
# their indexes in the done-intervals, then empty the queue. Returns
# mirror_path()'s result.
sub _mirror_dlcollector {
    my($self,$xcoll,$pathdb,$recent_events) = @_;
    my @events  = map { $_->{rev} } @$xcoll;
    my @indexes = map { $_->{i} } @$xcoll;
    my $success = $self->mirror_path([ map { $_->{path} } @events ]);
    $self->_mirror_register_path($pathdb, \@events, "rsync") if $pathdb;
    $self->done->register($recent_events, \@indexes);
    @$xcoll = ();
    return $success;
}
# Record every event of @$coll in %$pathdb, keyed by its path. Each entry
# is replaced by { recentepoch => EVENT_EPOCH, "${activity}_on" => NOW }.
sub _mirror_register_path {
    my($self,$pathdb,$coll,$activity) = @_;
    my $now = time;
    my $stamp_key = $activity . "_on";
    $pathdb->{ $_->{path} } = { recentepoch => $_->{epoch}, $stamp_key => $now } for @$coll;
}
# Rename the downloaded temporary recentfile $trecentfile to rfile (this
# confesses if the rename fails), stop using the tempfile, and make sure
# its File::Temp handle, if any, no longer deletes it on destruction.
sub _mirror_unhide_tempfile {
    my($self, $trecentfile) = @_;
    my $rfile = $self->rfile;
    unless (rename $trecentfile, $rfile) {
        require Carp;
        Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
    }
    $self->_use_tempfile (0);
    my $ctfh = $self->_current_tempfile_fh or return;
    $ctfh->unlink_on_destroy (0);
    $self->_current_tempfile_fh (undef);
}
# Carry out the removals queued in delayed_operations by _mirror_item().
# Entries under {unlink} are unlinked first. Entries under {rmdir} are
# then removed longest path first, so that subdirectories go before their
# parents. Failures are reported through Carp::cluck only when
# $options->{verbose} is set; with the object's verbose flag, every
# operation is logged. Each queued entry is attempted once and then
# forgotten.
sub _mirror_perform_delayed_ops {
    my($self,$options) = @_;
    my $delayed = $self->delayed_operations;
    for my $dst (keys %{$delayed->{unlink}}) {
        unless (unlink $dst) {
            require Carp;
            Carp::cluck ( "Warning: Error while unlinking '$dst': $!" ) if $options->{verbose};
        }
        if ($self->verbose) {
            my $doing = "Del";
            my $LFH = $self->_logfilehandle;
            printf $LFH
                (
                 "%-4s %d (%s) %s DONE\n",
                 $doing,
                 time,
                 $self->interval,
                 $dst,
                );
        }
        # Forget the entry regardless of verbosity. A leftover entry would
        # be replayed by every later call and could remove a file that has
        # been mirrored anew in the meantime.
        delete $delayed->{unlink}{$dst};
    }
    for my $dst (sort {length($b) <=> length($a)} keys %{$delayed->{rmdir}}) {
        unless (rmdir $dst) {
            require Carp;
            Carp::cluck ( "Warning: Error on rmdir '$dst': $!" ) if $options->{verbose};
        }
        if ($self->verbose) {
            my $doing = "Del";
            my $LFH = $self->_logfilehandle;
            printf $LFH
                (
                 "%-4s %d (%s) %s DONE\n",
                 $doing,
                 time,
                 $self->interval,
                 $dst,
                );
        }
        delete $delayed->{rmdir}{$dst};
    }
}
1337 =head2 $success = $obj->mirror_path ( $arrref | $path )
1339 If the argument is a scalar it is treated as a path. The remote path
1340 is mirrored into the local copy. $path is the path found in the
1341 I<recentfile>, i.e. it is relative to the root directory of the
1342 mirror.
1344 If the argument is an array reference then all elements are treated as
1345 a path below the current tree and all are rsynced with a single
1346 command (and a single connection).
1348 =cut
sub mirror_path {
    my($self,$path) = @_;
    # XXX simplify the two branches such that $path is treated as
    # [$path] maybe even demand the argument as an arrayref to
    # simplify docs and code. (rsync-over-recentfile-2.pl uses the
    # interface)

    # True (after logging in verbose mode) when rsync only reported a
    # link_stat failure and ignore_link_stat_errors lets us accept that.
    my $ignorable = sub {
        my @err = @_;
        return 0 unless $self->_my_ignore_link_stat_errors
            && "@err" =~ m{^ rsync: \s link_stat }x;
        if ($self->verbose) {
            my $LFH = $self->_logfilehandle;
            print $LFH "Info: ignoring link_stat error '@err'";
        }
        return 1;
    };

    unless (ref $path and ref $path eq "ARRAY") {
        # Single path: retry until rsync succeeds. register_rsync_error()
        # sleeps between attempts and confesses once max_rsync_errors
        # is reached.
        my $dst = $self->local_path($path);
        mkpath dirname $dst;
        local($ENV{LANG}) = "C";
        until ($self->rsync->exec
               (
                src => join("/", $self->remoteroot, $path),
                dst => $dst,
               )) {
            my @err = $self->rsync->err;
            return 1 if $ignorable->(@err);
            $self->register_rsync_error (@err);
        }
        $self->un_register_rsync_error ();
        return 1;
    }

    # Batch: one rsync run over a temporary --files-from list; give up
    # (for now) after three failed attempts.
    my $dst = $self->localroot;
    mkpath dirname $dst;
    my $fh = File::Temp->new
        (
         TEMPLATE => sprintf(".%s-XXXX", lc $self->filenameroot),
         TMPDIR => 1,
         UNLINK => 0,
        );
    print $fh $_, "\n" for @$path;
    $fh->flush;
    $fh->unlink_on_destroy(1);
    local($ENV{LANG}) = "C";
    my $attempts = 0;
    my $gave_up = 0;
    until ($self->rsync->exec
           (
            src => join("/", $self->remoteroot),
            dst => $dst,
            'files-from' => $fh->filename,
           )) {
        my @err = $self->rsync->err;
        return 1 if $ignorable->(@err);
        $self->register_rsync_error (@err);
        if (++$attempts >= 3) {
            my $batchsize = @$path;
            warn "The number of rsync retries now reached 3 within a batch of size $batchsize. Error was '@err'. Giving up now, will retry later, ";
            $gave_up = 1;
            last;
        }
        sleep 1;
    }
    $self->un_register_rsync_error () unless $gave_up;
    return 1;
}
# Effective value of ignore_link_stat_errors: true unless explicitly set.
sub _my_ignore_link_stat_errors {
    my($self) = @_;
    my $setting = $self->ignore_link_stat_errors;
    return defined $setting ? $setting : 1;
}
# The file to read events from: the current tempfile while tempfile use is
# switched on and that file is non-empty, otherwise rfile.
sub _my_current_rfile {
    my($self) = @_;
    my $candidate = $self->_use_tempfile ? $self->_current_tempfile : undef;
    return $candidate if $candidate && -s $candidate;
    return $self->rfile;
}
1447 =head2 $path = $obj->naive_path_normalize ($path)
Takes an absolute unix style path as argument and canonicalizes it to
a shorter path if possible, removing things like double slashes or
C</./> and removing references to C<../> directories to get a shorter,
unambiguous path. This simplifies the code that determines whether a
file passed to C<update()> is indeed below our C<localroot>.
1455 =cut
# Purely textual path cleanup; the filesystem is never consulted, so
# symlinks are not resolved. Collapses repeated slashes, drops "/./"
# segments, folds "dir/.." pairs (also at the end of the path) and
# strips a trailing slash. An undefined path is returned unchanged.
sub naive_path_normalize {
    my($self,$path) = @_;
    return $path unless defined $path;
    # The temporary trailing slash lets a final "/." or "/.." be handled
    # by the same substitutions as inner segments.
    $path .= "/";
    $path =~ s|/+|/|g;
    1 while $path =~ s|/\./|/|;
    1 while $path =~ s|/[^/]+/\.\./|/|;
    $path =~ s|/$||;
    $path;
}
1465 =head2 $ret = $obj->read_recent_1 ( $data )
1467 Delegate of C<recent_events()> on protocol 1
1469 =cut
sub read_recent_1 {
    my (undef, $data) = @_;
    # protocol 1 stores the event list under the "recent" key
    return $data->{recent};
}
1476 =head2 $array_ref = $obj->recent_events ( %options )
1478 Note: the code relies on the resource being written atomically. We
1479 cannot lock because we may have no write access. If the caller has
1480 write access (eg. aggregate() or update()), it has to care for any
1481 necessary locking and it MUST write atomically.
1483 If C<$options{after}> is specified, only file events after this
1484 timestamp are returned.
1486 If C<$options{before}> is specified, only file events before this
1487 timestamp are returned.
1489 If C<$options{max}> is specified only a maximum of this many events is
1490 returned.
1492 If C<$options{'skip-deletes'}> is specified, no files-to-be-deleted
1493 will be returned.
1495 If C<$options{contains}> is specified the value must be a hash
1496 reference containing a query. The query may contain the keys C<epoch>,
1497 C<path>, and C<type>. Each represents a condition that must be met. If
1498 there is more than one such key, the conditions are ANDed.
1500 If C<$options{info}> is specified, it must be a hashref. This hashref
1501 will be filled with metadata about the unfiltered recent_events of
1502 this object, in key C<first> there is the first item, in key C<last>
1503 is the last.
1505 =cut
# Load the events of the current rfile (or tempfile) and apply the
# filtering options documented above. Returns [] when there is no file
# or it cannot be deserialized.
sub recent_events {
    my ($self, %options) = @_;
    if ($self->is_slave) {
        # XXX seems dubious, might produce tempfiles without removing them?
        $self->get_remote_recentfile_as_tempfile;
    }
    my $rfile_or_tempfile = $self->_my_current_rfile or return [];
    -e $rfile_or_tempfile or return [];
    my $suffix = $self->serializer_suffix;
    my ($data) = eval {
        $self->_try_deserialize
            (
             $suffix,
             $rfile_or_tempfile,
            );
    };
    my $err = $@;
    if ($err or !$data) {
        return [];
    }
    my $re;
    if (reftype $data eq 'ARRAY') { # protocol 0
        $re = $data;
    } else {
        $re = $self->_recent_events_protocol_x
            (
             $data,
             $rfile_or_tempfile,
            );
    }
    # "info" must trigger option handling as well, otherwise the caller's
    # info hashref would never get filled
    return $re unless grep {defined $options{$_}} qw(after before contains info max skip-deletes);
    $self->_recent_events_handle_options ($re, \%options);
}
# File::Rsync::Mirror::Recentfile::_recent_events_handle_options
#
# Filter the event list $re in place according to %$options and return
# it. The list is sorted by descending epoch, newest first. Supported
# options: after, before (both exclusive), skip-deletes, contains, max.
# If $options->{info} is a hashref, its "first" and "last" keys receive
# the first and last event of the unfiltered list.
sub _recent_events_handle_options {
    my($self, $re, $options) = @_;
    if (my $info = $options->{info}) {
        $info->{first} = $re->[0];
        $info->{last} = $re->[-1];
    }
    # The events inside the (after, before) window form the contiguous
    # index range $first_item..$last_item. Using first() over index
    # ranges never probes $re->[0] of an empty list, which would
    # autovivify a bogus {} event.
    my $last_item = $#$re;
    if (defined $options->{after}) {
        # index of the newest event that is not after the limit
        my $f = first { $re->[$_]{epoch} <= $options->{after} } 0..$#$re;
        $last_item = $f - 1 if defined $f;
    }
    my $first_item = 0;
    if (defined $options->{before}) {
        # index of the newest event strictly before the limit; when there
        # is none, nothing qualifies
        my $f = first { $re->[$_]{epoch} < $options->{before} } 0..$last_item;
        $first_item = defined $f ? $f : $last_item + 1;
    }
    if ($first_item != 0 || $last_item != $#$re) {
        @$re = splice @$re, $first_item, 1+$last_item-$first_item;
    }
    if ($options->{'skip-deletes'}) {
        @$re = grep { $_->{type} ne "delete" } @$re;
    }
    if (my $contopt = $options->{contains}) {
        my $seen_allowed = 0;
        for my $allow (qw(epoch path type)) {
            if (exists $contopt->{$allow}) {
                $seen_allowed++;
                my $v = $contopt->{$allow};
                @$re = grep { $_->{$allow} eq $v } @$re;
            }
        }
        if (keys %$contopt > $seen_allowed) {
            require Carp;
            Carp::confess
                (sprintf "unknown query: %s", join ", ", %$contopt);
        }
    }
    if ($options->{max} && @$re > $options->{max}) {
        @$re = splice @$re, 0, $options->{max};
    }
    $re;
}
# Decode a protocol-N data structure ({ meta => {...}, ... }).
# Meta keys containing uppercase letters are kept verbatim in
# $self->{ORIG}. Lowercase keys are adopted through the accessor of the
# same name, but only while that attribute is still undefined. The events
# are extracted by read_recent_<N>. minmax is then refreshed from the
# file's mtime and the newest/oldest event epoch.
sub _recent_events_protocol_x {
    my($self,
       $data,
       $rfile_or_tempfile,
      ) = @_;
    my $meth = sprintf "read_recent_%d", $data->{meta}{protocol};
    # we may be reading meta for the first time
    for my $k (keys %{$data->{meta}}) {
        my $v = $data->{meta}{$k};
        if ($k ne lc $k) { # "Producers"
            $self->{ORIG}{$k} = $v;
        } elsif (!defined $self->$k) {
            $self->$k($v);
        }
    }
    my $re = $self->$meth ($data);
    my $minmax;
    my @stat = stat $rfile_or_tempfile;
    if (@stat) {
        $minmax = { mtime => $stat[9] };
    } else {
        # defensive because ABH encountered:
        #
        #### Sync 1239828608 (1/1/Z) temp .../authors/.FRMRecent-RECENT-Z.yaml-
        #### Ydr_.yaml ... DONE
        #### Cannot stat '/mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-
        #### Ydr_.yaml': No such file or directory at /usr/lib/perl5/site_perl/
        #### 5.8.8/File/Rsync/Mirror/Recentfile.pm line 1558.
        #### unlink0: /mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-Ydr_.yaml is
        #### gone already at cpan-pause.pl line 0
        #
        my $LFH = $self->_logfilehandle;
        print $LFH "Warning (maybe harmless): Cannot stat '$rfile_or_tempfile': $!";
    }
    if (@$re) {
        $minmax->{min} = $re->[-1]{epoch};
        $minmax->{max} = $re->[0]{epoch};
    }
    $self->minmax ( $minmax );
    return $re;
}
# Deserialize the file $rfile_or_tempfile according to $suffix:
# ".yaml" goes through YAML::Syck. Any other suffix needs Data::Serializer,
# which is driven by the serializer class registered for the suffix in
# %serializers. Dies if Data::Serializer is not available.
sub _try_deserialize {
    my($self,
       $suffix,
       $rfile_or_tempfile,
      ) = @_;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        YAML::Syck::LoadFile($rfile_or_tempfile);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        my $serialized = do
            {
                # Three-arg open, so the file name can never be taken as a
                # mode or pipe. :raw, because e.g. Storable output is binary
                # and must not go through CRLF translation.
                open my $fh, "<:raw", $rfile_or_tempfile or die "Could not open: $!";
                local $/;
                <$fh>;
            };
        $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
}
# Re-read the recentfile $dst into a scratch object and copy its _merged
# and minmax attributes over. When both objects carry a dirtymark and the
# two differ, the dataset was rewritten: the done-intervals are reset,
# the new dirtymark is adopted, and the object is seeded so that its
# uptodateness gets re-evaluated.
sub _refresh_internals {
    my($self, $dst) = @_;
    my $rfpeek = (ref $self)->new_from_file ($dst);
    $self->$_ ( $rfpeek->$_ ) for qw(_merged minmax);
    my $old_dirtymark = $self->dirtymark;
    my $new_dirtymark = $rfpeek->dirtymark;
    return unless $old_dirtymark && $new_dirtymark;
    return if $new_dirtymark eq $old_dirtymark;
    $self->done->reset;
    $self->dirtymark ( $new_dirtymark );
    $self->_uptodateness_ever_reached(0);
    $self->seed;
}
1689 =head2 $ret = $obj->rfilename
1691 Just the basename of our I<recentfile>, composed from C<filenameroot>,
1692 a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>
1694 =cut
sub rfilename {
    my($self) = @_;
    # <filenameroot>-<interval><serializer_suffix>, e.g. RECENT-6h.yaml
    return join "", $self->filenameroot, "-", $self->interval, $self->serializer_suffix;
}
1706 =head2 $str = $self->remote_dir
1708 The directory we are mirroring from.
1710 =cut
# Get/set the remote directory. Every call, including a plain get, also
# flags the object as a slave via is_slave(1).
sub remote_dir {
    my($self, $set) = @_;
    $self->_remote_dir ($set) if defined $set;
    my $dir = $self->_remote_dir;
    $self->is_slave (1);
    return $dir;
}
1722 =head2 $str = $obj->remoteroot
1724 =head2 (void) $obj->remoteroot ( $set )
1726 Get/Set the composed prefix needed when rsyncing from a remote module.
1727 If remote_host, remote_module, and remote_dir are set, it is composed
1728 from these.
1730 =cut
sub remoteroot {
    my($self, $set) = @_;
    $self->_remoteroot($set) if defined $set;
    my $remoteroot = $self->_remoteroot;
    return $remoteroot if defined $remoteroot;
    # Compose "host::module/dir" from whichever parts are defined, then
    # cache the result.
    my $host   = $self->remote_host;
    my $module = $self->remote_module;
    my $dir    = $self->remote_dir;
    $remoteroot = (defined $host   ? $host . "::"  : "")
                . (defined $module ? $module . "/" : "")
                . (defined $dir    ? $dir          : "");
    $self->_remoteroot($remoteroot);
    return $remoteroot;
}
1751 =head2 (void) $obj->split_rfilename ( $recentfilename )
1753 Inverse method to C<rfilename>. C<$recentfilename> is a plain filename
1754 of the pattern
1756 $filenameroot-$interval$serializer_suffix
1758 e.g.
1760 RECENT-1M.yaml
1762 This filename is split into its parts and the parts are fed to the
1763 object itself.
1765 =cut
sub split_rfilename {
    my($self, $rfname) = @_;
    my($splitter) = qr(^(.+)-([^-\.]+)(\.[^\.]+));
    my ($root, $interval, $suffix) = $rfname =~ $splitter
        or die "Alert: cannot split '$rfname', doesn't match '$splitter'";
    $self->filenameroot ($root);
    $self->interval ($interval);
    $self->serializer_suffix ($suffix);
    return;
}
1780 =head2 my $rfile = $obj->rfile
1782 Returns the full path of the I<recentfile>
1784 =cut
# Full path of the recentfile (localroot + rfilename), computed once and
# cached in _rfile.
sub rfile {
    my($self) = @_;
    my $rfile = $self->_rfile;
    return $rfile if defined $rfile;
    # Load File::Spec explicitly instead of relying on some other module
    # having loaded it already.
    require File::Spec;
    $rfile = File::Spec->catfile
        ($self->localroot,
         $self->rfilename,
        );
    $self->_rfile ($rfile);
    return $rfile;
}
1798 =head2 $rsync_obj = $obj->rsync
1800 The File::Rsync object that this object uses for communicating with an
1801 upstream server.
1803 =cut
# Lazily build (and cache) the File::Rsync object from rsync_options.
sub rsync {
    my($self) = @_;
    my $rsync = $self->_rsync;
    return $rsync if defined $rsync;
    my $rsync_options = $self->rsync_options || {};
    die "File::Rsync required for rsync operations. Cannot continue"
        unless $HAVE->{"File::Rsync"};
    $rsync = File::Rsync->new($rsync_options);
    $self->_rsync($rsync);
    return $rsync;
}
1820 =head2 (void) $obj->register_rsync_error(@err)
1822 =head2 (void) $obj->un_register_rsync_error()
1824 Register_rsync_error is called whenever the File::Rsync object fails
1825 on an exec (say, connection doesn't succeed). It issues a warning and
1826 sleeps for an increasing amount of time. Un_register_rsync_error
1827 resets the error count. See also accessor C<max_rsync_errors>.
1829 =cut
{
    # Failure bookkeeping shared by register_rsync_error() and
    # un_register_rsync_error(). It is file-scoped, i.e. shared by all
    # objects in the process.
    my $no_success_count = 0;
    my $no_success_time = 0;

    # Count one more consecutive rsync failure. Confess once the count
    # reaches max_rsync_errors (MAX_INT when undefined; a negative value
    # disables the check). Otherwise cluck and sleep 12 seconds per
    # consecutive failure, capped at 300.
    sub register_rsync_error {
        my($self, @err) = @_;
        chomp @err;
        $no_success_time = time;
        $no_success_count++;
        my $limit = $self->max_rsync_errors;
        $limit = MAX_INT unless defined $limit;
        my $errors = join " ", @err;
        require Carp;
        if ($limit >= 0 && $no_success_count >= $limit) {
            Carp::confess
                (sprintf
                 "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
                 $self->interval, $errors, $no_success_count);
        }
        my $sleep = 12 * $no_success_count;
        $sleep = 300 if $sleep > 300;
        Carp::cluck
            (sprintf
             "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
             scalar(localtime($no_success_time)), $self->interval, $errors, $sleep);
        sleep $sleep
    }

    # Reset the failure bookkeeping after a successful rsync.
    sub un_register_rsync_error {
        my($self) = @_;
        $no_success_time = 0;
        $no_success_count = 0;
    }
}
1874 =head2 $clone = $obj->_sparse_clone
1876 Clones just as much from itself that it does not hurt. Experimental
1877 method.
1879 Note: what fits better: sparse or shallow? Other suggestions?
1881 =cut
# Return a new object of the same class carrying over only the
# configuration-like attributes listed below. Referenced values are deep
# copied with Storable::dclone, so the clone shares no mutable state with
# the original.
sub _sparse_clone {
    my($self) = @_;
    my $clone = bless {}, ref $self;
    my @copied = qw(
                    _interval
                    _localroot
                    _remoteroot
                    _rfile
                    _use_tempfile
                    aggregator
                    filenameroot
                    ignore_link_stat_errors
                    is_slave
                    max_files_per_connection
                    protocol
                    rsync_options
                    serializer_suffix
                    sleep_per_connection
                    tempdir
                    verbose
                   );
    for my $accessor (@copied) {
        my $value = $self->$accessor;
        $clone->$accessor(ref $value ? Storable::dclone($value) : $value);
    }
    return $clone;
}
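=head2 $boolean = OBJ->ttl_reached ()

Returns 1 if more than C<ttl> seconds (24.2 when C<ttl> is undefined)
have passed since the time recorded in C<have_mirrored>, otherwise 0.

=cut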
sub ttl_reached {
    my($self) = @_;
    my $last_mirrored = $self->have_mirrored || 0;
    my $now = Time::HiRes::time;
    my $ttl = $self->ttl;
    $ttl = 24.2 unless defined $ttl;
    return $now > $last_mirrored + $ttl ? 1 : 0;
}
1927 =head2 (void) $obj->unlock()
1929 Unlocking is implemented with an C<rmdir> on a locking directory
1930 (C<.lock> appended to $rfile).
1932 =cut
sub unlock {
    my($self) = @_;
    return unless $self->_is_locked;
    my $lockdir = $self->rfile . ".lock";
    unlink "$lockdir/process" or warn "Could not unlink lockfile '$lockdir/process': $!";
    rmdir $lockdir or warn "Could not rmdir lockdir '$lockdir': $!";
    $self->_is_locked (0);
}
1943 =head2 unseed
1945 Sets this recentfile in the state of not 'seeded'.
1947 =cut
sub unseed {
    my($self) = @_;
    return $self->seeded(0);
}
1953 =head2 $ret = $obj->update ($path, $type)
1955 =head2 $ret = $obj->update ($path, "new", $dirty_epoch)
1957 =head2 $ret = $obj->update ()
1959 Enter one file into the local I<recentfile>. $path is the (usually
1960 absolute) path. If the path is outside I<our> tree, then it is
1961 ignored.
1963 C<$type> is one of C<new> or C<delete>.
1965 Events of type C<new> may set $dirty_epoch. $dirty_epoch is normally
1966 not used and the epoch is calculated by the update() routine itself
1967 based on current time. But if there is the demand to insert a
1968 not-so-current file into the dataset, then the caller sets
1969 $dirty_epoch. This causes the epoch of the registered event to become
1970 $dirty_epoch or -- if the exact value given is already taken -- a tiny
1971 bit more. As compensation the dirtymark of the whole dataset is set to
1972 now or the current epoch, whichever is higher. Note: setting the
1973 dirty_epoch to the future is prohibited as it's very unlikely to be
1974 intended: it definitely might wreak havoc with the index files.
1976 The new file event is unshifted (or, if dirty_epoch is set, inserted
1977 at the place it belongs to, according to the rule to have a sequence
1978 of strictly decreasing timestamps) to the array of recent_events and
1979 the array is shortened to the length of the timespan allowed. This is
1980 usually the timespan specified by the interval of this recentfile but
1981 as long as this recentfile has not been merged to another one, the
1982 timespan may grow without bounds.
1984 The third form runs an update without inserting a new file. This may
1985 be desired to truncate a recentfile.
1987 =cut
# Return $epoch when it is newer than the newest event in @$recent (or
# @$recent is empty); otherwise return a value slightly above that newest
# epoch, so that epochs strictly increase.
sub _epoch_monotonically_increasing {
    my($self,$epoch,$recent) = @_;
    # the first one goes unoffended
    return $epoch if !@$recent || _bigfloatgt("".$epoch, $recent->[0]{epoch});
    return _increase_a_bit($recent->[0]{epoch});
}
# See the POD above. Validates the arguments, then runs the batch update
# for this single item under the lock. The lock is released even when
# the update dies; the error is then re-raised.
sub update {
    my($self,$path,$type,$dirty_epoch) = @_;
    if (defined $path or defined $type or defined $dirty_epoch) {
        die "update called without path argument" unless defined $path;
        die "update called without type argument" unless defined $type;
        # anchored, so that e.g. "renew" or "deleted" are rejected
        die "update called with illegal type argument: $type" unless $type =~ /\A(?:new|delete)\z/;
    }
    $self->lock;
    my $ok = eval {
        my $ctx = $self->_locked_batch_update([{path=>$path,type=>$type,epoch=>$dirty_epoch}]);
        $self->write_recent($ctx->{recent}) if $ctx->{something_done};
        $self->_assert_symlink;
        1;
    };
    my $err = $@;
    unless ($ok) {
        # never leave a stale lock behind
        $self->unlock;
        die $err;
    }
    return $self->unlock;
}
2011 =head2 $obj->batch_update($batch)
Like update() but for many files. $batch is an arrayref containing
hashrefs with the structure

  {
    path  => $path,
    type  => $type,
    epoch => $epoch,
  }

2024 =cut
# Apply all items of @$batch under the lock and write the recentfile if
# anything changed. The lock is released even when the update dies; the
# error is then re-raised.
sub batch_update {
    my($self,$batch) = @_;
    $self->lock;
    my $ok = eval {
        my $ctx = $self->_locked_batch_update($batch);
        $self->write_recent($ctx->{recent}) if $ctx->{something_done};
        $self->_assert_symlink;
        1;
    };
    my $err = $@;
    unless ($ok) {
        # never leave a stale lock behind
        $self->unlock;
        die $err;
    }
    return $self->unlock;
}
# Apply all items of @$batch, newest epoch first, to the current event
# list via _update_batch_item(). Then drop events older than
# oldest_allowed from the tail; there is no limit once a dirty update has
# set a new dirtymark. The callers hold the lock. Returns
# { something_done => BOOL, recent => ARRAYREF }.
sub _locked_batch_update {
    my($self,$batch) = @_;
    my $something_done = 0;
    my $recent = $self->recent_events;
    my %paths_in_recent = map { ($_->{path} => undef) } @$recent;
    my $canonmeth = $self->canonize || "naive_path_normalize";
    my $oldest_allowed = 0;
    my $setting_new_dirty_mark = 0;
    my $console;
    if ($self->verbose && @$batch > 1) {
        # The progress display is optional: without Time::Progress we
        # warn and carry on instead of dying on the constructor call.
        if (eval { require Time::Progress; 1 }) {
            $| = 1;
            $console = Time::Progress->new;
            $console->attr( min => 1, max => scalar @$batch );
            print "\n";
        } else {
            warn "dollarat[$@]";
        }
    }
    my $i = 0;
    my $memo_splicepos;
    ITEM: for my $item (sort {($b->{epoch}||0) <=> ($a->{epoch}||0)} @$batch) {
        $i++;
        print $console->report( "\rdone %p elapsed: %L (%l sec), ETA %E (%e sec)", $i ) if $console and not $i % 50;
        my $ctx = $self->_update_batch_item($item,$canonmeth,$recent,$setting_new_dirty_mark,$oldest_allowed,$something_done,\%paths_in_recent,$memo_splicepos);
        $something_done = $ctx->{something_done};
        $oldest_allowed = $ctx->{oldest_allowed};
        $setting_new_dirty_mark = $ctx->{setting_new_dirty_mark};
        $recent = $ctx->{recent};
        $memo_splicepos = $ctx->{memo_splicepos};
    }
    print "\n" if $console;
    if ($setting_new_dirty_mark) {
        $oldest_allowed = 0;
    }
    TRUNCATE: while (@$recent) {
        # $DB::single++ unless defined $oldest_allowed;
        if (_bigfloatlt($recent->[-1]{epoch}, $oldest_allowed)) {
            pop @$recent;
            $something_done = 1;
        } else {
            last TRUNCATE;
        }
    }
    return {something_done=>$something_done,recent=>$recent};
}
# Apply one batch item ({path, type, epoch}) to the working event list.
# Returns the updated loop state (something_done, oldest_allowed,
# setting_new_dirty_mark, recent, memo_splicepos) for
# _locked_batch_update().
# - The path is canonicalized with $canonmeth and used only when it lies
#   below localroot; otherwise nothing but oldest_allowed may change.
# - Without an epoch, the event gets the current time (made larger than
#   the newest known epoch). Older events for the same path are dropped
#   and the event goes to the front.
# - With an epoch (a "dirty" update), _update_with_dirty_epoch() decides
#   the position. The dirtymark is advanced to now (or to the event
#   epoch, if that is later), and merged is reset when it has no epoch
#   or the event is older than it.
sub _update_batch_item {
    my($self,$item,$canonmeth,$recent,$setting_new_dirty_mark,$oldest_allowed,$something_done,$paths_in_recent,$memo_splicepos) = @_;
    my($path,$type,$dirty_epoch) = @{$item}{qw(path type epoch)};
    if (defined $path or defined $type or defined $dirty_epoch) {
        $path = $self->$canonmeth($path);
    }
    # you must calculate the time after having locked, of course
    my $now = Time::HiRes::time;

    my $epoch;
    if (defined $dirty_epoch && _bigfloatgt($now,$dirty_epoch)) {
        $epoch = $dirty_epoch;
    } else {
        $epoch = $self->_epoch_monotonically_increasing($now,$recent);
    }
    $recent ||= [];
    my $merged = $self->merged;
    # As long as we are not merged at all (or a dirty update already
    # lifted the limit), no lower bound applies.
    if ($merged->{epoch} && !$setting_new_dirty_mark) {
        my $virtualnow = _bigfloatmax($now,$epoch);
        # for the lower bound I think we need no big math, we calc already
        my $secs = $self->interval_secs();
        $oldest_allowed = min($virtualnow - $secs, $merged->{epoch}, $epoch);
    }
    my $lrd = $self->localroot;
    if (defined $path && $path =~ s|^\Q$lrd\E||) {
        $path =~ s|^/||;
        my $splicepos;
        # remove the older duplicates of this $path, irrespective of $type:
        if (defined $dirty_epoch) {
            my $ctx = $self->_update_with_dirty_epoch($path,$recent,$epoch,$paths_in_recent,$memo_splicepos);
            $recent = $ctx->{recent};
            $splicepos = $ctx->{splicepos};
            $epoch = $ctx->{epoch};
            # just in case we had to increase it
            my $new_dm = _bigfloatgt($epoch, $now) ? $epoch : $now;
            $self->dirtymark($new_dm);
            $setting_new_dirty_mark = 1;
            if (not defined $merged->{epoch} or _bigfloatlt($epoch,$merged->{epoch})) {
                $self->merged(+{});
            }
        } else {
            $recent = [ grep { $_->{path} ne $path } @$recent ];
            $splicepos = 0;
        }
        if (defined $splicepos) {
            splice @$recent, $splicepos, 0, { epoch => $epoch, path => $path, type => $type };
            $paths_in_recent->{$path} = undef;
        }
        $memo_splicepos = $splicepos;
        $something_done = 1;
    }
    return
        {
         something_done => $something_done,
         oldest_allowed => $oldest_allowed,
         setting_new_dirty_mark => $setting_new_dirty_mark,
         recent => $recent,
         memo_splicepos => $memo_splicepos,
        };
}
# Find where a "dirty" event ($path at $epoch) belongs in $recent, which
# is sorted by descending epoch. Other events for the same path are
# removed first. If exactly this event ($path with this epoch) is already
# present, nothing is to be done and splicepos comes back undefined. If
# $epoch collides with another event's epoch, it is raised a bit via
# _increase_a_bit(). $memo_splicepos, the previous item's position, is
# only a hint for where to start searching.
# Returns { recent => ARRAYREF, splicepos => INDEX|undef, epoch => EPOCH }.
sub _update_with_dirty_epoch {
    my($self,$path,$recent,$epoch,$paths_in_recent,$memo_splicepos) = @_;
    if (exists $paths_in_recent->{$path}) {
        my @others;
        for my $ev (@$recent) {
            if ($ev->{path} eq $path) {
                if ($ev->{epoch} eq $epoch) {
                    # Nothing to do. Returning here keeps a second copy of
                    # this event (with a nudged epoch) from being inserted.
                    return { recent => $recent, splicepos => undef, epoch => $epoch };
                }
            } else {
                push @others, $ev;
            }
        }
        @$recent = @others;
    }
    my $splicepos;
    if (!@$recent or _bigfloatgt($epoch,$recent->[0]{epoch})) {
        $splicepos = 0;
    } elsif (_bigfloatlt($epoch,$recent->[-1]{epoch})) {
        $splicepos = @$recent;
    } else {
        # Start at the remembered position only when it is still inside
        # the list and $epoch does not sort before it. Indexing past the
        # end would autovivify a bogus element into @$recent.
        my $startingpoint = 0;
        if (defined $memo_splicepos
            && $memo_splicepos <= $#$recent
            && !_bigfloatgt($epoch, $recent->[$memo_splicepos]{epoch})) {
            $startingpoint = $memo_splicepos;
        }
        RECENT: for my $i ($startingpoint..$#$recent) {
            if ($epoch eq $recent->[$i]{epoch}) {
                $epoch = _increase_a_bit($epoch, $i ? $recent->[$i-1]{epoch} : undef);
            }
            if (_bigfloatgt($epoch,$recent->[$i]{epoch})) {
                $splicepos = $i;
                last RECENT;
            }
        }
    }
    return {
            recent => $recent,
            splicepos => $splicepos,
            epoch => $epoch,
           };
}
2194 =head2 seed
2196 Sets this recentfile in the state of 'seeded' which means it has to
2197 re-evaluate its uptodateness.
2199 =cut
sub seed {
    my($self) = @_;
    return $self->seeded(1);
}
2205 =head2 seeded
2207 Tells if the recentfile is in the state 'seeded'.
2209 =cut
sub seeded {
    my($self, $set) = @_;
    $self->_seeded ($set) if defined $set;
    my $state = $self->_seeded;
    return $state if defined $state;
    # first query without a prior set: initialize to "not seeded"
    $self->_seeded (0);
    return 0;
}
2223 =head2 uptodate
2225 True if this object has mirrored the complete interval covered by the
2226 current recentfile.
2228 =cut
sub uptodate {
    my($self) = @_;
    my ($uptodate, $why);
    if ($self->_uptodateness_ever_reached and not $self->seeded) {
        ($uptodate, $why) = (1, "saturated");
    }
    # A ttl_reached() based check used to sit here. It is too easy to
    # misconfigure ttl and related timings and then never reach
    # uptodateness, so it was disabled 2009-03-22.
    if (!defined $uptodate) {
        # look if recentfile has unchanged timestamp
        my $minmax = $self->minmax;
        if (exists $minmax->{mtime}) {
            my $rfile = $self->_my_current_rfile;
            my @stat = stat $rfile or die "Could not stat '$rfile': $!";
            my $mtime = $stat[9];
            if (defined $mtime && defined $minmax->{mtime} && $mtime > $minmax->{mtime}) {
                $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
                $uptodate = 0;
            } else {
                my $covered = $self->done->covered(@$minmax{qw(max min)});
                $why = sprintf "minmax covered[%s], so we return that", defined $covered ? $covered : "UNDEF";
                $uptodate = $covered;
            }
        }
    }
    if (!defined $uptodate) {
        ($uptodate, $why) = (0, "fallthrough, so not uptodate");
    }
    $self->_uptodateness_ever_reached(1) if $uptodate;
    $self->_remember_last_uptodate_call({ uptodate => $uptodate, why => $why });
    return $uptodate;
}
2278 =head2 $obj->write_recent ($recent_files_arrayref)
2280 Writes a I<recentfile> based on the current reflection of the current
2281 state of the tree limited by the current interval.
2283 =cut
# Sort the event arrayref given as second argument in place, newest epoch
# first.
sub _resort {
    my($self, $events) = @_;
    @$events = sort { _bigfloatcmp($b->{epoch},$a->{epoch}) } @$events;
    return;
}
# Write @$recent via write_<protocol> after checking that epochs strictly
# decrease (confesses otherwise). minmax is updated beforehand: max and
# min are only ever raised, to the newest and oldest epoch of @$recent.
sub write_recent {
    my ($self,$recent) = @_;
    die "write_recent called without argument" unless defined $recent;
    my $Last_epoch;
    SANITYCHECK: for my $i (0..$#$recent) {
        if (defined($Last_epoch) and _bigfloatge($recent->[$i]{epoch},$Last_epoch)) {
            require Carp;
            Carp::confess(sprintf "Warning: disorder '%s'>='%s', re-sorting %s\n",
                          $recent->[$i]{epoch}, $Last_epoch, $self->interval);
            # you may want to:
            # $self->_resort($recent);
            # last SANITYCHECK;
        }
        $Last_epoch = $recent->[$i]{epoch};
    }
    my $minmax = $self->minmax;
    # An empty list has no epochs to contribute. Probing $recent->[0]{epoch}
    # would autovivify a bogus {} event that would then be written out.
    if (@$recent) {
        if (!defined $minmax->{max} || _bigfloatlt($minmax->{max},$recent->[0]{epoch})) {
            $minmax->{max} = $recent->[0]{epoch};
        }
        if (!defined $minmax->{min} || _bigfloatlt($minmax->{min},$recent->[-1]{epoch})) {
            $minmax->{min} = $recent->[-1]{epoch};
        }
    }
    $self->minmax($minmax);
    my $meth = sprintf "write_%d", $self->protocol;
    $self->$meth($recent);
}
2316 =head2 $obj->write_0 ($recent_files_arrayref)
2318 Delegate of C<write_recent()> on protocol 0
2320 =cut
sub write_0 {
    my ($self,$recent) = @_;
    my $rfile = $self->rfile;
    # Dump into a side file first and rename it into place, so that readers
    # never see a partially written recentfile.
    my $staging = "$rfile.new";
    YAML::Syck::DumpFile($staging,$recent);
    rename $staging, $rfile or die "Could not rename to '$rfile': $!";
}
2329 =head2 $obj->write_1 ($recent_files_arrayref)
2331 Delegate of C<write_recent()> on protocol 1
2333 =cut
2335 sub write_1 {
2336 my ($self,$recent) = @_;
2337 my $rfile = $self->rfile;
2338 my $suffix = $self->serializer_suffix;
2339 my $data = {
2340 meta => $self->meta_data,
2341 recent => $recent,
2343 my $serialized;
2344 if ($suffix eq ".yaml") {
2345 $serialized = YAML::Syck::Dump($data);
2346 } elsif ($HAVE->{"Data::Serializer"}) {
2347 my $serializer = Data::Serializer->new
2348 ( serializer => $serializers{$suffix} );
2349 $serialized = $serializer->raw_serialize($data);
2350 } else {
2351 die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
2353 open my $fh, ">", "$rfile.new" or die "Could not open >'$rfile.new': $!";
2354 print $fh $serialized;
2355 close $fh or die "Could not close '$rfile.new': $!";
2356 rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
2359 BEGIN {
2360 my $nq = qr/[^"]+/; # non-quotes
2361 my @pod_lines =
2362 split /\n/, <<'=cut'; %serializers = map { my @x = /"($nq)"\s+=>\s+"($nq)"/; @x } grep {s/^=item\s+C<<\s+(.+)\s+>>$/$1/} @pod_lines; }
2364 =head1 SERIALIZERS
2366 The following suffixes are supported and trigger the use of these
2367 serializers:
2369 =over 4
2371 =item C<< ".yaml" => "YAML::Syck" >>
2373 =item C<< ".json" => "JSON" >>
2375 =item C<< ".sto" => "Storable" >>
2377 =item C<< ".dd" => "Data::Dumper" >>
2379 =back
2381 =cut
2383 BEGIN {
2384 my @pod_lines =
2385 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
2387 =head1 INTERVAL SPEC
2389 An interval spec is a primitive way to express time spans. Normally it
2390 is composed from an integer and a letter.
2392 As a special case, a string that consists only of the single letter
2393 C<Z>, stands for MAX_INT seconds.
2395 The following letters express the specified number of seconds:
2397 =over 4
2399 =item C<< s => 1 >>
2401 =item C<< m => 60 >>
2403 =item C<< h => 60*60 >>
2405 =item C<< d => 60*60*24 >>
2407 =item C<< W => 60*60*24*7 >>
2409 =item C<< M => 60*60*24*30 >>
2411 =item C<< Q => 60*60*24*90 >>
2413 =item C<< Y => 60*60*24*365.25 >>
2415 =back
2417 =cut
2419 =head1 SEE ALSO
2421 L<File::Rsync::Mirror::Recent>,
2422 L<File::Rsync::Mirror::Recentfile::Done>,
2423 L<File::Rsync::Mirror::Recentfile::FakeBigFloat>
2425 =head1 BUGS
2427 Please report any bugs or feature requests through the web interface
2429 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2430 I will be notified, and then you'll automatically be notified of
2431 progress on your bug as I make changes.
2433 =head1 KNOWN BUGS
2435 Memory hungry: it seems all memory is allocated during the initial
2436 rsync where a list of all files is maintained in memory.
2438 =head1 SUPPORT
2440 You can find documentation for this module with the perldoc command.
2442 perldoc File::Rsync::Mirror::Recentfile
2444 You can also look for information at:
2446 =over 4
2448 =item * RT: CPAN's request tracker
2450 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2452 =item * AnnoCPAN: Annotated CPAN documentation
2454 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2456 =item * CPAN Ratings
2458 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2460 =item * Search CPAN
2462 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2464 =back
2467 =head1 ACKNOWLEDGEMENTS
2469 Thanks to RJBS for module-starter.
2471 =head1 AUTHOR
2473 Andreas König
2475 =head1 COPYRIGHT & LICENSE
2477 Copyright 2008,2009 Andreas König.
2479 This program is free software; you can redistribute it and/or modify it
2480 under the same terms as Perl itself.
2483 =cut
2485 1; # End of File::Rsync::Mirror::Recentfile
2487 # Local Variables:
2488 # mode: cperl
2489 # cperl-indent-level: 4
2490 # End: