04a933efe7467de763aa10970cd3a43010a001d0
1 # -*- coding: utf-8 -*-
3 from __future__
import absolute_import
12 from datetime
import datetime
14 from bwmon
import util
15 from bwmon
import model
16 from bwmon
import http
18 class Aggregator(object):
19 """Aggregator that merges output from multiple monitors
21 This class is used to aggregate the output of several monitors
22 (ip_conntrack-based or pipe-based) into one output.
26 """Create a new Aggregator object
28 The aggregator is initialized with a default update frequency
32 self
.update_frequency
= 1
33 self
.entries
= model
.MonitorEntryCollection(self
.update_frequency
)
36 self
.auto_group
= False
37 self
.notification_configs
= []
38 self
.notification_handler
= {}
39 http
.RequestHandler
.monitor
= self
.entries
41 def add_monitor(self
, monitor
):
42 """Add a new monitor to the aggregated result
44 The monitor can be a bwmon.pipe.PipeMonitor object or
45 a bwmon.monitor.Monitor object (i.e. a system-wide,
46 ip_conntrack based monitor).
48 @param monitor: A pipe.PipeMonitor or monitor.Monitor object
50 self
.monitors
.append(monitor
)
53 def add_notification(self
, regex
, in_threshold
, out_threshold
, interval
, command
):
54 """Add a notification setting
56 A notification entry consists of a regex that specifies for which
57 processes/applications it is valid and in/out thresholds
59 @param regex: regular expression that is matched against processes/applications
60 @param in_threshold: incoming bandwidth threshold in kB/s
61 @param out_threshold: outgoing bandwidth threshold in kB/s
62 @param interval: interval in seconds for which the average bandwidth is calculated
63 @param command: optional command that shall be executed when a notification is issued
65 self
.notification_configs
.append( (re
.compile(regex
), in_threshold
, out_threshold
, interval
, command
) )
68 def set_app_config(self
, app
, regex_list
):
69 """Add a config entry on how to group processes to Applications
71 @param app: name of the formed application
72 @param regex_list: a list of regular expressions that are matched against the processes full commandline
74 self
.app_configs
[app
] = regex_list
80 This runs the aggregator in an endless loop, printing
81 the current usage periodically and sends out pre-set
85 server
= BaseHTTPServer
.HTTPServer(('', 8000), http
.RequestHandler
)
87 server
.handle_request()
88 t
= threading
.Thread(target
=thread_proc
)
93 for monitor
in self
.monitors
:
94 monitor
.update(self
.entries
)
98 time
.sleep(self
.update_frequency
)
100 def get_app(self
, cmd
):
101 """Determine the app name given a command
103 Returns the name of the application that is assigned
104 to a given command (command line) as given by the config.
106 Fallback 1: The basename of the command (auto-grouping).
108 Fallback 2: The command itself.
110 for app
, regex
in self
.app_configs
.iteritems():
111 if any([re
.search(x
, cmd
) for x
in regex
]):
115 cmds
= shlex
.split(cmd
)
116 return os
.path
.basename(cmds
[0])
121 """Group command-based usage by application
123 This takes the current measurements (command line-based)
124 and calculates per-app statistics using the rules to
128 entries
= sorted(self
.entries
.get_usage())
129 for bytes_in
, bytes_out
, cmd
in entries
:
130 cmd
= self
.get_app(cmd
)
131 (b_in
, b_out
) = self
.apps
[cmd
] if cmd
in self
.apps
else (0, 0)
132 self
.apps
[cmd
] = (b_in
+ bytes_in
, b_out
+ bytes_out
)
135 """Compare current usage against SLA; send notifications
137 Compare the app-based usage values and check if there
138 are any SLA violations. If there are any violations, call
139 any notifiation callback as specified in the config.
141 for (cmd
, (bytes_in
, bytes_out
)) in self
.apps
.iteritems():
143 # App does not have notifications yet
144 if not cmd
in self
.notification_handler
:
145 self
.notification_handler
[cmd
] = []
146 for (regex
, in_threshold
, out_threshold
, interval
, not_command
) in self
.notification_configs
:
147 if regex
.search(cmd
):
148 self
.notification_handler
[cmd
].append(NotificationHandler(cmd
, in_threshold
, out_threshold
, interval
, not_command
))
151 for handler
in self
.notification_handler
[cmd
]:
152 handler
.report_data(bytes_in
/ 1024, bytes_out
/ 1024)
153 handler
.check_notify()
157 """Print the current bandwidth usage to the console
159 The output is (by default) grouped by apps. The columns
160 in the output: BYTES_IN, BYTES_OUT, APP NAME
164 for (cmd
, (bytes_in
, bytes_out
)) in self
.apps
.iteritems():
165 if bytes_in
> 1024. or bytes_out
> 1024.:
167 cmd
= cmd
[:57] + '...'
168 print '%10.2f KiB / %10.2f KiB -- %s' % (bytes_in
/1024., bytes_out
/1024., cmd
)
173 """Close the aggregator and its monitors
175 for mon
in self
.monitors
:
181 class NotificationHandler(object):
182 """Handler object for checking SLA thresholds and sending notifications
185 def __init__(self
, cmd
, in_threshold
, out_threshold
, interval
, notify_command
):
186 """Creates a new NotificationHandler object
188 @param cmd: commandline of the monitored process
189 @param in_threshold: incoming bandwidth threshold that is configured for the process
190 @param out_threshold: outgoing bandwidth threshold that is configured for the process
191 @param interval: interval that is configured for the process
192 @param notify_command: command that should be called when a notification is issued
195 self
.in_threshold
= in_threshold
196 self
.out_threshold
= out_threshold
197 self
.in_data
= util
.RingBuffer(interval
)
198 self
.out_data
= util
.RingBuffer(interval
)
199 self
.notify_command
= notify_command
201 def report_data(self
, in_value
, out_value
):
202 """Report current usage data to the handler
204 @param in_value: currently utilized incoming bandwidth
205 @param out_value: currently utilized outgoing bandiwdht
207 self
.in_data
.append(in_value
)
208 self
.out_data
.append(out_value
)
211 def check_notify(self
):
212 """Check if a notification should be issued and issue it if necessary
214 The average in-/out-bandwidth for the configured interval is
215 calculated and checked against the thresholds
217 in_avg
= avg(self
.in_data
.get())
218 if self
.in_threshold
and in_avg
> self
.in_threshold
:
219 self
.notify('in', self
.in_threshold
, in_avg
)
221 out_avg
= avg(self
.out_data
.get())
222 if self
.out_threshold
and out_avg
> self
.out_threshold
:
223 self
.notify('out', self
.out_threshold
, out_avg
)
225 def notify(self
, direction
, threshold
, value
):
226 """Issue a notification
228 This is called by check_notify if a threshold has been exceeded
230 @param direction: direction (in or out) for which the threshold was exceeded
231 @param threshold: configured threshold for the given direction
232 @param value: actual average bandwidth that has exceeded the threshold
235 if len(self
.cmd
) > 50:
236 cmd
= self
.cmd
[:47] + '...'
240 print >> sys
.stderr
, "%s: %s exceeding '%s' bandwidth limit %10.2f: %10.2f kB/s (%3.2f %%)" % \
241 (datetime
.strftime(datetime
.now(), '%F %T'), cmd
, direction
, threshold
, value
, value
/ threshold
* 100)
244 if self
.notify_command
:
246 args
= shlex
.split(self
.notify_command
)
249 subprocess
.Popen(args
)
252 def avg(x
): return float(sum(x
)) / len(x
)