bury all dead whitespace, better off to just do it in one command. i wonder why ss...
[torrus-plus.git] / src / lib / Torrus / Collector / RRDStorage.pm
blob623793f20348c3d067a781d7414239346f1cb921
1 # Copyright (C) 2002-2007 Stanislav Sinyagin
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
17 # Stanislav Sinyagin <ssinyagin@yahoo.com>
19 package Torrus::Collector::RRDStorage;
20 use strict;
21 use warnings;
23 use Torrus::ConfigTree;
24 use Torrus::Log;
25 use RRDs;
27 our $VERSION = 1.0;
29 our $useThreads;
30 our $threadsInUse = 0;
31 our $thrQueueLimit;
32 our $thrUpdateQueue;
33 our $thrErrorsQueue;
34 # RRDtool is not reentrant. use this semaphore for every call to RRDs::*
35 our $rrdtoolSemaphore;
36 our $thrUpdateThread;
38 our $moveConflictRRD;
39 our $conflictRRDPath;
41 # Register the storage type
42 $Torrus::Collector::storageTypes{'rrd'} = 1;
45 # List of needed parameters and default values
47 $Torrus::Collector::params{'rrd-storage'} = {
48 'data-dir' => undef,
49 'data-file' => undef,
50 'rrd-create-rra' => undef,
51 'rrd-create-heartbeat' => undef,
52 'rrd-create-min' => 'U',
53 'rrd-create-max' => 'U',
54 'rrd-hwpredict' => {
55 'enabled' => {
56 'rrd-create-hw-alpha' => 0.1,
57 'rrd-create-hw-beta' => 0.0035,
58 'rrd-create-hw-gamma' => 0.1,
59 'rrd-create-hw-winlen' => 9,
60 'rrd-create-hw-failth' => 6,
61 'rrd-create-hw-season' => 288,
62 'rrd-create-hw-rralen' => undef },
63 'disabled' => undef },
64 'rrd-create-dstype' => undef,
65 'rrd-ds' => undef
69 $Torrus::Collector::initThreadsHandlers{'rrd-storage'} =
70 \&Torrus::Collector::RRDStorage::initThreads;
72 sub initThreads
74 if( $useThreads and not defined( $thrUpdateThread ) )
76 Verbose('RRD storage is configured for multithreading. Initializing ' .
77 'the background thread');
78 require threads;
79 require threads::shared;
80 require Thread::Queue;
81 require Thread::Semaphore;
83 $thrUpdateQueue = Thread::Queue->new();
84 $thrErrorsQueue = Thread::Queue->new();
85 $rrdtoolSemaphore = Thread::Semaphore->new();
87 $thrUpdateThread = threads->create( \&rrdUpdateThread );
88 $thrUpdateThread->detach();
89 $threadsInUse = 1;
91 return;
96 $Torrus::Collector::initTarget{'rrd-storage'} =
97 \&Torrus::Collector::RRDStorage::initTarget;
99 sub initTarget
101 my $collector = shift;
102 my $token = shift;
104 my $sref = $collector->storageData( 'rrd' );
106 $collector->registerDeleteCallback
107 ( $token, \&Torrus::Collector::RRDStorage::deleteTarget );
109 my $filename =
110 $collector->param($token, 'data-dir') . '/' .
111 $collector->param($token, 'data-file');
113 $sref->{'byfile'}{$filename}{$token} = 1;
114 $sref->{'filename'}{$token} = $filename;
115 return 1;
120 $Torrus::Collector::setValue{'rrd'} =
121 \&Torrus::Collector::RRDStorage::setValue;
124 sub setValue
126 my $collector = shift;
127 my $token = shift;
128 my $value = shift;
129 my $timestamp = shift;
130 my $uptime = shift;
132 my $sref = $collector->storageData( 'rrd' );
134 $sref->{'values'}{$token} = [$value, $timestamp, $uptime];
135 return;
139 $Torrus::Collector::storeData{'rrd'} =
140 \&Torrus::Collector::RRDStorage::storeData;
142 sub storeData
144 my $collector = shift;
145 my $sref = shift;
147 if( $threadsInUse )
149 $collector->setStatValue( 'RRDQueue', $thrUpdateQueue->pending() );
152 if( $threadsInUse and $thrUpdateQueue->pending() > $thrQueueLimit )
154 Error('Cannot enqueue RRD files for updating: ' .
155 'queue size is above limit');
157 else
159 while( my ($filename, $tokens) = each %{$sref->{'byfile'}} )
161 &Torrus::DB::checkInterrupted();
163 if( not -e $filename )
165 createRRD( $collector, $sref, $filename, $tokens );
168 if( -e $filename )
170 updateRRD( $collector, $sref, $filename, $tokens );
175 delete $sref->{'values'};
176 return;
180 sub semaphoreDown
182 if( $threadsInUse )
184 $rrdtoolSemaphore->down();
186 return;
189 sub semaphoreUp
191 if( $threadsInUse )
193 $rrdtoolSemaphore->up();
195 return;
199 sub createRRD
201 my $collector = shift;
202 my $sref = shift;
203 my $filename = shift;
204 my $tokens = shift;
206 # We use hashes here, in order to make the superset of RRA
207 # definitions, and unique RRD names
208 my %DS_hash;
209 my %RRA_hash;
211 # Holt-Winters parameters
212 my $needs_hw = 0;
213 my %hwparam;
215 my $timestamp = time();
217 for my $token ( keys %{$tokens} )
219 my $ds_string =
220 sprintf('DS:%s:%s:%d:%s:%s',
221 $collector->param($token, 'rrd-ds'),
222 $collector->param($token, 'rrd-create-dstype'),
223 $collector->param($token, 'rrd-create-heartbeat'),
224 $collector->param($token, 'rrd-create-min'),
225 $collector->param($token, 'rrd-create-max'));
226 $DS_hash{$ds_string} = 1;
228 for my $rra_string
229 ( split(/\s+/, $collector->param($token, 'rrd-create-rra')) )
231 $RRA_hash{$rra_string} = 1;
234 if( $collector->param($token, 'rrd-hwpredict') eq 'enabled' )
236 $needs_hw = 1;
238 for my $param ( 'alpha', 'beta', 'gamma', 'winlen', 'failth',
239 'season', 'rralen' )
241 my $value = $collector->param($token, 'rrd-create-hw-'.$param);
243 if( defined( $hwparam{$param} ) and
244 $hwparam{$param} != $value )
246 my $paramname = 'rrd-create-hw-'.$param;
247 Warn("Parameter " . $paramname . " was already defined " .
248 "with differentr value for " . $filename);
251 $hwparam{$param} = $value;
255 if( ref $sref->{'values'}{$token} )
257 my $new_ts = $sref->{'values'}{$token}[1];
258 if( $new_ts > 0 and $new_ts < $timestamp )
260 $timestamp = $new_ts;
265 my @DS = sort keys %DS_hash;
266 my @RRA = sort keys %RRA_hash;
268 if( $needs_hw )
270 ## Define the RRAs for Holt-Winters prediction
272 my $hwpredict_rran = scalar(@RRA) + 1;
273 my $seasonal_rran = $hwpredict_rran + 1;
274 my $devseasonal_rran = $hwpredict_rran + 2;
275 my $devpredict_rran = $hwpredict_rran + 3;
276 my $failures_rran = $hwpredict_rran + 4;
278 push( @RRA, sprintf('RRA:HWPREDICT:%d:%e:%e:%d:%d',
279 $hwparam{'rralen'},
280 $hwparam{'alpha'},
281 $hwparam{'beta'},
282 $hwparam{'season'},
283 $seasonal_rran));
285 push( @RRA, sprintf('RRA:SEASONAL:%d:%e:%d',
286 $hwparam{'season'},
287 $hwparam{'gamma'},
288 $hwpredict_rran));
290 push( @RRA, sprintf('RRA:DEVSEASONAL:%d:%e:%d',
291 $hwparam{'season'},
292 $hwparam{'gamma'},
293 $hwpredict_rran));
295 push( @RRA, sprintf('RRA:DEVPREDICT:%d:%d',
296 $hwparam{'rralen'},
297 $devseasonal_rran));
299 push( @RRA, sprintf('RRA:FAILURES:%d:%d:%d:%d',
300 $hwparam{'rralen'},
301 $hwparam{'failth'},
302 $hwparam{'winlen'},
303 $devseasonal_rran));
306 my $step = $collector->period();
307 my $start = $timestamp - $step;
309 my @OPT = ( sprintf( '--start=%d', $start ),
310 sprintf( '--step=%d', $step ) );
312 &Torrus::DB::checkInterrupted();
314 Debug("Creating RRD $filename: " . join(" ", @OPT, @DS, @RRA));
316 semaphoreDown();
318 RRDs::create($filename,
319 @OPT,
320 @DS,
321 @RRA);
323 my $err = RRDs::error();
325 semaphoreUp();
327 Error("ERROR creating $filename: $err") if $err;
329 delete $sref->{'rrdinfo_ds'}{$filename};
330 return;
334 sub updateRRD
336 my $collector = shift;
337 my $sref = shift;
338 my $filename = shift;
339 my $tokens = shift;
341 if( not defined( $sref->{'rrdinfo_ds'}{$filename} ) )
343 my $ref = {};
344 $sref->{'rrdinfo_ds'}{$filename} = $ref;
346 semaphoreDown();
348 my $rrdinfo = RRDs::info( $filename );
350 semaphoreUp();
352 for my $prop ( keys %$rrdinfo )
354 if( $prop =~ /^ds\[(\S+)\]\./o )
356 $ref->{$1} = 1;
360 &Torrus::DB::checkInterrupted();
363 # First we compare the sets of datasources in our memory and in RRD file
364 my %ds_updating = ();
365 my $ds_conflict = 0;
367 for my $token ( keys %{$tokens} )
369 $ds_updating{ $collector->param($token, 'rrd-ds') } = $token;
372 # Check if we update all datasources in RRD file
373 for my $ds ( keys %{$sref->{'rrdinfo_ds'}{$filename}} )
375 if( not $ds_updating{$ds} )
377 Warn('Datasource exists in RRD file, but it is not updated: ' .
378 $ds . ' in ' . $filename);
379 $ds_conflict = 1;
383 # Check if all DS that we update are defined in RRD
384 for my $ds ( keys %ds_updating )
386 if( not $sref->{'rrdinfo_ds'}{$filename}{$ds} )
388 Error("Datasource being updated does not exist: $ds in $filename");
389 delete $ds_updating{$ds};
390 $ds_conflict = 1;
394 if( $ds_conflict and $moveConflictRRD )
396 if( not -f $filename )
398 Error($filename . 'is not a regular file');
399 return;
402 my( $sec, $min, $hour, $mday, $mon, $year) = localtime( time() );
403 my $destfile = sprintf('%s_%04d%02d%02d%02d%02d',
404 $filename,
405 $year + 1900, $mon+1, $mday, $hour, $min);
407 my $destdir = $conflictRRDPath;
408 if( defined( $destdir ) and -d $destdir )
410 my @fpath = split('/', $destfile);
411 my $fname = pop( @fpath );
412 $destfile = $destdir . '/' . $fname;
415 Warn('Moving the conflicted RRD file ' . $filename .
416 ' to ' . $destfile);
417 rename( $filename, $destfile ) or
418 Error("Cannot rename $filename to $destfile: $!");
420 delete $sref->{'rrdinfo_ds'}{$filename};
422 createRRD( $collector, $sref, $filename, $tokens );
425 if( scalar( keys %ds_updating ) == 0 )
427 Error("No datasources to update in $filename");
428 return;
431 &Torrus::DB::checkInterrupted();
433 # Build the arguments for RRDs::update.
434 my $template = '';
435 my $values;
437 # We will use the average timestamp
438 my @timestamps;
439 my $max_ts = 0;
440 my $min_ts = time();
442 my $step = $collector->period();
444 my $has_values = 0;
446 for my $ds ( keys %ds_updating )
448 my $token = $ds_updating{$ds};
449 if( length($template) > 0 )
451 $template .= ':';
453 $template .= $ds;
455 my $now = time();
456 my ( $value, $timestamp, $uptime ) = ( 'U', $now, $now );
457 if( ref $sref->{'values'}{$token} )
459 ($value, $timestamp, $uptime) = @{$sref->{'values'}{$token}};
460 $has_values = 1;
463 push( @timestamps, $timestamp );
464 if( $timestamp > $max_ts )
466 $max_ts = $timestamp;
468 if( $timestamp < $min_ts )
470 $min_ts = $timestamp;
473 # The plus sign generated by BigInt is not a problem for rrdtool
474 $values .= ':'. $value;
477 if( not $has_values )
479 return;
482 # Get the average timestamp
483 my $sum = 0;
484 map {$sum += $_} @timestamps;
485 my $avg_ts = $sum / scalar( @timestamps );
487 if( ($max_ts - $avg_ts) > $Torrus::Global::RRDTimestampTolerance )
489 Error("Maximum timestamp value is beyond the tolerance in $filename");
491 if( ($avg_ts - $min_ts) > $Torrus::Global::RRDTimestampTolerance )
493 Error("Minimum timestamp value is beyond the tolerance in $filename");
496 my @cmd = ( "--template=" . $template,
497 sprintf("%d%s", $avg_ts, $values) );
499 &Torrus::DB::checkInterrupted();
501 if( $threadsInUse )
503 # Process errors from RRD update thread
504 my $errfilename;
505 while( defined( $errfilename = $thrErrorsQueue->dequeue_nb() ) )
507 delete $sref->{'rrdinfo_ds'}{$errfilename};
510 Debug('Enqueueing update job for ' . $filename);
512 my $cmdlist = &threads::shared::share([]);
513 push( @{$cmdlist}, $filename, @cmd );
514 $thrUpdateQueue->enqueue( $cmdlist );
516 else
518 if( isDebug )
520 Debug("Updating $filename: " . join(' ', @cmd));
522 RRDs::update( $filename, @cmd );
523 my $err = RRDs::error();
524 if( $err )
526 Error("ERROR updating $filename: $err");
527 delete $sref->{'rrdinfo_ds'}{$filename};
530 return;
534 # A background thread that updates RRD files
535 sub rrdUpdateThread
537 &Torrus::DB::setSafeSignalHandlers();
538 &Torrus::Log::setTID( threads->tid() );
540 my $cmdlist;
541 &threads::shared::share( \$cmdlist );
543 while(1)
545 &Torrus::DB::checkInterrupted();
547 $cmdlist = $thrUpdateQueue->dequeue();
549 if( isDebug )
551 Debug("Updating RRD: " . join(' ', @{$cmdlist}));
554 $rrdtoolSemaphore->down();
556 RRDs::update( @{$cmdlist} );
557 my $err = RRDs::error();
559 $rrdtoolSemaphore->up();
561 if( $err )
563 Error('ERROR updating' . $cmdlist->[0] . ': ' . $err);
564 $thrErrorsQueue->enqueue( $cmdlist->[0] );
567 return;
572 # Callback executed by Collector
574 sub deleteTarget
576 my $collector = shift;
577 my $token = shift;
579 my $sref = $collector->storageData( 'rrd' );
580 my $filename = $sref->{'filename'}{$token};
582 delete $sref->{'filename'}{$token};
584 delete $sref->{'byfile'}{$filename}{$token};
585 if( scalar( keys %{$sref->{'byfile'}{$filename}} ) == 0 )
587 delete $sref->{'byfile'}{$filename};
590 delete $sref->{'values'}{$token};
591 return;
598 # Local Variables:
599 # mode: perl
600 # indent-tabs-mode: nil
601 # perl-indent-level: 4
602 # End: