#!/usr/bin/env python
# -*- encoding: utf-8; py-indent-offset: 4 -*-
# +------------------------------------------------------------------+
# |             ____ _               _        __  __ _  __           |
# |            / ___| |__   ___  ___| | __   |  \/  | |/ /           |
# |           | |   | '_ \ / _ \/ __| |/ /   | |\/| | ' /            |
# |           | |___| | | |  __/ (__|   <    | |  | | . \            |
# |            \____|_| |_|\___|\___|_|\_\___|_|  |_|_|\_\           |
# |                                                                  |
# | Copyright Mathias Kettner 2014             mk@mathias-kettner.de |
# +------------------------------------------------------------------+
#
# This file is part of Check_MK.
# The official homepage is at http://mathias-kettner.de/check_mk.
#
# check_mk is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation in version 2. check_mk is distributed
# in the hope that it will be useful, but WITHOUT ANY WARRANTY; with-
# out even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more de-
# tails. You should have received a copy of the GNU General Public
# License along with GNU Make; see the file COPYING. If not, write
# to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301 USA.

# TODO: Refactor/document locking. It is not clear when and how to apply
# locks or when they are held by which component.

# TODO: Refactor events to be handled as objects, e.g. in case when
# creating objects. Or at least update the documentation. It is not clear
# which fields are mandatory for the events.

import abc
import ast
import errno
import json
import os
import pprint
import re
import select
import signal
import socket
import sys
import threading
import time
import traceback
from typing import Any, Dict, List, Optional, Tuple, Union  # pylint: disable=unused-import

import pathlib2 as pathlib
import six

import cmk
import cmk.utils.daemon
import cmk.utils.defines
import cmk.ec.actions
import cmk.ec.export
import cmk.ec.history
import cmk.ec.settings
import cmk.ec.snmp
import cmk.utils.log
import cmk.utils.paths
import cmk.utils.profile
import cmk.utils.render
import cmk.utils.regex
import cmk.utils.debug

# suppress "Cannot find module" error from mypy
import livestatus  # type: ignore


class SyslogPriority(object):
    NAMES = {
        0: "emerg",
        1: "alert",
        2: "crit",
        3: "err",
        4: "warning",
        5: "notice",
        6: "info",
        7: "debug",
    }

    def __init__(self, value):
        super(SyslogPriority, self).__init__()
        self.value = int(value)

    def __repr__(self):
        return "SyslogPriority(%d)" % self.value

    def __str__(self):
        try:
            return self.NAMES[self.value]
        except KeyError:
            return "(unknown priority %d)" % self.value


class SyslogFacility(object):
    NAMES = {
        0: 'kern',
        1: 'user',
        2: 'mail',
        3: 'daemon',
        4: 'auth',
        5: 'syslog',
        6: 'lpr',
        7: 'news',
        8: 'uucp',
        9: 'cron',
        10: 'authpriv',
        11: 'ftp',
        12: "ntp",
        13: "logaudit",
        14: "logalert",
        15: "clock",
        16: 'local0',
        17: 'local1',
        18: 'local2',
        19: 'local3',
        20: 'local4',
        21: 'local5',
        22: 'local6',
        23: 'local7',
        31: 'snmptrap',  # HACK!
    }

    def __init__(self, value):
        super(SyslogFacility, self).__init__()
        self.value = int(value)

    def __repr__(self):
        return "SyslogFacility(%d)" % self.value

    def __str__(self):
        try:
            return self.NAMES[self.value]
        except KeyError:
            return "(unknown facility %d)" % self.value
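

# Illustrative usage of the two classes above (doctest-style, not part of
# the module): str() yields the symbolic name, unknown numeric values fall
# back to a descriptive placeholder instead of raising.
#
#     >>> str(SyslogPriority(3)), str(SyslogFacility(16))
#     ('err', 'local0')
#     >>> str(SyslogPriority(42))
#     '(unknown priority 42)'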


# Alas, we often have no clue about the actual encoding, so we have to guess:
# Initially we assume UTF-8, but fall back to latin-1 if it didn't work.
def decode_from_bytes(string_as_bytes):
    # This is just a safeguard if we are inadvertently called with a Unicode
    # string. In theory this should never happen, but given the typing chaos in
    # this script, one never knows. In the Unicode case, Python tries to be
    # "helpful", but this fails miserably: Calling 'decode' on a Unicode string
    # implicitly converts it via 'encode("ascii")' to a byte string first, but
    # this can of course fail and doesn't make sense at all when we immediately
    # call 'decode' on this byte string again. In a nutshell: The implicit
    # conversions between str and unicode are a fundamentally broken idea, just
    # like all implicit things and "helpful" ideas in general. :-P For further
    # info see e.g. http://nedbatchelder.com/text/unipain.html
    if isinstance(string_as_bytes, unicode):
        return string_as_bytes

    try:
        return string_as_bytes.decode("utf-8")
    except Exception:
        return string_as_bytes.decode("latin-1")
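

# Illustrative examples of the fallback (not part of the module): valid
# UTF-8 is decoded as UTF-8, anything else is decoded as latin-1, so the
# function never raises for byte input.
#
#     >>> decode_from_bytes(b"caf\xc3\xa9")  # valid UTF-8
#     u'caf\xe9'
#     >>> decode_from_bytes(b"caf\xe9")      # broken UTF-8 -> latin-1
#     u'caf\xe9'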


def scrub_and_decode(s):
    return decode_from_bytes(cmk.ec.history.scrub_string(s))


# .--Helper functions----------------------------------------------------.
# |                  _   _      _                                        |
# |                 | | | | ___| |_ __   ___ _ __ ___                    |
# |                 | |_| |/ _ \ | '_ \ / _ \ '__/ __|                   |
# |                 |  _  |  __/ | |_) |  __/ |  \__ \                   |
# |                 |_| |_|\___|_| .__/ \___|_|  |___/                   |
# |                              |_|                                     |
# +----------------------------------------------------------------------+
# | Various helper functions                                             |
# '----------------------------------------------------------------------'


class ECLock(object):
    def __init__(self, logger):
        super(ECLock, self).__init__()
        self._logger = logger
        self._lock = threading.Lock()

    def acquire(self, blocking=True):
        self._logger.debug("[%s] Trying to acquire lock", threading.current_thread().name)

        ret = self._lock.acquire(blocking)
        if ret is True:
            self._logger.debug("[%s] Acquired lock", threading.current_thread().name)
        else:
            self._logger.debug("[%s] Non-blocking acquire failed", threading.current_thread().name)

        return ret

    def release(self):
        self._logger.debug("[%s] Releasing lock", threading.current_thread().name)
        self._lock.release()

    def __enter__(self):
        self.acquire()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
        return False  # Do not swallow exceptions
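

# Usage sketch (illustrative only): ECLock is meant to be used as a context
# manager; acquisition and release are logged, and exceptions propagate
# because __exit__ returns False.
#
#     lock = ECLock(logger)
#     with lock:
#         ...  # critical section, lock is held here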


class ECServerThread(threading.Thread):
    @abc.abstractmethod
    def serve(self):
        raise NotImplementedError()

    def __init__(self, name, logger, settings, config, slave_status, profiling_enabled,
                 profile_file):
        super(ECServerThread, self).__init__(name=name)
        self.settings = settings
        self._config = config
        self._slave_status = slave_status
        self._profiling_enabled = profiling_enabled
        self._profile_file = profile_file
        self._terminate_event = threading.Event()
        self._logger = logger

    def run(self):
        self._logger.info("Starting up")

        while not self._terminate_event.is_set():
            try:
                with cmk.utils.profile.Profile(
                        enabled=self._profiling_enabled, profile_file=str(self._profile_file)):
                    self.serve()
            except Exception:
                self._logger.exception("Exception in %s server" % self.name)
                if self.settings.options.debug:
                    raise
                time.sleep(1)

        self._logger.info("Terminated")

    def terminate(self):
        self._terminate_event.set()


def terminate(terminate_main_event, event_server, status_server):
    terminate_main_event.set()
    status_server.terminate()
    event_server.terminate()


def bail_out(logger, reason):
    logger.error("FATAL ERROR: %s" % reason)
    sys.exit(1)


def process_exists(pid):
    try:
        os.kill(pid, 0)
        return True
    except Exception:
        return False


def drain_pipe(pipe):
    while True:
        try:
            readable = select.select([pipe], [], [], 0.1)[0]
        except select.error as e:
            if e[0] == errno.EINTR:
                continue
            raise

        data = None
        if pipe in readable:
            try:
                data = os.read(pipe, 4096)
                if not data:  # EOF
                    break
            except Exception:
                break  # Error while reading
        else:
            break  # No data available


def match(pattern, text, complete=True):
    """Performs an EC style matching test of pattern on text

    Returns False in case of no match or a tuple with the match groups.
    In case no match group is produced, it returns an empty tuple."""
    if pattern is None:
        return ()

    elif isinstance(pattern, six.string_types):
        if complete:
            return () if pattern == text.lower() else False
        return () if pattern in text.lower() else False

    # Assume compiled regex
    m = pattern.search(text)
    if m:
        groups = m.groups()
        if None in groups:
            # Remove None from result tuples and replace it with empty strings
            return tuple([g if g is not None else '' for g in groups])
        return groups

    return False
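

# A few illustrative calls (doctest-style, not part of the module). Plain
# string patterns are expected to be pre-lowercased, as done by
# EventServer._compile_matching_value below:
#
#     >>> match(None, "anything")  # no pattern at all: match, no groups
#     ()
#     >>> match("foo bar", "FOO BAR")  # plain string, complete match
#     ()
#     >>> match("foo", "FOO BAR")  # complete match fails on a substring
#     False
#     >>> match("foo", "FOO BAR", complete=False)  # substring match
#     ()
#     >>> match(re.compile("ev(.*)nt"), "event", complete=False)
#     ('e',)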


def format_pattern(pat):
    try:
        return pat.pattern
    except Exception:
        return pat


# Sorry: this code is duplicated in web/plugins/wato/mkeventd.py
def match_ipv4_network(pattern, ipaddress_text):
    network, network_bits = parse_ipv4_network(pattern)  # is validated by valuespec
    if network_bits == 0:
        return True  # matches, even if ipaddress is empty
    try:
        ipaddress = parse_ipv4_address(ipaddress_text)
    except Exception:
        return False  # invalid address never matches

    # The first network_bits of network and ipaddress must be
    # identical. Create a bitmask.
    bitmask = 0
    for n in xrange(32):
        bitmask = bitmask << 1
        if n < network_bits:
            bit = 1
        else:
            bit = 0
        bitmask += bit

    return (network & bitmask) == (ipaddress & bitmask)


def parse_ipv4_address(text):
    parts = map(int, text.split("."))
    return (parts[0] << 24) + (parts[1] << 16) + (parts[2] << 8) + parts[3]


def parse_ipv4_network(text):
    if "/" not in text:
        return parse_ipv4_address(text), 32

    network_text, bits_text = text.split("/")
    return parse_ipv4_address(network_text), int(bits_text)
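

# Worked example (illustrative): "10.0.0.0/8" parses to the integer network
# 167772160 (0x0A000000) with 8 network bits, so only the first octet has
# to be identical.
#
#     >>> parse_ipv4_address("10.1.2.3")
#     167838211
#     >>> parse_ipv4_network("10.0.0.0/8")
#     (167772160, 8)
#     >>> match_ipv4_network("10.0.0.0/8", "10.1.2.3")
#     True
#     >>> match_ipv4_network("10.0.0.0/8", "192.168.1.1")
#     False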


def replace_groups(text, origtext, match_groups):
    # Replace \0 with the text itself. This allows to add information
    # in front of or at the end of a message.
    text = text.replace("\\0", origtext)

    # Generic replacement with \1, \2, ...
    match_groups_message = match_groups.get("match_groups_message")
    if isinstance(match_groups_message, tuple):
        for nr, g in enumerate(match_groups_message):
            text = text.replace("\\%d" % (nr + 1), g)

    # Replacement with keyword
    # Right now we have
    # $MATCH_GROUPS_MESSAGE_x$
    # $MATCH_GROUPS_SYSLOG_APPLICATION_x$
    for key_prefix, values in match_groups.iteritems():
        if not isinstance(values, tuple):
            continue

        for idx, match_value in enumerate(values):
            text = text.replace("$%s_%d$" % (key_prefix.upper(), idx + 1), match_value)

    return text
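

# Illustrative call (not part of the module): "\0" expands to the original
# text, "\1" to the first message match group (the "$..._1$" keyword form
# behaves the same way).
#
#     >>> replace_groups("\\1 seen in: \\0", "db timeout", {"match_groups_message": ("timeout",)})
#     'timeout seen in: db timeout'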


class MKSignalException(Exception):
    def __init__(self, signum):
        Exception.__init__(self, "Got signal %d" % signum)
        self._signum = signum


class MKClientError(Exception):
    def __init__(self, t):
        Exception.__init__(self, t)


# .--Timeperiods---------------------------------------------------------.
# |      _____ _                                _           _            |
# |     |_   _(_)_ __ ___   ___ _ __   ___ _ __(_) ___   __| |___        |
# |       | | | | '_ ` _ \ / _ \ '_ \ / _ \ '__| |/ _ \ / _` / __|       |
# |       | | | | | | | | |  __/ |_) |  __/ |  | | (_) | (_| \__ \       |
# |       |_| |_|_| |_| |_|\___| .__/ \___|_|  |_|\___/ \__,_|___/       |
# |                            |_|                                       |
# +----------------------------------------------------------------------+
# | Timeperiods are used in rule conditions                              |
# '----------------------------------------------------------------------'


class TimePeriods(object):
    def __init__(self, logger):
        super(TimePeriods, self).__init__()
        self._logger = logger
        self._periods = None
        self._last_update = 0

    def _update(self):
        if self._periods is not None and int(time.time()) / 60 == self._last_update:
            return  # only update once a minute
        try:
            table = livestatus.LocalConnection().query("GET timeperiods\nColumns: name alias in")
            periods = {}
            for tpname, alias, isin in table:
                periods[tpname] = (alias, bool(isin))
            self._periods = periods
            self._last_update = int(time.time()) / 60
        except Exception as e:
            self._logger.exception("Cannot update timeperiod information: %s" % e)
            raise

    def check(self, tpname):
        self._update()
        if not self._periods:
            self._logger.warning("no timeperiod information, assuming %s is active" % tpname)
            return True
        if tpname not in self._periods:
            self._logger.warning("no such timeperiod %s, assuming it is active" % tpname)
            return True
        return self._periods[tpname][1]
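

# Usage sketch (illustrative only): check() is deliberately fail-open, so
# rule matching keeps working when no timeperiod info is available.
#
#     periods = TimePeriods(logger)
#     if periods.check("workhours"):  # True while active (or info missing)
#         ...  # apply the rule that is restricted to this timeperiod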


# .--Host config---------------------------------------------------------.
# |          _   _           _                      __ _                 |
# |         | | | | ___  ___| |_    ___ ___  _ __  / _(_) __ _           |
# |         | |_| |/ _ \/ __| __|  / __/ _ \| '_ \| |_| |/ _` |          |
# |         |  _  | (_) \__ \ |_  | (_| (_) | | | |  _| | (_| |          |
# |         |_| |_|\___/|___/\__|  \___\___/|_| |_|_| |_|\__, |          |
# |                                                      |___/           |
# +----------------------------------------------------------------------+
# | Manages the configuration of the hosts of the local monitoring core. |
# | It fetches and caches the information during runtime of the EC.      |
# '----------------------------------------------------------------------'


class HostConfig(object):
    def __init__(self, logger):
        self._logger = logger
        self.initialize()

    def initialize(self):
        self._logger.debug("Initializing host config")
        self._event_host_to_host = {}

        self._hosts_by_name = {}
        self._hosts_by_lower_name = {}
        self._hosts_by_lower_alias = {}
        self._hosts_by_lower_address = {}

        self._got_config_from_core = False

    def get(self, host_name, deflt=None):
        return self._hosts_by_name.get(host_name, deflt)

    def get_by_event_host_name(self, event_host_name, deflt=None):
        try:
            self._update_from_core()
        except Exception:
            self._logger.exception("Failed to get host info from core. Try again later.")
            return

        try:
            return self._event_host_to_host[event_host_name]
        except KeyError:
            pass  # Not cached yet

        # Note: It is important that we use exactly the same algorithm here as in the core
        # (enterprise/core/src/World.cc getHostByDesignation)
        #
        # Host name    : Case insensitive equality (host_name =~ %s)
        # Host alias   : Case insensitive equality (host_alias =~ %s)
        # Host address : Case insensitive equality (host_address =~ %s)
        low_event_host_name = event_host_name.lower()

        host = deflt
        for search_map in [
                self._hosts_by_lower_name, self._hosts_by_lower_address, self._hosts_by_lower_alias
        ]:
            try:
                host = search_map[low_event_host_name]
                break
            except KeyError:
                continue

        self._event_host_to_host[event_host_name] = host
        return host

    def _update_from_core(self):
        if not self._has_core_config_reloaded():
            return

        self.initialize()
        self._logger.debug("Fetching host config from core")

        columns = [
            "name",
            "alias",
            "address",
            "custom_variables",
            "contacts",
            "contact_groups",
        ]

        query = "GET hosts\nColumns: %s" % " ".join(columns)
        for host in livestatus.LocalConnection().query_table_assoc(query):
            self._hosts_by_name[host["name"]] = host

            # Lookup maps to improve performance of host searches
            self._hosts_by_lower_name[host["name"].lower()] = host
            self._hosts_by_lower_alias[host["alias"].lower()] = host
            self._hosts_by_lower_address[host["address"].lower()] = host

        self._logger.debug("Got %d hosts from core" % len(self._hosts_by_name))
        self._got_config_from_core = self._get_core_start_time()

    def _has_core_config_reloaded(self):
        if not self._got_config_from_core:
            return True

        if self._get_core_start_time() > self._got_config_from_core:
            return True

        return False

    def _get_core_start_time(self):
        query = ("GET status\n"
                 "Columns: program_start\n")
        return livestatus.LocalConnection().query_value(query)


# .--Perfcounters--------------------------------------------------------.
# |      ____            __                       _                      |
# |     |  _ \ ___ _ __ / _| ___ ___  _   _ _ __ | |_ ___ _ __ ___       |
# |     | |_) / _ \ '__| |_ / __/ _ \| | | | '_ \| __/ _ \ '__/ __|      |
# |     |  __/  __/ |  |  _| (_| (_) | |_| | | | | ||  __/ |  \__ \      |
# |     |_|   \___|_|  |_|  \___\___/ \__,_|_| |_|\__\___|_|  |___/      |
# |                                                                      |
# +----------------------------------------------------------------------+
# | Helper class for performance counting                                |
# '----------------------------------------------------------------------'


def lerp(a, b, t):
    """Linear interpolation between a and b with weight t"""
    return (1 - t) * a + t * b
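

# Example (illustrative): with t=0.9 the old value b keeps 90% of its
# weight, so feeding successive samples through lerp() yields an
# exponentially weighted moving average, as done in Perfcounters below.
#
#     >>> lerp(0.0, 10.0, 0.9)  # new sample 0.0, old average 10.0
#     9.0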


class Perfcounters(object):
    _counter_names = [
        "messages",
        "rule_tries",
        "rule_hits",
        "drops",
        "overflows",
        "events",
        "connects",
    ]

    # Average processing times
    _weights = {
        "processing": 0.99,  # event processing
        "sync": 0.95,  # Replication sync
        "request": 0.95,  # Client requests
    }

    # TODO: Why aren't self._times / self._rates / ... initialized with their defaults?
    def __init__(self, logger):
        super(Perfcounters, self).__init__()
        self._lock = ECLock(logger)

        # Initialize counters
        self._counters = dict([(n, 0) for n in self._counter_names])

        self._old_counters = {}
        self._rates = {}
        self._average_rates = {}
        self._times = {}
        self._last_statistics = None

        self._logger = logger.getChild("Perfcounters")

    def count(self, counter):
        with self._lock:
            self._counters[counter] += 1

    def count_time(self, counter, ptime):
        with self._lock:
            if counter in self._times:
                self._times[counter] = lerp(ptime, self._times[counter], self._weights[counter])
            else:
                self._times[counter] = ptime

    def do_statistics(self):
        with self._lock:
            now = time.time()
            if self._last_statistics:
                duration = now - self._last_statistics
            else:
                duration = 0
            for name, value in self._counters.iteritems():
                if duration:
                    delta = value - self._old_counters[name]
                    rate = delta / duration
                    self._rates[name] = rate
                    if name in self._average_rates:
                        # We could make the weight configurable
                        self._average_rates[name] = lerp(rate, self._average_rates[name], 0.9)
                    else:
                        self._average_rates[name] = rate

            self._last_statistics = now
            self._old_counters = self._counters.copy()

    @classmethod
    def status_columns(cls):
        columns = []
        # Please note: status_columns() and get_status() need to produce lists
        # with the exact same column order
        for name in cls._counter_names:
            columns.append(("status_" + name, 0))
            columns.append(("status_" + name.rstrip("s") + "_rate", 0.0))
            columns.append(("status_average_" + name.rstrip("s") + "_rate", 0.0))

        for name in cls._weights:
            columns.append(("status_average_%s_time" % name, 0.0))

        return columns

    def get_status(self):
        with self._lock:
            row = []
            # Please note: status_columns() and get_status() need to produce lists
            # with the exact same column order
            for name in self._counter_names:
                row.append(self._counters[name])
                row.append(self._rates.get(name, 0.0))
                row.append(self._average_rates.get(name, 0.0))

            for name in self._weights:
                row.append(self._times.get(name, 0.0))

            return row


# .--EventServer---------------------------------------------------------.
# |    _____                 _   ____                                    |
# |   | ____|_   _____ _ __ | |_/ ___|  ___ _ ____   _____ _ __          |
# |   |  _| \ \ / / _ \ '_ \| __\___ \ / _ \ '__\ \ / / _ \ '__|         |
# |   | |___ \ V /  __/ | | | |_ ___) |  __/ |   \ V /  __/ |            |
# |   |_____| \_/ \___|_| |_|\__|____/ \___|_|    \_/ \___|_|            |
# |                                                                      |
# +----------------------------------------------------------------------+
# | Processing and classification of incoming events.                    |
# '----------------------------------------------------------------------'


class EventServer(ECServerThread):
    month_names = {
        "Jan": 1,
        "Feb": 2,
        "Mar": 3,
        "Apr": 4,
        "May": 5,
        "Jun": 6,
        "Jul": 7,
        "Aug": 8,
        "Sep": 9,
        "Oct": 10,
        "Nov": 11,
        "Dec": 12,
    }

    def __init__(self, logger, settings, config, slave_status, perfcounters, lock_configuration,
                 history, event_status, event_columns):
        super(EventServer, self).__init__(
            name="EventServer",
            logger=logger,
            settings=settings,
            config=config,
            slave_status=slave_status,
            profiling_enabled=settings.options.profile_event,
            profile_file=settings.paths.event_server_profile.value)
        self._syslog = None
        self._syslog_tcp = None
        self._snmptrap = None

        self._rules = []
        self._hash_stats = []
        for _unused_facility in xrange(32):
            self._hash_stats.append([0] * 8)

        self.host_config = HostConfig(self._logger)
        self._perfcounters = perfcounters
        self._lock_configuration = lock_configuration
        self._history = history
        self._event_status = event_status
        self._event_columns = event_columns
        self._message_period = cmk.ec.history.ActiveHistoryPeriod()
        self._rule_matcher = RuleMatcher(self._logger, config)
        self._event_creator = EventCreator(self._logger, config)

        self.create_pipe()
        self.open_eventsocket()
        self.open_syslog()
        self.open_syslog_tcp()
        self.open_snmptrap()
        self._snmp_trap_engine = cmk.ec.snmp.SNMPTrapEngine(self.settings, self._config,
                                                            self._logger.getChild("snmp"),
                                                            self.handle_snmptrap)

    @classmethod
    def status_columns(cls):
        columns = cls._general_columns()
        columns += Perfcounters.status_columns()
        columns += cls._replication_columns()
        columns += cls._event_limit_columns()
        return columns

    @classmethod
    def _general_columns(cls):
        return [
            ("status_config_load_time", 0),
            ("status_num_open_events", 0),
            ("status_virtual_memory_size", 0),
        ]

    @classmethod
    def _replication_columns(cls):
        return [
            ("status_replication_slavemode", ""),
            ("status_replication_last_sync", 0.0),
            ("status_replication_success", False),
        ]

    @classmethod
    def _event_limit_columns(cls):
        return [
            ("status_event_limit_host", 0),
            ("status_event_limit_rule", 0),
            ("status_event_limit_overall", 0),
            ("status_event_limit_active_hosts", []),
            ("status_event_limit_active_rules", []),
            ("status_event_limit_active_overall", False),
        ]

    def get_status(self):
        row = []

        row += self._add_general_status()
        row += self._perfcounters.get_status()
        row += self._add_replication_status()
        row += self._add_event_limit_status()

        return [row]

    def _add_general_status(self):
        return [
            self._config["last_reload"],
            self._event_status.num_existing_events,
            self._virtual_memory_size(),
        ]

    def _virtual_memory_size(self):
        parts = file('/proc/self/stat').read().split()
        return int(parts[22])  # in Bytes

    def _add_replication_status(self):
        if is_replication_slave(self._config):
            return [
                self._slave_status["mode"],
                self._slave_status["last_sync"],
                self._slave_status["success"],
            ]
        return ["master", 0.0, False]

    def _add_event_limit_status(self):
        return [
            self._config["event_limit"]["by_host"]["limit"],
            self._config["event_limit"]["by_rule"]["limit"],
            self._config["event_limit"]["overall"]["limit"],
            self.get_hosts_with_active_event_limit(),
            self.get_rules_with_active_event_limit(),
            self.is_overall_event_limit_active(),
        ]

    def create_pipe(self):
        path = self.settings.paths.event_pipe.value
        try:
            if not path.is_fifo():
                path.unlink()
        except Exception:
            pass

        if not path.exists():
            os.mkfifo(str(path))

        # We want to be able to receive events from all users on the local system
        path.chmod(0o666)  # nosec

        self._logger.info("Created FIFO '%s' for receiving events" % path)

    def open_syslog(self):
        endpoint = self.settings.options.syslog_udp
        try:
            if isinstance(endpoint, cmk.ec.settings.FileDescriptor):
                self._syslog = socket.fromfd(endpoint.value, socket.AF_INET, socket.SOCK_DGRAM)
                os.close(endpoint.value)
                self._logger.info(
                    "Opened builtin syslog server on inherited filedescriptor %d" % endpoint.value)
            if isinstance(endpoint, cmk.ec.settings.PortNumber):
                self._syslog = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                self._syslog.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self._syslog.bind(("0.0.0.0", endpoint.value))
                self._logger.info("Opened builtin syslog server on UDP port %d" % endpoint.value)
        except Exception as e:
            raise Exception("Cannot start builtin syslog server: %s" % e)

    def open_syslog_tcp(self):
        endpoint = self.settings.options.syslog_tcp
        try:
            if isinstance(endpoint, cmk.ec.settings.FileDescriptor):
                self._syslog_tcp = socket.fromfd(endpoint.value, socket.AF_INET, socket.SOCK_STREAM)
                self._syslog_tcp.listen(20)
                os.close(endpoint.value)
                self._logger.info("Opened builtin syslog-tcp server on inherited filedescriptor %d"
                                  % endpoint.value)
            if isinstance(endpoint, cmk.ec.settings.PortNumber):
                self._syslog_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self._syslog_tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self._syslog_tcp.bind(("0.0.0.0", endpoint.value))
                self._syslog_tcp.listen(20)
                self._logger.info(
                    "Opened builtin syslog-tcp server on TCP port %d" % endpoint.value)
        except Exception as e:
            raise Exception("Cannot start builtin syslog-tcp server: %s" % e)

    def open_snmptrap(self):
        endpoint = self.settings.options.snmptrap_udp
        try:
            if isinstance(endpoint, cmk.ec.settings.FileDescriptor):
                self._snmptrap = socket.fromfd(endpoint.value, socket.AF_INET, socket.SOCK_DGRAM)
                os.close(endpoint.value)
                self._logger.info("Opened builtin snmptrap server on inherited filedescriptor %d" %
                                  endpoint.value)
            if isinstance(endpoint, cmk.ec.settings.PortNumber):
                self._snmptrap = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                self._snmptrap.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self._snmptrap.bind(("0.0.0.0", endpoint.value))
                self._logger.info("Opened builtin snmptrap server on UDP port %d" % endpoint.value)
        except Exception as e:
            raise Exception("Cannot start builtin snmptrap server: %s" % e)

    def open_eventsocket(self):
        path = self.settings.paths.event_socket.value
        if path.exists():
            path.unlink()
        path.parent.mkdir(parents=True, exist_ok=True)
        self._eventsocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._eventsocket.bind(str(path))
        path.chmod(0o664)
        self._eventsocket.listen(self._config['eventsocket_queue_len'])
        self._logger.info("Opened UNIX socket '%s' for receiving events" % path)

    def open_pipe(self):
        # Beware: we must open the pipe also for writing. Otherwise
        # we will see EOF forever after one writer has finished and
        # select() will trigger even if there is no data. A good article
        # about this is here:
        # http://www.outflux.net/blog/archives/2008/03/09/using-select-on-a-fifo/
        return os.open(str(self.settings.paths.event_pipe.value), os.O_RDWR | os.O_NONBLOCK)

    def handle_snmptrap(self, trap, ipaddress):
        self.process_event(self._event_creator.create_event_from_trap(trap, ipaddress))

    def serve(self):
        pipe_fragment = ''
        pipe = self.open_pipe()
        listen_list = [pipe]

        # Wait for incoming syslog packets via UDP
        if self._syslog is not None:
            listen_list.append(self._syslog.fileno())

        # Wait for new connections for events via TCP socket
        if self._syslog_tcp is not None:
            listen_list.append(self._syslog_tcp)

        # Wait for new connections for events via unix socket
        if self._eventsocket:
            listen_list.append(self._eventsocket)

        # Wait for incoming SNMP traps
        if self._snmptrap is not None:
            listen_list.append(self._snmptrap.fileno())

        # Keep list of client connections via UNIX socket and
        # read data that is not yet processed. Map from
        # fd to (fileobject, data)
        client_sockets = {}
        select_timeout = 1
        while not self._terminate_event.is_set():
            try:
                readable = select.select(listen_list + client_sockets.keys(), [], [],
                                         select_timeout)[0]
            except select.error as e:
                if e[0] == errno.EINTR:
                    continue
                raise
            data = None

            # Accept new connection on event unix socket
            if self._eventsocket in readable:
                client_socket, address = self._eventsocket.accept()
                # pylint: disable=no-member
                client_sockets[client_socket.fileno()] = (client_socket, address, "")

            # Same for the TCP syslog socket
            if self._syslog_tcp and self._syslog_tcp in readable:
                client_socket, address = self._syslog_tcp.accept()
                # pylint: disable=no-member
                client_sockets[client_socket.fileno()] = (client_socket, address, "")

            # Read data from existing event unix socket connections
            # NOTE: We modify client_sockets in the loop, so we need to copy below!
            for fd, (cs, address, previous_data) in list(client_sockets.iteritems()):
                if fd in readable:
                    # Receive next part of data
                    try:
                        new_data = cs.recv(4096)
                    except Exception:
                        new_data = ""
                        address = None

                    # Put together with incomplete messages from last time
                    data = previous_data + new_data

                    # Do we have incomplete data? (if the socket has been
                    # closed then we consider the pending message always
                    # as complete, even if there was no trailing \n)
                    if new_data and not data.endswith("\n"):  # keep fragment
                        # Do we have any complete messages?
                        if '\n' in data:
                            complete, rest = data.rsplit("\n", 1)
                            self.process_raw_lines(complete + "\n", address)
                        else:
                            rest = data  # keep for next time

                    # Only complete messages
                    else:
                        if data:
                            self.process_raw_lines(data, address)
                        rest = ""

                    # Connection still open?
                    if new_data:
                        client_sockets[fd] = (cs, address, rest)
                    else:
                        cs.close()
                        del client_sockets[fd]

            # Read data from pipe
            if pipe in readable:
                try:
                    data = os.read(pipe, 4096)
                    if data:
                        # Prepend previous beginning of message to read data
                        data = pipe_fragment + data
                        pipe_fragment = ""

                        # Last message still incomplete?
                        if data[-1] != '\n':
                            if '\n' in data:  # at least one complete message contained
                                messages, pipe_fragment = data.rsplit('\n', 1)
                                self.process_raw_lines(messages + '\n')  # got lost in split
                            else:
                                pipe_fragment = data  # keep beginning of message, wait for \n
                        else:
                            self.process_raw_lines(data)
                    else:  # EOF
                        os.close(pipe)
                        pipe = self.open_pipe()
                        listen_list[0] = pipe
                        # Pending fragments from previous reads that are not terminated
                        # by a \n are ignored.
                        if pipe_fragment:
                            self._logger.warning(
                                "Ignoring incomplete message '%s' from pipe" % pipe_fragment)
                            pipe_fragment = ""
                except Exception:
                    pass

            # Read events from builtin syslog server
            if self._syslog is not None and self._syslog.fileno() in readable:
                self.process_raw_lines(*self._syslog.recvfrom(4096))

            # Read events from builtin snmptrap server
            if self._snmptrap is not None and self._snmptrap.fileno() in readable:
                try:
                    message, sender_address = self._snmptrap.recvfrom(65535)
                    self.process_raw_data(lambda: self._snmp_trap_engine.process_snmptrap(
                        message, sender_address))
                except Exception:
                    self._logger.exception(
                        'Exception handling a SNMP trap from "%s". Skipping this one' %
                        sender_address[0])

            try:
                # process the first spool file we get
                spool_file = next(self.settings.paths.spool_dir.value.glob('[!.]*'))
                self.process_raw_lines(spool_file.read_bytes())
                spool_file.unlink()
                select_timeout = 0  # enable fast processing to process further files
            except StopIteration:
                select_timeout = 1  # restore default select timeout

    # Processes incoming data, just a wrapper between the real data and the
    # handler function to record some statistics etc.
    def process_raw_data(self, handler):
        self._perfcounters.count("messages")
        before = time.time()
        # In replication slave mode (when not taken over), ignore all events
        if not is_replication_slave(self._config) or self._slave_status["mode"] != "sync":
            handler()
        elif self.settings.options.debug:
            self._logger.info("Replication: we are in slave mode, ignoring event")
        elapsed = time.time() - before
        self._perfcounters.count_time("processing", elapsed)

    # Takes several lines of messages, handles encoding and processes them line by line
    def process_raw_lines(self, data, address=None):
        lines = data.splitlines()
        for line in lines:
            line = scrub_and_decode(line.rstrip())
            if line:
                try:

                    def handler(line=line):
                        self.process_line(line, address)

                    self.process_raw_data(handler)
                except Exception as e:
                    self._logger.exception(
                        'Exception handling a log line (skipping this one): %s' % e)

    def do_housekeeping(self):
        with self._event_status.lock:
            with self._lock_configuration:
                self.hk_handle_event_timeouts()
                self.hk_check_expected_messages()
                self.hk_cleanup_downtime_events()
        self._history.housekeeping()

    # For all events that have been created in a host downtime, check whether
    # or not the host is still in downtime. In case the downtime has ended,
    # archive the events that have been created in the downtime.
    def hk_cleanup_downtime_events(self):
        host_downtimes = {}

        for event in self._event_status.events():
            if not event["host_in_downtime"]:
                continue  # only care about events created in downtime

            try:
                in_downtime = host_downtimes[event["core_host"]]
            except KeyError:
                in_downtime = self._is_host_in_downtime(event)
                host_downtimes[event["core_host"]] = in_downtime

            if in_downtime:
                continue  # (still) in downtime, don't delete any event

            self._logger.verbose(
                "Remove event %d (created in downtime, host left downtime)" % event["id"])
            self._event_status.remove_event(event)

    def hk_handle_event_timeouts(self):
        # 1. Automatically delete all events that are in state "counting"
        #    and have not reached the required number of hits and whose
        #    time is elapsed.
        # 2. Automatically delete all events that are in state "open"
        #    and whose livetime is elapsed.
        events_to_delete = []
        events = self._event_status.events()
        now = time.time()
        for nr, event in enumerate(events):
            rule = self._rule_by_id.get(event["rule_id"])

            if event["phase"] == "counting":
                # Event belongs to a rule that no longer exists? It
                # will never reach its count. Better delete it.
                if not rule:
                    self._logger.info("Deleting orphaned event %d created by obsolete rule %s" %
                                      (event["id"], event["rule_id"]))
                    event["phase"] = "closed"
                    self._history.add(event, "ORPHANED")
                    events_to_delete.append(nr)

                elif "count" not in rule and "expect" not in rule:
                    self._logger.info(
                        "Count-based event %d belonging to rule %s: rule does not "
                        "count/expect anymore. Deleting event." % (event["id"], event["rule_id"]))
                    event["phase"] = "closed"
                    self._history.add(event, "NOCOUNT")
                    events_to_delete.append(nr)

                # handle counting
                elif "count" in rule:
                    count = rule["count"]
                    if count.get("algorithm") in ["tokenbucket", "dynabucket"]:
                        last_token = event.get("last_token", event["first"])
                        secs_per_token = count["period"] / float(count["count"])
                        if count["algorithm"] == "dynabucket":  # get fewer tokens if count is lower
                            if event["count"] <= 1:
                                secs_per_token = count["period"]
                            else:
                                secs_per_token *= (float(count["count"]) / float(event["count"]))
                        elapsed_secs = now - last_token
                        new_tokens = int(elapsed_secs / secs_per_token)
                        if new_tokens:
                            if self.settings.options.debug:
                                self._logger.info(
                                    "Rule %s/%s, event %d: got %d new tokens" %
                                    (rule["pack"], rule["id"], event["id"], new_tokens))
                            event["count"] = max(0, event["count"] - new_tokens)
                            # not now! would be unfair:
                            event["last_token"] = last_token + new_tokens * secs_per_token
                            if event["count"] == 0:
                                self._logger.info(
                                    "Rule %s/%s, event %d: again without allowed rate, dropping event"
                                    % (rule["pack"], rule["id"], event["id"]))
                                event["phase"] = "closed"
                                self._history.add(event, "COUNTFAILED")
                                events_to_delete.append(nr)

                    else:  # algorithm 'interval'
                        if event["first"] + count["period"] <= now:  # End of period reached
                            self._logger.info(
                                "Rule %s/%s: reached only %d out of %d events within %d seconds. "
                                "Resetting to zero." % (rule["pack"], rule["id"], event["count"],
                                                        count["count"], count["period"]))
                            event["phase"] = "closed"
                            self._history.add(event, "COUNTFAILED")
                            events_to_delete.append(nr)

            # Handle delayed actions
            elif event["phase"] == "delayed":
                delay_until = event.get("delay_until", 0)  # should always be present
                if now >= delay_until:
                    self._logger.info("Delayed event %d of rule %s is now activated." %
                                      (event["id"], event["rule_id"]))
                    event["phase"] = "open"
                    self._history.add(event, "DELAYOVER")
                    if rule:
                        cmk.ec.actions.event_has_opened(self._history, self.settings, self._config,
                                                        self._logger, self, self._event_columns,
                                                        rule, event)
                        if rule.get("autodelete"):
                            event["phase"] = "closed"
                            self._history.add(event, "AUTODELETE")
                            events_to_delete.append(nr)

                    else:
                        self._logger.info("Cannot do rule action: rule %s not present anymore." %
                                          event["rule_id"])

            # Handle events with a limited lifetime
            elif "live_until" in event:
                if now >= event["live_until"]:
                    allowed_phases = event.get("live_until_phases", ["open"])
                    if event["phase"] in allowed_phases:
                        event["phase"] = "closed"
                        events_to_delete.append(nr)
                        self._logger.info("Livetime of event %d (rule %s) exceeded. Deleting event."
                                          % (event["id"], event["rule_id"]))
                        self._history.add(event, "EXPIRED")

        # Do delayed deletion now (was delayed in order to keep list indices OK)
        for nr in events_to_delete[::-1]:
            self._event_status.remove_event(events[nr])

    def hk_check_expected_messages(self):
        now = time.time()
        # "Expecting"-rules are rules that require one or several
        # occurrences of a message within a defined time period.
        # Whenever one period of time has elapsed, we need to check
        # how many messages have been seen for that rule. If these
        # are too few, we open an event.
        # We need to handle two cases:
        # 1. An event for such a rule already exists and is
        #    in the state "counting" -> this can only be the case if
        #    more than one occurrence is required.
        # 2. No event exists at all.
        for rule in self._rules:
            if "expect" in rule:

                if not self._rule_matcher.event_rule_matches_site(rule, event=None):
                    continue

                # Interval is either a number of seconds, or pair of a number of seconds
                # (e.g. 86400, meaning one day) and a timezone offset relative to UTC in hours.
                interval = rule["expect"]["interval"]
                expected_count = rule["expect"]["count"]

                interval_start = self._event_status.interval_start(rule["id"], interval)
                if interval_start >= now:
                    continue

                next_interval_start = self._event_status.next_interval_start(
                    interval, interval_start)
                if next_interval_start > now:
                    continue

                # The interval has elapsed. Now comes the truth: do we have enough
                # rule matches?

                # First do not forget to switch to the next interval.
                self._event_status.start_next_interval(rule["id"], interval)

                # Then look for case 1: rules that already have at least one hit
                # and whose event is in the state "counting".
                events_to_delete = []
                events = self._event_status.events()
                for nr, event in enumerate(events):
                    if event["rule_id"] == rule["id"] and event["phase"] == "counting":
                        # The time has elapsed. Now let's see if we have reached
                        # the necessary count:
                        if event["count"] < expected_count:  # no -> trigger alarm
                            self._handle_absent_event(rule, event["count"], expected_count,
                                                      event["last"])
                        else:  # yes -> everything is fine. Just log.
                            self._logger.info(
                                "Rule %s/%s has reached %d occurrences (%d required). "
                                "Starting next period." % (rule["pack"], rule["id"], event["count"],
                                                           expected_count))
                            self._history.add(event, "COUNTREACHED")
                        # Counting event is no longer needed.
                        events_to_delete.append(nr)
                        break

                # Case 2: no event found at all.
                else:
                    self._handle_absent_event(rule, 0, expected_count, interval_start)

                for nr in events_to_delete[::-1]:
                    self._event_status.remove_event(events[nr])

    def _handle_absent_event(self, rule, event_count, expected_count, interval_start):
        now = time.time()
        if event_count:
            text = "Expected message arrived only %d out of %d times since %s" % \
                   (event_count, expected_count, time.strftime("%F %T", time.localtime(interval_start)))
        else:
            text = "Expected message did not arrive since %s" % \
                   time.strftime("%F %T", time.localtime(interval_start))

        # If there is already an event about this absent message, we can merge it
        # instead of creating a new one. There is a setting for this.
        merge_event = None
        merge = rule["expect"].get("merge", "open")
        if merge != "never":
            for event in self._event_status.events():
                if event["rule_id"] == rule["id"] and \
                        (event["phase"] == "open" or
                         (event["phase"] == "ack" and merge == "acked")):
                    merge_event = event
                    break

        if merge_event:
            merge_event["last"] = now
            merge_event["count"] += 1
            merge_event["phase"] = "open"
            merge_event["time"] = now
            merge_event["text"] = text
            # Better rewrite (again). The rule might have changed. Also we have changed
            # the text, and the user might have his own text added via set_text.
            self.rewrite_event(rule, merge_event, {}, set_first=False)
            self._history.add(merge_event, "COUNTFAILED")
        else:
            # Create artificial event from scratch. Make sure that all important
            # fields are defined.
            event = {
                "rule_id": rule["id"],
                "text": text,
                "phase": "open",
                "count": 1,
                "time": now,
                "first": now,
                "last": now,
                "comment": "",
                "host": "",
                "ipaddress": "",
                "application": "",
                "pid": 0,
                "priority": 3,
                "facility": 1,  # user
                "match_groups": (),
                "match_groups_syslog_application": (),
                "core_host": "",
                "host_in_downtime": False,
            }
            self._add_rule_contact_groups_to_event(rule, event)
            self.rewrite_event(rule, event, {})
            self._event_status.new_event(event)
            self._history.add(event, "COUNTFAILED")
            cmk.ec.actions.event_has_opened(self._history, self.settings, self._config,
                                            self._logger, self, self._event_columns, rule, event)
            if rule.get("autodelete"):
                event["phase"] = "closed"
                self._history.add(event, "AUTODELETE")
                self._event_status.remove_event(event)

    def reload_configuration(self, config):
        self._config = config
        self._snmp_trap_engine = cmk.ec.snmp.SNMPTrapEngine(self.settings, self._config,
                                                            self._logger.getChild("snmp"),
                                                            self.handle_snmptrap)
        self.compile_rules(self._config["rules"], self._config["rule_packs"])
        self.host_config.initialize()

    # Precompile regular expressions and similar stuff. Also convert legacy
    # "rules" parameter into the new "rule_packs" parameter.
    def compile_rules(self, legacy_rules, rule_packs):
        self._rules = []
        self._rule_by_id = {}
        self._rule_hash = {}  # Speedup-Hash for rule execution
        count_disabled = 0
        count_rules = 0
        count_unspecific = 0

        # Loop through all rule packs and through their rules
        for rule_pack in rule_packs:
            if rule_pack["disabled"]:
                count_disabled += len(rule_pack["rules"])
                continue

            for rule in rule_pack["rules"]:
                if rule.get("disabled"):
                    count_disabled += 1
                else:
                    count_rules += 1
                    rule = rule.copy()  # keep original intact because of slave replication

                    # Store information about rule pack right within the rule. This is needed
                    # for debug output and also for skipping rule packs
                    rule["pack"] = rule_pack["id"]
                    self._rules.append(rule)
                    self._rule_by_id[rule["id"]] = rule
                    try:
                        for key in [
                                "match", "match_ok", "match_host", "match_application",
                                "cancel_application"
                        ]:
                            if key in rule:
                                value = self._compile_matching_value(key, rule[key])
                                if value is None:
                                    del rule[key]
                                    continue

                                rule[key] = value

                        if 'state' in rule and isinstance(rule['state'], tuple) \
                           and rule['state'][0] == 'text_pattern':
                            for key in ['2', '1', '0']:
                                if key in rule['state'][1]:
                                    value = self._compile_matching_value(
                                        'state', rule['state'][1][key])
                                    if value is None:
                                        del rule['state'][1][key]
                                    else:
                                        rule['state'][1][key] = value

                    except Exception as e:
                        if self.settings.options.debug:
                            raise
                        rule["disabled"] = True
                        count_disabled += 1
                        self._logger.exception(
                            "Ignoring rule '%s/%s' because of an invalid regex (%s)." %
                            (rule["pack"], rule["id"], e))

                    if self._config["rule_optimizer"]:
                        self.hash_rule(rule)
                        if "match_facility" not in rule \
                                and "match_priority" not in rule \
                                and "cancel_priority" not in rule \
                                and "cancel_application" not in rule:
                            count_unspecific += 1

        self._logger.info(
            "Compiled %d active rules (ignoring %d disabled rules)" % (count_rules, count_disabled))
        if self._config["rule_optimizer"]:
            self._logger.info("Rule hash: %d rules - %d hashed, %d unspecific" % (len(
                self._rules), len(self._rules) - count_unspecific, count_unspecific))
            for facility in range(23) + [31]:
                if facility in self._rule_hash:
                    stats = []
                    for prio, entries in self._rule_hash[facility].iteritems():
                        stats.append("%s(%d)" % (SyslogPriority(prio), len(entries)))
                    self._logger.info(" %-12s: %s" % (SyslogFacility(facility), " ".join(stats)))

    @staticmethod
    def _compile_matching_value(key, val):
        value = val.strip()
        # Remove leading .* from regex. This is redundant and
        # dramatically destroys performance when doing an infix search.
        if key in ["match", "match_ok"]:
            while value.startswith(".*") and not value.startswith(".*?"):
                value = value[2:]

        if not value:
            return None

        if cmk.utils.regex.is_regex(value):
            return re.compile(value, re.IGNORECASE)

        return val.lower()
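
    # Illustrative behavior (an assumption based on the code above, not a
    # documented contract): leading ".*" is stripped before the regex check,
    # regex-like values are compiled case-insensitively, and everything else
    # is matched later as a lowercase substring.
    #
    #     _compile_matching_value("match", ".*disk (full|error)")
    #         -> re.compile("disk (full|error)", re.IGNORECASE)
    #     _compile_matching_value("match", "   ")
    #         -> None (empty patterns are dropped from the rule)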

    def hash_rule(self, rule):
        # Construct rule hash for faster execution.
        facility = rule.get("match_facility")
        if facility and not rule.get("invert_matching"):
            self.hash_rule_facility(rule, facility)
        else:
            for facility in xrange(32):  # all syslog facilities
                self.hash_rule_facility(rule, facility)

    def hash_rule_facility(self, rule, facility):
        needed_prios = [False] * 8
        for key in ["match_priority", "cancel_priority"]:
            if key in rule:
                prio_from, prio_to = rule[key]
                # Beware: from > to!
                for p in xrange(prio_to, prio_from + 1):
                    needed_prios[p] = True
            elif key == "match_priority":  # all priorities match
                needed_prios = [True] * 8  # needed to check this rule for all event priorities
            elif "match_ok" in rule:  # a cancelling rule where all priorities cancel
                needed_prios = [True] * 8  # needed to check this rule for all event priorities

        if rule.get("invert_matching"):
            needed_prios = [True] * 8

        prio_hash = self._rule_hash.setdefault(facility, {})
        for prio, need in enumerate(needed_prios):
            if need:
                prio_hash.setdefault(prio, []).append(rule)

    def output_hash_stats(self):
        self._logger.info("Top 20 of facility/priority:")
        entries = []
        total_count = 0
        for facility in xrange(32):
            for priority in xrange(8):
                count = self._hash_stats[facility][priority]
                if count:
                    total_count += count
                    entries.append((count, (facility, priority)))
        entries.sort()
        entries.reverse()
        for count, (facility, priority) in entries[:20]:
            self._logger.info("  %s/%s - %d (%.2f%%)" % (SyslogFacility(facility),
                                                         SyslogPriority(priority), count,
                                                         (100.0 * count / float(total_count))))

    def process_line(self, line, address):
        line = line.rstrip()
        if self._config["debug_rules"]:
            if address:
                self._logger.info(u"Processing message from %r: '%s'" % (address, line))
            else:
                self._logger.info(u"Processing message '%s'" % line)

        event = self._event_creator.create_event_from_line(line, address)
        self.process_event(event)

    def process_event(self, event):
        self.do_translate_hostname(event)

        # Log all incoming messages into a syslog-like text file if that is enabled
        if self._config["log_messages"]:
            self.log_message(event)

        # Rule optimizer
        if self._config["rule_optimizer"]:
            self._hash_stats[event["facility"]][event["priority"]] += 1
            rule_candidates = self._rule_hash.get(event["facility"], {}).get(event["priority"], [])
        else:
            rule_candidates = self._rules

        skip_pack = None
        for rule in rule_candidates:
            if skip_pack and rule["pack"] == skip_pack:
                continue  # still in the rule pack that we want to skip
            skip_pack = None  # new pack, reset skipping

            try:
                result = self.event_rule_matches(rule, event)
            except Exception as e:
                self._logger.exception('  Exception during matching:\n%s' % e)
                result = False

            if result:  # a tuple of (cancelling, match_groups)
                self._perfcounters.count("rule_hits")
                cancelling, match_groups = result

                if self._config["debug_rules"]:
                    self._logger.info("  matching groups:\n%s" % pprint.pformat(match_groups))

                self._event_status.count_rule_match(rule["id"])
                if self._config["log_rulehits"]:
                    self._logger.info("Rule '%s/%s' hit by message %s/%s - '%s'." %
                                      (rule["pack"], rule["id"], SyslogFacility(event["facility"]),
                                       SyslogPriority(event["priority"]), event["text"]))

                if rule.get("drop"):
                    if rule["drop"] == "skip_pack":
                        skip_pack = rule["pack"]
                        if self._config["debug_rules"]:
                            self._logger.info("  skipping this rule pack (%s)" % skip_pack)
                        continue
                    else:
                        self._perfcounters.count("drops")
                        return

                if cancelling:
                    self._event_status.cancel_events(self, self._event_columns, event, match_groups,
                                                     rule)
                    return
                else:
                    # Remember the rule id that this event originated from
                    event["rule_id"] = rule["id"]

                    # Lookup the monitoring core hosts and add the core host
                    # name to the event when one can be matched.
                    # For the moment we have no rule/condition matching on this
                    # field. So we only add the core host info for matched events.
                    self._add_core_host_to_new_event(event)

                    # Attach optional contact group information for visibility
                    # and eventually for notifications
                    self._add_rule_contact_groups_to_event(rule, event)

                    # Store groups from matching this event. In order to make
                    # persistence easier, we do not save them as list but join
                    # them on ASCII-1.
                    event["match_groups"] = match_groups.get("match_groups_message", ())
                    event["match_groups_syslog_application"] = match_groups.get(
                        "match_groups_syslog_application", ())
                    self.rewrite_event(rule, event, match_groups)

                    if "count" in rule:
                        count = rule["count"]
                        # Check if a matching event already exists that we need to
                        # count up. If the count reaches the limit, the event will
                        # be opened and its rule actions performed.
                        existing_event = \
                            self._event_status.count_event(self, event, rule, count)
                        if existing_event:
                            if "delay" in rule:
                                if self._config["debug_rules"]:
                                    self._logger.info("Event opening will be delayed for %d seconds"
                                                      % rule["delay"])
                                existing_event["delay_until"] = time.time() + rule["delay"]
                                existing_event["phase"] = "delayed"
                            else:
                                cmk.ec.actions.event_has_opened(
                                    self._history, self.settings, self._config, self._logger, self,
                                    self._event_columns, rule, existing_event)

                            self._history.add(existing_event, "COUNTREACHED")

                            if "delay" not in rule and rule.get("autodelete"):
                                existing_event["phase"] = "closed"
                                self._history.add(existing_event, "AUTODELETE")
                                with self._event_status.lock:
                                    self._event_status.remove_event(existing_event)
                    elif "expect" in rule:
                        self._event_status.count_expected_event(self, event)
                    else:
                        if "delay" in rule:
                            if self._config["debug_rules"]:
                                self._logger.info(
                                    "Event opening will be delayed for %d seconds" % rule["delay"])
                            event["delay_until"] = time.time() + rule["delay"]
                            event["phase"] = "delayed"
                        else:
                            event["phase"] = "open"

                        if self.new_event_respecting_limits(event):
                            if event["phase"] == "open":
                                cmk.ec.actions.event_has_opened(self._history, self.settings,
                                                                self._config, self._logger, self,
                                                                self._event_columns, rule, event)
                                if rule.get("autodelete"):
                                    event["phase"] = "closed"
                                    self._history.add(event, "AUTODELETE")
                                    with self._event_status.lock:
                                        self._event_status.remove_event(event)
                    return

        # End of loop over rules.
        if self._config["archive_orphans"]:
            self._event_status.archive_event(event)

    def _add_rule_contact_groups_to_event(self, rule, event):
        if rule.get("contact_groups") is None:
            event.update({
                "contact_groups": None,
                "contact_groups_notify": False,
                "contact_groups_precedence": "host",
            })
        else:
            event.update({
                "contact_groups": rule["contact_groups"]["groups"],
                "contact_groups_notify": rule["contact_groups"]["notify"],
                "contact_groups_precedence": rule["contact_groups"]["precedence"],
            })

    def add_core_host_to_event(self, event):
        matched_host = self.host_config.get_by_event_host_name(event["host"])
        if not matched_host:
            event["core_host"] = ""
            return

        event["core_host"] = matched_host["name"]

    def _add_core_host_to_new_event(self, event):
        self.add_core_host_to_event(event)

        # Add some state dependent information (like host is in downtime etc.)
        event["host_in_downtime"] = self._is_host_in_downtime(event)

    def _is_host_in_downtime(self, event):
        if not event["core_host"]:
            return False  # Found no host in core: Not in downtime!

        query = ("GET hosts\n"
                 "Columns: scheduled_downtime_depth\n"
                 "Filter: host_name = %s\n" % (event["core_host"]))

        try:
            return livestatus.LocalConnection().query_value(query) >= 1

        except livestatus.MKLivestatusNotFoundError:
            return False

        except Exception:
            if cmk.utils.debug.enabled():
                raise
            return False

    # Checks if an event matches a rule. Returns either False (no match)
    # or a pair of (matchtype, groups), where matchtype is False for a
    # normal match and True for a cancelling match, and groups is a tuple
    # of the matched regex groups in either text (normal) or match_ok
    # (cancelling) match.
    def event_rule_matches(self, rule, event):
        self._perfcounters.count("rule_tries")
        with self._lock_configuration:
            result = self._rule_matcher.event_rule_matches_non_inverted(rule, event)
            if rule.get("invert_matching"):
                if result is False:
                    result = False, {}
                    if self._config["debug_rules"]:
                        self._logger.info(
                            "  Rule would not match, but due to inverted matching does.")
                else:
                    result = False
                    if self._config["debug_rules"]:
                        self._logger.info(
                            "  Rule would match, but due to inverted matching does not.")

            return result

    # Rewrite texts and compute other fields in the event
    def rewrite_event(self, rule, event, groups, set_first=True):
        if rule["state"] == -1:
            prio = event["priority"]
            if prio >= 5:
                event["state"] = 0
            elif prio < 4:
                event["state"] = 2
            else:
                event["state"] = 1
        elif isinstance(rule["state"], tuple) and rule["state"][0] == "text_pattern":
            for key in ['2', '1', '0', '3']:
                if key in rule["state"][1]:
                    match_groups = match(rule["state"][1][key], event["text"], complete=False)
                    if match_groups is not False:
                        event["state"] = int(key)
                        break
                elif key == '3':  # No rule matched!
                    event["state"] = 3
        else:
            event["state"] = rule["state"]

        if ("sl" not in event) or (rule["sl"]["precedence"] == "rule"):
            event["sl"] = rule["sl"]["value"]
        if set_first:
            event["first"] = event["time"]
        event["last"] = event["time"]
        if "set_comment" in rule:
            event["comment"] = replace_groups(rule["set_comment"], event["text"], groups)
        if "set_text" in rule:
            event["text"] = replace_groups(rule["set_text"], event["text"], groups)
        if "set_host" in rule:
            event["orig_host"] = event["host"]
            event["host"] = replace_groups(rule["set_host"], event["host"], groups)
        if "set_application" in rule:
            event["application"] = replace_groups(rule["set_application"], event["application"],
                                                  groups)
        if "set_contact" in rule and "contact" not in event:
            event["contact"] = replace_groups(rule["set_contact"], event.get("contact", ""), groups)
1727 # Translate a hostname if this is configured. We are
1728 # *really* sorry: this code snippet is copied from modules/check_mk_base.py.
1729 # There is still no common library. Please keep this in sync with the
1730 # original code.
1731 def translate_hostname(self, backedhost):
1732 translation = self._config["hostname_translation"]
1734 # Here comes the original code from modules/check_mk_base.py
1735 if translation:
1736 # 1. Case conversion
1737 caseconf = translation.get("case")
1738 if caseconf == "upper":
1739 backedhost = backedhost.upper()
1740 elif caseconf == "lower":
1741 backedhost = backedhost.lower()
1743 # 2. Drop domain part (not applied to IP addresses!)
1744 if translation.get("drop_domain") and backedhost:
1745 # only apply if first part does not convert successfully into an int
1746 firstpart = backedhost.split(".", 1)[0]
1747 try:
1748 int(firstpart)
1749 except Exception:
1750 backedhost = firstpart
1752 # 3. Regular expression conversion
1753 if "regex" in translation:
1754 for regex, subst in translation["regex"]:
1755 if not regex.endswith('$'):
1756 regex += '$'
1757 rcomp = cmk.utils.regex.regex(regex)
1758 mo = rcomp.match(backedhost)
1759 if mo:
1760 backedhost = subst
1761 for nr, text in enumerate(mo.groups()):
1762 backedhost = backedhost.replace("\\%d" % (nr + 1), text)
1763 break
1765 # 4. Explicit mapping
1766 for from_host, to_host in translation.get("mapping", []):
1767 if from_host == backedhost:
1768 backedhost = to_host
1769 break
1771 return backedhost
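# A hedged walk-through of the four steps above (all values hypothetical):
#   translation = {"case": "lower", "drop_domain": True,
#                  "regex": [(".*srv([0-9]+)", "server-\\1")],
#                  "mapping": [("special", "special-host")]}
#   "MySrv42.example.com" -> "mysrv42.example.com" -> "mysrv42" -> "server-42"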
1773 def do_translate_hostname(self, event):
1774 try:
1775 event["host"] = self.translate_hostname(event["host"])
1776 except Exception as e:
1777 if self._config["debug_rules"]:
1778 self._logger.exception('Unable to parse host "%s" (%s)' % (event.get("host"), e))
1779 event["host"] = ""
1781 def log_message(self, event):
1782 try:
1783 with cmk.ec.history.get_logfile(self._config, self.settings.paths.messages_dir.value,
1784 self._message_period).open(mode='ab') as f:
1785 f.write("%s %s %s%s: %s\n" % (time.strftime("%b %d %H:%M:%S",
1786 time.localtime(event["time"])),
1787 event["host"], event["application"], event["pid"] and
1788 ("[%s]" % event["pid"]) or "", event["text"]))
1789 except Exception:
1790 if self.settings.options.debug:
1791 raise
1792 # Better to silently ignore errors. We could have run out of
1793 # disk space and make things worse by logging that we could
1794 # not log.
1796 def get_hosts_with_active_event_limit(self):
1797 hosts = []
1798 for hostname, num_existing_events in self._event_status.num_existing_events_by_host.iteritems(
1799 ):
1800 if num_existing_events >= self._config["event_limit"]["by_host"]["limit"]:
1801 hosts.append(hostname)
1802 return hosts
1804 def get_rules_with_active_event_limit(self):
1805 rule_ids = []
1806 for rule_id, num_existing_events in self._event_status.num_existing_events_by_rule.iteritems(
1807 ):
1808 if rule_id is None:
1809 continue # Ignore rule unrelated overflow events. They have no rule id associated.
1810 if num_existing_events >= self._config["event_limit"]["by_rule"]["limit"]:
1811 rule_ids.append(rule_id)
1812 return rule_ids
1814 def is_overall_event_limit_active(self):
1815 return self._event_status.num_existing_events \
1816 >= self._config["event_limit"]["overall"]["limit"]
1818 # protected by self._event_status.lock
1819 def new_event_respecting_limits(self, event):
1820 self._logger.verbose(
1821 "Checking limit for message from %s (rule '%s')" % (event["host"], event["rule_id"]))
1823 with self._event_status.lock:
1824 if self._handle_event_limit("overall", event):
1825 return False
1827 if self._handle_event_limit("by_host", event):
1828 return False
1830 if self._handle_event_limit("by_rule", event):
1831 return False
1833 self._event_status.new_event(event)
1834 return True
1836 # The following actions can be configured:
1837 # stop Stop creating new events
1838 # stop_overflow Stop creating new events, create overflow event
1839 # stop_overflow_notify Stop creating new events, create overflow event, notify
1840 # delete_oldest Delete oldest event, create new event
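# A minimal sketch of the corresponding configuration (structure inferred
# from the lookups in _get_event_limit() below; the numbers are hypothetical):
#   "event_limit": {
#       "overall": {"limit": 10000, "action": "delete_oldest"},
#       "by_host": {"limit": 50, "action": "stop_overflow_notify"},
#       "by_rule": {"limit": 1000, "action": "stop_overflow"},
#   }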
1841 # protected by self._event_status.lock
1843 # Returns False if the event has been created and actions should be
1844 # performed on that event
1845 def _handle_event_limit(self, ty, event):
1846 assert ty in ["overall", "by_rule", "by_host"]
1848 num_already_open = self._event_status.get_num_existing_events_by(ty, event)
1849 limit, action = self._get_event_limit(ty, event)
1850 self._logger.verbose(
1851 " Type: %s, already open events: %d, Limit: %d" % (ty, num_already_open, limit))
1853 # Limit not reached: add new event
1854 if num_already_open < limit:
1855 num_already_open += 1 # after adding this event
1857 # Limit even then still not reached: we are fine
1858 if num_already_open < limit:
1859 return False
1861 # Delete oldest messages if that is the configured method of keeping the limit
1862 if action == "delete_oldest":
1863 while num_already_open > limit:
1864 self._perfcounters.count("overflows")
1865 self._event_status.remove_oldest_event(ty, event)
1866 num_already_open -= 1
1867 return False
1869 # Limit reached already in the past: Simply drop silently
1870 if num_already_open > limit:
1871 # Just log in verbose mode! Otherwise log file will be flooded
1872 self._logger.verbose(" Skip processing because limit is already in effect")
1873 self._perfcounters.count("overflows")
1874 return True # Prevent creation and prevent one time actions (below)
1876 self._logger.info(" The %s limit has been reached" % ty)
1878 # This is the event which reached the limit, allow creation of it. Further
1879 # events will be stopped.
1881 # Perform one time actions
1882 overflow_event = self._create_overflow_event(ty, event, limit)
1884 if "overflow" in action:
1885 self._logger.info(" Creating overflow event")
1886 self._event_status.new_event(overflow_event)
1888 if "notify" in action:
1889 self._logger.info(" Creating overflow notification")
1890 cmk.ec.actions.do_notify(self, self._logger, overflow_event)
1892 return False
1894 # protected by self._event_status.lock
1895 def _get_event_limit(self, ty, event):
1896 # Prefer the rule individual limit for by_rule limit (in case there is some)
1897 if ty == "by_rule":
1898 rule_limit = self._rule_by_id[event["rule_id"]].get("event_limit")
1899 if rule_limit:
1900 return rule_limit["limit"], rule_limit["action"]
1902 # Prefer the host individual limit for by_host limit (in case there is some)
1903 if ty == "by_host":
1904 host_config = self.host_config.get(event["core_host"], {})
1905 host_limit = host_config.get("custom_variables", {}).get("EC_EVENT_LIMIT")
1906 if host_limit:
1907 limit, action = host_limit.split(":", 1)
1908 return int(limit), action
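# Hypothetical example: a core host with the custom variable
# EC_EVENT_LIMIT = "50:delete_oldest" yields (50, "delete_oldest") here.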
1910 limit = self._config["event_limit"][ty]["limit"]
1911 action = self._config["event_limit"][ty]["action"]
1913 return limit, action
1915 def _create_overflow_event(self, ty, event, limit):
1916 now = time.time()
1917 new_event = {
1918 "rule_id": None,
1919 "phase": "open",
1920 "count": 1,
1921 "time": now,
1922 "first": now,
1923 "last": now,
1924 "comment": "",
1925 "host": "",
1926 "ipaddress": "",
1927 "application": "Event Console",
1928 "pid": 0,
1929 "priority": 2, # crit
1930 "facility": 1, # user
1931 "match_groups": (),
1932 "match_groups_syslog_application": (),
1933 "state": 2, # crit
1934 "sl": event["sl"],
1935 "core_host": "",
1936 "host_in_downtime": False,
1938 self._add_rule_contact_groups_to_event({}, new_event)
1940 if ty == "overall":
1941 new_event["text"] = ("The overall event limit of %d open events has been reached. Not "
1942 "opening any additional event until open events have been "
1943 "archived." % limit)
1945 elif ty == "by_host":
1946 new_event.update({
1947 "host": event["host"],
1948 "ipaddress": event["ipaddress"],
1949 "text": ("The host event limit of %d open events has been reached for host \"%s\". "
1950 "Not opening any additional event for this host until open events have "
1951 "been archived." % (limit, event["host"]))
1954 # Lookup the monitoring core hosts and add the core host
1955 # name to the event when one can be matched
1956 self._add_core_host_to_new_event(new_event)
1958 elif ty == "by_rule":
1959 new_event.update({
1960 "rule_id": event["rule_id"],
1961 "contact_groups": event["contact_groups"],
1962 "contact_groups_notify": event.get("contact_groups_notify", False),
1963 "contact_groups_precedence": event.get("contact_groups_precedence", "host"),
1964 "text": ("The rule event limit of %d open events has been reached for rule \"%s\". "
1965 "Not opening any additional event for this rule until open events have "
1966 "been archived." % (limit, event["rule_id"]))
1969 else:
1970 raise NotImplementedError()
1972 return new_event
1975 class EventCreator(object):
1976 def __init__(self, logger, config):
1977 super(EventCreator, self).__init__()
1978 self._logger = logger
1979 self._config = config
1981 def create_event_from_line(self, line, address):
1982 event = {
1983 # address is either None or a tuple of (ipaddress, port)
1984 "ipaddress": address and address[0] or "",
1985 "core_host": "",
1986 "host_in_downtime": False,
1988 try:
1989 # Variant 1: plain syslog message without priority/facility:
1990 # May 26 13:45:01 Klapprechner CRON[8046]: message....
1992 # Variant 1a: plain syslog message without priority/facility/host:
1993 # May 26 13:45:01 CRON[8046]: message....
1995 # Variant 2: syslog message including facility (RFC 3164)
1996 # <78>May 26 13:45:01 Klapprechner CRON[8046]: message....
1998 # Variant 3: local Nagios alert posted by mkevent -n
1999 # <154>@1341847712;5;Contact Info; MyHost My Service: CRIT - This che
2001 # Variant 4: remote Nagios alert posted by mkevent -n -> syslog
2002 # <154>Jul 9 17:28:32 Klapprechner @1341847712;5;Contact Info; MyHost My Service: CRIT - This che
2004 # Variant 5: syslog message
2005 # Timestamp is RFC3339 with additional restrictions:
2006 # - The "T" and "Z" characters in this syntax MUST be upper case.
2007 # - Usage of the "T" character is REQUIRED.
2008 # - Leap seconds MUST NOT be used.
2009 # <166>2013-04-05T13:49:31.685Z esx Vpxa: message....
2011 # Variant 6: syslog message without date / host:
2012 # <5>SYSTEM_INFO: [WLAN-1] Triggering Background Scan
2014 # Variant 7: logwatch.ec event forwarding
2015 # <78>@1341847712 Klapprechner /var/log/syslog: message....
2017 # Variant 7a: Event simulation
2018 # <%PRI%>@%TIMESTAMP%;%SL% %HOSTNAME% %syslogtag%: %msg%
2020 # Variant 8: syslog message from sophos firewall
2021 # <84>2015:03:25-12:02:06 gw pluto[7122]: listening for IKE messages
2023 # Variant 9: syslog message (RFC 5424)
2024 # <134>1 2016-06-02T12:49:05.181+02:00 chrissw7 ChrisApp - TestID - coming from java code
2026 # Variant 10:
2027 # 2016 May 26 15:41:47 IST XYZ Ebra: %LINEPROTO-5-UPDOWN: Line protocol on Interface Ethernet45 (XXX.ASAD.Et45), changed state to up
2028 # year month day hh:mm:ss timezone HOSTNAME KeyAgent:
2030 # FIXME: Would be better to parse the syslog messages in another way:
2031 # Split the message by the first ":", then split the syslog header part
2032 # and detect which information is present. Take a look at the syslog RFCs
2033 # for details.
2035 # Variant 2,3,4,5,6,7,8
2036 if line.startswith('<'):
2037 i = line.find('>')
2038 prio = int(line[1:i])
2039 line = line[i + 1:]
2040 event["facility"] = prio >> 3
2041 event["priority"] = prio & 7
2043 # Variant 1,1a
2044 else:
2045 event["facility"] = 1 # user
2046 event["priority"] = 5 # notice
2048 # Variant 7 and 7a
2049 if line[0] == '@' and line[11] in [' ', ';']:
2050 details, event['host'], line = line.split(' ', 2)
2051 detail_tokens = details.split(';')
2052 timestamp = detail_tokens[0]
2053 if len(detail_tokens) > 1:
2054 event["sl"] = int(detail_tokens[1])
2055 event['time'] = float(timestamp[1:])
2056 event.update(self._parse_syslog_info(line))
2058 # Variant 3
2059 elif line.startswith("@"):
2060 event.update(self._parse_monitoring_info(line))
2062 # Variant 5
2063 elif len(line) > 24 and line[10] == 'T':
2064 # There is no RFC 3339 parsing built into Python. We ignore subseconds and
2065 # timezones here. This seems to be OK for the moment - sorry. Please drop a
2066 # note if you have a good solution for this.
2067 rfc3339_part, event['host'], line = line.split(' ', 2)
2068 event['time'] = time.mktime(time.strptime(rfc3339_part[:19], '%Y-%m-%dT%H:%M:%S'))
2069 event.update(self._parse_syslog_info(line))
2071 # Variant 9
2072 elif len(line) > 24 and line[12] == "T":
2073 event.update(self._parse_rfc5424_syslog_info(line))
2075 # Variant 8
2076 elif line[10] == '-' and line[19] == ' ':
2077 event['host'] = line.split(' ')[1]
2078 event['time'] = time.mktime(time.strptime(line.split(' ')[0], '%Y:%m:%d-%H:%M:%S'))
2079 rest = " ".join(line.split(' ')[2:])
2080 event.update(self._parse_syslog_info(rest))
2082 # Variant 6
2083 elif len(line.split(': ', 1)[0].split(' ')) == 1:
2084 event.update(self._parse_syslog_info(line))
2085 # There is no datetime information in the message, use current time
2086 event['time'] = time.time()
2087 # There is no host information, use the provided address
2088 if address and isinstance(address, tuple):
2089 event["host"] = address[0]
2091 # Variant 10
2092 elif line[4] == " " and line[:4].isdigit():
2093 time_part = line[:20] # ignoring tz info
2094 event["host"], application, line = line[25:].split(" ", 2)
2095 event["application"] = application.rstrip(":")
2096 event["text"] = line
2097 event['time'] = time.mktime(time.strptime(time_part, '%Y %b %d %H:%M:%S'))
2099 # Variant 1,1a,2,4
2100 else:
2101 month_name, day, timeofday, rest = line.split(None, 3)
2103 # Special handling for variant 1a. Detect whether the first token
2104 # is a hostname or a syslog tag
2105 host, tmp_rest = rest.split(None, 1)
2106 if host.endswith(":"):
2107 # There is no host information sent, use the source address as "host"
2108 host = address[0]
2109 else:
2110 # Use the extracted host and continue with the remaining message text
2111 rest = tmp_rest
2113 event["host"] = host
2115 # Variant 4
2116 if rest.startswith("@"):
2117 event.update(self._parse_monitoring_info(rest))
2119 # Variant 1, 2
2120 else:
2121 event.update(self._parse_syslog_info(rest))
2123 month = EventServer.month_names[month_name]
2124 day = int(day)
2126 # Nasty: the year is not contained in the message. We cannot simply
2127 # assume that the message is from the current year.
2128 lt = time.localtime()
2129 if lt.tm_mon < 6 and month > 6: # Assume that message is from last year
2130 year = lt.tm_year - 1
2131 else:
2132 year = lt.tm_year # Assume the current year
2134 hours, minutes, seconds = map(int, timeofday.split(":"))
2136 # A further problem here: we do not know whether the message is in DST or not
2137 event["time"] = time.mktime((year, month, day, hours, minutes, seconds, 0, 0,
2138 lt.tm_isdst))
2140 # The event simulator ships the simulated original IP address in the
2141 # hostname field, separated with a pipe, e.g. "myhost|1.2.3.4"
2142 if "|" in event["host"]:
2143 event["host"], event["ipaddress"] = event["host"].split("|", 1)
2145 except Exception as e:
2146 if self._config["debug_rules"]:
2147 self._logger.exception('Got non-syslog message "%s" (%s)' % (line, e))
2148 event = {
2149 "facility": 1,
2150 "priority": 0,
2151 "text": line,
2152 "host": "",
2153 "ipaddress": address and address[0] or "",
2154 "application": "",
2155 "pid": 0,
2156 "time": time.time(),
2157 "core_host": "",
2158 "host_in_downtime": False,
2161 if self._config["debug_rules"]:
2162 self._logger.info('Parsed message:\n' + ("".join(
2163 [" %-15s %s\n" % (k + ":", v) for (k, v) in sorted(event.iteritems())])).rstrip())
2165 return event
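# A hedged walk-through of the Variant 2 example above (values derived from
# the parsing code; "..." stands for the remaining default fields):
#   create_event_from_line('<78>May 26 13:45:01 Klapprechner CRON[8046]: message', None)
#   -> {"facility": 9, "priority": 6, "host": "Klapprechner",
#       "application": "CRON", "pid": "8046", "text": "message", ...}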
2167 def _parse_rfc5424_syslog_info(self, line):
2168 event = {}
2170 (_unused_version, timestamp, hostname, app_name, procid, _unused_msgid, rest) = line.split(
2171 " ", 6)
2173 # There is no RFC 3339 parsing built into Python. We ignore subseconds and
2174 # timezones here. This seems to be OK for the moment - sorry. Please drop a
2175 # note if you have a good solution for this.
2176 event['time'] = time.mktime(time.strptime(timestamp[:19], '%Y-%m-%dT%H:%M:%S'))
2178 if hostname != "-":
2179 event["host"] = hostname
2181 if app_name != "-":
2182 event["application"] = app_name
2184 if procid != "-":
2185 event["pid"] = procid
2187 if rest[0] == "[":
2188 # has structured data
2189 structured_data, message = rest[1:].split("] ", 1)
2190 elif rest.startswith("- "):
2191 # has no structured data
2192 structured_data, message = rest.split(" ", 1)
2193 else:
2194 raise Exception("Invalid RFC 5424 syslog message")
2196 if structured_data != "-":
2197 event["text"] = "[%s] %s" % (structured_data, message)
2198 else:
2199 event["text"] = message
2201 return event
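# A hedged walk-through of the Variant 9 example above (values derived from
# the splitting code; the "-" procid and the "TestID" msgid are discarded):
#   '1 2016-06-02T12:49:05.181+02:00 chrissw7 ChrisApp - TestID - coming from java code'
#   -> {"time": ..., "host": "chrissw7", "application": "ChrisApp",
#       "text": "coming from java code"}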
2203 def _parse_syslog_info(self, line):
2204 event = {}
2205 # Replaced ":" by ": " here to make tags containing ":" possible. This
2206 # is needed to process log lines forwarded from Windows agent logfiles
2207 # like "c://test.log".
2208 tag, message = line.split(": ", 1)
2209 event["text"] = message.strip()
2211 if '[' in tag:
2212 app, pid = tag.split('[', 1)
2213 pid = pid.rstrip(']')
2214 else:
2215 app = tag
2216 pid = 0
2218 event["application"] = app
2219 event["pid"] = pid
2220 return event
2222 def _parse_monitoring_info(self, line):
2223 event = {}
2224 # line starts with '@'
2225 if line[11] == ';':
2226 timestamp_str, sl, contact, rest = line[1:].split(';', 3)
2227 host, rest = rest.split(None, 1)
2228 if len(sl):
2229 event["sl"] = int(sl)
2230 if len(contact):
2231 event["contact"] = contact
2232 else:
2233 timestamp_str, host, rest = line[1:].split(None, 2)
2235 event["time"] = float(int(timestamp_str))
2236 service, message = rest.split(": ", 1)
2237 event["application"] = service
2238 event["text"] = message.strip()
2239 event["host"] = host
2240 return event
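# A hedged walk-through of the Variant 3 example above (values derived from
# the splitting code; the leading priority has already been stripped):
#   '@1341847712;5;Contact Info; MyHost My Service: CRIT - This che...'
#   -> {"time": 1341847712.0, "sl": 5, "contact": "Contact Info",
#       "host": "MyHost", "application": "My Service",
#       "text": "CRIT - This che..."}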
2242 def create_event_from_trap(self, trap, ipaddress):
2243 # use the trap-oid as application
2244 application = u''
2245 for index, (oid, _unused_val) in enumerate(trap):
2246 if oid in ['1.3.6.1.6.3.1.1.4.1.0', 'SNMPv2-MIB::snmpTrapOID.0']:
2247 application = scrub_and_decode(trap.pop(index)[1])
2248 break
2250 # Once we get here we have a real parsed trap, which we now convert to an event
2251 safe_ipaddress = scrub_and_decode(ipaddress)
2252 text = scrub_and_decode(', '.join(['%s: %s' % (item[0], str(item[1])) for item in trap]))
2254 event = {
2255 'time': time.time(),
2256 'host': safe_ipaddress,
2257 'ipaddress': safe_ipaddress,
2258 'priority': 5, # notice
2259 'facility': 31, # not used by syslog -> we use this for all traps
2260 'application': application,
2261 'text': text,
2262 'core_host': '',
2263 'host_in_downtime': False,
2264 }
2266 return event
2269 class RuleMatcher(object):
2270 def __init__(self, logger, config):
2271 super(RuleMatcher, self).__init__()
2272 self._logger = logger
2273 self._config = config
2274 self._time_periods = TimePeriods(logger)
2276 @property
2277 def _debug_rules(self):
2278 return self._config["debug_rules"]
2280 def event_rule_matches_non_inverted(self, rule, event):
2281 if self._debug_rules:
2282 self._logger.info("Trying rule %s/%s..." % (rule["pack"], rule["id"]))
2283 self._logger.info(" Text: %s" % event["text"])
2284 self._logger.info(" Syslog: %d.%d" % (event["facility"], event["priority"]))
2285 self._logger.info(" Host: %s" % event["host"])
2287 # Generic conditions without positive/canceling matches
2288 if not self.event_rule_matches_generic(rule, event):
2289 return False
2291 # Determine syslog priority
2292 match_priority = {}
2293 if not self.event_rule_determine_match_priority(rule, event, match_priority):
2294 # Abort if neither a positive nor a canceling match is possible
2295 return False
2297 # Determine and cleanup match_groups
2298 match_groups = {}
2299 if not self.event_rule_determine_match_groups(rule, event, match_groups):
2300 # Abort if neither a positive nor a canceling match is possible
2301 return False
2303 return self._check_match_outcome(rule, match_groups, match_priority)
2305 def _check_match_outcome(self, rule, match_groups, match_priority):
2306 # type: (Dict[str, Any], Dict[str, Any], Dict[str, Any]) -> Union[bool, Tuple[bool, Dict[str, Any]]]
2307 """Decide or not a event is created, canceled or nothing is done"""
2309 # Check canceling-event
2310 has_canceling_condition = bool(
2311 [x for x in ["match_ok", "cancel_application", "cancel_priority"] if x in rule])
2312 if has_canceling_condition:
2313 if ("match_ok" not in rule or match_groups.get("match_groups_message_ok", False) is not False) and\
2314 ("cancel_application" not in rule or
2315 match_groups.get("match_groups_syslog_application_ok", False) is not False) and\
2316 ("cancel_priority" not in rule or match_priority["has_canceling_match"] is True):
2317 if self._debug_rules:
2318 self._logger.info(" found canceling event")
2319 return True, match_groups
2321 # Check create-event
2322 if match_groups["match_groups_message"] is not False and\
2323 match_groups.get("match_groups_syslog_application", ()) is not False and\
2324 match_priority["has_match"] is True:
2325 if self._debug_rules:
2326 self._logger.info(" found new event")
2327 return False, match_groups
2329 # Looks like there was no match, output some additional info
2330 # Reasons preventing create-event
2331 if self._debug_rules:
2332 if match_groups["match_groups_message"] is False:
2333 self._logger.info(" did not create event, because of wrong message")
2334 if "match_application" in rule and match_groups[
2335 "match_groups_syslog_application"] is False:
2336 self._logger.info(" did not create event, because of wrong syslog application")
2337 if "match_priority" in rule and match_priority["has_match"] is False:
2338 self._logger.info(" did not create event, because of wrong syslog priority")
2340 if has_canceling_condition:
2341 # Reasons preventing cancel-event
2342 if "match_ok" in rule and match_groups.get("match_groups_message_ok",
2343 False) is False:
2344 self._logger.info(" did not cancel event, because of wrong message")
2345 if "cancel_application" in rule and \
2346 match_groups.get("match_groups_syslog_application_ok", False) is False:
2347 self._logger.info(" did not cancel event, because of wrong syslog application")
2348 if "cancel_priority" in rule and match_priority["has_canceling_match"] is False:
2349 self._logger.info(" did not cancel event, because of wrong cancel priority")
2351 return False
2353 def event_rule_matches_generic(self, rule, event):
2354 generic_match_functions = [
2355 self.event_rule_matches_site,
2356 self.event_rule_matches_host,
2357 self.event_rule_matches_ip,
2358 self.event_rule_matches_facility,
2359 self.event_rule_matches_service_level,
2360 self.event_rule_matches_timeperiod,
2361 ]
2363 for match_function in generic_match_functions:
2364 if not match_function(rule, event):
2365 return False
2366 return True
2368 def event_rule_determine_match_priority(self, rule, event, match_priority):
2369 p = event["priority"]
2371 if "match_priority" in rule:
2372 prio_from, prio_to = sorted(rule["match_priority"])
2373 match_priority["has_match"] = prio_from <= p <= prio_to
2374 else:
2375 match_priority["has_match"] = True
2377 if "cancel_priority" in rule:
2378 cancel_from, cancel_to = sorted(rule["cancel_priority"])
2379 match_priority["has_canceling_match"] = cancel_from <= p <= cancel_to
2380 else:
2381 match_priority["has_canceling_match"] = False
2383 if match_priority["has_match"] is False and\
2384 match_priority["has_canceling_match"] is False:
2385 return False
2387 return True
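# Hypothetical example for the function above: rule["match_priority"] = (7, 4)
# is normalized by sorted() and matches syslog priorities 4 (warning)..7 (debug).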
2389 def event_rule_matches_site(self, rule, event):
2390 return "match_site" not in rule or cmk.omd_site() in rule["match_site"]
2392 def event_rule_matches_host(self, rule, event):
2393 if match(rule.get("match_host"), event["host"], complete=True) is False:
2394 if self._debug_rules:
2395 self._logger.info(" did not match because of wrong host '%s' (need '%s')" %
2396 (event["host"], format_pattern(rule.get("match_host"))))
2397 return False
2398 return True
2400 def event_rule_matches_ip(self, rule, event):
2401 if not match_ipv4_network(rule.get("match_ipaddress", "0.0.0.0/0"), event["ipaddress"]):
2402 if self._debug_rules:
2403 self._logger.info(
2404 " did not match because of wrong source IP address '%s' (need '%s')" %
2405 (event["ipaddress"], rule.get("match_ipaddress")))
2406 return False
2407 return True
2409 def event_rule_matches_facility(self, rule, event):
2410 if "match_facility" in rule and event["facility"] != rule["match_facility"]:
2411 if self._debug_rules:
2412 self._logger.info(" did not match because of wrong syslog facility")
2413 return False
2414 return True
2416 def event_rule_matches_service_level(self, rule, event):
2417 if "match_sl" in rule:
2418 sl_from, sl_to = rule["match_sl"]
2419 if sl_from > sl_to:
2420 sl_to, sl_from = sl_from, sl_to
2421 p = event.get("sl", 0)
2422 if p < sl_from or p > sl_to:
2423 if self._debug_rules:
2424 self._logger.info(
2425 " did not match because of wrong service level %d (need %d..%d)" %
2426 (p, sl_from, sl_to),)
2427 return False
2428 return True
2430 def event_rule_matches_timeperiod(self, rule, event):
2431 if "match_timeperiod" in rule and not self._time_periods.check(rule["match_timeperiod"]):
2432 if self._debug_rules:
2433 self._logger.info(" did not match, because timeperiod %s is not active" %
2434 rule["match_timeperiod"])
2435 return False
2436 return True
2438 def event_rule_determine_match_groups(self, rule, event, match_groups):
2439 match_group_functions = [
2440 self.event_rule_matches_syslog_application,
2441 self.event_rule_matches_message,
2442 ]
2443 for match_function in match_group_functions:
2444 if not match_function(rule, event, match_groups):
2445 return False
2446 return True
2448 def event_rule_matches_syslog_application(self, rule, event, match_groups):
2449 if "match_application" not in rule and "cancel_application" not in rule:
2450 return True
2452 # Syslog application
2453 if "match_application" in rule:
2454 match_groups["match_groups_syslog_application"] = match(
2455 rule.get("match_application"), event["application"], complete=False)
2457 # Syslog application canceling, this option must be explicitly set
2458 if "cancel_application" in rule:
2459 match_groups["match_groups_syslog_application_ok"] = match(
2460 rule.get("cancel_application"), event["application"], complete=False)
2462 # Detect impossible match
2463 if match_groups.get("match_groups_syslog_application", False) is False and\
2464 match_groups.get("match_groups_syslog_application_ok", False) is False:
2465 if self._debug_rules:
2466 self._logger.info(" did not match, syslog application does not match")
2467 return False
2469 return True
2471 def event_rule_matches_message(self, rule, event, match_groups):
2472 # Message matching, this condition is always active
2473 match_groups["match_groups_message"] = match(
2474 rule.get("match"), event["text"], complete=False)
2476 # Message canceling, this option must be explicitly set
2477 if "match_ok" in rule:
2478 match_groups["match_groups_message_ok"] = match(
2479 rule.get("match_ok"), event["text"], complete=False)
2481 # Detect impossible match
2482 if match_groups["match_groups_message"] is False and\
2483 match_groups.get("match_groups_message_ok", False) is False:
2484 if self._debug_rules:
2485 self._logger.info(" did not match, message text does not match")
2486 return False
2488 return True
2492 # .--Status Queries------------------------------------------------------.
2493 # | ____ _ _ ___ _ |
2494 # | / ___|| |_ __ _| |_ _ _ ___ / _ \ _ _ ___ _ __(_) ___ ___ |
2495 # | \___ \| __/ _` | __| | | / __| | | | | | | |/ _ \ '__| |/ _ \/ __| |
2496 # | ___) | || (_| | |_| |_| \__ \ | |_| | |_| | __/ | | | __/\__ \ |
2497 # | |____/ \__\__,_|\__|\__,_|___/ \__\_\\__,_|\___|_| |_|\___||___/ |
2498 # | |
2499 # +----------------------------------------------------------------------+
2500 # | Parsing and processing of status queries |
2501 # '----------------------------------------------------------------------'
2504 class Queries(object):
2505 def __init__(self, status_server, sock, logger):
2506 super(Queries, self).__init__()
2507 self._status_server = status_server
2508 self._socket = sock
2509 self._logger = logger
2510 self._buffer = ""
2512 def __iter__(self):
2513 return self
2515 def next(self):
2516 while True:
2517 parts = self._buffer.split("\n\n", 1)
2518 if len(parts) > 1:
2519 break
2520 data = self._socket.recv(4096)
2521 if not data:
2522 if len(self._buffer) == 0:
2523 raise StopIteration()
2524 parts = [self._buffer, ""]
2525 break
2526 self._buffer += data
2527 request, self._buffer = parts
2528 return Query.make(self._status_server, request.decode("utf-8").splitlines(), self._logger)
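# A hedged example of the wire format consumed above: each request is a block
# of header lines terminated by one empty line, e.g.
#   "GET events\nColumns: event_id event_text\nOutputFormat: json\n\n"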
2531 class Query(object):
2532 @staticmethod
2533 def make(status_server, raw_query, logger):
2534 parts = raw_query[0].split(None, 1)
2535 if len(parts) != 2:
2536 raise MKClientError("Invalid query. Need GET/COMMAND plus argument(s)")
2537 method = parts[0]
2538 if method == "GET":
2539 return QueryGET(status_server, raw_query, logger)
2540 if method == "REPLICATE":
2541 return QueryREPLICATE(status_server, raw_query, logger)
2542 if method == "COMMAND":
2543 return QueryCOMMAND(status_server, raw_query, logger)
2544 raise MKClientError("Invalid method %s (allowed are GET, REPLICATE, COMMAND)" % method)
2546 def __init__(self, status_server, raw_query, logger):
2547 super(Query, self).__init__()
2549 self._logger = logger
2550 self.output_format = "python"
2552 self._raw_query = raw_query
2553 self._from_raw_query(status_server)
2555 def _from_raw_query(self, status_server):
2556 self._parse_method_and_args()
2558 def _parse_method_and_args(self):
2559 parts = self._raw_query[0].split(None, 1)
2560 if len(parts) != 2:
2561 raise MKClientError("Invalid query. Need GET/COMMAND plus argument(s)")
2563 self.method, self.method_arg = parts
2565 def __repr__(self):
2566 return repr("\n".join(self._raw_query))
2569 class QueryGET(Query):
2570 _filter_operators = {
2571 "=": (lambda a, b: a == b),
2572 ">": (lambda a, b: a > b),
2573 "<": (lambda a, b: a < b),
2574 ">=": (lambda a, b: a >= b),
2575 "<=": (lambda a, b: a <= b),
2576 "~": (lambda a, b: cmk.utils.regex.regex(b).search(a)),
2577 "=~": (lambda a, b: a.lower() == b.lower()),
2578 "~~": (lambda a, b: cmk.utils.regex.regex(b.lower()).search(a.lower())),
2579 "in": (lambda a, b: a in b),
2582 def _from_raw_query(self, status_server):
2583 super(QueryGET, self)._from_raw_query(status_server)
2584 self._parse_table(status_server)
2585 self._parse_header_lines()
2587 def _parse_table(self, status_server):
2588 self.table_name = self.method_arg
2589 self.table = status_server.table(self.table_name)
2591 def _parse_header_lines(self):
2592 self.requested_columns = self.table.column_names # use all columns as default
2593 self.filters = []
2594 self.limit = None
2595 self.only_host = None
2597 self.header_lines = []
2598 for line in self._raw_query[1:]:
2599 try:
2600 header, argument = line.rstrip("\n").split(":", 1)
2601 argument = argument.lstrip(" ")
2603 if header == "OutputFormat":
2604 if argument not in ["python", "plain", "json"]:
2605 raise MKClientError(
2606 "Invalid output format \"%s\" (allowed are: python, plain, json)" %
2607 argument)
2609 self.output_format = argument
2611 elif header == "Columns":
2612 self.requested_columns = argument.split(" ")
2614 elif header == "Filter":
2615 column_name, operator_name, predicate, argument = self._parse_filter(argument)
2617 # Needed for later optimization (check_mkevents)
2618 if column_name == "event_host" and operator_name == 'in':
2619 self.only_host = set(argument)
2621 self.filters.append((column_name, operator_name, predicate, argument))
2623 elif header == "Limit":
2624 self.limit = int(argument)
2626 else:
2627 self._logger.info("Ignoring not-implemented header %s" % header)
2629 except Exception as e:
2630 raise MKClientError("Invalid header line '%s': %s" % (line.rstrip(), e))
2632 def _parse_filter(self, textspec):
2633 # Examples:
2634 # id = 17
2635 # name ~ This is some .* text
2636 # host_name =
2637 parts = textspec.split(None, 2)
2638 if len(parts) == 2:
2639 parts.append("")
2640 column, operator_name, argument = parts
2642 try:
2643 convert = self.table.column_types[column]
2644 except KeyError:
2645 raise MKClientError(
2646 "Unknown column: %s (Available are: %s)" % (column, self.table.column_names))
2648 # TODO: BUG: The query is decoded to unicode after receiving it from
2649 # the socket. The columns with type str (initialized with "") will apply
2650 # str(argument) here and convert the value back to str! This will crash
2651 # when the filter contains non-ASCII characters!
2652 # Fix this by making the default values unicode and skipping the unicode
2653 # conversion here (for performance reasons), because argument is already unicode.
2654 if operator_name == 'in':
2655 argument = map(convert, argument.split())
2656 else:
2657 argument = convert(argument)
2659 operator_function = self._filter_operators.get(operator_name)
2660 if not operator_function:
2661 raise MKClientError("Unknown filter operator '%s'" % operator_name)
2663 return (column, operator_name, lambda x: operator_function(x, argument), argument)
2665 def requested_column_indexes(self):
2666 indexes = []
2668 for column_name in self.requested_columns:
2669 try:
2670 column_index = self.table.column_indices[column_name]
2671 except KeyError:
2672 # The column is not known: Use None as index and None value later
2673 column_index = None
2674 indexes.append(column_index)
2676 return indexes
2678 def filter_row(self, row):
2679 for column_name, _operator_name, predicate, _argument in self.filters:
2680 if not predicate(row[self.table.column_indices[column_name]]):
2681 return None
2682 return row
2685 class QueryREPLICATE(Query):
2686 pass
2689 class QueryCOMMAND(Query):
2690 pass
2694 # .--Status Tables-------------------------------------------------------.
2695 # | ____ _ _ _____ _ _ |
2696 # | / ___|| |_ __ _| |_ _ _ ___ |_ _|_ _| |__ | | ___ ___ |
2697 # | \___ \| __/ _` | __| | | / __| | |/ _` | '_ \| |/ _ \/ __| |
2698 # | ___) | || (_| | |_| |_| \__ \ | | (_| | |_) | | __/\__ \ |
2699 # | |____/ \__\__,_|\__|\__,_|___/ |_|\__,_|_.__/|_|\___||___/ |
2700 # | |
2701 # +----------------------------------------------------------------------+
2702 # | Definitions of the tables available for status queries |
2703 # '----------------------------------------------------------------------'
2704 # If you need a new column here, then these are the places to change:
2705 # bin/mkeventd:
2706 # - add column to the end of StatusTableEvents.columns
2707 # - add column to grepping_filters if it is a str column
2708 # - deal with convert_history_line() (if not a str column)
2709 # - make sure that the new column is filled at *every* place where
2710 # an event is being created:
2711 # * _create_event_from_trap()
2712 # * create_event_from_line()
2713 # * _handle_absent_event()
2714 # * _create_overflow_event()
2715 # - When loading the status file add the possibly missing column to all
2716 # loaded events (load_status())
2717 # - Maybe add matching/rewriting for the new column
2718 # - write the actual code using the new column
2719 # web:
2720 # - Add column painter for the new column
2721 # - Create a sorter
2722 # - Create a filter
2723 # - Add painter and filter to all views where appropriate
2724 # - maybe add WATO code for matching rewriting
2725 # - do not forget event_rule_matches() in web!
2726 # - maybe add a field into the event simulator
2729 class StatusTable(object):
2730 prefix = None # type: Optional[str]
2731 columns = [] # type: List[Tuple[str, Any]]
2733 # Must return an enumerable type containing fully populated lists (rows) matching the
2734 # columns of the table
2735 @abc.abstractmethod
2736 def _enumerate(self, query):
2737 raise NotImplementedError()
2739 def __init__(self, logger):
2740 super(StatusTable, self).__init__()
2741 self._logger = logger.getChild("status_table.%s" % self.prefix)
2742 self._populate_column_views()
2744 def _populate_column_views(self):
2745 self.column_names = [c[0] for c in self.columns]
2746 self.column_defaults = dict(self.columns)
2748 self.column_types = {}
2749 for name, def_val in self.columns:
2750 self.column_types[name] = type(def_val)
2752 self.column_indices = dict([(name, index) for index, name in enumerate(self.column_names)])
2754 def query(self, query):
2755 requested_column_indexes = query.requested_column_indexes()
2757 # Output the column headers
2758 # TODO: Add support for ColumnHeaders like in livestatus?
2759 yield query.requested_columns
2761 num_rows = 0
2762 for row in self._enumerate(query):
2763 if query.limit is not None and num_rows >= query.limit:
2764 break # The maximum number of rows has been reached
2766 # Apply filters
2767 # TODO: History filtering is done in history load code. Check for improvements
2768 if query.filters and query.table_name != "history":
2769 matched = query.filter_row(row)
2770 if not matched:
2771 continue
2773 yield self._build_result_row(row, requested_column_indexes)
2774 num_rows += 1
2776 def _build_result_row(self, row, requested_column_indexes):
2777 result_row = []
2778 for index in requested_column_indexes:
2779 if index is None:
2780 result_row.append(None)
2781 else:
2782 result_row.append(row[index])
2783 return result_row
2786 class StatusTableEvents(StatusTable):
2787 prefix = "event"
2788 columns = [
2789 ("event_id", 1),
2790 ("event_count", 1),
2791 ("event_text", ""),
2792 ("event_first", 0.0),
2793 ("event_last", 0.0),
2794 ("event_comment", ""),
2795 ("event_sl", 0), # filter fehlt
2796 ("event_host", ""),
2797 ("event_contact", ""),
2798 ("event_application", ""),
2799 ("event_pid", 0),
2800 ("event_priority", 5),
2801 ("event_facility", 1),
2802 ("event_rule_id", ""),
2803 ("event_state", 0),
2804 ("event_phase", ""),
2805 ("event_owner", ""),
2806 ("event_match_groups", ""), # last column up to 1.2.4
2807 ("event_contact_groups", ""), # introduced in 1.2.5i2
2808 ("event_ipaddress", ""), # introduced in 1.2.7i1
2809 ("event_orig_host", ""), # introduced in 1.4.0b1
2810 ("event_contact_groups_precedence", "host"), # introduced in 1.4.0b1
2811 ("event_core_host", ""), # introduced in 1.5.0i1
2812 ("event_host_in_downtime", False), # introduced in 1.5.0i1
2813 ("event_match_groups_syslog_application", ""), # introduced in 1.5.0i2
2816 def __init__(self, logger, event_status):
2817 super(StatusTableEvents, self).__init__(logger)
2818 self._event_status = event_status
2820 def _enumerate(self, query):
2821 for event in self._event_status.get_events():
2822 # Optimize filters that are set by the check_mkevents active check. Since users
2823 # may have a lot of those checks running, it is a good idea to optimize this.
2824 if query.only_host and event["host"] not in query.only_host:
2825 continue
2827 row = []
2828 for column_name in self.column_names:
2829 try:
2830 row.append(event[column_name[6:]])
2831 except KeyError:
2832 # The row does not have this value. Use the column's default value
2833 row.append(self.column_defaults[column_name])
2835 yield row
2838 class StatusTableHistory(StatusTable):
2839 prefix = "history"
2840 columns = [
2841 ("history_line", 0), # Line number in event history file
2842 ("history_time", 0.0),
2843 ("history_what", ""),
2844 ("history_who", ""),
2845 ("history_addinfo", ""),
2846 ] + StatusTableEvents.columns
2848 def __init__(self, logger, history):
2849 super(StatusTableHistory, self).__init__(logger)
2850 self._history = history
2852 def _enumerate(self, query):
2853 return self._history.get(query)
2856 class StatusTableRules(StatusTable):
2857 prefix = "rule"
2858 columns = [
2859 ("rule_id", ""),
2860 ("rule_hits", 0),
2863 def __init__(self, logger, event_status):
2864 super(StatusTableRules, self).__init__(logger)
2865 self._event_status = event_status
2867 def _enumerate(self, query):
2868 return self._event_status.get_rule_stats()
2871 class StatusTableStatus(StatusTable):
2872 prefix = "status"
2873 columns = EventServer.status_columns()
2875 def __init__(self, logger, event_server):
2876 super(StatusTableStatus, self).__init__(logger)
2877 self._event_server = event_server
2879 def _enumerate(self, query):
2880 return self._event_server.get_status()
2884 # .--StatusServer--------------------------------------------------------.
2885 # | ____ _ _ ____ |
2886 # | / ___|| |_ __ _| |_ _ _ ___/ ___| ___ _ ____ _____ _ __ |
2887 # | \___ \| __/ _` | __| | | / __\___ \ / _ \ '__\ \ / / _ \ '__| |
2888 # | ___) | || (_| | |_| |_| \__ \___) | __/ | \ V / __/ | |
2889 # | |____/ \__\__,_|\__|\__,_|___/____/ \___|_| \_/ \___|_| |
2890 # | |
2891 # +----------------------------------------------------------------------+
2892 # | Answering status and command queries via the UNIX socket |
2893 # '----------------------------------------------------------------------'
2896 class StatusServer(ECServerThread):
2897 def __init__(self, logger, settings, config, slave_status, perfcounters, lock_configuration,
2898 history, event_status, event_server, terminate_main_event):
2899 super(StatusServer, self).__init__(
2900 name="StatusServer",
2901 logger=logger,
2902 settings=settings,
2903 config=config,
2904 slave_status=slave_status,
2905 profiling_enabled=settings.options.profile_status,
2906 profile_file=settings.paths.status_server_profile.value)
2907 self._socket = None
2908 self._tcp_socket = None
2909 self._reopen_sockets = False
2911 self._table_events = StatusTableEvents(logger, event_status)
2912 self._table_history = StatusTableHistory(logger, history)
2913 self._table_rules = StatusTableRules(logger, event_status)
2914 self._table_status = StatusTableStatus(logger, event_server)
2915 self._perfcounters = perfcounters
2916 self._lock_configuration = lock_configuration
2917 self._history = history
2918 self._event_status = event_status
2919 self._event_server = event_server
2920 self._event_columns = StatusTableEvents.columns
2921 self._terminate_main_event = terminate_main_event
2923 self.open_unix_socket()
2924 self.open_tcp_socket()
2926 def table(self, name):
2927 if name == "events":
2928 return self._table_events
2929 if name == "history":
2930 return self._table_history
2931 if name == "rules":
2932 return self._table_rules
2933 if name == "status":
2934 return self._table_status
2935 raise MKClientError(
2936 "Invalid table: %s (allowed are: events, history, rules, status)" % name)
2938 def open_unix_socket(self):
2939 path = self.settings.paths.unix_socket.value
2940 if path.exists():
2941 path.unlink()
2942 path.parent.mkdir(parents=True, exist_ok=True)
2943 self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
2944 self._socket.bind(str(path))
2945 # Make sure that socket is group writable
2946 path.chmod(0o664)
2947 self._socket.listen(self._config['socket_queue_len'])
2948 self._unix_socket_queue_len = self._config['socket_queue_len'] # detect changes in config
2950 def open_tcp_socket(self):
2951 if self._config["remote_status"]:
2952 try:
2953 self._tcp_port, self._tcp_allow_commands = self._config["remote_status"][:2]
2954 try:
2955 self._tcp_access_list = self._config["remote_status"][2]
2956 except Exception:
2957 self._tcp_access_list = None
2959 self._tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2960 self._tcp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
2961 self._tcp_socket.bind(("0.0.0.0", self._tcp_port))
2962 self._tcp_socket.listen(self._config['socket_queue_len'])
2963 self._logger.info(
2964 "Going to listen for status queries on TCP port %d" % self._tcp_port)
2965 except Exception as e:
2966 if self.settings.options.debug:
2967 raise
2968 self._logger.exception(
2969 "Cannot listen on TCP socket port %d: %s" % (self._tcp_port, e))
2970 else:
2971 self._tcp_socket = None
2972 self._tcp_port = 0
2973 self._tcp_allow_commands = False
2974 self._tcp_access_list = None
2976 def close_unix_socket(self):
2977 if self._socket:
2978 self._socket.close()
2979 self._socket = None
2981 def close_tcp_socket(self):
2982 if self._tcp_socket:
2983 self._tcp_socket.close()
2984 self._tcp_socket = None
2986 def reopen_sockets(self):
2987 if self._unix_socket_queue_len != self._config["socket_queue_len"]:
2988 self._logger.info("socket_queue_len has changed. Reopening UNIX socket.")
2989 self.close_unix_socket()
2990 self.open_unix_socket()
2992 self.close_tcp_socket()
2993 self.open_tcp_socket()
2995 def reload_configuration(self, config):
2996 self._config = config
2997 self._reopen_sockets = True
2999 def serve(self):
3000 while not self._terminate_event.is_set():
3001 try:
3002 client_socket = None
3003 addr_info = None
3005 if self._reopen_sockets:
3006 self.reopen_sockets()
3007 self._reopen_sockets = False
3009 listen_list = [self._socket]
3010 if self._tcp_socket:
3011 listen_list.append(self._tcp_socket)
3013 try:
3014 readable = select.select(listen_list, [], [], 0.2)[0]
3015 except select.error as e:
3016 if e[0] == errno.EINTR:
3017 continue
3018 raise
3020 for s in readable:
3021 client_socket, addr_info = s.accept()
3022 client_socket.settimeout(3)
3023 before = time.time()
3024 self._perfcounters.count("connects")
3025 if addr_info:
3026 allow_commands = self._tcp_allow_commands
3027 if self.settings.options.debug:
3028 self._logger.info("Handle status connection from %s:%d" % addr_info)
3029 if self._tcp_access_list is not None and addr_info[0] not in \
3030 self._tcp_access_list:
3031 client_socket.close()
3032 client_socket = None
3033 self._logger.info(
3034 "Denying access to status socket from %s (allowed is only %s)" %
3035 (addr_info[0], ", ".join(self._tcp_access_list)))
3036 continue
3037 else:
3038 allow_commands = True
3040 self.handle_client(client_socket, allow_commands, addr_info and addr_info[0] or
3041 "")
3043 duration = time.time() - before
3044 self._logger.verbose("Answered request in %0.2f ms" % (duration * 1000))
3045 self._perfcounters.count_time("request", duration)
3047 except Exception as e:
3048 msg = "Error handling client %s: %s" % (addr_info, e)
3049 # Do not log a stack trace for client errors, they are not *our* fault.
3050 if isinstance(e, MKClientError):
3051 self._logger.error(msg)
3052 else:
3053 self._logger.exception(msg)
3054 if client_socket:
3055 client_socket.close()
3056 client_socket = None
3057 time.sleep(0.2)
3058 client_socket = None # close without danger of exception
3060 def handle_client(self, client_socket, allow_commands, client_ip):
3061 for query in Queries(self, client_socket, self._logger):
3062 self._logger.verbose("Client livestatus query: %r" % query)
3064 with self._event_status.lock:
3065 if query.method == "GET":
3066 response = self.table(query.table_name).query(query)
3068 elif query.method == "REPLICATE":
3069 response = self.handle_replicate(query.method_arg, client_ip)
3071 elif query.method == "COMMAND":
3072 if not allow_commands:
3073 raise MKClientError("Sorry. Commands are disallowed via TCP")
3074 self.handle_command_request(query.method_arg)
3075 response = None
3077 else:
3078 raise NotImplementedError()
3080 try:
3081 self._answer_query(client_socket, query, response)
3082 except socket.error as e:
3083 if e.errno == 32: # Broken pipe -> ignore this
3084 pass
3085 else:
3086 raise
3088 client_socket.close()
3090 # Only GET queries have customizable output formats. REPLICATE is always
3091 # a dictionary and COMMAND is always None; both are always output as "python"
3092 def _answer_query(self, client_socket, query, response):
3093 if query.method != "GET":
3094 self._answer_query_python(client_socket, response)
3095 return
3097 if query.output_format == "plain":
3098 for row in response:
3099 client_socket.sendall("\t".join([cmk.ec.history.quote_tab(c) for c in row]) + "\n")
3101 elif query.output_format == "json":
3102 client_socket.sendall(json.dumps(list(response)) + "\n")
3104 elif query.output_format == "python":
3105 self._answer_query_python(client_socket, list(response))
3107 else:
3108 raise NotImplementedError()
3110 def _answer_query_python(self, client_socket, response):
3111 client_socket.sendall(repr(response) + "\n")
3113 # All commands are already locked with self._event_status.lock
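# A hedged example of the command format dispatched below (fields are
# semicolon-separated, the first one is the command name; values hypothetical):
#   "UPDATE;1234;admin;1;some comment;john doe"
# would acknowledge event 1234 and set its comment and contact.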
3114 def handle_command_request(self, commandline):
3115 self._logger.info("Executing command: %s" % commandline)
3116 parts = commandline.split(";")
3117 command = parts[0]
3118 replication_allow_command(self._config, command, self._slave_status)
3119 arguments = parts[1:]
3120 if command == "DELETE":
3121 self.handle_command_delete(arguments)
3122 elif command == "RELOAD":
3123 self.handle_command_reload()
3124 elif command == "SHUTDOWN":
3125 self._logger.info("Going to shut down")
3126 terminate(self._terminate_main_event, self._event_server, self)
3127 elif command == "REOPENLOG":
3128 self.handle_command_reopenlog()
3129 elif command == "FLUSH":
3130 self.handle_command_flush()
3131 elif command == "SYNC":
3132 self.handle_command_sync()
3133 elif command == "RESETCOUNTERS":
3134 self.handle_command_resetcounters(arguments)
3135 elif command == "UPDATE":
3136 self.handle_command_update(arguments)
3137 elif command == "CREATE":
3138 self.handle_command_create(arguments)
3139 elif command == "CHANGESTATE":
3140 self.handle_command_changestate(arguments)
3141 elif command == "ACTION":
3142 self.handle_command_action(arguments)
3143 elif command == "SWITCHMODE":
3144 self.handle_command_switchmode(arguments)
3145 else:
3146 raise MKClientError("Unknown command %s" % command)
3148 def handle_command_delete(self, arguments):
3149 if len(arguments) != 2:
3150 raise MKClientError("Wrong number of arguments for DELETE")
3151 event_id, user = arguments
3152 self._event_status.delete_event(int(event_id), user)
3154 def handle_command_update(self, arguments):
3155 event_id, user, acknowledged, comment, contact = arguments
3156 event = self._event_status.event(int(event_id))
3157 if not event:
3158 raise MKClientError("No event with id %s" % event_id)
3159 # Note the common practice: We validate parameters *before* doing any changes.
3160 if acknowledged:
3161 ack = int(acknowledged)
3162 if ack and event["phase"] not in ["open", "ack"]:
3163 raise MKClientError("You cannot acknowledge an event that is not open.")
3164 event["phase"] = "ack" if ack else "open"
3165 if comment:
3166 event["comment"] = comment
3167 if contact:
3168 event["contact"] = contact
3169 if user:
3170 event["owner"] = user
3171 self._history.add(event, "UPDATE", user)
3173 def handle_command_create(self, arguments):
3174 # Would rather use process_raw_line(), but we are already
3175 # holding self._event_status.lock and its sub functions are taking
3176 # self._event_status.lock too. The lock cannot be acquired twice.
3177 # TODO: Change the lock type in future?
3178 # process_raw_lines("%s" % ";".join(arguments))
3179 with file(str(self.settings.paths.event_pipe.value), "w") as pipe:
3180 pipe.write(("%s\n" % ";".join(arguments)).encode("utf-8"))
3182 def handle_command_changestate(self, arguments):
3183 event_id, user, newstate = arguments
3184 event = self._event_status.event(int(event_id))
3185 if not event:
3186 raise MKClientError("No event with id %s" % event_id)
3187 event["state"] = int(newstate)
3188 if user:
3189 event["owner"] = user
3190 self._history.add(event, "CHANGESTATE", user)
3192 def handle_command_reload(self):
3193 reload_configuration(self.settings, self._logger, self._lock_configuration, self._history,
3194 self._event_status, self._event_server, self, self._slave_status)
3196 def handle_command_reopenlog(self):
3197 self._logger.info("Closing this logfile")
3198 cmk.utils.log.open_log(str(self.settings.paths.log_file.value))
3199 self._logger.info("Opened new logfile")
3201 # Erase our current state and history!
3202 def handle_command_flush(self):
3203 self._history.flush()
3204 self._event_status.flush()
3205 self._event_status.save_status()
3206 if is_replication_slave(self._config):
3207 try:
3208 self.settings.paths.master_config_file.value.unlink()
3209 self.settings.paths.slave_status_file.value.unlink()
3210 update_slave_status(self._slave_status, self.settings, self._config)
3211 except Exception:
3212 pass
3213 self._logger.info("Flushed current status and historic events.")
3215 def handle_command_sync(self):
3216 self._event_status.save_status()
3218 def handle_command_resetcounters(self, arguments):
3219 if arguments:
3220 rule_id = arguments[0]
3221 self._logger.info("Resetting counters of rule " + rule_id)
3222 else:
3223 rule_id = None # Reset all rule counters
3224 self._logger.info("Resetting all rule counters")
3225 self._event_status.reset_counters(rule_id)
3227 def handle_command_action(self, arguments):
3228 event_id, user, action_id = arguments
3229 event = self._event_status.event(int(event_id))
3230 if user:
3231 event["owner"] = user
3233 if action_id == "@NOTIFY":
3234 cmk.ec.actions.do_notify(
3235 self._event_server, self._logger, event, user, is_cancelling=False)
3236 else:
3237 with self._lock_configuration:
3238 if action_id not in self._config["action"]:
3239 raise MKClientError(
3240 "The action '%s' is not defined. After adding new commands please "
3241 "make sure that you activate the changes in the Event Console." % action_id)
3242 action = self._config["action"][action_id]
3243 cmk.ec.actions.do_event_action(self._history, self.settings, self._config, self._logger,
3244 self._event_columns, action, event, user)
3246 def handle_command_switchmode(self, arguments):
3247 new_mode = arguments[0]
3248 if not is_replication_slave(self._config):
3249 raise MKClientError("Cannot switch replication mode: this is not a replication slave.")
3250 elif new_mode not in ["sync", "takeover"]:
3251 raise MKClientError(
3252 "Invalid target mode '%s': allowed are only 'sync' and 'takeover'" % new_mode)
3253 self._slave_status["mode"] = new_mode
3254 save_slave_status(self.settings, self._slave_status)
3255 self._logger.info("Switched replication mode to '%s' by external command." % new_mode)
3257 def handle_replicate(self, argument, client_ip):
3258 # Last time our slave got a config update
3259 try:
3260 last_update = int(argument)
3261 if self.settings.options.debug:
3262 self._logger.info("Replication: sync request from %s, last update %d seconds ago" %
3263 (client_ip, time.time() - last_update))
3265 except Exception:
3266 raise MKClientError("Invalid arguments to command REPLICATE")
3267 return replication_send(self._config, self._lock_configuration, self._event_status,
3268 last_update)
3272 # .--Dispatching---------------------------------------------------------.
3273 # | ____ _ _ _ _ |
3274 # | | _ \(_)___ _ __ __ _| |_ ___| |__ (_)_ __ __ _ |
3275 # | | | | | / __| '_ \ / _` | __/ __| '_ \| | '_ \ / _` | |
3276 # | | |_| | \__ \ |_) | (_| | || (__| | | | | | | | (_| | |
3277 # | |____/|_|___/ .__/ \__,_|\__\___|_| |_|_|_| |_|\__, | |
3278 # | |_| |___/ |
3279 # +----------------------------------------------------------------------+
3280 # | Starting and managing the two threads. |
3281 # '----------------------------------------------------------------------'
3284 def run_eventd(terminate_main_event, settings, config, lock_configuration, history, perfcounters,
3285 event_status, event_server, status_server, slave_status, logger):
3286 status_server.start()
3287 event_server.start()
3288 now = time.time()
3289 next_housekeeping = now + config["housekeeping_interval"]
3290 next_retention = now + config["retention_interval"]
3291 next_statistics = now + config["statistics_interval"]
3292 next_replication = 0 # force immediate replication after restart
3294 while not terminate_main_event.is_set():
3295 try:
3296 try:
3297 # Wait until one of housekeeping, retention or statistics is due, but at
3298 # most 60 seconds. That way a change of an interval from a very
3299 # high to a low value never takes more than 60 seconds to take effect.
3301 event_list = [next_housekeeping, next_retention, next_statistics]
3302 if is_replication_slave(config):
3303 event_list.append(next_replication)
3305 time_left = max(0, min(event_list) - time.time())
3306 time.sleep(min(time_left, 60))
3308 now = time.time()
3309 if now > next_housekeeping:
3310 event_server.do_housekeeping()
3311 next_housekeeping = now + config["housekeeping_interval"]
3313 if now > next_retention:
3314 with event_status.lock:
3315 event_status.save_status()
3316 next_retention = now + config["retention_interval"]
3318 if now > next_statistics:
3319 perfcounters.do_statistics()
3320 next_statistics = now + config["statistics_interval"]
3322 # Beware: replication might be turned on during this loop!
3323 if is_replication_slave(config) and now > next_replication:
3324 replication_pull(settings, config, lock_configuration, perfcounters,
3325 event_status, event_server, slave_status, logger)
3326 next_replication = now + config["replication"]["interval"]
3327 except MKSignalException:
3328 raise # re-raise unchanged (keeping the traceback) so the outer handler sees the signal
3329 except Exception as e:
3330 logger.exception("Exception in main thread:\n%s" % e)
3331 if settings.options.debug:
3332 raise
3333 time.sleep(1)
3334 except MKSignalException as e:
3335 if e._signum == signal.SIGHUP:
3336 logger.info("Received SIGHUP - going to reload configuration")
3337 reload_configuration(settings, logger, lock_configuration, history, event_status,
3338 event_server, status_server, slave_status)
3339 else:
3340 logger.info("Signalled to death by signal %d" % e._signum)
3341 terminate(terminate_main_event, event_server, status_server)
3343 # Now wait for termination of the server threads
3344 event_server.join()
3345 status_server.join()
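# The loop above is a simple "earliest deadline first, capped sleep" scheduler.
# A minimal self-contained sketch of the same pattern (illustration only, not
# used by the daemon; all names here are made up for the example):
#
#     import time
#
#     def run_scheduler(jobs, terminate_event):
#         # jobs: list of [next_due_timestamp, interval_in_seconds, callback]
#         while not terminate_event.is_set():
#             time_left = max(0, min(job[0] for job in jobs) - time.time())
#             time.sleep(min(time_left, 60))  # cap so interval changes apply within a minute
#             now = time.time()
#             for job in jobs:
#                 if now > job[0]:
#                     job[2]()
#                     job[0] = now + job[1]  # schedule the next run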
3349 # .--EventStatus---------------------------------------------------------.
3350 # | _____ _ ____ _ _ |
3351 # | | ____|_ _____ _ __ | |_/ ___|| |_ __ _| |_ _ _ ___ |
3352 # | | _| \ \ / / _ \ '_ \| __\___ \| __/ _` | __| | | / __| |
3353 # | | |___ \ V / __/ | | | |_ ___) | || (_| | |_| |_| \__ \ |
3354 # | |_____| \_/ \___|_| |_|\__|____/ \__\__,_|\__|\__,_|___/ |
3355 # | |
3356 # +----------------------------------------------------------------------+
3357 # | Holds the current event status. It protects itself against |
3358 # | concurrent access by the threads with a lock. |
3359 # '----------------------------------------------------------------------'
3362 class EventStatus(object):
3363 def __init__(self, settings, config, perfcounters, history, logger):
3364 self.settings = settings
3365 self._config = config
3366 self._perfcounters = perfcounters
3367 self.lock = threading.Lock()
3368 self._history = history
3369 self._logger = logger
3370 self.flush()
3372 def reload_configuration(self, config):
3373 self._config = config
3375 def flush(self):
3376 self._events = []
3377 self._next_event_id = 1
3378 self._rule_stats = {}
3379 self._interval_starts = {} # needed for expecting rules
3380 self._initialize_event_limit_status()
3382 # TODO: might introduce some performance counters, like:
3383 # - number of received messages
3384 # - number of rule hits
3385 # - number of rule misses
3387 def events(self):
3388 return self._events
3390 def event(self, eid):
3391 for event in self._events:
3392 if event["id"] == eid:
3393 return event
3395 # Return beginning of current expectation interval. For new rules
3396 # we start with the next interval in the future.
3397 def interval_start(self, rule_id, interval):
3398 if rule_id not in self._interval_starts:
3399 start = self.next_interval_start(interval, time.time())
3400 self._interval_starts[rule_id] = start
3401 return start
3402 else:
3403 start = self._interval_starts[rule_id]
3404 # Make sure that if the user switches from day to hour while we
3405 # are still waiting for the first interval to begin, we do not
3406 # wait until the next day.
3407 next_interval = self.next_interval_start(interval, time.time())
3408 if start > next_interval:
3409 start = next_interval
3410 self._interval_starts[rule_id] = start
3411 return start
3413 def next_interval_start(self, interval, previous_start):
3414 if isinstance(interval, tuple):
3415 length, offset = interval
3416 offset *= 3600
3417 else:
3418 length = interval
3419 offset = 0
3421 previous_start -= offset # take into account timezone offset
3422 full_parts = divmod(previous_start, length)[0]
3423 next_start = (full_parts + 1) * length
3424 next_start += offset
3425 return next_start
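# Worked example for next_interval_start() (doctest-style, illustration only,
# assuming "status" is the EventStatus instance): an interval is either a plain
# length in seconds or a (length, offset_in_hours) tuple.
#
#     >>> status.next_interval_start(3600, 7200.0)       # hourly, previous start 02:00 UTC
#     10800.0
#     >>> status.next_interval_start((86400, 2), 7200.0) # daily, starting at 02:00
#     93600.0
#
# For the tuple case: previous_start - offset = 0, one full length later is
# 86400, and adding the 7200s offset back yields 93600 (next day, 02:00).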
3427 def start_next_interval(self, rule_id, interval):
3428 current_start = self.interval_start(rule_id, interval)
3429 next_start = self.next_interval_start(interval, current_start)
3430 self._interval_starts[rule_id] = next_start
3431 self._logger.debug("Rule %s: next interval starts %s (i.e. now + %.2f sec)" %
3432 (rule_id, next_start, next_start - time.time()))
3434 def pack_status(self):
3435 return {
3436 "next_event_id": self._next_event_id,
3437 "events": self._events,
3438 "rule_stats": self._rule_stats,
3439 "interval_starts": self._interval_starts,
3442 def unpack_status(self, status):
3443 self._next_event_id = status["next_event_id"]
3444 self._events = status["events"]
3445 self._rule_stats = status["rule_stats"]
3446 self._interval_starts = status["interval_starts"]
3448 def save_status(self):
3449 now = time.time()
3450 status = self.pack_status()
3451 path = self.settings.paths.status_file.value
3452 path_new = path.parent / (path.name + '.new')
3453 # Believe it or not: cPickle is more than two times slower than repr()
3454 with path_new.open(mode='wb') as f:
3455 f.write(repr(status) + "\n")
3456 f.flush()
3457 os.fsync(f.fileno())
3458 path_new.rename(path)
3459 elapsed = time.time() - now
3460 self._logger.verbose("Saved event state to %s in %.3fms." % (path, elapsed * 1000))
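# save_status() uses the classic atomic-replace idiom: write everything to a
# temporary file next to the target, flush and fsync, then rename over the old
# file, so a reader (or a crash) never sees a half-written status file. A
# generic sketch of the pattern (illustration only; the helper name is made up):
#
#     import os
#
#     def atomic_write(path, data):  # path is a pathlib.Path
#         path_new = path.parent / (path.name + '.new')
#         with path_new.open(mode='wb') as f:
#             f.write(data)
#             f.flush()
#             os.fsync(f.fileno())   # ensure the data is on disk before the rename
#         path_new.rename(path)      # atomic on POSIX within one filesystem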
3462 def reset_counters(self, rule_id):
3463 if rule_id:
3464 if rule_id in self._rule_stats:
3465 del self._rule_stats[rule_id]
3466 else:
3467 self._rule_stats = {}
3468 self.save_status()
3470 def load_status(self, event_server):
3471 path = self.settings.paths.status_file.value
3472 if path.exists():
3473 try:
3474 status = ast.literal_eval(path.read_bytes())
3475 self._next_event_id = status["next_event_id"]
3476 self._events = status["events"]
3477 self._rule_stats = status["rule_stats"]
3478 self._interval_starts = status.get("interval_starts", {})
3479 self._initialize_event_limit_status()
3480 self._logger.info("Loaded event state from %s." % path)
3481 except Exception as e:
3482 self._logger.exception("Error loading event state from %s: %s" % (path, e))
3483 raise
3485 # Add new columns
3486 for event in self._events:
3487 event.setdefault("ipaddress", "")
3489 if "core_host" not in event:
3490 event_server.add_core_host_to_event(event)
3491 event["host_in_downtime"] = False
3493 # Called during Event Console initialization, after loading the status
3494 # file, to initialize the current event limit state: sets the internal
3495 # counters which are then updated during runtime.
3496 def _initialize_event_limit_status(self):
3497 self.num_existing_events = len(self._events)
3499 self.num_existing_events_by_host = {}
3500 self.num_existing_events_by_rule = {}
3501 for event in self._events:
3502 self._count_event_add(event)
3504 def _count_event_add(self, event):
3505 self.num_existing_events_by_host[event["host"]] = self.num_existing_events_by_host.get(event["host"], 0) + 1
3510 self.num_existing_events_by_rule[event["rule_id"]] = self.num_existing_events_by_rule.get(event["rule_id"], 0) + 1
3515 def _count_event_remove(self, event):
3516 self.num_existing_events -= 1
3517 self.num_existing_events_by_host[event["host"]] -= 1
3518 self.num_existing_events_by_rule[event["rule_id"]] -= 1
3520 def new_event(self, event):
3521 self._perfcounters.count("events")
3522 event["id"] = self._next_event_id
3523 self._next_event_id += 1
3524 self._events.append(event)
3525 self.num_existing_events += 1
3526 self._count_event_add(event)
3527 self._history.add(event, "NEW")
3529 def archive_event(self, event):
3530 self._perfcounters.count("events")
3531 event["id"] = self._next_event_id
3532 self._next_event_id += 1
3533 event["phase"] = "closed"
3534 self._history.add(event, "ARCHIVED")
3536 def remove_event(self, event):
3537 try:
3538 self._events.remove(event)
3539 self._count_event_remove(event)
3540 except ValueError:
3541 self._logger.exception("Cannot remove event %d: not present" % event["id"])
3543 # protected by self.lock
3544 def _remove_event_by_nr(self, index):
3545 event = self._events.pop(index)
3546 self._count_event_remove(event)
3548 # protected by self.lock
3549 def remove_oldest_event(self, ty, event):
3550 if ty == "overall":
3551 self._logger.verbose(" Removing oldest event")
3552 self._remove_event_by_nr(0)
3553 elif ty == "by_rule":
3554 self._logger.verbose(" Removing oldest event of rule \"%s\"" % event["rule_id"])
3555 self._remove_oldest_event_of_rule(event["rule_id"])
3556 elif ty == "by_host":
3557 self._logger.verbose(" Removing oldest event of host \"%s\"" % event["host"])
3558 self._remove_oldest_event_of_host(event["host"])
3560 # protected by self.lock
3561 def _remove_oldest_event_of_rule(self, rule_id):
3562 for event in self._events:
3563 if event["rule_id"] == rule_id:
3564 self.remove_event(event)
3565 return
3567 # protected by self.lock
3568 def _remove_oldest_event_of_host(self, hostname):
3569 for event in self._events:
3570 if event["host"] == hostname:
3571 self.remove_event(event)
3572 return
3574 # protected by self.lock
3575 def get_num_existing_events_by(self, ty, event):
3576 if ty == "overall":
3577 return self.num_existing_events
3578 elif ty == "by_rule":
3579 return self.num_existing_events_by_rule.get(event["rule_id"], 0)
3580 elif ty == "by_host":
3581 return self.num_existing_events_by_host.get(event["host"], 0)
3582 else:
3583 raise NotImplementedError()
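# These counters back the event limit handling: before accepting a new event,
# the server compares the current numbers against the configured limits on
# each level. A hedged sketch of such a check (illustration only; the limit
# lookup and overflow handler are assumptions, the real logic lives in the
# EventServer):
#
#     for ty in ("by_host", "by_rule", "overall"):
#         if event_status.get_num_existing_events_by(ty, event) >= limit_for(ty, event):
#             handle_overflow(ty, event)  # e.g. drop the event or remove the oldest one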
3585 # Cancel all events that belong to a certain rule id and are
3586 # of the same "breed" as the new event.
3587 def cancel_events(self, event_server, event_columns, new_event, match_groups, rule):
3588 with self.lock:
3589 to_delete = []
3590 for nr, event in enumerate(self._events):
3591 if event["rule_id"] == rule["id"]:
3592 if self.cancelling_match(match_groups, new_event, event, rule):
3593 # Fill a few fields of the cancelled event with data from
3594 # the cancelling event so that action scripts have useful
3595 # values and the logfile entry is more relevant.
3596 previous_phase = event["phase"]
3597 event["phase"] = "closed"
3598 # TODO: Why do we use OK below and not new_event["state"]???
3599 event["state"] = 0 # OK
3600 event["text"] = new_event["text"]
3601 # TODO: This is a hack and partial copy-n-paste from rewrite_events...
3602 if "set_text" in rule:
3603 event["text"] = replace_groups(rule["set_text"], event["text"],
3604 match_groups)
3605 event["time"] = new_event["time"]
3606 event["last"] = new_event["time"]
3607 event["priority"] = new_event["priority"]
3608 self._history.add(event, "CANCELLED")
3609 actions = rule.get("cancel_actions", [])
3610 if actions:
3611 if previous_phase != "open" \
3612 and rule.get("cancel_action_phases", "always") == "open":
3613 self._logger.info(
3614 "Do not execute cancelling actions, event %s's phase "
3615 "is not 'open' but '%s'" % (event["id"], previous_phase))
3616 else:
3617 cmk.ec.actions.do_event_actions(
3618 self._history,
3619 self.settings,
3620 self._config,
3621 self._logger,
3622 event_server,
3623 event_columns,
3624 actions,
3625 event,
3626 is_cancelling=True)
3628 to_delete.append(nr)
3630 for nr in to_delete[::-1]:
3631 self._remove_event_by_nr(nr)
3633 def cancelling_match(self, match_groups, new_event, event, rule):
3634 debug = self._config["debug_rules"]
3636 # The match_groups of the cancelling match only contain the *_ok match
3637 # groups. Since the rewrite definitions are based on the positive match,
3638 # we need to create the missing keys from them.
3639 for key in match_groups.keys():
3640 if key.endswith("_ok"):
3641 match_groups[key[:-3]] = match_groups[key]
3643 # Note: before we compare host and application we need to
3644 # apply the rewrite rules to the event. If the hostname was
3645 # rewritten by the previous (positive) match, it would no longer match here.
3646 host = new_event["host"]
3647 if "set_host" in rule:
3648 host = replace_groups(rule["set_host"], host, match_groups)
3650 if event["host"] != host:
3651 if debug:
3652 self._logger.info("Do not cancel event %d: host is not the same (%s != %s)" %
3653 (event["id"], event["host"], host))
3654 return False
3656 # The same for the application. But in case there is cancelling based on the application
3657 # configured in the rule, then don't check for different applications.
3658 if "cancel_application" not in rule:
3659 application = new_event["application"]
3660 if "set_application" in rule:
3661 application = replace_groups(rule["set_application"], application, match_groups)
3662 if event["application"] != application:
3663 if debug:
3664 self._logger.info(
3665 "Do not cancel event %d: application is not the same (%s != %s)" %
3666 (event["id"], event["application"], application))
3667 return False
3669 if event["facility"] != new_event["facility"]:
3670 if debug:
3671 self._logger.info(
3672 "Do not cancel event %d: syslog facility is not the same (%d != %d)" %
3673 (event["id"], event["facility"], new_event["facility"]))
3675 # Make sure that the matching groups are the same. If the OK match
3676 # has fewer groups, we do not care. If it has more groups, we do
3677 # not care either. We just compare the common "prefix".
3678 for nr, (prev_group, cur_group) in enumerate(
3679 zip(event["match_groups"], match_groups.get("match_groups_message_ok", ()))):
3680 if prev_group != cur_group:
3681 if debug:
3682 self._logger.info("Do not cancel event %d: match group number "
3683 "%d does not match (%s != %s)" % (event["id"], nr + 1,
3684 prev_group, cur_group))
3685 return False
3687 # Note: Duplicated code right above
3688 # Make sure that the syslog_application matching groups are the same.
3689 # If the OK match has fewer groups, we do not care. If it has more
3690 # groups, we do not care either. We just compare the common "prefix".
3691 for nr, (prev_group, cur_group) in enumerate(
3692 zip(
3693 event.get("match_groups_syslog_application", ()),
3694 match_groups.get("match_groups_syslog_application_ok", ()))):
3695 if prev_group != cur_group:
3696 if debug:
3697 self._logger.info(
3698 "Do not cancel event %d: syslog application match group number "
3699 "%d does not match (%s != %s)" % (event["id"], nr + 1, prev_group,
3700 cur_group))
3701 return False
3703 return True
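# Example of the "*_ok" aliasing done at the top of cancelling_match()
# (illustration only): a cancelling match yields groups such as
#
#     match_groups = {"match_groups_message_ok": ("eth0",)}
#
# and the loop adds the positive-match key so that set_host/set_application/
# set_text templates keep working:
#
#     match_groups["match_groups_message"] = ("eth0",)
#
# The group comparisons further down only compare the common prefix of the
# old and the new tuples, so extra or missing trailing groups never prevent
# cancellation on their own.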
3705 def count_rule_match(self, rule_id):
3706 with self.lock:
3707 self._rule_stats.setdefault(rule_id, 0)
3708 self._rule_stats[rule_id] += 1
3710 def count_event_up(self, found, event):
3711 # Update the event with new information from the new occurrence,
3712 # but preserve certain attributes from the original (first)
3713 # event.
3714 preserve = {
3715 "count": found.get("count", 1) + 1,
3716 "first": found["first"],
3718 # When the event is already open, do not change its
3719 # comment or contact information anymore
3720 if found["phase"] == "open":
3721 if "comment" in found:
3722 preserve["comment"] = found["comment"]
3723 if "contact" in found:
3724 preserve["contact"] = found["contact"]
3725 found.update(event)
3726 found.update(preserve)
3728 def count_expected_event(self, event_server, event):
3729 for ev in self._events:
3730 if ev["rule_id"] == event["rule_id"] and ev["phase"] == "counting":
3731 self.count_event_up(ev, event)
3732 return
3734 # None found, create one
3735 event["count"] = 1
3736 event["phase"] = "counting"
3737 event_server.new_event_respecting_limits(event)
3739 def count_event(self, event_server, event, rule, count):
3740 # Find the previous occurrence of this event and account for
3741 # one new occurrence. In case of a negated count (expecting rules)
3742 # we never modify events that are already in the state "open",
3743 # since the event was created precisely because the count was too
3744 # low in the specified period of time.
3745 for ev in self._events:
3746 if ev["rule_id"] == event["rule_id"]:
3747 if ev["phase"] == "ack" and not count["count_ack"]:
3748 continue # skip acknowledged events
3750 if count["separate_host"] and ev["host"] != event["host"]:
3751 continue # treat events with separated hosts separately
3753 if count["separate_application"] and ev["application"] != event["application"]:
3754 continue # same for application
3756 if count["separate_match_groups"] and ev["match_groups"] != event["match_groups"]:
3757 continue
3759 if (count.get("count_duration") is not None and
3760 ev["first"] + count["count_duration"] < event["time"]):
3761 # Counting has been discontinued for this event after a certain time
3762 continue
3764 if ev["host_in_downtime"] != event["host_in_downtime"]:
3765 continue # treat events with different downtime states separately
3767 found = ev
3768 self.count_event_up(found, event)
3769 break
3770 else:
3771 event["count"] = 1
3772 event["phase"] = "counting"
3773 event_server.new_event_respecting_limits(event)
3774 found = event
3776 # Did we just count the occurrence that pushed the count over the threshold?
3777 if found["phase"] == "counting" and found["count"] >= count["count"]:
3778 found["phase"] = "open"
3779 return found # do event action for this event
3780 return False # do not do event action
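# Example of the counting semantics (illustration only, abridged key set):
# with a rule count specification like
#
#     count = {"count": 3, "separate_host": True, "separate_application": False,
#              "separate_match_groups": False, "count_ack": False}
#
# three matching messages from the same host move one event from phase
# "counting" (count 1, 2) to phase "open" (count 3), at which point
# count_event() returns the event so its actions are executed. Messages from
# another host start a counting event of their own because of "separate_host".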
3782 # protected by self.lock
3783 def delete_event(self, event_id, user):
3784 for nr, event in enumerate(self._events):
3785 if event["id"] == event_id:
3786 event["phase"] = "closed"
3787 if user:
3788 event["owner"] = user
3789 self._history.add(event, "DELETE", user)
3790 self._remove_event_by_nr(nr)
3791 return
3792 raise MKClientError("No event with id %s" % event_id)
3794 def get_events(self):
3795 return self._events
3797 def get_rule_stats(self):
3798 return sorted(self._rule_stats.iteritems(), key=lambda x: x[0])
3802 # .--Replication---------------------------------------------------------.
3803 # | ____ _ _ _ _ |
3804 # | | _ \ ___ _ __ | (_) ___ __ _| |_(_) ___ _ __ |
3805 # | | |_) / _ \ '_ \| | |/ __/ _` | __| |/ _ \| '_ \ |
3806 # | | _ < __/ |_) | | | (_| (_| | |_| | (_) | | | | |
3807 # | |_| \_\___| .__/|_|_|\___\__,_|\__|_|\___/|_| |_| |
3808 # | |_| |
3809 # +----------------------------------------------------------------------+
3810 # | Functions for doing replication, master and slave parts. |
3811 # '----------------------------------------------------------------------'
3814 def is_replication_slave(config):
3815 repl_settings = config["replication"]
3816 return repl_settings and not repl_settings.get("disabled")
3819 def replication_allow_command(config, command, slave_status):
3820 if is_replication_slave(config) and slave_status["mode"] == "sync" \
3821 and command in ["DELETE", "UPDATE", "CHANGESTATE", "ACTION"]:
3822 raise MKClientError("This command is not allowed on a replication slave "
3823 "while it is in sync mode.")
3826 def replication_send(config, lock_configuration, event_status, last_update):
3827 response = {}
3828 with lock_configuration:
3829 response["status"] = event_status.pack_status()
3830 if last_update < config["last_reload"]:
3831 response["rules"] = config[
3832 "rules"] # Remove one bright day, where legacy rules are not needed anymore
3833 response["rule_packs"] = config["rule_packs"]
3834 response["actions"] = config["actions"]
3835 return response
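# Shape of the master's reply to REPLICATE (illustration only): the status
# part is always included, the configuration parts only when the slave's
# state is older than the master's last configuration reload:
#
#     {"status": {"next_event_id": 42, "events": [...],
#                 "rule_stats": {...}, "interval_starts": {...}},
#      "rules": [...],       # only if last_update < config["last_reload"]
#      "rule_packs": [...],  # ditto
#      "actions": [...]}     # ditto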
3838 def replication_pull(settings, config, lock_configuration, perfcounters, event_status, event_server,
3839 slave_status, logger):
3840 # We distinguish two modes:
3841 # 1. slave mode: just pull the current state from the master.
3842 # If the master is not reachable, decide whether to
3843 # switch to takeover mode.
3844 # 2. takeover mode: if automatic fallback is enabled and the
3845 # time frame for that has not yet elapsed, try to
3846 # pull the current state from the master. If that succeeds,
3847 # switch back to slave mode. If automatic fallback is not
3848 # enabled, simply do nothing.
3849 now = time.time()
3850 repl_settings = config["replication"]
3851 mode = slave_status["mode"]
3852 need_sync = mode == "sync" or (
3853 mode == "takeover" and "fallback" in repl_settings and
3854 (slave_status["last_master_down"] is None or
3855 now - repl_settings["fallback"] < slave_status["last_master_down"]))
3857 if need_sync:
3858 with event_status.lock:
3859 with lock_configuration:
3861 try:
3862 new_state = get_state_from_master(config, slave_status)
3863 replication_update_state(settings, config, event_status, event_server,
3864 new_state)
3865 if repl_settings.get("logging"):
3866 logger.info("Successfully synchronized with master")
3867 slave_status["last_sync"] = now
3868 slave_status["success"] = True
3870 # Fall back to slave mode after successful sync
3871 # (time frame has already been checked)
3872 if mode == "takeover":
3873 if slave_status["last_master_down"] is None:
3874 logger.info("Replication: master reachable for the first time, "
3875 "switching back to sync mode")
3876 slave_status["mode"] = "sync"
3877 else:
3878 logger.info("Replication: master reachable again after %d seconds, "
3879 "switching back to sync mode" %
3880 (now - slave_status["last_master_down"]))
3881 slave_status["mode"] = "sync"
3882 slave_status["last_master_down"] = None
3884 except Exception as e:
3885 logger.warning("Replication: cannot sync with master: %s" % e)
3886 slave_status["success"] = False
3887 if slave_status["last_master_down"] is None:
3888 slave_status["last_master_down"] = now
3890 # Takeover
3891 if "takeover" in repl_settings and mode != "takeover":
3892 if not slave_status["last_sync"]:
3893 if repl_settings.get("logging"):
3894 logger.error(
3895 "Replication: no takeover since master was never reached.")
3896 else:
3897 offline = now - slave_status["last_sync"]
3898 if offline < repl_settings["takeover"]:
3899 if repl_settings.get("logging"):
3900 logger.warning(
3901 "Replication: no takeover yet, still %d seconds to wait" %
3902 (repl_settings["takeover"] - offline))
3903 else:
3904 logger.info(
3905 "Replication: master not reached for %d seconds, taking over!" %
3906 offline)
3907 slave_status["mode"] = "takeover"
3909 save_slave_status(settings, slave_status)
3911 # Compute statistics of the average time needed for a sync
3912 perfcounters.count_time("sync", time.time() - now)
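# Summary of the replication state machine driven by replication_pull()
# (derived from the code above):
#
#     sync     --master down longer than repl_settings["takeover"]-->       takeover
#     takeover --master reachable again within repl_settings["fallback"]--> sync
#
# In takeover mode the slave acts on events itself and only keeps trying to
# sync while the configured fallback time frame has not elapsed;
# "last_master_down" remembers when the master first became unreachable.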
3915 def replication_update_state(settings, config, event_status, event_server, new_state):
3917 # Keep a copy of the master's rules and actions and also prepare for using them
3918 if "rules" in new_state:
3919 save_master_config(settings, new_state)
3920 event_server.compile_rules(new_state["rules"], new_state.get("rule_packs", []))
3921 config["actions"] = new_state["actions"]
3923 # Update to the master's event state
3924 event_status.unpack_status(new_state["status"])
3927 def save_master_config(settings, new_state):
3928 path = settings.paths.master_config_file.value
3929 path_new = path.parent / (path.name + '.new')
3930 path_new.write_bytes(
3931 repr({
3932 "rules": new_state["rules"],
3933 "rule_packs": new_state["rule_packs"],
3934 "actions": new_state["actions"],
3935 }) + "\n")
3936 path_new.rename(path)
3939 def load_master_config(settings, config, logger):
3940 path = settings.paths.master_config_file.value
3941 try:
3942 master_config = ast.literal_eval(path.read_bytes())
3943 config["rules"] = master_config["rules"]
3944 config["rule_packs"] = master_config.get("rule_packs", [])
3945 config["actions"] = master_config["actions"]
3946 logger.info("Replication: restored %d rule packs and %d actions from %s" % (len(
3947 config["rule_packs"]), len(config["actions"]), path))
3948 except Exception:
3949 if is_replication_slave(config):
3950 logger.error("Replication: no previously saved master state available")
3953 def get_state_from_master(config, slave_status):
3954 repl_settings = config["replication"]
3955 try:
3956 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
3957 sock.settimeout(repl_settings["connect_timeout"])
3958 sock.connect(repl_settings["master"])
3959 sock.sendall("REPLICATE %d\n" % (slave_status["last_sync"] or 0))
3961 sock.shutdown(socket.SHUT_WR)
3963 response_text = ""
3964 while True:
3965 chunk = sock.recv(8192)
3966 response_text += chunk
3967 if not chunk:
3968 break
3970 return ast.literal_eval(response_text)
3971 except SyntaxError as e:
3972 raise Exception("Invalid response from event daemon: <pre>%s</pre>" % response_text)
3974 except IOError as e:
3975 raise Exception("Master not responding: %s" % e)
3977 except Exception as e:
3978 raise Exception("Cannot connect to event daemon: %s" % e)
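# The wire protocol used above is intentionally simple (one round trip; the
# timestamp below is just an example):
#
#     slave -> master:  "REPLICATE 1526301300\n", then shutdown(SHUT_WR)
#     master -> slave:  repr() of the response dict, read until EOF and
#                       parsed with ast.literal_eval()
#
# Using repr()/ast.literal_eval() instead of pickle keeps the channel safe
# from code-executing payloads, at the price of supporting Python literals only.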
3981 def save_slave_status(settings, slave_status):
3982 settings.paths.slave_status_file.value.write_bytes(repr(slave_status) + "\n")
3985 def default_slave_status_master():
3986 return {
3987 "last_sync": 0,
3988 "last_master_down": None,
3989 "mode": "master",
3990 "average_sync_time": None,
3994 def default_slave_status_sync():
3995 return {
3996 "last_sync": 0,
3997 "last_master_down": None,
3998 "mode": "sync",
3999 "average_sync_time": None,
4003 def update_slave_status(slave_status, settings, config):
4004 path = settings.paths.slave_status_file.value
4005 if is_replication_slave(config):
4006 try:
4007 slave_status.update(ast.literal_eval(path.read_bytes()))
4008 except Exception:
4009 slave_status.update(default_slave_status_sync())
4010 save_slave_status(settings, slave_status)
4011 else:
4012 if path.exists():
4013 path.unlink()
4014 slave_status.update(default_slave_status_master())
4018 # .--Configuration-------------------------------------------------------.
4019 # | ____ __ _ _ _ |
4020 # | / ___|___ _ __ / _(_) __ _ _ _ _ __ __ _| |_(_) ___ _ __ |
4021 # | | | / _ \| '_ \| |_| |/ _` | | | | '__/ _` | __| |/ _ \| '_ \ |
4022 # | | |__| (_) | | | | _| | (_| | |_| | | | (_| | |_| | (_) | | | | |
4023 # | \____\___/|_| |_|_| |_|\__, |\__,_|_| \__,_|\__|_|\___/|_| |_| |
4024 # | |___/ |
4025 # +----------------------------------------------------------------------+
4026 # | Loading of the configuration files |
4027 # '----------------------------------------------------------------------'
4030 def load_configuration(settings, logger, slave_status):
4031 config = cmk.ec.export.load_config(settings)
4033 # If not set on the command line, take the log level from the configuration
4034 if settings.options.verbosity == 0:
4035 levels = config["log_level"]
4036 logger.setLevel(levels["cmk.mkeventd"])
4037 logger.getChild("EventServer").setLevel(levels["cmk.mkeventd.EventServer"])
4038 if "cmk.mkeventd.EventServer.snmp" in levels:
4039 logger.getChild("EventServer.snmp").setLevel(levels["cmk.mkeventd.EventServer.snmp"])
4040 logger.getChild("EventStatus").setLevel(levels["cmk.mkeventd.EventStatus"])
4041 logger.getChild("StatusServer").setLevel(levels["cmk.mkeventd.StatusServer"])
4042 logger.getChild("lock").setLevel(levels["cmk.mkeventd.lock"])
4044 # Are we a replication slave? Parts of the configuration
4045 # will be overridden by values from the master.
4046 update_slave_status(slave_status, settings, config)
4047 if is_replication_slave(config):
4048 logger.info("Replication: slave configuration, current mode: %s" % slave_status["mode"])
4049 load_master_config(settings, config, logger)
4051 # Create dictionary for actions for easy access
4052 config["action"] = {}
4053 for action in config["actions"]:
4054 config["action"][action["id"]] = action
4056 config["last_reload"] = time.time()
4058 return config
4061 def reload_configuration(settings, logger, lock_configuration, history, event_status, event_server,
4062 status_server, slave_status):
4063 with lock_configuration:
4064 config = load_configuration(settings, logger, slave_status)
4065 history.reload_configuration(config)
4066 event_server.reload_configuration(config)
4068 event_status.reload_configuration(config)
4069 status_server.reload_configuration(config)
4070 logger.info("Reloaded configuration.")
4074 # .--Main----------------------------------------------------------------.
4075 # | __ __ _ |
4076 # | | \/ | __ _(_)_ __ |
4077 # | | |\/| |/ _` | | '_ \ |
4078 # | | | | | (_| | | | | | |
4079 # | |_| |_|\__,_|_|_| |_| |
4080 # | |
4081 # +----------------------------------------------------------------------+
4082 # | Main entry and option parsing |
4083 # '----------------------------------------------------------------------'
4086 def main():
4087 os.unsetenv("LANG")
4088 logger = cmk.utils.log.get_logger("mkeventd")
4089 settings = cmk.ec.settings.settings(cmk.__version__, pathlib.Path(cmk.utils.paths.omd_root),
4090 pathlib.Path(cmk.utils.paths.default_config_dir), sys.argv)
4092 pid_path = None
4093 try:
4094 cmk.utils.log.open_log(sys.stderr)
4095 cmk.utils.log.set_verbosity(settings.options.verbosity)
4097 settings.paths.log_file.value.parent.mkdir(parents=True, exist_ok=True)
4098 if not settings.options.foreground:
4099 cmk.utils.log.open_log(str(settings.paths.log_file.value))
4101 logger.info("-" * 65)
4102 logger.info("mkeventd version %s starting" % cmk.__version__)
4104 slave_status = default_slave_status_master()
4105 config = load_configuration(settings, logger, slave_status)
4106 history = cmk.ec.history.History(settings, config, logger, StatusTableEvents.columns,
4107 StatusTableHistory.columns)
4109 pid_path = settings.paths.pid_file.value
4110 if pid_path.exists():
4111 old_pid = int(pid_path.read_text(encoding='utf-8'))
4112 if process_exists(old_pid):
4113 bail_out(
4114 logger, "Old PID file %s still existing and mkeventd still running with PID %d."
4115 % (pid_path, old_pid))
4116 pid_path.unlink()
4117 logger.info("Removed orphaned PID file %s (process %d not running anymore)." %
4118 (pid_path, old_pid))
4120 # Make sure paths exist
4121 settings.paths.event_pipe.value.parent.mkdir(parents=True, exist_ok=True)
4122 settings.paths.status_file.value.parent.mkdir(parents=True, exist_ok=True)
4124 # First do all things that might fail, before daemonizing
4125 perfcounters = Perfcounters(logger.getChild("lock.perfcounters"))
4126 event_status = EventStatus(settings, config, perfcounters, history,
4127 logger.getChild("EventStatus"))
4128 lock_configuration = ECLock(logger.getChild("lock.configuration"))
4129 event_server = EventServer(
4130 logger.getChild("EventServer"), settings, config, slave_status, perfcounters,
4131 lock_configuration, history, event_status, StatusTableEvents.columns)
4132 terminate_main_event = threading.Event()
4133 status_server = StatusServer(
4134 logger.getChild("StatusServer"), settings, config, slave_status, perfcounters,
4135 lock_configuration, history, event_status, event_server, terminate_main_event)
4137 event_status.load_status(event_server)
4138 event_server.compile_rules(config["rules"], config["rule_packs"])
4140 if not settings.options.foreground:
4141 pid_path.parent.mkdir(parents=True, exist_ok=True)
4142 cmk.utils.daemon.daemonize()
4143 logger.info("Daemonized with PID %d." % os.getpid())
4145 cmk.utils.daemon.lock_with_pid_file(str(pid_path))
4147 # Install signal handler
4148 def signal_handler(signum, stack_frame):
4149 logger.verbose("Got signal %d." % signum)
4150 raise MKSignalException(signum)
4152 signal.signal(signal.SIGHUP, signal_handler)
4153 signal.signal(signal.SIGINT, signal_handler)
4154 signal.signal(signal.SIGQUIT, signal_handler)
4155 signal.signal(signal.SIGTERM, signal_handler)
4157 # Now let's go...
4158 run_eventd(terminate_main_event, settings, config, lock_configuration, history,
4159 perfcounters, event_status, event_server, status_server, slave_status, logger)
4161 # We reach this point if the server has been terminated by
4162 # a signal or by hitting Ctrl-C (in foreground mode)
4164 # TODO: Move this cleanup stuff to the classes that are responsible for these resources
4166 # Remove the event pipe and drain it, to make sure that
4167 # processes (syslog, etc.) will not hang when trying
4168 # to write into the pipe.
4169 logger.verbose("Cleaning up event pipe")
4170 pipe = event_server.open_pipe() # Open it
4171 settings.paths.event_pipe.value.unlink() # Remove pipe
4172 drain_pipe(pipe) # Drain any data
4173 os.close(pipe) # Close pipe
4175 logger.verbose("Saving final event state")
4176 event_status.save_status()
4178 logger.verbose("Cleaning up sockets")
4179 settings.paths.unix_socket.value.unlink()
4180 settings.paths.event_socket.value.unlink()
4182 logger.verbose("Output hash stats")
4183 event_server.output_hash_stats()
4185 logger.verbose("Closing fds which might be still open")
4186 for fd in [
4187 settings.options.syslog_udp, settings.options.syslog_tcp,
4188 settings.options.snmptrap_udp
4189 ]:
4190 try:
4191 if isinstance(fd, cmk.ec.settings.FileDescriptor):
4192 os.close(fd.value)
4193 except Exception:
4194 pass
4196 logger.info("Successfully shut down.")
4197 sys.exit(0)
4199 except Exception:
4200 if settings.options.debug:
4201 raise
4202 bail_out(logger, traceback.format_exc())
4204 finally:
4205 if pid_path and cmk.utils.store.have_lock(str(pid_path)):
4206 try:
4207 pid_path.unlink()
4208 except OSError:
4209 pass
4212 if __name__ == "__main__":
4213 main()