pmrep: consistently clean up the writer object, cosmetic
[pcp.git] / src / pmrep / pmrep.py
blob8d3b8fd8059222782998e8155d5378e361c8712e
1 #!/usr/bin/pcp python
3 # Copyright (C) 2015 Marko Myllynen <myllynen@redhat.com>
5 # This program is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the
7 # Free Software Foundation; either version 2 of the License, or (at your
8 # option) any later version.
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 # for more details.
15 # [zbxsend] Copyright (C) 2014 Sergey Kirillov <sergey.kirillov@gmail.com>
16 # All rights reserved.
18 # Redistribution and use in source and binary forms, with or without
19 # modification, are permitted provided that the following conditions
20 # are met:
22 # 1. Redistributions of source code must retain the above copyright
23 # notice, this list of conditions and the following disclaimer.
25 # 2. Redistributions in binary form must reproduce the above copyright
26 # notice, this list of conditions and the following disclaimer in the
27 # documentation and/or other materials provided with the distribution.
29 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
30 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
31 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
32 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
33 # HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
34 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
35 # TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
36 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
39 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 # pylint: disable=fixme, line-too-long, bad-whitespace, invalid-name
42 # pylint: disable=superfluous-parens
43 """ Performance Metrics Reporter """
45 from collections import OrderedDict
46 from datetime import datetime
47 try:
48 import ConfigParser
49 except ImportError:
50 import configparser as ConfigParser
51 try:
52 import json
53 except:
54 import simplejson as json
55 import socket
56 import struct
57 import time
58 import copy
59 import sys
60 import os
61 import re
63 from pcp import pmapi, pmi
64 from cpmapi import PM_CONTEXT_ARCHIVE, PM_CONTEXT_HOST, PM_CONTEXT_LOCAL, PM_MODE_FORW, PM_MODE_INTERP, PM_ERR_TYPE, PM_ERR_EOL, PM_ERR_NAME, PM_IN_NULL, PM_SEM_COUNTER, PM_TIME_MSEC, PM_TIME_SEC, PM_XTB_SET
65 from cpmapi import PM_TYPE_32, PM_TYPE_U32, PM_TYPE_64, PM_TYPE_U64, PM_TYPE_FLOAT, PM_TYPE_DOUBLE, PM_TYPE_STRING
66 from cpmi import PMI_ERR_DUPINSTNAME
# Python 3 has no separate long type; keep the name usable on both majors
if sys.version_info >= (3,):
    long = int

# Configuration file consulted when -c/--config is not given
DEFAULT_CONFIG = "./pmrep.conf"

# Column separators and strftime formats, per output target
CSVSEP = ","
CSVTIME = "%Y-%m-%d %H:%M:%S"
OUTSEP = " "
OUTTIME = "%H:%M:%S"

# Zabbix defaults: server port and item key prefix
ZBXPORT = 10051
ZBXPRFX = "pcp."

# Placeholders for missing and truncated values, config file format version
NO_VAL = "N/A"
TRUNC = "xxx"
VERSION = 1

# Names of the supported output targets
OUTPUT_ARCHIVE = "archive"
OUTPUT_CSV = "csv"
OUTPUT_STDOUT = "stdout"
OUTPUT_ZABBIX = "zabbix"
class ZabbixMetric(object):
    """ One value destined for a Zabbix server: host, item key, value, timestamp """

    def __init__(self, host, key, value, clock):
        # The four attribute names are read directly by send_to_zabbix()
        self.host, self.key = host, key
        self.value, self.clock = value, clock

    def __repr__(self):
        fields = (self.host, self.key, self.value, self.clock)
        return 'Metric(%r, %r, %r, %r)' % fields
def recv_from_zabbix(sock, count):
    """ Read up to count bytes from sock.

    Keeps calling sock.recv() until count bytes have been collected or
    recv() returns an empty result; whatever was collected is returned,
    so the result may be shorter than count.
    """
    received = b''
    missing = count
    while missing > 0:
        piece = sock.recv(missing)
        if not piece:
            # Nothing more to read, hand back the partial data
            break
        received += piece
        missing = count - len(received)
    return received
def send_to_zabbix(metrics, zabbix_host, zabbix_port, timeout=15):
    """ Send a set of metrics to a Zabbix server.

    metrics     -- iterable of objects with host, key, value and clock
                   attributes; a false clock is replaced by the current time
    zabbix_host -- server address to connect to
    zabbix_port -- server port to connect to
    timeout     -- socket timeout in seconds, applied after connecting

    Returns True when the server answers with response "success", False
    on a malformed header, an error response or a socket timeout. Other
    socket errors propagate to the caller.
    """
    j = json.dumps
    # Zabbix has a very fragile JSON parser, so we cannot use json to
    # dump the whole packet
    metrics_data = []
    for m in metrics:
        clock = m.clock or time.time()
        metrics_data.append(('\t\t{\n'
                             '\t\t\t"host":%s,\n'
                             '\t\t\t"key":%s,\n'
                             '\t\t\t"value":%s,\n'
                             '\t\t\t"clock":%.5f}') % (j(m.host), j(m.key), j(m.value), clock))
    json_data = ('{\n'
                 '\t"request":"sender data",\n'
                 '\t"data":[\n%s]\n'
                 '}') % (',\n'.join(metrics_data))

    # The length field counts payload bytes, so measure after encoding
    payload = json_data.encode('utf-8')
    packet = b'ZBXD\1' + struct.pack('<Q', len(payload)) + payload

    # Create the socket before the try so that finally always has it
    zabbix = socket.socket()
    try:
        zabbix.connect((zabbix_host, zabbix_port))
        zabbix.settimeout(timeout)
        # send metrics to zabbix
        zabbix.sendall(packet)
        # get response header from zabbix; compare as bytes since the
        # length field is binary and need not be valid UTF-8
        resp_hdr = recv_from_zabbix(zabbix, 13)
        if len(resp_hdr) != 13 or not resp_hdr.startswith(b'ZBXD\1'):
            # debug: write('Invalid Zabbix response len=%d' % len(resp_hdr))
            return False
        resp_body_len = struct.unpack('<Q', resp_hdr[5:])[0]
        # get response body from zabbix; a single recv() may be short
        resp_body = recv_from_zabbix(zabbix, resp_body_len)
        resp = json.loads(resp_body.decode('utf-8'))
        # debug: write('Got response from Zabbix: %s' % resp)
        if resp.get('response') != 'success':
            sys.stderr.write('Error response from Zabbix: %s' % resp)
            return False
        return True
    except socket.timeout as err:
        sys.stderr.write("Zabbix connection timed out: " + str(err))
        return False
    finally:
        zabbix.close()
