Rename gsh to polysh.
[polysh.git] / polysh / pity.py
blob171d5c4cbac27c5ee42f7e46c3f2f2ebae73f05d
1 # This program is free software; you can redistribute it and/or modify
2 # it under the terms of the GNU General Public License as published by
3 # the Free Software Foundation; either version 2 of the License, or
4 # (at your option) any later version.
6 # This program is distributed in the hope that it will be useful,
7 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # GNU Library General Public License for more details.
11 # You should have received a copy of the GNU General Public License
12 # along with this program; if not, write to the Free Software
13 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
15 # See the COPYING file for license information.
17 # Copyright (c) 2007 Guillaume Chazarain <guichaz@gmail.com>
20 import binascii
21 import os
22 import signal
23 import socket
24 import string
25 import subprocess
26 import sys
27 import termios
28 import time
29 from threading import Event, Thread
30 from Queue import Queue
# Marker meant to precede what polysh sends on stdin, so that it can be told
# apart from garbage typed by the user.
STDIN_PREFIX = '!?^%!'

# Size suffixes used by human_unit(), one per power of 1024, smallest first.
UNITS = [
    'B',
    'KiB',
    'MiB',
    'GiB',
    'TiB',
    'PiB',
    'EiB',
]

# Character that marks the end of the base64 stream read by Base64Reader.
BASE64_TERMINATOR = '.'
def human_unit(size):
    """Format a size in bytes as a short string such as '12.34 MiB'.

    The largest unit whose base value is less than half of size is used and
    the value is shown with two decimals. Sizes too small for any multiple
    are shown unscaled, followed by the plain byte suffix.
    """
    exponent = len(UNITS) - 1
    while exponent > 0:
        scale = 2.0 ** (10 * exponent)
        if size > 2 * scale:
            return '%.2f %s' % (float(size) / scale, UNITS[exponent])
        exponent -= 1
    return '%s %s' % (size, UNITS[0])
class bandwidth_monitor(Thread):
    """Daemon thread printing transfer progress roughly once per second.

    The thread starts itself on construction. The transferring code reports
    progress through add_transferred_size() and calls finish() when done,
    which stops the loop and lets the final summary line be printed.
    """

    def __init__(self):
        Thread.__init__(self)
        self.setDaemon(True)
        # Set by finish() to tell run() to stop.
        self.main_done = Event()
        # Total number of bytes reported so far.
        self.size = 0
        self.start()

    def add_transferred_size(self, size):
        """Add size bytes to the running total."""
        self.size = self.size + size

    def finish(self):
        """Ask the reporting loop to stop and wait until it has exited."""
        self.main_done.set()
        self.join()

    @staticmethod
    def _smoothed_bandwidth(transferred, elapsed, previous_bandwidth):
        """Blend the rate of the latest interval with the previous estimate.

        The latest interval weighs twice as much as the previous estimate.
        Results below one byte per second are reported as 0.

        elapsed may be zero (or negative) when two samples share a timestamp
        or the clock steps back; the first sample in run() is taken right
        after the reference time, so this does happen on coarse clocks. Such
        an interval carries no information, so the previous estimate is
        reused instead of dividing by zero.
        """
        if elapsed > 0:
            instantaneous = transferred / elapsed
        else:
            instantaneous = previous_bandwidth
        bandwidth = (2 * instantaneous + previous_bandwidth) / 3.0
        if bandwidth < 1:
            return 0
        return bandwidth

    def run(self):
        """Print the total and the smoothed rate until finish() is called."""
        previous_size = 0
        previous_sampling_time = time.time()
        previous_bandwidth = 0
        while not self.main_done.isSet():
            current_size = self.size
            current_sampling_time = time.time()
            current_bandwidth = self._smoothed_bandwidth(
                current_size - previous_size,
                current_sampling_time - previous_sampling_time,
                previous_bandwidth)
            print('%s transferred at %s/s' % (human_unit(current_size),
                                              human_unit(current_bandwidth)))
            previous_size = current_size
            previous_sampling_time = current_sampling_time
            previous_bandwidth = current_bandwidth
            # Sleeps for up to a second but wakes up at once on finish().
            self.main_done.wait(1.0)
        print('Done transferring %s bytes (%s)' % (self.size,
                                                   human_unit(self.size)))
