Set the same virtual terminal size to all remote shells, accounting for the
[gsh.git] / gsh / stdin.py
blob9325a9b3d2e50f7b498b531cc1d1b5543dcc4192
1 # This program is free software; you can redistribute it and/or modify
2 # it under the terms of the GNU General Public License as published by
3 # the Free Software Foundation; either version 2 of the License, or
4 # (at your option) any later version.
6 # This program is distributed in the hope that it will be useful,
7 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # GNU Library General Public License for more details.
11 # You should have received a copy of the GNU General Public License
12 # along with this program; if not, write to the Free Software
13 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
15 # See the COPYING file for license information.
17 # Copyright (c) 2006, 2007 Guillaume Chazarain <guichaz@yahoo.fr>
19 import asyncore
20 import atexit
21 import errno
22 import fcntl
23 import os
24 import readline # Just to say we want to use it with raw_input
25 import socket
26 import sys
27 from threading import Thread, Event, Lock
29 from gsh import remote_dispatcher
30 from gsh.console import console_output
32 # Handling of stdin is certainly the most complex part of gsh
# Status flags of stdin (fd 0) as they were when this module was imported.
# They are the baseline from which every later flag change is computed.
stdin_fcntl_flags = fcntl.fcntl(0, fcntl.F_GETFL, 0)


def set_stdin_blocking(blocking):
    """Switch stdin (fd 0) between blocking and non-blocking mode.

    O_NONBLOCK is meant to be set on stdin only while we read from it. The
    new flags are always derived from the flags captured at import time, not
    from the current state of the descriptor.
    """
    if not blocking:
        wanted = stdin_fcntl_flags | os.O_NONBLOCK
    else:
        wanted = stdin_fcntl_flags & ~os.O_NONBLOCK
    fcntl.fcntl(0, fcntl.F_SETFL, wanted)
class stdin_dispatcher(asyncore.file_dispatcher):
    """The stdin reader in the main thread => no fancy editing"""

    def __init__(self):
        asyncore.file_dispatcher.__init__(self, 0)
        self.is_readable = True
        # Put the original stdin flags back when the interpreter exits
        atexit.register(fcntl.fcntl, 0, fcntl.F_SETFL, stdin_fcntl_flags)
        set_stdin_blocking(True)

    def readable(self):
        """Tell the asyncore loop whether stdin should be watched.

        This is only true while the stdin thread is not in raw_input().
        """
        return self.is_readable

    def writable(self):
        """Nothing is ever written to stdin"""
        return False

    def handle_expt(self):
        # Emulate the select with poll as in: asyncore.loop(use_poll=True)
        self.handle_read()

    def handle_close(self):
        """Ctrl-D was received but the remote processes were not ready"""
        remote_dispatcher.dispatch_termination_to_all()

    def handle_read(self):
        """Drain stdin and forward what was read to the remote processes.

        stdin is non-blocking only for the duration of each recv() call.
        The loop stops on EAGAIN (nothing left to read) or on an empty read,
        in which case the dispatcher stops being readable.
        """
        while True:
            try:
                set_stdin_blocking(False)
                try:
                    chunk = self.recv(4096)
                finally:
                    set_stdin_blocking(True)
            except OSError:
                # sys.exc_info() keeps this handler valid on any Python version
                err = sys.exc_info()[1]
                if err.errno != errno.EAGAIN:
                    raise
                # End of available data
                break
            if not chunk:
                # Closed?
                self.is_readable = False
                break
            # Handle the just read data
            the_stdin_thread.input_buffer.add(chunk)
            process_input_buffer()
class input_buffer(object):
    """String buffer shared by the main thread and the stdin thread.

    Every access to the buffered text happens with the lock held.
    """

    def __init__(self):
        self.lock = Lock()
        self.buf = ''

    def add(self, data):
        """Append data to the buffered text"""
        self.lock.acquire()
        try:
            self.buf = self.buf + data
        finally:
            self.lock.release()

    def get(self):
        """Return the buffered text and leave the buffer empty"""
        self.lock.acquire()
        try:
            pending = self.buf
            if pending:
                self.buf = ''
        finally:
            self.lock.release()
        return pending
