updates
[torrus-plus.git] / plugins / rawexport / RawExport.pm
blobca5c501f38ad1b03a34001b9658185d1a022c194
#  Copyright (C) 2008  Stanislav Sinyagin
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 package Torrus::Collector::RawExport;
23 use strict;
24 use threads;
25 use threads::shared;
26 use Thread::Queue;
27 use Date::Format;
28 use Math::BigInt;
29 use Math::BigFloat;
32 use Torrus::Log;
# Register the storage type
$Torrus::Collector::storageTypes{'raw'} = 1;


# List of needed parameters and default values.
# Every entry is listed with undef, i.e. this module supplies no default
# value for any of them.

$Torrus::Collector::params{'raw-storage'} = {
    'raw-datadir'          => undef,
    'raw-file'             => undef,
    'raw-field-separator'  => undef,
    'raw-timestamp-format' => undef,
    'raw-rowid'            => undef,
    'raw-counter-base'     => undef,
    'raw-counter-maxrate'  => undef,
    };


# Threshold for the number of pending jobs in the update queue.
# storeData() compares the queue size against it. It is intentionally
# only declared here, without assigning a value.
our $thrQueueLimit;

# The job queue and the background thread which consumes it.
# Both are created by initThreads().
our $thrUpdateQueue;
our $thrUpdateThread;
$Torrus::Collector::initThreadsHandlers{'raw-storage'} = \&initThreads;

# Handler registered for 'raw-storage' thread initialization.
# Creates the shared job queue and starts rawUpdateThread() as a detached
# background thread. Takes no arguments.

sub initThreads
{
    Verbose('Initializing the background thread for Raw export');

    # Direct method call instead of the indirect object syntax
    $thrUpdateQueue = Thread::Queue->new();
    $thrUpdateThread = threads->create( \&rawUpdateThread );
    $thrUpdateThread->detach();
}
$Torrus::Collector::initTarget{'raw-storage'} = \&initTarget;

# Wrap-around moduli for 32-bit and 64-bit counters
my $base32 = Math::BigInt->new(2)->bpow(32);
my $base64 = Math::BigInt->new(2)->bpow(64);

# Prepare the raw storage data for one collector token.
# Arguments: the collector object and the token.
# Fills the following sections of the 'raw' storage data:
#   filename{token}, byfile{filename}{token},
#   timestamp_format{filename}, field_separator{filename},
#   base{token} and maxrate{token} (only for counters, i.e. when
#   raw-counter-base is defined).
# Also registers deleteTarget() as the delete callback for the token.

sub initTarget
{
    my $collector = shift;
    my $token = shift;

    my $sref = $collector->storageData( 'raw' );

    $collector->registerDeleteCallback( $token, \&deleteTarget );

    my $filename = $collector->param($token, 'raw-file');
    # Replace hash symbol with percent symbol for strftime format
    $filename =~ s/\#/%/g;

    $filename = $collector->param($token, 'raw-datadir') . '/' . $filename;
    $sref->{'byfile'}{$filename}{$token} = 1;
    $sref->{'filename'}{$token} = $filename;

    # We assume that timestamp format is the same within one file,
    # so only the first token seen for a file defines the format
    # and the field separator.

    if( not exists( $sref->{'timestamp_format'}{$filename} ) )
    {
        my $timestamp_format =
            $collector->param($token, 'raw-timestamp-format');
        $timestamp_format =~ s/\#/%/g;
        $sref->{'timestamp_format'}{$filename} = $timestamp_format;

        $sref->{'field_separator'}{$filename} =
            $collector->param($token, 'raw-field-separator');
    }

    my $base = $collector->param($token, 'raw-counter-base');
    if( defined( $base ) )
    {
        # Any base other than 32 is treated as a 64-bit counter
        $sref->{'base'}{$token} = ($base == 32 ? $base32 : $base64);

        my $maxrate = $collector->param($token, 'raw-counter-maxrate');
        if( defined( $maxrate ) )
        {
            $sref->{'maxrate'}{$token} = Math::BigFloat->new($maxrate);
        }
    }
}
# Callback executed by Collector.
# Arguments: the collector object and the token being deleted.
# Removes everything that this module keeps for the token in the 'raw'
# storage data. When the token was the last one writing to its file,
# the per-file timestamp format and field separator are removed too.

