2 # -*- encoding: utf-8; py-indent-offset: 4 -*-
3 # +------------------------------------------------------------------+
4 # | ____ _ _ __ __ _ __ |
5 # | / ___| |__ ___ ___| | __ | \/ | |/ / |
6 # | | | | '_ \ / _ \/ __| |/ / | |\/| | ' / |
7 # | | |___| | | | __/ (__| < | | | | . \ |
8 # | \____|_| |_|\___|\___|_|\_\___|_| |_|_|\_\ |
10 # | Copyright Mathias Kettner 2014 mk@mathias-kettner.de |
11 # +------------------------------------------------------------------+
13 # This file is part of Check_MK.
14 # The official homepage is at http://mathias-kettner.de/check_mk.
16 # check_mk is free software; you can redistribute it and/or modify it
17 # under the terms of the GNU General Public License as published by
18 # the Free Software Foundation in version 2. check_mk is distributed
19 # in the hope that it will be useful, but WITHOUT ANY WARRANTY; with-
20 # out even the implied warranty of MERCHANTABILITY or FITNESS FOR A
21 # PARTICULAR PURPOSE. See the GNU General Public License for more de-
22 # tails. You should have received a copy of the GNU General Public
23 # License along with GNU Make; see the file COPYING. If not, write
24 # to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
25 # Boston, MA 02110-1301 USA.
27 # TODO: Refactor/document locking. It is not clear when and how to apply
28 # locks or when they are held by which component.
30 # TODO: Refactor events to be handled as objects, e.g. in case when
31 # creating objects. Or at least update the documentation. It is not clear
32 # which fields are mandatory for the events.
48 from typing
import Any
, Dict
, List
, Optional
, Tuple
, Union
# pylint: disable=unused-import
50 import pathlib2
as pathlib
54 import cmk
.utils
.daemon
55 import cmk
.utils
.defines
59 import cmk
.ec
.settings
62 import cmk
.utils
.paths
63 import cmk
.utils
.profile
64 import cmk
.utils
.render
65 import cmk
.utils
.regex
66 import cmk
.utils
.debug
68 # suppress "Cannot find module" error from mypy
69 import livestatus
# type: ignore
72 class SyslogPriority(object):
    def __init__(self, value):
        # type: (int) -> None
        """Wrap a numeric syslog priority; *value* is coerced via int()."""
        super(SyslogPriority, self).__init__()
        self.value = int(value)  # raw numeric priority, used as index into NAMES
89 return "SyslogPriority(%d)" % self
.value
93 return self
.NAMES
[self
.value
]
95 return "(unknown priority %d)" % self
.value
98 class SyslogFacility(object):
124 31: 'snmptrap', # HACK!
    def __init__(self, value):
        # type: (int) -> None
        """Wrap a numeric syslog facility; *value* is coerced via int()."""
        super(SyslogFacility, self).__init__()
        self.value = int(value)  # raw numeric facility, used as index into NAMES
132 return "SyslogFacility(%d)" % self
.value
136 return self
.NAMES
[self
.value
]
138 return "(unknown facility %d)" % self
.value
141 # Alas, we often have no clue about the actual encoding, so we have to guess:
142 # Initially we assume UTF-8, but fall back to latin-1 if it didn't work.
143 def decode_from_bytes(string_as_bytes
):
144 # This is just a safeguard if we are inadvertedly called with a Unicode
145 # string. In theory this should never happen, but given the typing chaos in
146 # this script, one never knows. In the Unicode case, Python tries to be
147 # "helpful", but this fails miserably: Calling 'decode' on a Unicode string
148 # implicitly converts it via 'encode("ascii")' to a byte string first, but
149 # this can of course fail and doesn't make sense at all when we immediately
150 # call 'decode' on this byte string again. In a nutshell: The implicit
151 # conversions between str and unicode are a fundamentally broken idea, just
152 # like all implicit things and "helpful" ideas in general. :-P For further
153 # info see e.g. http://nedbatchelder.com/text/unipain.html
154 if isinstance(string_as_bytes
, unicode):
155 return string_as_bytes
158 return string_as_bytes
.decode("utf-8")
160 return string_as_bytes
.decode("latin-1")
def scrub_and_decode(s):
    """Scrub control characters from *s* and decode the result to unicode."""
    scrubbed = cmk.ec.history.scrub_string(s)
    return decode_from_bytes(scrubbed)
168 # .--Helper functions----------------------------------------------------.
170 # | | | | | ___| |_ __ ___ _ __ ___ |
171 # | | |_| |/ _ \ | '_ \ / _ \ '__/ __| |
172 # | | _ | __/ | |_) | __/ | \__ \ |
173 # | |_| |_|\___|_| .__/ \___|_| |___/ |
175 # +----------------------------------------------------------------------+
176 # | Various helper functions |
177 # '----------------------------------------------------------------------'
180 class ECLock(object):
    def __init__(self, logger):
        """A mutex wrapper that logs lock operations via *logger* for debugging."""
        super(ECLock, self).__init__()
        self._logger = logger  # used for the acquire/release debug messages
        self._lock = threading.Lock()  # the actual mutex being wrapped
