Always shell expand the ssh command, so --ssh and --quick-sh are no
[gsh.git] / gsh / remote_dispatcher.py
blobed095c65a66d59531f5bd09a3833744d957ccab6
1 # This program is free software; you can redistribute it and/or modify
2 # it under the terms of the GNU General Public License as published by
3 # the Free Software Foundation; either version 2 of the License, or
4 # (at your option) any later version.
6 # This program is distributed in the hope that it will be useful,
7 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # GNU Library General Public License for more details.
11 # You should have received a copy of the GNU General Public License
12 # along with this program; if not, write to the Free Software
13 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
15 # See the COPYING file for license information.
17 # Copyright (c) 2006, 2007 Guillaume Chazarain <guichaz@yahoo.fr>
19 import asyncore
20 import os
21 import pty
22 import random
23 import signal
24 import sys
25 import termios
26 import time
28 from gsh.buffered_dispatcher import buffered_dispatcher
29 from gsh.console import console_output
# States of a remote shell. Each STATE_* constant is the index of its
# human-readable name in STATE_NAMES (the names are used in debug logs and
# in the information listing).
STATE_NAMES = ['not_started', 'idle', 'expecting_next_line',
               'running', 'terminated']

(STATE_NOT_STARTED,
 STATE_IDLE,
 STATE_EXPECTING_NEXT_LINE,
 STATE_RUNNING,
 STATE_TERMINATED) = range(len(STATE_NAMES))
def all_instances():
    """Iterator over all the remote_dispatcher instances.

    Every object registered in asyncore.socket_map that is a
    remote_dispatcher is yielded; other kinds of dispatchers are skipped.
    """
    # Iterate over a snapshot of the map: a caller may register or close
    # dispatchers while consuming this generator (reconnect() closes one and
    # creates another), and changing the size of a dict during a lazy
    # iteration raises RuntimeError.
    for dispatcher in list(asyncore.socket_map.values()):
        if isinstance(dispatcher, remote_dispatcher):
            yield dispatcher
def make_unique_name(name):
    """Return a display name not used by any remote_dispatcher instance.

    name itself is returned when it is free; otherwise the first free
    candidate among 'name#1', 'name#2', ... is returned.
    """
    taken = set(shell.display_name for shell in all_instances())
    if name not in taken:
        return name
    suffix = 1
    while '%s#%d' % (name, suffix) in taken:
        suffix += 1
    return '%s#%d' % (name, suffix)
def count_completed_processes():
    """Return a tuple (idle, total) counted over the enabled shells only:
    how many are in STATE_IDLE and how many there are in all"""
    enabled_states = [shell.state for shell in all_instances()
                      if shell.enabled]
    idle = len([state for state in enabled_states if state == STATE_IDLE])
    return idle, len(enabled_states)
def handle_unfinished_lines():
    """Print lines that stayed unfinished (no '\\n') for some time.

    Nothing is done unless at least one shell has a non-empty read buffer
    that does not start with the escape character (chr(27)). Otherwise a
    single asyncore.loop() pass is run with a 0.2 second timeout, and if
    that pass lasted at least 0.15 second every shell is asked to print its
    unfinished line, which adds an artificial '\\n'.
    """
    pending = False
    for shell in all_instances():
        buf = shell.read_buffer
        if buf and buf[0] != chr(27):
            pending = True
            break
    if not pending:
        # No unfinished lines
        return

    started = time.time()
    asyncore.loop(count=1, timeout=0.2, use_poll=True)
    elapsed = time.time() - started
    if elapsed < 0.15:
        # NOTE(review): presumably a short pass means fresh data arrived,
        # so the line may still be completed -- confirm
        return
    for shell in all_instances():
        shell.print_unfinished_line()
def dispatch_termination_to_all():
    """Ask every remote_dispatcher instance to start its termination
    procedure"""
    for shell in all_instances():
        shell.dispatch_termination()
