From 592061dfa257ad18f70327464aba7c62ad9408b2 Mon Sep 17 00:00:00 2001 From: Andreas Koenig Date: Fri, 8 Aug 2008 12:50:20 +0200 Subject: [PATCH] describing the need of and a first implementation of the merged accessor and a test --- Todo | 7 ++++ lib/File/Rsync/Mirror/Recentfile.pm | 53 +++++++++++++++++++------ t/02-operation.t | 77 +++++++++++++++++++++++++++++++++---- 3 files changed, 118 insertions(+), 19 deletions(-) diff --git a/Todo b/Todo index 51e80d1..13fca64 100644 --- a/Todo +++ b/Todo @@ -1,3 +1,10 @@ +2008-08-07 Andreas Koenig + + * There must be an allow-me-to-truncate flag in every recentfile. + Without it one could construct a sequence of updates winning the locking + battle against the aggregator. Only if an aggregator has managed to + merge data over to the next level, truncating can be allowed. + 2008-08-06 Andreas Koenig * We should probably guarantee that no duplicates enter the aggregator diff --git a/lib/File/Rsync/Mirror/Recentfile.pm b/lib/File/Rsync/Mirror/Recentfile.pm index d19d35f..b30a99e 100644 --- a/lib/File/Rsync/Mirror/Recentfile.pm +++ b/lib/File/Rsync/Mirror/Recentfile.pm @@ -25,7 +25,7 @@ use File::Basename qw(dirname fileparse); use File::Copy qw(cp); use File::Path qw(mkpath); use File::Temp; -use List::Util qw(first); +use List::Util qw(first min); use Scalar::Util qw(reftype); use Storable; use Time::HiRes qw(); @@ -250,6 +250,11 @@ When rsync operations encounter that many errors without any resetting success in between, then we die. Defaults to -1 which means we run forever ignoring all rsync errors. +=item merged + +Hashref denoting when this recentfile has been merged into some other +at which epoch. + =item protocol When the RECENT file format changes, we increment the protocol. We try @@ -499,7 +504,8 @@ sub interval { $interval = $self->_interval; unless (defined $interval) { # do not ask the $self too much, it recurses! - die "Alert: interval undefined for '".$self."'. Cannot continue."; + require Carp; + Carp::confess("Alert: interval undefined for '".$self."'. Cannot continue."); } return $interval; } @@ -604,23 +610,26 @@ Bulk update of this object with another one. It's intended (but not enforced) to only merge smaller and younger $other objects into the current one. If this file is a C file, then we do not merge in objects of type C. But if we encounter an object of type -delete we delete the corresponding C object. +delete we delete the corresponding C object. =cut sub merge { my($self,$other) = @_; - my $lrd = $self->localroot; + $other->lock; my $other_recent = $other->recent_events || []; $self->lock; - my $interval = $self->interval; - my $secs = $self->interval_secs(); my $my_recent = $self->recent_events || []; # calculate the target time span my $epoch = $other_recent->[0] ? $other_recent->[0]{epoch} : $my_recent->[0] ? $my_recent->[0]{epoch} : undef; + my $oldest_allowed = 0; if ($epoch) { - my $oldest_allowed = $epoch - $secs; + $DB::single++; + if (my $merged = $self->merged) { + my $secs = $self->interval_secs(); + $oldest_allowed = min($epoch - $secs, $merged->{epoch}); + } # throw away outsiders while (@$my_recent && $my_recent->[-1]{epoch} < $oldest_allowed) { pop @$my_recent; @@ -630,9 +639,12 @@ sub merge { my %have; my $recent = []; for my $ev (@$other_recent) { + my $epoch = $ev->{epoch} || 0; + next if $epoch < $oldest_allowed; my $path = $ev->{path}; next if $have{$path}++; - if ($self->interval eq "Z" and $ev->{type} eq "delete") { + if ( $self->interval eq "Z" + and $ev->{type} eq "delete") { } else { push @$recent, { epoch => $ev->{epoch}, path => $path, type => $ev->{type} }; } @@ -641,6 +653,13 @@ sub merge { $self->recent_events($recent); $self->write_recent($recent); $self->unlock; + $other->merged({ + time => Time::HiRes::time, # not used anywhere + epoch => $epoch, # used in oldest_allowed + interval => $self->interval, # not used anywhere + }); + $other->write_recent($other_recent); + $other->unlock; } =head2 $hashref = $obj->meta_data @@ -658,10 +677,14 @@ sub meta_data { "canonize", "comment", "filenameroot", + "merged", "interval", "protocol", ) { - $ret->{$m} = $self->$m; + my $v = $self->$m; + if (defined $v) { + $ret->{$m} = $v; + } } # XXX need to reset the Producer if I am a writer, keep it when I # am a reader @@ -910,7 +933,9 @@ sub read_recent_1 { =head2 $array_ref = $obj->recent_events Note: the code relies on the resource being written atomically. We -cannot lock because we may have no write access. +cannot lock because we may have no write access. If the caller has +write access (eg. aggregate() or update()), it has to care for any +necessary locking. =cut @@ -1139,7 +1164,12 @@ sub update { my $interval = $self->interval; my $secs = $self->interval_secs(); my $epoch = Time::HiRes::time; - my $oldest_allowed = $epoch-$secs; + # XXX next four lines copy&paste from merge() + my $oldest_allowed = 0; + if (my $merged = $self->merged) { + my $secs = $self->interval_secs(); + $oldest_allowed = min($epoch - $secs, $merged->{epoch}); + } $self->lock; my $recent = $self->recent_events; @@ -1155,7 +1185,6 @@ sub update { $recent = [ grep { $_->{path} ne $path } @$recent ]; unshift @$recent, { epoch => $epoch, path => $path, type => $type }; - # sort? $self->write_recent($recent); $self->_assert_symlink; $self->unlock; diff --git a/t/02-operation.t b/t/02-operation.t index 1a39225..202bdcd 100644 --- a/t/02-operation.t +++ b/t/02-operation.t @@ -13,10 +13,75 @@ use YAML::Syck; my $root_from = "t/ta"; my $root_to = "t/tb"; -for my $root ($root_from, $root_to) { - rmtree $root; +rmtree [$root_from, $root_to]; + +{ + my @intervals; + BEGIN { + $tests += 13; + @intervals = qw( 2s 4s 8s 16s 32s Z ); + } + is 6, scalar @intervals, "array has 6 elements: @intervals"; + my $rf0 = File::Rsync::Mirror::Recentfile->new + ( + aggregator => [@intervals[1..$#intervals]], + interval => $intervals[0], + localroot => $root_from, + rsync_options => { + compress => 0, + links => 1, + times => 1, + checksum => 0, + }, + ); + for my $iv (@intervals) { + for my $i (0..3) { + my $file = sprintf + ( + "%s/A%s-%02d", + $root_from, + $iv, + $i, + ); + mkpath dirname $file; + open my $fh, ">", $file or die "Could not open '$file': $!"; + print $fh time, ":", $file, "\n"; + close $fh or die "Could not close '$file': $!"; + $rf0->update($file,"new"); + } + } + my $recent_events = $rf0->recent_events; + # faking internals as if the contents were wide-spread in time + for my $evi (0..$#$recent_events) { + my $ev = $recent_events->[$evi]; + $ev->{epoch} -= 2**($evi*.25); + } + $rf0->write_recent($recent_events); + $rf0->aggregate; + for my $iv (@intervals) { + my $rf = "$root_from/RECENT-$iv.yaml"; + my $filesize = -s $rf; + # now they have 1700+ bytes because they were merged for the + # first time ever and could not be truncated for this reason. + ok(1700 < $filesize, "file $iv has good size[$filesize]"); + utime 0, 0, $rf; # so that the next aggregate isn't skipped + } + open my $fh, ">", "$root_from/finissage" or die "Could not open: $!"; + print $fh "fin"; + close $fh or die "Could not close: $!"; + $rf0->update("$root_from/finissage","new"); + $rf0 = File::Rsync::Mirror::Recentfile->new_from_file("$root_from/RECENT-2s.yaml"); + $rf0->aggregate; + for my $iv (@intervals) { + my $filesize = -s "$root_from/RECENT-$iv.yaml"; + # now they have <1700 bytes because the second aggregate could + # truncate them + ok($iv eq "Z" || 1700 > $filesize, "file $iv has good size[$filesize]"); + } } +rmtree [$root_from, $root_to]; + { BEGIN { $tests += 38 } my $rf = File::Rsync::Mirror::Recentfile->new_from_file("t/RECENT-6h.yaml"); @@ -31,7 +96,7 @@ for my $root ($root_from, $root_to) { $rf->localroot($root_from); $rf->comment("produced during the test 02-operation.t"); $rf->aggregator([qw(30s 1m 2m 1h Z)]); - $rf->verbose(0); + $rf->verbose(1); my $start = Time::HiRes::time; for my $e (@$recent_events) { for my $pass (0,1) { @@ -87,7 +152,7 @@ for my $root ($root_from, $root_to) { my $span = $rece->[0]{epoch} - $rece->[-1]{epoch}; $have_worked = Time::HiRes::time - $start - $have_slept; ok($rececnt > 0 && $span < 30, "i[$i] cnt[$rececnt] span[$span] worked[$have_worked]"); - $have_slept += Time::HiRes::sleep 1; + $have_slept += Time::HiRes::sleep 0.99; } } @@ -100,10 +165,9 @@ for my $root ($root_from, $root_to) { localroot => $root_to, max_rsync_errors => 0, remote_dir => $root_from, - # verbose => 1, + verbose => 1, rsync_options => { compress => 0, - 'rsync-path' => '/usr/bin/rsync', links => 1, times => 1, # not available in rsync 3.0.3: 'omit-dir-times' => 1, @@ -124,7 +188,6 @@ for my $root ($root_from, $root_to) { } } - rmtree [$root_from, $root_to]; BEGIN { plan tests => $tests } -- 2.11.4.GIT