Merged revisions 85328 via svnmerge from
[python/dscho.git] / Lib / asyncore.py
blob7f06e43e7161e7cd6b09a43651d6fae9fcdfae13
1 # -*- Mode: Python -*-
2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
8 # All Rights Reserved
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
17 # permission.
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
28 """Basic infrastructure for asynchronous socket service clients and servers.
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. it's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
47 """
49 import select
50 import socket
51 import sys
52 import time
53 import os
54 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
55 ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, errorcode
57 try:
58 socket_map
59 except NameError:
60 socket_map = {}
62 def _strerror(err):
63 try:
64 return os.strerror(err)
65 except (ValueError, OverflowError, NameError):
66 if err in errorcode:
67 return errorcode[err]
68 return "Unknown error %s" %err
70 class ExitNow(Exception):
71 pass
73 _reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
75 def read(obj):
76 try:
77 obj.handle_read_event()
78 except _reraised_exceptions:
79 raise
80 except:
81 obj.handle_error()
83 def write(obj):
84 try:
85 obj.handle_write_event()
86 except _reraised_exceptions:
87 raise
88 except:
89 obj.handle_error()
91 def _exception(obj):
92 try:
93 obj.handle_expt_event()
94 except _reraised_exceptions:
95 raise
96 except:
97 obj.handle_error()
99 def readwrite(obj, flags):
100 try:
101 if flags & select.POLLIN:
102 obj.handle_read_event()
103 if flags & select.POLLOUT:
104 obj.handle_write_event()
105 if flags & select.POLLPRI:
106 obj.handle_expt_event()
107 if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
108 obj.handle_close()
109 except socket.error as e:
110 if e.args[0] not in (EBADF, ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED):
111 obj.handle_error()
112 else:
113 obj.handle_close()
114 except _reraised_exceptions:
115 raise
116 except:
117 obj.handle_error()
119 def poll(timeout=0.0, map=None):
120 if map is None:
121 map = socket_map
122 if map:
123 r = []; w = []; e = []
124 for fd, obj in list(map.items()):
125 is_r = obj.readable()
126 is_w = obj.writable()
127 if is_r:
128 r.append(fd)
129 if is_w:
130 w.append(fd)
131 if is_r or is_w:
132 e.append(fd)
133 if [] == r == w == e:
134 time.sleep(timeout)
135 return
137 try:
138 r, w, e = select.select(r, w, e, timeout)
139 except select.error as err:
140 if err.args[0] != EINTR:
141 raise
142 else:
143 return
145 for fd in r:
146 obj = map.get(fd)
147 if obj is None:
148 continue
149 read(obj)
151 for fd in w:
152 obj = map.get(fd)
153 if obj is None:
154 continue
155 write(obj)
157 for fd in e:
158 obj = map.get(fd)
159 if obj is None:
160 continue
161 _exception(obj)
163 def poll2(timeout=0.0, map=None):
164 # Use the poll() support added to the select module in Python 2.0
165 if map is None:
166 map = socket_map
167 if timeout is not None:
168 # timeout is in milliseconds
169 timeout = int(timeout*1000)
170 pollster = select.poll()
171 if map:
172 for fd, obj in list(map.items()):
173 flags = 0
174 if obj.readable():
175 flags |= select.POLLIN | select.POLLPRI
176 if obj.writable():
177 flags |= select.POLLOUT
178 if flags:
179 # Only check for exceptions if object was either readable
180 # or writable.
181 flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
182 pollster.register(fd, flags)
183 try:
184 r = pollster.poll(timeout)
185 except select.error as err:
186 if err.args[0] != EINTR:
187 raise
188 r = []
189 for fd, flags in r:
190 obj = map.get(fd)
191 if obj is None:
192 continue
193 readwrite(obj, flags)
195 poll3 = poll2 # Alias for backward compatibility
197 def loop(timeout=30.0, use_poll=False, map=None, count=None):
198 if map is None:
199 map = socket_map
201 if use_poll and hasattr(select, 'poll'):
202 poll_fun = poll2
203 else:
204 poll_fun = poll
206 if count is None:
207 while map:
208 poll_fun(timeout, map)
210 else:
211 while map and count > 0:
212 poll_fun(timeout, map)
213 count = count - 1
215 class dispatcher:
217 debug = False
218 connected = False
219 accepting = False
220 closing = False
221 addr = None
222 ignore_log_types = frozenset(['warning'])
224 def __init__(self, sock=None, map=None):
225 if map is None:
226 self._map = socket_map
227 else:
228 self._map = map
230 self._fileno = None
232 if sock:
233 # Set to nonblocking just to make sure for cases where we
234 # get a socket from a blocking source.
235 sock.setblocking(0)
236 self.set_socket(sock, map)
237 self.connected = True
238 # The constructor no longer requires that the socket
239 # passed be connected.
240 try:
241 self.addr = sock.getpeername()
242 except socket.error as err:
243 if err.args[0] == ENOTCONN:
244 # To handle the case where we got an unconnected
245 # socket.
246 self.connected = False
247 else:
248 # The socket is broken in some unknown way, alert
249 # the user and remove it from the map (to prevent
250 # polling of broken sockets).
251 self.del_channel(map)
252 raise
253 else:
254 self.socket = None
256 def __repr__(self):
257 status = [self.__class__.__module__+"."+self.__class__.__name__]
258 if self.accepting and self.addr:
259 status.append('listening')
260 elif self.connected:
261 status.append('connected')
262 if self.addr is not None:
263 try:
264 status.append('%s:%d' % self.addr)
265 except TypeError:
266 status.append(repr(self.addr))
267 return '<%s at %#x>' % (' '.join(status), id(self))
269 def add_channel(self, map=None):
270 #self.log_info('adding channel %s' % self)
271 if map is None:
272 map = self._map
273 map[self._fileno] = self
275 def del_channel(self, map=None):
276 fd = self._fileno
277 if map is None:
278 map = self._map
279 if fd in map:
280 #self.log_info('closing channel %d:%s' % (fd, self))
281 del map[fd]
282 self._fileno = None
284 def create_socket(self, family, type):
285 self.family_and_type = family, type
286 sock = socket.socket(family, type)
287 sock.setblocking(0)
288 self.set_socket(sock)
290 def set_socket(self, sock, map=None):
291 self.socket = sock
292 ## self.__dict__['socket'] = sock
293 self._fileno = sock.fileno()
294 self.add_channel(map)
296 def set_reuse_addr(self):
297 # try to re-use a server port if possible
298 try:
299 self.socket.setsockopt(
300 socket.SOL_SOCKET, socket.SO_REUSEADDR,
301 self.socket.getsockopt(socket.SOL_SOCKET,
302 socket.SO_REUSEADDR) | 1
304 except socket.error:
305 pass
307 # ==================================================
308 # predicates for select()
309 # these are used as filters for the lists of sockets
310 # to pass to select().
311 # ==================================================
313 def readable(self):
314 return True
316 def writable(self):
317 return True
319 # ==================================================
320 # socket object methods.
321 # ==================================================
323 def listen(self, num):
324 self.accepting = True
325 if os.name == 'nt' and num > 5:
326 num = 5
327 return self.socket.listen(num)
329 def bind(self, addr):
330 self.addr = addr
331 return self.socket.bind(addr)
333 def connect(self, address):
334 self.connected = False
335 err = self.socket.connect_ex(address)
336 if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
337 or err == EINVAL and os.name in ('nt', 'ce'):
338 return
339 if err in (0, EISCONN):
340 self.addr = address
341 self.handle_connect_event()
342 else:
343 raise socket.error(err, errorcode[err])
345 def accept(self):
346 # XXX can return either an address pair or None
347 try:
348 conn, addr = self.socket.accept()
349 return conn, addr
350 except socket.error as why:
351 if why.args[0] == EWOULDBLOCK:
352 pass
353 else:
354 raise
356 def send(self, data):
357 try:
358 result = self.socket.send(data)
359 return result
360 except socket.error as why:
361 if why.args[0] == EWOULDBLOCK:
362 return 0
363 elif why.args[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED):
364 self.handle_close()
365 return 0
366 else:
367 raise
369 def recv(self, buffer_size):
370 try:
371 data = self.socket.recv(buffer_size)
372 if not data:
373 # a closed connection is indicated by signaling
374 # a read condition, and having recv() return 0.
375 self.handle_close()
376 return b''
377 else:
378 return data
379 except socket.error as why:
380 # winsock sometimes throws ENOTCONN
381 if why.args[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED]:
382 self.handle_close()
383 return b''
384 else:
385 raise
387 def close(self):
388 self.connected = False
389 self.accepting = False
390 self.del_channel()
391 try:
392 self.socket.close()
393 except socket.error as why:
394 if why.args[0] not in (ENOTCONN, EBADF):
395 raise
397 # cheap inheritance, used to pass all other attribute
398 # references to the underlying socket object.
399 def __getattr__(self, attr):
400 try:
401 return getattr(self.socket, attr)
402 except AttributeError:
403 raise AttributeError("%s instance has no attribute '%s'"
404 %(self.__class__.__name__, attr))
406 # log and log_info may be overridden to provide more sophisticated
407 # logging and warning methods. In general, log is for 'hit' logging
408 # and 'log_info' is for informational, warning and error logging.
410 def log(self, message):
411 sys.stderr.write('log: %s\n' % str(message))
413 def log_info(self, message, type='info'):
414 if type not in self.ignore_log_types:
415 print('%s: %s' % (type, message))
417 def handle_read_event(self):
418 if self.accepting:
419 # accepting sockets are never connected, they "spawn" new
420 # sockets that are connected
421 self.handle_accept()
422 elif not self.connected:
423 self.handle_connect_event()
424 self.handle_read()
425 else:
426 self.handle_read()
428 def handle_connect_event(self):
429 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
430 if err != 0:
431 raise socket.error(err, _strerror(err))
432 self.handle_connect()
433 self.connected = True
435 def handle_write_event(self):
436 if self.accepting:
437 # Accepting sockets shouldn't get a write event.
438 # We will pretend it didn't happen.
439 return
441 if not self.connected:
442 #check for errors
443 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
444 if err != 0:
445 raise socket.error(err, _strerror(err))
447 self.handle_connect_event()
448 self.handle_write()
450 def handle_expt_event(self):
451 # handle_expt_event() is called if there might be an error on the
452 # socket, or if there is OOB data
453 # check for the error condition first
454 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
455 if err != 0:
456 # we can get here when select.select() says that there is an
457 # exceptional condition on the socket
458 # since there is an error, we'll go ahead and close the socket
459 # like we would in a subclassed handle_read() that received no
460 # data
461 self.handle_close()
462 else:
463 self.handle_expt()
465 def handle_error(self):
466 nil, t, v, tbinfo = compact_traceback()
468 # sometimes a user repr method will crash.
469 try:
470 self_repr = repr(self)
471 except:
472 self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
474 self.log_info(
475 'uncaptured python exception, closing channel %s (%s:%s %s)' % (
476 self_repr,
479 tbinfo
481 'error'
483 self.handle_close()
485 def handle_expt(self):
486 self.log_info('unhandled incoming priority event', 'warning')
488 def handle_read(self):
489 self.log_info('unhandled read event', 'warning')
491 def handle_write(self):
492 self.log_info('unhandled write event', 'warning')
494 def handle_connect(self):
495 self.log_info('unhandled connect event', 'warning')
497 def handle_accept(self):
498 self.log_info('unhandled accept event', 'warning')
500 def handle_close(self):
501 self.log_info('unhandled close event', 'warning')
502 self.close()
504 # ---------------------------------------------------------------------------
505 # adds simple buffered output capability, useful for simple clients.
506 # [for more sophisticated usage use asynchat.async_chat]
507 # ---------------------------------------------------------------------------
509 class dispatcher_with_send(dispatcher):
511 def __init__(self, sock=None, map=None):
512 dispatcher.__init__(self, sock, map)
513 self.out_buffer = b''
515 def initiate_send(self):
516 num_sent = 0
517 num_sent = dispatcher.send(self, self.out_buffer[:512])
518 self.out_buffer = self.out_buffer[num_sent:]
520 def handle_write(self):
521 self.initiate_send()
523 def writable(self):
524 return (not self.connected) or len(self.out_buffer)
526 def send(self, data):
527 if self.debug:
528 self.log_info('sending %s' % repr(data))
529 self.out_buffer = self.out_buffer + data
530 self.initiate_send()
532 # ---------------------------------------------------------------------------
533 # used for debugging.
534 # ---------------------------------------------------------------------------
536 def compact_traceback():
537 t, v, tb = sys.exc_info()
538 tbinfo = []
539 if not tb: # Must have a traceback
540 raise AssertionError("traceback does not exist")
541 while tb:
542 tbinfo.append((
543 tb.tb_frame.f_code.co_filename,
544 tb.tb_frame.f_code.co_name,
545 str(tb.tb_lineno)
547 tb = tb.tb_next
549 # just to be safe
550 del tb
552 file, function, line = tbinfo[-1]
553 info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
554 return (file, function, line), t, v, info
556 def close_all(map=None, ignore_all=False):
557 if map is None:
558 map = socket_map
559 for x in list(map.values()):
560 try:
561 x.close()
562 except OSError as x:
563 if x.args[0] == EBADF:
564 pass
565 elif not ignore_all:
566 raise
567 except _reraised_exceptions:
568 raise
569 except:
570 if not ignore_all:
571 raise
572 map.clear()
574 # Asynchronous File I/O:
576 # After a little research (reading man pages on various unixen, and
577 # digging through the linux kernel), I've determined that select()
578 # isn't meant for doing asynchronous file i/o.
579 # Heartening, though - reading linux/mm/filemap.c shows that linux
580 # supports asynchronous read-ahead. So _MOST_ of the time, the data
581 # will be sitting in memory for us already when we go to read it.
583 # What other OS's (besides NT) support async file i/o? [VMS?]
585 # Regardless, this is useful for pipes, and stdin/stdout...
587 if os.name == 'posix':
588 import fcntl
590 class file_wrapper:
591 # Here we override just enough to make a file
592 # look like a socket for the purposes of asyncore.
593 # The passed fd is automatically os.dup()'d
595 def __init__(self, fd):
596 self.fd = os.dup(fd)
598 def recv(self, *args):
599 return os.read(self.fd, *args)
601 def send(self, *args):
602 return os.write(self.fd, *args)
604 def getsockopt(self, level, optname, buflen=None):
605 if (level == socket.SOL_SOCKET and
606 optname == socket.SO_ERROR and
607 not buflen):
608 return 0
609 raise NotImplementedError("Only asyncore specific behaviour "
610 "implemented.")
612 read = recv
613 write = send
615 def close(self):
616 os.close(self.fd)
618 def fileno(self):
619 return self.fd
621 class file_dispatcher(dispatcher):
623 def __init__(self, fd, map=None):
624 dispatcher.__init__(self, None, map)
625 self.connected = True
626 try:
627 fd = fd.fileno()
628 except AttributeError:
629 pass
630 self.set_file(fd)
631 # set it to non-blocking mode
632 flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
633 flags = flags | os.O_NONBLOCK
634 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
636 def set_file(self, fd):
637 self.socket = file_wrapper(fd)
638 self._fileno = self.socket.fileno()
639 self.add_channel()