add html user manual
[bwmon.git] / bwmon / aggregator.py
blob7997bcf0232663fbf1ed119ee1f70b5446186b68
1 # -*- coding: utf-8 -*-
3 from __future__ import absolute_import
5 import time
6 import sys
7 import threading
8 import re
9 import shlex
10 import os.path
11 import BaseHTTPServer
12 from datetime import datetime
14 from bwmon import util
15 from bwmon import model
16 from bwmon import http
class Aggregator(object):
    """Aggregator that merges output from multiple monitors

    This class is used to aggregate the output of several monitors
    (ip_conntrack-based or pipe-based) into one output.
    """

    def __init__(self):
        """Create a new Aggregator object

        The aggregator is initialized with a default update frequency
        of "once per second".
        """
        self.monitors = []
        self.update_frequency = 1
        self.entries = model.MonitorEntryCollection(self.update_frequency)
        # app name -> (bytes_in, bytes_out); rebuilt by group()
        self.apps = {}
        # app name -> list of regular expressions; see set_app_config()
        self.app_configs = {}
        self.auto_group = False
        # list of (compiled regex, in_threshold, out_threshold, interval,
        # command) tuples; see add_notification()
        self.notification_configs = []
        # app name -> list of NotificationHandler objects; filled by notify()
        self.notification_handler = {}
        # The HTTP request handler reads the entries via a class attribute
        http.RequestHandler.monitor = self.entries

    def add_monitor(self, monitor):
        """Add a new monitor to the aggregated result

        The monitor can be a bwmon.pipe.PipeMonitor object or
        a bwmon.monitor.Monitor object (i.e. a system-wide,
        ip_conntrack based monitor).

        @param monitor: A pipe.PipeMonitor or monitor.Monitor object
        """
        self.monitors.append(monitor)

    def add_notification(self, regex, in_threshold, out_threshold, interval, command):
        """Register a notification rule for matching applications

        The rule is stored and later used by notify() to create one
        NotificationHandler per application whose name matches.

        @param regex: Regular expression (compiled here) that is searched
                      for in the application name
        @param in_threshold: Limit for the averaged incoming usage; a false
                             value (0, None) disables the incoming check
        @param out_threshold: Limit for the averaged outgoing usage; a false
                              value (0, None) disables the outgoing check
        @param interval: Passed to util.RingBuffer by the handler
                         (presumably the number of samples to average over)
        @param command: Command line that is started when a limit is exceeded
        """
        self.notification_configs.append(
                (re.compile(regex), in_threshold, out_threshold, interval, command))

    def set_app_config(self, app, regex_list):
        """Assign a list of command patterns to an application name

        Any previous pattern list for the same application is replaced.

        @param app: Name of the application (used as grouping key)
        @param regex_list: Regular expressions; a command line belongs to
                           the application if any of them is found in it
        """
        self.app_configs[app] = regex_list

    def run(self):
        """Run the aggregator

        This runs the aggregator in an endless loop, printing
        the current usage periodically and sends out pre-set
        notifications.

        A daemon thread serves http.RequestHandler on port 8000
        (all interfaces) while the loop is running.
        """
        def thread_proc():
            server = BaseHTTPServer.HTTPServer(('', 8000), http.RequestHandler)
            while True:
                server.handle_request()

        t = threading.Thread(target=thread_proc)
        # Daemon thread: must not keep the process alive on exit
        t.setDaemon(True)
        t.start()

        while True:
            for monitor in self.monitors:
                monitor.update(self.entries)
            self.group()
            self.notify()
            self.output()
            time.sleep(self.update_frequency)

    def get_app(self, cmd):
        """Determine the app name given a command

        Returns the name of the application that is assigned
        to a given command (command line) as given by the config.

        Fallback 1: The basename of the command (auto-grouping).

        Fallback 2: The command itself.

        @param cmd: The command line to look up
        @return: The application name for the command line
        """
        for app, patterns in self.app_configs.items():
            if any(re.search(pattern, cmd) for pattern in patterns):
                return app

        if self.auto_group:
            try:
                words = shlex.split(cmd)
            except ValueError:
                # Unbalanced quotes in the command line: cannot be split,
                # so fall through to returning the command itself
                words = []
            # An empty or whitespace-only command has no first word
            if words:
                return os.path.basename(words[0])

        return cmd

    def group(self):
        """Group command-based usage by application

        This takes the current measurements (command line-based)
        and calculates per-app statistics using the rules to
        combine the app.

        The result replaces self.apps, a dictionary that maps each
        application name to a (bytes_in, bytes_out) tuple.
        """
        totals = {}
        for bytes_in, bytes_out, cmd in sorted(self.entries.get_usage()):
            app = self.get_app(cmd)
            total_in, total_out = totals.get(app, (0, 0))
            totals[app] = (total_in + bytes_in, total_out + bytes_out)
        self.apps = totals

    def notify(self):
        """Compare current usage against SLA; send notifications

        Compare the app-based usage values and check if there
        are any SLA violations. If there are any violations, call
        any notification callback as specified in the config.
        """
        for (cmd, (bytes_in, bytes_out)) in self.apps.items():

            # App does not have notifications yet
            if cmd not in self.notification_handler:
                handlers = []
                for (regex, in_threshold, out_threshold, interval, not_command) in self.notification_configs:
                    if regex.search(cmd):
                        handlers.append(NotificationHandler(cmd, in_threshold, out_threshold, interval, not_command))
                self.notification_handler[cmd] = handlers

            for handler in self.notification_handler[cmd]:
                # Float divisor: integer division would truncate the
                # sample to whole KiB (matches the scaling in output())
                handler.report_data(bytes_in / 1024.0, bytes_out / 1024.0)
                handler.check_notify()

    def output(self):
        """Print the current bandwidth usage to the console

        The output is (by default) grouped by apps. The columns
        in the output: BYTES_IN, BYTES_OUT, APP NAME

        Apps that used no more than 1 KiB in both directions are
        skipped; names longer than 60 characters are shortened.
        """
        util.clear()

        for (cmd, (bytes_in, bytes_out)) in self.apps.items():
            if bytes_in > 1024. or bytes_out > 1024.:
                if len(cmd) > 60:
                    cmd = cmd[:57] + '...'
                # write() instead of the print statement: same output,
                # and it does not depend on the interpreter version
                sys.stdout.write('%10.2f KiB / %10.2f KiB -- %s\n' %
                        (bytes_in / 1024., bytes_out / 1024., cmd))
        sys.stdout.flush()

    def close(self):
        """Close the aggregator and its monitors

        Calls close() on every monitor added via add_monitor().
        """
        for mon in self.monitors:
            mon.close()
