1 # -*- Mode: Python; tab-width: 4 -*-
2 # Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
28 r
"""A class supporting chat-style (command/response) protocols.
30 This class adds support for 'chat' style protocols - where one side
31 sends a 'command', and the other sends a response (examples would be
32 the common internet protocols - smtp, nntp, ftp, etc..).
34 The handle_read() method looks at the input stream for the current
35 'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
36 for multi-line output), calling self.found_terminator() on its
40 Say you build an async nntp client using this class. At the start
41 of the connection, you'll have self.terminator set to '\r\n', in
42 order to process the single-line greeting. Just before issuing a
43 'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST
44 command will be accumulated (using your own 'collect_incoming_data'
45 method) up to the terminator, and then control will be returned to
46 you - by calling your self.found_terminator() method.
51 from collections
import deque
53 class async_chat (asyncore
.dispatcher
):
54 """This is an abstract class. You must derive from this class, and add
55 the two methods collect_incoming_data() and found_terminator()"""
57 # these are overridable defaults
59 ac_in_buffer_size
= 4096
60 ac_out_buffer_size
= 4096
62 def __init__ (self
, conn
=None):
63 self
.ac_in_buffer
= ''
64 self
.ac_out_buffer
= ''
65 self
.producer_fifo
= fifo()
66 asyncore
.dispatcher
.__init
__ (self
, conn
)
68 def collect_incoming_data(self
, data
):
69 raise NotImplementedError, "must be implemented in subclass"
71 def found_terminator(self
):
72 raise NotImplementedError, "must be implemented in subclass"
74 def set_terminator (self
, term
):
75 "Set the input delimiter. Can be a fixed string of any length, an integer, or None"
76 self
.terminator
= term
78 def get_terminator (self
):
79 return self
.terminator
81 # grab some more data from the socket,
82 # throw it to the collector method,
83 # check for the terminator,
84 # if found, transition to the next state.
86 def handle_read (self
):
89 data
= self
.recv (self
.ac_in_buffer_size
)
90 except socket
.error
, why
:
94 self
.ac_in_buffer
= self
.ac_in_buffer
+ data
96 # Continue to search for self.terminator in self.ac_in_buffer,
97 # while calling self.collect_incoming_data. The while loop
98 # is necessary because we might read several data+terminator
99 # combos with a single recv(1024).
101 while self
.ac_in_buffer
:
102 lb
= len(self
.ac_in_buffer
)
103 terminator
= self
.get_terminator()
105 # no terminator, collect it all
106 self
.collect_incoming_data (self
.ac_in_buffer
)
107 self
.ac_in_buffer
= ''
108 elif isinstance(terminator
, int) or isinstance(terminator
, long):
112 self
.collect_incoming_data (self
.ac_in_buffer
)
113 self
.ac_in_buffer
= ''
114 self
.terminator
= self
.terminator
- lb
116 self
.collect_incoming_data (self
.ac_in_buffer
[:n
])
117 self
.ac_in_buffer
= self
.ac_in_buffer
[n
:]
119 self
.found_terminator()
122 # 1) end of buffer matches terminator exactly:
123 # collect data, transition
124 # 2) end of buffer matches some prefix:
125 # collect data to the prefix
126 # 3) end of buffer does not match any prefix:
128 terminator_len
= len(terminator
)
129 index
= self
.ac_in_buffer
.find(terminator
)
131 # we found the terminator
133 # don't bother reporting the empty string (source of subtle bugs)
134 self
.collect_incoming_data (self
.ac_in_buffer
[:index
])
135 self
.ac_in_buffer
= self
.ac_in_buffer
[index
+terminator_len
:]
136 # This does the Right Thing if the terminator is changed here.
137 self
.found_terminator()
139 # check for a prefix of the terminator
140 index
= find_prefix_at_end (self
.ac_in_buffer
, terminator
)
143 # we found a prefix, collect up to the prefix
144 self
.collect_incoming_data (self
.ac_in_buffer
[:-index
])
145 self
.ac_in_buffer
= self
.ac_in_buffer
[-index
:]
148 # no prefix, collect it all
149 self
.collect_incoming_data (self
.ac_in_buffer
)
150 self
.ac_in_buffer
= ''
152 def handle_write (self
):
153 self
.initiate_send ()
155 def handle_close (self
):
158 def push (self
, data
):
159 self
.producer_fifo
.push (simple_producer (data
))
162 def push_with_producer (self
, producer
):
163 self
.producer_fifo
.push (producer
)
167 "predicate for inclusion in the readable for select()"
168 return (len(self
.ac_in_buffer
) <= self
.ac_in_buffer_size
)
171 "predicate for inclusion in the writable for select()"
172 # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected)
173 # this is about twice as fast, though not as clear.
175 (self
.ac_out_buffer
== '') and
176 self
.producer_fifo
.is_empty() and
180 def close_when_done (self
):
181 "automatically close this channel once the outgoing queue is empty"
182 self
.producer_fifo
.push (None)
184 # refill the outgoing buffer by calling the more() method
185 # of the first producer in the queue
186 def refill_buffer (self
):
188 if len(self
.producer_fifo
):
189 p
= self
.producer_fifo
.first()
190 # a 'None' in the producer fifo is a sentinel,
191 # telling us to close the channel.
193 if not self
.ac_out_buffer
:
194 self
.producer_fifo
.pop()
197 elif isinstance(p
, str):
198 self
.producer_fifo
.pop()
199 self
.ac_out_buffer
= self
.ac_out_buffer
+ p
203 self
.ac_out_buffer
= self
.ac_out_buffer
+ data
206 self
.producer_fifo
.pop()
210 def initiate_send (self
):
211 obs
= self
.ac_out_buffer_size
212 # try to refill the buffer
213 if (len (self
.ac_out_buffer
) < obs
):
216 if self
.ac_out_buffer
and self
.connected
:
217 # try to send the buffer
219 num_sent
= self
.send (self
.ac_out_buffer
[:obs
])
221 self
.ac_out_buffer
= self
.ac_out_buffer
[num_sent
:]
223 except socket
.error
, why
:
227 def discard_buffers (self
):
229 self
.ac_in_buffer
= ''
230 self
.ac_out_buffer
= ''
231 while self
.producer_fifo
:
232 self
.producer_fifo
.pop()
235 class simple_producer
:
237 def __init__ (self
, data
, buffer_size
=512):
239 self
.buffer_size
= buffer_size
242 if len (self
.data
) > self
.buffer_size
:
243 result
= self
.data
[:self
.buffer_size
]
244 self
.data
= self
.data
[self
.buffer_size
:]
252 def __init__ (self
, list=None):
256 self
.list = deque(list)
259 return len(self
.list)
267 def push (self
, data
):
268 self
.list.append(data
)
272 return (1, self
.list.popleft())
276 # Given 'haystack', see if any prefix of 'needle' is at its end. This
277 # assumes an exact match has already been checked. Return the number of
278 # characters matched.
280 # f_p_a_e ("qwerty\r", "\r\n") => 1
281 # f_p_a_e ("qwertydkjf", "\r\n") => 0
282 # f_p_a_e ("qwerty\r\n", "\r\n") => <undefined>
284 # this could maybe be made faster with a computed regex?
285 # [answer: no; circa Python-2.0, Jan 2001]
286 # new python: 28961/s
287 # old python: 18307/s
291 def find_prefix_at_end (haystack
, needle
):
293 while l
and not haystack
.endswith(needle
[:l
]):