Cleanup config.nodes_of
[check_mk.git] / cmk_base / snmp_utils.py
blob1baeb03df9150647c2d9d7e6e030d5a549a67f84
1 #!/usr/bin/python
2 # -*- encoding: utf-8; py-indent-offset: 4 -*-
3 # +------------------------------------------------------------------+
4 # | ____ _ _ __ __ _ __ |
5 # | / ___| |__ ___ ___| | __ | \/ | |/ / |
6 # | | | | '_ \ / _ \/ __| |/ / | |\/| | ' / |
7 # | | |___| | | | __/ (__| < | | | | . \ |
8 # | \____|_| |_|\___|\___|_|\_\___|_| |_|_|\_\ |
9 # | |
10 # | Copyright Mathias Kettner 2014 mk@mathias-kettner.de |
11 # +------------------------------------------------------------------+
13 # This file is part of Check_MK.
14 # The official homepage is at http://mathias-kettner.de/check_mk.
16 # check_mk is free software; you can redistribute it and/or modify it
17 # under the terms of the GNU General Public License as published by
18 # the Free Software Foundation in version 2. check_mk is distributed
19 # in the hope that it will be useful, but WITHOUT ANY WARRANTY; with-
20 # out even the implied warranty of MERCHANTABILITY or FITNESS FOR A
21 # PARTICULAR PURPOSE. See the GNU General Public License for more de-
22 # tails. You should have received a copy of the GNU General Public
23 # License along with GNU Make; see the file COPYING. If not, write
24 # to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
25 # Boston, MA 02110-1301 USA.
27 import abc
28 import functools
29 from typing import List, NamedTuple, Union, Tuple, Optional # pylint: disable=unused-import
# Special integer markers related to OIDs (meanings as noted per constant).

# Suffix-part of OID that was not specified
OID_END = 0
# Complete OID as string ".1.3.6.1.4.1.343...."
OID_STRING = -1
# Complete OID as binary string "\x01\x03\x06\x01..."
OID_BIN = -2
# Same as OID_BIN, but just the end part
OID_END_BIN = -3
# Same again, but omit the first byte (assuming that is the length byte)
OID_END_OCTET_STRING = -4
def BINARY(oid):
    """Mark an OID so that Check_MK hands its value to the check as binary data.

    Returns the 2-tuple ("binary", oid).
    """
    marker = "binary"
    return (marker, oid)
def CACHED_OID(oid):
    """Mark an OID as being cached for regular checks, but not for discovery.

    Returns the 2-tuple ("cached", oid).
    """
    marker = "cached"
    return (marker, oid)
def binstring_to_int(binstring):
    """Convert a byte string to an integer.

    The string is interpreted as a big endian byte string, i.e. the first
    byte is the most significant one and the last byte the least significant.
    Such strings are sometimes used by SNMP to encode 64 bit counters without
    needing COUNTER64 (which is not available in SNMP v1).

    Accepts text strings (each character contributes its ordinal) as well as
    bytes/bytearray objects. An empty input yields 0.
    """
    value = 0
    for byte in binstring:
        # Iterating bytes/bytearray on Python 3 already yields integers,
        # while (byte) strings yield one-character strings that need ord().
        digit = byte if isinstance(byte, int) else ord(byte)
        value = value * 256 + digit
    return value
def is_snmpv3_host(snmp_config):
    # type: (SNMPHostConfig) -> bool
    """Tell whether the given host configuration uses SNMPv3 credentials.

    This is the case when the "credentials" attribute is a tuple; any other
    value (e.g. a plain community string) yields False.
    """
    credentials = snmp_config.credentials
    return isinstance(credentials, tuple)