def process_input_buffer():
    """Flush the shared input buffer to every remote process.

    Must be called in the main thread. A remote whose write fails is
    reported on stderr and disconnected. For the others the data is logged
    when logging is active, and an enabled remote in STATE_IDLE is moved to
    STATE_EXPECTING_NEXT_LINE.
    """
    pending = the_stdin_thread.input_buffer.get()
    if pending:
        for remote in remote_dispatcher.all_instances():
            try:
                remote.dispatch_write(pending)
            except Exception:
                # sys.exc_info() keeps this handler valid on any Python version
                reason = sys.exc_info()[1]
                console_output('%s for %s, disconnecting\n' %
                               (reason, remote.display_name),
                               output=sys.stderr)
                remote.disconnect()
                continue
            if remote.is_logging():
                remote.log('<== ' + pending)
            if not remote.enabled:
                continue
            if remote.state is remote_dispatcher.STATE_IDLE:
                remote.change_state(remote_dispatcher.STATE_EXPECTING_NEXT_LINE)
138 # The stdin thread uses a synchronous (with ACK) socket to communicate with the
139 # main thread, which is most of the time waiting in the poll() loop.
140 # Socket character protocol:
141 # s: entering in raw_input, the main loop should not read stdin
142 # e: leaving raw_input, the main loop can read stdin
143 # q: Ctrl-D was pressed, exiting
144 # d: there is new data to send
145 # A: ACK, same reply for every message, communications are synchronous, so the
146 # stdin thread sends a character to the socket, the main thread processes it,
147 # sends the ACK, and the stdin thread can go on.
class socket_notification_reader(asyncore.dispatcher):
    """The socket reader in the main thread.

    It receives the one-character commands sent by the stdin thread, runs
    them and answers each one with an 'A' (ACK).
    """

    def __init__(self):
        asyncore.dispatcher.__init__(self, the_stdin_thread.socket_read)

    def _do(self, c):
        """Execute one command character.

        's' / 'e': the stdin thread enters / leaves raw_input(), so the
                   main loop must stop / may resume reading stdin
        'q':       Ctrl-D was pressed, terminate the remote processes
        'd':       new data is waiting in the input buffer

        Raises Exception for any other character.
        """
        if c in ('s', 'e'):
            the_stdin_dispatcher.is_readable = c == 'e'
            console_output('\r')
        elif c == 'q':
            remote_dispatcher.dispatch_termination_to_all()
        elif c == 'd':
            process_input_buffer()
        else:
            raise Exception('Unknown code: %s' % (c))

    def handle_read(self):
        """Handle all the available character commands in the socket"""
        while True:
            try:
                c = self.recv(1)
            except socket.error:
                # sys.exc_info() keeps this handler valid on any Python version
                why = sys.exc_info()[1]
                if why.args[0] != errno.EWOULDBLOCK:
                    # This used to be an assert, which is stripped under -O
                    # and would then hide real socket errors: re-raise them
                    raise
                # No more pending commands
                return
            else:
                self._do(c)
                # The ACK is written in blocking mode, then the socket goes
                # back to non-blocking for the next recv()
                self.socket.setblocking(True)
                self.send('A')
                self.socket.setblocking(False)

    def writable(self):
        """Our writes are blocking"""
        return False
# All the words that have been typed in gsh. Used by the completion mechanism.
history_words = set()


def complete(text, state):
    """On tab press, return the next possible completion.

    text is the prefix being completed and state the 0-based index of the
    wanted candidate. Candidates are the known history words that are strictly
    longer than text and start with it. None is returned once state is past
    the last candidate.
    """
    prefix_len = len(text)
    matches = [w for w in history_words
               if len(w) > prefix_len and w.startswith(text)]
    # Strict comparison: state == len(matches) is already out of range
    if state < len(matches):
        return matches[state]
    return None
