Apply BSD-style license
[bwmon.git] / bwmon / aggregator.py
blob4e387a23fefffd4b300b72bc89ad6da7a48619aa
1 # -*- coding: utf-8 -*-
3 # Copyright 2010 Thomas Perl and Stefan Kögl. All rights reserved.
5 # Developed for a practical course (Large-scaled distributed computing) at the
6 # University of Technology Vienna in the 2010 summer term.
8 # Redistribution and use in source and binary forms, with or without
9 # modification, are permitted provided that the following conditions are met:
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
14 # 2. Redistributions in binary form must reproduce the above copyright notice,
15 # this list of conditions and the following disclaimer in the documentation
16 # and/or other materials provided with the distribution.
18 # THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS OR IMPLIED
19 # WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
20 # MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
21 # EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
22 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
24 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
25 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
26 # OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
27 # ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 from __future__ import absolute_import
32 import time
33 import sys
34 import threading
35 import re
36 import shlex
37 import os.path
38 import BaseHTTPServer
39 from datetime import datetime
41 from bwmon import util
42 from bwmon import model
43 from bwmon import http
class Aggregator(object):
    """Aggregator that merges output from multiple monitors

    This class is used to aggregate the output of several monitors
    (ip_conntrack-based or pipe-based) into one output.
    """

    def __init__(self):
        """Create a new Aggregator object

        The aggregator is initialized with a default update frequency
        of "once per second".
        """
        self.monitors = []
        # Seconds slept between two update rounds in run()
        self.update_frequency = 1
        self.entries = model.MonitorEntryCollection(self.update_frequency)
        # app name -> (bytes_in, bytes_out); rebuilt from scratch by group()
        self.apps = {}
        # app name -> list of regexes matched against full command lines
        self.app_configs = {}
        # If true, commands not matched by app_configs are grouped by the
        # basename of their executable (see get_app)
        self.auto_group = False
        # Tuples of (compiled regex, in_threshold, out_threshold, interval, command)
        self.notification_configs = []
        # app name -> list of NotificationHandler objects, created lazily in notify()
        self.notification_handler = {}
        # Set on the handler *class*, so every HTTP request handler instance
        # serves data from this aggregator's entry collection.
        http.RequestHandler.monitor = self.entries

    def add_monitor(self, monitor):
        """Add a new monitor to the aggregated result

        The monitor can be a bwmon.pipe.PipeMonitor object or
        a bwmon.monitor.Monitor object (i.e. a system-wide,
        ip_conntrack based monitor).

        @param monitor: A pipe.PipeMonitor or monitor.Monitor object
        """
        self.monitors.append(monitor)

    def add_notification(self, regex, in_threshold, out_threshold, interval, command):
        """Add a notification setting

        A notification entry consists of a regex that specifies for which
        processes/applications it is valid and in/out thresholds

        @param regex: regular expression that is matched against processes/applications
        @param in_threshold: incoming bandwidth threshold in kB/s
        @param out_threshold: outgoing bandwidth threshold in kB/s
        @param interval: interval in seconds for which the average bandwidth is calculated
        @param command: optional command that shall be executed when a notification is issued
        """
        self.notification_configs.append((re.compile(regex), in_threshold, out_threshold, interval, command))

    def set_app_config(self, app, regex_list):
        """Add a config entry on how to group processes to Applications

        @param app: name of the formed application
        @param regex_list: a list of regular expressions that are matched against the processes full commandline
        """
        self.app_configs[app] = regex_list

    def run(self, port=8000):
        """Run the aggregator

        This starts an HTTP server (bwmon.http.RequestHandler) in a
        daemon thread and then runs the aggregator in an endless loop,
        printing the current usage periodically and sending out pre-set
        notifications. This method does not return.

        @param port: TCP port the HTTP server listens on (all interfaces)
        """
        def thread_proc():
            server = BaseHTTPServer.HTTPServer(('', port), http.RequestHandler)
            while True:
                server.handle_request()

        t = threading.Thread(target=thread_proc)
        # Daemon thread: it must not keep the process alive on exit
        t.setDaemon(True)
        t.start()

        while True:
            for monitor in self.monitors:
                monitor.update(self.entries)
            self.group()
            self.notify()
            self.output()
            time.sleep(self.update_frequency)

    def get_app(self, cmd):
        """Determine the app name given a command

        Returns the name of the application that is assigned
        to a given command (command line) as given by the config.

        Fallback 1: The basename of the command (auto-grouping).

        Fallback 2: The command itself.

        @param cmd: full command line of a process
        @return: the application name
        """
        for app, regex_list in self.app_configs.items():
            if any(re.search(pattern, cmd) for pattern in regex_list):
                return app

        if self.auto_group:
            try:
                args = shlex.split(cmd)
            except ValueError:
                # e.g. unbalanced quotes in the command line; a plain
                # whitespace split still yields a usable executable name.
                args = cmd.split()
            # An empty/whitespace-only command line has no executable
            # name; fall through instead of raising IndexError.
            if args:
                return os.path.basename(args[0])

        return cmd

    def group(self):
        """Group command-based usage by application

        This takes the current measurements (command line-based)
        and calculates per-app statistics using the rules to
        combine the app.
        """
        self.apps = {}
        for bytes_in, bytes_out, cmd in sorted(self.entries.get_usage()):
            app = self.get_app(cmd)
            b_in, b_out = self.apps.get(app, (0, 0))
            self.apps[app] = (b_in + bytes_in, b_out + bytes_out)

    def notify(self):
        """Compare current usage against SLA; send notifications

        Compare the app-based usage values and check if there
        are any SLA violations. If there are any violations, call
        any notification callback as specified in the config.

        Handlers for an app are created the first time the app is seen,
        one per notification config whose regex matches the app name.
        """
        for app, (bytes_in, bytes_out) in self.apps.items():
            handlers = self.notification_handler.get(app)
            if handlers is None:
                # App does not have notifications yet
                handlers = []
                for (regex, in_threshold, out_threshold, interval, not_command) in self.notification_configs:
                    if regex.search(app):
                        handlers.append(NotificationHandler(app, in_threshold, out_threshold, interval, not_command))
                self.notification_handler[app] = handlers

            for handler in handlers:
                # Float division (as in output()): integer division under
                # Python 2 would truncate sub-KiB amounts to zero.
                handler.report_data(bytes_in / 1024., bytes_out / 1024.)
                handler.check_notify()

    def output(self):
        """Print the current bandwidth usage to the console

        The output is (by default) grouped by apps. The columns
        in the output: BYTES_IN, BYTES_OUT, APP NAME

        Apps with at most 1 KiB in both directions are not shown.
        """
        util.clear()

        for app, (bytes_in, bytes_out) in self.apps.items():
            if bytes_in > 1024. or bytes_out > 1024.:
                if len(app) > 60:
                    app = app[:57] + '...'
                # sys.stdout.write instead of the print statement keeps the
                # module parseable under both Python 2 and 3.
                sys.stdout.write('%10.2f KiB / %10.2f KiB -- %s\n' % (bytes_in/1024., bytes_out/1024., app))
        sys.stdout.flush()

    def close(self):
        """Close the aggregator and its monitors
        """
        for mon in self.monitors:
            mon.close()
