1 # This program is free software; you can redistribute it and/or modify
2 # it under the terms of the GNU General Public License as published by
3 # the Free Software Foundation; either version 2 of the License, or
4 # (at your option) any later version.
6 # This program is distributed in the hope that it will be useful,
7 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # GNU Library General Public License for more details.
11 # You should have received a copy of the GNU General Public License
12 # along with this program; if not, write to the Free Software
13 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
15 # See the COPYING file for license information.
17 # Copyright (c) 2006, 2007 Guillaume Chazarain <guichaz@yahoo.fr>
28 from gsh
.buffered_dispatcher
import buffered_dispatcher
29 from gsh
.console
import console_output
31 # Either the remote shell is expecting a command or one is already running
34 STATE_EXPECTING_NEXT_LINE
, \
36 STATE_TERMINATED
= range(5)
38 STATE_NAMES
= ['not_started', 'idle', 'expecting_next_line',
39 'running', 'terminated']
42 """Iterator over all the remote_dispatcher instances"""
43 for i
in asyncore
.socket_map
.itervalues():
44 if isinstance(i
, remote_dispatcher
):
47 def make_unique_name(name
):
48 display_names
= set([i
.display_name
for i
in all_instances()])
50 if candidate_name
in display_names
:
53 candidate_name
= '%s#%d' % (name
, i
)
54 if candidate_name
not in display_names
:
59 def count_completed_processes():
60 """Return a tuple with the number of ready processes and the total number"""
61 completed_processes
= 0
63 for i
in all_instances():
66 if i
.state
is STATE_IDLE
:
67 completed_processes
+= 1
68 return completed_processes
, total
70 def handle_unfinished_lines():
71 """Typically we print only lines with a '\n', but if some buffers keep an
72 unfinished line for some time we'll add an artificial '\n'"""
73 for r
in all_instances():
74 if r
.read_buffer
and r
.read_buffer
[0] != chr(27):
81 asyncore
.loop(count
=1, timeout
=0.2, use_poll
=True)
82 duration
= time
.time() - begin
84 for r
in all_instances():
85 r
.print_unfinished_line()
87 def dispatch_termination_to_all():
88 """Start the termination procedure in all remote shells"""
89 for r
in all_instances():
90 r
.dispatch_termination()
93 """For each remote shell we determine if its terminated by checking if
94 it is in the right state or if it requested termination but will never
95 receive the acknowledgement"""
96 for i
in all_instances():
97 if i
.state
is not STATE_TERMINATED
:
98 if i
.enabled
or not i
.termination
:
102 def format_info(info_list
):
103 """Turn a 2-dimension list of strings into a 1-dimension list of strings
104 with correct spacing"""
105 info_list
.sort(key
=lambda i
:int(i
[1][3:]))
108 nr_columns
= len(info_list
[0])
111 for i
in xrange(nr_columns
):
112 max_lengths
.append(max([len(str(info
[i
])) for info
in info_list
]))
113 for info_id
in xrange(len(info_list
)):
114 info
= info_list
[info_id
]
115 for str_id
in xrange(len(info
)):
116 orig_str
= str(info
[str_id
])
117 indent
= max_lengths
[str_id
] - len(orig_str
)
118 info
[str_id
] = orig_str
+ indent
* ' '
119 info_list
[info_id
] = ' '.join(info
)
121 class remote_dispatcher(buffered_dispatcher
):
122 """A remote_dispatcher is a ssh process we communicate with"""
124 def __init__(self
, options
, hostname
):
125 self
.pid
, fd
= pty
.fork()
128 self
.launch_ssh(options
, hostname
)
131 self
.hostname
= hostname
132 buffered_dispatcher
.__init
__(self
, fd
)
133 self
.options
= options
135 self
.change_name(hostname
)
136 self
.active
= True # deactived shells are dead forever
137 self
.enabled
= True # shells can be enabled and disabled
138 self
.state
= STATE_NOT_STARTED
139 self
.termination
= None
141 self
.pending_rename
= None
143 self
.dispatch_write(options
.command
+ '\n')
144 self
.dispatch_termination()
145 self
.options
.interactive
= False
147 self
.options
.interactive
= sys
.stdin
.isatty()
149 def launch_ssh(self
, options
, name
):
150 """Launch the ssh command in the child process"""
151 evaluated
= options
.ssh
% {'host': name
}
152 shell
= os
.environ
.get('SHELL', '/bin/sh')
154 evaluated
= '%s -t %s sh' % (evaluated
, name
)
155 elif evaluated
== options
.ssh
:
156 evaluated
= '%s %s' % (evaluated
, name
)
157 os
.execlp(shell
, shell
, '-c', evaluated
)
159 def change_state(self
, state
):
160 """Change the state of the remote process, logging the change"""
161 if state
is not self
.state
:
162 if self
.is_logging(debug
=True):
163 self
.log('state => %s\n' % (STATE_NAMES
[state
]), debug
=True)
166 def disconnect(self
):
167 """We are no more interested in this remote process"""
168 self
.read_buffer
= ''
169 self
.write_buffer
= ''
172 if self
.options
.abort_error
:
173 raise asyncore
.ExitNow
176 """Relaunch and reconnect to this same remote process"""
177 os
.kill(self
.pid
, signal
.SIGKILL
)
179 remote_dispatcher(self
.options
, self
.hostname
)
181 def dispatch_termination(self
):
182 """Start the termination procedure on this remote process, using the
183 same trick as the prompt to hide it"""
184 if not self
.termination
:
185 self
.term1
= '[gsh termination ' + str(random
.random())[2:]
186 self
.term2
= str(random
.random())[2:] + ']'
187 self
.termination
= self
.term1
+ self
.term2
188 self
.dispatch_write('echo "%s""%s"\n' % (self
.term1
, self
.term2
))
189 if self
.state
is not STATE_NOT_STARTED
:
190 self
.change_state(STATE_EXPECTING_NEXT_LINE
)
192 def set_prompt(self
):
193 """The prompt is important because we detect the readyness of a process
194 by waiting for its prompt. The prompt is built in two parts for it not
195 to appear in its building"""
197 self
.dispatch_write('RPS1=\n')
198 self
.dispatch_write('RPROMPT=\n')
199 self
.dispatch_write('TERM=ansi\n')
200 prompt1
= '[gsh prompt ' + str(random
.random())[2:]
201 prompt2
= str(random
.random())[2:] + ']'
202 self
.prompt
= prompt1
+ prompt2
203 self
.dispatch_write('PS1="%s""%s\n"\n' % (prompt1
, prompt2
))
206 """We are always interested in reading from active remote processes if
208 return self
.active
and buffered_dispatcher
.readable(self
)
210 def handle_error(self
):
211 """An exception may or may not lead to a disconnection"""
212 if buffered_dispatcher
.handle_error(self
):
213 console_output('Error talking to %s\n ' % (self
.display_name
),
217 def handle_read_fast_case(self
, data
):
218 """If we are in a fast case we'll avoid the long processing of each
220 if self
.prompt
in data
or self
.state
is not STATE_RUNNING
or \
221 self
.termination
and (self
.term1
in data
or self
.term2
in data
) or \
222 self
.pending_rename
and self
.pending_rename
in data
:
226 last_nl
= data
.rfind('\n')
228 # No '\n' in data => slow case
230 self
.read_buffer
= data
[last_nl
+ 1:]
231 data
= data
[:last_nl
]
232 if self
.is_logging():
233 self
.log(data
+ '\n')
234 prefix
= self
.display_name
+ ': '
235 console_output(prefix
+ data
.replace('\n', '\n' + prefix
) + '\n')
238 def handle_read(self
):
239 """We got some output from a remote shell, this is one of the state
243 new_data
= buffered_dispatcher
.handle_read(self
)
244 if self
.is_logging(debug
=True):
245 self
.log('==> ' + new_data
, debug
=True)
246 if self
.handle_read_fast_case(self
.read_buffer
):
248 lf_pos
= new_data
.find('\n')
250 # Optimization: we knew there were no '\n' in the previous read
251 # buffer, so we searched only in the new_data and we offset the
252 # found index by the length of the previous buffer
253 lf_pos
+= len(self
.read_buffer
) - len(new_data
)
254 limit
= buffered_dispatcher
.MAX_BUFFER_SIZE
/ 10
255 if lf_pos
< 0 and len(self
.read_buffer
) > limit
:
256 # A large unfinished line is treated as a complete line
259 # For each line in the buffer
260 line
= self
.read_buffer
[:lf_pos
+ 1]
261 if self
.prompt
in line
:
262 if self
.options
.interactive
:
263 self
.change_state(STATE_IDLE
)
265 self
.change_state(STATE_EXPECTING_NEXT_LINE
)
266 elif self
.termination
and self
.termination
in line
:
267 self
.change_state(STATE_TERMINATED
)
269 elif self
.termination
and self
.term1
in line
and self
.term2
in line
:
270 # Just ignore this line
272 elif self
.pending_rename
and self
.pending_rename
in line
:
273 self
.received_rename(line
)
274 elif self
.state
is STATE_EXPECTING_NEXT_LINE
:
275 self
.change_state(STATE_RUNNING
)
276 elif self
.state
is not STATE_NOT_STARTED
:
279 if self
.is_logging():
281 console_output(self
.display_name
+ ': ' + line
)
282 self
.change_state(STATE_RUNNING
)
284 # Go to the next line in the buffer
285 self
.read_buffer
= self
.read_buffer
[lf_pos
+ 1:]
286 if self
.handle_read_fast_case(self
.read_buffer
):
288 lf_pos
= self
.read_buffer
.find('\n')
290 def print_unfinished_line(self
):
291 """The unfinished line stayed long enough in the buffer to be printed"""
292 if self
.state
is STATE_RUNNING
:
293 line
= self
.read_buffer
+ '\n'
294 self
.read_buffer
= ''
295 if self
.is_logging():
297 console_output(self
.display_name
+ ': ' + line
)
300 """Do we want to write something?"""
301 return self
.active
and buffered_dispatcher
.writable(self
)
303 def is_logging(self
, debug
=False):
305 return self
.options
.debug
306 return self
.log_path
is not None
308 def log(self
, msg
, debug
=False):
309 """Log some information, either to a file or on the console"""
310 if self
.log_path
is None:
311 if debug
and self
.options
.debug
:
312 state
= STATE_NAMES
[self
.state
]
313 console_output('[dbg] %s[%s]: %s' %
314 (self
.display_name
, state
, msg
))
316 # None != False, that's why we use 'not'
317 if (not debug
) == (not self
.options
.debug
):
318 log
= os
.open(self
.log_path
,
319 os
.O_WRONLY|os
.O_APPEND|os
.O_CREAT
, 0664)
324 """Return a list will all information available about this process"""
326 state
= STATE_NAMES
[self
.state
]
330 return [self
.display_name
, 'fd:%d' % (self
.fd
),
331 'r:%d' % (len(self
.read_buffer
)),
332 'w:%d' % (len(self
.write_buffer
)),
333 'active:%s' % (str(self
.active
)),
334 'enabled:%s' % (str(self
.enabled
)), state
]
336 def dispatch_write(self
, buf
):
337 """There is new stuff to write when possible"""
338 if self
.active
and self
.enabled
:
339 if self
.is_logging(debug
=True):
340 self
.log('<== ' + buf
, debug
=True)
341 buffered_dispatcher
.dispatch_write(self
, buf
)
343 def change_name(self
, name
):
344 self
.display_name
= None
345 self
.display_name
= make_unique_name(name
)
346 if self
.options
.log_dir
:
348 filename
= self
.display_name
.replace('/', '_')
349 log_path
= os
.path
.join(self
.options
.log_dir
, filename
)
351 # Rename the previous log
352 os
.rename(self
.log_path
, log_path
)
353 self
.log_path
= log_path
355 def rename(self
, string
):
356 previous_name
= self
.display_name
358 pending_rename1
= str(random
.random())[2:] + ','
359 pending_rename2
= str(random
.random())[2:] + ':'
360 self
.pending_rename
= pending_rename1
+ pending_rename2
361 self
.dispatch_write('echo "%s""%s" %s\n' %
362 (pending_rename1
, pending_rename2
, string
))
363 self
.change_state(STATE_EXPECTING_NEXT_LINE
)
365 self
.change_name(self
.hostname
)
367 def received_rename(self
, line
):
368 new_name
= line
[len(self
.pending_rename
) + 1:-1]
369 self
.change_name(new_name
)
370 self
.pending_rename
= None