2 # This program is free software; you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation; either version 2 of the License, or
5 # (at your option) any later version.
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU Library General Public License for more details.
12 # You should have received a copy of the GNU General Public License
13 # along with this program; if not, write to the Free Software
14 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
16 # See the COPYING file for license information.
18 # Copyright (c) 2007, 2008 Guillaume Chazarain <guichaz@gmail.com>
20 # This file should remain compatible with python-1.5.2
31 from threading
import Event
, Thread
32 from Queue
import Queue
34 # Somewhat protect the stdin, be sure we read what has been sent by gsh, and
35 # not some garbage entered by the user.
36 STDIN_PREFIX
= '!?#%!'
38 UNITS
= ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB']
41 """Return a string of the form '12.34 MiB' given a size in bytes."""
42 for i
in xrange(len(UNITS
) - 1, 0, -1):
43 base
= 2.0 ** (10 * i
)
45 return '%.2f %s' % ((float(size
) / base
), UNITS
[i
])
46 return str(size
) + ' ' + UNITS
[0]
49 def rstrip_char(string
, char
):
50 while string
and string
[-1] == char
:
54 class bandwidth_monitor(Thread
):
58 self
.main_done
= Event()
62 def add_transferred_size(self
, size
):
63 self
.size
= self
.size
+ size
71 previous_sampling_time
= time
.time()
72 previous_bandwidth
= 0
73 while not self
.main_done
.isSet():
74 current_size
= self
.size
75 current_sampling_time
= time
.time()
76 current_bandwidth
= (current_size
- previous_size
) / \
77 (current_sampling_time
- previous_sampling_time
)
78 current_bandwidth
= (2*current_bandwidth
+ previous_bandwidth
) / 3.0
79 if current_bandwidth
< 1:
81 print '%s transferred at %s/s' % (human_unit(current_size
),
82 human_unit(current_bandwidth
))
83 previous_size
= current_size
84 previous_sampling_time
= current_sampling_time
85 previous_bandwidth
= current_bandwidth
86 self
.main_done
.wait(1.0)
87 print 'Done transferring %d bytes' % (self
.size
)
89 def write_fully(fd
, data
):
91 written
= os
.write(fd
, data
)
94 MAX_QUEUE_SIZE
= 256 * 1024 * 1024
95 MAX_QUEUE_ITEM_SIZE
= 8 * 1024
97 def forward(input_file
, output_files
, bandwidth
=0):
99 bw
= bandwidth_monitor()
101 input_fd
= input_file
.fileno()
103 for output_file
in output_files
:
104 output_fds
.append(output_file
.fileno())
107 data
= os
.read(input_fd
, MAX_QUEUE_ITEM_SIZE
)
111 bw
.add_transferred_size(len(data
))
112 for output_fd
in output_fds
:
113 write_fully(output_fd
, data
)
119 for output_file
in output_files
:
122 def init_listening_socket(gsh_prefix
):
123 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
126 host
= socket
.gethostname()
127 port
= s
.getsockname()[1]
128 prefix
= string
.join(gsh_prefix
, '')
129 print '%s%s:%s' % (prefix
, host
, port
)
135 c
= os
.read(sys
.stdin
.fileno(), 1)
140 print 'Received input is too large'
144 def get_destination():
145 fd
= sys
.stdin
.fileno()
146 old_settings
= termios
.tcgetattr(fd
)
147 new_settings
= termios
.tcgetattr(fd
)
148 new_settings
[3] = new_settings
[3] & ~
2 # 3:lflags 2:ICANON
149 new_settings
[6][6] = '\000' # Set VMIN to zero for lookahead only
150 termios
.tcsetattr(fd
, 1, new_settings
) # 1:TCSADRAIN
153 start
= string
.find(line
, STDIN_PREFIX
)
155 line
= line
[start
+ len(STDIN_PREFIX
):]
158 termios
.tcsetattr(fd
, 1, old_settings
) # 1:TCSADRAIN
159 split
= string
.split(line
, ':', 1)
162 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
163 s
.connect((host
, port
))
164 return s
.makefile('r+b')
167 return "'" + string
.replace(s
, "'", "'\\''") + "'"
170 split
= os
.path
.split(rstrip_char(path
, '/'))
171 dirname
, basename
= split
176 stdout
, stdin
= popen2
.popen2('tar c %s' % shell_quote(basename
))
178 forward(stdout
, [get_destination()])
180 def do_forward(gsh_prefix
):
181 listening_socket
= init_listening_socket(gsh_prefix
)
182 stdout
, stdin
= popen2
.popen2('tar x')
184 conn
, addr
= listening_socket
.accept()
185 forward(conn
.makefile(), [get_destination(), stdin
])
187 def do_receive(gsh_prefix
):
188 listening_socket
= init_listening_socket(gsh_prefix
)
189 stdout
, stdin
= popen2
.popen2('tar x')
191 conn
, addr
= listening_socket
.accept()
192 # Only the last item in the chain displays the progress information
193 # as it should be the last one to finish.
194 forward(conn
.makefile(), [stdin
], bandwidth
=1)
199 # => reads host:port on stdin
201 # pity.py forward [GSH1...]
202 # => reads host:port on stdin and prints listening host:port on stdout
203 # prefixed by GSH1...
205 # pity.py receive [GSH1...]
206 # => prints listening host:port on stdout prefixed by GSH1...
209 signal
.signal(signal
.SIGINT
, lambda sig
, frame
: os
.kill(0, signal
.SIGKILL
))
212 if cmd
== 'send' and len(sys
.argv
) >= 3:
214 elif cmd
== 'forward' and len(sys
.argv
) >= 2:
215 do_forward(sys
.argv
[2:])
216 elif cmd
== 'receive' and len(sys
.argv
) >= 2:
217 do_receive(sys
.argv
[2:])
219 print 'Unknown command:', sys
.argv
225 if __name__
== '__main__':