# SNMP credentials come in two shapes:
#
# - A plain string: it is used as the community.
# - A tuple of SNMPv3 auth parameters. A four-tuple consists of:
#     (1) security level (-l)
#     (2) auth protocol (-a, e.g. 'md5')
#     (3) security name (-u)
#     (4) auth password (-A)
#   A six-tuple carries these additional arguments:
#     (5) privacy protocol (DES|AES) (-x)
#     (6) privacy protocol pass phrase (-X)
#
# TODO: Be more specific about the possible tuples
SNMPCommunity = str
# TODO: This does not work as intended
#SNMPv3NoAuthNoPriv = Tuple[str, str]
#SNMPv3AuthNoPriv = Tuple[str, str, str, str]
#SNMPv3AuthPriv = Tuple[str, str, str, str, str, str]
#SNMPCredentials = Union[SNMPCommunity, SNMPv3NoAuthNoPriv, SNMPv3AuthNoPriv, SNMPv3AuthPriv]
SNMPCredentials = Union[SNMPCommunity, Tuple[str, ...]]
# Bundles the configuration of a single host into one object that is handed
# to the SNMP code.
SNMPHostConfig = NamedTuple(
    "SNMPHostConfig",
    [
        ("is_ipv6_primary", bool),
        ("hostname", str),
        ("ipaddress", str),
        ("credentials", SNMPCredentials),
        ("port", int),
        ("is_bulkwalk_host", bool),
        ("is_snmpv2or3_without_bulkwalk_host", bool),
        ("bulk_walk_size_of", int),
        # TODO: Cleanup to named tuple
        ("timing", dict),
        ("oid_range_limits", list),
        ("snmpv3_contexts", list),
        ("character_encoding", Optional[str]),
        ("is_usewalk_host", bool),
        ("is_inline_snmp_host", bool),
    ])
# Result type of an SNMP walk: a list of (str, str) pairs.
# NOTE(review): presumably (oid, value) pairs - confirm against the backends.
SNMPRowInfo = List[Tuple[str, str]]
class ABCSNMPBackend(object):
    """Abstract interface of an SNMP backend: single OID fetches and walks."""

    # Python 2 style metaclass declaration
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def get(self, snmp_config, oid, context_name=None):
        # type: (SNMPHostConfig, str, Optional[str]) -> Optional[str]
        """Fetch a single OID from the given host in the given SNMP context

        A GET request is sent to the given host, unless the OID ends with .*,
        in which case a GETNEXT request is performed instead.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def walk(self, snmp_config, oid, check_plugin_name=None, table_base_oid=None,
             context_name=None):
        # type: (SNMPHostConfig, str, Optional[str], Optional[str], Optional[str]) -> SNMPRowInfo
        """Walk the given OID on the given host and return the rows found

        This base implementation yields no rows (an empty list).
        """
        return []
class MutexScanRegistry(object):
    """Registry of scan functions that take precedence over a fallback

    Decorate any number of scan functions with the "register" method of a
    registry instance:

        @mutex_scan_registry_instance.register
        def my_snmp_scan_function(oid):
            ...

    A scan function decorated with "@mutex_scan_registry_instance.as_fallback"
    then acts as a fallback to those functions: it is only evaluated if all
    of the registered scan functions return something falsey for the same
    argument. The registered functions are consulted at call time of the
    fallback.
    """

    def __init__(self):
        super(MutexScanRegistry, self).__init__()
        self._specific_scans = []

    def _is_specific(self, oid):
        """Return True as soon as one registered scan function matches"""
        for specific_scan in self._specific_scans:
            if specific_scan(oid):
                return True
        return False

    def register(self, scan_function):
        """Remember the scan function as a specific one and return it unchanged"""
        self._specific_scans.append(scan_function)
        return scan_function

    def as_fallback(self, scan_function):
        """Wrap the scan function so that registered scan functions win over it

        The wrapper returns False without calling the wrapped function if any
        registered scan function matches, otherwise the wrapped function's
        result.
        """

        @functools.wraps(scan_function)
        def fallback_scan(oid):
            return False if self._is_specific(oid) else scan_function(oid)

        return fallback_scan