2 # -*- encoding: utf-8; py-indent-offset: 4 -*-
3 # +------------------------------------------------------------------+
4 # | ____ _ _ __ __ _ __ |
5 # | / ___| |__ ___ ___| | __ | \/ | |/ / |
6 # | | | | '_ \ / _ \/ __| |/ / | |\/| | ' / |
7 # | | |___| | | | __/ (__| < | | | | . \ |
8 # | \____|_| |_|\___|\___|_|\_\___|_| |_|_|\_\ |
10 # | Copyright Mathias Kettner 2014 mk@mathias-kettner.de |
11 # +------------------------------------------------------------------+
13 # This file is part of Check_MK.
14 # The official homepage is at http://mathias-kettner.de/check_mk.
16 # check_mk is free software; you can redistribute it and/or modify it
17 # under the terms of the GNU General Public License as published by
18 # the Free Software Foundation in version 2. check_mk is distributed
19 # in the hope that it will be useful, but WITHOUT ANY WARRANTY; with-
20 # out even the implied warranty of MERCHANTABILITY or FITNESS FOR A
21 # PARTICULAR PURPOSE. See the GNU General Public License for more de-
22 # tails. You should have received a copy of the GNU General Public
23 # License along with GNU Make; see the file COPYING. If not, write
24 # to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
25 # Boston, MA 02110-1301 USA.
27 # TODO: Refactor/document locking. It is not clear when and how to apply
28 # locks or when they are held by which component.
30 # TODO: Refactor events to be handled as objects, e.g. in case when
31 # creating objects. Or at least update the documentation. It is not clear
32 # which fields are mandatory for the events.
48 from typing
import Any
, Dict
, List
, Optional
, Tuple
, Union
# pylint: disable=unused-import
50 import pathlib2
as pathlib
54 import cmk
.utils
.daemon
55 import cmk
.utils
.defines
59 import cmk
.ec
.settings
62 import cmk
.utils
.paths
63 import cmk
.utils
.profile
64 import cmk
.utils
.render
65 import cmk
.utils
.regex
66 import cmk
.utils
.debug
68 # suppress "Cannot find module" error from mypy
69 import livestatus
# type: ignore
class SyslogPriority(object):
    # Mapping from numeric syslog priority to its short name.
    # NOTE(review): dict body was not visible in this chunk; reconstructed from
    # the standard syslog severity table (RFC 3164) — confirm against upstream.
    NAMES = {
        0: "emerg",
        1: "alert",
        2: "crit",
        3: "err",
        4: "warning",
        5: "notice",
        6: "info",
        7: "debug",
    }

    def __init__(self, value):
        # value is coerced with int(), so numeric strings are accepted too
        super(SyslogPriority, self).__init__()
        self.value = int(value)

    def __repr__(self):
        return "SyslogPriority(%d)" % self.value

    def __str__(self):
        # Fall back to a descriptive placeholder for out-of-range priorities
        try:
            return self.NAMES[self.value]
        except KeyError:
            return "(unknown priority %d)" % self.value
class SyslogFacility(object):
    # Mapping from numeric syslog facility to its short name.
    # NOTE(review): only the "31: 'snmptrap'" entry was visible in this chunk;
    # the rest is reconstructed from the standard syslog facility table —
    # confirm against upstream.
    NAMES = {
        0: 'kern',
        1: 'user',
        2: 'mail',
        3: 'daemon',
        4: 'auth',
        5: 'syslog',
        6: 'lpr',
        7: 'news',
        8: 'uucp',
        9: 'cron',
        10: 'authpriv',
        11: 'ftp',
        12: 'ntp',
        13: 'logaudit',
        14: 'logalert',
        15: 'clock',
        16: 'local0',
        17: 'local1',
        18: 'local2',
        19: 'local3',
        20: 'local4',
        21: 'local5',
        22: 'local6',
        23: 'local7',
        31: 'snmptrap',  # HACK!
    }

    def __init__(self, value):
        # value is coerced with int(), so numeric strings are accepted too
        super(SyslogFacility, self).__init__()
        self.value = int(value)

    def __repr__(self):
        return "SyslogFacility(%d)" % self.value

    def __str__(self):
        # Fall back to a descriptive placeholder for out-of-range facilities
        try:
            return self.NAMES[self.value]
        except KeyError:
            return "(unknown facility %d)" % self.value
# Alas, we often have no clue about the actual encoding, so we have to guess:
# Initially we assume UTF-8, but fall back to latin-1 if it didn't work.
def decode_from_bytes(string_as_bytes):
    # This is just a safeguard if we are inadvertedly called with a Unicode
    # string. In theory this should never happen, but given the typing chaos in
    # this script, one never knows. In the Unicode case, Python tries to be
    # "helpful", but this fails miserably: Calling 'decode' on a Unicode string
    # implicitly converts it via 'encode("ascii")' to a byte string first, but
    # this can of course fail and doesn't make sense at all when we immediately
    # call 'decode' on this byte string again. In a nutshell: The implicit
    # conversions between str and unicode are a fundamentally broken idea, just
    # like all implicit things and "helpful" ideas in general. :-P For further
    # info see e.g. http://nedbatchelder.com/text/unipain.html
    if isinstance(string_as_bytes, unicode):
        return string_as_bytes

    # latin-1 can decode any byte sequence, so this never raises.
    try:
        return string_as_bytes.decode("utf-8")
    except Exception:
        return string_as_bytes.decode("latin-1")
def scrub_and_decode(s):
    # Strip unwanted control characters via the history scrubber, then decode
    # the result to unicode (UTF-8 with latin-1 fallback).
    return decode_from_bytes(cmk.ec.history.scrub_string(s))
168 # .--Helper functions----------------------------------------------------.
170 # | | | | | ___| |_ __ ___ _ __ ___ |
171 # | | |_| |/ _ \ | '_ \ / _ \ '__/ __| |
172 # | | _ | __/ | |_) | __/ | \__ \ |
173 # | |_| |_|\___|_| .__/ \___|_| |___/ |
175 # +----------------------------------------------------------------------+
176 # | Various helper functions |
177 # '----------------------------------------------------------------------'
class ECLock(object):
    """Thin wrapper around threading.Lock that logs every acquire/release.

    Useful for debugging lock contention between the EC threads; usable as a
    context manager like a plain Lock.
    """

    def __init__(self, logger):
        super(ECLock, self).__init__()
        self._logger = logger
        self._lock = threading.Lock()

    def acquire(self, blocking=True):
        # Mirrors threading.Lock.acquire(): returns True when the lock was
        # taken, False only for a failed non-blocking attempt.
        self._logger.debug("[%s] Trying to acquire lock", threading.current_thread().name)

        ret = self._lock.acquire(blocking)
        if ret:
            self._logger.debug("[%s] Acquired lock", threading.current_thread().name)
        else:
            self._logger.debug("[%s] Non-blocking aquire failed", threading.current_thread().name)

        return ret

    def release(self):
        self._logger.debug("[%s] Releasing lock", threading.current_thread().name)
        self._lock.release()

    def __enter__(self):
        self.acquire()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
        return False  # Do not swallow exceptions
