#!/usr/bin/env python
# -*- encoding: utf-8; py-indent-offset: 4 -*-
# +------------------------------------------------------------------+
# |             ____ _               _        __  __ _  __           |
# |            / ___| |__   ___  ___| | __   |  \/  | |/ /           |
# |           | |   | '_ \ / _ \/ __| |/ /   | |\/| | ' /            |
# |           | |___| | | |  __/ (__|   <    | |  | | . \            |
# |            \____|_| |_|\___|\___|_|\_\___|_|  |_|_|\_\           |
# |                                                                  |
# | Copyright Mathias Kettner 2014             mk@mathias-kettner.de |
# +------------------------------------------------------------------+
#
# This file is part of Check_MK.
# The official homepage is at http://mathias-kettner.de/check_mk.
#
# check_mk is free software;  you can redistribute it and/or modify it
# under the  terms of the  GNU General Public License  as published by
# the Free Software Foundation in version 2.  check_mk is  distributed
# in the hope that it will be useful, but WITHOUT ANY WARRANTY;  with-
# out even the implied warranty of  MERCHANTABILITY  or  FITNESS FOR A
# PARTICULAR PURPOSE. See the  GNU General Public License for more de-
# tails. You should have  received  a copy of the  GNU  General Public
# License along with GNU Make; see the file  COPYING.  If  not,  write
# to the Free Software Foundation, Inc., 51 Franklin St,  Fifth Floor,
# Boston, MA 02110-1301 USA.
26 """Code for predictive monitoring / anomaly detection"""

import json
import math
import os
import time

import cmk.utils.debug
import cmk.utils.defines as defines
import cmk.utils.log
import cmk.utils.prediction
from cmk.utils.exceptions import MKGeneralException

logger = cmk.utils.log.get_logger(__name__)

def day_start(timestamp):
    return (timestamp - cmk.utils.prediction.timezone_at(timestamp)) % 86400


def hour_start(timestamp):
    return (timestamp - cmk.utils.prediction.timezone_at(timestamp)) % 3600
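
# A worked example (a sketch; it assumes cmk.utils.prediction.timezone_at
# returns the offset that shifts epoch time to local wall-clock time): at
# 08:30:00 local time, day_start() yields 8 * 3600 + 30 * 60 = 30600
# seconds since local midnight and hour_start() yields 30 * 60 = 1800
# seconds since the start of the current hour.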

def group_by_wday(t):
    wday = time.localtime(t).tm_wday
    return defines.weekday_ids()[wday], day_start(t)


def group_by_day(t):
    return "everyday", day_start(t)

def group_by_day_of_month(t):
    mday = time.localtime(t).tm_mday
    return str(mday), day_start(t)


def group_by_everyhour(t):
    return "everyhour", hour_start(t)

prediction_periods = {
    "wday": {
        "slice": 86400,  # 7 slices
        "groupby": group_by_wday,
        "valid": 7,
    },
    "day": {
        "slice": 86400,  # 31 slices
        "groupby": group_by_day_of_month,
        "valid": 28,
    },
    "hour": {
        "slice": 86400,  # 1 slice
        "groupby": group_by_day,
        "valid": 1,
    },
    "minute": {
        "slice": 3600,  # 1 slice
        "groupby": group_by_everyhour,
        "valid": 24,
    },
}
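
# Reading this table: "wday" partitions history into 7 day-long groups
# (one per weekday name), "day" into up to 31 groups keyed by the day of
# the month, while "hour" and "minute" each use a single group ("everyday"
# resp. "everyhour") whose pattern repeats every day resp. every hour.
# "valid" is the age, in slices, after which a stored prediction counts
# as outdated (see is_prediction_up2date() below).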

def get_prediction_timegroup(t, period_info):
    """Return:

    timegroup: name of the group, like 'monday' or '12'
    from_time: absolute epoch time of the first second of the current slice
    until_time: absolute epoch time of the first second *not* in the slice
    rel_time: seconds offset of now in the current slice
    """
    # Convert to local timezone
    timegroup, rel_time = period_info["groupby"](t)
    from_time = t - rel_time
    until_time = t - rel_time + period_info["slice"]
    return timegroup, from_time, until_time, rel_time
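
# Example (a sketch, assuming defines.weekday_ids() yields names like
# "monday"): with period_info = prediction_periods["wday"] and t on a
# Monday at 10:00 local time, this returns something like
# ("monday", <local midnight of that Monday>, <local midnight of Tuesday>,
# 36000), since the slice is 86400s wide and rel_time = day_start(t).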

def retrieve_grouped_data_from_rrd(hostname, service_description, timegroup, params, period_info,
                                   from_time, dsname, cf):
    # Collect all slices back into the past until the time horizon
    # is reached
    begin = from_time
    slices = []
    absolute_begin = from_time - params["horizon"] * 86400

    # The resolutions of the different time ranges differ. We interpolate
    # to the best resolution. We assume that the youngest slice has the
    # finest resolution. We also assume that each step is always divisible
    # by the smallest step.

    # Note: due to the f**king DST, we can have several shifts between
    # DST and non-DST during our computation. We need to compensate for
    # those. DST swaps within slices are being ignored. The DST flag
    # is checked against the beginning of the slice.
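    # For illustration: if the youngest slice has a 60s step and an older
    # slice was consolidated by the RRD into a 300s step, that older slice
    # is stored with scale = 300 / 60 = 5.0 below, so consolidate_data()
    # later maps five fine-grained indices onto one coarse data point.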
    smallest_step = None
    while begin >= absolute_begin:
        tg, fr, un = get_prediction_timegroup(begin, period_info)[:3]
        if tg == timegroup:
            step, data = cmk.utils.prediction.get_rrd_data(hostname, service_description, dsname,
                                                           cf, fr, un - 1)
            if smallest_step is None:
                smallest_step = step
            slices.append((fr, step / float(smallest_step), data))
        begin -= period_info["slice"]
    return slices, smallest_step

def consolidate_data(slices):
    # Now we have all the RRD data we need. The next step is to consolidate
    # all that data into one new array.
    num_points = slices[0][2]
    if not num_points:
        raise MKGeneralException("Got no historic metrics")

    consolidated = []
    for i in xrange(len(num_points)):
        point_line = []
        for _from_time, scale, data in slices:
            if not data:
                continue
            idx = int(i / float(scale))  # left data-point mapping
            d = data[idx]
            if d is not None:
                point_line.append(d)

        if point_line:
            average = sum(point_line) / float(len(point_line))
            consolidated.append([
                average,
                min(point_line),
                max(point_line),
                stdev(point_line, average),
            ])
        else:
            consolidated.append([None, None, None, None])

    return consolidated
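
# Shape of the consolidated array: for every point in time within the
# slice one entry [average, min, max, stdev] across all historic slices,
# e.g. [2.5, 1.0, 4.0, 2.12] for the values 1.0 and 4.0, or
# [None, None, None, None] where no slice had data. This matches the
# "columns" list written in aggregate_data_for_prediction_and_save().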

def aggregate_data_for_prediction_and_save(hostname, service_description, pred_file, params,
                                           period_info, dsname, cf, now):
    _clean_predictions_dir(os.path.dirname(pred_file), params)

    timegroup, from_time, until_time, _rel_time = get_prediction_timegroup(now, period_info)
    logger.verbose("Aggregating data for time group %s", timegroup)
    slices, smallest_step = retrieve_grouped_data_from_rrd(
        hostname, service_description, timegroup, params, period_info, from_time, dsname, cf)

    consolidated = consolidate_data(slices)

    data_for_pred = {
        "num_points": len(consolidated),
        "step": smallest_step,
        "columns": ["average", "min", "max", "stdev"],
        "points": consolidated,
    }

    info = {
        "time": now,
        "range": (from_time, until_time),
        "cf": cf,
        "params": params,
        "slice": period_info["slice"],
    }
    with open(pred_file + '.info', "w") as fname:
        json.dump(info, fname)
    with open(pred_file, "w") as fname:
        json.dump(data_for_pred, fname)

    return data_for_pred
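
# A sketch of the two files written above: <pred_file> holds the payload
# {"num_points": ..., "step": ..., "columns": [...], "points": [...]} and
# <pred_file>.info holds the metadata {"time": ..., "range": ..., "cf": ...,
# "params": ..., "slice": ...} that is_prediction_up2date() compares
# against before reusing a stored prediction.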

def stdev(point_line, average):
    samples = len(point_line)
    # In the case of a single data-point an unbiased standard deviation is
    # undefined. In this case we take the magnitude of the measured value
    # itself as a measure of the dispersion.
    if samples == 1:
        return abs(average)
    return math.sqrt(abs(sum(p**2 for p in point_line) - average**2 * samples) / float(samples - 1))
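
# Worked example: stdev([1.0, 4.0], 2.5) computes
# sqrt(abs(1.0 + 16.0 - 6.25 * 2) / 1.0) = sqrt(4.5) ~ 2.12, the unbiased
# (n - 1) sample standard deviation, while stdev([3.0], 3.0) falls back to
# abs(3.0) as described in the comment above.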

def is_prediction_up2date(pred_file, timegroup, params):
    # Check, if we need to (re-)compute the prediction file. This is
    # the case if:
    # - no prediction has been done yet for this time group
    # - the prediction from the last time is outdated
    # - the prediction from the last time was done with other parameters

    last_info = cmk.utils.prediction.retrieve_data_for_prediction(pred_file + ".info", timegroup)
    if last_info is None:
        return False

    period_info = prediction_periods[params["period"]]
    now = time.time()
    if last_info["time"] + period_info["valid"] * period_info["slice"] < now:
        logger.verbose("Prediction of %s outdated", timegroup)
        return False

    jsonized_params = json.loads(json.dumps(params))
    if last_info.get('params') != jsonized_params:
        logger.verbose("Prediction parameters have changed.")
        return False

    return True

def _clean_predictions_dir(pred_dir, params):
    # Remove all prediction files that result from other
    # prediction periods. This is e.g. needed if the user switches
    # the parameter from 'wday' to 'day'.
    for f in os.listdir(pred_dir):
        if f.endswith(".info"):
            info_file = os.path.join(pred_dir, f)
            info = cmk.utils.prediction.retrieve_data_for_prediction(info_file, '')
            if info is None or info["params"]["period"] != params["period"]:
                cmk.utils.prediction.clean_prediction_files(info_file[:-5], force=True)

# cf: consolidation function (MAX, MIN, AVERAGE)
# levels_factor: this multiplies all absolute levels. Used for example
# in the cpu.loads check to multiply the levels by the number of CPU
# cores.
def get_levels(hostname, service_description, dsname, params, cf, levels_factor=1.0):
    now = time.time()
    period_info = prediction_periods[params["period"]]

    timegroup, rel_time = period_info["groupby"](now)

    pred_dir = cmk.utils.prediction.predictions_dir(
        hostname, service_description, dsname, create=True)

    pred_file = os.path.join(pred_dir, timegroup)
    cmk.utils.prediction.clean_prediction_files(pred_file)

    if is_prediction_up2date(pred_file, timegroup, params):
        data_for_pred = cmk.utils.prediction.retrieve_data_for_prediction(pred_file, timegroup)
    else:
        data_for_pred = aggregate_data_for_prediction_and_save(
            hostname, service_description, pred_file, params, period_info, dsname, cf, now)

    # Find reference value in data_for_pred
    index = int(rel_time / data_for_pred["step"])
    reference = dict(zip(data_for_pred["columns"], data_for_pred["points"][index]))
    return cmk.utils.prediction.estimate_levels(reference, params, levels_factor)
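
# Hypothetical usage from a check plugin (a sketch only; the host, service
# and datasource names as well as the params keys are made-up examples of
# "predictive" levels parameters, not a call site from this file):
#
#   params = {"period": "wday", "horizon": 90,
#             "levels_upper": ("relative", (10.0, 20.0))}
#   result = get_levels("myhost", "CPU load", "load15", params, "MAX",
#                       levels_factor=4.0)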