10 from test
import test_support
11 from test
.test_support
import TESTFN
, run_unittest
, unlink
12 from StringIO
import StringIO
14 HOST
= test_support
.HOST
28 self
.socket
= dummysocket()
37 def handle_read_event(self
):
38 raise asyncore
.ExitNow()
40 handle_write_event
= handle_read_event
41 handle_close
= handle_read_event
42 handle_expt_event
= handle_read_event
46 self
.error_handled
= False
48 def handle_read_event(self
):
51 handle_write_event
= handle_read_event
52 handle_close
= handle_read_event
53 handle_expt_event
= handle_read_event
55 def handle_error(self
):
56 self
.error_handled
= True
58 # used when testing senders; just collects what it gets until newline is sent
59 def capture_server(evt
, buf
, serv
):
62 conn
, addr
= serv
.accept()
63 except socket
.timeout
:
68 r
, w
, e
= select
.select([conn
], [], [])
71 # keep everything except for the newline terminator
72 buf
.write(data
.replace('\n', ''))
84 class HelperFunctionTests(unittest
.TestCase
):
85 def test_readwriteexc(self
):
86 # Check exception handling behavior of read, write and _exception
88 # check that ExitNow exceptions in the object handler method
89 # bubbles all the way up through asyncore read/write/_exception calls
91 self
.assertRaises(asyncore
.ExitNow
, asyncore
.read
, tr1
)
92 self
.assertRaises(asyncore
.ExitNow
, asyncore
.write
, tr1
)
93 self
.assertRaises(asyncore
.ExitNow
, asyncore
._exception
, tr1
)
95 # check that an exception other than ExitNow in the object handler
96 # method causes the handle_error method to get called
99 self
.assertEqual(tr2
.error_handled
, True)
101 tr2
= crashingdummy()
103 self
.assertEqual(tr2
.error_handled
, True)
105 tr2
= crashingdummy()
106 asyncore
._exception
(tr2
)
107 self
.assertEqual(tr2
.error_handled
, True)
109 # asyncore.readwrite uses constants in the select module that
110 # are not present in Windows systems (see this thread:
111 # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
112 # These constants should be present as long as poll is available
114 if hasattr(select
, 'poll'):
115 def test_readwrite(self
):
116 # Check that correct methods are called by readwrite()
125 def handle_read_event(self
):
128 def handle_write_event(self
):
131 def handle_close(self
):
134 def handle_expt_event(self
):
137 def handle_error(self
):
138 self
.error_handled
= True
140 for flag
in (select
.POLLIN
, select
.POLLPRI
):
142 self
.assertEqual(tobj
.read
, False)
143 asyncore
.readwrite(tobj
, flag
)
144 self
.assertEqual(tobj
.read
, True)
146 # check that ExitNow exceptions in the object handler method
147 # bubbles all the way up through asyncore readwrite call
149 self
.assertRaises(asyncore
.ExitNow
, asyncore
.readwrite
, tr1
, flag
)
151 # check that an exception other than ExitNow in the object handler
152 # method causes the handle_error method to get called
153 tr2
= crashingdummy()
154 asyncore
.readwrite(tr2
, flag
)
155 self
.assertEqual(tr2
.error_handled
, True)
158 self
.assertEqual(tobj
.write
, False)
159 asyncore
.readwrite(tobj
, select
.POLLOUT
)
160 self
.assertEqual(tobj
.write
, True)
162 # check that ExitNow exceptions in the object handler method
163 # bubbles all the way up through asyncore readwrite call
165 self
.assertRaises(asyncore
.ExitNow
, asyncore
.readwrite
, tr1
,
168 # check that an exception other than ExitNow in the object handler
169 # method causes the handle_error method to get called
170 tr2
= crashingdummy()
171 asyncore
.readwrite(tr2
, select
.POLLOUT
)
172 self
.assertEqual(tr2
.error_handled
, True)
174 for flag
in (select
.POLLERR
, select
.POLLHUP
, select
.POLLNVAL
):
176 self
.assertEqual((tobj
.expt
, tobj
.closed
)[flag
== select
.POLLHUP
], False)
177 asyncore
.readwrite(tobj
, flag
)
178 self
.assertEqual((tobj
.expt
, tobj
.closed
)[flag
== select
.POLLHUP
], True)
180 # check that ExitNow exceptions in the object handler method
181 # bubbles all the way up through asyncore readwrite calls
183 self
.assertRaises(asyncore
.ExitNow
, asyncore
.readwrite
, tr1
, flag
)
185 # check that an exception other than ExitNow in the object handler
186 # method causes the handle_error method to get called
187 tr2
= crashingdummy()
188 asyncore
.readwrite(tr2
, flag
)
189 self
.assertEqual(tr2
.error_handled
, True)
191 def test_closeall(self
):
192 self
.closeall_check(False)
194 def test_closeall_default(self
):
195 self
.closeall_check(True)
197 def closeall_check(self
, usedefault
):
198 # Check that close_all() closes everything in a given map
205 self
.assertEqual(c
.socket
.closed
, False)
209 socketmap
= asyncore
.socket_map
211 asyncore
.socket_map
= testmap
214 testmap
, asyncore
.socket_map
= asyncore
.socket_map
, socketmap
216 asyncore
.close_all(testmap
)
218 self
.assertEqual(len(testmap
), 0)
221 self
.assertEqual(c
.socket
.closed
, True)
223 def test_compact_traceback(self
):
225 raise Exception("I don't like spam!")
227 real_t
, real_v
, real_tb
= sys
.exc_info()
228 r
= asyncore
.compact_traceback()
230 self
.fail("Expected exception")
232 (f
, function
, line
), t
, v
, info
= r
233 self
.assertEqual(os
.path
.split(f
)[-1], 'test_asyncore.py')
234 self
.assertEqual(function
, 'test_compact_traceback')
235 self
.assertEqual(t
, real_t
)
236 self
.assertEqual(v
, real_v
)
237 self
.assertEqual(info
, '[%s|%s|%s]' % (f
, function
, line
))
240 class DispatcherTests(unittest
.TestCase
):
247 def test_basic(self
):
248 d
= asyncore
.dispatcher()
249 self
.assertEqual(d
.readable(), True)
250 self
.assertEqual(d
.writable(), True)
253 d
= asyncore
.dispatcher()
254 self
.assertEqual(repr(d
), '<asyncore.dispatcher at %#x>' % id(d
))
257 d
= asyncore
.dispatcher()
259 # capture output of dispatcher.log() (to stderr)
262 l1
= "Lovely spam! Wonderful spam!"
263 l2
= "I don't like spam!"
271 lines
= fp
.getvalue().splitlines()
272 self
.assertEquals(lines
, ['log: %s' % l1
, 'log: %s' % l2
])
274 def test_log_info(self
):
275 d
= asyncore
.dispatcher()
277 # capture output of dispatcher.log_info() (to stdout via print)
280 l1
= "Have you got anything without spam?"
281 l2
= "Why can't she have egg bacon spam and sausage?"
282 l3
= "THAT'S got spam in it!"
285 d
.log_info(l1
, 'EGGS')
287 d
.log_info(l3
, 'SPAM')
291 lines
= fp
.getvalue().splitlines()
293 expected
= ['EGGS: %s' % l1
, 'info: %s' % l2
, 'SPAM: %s' % l3
]
295 expected
= ['EGGS: %s' % l1
, 'SPAM: %s' % l3
]
297 self
.assertEquals(lines
, expected
)
299 def test_unhandled(self
):
300 d
= asyncore
.dispatcher()
302 # capture output of dispatcher.log_info() (to stdout via print)
315 lines
= fp
.getvalue().splitlines()
316 expected
= ['warning: unhandled exception',
317 'warning: unhandled read event',
318 'warning: unhandled write event',
319 'warning: unhandled connect event',
320 'warning: unhandled accept event']
321 self
.assertEquals(lines
, expected
)
325 class dispatcherwithsend_noread(asyncore
.dispatcher_with_send
):
329 def handle_connect(self
):
332 class DispatcherWithSendTests(unittest
.TestCase
):
342 self
.evt
= threading
.Event()
343 self
.sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
344 self
.sock
.settimeout(3)
345 self
.port
= test_support
.bind_port(self
.sock
)
348 args
= (self
.evt
, cap
, self
.sock
)
349 threading
.Thread(target
=capture_server
, args
=args
).start()
351 # wait a little longer for the server to initialize (it sometimes
352 # refuses connections on slow machines without this wait)
355 data
= "Suppose there isn't a 16-ton weight?"
356 d
= dispatcherwithsend_noread()
357 d
.create_socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
358 d
.connect((HOST
, self
.port
))
360 # give time for socket to connect
368 while d
.out_buffer
and n
> 0:
374 self
.assertEqual(cap
.getvalue(), data
*2)
377 class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests
):
380 if hasattr(asyncore
, 'file_wrapper'):
381 class FileWrapperTest(unittest
.TestCase
):
383 self
.d
= "It's not dead, it's sleeping!"
384 file(TESTFN
, 'w').write(self
.d
)
390 fd
= os
.open(TESTFN
, os
.O_RDONLY
)
391 w
= asyncore
.file_wrapper(fd
)
394 self
.assertNotEqual(w
.fd
, fd
)
395 self
.assertNotEqual(w
.fileno(), fd
)
396 self
.assertEqual(w
.recv(13), "It's not dead")
397 self
.assertEqual(w
.read(6), ", it's")
399 self
.assertRaises(OSError, w
.read
, 1)
403 d2
= "I want to buy some cheese."
404 fd
= os
.open(TESTFN
, os
.O_WRONLY | os
.O_APPEND
)
405 w
= asyncore
.file_wrapper(fd
)
411 self
.assertEqual(file(TESTFN
).read(), self
.d
+ d1
+ d2
)
415 tests
= [HelperFunctionTests
, DispatcherTests
, DispatcherWithSendTests
,
416 DispatcherWithSendTests_UsePoll
]
417 if hasattr(asyncore
, 'file_wrapper'):
418 tests
.append(FileWrapperTest
)
422 if __name__
== "__main__":