sub deleteTarget
{
    my $collector = shift;
    my $token = shift;

    my $sref = $collector->storageData( 'raw' );
    my $filename = delete $sref->{'filename'}{$token};

    # A token without a file association has nothing in the per-file
    # sections. Skipping them avoids creating a 'byfile' entry under
    # an undefined key.
    if( defined( $filename ) )
    {
        delete $sref->{'byfile'}{$filename}{$token};
        if( scalar( keys %{$sref->{'byfile'}{$filename}} ) == 0 )
        {
            delete $sref->{'byfile'}{$filename};
            delete $sref->{'timestamp_format'}{$filename};
            delete $sref->{'field_separator'}{$filename};
        }
    }

    # Counter history kept by storeData(). Without removing it, a token
    # that is deleted and later initialized again would compute its first
    # increment against a stale counter value.
    delete $sref->{'prevCounter'}{$token};
    delete $sref->{'prevTimestamp'}{$token};

    delete $sref->{'values'}{$token};
    delete $sref->{'base'}{$token};
    delete $sref->{'maxrate'}{$token};
}
$Torrus::Collector::setValue{'raw'} = \&setValue;

# Remember the latest collected value of a token until storeData() runs.
# Arguments: collector object, token, value, timestamp, uptime.
# The uptime argument is accepted but not used by this storage type.
# The value and timestamp are kept as a pair in the 'values' section of
# the 'raw' storage data, replacing any earlier pair for the same token.

sub setValue
{
    my( $collector, $token, $value, $timestamp ) = @_;

    my $sref = $collector->storageData('raw');

    $sref->{'values'}{$token} = [$value, $timestamp];
}
$Torrus::Collector::storeData{'raw'} = \&storeData;

# Turn the values collected by setValue() into jobs for the background
# thread: one job per output file, each holding the rows to append.
# Arguments: the collector object and the 'raw' storage data reference.
#
# Counters (tokens with a 'base' entry) are exported as the pair
# "increment, interval" joined with the field separator. The previous
# counter value and timestamp are remembered between calls. A counter
# produces no row when it has no previous value, when its value is 'U',
# when the rate exceeds maxrate, or when the interval is not positive.
# Other values are exported as is, with objects converted to strings.
#
# When the queue is above $thrQueueLimit, nothing is enqueued and the
# values of this cycle are dropped. The counter history is left alone in
# that case, so the next exported increment covers the whole gap.