159 class PMReporter(object):
160 """ Report PCP metrics """
def __init__(self):
    """ Set up option handling and the built-in defaults for every setting """
    self.context = None
    self.check = 0
    self.format = None # output format
    self.opts = self.options()
    pmapi.c_api.pmSetOptionFlags(pmapi.c_api.PM_OPTFLAG_POSIX) # RHBZ#1289912

    # Directives accepted in the configuration file
    self.keys = (
        'source', 'output', 'derived', 'header', 'unitinfo',
        'globals', 'timestamp', 'samples', 'interval',
        'delay', 'type', 'width', 'precision', 'delimiter',
        'extheader', 'repeat_header', 'timefmt', 'interpol',
        'count_scale', 'space_scale', 'time_scale', 'version',
        'zabbix_server', 'zabbix_port', 'zabbix_host', 'zabbix_interval',
    )

    # Command line switches which take no argument, and the ones
    # which only print help/version
    self.argless = (
        '-C', '--check',
        '-L', '--local-PMDA',
        '-H', '--no-header',
        '-U', '--no-unit-info',
        '-G', '--no-globals',
        '-p', '--timestamps',
        '-d', '--delay',
        '-r', '--raw',
        '-x', '--extended-header',
        '-u', '--no-interpol',
        '-z', '--hostzone',
    )
    self.arghelp = ('-?', '--help', '-V', '--version')

    # Precedence of settings (as present):
    # 1 - command line parameters
    # 2 - parameters from configuration file(s)
    # 3 - the built-in defaults below
    self.config = self.set_config_file()
    self.version = VERSION
    self.source = "local:"
    self.output = OUTPUT_STDOUT
    self.outfile = self.writer = self.pmi = self.derived = None
    self.header = self.unitinfo = self.globals = 1
    self.timestamp = 0
    self.samples = None # forever
    self.interval = pmapi.timeval(1) # 1 sec
    self.opts.pmSetOptionInterval("1")
    self.runtime = -1
    self.delay = self.type = self.width = 0
    self.precision = 3 # .3f
    self.delimiter = None
    self.extheader = self.repeat_header = 0
    self.timefmt = None
    self.interpol = 1
    self.count_scale = self.space_scale = self.time_scale = None
    self.can_scale = None # PCP 3.9 compat

    # Performance metrics store
    # key - metric name
    # values - 0:label, 1:instance(s), 2:unit/scale, 3:type, 4:width
    self.metrics = OrderedDict()

    # Per-metric specifiers recognised in the configuration file
    self.metricspec = ('label', 'instance', 'unit', 'type', 'width', 'formula')

    # Sampling state
    self.prevvals = self.currvals = None
    self.ptstamp = self.ctstamp = 0
    self.pmids = []
    self.descs = []
    self.insts = []

    # Zabbix integration
    self.zabbix_server = None
    self.zabbix_port = ZBXPORT
    self.zabbix_host = None
    self.zabbix_interval = self.zabbix_prevsend = None
    self.zabbix_metrics = []
def set_config_file(self):
    """ Pick the configuration file: -c/--config argument or the default.

    Returns None when a help/version switch is seen first, otherwise the
    chosen path. Raises IOError if a file named with -c/--config is not
    a readable regular file.
    """
    config = DEFAULT_CONFIG

    # Scan the raw command line ourselves, as this runs before the
    # regular option parsing
    argv = sys.argv[1:]
    pos = 0
    while pos < len(argv):
        current = argv[pos]
        pos += 1
        if current in self.arghelp:
            return None
        if current not in ('-c', '--config'):
            continue
        if pos >= len(argv):
            # Switch given as the last word, nothing to override with
            break
        config = argv[pos]
        pos += 1
        if not (os.path.isfile(config) and os.access(config, os.R_OK)):
            raise IOError("Failed to read configuration file '%s'." % config)
    return config
259 def set_attr(self, name, value):
260 """ Helper to apply config file settings properly """
261 if value in ('true', 'True', 'y', 'yes', 'Yes'):
262 value = 1
263 if value in ('false', 'False', 'n', 'no', 'No'):
264 value = 0
265 if name == 'source':
266 try: # RHBZ#1270176 / PCP < 3.10.8
267 if '/' in value:
268 self.opts.pmSetOptionArchive(value)
269 else:
270 self.opts.pmSetOptionHost(value) # RHBZ#1289911
271 except:
272 sys.stderr.write("PCP 3.10.8 or later required for the 'source' directive.\n")
273 sys.exit(1)
274 elif name == 'samples':
275 self.opts.pmSetOptionSamples(value)
276 self.samples = self.opts.pmGetOptionSamples()
277 elif name == 'interval':
278 self.opts.pmSetOptionInterval(value)
279 self.interval = self.opts.pmGetOptionInterval()
280 elif name == 'type':
281 if value == 'raw':
282 self.type = 1
283 else:
284 self.type = 0
285 else:
286 try:
287 setattr(self, name, int(value))
288 except ValueError:
289 setattr(self, name, value)
291 def read_config(self):
292 """ Read options from configuration file """
293 if self.config is None:
294 return
295 config = ConfigParser.SafeConfigParser()
296 config.read(self.config)
297 if not config.has_section('options'):
298 return
299 for opt in config.options('options'):
300 if opt in self.keys:
301 self.set_attr(opt, config.get('options', opt))
302 else:
303 sys.stderr.write("Invalid directive '%s' in %s.\n" % (opt, self.config))
304 sys.exit(1)
def options(self):
    """ Build the command line option table and return the options object """
    opts = pmapi.pmOptions()
    opts.pmSetOptionCallback(self.option)
    opts.pmSetOverrideCallback(self.option_override)
    opts.pmSetShortOptions("a:h:LK:c:Co:F:e:D:V?HUGpA:S:T:O:s:t:Z:zdrw:P:l:xE:f:uq:b:y:")
    opts.pmSetShortUsage("[option...] metricspec [...]")

    def add_custom(entries):
        """ Register pmrep specific long options in the given order """
        for name, has_arg, short, argname, text in entries:
            opts.pmSetLongOption(name, has_arg, short, argname, text)

    def add_standard(setters):
        """ Register standard PCP long options in the given order """
        for setter in setters:
            setter()

    # The registration order below is kept exactly as before
    opts.pmSetLongOptionHeader("General options")
    add_standard((
        opts.pmSetLongOptionArchive, # -a/--archive
        opts.pmSetLongOptionArchiveFolio, # --archive-folio
        opts.pmSetLongOptionHost, # -h/--host
        opts.pmSetLongOptionLocalPMDA, # -L/--local-PMDA
        opts.pmSetLongOptionSpecLocal, # -K/--spec-local
    ))
    add_custom((
        ("config", 1, "c", "FILE", "config file path"),
        ("check", 0, "C", "", "check config and metrics and exit"),
        ("output", 1, "o", "OUTPUT", "output target: archive, csv, stdout (default), or zabbix"),
        ("output-file", 1, "F", "OUTFILE", "output file"),
        ("derived", 1, "e", "FILE|DFNT", "derived metrics definitions"),
    ))
    #opts.pmSetLongOptionGuiMode() # -g/--guimode # RHBZ#1289910
    add_standard((
        opts.pmSetLongOptionDebug, # -D/--debug
        opts.pmSetLongOptionVersion, # -V/--version
        opts.pmSetLongOptionHelp, # -?/--help
    ))

    opts.pmSetLongOptionHeader("Reporting options")
    add_custom((
        ("no-header", 0, "H", "", "omit headers"),
        ("no-unit-info", 0, "U", "", "omit unit info from headers"),
        ("no-globals", 0, "G", "", "omit global metrics"),
        ("timestamps", 0, "p", "", "print timestamps"),
    ))
    add_standard((
        opts.pmSetLongOptionAlign, # -A/--align
        opts.pmSetLongOptionStart, # -S/--start
        opts.pmSetLongOptionFinish, # -T/--finish
        opts.pmSetLongOptionOrigin, # -O/--origin
        opts.pmSetLongOptionSamples, # -s/--samples
        opts.pmSetLongOptionInterval, # -t/--interval
        opts.pmSetLongOptionTimeZone, # -Z/--timezone
        opts.pmSetLongOptionHostZone, # -z/--hostzone
    ))
    add_custom((
        ("delay", 0, "d", "", "delay, pause between updates for archive replay"),
        ("raw", 0, "r", "", "output raw counter values (no rate conversion)"),
        ("width", 1, "w", "N", "default column width"),
        ("precision", 1, "P", "N", "N digits after the decimal separator (if width enough)"),
        ("delimiter", 1, "l", "STR", "delimiter to separate csv/stdout columns"),
        ("extended-header", 0, "x", "", "display extended header"),
        ("repeat-header", 1, "E", "N", "repeat stdout headers every N lines"),
        ("timestamp-format", 1, "f", "STR", "strftime string for timestamp format"),
        ("no-interpol", 0, "u", "", "disable interpolation mode with archives"),
        ("count-scale", 1, "q", "SCALE", "default count unit"),
        ("space-scale", 1, "b", "SCALE", "default space unit"),
        ("time-scale", 1, "y", "SCALE", "default time unit"),
    ))

    return opts
