add release date
[python/dscho.git] / Lib / asyncore.py
blob3e3907eee860b30639d9bd51bb87c87bef84a2bc
1 # -*- Mode: Python -*-
2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
8 # All Rights Reserved
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
17 # permission.
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
28 """Basic infrastructure for asynchronous socket service clients and servers.
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. it's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
47 """
49 import select
50 import socket
51 import sys
52 import time
53 import warnings
55 import os
56 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
57 ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, errorcode
59 try:
60 socket_map
61 except NameError:
62 socket_map = {}
64 def _strerror(err):
65 try:
66 return os.strerror(err)
67 except (ValueError, OverflowError, NameError):
68 if err in errorcode:
69 return errorcode[err]
70 return "Unknown error %s" %err
72 class ExitNow(Exception):
73 pass
75 _reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
77 def read(obj):
78 try:
79 obj.handle_read_event()
80 except _reraised_exceptions:
81 raise
82 except:
83 obj.handle_error()
85 def write(obj):
86 try:
87 obj.handle_write_event()
88 except _reraised_exceptions:
89 raise
90 except:
91 obj.handle_error()
93 def _exception(obj):
94 try:
95 obj.handle_expt_event()
96 except _reraised_exceptions:
97 raise
98 except:
99 obj.handle_error()
101 def readwrite(obj, flags):
102 try:
103 if flags & select.POLLIN:
104 obj.handle_read_event()
105 if flags & select.POLLOUT:
106 obj.handle_write_event()
107 if flags & select.POLLPRI:
108 obj.handle_expt_event()
109 if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
110 obj.handle_close()
111 except socket.error, e:
112 if e.args[0] not in (EBADF, ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED):
113 obj.handle_error()
114 else:
115 obj.handle_close()
116 except _reraised_exceptions:
117 raise
118 except:
119 obj.handle_error()
121 def poll(timeout=0.0, map=None):
122 if map is None:
123 map = socket_map
124 if map:
125 r = []; w = []; e = []
126 for fd, obj in map.items():
127 is_r = obj.readable()
128 is_w = obj.writable()
129 if is_r:
130 r.append(fd)
131 if is_w:
132 w.append(fd)
133 if is_r or is_w:
134 e.append(fd)
135 if [] == r == w == e:
136 time.sleep(timeout)
137 return
139 try:
140 r, w, e = select.select(r, w, e, timeout)
141 except select.error, err:
142 if err.args[0] != EINTR:
143 raise
144 else:
145 return
147 for fd in r:
148 obj = map.get(fd)
149 if obj is None:
150 continue
151 read(obj)
153 for fd in w:
154 obj = map.get(fd)
155 if obj is None:
156 continue
157 write(obj)
159 for fd in e:
160 obj = map.get(fd)
161 if obj is None:
162 continue
163 _exception(obj)
165 def poll2(timeout=0.0, map=None):
166 # Use the poll() support added to the select module in Python 2.0
167 if map is None:
168 map = socket_map
169 if timeout is not None:
170 # timeout is in milliseconds
171 timeout = int(timeout*1000)
172 pollster = select.poll()
173 if map:
174 for fd, obj in map.items():
175 flags = 0
176 if obj.readable():
177 flags |= select.POLLIN | select.POLLPRI
178 if obj.writable():
179 flags |= select.POLLOUT
180 if flags:
181 # Only check for exceptions if object was either readable
182 # or writable.
183 flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
184 pollster.register(fd, flags)
185 try:
186 r = pollster.poll(timeout)
187 except select.error, err:
188 if err.args[0] != EINTR:
189 raise
190 r = []
191 for fd, flags in r:
192 obj = map.get(fd)
193 if obj is None:
194 continue
195 readwrite(obj, flags)
197 poll3 = poll2 # Alias for backward compatibility
199 def loop(timeout=30.0, use_poll=False, map=None, count=None):
200 if map is None:
201 map = socket_map
203 if use_poll and hasattr(select, 'poll'):
204 poll_fun = poll2
205 else:
206 poll_fun = poll
208 if count is None:
209 while map:
210 poll_fun(timeout, map)
212 else:
213 while map and count > 0:
214 poll_fun(timeout, map)
215 count = count - 1
217 class dispatcher:
219 debug = False
220 connected = False
221 accepting = False
222 closing = False
223 addr = None
224 ignore_log_types = frozenset(['warning'])
226 def __init__(self, sock=None, map=None):
227 if map is None:
228 self._map = socket_map
229 else:
230 self._map = map
232 self._fileno = None
234 if sock:
235 # Set to nonblocking just to make sure for cases where we
236 # get a socket from a blocking source.
237 sock.setblocking(0)
238 self.set_socket(sock, map)
239 self.connected = True
240 # The constructor no longer requires that the socket
241 # passed be connected.
242 try:
243 self.addr = sock.getpeername()
244 except socket.error, err:
245 if err.args[0] == ENOTCONN:
246 # To handle the case where we got an unconnected
247 # socket.
248 self.connected = False
249 else:
250 # The socket is broken in some unknown way, alert
251 # the user and remove it from the map (to prevent
252 # polling of broken sockets).
253 self.del_channel(map)
254 raise
255 else:
256 self.socket = None
258 def __repr__(self):
259 status = [self.__class__.__module__+"."+self.__class__.__name__]
260 if self.accepting and self.addr:
261 status.append('listening')
262 elif self.connected:
263 status.append('connected')
264 if self.addr is not None:
265 try:
266 status.append('%s:%d' % self.addr)
267 except TypeError:
268 status.append(repr(self.addr))
269 return '<%s at %#x>' % (' '.join(status), id(self))
271 __str__ = __repr__
273 def add_channel(self, map=None):
274 #self.log_info('adding channel %s' % self)
275 if map is None:
276 map = self._map
277 map[self._fileno] = self
279 def del_channel(self, map=None):
280 fd = self._fileno
281 if map is None:
282 map = self._map
283 if fd in map:
284 #self.log_info('closing channel %d:%s' % (fd, self))
285 del map[fd]
286 self._fileno = None
288 def create_socket(self, family, type):
289 self.family_and_type = family, type
290 sock = socket.socket(family, type)
291 sock.setblocking(0)
292 self.set_socket(sock)
294 def set_socket(self, sock, map=None):
295 self.socket = sock
296 ## self.__dict__['socket'] = sock
297 self._fileno = sock.fileno()
298 self.add_channel(map)
300 def set_reuse_addr(self):
301 # try to re-use a server port if possible
302 try:
303 self.socket.setsockopt(
304 socket.SOL_SOCKET, socket.SO_REUSEADDR,
305 self.socket.getsockopt(socket.SOL_SOCKET,
306 socket.SO_REUSEADDR) | 1
308 except socket.error:
309 pass
311 # ==================================================
312 # predicates for select()
313 # these are used as filters for the lists of sockets
314 # to pass to select().
315 # ==================================================
317 def readable(self):
318 return True
320 def writable(self):
321 return True
323 # ==================================================
324 # socket object methods.
325 # ==================================================
327 def listen(self, num):
328 self.accepting = True
329 if os.name == 'nt' and num > 5:
330 num = 5
331 return self.socket.listen(num)
333 def bind(self, addr):
334 self.addr = addr
335 return self.socket.bind(addr)
337 def connect(self, address):
338 self.connected = False
339 err = self.socket.connect_ex(address)
340 # XXX Should interpret Winsock return values
341 if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
342 return
343 if err in (0, EISCONN):
344 self.addr = address
345 self.handle_connect_event()
346 else:
347 raise socket.error(err, errorcode[err])
349 def accept(self):
350 # XXX can return either an address pair or None
351 try:
352 conn, addr = self.socket.accept()
353 return conn, addr
354 except socket.error, why:
355 if why.args[0] == EWOULDBLOCK:
356 pass
357 else:
358 raise
360 def send(self, data):
361 try:
362 result = self.socket.send(data)
363 return result
364 except socket.error, why:
365 if why.args[0] == EWOULDBLOCK:
366 return 0
367 elif why.args[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED):
368 self.handle_close()
369 return 0
370 else:
371 raise
373 def recv(self, buffer_size):
374 try:
375 data = self.socket.recv(buffer_size)
376 if not data:
377 # a closed connection is indicated by signaling
378 # a read condition, and having recv() return 0.
379 self.handle_close()
380 return ''
381 else:
382 return data
383 except socket.error, why:
384 # winsock sometimes throws ENOTCONN
385 if why.args[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED]:
386 self.handle_close()
387 return ''
388 else:
389 raise
391 def close(self):
392 self.connected = False
393 self.accepting = False
394 self.del_channel()
395 try:
396 self.socket.close()
397 except socket.error, why:
398 if why.args[0] not in (ENOTCONN, EBADF):
399 raise
401 # cheap inheritance, used to pass all other attribute
402 # references to the underlying socket object.
403 def __getattr__(self, attr):
404 try:
405 retattr = getattr(self.socket, attr)
406 except AttributeError:
407 raise AttributeError("%s instance has no attribute '%s'"
408 %(self.__class__.__name__, attr))
409 else:
410 msg = "%(me)s.%(attr)s is deprecated. Use %(me)s.socket.%(attr)s " \
411 "instead." % {'me': self.__class__.__name__, 'attr':attr}
412 warnings.warn(msg, DeprecationWarning, stacklevel=2)
413 return retattr
415 # log and log_info may be overridden to provide more sophisticated
416 # logging and warning methods. In general, log is for 'hit' logging
417 # and 'log_info' is for informational, warning and error logging.
419 def log(self, message):
420 sys.stderr.write('log: %s\n' % str(message))
422 def log_info(self, message, type='info'):
423 if type not in self.ignore_log_types:
424 print '%s: %s' % (type, message)
426 def handle_read_event(self):
427 if self.accepting:
428 # accepting sockets are never connected, they "spawn" new
429 # sockets that are connected
430 self.handle_accept()
431 elif not self.connected:
432 self.handle_connect_event()
433 self.handle_read()
434 else:
435 self.handle_read()
437 def handle_connect_event(self):
438 self.connected = True
439 self.handle_connect()
441 def handle_write_event(self):
442 if self.accepting:
443 # Accepting sockets shouldn't get a write event.
444 # We will pretend it didn't happen.
445 return
447 if not self.connected:
448 #check for errors
449 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
450 if err != 0:
451 raise socket.error(err, _strerror(err))
453 self.handle_connect_event()
454 self.handle_write()
456 def handle_expt_event(self):
457 # handle_expt_event() is called if there might be an error on the
458 # socket, or if there is OOB data
459 # check for the error condition first
460 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
461 if err != 0:
462 # we can get here when select.select() says that there is an
463 # exceptional condition on the socket
464 # since there is an error, we'll go ahead and close the socket
465 # like we would in a subclassed handle_read() that received no
466 # data
467 self.handle_close()
468 else:
469 self.handle_expt()
471 def handle_error(self):
472 nil, t, v, tbinfo = compact_traceback()
474 # sometimes a user repr method will crash.
475 try:
476 self_repr = repr(self)
477 except:
478 self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
480 self.log_info(
481 'uncaptured python exception, closing channel %s (%s:%s %s)' % (
482 self_repr,
485 tbinfo
487 'error'
489 self.handle_close()
491 def handle_expt(self):
492 self.log_info('unhandled incoming priority event', 'warning')
494 def handle_read(self):
495 self.log_info('unhandled read event', 'warning')
497 def handle_write(self):
498 self.log_info('unhandled write event', 'warning')
500 def handle_connect(self):
501 self.log_info('unhandled connect event', 'warning')
503 def handle_accept(self):
504 self.log_info('unhandled accept event', 'warning')
506 def handle_close(self):
507 self.log_info('unhandled close event', 'warning')
508 self.close()
510 # ---------------------------------------------------------------------------
511 # adds simple buffered output capability, useful for simple clients.
512 # [for more sophisticated usage use asynchat.async_chat]
513 # ---------------------------------------------------------------------------
515 class dispatcher_with_send(dispatcher):
517 def __init__(self, sock=None, map=None):
518 dispatcher.__init__(self, sock, map)
519 self.out_buffer = ''
521 def initiate_send(self):
522 num_sent = 0
523 num_sent = dispatcher.send(self, self.out_buffer[:512])
524 self.out_buffer = self.out_buffer[num_sent:]
526 def handle_write(self):
527 self.initiate_send()
529 def writable(self):
530 return (not self.connected) or len(self.out_buffer)
532 def send(self, data):
533 if self.debug:
534 self.log_info('sending %s' % repr(data))
535 self.out_buffer = self.out_buffer + data
536 self.initiate_send()
538 # ---------------------------------------------------------------------------
539 # used for debugging.
540 # ---------------------------------------------------------------------------
542 def compact_traceback():
543 t, v, tb = sys.exc_info()
544 tbinfo = []
545 if not tb: # Must have a traceback
546 raise AssertionError("traceback does not exist")
547 while tb:
548 tbinfo.append((
549 tb.tb_frame.f_code.co_filename,
550 tb.tb_frame.f_code.co_name,
551 str(tb.tb_lineno)
553 tb = tb.tb_next
555 # just to be safe
556 del tb
558 file, function, line = tbinfo[-1]
559 info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
560 return (file, function, line), t, v, info
562 def close_all(map=None, ignore_all=False):
563 if map is None:
564 map = socket_map
565 for x in map.values():
566 try:
567 x.close()
568 except OSError, x:
569 if x.args[0] == EBADF:
570 pass
571 elif not ignore_all:
572 raise
573 except _reraised_exceptions:
574 raise
575 except:
576 if not ignore_all:
577 raise
578 map.clear()
580 # Asynchronous File I/O:
582 # After a little research (reading man pages on various unixen, and
583 # digging through the linux kernel), I've determined that select()
584 # isn't meant for doing asynchronous file i/o.
585 # Heartening, though - reading linux/mm/filemap.c shows that linux
586 # supports asynchronous read-ahead. So _MOST_ of the time, the data
587 # will be sitting in memory for us already when we go to read it.
589 # What other OS's (besides NT) support async file i/o? [VMS?]
591 # Regardless, this is useful for pipes, and stdin/stdout...
593 if os.name == 'posix':
594 import fcntl
596 class file_wrapper:
597 # Here we override just enough to make a file
598 # look like a socket for the purposes of asyncore.
599 # The passed fd is automatically os.dup()'d
601 def __init__(self, fd):
602 self.fd = os.dup(fd)
604 def recv(self, *args):
605 return os.read(self.fd, *args)
607 def send(self, *args):
608 return os.write(self.fd, *args)
610 read = recv
611 write = send
613 def close(self):
614 os.close(self.fd)
616 def fileno(self):
617 return self.fd
619 class file_dispatcher(dispatcher):
621 def __init__(self, fd, map=None):
622 dispatcher.__init__(self, None, map)
623 self.connected = True
624 try:
625 fd = fd.fileno()
626 except AttributeError:
627 pass
628 self.set_file(fd)
629 # set it to non-blocking mode
630 flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
631 flags = flags | os.O_NONBLOCK
632 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
634 def set_file(self, fd):
635 self.socket = file_wrapper(fd)
636 self._fileno = self.socket.fileno()
637 self.add_channel()