more defensive against broken procfile
[rersyncrecent.git] / lib / File / Rsync / Mirror / Recentfile.pm
blob60fa2886e2a16dcdeb4bc73a3a1fd10ca01af8f2
1 package File::Rsync::Mirror::Recentfile;
3 # use warnings;
4 use strict;
6 =encoding utf-8
8 =head1 NAME
10 File::Rsync::Mirror::Recentfile - mirroring via rsync made efficient
12 =cut
# Probe for optional modules. For every candidate we remember the result
# of trying to require it, so that later code can check availability
# before relying on the module.
my $HAVE = {};
foreach my $optional_module (qw(Data::Serializer File::Rsync)) {
    my $loaded = eval qq{ require $optional_module; };
    $HAVE->{$optional_module} = $loaded;
}
21 use Config;
22 use File::Basename qw(basename dirname fileparse);
23 use File::Copy qw(cp);
24 use File::Path qw(mkpath);
25 use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all);
26 use File::Temp;
27 use List::Util qw(first max min);
28 use Scalar::Util qw(reftype);
29 use Storable;
30 use Time::HiRes qw();
31 use YAML::Syck;
33 use version; our $VERSION = qv('0.0.8');
35 use constant MAX_INT => ~0>>1; # anything better?
36 use constant DEFAULT_PROTOCOL => 1;
38 # cf. interval_secs
39 my %seconds;
41 # maybe subclass if this mapping is bad?
42 my %serializers;
44 =head1 SYNOPSIS
46 Writer (of a single file):
  use File::Rsync::Mirror::Recentfile;
  my $rf = File::Rsync::Mirror::Recentfile->new
    (
     interval => q(6h),
     filenameroot => "RECENT",
     comment => "These 'RECENT' files are part of a test of a new CPAN mirroring concept. Please ignore them for now.",
     localroot => "/home/ftp/pub/PAUSE/authors/",
     aggregator => [qw(1d 1W 1M 1Q 1Y Z)],
    );
  $rf->update("/home/ftp/pub/PAUSE/authors/id/A/AN/ANDK/CPAN-1.92_63.tar.gz","new");
