04a933efe7467de763aa10970cd3a43010a001d0
[bwmon.git] / bwmon / aggregator.py
blob04a933efe7467de763aa10970cd3a43010a001d0
1 # -*- coding: utf-8 -*-
3 from __future__ import absolute_import
5 import time
6 import sys
7 import threading
8 import re
9 import shlex
10 import os.path
11 import BaseHTTPServer
12 from datetime import datetime
14 from bwmon import util
15 from bwmon import model
16 from bwmon import http
class Aggregator(object):
    """Aggregator that merges output from multiple monitors

    This class is used to aggregate the output of several monitors
    (ip_conntrack-based or pipe-based) into one output.
    """

    def __init__(self):
        """Create a new Aggregator object

        The aggregator is initialized with a default update frequency
        of "once per second".
        """
        self.monitors = []
        self.update_frequency = 1
        self.entries = model.MonitorEntryCollection(self.update_frequency)
        # app name -> (bytes_in, bytes_out); rebuilt by group() on every update
        self.apps = {}
        # app name -> list of regexes searched in full command lines
        self.app_configs = {}
        # when True, commands without an app config are grouped by the
        # basename of their executable (see get_app)
        self.auto_group = False
        # list of (compiled regex, in_threshold, out_threshold, interval, command)
        self.notification_configs = []
        # app name -> list of NotificationHandler objects created for that app
        self.notification_handler = {}
        # Expose the entry collection to the HTTP handler as a class
        # attribute, so every request handler instance sees the same object.
        http.RequestHandler.monitor = self.entries

    def add_monitor(self, monitor):
        """Add a new monitor to the aggregated result

        The monitor can be a bwmon.pipe.PipeMonitor object or
        a bwmon.monitor.Monitor object (i.e. a system-wide,
        ip_conntrack based monitor).

        @param monitor: A pipe.PipeMonitor or monitor.Monitor object
        """
        self.monitors.append(monitor)

    def add_notification(self, regex, in_threshold, out_threshold, interval, command):
        """Add a notification setting

        A notification entry consists of a regex that specifies for which
        processes/applications it is valid, and in/out thresholds. The
        regex is searched (unanchored) in the grouped app names.

        @param regex: regular expression that is matched against processes/applications
        @param in_threshold: incoming bandwidth threshold in kB/s
        @param out_threshold: outgoing bandwidth threshold in kB/s
        @param interval: interval in seconds for which the average bandwidth is calculated
        @param command: optional command that shall be executed when a notification is issued
        """
        self.notification_configs.append(
            (re.compile(regex), in_threshold, out_threshold, interval, command))

    def set_app_config(self, app, regex_list):
        """Add a config entry on how to group processes to Applications

        @param app: name of the formed application
        @param regex_list: a list of regular expressions that are searched
            (unanchored, via re.search) in the process's full command line
        """
        self.app_configs[app] = regex_list

    def run(self, http_port=8000):
        """Run the aggregator

        This runs the aggregator in an endless loop, printing
        the current usage periodically and sending out pre-set
        notifications. Before the loop starts, an HTTP server using
        http.RequestHandler is started on a daemon thread.

        @param http_port: TCP port the HTTP server binds to on all
            interfaces (default 8000)
        """
        def serve_http():
            server = BaseHTTPServer.HTTPServer(('', http_port), http.RequestHandler)
            while True:
                server.handle_request()

        http_thread = threading.Thread(target=serve_http)
        # Daemon thread, so it does not keep the process alive once the
        # main loop below is interrupted.
        http_thread.daemon = True
        http_thread.start()

        while True:
            for monitor in self.monitors:
                monitor.update(self.entries)
            self.group()
            self.notify()
            self.output()
            time.sleep(self.update_frequency)

    def get_app(self, cmd):
        """Determine the app name given a command

        Returns the name of the application that is assigned
        to a given command (command line) as given by the config.

        Fallback 1: The basename of the command (auto-grouping).

        Fallback 2: The command itself.

        @param cmd: full command line of a process
        @return: the app name used for grouping
        """
        for app, regex_list in self.app_configs.items():
            if any(re.search(pattern, cmd) for pattern in regex_list):
                return app

        if self.auto_group:
            try:
                argv = shlex.split(cmd)
            except ValueError:
                # Real command lines may contain unbalanced quotes that
                # shlex refuses to parse; plain whitespace splitting still
                # yields a usable executable name.
                argv = cmd.split()
            # An empty command line has no executable to group by.
            if argv:
                return os.path.basename(argv[0])

        return cmd

    def group(self):
        """Group command-based usage by application

        This takes the current measurements (command line-based)
        and calculates per-app statistics using the rules to
        combine the app (see get_app). The result replaces self.apps.
        """
        apps = {}
        # Sorting keeps the accumulation order deterministic.
        for bytes_in, bytes_out, cmd in sorted(self.entries.get_usage()):
            app = self.get_app(cmd)
            b_in, b_out = apps.get(app, (0, 0))
            apps[app] = (b_in + bytes_in, b_out + bytes_out)
        self.apps = apps

    def notify(self):
        """Compare current usage against SLA; send notifications

        Compare the app-based usage values and check if there
        are any SLA violations. If there are any violations, call
        any notification callback as specified in the config.
        """
        for app, (bytes_in, bytes_out) in self.apps.items():
            # First time this app is seen: create one handler for every
            # notification config whose regex matches the app name.
            if app not in self.notification_handler:
                self.notification_handler[app] = [
                    NotificationHandler(app, in_threshold, out_threshold, interval, command)
                    for (regex, in_threshold, out_threshold, interval, command)
                    in self.notification_configs
                    if regex.search(app)]

            # Thresholds are configured in kB/s. Use float division so that
            # fractional kB values are not truncated under Python 2.
            for handler in self.notification_handler[app]:
                handler.report_data(bytes_in / 1024., bytes_out / 1024.)
                handler.check_notify()

    def output(self):
        """Print the current bandwidth usage to the console

        The output is (by default) grouped by apps. The columns
        in the output: BYTES_IN, BYTES_OUT, APP NAME. Apps that
        transferred at most 1 KiB in both directions are omitted.
        """
        util.clear()

        for app, (bytes_in, bytes_out) in self.apps.items():
            if bytes_in > 1024. or bytes_out > 1024.:
                if len(app) > 60:
                    app = app[:57] + '...'
                sys.stdout.write('%10.2f KiB / %10.2f KiB -- %s\n' %
                                 (bytes_in / 1024., bytes_out / 1024., app))
        sys.stdout.flush()

    def close(self):
        """Close the aggregator and its monitors
        """
        for mon in self.monitors:
            mon.close()
