setuptools-0.6c5-py2.4.egg is broken wrt RPM building so we provide a snapshot
[gsh.git] / gsh / remote_dispatcher.py
blob0cc57e11551d41f87014908383ba1c305f576baa
1 # This program is free software; you can redistribute it and/or modify
2 # it under the terms of the GNU General Public License as published by
3 # the Free Software Foundation; either version 2 of the License, or
4 # (at your option) any later version.
6 # This program is distributed in the hope that it will be useful,
7 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # GNU Library General Public License for more details.
11 # You should have received a copy of the GNU General Public License
12 # along with this program; if not, write to the Free Software
13 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
15 # See the COPYING file for license information.
17 # Copyright (c) 2006, 2007 Guillaume Chazarain <guichaz@yahoo.fr>
19 import asyncore
20 import os
21 import pty
22 import random
23 import signal
24 import sys
25 import termios
26 import time
28 from gsh.buffered_dispatcher import buffered_dispatcher
29 from gsh.console import console_output
# States of a remote shell, as stored in remote_dispatcher.state:
#   not_started          nothing useful was read yet, output is discarded
#   idle                 the prompt was seen, the shell expects a command
#   expecting_next_line  the next line read is swallowed instead of printed
#   expecting_line       waiting for the first line of output (this is the
#                        line shown with options.print_first)
#   running              the first line was printed and a command is running
#   terminated           the termination marker was read
# STATE_NAMES is indexed by the numeric value of the STATE_* constants, so
# both must list the states in the same order.
STATE_NAMES = ['not_started', 'idle', 'expecting_next_line',
               'expecting_line', 'running', 'terminated']

(STATE_NOT_STARTED,
 STATE_IDLE,
 STATE_EXPECTING_NEXT_LINE,
 STATE_EXPECTING_LINE,
 STATE_RUNNING,
 STATE_TERMINATED) = range(len(STATE_NAMES))
def all_instances():
    """Iterator over all the remote_dispatcher instances.

    The dispatchers are looked up in asyncore.socket_map. A snapshot of the
    map is taken when the iteration starts, so the caller may create or
    close dispatchers while looping without getting a "dictionary changed
    size during iteration" error. A dispatcher closed during the loop is
    therefore still yielded.
    """
    # list(...values()) copies the values on both Python 2 and Python 3
    for i in list(asyncore.socket_map.values()):
        if isinstance(i, remote_dispatcher):
            yield i
def make_unique_name(name):
    """Return name if no remote_dispatcher is displayed under it, otherwise
    the first of 'name#1', 'name#2', ... that is still free"""
    taken = set([shell.display_name for shell in all_instances()])
    if name not in taken:
        return name
    suffix = 1
    while '%s#%d' % (name, suffix) in taken:
        suffix += 1
    return '%s#%d' % (name, suffix)
def count_completed_processes():
    """Return a tuple with the number of ready processes and the total number.

    Only enabled shells are counted; a shell is ready when it is in the
    idle state.
    """
    completed_processes = 0
    total = 0
    for i in all_instances():
        if i.enabled:
            total += 1
            # States are plain integers: compare by value, not by identity
            if i.state == STATE_IDLE:
                completed_processes += 1
    return completed_processes, total
def handle_unfinished_lines():
    """Usually only newline-terminated lines are printed. When some read
    buffer holds the beginning of a line and nothing more arrives for a
    while, ask every shell to print what it has as if the line was
    complete"""
    pending = [shell for shell in all_instances() if shell.read_buffer]
    if not pending:
        # No unfinished lines
        return

    started = time.time()
    asyncore.loop(count=1, timeout=0.2)
    if time.time() - started < 0.15:
        # The loop iteration came back early, do not print anything yet
        return
    for shell in all_instances():
        shell.print_unfinished_line()
def dispatch_termination_to_all():
    """Ask every remote shell to begin its termination procedure"""
    for shell in all_instances():
        shell.dispatch_termination()
def all_terminated():
    """For each remote shell we determine if its terminated by checking if
    it is in the right state or if it requested termination but will never
    receive the acknowledgement (it was disabled after the request).

    Return True when no shell is still awaited, False otherwise.
    """
    for i in all_instances():
        # States are plain integers: compare by value, not by identity
        if i.state != STATE_TERMINATED and (i.enabled or not i.termination):
            return False
    return True