def all_terminated():
    """Return True when no remote shell is still worth waiting for.

    A shell is done when it is in STATE_TERMINATED. A shell in another
    state is also considered done when it is disabled and has a termination
    marker set: it requested termination but will never receive the
    acknowledgement.
    """
    for shell in all_instances():
        if shell.state == STATE_TERMINATED:
            continue
        if shell.enabled or not shell.termination:
            return False
    return True
def format_info(info_list):
    """Turn a 2-dimension list of strings into a 1-dimension list of strings
    with correct spacing.

    The work is done in place and nothing is returned:
    - the rows are sorted by the integer found after the first three
      characters of their second column (the 'fd:N' field built by
      get_info());
    - every cell is converted with str() and right-padded with spaces to
      the width of the widest cell of its column;
    - each row of info_list is replaced by its cells joined with a single
      space.

    The number of columns is taken from the first row. An empty list is
    left untouched.
    """
    info_list.sort(key=lambda row: int(row[1][3:]))
    if info_list:
        nr_columns = len(info_list[0])
    else:
        nr_columns = 0
    widths = [max([len(str(row[column])) for row in info_list])
              for column in range(nr_columns)]
    for row_id, row in enumerate(info_list):
        # The cells are padded inside the row itself, so callers holding a
        # reference to a row see the padded strings too
        for column, cell in enumerate(row):
            row[column] = str(cell).ljust(widths[column])
        info_list[row_id] = ' '.join(row)
121 class remote_dispatcher(buffered_dispatcher):
122 """A remote_dispatcher is a ssh process we communicate with"""
def __init__(self, options, hostname):
    """Fork a child on a new pty that runs ssh to hostname, and set up the
    parent side to talk to it.

    options is the shared options object; its 'interactive' attribute is
    overwritten here: False when options.command is set, otherwise whether
    stdin is a tty.
    """
    child_pid, master_fd = pty.fork()
    self.pid = child_pid
    if child_pid == 0:
        # Child
        # NOTE(review): os.execlp() does not return on success and raises
        # on failure, so the exit below looks unreachable and a failed exec
        # propagates an exception inside the child -- confirm
        self.launch_ssh(options, hostname)
        sys.exit(1)

    # Parent
    self.hostname = hostname
    buffered_dispatcher.__init__(self, master_fd)
    self.options = options
    self.log_path = None
    self.active = True # deactived shells are dead forever
    self.enabled = True # shells can be enabled and disabled
    self.state = STATE_NOT_STARTED
    self.termination = None
    self.pending_rename = None
    self.change_name(hostname)
    self.set_prompt()
    if options.command:
        # One-shot mode: queue the command, then the termination request
        self.dispatch_write(options.command + '\n')
        self.dispatch_termination()
        self.options.interactive = False
    else:
        self.options.interactive = sys.stdin.isatty()
def launch_ssh(self, options, name):
    """Launch the ssh command in the child process.

    options.ssh is expanded with the 'host' key bound to name. With
    options.quick_sh the command becomes '<expanded> -t <name> sh';
    otherwise name is appended only when the expansion changed nothing.
    The result is run through the user's $SHELL (default /bin/sh) with -c,
    replacing the current process.
    """
    command = options.ssh % {'host': name}
    if options.quick_sh:
        command = '%s -t %s sh' % (command, name)
    elif command == options.ssh:
        command = '%s %s' % (command, name)
    # NOTE(review): name is inserted unquoted into a shell command line
    shell = os.environ.get('SHELL', '/bin/sh')
    os.execlp(shell, shell, '-c', command)
def change_state(self, state):
    """Change the state of the remote process, logging the change when
    debug logging is on. Setting the current state again does nothing."""
    if state == self.state:
        return
    if self.is_logging(debug=True):
        self.log('state => %s\n' % (STATE_NAMES[state]), debug=True)
    self.state = state
def disconnect(self):
    """We are no more interested in this remote process: drop both buffers
    and mark the shell inactive and disabled.

    Raises asyncore.ExitNow when options.abort_error is set.
    """
    self.read_buffer = self.write_buffer = ''
    self.active = self.enabled = False
    if not self.options.abort_error:
        return
    raise asyncore.ExitNow
