2 # This program is free software; you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation; either version 2 of the License, or
5 # (at your option) any later version.
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU Library General Public License for more details.
12 # You should have received a copy of the GNU General Public License
13 # along with this program; if not, write to the Free Software
14 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
16 # See the COPYING file for license information.
18 # Copyright (c) 2007, 2008 Guillaume Chazarain <guichaz@gmail.com>
20 # This file should remain compatible with python-1.5.2
31 from threading
import Event
, Thread
32 from Queue
import Queue
34 # Somewhat protect the stdin, be sure we read what has been sent by gsh, and
35 # not some garbage entered by the user.
36 STDIN_PREFIX
= '!?#%!'
38 UNITS
= ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB']
40 def long_to_str(number
):
47 """Return a string of the form '12.34 MiB' given a size in bytes."""
48 for i
in xrange(len(UNITS
) - 1, 0, -1):
49 base
= 2.0 ** (10 * i
)
51 return '%.2f %s' % ((float(size
) / base
), UNITS
[i
])
52 return long_to_str(size
) + ' ' + UNITS
[0]
55 def rstrip_char(string
, char
):
56 while string
and string
[-1] == char
:
60 class bandwidth_monitor(Thread
):
64 self
.main_done
= Event()
68 def add_transferred_size(self
, size
):
69 self
.size
= self
.size
+ size
77 previous_sampling_time
= time
.time()
78 previous_bandwidth
= 0L
79 while not self
.main_done
.isSet():
80 current_size
= self
.size
81 current_sampling_time
= time
.time()
82 current_bandwidth
= (current_size
- previous_size
) / \
83 (current_sampling_time
- previous_sampling_time
)
84 current_bandwidth
= (2*current_bandwidth
+ previous_bandwidth
) / 3.0
85 if current_bandwidth
< 1:
86 current_bandwidth
= 0L
87 print '%s transferred at %s/s' % (human_unit(current_size
),
88 human_unit(current_bandwidth
))
89 previous_size
= current_size
90 previous_sampling_time
= current_sampling_time
91 previous_bandwidth
= current_bandwidth
92 self
.main_done
.wait(1.0)
93 print 'Done transferring %s bytes (%s)' % (long_to_str(self
.size
),
94 human_unit(self
.size
))
96 def write_fully(fd
, data
):
98 written
= os
.write(fd
, data
)
101 MAX_QUEUE_ITEM_SIZE
= 8 * 1024
103 def forward(input_file
, output_files
, bandwidth
=0):
105 bw
= bandwidth_monitor()
107 input_fd
= input_file
.fileno()
109 for output_file
in output_files
:
110 output_fds
.append(output_file
.fileno())
113 data
= os
.read(input_fd
, MAX_QUEUE_ITEM_SIZE
)
117 bw
.add_transferred_size(len(data
))
118 for output_fd
in output_fds
:
119 write_fully(output_fd
, data
)
125 for output_file
in output_files
:
128 def init_listening_socket(gsh_prefix
):
129 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
132 host
= socket
.gethostname()
133 port
= s
.getsockname()[1]
134 prefix
= string
.join(gsh_prefix
, '')
135 print '%s%s:%s' % (prefix
, host
, port
)
141 c
= os
.read(sys
.stdin
.fileno(), 1)
146 print 'Received input is too large'
150 def get_destination():
151 fd
= sys
.stdin
.fileno()
152 old_settings
= termios
.tcgetattr(fd
)
153 new_settings
= termios
.tcgetattr(fd
)
154 new_settings
[3] = new_settings
[3] & ~
2 # 3:lflags 2:ICANON
155 new_settings
[6][6] = '\000' # Set VMIN to zero for lookahead only
156 termios
.tcsetattr(fd
, 1, new_settings
) # 1:TCSADRAIN
159 start
= string
.find(line
, STDIN_PREFIX
)
161 line
= line
[start
+ len(STDIN_PREFIX
):]
164 termios
.tcsetattr(fd
, 1, old_settings
) # 1:TCSADRAIN
165 split
= string
.split(line
, ':', 1)
168 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
169 s
.connect((host
, port
))
170 return s
.makefile('r+b')
173 return "'" + string
.replace(s
, "'", "'\\''") + "'"
176 split
= os
.path
.split(rstrip_char(path
, '/'))
177 dirname
, basename
= split
182 stdout
, stdin
= popen2
.popen2('tar c %s' % shell_quote(basename
))
184 forward(stdout
, [get_destination()])
186 def do_forward(gsh_prefix
):
187 listening_socket
= init_listening_socket(gsh_prefix
)
188 stdout
, stdin
= popen2
.popen2('tar x')
190 conn
, addr
= listening_socket
.accept()
191 forward(conn
.makefile(), [get_destination(), stdin
])
193 def do_receive(gsh_prefix
):
194 listening_socket
= init_listening_socket(gsh_prefix
)
195 stdout
, stdin
= popen2
.popen2('tar x')
197 conn
, addr
= listening_socket
.accept()
198 # Only the last item in the chain displays the progress information
199 # as it should be the last one to finish.
200 forward(conn
.makefile(), [stdin
], bandwidth
=1)
205 # => reads host:port on stdin
207 # pity.py forward [GSH1...]
208 # => reads host:port on stdin and prints listening host:port on stdout
209 # prefixed by GSH1...
211 # pity.py receive [GSH1...]
212 # => prints listening host:port on stdout prefixed by GSH1...
215 signal
.signal(signal
.SIGINT
, lambda sig
, frame
: os
.kill(0, signal
.SIGKILL
))
218 if cmd
== 'send' and len(sys
.argv
) >= 3:
220 elif cmd
== 'forward' and len(sys
.argv
) >= 2:
221 do_forward(sys
.argv
[2:])
222 elif cmd
== 'receive' and len(sys
.argv
) >= 2:
223 do_receive(sys
.argv
[2:])
225 print 'Unknown command:', sys
.argv
231 if __name__
== '__main__':