class NotificationHandler(object):
    """Handler object for checking SLA thresholds and sending notifications
    """

    def __init__(self, cmd, in_threshold, out_threshold, interval, notify_command):
        """Creates a new NotificationHandler object

        @param cmd: commandline of the monitored process
        @param in_threshold: incoming bandwidth threshold that is configured for the process
        @param out_threshold: outgoing bandwidth threshold that is configured for the process
        @param interval: interval that is configured for the process
        @param notify_command: command that should be called when a notification is issued
        """
        self.cmd = cmd
        self.in_threshold = in_threshold
        self.out_threshold = out_threshold
        # One ring buffer per direction, sized by the averaging interval
        # (presumably one sample per aggregator update -- TODO confirm
        # against util.RingBuffer).
        self.in_data = util.RingBuffer(interval)
        self.out_data = util.RingBuffer(interval)
        self.notify_command = notify_command

    def report_data(self, in_value, out_value):
        """Report current usage data to the handler

        @param in_value: currently utilized incoming bandwidth
        @param out_value: currently utilized outgoing bandwidth
        """
        self.in_data.append(in_value)
        self.out_data.append(out_value)

    def check_notify(self):
        """Check if a notification should be issued and issue it if necessary

        The average in-/out-bandwidth for the configured interval is
        calculated and checked against the thresholds. A threshold that
        is falsy (None or 0) disables the check for that direction.
        """
        in_avg = avg(self.in_data.get())
        if self.in_threshold and in_avg > self.in_threshold:
            self.notify('in', self.in_threshold, in_avg)

        out_avg = avg(self.out_data.get())
        if self.out_threshold and out_avg > self.out_threshold:
            self.notify('out', self.out_threshold, out_avg)

    def notify(self, direction, threshold, value):
        """Issue a notification

        This is called by check_notify if a threshold has been exceeded.
        A message is written to stderr and, if configured, the notify
        command is launched without waiting for it to finish.

        @param direction: direction (in or out) for which the threshold was exceeded
        @param threshold: configured threshold for the given direction
        @param value: actual average bandwidth that has exceeded the threshold
        """
        import subprocess

        if len(self.cmd) > 50:
            cmd = self.cmd[:47] + '...'
        else:
            cmd = self.cmd

        # Explicit directives instead of the glibc-only '%F %T' shorthands.
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        sys.stderr.write(
            "%s: %s exceeding '%s' bandwidth limit %10.2f: %10.2f kB/s (%3.2f %%)\n" %
            (timestamp, cmd, direction, threshold, value,
             float(value) / threshold * 100))

        if self.notify_command:
            args = shlex.split(self.notify_command)
            try:
                subprocess.Popen(args)
            except OSError as e:
                # A missing or non-executable notify command must not take
                # down the monitoring loop; report it and carry on.
                sys.stderr.write('%s: could not run notification command %r: %s\n' %
                                 (timestamp, self.notify_command, e))
def avg(x):
    """Return the arithmetic mean of the values in x as a float.

    Accepts any iterable; it is materialized into a list first, so
    generators work too. An empty input yields 0.0 instead of raising
    ZeroDivisionError, so comparing the result against a positive
    threshold reports no violation.
    """
    values = list(x)
    if not values:
        return 0.0
    return float(sum(values)) / len(values)