def reconnect(self):
    """Relaunch and reconnect to this same remote process: the current
    child is killed, this dispatcher is closed and a new remote_dispatcher
    is created with the same options and hostname"""
    options, hostname = self.options, self.hostname
    os.kill(self.pid, signal.SIGKILL)
    self.close()
    # The new instance is not kept here
    # NOTE(review): presumably it registers itself in asyncore.socket_map
    # through buffered_dispatcher.__init__ -- confirm
    remote_dispatcher(options, hostname)
def dispatch_termination(self):
    """Start the termination procedure on this remote process, using the
    same trick as the prompt to hide it.

    A random marker is built in two halves and echoed by the remote shell
    with the halves quoted separately, so the full marker is present in the
    output of the echo but not in the command line itself. Calling this
    again once a marker exists does nothing.
    """
    # NOTE(review): the nesting was reconstructed from a flattened listing;
    # confirm that the echo is meant to be sent only on the first call
    if self.termination:
        return
    marker_head = '[gsh termination ' + str(random.random())[2:]
    marker_tail = str(random.random())[2:] + ']'
    self.term1 = marker_head
    self.term2 = marker_tail
    self.termination = marker_head + marker_tail
    self.dispatch_write('echo "%s""%s"\n' % (marker_head, marker_tail))
    if self.state != STATE_NOT_STARTED:
        self.change_state(STATE_EXPECTING_NEXT_LINE)
def set_prompt(self):
    """Send the shell commands that install a recognizable prompt.

    The prompt is important because we detect the readyness of a process
    by waiting for its prompt. It is a random marker sent in two separately
    quoted halves, so that the full marker (stored in self.prompt) is not
    present in the PS1 assignment itself.
    """
    # No right prompt, and a fixed terminal type
    for assignment in ('RPS1=\n', 'RPROMPT=\n', 'TERM=ansi\n'):
        self.dispatch_write(assignment)
    head = '[gsh prompt ' + str(random.random())[2:]
    tail = str(random.random())[2:] + ']'
    self.prompt = head + tail
    self.dispatch_write('PS1="%s""%s\n"\n' % (head, tail))
def readable(self):
    """We are always interested in reading from active remote processes if
    the buffer is OK: an inactive shell is never readable, an active one
    defers to buffered_dispatcher.readable()"""
    if not self.active:
        return self.active
    return buffered_dispatcher.readable(self)
def handle_error(self):
    """An exception may or may not lead to a disconnection: when
    buffered_dispatcher.handle_error() returns a true value, the error is
    reported on stderr and the shell is disconnected"""
    if not buffered_dispatcher.handle_error(self):
        return
    # NOTE(review): the message ends with a space after the newline; kept
    # as is, confirm whether it is intended
    message = 'Error talking to %s\n ' % (self.display_name)
    console_output(message, sys.stderr)
    self.disconnect()
def handle_read_fast_case(self, data):
    """Print data at once when no line of it can need special handling.

    The fast case applies when the shell is in STATE_RUNNING and data holds
    at least one '\\n' but neither the prompt, nor a half of the
    termination marker, nor the pending rename marker. All the complete
    lines are then logged and printed, each prefixed with the display name,
    and the unfinished remainder becomes the read buffer.

    Returns True when data was consumed this way, False when the caller has
    to go through the line by line processing.
    """
    if self.prompt in data or self.state != STATE_RUNNING:
        # Slow case :-(
        return False
    if self.termination and (self.term1 in data or self.term2 in data):
        return False
    if self.pending_rename and self.pending_rename in data:
        return False

    cut = data.rfind('\n')
    if cut == -1:
        # No '\n' in data => slow case
        return False
    complete = data[:cut]
    self.read_buffer = data[cut + 1:]
    if self.is_logging():
        self.log(complete + '\n')
    prefix = self.display_name + ': '
    console_output(prefix + complete.replace('\n', '\n' + prefix) + '\n')
    return True