209 class ECServerThread(threading
.Thread
):
212 raise NotImplementedError()
214 def __init__(self
, name
, logger
, settings
, config
, slave_status
, profiling_enabled
,
216 super(ECServerThread
, self
).__init
__(name
=name
)
217 self
.settings
= settings
218 self
._config
= config
219 self
._slave
_status
= slave_status
220 self
._profiling
_enabled
= profiling_enabled
221 self
._profile
_file
= profile_file
222 self
._terminate
_event
= threading
.Event()
223 self
._logger
= logger
226 self
._logger
.info("Starting up")
228 while not self
._terminate
_event
.is_set():
230 with cmk
.utils
.profile
.Profile(
231 enabled
=self
._profiling
_enabled
, profile_file
=str(self
._profile
_file
)):
234 self
._logger
.exception("Exception in %s server" % self
.name
)
235 if self
.settings
.options
.debug
:
239 self
._logger
.info("Terminated")
242 self
._terminate
_event
.set()
def terminate(terminate_main_event, event_server, status_server):
    # Initiate an orderly shutdown: flag the main loop first, then stop the
    # status server before the event server so no new client requests arrive
    # while event processing winds down.
    terminate_main_event.set()
    status_server.terminate()
    event_server.terminate()
251 def bail_out(logger
, reason
):
252 logger
.error("FATAL ERROR: %s" % reason
)
256 def process_exists(pid
):
264 def drain_pipe(pipe
):
267 readable
= select
.select([pipe
], [], [], 0.1)[0]
268 except select
.error
as e
:
269 if e
[0] == errno
.EINTR
:
276 data
= os
.read(pipe
, 4096)
280 break # Error while reading
282 break # No data available
def match(pattern, text, complete=True):
    """Performs an EC style matching test of pattern on text

    Returns False in case of no match or a tuple with the match groups.
    In case no match group is produced, it returns an empty tuple."""
    # A None pattern matches everything (no groups).
    if pattern is None:
        return ()
    elif isinstance(pattern, six.string_types):
        # Plain strings are matched case-insensitively; `pattern` is
        # presumably already lowercase (only `text` is lowered here).
        if complete:
            return () if pattern == text.lower() else False
        return () if pattern in text.lower() else False

    # Assume compiled regex
    m = pattern.search(text)
    if m is None:
        return False

    groups = m.groups()
    # Remove None from result tuples and replace it with empty strings
    return tuple([g if g is not None else '' for g in groups])
310 def format_pattern(pat
):
317 # Sorry: this code is dupliated in web/plugins/wato/mkeventd.py
def match_ipv4_network(pattern, ipaddress_text):
    """Return True if ipaddress_text lies within the network pattern.

    pattern is "a.b.c.d/bits" (or a plain address, treated as /32);
    ipaddress_text is a dotted-quad address. An unparsable address never
    matches; a /0 network matches everything, even an empty address.
    """
    network, network_bits = parse_ipv4_network(pattern)  # is validated by valuespec
    if network_bits == 0:
        return True  # event if ipaddress is empty
    try:
        ipaddress = parse_ipv4_address(ipaddress_text)
    except Exception:
        return False  # invalid address never matches

    # The first network_bits of network and ipaddress must be identical.
    # Build the mask with the top network_bits bits set directly, instead of
    # assembling it bit by bit in a 32-iteration loop.
    bitmask = ((1 << network_bits) - 1) << (32 - network_bits)

    return (network & bitmask) == (ipaddress & bitmask)
def parse_ipv4_address(text):
    """Parse a dotted-quad IPv4 address like "192.168.0.1" into its 32 bit
    integer value.

    Raises ValueError for non-numeric octets and IndexError when fewer than
    four octets are given.
    """
    # Materialize the octets as a list: indexing the lazy object returned by
    # map() only works on Python 2, and this file is being migrated to
    # Python 3 (see the typing/pathlib2 imports at the top).
    parts = [int(part) for part in text.split(".")]
    return (parts[0] << 24) + (parts[1] << 16) + (parts[2] << 8) + parts[3]
def parse_ipv4_network(text):
    # Returns a pair (network_as_int, mask_bits). A plain address without a
    # "/" suffix is treated as a host network, i.e. /32.
    if "/" not in text:
        return parse_ipv4_address(text), 32
    network_text, bits_text = text.split("/")
    return parse_ipv4_address(network_text), int(bits_text)
def replace_groups(text, origtext, match_groups):
    """Substitute match-group placeholders in an action/rewrite template.

    text        -- the template containing \\0, \\1 ... and $PREFIX_n$ markers
    origtext    -- the original message, substituted for \\0
    match_groups -- dict mapping group-kind names (e.g. "match_groups_message")
                    to tuples of matched strings
    Returns the template with all placeholders replaced.
    """
    # replace \0 with text itself. This allows to add information
    # in front or and the end of a message
    text = text.replace("\\0", origtext)

    # Generic replacement with \1, \2, ...
    match_groups_message = match_groups.get("match_groups_message")
    if isinstance(match_groups_message, tuple):
        for nr, g in enumerate(match_groups_message):
            text = text.replace("\\%d" % (nr + 1), g)

    # Replacement with keyword, e.g.:
    # $MATCH_GROUPS_MESSAGE_x$
    # $MATCH_GROUPS_SYSLOG_APPLICATION_x$
    # Use items() instead of the Python-2-only iteritems() so this function
    # also works after the Python 3 migration (behavior-identical on 2.7).
    for key_prefix, values in match_groups.items():
        if not isinstance(values, tuple):
            continue

        for idx, match_value in enumerate(values):
            text = text.replace("$%s_%d$" % (key_prefix.upper(), idx + 1), match_value)

    return text
class MKSignalException(Exception):
    """Raised from signal handlers to unwind the main loop when a signal
    arrives; remembers which signal was received."""

    def __init__(self, signum):
        super(MKSignalException, self).__init__("Got signal %d" % signum)
        self._signum = signum
class MKClientError(Exception):
    """Error whose message is reported back verbatim to the requesting
    client instead of crashing the server."""

    def __init__(self, t):
        super(MKClientError, self).__init__(t)
391 # .--Timeperiods---------------------------------------------------------.
393 # | |_ _(_)_ __ ___ ___ _ __ ___ _ __(_) ___ __| |___ |
394 # | | | | | '_ ` _ \ / _ \ '_ \ / _ \ '__| |/ _ \ / _` / __| |
395 # | | | | | | | | | | __/ |_) | __/ | | | (_) | (_| \__ \ |
396 # | |_| |_|_| |_| |_|\___| .__/ \___|_| |_|\___/ \__,_|___/ |
398 # +----------------------------------------------------------------------+
399 # | Timeperiods are used in rule conditions |
400 # '----------------------------------------------------------------------'
class TimePeriods(object):
    """Caches the active/inactive state of the monitoring core's timeperiods.

    The state is fetched from the core via livestatus and refreshed at most
    once per minute. On any failure the stale cache is kept and the check
    degrades to "assume active".
    """

    def __init__(self, logger):
        super(TimePeriods, self).__init__()
        self._logger = logger
        self._periods = None  # name -> (alias, currently active?)
        self._last_update = 0  # minute timestamp of last successful refresh

    def _update(self):
        # NOTE: integer division — refreshes at most once per wall-clock minute
        if self._periods is not None and int(time.time()) / 60 == self._last_update:
            return  # only update once a minute
        try:
            table = livestatus.LocalConnection().query("GET timeperiods\nColumns: name alias in")
            periods = {}
            for tpname, alias, isin in table:
                periods[tpname] = (alias, bool(isin))
            self._periods = periods
            self._last_update = int(time.time()) / 60
        except Exception as e:
            # Best effort: keep whatever cache we have and retry next call
            self._logger.exception("Cannot update timeperiod information: %s" % e)

    def check(self, tpname):
        # Returns True when timeperiod tpname is (assumed to be) active.
        # Unknown periods and missing information default to "active" so that
        # rules never silently stop matching due to a livestatus hiccup.
        self._update()
        if not self._periods:
            self._logger.warning("no timeperiod information, assuming %s is active" % tpname)
            return True
        if tpname not in self._periods:
            self._logger.warning("no such timeperiod %s, assuming it is active" % tpname)
            return True
        return self._periods[tpname][1]
436 # .--Host config---------------------------------------------------------.
438 # | | | | | ___ ___| |_ ___ ___ _ __ / _(_) __ _ |
439 # | | |_| |/ _ \/ __| __| / __/ _ \| '_ \| |_| |/ _` | |
440 # | | _ | (_) \__ \ |_ | (_| (_) | | | | _| | (_| | |
441 # | |_| |_|\___/|___/\__| \___\___/|_| |_|_| |_|\__, | |
443 # +----------------------------------------------------------------------+
444 # | Manages the configuration of the hosts of the local monitoring core. |
445 # | It fetches and caches the information during runtine of the EC. |
446 # '----------------------------------------------------------------------'
449 class HostConfig(object):
450 def __init__(self
, logger
):
451 self
._logger
= logger
454 def initialize(self
):
455 self
._logger
.debug("Initializing host config")
456 self
._event
_host
_to
_host
= {}
458 self
._hosts
_by
_name
= {}
459 self
._hosts
_by
_lower
_name
= {}
460 self
._hosts
_by
_lower
_alias
= {}
461 self
._hosts
_by
_lower
_address
= {}
463 self
._got
_config
_from
_core
= False
465 def get(self
, host_name
, deflt
=None):
466 return self
._hosts
_by
_name
.get(host_name
, deflt
)
468 def get_by_event_host_name(self
, event_host_name
, deflt
=None):
470 self
._update
_from
_core
()
472 self
._logger
.exception("Failed to get host info from core. Try again later.")
476 return self
._event
_host
_to
_host
[event_host_name
]
478 pass # Not cached yet
480 # Note: It is important that we use exactly the same algorithm here as in the core
481 # (enterprise/core/src/World.cc getHostByDesignation)
483 # Host name : Case insensitive equality (host_name =~ %s)
484 # Host alias : Case insensitive equality (host_alias =~ %s)
485 # Host address : Case insensitive equality (host_address =~ %s)
486 low_event_host_name
= event_host_name
.lower()
490 self
._hosts
_by
_lower
_name
, self
._hosts
_by
_lower
_address
, self
._hosts
_by
_lower
_alias
493 host
= search_map
[low_event_host_name
]
498 self
._event
_host
_to
_host
[event_host_name
] = host
501 def _update_from_core(self
):
502 if not self
._has
_core
_config
_reloaded
():
506 self
._logger
.debug("Fetching host config from core")
517 query
= "GET hosts\nColumns: %s" % " ".join(columns
)
518 for host
in livestatus
.LocalConnection().query_table_assoc(query
):
519 self
._hosts
_by
_name
[host
["name"]] = host
521 # Lookup maps to improve performance of host searches
522 self
._hosts
_by
_lower
_name
[host
["name"].lower()] = host
523 self
._hosts
_by
_lower
_alias
[host
["alias"].lower()] = host
524 self
._hosts
_by
_lower
_address
[host
["alias"].lower()] = host
526 self
._logger
.debug("Got %d hosts from core" % len(self
._hosts
_by
_name
))
527 self
._got
_config
_from
_core
= self
._get
_core
_start
_time
()
529 def _has_core_config_reloaded(self
):
530 if not self
._got
_config
_from
_core
:
533 if self
._get
_core
_start
_time
() > self
._got
_config
_from
_core
:
538 def _get_core_start_time(self
):
539 query
= ("GET status\n" "Columns: program_start\n")
540 return livestatus
.LocalConnection().query_value(query
)
544 # .--Perfcounters--------------------------------------------------------.
546 # | | _ \ ___ _ __ / _| ___ ___ _ _ _ __ | |_ ___ _ __ ___ |
547 # | | |_) / _ \ '__| |_ / __/ _ \| | | | '_ \| __/ _ \ '__/ __| |
548 # | | __/ __/ | | _| (_| (_) | |_| | | | | || __/ | \__ \ |
549 # | |_| \___|_| |_| \___\___/ \__,_|_| |_|\__\___|_| |___/ |
551 # +----------------------------------------------------------------------+
552 # | Helper class for performance counting |
553 # '----------------------------------------------------------------------'
557 """Linear interpolation between a and b with weight t"""
558 return (1 - t
) * a
+ t
* b
561 class Perfcounters(object):
572 # Average processing times
574 "processing": 0.99, # event processing
575 "sync": 0.95, # Replication sync
576 "request": 0.95, # Client requests
579 # TODO: Why aren't self._times / self._rates / ... not initialized with their defaults?
580 def __init__(self
, logger
):
581 super(Perfcounters
, self
).__init
__()
582 self
._lock
= ECLock(logger
)
584 # Initialize counters
585 self
._counters
= dict([(n
, 0) for n
in self
._counter
_names
])
587 self
._old
_counters
= {}
589 self
._average
_rates
= {}
591 self
._last
_statistics
= None
593 self
._logger
= logger
.getChild("Perfcounters")
595 def count(self
, counter
):
597 self
._counters
[counter
] += 1
599 def count_time(self
, counter
, ptime
):
601 if counter
in self
._times
:
602 self
._times
[counter
] = lerp(ptime
, self
._times
[counter
], self
._weights
[counter
])
604 self
._times
[counter
] = ptime
606 def do_statistics(self
):
609 if self
._last
_statistics
:
610 duration
= now
- self
._last
_statistics
613 for name
, value
in self
._counters
.iteritems():
615 delta
= value
- self
._old
_counters
[name
]
616 rate
= delta
/ duration
617 self
._rates
[name
] = rate
618 if name
in self
._average
_rates
:
619 # We could make the weight configurable
620 self
._average
_rates
[name
] = lerp(rate
, self
._average
_rates
[name
], 0.9)
622 self
._average
_rates
[name
] = rate
624 self
._last
_statistics
= now
625 self
._old
_counters
= self
._counters
.copy()
628 def status_columns(cls
):
630 # Please note: status_columns() and get_status() need to produce lists with exact same column order
631 for name
in cls
._counter
_names
:
632 columns
.append(("status_" + name
, 0))
633 columns
.append(("status_" + name
.rstrip("s") + "_rate", 0.0))
634 columns
.append(("status_average_" + name
.rstrip("s") + "_rate", 0.0))
636 for name
in cls
._weights
:
637 columns
.append(("status_average_%s_time" % name
, 0.0))
641 def get_status(self
):
644 # Please note: status_columns() and get_status() need to produce lists with exact same column order
645 for name
in self
._counter
_names
:
646 row
.append(self
._counters
[name
])
647 row
.append(self
._rates
.get(name
, 0.0))
648 row
.append(self
._average
_rates
.get(name
, 0.0))
650 for name
in self
._weights
:
651 row
.append(self
._times
.get(name
, 0.0))
657 # .--EventServer---------------------------------------------------------.
659 # | | ____|_ _____ _ __ | |_/ ___| ___ _ ____ _____ _ __ |
660 # | | _| \ \ / / _ \ '_ \| __\___ \ / _ \ '__\ \ / / _ \ '__| |
661 # | | |___ \ V / __/ | | | |_ ___) | __/ | \ V / __/ | |
662 # | |_____| \_/ \___|_| |_|\__|____/ \___|_| \_/ \___|_| |
664 # +----------------------------------------------------------------------+
665 # | Verarbeitung und Klassifizierung von eingehenden Events. |
666 # '----------------------------------------------------------------------'
669 class EventServer(ECServerThread
):
685 def __init__(self
, logger
, settings
, config
, slave_status
, perfcounters
, lock_configuration
,
686 history
, event_status
, event_columns
):
687 super(EventServer
, self
).__init
__(
692 slave_status
=slave_status
,
693 profiling_enabled
=settings
.options
.profile_event
,
694 profile_file
=settings
.paths
.event_server_profile
.value
)
696 self
._syslog
_tcp
= None
697 self
._snmptrap
= None
700 self
._hash
_stats
= []
701 for _unused_facility
in xrange(32):
702 self
._hash
_stats
.append([0] * 8)
704 self
.host_config
= HostConfig(self
._logger
)
705 self
._perfcounters
= perfcounters
706 self
._lock
_configuration
= lock_configuration
707 self
._history
= history
708 self
._event
_status
= event_status
709 self
._event
_columns
= event_columns
710 self
._message
_period
= cmk
.ec
.history
.ActiveHistoryPeriod()
711 self
._rule
_matcher
= RuleMatcher(self
._logger
, config
)
712 self
._event
_creator
= EventCreator(self
._logger
, config
)
715 self
.open_eventsocket()
717 self
.open_syslog_tcp()
719 self
._snmp
_trap
_engine
= cmk
.ec
.snmp
.SNMPTrapEngine(self
.settings
, self
._config
,
720 self
._logger
.getChild("snmp"),
721 self
.handle_snmptrap
)
724 def status_columns(cls
):
725 columns
= cls
._general
_columns
()
726 columns
+= Perfcounters
.status_columns()
727 columns
+= cls
._replication
_columns
()
728 columns
+= cls
._event
_limit
_columns
()
732 def _general_columns(cls
):
734 ("status_config_load_time", 0),
735 ("status_num_open_events", 0),
736 ("status_virtual_memory_size", 0),
740 def _replication_columns(cls
):
742 ("status_replication_slavemode", ""),
743 ("status_replication_last_sync", 0.0),
744 ("status_replication_success", False),
748 def _event_limit_columns(cls
):
750 ("status_event_limit_host", 0),
751 ("status_event_limit_rule", 0),
752 ("status_event_limit_overall", 0),
753 ("status_event_limit_active_hosts", []),
754 ("status_event_limit_active_rules", []),
755 ("status_event_limit_active_overall", False),
758 def get_status(self
):
761 row
+= self
._add
_general
_status
()
762 row
+= self
._perfcounters
.get_status()
763 row
+= self
._add
_replication
_status
()
764 row
+= self
._add
_event
_limit
_status
()
768 def _add_general_status(self
):
770 self
._config
["last_reload"],
771 self
._event
_status
.num_existing_events
,
772 self
._virtual
_memory
_size
(),
775 def _virtual_memory_size(self
):
776 parts
= file('/proc/self/stat').read().split()
777 return int(parts
[22]) # in Bytes
779 def _add_replication_status(self
):
780 if is_replication_slave(self
._config
):
782 self
._slave
_status
["mode"],
783 self
._slave
_status
["last_sync"],
784 self
._slave
_status
["success"],
786 return ["master", 0.0, False]
788 def _add_event_limit_status(self
):
790 self
._config
["event_limit"]["by_host"]["limit"],
791 self
._config
["event_limit"]["by_rule"]["limit"],
792 self
._config
["event_limit"]["overall"]["limit"],
793 self
.get_hosts_with_active_event_limit(),
794 self
.get_rules_with_active_event_limit(),
795 self
.is_overall_event_limit_active(),
798 def create_pipe(self
):
799 path
= self
.settings
.paths
.event_pipe
.value
801 if not path
.is_fifo():
806 if not path
.exists():
809 # We want to be able to receive events from all users on the local system
810 path
.chmod(0o666) # nosec
812 self
._logger
.info("Created FIFO '%s' for receiving events" % path
)
814 def open_syslog(self
):
815 endpoint
= self
.settings
.options
.syslog_udp
817 if isinstance(endpoint
, cmk
.ec
.settings
.FileDescriptor
):
818 self
._syslog
= socket
.fromfd(endpoint
.value
, socket
.AF_INET
, socket
.SOCK_DGRAM
)
819 os
.close(endpoint
.value
)
821 "Opened builtin syslog server on inherited filedescriptor %d" % endpoint
.value
)
822 if isinstance(endpoint
, cmk
.ec
.settings
.PortNumber
):
823 self
._syslog
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
)
824 self
._syslog
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
825 self
._syslog
.bind(("0.0.0.0", endpoint
.value
))
826 self
._logger
.info("Opened builtin syslog server on UDP port %d" % endpoint
.value
)
827 except Exception as e
:
828 raise Exception("Cannot start builtin syslog server: %s" % e
)
830 def open_syslog_tcp(self
):
831 endpoint
= self
.settings
.options
.syslog_tcp
833 if isinstance(endpoint
, cmk
.ec
.settings
.FileDescriptor
):
834 self
._syslog
_tcp
= socket
.fromfd(endpoint
.value
, socket
.AF_INET
, socket
.SOCK_STREAM
)
835 self
._syslog
_tcp
.listen(20)
836 os
.close(endpoint
.value
)
837 self
._logger
.info("Opened builtin syslog-tcp server on inherited filedescriptor %d"
839 if isinstance(endpoint
, cmk
.ec
.settings
.PortNumber
):
840 self
._syslog
_tcp
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
841 self
._syslog
_tcp
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
842 self
._syslog
_tcp
.bind(("0.0.0.0", endpoint
.value
))
843 self
._syslog
_tcp
.listen(20)
845 "Opened builtin syslog-tcp server on TCP port %d" % endpoint
.value
)
846 except Exception as e
:
847 raise Exception("Cannot start builtin syslog-tcp server: %s" % e
)
849 def open_snmptrap(self
):
850 endpoint
= self
.settings
.options
.snmptrap_udp
852 if isinstance(endpoint
, cmk
.ec
.settings
.FileDescriptor
):
853 self
._snmptrap
= socket
.fromfd(endpoint
.value
, socket
.AF_INET
, socket
.SOCK_DGRAM
)
854 os
.close(endpoint
.value
)
855 self
._logger
.info("Opened builtin snmptrap server on inherited filedescriptor %d" %
857 if isinstance(endpoint
, cmk
.ec
.settings
.PortNumber
):
858 self
._snmptrap
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
)
859 self
._snmptrap
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
860 self
._snmptrap
.bind(("0.0.0.0", endpoint
.value
))
861 self
._logger
.info("Opened builtin snmptrap server on UDP port %d" % endpoint
.value
)
862 except Exception as e
:
863 raise Exception("Cannot start builtin snmptrap server: %s" % e
)
865 def open_eventsocket(self
):
866 path
= self
.settings
.paths
.event_socket
.value
869 path
.parent
.mkdir(parents
=True, exist_ok
=True)
870 self
._eventsocket
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_STREAM
)
871 self
._eventsocket
.bind(str(path
))
873 self
._eventsocket
.listen(self
._config
['eventsocket_queue_len'])
874 self
._logger
.info("Opened UNIX socket '%s' for receiving events" % path
)
877 # Beware: we must open the pipe also for writing. Otherwise
878 # we will see EOF forever after one writer has finished and
879 # select() will trigger even if there is no data. A good article
880 # about this is here:
881 # http://www.outflux.net/blog/archives/2008/03/09/using-select-on-a-fifo/
882 return os
.open(str(self
.settings
.paths
.event_pipe
.value
), os
.O_RDWR | os
.O_NONBLOCK
)
884 def handle_snmptrap(self
, trap
, ipaddress
):
885 self
.process_event(self
._event
_creator
.create_event_from_trap(trap
, ipaddress
))
889 pipe
= self
.open_pipe()
892 # Wait for incoming syslog packets via UDP
893 if self
._syslog
is not None:
894 listen_list
.append(self
._syslog
.fileno())
896 # Wait for new connections for events via TCP socket
897 if self
._syslog
_tcp
is not None:
898 listen_list
.append(self
._syslog
_tcp
)
900 # Wait for new connections for events via unix socket
901 if self
._eventsocket
:
902 listen_list
.append(self
._eventsocket
)
904 # Wait for incomding SNMP traps
905 if self
._snmptrap
is not None:
906 listen_list
.append(self
._snmptrap
.fileno())
908 # Keep list of client connections via UNIX socket and
909 # read data that is not yet processed. Map from
910 # fd to (fileobject, data)
913 while not self
._terminate
_event
.is_set():
915 readable
= select
.select(listen_list
+ client_sockets
.keys(), [], [],
917 except select
.error
as e
:
918 if e
[0] == errno
.EINTR
:
923 # Accept new connection on event unix socket
924 if self
._eventsocket
in readable
:
925 client_socket
, address
= self
._eventsocket
.accept()
926 # pylint: disable=no-member
927 client_sockets
[client_socket
.fileno()] = (client_socket
, address
, "")
929 # Same for the TCP syslog socket
930 if self
._syslog
_tcp
and self
._syslog
_tcp
in readable
:
931 client_socket
, address
= self
._syslog
_tcp
.accept()
932 # pylint: disable=no-member
933 client_sockets
[client_socket
.fileno()] = (client_socket
, address
, "")
935 # Read data from existing event unix socket connections
936 # NOTE: We modify client_socket in the loop, so we need to copy below!
937 for fd
, (cs
, address
, previous_data
) in list(client_sockets
.iteritems()):
939 # Receive next part of data
941 new_data
= cs
.recv(4096)
946 # Put together with incomplete messages from last time
947 data
= previous_data
+ new_data
949 # Do we have incomplete data? (if the socket has been
950 # closed then we consider the pending message always
951 # as complete, even if there was no trailing \n)
952 if new_data
and not data
.endswith("\n"): # keep fragment
953 # Do we have any complete messages?
955 complete
, rest
= data
.rsplit("\n", 1)
956 self
.process_raw_lines(complete
+ "\n", address
)
958 rest
= data
# keep for next time
960 # Only complete messages
963 self
.process_raw_lines(data
, address
)
966 # Connection still open?
968 client_sockets
[fd
] = (cs
, address
, rest
)
971 del client_sockets
[fd
]
973 # Read data from pipe
976 data
= os
.read(pipe
, 4096)
978 # Prepend previous beginning of message to read data
979 data
= pipe_fragment
+ data
982 # Last message still incomplete?
984 if '\n' in data
: # at least one complete message contained
985 messages
, pipe_fragment
= data
.rsplit('\n', 1)
986 self
.process_raw_lines(messages
+ '\n') # got lost in split
988 pipe_fragment
= data
# keep beginning of message, wait for \n
990 self
.process_raw_lines(data
)
993 pipe
= self
.open_pipe()
994 listen_list
[0] = pipe
995 # Pending fragments from previos reads that are not terminated
996 # by a \n are ignored.
998 self
._logger
.warning(
999 "Ignoring incomplete message '%s' from pipe" % pipe_fragment
)
1004 # Read events from builtin syslog server
1005 if self
._syslog
is not None and self
._syslog
.fileno() in readable
:
1006 self
.process_raw_lines(*self
._syslog
.recvfrom(4096))
1008 # Read events from builtin snmptrap server
1009 if self
._snmptrap
is not None and self
._snmptrap
.fileno() in readable
:
1011 message
, sender_address
= self
._snmptrap
.recvfrom(65535)
1012 self
.process_raw_data(lambda: self
._snmp
_trap
_engine
.process_snmptrap(
1013 message
, sender_address
))
1015 self
._logger
.exception(
1016 'Exception handling a SNMP trap from "%s". Skipping this one' %
1020 # process the first spool file we get
1021 spool_file
= next(self
.settings
.paths
.spool_dir
.value
.glob('[!.]*'))
1022 self
.process_raw_lines(spool_file
.read_bytes())
1024 select_timeout
= 0 # enable fast processing to process further files
1025 except StopIteration:
1026 select_timeout
= 1 # restore default select timeout
1028 # Processes incoming data, just a wrapper between the real data and the
1029 # handler function to record some statistics etc.
1030 def process_raw_data(self
, handler
):
1031 self
._perfcounters
.count("messages")
1032 before
= time
.time()
1033 # In replication slave mode (when not took over), ignore all events
1034 if not is_replication_slave(self
._config
) or self
._slave
_status
["mode"] != "sync":
1036 elif self
.settings
.options
.debug
:
1037 self
._logger
.info("Replication: we are in slave mode, ignoring event")
1038 elapsed
= time
.time() - before
1039 self
._perfcounters
.count_time("processing", elapsed
)
1041 # Takes several lines of messages, handles encoding and processes them separated
1042 def process_raw_lines(self
, data
, address
=None):
1043 lines
= data
.splitlines()
1045 line
= scrub_and_decode(line
.rstrip())
1049 def handler(line
=line
):
1050 self
.process_line(line
, address
)
1052 self
.process_raw_data(handler
)
1053 except Exception as e
:
1054 self
._logger
.exception(
1055 'Exception handling a log line (skipping this one): %s' % e
)
1057 def do_housekeeping(self
):
1058 with self
._event
_status
.lock
:
1059 with self
._lock
_configuration
:
1060 self
.hk_handle_event_timeouts()
1061 self
.hk_check_expected_messages()
1062 self
.hk_cleanup_downtime_events()
1063 self
._history
.housekeeping()
1065 # For all events that have been created in a host downtime check the host
1066 # whether or not it is still in downtime. In case the downtime has ended
1067 # archive the events that have been created in a downtime.
1068 def hk_cleanup_downtime_events(self
):
1071 for event
in self
._event
_status
.events():
1072 if not event
["host_in_downtime"]:
1073 continue # only care about events created in downtime
1076 in_downtime
= host_downtimes
[event
["core_host"]]
1078 in_downtime
= self
._is
_host
_in
_downtime
(event
)
1079 host_downtimes
[event
["core_host"]] = in_downtime
1082 continue # (still) in downtime, don't delete any event
1084 self
._logger
.verbose(
1085 "Remove event %d (created in downtime, host left downtime)" % event
["id"])
1086 self
._event
_status
.remove_event(event
)
def hk_handle_event_timeouts(self):
    """Housekeeping: close/expire events whose counting, delay or livetime
    period has elapsed.

    NOTE(review): reconstructed from an extraction-mangled source (statements
    were split across lines and some lines were missing). Verify against the
    original file before merging.
    """
    # 1. Automatically delete all events that are in state "counting"
    #    and have not reached the required number of hits and whose
    #    counting period is elapsed.
    # 2. Automatically delete all events that are in state "open"
    #    and whose livetime is elapsed.
    events_to_delete = []
    events = self._event_status.events()
    now = time.time()
    for nr, event in enumerate(events):
        rule = self._rule_by_id.get(event["rule_id"])

        if event["phase"] == "counting":
            # Event belongs to a rule that does not longer exist? It
            # will never reach its count. Better delete it.
            if not rule:
                self._logger.info("Deleting orphaned event %d created by obsolete rule %s" %
                                  (event["id"], event["rule_id"]))
                event["phase"] = "closed"
                self._history.add(event, "ORPHANED")
                events_to_delete.append(nr)

            elif "count" not in rule and "expect" not in rule:
                self._logger.info(
                    "Count-based event %d belonging to rule %s: rule does not "
                    "count/expect anymore. Deleting event." % (event["id"], event["rule_id"]))
                event["phase"] = "closed"
                self._history.add(event, "NOCOUNT")
                events_to_delete.append(nr)

            # Handle counting rules: token bucket algorithms drain the hit
            # counter over time, the 'interval' algorithm resets per period.
            elif "count" in rule:
                count = rule["count"]
                if count.get("algorithm") in ["tokenbucket", "dynabucket"]:
                    last_token = event.get("last_token", event["first"])
                    secs_per_token = count["period"] / float(count["count"])
                    if count["algorithm"] == "dynabucket":  # get fewer tokens if count is lower
                        if event["count"] <= 1:
                            secs_per_token = count["period"]
                        else:
                            secs_per_token *= (float(count["count"]) / float(event["count"]))
                    elapsed_secs = now - last_token
                    new_tokens = int(elapsed_secs / secs_per_token)
                    if new_tokens:
                        if self.settings.options.debug:
                            self._logger.info(
                                "Rule %s/%s, event %d: got %d new tokens" %
                                (rule["pack"], rule["id"], event["id"], new_tokens))
                        event["count"] = max(0, event["count"] - new_tokens)
                        # not now! would be unfair
                        event["last_token"] = last_token + new_tokens * secs_per_token
                        if event["count"] == 0:
                            self._logger.info(
                                "Rule %s/%s, event %d: again without allowed rate, dropping event"
                                % (rule["pack"], rule["id"], event["id"]))
                            event["phase"] = "closed"
                            self._history.add(event, "COUNTFAILED")
                            events_to_delete.append(nr)

                else:  # algorithm 'interval'
                    if event["first"] + count["period"] <= now:  # End of period reached
                        self._logger.info(
                            "Rule %s/%s: reached only %d out of %d events within %d seconds. "
                            "Resetting to zero." % (rule["pack"], rule["id"], event["count"],
                                                    count["count"], count["period"]))
                        event["phase"] = "closed"
                        self._history.add(event, "COUNTFAILED")
                        events_to_delete.append(nr)

        # Handle delayed actions
        elif event["phase"] == "delayed":
            delay_until = event.get("delay_until", 0)  # should always be present
            if now >= delay_until:
                self._logger.info("Delayed event %d of rule %s is now activated." %
                                  (event["id"], event["rule_id"]))
                event["phase"] = "open"
                self._history.add(event, "DELAYOVER")
                if rule:
                    cmk.ec.actions.event_has_opened(self._history, self.settings, self._config,
                                                    self._logger, self, self._event_columns,
                                                    rule, event)
                    if rule.get("autodelete"):
                        event["phase"] = "closed"
                        self._history.add(event, "AUTODELETE")
                        events_to_delete.append(nr)
                else:
                    self._logger.info("Cannot do rule action: rule %s not present anymore." %
                                      event["rule_id"])

        # Handle events with a limited lifetime
        elif "live_until" in event:
            if now >= event["live_until"]:
                allowed_phases = event.get("live_until_phases", ["open"])
                if event["phase"] in allowed_phases:
                    event["phase"] = "closed"
                    events_to_delete.append(nr)
                    self._logger.info("Livetime of event %d (rule %s) exceeded. Deleting event."
                                      % (event["id"], event["rule_id"]))
                    self._history.add(event, "EXPIRED")

    # Do delayed deletion now (was delayed in order to keep list indices OK)
    for nr in events_to_delete[::-1]:
        self._event_status.remove_event(events[nr])
def hk_check_expected_messages(self):
    """Housekeeping: for "expect" rules whose interval elapsed, open an
    artificial event if too few matching messages were seen.

    NOTE(review): reconstructed from an extraction-mangled source; the
    missing ``continue``/``break`` statements were restored from the visible
    control flow — verify against the original file.
    """
    now = time.time()
    # "Expecting"-rules are rules that require one or several
    # occurrances of a message within a defined time period.
    # Whenever one period of time has elapsed, we need to check
    # how many messages have been seen for that rule. If these
    # are too few, we open an event.
    # We need to handle to cases:
    # 1. An event for such a rule already exists and is
    #    in the state "counting" -> this can only be the case if
    #    more than one occurrance is required.
    # 2. No event at all exists.
    for rule in self._rules:
        if "expect" in rule:
            if not self._rule_matcher.event_rule_matches_site(rule, event=None):
                continue

            # Interval is either a number of seconds, or pair of a number of seconds
            # (e.g. 86400, meaning one day) and a timezone offset relative to UTC in hours.
            interval = rule["expect"]["interval"]
            expected_count = rule["expect"]["count"]

            interval_start = self._event_status.interval_start(rule["id"], interval)
            if interval_start >= now:
                continue

            next_interval_start = self._event_status.next_interval_start(
                interval, interval_start)
            if next_interval_start > now:
                continue

            # Interval has been elapsed. Now comes the truth: do we have enough
            # rule matches?

            # First do not forget to switch to next interval
            self._event_status.start_next_interval(rule["id"], interval)

            # First look for case 1: rule that already have at least one hit
            # and this events in the state "counting" exist.
            events_to_delete = []
            events = self._event_status.events()
            found_counting_event = False
            for nr, event in enumerate(events):
                if event["rule_id"] == rule["id"] and event["phase"] == "counting":
                    found_counting_event = True
                    # time has elapsed. Now lets see if we have reached
                    # the neccessary count:
                    if event["count"] < expected_count:  # no -> trigger alarm
                        self._handle_absent_event(rule, event["count"], expected_count,
                                                  event["first"])
                    else:  # yes -> everything is fine. Just log.
                        self._logger.info(
                            "Rule %s/%s has reached %d occurrances (%d required). "
                            "Starting next period." % (rule["pack"], rule["id"], event["count"],
                                                       expected_count))
                        self._history.add(event, "COUNTREACHED")
                    # Counting event is no longer needed.
                    events_to_delete.append(nr)
                    break

            # Ou ou, no event found at all.
            if not found_counting_event:
                self._handle_absent_event(rule, 0, expected_count, interval_start)

            for nr in events_to_delete[::-1]:
                self._event_status.remove_event(events[nr])
1260 def _handle_absent_event(self
, rule
, event_count
, expected_count
, interval_start
):
1263 text
= "Expected message arrived only %d out of %d times since %s" % \
1264 (event_count
, expected_count
, time
.strftime("%F %T", time
.localtime(interval_start
)))
1266 text
= "Expected message did not arrive since %s" % \
1267 time
.strftime("%F %T", time
.localtime(interval_start
))
1269 # If there is already an incidence about this absent message, we can merge and
1270 # not create a new event. There is a setting for this.
1272 merge
= rule
["expect"].get("merge", "open")
1273 if merge
!= "never":
1274 for event
in self
._event
_status
.events():
1275 if event
["rule_id"] == rule
["id"] and \
1276 (event
["phase"] == "open" or
1277 (event
["phase"] == "ack" and merge
== "acked")):
1282 merge_event
["last"] = now
1283 merge_event
["count"] += 1
1284 merge_event
["phase"] = "open"
1285 merge_event
["time"] = now
1286 merge_event
["text"] = text
1287 # Better rewrite (again). Rule might have changed. Also we have changed
1288 # the text and the user might have his own text added via set_text.
1289 self
.rewrite_event(rule
, merge_event
, {}, set_first
=False)
1290 self
._history
.add(merge_event
, "COUNTFAILED")
1292 # Create artifical event from scratch. Make sure that all important
1293 # fields are defined.
1295 "rule_id": rule
["id"],
1308 "facility": 1, # user
1310 "match_groups_syslog_application": (),
1312 "host_in_downtime": False,
1314 self
._add
_rule
_contact
_groups
_to
_event
(rule
, event
)
1315 self
.rewrite_event(rule
, event
, {})
1316 self
._event
_status
.new_event(event
)
1317 self
._history
.add(event
, "COUNTFAILED")
1318 cmk
.ec
.actions
.event_has_opened(self
._history
, self
.settings
, self
._config
,
1319 self
._logger
, self
, self
._event
_columns
, rule
, event
)
1320 if rule
.get("autodelete"):
1321 event
["phase"] = "closed"
1322 self
._history
.add(event
, "AUTODELETE")
1323 self
._event
_status
.remove_event(event
)
def reload_configuration(self, config):
    """Replace the active configuration: rebuild the SNMP trap engine,
    recompile all rules and re-initialize the core host cache."""
    self._config = config
    self._snmp_trap_engine = cmk.ec.snmp.SNMPTrapEngine(self.settings, self._config,
                                                        self._logger.getChild("snmp"),
                                                        self.handle_snmptrap)
    self.compile_rules(self._config["rules"], self._config["rule_packs"])
    self.host_config.initialize()
# Precompile regular expressions and similar stuff. Also convert legacy
# "rules" parameter into new "rule_packs" parameter
def compile_rules(self, legacy_rules, rule_packs):
    """Compile the match expressions of all enabled rules and (optionally)
    build the facility/priority speedup hash.

    NOTE(review): reconstructed from an extraction-mangled source; the
    counter initialization and the invalid-regex handling were restored from
    the visible fragments — verify against the original file. `.items()` and
    `list(range(23)) + [31]` replace the Py2-only spellings with identical
    iteration semantics.
    """
    self._rules = []
    self._rule_by_id = {}
    self._rule_hash = {}  # Speedup-Hash for rule execution
    count_disabled = 0
    count_rules = 0
    count_unspecific = 0

    # Loop through all rule packages and with through their rules
    for rule_pack in rule_packs:
        if rule_pack["disabled"]:
            count_disabled += len(rule_pack["rules"])
            continue

        for rule in rule_pack["rules"]:
            if rule.get("disabled"):
                count_disabled += 1
            else:
                count_rules += 1
                rule = rule.copy()  # keep original intact because of slave replication

                # Store information about rule pack right within the rule. This is needed
                # for debug output and also for skipping rule packs
                rule["pack"] = rule_pack["id"]
                self._rules.append(rule)
                self._rule_by_id[rule["id"]] = rule
                try:
                    for key in [
                            "match", "match_ok", "match_host", "match_application",
                            "cancel_application"
                    ]:
                        if key in rule:
                            value = self._compile_matching_value(key, rule[key])
                            if value is None:
                                del rule[key]
                                continue
                            rule[key] = value

                    if 'state' in rule and isinstance(rule['state'], tuple) \
                       and rule['state'][0] == 'text_pattern':
                        for key in ['2', '1', '0']:
                            if key in rule['state'][1]:
                                value = self._compile_matching_value(
                                    'state', rule['state'][1][key])
                                if value is None:
                                    del rule['state'][1][key]
                                else:
                                    rule['state'][1][key] = value

                except Exception as e:
                    if self.settings.options.debug:
                        raise
                    rule["disabled"] = True
                    count_disabled += 1
                    self._logger.exception(
                        "Ignoring rule '%s/%s' because of an invalid regex (%s)." %
                        (rule["pack"], rule["id"], e))

                if self._config["rule_optimizer"]:
                    self.hash_rule(rule)
                    if "match_facility" not in rule \
                            and "match_priority" not in rule \
                            and "cancel_priority" not in rule \
                            and "cancel_application" not in rule:
                        count_unspecific += 1

    self._logger.info(
        "Compiled %d active rules (ignoring %d disabled rules)" % (count_rules, count_disabled))
    if self._config["rule_optimizer"]:
        self._logger.info("Rule hash: %d rules - %d hashed, %d unspecific" % (len(
            self._rules), len(self._rules) - count_unspecific, count_unspecific))
        for facility in list(range(23)) + [31]:
            if facility in self._rule_hash:
                stats = []
                for prio, entries in self._rule_hash[facility].items():
                    stats.append("%s(%d)" % (SyslogPriority(prio), len(entries)))
                self._logger.info(" %-12s: %s" % (SyslogFacility(facility), " ".join(stats)))
1415 def _compile_matching_value(key
, val
):
1417 # Remove leading .* from regex. This is redundant and
1418 # dramatically destroys performance when doing an infix search.
1419 if key
in ["match", "match_ok"]:
1420 while value
.startswith(".*") and not value
.startswith(".*?"):
1426 if cmk
.utils
.regex
.is_regex(value
):
1427 return re
.compile(value
, re
.IGNORECASE
)
def hash_rule(self, rule):
    """Register a rule in the facility/priority speedup hash.

    A rule matching one specific syslog facility (and not inverted) is
    hashed under that facility only; otherwise it must be checked for all
    32 syslog facilities.
    """
    # Construct rule hash for faster execution.
    facility = rule.get("match_facility")
    # NOTE(review): facility 0 ("kern") is falsy and thus takes the
    # all-facilities branch — looks unintended; confirm before changing.
    if facility and not rule.get("invert_matching"):
        self.hash_rule_facility(rule, facility)
    else:
        # range() iterates identically to the original xrange() here and
        # also works under Python 3.
        for facility in range(32):  # all syslog facilities
            self.hash_rule_facility(rule, facility)
def hash_rule_facility(self, rule, facility):
    """Insert a rule into the speedup hash under the given facility for
    every syslog priority the rule can possibly match or cancel."""
    needed_prios = [False] * 8
    for key in ["match_priority", "cancel_priority"]:
        if key in rule:
            prio_from, prio_to = rule[key]
            # Beware: from > to!
            for p in range(prio_to, prio_from + 1):
                needed_prios[p] = True
        elif key == "match_priority":  # all priorities match
            needed_prios = [True] * 8  # needed to check this rule for all event priorities
        elif "match_ok" in rule:  # a cancelling rule where all priorities cancel
            needed_prios = [True] * 8  # needed to check this rule for all event priorities

    if rule.get("invert_matching"):
        needed_prios = [True] * 8

    prio_hash = self._rule_hash.setdefault(facility, {})
    for prio, need in enumerate(needed_prios):
        if need:
            prio_hash.setdefault(prio, []).append(rule)
def output_hash_stats(self):
    """Log the 20 facility/priority combinations that received the most
    messages, with their share of the total."""
    self._logger.info("Top 20 of facility/priority:")
    entries = []
    total_count = 0
    for facility in range(32):
        for priority in range(8):
            count = self._hash_stats[facility][priority]
            if count:
                total_count += count
                entries.append((count, (facility, priority)))
    # Highest counts first.
    entries.sort()
    entries.reverse()

    for count, (facility, priority) in entries[:20]:
        self._logger.info(" %s/%s - %d (%.2f%%)" % (SyslogFacility(facility),
                                                    SyslogPriority(priority), count,
                                                    (100.0 * count / float(total_count))))
def process_line(self, line, address):
    """Parse one raw syslog line (address is None or an (ip, port) tuple)
    into an event and run it through the rule engine."""
    line = line.rstrip()
    if self._config["debug_rules"]:
        if address:
            self._logger.info(u"Processing message from %r: '%s'" % (address, line))
        else:
            self._logger.info(u"Processing message '%s'" % line)

    event = self._event_creator.create_event_from_line(line, address)
    self.process_event(event)
def process_event(self, event):
    """Run one event through all (candidate) rules: drop, cancel, count,
    delay or open it according to the first matching rule.

    NOTE(review): reconstructed from an extraction-mangled source; missing
    control-flow lines (returns, else-branches, the cancelling branch) were
    restored from the visible fragments — verify against the original file.
    """
    self.do_translate_hostname(event)

    # Log all incoming messages into a syslog-like text file if that is enabled
    if self._config["log_messages"]:
        self.log_message(event)

    # Pre-select the rules that can match this facility/priority at all
    # when the rule optimizer hash is enabled.
    if self._config["rule_optimizer"]:
        self._hash_stats[event["facility"]][event["priority"]] += 1
        rule_candidates = self._rule_hash.get(event["facility"], {}).get(event["priority"], [])
    else:
        rule_candidates = self._rules

    skip_pack = None
    for rule in rule_candidates:
        if skip_pack and rule["pack"] == skip_pack:
            continue  # still in the rule pack that we want to skip
        skip_pack = None  # new pack, reset skipping

        try:
            result = self.event_rule_matches(rule, event)
        except Exception as e:
            self._logger.exception(' Exception during matching:\n%s' % e)
            result = False

        if result:  # A tuple with (True/False, {match_info}).. O.o
            self._perfcounters.count("rule_hits")
            cancelling, match_groups = result

            if self._config["debug_rules"]:
                self._logger.info(" matching groups:\n%s" % pprint.pformat(match_groups))

            self._event_status.count_rule_match(rule["id"])
            if self._config["log_rulehits"]:
                self._logger.info("Rule '%s/%s' hit by message %s/%s - '%s'." %
                                  (rule["pack"], rule["id"], SyslogFacility(event["facility"]),
                                   SyslogPriority(event["priority"]), event["text"]))

            if rule.get("drop"):
                if rule["drop"] == "skip_pack":
                    skip_pack = rule["pack"]
                    if self._config["debug_rules"]:
                        self._logger.info(" skipping this rule pack (%s)" % skip_pack)
                    continue
                else:
                    self._perfcounters.count("drops")
                    return

            if cancelling:
                self._event_status.cancel_events(self, self._event_columns, event, match_groups,
                                                 rule)
                return

            # Remember the rule id that this event originated from
            event["rule_id"] = rule["id"]

            # Lookup the monitoring core hosts and add the core host
            # name to the event when one can be matched
            # For the moment we have no rule/condition matching on this
            # field. So we only add the core host info for matched events.
            self._add_core_host_to_new_event(event)

            # Attach optional contact group information for visibility
            # and eventually for notifications
            self._add_rule_contact_groups_to_event(rule, event)

            # Store groups from matching this event. In order to make
            # persistence easier, we do not safe them as list but join
            # them.
            event["match_groups"] = match_groups.get("match_groups_message", ())
            event["match_groups_syslog_application"] = match_groups.get(
                "match_groups_syslog_application", ())
            self.rewrite_event(rule, event, match_groups)

            if "count" in rule:
                count = rule["count"]
                # Check if a matching event already exists that we need to
                # count up. If the count reaches the limit, the event will
                # be opened and its rule actions performed.
                existing_event = \
                    self._event_status.count_event(self, event, rule, count)
                if existing_event:
                    if "delay" in rule:
                        if self._config["debug_rules"]:
                            self._logger.info("Event opening will be delayed for %d seconds"
                                              % rule["delay"])
                        existing_event["delay_until"] = time.time() + rule["delay"]
                        existing_event["phase"] = "delayed"
                    else:
                        cmk.ec.actions.event_has_opened(
                            self._history, self.settings, self._config, self._logger, self,
                            self._event_columns, rule, existing_event)

                    self._history.add(existing_event, "COUNTREACHED")

                    if "delay" not in rule and rule.get("autodelete"):
                        existing_event["phase"] = "closed"
                        self._history.add(existing_event, "AUTODELETE")
                        with self._event_status.lock:
                            self._event_status.remove_event(existing_event)
            elif "expect" in rule:
                self._event_status.count_expected_event(self, event)
            else:
                if "delay" in rule:
                    if self._config["debug_rules"]:
                        self._logger.info(
                            "Event opening will be delayed for %d seconds" % rule["delay"])
                    event["delay_until"] = time.time() + rule["delay"]
                    event["phase"] = "delayed"
                else:
                    event["phase"] = "open"

                if self.new_event_respecting_limits(event):
                    if event["phase"] == "open":
                        cmk.ec.actions.event_has_opened(self._history, self.settings,
                                                        self._config, self._logger, self,
                                                        self._event_columns, rule, event)
                        if rule.get("autodelete"):
                            event["phase"] = "closed"
                            self._history.add(event, "AUTODELETE")
                            with self._event_status.lock:
                                self._event_status.remove_event(event)
            return

    # End of loop over rules.
    if self._config["archive_orphans"]:
        self._event_status.archive_event(event)
1617 def _add_rule_contact_groups_to_event(self
, rule
, event
):
1618 if rule
.get("contact_groups") is None:
1620 "contact_groups": None,
1621 "contact_groups_notify": False,
1622 "contact_groups_precedence": "host",
1626 "contact_groups": rule
["contact_groups"]["groups"],
1627 "contact_groups_notify": rule
["contact_groups"]["notify"],
1628 "contact_groups_precedence": rule
["contact_groups"]["precedence"],
def add_core_host_to_event(self, event):
    """Set event["core_host"] to the matching monitoring core host name,
    or to "" when no core host matches the event's host name."""
    matched_host = self.host_config.get_by_event_host_name(event["host"])
    if not matched_host:
        event["core_host"] = ""
        return

    event["core_host"] = matched_host["name"]
1639 def _add_core_host_to_new_event(self
, event
):
1640 self
.add_core_host_to_event(event
)
1642 # Add some state dependent information (like host is in downtime etc.)
1643 event
["host_in_downtime"] = self
._is
_host
_in
_downtime
(event
)
1645 def _is_host_in_downtime(self
, event
):
1646 if not event
["core_host"]:
1647 return False # Found no host in core: Not in downtime!
1649 query
= ("GET hosts\n"
1650 "Columns: scheduled_downtime_depth\n"
1651 "Filter: host_name = %s\n" % (event
["core_host"]))
1654 return livestatus
.LocalConnection().query_value(query
) >= 1
1656 except livestatus
.MKLivestatusNotFoundError
:
1660 if cmk
.utils
.debug
.enabled():
# Checks if an event matches a rule. Returns either False (no match)
# or a pair of matchtype, groups, where matchtype is False for a
# normal match and True for a cancelling match and the groups is a tuple
# if matched regex groups in either text (normal) or match_ok (cancelling)
def event_rule_matches(self, rule, event):
    """Match one rule against one event, honoring "invert_matching".

    NOTE(review): the inverted-matching result construction was restored
    from an extraction-mangled source — verify against the original file.
    """
    self._perfcounters.count("rule_tries")
    with self._lock_configuration:
        result = self._rule_matcher.event_rule_matches_non_inverted(rule, event)
        if rule.get("invert_matching"):
            if result is False:
                # Inverted: a non-match becomes a (non-cancelling) match
                # without any regex groups.
                result = False, {}
                if self._config["debug_rules"]:
                    self._logger.info(
                        " Rule would not match, but due to inverted matching does.")
            else:
                result = False
                if self._config["debug_rules"]:
                    self._logger.info(
                        " Rule would match, but due to inverted matching does not.")

        return result
# Rewrite texts and compute other fields in the event
def rewrite_event(self, rule, event, groups, set_first=True):
    """Apply a matched rule's rewrite settings (state, service level,
    comment/text/host/application/contact) to the event in place.

    NOTE(review): reconstructed from an extraction-mangled source; the
    priority->state mapping for rule state -1 was restored — confirm it
    against the original file before relying on it.
    """
    if rule["state"] == -1:
        # State -1: derive the monitoring state from the syslog priority.
        prio = event["priority"]
        if prio >= 5:
            event["state"] = 0
        elif prio < 4:
            event["state"] = 2
        else:
            event["state"] = 1
    elif isinstance(rule["state"], tuple) and rule["state"][0] == "text_pattern":
        # Determine the state from the first matching text pattern; '3'
        # (UNKNOWN) doubles as the fallback when nothing matched.
        for key in ['2', '1', '0', '3']:
            if key in rule["state"][1]:
                match_groups = match(rule["state"][1][key], event["text"], complete=False)
                if match_groups is not False:
                    event["state"] = int(key)
                    break
            elif key == '3':  # No rule matched!
                event["state"] = 3
    else:
        event["state"] = rule["state"]

    if ("sl" not in event) or (rule["sl"]["precedence"] == "rule"):
        event["sl"] = rule["sl"]["value"]
    if set_first:
        event["first"] = event["time"]
    event["last"] = event["time"]
    if "set_comment" in rule:
        event["comment"] = replace_groups(rule["set_comment"], event["text"], groups)
    if "set_text" in rule:
        event["text"] = replace_groups(rule["set_text"], event["text"], groups)
    if "set_host" in rule:
        event["orig_host"] = event["host"]
        event["host"] = replace_groups(rule["set_host"], event["host"], groups)
    if "set_application" in rule:
        event["application"] = replace_groups(rule["set_application"], event["application"],
                                              groups)
    if "set_contact" in rule and "contact" not in event:
        event["contact"] = replace_groups(rule["set_contact"], event.get("contact", ""),
                                          groups)
# Translate a hostname if this is configured. We are
# *really* sorry: this code snipped is copied from modules/check_mk_base.py.
# There is still no common library. Please keep this in sync with the
# original.
def translate_hostname(self, backedhost):
    """Apply the configured hostname translation (case conversion, domain
    dropping, regex substitution, explicit mapping) and return the result.

    NOTE(review): the int-conversion try/except, the ``$`` anchoring and the
    ``break``/``return`` statements were restored from an extraction-mangled
    source — verify against the original file.
    """
    translation = self._config["hostname_translation"]

    # Here comes the original code from modules/check_mk_base.py

    # 1. Case conversion
    caseconf = translation.get("case")
    if caseconf == "upper":
        backedhost = backedhost.upper()
    elif caseconf == "lower":
        backedhost = backedhost.lower()

    # 2. Drop domain part (not applied to IP addresses!)
    if translation.get("drop_domain") and backedhost:
        # only apply if first part does not convert successfully into an int
        firstpart = backedhost.split(".", 1)[0]
        try:
            int(firstpart)
        except Exception:
            backedhost = firstpart

    # 3. Regular expression conversion
    if "regex" in translation:
        for regex, subst in translation["regex"]:
            if not regex.endswith('$'):
                regex += '$'
            rcomp = cmk.utils.regex.regex(regex)
            mo = rcomp.match(backedhost)
            if mo:
                backedhost = subst
                for nr, text in enumerate(mo.groups()):
                    backedhost = backedhost.replace("\\%d" % (nr + 1), text)
                break

    # 4. Explicity mapping
    for from_host, to_host in translation.get("mapping", []):
        if from_host == backedhost:
            backedhost = to_host
            break

    return backedhost
def do_translate_hostname(self, event):
    """Translate event["host"] in place; on failure, optionally log and
    blank the host name.

    NOTE(review): the ``try:`` line and the error fallback were restored
    from an extraction-mangled source — verify against the original file.
    """
    try:
        event["host"] = self.translate_hostname(event["host"])
    except Exception as e:
        if self._config["debug_rules"]:
            self._logger.exception('Unable to parse host "%s" (%s)' % (event.get("host"), e))
        event["host"] = ""
def log_message(self, event):
    """Append the event to the syslog-like message log. Errors are
    deliberately swallowed (unless running in debug mode): we could have
    run out of disk space and would only make things worse by logging."""
    try:
        with cmk.ec.history.get_logfile(self._config, self.settings.paths.messages_dir.value,
                                        self._message_period).open(mode='ab') as f:
            f.write("%s %s %s%s: %s\n" % (time.strftime("%b %d %H:%M:%S",
                                                        time.localtime(event["time"])),
                                          event["host"], event["application"], event["pid"] and
                                          ("[%s]" % event["pid"]) or "", event["text"]))
    except Exception:
        if self.settings.options.debug:
            raise
        # Better silently ignore errors. We could have run out of
        # diskspace and make things worse by logging that we could
        # not log.
def get_hosts_with_active_event_limit(self):
    """Return the host names whose open event count has reached the
    configured "by_host" event limit."""
    hosts = []
    # .items() iterates identically to the original .iteritems() and also
    # works under Python 3.
    for hostname, num_existing_events in \
            self._event_status.num_existing_events_by_host.items():
        if num_existing_events >= self._config["event_limit"]["by_host"]["limit"]:
            hosts.append(hostname)
    return hosts
def get_rules_with_active_event_limit(self):
    """Return the rule IDs whose open event count has reached the
    configured "by_rule" event limit."""
    rule_ids = []
    # .items() iterates identically to the original .iteritems() and also
    # works under Python 3.
    for rule_id, num_existing_events in \
            self._event_status.num_existing_events_by_rule.items():
        if rule_id is None:
            continue  # Ignore rule unrelated overflow events. They have no rule id associated.
        if num_existing_events >= self._config["event_limit"]["by_rule"]["limit"]:
            rule_ids.append(rule_id)
    return rule_ids
def is_overall_event_limit_active(self):
    """Return True when the total number of open events has reached the
    configured overall event limit."""
    return self._event_status.num_existing_events \
        >= self._config["event_limit"]["overall"]["limit"]
# protected by self._event_status.lock
def new_event_respecting_limits(self, event):
    """Add the event to the event status unless one of the overall /
    by-host / by-rule limits forbids it. Returns True when the event was
    created."""
    self._logger.verbose(
        "Checking limit for message from %s (rule '%s')" % (event["host"], event["rule_id"]))

    with self._event_status.lock:
        if self._handle_event_limit("overall", event):
            return False

        if self._handle_event_limit("by_host", event):
            return False

        if self._handle_event_limit("by_rule", event):
            return False

        self._event_status.new_event(event)
        return True
1836 # The following actions can be configured:
1837 # stop Stop creating new events
1838 # stop_overflow Stop creating new events, create overflow event
1839 # stop_overflow_notify Stop creating new events, create overflow event, notfy
1840 # delete_oldest Delete oldest event, create new event
1841 # protected by self._event_status.lock
1843 # Returns False if the event has been created and actions should be
1844 # performed on that event
1845 def _handle_event_limit(self
, ty
, event
):
1846 assert ty
in ["overall", "by_rule", "by_host"]
1848 num_already_open
= self
._event
_status
.get_num_existing_events_by(ty
, event
)
1849 limit
, action
= self
._get
_event
_limit
(ty
, event
)
1850 self
._logger
.verbose(
1851 " Type: %s, already open events: %d, Limit: %d" % (ty
, num_already_open
, limit
))
1853 # Limit not reached: add new event
1854 if num_already_open
< limit
:
1855 num_already_open
+= 1 # after adding this event
1857 # Limit even then still not reached: we are fine
1858 if num_already_open
< limit
:
1861 # Delete oldest messages if that is the configure method of keeping the limit
1862 if action
== "delete_oldest":
1863 while num_already_open
> limit
:
1864 self
._perfcounters
.count("overflows")
1865 self
._event
_status
.remove_oldest_event(ty
, event
)
1866 num_already_open
-= 1
1869 # Limit reached already in the past: Simply drop silently
1870 if num_already_open
> limit
:
1871 # Just log in verbose mode! Otherwise log file will be flooded
1872 self
._logger
.verbose(" Skip processing because limit is already in effect")
1873 self
._perfcounters
.count("overflows")
1874 return True # Prevent creation and prevent one time actions (below)
1876 self
._logger
.info(" The %s limit has been reached" % ty
)
1878 # This is the event which reached the limit, allow creation of it. Further
1879 # events will be stopped.
1881 # Perform one time actions
1882 overflow_event
= self
._create
_overflow
_event
(ty
, event
, limit
)
1884 if "overflow" in action
:
1885 self
._logger
.info(" Creating overflow event")
1886 self
._event
_status
.new_event(overflow_event
)
1888 if "notify" in action
:
1889 self
._logger
.info(" Creating overflow notification")
1890 cmk
.ec
.actions
.do_notify(self
, self
._logger
, overflow_event
)
1894 # protected by self._event_status.lock
1895 def _get_event_limit(self
, ty
, event
):
1896 # Prefer the rule individual limit for by_rule limit (in case there is some)
1898 rule_limit
= self
._rule
_by
_id
[event
["rule_id"]].get("event_limit")
1900 return rule_limit
["limit"], rule_limit
["action"]
1902 # Prefer the host individual limit for by_host limit (in case there is some)
1904 host_config
= self
.host_config
.get(event
["core_host"], {})
1905 host_limit
= host_config
.get("custom_variables", {}).get("EC_EVENT_LIMIT")
1907 limit
, action
= host_limit
.split(":", 1)
1908 return int(limit
), action
1910 limit
= self
._config
["event_limit"][ty
]["limit"]
1911 action
= self
._config
["event_limit"][ty
]["action"]
1913 return limit
, action
1915 def _create_overflow_event(self
, ty
, event
, limit
):
1927 "application": "Event Console",
1929 "priority": 2, # crit
1930 "facility": 1, # user
1932 "match_groups_syslog_application": (),
1936 "host_in_downtime": False,
1938 self
._add
_rule
_contact
_groups
_to
_event
({}, new_event
)
1941 new_event
["text"] = ("The overall event limit of %d open events has been reached. Not "
1942 "opening any additional event until open events have been "
1943 "archived." % limit
)
1945 elif ty
== "by_host":
1947 "host": event
["host"],
1948 "ipaddress": event
["ipaddress"],
1949 "text": ("The host event limit of %d open events has been reached for host \"%s\". "
1950 "Not opening any additional event for this host until open events have "
1951 "been archived." % (limit
, event
["host"]))
1954 # Lookup the monitoring core hosts and add the core host
1955 # name to the event when one can be matched
1956 self
._add
_core
_host
_to
_new
_event
(new_event
)
1958 elif ty
== "by_rule":
1960 "rule_id": event
["rule_id"],
1961 "contact_groups": event
["contact_groups"],
1962 "contact_groups_notify": event
.get("contact_groups_notify", False),
1963 "contact_groups_precedence": event
.get("contact_groups_precedence", "host"),
1964 "text": ("The rule event limit of %d open events has been reached for rule \"%s\". "
1965 "Not opening any additional event for this rule until open events have "
1966 "been archived." % (limit
, event
["rule_id"]))
1970 raise NotImplementedError()
1975 class EventCreator(object):
def __init__(self, logger, config):
    """Store the logger and the live EC configuration used when parsing
    incoming messages into events."""
    super(EventCreator, self).__init__()
    self._logger = logger
    self._config = config
1981 def create_event_from_line(self
, line
, address
):
1983 # address is either None or a tuple of (ipaddress, port)
1984 "ipaddress": address
and address
[0] or "",
1986 "host_in_downtime": False,
1989 # Variant 1: plain syslog message without priority/facility:
1990 # May 26 13:45:01 Klapprechner CRON[8046]: message....
1992 # Variant 1a: plain syslog message without priority/facility/host:
1993 # May 26 13:45:01 Klapprechner CRON[8046]: message....
1995 # Variant 2: syslog message including facility (RFC 3164)
1996 # <78>May 26 13:45:01 Klapprechner CRON[8046]: message....
1998 # Variant 3: local Nagios alert posted by mkevent -n
1999 # <154>@1341847712;5;Contact Info; MyHost My Service: CRIT - This che
2001 # Variant 4: remote Nagios alert posted by mkevent -n -> syslog
2002 # <154>Jul 9 17:28:32 Klapprechner @1341847712;5;Contact Info; MyHost My Service: CRIT - This che
2004 # Variant 5: syslog message
2005 # Timestamp is RFC3339 with additional restrictions:
2006 # - The "T" and "Z" characters in this syntax MUST be upper case.
2007 # - Usage of the "T" character is REQUIRED.
2008 # - Leap seconds MUST NOT be used.
2009 # <166>2013-04-05T13:49:31.685Z esx Vpxa: message....
2011 # Variant 6: syslog message without date / host:
2012 # <5>SYSTEM_INFO: [WLAN-1] Triggering Background Scan
2014 # Variant 7: logwatch.ec event forwarding
2015 # <78>@1341847712 Klapprechner /var/log/syslog: message....
2017 # Variant 7a: Event simulation
2018 # <%PRI%>@%TIMESTAMP%;%SL% %HOSTNAME% %syslogtag%: %msg%
2020 # Variant 8: syslog message from sophos firewall
2021 # <84>2015:03:25-12:02:06 gw pluto[7122]: listening for IKE messages
2023 # Variant 9: syslog message (RFC 5424)
2024 # <134>1 2016-06-02T12:49:05.181+02:00 chrissw7 ChrisApp - TestID - coming from java code
2027 # 2016 May 26 15:41:47 IST XYZ Ebra: %LINEPROTO-5-UPDOWN: Line protocol on Interface Ethernet45 (XXX.ASAD.Et45), changed state to up
2028 # year month day hh:mm:ss timezone HOSTNAME KeyAgent:
2030 # FIXME: Would be better to parse the syslog messages in another way:
2031 # Split the message by the first ":", then split the syslog header part
2032 # and detect which information are present. Take a look at the syslog RFCs
2035 # Variant 2,3,4,5,6,7,8
2036 if line
.startswith('<'):
2038 prio
= int(line
[1:i
])
2040 event
["facility"] = prio
>> 3
2041 event
["priority"] = prio
& 7
2045 event
["facility"] = 1 # user
2046 event
["priority"] = 5 # notice
2049 if line
[0] == '@' and line
[11] in [' ', ';']:
2050 details
, event
['host'], line
= line
.split(' ', 2)
2051 detail_tokens
= details
.split(';')
2052 timestamp
= detail_tokens
[0]
2053 if len(detail_tokens
) > 1:
2054 event
["sl"] = int(detail_tokens
[1])
2055 event
['time'] = float(timestamp
[1:])
2056 event
.update(self
._parse
_syslog
_info
(line
))
2059 elif line
.startswith("@"):
2060 event
.update(self
._parse
_monitoring
_info
(line
))
2063 elif len(line
) > 24 and line
[10] == 'T':
2064 # There is no 3339 parsing built into python. We do ignore subseconds and timezones
2065 # here. This is seems to be ok for the moment - sorry. Please drop a note if you
2066 # got a good solutuion for this.
2067 rfc3339_part
, event
['host'], line
= line
.split(' ', 2)
2068 event
['time'] = time
.mktime(time
.strptime(rfc3339_part
[:19], '%Y-%m-%dT%H:%M:%S'))
2069 event
.update(self
._parse
_syslog
_info
(line
))
2072 elif len(line
) > 24 and line
[12] == "T":
2073 event
.update(self
._parse
_rfc
5424_syslog
_info
(line
))
2076 elif line
[10] == '-' and line
[19] == ' ':
2077 event
['host'] = line
.split(' ')[1]
2078 event
['time'] = time
.mktime(time
.strptime(line
.split(' ')[0], '%Y:%m:%d-%H:%M:%S'))
2079 rest
= " ".join(line
.split(' ')[2:])
2080 event
.update(self
._parse
_syslog
_info
(rest
))
2083 elif len(line
.split(': ', 1)[0].split(' ')) == 1:
2084 event
.update(self
._parse
_syslog
_info
(line
))
2085 # There is no datetime information in the message, use current time
2086 event
['time'] = time
.time()
2087 # There is no host information, use the provided address
2088 if address
and isinstance(address
, tuple):
2089 event
["host"] = address
[0]
2092 elif line
[4] == " " and line
[:4].isdigit():
2093 time_part
= line
[:20] # ignoring tz info
2094 event
["host"], application
, line
= line
[25:].split(" ", 2)
2095 event
["application"] = application
.rstrip(":")
2096 event
["text"] = line
2097 event
['time'] = time
.mktime(time
.strptime(time_part
, '%Y %b %d %H:%M:%S'))
2101 month_name
, day
, timeofday
, rest
= line
.split(None, 3)
2103 # Special handling for variant 1a. Detect whether or not host
2104 # is a hostname or syslog tag
2105 host
, tmp_rest
= rest
.split(None, 1)
2106 if host
.endswith(":"):
2107 # There is no host information sent, use the source address as "host"
2110 # Use the extracted host and continue with the remaining message text
2113 event
["host"] = host
2116 if rest
.startswith("@"):
2117 event
.update(self
._parse
_monitoring
_info
(rest
))
2121 event
.update(self
._parse
_syslog
_info
(rest
))
2123 month
= EventServer
.month_names
[month_name
]
2126 # Nasty: the year is not contained in the message. We cannot simply
2127 # assume that the message is from the current year.
2128 lt
= time
.localtime()
2129 if lt
.tm_mon
< 6 and month
> 6: # Assume that message is from last year
2130 year
= lt
.tm_year
- 1
2132 year
= lt
.tm_year
# Assume the current year
2134 hours
, minutes
, seconds
= map(int, timeofday
.split(":"))
2136 # A further problem here: we do not know whether the message is in DST or not
2137 event
["time"] = time
.mktime((year
, month
, day
, hours
, minutes
, seconds
, 0, 0,
2140 # The event simulator ships the simulated original IP address in the
2141 # hostname field, separated with a pipe, e.g. "myhost|1.2.3.4"
2142 if "|" in event
["host"]:
2143 event
["host"], event
["ipaddress"] = event
["host"].split("|", 1)
2145 except Exception as e
:
2146 if self
._config
["debug_rules"]:
2147 self
._logger
.exception('Got non-syslog message "%s" (%s)' % (line
, e
))
2153 "ipaddress": address
and address
[0] or "",
2156 "time": time
.time(),
2158 "host_in_downtime": False,
2161 if self
._config
["debug_rules"]:
2162 self
._logger
.info('Parsed message:\n' + ("".join(
2163 [" %-15s %s\n" % (k
+ ":", v
) for (k
, v
) in sorted(event
.iteritems())])).rstrip())
2167 def _parse_rfc5424_syslog_info(self
, line
):
2170 (_unused_version
, timestamp
, hostname
, app_name
, procid
, _unused_msgid
, rest
) = line
.split(
2173 # There is no 3339 parsing built into python. We do ignore subseconds and timezones
2174 # here. This is seems to be ok for the moment - sorry. Please drop a note if you
2175 # got a good solutuion for this.
2176 event
['time'] = time
.mktime(time
.strptime(timestamp
[:19], '%Y-%m-%dT%H:%M:%S'))
2179 event
["host"] = hostname
2182 event
["application"] = app_name
2185 event
["pid"] = procid
2188 # has stuctured data
2189 structured_data
, message
= rest
[1:].split("] ", 1)
2190 elif rest
.startswith("- "):
2191 # has no stuctured data
2192 structured_data
, message
= rest
.split(" ", 1)
2194 raise Exception("Invalid RFC 5424 syslog message")
2196 if structured_data
!= "-":
2197 event
["text"] = "[%s] %s" % (structured_data
, message
)
2199 event
["text"] = message
2203 def _parse_syslog_info(self
, line
):
2205 # Replaced ":" by ": " here to make tags with ":" possible. This
2206 # is needed to process logs generated by windows agent logfiles
2207 # like "c://test.log".
2208 tag
, message
= line
.split(": ", 1)
2209 event
["text"] = message
.strip()
2212 app
, pid
= tag
.split('[', 1)
2213 pid
= pid
.rstrip(']')
2218 event
["application"] = app
2222 def _parse_monitoring_info(self
, line
):
2224 # line starts with '@'
2226 timestamp_str
, sl
, contact
, rest
= line
[1:].split(';', 3)
2227 host
, rest
= rest
.split(None, 1)
2229 event
["sl"] = int(sl
)
2231 event
["contact"] = contact
2233 timestamp_str
, host
, rest
= line
[1:].split(None, 2)
2235 event
["time"] = float(int(timestamp_str
))
2236 service
, message
= rest
.split(": ", 1)
2237 event
["application"] = service
2238 event
["text"] = message
.strip()
2239 event
["host"] = host
2242 def create_event_from_trap(self
, trap
, ipaddress
):
2243 # use the trap-oid as application
2245 for index
, (oid
, _unused_val
) in enumerate(trap
):
2246 if oid
in ['1.3.6.1.6.3.1.1.4.1.0', 'SNMPv2-MIB::snmpTrapOID.0']:
2247 application
= scrub_and_decode(trap
.pop(index
)[1])
2250 # once we got here we have a real parsed trap which we convert to an event now
2251 safe_ipaddress
= scrub_and_decode(ipaddress
)
2252 text
= scrub_and_decode(', '.join(['%s: %s' % (item
[0], str(item
[1])) for item
in trap
]))
2255 'time': time
.time(),
2256 'host': safe_ipaddress
,
2257 'ipaddress': safe_ipaddress
,
2258 'priority': 5, # notice
2259 'facility': 31, # not used by syslog -> we use this for all traps
2260 'application': application
,
2263 'host_in_downtime': False,
class RuleMatcher(object):
    """Matches incoming events against the configured EC rules.

    Match results are communicated via the returned value of
    event_rule_matches_non_inverted(): False means no match, a
    (cancelling, match_groups) tuple means the rule applies.
    """

    def __init__(self, logger, config):
        super(RuleMatcher, self).__init__()
        self._logger = logger
        self._config = config
        self._time_periods = TimePeriods(logger)

    @property
    def _debug_rules(self):
        return self._config["debug_rules"]

    def event_rule_matches_non_inverted(self, rule, event):
        if self._debug_rules:
            self._logger.info("Trying rule %s/%s..." % (rule["pack"], rule["id"]))
            self._logger.info("  Text:   %s" % event["text"])
            self._logger.info("  Syslog: %d.%d" % (event["facility"], event["priority"]))
            self._logger.info("  Host:   %s" % event["host"])

        # Generic conditions without positive/canceling matches
        if not self.event_rule_matches_generic(rule, event):
            return False

        # Determine syslog priority
        match_priority = {}
        if not self.event_rule_determine_match_priority(rule, event, match_priority):
            # Abort on negative outcome, neither positive nor negative
            return False

        # Determine and cleanup match_groups
        match_groups = {}
        if not self.event_rule_determine_match_groups(rule, event, match_groups):
            # Abort on negative outcome, neither positive nor negative
            return False

        return self._check_match_outcome(rule, match_groups, match_priority)

    def _check_match_outcome(self, rule, match_groups, match_priority):
        # type: (Dict[str, Any], Dict[str, Any], Dict[str, Any]) -> Union[bool, Tuple[bool, Dict[str, Any]]]
        """Decide whether an event is created, canceled or nothing is done"""

        # Check canceling-event
        has_canceling_condition = bool(
            [x for x in ["match_ok", "cancel_application", "cancel_priority"] if x in rule])
        if has_canceling_condition:
            if ("match_ok" not in rule or match_groups.get("match_groups_message_ok", False) is not False) and\
               ("cancel_application" not in rule or
                match_groups.get("match_groups_syslog_application_ok", False) is not False) and\
               ("cancel_priority" not in rule or match_priority["has_canceling_match"] is True):
                if self._debug_rules:
                    self._logger.info("  found canceling event")
                return True, match_groups

        # Check create-event
        if match_groups["match_groups_message"] is not False and\
           match_groups.get("match_groups_syslog_application", ()) is not False and\
           match_priority["has_match"] is True:
            if self._debug_rules:
                self._logger.info("  found new event")
            return False, match_groups

        # Looks like there was no match, output some additional info
        # Reasons preventing create-event
        if self._debug_rules:
            if match_groups["match_groups_message"] is False:
                self._logger.info("  did not create event, because of wrong message")
            if "match_application" in rule and match_groups[
                    "match_groups_syslog_application"] is False:
                self._logger.info("  did not create event, because of wrong syslog application")
            if "match_priority" in rule and match_priority["has_match"] is False:
                self._logger.info("  did not create event, because of wrong syslog priority")

            if has_canceling_condition:
                # Reasons preventing cancel-event
                if "match_ok" in rule and match_groups.get("match_groups_message_ok",
                                                           False) is False:
                    self._logger.info("  did not cancel event, because of wrong message")
                if "cancel_application" in rule and \
                   match_groups.get("match_groups_syslog_application_ok", False) is False:
                    self._logger.info("  did not cancel event, because of wrong syslog application")
                if "cancel_priority" in rule and match_priority["has_canceling_match"] is False:
                    self._logger.info("  did not cancel event, because of wrong cancel priority")

        return False

    def event_rule_matches_generic(self, rule, event):
        # All of these conditions must hold for the rule to stay a candidate.
        generic_match_functions = [
            self.event_rule_matches_site,
            self.event_rule_matches_host,
            self.event_rule_matches_ip,
            self.event_rule_matches_facility,
            self.event_rule_matches_service_level,
            self.event_rule_matches_timeperiod,
        ]

        for match_function in generic_match_functions:
            if not match_function(rule, event):
                return False
        return True

    def event_rule_determine_match_priority(self, rule, event, match_priority):
        p = event["priority"]

        if "match_priority" in rule:
            prio_from, prio_to = sorted(rule["match_priority"])
            match_priority["has_match"] = prio_from <= p <= prio_to
        else:
            match_priority["has_match"] = True

        if "cancel_priority" in rule:
            cancel_from, cancel_to = sorted(rule["cancel_priority"])
            match_priority["has_canceling_match"] = cancel_from <= p <= cancel_to
        else:
            match_priority["has_canceling_match"] = False

        if match_priority["has_match"] is False and\
           match_priority["has_canceling_match"] is False:
            return False
        return True

    def event_rule_matches_site(self, rule, event):
        return "match_site" not in rule or cmk.omd_site() in rule["match_site"]

    def event_rule_matches_host(self, rule, event):
        if match(rule.get("match_host"), event["host"], complete=True) is False:
            if self._debug_rules:
                self._logger.info("  did not match because of wrong host '%s' (need '%s')" %
                                  (event["host"], format_pattern(rule.get("match_host"))))
            return False
        return True

    def event_rule_matches_ip(self, rule, event):
        if not match_ipv4_network(rule.get("match_ipaddress", "0.0.0.0/0"), event["ipaddress"]):
            if self._debug_rules:
                self._logger.info(
                    "  did not match because of wrong source IP address '%s' (need '%s')" %
                    (event["ipaddress"], rule.get("match_ipaddress")))
            return False
        return True

    def event_rule_matches_facility(self, rule, event):
        if "match_facility" in rule and event["facility"] != rule["match_facility"]:
            if self._debug_rules:
                self._logger.info("  did not match because of wrong syslog facility")
            return False
        return True

    def event_rule_matches_service_level(self, rule, event):
        if "match_sl" in rule:
            sl_from, sl_to = rule["match_sl"]
            if sl_from > sl_to:
                sl_to, sl_from = sl_from, sl_to
            p = event.get("sl", 0)
            if p < sl_from or p > sl_to:
                if self._debug_rules:
                    self._logger.info(
                        "  did not match because of wrong service level %d (need %d..%d)" %
                        (p, sl_from, sl_to),)
                return False
        return True

    def event_rule_matches_timeperiod(self, rule, event):
        if "match_timeperiod" in rule and not self._time_periods.check(rule["match_timeperiod"]):
            if self._debug_rules:
                self._logger.info("  did not match, because timeperiod %s is not active" %
                                  rule["match_timeperiod"])
            return False
        return True

    def event_rule_determine_match_groups(self, rule, event, match_groups):
        match_group_functions = [
            self.event_rule_matches_syslog_application,
            self.event_rule_matches_message,
        ]
        for match_function in match_group_functions:
            if not match_function(rule, event, match_groups):
                return False
        return True

    def event_rule_matches_syslog_application(self, rule, event, match_groups):
        if "match_application" not in rule and "cancel_application" not in rule:
            return True

        # Syslog application
        if "match_application" in rule:
            match_groups["match_groups_syslog_application"] = match(
                rule.get("match_application"), event["application"], complete=False)

        # Syslog application canceling, this option must be explicitly set
        if "cancel_application" in rule:
            match_groups["match_groups_syslog_application_ok"] = match(
                rule.get("cancel_application"), event["application"], complete=False)

        # Detect impossible match
        if match_groups.get("match_groups_syslog_application", False) is False and\
           match_groups.get("match_groups_syslog_application_ok", False) is False:
            if self._debug_rules:
                self._logger.info("  did not match, syslog application does not match")
            return False
        return True

    def event_rule_matches_message(self, rule, event, match_groups):
        # Message matching, this condition is always active
        match_groups["match_groups_message"] = match(
            rule.get("match"), event["text"], complete=False)

        # Message canceling, this option must be explicitly set
        if "match_ok" in rule:
            match_groups["match_groups_message_ok"] = match(
                rule.get("match_ok"), event["text"], complete=False)

        # Detect impossible match
        if match_groups["match_groups_message"] is False and\
           match_groups.get("match_groups_message_ok", False) is False:
            if self._debug_rules:
                self._logger.info("  did not match, message text does not match")
            return False
        return True
2492 # .--Status Queries------------------------------------------------------.
2493 # | ____ _ _ ___ _ |
2494 # | / ___|| |_ __ _| |_ _ _ ___ / _ \ _ _ ___ _ __(_) ___ ___ |
2495 # | \___ \| __/ _` | __| | | / __| | | | | | | |/ _ \ '__| |/ _ \/ __| |
2496 # | ___) | || (_| | |_| |_| \__ \ | |_| | |_| | __/ | | | __/\__ \ |
2497 # | |____/ \__\__,_|\__|\__,_|___/ \__\_\\__,_|\___|_| |_|\___||___/ |
2499 # +----------------------------------------------------------------------+
2500 # | Parsing and processing of status queries |
2501 # '----------------------------------------------------------------------'
2504 class Queries(object):
2505 def __init__(self
, status_server
, sock
, logger
):
2506 super(Queries
, self
).__init
__()
2507 self
._status
_server
= status_server
2509 self
._logger
= logger
2517 parts
= self
._buffer
.split("\n\n", 1)
2520 data
= self
._socket
.recv(4096)
2522 if len(self
._buffer
) == 0:
2523 raise StopIteration()
2524 parts
= [self
._buffer
, ""]
2526 self
._buffer
+= data
2527 request
, self
._buffer
= parts
2528 return Query
.make(self
._status
_server
, request
.decode("utf-8").splitlines(), self
._logger
)
class Query(object):
    """Base class of a single client request (GET/REPLICATE/COMMAND)."""

    @staticmethod
    def make(status_server, raw_query, logger):
        """Factory: dispatch the raw request lines to the matching Query subclass."""
        parts = raw_query[0].split(None, 1)
        if len(parts) != 2:
            raise MKClientError("Invalid query. Need GET/COMMAND plus argument(s)")
        method = parts[0]
        if method == "GET":
            return QueryGET(status_server, raw_query, logger)
        if method == "REPLICATE":
            return QueryREPLICATE(status_server, raw_query, logger)
        if method == "COMMAND":
            return QueryCOMMAND(status_server, raw_query, logger)
        raise MKClientError("Invalid method %s (allowed are GET, REPLICATE, COMMAND)" % method)

    def __init__(self, status_server, raw_query, logger):
        super(Query, self).__init__()
        self._logger = logger
        self.output_format = "python"
        self._raw_query = raw_query
        self._from_raw_query(status_server)

    def _from_raw_query(self, status_server):
        self._parse_method_and_args()

    def _parse_method_and_args(self):
        parts = self._raw_query[0].split(None, 1)
        if len(parts) != 2:
            raise MKClientError("Invalid query. Need GET/COMMAND plus argument(s)")
        self.method, self.method_arg = parts

    def __repr__(self):
        return repr("\n".join(self._raw_query))
class QueryGET(Query):
    """A livestatus-like GET query: table name, optional headers, filters."""

    # Binary predicates usable in "Filter:" header lines.
    _filter_operators = {
        "=": (lambda a, b: a == b),
        ">": (lambda a, b: a > b),
        "<": (lambda a, b: a < b),
        ">=": (lambda a, b: a >= b),
        "<=": (lambda a, b: a <= b),
        "~": (lambda a, b: cmk.utils.regex.regex(b).search(a)),
        "=~": (lambda a, b: a.lower() == b.lower()),
        "~~": (lambda a, b: cmk.utils.regex.regex(b.lower()).search(a.lower())),
        "in": (lambda a, b: a in b),
    }

    def _from_raw_query(self, status_server):
        super(QueryGET, self)._from_raw_query(status_server)
        self._parse_table(status_server)
        self._parse_header_lines()

    def _parse_table(self, status_server):
        self.table_name = self.method_arg
        self.table = status_server.table(self.table_name)

    def _parse_header_lines(self):
        self.requested_columns = self.table.column_names  # use all columns as default
        self.filters = []
        self.limit = None
        self.only_host = None

        self.header_lines = []
        for line in self._raw_query[1:]:
            try:
                header, argument = line.rstrip("\n").split(":", 1)
                argument = argument.lstrip(" ")

                if header == "OutputFormat":
                    if argument not in ["python", "plain", "json"]:
                        raise MKClientError(
                            "Invalid output format \"%s\" (allowed are: python, plain, json)" %
                            argument)

                    self.output_format = argument

                elif header == "Columns":
                    self.requested_columns = argument.split(" ")

                elif header == "Filter":
                    column_name, operator_name, predicate, argument = self._parse_filter(argument)

                    # Needed for later optimization (check_mkevents)
                    if column_name == "event_host" and operator_name == 'in':
                        self.only_host = set(argument)

                    self.filters.append((column_name, operator_name, predicate, argument))

                elif header == "Limit":
                    self.limit = int(argument)

                else:
                    self._logger.info("Ignoring not-implemented header %s" % header)

            except Exception as e:
                raise MKClientError("Invalid header line '%s': %s" % (line.rstrip(), e))

    def _parse_filter(self, textspec):
        """Parse one "Filter:" argument into (column, op, predicate, argument).

        Examples:
          id = 17
          name ~= This is some .* text
        """
        parts = textspec.split(None, 2)
        if len(parts) == 2:
            parts.append("")
        column, operator_name, argument = parts

        try:
            convert = self.table.column_types[column]
        except KeyError:
            raise MKClientError(
                "Unknown column: %s (Available are: %s)" % (column, self.table.column_names))

        # TODO: BUG: The query is decoded to unicode after receiving it from
        # the socket. The columns with type str (initialied with "") will apply
        # str(argument) here and convert the value back to str! This will crash
        # when the filter contains non ascii characters!
        # Fix this by making the default values unicode and skip unicode conversion
        # here (for performance reasons) because argument is already unicode.
        if operator_name == 'in':
            argument = map(convert, argument.split())
        else:
            argument = convert(argument)

        operator_function = self._filter_operators.get(operator_name)
        if not operator_function:
            raise MKClientError("Unknown filter operator '%s'" % operator_name)

        return (column, operator_name, lambda x: operator_function(x, argument), argument)

    def requested_column_indexes(self):
        indexes = []
        for column_name in self.requested_columns:
            try:
                column_index = self.table.column_indices[column_name]
            except KeyError:
                # The column is not known: Use None as index and None value later
                column_index = None
            indexes.append(column_index)
        return indexes

    def filter_row(self, row):
        for column_name, _operator_name, predicate, _argument in self.filters:
            if not predicate(row[self.table.column_indices[column_name]]):
                return False
        return True
class QueryREPLICATE(Query):
    """REPLICATE query; all parsing is inherited from Query."""
    pass
class QueryCOMMAND(Query):
    """COMMAND query; all parsing is inherited from Query."""
    pass
2694 # .--Status Tables-------------------------------------------------------.
2695 # | ____ _ _ _____ _ _ |
2696 # | / ___|| |_ __ _| |_ _ _ ___ |_ _|_ _| |__ | | ___ ___ |
2697 # | \___ \| __/ _` | __| | | / __| | |/ _` | '_ \| |/ _ \/ __| |
2698 # | ___) | || (_| | |_| |_| \__ \ | | (_| | |_) | | __/\__ \ |
2699 # | |____/ \__\__,_|\__|\__,_|___/ |_|\__,_|_.__/|_|\___||___/ |
2701 # +----------------------------------------------------------------------+
2702 # | Definitions of the tables available for status queries |
2703 # '----------------------------------------------------------------------'
2704 # If you need a new column here, then these are the places to change:
2706 # - add column to the end of StatusTableEvents.columns
2707 # - add column to grepping_filters if it is a str column
2708 # - deal with convert_history_line() (if not a str column)
2709 # - make sure that the new column is filled at *every* place where
2710 # an event is being created:
2711 # * _create_event_from_trap()
2712 # * create_event_from_line()
2713 # * _handle_absent_event()
2714 # * _create_overflow_event()
2715 # - When loading the status file add the possibly missing column to all
2716 # loaded events (load_status())
2717 # - Maybe add matching/rewriting for the new column
2718 # - write the actual code using the new column
2720 # - Add column painter for the new column
2723 # - Add painter and filter to all views where appropriate
2724 # - maybe add WATO code for matching rewriting
2725 # - do not forget event_rule_matches() in web!
2726 # - maybe add a field into the event simulator
class StatusTable(object):
    """Base class of the tables answering status (GET) queries.

    Subclasses set prefix/columns and implement _enumerate().
    """
    prefix = None  # type: Optional[str]
    columns = []  # type: List[Tuple[str, Any]]

    # Must return an enumerable type containing fully populated lists (rows) matching the
    # columns of the table
    def _enumerate(self, query):
        raise NotImplementedError()

    def __init__(self, logger):
        super(StatusTable, self).__init__()
        self._logger = logger.getChild("status_table.%s" % self.prefix)
        self._populate_column_views()

    def _populate_column_views(self):
        # Derived lookup structures over self.columns: names in declaration
        # order, per-column default value/type, and name -> index mapping.
        self.column_names = [c[0] for c in self.columns]
        self.column_defaults = dict(self.columns)

        self.column_types = {}
        for name, def_val in self.columns:
            self.column_types[name] = type(def_val)

        self.column_indices = dict([(name, index) for index, name in enumerate(self.column_names)])

    def query(self, query):
        """Generator yielding the header row followed by all matching rows."""
        requested_column_indexes = query.requested_column_indexes()

        # Output the column headers
        # TODO: Add support for ColumnHeaders like in livestatus?
        yield query.requested_columns

        num_rows = 0
        for row in self._enumerate(query):
            if query.limit is not None and num_rows >= query.limit:
                break  # The maximum number of rows has been reached

            # TODO: History filtering is done in history load code. Check for improvements
            if query.filters and query.table_name != "history":
                matched = query.filter_row(row)
                if not matched:
                    continue

            num_rows += 1
            yield self._build_result_row(row, requested_column_indexes)

    def _build_result_row(self, row, requested_column_indexes):
        # Unknown columns (index None) are reported as None values.
        result_row = []
        for index in requested_column_indexes:
            if index is None:
                result_row.append(None)
            else:
                result_row.append(row[index])
        return result_row
class StatusTableEvents(StatusTable):
    """Status table exposing the currently open events."""
    prefix = "event"
    columns = [
        ("event_id", 1),
        ("event_count", 1),
        ("event_text", ""),
        ("event_first", 0.0),
        ("event_last", 0.0),
        ("event_comment", ""),
        ("event_sl", 0),  # filter fehlt
        ("event_host", ""),
        ("event_contact", ""),
        ("event_application", ""),
        ("event_pid", 0),
        ("event_priority", 5),
        ("event_facility", 1),
        ("event_rule_id", ""),
        ("event_state", 0),
        ("event_phase", ""),
        ("event_owner", ""),
        ("event_match_groups", ""),  # last column up to 1.2.4
        ("event_contact_groups", ""),  # introduced in 1.2.5i2
        ("event_ipaddress", ""),  # introduced in 1.2.7i1
        ("event_orig_host", ""),  # introduced in 1.4.0b1
        ("event_contact_groups_precedence", "host"),  # introduced in 1.4.0b1
        ("event_core_host", ""),  # introduced in 1.5.0i1
        ("event_host_in_downtime", False),  # introduced in 1.5.0i1
        ("event_match_groups_syslog_application", ""),  # introduced in 1.5.0i2
    ]

    def __init__(self, logger, event_status):
        super(StatusTableEvents, self).__init__(logger)
        self._event_status = event_status

    def _enumerate(self, query):
        for event in self._event_status.get_events():
            # Optimize filters that are set by the check_mkevents active check. Since users
            # may have a lot of those checks running, it is a good idea to optimize this.
            if query.only_host and event["host"] not in query.only_host:
                continue

            row = []
            for column_name in self.column_names:
                try:
                    # Strip the "event_" prefix to get the event dict key.
                    row.append(event[column_name[6:]])
                except KeyError:
                    # The row does not have this value. Use the columns default value
                    row.append(self.column_defaults[column_name])

            yield row
class StatusTableHistory(StatusTable):
    """Status table over the event history; rows are produced by the history backend."""
    prefix = "history"
    columns = [
        ("history_line", 0),  # Line number in event history file
        ("history_time", 0.0),
        ("history_what", ""),
        ("history_who", ""),
        ("history_addinfo", ""),
    ] + StatusTableEvents.columns

    def __init__(self, logger, history):
        super(StatusTableHistory, self).__init__(logger)
        self._history = history

    def _enumerate(self, query):
        # Filtering happens inside the history backend itself.
        return self._history.get(query)
class StatusTableRules(StatusTable):
    """Status table with per-rule hit statistics."""
    prefix = "rule"
    columns = [
        ("rule_id", ""),
        ("rule_hits", 0),
    ]

    def __init__(self, logger, event_status):
        super(StatusTableRules, self).__init__(logger)
        self._event_status = event_status

    def _enumerate(self, query):
        return self._event_status.get_rule_stats()
class StatusTableStatus(StatusTable):
    """Status table exposing the daemon's own runtime status counters."""
    prefix = "status"
    columns = EventServer.status_columns()

    def __init__(self, logger, event_server):
        super(StatusTableStatus, self).__init__(logger)
        self._event_server = event_server

    def _enumerate(self, query):
        return self._event_server.get_status()
2884 # .--StatusServer--------------------------------------------------------.
2886 # | / ___|| |_ __ _| |_ _ _ ___/ ___| ___ _ ____ _____ _ __ |
2887 # | \___ \| __/ _` | __| | | / __\___ \ / _ \ '__\ \ / / _ \ '__| |
2888 # | ___) | || (_| | |_| |_| \__ \___) | __/ | \ V / __/ | |
2889 # | |____/ \__\__,_|\__|\__,_|___/____/ \___|_| \_/ \___|_| |
2891 # +----------------------------------------------------------------------+
2892 # | Beantworten von Status- und Kommandoanfragen über das UNIX-Socket |
2893 # '----------------------------------------------------------------------'
2896 class StatusServer(ECServerThread
):
2897 def __init__(self
, logger
, settings
, config
, slave_status
, perfcounters
, lock_configuration
,
2898 history
, event_status
, event_server
, terminate_main_event
):
2899 super(StatusServer
, self
).__init
__(
2900 name
="StatusServer",
2904 slave_status
=slave_status
,
2905 profiling_enabled
=settings
.options
.profile_status
,
2906 profile_file
=settings
.paths
.status_server_profile
.value
)
2908 self
._tcp
_socket
= None
2909 self
._reopen
_sockets
= False
2911 self
._table
_events
= StatusTableEvents(logger
, event_status
)
2912 self
._table
_history
= StatusTableHistory(logger
, history
)
2913 self
._table
_rules
= StatusTableRules(logger
, event_status
)
2914 self
._table
_status
= StatusTableStatus(logger
, event_server
)
2915 self
._perfcounters
= perfcounters
2916 self
._lock
_configuration
= lock_configuration
2917 self
._history
= history
2918 self
._event
_status
= event_status
2919 self
._event
_server
= event_server
2920 self
._event
_columns
= StatusTableEvents
.columns
2921 self
._terminate
_main
_event
= terminate_main_event
2923 self
.open_unix_socket()
2924 self
.open_tcp_socket()
2926 def table(self
, name
):
2927 if name
== "events":
2928 return self
._table
_events
2929 if name
== "history":
2930 return self
._table
_history
2932 return self
._table
_rules
2933 if name
== "status":
2934 return self
._table
_status
2935 raise MKClientError(
2936 "Invalid table: %s (allowed are: events, history, rules, status)" % name
)
2938 def open_unix_socket(self
):
2939 path
= self
.settings
.paths
.unix_socket
.value
2942 path
.parent
.mkdir(parents
=True, exist_ok
=True)
2943 self
._socket
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_STREAM
)
2944 self
._socket
.bind(str(path
))
2945 # Make sure that socket is group writable
2947 self
._socket
.listen(self
._config
['socket_queue_len'])
2948 self
._unix
_socket
_queue
_len
= self
._config
['socket_queue_len'] # detect changes in config
2950 def open_tcp_socket(self
):
2951 if self
._config
["remote_status"]:
2953 self
._tcp
_port
, self
._tcp
_allow
_commands
= self
._config
["remote_status"][:2]
2955 self
._tcp
_access
_list
= self
._config
["remote_status"][2]
2957 self
._tcp
_access
_list
= None
2959 self
._tcp
_socket
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
2960 self
._tcp
_socket
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
2961 self
._tcp
_socket
.bind(("0.0.0.0", self
._tcp
_port
))
2962 self
._tcp
_socket
.listen(self
._config
['socket_queue_len'])
2964 "Going to listen for status queries on TCP port %d" % self
._tcp
_port
)
2965 except Exception as e
:
2966 if self
.settings
.options
.debug
:
2968 self
._logger
.exception(
2969 "Cannot listen on TCP socket port %d: %s" % (self
._tcp
_port
, e
))
2971 self
._tcp
_socket
= None
2973 self
._tcp
_allow
_commands
= False
2974 self
._tcp
_access
_list
= None
2976 def close_unix_socket(self
):
2978 self
._socket
.close()
2981 def close_tcp_socket(self
):
2982 if self
._tcp
_socket
:
2983 self
._tcp
_socket
.close()
2984 self
._tcp
_socket
= None
2986 def reopen_sockets(self
):
2987 if self
._unix
_socket
_queue
_len
!= self
._config
["socket_queue_len"]:
2988 self
._logger
.info("socket_queue_len has changed. Reopening UNIX socket.")
2989 self
.close_unix_socket()
2990 self
.open_unix_socket()
2992 self
.close_tcp_socket()
2993 self
.open_tcp_socket()
2995 def reload_configuration(self
, config
):
2996 self
._config
= config
2997 self
._reopen
_sockets
= True
3000 while not self
._terminate
_event
.is_set():
3002 client_socket
= None
3005 if self
._reopen
_sockets
:
3006 self
.reopen_sockets()
3007 self
._reopen
_sockets
= False
3009 listen_list
= [self
._socket
]
3010 if self
._tcp
_socket
:
3011 listen_list
.append(self
._tcp
_socket
)
3014 readable
= select
.select(listen_list
, [], [], 0.2)[0]
3015 except select
.error
as e
:
3016 if e
[0] == errno
.EINTR
:
3021 client_socket
, addr_info
= s
.accept()
3022 client_socket
.settimeout(3)
3023 before
= time
.time()
3024 self
._perfcounters
.count("connects")
3026 allow_commands
= self
._tcp
_allow
_commands
3027 if self
.settings
.options
.debug
:
3028 self
._logger
.info("Handle status connection from %s:%d" % addr_info
)
3029 if self
._tcp
_access
_list
is not None and addr_info
[0] not in \
3030 self
._tcp
_access
_list
:
3031 client_socket
.close()
3032 client_socket
= None
3034 "Denying access to status socket from %s (allowed is only %s)" %
3035 (addr_info
[0], ", ".join(self
._tcp
_access
_list
)))
3038 allow_commands
= True
3040 self
.handle_client(client_socket
, allow_commands
, addr_info
and addr_info
[0] or
3043 duration
= time
.time() - before
3044 self
._logger
.verbose("Answered request in %0.2f ms" % (duration
* 1000))
3045 self
._perfcounters
.count_time("request", duration
)
3047 except Exception as e
:
3048 msg
= "Error handling client %s: %s" % (addr_info
, e
)
3049 # Do not log a stack trace for client errors, they are not *our* fault.
3050 if isinstance(e
, MKClientError
):
3051 self
._logger
.error(msg
)
3053 self
._logger
.exception(msg
)
3055 client_socket
.close()
3056 client_socket
= None
3058 client_socket
= None # close without danger of exception
3060 def handle_client(self
, client_socket
, allow_commands
, client_ip
):
3061 for query
in Queries(self
, client_socket
, self
._logger
):
3062 self
._logger
.verbose("Client livestatus query: %r" % query
)
3064 with self
._event
_status
.lock
:
3065 if query
.method
== "GET":
3066 response
= self
.table(query
.table_name
).query(query
)
3068 elif query
.method
== "REPLICATE":
3069 response
= self
.handle_replicate(query
.method_arg
, client_ip
)
3071 elif query
.method
== "COMMAND":
3072 if not allow_commands
:
3073 raise MKClientError("Sorry. Commands are disallowed via TCP")
3074 self
.handle_command_request(query
.method_arg
)
3078 raise NotImplementedError()
3081 self
._answer
_query
(client_socket
, query
, response
)
3082 except socket
.error
as e
:
3083 if e
.errno
== 32: # Broken pipe -> ignore this
3088 client_socket
.close()
3090 # Only GET queries have customizable output formats. REPLICATE is always
3091 # a dictionary and COMMAND is always None and always output as "python"
3092 def _answer_query(self
, client_socket
, query
, response
):
3093 if query
.method
!= "GET":
3094 self
._answer
_query
_python
(client_socket
, response
)
3097 if query
.output_format
== "plain":
3098 for row
in response
:
3099 client_socket
.sendall("\t".join([cmk
.ec
.history
.quote_tab(c
) for c
in row
]) + "\n")
3101 elif query
.output_format
== "json":
3102 client_socket
.sendall(json
.dumps(list(response
)) + "\n")
3104 elif query
.output_format
== "python":
3105 self
._answer
_query
_python
(client_socket
, list(response
))
3108 raise NotImplementedError()
3110 def _answer_query_python(self
, client_socket
, response
):
3111 client_socket
.sendall(repr(response
) + "\n")
3113 # All commands are already locked with self._event_status.lock
3114 def handle_command_request(self
, commandline
):
3115 self
._logger
.info("Executing command: %s" % commandline
)
3116 parts
= commandline
.split(";")
3118 replication_allow_command(self
._config
, command
, self
._slave
_status
)
3119 arguments
= parts
[1:]
3120 if command
== "DELETE":
3121 self
.handle_command_delete(arguments
)
3122 elif command
== "RELOAD":
3123 self
.handle_command_reload()
3124 elif command
== "SHUTDOWN":
3125 self
._logger
.info("Going to shut down")
3126 terminate(self
._terminate
_main
_event
, self
._event
_server
, self
)
3127 elif command
== "REOPENLOG":
3128 self
.handle_command_reopenlog()
3129 elif command
== "FLUSH":
3130 self
.handle_command_flush()
3131 elif command
== "SYNC":
3132 self
.handle_command_sync()
3133 elif command
== "RESETCOUNTERS":
3134 self
.handle_command_resetcounters(arguments
)
3135 elif command
== "UPDATE":
3136 self
.handle_command_update(arguments
)
3137 elif command
== "CREATE":
3138 self
.handle_command_create(arguments
)
3139 elif command
== "CHANGESTATE":
3140 self
.handle_command_changestate(arguments
)
3141 elif command
== "ACTION":
3142 self
.handle_command_action(arguments
)
3143 elif command
== "SWITCHMODE":
3144 self
.handle_command_switchmode(arguments
)
3146 raise MKClientError("Unknown command %s" % command
)
3148 def handle_command_delete(self
, arguments
):
3149 if len(arguments
) != 2:
3150 raise MKClientError("Wrong number of arguments for DELETE")
3151 event_id
, user
= arguments
3152 self
._event
_status
.delete_event(int(event_id
), user
)
3154 def handle_command_update(self
, arguments
):
3155 event_id
, user
, acknowledged
, comment
, contact
= arguments
3156 event
= self
._event
_status
.event(int(event_id
))
3158 raise MKClientError("No event with id %s" % event_id
)
3159 # Note the common practice: We validate parameters *before* doing any changes.
3161 ack
= int(acknowledged
)
3162 if ack
and event
["phase"] not in ["open", "ack"]:
3163 raise MKClientError("You cannot acknowledge an event that is not open.")
3164 event
["phase"] = "ack" if ack
else "open"
3166 event
["comment"] = comment
3168 event
["contact"] = contact
3170 event
["owner"] = user
3171 self
._history
.add(event
, "UPDATE", user
)
3173 def handle_command_create(self
, arguments
):
3174 # Would rather use process_raw_line(), but we are already
3175 # holding self._event_status.lock and it's sub functions are setting
3176 # self._event_status.lock too. The lock can not be allocated twice.
3177 # TODO: Change the lock type in future?
3178 # process_raw_lines("%s" % ";".join(arguments))
3179 with
file(str(self
.settings
.paths
.event_pipe
.value
), "w") as pipe
:
3180 pipe
.write(("%s\n" % ";".join(arguments
)).encode("utf-8"))
3182 def handle_command_changestate(self
, arguments
):
3183 event_id
, user
, newstate
= arguments
3184 event
= self
._event
_status
.event(int(event_id
))
3186 raise MKClientError("No event with id %s" % event_id
)
3187 event
["state"] = int(newstate
)
3189 event
["owner"] = user
3190 self
._history
.add(event
, "CHANGESTATE", user
)
3192 def handle_command_reload(self
):
3193 reload_configuration(self
.settings
, self
._logger
, self
._lock
_configuration
, self
._history
,
3194 self
._event
_status
, self
._event
_server
, self
, self
._slave
_status
)
3196 def handle_command_reopenlog(self
):
3197 self
._logger
.info("Closing this logfile")
3198 cmk
.utils
.log
.open_log(str(self
.settings
.paths
.log_file
.value
))
3199 self
._logger
.info("Opened new logfile")
3201 # Erase our current state and history!
3202 def handle_command_flush(self
):
3203 self
._history
.flush()
3204 self
._event
_status
.flush()
3205 self
._event
_status
.save_status()
3206 if is_replication_slave(self
._config
):
3208 self
.settings
.paths
.master_config_file
.value
.unlink()
3209 self
.settings
.paths
.slave_status_file
.value
.unlink()
3210 update_slave_status(self
._slave
_status
, self
.settings
, self
._config
)
3213 self
._logger
.info("Flushed current status and historic events.")
3215 def handle_command_sync(self
):
3216 self
._event
_status
.save_status()
3218 def handle_command_resetcounters(self
, arguments
):
3220 rule_id
= arguments
[0]
3221 self
._logger
.info("Resetting counters of rule " + rule_id
)
3223 rule_id
= None # Reset all rule counters
3224 self
._logger
.info("Resetting all rule counters")
3225 self
._event
_status
.reset_counters(rule_id
)
3227 def handle_command_action(self
, arguments
):
3228 event_id
, user
, action_id
= arguments
3229 event
= self
._event
_status
.event(int(event_id
))
3231 event
["owner"] = user
3233 if action_id
== "@NOTIFY":
3234 cmk
.ec
.actions
.do_notify(
3235 self
._event
_server
, self
._logger
, event
, user
, is_cancelling
=False)
3237 with self
._lock
_configuration
:
3238 if action_id
not in self
._config
["action"]:
3239 raise MKClientError(
3240 "The action '%s' is not defined. After adding new commands please "
3241 "make sure that you activate the changes in the Event Console." % action_id
)
3242 action
= self
._config
["action"][action_id
]
3243 cmk
.ec
.actions
.do_event_action(self
._history
, self
.settings
, self
._config
, self
._logger
,
3244 self
._event
_columns
, action
, event
, user
)
3246 def handle_command_switchmode(self
, arguments
):
3247 new_mode
= arguments
[0]
3248 if not is_replication_slave(self
._config
):
3249 raise MKClientError("Cannot switch replication mode: this is not a replication slave.")
3250 elif new_mode
not in ["sync", "takeover"]:
3251 raise MKClientError(
3252 "Invalid target mode '%s': allowed are only 'sync' and 'takeover'" % new_mode
)
3253 self
._slave
_status
["mode"] = new_mode
3254 save_slave_status(self
.settings
, self
._slave
_status
)
3255 self
._logger
.info("Switched replication mode to '%s' by external command." % new_mode
)
3257 def handle_replicate(self
, argument
, client_ip
):
3258 # Last time our slave got a config update
3260 last_update
= int(argument
)
3261 if self
.settings
.options
.debug
:
3262 self
._logger
.info("Replication: sync request from %s, last update %d seconds ago" %
3263 (client_ip
, time
.time() - last_update
))
3266 raise MKClientError("Invalid arguments to command REPLICATE")
3267 return replication_send(self
._config
, self
._lock
_configuration
, self
._event
_status
,
3272 # .--Dispatching---------------------------------------------------------.
3274 # | | _ \(_)___ _ __ __ _| |_ ___| |__ (_)_ __ __ _ |
3275 # | | | | | / __| '_ \ / _` | __/ __| '_ \| | '_ \ / _` | |
3276 # | | |_| | \__ \ |_) | (_| | || (__| | | | | | | | (_| | |
3277 # | |____/|_|___/ .__/ \__,_|\__\___|_| |_|_|_| |_|\__, | |
3279 # +----------------------------------------------------------------------+
3280 # | Starten und Verwalten der beiden Threads. |
3281 # '----------------------------------------------------------------------'
def run_eventd(terminate_main_event, settings, config, lock_configuration, history, perfcounters,
               event_status, event_server, status_server, slave_status, logger):
    """Main loop of the event daemon.

    Starts the server threads, then periodically runs housekeeping, retention
    saving, statistics and (on slaves) replication until asked to terminate.
    """
    status_server.start()
    event_server.start()
    now = time.time()
    next_housekeeping = now + config["housekeeping_interval"]
    next_retention = now + config["retention_interval"]
    next_statistics = now + config["statistics_interval"]
    next_replication = 0  # force immediate replication after restart

    while not terminate_main_event.is_set():
        try:
            try:
                # Wait until either housekeeping or retention is due, but at
                # maximum 60 seconds. That way changes of the interval from a very
                # high to a low value will never require more than 60 seconds
                event_list = [next_housekeeping, next_retention, next_statistics]
                if is_replication_slave(config):
                    event_list.append(next_replication)
                time_left = max(0, min(event_list) - time.time())
                time.sleep(min(time_left, 60))

                now = time.time()
                if now > next_housekeeping:
                    event_server.do_housekeeping()
                    next_housekeeping = now + config["housekeeping_interval"]

                if now > next_retention:
                    with event_status.lock:
                        event_status.save_status()
                    next_retention = now + config["retention_interval"]

                if now > next_statistics:
                    perfcounters.do_statistics()
                    next_statistics = now + config["statistics_interval"]

                # Beware: replication might be turned on during this loop!
                if is_replication_slave(config) and now > next_replication:
                    replication_pull(settings, config, lock_configuration, perfcounters,
                                     event_status, event_server, slave_status, logger)
                    next_replication = now + config["replication"]["interval"]
            except MKSignalException as e:
                raise e
            except Exception as e:
                logger.exception("Exception in main thread:\n%s" % e)
                if settings.options.debug:
                    raise
                time.sleep(1)
        except MKSignalException as e:
            if e._signum == 1:  # NOTE(review): presumably SIGHUP — confirm
                logger.info("Received SIGHUP - going to reload configuration")
                reload_configuration(settings, logger, lock_configuration, history, event_status,
                                     event_server, status_server, slave_status)
            else:
                logger.info("Signalled to death by signal %d" % e._signum)
                terminate(terminate_main_event, event_server, status_server)

    # Now wait for termination of the server threads
    event_server.join()
    status_server.join()
3349 # .--EventStatus---------------------------------------------------------.
3350 # | _____ _ ____ _ _ |
3351 # | | ____|_ _____ _ __ | |_/ ___|| |_ __ _| |_ _ _ ___ |
3352 # | | _| \ \ / / _ \ '_ \| __\___ \| __/ _` | __| | | / __| |
3353 # | | |___ \ V / __/ | | | |_ ___) | || (_| | |_| |_| \__ \ |
3354 # | |_____| \_/ \___|_| |_|\__|____/ \__\__,_|\__|\__,_|___/ |
3356 # +----------------------------------------------------------------------+
3357 # | Bereithalten des aktuellen Event-Status. Dieser schützt sich selbst |
3358 # | durch ein Lock vor gleichzeitigen Zugriffen durch die Threads. |
3359 # '----------------------------------------------------------------------'
3362 class EventStatus(object):
3363 def __init__(self
, settings
, config
, perfcounters
, history
, logger
):
3364 self
.settings
= settings
3365 self
._config
= config
3366 self
._perfcounters
= perfcounters
3367 self
.lock
= threading
.Lock()
3368 self
._history
= history
3369 self
._logger
= logger
3372 def reload_configuration(self
, config
):
3373 self
._config
= config
3377 self
._next
_event
_id
= 1
3378 self
._rule
_stats
= {}
3379 self
._interval
_starts
= {} # needed for expecting rules
3380 self
._initialize
_event
_limit
_status
()
3382 # TODO: might introduce some performance counters, like:
3383 # - number of received messages
3384 # - number of rule hits
3385 # - number of rule misses
3390 def event(self
, eid
):
3391 for event
in self
._events
:
3392 if event
["id"] == eid
:
3395 # Return beginning of current expectation interval. For new rules
3396 # we start with the next interval in future.
3397 def interval_start(self
, rule_id
, interval
):
3398 if rule_id
not in self
._interval
_starts
:
3399 start
= self
.next_interval_start(interval
, time
.time())
3400 self
._interval
_starts
[rule_id
] = start
3403 start
= self
._interval
_starts
[rule_id
]
3404 # Make sure that if the user switches from day to hour and we
3405 # are still waiting for the first interval to begin, that we
3406 # do not wait for the next day.
3407 next_interval
= self
.next_interval_start(interval
, time
.time())
3408 if start
> next_interval
:
3409 start
= next_interval
3410 self
._interval
_starts
[rule_id
] = start
3413 def next_interval_start(self
, interval
, previous_start
):
3414 if isinstance(interval
, tuple):
3415 length
, offset
= interval
3421 previous_start
-= offset
# take into account timezone offset
3422 full_parts
= divmod(previous_start
, length
)[0]
3423 next_start
= (full_parts
+ 1) * length
3424 next_start
+= offset
3427 def start_next_interval(self
, rule_id
, interval
):
3428 current_start
= self
.interval_start(rule_id
, interval
)
3429 next_start
= self
.next_interval_start(interval
, current_start
)
3430 self
._interval
_starts
[rule_id
] = next_start
3431 self
._logger
.debug("Rule %s: next interval starts %s (i.e. now + %.2f sec)" %
3432 (rule_id
, next_start
, time
.time() - next_start
))
3434 def pack_status(self
):
3436 "next_event_id": self
._next
_event
_id
,
3437 "events": self
._events
,
3438 "rule_stats": self
._rule
_stats
,
3439 "interval_starts": self
._interval
_starts
,
3442 def unpack_status(self
, status
):
3443 self
._next
_event
_id
= status
["next_event_id"]
3444 self
._events
= status
["events"]
3445 self
._rule
_stats
= status
["rule_stats"]
3446 self
._interval
_starts
= status
["interval_starts"]
3448 def save_status(self
):
3450 status
= self
.pack_status()
3451 path
= self
.settings
.paths
.status_file
.value
3452 path_new
= path
.parent
/ (path
.name
+ '.new')
3453 # Believe it or not: cPickle is more than two times slower than repr()
3454 with path_new
.open(mode
='wb') as f
:
3455 f
.write(repr(status
) + "\n")
3457 os
.fsync(f
.fileno())
3458 path_new
.rename(path
)
3459 elapsed
= time
.time() - now
3460 self
._logger
.verbose("Saved event state to %s in %.3fms." % (path
, elapsed
* 1000))
3462 def reset_counters(self
, rule_id
):
3464 if rule_id
in self
._rule
_stats
:
3465 del self
._rule
_stats
[rule_id
]
3467 self
._rule
_stats
= {}
3470 def load_status(self
, event_server
):
3471 path
= self
.settings
.paths
.status_file
.value
3474 status
= ast
.literal_eval(path
.read_bytes())
3475 self
._next
_event
_id
= status
["next_event_id"]
3476 self
._events
= status
["events"]
3477 self
._rule
_stats
= status
["rule_stats"]
3478 self
._interval
_starts
= status
.get("interval_starts", {})
3479 self
._initialize
_event
_limit
_status
()
3480 self
._logger
.info("Loaded event state from %s." % path
)
3481 except Exception as e
:
3482 self
._logger
.exception("Error loading event state from %s: %s" % (path
, e
))
3486 for event
in self
._events
:
3487 event
.setdefault("ipaddress", "")
3489 if "core_host" not in event
:
3490 event_server
.add_core_host_to_event(event
)
3491 event
["host_in_downtime"] = False
3493 # Called on Event Console initialization from status file to initialize
3494 # the current event limit state -> Sets internal counters which are
3495 # updated during runtime.
3496 def _initialize_event_limit_status(self
):
3497 self
.num_existing_events
= len(self
._events
)
3499 self
.num_existing_events_by_host
= {}
3500 self
.num_existing_events_by_rule
= {}
3501 for event
in self
._events
:
3502 self
._count
_event
_add
(event
)
3504 def _count_event_add(self
, event
):
3505 if event
["host"] not in self
.num_existing_events_by_host
:
3506 self
.num_existing_events_by_host
[event
["host"]] = 1
3508 self
.num_existing_events_by_host
[event
["host"]] += 1
3510 if event
["rule_id"] not in self
.num_existing_events_by_rule
:
3511 self
.num_existing_events_by_rule
[event
["rule_id"]] = 1
3513 self
.num_existing_events_by_rule
[event
["rule_id"]] += 1
3515 def _count_event_remove(self
, event
):
3516 self
.num_existing_events
-= 1
3517 self
.num_existing_events_by_host
[event
["host"]] -= 1
3518 self
.num_existing_events_by_rule
[event
["rule_id"]] -= 1
3520 def new_event(self
, event
):
3521 self
._perfcounters
.count("events")
3522 event
["id"] = self
._next
_event
_id
3523 self
._next
_event
_id
+= 1
3524 self
._events
.append(event
)
3525 self
.num_existing_events
+= 1
3526 self
._count
_event
_add
(event
)
3527 self
._history
.add(event
, "NEW")
3529 def archive_event(self
, event
):
3530 self
._perfcounters
.count("events")
3531 event
["id"] = self
._next
_event
_id
3532 self
._next
_event
_id
+= 1
3533 event
["phase"] = "closed"
3534 self
._history
.add(event
, "ARCHIVED")
3536 def remove_event(self
, event
):
3538 self
._events
.remove(event
)
3539 self
._count
_event
_remove
(event
)
3541 self
._logger
.exception("Cannot remove event %d: not present" % event
["id"])
3543 # protected by self.lock
3544 def _remove_event_by_nr(self
, index
):
3545 event
= self
._events
.pop(index
)
3546 self
._count
_event
_remove
(event
)
3548 # protected by self.lock
3549 def remove_oldest_event(self
, ty
, event
):
3551 self
._logger
.verbose(" Removing oldest event")
3552 self
._remove
_event
_by
_nr
(0)
3553 elif ty
== "by_rule":
3554 self
._logger
.verbose(" Removing oldest event of rule \"%s\"" % event
["rule_id"])
3555 self
._remove
_oldest
_event
_of
_rule
(event
["rule_id"])
3556 elif ty
== "by_host":
3557 self
._logger
.verbose(" Removing oldest event of host \"%s\"" % event
["host"])
3558 self
._remove
_oldest
_event
_of
_host
(event
["host"])
3560 # protected by self.lock
3561 def _remove_oldest_event_of_rule(self
, rule_id
):
3562 for event
in self
._events
:
3563 if event
["rule_id"] == rule_id
:
3564 self
.remove_event(event
)
3567 # protected by self.lock
3568 def _remove_oldest_event_of_host(self
, hostname
):
3569 for event
in self
._events
:
3570 if event
["host"] == hostname
:
3571 self
.remove_event(event
)
3574 # protected by self.lock
3575 def get_num_existing_events_by(self
, ty
, event
):
3577 return self
.num_existing_events
3578 elif ty
== "by_rule":
3579 return self
.num_existing_events_by_rule
.get(event
["rule_id"], 0)
3580 elif ty
== "by_host":
3581 return self
.num_existing_events_by_host
.get(event
["host"], 0)
3583 raise NotImplementedError()
3585 # Cancel all events the belong to a certain rule id and are
3586 # of the same "breed" as a new event.
3587 def cancel_events(self
, event_server
, event_columns
, new_event
, match_groups
, rule
):
3590 for nr
, event
in enumerate(self
._events
):
3591 if event
["rule_id"] == rule
["id"]:
3592 if self
.cancelling_match(match_groups
, new_event
, event
, rule
):
3593 # Fill a few fields of the cancelled event with data from
3594 # the cancelling event so that action scripts have useful
3595 # values and the logfile entry if more relevant.
3596 previous_phase
= event
["phase"]
3597 event
["phase"] = "closed"
3598 # TODO: Why do we use OK below and not new_event["state"]???
3599 event
["state"] = 0 # OK
3600 event
["text"] = new_event
["text"]
3601 # TODO: This is a hack and partial copy-n-paste from rewrite_events...
3602 if "set_text" in rule
:
3603 event
["text"] = replace_groups(rule
["set_text"], event
["text"],
3605 event
["time"] = new_event
["time"]
3606 event
["last"] = new_event
["time"]
3607 event
["priority"] = new_event
["priority"]
3608 self
._history
.add(event
, "CANCELLED")
3609 actions
= rule
.get("cancel_actions", [])
3611 if previous_phase
!= "open" \
3612 and rule
.get("cancel_action_phases", "always") == "open":
3614 "Do not execute cancelling actions, event %s's phase "
3615 "is not 'open' but '%s'" % (event
["id"], previous_phase
))
3617 cmk
.ec
.actions
.do_event_actions(
3628 to_delete
.append(nr
)
3630 for nr
in to_delete
[::-1]:
3631 self
._remove
_event
_by
_nr
(nr
)
3633 def cancelling_match(self
, match_groups
, new_event
, event
, rule
):
3634 debug
= self
._config
["debug_rules"]
3636 # The match_groups of the canceling match only contain the *_ok match groups
3637 # Since the rewrite definitions are based on the positive match, we need to
3638 # create some missing keys. O.o
3639 for key
in match_groups
.keys():
3640 if key
.endswith("_ok"):
3641 match_groups
[key
[:-3]] = match_groups
[key
]
3643 # Note: before we compare host and application we need to
3644 # apply the rewrite rules to the event. Because if in the previous
3645 # the hostname was rewritten, it wouldn't match anymore here.
3646 host
= new_event
["host"]
3647 if "set_host" in rule
:
3648 host
= replace_groups(rule
["set_host"], host
, match_groups
)
3650 if event
["host"] != host
:
3652 self
._logger
.info("Do not cancel event %d: host is not the same (%s != %s)" %
3653 (event
["id"], event
["host"], host
))
3656 # The same for the application. But in case there is cancelling based on the application
3657 # configured in the rule, then don't check for different applications.
3658 if "cancel_application" not in rule
:
3659 application
= new_event
["application"]
3660 if "set_application" in rule
:
3661 application
= replace_groups(rule
["set_application"], application
, match_groups
)
3662 if event
["application"] != application
:
3665 "Do not cancel event %d: application is not the same (%s != %s)" %
3666 (event
["id"], event
["application"], application
))
3669 if event
["facility"] != new_event
["facility"]:
3672 "Do not cancel event %d: syslog facility is not the same (%d != %d)" %
3673 (event
["id"], event
["facility"], new_event
["facility"]))
3675 # Make sure, that the matching groups are the same. If the OK match
3676 # has less groups, we do not care. If it has more groups, then we
3677 # do not care either. We just compare the common "prefix".
3678 for nr
, (prev_group
, cur_group
) in enumerate(
3679 zip(event
["match_groups"], match_groups
.get("match_groups_message_ok", ()))):
3680 if prev_group
!= cur_group
:
3682 self
._logger
.info("Do not cancel event %d: match group number "
3683 "%d does not match (%s != %s)" % (event
["id"], nr
+ 1,
3684 prev_group
, cur_group
))
3687 # Note: Duplicated code right above
3688 # Make sure, that the syslog_application matching groups are the same. If the OK match
3689 # has less groups, we do not care. If it has more groups, then we
3690 # do not care either. We just compare the common "prefix".
3691 for nr
, (prev_group
, cur_group
) in enumerate(
3693 event
.get("match_groups_syslog_application", ()),
3694 match_groups
.get("match_groups_syslog_application_ok", ()))):
3695 if prev_group
!= cur_group
:
3698 "Do not cancel event %d: syslog application match group number "
3699 "%d does not match (%s != %s)" % (event
["id"], nr
+ 1, prev_group
,
3705 def count_rule_match(self
, rule_id
):
3707 self
._rule
_stats
.setdefault(rule_id
, 0)
3708 self
._rule
_stats
[rule_id
] += 1
3710 def count_event_up(self
, found
, event
):
3711 # Update event with new information from new occurrance,
3712 # but preserve certain attributes from the original (first)
3715 "count": found
.get("count", 1) + 1,
3716 "first": found
["first"],
3718 # When event is already active then do not change
3719 # comment or contact information anymore
3720 if found
["phase"] == "open":
3721 if "comment" in found
:
3722 preserve
["comment"] = found
["comment"]
3723 if "contact" in found
:
3724 preserve
["contact"] = found
["contact"]
3726 found
.update(preserve
)
3728 def count_expected_event(self
, event_server
, event
):
3729 for ev
in self
._events
:
3730 if ev
["rule_id"] == event
["rule_id"] and ev
["phase"] == "counting":
3731 self
.count_event_up(ev
, event
)
3734 # None found, create one
3736 event
["phase"] = "counting"
3737 event_server
.new_event_respecting_limits(event
)
3739 def count_event(self
, event_server
, event
, rule
, count
):
3740 # Find previous occurrance of this event and acount for
3741 # one new occurrance. In case of negated count (expecting rules)
3742 # we do never modify events that are already in the state "open"
3743 # since the event has been created because the count was too
3744 # low in the specified period of time.
3745 for ev
in self
._events
:
3746 if ev
["rule_id"] == event
["rule_id"]:
3747 if ev
["phase"] == "ack" and not count
["count_ack"]:
3748 continue # skip acknowledged events
3750 if count
["separate_host"] and ev
["host"] != event
["host"]:
3751 continue # treat events with separated hosts separately
3753 if count
["separate_application"] and ev
["application"] != event
["application"]:
3754 continue # same for application
3756 if count
["separate_match_groups"] and ev
["match_groups"] != event
["match_groups"]:
3759 if count
.get("count_duration"
3760 ) is not None and ev
["first"] + count
["count_duration"] < event
["time"]:
3761 # Counting has been discontinued on this event after a certain time
3764 if ev
["host_in_downtime"] != event
["host_in_downtime"]:
3765 continue # treat events with different downtime states separately
3768 self
.count_event_up(found
, event
)
3772 event
["phase"] = "counting"
3773 event_server
.new_event_respecting_limits(event
)
3776 # Did we just count the event that was just one too much?
3777 if found
["phase"] == "counting" and found
["count"] >= count
["count"]:
3778 found
["phase"] = "open"
3779 return found
# do event action, return found copy of event
3780 return False # do not do event action
3782 # locked with self.lock
3783 def delete_event(self
, event_id
, user
):
3784 for nr
, event
in enumerate(self
._events
):
3785 if event
["id"] == event_id
:
3786 event
["phase"] = "closed"
3788 event
["owner"] = user
3789 self
._history
.add(event
, "DELETE", user
)
3790 self
._remove
_event
_by
_nr
(nr
)
3792 raise MKClientError("No event with id %s" % event_id
)
3794 def get_events(self
):
3797 def get_rule_stats(self
):
3798 return sorted(self
._rule
_stats
.iteritems(), key
=lambda x
: x
[0])
3802 # .--Replication---------------------------------------------------------.
3804 # | | _ \ ___ _ __ | (_) ___ __ _| |_(_) ___ _ __ |
3805 # | | |_) / _ \ '_ \| | |/ __/ _` | __| |/ _ \| '_ \ |
3806 # | | _ < __/ |_) | | | (_| (_| | |_| | (_) | | | | |
3807 # | |_| \_\___| .__/|_|_|\___\__,_|\__|_|\___/|_| |_| |
3809 # +----------------------------------------------------------------------+
3810 # | Functions for doing replication, master and slave parts. |
3811 # '----------------------------------------------------------------------'
def is_replication_slave(config):
    """Return a truthy value iff replication is configured and not disabled."""
    repl_settings = config["replication"]
    return repl_settings and not repl_settings.get("disabled")
3819 def replication_allow_command(config
, command
, slave_status
):
3820 if is_replication_slave(config
) and slave_status
["mode"] == "sync" \
3821 and command
in ["DELETE", "UPDATE", "CHANGESTATE", "ACTION"]:
3822 raise MKClientError("This command is not allowed on a replication slave "
3823 "while it is in sync mode.")
def replication_send(config, lock_configuration, event_status, last_update):
    """Build the reply for a slave's REPLICATE request.

    Always includes the packed event status; rules/actions are only included
    when the slave's last update predates our last config reload.
    """
    response = {}
    with lock_configuration:
        response["status"] = event_status.pack_status()
        if last_update < config["last_reload"]:
            response["rules"] = config[
                "rules"]  # Remove one bright day, where legacy rules are not needed anymore
            response["rule_packs"] = config["rule_packs"]
            response["actions"] = config["actions"]
        return response
def replication_pull(settings, config, lock_configuration, perfcounters, event_status, event_server,
                     slave_status, logger):
    """Pull the current state from the replication master.

    We distinguish two modes:
    1. slave mode: just pull the current state from the master.
       if the master is not reachable then decide whether to
       switch to takeover mode.
    2. takeover mode: if automatic fallback is enabled and the
       time frame for that has not yet ellapsed, then try to
       pull the current state from the master. If that is successful
       then switch back to slave mode. If not automatic fallback
       is enabled then simply do nothing.
    """
    now = time.time()
    repl_settings = config["replication"]
    mode = slave_status["mode"]
    need_sync = mode == "sync" or (
        mode == "takeover" and "fallback" in repl_settings and
        (slave_status["last_master_down"] is None or
         now - repl_settings["fallback"] < slave_status["last_master_down"]))

    if need_sync:
        with event_status.lock:
            with lock_configuration:
                try:
                    new_state = get_state_from_master(config, slave_status)
                    replication_update_state(settings, config, event_status, event_server,
                                             new_state)
                    if repl_settings.get("logging"):
                        logger.info("Successfully synchronized with master")
                    slave_status["last_sync"] = now
                    slave_status["success"] = True

                    # Fall back to slave mode after successful sync
                    # (time frame has already been checked)
                    if mode == "takeover":
                        if slave_status["last_master_down"] is None:
                            logger.info("Replication: master reachable for the first time, "
                                        "switching back to slave mode")
                            slave_status["mode"] = "sync"
                        else:
                            logger.info("Replication: master reachable again after %d seconds, "
                                        "switching back to sync mode" %
                                        (now - slave_status["last_master_down"]))
                            slave_status["mode"] = "sync"
                    slave_status["last_master_down"] = None

                except Exception as e:
                    logger.warning("Replication: cannot sync with master: %s" % e)
                    slave_status["success"] = False
                    if slave_status["last_master_down"] is None:
                        slave_status["last_master_down"] = now

                    # Takeover
                    if "takeover" in repl_settings and mode != "takeover":
                        if not slave_status["last_sync"]:
                            if repl_settings.get("logging"):
                                logger.error(
                                    "Replication: no takeover since master was never reached.")
                        else:
                            offline = now - slave_status["last_sync"]
                            if offline < repl_settings["takeover"]:
                                if repl_settings.get("logging"):
                                    logger.warning(
                                        "Replication: no takeover yet, still %d seconds to wait" %
                                        (repl_settings["takeover"] - offline))
                            else:
                                logger.info(
                                    "Replication: master not reached for %d seconds, taking over!" %
                                    offline)
                                slave_status["mode"] = "takeover"

                save_slave_status(settings, slave_status)

                # Compute statistics of the average time needed for a sync
                perfcounters.count_time("sync", time.time() - now)
def replication_update_state(settings, config, event_status, event_server, new_state):
    """Apply a state dict pulled from the master to this slave."""
    # Keep a copy of the masters' rules and actions and also prepare using them
    if "rules" in new_state:
        save_master_config(settings, new_state)
        event_server.compile_rules(new_state["rules"], new_state.get("rule_packs", []))
        config["actions"] = new_state["actions"]

    # Update to the masters' event state
    event_status.unpack_status(new_state["status"])
def save_master_config(settings, new_state):
    """Atomically persist the master's rules/actions for later offline use."""
    path = settings.paths.master_config_file.value
    path_new = path.parent / (path.name + '.new')
    path_new.write_bytes(
        repr({
            "rules": new_state["rules"],
            "rule_packs": new_state["rule_packs"],
            "actions": new_state["actions"],
        }) + "\n")
    path_new.rename(path)
def load_master_config(settings, config, logger):
    """Restore rules/actions previously saved from the master, if available."""
    path = settings.paths.master_config_file.value
    try:
        master_config = ast.literal_eval(path.read_bytes())
        config["rules"] = master_config["rules"]
        config["rule_packs"] = master_config.get("rule_packs", [])
        config["actions"] = master_config["actions"]
        logger.info("Replication: restored %d rule packs and %d actions from %s" % (len(
            config["rule_packs"]), len(config["actions"]), path))
    except Exception:
        if is_replication_slave(config):
            logger.error("Replication: no previously saved master state available")
def get_state_from_master(config, slave_status):
    """Connect to the master and fetch its replication state dict."""
    repl_settings = config["replication"]
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(repl_settings["connect_timeout"])
        sock.connect(repl_settings["master"])
        sock.sendall(
            "REPLICATE %d\n" % (slave_status["last_sync"] and slave_status["last_sync"] or 0))
        sock.shutdown(socket.SHUT_WR)

        response_text = ""
        while True:
            chunk = sock.recv(8192)
            response_text += chunk
            if not chunk:
                break

        return ast.literal_eval(response_text)
    except SyntaxError as e:
        raise Exception("Invalid response from event daemon: <pre>%s</pre>" % response_text)

    except IOError as e:
        raise Exception("Master not responding: %s" % e)

    except Exception as e:
        raise Exception("Cannot connect to event daemon: %s" % e)
def save_slave_status(settings, slave_status):
    """Persist the replication slave status dict to its status file."""
    status_path = settings.paths.slave_status_file.value
    status_path.write_bytes(repr(slave_status) + "\n")
def default_slave_status_master():
    """Return the initial slave status for a site acting as master."""
    return dict(
        last_sync=0,
        last_master_down=None,
        mode="master",
        average_sync_time=None,
    )
def default_slave_status_sync():
    """Return the initial slave status for a site syncing from a master."""
    return dict(
        last_sync=0,
        last_master_down=None,
        mode="sync",
        average_sync_time=None,
    )
def update_slave_status(slave_status, settings, config):
    """Bring ``slave_status`` in line with the current replication role.

    Slaves restore their persisted status from the status file, falling
    back to fresh sync defaults (which are then persisted). Masters get
    the default master status; a stale status file is removed.
    Mutates ``slave_status`` in place.
    """
    path = settings.paths.slave_status_file.value
    if is_replication_slave(config):
        # NOTE(review): try/except scaffolding reconstructed from dropped
        # lines — confirm against upstream.
        try:
            slave_status.update(ast.literal_eval(path.read_bytes()))
        except Exception:
            # Missing or unreadable status file: start from defaults
            # and persist them immediately.
            slave_status.update(default_slave_status_sync())
            save_slave_status(settings, slave_status)
    else:
        # Not (or no longer) a slave: drop any stale slave status file.
        if path.exists():
            path.unlink()
        slave_status.update(default_slave_status_master())
4018 # .--Configuration-------------------------------------------------------.
4020 # | / ___|___ _ __ / _(_) __ _ _ _ _ __ __ _| |_(_) ___ _ __ |
4021 # | | | / _ \| '_ \| |_| |/ _` | | | | '__/ _` | __| |/ _ \| '_ \ |
4022 # | | |__| (_) | | | | _| | (_| | |_| | | | (_| | |_| | (_) | | | | |
4023 # | \____\___/|_| |_|_| |_|\__, |\__,_|_| \__,_|\__|_|\___/|_| |_| |
4025 # +----------------------------------------------------------------------+
4026 # | Loading of the configuration files |
4027 # '----------------------------------------------------------------------'
def load_configuration(settings, logger, slave_status):
    """Load the event console configuration and apply its side effects.

    Side effects: sets log levels from the config (unless -v was given
    on the command line), updates the replication slave status, pulls in
    the saved master state when running as a replication slave, and
    builds the by-id action lookup table. Returns the config dict.
    """
    config = cmk.ec.export.load_config(settings)

    # If not set by command line, set the log level by configuration
    if settings.options.verbosity == 0:
        levels = config["log_level"]
        logger.setLevel(levels["cmk.mkeventd"])
        logger.getChild("EventServer").setLevel(levels["cmk.mkeventd.EventServer"])
        # The snmp sub-logger level is optional in older configurations.
        if "cmk.mkeventd.EventServer.snmp" in levels:
            logger.getChild("EventServer.snmp").setLevel(levels["cmk.mkeventd.EventServer.snmp"])
        logger.getChild("EventStatus").setLevel(levels["cmk.mkeventd.EventStatus"])
        logger.getChild("StatusServer").setLevel(levels["cmk.mkeventd.StatusServer"])
        logger.getChild("lock").setLevel(levels["cmk.mkeventd.lock"])

    # Are we a replication slave? Parts of the configuration
    # will be overridden by values from the master.
    update_slave_status(slave_status, settings, config)
    if is_replication_slave(config):
        logger.info("Replication: slave configuration, current mode: %s" % slave_status["mode"])
        load_master_config(settings, config, logger)

    # Create dictionary for actions for easy access
    config["action"] = {}
    for action in config["actions"]:
        config["action"][action["id"]] = action

    config["last_reload"] = time.time()

    # NOTE(review): return statement reconstructed from a dropped line —
    # callers assign the result (e.g. reload_configuration); confirm.
    return config
def reload_configuration(settings, logger, lock_configuration, history, event_status, event_server,
                         status_server, slave_status):
    """Reload the configuration and push it to all daemon components.

    History and event server are updated while holding the configuration
    lock; event status and status server are updated afterwards.
    """
    with lock_configuration:
        new_config = load_configuration(settings, logger, slave_status)
        for component in (history, event_server):
            component.reload_configuration(new_config)

    for component in (event_status, status_server):
        component.reload_configuration(new_config)
    logger.info("Reloaded configuration.")
4074 # .--Main----------------------------------------------------------------.
4076 # | | \/ | __ _(_)_ __ |
4077 # | | |\/| |/ _` | | '_ \ |
4078 # | | | | | (_| | | | | | |
4079 # | |_| |_|\__,_|_|_| |_| |
4081 # +----------------------------------------------------------------------+
4082 # | Main entry and option parsing |
4083 # '----------------------------------------------------------------------'
# NOTE(review): the "def main():" header was dropped by the extraction;
# reconstructed here from the surrounding entry-point code.
def main():
    """Entry point of the event daemon.

    Loads the configuration, constructs all components (history, event
    status/server, status server), daemonizes unless running in the
    foreground, installs signal handlers, runs the main loop and cleans
    up pipe/sockets/PID file on shutdown.
    """
    logger = cmk.utils.log.get_logger("mkeventd")
    settings = cmk.ec.settings.settings(cmk.__version__, pathlib.Path(cmk.utils.paths.omd_root),
                                        pathlib.Path(cmk.utils.paths.default_config_dir), sys.argv)

    pid_path = None
    try:
        cmk.utils.log.open_log(sys.stderr)
        cmk.utils.log.set_verbosity(settings.options.verbosity)

        settings.paths.log_file.value.parent.mkdir(parents=True, exist_ok=True)
        if not settings.options.foreground:
            # Daemon mode: switch logging from stderr to the log file.
            cmk.utils.log.open_log(str(settings.paths.log_file.value))

        logger.info("-" * 65)
        logger.info("mkeventd version %s starting" % cmk.__version__)

        slave_status = default_slave_status_master()
        config = load_configuration(settings, logger, slave_status)
        history = cmk.ec.history.History(settings, config, logger, StatusTableEvents.columns,
                                         StatusTableHistory.columns)

        # Refuse to start while another instance runs; clean up stale PID files.
        pid_path = settings.paths.pid_file.value
        if pid_path.exists():
            old_pid = int(pid_path.read_text(encoding='utf-8'))
            if process_exists(old_pid):
                bail_out(
                    logger, "Old PID file %s still existing and mkeventd still running with PID %d."
                    % (pid_path, old_pid))
            pid_path.unlink()
            logger.info("Removed orphaned PID file %s (process %d not running anymore)." %
                        (pid_path, old_pid))

        # Make sure paths exist
        settings.paths.event_pipe.value.parent.mkdir(parents=True, exist_ok=True)
        settings.paths.status_file.value.parent.mkdir(parents=True, exist_ok=True)

        # First do all things that might fail, before daemonizing
        perfcounters = Perfcounters(logger.getChild("lock.perfcounters"))
        event_status = EventStatus(settings, config, perfcounters, history,
                                   logger.getChild("EventStatus"))
        lock_configuration = ECLock(logger.getChild("lock.configuration"))
        event_server = EventServer(
            logger.getChild("EventServer"), settings, config, slave_status, perfcounters,
            lock_configuration, history, event_status, StatusTableEvents.columns)
        terminate_main_event = threading.Event()
        status_server = StatusServer(
            logger.getChild("StatusServer"), settings, config, slave_status, perfcounters,
            lock_configuration, history, event_status, event_server, terminate_main_event)

        event_status.load_status(event_server)
        event_server.compile_rules(config["rules"], config["rule_packs"])

        if not settings.options.foreground:
            pid_path.parent.mkdir(parents=True, exist_ok=True)
            cmk.utils.daemon.daemonize()
            logger.info("Daemonized with PID %d." % os.getpid())

        cmk.utils.daemon.lock_with_pid_file(str(pid_path))

        # Install signal handler: signals are translated into
        # MKSignalException so the main loop can unwind through the
        # cleanup code below instead of dying abruptly.
        def signal_handler(signum, stack_frame):
            logger.verbose("Got signal %d." % signum)
            raise MKSignalException(signum)

        signal.signal(signal.SIGHUP, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGQUIT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        # Now run the main event loop until we are told to terminate.
        run_eventd(terminate_main_event, settings, config, lock_configuration, history,
                   perfcounters, event_status, event_server, status_server, slave_status, logger)

        # We reach this point, if the server has been killed by
        # a signal or hitting Ctrl-C (in foreground mode)

        # TODO: Move this cleanup stuff to the classes that are responsible for these resources

        # Remove event pipe and drain it, so that we make sure
        # that processes (syslog, etc) will not hang when trying
        # to write into the pipe.
        logger.verbose("Cleaning up event pipe")
        pipe = event_server.open_pipe()  # Open it
        settings.paths.event_pipe.value.unlink()  # Remove pipe
        drain_pipe(pipe)  # Drain any data
        os.close(pipe)  # Close pipe

        logger.verbose("Saving final event state")
        event_status.save_status()

        logger.verbose("Cleaning up sockets")
        settings.paths.unix_socket.value.unlink()
        settings.paths.event_socket.value.unlink()

        logger.verbose("Output hash stats")
        event_server.output_hash_stats()

        logger.verbose("Closing fds which might be still open")
        for fd in [
                settings.options.syslog_udp, settings.options.syslog_tcp,
                settings.options.snmptrap_udp
        ]:
            try:
                # Only descriptors inherited from the parent process are
                # wrapped in FileDescriptor; plain port numbers must not
                # be closed.
                if isinstance(fd, cmk.ec.settings.FileDescriptor):
                    os.close(fd.value)
            except Exception:
                pass  # best effort: the fd may already be closed

        logger.info("Successfully shut down.")
    # NOTE(review): the exception/finally scaffolding below was dropped by
    # the extraction and is reconstructed — confirm against upstream.
    except MKSignalException:
        pass  # regular shutdown triggered by a signal
    except Exception:
        if settings.options.debug:
            raise
        # In non-debug mode log the traceback and exit with an error.
        bail_out(logger, traceback.format_exc())
    finally:
        # Remove our PID file, but only if we still hold the lock on it.
        if pid_path and cmk.utils.store.have_lock(str(pid_path)):
            pid_path.unlink()
            cmk.utils.store.release_lock(str(pid_path))
4212 if __name__
== "__main__":