move sections
[python/dscho.git] / Lib / test / test_telnetlib.py
blobfd5c6f7c3c5372896c2956a7445d7e3516db46e9
1 import socket
2 import telnetlib
3 import time
4 import Queue
6 from unittest import TestCase
7 from test import test_support
8 threading = test_support.import_module('threading')
10 HOST = test_support.HOST
11 EOF_sigil = object()
13 def server(evt, serv, dataq=None):
14 """ Open a tcp server in three steps
15 1) set evt to true to let the parent know we are ready
16 2) [optional] if is not False, write the list of data from dataq.get()
17 to the socket.
18 3) set evt to true to let the parent know we're done
19 """
20 serv.listen(5)
21 evt.set()
22 try:
23 conn, addr = serv.accept()
24 if dataq:
25 data = ''
26 new_data = dataq.get(True, 0.5)
27 dataq.task_done()
28 for item in new_data:
29 if item == EOF_sigil:
30 break
31 if type(item) in [int, float]:
32 time.sleep(item)
33 else:
34 data += item
35 written = conn.send(data)
36 data = data[written:]
37 except socket.timeout:
38 pass
39 finally:
40 serv.close()
41 evt.set()
43 class GeneralTests(TestCase):
45 def setUp(self):
46 self.evt = threading.Event()
47 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
48 self.sock.settimeout(3)
49 self.port = test_support.bind_port(self.sock)
50 self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
51 self.thread.start()
52 self.evt.wait()
53 self.evt.clear()
54 time.sleep(.1)
56 def tearDown(self):
57 self.evt.wait()
58 self.thread.join()
60 def testBasic(self):
61 # connects
62 telnet = telnetlib.Telnet(HOST, self.port)
63 telnet.sock.close()
65 def testTimeoutDefault(self):
66 self.assertTrue(socket.getdefaulttimeout() is None)
67 socket.setdefaulttimeout(30)
68 try:
69 telnet = telnetlib.Telnet("localhost", self.port)
70 finally:
71 socket.setdefaulttimeout(None)
72 self.assertEqual(telnet.sock.gettimeout(), 30)
73 telnet.sock.close()
75 def testTimeoutNone(self):
76 # None, having other default
77 self.assertTrue(socket.getdefaulttimeout() is None)
78 socket.setdefaulttimeout(30)
79 try:
80 telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
81 finally:
82 socket.setdefaulttimeout(None)
83 self.assertTrue(telnet.sock.gettimeout() is None)
84 telnet.sock.close()
86 def testTimeoutValue(self):
87 telnet = telnetlib.Telnet("localhost", self.port, timeout=30)
88 self.assertEqual(telnet.sock.gettimeout(), 30)
89 telnet.sock.close()
91 def testTimeoutOpen(self):
92 telnet = telnetlib.Telnet()
93 telnet.open("localhost", self.port, timeout=30)
94 self.assertEqual(telnet.sock.gettimeout(), 30)
95 telnet.sock.close()
97 def _read_setUp(self):
98 self.evt = threading.Event()
99 self.dataq = Queue.Queue()
100 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
101 self.sock.settimeout(3)
102 self.port = test_support.bind_port(self.sock)
103 self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq))
104 self.thread.start()
105 self.evt.wait()
106 self.evt.clear()
107 time.sleep(.1)
109 def _read_tearDown(self):
110 self.evt.wait()
111 self.thread.join()
113 class ReadTests(TestCase):
114 setUp = _read_setUp
115 tearDown = _read_tearDown
117 # use a similar approach to testing timeouts as test_timeout.py
118 # these will never pass 100% but make the fuzz big enough that it is rare
119 block_long = 0.6
120 block_short = 0.3
121 def test_read_until_A(self):
123 read_until(expected, [timeout])
124 Read until the expected string has been seen, or a timeout is
125 hit (default is no timeout); may block.
127 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
128 self.dataq.put(want)
129 telnet = telnetlib.Telnet(HOST, self.port)
130 self.dataq.join()
131 data = telnet.read_until('match')
132 self.assertEqual(data, ''.join(want[:-2]))
134 def test_read_until_B(self):
135 # test the timeout - it does NOT raise socket.timeout
136 want = ['hello', self.block_long, 'not seen', EOF_sigil]
137 self.dataq.put(want)
138 telnet = telnetlib.Telnet(HOST, self.port)
139 self.dataq.join()
140 data = telnet.read_until('not seen', self.block_short)
141 self.assertEqual(data, want[0])
142 self.assertEqual(telnet.read_all(), 'not seen')
144 def test_read_all_A(self):
146 read_all()
147 Read all data until EOF; may block.
149 want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
150 self.dataq.put(want)
151 telnet = telnetlib.Telnet(HOST, self.port)
152 self.dataq.join()
153 data = telnet.read_all()
154 self.assertEqual(data, ''.join(want[:-1]))
155 return
157 def _test_blocking(self, func):
158 self.dataq.put([self.block_long, EOF_sigil])
159 self.dataq.join()
160 start = time.time()
161 data = func()
162 self.assertTrue(self.block_short <= time.time() - start)
164 def test_read_all_B(self):
165 self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
167 def test_read_all_C(self):
168 self.dataq.put([EOF_sigil])
169 telnet = telnetlib.Telnet(HOST, self.port)
170 self.dataq.join()
171 telnet.read_all()
172 telnet.read_all() # shouldn't raise
174 def test_read_some_A(self):
176 read_some()
177 Read at least one byte or EOF; may block.
179 # test 'at least one byte'
180 want = ['x' * 500, EOF_sigil]
181 self.dataq.put(want)
182 telnet = telnetlib.Telnet(HOST, self.port)
183 self.dataq.join()
184 data = telnet.read_all()
185 self.assertTrue(len(data) >= 1)
187 def test_read_some_B(self):
188 # test EOF
189 self.dataq.put([EOF_sigil])
190 telnet = telnetlib.Telnet(HOST, self.port)
191 self.dataq.join()
192 self.assertEqual('', telnet.read_some())
194 def test_read_some_C(self):
195 self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
197 def _test_read_any_eager_A(self, func_name):
199 read_very_eager()
200 Read all data available already queued or on the socket,
201 without blocking.
203 want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
204 expects = want[1] + want[2]
205 self.dataq.put(want)
206 telnet = telnetlib.Telnet(HOST, self.port)
207 self.dataq.join()
208 func = getattr(telnet, func_name)
209 data = ''
210 while True:
211 try:
212 data += func()
213 self.assertTrue(expects.startswith(data))
214 except EOFError:
215 break
216 self.assertEqual(expects, data)
218 def _test_read_any_eager_B(self, func_name):
219 # test EOF
220 self.dataq.put([EOF_sigil])
221 telnet = telnetlib.Telnet(HOST, self.port)
222 self.dataq.join()
223 time.sleep(self.block_short)
224 func = getattr(telnet, func_name)
225 self.assertRaises(EOFError, func)
227 # read_eager and read_very_eager make the same gaurantees
228 # (they behave differently but we only test the gaurantees)
229 def test_read_very_eager_A(self):
230 self._test_read_any_eager_A('read_very_eager')
231 def test_read_very_eager_B(self):
232 self._test_read_any_eager_B('read_very_eager')
233 def test_read_eager_A(self):
234 self._test_read_any_eager_A('read_eager')
235 def test_read_eager_B(self):
236 self._test_read_any_eager_B('read_eager')
237 # NB -- we need to test the IAC block which is mentioned in the docstring
238 # but not in the module docs
240 def _test_read_any_lazy_B(self, func_name):
241 self.dataq.put([EOF_sigil])
242 telnet = telnetlib.Telnet(HOST, self.port)
243 self.dataq.join()
244 func = getattr(telnet, func_name)
245 telnet.fill_rawq()
246 self.assertRaises(EOFError, func)
248 def test_read_lazy_A(self):
249 want = ['x' * 100, EOF_sigil]
250 self.dataq.put(want)
251 telnet = telnetlib.Telnet(HOST, self.port)
252 self.dataq.join()
253 time.sleep(self.block_short)
254 self.assertEqual('', telnet.read_lazy())
255 data = ''
256 while True:
257 try:
258 read_data = telnet.read_lazy()
259 data += read_data
260 if not read_data:
261 telnet.fill_rawq()
262 except EOFError:
263 break
264 self.assertTrue(want[0].startswith(data))
265 self.assertEqual(data, want[0])
267 def test_read_lazy_B(self):
268 self._test_read_any_lazy_B('read_lazy')
270 def test_read_very_lazy_A(self):
271 want = ['x' * 100, EOF_sigil]
272 self.dataq.put(want)
273 telnet = telnetlib.Telnet(HOST, self.port)
274 self.dataq.join()
275 time.sleep(self.block_short)
276 self.assertEqual('', telnet.read_very_lazy())
277 data = ''
278 while True:
279 try:
280 read_data = telnet.read_very_lazy()
281 except EOFError:
282 break
283 data += read_data
284 if not read_data:
285 telnet.fill_rawq()
286 self.assertEqual('', telnet.cookedq)
287 telnet.process_rawq()
288 self.assertTrue(want[0].startswith(data))
289 self.assertEqual(data, want[0])
291 def test_read_very_lazy_B(self):
292 self._test_read_any_lazy_B('read_very_lazy')
294 class nego_collector(object):
295 def __init__(self, sb_getter=None):
296 self.seen = ''
297 self.sb_getter = sb_getter
298 self.sb_seen = ''
300 def do_nego(self, sock, cmd, opt):
301 self.seen += cmd + opt
302 if cmd == tl.SE and self.sb_getter:
303 sb_data = self.sb_getter()
304 self.sb_seen += sb_data
306 tl = telnetlib
307 class OptionTests(TestCase):
308 setUp = _read_setUp
309 tearDown = _read_tearDown
310 # RFC 854 commands
311 cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
313 def _test_command(self, data):
314 """ helper for testing IAC + cmd """
315 self.setUp()
316 self.dataq.put(data)
317 telnet = telnetlib.Telnet(HOST, self.port)
318 self.dataq.join()
319 nego = nego_collector()
320 telnet.set_option_negotiation_callback(nego.do_nego)
321 txt = telnet.read_all()
322 cmd = nego.seen
323 self.assertTrue(len(cmd) > 0) # we expect at least one command
324 self.assertIn(cmd[0], self.cmds)
325 self.assertEqual(cmd[1], tl.NOOPT)
326 self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
327 nego.sb_getter = None # break the nego => telnet cycle
328 self.tearDown()
330 def test_IAC_commands(self):
331 # reset our setup
332 self.dataq.put([EOF_sigil])
333 telnet = telnetlib.Telnet(HOST, self.port)
334 self.dataq.join()
335 self.tearDown()
337 for cmd in self.cmds:
338 self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil])
339 self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil])
340 self._test_command([tl.IAC + cmd, EOF_sigil])
341 # all at once
342 self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])
343 self.assertEqual('', telnet.read_sb_data())
345 def test_SB_commands(self):
346 # RFC 855, subnegotiations portion
347 send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
348 tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
349 tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,
350 tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
351 tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
352 EOF_sigil,
354 self.dataq.put(send)
355 telnet = telnetlib.Telnet(HOST, self.port)
356 self.dataq.join()
357 nego = nego_collector(telnet.read_sb_data)
358 telnet.set_option_negotiation_callback(nego.do_nego)
359 txt = telnet.read_all()
360 self.assertEqual(txt, '')
361 want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
362 self.assertEqual(nego.sb_seen, want_sb_data)
363 self.assertEqual('', telnet.read_sb_data())
364 nego.sb_getter = None # break the nego => telnet cycle
366 def test_main(verbose=None):
367 test_support.run_unittest(GeneralTests, ReadTests, OptionTests)
369 if __name__ == '__main__':
370 test_main()