2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
28 """Basic infrastructure for asynchronous socket service clients and servers.
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. it's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
55 from errno
import EALREADY
, EINPROGRESS
, EWOULDBLOCK
, ECONNRESET
, \
56 ENOTCONN
, ESHUTDOWN
, EINTR
, EISCONN
, EBADF
, ECONNABORTED
, errorcode
64 res
= os
.strerror(err
)
65 if res
== 'Unknown error':
69 class ExitNow(Exception):
74 obj
.handle_read_event()
75 except (ExitNow
, KeyboardInterrupt, SystemExit):
82 obj
.handle_write_event()
83 except (ExitNow
, KeyboardInterrupt, SystemExit):
90 obj
.handle_expt_event()
91 except (ExitNow
, KeyboardInterrupt, SystemExit):
96 def readwrite(obj
, flags
):
98 if flags
& (select
.POLLIN | select
.POLLPRI
):
99 obj
.handle_read_event()
100 if flags
& select
.POLLOUT
:
101 obj
.handle_write_event()
102 if flags
& (select
.POLLERR | select
.POLLNVAL
):
103 obj
.handle_expt_event()
104 if flags
& select
.POLLHUP
:
106 except (ExitNow
, KeyboardInterrupt, SystemExit):
111 def poll(timeout
=0.0, map=None):
115 r
= []; w
= []; e
= []
116 for fd
, obj
in map.items():
117 is_r
= obj
.readable()
118 is_w
= obj
.writable()
125 if [] == r
== w
== e
:
130 r
, w
, e
= select
.select(r
, w
, e
, timeout
)
131 except select
.error
, err
:
132 if err
.args
[0] != EINTR
:
155 def poll2(timeout
=0.0, map=None):
156 # Use the poll() support added to the select module in Python 2.0
159 if timeout
is not None:
160 # timeout is in milliseconds
161 timeout
= int(timeout
*1000)
162 pollster
= select
.poll()
164 for fd
, obj
in map.items():
167 flags |
= select
.POLLIN | select
.POLLPRI
169 flags |
= select
.POLLOUT
171 # Only check for exceptions if object was either readable
173 flags |
= select
.POLLERR | select
.POLLHUP | select
.POLLNVAL
174 pollster
.register(fd
, flags
)
176 r
= pollster
.poll(timeout
)
177 except select
.error
, err
:
178 if err
.args
[0] != EINTR
:
185 readwrite(obj
, flags
)
187 poll3
= poll2
# Alias for backward compatibility
189 def loop(timeout
=30.0, use_poll
=False, map=None, count
=None):
193 if use_poll
and hasattr(select
, 'poll'):
200 poll_fun(timeout
, map)
203 while map and count
> 0:
204 poll_fun(timeout
, map)
215 def __init__(self
, sock
=None, map=None):
217 self
._map
= socket_map
224 # Set to nonblocking just to make sure for cases where we
225 # get a socket from a blocking source.
227 self
.set_socket(sock
, map)
228 self
.connected
= True
229 # The constructor no longer requires that the socket
230 # passed be connected.
232 self
.addr
= sock
.getpeername()
233 except socket
.error
, err
:
234 if err
.args
[0] == ENOTCONN
:
235 # To handle the case where we got an unconnected
237 self
.connected
= False
239 # The socket is broken in some unknown way, alert
240 # the user and remove it from the map (to prevent
241 # polling of broken sockets).
242 self
.del_channel(map)
248 status
= [self
.__class
__.__module
__+"."+self
.__class
__.__name
__]
249 if self
.accepting
and self
.addr
:
250 status
.append('listening')
252 status
.append('connected')
253 if self
.addr
is not None:
255 status
.append('%s:%d' % self
.addr
)
257 status
.append(repr(self
.addr
))
258 return '<%s at %#x>' % (' '.join(status
), id(self
))
260 def add_channel(self
, map=None):
261 #self.log_info('adding channel %s' % self)
264 map[self
._fileno
] = self
266 def del_channel(self
, map=None):
271 #self.log_info('closing channel %d:%s' % (fd, self))
275 def create_socket(self
, family
, type):
276 self
.family_and_type
= family
, type
277 sock
= socket
.socket(family
, type)
279 self
.set_socket(sock
)
281 def set_socket(self
, sock
, map=None):
283 ## self.__dict__['socket'] = sock
284 self
._fileno
= sock
.fileno()
285 self
.add_channel(map)
287 def set_reuse_addr(self
):
288 # try to re-use a server port if possible
290 self
.socket
.setsockopt(
291 socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
,
292 self
.socket
.getsockopt(socket
.SOL_SOCKET
,
293 socket
.SO_REUSEADDR
) |
1
298 # ==================================================
299 # predicates for select()
300 # these are used as filters for the lists of sockets
301 # to pass to select().
302 # ==================================================
310 # ==================================================
311 # socket object methods.
312 # ==================================================
314 def listen(self
, num
):
315 self
.accepting
= True
316 if os
.name
== 'nt' and num
> 5:
318 return self
.socket
.listen(num
)
320 def bind(self
, addr
):
322 return self
.socket
.bind(addr
)
324 def connect(self
, address
):
325 self
.connected
= False
326 err
= self
.socket
.connect_ex(address
)
327 # XXX Should interpret Winsock return values
328 if err
in (EINPROGRESS
, EALREADY
, EWOULDBLOCK
):
330 if err
in (0, EISCONN
):
332 self
.handle_connect_event()
334 raise socket
.error(err
, errorcode
[err
])
337 # XXX can return either an address pair or None
339 conn
, addr
= self
.socket
.accept()
341 except socket
.error
, why
:
342 if why
.args
[0] == EWOULDBLOCK
:
347 def send(self
, data
):
349 result
= self
.socket
.send(data
)
351 except socket
.error
, why
:
352 if why
.args
[0] == EWOULDBLOCK
:
354 elif why
.args
[0] in (ECONNRESET
, ENOTCONN
, ESHUTDOWN
, ECONNABORTED
):
360 def recv(self
, buffer_size
):
362 data
= self
.socket
.recv(buffer_size
)
364 # a closed connection is indicated by signaling
365 # a read condition, and having recv() return 0.
370 except socket
.error
, why
:
371 # winsock sometimes throws ENOTCONN
372 if why
.args
[0] in [ECONNRESET
, ENOTCONN
, ESHUTDOWN
, ECONNABORTED
]:
379 self
.connected
= False
380 self
.accepting
= False
384 except socket
.error
, why
:
385 if why
.args
[0] not in (ENOTCONN
, EBADF
):
388 # cheap inheritance, used to pass all other attribute
389 # references to the underlying socket object.
390 def __getattr__(self
, attr
):
391 return getattr(self
.socket
, attr
)
393 # log and log_info may be overridden to provide more sophisticated
394 # logging and warning methods. In general, log is for 'hit' logging
395 # and 'log_info' is for informational, warning and error logging.
397 def log(self
, message
):
398 sys
.stderr
.write('log: %s\n' % str(message
))
400 def log_info(self
, message
, type='info'):
401 if __debug__
or type != 'info':
402 print '%s: %s' % (type, message
)
404 def handle_read_event(self
):
406 # accepting sockets are never connected, they "spawn" new
407 # sockets that are connected
409 elif not self
.connected
:
410 self
.handle_connect_event()
415 def handle_connect_event(self
):
416 self
.connected
= True
417 self
.handle_connect()
419 def handle_write_event(self
):
421 # Accepting sockets shouldn't get a write event.
422 # We will pretend it didn't happen.
425 if not self
.connected
:
427 err
= self
.socket
.getsockopt(socket
.SOL_SOCKET
, socket
.SO_ERROR
)
429 raise socket
.error(err
, _strerror(err
))
431 self
.handle_connect_event()
434 def handle_expt_event(self
):
435 # if the handle_expt is the same default worthless method,
436 # we'll not even bother calling it, we'll instead generate
440 y1
= self
.__class
__.handle_expt
.im_func
441 y2
= dispatcher
.handle_expt
.im_func
443 except AttributeError:
447 err
= self
.socket
.getsockopt(socket
.SOL_SOCKET
, socket
.SO_ERROR
)
450 raise socket
.error(err
, msg
)
454 def handle_error(self
):
455 nil
, t
, v
, tbinfo
= compact_traceback()
457 # sometimes a user repr method will crash.
459 self_repr
= repr(self
)
461 self_repr
= '<__repr__(self) failed for object at %0x>' % id(self
)
464 'uncaptured python exception, closing channel %s (%s:%s %s)' % (
474 def handle_expt(self
):
475 self
.log_info('unhandled exception', 'warning')
477 def handle_read(self
):
478 self
.log_info('unhandled read event', 'warning')
480 def handle_write(self
):
481 self
.log_info('unhandled write event', 'warning')
483 def handle_connect(self
):
484 self
.log_info('unhandled connect event', 'warning')
486 def handle_accept(self
):
487 self
.log_info('unhandled accept event', 'warning')
489 def handle_close(self
):
490 self
.log_info('unhandled close event', 'warning')
493 # ---------------------------------------------------------------------------
494 # adds simple buffered output capability, useful for simple clients.
495 # [for more sophisticated usage use asynchat.async_chat]
496 # ---------------------------------------------------------------------------
498 class dispatcher_with_send(dispatcher
):
500 def __init__(self
, sock
=None, map=None):
501 dispatcher
.__init
__(self
, sock
, map)
504 def initiate_send(self
):
506 num_sent
= dispatcher
.send(self
, self
.out_buffer
[:512])
507 self
.out_buffer
= self
.out_buffer
[num_sent
:]
509 def handle_write(self
):
513 return (not self
.connected
) or len(self
.out_buffer
)
515 def send(self
, data
):
517 self
.log_info('sending %s' % repr(data
))
518 self
.out_buffer
= self
.out_buffer
+ data
521 # ---------------------------------------------------------------------------
522 # used for debugging.
523 # ---------------------------------------------------------------------------
525 def compact_traceback():
526 t
, v
, tb
= sys
.exc_info()
528 if not tb
: # Must have a traceback
529 raise AssertionError("traceback does not exist")
532 tb
.tb_frame
.f_code
.co_filename
,
533 tb
.tb_frame
.f_code
.co_name
,
541 file, function
, line
= tbinfo
[-1]
542 info
= ' '.join(['[%s|%s|%s]' % x
for x
in tbinfo
])
543 return (file, function
, line
), t
, v
, info
545 def close_all(map=None, ignore_all
=False):
548 for x
in map.values():
552 if x
.args
[0] == EBADF
:
556 except (ExitNow
, KeyboardInterrupt, SystemExit):
563 # Asynchronous File I/O:
565 # After a little research (reading man pages on various unixen, and
566 # digging through the linux kernel), I've determined that select()
567 # isn't meant for doing asynchronous file i/o.
568 # Heartening, though - reading linux/mm/filemap.c shows that linux
569 # supports asynchronous read-ahead. So _MOST_ of the time, the data
570 # will be sitting in memory for us already when we go to read it.
572 # What other OS's (besides NT) support async file i/o? [VMS?]
574 # Regardless, this is useful for pipes, and stdin/stdout...
576 if os
.name
== 'posix':
580 # Here we override just enough to make a file
581 # look like a socket for the purposes of asyncore.
582 # The passed fd is automatically os.dup()'d
584 def __init__(self
, fd
):
587 def recv(self
, *args
):
588 return os
.read(self
.fd
, *args
)
590 def send(self
, *args
):
591 return os
.write(self
.fd
, *args
)
602 class file_dispatcher(dispatcher
):
604 def __init__(self
, fd
, map=None):
605 dispatcher
.__init
__(self
, None, map)
606 self
.connected
= True
609 except AttributeError:
612 # set it to non-blocking mode
613 flags
= fcntl
.fcntl(fd
, fcntl
.F_GETFL
, 0)
614 flags
= flags | os
.O_NONBLOCK
615 fcntl
.fcntl(fd
, fcntl
.F_SETFL
, flags
)
617 def set_file(self
, fd
):
619 self
.socket
= file_wrapper(fd
)