Apply BSD-style license
[bwmon.git] / bwmon / pipe.py
blob504b595a9f0751fd4fae03fecff26e0ea9ae81eb
1 # -*- coding: utf-8 -*-
3 # Copyright 2010 Thomas Perl and Stefan Kögl. All rights reserved.
5 # Developed for a practical course (Large-scaled distributed computing) at the
6 # University of Technology Vienna in the 2010 summer term.
8 # Redistribution and use in source and binary forms, with or without
9 # modification, are permitted provided that the following conditions are met:
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
14 # 2. Redistributions in binary form must reproduce the above copyright notice,
15 # this list of conditions and the following disclaimer in the documentation
16 # and/or other materials provided with the distribution.
18 # THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS OR IMPLIED
19 # WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
20 # MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
21 # EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
22 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
24 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
25 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
26 # OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
27 # ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 from __future__ import absolute_import
32 import sys
33 import socket
34 import threading
35 import time
37 from bwmon import model
38 from bwmon import util
class PipeThread(threading.Thread):
    """A uni-directional data pipe thread

    Instances of this class will read from one socket
    ("source") and write the read data to the other
    socket ("sink"). The amount of bytes transferred
    will be logged internally and can be retrieved
    from the "traffic" attribute.
    """

    # Maximum number of bytes requested from the source per recv() call
    BUFFER_SIZE = 1024

    def __init__(self, source, sink):
        """Create a new PipeThread object

        @param source: The source socket to be read from (via recv())
        @param sink: The destination socket to be written to (via sendall())
        """
        threading.Thread.__init__(self)
        self.source = source
        self.sink = sink
        self.traffic = 0
        self.finished = False

    def run(self):
        """Run this thread (start forwarding data)

        Data will be forwarded from the source to the sink
        (as given to the constructor), and the "traffic"
        attribute will be updated to tell the number of bytes
        read so far. The attribute "finished" will be set to
        True when the transfer is complete, i.e. when the
        source reports end-of-stream or a socket error occurs
        on either side.
        """
        try:
            while True:
                data = self.source.recv(self.BUFFER_SIZE)
                if not data:
                    # An empty read means the source has been closed
                    break
                self.traffic += len(data)
                # sendall() keeps writing until the whole chunk is out;
                # a plain send() may accept only a part of the buffer
                # and the remainder would be lost.
                self.sink.sendall(data)
        except socket.error:
            # A broken connection on either side ends the transfer
            pass
        finally:
            # Always flag completion (even on unexpected errors), so
            # that the owner of this thread can retire it.
            self.finished = True
class Pipe(threading.Thread):
    """A data pipe from a local port to a (possibly remote) port

    The pipe listens on a local port. For every accepted connection,
    a new connection to the target host/port is opened and two
    PipeThread objects (one per direction) forward the data.
    """

    def __init__(self, port, newhost, newport):
        """Create a new Pipe object

        @param port: The source (listening) port
        @param newhost: The target hostname
        @param newport: The target port
        """
        threading.Thread.__init__(self)
        self.closed = False
        self.port = port
        self.newhost = newhost
        self.newport = newport
        # Active forwarding threads (client->target and target->client)
        self.pipes_in = []
        self.pipes_out = []
        # Traffic of already retired (finished) forwarding threads
        self.total_in = 0
        self.total_out = 0
        self.setup()

    def setup(self):
        """Setup the internal state of this object

        This will automatically be called by the constructor,
        and there should never be the need to call this from
        outside.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.bind(('', self.port))
            # The timeout makes accept() return periodically, so that
            # run() can notice when the pipe has been closed.
            sock.settimeout(5)
            sock.listen(5)
        except socket.error:
            # Do not leak the socket when the port cannot be used
            sock.close()
            raise
        self.sock = sock

    @staticmethod
    def _collect(pipes, retired_total):
        """Sum up the traffic of pipes and retire finished ones

        Finished pipes are removed from the list "pipes" (in-place).

        @param pipes: A list of PipeThread-like objects
        @param retired_total: Traffic of previously retired pipes
        @return: A (current_sum, new_retired_total) tuple
        """
        current_sum = retired_total
        for pipe in list(pipes):
            # Read "finished" first and "traffic" only once: PipeThread
            # updates its traffic before it sets the finished flag, so
            # a finished pipe always reports its final traffic here and
            # both sums are based on the very same value.
            finished = pipe.finished
            traffic = pipe.traffic
            current_sum += traffic
            if finished:
                retired_total += traffic
                pipes.remove(pipe)
        return current_sum, retired_total

    def update(self):
        """Update the internal traffic stats and retire threads

        This method should be called periodically to sum up the
        traffic amounts. The total traffic of finished threads
        will be summed up, and the threads will be removed from
        the internal data structures.

        @return: A (bytes_in, bytes_out) tuple with the total traffic
        """
        sum_in, self.total_in = self._collect(self.pipes_in, self.total_in)
        sum_out, self.total_out = self._collect(self.pipes_out,
                                                self.total_out)
        return sum_in, sum_out

    def run(self):
        """Start the pipe, including its "child" pipes

        This will listen for new connections until the
        pipe itself is closed. The listening socket is
        closed when this method returns.
        """
        try:
            while not self.closed:
                try:
                    newsock, _ = self.sock.accept()
                except socket.timeout:
                    # Nothing to accept - check the "closed" flag again
                    continue

                fwd = None
                try:
                    fwd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    fwd.connect((self.newhost, self.newport))
                except socket.error:
                    # The target is not reachable: drop this client, but
                    # keep the pipe alive for subsequent connections.
                    if fwd is not None:
                        fwd.close()
                    newsock.close()
                    continue

                in_pipe = PipeThread(newsock, fwd)
                out_pipe = PipeThread(fwd, newsock)
                self.pipes_in.append(in_pipe)
                self.pipes_out.append(out_pipe)
                in_pipe.start()
                out_pipe.start()
        finally:
            # Release the listening port once the pipe stops accepting
            self.sock.close()

    def close(self):
        """Close this Pipe, don't accept new threads"""
        self.closed = True