def option_override(self, opt):
    """ Return 1 for the standard PCP options pmrep handles itself (H, p), else 0 """
    return 1 if opt in ('H', 'p') else 0
364 def option(self, opt, optarg, index):
365 """ Perform setup for an individual command line option """
366 if opt == 'c':
367 self.config = optarg
368 elif opt == 'C':
369 self.check = 1
370 elif opt == 'o':
371 if optarg == OUTPUT_ARCHIVE:
372 self.output = OUTPUT_ARCHIVE
373 elif optarg == OUTPUT_CSV:
374 self.output = OUTPUT_CSV
375 elif optarg == OUTPUT_STDOUT:
376 self.output = OUTPUT_STDOUT
377 elif optarg == OUTPUT_ZABBIX:
378 self.output = OUTPUT_ZABBIX
379 else:
380 sys.stderr.write("Invalid output target %s specified.\n" % optarg)
381 sys.exit(1)
382 elif opt == 'F':
383 if os.path.exists(optarg + ".index"):
384 sys.stderr.write("Archive %s already exists.\n" % optarg)
385 sys.exit(1)
386 if os.path.exists(optarg):
387 sys.stderr.write("File %s already exists.\n" % optarg)
388 sys.exit(1)
389 self.outfile = optarg
390 elif opt == 'e':
391 self.derived = optarg
392 elif opt == 'H':
393 self.header = 0
394 elif opt == 'U':
395 self.unitinfo = 0
396 elif opt == 'G':
397 self.globals = 0
398 elif opt == 'p':
399 self.timestamp = 1
400 elif opt == 'd':
401 self.delay = 1
402 elif opt == 'r':
403 self.type = 1
404 elif opt == 'w':
405 self.width = int(optarg)
406 elif opt == 'P':
407 self.precision = int(optarg)
408 elif opt == 'l':
409 self.delimiter = optarg
410 elif opt == 'x':
411 self.extheader = 1
412 elif opt == 'E':
413 self.repeat_header = int(optarg)
414 elif opt == 'f':
415 self.timefmt = optarg
416 elif opt == 'u':
417 self.interpol = 0
418 elif opt == 'q':
419 self.count_scale = optarg
420 elif opt == 'b':
421 self.space_scale = optarg
422 elif opt == 'y':
423 self.time_scale = optarg
424 else:
425 raise pmapi.pmUsageErr()
427 def get_cmd_line_metrics(self):
428 """ Get metric set specifications from the command line """
429 for arg in sys.argv[1:]:
430 if arg in self.arghelp:
431 return 0
432 metrics = []
433 for arg in reversed(sys.argv[1:]):
434 if arg.startswith('-'):
435 if len(metrics):
436 if arg not in self.argless and '=' not in arg:
437 del metrics[-1]
438 break
439 metrics.append(arg)
440 metrics.reverse()
441 return metrics
443 def parse_metric_info(self, metrics, key, value):
444 """ Parse metric information """
445 # NB. Uses the config key, not the metric, as the dict key
446 if ',' in value:
447 # Compact / one-line definition
448 metrics[key] = (key + "," + value).split(",")
449 else:
450 # Verbose / multi-line definition
451 if not '.' in key or key.rsplit(".", 1)[1] not in self.metricspec:
452 # New metric
453 metrics[key] = value.split()
454 for index in range(0, 6):
455 if len(metrics[key]) <= index:
456 metrics[key].append(None)
457 else:
458 # Additional info
459 key, spec = key.rsplit(".", 1)
460 if key not in metrics:
461 sys.stderr.write("Undeclared metric key %s.\n" % key)
462 sys.exit(1)
463 if spec == "formula":
464 if self.derived == None:
465 self.derived = metrics[key][0] + "=" + value
466 else:
467 self.derived += "," + metrics[key][0] + "=" + value
468 else:
469 metrics[key][self.metricspec.index(spec)+1] = value
471 def prepare_metrics(self):
472 """ Construct and prepare the initial metrics set """
473 # Get direct and/or sets of metrics from the command line
474 metrics = self.get_cmd_line_metrics()
475 if metrics == 0:
476 return
477 if not metrics:
478 sys.stderr.write("No metrics specified.\n")
479 raise pmapi.pmUsageErr()
481 # Don't rely on what get_cmd_line_metrics() might do
482 if '-G' in sys.argv:
483 self.globals = 0
485 # Read config
486 config = ConfigParser.SafeConfigParser()
487 config.read(self.config)
489 # First read global metrics (if not disabled already)
490 globmet = OrderedDict()
491 if self.globals == 1:
492 if config.has_section('global'):
493 parsemet = OrderedDict()
494 for key in config.options('global'):
495 self.parse_metric_info(parsemet, key, config.get('global', key))
496 for metric in parsemet:
497 name = parsemet[metric][:1][0]
498 globmet[name] = parsemet[metric][1:]
500 # Add command line and configuration file metric sets
501 tempmet = OrderedDict()
502 for metric in metrics:
503 if metric.startswith(":"):
504 tempmet[metric[1:]] = None
505 else:
506 m = metric.split(",")
507 tempmet[m[0]] = m[1:]
509 # Get config and set details for configuration file metric sets
510 confmet = OrderedDict()
511 for spec in tempmet:
512 if tempmet[spec] == None:
513 if config.has_section(spec):
514 parsemet = OrderedDict()
515 for key in config.options(spec):
516 if key in self.keys:
517 self.set_attr(key, config.get(spec, key))
518 else:
519 self.parse_metric_info(parsemet, key, config.get(spec, key))
520 for metric in parsemet:
521 name = parsemet[metric][:1][0]
522 confmet[name] = parsemet[metric][1:]
523 tempmet[spec] = confmet
524 else:
525 raise IOError("Metric set definition '%s' not found." % metric)
527 # Create the combined metrics set
528 if self.globals == 1:
529 for metric in globmet:
530 self.metrics[metric] = globmet[metric]
531 for metric in tempmet:
532 if type(tempmet[metric]) is list:
533 self.metrics[metric] = tempmet[metric]
534 else:
535 for m in tempmet[metric]:
536 self.metrics[m] = confmet[m]
def check_metric(self, metric):
    """ Look up one metric and record its pmid, descriptor and instances.

    Appends to self.pmids, self.descs and self.insts. Metrics without
    instances get the ([PM_IN_NULL], [None]) placeholder. A lookup
    failure or an unsupported value type is reported on stderr and the
    program exits with status 1.
    """
    supported = (PM_TYPE_32, PM_TYPE_U32, PM_TYPE_64, PM_TYPE_U64,
                 PM_TYPE_FLOAT, PM_TYPE_DOUBLE, PM_TYPE_STRING)
    try:
        pmid = self.context.pmLookupName(metric)[0]
        desc = self.context.pmLookupDescs(pmid)[0]
        try:
            if self.context.type == PM_CONTEXT_ARCHIVE:
                inst = self.context.pmGetInDomArchive(desc)
            else:
                inst = self.context.pmGetInDom(desc) # disk.dev.read
            if not inst[0]:
                inst = ([PM_IN_NULL], [None]) # pmcd.pmie.logfile
        except pmapi.pmErr:
            inst = ([PM_IN_NULL], [None]) # mem.util.free
        # Reject unsupported types
        if desc.contents.type not in supported:
            raise pmapi.pmErr(PM_ERR_TYPE)
        self.pmids.append(pmid)
        self.descs.append(desc)
        self.insts.append(inst)
    except pmapi.pmErr as error:
        sys.stderr.write("Invalid metric %s (%s).\n" % (metric, str(error)))
        sys.exit(1)
