1 """Test script for ftplib module."""
3 # Modified by Giampaolo Rodola' to test FTP class and IPv6 environment
12 from unittest
import TestCase
13 from test
import support
14 from test
.support
import HOST
16 # the dummy data returned by server over the data channel when
17 # RETR, LIST and NLST commands are issued
18 RETR_DATA
= 'abcde12345\r\n' * 1000
19 LIST_DATA
= 'foo\r\nbar\r\n'
20 NLST_DATA
= 'foo\r\nbar\r\n'
23 class DummyDTPHandler(asynchat
.async_chat
):
25 def __init__(self
, conn
, baseclass
):
26 asynchat
.async_chat
.__init
__(self
, conn
)
27 self
.baseclass
= baseclass
28 self
.baseclass
.last_received_data
= ''
30 def handle_read(self
):
31 self
.baseclass
.last_received_data
+= self
.recv(1024).decode('ascii')
33 def handle_close(self
):
34 self
.baseclass
.push('226 transfer complete')
38 super(DummyDTPHandler
, self
).push(what
.encode('ascii'))
41 class DummyFTPHandler(asynchat
.async_chat
):
43 def __init__(self
, conn
):
44 asynchat
.async_chat
.__init
__(self
, conn
)
45 self
.set_terminator(b
"\r\n")
48 self
.last_received_cmd
= None
49 self
.last_received_data
= ''
50 self
.next_response
= ''
51 self
.push('220 welcome')
53 def collect_incoming_data(self
, data
):
54 self
.in_buffer
.append(data
)
56 def found_terminator(self
):
57 line
= b
''.join(self
.in_buffer
).decode('ascii')
59 if self
.next_response
:
60 self
.push(self
.next_response
)
61 self
.next_response
= ''
62 cmd
= line
.split(' ')[0].lower()
63 self
.last_received_cmd
= cmd
64 space
= line
.find(' ')
66 arg
= line
[space
+ 1:]
69 if hasattr(self
, 'cmd_' + cmd
):
70 method
= getattr(self
, 'cmd_' + cmd
)
73 self
.push('550 command "%s" not understood.' %cmd
)
75 def handle_error(self
):
79 asynchat
.async_chat
.push(self
, data
.encode('ascii') + b
'\r\n')
81 def cmd_port(self
, arg
):
82 addr
= list(map(int, arg
.split(',')))
83 ip
= '%d.%d.%d.%d' %tuple(addr
[:4])
84 port
= (addr
[4] * 256) + addr
[5]
85 s
= socket
.create_connection((ip
, port
), timeout
=2)
86 self
.dtp
= DummyDTPHandler(s
, baseclass
=self
)
87 self
.push('200 active data connection established')
89 def cmd_pasv(self
, arg
):
90 sock
= socket
.socket()
91 sock
.bind((self
.socket
.getsockname()[0], 0))
94 ip
, port
= sock
.getsockname()[:2]
95 ip
= ip
.replace('.', ','); p1
= port
/ 256; p2
= port
% 256
96 self
.push('227 entering passive mode (%s,%d,%d)' %(ip
, p1
, p2
))
97 conn
, addr
= sock
.accept()
98 self
.dtp
= DummyDTPHandler(conn
, baseclass
=self
)
100 def cmd_eprt(self
, arg
):
101 af
, ip
, port
= arg
.split(arg
[0])[1:-1]
103 s
= socket
.create_connection((ip
, port
), timeout
=2)
104 self
.dtp
= DummyDTPHandler(s
, baseclass
=self
)
105 self
.push('200 active data connection established')
107 def cmd_epsv(self
, arg
):
108 sock
= socket
.socket(socket
.AF_INET6
)
109 sock
.bind((self
.socket
.getsockname()[0], 0))
112 port
= sock
.getsockname()[1]
113 self
.push('229 entering extended passive mode (|||%d|)' %port
)
114 conn
, addr
= sock
.accept()
115 self
.dtp
= DummyDTPHandler(conn
, baseclass
=self
)
117 def cmd_echo(self
, arg
):
118 # sends back the received string (used by the test suite)
121 def cmd_user(self
, arg
):
122 self
.push('331 username ok')
124 def cmd_pass(self
, arg
):
125 self
.push('230 password ok')
127 def cmd_acct(self
, arg
):
128 self
.push('230 acct ok')
130 def cmd_rnfr(self
, arg
):
131 self
.push('350 rnfr ok')
133 def cmd_rnto(self
, arg
):
134 self
.push('250 rnto ok')
136 def cmd_dele(self
, arg
):
137 self
.push('250 dele ok')
139 def cmd_cwd(self
, arg
):
140 self
.push('250 cwd ok')
142 def cmd_size(self
, arg
):
143 self
.push('250 1000')
145 def cmd_mkd(self
, arg
):
146 self
.push('257 "%s"' %arg
)
148 def cmd_rmd(self
, arg
):
149 self
.push('250 rmd ok')
151 def cmd_pwd(self
, arg
):
152 self
.push('257 "pwd ok"')
154 def cmd_type(self
, arg
):
155 self
.push('200 type ok')
157 def cmd_quit(self
, arg
):
158 self
.push('221 quit ok')
161 def cmd_stor(self
, arg
):
162 self
.push('125 stor ok')
164 def cmd_retr(self
, arg
):
165 self
.push('125 retr ok')
166 self
.dtp
.push(RETR_DATA
)
167 self
.dtp
.close_when_done()
169 def cmd_list(self
, arg
):
170 self
.push('125 list ok')
171 self
.dtp
.push(LIST_DATA
)
172 self
.dtp
.close_when_done()
174 def cmd_nlst(self
, arg
):
175 self
.push('125 nlst ok')
176 self
.dtp
.push(NLST_DATA
)
177 self
.dtp
.close_when_done()
180 class DummyFTPServer(asyncore
.dispatcher
, threading
.Thread
):
182 handler
= DummyFTPHandler
184 def __init__(self
, address
, af
=socket
.AF_INET
):
185 threading
.Thread
.__init
__(self
)
186 asyncore
.dispatcher
.__init
__(self
)
187 self
.create_socket(af
, socket
.SOCK_STREAM
)
191 self
.active_lock
= threading
.Lock()
192 self
.host
, self
.port
= self
.socket
.getsockname()[:2]
195 assert not self
.active
196 self
.__flag
= threading
.Event()
197 threading
.Thread
.start(self
)
203 while self
.active
and asyncore
.socket_map
:
204 self
.active_lock
.acquire()
205 asyncore
.loop(timeout
=0.1, count
=1)
206 self
.active_lock
.release()
207 asyncore
.close_all(ignore_all
=True)
214 def handle_accept(self
):
215 conn
, addr
= self
.accept()
216 self
.handler
= self
.handler(conn
)
219 def handle_connect(self
):
221 handle_read
= handle_connect
226 def handle_error(self
):
230 class TestFTPClass(TestCase
):
233 self
.server
= DummyFTPServer((HOST
, 0))
235 self
.client
= ftplib
.FTP(timeout
=2)
236 self
.client
.connect(self
.server
.host
, self
.server
.port
)
242 def test_getwelcome(self
):
243 self
.assertEqual(self
.client
.getwelcome(), '220 welcome')
245 def test_sanitize(self
):
246 self
.assertEqual(self
.client
.sanitize('foo'), repr('foo'))
247 self
.assertEqual(self
.client
.sanitize('pass 12345'), repr('pass *****'))
248 self
.assertEqual(self
.client
.sanitize('PASS 12345'), repr('PASS *****'))
250 def test_exceptions(self
):
251 self
.assertRaises(ftplib
.error_temp
, self
.client
.sendcmd
, 'echo 400')
252 self
.assertRaises(ftplib
.error_temp
, self
.client
.sendcmd
, 'echo 499')
253 self
.assertRaises(ftplib
.error_perm
, self
.client
.sendcmd
, 'echo 500')
254 self
.assertRaises(ftplib
.error_perm
, self
.client
.sendcmd
, 'echo 599')
255 self
.assertRaises(ftplib
.error_proto
, self
.client
.sendcmd
, 'echo 999')
257 def test_all_errors(self
):
258 exceptions
= (ftplib
.error_reply
, ftplib
.error_temp
, ftplib
.error_perm
,
259 ftplib
.error_proto
, ftplib
.Error
, IOError, EOFError)
262 raise x('exception not included in all_errors set')
263 except ftplib
.all_errors
:
266 def test_set_pasv(self
):
267 # passive mode is supposed to be enabled by default
268 self
.assertTrue(self
.client
.passiveserver
)
269 self
.client
.set_pasv(True)
270 self
.assertTrue(self
.client
.passiveserver
)
271 self
.client
.set_pasv(False)
272 self
.assertFalse(self
.client
.passiveserver
)
274 def test_voidcmd(self
):
275 self
.client
.voidcmd('echo 200')
276 self
.client
.voidcmd('echo 299')
277 self
.assertRaises(ftplib
.error_reply
, self
.client
.voidcmd
, 'echo 199')
278 self
.assertRaises(ftplib
.error_reply
, self
.client
.voidcmd
, 'echo 300')
280 def test_login(self
):
284 self
.client
.acct('passwd')
286 def test_rename(self
):
287 self
.client
.rename('a', 'b')
288 self
.server
.handler
.next_response
= '200'
289 self
.assertRaises(ftplib
.error_reply
, self
.client
.rename
, 'a', 'b')
291 def test_delete(self
):
292 self
.client
.delete('foo')
293 self
.server
.handler
.next_response
= '199'
294 self
.assertRaises(ftplib
.error_reply
, self
.client
.delete
, 'foo')
297 self
.client
.size('foo')
300 dir = self
.client
.mkd('/foo')
301 self
.assertEqual(dir, '/foo')
304 self
.client
.rmd('foo')
307 dir = self
.client
.pwd()
308 self
.assertEqual(dir, 'pwd ok')
311 self
.assertEqual(self
.client
.quit(), '221 quit ok')
312 # Ensure the connection gets closed; sock attribute should be None
313 self
.assertEqual(self
.client
.sock
, None)
315 def test_retrbinary(self
):
317 received
.append(data
.decode('ascii'))
319 self
.client
.retrbinary('retr', callback
)
320 self
.assertEqual(''.join(received
), RETR_DATA
)
322 def test_retrlines(self
):
324 self
.client
.retrlines('retr', received
.append
)
325 self
.assertEqual(''.join(received
), RETR_DATA
.replace('\r\n', ''))
327 def test_storbinary(self
):
328 f
= io
.BytesIO(RETR_DATA
.encode('ascii'))
329 self
.client
.storbinary('stor', f
)
330 self
.assertEqual(self
.server
.handler
.last_received_data
, RETR_DATA
)
331 # test new callback arg
334 self
.client
.storbinary('stor', f
, callback
=lambda x
: flag
.append(None))
335 self
.assertTrue(flag
)
337 def test_storlines(self
):
338 f
= io
.BytesIO(RETR_DATA
.replace('\r\n', '\n').encode('ascii'))
339 self
.client
.storlines('stor', f
)
340 self
.assertEqual(self
.server
.handler
.last_received_data
, RETR_DATA
)
341 # test new callback arg
344 self
.client
.storlines('stor foo', f
, callback
=lambda x
: flag
.append(None))
345 self
.assertTrue(flag
)
349 self
.assertEqual(self
.client
.nlst(), NLST_DATA
.split('\r\n')[:-1])
353 self
.client
.dir(lambda x
: l
.append(x
))
354 self
.assertEqual(''.join(l
), LIST_DATA
.replace('\r\n', ''))
356 def test_makeport(self
):
357 self
.client
.makeport()
358 # IPv4 is in use, just make sure send_eprt has not been used
359 self
.assertEqual(self
.server
.handler
.last_received_cmd
, 'port')
361 def test_makepasv(self
):
362 host
, port
= self
.client
.makepasv()
363 conn
= socket
.create_connection((host
, port
), 2)
365 # IPv4 is in use, just make sure send_epsv has not been used
366 self
.assertEqual(self
.server
.handler
.last_received_cmd
, 'pasv')
369 class TestIPv6Environment(TestCase
):
372 self
.server
= DummyFTPServer((HOST
, 0), af
=socket
.AF_INET6
)
374 self
.client
= ftplib
.FTP()
375 self
.client
.connect(self
.server
.host
, self
.server
.port
)
382 self
.assertEqual(self
.client
.af
, socket
.AF_INET6
)
384 def test_makeport(self
):
385 self
.client
.makeport()
386 self
.assertEqual(self
.server
.handler
.last_received_cmd
, 'eprt')
388 def test_makepasv(self
):
389 host
, port
= self
.client
.makepasv()
390 conn
= socket
.create_connection((host
, port
), 2)
392 self
.assertEqual(self
.server
.handler
.last_received_cmd
, 'epsv')
394 def test_transfer(self
):
397 received
.append(data
.decode('ascii'))
399 self
.client
.retrbinary('retr', callback
)
400 self
.assertEqual(''.join(received
), RETR_DATA
)
401 self
.client
.set_pasv(True)
403 self
.client
.set_pasv(False)
407 class TestTimeouts(TestCase
):
410 self
.evt
= threading
.Event()
411 self
.sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
412 self
.sock
.settimeout(3)
413 self
.port
= support
.bind_port(self
.sock
)
414 threading
.Thread(target
=self
.server
, args
=(self
.evt
,self
.sock
)).start()
415 # Wait for the server to be ready.
418 ftplib
.FTP
.port
= self
.port
423 def server(self
, evt
, serv
):
424 # This method sets the evt 3 times:
425 # 1) when the connection is ready to be accepted.
426 # 2) when it is safe for the caller to close the connection
427 # 3) when we have closed the socket
429 # (1) Signal the caller that we are ready to accept the connection.
432 conn
, addr
= serv
.accept()
433 except socket
.timeout
:
436 conn
.send(b
"1 Hola mundo\n")
437 # (2) Signal the caller that it is safe to close the socket.
442 # (3) Signal the caller that we are done.
445 def testTimeoutDefault(self
):
446 # default -- use global socket timeout
447 self
.assert_(socket
.getdefaulttimeout() is None)
448 socket
.setdefaulttimeout(30)
450 ftp
= ftplib
.FTP("localhost")
452 socket
.setdefaulttimeout(None)
453 self
.assertEqual(ftp
.sock
.gettimeout(), 30)
457 def testTimeoutNone(self
):
458 # no timeout -- do not use global socket timeout
459 self
.assert_(socket
.getdefaulttimeout() is None)
460 socket
.setdefaulttimeout(30)
462 ftp
= ftplib
.FTP("localhost", timeout
=None)
464 socket
.setdefaulttimeout(None)
465 self
.assertTrue(ftp
.sock
.gettimeout() is None)
469 def testTimeoutValue(self
):
471 ftp
= ftplib
.FTP(HOST
, timeout
=30)
472 self
.assertEqual(ftp
.sock
.gettimeout(), 30)
476 def testTimeoutConnect(self
):
478 ftp
.connect(HOST
, timeout
=30)
479 self
.assertEqual(ftp
.sock
.gettimeout(), 30)
483 def testTimeoutDifferentOrder(self
):
484 ftp
= ftplib
.FTP(timeout
=30)
486 self
.assertEqual(ftp
.sock
.gettimeout(), 30)
490 def testTimeoutDirectAccess(self
):
494 self
.assertEqual(ftp
.sock
.gettimeout(), 30)
500 tests
= [TestFTPClass
, TestTimeouts
]
503 DummyFTPServer((HOST
, 0), af
=socket
.AF_INET6
)
507 tests
.append(TestIPv6Environment
)
508 thread_info
= support
.threading_setup()
510 support
.run_unittest(*tests
)
512 support
.threading_cleanup(*thread_info
)
515 if __name__
== '__main__':