Use richer assertions in test_mailbox (for better failure messages).
[python.git] / Lib / asyncore.py
blob6d6ca33224179069d493c8cb6591f3c2c5bc8709
1 # -*- Mode: Python -*-
2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
8 # All Rights Reserved
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
17 # permission.
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
28 """Basic infrastructure for asynchronous socket service clients and servers.
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. It's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
47 """
49 import select
50 import socket
51 import sys
52 import time
54 import os
55 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
56 ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, errorcode
# Keep socket_map if the name is already bound when this code runs; otherwise
# start with an empty dict.  This dict is the default channel registry used
# whenever a function or dispatcher in this module is given map=None.
try:
    socket_map
except NameError:
    socket_map = {}
63 def _strerror(err):
64 res = os.strerror(err)
65 if res == 'Unknown error':
66 res = errorcode[err]
67 return res
class ExitNow(Exception):
    """Exception listed in _reraised_exceptions, so handlers let it escape."""


# Exceptions that the dispatch helpers in this module re-raise unchanged
# instead of passing them to a channel's handle_error().
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
def read(obj):
    """Deliver a read event to *obj*.

    Calls obj.handle_read_event().  Exceptions in _reraised_exceptions
    propagate to the caller; any other exception is reported through
    obj.handle_error().
    """
    try:
        obj.handle_read_event()
    except _reraised_exceptions:
        raise
    except:
        # Deliberately catch everything else: one failing channel must not
        # take down the polling loop.
        obj.handle_error()
def write(obj):
    """Deliver a write event to *obj*.

    Calls obj.handle_write_event().  Exceptions in _reraised_exceptions
    propagate to the caller; any other exception is reported through
    obj.handle_error().
    """
    try:
        obj.handle_write_event()
    except _reraised_exceptions:
        raise
    except:
        # Deliberately catch everything else: one failing channel must not
        # take down the polling loop.
        obj.handle_error()
90 def _exception(obj):
91 try:
92 obj.handle_expt_event()
93 except _reraised_exceptions:
94 raise
95 except:
96 obj.handle_error()
def readwrite(obj, flags):
    """Dispatch the poll() event bits in *flags* to *obj*.

    The bits are examined in a fixed order: POLLIN -> handle_read_event(),
    POLLOUT -> handle_write_event(), POLLPRI -> handle_expt_event(), and
    any of POLLHUP/POLLERR/POLLNVAL -> handle_close().

    A socket.error whose errno is one of EBADF, ECONNRESET, ENOTCONN,
    ESHUTDOWN or ECONNABORTED closes the channel via handle_close(); any
    other socket.error, and any other exception not listed in
    _reraised_exceptions, goes to handle_error().
    """
    try:
        if flags & select.POLLIN:
            obj.handle_read_event()
        if flags & select.POLLOUT:
            obj.handle_write_event()
        if flags & select.POLLPRI:
            obj.handle_expt_event()
        if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
            obj.handle_close()
    except socket.error as e:
        if e.args[0] in (EBADF, ECONNRESET, ENOTCONN, ESHUTDOWN,
                         ECONNABORTED):
            # The peer or the descriptor is gone: close, don't report.
            obj.handle_close()
        else:
            obj.handle_error()
    except _reraised_exceptions:
        raise
    except:
        obj.handle_error()
def poll(timeout=0.0, map=None):
    """Run one select()-based polling pass over the channels in *map*.

    *map* maps file descriptors to channel objects and defaults to the
    module-level socket_map.  A channel is watched for reading if its
    readable() is true, for writing if its writable() is true, and for
    exceptional conditions if either is true.  If nothing is to be
    watched the call just sleeps for *timeout* seconds.  A select() call
    interrupted with EINTR ends the pass quietly; other select errors
    propagate.
    """
    if map is None:
        map = socket_map
    if not map:
        return

    readers = []
    writers = []
    excepts = []
    for fd, obj in map.items():
        wants_read = obj.readable()
        wants_write = obj.writable()
        if wants_read:
            readers.append(fd)
        if wants_write:
            writers.append(fd)
        if wants_read or wants_write:
            excepts.append(fd)

    if not (readers or writers or excepts):
        # Nothing to select on; still honour the requested delay.
        time.sleep(timeout)
        return

    try:
        readers, writers, excepts = select.select(
            readers, writers, excepts, timeout)
    except select.error as err:
        if err.args[0] != EINTR:
            raise
        return

    # A handler may have removed a channel from the map earlier in this
    # pass, so each descriptor is looked up again and skipped if missing.
    for fd in readers:
        obj = map.get(fd)
        if obj is not None:
            read(obj)

    for fd in writers:
        obj = map.get(fd)
        if obj is not None:
            write(obj)

    for fd in excepts:
        obj = map.get(fd)
        if obj is not None:
            _exception(obj)
