1 # -*- coding: utf-8 -*-
3 # Copyright 2010 Thomas Perl and Stefan Kögl. All rights reserved.
5 # Developed for a practical course (Large-scaled distributed computing) at the
6 # University of Technology Vienna in the 2010 summer term.
8 # Redistribution and use in source and binary forms, with or without
9 # modification, are permitted provided that the following conditions are met:
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
14 # 2. Redistributions in binary form must reproduce the above copyright notice,
15 # this list of conditions and the following disclaimer in the documentation
16 # and/or other materials provided with the distribution.
18 # THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS OR IMPLIED
19 # WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
20 # MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
21 # EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
22 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
24 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
25 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
26 # OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
27 # ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 from __future__
import absolute_import
37 from bwmon
import model
38 from bwmon
import util
40 class PipeThread(threading
.Thread
):
41 """A uni-directional data pipe thread
43 Instances of this class will read from one file
44 ("source") and write the read data to the other
45 file ("sink"). The amount of bytes transferred
46 will be logged internally and can be retrieved
47 from the "traffic" attribute.
49 def __init__(self
, source
, sink
):
50 """Create a new PipeThread object
52 @param source: The source file to be read from
53 @param sink: The destination file to be written to
55 threading
.Thread
.__init
__(self
)
62 """Run this thread (start forwarding data)
64 Data will be forwarded between the source and
65 sink files (as given to the constructor), and
66 the "traffic" attribute will be updated to tell
67 the value of bytes transferred. The attribute
68 "finished" will be set to True when the transfer
73 data
= self
.source
.recv(1024)
76 self
.traffic
+= len(data
)
81 #print >>sys.stderr, 'Closing: (%s->%s) with total traffic %d' % (self.source.getpeername(),
82 # self.sink.getpeername(), self.traffic)
85 class Pipe(threading
.Thread
):
86 """A data pipe from a local port to a (possibly remote) port
88 def __init__(self
, port
, newhost
, newport
):
89 """Create a new Pipe object
91 @param port: The source (listening) port
92 @param newhost: The target hostname
93 @param newport: The target port
95 threading
.Thread
.__init
__(self
)
98 self
.newhost
= newhost
99 self
.newport
= newport
107 """Setup the internal state of this object
109 This will automatically be called by the constructor,
110 and there should never be the need to call this from
113 self
.sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
114 self
.sock
.bind(('', self
.port
))
115 self
.sock
.settimeout(5)
119 """Update the internal traffic stats and retire threads
121 This method should be called periodically to sum up the
122 traffic amounts. The total traffic of finished threads
123 will be summed up, and the threads will be removed from
124 the internal data structures.
126 sum_in
, sum_out
= self
.total_in
, self
.total_out
128 # Calculate input traffic + retire finished sessions
129 for pipe
in list(self
.pipes_in
):
130 sum_in
+= pipe
.traffic
132 self
.total_in
+= pipe
.traffic
133 self
.pipes_in
.remove(pipe
)
135 # Calculate output traffic + retire finished sessions
136 for pipe
in list(self
.pipes_out
):
137 sum_out
+= pipe
.traffic
139 self
.total_out
+= pipe
.traffic
140 self
.pipes_out
.remove(pipe
)
142 return sum_in
, sum_out
145 """Start the pipe, including its "child" pipes
147 This will listen for new connections until the
148 pipe itself is closed.
150 while not self
.closed
:
152 newsock
, address
= self
.sock
.accept()
153 except socket
.timeout
:
155 fwd
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
156 fwd
.connect((self
.newhost
, self
.newport
))
157 in_pipe
= PipeThread(newsock
, fwd
)
158 out_pipe
= PipeThread(fwd
, newsock
)
159 self
.pipes_in
.append(in_pipe
)
160 self
.pipes_out
.append(out_pipe
)
165 """Close this Pipe, don't accept new threads"""
168 class PipeMonitor(object):
169 """A lightweight monitoring object for Pipe objects
171 This is a lightweight wrapper for Pipe objects. It can
172 be used to obtain and expose monitoring data from the
173 Pipe to the Monitor or Aggregator objects.
177 def __init__(self
, pipe
):
178 """Create a new PipeMonitor object
180 @param pipe: A Pipe object to be monitored
183 self
.cmdline
= 'pipe-%d:%s:%d' % (pipe
.port
, pipe
.newhost
, pipe
.newport
)
184 self
.timeout
= self
.DEFAULT_TIMEOUT
185 self
.entries
= model
.MonitorEntryCollection(self
.timeout
)
def update(self, entry_collection):
    """Record the pipe's current traffic in an entry collection

    Calls update() on the monitored pipe to get the pair of
    incoming/outgoing byte counts, wraps them (together with
    this monitor's cmdline label and the current time) in a
    new MonitorEntry, adds that entry to entry_collection and
    then calls expire() on the collection.

    @param entry_collection: A MonitorEntryCollection object
    """
    traffic_in, traffic_out = self.pipe.update()
    snapshot = model.MonitorEntry(
        self.cmdline, traffic_in, traffic_out, time.time())
    entry_collection.add(snapshot)
    entry_collection.expire()
201 """Print out the current status of this monitor object
203 This will print the current traffic to stdout.
206 entries
= sorted(self
.entries
.get_traffic())
208 for bytes_in
, bytes_out
, cmd
in entries
:
209 if bytes_in
or bytes_out
:
211 cmd
= cmd
[:57] + '...'
212 print '%10d / %10d -- %s' % (bytes_in
, bytes_out
, cmd
)
216 """Run the mainloop for this monitor
218 This periodically updates and outputs the data.
221 self
.update(self
.entries
)
223 time
.sleep(self
.timeout
)
226 """Close the underlying pipe"""