6 from unittest
import TestCase
7 from test
import test_support
8 threading
= test_support
.import_module('threading')
10 HOST
= test_support
.HOST
13 def server(evt
, serv
, dataq
=None):
14 """ Open a tcp server in three steps
15 1) set evt to true to let the parent know we are ready
16 2) [optional] if is not False, write the list of data from dataq.get()
18 3) set evt to true to let the parent know we're done
23 conn
, addr
= serv
.accept()
26 new_data
= dataq
.get(True, 0.5)
31 if type(item
) in [int, float]:
35 written
= conn
.send(data
)
37 except socket
.timeout
:
43 class GeneralTests(TestCase
):
46 self
.evt
= threading
.Event()
47 self
.sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
48 self
.sock
.settimeout(3)
49 self
.port
= test_support
.bind_port(self
.sock
)
50 self
.thread
= threading
.Thread(target
=server
, args
=(self
.evt
,self
.sock
))
62 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
65 def testTimeoutDefault(self
):
66 self
.assertTrue(socket
.getdefaulttimeout() is None)
67 socket
.setdefaulttimeout(30)
69 telnet
= telnetlib
.Telnet("localhost", self
.port
)
71 socket
.setdefaulttimeout(None)
72 self
.assertEqual(telnet
.sock
.gettimeout(), 30)
75 def testTimeoutNone(self
):
76 # None, having other default
77 self
.assertTrue(socket
.getdefaulttimeout() is None)
78 socket
.setdefaulttimeout(30)
80 telnet
= telnetlib
.Telnet(HOST
, self
.port
, timeout
=None)
82 socket
.setdefaulttimeout(None)
83 self
.assertTrue(telnet
.sock
.gettimeout() is None)
86 def testTimeoutValue(self
):
87 telnet
= telnetlib
.Telnet("localhost", self
.port
, timeout
=30)
88 self
.assertEqual(telnet
.sock
.gettimeout(), 30)
91 def testTimeoutOpen(self
):
92 telnet
= telnetlib
.Telnet()
93 telnet
.open("localhost", self
.port
, timeout
=30)
94 self
.assertEqual(telnet
.sock
.gettimeout(), 30)
97 def _read_setUp(self
):
98 self
.evt
= threading
.Event()
99 self
.dataq
= Queue
.Queue()
100 self
.sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
101 self
.sock
.settimeout(3)
102 self
.port
= test_support
.bind_port(self
.sock
)
103 self
.thread
= threading
.Thread(target
=server
, args
=(self
.evt
,self
.sock
, self
.dataq
))
109 def _read_tearDown(self
):
113 class ReadTests(TestCase
):
115 tearDown
= _read_tearDown
117 # use a similar approach to testing timeouts as test_timeout.py
118 # these will never pass 100% but make the fuzz big enough that it is rare
121 def test_read_until_A(self
):
123 read_until(expected, [timeout])
124 Read until the expected string has been seen, or a timeout is
125 hit (default is no timeout); may block.
127 want
= ['x' * 10, 'match', 'y' * 10, EOF_sigil
]
129 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
131 data
= telnet
.read_until('match')
132 self
.assertEqual(data
, ''.join(want
[:-2]))
134 def test_read_until_B(self
):
135 # test the timeout - it does NOT raise socket.timeout
136 want
= ['hello', self
.block_long
, 'not seen', EOF_sigil
]
138 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
140 data
= telnet
.read_until('not seen', self
.block_short
)
141 self
.assertEqual(data
, want
[0])
142 self
.assertEqual(telnet
.read_all(), 'not seen')
144 def test_read_all_A(self
):
147 Read all data until EOF; may block.
149 want
= ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil
]
151 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
153 data
= telnet
.read_all()
154 self
.assertEqual(data
, ''.join(want
[:-1]))
157 def _test_blocking(self
, func
):
158 self
.dataq
.put([self
.block_long
, EOF_sigil
])
162 self
.assertTrue(self
.block_short
<= time
.time() - start
)
164 def test_read_all_B(self
):
165 self
._test
_blocking
(telnetlib
.Telnet(HOST
, self
.port
).read_all
)
167 def test_read_all_C(self
):
168 self
.dataq
.put([EOF_sigil
])
169 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
172 telnet
.read_all() # shouldn't raise
174 def test_read_some_A(self
):
177 Read at least one byte or EOF; may block.
179 # test 'at least one byte'
180 want
= ['x' * 500, EOF_sigil
]
182 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
184 data
= telnet
.read_all()
185 self
.assertTrue(len(data
) >= 1)
187 def test_read_some_B(self
):
189 self
.dataq
.put([EOF_sigil
])
190 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
192 self
.assertEqual('', telnet
.read_some())
194 def test_read_some_C(self
):
195 self
._test
_blocking
(telnetlib
.Telnet(HOST
, self
.port
).read_some
)
197 def _test_read_any_eager_A(self
, func_name
):
200 Read all data available already queued or on the socket,
203 want
= [self
.block_long
, 'x' * 100, 'y' * 100, EOF_sigil
]
204 expects
= want
[1] + want
[2]
206 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
208 func
= getattr(telnet
, func_name
)
213 self
.assertTrue(expects
.startswith(data
))
216 self
.assertEqual(expects
, data
)
218 def _test_read_any_eager_B(self
, func_name
):
220 self
.dataq
.put([EOF_sigil
])
221 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
223 time
.sleep(self
.block_short
)
224 func
= getattr(telnet
, func_name
)
225 self
.assertRaises(EOFError, func
)
227 # read_eager and read_very_eager make the same gaurantees
228 # (they behave differently but we only test the gaurantees)
229 def test_read_very_eager_A(self
):
230 self
._test
_read
_any
_eager
_A
('read_very_eager')
231 def test_read_very_eager_B(self
):
232 self
._test
_read
_any
_eager
_B
('read_very_eager')
233 def test_read_eager_A(self
):
234 self
._test
_read
_any
_eager
_A
('read_eager')
235 def test_read_eager_B(self
):
236 self
._test
_read
_any
_eager
_B
('read_eager')
237 # NB -- we need to test the IAC block which is mentioned in the docstring
238 # but not in the module docs
240 def _test_read_any_lazy_B(self
, func_name
):
241 self
.dataq
.put([EOF_sigil
])
242 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
244 func
= getattr(telnet
, func_name
)
246 self
.assertRaises(EOFError, func
)
248 def test_read_lazy_A(self
):
249 want
= ['x' * 100, EOF_sigil
]
251 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
253 time
.sleep(self
.block_short
)
254 self
.assertEqual('', telnet
.read_lazy())
258 read_data
= telnet
.read_lazy()
264 self
.assertTrue(want
[0].startswith(data
))
265 self
.assertEqual(data
, want
[0])
267 def test_read_lazy_B(self
):
268 self
._test
_read
_any
_lazy
_B
('read_lazy')
270 def test_read_very_lazy_A(self
):
271 want
= ['x' * 100, EOF_sigil
]
273 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
275 time
.sleep(self
.block_short
)
276 self
.assertEqual('', telnet
.read_very_lazy())
280 read_data
= telnet
.read_very_lazy()
286 self
.assertEqual('', telnet
.cookedq
)
287 telnet
.process_rawq()
288 self
.assertTrue(want
[0].startswith(data
))
289 self
.assertEqual(data
, want
[0])
291 def test_read_very_lazy_B(self
):
292 self
._test
_read
_any
_lazy
_B
('read_very_lazy')
294 class nego_collector(object):
295 def __init__(self
, sb_getter
=None):
297 self
.sb_getter
= sb_getter
300 def do_nego(self
, sock
, cmd
, opt
):
301 self
.seen
+= cmd
+ opt
302 if cmd
== tl
.SE
and self
.sb_getter
:
303 sb_data
= self
.sb_getter()
304 self
.sb_seen
+= sb_data
307 class OptionTests(TestCase
):
309 tearDown
= _read_tearDown
311 cmds
= [tl
.AO
, tl
.AYT
, tl
.BRK
, tl
.EC
, tl
.EL
, tl
.GA
, tl
.IP
, tl
.NOP
]
313 def _test_command(self
, data
):
314 """ helper for testing IAC + cmd """
317 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
319 nego
= nego_collector()
320 telnet
.set_option_negotiation_callback(nego
.do_nego
)
321 txt
= telnet
.read_all()
323 self
.assertTrue(len(cmd
) > 0) # we expect at least one command
324 self
.assertIn(cmd
[0], self
.cmds
)
325 self
.assertEqual(cmd
[1], tl
.NOOPT
)
326 self
.assertEqual(len(''.join(data
[:-1])), len(txt
+ cmd
))
327 nego
.sb_getter
= None # break the nego => telnet cycle
330 def test_IAC_commands(self
):
332 self
.dataq
.put([EOF_sigil
])
333 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
337 for cmd
in self
.cmds
:
338 self
._test
_command
(['x' * 100, tl
.IAC
+ cmd
, 'y'*100, EOF_sigil
])
339 self
._test
_command
(['x' * 10, tl
.IAC
+ cmd
, 'y'*10, EOF_sigil
])
340 self
._test
_command
([tl
.IAC
+ cmd
, EOF_sigil
])
342 self
._test
_command
([tl
.IAC
+ cmd
for (cmd
) in self
.cmds
] + [EOF_sigil
])
343 self
.assertEqual('', telnet
.read_sb_data())
345 def test_SB_commands(self
):
346 # RFC 855, subnegotiations portion
347 send
= [tl
.IAC
+ tl
.SB
+ tl
.IAC
+ tl
.SE
,
348 tl
.IAC
+ tl
.SB
+ tl
.IAC
+ tl
.IAC
+ tl
.IAC
+ tl
.SE
,
349 tl
.IAC
+ tl
.SB
+ tl
.IAC
+ tl
.IAC
+ 'aa' + tl
.IAC
+ tl
.SE
,
350 tl
.IAC
+ tl
.SB
+ 'bb' + tl
.IAC
+ tl
.IAC
+ tl
.IAC
+ tl
.SE
,
351 tl
.IAC
+ tl
.SB
+ 'cc' + tl
.IAC
+ tl
.IAC
+ 'dd' + tl
.IAC
+ tl
.SE
,
355 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
357 nego
= nego_collector(telnet
.read_sb_data
)
358 telnet
.set_option_negotiation_callback(nego
.do_nego
)
359 txt
= telnet
.read_all()
360 self
.assertEqual(txt
, '')
361 want_sb_data
= tl
.IAC
+ tl
.IAC
+ 'aabb' + tl
.IAC
+ 'cc' + tl
.IAC
+ 'dd'
362 self
.assertEqual(nego
.sb_seen
, want_sb_data
)
363 self
.assertEqual('', telnet
.read_sb_data())
364 nego
.sb_getter
= None # break the nego => telnet cycle
366 def test_main(verbose
=None):
367 test_support
.run_unittest(GeneralTests
, ReadTests
, OptionTests
)
369 if __name__
== '__main__':