7 from unittest
import TestCase
8 from test
import test_support
10 HOST
= test_support
.HOST
13 def server(evt
, serv
, dataq
=None):
14 """ Open a tcp server in three steps
15 1) set evt to true to let the parent know we are ready
16 2) [optional] write all the data in dataq to the socket
17 terminate when dataq.get() returns EOF_sigil
18 3) set evt to true to let the parent know we're done
23 conn
, addr
= serv
.accept()
26 new_data
= dataq
.get(True, 0.5)
27 while new_data
is not EOF_sigil
:
28 if type(new_data
) == str:
30 elif type(new_data
) in [int, float]:
32 written
= conn
.send(data
)
34 new_data
= dataq
.get(True, 0.5)
35 except socket
.timeout
:
41 def wibble_float(num
):
42 ''' return a (low, high) tuple that are 1% more and 1% less of num '''
43 return num
* 0.99, num
* 1.01
45 class GeneralTests(TestCase
):
48 self
.evt
= threading
.Event()
49 self
.sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
50 self
.sock
.settimeout(3)
51 self
.port
= test_support
.bind_port(self
.sock
)
52 self
.thread
= threading
.Thread(target
=server
, args
=(self
.evt
,self
.sock
))
64 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
67 def testTimeoutDefault(self
):
68 self
.assertTrue(socket
.getdefaulttimeout() is None)
69 socket
.setdefaulttimeout(30)
71 telnet
= telnetlib
.Telnet("localhost", self
.port
)
73 socket
.setdefaulttimeout(None)
74 self
.assertEqual(telnet
.sock
.gettimeout(), 30)
77 def testTimeoutNone(self
):
78 # None, having other default
79 self
.assertTrue(socket
.getdefaulttimeout() is None)
80 socket
.setdefaulttimeout(30)
82 telnet
= telnetlib
.Telnet(HOST
, self
.port
, timeout
=None)
84 socket
.setdefaulttimeout(None)
85 self
.assertTrue(telnet
.sock
.gettimeout() is None)
88 def testTimeoutValue(self
):
89 telnet
= telnetlib
.Telnet("localhost", self
.port
, timeout
=30)
90 self
.assertEqual(telnet
.sock
.gettimeout(), 30)
93 def testTimeoutOpen(self
):
94 telnet
= telnetlib
.Telnet()
95 telnet
.open("localhost", self
.port
, timeout
=30)
96 self
.assertEqual(telnet
.sock
.gettimeout(), 30)
99 def _read_setUp(self
):
100 # the blocking constant should be tuned!
101 self
.blocking_timeout
= 0.0
102 self
.evt
= threading
.Event()
103 self
.dataq
= Queue
.Queue()
104 self
.sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
105 self
.sock
.settimeout(3)
106 self
.port
= test_support
.bind_port(self
.sock
)
107 self
.thread
= threading
.Thread(target
=server
, args
=(self
.evt
,self
.sock
, self
.dataq
))
113 def _read_tearDown(self
):
118 class ReadTests(TestCase
):
120 tearDown
= _read_tearDown
122 def _test_blocking(self
, func
):
124 self
.dataq
.put(self
.blocking_timeout
)
125 self
.dataq
.put(EOF_sigil
)
127 low
, high
= wibble_float(self
.blocking_timeout
)
128 self
.assertTrue(time
.time() - start
>= low
)
130 def test_read_until_A(self
):
132 read_until(expected, [timeout])
133 Read until the expected string has been seen, or a timeout is
134 hit (default is no timeout); may block.
136 want
= ['x' * 10, 'match', 'y' * 10, EOF_sigil
]
139 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
140 data
= telnet
.read_until('match')
141 self
.assertEqual(data
, ''.join(want
[:-2]))
143 def test_read_until_B(self
):
144 # test the timeout - it does NOT raise socket.timeout
145 want
= ['hello', self
.blocking_timeout
, EOF_sigil
]
148 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
150 timeout
= self
.blocking_timeout
/ 2
151 data
= telnet
.read_until('not seen', timeout
)
152 low
, high
= wibble_float(timeout
)
153 self
.assertTrue(low
<= time
.time() - high
)
154 self
.assertEqual(data
, want
[0])
156 def test_read_all_A(self
):
159 Read all data until EOF; may block.
161 want
= ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil
]
164 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
165 data
= telnet
.read_all()
166 self
.assertEqual(data
, ''.join(want
[:-1]))
169 def test_read_all_B(self
):
170 self
._test
_blocking
(telnetlib
.Telnet(HOST
, self
.port
).read_all
)
172 def test_read_some_A(self
):
175 Read at least one byte or EOF; may block.
177 # test 'at least one byte'
178 want
= ['x' * 500, EOF_sigil
]
181 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
182 data
= telnet
.read_all()
183 self
.assertTrue(len(data
) >= 1)
185 def test_read_some_B(self
):
187 self
.dataq
.put(EOF_sigil
)
188 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
189 self
.assertEqual('', telnet
.read_some())
191 def test_read_all_C(self
):
192 self
._test
_blocking
(telnetlib
.Telnet(HOST
, self
.port
).read_some
)
194 def _test_read_any_eager_A(self
, func_name
):
197 Read all data available already queued or on the socket,
200 # this never blocks so it should return eat part in turn
201 want
= ['x' * 100, self
.blocking_timeout
/2, 'y' * 100, EOF_sigil
]
202 expects
= want
[0] + want
[2]
205 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
206 func
= getattr(telnet
, func_name
)
207 time
.sleep(self
.blocking_timeout
/10)
212 self
.assertTrue(expects
.startswith(data
))
213 time
.sleep(self
.blocking_timeout
)
216 self
.assertEqual(expects
, data
)
218 def _test_read_any_eager_B(self
, func_name
):
220 self
.dataq
.put(EOF_sigil
)
221 time
.sleep(self
.blocking_timeout
/ 10)
222 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
223 func
= getattr(telnet
, func_name
)
224 self
.assertRaises(EOFError, func
)
226 # read_eager and read_very_eager make the same gaurantees
227 # (they behave differently but we only test the gaurantees)
228 def test_read_very_eager_A(self
):
229 self
._test
_read
_any
_eager
_A
('read_very_eager')
230 def test_read_very_eager_B(self
):
231 self
._test
_read
_any
_eager
_B
('read_very_eager')
232 def test_read_eager_A(self
):
233 self
._test
_read
_any
_eager
_A
('read_eager')
234 def test_read_eager_B(self
):
235 self
._test
_read
_any
_eager
_B
('read_eager')
236 # NB -- we need to test the IAC block which is mentioned in the docstring
237 # but not in the module docs
239 def _test_read_any_lazy_A(self
, func_name
):
240 want
= [self
.blocking_timeout
/2, 'x' * 100, EOF_sigil
]
243 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
244 func
= getattr(telnet
, func_name
)
245 self
.assertEqual('', func())
248 time
.sleep(self
.blocking_timeout
)
256 self
.assertTrue(want
[1].startswith(data
))
259 def _test_read_any_lazy_B(self
, func_name
):
260 self
.dataq
.put(EOF_sigil
)
261 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
262 func
= getattr(telnet
, func_name
)
263 time
.sleep(self
.blocking_timeout
/10)
265 self
.assertRaises(EOFError, func
)
267 # read_lazy and read_very_lazy make the samish gaurantees
268 def test_read_very_lazy_A(self
):
269 data
, want
= self
._test
_read
_any
_lazy
_A
('read_very_lazy')
270 self
.assertEqual(data
, '')
271 def test_read_lazy(self
):
272 data
, want
= self
._test
_read
_any
_lazy
_A
('read_lazy')
273 self
.assertEqual(data
, want
)
274 def test_read_very_lazy_B(self
):
275 self
._test
_read
_any
_lazy
_B
('read_very_lazy')
276 def test_read_lazy_B(self
):
277 self
._test
_read
_any
_lazy
_B
('read_lazy')
279 class nego_collector(object):
280 def __init__(self
, sb_getter
=None):
282 self
.sb_getter
= sb_getter
285 def do_nego(self
, sock
, cmd
, opt
):
286 self
.seen
+= cmd
+ opt
287 if cmd
== tl
.SE
and self
.sb_getter
:
288 sb_data
= self
.sb_getter()
289 self
.sb_seen
+= sb_data
292 class OptionTests(TestCase
):
294 tearDown
= _read_tearDown
296 cmds
= [tl
.AO
, tl
.AYT
, tl
.BRK
, tl
.EC
, tl
.EL
, tl
.GA
, tl
.IP
, tl
.NOP
]
298 def _test_command(self
, data
):
299 """ helper for testing IAC + cmd """
303 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
304 nego
= nego_collector()
305 telnet
.set_option_negotiation_callback(nego
.do_nego
)
306 time
.sleep(self
.blocking_timeout
/10)
307 txt
= telnet
.read_all()
309 self
.assertTrue(len(cmd
) > 0) # we expect at least one command
310 self
.assertTrue(cmd
[0] in self
.cmds
)
311 self
.assertEqual(cmd
[1], tl
.NOOPT
)
312 self
.assertEqual(len(''.join(data
[:-1])), len(txt
+ cmd
))
315 def test_IAC_commands(self
):
317 self
.dataq
.put(EOF_sigil
)
318 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
321 for cmd
in self
.cmds
:
322 self
._test
_command
(['x' * 100, tl
.IAC
+ cmd
, 'y'*100, EOF_sigil
])
323 self
._test
_command
(['x' * 10, tl
.IAC
+ cmd
, 'y'*10, EOF_sigil
])
324 self
._test
_command
([tl
.IAC
+ cmd
, EOF_sigil
])
326 self
._test
_command
([tl
.IAC
+ cmd
for (cmd
) in self
.cmds
] + [EOF_sigil
])
328 def test_SB_commands(self
):
329 # RFC 855, subnegotiations portion
330 send
= [tl
.IAC
+ tl
.SB
+ tl
.IAC
+ tl
.SE
,
331 tl
.IAC
+ tl
.SB
+ tl
.IAC
+ tl
.IAC
+ tl
.IAC
+ tl
.SE
,
332 tl
.IAC
+ tl
.SB
+ tl
.IAC
+ tl
.IAC
+ 'aa' + tl
.IAC
+ tl
.SE
,
333 tl
.IAC
+ tl
.SB
+ 'bb' + tl
.IAC
+ tl
.IAC
+ tl
.IAC
+ tl
.SE
,
334 tl
.IAC
+ tl
.SB
+ 'cc' + tl
.IAC
+ tl
.IAC
+ 'dd' + tl
.IAC
+ tl
.SE
,
339 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
340 nego
= nego_collector(telnet
.read_sb_data
)
341 telnet
.set_option_negotiation_callback(nego
.do_nego
)
342 time
.sleep(self
.blocking_timeout
/10)
343 txt
= telnet
.read_all()
344 self
.assertEqual(txt
, '')
345 want_sb_data
= tl
.IAC
+ tl
.IAC
+ 'aabb' + tl
.IAC
+ 'cc' + tl
.IAC
+ 'dd'
346 self
.assertEqual(nego
.sb_seen
, want_sb_data
)
348 def test_main(verbose
=None):
349 test_support
.run_unittest(GeneralTests
, ReadTests
, OptionTests
)
351 if __name__
== '__main__':