class PipeMonitor(object):
    """A lightweight monitoring object for Pipe objects

    This is a lightweight wrapper for Pipe objects. It can
    be used to obtain and expose monitoring data from the
    Pipe to the Monitor or Aggregator objects.
    """

    # Seconds between two updates in run(); also handed to the
    # MonitorEntryCollection created by the constructor
    DEFAULT_TIMEOUT = 1

    # Longest command line printed by output() without truncation
    MAX_CMD_WIDTH = 60

    def __init__(self, pipe):
        """Create a new PipeMonitor object

        @param pipe: A Pipe object to be monitored
        """
        self.pipe = pipe
        self.cmdline = 'pipe-%d:%s:%d' % (pipe.port, pipe.newhost, pipe.newport)
        self.timeout = self.DEFAULT_TIMEOUT
        self.entries = model.MonitorEntryCollection(self.timeout)

    def update(self, entry_collection):
        """Update the monitor values from the pipe

        Take the current traffic values from the Pipe object and
        add a new MonitorEntry object to the entry_collection.
        Afterwards, expire() is called on the entry_collection.

        @param entry_collection: A MonitorEntryCollection object
        """
        bytes_in, bytes_out = self.pipe.update()
        entry = model.MonitorEntry(self.cmdline, bytes_in, bytes_out, time.time())
        entry_collection.add(entry)
        entry_collection.expire()

    @staticmethod
    def _shorten(cmd):
        """Truncate cmd to MAX_CMD_WIDTH characters (ending in "...")

        @param cmd: The command line string to be shortened
        @return: cmd itself if it is short enough, else a truncated copy
        """
        limit = PipeMonitor.MAX_CMD_WIDTH
        if len(cmd) > limit:
            return cmd[:limit - 3] + '...'
        return cmd

    def output(self):
        """Print out the current status of this monitor object

        This will print the current traffic to stdout. Entries
        without any traffic (in both directions) are skipped.
        """
        util.clear()
        entries = sorted(self.entries.get_traffic())

        for bytes_in, bytes_out, cmd in entries:
            if bytes_in or bytes_out:
                # Single-argument print(...) yields the same output with
                # the Python 2 print statement and the Python 3 function
                print('%10d / %10d -- %s' % (bytes_in, bytes_out,
                                             self._shorten(cmd)))
        sys.stdout.flush()

    def run(self):
        """Run the mainloop for this monitor

        This periodically updates and outputs the data. The
        loop does not terminate on its own.
        """
        while True:
            self.update(self.entries)
            self.output()
            time.sleep(self.timeout)

    def close(self):
        """Close the underlying pipe"""
        self.pipe.close()