# -*- coding: utf-8 -*-
#
# Copyright 2010 Thomas Perl and Stefan Kögl. All rights reserved.
#
# Developed for a practical course (Large-scaled distributed computing) at the
# University of Technology Vienna in the 2010 summer term.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
# EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
# OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
from __future__ import absolute_import

import os
import re
import shlex
import subprocess
import sys
import threading
import time
from datetime import datetime

try:
    import BaseHTTPServer
except ImportError:  # Python 3
    import http.server as BaseHTTPServer

from bwmon import util
from bwmon import model
from bwmon import http
class Aggregator(object):
    """Aggregator that merges output from multiple monitors

    This class is used to aggregate the output of several monitors
    (ip_conntrack-based or pipe-based) into one output.
    """

    def __init__(self):
        """Create a new Aggregator object

        The aggregator is initialized with a default update frequency
        of 1, i.e. run() sleeps one second between two updates.
        """
        # NOTE(review): monitors, app_configs and apps are read by
        # add_monitor/set_app_config/run/get_app/group; their initialisation
        # was not visible in the reviewed copy and is restored here.
        self.monitors = []
        self.update_frequency = 1
        self.entries = model.MonitorEntryCollection(self.update_frequency)
        self.app_configs = {}
        self.apps = {}
        self.auto_group = False
        self.notification_configs = []
        self.notification_handler = {}
        # Share the entry collection with the HTTP handler class
        # (presumably so the web interface can serve it -- confirm in bwmon.http)
        http.RequestHandler.monitor = self.entries

    def add_monitor(self, monitor):
        """Add a new monitor to the aggregated result

        The monitor can be a bwmon.pipe.PipeMonitor object or
        a bwmon.monitor.Monitor object (i.e. a system-wide,
        ip_conntrack based monitor).

        @param monitor: A pipe.PipeMonitor or monitor.Monitor object
        """
        self.monitors.append(monitor)

    def add_notification(self, regex, in_threshold, out_threshold, interval, command):
        """Add a notification setting

        A notification entry consists of a regex that specifies for which
        processes/applications it is valid and in/out thresholds.

        @param regex: regular expression that is matched against processes/applications
        @param in_threshold: incoming bandwidth threshold in kB/s
        @param out_threshold: outgoing bandwidth threshold in kB/s
        @param interval: interval in seconds for which the average bandwidth is calculated
        @param command: optional command that shall be executed when a notification is issued
        """
        # The regex is compiled once here; it is searched against app names later
        self.notification_configs.append(
            (re.compile(regex), in_threshold, out_threshold, interval, command))

    def set_app_config(self, app, regex_list):
        """Add a config entry on how to group processes to Applications

        @param app: name of the formed application
        @param regex_list: a list of regular expressions that are matched against the processes full commandline
        """
        self.app_configs[app] = regex_list

    def run(self):
        """Run the aggregator

        This runs the aggregator in an endless loop, printing
        the current usage periodically and sends out pre-set
        notifications when a configured threshold is exceeded.
        An HTTP server on port 8000 is run in a background thread.
        """
        def thread_proc():
            server = BaseHTTPServer.HTTPServer(('', 8000), http.RequestHandler)
            while True:
                server.handle_request()

        t = threading.Thread(target=thread_proc)
        # Daemon thread: the endless HTTP loop must not keep the process
        # alive once the main loop below is interrupted.
        t.daemon = True
        t.start()

        # NOTE(review): the group/compare/output calls were reconstructed
        # from the methods below; confirm against the project history.
        while True:
            for monitor in self.monitors:
                monitor.update(self.entries)
            self.group()
            self.compare()
            self.output()
            time.sleep(self.update_frequency)

    def get_app(self, cmd):
        """Determine the app name given a command

        Returns the name of the application that is assigned
        to a given command (command line) as given by the config.

        Fallback 1: The basename of the command (auto-grouping,
        only if self.auto_group is set).

        Fallback 2: The command itself.

        @param cmd: full command line of a process
        @return: the application name used for grouping
        """
        for app, regex_list in self.app_configs.items():
            if any(re.search(pattern, cmd) for pattern in regex_list):
                return app

        if self.auto_group:
            try:
                cmds = shlex.split(cmd)
            except ValueError:
                # e.g. unbalanced quotes in a process command line
                cmds = cmd.split()
            if cmds:
                return os.path.basename(cmds[0])

        return cmd

    def group(self):
        """Group command-based usage by application

        This takes the current measurements (command line-based)
        and calculates per-app statistics using the rules given
        via set_app_config (see get_app). The previous per-app
        values are discarded first, so self.apps only holds the
        current measurement.
        """
        self.apps = {}
        for bytes_in, bytes_out, cmd in sorted(self.entries.get_usage()):
            app = self.get_app(cmd)
            b_in, b_out = self.apps.get(app, (0, 0))
            self.apps[app] = (b_in + bytes_in, b_out + bytes_out)

    def compare(self):
        """Compare current usage against SLA; send notifications

        Compare the app-based usage values and check if there
        are any SLA violations. If there are any violations, call
        any notification callback as specified in the config.
        """
        for app, (bytes_in, bytes_out) in self.apps.items():
            handlers = self.notification_handler.get(app)
            if handlers is None:
                # App does not have notifications yet: create one handler
                # for every configured notification whose regex matches
                handlers = [
                    NotificationHandler(app, in_threshold, out_threshold,
                                        interval, not_command)
                    for (regex, in_threshold, out_threshold, interval, not_command)
                    in self.notification_configs
                    if regex.search(app)
                ]
                self.notification_handler[app] = handlers

            for handler in handlers:
                # Thresholds are in kB/s; dividing by a float keeps Python 2
                # integer division from truncating the reported values.
                handler.report_data(bytes_in / 1024., bytes_out / 1024.)
                handler.check_notify()

    def output(self):
        """Print the current bandwidth usage to the console

        The output is (by default) grouped by apps. The columns
        in the output: BYTES_IN, BYTES_OUT, APP NAME. Only apps
        with more than 1 KiB in either direction are printed;
        app names longer than 60 characters are shortened.
        """
        for app, (bytes_in, bytes_out) in self.apps.items():
            if bytes_in > 1024. or bytes_out > 1024.:
                if len(app) > 60:
                    app = app[:57] + '...'
                print('%10.2f KiB / %10.2f KiB -- %s' % (bytes_in / 1024., bytes_out / 1024., app))

    def close(self):
        """Close the aggregator and its monitors
        """
        # NOTE(review): loop body reconstructed from the docstring; assumes
        # every monitor provides close() -- confirm.
        for mon in self.monitors:
            mon.close()
