From 0bee51eb21d766760e49cdd43b2589c9f3ca8daf Mon Sep 17 00:00:00 2001 From: "Andreas J. Koenig" Date: Sat, 9 Oct 2010 02:58:17 +0200 Subject: [PATCH] parent reads too much of the child data --- lib/File/Rsync/Mirror/Recent.pm | 172 +++++++++++++++++++++++++----------- lib/File/Rsync/Mirror/Recentfile.pm | 10 +-- 2 files changed, 127 insertions(+), 55 deletions(-) diff --git a/lib/File/Rsync/Mirror/Recent.pm b/lib/File/Rsync/Mirror/Recent.pm index 36f3869..d180816 100644 --- a/lib/File/Rsync/Mirror/Recent.pm +++ b/lib/File/Rsync/Mirror/Recent.pm @@ -23,7 +23,7 @@ use File::Rsync::Mirror::Recentfile::FakeBigFloat qw(:all); use File::Temp; use List::Pairwise qw(mapp grepp); use List::Util qw(first max); -use Scalar::Util qw(reftype); +use Scalar::Util qw(blessed reftype); use Storable; use Time::HiRes qw(); use YAML::Syck; @@ -76,16 +76,15 @@ sub new { =head2 my $obj = CLASS->thaw($statusfile) -(experimental) Constructor from a statusfile left over from a previous -rmirror run. See _runstatusfile in the code for details. +Constructor from a statusfile left over from a previous +rmirror run. See also C. =cut sub thaw { - my($class, $file) = @_; + my($self, $file) = @_; die "thaw called without statusfile argument" unless defined $file; die "Alert: statusfile '$file' not found" unless -e $file; - require YAML::Syck; my $start = time; my $sleeptime = 0.02; @@ -100,20 +99,21 @@ sub thaw { } my $serialized = YAML::Syck::LoadFile($file); rmdir "$file.lock" or die "Could not rmdir lockfile: $!"; - warn sprintf "Reading '$file' which was written %d seconds ago\n", time-$serialized->{time}; - my $self = $serialized->{reduced_self}; - bless $self, $class; + warn sprintf "DEBUG: Reading '$file' which was written %d seconds ago", time-$serialized->{time}; + my $charged_self = $serialized->{reduced_self}; + my $class = blessed $self; + bless $charged_self, $class; my $rfs = $serialized->{reduced_rfs}; my $rfclass = $class . "file"; # "Recent" . "file" - my $pathdb = $self->_pathdb; + my $pathdb = $charged_self->_pathdb; for my $rf (@$rfs) { bless $rf, $rfclass; $rf->_pathdb($pathdb); } - $self->_recentfiles($rfs); - $self->_principal_recentfile($rfs->[0]); + $charged_self->_recentfiles($rfs); + $charged_self->_principal_recentfile($rfs->[0]); # die "FIXME: thaw all recentfiles from reduced_rfs into _recentfiles as well, watch out for pathdb and rsync"; - return $self; + return $charged_self; } =head1 ACCESSORS @@ -135,7 +135,7 @@ BEGIN { "_principal_recentfile", "_recentfiles", "_rsync", - "_runstatusfile", # frequently dumps all rfs + "_runstatusfile", # occasionally dumps all rfs "_verbose", # internal variable for verbose setter/getter "_verboselog", # internal variable for verboselog setter/getter ); @@ -592,10 +592,7 @@ sub rmirror { my $rfs = $self->recentfiles; - my $_every_20_seconds = sub { - $self->principal_recentfile->seed; - }; - $_every_20_seconds->(); + $self->principal_recentfile->seed; my $_sigint = sub { # XXX exit gracefully (reminder) }; @@ -626,53 +623,80 @@ sub rmirror { $self->_dirtymark($dirtymark); } } + my $file = $self->runstatusfile; + $self->_rmirror_runstatusfile_write ($file, \%options); + $self->_rmirror_loop($minimum_time_per_loop,\%options); +} + +sub _rmirror_loop { + my($self,$minimum_time_per_loop,$options) = @_; LOOP: while () { my $ttleave = time + $minimum_time_per_loop; - if (my $file = $self->_runstatusfile) { - $self->_rmirror_runstatusfile ($file, \%options); + my $file = $self->runstatusfile; + my $child = $self->thaw ($file); + if ($child->recentfiles->[-1]->uptodate) { + warn "DEBUG: parent process[$$] about to leave loop"; + last LOOP; } - RECENTFILE: for my $i (0..$#$rfs) { - my $rf = $rfs->[$i]; - if (time > $ttleave){ - # Must make sure that one file can get fetched in any case - $self->_max_one_state(1); - } - if ($rf->seeded) { - $self->_rmirror_mirror ($i, \%options); - } elsif ($rf->uptodate){ - if ($i < $#$rfs){ - $rfs->[$i+1]->done->merge($rf->done); + my $pid; + if ($options->{loop}) { + $pid = fork; + } else { + $pid = 0; + } + if (! defined $pid) { + } elsif ($pid) { + waitpid($pid,0); + } else { + $self = $child; + my $rfs = $self->recentfiles; + $self->principal_recentfile->seed; + RECENTFILE: for my $i (0..$#$rfs) { + my $rf = $rfs->[$i]; + if (time > $ttleave) { + # Must make sure that one file can get fetched in any case + $self->_max_one_state(1); } - # no further seed necessary because "every_20_seconds" does it - next RECENTFILE; - } - WORKUNIT: while (time < $ttleave) { - if ($rf->uptodate) { - $self->_rmirror_sleep_per_connection ($i); + if ($rf->seeded) { + $self->_rmirror_mirror ($i, $options); + } elsif ($rf->uptodate) { + if ($i < $#$rfs) { + $rfs->[$i+1]->done->merge($rf->done); + } + # no further seed necessary because "periodic" does it next RECENTFILE; - } else { - $self->_rmirror_mirror ($i, \%options); + } + WORKUNIT: while (time < $ttleave) { + if ($rf->uptodate) { + $self->_rmirror_sleep_per_connection ($i); + next RECENTFILE; + } else { + $self->_rmirror_mirror ($i, $options); + } + } + if ($self->_max_one_state) { + last RECENTFILE; } } - if ($self->_max_one_state) { - last RECENTFILE; - } - } - $self->_max_one_state(0); - if ($rfs->[-1]->uptodate) { - $self->_rmirror_cleanup; - if ($options{loop}) { - } else { - last LOOP; + $self->_max_one_state(0); + $self->_rmirror_runstatusfile_write ($file, $options); + if ($rfs->[-1]->uptodate) { + $self->_rmirror_cleanup; + my $file = $self->runstatusfile; + unless ($options->{loop}) { + # warn "DEBUG: child process[$$] about to leave loop"; + sleep 1.5; + last LOOP; + } } } + my $sleep = $ttleave - time; if ($sleep > 0.01) { $self->_rmirror_endofloop_sleep ($sleep); } else { # negative time not invented yet:) } - $_every_20_seconds->(); } } @@ -728,7 +752,55 @@ sub _rmirror_cleanup { } } -sub _rmirror_runstatusfile { +=head2 $file = $obj->runstatusfile ($set) + +Getter/setter for C<_runstatusfile> attribute. Defaults to a temporary +file created by C. A status file is required for +C working. Since it may be interesting for debugging +purposes, you may want to specify a permanent file for this. + +=cut +sub runstatusfile { + my($self,$set) = @_; + if (defined $set) { + $self->_runstatusfile ($set); + } + my $x = $self->_runstatusfile; + unless (defined $x) { + require File::Temp; + my $tfile = File::Temp->new + ( + TEMPLATE => "Recent-XXXX", + TMPDIR => 1, + UNLINK => 0, + CLEANUP => 0, + SUFFIX => '.dat', + ); + $self->_runstatusfile($tfile->filename); + } + return $self->_runstatusfile; +} + +# unused code.... it was an oops, discovered the thaw() method too +# late, and starting writing this here.... +sub _rmirror_runstatusfile_read { + my($self, $file) = @_; + + require YAML::Syck; + my $start = time; + # XXX is locking useful here? + while (not mkdir "$file.lock") { + Time::HiRes::sleep 0.2; + warn "*** waiting for lock ***" if time - $start >= 3; + } + my $yml = YAML::Syck::LoadFile $file; + rmdir "$file.lock" or die "Could not rmdir lockfile: $!"; + my $rself = $yml->{reduced_self}; + my $rfs = $yml->{reduced_rfs}; + # XXX bring them into self +} + +sub _rmirror_runstatusfile_write { my($self, $file, $options) = @_; my $rself; while (my($k,$v) = each %$self) { diff --git a/lib/File/Rsync/Mirror/Recentfile.pm b/lib/File/Rsync/Mirror/Recentfile.pm index b816d50..4539a3b 100644 --- a/lib/File/Rsync/Mirror/Recentfile.pm +++ b/lib/File/Rsync/Mirror/Recentfile.pm @@ -188,7 +188,7 @@ sub DESTROY { unless ($self->_current_tempfile_fh) { if (my $tempfile = $self->_current_tempfile) { if (-e $tempfile) { - unlink $tempfile; # may fail in global destruction + # unlink $tempfile; # may fail in global destruction } } } @@ -1125,7 +1125,7 @@ sub mirror { # once we've gone to the end we consider ourselves free of obligations $self->unseed; $self->_mirror_unhide_tempfile ($trecentfile); - $self->_mirror_perform_delayed_ops; + $self->_mirror_perform_delayed_ops(\%options); return !@error; } @@ -1289,12 +1289,12 @@ sub _mirror_unhide_tempfile { } sub _mirror_perform_delayed_ops { - my($self) = @_; + my($self,$options) = @_; my $delayed = $self->delayed_operations; for my $dst (keys %{$delayed->{unlink}}) { unless (unlink $dst) { require Carp; - Carp::cluck ( "Warning: Error while unlinking '$dst': $!" ); + Carp::cluck ( "Warning: Error while unlinking '$dst': $!" ) if $options->{verbose}; } if ($self->verbose) { my $doing = "Del"; @@ -1313,7 +1313,7 @@ sub _mirror_perform_delayed_ops { for my $dst (sort {length($b) <=> length($a)} keys %{$delayed->{rmdir}}) { unless (rmdir $dst) { require Carp; - Carp::cluck ( "Warning: Error on rmdir '$dst': $!" ); + Carp::cluck ( "Warning: Error on rmdir '$dst': $!" ) if $options->{verbose}; } if ($self->verbose) { my $doing = "Del"; -- 2.11.4.GIT