10 from test
import test_support
11 from test
.test_support
import TESTFN
, run_unittest
, unlink
12 from StringIO
import StringIO
14 HOST
= test_support
.HOST
28 self
.socket
= dummysocket()
37 def handle_read_event(self
):
38 raise asyncore
.ExitNow()
40 handle_write_event
= handle_read_event
41 handle_close
= handle_read_event
42 handle_expt_event
= handle_read_event
46 self
.error_handled
= False
48 def handle_read_event(self
):
51 handle_write_event
= handle_read_event
52 handle_close
= handle_read_event
53 handle_expt_event
= handle_read_event
55 def handle_error(self
):
56 self
.error_handled
= True
58 # used when testing senders; just collects what it gets until newline is sent
59 def capture_server(evt
, buf
, serv
):
62 conn
, addr
= serv
.accept()
63 except socket
.timeout
:
68 r
, w
, e
= select
.select([conn
], [], [])
71 # keep everything except for the newline terminator
72 buf
.write(data
.replace('\n', ''))
84 class HelperFunctionTests(unittest
.TestCase
):
85 def test_readwriteexc(self
):
86 # Check exception handling behavior of read, write and _exception
88 # check that ExitNow exceptions in the object handler method
89 # bubbles all the way up through asyncore read/write/_exception calls
91 self
.assertRaises(asyncore
.ExitNow
, asyncore
.read
, tr1
)
92 self
.assertRaises(asyncore
.ExitNow
, asyncore
.write
, tr1
)
93 self
.assertRaises(asyncore
.ExitNow
, asyncore
._exception
, tr1
)
95 # check that an exception other than ExitNow in the object handler
96 # method causes the handle_error method to get called
99 self
.assertEqual(tr2
.error_handled
, True)
101 tr2
= crashingdummy()
103 self
.assertEqual(tr2
.error_handled
, True)
105 tr2
= crashingdummy()
106 asyncore
._exception
(tr2
)
107 self
.assertEqual(tr2
.error_handled
, True)
109 # asyncore.readwrite uses constants in the select module that
110 # are not present in Windows systems (see this thread:
111 # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
112 # These constants should be present as long as poll is available
114 if hasattr(select
, 'poll'):
115 def test_readwrite(self
):
116 # Check that correct methods are called by readwrite()
118 attributes
= ('read', 'expt', 'write', 'closed', 'error_handled')
121 (select
.POLLIN
, 'read'),
122 (select
.POLLPRI
, 'expt'),
123 (select
.POLLOUT
, 'write'),
124 (select
.POLLERR
, 'closed'),
125 (select
.POLLHUP
, 'closed'),
126 (select
.POLLNVAL
, 'closed'),
135 self
.error_handled
= False
137 def handle_read_event(self
):
140 def handle_write_event(self
):
143 def handle_close(self
):
146 def handle_expt_event(self
):
149 def handle_error(self
):
150 self
.error_handled
= True
152 for flag
, expectedattr
in expected
:
154 self
.assertEqual(getattr(tobj
, expectedattr
), False)
155 asyncore
.readwrite(tobj
, flag
)
157 # Only the attribute modified by the routine we expect to be
158 # called should be True.
159 for attr
in attributes
:
160 self
.assertEqual(getattr(tobj
, attr
), attr
==expectedattr
)
162 # check that ExitNow exceptions in the object handler method
163 # bubbles all the way up through asyncore readwrite call
165 self
.assertRaises(asyncore
.ExitNow
, asyncore
.readwrite
, tr1
, flag
)
167 # check that an exception other than ExitNow in the object handler
168 # method causes the handle_error method to get called
169 tr2
= crashingdummy()
170 self
.assertEqual(tr2
.error_handled
, False)
171 asyncore
.readwrite(tr2
, flag
)
172 self
.assertEqual(tr2
.error_handled
, True)
174 def test_closeall(self
):
175 self
.closeall_check(False)
177 def test_closeall_default(self
):
178 self
.closeall_check(True)
180 def closeall_check(self
, usedefault
):
181 # Check that close_all() closes everything in a given map
188 self
.assertEqual(c
.socket
.closed
, False)
192 socketmap
= asyncore
.socket_map
194 asyncore
.socket_map
= testmap
197 testmap
, asyncore
.socket_map
= asyncore
.socket_map
, socketmap
199 asyncore
.close_all(testmap
)
201 self
.assertEqual(len(testmap
), 0)
204 self
.assertEqual(c
.socket
.closed
, True)
206 def test_compact_traceback(self
):
208 raise Exception("I don't like spam!")
210 real_t
, real_v
, real_tb
= sys
.exc_info()
211 r
= asyncore
.compact_traceback()
213 self
.fail("Expected exception")
215 (f
, function
, line
), t
, v
, info
= r
216 self
.assertEqual(os
.path
.split(f
)[-1], 'test_asyncore.py')
217 self
.assertEqual(function
, 'test_compact_traceback')
218 self
.assertEqual(t
, real_t
)
219 self
.assertEqual(v
, real_v
)
220 self
.assertEqual(info
, '[%s|%s|%s]' % (f
, function
, line
))
223 class DispatcherTests(unittest
.TestCase
):
230 def test_basic(self
):
231 d
= asyncore
.dispatcher()
232 self
.assertEqual(d
.readable(), True)
233 self
.assertEqual(d
.writable(), True)
236 d
= asyncore
.dispatcher()
237 self
.assertEqual(repr(d
), '<asyncore.dispatcher at %#x>' % id(d
))
240 d
= asyncore
.dispatcher()
242 # capture output of dispatcher.log() (to stderr)
245 l1
= "Lovely spam! Wonderful spam!"
246 l2
= "I don't like spam!"
254 lines
= fp
.getvalue().splitlines()
255 self
.assertEquals(lines
, ['log: %s' % l1
, 'log: %s' % l2
])
257 def test_log_info(self
):
258 d
= asyncore
.dispatcher()
260 # capture output of dispatcher.log_info() (to stdout via print)
263 l1
= "Have you got anything without spam?"
264 l2
= "Why can't she have egg bacon spam and sausage?"
265 l3
= "THAT'S got spam in it!"
268 d
.log_info(l1
, 'EGGS')
270 d
.log_info(l3
, 'SPAM')
274 lines
= fp
.getvalue().splitlines()
275 expected
= ['EGGS: %s' % l1
, 'info: %s' % l2
, 'SPAM: %s' % l3
]
277 self
.assertEquals(lines
, expected
)
279 def test_unhandled(self
):
280 d
= asyncore
.dispatcher()
281 d
.ignore_log_types
= ()
283 # capture output of dispatcher.log_info() (to stdout via print)
296 lines
= fp
.getvalue().splitlines()
297 expected
= ['warning: unhandled incoming priority event',
298 'warning: unhandled read event',
299 'warning: unhandled write event',
300 'warning: unhandled connect event',
301 'warning: unhandled accept event']
302 self
.assertEquals(lines
, expected
)
306 class dispatcherwithsend_noread(asyncore
.dispatcher_with_send
):
310 def handle_connect(self
):
313 class DispatcherWithSendTests(unittest
.TestCase
):
322 @test_support.reap_threads
324 evt
= threading
.Event()
325 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
327 port
= test_support
.bind_port(sock
)
330 args
= (evt
, cap
, sock
)
331 t
= threading
.Thread(target
=capture_server
, args
=args
)
334 # wait a little longer for the server to initialize (it sometimes
335 # refuses connections on slow machines without this wait)
338 data
= "Suppose there isn't a 16-ton weight?"
339 d
= dispatcherwithsend_noread()
340 d
.create_socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
341 d
.connect((HOST
, port
))
343 # give time for socket to connect
351 while d
.out_buffer
and n
> 0:
357 self
.assertEqual(cap
.getvalue(), data
*2)
362 class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests
):
365 if hasattr(asyncore
, 'file_wrapper'):
366 class FileWrapperTest(unittest
.TestCase
):
368 self
.d
= "It's not dead, it's sleeping!"
369 file(TESTFN
, 'w').write(self
.d
)
375 fd
= os
.open(TESTFN
, os
.O_RDONLY
)
376 w
= asyncore
.file_wrapper(fd
)
379 self
.assertNotEqual(w
.fd
, fd
)
380 self
.assertNotEqual(w
.fileno(), fd
)
381 self
.assertEqual(w
.recv(13), "It's not dead")
382 self
.assertEqual(w
.read(6), ", it's")
384 self
.assertRaises(OSError, w
.read
, 1)
388 d2
= "I want to buy some cheese."
389 fd
= os
.open(TESTFN
, os
.O_WRONLY | os
.O_APPEND
)
390 w
= asyncore
.file_wrapper(fd
)
396 self
.assertEqual(file(TESTFN
).read(), self
.d
+ d1
+ d2
)
400 tests
= [HelperFunctionTests
, DispatcherTests
, DispatcherWithSendTests
,
401 DispatcherWithSendTests_UsePoll
]
402 if hasattr(asyncore
, 'file_wrapper'):
403 tests
.append(FileWrapperTest
)
407 if __name__
== "__main__":