class NotificationHandler(object):
    """Handler object for checking SLA thresholds and sending notifications
    """

    def __init__(self, cmd, in_threshold, out_threshold, interval, notify_command):
        """Creates a new NotificationHandler object

        @param cmd: commandline of the monitored process
        @param in_threshold: incoming bandwidth threshold that is configured for the process
        @param out_threshold: outgoing bandwidth threshold that is configured for the process
        @param interval: interval that is configured for the process
        @param notify_command: command that should be called when a notification is issued
        """
        # notify() reads self.cmd for its message
        self.cmd = cmd
        self.in_threshold = in_threshold
        self.out_threshold = out_threshold
        # Presumably RingBuffer keeps the last `interval` samples -- see bwmon.util
        self.in_data = util.RingBuffer(interval)
        self.out_data = util.RingBuffer(interval)
        self.notify_command = notify_command

    def report_data(self, in_value, out_value):
        """Report current usage data to the handler

        @param in_value: currently utilized incoming bandwidth
        @param out_value: currently utilized outgoing bandwidth
        """
        self.in_data.append(in_value)
        self.out_data.append(out_value)

    def check_notify(self):
        """Check if a notification should be issued and issue it if necessary

        The average in-/out-bandwidth for the configured interval is
        calculated and checked against the thresholds. A threshold
        that is falsy (None or 0) disables the check for that direction.
        """
        in_avg = avg(self.in_data.get())
        if self.in_threshold and in_avg > self.in_threshold:
            self.notify('in', self.in_threshold, in_avg)

        out_avg = avg(self.out_data.get())
        if self.out_threshold and out_avg > self.out_threshold:
            self.notify('out', self.out_threshold, out_avg)

    def notify(self, direction, threshold, value):
        """Issue a notification

        This is called by check_notify if a threshold has been exceeded.
        A message is written to stderr and, if configured, the notify
        command is started (without waiting for it).

        @param direction: direction (in or out) for which the threshold was exceeded
        @param threshold: configured threshold for the given direction
        @param value: actual average bandwidth that has exceeded the threshold
        """
        # Bind cmd on both paths; shorten long command lines to 50 characters
        cmd = self.cmd
        if len(cmd) > 50:
            cmd = cmd[:47] + '...'

        # Explicit format instead of the platform-dependent %F %T directives
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        sys.stderr.write("%s: %s exceeding '%s' bandwidth limit %10.2f: %10.2f kB/s (%3.2f %%)\n" %
                         (timestamp, cmd, direction, threshold, value, value / threshold * 100))

        if self.notify_command:
            # Split into an argument list so no shell is involved
            args = shlex.split(self.notify_command)
            subprocess.Popen(args)
def avg(x):
    """Return the arithmetic mean of the numbers in x as a float.

    @param x: an iterable of numbers (consumed once)
    @return: the mean, or 0.0 if x is empty (instead of ZeroDivisionError)
    """
    values = list(x)
    if not values:
        return 0.0
    return float(sum(values)) / len(values)