Refactoring: Changed remaining check parameters starting with an 's' to the new rules...
[check_mk.git] / cmk / ec / main.py
1 #!/usr/bin/env python
2 # -*- encoding: utf-8; py-indent-offset: 4 -*-
3 # +------------------------------------------------------------------+
4 # | ____ _ _ __ __ _ __ |
5 # | / ___| |__ ___ ___| | __ | \/ | |/ / |
6 # | | | | '_ \ / _ \/ __| |/ / | |\/| | ' / |
7 # | | |___| | | | __/ (__| < | | | | . \ |
8 # | \____|_| |_|\___|\___|_|\_\___|_| |_|_|\_\ |
9 # | |
10 # | Copyright Mathias Kettner 2014 mk@mathias-kettner.de |
11 # +------------------------------------------------------------------+
13 # This file is part of Check_MK.
14 # The official homepage is at http://mathias-kettner.de/check_mk.
16 # check_mk is free software; you can redistribute it and/or modify it
17 # under the terms of the GNU General Public License as published by
18 # the Free Software Foundation in version 2. check_mk is distributed
19 # in the hope that it will be useful, but WITHOUT ANY WARRANTY; with-
20 # out even the implied warranty of MERCHANTABILITY or FITNESS FOR A
21 # PARTICULAR PURPOSE. See the GNU General Public License for more de-
22 # tails. You should have received a copy of the GNU General Public
23 # License along with GNU Make; see the file COPYING. If not, write
24 # to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
25 # Boston, MA 02110-1301 USA.
27 # TODO: Refactor/document locking. It is not clear when and how to apply
28 # locks or when they are held by which component.
30 # TODO: Refactor events to be handled as objects, e.g. in case when
31 # creating objects. Or at least update the documentation. It is not clear
32 # which fields are mandatory for the events.
34 import abc
35 import ast
36 import errno
37 import json
38 import os
39 import pprint
40 import re
41 import select
42 import signal
43 import socket
44 import sys
45 import threading
46 import time
47 import traceback
48 from typing import Any, Dict, List, Optional, Tuple, Union # pylint: disable=unused-import
50 import pathlib2 as pathlib
51 import six
53 import cmk
54 import cmk.utils.daemon
55 import cmk.utils.defines
56 import cmk.ec.actions
57 import cmk.ec.export
58 import cmk.ec.history
59 import cmk.ec.settings
60 import cmk.ec.snmp
61 import cmk.utils.log
62 import cmk.utils.paths
63 import cmk.utils.profile
64 import cmk.utils.render
65 import cmk.utils.regex
66 import cmk.utils.debug
68 # suppress "Cannot find module" error from mypy
69 import livestatus # type: ignore
72 class SyslogPriority(object):
73 NAMES = {
74 0: "emerg",
75 1: "alert",
76 2: "crit",
77 3: "err",
78 4: "warning",
79 5: "notice",
80 6: "info",
81 7: "debug",
84 def __init__(self, value):
85 super(SyslogPriority, self).__init__()
86 self.value = int(value)
88 def __repr__(self):
89 return "SyslogPriority(%d)" % self.value
91 def __str__(self):
92 try:
93 return self.NAMES[self.value]
94 except KeyError:
95 return "(unknown priority %d)" % self.value
98 class SyslogFacility(object):
99 NAMES = {
100 0: 'kern',
101 1: 'user',
102 2: 'mail',
103 3: 'daemon',
104 4: 'auth',
105 5: 'syslog',
106 6: 'lpr',
107 7: 'news',
108 8: 'uucp',
109 9: 'cron',
110 10: 'authpriv',
111 11: 'ftp',
112 12: "ntp",
113 13: "logaudit",
114 14: "logalert",
115 15: "clock",
116 16: 'local0',
117 17: 'local1',
118 18: 'local2',
119 19: 'local3',
120 20: 'local4',
121 21: 'local5',
122 22: 'local6',
123 23: 'local7',
124 31: 'snmptrap', # HACK!
125 }
127 def __init__(self, value):
128 super(SyslogFacility, self).__init__()
129 self.value = int(value)
131 def __repr__(self):
132 return "SyslogFacility(%d)" % self.value
134 def __str__(self):
135 try:
136 return self.NAMES[self.value]
137 except KeyError:
138 return "(unknown facility %d)" % self.value
141 # Alas, we often have no clue about the actual encoding, so we have to guess:
142 # Initially we assume UTF-8, but fall back to latin-1 if that doesn't work.
143 def decode_from_bytes(string_as_bytes):
144 # This is just a safeguard if we are inadvertently called with a Unicode
145 # string. In theory this should never happen, but given the typing chaos in
146 # this script, one never knows. In the Unicode case, Python tries to be
147 # "helpful", but this fails miserably: Calling 'decode' on a Unicode string
148 # implicitly converts it via 'encode("ascii")' to a byte string first, but
149 # this can of course fail and doesn't make sense at all when we immediately
150 # call 'decode' on this byte string again. In a nutshell: The implicit
151 # conversions between str and unicode are a fundamentally broken idea, just
152 # like all implicit things and "helpful" ideas in general. :-P For further
153 # info see e.g. http://nedbatchelder.com/text/unipain.html
154 if isinstance(string_as_bytes, unicode):
155 return string_as_bytes
157 try:
158 return string_as_bytes.decode("utf-8")
159 except Exception:
160 return string_as_bytes.decode("latin-1")
163 def scrub_and_decode(s):
164 return decode_from_bytes(cmk.ec.history.scrub_string(s))
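# Illustration of the decoding fallback above (sketch): a byte string that is
# not valid UTF-8 is decoded as latin-1 instead, so nothing ever raises here.
# >>> decode_from_bytes(b'caf\xc3\xa9') # valid UTF-8
# u'caf\xe9'
# >>> decode_from_bytes(b'caf\xe9') # invalid UTF-8 -> latin-1 fallback
# u'caf\xe9'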
168 # .--Helper functions----------------------------------------------------.
169 # | _ _ _ |
170 # | | | | | ___| |_ __ ___ _ __ ___ |
171 # | | |_| |/ _ \ | '_ \ / _ \ '__/ __| |
172 # | | _ | __/ | |_) | __/ | \__ \ |
173 # | |_| |_|\___|_| .__/ \___|_| |___/ |
174 # | |_| |
175 # +----------------------------------------------------------------------+
176 # | Various helper functions |
177 # '----------------------------------------------------------------------'
180 class ECLock(object):
181 def __init__(self, logger):
182 super(ECLock, self).__init__()
183 self._logger = logger
184 self._lock = threading.Lock()
186 def acquire(self, blocking=True):
187 self._logger.debug("[%s] Trying to acquire lock", threading.current_thread().name)
189 ret = self._lock.acquire(blocking)
190 if ret is True:
191 self._logger.debug("[%s] Acquired lock", threading.current_thread().name)
192 else:
193 self._logger.debug("[%s] Non-blocking aquire failed", threading.current_thread().name)
195 return ret
197 def release(self):
198 self._logger.debug("[%s] Releasing lock", threading.current_thread().name)
199 self._lock.release()
201 def __enter__(self):
202 self.acquire()
204 def __exit__(self, exc_type, exc_val, exc_tb):
205 self.release()
206 return False # Do not swallow exceptions
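# Usage sketch: ECLock is intended to be used as a context manager, e.g.
#
#     lock = ECLock(logger)
#     with lock:
#         ... # mutate shared state
#
# Note that __enter__ does not return anything, so "with lock as l:" would
# bind l to None.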
209 class ECServerThread(threading.Thread):
210 @abc.abstractmethod
211 def serve(self):
212 raise NotImplementedError()
214 def __init__(self, name, logger, settings, config, slave_status, profiling_enabled,
215 profile_file):
216 super(ECServerThread, self).__init__(name=name)
217 self.settings = settings
218 self._config = config
219 self._slave_status = slave_status
220 self._profiling_enabled = profiling_enabled
221 self._profile_file = profile_file
222 self._terminate_event = threading.Event()
223 self._logger = logger
225 def run(self):
226 self._logger.info("Starting up")
228 while not self._terminate_event.is_set():
229 try:
230 with cmk.utils.profile.Profile(
231 enabled=self._profiling_enabled, profile_file=str(self._profile_file)):
232 self.serve()
233 except Exception:
234 self._logger.exception("Exception in %s server" % self.name)
236 if self.settings.options.debug:
236 raise
237 time.sleep(1)
239 self._logger.info("Terminated")
241 def terminate(self):
242 self._terminate_event.set()
245 def terminate(terminate_main_event, event_server, status_server):
246 terminate_main_event.set()
247 status_server.terminate()
248 event_server.terminate()
251 def bail_out(logger, reason):
252 logger.error("FATAL ERROR: %s" % reason)
253 sys.exit(1)
256 def process_exists(pid):
257 try:
258 os.kill(pid, 0)
259 return True
260 except Exception:
261 return False
264 def drain_pipe(pipe):
265 while True:
266 try:
267 readable = select.select([pipe], [], [], 0.1)[0]
268 except select.error as e:
269 if e[0] == errno.EINTR:
270 continue
271 raise
273 data = None
274 if pipe in readable:
275 try:
276 data = os.read(pipe, 4096)
277 if not data: # EOF
278 break
279 except Exception:
280 break # Error while reading
281 else:
282 break # No data available
285 def match(pattern, text, complete=True):
286 """Performs an EC style matching test of pattern on text
288 Returns False in case of no match or a tuple with the match groups.
289 In case no match group is produced, it returns an empty tuple."""
290 if pattern is None:
291 return ()
293 elif isinstance(pattern, six.string_types):
294 if complete:
295 return () if pattern == text.lower() else False
296 return () if pattern in text.lower() else False
298 # Assume compiled regex
299 m = pattern.search(text)
300 if m:
301 groups = m.groups()
302 if None in groups:
303 # Remove None from result tuples and replace it with empty strings
304 return tuple([g if g is not None else '' for g in groups])
305 return groups
307 return False
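# Doctest-style examples for the three pattern kinds handled above (sketch):
# >>> match(None, "anything") # no pattern: always matches
# ()
# >>> match("error", "ERROR: disk full", complete=False) # infix, case-insensitive
# ()
# >>> match("error", "ERROR: disk full", complete=True) # whole text must be equal
# False
# >>> match(re.compile("err(or)?", re.IGNORECASE), "ERROR: disk full")
# ('OR',)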
310 def format_pattern(pat):
311 try:
312 return pat.pattern
313 except Exception:
314 return pat
317 # Sorry: this code is duplicated in web/plugins/wato/mkeventd.py
318 def match_ipv4_network(pattern, ipaddress_text):
319 network, network_bits = parse_ipv4_network(pattern) # is validated by valuespec
320 if network_bits == 0:
321 return True # even if ipaddress is empty
322 try:
323 ipaddress = parse_ipv4_address(ipaddress_text)
324 except Exception:
325 return False # invalid address never matches
327 # first network_bits of network and ipaddress must be
328 # identical. Create a bitmask.
329 bitmask = 0
330 for n in xrange(32):
331 bitmask = bitmask << 1
332 if n < network_bits:
333 bit = 1
334 else:
335 bit = 0
336 bitmask += bit
338 return (network & bitmask) == (ipaddress & bitmask)
341 def parse_ipv4_address(text):
342 parts = map(int, text.split("."))
343 return (parts[0] << 24) + (parts[1] << 16) + (parts[2] << 8) + parts[3]
346 def parse_ipv4_network(text):
347 if "/" not in text:
348 return parse_ipv4_address(text), 32
350 network_text, bits_text = text.split("/")
351 return parse_ipv4_address(network_text), int(bits_text)
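# Worked example for the helpers above (sketch): "10.0.0.0/8" yields
# network_bits = 8, hence a bitmask of 0xFF000000, so only the first
# octet is compared.
# >>> match_ipv4_network("10.0.0.0/8", "10.1.2.3")
# True
# >>> match_ipv4_network("10.0.0.0/8", "192.168.1.1")
# False
# >>> match_ipv4_network("0.0.0.0/0", "") # /0 matches anything, even ""
# True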
354 def replace_groups(text, origtext, match_groups):
355 # Replace \0 with the text itself. This allows adding information
356 # in front of or at the end of a message.
357 text = text.replace("\\0", origtext)
359 # Generic replacement with \1, \2, ...
360 match_groups_message = match_groups.get("match_groups_message")
361 if isinstance(match_groups_message, tuple):
362 for nr, g in enumerate(match_groups_message):
363 text = text.replace("\\%d" % (nr + 1), g)
365 # Replacement with keyword
366 # Right now we have
367 # $MATCH_GROUPS_MESSAGE_x$
368 # $MATCH_GROUPS_SYSLOG_APPLICATION_x$
369 for key_prefix, values in match_groups.iteritems():
370 if not isinstance(values, tuple):
371 continue
373 for idx, match_value in enumerate(values):
374 text = text.replace("$%s_%d$" % (key_prefix.upper(), idx + 1), match_value)
376 return text
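# Illustration (sketch): with one message match group ("eth0",), the
# positional \1 and the \0 replacements apply as follows:
# >>> replace_groups("Interface \\1 down (\\0)", "link down on eth0",
# ...                {"match_groups_message": ("eth0",)})
# 'Interface eth0 down (link down on eth0)'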
379 class MKSignalException(Exception):
380 def __init__(self, signum):
381 Exception.__init__(self, "Got signal %d" % signum)
382 self._signum = signum
385 class MKClientError(Exception):
386 def __init__(self, t):
387 Exception.__init__(self, t)
391 # .--Timeperiods---------------------------------------------------------.
392 # | _____ _ _ _ |
393 # | |_ _(_)_ __ ___ ___ _ __ ___ _ __(_) ___ __| |___ |
394 # | | | | | '_ ` _ \ / _ \ '_ \ / _ \ '__| |/ _ \ / _` / __| |
395 # | | | | | | | | | | __/ |_) | __/ | | | (_) | (_| \__ \ |
396 # | |_| |_|_| |_| |_|\___| .__/ \___|_| |_|\___/ \__,_|___/ |
397 # | |_| |
398 # +----------------------------------------------------------------------+
399 # | Timeperiods are used in rule conditions |
400 # '----------------------------------------------------------------------'
403 class TimePeriods(object):
404 def __init__(self, logger):
405 super(TimePeriods, self).__init__()
406 self._logger = logger
407 self._periods = None
408 self._last_update = 0
410 def _update(self):
411 if self._periods is not None and int(time.time()) / 60 == self._last_update:
412 return # only update once a minute
413 try:
414 table = livestatus.LocalConnection().query("GET timeperiods\nColumns: name alias in")
415 periods = {}
416 for tpname, alias, isin in table:
417 periods[tpname] = (alias, bool(isin))
418 self._periods = periods
419 self._last_update = int(time.time()) / 60
420 except Exception as e:
421 self._logger.exception("Cannot update timeperiod information: %s" % e)
422 raise
424 def check(self, tpname):
425 self._update()
426 if not self._periods:
427 self._logger.warning("no timeperiod information, assuming %s is active" % tpname)
428 return True
429 if tpname not in self._periods:
430 self._logger.warning("no such timeperiod %s, assuming it is active" % tpname)
431 return True
432 return self._periods[tpname][1]
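# Usage sketch (requires a running monitoring core reachable via livestatus;
# the timeperiod name "workhours" is purely hypothetical):
#
#     time_periods = TimePeriods(logger)
#     if time_periods.check("workhours"):
#         ... # rule condition is satisfied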
436 # .--Host config---------------------------------------------------------.
437 # | _ _ _ __ _ |
438 # | | | | | ___ ___| |_ ___ ___ _ __ / _(_) __ _ |
439 # | | |_| |/ _ \/ __| __| / __/ _ \| '_ \| |_| |/ _` | |
440 # | | _ | (_) \__ \ |_ | (_| (_) | | | | _| | (_| | |
441 # | |_| |_|\___/|___/\__| \___\___/|_| |_|_| |_|\__, | |
442 # | |___/ |
443 # +----------------------------------------------------------------------+
444 # | Manages the configuration of the hosts of the local monitoring core. |
445 # | It fetches and caches the information during runtime of the EC. |
446 # '----------------------------------------------------------------------'
449 class HostConfig(object):
450 def __init__(self, logger):
451 self._logger = logger
452 self.initialize()
454 def initialize(self):
455 self._logger.debug("Initializing host config")
456 self._event_host_to_host = {}
458 self._hosts_by_name = {}
459 self._hosts_by_lower_name = {}
460 self._hosts_by_lower_alias = {}
461 self._hosts_by_lower_address = {}
463 self._got_config_from_core = False
465 def get(self, host_name, deflt=None):
466 return self._hosts_by_name.get(host_name, deflt)
468 def get_by_event_host_name(self, event_host_name, deflt=None):
469 try:
470 self._update_from_core()
471 except Exception:
472 self._logger.exception("Failed to get host info from core. Try again later.")
473 return
475 try:
476 return self._event_host_to_host[event_host_name]
477 except KeyError:
478 pass # Not cached yet
480 # Note: It is important that we use exactly the same algorithm here as in the core
481 # (enterprise/core/src/World.cc getHostByDesignation)
483 # Host name : Case insensitive equality (host_name =~ %s)
484 # Host alias : Case insensitive equality (host_alias =~ %s)
485 # Host address : Case insensitive equality (host_address =~ %s)
486 low_event_host_name = event_host_name.lower()
488 host = deflt
489 for search_map in [
490 self._hosts_by_lower_name, self._hosts_by_lower_address, self._hosts_by_lower_alias
491 ]:
492 try:
493 host = search_map[low_event_host_name]
494 break
495 except KeyError:
496 continue
498 self._event_host_to_host[event_host_name] = host
499 return host
501 def _update_from_core(self):
502 if not self._has_core_config_reloaded():
503 return
505 self.initialize()
506 self._logger.debug("Fetching host config from core")
508 columns = [
509 "name",
510 "alias",
511 "address",
512 "custom_variables",
513 "contacts",
514 "contact_groups",
517 query = "GET hosts\nColumns: %s" % " ".join(columns)
518 for host in livestatus.LocalConnection().query_table_assoc(query):
519 self._hosts_by_name[host["name"]] = host
521 # Lookup maps to improve performance of host searches
522 self._hosts_by_lower_name[host["name"].lower()] = host
523 self._hosts_by_lower_alias[host["alias"].lower()] = host
524 self._hosts_by_lower_address[host["alias"].lower()] = host
526 self._logger.debug("Got %d hosts from core" % len(self._hosts_by_name))
527 self._got_config_from_core = self._get_core_start_time()
529 def _has_core_config_reloaded(self):
530 if not self._got_config_from_core:
531 return True
533 if self._get_core_start_time() > self._got_config_from_core:
534 return True
536 return False
538 def _get_core_start_time(self):
539 query = ("GET status\n" "Columns: program_start\n")
540 return livestatus.LocalConnection().query_value(query)
544 # .--Perfcounters--------------------------------------------------------.
545 # | ____ __ _ |
546 # | | _ \ ___ _ __ / _| ___ ___ _ _ _ __ | |_ ___ _ __ ___ |
547 # | | |_) / _ \ '__| |_ / __/ _ \| | | | '_ \| __/ _ \ '__/ __| |
548 # | | __/ __/ | | _| (_| (_) | |_| | | | | || __/ | \__ \ |
549 # | |_| \___|_| |_| \___\___/ \__,_|_| |_|\__\___|_| |___/ |
550 # | |
551 # +----------------------------------------------------------------------+
552 # | Helper class for performance counting |
553 # '----------------------------------------------------------------------'
556 def lerp(a, b, t):
557 """Linear interpolation between a and b with weight t"""
558 return (1 - t) * a + t * b
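# lerp() is used below for exponential smoothing: with weight w the update
# new_avg = lerp(sample, old_avg, w) keeps w of the old average, e.g.:
# >>> lerp(10.0, 0.0, 0.9) # 10% of the new sample, 90% of the old average
# 1.0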
561 class Perfcounters(object):
562 _counter_names = [
563 "messages",
564 "rule_tries",
565 "rule_hits",
566 "drops",
567 "overflows",
568 "events",
569 "connects",
572 # Average processing times
573 _weights = {
574 "processing": 0.99, # event processing
575 "sync": 0.95, # Replication sync
576 "request": 0.95, # Client requests
579 # TODO: Why aren't self._times / self._rates / ... initialized with their defaults?
580 def __init__(self, logger):
581 super(Perfcounters, self).__init__()
582 self._lock = ECLock(logger)
584 # Initialize counters
585 self._counters = dict([(n, 0) for n in self._counter_names])
587 self._old_counters = {}
588 self._rates = {}
589 self._average_rates = {}
590 self._times = {}
591 self._last_statistics = None
593 self._logger = logger.getChild("Perfcounters")
595 def count(self, counter):
596 with self._lock:
597 self._counters[counter] += 1
599 def count_time(self, counter, ptime):
600 with self._lock:
601 if counter in self._times:
602 self._times[counter] = lerp(ptime, self._times[counter], self._weights[counter])
603 else:
604 self._times[counter] = ptime
606 def do_statistics(self):
607 with self._lock:
608 now = time.time()
609 if self._last_statistics:
610 duration = now - self._last_statistics
611 else:
612 duration = 0
613 for name, value in self._counters.iteritems():
614 if duration:
615 delta = value - self._old_counters[name]
616 rate = delta / duration
617 self._rates[name] = rate
618 if name in self._average_rates:
619 # We could make the weight configurable
620 self._average_rates[name] = lerp(rate, self._average_rates[name], 0.9)
621 else:
622 self._average_rates[name] = rate
624 self._last_statistics = now
625 self._old_counters = self._counters.copy()
627 @classmethod
628 def status_columns(cls):
629 columns = []
630 # Please note: status_columns() and get_status() need to produce lists with exact same column order
631 for name in cls._counter_names:
632 columns.append(("status_" + name, 0))
633 columns.append(("status_" + name.rstrip("s") + "_rate", 0.0))
634 columns.append(("status_average_" + name.rstrip("s") + "_rate", 0.0))
636 for name in cls._weights:
637 columns.append(("status_average_%s_time" % name, 0.0))
639 return columns
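# Example of the generated column names (sketch): the counter "messages"
# yields "status_messages", "status_message_rate" and
# "status_average_message_rate", in exactly this order.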
641 def get_status(self):
642 with self._lock:
643 row = []
644 # Please note: status_columns() and get_status() need to produce lists with exact same column order
645 for name in self._counter_names:
646 row.append(self._counters[name])
647 row.append(self._rates.get(name, 0.0))
648 row.append(self._average_rates.get(name, 0.0))
650 for name in self._weights:
651 row.append(self._times.get(name, 0.0))
653 return row
657 # .--EventServer---------------------------------------------------------.
658 # | _____ _ ____ |
659 # | | ____|_ _____ _ __ | |_/ ___| ___ _ ____ _____ _ __ |
660 # | | _| \ \ / / _ \ '_ \| __\___ \ / _ \ '__\ \ / / _ \ '__| |
661 # | | |___ \ V / __/ | | | |_ ___) | __/ | \ V / __/ | |
662 # | |_____| \_/ \___|_| |_|\__|____/ \___|_| \_/ \___|_| |
663 # | |
664 # +----------------------------------------------------------------------+
665 # | Processing and classification of incoming events. |
666 # '----------------------------------------------------------------------'
669 class EventServer(ECServerThread):
670 month_names = {
671 "Jan": 1,
672 "Feb": 2,
673 "Mar": 3,
674 "Apr": 4,
675 "May": 5,
676 "Jun": 6,
677 "Jul": 7,
678 "Aug": 8,
679 "Sep": 9,
680 "Oct": 10,
681 "Nov": 11,
682 "Dec": 12,
685 def __init__(self, logger, settings, config, slave_status, perfcounters, lock_configuration,
686 history, event_status, event_columns):
687 super(EventServer, self).__init__(
688 name="EventServer",
689 logger=logger,
690 settings=settings,
691 config=config,
692 slave_status=slave_status,
693 profiling_enabled=settings.options.profile_event,
694 profile_file=settings.paths.event_server_profile.value)
695 self._syslog = None
696 self._syslog_tcp = None
697 self._snmptrap = None
699 self._rules = []
700 self._hash_stats = []
701 for _unused_facility in xrange(32):
702 self._hash_stats.append([0] * 8)
704 self.host_config = HostConfig(self._logger)
705 self._perfcounters = perfcounters
706 self._lock_configuration = lock_configuration
707 self._history = history
708 self._event_status = event_status
709 self._event_columns = event_columns
710 self._message_period = cmk.ec.history.ActiveHistoryPeriod()
711 self._rule_matcher = RuleMatcher(self._logger, config)
713 self.create_pipe()
714 self.open_eventsocket()
715 self.open_syslog()
716 self.open_syslog_tcp()
717 self.open_snmptrap()
718 self._snmp_trap_engine = cmk.ec.snmp.SNMPTrapEngine(self.settings, self._config,
719 self._logger.getChild("snmp"),
720 self.handle_snmptrap)
722 @classmethod
723 def status_columns(cls):
724 columns = cls._general_columns()
725 columns += Perfcounters.status_columns()
726 columns += cls._replication_columns()
727 columns += cls._event_limit_columns()
728 return columns
730 @classmethod
731 def _general_columns(cls):
732 return [
733 ("status_config_load_time", 0),
734 ("status_num_open_events", 0),
735 ("status_virtual_memory_size", 0),
738 @classmethod
739 def _replication_columns(cls):
740 return [
741 ("status_replication_slavemode", ""),
742 ("status_replication_last_sync", 0.0),
743 ("status_replication_success", False),
746 @classmethod
747 def _event_limit_columns(cls):
748 return [
749 ("status_event_limit_host", 0),
750 ("status_event_limit_rule", 0),
751 ("status_event_limit_overall", 0),
752 ("status_event_limit_active_hosts", []),
753 ("status_event_limit_active_rules", []),
754 ("status_event_limit_active_overall", False),
757 def get_status(self):
758 row = []
760 row += self._add_general_status()
761 row += self._perfcounters.get_status()
762 row += self._add_replication_status()
763 row += self._add_event_limit_status()
765 return [row]
767 def _add_general_status(self):
768 return [
769 self._config["last_reload"],
770 self._event_status.num_existing_events,
771 self._virtual_memory_size(),
772 ]
774 def _virtual_memory_size(self):
775 parts = open('/proc/self/stat').read().split()
776 return int(parts[22]) # in Bytes
778 def _add_replication_status(self):
779 if is_replication_slave(self._config):
780 return [
781 self._slave_status["mode"],
782 self._slave_status["last_sync"],
783 self._slave_status["success"],
785 return ["master", 0.0, False]
787 def _add_event_limit_status(self):
788 return [
789 self._config["event_limit"]["by_host"]["limit"],
790 self._config["event_limit"]["by_rule"]["limit"],
791 self._config["event_limit"]["overall"]["limit"],
792 self.get_hosts_with_active_event_limit(),
793 self.get_rules_with_active_event_limit(),
794 self.is_overall_event_limit_active(),
795 ]
797 def create_pipe(self):
798 path = self.settings.paths.event_pipe.value
799 try:
800 if not path.is_fifo():
801 path.unlink()
802 except Exception:
803 pass
805 if not path.exists():
806 os.mkfifo(str(path))
808 # We want to be able to receive events from all users on the local system
809 path.chmod(0o666) # nosec
811 self._logger.info("Created FIFO '%s' for receiving events" % path)
813 def open_syslog(self):
814 endpoint = self.settings.options.syslog_udp
815 try:
816 if isinstance(endpoint, cmk.ec.settings.FileDescriptor):
817 self._syslog = socket.fromfd(endpoint.value, socket.AF_INET, socket.SOCK_DGRAM)
818 os.close(endpoint.value)
819 self._logger.info(
820 "Opened builtin syslog server on inherited filedescriptor %d" % endpoint.value)
821 if isinstance(endpoint, cmk.ec.settings.PortNumber):
822 self._syslog = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
823 self._syslog.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
824 self._syslog.bind(("0.0.0.0", endpoint.value))
825 self._logger.info("Opened builtin syslog server on UDP port %d" % endpoint.value)
826 except Exception as e:
827 raise Exception("Cannot start builtin syslog server: %s" % e)
829 def open_syslog_tcp(self):
830 endpoint = self.settings.options.syslog_tcp
831 try:
832 if isinstance(endpoint, cmk.ec.settings.FileDescriptor):
833 self._syslog_tcp = socket.fromfd(endpoint.value, socket.AF_INET, socket.SOCK_STREAM)
834 self._syslog_tcp.listen(20)
835 os.close(endpoint.value)
836 self._logger.info("Opened builtin syslog-tcp server on inherited filedescriptor %d"
837 % endpoint.value)
838 if isinstance(endpoint, cmk.ec.settings.PortNumber):
839 self._syslog_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
840 self._syslog_tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
841 self._syslog_tcp.bind(("0.0.0.0", endpoint.value))
842 self._syslog_tcp.listen(20)
843 self._logger.info(
844 "Opened builtin syslog-tcp server on TCP port %d" % endpoint.value)
845 except Exception as e:
846 raise Exception("Cannot start builtin syslog-tcp server: %s" % e)
848 def open_snmptrap(self):
849 endpoint = self.settings.options.snmptrap_udp
850 try:
851 if isinstance(endpoint, cmk.ec.settings.FileDescriptor):
852 self._snmptrap = socket.fromfd(endpoint.value, socket.AF_INET, socket.SOCK_DGRAM)
853 os.close(endpoint.value)
854 self._logger.info("Opened builtin snmptrap server on inherited filedescriptor %d" %
855 endpoint.value)
856 if isinstance(endpoint, cmk.ec.settings.PortNumber):
857 self._snmptrap = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
858 self._snmptrap.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
859 self._snmptrap.bind(("0.0.0.0", endpoint.value))
860 self._logger.info("Opened builtin snmptrap server on UDP port %d" % endpoint.value)
861 except Exception as e:
862 raise Exception("Cannot start builtin snmptrap server: %s" % e)
864 def open_eventsocket(self):
865 path = self.settings.paths.event_socket.value
866 if path.exists():
867 path.unlink()
868 path.parent.mkdir(parents=True, exist_ok=True)
869 self._eventsocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
870 self._eventsocket.bind(str(path))
871 path.chmod(0o664)
872 self._eventsocket.listen(self._config['eventsocket_queue_len'])
873 self._logger.info("Opened UNIX socket '%s' for receiving events" % path)
875 def open_pipe(self):
876 # Beware: we must open the pipe also for writing. Otherwise
877 # we will see EOF forever after one writer has finished and
878 # select() will trigger even if there is no data. A good article
879 # about this is here:
880 # http://www.outflux.net/blog/archives/2008/03/09/using-select-on-a-fifo/
881 return os.open(str(self.settings.paths.event_pipe.value), os.O_RDWR | os.O_NONBLOCK)
883 def handle_snmptrap(self, trap, ipaddress):
884 self.process_event(self._create_event_from_trap(trap, ipaddress))
886 def _create_event_from_trap(self, trap, ipaddress):
887 # use the trap-oid as application
888 application = u''
889 for index, (oid, _unused_val) in enumerate(trap):
890 if oid in ['1.3.6.1.6.3.1.1.4.1.0', 'SNMPv2-MIB::snmpTrapOID.0']:
891 application = scrub_and_decode(trap.pop(index)[1])
892 break
894 # Once we get here we have a fully parsed trap, which we now convert to an event
895 safe_ipaddress = scrub_and_decode(ipaddress)
896 text = scrub_and_decode(', '.join(['%s: %s' % (item[0], str(item[1])) for item in trap]))
898 event = {
899 'time': time.time(),
900 'host': safe_ipaddress,
901 'ipaddress': safe_ipaddress,
902 'priority': 5, # notice
903 'facility': 31, # not used by syslog -> we use this for all traps
904 'application': application,
905 'text': text,
906 'core_host': '',
907 'host_in_downtime': False,
908 }
910 return event
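# Illustration (sketch, hypothetical variable bindings): a trap like
# [('1.3.6.1.2.1.1.3.0', '123'), ('1.3.6.1.6.3.1.1.4.1.0', 'IF-MIB::linkDown')]
# becomes an event with application 'IF-MIB::linkDown' (the snmpTrapOID binding
# is popped) and text '1.3.6.1.2.1.1.3.0: 123', with host/ipaddress set to the
# scrubbed sender address.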
912 def serve(self):
913 pipe_fragment = ''
914 pipe = self.open_pipe()
915 listen_list = [pipe]
917 # Wait for incoming syslog packets via UDP
918 if self._syslog is not None:
919 listen_list.append(self._syslog.fileno())
921 # Wait for new connections for events via TCP socket
922 if self._syslog_tcp is not None:
923 listen_list.append(self._syslog_tcp)
925 # Wait for new connections for events via unix socket
926 if self._eventsocket:
927 listen_list.append(self._eventsocket)
929 # Wait for incoming SNMP traps
930 if self._snmptrap is not None:
931 listen_list.append(self._snmptrap.fileno())
933 # Keep list of client connections via UNIX socket and
934 # read data that is not yet processed. Map from
935 # fd to (fileobject, data)
936 client_sockets = {}
937 select_timeout = 1
938 while not self._terminate_event.is_set():
939 try:
940 readable = select.select(listen_list + client_sockets.keys(), [], [],
941 select_timeout)[0]
942 except select.error as e:
943 if e[0] == errno.EINTR:
944 continue
945 raise
946 data = None
948 # Accept new connection on event unix socket
949 if self._eventsocket in readable:
950 client_socket, address = self._eventsocket.accept()
951 # pylint: disable=no-member
952 client_sockets[client_socket.fileno()] = (client_socket, address, "")
954 # Same for the TCP syslog socket
955 if self._syslog_tcp and self._syslog_tcp in readable:
956 client_socket, address = self._syslog_tcp.accept()
957 # pylint: disable=no-member
958 client_sockets[client_socket.fileno()] = (client_socket, address, "")
960 # Read data from existing event unix socket connections
961 # NOTE: We modify client_socket in the loop, so we need to copy below!
962 for fd, (cs, address, previous_data) in list(client_sockets.iteritems()):
963 if fd in readable:
964 # Receive next part of data
965 try:
966 new_data = cs.recv(4096)
967 except Exception:
968 new_data = ""
969 address = None
971 # Put together with incomplete messages from last time
972 data = previous_data + new_data
974 # Do we have incomplete data? (if the socket has been
975 # closed then we consider the pending message always
976 # as complete, even if there was no trailing \n)
977 if new_data and not data.endswith("\n"): # keep fragment
978 # Do we have any complete messages?
979 if '\n' in data:
980 complete, rest = data.rsplit("\n", 1)
981 self.process_raw_lines(complete + "\n", address)
982 else:
983 rest = data # keep for next time
985 # Only complete messages
986 else:
987 if data:
988 self.process_raw_lines(data, address)
989 rest = ""
991 # Connection still open?
992 if new_data:
993 client_sockets[fd] = (cs, address, rest)
994 else:
995 cs.close()
996 del client_sockets[fd]
998 # Read data from pipe
999 if pipe in readable:
1000 try:
1001 data = os.read(pipe, 4096)
1002 if data:
1003 # Prepend previous beginning of message to read data
1004 data = pipe_fragment + data
1005 pipe_fragment = ""
1007 # Last message still incomplete?
1008 if data[-1] != '\n':
1009 if '\n' in data: # at least one complete message contained
1010 messages, pipe_fragment = data.rsplit('\n', 1)
1011 self.process_raw_lines(messages + '\n') # got lost in split
1012 else:
1013 pipe_fragment = data # keep beginning of message, wait for \n
1014 else:
1015 self.process_raw_lines(data)
1016 else: # EOF
1017 os.close(pipe)
1018 pipe = self.open_pipe()
1019 listen_list[0] = pipe
1020 # Pending fragments from previous reads that are not terminated
1021 # by a \n are ignored.
1022 if pipe_fragment:
1023 self._logger.warning(
1024 "Ignoring incomplete message '%s' from pipe" % pipe_fragment)
1025 pipe_fragment = ""
1026 except Exception:
1027 pass
1029 # Read events from builtin syslog server
1030 if self._syslog is not None and self._syslog.fileno() in readable:
1031 self.process_raw_lines(*self._syslog.recvfrom(4096))
1033 # Read events from builtin snmptrap server
1034 if self._snmptrap is not None and self._snmptrap.fileno() in readable:
1035 try:
1036 message, sender_address = self._snmptrap.recvfrom(65535)
1037 self.process_raw_data(
1038 lambda: self._snmp_trap_engine.process_snmptrap(message, sender_address))
1039 except Exception:
1040 self._logger.exception(
1041 'Exception handling a SNMP trap from "%s". Skipping this one' %
1042 sender_address[0])
1044 try:
1045 # process the first spool file we get
1046 spool_file = next(self.settings.paths.spool_dir.value.glob('[!.]*'))
1047 self.process_raw_lines(spool_file.read_bytes())
1048 spool_file.unlink()
1049 select_timeout = 0 # enable fast processing to process further files
1050 except StopIteration:
1051 select_timeout = 1 # restore default select timeout
1053 # Processes incoming data, just a wrapper between the real data and the
1054 # handler function to record some statistics etc.
1055 def process_raw_data(self, handler):
1056 self._perfcounters.count("messages")
1057 before = time.time()
1058 # In replication slave mode (when not taken over), ignore all events
1059 if not is_replication_slave(self._config) or self._slave_status["mode"] != "sync":
1060 handler()
1061 elif self.settings.options.debug:
1062 self._logger.info("Replication: we are in slave mode, ignoring event")
1063 elapsed = time.time() - before
1064 self._perfcounters.count_time("processing", elapsed)
1066 # Takes several lines of messages, handles encoding and processes them separately
1067 def process_raw_lines(self, data, address=None):
1068 lines = data.splitlines()
1069 for line in lines:
1070 line = scrub_and_decode(line.rstrip())
1071 if line:
1072 try:
1074 def handler(line=line):
1075 self.process_line(line, address)
1077 self.process_raw_data(handler)
1078 except Exception as e:
1079 self._logger.exception(
1080 'Exception handling a log line (skipping this one): %s' % e)
1082 def do_housekeeping(self):
1083 with self._event_status.lock:
1084 with self._lock_configuration:
1085 self.hk_handle_event_timeouts()
1086 self.hk_check_expected_messages()
1087 self.hk_cleanup_downtime_events()
1088 self._history.housekeeping()
1090 # For all events that have been created during a host downtime, check whether
1091 # the host is still in downtime. In case the downtime has ended, archive the
1092 # events that were created during it.
1093 def hk_cleanup_downtime_events(self):
1094 host_downtimes = {}
1096 for event in self._event_status.events():
1097 if not event["host_in_downtime"]:
1098 continue # only care about events created in downtime
1100 try:
1101 in_downtime = host_downtimes[event["core_host"]]
1102 except KeyError:
1103 in_downtime = self._is_host_in_downtime(event)
1104 host_downtimes[event["core_host"]] = in_downtime
1106 if in_downtime:
1107 continue # (still) in downtime, don't delete any event
1109 self._logger.verbose(
1110 "Remove event %d (created in downtime, host left downtime)" % event["id"])
1111 self._event_status.remove_event(event)
1113 def hk_handle_event_timeouts(self):
1114 # 1. Automatically delete all events that are in state "counting"
1115 # and have not reached the required number of hits and whose
1116 # time has elapsed.
1117 # 2. Automatically delete all events that are in state "open"
1118 # and whose livetime has elapsed.
1119 events_to_delete = []
1120 events = self._event_status.events()
1121 now = time.time()
1122 for nr, event in enumerate(events):
1123 rule = self._rule_by_id.get(event["rule_id"])
1125 if event["phase"] == "counting":
1126 # Event belongs to a rule that no longer exists? It
1127 # will never reach its count. Better delete it.
1128 if not rule:
1129 self._logger.info("Deleting orphaned event %d created by obsolete rule %s" %
1130 (event["id"], event["rule_id"]))
1131 event["phase"] = "closed"
1132 self._history.add(event, "ORPHANED")
1133 events_to_delete.append(nr)
1135 elif "count" not in rule and "expect" not in rule:
1136 self._logger.info(
1137 "Count-based event %d belonging to rule %s: rule does not "
1138 "count/expect anymore. Deleting event." % (event["id"], event["rule_id"]))
1139 event["phase"] = "closed"
1140 self._history.add(event, "NOCOUNT")
1141 events_to_delete.append(nr)
1143 # handle counting
1144 elif "count" in rule:
1145 count = rule["count"]
1146 if count.get("algorithm") in ["tokenbucket", "dynabucket"]:
1147 last_token = event.get("last_token", event["first"])
1148 secs_per_token = count["period"] / float(count["count"])
1149 if count["algorithm"] == "dynabucket": # get fewer tokens if count is lower
1150 if event["count"] <= 1:
1151 secs_per_token = count["period"]
1152 else:
1153 secs_per_token *= (float(count["count"]) / float(event["count"]))
1154 elapsed_secs = now - last_token
1155 new_tokens = int(elapsed_secs / secs_per_token)
1156 if new_tokens:
1157 if self.settings.options.debug:
1158 self._logger.info(
1159 "Rule %s/%s, event %d: got %d new tokens" %
1160 (rule["pack"], rule["id"], event["id"], new_tokens))
1161 event["count"] = max(0, event["count"] - new_tokens)
1162 event[
1163 "last_token"] = last_token + new_tokens * secs_per_token # not now! would be unfair
1164 if event["count"] == 0:
1165 self._logger.info(
1166 "Rule %s/%s, event %d: again without allowed rate, dropping event"
1167 % (rule["pack"], rule["id"], event["id"]))
1168 event["phase"] = "closed"
1169 self._history.add(event, "COUNTFAILED")
1170 events_to_delete.append(nr)
1172 else: # algorithm 'interval'
1173 if event["first"] + count["period"] <= now: # End of period reached
1174 self._logger.info(
1175 "Rule %s/%s: reached only %d out of %d events within %d seconds. "
1176 "Resetting to zero." % (rule["pack"], rule["id"], event["count"],
1177 count["count"], count["period"]))
1178 event["phase"] = "closed"
1179 self._history.add(event, "COUNTFAILED")
1180 events_to_delete.append(nr)
1182 # Handle delayed actions
1183 elif event["phase"] == "delayed":
1184 delay_until = event.get("delay_until", 0) # should always be present
1185 if now >= delay_until:
1186 self._logger.info("Delayed event %d of rule %s is now activated." %
1187 (event["id"], event["rule_id"]))
1188 event["phase"] = "open"
1189 self._history.add(event, "DELAYOVER")
1190 if rule:
1191 cmk.ec.actions.event_has_opened(self._history, self.settings, self._config,
1192 self._logger, self, self._event_columns,
1193 rule, event)
1194 if rule.get("autodelete"):
1195 event["phase"] = "closed"
1196 self._history.add(event, "AUTODELETE")
1197 events_to_delete.append(nr)
1199 else:
1200 self._logger.info("Cannot do rule action: rule %s not present anymore." %
1201 event["rule_id"])
1203 # Handle events with a limited lifetime
1204 elif "live_until" in event:
1205 if now >= event["live_until"]:
1206 allowed_phases = event.get("live_until_phases", ["open"])
1207 if event["phase"] in allowed_phases:
1208 event["phase"] = "closed"
1209 events_to_delete.append(nr)
1210 self._logger.info("Livetime of event %d (rule %s) exceeded. Deleting event."
1211 % (event["id"], event["rule_id"]))
1212 self._history.add(event, "EXPIRED")
1214 # Do delayed deletion now (was delayed in order to keep list indices OK)
1215 for nr in events_to_delete[::-1]:
1216 self._event_status.remove_event(events[nr])
1218 def hk_check_expected_messages(self):
1219 now = time.time()
1220 # "Expecting"-rules are rules that require one or several
1221 # occurrances of a message within a defined time period.
1222 # Whenever one period of time has elapsed, we need to check
1223 # how many messages have been seen for that rule. If these
1224 # are too few, we open an event.
1225 # We need to handle to cases:
1226 # 1. An event for such a rule already exists and is
1227 # in the state "counting" -> this can only be the case if
1228 # more than one occurrance is required.
1229 # 2. No event at all exists.
1230 # in that case.
1231 for rule in self._rules:
1232 if "expect" in rule:
1234 if not self.event_rule_matches_site(rule, event=None):
1235 continue
1237 # Interval is either a number of seconds, or a pair of a number of seconds
1238 # (e.g. 86400, meaning one day) and a timezone offset relative to UTC in hours.
1239 interval = rule["expect"]["interval"]
1240 expected_count = rule["expect"]["count"]
1242 interval_start = self._event_status.interval_start(rule["id"], interval)
1243 if interval_start >= now:
1244 continue
1246 next_interval_start = self._event_status.next_interval_start(
1247 interval, interval_start)
1248 if next_interval_start > now:
1249 continue
1251 # Interval has elapsed. Now comes the truth: do we have enough
1252 # rule matches?
1254 # First do not forget to switch to next interval
1255 self._event_status.start_next_interval(rule["id"], interval)
1257 # First look for case 1: rules that already have at least one hit
1258 # and whose events exist in the state "counting".
1259 events_to_delete = []
1260 events = self._event_status.events()
1261 for nr, event in enumerate(events):
1262 if event["rule_id"] == rule["id"] and event["phase"] == "counting":
1263 # time has elapsed. Now let's see if we have reached
1264 # the necessary count:
1265 if event["count"] < expected_count: # no -> trigger alarm
1266 self._handle_absent_event(rule, event["count"], expected_count,
1267 event["last"])
1268 else: # yes -> everything is fine. Just log.
1269 self._logger.info(
1270 "Rule %s/%s has reached %d occurrances (%d required). "
1271 "Starting next period." % (rule["pack"], rule["id"], event["count"],
1272 expected_count))
1273 self._history.add(event, "COUNTREACHED")
1274 # Counting event is no longer needed.
1275 events_to_delete.append(nr)
1276 break
1278 # Uh oh, no event was found at all.
1279 else:
1280 self._handle_absent_event(rule, 0, expected_count, interval_start)
1282 for nr in events_to_delete[::-1]:
1283 self._event_status.remove_event(events[nr])
1285 def _handle_absent_event(self, rule, event_count, expected_count, interval_start):
1286 now = time.time()
1287 if event_count:
1288 text = "Expected message arrived only %d out of %d times since %s" % \
1289 (event_count, expected_count, time.strftime("%F %T", time.localtime(interval_start)))
1290 else:
1291 text = "Expected message did not arrive since %s" % \
1292 time.strftime("%F %T", time.localtime(interval_start))
1294 # If there is already an open incident about this absent message, we can merge
1295 # it instead of creating a new event. There is a setting for this.
1296 merge_event = None
1297 merge = rule["expect"].get("merge", "open")
1298 if merge != "never":
1299 for event in self._event_status.events():
1300 if event["rule_id"] == rule["id"] and \
1301 (event["phase"] == "open" or
1302 (event["phase"] == "ack" and merge == "acked")):
1303 merge_event = event
1304 break
1306 if merge_event:
1307 merge_event["last"] = now
1308 merge_event["count"] += 1
1309 merge_event["phase"] = "open"
1310 merge_event["time"] = now
1311 merge_event["text"] = text
1312 # Better rewrite (again). Rule might have changed. Also we have changed
1313 # the text, and the user might have added his own text via set_text.
1314 self.rewrite_event(rule, merge_event, {})
1315 self._history.add(merge_event, "COUNTFAILED")
1316 else:
1317 # Create artificial event from scratch. Make sure that all important
1318 # fields are defined.
1319 event = {
1320 "rule_id": rule["id"],
1321 "text": text,
1322 "phase": "open",
1323 "count": 1,
1324 "time": now,
1325 "first": now,
1326 "last": now,
1327 "comment": "",
1328 "host": "",
1329 "ipaddress": "",
1330 "application": "",
1331 "pid": 0,
1332 "priority": 3,
1333 "facility": 1, # user
1334 "match_groups": (),
1335 "match_groups_syslog_application": (),
1336 "core_host": "",
1337 "host_in_downtime": False,
1339 self._add_rule_contact_groups_to_event(rule, event)
1340 self.rewrite_event(rule, event, {})
1341 self._event_status.new_event(event)
1342 self._history.add(event, "COUNTFAILED")
1343 cmk.ec.actions.event_has_opened(self._history, self.settings, self._config,
1344 self._logger, self, self._event_columns, rule, event)
1345 if rule.get("autodelete"):
1346 event["phase"] = "closed"
1347 self._history.add(event, "AUTODELETE")
1348 self._event_status.remove_event(event)
1350 def reload_configuration(self, config):
1351 self._config = config
1352 self._snmp_trap_engine = cmk.ec.snmp.SNMPTrapEngine(self.settings, self._config,
1353 self._logger.getChild("snmp"),
1354 self.handle_snmptrap)
1355 self.compile_rules(self._config["rules"], self._config["rule_packs"])
1356 self.host_config.initialize()
1358 # Precompile regular expressions and similar stuff. Also convert legacy
1359 # "rules" parameter into new "rule_packs" parameter
1360 def compile_rules(self, legacy_rules, rule_packs):
1361 self._rules = []
1362 self._rule_by_id = {}
1363 self._rule_hash = {} # Speedup-Hash for rule execution
1364 count_disabled = 0
1365 count_rules = 0
1366 count_unspecific = 0
1368 # Loop through all rule packs and through their rules
1369 for rule_pack in rule_packs:
1370 if rule_pack["disabled"]:
1371 count_disabled += len(rule_pack["rules"])
1372 continue
1374 for rule in rule_pack["rules"]:
1375 if rule.get("disabled"):
1376 count_disabled += 1
1377 else:
1378 count_rules += 1
1379 rule = rule.copy() # keep original intact because of slave replication
1381 # Store information about rule pack right within the rule. This is needed
1382 # for debug output and also for skipping rule packs
1383 rule["pack"] = rule_pack["id"]
1384 self._rules.append(rule)
1385 self._rule_by_id[rule["id"]] = rule
1386 try:
1387 for key in [
1388 "match", "match_ok", "match_host", "match_application",
1389 "cancel_application"
1391 if key in rule:
1392 value = self._compile_matching_value(key, rule[key])
1393 if value is None:
1394 del rule[key]
1395 continue
1397 rule[key] = value
1399 if 'state' in rule and isinstance(rule['state'], tuple) \
1400 and rule['state'][0] == 'text_pattern':
1401 for key in ['2', '1', '0']:
1402 if key in rule['state'][1]:
1403 value = self._compile_matching_value(
1404 'state', rule['state'][1][key])
1405 if value is None:
1406 del rule['state'][1][key]
1407 else:
1408 rule['state'][1][key] = value
1410 except Exception as e:
1411 if self.settings.options.debug:
1412 raise
1413 rule["disabled"] = True
1414 count_disabled += 1
1415 self._logger.exception(
1416 "Ignoring rule '%s/%s' because of an invalid regex (%s)." %
1417 (rule["pack"], rule["id"], e))
1419 if self._config["rule_optimizer"]:
1420 self.hash_rule(rule)
1421 if "match_facility" not in rule \
1422 and "match_priority" not in rule \
1423 and "cancel_priority" not in rule \
1424 and "cancel_application" not in rule:
1425 count_unspecific += 1
1427 self._logger.info(
1428 "Compiled %d active rules (ignoring %d disabled rules)" % (count_rules, count_disabled))
1429 if self._config["rule_optimizer"]:
1430 self._logger.info("Rule hash: %d rules - %d hashed, %d unspecific" % (len(
1431 self._rules), len(self._rules) - count_unspecific, count_unspecific))
1432 for facility in range(24) + [31]: # all real facilities plus the snmptrap hack (31)
1433 if facility in self._rule_hash:
1434 stats = []
1435 for prio, entries in self._rule_hash[facility].iteritems():
1436 stats.append("%s(%d)" % (SyslogPriority(prio), len(entries)))
1437 self._logger.info(" %-12s: %s" % (SyslogFacility(facility), " ".join(stats)))
1439 @staticmethod
1440 def _compile_matching_value(key, val):
1441 value = val.strip()
1442 # Remove leading .* from the regex: it is redundant and
1443 # dramatically hurts performance when doing an infix search.
1444 if key in ["match", "match_ok"]:
1445 while value.startswith(".*") and not value.startswith(".*?"):
1446 value = value[2:]
1448 if not value:
1449 return None
1451 if cmk.utils.regex.is_regex(value):
1452 return re.compile(value, re.IGNORECASE)
1453 return value.lower()
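# Examples of what the helper above produces (sketch):
# - ".*disk (full|error)" -> leading ".*" stripped, then compiled to the
#   case-insensitive regex "disk (full|error)"
# - "Disk Full" -> no regex metacharacters: lowered to the plain infix
#   string 'disk full'
# - ".*" -> empty after stripping, returns None and the condition is dropped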
1455 def hash_rule(self, rule):
1456 # Construct rule hash for faster execution.
1457 facility = rule.get("match_facility")
1458 if facility and not rule.get("invert_matching"):
1459 self.hash_rule_facility(rule, facility)
1460 else:
1461 for facility in xrange(32): # all syslog facilities
1462 self.hash_rule_facility(rule, facility)
1464 def hash_rule_facility(self, rule, facility):
1465 needed_prios = [False] * 8
1466 for key in ["match_priority", "cancel_priority"]:
1467 if key in rule:
1468 prio_from, prio_to = rule[key]
1469 # Beware: from > to!
1470 for p in xrange(prio_to, prio_from + 1):
1471 needed_prios[p] = True
1472 elif key == "match_priority": # all priorities match
1473 needed_prios = [True] * 8 # needed to check this rule for all event priorities
1474 elif "match_ok" in rule: # a cancelling rule where all priorities cancel
1475 needed_prios = [True] * 8 # needed to check this rule for all event priorities
1477 if rule.get("invert_matching"):
1478 needed_prios = [True] * 8
1480 prio_hash = self._rule_hash.setdefault(facility, {})
1481 for prio, need in enumerate(needed_prios):
1482 if need:
1483 prio_hash.setdefault(prio, []).append(rule)
1485 def output_hash_stats(self):
1486 self._logger.info("Top 20 of facility/priority:")
1487 entries = []
1488 total_count = 0
1489 for facility in xrange(32):
1490 for priority in xrange(8):
1491 count = self._hash_stats[facility][priority]
1492 if count:
1493 total_count += count
1494 entries.append((count, (facility, priority)))
1495 entries.sort()
1496 entries.reverse()
1497 for count, (facility, priority) in entries[:20]:
1498 self._logger.info(" %s/%s - %d (%.2f%%)" % (SyslogFacility(facility),
1499 SyslogPriority(priority), count,
1500 (100.0 * count / float(total_count))))
1502 def process_line(self, line, address):
1503 line = line.rstrip()
1504 if self._config["debug_rules"]:
1505 if address:
1506 self._logger.info(u"Processing message from %r: '%s'" % (address, line))
1507 else:
1508 self._logger.info(u"Processing message '%s'" % line)
1510 event = self.create_event_from_line(line, address)
1511 self.process_event(event)
1513 def process_event(self, event):
1514 self.do_translate_hostname(event)
1516 # Log all incoming messages into a syslog-like text file if that is enabled
1517 if self._config["log_messages"]:
1518 self.log_message(event)
1520 # Rule optimizer
1521 if self._config["rule_optimizer"]:
1522 self._hash_stats[event["facility"]][event["priority"]] += 1
1523 rule_candidates = self._rule_hash.get(event["facility"], {}).get(event["priority"], [])
1524 else:
1525 rule_candidates = self._rules
1527 skip_pack = None
1528 for rule in rule_candidates:
1529 if skip_pack and rule["pack"] == skip_pack:
1530 continue # still in the rule pack that we want to skip
1531 skip_pack = None # new pack, reset skipping
1533 try:
1534 result = self.event_rule_matches(rule, event)
1535 except Exception as e:
1536 self._logger.exception(' Exception during matching:\n%s' % e)
1537 result = False
1539 if result: # either False or a tuple (cancelling, match_groups)
1540 self._perfcounters.count("rule_hits")
1541 cancelling, match_groups = result
1543 if self._config["debug_rules"]:
1544 self._logger.info(" matching groups:\n%s" % pprint.pformat(match_groups))
1546 self._event_status.count_rule_match(rule["id"])
1547 if self._config["log_rulehits"]:
1548 self._logger.info("Rule '%s/%s' hit by message %s/%s - '%s'." %
1549 (rule["pack"], rule["id"], SyslogFacility(event["facility"]),
1550 SyslogPriority(event["priority"]), event["text"]))
1552 if rule.get("drop"):
1553 if rule["drop"] == "skip_pack":
1554 skip_pack = rule["pack"]
1555 if self._config["debug_rules"]:
1556 self._logger.info(" skipping this rule pack (%s)" % skip_pack)
1557 continue
1558 else:
1559 self._perfcounters.count("drops")
1560 return
1562 if cancelling:
1563 self._event_status.cancel_events(self, self._event_columns, event, match_groups,
1564 rule)
1565 return
1566 else:
1567 # Remember the rule id that this event originated from
1568 event["rule_id"] = rule["id"]
1570 # Lookup the monitoring core hosts and add the core host
1571 # name to the event when one can be matched
1572 # For the moment we have no rule/condition matching on this
1573 # field. So we only add the core host info for matched events.
1574 self._add_core_host_to_new_event(event)
1576 # Attach optional contact group information for visibility
1577 # and eventually for notifications
1578 self._add_rule_contact_groups_to_event(rule, event)
1580 # Store groups from matching this event. In order to make
1581 # persistence easier, we do not save them as a list but join
1582 # them on ASCII-1.
1583 event["match_groups"] = match_groups.get("match_groups_message", ())
1584 event["match_groups_syslog_application"] = match_groups.get(
1585 "match_groups_syslog_application", ())
1586 self.rewrite_event(rule, event, match_groups)
1588 if "count" in rule:
1589 count = rule["count"]
1590 # Check if a matching event already exists that we need to
1591 # count up. If the count reaches the limit, the event will
1592 # be opened and its rule actions performed.
1593 existing_event = \
1594 self._event_status.count_event(self, event, rule, count)
1595 if existing_event:
1596 if "delay" in rule:
1597 if self._config["debug_rules"]:
1598 self._logger.info("Event opening will be delayed for %d seconds"
1599 % rule["delay"])
1600 existing_event["delay_until"] = time.time() + rule["delay"]
1601 existing_event["phase"] = "delayed"
1602 else:
1603 cmk.ec.actions.event_has_opened(
1604 self._history, self.settings, self._config, self._logger, self,
1605 self._event_columns, rule, existing_event)
1607 self._history.add(existing_event, "COUNTREACHED")
1609 if "delay" not in rule and rule.get("autodelete"):
1610 existing_event["phase"] = "closed"
1611 self._history.add(existing_event, "AUTODELETE")
1612 with self._event_status.lock:
1613 self._event_status.remove_event(existing_event)
1614 elif "expect" in rule:
1615 self._event_status.count_expected_event(self, event)
1616 else:
1617 if "delay" in rule:
1618 if self._config["debug_rules"]:
1619 self._logger.info(
1620 "Event opening will be delayed for %d seconds" % rule["delay"])
1621 event["delay_until"] = time.time() + rule["delay"]
1622 event["phase"] = "delayed"
1623 else:
1624 event["phase"] = "open"
1626 if self.new_event_respecting_limits(event):
1627 if event["phase"] == "open":
1628 cmk.ec.actions.event_has_opened(self._history, self.settings,
1629 self._config, self._logger, self,
1630 self._event_columns, rule, event)
1631 if rule.get("autodelete"):
1632 event["phase"] = "closed"
1633 self._history.add(event, "AUTODELETE")
1634 with self._event_status.lock:
1635 self._event_status.remove_event(event)
1636 return
1638 # End of loop over rules.
1639 if self._config["archive_orphans"]:
1640 self._event_status.archive_event(event)
1642 def _add_rule_contact_groups_to_event(self, rule, event):
1643 if rule.get("contact_groups") is None:
1644 event.update({
1645 "contact_groups": None,
1646 "contact_groups_notify": False,
1647 "contact_groups_precedence": "host",
1649 else:
1650 event.update({
1651 "contact_groups": rule["contact_groups"]["groups"],
1652 "contact_groups_notify": rule["contact_groups"]["notify"],
1653 "contact_groups_precedence": rule["contact_groups"]["precedence"],
1656 def add_core_host_to_event(self, event):
1657 matched_host = self.host_config.get_by_event_host_name(event["host"])
1658 if not matched_host:
1659 event["core_host"] = ""
1660 return
1662 event["core_host"] = matched_host["name"]
1664 def _add_core_host_to_new_event(self, event):
1665 self.add_core_host_to_event(event)
1667 # Add some state dependent information (like host is in downtime etc.)
1668 event["host_in_downtime"] = self._is_host_in_downtime(event)
1670 def _is_host_in_downtime(self, event):
1671 if not event["core_host"]:
1672 return False # Found no host in core: Not in downtime!
1674 query = ("GET hosts\n"
1675 "Columns: scheduled_downtime_depth\n"
1676 "Filter: host_name = %s\n" % (event["core_host"]))
1678 try:
1679 return livestatus.LocalConnection().query_value(query) >= 1
1681 except livestatus.MKLivestatusNotFoundError:
1682 return False
1684 except Exception:
1685 if cmk.utils.debug.enabled():
1686 raise
1687 return False
1689 # Checks if an event matches a rule. Returns either False (no match)
1690 # or a pair of matchtype, groups, where matchtype is False for a
1691 # normal match and True for a cancelling match and the groups is a tuple
1692 # of matched regex groups in either text (normal) or match_ok (cancelling)
1693 # match.
1694 def event_rule_matches(self, rule, event):
1695 self._perfcounters.count("rule_tries")
1696 with self._lock_configuration:
1697 result = self._rule_matcher.event_rule_matches_non_inverted(rule, event)
1698 if rule.get("invert_matching"):
1699 if result is False:
1700 result = False, {}
1701 if self._config["debug_rules"]:
1702 self._logger.info(
1703 " Rule would not match, but due to inverted matching does.")
1704 else:
1705 result = False
1706 if self._config["debug_rules"]:
1707 self._logger.info(
1708 " Rule would match, but due to inverted matching does not.")
1710 return result
1712 # Rewrite texts and compute other fields in the event
1713 def rewrite_event(self, rule, event, groups):
1714 if rule["state"] == -1:
1715 prio = event["priority"]
1716 if prio >= 5:
1717 event["state"] = 0
1718 elif prio < 4:
1719 event["state"] = 2
1720 else:
1721 event["state"] = 1
1722 elif isinstance(rule["state"], tuple) and rule["state"][0] == "text_pattern":
1723 for key in ['2', '1', '0', '3']:
1724 if key in rule["state"][1]:
1725 match_groups = match(rule["state"][1][key], event["text"], complete=False)
1726 if match_groups is not False:
1727 event["state"] = int(key)
1728 break
1729 elif key == '3': # No pattern matched!
1730 event["state"] = 3
1731 else:
1732 event["state"] = rule["state"]
1734 if ("sl" not in event) or (rule["sl"]["precedence"] == "rule"):
1735 event["sl"] = rule["sl"]["value"]
1736 event["first"] = event["time"]
1737 event["last"] = event["time"]
1738 if "set_comment" in rule:
1739 event["comment"] = replace_groups(rule["set_comment"], event["text"], groups)
1740 if "set_text" in rule:
1741 event["text"] = replace_groups(rule["set_text"], event["text"], groups)
1742 if "set_host" in rule:
1743 event["orig_host"] = event["host"]
1744 event["host"] = replace_groups(rule["set_host"], event["host"], groups)
1745 if "set_application" in rule:
1746 event["application"] = replace_groups(rule["set_application"], event["application"],
1747 groups)
1748 if "set_contact" in rule and "contact" not in event:
1749 event["contact"] = replace_groups(rule["set_contact"], event.get("contact", ""), groups)
1751 def parse_syslog_info(self, line):
1752 event = {}
1753 # Split on ": " instead of ":" to allow tags that contain ":". This
1754 # is needed to process logs generated by Windows agent logfiles
1755 # like "c://test.log".
1756 tag, message = line.split(": ", 1)
1757 event["text"] = message.strip()
1759 if '[' in tag:
1760 app, pid = tag.split('[', 1)
1761 pid = pid.rstrip(']')
1762 else:
1763 app = tag
1764 pid = 0
1766 event["application"] = app
1767 event["pid"] = pid
1768 return event
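# Example (assumed input): 'CRON[8046]: (root) CMD (...)' yields application
# "CRON", pid "8046" (kept as a string when taken from the brackets) and the
# remaining message as text; a tag without brackets, e.g. 'sshd: ...', yields pid 0.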
1770 def parse_rfc5424_syslog_info(self, line):
1771 event = {}
1773 (_unused_version, timestamp, hostname, app_name, procid, _unused_msgid, rest) = line.split(
1774 " ", 6)
1776 # There is no RFC 3339 parsing built into Python. We ignore subseconds and timezones
1777 # here. This seems to be ok for the moment - sorry. Please drop a note if you
1778 # have a good solution for this.
1779 event['time'] = time.mktime(time.strptime(timestamp[:19], '%Y-%m-%dT%H:%M:%S'))
1781 if hostname != "-":
1782 event["host"] = hostname
1784 if app_name != "-":
1785 event["application"] = app_name
1787 if procid != "-":
1788 event["pid"] = procid
1790 if rest[0] == "[":
1791 # has structured data
1792 structured_data, message = rest[1:].split("] ", 1)
1793 elif rest.startswith("- "):
1794 # has no structured data
1795 structured_data, message = rest.split(" ", 1)
1796 else:
1797 raise Exception("Invalid RFC 5424 syslog message")
1799 if structured_data != "-":
1800 event["text"] = "[%s] %s" % (structured_data, message)
1801 else:
1802 event["text"] = message
1804 return event
1806 def parse_monitoring_info(self, line):
1807 event = {}
1808 # line starts with '@'
1809 if line[11] == ';':
1810 timestamp_str, sl, contact, rest = line[1:].split(';', 3)
1811 host, rest = rest.split(None, 1)
1812 if len(sl):
1813 event["sl"] = int(sl)
1814 if len(contact):
1815 event["contact"] = contact
1816 else:
1817 timestamp_str, host, rest = line[1:].split(None, 2)
1819 event["time"] = float(int(timestamp_str))
1820 service, message = rest.split(": ", 1)
1821 event["application"] = service
1822 event["text"] = message.strip()
1823 event["host"] = host
1824 return event
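# Example (hypothetical line as sent by "mkevent -n", priority already stripped):
#   '@1341847712;5;John Doe; myhost My Service: CRIT - disk full'
# yields time=1341847712.0, sl=5, contact='John Doe', host='myhost',
# application='My Service' and text='CRIT - disk full'.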
1826 # Translate a hostname if this is configured. We are
1827 # *really* sorry: this code snippet is copied from modules/check_mk_base.py.
1828 # There is still no common library. Please keep this in sync with the
1829 # original code.
1830 def translate_hostname(self, backedhost):
1831 translation = self._config["hostname_translation"]
1833 # Here comes the original code from modules/check_mk_base.py
1834 if translation:
1835 # 1. Case conversion
1836 caseconf = translation.get("case")
1837 if caseconf == "upper":
1838 backedhost = backedhost.upper()
1839 elif caseconf == "lower":
1840 backedhost = backedhost.lower()
1842 # 2. Drop domain part (not applied to IP addresses!)
1843 if translation.get("drop_domain") and backedhost:
1844 # only apply if first part does not convert successfully into an int
1845 firstpart = backedhost.split(".", 1)[0]
1846 try:
1847 int(firstpart)
1848 except Exception:
1849 backedhost = firstpart
1851 # 3. Regular expression conversion
1852 if "regex" in translation:
1853 regex, subst = translation.get("regex")
1854 if not regex.endswith('$'):
1855 regex += '$'
1856 rcomp = cmk.utils.regex.regex(regex)
1857 mo = rcomp.match(backedhost)
1858 if mo:
1859 backedhost = subst
1860 for nr, text in enumerate(mo.groups()):
1861 backedhost = backedhost.replace("\\%d" % (nr + 1), text)
1863 # 4. Explicit mapping
1864 for from_host, to_host in translation.get("mapping", []):
1865 if from_host == backedhost:
1866 backedhost = to_host
1867 break
1869 return backedhost
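# A minimal sketch of a possible "hostname_translation" value, using the keys
# handled above (all of them are optional):
#
#   {
#       "case": "lower",                       # 1. "SRV01" -> "srv01"
#       "drop_domain": True,                   # 2. "srv01.example.com" -> "srv01"
#       "regex": ("srv(.*)", "server-\\1"),    # 3. "srv01" -> "server-01"
#       "mapping": [("server-01", "core01")],  # 4. "server-01" -> "core01"
#   }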
1871 def do_translate_hostname(self, event):
1872 try:
1873 event["host"] = self.translate_hostname(event["host"])
1874 except Exception as e:
1875 if self._config["debug_rules"]:
1876 self._logger.exception('Unable to parse host "%s" (%s)' % (event.get("host"), e))
1877 event["host"] = ""
1879 def create_event_from_line(self, line, address):
1880 event = {
1881 # address is either None or a tuple of (ipaddress, port)
1882 "ipaddress": address and address[0] or "",
1883 "core_host": "",
1884 "host_in_downtime": False,
1886 try:
1887 # Variant 1: plain syslog message without priority/facility:
1888 # May 26 13:45:01 Klapprechner CRON[8046]: message....
1890 # Variant 2: syslog message including facility (RFC 3164)
1891 # <78>May 26 13:45:01 Klapprechner CRON[8046]: message....
1893 # Variant 3: local Nagios alert posted by mkevent -n
1894 # <154>@1341847712;5;Contact Info; MyHost My Service: CRIT - This che
1896 # Variant 4: remote Nagios alert posted by mkevent -n -> syslog
1897 # <154>Jul 9 17:28:32 Klapprechner @1341847712;5;Contact Info; MyHost My Service: CRIT - This che
1899 # Variant 5: syslog message
1900 # Timestamp is RFC3339 with additional restrictions:
1901 # - The "T" and "Z" characters in this syntax MUST be upper case.
1902 # - Usage of the "T" character is REQUIRED.
1903 # - Leap seconds MUST NOT be used.
1904 # <166>2013-04-05T13:49:31.685Z esx Vpxa: message....
1906 # Variant 6: syslog message without date / host:
1907 # <5>SYSTEM_INFO: [WLAN-1] Triggering Background Scan
1909 # Variant 7: logwatch.ec event forwarding
1910 # <78>@1341847712 Klapprechner /var/log/syslog: message....
1912 # Variant 7a: Event simulation
1913 # <%PRI%>@%TIMESTAMP%;%SL% %HOSTNAME% %syslogtag%: %msg%
1915 # Variant 8: syslog message from sophos firewall
1916 # <84>2015:03:25-12:02:06 gw pluto[7122]: listening for IKE messages
1918 # Variant 9: syslog message (RFC 5424)
1919 # <134>1 2016-06-02T12:49:05.181+02:00 chrissw7 ChrisApp - TestID - coming from java code
1921 # Variant 10:
1922 # 2016 May 26 15:41:47 IST XYZ Ebra: %LINEPROTO-5-UPDOWN: Line protocol on Interface Ethernet45 (XXX.ASAD.Et45), changed state to up
1923 # year month day hh:mm:ss timezone HOSTNAME KeyAgent:
1925 # FIXME: Would be better to parse the syslog messages in another way:
1926 # Split the message by the first ":", then split the syslog header part
1927 # and detect which information is present. Take a look at the syslog RFCs
1928 # for details.
1930 # Variant 2,3,4,5,6,7,8
1931 if line.startswith('<'):
1932 i = line.find('>')
1933 prio = int(line[1:i])
1934 line = line[i + 1:]
1935 event["facility"] = prio >> 3
1936 event["priority"] = prio & 7
1938 # Variant 1
1939 else:
1940 event["facility"] = 1 # user
1941 event["priority"] = 5 # notice
1943 # Variant 7 and 7a
1944 if line[0] == '@' and line[11] in [' ', ';']:
1945 details, event['host'], line = line.split(' ', 2)
1946 detail_tokens = details.split(';')
1947 timestamp = detail_tokens[0]
1948 if len(detail_tokens) > 1:
1949 event["sl"] = int(detail_tokens[1])
1950 event['time'] = float(timestamp[1:])
1951 event.update(self.parse_syslog_info(line))
1953 # Variant 3
1954 elif line.startswith("@"):
1955 event.update(self.parse_monitoring_info(line))
1957 # Variant 5
1958 elif len(line) > 24 and line[10] == 'T':
1959 # There is no RFC 3339 parsing built into Python. We ignore subseconds and timezones
1960 # here. This seems to be ok for the moment - sorry. Please drop a note if you
1961 # have a good solution for this.
1962 rfc3339_part, event['host'], line = line.split(' ', 2)
1963 event['time'] = time.mktime(time.strptime(rfc3339_part[:19], '%Y-%m-%dT%H:%M:%S'))
1964 event.update(self.parse_syslog_info(line))
1966 # Variant 9
1967 elif len(line) > 24 and line[12] == "T":
1968 event.update(self.parse_rfc5424_syslog_info(line))
1970 # Variant 8
1971 elif line[10] == '-' and line[19] == ' ':
1972 event['host'] = line.split(' ')[1]
1973 event['time'] = time.mktime(time.strptime(line.split(' ')[0], '%Y:%m:%d-%H:%M:%S'))
1974 rest = " ".join(line.split(' ')[2:])
1975 event.update(self.parse_syslog_info(rest))
1977 # Variant 6
1978 elif len(line.split(': ', 1)[0].split(' ')) == 1:
1979 event.update(self.parse_syslog_info(line))
1980 # There is no datetime information in the message, use current time
1981 event['time'] = time.time()
1982 # There is no host information, use the provided address
1983 if address and isinstance(address, tuple):
1984 event["host"] = address[0]
1986 # Variant 10
1987 elif line[4] == " " and line[:4].isdigit():
1988 time_part = line[:20] # ignoring tz info
1989 event["host"], application, line = line[25:].split(" ", 2)
1990 event["application"] = application.rstrip(":")
1991 event["text"] = line
1992 event['time'] = time.mktime(time.strptime(time_part, '%Y %b %d %H:%M:%S'))
1994 # Variant 1,2,4
1995 else:
1996 month_name, day, timeofday, host, rest = line.split(None, 4)
1997 event["host"] = host
1999 # Variant 4
2000 if rest.startswith("@"):
2001 event.update(self.parse_monitoring_info(rest))
2003 # Variant 1, 2
2004 else:
2005 event.update(self.parse_syslog_info(rest))
2007 month = EventServer.month_names[month_name]
2008 day = int(day)
2010 # Nasty: the year is not contained in the message. We cannot simply
2011 # assume that the message is from the current year.
2012 lt = time.localtime()
2013 if lt.tm_mon < 6 and month > 6: # Assume that message is from last year
2014 year = lt.tm_year - 1
2015 else:
2016 year = lt.tm_year # Assume the current year
2018 hours, minutes, seconds = map(int, timeofday.split(":"))
2020 # A further problem here: we do not know whether the message is in DST or not
2021 event["time"] = time.mktime((year, month, day, hours, minutes, seconds, 0, 0,
2022 lt.tm_isdst))
2024 # The event simulator ships the simulated original IP address in the
2025 # hostname field, separated with a pipe, e.g. "myhost|1.2.3.4"
2026 if "|" in event["host"]:
2027 event["host"], event["ipaddress"] = event["host"].split("|", 1)
2029 except Exception as e:
2030 if self._config["debug_rules"]:
2031 self._logger.exception('Got non-syslog message "%s" (%s)' % (line, e))
2032 event = {
2033 "facility": 1,
2034 "priority": 0,
2035 "text": line,
2036 "host": "",
2037 "ipaddress": address and address[0] or "",
2038 "application": "",
2039 "pid": 0,
2040 "time": time.time(),
2041 "core_host": "",
2042 "host_in_downtime": False,
2045 if self._config["debug_rules"]:
2046 self._logger.info('Parsed message:\n' + ("".join(
2047 [" %-15s %s\n" % (k + ":", v) for (k, v) in sorted(event.iteritems())])).rstrip())
2049 return event
2051 def log_message(self, event):
2052 try:
2053 with cmk.ec.history.get_logfile(self._config, self.settings.paths.messages_dir.value,
2054 self._message_period).open(mode='ab') as f:
2055 f.write("%s %s %s%s: %s\n" % (time.strftime("%b %d %H:%M:%S",
2056 time.localtime(event["time"])),
2057 event["host"], event["application"], event["pid"] and
2058 ("[%s]" % event["pid"]) or "", event["text"]))
2059 except Exception:
2060 if self.settings.options.debug:
2061 raise
2062 # Better to silently ignore errors here: we could have run out of
2063 # disk space and would make things worse by logging that we could
2064 # not log.
2066 def get_hosts_with_active_event_limit(self):
2067 hosts = []
2068 for hostname, num_existing_events in self._event_status.num_existing_events_by_host.iteritems(
2069 ):
2070 if num_existing_events >= self._config["event_limit"]["by_host"]["limit"]:
2071 hosts.append(hostname)
2072 return hosts
2074 def get_rules_with_active_event_limit(self):
2075 rule_ids = []
2076 for rule_id, num_existing_events in self._event_status.num_existing_events_by_rule.iteritems(
2077 ):
2078 if rule_id is None:
2079 continue # Ignore rule unrelated overflow events. They have no rule id associated.
2080 if num_existing_events >= self._config["event_limit"]["by_rule"]["limit"]:
2081 rule_ids.append(rule_id)
2082 return rule_ids
2084 def is_overall_event_limit_active(self):
2085 return self._event_status.num_existing_events \
2086 >= self._config["event_limit"]["overall"]["limit"]
2088 # protected by self._event_status.lock
2089 def new_event_respecting_limits(self, event):
2090 self._logger.verbose(
2091 "Checking limit for message from %s (rule '%s')" % (event["host"], event["rule_id"]))
2093 with self._event_status.lock:
2094 if self._handle_event_limit("overall", event):
2095 return False
2097 if self._handle_event_limit("by_host", event):
2098 return False
2100 if self._handle_event_limit("by_rule", event):
2101 return False
2103 self._event_status.new_event(event)
2104 return True
2106 # The following actions can be configured:
2107 # stop Stop creating new events
2108 # stop_overflow Stop creating new events, create overflow event
2109 # stop_overflow_notify Stop creating new events, create overflow event, notify
2110 # delete_oldest Delete oldest event, create new event
2111 # protected by self._event_status.lock
2113 # Returns True if the event must not be created, False if the event
2114 # may be created and actions should be performed on it
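#
# A sketch of the (assumed) shape of the corresponding configuration, as read
# by _get_event_limit() below:
#
#   config["event_limit"] = {
#       "overall": {"limit": 10000, "action": "stop_overflow_notify"},
#       "by_host": {"limit": 50, "action": "stop_overflow"},
#       "by_rule": {"limit": 100, "action": "delete_oldest"},
#   }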
2115 def _handle_event_limit(self, ty, event):
2116 assert ty in ["overall", "by_rule", "by_host"]
2118 num_already_open = self._event_status.get_num_existing_events_by(ty, event)
2119 limit, action = self._get_event_limit(ty, event)
2120 self._logger.verbose(
2121 " Type: %s, already open events: %d, Limit: %d" % (ty, num_already_open, limit))
2123 # Limit not reached: add new event
2124 if num_already_open < limit:
2125 num_already_open += 1 # after adding this event
2127 # Limit even then still not reached: we are fine
2128 if num_already_open < limit:
2129 return False
2131 # Delete oldest messages if that is the configured method of keeping the limit
2132 if action == "delete_oldest":
2133 while num_already_open > limit:
2134 self._perfcounters.count("overflows")
2135 self._event_status.remove_oldest_event(ty, event)
2136 num_already_open -= 1
2137 return False
2139 # Limit reached already in the past: Simply drop silently
2140 if num_already_open > limit:
2141 # Just log in verbose mode! Otherwise the log file will be flooded
2142 self._logger.verbose(" Skip processing because limit is already in effect")
2143 self._perfcounters.count("overflows")
2144 return True # Prevent creation and prevent one time actions (below)
2146 self._logger.info(" The %s limit has been reached" % ty)
2148 # This is the event which reached the limit, allow creation of it. Further
2149 # events will be stopped.
2151 # Perform one time actions
2152 overflow_event = self._create_overflow_event(ty, event, limit)
2154 if "overflow" in action:
2155 self._logger.info(" Creating overflow event")
2156 self._event_status.new_event(overflow_event)
2158 if "notify" in action:
2159 self._logger.info(" Creating overflow notification")
2160 cmk.ec.actions.do_notify(self, self._logger, overflow_event)
2162 return False
2164 # protected by self._event_status.lock
2165 def _get_event_limit(self, ty, event):
2166 # Prefer the rule individual limit for by_rule limit (in case there is some)
2167 if ty == "by_rule":
2168 rule_limit = self._rule_by_id[event["rule_id"]].get("event_limit")
2169 if rule_limit:
2170 return rule_limit["limit"], rule_limit["action"]
2172 # Prefer the host individual limit for by_host limit (in case there is some)
2173 if ty == "by_host":
2174 host_config = self.host_config.get(event["core_host"], {})
2175 host_limit = host_config.get("custom_variables", {}).get("EC_EVENT_LIMIT")
2176 if host_limit:
2177 limit, action = host_limit.split(":", 1)
2178 return int(limit), action
2180 limit = self._config["event_limit"][ty]["limit"]
2181 action = self._config["event_limit"][ty]["action"]
2183 return limit, action
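# The EC_EVENT_LIMIT custom variable parsed above uses the format
# "<limit>:<action>", so a hypothetical value "50:delete_oldest" would
# yield (50, "delete_oldest").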
2185 def _create_overflow_event(self, ty, event, limit):
2186 now = time.time()
2187 new_event = {
2188 "rule_id": None,
2189 "phase": "open",
2190 "count": 1,
2191 "time": now,
2192 "first": now,
2193 "last": now,
2194 "comment": "",
2195 "host": "",
2196 "ipaddress": "",
2197 "application": "Event Console",
2198 "pid": 0,
2199 "priority": 2, # crit
2200 "facility": 1, # user
2201 "match_groups": (),
2202 "match_groups_syslog_application": (),
2203 "state": 2, # crit
2204 "sl": event["sl"],
2205 "core_host": "",
2206 "host_in_downtime": False,
2208 self._add_rule_contact_groups_to_event({}, new_event)
2210 if ty == "overall":
2211 new_event["text"] = ("The overall event limit of %d open events has been reached. Not "
2212 "opening any additional event until open events have been "
2213 "archived." % limit)
2215 elif ty == "by_host":
2216 new_event.update({
2217 "host": event["host"],
2218 "ipaddress": event["ipaddress"],
2219 "text": ("The host event limit of %d open events has been reached for host \"%s\". "
2220 "Not opening any additional event for this host until open events have "
2221 "been archived." % (limit, event["host"]))
2224 # Look up the monitoring core hosts and add the core host
2225 # name to the event when one can be matched
2226 self._add_core_host_to_new_event(new_event)
2228 elif ty == "by_rule":
2229 new_event.update({
2230 "rule_id": event["rule_id"],
2231 "contact_groups": event["contact_groups"],
2232 "contact_groups_notify": event.get("contact_groups_notify", False),
2233 "contact_groups_precedence": event.get("contact_groups_precedence", "host"),
2234 "text": ("The rule event limit of %d open events has been reached for rule \"%s\". "
2235 "Not opening any additional event for this rule until open events have "
2236 "been archived." % (limit, event["rule_id"]))
2239 else:
2240 raise NotImplementedError()
2242 return new_event
2245 class RuleMatcher(object):
2246 def __init__(self, logger, config):
2247 super(RuleMatcher, self).__init__()
2248 self._logger = logger
2249 self._config = config
2250 self._time_periods = TimePeriods(logger)
2252 @property
2253 def _debug_rules(self):
2254 return self._config["debug_rules"]
2256 def event_rule_matches_non_inverted(self, rule, event):
2257 if self._debug_rules:
2258 self._logger.info("Trying rule %s/%s..." % (rule["pack"], rule["id"]))
2259 self._logger.info(" Text: %s" % event["text"])
2260 self._logger.info(" Syslog: %d.%d" % (event["facility"], event["priority"]))
2261 self._logger.info(" Host: %s" % event["host"])
2263 # Generic conditions without positive/canceling matches
2264 if not self.event_rule_matches_generic(rule, event):
2265 return False
2267 # Determine syslog priority
2268 match_priority = {}
2269 if not self.event_rule_determine_match_priority(rule, event, match_priority):
2270 # Abort if there is neither a positive nor a canceling match
2271 return False
2273 # Determine and cleanup match_groups
2274 match_groups = {}
2275 if not self.event_rule_determine_match_groups(rule, event, match_groups):
2276 # Abort if there is neither a positive nor a canceling match
2277 return False
2279 return self._check_match_outcome(rule, match_groups, match_priority)
2281 def _check_match_outcome(self, rule, match_groups, match_priority):
2282 # type: (Dict[str, Any], Dict[str, Any], Dict[str, Any]) -> Union[bool, Tuple[bool, Dict[str, Any]]]
2283 """Decide or not a event is created, canceled or nothing is done"""
2285 # Check canceling-event
2286 has_canceling_condition = bool(
2287 [x for x in ["match_ok", "cancel_application", "cancel_priority"] if x in rule])
2288 if has_canceling_condition:
2289 if ("match_ok" not in rule or match_groups.get("match_groups_message_ok", False) is not False) and\
2290 ("cancel_application" not in rule or
2291 match_groups.get("match_groups_syslog_application_ok", False) is not False) and\
2292 ("cancel_priority" not in rule or match_priority["has_canceling_match"] is True):
2293 if self._debug_rules:
2294 self._logger.info(" found canceling event")
2295 return True, match_groups
2297 # Check create-event
2298 if match_groups["match_groups_message"] is not False and\
2299 match_groups.get("match_groups_syslog_application", ()) is not False and\
2300 match_priority["has_match"] is True:
2301 if self._debug_rules:
2302 self._logger.info(" found new event")
2303 return False, match_groups
2305 # Looks like there was no match, output some additional info
2306 # Reasons preventing create-event
2307 if self._debug_rules:
2308 if match_groups["match_groups_message"] is False:
2309 self._logger.info(" did not create event, because of wrong message")
2310 if "match_application" in rule and match_groups[
2311 "match_groups_syslog_application"] is False:
2312 self._logger.info(" did not create event, because of wrong syslog application")
2313 if "match_priority" in rule and match_priority["has_match"] is False:
2314 self._logger.info(" did not create event, because of wrong syslog priority")
2316 if has_canceling_condition:
2317 # Reasons preventing cancel-event
2318 if "match_ok" in rule and match_groups.get("match_groups_message_ok",
2319 False) is False:
2320 self._logger.info(" did not cancel event, because of wrong message")
2321 if "cancel_application" in rule and \
2322 match_groups.get("match_groups_syslog_application_ok", False) is False:
2323 self._logger.info(" did not cancel event, because of wrong syslog application")
2324 if "cancel_priority" in rule and match_priority["has_canceling_match"] is False:
2325 self._logger.info(" did not cancel event, because of wrong cancel priority")
2327 return False
2329 def event_rule_matches_generic(self, rule, event):
2330 generic_match_functions = [
2331 self.event_rule_matches_site,
2332 self.event_rule_matches_host,
2333 self.event_rule_matches_ip,
2334 self.event_rule_matches_facility,
2335 self.event_rule_matches_service_level,
2336 self.event_rule_matches_timeperiod,
2337 ]
2339 for match_function in generic_match_functions:
2340 if not match_function(rule, event):
2341 return False
2342 return True
2344 def event_rule_determine_match_priority(self, rule, event, match_priority):
2345 p = event["priority"]
2347 if "match_priority" in rule:
2348 prio_from, prio_to = rule["match_priority"]
2349 if prio_from > prio_to:
2350 prio_to, prio_from = prio_from, prio_to
2351 match_priority["has_match"] = prio_from <= p <= prio_to
2352 else:
2353 match_priority["has_match"] = True
2355 if "cancel_priority" in rule:
2356 cancel_from, cancel_to = rule["cancel_priority"]
2357 match_priority["has_canceling_match"] = cancel_from <= p <= cancel_to
2358 else:
2359 match_priority["has_canceling_match"] = False
2361 if match_priority["has_match"] is False and\
2362 match_priority["has_canceling_match"] is False:
2363 return False
2365 return True
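# Example (hypothetical): with rule = {"match_priority": (7, 4)} the bounds are
# normalized to 4..7 above, so an event with priority 5 ("notice") yields
# match_priority == {"has_match": True, "has_canceling_match": False}.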
2367 def event_rule_matches_site(self, rule, event):
2368 return "match_site" not in rule or cmk.omd_site() in rule["match_site"]
2370 def event_rule_matches_host(self, rule, event):
2371 if match(rule.get("match_host"), event["host"], complete=True) is False:
2372 if self._debug_rules:
2373 self._logger.info(" did not match because of wrong host '%s' (need '%s')" %
2374 (event["host"], format_pattern(rule.get("match_host"))))
2375 return False
2376 return True
2378 def event_rule_matches_ip(self, rule, event):
2379 if not match_ipv4_network(rule.get("match_ipaddress", "0.0.0.0/0"), event["ipaddress"]):
2380 if self._debug_rules:
2381 self._logger.info(
2382 " did not match because of wrong source IP address '%s' (need '%s')" %
2383 (event["ipaddress"], rule.get("match_ipaddress")))
2384 return False
2385 return True
2387 def event_rule_matches_facility(self, rule, event):
2388 if "match_facility" in rule and event["facility"] != rule["match_facility"]:
2389 if self._debug_rules:
2390 self._logger.info(" did not match because of wrong syslog facility")
2391 return False
2392 return True
2394 def event_rule_matches_service_level(self, rule, event):
2395 if "match_sl" in rule:
2396 sl_from, sl_to = rule["match_sl"]
2397 if sl_from > sl_to:
2398 sl_to, sl_from = sl_from, sl_to
2399 p = event.get("sl", 0)
2400 if p < sl_from or p > sl_to:
2401 if self._debug_rules:
2402 self._logger.info(
2403 " did not match because of wrong service level %d (need %d..%d)" %
2404 (p, sl_from, sl_to),)
2405 return False
2406 return True
2408 def event_rule_matches_timeperiod(self, rule, event):
2409 if "match_timeperiod" in rule and not self._time_periods.check(rule["match_timeperiod"]):
2410 if self._debug_rules:
2411 self._logger.info(" did not match, because timeperiod %s is not active" %
2412 rule["match_timeperiod"])
2413 return False
2414 return True
2416 def event_rule_determine_match_groups(self, rule, event, match_groups):
2417 match_group_functions = [
2418 self.event_rule_matches_syslog_application,
2419 self.event_rule_matches_message,
2420 ]
2421 for match_function in match_group_functions:
2422 if not match_function(rule, event, match_groups):
2423 return False
2424 return True
2426 def event_rule_matches_syslog_application(self, rule, event, match_groups):
2427 if "match_application" not in rule and "cancel_application" not in rule:
2428 return True
2430 # Syslog application
2431 if "match_application" in rule:
2432 match_groups["match_groups_syslog_application"] = match(
2433 rule.get("match_application"), event["application"], complete=False)
2435 # Syslog application canceling, this option must be explicitly set
2436 if "cancel_application" in rule:
2437 match_groups["match_groups_syslog_application_ok"] = match(
2438 rule.get("cancel_application"), event["application"], complete=False)
2440 # Detect impossible match
2441 if match_groups.get("match_groups_syslog_application", False) is False and\
2442 match_groups.get("match_groups_syslog_application_ok", False) is False:
2443 if self._debug_rules:
2444 self._logger.info(" did not match, syslog application does not match")
2445 return False
2447 return True
2449 def event_rule_matches_message(self, rule, event, match_groups):
2450 # Message matching, this condition is always active
2451 match_groups["match_groups_message"] = match(
2452 rule.get("match"), event["text"], complete=False)
2454 # Message canceling, this option must be explicitly set
2455 if "match_ok" in rule:
2456 match_groups["match_groups_message_ok"] = match(
2457 rule.get("match_ok"), event["text"], complete=False)
2459 # Detect impossible match
2460 if match_groups["match_groups_message"] is False and\
2461 match_groups.get("match_groups_message_ok", False) is False:
2462 if self._debug_rules:
2463 self._logger.info(" did not match, message text does not match")
2464 return False
2466 return True
2470 # .--Status Queries------------------------------------------------------.
2471 # | ____ _ _ ___ _ |
2472 # | / ___|| |_ __ _| |_ _ _ ___ / _ \ _ _ ___ _ __(_) ___ ___ |
2473 # | \___ \| __/ _` | __| | | / __| | | | | | | |/ _ \ '__| |/ _ \/ __| |
2474 # | ___) | || (_| | |_| |_| \__ \ | |_| | |_| | __/ | | | __/\__ \ |
2475 # | |____/ \__\__,_|\__|\__,_|___/ \__\_\\__,_|\___|_| |_|\___||___/ |
2476 # | |
2477 # +----------------------------------------------------------------------+
2478 # | Parsing and processing of status queries |
2479 # '----------------------------------------------------------------------'
2482 class Queries(object):
2483 def __init__(self, status_server, sock, logger):
2484 super(Queries, self).__init__()
2485 self._status_server = status_server
2486 self._socket = sock
2487 self._logger = logger
2488 self._buffer = ""
2490 def __iter__(self):
2491 return self
2493 def next(self):
2494 while True:
2495 parts = self._buffer.split("\n\n", 1)
2496 if len(parts) > 1:
2497 break
2498 data = self._socket.recv(4096)
2499 if not data:
2500 if len(self._buffer) == 0:
2501 raise StopIteration()
2502 parts = [self._buffer, ""]
2503 break
2504 self._buffer += data
2505 request, self._buffer = parts
2506 return Query.make(self._status_server, request.decode("utf-8").splitlines(), self._logger)
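# A hypothetical raw request as it arrives on the socket - header lines
# terminated by an empty line, hence the "\n\n" split above:
#
#   "GET events\nColumns: event_id event_text\nFilter: event_host = myhost\nOutputFormat: json\n\n"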
2509 class Query(object):
2510 @staticmethod
2511 def make(status_server, raw_query, logger):
2512 parts = raw_query[0].split(None, 1)
2513 if len(parts) != 2:
2514 raise MKClientError("Invalid query. Need GET/COMMAND plus argument(s)")
2515 method = parts[0]
2516 if method == "GET":
2517 return QueryGET(status_server, raw_query, logger)
2518 if method == "REPLICATE":
2519 return QueryREPLICATE(status_server, raw_query, logger)
2520 if method == "COMMAND":
2521 return QueryCOMMAND(status_server, raw_query, logger)
2522 raise MKClientError("Invalid method %s (allowed are GET, REPLICATE, COMMAND)" % method)
2524 def __init__(self, status_server, raw_query, logger):
2525 super(Query, self).__init__()
2527 self._logger = logger
2528 self.output_format = "python"
2530 self._raw_query = raw_query
2531 self._from_raw_query(status_server)
2533 def _from_raw_query(self, status_server):
2534 self._parse_method_and_args()
2536 def _parse_method_and_args(self):
2537 parts = self._raw_query[0].split(None, 1)
2538 if len(parts) != 2:
2539 raise MKClientError("Invalid query. Need GET/COMMAND plus argument(s)")
2541 self.method, self.method_arg = parts
2543 def __repr__(self):
2544 return repr("\n".join(self._raw_query))
2547 class QueryGET(Query):
2548 _filter_operators = {
2549 "=": (lambda a, b: a == b),
2550 ">": (lambda a, b: a > b),
2551 "<": (lambda a, b: a < b),
2552 ">=": (lambda a, b: a >= b),
2553 "<=": (lambda a, b: a <= b),
2554 "~": (lambda a, b: cmk.utils.regex.regex(b).search(a)),
2555 "=~": (lambda a, b: a.lower() == b.lower()),
2556 "~~": (lambda a, b: cmk.utils.regex.regex(b.lower()).search(a.lower())),
2557 "in": (lambda a, b: a in b),
2560 def _from_raw_query(self, status_server):
2561 super(QueryGET, self)._from_raw_query(status_server)
2562 self._parse_table(status_server)
2563 self._parse_header_lines()
2565 def _parse_table(self, status_server):
2566 self.table_name = self.method_arg
2567 self.table = status_server.table(self.table_name)
2569 def _parse_header_lines(self):
2570 self.requested_columns = self.table.column_names # use all columns as default
2571 self.filters = []
2572 self.limit = None
2573 self.only_host = None
2575 self.header_lines = []
2576 for line in self._raw_query[1:]:
2577 try:
2578 header, argument = line.rstrip("\n").split(":", 1)
2579 argument = argument.lstrip(" ")
2581 if header == "OutputFormat":
2582 if argument not in ["python", "plain", "json"]:
2583 raise MKClientError(
2584 "Invalid output format \"%s\" (allowed are: python, plain, json)" %
2585 argument)
2587 self.output_format = argument
2589 elif header == "Columns":
2590 self.requested_columns = argument.split(" ")
2592 elif header == "Filter":
2593 column_name, operator_name, predicate, argument = self._parse_filter(argument)
2595 # Needed for later optimization (check_mkevents)
2596 if column_name == "event_host" and operator_name == 'in':
2597 self.only_host = set(argument)
2599 self.filters.append((column_name, operator_name, predicate, argument))
2601 elif header == "Limit":
2602 self.limit = int(argument)
2604 else:
2605 self._logger.info("Ignoring not-implemented header %s" % header)
2607 except Exception as e:
2608 raise MKClientError("Invalid header line '%s': %s" % (line.rstrip(), e))
2610 def _parse_filter(self, textspec):
2611 # Examples:
2612 # id = 17
2613 # name ~ This is some .* text
2614 # host_name =
2615 parts = textspec.split(None, 2)
2616 if len(parts) == 2:
2617 parts.append("")
2618 column, operator_name, argument = parts
2620 try:
2621 convert = self.table.column_types[column]
2622 except KeyError:
2623 raise MKClientError(
2624 "Unknown column: %s (Available are: %s)" % (column, self.table.column_names))
2626 # TODO: BUG: The query is decoded to unicode after receiving it from
2627 # the socket. The columns with type str (initialized with "") will apply
2628 # str(argument) here and convert the value back to str! This will crash
2629 # when the filter contains non-ASCII characters!
2630 # Fix this by making the default values unicode and skipping the unicode
2631 # conversion here (for performance reasons) because argument is already unicode.
2632 if operator_name == 'in':
2633 argument = map(convert, argument.split())
2634 else:
2635 argument = convert(argument)
2637 operator_function = self._filter_operators.get(operator_name)
2638 if not operator_function:
2639 raise MKClientError("Unknown filter operator '%s'" % operator_name)
2641 return (column, operator_name, lambda x: operator_function(x, argument), argument)
2643 def requested_column_indexes(self):
2644 indexes = []
2646 for column_name in self.requested_columns:
2647 try:
2648 column_index = self.table.column_indices[column_name]
2649 except KeyError:
2650 # The column is not known: Use None as index and None value later
2651 column_index = None
2652 indexes.append(column_index)
2654 return indexes
2656 def filter_row(self, row):
2657 for column_name, _operator_name, predicate, _argument in self.filters:
2658 if not predicate(row[self.table.column_indices[column_name]]):
2659 return None
2660 return row
2663 class QueryREPLICATE(Query):
2664 pass
2667 class QueryCOMMAND(Query):
2668 pass
2672 # .--Status Tables-------------------------------------------------------.
2673 # | ____ _ _ _____ _ _ |
2674 # | / ___|| |_ __ _| |_ _ _ ___ |_ _|_ _| |__ | | ___ ___ |
2675 # | \___ \| __/ _` | __| | | / __| | |/ _` | '_ \| |/ _ \/ __| |
2676 # | ___) | || (_| | |_| |_| \__ \ | | (_| | |_) | | __/\__ \ |
2677 # | |____/ \__\__,_|\__|\__,_|___/ |_|\__,_|_.__/|_|\___||___/ |
2678 # | |
2679 # +----------------------------------------------------------------------+
2680 # | Definitions of the tables available for status queries |
2681 # '----------------------------------------------------------------------'
2682 # If you need a new column here, then these are the places to change:
2683 # bin/mkeventd:
2684 # - add column to the end of StatusTableEvents.columns
2685 # - add column to grepping_filters if it is a str column
2686 # - deal with convert_history_line() (if not a str column)
2687 # - make sure that the new column is filled at *every* place where
2688 # an event is being created:
2689 # * _create_event_from_trap()
2690 # * create_event_from_line()
2691 # * _handle_absent_event()
2692 # * _create_overflow_event()
2693 # - When loading the status file add the possibly missing column to all
2694 # loaded events (load_status())
2695 # - Maybe add matching/rewriting for the new column
2696 # - write the actual code using the new column
2697 # web:
2698 # - Add column painter for the new column
2699 # - Create a sorter
2700 # - Create a filter
2701 # - Add painter and filter to all views where appropriate
2702 # - maybe add WATO code for matching/rewriting
2703 # - do not forget event_rule_matches() in web!
2704 # - maybe add a field into the event simulator
2707 class StatusTable(object):
2708 prefix = None # type: Optional[str]
2709 columns = [] # type: List[Tuple[str, Any]]
2711 # Must return an enumerable type containing fully populated lists (rows) matching the
2712 # columns of the table
2713 @abc.abstractmethod
2714 def _enumerate(self, query):
2715 raise NotImplementedError()
2717 def __init__(self, logger):
2718 super(StatusTable, self).__init__()
2719 self._logger = logger.getChild("status_table.%s" % self.prefix)
2720 self._populate_column_views()
2722 def _populate_column_views(self):
2723 self.column_names = [c[0] for c in self.columns]
2724 self.column_defaults = dict(self.columns)
2726 self.column_types = {}
2727 for name, def_val in self.columns:
2728 self.column_types[name] = type(def_val)
2730 self.column_indices = dict([(name, index) for index, name in enumerate(self.column_names)])
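# Sketch of the derived views for a hypothetical columns list
# [("event_id", 1), ("event_text", "")]:
#   column_names    = ["event_id", "event_text"]
#   column_defaults = {"event_id": 1, "event_text": ""}
#   column_types    = {"event_id": int, "event_text": str}
#   column_indices  = {"event_id": 0, "event_text": 1}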
2732 def query(self, query):
2733 requested_column_indexes = query.requested_column_indexes()
2735 # Output the column headers
2736 # TODO: Add support for ColumnHeaders like in livestatus?
2737 yield query.requested_columns
2739 num_rows = 0
2740 for row in self._enumerate(query):
2741 if query.limit is not None and num_rows >= query.limit:
2742 break # The maximum number of rows has been reached
2744 # Apply filters
2745 # TODO: History filtering is done in history load code. Check for improvements
2746 if query.filters and query.table_name != "history":
2747 matched = query.filter_row(row)
2748 if not matched:
2749 continue
2751 yield self._build_result_row(row, requested_column_indexes)
2752 num_rows += 1
2754 def _build_result_row(self, row, requested_column_indexes):
2755 result_row = []
2756 for index in requested_column_indexes:
2757 if index is None:
2758 result_row.append(None)
2759 else:
2760 result_row.append(row[index])
2761 return result_row
2764 class StatusTableEvents(StatusTable):
2765 prefix = "event"
2766 columns = [
2767 ("event_id", 1),
2768 ("event_count", 1),
2769 ("event_text", ""),
2770 ("event_first", 0.0),
2771 ("event_last", 0.0),
2772 ("event_comment", ""),
2773 ("event_sl", 0), # filter fehlt
2774 ("event_host", ""),
2775 ("event_contact", ""),
2776 ("event_application", ""),
2777 ("event_pid", 0),
2778 ("event_priority", 5),
2779 ("event_facility", 1),
2780 ("event_rule_id", ""),
2781 ("event_state", 0),
2782 ("event_phase", ""),
2783 ("event_owner", ""),
2784 ("event_match_groups", ""), # last column up to 1.2.4
2785 ("event_contact_groups", ""), # introduced in 1.2.5i2
2786 ("event_ipaddress", ""), # introduced in 1.2.7i1
2787 ("event_orig_host", ""), # introduced in 1.4.0b1
2788 ("event_contact_groups_precedence", "host"), # introduced in 1.4.0b1
2789 ("event_core_host", ""), # introduced in 1.5.0i1
2790 ("event_host_in_downtime", False), # introduced in 1.5.0i1
2791 ("event_match_groups_syslog_application", ""), # introduced in 1.5.0i2
2794 def __init__(self, logger, event_status):
2795 super(StatusTableEvents, self).__init__(logger)
2796 self._event_status = event_status
2798 def _enumerate(self, query):
2799 for event in self._event_status.get_events():
2800 # Optimize filters that are set by the check_mkevents active check. Since users
2801 # may have a lot of those checks running, it is a good idea to optimize this.
2802 if query.only_host and event["host"] not in query.only_host:
2803 continue
2805 row = []
2806 for column_name in self.column_names:
2807 try:
2808 row.append(event[column_name[6:]])
2809 except KeyError:
2810 # The row does not have this value. Use the column's default value
2811 row.append(self.column_defaults[column_name])
2813 yield row
2816 class StatusTableHistory(StatusTable):
2817 prefix = "history"
2818 columns = [
2819 ("history_line", 0), # Line number in event history file
2820 ("history_time", 0.0),
2821 ("history_what", ""),
2822 ("history_who", ""),
2823 ("history_addinfo", ""),
2824 ] + StatusTableEvents.columns
2826 def __init__(self, logger, history):
2827 super(StatusTableHistory, self).__init__(logger)
2828 self._history = history
2830 def _enumerate(self, query):
2831 return self._history.get(query)
2834 class StatusTableRules(StatusTable):
2835 prefix = "rule"
2836 columns = [
2837 ("rule_id", ""),
2838 ("rule_hits", 0),
2841 def __init__(self, logger, event_status):
2842 super(StatusTableRules, self).__init__(logger)
2843 self._event_status = event_status
2845 def _enumerate(self, query):
2846 return self._event_status.get_rule_stats()
2849 class StatusTableStatus(StatusTable):
2850 prefix = "status"
2851 columns = EventServer.status_columns()
2853 def __init__(self, logger, event_server):
2854 super(StatusTableStatus, self).__init__(logger)
2855 self._event_server = event_server
2857 def _enumerate(self, query):
2858 return self._event_server.get_status()
2862 # .--StatusServer--------------------------------------------------------.
2863 # | ____ _ _ ____ |
2864 # | / ___|| |_ __ _| |_ _ _ ___/ ___| ___ _ ____ _____ _ __ |
2865 # | \___ \| __/ _` | __| | | / __\___ \ / _ \ '__\ \ / / _ \ '__| |
2866 # | ___) | || (_| | |_| |_| \__ \___) | __/ | \ V / __/ | |
2867 # | |____/ \__\__,_|\__|\__,_|___/____/ \___|_| \_/ \___|_| |
2868 # | |
2869 # +----------------------------------------------------------------------+
2870 # | Answering status and command queries via the UNIX socket            |
2871 # '----------------------------------------------------------------------'
2874 class StatusServer(ECServerThread):
2875 def __init__(self, logger, settings, config, slave_status, perfcounters, lock_configuration,
2876 history, event_status, event_server, terminate_main_event):
2877 super(StatusServer, self).__init__(
2878 name="StatusServer",
2879 logger=logger,
2880 settings=settings,
2881 config=config,
2882 slave_status=slave_status,
2883 profiling_enabled=settings.options.profile_status,
2884 profile_file=settings.paths.status_server_profile.value)
2885 self._socket = None
2886 self._tcp_socket = None
2887 self._reopen_sockets = False
2889 self._table_events = StatusTableEvents(logger, event_status)
2890 self._table_history = StatusTableHistory(logger, history)
2891 self._table_rules = StatusTableRules(logger, event_status)
2892 self._table_status = StatusTableStatus(logger, event_server)
2893 self._perfcounters = perfcounters
2894 self._lock_configuration = lock_configuration
2895 self._history = history
2896 self._event_status = event_status
2897 self._event_server = event_server
2898 self._event_columns = StatusTableEvents.columns
2899 self._terminate_main_event = terminate_main_event
2901 self.open_unix_socket()
2902 self.open_tcp_socket()
2904 def table(self, name):
2905 if name == "events":
2906 return self._table_events
2907 if name == "history":
2908 return self._table_history
2909 if name == "rules":
2910 return self._table_rules
2911 if name == "status":
2912 return self._table_status
2913 raise MKClientError(
2914 "Invalid table: %s (allowed are: events, history, rules, status)" % name)
2916 def open_unix_socket(self):
2917 path = self.settings.paths.unix_socket.value
2918 if path.exists():
2919 path.unlink()
2920 path.parent.mkdir(parents=True, exist_ok=True)
2921 self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
2922 self._socket.bind(str(path))
2923 # Make sure that socket is group writable
2924 path.chmod(0o664)
2925 self._socket.listen(self._config['socket_queue_len'])
2926 self._unix_socket_queue_len = self._config['socket_queue_len'] # detect changes in config
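# A minimal, illustrative client for this socket, speaking the query protocol
# implemented by the Queries/Query classes above (the socket path is an
# assumption based on the settings; adjust to your site):
#
#   s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
#   s.connect("%s/tmp/run/mkeventd/status" % os.environ["OMD_ROOT"])
#   s.sendall("GET status\nOutputFormat: json\n\n")
#   print(s.recv(4096))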
2928 def open_tcp_socket(self):
2929 if self._config["remote_status"]:
2930 try:
2931 self._tcp_port, self._tcp_allow_commands = self._config["remote_status"][:2]
2932 try:
2933 self._tcp_access_list = self._config["remote_status"][2]
2934 except Exception:
2935 self._tcp_access_list = None
2937 self._tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2938 self._tcp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
2939 self._tcp_socket.bind(("0.0.0.0", self._tcp_port))
2940 self._tcp_socket.listen(self._config['socket_queue_len'])
2941 self._logger.info(
2942 "Going to listen for status queries on TCP port %d" % self._tcp_port)
2943 except Exception as e:
2944 if self.settings.options.debug:
2945 raise
2946 self._logger.exception(
2947 "Cannot listen on TCP socket port %d: %s" % (self._tcp_port, e))
2948 else:
2949 self._tcp_socket = None
2950 self._tcp_port = 0
2951 self._tcp_allow_commands = False
2952 self._tcp_access_list = None
2954 def close_unix_socket(self):
2955 if self._socket:
2956 self._socket.close()
2957 self._socket = None
2959 def close_tcp_socket(self):
2960 if self._tcp_socket:
2961 self._tcp_socket.close()
2962 self._tcp_socket = None
2964 def reopen_sockets(self):
2965 if self._unix_socket_queue_len != self._config["socket_queue_len"]:
2966 self._logger.info("socket_queue_len has changed. Reopening UNIX socket.")
2967 self.close_unix_socket()
2968 self.open_unix_socket()
2970 self.close_tcp_socket()
2971 self.open_tcp_socket()
2973 def reload_configuration(self, config):
2974 self._config = config
2975 self._reopen_sockets = True
2977 def serve(self):
2978 while not self._terminate_event.is_set():
2979 try:
2980 client_socket = None
2981 addr_info = None
2983 if self._reopen_sockets:
2984 self.reopen_sockets()
2985 self._reopen_sockets = False
2987 listen_list = [self._socket]
2988 if self._tcp_socket:
2989 listen_list.append(self._tcp_socket)
2991 try:
2992 readable = select.select(listen_list, [], [], 0.2)[0]
2993 except select.error as e:
2994 if e[0] == errno.EINTR:
2995 continue
2996 raise
2998 for s in readable:
2999 client_socket, addr_info = s.accept()
3000 client_socket.settimeout(3)
3001 before = time.time()
3002 self._perfcounters.count("connects")
3003 if addr_info:
3004 allow_commands = self._tcp_allow_commands
3005 if self.settings.options.debug:
3006 self._logger.info("Handle status connection from %s:%d" % addr_info)
3007 if self._tcp_access_list is not None and addr_info[0] not in \
3008 self._tcp_access_list:
3009 client_socket.close()
3010 client_socket = None
3011 self._logger.info(
3012 "Denying access to status socket from %s (allowed is only %s)" %
3013 (addr_info[0], ", ".join(self._tcp_access_list)))
3014 continue
3015 else:
3016 allow_commands = True
3018 self.handle_client(client_socket, allow_commands, addr_info and addr_info[0] or
3019 "")
3021 duration = time.time() - before
3022 self._logger.verbose("Answered request in %0.2f ms" % (duration * 1000))
3023 self._perfcounters.count_time("request", duration)
3025 except Exception as e:
3026 msg = "Error handling client %s: %s" % (addr_info, e)
3027 # Do not log a stack trace for client errors, they are not *our* fault.
3028 if isinstance(e, MKClientError):
3029 self._logger.error(msg)
3030 else:
3031 self._logger.exception(msg)
3032 if client_socket:
3033 client_socket.close()
3034 client_socket = None
3035 time.sleep(0.2)
3036 client_socket = None # close without danger of exception
3038 def handle_client(self, client_socket, allow_commands, client_ip):
3039 for query in Queries(self, client_socket, self._logger):
3040 self._logger.verbose("Client livestatus query: %r" % query)
3042 with self._event_status.lock:
3043 if query.method == "GET":
3044 response = self.table(query.table_name).query(query)
3046 elif query.method == "REPLICATE":
3047 response = self.handle_replicate(query.method_arg, client_ip)
3049 elif query.method == "COMMAND":
3050 if not allow_commands:
3051 raise MKClientError("Sorry. Commands are disallowed via TCP")
3052 self.handle_command_request(query.method_arg)
3053 response = None
3055 else:
3056 raise NotImplementedError()
3058 try:
3059 self._answer_query(client_socket, query, response)
3060 except socket.error as e:
3061 if e.errno == 32: # Broken pipe -> ignore this
3062 pass
3063 else:
3064 raise
3066 client_socket.close()
3068 # Only GET queries have customizable output formats. REPLICATE responses are
3069 # always a dictionary, COMMAND responses are always None; both are output as "python"
3070 def _answer_query(self, client_socket, query, response):
3071 if query.method != "GET":
3072 self._answer_query_python(client_socket, response)
3073 return
3075 if query.output_format == "plain":
3076 for row in response:
3077 client_socket.sendall("\t".join([cmk.ec.history.quote_tab(c) for c in row]) + "\n")
3079 elif query.output_format == "json":
3080 client_socket.sendall(json.dumps(list(response)) + "\n")
3082 elif query.output_format == "python":
3083 self._answer_query_python(client_socket, list(response))
3085 else:
3086 raise NotImplementedError()
3088 def _answer_query_python(self, client_socket, response):
3089 client_socket.sendall(repr(response) + "\n")
3091 # All commands are already executed under self._event_status.lock
3092 def handle_command_request(self, commandline):
3093 self._logger.info("Executing command: %s" % commandline)
3094 parts = commandline.split(";")
3095 command = parts[0]
3096 replication_allow_command(self._config, command, self._slave_status)
3097 arguments = parts[1:]
3098 if command == "DELETE":
3099 self.handle_command_delete(arguments)
3100 elif command == "RELOAD":
3101 self.handle_command_reload()
3102 elif command == "SHUTDOWN":
3103 self._logger.info("Going to shut down")
3104 terminate(self._terminate_main_event, self._event_server, self)
3105 elif command == "REOPENLOG":
3106 self.handle_command_reopenlog()
3107 elif command == "FLUSH":
3108 self.handle_command_flush()
3109 elif command == "SYNC":
3110 self.handle_command_sync()
3111 elif command == "RESETCOUNTERS":
3112 self.handle_command_resetcounters(arguments)
3113 elif command == "UPDATE":
3114 self.handle_command_update(arguments)
3115 elif command == "CREATE":
3116 self.handle_command_create(arguments)
3117 elif command == "CHANGESTATE":
3118 self.handle_command_changestate(arguments)
3119 elif command == "ACTION":
3120 self.handle_command_action(arguments)
3121 elif command == "SWITCHMODE":
3122 self.handle_command_switchmode(arguments)
3123 else:
3124 raise MKClientError("Unknown command %s" % command)
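# Illustrative command lines as dispatched above (fields are ';'-separated,
# IDs and users are hypothetical):
#   "DELETE;4711;cmkadmin"                        -> handle_command_delete
#   "UPDATE;4711;cmkadmin;1;Looked into it;John"  -> handle_command_update
#   "CHANGESTATE;4711;cmkadmin;2"                 -> handle_command_changestate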
3126 def handle_command_delete(self, arguments):
3127 if len(arguments) != 2:
3128 raise MKClientError("Wrong number of arguments for DELETE")
3129 event_id, user = arguments
3130 self._event_status.delete_event(int(event_id), user)
3132 def handle_command_update(self, arguments):
3133 event_id, user, acknowledged, comment, contact = arguments
3134 event = self._event_status.event(int(event_id))
3135 if not event:
3136 raise MKClientError("No event with id %s" % event_id)
3137 # Note the common practice: We validate parameters *before* doing any changes.
3138 if acknowledged:
3139 ack = int(acknowledged)
3140 if ack and event["phase"] not in ["open", "ack"]:
3141 raise MKClientError("You cannot acknowledge an event that is not open.")
3142 event["phase"] = "ack" if ack else "open"
3143 if comment:
3144 event["comment"] = comment
3145 if contact:
3146 event["contact"] = contact
3147 if user:
3148 event["owner"] = user
3149 self._history.add(event, "UPDATE", user)
3151 def handle_command_create(self, arguments):
3152 # Would rather use process_raw_line(), but we are already
3153 # holding self._event_status.lock and its sub functions are taking
3154 # self._event_status.lock too. The lock cannot be acquired twice.
3155 # TODO: Change the lock type in future?
3156 # process_raw_lines("%s" % ";".join(arguments))
3157 with file(str(self.settings.paths.event_pipe.value), "w") as pipe:
3158 pipe.write(("%s\n" % ";".join(arguments)).encode("utf-8"))
3160 def handle_command_changestate(self, arguments):
3161 event_id, user, newstate = arguments
3162 event = self._event_status.event(int(event_id))
3163 if not event:
3164 raise MKClientError("No event with id %s" % event_id)
3165 event["state"] = int(newstate)
3166 if user:
3167 event["owner"] = user
3168 self._history.add(event, "CHANGESTATE", user)
3170 def handle_command_reload(self):
3171 reload_configuration(self.settings, self._logger, self._lock_configuration, self._history,
3172 self._event_status, self._event_server, self, self._slave_status)
3174 def handle_command_reopenlog(self):
3175 self._logger.info("Closing this logfile")
3176 cmk.utils.log.open_log(str(self.settings.paths.log_file.value))
3177 self._logger.info("Opened new logfile")
3179 # Erase our current state and history!
3180 def handle_command_flush(self):
3181 self._history.flush()
3182 self._event_status.flush()
3183 self._event_status.save_status()
3184 if is_replication_slave(self._config):
3185 try:
3186 self.settings.paths.master_config_file.value.unlink()
3187 self.settings.paths.slave_status_file.value.unlink()
3188 update_slave_status(self._slave_status, self.settings, self._config)
3189 except Exception:
3190 pass
3191 self._logger.info("Flushed current status and historic events.")
3193 def handle_command_sync(self):
3194 self._event_status.save_status()
3196 def handle_command_resetcounters(self, arguments):
3197 if arguments:
3198 rule_id = arguments[0]
3199 self._logger.info("Resetting counters of rule " + rule_id)
3200 else:
3201 rule_id = None # Reset all rule counters
3202 self._logger.info("Resetting all rule counters")
3203 self._event_status.reset_counters(rule_id)
3205 def handle_command_action(self, arguments):
3206 event_id, user, action_id = arguments
3207 event = self._event_status.event(int(event_id))
3208 if user:
3209 event["owner"] = user
3211 if action_id == "@NOTIFY":
3212 cmk.ec.actions.do_notify(
3213 self._event_server, self._logger, event, user, is_cancelling=False)
3214 else:
3215 with self._lock_configuration:
3216 if action_id not in self._config["action"]:
3217 raise MKClientError(
3218 "The action '%s' is not defined. After adding new commands please "
3219 "make sure that you activate the changes in the Event Console." % action_id)
3220 action = self._config["action"][action_id]
3221 cmk.ec.actions.do_event_action(self._history, self.settings, self._config, self._logger,
3222 self._event_columns, action, event, user)
3224 def handle_command_switchmode(self, arguments):
3225 new_mode = arguments[0]
3226 if not is_replication_slave(self._config):
3227 raise MKClientError("Cannot switch replication mode: this is not a replication slave.")
3228 elif new_mode not in ["sync", "takeover"]:
3229 raise MKClientError(
3230 "Invalid target mode '%s': allowed are only 'sync' and 'takeover'" % new_mode)
3231 self._slave_status["mode"] = new_mode
3232 save_slave_status(self.settings, self._slave_status)
3233 self._logger.info("Switched replication mode to '%s' by external command." % new_mode)
3235 def handle_replicate(self, argument, client_ip):
3236 # Last time our slave got a config update
3237 try:
3238 last_update = int(argument)
3239 if self.settings.options.debug:
3240 self._logger.info("Replication: sync request from %s, last update %d seconds ago" %
3241 (client_ip, time.time() - last_update))
3243 except Exception:
3244 raise MKClientError("Invalid arguments to command REPLICATE")
3245 return replication_send(self._config, self._lock_configuration, self._event_status,
3246 last_update)
3250 # .--Dispatching---------------------------------------------------------.
3251 # | ____ _ _ _ _ |
3252 # | | _ \(_)___ _ __ __ _| |_ ___| |__ (_)_ __ __ _ |
3253 # | | | | | / __| '_ \ / _` | __/ __| '_ \| | '_ \ / _` | |
3254 # | | |_| | \__ \ |_) | (_| | || (__| | | | | | | | (_| | |
3255 # | |____/|_|___/ .__/ \__,_|\__\___|_| |_|_|_| |_|\__, | |
3256 # | |_| |___/ |
3257 # +----------------------------------------------------------------------+
3258 # | Starting and managing the two server threads.                       |
3259 # '----------------------------------------------------------------------'
3262 def run_eventd(terminate_main_event, settings, config, lock_configuration, history, perfcounters,
3263 event_status, event_server, status_server, slave_status, logger):
3264 status_server.start()
3265 event_server.start()
3266 now = time.time()
3267 next_housekeeping = now + config["housekeeping_interval"]
3268 next_retention = now + config["retention_interval"]
3269 next_statistics = now + config["statistics_interval"]
3270 next_replication = 0 # force immediate replication after restart
3272 while not terminate_main_event.is_set():
3273 try:
3274 try:
3275 # Wait until either housekeeping or retention is due, but at
3276 # most 60 seconds. That way changes of the interval from a very
3277 # high to a low value will never take more than 60 seconds to apply
3279 event_list = [next_housekeeping, next_retention, next_statistics]
3280 if is_replication_slave(config):
3281 event_list.append(next_replication)
3283 time_left = max(0, min(event_list) - time.time())
3284 time.sleep(min(time_left, 60))
3286 now = time.time()
3287 if now > next_housekeeping:
3288 event_server.do_housekeeping()
3289 next_housekeeping = now + config["housekeeping_interval"]
3291 if now > next_retention:
3292 with event_status.lock:
3293 event_status.save_status()
3294 next_retention = now + config["retention_interval"]
3296 if now > next_statistics:
3297 perfcounters.do_statistics()
3298 next_statistics = now + config["statistics_interval"]
3300 # Beware: replication might be turned on during this loop!
3301 if is_replication_slave(config) and now > next_replication:
3302 replication_pull(settings, config, lock_configuration, perfcounters,
3303 event_status, event_server, slave_status, logger)
3304 next_replication = now + config["replication"]["interval"]
3305 except MKSignalException:
3306 raise
3307 except Exception as e:
3308 logger.exception("Exception in main thread:\n%s" % e)
3309 if settings.options.debug:
3310 raise
3311 time.sleep(1)
3312 except MKSignalException as e:
3313 if e._signum == signal.SIGHUP:
3314 logger.info("Received SIGHUP - going to reload configuration")
3315 reload_configuration(settings, logger, lock_configuration, history, event_status,
3316 event_server, status_server, slave_status)
3317 else:
3318 logger.info("Signalled to death by signal %d" % e._signum)
3319 terminate(terminate_main_event, event_server, status_server)
3321 # Now wait for termination of the server threads
3322 event_server.join()
3323 status_server.join()
3327 # .--EventStatus---------------------------------------------------------.
3328 # | _____ _ ____ _ _ |
3329 # | | ____|_ _____ _ __ | |_/ ___|| |_ __ _| |_ _ _ ___ |
3330 # | | _| \ \ / / _ \ '_ \| __\___ \| __/ _` | __| | | / __| |
3331 # | | |___ \ V / __/ | | | |_ ___) | || (_| | |_| |_| \__ \ |
3332 # | |_____| \_/ \___|_| |_|\__|____/ \__\__,_|\__|\__,_|___/ |
3333 # | |
3334 # +----------------------------------------------------------------------+
3335 # | Keeps the current event status available. It protects itself        |
3336 # | against concurrent access from the threads with a lock.             |
3337 # '----------------------------------------------------------------------'
3340 class EventStatus(object):
3341 def __init__(self, settings, config, perfcounters, history, logger):
3342 self.settings = settings
3343 self._config = config
3344 self._perfcounters = perfcounters
3345 self.lock = threading.Lock()
3346 self._history = history
3347 self._logger = logger
3348 self.flush()
3350 def reload_configuration(self, config):
3351 self._config = config
3353 def flush(self):
3354 self._events = []
3355 self._next_event_id = 1
3356 self._rule_stats = {}
3357 self._interval_starts = {} # needed for expecting rules
3358 self._initialize_event_limit_status()
3360 # TODO: might introduce some performance counters, like:
3361 # - number of received messages
3362 # - number of rule hits
3363 # - number of rule misses
3365 def events(self):
3366 return self._events
3368 def event(self, eid):
3369 for event in self._events:
3370 if event["id"] == eid:
3371 return event
3373 # Return the beginning of the current expectation interval. For new
3374 # rules we start with the next interval in the future.
3375 def interval_start(self, rule_id, interval):
3376 if rule_id not in self._interval_starts:
3377 start = self.next_interval_start(interval, time.time())
3378 self._interval_starts[rule_id] = start
3379 return start
3380 else:
3381 start = self._interval_starts[rule_id]
3382 # Make sure that if the user switches from day to hour while we
3383 # are still waiting for the first interval to begin, we do not
3384 # wait until the next day.
3385 next_interval = self.next_interval_start(interval, time.time())
3386 if start > next_interval:
3387 start = next_interval
3388 self._interval_starts[rule_id] = start
3389 return start
3391 def next_interval_start(self, interval, previous_start):
3392 if isinstance(interval, tuple):
3393 length, offset = interval
3394 offset *= 3600
3395 else:
3396 length = interval
3397 offset = 0
3399 previous_start -= offset # take into account timezone offset
3400 full_parts = divmod(previous_start, length)[0]
3401 next_start = (full_parts + 1) * length
3402 next_start += offset
3403 return next_start
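# A worked example with assumed values: for interval = (86400, 2), i.e.
# day-long periods shifted by two hours, and previous_start = 1000000,
# the offset is 7200 seconds, divmod(992800, 86400)[0] is 11, and the
# next interval therefore starts at 12 * 86400 + 7200 = 1044000.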
3405 def start_next_interval(self, rule_id, interval):
3406 current_start = self.interval_start(rule_id, interval)
3407 next_start = self.next_interval_start(interval, current_start)
3408 self._interval_starts[rule_id] = next_start
3409 self._logger.debug("Rule %s: next interval starts %s (i.e. now + %.2f sec)" %
3410 (rule_id, next_start, next_start - time.time()))
3412 def pack_status(self):
3413 return {
3414 "next_event_id": self._next_event_id,
3415 "events": self._events,
3416 "rule_stats": self._rule_stats,
3417 "interval_starts": self._interval_starts,
3420 def unpack_status(self, status):
3421 self._next_event_id = status["next_event_id"]
3422 self._events = status["events"]
3423 self._rule_stats = status["rule_stats"]
3424 self._interval_starts = status["interval_starts"]
3426 def save_status(self):
3427 now = time.time()
3428 status = self.pack_status()
3429 path = self.settings.paths.status_file.value
3430 path_new = path.parent / (path.name + '.new')
3431 # Believe it or not: cPickle is more than two times slower than repr()
3432 with path_new.open(mode='wb') as f:
3433 f.write(repr(status) + "\n")
3434 f.flush()
3435 os.fsync(f.fileno())
3436 path_new.rename(path)
3437 elapsed = time.time() - now
3438 self._logger.verbose("Saved event state to %s in %.3fms." % (path, elapsed * 1000))
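# Format note: the status is written as a Python literal via repr() and
# read back in load_status() with ast.literal_eval(), so the file
# round-trips without pickle, e.g. "{'next_event_id': 1, 'events': [],
# 'rule_stats': {}, 'interval_starts': {}}".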
3440 def reset_counters(self, rule_id):
3441 if rule_id:
3442 if rule_id in self._rule_stats:
3443 del self._rule_stats[rule_id]
3444 else:
3445 self._rule_stats = {}
3446 self.save_status()
3448 def load_status(self, event_server):
3449 path = self.settings.paths.status_file.value
3450 if path.exists():
3451 try:
3452 status = ast.literal_eval(path.read_bytes())
3453 self._next_event_id = status["next_event_id"]
3454 self._events = status["events"]
3455 self._rule_stats = status["rule_stats"]
3456 self._interval_starts = status.get("interval_starts", {})
3457 self._initialize_event_limit_status()
3458 self._logger.info("Loaded event state from %s." % path)
3459 except Exception as e:
3460 self._logger.exception("Error loading event state from %s: %s" % (path, e))
3461 raise
3463 # Add new columns
3464 for event in self._events:
3465 event.setdefault("ipaddress", "")
3467 if "core_host" not in event:
3468 event_server.add_core_host_to_event(event)
3469 event["host_in_downtime"] = False
3471 # Called on Event Console initialization from the status file to
3472 # initialize the current event limit state -> sets internal counters
3473 # which are updated during runtime.
3474 def _initialize_event_limit_status(self):
3475 self.num_existing_events = len(self._events)
3477 self.num_existing_events_by_host = {}
3478 self.num_existing_events_by_rule = {}
3479 for event in self._events:
3480 self._count_event_add(event)
3482 def _count_event_add(self, event):
3483 if event["host"] not in self.num_existing_events_by_host:
3484 self.num_existing_events_by_host[event["host"]] = 1
3485 else:
3486 self.num_existing_events_by_host[event["host"]] += 1
3488 if event["rule_id"] not in self.num_existing_events_by_rule:
3489 self.num_existing_events_by_rule[event["rule_id"]] = 1
3490 else:
3491 self.num_existing_events_by_rule[event["rule_id"]] += 1
3493 def _count_event_remove(self, event):
3494 self.num_existing_events -= 1
3495 self.num_existing_events_by_host[event["host"]] -= 1
3496 self.num_existing_events_by_rule[event["rule_id"]] -= 1
3498 def new_event(self, event):
3499 self._perfcounters.count("events")
3500 event["id"] = self._next_event_id
3501 self._next_event_id += 1
3502 self._events.append(event)
3503 self.num_existing_events += 1
3504 self._count_event_add(event)
3505 self._history.add(event, "NEW")
3507 def archive_event(self, event):
3508 self._perfcounters.count("events")
3509 event["id"] = self._next_event_id
3510 self._next_event_id += 1
3511 event["phase"] = "closed"
3512 self._history.add(event, "ARCHIVED")
3514 def remove_event(self, event):
3515 try:
3516 self._events.remove(event)
3517 self._count_event_remove(event)
3518 except ValueError:
3519 self._logger.exception("Cannot remove event %d: not present" % event["id"])
3521 # protected by self.lock
3522 def _remove_event_by_nr(self, index):
3523 event = self._events.pop(index)
3524 self._count_event_remove(event)
3526 # protected by self.lock
3527 def remove_oldest_event(self, ty, event):
3528 if ty == "overall":
3529 self._logger.verbose(" Removing oldest event")
3530 self._remove_event_by_nr(0)
3531 elif ty == "by_rule":
3532 self._logger.verbose(" Removing oldest event of rule \"%s\"" % event["rule_id"])
3533 self._remove_oldest_event_of_rule(event["rule_id"])
3534 elif ty == "by_host":
3535 self._logger.verbose(" Removing oldest event of host \"%s\"" % event["host"])
3536 self._remove_oldest_event_of_host(event["host"])
3538 # protected by self.lock
3539 def _remove_oldest_event_of_rule(self, rule_id):
3540 for event in self._events:
3541 if event["rule_id"] == rule_id:
3542 self.remove_event(event)
3543 return
3545 # protected by self.lock
3546 def _remove_oldest_event_of_host(self, hostname):
3547 for event in self._events:
3548 if event["host"] == hostname:
3549 self.remove_event(event)
3550 return
3552 # protected by self.lock
3553 def get_num_existing_events_by(self, ty, event):
3554 if ty == "overall":
3555 return self.num_existing_events
3556 elif ty == "by_rule":
3557 return self.num_existing_events_by_rule.get(event["rule_id"], 0)
3558 elif ty == "by_host":
3559 return self.num_existing_events_by_host.get(event["host"], 0)
3560 else:
3561 raise NotImplementedError()
3563 # Cancel all events that belong to a certain rule id and are
3564 # of the same "breed" as a new event.
3565 def cancel_events(self, event_server, event_columns, new_event, match_groups, rule):
3566 with self.lock:
3567 to_delete = []
3568 for nr, event in enumerate(self._events):
3569 if event["rule_id"] == rule["id"]:
3570 if self.cancelling_match(match_groups, new_event, event, rule):
3571 # Fill a few fields of the cancelled event with data from
3572 # the cancelling event so that action scripts have useful
3573 # values and the logfile entry is more relevant.
3574 previous_phase = event["phase"]
3575 event["phase"] = "closed"
3576 # TODO: Why do we use OK below and not new_event["state"]???
3577 event["state"] = 0 # OK
3578 event["text"] = new_event["text"]
3579 # TODO: This is a hack and partial copy-n-paste from rewrite_events...
3580 if "set_text" in rule:
3581 event["text"] = replace_groups(rule["set_text"], event["text"],
3582 match_groups)
3583 event["time"] = new_event["time"]
3584 event["last"] = new_event["time"]
3585 event["priority"] = new_event["priority"]
3586 self._history.add(event, "CANCELLED")
3587 actions = rule.get("cancel_actions", [])
3588 if actions:
3589 if previous_phase != "open" \
3590 and rule.get("cancel_action_phases", "always") == "open":
3591 self._logger.info(
3592 "Do not execute cancelling actions, event %s's phase "
3593 "is not 'open' but '%s'" % (event["id"], previous_phase))
3594 else:
3595 cmk.ec.actions.do_event_actions(
3596 self._history,
3597 self.settings,
3598 self._config,
3599 self._logger,
3600 event_server,
3601 event_columns,
3602 actions,
3603 event,
3604 is_cancelling=True)
3606 to_delete.append(nr)
3608 for nr in to_delete[::-1]:
3609 self._remove_event_by_nr(nr)
3611 def cancelling_match(self, match_groups, new_event, event, rule):
3612 debug = self._config["debug_rules"]
3614 # The match_groups of the cancelling match only contain the *_ok
3615 # match groups. Since the rewrite definitions are based on the
3616 # positive match, we need to create the missing keys.
3617 for key in match_groups.keys():
3618 if key.endswith("_ok"):
3619 match_groups[key[:-3]] = match_groups[key]
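# Illustrative example (assumed group values): a cancelling match with
# match_groups = {"match_groups_message_ok": ("db01",)} additionally
# gets match_groups["match_groups_message"] = ("db01",), so rewrite
# definitions written for the positive match keep working.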
3621 # Note: before we compare host and application we need to
3622 # apply the rewrite rules to the event, because if the hostname
3623 # was rewritten in the previous match, it wouldn't match anymore here.
3624 host = new_event["host"]
3625 if "set_host" in rule:
3626 host = replace_groups(rule["set_host"], host, match_groups)
3628 if event["host"] != host:
3629 if debug:
3630 self._logger.info("Do not cancel event %d: host is not the same (%s != %s)" %
3631 (event["id"], event["host"], host))
3632 return False
3634 # The same for the application. But in case there is cancelling based on the application
3635 # configured in the rule, then don't check for different applications.
3636 if "cancel_application" not in rule:
3637 application = new_event["application"]
3638 if "set_application" in rule:
3639 application = replace_groups(rule["set_application"], application, match_groups)
3640 if event["application"] != application:
3641 if debug:
3642 self._logger.info(
3643 "Do not cancel event %d: application is not the same (%s != %s)" %
3644 (event["id"], event["application"], application))
3645 return False
3647 if event["facility"] != new_event["facility"]:
3648 if debug:
3649 self._logger.info(
3650 "Do not cancel event %d: syslog facility is not the same (%d != %d)" %
3651 (event["id"], event["facility"], new_event["facility"]))
3653 # Make sure that the matching groups are the same. If the OK match
3654 # has fewer groups, we do not care; if it has more groups, we do
3655 # not care either. We just compare the common "prefix".
3656 for nr, (prev_group, cur_group) in enumerate(
3657 zip(event["match_groups"], match_groups.get("match_groups_message_ok", ()))):
3658 if prev_group != cur_group:
3659 if debug:
3660 self._logger.info("Do not cancel event %d: match group number "
3661 "%d does not match (%s != %s)" % (event["id"], nr + 1,
3662 prev_group, cur_group))
3663 return False
3665 # Note: Duplicated code right above
3666 # Make sure that the syslog_application matching groups are the same.
3667 # If the OK match has fewer groups, we do not care; if it has more
3668 # groups, we do not care either. We just compare the common "prefix".
3669 for nr, (prev_group, cur_group) in enumerate(
3670 zip(
3671 event.get("match_groups_syslog_application", ()),
3672 match_groups.get("match_groups_syslog_application_ok", ()))):
3673 if prev_group != cur_group:
3674 if debug:
3675 self._logger.info(
3676 "Do not cancel event %d: syslog application match group number "
3677 "%d does not match (%s != %s)" % (event["id"], nr + 1, prev_group,
3678 cur_group))
3679 return False
3681 return True
3683 def count_rule_match(self, rule_id):
3684 with self.lock:
3685 self._rule_stats.setdefault(rule_id, 0)
3686 self._rule_stats[rule_id] += 1
3688 def count_event_up(self, found, event):
3689 # Update the event with new information from the new occurrence,
3690 # but preserve certain attributes from the original (first)
3691 # event.
3692 preserve = {
3693 "count": found.get("count", 1) + 1,
3694 "first": found["first"],
3696 # When event is already active then do not change
3697 # comment or contact information anymore
3698 if found["phase"] == "open":
3699 if "comment" in found:
3700 preserve["comment"] = found["comment"]
3701 if "contact" in found:
3702 preserve["contact"] = found["contact"]
3703 found.update(event)
3704 found.update(preserve)
3706 def count_expected_event(self, event_server, event):
3707 for ev in self._events:
3708 if ev["rule_id"] == event["rule_id"] and ev["phase"] == "counting":
3709 self.count_event_up(ev, event)
3710 return
3712 # None found, create one
3713 event["count"] = 1
3714 event["phase"] = "counting"
3715 event_server.new_event_respecting_limits(event)
3717 def count_event(self, event_server, event, rule, count):
3718 # Find the previous occurrence of this event and account for
3719 # one new occurrence. In case of a negated count (expecting rules)
3720 # we never modify events that are already in the state "open",
3721 # since the event has been created because the count was too
3722 # low in the specified period of time.
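# A sketch of the count parameters evaluated below (assumed example
# values, not a complete rule):
# count = {"count": 3, "count_ack": False, "separate_host": True,
# "separate_application": False, "separate_match_groups": False,
# "count_duration": 3600}
# would open the event once three matching messages from the same host
# arrive within an hour of the first occurrence.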
3723 for ev in self._events:
3724 if ev["rule_id"] == event["rule_id"]:
3725 if ev["phase"] == "ack" and not count["count_ack"]:
3726 continue # skip acknowledged events
3728 if count["separate_host"] and ev["host"] != event["host"]:
3729 continue # treat events with separated hosts separately
3731 if count["separate_application"] and ev["application"] != event["application"]:
3732 continue # same for application
3734 if count["separate_match_groups"] and ev["match_groups"] != event["match_groups"]:
3735 continue
3737 if count.get("count_duration"
3738 ) is not None and ev["first"] + count["count_duration"] < event["time"]:
3739 # Counting has been discontinued on this event after a certain time
3740 continue
3742 if ev["host_in_downtime"] != event["host_in_downtime"]:
3743 continue # treat events with different downtime states separately
3745 found = ev
3746 self.count_event_up(found, event)
3747 break
3748 else:
3749 event["count"] = 1
3750 event["phase"] = "counting"
3751 event_server.new_event_respecting_limits(event)
3752 found = event
3754 # Did we just count the occurrence that reached the configured count?
3755 if found["phase"] == "counting" and found["count"] >= count["count"]:
3756 found["phase"] = "open"
3757 return found # do event action with the found (or newly created) event
3758 return False # do not do event action
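# Usage note: with count["count"] = 3, the first two occurrences keep
# the event in phase "counting" and return False (no actions run); the
# third occurrence flips the phase to "open" and returns the event,
# which triggers the rule's actions.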
3760 # locked with self.lock
3761 def delete_event(self, event_id, user):
3762 for nr, event in enumerate(self._events):
3763 if event["id"] == event_id:
3764 event["phase"] = "closed"
3765 if user:
3766 event["owner"] = user
3767 self._history.add(event, "DELETE", user)
3768 self._remove_event_by_nr(nr)
3769 return
3770 raise MKClientError("No event with id %s" % event_id)
3772 def get_events(self):
3773 return self._events
3775 def get_rule_stats(self):
3776 return sorted(self._rule_stats.iteritems(), key=lambda x: x[0])
3780 # .--Replication---------------------------------------------------------.
3781 # | ____ _ _ _ _ |
3782 # | | _ \ ___ _ __ | (_) ___ __ _| |_(_) ___ _ __ |
3783 # | | |_) / _ \ '_ \| | |/ __/ _` | __| |/ _ \| '_ \ |
3784 # | | _ < __/ |_) | | | (_| (_| | |_| | (_) | | | | |
3785 # | |_| \_\___| .__/|_|_|\___\__,_|\__|_|\___/|_| |_| |
3786 # | |_| |
3787 # +----------------------------------------------------------------------+
3788 # | Functions for doing replication, master and slave parts. |
3789 # '----------------------------------------------------------------------'
3792 def is_replication_slave(config):
3793 repl_settings = config["replication"]
3794 return repl_settings and not repl_settings.get("disabled")
3797 def replication_allow_command(config, command, slave_status):
3798 if is_replication_slave(config) and slave_status["mode"] == "sync" \
3799 and command in ["DELETE", "UPDATE", "CHANGESTATE", "ACTION"]:
3800 raise MKClientError("This command is not allowed on a replication slave "
3801 "while it is in sync mode.")
3804 def replication_send(config, lock_configuration, event_status, last_update):
3805 response = {}
3806 with lock_configuration:
3807 response["status"] = event_status.pack_status()
3808 if last_update < config["last_reload"]:
3809 response["rules"] = config[
3810 "rules"] # Remove one bright day, where legacy rules are not needed anymore
3811 response["rule_packs"] = config["rule_packs"]
3812 response["actions"] = config["actions"]
3813 return response
3816 def replication_pull(settings, config, lock_configuration, perfcounters, event_status, event_server,
3817 slave_status, logger):
3818 # We distinguish two modes:
3819 # 1. slave mode: just pull the current state from the master.
3820 # If the master is not reachable, decide whether to
3821 # switch to takeover mode.
3822 # 2. takeover mode: if automatic fallback is enabled and the
3823 # time frame for that has not yet elapsed, then try to
3824 # pull the current state from the master. If that is successful,
3825 # switch back to slave mode. If automatic fallback is not
3826 # enabled, simply do nothing.
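# An illustrative replication section (assumed values; only the keys
# actually read by the replication code are shown):
# config["replication"] = {"master": ("10.0.0.1", 6558),
# "connect_timeout": 5, "interval": 10, "fallback": 60,
# "takeover": 120, "logging": True}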
3827 now = time.time()
3828 repl_settings = config["replication"]
3829 mode = slave_status["mode"]
3830 need_sync = mode == "sync" or (
3831 mode == "takeover" and "fallback" in repl_settings and
3832 (slave_status["last_master_down"] is None or
3833 now - repl_settings["fallback"] < slave_status["last_master_down"]))
3835 if need_sync:
3836 with event_status.lock:
3837 with lock_configuration:
3839 try:
3840 new_state = get_state_from_master(config, slave_status)
3841 replication_update_state(settings, config, event_status, event_server,
3842 new_state)
3843 if repl_settings.get("logging"):
3844 logger.info("Successfully synchronized with master")
3845 slave_status["last_sync"] = now
3846 slave_status["success"] = True
3848 # Fall back to slave mode after successful sync
3849 # (time frame has already been checked)
3850 if mode == "takeover":
3851 if slave_status["last_master_down"] is None:
3852 logger.info("Replication: master reachable for the first time, "
3853 "switching back to slave mode")
3854 slave_status["mode"] = "sync"
3855 else:
3856 logger.info("Replication: master reachable again after %d seconds, "
3857 "switching back to sync mode" %
3858 (now - slave_status["last_master_down"]))
3859 slave_status["mode"] = "sync"
3860 slave_status["last_master_down"] = None
3862 except Exception as e:
3863 logger.warning("Replication: cannot sync with master: %s" % e)
3864 slave_status["success"] = False
3865 if slave_status["last_master_down"] is None:
3866 slave_status["last_master_down"] = now
3868 # Takeover
3869 if "takeover" in repl_settings and mode != "takeover":
3870 if not slave_status["last_sync"]:
3871 if repl_settings.get("logging"):
3872 logger.error(
3873 "Replication: no takeover since master was never reached.")
3874 else:
3875 offline = now - slave_status["last_sync"]
3876 if offline < repl_settings["takeover"]:
3877 if repl_settings.get("logging"):
3878 logger.warning(
3879 "Replication: no takeover yet, still %d seconds to wait" %
3880 (repl_settings["takeover"] - offline))
3881 else:
3882 logger.info(
3883 "Replication: master not reached for %d seconds, taking over!" %
3884 offline)
3885 slave_status["mode"] = "takeover"
3887 save_slave_status(settings, slave_status)
3889 # Compute statistics of the average time needed for a sync
3890 perfcounters.count_time("sync", time.time() - now)
3893 def replication_update_state(settings, config, event_status, event_server, new_state):
3895 # Keep a copy of the master's rules and actions and prepare to use them
3896 if "rules" in new_state:
3897 save_master_config(settings, new_state)
3898 event_server.compile_rules(new_state["rules"], new_state.get("rule_packs", []))
3899 config["actions"] = new_state["actions"]
3901 # Update to the master's event state
3902 event_status.unpack_status(new_state["status"])
3905 def save_master_config(settings, new_state):
3906 path = settings.paths.master_config_file.value
3907 path_new = path.parent / (path.name + '.new')
3908 path_new.write_bytes(
3909 repr({
3910 "rules": new_state["rules"],
3911 "rule_packs": new_state["rule_packs"],
3912 "actions": new_state["actions"],
3913 }) + "\n")
3914 path_new.rename(path)
3917 def load_master_config(settings, config, logger):
3918 path = settings.paths.master_config_file.value
3919 try:
3920 master_config = ast.literal_eval(path.read_bytes())
3921 config["rules"] = master_config["rules"]
3922 config["rule_packs"] = master_config.get("rule_packs", [])
3923 config["actions"] = master_config["actions"]
3924 logger.info("Replication: restored %d rule packs and %d actions from %s" %
3925 (len(config["rule_packs"]), len(config["actions"]), path))
3926 except Exception:
3927 if is_replication_slave(config):
3928 logger.error("Replication: no previously saved master state available")
3931 def get_state_from_master(config, slave_status):
3932 repl_settings = config["replication"]
3933 try:
3934 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
3935 sock.settimeout(repl_settings["connect_timeout"])
3936 sock.connect(repl_settings["master"])
3937 sock.sendall(
3938 "REPLICATE %d\n" % (slave_status["last_sync"] and slave_status["last_sync"] or 0))
3939 sock.shutdown(socket.SHUT_WR)
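# Wire format sketch: the slave sends e.g. "REPLICATE 1550000000\n"
# (0 if it has never synced; the timestamp here is an assumed example)
# and the master replies with the repr() of the dict built in
# replication_send(), which is parsed back below via ast.literal_eval().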
3941 response_text = ""
3942 while True:
3943 chunk = sock.recv(8192)
3944 response_text += chunk
3945 if not chunk:
3946 break
3948 return ast.literal_eval(response_text)
3949 except (SyntaxError, ValueError):
3950 raise Exception("Invalid response from event daemon: <pre>%s</pre>" % response_text)
3952 except IOError as e:
3953 raise Exception("Master not responding: %s" % e)
3955 except Exception as e:
3956 raise Exception("Cannot connect to event daemon: %s" % e)
3959 def save_slave_status(settings, slave_status):
3960 settings.paths.slave_status_file.value.write_bytes(repr(slave_status) + "\n")
3963 def default_slave_status_master():
3964 return {
3965 "last_sync": 0,
3966 "last_master_down": None,
3967 "mode": "master",
3968 "average_sync_time": None,
3972 def default_slave_status_sync():
3973 return {
3974 "last_sync": 0,
3975 "last_master_down": None,
3976 "mode": "sync",
3977 "average_sync_time": None,
3981 def update_slave_status(slave_status, settings, config):
3982 path = settings.paths.slave_status_file.value
3983 if is_replication_slave(config):
3984 try:
3985 slave_status.update(ast.literal_eval(path.read_bytes()))
3986 except Exception:
3987 slave_status.update(default_slave_status_sync())
3988 save_slave_status(settings, slave_status)
3989 else:
3990 if path.exists():
3991 path.unlink()
3992 slave_status.update(default_slave_status_master())
3996 # .--Configuration-------------------------------------------------------.
3997 # | ____ __ _ _ _ |
3998 # | / ___|___ _ __ / _(_) __ _ _ _ _ __ __ _| |_(_) ___ _ __ |
3999 # | | | / _ \| '_ \| |_| |/ _` | | | | '__/ _` | __| |/ _ \| '_ \ |
4000 # | | |__| (_) | | | | _| | (_| | |_| | | | (_| | |_| | (_) | | | | |
4001 # | \____\___/|_| |_|_| |_|\__, |\__,_|_| \__,_|\__|_|\___/|_| |_| |
4002 # | |___/ |
4003 # +----------------------------------------------------------------------+
4004 # | Loading of the configuration files |
4005 # '----------------------------------------------------------------------'
4008 def load_configuration(settings, logger, slave_status):
4009 config = cmk.ec.export.load_config(settings)
4011 # If not set by command line, set the log level by configuration
4012 if settings.options.verbosity == 0:
4013 levels = config["log_level"]
4014 logger.setLevel(levels["cmk.mkeventd"])
4015 logger.getChild("EventServer").setLevel(levels["cmk.mkeventd.EventServer"])
4016 if "cmk.mkeventd.EventServer.snmp" in levels:
4017 logger.getChild("EventServer.snmp").setLevel(levels["cmk.mkeventd.EventServer.snmp"])
4018 logger.getChild("EventStatus").setLevel(levels["cmk.mkeventd.EventStatus"])
4019 logger.getChild("StatusServer").setLevel(levels["cmk.mkeventd.StatusServer"])
4020 logger.getChild("lock").setLevel(levels["cmk.mkeventd.lock"])
4022 # Are we a replication slave? Parts of the configuration
4023 # will be overridden by values from the master.
4024 update_slave_status(slave_status, settings, config)
4025 if is_replication_slave(config):
4026 logger.info("Replication: slave configuration, current mode: %s" % slave_status["mode"])
4027 load_master_config(settings, config, logger)
4029 # Create dictionary for actions for easy access
4030 config["action"] = {}
4031 for action in config["actions"]:
4032 config["action"][action["id"]] = action
4034 config["last_reload"] = time.time()
4036 return config
4039 def reload_configuration(settings, logger, lock_configuration, history, event_status, event_server,
4040 status_server, slave_status):
4041 with lock_configuration:
4042 config = load_configuration(settings, logger, slave_status)
4043 history.reload_configuration(config)
4044 event_server.reload_configuration(config)
4046 event_status.reload_configuration(config)
4047 status_server.reload_configuration(config)
4048 logger.info("Reloaded configuration.")
4052 # .--Main----------------------------------------------------------------.
4053 # | __ __ _ |
4054 # | | \/ | __ _(_)_ __ |
4055 # | | |\/| |/ _` | | '_ \ |
4056 # | | | | | (_| | | | | | |
4057 # | |_| |_|\__,_|_|_| |_| |
4058 # | |
4059 # +----------------------------------------------------------------------+
4060 # | Main entry and option parsing |
4061 # '----------------------------------------------------------------------'
4064 def main():
4065 os.unsetenv("LANG")
4066 logger = cmk.utils.log.get_logger("mkeventd")
4067 settings = cmk.ec.settings.settings(cmk.__version__, pathlib.Path(cmk.utils.paths.omd_root),
4068 pathlib.Path(cmk.utils.paths.default_config_dir), sys.argv)
4070 pid_path = None
4071 try:
4072 cmk.utils.log.open_log(sys.stderr)
4073 cmk.utils.log.set_verbosity(settings.options.verbosity)
4075 settings.paths.log_file.value.parent.mkdir(parents=True, exist_ok=True)
4076 if not settings.options.foreground:
4077 cmk.utils.log.open_log(str(settings.paths.log_file.value))
4079 logger.info("-" * 65)
4080 logger.info("mkeventd version %s starting" % cmk.__version__)
4082 slave_status = default_slave_status_master()
4083 config = load_configuration(settings, logger, slave_status)
4084 history = cmk.ec.history.History(settings, config, logger, StatusTableEvents.columns,
4085 StatusTableHistory.columns)
4087 pid_path = settings.paths.pid_file.value
4088 if pid_path.exists():
4089 old_pid = int(pid_path.read_text(encoding='utf-8'))
4090 if process_exists(old_pid):
4091 bail_out(
4092 logger, "Old PID file %s still existing and mkeventd still running with PID %d."
4093 % (pid_path, old_pid))
4094 pid_path.unlink()
4095 logger.info("Removed orphaned PID file %s (process %d not running anymore)." %
4096 (pid_path, old_pid))
4098 # Make sure paths exist
4099 settings.paths.event_pipe.value.parent.mkdir(parents=True, exist_ok=True)
4100 settings.paths.status_file.value.parent.mkdir(parents=True, exist_ok=True)
4102 # First do all things that might fail, before daemonizing
4103 perfcounters = Perfcounters(logger.getChild("lock.perfcounters"))
4104 event_status = EventStatus(settings, config, perfcounters, history,
4105 logger.getChild("EventStatus"))
4106 lock_configuration = ECLock(logger.getChild("lock.configuration"))
4107 event_server = EventServer(
4108 logger.getChild("EventServer"), settings, config, slave_status, perfcounters,
4109 lock_configuration, history, event_status, StatusTableEvents.columns)
4110 terminate_main_event = threading.Event()
4111 status_server = StatusServer(
4112 logger.getChild("StatusServer"), settings, config, slave_status, perfcounters,
4113 lock_configuration, history, event_status, event_server, terminate_main_event)
4115 event_status.load_status(event_server)
4116 event_server.compile_rules(config["rules"], config["rule_packs"])
4118 if not settings.options.foreground:
4119 pid_path.parent.mkdir(parents=True, exist_ok=True)
4120 cmk.utils.daemon.daemonize()
4121 logger.info("Daemonized with PID %d." % os.getpid())
4123 cmk.utils.daemon.lock_with_pid_file(str(pid_path))
4125 # Install signal handlers
4126 def signal_handler(signum, stack_frame):
4127 logger.verbose("Got signal %d." % signum)
4128 raise MKSignalException(signum)
4130 signal.signal(signal.SIGHUP, signal_handler)
4131 signal.signal(signal.SIGINT, signal_handler)
4132 signal.signal(signal.SIGQUIT, signal_handler)
4133 signal.signal(signal.SIGTERM, signal_handler)
4135 # Now let's go...
4136 run_eventd(terminate_main_event, settings, config, lock_configuration, history,
4137 perfcounters, event_status, event_server, status_server, slave_status, logger)
4139 # We reach this point if the server has been terminated by
4140 # a signal or by hitting Ctrl-C (in foreground mode)
4142 # TODO: Move this cleanup stuff to the classes that are responsible for these resources
4144 # Remove the event pipe and drain it, so that processes
4145 # (syslog etc.) will not hang when trying to write
4146 # into the pipe.
4147 logger.verbose("Cleaning up event pipe")
4148 pipe = event_server.open_pipe() # Open it
4149 settings.paths.event_pipe.value.unlink() # Remove pipe
4150 drain_pipe(pipe) # Drain any data
4151 os.close(pipe) # Close pipe
4153 logger.verbose("Saving final event state")
4154 event_status.save_status()
4156 logger.verbose("Cleaning up sockets")
4157 settings.paths.unix_socket.value.unlink()
4158 settings.paths.event_socket.value.unlink()
4160 logger.verbose("Output hash stats")
4161 event_server.output_hash_stats()
4163 logger.verbose("Closing fds which might be still open")
4164 for fd in [
4165 settings.options.syslog_udp, settings.options.syslog_tcp,
4166 settings.options.snmptrap_udp
4167 ]:
4168 try:
4169 if isinstance(fd, cmk.ec.settings.FileDescriptor):
4170 os.close(fd.value)
4171 except Exception:
4172 pass
4174 logger.info("Successfully shut down.")
4175 sys.exit(0)
4177 except Exception:
4178 if settings.options.debug:
4179 raise
4180 bail_out(logger, traceback.format_exc())
4182 finally:
4183 if pid_path and cmk.utils.store.have_lock(str(pid_path)):
4184 try:
4185 pid_path.unlink()
4186 except OSError:
4187 pass
4190 if __name__ == "__main__":
4191 main()