def handle_read(self):
    """We got some output from a remote shell, this is one of the state
    machine.

    After trying the fast case on the whole read buffer, the buffer is
    consumed one line at a time. For each line, the first matching rule
    applies:
    - it contains the prompt: the shell becomes idle (interactive mode) or
      expects the next line (otherwise);
    - it contains the full termination marker: the shell is terminated and
      disconnected;
    - it contains both halves of the termination marker: ignored;
    - it contains the pending rename marker: the rename is applied;
    - the shell was expecting the next line: the line is dropped and the
      shell is now running;
    - the shell has started: the line is logged and printed, and the shell
      is now running.
    """
    if not self.active:
        return
    # NOTE(review): presumably buffered_dispatcher.handle_read() appends
    # the new data to self.read_buffer and returns only the new data
    chunk = buffered_dispatcher.handle_read(self)
    if self.is_logging(debug=True):
        self.log('==> ' + chunk, debug=True)
    if self.handle_read_fast_case(self.read_buffer):
        return

    eol = chunk.find('\n')
    if eol >= 0:
        # Optimization: we knew there were no '\n' in the previous read
        # buffer, so we searched only in the new data and we offset the
        # found index by the length of the previous buffer
        eol += len(self.read_buffer) - len(chunk)
    max_line = buffered_dispatcher.MAX_BUFFER_SIZE // 10
    if eol < 0 and len(self.read_buffer) > max_line:
        # A large unfinished line is treated as a complete line
        eol = max_line

    while eol >= 0:
        # For each line in the buffer
        line = self.read_buffer[:eol + 1]
        if self.prompt in line:
            if self.options.interactive:
                next_state = STATE_IDLE
            else:
                next_state = STATE_EXPECTING_NEXT_LINE
            self.change_state(next_state)
        elif self.termination and self.termination in line:
            self.change_state(STATE_TERMINATED)
            self.disconnect()
        elif self.termination and self.term1 in line and self.term2 in line:
            # Just ignore this line
            pass
        elif self.pending_rename and self.pending_rename in line:
            self.received_rename(line)
        elif self.state == STATE_EXPECTING_NEXT_LINE:
            self.change_state(STATE_RUNNING)
        elif self.state != STATE_NOT_STARTED:
            if line[-1] != '\n':
                line += '\n'
            if self.is_logging():
                self.log(line)
            console_output(self.display_name + ': ' + line)
            self.change_state(STATE_RUNNING)

        # Go to the next line in the buffer
        self.read_buffer = self.read_buffer[eol + 1:]
        if self.handle_read_fast_case(self.read_buffer):
            return
        eol = self.read_buffer.find('\n')
def print_unfinished_line(self):
    """The unfinished line stayed long enough in the buffer to be printed.

    When the shell is in STATE_RUNNING and its read buffer is not empty,
    the buffer is emptied and its content is logged and printed, prefixed
    with the display name and followed by an artificial '\\n'.
    """
    if not self.read_buffer:
        # This method is called on every shell as soon as one of them has
        # an unfinished line: a shell with nothing buffered must not print
        # an empty 'name: ' line
        return
    if self.state != STATE_RUNNING:
        return
    line = self.read_buffer + '\n'
    self.read_buffer = ''
    if self.is_logging():
        self.log(line)
    console_output(self.display_name + ': ' + line)
def writable(self):
    """Do we want to write something? Never for an inactive shell; an
    active one defers to buffered_dispatcher.writable()"""
    if not self.active:
        return self.active
    return buffered_dispatcher.writable(self)
def is_logging(self, debug=False):
    """Tell whether a message would be of interest to log(): for debug
    messages this is the debug option, for regular messages whether a log
    file path is set"""
    if not debug:
        return self.log_path is not None
    return self.options.debug