def format_info(info_list):
    """Turn a 2-dimension list of strings into a 1-dimension list of strings
    with correct spacing.

    info_list is modified in place and nothing is returned: the rows are
    sorted on the number following the 3-character prefix of their second
    column (the 'fd:N' field built by remote_dispatcher.get_info), every
    cell is padded with spaces to the width of its column and each row is
    replaced by its cells joined with a single space. The number of columns
    is taken from the first row.
    """
    info_list.sort(key=lambda info: int(info[1][3:]))
    if not info_list:
        return
    nr_columns = len(info_list[0])
    max_lengths = [max([len(str(info[col])) for info in info_list])
                   for col in range(nr_columns)]
    for row_id, info in enumerate(info_list):
        for col_id, cell in enumerate(info):
            info[col_id] = str(cell).ljust(max_lengths[col_id])
        info_list[row_id] = ' '.join(info)
def disable_crlf(fd):
    """Switch off the translation of newline into carriage return + newline
    on the output of the terminal behind fd"""
    settings = termios.tcgetattr(fd)
    # Item 1 of the list returned by tcgetattr() holds the output flags
    settings[1] = settings[1] & ~termios.ONLCR
    termios.tcsetattr(fd, termios.TCSANOW, settings)
class remote_dispatcher(buffered_dispatcher):
    """A remote_dispatcher is a ssh process we communicate with.

    The ssh command runs in a child process forked with a pseudo-terminal;
    the parent talks to it through the pty file descriptor. What the remote
    shell prints drives a small state machine (see the STATE_* constants)
    that decides which lines are shown on the console.
    """

    def __init__(self, options, hostname):
        """Fork the ssh process for hostname and queue the first commands.

        options carries the settings read by this class (ssh, quick_sh,
        shell_expand_ssh, command, debug, log_dir, print_first, abort_error).
        options.interactive is overwritten here: False when a command was
        given, otherwise whether stdin is a tty.
        """
        self.pid, fd = pty.fork()
        if self.pid == 0:
            # Child: launch_ssh() only comes back by raising, when the exec
            # failed. Whatever happens, the child must never return into the
            # code of the parent, so leave without unwinding the stack.
            try:
                self.launch_ssh(options, hostname)
            finally:
                os._exit(1)
        # Parent
        disable_crlf(fd)
        self.hostname = hostname
        buffered_dispatcher.__init__(self, fd)
        self.options = options
        self.log_path = None
        self.change_name(hostname)
        self.active = True # deactived shells are dead forever
        self.enabled = True # shells can be enabled and disabled
        self.state = STATE_NOT_STARTED
        self.termination = None
        self.set_prompt()
        self.pending_rename = None
        if options.command:
            # One-shot mode: run the command then terminate the shell
            self.dispatch_write(options.command + '\n')
            self.dispatch_termination()
            self.options.interactive = False
        else:
            self.options.interactive = sys.stdin.isatty()

    def launch_ssh(self, options, name):
        """Launch the ssh command in the child process.

        options.ssh may contain a %(host)s placeholder; when it does not,
        the host name is appended as an argument. Does not return on
        success, raises if the command cannot be executed.
        """
        evaluated = options.ssh % {'host': name}
        if options.quick_sh:
            exec_args = (evaluated, '-t', name, 'sh')
        elif evaluated == options.ssh:
            # No placeholder was substituted
            exec_args = (evaluated, name)
        else:
            exec_args = (evaluated,)
        if options.shell_expand_ssh:
            # Let the user's shell parse the whole command line
            shell = os.environ.get('SHELL', '/bin/sh')
            exec_args = (shell, '-c', ' '.join(exec_args))
        os.execlp(exec_args[0], *exec_args)

    def change_state(self, state):
        """Change the state of the remote process, logging the change"""
        if self.is_logging(debug=True):
            self.log('state => %s\n' % (STATE_NAMES[state]), debug=True)
        self.state = state

    def disconnect(self):
        """We are no more interested in this remote process.

        Raises asyncore.ExitNow when options.abort_error is set.
        """
        self.read_buffer = ''
        self.write_buffer = ''
        self.active = False
        self.enabled = False
        if self.options.abort_error:
            raise asyncore.ExitNow

    def reconnect(self):
        """Relaunch and reconnect to this same remote process"""
        os.kill(self.pid, signal.SIGKILL)
        self.close()
        # The new instance is not kept here, all_instances() finds the
        # dispatchers through asyncore.socket_map
        remote_dispatcher(self.options, self.hostname)

    def dispatch_termination(self):
        """Start the termination procedure on this remote process, using the
        same trick as the prompt to hide it: the marker is sent in two quoted
        halves, so the full marker only shows up in the output of the echo
        command, not in the command line itself. Does nothing if the
        termination was already requested."""
        if not self.termination:
            self.term1 = '[gsh termination ' + str(random.random())[2:]
            self.term2 = str(random.random())[2:] + ']'
            self.termination = self.term1 + self.term2
            self.dispatch_write('echo "%s""%s"\n' % (self.term1, self.term2))
            if self.state != STATE_NOT_STARTED:
                self.change_state(STATE_EXPECTING_NEXT_LINE)

    def set_prompt(self):
        """The prompt is important because we detect the readyness of a process
        by waiting for its prompt. The prompt is built in two parts for it not
        to appear in its building"""
        # No right prompt
        self.dispatch_write('RPS1=\n')
        self.dispatch_write('RPROMPT=\n')
        self.dispatch_write('TERM=ansi\n')
        self.dispatch_write('stty -onlcr\n')
        prompt1 = '[gsh prompt ' + str(random.random())[2:]
        prompt2 = str(random.random())[2:] + ']'
        self.prompt = prompt1 + prompt2
        self.dispatch_write('PS1="%s""%s\n"\n' % (prompt1, prompt2))

    def readable(self):
        """We are always interested in reading from active remote processes if
        the buffer is OK"""
        return self.active and buffered_dispatcher.readable(self)

    def handle_error(self):
        """An exception may or may not lead to a disconnection: the decision
        is the return value of buffered_dispatcher.handle_error()"""
        if buffered_dispatcher.handle_error(self):
            console_output('Error talking to %s\n ' % (self.display_name),
                           sys.stderr)
            self.disconnect()

    def handle_read_fast_case(self, data):
        """If we are in a fast case we'll avoid the long processing of each
        line.

        The fast case applies when the shell is running and data holds at
        least one newline but no prompt, termination or rename marker: all
        the complete lines are then printed at once and the unfinished
        remainder is kept as the read buffer. Return True if data was
        handled here, False if the caller has to process it line by line.
        """
        if self.prompt in data or self.state != STATE_RUNNING or \
           self.termination and (self.term1 in data or self.term2 in data) or \
           self.pending_rename and self.pending_rename in data:
            # Slow case :-(
            return False

        last_nl = data.rfind('\n')
        if last_nl == -1:
            # No '\n' in data => slow case
            return False
        self.read_buffer = data[last_nl + 1:]
        data = data[:last_nl]
        if self.is_logging():
            self.log(data + '\n')
        prefix = self.display_name + ': '
        console_output(prefix + data.replace('\n', '\n' + prefix) + '\n')
        return True

    def handle_read(self):
        """We got some output from a remote shell, this is one of the state
        machine"""
        if not self.active:
            return
        new_data = buffered_dispatcher.handle_read(self)
        if self.is_logging(debug=True):
            self.log('==> ' + new_data, debug=True)
        if self.handle_read_fast_case(self.read_buffer):
            return
        lf_pos = new_data.find('\n')
        if lf_pos >= 0:
            # Optimization: we knew there were no '\n' in the previous read
            # buffer, so we searched only in the new_data and we offset the
            # found index by the length of the previous buffer
            lf_pos += len(self.read_buffer) - len(new_data)
        # Floor division: limit is used as a slice index below
        limit = buffered_dispatcher.MAX_BUFFER_SIZE // 10
        if lf_pos < 0 and len(self.read_buffer) > limit:
            # A large unfinished line is treated as a complete line
            lf_pos = limit
        while lf_pos >= 0:
            # For each line in the buffer
            line = self.read_buffer[:lf_pos + 1]
            if self.prompt in line:
                if self.options.interactive:
                    self.change_state(STATE_IDLE)
                else:
                    self.change_state(STATE_EXPECTING_NEXT_LINE)
            elif self.termination and self.termination in line:
                # The complete marker: this is the output of the echo
                self.change_state(STATE_TERMINATED)
                self.disconnect()
            elif self.termination and self.term1 in line and self.term2 in line:
                # Both halves but not the complete marker.
                # Just ignore this line
                pass
            elif self.pending_rename and self.pending_rename in line:
                self.received_rename(line)
            elif self.state == STATE_EXPECTING_NEXT_LINE:
                # This line is swallowed, the following one is awaited
                self.change_state(STATE_EXPECTING_LINE)
            elif self.state != STATE_NOT_STARTED:
                if line[-1] != '\n':
                    line += '\n'
                if self.is_logging():
                    self.log(line)
                if not self.options.print_first or \
                   self.state == STATE_EXPECTING_LINE:
                    console_output(self.display_name + ': ' + line)
                    self.change_state(STATE_RUNNING)

            # Go to the next line in the buffer
            self.read_buffer = self.read_buffer[lf_pos + 1:]
            lf_pos = self.read_buffer.find('\n')

    def print_unfinished_line(self):
        """The unfinished line stayed long enough in the buffer to be printed"""
        if self.state in (STATE_EXPECTING_LINE, STATE_RUNNING):
            if not self.options.print_first or \
               self.state == STATE_EXPECTING_LINE:
                line = self.read_buffer + '\n'
                self.read_buffer = ''
                if self.is_logging():
                    self.log(line)
                console_output(self.display_name + ': ' + line)

    def writable(self):
        """Do we want to write something?"""
        return self.active and buffered_dispatcher.writable(self)

    def is_logging(self, debug=False):
        """Tell whether log() is worth calling: for debug messages when
        options.debug is set, for regular output when there is a log file"""
        if debug:
            return self.options.debug
        return self.log_path is not None

    def log(self, msg, debug=False):
        """Log some information, either to a file or on the console.

        Without a log file only debug messages are shown, on the console and
        only if options.debug is set. With a log file, msg is appended to it
        when its kind (debug or not) matches options.debug.
        """
        if self.log_path is None:
            if debug and self.options.debug:
                state = STATE_NAMES[self.state]
                console_output('[dbg] %s[%s]: %s' %
                               (self.display_name, state, msg))
        else:
            # None != False, that's why we use 'not'
            if (not debug) == (not self.options.debug):
                log = os.open(self.log_path, os.O_WRONLY|os.O_APPEND|os.O_CREAT)
                try:
                    os.write(log, msg)
                finally:
                    # Do not leak the descriptor if the write fails
                    os.close(log)

    def get_info(self):
        """Return a list with all information available about this process"""
        if self.active:
            state = STATE_NAMES[self.state]
        else:
            state = ''

        return [self.display_name, 'fd:%d' % (self.fd),
                'r:%d' % (len(self.read_buffer)),
                'w:%d' % (len(self.write_buffer)),
                'active:%s' % (str(self.active)),
                'enabled:%s' % (str(self.enabled)), state]

    def dispatch_write(self, buf):
        """There is new stuff to write when possible. buf is dropped if the
        shell is not both active and enabled"""
        if self.active and self.enabled:
            if self.is_logging(debug=True):
                self.log('<== ' + buf, debug=True)
            buffered_dispatcher.dispatch_write(self, buf)

    def change_name(self, name):
        """Display this shell under name, made unique among all the shells,
        and make the log file follow the new name when options.log_dir is
        set"""
        # Reset first so that our current name is not seen as already taken
        self.display_name = None
        self.display_name = make_unique_name(name)
        if self.options.log_dir:
            # The log file
            filename = self.display_name.replace('/', '_')
            log_path = os.path.join(self.options.log_dir, filename)
            # The log file is only created by the first log() call, so there
            # may be nothing to rename yet
            if self.log_path and os.path.exists(self.log_path):
                # Rename the previous log
                os.rename(self.log_path, log_path)
            self.log_path = log_path

    def rename(self, string):
        """Rename this shell to what the remote shell prints for string, or
        back to the host name if string is empty.

        string is passed to a remote echo command whose output starts with
        a random marker sent in two halves; received_rename() applies the
        new name once handle_read() sees a line holding the marker.
        """
        if string:
            pending_rename1 = str(random.random())[2:] + ','
            pending_rename2 = str(random.random())[2:] + ':'
            self.pending_rename = pending_rename1 + pending_rename2
            self.dispatch_write('echo "%s""%s" %s\n' %
                                (pending_rename1, pending_rename2, string))
            self.change_state(STATE_EXPECTING_NEXT_LINE)
        else:
            self.change_name(self.hostname)

    def received_rename(self, line):
        """Apply the pending rename: the new name is what follows the marker
        and the space after it in line, without the last character (the
        newline)"""
        new_name = line[len(self.pending_rename) + 1:-1]
        self.change_name(new_name)
        self.pending_rename = None