def validate_config(self):
    """ Validate configuration parameters.

    Checks the configuration file version, records the source of the
    current context, verifies the settings required by the archive and
    Zabbix outputs and derives samples/interval from the run time when
    a finish time was given. Any problem is reported on stderr and the
    program exits with status 1.
    """
    if self.version != VERSION:
        sys.stderr.write("Incompatible configuration file version (read v%s, need v%d).\n" % (self.version, VERSION))
        sys.exit(1)

    if self.context.type == PM_CONTEXT_ARCHIVE:
        self.source = self.opts.pmGetOptionArchives()[0] # RHBZ#1262723
    if self.context.type == PM_CONTEXT_HOST:
        self.source = self.context.pmGetContextHostName()
    if self.context.type == PM_CONTEXT_LOCAL:
        self.source = "@" # PCPIntro(1), RHBZ#1289911

    if self.output == OUTPUT_ARCHIVE and not self.outfile:
        sys.stderr.write("Archive must be defined with archive output.\n")
        sys.exit(1)

    if self.output == OUTPUT_ZABBIX and (not self.zabbix_server or \
       not self.zabbix_port or not self.zabbix_host):
        sys.stderr.write("zabbix_server, zabbix_port, and zabbix_host must be defined with Zabbix.\n")
        sys.exit(1)

    # Runtime overrides samples/interval
    if self.opts.pmGetOptionFinishOptarg():
        self.runtime = int(float(self.opts.pmGetOptionFinish()) - float(self.opts.pmGetOptionStart()))
        if self.opts.pmGetOptionSamples():
            self.samples = self.opts.pmGetOptionSamples()
            if self.samples < 2:
                self.samples = 2
            self.interval = float(self.runtime) / (self.samples - 1)
            self.opts.pmSetOptionInterval(str(self.interval))
            self.interval = self.opts.pmGetOptionInterval()
        else:
            self.interval = self.opts.pmGetOptionInterval()
            if int(self.interval) == 0:
                sys.stderr.write("Interval can't be less than 1 second.\n")
                sys.exit(1)
            # Floor division keeps the sample count an integer on
            # Python 3 as well; the reporting loop counts it down and
            # stops when it is exactly 0
            self.samples = self.runtime // int(self.interval) + 1
        if int(self.interval) > self.runtime:
            sys.stderr.write("Interval can't be longer than runtime.\n")
            sys.exit(1)
    else:
        self.samples = self.opts.pmGetOptionSamples()
        self.interval = self.opts.pmGetOptionInterval()

    if self.output == OUTPUT_ZABBIX:
        if self.zabbix_interval:
            self.zabbix_interval = int(pmapi.timeval.fromInterval(self.zabbix_interval))
            # Never send more often than we sample
            if self.zabbix_interval < int(self.interval):
                self.zabbix_interval = int(self.interval)
        else:
            self.zabbix_interval = int(self.interval)

    # Unit parsing is only available in newer PCP versions
    self.can_scale = "pmParseUnitsStr" in dir(self.context)
