[check_mk.git] / cmk / ec / history.py
#!/usr/bin/env python
# -*- encoding: utf-8; py-indent-offset: 4 -*-
# +------------------------------------------------------------------+
# |             ____ _               _        __  __ _  __           |
# |            / ___| |__   ___  ___| | __   |  \/  | |/ /           |
# |           | |   | '_ \ / _ \/ __| |/ /   | |\/| | ' /            |
# |           | |___| | | |  __/ (__|   <    | |  | | . \            |
# |            \____|_| |_|\___|\___|_|\_\___|_|  |_|_|\_\           |
# |                                                                  |
# | Copyright Mathias Kettner 2018             mk@mathias-kettner.de |
# +------------------------------------------------------------------+
#
# This file is part of Check_MK.
# The official homepage is at http://mathias-kettner.de/check_mk.
#
# check_mk is free software;  you can redistribute it and/or modify it
# under the  terms of the  GNU General Public License  as published by
# the Free Software Foundation in version 2.  check_mk is  distributed
# in the hope that it will be useful, but WITHOUT ANY WARRANTY;  with-
# out even the implied warranty of  MERCHANTABILITY  or  FITNESS FOR A
# PARTICULAR PURPOSE. See the  GNU General Public License for more de-
# tails. You should have  received  a copy of the  GNU  General Public
# License along with GNU Make; see the file  COPYING.  If  not,  write
# to the Free Software Foundation, Inc., 51 Franklin St,  Fifth Floor,
# Boston, MA 02110-1301 USA.

import os
import string
import subprocess
import threading
import time

import six

import cmk.ec.actions
import cmk.utils.render

# TODO: As one can see clearly below, we should really have a class hierarchy here...


class History(object):
    def __init__(self, settings, config, logger, event_columns, history_columns):
        super(History, self).__init__()
        self._settings = settings
        self._config = config
        self._logger = logger
        self._event_columns = event_columns
        self._history_columns = history_columns
        self._lock = threading.Lock()
        self._mongodb = MongoDB()
        self._active_history_period = ActiveHistoryPeriod()
        self.reload_configuration(config)

    def reload_configuration(self, config):
        self._config = config
        if self._config['archive_mode'] == 'mongodb':
            _reload_configuration_mongodb(self)
        else:
            _reload_configuration_files(self)

    def flush(self):
        if self._config['archive_mode'] == 'mongodb':
            _flush_mongodb(self)
        else:
            _flush_files(self)

    def add(self, event, what, who="", addinfo=""):
        if self._config['archive_mode'] == 'mongodb':
            _add_mongodb(self, event, what, who, addinfo)
        else:
            _add_files(self, event, what, who, addinfo)

    def get(self, query):
        if self._config['archive_mode'] == 'mongodb':
            return _get_mongodb(self, query)
        return _get_files(self, query)

    def housekeeping(self):
        if self._config['archive_mode'] == 'mongodb':
            _housekeeping_mongodb(self)
        else:
            _housekeeping_files(self)
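
# A minimal usage sketch (illustration only -- settings, config, logger and
# the column definitions are assumptions; at runtime the Event Console
# provides them):
#
#   history = History(settings, config, logger, event_columns, history_columns)
#   history.add(event, "NEW")        # archive a newly created event
#   entries = history.get(query)     # query the archive
#   history.housekeeping()           # expire outdated entries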


#.
#   .--MongoDB-------------------------------------------------------------.
#   |             __  __                         ____  ____                 |
#   |            |  \/  | ___  _ __   __ _  ___ |  _ \| __ )                |
#   |            | |\/| |/ _ \| '_ \ / _` |/ _ \| | | |  _ \                |
#   |            | |  | | (_) | | | | (_| | (_) | |_| | |_) |               |
#   |            |_|  |_|\___/|_| |_|\__, |\___/|____/|____/                |
#   |                                |___/                                  |
#   +----------------------------------------------------------------------+
#   | The Event Log Archive can be stored in a MongoDB instead of files,   |
#   | this section contains MongoDB related code.                          |
#   '----------------------------------------------------------------------'

try:
    from pymongo.connection import Connection
    from pymongo import DESCENDING
    from pymongo.errors import OperationFailure
    import datetime
except ImportError:
    Connection = None


class MongoDB(object):
    def __init__(self):
        super(MongoDB, self).__init__()
        self.connection = None
        self.db = None


def _reload_configuration_mongodb(history):
    # Configure the auto deleting indexes in the DB
    _update_mongodb_indexes(history._settings, history._mongodb)
    _update_mongodb_history_lifetime(history._settings, history._config, history._mongodb)


def _housekeeping_mongodb(history):
    pass


def _connect_mongodb(settings, mongodb):
    if Connection is None:
        raise Exception('Could not initialize MongoDB (Python modules are missing)')
    mongodb.connection = Connection(*_mongodb_local_connection_opts(settings))
    mongodb.db = mongodb.connection[os.environ['OMD_SITE']]


def _mongodb_local_connection_opts(settings):
    ip, port = None, None
    with settings.paths.mongodb_config_file.value.open(encoding='utf-8') as f:
        for line in f:
            if line.startswith('bind_ip'):
                ip = line.split('=')[1].strip()
            elif line.startswith('port'):
                port = int(line.split('=')[1].strip())
    return ip, port


def _flush_mongodb(history):
    history._mongodb.db.ec_archive.drop()


def _get_mongodb_max_history_age(mongodb):
    result = mongodb.db.ec_archive.index_information()
    if 'dt_-1' not in result or 'expireAfterSeconds' not in result['dt_-1']:
        return -1
    return result['dt_-1']['expireAfterSeconds']


def _update_mongodb_indexes(settings, mongodb):
    if not mongodb.connection:
        _connect_mongodb(settings, mongodb)
    result = mongodb.db.ec_archive.index_information()

    if 'time_-1' not in result:
        mongodb.db.ec_archive.ensure_index([('time', DESCENDING)])


def _update_mongodb_history_lifetime(settings, config, mongodb):
    if not mongodb.connection:
        _connect_mongodb(settings, mongodb)

    if _get_mongodb_max_history_age(mongodb) == config['history_lifetime'] * 86400:
        return  # do not update an already correct index

    try:
        mongodb.db.ec_archive.drop_index("dt_-1")
    except OperationFailure:
        pass  # ignore a non-existing index

    # Delete messages after x days
    mongodb.db.ec_archive.ensure_index([('dt', DESCENDING)],
                                       expireAfterSeconds=config['history_lifetime'] * 86400,
                                       unique=False)


def _mongodb_next_id(mongodb, name, first_id=0):
    ret = mongodb.db.counters.find_and_modify(
        query={'_id': name}, update={'$inc': {
            'seq': 1
        }}, new=True)

    if not ret:
        # Initialize the counter!
        mongodb.db.counters.insert({'_id': name, 'seq': first_id})
        return first_id
    return ret['seq']
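
# For illustration (not part of the original code): successive calls like
# _mongodb_next_id(mongodb, 'ec_archive_id') yield 0, 1, 2, ... -- the first
# call creates the counter document with first_id, later calls atomically
# increment its 'seq' field via find_and_modify.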


def _add_mongodb(history, event, what, who, addinfo):
    _log_event(history._config, history._logger, event, what, who, addinfo)
    if not history._mongodb.connection:
        _connect_mongodb(history._settings, history._mongodb)
    # We converted _id to be an auto incrementing integer. This makes the unique
    # index compatible with history_line of the file archive (which is handled
    # as an integer) within mkeventd. It might be better to use the ObjectId()
    # of MongoDB, but for the first step we use the integer index for simplicity.
    now = time.time()
    history._mongodb.db.ec_archive.insert({
        '_id': _mongodb_next_id(history._mongodb, 'ec_archive_id'),
        'dt': datetime.datetime.fromtimestamp(now),
        'time': now,
        'event': event,
        'what': what,
        'who': who,
        'addinfo': addinfo,
    })


def _log_event(config, logger, event, what, who, addinfo):
    if config['debug_rules']:
        logger.info("Event %d: %s/%s/%s - %s" % (event["id"], what, who, addinfo, event["text"]))


def _get_mongodb(history, query):
    filters, limit = query.filters, query.limit

    history_entries = []

    if not history._mongodb.connection:
        _connect_mongodb(history._settings, history._mongodb)

    # Construct the MongoDB filtering specification. We could fetch all
    # information and filter on the Python side, but that would be way too
    # inefficient.
    query = {}
    for column_name, operator_name, _predicate, argument in filters:

        if operator_name == '=':
            mongo_filter = argument
        elif operator_name == '>':
            mongo_filter = {'$gt': argument}
        elif operator_name == '<':
            mongo_filter = {'$lt': argument}
        elif operator_name == '>=':
            mongo_filter = {'$gte': argument}
        elif operator_name == '<=':
            mongo_filter = {'$lte': argument}
        elif operator_name == '~':  # case sensitive regex, find pattern in string
            mongo_filter = {'$regex': argument, '$options': ''}
        elif operator_name == '=~':  # case insensitive, match whole string
            mongo_filter = {'$regex': argument, '$options': 'mi'}
        elif operator_name == '~~':  # case insensitive regex, find pattern in string
            mongo_filter = {'$regex': argument, '$options': 'i'}
        elif operator_name == 'in':
            mongo_filter = {'$in': argument}
        else:
            raise Exception(
                'Filter operator of filter %s not implemented for MongoDB archive' % column_name)

        if column_name[:6] == 'event_':
            query['event.' + column_name[6:]] = mongo_filter
        elif column_name[:8] == 'history_':
            key = column_name[8:]
            if key == 'line':
                key = '_id'
            query[key] = mongo_filter
        else:
            raise Exception('Filter %s not implemented for MongoDB' % column_name)

    result = history._mongodb.db.ec_archive.find(query).sort('time', -1)

    # Might be used for debugging / profiling
    #file(cmk.utils.paths.omd_root + '/var/log/check_mk/ec_history_debug.log', 'a').write(
    #    pprint.pformat(filters) + '\n' + pprint.pformat(result.explain()) + '\n')

    if limit:
        result = result.limit(limit + 1)

    # Now convert the MongoDB data structure to the eventd internal one
    for entry in result:
        item = [
            entry['_id'],
            entry['time'],
            entry['what'],
            entry['who'],
            entry['addinfo'],
        ]
        for colname, defval in history._event_columns:
            key = colname[6:]  # drop "event_"
            item.append(entry['event'].get(key, defval))
        history_entries.append(item)

    return history_entries
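
# For illustration (hypothetical filter values, not from the original code):
# the filters [('event_host', '=', ..., 'srv01'),
# ('history_time', '>=', ..., 1500000000)] are translated above into the
# MongoDB query specification {'event.host': 'srv01', 'time': {'$gte': 1500000000}}.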


#.
#   .--History-------------------------------------------------------------.
#   |                   _   _ _     _                                       |
#   |                  | | | (_)___| |_ ___  _ __ _   _                     |
#   |                  | |_| | / __| __/ _ \| '__| | | |                    |
#   |                  |  _  | \__ \ || (_) | |  | |_| |                    |
#   |                  |_| |_|_|___/\__\___/|_|   \__, |                    |
#   |                                             |___/                     |
#   +----------------------------------------------------------------------+
#   | Functions for logging the history of events                          |
#   '----------------------------------------------------------------------'


def _reload_configuration_files(history):
    pass


def _flush_files(history):
    _expire_logfiles(history._settings, history._config, history._logger, history._lock, True)


def _housekeeping_files(history):
    _expire_logfiles(history._settings, history._config, history._logger, history._lock, False)


# Make a new entry in the event history. Each entry is a tab-separated line
# with the following columns:
# 0: time of log entry
# 1: type of entry (keyword)
# 2: user who initiated the action (for GUI actions)
# 3: additional information about the action
# 4-oo: StatusTableEvents.columns
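#
# For illustration only (the values are made up): a line for an event that
# was not triggered via the GUI could start with
#
#   1529486402.25<TAB>NEW<TAB><TAB><TAB>...
#
# i.e. empty "who" and "addinfo" columns, followed by the event columns.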
def _add_files(history, event, what, who, addinfo):
    _log_event(history._config, history._logger, event, what, who, addinfo)
    with history._lock:
        columns = [str(time.time()), scrub_string(what), scrub_string(who), scrub_string(addinfo)]
        columns += [
            quote_tab(event.get(colname[6:], defval))  # drop "event_"
            for colname, defval in history._event_columns
        ]

        with get_logfile(history._config, history._settings.paths.history_dir.value,
                         history._active_history_period).open(mode='ab') as f:
            f.write("\t".join(map(cmk.ec.actions.to_utf8, columns)) + "\n")


def quote_tab(col):
    ty = type(col)
    if ty in [float, int]:
        return str(col)
    elif ty is bool:
        return '1' if col else '0'
    elif ty in [tuple, list]:
        col = "\1" + "\1".join([quote_tab(e) for e in col])
    elif col is None:
        col = "\2"
    elif ty is unicode:
        col = col.encode("utf-8")

    return col.replace("\t", " ")
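
# For illustration: quote_tab(True) == '1', quote_tab(None) == '\2' and
# quote_tab(('a', 'b')) == '\1a\1b'; any embedded tab is replaced by a space
# so that it cannot break the tab-separated format.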


class ActiveHistoryPeriod(object):
    def __init__(self):
        super(ActiveHistoryPeriod, self).__init__()
        self.value = None


# Return the path of the currently active log file, taking the configured
# history period and already existing newer files into account.
def get_logfile(config, log_dir, active_history_period):
    log_dir.mkdir(parents=True, exist_ok=True)
    # Log into a file starting at the current history period,
    # but: if a newer logfile exists, use that one. This
    # can happen if you switch the period from daily to
    # weekly.
    timestamp = _current_history_period(config)

    # Log period has changed or we have not computed a filename yet ->
    # compute the currently active period
    if active_history_period.value is None or timestamp > active_history_period.value:

        # Look if newer files exist
        timestamps = sorted(int(str(path.name)[:-4]) for path in log_dir.glob('*.log'))
        if len(timestamps) > 0:
            timestamp = max(timestamps[-1], timestamp)

        active_history_period.value = timestamp

    return log_dir / ("%d.log" % timestamp)


# Return the timestamp of the beginning of the current history period.
def _current_history_period(config):
    now_broken = list(time.localtime())
    now_broken[3:6] = [0, 0, 0]  # set clock to 00:00:00
    now_ts = time.mktime(now_broken)  # convert to timestamp
    if config["history_rotation"] == "weekly":
        now_ts -= now_broken[6] * 86400  # shift back to Monday
    return int(now_ts)
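
# For illustration: with "daily" rotation a call on Wednesday 2018-06-20 at
# 14:30 local time returns the timestamp of Wed 2018-06-20 00:00:00; with
# "weekly" rotation it returns the timestamp of Mon 2018-06-18 00:00:00.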


# Delete old log files
def _expire_logfiles(settings, config, logger, lock_history, flush):
    with lock_history:
        try:
            days = config["history_lifetime"]
            min_mtime = time.time() - days * 86400
            logger.verbose("Expiring logfiles (Horizon: %d days -> %s)" %
                           (days, cmk.utils.render.date_and_time(min_mtime)))
            for path in settings.paths.history_dir.value.glob('*.log'):
                if flush or path.stat().st_mtime < min_mtime:
                    logger.info("Deleting log file %s (age %s)" %
                                (path, cmk.utils.render.date_and_time(path.stat().st_mtime)))
                    path.unlink()
        except Exception as e:
            if settings.options.debug:
                raise
            logger.exception("Error expiring log files: %s" % e)


def _get_files(history, query):
    filters, limit = query.filters, query.limit
    history_entries = []
    if not history._settings.paths.history_dir.value.exists():
        return []

    # Optimization: use grep in order to reduce the number of read lines
    # based on some frequently used filters.
    grepping_filters = [
        'event_text',
        'event_comment',
        'event_host',
        'event_host_regex',
        'event_contact',
        'event_application',
        'event_rule_id',
        'event_owner',
        'event_ipaddress',
        'event_core_host',
    ]
    greptexts = []
    for column_name, operator_name, _predicate, argument in filters:
        # Make sure that the greptexts are in the same order as in the
        # actual logfiles. They will be joined with ".*"!
        try:
            nr = grepping_filters.index(column_name)
            if operator_name in ['=', '~~']:
                greptexts.append((nr, argument))
        except Exception:
            pass

    greptexts.sort()
    greptexts = [x[1] for x in greptexts]

    time_filters = [f for f in filters if f[0].split("_")[-1] == "time"]

    # We do not want to open all files. So our strategy is:
    # look for "time" filters and first apply the filter to
    # the first entry and the modification time of the file. Only
    # if at least one of the two timestamps is accepted do we
    # take that file into account.
    # Use the later logfiles first, to get the newer log entries
    # first. When a limit is reached, the newer entries should
    # be processed in most cases. We assume that now.
    # To keep a consistent order of log entries, we should care
    # about sorting the log lines in reverse, but that seems to
    # already be done by the GUI, so we don't do that twice. Skipping
    # this will lead to some lines of a single file being limited in
    # the wrong order. But this should be better than before.
    for ts, path in sorted(((int(str(path.name)[:-4]), path)
                            for path in history._settings.paths.history_dir.value.glob('*.log')),
                           reverse=True):
        if limit is not None and limit <= 0:
            break
        first_entry, last_entry = _get_logfile_timespan(path)
        for _column_name, _operator_name, predicate, _argument in time_filters:
            if predicate(first_entry):
                break
            if predicate(last_entry):
                break
        else:
            # If no filter matches but we *have* filters
            # then we skip this file. It cannot contain
            # any useful entry for us.
            if len(time_filters):
                if history._settings.options.debug:
                    history._logger.info("Skipping logfile %s.log because of time filter" % ts)
                continue  # skip this file

        new_entries = _parse_history_file(history, path, query, greptexts, limit, history._logger)
        history_entries += new_entries
        if limit is not None:
            limit -= len(new_entries)

    return history_entries


def _parse_history_file(history, path, query, greptexts, limit, logger):
    entries = []
    line_no = 0
    # If we have greptexts, we pre-filter the file using the extremely
    # fast GNU grep.
    # Reverse the lines of the log file to have the newer lines processed first
    cmd = 'tac %s' % cmk.ec.actions.quote_shell_string(str(path))
    if greptexts:
        cmd += " | egrep -i -e %s" % cmk.ec.actions.quote_shell_string(".*".join(greptexts))
    grep = subprocess.Popen(cmd, shell=True, close_fds=True, stdout=subprocess.PIPE)  # nosec

    for line in grep.stdout:
        line_no += 1
        if limit is not None and len(entries) > limit:
            grep.kill()
            grep.wait()
            break

        try:
            parts = line.decode('utf-8').rstrip('\n').split('\t')
            _convert_history_line(history, parts)
            values = [line_no] + parts
            if query.filter_row(values):
                entries.append(values)
        except Exception as e:
            logger.exception("Invalid line '%s' in history file %s: %s" % (line, path, e))

    return entries
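
# For illustration (hypothetical values): for greptexts ['srv01', 'disk'] the
# shell pipeline built above becomes
#
#   tac '<path>' | egrep -i -e 'srv01.*disk'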


# Speed-critical function for converting the string representation
# of a log line back to Python values
def _convert_history_line(history, values):
    # NOTE: history_line column is missing here, so indices are off by 1! :-P
    values[0] = float(values[0])  # history_time
    values[4] = int(values[4])  # event_id
    values[5] = int(values[5])  # event_count
    values[7] = float(values[7])  # event_first
    values[8] = float(values[8])  # event_last
    values[10] = int(values[10])  # event_sl
    values[14] = int(values[14])  # event_pid
    values[15] = int(values[15])  # event_priority
    values[16] = int(values[16])  # event_facility
    values[18] = int(values[18])  # event_state
    values[21] = _unsplit(values[21])  # event_match_groups
    num_values = len(values)
    if num_values <= 22:  # event_contact_groups
        values.append(None)
    else:
        values[22] = _unsplit(values[22])
    if num_values <= 23:  # event_ipaddress
        values.append(history._history_columns[24][1])
    if num_values <= 24:  # event_orig_host
        values.append(history._history_columns[25][1])
    if num_values <= 25:  # event_contact_groups_precedence
        values.append(history._history_columns[26][1])
    if num_values <= 26:  # event_core_host
        values.append(history._history_columns[27][1])
    if num_values <= 27:  # event_host_in_downtime
        values.append(history._history_columns[28][1])
    else:
        values[27] = values[27] == "1"
    if num_values <= 28:  # event_match_groups_syslog_application
        values.append(history._history_columns[29][1])
    else:
        values[28] = _unsplit(values[28])


def _unsplit(s):
    if not isinstance(s, six.string_types):
        return s

    elif s.startswith('\2'):
        return None  # \2 is the designator for None

    elif s.startswith('\1'):
        if len(s) == 1:
            return ()
        return tuple(s[1:].split('\1'))

    return s
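
# For illustration: _unsplit('\1foo\1bar') == ('foo', 'bar'),
# _unsplit('\1') == (), _unsplit('\2') is None, and any other
# string is returned unchanged.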


def _get_logfile_timespan(path):
    try:
        with path.open() as f:
            first_entry = float(f.readline().split('\t', 1)[0])
        last_entry = path.stat().st_mtime
        return first_entry, last_entry
    except Exception:
        return 0.0, 0.0


# Rip out/replace any characters which have a special meaning in the UTF-8
# encoded history files, see e.g. quote_tab. In theory this shouldn't be
# necessary, because there are a bunch of bytes which are not contained in any
# valid UTF-8 string, but following Murphy's Law, those are not used in
# Check_MK. To keep backwards compatibility with old history files, we have no
# choice and continue to do it wrong... :-/
def scrub_string(s):
    if isinstance(s, str):
        return s.translate(scrub_string.str_table, "\0\1\2\n")
    if isinstance(s, unicode):
        return s.translate(scrub_string.unicode_table)
    raise TypeError("scrub_string expects a string argument")


scrub_string.str_table = string.maketrans("\t", " ")
scrub_string.unicode_table = {0: None, 1: None, 2: None, ord("\n"): None, ord("\t"): ord(" ")}
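
# For illustration: scrub_string("foo\tbar\n") == "foo bar" -- tabs are
# replaced by spaces, while the special bytes \0, \1, \2 and newlines are
# dropped entirely.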