cmk/special_agents/agent_kubernetes.py
#!/usr/bin/env python
# -*- encoding: utf-8; py-indent-offset: 4 -*-
# +------------------------------------------------------------------+
# |             ____ _               _        __  __ _  __           |
# |            / ___| |__   ___  ___| | __   |  \/  | |/ /           |
# |           | |   | '_ \ / _ \/ __| |/ /   | |\/| | ' /            |
# |           | |___| | | |  __/ (__|   <    | |  | | . \            |
# |            \____|_| |_|\___|\___|_|\_\___|_|  |_|_|\_\           |
# |                                                                  |
# | Copyright Mathias Kettner 2019             mk@mathias-kettner.de |
# +------------------------------------------------------------------+
#
# This file is part of Check_MK.
# The official homepage is at http://mathias-kettner.de/check_mk.
#
# check_mk is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation in version 2. check_mk is distributed
# in the hope that it will be useful, but WITHOUT ANY WARRANTY; with-
# out even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more de-
# tails. You should have received a copy of the GNU General Public
# License along with GNU Make; see the file COPYING. If not, write
# to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301 USA.
"""
Special agent for monitoring Kubernetes clusters.
"""

from __future__ import (
    absolute_import,
    division,
    print_function,
)

import argparse
from collections import OrderedDict, MutableSequence
import functools
import itertools
import json
import logging
import operator
import os
import sys
import time
from typing import (  # pylint: disable=unused-import
    Any, Dict, Generic, List, Mapping, Optional, TypeVar, Union,
)

from dateutil.parser import parse as parse_time
from kubernetes import client
from kubernetes.client.rest import ApiException

import cmk.utils.profile
import cmk.utils.password_store


class PathPrefixAction(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        if not values:
            return ''
        path_prefix = '/' + values.strip('/')
        setattr(namespace, self.dest, path_prefix)
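
# Illustrative behaviour of PathPrefixAction (example values, not part of the
# agent's logic): passing "--path-prefix k8s/api/" stores "/k8s/api" on the
# namespace, so the prefix can be appended directly to the host URL later on;
# an empty value keeps the default ''.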


def parse(args):
    # type: (List[str]) -> argparse.Namespace
    p = argparse.ArgumentParser(description=__doc__)
    p.add_argument('--debug', action='store_true', help='Debug mode: raise Python exceptions')
    p.add_argument(
        '-v',
        '--verbose',
        action='count',
        default=0,
        help='Verbose mode (for even more output use -vvv)')
    p.add_argument('host', metavar='HOST', help='Kubernetes host to connect to')
    p.add_argument('--port', type=int, default=443, help='Port to connect to')
    p.add_argument('--token', required=True, help='Token for that user')
    p.add_argument(
        '--infos',
        type=lambda x: x.split(','),
        required=True,
        help='Comma separated list of items that should be fetched',
    )
    p.add_argument('--url-prefix', help='Custom URL prefix for Kubernetes API calls')
    p.add_argument(
        '--path-prefix',
        default='',
        action=PathPrefixAction,
        help='Optional URL path prefix to prepend to Kubernetes API calls')
    p.add_argument('--no-cert-check', action='store_true', help='Disable certificate verification')
    p.add_argument(
        '--profile',
        metavar='FILE',
        help='Profile the performance of the agent and write the output to a file')

    arguments = p.parse_args(args)
    return arguments


def setup_logging(verbosity):
    # type: (int) -> None
    if verbosity >= 3:
        fmt = '%(levelname)s: %(name)s: %(filename)s: %(lineno)s: %(message)s'
        lvl = logging.DEBUG
    elif verbosity == 2:
        fmt = '%(levelname)s: %(filename)s: %(lineno)s: %(message)s'
        lvl = logging.DEBUG
    elif verbosity == 1:
        fmt = '%(levelname)s: %(funcName)s: %(message)s'
        lvl = logging.INFO
    else:
        fmt = '%(levelname)s: %(message)s'
        lvl = logging.WARNING
    logging.basicConfig(level=lvl, format=fmt)


def parse_frac_prefix(value):
    # type: (str) -> float
    if value.endswith('m'):
        return 0.001 * float(value[:-1])
    return float(value)
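
# Illustrative examples (assumed values, not executed): parse_frac_prefix('500m')
# returns 0.001 * 500 = 0.5 CPU cores, while parse_frac_prefix('2') returns 2.0.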


def parse_memory(value):
    # type: (str) -> float
    if value.endswith('Ki'):
        return 1024**1 * float(value[:-2])
    if value.endswith('Mi'):
        return 1024**2 * float(value[:-2])
    if value.endswith('Gi'):
        return 1024**3 * float(value[:-2])
    if value.endswith('Ti'):
        return 1024**4 * float(value[:-2])
    if value.endswith('Pi'):
        return 1024**5 * float(value[:-2])
    if value.endswith('Ei'):
        return 1024**6 * float(value[:-2])

    if value.endswith('K') or value.endswith('k'):
        return 1e3 * float(value[:-1])
    if value.endswith('M'):
        return 1e6 * float(value[:-1])
    if value.endswith('G'):
        return 1e9 * float(value[:-1])
    if value.endswith('T'):
        return 1e12 * float(value[:-1])
    if value.endswith('P'):
        return 1e15 * float(value[:-1])
    if value.endswith('E'):
        return 1e18 * float(value[:-1])

    return float(value)
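
# Illustrative examples (assumed values, not executed): parse_memory('64Mi')
# returns 64 * 1024**2 = 67108864.0, parse_memory('2G') returns 2e9, and a
# plain quantity such as parse_memory('131072') is returned unchanged as a float.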


def left_join_dicts(initial, new, operation):
    d = {}
    for key, value in initial.iteritems():
        if isinstance(value, dict):
            d[key] = left_join_dicts(value, new.get(key, {}), operation)
        else:
            if key in new:
                d[key] = operation(value, new[key])
            else:
                d[key] = value
    return d
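
# Illustrative example (assumed values, not executed): merging two nested
# resource views with operator.add keeps the key structure of the left dict
# and combines leaves that exist on both sides, e.g.
#   left_join_dicts({'requests': {'cpu': 0.25}}, {'requests': {'cpu': 0.5}}, operator.add)
#   == {'requests': {'cpu': 0.75}}
# Keys that only appear in the right-hand dict are dropped, hence "left" join.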


class Metadata(object):
    def __init__(self, metadata):
        # type: (Optional[client.V1ObjectMeta]) -> None
        if metadata:
            self.name = metadata.name
            self.namespace = metadata.namespace
            self.creation_timestamp = (time.mktime(metadata.creation_timestamp.utctimetuple())
                                       if metadata.creation_timestamp else None)
            self.labels = metadata.labels if metadata.labels else {}
        else:
            self.name = None
            self.namespace = None
            self.creation_timestamp = None
            self.labels = {}

    def matches(self, selectors):
        if not selectors:
            return False

        for name, value in selectors.iteritems():
            if name not in self.labels or self.labels[name] != value:
                return False
        return True
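
# Illustrative example (assumed labels, not executed): an object labelled
# {'app': 'nginx', 'tier': 'web'} matches the selector {'app': 'nginx'}, since
# every selector pair must be present in the labels; an empty selector matches
# nothing here.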


class Node(Metadata):
    def __init__(self, node, stats):
        # type: (client.V1Node, str) -> None
        super(Node, self).__init__(node.metadata)
        self._status = node.status
        # The kubelet reports statistics for the last 2 minutes in 10s
        # intervals. We only need the latest state.
        self.stats = eval(stats)['stats'][-1]
        # The timestamps are returned in RFC3339Nano format which cannot be parsed
        # by Python's time module. Therefore we use dateutil's parse function here.
        self.stats['timestamp'] = time.mktime(parse_time(self.stats['timestamp']).utctimetuple())

    @property
    def conditions(self):
        # type: () -> Optional[Dict[str, str]]
        if not self._status:
            return None
        conditions = self._status.conditions
        if not conditions:
            return None
        return {c.type: c.status for c in conditions}

    @staticmethod
    def zero_resources():
        return {
            'capacity': {
                'cpu': 0.0,
                'memory': 0.0,
                'pods': 0,
            },
            'allocatable': {
                'cpu': 0.0,
                'memory': 0.0,
                'pods': 0,
            },
        }

    @property
    def resources(self):
        # type: () -> Dict[str, Dict[str, float]]
        view = self.zero_resources()
        if not self._status:
            return view
        capacity, allocatable = self._status.capacity, self._status.allocatable
        if capacity:
            view['capacity']['cpu'] += parse_frac_prefix(capacity.get('cpu', '0.0'))
            view['capacity']['memory'] += parse_memory(capacity.get('memory', '0.0'))
            view['capacity']['pods'] += int(capacity.get('pods', '0'))
        if allocatable:
            view['allocatable']['cpu'] += parse_frac_prefix(allocatable.get('cpu', '0.0'))
            view['allocatable']['memory'] += parse_memory(allocatable.get('memory', '0.0'))
            view['allocatable']['pods'] += int(allocatable.get('pods', '0'))
        return view


class ComponentStatus(Metadata):
    def __init__(self, status):
        # type: (client.V1ComponentStatus) -> None
        super(ComponentStatus, self).__init__(status.metadata)
        self._conditions = status.conditions

    @property
    def conditions(self):
        # type: () -> List[Dict[str, str]]
        if not self._conditions:
            return []
        return [{'type': c.type, 'status': c.status} for c in self._conditions]


class Service(Metadata):
    def __init__(self, service):
        super(Service, self).__init__(service.metadata)

        spec = service.spec
        if spec:
            # For details refer to:
            # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ServiceSpec.md

            # type may be: ExternalName, ClusterIP, NodePort, and LoadBalancer
            self._type = spec.type
            self._selector = spec.selector if spec.selector else {}
            # cluster_ip may be: None (headless service), "" (no IP), or a str (valid IP)
            self._cluster_ip = spec.cluster_ip
            # only applies to type LoadBalancer
            self._load_balancer_ip = spec.load_balancer_ip
            self._ports = spec.ports if spec.ports else []
        else:
            self._type = None
            self._selector = {}
            self._cluster_ip = ""
            self._load_balancer_ip = ""
            self._ports = []

        status = service.status
        if status and status.load_balancer and status.load_balancer.ingress:
            self._ingress = {
                ingress.hostname if ingress.hostname else ingress.ip: {
                    # for ingress points that are DNS based (typically AWS load-balancers)
                    'hostname': ingress.hostname,
                    # for ingress points that are IP based (typically GCE or OpenStack load-balancers)
                    'ip': ingress.ip
                } for ingress in status.load_balancer.ingress
            }

    @property
    def info(self):
        return {
            'type': self._type,
            'cluster_ip': self._cluster_ip,
            'load_balancer_ip': self._load_balancer_ip,
        }

    @property
    def selector(self):
        return self._selector

    @property
    def ports(self):
        # port is the only field that is not optional
        return {
            port.name if port.name else port.port: {
                'port': port.port,
                'name': port.name,
                'protocol': port.protocol,
                'target_port': port.target_port,
                'node_port': port.node_port,
            } for port in self._ports
        }
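
# Illustrative shape of the ports view (example values, the real content
# depends on the service spec): an HTTPS port could be rendered as
#   {'https': {'port': 443, 'name': 'https', 'protocol': 'TCP',
#              'target_port': 8443, 'node_port': None}}
# Unnamed ports are keyed by their port number instead of a name.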


class Deployment(Metadata):
    # TODO: include pods of the deployment?
    def __init__(self, deployment):
        # type: (client.V1Deployment) -> None
        super(Deployment, self).__init__(deployment.metadata)
        spec = deployment.spec
        if spec:
            self._paused = spec.paused

            strategy = spec.strategy
            if strategy:
                self._strategy_type = strategy.type
                rolling_update = strategy.rolling_update
                if rolling_update:
                    self._max_surge = rolling_update.max_surge
                    self._max_unavailable = rolling_update.max_unavailable
                else:
                    self._max_surge = None
                    self._max_unavailable = None
            else:
                self._strategy_type = None
                self._max_surge = None
                self._max_unavailable = None
        else:
            self._paused = None
            self._strategy_type = None
            self._max_surge = None
            self._max_unavailable = None

        status = deployment.status
        if status:
            self._ready_replicas = status.ready_replicas
            self._replicas = status.replicas
        else:
            self._ready_replicas = None
            self._replicas = None

    @property
    def replicas(self):
        return {
            'paused': self._paused,
            'ready_replicas': self._ready_replicas,
            'replicas': self._replicas,
            'strategy_type': self._strategy_type,
            'max_surge': self._max_surge,
            'max_unavailable': self._max_unavailable,
        }


class Pod(Metadata):
    def __init__(self, pod):
        # type: (client.V1Pod) -> None
        super(Pod, self).__init__(pod.metadata)
        spec = pod.spec
        if spec:
            self.node = spec.node_name
            self.host_network = (spec.host_network if spec.host_network is not None else False)
            self.dns_policy = spec.dns_policy
            self._containers = spec.containers
        else:
            self.node = None
            self.host_network = False
            self.dns_policy = None
            self._containers = []

        status = pod.status
        if status:
            self.host_ip = status.host_ip
            self.pod_ip = status.pod_ip
            self.qos_class = status.qos_class
            self._container_statuses = (status.container_statuses
                                        if status.container_statuses else [])
        else:
            self.host_ip = None
            self.pod_ip = None
            self.qos_class = None
            self._container_statuses = []

    @staticmethod
    def zero_resources():
        return {
            'limits': {
                'cpu': 0.0,
                'memory': 0.0,
            },
            'requests': {
                'cpu': 0.0,
                'memory': 0.0,
            },
        }

    @property
    def resources(self):
        view = self.zero_resources()
        for container in self._containers:
            resources = container.resources
            if not resources:
                continue
            limits = resources.limits
            if limits:
                view['limits']['cpu'] += parse_frac_prefix(limits.get('cpu', 'inf'))
                view['limits']['memory'] += parse_memory(limits.get('memory', 'inf'))
            else:
                view['limits']['cpu'] += float('inf')
                view['limits']['memory'] += float('inf')
            requests = resources.requests
            if requests:
                view['requests']['cpu'] += parse_frac_prefix(requests.get('cpu', '0.0'))
                view['requests']['memory'] += parse_memory(requests.get('memory', '0.0'))
        return view

    @property
    def containers(self):
        view = {
            container.name: {
                'image': container.image,
                'image_pull_policy': container.image_pull_policy,
                'ready': False,
                'restart_count': 0,
                'container_id': None,
                'image_id': None,
            } for container in self._containers
        }
        for container_status in self._container_statuses:
            data = view[container_status.name]
            data['ready'] = container_status.ready
            data['restart_count'] = container_status.restart_count
            data['container_id'] = (container_status.container_id.replace('docker://', '')
                                    if container_status.container_id else '')
            data['image_id'] = container_status.image_id
        return view

    @property
    def info(self):
        return {
            'node': self.node,
            'host_network': self.host_network,
            'dns_policy': self.dns_policy,
            'host_ip': self.host_ip,
            'pod_ip': self.pod_ip,
            'qos_class': self.qos_class,
        }


class Namespace(Metadata):
    # TODO: namespaces may have resource quotas and limits
    # https://kubernetes.io/docs/tasks/administer-cluster/namespaces/
    def __init__(self, namespace):
        # type: (client.V1Namespace) -> None
        super(Namespace, self).__init__(namespace.metadata)
        self._status = namespace.status

    @property
    def phase(self):
        # type: () -> Optional[str]
        if self._status:
            return self._status.phase
        return None


class PersistentVolume(Metadata):
    def __init__(self, pv):
        # type: (client.V1PersistentVolume) -> None
        super(PersistentVolume, self).__init__(pv.metadata)
        self._status = pv.status
        self._spec = pv.spec

    @property
    def access_modes(self):
        # type: () -> Optional[List[str]]
        if self._spec:
            return self._spec.access_modes
        return None

    @property
    def capacity(self):
        # type: () -> Optional[float]
        if not self._spec or not self._spec.capacity:
            return None
        storage = self._spec.capacity.get('storage')
        if storage:
            return parse_memory(storage)
        return None

    @property
    def phase(self):
        # type: () -> Optional[str]
        if self._status:
            return self._status.phase
        return None


class PersistentVolumeClaim(Metadata):
    def __init__(self, pvc):
        # type: (client.V1PersistentVolumeClaim) -> None
        super(PersistentVolumeClaim, self).__init__(pvc.metadata)
        self._status = pvc.status
        self._spec = pvc.spec

    @property
    def conditions(self):
        # type: () -> Optional[client.V1PersistentVolumeClaimCondition]
        # TODO: don't return client specific object
        if self._status:
            return self._status.conditions
        return None

    @property
    def phase(self):
        # type: () -> Optional[str]
        if self._status:
            return self._status.phase
        return None

    @property
    def volume_name(self):
        # type: () -> Optional[str]
        if self._spec:
            return self._spec.volume_name
        return None


class StorageClass(Metadata):
    def __init__(self, storage_class):
        # type: (client.V1StorageClass) -> None
        super(StorageClass, self).__init__(storage_class.metadata)
        self.provisioner = storage_class.provisioner
        self.reclaim_policy = storage_class.reclaim_policy


class Role(Metadata):
    def __init__(self, role):
        # type: (Union[client.V1Role, client.V1ClusterRole]) -> None
        super(Role, self).__init__(role.metadata)


ListElem = TypeVar('ListElem', bound=Metadata)


class K8sList(Generic[ListElem], MutableSequence):
    def __init__(self, elements):
        # type: (List[ListElem]) -> None
        super(K8sList, self).__init__()
        self._elements = elements

    def __getitem__(self, index):
        return self._elements[index]

    def __setitem__(self, index, value):
        self._elements.__setitem__(index, value)

    def __delitem__(self, index):
        self._elements.__delitem__(index)

    def __len__(self):
        # type: () -> int
        return len(self._elements)

    def insert(self, index, value):
        self._elements.insert(index, value)

    def labels(self):
        return {item.name: item.labels for item in self}

    def group_by(self, selectors):
        grouped = {}
        for element in self:
            for name, selector in selectors.iteritems():
                if element.matches(selector):
                    grouped.setdefault(name, self.__class__(elements=[])).append(element)
        return grouped
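
# Illustrative example (assumed values, not executed): calling group_by() with
# the selectors from ServiceList.selector(), e.g. {'my-service': {'app': 'nginx'}},
# yields {'my-service': <list of the elements labelled app=nginx>} using the
# same list class; this is how pods are assigned to services further below.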


class NodeList(K8sList[Node]):
    def list_nodes(self):
        # type: () -> Dict[str, List[str]]
        return {'nodes': [node.name for node in self if node.name]}

    def conditions(self):
        # type: () -> Dict[str, Dict[str, str]]
        return {node.name: node.conditions for node in self if node.name and node.conditions}

    def resources(self):
        # type: () -> Dict[str, Dict[str, Dict[str, Optional[float]]]]
        return {node.name: node.resources for node in self if node.name}

    def stats(self):
        return {node.name: node.stats for node in self if node.name}

    def total_resources(self):
        merge = functools.partial(left_join_dicts, operation=operator.add)
        return reduce(merge, self.resources().itervalues())

    def cluster_stats(self):
        stats = self.stats()
        merge = functools.partial(left_join_dicts, operation=operator.add)
        result = reduce(merge, stats.itervalues())
        # During the merging process the sum of all timestamps is calculated.
        # To obtain the average time of all nodes divide by the number of nodes.
        result['timestamp'] = round(result['timestamp'] / len(stats), 1)
        return result
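
# Illustrative example (assumed values, not executed): cluster_stats() first
# sums the per-node stats, including the 'timestamp' field, and then divides
# that sum by the number of nodes; two nodes reporting timestamps 1000.0 and
# 1004.0 therefore end up as the averaged timestamp 1002.0.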


class ComponentStatusList(K8sList[ComponentStatus]):
    def list_statuses(self):
        # type: () -> Dict[str, List[Dict[str, str]]]
        return {status.name: status.conditions for status in self if status.name}


class ServiceList(K8sList[Service]):
    def infos(self):
        return {service.name: service.info for service in self}

    def selector(self):
        return {service.name: service.selector for service in self}

    def ports(self):
        return {service.name: service.ports for service in self}


class DeploymentList(K8sList[Deployment]):
    def replicas(self):
        return {deployment.name: deployment.replicas for deployment in self}


class PodList(K8sList[Pod]):
    def pods_per_node(self):
        # type: () -> Dict[str, Dict[str, Dict[str, int]]]
        pods_sorted = sorted(self, key=lambda pod: pod.node)
        by_node = itertools.groupby(pods_sorted, lambda pod: pod.node)
        return {
            node: {
                'requests': {
                    'pods': len(list(pods))
                }
            } for node, pods in by_node if node is not None
        }

    def pods_in_cluster(self):
        return {'requests': {'pods': len(self)}}

    def info(self):
        return {pod.name: pod.info for pod in self}

    def resources(self):
        return {pod.name: pod.resources for pod in self}

    def containers(self):
        return {pod.name: pod.containers for pod in self}

    def resources_per_node(self):
        # type: () -> Dict[str, Dict[str, Dict[str, float]]]
        """
        Returns the limits and requests of all containers grouped by node. If at least
        one container does not specify a limit, infinity is returned as the container
        may consume any amount of resources.
        """
        pods_sorted = sorted(self, key=lambda pod: pod.node)
        by_node = itertools.groupby(pods_sorted, lambda pod: pod.node)
        merge = functools.partial(left_join_dicts, operation=operator.add)
        return {
            node: reduce(merge, [p.resources for p in pods], Pod.zero_resources())
            for node, pods in by_node
            if node is not None
        }

    def total_resources(self):
        merge = functools.partial(left_join_dicts, operation=operator.add)
        return reduce(merge, [p.resources for p in self], Pod.zero_resources())


class NamespaceList(K8sList[Namespace]):
    def list_namespaces(self):
        # type: () -> Dict[str, Dict[str, Dict[str, Optional[str]]]]
        return {
            namespace.name: {
                'status': {
                    'phase': namespace.phase,
                }
            } for namespace in self if namespace.name
        }


class PersistentVolumeList(K8sList[PersistentVolume]):
    def list_volumes(self):
        # type: () -> Dict[str, Dict[str, Union[Optional[List[str]], Optional[float], Dict[str, Optional[str]]]]]
        # TODO: Output details of the different types of volumes
        return {
            pv.name: {
                'access': pv.access_modes,
                'capacity': pv.capacity,
                'status': {
                    'phase': pv.phase,
                },
            } for pv in self if pv.name
        }


class PersistentVolumeClaimList(K8sList[PersistentVolumeClaim]):
    def list_volume_claims(self):
        # type: () -> Dict[str, Dict[str, Any]]
        # TODO: Fix "Any"
        return {
            pvc.name: {
                'namespace': pvc.namespace,
                'condition': pvc.conditions,
                'phase': pvc.phase,
                'volume': pvc.volume_name,
            } for pvc in self if pvc.name
        }


class StorageClassList(K8sList[StorageClass]):
    def list_storage_classes(self):
        # type: () -> Dict[Any, Dict[str, Any]]
        # TODO: should be Dict[str, Dict[str, Optional[str]]]
        return {
            storage_class.name: {
                'provisioner': storage_class.provisioner,
                'reclaim_policy': storage_class.reclaim_policy
            } for storage_class in self if storage_class.name
        }


class RoleList(K8sList[Role]):
    def list_roles(self):
        return [{
            'name': role.name,
            'namespace': role.namespace,
            'creation_timestamp': role.creation_timestamp
        } for role in self if role.name]


class Metric(Metadata):
    def __init__(self, metric):
        # Initialize Metric objects without metadata for now, because
        # the provided metadata only contains a selfLink and no other
        # valuable information.
        super(Metric, self).__init__(metadata=None)
        self.from_object = metric['describedObject']
        self.metrics = {metric['metricName']: metric.get('value')}

    def __add__(self, other):
        assert self.from_object == other.from_object
        self.metrics.update(other.metrics)
        return self

    def __str__(self):
        return str(self.__dict__)

    def __repr__(self):
        return str(self.__dict__)


class MetricList(K8sList[Metric]):
    def __add__(self, other):
        return MetricList([a + b for a, b in zip(self, other)])

    def list_metrics(self):
        return [item.__dict__ for item in self]
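
# Illustrative example (assumed values, not executed): adding two Metric
# objects that describe the same object merges their metric dicts, e.g.
# {'memory_rss': '1000'} and {'memory_usage_bytes': '2000'} become one Metric
# carrying both keys. MetricList addition zips the lists element-wise, which
# assumes both lists are ordered by the same described objects (the assert in
# Metric.__add__ guards this).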


class PiggybackGroup(object):
    """
    A group of elements where an element is e.g. a piggyback host.
    """

    def __init__(self):
        # type: () -> None
        super(PiggybackGroup, self).__init__()
        self._elements = OrderedDict()  # type: OrderedDict[str, PiggybackHost]

    def get(self, element_name):
        # type: (str) -> PiggybackHost
        if element_name not in self._elements:
            self._elements[element_name] = PiggybackHost()
        return self._elements[element_name]

    def join(self, section_name, pairs):
        # type: (str, Mapping[str, Dict[str, Any]]) -> PiggybackGroup
        for element_name, data in pairs.iteritems():
            section = self.get(element_name).get(section_name)
            section.insert(data)
        return self

    def output(self, piggyback_prefix=""):
        # type: (str) -> List[str]
        # The names of elements may not be unique. Kubernetes guarantees e.g. that
        # only one object of a given kind can have one name at a time. I.e.
        # there may only be one deployment with the name "foo", but there may exist
        # a service with name "foo" as well.
        # To obtain unique names for piggyback hosts it is therefore possible to
        # specify a name prefix.
        # see: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
        data = []
        for name, element in self._elements.iteritems():
            data.append('<<<<%s>>>>' % (piggyback_prefix + name))
            data.extend(element.output())
            data.append('<<<<>>>>')
        return data
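
# Illustrative output fragment (example host name and label, the real content
# depends on the cluster): each element becomes one piggyback host wrapped in
# <<<<...>>>> markers, e.g.
#   <<<<node_k8s-node-1>>>>
#   <<<labels:sep(0)>>>
#   {"kubernetes.io/os": "linux"}
#   <<<<>>>>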


class PiggybackHost(object):
    """
    An element that bundles a collection of sections.
    """

    def __init__(self):
        # type: () -> None
        super(PiggybackHost, self).__init__()
        self._sections = OrderedDict()  # type: OrderedDict[str, Section]

    def get(self, section_name):
        # type: (str) -> Section
        if section_name not in self._sections:
            self._sections[section_name] = Section()
        return self._sections[section_name]

    def output(self):
        # type: () -> List[str]
        data = []
        for name, section in self._sections.iteritems():
            data.append('<<<%s:sep(0)>>>' % name)
            data.append(section.output())
        return data


class Section(object):
    """
    An agent section.
    """

    def __init__(self):
        # type: () -> None
        super(Section, self).__init__()
        self._content = OrderedDict()  # type: OrderedDict[str, Dict[str, Any]]

    def insert(self, data):
        # type: (Dict[str, Any]) -> None
        for key, value in data.iteritems():
            if key not in self._content:
                self._content[key] = value
            else:
                if isinstance(value, dict):
                    self._content[key].update(value)
                else:
                    raise ValueError('Key %s is already present and cannot be merged' % key)

    def output(self):
        # type: () -> str
        return json.dumps(self._content)
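
# Illustrative example (assumed values, not executed): a PiggybackHost holding
# a single section renders as a section header followed by one JSON line, e.g.
#   <<<k8s_resources:sep(0)>>>
#   {"capacity": {"cpu": 4.0, "memory": 16777216000.0, "pods": 110}}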


class ApiData(object):
    """
    Contains the collected API data.
    """

    def __init__(self, api_client):
        # type: (client.ApiClient) -> None
        super(ApiData, self).__init__()
        logging.info('Collecting API data')

        logging.debug('Constructing API client wrappers')
        core_api = client.CoreV1Api(api_client)
        storage_api = client.StorageV1Api(api_client)
        rbac_authorization_api = client.RbacAuthorizationV1Api(api_client)
        apps_api = client.ExtensionsV1beta1Api(api_client)

        self.custom_api = client.CustomObjectsApi(api_client)

        logging.debug('Retrieving data')
        storage_classes = storage_api.list_storage_class()
        namespaces = core_api.list_namespace()
        roles = rbac_authorization_api.list_role_for_all_namespaces()
        cluster_roles = rbac_authorization_api.list_cluster_role()
        component_statuses = core_api.list_component_status()
        nodes = core_api.list_node()
        # Try to make this a POST once the client API supports sending post data;
        # include {"num_stats": 1} to get only the latest stats and use less bandwidth.
        nodes_stats = [
            core_api.connect_get_node_proxy_with_path(node.metadata.name, "stats")
            for node in nodes.items
        ]
        pvs = core_api.list_persistent_volume()
        pvcs = core_api.list_persistent_volume_claim_for_all_namespaces()
        pods = core_api.list_pod_for_all_namespaces()
        services = core_api.list_service_for_all_namespaces()
        deployments = apps_api.list_deployment_for_all_namespaces()

        logging.debug('Assigning collected data')
        self.storage_classes = StorageClassList(map(StorageClass, storage_classes.items))
        self.namespaces = NamespaceList(map(Namespace, namespaces.items))
        self.roles = RoleList(map(Role, roles.items))
        self.cluster_roles = RoleList(map(Role, cluster_roles.items))
        self.component_statuses = ComponentStatusList(
            map(ComponentStatus, component_statuses.items))
        self.nodes = NodeList(map(Node, nodes.items, nodes_stats))
        self.persistent_volumes = PersistentVolumeList(map(PersistentVolume, pvs.items))
        self.persistent_volume_claims = PersistentVolumeClaimList(
            map(PersistentVolumeClaim, pvcs.items))
        self.pods = PodList(map(Pod, pods.items))
        self.services = ServiceList(map(Service, services.items))
        self.deployments = DeploymentList(map(Deployment, deployments.items))

        pods_custom_metrics = {
            "memory": ['memory_rss', 'memory_swap', 'memory_usage_bytes', 'memory_max_usage_bytes'],
            "fs": ['fs_inodes', 'fs_reads', 'fs_writes', 'fs_limit_bytes', 'fs_usage_bytes'],
            "cpu": ['cpu_system', 'cpu_user', 'cpu_usage']
        }

        self.pods_Metrics = dict()  # type: Dict[str, Dict[str, List]]
        for metric_group, metrics in pods_custom_metrics.items():
            self.pods_Metrics[metric_group] = self.get_namespaced_group_metric(metrics)

    def get_namespaced_group_metric(self, metrics):
        # type: (List[str]) -> Dict[str, List]
        queries = [self.get_namespaced_custom_pod_metric(metric) for metric in metrics]

        grouped_metrics = {}  # type: Dict[str, List]
        for response in queries:
            for namespace in response:
                grouped_metrics.setdefault(namespace, []).append(response[namespace])

        for namespace in grouped_metrics:
            grouped_metrics[namespace] = reduce(operator.add,
                                                grouped_metrics[namespace]).list_metrics()

        return grouped_metrics

    def get_namespaced_custom_pod_metric(self, metric):
        # type: (str) -> Dict

        logging.debug('Query Custom Metrics Endpoint: %s', metric)
        custom_metric = {}
        for namespace in self.namespaces:
            try:
                data = map(
                    Metric,
                    self.custom_api.get_namespaced_custom_object(
                        'custom.metrics.k8s.io',
                        'v1beta1',
                        namespace.name,
                        'pods/*',
                        metric,
                    )['items'])
                custom_metric[namespace.name] = MetricList(data)
            except ApiException as err:
                if err.status == 404:
                    logging.info('Data unavailable. No pods in namespace %s', namespace.name)
                elif err.status == 500:
                    logging.info('Data unavailable. %s', err)
                else:
                    raise err

        return custom_metric

    def cluster_sections(self):
        # type: () -> str
        logging.info('Output cluster sections')
        e = PiggybackHost()
        e.get('k8s_nodes').insert(self.nodes.list_nodes())
        e.get('k8s_namespaces').insert(self.namespaces.list_namespaces())
        e.get('k8s_persistent_volumes').insert(self.persistent_volumes.list_volumes())
        e.get('k8s_component_statuses').insert(self.component_statuses.list_statuses())
        e.get('k8s_persistent_volume_claims').insert(
            self.persistent_volume_claims.list_volume_claims())
        e.get('k8s_storage_classes').insert(self.storage_classes.list_storage_classes())
        e.get('k8s_roles').insert({'roles': self.roles.list_roles()})
        e.get('k8s_roles').insert({'cluster_roles': self.cluster_roles.list_roles()})
        e.get('k8s_resources').insert(self.nodes.total_resources())
        e.get('k8s_resources').insert(self.pods.total_resources())
        e.get('k8s_resources').insert(self.pods.pods_in_cluster())
        e.get('k8s_stats').insert(self.nodes.cluster_stats())
        return '\n'.join(e.output())

    def node_sections(self):
        # type: () -> str
        logging.info('Output node sections')
        g = PiggybackGroup()
        g.join('labels', self.nodes.labels())
        g.join('k8s_resources', self.nodes.resources())
        g.join('k8s_resources', self.pods.resources_per_node())
        g.join('k8s_resources', self.pods.pods_per_node())
        g.join('k8s_stats', self.nodes.stats())
        g.join('k8s_conditions', self.nodes.conditions())
        return '\n'.join(g.output(piggyback_prefix="node_"))

    def custom_metrics_section(self):
        # type: () -> str
        logging.info('Output pods custom metrics')
        e = PiggybackHost()
        for c_metric in self.pods_Metrics:
            e.get('k8s_pods_%s' % c_metric).insert(self.pods_Metrics[c_metric])
        return '\n'.join(e.output())

    def pod_sections(self):
        logging.info('Output pod sections')
        g = PiggybackGroup()
        g.join('labels', self.pods.labels())
        g.join('k8s_resources', self.pods.resources())
        g.join('k8s_pod_container', self.pods.containers())
        g.join('k8s_pod_info', self.pods.info())
        return '\n'.join(g.output(piggyback_prefix="pod_"))

    def service_sections(self):
        logging.info('Output service sections')
        g = PiggybackGroup()
        g.join('labels', self.services.labels())
        g.join('k8s_selector', self.services.selector())
        g.join('k8s_service_info', self.services.infos())
        g.join('k8s_service_port', self.services.ports())
        pod_names = {
            service_name: {
                'names': [pod.name for pod in pods]
            } for service_name, pods in self.pods.group_by(self.services.selector()).iteritems()
        }
        g.join('k8s_assigned_pods', pod_names)
        return '\n'.join(g.output(piggyback_prefix="service_"))

    def deployment_sections(self):
        logging.info('Output deployment sections')
        g = PiggybackGroup()
        g.join('labels', self.deployments.labels())
        g.join('k8s_replicas', self.deployments.replicas())
        return '\n'.join(g.output(piggyback_prefix="deployment_"))


def get_api_client(arguments):
    # type: (argparse.Namespace) -> client.ApiClient
    logging.info('Constructing API client')

    config = client.Configuration()
    if arguments.url_prefix:
        config.host = '%s:%s%s' % (arguments.url_prefix, arguments.port, arguments.path_prefix)
    else:
        config.host = 'https://%s:%s%s' % (arguments.host, arguments.port, arguments.path_prefix)

    config.api_key_prefix['authorization'] = 'Bearer'
    config.api_key['authorization'] = arguments.token

    if arguments.no_cert_check:
        logging.warn('Disabling SSL certificate verification')
        config.verify_ssl = False
    else:
        config.ssl_ca_cert = os.environ.get('REQUESTS_CA_BUNDLE')

    return client.ApiClient(config)
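
# Illustrative example (assumed values, not executed): for the arguments
# "my-cluster.example --port 443 --path-prefix k8s" the resulting config.host
# is "https://my-cluster.example:443/k8s"; if --url-prefix is given, that
# prefix replaces the hard-coded "https://<host>" part while port and path
# prefix are still appended.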


def main(args=None):
    # type: (Optional[List[str]]) -> int
    if args is None:
        cmk.utils.password_store.replace_passwords()
        args = sys.argv[1:]
    arguments = parse(args)

    try:
        setup_logging(arguments.verbose)
        logging.debug('parsed arguments: %s\n', arguments)

        with cmk.utils.profile.Profile(
                enabled=bool(arguments.profile), profile_file=arguments.profile):
            api_client = get_api_client(arguments)
            api_data = ApiData(api_client)
            print(api_data.cluster_sections())
            print(api_data.custom_metrics_section())
            if 'nodes' in arguments.infos:
                print(api_data.node_sections())
            if 'pods' in arguments.infos:
                print(api_data.pod_sections())
            if 'deployments' in arguments.infos:
                print(api_data.deployment_sections())
            if 'services' in arguments.infos:
                print(api_data.service_sections())
    except Exception as e:
        if arguments.debug:
            raise
        print("%s" % e, file=sys.stderr)
        return 1
    return 0
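
# A typical invocation could look like the following (illustrative command
# line; in practice Check_MK generates the arguments from the datasource
# program configuration):
#   agent_kubernetes --token <TOKEN> --port 443 \
#       --infos nodes,pods,services,deployments my-cluster.example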