def validate_metrics(self):
    """ Validate the metrics set.

    Registers derived metrics (from a file when self.derived starts
    with '/' or '.', otherwise from comma separated name=expression
    definitions), resolves every requested metric against the PMNS,
    expanding non-leaf names, and completes the per-metric fields:
    0:label, 1:instance(s), 2:(unit label, unit, multiplier), 3:raw
    flag, 4:width. Any failure is reported on stderr and the program
    exits with status 1.
    """
    # Check the metrics against PMNS, resolve non-leaf metrics
    if self.derived:
        if self.derived.startswith("/") or self.derived.startswith("."):
            try:
                self.context.pmLoadDerivedConfig(self.derived)
            except pmapi.pmErr as error:
                sys.stderr.write("Failed to register derived metric: %s.\n" % str(error))
                sys.exit(1)
        else:
            for definition in self.derived.split(","):
                err = ""
                try:
                    # Split on the first '=' only, the expression itself
                    # may contain comparison operators such as '=='
                    name, expr = definition.split("=", 1)
                    self.context.pmLookupName(name.strip())
                except pmapi.pmErr as error:
                    if error.args[0] == PM_ERR_NAME:
                        # Name is free, register the derived metric
                        self.context.pmRegisterDerived(name.strip(), expr.strip())
                        continue
                    err = error.message()
                except ValueError:
                    err = "Invalid syntax (expected metric=expression)"
                except Exception:
                    #err = self.context.pmDerivedErrStr() # RHBZ#1286733
                    err = "Unknown reason"
                finally:
                    if err:
                        sys.stderr.write("Failed to register derived metric: %s.\n" % err)
                        sys.exit(1)

    # Prepare for non-leaf metrics
    metrics = self.metrics
    self.metrics = OrderedDict()
    for metric in metrics:
        try:
            l = len(self.pmids)
            # check_metric() appends one pmid per leaf found below metric
            self.context.pmTraversePMNS(metric, self.check_metric)
            if len(self.pmids) == l + 1:
                # Leaf
                if metric == self.context.pmNameID(self.pmids[l]):
                    self.metrics[metric] = metrics[metric]
                else:
                    # But handle single non-leaf case in an archive
                    self.metrics[self.context.pmNameID(self.pmids[l])] = []
            else:
                # Non-leaf
                for i in range(l, len(self.pmids)):
                    name = self.context.pmNameID(self.pmids[i])
                    # We ignore specs like disk.dm,,,MB on purpose, for now
                    self.metrics[name] = []
        except pmapi.pmErr as error:
            sys.stderr.write("Invalid metric %s (%s).\n" % (metric, str(error)))
            sys.exit(1)

    # Finalize the metrics set
    for i, metric in enumerate(self.metrics):
        # Fill in all fields for easier checking later
        for index in range(0, 5):
            if len(self.metrics[metric]) <= index:
                self.metrics[metric].append(None)

        # Label
        if not self.metrics[metric][0]:
            # mem.util.free -> m.u.free
            name = ""
            for m in metric.split("."):
                name += m[0] + "."
            self.metrics[metric][0] = name[:-2] + m

        # Rawness
        if self.metrics[metric][3] == 'raw' or self.type == 1:
            self.metrics[metric][3] = 1
        else:
            self.metrics[metric][3] = 0

        # Unit/scale
        unitstr = str(self.descs[i].contents.units)
        # Set default unit if not specified on per-metric basis
        if not self.metrics[metric][2]:
            done = 0
            unit = self.descs[i].contents.units
            if self.count_scale and \
               unit.dimCount == 1 and ( \
               unit.dimSpace == 0 and
               unit.dimTime == 0):
                self.metrics[metric][2] = self.count_scale
                done = 1
            if self.space_scale and \
               unit.dimSpace == 1 and ( \
               unit.dimCount == 0 and
               unit.dimTime == 0):
                self.metrics[metric][2] = self.space_scale
                done = 1
            if self.time_scale and \
               unit.dimTime == 1 and ( \
               unit.dimCount == 0 and
               unit.dimSpace == 0):
                self.metrics[metric][2] = self.time_scale
                done = 1
            if not done:
                self.metrics[metric][2] = unitstr
        # Set unit/scale for non-raw numeric metrics
        try:
            if self.metrics[metric][3] == 0 and self.can_scale and \
               self.descs[i].contents.type != PM_TYPE_STRING:
                (unitstr, mult) = self.context.pmParseUnitsStr(self.metrics[metric][2])
                label = self.metrics[metric][2]
                if self.descs[i].sem == PM_SEM_COUNTER:
                    label += "/s"
                    if self.descs[i].contents.units.dimTime == 1:
                        label = "util"
                self.metrics[metric][2] = (label, unitstr, mult)
            else:
                self.metrics[metric][2] = (unitstr, unitstr, 1)
        except pmapi.pmErr as error:
            sys.stderr.write("%s: %s.\n" % (str(error), self.metrics[metric][2]))
            sys.exit(1)

        # Set default width if not specified on per-metric basis
        if self.metrics[metric][4]:
            self.metrics[metric][4] = int(self.metrics[metric][4])
        elif self.width != 0:
            self.metrics[metric][4] = self.width
        else:
            self.metrics[metric][4] = len(self.metrics[metric][0])
        if self.metrics[metric][4] < len(TRUNC):
            self.metrics[metric][4] = len(TRUNC) # Forced minimum
752 # RHBZ#1264147
def pmids_to_ctypes(self, pmids):
    """ Build a ctypes array of c_uint holding the given pmids
        (numbers), in the same order.
    """
    from ctypes import c_uint
    array_type = c_uint * len(pmids)
    return array_type(*[c_uint(pmid) for pmid in pmids])
def get_mode_step(self):
    """ Return the (mode, step) pair to pass to pmSetMode.

    Forward mode with step 0 is used when interpolation is disabled or
    the output is an archive. Otherwise interpolation mode is used with
    the interval as the step, in milliseconds, or in seconds when the
    interval exceeds 24 days.
    """
    if not self.interpol or self.output == OUTPUT_ARCHIVE:
        return (PM_MODE_FORW, 0)

    secs_in_24_days = 2073600
    seconds = self.interval.tv_sec
    if seconds > secs_in_24_days:
        return (PM_MODE_INTERP | PM_XTB_SET(PM_TIME_SEC), int(seconds))

    millis = seconds*1000 + self.interval.tv_usec/1000
    return (PM_MODE_INTERP | PM_XTB_SET(PM_TIME_MSEC), int(millis))
def execute(self):
    """ Fetch the prepared metrics from the current PMAPI context
        (host or archive) sample by sample and pass each sample to
        report(), after setting up defaults, writer and headers.
    """
    # Output primitives
    if self.delimiter is None:
        self.delimiter = CSVSEP if self.output == OUTPUT_CSV else OUTSEP

    # Time
    tzone = self.opts.pmGetOptionTimezone()
    if tzone:
        os.environ['TZ'] = tzone
        time.tzset()
        self.context.pmNewZone(tzone)

    if self.timefmt is None:
        self.timefmt = CSVTIME if self.output == OUTPUT_CSV else OUTTIME
    if not self.timefmt:
        # An empty format disables timestamps altogether
        self.timestamp = 0

    if self.context.type != PM_CONTEXT_ARCHIVE:
        self.delay = 1
        self.interpol = 1

    # Print preparation
    self.prepare_writer()
    if self.output == OUTPUT_STDOUT:
        self.prepare_stdout()

    # DBG_TRACE_APPL1 == 4096
    if "pmDebug" in dir(self.context) and self.context.pmDebug(4096):
        self.writer.write("Known config file keywords: " + str(self.keys) + "\n")
        self.writer.write("Known metric spec keywords: " + str(self.metricspec) + "\n")

    # Headers; the flags are cleared before writing
    if self.extheader == 1:
        self.extheader = 0
        self.write_ext_header()

    if self.header == 1:
        self.header = 0
        self.write_header()
    else:
        self.repeat_header = 0

    # Just checking
    if self.check == 1:
        return

    # Archive fetching mode
    if self.context.type == PM_CONTEXT_ARCHIVE:
        (mode, step) = self.get_mode_step()
        self.context.pmSetMode(mode, self.opts.pmGetOptionOrigin(), step)

    lines = 0
    while self.samples != 0:
        if self.output == OUTPUT_STDOUT:
            if lines > 1 and self.repeat_header == lines:
                self.write_header()
                lines = 0
            lines += 1

        try:
            result = self.context.pmFetch(self.pmids_to_ctypes(self.pmids))
        except pmapi.pmErr as error:
            if error.args[0] != PM_ERR_EOL:
                raise error
            # End of archive, leave the loop through its condition
            self.samples = 0
            continue
        self.context.pmSortInstances(result) # XXX Is this really needed?
        values = self.extract(result)

        # Keep the previous and the current sample timestamps
        if self.ctstamp == 0:
            self.ctstamp = copy.copy(result.contents.timestamp)
        self.ptstamp = self.ctstamp
        self.ctstamp = copy.copy(result.contents.timestamp)

        if self.context.type == PM_CONTEXT_ARCHIVE:
            current = float(self.ctstamp)
            if current < float(self.opts.pmGetOptionStart()):
                self.context.pmFreeResult(result)
                continue
            if current > float(self.opts.pmGetOptionFinish()):
                return

        self.report(self.ctstamp, values)
        self.context.pmFreeResult(result)
        if self.samples and self.samples > 0:
            self.samples -= 1
        if self.delay and self.interpol and self.samples != 0:
            self.context.pmtimevalSleep(self.interval)

    # Allow modules to flush buffered values / say goodbye
    self.report(None, None)
