Added WatchedFileHandler (based on SF patch #1598415)
[python.git] / Lib / asyncore.py
blob886c84545b100a6b83fb628431e9cc5967064f06
1 # -*- Mode: Python -*-
2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
8 # All Rights Reserved
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
17 # permission.
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
28 """Basic infrastructure for asynchronous socket service clients and servers.
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. It's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
47 """
49 import select
50 import socket
51 import sys
52 import time
54 import os
55 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
56 ENOTCONN, ESHUTDOWN, EINTR, EISCONN, errorcode
# Module-level registry mapping file descriptors to channel objects.
# An already-defined socket_map is kept as-is (the name can only pre-exist
# when this code is executed again in the same namespace); otherwise an
# empty registry is created.
if 'socket_map' not in globals():
    socket_map = {}
class ExitNow(Exception):
    """Exception that the event helpers re-raise instead of reporting.

    read(), write(), _exception() and readwrite() let this exception
    propagate to their caller rather than passing it to handle_error().
    """
def read(obj):
    """Deliver a read event to *obj*.

    Calls ``obj.handle_read_event()``.  ExitNow, KeyboardInterrupt and
    SystemExit propagate to the caller; any other exception is reported
    through ``obj.handle_error()``.
    """
    try:
        obj.handle_read_event()
    except (KeyboardInterrupt, SystemExit):
        # Interpreter-level shutdown requests must not be swallowed and
        # turned into a mere channel error.
        raise
    except ExitNow:
        raise
    except:
        # Deliberately broad: whatever else the handler raised is the
        # channel's problem and is reported through its own hook.
        obj.handle_error()
def write(obj):
    """Deliver a write event to *obj*.

    Calls ``obj.handle_write_event()``.  ExitNow, KeyboardInterrupt and
    SystemExit propagate to the caller; any other exception is reported
    through ``obj.handle_error()``.
    """
    try:
        obj.handle_write_event()
    except (KeyboardInterrupt, SystemExit):
        # Interpreter-level shutdown requests must not be swallowed and
        # turned into a mere channel error.
        raise
    except ExitNow:
        raise
    except:
        # Deliberately broad: whatever else the handler raised is the
        # channel's problem and is reported through its own hook.
        obj.handle_error()
def _exception(obj):
    """Deliver an exceptional-condition event to *obj*.

    Calls ``obj.handle_expt_event()``.  ExitNow, KeyboardInterrupt and
    SystemExit propagate to the caller; any other exception is reported
    through ``obj.handle_error()``.
    """
    try:
        obj.handle_expt_event()
    except (KeyboardInterrupt, SystemExit):
        # Interpreter-level shutdown requests must not be swallowed and
        # turned into a mere channel error.
        raise
    except ExitNow:
        raise
    except:
        # Deliberately broad: whatever else the handler raised is the
        # channel's problem and is reported through its own hook.
        obj.handle_error()
def readwrite(obj, flags):
    """Dispatch the poll() event mask *flags* to *obj*.

    POLLIN/POLLPRI trigger ``handle_read_event()``, POLLOUT triggers
    ``handle_write_event()`` and POLLERR/POLLHUP/POLLNVAL trigger
    ``handle_expt_event()``, in that order.  ExitNow, KeyboardInterrupt
    and SystemExit propagate to the caller; any other exception is
    reported through ``obj.handle_error()`` and stops the dispatch.
    """
    try:
        if flags & (select.POLLIN | select.POLLPRI):
            obj.handle_read_event()
        if flags & select.POLLOUT:
            obj.handle_write_event()
        if flags & (select.POLLERR | select.POLLHUP | select.POLLNVAL):
            obj.handle_expt_event()
    except (KeyboardInterrupt, SystemExit):
        # Interpreter-level shutdown requests must not be swallowed and
        # turned into a mere channel error.
        raise
    except ExitNow:
        raise
    except:
        # Deliberately broad: whatever else a handler raised is the
        # channel's problem and is reported through its own hook.
        obj.handle_error()
def poll(timeout=0.0, map=None):
    """Run one select()-based round of event dispatch.

    timeout -- seconds handed to select.select(), or to time.sleep()
               when no channel currently wants reading or writing.
    map     -- dict of file descriptor -> channel; socket_map if None.

    Channels are asked readable()/writable(); a descriptor is watched for
    exceptional conditions only if it is watched for reading or writing.
    Ready descriptors are dispatched through read(), write() and
    _exception(), all reads first, then writes, then exceptions.  A
    select() interrupted by a signal (EINTR) ends the round silently; any
    other select error propagates.  An empty map is a no-op.
    """
    if map is None:
        map = socket_map
    if not map:
        return

    readers, writers, excepts = [], [], []
    # Iterate over a snapshot: readable()/writable() are user callbacks
    # and may add or remove channels while we are collecting descriptors.
    for fd, obj in list(map.items()):
        is_r = obj.readable()
        is_w = obj.writable()
        if is_r:
            readers.append(fd)
        if is_w:
            writers.append(fd)
        if is_r or is_w:
            excepts.append(fd)

    if not (readers or writers or excepts):
        # Nothing to wait on: just honour the timeout.
        time.sleep(timeout)
        return

    try:
        readers, writers, excepts = select.select(
            readers, writers, excepts, timeout)
    except select.error as err:
        # err.args[0] carries the errno on both Python 2 and Python 3.
        if err.args[0] != EINTR:
            raise
        return

    for ready, dispatch in ((readers, read),
                            (writers, write),
                            (excepts, _exception)):
        for fd in ready:
            # A handler run earlier in this round may have closed the channel.
            obj = map.get(fd)
            if obj is None:
                continue
            dispatch(obj)
def poll2(timeout=0.0, map=None):
    """Run one round of event dispatch using select.poll().

    timeout -- seconds, converted to whole milliseconds for poll();
               None is passed through unchanged.
    map     -- dict of file descriptor -> channel; socket_map if None.

    A channel is registered only if it is readable() or writable(); the
    error flags are then added as well.  Ready descriptors are handed to
    readwrite().  A poll() interrupted by a signal (EINTR) is treated as
    "nothing ready"; any other poll error propagates.  An empty map is a
    no-op.
    """
    if map is None:
        map = socket_map
    if timeout is not None:
        # poll() takes its timeout in milliseconds
        timeout = int(timeout * 1000)
    pollster = select.poll()
    if not map:
        return

    # Iterate over a snapshot: readable()/writable() are user callbacks
    # and may add or remove channels while we are registering descriptors.
    for fd, obj in list(map.items()):
        flags = 0
        if obj.readable():
            flags |= select.POLLIN | select.POLLPRI
        if obj.writable():
            flags |= select.POLLOUT
        if flags:
            # Only check for exceptions if object was either readable
            # or writable.
            flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
            pollster.register(fd, flags)

    try:
        ready = pollster.poll(timeout)
    except select.error as err:
        # err.args[0] carries the errno on both Python 2 and Python 3.
        if err.args[0] != EINTR:
            raise
        ready = []

    for fd, flags in ready:
        # A handler run earlier in this round may have closed the channel.
        obj = map.get(fd)
        if obj is None:
            continue
        readwrite(obj, flags)


poll3 = poll2                           # Alias for backward compatibility
def loop(timeout=30.0, use_poll=False, map=None, count=None):
    """Poll repeatedly until the channel map is empty.

    timeout  -- passed to every poll call.
    use_poll -- use poll2() when true and select.poll is available,
                otherwise poll().
    map      -- channel map to service; socket_map if None.
    count    -- maximum number of poll rounds; None means no limit.
    """
    if map is None:
        map = socket_map

    poller = poll
    if use_poll and hasattr(select, 'poll'):
        poller = poll2

    rounds_left = count
    while map and (rounds_left is None or rounds_left > 0):
        poller(timeout, map)
        if rounds_left is not None:
            rounds_left = rounds_left - 1
class dispatcher:
    """Event-driven wrapper around a non-blocking socket.

    An instance records itself in a channel map (``socket_map`` unless
    another dict is supplied) under its file descriptor.  The
    handle_*_event() methods translate readiness notifications into the
    overridable hooks handle_read(), handle_write(), handle_connect(),
    handle_accept(), handle_close() and handle_expt().  Attributes that
    are not found on the dispatcher are looked up on the wrapped socket.
    """

    debug = False
    connected = False
    accepting = False
    closing = False
    addr = None

    def __init__(self, sock=None, map=None):
        """Create a channel, optionally around the existing socket *sock*.

        map -- channel map to register in; socket_map if None.

        A supplied socket is registered, switched to non-blocking mode and
        marked as connected; its peer address is recorded when available.
        """
        if map is None:
            self._map = socket_map
        else:
            self._map = map

        if sock:
            self.set_socket(sock, map)
            # I think it should inherit this anyway
            self.socket.setblocking(0)
            self.connected = True
            # XXX Does the constructor require that the socket passed
            # be connected?
            try:
                self.addr = sock.getpeername()
            except socket.error:
                # The addr isn't crucial
                pass
        else:
            self.socket = None

    def __repr__(self):
        """Return '<module.class [listening|connected] [addr] at 0x...>'."""
        status = [self.__class__.__module__ + "." + self.__class__.__name__]
        if self.accepting and self.addr:
            status.append('listening')
        elif self.connected:
            status.append('connected')
        if self.addr is not None:
            try:
                status.append('%s:%d' % self.addr)
            except TypeError:
                # addr is not a (host, port) pair
                status.append(repr(self.addr))
        return '<%s at %#x>' % (' '.join(status), id(self))

    def add_channel(self, map=None):
        """Register this channel under its file descriptor in *map*."""
        #self.log_info('adding channel %s' % self)
        if map is None:
            map = self._map
        map[self._fileno] = self

    def del_channel(self, map=None):
        """Remove this channel from *map* (if present) and forget its fd."""
        fd = self._fileno
        if map is None:
            map = self._map
        if fd in map:
            #self.log_info('closing channel %d:%s' % (fd, self))
            del map[fd]
        self._fileno = None

    def create_socket(self, family, type):
        """Create a new non-blocking socket and register the channel."""
        self.family_and_type = family, type
        self.socket = socket.socket(family, type)
        self.socket.setblocking(0)
        self._fileno = self.socket.fileno()
        self.add_channel()

    def set_socket(self, sock, map=None):
        """Adopt the existing socket *sock* and register the channel."""
        self.socket = sock
##        self.__dict__['socket'] = sock
        self._fileno = sock.fileno()
        self.add_channel(map)

    def set_reuse_addr(self):
        """Turn on SO_REUSEADDR; socket errors are ignored (best effort)."""
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self.socket.getsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR) | 1
                )
        except socket.error:
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable(self):
        """Return true if the channel wants read events (default: always)."""
        return True

    def writable(self):
        """Return true if the channel wants write events (default: always)."""
        return True

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen(self, num):
        """Mark the channel as accepting and listen with backlog *num*.

        On 'nt' a backlog above 5 is clamped to 5 (presumably a Winsock
        limit -- the previous code dropped it all the way to 1).
        """
        self.accepting = True
        if os.name == 'nt' and num > 5:
            num = 5
        return self.socket.listen(num)

    def bind(self, addr):
        """Record *addr* and bind the socket to it."""
        self.addr = addr
        return self.socket.bind(addr)

    def connect(self, address):
        """Start a non-blocking connect to *address*.

        Returns silently while the connect is still in progress.  When it
        completes immediately the channel is marked connected and
        handle_connect() is called.  Any other result code raises
        socket.error(err, errorcode[err]).
        """
        self.connected = False
        err = self.socket.connect_ex(address)
        # XXX Should interpret Winsock return values
        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
            return
        if err in (0, EISCONN):
            self.addr = address
            self.connected = True
            self.handle_connect()
        else:
            raise socket.error(err, errorcode[err])

    def accept(self):
        """Accept a connection; return (conn, addr).

        Returns None when the accept would block; other socket errors
        propagate.
        """
        # XXX can return either an address pair or None
        try:
            conn, addr = self.socket.accept()
            return conn, addr
        except socket.error as why:
            if why.args[0] == EWOULDBLOCK:
                pass
            else:
                raise

    def send(self, data):
        """Send *data*; return the number sent, or 0 if it would block."""
        try:
            result = self.socket.send(data)
            return result
        except socket.error as why:
            if why.args[0] == EWOULDBLOCK:
                return 0
            else:
                raise

    def recv(self, buffer_size):
        """Receive up to *buffer_size* bytes.

        An empty read, or ECONNRESET/ENOTCONN/ESHUTDOWN, is treated as the
        peer closing: handle_close() is called and '' is returned.  Other
        socket errors propagate.
        """
        try:
            data = self.socket.recv(buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return ''
            else:
                return data
        except socket.error as why:
            # winsock sometimes throws ENOTCONN
            if why.args[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN):
                self.handle_close()
                return ''
            else:
                raise

    def close(self):
        """Unregister the channel and close the underlying socket."""
        self.del_channel()
        self.socket.close()

    # cheap inheritance, used to pass all other attribute
    # references to the underlying socket object.
    def __getattr__(self, attr):
        """Delegate unknown attributes to the wrapped socket."""
        if attr == 'socket':
            # 'socket' itself is missing (e.g. __init__ never ran); looking
            # it up through self.socket would recurse without end.
            raise AttributeError(
                "%s instance has no attribute 'socket'"
                % self.__class__.__name__)
        return getattr(self.socket, attr)

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log(self, message):
        """Write 'log: <message>' to sys.stderr."""
        sys.stderr.write('log: %s\n' % str(message))

    def log_info(self, message, type='info'):
        """Print '<type>: <message>'; 'info' is printed only if __debug__."""
        if __debug__ or type != 'info':
            # Single-argument call form: identical output whether print is
            # a statement (Python 2) or a function (Python 3).
            print('%s: %s' % (type, message))

    def handle_read_event(self):
        """Turn a read notification into accept/connect/read callbacks."""
        if self.accepting:
            # for an accepting socket, getting a read implies
            # that we are connected
            if not self.connected:
                self.connected = True
            self.handle_accept()
        elif not self.connected:
            self.handle_connect()
            self.connected = True
            self.handle_read()
        else:
            self.handle_read()

    def handle_write_event(self):
        """Turn a write notification into connect/write callbacks."""
        # getting a write implies that we are connected
        if not self.connected:
            self.handle_connect()
            self.connected = True
        self.handle_write()

    def handle_expt_event(self):
        """Forward an exceptional-condition notification to handle_expt()."""
        self.handle_expt()

    def handle_error(self):
        """Log the exception currently being handled and close the channel."""
        nil, t, v, tbinfo = compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr(self)
        except:
            self_repr = '<__repr__(self) failed for object at %0x>' % id(self)

        self.log_info(
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
        self.close()

    def handle_expt(self):
        """Default hook: log a warning."""
        self.log_info('unhandled exception', 'warning')

    def handle_read(self):
        """Default hook: log a warning."""
        self.log_info('unhandled read event', 'warning')

    def handle_write(self):
        """Default hook: log a warning."""
        self.log_info('unhandled write event', 'warning')

    def handle_connect(self):
        """Default hook: log a warning."""
        self.log_info('unhandled connect event', 'warning')

    def handle_accept(self):
        """Default hook: log a warning."""
        self.log_info('unhandled accept event', 'warning')

    def handle_close(self):
        """Default hook: log a warning and close the channel."""
        self.log_info('unhandled close event', 'warning')
        self.close()
441 # ---------------------------------------------------------------------------
442 # adds simple buffered output capability, useful for simple clients.
443 # [for more sophisticated usage use asynchat.async_chat]
444 # ---------------------------------------------------------------------------
class dispatcher_with_send(dispatcher):
    """dispatcher variant with a simple outgoing buffer.

    send() appends to ``out_buffer`` and tries to flush it; the rest is
    flushed from handle_write(), at most 512 items per attempt.
    """

    def __init__(self, sock=None, map=None):
        """Initialise the base dispatcher and start with an empty buffer."""
        dispatcher.__init__(self, sock, map)
        self.out_buffer = ''

    def initiate_send(self):
        """Try to send the head of the buffer and drop what was sent."""
        sent = dispatcher.send(self, self.out_buffer[:512])
        self.out_buffer = self.out_buffer[sent:]

    def handle_write(self):
        """Write event: flush more of the buffer."""
        self.initiate_send()

    def writable(self):
        """True while not yet connected, else the pending buffer length."""
        if not self.connected:
            return True
        return len(self.out_buffer)

    def send(self, data):
        """Queue *data* for sending and attempt to send immediately."""
        if self.debug:
            self.log_info('sending %s' % repr(data))
        self.out_buffer = self.out_buffer + data
        self.initiate_send()
469 # ---------------------------------------------------------------------------
470 # used for debugging.
471 # ---------------------------------------------------------------------------
def compact_traceback():
    """Summarise the exception currently being handled.

    Returns ``((filename, function, line), type, value, info)`` where the
    first tuple describes the innermost traceback frame (line number as a
    string) and *info* is a space-separated list of ``[file|function|line]``
    entries, outermost frame first.

    Raises AssertionError when no exception is being handled.
    """
    exc_type, exc_value, tb = sys.exc_info()
    if not tb:
        # Explicit raise rather than ``assert`` so the check survives -O.
        raise AssertionError("traceback does not exist")

    frames = []
    while tb:
        code = tb.tb_frame.f_code
        frames.append((code.co_filename, code.co_name, str(tb.tb_lineno)))
        tb = tb.tb_next

    # just to be safe
    del tb

    filename, function, line = frames[-1]
    info = ' '.join(['[%s|%s|%s]' % frame for frame in frames])
    return (filename, function, line), exc_type, exc_value, info
def close_all(map=None):
    """Close the socket of every channel in *map*, then empty the map.

    map -- channel map to shut down; socket_map if None.
    """
    channels = socket_map if map is None else map
    for channel in channels.values():
        channel.socket.close()
    channels.clear()
499 # Asynchronous File I/O:
501 # After a little research (reading man pages on various unixen, and
502 # digging through the linux kernel), I've determined that select()
503 # isn't meant for doing asynchronous file i/o.
504 # Heartening, though - reading linux/mm/filemap.c shows that linux
505 # supports asynchronous read-ahead. So _MOST_ of the time, the data
506 # will be sitting in memory for us already when we go to read it.
508 # What other OS's (besides NT) support async file i/o? [VMS?]
510 # Regardless, this is useful for pipes, and stdin/stdout...
if os.name == 'posix':
    import fcntl

    class file_wrapper:
        """Give a raw file descriptor the few socket methods asyncore uses."""

        def __init__(self, fd):
            """Remember the file descriptor *fd*."""
            self.fd = fd

        def fileno(self):
            """Return the wrapped file descriptor."""
            return self.fd

        def close(self):
            """Close the file descriptor with os.close()."""
            os.close(self.fd)

        def recv(self, *args):
            """Read from the descriptor; arguments go to os.read()."""
            return os.read(self.fd, *args)

        def send(self, *args):
            """Write to the descriptor; arguments go to os.write()."""
            return os.write(self.fd, *args)

        # file-style spellings of the same two operations
        read = recv
        write = send

    class file_dispatcher(dispatcher):
        """dispatcher whose 'socket' is a file_wrapper around a descriptor."""

        def __init__(self, fd, map=None):
            """Register *fd* as a connected channel and make it non-blocking."""
            dispatcher.__init__(self, None, map)
            self.connected = True
            self.set_file(fd)
            # set it to non-blocking mode
            current = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
            fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)

        def set_file(self, fd):
            """Wrap *fd* in a file_wrapper and add the channel to the map."""
            self._fileno = fd
            self.socket = file_wrapper(fd)
            self.add_channel()