1 # This program is free software; you can redistribute it and/or modify
2 # it under the terms of the GNU General Public License as published by
3 # the Free Software Foundation; either version 2 of the License, or
4 # (at your option) any later version.
6 # This program is distributed in the hope that it will be useful,
7 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # GNU Library General Public License for more details.
11 # You should have received a copy of the GNU General Public License
12 # along with this program; if not, write to the Free Software
13 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
15 # See the COPYING file for license information.
17 # Copyright (c) 2007 Guillaume Chazarain <guichaz@gmail.com>
29 from threading
import Event
, Thread
30 from Queue
import Queue
32 # Somewhat protect the stdin, be sure we read what has been sent by polysh, and
33 # not some garbage entered by the user.
STDIN_PREFIX = '!?^%!'

# Binary unit suffixes, indexed by power of 1024 (UNITS[i] <-> 2 ** (10 * i));
# human_unit() picks one of these when formatting a byte count.
UNITS = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB']

# Character that Base64Reader looks for at the tail of a chunk to detect the
# end of the base64 stream.
BASE64_TERMINATOR = '.'
41 """Return a string of the form '12.34 MiB' given a size in bytes."""
42 for i
in xrange(len(UNITS
) - 1, 0, -1):
43 base
= 2.0 ** (10 * i
)
45 return '%.2f %s' % ((float(size
) / base
), UNITS
[i
])
46 return str(size
) + ' ' + UNITS
[0]
49 class bandwidth_monitor(Thread
):
53 self
.main_done
= Event()
57 def add_transferred_size(self
, size
):
58 self
.size
= self
.size
+ size
66 previous_sampling_time
= time
.time()
67 previous_bandwidth
= 0
68 while not self
.main_done
.isSet():
69 current_size
= self
.size
70 current_sampling_time
= time
.time()
71 current_bandwidth
= (current_size
- previous_size
) / \
72 (current_sampling_time
- previous_sampling_time
)
73 current_bandwidth
= (2*current_bandwidth
+ previous_bandwidth
) / 3.0
74 if current_bandwidth
< 1:
76 print '%s transferred at %s/s' % (human_unit(current_size
),
77 human_unit(current_bandwidth
))
78 previous_size
= current_size
79 previous_sampling_time
= current_sampling_time
80 previous_bandwidth
= current_bandwidth
81 self
.main_done
.wait(1.0)
82 print 'Done transferring %s bytes (%s)' % (self
.size
,
83 human_unit(self
.size
))
85 def write_fully(fd
, data
):
87 written
= os
.write(fd
, data
)
92 def __init__(self
, input_file
):
93 self
.input_file
= input_file
94 self
.fd
= input_file
.fileno()
97 return self
.input_file
.close()
100 class Base64Reader(Reader
):
101 def __init__(self
, input_file
):
102 super(Base64Reader
, self
).__init
__(input_file
)
104 self
.eof_found
= False
109 assert not self
.buffer, self
.buffer
112 piece
= os
.read(self
.fd
, 77 * 1024)
113 if BASE64_TERMINATOR
in piece
[-4:]:
114 self
.eof_found
= True
115 piece
= piece
[:piece
.index(BASE64_TERMINATOR
)]
116 self
.buffer += piece
.replace('\n', '')
117 if len(self
.buffer) % 4:
118 end_offset
= 4 * (len(self
.buffer) // 4)
119 to_decode
= self
.buffer[:end_offset
]
120 self
.buffer = self
.buffer[end_offset
:]
122 to_decode
= self
.buffer
125 return binascii
.a2b_base64(to_decode
)
128 class FileReader(Reader
):
129 def __init__(self
, input_file
):
130 super(FileReader
, self
).__init
__(input_file
)
133 return os
.read(self
.fd
, 32 * 1024)
136 def forward(reader
, output_files
, print_bw
):
138 bw
= bandwidth_monitor()
140 output_fds
= [output_file
.fileno() for output_file
in output_files
]
147 bw
.add_transferred_size(len(data
))
148 for output_fd
in output_fds
:
149 write_fully(output_fd
, data
)
155 for output_file
in output_files
:
158 def init_listening_socket(polysh1
, polysh2
):
159 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
162 host
= socket
.gethostname()
163 port
= s
.getsockname()[1]
164 print '%s%s%s:%s' % (polysh1
, polysh2
, host
, port
)
169 p
= subprocess
.Popen(['tar', 'x'],
170 stdin
=subprocess
.PIPE
,
def new_connection(host_port):
    """Connect to a 'host:port' destination and return it as a file object.

    host_port is a string of the form 'host:port'. A ValueError is raised if
    it does not contain exactly one ':' or if the port is not an integer;
    socket errors from connect() propagate to the caller.

    The returned object comes from socket.makefile('r+b').
    """
    # Parse before allocating the socket so that a malformed destination
    # cannot leave an unused descriptor behind.
    host, port_str = host_port.split(':')
    port = int(port_str)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((host, port))
        return s.makefile('r+b')
    except Exception:
        # Do not leak the descriptor when the peer is unreachable.
        s.close()
        raise
def do_replicate(destinations, print_bw):
    """Forward raw data read from stdin to every destination.

    destinations is a (possibly empty) list of 'host:port' strings; one
    connection is opened per entry with new_connection(). print_bw is passed
    through unchanged to forward().
    """
    outputs = []
    for host_port in destinations:
        outputs.append(new_connection(host_port))
    stdin_reader = FileReader(sys.stdin)
    forward(stdin_reader, outputs, print_bw)
def do_upload(destinations, print_bw):
    """Decode base64 from stdin and forward it to untar and the destinations.

    The output list starts with whatever pipe_to_untar() returns, followed by
    one new_connection() per 'host:port' entry in destinations. print_bw is
    passed through unchanged to forward().
    """
    # The untar pipe is created first, before any outgoing connection.
    outputs = [pipe_to_untar()]
    for host_port in destinations:
        outputs.append(new_connection(host_port))
    forward(Base64Reader(sys.stdin), outputs, print_bw)
def do_forward(polysh1, polysh2, destinations, print_bw):
    """Accept one inbound connection and forward its data onwards.

    polysh1 and polysh2 are handed to init_listening_socket(), which prints
    the listening host:port prefixed by them. The data read from the accepted
    connection goes to whatever pipe_to_untar() returns plus one
    new_connection() per 'host:port' entry in destinations. print_bw is passed
    through unchanged to forward().
    """
    listener = init_listening_socket(polysh1, polysh2)
    # Outputs are set up before blocking in accept().
    outputs = [pipe_to_untar()]
    outputs.extend([new_connection(host_port) for host_port in destinations])
    # Only the connected socket is needed; the peer address is ignored.
    incoming = listener.accept()[0]
    forward(FileReader(incoming), outputs, print_bw)
204 # pity.py [--print-bw] replicate host:port...
205 # => reads data on stdin and forwards it to the optional list of host:port
207 # pity.py [--print-bw] upload host:port...
208 # => reads base64 on stdin and forwards it to the optional list of host:port
210 # pity.py [--print-bw] forward POLYSH1 POLYSH2 host:port...
211 # => prints listening host:port on stdout prefixed by POLYSH1POLYSH2 and
212 # forwards from this port to the optional list of host:port
215 signal
.signal(signal
.SIGINT
, lambda sig
, frame
: os
.kill(0, signal
.SIGKILL
))
216 if sys
.argv
[1] == '--print-bw':
224 if cmd
== 'replicate':
225 do_replicate(argv
[1:], print_bw
)
226 elif cmd
== 'upload':
227 do_upload(argv
[1:], print_bw
)
228 elif cmd
== 'forward' and len(argv
) >= 3:
229 do_forward(argv
[1], argv
[2], argv
[3:], print_bw
)
231 print 'Unknown command:', argv
238 if __name__
== '__main__':