186 def acquire(self
, blocking
=True):
187 self
._logger
.debug("[%s] Trying to acquire lock", threading
.current_thread().name
)
189 ret
= self
._lock
.acquire(blocking
)
191 self
._logger
.debug("[%s] Acquired lock", threading
.current_thread().name
)
193 self
._logger
.debug("[%s] Non-blocking aquire failed", threading
.current_thread().name
)
198 self
._logger
.debug("[%s] Releasing lock", threading
.current_thread().name
)
204 def __exit__(self
, exc_type
, exc_val
, exc_tb
):
206 return False # Do not swallow exceptions
209 class ECServerThread(threading
.Thread
):
212 raise NotImplementedError()
214 def __init__(self
, name
, logger
, settings
, config
, slave_status
, profiling_enabled
,
216 super(ECServerThread
, self
).__init
__(name
=name
)
217 self
.settings
= settings
218 self
._config
= config
219 self
._slave
_status
= slave_status
220 self
._profiling
_enabled
= profiling_enabled
221 self
._profile
_file
= profile_file
222 self
._terminate
_event
= threading
.Event()
223 self
._logger
= logger
226 self
._logger
.info("Starting up")
228 while not self
._terminate
_event
.is_set():
230 with cmk
.utils
.profile
.Profile(
231 enabled
=self
._profiling
_enabled
, profile_file
=str(self
._profile
_file
)):
234 self
._logger
.exception("Exception in %s server" % self
.name
)
235 if self
._settings
.options
.debug
:
239 self
._logger
.info("Terminated")
242 self
._terminate
_event
.set()
def terminate(terminate_main_event, event_server, status_server):
    """Initiate shutdown: flag the main loop and stop both server threads."""
    terminate_main_event.set()
    # Preserve the original shutdown order: status server first, then event server.
    for server_thread in (status_server, event_server):
        server_thread.terminate()
251 def bail_out(logger
, reason
):
252 logger
.error("FATAL ERROR: %s" % reason
)
256 def process_exists(pid
):
264 def drain_pipe(pipe
):
267 readable
= select
.select([pipe
], [], [], 0.1)[0]
268 except select
.error
as e
:
269 if e
[0] == errno
.EINTR
:
276 data
= os
.read(pipe
, 4096)
280 break # Error while reading
282 break # No data available
285 def match(pattern
, text
, complete
=True):
286 """Performs an EC style matching test of pattern on text
288 Returns False in case of no match or a tuple with the match groups.
289 In case no match group is produced, it returns an empty tuple."""
293 elif isinstance(pattern
, six
.string_types
):
295 return () if pattern
== text
.lower() else False
296 return () if pattern
in text
.lower() else False
298 # Assume compiled regex
299 m
= pattern
.search(text
)
303 # Remove None from result tuples and replace it with empty strings
304 return tuple([g
if g
is not None else '' for g
in groups
])
310 def format_pattern(pat
):
# Sorry: this code is duplicated in web/plugins/wato/mkeventd.py
318 def match_ipv4_network(pattern
, ipaddress_text
):
319 network
, network_bits
= parse_ipv4_network(pattern
) # is validated by valuespec
320 if network_bits
== 0:
321 return True # event if ipaddress is empty
323 ipaddress
= parse_ipv4_address(ipaddress_text
)
325 return False # invalid address never matches
327 # first network_bits of network and ipaddress must be
328 # identical. Create a bitmask.
331 bitmask
= bitmask
<< 1
338 return (network
& bitmask
) == (ipaddress
& bitmask
)
def parse_ipv4_address(text):
    # type: (str) -> int
    """Convert a dotted-quad IPv4 address like "10.1.2.3" to a 32 bit integer.

    Raises ValueError if a component is not an integer and IndexError if
    fewer than four components are given.
    """
    # Use a list comprehension instead of map(): on Python 3, map() returns
    # an iterator that cannot be indexed, which would break the line below.
    parts = [int(part) for part in text.split(".")]
    return (parts[0] << 24) + (parts[1] << 16) + (parts[2] << 8) + parts[3]
346 def parse_ipv4_network(text
):
348 return parse_ipv4_address(text
), 32
350 network_text
, bits_text
= text
.split("/")
351 return parse_ipv4_address(network_text
), int(bits_text
)
354 def replace_groups(text
, origtext
, match_groups
):
355 # replace \0 with text itself. This allows to add information
356 # in front or and the end of a message
357 text
= text
.replace("\\0", origtext
)
359 # Generic replacement with \1, \2, ...
360 match_groups_message
= match_groups
.get("match_groups_message")
361 if isinstance(match_groups_message
, tuple):
362 for nr
, g
in enumerate(match_groups_message
):
363 text
= text
.replace("\\%d" % (nr
+ 1), g
)
365 # Replacement with keyword
367 # $MATCH_GROUPS_MESSAGE_x$
368 # $MATCH_GROUPS_SYSLOG_APPLICATION_x$
369 for key_prefix
, values
in match_groups
.iteritems():
370 if not isinstance(values
, tuple):
373 for idx
, match_value
in enumerate(values
):
374 text
= text
.replace("$%s_%d$" % (key_prefix
.upper(), idx
+ 1), match_value
)
class MKSignalException(Exception):
    """Exception carrying the number of a received OS signal."""

    def __init__(self, signum):
        # type: (int) -> None
        super(MKSignalException, self).__init__("Got signal %d" % signum)
        self._signum = signum  # keep the raw signal number for later inspection
class MKClientError(Exception):
    """Exception initialized with an arbitrary payload ``t``.

    Behaves like a plain Exception; the name suggests it signals errors
    that are reported back to a client.
    """

    def __init__(self, t):
        super(MKClientError, self).__init__(t)
391 # .--Timeperiods---------------------------------------------------------.
393 # | |_ _(_)_ __ ___ ___ _ __ ___ _ __(_) ___ __| |___ |
394 # | | | | | '_ ` _ \ / _ \ '_ \ / _ \ '__| |/ _ \ / _` / __| |
395 # | | | | | | | | | | __/ |_) | __/ | | | (_) | (_| \__ \ |
396 # | |_| |_|_| |_| |_|\___| .__/ \___|_| |_|\___/ \__,_|___/ |
398 # +----------------------------------------------------------------------+
399 # | Timeperiods are used in rule conditions |
400 # '----------------------------------------------------------------------'
403 class TimePeriods(object):
404 def __init__(self
, logger
):
405 super(TimePeriods
, self
).__init
__()
406 self
._logger
= logger
408 self
._last
_update
= 0
411 if self
._periods
is not None and int(time
.time()) / 60 == self
._last
_update
:
412 return # only update once a minute
414 table
= livestatus
.LocalConnection().query("GET timeperiods\nColumns: name alias in")
416 for tpname
, alias
, isin
in table
:
417 periods
[tpname
] = (alias
, bool(isin
))
418 self
._periods
= periods
419 self
._last
_update
= int(time
.time()) / 60
420 except Exception as e
:
421 self
._logger
.exception("Cannot update timeperiod information: %s" % e
)
424 def check(self
, tpname
):
426 if not self
._periods
:
427 self
._logger
.warning("no timeperiod information, assuming %s is active" % tpname
)
429 if tpname
not in self
._periods
:
430 self
._logger
.warning("no such timeperiod %s, assuming it is active" % tpname
)
432 return self
._periods
[tpname
][1]
436 # .--Host config---------------------------------------------------------.
438 # | | | | | ___ ___| |_ ___ ___ _ __ / _(_) __ _ |
439 # | | |_| |/ _ \/ __| __| / __/ _ \| '_ \| |_| |/ _` | |
440 # | | _ | (_) \__ \ |_ | (_| (_) | | | | _| | (_| | |
441 # | |_| |_|\___/|___/\__| \___\___/|_| |_|_| |_|\__, | |
443 # +----------------------------------------------------------------------+
444 # | Manages the configuration of the hosts of the local monitoring core. |
# | It fetches and caches the information during runtime of the EC.      |
446 # '----------------------------------------------------------------------'
449 class HostConfig(object):
450 def __init__(self
, logger
):
451 self
._logger
= logger
454 def initialize(self
):
455 self
._logger
.debug("Initializing host config")
456 self
._event
_host
_to
_host
= {}
458 self
._hosts
_by
_name
= {}
459 self
._hosts
_by
_lower
_name
= {}
460 self
._hosts
_by
_lower
_alias
= {}
461 self
._hosts
_by
_lower
_address
= {}
463 self
._got
_config
_from
_core
= False
465 def get(self
, host_name
, deflt
=None):
466 return self
._hosts
_by
_name
.get(host_name
, deflt
)
468 def get_by_event_host_name(self
, event_host_name
, deflt
=None):
470 self
._update
_from
_core
()
472 self
._logger
.exception("Failed to get host info from core. Try again later.")
476 return self
._event
_host
_to
_host
[event_host_name
]
478 pass # Not cached yet
480 # Note: It is important that we use exactly the same algorithm here as in the core
481 # (enterprise/core/src/World.cc getHostByDesignation)
483 # Host name : Case insensitive equality (host_name =~ %s)
484 # Host alias : Case insensitive equality (host_alias =~ %s)
485 # Host address : Case insensitive equality (host_address =~ %s)
486 low_event_host_name
= event_host_name
.lower()
490 self
._hosts
_by
_lower
_name
, self
._hosts
_by
_lower
_address
, self
._hosts
_by
_lower
_alias
493 host
= search_map
[low_event_host_name
]
498 self
._event
_host
_to
_host
[event_host_name
] = host
501 def _update_from_core(self
):
502 if not self
._has
_core
_config
_reloaded
():
506 self
._logger
.debug("Fetching host config from core")
517 query
= "GET hosts\nColumns: %s" % " ".join(columns
)
518 for host
in livestatus
.LocalConnection().query_table_assoc(query
):
519 self
._hosts
_by
_name
[host
["name"]] = host
521 # Lookup maps to improve performance of host searches
522 self
._hosts
_by
_lower
_name
[host
["name"].lower()] = host
523 self
._hosts
_by
_lower
_alias
[host
["alias"].lower()] = host
524 self
._hosts
_by
_lower
_address
[host
["alias"].lower()] = host
526 self
._logger
.debug("Got %d hosts from core" % len(self
._hosts
_by
_name
))
527 self
._got
_config
_from
_core
= self
._get
_core
_start
_time
()
529 def _has_core_config_reloaded(self
):
530 if not self
._got
_config
_from
_core
:
533 if self
._get
_core
_start
_time
() > self
._got
_config
_from
_core
:
538 def _get_core_start_time(self
):
539 query
= ("GET status\n" "Columns: program_start\n")
540 return livestatus
.LocalConnection().query_value(query
)
544 # .--Perfcounters--------------------------------------------------------.
546 # | | _ \ ___ _ __ / _| ___ ___ _ _ _ __ | |_ ___ _ __ ___ |
547 # | | |_) / _ \ '__| |_ / __/ _ \| | | | '_ \| __/ _ \ '__/ __| |
548 # | | __/ __/ | | _| (_| (_) | |_| | | | | || __/ | \__ \ |
549 # | |_| \___|_| |_| \___\___/ \__,_|_| |_|\__\___|_| |___/ |
551 # +----------------------------------------------------------------------+
552 # | Helper class for performance counting |
553 # '----------------------------------------------------------------------'
557 """Linear interpolation between a and b with weight t"""
558 return (1 - t
) * a
+ t
* b
561 class Perfcounters(object):
572 # Average processing times
574 "processing": 0.99, # event processing
575 "sync": 0.95, # Replication sync
576 "request": 0.95, # Client requests
579 # TODO: Why aren't self._times / self._rates / ... not initialized with their defaults?
580 def __init__(self
, logger
):
581 super(Perfcounters
, self
).__init
__()
582 self
._lock
= ECLock(logger
)
584 # Initialize counters
585 self
._counters
= dict([(n
, 0) for n
in self
._counter
_names
])
587 self
._old
_counters
= {}
589 self
._average
_rates
= {}
591 self
._last
_statistics
= None
593 self
._logger
= logger
.getChild("Perfcounters")
595 def count(self
, counter
):
597 self
._counters
[counter
] += 1
599 def count_time(self
, counter
, ptime
):
601 if counter
in self
._times
:
602 self
._times
[counter
] = lerp(ptime
, self
._times
[counter
], self
._weights
[counter
])
604 self
._times
[counter
] = ptime
606 def do_statistics(self
):
609 if self
._last
_statistics
:
610 duration
= now
- self
._last
_statistics
613 for name
, value
in self
._counters
.iteritems():
615 delta
= value
- self
._old
_counters
[name
]
616 rate
= delta
/ duration
617 self
._rates
[name
] = rate
618 if name
in self
._average
_rates
:
619 # We could make the weight configurable
620 self
._average
_rates
[name
] = lerp(rate
, self
._average
_rates
[name
], 0.9)
622 self
._average
_rates
[name
] = rate
624 self
._last
_statistics
= now
625 self
._old
_counters
= self
._counters
.copy()
628 def status_columns(cls
):
630 # Please note: status_columns() and get_status() need to produce lists with exact same column order
631 for name
in cls
._counter
_names
:
632 columns
.append(("status_" + name
, 0))
633 columns
.append(("status_" + name
.rstrip("s") + "_rate", 0.0))
634 columns
.append(("status_average_" + name
.rstrip("s") + "_rate", 0.0))
636 for name
in cls
._weights
:
637 columns
.append(("status_average_%s_time" % name
, 0.0))
641 def get_status(self
):
644 # Please note: status_columns() and get_status() need to produce lists with exact same column order
645 for name
in self
._counter
_names
:
646 row
.append(self
._counters
[name
])
647 row
.append(self
._rates
.get(name
, 0.0))
648 row
.append(self
._average
_rates
.get(name
, 0.0))
650 for name
in self
._weights
:
651 row
.append(self
._times
.get(name
, 0.0))
657 # .--EventServer---------------------------------------------------------.
659 # | | ____|_ _____ _ __ | |_/ ___| ___ _ ____ _____ _ __ |
660 # | | _| \ \ / / _ \ '_ \| __\___ \ / _ \ '__\ \ / / _ \ '__| |
661 # | | |___ \ V / __/ | | | |_ ___) | __/ | \ V / __/ | |
662 # | |_____| \_/ \___|_| |_|\__|____/ \___|_| \_/ \___|_| |
664 # +----------------------------------------------------------------------+
# | Processing and classification of incoming events.                    |
666 # '----------------------------------------------------------------------'
669 class EventServer(ECServerThread
):
685 def __init__(self
, logger
, settings
, config
, slave_status
, perfcounters
, lock_configuration
,
686 history
, event_status
, event_columns
):
687 super(EventServer
, self
).__init
__(
692 slave_status
=slave_status
,
693 profiling_enabled
=settings
.options
.profile_event
,
694 profile_file
=settings
.paths
.event_server_profile
.value
)
696 self
._syslog
_tcp
= None
697 self
._snmptrap
= None
700 self
._hash
_stats
= []
701 for _unused_facility
in xrange(32):
702 self
._hash
_stats
.append([0] * 8)
704 self
.host_config
= HostConfig(self
._logger
)
705 self
._perfcounters
= perfcounters
706 self
._lock
_configuration
= lock_configuration
707 self
._history
= history
708 self
._event
_status
= event_status
709 self
._event
_columns
= event_columns
710 self
._message
_period
= cmk
.ec
.history
.ActiveHistoryPeriod()
711 self
._rule
_matcher
= RuleMatcher(self
._logger
, config
)
714 self
.open_eventsocket()
716 self
.open_syslog_tcp()
718 self
._snmp
_trap
_engine
= cmk
.ec
.snmp
.SNMPTrapEngine(self
.settings
, self
._config
,
719 self
._logger
.getChild("snmp"),
720 self
.handle_snmptrap
)
723 def status_columns(cls
):
724 columns
= cls
._general
_columns
()
725 columns
+= Perfcounters
.status_columns()
726 columns
+= cls
._replication
_columns
()
727 columns
+= cls
._event
_limit
_columns
()
731 def _general_columns(cls
):
733 ("status_config_load_time", 0),
734 ("status_num_open_events", 0),
735 ("status_virtual_memory_size", 0),
739 def _replication_columns(cls
):
741 ("status_replication_slavemode", ""),
742 ("status_replication_last_sync", 0.0),
743 ("status_replication_success", False),
747 def _event_limit_columns(cls
):
749 ("status_event_limit_host", 0),
750 ("status_event_limit_rule", 0),
751 ("status_event_limit_overall", 0),
752 ("status_event_limit_active_hosts", []),
753 ("status_event_limit_active_rules", []),
754 ("status_event_limit_active_overall", False),
757 def get_status(self
):
760 row
+= self
._add
_general
_status
()
761 row
+= self
._perfcounters
.get_status()
762 row
+= self
._add
_replication
_status
()
763 row
+= self
._add
_event
_limit
_status
()
767 def _add_general_status(self
):
769 self
._config
["last_reload"],
770 self
._event
_status
.num_existing_events
,
771 self
._virtual
_memory
_size
(),
774 def _virtual_memory_size(self
):
775 parts
= file('/proc/self/stat').read().split()
776 return int(parts
[22]) # in Bytes
778 def _add_replication_status(self
):
779 if is_replication_slave(self
._config
):
781 self
._slave
_status
["mode"],
782 self
._slave
_status
["last_sync"],
783 self
._slave
_status
["success"],
785 return ["master", 0.0, False]
787 def _add_event_limit_status(self
):
789 self
._config
["event_limit"]["by_host"]["limit"],
790 self
._config
["event_limit"]["by_rule"]["limit"],
791 self
._config
["event_limit"]["overall"]["limit"],
792 self
.get_hosts_with_active_event_limit(),
793 self
.get_rules_with_active_event_limit(),
794 self
.is_overall_event_limit_active(),
797 def create_pipe(self
):
798 path
= self
.settings
.paths
.event_pipe
.value
800 if not path
.is_fifo():
805 if not path
.exists():
808 # We want to be able to receive events from all users on the local system
809 path
.chmod(0o666) # nosec
811 self
._logger
.info("Created FIFO '%s' for receiving events" % path
)
813 def open_syslog(self
):
814 endpoint
= self
.settings
.options
.syslog_udp
816 if isinstance(endpoint
, cmk
.ec
.settings
.FileDescriptor
):
817 self
._syslog
= socket
.fromfd(endpoint
.value
, socket
.AF_INET
, socket
.SOCK_DGRAM
)
818 os
.close(endpoint
.value
)
820 "Opened builtin syslog server on inherited filedescriptor %d" % endpoint
.value
)
821 if isinstance(endpoint
, cmk
.ec
.settings
.PortNumber
):
822 self
._syslog
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
)
823 self
._syslog
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
824 self
._syslog
.bind(("0.0.0.0", endpoint
.value
))
825 self
._logger
.info("Opened builtin syslog server on UDP port %d" % endpoint
.value
)
826 except Exception as e
:
827 raise Exception("Cannot start builtin syslog server: %s" % e
)
829 def open_syslog_tcp(self
):
830 endpoint
= self
.settings
.options
.syslog_tcp
832 if isinstance(endpoint
, cmk
.ec
.settings
.FileDescriptor
):
833 self
._syslog
_tcp
= socket
.fromfd(endpoint
.value
, socket
.AF_INET
, socket
.SOCK_STREAM
)
834 self
._syslog
_tcp
.listen(20)
835 os
.close(endpoint
.value
)
836 self
._logger
.info("Opened builtin syslog-tcp server on inherited filedescriptor %d"
838 if isinstance(endpoint
, cmk
.ec
.settings
.PortNumber
):
839 self
._syslog
_tcp
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
840 self
._syslog
_tcp
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
841 self
._syslog
_tcp
.bind(("0.0.0.0", endpoint
.value
))
842 self
._syslog
_tcp
.listen(20)
844 "Opened builtin syslog-tcp server on TCP port %d" % endpoint
.value
)
845 except Exception as e
:
846 raise Exception("Cannot start builtin syslog-tcp server: %s" % e
)
848 def open_snmptrap(self
):
849 endpoint
= self
.settings
.options
.snmptrap_udp
851 if isinstance(endpoint
, cmk
.ec
.settings
.FileDescriptor
):
852 self
._snmptrap
= socket
.fromfd(endpoint
.value
, socket
.AF_INET
, socket
.SOCK_DGRAM
)
853 os
.close(endpoint
.value
)
854 self
._logger
.info("Opened builtin snmptrap server on inherited filedescriptor %d" %
856 if isinstance(endpoint
, cmk
.ec
.settings
.PortNumber
):
857 self
._snmptrap
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
)
858 self
._snmptrap
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
859 self
._snmptrap
.bind(("0.0.0.0", endpoint
.value
))
860 self
._logger
.info("Opened builtin snmptrap server on UDP port %d" % endpoint
.value
)
861 except Exception as e
:
862 raise Exception("Cannot start builtin snmptrap server: %s" % e
)
864 def open_eventsocket(self
):
865 path
= self
.settings
.paths
.event_socket
.value
868 path
.parent
.mkdir(parents
=True, exist_ok
=True)
869 self
._eventsocket
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_STREAM
)
870 self
._eventsocket
.bind(str(path
))
872 self
._eventsocket
.listen(self
._config
['eventsocket_queue_len'])
873 self
._logger
.info("Opened UNIX socket '%s' for receiving events" % path
)
876 # Beware: we must open the pipe also for writing. Otherwise
877 # we will see EOF forever after one writer has finished and
878 # select() will trigger even if there is no data. A good article
879 # about this is here:
880 # http://www.outflux.net/blog/archives/2008/03/09/using-select-on-a-fifo/
881 return os
.open(str(self
.settings
.paths
.event_pipe
.value
), os
.O_RDWR | os
.O_NONBLOCK
)
883 def handle_snmptrap(self
, trap
, ipaddress
):
884 self
.process_event(self
._create
_event
_from
_trap
(trap
, ipaddress
))
886 def _create_event_from_trap(self
, trap
, ipaddress
):
887 # use the trap-oid as application
889 for index
, (oid
, _unused_val
) in enumerate(trap
):
890 if oid
in ['1.3.6.1.6.3.1.1.4.1.0', 'SNMPv2-MIB::snmpTrapOID.0']:
891 application
= scrub_and_decode(trap
.pop(index
)[1])
894 # once we got here we have a real parsed trap which we convert to an event now
895 safe_ipaddress
= scrub_and_decode(ipaddress
)
896 text
= scrub_and_decode(', '.join(['%s: %s' % (item
[0], str(item
[1])) for item
in trap
]))
900 'host': safe_ipaddress
,
901 'ipaddress': safe_ipaddress
,
902 'priority': 5, # notice
903 'facility': 31, # not used by syslog -> we use this for all traps
904 'application': application
,
907 'host_in_downtime': False,
914 pipe
= self
.open_pipe()
917 # Wait for incoming syslog packets via UDP
918 if self
._syslog
is not None:
919 listen_list
.append(self
._syslog
.fileno())
921 # Wait for new connections for events via TCP socket
922 if self
._syslog
_tcp
is not None:
923 listen_list
.append(self
._syslog
_tcp
)
925 # Wait for new connections for events via unix socket
926 if self
._eventsocket
:
927 listen_list
.append(self
._eventsocket
)
929 # Wait for incomding SNMP traps
930 if self
._snmptrap
is not None:
931 listen_list
.append(self
._snmptrap
.fileno())
933 # Keep list of client connections via UNIX socket and
934 # read data that is not yet processed. Map from
935 # fd to (fileobject, data)
938 while not self
._terminate
_event
.is_set():
940 readable
= select
.select(listen_list
+ client_sockets
.keys(), [], [],
942 except select
.error
as e
:
943 if e
[0] == errno
.EINTR
:
948 # Accept new connection on event unix socket
949 if self
._eventsocket
in readable
:
950 client_socket
, address
= self
._eventsocket
.accept()
951 # pylint: disable=no-member
952 client_sockets
[client_socket
.fileno()] = (client_socket
, address
, "")
954 # Same for the TCP syslog socket
955 if self
._syslog
_tcp
and self
._syslog
_tcp
in readable
:
956 client_socket
, address
= self
._syslog
_tcp
.accept()
957 # pylint: disable=no-member
958 client_sockets
[client_socket
.fileno()] = (client_socket
, address
, "")
960 # Read data from existing event unix socket connections
961 # NOTE: We modify client_socket in the loop, so we need to copy below!
962 for fd
, (cs
, address
, previous_data
) in list(client_sockets
.iteritems()):
964 # Receive next part of data
966 new_data
= cs
.recv(4096)
971 # Put together with incomplete messages from last time
972 data
= previous_data
+ new_data
974 # Do we have incomplete data? (if the socket has been
975 # closed then we consider the pending message always
976 # as complete, even if there was no trailing \n)
977 if new_data
and not data
.endswith("\n"): # keep fragment
978 # Do we have any complete messages?
980 complete
, rest
= data
.rsplit("\n", 1)
981 self
.process_raw_lines(complete
+ "\n", address
)
983 rest
= data
# keep for next time
985 # Only complete messages
988 self
.process_raw_lines(data
, address
)
991 # Connection still open?
993 client_sockets
[fd
] = (cs
, address
, rest
)
996 del client_sockets
[fd
]
998 # Read data from pipe
1001 data
= os
.read(pipe
, 4096)
1003 # Prepend previous beginning of message to read data
1004 data
= pipe_fragment
+ data
1007 # Last message still incomplete?
1008 if data
[-1] != '\n':
1009 if '\n' in data
: # at least one complete message contained
1010 messages
, pipe_fragment
= data
.rsplit('\n', 1)
1011 self
.process_raw_lines(messages
+ '\n') # got lost in split
1013 pipe_fragment
= data
# keep beginning of message, wait for \n
1015 self
.process_raw_lines(data
)
1018 pipe
= self
.open_pipe()
1019 listen_list
[0] = pipe
1020 # Pending fragments from previos reads that are not terminated
1021 # by a \n are ignored.
1023 self
._logger
.warning(
1024 "Ignoring incomplete message '%s' from pipe" % pipe_fragment
)
1029 # Read events from builtin syslog server
1030 if self
._syslog
is not None and self
._syslog
.fileno() in readable
:
1031 self
.process_raw_lines(*self
._syslog
.recvfrom(4096))
1033 # Read events from builtin snmptrap server
1034 if self
._snmptrap
is not None and self
._snmptrap
.fileno() in readable
:
1036 message
, sender_address
= self
._snmptrap
.recvfrom(65535)
1037 self
.process_raw_data(
1038 lambda: self
._snmp
_trap
_engine
.process_snmptrap(message
, sender_address
))
1040 self
._logger
.exception(
1041 'Exception handling a SNMP trap from "%s". Skipping this one' %
1045 # process the first spool file we get
1046 spool_file
= next(self
.settings
.paths
.spool_dir
.value
.glob('[!.]*'))
1047 self
.process_raw_lines(spool_file
.read_bytes())
1049 select_timeout
= 0 # enable fast processing to process further files
1050 except StopIteration:
1051 select_timeout
= 1 # restore default select timeout
1053 # Processes incoming data, just a wrapper between the real data and the
1054 # handler function to record some statistics etc.
1055 def process_raw_data(self
, handler
):
1056 self
._perfcounters
.count("messages")
1057 before
= time
.time()
1058 # In replication slave mode (when not took over), ignore all events
1059 if not is_replication_slave(self
._config
) or self
._slave
_status
["mode"] != "sync":
1061 elif self
.settings
.options
.debug
:
1062 self
._logger
.info("Replication: we are in slave mode, ignoring event")
1063 elapsed
= time
.time() - before
1064 self
._perfcounters
.count_time("processing", elapsed
)
1066 # Takes several lines of messages, handles encoding and processes them separated
1067 def process_raw_lines(self
, data
, address
=None):
1068 lines
= data
.splitlines()
1070 line
= scrub_and_decode(line
.rstrip())
1074 def handler(line
=line
):
1075 self
.process_line(line
, address
)
1077 self
.process_raw_data(handler
)
1078 except Exception as e
:
1079 self
._logger
.exception(
1080 'Exception handling a log line (skipping this one): %s' % e
)
    def do_housekeeping(self):
        """Periodic maintenance: time out/expire events and run history housekeeping.

        Holds both the event status lock and the configuration lock so the
        bookkeeping below sees a consistent state.
        """
        with self._event_status.lock:
            with self._lock_configuration:
                self.hk_handle_event_timeouts()
                self.hk_check_expected_messages()
                self.hk_cleanup_downtime_events()
        self._history.housekeeping()
1090 # For all events that have been created in a host downtime check the host
1091 # whether or not it is still in downtime. In case the downtime has ended
1092 # archive the events that have been created in a downtime.
1093 def hk_cleanup_downtime_events(self
):
1096 for event
in self
._event
_status
.events():
1097 if not event
["host_in_downtime"]:
1098 continue # only care about events created in downtime
1101 in_downtime
= host_downtimes
[event
["core_host"]]
1103 in_downtime
= self
._is
_host
_in
_downtime
(event
)
1104 host_downtimes
[event
["core_host"]] = in_downtime
1107 continue # (still) in downtime, don't delete any event
1109 self
._logger
.verbose(
1110 "Remove event %d (created in downtime, host left downtime)" % event
["id"])
1111 self
._event
_status
.remove_event(event
)
1113 def hk_handle_event_timeouts(self
):
1114 # 1. Automatically delete all events that are in state "counting"
1115 # and have not reached the required number of hits and whose
1117 # 2. Automatically delete all events that are in state "open"
1118 # and whose livetime is elapsed.
1119 events_to_delete
= []
1120 events
= self
._event
_status
.events()
1122 for nr
, event
in enumerate(events
):
1123 rule
= self
._rule
_by
_id
.get(event
["rule_id"])
1125 if event
["phase"] == "counting":
1126 # Event belongs to a rule that does not longer exist? It
1127 # will never reach its count. Better delete it.
1129 self
._logger
.info("Deleting orphaned event %d created by obsolete rule %s" %
1130 (event
["id"], event
["rule_id"]))
1131 event
["phase"] = "closed"
1132 self
._history
.add(event
, "ORPHANED")
1133 events_to_delete
.append(nr
)
1135 elif "count" not in rule
and "expect" not in rule
:
1137 "Count-based event %d belonging to rule %s: rule does not "
1138 "count/expect anymore. Deleting event." % (event
["id"], event
["rule_id"]))
1139 event
["phase"] = "closed"
1140 self
._history
.add(event
, "NOCOUNT")
1141 events_to_delete
.append(nr
)
1144 elif "count" in rule
:
1145 count
= rule
["count"]
1146 if count
.get("algorithm") in ["tokenbucket", "dynabucket"]:
1147 last_token
= event
.get("last_token", event
["first"])
1148 secs_per_token
= count
["period"] / float(count
["count"])
1149 if count
["algorithm"] == "dynabucket": # get fewer tokens if count is lower
1150 if event
["count"] <= 1:
1151 secs_per_token
= count
["period"]
1153 secs_per_token
*= (float(count
["count"]) / float(event
["count"]))
1154 elapsed_secs
= now
- last_token
1155 new_tokens
= int(elapsed_secs
/ secs_per_token
)
1157 if self
.settings
.options
.debug
:
1159 "Rule %s/%s, event %d: got %d new tokens" %
1160 (rule
["pack"], rule
["id"], event
["id"], new_tokens
))
1161 event
["count"] = max(0, event
["count"] - new_tokens
)
1163 "last_token"] = last_token
+ new_tokens
* secs_per_token
# not now! would be unfair
1164 if event
["count"] == 0:
1166 "Rule %s/%s, event %d: again without allowed rate, dropping event"
1167 % (rule
["pack"], rule
["id"], event
["id"]))
1168 event
["phase"] = "closed"
1169 self
._history
.add(event
, "COUNTFAILED")
1170 events_to_delete
.append(nr
)
1172 else: # algorithm 'interval'
1173 if event
["first"] + count
["period"] <= now
: # End of period reached
1175 "Rule %s/%s: reached only %d out of %d events within %d seconds. "
1176 "Resetting to zero." % (rule
["pack"], rule
["id"], event
["count"],
1177 count
["count"], count
["period"]))
1178 event
["phase"] = "closed"
1179 self
._history
.add(event
, "COUNTFAILED")
1180 events_to_delete
.append(nr
)
1182 # Handle delayed actions
1183 elif event
["phase"] == "delayed":
1184 delay_until
= event
.get("delay_until", 0) # should always be present
1185 if now
>= delay_until
:
1186 self
._logger
.info("Delayed event %d of rule %s is now activated." %
1187 (event
["id"], event
["rule_id"]))
1188 event
["phase"] = "open"
1189 self
._history
.add(event
, "DELAYOVER")
1191 cmk
.ec
.actions
.event_has_opened(self
._history
, self
.settings
, self
._config
,
1192 self
._logger
, self
, self
._event
_columns
,
1194 if rule
.get("autodelete"):
1195 event
["phase"] = "closed"
1196 self
._history
.add(event
, "AUTODELETE")
1197 events_to_delete
.append(nr
)
1200 self
._logger
.info("Cannot do rule action: rule %s not present anymore." %
1203 # Handle events with a limited lifetime
1204 elif "live_until" in event
:
1205 if now
>= event
["live_until"]:
1206 allowed_phases
= event
.get("live_until_phases", ["open"])
1207 if event
["phase"] in allowed_phases
:
1208 event
["phase"] = "closed"
1209 events_to_delete
.append(nr
)
1210 self
._logger
.info("Livetime of event %d (rule %s) exceeded. Deleting event."
1211 % (event
["id"], event
["rule_id"]))
1212 self
._history
.add(event
, "EXPIRED")
1214 # Do delayed deletion now (was delayed in order to keep list indices OK)
1215 for nr
in events_to_delete
[::-1]:
1216 self
._event
_status
.remove_event(events
[nr
])
1218 def hk_check_expected_messages(self
):
1220 # "Expecting"-rules are rules that require one or several
1221 # occurrences of a message within a defined time period.
1222 # Whenever one period of time has elapsed, we need to check
1223 # how many messages have been seen for that rule. If these
1224 # are too few, we open an event.
1225 # We need to handle two cases:
1226 # 1. An event for such a rule already exists and is
1227 # in the state "counting" -> this can only be the case if
1228 # more than one occurrence is required.
1229 # 2. No event at all exists.
1231 for rule
in self
._rules
:
1232 if "expect" in rule
:
1234 if not self
.event_rule_matches_site(rule
, event
=None):
1237 # Interval is either a number of seconds, or pair of a number of seconds
1238 # (e.g. 86400, meaning one day) and a timezone offset relative to UTC in hours.
1239 interval
= rule
["expect"]["interval"]
1240 expected_count
= rule
["expect"]["count"]
1242 interval_start
= self
._event
_status
.interval_start(rule
["id"], interval
)
1243 if interval_start
>= now
:
1246 next_interval_start
= self
._event
_status
.next_interval_start(
1247 interval
, interval_start
)
1248 if next_interval_start
> now
:
1251 # Interval has been elapsed. Now comes the truth: do we have enough
1254 # First do not forget to switch to next interval
1255 self
._event
_status
.start_next_interval(rule
["id"], interval
)
1257 # First look for case 1: rule that already have at least one hit
1258 # and this events in the state "counting" exist.
1259 events_to_delete
= []
1260 events
= self
._event
_status
.events()
1261 for nr
, event
in enumerate(events
):
1262 if event
["rule_id"] == rule
["id"] and event
["phase"] == "counting":
1263 # time has elapsed. Now lets see if we have reached
1264 # the necessary count:
1265 if event
["count"] < expected_count
: # no -> trigger alarm
1266 self
._handle
_absent
_event
(rule
, event
["count"], expected_count
,
1268 else: # yes -> everything is fine. Just log.
1270 "Rule %s/%s has reached %d occurrances (%d required). "
1271 "Starting next period." % (rule
["pack"], rule
["id"], event
["count"],
1273 self
._history
.add(event
, "COUNTREACHED")
1274 # Counting event is no longer needed.
1275 events_to_delete
.append(nr
)
1278 # Ou ou, no event found at all.
1280 self
._handle
_absent
_event
(rule
, 0, expected_count
, interval_start
)
1282 for nr
in events_to_delete
[::-1]:
1283 self
._event
_status
.remove_event(events
[nr
])
1285 def _handle_absent_event(self
, rule
, event_count
, expected_count
, interval_start
):
1288 text
= "Expected message arrived only %d out of %d times since %s" % \
1289 (event_count
, expected_count
, time
.strftime("%F %T", time
.localtime(interval_start
)))
1291 text
= "Expected message did not arrive since %s" % \
1292 time
.strftime("%F %T", time
.localtime(interval_start
))
1294 # If there is already an incidence about this absent message, we can merge and
1295 # not create a new event. There is a setting for this.
1297 merge
= rule
["expect"].get("merge", "open")
1298 if merge
!= "never":
1299 for event
in self
._event
_status
.events():
1300 if event
["rule_id"] == rule
["id"] and \
1301 (event
["phase"] == "open" or
1302 (event
["phase"] == "ack" and merge
== "acked")):
1307 merge_event
["last"] = now
1308 merge_event
["count"] += 1
1309 merge_event
["phase"] = "open"
1310 merge_event
["time"] = now
1311 merge_event
["text"] = text
1312 # Better rewrite (again). Rule might have changed. Also we have changed
1313 # the text and the user might have his own text added via set_text.
1314 self
.rewrite_event(rule
, merge_event
, {})
1315 self
._history
.add(merge_event
, "COUNTFAILED")
1317 # Create artificial event from scratch. Make sure that all important
1318 # fields are defined.
1320 "rule_id": rule
["id"],
1333 "facility": 1, # user
1335 "match_groups_syslog_application": (),
1337 "host_in_downtime": False,
1339 self
._add
_rule
_contact
_groups
_to
_event
(rule
, event
)
1340 self
.rewrite_event(rule
, event
, {})
1341 self
._event
_status
.new_event(event
)
1342 self
._history
.add(event
, "COUNTFAILED")
1343 cmk
.ec
.actions
.event_has_opened(self
._history
, self
.settings
, self
._config
,
1344 self
._logger
, self
, self
._event
_columns
, rule
, event
)
1345 if rule
.get("autodelete"):
1346 event
["phase"] = "closed"
1347 self
._history
.add(event
, "AUTODELETE")
1348 self
._event
_status
.remove_event(event
)
1350 def reload_configuration(self
, config
):
1351 self
._config
= config
1352 self
._snmp
_trap
_engine
= cmk
.ec
.snmp
.SNMPTrapEngine(self
.settings
, self
._config
,
1353 self
._logger
.getChild("snmp"),
1354 self
.handle_snmptrap
)
1355 self
.compile_rules(self
._config
["rules"], self
._config
["rule_packs"])
1356 self
.host_config
.initialize()
1358 # Precompile regular expressions and similar stuff. Also convert legacy
1359 # "rules" parameter into new "rule_packs" parameter
1360 def compile_rules(self
, legacy_rules
, rule_packs
):
1362 self
._rule
_by
_id
= {}
1363 self
._rule
_hash
= {} # Speedup-Hash for rule execution
1366 count_unspecific
= 0
1368 # Loop through all rule packages and with through their rules
1369 for rule_pack
in rule_packs
:
1370 if rule_pack
["disabled"]:
1371 count_disabled
+= len(rule_pack
["rules"])
1374 for rule
in rule_pack
["rules"]:
1375 if rule
.get("disabled"):
1379 rule
= rule
.copy() # keep original intact because of slave replication
1381 # Store information about rule pack right within the rule. This is needed
1382 # for debug output and also for skipping rule packs
1383 rule
["pack"] = rule_pack
["id"]
1384 self
._rules
.append(rule
)
1385 self
._rule
_by
_id
[rule
["id"]] = rule
1388 "match", "match_ok", "match_host", "match_application",
1389 "cancel_application"
1392 value
= self
._compile
_matching
_value
(key
, rule
[key
])
1399 if 'state' in rule
and isinstance(rule
['state'], tuple) \
1400 and rule
['state'][0] == 'text_pattern':
1401 for key
in ['2', '1', '0']:
1402 if key
in rule
['state'][1]:
1403 value
= self
._compile
_matching
_value
(
1404 'state', rule
['state'][1][key
])
1406 del rule
['state'][1][key
]
1408 rule
['state'][1][key
] = value
1410 except Exception as e
:
1411 if self
.settings
.options
.debug
:
1413 rule
["disabled"] = True
1415 self
._logger
.exception(
1416 "Ignoring rule '%s/%s' because of an invalid regex (%s)." %
1417 (rule
["pack"], rule
["id"], e
))
1419 if self
._config
["rule_optimizer"]:
1420 self
.hash_rule(rule
)
1421 if "match_facility" not in rule \
1422 and "match_priority" not in rule \
1423 and "cancel_priority" not in rule \
1424 and "cancel_application" not in rule
:
1425 count_unspecific
+= 1
1428 "Compiled %d active rules (ignoring %d disabled rules)" % (count_rules
, count_disabled
))
1429 if self
._config
["rule_optimizer"]:
1430 self
._logger
.info("Rule hash: %d rules - %d hashed, %d unspecific" % (len(
1431 self
._rules
), len(self
._rules
) - count_unspecific
, count_unspecific
))
1432 for facility
in range(23) + [31]:
1433 if facility
in self
._rule
_hash
:
1435 for prio
, entries
in self
._rule
_hash
[facility
].iteritems():
1436 stats
.append("%s(%d)" % (SyslogPriority(prio
), len(entries
)))
1437 self
._logger
.info(" %-12s: %s" % (SyslogFacility(facility
), " ".join(stats
)))
1440 def _compile_matching_value(key
, val
):
1442 # Remove leading .* from regex. This is redundant and
1443 # dramatically destroys performance when doing an infix search.
1444 if key
in ["match", "match_ok"]:
1445 while value
.startswith(".*") and not value
.startswith(".*?"):
1451 if cmk
.utils
.regex
.is_regex(value
):
1452 return re
.compile(value
, re
.IGNORECASE
)
1455 def hash_rule(self
, rule
):
1456 # Construct rule hash for faster execution.
1457 facility
= rule
.get("match_facility")
1458 if facility
and not rule
.get("invert_matching"):
1459 self
.hash_rule_facility(rule
, facility
)
1461 for facility
in xrange(32): # all syslog facilities
1462 self
.hash_rule_facility(rule
, facility
)
1464 def hash_rule_facility(self
, rule
, facility
):
1465 needed_prios
= [False] * 8
1466 for key
in ["match_priority", "cancel_priority"]:
1468 prio_from
, prio_to
= rule
[key
]
1469 # Beware: from > to!
1470 for p
in xrange(prio_to
, prio_from
+ 1):
1471 needed_prios
[p
] = True
1472 elif key
== "match_priority": # all priorities match
1473 needed_prios
= [True] * 8 # needed to check this rule for all event priorities
1474 elif "match_ok" in rule
: # a cancelling rule where all priorities cancel
1475 needed_prios
= [True] * 8 # needed to check this rule for all event priorities
1477 if rule
.get("invert_matching"):
1478 needed_prios
= [True] * 8
1480 prio_hash
= self
._rule
_hash
.setdefault(facility
, {})
1481 for prio
, need
in enumerate(needed_prios
):
1483 prio_hash
.setdefault(prio
, []).append(rule
)
1485 def output_hash_stats(self
):
1486 self
._logger
.info("Top 20 of facility/priority:")
1489 for facility
in xrange(32):
1490 for priority
in xrange(8):
1491 count
= self
._hash
_stats
[facility
][priority
]
1493 total_count
+= count
1494 entries
.append((count
, (facility
, priority
)))
1497 for count
, (facility
, priority
) in entries
[:20]:
1498 self
._logger
.info(" %s/%s - %d (%.2f%%)" % (SyslogFacility(facility
),
1499 SyslogPriority(priority
), count
,
1500 (100.0 * count
/ float(total_count
))))
1502 def process_line(self
, line
, address
):
1503 line
= line
.rstrip()
1504 if self
._config
["debug_rules"]:
1506 self
._logger
.info(u
"Processing message from %r: '%s'" % (address
, line
))
1508 self
._logger
.info(u
"Processing message '%s'" % line
)
1510 event
= self
.create_event_from_line(line
, address
)
1511 self
.process_event(event
)
1513 def process_event(self
, event
):
1514 self
.do_translate_hostname(event
)
1516 # Log all incoming messages into a syslog-like text file if that is enabled
1517 if self
._config
["log_messages"]:
1518 self
.log_message(event
)
1521 if self
._config
["rule_optimizer"]:
1522 self
._hash
_stats
[event
["facility"]][event
["priority"]] += 1
1523 rule_candidates
= self
._rule
_hash
.get(event
["facility"], {}).get(event
["priority"], [])
1525 rule_candidates
= self
._rules
1528 for rule
in rule_candidates
:
1529 if skip_pack
and rule
["pack"] == skip_pack
:
1530 continue # still in the rule pack that we want to skip
1531 skip_pack
= None # new pack, reset skipping
1534 result
= self
.event_rule_matches(rule
, event
)
1535 except Exception as e
:
1536 self
._logger
.exception(' Exception during matching:\n%s' % e
)
1539 if result
: # A tuple with (True/False, {match_info}).. O.o
1540 self
._perfcounters
.count("rule_hits")
1541 cancelling
, match_groups
= result
1543 if self
._config
["debug_rules"]:
1544 self
._logger
.info(" matching groups:\n%s" % pprint
.pformat(match_groups
))
1546 self
._event
_status
.count_rule_match(rule
["id"])
1547 if self
._config
["log_rulehits"]:
1548 self
._logger
.info("Rule '%s/%s' hit by message %s/%s - '%s'." %
1549 (rule
["pack"], rule
["id"], SyslogFacility(event
["facility"]),
1550 SyslogPriority(event
["priority"]), event
["text"]))
1552 if rule
.get("drop"):
1553 if rule
["drop"] == "skip_pack":
1554 skip_pack
= rule
["pack"]
1555 if self
._config
["debug_rules"]:
1556 self
._logger
.info(" skipping this rule pack (%s)" % skip_pack
)
1559 self
._perfcounters
.count("drops")
1563 self
._event
_status
.cancel_events(self
, self
._event
_columns
, event
, match_groups
,
1567 # Remember the rule id that this event originated from
1568 event
["rule_id"] = rule
["id"]
1570 # Lookup the monitoring core hosts and add the core host
1571 # name to the event when one can be matched
1572 # For the moment we have no rule/condition matching on this
1573 # field. So we only add the core host info for matched events.
1574 self
._add
_core
_host
_to
_new
_event
(event
)
1576 # Attach optional contact group information for visibility
1577 # and eventually for notifications
1578 self
._add
_rule
_contact
_groups
_to
_event
(rule
, event
)
1580 # Store groups from matching this event. In order to make
1581 # persistence easier, we do not safe them as list but join
1583 event
["match_groups"] = match_groups
.get("match_groups_message", ())
1584 event
["match_groups_syslog_application"] = match_groups
.get(
1585 "match_groups_syslog_application", ())
1586 self
.rewrite_event(rule
, event
, match_groups
)
1589 count
= rule
["count"]
1590 # Check if a matching event already exists that we need to
1591 # count up. If the count reaches the limit, the event will
1592 # be opened and its rule actions performed.
1594 self
._event
_status
.count_event(self
, event
, rule
, count
)
1597 if self
._config
["debug_rules"]:
1598 self
._logger
.info("Event opening will be delayed for %d seconds"
1600 existing_event
["delay_until"] = time
.time() + rule
["delay"]
1601 existing_event
["phase"] = "delayed"
1603 cmk
.ec
.actions
.event_has_opened(
1604 self
._history
, self
.settings
, self
._config
, self
._logger
, self
,
1605 self
._event
_columns
, rule
, existing_event
)
1607 self
._history
.add(existing_event
, "COUNTREACHED")
1609 if "delay" not in rule
and rule
.get("autodelete"):
1610 existing_event
["phase"] = "closed"
1611 self
._history
.add(existing_event
, "AUTODELETE")
1612 with self
._event
_status
.lock
:
1613 self
._event
_status
.remove_event(existing_event
)
1614 elif "expect" in rule
:
1615 self
._event
_status
.count_expected_event(self
, event
)
1618 if self
._config
["debug_rules"]:
1620 "Event opening will be delayed for %d seconds" % rule
["delay"])
1621 event
["delay_until"] = time
.time() + rule
["delay"]
1622 event
["phase"] = "delayed"
1624 event
["phase"] = "open"
1626 if self
.new_event_respecting_limits(event
):
1627 if event
["phase"] == "open":
1628 cmk
.ec
.actions
.event_has_opened(self
._history
, self
.settings
,
1629 self
._config
, self
._logger
, self
,
1630 self
._event
_columns
, rule
, event
)
1631 if rule
.get("autodelete"):
1632 event
["phase"] = "closed"
1633 self
._history
.add(event
, "AUTODELETE")
1634 with self
._event
_status
.lock
:
1635 self
._event
_status
.remove_event(event
)
1638 # End of loop over rules.
1639 if self
._config
["archive_orphans"]:
1640 self
._event
_status
.archive_event(event
)
1642 def _add_rule_contact_groups_to_event(self
, rule
, event
):
1643 if rule
.get("contact_groups") is None:
1645 "contact_groups": None,
1646 "contact_groups_notify": False,
1647 "contact_groups_precedence": "host",
1651 "contact_groups": rule
["contact_groups"]["groups"],
1652 "contact_groups_notify": rule
["contact_groups"]["notify"],
1653 "contact_groups_precedence": rule
["contact_groups"]["precedence"],
1656 def add_core_host_to_event(self
, event
):
1657 matched_host
= self
.host_config
.get_by_event_host_name(event
["host"])
1658 if not matched_host
:
1659 event
["core_host"] = ""
1662 event
["core_host"] = matched_host
["name"]
1664 def _add_core_host_to_new_event(self
, event
):
1665 self
.add_core_host_to_event(event
)
1667 # Add some state dependent information (like host is in downtime etc.)
1668 event
["host_in_downtime"] = self
._is
_host
_in
_downtime
(event
)
1670 def _is_host_in_downtime(self
, event
):
1671 if not event
["core_host"]:
1672 return False # Found no host in core: Not in downtime!
1674 query
= ("GET hosts\n"
1675 "Columns: scheduled_downtime_depth\n"
1676 "Filter: host_name = %s\n" % (event
["core_host"]))
1679 return livestatus
.LocalConnection().query_value(query
) >= 1
1681 except livestatus
.MKLivestatusNotFoundError
:
1685 if cmk
.utils
.debug
.enabled():
1689 # Checks if an event matches a rule. Returns either False (no match)
1690 # or a pair of matchtype, groups, where matchtype is False for a
1691 # normal match and True for a cancelling match and the groups is a tuple
1692 # if matched regex groups in either text (normal) or match_ok (cancelling)
1694 def event_rule_matches(self
, rule
, event
):
1695 self
._perfcounters
.count("rule_tries")
1696 with self
._lock
_configuration
:
1697 result
= self
._rule
_matcher
.event_rule_matches_non_inverted(rule
, event
)
1698 if rule
.get("invert_matching"):
1701 if self
._config
["debug_rules"]:
1703 " Rule would not match, but due to inverted matching does.")
1706 if self
._config
["debug_rules"]:
1708 " Rule would match, but due to inverted matching does not.")
1712 # Rewrite texts and compute other fields in the event
1713 def rewrite_event(self
, rule
, event
, groups
):
1714 if rule
["state"] == -1:
1715 prio
= event
["priority"]
1722 elif isinstance(rule
["state"], tuple) and rule
["state"][0] == "text_pattern":
1723 for key
in ['2', '1', '0', '3']:
1724 if key
in rule
["state"][1]:
1725 match_groups
= match(rule
["state"][1][key
], event
["text"], complete
=False)
1726 if match_groups
is not False:
1727 event
["state"] = int(key
)
1729 elif key
== '3': # No rule matched!
1732 event
["state"] = rule
["state"]
1734 if ("sl" not in event
) or (rule
["sl"]["precedence"] == "rule"):
1735 event
["sl"] = rule
["sl"]["value"]
1736 event
["first"] = event
["time"]
1737 event
["last"] = event
["time"]
1738 if "set_comment" in rule
:
1739 event
["comment"] = replace_groups(rule
["set_comment"], event
["text"], groups
)
1740 if "set_text" in rule
:
1741 event
["text"] = replace_groups(rule
["set_text"], event
["text"], groups
)
1742 if "set_host" in rule
:
1743 event
["orig_host"] = event
["host"]
1744 event
["host"] = replace_groups(rule
["set_host"], event
["host"], groups
)
1745 if "set_application" in rule
:
1746 event
["application"] = replace_groups(rule
["set_application"], event
["application"],
1748 if "set_contact" in rule
and "contact" not in event
:
1749 event
["contact"] = replace_groups(rule
["set_contact"], event
.get("contact", ""), groups
)
1751 def parse_syslog_info(self
, line
):
1753 # Replaced ":" by ": " here to make tags with ":" possible. This
1754 # is needed to process logs generated by windows agent logfiles
1755 # like "c://test.log".
1756 tag
, message
= line
.split(": ", 1)
1757 event
["text"] = message
.strip()
1760 app
, pid
= tag
.split('[', 1)
1761 pid
= pid
.rstrip(']')
1766 event
["application"] = app
1770 def parse_rfc5424_syslog_info(self
, line
):
1773 (_unused_version
, timestamp
, hostname
, app_name
, procid
, _unused_msgid
, rest
) = line
.split(
1776 # There is no 3339 parsing built into python. We do ignore subseconds and timezones
1777 # here. This seems to be ok for the moment - sorry. Please drop a note if you
1778 # got a good solution for this.
1779 event
['time'] = time
.mktime(time
.strptime(timestamp
[:19], '%Y-%m-%dT%H:%M:%S'))
1782 event
["host"] = hostname
1785 event
["application"] = app_name
1788 event
["pid"] = procid
1791 # has structured data
1792 structured_data
, message
= rest
[1:].split("] ", 1)
1793 elif rest
.startswith("- "):
1794 # has no structured data
1795 structured_data
, message
= rest
.split(" ", 1)
1797 raise Exception("Invalid RFC 5424 syslog message")
1799 if structured_data
!= "-":
1800 event
["text"] = "[%s] %s" % (structured_data
, message
)
1802 event
["text"] = message
1806 def parse_monitoring_info(self
, line
):
1808 # line starts with '@'
1810 timestamp_str
, sl
, contact
, rest
= line
[1:].split(';', 3)
1811 host
, rest
= rest
.split(None, 1)
1813 event
["sl"] = int(sl
)
1815 event
["contact"] = contact
1817 timestamp_str
, host
, rest
= line
[1:].split(None, 2)
1819 event
["time"] = float(int(timestamp_str
))
1820 service
, message
= rest
.split(": ", 1)
1821 event
["application"] = service
1822 event
["text"] = message
.strip()
1823 event
["host"] = host
1826 # Translate a hostname if this is configured. We are
1827 # *really* sorry: this code snipped is copied from modules/check_mk_base.py.
1828 # There is still no common library. Please keep this in sync with the
1830 def translate_hostname(self
, backedhost
):
1831 translation
= self
._config
["hostname_translation"]
1833 # Here comes the original code from modules/check_mk_base.py
1835 # 1. Case conversion
1836 caseconf
= translation
.get("case")
1837 if caseconf
== "upper":
1838 backedhost
= backedhost
.upper()
1839 elif caseconf
== "lower":
1840 backedhost
= backedhost
.lower()
1842 # 2. Drop domain part (not applied to IP addresses!)
1843 if translation
.get("drop_domain") and backedhost
:
1844 # only apply if first part does not convert successfully into an int
1845 firstpart
= backedhost
.split(".", 1)[0]
1849 backedhost
= firstpart
1851 # 3. Regular expression conversion
1852 if "regex" in translation
:
1853 regex
, subst
= translation
.get("regex")
1854 if not regex
.endswith('$'):
1856 rcomp
= cmk
.utils
.regex
.regex(regex
)
1857 mo
= rcomp
.match(backedhost
)
1860 for nr
, text
in enumerate(mo
.groups()):
1861 backedhost
= backedhost
.replace("\\%d" % (nr
+ 1), text
)
1863 # 4. Explicit mapping
1864 for from_host
, to_host
in translation
.get("mapping", []):
1865 if from_host
== backedhost
:
1866 backedhost
= to_host
1871 def do_translate_hostname(self
, event
):
1873 event
["host"] = self
.translate_hostname(event
["host"])
1874 except Exception as e
:
1875 if self
._config
["debug_rules"]:
1876 self
._logger
.exception('Unable to parse host "%s" (%s)' % (event
.get("host"), e
))
1879 def create_event_from_line(self
, line
, address
):
1881 # address is either None or a tuple of (ipaddress, port)
1882 "ipaddress": address
and address
[0] or "",
1884 "host_in_downtime": False,
1887 # Variant 1: plain syslog message without priority/facility:
1888 # May 26 13:45:01 Klapprechner CRON[8046]: message....
1890 # Variant 2: syslog message including facility (RFC 3164)
1891 # <78>May 26 13:45:01 Klapprechner CRON[8046]: message....
1893 # Variant 3: local Nagios alert posted by mkevent -n
1894 # <154>@1341847712;5;Contact Info; MyHost My Service: CRIT - This che
1896 # Variant 4: remote Nagios alert posted by mkevent -n -> syslog
1897 # <154>Jul 9 17:28:32 Klapprechner @1341847712;5;Contact Info; MyHost My Service: CRIT - This che
1899 # Variant 5: syslog message
1900 # Timestamp is RFC3339 with additional restrictions:
1901 # - The "T" and "Z" characters in this syntax MUST be upper case.
1902 # - Usage of the "T" character is REQUIRED.
1903 # - Leap seconds MUST NOT be used.
1904 # <166>2013-04-05T13:49:31.685Z esx Vpxa: message....
1906 # Variant 6: syslog message without date / host:
1907 # <5>SYSTEM_INFO: [WLAN-1] Triggering Background Scan
1909 # Variant 7: logwatch.ec event forwarding
1910 # <78>@1341847712 Klapprechner /var/log/syslog: message....
1912 # Variant 7a: Event simulation
1913 # <%PRI%>@%TIMESTAMP%;%SL% %HOSTNAME% %syslogtag%: %msg%
1915 # Variant 8: syslog message from sophos firewall
1916 # <84>2015:03:25-12:02:06 gw pluto[7122]: listening for IKE messages
1918 # Variant 9: syslog message (RFC 5424)
1919 # <134>1 2016-06-02T12:49:05.181+02:00 chrissw7 ChrisApp - TestID - coming from java code
1922 # 2016 May 26 15:41:47 IST XYZ Ebra: %LINEPROTO-5-UPDOWN: Line protocol on Interface Ethernet45 (XXX.ASAD.Et45), changed state to up
1923 # year month day hh:mm:ss timezone HOSTNAME KeyAgent:
1925 # FIXME: Would be better to parse the syslog messages in another way:
1926 # Split the message by the first ":", then split the syslog header part
1927 # and detect which information are present. Take a look at the syslog RFCs
1930 # Variant 2,3,4,5,6,7,8
1931 if line
.startswith('<'):
1933 prio
= int(line
[1:i
])
1935 event
["facility"] = prio
>> 3
1936 event
["priority"] = prio
& 7
1940 event
["facility"] = 1 # user
1941 event
["priority"] = 5 # notice
1944 if line
[0] == '@' and line
[11] in [' ', ';']:
1945 details
, event
['host'], line
= line
.split(' ', 2)
1946 detail_tokens
= details
.split(';')
1947 timestamp
= detail_tokens
[0]
1948 if len(detail_tokens
) > 1:
1949 event
["sl"] = int(detail_tokens
[1])
1950 event
['time'] = float(timestamp
[1:])
1951 event
.update(self
.parse_syslog_info(line
))
1954 elif line
.startswith("@"):
1955 event
.update(self
.parse_monitoring_info(line
))
1958 elif len(line
) > 24 and line
[10] == 'T':
1959 # There is no 3339 parsing built into python. We do ignore subseconds and timezones
1960 # here. This seems to be ok for the moment - sorry. Please drop a note if you
1961 # got a good solution for this.
1962 rfc3339_part
, event
['host'], line
= line
.split(' ', 2)
1963 event
['time'] = time
.mktime(time
.strptime(rfc3339_part
[:19], '%Y-%m-%dT%H:%M:%S'))
1964 event
.update(self
.parse_syslog_info(line
))
1967 elif len(line
) > 24 and line
[12] == "T":
1968 event
.update(self
.parse_rfc5424_syslog_info(line
))
1971 elif line
[10] == '-' and line
[19] == ' ':
1972 event
['host'] = line
.split(' ')[1]
1973 event
['time'] = time
.mktime(time
.strptime(line
.split(' ')[0], '%Y:%m:%d-%H:%M:%S'))
1974 rest
= " ".join(line
.split(' ')[2:])
1975 event
.update(self
.parse_syslog_info(rest
))
1978 elif len(line
.split(': ', 1)[0].split(' ')) == 1:
1979 event
.update(self
.parse_syslog_info(line
))
1980 # There is no datetime information in the message, use current time
1981 event
['time'] = time
.time()
1982 # There is no host information, use the provided address
1983 if address
and isinstance(address
, tuple):
1984 event
["host"] = address
[0]
1987 elif line
[4] == " " and line
[:4].isdigit():
1988 time_part
= line
[:20] # ignoring tz info
1989 event
["host"], application
, line
= line
[25:].split(" ", 2)
1990 event
["application"] = application
.rstrip(":")
1991 event
["text"] = line
1992 event
['time'] = time
.mktime(time
.strptime(time_part
, '%Y %b %d %H:%M:%S'))
1996 month_name
, day
, timeofday
, host
, rest
= line
.split(None, 4)
1997 event
["host"] = host
2000 if rest
.startswith("@"):
2001 event
.update(self
.parse_monitoring_info(rest
))
2005 event
.update(self
.parse_syslog_info(rest
))
2007 month
= EventServer
.month_names
[month_name
]
2010 # Nasty: the year is not contained in the message. We cannot simply
2011 # assume that the message if from the current year.
2012 lt
= time
.localtime()
2013 if lt
.tm_mon
< 6 and month
> 6: # Assume that message is from last year
2014 year
= lt
.tm_year
- 1
2016 year
= lt
.tm_year
# Assume the current year
2018 hours
, minutes
, seconds
= map(int, timeofday
.split(":"))
2020 # A further problem here: we do not now whether the message is in DST or not
2021 event
["time"] = time
.mktime((year
, month
, day
, hours
, minutes
, seconds
, 0, 0,
2024 # The event simulator ships the simulated original IP address in the
2025 # hostname field, separated with a pipe, e.g. "myhost|1.2.3.4"
2026 if "|" in event
["host"]:
2027 event
["host"], event
["ipaddress"] = event
["host"].split("|", 1)
2029 except Exception as e
:
2030 if self
._config
["debug_rules"]:
2031 self
._logger
.exception('Got non-syslog message "%s" (%s)' % (line
, e
))
2037 "ipaddress": address
and address
[0] or "",
2040 "time": time
.time(),
2042 "host_in_downtime": False,
2045 if self
._config
["debug_rules"]:
2046 self
._logger
.info('Parsed message:\n' + ("".join(
2047 [" %-15s %s\n" % (k
+ ":", v
) for (k
, v
) in sorted(event
.iteritems())])).rstrip())
2051 def log_message(self
, event
):
2053 with cmk
.ec
.history
.get_logfile(self
._config
, self
.settings
.paths
.messages_dir
.value
,
2054 self
._message
_period
).open(mode
='ab') as f
:
2055 f
.write("%s %s %s%s: %s\n" % (time
.strftime("%b %d %H:%M:%S",
2056 time
.localtime(event
["time"])),
2057 event
["host"], event
["application"], event
["pid"] and
2058 ("[%s]" % event
["pid"]) or "", event
["text"]))
2060 if self
.settings
.options
.debug
:
2062 # Better silently ignore errors. We could have run out of
2063 # diskspace and make things worse by logging that we could
2066 def get_hosts_with_active_event_limit(self
):
2068 for hostname
, num_existing_events
in self
._event
_status
.num_existing_events_by_host
.iteritems(
2070 if num_existing_events
>= self
._config
["event_limit"]["by_host"]["limit"]:
2071 hosts
.append(hostname
)
2074 def get_rules_with_active_event_limit(self
):
2076 for rule_id
, num_existing_events
in self
._event
_status
.num_existing_events_by_rule
.iteritems(
2079 continue # Ignore rule unrelated overflow events. They have no rule id associated.
2080 if num_existing_events
>= self
._config
["event_limit"]["by_rule"]["limit"]:
2081 rule_ids
.append(rule_id
)
2084 def is_overall_event_limit_active(self
):
2085 return self
._event
_status
.num_existing_events \
2086 >= self
._config
["event_limit"]["overall"]["limit"]
2088 # protected by self._event_status.lock
2089 def new_event_respecting_limits(self
, event
):
2090 self
._logger
.verbose(
2091 "Checking limit for message from %s (rule '%s')" % (event
["host"], event
["rule_id"]))
2093 with self
._event
_status
.lock
:
2094 if self
._handle
_event
_limit
("overall", event
):
2097 if self
._handle
_event
_limit
("by_host", event
):
2100 if self
._handle
_event
_limit
("by_rule", event
):
2103 self
._event
_status
.new_event(event
)
2106 # The following actions can be configured:
2107 # stop Stop creating new events
2108 # stop_overflow Stop creating new events, create overflow event
2109 # stop_overflow_notify Stop creating new events, create overflow event, notfy
2110 # delete_oldest Delete oldest event, create new event
2111 # protected by self._event_status.lock
2113 # Returns False if the event has been created and actions should be
2114 # performed on that event
2115 def _handle_event_limit(self
, ty
, event
):
2116 assert ty
in ["overall", "by_rule", "by_host"]
2118 num_already_open
= self
._event
_status
.get_num_existing_events_by(ty
, event
)
2119 limit
, action
= self
._get
_event
_limit
(ty
, event
)
2120 self
._logger
.verbose(
2121 " Type: %s, already open events: %d, Limit: %d" % (ty
, num_already_open
, limit
))
2123 # Limit not reached: add new event
2124 if num_already_open
< limit
:
2125 num_already_open
+= 1 # after adding this event
2127 # Limit even then still not reached: we are fine
2128 if num_already_open
< limit
:
2131 # Delete oldest messages if that is the configure method of keeping the limit
2132 if action
== "delete_oldest":
2133 while num_already_open
> limit
:
2134 self
._perfcounters
.count("overflows")
2135 self
._event
_status
.remove_oldest_event(ty
, event
)
2136 num_already_open
-= 1
2139 # Limit reached already in the past: Simply drop silently
2140 if num_already_open
> limit
:
2141 # Just log in verbose mode! Otherwise log file will be flooded
2142 self
._logger
.verbose(" Skip processing because limit is already in effect")
2143 self
._perfcounters
.count("overflows")
2144 return True # Prevent creation and prevent one time actions (below)
2146 self
._logger
.info(" The %s limit has been reached" % ty
)
2148 # This is the event which reached the limit, allow creation of it. Further
2149 # events will be stopped.
2151 # Perform one time actions
2152 overflow_event
= self
._create
_overflow
_event
(ty
, event
, limit
)
2154 if "overflow" in action
:
2155 self
._logger
.info(" Creating overflow event")
2156 self
._event
_status
.new_event(overflow_event
)
2158 if "notify" in action
:
2159 self
._logger
.info(" Creating overflow notification")
2160 cmk
.ec
.actions
.do_notify(self
, self
._logger
, overflow_event
)
2164 # protected by self._event_status.lock
2165 def _get_event_limit(self
, ty
, event
):
2166 # Prefer the rule individual limit for by_rule limit (in case there is some)
2168 rule_limit
= self
._rule
_by
_id
[event
["rule_id"]].get("event_limit")
2170 return rule_limit
["limit"], rule_limit
["action"]
2172 # Prefer the host individual limit for by_host limit (in case there is some)
2174 host_config
= self
.host_config
.get(event
["core_host"], {})
2175 host_limit
= host_config
.get("custom_variables", {}).get("EC_EVENT_LIMIT")
2177 limit
, action
= host_limit
.split(":", 1)
2178 return int(limit
), action
2180 limit
= self
._config
["event_limit"][ty
]["limit"]
2181 action
= self
._config
["event_limit"][ty
]["action"]
2183 return limit
, action
2185 def _create_overflow_event(self
, ty
, event
, limit
):
2197 "application": "Event Console",
2199 "priority": 2, # crit
2200 "facility": 1, # user
2202 "match_groups_syslog_application": (),
2206 "host_in_downtime": False,
2208 self
._add
_rule
_contact
_groups
_to
_event
({}, new_event
)
2211 new_event
["text"] = ("The overall event limit of %d open events has been reached. Not "
2212 "opening any additional event until open events have been "
2213 "archived." % limit
)
2215 elif ty
== "by_host":
2217 "host": event
["host"],
2218 "ipaddress": event
["ipaddress"],
2219 "text": ("The host event limit of %d open events has been reached for host \"%s\". "
2220 "Not opening any additional event for this host until open events have "
2221 "been archived." % (limit
, event
["host"]))
2224 # Lookup the monitoring core hosts and add the core host
2225 # name to the event when one can be matched
2226 self
._add
_core
_host
_to
_new
_event
(new_event
)
2228 elif ty
== "by_rule":
2230 "rule_id": event
["rule_id"],
2231 "contact_groups": event
["contact_groups"],
2232 "contact_groups_notify": event
.get("contact_groups_notify", False),
2233 "contact_groups_precedence": event
.get("contact_groups_precedence", "host"),
2234 "text": ("The rule event limit of %d open events has been reached for rule \"%s\". "
2235 "Not opening any additional event for this rule until open events have "
2236 "been archived." % (limit
, event
["rule_id"]))
2240 raise NotImplementedError()
class RuleMatcher(object):
    """Decides whether an incoming event matches an Event Console rule,
    either as a new event or as an event canceling an open one."""

    def __init__(self, logger, config):
        super(RuleMatcher, self).__init__()
        self._logger = logger
        self._config = config
        self._time_periods = TimePeriods(logger)

    @property
    def _debug_rules(self):
        return self._config["debug_rules"]

    def event_rule_matches_non_inverted(self, rule, event):
        """Match *event* against *rule* (ignoring rule inversion).

        Returns False on no match, or a tuple (cancelling, match_groups)
        on a positive/cancelling match.
        """
        if self._debug_rules:
            self._logger.info("Trying rule %s/%s..." % (rule["pack"], rule["id"]))
            self._logger.info(" Text: %s" % event["text"])
            self._logger.info(" Syslog: %d.%d" % (event["facility"], event["priority"]))
            self._logger.info(" Host: %s" % event["host"])

        # Generic conditions without positive/canceling matches
        if not self.event_rule_matches_generic(rule, event):
            return False

        # Determine syslog priority
        match_priority = {}
        if not self.event_rule_determine_match_priority(rule, event, match_priority):
            # Abort on negative outcome, neither positive nor negative
            return False

        # Determine and cleanup match_groups
        match_groups = {}
        if not self.event_rule_determine_match_groups(rule, event, match_groups):
            # Abort on negative outcome, neither positive nor negative
            return False

        return self._check_match_outcome(rule, match_groups, match_priority)

    def _check_match_outcome(self, rule, match_groups, match_priority):
        # type: (Dict[str, Any], Dict[str, Any], Dict[str, Any]) -> Union[bool, Tuple[bool, Dict[str, Any]]]
        """Decide or not a event is created, canceled or nothing is done"""

        # Check canceling-event
        has_canceling_condition = bool(
            [x for x in ["match_ok", "cancel_application", "cancel_priority"] if x in rule])
        if has_canceling_condition:
            if ("match_ok" not in rule or match_groups.get("match_groups_message_ok", False) is not False) and\
               ("cancel_application" not in rule or
                match_groups.get("match_groups_syslog_application_ok", False) is not False) and\
               ("cancel_priority" not in rule or match_priority["has_canceling_match"] is True):
                if self._debug_rules:
                    self._logger.info(" found canceling event")
                return True, match_groups

        # Check create-event
        if match_groups["match_groups_message"] is not False and\
           match_groups.get("match_groups_syslog_application", ()) is not False and\
           match_priority["has_match"] is True:
            if self._debug_rules:
                self._logger.info(" found new event")
            return False, match_groups

        # Looks like there was no match, output some additonal info
        # Reasons preventing create-event
        if self._debug_rules:
            if match_groups["match_groups_message"] is False:
                self._logger.info(" did not create event, because of wrong message")
            if "match_application" in rule and match_groups[
                    "match_groups_syslog_application"] is False:
                self._logger.info(" did not create event, because of wrong syslog application")
            if "match_priority" in rule and match_priority["has_match"] is False:
                self._logger.info(" did not create event, because of wrong syslog priority")

            if has_canceling_condition:
                # Reasons preventing cancel-event
                if "match_ok" in rule and match_groups.get("match_groups_message_ok",
                                                           False) is False:
                    self._logger.info(" did not cancel event, because of wrong message")
                if "cancel_application" in rule and \
                   match_groups.get("match_groups_syslog_application_ok", False) is False:
                    self._logger.info(" did not cancel event, because of wrong syslog application")
                if "cancel_priority" in rule and match_priority["has_canceling_match"] is False:
                    self._logger.info(" did not cancel event, because of wrong cancel priority")

        return False

    def event_rule_matches_generic(self, rule, event):
        # All generic conditions must hold; they have no canceling variant.
        generic_match_functions = [
            self.event_rule_matches_site,
            self.event_rule_matches_host,
            self.event_rule_matches_ip,
            self.event_rule_matches_facility,
            self.event_rule_matches_service_level,
            self.event_rule_matches_timeperiod,
        ]

        for match_function in generic_match_functions:
            if not match_function(rule, event):
                return False
        return True

    def event_rule_determine_match_priority(self, rule, event, match_priority):
        """Fill match_priority["has_match"/"has_canceling_match"] from the
        rule's (possibly reversed) priority ranges. Returns False when
        neither a positive nor a canceling match is possible."""
        p = event["priority"]

        if "match_priority" in rule:
            prio_from, prio_to = rule["match_priority"]
            if prio_from > prio_to:  # swap, the user can configure both ways
                prio_to, prio_from = prio_from, prio_to
            match_priority["has_match"] = prio_from <= p <= prio_to
        else:
            match_priority["has_match"] = True

        if "cancel_priority" in rule:
            cancel_from, cancel_to = rule["cancel_priority"]
            match_priority["has_canceling_match"] = cancel_from <= p <= cancel_to
        else:
            match_priority["has_canceling_match"] = False

        if match_priority["has_match"] is False and\
           match_priority["has_canceling_match"] is False:
            return False
        return True

    def event_rule_matches_site(self, rule, event):
        return "match_site" not in rule or cmk.omd_site() in rule["match_site"]

    def event_rule_matches_host(self, rule, event):
        if match(rule.get("match_host"), event["host"], complete=True) is False:
            if self._debug_rules:
                self._logger.info(" did not match because of wrong host '%s' (need '%s')" %
                                  (event["host"], format_pattern(rule.get("match_host"))))
            return False
        return True

    def event_rule_matches_ip(self, rule, event):
        if not match_ipv4_network(rule.get("match_ipaddress", "0.0.0.0/0"), event["ipaddress"]):
            if self._debug_rules:
                self._logger.info(
                    " did not match because of wrong source IP address '%s' (need '%s')" %
                    (event["ipaddress"], rule.get("match_ipaddress")))
            return False
        return True

    def event_rule_matches_facility(self, rule, event):
        if "match_facility" in rule and event["facility"] != rule["match_facility"]:
            if self._debug_rules:
                self._logger.info(" did not match because of wrong syslog facility")
            return False
        return True

    def event_rule_matches_service_level(self, rule, event):
        if "match_sl" in rule:
            sl_from, sl_to = rule["match_sl"]
            if sl_from > sl_to:  # swap, the user can configure both ways
                sl_to, sl_from = sl_from, sl_to
            p = event.get("sl", 0)
            if p < sl_from or p > sl_to:
                if self._debug_rules:
                    self._logger.info(
                        " did not match because of wrong service level %d (need %d..%d)" %
                        (p, sl_from, sl_to),)
                return False
        return True

    def event_rule_matches_timeperiod(self, rule, event):
        if "match_timeperiod" in rule and not self._time_periods.check(rule["match_timeperiod"]):
            if self._debug_rules:
                self._logger.info(" did not match, because timeperiod %s is not active" %
                                  rule["match_timeperiod"])
            return False
        return True

    def event_rule_determine_match_groups(self, rule, event, match_groups):
        match_group_functions = [
            self.event_rule_matches_syslog_application,
            self.event_rule_matches_message,
        ]
        for match_function in match_group_functions:
            if not match_function(rule, event, match_groups):
                return False
        return True

    def event_rule_matches_syslog_application(self, rule, event, match_groups):
        if "match_application" not in rule and "cancel_application" not in rule:
            return True

        # Syslog application
        if "match_application" in rule:
            match_groups["match_groups_syslog_application"] = match(
                rule.get("match_application"), event["application"], complete=False)

        # Syslog application canceling, this option must be explictly set
        if "cancel_application" in rule:
            match_groups["match_groups_syslog_application_ok"] = match(
                rule.get("cancel_application"), event["application"], complete=False)

        # Detect impossible match
        if match_groups.get("match_groups_syslog_application", False) is False and\
           match_groups.get("match_groups_syslog_application_ok", False) is False:
            if self._debug_rules:
                self._logger.info(" did not match, syslog application does not match")
            return False
        return True

    def event_rule_matches_message(self, rule, event, match_groups):
        # Message matching, this condition is always active
        match_groups["match_groups_message"] = match(
            rule.get("match"), event["text"], complete=False)

        # Message canceling, this option must be explictly set
        if "match_ok" in rule:
            match_groups["match_groups_message_ok"] = match(
                rule.get("match_ok"), event["text"], complete=False)

        # Detect impossible match
        if match_groups["match_groups_message"] is False and\
           match_groups.get("match_groups_message_ok", False) is False:
            if self._debug_rules:
                self._logger.info(" did not match, message text does not match")
            return False
        return True
2470 # .--Status Queries------------------------------------------------------.
2471 # | ____ _ _ ___ _ |
2472 # | / ___|| |_ __ _| |_ _ _ ___ / _ \ _ _ ___ _ __(_) ___ ___ |
2473 # | \___ \| __/ _` | __| | | / __| | | | | | | |/ _ \ '__| |/ _ \/ __| |
2474 # | ___) | || (_| | |_| |_| \__ \ | |_| | |_| | __/ | | | __/\__ \ |
2475 # | |____/ \__\__,_|\__|\__,_|___/ \__\_\\__,_|\___|_| |_|\___||___/ |
2477 # +----------------------------------------------------------------------+
2478 # | Parsing and processing of status queries |
2479 # '----------------------------------------------------------------------'
class Queries(object):
    """Iterator over the queries a client sends on one socket connection.

    Queries are separated by an empty line ("\n\n") in the request stream.
    """

    def __init__(self, status_server, sock, logger):
        super(Queries, self).__init__()
        self._status_server = status_server
        self._socket = sock
        self._logger = logger
        self._buffer = ""  # raw, not yet dispatched request data

    def __iter__(self):
        return self

    def next(self):
        # Read from the socket until a complete query (terminated by a
        # blank line) is buffered, or the peer closed the connection.
        while True:
            parts = self._buffer.split("\n\n", 1)
            if len(parts) > 1:
                break
            data = self._socket.recv(4096)
            if len(data) == 0:
                if len(self._buffer) == 0:
                    raise StopIteration()
                # connection closed: treat the remainder as the last query
                parts = [self._buffer, ""]
                break
            self._buffer += data
        request, self._buffer = parts
        return Query.make(self._status_server, request.decode("utf-8").splitlines(), self._logger)

    # Compatibility alias so the iterator also works under Python 3
    __next__ = next
class Query(object):
    """Base class of a single client query (GET/REPLICATE/COMMAND)."""

    @staticmethod
    def make(status_server, raw_query, logger):
        """Factory: build the concrete Query subclass for *raw_query*."""
        parts = raw_query[0].split(None, 1)
        if len(parts) != 2:
            raise MKClientError("Invalid query. Need GET/COMMAND plus argument(s)")
        method = parts[0]
        if method == "GET":
            return QueryGET(status_server, raw_query, logger)
        if method == "REPLICATE":
            return QueryREPLICATE(status_server, raw_query, logger)
        if method == "COMMAND":
            return QueryCOMMAND(status_server, raw_query, logger)
        raise MKClientError("Invalid method %s (allowed are GET, REPLICATE, COMMAND)" % method)

    def __init__(self, status_server, raw_query, logger):
        super(Query, self).__init__()
        self._logger = logger
        self.output_format = "python"  # default, GET may override via OutputFormat header
        self._raw_query = raw_query
        self._from_raw_query(status_server)

    def _from_raw_query(self, status_server):
        # subclasses extend this with their own parsing steps
        self._parse_method_and_args()

    def _parse_method_and_args(self):
        parts = self._raw_query[0].split(None, 1)
        if len(parts) != 2:
            raise MKClientError("Invalid query. Need GET/COMMAND plus argument(s)")
        self.method, self.method_arg = parts

    def __repr__(self):
        return repr("\n".join(self._raw_query))
class QueryGET(Query):
    """A livestatus-like GET query with filters, columns and limit."""

    # operator name -> predicate(column_value, filter_argument)
    _filter_operators = {
        "=": (lambda a, b: a == b),
        ">": (lambda a, b: a > b),
        "<": (lambda a, b: a < b),
        ">=": (lambda a, b: a >= b),
        "<=": (lambda a, b: a <= b),
        "~": (lambda a, b: cmk.utils.regex.regex(b).search(a)),
        "=~": (lambda a, b: a.lower() == b.lower()),
        "~~": (lambda a, b: cmk.utils.regex.regex(b.lower()).search(a.lower())),
        "in": (lambda a, b: a in b),
    }

    def _from_raw_query(self, status_server):
        super(QueryGET, self)._from_raw_query(status_server)
        self._parse_table(status_server)
        self._parse_header_lines()

    def _parse_table(self, status_server):
        # the GET argument is the table name, e.g. "GET events"
        self.table_name = self.method_arg
        self.table = status_server.table(self.table_name)

    def _parse_header_lines(self):
        self.requested_columns = self.table.column_names  # use all columns as default
        self.filters = []
        self.limit = None
        self.only_host = None

        self.header_lines = []
        for line in self._raw_query[1:]:
            try:
                header, argument = line.rstrip("\n").split(":", 1)
                argument = argument.lstrip(" ")

                if header == "OutputFormat":
                    if argument not in ["python", "plain", "json"]:
                        raise MKClientError(
                            "Invalid output format \"%s\" (allowed are: python, plain, json)" %
                            argument)
                    self.output_format = argument

                elif header == "Columns":
                    self.requested_columns = argument.split(" ")

                elif header == "Filter":
                    column_name, operator_name, predicate, argument = self._parse_filter(argument)

                    # Needed for later optimization (check_mkevents)
                    if column_name == "event_host" and operator_name == 'in':
                        self.only_host = set(argument)

                    self.filters.append((column_name, operator_name, predicate, argument))

                elif header == "Limit":
                    self.limit = int(argument)

                else:
                    self._logger.info("Ignoring not-implemented header %s" % header)

            except Exception as e:
                raise MKClientError("Invalid header line '%s': %s" % (line.rstrip(), e))

    def _parse_filter(self, textspec):
        """Parse one Filter: header. Examples:
        # id = 17
        # name ~= This is some .* text
        """
        parts = textspec.split(None, 2)
        if len(parts) == 2:
            parts.append("")  # empty argument, e.g. "comment ="
        column, operator_name, argument = parts

        try:
            convert = self.table.column_types[column]
        except KeyError:
            raise MKClientError(
                "Unknown column: %s (Available are: %s)" % (column, self.table.column_names))

        # TODO: BUG: The query is decoded to unicode after receiving it from
        # the socket. The columns with type str (initialied with "") will apply
        # str(argument) here and convert the value back to str! This will crash
        # when the filter contains non ascii characters!
        # Fix this by making the default values unicode and skip unicode conversion
        # here (for performance reasons) because argument is already unicode.
        if operator_name == 'in':
            # list(map(...)) keeps a reusable list on both Python 2 and 3
            argument = list(map(convert, argument.split()))
        else:
            argument = convert(argument)

        operator_function = self._filter_operators.get(operator_name)
        if not operator_function:
            raise MKClientError("Unknown filter operator '%s'" % operator_name)

        return (column, operator_name, lambda x: operator_function(x, argument), argument)

    def requested_column_indexes(self):
        """Map each requested column name to its row index (None if unknown)."""
        indexes = []
        for column_name in self.requested_columns:
            try:
                column_index = self.table.column_indices[column_name]
            except KeyError:
                # The column is not known: Use None as index and None value later
                column_index = None
            indexes.append(column_index)
        return indexes

    def filter_row(self, row):
        """Return True when *row* passes all configured filters."""
        for column_name, _operator_name, predicate, _argument in self.filters:
            if not predicate(row[self.table.column_indices[column_name]]):
                return False
        return True
class QueryREPLICATE(Query):
    """REPLICATE query: no extra parsing beyond the base class.

    NOTE(review): body was lost in the garbled source; assumed empty.
    """
    pass
class QueryCOMMAND(Query):
    """COMMAND query: no extra parsing beyond the base class.

    NOTE(review): body was lost in the garbled source; assumed empty.
    """
    pass
2672 # .--Status Tables-------------------------------------------------------.
2673 # | ____ _ _ _____ _ _ |
2674 # | / ___|| |_ __ _| |_ _ _ ___ |_ _|_ _| |__ | | ___ ___ |
2675 # | \___ \| __/ _` | __| | | / __| | |/ _` | '_ \| |/ _ \/ __| |
2676 # | ___) | || (_| | |_| |_| \__ \ | | (_| | |_) | | __/\__ \ |
2677 # | |____/ \__\__,_|\__|\__,_|___/ |_|\__,_|_.__/|_|\___||___/ |
2679 # +----------------------------------------------------------------------+
2680 # | Definitions of the tables available for status queries |
2681 # '----------------------------------------------------------------------'
2682 # If you need a new column here, then these are the places to change:
2684 # - add column to the end of StatusTableEvents.columns
2685 # - add column to grepping_filters if it is a str column
2686 # - deal with convert_history_line() (if not a str column)
2687 # - make sure that the new column is filled at *every* place where
2688 # an event is being created:
2689 # * _create_event_from_trap()
2690 # * create_event_from_line()
2691 # * _handle_absent_event()
2692 # * _create_overflow_event()
2693 # - When loading the status file add the possibly missing column to all
2694 # loaded events (load_status())
2695 # - Maybe add matching/rewriting for the new column
2696 # - write the actual code using the new column
2698 # - Add column painter for the new column
2701 # - Add painter and filter to all views where appropriate
2702 # - maybe add WATO code for matching rewriting
2703 # - do not forget event_rule_matches() in web!
2704 # - maybe add a field into the event simulator
class StatusTable(object):
    """Base class of a table that can be queried via the status socket."""

    prefix = None  # type: Optional[str]
    columns = []  # type: List[Tuple[str, Any]]

    # Must return a enumerable type containing fully populated lists (rows) matching the
    # columns of the table
    def _enumerate(self, query):
        raise NotImplementedError()

    def __init__(self, logger):
        super(StatusTable, self).__init__()
        self._logger = logger.getChild("status_table.%s" % self.prefix)
        self._populate_column_views()

    def _populate_column_views(self):
        # Derived lookup structures over self.columns (name, default) pairs
        self.column_names = [c[0] for c in self.columns]
        self.column_defaults = dict(self.columns)

        self.column_types = {}
        for name, def_val in self.columns:
            self.column_types[name] = type(def_val)

        self.column_indices = dict([(name, index) for index, name in enumerate(self.column_names)])

    def query(self, query):
        """Generator yielding the header row followed by all matching rows."""
        requested_column_indexes = query.requested_column_indexes()

        # Output the column headers
        # TODO: Add support for ColumnHeaders like in livestatus?
        yield query.requested_columns

        num_rows = 0
        for row in self._enumerate(query):
            if query.limit is not None and num_rows >= query.limit:
                break  # The maximum number of rows has been reached

            # TODO: History filtering is done in history load code. Check for improvements
            if query.filters and query.table_name != "history":
                matched = query.filter_row(row)
                if not matched:
                    continue

            num_rows += 1
            yield self._build_result_row(row, requested_column_indexes)

    def _build_result_row(self, row, requested_column_indexes):
        # Unknown columns (index None) yield a None value
        result_row = []
        for index in requested_column_indexes:
            if index is None:
                result_row.append(None)
            else:
                result_row.append(row[index])
        return result_row
class StatusTableEvents(StatusTable):
    """Status table exposing the currently open events."""

    prefix = "event"
    # NOTE(review): the leading column entries were lost in the garbled
    # source and are reconstructed — verify against upstream.
    columns = [
        ("event_id", 1),
        ("event_count", 1),
        ("event_text", ""),
        ("event_first", 0.0),
        ("event_last", 0.0),
        ("event_comment", ""),
        ("event_sl", 0),  # filter fehlt
        ("event_host", ""),
        ("event_contact", ""),
        ("event_application", ""),
        ("event_pid", 0),
        ("event_priority", 5),
        ("event_facility", 1),
        ("event_rule_id", ""),
        ("event_state", 0),
        ("event_phase", ""),
        ("event_owner", ""),
        ("event_match_groups", ""),  # last column up to 1.2.4
        ("event_contact_groups", ""),  # introduced in 1.2.5i2
        ("event_ipaddress", ""),  # introduced in 1.2.7i1
        ("event_orig_host", ""),  # introduced in 1.4.0b1
        ("event_contact_groups_precedence", "host"),  # introduced in 1.4.0b1
        ("event_core_host", ""),  # introduced in 1.5.0i1
        ("event_host_in_downtime", False),  # introduced in 1.5.0i1
        ("event_match_groups_syslog_application", ""),  # introduced in 1.5.0i2
    ]

    def __init__(self, logger, event_status):
        super(StatusTableEvents, self).__init__(logger)
        self._event_status = event_status

    def _enumerate(self, query):
        for event in self._event_status.get_events():
            # Optimize filters that are set by the check_mkevents active check. Since users
            # may have a lot of those checks running, it is a good idea to optimize this.
            if query.only_host and event["host"] not in query.only_host:
                continue

            row = []
            for column_name in self.column_names:
                try:
                    # Strip the "event_" prefix to look up the event field
                    row.append(event[column_name[6:]])
                except KeyError:
                    # The row does not have this value. Use the columns default value
                    row.append(self.column_defaults[column_name])

            yield row
class StatusTableHistory(StatusTable):
    """Status table exposing the archived event history."""

    prefix = "history"
    columns = [
        ("history_line", 0),  # Line number in event history file
        ("history_time", 0.0),
        ("history_what", ""),
        ("history_who", ""),
        ("history_addinfo", ""),
    ] + StatusTableEvents.columns

    def __init__(self, logger, history):
        super(StatusTableHistory, self).__init__(logger)
        self._history = history

    def _enumerate(self, query):
        # filtering happens inside the history store (see TODO in query())
        return self._history.get(query)
class StatusTableRules(StatusTable):
    """Status table exposing per-rule hit statistics."""

    prefix = "rule"
    # NOTE(review): column list was lost in the garbled source;
    # reconstructed — verify against upstream.
    columns = [
        ("rule_id", ""),
        ("rule_hits", 0),
    ]

    def __init__(self, logger, event_status):
        super(StatusTableRules, self).__init__(logger)
        self._event_status = event_status

    def _enumerate(self, query):
        return self._event_status.get_rule_stats()
class StatusTableStatus(StatusTable):
    """Status table exposing the daemon's own status/performance columns."""

    prefix = "status"
    columns = EventServer.status_columns()

    def __init__(self, logger, event_server):
        super(StatusTableStatus, self).__init__(logger)
        self._event_server = event_server

    def _enumerate(self, query):
        return self._event_server.get_status()
2862 # .--StatusServer--------------------------------------------------------.
2864 # | / ___|| |_ __ _| |_ _ _ ___/ ___| ___ _ ____ _____ _ __ |
2865 # | \___ \| __/ _` | __| | | / __\___ \ / _ \ '__\ \ / / _ \ '__| |
2866 # | ___) | || (_| | |_| |_| \__ \___) | __/ | \ V / __/ | |
2867 # | |____/ \__\__,_|\__|\__,_|___/____/ \___|_| \_/ \___|_| |
2869 # +----------------------------------------------------------------------+
2870 # | Beantworten von Status- und Kommandoanfragen über das UNIX-Socket |
2871 # '----------------------------------------------------------------------'
class StatusServer(ECServerThread):
    """Thread answering status and command queries on the UNIX socket and
    (optionally) a TCP socket.

    NOTE(review): parts of __init__ were lost in the garbled source and are
    reconstructed — verify against upstream.
    """

    def __init__(self, logger, settings, config, slave_status, perfcounters, lock_configuration,
                 history, event_status, event_server, terminate_main_event):
        super(StatusServer, self).__init__(
            name="StatusServer",
            logger=logger,
            settings=settings,
            config=config,
            slave_status=slave_status,
            profiling_enabled=settings.options.profile_status,
            profile_file=settings.paths.status_server_profile.value)
        self._socket = None
        self._tcp_socket = None
        self._reopen_sockets = False

        self._table_events = StatusTableEvents(logger, event_status)
        self._table_history = StatusTableHistory(logger, history)
        self._table_rules = StatusTableRules(logger, event_status)
        self._table_status = StatusTableStatus(logger, event_server)
        self._perfcounters = perfcounters
        self._lock_configuration = lock_configuration
        self._history = history
        self._event_status = event_status
        self._event_server = event_server
        self._event_columns = StatusTableEvents.columns
        self._terminate_main_event = terminate_main_event

        self.open_unix_socket()
        self.open_tcp_socket()
def table(self, name):
    """Return the status table object for *name* or raise MKClientError."""
    if name == "events":
        return self._table_events
    if name == "history":
        return self._table_history
    if name == "rules":
        return self._table_rules
    if name == "status":
        return self._table_status
    raise MKClientError(
        "Invalid table: %s (allowed are: events, history, rules, status)" % name)
def open_unix_socket(self):
    """Create, bind and listen on the status UNIX socket."""
    path = self.settings.paths.unix_socket.value
    if path.exists():
        path.unlink()  # remove stale socket file from a previous run
    path.parent.mkdir(parents=True, exist_ok=True)
    self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    self._socket.bind(str(path))
    # Make sure that socket is group writable
    path.chmod(0o664)
    self._socket.listen(self._config['socket_queue_len'])
    self._unix_socket_queue_len = self._config['socket_queue_len']  # detect changes in config
def open_tcp_socket(self):
    """Open the optional TCP status socket when "remote_status" is configured.

    On failure the error is logged (or re-raised in debug mode) and the
    server continues without a TCP socket.
    """
    if self._config["remote_status"]:
        try:
            self._tcp_port, self._tcp_allow_commands = self._config["remote_status"][:2]
            try:
                self._tcp_access_list = self._config["remote_status"][2]
            except IndexError:
                self._tcp_access_list = None

            self._tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._tcp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self._tcp_socket.bind(("0.0.0.0", self._tcp_port))
            self._tcp_socket.listen(self._config['socket_queue_len'])
            self._logger.info(
                "Going to listen for status queries on TCP port %d" % self._tcp_port)
        except Exception as e:
            if self.settings.options.debug:
                raise
            self._logger.exception(
                "Cannot listen on TCP socket port %d: %s" % (self._tcp_port, e))
    else:
        self._tcp_socket = None
        self._tcp_port = 0
        self._tcp_allow_commands = False
        self._tcp_access_list = None
def close_unix_socket(self):
    """Close the UNIX status socket if it is open."""
    if self._socket:
        self._socket.close()
        self._socket = None
def close_tcp_socket(self):
    """Close the TCP status socket if it is open."""
    if self._tcp_socket:
        self._tcp_socket.close()
        self._tcp_socket = None
def reopen_sockets(self):
    """Re-create sockets after a configuration reload.

    The UNIX socket is only re-opened when its queue length changed;
    the TCP socket is always re-opened (its config may have changed).
    """
    if self._unix_socket_queue_len != self._config["socket_queue_len"]:
        self._logger.info("socket_queue_len has changed. Reopening UNIX socket.")
        self.close_unix_socket()
        self.open_unix_socket()

    self.close_tcp_socket()
    self.open_tcp_socket()
def reload_configuration(self, config):
    """Install a new configuration; sockets are reopened lazily by serve()."""
    self._config = config
    self._reopen_sockets = True
def serve(self):
    """Main loop: accept and answer status connections until terminated.

    NOTE(review): the ``def`` line and parts of the error handling were
    lost in the garbled source; reconstructed — verify against upstream.
    """
    while not self._terminate_event.is_set():
        try:
            client_socket = None
            addr_info = None

            # apply a pending configuration change lazily
            if self._reopen_sockets:
                self.reopen_sockets()
                self._reopen_sockets = False

            listen_list = [self._socket]
            if self._tcp_socket:
                listen_list.append(self._tcp_socket)

            try:
                # short timeout so _terminate_event is polled regularly
                readable = select.select(listen_list, [], [], 0.2)[0]
            except select.error as e:
                if e[0] == errno.EINTR:
                    continue
                raise

            for s in readable:
                client_socket, addr_info = s.accept()
                client_socket.settimeout(3)
                before = time.time()
                self._perfcounters.count("connects")
                if addr_info:
                    # TCP connection: check access list, commands optional
                    allow_commands = self._tcp_allow_commands
                    if self.settings.options.debug:
                        self._logger.info("Handle status connection from %s:%d" % addr_info)
                    if self._tcp_access_list is not None and addr_info[0] not in \
                       self._tcp_access_list:
                        client_socket.close()
                        client_socket = None
                        self._logger.info(
                            "Denying access to status socket from %s (allowed is only %s)" %
                            (addr_info[0], ", ".join(self._tcp_access_list)))
                        continue
                else:
                    # local UNIX socket: commands are always allowed
                    allow_commands = True

                self.handle_client(client_socket, allow_commands, addr_info and addr_info[0] or
                                   "")

                duration = time.time() - before
                self._logger.verbose("Answered request in %0.2f ms" % (duration * 1000))
                self._perfcounters.count_time("request", duration)

        except Exception as e:
            msg = "Error handling client %s: %s" % (addr_info, e)
            # Do not log a stack trace for client errors, they are not *our* fault.
            if isinstance(e, MKClientError):
                self._logger.error(msg)
            else:
                self._logger.exception(msg)
            if client_socket:
                client_socket.close()
                client_socket = None
        finally:
            if client_socket:
                client_socket.close()
                client_socket = None  # close without danger of exception
def handle_client(self, client_socket, allow_commands, client_ip):
    """Read queries from one client connection and answer each of them."""
    for query in Queries(self, client_socket, self._logger):
        self._logger.verbose("Client livestatus query: %r" % query)

        with self._event_status.lock:
            if query.method == "GET":
                response = self.table(query.table_name).query(query)

            elif query.method == "REPLICATE":
                response = self.handle_replicate(query.method_arg, client_ip)

            elif query.method == "COMMAND":
                if not allow_commands:
                    raise MKClientError("Sorry. Commands are disallowed via TCP")
                self.handle_command_request(query.method_arg)
                response = None  # commands answer with a plain "None"

            else:
                raise NotImplementedError()

            try:
                self._answer_query(client_socket, query, response)
            except socket.error as e:
                if e.errno == 32:  # Broken pipe -> ignore this
                    pass
                else:
                    raise

    client_socket.close()
3068 # Only GET queries have customizable output formats. COMMAND is always
3069 # a dictionay and COMMAND is always None and always output as "python"
3070 def _answer_query(self
, client_socket
, query
, response
):
3071 if query
.method
!= "GET":
3072 self
._answer
_query
_python
(client_socket
, response
)
3075 if query
.output_format
== "plain":
3076 for row
in response
:
3077 client_socket
.sendall("\t".join([cmk
.ec
.history
.quote_tab(c
) for c
in row
]) + "\n")
3079 elif query
.output_format
== "json":
3080 client_socket
.sendall(json
.dumps(list(response
)) + "\n")
3082 elif query
.output_format
== "python":
3083 self
._answer
_query
_python
(client_socket
, list(response
))
3086 raise NotImplementedError()
3088 def _answer_query_python(self
, client_socket
, response
):
3089 client_socket
.sendall(repr(response
) + "\n")
# All commands are already locked with self._event_status.lock
def handle_command_request(self, commandline):
    """Dispatch one semicolon separated command line to its handler.

    Raises MKClientError for unknown commands or commands not allowed
    in the current replication mode.
    """
    self._logger.info("Executing command: %s" % commandline)
    parts = commandline.split(";")
    command = parts[0]
    replication_allow_command(self._config, command, self._slave_status)
    arguments = parts[1:]
    if command == "DELETE":
        self.handle_command_delete(arguments)
    elif command == "RELOAD":
        self.handle_command_reload()
    elif command == "SHUTDOWN":
        self._logger.info("Going to shut down")
        terminate(self._terminate_main_event, self._event_server, self)
    elif command == "REOPENLOG":
        self.handle_command_reopenlog()
    elif command == "FLUSH":
        self.handle_command_flush()
    elif command == "SYNC":
        self.handle_command_sync()
    elif command == "RESETCOUNTERS":
        self.handle_command_resetcounters(arguments)
    elif command == "UPDATE":
        self.handle_command_update(arguments)
    elif command == "CREATE":
        self.handle_command_create(arguments)
    elif command == "CHANGESTATE":
        self.handle_command_changestate(arguments)
    elif command == "ACTION":
        self.handle_command_action(arguments)
    elif command == "SWITCHMODE":
        self.handle_command_switchmode(arguments)
    else:
        raise MKClientError("Unknown command %s" % command)
3126 def handle_command_delete(self
, arguments
):
3127 if len(arguments
) != 2:
3128 raise MKClientError("Wrong number of arguments for DELETE")
3129 event_id
, user
= arguments
3130 self
._event
_status
.delete_event(int(event_id
), user
)
    def handle_command_update(self, arguments):
        # UPDATE <event_id>;<user>;<acknowledged>;<comment>;<contact>:
        # acknowledge/unacknowledge an event and update its metadata.
        # NOTE(review): the "event not found" guard before the raise below is
        # elided in this view.
        event_id, user, acknowledged, comment, contact = arguments
        event = self._event_status.event(int(event_id))
        raise MKClientError("No event with id %s" % event_id)
        # Note the common practice: We validate parameters *before* doing any changes.
        ack = int(acknowledged)
        if ack and event["phase"] not in ["open", "ack"]:
            raise MKClientError("You cannot acknowledge an event that is not open.")
        event["phase"] = "ack" if ack else "open"
        event["comment"] = comment
        event["contact"] = contact
        event["owner"] = user
        self._history.add(event, "UPDATE", user)
3151 def handle_command_create(self
, arguments
):
3152 # Would rather use process_raw_line(), but we are already
3153 # holding self._event_status.lock and it's sub functions are setting
3154 # self._event_status.lock too. The lock can not be allocated twice.
3155 # TODO: Change the lock type in future?
3156 # process_raw_lines("%s" % ";".join(arguments))
3157 with
file(str(self
.settings
.paths
.event_pipe
.value
), "w") as pipe
:
3158 pipe
.write(("%s\n" % ";".join(arguments
)).encode("utf-8"))
    def handle_command_changestate(self, arguments):
        # CHANGESTATE <event_id>;<user>;<new state>: force a new monitoring
        # state on an existing event.
        # NOTE(review): the "event not found" guard before the raise below is
        # elided in this view.
        event_id, user, newstate = arguments
        event = self._event_status.event(int(event_id))
        raise MKClientError("No event with id %s" % event_id)
        event["state"] = int(newstate)
        event["owner"] = user
        self._history.add(event, "CHANGESTATE", user)
3170 def handle_command_reload(self
):
3171 reload_configuration(self
.settings
, self
._logger
, self
._lock
_configuration
, self
._history
,
3172 self
._event
_status
, self
._event
_server
, self
, self
._slave
_status
)
3174 def handle_command_reopenlog(self
):
3175 self
._logger
.info("Closing this logfile")
3176 cmk
.utils
.log
.open_log(str(self
.settings
.paths
.log_file
.value
))
3177 self
._logger
.info("Opened new logfile")
    # Erase our current state and history!
    def handle_command_flush(self):
        # Drop all current events and the history and persist the now-empty
        # state. On replication slaves the cached master state is removed too.
        # NOTE(review): a try-statement around the unlink calls is elided in
        # this view (unlink may fail if the files do not exist).
        self._history.flush()
        self._event_status.flush()
        self._event_status.save_status()
        if is_replication_slave(self._config):
            self.settings.paths.master_config_file.value.unlink()
            self.settings.paths.slave_status_file.value.unlink()
            update_slave_status(self._slave_status, self.settings, self._config)
        self._logger.info("Flushed current status and historic events.")
3193 def handle_command_sync(self
):
3194 self
._event
_status
.save_status()
    def handle_command_resetcounters(self, arguments):
        # RESETCOUNTERS [<rule_id>]: reset the hit counters of one rule, or of
        # all rules when no rule ID is given.
        # NOTE(review): the if/else headers selecting between the two branches
        # are elided in this view.
        rule_id = arguments[0]
        self._logger.info("Resetting counters of rule " + rule_id)
        rule_id = None  # Reset all rule counters
        self._logger.info("Resetting all rule counters")
        self._event_status.reset_counters(rule_id)

    def handle_command_action(self, arguments):
        # ACTION <event_id>;<user>;<action_id>: execute an action (or the
        # special @NOTIFY pseudo action) on an existing event.
        # NOTE(review): the guard for a missing event and presumably an
        # else-header between the @NOTIFY branch and the configured-action
        # branch are elided in this view.
        event_id, user, action_id = arguments
        event = self._event_status.event(int(event_id))
        event["owner"] = user
        if action_id == "@NOTIFY":
            cmk.ec.actions.do_notify(
                self._event_server, self._logger, event, user, is_cancelling=False)
        with self._lock_configuration:
            if action_id not in self._config["action"]:
                raise MKClientError(
                    "The action '%s' is not defined. After adding new commands please "
                    "make sure that you activate the changes in the Event Console." % action_id)
            action = self._config["action"][action_id]
        cmk.ec.actions.do_event_action(self._history, self.settings, self._config, self._logger,
                                       self._event_columns, action, event, user)
3224 def handle_command_switchmode(self
, arguments
):
3225 new_mode
= arguments
[0]
3226 if not is_replication_slave(self
._config
):
3227 raise MKClientError("Cannot switch replication mode: this is not a replication slave.")
3228 elif new_mode
not in ["sync", "takeover"]:
3229 raise MKClientError(
3230 "Invalid target mode '%s': allowed are only 'sync' and 'takeover'" % new_mode
)
3231 self
._slave
_status
["mode"] = new_mode
3232 save_slave_status(self
.settings
, self
._slave
_status
)
3233 self
._logger
.info("Switched replication mode to '%s' by external command." % new_mode
)
    def handle_replicate(self, argument, client_ip):
        # Answer a REPLICATE query from a slave with the current state.
        # NOTE(review): a try-statement around the int() parsing (whose
        # except-branch raises the MKClientError below) and the tail of the
        # replication_send() call are elided in this view.
        # Last time our slave got a config update
        last_update = int(argument)
        if self.settings.options.debug:
            self._logger.info("Replication: sync request from %s, last update %d seconds ago" %
                              (client_ip, time.time() - last_update))
        raise MKClientError("Invalid arguments to command REPLICATE")
        return replication_send(self._config, self._lock_configuration, self._event_status,
3250 # .--Dispatching---------------------------------------------------------.
3252 # | | _ \(_)___ _ __ __ _| |_ ___| |__ (_)_ __ __ _ |
3253 # | | | | | / __| '_ \ / _` | __/ __| '_ \| | '_ \ / _` | |
3254 # | | |_| | \__ \ |_) | (_| | || (__| | | | | | | | (_| | |
3255 # | |____/|_|___/ .__/ \__,_|\__\___|_| |_|_|_| |_|\__, | |
3257 # +----------------------------------------------------------------------+
3258 # | Starten und Verwalten der beiden Threads. |
3259 # '----------------------------------------------------------------------'
def run_eventd(terminate_main_event, settings, config, lock_configuration, history, perfcounters,
               event_status, event_server, status_server, slave_status, logger):
    # Main loop of the daemon: start the server threads, then periodically do
    # housekeeping, retention saving, statistics and (on slaves) replication,
    # until the terminate event is set.
    # NOTE(review): several original lines are elided in this view — the
    # computation of "now", the try-framing of the loop body and parts of the
    # signal handling — so the structure around them is inferred.
    status_server.start()
    event_server.start()
    next_housekeeping = now + config["housekeeping_interval"]
    next_retention = now + config["retention_interval"]
    next_statistics = now + config["statistics_interval"]
    next_replication = 0  # force immediate replication after restart
    while not terminate_main_event.is_set():
        # Wait until either housekeeping or retention is due, but at
        # maximum 60 seconds. That way changes of the interval from a very
        # high to a low value will never require more than 60 seconds
        event_list = [next_housekeeping, next_retention, next_statistics]
        if is_replication_slave(config):
            event_list.append(next_replication)
        time_left = max(0, min(event_list) - time.time())
        time.sleep(min(time_left, 60))
        if now > next_housekeeping:
            event_server.do_housekeeping()
            next_housekeeping = now + config["housekeeping_interval"]
        if now > next_retention:
            with event_status.lock:
                event_status.save_status()
            next_retention = now + config["retention_interval"]
        if now > next_statistics:
            perfcounters.do_statistics()
            next_statistics = now + config["statistics_interval"]
        # Beware: replication might be turned on during this loop!
        if is_replication_slave(config) and now > next_replication:
            replication_pull(settings, config, lock_configuration, perfcounters,
                             event_status, event_server, slave_status, logger)
            next_replication = now + config["replication"]["interval"]
    except MKSignalException as e:
    except Exception as e:
        logger.exception("Exception in main thread:\n%s" % e)
        if settings.options.debug:
    except MKSignalException as e:
        # SIGHUP triggers a configuration reload, other signals terminate.
        logger.info("Received SIGHUP - going to reload configuration")
        reload_configuration(settings, logger, lock_configuration, history, event_status,
                             event_server, status_server, slave_status)
        logger.info("Signalled to death by signal %d" % e._signum)
        terminate(terminate_main_event, event_server, status_server)
    # Now wait for termination of the server threads
    status_server.join()
3327 # .--EventStatus---------------------------------------------------------.
3328 # | _____ _ ____ _ _ |
3329 # | | ____|_ _____ _ __ | |_/ ___|| |_ __ _| |_ _ _ ___ |
3330 # | | _| \ \ / / _ \ '_ \| __\___ \| __/ _` | __| | | / __| |
3331 # | | |___ \ V / __/ | | | |_ ___) | || (_| | |_| |_| \__ \ |
3332 # | |_____| \_/ \___|_| |_|\__|____/ \__\__,_|\__|\__,_|___/ |
3334 # +----------------------------------------------------------------------+
3335 # | Bereithalten des aktuellen Event-Status. Dieser schützt sich selbst |
3336 # | durch ein Lock vor gleichzeitigen Zugriffen durch die Threads. |
3337 # '----------------------------------------------------------------------'
class EventStatus(object):
    # Holds the current in-memory event state. self.lock protects the state
    # against concurrent access from the different threads.
    def __init__(self, settings, config, perfcounters, history, logger):
        # NOTE(review): the original __init__ may continue past the visible
        # lines (the following line numbers are elided in this view).
        self.settings = settings
        self._config = config
        self._perfcounters = perfcounters
        self.lock = threading.Lock()
        self._history = history
        self._logger = logger
3350 def reload_configuration(self
, config
):
3351 self
._config
= config
    # NOTE(review): the "def flush(self):" header and the reset of the event
    # list are elided in this view; the following lines re-initialize the
    # remaining runtime state.
    self._next_event_id = 1
    self._rule_stats = {}
    self._interval_starts = {}  # needed for expecting rules
    self._initialize_event_limit_status()

    # TODO: might introduce some performance counters, like:
    # - number of received messages
    # - number of rule hits
    # - number of rule misses

    def event(self, eid):
        # Look up the open event with the given ID.
        # NOTE(review): the return statement(s) are elided in this view.
        for event in self._events:
            if event["id"] == eid:

    # Return beginning of current expectation interval. For new rules
    # we start with the next interval in future.
    def interval_start(self, rule_id, interval):
        # NOTE(review): the else-header between the "new rule" branch and the
        # "known rule" branch, and the final return, are elided in this view.
        if rule_id not in self._interval_starts:
            start = self.next_interval_start(interval, time.time())
            self._interval_starts[rule_id] = start
            start = self._interval_starts[rule_id]
            # Make sure that if the user switches from day to hour and we
            # are still waiting for the first interval to begin, that we
            # do not wait for the next day.
            next_interval = self.next_interval_start(interval, time.time())
            if start > next_interval:
                start = next_interval
                self._interval_starts[rule_id] = start

    def next_interval_start(self, interval, previous_start):
        # Compute the start of the interval following *previous_start*.
        # *interval* is either a plain length in seconds or a (length, offset)
        # tuple. NOTE(review): the non-tuple branch and the final return of
        # next_start are elided in this view.
        if isinstance(interval, tuple):
            length, offset = interval
        previous_start -= offset  # take into account timezone offset
        full_parts = divmod(previous_start, length)[0]
        next_start = (full_parts + 1) * length
        next_start += offset
3405 def start_next_interval(self
, rule_id
, interval
):
3406 current_start
= self
.interval_start(rule_id
, interval
)
3407 next_start
= self
.next_interval_start(interval
, current_start
)
3408 self
._interval
_starts
[rule_id
] = next_start
3409 self
._logger
.debug("Rule %s: next interval starts %s (i.e. now + %.2f sec)" %
3410 (rule_id
, next_start
, time
.time() - next_start
))
    def pack_status(self):
        # Build a serializable snapshot of the runtime state; the counterpart
        # of unpack_status(). NOTE(review): the surrounding "return {" / "}"
        # of the dict literal are elided in this view.
        "next_event_id": self._next_event_id,
        "events": self._events,
        "rule_stats": self._rule_stats,
        "interval_starts": self._interval_starts,
3420 def unpack_status(self
, status
):
3421 self
._next
_event
_id
= status
["next_event_id"]
3422 self
._events
= status
["events"]
3423 self
._rule
_stats
= status
["rule_stats"]
3424 self
._interval
_starts
= status
["interval_starts"]
    def save_status(self):
        # Persist the current state atomically: write everything to
        # <status>.new, fsync it, then rename over the real status file.
        # NOTE(review): the "now = time.time()" line used for the elapsed-time
        # log below and a line before os.fsync are elided in this view.
        status = self.pack_status()
        path = self.settings.paths.status_file.value
        path_new = path.parent / (path.name + '.new')
        # Believe it or not: cPickle is more than two times slower than repr()
        with path_new.open(mode='wb') as f:
            f.write(repr(status) + "\n")
            os.fsync(f.fileno())
        path_new.rename(path)
        elapsed = time.time() - now
        self._logger.verbose("Saved event state to %s in %.3fms." % (path, elapsed * 1000))

    def reset_counters(self, rule_id):
        # Reset the hit counters of one rule, or of all rules when rule_id is
        # None. NOTE(review): the if/else headers and a trailing log line are
        # elided in this view.
        if rule_id in self._rule_stats:
            del self._rule_stats[rule_id]
        self._rule_stats = {}

    def load_status(self, event_server):
        # Load the persisted state from disk; on any error keep running with
        # the current (empty) state. Afterwards fix up legacy events that miss
        # newer columns. NOTE(review): the try-header and the default branch
        # for a missing status file are elided in this view.
        path = self.settings.paths.status_file.value
        status = ast.literal_eval(path.read_bytes())
        self._next_event_id = status["next_event_id"]
        self._events = status["events"]
        self._rule_stats = status["rule_stats"]
        self._interval_starts = status.get("interval_starts", {})
        self._initialize_event_limit_status()
        self._logger.info("Loaded event state from %s." % path)
        except Exception as e:
            self._logger.exception("Error loading event state from %s: %s" % (path, e))
        # Backfill columns added in newer versions on events restored from an
        # older status file.
        for event in self._events:
            event.setdefault("ipaddress", "")
            if "core_host" not in event:
                event_server.add_core_host_to_event(event)
                event["host_in_downtime"] = False
3471 # Called on Event Console initialization from status file to initialize
3472 # the current event limit state -> Sets internal counters which are
3473 # updated during runtime.
3474 def _initialize_event_limit_status(self
):
3475 self
.num_existing_events
= len(self
._events
)
3477 self
.num_existing_events_by_host
= {}
3478 self
.num_existing_events_by_rule
= {}
3479 for event
in self
._events
:
3480 self
._count
_event
_add
(event
)
3482 def _count_event_add(self
, event
):
3483 if event
["host"] not in self
.num_existing_events_by_host
:
3484 self
.num_existing_events_by_host
[event
["host"]] = 1
3486 self
.num_existing_events_by_host
[event
["host"]] += 1
3488 if event
["rule_id"] not in self
.num_existing_events_by_rule
:
3489 self
.num_existing_events_by_rule
[event
["rule_id"]] = 1
3491 self
.num_existing_events_by_rule
[event
["rule_id"]] += 1
3493 def _count_event_remove(self
, event
):
3494 self
.num_existing_events
-= 1
3495 self
.num_existing_events_by_host
[event
["host"]] -= 1
3496 self
.num_existing_events_by_rule
[event
["rule_id"]] -= 1
3498 def new_event(self
, event
):
3499 self
._perfcounters
.count("events")
3500 event
["id"] = self
._next
_event
_id
3501 self
._next
_event
_id
+= 1
3502 self
._events
.append(event
)
3503 self
.num_existing_events
+= 1
3504 self
._count
_event
_add
(event
)
3505 self
._history
.add(event
, "NEW")
3507 def archive_event(self
, event
):
3508 self
._perfcounters
.count("events")
3509 event
["id"] = self
._next
_event
_id
3510 self
._next
_event
_id
+= 1
3511 event
["phase"] = "closed"
3512 self
._history
.add(event
, "ARCHIVED")
    def remove_event(self, event):
        # Remove *event* from the open event list and update the counters.
        # NOTE(review): the try/except framing is elided in this view — the
        # exception log below is presumably its except-branch.
        self._events.remove(event)
        self._count_event_remove(event)
        self._logger.exception("Cannot remove event %d: not present" % event["id"])
3521 # protected by self.lock
3522 def _remove_event_by_nr(self
, index
):
3523 event
= self
._events
.pop(index
)
3524 self
._count
_event
_remove
(event
)
    # protected by self.lock
    def remove_oldest_event(self, ty, event):
        # Remove one event to make room again, depending on the limit type
        # *ty*. NOTE(review): the header of the first branch (overall limit)
        # is elided in this view.
        self._logger.verbose(" Removing oldest event")
        self._remove_event_by_nr(0)
        elif ty == "by_rule":
            self._logger.verbose(" Removing oldest event of rule \"%s\"" % event["rule_id"])
            self._remove_oldest_event_of_rule(event["rule_id"])
        elif ty == "by_host":
            self._logger.verbose(" Removing oldest event of host \"%s\"" % event["host"])
            self._remove_oldest_event_of_host(event["host"])

    # protected by self.lock
    def _remove_oldest_event_of_rule(self, rule_id):
        # Remove the first (oldest) event that belongs to *rule_id*.
        # NOTE(review): the early return after the removal is elided in this
        # view.
        for event in self._events:
            if event["rule_id"] == rule_id:
                self.remove_event(event)

    # protected by self.lock
    def _remove_oldest_event_of_host(self, hostname):
        # Remove the first (oldest) event of *hostname*.
        # NOTE(review): the early return after the removal is elided in this
        # view.
        for event in self._events:
            if event["host"] == hostname:
                self.remove_event(event)

    # protected by self.lock
    def get_num_existing_events_by(self, ty, event):
        # Current number of events counted for limit type *ty*.
        # NOTE(review): the header of the first branch (overall count) and the
        # final else-header are elided in this view.
        return self.num_existing_events
        elif ty == "by_rule":
            return self.num_existing_events_by_rule.get(event["rule_id"], 0)
        elif ty == "by_host":
            return self.num_existing_events_by_host.get(event["host"], 0)
        raise NotImplementedError()
    # Cancel all events the belong to a certain rule id and are
    # of the same "breed" as a new event.
    def cancel_events(self, event_server, event_columns, new_event, match_groups, rule):
        # NOTE(review): several framing lines are elided in this view — the
        # initialisation of to_delete, parts of the replace_groups() call, the
        # log call around the "Do not execute cancelling actions" message and
        # the argument list of do_event_actions().
        for nr, event in enumerate(self._events):
            if event["rule_id"] == rule["id"]:
                if self.cancelling_match(match_groups, new_event, event, rule):
                    # Fill a few fields of the cancelled event with data from
                    # the cancelling event so that action scripts have useful
                    # values and the logfile entry if more relevant.
                    previous_phase = event["phase"]
                    event["phase"] = "closed"
                    # TODO: Why do we use OK below and not new_event["state"]???
                    event["state"] = 0  # OK
                    event["text"] = new_event["text"]
                    # TODO: This is a hack and partial copy-n-paste from rewrite_events...
                    if "set_text" in rule:
                        event["text"] = replace_groups(rule["set_text"], event["text"],
                    event["time"] = new_event["time"]
                    event["last"] = new_event["time"]
                    event["priority"] = new_event["priority"]
                    self._history.add(event, "CANCELLED")
                    actions = rule.get("cancel_actions", [])
                    if previous_phase != "open" \
                       and rule.get("cancel_action_phases", "always") == "open":
                        "Do not execute cancelling actions, event %s's phase "
                        "is not 'open' but '%s'" % (event["id"], previous_phase))
                    cmk.ec.actions.do_event_actions(
                    to_delete.append(nr)
        for nr in to_delete[::-1]:
            self._remove_event_by_nr(nr)

    def cancelling_match(self, match_groups, new_event, event, rule):
        # Decide whether the incoming OK message *new_event* cancels the
        # existing *event*. NOTE(review): the early "return False" lines after
        # each mismatch log, several logger-call framings and the final
        # "return True" are elided in this view.
        debug = self._config["debug_rules"]
        # The match_groups of the canceling match only contain the *_ok match groups
        # Since the rewrite definitions are based on the positive match, we need to
        # create some missing keys. O.o
        for key in match_groups.keys():
            if key.endswith("_ok"):
                match_groups[key[:-3]] = match_groups[key]
        # Note: before we compare host and application we need to
        # apply the rewrite rules to the event. Because if in the previous
        # the hostname was rewritten, it wouldn't match anymore here.
        host = new_event["host"]
        if "set_host" in rule:
            host = replace_groups(rule["set_host"], host, match_groups)
        if event["host"] != host:
            self._logger.info("Do not cancel event %d: host is not the same (%s != %s)" %
                              (event["id"], event["host"], host))
        # The same for the application. But in case there is cancelling based on the application
        # configured in the rule, then don't check for different applications.
        if "cancel_application" not in rule:
            application = new_event["application"]
            if "set_application" in rule:
                application = replace_groups(rule["set_application"], application, match_groups)
            if event["application"] != application:
                "Do not cancel event %d: application is not the same (%s != %s)" %
                (event["id"], event["application"], application))
        if event["facility"] != new_event["facility"]:
            "Do not cancel event %d: syslog facility is not the same (%d != %d)" %
            (event["id"], event["facility"], new_event["facility"]))
        # Make sure, that the matching groups are the same. If the OK match
        # has less groups, we do not care. If it has more groups, then we
        # do not care either. We just compare the common "prefix".
        for nr, (prev_group, cur_group) in enumerate(
                zip(event["match_groups"], match_groups.get("match_groups_message_ok", ()))):
            if prev_group != cur_group:
                self._logger.info("Do not cancel event %d: match group number "
                                  "%d does not match (%s != %s)" % (event["id"], nr + 1,
                                                                    prev_group, cur_group))
        # Note: Duplicated code right above
        # Make sure, that the syslog_application matching groups are the same. If the OK match
        # has less groups, we do not care. If it has more groups, then we
        # do not care either. We just compare the common "prefix".
        for nr, (prev_group, cur_group) in enumerate(
                event.get("match_groups_syslog_application", ()),
                match_groups.get("match_groups_syslog_application_ok", ()))):
            if prev_group != cur_group:
                "Do not cancel event %d: syslog application match group number "
                "%d does not match (%s != %s)" % (event["id"], nr + 1, prev_group,

    def count_rule_match(self, rule_id):
        # Increment the hit counter of *rule_id*.
        # NOTE(review): the lock framing (presumably "with self.lock:") is
        # elided in this view.
        self._rule_stats.setdefault(rule_id, 0)
        self._rule_stats[rule_id] += 1
    def count_event_up(self, found, event):
        # Update event with new information from new occurrance,
        # but preserve certain attributes from the original (first)
        # event. NOTE(review): the framing of the "preserve" dict literal is
        # elided in this view.
        "count": found.get("count", 1) + 1,
        "first": found["first"],
        # When event is already active then do not change
        # comment or contact information anymore
        if found["phase"] == "open":
            if "comment" in found:
                preserve["comment"] = found["comment"]
            if "contact" in found:
                preserve["contact"] = found["contact"]
        found.update(preserve)

    def count_expected_event(self, event_server, event):
        # Book one occurrence on an existing "counting" event of the same
        # rule, or create a fresh counting event when none exists.
        # NOTE(review): the early return after count_event_up() is elided in
        # this view.
        for ev in self._events:
            if ev["rule_id"] == event["rule_id"] and ev["phase"] == "counting":
                self.count_event_up(ev, event)
        # None found, create one
        event["phase"] = "counting"
        event_server.new_event_respecting_limits(event)

    def count_event(self, event_server, event, rule, count):
        # Find previous occurrance of this event and acount for
        # one new occurrance. In case of negated count (expecting rules)
        # we do never modify events that are already in the state "open"
        # since the event has been created because the count was too
        # low in the specified period of time.
        # NOTE(review): the "found" bookkeeping (assignment/break) and several
        # continue/else lines are elided in this view.
        for ev in self._events:
            if ev["rule_id"] == event["rule_id"]:
                if ev["phase"] == "ack" and not count["count_ack"]:
                    continue  # skip acknowledged events
                if count["separate_host"] and ev["host"] != event["host"]:
                    continue  # treat events with separated hosts separately
                if count["separate_application"] and ev["application"] != event["application"]:
                    continue  # same for application
                if count["separate_match_groups"] and ev["match_groups"] != event["match_groups"]:
                if count.get("count_duration"
                            ) is not None and ev["first"] + count["count_duration"] < event["time"]:
                    # Counting has been discontinued on this event after a certain time
                if ev["host_in_downtime"] != event["host_in_downtime"]:
                    continue  # treat events with different downtime states separately
        self.count_event_up(found, event)
        event["phase"] = "counting"
        event_server.new_event_respecting_limits(event)
        # Did we just count the event that was just one too much?
        if found["phase"] == "counting" and found["count"] >= count["count"]:
            found["phase"] = "open"
            return found  # do event action, return found copy of event
        return False  # do not do event action

    # locked with self.lock
    def delete_event(self, event_id, user):
        # Close the event with *event_id* on behalf of *user*, log the
        # deletion and drop the event. Raises when the ID is unknown.
        # NOTE(review): the early return after a successful deletion is elided
        # in this view.
        for nr, event in enumerate(self._events):
            if event["id"] == event_id:
                event["phase"] = "closed"
                event["owner"] = user
                self._history.add(event, "DELETE", user)
                self._remove_event_by_nr(nr)
        raise MKClientError("No event with id %s" % event_id)

    def get_events(self):
        # NOTE(review): the body (presumably returning self._events) is elided
        # in this view.
3775 def get_rule_stats(self
):
3776 return sorted(self
._rule
_stats
.iteritems(), key
=lambda x
: x
[0])
3780 # .--Replication---------------------------------------------------------.
3782 # | | _ \ ___ _ __ | (_) ___ __ _| |_(_) ___ _ __ |
3783 # | | |_) / _ \ '_ \| | |/ __/ _` | __| |/ _ \| '_ \ |
3784 # | | _ < __/ |_) | | | (_| (_| | |_| | (_) | | | | |
3785 # | |_| \_\___| .__/|_|_|\___\__,_|\__|_|\___/|_| |_| |
3787 # +----------------------------------------------------------------------+
3788 # | Functions for doing replication, master and slave parts. |
3789 # '----------------------------------------------------------------------'
def is_replication_slave(config):
    """Truthy when replication is configured and not disabled.

    Note: preserves the original's short-circuit semantics — a falsy
    "replication" setting is returned as-is, not normalized to bool.
    """
    repl_settings = config["replication"]
    if not repl_settings:
        return repl_settings
    return not repl_settings.get("disabled")
def replication_allow_command(config, command, slave_status):
    """Reject state-changing commands while this site is a slave in sync mode."""
    sync_slave = is_replication_slave(config) and slave_status["mode"] == "sync"
    if sync_slave and command in ("DELETE", "UPDATE", "CHANGESTATE", "ACTION"):
        raise MKClientError("This command is not allowed on a replication slave "
                            "while it is in sync mode.")
def replication_send(config, lock_configuration, event_status, last_update):
    # Build the response for a REPLICATE request from a slave. Rules and
    # actions are only included when the slave's state is older than the last
    # config reload. NOTE(review): the creation of the response dict and its
    # return are elided in this view.
    with lock_configuration:
        response["status"] = event_status.pack_status()
        if last_update < config["last_reload"]:
            response["rules"] = config[
                "rules"]  # Remove one bright day, where legacy rules are not needed anymore
            response["rule_packs"] = config["rule_packs"]
            response["actions"] = config["actions"]


def replication_pull(settings, config, lock_configuration, perfcounters, event_status, event_server,
                     slave_status, logger):
    # We distinguish two modes:
    # 1. slave mode: just pull the current state from the master.
    #    if the master is not reachable then decide whether to
    #    switch to takeover mode.
    # 2. takeover mode: if automatic fallback is enabled and the
    #    time frame for that has not yet ellapsed, then try to
    #    pull the current state from the master. If that is successful
    #    then switch back to slave mode. If not automatic fallback
    #    is enabled then simply do nothing.
    # NOTE(review): the computation of "now", the try-framing and several
    # logger-call framings are elided in this view; the structure around them
    # is inferred.
    repl_settings = config["replication"]
    mode = slave_status["mode"]
    need_sync = mode == "sync" or (
        mode == "takeover" and "fallback" in repl_settings and
        (slave_status["last_master_down"] is None or
         now - repl_settings["fallback"] < slave_status["last_master_down"]))
    with event_status.lock:
        with lock_configuration:
            new_state = get_state_from_master(config, slave_status)
            replication_update_state(settings, config, event_status, event_server,
            if repl_settings.get("logging"):
                logger.info("Successfully synchronized with master")
            slave_status["last_sync"] = now
            slave_status["success"] = True
            # Fall back to slave mode after successful sync
            # (time frame has already been checked)
            if mode == "takeover":
                if slave_status["last_master_down"] is None:
                    logger.info("Replication: master reachable for the first time, "
                                "switching back to slave mode")
                    slave_status["mode"] = "sync"
                    logger.info("Replication: master reachable again after %d seconds, "
                                "switching back to sync mode" %
                                (now - slave_status["last_master_down"]))
                    slave_status["mode"] = "sync"
                    slave_status["last_master_down"] = None
            except Exception as e:
                logger.warning("Replication: cannot sync with master: %s" % e)
                slave_status["success"] = False
                if slave_status["last_master_down"] is None:
                    slave_status["last_master_down"] = now
            # Decide whether to take over from an unreachable master.
            if "takeover" in repl_settings and mode != "takeover":
                if not slave_status["last_sync"]:
                    if repl_settings.get("logging"):
                        "Replication: no takeover since master was never reached.")
                    offline = now - slave_status["last_sync"]
                    if offline < repl_settings["takeover"]:
                        if repl_settings.get("logging"):
                            "Replication: no takeover yet, still %d seconds to wait" %
                            (repl_settings["takeover"] - offline))
                    "Replication: master not reached for %d seconds, taking over!" %
            slave_status["mode"] = "takeover"
            save_slave_status(settings, slave_status)
    # Compute statistics of the average time needed for a sync
    perfcounters.count_time("sync", time.time() - now)
def replication_update_state(settings, config, event_status, event_server, new_state):
    """Apply a state snapshot received from the master to this slave."""
    has_rules = "rules" in new_state
    if has_rules:
        # Keep a copy of the masters' rules and actions and also prepare using them
        save_master_config(settings, new_state)
        rule_packs = new_state.get("rule_packs", [])
        event_server.compile_rules(new_state["rules"], rule_packs)
        config["actions"] = new_state["actions"]
    # Update to the masters' event state
    event_status.unpack_status(new_state["status"])
def save_master_config(settings, new_state):
    # Atomically persist the master's rules/actions for use after a restart:
    # write to <file>.new, then rename. NOTE(review): the repr(...) framing of
    # the written dict literal is elided in this view.
    path = settings.paths.master_config_file.value
    path_new = path.parent / (path.name + '.new')
    path_new.write_bytes(
        "rules": new_state["rules"],
        "rule_packs": new_state["rule_packs"],
        "actions": new_state["actions"],
    path_new.rename(path)


def load_master_config(settings, config, logger):
    # Restore the last saved master configuration, if any.
    # NOTE(review): the try/except framing (the error log below is presumably
    # its except-branch) is elided in this view.
    path = settings.paths.master_config_file.value
    config = ast.literal_eval(path.read_bytes())
    config["rules"] = config["rules"]
    config["rule_packs"] = config.get("rule_packs", [])
    config["actions"] = config["actions"]
    logger.info("Replication: restored %d rule packs and %d actions from %s" % (len(
        config["rule_packs"]), len(config["actions"]), path))
    if is_replication_slave(config):
        logger.error("Replication: no previously saved master state available")


def get_state_from_master(config, slave_status):
    # Connect to the master and fetch its current state via REPLICATE.
    # NOTE(review): the try-framing, the sendall() call framing and the
    # receive loop's setup/exit are elided in this view.
    repl_settings = config["replication"]
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(repl_settings["connect_timeout"])
    sock.connect(repl_settings["master"])
    "REPLICATE %d\n" % (slave_status["last_sync"] and slave_status["last_sync"] or 0))
    sock.shutdown(socket.SHUT_WR)
    chunk = sock.recv(8192)
    response_text += chunk
    return ast.literal_eval(response_text)
    except SyntaxError as e:
        raise Exception("Invalid response from event daemon: <pre>%s</pre>" % response_text)
    except IOError as e:
        raise Exception("Master not responding: %s" % e)
    except Exception as e:
        raise Exception("Cannot connect to event daemon: %s" % e)
def save_slave_status(settings, slave_status):
    """Persist the replication slave status as a repr()-formatted line."""
    serialized = repr(slave_status) + "\n"
    settings.paths.slave_status_file.value.write_bytes(serialized)
def default_slave_status_master():
    # Default slave status for a site acting as replication master.
    # NOTE(review): the surrounding dict literal lines (return/braces and the
    # other keys) are elided in this view.
    "last_master_down": None,
    "average_sync_time": None,


def default_slave_status_sync():
    # Default slave status for a freshly configured sync slave.
    # NOTE(review): the surrounding dict literal lines are elided in this view.
    "last_master_down": None,
    "average_sync_time": None,


def update_slave_status(slave_status, settings, config):
    # Bring *slave_status* in line with the currently configured replication
    # role. NOTE(review): the branch headers choosing between reading an
    # existing status file and installing the defaults are elided in this
    # view.
    path = settings.paths.slave_status_file.value
    if is_replication_slave(config):
        slave_status.update(ast.literal_eval(path.read_bytes()))
        slave_status.update(default_slave_status_sync())
        save_slave_status(settings, slave_status)
        slave_status.update(default_slave_status_master())
3996 # .--Configuration-------------------------------------------------------.
3998 # | / ___|___ _ __ / _(_) __ _ _ _ _ __ __ _| |_(_) ___ _ __ |
3999 # | | | / _ \| '_ \| |_| |/ _` | | | | '__/ _` | __| |/ _ \| '_ \ |
4000 # | | |__| (_) | | | | _| | (_| | |_| | | | (_| | |_| | (_) | | | | |
4001 # | \____\___/|_| |_|_| |_|\__, |\__,_|_| \__,_|\__|_|\___/|_| |_| |
4003 # +----------------------------------------------------------------------+
4004 # | Loading of the configuration files |
4005 # '----------------------------------------------------------------------'
def load_configuration(settings, logger, slave_status):
    """Load the event daemon configuration and apply it.

    Sets logger levels from the configuration (unless overridden on the
    command line), refreshes the replication slave status, pulls in the
    saved master configuration when running as a slave, and builds the
    id -> action lookup dictionary. Returns the config dictionary.
    """
    config = cmk.ec.export.load_config(settings)

    # If not set by command line, set the log level by configuration
    if settings.options.verbosity == 0:
        levels = config["log_level"]
        logger.setLevel(levels["cmk.mkeventd"])
        for child_name, level_key in (
            ("EventServer", "cmk.mkeventd.EventServer"),
            ("EventStatus", "cmk.mkeventd.EventStatus"),
            ("StatusServer", "cmk.mkeventd.StatusServer"),
            ("lock", "cmk.mkeventd.lock"),
        ):
            logger.getChild(child_name).setLevel(levels[level_key])
        # The SNMP child level is optional in older configurations.
        if "cmk.mkeventd.EventServer.snmp" in levels:
            logger.getChild("EventServer.snmp").setLevel(levels["cmk.mkeventd.EventServer.snmp"])

    # Are we a replication slave? Parts of the configuration
    # will be overridden by values from the master.
    update_slave_status(slave_status, settings, config)
    if is_replication_slave(config):
        logger.info("Replication: slave configuration, current mode: %s" % slave_status["mode"])
        load_master_config(settings, config, logger)

    # Create dictionary for actions for easy access
    config["action"] = dict((action["id"], action) for action in config["actions"])

    config["last_reload"] = time.time()

    return config
def reload_configuration(settings, logger, lock_configuration, history, event_status, event_server,
                         status_server, slave_status):
    """Re-read the configuration and push it to all daemon components.

    History and event server are updated while holding the configuration
    lock; event status and status server are updated afterwards.
    """
    with lock_configuration:
        new_config = load_configuration(settings, logger, slave_status)
        for component in (history, event_server):
            component.reload_configuration(new_config)

    event_status.reload_configuration(new_config)
    status_server.reload_configuration(new_config)
    logger.info("Reloaded configuration.")
4052 # .--Main----------------------------------------------------------------.
4054 # | | \/ | __ _(_)_ __ |
4055 # | | |\/| |/ _` | | '_ \ |
4056 # | | | | | (_| | | | | | |
4057 # | |_| |_|\__,_|_|_| |_| |
4059 # +----------------------------------------------------------------------+
4060 # | Main entry and option parsing |
4061 # '----------------------------------------------------------------------'
4066 logger
= cmk
.utils
.log
.get_logger("mkeventd")
4067 settings
= cmk
.ec
.settings
.settings(cmk
.__version
__, pathlib
.Path(cmk
.utils
.paths
.omd_root
),
4068 pathlib
.Path(cmk
.utils
.paths
.default_config_dir
), sys
.argv
)
4072 cmk
.utils
.log
.open_log(sys
.stderr
)
4073 cmk
.utils
.log
.set_verbosity(settings
.options
.verbosity
)
4075 settings
.paths
.log_file
.value
.parent
.mkdir(parents
=True, exist_ok
=True)
4076 if not settings
.options
.foreground
:
4077 cmk
.utils
.log
.open_log(str(settings
.paths
.log_file
.value
))
4079 logger
.info("-" * 65)
4080 logger
.info("mkeventd version %s starting" % cmk
.__version
__)
4082 slave_status
= default_slave_status_master()
4083 config
= load_configuration(settings
, logger
, slave_status
)
4084 history
= cmk
.ec
.history
.History(settings
, config
, logger
, StatusTableEvents
.columns
,
4085 StatusTableHistory
.columns
)
4087 pid_path
= settings
.paths
.pid_file
.value
4088 if pid_path
.exists():
4089 old_pid
= int(pid_path
.read_text(encoding
='utf-8'))
4090 if process_exists(old_pid
):
4092 logger
, "Old PID file %s still existing and mkeventd still running with PID %d."
4093 % (pid_path
, old_pid
))
4095 logger
.info("Removed orphaned PID file %s (process %d not running anymore)." %
4096 (pid_path
, old_pid
))
4098 # Make sure paths exist
4099 settings
.paths
.event_pipe
.value
.parent
.mkdir(parents
=True, exist_ok
=True)
4100 settings
.paths
.status_file
.value
.parent
.mkdir(parents
=True, exist_ok
=True)
4102 # First do all things that might fail, before daemonizing
4103 perfcounters
= Perfcounters(logger
.getChild("lock.perfcounters"))
4104 event_status
= EventStatus(settings
, config
, perfcounters
, history
,
4105 logger
.getChild("EventStatus"))
4106 lock_configuration
= ECLock(logger
.getChild("lock.configuration"))
4107 event_server
= EventServer(
4108 logger
.getChild("EventServer"), settings
, config
, slave_status
, perfcounters
,
4109 lock_configuration
, history
, event_status
, StatusTableEvents
.columns
)
4110 terminate_main_event
= threading
.Event()
4111 status_server
= StatusServer(
4112 logger
.getChild("StatusServer"), settings
, config
, slave_status
, perfcounters
,
4113 lock_configuration
, history
, event_status
, event_server
, terminate_main_event
)
4115 event_status
.load_status(event_server
)
4116 event_server
.compile_rules(config
["rules"], config
["rule_packs"])
4118 if not settings
.options
.foreground
:
4119 pid_path
.parent
.mkdir(parents
=True, exist_ok
=True)
4120 cmk
.utils
.daemon
.daemonize()
4121 logger
.info("Daemonized with PID %d." % os
.getpid())
4123 cmk
.utils
.daemon
.lock_with_pid_file(str(pid_path
))
4125 # Install signal hander
4126 def signal_handler(signum
, stack_frame
):
4127 logger
.verbose("Got signal %d." % signum
)
4128 raise MKSignalException(signum
)
4130 signal
.signal(signal
.SIGHUP
, signal_handler
)
4131 signal
.signal(signal
.SIGINT
, signal_handler
)
4132 signal
.signal(signal
.SIGQUIT
, signal_handler
)
4133 signal
.signal(signal
.SIGTERM
, signal_handler
)
4136 run_eventd(terminate_main_event
, settings
, config
, lock_configuration
, history
,
4137 perfcounters
, event_status
, event_server
, status_server
, slave_status
, logger
)
4139 # We reach this point, if the server has been killed by
4140 # a signal or hitting Ctrl-C (in foreground mode)
4142 # TODO: Move this cleanup stuff to the classes that are responsible for these resources
4144 # Remove event pipe and drain it, so that we make sure
4145 # that processes (syslog, etc) will not hang when trying
4146 # to write into the pipe.
4147 logger
.verbose("Cleaning up event pipe")
4148 pipe
= event_server
.open_pipe() # Open it
4149 settings
.paths
.event_pipe
.value
.unlink() # Remove pipe
4150 drain_pipe(pipe
) # Drain any data
4151 os
.close(pipe
) # Close pipe
4153 logger
.verbose("Saving final event state")
4154 event_status
.save_status()
4156 logger
.verbose("Cleaning up sockets")
4157 settings
.paths
.unix_socket
.value
.unlink()
4158 settings
.paths
.event_socket
.value
.unlink()
4160 logger
.verbose("Output hash stats")
4161 event_server
.output_hash_stats()
4163 logger
.verbose("Closing fds which might be still open")
4165 settings
.options
.syslog_udp
, settings
.options
.syslog_tcp
,
4166 settings
.options
.snmptrap_udp
4169 if isinstance(fd
, cmk
.ec
.settings
.FileDescriptor
):
4174 logger
.info("Successfully shut down.")
4178 if settings
.options
.debug
:
4180 bail_out(logger
, traceback
.format_exc())
4183 if pid_path
and cmk
.utils
.store
.have_lock(str(pid_path
)):
4190 if __name__
== "__main__":