class NotificationHandler(object):
    """Handler object for checking SLA thresholds and sending notifications

    A handler collects usage samples for one application in two ring
    buffers (incoming and outgoing) and compares their averages against
    the configured thresholds.
    """

    def __init__(self, cmd, in_threshold, out_threshold, interval, notify_command):
        """Creates a new NotificationHandler object

        @param cmd: Name of the application (or command) being watched;
                    only used in the notification message
        @param in_threshold: Limit for the average of the incoming samples;
                             a false value (0, None) disables the check
        @param out_threshold: Limit for the average of the outgoing samples;
                              a false value (0, None) disables the check
        @param interval: Passed to util.RingBuffer for both sample buffers
                         (presumably the number of samples that are kept)
        @param notify_command: Command line to start on a violation; a
                               false value (None, '') starts nothing
        """
        self.cmd = cmd
        self.in_threshold = in_threshold
        self.out_threshold = out_threshold
        self.in_data = util.RingBuffer(interval)
        self.out_data = util.RingBuffer(interval)
        self.notify_command = notify_command

    def report_data(self, in_value, out_value):
        """Record one usage sample for each direction

        @param in_value: Incoming usage sample (same unit as in_threshold)
        @param out_value: Outgoing usage sample (same unit as out_threshold)
        """
        self.in_data.append(in_value)
        self.out_data.append(out_value)

    def check_notify(self):
        """Check the averaged samples against the thresholds

        Calls notify() once per direction whose average is above its
        threshold. A direction without a threshold is never reported.
        """
        in_avg = avg(self.in_data.get())
        if self.in_threshold and in_avg > self.in_threshold:
            self.notify('in', self.in_threshold, in_avg)

        out_avg = avg(self.out_data.get())
        if self.out_threshold and out_avg > self.out_threshold:
            self.notify('out', self.out_threshold, out_avg)

    def notify(self, direction, threshold, value):
        """Report a threshold violation and start the notify command

        A message is written to stderr; if a notify command is set, it
        is split like a shell command line and started without waiting
        for it to finish. Problems with starting the command are
        reported on stderr instead of being raised.

        @param direction: Label of the direction ('in' or 'out')
        @param threshold: The limit that has been exceeded (non-zero)
        @param value: The average value that exceeded the limit
        """
        # Local import: subprocess is not among the module-level imports
        import subprocess

        if len(self.cmd) > 50:
            cmd = self.cmd[:47] + '...'
        else:
            cmd = self.cmd

        # '%F %T' are platform-specific shortcuts; spell the format out
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        # 100.0 keeps the percentage a float even for integer arguments
        sys.stderr.write("%s: %s exceeding '%s' bandwidth limit %10.2f: %10.2f kB/s (%3.2f %%)\n" %
                (timestamp, cmd, direction, threshold, value, value * 100.0 / threshold))

        if self.notify_command:
            try:
                args = shlex.split(self.notify_command)
                # A whitespace-only command line yields no arguments
                if args:
                    subprocess.Popen(args)
            except (OSError, ValueError):
                # OSError: the command could not be started;
                # ValueError: the command line has unbalanced quotes.
                # sys.exc_info() is used so that no version-specific
                # "except ... as" syntax is needed.
                error = sys.exc_info()[1]
                sys.stderr.write("%s: could not run notification command %r: %s\n" %
                        (timestamp, self.notify_command, error))
def avg(x):
    """Return the arithmetic mean of a sequence of numbers

    @param x: A sized sequence of numbers (must support len() and sum())
    @return: The mean as a float; 0.0 if the sequence is empty
    """
    count = len(x)
    if count == 0:
        # No samples yet: report "no usage" instead of dividing by zero
        return 0.0
    return float(sum(x)) / count