2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
28 """Basic infrastructure for asynchronous socket service clients and servers.
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. it's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
54 from errno
import EALREADY
, EINPROGRESS
, EWOULDBLOCK
, ECONNRESET
, EINVAL
, \
55 ENOTCONN
, ESHUTDOWN
, EINTR
, EISCONN
, EBADF
, ECONNABORTED
, errorcode
64 return os
.strerror(err
)
65 except (ValueError, OverflowError, NameError):
68 return "Unknown error %s" %err
70 class ExitNow(Exception):
73 _reraised_exceptions
= (ExitNow
, KeyboardInterrupt, SystemExit)
77 obj
.handle_read_event()
78 except _reraised_exceptions
:
85 obj
.handle_write_event()
86 except _reraised_exceptions
:
93 obj
.handle_expt_event()
94 except _reraised_exceptions
:
99 def readwrite(obj
, flags
):
101 if flags
& select
.POLLIN
:
102 obj
.handle_read_event()
103 if flags
& select
.POLLOUT
:
104 obj
.handle_write_event()
105 if flags
& select
.POLLPRI
:
106 obj
.handle_expt_event()
107 if flags
& (select
.POLLHUP | select
.POLLERR | select
.POLLNVAL
):
109 except socket
.error
as e
:
110 if e
.args
[0] not in (EBADF
, ECONNRESET
, ENOTCONN
, ESHUTDOWN
, ECONNABORTED
):
114 except _reraised_exceptions
:
119 def poll(timeout
=0.0, map=None):
123 r
= []; w
= []; e
= []
124 for fd
, obj
in list(map.items()):
125 is_r
= obj
.readable()
126 is_w
= obj
.writable()
133 if [] == r
== w
== e
:
138 r
, w
, e
= select
.select(r
, w
, e
, timeout
)
139 except select
.error
as err
:
140 if err
.args
[0] != EINTR
:
163 def poll2(timeout
=0.0, map=None):
164 # Use the poll() support added to the select module in Python 2.0
167 if timeout
is not None:
168 # timeout is in milliseconds
169 timeout
= int(timeout
*1000)
170 pollster
= select
.poll()
172 for fd
, obj
in list(map.items()):
175 flags |
= select
.POLLIN | select
.POLLPRI
177 flags |
= select
.POLLOUT
179 # Only check for exceptions if object was either readable
181 flags |
= select
.POLLERR | select
.POLLHUP | select
.POLLNVAL
182 pollster
.register(fd
, flags
)
184 r
= pollster
.poll(timeout
)
185 except select
.error
as err
:
186 if err
.args
[0] != EINTR
:
193 readwrite(obj
, flags
)
195 poll3
= poll2
# Alias for backward compatibility
197 def loop(timeout
=30.0, use_poll
=False, map=None, count
=None):
201 if use_poll
and hasattr(select
, 'poll'):
208 poll_fun(timeout
, map)
211 while map and count
> 0:
212 poll_fun(timeout
, map)
222 ignore_log_types
= frozenset(['warning'])
224 def __init__(self
, sock
=None, map=None):
226 self
._map
= socket_map
233 # Set to nonblocking just to make sure for cases where we
234 # get a socket from a blocking source.
236 self
.set_socket(sock
, map)
237 self
.connected
= True
238 # The constructor no longer requires that the socket
239 # passed be connected.
241 self
.addr
= sock
.getpeername()
242 except socket
.error
as err
:
243 if err
.args
[0] == ENOTCONN
:
244 # To handle the case where we got an unconnected
246 self
.connected
= False
248 # The socket is broken in some unknown way, alert
249 # the user and remove it from the map (to prevent
250 # polling of broken sockets).
251 self
.del_channel(map)
257 status
= [self
.__class
__.__module
__+"."+self
.__class
__.__name
__]
258 if self
.accepting
and self
.addr
:
259 status
.append('listening')
261 status
.append('connected')
262 if self
.addr
is not None:
264 status
.append('%s:%d' % self
.addr
)
266 status
.append(repr(self
.addr
))
267 return '<%s at %#x>' % (' '.join(status
), id(self
))
269 def add_channel(self
, map=None):
270 #self.log_info('adding channel %s' % self)
273 map[self
._fileno
] = self
275 def del_channel(self
, map=None):
280 #self.log_info('closing channel %d:%s' % (fd, self))
284 def create_socket(self
, family
, type):
285 self
.family_and_type
= family
, type
286 sock
= socket
.socket(family
, type)
288 self
.set_socket(sock
)
290 def set_socket(self
, sock
, map=None):
292 ## self.__dict__['socket'] = sock
293 self
._fileno
= sock
.fileno()
294 self
.add_channel(map)
296 def set_reuse_addr(self
):
297 # try to re-use a server port if possible
299 self
.socket
.setsockopt(
300 socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
,
301 self
.socket
.getsockopt(socket
.SOL_SOCKET
,
302 socket
.SO_REUSEADDR
) |
1
307 # ==================================================
308 # predicates for select()
309 # these are used as filters for the lists of sockets
310 # to pass to select().
311 # ==================================================
319 # ==================================================
320 # socket object methods.
321 # ==================================================
323 def listen(self
, num
):
324 self
.accepting
= True
325 if os
.name
== 'nt' and num
> 5:
327 return self
.socket
.listen(num
)
329 def bind(self
, addr
):
331 return self
.socket
.bind(addr
)
333 def connect(self
, address
):
334 self
.connected
= False
335 err
= self
.socket
.connect_ex(address
)
336 if err
in (EINPROGRESS
, EALREADY
, EWOULDBLOCK
) \
337 or err
== EINVAL
and os
.name
in ('nt', 'ce'):
339 if err
in (0, EISCONN
):
341 self
.handle_connect_event()
343 raise socket
.error(err
, errorcode
[err
])
346 # XXX can return either an address pair or None
348 conn
, addr
= self
.socket
.accept()
350 except socket
.error
as why
:
351 if why
.args
[0] == EWOULDBLOCK
:
356 def send(self
, data
):
358 result
= self
.socket
.send(data
)
360 except socket
.error
as why
:
361 if why
.args
[0] == EWOULDBLOCK
:
363 elif why
.args
[0] in (ECONNRESET
, ENOTCONN
, ESHUTDOWN
, ECONNABORTED
):
369 def recv(self
, buffer_size
):
371 data
= self
.socket
.recv(buffer_size
)
373 # a closed connection is indicated by signaling
374 # a read condition, and having recv() return 0.
379 except socket
.error
as why
:
380 # winsock sometimes throws ENOTCONN
381 if why
.args
[0] in [ECONNRESET
, ENOTCONN
, ESHUTDOWN
, ECONNABORTED
]:
388 self
.connected
= False
389 self
.accepting
= False
393 except socket
.error
as why
:
394 if why
.args
[0] not in (ENOTCONN
, EBADF
):
397 # cheap inheritance, used to pass all other attribute
398 # references to the underlying socket object.
399 def __getattr__(self
, attr
):
401 return getattr(self
.socket
, attr
)
402 except AttributeError:
403 raise AttributeError("%s instance has no attribute '%s'"
404 %(self
.__class
__.__name
__, attr
))
406 # log and log_info may be overridden to provide more sophisticated
407 # logging and warning methods. In general, log is for 'hit' logging
408 # and 'log_info' is for informational, warning and error logging.
410 def log(self
, message
):
411 sys
.stderr
.write('log: %s\n' % str(message
))
413 def log_info(self
, message
, type='info'):
414 if type not in self
.ignore_log_types
:
415 print('%s: %s' % (type, message
))
417 def handle_read_event(self
):
419 # accepting sockets are never connected, they "spawn" new
420 # sockets that are connected
422 elif not self
.connected
:
423 self
.handle_connect_event()
428 def handle_connect_event(self
):
429 err
= self
.socket
.getsockopt(socket
.SOL_SOCKET
, socket
.SO_ERROR
)
431 raise socket
.error(err
, _strerror(err
))
432 self
.handle_connect()
433 self
.connected
= True
435 def handle_write_event(self
):
437 # Accepting sockets shouldn't get a write event.
438 # We will pretend it didn't happen.
441 if not self
.connected
:
443 err
= self
.socket
.getsockopt(socket
.SOL_SOCKET
, socket
.SO_ERROR
)
445 raise socket
.error(err
, _strerror(err
))
447 self
.handle_connect_event()
450 def handle_expt_event(self
):
451 # handle_expt_event() is called if there might be an error on the
452 # socket, or if there is OOB data
453 # check for the error condition first
454 err
= self
.socket
.getsockopt(socket
.SOL_SOCKET
, socket
.SO_ERROR
)
456 # we can get here when select.select() says that there is an
457 # exceptional condition on the socket
458 # since there is an error, we'll go ahead and close the socket
459 # like we would in a subclassed handle_read() that received no
465 def handle_error(self
):
466 nil
, t
, v
, tbinfo
= compact_traceback()
468 # sometimes a user repr method will crash.
470 self_repr
= repr(self
)
472 self_repr
= '<__repr__(self) failed for object at %0x>' % id(self
)
475 'uncaptured python exception, closing channel %s (%s:%s %s)' % (
485 def handle_expt(self
):
486 self
.log_info('unhandled incoming priority event', 'warning')
488 def handle_read(self
):
489 self
.log_info('unhandled read event', 'warning')
491 def handle_write(self
):
492 self
.log_info('unhandled write event', 'warning')
494 def handle_connect(self
):
495 self
.log_info('unhandled connect event', 'warning')
497 def handle_accept(self
):
498 self
.log_info('unhandled accept event', 'warning')
500 def handle_close(self
):
501 self
.log_info('unhandled close event', 'warning')
504 # ---------------------------------------------------------------------------
505 # adds simple buffered output capability, useful for simple clients.
506 # [for more sophisticated usage use asynchat.async_chat]
507 # ---------------------------------------------------------------------------
509 class dispatcher_with_send(dispatcher
):
511 def __init__(self
, sock
=None, map=None):
512 dispatcher
.__init
__(self
, sock
, map)
513 self
.out_buffer
= b
''
515 def initiate_send(self
):
517 num_sent
= dispatcher
.send(self
, self
.out_buffer
[:512])
518 self
.out_buffer
= self
.out_buffer
[num_sent
:]
520 def handle_write(self
):
524 return (not self
.connected
) or len(self
.out_buffer
)
526 def send(self
, data
):
528 self
.log_info('sending %s' % repr(data
))
529 self
.out_buffer
= self
.out_buffer
+ data
532 # ---------------------------------------------------------------------------
533 # used for debugging.
534 # ---------------------------------------------------------------------------
536 def compact_traceback():
537 t
, v
, tb
= sys
.exc_info()
539 if not tb
: # Must have a traceback
540 raise AssertionError("traceback does not exist")
543 tb
.tb_frame
.f_code
.co_filename
,
544 tb
.tb_frame
.f_code
.co_name
,
552 file, function
, line
= tbinfo
[-1]
553 info
= ' '.join(['[%s|%s|%s]' % x
for x
in tbinfo
])
554 return (file, function
, line
), t
, v
, info
556 def close_all(map=None, ignore_all
=False):
559 for x
in list(map.values()):
563 if x
.args
[0] == EBADF
:
567 except _reraised_exceptions
:
574 # Asynchronous File I/O:
576 # After a little research (reading man pages on various unixen, and
577 # digging through the linux kernel), I've determined that select()
578 # isn't meant for doing asynchronous file i/o.
579 # Heartening, though - reading linux/mm/filemap.c shows that linux
580 # supports asynchronous read-ahead. So _MOST_ of the time, the data
581 # will be sitting in memory for us already when we go to read it.
583 # What other OS's (besides NT) support async file i/o? [VMS?]
585 # Regardless, this is useful for pipes, and stdin/stdout...
587 if os
.name
== 'posix':
591 # Here we override just enough to make a file
592 # look like a socket for the purposes of asyncore.
593 # The passed fd is automatically os.dup()'d
595 def __init__(self
, fd
):
598 def recv(self
, *args
):
599 return os
.read(self
.fd
, *args
)
601 def send(self
, *args
):
602 return os
.write(self
.fd
, *args
)
604 def getsockopt(self
, level
, optname
, buflen
=None):
605 if (level
== socket
.SOL_SOCKET
and
606 optname
== socket
.SO_ERROR
and
609 raise NotImplementedError("Only asyncore specific behaviour "
621 class file_dispatcher(dispatcher
):
623 def __init__(self
, fd
, map=None):
624 dispatcher
.__init
__(self
, None, map)
625 self
.connected
= True
628 except AttributeError:
631 # set it to non-blocking mode
632 flags
= fcntl
.fcntl(fd
, fcntl
.F_GETFL
, 0)
633 flags
= flags | os
.O_NONBLOCK
634 fcntl
.fcntl(fd
, fcntl
.F_SETFL
, flags
)
636 def set_file(self
, fd
):
637 self
.socket
= file_wrapper(fd
)
638 self
._fileno
= self
.socket
.fileno()