def log(self, msg, debug=False):
    """Log some information, either to a file or on the console.

    Without a log file, only debug messages are shown, on the console, and
    only when the debug option is set. With a log file, msg is appended to
    it when its kind matches the debug option: debug messages only with the
    option set, regular messages only with the option unset.

    msg    -- the text to log
    debug  -- whether msg is a debug message
    """
    if self.log_path is None:
        if debug and self.options.debug:
            state = STATE_NAMES[self.state]
            console_output('[dbg] %s[%s]: %s' %
                           (self.display_name, state, msg))
        return

    # Both sides are turned into booleans first: the values may be None,
    # and None != False
    if bool(debug) != bool(self.options.debug):
        return
    # The 0o664 spelling of the mode is accepted by Python 2.6+ and 3
    log_fd = os.open(self.log_path,
                     os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o664)
    try:
        os.write(log_fd, msg)
    finally:
        # Do not leak the descriptor when the write fails
        os.close(log_fd)
def get_info(self):
    """Return a list with all information available about this process:
    display name, file descriptor, sizes of the read and write buffers,
    active and enabled flags, and the state name (empty for an inactive
    shell)"""
    state = ''
    if self.active:
        state = STATE_NAMES[self.state]

    info = [self.display_name]
    info.append('fd:%d' % (self.fd))
    info.append('r:%d' % (len(self.read_buffer)))
    info.append('w:%d' % (len(self.write_buffer)))
    info.append('active:%s' % (str(self.active)))
    info.append('enabled:%s' % (str(self.enabled)))
    info.append(state)
    return info
def dispatch_write(self, buf):
    """There is new stuff to write when possible. buf is silently dropped
    unless the shell is both active and enabled; otherwise it is passed to
    buffered_dispatcher.dispatch_write() after being debug logged"""
    if not (self.active and self.enabled):
        return
    if self.is_logging(debug=True):
        self.log('<== ' + buf, debug=True)
    buffered_dispatcher.dispatch_write(self, buf)
def change_name(self, name):
    """Give this shell a unique display name derived from name.

    When options.log_dir is set, the log file path follows the display
    name ('/' replaced by '_'), and the log file written under the previous
    name, if any, is renamed accordingly.
    """
    # Our current name is cleared first, presumably so that
    # make_unique_name() does not count it as already taken
    self.display_name = None
    self.display_name = make_unique_name(name)
    if not self.options.log_dir:
        return
    # The log file
    filename = self.display_name.replace('/', '_')
    new_log_path = os.path.join(self.options.log_dir, filename)
    # The log file is only created by the first write to it, so the
    # previous one may not exist yet: os.rename() would raise in that case
    if self.log_path and os.path.exists(self.log_path):
        # Rename the previous log
        os.rename(self.log_path, new_log_path)
    self.log_path = new_log_path
def rename(self, string):
    """Rename this shell.

    With a non-empty string, the remote shell is asked to echo it after a
    random marker, so that the new name is the result of its evaluation by
    the remote shell; the marker is remembered in self.pending_rename until
    the answer is read. The marker is sent in two separately quoted halves,
    so the full marker is not present in the command line itself.

    With an empty string, the name is reset to the hostname at once.
    """
    if not string:
        self.change_name(self.hostname)
        return
    marker_head = str(random.random())[2:] + ','
    marker_tail = str(random.random())[2:] + ':'
    self.pending_rename = marker_head + marker_tail
    self.dispatch_write('echo "%s""%s" %s\n' %
                        (marker_head, marker_tail, string))
    self.change_state(STATE_EXPECTING_NEXT_LINE)
def received_rename(self, line):
    """Apply the rename answered by the remote shell.

    The new name is what remains of line once a prefix as long as the
    pending marker plus one character, and the last character, are removed.
    The pending marker is then forgotten.
    """
    # NOTE(review): assumes the marker starts the line, is followed by a
    # single separator, and that the line ends with a single terminator
    start = len(self.pending_rename) + 1
    self.change_name(line[start:-1])
    self.pending_rename = None