1 # This program is free software; you can redistribute it and/or modify
2 # it under the terms of the GNU General Public License as published by
3 # the Free Software Foundation; either version 2 of the License, or
4 # (at your option) any later version.
6 # This program is distributed in the hope that it will be useful,
7 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # GNU Library General Public License for more details.
11 # You should have received a copy of the GNU General Public License
12 # along with this program; if not, write to the Free Software
13 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
15 # See the COPYING file for license information.
17 # Copyright (c) 2006, 2007 Guillaume Chazarain <guichaz@yahoo.fr>
28 from gsh
.buffered_dispatcher
import buffered_dispatcher
29 from gsh
.console
import console_output
31 # Either the remote shell is expecting a command, or we are waiting
32 # for the first line (options.print_first), or we already printed the
33 # first line and a command is running
36 STATE_EXPECTING_NEXT_LINE
, \
37 STATE_EXPECTING_LINE
, \
39 STATE_TERMINATED
= range(6)
41 STATE_NAMES
= ['not_started', 'idle', 'expecting_next_line',
42 'expecting_line', 'running', 'terminated']
45 """Iterator over all the remote_dispatcher instances"""
46 for i
in asyncore
.socket_map
.itervalues():
47 if isinstance(i
, remote_dispatcher
):
50 def make_unique_name(name
):
51 display_names
= set([i
.display_name
for i
in all_instances()])
53 if candidate_name
in display_names
:
56 candidate_name
= '%s#%d' % (name
, i
)
57 if candidate_name
not in display_names
:
62 def count_completed_processes():
63 """Return a tuple with the number of ready processes and the total number"""
64 completed_processes
= 0
66 for i
in all_instances():
69 if i
.state
is STATE_IDLE
:
70 completed_processes
+= 1
71 return completed_processes
, total
73 def handle_unfinished_lines():
74 """Typically we print only lines with a '\n', but if some buffers keep an
75 unfinished line for some time we'll add an artificial '\n'"""
76 for r
in all_instances():
84 asyncore
.loop(count
=1,timeout
=0.2)
85 duration
= time
.time() - begin
87 for r
in all_instances():
88 r
.print_unfinished_line()
90 def dispatch_termination_to_all():
91 """Start the termination procedure in all remote shells"""
92 for r
in all_instances():
93 r
.dispatch_termination()
96 """For each remote shell we determine if its terminated by checking if
97 it is in the right state or if it requested termination but will never
98 receive the acknowledgement"""
99 for i
in all_instances():
100 if i
.state
is not STATE_TERMINATED
:
101 if i
.enabled
or not i
.termination
:
105 def format_info(info_list
):
106 """Turn a 2-dimension list of strings into a 1-dimension list of strings
107 with correct spacing"""
108 info_list
.sort(key
=lambda i
:int(i
[1][3:]))
111 nr_columns
= len(info_list
[0])
114 for i
in xrange(nr_columns
):
115 max_lengths
.append(max([len(str(info
[i
])) for info
in info_list
]))
116 for info_id
in xrange(len(info_list
)):
117 info
= info_list
[info_id
]
118 for str_id
in xrange(len(info
)):
119 orig_str
= str(info
[str_id
])
120 indent
= max_lengths
[str_id
] - len(orig_str
)
121 info
[str_id
] = orig_str
+ indent
* ' '
122 info_list
[info_id
] = ' '.join(info
)
124 def disable_crlf(fd
):
125 """We don't want \n to be replaced with \r\n"""
126 attr
= termios
.tcgetattr(fd
)
127 attr
[1] &= ~termios
.ONLCR
128 termios
.tcsetattr(fd
, termios
.TCSANOW
, attr
)
130 class remote_dispatcher(buffered_dispatcher
):
131 """A remote_dispatcher is a ssh process we communicate with"""
133 def __init__(self
, options
, hostname
):
134 self
.pid
, fd
= pty
.fork()
137 self
.launch_ssh(options
, hostname
)
141 self
.hostname
= hostname
142 buffered_dispatcher
.__init
__(self
, fd
)
143 self
.options
= options
145 self
.change_name(hostname
)
146 self
.active
= True # deactived shells are dead forever
147 self
.enabled
= True # shells can be enabled and disabled
148 self
.state
= STATE_NOT_STARTED
149 self
.termination
= None
151 self
.pending_rename
= None
153 self
.dispatch_write(options
.command
+ '\n')
154 self
.dispatch_termination()
155 self
.options
.interactive
= False
157 self
.options
.interactive
= sys
.stdin
.isatty()
159 def launch_ssh(self
, options
, name
):
160 """Launch the ssh command in the child process"""
161 evaluated
= options
.ssh
% {'host': name
}
163 exec_args
= (evaluated
, '-t', name
, 'sh')
164 elif evaluated
== options
.ssh
:
165 exec_args
= (evaluated
, name
)
167 exec_args
= (evaluated
,)
168 if options
.shell_expand_ssh
:
169 shell
= os
.environ
.get('SHELL', '/bin/sh')
170 exec_args
= (shell
, '-c', ' '.join(exec_args
))
171 os
.execlp(exec_args
[0], *exec_args
)
173 def change_state(self
, state
):
174 """Change the state of the remote process, logging the change"""
175 if self
.is_logging(debug
=True):
176 self
.log('state => %s\n' % (STATE_NAMES
[state
]), debug
=True)
179 def disconnect(self
):
180 """We are no more interested in this remote process"""
181 self
.read_buffer
= ''
182 self
.write_buffer
= ''
185 if self
.options
.abort_error
:
186 raise asyncore
.ExitNow
189 """Relaunch and reconnect to this same remote process"""
190 os
.kill(self
.pid
, signal
.SIGKILL
)
192 remote_dispatcher(self
.options
, self
.hostname
)
194 def dispatch_termination(self
):
195 """Start the termination procedure on this remote process, using the
196 same trick as the prompt to hide it"""
197 if not self
.termination
:
198 self
.term1
= '[gsh termination ' + str(random
.random())[2:]
199 self
.term2
= str(random
.random())[2:] + ']'
200 self
.termination
= self
.term1
+ self
.term2
201 self
.dispatch_write('echo "%s""%s"\n' % (self
.term1
, self
.term2
))
202 if self
.state
is not STATE_NOT_STARTED
:
203 self
.change_state(STATE_EXPECTING_NEXT_LINE
)
205 def set_prompt(self
):
206 """The prompt is important because we detect the readyness of a process
207 by waiting for its prompt. The prompt is built in two parts for it not
208 to appear in its building"""
210 self
.dispatch_write('RPS1=\n')
211 self
.dispatch_write('RPROMPT=\n')
212 self
.dispatch_write('TERM=ansi\n')
213 self
.dispatch_write('stty -onlcr\n')
214 prompt1
= '[gsh prompt ' + str(random
.random())[2:]
215 prompt2
= str(random
.random())[2:] + ']'
216 self
.prompt
= prompt1
+ prompt2
217 self
.dispatch_write('PS1="%s""%s\n"\n' % (prompt1
, prompt2
))
220 """We are always interested in reading from active remote processes if
222 return self
.active
and buffered_dispatcher
.readable(self
)
224 def handle_error(self
):
225 """An exception may or may not lead to a disconnection"""
226 if buffered_dispatcher
.handle_error(self
):
227 console_output('Error talking to %s\n ' % (self
.display_name
),
231 def handle_read_fast_case(self
, data
):
232 """If we are in a fast case we'll avoid the long processing of each
234 if self
.prompt
in data
or self
.state
is not STATE_RUNNING
or \
235 self
.termination
and (self
.term1
in data
or self
.term2
in data
) or \
236 self
.pending_rename
and self
.pending_rename
in data
:
240 last_nl
= data
.rfind('\n')
242 # No '\n' in data => slow case
244 self
.read_buffer
= data
[last_nl
+ 1:]
245 data
= data
[:last_nl
]
246 if self
.is_logging():
247 self
.log(data
+ '\n')
248 prefix
= self
.display_name
+ ': '
249 console_output(prefix
+ data
.replace('\n', '\n' + prefix
) + '\n')
252 def handle_read(self
):
253 """We got some output from a remote shell, this is one of the state
257 new_data
= buffered_dispatcher
.handle_read(self
)
258 if self
.is_logging(debug
=True):
259 self
.log('==> ' + new_data
, debug
=True)
260 if self
.handle_read_fast_case(self
.read_buffer
):
262 lf_pos
= new_data
.find('\n')
264 # Optimization: we knew there were no '\n' in the previous read
265 # buffer, so we searched only in the new_data and we offset the
266 # found index by the length of the previous buffer
267 lf_pos
+= len(self
.read_buffer
) - len(new_data
)
268 limit
= buffered_dispatcher
.MAX_BUFFER_SIZE
/ 10
269 if lf_pos
< 0 and len(self
.read_buffer
) > limit
:
270 # A large unfinished line is treated as a complete line
273 # For each line in the buffer
274 line
= self
.read_buffer
[:lf_pos
+ 1]
275 if self
.prompt
in line
:
276 if self
.options
.interactive
:
277 self
.change_state(STATE_IDLE
)
279 self
.change_state(STATE_EXPECTING_NEXT_LINE
)
280 elif self
.termination
and self
.termination
in line
:
281 self
.change_state(STATE_TERMINATED
)
283 elif self
.termination
and self
.term1
in line
and self
.term2
in line
:
284 # Just ignore this line
286 elif self
.pending_rename
and self
.pending_rename
in line
:
287 self
.received_rename(line
)
288 elif self
.state
is STATE_EXPECTING_NEXT_LINE
:
289 self
.change_state(STATE_EXPECTING_LINE
)
290 elif self
.state
is not STATE_NOT_STARTED
:
293 if self
.is_logging():
295 if not self
.options
.print_first
or \
296 self
.state
is STATE_EXPECTING_LINE
:
297 console_output(self
.display_name
+ ': ' + line
)
298 self
.change_state(STATE_RUNNING
)
300 # Go to the next line in the buffer
301 self
.read_buffer
= self
.read_buffer
[lf_pos
+ 1:]
302 lf_pos
= self
.read_buffer
.find('\n')
304 def print_unfinished_line(self
):
305 """The unfinished line stayed long enough in the buffer to be printed"""
306 if self
.state
in (STATE_EXPECTING_LINE
, STATE_RUNNING
):
307 if not self
.options
.print_first
or \
308 self
.state
is STATE_EXPECTING_LINE
:
309 line
= self
.read_buffer
+ '\n'
310 self
.read_buffer
= ''
311 if self
.is_logging():
313 console_output(self
.display_name
+ ': ' + line
)
316 """Do we want to write something?"""
317 return self
.active
and buffered_dispatcher
.writable(self
)
319 def is_logging(self
, debug
=False):
321 return self
.options
.debug
322 return self
.log_path
is not None
324 def log(self
, msg
, debug
=False):
325 """Log some information, either to a file or on the console"""
326 if self
.log_path
is None:
327 if debug
and self
.options
.debug
:
328 state
= STATE_NAMES
[self
.state
]
329 console_output('[dbg] %s[%s]: %s' %
330 (self
.display_name
, state
, msg
))
332 # None != False, that's why we use 'not'
333 if (not debug
) == (not self
.options
.debug
):
334 log
= os
.open(self
.log_path
, os
.O_WRONLY|os
.O_APPEND|os
.O_CREAT
)
339 """Return a list will all information available about this process"""
341 state
= STATE_NAMES
[self
.state
]
345 return [self
.display_name
, 'fd:%d' % (self
.fd
),
346 'r:%d' % (len(self
.read_buffer
)),
347 'w:%d' % (len(self
.write_buffer
)),
348 'active:%s' % (str(self
.active
)),
349 'enabled:%s' % (str(self
.enabled
)), state
]
351 def dispatch_write(self
, buf
):
352 """There is new stuff to write when possible"""
353 if self
.active
and self
.enabled
:
354 if self
.is_logging(debug
=True):
355 self
.log('<== ' + buf
, debug
=True)
356 buffered_dispatcher
.dispatch_write(self
, buf
)
358 def change_name(self
, name
):
359 self
.display_name
= None
360 self
.display_name
= make_unique_name(name
)
361 if self
.options
.log_dir
:
363 filename
= self
.display_name
.replace('/', '_')
364 log_path
= os
.path
.join(self
.options
.log_dir
, filename
)
366 # Rename the previous log
367 os
.rename(self
.log_path
, log_path
)
368 self
.log_path
= log_path
370 def rename(self
, string
):
371 previous_name
= self
.display_name
373 pending_rename1
= str(random
.random())[2:] + ','
374 pending_rename2
= str(random
.random())[2:] + ':'
375 self
.pending_rename
= pending_rename1
+ pending_rename2
376 self
.dispatch_write('echo "%s""%s" %s\n' %
377 (pending_rename1
, pending_rename2
, string
))
378 self
.change_state(STATE_EXPECTING_NEXT_LINE
)
380 self
.change_name(self
.hostname
)
382 def received_rename(self
, line
):
383 new_name
= line
[len(self
.pending_rename
) + 1:-1]
384 self
.change_name(new_name
)
385 self
.pending_rename
= None