2 # Copyright (C) 2008 Stanislav Sinyagin
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 2 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
# Raw export storage module for the Torrus collector: it registers itself
# in the %Torrus::Collector tables under the 'raw' / 'raw-storage' keys.
21 package Torrus
::Collector
::RawExport
;
35 # Register the storage type
36 $Torrus::Collector
::storageTypes
{'raw'} = 1;
39 # List of needed parameters and default values
# Every parameter visible here maps to undef; presumably that means
# "no default value" -- TODO confirm against Torrus::Collector.
# NOTE(review): the end of this hash is not among the visible lines, so the
# list below may be incomplete.
41 $Torrus::Collector
::params
{'raw-storage'} = {
42 'raw-datadir' => undef,
44 'raw-field-separator' => undef,
45 'raw-timestamp-format' => undef,
47 'raw-counter-base' => undef,
48 'raw-counter-maxrate' => undef,
# Register initThreads as the thread initialization handler for
# 'raw-storage'.
58 $Torrus::Collector
::initThreadsHandlers
{'raw-storage'} = \
&initThreads
;
# initThreads: logs a message, creates the job queue, starts
# rawUpdateThread in a new thread and detaches that thread.
62 Verbose
('Initializing the background thread for Raw export');
# NOTE(review): "new Thread::Queue" is indirect object syntax;
# Thread::Queue->new() is the unambiguous spelling.
64 $thrUpdateQueue = new Thread
::Queue
;
65 $thrUpdateThread = threads
->create( \
&rawUpdateThread
);
# The thread is detached, so nothing joins it later.
66 $thrUpdateThread->detach();
# Register initTarget as the target initialization handler for
# 'raw-storage'.
71 $Torrus::Collector
::initTarget
{'raw-storage'} = \
&initTarget
;
# 2**32 and 2**64 as Math::BigInt objects. One of them is stored as the
# per-token counter base below.
74 my $base32 = Math
::BigInt
->new(2)->bpow(32);
75 my $base64 = Math
::BigInt
->new(2)->bpow(64);
# initTarget: records the raw export settings of one token in the
# collector's 'raw' storage data.
# $token is used below, but its declaration is not among the visible lines.
79 my $collector = shift;
82 my $sref = $collector->storageData( 'raw' );
# deleteTarget is registered as the delete callback for this token.
84 $collector->registerDeleteCallback( $token, \
&deleteTarget
);
# Build the output file name: the 'raw-file' parameter with '#' replaced by
# '%', prefixed with the 'raw-datadir' parameter and a slash.
86 my $filename = $collector->param($token, 'raw-file');
87 # Replace hash symbol with percent symbol for strftime format
88 $filename =~ s/\#/%/go;
90 $filename = $collector->param($token, 'raw-datadir') . '/' . $filename;
# Index the token both ways: file name -> set of tokens, token -> file name.
91 $sref->{'byfile'}{$filename}{$token} = 1;
92 $sref->{'filename'}{$token} = $filename;
94 # We assume that timestamp format is the same within one file
# The format is stored only when the file name has no entry yet; it gets the
# same '#' -> '%' replacement as the file name.
96 if( not exists( $sref->{'timestamp_format'}{$filename} ) )
98 my $timestamp_format =
99 $collector->param($token, 'raw-timestamp-format');
100 $timestamp_format =~ s/\#/%/go;
101 $sref->{'timestamp_format'}{$filename} = $timestamp_format;
# The field separator is stored per file name as well.
103 $sref->{'field_separator'}{$filename} =
104 $collector->param($token, 'raw-field-separator');
# A defined 'raw-counter-base' gives the token a 'base' entry: 2**32 when
# the parameter equals 32, 2**64 for any other value.
# NOTE(review): values other than 32 and 64 silently select 2**64.
107 my $base = $collector->param($token, 'raw-counter-base');
108 if( defined( $base ) )
110 $sref->{'base'}{$token} = ($base == 32 ?
$base32:$base64);
# A defined 'raw-counter-maxrate' is stored as a Math::BigFloat.
112 my $maxrate = $collector->param($token, 'raw-counter-maxrate');
113 if( defined( $maxrate ) )
115 $sref->{'maxrate'}{$token} = Math
::BigFloat
->new($maxrate);
122 # Callback executed by Collector
# deleteTarget: removes the entries kept for one token in the 'raw' storage
# data. initTarget registers it through registerDeleteCallback.
# $token is used below, but its declaration is not among the visible lines.
126 my $collector = shift;
129 my $sref = $collector->storageData( 'raw' );
# Look up the file name before the token -> file mapping is deleted.
130 my $filename = $sref->{'filename'}{$token};
132 delete $sref->{'filename'}{$token};
134 delete $sref->{'byfile'}{$filename}{$token};
# When this was the last token of the file, drop the per-file entries too.
135 if( scalar( keys %{$sref->{'byfile'}{$filename}} ) == 0 )
137 delete $sref->{'byfile'}{$filename};
138 delete $sref->{'timestamp_format'}{$filename};
139 delete $sref->{'field_separator'}{$filename};
# Per-token entries.
# NOTE(review): no cleanup of 'prevCounter' / 'prevTimestamp' is visible
# here -- verify they are removed for deleted tokens.
142 delete $sref->{'values'}{$token};
143 delete $sref->{'base'}{$token};
144 delete $sref->{'maxrate'}{$token};
# Register setValue as the value handler for the 'raw' storage type.
150 $Torrus::Collector
::setValue
{'raw'} = \
&setValue
;
# setValue: stores the latest [value, timestamp] pair of a token in the
# 'raw' storage data. storeData reads these pairs later.
# $token and $value are used below, but their declarations are not among
# the visible lines.
155 my $collector = shift;
158 my $timestamp = shift;
161 my $sref = $collector->storageData('raw');
# A later value for the same token replaces the earlier pair.
163 $sref->{'values'}{$token} = [$value, $timestamp];
# Register storeData as the store handler for the 'raw' storage type.
167 $Torrus::Collector
::storeData
{'raw'} = \
&storeData
;
# storeData: turns the pairs collected by setValue into one job per output
# file and puts the non-empty jobs on $thrUpdateQueue.
# $sref is used below, but its declaration is not among the visible lines.
171 my $collector = shift;
# Publish the number of pending jobs as the 'RawQueue' statistic and log an
# error when it exceeds $thrQueueLimit.
# NOTE(review): what happens after the error (presumably an early return)
# is not among the visible lines.
174 my $qSize = $thrUpdateQueue->pending();
175 $collector->setStatValue( 'RawQueue', $qSize );
176 if( $qSize > $thrQueueLimit )
178 Error
('Cannot enqueue Raw Export jobs: queue size is above limit');
# Walk every output file together with the set of tokens written to it.
181 while( my ($filename, $tokens) = each %{$sref->{'byfile'}} )
183 &Torrus
::DB
::checkInterrupted
();
# The job hash is created through threads::shared::share; presumably this is
# required for handing it over to the writer thread -- TODO confirm.
185 my $filejob = &threads
::shared
::share
({});
187 $filejob->{'filename'} = $filename;
188 $filejob->{'ts_format'} = $sref->{'timestamp_format'}{$filename};
189 my $separator = $sref->{'field_separator'}{$filename};
190 $filejob->{'separator'} = $separator;
191 $filejob->{'values'} = &threads
::shared
::share
([]);
# Only tokens that have an entry in 'values' produce a row. 'values' is
# deleted as a whole at the end of this routine.
193 while( my($token, $dummy) = each %{$tokens} )
195 if( exists( $sref->{'values'}{$token} ) )
197 &Torrus
::DB
::checkInterrupted
();
199 my $rowentry = &threads
::shared
::share
({});
201 my ( $value, $timestamp ) = @
{$sref->{'values'}{$token}};
# A 'base' entry exists only for tokens whose 'raw-counter-base' parameter
# was defined in initTarget.
203 if( exists( $sref->{'base'}{$token} ) )
205 # we're dealing with a counter. Calculate the increment
# NOTE(review): the condition under which the previous counter state is
# dropped is not among the visible lines.
209 delete $sref->{'prevCounter'}{$token};
210 delete $sref->{'prevTimestamp'}{$token};
# An increment can only be computed when a previous counter value exists.
217 if( exists( $sref->{'prevCounter'}{$token} ) )
219 my $prevValue = $sref->{'prevCounter'}{$token};
220 $prevTimestamp = $sref->{'prevTimestamp'}{$token};
# Previous value greater than the current one: the visible lines build
# base - previous + current, i.e. the counter is treated as wrapped.
222 if( $prevValue->bcmp( $value ) > 0 )
226 Math
::BigInt
->new($sref->{'base'}{$token});
227 $increment->bsub( $prevValue );
228 $increment->badd( $value );
# Otherwise the increment is current - previous.
232 $increment = Math
::BigInt
->new( $value );
233 $increment->bsub( $prevValue );
# Optional check: increment divided by the elapsed time is compared with
# the stored maximum rate. What happens when the rate is higher is not
# among the visible lines.
# NOTE(review): the divisor is zero when both timestamps are equal --
# confirm that cannot happen.
237 if( defined( $sref->{'maxrate'}{$token} ) )
239 my $rate = Math
::BigFloat
->new( $increment );
240 $rate->bdiv( $timestamp - $prevTimestamp );
241 if( $rate->bcmp($sref->{'maxrate'}{$token}) > 0 )
# Remember the current sample as the reference for the next run.
248 $sref->{'prevCounter'}{$token} = $value;
249 $sref->{'prevTimestamp'}{$token} = $timestamp;
251 # Set the value to pair of text values: increment, interval
252 if( defined( $increment ) )
254 $value = join( $separator, $increment->bstr(),
255 $timestamp - $prevTimestamp );
259 # nothing to store, proceed to the next token
267 # Convert BigInt to string
268 $value = $value->bstr();
# Fill the row with the value text, the sample time and the token's
# 'raw-rowid' parameter, then append it to the job.
272 $rowentry->{'value'} = $value;
273 $rowentry->{'time'} = $timestamp;
274 $rowentry->{'rowid'} =
275 $collector->param($token, 'raw-rowid');
277 push( @
{$filejob->{'values'}}, $rowentry );
# Jobs without rows are not enqueued.
281 if( scalar( @
{$filejob->{'values'}} ) > 0 )
283 $thrUpdateQueue->enqueue( $filejob );
# Forget all collected pairs.
287 delete $sref->{'values'};
# rawUpdateThread: body of the thread started by initThreads. It takes file
# jobs from $thrUpdateQueue and appends their rows to the output file.
# The enclosing sub header and any loop around the dequeue are not among
# the visible lines.
293 &Torrus
::DB
::setSafeSignalHandlers
();
# Pass this thread's id to the Torrus logger.
294 &Torrus
::Log
::setTID
( threads
->tid() );
298 &Torrus
::DB
::checkInterrupted
();
300 my $filejob = $thrUpdateQueue->dequeue();
# The job's file name is used as a time2str format and expanded with the
# current time.
302 my $fname = time2str
( $filejob->{'filename'}, time() );
# NOTE(review): two-argument open with a bareword handle and the mode
# concatenated to the file name. The three-argument form with a lexical
# handle avoids misparsing of unusual file names.
304 if( not open(RAWOUT
, '>> ' . $fname) )
306 Error
('Cannot open ' . $fname . ' for writing: ' . $!);
310 my $ts_format = $filejob->{'ts_format'};
311 my $separator = $filejob->{'separator'};
# One output record per row: formatted timestamp, row id and value, joined
# with the job's separator. The end of the print statement is not among the
# visible lines.
313 for my $rowentry ( @
{$filejob->{'values'}} )
315 print RAWOUT
( join( $separator,
316 time2str
( $ts_format, $rowentry->{'time'} ),
317 $rowentry->{'rowid'},
318 $rowentry->{'value'} ),
321 &Torrus
::DB
::checkInterrupted
();
# NOTE(review): no close of RAWOUT is among the visible lines -- verify the
# handle is closed and the close result is checked.
326 Debug
('RawExport: wrote ' . $fname);