sub storeData
{
    my $collector = shift;
    my $sref = shift;

    my $qSize = $thrUpdateQueue->pending();
    $collector->setStatValue( 'RawQueue', $qSize );
    if( $qSize > $thrQueueLimit )
    {
        Error('Cannot enqueue Raw Export jobs: queue size is above limit');

        # Actually honour the limit instead of growing the queue further
        delete $sref->{'values'};
        return;
    }

    # keys() rather than each(): an each() iterator that is abandoned
    # half way would make the next call start in the middle of the hash
    for my $filename ( keys %{$sref->{'byfile'}} )
    {
        &Torrus::DB::checkInterrupted();

        my $tokens = $sref->{'byfile'}{$filename};
        my $separator = $sref->{'field_separator'}{$filename};

        # The job is handed over to another thread, so it is built from
        # shared containers
        my $filejob = &threads::shared::share({});

        $filejob->{'filename'} = $filename;
        $filejob->{'ts_format'} = $sref->{'timestamp_format'}{$filename};
        $filejob->{'separator'} = $separator;
        $filejob->{'values'} = &threads::shared::share([]);

        for my $token ( keys %{$tokens} )
        {
            if( not exists( $sref->{'values'}{$token} ) )
            {
                next;
            }

            &Torrus::DB::checkInterrupted();

            my ( $value, $timestamp ) = @{$sref->{'values'}{$token}};

            if( exists( $sref->{'base'}{$token} ) )
            {
                # we're dealing with a counter. Calculate the increment

                if( $value eq 'U' )
                {
                    # Unknown value: forget the history, so that the next
                    # known value starts a new sequence
                    delete $sref->{'prevCounter'}{$token};
                    delete $sref->{'prevTimestamp'}{$token};
                    next;
                }

                my $increment;
                my $interval;

                if( exists( $sref->{'prevCounter'}{$token} ) )
                {
                    $interval = $timestamp - $sref->{'prevTimestamp'}{$token};

                    $increment = Math::BigInt->new( $value );
                    $increment->bsub( $sref->{'prevCounter'}{$token} );
                    if( $increment->is_neg() )
                    {
                        # previous is bigger: the counter has wrapped
                        $increment->badd( $sref->{'base'}{$token} );
                    }

                    if( $interval <= 0 )
                    {
                        # No time has passed, or the clock went backwards:
                        # neither a rate nor a meaningful row can be made
                        $increment = undef;
                    }
                    elsif( defined( $sref->{'maxrate'}{$token} ) )
                    {
                        my $rate = Math::BigFloat->new( $increment );
                        $rate->bdiv( $interval );
                        if( $rate->bcmp($sref->{'maxrate'}{$token}) > 0 )
                        {
                            $increment = undef;
                        }
                    }
                }

                $sref->{'prevCounter'}{$token} = $value;
                $sref->{'prevTimestamp'}{$token} = $timestamp;

                # Set the value to pair of text values: increment, interval
                if( defined( $increment ) )
                {
                    $value = join( $separator, $increment->bstr(),
                                   $interval );
                }
                else
                {
                    # nothing to store, proceed to the next token
                    next;
                }
            }
            elsif( ref( $value ) )
            {
                # Convert BigInt to string
                $value = $value->bstr();
            }

            my $rowentry = &threads::shared::share({});

            $rowentry->{'value'} = $value;
            $rowentry->{'time'} = $timestamp;
            $rowentry->{'rowid'} =
                $collector->param($token, 'raw-rowid');

            push( @{$filejob->{'values'}}, $rowentry );
        }

        if( scalar( @{$filejob->{'values'}} ) > 0 )
        {
            $thrUpdateQueue->enqueue( $filejob );
        }
    }

    delete $sref->{'values'};
}
# Body of the background thread started by initThreads().
# Runs forever: takes one file job at a time from $thrUpdateQueue and
# appends its rows to the output file. The file name is the job's
# 'filename' template expanded with the current time. Each row is written
# as "timestamp, rowid, value" joined with the job's separator, where the
# timestamp is the row time formatted with the job's 'ts_format'.
# A file that cannot be opened is reported and its job is skipped.

sub rawUpdateThread
{
    &Torrus::DB::setSafeSignalHandlers();
    &Torrus::Log::setTID( threads->tid() );

    while(1)
    {
        &Torrus::DB::checkInterrupted();

        my $filejob = $thrUpdateQueue->dequeue();

        my $fname = time2str( $filejob->{'filename'}, time() );

        # Three-argument open with a lexical handle: the file name is
        # never interpreted as part of the open mode, and the handle is
        # closed automatically if the loop body is left early
        my $rawout;
        if( not open( $rawout, '>>', $fname ) )
        {
            Error('Cannot open ' . $fname . ' for writing: ' . $!);
            next;
        }

        my $ts_format = $filejob->{'ts_format'};
        my $separator = $filejob->{'separator'};

        for my $rowentry ( @{$filejob->{'values'}} )
        {
            print {$rawout} join( $separator,
                                  time2str( $ts_format, $rowentry->{'time'} ),
                                  $rowentry->{'rowid'},
                                  $rowentry->{'value'} ),
                            "\n";

            &Torrus::DB::checkInterrupted();
        }

        # Buffered write errors only show up when the file is closed
        if( not close( $rawout ) )
        {
            Error('Cannot close ' . $fname . ': ' . $!);
            next;
        }

        Debug('RawExport: wrote ' . $fname);
    }
}