class NotificationHandler(object):
    """Handler object for checking SLA thresholds and sending notifications

    A handler collects reported in/out usage values in two ring buffers
    and issues a notification whenever the average of the buffered
    values exceeds a configured threshold.
    """

    def __init__(self, cmd, in_threshold, out_threshold, interval, notify_command):
        """Creates a new NotificationHandler object

        @param cmd: commandline of the monitored process (or app name)
        @param in_threshold: incoming bandwidth threshold that is configured for the process;
            a false value (None/0) disables the incoming check
        @param out_threshold: outgoing bandwidth threshold that is configured for the process;
            a false value (None/0) disables the outgoing check
        @param interval: interval that is configured for the process; passed to
            util.RingBuffer (presumably its capacity, i.e. the averaging window)
        @param notify_command: command that should be called when a notification is issued
        """
        self.cmd = cmd
        self.in_threshold = in_threshold
        self.out_threshold = out_threshold
        self.in_data = util.RingBuffer(interval)
        self.out_data = util.RingBuffer(interval)
        self.notify_command = notify_command

    def report_data(self, in_value, out_value):
        """Report current usage data to the handler

        @param in_value: currently utilized incoming bandwidth
        @param out_value: currently utilized outgoing bandwidth
        """
        self.in_data.append(in_value)
        self.out_data.append(out_value)

    def check_notify(self):
        """Check if a notification should be issued and issue it if necessary

        The average in-/out-bandwidth for the configured interval is
        calculated and checked against the thresholds. A notification is
        issued on every call for which the average exceeds a threshold.
        """
        in_avg = avg(self.in_data.get())
        if self.in_threshold and in_avg > self.in_threshold:
            self.notify('in', self.in_threshold, in_avg)

        out_avg = avg(self.out_data.get())
        if self.out_threshold and out_avg > self.out_threshold:
            self.notify('out', self.out_threshold, out_avg)

    def notify(self, direction, threshold, value):
        """Issue a notification

        This is called by check_notify if a threshold has been exceeded.
        A message is written to stderr and, if configured, the notify
        command is started without waiting for it. A failure to start the
        command is reported on stderr instead of being raised, so a bad
        command does not stop the monitoring loop.

        @param direction: direction (in or out) for which the threshold was exceeded
        @param threshold: configured threshold for the given direction (non-zero)
        @param value: actual average bandwidth that has exceeded the threshold
        """
        if len(self.cmd) > 50:
            cmd = self.cmd[:47] + '...'
        else:
            cmd = self.cmd

        # Explicit fields instead of the glibc-only %F / %T shorthands
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        sys.stderr.write("%s: %s exceeding '%s' bandwidth limit %10.2f: %10.2f kB/s (%3.2f %%)\n" %
                         (timestamp, cmd, direction, threshold, value, value / threshold * 100))

        if self.notify_command:
            import subprocess
            try:
                args = shlex.split(self.notify_command)
                subprocess.Popen(args)
            except (OSError, ValueError):
                # e.g. command not found or malformed quoting in the config
                err = sys.exc_info()[1]
                sys.stderr.write("%s: could not run notification command %r: %s\n" %
                                 (timestamp, self.notify_command, err))
def avg(x):
    """Return the arithmetic mean of the numbers in x as a float.

    x may be any iterable (list, ring buffer contents, generator).
    An empty input yields 0.0 instead of raising ZeroDivisionError,
    so an empty measurement window reads as "no traffic".
    """
    values = list(x)
    if not values:
        return 0.0
    # float() forces true division under Python 2
    return float(sum(values)) / len(values)