def write_main_socket(c):
    """Synchronous write to the main socket, wait for ACK.

    The wait is unconditional: every code, 'e' included, is acknowledged
    before this function returns.
    """
    link = the_stdin_thread.socket_write
    link.send(c)
    # Block until the main thread has answered with its one-byte ACK
    link.recv(1)
class stdin_thread(Thread):
    """The stdin thread, used to call raw_input()"""
    def __init__(self):
        Thread.__init__(self, name='stdin thread')

    @staticmethod
    def activate(interactive):
        """Activate the thread at initialization time

        All the state is stored on the module-level the_stdin_thread
        instance. In interactive mode the socket pair used to talk to the
        main thread is created, the thread is started as a daemon and a
        socket_notification_reader is instantiated. Otherwise the thread is
        not started and ready_event is simply set.
        """
        the_stdin_thread.ready_event = Event()
        the_stdin_thread.input_buffer = input_buffer()
        if interactive:
            the_stdin_thread.interrupted_event = Event()
            # s1 is read by the main thread, s2 is written by this thread
            s1, s2 = socket.socketpair()
            the_stdin_thread.socket_read, the_stdin_thread.socket_write = s1, s2
            the_stdin_thread.wants_control_shell = False
            the_stdin_thread.setDaemon(True)
            the_stdin_thread.start()
            socket_notification_reader()
        else:
            the_stdin_thread.ready_event.set()

    def run(self):
        """Read commands with raw_input() and hand them to the main thread

        Each iteration waits for the remote processes to be ready, tells the
        main thread it enters raw_input() ('s'), reads one line and tells
        the main thread it left raw_input() ('e'). The line is then put in
        the input buffer and announced with 'd'. On EOF, 'q' is sent and
        the thread exits, unless wants_control_shell is set in which case
        interrupted_event is set and the loop goes on.
        """
        while True:
            # Wait until ready_event is set and all the processes are counted
            # as completed
            while True:
                self.ready_event.wait()
                nr, total = remote_dispatcher.count_completed_processes()
                if nr == total:
                    break
            # The remote processes are ready, the thread can call raw_input
            self.interrupted_event.clear()
            try:
                try:
                    write_main_socket('s')
                    readline.set_completer(complete)
                    readline.parse_and_bind('tab: complete')
                    readline.set_completer_delims(' \t\n')
                    cmd = raw_input('ready (%d)> ' % (nr))
                    if self.wants_control_shell:
                        # This seems to be needed if Ctrl-C is hit when some
                        # text is in the line buffer
                        raise EOFError
                    # Remember the typed words for completion, with a
                    # trailing space appended; one-character words are
                    # skipped and nothing is added once 10000 words are known
                    if len(history_words) < 10000:
                        words = [w + ' ' for w in cmd.split() if len(w) > 1]
                        history_words.update(words)
                finally:
                    # 'e' is not sent when wants_control_shell is set
                    if not self.wants_control_shell:
                        write_main_socket('e')
            except EOFError:
                if self.wants_control_shell:
                    self.ready_event.clear()
                    # Ok, we are no more in raw_input(), tell it to the
                    # main thread
                    self.interrupted_event.set()
                else:
                    # Ctrl-D: notify the main thread and end this thread
                    write_main_socket('q')
                    return
            else:
                # A full line was read: queue it and notify the main thread
                self.ready_event.clear()
                self.input_buffer.add(cmd + '\n')
                write_main_socket('d')
# Module-level singletons created at import time. The thread object is only
# constructed here; it is started later by stdin_thread.activate() in
# interactive mode. Constructing the dispatcher wraps fd 0 and registers the
# atexit hook that restores the stdin flags.
the_stdin_thread = stdin_thread()
the_stdin_dispatcher = stdin_dispatcher()