revert unrelated change to test_telnetlib
[python.git] / Lib / test / test_telnetlib.py
blob16f7f935442bab778a5e82debdf0ade89c8268c4
1 import socket
2 import threading
3 import telnetlib
4 import time
5 import Queue
7 from unittest import TestCase
8 from test import test_support
10 HOST = test_support.HOST
11 EOF_sigil = object()
13 def server(evt, serv, dataq=None):
14 """ Open a tcp server in three steps
15 1) set evt to true to let the parent know we are ready
16 2) [optional] write all the data in dataq to the socket
17 terminate when dataq.get() returns EOF_sigil
18 3) set evt to true to let the parent know we're done
19 """
20 serv.listen(5)
21 evt.set()
22 try:
23 conn, addr = serv.accept()
24 if dataq:
25 data = ''
26 new_data = dataq.get(True, 0.5)
27 while new_data is not EOF_sigil:
28 if type(new_data) == str:
29 data += new_data
30 elif type(new_data) in [int, float]:
31 time.sleep(new_data)
32 written = conn.send(data)
33 data = data[written:]
34 new_data = dataq.get(True, 0.5)
35 except socket.timeout:
36 pass
37 finally:
38 serv.close()
39 evt.set()
41 def wibble_float(num):
42 ''' return a (low, high) tuple that are 1% more and 1% less of num '''
43 return num * 0.99, num * 1.01
45 class GeneralTests(TestCase):
47 def setUp(self):
48 self.evt = threading.Event()
49 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
50 self.sock.settimeout(3)
51 self.port = test_support.bind_port(self.sock)
52 self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
53 self.thread.start()
54 self.evt.wait()
55 self.evt.clear()
56 time.sleep(.1)
58 def tearDown(self):
59 self.evt.wait()
60 self.thread.join()
62 def testBasic(self):
63 # connects
64 telnet = telnetlib.Telnet(HOST, self.port)
65 telnet.sock.close()
67 def testTimeoutDefault(self):
68 self.assertTrue(socket.getdefaulttimeout() is None)
69 socket.setdefaulttimeout(30)
70 try:
71 telnet = telnetlib.Telnet("localhost", self.port)
72 finally:
73 socket.setdefaulttimeout(None)
74 self.assertEqual(telnet.sock.gettimeout(), 30)
75 telnet.sock.close()
77 def testTimeoutNone(self):
78 # None, having other default
79 self.assertTrue(socket.getdefaulttimeout() is None)
80 socket.setdefaulttimeout(30)
81 try:
82 telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
83 finally:
84 socket.setdefaulttimeout(None)
85 self.assertTrue(telnet.sock.gettimeout() is None)
86 telnet.sock.close()
88 def testTimeoutValue(self):
89 telnet = telnetlib.Telnet("localhost", self.port, timeout=30)
90 self.assertEqual(telnet.sock.gettimeout(), 30)
91 telnet.sock.close()
93 def testTimeoutOpen(self):
94 telnet = telnetlib.Telnet()
95 telnet.open("localhost", self.port, timeout=30)
96 self.assertEqual(telnet.sock.gettimeout(), 30)
97 telnet.sock.close()
99 def _read_setUp(self):
100 # the blocking constant should be tuned!
101 self.blocking_timeout = 0.0
102 self.evt = threading.Event()
103 self.dataq = Queue.Queue()
104 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
105 self.sock.settimeout(3)
106 self.port = test_support.bind_port(self.sock)
107 self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq))
108 self.thread.start()
109 self.evt.wait()
110 self.evt.clear()
111 time.sleep(.1)
113 def _read_tearDown(self):
114 self.evt.wait()
115 self.thread.join()
118 class ReadTests(TestCase):
119 setUp = _read_setUp
120 tearDown = _read_tearDown
122 def _test_blocking(self, func):
123 start = time.time()
124 self.dataq.put(self.blocking_timeout)
125 self.dataq.put(EOF_sigil)
126 data = func()
127 low, high = wibble_float(self.blocking_timeout)
128 self.assertTrue(time.time() - start >= low)
130 def test_read_until_A(self):
132 read_until(expected, [timeout])
133 Read until the expected string has been seen, or a timeout is
134 hit (default is no timeout); may block.
136 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
137 for item in want:
138 self.dataq.put(item)
139 telnet = telnetlib.Telnet(HOST, self.port)
140 data = telnet.read_until('match')
141 self.assertEqual(data, ''.join(want[:-2]))
143 def test_read_until_B(self):
144 # test the timeout - it does NOT raise socket.timeout
145 want = ['hello', self.blocking_timeout, EOF_sigil]
146 for item in want:
147 self.dataq.put(item)
148 telnet = telnetlib.Telnet(HOST, self.port)
149 start = time.time()
150 timeout = self.blocking_timeout / 2
151 data = telnet.read_until('not seen', timeout)
152 low, high = wibble_float(timeout)
153 self.assertTrue(low <= time.time() - high)
154 self.assertEqual(data, want[0])
156 def test_read_all_A(self):
158 read_all()
159 Read all data until EOF; may block.
161 want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
162 for item in want:
163 self.dataq.put(item)
164 telnet = telnetlib.Telnet(HOST, self.port)
165 data = telnet.read_all()
166 self.assertEqual(data, ''.join(want[:-1]))
167 return
169 def test_read_all_B(self):
170 self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
172 def test_read_some_A(self):
174 read_some()
175 Read at least one byte or EOF; may block.
177 # test 'at least one byte'
178 want = ['x' * 500, EOF_sigil]
179 for item in want:
180 self.dataq.put(item)
181 telnet = telnetlib.Telnet(HOST, self.port)
182 data = telnet.read_all()
183 self.assertTrue(len(data) >= 1)
185 def test_read_some_B(self):
186 # test EOF
187 self.dataq.put(EOF_sigil)
188 telnet = telnetlib.Telnet(HOST, self.port)
189 self.assertEqual('', telnet.read_some())
191 def test_read_all_C(self):
192 self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
194 def _test_read_any_eager_A(self, func_name):
196 read_very_eager()
197 Read all data available already queued or on the socket,
198 without blocking.
200 # this never blocks so it should return eat part in turn
201 want = ['x' * 100, self.blocking_timeout/2, 'y' * 100, EOF_sigil]
202 expects = want[0] + want[2]
203 for item in want:
204 self.dataq.put(item)
205 telnet = telnetlib.Telnet(HOST, self.port)
206 func = getattr(telnet, func_name)
207 time.sleep(self.blocking_timeout/10)
208 data = ''
209 while True:
210 try:
211 data += func()
212 self.assertTrue(expects.startswith(data))
213 time.sleep(self.blocking_timeout)
214 except EOFError:
215 break
216 self.assertEqual(expects, data)
218 def _test_read_any_eager_B(self, func_name):
219 # test EOF
220 self.dataq.put(EOF_sigil)
221 time.sleep(self.blocking_timeout / 10)
222 telnet = telnetlib.Telnet(HOST, self.port)
223 func = getattr(telnet, func_name)
224 self.assertRaises(EOFError, func)
226 # read_eager and read_very_eager make the same gaurantees
227 # (they behave differently but we only test the gaurantees)
228 def test_read_very_eager_A(self):
229 self._test_read_any_eager_A('read_very_eager')
230 def test_read_very_eager_B(self):
231 self._test_read_any_eager_B('read_very_eager')
232 def test_read_eager_A(self):
233 self._test_read_any_eager_A('read_eager')
234 def test_read_eager_B(self):
235 self._test_read_any_eager_B('read_eager')
236 # NB -- we need to test the IAC block which is mentioned in the docstring
237 # but not in the module docs
239 def _test_read_any_lazy_A(self, func_name):
240 want = [self.blocking_timeout/2, 'x' * 100, EOF_sigil]
241 for item in want:
242 self.dataq.put(item)
243 telnet = telnetlib.Telnet(HOST, self.port)
244 func = getattr(telnet, func_name)
245 self.assertEqual('', func())
246 data = ''
247 while True:
248 time.sleep(self.blocking_timeout)
249 try:
250 telnet.fill_rawq()
251 data += func()
252 if not data:
253 break
254 except EOFError:
255 break
256 self.assertTrue(want[1].startswith(data))
257 return data, want[1]
259 def _test_read_any_lazy_B(self, func_name):
260 self.dataq.put(EOF_sigil)
261 telnet = telnetlib.Telnet(HOST, self.port)
262 func = getattr(telnet, func_name)
263 time.sleep(self.blocking_timeout/10)
264 telnet.fill_rawq()
265 self.assertRaises(EOFError, func)
267 # read_lazy and read_very_lazy make the samish gaurantees
268 def test_read_very_lazy_A(self):
269 data, want = self._test_read_any_lazy_A('read_very_lazy')
270 self.assertEqual(data, '')
271 def test_read_lazy(self):
272 data, want = self._test_read_any_lazy_A('read_lazy')
273 self.assertEqual(data, want)
274 def test_read_very_lazy_B(self):
275 self._test_read_any_lazy_B('read_very_lazy')
276 def test_read_lazy_B(self):
277 self._test_read_any_lazy_B('read_lazy')
279 class nego_collector(object):
280 def __init__(self, sb_getter=None):
281 self.seen = ''
282 self.sb_getter = sb_getter
283 self.sb_seen = ''
285 def do_nego(self, sock, cmd, opt):
286 self.seen += cmd + opt
287 if cmd == tl.SE and self.sb_getter:
288 sb_data = self.sb_getter()
289 self.sb_seen += sb_data
291 tl = telnetlib
292 class OptionTests(TestCase):
293 setUp = _read_setUp
294 tearDown = _read_tearDown
295 # RFC 854 commands
296 cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
298 def _test_command(self, data):
299 """ helper for testing IAC + cmd """
300 self.setUp()
301 for item in data:
302 self.dataq.put(item)
303 telnet = telnetlib.Telnet(HOST, self.port)
304 nego = nego_collector()
305 telnet.set_option_negotiation_callback(nego.do_nego)
306 time.sleep(self.blocking_timeout/10)
307 txt = telnet.read_all()
308 cmd = nego.seen
309 self.assertTrue(len(cmd) > 0) # we expect at least one command
310 self.assertTrue(cmd[0] in self.cmds)
311 self.assertEqual(cmd[1], tl.NOOPT)
312 self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
313 self.tearDown()
315 def test_IAC_commands(self):
316 # reset our setup
317 self.dataq.put(EOF_sigil)
318 telnet = telnetlib.Telnet(HOST, self.port)
319 self.tearDown()
321 for cmd in self.cmds:
322 self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil])
323 self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil])
324 self._test_command([tl.IAC + cmd, EOF_sigil])
325 # all at once
326 self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])
328 def test_SB_commands(self):
329 # RFC 855, subnegotiations portion
330 send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
331 tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
332 tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,
333 tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
334 tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
335 EOF_sigil,
337 for item in send:
338 self.dataq.put(item)
339 telnet = telnetlib.Telnet(HOST, self.port)
340 nego = nego_collector(telnet.read_sb_data)
341 telnet.set_option_negotiation_callback(nego.do_nego)
342 time.sleep(self.blocking_timeout/10)
343 txt = telnet.read_all()
344 self.assertEqual(txt, '')
345 want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
346 self.assertEqual(nego.sb_seen, want_sb_data)
348 def test_main(verbose=None):
349 test_support.run_unittest(GeneralTests, ReadTests, OptionTests)
351 if __name__ == '__main__':
352 test_main()