2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
28 """Basic infrastructure for asynchronous socket service clients and servers.
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. it's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
55 from errno
import EALREADY
, EINPROGRESS
, EWOULDBLOCK
, ECONNRESET
, \
56 ENOTCONN
, ESHUTDOWN
, EINTR
, EISCONN
, EBADF
, ECONNABORTED
, errorcode
64 res
= os
.strerror(err
)
65 if res
== 'Unknown error':
69 class ExitNow(Exception):
72 _reraised_exceptions
= (ExitNow
, KeyboardInterrupt, SystemExit)
76 obj
.handle_read_event()
77 except _reraised_exceptions
:
84 obj
.handle_write_event()
85 except _reraised_exceptions
:
92 obj
.handle_expt_event()
93 except _reraised_exceptions
:
98 def readwrite(obj
, flags
):
100 if flags
& select
.POLLIN
:
101 obj
.handle_read_event()
102 if flags
& select
.POLLOUT
:
103 obj
.handle_write_event()
104 if flags
& select
.POLLPRI
:
105 obj
.handle_expt_event()
106 if flags
& (select
.POLLHUP | select
.POLLERR | select
.POLLNVAL
):
108 except socket
.error
, e
:
109 if e
.args
[0] not in (EBADF
, ECONNRESET
, ENOTCONN
, ESHUTDOWN
, ECONNABORTED
):
113 except _reraised_exceptions
:
118 def poll(timeout
=0.0, map=None):
122 r
= []; w
= []; e
= []
123 for fd
, obj
in map.items():
124 is_r
= obj
.readable()
125 is_w
= obj
.writable()
132 if [] == r
== w
== e
:
137 r
, w
, e
= select
.select(r
, w
, e
, timeout
)
138 except select
.error
, err
:
139 if err
.args
[0] != EINTR
:
162 def poll2(timeout
=0.0, map=None):
163 # Use the poll() support added to the select module in Python 2.0
166 if timeout
is not None:
167 # timeout is in milliseconds
168 timeout
= int(timeout
*1000)
169 pollster
= select
.poll()
171 for fd
, obj
in map.items():
174 flags |
= select
.POLLIN | select
.POLLPRI
176 flags |
= select
.POLLOUT
178 # Only check for exceptions if object was either readable
180 flags |
= select
.POLLERR | select
.POLLHUP | select
.POLLNVAL
181 pollster
.register(fd
, flags
)
183 r
= pollster
.poll(timeout
)
184 except select
.error
, err
:
185 if err
.args
[0] != EINTR
:
192 readwrite(obj
, flags
)
194 poll3
= poll2
# Alias for backward compatibility
196 def loop(timeout
=30.0, use_poll
=False, map=None, count
=None):
200 if use_poll
and hasattr(select
, 'poll'):
207 poll_fun(timeout
, map)
210 while map and count
> 0:
211 poll_fun(timeout
, map)
221 ignore_log_types
= frozenset(['warning'])
223 def __init__(self
, sock
=None, map=None):
225 self
._map
= socket_map
232 # Set to nonblocking just to make sure for cases where we
233 # get a socket from a blocking source.
235 self
.set_socket(sock
, map)
236 self
.connected
= True
237 # The constructor no longer requires that the socket
238 # passed be connected.
240 self
.addr
= sock
.getpeername()
241 except socket
.error
, err
:
242 if err
.args
[0] == ENOTCONN
:
243 # To handle the case where we got an unconnected
245 self
.connected
= False
247 # The socket is broken in some unknown way, alert
248 # the user and remove it from the map (to prevent
249 # polling of broken sockets).
250 self
.del_channel(map)
256 status
= [self
.__class
__.__module
__+"."+self
.__class
__.__name
__]
257 if self
.accepting
and self
.addr
:
258 status
.append('listening')
260 status
.append('connected')
261 if self
.addr
is not None:
263 status
.append('%s:%d' % self
.addr
)
265 status
.append(repr(self
.addr
))
266 return '<%s at %#x>' % (' '.join(status
), id(self
))
268 def add_channel(self
, map=None):
269 #self.log_info('adding channel %s' % self)
272 map[self
._fileno
] = self
274 def del_channel(self
, map=None):
279 #self.log_info('closing channel %d:%s' % (fd, self))
283 def create_socket(self
, family
, type):
284 self
.family_and_type
= family
, type
285 sock
= socket
.socket(family
, type)
287 self
.set_socket(sock
)
289 def set_socket(self
, sock
, map=None):
291 ## self.__dict__['socket'] = sock
292 self
._fileno
= sock
.fileno()
293 self
.add_channel(map)
295 def set_reuse_addr(self
):
296 # try to re-use a server port if possible
298 self
.socket
.setsockopt(
299 socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
,
300 self
.socket
.getsockopt(socket
.SOL_SOCKET
,
301 socket
.SO_REUSEADDR
) |
1
306 # ==================================================
307 # predicates for select()
308 # these are used as filters for the lists of sockets
309 # to pass to select().
310 # ==================================================
318 # ==================================================
319 # socket object methods.
320 # ==================================================
322 def listen(self
, num
):
323 self
.accepting
= True
324 if os
.name
== 'nt' and num
> 5:
326 return self
.socket
.listen(num
)
328 def bind(self
, addr
):
330 return self
.socket
.bind(addr
)
332 def connect(self
, address
):
333 self
.connected
= False
334 err
= self
.socket
.connect_ex(address
)
335 # XXX Should interpret Winsock return values
336 if err
in (EINPROGRESS
, EALREADY
, EWOULDBLOCK
):
338 if err
in (0, EISCONN
):
340 self
.handle_connect_event()
342 raise socket
.error(err
, errorcode
[err
])
345 # XXX can return either an address pair or None
347 conn
, addr
= self
.socket
.accept()
349 except socket
.error
, why
:
350 if why
.args
[0] == EWOULDBLOCK
:
355 def send(self
, data
):
357 result
= self
.socket
.send(data
)
359 except socket
.error
, why
:
360 if why
.args
[0] == EWOULDBLOCK
:
362 elif why
.args
[0] in (ECONNRESET
, ENOTCONN
, ESHUTDOWN
, ECONNABORTED
):
368 def recv(self
, buffer_size
):
370 data
= self
.socket
.recv(buffer_size
)
372 # a closed connection is indicated by signaling
373 # a read condition, and having recv() return 0.
378 except socket
.error
, why
:
379 # winsock sometimes throws ENOTCONN
380 if why
.args
[0] in [ECONNRESET
, ENOTCONN
, ESHUTDOWN
, ECONNABORTED
]:
387 self
.connected
= False
388 self
.accepting
= False
392 except socket
.error
, why
:
393 if why
.args
[0] not in (ENOTCONN
, EBADF
):
396 # cheap inheritance, used to pass all other attribute
397 # references to the underlying socket object.
398 def __getattr__(self
, attr
):
399 return getattr(self
.socket
, attr
)
401 # log and log_info may be overridden to provide more sophisticated
402 # logging and warning methods. In general, log is for 'hit' logging
403 # and 'log_info' is for informational, warning and error logging.
405 def log(self
, message
):
406 sys
.stderr
.write('log: %s\n' % str(message
))
408 def log_info(self
, message
, type='info'):
409 if type not in self
.ignore_log_types
:
410 print '%s: %s' % (type, message
)
412 def handle_read_event(self
):
414 # accepting sockets are never connected, they "spawn" new
415 # sockets that are connected
417 elif not self
.connected
:
418 self
.handle_connect_event()
423 def handle_connect_event(self
):
424 self
.connected
= True
425 self
.handle_connect()
427 def handle_write_event(self
):
429 # Accepting sockets shouldn't get a write event.
430 # We will pretend it didn't happen.
433 if not self
.connected
:
435 err
= self
.socket
.getsockopt(socket
.SOL_SOCKET
, socket
.SO_ERROR
)
437 raise socket
.error(err
, _strerror(err
))
439 self
.handle_connect_event()
442 def handle_expt_event(self
):
443 # handle_expt_event() is called if there might be an error on the
444 # socket, or if there is OOB data
445 # check for the error condition first
446 err
= self
.socket
.getsockopt(socket
.SOL_SOCKET
, socket
.SO_ERROR
)
448 # we can get here when select.select() says that there is an
449 # exceptional condition on the socket
450 # since there is an error, we'll go ahead and close the socket
451 # like we would in a subclassed handle_read() that received no
457 def handle_error(self
):
458 nil
, t
, v
, tbinfo
= compact_traceback()
460 # sometimes a user repr method will crash.
462 self_repr
= repr(self
)
464 self_repr
= '<__repr__(self) failed for object at %0x>' % id(self
)
467 'uncaptured python exception, closing channel %s (%s:%s %s)' % (
477 def handle_expt(self
):
478 self
.log_info('unhandled incoming priority event', 'warning')
480 def handle_read(self
):
481 self
.log_info('unhandled read event', 'warning')
483 def handle_write(self
):
484 self
.log_info('unhandled write event', 'warning')
486 def handle_connect(self
):
487 self
.log_info('unhandled connect event', 'warning')
489 def handle_accept(self
):
490 self
.log_info('unhandled accept event', 'warning')
492 def handle_close(self
):
493 self
.log_info('unhandled close event', 'warning')
496 # ---------------------------------------------------------------------------
497 # adds simple buffered output capability, useful for simple clients.
498 # [for more sophisticated usage use asynchat.async_chat]
499 # ---------------------------------------------------------------------------
501 class dispatcher_with_send(dispatcher
):
503 def __init__(self
, sock
=None, map=None):
504 dispatcher
.__init
__(self
, sock
, map)
507 def initiate_send(self
):
509 num_sent
= dispatcher
.send(self
, self
.out_buffer
[:512])
510 self
.out_buffer
= self
.out_buffer
[num_sent
:]
512 def handle_write(self
):
516 return (not self
.connected
) or len(self
.out_buffer
)
518 def send(self
, data
):
520 self
.log_info('sending %s' % repr(data
))
521 self
.out_buffer
= self
.out_buffer
+ data
524 # ---------------------------------------------------------------------------
525 # used for debugging.
526 # ---------------------------------------------------------------------------
528 def compact_traceback():
529 t
, v
, tb
= sys
.exc_info()
531 if not tb
: # Must have a traceback
532 raise AssertionError("traceback does not exist")
535 tb
.tb_frame
.f_code
.co_filename
,
536 tb
.tb_frame
.f_code
.co_name
,
544 file, function
, line
= tbinfo
[-1]
545 info
= ' '.join(['[%s|%s|%s]' % x
for x
in tbinfo
])
546 return (file, function
, line
), t
, v
, info
548 def close_all(map=None, ignore_all
=False):
551 for x
in map.values():
555 if x
.args
[0] == EBADF
:
559 except _reraised_exceptions
:
566 # Asynchronous File I/O:
568 # After a little research (reading man pages on various unixen, and
569 # digging through the linux kernel), I've determined that select()
570 # isn't meant for doing asynchronous file i/o.
571 # Heartening, though - reading linux/mm/filemap.c shows that linux
572 # supports asynchronous read-ahead. So _MOST_ of the time, the data
573 # will be sitting in memory for us already when we go to read it.
575 # What other OS's (besides NT) support async file i/o? [VMS?]
577 # Regardless, this is useful for pipes, and stdin/stdout...
579 if os
.name
== 'posix':
583 # Here we override just enough to make a file
584 # look like a socket for the purposes of asyncore.
585 # The passed fd is automatically os.dup()'d
587 def __init__(self
, fd
):
590 def recv(self
, *args
):
591 return os
.read(self
.fd
, *args
)
593 def send(self
, *args
):
594 return os
.write(self
.fd
, *args
)
605 class file_dispatcher(dispatcher
):
607 def __init__(self
, fd
, map=None):
608 dispatcher
.__init
__(self
, None, map)
609 self
.connected
= True
612 except AttributeError:
615 # set it to non-blocking mode
616 flags
= fcntl
.fcntl(fd
, fcntl
.F_GETFL
, 0)
617 flags
= flags | os
.O_NONBLOCK
618 fcntl
.fcntl(fd
, fcntl
.F_SETFL
, flags
)
620 def set_file(self
, fd
):
621 self
.socket
= file_wrapper(fd
)
622 self
._fileno
= self
.socket
.fileno()