1 # -*- coding: utf-8 -*-
3 from __future__
import absolute_import
12 from datetime
import datetime
14 from bwmon
import util
15 from bwmon
import model
16 from bwmon
import http
18 class Aggregator(object):
19 """Aggregator that merges output from multiple monitors
21 This class is used to aggregate the output of several monitors
22 (ip_conntrack-based or pipe-based) into one output.
26 """Create a new Aggregator object
28 The aggregator is initialized with a default update frequency
32 self
.update_frequency
= 1
33 self
.entries
= model
.MonitorEntryCollection(self
.update_frequency
)
36 self
.auto_group
= False
37 self
.notification_configs
= []
38 self
.notification_handler
= {}
39 http
.RequestHandler
.monitor
= self
.entries
41 def add_monitor(self
, monitor
):
42 """Add a new monitor to the aggregated result
44 The monitor can be a bwmon.pipe.PipeMonitor object or
45 a bwmon.monitor.Monitor object (i.e. a system-wide,
46 ip_conntrack based monitor).
48 @param monitor: A pipe.PipeMonitor or monitor.Monitor object
50 self
.monitors
.append(monitor
)
53 def add_notification(self
, regex
, in_threshold
, out_threshold
, interval
, command
):
57 @param in_threshold: TODO
58 @param out_threshold: TODO
62 self
.notification_configs
.append( (re
.compile(regex
), in_threshold
, out_threshold
, interval
, command
) )
65 def set_app_config(self
, app
, regex_list
):
69 @param regex_list: TODO
71 self
.app_configs
[app
] = regex_list
77 This runs the aggregator in an endless loop, printing
78 the current usage periodically and sends out pre-set
82 server
= BaseHTTPServer
.HTTPServer(('', 8000), http
.RequestHandler
)
84 server
.handle_request()
85 t
= threading
.Thread(target
=thread_proc
)
90 for monitor
in self
.monitors
:
91 monitor
.update(self
.entries
)
95 time
.sleep(self
.update_frequency
)
97 def get_app(self
, cmd
):
98 """Determine the app name given a command
100 Returns the name of the application that is assigned
101 to a given command (command line) as given by the config.
103 Fallback 1: The basename of the command (auto-grouping).
105 Fallback 2: The command itself.
107 for app
, regex
in self
.app_configs
.iteritems():
108 if any([re
.search(x
, cmd
) for x
in regex
]):
112 cmds
= shlex
.split(cmd
)
113 return os
.path
.basename(cmds
[0])
118 """Group command-based usage by application
120 This takes the current measurements (command line-based)
121 and calculates per-app statistics using the rules to
125 entries
= sorted(self
.entries
.get_usage())
126 for bytes_in
, bytes_out
, cmd
in entries
:
127 cmd
= self
.get_app(cmd
)
128 (b_in
, b_out
) = self
.apps
[cmd
] if cmd
in self
.apps
else (0, 0)
129 self
.apps
[cmd
] = (b_in
+ bytes_in
, b_out
+ bytes_out
)
132 """Compare current usage against SLA; send notifications
134 Compare the app-based usage values and check if there
135 are any SLA violations. If there are any violations, call
136 any notifiation callback as specified in the config.
138 for (cmd
, (bytes_in
, bytes_out
)) in self
.apps
.iteritems():
140 # App does not have notifications yet
141 if not cmd
in self
.notification_handler
:
142 self
.notification_handler
[cmd
] = []
143 for (regex
, in_threshold
, out_threshold
, interval
, not_command
) in self
.notification_configs
:
144 if regex
.search(cmd
):
145 self
.notification_handler
[cmd
].append(NotificationHandler(cmd
, in_threshold
, out_threshold
, interval
, not_command
))
148 for handler
in self
.notification_handler
[cmd
]:
149 handler
.report_data(bytes_in
/ 1024, bytes_out
/ 1024)
150 handler
.check_notify()
154 """Print the current bandwidth usage to the console
156 The output is (by default) grouped by apps. The columns
157 in the output: BYTES_IN, BYTES_OUT, APP NAME
161 for (cmd
, (bytes_in
, bytes_out
)) in self
.apps
.iteritems():
162 if bytes_in
> 1024. or bytes_out
> 1024.:
164 cmd
= cmd
[:57] + '...'
165 print '%10.2f KiB / %10.2f KiB -- %s' % (bytes_in
/1024., bytes_out
/1024., cmd
)
170 """Close the aggregator and its monitors
172 for mon
in self
.monitors
:
178 class NotificationHandler(object):
179 """Handler object for checking SLA thresholds and sending notifications
182 def __init__(self
, cmd
, in_threshold
, out_threshold
, interval
, notify_command
):
183 """Creates a new NotificationHandler object
186 @param in_threshold: TODO
187 @param out_threshold: TODO
188 @param interval: TODO
189 @param notify_command: TODO
192 self
.in_threshold
= in_threshold
193 self
.out_threshold
= out_threshold
194 self
.in_data
= util
.RingBuffer(interval
)
195 self
.out_data
= util
.RingBuffer(interval
)
196 self
.notify_command
= notify_command
198 def report_data(self
, in_value
, out_value
):
201 @param in_value: TODO
202 @param out_value: TODO
204 self
.in_data
.append(in_value
)
205 self
.out_data
.append(out_value
)
208 def check_notify(self
):
211 in_avg
= avg(self
.in_data
.get())
212 if self
.in_threshold
and in_avg
> self
.in_threshold
:
213 self
.notify('in', self
.in_threshold
, in_avg
)
215 out_avg
= avg(self
.out_data
.get())
216 if self
.out_threshold
and out_avg
> self
.out_threshold
:
217 self
.notify('out', self
.out_threshold
, out_avg
)
219 def notify(self
, direction
, threshold
, value
):
222 @param direction: TODO
223 @param threshold: TODO
227 if len(self
.cmd
) > 50:
228 cmd
= self
.cmd
[:47] + '...'
232 print >> sys
.stderr
, "%s: %s exceeding '%s' bandwidth limit %10.2f: %10.2f kB/s (%3.2f %%)" % \
233 (datetime
.strftime(datetime
.now(), '%F %T'), cmd
, direction
, threshold
, value
, value
/ threshold
* 100)
236 if self
.notify_command
:
238 args
= shlex
.split(self
.notify_command
)
241 subprocess
.Popen(args
)
244 def avg(x
): return float(sum(x
)) / len(x
)