7af7625eba8cd13bf279d5479fcdcefbac62d8e0
1 # -*- coding: utf-8 -*-
3 from __future__
import absolute_import
10 from bwmon
import model
11 from bwmon
import util
class PipeThread(threading.Thread):
    """A uni-directional data pipe thread

    Instances of this class will read from one socket
    ("source") and write the read data to the other
    socket ("sink"). The amount of bytes transferred
    will be logged internally and can be retrieved
    from the "traffic" attribute.
    """

    def __init__(self, source, sink):
        """Create a new PipeThread object

        @param source: The source socket to be read from (needs recv())
        @param sink: The destination socket to be written to (needs sendall())
        """
        threading.Thread.__init__(self)
        self.source = source
        self.sink = sink
        # Number of bytes read from the source so far
        self.traffic = 0
        # Becomes True once run() has stopped forwarding
        self.finished = False

    def run(self):
        """Run this thread (start forwarding data)

        Data will be forwarded from the source to the sink
        (as given to the constructor), and the "traffic"
        attribute will be updated to tell the value of bytes
        transferred. The attribute "finished" will be set to
        True when the transfer is complete, i.e. when the
        source reports end-of-stream or either socket fails.
        """
        try:
            while True:
                data = self.source.recv(1024)
                if not data:
                    # An empty read means the peer closed its end
                    break
                self.traffic += len(data)
                # sendall() retries short writes, so no chunk is truncated
                self.sink.sendall(data)
        except socket.error:
            # A broken connection simply ends this direction of the pipe
            pass
        finally:
            # Set last, so a reader seeing finished=True sees the final traffic
            self.finished = True
class Pipe(threading.Thread):
    """A data pipe from a local port to a (possibly remote) port

    Every connection accepted on the local port is forwarded to
    (newhost, newport) by a pair of PipeThread objects, one per
    direction. The traffic of both directions is accounted for
    separately and can be queried through update().
    """

    def __init__(self, port, newhost, newport):
        """Create a new Pipe object

        @param port: The source (listening) port
        @param newhost: The target hostname
        @param newport: The target port
        """
        threading.Thread.__init__(self)
        self.closed = False
        self.port = port
        self.newhost = newhost
        self.newport = newport
        # Live forwarding threads, one list per direction
        self.pipes_in = []
        self.pipes_out = []
        # Traffic of threads that have already been retired
        self.total_in = 0
        self.total_out = 0
        self.setup()

    def setup(self):
        """Setup the internal state of this object

        This will automatically be called by the constructor,
        and there should never be the need to call this from
        outside. It creates the listening socket on the
        configured port.
        """
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.bind(('', self.port))
        # The timeout lets run() re-check "closed" between accept() calls
        self.sock.settimeout(5)
        # NOTE(review): backlog value was not visible in the extracted
        # source; accept() requires a listening socket - confirm the value.
        self.sock.listen(5)

    @staticmethod
    def _retire_finished(pipes):
        """Sum up the traffic of pipes and drop the finished ones

        @param pipes: List of PipeThread-like objects (modified in place)
        @return: A (retired, active) tuple of byte counts
        """
        retired = active = 0
        for pipe in list(pipes):
            # Look at "finished" before "traffic" so that the traffic of a
            # retired thread is only read once it no longer changes.
            if pipe.finished:
                retired += pipe.traffic
                pipes.remove(pipe)
            else:
                active += pipe.traffic
        return retired, active

    def update(self):
        """Update the internal traffic stats and retire threads

        This method should be called periodically to sum up the
        traffic amounts. The total traffic of finished threads
        will be summed up, and the threads will be removed from
        the internal data structures.

        @return: A (bytes_in, bytes_out) tuple with the traffic of
                 all retired and all still-running threads
        """
        retired_in, active_in = self._retire_finished(self.pipes_in)
        retired_out, active_out = self._retire_finished(self.pipes_out)
        self.total_in += retired_in
        self.total_out += retired_out
        return self.total_in + active_in, self.total_out + active_out

    def run(self):
        """Start the pipe, including its "child" pipes

        This will listen for new connections until the
        pipe itself is closed.
        """
        try:
            while not self.closed:
                try:
                    newsock, _ = self.sock.accept()
                except socket.timeout:
                    # Nobody connected in time - check "closed" again
                    continue

                fwd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    fwd.connect((self.newhost, self.newport))
                except socket.error:
                    # Target unreachable: drop this client, keep listening
                    fwd.close()
                    newsock.close()
                    continue

                in_pipe = PipeThread(newsock, fwd)
                out_pipe = PipeThread(fwd, newsock)
                self.pipes_in.append(in_pipe)
                self.pipes_out.append(out_pipe)
                in_pipe.start()
                out_pipe.start()
        finally:
            # Release the listening port once the accept loop is over
            self.sock.close()

    def close(self):
        """Close this Pipe, don't accept new threads"""
        self.closed = True
class PipeMonitor(object):
    """A lightweight monitoring object for Pipe objects

    This is a lightweight wrapper for Pipe objects. It can
    be used to obtain and expose monitoring data from the
    Pipe to the Monitor or Aggregator objects.
    """

    # Seconds between two updates; also handed to the entry collection.
    # NOTE(review): the defining line was not visible in the extracted
    # source - confirm the value against the original file.
    DEFAULT_TIMEOUT = 1

    def __init__(self, pipe):
        """Create a new PipeMonitor object

        @param pipe: A Pipe object to be monitored
        """
        self.pipe = pipe
        self.cmdline = 'pipe-%d:%s:%d' % (pipe.port, pipe.newhost, pipe.newport)
        self.timeout = self.DEFAULT_TIMEOUT
        self.entries = model.MonitorEntryCollection(self.timeout)

    def update(self, entry_collection):
        """Update the monitor values from the pipe

        Take the current traffic values from the Pipe object and
        add a new MonitorEntry object to the entry_collection.
        Afterwards the collection is asked to expire its entries.

        @param entry_collection: A MonitorEntryCollection object
        """
        bytes_in, bytes_out = self.pipe.update()
        entry = model.MonitorEntry(self.cmdline, bytes_in, bytes_out, time.time())
        entry_collection.add(entry)
        entry_collection.expire()

    # NOTE(review): the "def" lines of the three methods below were lost in
    # extraction; the names output/run/close are inferred from their
    # docstrings - confirm against callers.
    def output(self):
        """Print out the current status of this monitor object

        This will print the current traffic to stdout, one line
        per entry, skipping entries without any traffic.
        """
        for bytes_in, bytes_out, cmd in sorted(self.entries.get_traffic()):
            if not (bytes_in or bytes_out):
                continue
            # Presumably limits the label to 60 characters (57 + '...');
            # the guarding condition was not visible - TODO confirm.
            if len(cmd) > 60:
                cmd = cmd[:57] + '...'
            # Single-argument print(...) behaves the same on Python 2 and 3
            print('%10d / %10d -- %s' % (bytes_in, bytes_out, cmd))

    def run(self):
        """Run the mainloop for this monitor

        This periodically updates and outputs the data.
        It does not return on its own.
        """
        while True:
            self.update(self.entries)
            self.output()
            time.sleep(self.timeout)

    def close(self):
        """Close the underlying pipe"""
        self.pipe.close()