3 # Copyright (C) 2015 Marko Myllynen <myllynen@redhat.com>
5 # This program is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the
7 # Free Software Foundation; either version 2 of the License, or (at your
8 # option) any later version.
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
15 # [zbxsend] Copyright (C) 2014 Sergey Kirillov <sergey.kirillov@gmail.com>
16 # All rights reserved.
18 # Redistribution and use in source and binary forms, with or without
19 # modification, are permitted provided that the following conditions
22 # 1. Redistributions of source code must retain the above copyright
23 # notice, this list of conditions and the following disclaimer.
25 # 2. Redistributions in binary form must reproduce the above copyright
26 # notice, this list of conditions and the following disclaimer in the
27 # documentation and/or other materials provided with the distribution.
29 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
30 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
31 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
32 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
33 # HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
34 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
35 # TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
36 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
39 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 # pylint: disable=fixme, line-too-long, bad-whitespace, invalid-name
42 # pylint: disable=superfluous-parens
43 """ Performance Metrics Reporter """
45 from collections
import OrderedDict
46 from datetime
import datetime
50 import configparser
as ConfigParser
54 import simplejson
as json
63 from pcp
import pmapi
, pmi
64 from cpmapi
import PM_CONTEXT_ARCHIVE
, PM_CONTEXT_HOST
, PM_CONTEXT_LOCAL
, PM_MODE_FORW
, PM_MODE_INTERP
, PM_ERR_TYPE
, PM_ERR_EOL
, PM_ERR_NAME
, PM_IN_NULL
, PM_SEM_COUNTER
, PM_TIME_MSEC
, PM_TIME_SEC
, PM_XTB_SET
65 from cpmapi
import PM_TYPE_32
, PM_TYPE_U32
, PM_TYPE_64
, PM_TYPE_U64
, PM_TYPE_FLOAT
, PM_TYPE_DOUBLE
, PM_TYPE_STRING
66 from cpmi
import PMI_ERR_DUPINSTNAME
68 if sys
.version_info
[0] >= 3:
72 DEFAULT_CONFIG
= "./pmrep.conf"
74 # Default field separators, config/time formats, missing/truncated values
76 CSVTIME
= "%Y-%m-%d %H:%M:%S"
86 OUTPUT_ARCHIVE
= "archive"
88 OUTPUT_STDOUT
= "stdout"
89 OUTPUT_ZABBIX
= "zabbix"
91 class ZabbixMetric(object):
92 """ A Zabbix metric """
93 def __init__(self
, host
, key
, value
, clock
):
100 return 'Metric(%r, %r, %r, %r)' % (self
.host
, self
.key
, self
.value
, self
.clock
)
102 def recv_from_zabbix(sock
, count
):
103 """ Receive a response from a Zabbix server. """
105 while len(buf
) < count
:
106 chunk
= sock
.recv(count
- len(buf
))
112 def send_to_zabbix(metrics
, zabbix_host
, zabbix_port
, timeout
=15):
113 """ Send a set of metrics to a Zabbix server. """
116 # Zabbix has a very fragile JSON parser, so we cannot use json to
117 # dump the whole packet
120 clock
= m
.clock
or time
.time()
121 metrics_data
.append(('\t\t{\n'
124 '\t\t\t"value":%s,\n'
125 '\t\t\t"clock":%.5f}') % (j(m
.host
), j(m
.key
), j(m
.value
), clock
))
127 '\t"request":"sender data",\n'
129 '}') % (',\n'.join(metrics_data
))
131 data_len
= struct
.pack('<Q', len(json_data
))
132 packet
= b
'ZBXD\1' + data_len
+ json_data
.encode('utf-8')
134 zabbix
= socket
.socket()
135 zabbix
.connect((zabbix_host
, zabbix_port
))
136 zabbix
.settimeout(timeout
)
137 # send metrics to zabbix
138 zabbix
.sendall(packet
)
139 # get response header from zabbix
140 resp_hdr
= recv_from_zabbix(zabbix
, 13)
141 if not bytes
.decode(resp_hdr
).startswith('ZBXD\1') or len(resp_hdr
) != 13:
142 # debug: write('Invalid Zabbix response len=%d' % len(resp_hdr))
144 resp_body_len
= struct
.unpack('<Q', resp_hdr
[5:])[0]
145 # get response body from zabbix
146 resp_body
= zabbix
.recv(resp_body_len
)
147 resp
= json
.loads(bytes
.decode(resp_body
))
148 # debug: write('Got response from Zabbix: %s' % resp)
149 if resp
.get('response') != 'success':
150 sys
.stderr
.write('Error response from Zabbix: %s', resp
)
153 except socket
.timeout
as err
:
154 sys
.stderr
.write("Zabbix connection timed out: " + str(err
))
159 class PMReporter(object):
160 """ Report PCP metrics """
163 """ Construct object, prepare for command line handling """
166 self
.format
= None # output format
167 self
.opts
= self
.options()
168 pmapi
.c_api
.pmSetOptionFlags(pmapi
.c_api
.PM_OPTFLAG_POSIX
) # RHBZ#1289912
170 # Configuration directives
171 self
.keys
= ('source', 'output', 'derived', 'header', 'unitinfo',
172 'globals', 'timestamp', 'samples', 'interval',
173 'delay', 'type', 'width', 'precision', 'delimiter',
174 'extheader', 'repeat_header', 'timefmt', 'interpol',
175 'count_scale', 'space_scale', 'time_scale', 'version',
176 'zabbix_server', 'zabbix_port', 'zabbix_host', 'zabbix_interval')
178 # Special command line switches
179 self
.argless
= ('-C', '--check', '-L', '--local-PMDA', '-H', '--no-header', '-U', '--no-unit-info', '-G', '--no-globals', '-p', '--timestamps', '-d', '--delay', '-r', '--raw', '-x', '--extended-header', '-u', '--no-interpol', '-z', '--hostzone')
180 self
.arghelp
= ('-?', '--help', '-V', '--version')
182 # The order of preference for parameters (as present):
183 # 1 - command line parameters
184 # 2 - parameters from configuration file(s)
185 # 3 - built-in defaults defined below
186 self
.config
= self
.set_config_file()
187 self
.version
= VERSION
188 self
.source
= "local:"
189 self
.output
= OUTPUT_STDOUT
198 self
.samples
= None # forever
199 self
.interval
= pmapi
.timeval(1) # 1 sec
200 self
.opts
.pmSetOptionInterval(str(1))
205 self
.precision
= 3 # .3f
206 self
.delimiter
= None
208 self
.repeat_header
= 0
211 self
.count_scale
= None
212 self
.space_scale
= None
213 self
.time_scale
= None
214 self
.can_scale
= None # PCP 3.9 compat
216 # Performance metrics store
218 # values - 0:label, 1:instance(s), 2:unit/scale, 3:type, 4:width
219 self
.metrics
= OrderedDict()
221 # Corresponding config file metric specifiers
222 self
.metricspec
= ('label', 'instance', 'unit', 'type', 'width', 'formula')
233 self
.zabbix_server
= None
234 self
.zabbix_port
= ZBXPORT
235 self
.zabbix_host
= None
236 self
.zabbix_interval
= None
237 self
.zabbix_prevsend
= None
238 self
.zabbix_metrics
= []
240 def set_config_file(self
):
241 """ Set configuration file """
242 config
= DEFAULT_CONFIG
244 # Possibly override the built-in default config file before
245 # parsing the rest of the command line parameters
246 args
= iter(sys
.argv
[1:])
248 if arg
in self
.arghelp
:
250 if arg
== '-c' or arg
== '--config':
253 if not os
.path
.isfile(config
) or not os
.access(config
, os
.R_OK
):
254 raise IOError("Failed to read configuration file '%s'." % config
)
255 except StopIteration:
259 def set_attr(self
, name
, value
):
260 """ Helper to apply config file settings properly """
261 if value
in ('true', 'True', 'y', 'yes', 'Yes'):
263 if value
in ('false', 'False', 'n', 'no', 'No'):
266 try: # RHBZ#1270176 / PCP < 3.10.8
268 self
.opts
.pmSetOptionArchive(value
)
270 self
.opts
.pmSetOptionHost(value
) # RHBZ#1289911
272 sys
.stderr
.write("PCP 3.10.8 or later required for the 'source' directive.\n")
274 elif name
== 'samples':
275 self
.opts
.pmSetOptionSamples(value
)
276 self
.samples
= self
.opts
.pmGetOptionSamples()
277 elif name
== 'interval':
278 self
.opts
.pmSetOptionInterval(value
)
279 self
.interval
= self
.opts
.pmGetOptionInterval()
287 setattr(self
, name
, int(value
))
289 setattr(self
, name
, value
)
291 def read_config(self
):
292 """ Read options from configuration file """
293 if self
.config
is None:
295 config
= ConfigParser
.SafeConfigParser()
296 config
.read(self
.config
)
297 if not config
.has_section('options'):
299 for opt
in config
.options('options'):
301 self
.set_attr(opt
, config
.get('options', opt
))
303 sys
.stderr
.write("Invalid directive '%s' in %s.\n" % (opt
, self
.config
))
307 """ Setup default command line argument option handling """
308 opts
= pmapi
.pmOptions()
309 opts
.pmSetOptionCallback(self
.option
)
310 opts
.pmSetOverrideCallback(self
.option_override
)
311 opts
.pmSetShortOptions("a:h:LK:c:Co:F:e:D:V?HUGpA:S:T:O:s:t:Z:zdrw:P:l:xE:f:uq:b:y:")
312 opts
.pmSetShortUsage("[option...] metricspec [...]")
314 opts
.pmSetLongOptionHeader("General options")
315 opts
.pmSetLongOptionArchive() # -a/--archive
316 opts
.pmSetLongOptionArchiveFolio() # --archive-folio
317 opts
.pmSetLongOptionHost() # -h/--host
318 opts
.pmSetLongOptionLocalPMDA() # -L/--local-PMDA
319 opts
.pmSetLongOptionSpecLocal() # -K/--spec-local
320 opts
.pmSetLongOption("config", 1, "c", "FILE", "config file path")
321 opts
.pmSetLongOption("check", 0, "C", "", "check config and metrics and exit")
322 opts
.pmSetLongOption("output", 1, "o", "OUTPUT", "output target: archive, csv, stdout (default), or zabbix")
323 opts
.pmSetLongOption("output-file", 1, "F", "OUTFILE", "output file")
324 opts
.pmSetLongOption("derived", 1, "e", "FILE|DFNT", "derived metrics definitions")
325 #opts.pmSetLongOptionGuiMode() # -g/--guimode # RHBZ#1289910
326 opts
.pmSetLongOptionDebug() # -D/--debug
327 opts
.pmSetLongOptionVersion() # -V/--version
328 opts
.pmSetLongOptionHelp() # -?/--help
330 opts
.pmSetLongOptionHeader("Reporting options")
331 opts
.pmSetLongOption("no-header", 0, "H", "", "omit headers")
332 opts
.pmSetLongOption("no-unit-info", 0, "U", "", "omit unit info from headers")
333 opts
.pmSetLongOption("no-globals", 0, "G", "", "omit global metrics")
334 opts
.pmSetLongOption("timestamps", 0, "p", "", "print timestamps")
335 opts
.pmSetLongOptionAlign() # -A/--align
336 opts
.pmSetLongOptionStart() # -S/--start
337 opts
.pmSetLongOptionFinish() # -T/--finish
338 opts
.pmSetLongOptionOrigin() # -O/--origin
339 opts
.pmSetLongOptionSamples() # -s/--samples
340 opts
.pmSetLongOptionInterval() # -t/--interval
341 opts
.pmSetLongOptionTimeZone() # -Z/--timezone
342 opts
.pmSetLongOptionHostZone() # -z/--hostzone
343 opts
.pmSetLongOption("delay", 0, "d", "", "delay, pause between updates for archive replay")
344 opts
.pmSetLongOption("raw", 0, "r", "", "output raw counter values (no rate conversion)")
345 opts
.pmSetLongOption("width", 1, "w", "N", "default column width")
346 opts
.pmSetLongOption("precision", 1, "P", "N", "N digits after the decimal separator (if width enough)")
347 opts
.pmSetLongOption("delimiter", 1, "l", "STR", "delimiter to separate csv/stdout columns")
348 opts
.pmSetLongOption("extended-header", 0, "x", "", "display extended header")
349 opts
.pmSetLongOption("repeat-header", 1, "E", "N", "repeat stdout headers every N lines")
350 opts
.pmSetLongOption("timestamp-format", 1, "f", "STR", "strftime string for timestamp format")
351 opts
.pmSetLongOption("no-interpol", 0, "u", "", "disable interpolation mode with archives")
352 opts
.pmSetLongOption("count-scale", 1, "q", "SCALE", "default count unit")
353 opts
.pmSetLongOption("space-scale", 1, "b", "SCALE", "default space unit")
354 opts
.pmSetLongOption("time-scale", 1, "y", "SCALE", "default time unit")
358 def option_override(self
, opt
):
359 """ Override a few standard PCP options """
360 if opt
== 'H' or opt
== 'p':
364 def option(self
, opt
, optarg
, index
):
365 """ Perform setup for an individual command line option """
371 if optarg
== OUTPUT_ARCHIVE
:
372 self
.output
= OUTPUT_ARCHIVE
373 elif optarg
== OUTPUT_CSV
:
374 self
.output
= OUTPUT_CSV
375 elif optarg
== OUTPUT_STDOUT
:
376 self
.output
= OUTPUT_STDOUT
377 elif optarg
== OUTPUT_ZABBIX
:
378 self
.output
= OUTPUT_ZABBIX
380 sys
.stderr
.write("Invalid output target %s specified.\n" % optarg
)
383 if os
.path
.exists(optarg
+ ".index"):
384 sys
.stderr
.write("Archive %s already exists.\n" % optarg
)
386 if os
.path
.exists(optarg
):
387 sys
.stderr
.write("File %s already exists.\n" % optarg
)
389 self
.outfile
= optarg
391 self
.derived
= optarg
405 self
.width
= int(optarg
)
407 self
.precision
= int(optarg
)
409 self
.delimiter
= optarg
413 self
.repeat_header
= int(optarg
)
415 self
.timefmt
= optarg
419 self
.count_scale
= optarg
421 self
.space_scale
= optarg
423 self
.time_scale
= optarg
425 raise pmapi
.pmUsageErr()
427 def get_cmd_line_metrics(self
):
428 """ Get metric set specifications from the command line """
429 for arg
in sys
.argv
[1:]:
430 if arg
in self
.arghelp
:
433 for arg
in reversed(sys
.argv
[1:]):
434 if arg
.startswith('-'):
436 if arg
not in self
.argless
and '=' not in arg
:
443 def parse_metric_info(self
, metrics
, key
, value
):
444 """ Parse metric information """
445 # NB. Uses the config key, not the metric, as the dict key
447 # Compact / one-line definition
448 metrics
[key
] = (key
+ "," + value
).split(",")
450 # Verbose / multi-line definition
451 if not '.' in key
or key
.rsplit(".", 1)[1] not in self
.metricspec
:
453 metrics
[key
] = value
.split()
454 for index
in range(0, 6):
455 if len(metrics
[key
]) <= index
:
456 metrics
[key
].append(None)
459 key
, spec
= key
.rsplit(".", 1)
460 if key
not in metrics
:
461 sys
.stderr
.write("Undeclared metric key %s.\n" % key
)
463 if spec
== "formula":
464 if self
.derived
== None:
465 self
.derived
= metrics
[key
][0] + "=" + value
467 self
.derived
+= "," + metrics
[key
][0] + "=" + value
469 metrics
[key
][self
.metricspec
.index(spec
)+1] = value
471 def prepare_metrics(self
):
472 """ Construct and prepare the initial metrics set """
473 # Get direct and/or sets of metrics from the command line
474 metrics
= self
.get_cmd_line_metrics()
478 sys
.stderr
.write("No metrics specified.\n")
479 raise pmapi
.pmUsageErr()
481 # Don't rely on what get_cmd_line_metrics() might do
486 config
= ConfigParser
.SafeConfigParser()
487 config
.read(self
.config
)
489 # First read global metrics (if not disabled already)
490 globmet
= OrderedDict()
491 if self
.globals == 1:
492 if config
.has_section('global'):
493 parsemet
= OrderedDict()
494 for key
in config
.options('global'):
495 self
.parse_metric_info(parsemet
, key
, config
.get('global', key
))
496 for metric
in parsemet
:
497 name
= parsemet
[metric
][:1][0]
498 globmet
[name
] = parsemet
[metric
][1:]
500 # Add command line and configuration file metric sets
501 tempmet
= OrderedDict()
502 for metric
in metrics
:
503 if metric
.startswith(":"):
504 tempmet
[metric
[1:]] = None
506 m
= metric
.split(",")
507 tempmet
[m
[0]] = m
[1:]
509 # Get config and set details for configuration file metric sets
510 confmet
= OrderedDict()
512 if tempmet
[spec
] == None:
513 if config
.has_section(spec
):
514 parsemet
= OrderedDict()
515 for key
in config
.options(spec
):
517 self
.set_attr(key
, config
.get(spec
, key
))
519 self
.parse_metric_info(parsemet
, key
, config
.get(spec
, key
))
520 for metric
in parsemet
:
521 name
= parsemet
[metric
][:1][0]
522 confmet
[name
] = parsemet
[metric
][1:]
523 tempmet
[spec
] = confmet
525 raise IOError("Metric set definition '%s' not found." % metric
)
527 # Create the combined metrics set
528 if self
.globals == 1:
529 for metric
in globmet
:
530 self
.metrics
[metric
] = globmet
[metric
]
531 for metric
in tempmet
:
532 if type(tempmet
[metric
]) is list:
533 self
.metrics
[metric
] = tempmet
[metric
]
535 for m
in tempmet
[metric
]:
536 self
.metrics
[m
] = confmet
[m
]
538 def check_metric(self
, metric
):
539 """ Validate individual metric and get its details """
541 pmid
= self
.context
.pmLookupName(metric
)[0]
542 desc
= self
.context
.pmLookupDescs(pmid
)[0]
544 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
545 inst
= self
.context
.pmGetInDomArchive(desc
)
547 inst
= self
.context
.pmGetInDom(desc
) # disk.dev.read
549 inst
= ([PM_IN_NULL
], [None]) # pmcd.pmie.logfile
551 inst
= ([PM_IN_NULL
], [None]) # mem.util.free
552 # Reject unsupported types
553 mtype
= desc
.contents
.type
554 if not (mtype
== PM_TYPE_32
or
555 mtype
== PM_TYPE_U32
or
556 mtype
== PM_TYPE_64
or
557 mtype
== PM_TYPE_U64
or
558 mtype
== PM_TYPE_FLOAT
or
559 mtype
== PM_TYPE_DOUBLE
or
560 mtype
== PM_TYPE_STRING
):
561 raise pmapi
.pmErr(PM_ERR_TYPE
)
562 self
.pmids
.append(pmid
)
563 self
.descs
.append(desc
)
564 self
.insts
.append(inst
)
565 except pmapi
.pmErr
as error
:
566 sys
.stderr
.write("Invalid metric %s (%s).\n" % (metric
, str(error
)))
569 def validate_config(self
):
570 """ Validate configuration parameters """
571 if self
.version
!= VERSION
:
572 sys
.stderr
.write("Incompatible configuration file version (read v%s, need v%d).\n" % (self
.version
, VERSION
))
575 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
576 self
.source
= self
.opts
.pmGetOptionArchives()[0] # RHBZ#1262723
577 if self
.context
.type == PM_CONTEXT_HOST
:
578 self
.source
= self
.context
.pmGetContextHostName()
579 if self
.context
.type == PM_CONTEXT_LOCAL
:
580 self
.source
= "@" # PCPIntro(1), RHBZ#1289911
582 if self
.output
== OUTPUT_ARCHIVE
and not self
.outfile
:
583 sys
.stderr
.write("Archive must be defined with archive output.\n")
586 if self
.output
== OUTPUT_ZABBIX
and (not self
.zabbix_server
or \
587 not self
.zabbix_port
or not self
.zabbix_host
):
588 sys
.stderr
.write("zabbix_server, zabbix_port, and zabbix_host must be defined with Zabbix.\n")
591 # Runtime overrides samples/interval
592 if self
.opts
.pmGetOptionFinishOptarg():
593 self
.runtime
= int(float(self
.opts
.pmGetOptionFinish()) - float(self
.opts
.pmGetOptionStart()))
594 if self
.opts
.pmGetOptionSamples():
595 self
.samples
= self
.opts
.pmGetOptionSamples()
598 self
.interval
= float(self
.runtime
) / (self
.samples
- 1)
599 self
.opts
.pmSetOptionInterval(str(self
.interval
))
600 self
.interval
= self
.opts
.pmGetOptionInterval()
602 self
.interval
= self
.opts
.pmGetOptionInterval()
603 if int(self
.interval
) == 0:
604 sys
.stderr
.write("Interval can't be less than 1 second.\n")
606 self
.samples
= self
.runtime
/ int(self
.interval
) + 1
607 if int(self
.interval
) > self
.runtime
:
608 sys
.stderr
.write("Interval can't be longer than runtime.\n")
611 self
.samples
= self
.opts
.pmGetOptionSamples()
612 self
.interval
= self
.opts
.pmGetOptionInterval()
614 if self
.output
== OUTPUT_ZABBIX
:
615 if self
.zabbix_interval
:
616 self
.zabbix_interval
= int(pmapi
.timeval
.fromInterval(self
.zabbix_interval
))
617 if self
.zabbix_interval
< int(self
.interval
):
618 self
.zabbix_interval
= int(self
.interval
)
620 self
.zabbix_interval
= int(self
.interval
)
622 self
.can_scale
= "pmParseUnitsStr" in dir(self
.context
)
624 def validate_metrics(self
):
625 """ Validate the metrics set """
626 # Check the metrics against PMNS, resolve non-leaf metrics
628 if self
.derived
.startswith("/") or self
.derived
.startswith("."):
630 self
.context
.pmLoadDerivedConfig(self
.derived
)
631 except pmapi
.pmErr
as error
:
632 sys
.stderr
.write("Failed to register derived metric: %s.\n" % str(error
))
635 for definition
in self
.derived
.split(","):
638 name
, expr
= definition
.split("=")
639 self
.context
.pmLookupName(name
.strip())
640 except pmapi
.pmErr
as error
:
641 if error
.args
[0] == PM_ERR_NAME
:
642 self
.context
.pmRegisterDerived(name
.strip(), expr
.strip())
644 err
= error
.message()
645 except ValueError as error
:
646 err
= "Invalid syntax (expected metric=expression)"
647 except Exception as error
:
648 #err = self.context.pmDerivedErrStr() # RHBZ#1286733
649 err
= "Unknown reason"
652 sys
.stderr
.write("Failed to register derived metric: %s.\n" % err
)
654 # Prepare for non-leaf metrics
655 metrics
= self
.metrics
656 self
.metrics
= OrderedDict()
657 for metric
in metrics
:
660 self
.context
.pmTraversePMNS(metric
, self
.check_metric
)
661 if len(self
.pmids
) == l
+ 1:
663 if metric
== self
.context
.pmNameID(self
.pmids
[l
]):
664 self
.metrics
[metric
] = metrics
[metric
]
666 # But handle single non-leaf case in an archive
667 self
.metrics
[self
.context
.pmNameID(self
.pmids
[l
])] = []
670 for i
in range(l
, len(self
.pmids
)):
671 name
= self
.context
.pmNameID(self
.pmids
[i
])
672 # We ignore specs like disk.dm,,,MB on purpose, for now
673 self
.metrics
[name
] = []
674 except pmapi
.pmErr
as error
:
675 sys
.stderr
.write("Invalid metric %s (%s).\n" % (metric
, str(error
)))
678 # Finalize the metrics set
679 for i
, metric
in enumerate(self
.metrics
):
680 # Fill in all fields for easier checking later
681 for index
in range(0, 5):
682 if len(self
.metrics
[metric
]) <= index
:
683 self
.metrics
[metric
].append(None)
686 if not self
.metrics
[metric
][0]:
687 # mem.util.free -> m.u.free
689 for m
in metric
.split("."):
691 self
.metrics
[metric
][0] = name
[:-2] + m
694 if self
.metrics
[metric
][3] == 'raw' or self
.type == 1:
695 self
.metrics
[metric
][3] = 1
697 self
.metrics
[metric
][3] = 0
700 unitstr
= str(self
.descs
[i
].contents
.units
)
701 # Set default unit if not specified on per-metric basis
702 if not self
.metrics
[metric
][2]:
704 unit
= self
.descs
[i
].contents
.units
705 if self
.count_scale
and \
706 unit
.dimCount
== 1 and ( \
707 unit
.dimSpace
== 0 and
709 self
.metrics
[metric
][2] = self
.count_scale
711 if self
.space_scale
and \
712 unit
.dimSpace
== 1 and ( \
713 unit
.dimCount
== 0 and
715 self
.metrics
[metric
][2] = self
.space_scale
717 if self
.time_scale
and \
718 unit
.dimTime
== 1 and ( \
719 unit
.dimCount
== 0 and
721 self
.metrics
[metric
][2] = self
.time_scale
724 self
.metrics
[metric
][2] = unitstr
725 # Set unit/scale for non-raw numeric metrics
727 if self
.metrics
[metric
][3] == 0 and self
.can_scale
and \
728 self
.descs
[i
].contents
.type != PM_TYPE_STRING
:
729 (unitstr
, mult
) = self
.context
.pmParseUnitsStr(self
.metrics
[metric
][2])
730 label
= self
.metrics
[metric
][2]
731 if self
.descs
[i
].sem
== PM_SEM_COUNTER
:
733 if self
.descs
[i
].contents
.units
.dimTime
== 1:
735 self
.metrics
[metric
][2] = (label
, unitstr
, mult
)
737 self
.metrics
[metric
][2] = (unitstr
, unitstr
, 1)
738 except pmapi
.pmErr
as error
:
739 sys
.stderr
.write("%s: %s.\n" % (str(error
), self
.metrics
[metric
][2]))
742 # Set default width if not specified on per-metric basis
743 if self
.metrics
[metric
][4]:
744 self
.metrics
[metric
][4] = int(self
.metrics
[metric
][4])
745 elif self
.width
!= 0:
746 self
.metrics
[metric
][4] = self
.width
748 self
.metrics
[metric
][4] = len(self
.metrics
[metric
][0])
749 if self
.metrics
[metric
][4] < len(TRUNC
):
750 self
.metrics
[metric
][4] = len(TRUNC
) # Forced minimum
753 def pmids_to_ctypes(self
, pmids
):
754 """ Convert a Python list of pmids (numbers) to
755 a ctypes LP_c_uint (a C array of uints).
757 from ctypes
import c_uint
758 pmidA
= (c_uint
* len(pmids
))()
759 for i
, p
in enumerate(pmids
):
763 def get_mode_step(self
):
764 """ Get mode and step for pmSetMode """
765 if not self
.interpol
or self
.output
== OUTPUT_ARCHIVE
:
769 mode
= PM_MODE_INTERP
770 secs_in_24_days
= 2073600
771 if self
.interval
.tv_sec
> secs_in_24_days
:
772 step
= self
.interval
.tv_sec
773 mode |
= PM_XTB_SET(PM_TIME_SEC
)
775 step
= self
.interval
.tv_sec
*1000 + self
.interval
.tv_usec
/1000
776 mode |
= PM_XTB_SET(PM_TIME_MSEC
)
777 return (mode
, int(step
))
780 """ Using a PMAPI context (could be either host or archive),
781 fetch and report the requested set of values on stdout.
783 # Set output primitives
784 if self
.delimiter
== None:
785 if self
.output
== OUTPUT_CSV
:
786 self
.delimiter
= CSVSEP
788 self
.delimiter
= OUTSEP
791 if self
.opts
.pmGetOptionTimezone():
792 os
.environ
['TZ'] = self
.opts
.pmGetOptionTimezone()
794 self
.context
.pmNewZone(self
.opts
.pmGetOptionTimezone())
796 if self
.timefmt
== None:
797 if self
.output
== OUTPUT_CSV
:
798 self
.timefmt
= CSVTIME
800 self
.timefmt
= OUTTIME
804 if self
.context
.type != PM_CONTEXT_ARCHIVE
:
809 self
.prepare_writer()
810 if self
.output
== OUTPUT_STDOUT
:
811 self
.prepare_stdout()
813 # DBG_TRACE_APPL1 == 4096
814 if "pmDebug" in dir(self
.context
) and self
.context
.pmDebug(4096):
815 self
.writer
.write("Known config file keywords: " + str(self
.keys
) + "\n")
816 self
.writer
.write("Known metric spec keywords: " + str(self
.metricspec
) + "\n")
819 if self
.extheader
== 1:
821 self
.write_ext_header()
827 self
.repeat_header
= 0
833 # Archive fetching mode
834 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
835 (mode
, step
) = self
.get_mode_step()
836 self
.context
.pmSetMode(mode
, self
.opts
.pmGetOptionOrigin(), step
)
839 while self
.samples
!= 0:
840 if self
.output
== OUTPUT_STDOUT
:
841 if lines
> 1 and self
.repeat_header
== lines
:
847 result
= self
.context
.pmFetch(self
.pmids_to_ctypes(self
.pmids
))
848 except pmapi
.pmErr
as error
:
849 if error
.args
[0] == PM_ERR_EOL
:
853 self
.context
.pmSortInstances(result
) # XXX Is this really needed?
854 values
= self
.extract(result
)
855 if self
.ctstamp
== 0:
856 self
.ctstamp
= copy
.copy(result
.contents
.timestamp
)
857 self
.ptstamp
= self
.ctstamp
858 self
.ctstamp
= copy
.copy(result
.contents
.timestamp
)
860 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
861 if float(self
.ctstamp
) < float(self
.opts
.pmGetOptionStart()):
862 self
.context
.pmFreeResult(result
)
864 if float(self
.ctstamp
) > float(self
.opts
.pmGetOptionFinish()):
867 self
.report(self
.ctstamp
, values
)
868 self
.context
.pmFreeResult(result
)
869 if self
.samples
and self
.samples
> 0:
871 if self
.delay
and self
.interpol
and self
.samples
!= 0:
872 self
.context
.pmtimevalSleep(self
.interval
)
874 # Allow modules to flush buffered values / say goodbye
875 self
.report(None, None)
877 def extract(self
, result
):
878 """ Extract the metric values from pmResult structure """
879 # Metrics incl. all instance values, must match self.format on return
882 for i
, metric
in enumerate(self
.metrics
):
883 # Per-metric values incl. all instance values
884 # We use dict to make it easier to deal with gone/unknown instances
887 # Populate instance fields to have values for unavailable instances
888 # Values are (instance id, instance name, instance value)
889 for inst
in self
.insts
[i
][0]:
890 values
[i
][inst
] = (-1, None, NO_VAL
)
892 # No values available for this metric
893 if result
.contents
.get_numval(i
) == 0:
896 # Process all fetched instances
897 for j
in range(result
.contents
.get_numval(i
)):
898 inst
= result
.contents
.get_inst(i
, j
)
900 # Locate the correct instance and its position
902 if inst
not in self
.insts
[i
][0]:
903 # Ignore newly emerged instances
906 while inst
!= self
.insts
[i
][0][k
]:
909 # Extract and scale the value
911 # Use native type if no rescaling needed
912 if self
.metrics
[metric
][2][2] == 1 and \
913 str(self
.descs
[i
].contents
.units
) == \
914 str(self
.metrics
[metric
][2][1]):
916 vtype
= self
.descs
[i
].contents
.type
919 vtype
= PM_TYPE_DOUBLE
921 atom
= self
.context
.pmExtractValue(
922 result
.contents
.get_valfmt(i
),
923 result
.contents
.get_vlist(i
, j
),
924 self
.descs
[i
].contents
.type,
927 if self
.metrics
[metric
][3] != 1 and rescale
and \
928 self
.descs
[i
].contents
.type != PM_TYPE_STRING
and \
930 atom
= self
.context
.pmConvScale(
933 self
.metrics
[metric
][2][1])
935 val
= atom
.dref(vtype
)
936 if rescale
and self
.can_scale
and \
937 self
.descs
[i
].contents
.type != PM_TYPE_STRING
:
938 val
*= self
.metrics
[metric
][2][2]
939 val
= int(val
) if val
== int(val
) else val
942 values
[i
][inst
] = (inst
, self
.insts
[i
][1][k
], val
)
944 values
[i
][PM_IN_NULL
] = (-1, None, val
)
946 except pmapi
.pmErr
as error
:
947 sys
.stderr
.write("%s: %s, aborting.\n" % (metric
, str(error
)))
950 # Convert dicts to lists
953 vals
.append(v
.values())
956 # Store current and previous values
957 # Output modules need to handle non-existing self.prevvals
958 self
.prevvals
= self
.currvals
959 self
.currvals
= values
961 return values
# XXX Redundant now
963 def report(self
, tstamp
, values
):
964 """ Report the metric values """
966 ts
= self
.context
.pmLocaltime(tstamp
.tv_sec
)
967 us
= int(tstamp
.tv_usec
)
968 dt
= datetime(ts
.tm_year
+1900, ts
.tm_mon
+1, ts
.tm_mday
,
969 ts
.tm_hour
, ts
.tm_min
, ts
.tm_sec
, us
, None)
970 tstamp
= dt
.strftime(self
.timefmt
)
972 if self
.output
== OUTPUT_ARCHIVE
:
973 self
.write_archive(tstamp
, values
)
974 if self
.output
== OUTPUT_CSV
:
975 self
.write_csv(tstamp
, values
)
976 if self
.output
== OUTPUT_STDOUT
:
977 self
.write_stdout(tstamp
, values
)
978 if self
.output
== OUTPUT_ZABBIX
:
979 self
.write_zabbix(tstamp
, values
)
981 def prepare_writer(self
):
982 """ Prepare generic stdout writer """
984 if self
.output
== OUTPUT_ARCHIVE
or \
985 self
.output
== OUTPUT_ZABBIX
or \
986 self
.outfile
== None:
987 self
.writer
= sys
.stdout
989 self
.writer
= open(self
.outfile
, 'wt')
992 def prepare_stdout(self
):
993 """ Prepare stdout output """
995 if self
.timestamp
== 0:
996 #self.format = "{:}{}"
997 self
.format
= "{0:}{1}"
1000 tstamp
= datetime
.fromtimestamp(time
.time()).strftime(self
.timefmt
)
1001 #self.format = "{:" + str(len(tstamp)) + "}{}"
1002 self
.format
= "{" + str(index
) + ":" + str(len(tstamp
)) + "}"
1004 self
.format
+= "{" + str(index
) + "}"
1006 for i
, metric
in enumerate(self
.metrics
):
1007 ins
= 1 if self
.insts
[i
][0][0] == PM_IN_NULL
else len(self
.insts
[i
][0])
1008 for _
in range(ins
):
1009 l
= str(self
.metrics
[metric
][4])
1010 #self.format += "{:>" + l + "." + l + "}{}"
1011 self
.format
+= "{" + str(index
) + ":>" + l
+ "." + l
+ "}"
1013 self
.format
+= "{" + str(index
) + "}"
1015 #self.format = self.format[:-2]
1016 l
= len(str(index
-1)) + 2
1017 self
.format
= self
.format
[:-l
]
1019 def write_ext_header(self
):
1020 """ Write extended header """
1021 comm
= "#" if self
.output
== OUTPUT_CSV
else ""
1023 if self
.runtime
!= -1:
1024 duration
= self
.runtime
1025 samples
= self
.samples
1028 duration
= (self
.samples
- 1) * int(self
.interval
)
1029 samples
= self
.samples
1030 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
1031 if not self
.interpol
:
1032 samples
= str(samples
) + " (requested)"
1034 duration
= int(float(self
.opts
.pmGetOptionFinish()) - float(self
.opts
.pmGetOptionStart()))
1035 samples
= (duration
/ int(self
.interval
)) + 1
1036 duration
= (samples
- 1) * int(self
.interval
)
1037 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
1038 if not self
.interpol
:
1040 endtime
= float(self
.opts
.pmGetOptionStart()) + duration
1042 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
1043 host
= self
.context
.pmGetArchiveLabel().hostname
1044 if not self
.interpol
and not self
.opts
.pmGetOptionFinish():
1045 endtime
= self
.context
.pmGetArchiveEnd()
1046 if self
.context
.type == PM_CONTEXT_HOST
:
1048 if self
.context
.type == PM_CONTEXT_LOCAL
:
1049 host
= "localhost, using DSO PMDAs"
1051 # Figure out the current timezone using the PCP convention
1052 if self
.opts
.pmGetOptionTimezone():
1053 currtz
= self
.opts
.pmGetOptionTimezone()
1055 dst
= time
.localtime().tm_isdst
1056 offset
= time
.altzone
if dst
else time
.timezone
1057 currtz
= time
.tzname
[dst
]
1059 currtz
+= str(offset
/3600)
1062 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
1063 if self
.context
.pmGetArchiveLabel().tz
!= timezone
:
1064 timezone
= self
.context
.pmGetArchiveLabel().tz
1065 timezone
+= " (creation, current is " + currtz
+ ")"
1067 self
.writer
.write(comm
+ "\n")
1068 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
1069 self
.writer
.write(comm
+ " archive: " + self
.source
+ "\n")
1070 self
.writer
.write(comm
+ " host: " + host
+ "\n")
1071 self
.writer
.write(comm
+ " timezone: " + timezone
+ "\n")
1072 self
.writer
.write(comm
+ " start: " + time
.asctime(time
.localtime(self
.opts
.pmGetOptionStart())) + "\n")
1073 self
.writer
.write(comm
+ " end: " + time
.asctime(time
.localtime(endtime
)) + "\n")
1074 self
.writer
.write(comm
+ " metrics: " + str(len(self
.pmids
)) + "\n")
1075 self
.writer
.write(comm
+ " samples: " + str(samples
) + "\n")
1076 if not (self
.context
.type == PM_CONTEXT_ARCHIVE
and not self
.interpol
):
1077 self
.writer
.write(comm
+ " interval: " + str(float(self
.interval
)) + " sec\n")
1078 self
.writer
.write(comm
+ " duration: " + str(duration
) + " sec\n")
1080 self
.writer
.write(comm
+ " interval: N/A\n")
1081 self
.writer
.write(comm
+ " duration: N/A\n")
1082 self
.writer
.write(comm
+ "\n")
1084 def write_header(self
):
1085 """ Write metrics header """
1086 if self
.output
== OUTPUT_ARCHIVE
:
1087 self
.writer
.write("Recording %d metrics to %s" % (len(self
.pmids
), self
.outfile
))
1088 if self
.runtime
!= -1:
1089 self
.writer
.write(":\n%s samples(s) with %.1f sec interval ~ %d sec duration.\n" % (self
.samples
, float(self
.interval
), self
.runtime
))
1091 duration
= (self
.samples
- 1) * int(self
.interval
)
1092 self
.writer
.write(":\n%s samples(s) with %.1f sec interval ~ %d sec duration.\n" % (self
.samples
, float(self
.interval
), duration
))
1094 self
.writer
.write("...")
1095 if self
.context
.type != PM_CONTEXT_ARCHIVE
:
1096 self
.writer
.write(" (Ctrl-C to stop)")
1097 self
.writer
.write("\n")
1100 if self
.output
== OUTPUT_CSV
:
1101 self
.writer
.write("Time")
1102 for metric
in self
.metrics
:
1103 self
.writer
.write(self
.delimiter
+ metric
)
1104 self
.writer
.write("\n")
1106 if self
.output
== OUTPUT_STDOUT
:
1107 names
= ["", self
.delimiter
] # no timestamp on header line
1108 insts
= ["", self
.delimiter
] # no timestamp on instances line
1109 units
= ["", self
.delimiter
] # no timestamp on units line
1111 for i
, metric
in enumerate(self
.metrics
):
1112 ins
= 1 if self
.insts
[i
][0][0] == PM_IN_NULL
else len(self
.insts
[i
][0])
1113 prnti
= 1 if self
.insts
[i
][0][0] != PM_IN_NULL
else 0
1114 for j
in range(ins
):
1115 names
.append(self
.metrics
[metric
][0])
1116 names
.append(self
.delimiter
)
1117 units
.append(self
.metrics
[metric
][2][0])
1118 units
.append(self
.delimiter
)
1120 insts
.append(self
.insts
[i
][1][j
])
1122 insts
.append(self
.delimiter
)
1123 insts
.append(self
.delimiter
)
1127 self
.writer
.write(self
.format
.format(*tuple(names
)) + "\n")
1129 self
.writer
.write(self
.format
.format(*tuple(insts
)) + "\n")
1131 self
.writer
.write(self
.format
.format(*tuple(units
)) + "\n")
1133 if self
.output
== OUTPUT_ZABBIX
:
1134 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
1137 self
.zabbix_interval
= 250 # See zabbix_sender(8)
1138 self
.writer
.write("Sending %d archived metrics to Zabbix server %s...\n(Ctrl-C to stop)\n" % (len(self
.pmids
), self
.zabbix_server
))
1141 self
.writer
.write("Sending %d metrics to Zabbix server %s every %d sec" % (len(self
.pmids
), self
.zabbix_server
, self
.zabbix_interval
))
1142 if self
.runtime
!= -1:
1143 self
.writer
.write(":\n%s samples(s) with %.1f sec interval ~ %d sec runtime.\n" % (self
.samples
, float(self
.interval
), self
.runtime
))
1145 duration
= (self
.samples
- 1) * int(self
.interval
)
1146 self
.writer
.write(":\n%s samples(s) with %.1f sec interval ~ %d sec runtime.\n" % (self
.samples
, float(self
.interval
), duration
))
1148 self
.writer
.write("...\n(Ctrl-C to stop)\n")
1150 def write_archive(self
, timestamp
, values
):
1151 """ Write an archive record """
1152 if timestamp
== None and values
== None:
1153 # Complete and close
1158 if self
.pmi
== None:
1159 # Create a new archive
1160 self
.pmi
= pmi
.pmiLogImport(self
.outfile
)
1161 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
1162 self
.pmi
.pmiSetHostname(self
.context
.pmGetArchiveLabel().hostname
)
1163 self
.pmi
.pmiSetTimezone(self
.context
.pmGetArchiveLabel().tz
)
1164 for i
, metric
in enumerate(self
.metrics
):
1165 self
.pmi
.pmiAddMetric(metric
,
1167 self
.descs
[i
].contents
.type,
1168 self
.descs
[i
].contents
.indom
,
1169 self
.descs
[i
].contents
.sem
,
1170 self
.descs
[i
].contents
.units
)
1171 ins
= 0 if self
.insts
[i
][0][0] == PM_IN_NULL
else len(self
.insts
[i
][0])
1173 for j
in range(ins
):
1174 self
.pmi
.pmiAddInstance(self
.descs
[i
].contents
.indom
, self
.insts
[i
][1][j
], self
.insts
[i
][0][j
])
1175 except pmi
.pmiErr
as error
:
1176 if error
.args
[0] == PMI_ERR_DUPINSTNAME
:
1179 # Add current values
1181 for i
, metric
in enumerate(self
.metrics
):
1182 ins
= 1 if self
.insts
[i
][0][0] == PM_IN_NULL
else len(self
.insts
[i
][0])
1183 for j
in range(ins
):
1184 if str(list(values
[i
])[j
][2]) != NO_VAL
:
1186 inst
= self
.insts
[i
][1][j
]
1187 if inst
== None: # RHBZ#1285371
1189 if self
.descs
[i
].contents
.type == PM_TYPE_STRING
:
1190 self
.pmi
.pmiPutValue(metric
, inst
, str(values
[i
][j
][2]))
1191 elif self
.descs
[i
].contents
.type == PM_TYPE_FLOAT
or \
1192 self
.descs
[i
].contents
.type == PM_TYPE_DOUBLE
:
1193 self
.pmi
.pmiPutValue(metric
, inst
, "%f" % values
[i
][j
][2])
1195 self
.pmi
.pmiPutValue(metric
, inst
, "%d" % values
[i
][j
][2])
1199 # pylint: disable=maybe-no-member
1200 self
.pmi
.pmiWrite(self
.ctstamp
.tv_sec
, self
.ctstamp
.tv_usec
)
1202 def write_csv(self
, timestamp
, values
):
1203 """ Write results in CSV format """
1204 if timestamp
== None and values
== None:
1210 for i
, metric
in enumerate(self
.metrics
):
1211 ins
= 1 if self
.insts
[i
][0][0] == PM_IN_NULL
else len(self
.insts
[i
][0])
1212 for j
in range(ins
):
1213 line
+= self
.delimiter
1214 if type(list(values
[i
])[j
][2]) is float:
1215 fmt
= "." + str(self
.precision
) + "f"
1216 line
+= format(list(values
[i
])[j
][2], fmt
)
1218 line
+= str(list(values
[i
])[j
][2])
1219 self
.writer
.write(line
+ "\n")
1221 def write_stdout(self
, timestamp
, values
):
1222 """ Write a line to stdout """
1223 if timestamp
== None and values
== None:
1227 #fmt = self.format.split("{}")
1228 fmt
= re
.split("{\\d+}", self
.format
)
1231 if self
.timestamp
== 0:
1234 line
.append(timestamp
)
1235 line
.append(self
.delimiter
)
1238 for i
, metric
in enumerate(self
.metrics
):
1239 l
= self
.metrics
[metric
][4]
1241 for j
in range(len(values
[i
])):
1245 if not self
.metrics
[metric
][3] and \
1246 (self
.prevvals
== None or list(self
.prevvals
[i
])[j
][2] == NO_VAL
):
1247 # Rate not yet possible
1249 elif self
.metrics
[metric
][3] or \
1250 self
.descs
[i
].sem
!= PM_SEM_COUNTER
or \
1251 list(values
[i
])[j
][2] == NO_VAL
:
1253 value
= list(values
[i
])[j
][2]
1257 if self
.descs
[i
].contents
.units
.dimTime
!= 0:
1258 if self
.descs
[i
].contents
.units
.scaleTime
> PM_TIME_SEC
:
1259 scale
= pow(60, (PM_TIME_SEC
- self
.descs
[i
].contents
.units
.scaleTime
))
1261 scale
= pow(1000, (PM_TIME_SEC
- self
.descs
[i
].contents
.units
.scaleTime
))
1262 delta
= scale
* (float(self
.ctstamp
) - float(self
.ptstamp
))
1263 value
= (list(values
[i
])[j
][2] - list(self
.prevvals
[i
])[j
][2]) / delta
if delta
else 0
1265 # Make sure the value fits
1266 if type(value
) is int or type(value
) is long:
1267 if len(str(value
)) > l
:
1270 #fmt[k] = "{:" + str(l) + "d}"
1271 fmt
[k
] = "{X:" + str(l
) + "d}"
1273 if type(value
) is float:
1275 s
= len(str(int(value
)))
1279 #for _ in reversed(range(c+1)):
1280 #t = "{:" + str(l) + "." + str(c) + "f}"
1281 for f
in reversed(range(c
+1)):
1282 r
= "{X:" + str(l
) + "." + str(c
) + "f}"
1283 t
= "{0:" + str(l
) + "." + str(c
) + "f}"
1284 if len(t
.format(value
)) > l
:
1292 line
.append(self
.delimiter
)
1295 #self.writer.write('{}'.join(fmt).format(*tuple(line)) + "\n")
1299 nfmt
+= f
.replace("{X:", "{" + str(index
) + ":")
1301 nfmt
+= "{" + str(index
) + "}"
1303 l
= len(str(index
-1)) + 2
1305 self
.writer
.write(nfmt
.format(*tuple(line
)) + "\n")
1307 def write_zabbix(self
, timestamp
, values
):
1308 """ Write (send) metrics to a Zabbix server """
1309 if timestamp
== None and values
== None:
1310 # Send any remaining buffered values
1311 if self
.zabbix_metrics
:
1312 send_to_zabbix(self
.zabbix_metrics
, self
.zabbix_server
, self
.zabbix_port
)
1313 self
.zabbix_metrics
= []
1316 # Collect the results
1317 ts
= float(self
.ctstamp
)
1318 if self
.zabbix_prevsend
== None:
1319 self
.zabbix_prevsend
= ts
1320 for i
, metric
in enumerate(self
.metrics
):
1321 ins
= 1 if self
.insts
[i
][0][0] == PM_IN_NULL
else len(self
.insts
[i
][0])
1322 for j
in range(ins
):
1323 key
= ZBXPRFX
+ metric
1324 if self
.insts
[i
][1][j
]:
1325 key
+= "[" + str(self
.insts
[i
][1][j
]) + "]"
1326 val
= str(list(values
[i
])[j
][2])
1327 self
.zabbix_metrics
.append(ZabbixMetric(self
.zabbix_host
, key
, val
, ts
))
1330 if self
.context
.type == PM_CONTEXT_ARCHIVE
:
1331 if len(self
.zabbix_metrics
) >= self
.zabbix_interval
:
1332 send_to_zabbix(self
.zabbix_metrics
, self
.zabbix_server
, self
.zabbix_port
)
1333 self
.zabbix_metrics
= []
1334 elif ts
- self
.zabbix_prevsend
> self
.zabbix_interval
:
1335 send_to_zabbix(self
.zabbix_metrics
, self
.zabbix_server
, self
.zabbix_port
)
1336 self
.zabbix_metrics
= []
1337 self
.zabbix_prevsend
= ts
1340 """ Establish a PMAPI context to archive, host or local, via args """
1341 self
.context
= pmapi
.pmContext
.fromOptions(self
.opts
, sys
.argv
)
1344 """ Finalize and clean up """
1352 if __name__
== '__main__':
1359 P
.validate_metrics()
1363 except pmapi
.pmErr
as error
:
1364 sys
.stderr
.write('%s: %s\n' % (error
.progname(), error
.message()))
1365 except pmapi
.pmUsageErr
as usage
:
1367 except IOError as error
:
1368 sys
.stderr
.write("%s\n" % str(error
))
1369 except KeyboardInterrupt:
1370 sys
.stdout
.write("\n")