def write_fully(fd, data):
    """Write the whole of data to the file descriptor fd.

    os.write() reports how many bytes it accepted, which may be fewer than
    offered, so the unwritten tail is offered again until nothing is left.
    """
    remaining = data
    while remaining:
        accepted = os.write(fd, remaining)
        remaining = remaining[accepted:]
class Reader(object):
    """Common base of the readers: wraps an object exposing fileno()/close().

    The object itself is kept for closing, and its descriptor is kept in
    self.fd for the os.read() calls made by the subclasses.
    """

    def __init__(self, input_file):
        self.fd = input_file.fileno()
        self.input_file = input_file

    def close(self):
        """Close the wrapped object, returning whatever its close() returns."""
        wrapped = self.input_file
        return wrapped.close()
class Base64Reader(Reader):
    """Reader decoding a base64 stream that ends with BASE64_TERMINATOR.

    Newlines in the input are dropped. Characters are decoded in groups of
    four; the characters of an incomplete group wait in self.buffer until a
    later read() completes the group.
    """

    def __init__(self, input_file):
        super(Base64Reader, self).__init__(input_file)
        # Base64 characters received but not decoded yet.
        self.buffer = ''
        # True once the terminator, or the end of the input, has been seen.
        self.eof_found = False

    def read(self):
        """Return the next chunk of decoded data, or None at end of stream.

        Blocks in os.read() until some complete group of four characters can
        be decoded or the stream ends. The stream ends at BASE64_TERMINATOR,
        or when the descriptor reports end of file before any terminator was
        received. Characters of an incomplete group still buffered at that
        point trip the assertion on the following call.
        """
        while True:
            if self.eof_found:
                assert not self.buffer, self.buffer
                return None

            piece = os.read(self.fd, 77 * 1024)
            if not piece:
                # End of file without a terminator: os.read() would keep
                # returning an empty string, so looping for more data would
                # spin forever. Treat it like the terminator.
                self.eof_found = True
            elif BASE64_TERMINATOR in piece[-4:]:
                self.eof_found = True
                piece = piece[:piece.index(BASE64_TERMINATOR)]
            self.buffer += piece.replace('\n', '')

            # Decode the complete groups only and keep the remainder.
            end_offset = 4 * (len(self.buffer) // 4)
            to_decode = self.buffer[:end_offset]
            self.buffer = self.buffer[end_offset:]
            if to_decode:
                return binascii.a2b_base64(to_decode)
class FileReader(Reader):
    """Reader handing back the bytes of the wrapped descriptor unchanged."""

    def __init__(self, input_file):
        Reader.__init__(self, input_file)

    def read(self):
        """Return the result of one os.read() of at most 32 KiB on self.fd."""
        max_bytes = 32 * 1024
        return os.read(self.fd, max_bytes)
def forward(reader, output_files, print_bw):
    """Copy everything reader produces to each of output_files.

    reader       -- object with read() and close(); the copy stops at the
                    first falsy value returned by read().
    output_files -- objects with fileno() and close(); every chunk is written
                    in full to each of them, in order.
    print_bw     -- when true, a bandwidth_monitor reports the progress.

    The reader and all the output files are closed once the copy is over.
    """
    monitor = bandwidth_monitor() if print_bw else None
    sinks = [sink_file.fileno() for sink_file in output_files]

    chunk = reader.read()
    while chunk:
        if monitor is not None:
            monitor.add_transferred_size(len(chunk))
        for sink in sinks:
            write_fully(sink, chunk)
        chunk = reader.read()

    if monitor is not None:
        monitor.finish()

    reader.close()
    for sink_file in output_files:
        sink_file.close()
def init_listening_socket(polysh1, polysh2):
    """Open a listening TCP socket and announce its address on stdout.

    The socket is bound to all interfaces on a port picked by the system.
    The line printed is polysh1 and polysh2 followed by 'hostname:port'.
    Returns the listening socket.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(('', 0))
    listener.listen(5)
    port = listener.getsockname()[1]
    hostname = socket.gethostname()
    print('%s%s%s:%s' % (polysh1, polysh2, hostname, port))
    return listener
def pipe_to_untar():
    """Start 'tar x' and return the pipe connected to its standard input.

    The child's stdout is passed as None, and close_fds is set for the child.
    """
    popen_options = {
        'stdin': subprocess.PIPE,
        'stdout': None,
        'close_fds': True,
    }
    tar_process = subprocess.Popen(['tar', 'x'], **popen_options)
    return tar_process.stdin
def new_connection(host_port):
    """Connect to a 'host:port' destination and return it as a file object.

    host_port must contain exactly one ':'. The returned object comes from
    socket.makefile() in 'r+b' mode.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    host, port_str = host_port.split(':')
    address = (host, int(port_str))
    sock.connect(address)
    return sock.makefile('r+b')
def do_replicate(destinations, print_bw):
    """Forward the raw bytes of stdin to every 'host:port' in destinations."""
    outputs = []
    for destination in destinations:
        outputs.append(new_connection(destination))
    source = FileReader(sys.stdin)
    forward(source, outputs, print_bw)
def do_upload(destinations, print_bw):
    """Decode base64 from stdin into a local 'tar x' and into destinations.

    The tar process is started before the connections are opened, and it is
    the first output to receive each chunk.
    """
    outputs = [pipe_to_untar()]
    for destination in destinations:
        outputs.append(new_connection(destination))
    source = Base64Reader(sys.stdin)
    forward(source, outputs, print_bw)
def do_forward(polysh1, polysh2, destinations, print_bw):
    """Relay one incoming connection into a local 'tar x' and destinations.

    The listening address is announced first (prefixed by polysh1 and
    polysh2), then the tar process is started and the outgoing connections
    are opened. Only after that is the incoming connection accepted.
    """
    listener = init_listening_socket(polysh1, polysh2)
    outputs = [pipe_to_untar()]
    for destination in destinations:
        outputs.append(new_connection(destination))
    # The address of the peer is not needed.
    incoming = listener.accept()[0]
    forward(FileReader(incoming), outputs, print_bw)
202 # Usage:
204 # pity.py [--print-bw] replicate host:port...
205 # => reads data on stdin and forwards it to the optional list of host:port
207 # pity.py [--print-bw] upload host:port...
208 # => reads base64 on stdin and forwards it to the optional list of host:port
210 # pity.py [--print-bw] forward POLYSH1 POLYSH2 host:port...
211 # => prints listening host:port on stdout prefixed by POLYSH1POLYSH2 and
212 # forwards from this port to the optional list of host:port
def main():
    """Parse the command line and run the requested sub-command.

    An optional leading '--print-bw' turns on progress reporting. It is
    followed by one of 'replicate', 'upload' or 'forward' and the arguments
    of that command. A missing or unknown command, or a 'forward' without its
    two prefix arguments, prints 'Unknown command:' followed by the arguments
    and exits with status 1. An OSError raised by the command is printed and
    also ends the program with status 1.
    """
    # On SIGINT, kill the whole process group.
    signal.signal(signal.SIGINT, lambda sig, frame: os.kill(0, signal.SIGKILL))

    argv = sys.argv[1:]
    print_bw = bool(argv) and argv[0] == '--print-bw'
    if print_bw:
        argv = argv[1:]
    # With no command at all, fall through to the unknown-command branch
    # rather than failing on an out-of-range index.
    cmd = argv[0] if argv else None

    try:
        if cmd == 'replicate':
            do_replicate(argv[1:], print_bw)
        elif cmd == 'upload':
            do_upload(argv[1:], print_bw)
        elif cmd == 'forward' and len(argv) >= 3:
            do_forward(argv[1], argv[2], argv[3:], print_bw)
        else:
            print('Unknown command: %s' % (argv,))
            sys.exit(1)
    except OSError as e:
        print(e)
        sys.exit(1)
238 if __name__ == '__main__':
239 main()