def extract(self, result):
    """ Extract the metric values from a pmResult structure.

    Returns one entry per metric, each holding (instance id, instance
    name, value) tuples; instances known from validation but missing
    from the result carry NO_VAL. The result is also stored as
    self.currvals, the previous one moving to self.prevvals.
    """
    # Metrics incl. all instance values, must match self.format on return
    per_metric = []

    for i, metric in enumerate(self.metrics):
        known_ids = self.insts[i][0]

        # We use dict to make it easier to deal with gone/unknown instances
        # Values are (instance id, instance name, instance value)
        row = {}
        for inst in known_ids:
            row[inst] = (-1, None, NO_VAL)
        per_metric.append(row)

        # Process all fetched instances (none if no values available)
        for j in range(result.contents.get_numval(i)):
            inst = result.contents.get_inst(i, j)

            # Locate the correct instance and its position
            if inst >= 0:
                if inst not in known_ids:
                    # Ignore newly emerged instances
                    continue
                k = 0
                while known_ids[k] != inst:
                    k += 1

            # Extract and scale the value
            try:
                mtype = self.descs[i].contents.type
                unit, mult = self.metrics[metric][2][1], self.metrics[metric][2][2]
                numeric = mtype != PM_TYPE_STRING

                # Use native type if no rescaling needed
                rescale = not (mult == 1 and
                               str(self.descs[i].contents.units) == str(unit))
                vtype = PM_TYPE_DOUBLE if rescale else mtype

                atom = self.context.pmExtractValue(
                    result.contents.get_valfmt(i),
                    result.contents.get_vlist(i, j),
                    mtype,
                    vtype)

                if self.metrics[metric][3] != 1 and rescale and \
                   numeric and self.can_scale:
                    atom = self.context.pmConvScale(
                        vtype,
                        atom, self.descs, i,
                        unit)

                val = atom.dref(vtype)
                if rescale and self.can_scale and numeric:
                    val *= mult
                    # Show whole numbers without a fractional part
                    val = int(val) if val == int(val) else val

                if inst >= 0:
                    row[inst] = (inst, self.insts[i][1][k], val)
                else:
                    row[PM_IN_NULL] = (-1, None, val)

            except pmapi.pmErr as error:
                sys.stderr.write("%s: %s, aborting.\n" % (metric, str(error)))
                sys.exit(1)

    # Drop the dict keys, only the value tuples are reported
    values = [row.values() for row in per_metric]

    # Store current and previous values
    # Output modules need to handle non-existing self.prevvals
    self.prevvals = self.currvals
    self.currvals = values

    return values # XXX Redundant now
def report(self, tstamp, values):
    """ Format the sample timestamp and pass the sample to the output writer

    tstamp -- object with tv_sec/tv_usec attributes, or None
    values -- per-metric values, handed to the writer unchanged
    """
    if tstamp != None:
        # Build a datetime from the broken-down local time and render it
        # with the configured time format
        tm = self.context.pmLocaltime(tstamp.tv_sec)
        usec = int(tstamp.tv_usec)
        moment = datetime(tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday,
                          tm.tm_hour, tm.tm_min, tm.tm_sec, usec, None)
        tstamp = moment.strftime(self.timefmt)

    # Check every output target in turn and call the matching writer
    targets = (
        (OUTPUT_ARCHIVE, self.write_archive),
        (OUTPUT_CSV, self.write_csv),
        (OUTPUT_STDOUT, self.write_stdout),
        (OUTPUT_ZABBIX, self.write_zabbix),
    )
    for target, emit in targets:
        if self.output == target:
            emit(tstamp, values)
def prepare_writer(self):
    """ Create the text writer on first use and return it

    Archive and Zabbix outputs, and runs without an output file, get
    sys.stdout; otherwise self.outfile is opened for text writing.
    """
    if self.writer:
        # Already prepared, reuse it
        return self.writer

    to_stdout = self.output in (OUTPUT_ARCHIVE, OUTPUT_ZABBIX) or \
                self.outfile == None
    self.writer = sys.stdout if to_stdout else open(self.outfile, 'wt')
    return self.writer
def prepare_stdout(self):
    """ Build self.format, the positional format string for stdout lines

    Fields alternate between a column spec and a bare slot that receives
    the delimiter; every field carries an explicit index.
    """
    if self.timestamp == 0:
        fields = ["{0:}"]
    else:
        # Size the timestamp column by formatting the current time
        sample = datetime.fromtimestamp(time.time()).strftime(self.timefmt)
        fields = ["{0:" + str(len(sample)) + "}"]
    fields.append("{1}")

    for i, metric in enumerate(self.metrics):
        # One column per instance, or a single column without instances
        count = 1 if self.insts[i][0][0] == PM_IN_NULL else len(self.insts[i][0])
        width = str(self.metrics[metric][4])
        for _ in range(count):
            # The next free index always equals the number of fields so far
            fields.append("{" + str(len(fields)) + ":>" + width + "." + width + "}")
            fields.append("{" + str(len(fields)) + "}")

    # No delimiter slot after the last column
    self.format = "".join(fields[:-1])