59 Reader/mirrorer:
61 my $rf = File::Rsync::Mirror::Recentfile->new
63 filenameroot => "RECENT",
64 interval => q(6h),
65 localroot => "/home/ftp/pub/PAUSE/authors",
66 remote_dir => "",
67 remote_host => "pause.perl.org",
68 remote_module => "authors",
69 rsync_options => {
70 compress => 1,
71 'rsync-path' => '/usr/bin/rsync',
72 links => 1,
73 times => 1,
74 'omit-dir-times' => 1,
75 checksum => 1,
77 verbose => 1,
79 $rf->mirror;
81 Aggregator (usually the writer):
83 my $rf = File::Rsync::Mirror::Recentfile->new_from_file ( $file );
84 $rf->aggregate;
86 =head1 DESCRIPTION
88 Lower level than F:R:M:Recent, handles one recentfile. Whereas a tree
89 is always composed of several recentfiles, controlled by the
90 F:R:M:Recent object. The Recentfile object has to do the bookkeeping
91 for a single timeslice.
93 =head1 EXPORT
95 No exports.
97 =head1 CONSTRUCTORS / DESTRUCTOR
99 =head2 my $obj = CLASS->new(%hash)
101 Constructor. On every argument pair the key is a method name and the
102 value is an argument to that method name.
104 If a recentfile for this resource already exists, metadata that are
105 not defined by the constructor will be fetched from there as soon as
106 it is being read by recent_events().
108 =cut
# Constructor. Arguments are (method name, value) pairs; each pair is
# applied by calling the method with the value. Afterwards protocol,
# filenameroot and serializer_suffix receive their defaults if the
# caller left them undefined.
sub new {
    my ($class, @pairs) = @_;
    my $self = bless {}, $class;
    while (my ($setter, $value) = splice @pairs, 0, 2) {
        $self->$setter($value);
    }
    for my $default (
        [ protocol          => DEFAULT_PROTOCOL ],
        [ filenameroot      => "RECENT" ],
        [ serializer_suffix => ".yaml" ],
    ) {
        my ($attr, $fallback) = @$default;
        $self->$attr($fallback) unless defined $self->$attr;
    }
    return $self;
}
129 =head2 my $obj = CLASS->new_from_file($file)
131 Constructor. $file is a I<recentfile>.
133 =cut
# Constructor. $file is a recentfile (or a symlink pointing to one in
# the same directory).
#
# The file is slurped, symlinks are followed, the serializer is chosen
# from the filename suffix and every lowercase key found under {meta}
# is applied to the new object by calling the method of the same name.
# Dies if the file cannot be opened, if a symlink target contains a
# '/', if the suffix is unknown, or if a non-YAML suffix is used
# without Data::Serializer being installed.
sub new_from_file {
    my($class, $file) = @_;
    my $self = bless {}, $class;
    $self->_rfile($file);
    #?# $self->lock;
    my $serialized = do {
        # three-arg open: the filename must never be able to select the
        # open mode (leading '>', trailing '|', ...)
        open my $fh, "<", $file or die "Could not open '$file': $!";
        local $/;
        <$fh>;
    };
    # XXX: we can skip this step when the metadata are sufficient, but
    # we cannot parse the file without some magic stuff about
    # serialized formats
    while (-l $file) {
        my($name,$path) = fileparse($file);
        my $symlink = readlink $file;
        if ($symlink =~ m|/|) {
            die "FIXME: filenames containing '/' not supported, got $symlink";
        }
        $file = File::Spec->catfile ( $path, $symlink );
    }
    # fileparse() treats suffixes as regular expressions; the keys of
    # %serializers are literal suffixes (they are compared with eq and
    # used as hash keys below), so escape them before matching.
    my($name,$path,$suffix) = fileparse($file, map { quotemeta($_) } keys %serializers);
    $self->serializer_suffix($suffix);
    $self->localroot($path);
    die "Could not determine file format from suffix" unless $suffix;
    my $deserialized;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        $deserialized = YAML::Syck::LoadFile($file);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $deserialized = $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
    while (my($k,$v) = each %{$deserialized->{meta}}) {
        next if $k ne lc $k; # "Producers"
        $self->$k($v);
    }
    unless (defined $self->protocol) {
        $self->protocol(DEFAULT_PROTOCOL);
    }
    return $self;
}
180 =head2 DESTROY
182 A simple unlock.
184 =cut
# Destructor: releases the lock. The tempfile lookup that follows is
# kept for its original shape only; the actual unlink is disabled.
sub DESTROY {
    my ($self) = @_;
    $self->unlock;
    return if $self->_current_tempfile_fh;
    my $leftover = $self->_current_tempfile or return;
    return unless -e $leftover;
    # unlink $leftover; # may fail in global destruction
    return;
}
197 =head1 ACCESSORS
199 =cut
201 my @accessors;
BEGIN {
    # private accessors
    @accessors = qw(
        _current_tempfile
        _current_tempfile_fh
        _delayed_operations
        _done
        _interval
        _is_locked
        _localroot
        _merged
        _pathdb
        _remember_last_uptodate_call
        _remote_dir
        _remoteroot
        _rfile
        _rsync
        __verified_tempdir
        _seeded
        _uptodateness_ever_reached
        _use_tempfile
    );

    # public accessors: the POD that follows is read as a heredoc and
    # every "=item NAME" line in it contributes NAME to the list
    my @doc_rows =
        split /\n/, <<'=cut'; push @accessors, grep { s/^=item\s+// } @doc_rows; }
228 =over 4
230 =item aggregator
232 A list of interval specs that tell the aggregator which I<recentfile>s
233 are to be produced.
235 =item canonize
237 The name of a method to canonize the path before rsyncing. Only
238 supported value is C<naive_path_normalize>. Defaults to that.
240 =item comment
242 A comment about this tree and setup.
244 =item dirtymark
246 A timestamp. The dirtymark is updated whenever an out of band change
247 on the origin server is performed that violates the protocol. Say,
248 they add or remove files in the middle somewhere. Slaves must react
249 with a devaluation of their C<done> structure which then leads to a
250 full re-sync of all files. Implementation note: dirtymark may increase
251 or decrease.
253 =item filenameroot
255 The (prefix of the) filename we use for this I<recentfile>. Defaults to
256 C<RECENT>. The string must not contain a directory separator.
258 =item have_mirrored
260 Timestamp remembering when we mirrored this recentfile the last time.
261 Only relevant for slaves.
263 =item ignore_link_stat_errors
265 If set to true, rsync errors are ignored that complain about link stat
266 errors. These seem to happen only when there are files missing at the
267 origin. In race conditions this can always happen, so it defaults to
268 true.
270 =item is_slave
272 If set to true, this object will fetch a new recentfile from remote
273 when the timespan between the last mirror (see have_mirrored) and now
274 is too large (see C<ttl>).
276 =item keep_delete_objects_forever
278 The default for delete events is that they are passed through the
279 collection of recentfile objects until they reach the Z file. There
280 they get dropped so that the associated file object ceases to exist at
281 all. By setting C<keep_delete_objects_forever> the delete objects are
282 kept forever. This makes the Z file larger but has the advantage that
283 slaves that have interrupted mirroring for a long time still can clean
284 up their copy.
286 =item locktimeout
288 After how many seconds shall we die if we cannot lock a I<recentfile>?
289 Defaults to 600 seconds.
291 =item loopinterval
293 When mirror_loop is called, this accessor can specify how much time
294 every loop shall at least take. If the work of a loop is done before
295 that time has gone, sleeps for the rest of the time. Defaults to
296 arbitrary 42 seconds.
298 =item max_files_per_connection
300 Maximum number of files that are transferred on a single rsync call.
301 Setting it higher means higher performance at the price of holding
302 connections longer and potentially disturbing other users in the pool.
303 Defaults to the arbitrary value 42.
305 =item max_rsync_errors
307 When rsync operations encounter that many errors without any resetting
308 success in between, then we die. Defaults to unlimited. A value of
309 -1 means we run forever ignoring all rsync errors.
311 =item minmax
313 Hashref remembering when we read the recent_events from this file the
314 last time and what the timespan was.
316 =item protocol
318 When the RECENT file format changes, we increment the protocol. We try
319 to support older protocols in later releases.
321 =item remote_host
323 The host we are mirroring from. Leave empty for the local filesystem.
325 =item remote_module
327 Rsync servers have so called modules to separate directory trees from
328 each other. Put here the name of the module under which we are
329 mirroring. Leave empty for local filesystem.
331 =item rsync_options
333 Things like compress, links, times or checksums. Passed in to the
334 File::Rsync object used to run the mirror.
336 =item serializer_suffix
338 Mostly untested accessor. The only well tested format for
339 I<recentfile>s at the moment is YAML. It is used with YAML::Syck via
340 Data::Serializer. But in principle other formats are supported as
341 well. See section SERIALIZERS below.
343 =item sleep_per_connection
345 Sleep that many seconds (floating point OK) after every chunk of rsyncing
346 has finished. Defaults to arbitrary 0.42.
348 =item tempdir
350 Directory to write temporary files to. Must allow rename operations
351 into the tree which usually means it must live on the same partition
352 as the target directory. Defaults to C<< $self->localroot >>.
354 =item ttl
356 Time to live. Number of seconds after which this recentfile must be
357 fetched again from the origin server. Only relevant for slaves.
358 Defaults to arbitrary 24.2 seconds.
360 =item verbose
Boolean to turn on a bit of verbosity.
364 =item verboselog
366 Path to the logfile to write verbose progress information to. This is
367 a primitive stop gap solution to get simple verbose logging working.
368 Switching to Log4perl or similar is probably the way to go.
370 =back
372 =cut
374 use accessors @accessors;
376 =head1 METHODS
378 =head2 (void) $obj->aggregate( %options )
380 Takes all intervals that are collected in the accessor called
381 aggregator. Sorts them by actual length of the interval.
382 Removes those that are shorter than our own interval. Then merges this
383 object into the next larger object. The merging continues upwards
384 as long as the next I<recentfile> is old enough to warrant a merge.
Whether a merge is warranted is decided according to the interval of
the previous I<recentfile>, so that larger files are updated less often
than smaller ones. If $options{force} is true, all files get updated.
390 Here is an example to illustrate the behaviour. Given aggregators
392 1h 1d 1W 1M 1Q 1Y Z
394 then
396 1h updates 1d on every call to aggregate()
397 1d updates 1W earliest after 1h
398 1W updates 1M earliest after 1d
399 1M updates 1Q earliest after 1W
400 1Q updates 1Y earliest after 1M
401 1Y updates Z earliest after 1Q
403 Note that all but the smallest recentfile get updated at an arbitrary
404 rate and as such are quite useless on their own.
406 =cut
# (void) $obj->aggregate( %options )
#
# Builds the list of intervals (own interval plus the aggregator
# list), drops duplicates and intervals shorter than our own, sorts
# them by length and merges each recentfile into the next larger one.
# The first merge always happens; later ones happen when forced, when
# the target file does not exist yet, or when the target file is older
# than the interval two steps below it. The chain stops at the first
# merge that is not due.
sub aggregate {
    my ($self, %option) = @_;
    my %seen_interval;
    my @candidates = map { +{ interval => $_, secs => $self->interval_secs($_) } }
        $self->interval, @{ $self->aggregator || [] };
    my @aggs = sort { $a->{secs} <=> $b->{secs} }
        grep { !$seen_interval{ $_->{interval} }++ && $_->{secs} >= $self->interval_secs }
        @candidates;
    $self->update;
    $aggs[0]{object} = $self;
  AGGREGATOR: for my $i (0 .. $#aggs - 1) {
        my $this = $aggs[$i]{object};
        my $next = $this->_sparse_clone;
        $next->interval($aggs[$i+1]{interval});
        my $merge_due = 1;
        unless ($option{force} || $i == 0) {
            my $target_file = $next->rfile;
            if (-e $target_file) {
                my $prev = $aggs[$i-1]{object};
                # -M is relative to $^T, so pin that to "now"
                local $^T = time;
                my $age_secs = 86400 * -M $target_file;
                $merge_due = $age_secs > $prev->interval_secs ? 1 : 0;
            }
        }
        last AGGREGATOR unless $merge_due;
        $next->merge($this);
        $aggs[$i+1]{object} = $next;
    }
}
446 # collect file size and mtime for all files of this aggregate
# Debugging aid: for our own interval and every aggregator interval
# (sorted by length) report the recentfile path together with its size
# and mtime as seen by stat. Returns an arrayref of hashrefs.
sub _debug_aggregate {
    my ($self) = @_;
    my @sorted = sort { $a->{secs} <=> $b->{secs} }
        map { +{ interval => $_, secs => $self->interval_secs($_) } }
        $self->interval, @{ $self->aggregator || [] };
    my @rows = map {
        my $clone = Storable::dclone($self);
        $clone->interval($_->{interval});
        my $file = $clone->rfile;
        my ($size, $mtime) = (stat $file)[7, 9];
        +{ rfile => $file, size => $size, mtime => $mtime };
    } @sorted;
    return \@rows;
}
463 # (void) $self->_assert_symlink()
# (void) $self->_assert_symlink()
#
# Makes sure that "<filenameroot>.recent" in localroot refers to our
# recentfile. Where symlinks are available it is a relative symlink to
# rfilename; elsewhere it is a copy of the file. Whenever something
# already occupies the name, the replacement is prepared under a
# process-specific temporary name and renamed into place. Dies when
# the link or copy cannot be created.
sub _assert_symlink {
    my ($self) = @_;
    my $linkname = File::Spec->catfile(
        $self->localroot,
        sprintf("%s.recent", $self->filenameroot),
    );
    my $target  = $self->rfilename;
    my $scratch = "$linkname.$$";
    if (($Config{d_symlink} || "") eq "define") {
        if (-l $linkname) {
            my $current = readlink $linkname;
            return if defined $current && $current eq $target;
        }
        if (-l $linkname || -e $linkname) {
            # Something (stale symlink or plain file) is in the way: a
            # direct symlink() would fail with EEXIST, so go through a
            # temporary name and rename over it.
            unlink $scratch; # may fail
            symlink $target, $scratch or die "Could not create symlink '$scratch': $!";
            rename $scratch, $linkname or die "Could not rename '$scratch' to $linkname: $!";
        } else {
            symlink $target, $linkname or die "Could not create symlink '$linkname': $!";
        }
    } else {
        warn "Warning: symlinks not supported on this system, doing a copy instead\n";
        # rfilename is a bare filename (it doubles as the relative
        # symlink target above), so the copy source has to be anchored
        # in localroot rather than in the current working directory.
        my $source = File::Spec->catfile($self->localroot, $target);
        unlink $scratch; # may fail
        cp($source, $scratch) or die "Could not copy to '$scratch': $!";
        rename $scratch, $linkname or die "Could not rename '$scratch' to $linkname: $!";
    }
    return;
}
502 =head2 $hashref = $obj->delayed_operations
504 A hash of hashes containing unlink and rmdir operations which had to
505 wait until the recentfile got unhidden in order to not confuse
506 downstream mirrors (in case we have some).
508 =cut
# Returns the hash of pending unlink and rmdir operations, creating and
# storing an empty one on first use.
sub delayed_operations {
    my ($self) = @_;
    my $pending = $self->_delayed_operations;
    return $pending if defined $pending;
    $pending = {
        rmdir  => {},
        unlink => {},
    };
    $self->_delayed_operations($pending);
    return $pending;
}
523 =head2 $done = $obj->done
525 C<$done> is a reference to a L<File::Rsync::Mirror::Recentfile::Done>
526 object that keeps track of rsync activities. Only needed and used when
527 we are a mirroring slave.
529 =cut
# Returns the File::Rsync::Mirror::Recentfile::Done object of this
# recentfile. It is created on first use, told our interval and then
# cached in the object.
sub done {
    my ($self) = @_;
    if (my $existing = $self->_done) {
        return $existing;
    }
    require File::Rsync::Mirror::Recentfile::Done;
    my $tracker = File::Rsync::Mirror::Recentfile::Done->new();
    $tracker->_rfinterval($self->interval);
    $self->_done($tracker);
    return $tracker;
}
543 =head2 $tempfilename = $obj->get_remote_recentfile_as_tempfile ()
545 Stores the remote I<recentfile> locally as a tempfile. The caller is
546 responsible to remove the file after use.
Note: if you intend to act as an rsync server for other slaves, you
must use this method instead of fetching that file with
get_remotefile(). Otherwise downstream mirrors would expect you to
already have mirrored all the files that are listed in the
I<recentfile> before you actually have them.
554 =cut
# Fetches the remote recentfile into a local tempfile and returns the
# name of that tempfile.
#
# When tempfile reuse is switched on and the ttl has not been reached,
# the current tempfile is returned without contacting the remote side.
# A freshly created tempfile is pre-filled with a copy of the local
# recentfile (if there is one) before rsync runs. After three failed
# rsync attempts we give up for this round.
sub get_remote_recentfile_as_tempfile {
    my ($self) = @_;
    mkpath($self->localroot);

    my $tmp_obj;
    if ($self->_use_tempfile()) {
        return $self->_current_tempfile unless $self->ttl_reached;
        $tmp_obj = $self->_current_tempfile_fh;
    }
    my $remote_name = $self->rfilename;

    my $dst;
    if ($tmp_obj) {
        $dst = $self->_current_tempfile;
    } else {
        $tmp_obj = $self->_get_remote_rat_provide_tempfile_object($remote_name);
        $dst = $tmp_obj->filename;
        $self->_current_tempfile($dst);
        my $local_copy = eval { $self->rfile; }; # may fail (RECENT.recent has no rfile)
        if (defined $local_copy && -e $local_copy) {
            # saving on bandwidth. Might need to be configurable
            # $self->bandwidth_is_cheap?
            cp($local_copy, $dst) or die "Could not copy '$local_copy' to '$dst': $!";
        }
    }

    my $src = join("/", $self->remoteroot, $remote_name);
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        my $display_dst = join "/", "...", basename(dirname($dst)), basename($dst);
        my $log = $self->_logfilehandle;
        printf {$log} "%-4s %d (1/1/%s) temp %s ... ",
            $doing, time, $self->interval, $display_dst;
    }

    my $gave_up  = 0;
    my $attempts = 0;
    local($ENV{LANG}) = "C";
    until ($self->rsync->exec(src => $src, dst => $dst)) {
        $self->register_rsync_error($self->rsync->err);
        next if ++$attempts < 3;
        warn "XXX giving up";
        $gave_up = 1;
        last;
    }
    if ($gave_up) {
        my $log = $self->_logfilehandle;
        printf {$log} "Warning: gave up mirroring %s, will try again later", $self->interval;
    } else {
        $self->_refresh_internals($dst);
        $self->have_mirrored(Time::HiRes::time);
        $self->un_register_rsync_error();
    }
    $self->unseed;
    if ($self->verbose) {
        my $log = $self->_logfilehandle;
        print {$log} "DONE\n";
    }
    my $mode = 0644;
    chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
    return $dst;
}
# Returns the directory to create tempfiles in: the tempdir accessor if
# set, otherwise localroot. The directory is created if it is missing
# and the answer is cached for later calls.
sub _verified_tempdir {
    my ($self) = @_;
    my $cached = $self->__verified_tempdir();
    return $cached if defined $cached;
    my $dir = $self->tempdir || $self->localroot;
    mkpath($dir) unless -d $dir;
    $self->__verified_tempdir($dir);
    return $dir;
}
# Creates a File::Temp object named after $trfilename inside the
# verified tempdir, makes the file mode 0644 and returns the object.
# When tempfile reuse is switched on, the object is also stored in
# _current_tempfile_fh so that it outlives the caller.
sub _get_remote_rat_provide_tempfile_object {
    my ($self, $trfilename) = @_;
    my $dir = $self->_verified_tempdir;
    my %tempfile_args = (
        TEMPLATE => sprintf(".FRMRecent-%s-XXXX", $trfilename),
        DIR      => $dir,
        SUFFIX   => $self->serializer_suffix,
        UNLINK   => $self->_use_tempfile,
    );
    my $fh = File::Temp->new(%tempfile_args);
    my $mode = 0644;
    my $dst = $fh->filename;
    chmod $mode, $dst or die "Could not chmod $mode '$dst': $!";
    $self->_current_tempfile_fh($fh) # delay self destruction
        if $self->_use_tempfile;
    return $fh;
}
# Returns a filehandle for verbose output: a fresh append handle on the
# verboselog file if that accessor is set, otherwise STDERR.
sub _logfilehandle {
    my ($self) = @_;
    my $logfile = $self->verboselog;
    return \*STDERR unless $logfile;
    open my $fh, ">>", $logfile or die "Could not open >> '$logfile': $!";
    return $fh;
}
680 =head2 $localpath = $obj->get_remotefile ( $relative_path )
682 Rsyncs one single remote file to local filesystem.
684 Note: no locking is done on this file. Any number of processes may
685 mirror this object.
687 Note II: do not use for recentfiles. If you are a cascading
688 slave/server combination, it would confuse other slaves. They would
689 expect the contents of these recentfiles to be available. Use
690 get_remote_recentfile_as_tempfile() instead.
692 =cut
# $localpath = $obj->get_remotefile ( $relative_path )
#
# Rsyncs a single remote file into the local tree and returns the
# local path. Every failed rsync attempt is handed to
# register_rsync_error and the transfer is tried again. Dies when no
# remoteroot is configured.
sub get_remotefile {
    my ($self, $path) = @_;
    my $dst = File::Spec->catfile($self->localroot, $path);
    mkpath(dirname($dst));
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        my $log = $self->_logfilehandle;
        printf {$log} "%-4s %d (1/1/%s) %s ... ",
            $doing, time, $self->interval, $path;
    }
    local($ENV{LANG}) = "C";
    my $remoteroot = $self->remoteroot or die "Alert: missing remoteroot. Cannot continue";
    my $src = join("/", $remoteroot, $path);
    until ($self->rsync->exec(src => $src, dst => $dst)) {
        $self->register_rsync_error($self->rsync->err);
    }
    $self->un_register_rsync_error();
    if ($self->verbose) {
        my $log = $self->_logfilehandle;
        print {$log} "DONE\n";
    }
    return $dst;
}
728 =head2 $obj->interval ( $interval_spec )
730 Get/set accessor. $interval_spec is a string and described below in
731 the section INTERVAL SPEC.
733 =cut
# Get/set accessor for the interval spec. Setting it also clears the
# cached _rfile. Confesses if no interval is defined.
sub interval {
    my ($self, $interval) = @_;
    if (@_ > 1) {
        $self->_interval($interval);
        $self->_rfile(undef);
    }
    my $current = $self->_interval;
    return $current if defined $current;
    # do not ask the $self too much, it recurses!
    require Carp;
    Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue.");
}
750 =head2 $secs = $obj->interval_secs ( $interval_spec )
752 $interval_spec is described below in the section INTERVAL SPEC. If
753 empty defaults to the inherent interval for this object.
755 =cut
# $secs = $obj->interval_secs ( $interval_spec )
#
# Converts an interval spec into seconds. Without a (true) argument the
# object's own interval is used. "Z" maps to MAX_INT; everything else
# needs a number followed by a unit known to %seconds. Dies on
# anything that cannot be interpreted.
sub interval_secs {
    my ($self, $interval) = @_;
    $interval ||= $self->interval;
    die "interval_secs() called without argument on an object without a declared one"
        unless defined $interval;
    my ($n,$t) = $interval =~ /^(\d*)([smhdWMQYZ]$)/ or
        die "Could not determine seconds from interval[$interval]";
    return MAX_INT if $interval eq "Z";
    unless (exists $seconds{$t} and $n =~ /^\d+$/) {
        die "Invalid interval specification: n[$n]t[$t]";
    }
    return $seconds{$t}*$n;
}
774 =head2 $obj->localroot ( $localroot )
776 Get/set accessor. The local root of the tree.
778 =cut
# Get/set accessor for the local root of the tree. Setting it also
# clears the cached _rfile.
sub localroot {
    my ($self, $localroot) = @_;
    if (@_ > 1) {
        $self->_localroot($localroot);
        $self->_rfile(undef);
    }
    my $current = $self->_localroot;
    return $current;
}
789 =head2 $ret = $obj->local_path($path_found_in_recentfile)
791 Combines the path to our local mirror and the path of an object found
792 in this I<recentfile>. In other words: the target of a mirror operation.
794 Implementation note: We split on slashes and then use
795 File::Spec::catfile to adjust to the local operating system.
797 =cut
# $ret = $obj->local_path($path_found_in_recentfile)
#
# Maps a slash-separated path from a recentfile to the corresponding
# path below localroot. Without a path, localroot itself is returned.
sub local_path {
    my ($self, $path) = @_;
    # seems like a degenerated case
    return $self->localroot unless defined $path;
    my @segments = split m|/|, $path;
    return File::Spec->catfile($self->localroot, @segments);
}
809 =head2 (void) $obj->lock
811 Locking is implemented with an C<mkdir> on a locking directory
812 (C<.lock> appended to $rfile).
814 =cut
# (void) $obj->lock
#
# Acquires the lock for this recentfile by creating the directory
# "<rfile>.lock" and writing our pid into the file "process" inside
# it. Does nothing when the object already holds the lock.
#
# While the directory exists we look at the recorded pid:
#   - unreadable, empty or non-numeric: unknown holder, keep waiting
#   - our own pid: we already own it, proceed
#   - a live process: keep waiting
#   - a dead process: break the lock and proceed
# Dies when the lock could not be acquired within locktimeout seconds
# (default 600) or when the process file cannot be written.
sub lock {
    my ($self) = @_;
    # not using flock because it locks on filehandles instead of
    # old school resources.
    return if $self->_is_locked;
    my $rfile = $self->rfile;
    # XXX need a way to allow breaking the lock
    my $start = time;
    my $locktimeout = $self->locktimeout || 600;
    my %have_warned;
    my $lockdir = "$rfile.lock";
    my $procfile = "$lockdir/process";
  GETLOCK: while (not mkdir $lockdir) {
        my $process;
        if (open my $fh, "<", $procfile) {
            $process = <$fh>;   # undef when the file is (still) empty
            close $fh;
            chomp $process if defined $process;
        }
        if (!defined $process || $process !~ /\A\d+\z/) {
            warn "Warning: unknown process holds a lock in '$lockdir', waiting..." unless $have_warned{unknown}++;
        } elsif ($$ == $process) {
            last GETLOCK;
        } elsif (kill(0, $process) || $!{EPERM}) {
            # kill 0 also fails for a live process we may not signal;
            # EPERM therefore means "alive", not "gone"
            warn "Warning: process $process holds a lock in '$lockdir', waiting..." unless $have_warned{$process}++;
        } else {
            warn "Warning: breaking lock held by process $process";
            sleep 1;
            last GETLOCK;
        }
        Time::HiRes::sleep(0.01);
        if (time - $start > $locktimeout) {
            # $! is stale at this point, so report the timeout instead
            die "Could not acquire lockdirectory '$rfile.lock': timed out after $locktimeout seconds";
        }
    } # GETLOCK
    open my $fh, ">", $procfile or die "Could not open >$procfile\: $!";
    print $fh $$, "\n";
    close $fh or die "Could not close: $!";
    return $self->_is_locked (1);
}
857 =head2 (void) $obj->merge ($other)
859 Bulk update of this object with another one. It's used to merge a
860 smaller and younger $other object into the current one. If this file
861 is a C<Z> file, then we normally do not merge in objects of type
862 C<delete>; this can be overridden by setting
863 keep_delete_objects_forever. But if we encounter an object of type
864 delete we delete the corresponding C<new> object if we have it.
866 If there is nothing to be merged, nothing is done.
868 =cut
# (void) $obj->merge ($other)
#
# Merges the smaller/younger $other into this object: sanity check
# first, then $other is locked and read, then we lock ourselves, do
# the actual merge and release both locks again (ours first).
sub merge {
    my ($self, $other) = @_;
    $self->_merge_sanitycheck($other);

    $other->lock;
    my $incoming = $other->recent_events || [];

    $self->lock;
    $self->_merge_locked($other, $incoming);

    $self->unlock;
    $other->unlock;
}
# Core of merge(), run while both recentfiles are locked.
#
# Determines the oldest epoch that may stay in our list, drops our
# older events, filters the events of $other (too old, duplicate
# paths, delete events that a Z file does not keep) and hands over to
# _merge_something_done if anything changed.
sub _merge_locked {
    my ($self, $other, $other_recent) = @_;
    my $my_recent = $self->recent_events || [];

    # calculate the target time span
    my $myepoch = $my_recent->[0] ? $my_recent->[0]{epoch} : undef;
    my $epoch   = $other_recent->[0] ? $other_recent->[0]{epoch} : $myepoch;
    my $oldest_allowed = 0;
    my $something_done;
    # obstetrics: an empty list of our own always counts as a change
    $something_done = 1 unless $my_recent->[0];
    if ($epoch) {
        if (($other->dirtymark||0) ne ($self->dirtymark||0)) {
            $oldest_allowed = 0;
            $something_done = 1;
        } elsif (my $merged = $self->merged) {
            my $secs = $self->interval_secs();
            $oldest_allowed = min($epoch - $secs, $merged->{epoch}||0);
            if (@$other_recent
                and _bigfloatlt($other_recent->[-1]{epoch}, $oldest_allowed)) {
                $oldest_allowed = $other_recent->[-1]{epoch};
            }
        }
        while (@$my_recent && _bigfloatlt($my_recent->[-1]{epoch}, $oldest_allowed)) {
            pop @$my_recent;
            $something_done = 1;
        }
    }

    my %have_path;
    my $other_recent_filtered = [];
  EVENT: for my $oev (@$other_recent) {
        my $oevepoch = $oev->{epoch} || 0;
        next EVENT if _bigfloatlt($oevepoch, $oldest_allowed);
        my $path = $oev->{path};
        next EVENT if $have_path{$path}++;
        # a Z file drops delete events unless told to keep them
        next EVENT
            if  $self->interval eq "Z"
            and $oev->{type} eq "delete"
            and ! $self->keep_delete_objects_forever;
        if (!$myepoch || _bigfloatgt($oevepoch, $myepoch)) {
            $something_done = 1;
        }
        push @$other_recent_filtered,
            { epoch => $oev->{epoch}, path => $path, type => $oev->{type} };
    }
    if ($something_done) {
        $self->_merge_something_done ($other_recent_filtered, $my_recent, $other_recent, $other, \%have_path, $epoch);
    }
}
# Second half of a merge: zips the filtered events of $other and our
# own events into one list ordered by descending epoch, makes
# duplicated epochs unique, takes over the dirtymark of $other where
# it differs, writes our recentfile and finally records in $other
# that (and up to which epoch) it has been merged.
sub _merge_something_done {
    my ($self, $other_recent_filtered, $my_recent, $other_recent, $other, $have_path, $epoch) = @_;
    my $recent = [];
    my $epoch_conflict = 0;
    my $last_epoch;
  ZIP: while (@$other_recent_filtered || @$my_recent) {
        my $take_other = !@$my_recent
            || (@$other_recent_filtered
                && _bigfloatge($other_recent_filtered->[0]{epoch}, $my_recent->[0]{epoch}));
        my $event;
        if ($take_other) {
            $event = shift @$other_recent_filtered;
        } else {
            $event = shift @$my_recent;
            # our own entry is outdated if the path was already seen
            next ZIP if $have_path->{$event->{path}}++;
        }
        if (defined $last_epoch && $event->{epoch} eq $last_epoch) {
            $epoch_conflict = 1;
        }
        $last_epoch = $event->{epoch};
        push @$recent, $event;
    }
    if ($epoch_conflict) {
        my %have_epoch;
        # walk from the oldest to the newest event, bumping duplicates
        for my $idx (reverse 0 .. $#$recent) {
            my $candidate = $recent->[$idx]{epoch};
            next unless $have_epoch{$candidate}++;
            while ($have_epoch{$candidate}) {
                $candidate = _increase_a_bit($candidate);
            }
            $recent->[$idx]{epoch} = $candidate;
            $have_epoch{$candidate}++;
        }
    }
    if (!$self->dirtymark || $other->dirtymark ne $self->dirtymark) {
        $self->dirtymark ( $other->dirtymark );
    }
    $self->write_recent($recent);
    $other->merged({
                    time => Time::HiRes::time, # not used anywhere
                    epoch => $recent->[0]{epoch},
                    into_interval => $self->interval, # not used anywhere
                   });
    $other->write_recent($other_recent);
}
# Guards merge(): $other must cover a strictly shorter interval than
# $self, because events only ever flow from smaller into bigger
# recentfiles. Confesses otherwise; the message names the interval
# (in seconds) of the offending source first and of the target second.
sub _merge_sanitycheck {
    my($self, $other) = @_;
    my $target_secs = $self->interval_secs;
    my $source_secs = $other->interval_secs;
    if ($target_secs <= $source_secs) {
        require Carp;
        # $other is what gets merged *into* $self, so $other is the
        # "bigger" and $self the "smaller" one in this message
        Carp::confess
                (sprintf
                 (
                  "Alert: illegal merge operation of a bigger interval[%d] into a smaller[%d]",
                  $source_secs,
                  $target_secs,
                 ));
    }
}
994 =head2 merged
996 Hashref denoting when this recentfile has been merged into some other
997 at which epoch.
999 =cut
# Get/set accessor for the hashref describing when and into which
# interval this recentfile has been merged. On every call where both
# into_interval and our own interval are known, a warning with
# stacktrace is issued if into_interval equals our interval or is
# shorter than it.
sub merged {
    my ($self, $set) = @_;
    $self->_merged($set) if defined $set;
    my $merged = $self->_merged;
    my $into;
    unless ($merged and $into = $merged->{into_interval} and defined $self->_interval) {
        return $merged;
    }
    # sanity checks
    if ($into eq $self->interval) {
        require Carp;
        my $msg = sprintf
            "Warning: into_interval[%s] same as own interval[%s]. Danger ahead.",
            $into,
            $self->interval;
        Carp::cluck($msg);
    } elsif ($self->interval_secs($into) < $self->interval_secs) {
        require Carp;
        my $msg = sprintf
            "Warning: into_interval_secs[%s] smaller than own interval_secs[%s] on interval[%s]. Danger ahead.",
            $self->interval_secs($into),
            $self->interval_secs,
            $self->interval;
        Carp::cluck($msg);
    }
    return $merged;
}
1032 =head2 $hashref = $obj->meta_data
1034 Returns the hashref of metadata that the server has to add to the
1035 I<recentfile>.
1037 =cut
# $hashref = $obj->meta_data
#
# Collects the metadata for the recentfile: starts from $self->{meta},
# overlays every defined value of the listed accessors, and supplies
# Producers and dirtymark if they are not set yet.
sub meta_data {
    my ($self) = @_;
    my $meta = $self->{meta};
    my @fields = qw(
        aggregator
        canonize
        comment
        dirtymark
        filenameroot
        interval
        merged
        minmax
        protocol
        serializer_suffix
    );
    for my $field (@fields) {
        my $value = $self->$field;
        next unless defined $value;
        $meta->{$field} = $value;
    }
    # XXX need to reset the Producer if I am a writer, keep it when I
    # am a reader
    $meta->{Producers} ||= {
        __PACKAGE__, "$VERSION", # stringified it looks better
        '$0',        $0,
        'time',      Time::HiRes::time,
    };
    $meta->{dirtymark} ||= Time::HiRes::time;
    return $meta;
}
1070 =head2 $success = $obj->mirror ( %options )
1072 Mirrors the files in this I<recentfile> as reported by
1073 C<recent_events>. Options named C<after>, C<before>, C<max> are passed
1074 through to the C<recent_events> call. The boolean option C<piecemeal>,
1075 if true, causes C<mirror> to only rsync C<max_files_per_connection>
1076 and keep track of the rsynced files so that future calls will rsync
1077 different files until all files are brought to sync.
1079 =cut
# $success = $obj->mirror ( %options )
#
# Fetches the remote recentfile as a tempfile and works through its
# events via _mirror_item. Paths still waiting in the download
# collector are transferred at the end, then the tempfile is unhidden
# and the delayed unlink/rmdir operations are performed. Returns true
# when no error was collected; returns empty when an item asked for an
# early return.
sub mirror {
    my ($self, %options) = @_;
    my $trecentfile = $self->get_remote_recentfile_as_tempfile();
    $self->_use_tempfile (1);
    # skip-deletes is inadequate for passthrough within mirror. We
    # would never reach uptodateness when a delete were on a
    # borderline
    my %passthrough = map { ($_ => $options{$_}) } qw(before after max);
    my ($recent_events) = $self->recent_events(%passthrough);
    my (@error, @dlcollector); # download-collector: array containing paths we need
    my $first_item = 0;
    my $last_item  = $#$recent_events;
    my $done   = $self->done;
    my $pathdb = $self->_pathdb;
  ITEM: for my $i ($first_item..$last_item) {
        my $status = +{};
        $self->_mirror_item(
            $i,
            $recent_events,
            $last_item,
            $done,
            $pathdb,
            \@dlcollector,
            \%options,
            $status,
            \@error,
        );
        last ITEM if $i == $last_item;
        next ITEM unless $status->{mustreturn};
        if ($self->_current_tempfile && ! $self->_current_tempfile_fh) {
            # looks like a bug somewhere else
            my $stray = $self->_current_tempfile;
            unlink $stray or die "Could not unlink '$stray': $!";
            $self->_current_tempfile(undef);
            $self->_use_tempfile(0);
        }
        return;
    }
    if (@dlcollector) {
        my $success = eval { $self->_mirror_dlcollector (\@dlcollector,$pathdb,$recent_events);};
        if (!$success || $@) {
            warn "Warning: Unknown error while mirroring: $@";
            push @error, $@;
            sleep 1;
        }
    }
    if ($self->verbose) {
        my $log = $self->_logfilehandle;
        print {$log} "DONE\n";
    }
    # once we've gone to the end we consider ourselves free of obligations
    $self->unseed;
    $self->_mirror_unhide_tempfile ($trecentfile);
    $self->_mirror_perform_delayed_ops(\%options);
    return !@error;
}
# Handles event number $i of $recent_events during mirror().
#
# Events already covered by $done, or superseded by a newer entry in
# $pathdb, are skipped. "new" events are passed on to
# _mirror_item_new. "delete" events are queued as delayed unlink or
# rmdir operations (unless skip-deletes is set or the target does not
# exist) and registered as done. Any other type only triggers a
# warning.
sub _mirror_item {
    my ($self,
        $i,
        $recent_events,
        $last_item,
        $done,
        $pathdb,
        $dlcollector,
        $options,
        $status,
        $error,
       ) = @_;
    my $event = $recent_events->[$i];
    return if $done->covered ( $event->{epoch} );
    if ($pathdb) {
        my $rec = $pathdb->{$event->{path}};
        if ($rec && $rec->{recentepoch}
            && _bigfloatgt( $rec->{recentepoch}, $event->{epoch} )) {
            $done->register ($recent_events, [$i]);
            return;
        }
    }
    my $dst = $self->local_path($event->{path});
    if ($event->{type} eq "new") {
        $self->_mirror_item_new(
            $dst,
            $i,
            $last_item,
            $recent_events,
            $event,
            $dlcollector,
            $pathdb,
            $status,
            $error,
            $options,
        );
    } elsif ($event->{type} eq "delete") {
        my $activity;
        if ($options->{'skip-deletes'}) {
            $activity = "skipped";
        } elsif (! -e $dst) {
            $activity = "not_found";
        } elsif (-l $dst or not -d _) {
            $self->delayed_operations->{unlink}{$dst}++;
            $activity = "deleted";
        } else {
            $self->delayed_operations->{rmdir}{$dst}++;
            $activity = "deleted";
        }
        $done->register ($recent_events, [$i]);
        if ($pathdb) {
            $self->_mirror_register_path($pathdb,[$event],$activity);
        }
    } else {
        warn "Warning: invalid upload type '$event->{type}'";
    }
}
# Queue one event of type "new" for download and, once the collector
# holds max_files_per_connection entries (default 42), transfer the whole
# batch via _mirror_dlcollector().
#
# Parameters, in order: $dst (local path, only used for the log line),
# $i, $last_item, $recent_events, $recent_event, $dlcollector (arrayref,
# appended to), $pathdb, $status (hashref, "mustreturn" is set in
# piecemeal mode), $error (arrayref, receives error messages), $options.
sub _mirror_item_new {
    my($self,
       $dst,
       $i,
       $last_item,
       $recent_events,
       $recent_event,
       $dlcollector,
       $pathdb,
       $status,
       $error,
       $options,
      ) = @_;
    if ($self->verbose) {
        my $doing = -e $dst ? "Sync" : "Get";
        my $LFH = $self->_logfilehandle;
        printf $LFH
            (
             "%-4s %d (%d/%d/%s) %s ... ",
             $doing,
             time,
             1+$i,
             1+$last_item,
             $self->interval,
             $recent_event->{path},
            );
    }
    my $max_files_per_connection = $self->max_files_per_connection || 42;
    if ($self->verbose) {
        my $LFH = $self->_logfilehandle;
        print $LFH "\n";
    }
    push @$dlcollector, { rev => $recent_event, i => $i };
    # batch not yet full: nothing to transfer for now
    return if @$dlcollector < $max_files_per_connection;

    my $success = eval {$self->_mirror_dlcollector ($dlcollector,$pathdb,$recent_events);};
    # Capture $@ right away: the accessor call and the sleep below may
    # run code that resets it before we get to look at it.
    my $eval_error = $@;
    my $sleep = $self->sleep_per_connection;
    $sleep = 0.42 unless defined $sleep;
    Time::HiRes::sleep $sleep;
    if ($options->{piecemeal}) {
        # NOTE(review): in piecemeal mode a failure of the batch above is
        # not reported (unchanged behaviour) -- confirm this is intended.
        $status->{mustreturn} = 1;
        return;
    }
    if (!$success || $eval_error) {
        warn "Warning: Error while mirroring: $eval_error";
        push @$error, $eval_error;
        sleep 1;
    }
    if ($self->verbose) {
        my $LFH = $self->_logfilehandle;
        print $LFH "DONE\n";
    }
}
# Transfer all collected paths with one mirror_path() call, record them
# in $pathdb (if given) with activity "rsync", register their indexes as
# done and empty the collector. Returns what mirror_path() returned.
sub _mirror_dlcollector {
    my($self,$xcoll,$pathdb,$recent_events) = @_;
    my @paths   = map { $_->{rev}{path} } @$xcoll;
    my $success = $self->mirror_path(\@paths);
    if ($pathdb) {
        my @events = map { $_->{rev} } @$xcoll;
        $self->_mirror_register_path($pathdb, \@events, "rsync");
    }
    my @indexes = map { $_->{i} } @$xcoll;
    $self->done->register($recent_events, \@indexes);
    @$xcoll = ();
    return $success;
}
# Remember in $pathdb, for every event in $coll, the epoch of the event
# ("recentepoch") and the current time under the key "<activity>_on".
# An existing record for the same path is replaced.
sub _mirror_register_path {
    my($self,$pathdb,$coll,$activity) = @_;
    my $now       = time;
    my $stamp_key = $activity . "_on";
    for my $event (@$coll) {
        my %record = (
                      recentepoch => $event->{epoch},
                      $stamp_key  => $now,
                     );
        $pathdb->{ $event->{path} } = \%record;
    }
}
# Move the downloaded tempfile over the real recentfile and leave
# tempfile mode. Dies with a stack trace if the rename fails. A still
# open File::Temp handle is told not to unlink on destroy and is then
# forgotten.
sub _mirror_unhide_tempfile {
    my($self, $trecentfile) = @_;
    my $rfile = $self->rfile;
    unless (rename $trecentfile, $rfile) {
        require Carp;
        Carp::confess("Could not rename '$trecentfile' to '$rfile': $!");
    }
    $self->_use_tempfile(0);
    my $ctfh = $self->_current_tempfile_fh;
    if ($ctfh) {
        $ctfh->unlink_on_destroy(0);
        $self->_current_tempfile_fh(undef);
    }
}
# Execute the deletes queued in $self->delayed_operations: first all
# unlinks, then all rmdirs with the longest paths first. Failures are
# reported with Carp::cluck only when $options->{verbose} is set. When
# $self->verbose is set a "Del" line is logged for every path.
#
# Every path is removed from the queue after its single attempt,
# whether or not the object is verbose. Without that a non-verbose
# mirror would re-try the same deletes on every run and the queue would
# grow without bound.
sub _mirror_perform_delayed_ops {
    my($self,$options) = @_;
    my $delayed = $self->delayed_operations;
    my $log_deleted = sub {
        my($dst) = @_;
        return unless $self->verbose;
        my $doing = "Del";
        my $LFH = $self->_logfilehandle;
        printf $LFH
            (
             "%-4s %d (%s) %s DONE\n",
             $doing,
             time,
             $self->interval,
             $dst,
            );
    };
    for my $dst (keys %{$delayed->{unlink}}) {
        unless (unlink $dst) {
            require Carp;
            Carp::cluck ( "Warning: Error while unlinking '$dst': $!" ) if $options->{verbose};
        }
        $log_deleted->($dst);
        delete $delayed->{unlink}{$dst};
    }
    # longest path first, so that nested directories go before their parents
    for my $dst (sort {length($b) <=> length($a)} keys %{$delayed->{rmdir}}) {
        unless (rmdir $dst) {
            require Carp;
            Carp::cluck ( "Warning: Error on rmdir '$dst': $!" ) if $options->{verbose};
        }
        $log_deleted->($dst);
        delete $delayed->{rmdir}{$dst};
    }
}
=head2 $success = $obj->mirror_path ( $arrref | $path )

If the argument is a scalar it is treated as a path. The remote path
is mirrored into the local copy. $path is the path found in the
I<recentfile>, i.e. it is relative to the root directory of the
mirror.

If the argument is an array reference then all elements are treated as
a path below the current tree and all are rsynced with a single
command (and a single connection).

=cut

sub mirror_path {
    my($self,$path) = @_;
    # XXX simplify the two branches such that $path is treated as
    # [$path] maybe even demand the argument as an arrayref to
    # simplify docs and code. (rsync-over-recentfile-2.pl uses the
    # interface)
    if (ref $path and ref $path eq "ARRAY") {
        my $dst = $self->localroot;
        mkpath dirname $dst;
        # the list of wanted paths travels to rsync via --files-from
        my $template = sprintf ".%s-XXXX", lc $self->filenameroot;
        my($fh) = File::Temp->new(TEMPLATE => $template,
                                  TMPDIR   => 1,
                                  UNLINK   => 0,
                                 );
        print $fh $_, "\n" for @$path;
        $fh->flush;
        $fh->unlink_on_destroy(1);
        my $gaveup  = 0;
        my $retried = 0;
        local($ENV{LANG}) = "C";
        until ($self->rsync->exec
               (
                src => join("/",
                            $self->remoteroot,
                           ),
                dst => $dst,
                'files-from' => $fh->filename,
               )) {
            my(@err) = $self->rsync->err;
            if ($self->_my_ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
                if ($self->verbose) {
                    my $LFH = $self->_logfilehandle;
                    print $LFH "Info: ignoring link_stat error '@err'";
                }
                return 1;
            }
            $self->register_rsync_error (@err);
            $retried++;
            if ($retried >= 3) {
                my $batchsize = @$path;
                warn "The number of rsync retries now reached 3 within a batch of size $batchsize. Error was '@err'. Giving up now, will retry later, ";
                $gaveup = 1;
                last;
            }
            sleep 1;
        }
        $self->un_register_rsync_error () unless $gaveup;
    } else {
        my $dst = $self->local_path($path);
        mkpath dirname $dst;
        local($ENV{LANG}) = "C";
        # no retry limit here: register_rsync_error() sleeps and
        # eventually gives up on its own
        until ($self->rsync->exec
               (
                src => join("/",
                            $self->remoteroot,
                            $path
                           ),
                dst => $dst,
               )) {
            my(@err) = $self->rsync->err;
            if ($self->_my_ignore_link_stat_errors && "@err" =~ m{^ rsync: \s link_stat }x ) {
                if ($self->verbose) {
                    my $LFH = $self->_logfilehandle;
                    print $LFH "Info: ignoring link_stat error '@err'";
                }
                return 1;
            }
            $self->register_rsync_error (@err);
        }
        $self->un_register_rsync_error ();
    }
    return 1;
}
# Effective value of the ignore_link_stat_errors setting: an undefined
# setting counts as 1.
sub _my_ignore_link_stat_errors {
    my($self) = @_;
    my $setting = $self->ignore_link_stat_errors;
    return defined $setting ? $setting : 1;
}
# The file to read events from: the current tempfile when tempfile mode
# is on and that file is non-empty, otherwise the regular recentfile.
sub _my_current_rfile {
    my($self) = @_;
    my $candidate = $self->_use_tempfile ? $self->_current_tempfile : undef;
    return $candidate if $candidate && -s $candidate;
    return $self->rfile;
}
=head2 $path = $obj->naive_path_normalize ($path)

Takes an absolute unix style path as argument and canonicalizes it to
a shorter path if possible, removing things like double slashes or
C</./> and removes references to C<../> directories to get a shorter
unambiguous path. This is used to make the code easier that determines
if a file passed to C<update()> is indeed below our C<localroot>.

=cut

sub naive_path_normalize {
    my($self,$path) = @_;
    # collapse runs of slashes
    $path =~ s|/+|/|g;
    # Drop "/./" segments. This must loop because adjacent segments
    # share a slash ("/a/././b") and a single /g pass would miss one.
    1 while $path =~ s|/\./|/|;
    # resolve "dir/../" pairs, innermost first
    1 while $path =~ s|/[^/]+/\.\./|/|;
    # no trailing slash
    $path =~ s|/$||;
    $path;
}
=head2 $ret = $obj->read_recent_1 ( $data )

Delegate of C<recent_events()> on protocol 1

=cut

sub read_recent_1 {
    my(undef, $data) = @_;
    # protocol 1 keeps the list of events under the key "recent"
    my $events = $data->{recent};
    return $events;
}
=head2 $array_ref = $obj->recent_events ( %options )

Note: the code relies on the resource being written atomically. We
cannot lock because we may have no write access. If the caller has
write access (eg. aggregate() or update()), it has to care for any
necessary locking and it MUST write atomically.

If C<$options{after}> is specified, only file events after this
timestamp are returned.

If C<$options{before}> is specified, only file events before this
timestamp are returned.

If C<$options{max}> is specified only a maximum of this many most
recent events is returned.

If C<$options{'skip-deletes'}> is specified, no files-to-be-deleted
will be returned.

If C<$options{contains}> is specified the value must be a hash
reference containing a query. The query may contain the keys C<epoch>,
C<path>, and C<type>. Each represents a condition that must be met. If
there is more than one such key, the conditions are ANDed.

If C<$options{info}> is specified, it must be a hashref. This hashref
will be filled with metadata about the unfiltered recent_events of
this object, in key C<first> there is the first item, in key C<last>
is the last.

An empty array reference is returned when the recentfile does not
exist, cannot be deserialized, or does not contain a data structure.

=cut

sub recent_events {
    my ($self, %options) = @_;
    if ($self->is_slave) {
        # XXX seems dubious, might produce tempfiles without removing them?
        $self->get_remote_recentfile_as_tempfile;
    }
    my $rfile_or_tempfile = $self->_my_current_rfile or return [];
    -e $rfile_or_tempfile or return [];
    my $suffix = $self->serializer_suffix;
    my ($data) = eval {
        $self->_try_deserialize
            (
             $suffix,
             $rfile_or_tempfile,
            );
    };
    my $err = $@;
    if ($err or !$data) {
        return [];
    }
    # A broken file may deserialize to a plain string or some other
    # non-container. Treat that like an unreadable file instead of
    # dying further down on a bad dereference.
    my $type = reftype $data;
    return [] unless defined $type && ($type eq 'ARRAY' || $type eq 'HASH');
    my $re;
    if ($type eq 'ARRAY') { # protocol 0
        $re = $data;
    } else {
        $re = $self->_recent_events_protocol_x
            (
             $data,
             $rfile_or_tempfile,
            );
    }
    # "info" is part of this list so that a caller who passes nothing
    # but an info hashref still gets it filled, as documented.
    return $re unless grep {defined $options{$_}} qw(after before contains info max skip-deletes);
    $self->_recent_events_handle_options ($re, \%options);
}
# File::Rsync::Mirror::Recentfile::_recent_events_handle_options
#
# Apply the filtering options of recent_events() to the event list $re
# (modified in place and returned). $re is expected to be sorted by
# descending epoch.
#
#   info          hashref, receives the unfiltered first and last event
#   after/before  epoch bounds
#   skip-deletes  drop events of type "delete"
#   contains      hashref with any of the keys epoch, path, type; an
#                 event must match all of them; unknown keys are fatal
#   max           keep at most this many of the most recent events
sub _recent_events_handle_options {
    my($self, $re, $options) = @_;
    my $last_item = $#$re;
    my $info = $options->{info};
    if ($info) {
        $info->{first} = $re->[0];
        $info->{last} = $re->[-1];
    }
    # Guard against the empty list: looking at $re->[0]{epoch} would
    # autovivify a bogus empty event into the caller's array.
    my $have_events = @$re;
    # NOTE(review): when "after"/"before" exclude *every* event, the
    # splice below is skipped and the complete list is returned. That is
    # kept as it was -- confirm whether callers rely on it.
    if ($have_events && defined $options->{after}) {
        if ($re->[0]{epoch} > $options->{after}) {
            if (
                my $f = first
                        {$re->[$_]{epoch} <= $options->{after}}
                            0..$#$re
               ) {
                $last_item = $f-1;
            }
        } else {
            $last_item = -1;
        }
    }
    my $first_item = 0;
    if ($have_events && defined $options->{before}) {
        if ($re->[0]{epoch} > $options->{before}) {
            if (
                my $f = first
                        {$re->[$_]{epoch} < $options->{before}}
                            0..$last_item
               ) {
                $first_item = $f;
            }
        } else {
            $first_item = 0;
        }
    }
    if (0 != $first_item || -1 != $last_item) {
        @$re = splice @$re, $first_item, 1+$last_item-$first_item;
    }
    if ($options->{'skip-deletes'}) {
        @$re = grep { $_->{type} ne "delete" } @$re;
    }
    if (my $contopt = $options->{contains}) {
        my $seen_allowed = 0;
        for my $allow (qw(epoch path type)) {
            if (exists $contopt->{$allow}) {
                $seen_allowed++;
                my $v = $contopt->{$allow};
                @$re = grep { $_->{$allow} eq $v } @$re;
            }
        }
        if (keys %$contopt > $seen_allowed) {
            require Carp;
            Carp::confess
                    (sprintf "unknown query: %s", join ", ", %$contopt);
        }
    }
    if ($options->{max} && @$re > $options->{max}) {
        @$re = splice @$re, 0, $options->{max};
    }
    $re;
}
# Digest a deserialized recentfile of protocol >= 1.
#
# The metadata in $data->{meta} is copied into the object: keys that are
# not all lower case are stored under $self->{ORIG}; lower case keys are
# set through the accessor of the same name unless that accessor already
# returns a defined value. The events are read through the
# read_recent_<protocol> method, minmax is set from the file's mtime and
# the epochs of the first and last event, and the events are returned.
sub _recent_events_protocol_x {
    my($self,
       $data,
       $rfile_or_tempfile,
      ) = @_;
    my $meta = $data->{meta};
    my $meth = sprintf "read_recent_%d", $meta->{protocol};
    # we may be reading meta for the first time
    for my $k (keys %$meta) {
        my $v = $meta->{$k};
        if ($k ne lc $k){ # "Producers"
            $self->{ORIG}{$k} = $v;
        } elsif (!defined $self->$k) {
            $self->$k($v);
        }
    }
    my $re = $self->$meth ($data);
    my $minmax;
    my @stat = stat $rfile_or_tempfile;
    if (@stat) {
        $minmax = { mtime => $stat[9] };
    } else {
        # defensive because ABH encountered:
        #
        #### Sync 1239828608 (1/1/Z) temp .../authors/.FRMRecent-RECENT-Z.yaml-
        #### Ydr_.yaml ... DONE
        #### Cannot stat '/mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-
        #### Ydr_.yaml': No such file or directory at /usr/lib/perl5/site_perl/
        #### 5.8.8/File/Rsync/Mirror/Recentfile.pm line 1558.
        #### unlink0: /mirrors/CPAN/authors/.FRMRecent-RECENT-Z.yaml-Ydr_.yaml is
        #### gone already at cpan-pause.pl line 0
        #
        my $LFH = $self->_logfilehandle;
        print $LFH "Warning (maybe harmless): Cannot stat '$rfile_or_tempfile': $!"
    }
    if (@$re) {
        # events are ordered newest first
        $minmax->{min} = $re->[-1]{epoch};
        $minmax->{max} = $re->[0]{epoch};
    }
    $self->minmax ( $minmax );
    return $re;
}
# Read $rfile_or_tempfile and return the deserialized data structure.
#
# Suffix ".yaml" is handled by YAML::Syck. Every other suffix needs
# Data::Serializer together with the serializer registered for that
# suffix in %serializers. Dies if the file cannot be opened or if
# Data::Serializer is needed but not installed. Errors raised by the
# deserializers themselves are not caught here.
sub _try_deserialize {
    my($self,
       $suffix,
       $rfile_or_tempfile,
      ) = @_;
    if ($suffix eq ".yaml") {
        require YAML::Syck;
        # NOTE(review): the file may stem from a remote host; confirm
        # that the YAML::Syck settings in use do not allow code loading.
        return YAML::Syck::LoadFile($rfile_or_tempfile);
    } elsif ($HAVE->{"Data::Serializer"}) {
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        my $serialized = do
            {
                # three-argument open: the file name must never be able
                # to select the open mode
                open my $fh, "<", $rfile_or_tempfile or die "Could not open: $!";
                local $/;
                <$fh>;
            };
        return $serializer->raw_deserialize($serialized);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }
}
# Re-read the recentfile $dst into a throw-away object of our own class
# and take over its _merged and minmax values. If both objects have a
# dirtymark and the two differ, the done-structure is reset, the new
# dirtymark is adopted, uptodateness is forgotten and the object is
# seeded again.
sub _refresh_internals {
    my($self, $dst) = @_;
    my $class = ref $self;
    my $rfpeek = $class->new_from_file ($dst);
    $self->_merged ( $rfpeek->_merged );
    $self->minmax ( $rfpeek->minmax );
    my $old_dirtymark = $self->dirtymark;
    my $new_dirtymark = $rfpeek->dirtymark;
    return unless $old_dirtymark && $new_dirtymark;
    return if $new_dirtymark eq $old_dirtymark;
    $self->done->reset;
    $self->dirtymark ( $new_dirtymark );
    $self->_uptodateness_ever_reached(0);
    $self->seed;
}
=head2 $ret = $obj->rfilename

Just the basename of our I<recentfile>, composed from C<filenameroot>,
a dash, C<interval>, and C<serializer_suffix>. E.g. C<RECENT-6h.yaml>

=cut

sub rfilename {
    my($self) = @_;
    my $root     = $self->filenameroot;
    my $interval = $self->interval;
    my $suffix   = $self->serializer_suffix;
    return sprintf "%s-%s%s", $root, $interval, $suffix;
}
=head2 $str = $self->remote_dir

The directory we are mirroring from.

=cut

sub remote_dir {
    my($self, $set) = @_;
    $self->_remote_dir ($set) if defined $set;
    my $dir = $self->_remote_dir;
    # every call, reading included, marks this object as a slave
    $self->is_slave (1);
    return $dir;
}
=head2 $str = $obj->remoteroot

=head2 (void) $obj->remoteroot ( $set )

Get/Set the composed prefix needed when rsyncing from a remote module.
If remote_host, remote_module, and remote_dir are set, it is composed
from these.

=cut

sub remoteroot {
    my($self, $set) = @_;
    $self->_remoteroot($set) if defined $set;
    my $remoteroot = $self->_remoteroot;
    return $remoteroot if defined $remoteroot;
    # nothing cached yet: compose "host::module/dir" from what is defined
    my $host   = $self->remote_host;
    my $module = $self->remote_module;
    my $dir    = $self->remote_dir;
    $remoteroot = join "",
        (defined $host   ? $host."::"  : ""),
        (defined $module ? $module."/" : ""),
        (defined $dir    ? $dir        : "");
    $self->_remoteroot($remoteroot);
    return $remoteroot;
}
=head2 (void) $obj->split_rfilename ( $recentfilename )

Inverse method to C<rfilename>. C<$recentfilename> is a plain filename
of the pattern

    $filenameroot-$interval$serializer_suffix

e.g.

    RECENT-1M.yaml

This filename is split into its parts and the parts are fed to the
object itself.

=cut

sub split_rfilename {
    my($self, $rfname) = @_;
    my($splitter) = qr(^(.+)-([^-\.]+)(\.[^\.]+));
    my($root, $interval, $suffix) = $rfname =~ $splitter
        or die "Alert: cannot split '$rfname', doesn't match '$splitter'";
    $self->filenameroot ($root);
    $self->interval ($interval);
    $self->serializer_suffix ($suffix);
    return;
}
=head2 my $rfile = $obj->rfile

Returns the full path of the I<recentfile>

=cut

sub rfile {
    my($self) = @_;
    my $rfile = $self->_rfile;
    unless (defined $rfile) {
        # compose it once from localroot and rfilename and remember it
        $rfile = File::Spec->catfile($self->localroot, $self->rfilename);
        $self->_rfile ($rfile);
    }
    return $rfile;
}
=head2 $rsync_obj = $obj->rsync

The File::Rsync object that this object uses for communicating with an
upstream server.

=cut

sub rsync {
    my($self) = @_;
    my $rsync = $self->_rsync;
    return $rsync if defined $rsync;
    # first use: build the object from rsync_options and remember it
    my $rsync_options = $self->rsync_options || {};
    die "File::Rsync required for rsync operations. Cannot continue"
        unless $HAVE->{"File::Rsync"};
    $rsync = File::Rsync->new($rsync_options);
    $self->_rsync($rsync);
    return $rsync;
}
=head2 (void) $obj->register_rsync_error(@err)

=head2 (void) $obj->un_register_rsync_error()

Register_rsync_error is called whenever the File::Rsync object fails
on an exec (say, connection doesn't succeed). It issues a warning and
sleeps for an increasing amount of time. Un_register_rsync_error
resets the error count. See also accessor C<max_rsync_errors>.

=cut

{
    # Failure bookkeeping, private to the two subs below. Note that it
    # is shared by all objects of this class.
    my $no_success_count = 0;
    my $no_success_time = 0;

    sub register_rsync_error {
        my($self, @err) = @_;
        chomp @err;
        $no_success_time = time;
        $no_success_count++;
        my $max_rsync_errors = $self->max_rsync_errors;
        $max_rsync_errors = MAX_INT unless defined $max_rsync_errors;
        # a negative max_rsync_errors disables the fatal exit
        if ($max_rsync_errors>=0 && $no_success_count >= $max_rsync_errors) {
            require Carp;
            my $alert = sprintf
                (
                 "Alert: Error while rsyncing (%s): '%s', error count: %d, exiting now,",
                 $self->interval,
                 join(" ",@err),
                 $no_success_count,
                );
            Carp::confess($alert);
        }
        # back off 12 seconds per consecutive failure, five minutes at most
        my $sleep = 12 * $no_success_count;
        $sleep = 300 if $sleep > 300;
        require Carp;
        my $warning = sprintf
            (
             "Warning: %s, Error while rsyncing (%s): '%s', sleeping %d",
             scalar(localtime($no_success_time)),
             $self->interval,
             join(" ",@err),
             $sleep,
            );
        Carp::cluck($warning);
        sleep $sleep
    }

    sub un_register_rsync_error {
        my($self) = @_;
        $no_success_time = 0;
        $no_success_count = 0;
    }
}
=head2 $clone = $obj->_sparse_clone

Clones just as much from itself that it does not hurt. Experimental
method.

Note: what fits better: sparse or shallow? Other suggestions?

=cut

sub _sparse_clone {
    my($self) = @_;
    my $clone = bless {}, ref $self;
    # the accessors worth copying; references are deep-copied
    my @copied_accessors = qw(
                              _interval
                              _localroot
                              _remoteroot
                              _rfile
                              _use_tempfile
                              aggregator
                              filenameroot
                              ignore_link_stat_errors
                              is_slave
                              max_files_per_connection
                              protocol
                              rsync_options
                              serializer_suffix
                              sleep_per_connection
                              tempdir
                              verbose
                             );
    for my $accessor (@copied_accessors) {
        my $value = $self->$accessor;
        $value = Storable::dclone $value if ref $value;
        $clone->$accessor($value);
    }
    $clone;
}
=head2 $boolean = OBJ->ttl_reached ()

=cut

sub ttl_reached {
    my($self) = @_;
    my $have_mirrored = $self->have_mirrored || 0;
    my $now = Time::HiRes::time;
    my $ttl = $self->ttl;
    $ttl = 24.2 unless defined $ttl; # default time to live in seconds
    return $now > $have_mirrored + $ttl ? 1 : 0;
}
=head2 (void) $obj->unlock()

Unlocking is implemented with an C<rmdir> on a locking directory
(C<.lock> appended to $rfile).

=cut

sub unlock {
    my($self) = @_;
    return unless $self->_is_locked;
    my $rfile    = $self->rfile;
    my $lockdir  = "$rfile.lock";
    my $lockfile = "$lockdir/process";
    # failures are only warned about; the lock flag is cleared regardless
    unlink $lockfile or warn "Could not unlink lockfile '$lockfile': $!";
    rmdir $lockdir or warn "Could not rmdir lockdir '$lockdir': $!";
    $self->_is_locked (0);
}
=head2 unseed

Sets this recentfile in the state of not 'seeded'.

=cut

sub unseed {
    my($self) = @_;
    return $self->seeded(0);
}
=head2 $ret = $obj->update ($path, $type)

=head2 $ret = $obj->update ($path, "new", $dirty_epoch)

=head2 $ret = $obj->update ()

Enter one file into the local I<recentfile>. $path is the (usually
absolute) path. If the path is outside I<our> tree, then it is
ignored.

C<$type> is one of C<new> or C<delete>.

Events of type C<new> may set $dirty_epoch. $dirty_epoch is normally
not used and the epoch is calculated by the update() routine itself
based on current time. But if there is the demand to insert a
not-so-current file into the dataset, then the caller sets
$dirty_epoch. This causes the epoch of the registered event to become
$dirty_epoch or -- if the exact value given is already taken -- a tiny
bit more. As compensation the dirtymark of the whole dataset is set to
now or the current epoch, whichever is higher. Note: setting the
dirty_epoch to the future is prohibited as it's very unlikely to be
intended: it definitely might wreak havoc with the index files.

The new file event is unshifted (or, if dirty_epoch is set, inserted
at the place it belongs to, according to the rule to have a sequence
of strictly decreasing timestamps) to the array of recent_events and
the array is shortened to the length of the timespan allowed. This is
usually the timespan specified by the interval of this recentfile but
as long as this recentfile has not been merged to another one, the
timespan may grow without bounds.

The third form runs an update without inserting a new file. This may
be desired to truncate a recentfile.

=cut

# Return an epoch for a new event at the head of $recent: $epoch itself
# when the list is empty or $epoch is greater than the newest recorded
# epoch, otherwise the newest recorded epoch increased a bit.
sub _epoch_monotonically_increasing {
    my($self,$epoch,$recent) = @_;
    return $epoch unless @$recent; # the first one goes unoffended
    my $newest = $recent->[0]{epoch};
    return $epoch if _bigfloatgt("".$epoch,$newest);
    return _increase_a_bit($newest);
}
# Register a single event (or, when called without arguments, just
# re-run the truncation) under lock. Arguments:
#
#   $path         the (usually absolute) path of the file
#   $type         "new" or "delete"
#   $dirty_epoch  optional epoch for inserting a not-so-current event
#
# Dies when only part of the mandatory arguments is given or when $type
# is not exactly "new" or "delete". The recentfile is only rewritten if
# the batch update reports that something was done.
sub update {
    my($self,$path,$type,$dirty_epoch) = @_;
    if (defined $path or defined $type or defined $dirty_epoch) {
        die "update called without path argument" unless defined $path;
        die "update called without type argument" unless defined $type;
        # anchored: a mere substring match would let e.g. "renewed" pass
        die "update called with illegal type argument: $type" unless $type =~ /\A(?:new|delete)\z/;
    }
    $self->lock;
    my $ctx = $self->_locked_batch_update([{path=>$path,type=>$type,epoch=>$dirty_epoch}]);
    $self->write_recent($ctx->{recent}) if $ctx->{something_done};
    $self->_assert_symlink;
    $self->unlock;
}
=head2 $obj->batch_update($batch)

Like update but for many files. $batch is an arrayref containing
hashrefs with the structure

  {
    path => $path,
    type => $type,
    epoch => $epoch,
  }

=cut

sub batch_update {
    my($self,$batch) = @_;
    $self->lock;
    my $ctx = $self->_locked_batch_update($batch);
    # only rewrite the recentfile when the batch changed something
    if ($ctx->{something_done}) {
        $self->write_recent($ctx->{recent});
    }
    $self->_assert_symlink;
    $self->unlock;
}
# Merge the events in $batch (arrayref of hashrefs with the keys path,
# type and epoch) into the current list of recent events and truncate
# the list at its old end. The caller must hold the lock.
#
# Returns a hashref with "recent" (the new event list) and
# "something_done" (true if the list changed or was empty to begin with).
#
# When the object is verbose and the batch holds more than one item, a
# progress line is printed via Time::Progress. If that module cannot be
# loaded, this is only warned about and the update runs without progress
# output.
sub _locked_batch_update {
    my($self,$batch) = @_;
    my $something_done = 0;
    my $recent = $self->recent_events;
    unless ($recent->[0]) {
        # obstetrics
        $something_done = 1;
    }
    my %paths_in_recent = map { ($_->{path} => undef) } @$recent;
    # The value is not used; the call is kept because the accessor is
    # part of the original call sequence.
    my $interval = $self->interval;
    my $canonmeth = $self->canonize;
    unless ($canonmeth) {
        $canonmeth = "naive_path_normalize";
    }
    my $oldest_allowed = 0;
    my $setting_new_dirty_mark = 0;
    my $console;
    if ($self->verbose && @$batch > 1) {
        my $have_progress = eval { require Time::Progress; 1 };
        warn "dollarat[$@]" unless $have_progress;
        $| = 1;
        if ($have_progress) {
            $console = Time::Progress->new;
            $console->attr( min => 1, max => scalar @$batch );
            print "\n";
        }
    }
    my $i = 0;
    my $memo_splicepos;
    # newest first; items without an epoch sort last
  ITEM: for my $item (sort {($b->{epoch}||0) <=> ($a->{epoch}||0)} @$batch) {
        $i++;
        print $console->report( "\rdone %p elapsed: %L (%l sec), ETA %E (%e sec)", $i ) if $console and not $i % 50;
        my $ctx = $self->_update_batch_item($item,$canonmeth,$recent,$setting_new_dirty_mark,$oldest_allowed,$something_done,\%paths_in_recent,$memo_splicepos);
        $something_done = $ctx->{something_done};
        $oldest_allowed = $ctx->{oldest_allowed};
        $setting_new_dirty_mark = $ctx->{setting_new_dirty_mark};
        $recent = $ctx->{recent};
        $memo_splicepos = $ctx->{memo_splicepos};
    }
    print "\n" if $console;
    if ($setting_new_dirty_mark) {
        # a new dirtymark lifts the age limit for this run
        $oldest_allowed = 0;
    }
  TRUNCATE: while (@$recent) {
        if (_bigfloatlt($recent->[-1]{epoch}, $oldest_allowed)) {
            pop @$recent;
            $something_done = 1;
        } else {
            last TRUNCATE;
        }
    }
    return {something_done=>$something_done,recent=>$recent};
}
# Process one item of a batch update. The current state is passed in
# and the new state is returned as a hashref with the keys
# something_done, oldest_allowed, setting_new_dirty_mark, recent and
# memo_splicepos.
#
# The path is canonized through the method named $canonmeth. Only paths
# that start with localroot lead to a change. Without a dirty epoch the
# event is put at the head of the list after older events for the same
# path were removed; with a dirty epoch the position is determined by
# _update_with_dirty_epoch() and the dirtymark is renewed.
sub _update_batch_item {
    my($self,$item,$canonmeth,$recent,$setting_new_dirty_mark,$oldest_allowed,$something_done,$paths_in_recent,$memo_splicepos) = @_;
    my($path,$type,$dirty_epoch) = @$item{qw(path type epoch)};
    if (defined $path or defined $type or defined $dirty_epoch) {
        $path = $self->$canonmeth($path);
    }
    # you must calculate the time after having locked, of course
    my $now = Time::HiRes::time;

    # a dirty epoch is only honoured when it lies in the past
    my $epoch = (defined $dirty_epoch && _bigfloatgt($now,$dirty_epoch))
        ? $dirty_epoch
        : $self->_epoch_monotonically_increasing($now,$recent);
    $recent ||= [];
    my $merged = $self->merged;
    # as long as we are not merged at all, no limits!
    if ($merged->{epoch} && !$setting_new_dirty_mark) {
        my $virtualnow = _bigfloatmax($now,$epoch);
        # for the lower bound I think we need no big math, we calc already
        my $secs = $self->interval_secs();
        $oldest_allowed = min($virtualnow - $secs, $merged->{epoch}, $epoch);
    }
    my $lrd = $self->localroot;
    if (defined $path && $path =~ s|^\Q$lrd\E||) {
        $path =~ s|^/||;
        my $splicepos;
        # remove the older duplicates of this $path, irrespective of $type:
        if (defined $dirty_epoch) {
            my $ctx = $self->_update_with_dirty_epoch($path,$recent,$epoch,$paths_in_recent,$memo_splicepos);
            ($recent, $splicepos, $epoch) = @$ctx{qw(recent splicepos epoch)};
            # value unused; the accessor call is kept from the original
            my $dirtymark = $self->dirtymark;
            # just in case we had to increase the epoch
            my $new_dm = _bigfloatgt($epoch, $now) ? $epoch : $now;
            $self->dirtymark($new_dm);
            $setting_new_dirty_mark = 1;
            if (not defined $merged->{epoch} or _bigfloatlt($epoch,$merged->{epoch})) {
                $self->merged(+{});
            }
        } else {
            $recent = [ grep { $_->{path} ne $path } @$recent ];
            $splicepos = 0;
        }
        if (defined $splicepos) {
            my $event = { epoch => $epoch, path => $path, type => $type };
            splice @$recent, $splicepos, 0, $event;
            $paths_in_recent->{$path} = undef;
        }
        $memo_splicepos = $splicepos;
        $something_done = 1;
    }
    my %state = (
                 something_done => $something_done,
                 oldest_allowed => $oldest_allowed,
                 setting_new_dirty_mark => $setting_new_dirty_mark,
                 recent => $recent,
                 memo_splicepos => $memo_splicepos,
                );
    return \%state;
}
# Find the position at which an event for $path with the caller-supplied
# $epoch has to be spliced into $recent (sorted by descending epoch).
#
# If $path is already known, its older entries are removed from $recent
# first, unless an entry with exactly this epoch exists. If $epoch
# collides with the epoch of another event it is increased a bit.
# $memo_splicepos is the splice position of the previous item and
# serves as a shortcut for the search.
#
# Returns a hashref with the keys recent, splicepos (may be undef) and
# epoch (possibly increased).
sub _update_with_dirty_epoch {
    my($self,$path,$recent,$epoch,$paths_in_recent,$memo_splicepos) = @_;
    my $splicepos;
    if (exists $paths_in_recent->{$path}) {
        my @kept;
        my $cancel = 0;
      KNOWN_EVENT: for my $known (@$recent) {
            if ($known->{path} ne $path) {
                push @kept, $known;
                next KNOWN_EVENT;
            }
            if ($known->{epoch} eq $epoch) {
                # nothing to do
                $cancel = 1;
                last KNOWN_EVENT;
            }
        }
        @$recent = @kept unless $cancel;
    }
    if (!exists $recent->[0] or _bigfloatgt($epoch,$recent->[0]{epoch})) {
        # newer than everything we have: goes to the head
        $splicepos = 0;
    } elsif (_bigfloatlt($epoch,$recent->[-1]{epoch})) {
        # older than everything we have: goes to the tail
        $splicepos = @$recent;
    } else {
        # NOTE(review): the first argument below is the value of
        # "$memo_splicepos<=$#$recent && $epoch", i.e. either $epoch or
        # a false value. It looks as if the bounds check was meant to
        # guard the whole comparison -- kept verbatim, please confirm.
        my $startingpoint;
        if (_bigfloatgt($memo_splicepos<=$#$recent && $epoch, $recent->[$memo_splicepos]{epoch})) {
            $startingpoint = 0;
        } else {
            $startingpoint = $memo_splicepos;
        }
      RECENT: for my $i ($startingpoint..$#$recent) {
            if ($epoch eq $recent->[$i]{epoch}) {
                $epoch = _increase_a_bit($epoch, $i ? $recent->[$i-1]{epoch} : undef);
            }
            if (_bigfloatgt($epoch,$recent->[$i]{epoch})) {
                $splicepos = $i;
                last RECENT;
            }
        }
    }
    my %result = (
                  recent => $recent,
                  splicepos => $splicepos,
                  epoch => $epoch,
                 );
    return \%result;
}
=head2 seed

Sets this recentfile in the state of 'seeded' which means it has to
re-evaluate its uptodateness.

=cut

sub seed {
    my($self) = @_;
    return $self->seeded(1);
}
=head2 seeded

Tells if the recentfile is in the state 'seeded'.

=cut

sub seeded {
    my($self, $set) = @_;
    $self->_seeded ($set) if defined $set;
    my $state = $self->_seeded;
    return $state if defined $state;
    # never set before: initialize to "not seeded"
    $self->_seeded (0);
    return 0;
}
=head2 uptodate

True if this object has mirrored the complete interval covered by the
current recentfile.

=cut

sub uptodate {
    my($self) = @_;
    my($uptodate, $why);
    if ($self->_uptodateness_ever_reached and not $self->seeded) {
        ($uptodate, $why) = (1, "saturated");
    }
    # it's too easy to misconfigure ttl and related timings and then
    # never reach uptodateness, so disabled 2009-03-22
    if (0 and not defined $uptodate) {
        if ($self->ttl_reached){
            $why = "ttl_reached returned true, so we are not uptodate";
            $uptodate = 0 ;
        }
    }
    unless (defined $uptodate) {
        # look if recentfile has unchanged timestamp
        my $minmax = $self->minmax;
        if (exists $minmax->{mtime}) {
            my $rfile = $self->_my_current_rfile;
            my @stat = stat $rfile or die "Could not stat '$rfile': $!";
            my $mtime = $stat[9];
            my $file_is_newer = defined $mtime
                && defined $minmax->{mtime}
                && $mtime > $minmax->{mtime};
            if ($file_is_newer) {
                $why = "mtime[$mtime] of rfile[$rfile] > minmax/mtime[$minmax->{mtime}], so we are not uptodate";
                $uptodate = 0;
            } else {
                my $covered = $self->done->covered(@$minmax{qw(max min)});
                $why = sprintf "minmax covered[%s], so we return that", defined $covered ? $covered : "UNDEF";
                $uptodate = $covered;
            }
        }
    }
    unless (defined $uptodate) {
        ($uptodate, $why) = (0, "fallthrough, so not uptodate");
    }
    $self->_uptodateness_ever_reached(1) if $uptodate;
    # keep the verdict and its reason for later inspection
    $self->_remember_last_uptodate_call({
                                         uptodate => $uptodate,
                                         why => $why,
                                        });
    return $uptodate;
}
=head2 $obj->write_recent ($recent_files_arrayref)

Writes a I<recentfile> based on the current reflection of the current
state of the tree limited by the current interval.

=cut

# Sort the event list passed as first argument in place, newest epoch
# first. Returns nothing.
sub _resort {
    my($self, $events) = @_;
    my @sorted = sort { _bigfloatcmp($b->{epoch},$a->{epoch}) } @$events;
    @$events = @sorted;
    return;
}
# Write the event list $recent (arrayref, strictly descending epochs)
# through the write_<protocol> method of this object, after updating
# the minmax bookkeeping.
#
# Dies if $recent is undefined, and with a stack trace if two
# consecutive events are not in strictly descending order.
sub write_recent {
    my ($self,$recent) = @_;
    die "write_recent called without argument" unless defined $recent;
    my $Last_epoch;
 SANITYCHECK: for my $i (0..$#$recent) {
        if (defined($Last_epoch) and _bigfloatge($recent->[$i]{epoch},$Last_epoch)) {
            require Carp;
            Carp::confess(sprintf "Warning: disorder '%s'>='%s', re-sorting %s\n",
                 $recent->[$i]{epoch}, $Last_epoch, $self->interval);
            # you may want to:
            # $self->_resort($recent);
            # last SANITYCHECK;
        }
        $Last_epoch = $recent->[$i]{epoch};
    }
    my $minmax = $self->minmax;
    # Only look at the first and last event when there are events. On an
    # empty list $recent->[0]{epoch} would autovivify a bogus event that
    # then gets written, and $recent->[-1]{epoch} would die.
    my $have_events = @$recent;
    my $newest = $have_events ? $recent->[0]{epoch}  : undef;
    my $oldest = $have_events ? $recent->[-1]{epoch} : undef;
    if (!defined $minmax->{max} || ($have_events && _bigfloatlt($minmax->{max},$newest))) {
        $minmax->{max} = $newest;
    }
    if (!defined $minmax->{min} || ($have_events && _bigfloatlt($minmax->{min},$oldest))) {
        $minmax->{min} = $oldest;
    }
    $self->minmax($minmax);
    my $meth = sprintf "write_%d", $self->protocol;
    $self->$meth($recent);
}
2324 =head2 $obj->write_0 ($recent_files_arrayref)
2326 Delegate of C<write_recent()> on protocol 0
2328 =cut
# Protocol 0 writer: dumps the bare list of events as YAML.
# The dump goes to a sibling file named "<rfile>.new", which is then renamed
# over the real recentfile. Dies if the rename fails.
sub write_0 {
    my ($self, $recent) = @_;
    my $target  = $self->rfile;
    my $staging = "$target.new";
    YAML::Syck::DumpFile($staging, $recent);
    rename($staging, $target)
        or die "Could not rename to '$target': $!";
}
2337 =head2 $obj->write_1 ($recent_files_arrayref)
2339 Delegate of C<write_recent()> on protocol 1
2341 =cut
# Protocol 1 writer: serializes a hash with the object's meta data and the
# list of events, using the serializer selected by the file suffix.
#
# Arguments:
#   $recent - array reference of events, stored under the key "recent".
#
# ".yaml" is handled directly by YAML::Syck. Every other suffix goes through
# Data::Serializer with the module looked up in %serializers; if
# Data::Serializer could not be loaded this dies.
#
# The serialized text is written to "<rfile>.new" and then renamed over the
# real recentfile. Dies on any open, write, close or rename failure.
sub write_1 {
    my ($self, $recent) = @_;
    my $rfile  = $self->rfile;
    my $suffix = $self->serializer_suffix;
    my $data   = {
        meta   => $self->meta_data,
        recent => $recent,
    };

    my $serialized;
    if ($suffix eq ".yaml") {
        $serialized = YAML::Syck::Dump($data);
    } elsif ($HAVE->{"Data::Serializer"}) {
        # NOTE(review): a suffix missing from %serializers passes undef as
        # the serializer name here -- confirm whether that should be fatal.
        my $serializer = Data::Serializer->new
            ( serializer => $serializers{$suffix} );
        $serialized = $serializer->raw_serialize($data);
    } else {
        die "Data::Serializer not installed, cannot proceed with suffix '$suffix'";
    }

    open my $fh, ">", "$rfile.new" or die "Could not open >'$rfile.new': $!";
    # Check the write itself so that a failure is reported where it happens
    # instead of relying on close() alone to surface it.
    print {$fh} $serialized or die "Could not write to '$rfile.new': $!";
    close $fh or die "Could not close '$rfile.new': $!";
    rename "$rfile.new", $rfile or die "Could not rename to '$rfile': $!";
}
# Fill %serializers at compile time from the SERIALIZERS POD that follows.
# The here-document is terminated by the POD's own "=cut" line, so the
# documentation text doubles as the data. Every "=item C<< ... >>" line is
# reduced to its content by the grep's substitution, and the two
# double-quoted strings on it (left and right of "=>") become one key/value
# pair of the hash. Nothing may be inserted after the here-document line:
# whatever follows it up to "=cut" is part of the string.
2367 BEGIN {
2368 my $nq = qr/[^"]+/; # non-quotes
2369 my @pod_lines =
2370 split /\n/, <<'=cut'; %serializers = map { my @x = /"($nq)"\s+=>\s+"($nq)"/; @x } grep {s/^=item\s+C<<\s+(.+)\s+>>$/$1/} @pod_lines; }
2372 =head1 SERIALIZERS
2374 The following suffixes are supported and trigger the use of these
2375 serializers:
2377 =over 4
2379 =item C<< ".yaml" => "YAML::Syck" >>
2381 =item C<< ".json" => "JSON" >>
2383 =item C<< ".sto" => "Storable" >>
2385 =item C<< ".dd" => "Data::Dumper" >>
2387 =back
2389 =cut
# Fill %seconds at compile time from the INTERVAL SPEC POD that follows.
# The here-document is terminated by the POD's own "=cut" line. Every
# "=item C<<...>>" line is reduced to the text between the angle brackets
# and that text is string-eval'ed, so an item such as "h => 60*60" yields
# one key/value pair with the arithmetic already evaluated. Nothing may be
# inserted after the here-document line: whatever follows it up to "=cut"
# is part of the string.
2391 BEGIN {
2392 my @pod_lines =
2393 split /\n/, <<'=cut'; %seconds = map { eval } grep {s/^=item\s+C<<(.+)>>$/$1/} @pod_lines; }
2395 =head1 INTERVAL SPEC
2397 An interval spec is a primitive way to express time spans. Normally it
2398 is composed from an integer and a letter.
2400 As a special case, a string that consists only of the single letter
2401 C<Z> stands for MAX_INT seconds.
2403 The following letters express the specified number of seconds:
2405 =over 4
2407 =item C<< s => 1 >>
2409 =item C<< m => 60 >>
2411 =item C<< h => 60*60 >>
2413 =item C<< d => 60*60*24 >>
2415 =item C<< W => 60*60*24*7 >>
2417 =item C<< M => 60*60*24*30 >>
2419 =item C<< Q => 60*60*24*90 >>
2421 =item C<< Y => 60*60*24*365.25 >>
2423 =back
2425 =cut
2427 =head1 SEE ALSO
2429 L<File::Rsync::Mirror::Recent>,
2430 L<File::Rsync::Mirror::Recentfile::Done>,
2431 L<File::Rsync::Mirror::Recentfile::FakeBigFloat>
2433 =head1 BUGS
2435 Please report any bugs or feature requests through the web interface
2437 L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=File-Rsync-Mirror-Recentfile>.
2438 I will be notified, and then you'll automatically be notified of
2439 progress on your bug as I make changes.
2441 =head1 KNOWN BUGS
2443 Memory hungry: it seems all memory is allocated during the initial
2444 rsync where a list of all files is maintained in memory.
2446 =head1 SUPPORT
2448 You can find documentation for this module with the perldoc command.
2450 perldoc File::Rsync::Mirror::Recentfile
2452 You can also look for information at:
2454 =over 4
2456 =item * RT: CPAN's request tracker
2458 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=File-Rsync-Mirror-Recentfile>
2460 =item * AnnoCPAN: Annotated CPAN documentation
2462 L<http://annocpan.org/dist/File-Rsync-Mirror-Recentfile>
2464 =item * CPAN Ratings
2466 L<http://cpanratings.perl.org/d/File-Rsync-Mirror-Recentfile>
2468 =item * Search CPAN
2470 L<http://search.cpan.org/dist/File-Rsync-Mirror-Recentfile>
2472 =back
2475 =head1 ACKNOWLEDGEMENTS
2477 Thanks to RJBS for module-starter.
2479 =head1 AUTHOR
2481 Andreas König
2483 =head1 COPYRIGHT & LICENSE
2485 Copyright 2008,2009 Andreas König.
2487 This program is free software; you can redistribute it and/or modify it
2488 under the same terms as Perl itself.
2491 =cut
2493 1; # End of File::Rsync::Mirror::Recentfile
2495 # Local Variables:
2496 # mode: cperl
2497 # cperl-indent-level: 4
2498 # End: