Backport importlib to at least Python 2.5 by getting rid of use of str.format.
[python.git] / Lib / asyncore.py
blob75fd5aedd2cbdd9a87489ed8b0b9e0d1a6f9dfbb
1 # -*- Mode: Python -*-
2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
8 # All Rights Reserved
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
17 # permission.
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
28 """Basic infrastructure for asynchronous socket service clients and servers.
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. it's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
47 """
49 import select
50 import socket
51 import sys
52 import time
54 import os
55 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
56 ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, errorcode
58 try:
59 socket_map
60 except NameError:
61 socket_map = {}
63 def _strerror(err):
64 res = os.strerror(err)
65 if res == 'Unknown error':
66 res = errorcode[err]
67 return res
69 class ExitNow(Exception):
70 pass
72 def read(obj):
73 try:
74 obj.handle_read_event()
75 except (ExitNow, KeyboardInterrupt, SystemExit):
76 raise
77 except:
78 obj.handle_error()
80 def write(obj):
81 try:
82 obj.handle_write_event()
83 except (ExitNow, KeyboardInterrupt, SystemExit):
84 raise
85 except:
86 obj.handle_error()
88 def _exception(obj):
89 try:
90 obj.handle_expt_event()
91 except (ExitNow, KeyboardInterrupt, SystemExit):
92 raise
93 except:
94 obj.handle_error()
96 def readwrite(obj, flags):
97 try:
98 if flags & (select.POLLIN | select.POLLPRI):
99 obj.handle_read_event()
100 if flags & select.POLLOUT:
101 obj.handle_write_event()
102 if flags & (select.POLLERR | select.POLLNVAL):
103 obj.handle_expt_event()
104 if flags & select.POLLHUP:
105 obj.handle_close()
106 except (ExitNow, KeyboardInterrupt, SystemExit):
107 raise
108 except:
109 obj.handle_error()
111 def poll(timeout=0.0, map=None):
112 if map is None:
113 map = socket_map
114 if map:
115 r = []; w = []; e = []
116 for fd, obj in map.items():
117 is_r = obj.readable()
118 is_w = obj.writable()
119 if is_r:
120 r.append(fd)
121 if is_w:
122 w.append(fd)
123 if is_r or is_w:
124 e.append(fd)
125 if [] == r == w == e:
126 time.sleep(timeout)
127 return
129 try:
130 r, w, e = select.select(r, w, e, timeout)
131 except select.error, err:
132 if err.args[0] != EINTR:
133 raise
134 else:
135 return
137 for fd in r:
138 obj = map.get(fd)
139 if obj is None:
140 continue
141 read(obj)
143 for fd in w:
144 obj = map.get(fd)
145 if obj is None:
146 continue
147 write(obj)
149 for fd in e:
150 obj = map.get(fd)
151 if obj is None:
152 continue
153 _exception(obj)
155 def poll2(timeout=0.0, map=None):
156 # Use the poll() support added to the select module in Python 2.0
157 if map is None:
158 map = socket_map
159 if timeout is not None:
160 # timeout is in milliseconds
161 timeout = int(timeout*1000)
162 pollster = select.poll()
163 if map:
164 for fd, obj in map.items():
165 flags = 0
166 if obj.readable():
167 flags |= select.POLLIN | select.POLLPRI
168 if obj.writable():
169 flags |= select.POLLOUT
170 if flags:
171 # Only check for exceptions if object was either readable
172 # or writable.
173 flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
174 pollster.register(fd, flags)
175 try:
176 r = pollster.poll(timeout)
177 except select.error, err:
178 if err.args[0] != EINTR:
179 raise
180 r = []
181 for fd, flags in r:
182 obj = map.get(fd)
183 if obj is None:
184 continue
185 readwrite(obj, flags)
187 poll3 = poll2 # Alias for backward compatibility
189 def loop(timeout=30.0, use_poll=False, map=None, count=None):
190 if map is None:
191 map = socket_map
193 if use_poll and hasattr(select, 'poll'):
194 poll_fun = poll2
195 else:
196 poll_fun = poll
198 if count is None:
199 while map:
200 poll_fun(timeout, map)
202 else:
203 while map and count > 0:
204 poll_fun(timeout, map)
205 count = count - 1
207 class dispatcher:
209 debug = False
210 connected = False
211 accepting = False
212 closing = False
213 addr = None
215 def __init__(self, sock=None, map=None):
216 if map is None:
217 self._map = socket_map
218 else:
219 self._map = map
221 self._fileno = None
223 if sock:
224 # Set to nonblocking just to make sure for cases where we
225 # get a socket from a blocking source.
226 sock.setblocking(0)
227 self.set_socket(sock, map)
228 self.connected = True
229 # The constructor no longer requires that the socket
230 # passed be connected.
231 try:
232 self.addr = sock.getpeername()
233 except socket.error, err:
234 if err.args[0] == ENOTCONN:
235 # To handle the case where we got an unconnected
236 # socket.
237 self.connected = False
238 else:
239 # The socket is broken in some unknown way, alert
240 # the user and remove it from the map (to prevent
241 # polling of broken sockets).
242 self.del_channel(map)
243 raise
244 else:
245 self.socket = None
247 def __repr__(self):
248 status = [self.__class__.__module__+"."+self.__class__.__name__]
249 if self.accepting and self.addr:
250 status.append('listening')
251 elif self.connected:
252 status.append('connected')
253 if self.addr is not None:
254 try:
255 status.append('%s:%d' % self.addr)
256 except TypeError:
257 status.append(repr(self.addr))
258 return '<%s at %#x>' % (' '.join(status), id(self))
260 def add_channel(self, map=None):
261 #self.log_info('adding channel %s' % self)
262 if map is None:
263 map = self._map
264 map[self._fileno] = self
266 def del_channel(self, map=None):
267 fd = self._fileno
268 if map is None:
269 map = self._map
270 if fd in map:
271 #self.log_info('closing channel %d:%s' % (fd, self))
272 del map[fd]
273 self._fileno = None
275 def create_socket(self, family, type):
276 self.family_and_type = family, type
277 sock = socket.socket(family, type)
278 sock.setblocking(0)
279 self.set_socket(sock)
281 def set_socket(self, sock, map=None):
282 self.socket = sock
283 ## self.__dict__['socket'] = sock
284 self._fileno = sock.fileno()
285 self.add_channel(map)
287 def set_reuse_addr(self):
288 # try to re-use a server port if possible
289 try:
290 self.socket.setsockopt(
291 socket.SOL_SOCKET, socket.SO_REUSEADDR,
292 self.socket.getsockopt(socket.SOL_SOCKET,
293 socket.SO_REUSEADDR) | 1
295 except socket.error:
296 pass
298 # ==================================================
299 # predicates for select()
300 # these are used as filters for the lists of sockets
301 # to pass to select().
302 # ==================================================
304 def readable(self):
305 return True
307 def writable(self):
308 return True
310 # ==================================================
311 # socket object methods.
312 # ==================================================
314 def listen(self, num):
315 self.accepting = True
316 if os.name == 'nt' and num > 5:
317 num = 5
318 return self.socket.listen(num)
320 def bind(self, addr):
321 self.addr = addr
322 return self.socket.bind(addr)
324 def connect(self, address):
325 self.connected = False
326 err = self.socket.connect_ex(address)
327 # XXX Should interpret Winsock return values
328 if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
329 return
330 if err in (0, EISCONN):
331 self.addr = address
332 self.handle_connect_event()
333 else:
334 raise socket.error(err, errorcode[err])
336 def accept(self):
337 # XXX can return either an address pair or None
338 try:
339 conn, addr = self.socket.accept()
340 return conn, addr
341 except socket.error, why:
342 if why.args[0] == EWOULDBLOCK:
343 pass
344 else:
345 raise
347 def send(self, data):
348 try:
349 result = self.socket.send(data)
350 return result
351 except socket.error, why:
352 if why.args[0] == EWOULDBLOCK:
353 return 0
354 elif why.args[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED):
355 self.handle_close()
356 return 0
357 else:
358 raise
360 def recv(self, buffer_size):
361 try:
362 data = self.socket.recv(buffer_size)
363 if not data:
364 # a closed connection is indicated by signaling
365 # a read condition, and having recv() return 0.
366 self.handle_close()
367 return ''
368 else:
369 return data
370 except socket.error, why:
371 # winsock sometimes throws ENOTCONN
372 if why.args[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED]:
373 self.handle_close()
374 return ''
375 else:
376 raise
378 def close(self):
379 self.connected = False
380 self.accepting = False
381 self.del_channel()
382 try:
383 self.socket.close()
384 except socket.error, why:
385 if why.args[0] not in (ENOTCONN, EBADF):
386 raise
388 # cheap inheritance, used to pass all other attribute
389 # references to the underlying socket object.
390 def __getattr__(self, attr):
391 return getattr(self.socket, attr)
393 # log and log_info may be overridden to provide more sophisticated
394 # logging and warning methods. In general, log is for 'hit' logging
395 # and 'log_info' is for informational, warning and error logging.
397 def log(self, message):
398 sys.stderr.write('log: %s\n' % str(message))
400 def log_info(self, message, type='info'):
401 if __debug__ or type != 'info':
402 print '%s: %s' % (type, message)
404 def handle_read_event(self):
405 if self.accepting:
406 # accepting sockets are never connected, they "spawn" new
407 # sockets that are connected
408 self.handle_accept()
409 elif not self.connected:
410 self.handle_connect_event()
411 self.handle_read()
412 else:
413 self.handle_read()
415 def handle_connect_event(self):
416 self.connected = True
417 self.handle_connect()
419 def handle_write_event(self):
420 if self.accepting:
421 # Accepting sockets shouldn't get a write event.
422 # We will pretend it didn't happen.
423 return
425 if not self.connected:
426 #check for errors
427 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
428 if err != 0:
429 raise socket.error(err, _strerror(err))
431 self.handle_connect_event()
432 self.handle_write()
434 def handle_expt_event(self):
435 # if the handle_expt is the same default worthless method,
436 # we'll not even bother calling it, we'll instead generate
437 # a useful error
438 x = True
439 try:
440 y1 = self.__class__.handle_expt.im_func
441 y2 = dispatcher.handle_expt.im_func
442 x = y1 is y2
443 except AttributeError:
444 pass
446 if x:
447 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
448 msg = _strerror(err)
450 raise socket.error(err, msg)
451 else:
452 self.handle_expt()
454 def handle_error(self):
455 nil, t, v, tbinfo = compact_traceback()
457 # sometimes a user repr method will crash.
458 try:
459 self_repr = repr(self)
460 except:
461 self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
463 self.log_info(
464 'uncaptured python exception, closing channel %s (%s:%s %s)' % (
465 self_repr,
468 tbinfo
470 'error'
472 self.handle_close()
474 def handle_expt(self):
475 self.log_info('unhandled exception', 'warning')
477 def handle_read(self):
478 self.log_info('unhandled read event', 'warning')
480 def handle_write(self):
481 self.log_info('unhandled write event', 'warning')
483 def handle_connect(self):
484 self.log_info('unhandled connect event', 'warning')
486 def handle_accept(self):
487 self.log_info('unhandled accept event', 'warning')
489 def handle_close(self):
490 self.log_info('unhandled close event', 'warning')
491 self.close()
493 # ---------------------------------------------------------------------------
494 # adds simple buffered output capability, useful for simple clients.
495 # [for more sophisticated usage use asynchat.async_chat]
496 # ---------------------------------------------------------------------------
498 class dispatcher_with_send(dispatcher):
500 def __init__(self, sock=None, map=None):
501 dispatcher.__init__(self, sock, map)
502 self.out_buffer = ''
504 def initiate_send(self):
505 num_sent = 0
506 num_sent = dispatcher.send(self, self.out_buffer[:512])
507 self.out_buffer = self.out_buffer[num_sent:]
509 def handle_write(self):
510 self.initiate_send()
512 def writable(self):
513 return (not self.connected) or len(self.out_buffer)
515 def send(self, data):
516 if self.debug:
517 self.log_info('sending %s' % repr(data))
518 self.out_buffer = self.out_buffer + data
519 self.initiate_send()
521 # ---------------------------------------------------------------------------
522 # used for debugging.
523 # ---------------------------------------------------------------------------
525 def compact_traceback():
526 t, v, tb = sys.exc_info()
527 tbinfo = []
528 if not tb: # Must have a traceback
529 raise AssertionError("traceback does not exist")
530 while tb:
531 tbinfo.append((
532 tb.tb_frame.f_code.co_filename,
533 tb.tb_frame.f_code.co_name,
534 str(tb.tb_lineno)
536 tb = tb.tb_next
538 # just to be safe
539 del tb
541 file, function, line = tbinfo[-1]
542 info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
543 return (file, function, line), t, v, info
545 def close_all(map=None, ignore_all=False):
546 if map is None:
547 map = socket_map
548 for x in map.values():
549 try:
550 x.close()
551 except OSError, x:
552 if x.args[0] == EBADF:
553 pass
554 elif not ignore_all:
555 raise
556 except (ExitNow, KeyboardInterrupt, SystemExit):
557 raise
558 except:
559 if not ignore_all:
560 raise
561 map.clear()
563 # Asynchronous File I/O:
565 # After a little research (reading man pages on various unixen, and
566 # digging through the linux kernel), I've determined that select()
567 # isn't meant for doing asynchronous file i/o.
568 # Heartening, though - reading linux/mm/filemap.c shows that linux
569 # supports asynchronous read-ahead. So _MOST_ of the time, the data
570 # will be sitting in memory for us already when we go to read it.
572 # What other OS's (besides NT) support async file i/o? [VMS?]
574 # Regardless, this is useful for pipes, and stdin/stdout...
576 if os.name == 'posix':
577 import fcntl
579 class file_wrapper:
580 # Here we override just enough to make a file
581 # look like a socket for the purposes of asyncore.
582 # The passed fd is automatically os.dup()'d
584 def __init__(self, fd):
585 self.fd = os.dup(fd)
587 def recv(self, *args):
588 return os.read(self.fd, *args)
590 def send(self, *args):
591 return os.write(self.fd, *args)
593 read = recv
594 write = send
596 def close(self):
597 os.close(self.fd)
599 def fileno(self):
600 return self.fd
602 class file_dispatcher(dispatcher):
604 def __init__(self, fd, map=None):
605 dispatcher.__init__(self, None, map)
606 self.connected = True
607 try:
608 fd = fd.fileno()
609 except AttributeError:
610 pass
611 self.set_file(fd)
612 # set it to non-blocking mode
613 flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
614 flags = flags | os.O_NONBLOCK
615 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
617 def set_file(self, fd):
618 self.socket = file_wrapper(fd)
619 self._fileno = self.socket.fileno()
620 self.add_channel()