def poll2(timeout=0.0, map=None):
    """Run one polling pass using select.poll() instead of select.select().

    *timeout* is given in seconds and converted to whole milliseconds for
    the poll object (None is passed through unchanged).  *map* defaults to
    the module-level socket_map.  Channels that are neither readable() nor
    writable() are not registered.  Each reported (fd, flags) pair is
    handed to readwrite().  A poll interrupted with EINTR yields no events;
    other select errors propagate.
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout*1000)
    pollster = select.poll()
    if map:
        for fd, obj in map.items():
            flags = 0
            if obj.readable():
                flags |= select.POLLIN | select.POLLPRI
            if obj.writable():
                flags |= select.POLLOUT
            if not flags:
                continue
            # Only check for exceptions if object was either readable
            # or writable.
            flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
            pollster.register(fd, flags)
        try:
            events = pollster.poll(timeout)
        except select.error as err:
            if err.args[0] != EINTR:
                raise
            events = []
        for fd, flags in events:
            obj = map.get(fd)
            if obj is not None:
                readwrite(obj, flags)

poll3 = poll2                           # Alias for backward compatibility
def loop(timeout=30.0, use_poll=False, map=None, count=None):
    """Poll repeatedly until *map* is empty or *count* passes have run.

    *timeout* is forwarded to every polling call.  poll2() is used when
    *use_poll* is true and the select module provides poll(); otherwise
    poll() is used.  *map* defaults to the module-level socket_map.  With
    count=None the loop runs for as long as the map is non-empty.
    """
    if map is None:
        map = socket_map

    if use_poll and hasattr(select, 'poll'):
        poll_fun = poll2
    else:
        poll_fun = poll

    if count is None:
        while map:
            poll_fun(timeout, map)
        return

    while map and count > 0:
        poll_fun(timeout, map)
        count -= 1
class dispatcher:
    """Thin wrapper around a non-blocking socket registered in a channel map.

    The polling functions call handle_read_event(), handle_write_event()
    and handle_expt_event(); those translate low-level readiness into the
    overridable hooks handle_accept(), handle_connect(), handle_read(),
    handle_write(), handle_expt() and handle_close().  Attribute lookups
    that fail on the dispatcher are forwarded to the wrapped socket.
    """

    debug = False
    connected = False
    accepting = False
    closing = False
    addr = None
    # log_info() silently drops messages whose type is listed here.
    ignore_log_types = frozenset(['warning'])

    def __init__(self, sock=None, map=None):
        """Wrap *sock* (if given) and register the channel in *map*.

        *map* defaults to the module-level socket_map.  A supplied socket
        is switched to non-blocking mode.  If getpeername() fails with
        ENOTCONN the channel is simply marked as not connected; any other
        socket error removes the channel from the map and is re-raised.
        """
        if map is None:
            self._map = socket_map
        else:
            self._map = map

        self._fileno = None

        if sock:
            # Set to nonblocking just to make sure for cases where we
            # get a socket from a blocking source.
            sock.setblocking(0)
            self.set_socket(sock, map)
            self.connected = True
            # The constructor no longer requires that the socket
            # passed be connected.
            try:
                self.addr = sock.getpeername()
            except socket.error as err:
                if err.args[0] == ENOTCONN:
                    # To handle the case where we got an unconnected
                    # socket.
                    self.connected = False
                else:
                    # The socket is broken in some unknown way, alert
                    # the user and remove it from the map (to prevent
                    # polling of broken sockets).
                    self.del_channel(map)
                    raise
        else:
            self.socket = None

    def __repr__(self):
        """Return '<module.class [listening|connected] [addr] at 0x...>'."""
        status = [self.__class__.__module__+"."+self.__class__.__name__]
        if self.accepting and self.addr:
            status.append('listening')
        elif self.connected:
            status.append('connected')
        if self.addr is not None:
            try:
                status.append('%s:%d' % self.addr)
            except TypeError:
                # addr is not a (host, port) pair.
                status.append(repr(self.addr))
        return '<%s at %#x>' % (' '.join(status), id(self))

    def add_channel(self, map=None):
        """Register this channel under its file descriptor in *map*."""
        #self.log_info('adding channel %s' % self)
        if map is None:
            map = self._map
        map[self._fileno] = self

    def del_channel(self, map=None):
        """Remove this channel from *map* (if present) and forget its fd."""
        fd = self._fileno
        if map is None:
            map = self._map
        if fd in map:
            #self.log_info('closing channel %d:%s' % (fd, self))
            del map[fd]
        self._fileno = None

    def create_socket(self, family, type):
        """Create a non-blocking socket and install it with set_socket()."""
        self.family_and_type = family, type
        sock = socket.socket(family, type)
        sock.setblocking(0)
        self.set_socket(sock)

    def set_socket(self, sock, map=None):
        """Adopt *sock* as the wrapped socket and register the channel."""
        self.socket = sock
##        self.__dict__['socket'] = sock
        self._fileno = sock.fileno()
        self.add_channel(map)

    def set_reuse_addr(self):
        """Turn on SO_REUSEADDR; socket errors are ignored (best effort)."""
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self.socket.getsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR) | 1
                )
        except socket.error:
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable(self):
        """Return True if the channel should be watched for reading."""
        return True

    def writable(self):
        """Return True if the channel should be watched for writing."""
        return True

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen(self, num):
        """Mark the channel as accepting and listen with backlog *num*.

        On os.name == 'nt' the backlog is capped at 5.
        """
        self.accepting = True
        if os.name == 'nt' and num > 5:
            num = 5
        return self.socket.listen(num)

    def bind(self, addr):
        """Remember *addr* and bind the wrapped socket to it."""
        self.addr = addr
        return self.socket.bind(addr)

    def connect(self, address):
        """Start a non-blocking connect to *address*.

        Returns quietly while the connect is still in progress, fires
        handle_connect_event() if it completed at once, and raises
        socket.error for any other result code.
        """
        self.connected = False
        err = self.socket.connect_ex(address)
        # XXX Should interpret Winsock return values
        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
            return
        if err in (0, EISCONN):
            self.addr = address
            self.handle_connect_event()
        else:
            raise socket.error(err, errorcode[err])

    def accept(self):
        """Accept a connection and return (conn, addr).

        Returns None (implicitly) when the accept would block; other
        socket errors propagate.
        """
        # XXX can return either an address pair or None
        try:
            conn, addr = self.socket.accept()
            return conn, addr
        except socket.error as why:
            if why.args[0] == EWOULDBLOCK:
                pass
            else:
                raise

    def send(self, data):
        """Send *data* and return the number of bytes written.

        Returns 0 if the send would block.  On ECONNRESET, ENOTCONN,
        ESHUTDOWN or ECONNABORTED handle_close() is called and 0 is
        returned; other socket errors propagate.
        """
        try:
            result = self.socket.send(data)
            return result
        except socket.error as why:
            if why.args[0] == EWOULDBLOCK:
                return 0
            elif why.args[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED):
                self.handle_close()
                return 0
            else:
                raise

    def recv(self, buffer_size):
        """Receive up to *buffer_size* bytes.

        An empty read, or one of ECONNRESET, ENOTCONN, ESHUTDOWN or
        ECONNABORTED, triggers handle_close() and returns ''; other socket
        errors propagate.
        """
        try:
            data = self.socket.recv(buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return ''
            else:
                return data
        except socket.error as why:
            # winsock sometimes throws ENOTCONN
            if why.args[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED]:
                self.handle_close()
                return ''
            else:
                raise

    def close(self):
        """Unregister the channel and close the socket.

        ENOTCONN and EBADF raised by the socket's close() are ignored.
        """
        self.connected = False
        self.accepting = False
        self.del_channel()
        try:
            self.socket.close()
        except socket.error as why:
            if why.args[0] not in (ENOTCONN, EBADF):
                raise

    # cheap inheritance, used to pass all other attribute
    # references to the underlying socket object.
    def __getattr__(self, attr):
        """Forward an otherwise-missing attribute to the wrapped socket.

        Raises AttributeError naming this class if neither the dispatcher
        nor its socket provides *attr*.
        """
        # __getattr__ only runs after normal lookup has failed.  If the
        # missing name is 'socket' itself (e.g. __init__ never ran), reading
        # self.socket below would re-enter this method and recurse forever.
        if attr == 'socket':
            raise AttributeError("%s instance has no attribute 'socket'"
                                 % self.__class__.__name__)
        try:
            return getattr(self.socket, attr)
        except AttributeError:
            raise AttributeError("%s instance has no attribute '%s'"
                                 % (self.__class__.__name__, attr))

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log(self, message):
        """Write 'log: <message>' to sys.stderr."""
        sys.stderr.write('log: %s\n' % str(message))

    def log_info(self, message, type='info'):
        """Print '<type>: <message>' unless *type* is in ignore_log_types."""
        if type not in self.ignore_log_types:
            print('%s: %s' % (type, message))

    def handle_read_event(self):
        """Translate a read-ready event into accept/connect/read hooks."""
        if self.accepting:
            # accepting sockets are never connected, they "spawn" new
            # sockets that are connected
            self.handle_accept()
        elif not self.connected:
            self.handle_connect_event()
            self.handle_read()
        else:
            self.handle_read()

    def handle_connect_event(self):
        """Mark the channel connected and call handle_connect()."""
        self.connected = True
        self.handle_connect()

    def handle_write_event(self):
        """Translate a write-ready event into connect/write hooks.

        For a not-yet-connected channel SO_ERROR is checked first; a
        non-zero value is raised as socket.error.
        """
        if self.accepting:
            # Accepting sockets shouldn't get a write event.
            # We will pretend it didn't happen.
            return

        if not self.connected:
            #check for errors
            err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                raise socket.error(err, _strerror(err))

            self.handle_connect_event()
        self.handle_write()

    def handle_expt_event(self):
        """Handle an exceptional condition: close on error, else handle_expt()."""
        # handle_expt_event() is called if there might be an error on the
        # socket, or if there is OOB data
        # check for the error condition first
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            # we can get here when select.select() says that there is an
            # exceptional condition on the socket
            # since there is an error, we'll go ahead and close the socket
            # like we would in a subclassed handle_read() that received no
            # data
            self.handle_close()
        else:
            self.handle_expt()

    def handle_error(self):
        """Log the exception currently being handled, then handle_close()."""
        nil, t, v, tbinfo = compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr(self)
        except:
            self_repr = '<__repr__(self) failed for object at %0x>' % id(self)

        self.log_info(
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
        self.handle_close()

    def handle_expt(self):
        """Default hook: log an unhandled priority event as a warning."""
        self.log_info('unhandled incoming priority event', 'warning')

    def handle_read(self):
        """Default hook: log an unhandled read event as a warning."""
        self.log_info('unhandled read event', 'warning')

    def handle_write(self):
        """Default hook: log an unhandled write event as a warning."""
        self.log_info('unhandled write event', 'warning')

    def handle_connect(self):
        """Default hook: log an unhandled connect event as a warning."""
        self.log_info('unhandled connect event', 'warning')

    def handle_accept(self):
        """Default hook: log an unhandled accept event as a warning."""
        self.log_info('unhandled accept event', 'warning')

    def handle_close(self):
        """Default hook: log an unhandled close event, then close()."""
        self.log_info('unhandled close event', 'warning')
        self.close()
496 # ---------------------------------------------------------------------------
497 # adds simple buffered output capability, useful for simple clients.
498 # [for more sophisticated usage use asynchat.async_chat]
499 # ---------------------------------------------------------------------------
class dispatcher_with_send(dispatcher):
    """dispatcher with a simple outgoing buffer.

    send() appends to out_buffer and tries to flush immediately; the
    remainder is pushed out, at most 512 items of the buffer per call,
    whenever the channel is reported writable.
    """

    def __init__(self, sock=None, map=None):
        dispatcher.__init__(self, sock, map)
        self.out_buffer = ''

    def initiate_send(self):
        """Send the first 512 items of out_buffer and drop what was sent."""
        sent = dispatcher.send(self, self.out_buffer[:512])
        self.out_buffer = self.out_buffer[sent:]

    def handle_write(self):
        self.initiate_send()

    def writable(self):
        """True while not connected, else the pending buffer length."""
        return (not self.connected) or len(self.out_buffer)

    def send(self, data):
        """Queue *data* for sending and attempt to flush right away."""
        if self.debug:
            self.log_info('sending %s' % repr(data))
        self.out_buffer = self.out_buffer + data
        self.initiate_send()
524 # ---------------------------------------------------------------------------
525 # used for debugging.
526 # ---------------------------------------------------------------------------
def compact_traceback():
    """Summarise the exception currently being handled.

    Returns ((filename, function, line), exc_type, exc_value, info), where
    the first tuple describes the innermost traceback entry and *info* is
    a space-separated string of '[filename|function|line]' items, one per
    traceback entry from outermost to innermost.  Line numbers are strings.

    Raises AssertionError if there is no traceback to inspect.
    """
    exc_type, exc_value, tb = sys.exc_info()
    if not tb: # Must have a traceback
        raise AssertionError("traceback does not exist")

    entries = []
    while tb:
        code = tb.tb_frame.f_code
        entries.append((code.co_filename, code.co_name, str(tb.tb_lineno)))
        tb = tb.tb_next

    # just to be safe
    del tb

    filename, function, line = entries[-1]
    info = ' '.join(['[%s|%s|%s]' % entry for entry in entries])
    return (filename, function, line), exc_type, exc_value, info
def close_all(map=None, ignore_all=False):
    """Close every channel in *map*, then empty the map.

    *map* defaults to the module-level socket_map.  An OSError whose
    errno is EBADF is always ignored.  Exceptions listed in
    _reraised_exceptions always propagate.  Every other exception
    propagates unless *ignore_all* is true.
    """
    if map is None:
        map = socket_map
    # Iterate over a snapshot: a channel's close() normally removes the
    # channel from the map, which must not disturb this loop.
    for channel in list(map.values()):
        try:
            channel.close()
        except OSError as err:
            # A distinct name keeps the exception from rebinding the loop
            # variable.
            if err.args[0] == EBADF:
                pass
            elif not ignore_all:
                raise
        except _reraised_exceptions:
            raise
        except:
            if not ignore_all:
                raise
    map.clear()
566 # Asynchronous File I/O:
568 # After a little research (reading man pages on various unixen, and
569 # digging through the linux kernel), I've determined that select()
570 # isn't meant for doing asynchronous file i/o.
571 # Heartening, though - reading linux/mm/filemap.c shows that linux
572 # supports asynchronous read-ahead. So _MOST_ of the time, the data
573 # will be sitting in memory for us already when we go to read it.
575 # What other OS's (besides NT) support async file i/o? [VMS?]
577 # Regardless, this is useful for pipes, and stdin/stdout...
if os.name == 'posix':
    import fcntl

    class file_wrapper:
        """Give a file descriptor the few socket methods asyncore relies on.

        The descriptor passed in is os.dup()'d, so the wrapper owns, and on
        close() releases, only its own duplicate.
        """
        # Here we override just enough to make a file
        # look like a socket for the purposes of asyncore.
        # The passed fd is automatically os.dup()'d

        def __init__(self, fd):
            self.fd = os.dup(fd)

        def recv(self, *args):
            """Read from the descriptor; arguments go to os.read()."""
            return os.read(self.fd, *args)

        def send(self, *args):
            """Write to the descriptor; arguments go to os.write()."""
            return os.write(self.fd, *args)

        def getsockopt(self, level, optname, buflen=None):
            """Answer the SO_ERROR query made by dispatcher event handlers.

            Only (SOL_SOCKET, SO_ERROR) without a buffer length is
            supported and always reports 0 (no pending error); any other
            request raises NotImplementedError.
            """
            if (level == socket.SOL_SOCKET and
                    optname == socket.SO_ERROR and
                    not buflen):
                return 0
            raise NotImplementedError("Only asyncore specific behaviour "
                                      "implemented.")

        read = recv
        write = send

        def close(self):
            """Close the duplicated descriptor; later calls do nothing."""
            if self.fd < 0:
                return
            # Invalidate first so a repeated close() can never hit a
            # descriptor number the OS has since handed to someone else.
            fd = self.fd
            self.fd = -1
            os.close(fd)

        def fileno(self):
            """Return the wrapped descriptor (-1 once closed)."""
            return self.fd
605 class file_dispatcher(dispatcher):
607 def __init__(self, fd, map=None):
608 dispatcher.__init__(self, None, map)
609 self.connected = True
610 try:
611 fd = fd.fileno()
612 except AttributeError:
613 pass
614 self.set_file(fd)
615 # set it to non-blocking mode
616 flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
617 flags = flags | os.O_NONBLOCK
618 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
620 def set_file(self, fd):
621 self.socket = file_wrapper(fd)
622 self._fileno = self.socket.fileno()
623 self.add_channel()