# -*- encoding: utf-8; py-indent-offset: 4 -*-
# +------------------------------------------------------------------+
# |             ____ _               _        __  __ _  __           |
# |            / ___| |__   ___  ___| | __   |  \/  | |/ /           |
# |           | |   | '_ \ / _ \/ __| |/ /   | |\/| | ' /            |
# |           | |___| | | |  __/ (__|   <    | |  | | . \            |
# |            \____|_| |_|\___|\___|_|\_\___|_|  |_|_|\_\           |
# |                                                                  |
# | Copyright Mathias Kettner 2018             mk@mathias-kettner.de |
# +------------------------------------------------------------------+
#
# This file is part of Check_MK.
# The official homepage is at http://mathias-kettner.de/check_mk.
#
# check_mk is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation in version 2. check_mk is distributed
# in the hope that it will be useful, but WITHOUT ANY WARRANTY; with-
# out even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more de-
# tails. You should have received a copy of the GNU General Public
# License along with GNU Make; see the file COPYING. If not, write
# to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301 USA.
import datetime
import os
import string
import subprocess
import threading
import time

import six

import cmk.utils.render
import cmk.ec.actions
38 # TODO: As one can see clearly below, we should really have a class hierarchy here...
41 class History(object):
42 def __init__(self
, settings
, config
, logger
, event_columns
, history_columns
):
43 super(History
, self
).__init
__()
44 self
._settings
= settings
47 self
._event
_columns
= event_columns
48 self
._history
_columns
= history_columns
49 self
._lock
= threading
.Lock()
50 self
._mongodb
= MongoDB()
51 self
._active
_history
_period
= ActiveHistoryPeriod()
52 self
.reload_configuration(config
)
54 def reload_configuration(self
, config
):
56 if self
._config
['archive_mode'] == 'mongodb':
57 _reload_configuration_mongodb(self
)
59 _reload_configuration_files(self
)
62 if self
._config
['archive_mode'] == 'mongodb':
67 def add(self
, event
, what
, who
="", addinfo
=""):
68 if self
._config
['archive_mode'] == 'mongodb':
69 _add_mongodb(self
, event
, what
, who
, addinfo
)
71 _add_files(self
, event
, what
, who
, addinfo
)
74 if self
._config
['archive_mode'] == 'mongodb':
75 return _get_mongodb(self
, query
)
76 return _get_files(self
, query
)
78 def housekeeping(self
):
79 if self
._config
['archive_mode'] == 'mongodb':
80 _housekeeping_mongodb(self
)
82 _housekeeping_files(self
)
86 # .--MongoDB-------------------------------------------------------------.
88 # | | \/ | ___ _ __ __ _ ___ | _ \| __ ) |
89 # | | |\/| |/ _ \| '_ \ / _` |/ _ \| | | | _ \ |
90 # | | | | | (_) | | | | (_| | (_) | |_| | |_) | |
91 # | |_| |_|\___/|_| |_|\__, |\___/|____/|____/ |
93 # +----------------------------------------------------------------------+
94 # | The Event Log Archive can be stored in a MongoDB instead of files, |
95 # | this section contains MongoDB related code. |
96 # '----------------------------------------------------------------------'
99 from pymongo
.connection
import Connection
100 from pymongo
import DESCENDING
101 from pymongo
.errors
import OperationFailure
107 class MongoDB(object):
109 super(MongoDB
, self
).__init
__()
110 self
.connection
= None
114 def _reload_configuration_mongodb(history
):
115 # Configure the auto deleting indexes in the DB
116 _update_mongodb_indexes(history
._settings
, history
._mongodb
)
117 _update_mongodb_history_lifetime(history
._settings
, history
._config
, history
._mongodb
)
120 def _housekeeping_mongodb(history
):
124 def _connect_mongodb(settings
, mongodb
):
125 if Connection
is None:
126 raise Exception('Could not initialize MongoDB (Python-Modules are missing)')
127 mongodb
.connection
= Connection(*_mongodb_local_connection_opts(settings
))
128 mongodb
.db
= mongodb
.connection
.__getitem
__(os
.environ
['OMD_SITE'])
131 def _mongodb_local_connection_opts(settings
):
132 ip
, port
= None, None
133 with settings
.paths
.mongodb_config_file
.value
.open(encoding
='utf-8') as f
:
135 if l
.startswith('bind_ip'):
136 ip
= l
.split('=')[1].strip()
137 elif l
.startswith('port'):
138 port
= int(l
.split('=')[1].strip())
142 def _flush_mongodb(history
):
143 history
._mongodb
._mongodb
.db
.ec_archive
.drop()
146 def _get_mongodb_max_history_age(mongodb
):
147 result
= mongodb
.db
.ec_archive
.index_information()
148 if 'dt_-1' not in result
or 'expireAfterSeconds' not in result
['dt_-1']:
150 return result
['dt_-1']['expireAfterSeconds']
153 def _update_mongodb_indexes(settings
, mongodb
):
154 if not mongodb
.connection
:
155 _connect_mongodb(settings
, mongodb
)
156 result
= mongodb
.db
.ec_archive
.index_information()
158 if 'time_-1' not in result
:
159 mongodb
.db
.ec_archive
.ensure_index([('time', DESCENDING
)])
162 def _update_mongodb_history_lifetime(settings
, config
, mongodb
):
163 if not mongodb
.connection
:
164 _connect_mongodb(settings
, mongodb
)
166 if _get_mongodb_max_history_age(mongodb
) == config
['history_lifetime'] * 86400:
167 return # do not update already correct index
170 mongodb
.db
.ec_archive
.drop_index("dt_-1")
171 except OperationFailure
:
172 pass # Ignore not existing index
174 # Delete messages after x days
175 mongodb
.db
.ec_archive
.ensure_index([('dt', DESCENDING
)],
176 expireAfterSeconds
=config
['history_lifetime'] * 86400,
180 def _mongodb_next_id(mongodb
, name
, first_id
=0):
181 ret
= mongodb
.db
.counters
.find_and_modify(
182 query
={'_id': name
}, update
={'$inc': {
187 # Initialize the index!
188 mongodb
.db
.counters
.insert({'_id': name
, 'seq': first_id
})
193 def _add_mongodb(history
, event
, what
, who
, addinfo
):
194 _log_event(history
._config
, history
._logger
, event
, what
, who
, addinfo
)
195 if not history
._mongodb
.connection
:
196 _connect_mongodb(history
._settings
, history
._mongodb
)
197 # We converted _id to be an auto incrementing integer. This makes the unique
198 # index compatible to history_line of the file (which is handled as integer)
199 # within mkeventd. It might be better to use the ObjectId() of MongoDB, but
200 # for the first step, we use the integer index for simplicity
202 history
._mongodb
.db
.ec_archive
.insert({
203 '_id': _mongodb_next_id(history
._mongodb
, 'ec_archive_id'),
204 'dt': datetime
.datetime
.fromtimestamp(now
),
213 def _log_event(config
, logger
, event
, what
, who
, addinfo
):
214 if config
['debug_rules']:
215 logger
.info("Event %d: %s/%s/%s - %s" % (event
["id"], what
, who
, addinfo
, event
["text"]))
218 def _get_mongodb(history
, query
):
219 filters
, limit
= query
.filters
, query
.limit
223 if not history
._mongodb
.connection
:
224 _connect_mongodb(history
._settings
, history
._mongodb
)
226 # Construct the mongodb filtering specification. We could fetch all information
227 # and do filtering on this data, but this would be way too inefficient.
229 for column_name
, operator_name
, _predicate
, argument
in filters
:
231 if operator_name
== '=':
232 mongo_filter
= argument
233 elif operator_name
== '>':
234 mongo_filter
= {'$gt': argument
}
235 elif operator_name
== '<':
236 mongo_filter
= {'$lt': argument
}
237 elif operator_name
== '>=':
238 mongo_filter
= {'$gte': argument
}
239 elif operator_name
== '<=':
240 mongo_filter
= {'$lte': argument
}
241 elif operator_name
== '~': # case sensitive regex, find pattern in string
242 mongo_filter
= {'$regex': argument
, '$options': ''}
243 elif operator_name
== '=~': # case insensitive, match whole string
244 mongo_filter
= {'$regex': argument
, '$options': 'mi'}
245 elif operator_name
== '~~': # case insensitive regex, find pattern in string
246 mongo_filter
= {'$regex': argument
, '$options': 'i'}
247 elif operator_name
== 'in':
248 mongo_filter
= {'$in': argument
}
251 'Filter operator of filter %s not implemented for MongoDB archive' % column_name
)
253 if column_name
[:6] == 'event_':
254 query
['event.' + column_name
[6:]] = mongo_filter
255 elif column_name
[:8] == 'history_':
256 key
= column_name
[8:]
259 query
[key
] = mongo_filter
261 raise Exception('Filter %s not implemented for MongoDB' % column_name
)
263 result
= history
._mongodb
.db
.ec_archive
.find(query
).sort('time', -1)
265 # Might be used for debugging / profiling
266 #file(cmk.utils.paths.omd_root + '/var/log/check_mk/ec_history_debug.log', 'a').write(
267 # pprint.pformat(filters) + '\n' + pprint.pformat(result.explain()) + '\n')
270 result
= result
.limit(limit
+ 1)
272 # now convert the MongoDB data structure to the eventd internal one
281 for colname
, defval
in history
._event
_columns
:
282 key
= colname
[6:] # drop "event_"
283 item
.append(entry
['event'].get(key
, defval
))
284 history_entries
.append(item
)
286 return history_entries
290 # .--History-------------------------------------------------------------.
292 # | | | | (_)___| |_ ___ _ __ _ _ |
293 # | | |_| | / __| __/ _ \| '__| | | | |
294 # | | _ | \__ \ || (_) | | | |_| | |
295 # | |_| |_|_|___/\__\___/|_| \__, | |
297 # +----------------------------------------------------------------------+
298 # | Functions for logging the history of events |
299 # '----------------------------------------------------------------------'
302 def _reload_configuration_files(history
):
306 def _flush_files(history
):
307 _expire_logfiles(history
._settings
, history
._config
, history
._logger
, history
._lock
, True)
310 def _housekeeping_files(history
):
311 _expire_logfiles(history
._settings
, history
._config
, history
._logger
, history
._lock
, False)
314 # Make a new entry in the event history. Each entry is tab-separated line
315 # with the following columns:
316 # 0: time of log entry
317 # 1: type of entry (keyword)
318 # 2: user who initiated the action (for GUI actions)
319 # 3: additional information about the action
320 # 4-oo: StatusTableEvents.columns
321 def _add_files(history
, event
, what
, who
, addinfo
):
322 _log_event(history
._config
, history
._logger
, event
, what
, who
, addinfo
)
324 columns
= [str(time
.time()), scrub_string(what
), scrub_string(who
), scrub_string(addinfo
)]
326 quote_tab(event
.get(colname
[6:], defval
)) # drop "event_"
327 for colname
, defval
in history
._event
_columns
330 with
get_logfile(history
._config
, history
._settings
.paths
.history_dir
.value
,
331 history
._active
_history
_period
).open(mode
='ab') as f
:
332 f
.write("\t".join(map(cmk
.ec
.actions
.to_utf8
, columns
)) + "\n")
337 if ty
in [float, int]:
340 return '1' if col
else '0'
341 elif ty
in [tuple, list]:
342 col
= "\1" + "\1".join([quote_tab(e
) for e
in col
])
346 col
= col
.encode("utf-8")
348 return col
.replace("\t", " ")
351 class ActiveHistoryPeriod(object):
353 super(ActiveHistoryPeriod
, self
).__init
__()
357 # Get file object to current log file, handle also
358 # history and lifetime limit.
359 def get_logfile(config
, log_dir
, active_history_period
):
360 log_dir
.mkdir(parents
=True, exist_ok
=True)
361 # Log into file starting at current history period,
362 # but: if a newer logfile exists, use that one. This
363 # can happen if you switch the period from daily to
365 timestamp
= _current_history_period(config
)
367 # Log period has changed or we have not computed a filename yet ->
368 # compute currently active period
369 if active_history_period
.value
is None or timestamp
> active_history_period
.value
:
371 # Look if newer files exist
372 timestamps
= sorted(int(str(path
.name
)[:-4]) for path
in log_dir
.glob('*.log'))
373 if len(timestamps
) > 0:
374 timestamp
= max(timestamps
[-1], timestamp
)
376 active_history_period
.value
= timestamp
378 return log_dir
/ ("%d.log" % timestamp
)
381 # Return timestamp of the beginning of the current history
383 def _current_history_period(config
):
384 now_broken
= list(time
.localtime())
385 now_broken
[3:6] = [0, 0, 0] # set clock to 00:00:00
386 now_ts
= time
.mktime(now_broken
) # convert to timestamp
387 if config
["history_rotation"] == "weekly":
388 now_ts
-= now_broken
[6] * 86400 # convert to monday
392 # Delete old log files
393 def _expire_logfiles(settings
, config
, logger
, lock_history
, flush
):
396 days
= config
["history_lifetime"]
397 min_mtime
= time
.time() - days
* 86400
398 logger
.verbose("Expiring logfiles (Horizon: %d days -> %s)" %
399 (days
, cmk
.utils
.render
.date_and_time(min_mtime
)))
400 for path
in settings
.paths
.history_dir
.value
.glob('*.log'):
401 if flush
or path
.stat().st_mtime
< min_mtime
:
402 logger
.info("Deleting log file %s (age %s)" %
403 (path
, cmk
.utils
.render
.date_and_time(path
.stat().st_mtime
)))
405 except Exception as e
:
406 if settings
.options
.debug
:
408 logger
.exception("Error expiring log files: %s" % e
)
411 def _get_files(history
, query
):
412 filters
, limit
= query
.filters
, query
.limit
414 if not history
._settings
.paths
.history_dir
.value
.exists():
417 # Optimization: use grep in order to reduce amount
418 # of read lines based on some frequently used filters.
432 for column_name
, operator_name
, _predicate
, argument
in filters
:
433 # Make sure that the greptexts are in the same order as in the
434 # actual logfiles. They will be joined with ".*"!
436 nr
= grepping_filters
.index(column_name
)
437 if operator_name
in ['=' '~~']:
438 greptexts
.append((nr
, argument
))
443 greptexts
= [x
[1] for x
in greptexts
]
445 time_filters
= [f
for f
in filters
if f
[0].split("_")[-1] == "time"]
447 # We do not want to open all files. So our strategy is:
448 # look for "time" filters and first apply the filter to
449 # the first entry and modification time of the file. Only
450 # if at least one of both timestamps is accepted then we
451 # take that file into account.
452 # Use the later logfiles first, to get the newer log entries
453 # first. When a limit is reached, the newer entries should
454 # be processed in most cases. We assume that now.
455 # To keep a consistent order of log entries, we should care
456 # about sorting the log lines in reverse, but that seems to
457 # already be done by the GUI, so we don't do that twice. Skipping
458 # this # will lead into some lines of a single file to be limited in
459 # wrong order. But this should be better than before.
460 for ts
, path
in sorted(((int(str(path
.name
)[:-4]), path
)
461 for path
in history
._settings
.paths
.history_dir
.value
.glob('*.log')),
463 if limit
is not None and limit
<= 0:
465 first_entry
, last_entry
= _get_logfile_timespan(path
)
466 for _column_name
, _operator_name
, predicate
, _argument
in time_filters
:
467 if predicate(first_entry
):
469 if predicate(last_entry
):
472 # If no filter matches but we *have* filters
473 # then we skip this file. It cannot contain
474 # any useful entry for us.
475 if len(time_filters
):
476 if history
._settings
.options
.debug
:
477 history
._logger
.info("Skipping logfile %s.log because of time filter" % ts
)
478 continue # skip this file
480 new_entries
= _parse_history_file(history
, path
, query
, greptexts
, limit
, history
._logger
)
481 history_entries
+= new_entries
482 if limit
is not None:
483 limit
-= len(new_entries
)
485 return history_entries
488 def _parse_history_file(history
, path
, query
, greptexts
, limit
, logger
):
491 # If we have greptexts we pre-filter the file using the extremely
493 # Revert lines from the log file to have the newer lines processed first
494 cmd
= 'tac %s' % cmk
.ec
.actions
.quote_shell_string(str(path
))
496 cmd
+= " | egrep -i -e %s" % cmk
.ec
.actions
.quote_shell_string(".*".join(greptexts
))
497 grep
= subprocess
.Popen(cmd
, shell
=True, close_fds
=True, stdout
=subprocess
.PIPE
) # nosec
499 for line
in grep
.stdout
:
501 if limit
is not None and len(entries
) > limit
:
507 parts
= line
.decode('utf-8').rstrip('\n').split('\t')
508 _convert_history_line(history
, parts
)
509 values
= [line_no
] + parts
510 if query
.filter_row(values
):
511 entries
.append(values
)
512 except Exception as e
:
513 logger
.exception("Invalid line '%s' in history file %s: %s" % (line
, path
, e
))
518 # Speed-critical function for converting string representation
519 # of log line back to Python values
520 def _convert_history_line(history
, values
):
521 # NOTE: history_line column is missing here, so indices are off by 1! :-P
522 values
[0] = float(values
[0]) # history_time
523 values
[4] = int(values
[4]) # event_id
524 values
[5] = int(values
[5]) # event_count
525 values
[7] = float(values
[7]) # event_first
526 values
[8] = float(values
[8]) # event_last
527 values
[10] = int(values
[10]) # event_sl
528 values
[14] = int(values
[14]) # event_pid
529 values
[15] = int(values
[15]) # event_priority
530 values
[16] = int(values
[16]) # event_facility
531 values
[18] = int(values
[18]) # event_state
532 values
[21] = _unsplit(values
[21]) # event_match_groups
533 num_values
= len(values
)
534 if num_values
<= 22: # event_contact_groups
537 values
[22] = _unsplit(values
[22])
538 if num_values
<= 23: # event_ipaddress
539 values
.append(history
._history
_columns
[24][1])
540 if num_values
<= 24: # event_orig_host
541 values
.append(history
._history
_columns
[25][1])
542 if num_values
<= 25: # event_contact_groups_precedence
543 values
.append(history
._history
_columns
[26][1])
544 if num_values
<= 26: # event_core_host
545 values
.append(history
._history
_columns
[27][1])
546 if num_values
<= 27: # event_host_in_downtime
547 values
.append(history
._history
_columns
[28][1])
549 values
[27] = values
[27] == "1"
550 if num_values
<= 28: # event_match_groups_syslog_application
551 values
.append(history
._history
_columns
[29][1])
553 values
[28] = _unsplit(values
[28])
557 if not isinstance(s
, six
.string_types
):
560 elif s
.startswith('\2'):
561 return None # \2 is the designator for None
563 elif s
.startswith('\1'):
566 return tuple(s
[1:].split('\1'))
570 def _get_logfile_timespan(path
):
572 with path
.open() as f
:
573 first_entry
= float(f
.readline().split('\t', 1)[0])
574 last_entry
= path
.stat().st_mtime
575 return first_entry
, last_entry
580 # Rip out/replace any characters which have a special meaning in the UTF-8
581 # encoded history files, see e.g. quote_tab. In theory this shouldn't be
582 # necessary, because there are a bunch of bytes which are not contained in any
583 # valid UTF-8 string, but following Murphy's Law, those are not used in
584 # Check_MK. To keep backwards compatibility with old history files, we have no
585 # choice and continue to do it wrong... :-/
587 if isinstance(s
, str):
588 return s
.translate(scrub_string
.str_table
, "\0\1\2\n")
589 if isinstance(s
, unicode):
590 return s
.translate(scrub_string
.unicode_table
)
591 raise TypeError("scrub_string expects a string argument")
594 scrub_string
.str_table
= string
.maketrans("\t", " ")
595 scrub_string
.unicode_table
= {0: None, 1: None, 2: None, ord("\n"): None, ord("\t"): ord(" ")}