cmk_base/prediction.py
#!/usr/bin/python
# -*- encoding: utf-8; py-indent-offset: 4 -*-
# +------------------------------------------------------------------+
# |             ____ _               _        __  __ _  __           |
# |            / ___| |__   ___  ___| | __   |  \/  | |/ /           |
# |           | |   | '_ \ / _ \/ __| |/ /   | |\/| | ' /            |
# |           | |___| | | |  __/ (__|   <    | |  | | . \            |
# |            \____|_| |_|\___|\___|_|\_\___|_|  |_|_|\_\           |
# |                                                                  |
# | Copyright Mathias Kettner 2014             mk@mathias-kettner.de |
# +------------------------------------------------------------------+
#
# This file is part of Check_MK.
# The official homepage is at http://mathias-kettner.de/check_mk.
#
# check_mk is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation in version 2. check_mk is distributed
# in the hope that it will be useful, but WITHOUT ANY WARRANTY; with-
# out even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more de-
# tails. You should have received a copy of the GNU General Public
# License along with GNU Make; see the file COPYING. If not, write
# to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301 USA.
"""Code for predictive monitoring / anomaly detection"""

import json
import math
import os
import time

import cmk.utils.debug
import cmk.utils
import cmk.utils.log
import cmk.utils.defines as defines
import cmk.utils.prediction

logger = cmk.utils.log.get_logger(__name__)

def window_start(timestamp, span):
    """If time is partitioned in SPAN intervals, how many seconds is TIMESTAMP away from the start

    It works well across time zones, but has an unfair behavior with daylight savings time."""
    return (timestamp - cmk.utils.prediction.timezone_at(timestamp)) % span

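# Illustrative sketch, assuming a UTC host where
# cmk.utils.prediction.timezone_at() returns 0: a timestamp 90 seconds after
# midnight is 90 seconds into its daily window, regardless of the day.
#
#     window_start(90, 86400)          # -> 90
#     window_start(86400 + 90, 86400)  # -> 90 (same offset, next day)
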
def group_by_wday(t):
    wday = time.localtime(t).tm_wday
    return defines.weekday_ids()[wday], window_start(t, 86400)


def group_by_day(t):
    return "everyday", window_start(t, 86400)


def group_by_day_of_month(t):
    mday = time.localtime(t).tm_mday
    return str(mday), window_start(t, 86400)


def group_by_everyhour(t):
    return "everyhour", window_start(t, 3600)

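# Illustrative sketch of the group-by helpers above, assuming a UTC host
# (epoch 1388966490 is Monday 2014-01-06 00:01:30 UTC) and that
# defines.weekday_ids() starts with "monday":
#
#     group_by_wday(1388966490)          # -> ('monday', 90)
#     group_by_day(1388966490)           # -> ('everyday', 90)
#     group_by_day_of_month(1388966490)  # -> ('6', 90)
#     group_by_everyhour(1388966490)     # -> ('everyhour', 90)
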
prediction_periods = {
    "wday": {
        "slice": 86400,  # 7 slices
        "groupby": group_by_wday,
        "valid": 7,
    },
    "day": {
        "slice": 86400,  # 31 slices
        "groupby": group_by_day_of_month,
        "valid": 28,
    },
    "hour": {
        "slice": 86400,  # 1 slice
        "groupby": group_by_day,
        "valid": 1,
    },
    "minute": {
        "slice": 3600,  # 1 slice
        "groupby": group_by_everyhour,
        "valid": 24,
    },
}

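# How a period entry is meant to be read (sketch using only the fields in the
# table above): for "wday" every timestamp maps into one of seven day-long
# slices, and a stored prediction stays valid for "valid" slices worth of time.
#
#     period = prediction_periods["wday"]
#     timegroup, rel_time = period["groupby"](time.time())  # e.g. ('monday', 90)
#     lifetime = period["valid"] * period["slice"]          # 7 * 86400 = one week
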
def get_prediction_timegroup(t, period_info):
    """
    Return:
        timegroup: name of the group, like 'monday' or '12'
        from_time: absolute epoch time of the first second of the
            current slice.
        until_time: absolute epoch time of the first second *not* in the slice
        rel_time: seconds offset of now in the current slice
    """
    # Convert to local timezone
    timegroup, rel_time = period_info["groupby"](t)
    from_time = t - rel_time
    until_time = from_time + period_info["slice"]
    return timegroup, from_time, until_time, rel_time

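# Illustrative result, continuing the UTC Monday example from above:
#
#     get_prediction_timegroup(1388966490, prediction_periods["wday"])
#     # -> ('monday', 1388966400, 1389052800, 90)
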
def time_slices(timestamp, horizon, period_info, timegroup):
    "Collect all slices back into the past until time horizon is reached"
    timestamp = int(timestamp)
    abs_begin = timestamp - horizon
    slices = []

    # Note: due to the f**king DST, we can have several shifts between DST
    # and non-DST during a computation. Treatment is unfair on those longer
    # or shorter days. All days have 24hrs. DST swaps within slices are
    # being ignored, we work with slice shifts. The DST flag is checked
    # against the query timestamp. In general that means test is done at
    # the beginning of the day (because predictive levels refresh at
    # midnight) and most likely before DST swap is applied.

    # Have fun understanding the tests for this function.

    for begin in range(timestamp, abs_begin, -period_info["slice"]):
        tg, start, end = get_prediction_timegroup(begin, period_info)[:3]
        if tg == timegroup:
            slices.append((start, end))
    return slices

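# Sketch, continuing the UTC Monday example and assuming no DST shift within
# the horizon: with an 8-day horizon, stepping back in 86400 s increments
# keeps only the windows whose group is still 'monday', i.e. the current
# Monday and the one a week earlier.
#
#     time_slices(1388966490, 8 * 86400, prediction_periods["wday"], 'monday')
#     # -> [(1388966400, 1389052800), (1388361600, 1388448000)]
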
def retrieve_grouped_data_from_rrd(rrd_column, time_windows):
    "Collect all time slices and up-sample them to same resolution"
    from_time = time_windows[0][0]

    slices = [(rrd_column(start, end), from_time - start) for start, end in time_windows]

    # The resolutions of the different time ranges differ. We upsample
    # to the best resolution. We assume that the youngest slice has the
    # finest resolution.
    twindow = slices[0][0].twindow
    return twindow, [ts.bfill_upsample(twindow, shift) for ts, shift in slices]

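# Sketch of the alignment done above, with hypothetical windows: the youngest
# slice starts at from_time, so its shift is 0; a slice one week older gets
# shift = from_time - start = 7 * 86400 and is mapped onto the young slice's
# grid (twindow). Assumption: bfill_upsample back-fills each coarse value
# across the finer grid points it covers, so all slices end up with the same
# number of points.
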
def data_stats(slices):
    "Statistically summarize all the upsampled RRD data"

    descriptors = []

    for time_column in zip(*slices):
        point_line = [x for x in time_column if x is not None]
        if point_line:
            average = sum(point_line) / float(len(point_line))
            descriptors.append([
                average,
                min(point_line),
                max(point_line),
                stdev(point_line, average),
            ])
        else:
            descriptors.append([None, None, None, None])

    return descriptors

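# Shape of the summary, shown on two hypothetical upsampled slices: each time
# column across all slices collapses into [average, min, max, stdev], and
# columns that carry no data at all become [None, None, None, None].
#
#     data_stats([[1.0, None], [3.0, None]])
#     # -> [[2.0, 1.0, 3.0, 1.4142...], [None, None, None, None]]
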
def calculate_data_for_prediction(time_windows, rrd_datacolumn):
    twindow, slices = retrieve_grouped_data_from_rrd(rrd_datacolumn, time_windows)

    descriptors = data_stats(slices)

    return {
        u"columns": [u"average", u"min", u"max", u"stdev"],
        u"points": descriptors,
        u"num_points": len(descriptors),
        u"data_twindow": list(twindow[:2]),
        u"step": twindow[2],
    }

def save_predictions(pred_file, info, data_for_pred):
    with open(pred_file + '.info', "w") as fname:
        json.dump(info, fname)
    with open(pred_file, "w") as fname:
        json.dump(data_for_pred, fname)

def stdev(point_line, average):
    samples = len(point_line)
    # In the case of a single data-point an unbiased standard deviation is
    # undefined. In this case we take the magnitude of the measured value
    # itself as a measure of the dispersion.
    if samples == 1:
        return abs(average)
    return math.sqrt(abs(sum(p**2 for p in point_line) - average**2 * samples) / float(samples - 1))

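# Worked example of the one-pass formula above: for point_line = [1, 2, 3]
# with average = 2.0, sum(p**2) is 14, so
# sqrt(abs(14 - 2.0**2 * 3) / (3 - 1)) = sqrt(1.0) = 1.0, which is the
# unbiased sample standard deviation of [1, 2, 3].
#
#     stdev([1, 2, 3], 2.0)  # -> 1.0
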
def is_prediction_up2date(pred_file, timegroup, params):
    # Check if we need to (re-)compute the prediction file. This is
    # the case if:
    # - no prediction has been done yet for this time group
    # - the prediction from the last time is outdated
    # - the prediction from the last time was done with other parameters
    last_info = cmk.utils.prediction.retrieve_data_for_prediction(pred_file + ".info", timegroup)
    if last_info is None:
        return False

    period_info = prediction_periods[params["period"]]
    now = time.time()
    if last_info["time"] + period_info["valid"] * period_info["slice"] < now:
        logger.verbose("Prediction of %s outdated", timegroup)
        return False

    jsonized_params = json.loads(json.dumps(params))
    if last_info.get('params') != jsonized_params:
        logger.verbose("Prediction parameters have changed.")
        return False

    return True

# cf: consolidation function (MAX, MIN, AVERAGE)
# levels_factor: this multiplies all absolute levels. Used, for example,
# in the cpu.loads check, which multiplies the levels by the number of
# CPU cores.
def get_levels(hostname, service_description, dsname, params, cf, levels_factor=1.0):
    # Compute timegroup
    now = time.time()
    period_info = prediction_periods[params["period"]]

    timegroup, rel_time = period_info["groupby"](now)

    pred_dir = cmk.utils.prediction.predictions_dir(
        hostname, service_description, dsname, create=True)

    pred_file = os.path.join(pred_dir, timegroup)
    cmk.utils.prediction.clean_prediction_files(pred_file)

    if is_prediction_up2date(pred_file, timegroup, params):
        data_for_pred = cmk.utils.prediction.retrieve_data_for_prediction(pred_file, timegroup)
    else:
        logger.verbose("Calculating prediction data for time group %s", timegroup)
        cmk.utils.prediction.clean_prediction_files(pred_file, force=True)

        time_windows = time_slices(now, int(params["horizon"] * 86400), period_info, timegroup)

        rrd_datacolumn = cmk.utils.prediction.rrd_datacolumn(hostname, service_description, dsname,
                                                             cf)

        data_for_pred = calculate_data_for_prediction(time_windows, rrd_datacolumn)

        info = {
            u"time": now,
            u"range": time_windows[0],
            u"cf": cf,
            u"dsname": dsname,
            u"slice": period_info["slice"],
            u"params": params,
        }
        save_predictions(pred_file, info, data_for_pred)

    # Find reference value in data_for_pred
    index = int(rel_time / data_for_pred["step"])
    reference = dict(zip(data_for_pred["columns"], data_for_pred["points"][index]))
    return cmk.utils.prediction.estimate_levels(reference, params, levels_factor)
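
# Hedged usage sketch: hostname, service and parameter values below are made
# up, but the keys ("period", "horizon", plus the level settings evaluated by
# estimate_levels) mirror what get_levels() reads above.
#
#     params = {
#         "period": "wday",
#         "horizon": 90,  # days of history to consider
#         "levels_upper": ("relative", (10.0, 20.0)),
#     }
#     result = get_levels("myhost", "CPU load", "load15", params, "MAX",
#                         levels_factor=4.0)  # e.g. scale by number of cores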