def write_ext_header(self):
    """ Write extended header

    Writes source, host, timezone, start/end time, metric count, sample
    count, interval and duration to self.writer. Every line is prefixed
    with "#" for CSV output.
    """
    comm = "#" if self.output == OUTPUT_CSV else ""

    # Work out the duration and the number of samples to report
    if self.runtime != -1:
        duration = self.runtime
        samples = self.samples
    else:
        if self.samples:
            duration = (self.samples - 1) * int(self.interval)
            samples = self.samples
            if self.context.type == PM_CONTEXT_ARCHIVE:
                if not self.interpol:
                    samples = str(samples) + " (requested)"
        else:
            duration = int(float(self.opts.pmGetOptionFinish()) - float(self.opts.pmGetOptionStart()))
            # Floor division keeps the sample count (and the duration
            # derived from it) integral under both Python 2 and Python 3
            samples = (duration // int(self.interval)) + 1
            duration = (samples - 1) * int(self.interval)
            if self.context.type == PM_CONTEXT_ARCHIVE:
                if not self.interpol:
                    samples = "N/A"
    endtime = float(self.opts.pmGetOptionStart()) + duration

    if self.context.type == PM_CONTEXT_ARCHIVE:
        host = self.context.pmGetArchiveLabel().hostname
        if not self.interpol and not self.opts.pmGetOptionFinish():
            endtime = self.context.pmGetArchiveEnd()
    if self.context.type == PM_CONTEXT_HOST:
        host = self.source
    if self.context.type == PM_CONTEXT_LOCAL:
        host = "localhost, using DSO PMDAs"

    # Figure out the current timezone using the PCP convention
    if self.opts.pmGetOptionTimezone():
        currtz = self.opts.pmGetOptionTimezone()
    else:
        dst = time.localtime().tm_isdst
        offset = time.altzone if dst else time.timezone
        currtz = time.tzname[dst]
        if offset:
            # Floor division: whole hours on Python 2 and 3 alike
            currtz += str(offset // 3600)
    timezone = currtz

    # For archives show the timezone recorded in the archive label when
    # it differs from the current one
    if self.context.type == PM_CONTEXT_ARCHIVE:
        if self.context.pmGetArchiveLabel().tz != timezone:
            timezone = self.context.pmGetArchiveLabel().tz
            timezone += " (creation, current is " + currtz + ")"

    self.writer.write(comm + "\n")
    if self.context.type == PM_CONTEXT_ARCHIVE:
        self.writer.write(comm + " archive: " + self.source + "\n")
    self.writer.write(comm + " host: " + host + "\n")
    self.writer.write(comm + " timezone: " + timezone + "\n")
    self.writer.write(comm + " start: " + time.asctime(time.localtime(self.opts.pmGetOptionStart())) + "\n")
    self.writer.write(comm + " end: " + time.asctime(time.localtime(endtime)) + "\n")
    self.writer.write(comm + " metrics: " + str(len(self.pmids)) + "\n")
    self.writer.write(comm + " samples: " + str(samples) + "\n")
    # Interval and duration are reported as N/A for non-interpolated archives
    if not (self.context.type == PM_CONTEXT_ARCHIVE and not self.interpol):
        self.writer.write(comm + " interval: " + str(float(self.interval)) + " sec\n")
        self.writer.write(comm + " duration: " + str(duration) + " sec\n")
    else:
        self.writer.write(comm + " interval: N/A\n")
        self.writer.write(comm + " duration: N/A\n")
    self.writer.write(comm + "\n")
def write_header(self):
    """ Write metrics header

    Archive and Zabbix outputs get a short status message, CSV output a
    line of column names, and stdout output the name line plus optional
    instance and unit lines.
    """
    if self.output == OUTPUT_ARCHIVE:
        self.writer.write("Recording %d metrics to %s" % (len(self.pmids), self.outfile))
        if self.runtime != -1:
            self.writer.write(":\n%s samples(s) with %.1f sec interval ~ %d sec duration.\n" % (self.samples, float(self.interval), self.runtime))
        elif self.samples:
            duration = (self.samples - 1) * int(self.interval)
            self.writer.write(":\n%s samples(s) with %.1f sec interval ~ %d sec duration.\n" % (self.samples, float(self.interval), duration))
        else:
            self.writer.write("...")
            if self.context.type != PM_CONTEXT_ARCHIVE:
                self.writer.write(" (Ctrl-C to stop)")
            self.writer.write("\n")
        return

    if self.output == OUTPUT_CSV:
        self.writer.write("Time")
        for metric in self.metrics:
            self.writer.write(self.delimiter + metric)
        self.writer.write("\n")

    if self.output == OUTPUT_STDOUT:
        names = ["", self.delimiter] # no timestamp on header line
        insts = ["", self.delimiter] # no timestamp on instances line
        units = ["", self.delimiter] # no timestamp on units line
        # Set once any metric has instances; the instances line is then
        # printed no matter which metric happens to come last
        show_insts = False
        for i, metric in enumerate(self.metrics):
            has_insts = self.insts[i][0][0] != PM_IN_NULL
            show_insts = show_insts or has_insts
            ins = len(self.insts[i][0]) if has_insts else 1
            for j in range(ins):
                names.append(self.metrics[metric][0])
                names.append(self.delimiter)
                units.append(self.metrics[metric][2][0])
                units.append(self.delimiter)
                if has_insts:
                    insts.append(self.insts[i][1][j])
                else:
                    # Placeholder for a metric without instances
                    insts.append(self.delimiter)
                insts.append(self.delimiter)
        # Drop the trailing delimiter of each line
        del names[-1]
        del units[-1]
        del insts[-1]
        self.writer.write(self.format.format(*tuple(names)) + "\n")
        if show_insts:
            self.writer.write(self.format.format(*tuple(insts)) + "\n")
        if self.unitinfo:
            self.writer.write(self.format.format(*tuple(units)) + "\n")

    if self.output == OUTPUT_ZABBIX:
        if self.context.type == PM_CONTEXT_ARCHIVE:
            # Archive replay overrides these settings before sending
            self.delay = 0
            self.interpol = 0
            self.zabbix_interval = 250 # See zabbix_sender(8)
            self.writer.write("Sending %d archived metrics to Zabbix server %s...\n(Ctrl-C to stop)\n" % (len(self.pmids), self.zabbix_server))
            return

        self.writer.write("Sending %d metrics to Zabbix server %s every %d sec" % (len(self.pmids), self.zabbix_server, self.zabbix_interval))
        if self.runtime != -1:
            self.writer.write(":\n%s samples(s) with %.1f sec interval ~ %d sec runtime.\n" % (self.samples, float(self.interval), self.runtime))
        elif self.samples:
            duration = (self.samples - 1) * int(self.interval)
            self.writer.write(":\n%s samples(s) with %.1f sec interval ~ %d sec runtime.\n" % (self.samples, float(self.interval), duration))
        else:
            self.writer.write("...\n(Ctrl-C to stop)\n")
def write_archive(self, timestamp, values):
    """ Write an archive record

    Called with both arguments None to complete and close the archive.
    Otherwise the archive is created on the first call and the current
    values are added and flushed.
    """
    if timestamp == None and values == None:
        # Complete and close; nothing to do if no archive was created
        if self.pmi is not None:
            self.pmi.pmiEnd()
            self.pmi = None
        return

    if self.pmi == None:
        # Create a new archive
        self.pmi = pmi.pmiLogImport(self.outfile)
        if self.context.type == PM_CONTEXT_ARCHIVE:
            # Carry over hostname and timezone from the source archive
            self.pmi.pmiSetHostname(self.context.pmGetArchiveLabel().hostname)
            self.pmi.pmiSetTimezone(self.context.pmGetArchiveLabel().tz)
        for i, metric in enumerate(self.metrics):
            self.pmi.pmiAddMetric(metric,
                                  self.pmids[i],
                                  self.descs[i].contents.type,
                                  self.descs[i].contents.indom,
                                  self.descs[i].contents.sem,
                                  self.descs[i].contents.units)
            ins = 0 if self.insts[i][0][0] == PM_IN_NULL else len(self.insts[i][0])
            try:
                for j in range(ins):
                    self.pmi.pmiAddInstance(self.descs[i].contents.indom, self.insts[i][1][j], self.insts[i][0][j])
            except pmi.pmiErr as error:
                # NOTE(review): any pmiErr ends instance registration for
                # this metric and is not propagated, not only duplicates
                if error.args[0] == PMI_ERR_DUPINSTNAME:
                    continue

    # Add current values
    data = 0
    for i, metric in enumerate(self.metrics):
        ins = 1 if self.insts[i][0][0] == PM_IN_NULL else len(self.insts[i][0])
        # Materialise once: values[i] may be a dict view, which cannot be
        # indexed directly
        vals = list(values[i])
        for j in range(ins):
            val = vals[j][2]
            if str(val) != NO_VAL:
                data = 1
                inst = self.insts[i][1][j]
                if inst == None: # RHBZ#1285371
                    inst = ""
                if self.descs[i].contents.type == PM_TYPE_STRING:
                    self.pmi.pmiPutValue(metric, inst, str(val))
                elif self.descs[i].contents.type == PM_TYPE_FLOAT or \
                     self.descs[i].contents.type == PM_TYPE_DOUBLE:
                    self.pmi.pmiPutValue(metric, inst, "%f" % val)
                else:
                    self.pmi.pmiPutValue(metric, inst, "%d" % val)

    # Flush, but only if at least one value was added
    if data:
        # pylint: disable=maybe-no-member
        self.pmi.pmiWrite(self.ctstamp.tv_sec, self.ctstamp.tv_usec)
def write_csv(self, timestamp, values):
    """ Write one sample as a single delimiter-separated line

    Called with both arguments None at the end of the run, in which
    case nothing is written.
    """
    if timestamp == None and values == None:
        # Silent goodbye
        return

    # The timestamp is the first field, followed by one field per value
    fields = [timestamp]
    for i, _ in enumerate(self.metrics):
        count = 1 if self.insts[i][0][0] == PM_IN_NULL else len(self.insts[i][0])
        row = list(values[i])
        for j in range(count):
            value = row[j][2]
            if type(value) is float:
                # Floats are written with the configured precision
                fields.append(format(value, "." + str(self.precision) + "f"))
            else:
                fields.append(str(value))
    self.writer.write(self.delimiter.join(fields) + "\n")
def write_stdout(self, timestamp, values):
    """ Write one aligned line of values to the writer

    Called with both arguments None at the end of the run, in which
    case nothing is written.
    """
    if timestamp == None and values == None:
        # Silent goodbye
        return

    # Split self.format at the bare "{N}" slots: chunk 0 is the timestamp
    # column and chunk N (N >= 1) is the Nth value column
    chunks = re.split("{\\d+}", self.format)
    fields = ["" if self.timestamp == 0 else timestamp, self.delimiter]

    col = 0
    for i, metric in enumerate(self.metrics):
        width = self.metrics[metric][4]
        current = list(values[i])

        for j in range(len(current)):
            col += 1

            # Raw or rate
            raw = self.metrics[metric][3]
            if not raw and \
               (self.prevvals == None or list(self.prevvals[i])[j][2] == NO_VAL):
                # Rate not yet possible
                value = NO_VAL
            elif raw or \
                 self.descs[i].sem != PM_SEM_COUNTER or \
                 current[j][2] == NO_VAL:
                # Raw
                value = current[j][2]
            else:
                # Rate: change since the previous sample divided by the
                # elapsed time, scaled when the units have a time dimension
                units = self.descs[i].contents.units
                scale = 1
                if units.dimTime != 0:
                    base = 60 if units.scaleTime > PM_TIME_SEC else 1000
                    scale = pow(base, (PM_TIME_SEC - units.scaleTime))
                delta = scale * (float(self.ctstamp) - float(self.ptstamp))
                if delta:
                    value = (current[j][2] - list(self.prevvals[i])[j][2]) / delta
                else:
                    value = 0

            # Make sure the value fits
            if type(value) is int or type(value) is long:
                if len(str(value)) > width:
                    value = TRUNC
                else:
                    # "X" stands in for the index, filled in further down
                    chunks[col] = "{X:" + str(width) + "d}"

            if type(value) is float:
                digits = self.precision
                if len(str(int(value))) > width:
                    # Integer part alone is too wide
                    digits = -1
                    value = TRUNC
                # Reduce the number of decimals until the value fits
                for _ in reversed(range(digits+1)):
                    probe = "{0:" + str(width) + "." + str(digits) + "f}"
                    if len(probe.format(value)) > width:
                        digits -= 1
                    else:
                        chunks[col] = "{X:" + str(width) + "." + str(digits) + "f}"
                        break

            fields.append(value)
            fields.append(self.delimiter)

    # Drop the trailing delimiter
    fields.pop()

    # Renumber: column specs and delimiter slots alternate, and the slot
    # after the last column is left out
    pieces = []
    index = 0
    for chunk in chunks:
        pieces.append(chunk.replace("{X:", "{" + str(index) + ":"))
        index += 1
        pieces.append("{" + str(index) + "}")
        index += 1
    line_fmt = "".join(pieces[:-1])
    self.writer.write(line_fmt.format(*tuple(fields)) + "\n")
def write_zabbix(self, timestamp, values):
    """ Buffer the sample as Zabbix metrics and send the buffer when due

    Called with both arguments None at the end of the run to send
    whatever is still buffered.
    """
    if timestamp == None and values == None:
        # Send any remaining buffered values
        if self.zabbix_metrics:
            send_to_zabbix(self.zabbix_metrics, self.zabbix_server, self.zabbix_port)
            self.zabbix_metrics = []
        return

    # Collect the results
    now = float(self.ctstamp)
    if self.zabbix_prevsend == None:
        self.zabbix_prevsend = now
    for i, metric in enumerate(self.metrics):
        count = 1 if self.insts[i][0][0] == PM_IN_NULL else len(self.insts[i][0])
        row = list(values[i])
        for j in range(count):
            name = self.insts[i][1][j]
            key = ZBXPRFX + metric
            if name:
                # Instance name goes in brackets after the metric name
                key += "[" + str(name) + "]"
            self.zabbix_metrics.append(ZabbixMetric(self.zabbix_host, key, str(row[j][2]), now))

    # Send when needed: by buffered count for archives, by elapsed time
    # otherwise
    from_archive = self.context.type == PM_CONTEXT_ARCHIVE
    if from_archive:
        due = len(self.zabbix_metrics) >= self.zabbix_interval
    else:
        due = now - self.zabbix_prevsend > self.zabbix_interval
    if not due:
        return

    send_to_zabbix(self.zabbix_metrics, self.zabbix_server, self.zabbix_port)
    self.zabbix_metrics = []
    if not from_archive:
        self.zabbix_prevsend = now
def connect(self):
    """ Create the PMAPI context (archive, host or local) from the options and command line """
    argv = sys.argv
    self.context = pmapi.pmContext.fromOptions(self.opts, argv)
def finalize(self):
    """ Finalize and clean up

    Flushes the writer, closes it unless it is sys.stdout, and ends any
    open archive import. Safe to call more than once.
    """
    if self.writer:
        try:
            self.writer.flush()
        finally:
            # A writer other than sys.stdout is a file opened by this
            # program, so release the handle here
            if self.writer is not sys.stdout:
                self.writer.close()
            self.writer = None
    if self.pmi:
        self.pmi.pmiEnd()
        self.pmi = None
if __name__ == '__main__':
    # Bound before the try block so the Ctrl-C handler can tell whether
    # the reporter was ever created
    P = None
    try:
        P = PMReporter()
        P.read_config()
        P.prepare_metrics()
        P.connect()
        P.validate_config()
        P.validate_metrics()
        P.execute()
        P.finalize()

    except pmapi.pmErr as error:
        sys.stderr.write('%s: %s\n' % (error.progname(), error.message()))
    except pmapi.pmUsageErr as usage:
        usage.message()
    except IOError as error:
        sys.stderr.write("%s\n" % str(error))
    except KeyboardInterrupt:
        sys.stdout.write("\n")
        # An interrupt may arrive before the reporter exists
        if P is not None:
            P.finalize()