#!/usr/bin/python
# -*- encoding: utf-8; py-indent-offset: 4 -*-
# +------------------------------------------------------------------+
# |             ____ _               _        __  __ _  __           |
# |            / ___| |__   ___  ___| | __   |  \/  | |/ /           |
# |           | |   | '_ \ / _ \/ __| |/ /   | |\/| | ' /            |
# |           | |___| | | |  __/ (__|   <    | |  | | . \            |
# |            \____|_| |_|\___|\___|_|\_\___|_|  |_|_|\_\           |
# |                                                                  |
# | Copyright Mathias Kettner 2014             mk@mathias-kettner.de |
# +------------------------------------------------------------------+
#
# This file is part of Check_MK.
# The official homepage is at http://mathias-kettner.de/check_mk.
#
# check_mk is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation in version 2. check_mk is distributed
# in the hope that it will be useful, but WITHOUT ANY WARRANTY; with-
# out even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more de-
# tails. You should have received a copy of the GNU General Public
# License along with GNU Make; see the file COPYING. If not, write
# to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301 USA.
"""Code for predictive monitoring / anomaly detection"""
import json
import math
import os
import time

import cmk.utils.debug
import cmk.utils
import cmk.utils.log
import cmk.utils.defines as defines
import cmk.utils.prediction
from cmk.utils.exceptions import MKGeneralException

logger = cmk.utils.log.get_logger(__name__)
def day_start(timestamp):
    return (timestamp - cmk.utils.prediction.timezone_at(timestamp)) % 86400


def hour_start(timestamp):
    return (timestamp - cmk.utils.prediction.timezone_at(timestamp)) % 3600


def group_by_wday(t):
    wday = time.localtime(t).tm_wday
    return defines.weekday_ids()[wday], day_start(t)


def group_by_day(t):
    return "everyday", day_start(t)


def group_by_day_of_month(t):
    mday = time.localtime(t).tm_mday
    return str(mday), day_start(t)


def group_by_everyhour(t):
    return "everyhour", hour_start(t)
prediction_periods = {
    "wday": {
        "slice": 86400,  # 7 slices
        "groupby": group_by_wday,
        "valid": 7,
    },
    "day": {
        "slice": 86400,  # 31 slices
        "groupby": group_by_day_of_month,
        "valid": 28,
    },
    "hour": {
        "slice": 86400,  # 1 slice
        "groupby": group_by_day,
        "valid": 1,
    },
    "minute": {
        "slice": 3600,  # 1 slice
        "groupby": group_by_everyhour,
        "valid": 24,
    },
}
def get_prediction_timegroup(t, period_info):
    """
    Return:
        timegroup: name of the group, like 'monday' or '12'
        from_time: absolute epoch time of the first second of the
            current slice
        until_time: absolute epoch time of the first second *not* in the slice
        rel_time: seconds offset of now in the current slice
    """
    # Convert to local timezone
    timegroup, rel_time = period_info["groupby"](t)
    from_time = t - rel_time
    until_time = t - rel_time + period_info["slice"]
    return timegroup, from_time, until_time, rel_time
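

# A hedged usage sketch (values are illustrative, not computed): for the
# "wday" period, a Monday-morning timestamp maps onto the "monday" slice.
#
#   period_info = prediction_periods["wday"]
#   timegroup, from_time, until_time, rel_time = \
#       get_prediction_timegroup(time.time(), period_info)
#   # e.g. timegroup == "monday", until_time - from_time == 86400 (one day),
#   # and 0 <= rel_time < 86400 is the offset since local midnight.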
def retrieve_grouped_data_from_rrd(hostname, service_description, timegroup, params, period_info,
                                   from_time, dsname, cf):
    # Collect all slices back into the past until the time horizon
    # is reached
    begin = from_time
    slices = []
    absolute_begin = from_time - params["horizon"] * 86400

    # The resolutions of the different time ranges differ. We interpolate
    # to the best resolution. We assume that the youngest slice has the
    # finest resolution. We also assume that each step is always divisible
    # by the smallest step.

    # Note: due to DST, we can have several shifts between DST and non-DST
    # time during our computation. We need to compensate for those. DST
    # swaps within slices are being ignored. The DST flag is checked
    # against the beginning of the slice.
    smallest_step = None
    while begin >= absolute_begin:
        tg, fr, un = get_prediction_timegroup(begin, period_info)[:3]
        if tg == timegroup:
            step, data = cmk.utils.prediction.get_rrd_data(hostname, service_description, dsname,
                                                           cf, fr, un - 1)
            if smallest_step is None:
                smallest_step = step
            slices.append((fr, step / float(smallest_step), data))
        begin -= period_info["slice"]
    return slices, smallest_step
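

# A hedged sketch of the slice scaling (step values are assumed, not
# measured): if the youngest slice has an RRD step of 60s and an older
# slice was consolidated down to a 300s step, that older slice is stored
# with scale == 300 / 60 == 5.0, i.e. one of its data points covers five
# points of the finest-resolution slice during consolidation below.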
def consolidate_data(slices):
    # Now we have all the RRD data we need. The next step is to consolidate
    # all that data into one new array.
    try:
        num_points = len(slices[0][2])
    except IndexError:
        raise MKGeneralException("Got no historic metrics")

    consolidated = []
    for i in xrange(num_points):
        point_line = []
        for _from_time, scale, data in slices:
            if not data:
                continue
            idx = int(i / float(scale))  # left data-point mapping
            d = data[idx]
            if d is not None:
                point_line.append(d)

        if point_line:
            average = sum(point_line) / float(len(point_line))
            consolidated.append([
                average,
                min(point_line),
                max(point_line),
                stdev(point_line, average),
            ])
        else:
            consolidated.append([None, None, None, None])
    return consolidated
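

# A hedged worked example (inputs assumed for illustration): given
#   slices = [(fr1, 1.0, [1, 3, 5, 7]), (fr2, 2.0, [10, None])]
# point 0 combines 1 with data[int(0 / 2.0)] == 10 into point_line [1, 10]
# (average 5.5), point 1 combines 3 with the same coarse point 10, and
# points 2 and 3 fall onto the coarse None, which is skipped, leaving
# point_lines [5] and [7] respectively.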
def aggregate_data_for_prediction_and_save(hostname, service_description, pred_file, params,
                                           period_info, dsname, cf, now):
    _clean_predictions_dir(os.path.dirname(pred_file), params)

    timegroup, from_time, until_time, _rel_time = get_prediction_timegroup(now, period_info)
    logger.verbose("Aggregating data for time group %s", timegroup)
    slices, smallest_step = retrieve_grouped_data_from_rrd(
        hostname, service_description, timegroup, params, period_info, from_time, dsname, cf)

    consolidated = consolidate_data(slices)

    data_for_pred = {
        "num_points": len(consolidated),
        "step": smallest_step,
        "columns": ["average", "min", "max", "stdev"],
        "points": consolidated,
    }

    info = {
        "time": now,
        "range": (from_time, until_time),
        "cf": cf,
        "dsname": dsname,
        "slice": period_info["slice"],
        "params": params,
    }

    with open(pred_file + '.info', "w") as f:
        json.dump(info, f)
    with open(pred_file, "w") as f:
        json.dump(data_for_pred, f)

    return data_for_pred
def stdev(point_line, average):
    samples = len(point_line)
    # In the case of a single data-point an unbiased standard deviation is
    # undefined. In this case we take the magnitude of the measured value
    # itself as a measure of the dispersion.
    if samples == 1:
        return abs(average)
    return math.sqrt(abs(sum(p**2 for p in point_line) - average**2 * samples) / float(samples - 1))
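

# A hedged worked check of the formula above (values assumed): for
# point_line = [1, 10] and average = 5.5,
#   sum(p**2) - average**2 * samples == 101 - 60.5 == 40.5,
# and sqrt(40.5 / 1) == ~6.36, matching the textbook unbiased form
# sqrt(((1 - 5.5)**2 + (10 - 5.5)**2) / (2 - 1)).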
def is_prediction_up2date(pred_file, timegroup, params):
    # Check if we need to (re-)compute the prediction file. This is
    # the case if:
    # - no prediction has been done yet for this time group
    # - the prediction from the last time is outdated
    # - the prediction from the last time was done with other parameters
    last_info = cmk.utils.prediction.retrieve_data_for_prediction(pred_file + ".info", timegroup)
    if last_info is None:
        return False

    period_info = prediction_periods[params["period"]]
    now = time.time()
    if last_info["time"] + period_info["valid"] * period_info["slice"] < now:
        logger.verbose("Prediction of %s outdated", timegroup)
        return False

    jsonized_params = json.loads(json.dumps(params))
    if last_info.get('params') != jsonized_params:
        logger.verbose("Prediction parameters have changed.")
        return False

    return True
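

# A hedged example of the validity window (derived from prediction_periods
# above): for the "wday" period, valid == 7 and slice == 86400, so a
# prediction expires 7 * 86400 seconds (one week) after last_info["time"];
# for "minute", it expires after 24 * 3600 seconds, i.e. one day.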
def _clean_predictions_dir(pred_dir, params):
    # Remove all prediction files that result from other
    # prediction periods. This is e.g. needed if the user switches
    # the parameter from 'wday' to 'day'.
    for f in os.listdir(pred_dir):
        if f.endswith(".info"):
            info_file = os.path.join(pred_dir, f)
            info = cmk.utils.prediction.retrieve_data_for_prediction(info_file, '')
            if info is None or info["params"]["period"] != params["period"]:
                cmk.utils.prediction.clean_prediction_files(info_file[:-5], force=True)
# cf: consolidation function (MAX, MIN, AVERAGE)
# levels_factor: this multiplies all absolute levels. For example, the
# cpu.loads check uses it to multiply the levels by the number of CPU
# cores.
def get_levels(hostname, service_description, dsname, params, cf, levels_factor=1.0):
    # Compute timegroup
    now = time.time()
    period_info = prediction_periods[params["period"]]

    timegroup, rel_time = period_info["groupby"](now)

    pred_dir = cmk.utils.prediction.predictions_dir(
        hostname, service_description, dsname, create=True)

    pred_file = os.path.join(pred_dir, timegroup)
    cmk.utils.prediction.clean_prediction_files(pred_file)

    if is_prediction_up2date(pred_file, timegroup, params):
        data_for_pred = cmk.utils.prediction.retrieve_data_for_prediction(pred_file, timegroup)
    else:
        data_for_pred = aggregate_data_for_prediction_and_save(
            hostname, service_description, pred_file, params, period_info, dsname, cf, now)

    # Find reference value in data_for_pred
    index = int(rel_time / data_for_pred["step"])
    reference = dict(zip(data_for_pred["columns"], data_for_pred["points"][index]))
    return cmk.utils.prediction.estimate_levels(reference, params, levels_factor)
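

# A hedged usage sketch (the parameter values below are assumptions for
# illustration, not shipped defaults): a check working with predictive
# levels might call
#
#   params = {
#       "period": "wday",   # one of the prediction_periods keys above
#       "horizon": 90,      # days of history to aggregate
#   }
#   result = get_levels("myhost", "CPU load", "load15", params, "MAX",
#                       levels_factor=4.0)
#
# where levels_factor=4.0 scales the absolute levels, e.g. for a host with
# four CPU cores, and the actual warn/crit values are computed by
# cmk.utils.prediction.estimate_levels() from the stored reference point.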