7af7625eba8cd13bf279d5479fcdcefbac62d8e0
[bwmon.git] / bwmon / pipe.py
blob7af7625eba8cd13bf279d5479fcdcefbac62d8e0
1 # -*- coding: utf-8 -*-
3 from __future__ import absolute_import
5 import sys
6 import socket
7 import threading
8 import time
10 from bwmon import model
11 from bwmon import util
13 class PipeThread(threading.Thread):
14 """A uni-direcational data pipe thread
16 Instances of this class will write from one file
17 ("source") and write the read data to the other
18 file ("sink"). The amount of bytes transferred
19 will be logged internally and can be retrieved
20 from the "traffic" attribute.
21 """
22 def __init__(self, source, sink):
23 """Create a new PipeThread object
25 @param source: The source file to be read from
26 @param sink: The destination file to be written to
27 """
28 threading.Thread.__init__(self)
29 self.source = source
30 self.sink = sink
31 self.traffic = 0
32 self.finished = False
34 def run(self):
35 """Run this thread (start forwarding data)
37 Data will be forwarded between the source and
38 sink files (as given to the constructor), and
39 the "traffic" attribute will be updated to tell
40 the value of bytes transferred. The attribute
41 "finished" will be set to True when the transfer
42 is complete.
43 """
44 while True:
45 try:
46 data = self.source.recv(1024)
47 if not data:
48 break
49 self.traffic += len(data)
50 self.sink.send(data)
51 except:
52 break
54 #print >>sys.stderr, 'Closing: (%s->%s) with total traffic %d' % (self.source.getpeername(),
55 # self.sink.getpeername(), self.traffic)
56 self.finished = True
58 class Pipe(threading.Thread):
59 """A data pipe from a local port to a (possibly remote) port
60 """
61 def __init__(self, port, newhost, newport):
62 """Create a new Pipe object
64 @param port: The source (listening) port
65 @param newhost: The target hostname
66 @param newport: The target port
67 """
68 threading.Thread.__init__(self)
69 self.closed = False
70 self.port = port
71 self.newhost = newhost
72 self.newport = newport
73 self.pipes_in = []
74 self.pipes_out = []
75 self.total_in = 0
76 self.total_out = 0
77 self.setup()
79 def setup(self):
80 """Setup the internal state of this object
82 This will automatically be called by the constructor,
83 and there should never be the need to call this from
84 outside.
85 """
86 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
87 self.sock.bind(('', self.port))
88 self.sock.settimeout(5)
89 self.sock.listen(5)
91 def update(self):
92 """Update the internal traffic stats and retire threads
94 This method should be called periodically to sum up the
95 traffic amounts. The total traffic of finished threads
96 will be summed up, and the threads will be removed from
97 the internal data structures.
98 """
99 sum_in, sum_out = self.total_in, self.total_out
101 # Calculate input traffic + retire finished sessions
102 for pipe in list(self.pipes_in):
103 sum_in += pipe.traffic
104 if pipe.finished:
105 self.total_in += pipe.traffic
106 self.pipes_in.remove(pipe)
108 # Calculate output traffic + retire finished sessions
109 for pipe in list(self.pipes_out):
110 sum_out += pipe.traffic
111 if pipe.finished:
112 self.total_out += pipe.traffic
113 self.pipes_out.remove(pipe)
115 return sum_in, sum_out
117 def run(self):
118 """Start the pipe, including its "child" pipes
120 This will listen for new connections until the
121 pipe itself is closed.
123 while not self.closed:
124 try:
125 newsock, address = self.sock.accept()
126 except socket.timeout:
127 continue
128 fwd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
129 fwd.connect((self.newhost, self.newport))
130 in_pipe = PipeThread(newsock, fwd)
131 out_pipe = PipeThread(fwd, newsock)
132 self.pipes_in.append(in_pipe)
133 self.pipes_out.append(out_pipe)
134 in_pipe.start()
135 out_pipe.start()
137 def close(self):
138 """Close this Pipe, don't accept new threads"""
139 self.closed = True
141 class PipeMonitor(object):
142 """A lightweight monitoring object for Pipe objects
144 This is a lightweight wrapper for Pipe objects. It can
145 be used to obtain and expose monitoring data from the
146 Pipe to the Monitor or Aggregator objects.
148 DEFAULT_TIMEOUT = 1
150 def __init__(self, pipe):
151 """Create a new PipeMonitor object
153 @param pipe: A Pipe object to be monitored
155 self.pipe = pipe
156 self.cmdline = 'pipe-%d:%s:%d' % (pipe.port, pipe.newhost, pipe.newport)
157 self.timeout = self.DEFAULT_TIMEOUT
158 self.entries = model.MonitorEntryCollection(self.timeout)
160 def update(self, entry_collection):
161 """Update the monitor values from the pipe
163 Take the current traffic values from the Pipe object and
164 add a new MonitorEntry object to the entry_collection.
166 @param entry_collection: A MonitorEntryCollection object
168 bytes_in, bytes_out = self.pipe.update()
169 entry = model.MonitorEntry(self.cmdline, bytes_in, bytes_out, time.time())
170 entry_collection.add(entry)
171 entry_collection.expire()
173 def output(self):
174 """Print out the current status of this monitor object
176 This will print the current traffic to stdout.
178 util.clear()
179 entries = sorted(self.entries.get_traffic())
181 for bytes_in, bytes_out, cmd in entries:
182 if bytes_in or bytes_out:
183 if len(cmd) > 60:
184 cmd = cmd[:57] + '...'
185 print '%10d / %10d -- %s' % (bytes_in, bytes_out, cmd)
186 sys.stdout.flush()
188 def run(self):
189 """Run the mainloop for this monitor
191 This periodically updates and outputs the data.
193 while True:
194 self.update(self.entries)
195 self.output()
196 time.sleep(self.timeout)
198 def close(self):
199 """Close the underlying pipe"""
200 self.pipe.close()