pmrep: decimals -> precision
[pcp.git] / src / pmrep / pmrep.py
blobc86ca0b7a3092a32ed1bc9006de21e4bb7e5543c
1 #!/usr/bin/pcp python
3 # Copyright (C) 2015 Marko Myllynen <myllynen@redhat.com>
5 # This program is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the
7 # Free Software Foundation; either version 2 of the License, or (at your
8 # option) any later version.
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 # for more details.
15 # [zbxsend] Copyright (C) 2014 Sergey Kirillov <sergey.kirillov@gmail.com>
16 # All rights reserved.
18 # Redistribution and use in source and binary forms, with or without
19 # modification, are permitted provided that the following conditions
20 # are met:
22 # 1. Redistributions of source code must retain the above copyright
23 # notice, this list of conditions and the following disclaimer.
25 # 2. Redistributions in binary form must reproduce the above copyright
26 # notice, this list of conditions and the following disclaimer in the
27 # documentation and/or other materials provided with the distribution.
29 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
30 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
31 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
32 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
33 # HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
34 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
35 # TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
36 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
39 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 # pylint: disable=fixme, line-too-long, bad-whitespace, invalid-name
42 # pylint: disable=superfluous-parens
43 """ Performance Metrics Reporter """
45 from collections import OrderedDict
46 from datetime import datetime
47 try:
48 import ConfigParser
49 except ImportError:
50 import configparser as ConfigParser
51 try:
52 import json
53 except:
54 import simplejson as json
55 import socket
56 import struct
57 import time
58 import copy
59 import sys
60 import os
61 import re
63 from pcp import pmapi, pmgui, pmi
64 from cpmapi import PM_CONTEXT_ARCHIVE, PM_CONTEXT_HOST, PM_CONTEXT_LOCAL, PM_MODE_FORW, PM_MODE_INTERP, PM_ERR_TYPE, PM_ERR_EOL, PM_IN_NULL, PM_SEM_COUNTER, PM_TIME_MSEC, PM_TIME_SEC, PM_XTB_SET
65 from cpmapi import PM_TYPE_32, PM_TYPE_U32, PM_TYPE_64, PM_TYPE_U64, PM_TYPE_FLOAT, PM_TYPE_DOUBLE, PM_TYPE_STRING
66 from cpmgui import PM_REC_ON, PM_REC_OFF, PM_REC_SETARG
68 if sys.version_info[0] >= 3:
69 long = int
71 # Default config
72 DEFAULT_CONFIG = "./pmrep.conf"
74 # Default field separators, config/time formats, missing/truncated values
75 CSVSEP = ","
76 CSVTIME = "%Y-%m-%d %H:%M:%S"
77 OUTSEP = " "
78 OUTTIME = "%H:%M:%S"
79 ZBXPORT = 10051
80 ZBXPRFX = "pcp."
81 NO_VAL = "N/A"
82 TRUNC = "xxx"
83 VERSION = 1
85 # Output targets
86 OUTPUT_ARCHIVE = "archive"
87 OUTPUT_CSV = "csv"
88 OUTPUT_STDOUT = "stdout"
89 OUTPUT_ZABBIX = "zabbix"
91 class ZabbixMetric(object):
92 """ A Zabbix metric """
93 def __init__(self, host, key, value, clock):
94 self.host = host
95 self.key = key
96 self.value = value
97 self.clock = clock
99 def __repr__(self):
100 return 'Metric(%r, %r, %r, %r)' % (self.host, self.key, self.value, self.clock)
102 def recv_from_zabbix(sock, count):
103 """ Receive a response from a Zabbix server. """
104 buf = b''
105 while len(buf) < count:
106 chunk = sock.recv(count - len(buf))
107 if not chunk:
108 return buf
109 buf += chunk
110 return buf
112 def send_to_zabbix(metrics, zabbix_host, zabbix_port, timeout=15):
113 """ Send a set of metrics to a Zabbix server. """
115 j = json.dumps
116 # Zabbix has a very fragile JSON parser, so we cannot use json to
117 # dump the whole packet
118 metrics_data = []
119 for m in metrics:
120 clock = m.clock or time.time()
121 metrics_data.append(('\t\t{\n'
122 '\t\t\t"host":%s,\n'
123 '\t\t\t"key":%s,\n'
124 '\t\t\t"value":%s,\n'
125 '\t\t\t"clock":%.5f}') % (j(m.host), j(m.key), j(m.value), clock))
126 json_data = ('{\n'
127 '\t"request":"sender data",\n'
128 '\t"data":[\n%s]\n'
129 '}') % (',\n'.join(metrics_data))
131 data_len = struct.pack('<Q', len(json_data))
132 packet = b'ZBXD\1' + data_len + json_data.encode('utf-8')
133 try:
134 zabbix = socket.socket()
135 zabbix.connect((zabbix_host, zabbix_port))
136 zabbix.settimeout(timeout)
137 # send metrics to zabbix
138 zabbix.sendall(packet)
139 # get response header from zabbix
140 resp_hdr = recv_from_zabbix(zabbix, 13)
141 if not bytes.decode(resp_hdr).startswith('ZBXD\1') or len(resp_hdr) != 13:
142 # debug: write('Invalid Zabbix response len=%d' % len(resp_hdr))
143 return False
144 resp_body_len = struct.unpack('<Q', resp_hdr[5:])[0]
145 # get response body from zabbix
146 resp_body = zabbix.recv(resp_body_len)
147 resp = json.loads(bytes.decode(resp_body))
148 # debug: write('Got response from Zabbix: %s' % resp)
149 if resp.get('response') != 'success':
150 sys.stderr.write('Error response from Zabbix: %s', resp)
151 return False
152 return True
153 except socket.timeout as err:
154 sys.stderr.write("Zabbix connection timed out: " + str(err))
155 return False
156 finally:
157 zabbix.close()
159 class PMReporter(object):
160 """ Report PCP metrics """
162 def __init__(self):
163 """ Construct object, prepare for command line handling """
164 self.context = None
165 self.check = 0
166 self.format = None # output format
167 self.opts = self.options()
169 # Configuration directives
170 self.keys = ('source', 'output', 'derived', 'header', 'unitinfo',
171 'globals', 'timestamp', 'samples', 'interval', 'runtime',
172 'delay', 'raw', 'width', 'precision', 'delimiter',
173 'extheader', 'repeat_header', 'timefmt', 'interpol',
174 'count_scale', 'space_scale', 'time_scale', 'version',
175 'zabbix_server', 'zabbix_port', 'zabbix_host', 'zabbix_interval')
177 # Command line switches without arguments
178 self.argless = ('-C', '-L', '-H', '-U', '-G', '-p', '-d', '--delay', '-r', '--raw', '-x', '--extended-header', '-u')
179 self.arghelp = ('-?', '--help', '-V', '--version')
181 # The order of preference for parameters (as present):
182 # 1 - command line parameters
183 # 2 - parameters from configuration file(s)
184 # 3 - built-in defaults defined below
185 self.config = self.set_config_file()
186 self.version = VERSION
187 self.source = "local:"
188 self.output = OUTPUT_STDOUT
189 self.archive = None # output archive
190 self.log = None # pmi handle
191 self.derived = None
192 self.header = 1
193 self.unitinfo = 1
194 self.globals = 1
195 self.timestamp = 0
196 self.samples = None # forever
197 self.interval = pmapi.timeval(1) # 1 sec
198 self.opts.pmSetOptionInterval(str(1))
199 self.runtime = -1
200 self.delay = 0
201 self.raw = 0
202 self.width = 0
203 self.precision = 3 # .3f
204 self.delimiter = None
205 self.extheader = 0
206 self.repeat_header = 0
207 self.timefmt = None
208 self.interpol = 1
209 self.count_scale = None
210 self.space_scale = None
211 self.time_scale = None
212 self.can_scale = None # PCP 3.9 compat
214 # Performance metrics store
215 # key - metric name
216 # values - 0:label, 1:instance(s), 2:unit/scale, 3:rawness, 4:width
217 self.metrics = OrderedDict()
219 # Corresponding config file metric specifiers
220 self.metricspec = ('label', 'instance', 'unit', 'raw', 'width', 'formula')
222 self.prevvals = None
223 self.currvals = None
224 self.ptstamp = 0
225 self.ctstamp = 0
226 self.pmids = []
227 self.descs = []
228 self.insts = []
230 # Zabbix integration
231 self.zabbix_server = None
232 self.zabbix_port = ZBXPORT
233 self.zabbix_host = None
234 self.zabbix_interval = None
235 self.zabbix_prevsend = None
236 self.zabbix_metrics = []
238 def set_config_file(self):
239 """ Set configuration file """
240 config = DEFAULT_CONFIG
242 # Possibly override the built-in default config file before
243 # parsing the rest of the command line parameters
244 args = iter(sys.argv[1:])
245 for arg in args:
246 if arg in self.arghelp:
247 return None
248 if arg == '-c' or arg == '--config':
249 try:
250 config = next(args)
251 if not os.path.isfile(config) or not os.access(config, os.R_OK):
252 raise IOError("Failed to read configuration file '%s'." % config)
253 except StopIteration:
254 break
255 return config
257 def set_attr(self, name, value):
258 """ Helper to apply config file settings properly """
259 if value in ('true', 'True', 'y', 'yes', 'Yes'):
260 value = 1
261 if value in ('false', 'False', 'n', 'no', 'No'):
262 value = 0
263 if name == 'source':
264 try: # RHBZ#1270176 / PCP < 3.10.8
265 if '/' in value:
266 self.opts.pmSetOptionArchive(value)
267 else:
268 self.opts.pmSetOptionHost(value)
269 except:
270 sys.stderr.write("PCP 3.10.8 or later required for the 'source' directive.\n")
271 sys.exit(1)
272 elif name == 'samples':
273 self.opts.pmSetOptionSamples(value)
274 self.samples = self.opts.pmGetOptionSamples()
275 elif name == 'interval':
276 self.opts.pmSetOptionInterval(value)
277 self.interval = self.opts.pmGetOptionInterval()
278 else:
279 try:
280 setattr(self, name, int(value))
281 except ValueError:
282 setattr(self, name, value)
284 def read_config(self):
285 """ Read options from configuration file """
286 if self.config is None:
287 return
288 config = ConfigParser.SafeConfigParser()
289 config.read(self.config)
290 if not config.has_section('options'):
291 return
292 for opt in config.options('options'):
293 if opt in self.keys:
294 self.set_attr(opt, config.get('options', opt))
295 else:
296 sys.stderr.write("Invalid directive '%s' in %s.\n" % (opt, self.config))
297 sys.exit(1)
299 def options(self):
300 """ Setup default command line argument option handling """
301 opts = pmapi.pmOptions()
302 opts.pmSetOptionCallback(self.option)
303 opts.pmSetOverrideCallback(self.option_override)
304 opts.pmSetShortOptions("a:h:LK:c:Co:F:e:D:V?HUGpA:S:T:O:s:t:R:Z:zdrw:f:l:xE:P:uq:b:y:")
305 opts.pmSetShortUsage("[option...] metricspec [...]")
307 opts.pmSetLongOptionHeader("General options")
308 opts.pmSetLongOptionArchive() # -a/--archive
309 opts.pmSetLongOptionArchiveFolio() # --archive-folio
310 opts.pmSetLongOptionHost() # -h/--host
311 opts.pmSetLongOptionLocalPMDA() # -L/--local-PMDA
312 opts.pmSetLongOptionSpecLocal() # -K/--spec-local
313 opts.pmSetLongOption("config", 1, "c", "FILE", "config file path")
314 opts.pmSetLongOption("check", 0, "C", "", "check config and metrics and exit")
315 opts.pmSetLongOption("output", 1, "o", "OUTPUT", "output target, one of: archive, csv, stdout (default), zabbix")
316 opts.pmSetLongOption("output-archive", 1, "F", "ARCHIVE", "output archive/folio (with -o archive)")
317 opts.pmSetLongOption("derived", 1, "e", "FILE|DFNT", "derived metrics definitions")
318 opts.pmSetLongOptionDebug() # -D/--debug
319 opts.pmSetLongOptionVersion() # -V/--version
320 opts.pmSetLongOptionHelp() # -?/--help
322 opts.pmSetLongOptionHeader("Reporting options")
323 opts.pmSetLongOption("no-header", 0, "H", "", "omit headers")
324 opts.pmSetLongOption("no-unit-info", 0, "U", "", "omit unit info from headers")
325 opts.pmSetLongOption("no-globals", 0, "G", "", "omit global metrics")
326 opts.pmSetLongOption("timestamps", 0, "p", "", "print timestamps")
327 opts.pmSetLongOptionAlign() # -A/--align
328 opts.pmSetLongOptionStart() # -S/--start
329 opts.pmSetLongOptionFinish() # -T/--finish
330 opts.pmSetLongOptionOrigin() # -O/--origin
331 opts.pmSetLongOptionSamples() # -s/--samples
332 opts.pmSetLongOptionInterval() # -t/--interval
333 opts.pmSetLongOption("runtime", 1, "R", "N", "runtime duration (overrides -t or -s)")
334 opts.pmSetLongOptionTimeZone() # -Z/--timezone
335 opts.pmSetLongOptionHostZone() # -z/--hostzone
336 opts.pmSetLongOption("delay", 0, "d", "", "delay, pause between updates for archive replay")
337 opts.pmSetLongOption("raw", 0, "r", "", "output raw counter values (no rate conversion)")
338 opts.pmSetLongOption("width", 1, "w", "N", "default column width")
339 opts.pmSetLongOption("fixed-point", 1, "f", "N", "N digits after the decimal separator (if width enough)")
340 opts.pmSetLongOption("delimiter", 1, "l", "STR", "delimiter to separate csv/stdout columns")
341 opts.pmSetLongOption("extended-header", 0, "x", "", "display extended header")
342 opts.pmSetLongOption("repeat-header", 1, "E", "N", "repeat stdout headers every N lines")
343 opts.pmSetLongOption("timestamp-format", 1, "P", "STR", "strftime string for timestamp format")
344 opts.pmSetLongOption("no-interpolation", 0, "u", "", "disable interpolation mode with archives")
345 opts.pmSetLongOption("count-scale", 1, "q", "SCALE", "default count unit")
346 opts.pmSetLongOption("space-scale", 1, "b", "SCALE", "default space unit")
347 opts.pmSetLongOption("time-scale", 1, "y", "SCALE", "default time unit")
349 return opts
351 def option_override(self, opt):
352 """ Override a few standard PCP options """
353 if opt == 'H' or opt == 'p':
354 return 1
355 return 0
357 def option(self, opt, optarg, index):
358 """ Perform setup for an individual command line option """
359 if opt == 'c':
360 self.config = optarg
361 elif opt == 'C':
362 self.check = 1
363 elif opt == 'o':
364 if optarg == OUTPUT_ARCHIVE:
365 self.output = OUTPUT_ARCHIVE
366 elif optarg == OUTPUT_CSV:
367 self.output = OUTPUT_CSV
368 elif optarg == OUTPUT_STDOUT:
369 self.output = OUTPUT_STDOUT
370 elif optarg == OUTPUT_ZABBIX:
371 self.output = OUTPUT_ZABBIX
372 else:
373 sys.stderr.write("Invalid output target %s specified.\n" % optarg)
374 sys.exit(1)
375 elif opt == 'F':
376 if os.path.exists(optarg) or os.path.exists(optarg + ".index"):
377 sys.stderr.write("Archive/folio %s already exists.\n" % optarg)
378 sys.exit(1)
379 self.archive = optarg
380 elif opt == 'e':
381 self.derived = optarg
382 elif opt == 'H':
383 self.header = 0
384 elif opt == 'U':
385 self.unitinfo = 0
386 elif opt == 'G':
387 self.globals = 0
388 elif opt == 'p':
389 self.timestamp = 1
390 elif opt == 'R':
391 self.runtime = optarg
392 elif opt == 'd':
393 self.delay = 1
394 elif opt == 'r':
395 self.raw = 1
396 elif opt == 'w':
397 self.width = int(optarg)
398 elif opt == 'f':
399 self.precision = int(optarg)
400 elif opt == 'l':
401 self.delimiter = optarg
402 elif opt == 'x':
403 self.extheader = 1
404 elif opt == 'E':
405 self.repeat_header = int(optarg)
406 elif opt == 'P':
407 self.timefmt = optarg
408 elif opt == 'u':
409 self.interpol = 0
410 elif opt == 'q':
411 self.count_scale = optarg
412 elif opt == 'b':
413 self.space_scale = optarg
414 elif opt == 'y':
415 self.time_scale = optarg
416 else:
417 raise pmapi.pmUsageErr()
419 def get_cmd_line_metrics(self):
420 """ Get metric set specifications from the command line """
421 metrics = []
422 for arg in sys.argv[1:]:
423 if arg in self.arghelp:
424 return 0
425 for arg in reversed(sys.argv[1:]):
426 if arg.startswith('-'):
427 if len(metrics):
428 if arg not in self.argless and '=' not in arg:
429 del metrics[-1]
430 break
431 metrics.append(arg)
432 metrics.reverse()
433 return metrics
435 def parse_metric_info(self, metrics, key, value):
436 """ Parse metric information """
437 # NB. Uses the config key, not the metric, as the dict key
438 if ',' in value:
439 # Compact / one-line definition
440 metrics[key] = (key + "," + value).split(",")
441 else:
442 # Chatty / multi-line definition
443 if not '.' in key or key.rsplit(".", 1)[1] not in self.metricspec:
444 # New metric
445 metrics[key] = value.split()
446 for index in range(0, 6):
447 if len(metrics[key]) <= index:
448 metrics[key].append(None)
449 else:
450 # Additional info
451 key, spec = key.rsplit(".", 1)
452 if key not in metrics:
453 sys.stderr.write("Undeclared metric key %s.\n" % key)
454 sys.exit(1)
455 if spec == "formula":
456 if self.derived == None:
457 self.derived = metrics[key][0] + "=" + value
458 else:
459 self.derived += "," + metrics[key][0] + "=" + value
460 else:
461 metrics[key][self.metricspec.index(spec)+1] = value
463 def prepare_metrics(self):
464 """ Construct and prepare the initial metrics set """
465 # Get direct and/or sets of metrics from the command line
466 metrics = self.get_cmd_line_metrics()
467 if metrics == 0:
468 return
469 if not metrics:
470 sys.stderr.write("No metrics specified.\n")
471 raise pmapi.pmUsageErr()
473 # Ugly, but we haven't read our command line via PMAPI yet
474 if '-G' in sys.argv:
475 self.globals = 0
477 # Read config
478 config = ConfigParser.SafeConfigParser()
479 config.read(self.config)
481 # First read global metrics (if not disabled already)
482 globmet = OrderedDict()
483 if self.globals == 1:
484 if config.has_section('global'):
485 parsemet = OrderedDict()
486 for key in config.options('global'):
487 self.parse_metric_info(parsemet, key, config.get('global', key))
488 for metric in parsemet:
489 name = parsemet[metric][:1][0]
490 globmet[name] = parsemet[metric][1:]
491 if globmet[name][0] == None:
492 globmet[name][0] = metric
494 # Add command line and configuration file metric sets
495 tempmet = OrderedDict()
496 for metric in metrics:
497 if metric.startswith(":"):
498 tempmet[metric[1:]] = None
499 else:
500 m = metric.split(",")
501 tempmet[m[0]] = m[1:]
503 # Get config and set details for configuration file metric sets
504 confmet = OrderedDict()
505 for spec in tempmet:
506 if tempmet[spec] == None:
507 if config.has_section(spec):
508 parsemet = OrderedDict()
509 for key in config.options(spec):
510 if key in self.keys:
511 self.set_attr(key, config.get(spec, key))
512 else:
513 self.parse_metric_info(parsemet, key, config.get(spec, key))
514 for metric in parsemet:
515 name = parsemet[metric][:1][0]
516 confmet[name] = parsemet[metric][1:]
517 if confmet[name][0] == None:
518 confmet[name][0] = metric
519 tempmet[spec] = confmet
520 else:
521 raise IOError("Metric set definition '%s' not found." % metric)
523 # Create the combined metrics set
524 if self.globals == 1:
525 for metric in globmet:
526 self.metrics[metric] = globmet[metric]
527 for metric in tempmet:
528 if type(tempmet[metric]) is list:
529 self.metrics[metric] = tempmet[metric]
530 else:
531 for m in tempmet[metric]:
532 self.metrics[m] = confmet[m]
534 def check_metric(self, metric):
535 """ Validate individual metric and get its details """
536 try:
537 pmid = self.context.pmLookupName(metric)[0]
538 desc = self.context.pmLookupDescs(pmid)[0]
539 try:
540 if self.context.type == PM_CONTEXT_ARCHIVE:
541 inst = self.context.pmGetInDomArchive(desc)
542 else:
543 inst = self.context.pmGetInDom(desc) # disk.dev.read
544 if not inst[0]:
545 inst = ([PM_IN_NULL], [None]) # pmcd.pmie.logfile
546 except pmapi.pmErr:
547 inst = ([PM_IN_NULL], [None]) # mem.util.free
548 # Reject unsupported types
549 mtype = desc.contents.type
550 if not (mtype == PM_TYPE_32 or
551 mtype == PM_TYPE_U32 or
552 mtype == PM_TYPE_64 or
553 mtype == PM_TYPE_U64 or
554 mtype == PM_TYPE_FLOAT or
555 mtype == PM_TYPE_DOUBLE or
556 mtype == PM_TYPE_STRING):
557 raise pmapi.pmErr(PM_ERR_TYPE)
558 self.pmids.append(pmid)
559 self.descs.append(desc)
560 self.insts.append(inst)
561 except pmapi.pmErr as error:
562 sys.stderr.write("Invalid metric %s (%s).\n" % (metric, str(error)))
563 sys.exit(1)
565 def validate_config(self):
566 """ Validate configuration parameters """
567 if self.version != VERSION:
568 sys.stderr.write("Incompatible configuration file version (read v%s, need v%d).\n" % (self.version, VERSION))
569 sys.exit(1)
571 if self.context.type == PM_CONTEXT_ARCHIVE:
572 self.source = self.opts.pmGetOptionArchives()[0] # RHBZ#1262723
573 if self.context.type == PM_CONTEXT_HOST:
574 self.source = self.context.pmGetContextHostName()
575 if self.context.type == PM_CONTEXT_LOCAL:
576 self.source = "@" # PCPIntro(1), RHBZ#1272082
578 if self.output == OUTPUT_ARCHIVE and not self.archive:
579 sys.stderr.write("Archive/folio must be defined with archive output.\n")
580 sys.exit(1)
582 if self.output == OUTPUT_ZABBIX and (not self.zabbix_server or \
583 not self.zabbix_port or not self.zabbix_host):
584 sys.stderr.write("zabbix_server, zabbix_port, and zabbix_host must be defined with Zabbix.\n")
585 sys.exit(1)
587 # Runtime overrides samples/interval/endtime
588 if self.runtime != -1:
589 self.runtime = int(pmapi.timeval.fromInterval(self.runtime))
590 if self.opts.pmGetOptionSamples():
591 self.samples = self.opts.pmGetOptionSamples()
592 if self.samples < 2:
593 self.samples = 2
594 self.interval = float(self.runtime) / (self.samples - 1)
595 self.opts.pmSetOptionInterval(str(self.interval))
596 self.interval = self.opts.pmGetOptionInterval()
597 else:
598 self.interval = self.opts.pmGetOptionInterval()
599 self.samples = self.runtime / int(self.interval) + 1
600 if int(self.interval) > self.runtime:
601 sys.stderr.write("Interval can't be longer than runtime.\n")
602 sys.exit(1)
603 else:
604 self.samples = self.opts.pmGetOptionSamples()
605 self.interval = self.opts.pmGetOptionInterval()
607 if self.output == OUTPUT_ZABBIX:
608 if self.zabbix_interval:
609 self.zabbix_interval = int(pmapi.timeval.fromInterval(self.zabbix_interval))
610 if self.zabbix_interval < int(self.interval):
611 self.zabbix_interval = int(self.interval)
612 else:
613 self.zabbix_interval = int(self.interval)
615 self.can_scale = "pmParseUnitsStr" in dir(self.context)
617 def validate_metrics(self):
618 """ Validate the metrics set """
619 # Check the metrics against PMNS, resolve non-leaf metrics
620 if self.derived:
621 if self.derived.startswith("/") or self.derived.startswith("."):
622 try:
623 self.context.pmLoadDerivedConfig(self.derived)
624 except pmapi.pmErr as error:
625 sys.stderr.write("Failed to register derived metric: %s.\n" % str(error))
626 sys.exit(1)
627 else:
628 for definition in self.derived.split(","):
629 err = ""
630 try:
631 name, expr = definition.split("=")
632 self.context.pmRegisterDerived(name.strip(), expr.strip())
633 except ValueError as error:
634 err = "Invalid syntax (expected metric=expression)"
635 except Exception as error:
636 #err = self.context.pmDerivedErrStr() # RHBZ#1286733
637 err = "Unknown reason"
638 finally:
639 if err:
640 sys.stderr.write("Failed to register derived metric: %s.\n" % err)
641 sys.exit(1)
642 # Prepare for non-leaf metrics
643 metrics = self.metrics
644 self.metrics = OrderedDict()
645 for metric in metrics:
646 try:
647 l = len(self.pmids)
648 self.context.pmTraversePMNS(metric, self.check_metric)
649 if len(self.pmids) == l + 1:
650 # Leaf
651 if metric == self.context.pmNameID(self.pmids[l]):
652 self.metrics[metric] = metrics[metric]
653 else:
654 # But handle single non-leaf case in an archive
655 self.metrics[self.context.pmNameID(self.pmids[l])] = []
656 else:
657 # Non-leaf
658 for i in range(l, len(self.pmids)):
659 name = self.context.pmNameID(self.pmids[i])
660 # We ignore specs like disk.dm,,,MB on purpose, for now
661 self.metrics[name] = []
662 except pmapi.pmErr as error:
663 sys.stderr.write("Invalid metric %s (%s).\n" % (metric, str(error)))
664 sys.exit(1)
666 # Finalize the metrics set
667 for i, metric in enumerate(self.metrics):
668 # Fill in all fields for easier checking later
669 for index in range(0, 5):
670 if len(self.metrics[metric]) <= index:
671 self.metrics[metric].append(None)
673 # Label
674 if not self.metrics[metric][0]:
675 # mem.util.free -> m.u.free
676 name = ""
677 for m in metric.split("."):
678 name += m[0] + "."
679 self.metrics[metric][0] = name[:-2] + m
681 # Rawness
682 if self.metrics[metric][3] == 'r' or self.raw == 1:
683 self.metrics[metric][3] = 1
684 else:
685 self.metrics[metric][3] = 0
687 # Unit/scale
688 unitstr = str(self.descs[i].contents.units)
689 # Set default unit if not specified on per-metric basis
690 if not self.metrics[metric][2]:
691 done = 0
692 unit = self.descs[i].contents.units
693 if self.count_scale and \
694 unit.dimCount == 1 and ( \
695 unit.dimSpace == 0 and
696 unit.dimTime == 0):
697 self.metrics[metric][2] = self.count_scale
698 done = 1
699 if self.space_scale and \
700 unit.dimSpace == 1 and ( \
701 unit.dimCount == 0 and
702 unit.dimTime == 0):
703 self.metrics[metric][2] = self.space_scale
704 done = 1
705 if self.time_scale and \
706 unit.dimTime == 1 and ( \
707 unit.dimCount == 0 and
708 unit.dimSpace == 0):
709 self.metrics[metric][2] = self.time_scale
710 done = 1
711 if not done:
712 self.metrics[metric][2] = unitstr
713 # Set unit/scale for non-raw numeric metrics
714 try:
715 if self.metrics[metric][3] == 0 and self.can_scale and \
716 self.descs[i].contents.type != PM_TYPE_STRING:
717 (unitstr, mult) = self.context.pmParseUnitsStr(self.metrics[metric][2])
718 label = self.metrics[metric][2]
719 if self.descs[i].sem == PM_SEM_COUNTER:
720 label += "/s"
721 if self.descs[i].contents.units.dimTime == 1:
722 label = "util"
723 self.metrics[metric][2] = (label, unitstr, mult)
724 else:
725 self.metrics[metric][2] = (unitstr, unitstr, 1)
726 except pmapi.pmErr as error:
727 sys.stderr.write("%s: %s.\n" % (str(error), self.metrics[metric][2]))
728 sys.exit(1)
730 # Set default width if not specified on per-metric basis
731 if self.metrics[metric][4]:
732 self.metrics[metric][4] = int(self.metrics[metric][4])
733 elif self.width != 0:
734 self.metrics[metric][4] = self.width
735 else:
736 self.metrics[metric][4] = len(self.metrics[metric][0])
737 if self.metrics[metric][4] < len(TRUNC):
738 self.metrics[metric][4] = len(TRUNC) # Forced minimum
740 # RHBZ#1264147
741 def pmids_to_ctypes(self, pmids):
742 """ Convert a Python list of pmids (numbers) to
743 a ctypes LP_c_uint (a C array of uints).
745 from ctypes import c_uint
746 pmidA = (c_uint * len(pmids))()
747 for i, p in enumerate(pmids):
748 pmidA[i] = c_uint(p)
749 return pmidA
751 def get_mode_step(self):
752 """ Get mode and step for pmSetMode """
753 if not self.interpol or self.output == OUTPUT_ARCHIVE:
754 mode = PM_MODE_FORW
755 step = 0
756 else:
757 mode = PM_MODE_INTERP
758 secs_in_24_days = 2073600
759 if self.interval.tv_sec > secs_in_24_days:
760 step = self.interval.tv_sec
761 mode |= PM_XTB_SET(PM_TIME_SEC)
762 else:
763 step = self.interval.tv_sec*1000 + self.interval.tv_usec/1000
764 mode |= PM_XTB_SET(PM_TIME_MSEC)
765 return (mode, step)
767 def execute(self):
768 """ Using a PMAPI context (could be either host or archive),
769 fetch and report the requested set of values on stdout.
771 # Set output primitives
772 if self.delimiter == None:
773 if self.output == OUTPUT_CSV:
774 self.delimiter = CSVSEP
775 else:
776 self.delimiter = OUTSEP
778 # Time
779 if self.opts.pmGetOptionTimezone():
780 os.environ['TZ'] = self.opts.pmGetOptionTimezone()
781 time.tzset()
782 self.context.pmNewZone(self.opts.pmGetOptionTimezone())
784 if self.timefmt == None:
785 if self.output == OUTPUT_CSV:
786 self.timefmt = CSVTIME
787 else:
788 self.timefmt = OUTTIME
789 if not self.timefmt:
790 self.timestamp = 0
792 if self.context.type != PM_CONTEXT_ARCHIVE:
793 self.delay = 1
794 self.interpol = 1
796 # DBG_TRACE_APPL1 == 4096
797 if "pmDebug" in dir(self.context) and self.context.pmDebug(4096):
798 print("Known config file keywords: " + str(self.keys))
799 print("Known metric spec keywords: " + str(self.metricspec))
801 # Printing format and headers
802 if self.format == None:
803 self.define_format()
805 if self.extheader == 1:
806 self.extheader = 0
807 self.write_ext_header()
809 if self.header == 1:
810 self.header = 0
811 self.write_header()
812 else:
813 self.repeat_header = 0
815 # Just checking
816 if self.check == 1:
817 return
819 # Archive recording from non-archive is handled separately
820 if self.output == OUTPUT_ARCHIVE:
821 if self.context.type != PM_CONTEXT_ARCHIVE:
822 self.write_archive_pmgui()
823 return
825 # Archive fetching mode
826 if self.context.type == PM_CONTEXT_ARCHIVE:
827 (mode, step) = self.get_mode_step()
828 self.context.pmSetMode(mode, self.opts.pmGetOptionOrigin(), step)
830 lines = 0
831 while self.samples != 0:
832 if self.output == OUTPUT_STDOUT:
833 if lines > 1 and self.repeat_header == lines:
834 self.write_header()
835 lines = 0
836 lines += 1
838 try:
839 result = self.context.pmFetch(self.pmids_to_ctypes(self.pmids))
840 except pmapi.pmErr as error:
841 if error.args[0] == PM_ERR_EOL:
842 self.samples = 0
843 continue
844 raise error
845 self.context.pmSortInstances(result) # XXX Is this really needed?
846 values = self.extract(result)
847 if self.ctstamp == 0:
848 self.ctstamp = copy.copy(result.contents.timestamp)
849 self.ptstamp = self.ctstamp
850 self.ctstamp = copy.copy(result.contents.timestamp)
852 if self.context.type == PM_CONTEXT_ARCHIVE:
853 if float(self.ctstamp) < float(self.opts.pmGetOptionStart()):
854 self.context.pmFreeResult(result)
855 continue
856 if float(self.ctstamp) > float(self.opts.pmGetOptionFinish()):
857 return
859 self.report(self.ctstamp, values)
860 self.context.pmFreeResult(result)
861 if self.samples and self.samples > 0:
862 self.samples -= 1
863 if self.delay and self.interpol and self.samples != 0:
864 self.context.pmtimevalSleep(self.interval)
866 # Allow modules to flush buffered values / say goodbye
867 self.report(None, None)
869 def extract(self, result):
870 """ Extract the metric values from pmResult structure """
871 # Metrics incl. all instance values, must match self.format on return
872 values = []
874 for i, metric in enumerate(self.metrics):
875 # Per-metric values incl. all instance values
876 # We use dict to make it easier to deal with gone/unknown instances
877 values.append({})
879 # Populate instance fields to have values for unavailable instances
880 # Values are (instance id, instance name, instance value)
881 for inst in self.insts[i][0]:
882 values[i][inst] = (-1, None, NO_VAL)
884 # No values available for this metric
885 if result.contents.get_numval(i) == 0:
886 continue
888 # Process all fetched instances
889 for j in range(result.contents.get_numval(i)):
890 inst = result.contents.get_inst(i, j)
892 # Locate the correct instance and its position
893 if inst >= 0:
894 if inst not in self.insts[i][0]:
895 # Ignore newly emerged instances
896 continue
897 k = 0
898 while inst != self.insts[i][0][k]:
899 k += 1
901 # Extract and scale the value
902 try:
903 # Use native type if no rescaling needed
904 if self.metrics[metric][2][2] == 1 and \
905 str(self.descs[i].contents.units) == \
906 str(self.metrics[metric][2][1]):
907 rescale = 0
908 vtype = self.descs[i].contents.type
909 else:
910 rescale = 1
911 vtype = PM_TYPE_DOUBLE
913 atom = self.context.pmExtractValue(
914 result.contents.get_valfmt(i),
915 result.contents.get_vlist(i, j),
916 self.descs[i].contents.type,
917 vtype)
919 if self.metrics[metric][3] != 1 and rescale and \
920 self.descs[i].contents.type != PM_TYPE_STRING and \
921 self.can_scale:
922 atom = self.context.pmConvScale(
923 vtype,
924 atom, self.descs, i,
925 self.metrics[metric][2][1])
927 val = atom.dref(vtype)
928 if rescale and self.can_scale and \
929 self.descs[i].contents.type != PM_TYPE_STRING:
930 val *= self.metrics[metric][2][2]
931 val = int(val) if val == int(val) else val
933 if inst >= 0:
934 values[i][inst] = (inst, self.insts[i][1][k], val)
935 else:
936 values[i][PM_IN_NULL] = (-1, None, val)
938 except pmapi.pmErr as error:
939 sys.stderr.write("%s: %s, aborting.\n" % (metric, str(error)))
940 sys.exit(1)
942 # Convert dicts to lists
943 vals = []
944 for v in values:
945 vals.append(v.values())
946 values = vals
948 # Store current and previous values
949 # Output modules need to handle non-existing self.prevvals
950 self.prevvals = self.currvals
951 self.currvals = values
953 return values # XXX Redundant now
955 def report(self, tstamp, values):
956 """ Report the metric values """
957 if tstamp != None:
958 tstamp = datetime.fromtimestamp(tstamp).strftime(self.timefmt)
960 if self.output == OUTPUT_ARCHIVE:
961 self.write_archive_pmi(tstamp, values)
962 if self.output == OUTPUT_CSV:
963 self.write_csv(tstamp, values)
964 if self.output == OUTPUT_STDOUT:
965 self.write_stdout(tstamp, values)
966 if self.output == OUTPUT_ZABBIX:
967 self.write_zabbix(tstamp, values)
969 def define_format(self):
970 """ Define stdout format string """
971 index = 0
972 if self.timestamp == 0:
973 #self.format = "{:}{}"
974 self.format = "{0:}{1}"
975 index += 2
976 else:
977 tstamp = datetime.fromtimestamp(time.time()).strftime(self.timefmt)
978 #self.format = "{:" + str(len(tstamp)) + "}{}"
979 self.format = "{" + str(index) + ":" + str(len(tstamp)) + "}"
980 index += 1
981 self.format += "{" + str(index) + "}"
982 index += 1
983 for i, metric in enumerate(self.metrics):
984 ins = 1 if self.insts[i][0][0] == PM_IN_NULL else len(self.insts[i][0])
985 for _ in range(ins):
986 l = str(self.metrics[metric][4])
987 #self.format += "{:>" + l + "." + l + "}{}"
988 self.format += "{" + str(index) + ":>" + l + "." + l + "}"
989 index += 1
990 self.format += "{" + str(index) + "}"
991 index += 1
992 #self.format = self.format[:-2]
993 l = len(str(index-1)) + 2
994 self.format = self.format[:-l]
996 def write_ext_header(self):
997 """ Write extended header """
998 comm = "#" if self.output == OUTPUT_CSV else ""
1000 if self.runtime != -1:
1001 duration = self.runtime
1002 samples = self.samples
1003 else:
1004 if self.samples:
1005 duration = (self.samples - 1) * int(self.interval)
1006 samples = self.samples
1007 if self.context.type == PM_CONTEXT_ARCHIVE:
1008 if not self.interpol:
1009 samples = str(samples) + " (requested)"
1010 else:
1011 duration = int(float(self.opts.pmGetOptionFinish()) - float(self.opts.pmGetOptionStart()))
1012 samples = (duration / int(self.interval)) + 1
1013 duration = (samples - 1) * int(self.interval)
1014 if self.context.type == PM_CONTEXT_ARCHIVE:
1015 if not self.interpol:
1016 samples = "N/A"
1017 endtime = float(self.opts.pmGetOptionStart()) + duration
1019 if self.context.type == PM_CONTEXT_ARCHIVE:
1020 host = self.context.pmGetArchiveLabel().hostname
1021 if not self.interpol and not self.opts.pmGetOptionFinish():
1022 endtime = self.context.pmGetArchiveEnd()
1023 if self.context.type == PM_CONTEXT_HOST:
1024 host = self.source
1025 if self.context.type == PM_CONTEXT_LOCAL:
1026 host = "localhost, using DSO PMDAs"
1028 # Figure out the current timezone using the PCP convention
1029 if self.opts.pmGetOptionTimezone():
1030 currtz = self.opts.pmGetOptionTimezone()
1031 else:
1032 dst = time.localtime().tm_isdst
1033 offset = time.altzone if dst else time.timezone
1034 currtz = time.tzname[dst]
1035 if offset:
1036 currtz += str(offset/3600)
1037 timezone = currtz
1039 if self.context.type == PM_CONTEXT_ARCHIVE:
1040 if self.context.pmGetArchiveLabel().tz != timezone:
1041 timezone = self.context.pmGetArchiveLabel().tz
1042 timezone += " (creation, current is " + currtz + ")"
1044 print(comm)
1045 if self.context.type == PM_CONTEXT_ARCHIVE:
1046 print(comm + " archive: " + self.source)
1047 print(comm + " host: " + host)
1048 print(comm + " timezone: " + timezone)
1049 print(comm + " start: " + time.asctime(time.localtime(self.opts.pmGetOptionStart())))
1050 print(comm + " end: " + time.asctime(time.localtime(endtime)))
1051 print(comm + " samples: " + str(samples))
1052 if not (self.context.type == PM_CONTEXT_ARCHIVE and not self.interpol):
1053 print(comm + " interval: " + str(float(self.interval)) + " sec")
1054 print(comm + " duration: " + str(duration) + " sec")
1055 else:
1056 print(comm + " interval: N/A")
1057 print(comm + " duration: N/A")
1058 print(comm)
1060 def write_header(self):
1061 """ Write metrics header """
1062 if self.output == OUTPUT_ARCHIVE:
1063 sys.stdout.write("Recording archive/folio %s" % self.archive)
1064 if self.runtime != -1:
1065 sys.stdout.write(":\n%s samples(s) with %.1f sec interval ~ %d sec duration.\n" % (self.samples, float(self.interval), self.runtime + 1))
1066 elif self.samples:
1067 duration = (self.samples - 1) * int(self.interval)
1068 sys.stdout.write(":\n%s samples(s) with %.1f sec interval ~ %d sec duration.\n" % (self.samples, float(self.interval), duration + 1))
1069 else:
1070 if self.context.type != PM_CONTEXT_ARCHIVE:
1071 sys.stdout.write("... (Ctrl-C to stop)")
1072 sys.stdout.write("\n")
1073 return
1075 if self.output == OUTPUT_CSV:
1076 header = "metric" + self.delimiter
1077 if self.timestamp == 1:
1078 header += "timestamp" + self.delimiter
1079 for f in "name", "unit", "value":
1080 header += f + self.delimiter
1081 header = header[:-len(self.delimiter)]
1082 print(header)
1084 if self.output == OUTPUT_STDOUT:
1085 names = ["", self.delimiter] # no timestamp on header line
1086 insts = ["", self.delimiter] # no timestamp on instances line
1087 units = ["", self.delimiter] # no timestamp on units line
1088 prnti = 0
1089 for i, metric in enumerate(self.metrics):
1090 ins = 1 if self.insts[i][0][0] == PM_IN_NULL else len(self.insts[i][0])
1091 prnti = 1 if self.insts[i][0][0] != PM_IN_NULL else 0
1092 for j in range(ins):
1093 names.append(self.metrics[metric][0])
1094 names.append(self.delimiter)
1095 units.append(self.metrics[metric][2][0])
1096 units.append(self.delimiter)
1097 if prnti == 1:
1098 insts.append(self.insts[i][1][j])
1099 else:
1100 insts.append(self.delimiter)
1101 insts.append(self.delimiter)
1102 del names[-1]
1103 del units[-1]
1104 del insts[-1]
1105 print(self.format.format(*tuple(names)))
1106 if prnti == 1:
1107 print(self.format.format(*tuple(insts)))
1108 if self.unitinfo:
1109 print(self.format.format(*tuple(units)))
1111 if self.output == OUTPUT_ZABBIX:
1112 if self.context.type == PM_CONTEXT_ARCHIVE:
1113 self.delay = 0
1114 self.interpol = 0
1115 self.zabbix_interval = 250 # See zabbix_sender(8)
1116 sys.stdout.write("Sending archived metrics to Zabbix server %s...\n(Ctrl-C to stop)\n" % self.zabbix_server)
1117 return
1119 sys.stdout.write("Sending metrics to Zabbix server %s every %d sec" % (self.zabbix_server, self.zabbix_interval))
1120 if self.runtime != -1:
1121 sys.stdout.write(":\n%s samples(s) with %.1f sec interval ~ %d sec runtime.\n" % (self.samples, float(self.interval), self.runtime))
1122 elif self.samples:
1123 duration = (self.samples - 1) * int(self.interval)
1124 sys.stdout.write(":\n%s samples(s) with %.1f sec interval ~ %d sec runtime.\n" % (self.samples, float(self.interval), duration))
1125 else:
1126 sys.stdout.write("...\n(Ctrl-C to stop)\n")
1128 def write_archive_pmgui(self):
1129 """ Write an archive entry using pmgui """
1130 # We're not a graphical app, disable popups
1131 os.environ['PCP_XCONFIRM_PROG'] = '/bin/true'
1133 # Create the archive folio using pmgui
1134 context = pmgui.GuiClient()
1135 config = "log mandatory on every " + str(int(self.interval)) + " sec {\n"
1136 for metric in self.metrics:
1137 config += metric + "\n"
1138 config += "}\n"
1139 context.pmRecordSetup(self.archive, ' '.join(sys.argv), 0)
1140 context.pmRecordAddHost(self.source, 1, config)
1141 duration = 0
1142 if self.runtime != -1:
1143 duration = self.runtime
1144 elif self.samples:
1145 if self.samples < 2:
1146 self.samples = 2
1147 duration = (self.samples - 1) * int(self.interval)
1148 if duration:
1149 endtime = "-T" + str(duration) + "sec"
1150 context.pmRecordControl(0, PM_REC_SETARG, endtime)
1151 context.pmRecordControl(0, PM_REC_ON, "")
1152 if not duration:
1153 time.sleep(0xFFFFFFFF) # A very long time
1154 context.pmRecordControl(0, PM_REC_OFF, "") # Non-mandatory
1155 return
1156 for i in range(duration):
1157 sys.stdout.write("\rProgress: %3d%%" % int(float(i) / duration * 100))
1158 sys.stdout.flush()
1159 time.sleep(1)
1160 sys.stdout.write("\rProgress: 99%")
1161 sys.stdout.flush()
1162 time.sleep(1) # Make sure the last record gets there
1163 # For cleanliness only, -T should have stopped recording by now
1164 context.pmRecordControl(0, PM_REC_OFF, "")
1165 sys.stdout.write("\rComplete: 100%.\n")
1167 def write_archive_pmi(self, timestamp, values):
1168 """ Write an archive entry using pmi """
1169 if timestamp == None and values == None:
1170 # Complete and close
1171 self.log.pmiEnd()
1172 return
1174 if self.log == None:
1175 # Create a new archive
1176 self.log = pmi.pmiLogImport(self.archive)
1177 self.log.pmiSetHostname(self.context.pmGetArchiveLabel().hostname)
1178 self.log.pmiSetTimezone(self.context.pmGetArchiveLabel().tz)
1179 for i, metric in enumerate(self.metrics):
1180 self.log.pmiAddMetric(metric,
1181 self.pmids[i],
1182 self.descs[i].contents.type,
1183 self.descs[i].contents.indom,
1184 self.descs[i].contents.sem,
1185 self.descs[i].contents.units)
1186 ins = 0 if self.insts[i][0][0] == PM_IN_NULL else len(self.insts[i][0])
1187 for j in range(ins):
1188 self.log.pmiAddInstance(self.descs[i].contents.indom, self.insts[i][1][j], self.insts[i][0][j])
1190 # Add current values
1191 data = 0
1192 for i, metric in enumerate(self.metrics):
1193 ins = 1 if self.insts[i][0][0] == PM_IN_NULL else len(self.insts[i][0])
1194 for j in range(ins):
1195 if str(list(values[i])[j][2]) != NO_VAL:
1196 data = 1
1197 inst = self.insts[i][1][j]
1198 if inst == None: # RHBZ#1285371
1199 inst = ""
1200 if self.descs[i].contents.type == PM_TYPE_STRING:
1201 self.log.pmiPutValue(metric, inst, str(values[i][j][2]))
1202 elif self.descs[i].contents.type == PM_TYPE_FLOAT or \
1203 self.descs[i].contents.type == PM_TYPE_DOUBLE:
1204 self.log.pmiPutValue(metric, inst, "%f" % values[i][j][2])
1205 else:
1206 self.log.pmiPutValue(metric, inst, "%d" % values[i][j][2])
1208 # Flush
1209 if data:
1210 # pylint: disable=maybe-no-member
1211 self.log.pmiWrite(self.ctstamp.tv_sec, self.ctstamp.tv_usec)
1213 def write_csv(self, timestamp, values):
1214 """ Write a line in CSV format """
1215 if timestamp == None and values == None:
1216 # Silent goodbye
1217 return
1219 # CSV is always raw not rate
1220 for i, metric in enumerate(self.metrics):
1221 ins = 1 if self.insts[i][0][0] == PM_IN_NULL else len(self.insts[i][0])
1222 for j in range(ins):
1223 line = metric
1224 if self.insts[i][1][j]:
1225 line += "." + str(self.insts[i][1][j])
1226 line += self.delimiter
1227 if self.timestamp == 1:
1228 line += timestamp + self.delimiter
1229 line += str(self.metrics[metric][0]) + self.delimiter
1230 line += str(self.metrics[metric][2][0]) + self.delimiter
1231 line += str(list(values[i])[j][2])
1232 print(line)
1234 def write_stdout(self, timestamp, values):
1235 """ Write a line to stdout """
1236 if timestamp == None and values == None:
1237 # Silent goodbye
1238 return
1240 #fmt = self.format.split("{}")
1241 fmt = re.split("{\\d+}", self.format)
1242 line = []
1244 if self.timestamp == 0:
1245 line.append("")
1246 else:
1247 line.append(timestamp)
1248 line.append(self.delimiter)
1250 k = 0
1251 for i, metric in enumerate(self.metrics):
1252 l = self.metrics[metric][4]
1254 for j in range(len(values[i])):
1255 k += 1
1257 # Raw or rate
1258 if not self.metrics[metric][3] and \
1259 (self.prevvals == None or self.prevvals[i][j][2] == NO_VAL):
1260 # Rate not yet possible
1261 value = NO_VAL
1262 elif self.metrics[metric][3] or \
1263 self.descs[i].sem != PM_SEM_COUNTER or \
1264 values[i][j][2] == NO_VAL:
1265 # Raw
1266 value = list(values[i])[j][2]
1267 else:
1268 # Rate
1269 scale = 1
1270 if self.descs[i].contents.units.dimTime != 0:
1271 if self.descs[i].contents.units.scaleTime > PM_TIME_SEC:
1272 scale = pow(60, (PM_TIME_SEC - self.descs[i].contents.units.scaleTime))
1273 else:
1274 scale = pow(1000, (PM_TIME_SEC - self.descs[i].contents.units.scaleTime))
1275 delta = scale * (float(self.ctstamp) - float(self.ptstamp))
1276 value = (values[i][j][2] - self.prevvals[i][j][2]) / delta if delta else 0
1278 # Make sure the value fits
1279 if type(value) is int or type(value) is long:
1280 if len(str(value)) > l:
1281 value = TRUNC
1282 else:
1283 #fmt[k] = "{:" + str(l) + "d}"
1284 fmt[k] = "{X:" + str(l) + "d}"
1286 if type(value) is float:
1287 c = self.precision
1288 s = len(str(int(value)))
1289 if s > l:
1290 c = -1
1291 value = TRUNC
1292 #for _ in reversed(range(c+1)):
1293 #t = "{:" + str(l) + "." + str(c) + "f}"
1294 for f in reversed(range(c+1)):
1295 r = "{X:" + str(l) + "." + str(c) + "f}"
1296 t = "{0:" + str(l) + "." + str(c) + "f}"
1297 if len(t.format(value)) > l:
1298 c -= 1
1299 else:
1300 #fmt[k] = t
1301 fmt[k] = r
1302 break
1304 line.append(value)
1305 line.append(self.delimiter)
1307 del line[-1]
1308 #print('{}'.join(fmt).format(*tuple(line)))
1309 index = 0
1310 nfmt = ""
1311 for f in fmt:
1312 nfmt += f.replace("{X:", "{" + str(index) + ":")
1313 index += 1
1314 nfmt += "{" + str(index) + "}"
1315 index += 1
1316 l = len(str(index-1)) + 2
1317 nfmt = nfmt[:-l]
1318 print(nfmt.format(*tuple(line)))
1320 def write_zabbix(self, timestamp, values):
1321 """ Write (send) metrics to a Zabbix server """
1322 if timestamp == None and values == None:
1323 # Send any remaining buffered values
1324 if self.zabbix_metrics:
1325 send_to_zabbix(self.zabbix_metrics, self.zabbix_server, self.zabbix_port)
1326 self.zabbix_metrics = []
1327 return
1329 # Zabbix is always raw not rate
1330 ts = float(self.ctstamp)
1331 if self.zabbix_prevsend == None:
1332 self.zabbix_prevsend = ts
1333 for i, metric in enumerate(self.metrics):
1334 ins = 1 if self.insts[i][0][0] == PM_IN_NULL else len(self.insts[i][0])
1335 for j in range(ins):
1336 key = ZBXPRFX + metric
1337 if self.insts[i][1][j]:
1338 key += "[" + str(self.insts[i][1][j]) + "]"
1339 val = str(list(values[i])[j][2])
1340 self.zabbix_metrics.append(ZabbixMetric(self.zabbix_host, key, val, ts))
1342 if self.context.type == PM_CONTEXT_ARCHIVE:
1343 if len(self.zabbix_metrics) >= self.zabbix_interval:
1344 send_to_zabbix(self.zabbix_metrics, self.zabbix_server, self.zabbix_port)
1345 self.zabbix_metrics = []
1346 elif ts - self.zabbix_prevsend > self.zabbix_interval:
1347 send_to_zabbix(self.zabbix_metrics, self.zabbix_server, self.zabbix_port)
1348 self.zabbix_metrics = []
1349 self.zabbix_prevsend = ts
1351 def connect(self):
1352 """ Establish a PMAPI context to archive, host or local, via args """
1353 self.context = pmapi.pmContext.fromOptions(self.opts, sys.argv)
1355 if __name__ == '__main__':
1356 try:
1357 P = PMReporter()
1358 P.read_config()
1359 P.prepare_metrics()
1360 P.connect()
1361 P.validate_config()
1362 P.validate_metrics()
1363 P.execute()
1365 except pmapi.pmErr as error:
1366 sys.stderr.write('%s: %s\n' % (error.progname(), error.message()))
1367 except pmapi.pmUsageErr as usage:
1368 usage.message()
1369 except IOError as error:
1370 sys.stderr.write("%s\n" % str(error))
1371 except KeyboardInterrupt:
1372 sys.stdout.write("\n")