3 # Copyright (C) 2015 Marko Myllynen <myllynen@redhat.com>
5 # This program is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the
7 # Free Software Foundation; either version 2 of the License, or (at your
8 # option) any later version.
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
15 # [zbxsend] Copyright (C) 2014 Sergey Kirillov <sergey.kirillov@gmail.com>
16 # All rights reserved.
18 # Redistribution and use in source and binary forms, with or without
19 # modification, are permitted provided that the following conditions
22 # 1. Redistributions of source code must retain the above copyright
23 # notice, this list of conditions and the following disclaimer.
25 # 2. Redistributions in binary form must reproduce the above copyright
26 # notice, this list of conditions and the following disclaimer in the
27 # documentation and/or other materials provided with the distribution.
29 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
30 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
31 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
32 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
33 # HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
34 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
35 # TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
36 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
39 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 # pylint: disable=fixme, line-too-long, bad-whitespace, invalid-name
42 # pylint: disable=superfluous-parens
43 """ Performance Metrics Reporter """
45 from collections
import OrderedDict
46 from datetime
import datetime
50 import configparser
as ConfigParser
54 import simplejson
as json
63 from pcp
import pmapi
, pmgui
, pmi
64 from cpmapi
import PM_CONTEXT_ARCHIVE
, PM_CONTEXT_HOST
, PM_CONTEXT_LOCAL
, PM_MODE_FORW
, PM_MODE_INTERP
, PM_ERR_TYPE
, PM_ERR_EOL
, PM_IN_NULL
, PM_SEM_COUNTER
, PM_TIME_MSEC
, PM_TIME_SEC
, PM_XTB_SET
65 from cpmapi
import PM_TYPE_32
, PM_TYPE_U32
, PM_TYPE_64
, PM_TYPE_U64
, PM_TYPE_FLOAT
, PM_TYPE_DOUBLE
, PM_TYPE_STRING
66 from cpmgui
import PM_REC_ON
, PM_REC_OFF
, PM_REC_SETARG
68 if sys
.version_info
[0] >= 3:
72 DEFAULT_CONFIG
= "./pmrep.conf"
74 # Default field separators, config/time formats, missing/truncated values
76 CSVTIME
= "%Y-%m-%d %H:%M:%S"
86 OUTPUT_ARCHIVE
= "archive"
88 OUTPUT_STDOUT
= "stdout"
89 OUTPUT_ZABBIX
= "zabbix"
91 class ZabbixMetric(object):
92 """ A Zabbix metric """
93 def __init__(self
, host
, key
, value
, clock
):
100 return 'Metric(%r, %r, %r, %r)' % (self
.host
, self
.key
, self
.value
, self
.clock
)
102 def recv_from_zabbix(sock
, count
):
103 """ Receive a response from a Zabbix server. """
105 while len(buf
) < count
:
106 chunk
= sock
.recv(count
- len(buf
))
112 def send_to_zabbix(metrics
, zabbix_host
, zabbix_port
, timeout
=15):
113 """ Send a set of metrics to a Zabbix server. """
116 # Zabbix has a very fragile JSON parser, so we cannot use json to
117 # dump the whole packet
120 clock
= m
.clock
or time
.time()
121 metrics_data
.append(('\t\t{\n'
124 '\t\t\t"value":%s,\n'
125 '\t\t\t"clock":%.5f}') % (j(m
.host
), j(m
.key
), j(m
.value
), clock
))
127 '\t"request":"sender data",\n'
129 '}') % (',\n'.join(metrics_data
))
131 data_len
= struct
.pack('<Q', len(json_data
))
132 packet
= b
'ZBXD\1' + data_len
+ json_data
.encode('utf-8')
134 zabbix
= socket
.socket()
135 zabbix
.connect((zabbix_host
, zabbix_port
))
136 zabbix
.settimeout(timeout
)
137 # send metrics to zabbix
138 zabbix
.sendall(packet
)
139 # get response header from zabbix
140 resp_hdr
= recv_from_zabbix(zabbix
, 13)
141 if not bytes
.decode(resp_hdr
).startswith('ZBXD\1') or len(resp_hdr
) != 13:
142 # debug: write('Invalid Zabbix response len=%d' % len(resp_hdr))
144 resp_body_len
= struct
.unpack('<Q', resp_hdr
[5:])[0]
145 # get response body from zabbix
146 resp_body
= zabbix
.recv(resp_body_len
)
147 resp
= json
.loads(bytes
.decode(resp_body
))
148 # debug: write('Got response from Zabbix: %s' % resp)
149 if resp
.get('response') != 'success':
150 sys
.stderr
.write('Error response from Zabbix: %s', resp
)
153 except socket
.timeout
as err
:
154 sys
.stderr
.write("Zabbix connection timed out: " + str(err
))
159 class PMReporter(object):
160 """ Report PCP metrics """
163 """ Construct object, prepare for command line handling """
166 self
.format
= None # output format
167 self
.opts
= self
.options()
169 # Configuration directives
170 self
.keys
= ('source', 'output', 'derived', 'header', 'unitinfo',
171 'globals', 'timestamp', 'samples', 'interval', 'runtime',
172 'delay', 'raw', 'width', 'precision', 'delimiter',
173 'extheader', 'repeat_header', 'timefmt', 'interpol',
174 'count_scale', 'space_scale', 'time_scale', 'version',
175 'zabbix_server', 'zabbix_port', 'zabbix_host', 'zabbix_interval')
177 # Command line switches without arguments
178 self
.argless
= ('-C', '-L', '-H', '-U', '-G', '-p', '-d', '--delay', '-r', '--raw', '-x', '--extended-header', '-u')
179 self
.arghelp
= ('-?', '--help', '-V', '--version')
181 # The order of preference for parameters (as present):
182 # 1 - command line parameters
183 # 2 - parameters from configuration file(s)
184 # 3 - built-in defaults defined below
185 self
.config
= self
.set_config_file()
186 self
.version
= VERSION
187 self
.source
= "local:"
188 self
.output
= OUTPUT_STDOUT
189 self
.archive
= None # output archive
190 self
.log
= None # pmi handle
196 self
.samples
= None # forever
197 self
.interval
= pmapi
.timeval(1) # 1 sec
198 self
.opts
.pmSetOptionInterval(str(1))
203 self
.precision
= 3 # .3f
204 self
.delimiter
= None
206 self
.repeat_header
= 0
209 self
.count_scale
= None
210 self
.space_scale
= None
211 self
.time_scale
= None
212 self
.can_scale
= None # PCP 3.9 compat
214 # Performance metrics store
216 # values - 0:label, 1:instance(s), 2:unit/scale, 3:rawness, 4:width
217 self
.metrics
= OrderedDict()
219 # Corresponding config file metric specifiers
220 self
.metricspec
= ('label', 'instance', 'unit', 'raw', 'width', 'formula')
231 self
.zabbix_server
= None
232 self
.zabbix_port
= ZBXPORT
233 self
.zabbix_host
= None
234 self
.zabbix_interval
= None
235 self
.zabbix_prevsend
= None
236 self
.zabbix_metrics
= []
238 def set_config_file(self
):
239 """ Set configuration file """
240 config
= DEFAULT_CONFIG
242 # Possibly override the built-in default config file before
243 # parsing the rest of the command line parameters
244 args
= iter(sys
.argv
[1:])
246 if arg
in self
.arghelp
:
248 if arg
== '-c' or arg
== '--config':
251 if not os
.path
.isfile(config
) or not os
.access(config
, os
.R_OK
):
252 raise IOError("Failed to read configuration file '%s'." % config
)
253 except StopIteration:
257 def set_attr(self
, name
, value
):
258 """ Helper to apply config file settings properly """
259 if value
in ('true', 'True', 'y', 'yes', 'Yes'):
261 if value
in ('false', 'False', 'n', 'no', 'No'):
264 try: # RHBZ#1270176 / PCP < 3.10.8
266 self
.opts
.pmSetOptionArchive(value
)
268 self
.opts
.pmSetOptionHost(value
)
270 sys
.stderr
.write("PCP 3.10.8 or later required for the 'source' directive.\n")
272 elif name
== 'samples':
273 self
.opts
.pmSetOptionSamples(value
)
274 self
.samples
= self
.opts
.pmGetOptionSamples()
275 elif name
== 'interval':
276 self
.opts
.pmSetOptionInterval(value
)
277 self
.interval
= self
.opts
.pmGetOptionInterval()
280 setattr(self
, name
, int(value
))
282 setattr(self
, name
, value
)
284 def read_config(self
):
285 """ Read options from configuration file """
286 if self
.config
is None:
288 config
= ConfigParser
.SafeConfigParser()
289 config
.read(self
.config
)
290 if not config
.has_section('options'):
292 for opt
in config
.options('options'):
294 self
.set_attr(opt
, config
.get('options', opt
))
296 sys
.stderr
.write("Invalid directive '%s' in %s.\n" % (opt
, self
.config
))
300 """ Setup default command line argument option handling """
301 opts
= pmapi
.pmOptions()
302 opts
.pmSetOptionCallback(self
.option
)
303 opts
.pmSetOverrideCallback(self
.option_override
)
304 opts
.pmSetShortOptions("a:h:LK:c:Co:F:e:D:V?HUGpA:S:T:O:s:t:R:Z:zdrw:f:l:xE:P:uq:b:y:")
305 opts
.pmSetShortUsage("[option...] metricspec [...]")
307 opts
.pmSetLongOptionHeader("General options")
308 opts
.pmSetLongOptionArchive() # -a/--archive
309 opts
.pmSetLongOptionArchiveFolio() # --archive-folio
310 opts
.pmSetLongOptionHost() # -h/--host
311 opts
.pmSetLongOptionLocalPMDA() # -L/--local-PMDA
312 opts
.pmSetLongOptionSpecLocal() # -K/--spec-local
313 opts
.pmSetLongOption("config", 1, "c", "FILE", "config file path")
314 opts
.pmSetLongOption("check", 0, "C", "", "check config and metrics and exit")
315 opts
.pmSetLongOption("output", 1, "o", "OUTPUT", "output target, one of: archive, csv, stdout (default), zabbix")
316 opts
.pmSetLongOption("output-archive", 1, "F", "ARCHIVE", "output archive/folio (with -o archive)")
317 opts
.pmSetLongOption("derived", 1, "e", "FILE|DFNT", "derived metrics definitions")
318 opts
.pmSetLongOptionDebug() # -D/--debug
319 opts
.pmSetLongOptionVersion() # -V/--version
320 opts
.pmSetLongOptionHelp() # -?/--help
322 opts
.pmSetLongOptionHeader("Reporting options")
323 opts
.pmSetLongOption("no-header", 0, "H", "", "omit headers")
324 opts
.pmSetLongOption("no-unit-info", 0, "U", "", "omit unit info from headers")
325 opts
.pmSetLongOption("no-globals", 0, "G", "", "omit global metrics")
326 opts
.pmSetLongOption("timestamps", 0, "p", "", "print timestamps")
327 opts
.pmSetLongOptionAlign() # -A/--align
328 opts
.pmSetLongOptionStart() # -S/--start
329 opts
.pmSetLongOptionFinish() # -T/--finish
330 opts
.pmSetLongOptionOrigin() # -O/--origin
331 opts
.pmSetLongOptionSamples() # -s/--samples
332 opts
.pmSetLongOptionInterval() # -t/--interval
333 opts
.pmSetLongOption("runtime", 1, "R", "N", "runtime duration (overrides -t or -s)")
334 opts
.pmSetLongOptionTimeZone() # -Z/--timezone
335 opts
.pmSetLongOptionHostZone() # -z/--hostzone
336 opts
.pmSetLongOption("delay", 0, "d", "", "delay, pause between updates for archive replay")
337 opts
.pmSetLongOption("raw", 0, "r", "", "output raw counter values (no rate conversion)")
338 opts
.pmSetLongOption("width", 1, "w", "N", "default column width")
339 opts
.pmSetLongOption("fixed-point", 1, "f", "N", "N digits after the decimal separator (if width enough)")
340 opts
.pmSetLongOption("delimiter", 1, "l", "STR", "delimiter to separate csv/stdout columns")
341 opts
.pmSetLongOption("extended-header", 0, "x", "", "display extended header")
342 opts
.pmSetLongOption("repeat-header", 1, "E", "N", "repeat stdout headers every N lines")
343 opts
.pmSetLongOption("timestamp-format", 1, "P", "STR", "strftime string for timestamp format")
344 opts
.pmSetLongOption("no-interpolation", 0, "u", "", "disable interpolation mode with archives")
345 opts
.pmSetLongOption("count-scale", 1, "q", "SCALE", "default count unit")
346 opts
.pmSetLongOption("space-scale", 1, "b", "SCALE", "default space unit")
347 opts
.pmSetLongOption("time-scale", 1, "y", "SCALE", "default time unit")
351 def option_override(self
, opt
):
352 """ Override a few standard PCP options """
353 if opt
== 'H' or opt
== 'p':
357 def option(self
, opt
, optarg
, index
):
358 """ Perform setup for an individual command line option """
364 if optarg
== OUTPUT_ARCHIVE
:
365 self
.output
= OUTPUT_ARCHIVE
366 elif optarg
== OUTPUT_CSV
:
367 self
.output
= OUTPUT_CSV
368 elif optarg
== OUTPUT_STDOUT
:
369 self
.output
= OUTPUT_STDOUT
370 elif optarg
== OUTPUT_ZABBIX
:
371 self
.output
= OUTPUT_ZABBIX
373 sys
.stderr
.write("Invalid output target %s specified.\n" % optarg
)
376 if os
.path
.exists(optarg
) or os
.path
.exists(optarg
+ ".index"):
377 sys
.stderr
.write("Archive/folio %s already exists.\n" % optarg
)
379 self
.archive
= optarg
381 self
.derived
= optarg
391 self
.runtime
= optarg
397 self
.width
= int(optarg
)
399 self
.precision
= int(optarg
)
401 self
.delimiter
= optarg
405 self
.repeat_header
= int(optarg
)
407 self
.timefmt
= optarg
411 self
.count_scale
= optarg
413 self
.space_scale
= optarg
415 self
.time_scale
= optarg
417 raise pmapi
.pmUsageErr()
419 def get_cmd_line_metrics(self
):
420 """ Get metric set specifications from the command line """
422 for arg
in sys
.argv
[1:]:
423 if arg
in self
.arghelp
:
425 for arg
in reversed(sys
.argv
[1:]):
426 if arg
.startswith('-'):
428 if arg
not in self
.argless
and '=' not in arg
:
435 def parse_metric_info(self
, metrics
, key
, value
):
436 """ Parse metric information """
437 # NB. Uses the config key, not the metric, as the dict key
439 # Compact / one-line definition
440 metrics
[key
] = (key
+ "," + value
).split(",")
442 # Chatty / multi-line definition
443 if not '.' in key
or key
.rsplit(".", 1)[1] not in self
.metricspec
:
445 metrics
[key
] = value
.split()
446 for index
in range(0, 6):
447 if len(metrics
[key
]) <= index
:
448 metrics
[key
].append(None)
451 key
, spec
= key
.rsplit(".", 1)
452 if key
not in metrics
:
453 sys
.stderr
.write("Undeclared metric key %s.\n" % key
)
455 if spec
== "formula":
456 if self
.derived
== None:
457 self
.derived
= metrics
[key
][0] + "=" + value
459 self
.derived
+= "," + metrics
[key
][0] + "=" + value
461 metrics
[key
][self
.metricspec
.index(spec
)+1] = value
463 def prepare_metrics(self
):
464 """ Construct and prepare the initial metrics set """
465 # Get direct and/or sets of metrics from the command line
466 metrics
= self
.get_cmd_line_metrics()
470 sys
.stderr
.write("No metrics specified.\n")
471 raise pmapi
.pmUsageErr()
473 # Ugly, but we haven't read our command line via PMAPI yet
478 config
= ConfigParser
.SafeConfigParser()
479 config
.read(self
.config
)
481 # First read global metrics (if not disabled already)
482 globmet
= OrderedDict()
483 if self
.globals == 1:
484 if config
.has_section('global'):
485 parsemet
= OrderedDict()
486 for key
in config
.options('global'):
487 self
.parse_metric_info(parsemet
, key
, config
.get('global', key
))
488 for metric
in parsemet
:
489 name
= parsemet
[metric
][:1][0]
490 globmet
[name
] = parsemet
[metric
][1:]
491 if globmet
[name
][0] == None:
492 globmet
[name
][0] = metric
494 # Add command line and configuration file metric sets
495 tempmet
= OrderedDict()
496 for metric
in metrics
:
497 if metric
.startswith(":"):
498 tempmet
[metric
[1:]] = None
500 m
= metric
.split(",")
501 tempmet
[m
[0]] = m
[1:]
503 # Get config and set details for configuration file metric sets
504 confmet
= OrderedDict()
506 if tempmet
[spec
] == None:
507 if config
.has_section(spec
):
508 parsemet
= OrderedDict()
509 for key
in config
.options(spec
):
511 self
.set_attr(key
, config
.get(spec
, key
))
513 self
.parse_metric_info(parsemet
, key
, config
.get(spec
, key
))
514 for metric
in parsemet
:
515 name
= parsemet
[metric
][:1][0]
516 confmet
[name
] = parsemet
[metric
][1:]
517 if confmet
[name
][0] == None:
518 confmet
[name
][0] = metric
519 tempmet
[spec
] = confmet
521 raise IOError("Metric set definition '%s' not found." % metric
)
523 # Create the combined metrics set
524 if self
.globals == 1:
525 for metric
in globmet
:
526 self
.metrics
[metric
] = globmet
[metric
]
527 for metric
in tempmet
:
528 if type(tempmet
[metric
]) is list:
529 self
.metrics
[metric
] = tempmet
[metric
]
531 for m
in tempmet
[metric
]:
532 self
.metrics
[m
] = confmet
[m
]
534 def check_metric(self
, metric
):
535 """ Validate individual metric and get its details """
537 pmid
= self
.context
.pmLookupName(metric
)[0]
538 desc
= self
.context
.pmLookupDescs(pmid
)[0]
540 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
541 inst
= self
.context
.pmGetInDomArchive(desc
)
543 inst
= self
.context
.pmGetInDom(desc
) # disk.dev.read
545 inst
= ([PM_IN_NULL
], [None]) # pmcd.pmie.logfile
547 inst
= ([PM_IN_NULL
], [None]) # mem.util.free
548 # Reject unsupported types
549 mtype
= desc
.contents
.type
550 if not (mtype
== PM_TYPE_32
or
551 mtype
== PM_TYPE_U32
or
552 mtype
== PM_TYPE_64
or
553 mtype
== PM_TYPE_U64
or
554 mtype
== PM_TYPE_FLOAT
or
555 mtype
== PM_TYPE_DOUBLE
or
556 mtype
== PM_TYPE_STRING
):
557 raise pmapi
.pmErr(PM_ERR_TYPE
)
558 self
.pmids
.append(pmid
)
559 self
.descs
.append(desc
)
560 self
.insts
.append(inst
)
561 except pmapi
.pmErr
as error
:
562 sys
.stderr
.write("Invalid metric %s (%s).\n" % (metric
, str(error
)))
565 def validate_config(self
):
566 """ Validate configuration parameters """
567 if self
.version
!= VERSION
:
568 sys
.stderr
.write("Incompatible configuration file version (read v%s, need v%d).\n" % (self
.version
, VERSION
))
571 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
572 self
.source
= self
.opts
.pmGetOptionArchives()[0] # RHBZ#1262723
573 if self
.context
.type == PM_CONTEXT_HOST
:
574 self
.source
= self
.context
.pmGetContextHostName()
575 if self
.context
.type == PM_CONTEXT_LOCAL
:
576 self
.source
= "@" # PCPIntro(1), RHBZ#1272082
578 if self
.output
== OUTPUT_ARCHIVE
and not self
.archive
:
579 sys
.stderr
.write("Archive/folio must be defined with archive output.\n")
582 if self
.output
== OUTPUT_ZABBIX
and (not self
.zabbix_server
or \
583 not self
.zabbix_port
or not self
.zabbix_host
):
584 sys
.stderr
.write("zabbix_server, zabbix_port, and zabbix_host must be defined with Zabbix.\n")
587 # Runtime overrides samples/interval/endtime
588 if self
.runtime
!= -1:
589 self
.runtime
= int(pmapi
.timeval
.fromInterval(self
.runtime
))
590 if self
.opts
.pmGetOptionSamples():
591 self
.samples
= self
.opts
.pmGetOptionSamples()
594 self
.interval
= float(self
.runtime
) / (self
.samples
- 1)
595 self
.opts
.pmSetOptionInterval(str(self
.interval
))
596 self
.interval
= self
.opts
.pmGetOptionInterval()
598 self
.interval
= self
.opts
.pmGetOptionInterval()
599 self
.samples
= self
.runtime
/ int(self
.interval
) + 1
600 if int(self
.interval
) > self
.runtime
:
601 sys
.stderr
.write("Interval can't be longer than runtime.\n")
604 self
.samples
= self
.opts
.pmGetOptionSamples()
605 self
.interval
= self
.opts
.pmGetOptionInterval()
607 if self
.output
== OUTPUT_ZABBIX
:
608 if self
.zabbix_interval
:
609 self
.zabbix_interval
= int(pmapi
.timeval
.fromInterval(self
.zabbix_interval
))
610 if self
.zabbix_interval
< int(self
.interval
):
611 self
.zabbix_interval
= int(self
.interval
)
613 self
.zabbix_interval
= int(self
.interval
)
615 self
.can_scale
= "pmParseUnitsStr" in dir(self
.context
)
617 def validate_metrics(self
):
618 """ Validate the metrics set """
619 # Check the metrics against PMNS, resolve non-leaf metrics
621 if self
.derived
.startswith("/") or self
.derived
.startswith("."):
623 self
.context
.pmLoadDerivedConfig(self
.derived
)
624 except pmapi
.pmErr
as error
:
625 sys
.stderr
.write("Failed to register derived metric: %s.\n" % str(error
))
628 for definition
in self
.derived
.split(","):
631 name
, expr
= definition
.split("=")
632 self
.context
.pmRegisterDerived(name
.strip(), expr
.strip())
633 except ValueError as error
:
634 err
= "Invalid syntax (expected metric=expression)"
635 except Exception as error
:
636 #err = self.context.pmDerivedErrStr() # RHBZ#1286733
637 err
= "Unknown reason"
640 sys
.stderr
.write("Failed to register derived metric: %s.\n" % err
)
642 # Prepare for non-leaf metrics
643 metrics
= self
.metrics
644 self
.metrics
= OrderedDict()
645 for metric
in metrics
:
648 self
.context
.pmTraversePMNS(metric
, self
.check_metric
)
649 if len(self
.pmids
) == l
+ 1:
651 if metric
== self
.context
.pmNameID(self
.pmids
[l
]):
652 self
.metrics
[metric
] = metrics
[metric
]
654 # But handle single non-leaf case in an archive
655 self
.metrics
[self
.context
.pmNameID(self
.pmids
[l
])] = []
658 for i
in range(l
, len(self
.pmids
)):
659 name
= self
.context
.pmNameID(self
.pmids
[i
])
660 # We ignore specs like disk.dm,,,MB on purpose, for now
661 self
.metrics
[name
] = []
662 except pmapi
.pmErr
as error
:
663 sys
.stderr
.write("Invalid metric %s (%s).\n" % (metric
, str(error
)))
666 # Finalize the metrics set
667 for i
, metric
in enumerate(self
.metrics
):
668 # Fill in all fields for easier checking later
669 for index
in range(0, 5):
670 if len(self
.metrics
[metric
]) <= index
:
671 self
.metrics
[metric
].append(None)
674 if not self
.metrics
[metric
][0]:
675 # mem.util.free -> m.u.free
677 for m
in metric
.split("."):
679 self
.metrics
[metric
][0] = name
[:-2] + m
682 if self
.metrics
[metric
][3] == 'r' or self
.raw
== 1:
683 self
.metrics
[metric
][3] = 1
685 self
.metrics
[metric
][3] = 0
688 unitstr
= str(self
.descs
[i
].contents
.units
)
689 # Set default unit if not specified on per-metric basis
690 if not self
.metrics
[metric
][2]:
692 unit
= self
.descs
[i
].contents
.units
693 if self
.count_scale
and \
694 unit
.dimCount
== 1 and ( \
695 unit
.dimSpace
== 0 and
697 self
.metrics
[metric
][2] = self
.count_scale
699 if self
.space_scale
and \
700 unit
.dimSpace
== 1 and ( \
701 unit
.dimCount
== 0 and
703 self
.metrics
[metric
][2] = self
.space_scale
705 if self
.time_scale
and \
706 unit
.dimTime
== 1 and ( \
707 unit
.dimCount
== 0 and
709 self
.metrics
[metric
][2] = self
.time_scale
712 self
.metrics
[metric
][2] = unitstr
713 # Set unit/scale for non-raw numeric metrics
715 if self
.metrics
[metric
][3] == 0 and self
.can_scale
and \
716 self
.descs
[i
].contents
.type != PM_TYPE_STRING
:
717 (unitstr
, mult
) = self
.context
.pmParseUnitsStr(self
.metrics
[metric
][2])
718 label
= self
.metrics
[metric
][2]
719 if self
.descs
[i
].sem
== PM_SEM_COUNTER
:
721 if self
.descs
[i
].contents
.units
.dimTime
== 1:
723 self
.metrics
[metric
][2] = (label
, unitstr
, mult
)
725 self
.metrics
[metric
][2] = (unitstr
, unitstr
, 1)
726 except pmapi
.pmErr
as error
:
727 sys
.stderr
.write("%s: %s.\n" % (str(error
), self
.metrics
[metric
][2]))
730 # Set default width if not specified on per-metric basis
731 if self
.metrics
[metric
][4]:
732 self
.metrics
[metric
][4] = int(self
.metrics
[metric
][4])
733 elif self
.width
!= 0:
734 self
.metrics
[metric
][4] = self
.width
736 self
.metrics
[metric
][4] = len(self
.metrics
[metric
][0])
737 if self
.metrics
[metric
][4] < len(TRUNC
):
738 self
.metrics
[metric
][4] = len(TRUNC
) # Forced minimum
741 def pmids_to_ctypes(self
, pmids
):
742 """ Convert a Python list of pmids (numbers) to
743 a ctypes LP_c_uint (a C array of uints).
745 from ctypes
import c_uint
746 pmidA
= (c_uint
* len(pmids
))()
747 for i
, p
in enumerate(pmids
):
751 def get_mode_step(self
):
752 """ Get mode and step for pmSetMode """
753 if not self
.interpol
or self
.output
== OUTPUT_ARCHIVE
:
757 mode
= PM_MODE_INTERP
758 secs_in_24_days
= 2073600
759 if self
.interval
.tv_sec
> secs_in_24_days
:
760 step
= self
.interval
.tv_sec
761 mode |
= PM_XTB_SET(PM_TIME_SEC
)
763 step
= self
.interval
.tv_sec
*1000 + self
.interval
.tv_usec
/1000
764 mode |
= PM_XTB_SET(PM_TIME_MSEC
)
768 """ Using a PMAPI context (could be either host or archive),
769 fetch and report the requested set of values on stdout.
771 # Set output primitives
772 if self
.delimiter
== None:
773 if self
.output
== OUTPUT_CSV
:
774 self
.delimiter
= CSVSEP
776 self
.delimiter
= OUTSEP
779 if self
.opts
.pmGetOptionTimezone():
780 os
.environ
['TZ'] = self
.opts
.pmGetOptionTimezone()
782 self
.context
.pmNewZone(self
.opts
.pmGetOptionTimezone())
784 if self
.timefmt
== None:
785 if self
.output
== OUTPUT_CSV
:
786 self
.timefmt
= CSVTIME
788 self
.timefmt
= OUTTIME
792 if self
.context
.type != PM_CONTEXT_ARCHIVE
:
796 # DBG_TRACE_APPL1 == 4096
797 if "pmDebug" in dir(self
.context
) and self
.context
.pmDebug(4096):
798 print("Known config file keywords: " + str(self
.keys
))
799 print("Known metric spec keywords: " + str(self
.metricspec
))
801 # Printing format and headers
802 if self
.format
== None:
805 if self
.extheader
== 1:
807 self
.write_ext_header()
813 self
.repeat_header
= 0
819 # Archive recording from non-archive is handled separately
820 if self
.output
== OUTPUT_ARCHIVE
:
821 if self
.context
.type != PM_CONTEXT_ARCHIVE
:
822 self
.write_archive_pmgui()
825 # Archive fetching mode
826 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
827 (mode
, step
) = self
.get_mode_step()
828 self
.context
.pmSetMode(mode
, self
.opts
.pmGetOptionOrigin(), step
)
831 while self
.samples
!= 0:
832 if self
.output
== OUTPUT_STDOUT
:
833 if lines
> 1 and self
.repeat_header
== lines
:
839 result
= self
.context
.pmFetch(self
.pmids_to_ctypes(self
.pmids
))
840 except pmapi
.pmErr
as error
:
841 if error
.args
[0] == PM_ERR_EOL
:
845 self
.context
.pmSortInstances(result
) # XXX Is this really needed?
846 values
= self
.extract(result
)
847 if self
.ctstamp
== 0:
848 self
.ctstamp
= copy
.copy(result
.contents
.timestamp
)
849 self
.ptstamp
= self
.ctstamp
850 self
.ctstamp
= copy
.copy(result
.contents
.timestamp
)
852 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
853 if float(self
.ctstamp
) < float(self
.opts
.pmGetOptionStart()):
854 self
.context
.pmFreeResult(result
)
856 if float(self
.ctstamp
) > float(self
.opts
.pmGetOptionFinish()):
859 self
.report(self
.ctstamp
, values
)
860 self
.context
.pmFreeResult(result
)
861 if self
.samples
and self
.samples
> 0:
863 if self
.delay
and self
.interpol
and self
.samples
!= 0:
864 self
.context
.pmtimevalSleep(self
.interval
)
866 # Allow modules to flush buffered values / say goodbye
867 self
.report(None, None)
869 def extract(self
, result
):
870 """ Extract the metric values from pmResult structure """
871 # Metrics incl. all instance values, must match self.format on return
874 for i
, metric
in enumerate(self
.metrics
):
875 # Per-metric values incl. all instance values
876 # We use dict to make it easier to deal with gone/unknown instances
879 # Populate instance fields to have values for unavailable instances
880 # Values are (instance id, instance name, instance value)
881 for inst
in self
.insts
[i
][0]:
882 values
[i
][inst
] = (-1, None, NO_VAL
)
884 # No values available for this metric
885 if result
.contents
.get_numval(i
) == 0:
888 # Process all fetched instances
889 for j
in range(result
.contents
.get_numval(i
)):
890 inst
= result
.contents
.get_inst(i
, j
)
892 # Locate the correct instance and its position
894 if inst
not in self
.insts
[i
][0]:
895 # Ignore newly emerged instances
898 while inst
!= self
.insts
[i
][0][k
]:
901 # Extract and scale the value
903 # Use native type if no rescaling needed
904 if self
.metrics
[metric
][2][2] == 1 and \
905 str(self
.descs
[i
].contents
.units
) == \
906 str(self
.metrics
[metric
][2][1]):
908 vtype
= self
.descs
[i
].contents
.type
911 vtype
= PM_TYPE_DOUBLE
913 atom
= self
.context
.pmExtractValue(
914 result
.contents
.get_valfmt(i
),
915 result
.contents
.get_vlist(i
, j
),
916 self
.descs
[i
].contents
.type,
919 if self
.metrics
[metric
][3] != 1 and rescale
and \
920 self
.descs
[i
].contents
.type != PM_TYPE_STRING
and \
922 atom
= self
.context
.pmConvScale(
925 self
.metrics
[metric
][2][1])
927 val
= atom
.dref(vtype
)
928 if rescale
and self
.can_scale
and \
929 self
.descs
[i
].contents
.type != PM_TYPE_STRING
:
930 val
*= self
.metrics
[metric
][2][2]
931 val
= int(val
) if val
== int(val
) else val
934 values
[i
][inst
] = (inst
, self
.insts
[i
][1][k
], val
)
936 values
[i
][PM_IN_NULL
] = (-1, None, val
)
938 except pmapi
.pmErr
as error
:
939 sys
.stderr
.write("%s: %s, aborting.\n" % (metric
, str(error
)))
942 # Convert dicts to lists
945 vals
.append(v
.values())
948 # Store current and previous values
949 # Output modules need to handle non-existing self.prevvals
950 self
.prevvals
= self
.currvals
951 self
.currvals
= values
953 return values
# XXX Redundant now
955 def report(self
, tstamp
, values
):
956 """ Report the metric values """
958 tstamp
= datetime
.fromtimestamp(tstamp
).strftime(self
.timefmt
)
960 if self
.output
== OUTPUT_ARCHIVE
:
961 self
.write_archive_pmi(tstamp
, values
)
962 if self
.output
== OUTPUT_CSV
:
963 self
.write_csv(tstamp
, values
)
964 if self
.output
== OUTPUT_STDOUT
:
965 self
.write_stdout(tstamp
, values
)
966 if self
.output
== OUTPUT_ZABBIX
:
967 self
.write_zabbix(tstamp
, values
)
969 def define_format(self
):
970 """ Define stdout format string """
972 if self
.timestamp
== 0:
973 #self.format = "{:}{}"
974 self
.format
= "{0:}{1}"
977 tstamp
= datetime
.fromtimestamp(time
.time()).strftime(self
.timefmt
)
978 #self.format = "{:" + str(len(tstamp)) + "}{}"
979 self
.format
= "{" + str(index
) + ":" + str(len(tstamp
)) + "}"
981 self
.format
+= "{" + str(index
) + "}"
983 for i
, metric
in enumerate(self
.metrics
):
984 ins
= 1 if self
.insts
[i
][0][0] == PM_IN_NULL
else len(self
.insts
[i
][0])
986 l
= str(self
.metrics
[metric
][4])
987 #self.format += "{:>" + l + "." + l + "}{}"
988 self
.format
+= "{" + str(index
) + ":>" + l
+ "." + l
+ "}"
990 self
.format
+= "{" + str(index
) + "}"
992 #self.format = self.format[:-2]
993 l
= len(str(index
-1)) + 2
994 self
.format
= self
.format
[:-l
]
996 def write_ext_header(self
):
997 """ Write extended header """
998 comm
= "#" if self
.output
== OUTPUT_CSV
else ""
1000 if self
.runtime
!= -1:
1001 duration
= self
.runtime
1002 samples
= self
.samples
1005 duration
= (self
.samples
- 1) * int(self
.interval
)
1006 samples
= self
.samples
1007 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
1008 if not self
.interpol
:
1009 samples
= str(samples
) + " (requested)"
1011 duration
= int(float(self
.opts
.pmGetOptionFinish()) - float(self
.opts
.pmGetOptionStart()))
1012 samples
= (duration
/ int(self
.interval
)) + 1
1013 duration
= (samples
- 1) * int(self
.interval
)
1014 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
1015 if not self
.interpol
:
1017 endtime
= float(self
.opts
.pmGetOptionStart()) + duration
1019 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
1020 host
= self
.context
.pmGetArchiveLabel().hostname
1021 if not self
.interpol
and not self
.opts
.pmGetOptionFinish():
1022 endtime
= self
.context
.pmGetArchiveEnd()
1023 if self
.context
.type == PM_CONTEXT_HOST
:
1025 if self
.context
.type == PM_CONTEXT_LOCAL
:
1026 host
= "localhost, using DSO PMDAs"
1028 # Figure out the current timezone using the PCP convention
1029 if self
.opts
.pmGetOptionTimezone():
1030 currtz
= self
.opts
.pmGetOptionTimezone()
1032 dst
= time
.localtime().tm_isdst
1033 offset
= time
.altzone
if dst
else time
.timezone
1034 currtz
= time
.tzname
[dst
]
1036 currtz
+= str(offset
/3600)
1039 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
1040 if self
.context
.pmGetArchiveLabel().tz
!= timezone
:
1041 timezone
= self
.context
.pmGetArchiveLabel().tz
1042 timezone
+= " (creation, current is " + currtz
+ ")"
1045 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
1046 print(comm
+ " archive: " + self
.source
)
1047 print(comm
+ " host: " + host
)
1048 print(comm
+ " timezone: " + timezone
)
1049 print(comm
+ " start: " + time
.asctime(time
.localtime(self
.opts
.pmGetOptionStart())))
1050 print(comm
+ " end: " + time
.asctime(time
.localtime(endtime
)))
1051 print(comm
+ " samples: " + str(samples
))
1052 if not (self
.context
.type == PM_CONTEXT_ARCHIVE
and not self
.interpol
):
1053 print(comm
+ " interval: " + str(float(self
.interval
)) + " sec")
1054 print(comm
+ " duration: " + str(duration
) + " sec")
1056 print(comm
+ " interval: N/A")
1057 print(comm
+ " duration: N/A")
1060 def write_header(self
):
1061 """ Write metrics header """
1062 if self
.output
== OUTPUT_ARCHIVE
:
1063 sys
.stdout
.write("Recording archive/folio %s" % self
.archive
)
1064 if self
.runtime
!= -1:
1065 sys
.stdout
.write(":\n%s samples(s) with %.1f sec interval ~ %d sec duration.\n" % (self
.samples
, float(self
.interval
), self
.runtime
+ 1))
1067 duration
= (self
.samples
- 1) * int(self
.interval
)
1068 sys
.stdout
.write(":\n%s samples(s) with %.1f sec interval ~ %d sec duration.\n" % (self
.samples
, float(self
.interval
), duration
+ 1))
1070 if self
.context
.type != PM_CONTEXT_ARCHIVE
:
1071 sys
.stdout
.write("... (Ctrl-C to stop)")
1072 sys
.stdout
.write("\n")
1075 if self
.output
== OUTPUT_CSV
:
1076 header
= "metric" + self
.delimiter
1077 if self
.timestamp
== 1:
1078 header
+= "timestamp" + self
.delimiter
1079 for f
in "name", "unit", "value":
1080 header
+= f
+ self
.delimiter
1081 header
= header
[:-len(self
.delimiter
)]
1084 if self
.output
== OUTPUT_STDOUT
:
1085 names
= ["", self
.delimiter
] # no timestamp on header line
1086 insts
= ["", self
.delimiter
] # no timestamp on instances line
1087 units
= ["", self
.delimiter
] # no timestamp on units line
1089 for i
, metric
in enumerate(self
.metrics
):
1090 ins
= 1 if self
.insts
[i
][0][0] == PM_IN_NULL
else len(self
.insts
[i
][0])
1091 prnti
= 1 if self
.insts
[i
][0][0] != PM_IN_NULL
else 0
1092 for j
in range(ins
):
1093 names
.append(self
.metrics
[metric
][0])
1094 names
.append(self
.delimiter
)
1095 units
.append(self
.metrics
[metric
][2][0])
1096 units
.append(self
.delimiter
)
1098 insts
.append(self
.insts
[i
][1][j
])
1100 insts
.append(self
.delimiter
)
1101 insts
.append(self
.delimiter
)
1105 print(self
.format
.format(*tuple(names
)))
1107 print(self
.format
.format(*tuple(insts
)))
1109 print(self
.format
.format(*tuple(units
)))
1111 if self
.output
== OUTPUT_ZABBIX
:
1112 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
1115 self
.zabbix_interval
= 250 # See zabbix_sender(8)
1116 sys
.stdout
.write("Sending archived metrics to Zabbix server %s...\n(Ctrl-C to stop)\n" % self
.zabbix_server
)
1119 sys
.stdout
.write("Sending metrics to Zabbix server %s every %d sec" % (self
.zabbix_server
, self
.zabbix_interval
))
1120 if self
.runtime
!= -1:
1121 sys
.stdout
.write(":\n%s samples(s) with %.1f sec interval ~ %d sec runtime.\n" % (self
.samples
, float(self
.interval
), self
.runtime
))
1123 duration
= (self
.samples
- 1) * int(self
.interval
)
1124 sys
.stdout
.write(":\n%s samples(s) with %.1f sec interval ~ %d sec runtime.\n" % (self
.samples
, float(self
.interval
), duration
))
1126 sys
.stdout
.write("...\n(Ctrl-C to stop)\n")
1128 def write_archive_pmgui(self
):
1129 """ Write an archive entry using pmgui """
1130 # We're not a graphical app, disable popups
1131 os
.environ
['PCP_XCONFIRM_PROG'] = '/bin/true'
1133 # Create the archive folio using pmgui
1134 context
= pmgui
.GuiClient()
1135 config
= "log mandatory on every " + str(int(self
.interval
)) + " sec {\n"
1136 for metric
in self
.metrics
:
1137 config
+= metric
+ "\n"
1139 context
.pmRecordSetup(self
.archive
, ' '.join(sys
.argv
), 0)
1140 context
.pmRecordAddHost(self
.source
, 1, config
)
1142 if self
.runtime
!= -1:
1143 duration
= self
.runtime
1145 if self
.samples
< 2:
1147 duration
= (self
.samples
- 1) * int(self
.interval
)
1149 endtime
= "-T" + str(duration
) + "sec"
1150 context
.pmRecordControl(0, PM_REC_SETARG
, endtime
)
1151 context
.pmRecordControl(0, PM_REC_ON
, "")
1153 time
.sleep(0xFFFFFFFF) # A very long time
1154 context
.pmRecordControl(0, PM_REC_OFF
, "") # Non-mandatory
1156 for i
in range(duration
):
1157 sys
.stdout
.write("\rProgress: %3d%%" % int(float(i
) / duration
* 100))
1160 sys
.stdout
.write("\rProgress: 99%")
1162 time
.sleep(1) # Make sure the last record gets there
1163 # For cleanliness only, -T should have stopped recording by now
1164 context
.pmRecordControl(0, PM_REC_OFF
, "")
1165 sys
.stdout
.write("\rComplete: 100%.\n")
1167 def write_archive_pmi(self
, timestamp
, values
):
1168 """ Write an archive entry using pmi """
1169 if timestamp
== None and values
== None:
1170 # Complete and close
1174 if self
.log
== None:
1175 # Create a new archive
1176 self
.log
= pmi
.pmiLogImport(self
.archive
)
1177 self
.log
.pmiSetHostname(self
.context
.pmGetArchiveLabel().hostname
)
1178 self
.log
.pmiSetTimezone(self
.context
.pmGetArchiveLabel().tz
)
1179 for i
, metric
in enumerate(self
.metrics
):
1180 self
.log
.pmiAddMetric(metric
,
1182 self
.descs
[i
].contents
.type,
1183 self
.descs
[i
].contents
.indom
,
1184 self
.descs
[i
].contents
.sem
,
1185 self
.descs
[i
].contents
.units
)
1186 ins
= 0 if self
.insts
[i
][0][0] == PM_IN_NULL
else len(self
.insts
[i
][0])
1187 for j
in range(ins
):
1188 self
.log
.pmiAddInstance(self
.descs
[i
].contents
.indom
, self
.insts
[i
][1][j
], self
.insts
[i
][0][j
])
1190 # Add current values
1192 for i
, metric
in enumerate(self
.metrics
):
1193 ins
= 1 if self
.insts
[i
][0][0] == PM_IN_NULL
else len(self
.insts
[i
][0])
1194 for j
in range(ins
):
1195 if str(list(values
[i
])[j
][2]) != NO_VAL
:
1197 inst
= self
.insts
[i
][1][j
]
1198 if inst
== None: # RHBZ#1285371
1200 if self
.descs
[i
].contents
.type == PM_TYPE_STRING
:
1201 self
.log
.pmiPutValue(metric
, inst
, str(values
[i
][j
][2]))
1202 elif self
.descs
[i
].contents
.type == PM_TYPE_FLOAT
or \
1203 self
.descs
[i
].contents
.type == PM_TYPE_DOUBLE
:
1204 self
.log
.pmiPutValue(metric
, inst
, "%f" % values
[i
][j
][2])
1206 self
.log
.pmiPutValue(metric
, inst
, "%d" % values
[i
][j
][2])
1210 # pylint: disable=maybe-no-member
1211 self
.log
.pmiWrite(self
.ctstamp
.tv_sec
, self
.ctstamp
.tv_usec
)
1213 def write_csv(self
, timestamp
, values
):
1214 """ Write a line in CSV format """
1215 if timestamp
== None and values
== None:
1219 # CSV is always raw not rate
1220 for i
, metric
in enumerate(self
.metrics
):
1221 ins
= 1 if self
.insts
[i
][0][0] == PM_IN_NULL
else len(self
.insts
[i
][0])
1222 for j
in range(ins
):
1224 if self
.insts
[i
][1][j
]:
1225 line
+= "." + str(self
.insts
[i
][1][j
])
1226 line
+= self
.delimiter
1227 if self
.timestamp
== 1:
1228 line
+= timestamp
+ self
.delimiter
1229 line
+= str(self
.metrics
[metric
][0]) + self
.delimiter
1230 line
+= str(self
.metrics
[metric
][2][0]) + self
.delimiter
1231 line
+= str(list(values
[i
])[j
][2])
1234 def write_stdout(self
, timestamp
, values
):
1235 """ Write a line to stdout """
1236 if timestamp
== None and values
== None:
1240 #fmt = self.format.split("{}")
1241 fmt
= re
.split("{\\d+}", self
.format
)
1244 if self
.timestamp
== 0:
1247 line
.append(timestamp
)
1248 line
.append(self
.delimiter
)
1251 for i
, metric
in enumerate(self
.metrics
):
1252 l
= self
.metrics
[metric
][4]
1254 for j
in range(len(values
[i
])):
1258 if not self
.metrics
[metric
][3] and \
1259 (self
.prevvals
== None or self
.prevvals
[i
][j
][2] == NO_VAL
):
1260 # Rate not yet possible
1262 elif self
.metrics
[metric
][3] or \
1263 self
.descs
[i
].sem
!= PM_SEM_COUNTER
or \
1264 values
[i
][j
][2] == NO_VAL
:
1266 value
= list(values
[i
])[j
][2]
1270 if self
.descs
[i
].contents
.units
.dimTime
!= 0:
1271 if self
.descs
[i
].contents
.units
.scaleTime
> PM_TIME_SEC
:
1272 scale
= pow(60, (PM_TIME_SEC
- self
.descs
[i
].contents
.units
.scaleTime
))
1274 scale
= pow(1000, (PM_TIME_SEC
- self
.descs
[i
].contents
.units
.scaleTime
))
1275 delta
= scale
* (float(self
.ctstamp
) - float(self
.ptstamp
))
1276 value
= (values
[i
][j
][2] - self
.prevvals
[i
][j
][2]) / delta
if delta
else 0
1278 # Make sure the value fits
1279 if type(value
) is int or type(value
) is long:
1280 if len(str(value
)) > l
:
1283 #fmt[k] = "{:" + str(l) + "d}"
1284 fmt
[k
] = "{X:" + str(l
) + "d}"
1286 if type(value
) is float:
1288 s
= len(str(int(value
)))
1292 #for _ in reversed(range(c+1)):
1293 #t = "{:" + str(l) + "." + str(c) + "f}"
1294 for f
in reversed(range(c
+1)):
1295 r
= "{X:" + str(l
) + "." + str(c
) + "f}"
1296 t
= "{0:" + str(l
) + "." + str(c
) + "f}"
1297 if len(t
.format(value
)) > l
:
1305 line
.append(self
.delimiter
)
1308 #print('{}'.join(fmt).format(*tuple(line)))
1312 nfmt
+= f
.replace("{X:", "{" + str(index
) + ":")
1314 nfmt
+= "{" + str(index
) + "}"
1316 l
= len(str(index
-1)) + 2
1318 print(nfmt
.format(*tuple(line
)))
1320 def write_zabbix(self
, timestamp
, values
):
1321 """ Write (send) metrics to a Zabbix server """
1322 if timestamp
== None and values
== None:
1323 # Send any remaining buffered values
1324 if self
.zabbix_metrics
:
1325 send_to_zabbix(self
.zabbix_metrics
, self
.zabbix_server
, self
.zabbix_port
)
1326 self
.zabbix_metrics
= []
1329 # Zabbix is always raw not rate
1330 ts
= float(self
.ctstamp
)
1331 if self
.zabbix_prevsend
== None:
1332 self
.zabbix_prevsend
= ts
1333 for i
, metric
in enumerate(self
.metrics
):
1334 ins
= 1 if self
.insts
[i
][0][0] == PM_IN_NULL
else len(self
.insts
[i
][0])
1335 for j
in range(ins
):
1336 key
= ZBXPRFX
+ metric
1337 if self
.insts
[i
][1][j
]:
1338 key
+= "[" + str(self
.insts
[i
][1][j
]) + "]"
1339 val
= str(list(values
[i
])[j
][2])
1340 self
.zabbix_metrics
.append(ZabbixMetric(self
.zabbix_host
, key
, val
, ts
))
1342 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
1343 if len(self
.zabbix_metrics
) >= self
.zabbix_interval
:
1344 send_to_zabbix(self
.zabbix_metrics
, self
.zabbix_server
, self
.zabbix_port
)
1345 self
.zabbix_metrics
= []
1346 elif ts
- self
.zabbix_prevsend
> self
.zabbix_interval
:
1347 send_to_zabbix(self
.zabbix_metrics
, self
.zabbix_server
, self
.zabbix_port
)
1348 self
.zabbix_metrics
= []
1349 self
.zabbix_prevsend
= ts
1352 """ Establish a PMAPI context to archive, host or local, via args """
1353 self
.context
= pmapi
.pmContext
.fromOptions(self
.opts
, sys
.argv
)
1355 if __name__
== '__main__':
1362 P
.validate_metrics()
1365 except pmapi
.pmErr
as error
:
1366 sys
.stderr
.write('%s: %s\n' % (error
.progname(), error
.message()))
1367 except pmapi
.pmUsageErr
as usage
:
1369 except IOError as error
:
1370 sys
.stderr
.write("%s\n" % str(error
))
1371 except KeyboardInterrupt:
1372 sys
.stdout
.write("\n")