1 """Test script for ftplib module."""
3 # Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
19 from unittest
import TestCase
20 from test
import test_support
21 from test
.test_support
import HOST
# Canned payloads the dummy server writes to the data channel in reply to
# the RETR, LIST and NLST commands respectively.
RETR_DATA = 'abcde12345\r\n' * 1000
LIST_DATA = 'foo\r\nbar\r\n'
NLST_DATA = 'foo\r\nbar\r\n'
class DummyDTPHandler(asynchat.async_chat):
    """Data-channel endpoint of the dummy FTP server.

    Bytes received on the data connection are accumulated on the owning
    control-channel handler (``baseclass.last_received_data``) so that the
    tests can inspect what the client uploaded.
    """

    def __init__(self, conn, baseclass):
        """Wrap the connected data socket *conn* on behalf of *baseclass*."""
        asynchat.async_chat.__init__(self, conn)
        self.baseclass = baseclass
        # Every transfer starts from an empty capture buffer.
        self.baseclass.last_received_data = ''

    def handle_read(self):
        """Append whatever the peer just sent to the capture buffer."""
        self.baseclass.last_received_data += self.recv(1024)

    def handle_close(self):
        """Announce the end of the transfer on the control channel, then close."""
        self.baseclass.push('226 transfer complete')
        # Without this the data socket would stay open after the transfer.
        self.close()
46 class DummyFTPHandler(asynchat
.async_chat
):
48 dtp_handler
= DummyDTPHandler
def __init__(self, conn):
    """Set up the control channel on *conn* and greet the client."""
    asynchat.async_chat.__init__(self, conn)
    self.set_terminator("\r\n")
    # Fragments of the command line currently being received; filled by
    # collect_incoming_data() and joined by found_terminator().
    self.in_buffer = []
    # Data-channel handler; assigned by the PORT/PASV/EPRT/EPSV commands.
    self.dtp = None
    self.last_received_cmd = None
    self.last_received_data = ''
    # When non-empty, pushed to the client when the next command arrives.
    self.next_response = ''
    self.push('220 welcome')
def collect_incoming_data(self, data):
    """Stash one received chunk until the line terminator shows up."""
    pending = self.in_buffer
    pending.append(data)
def found_terminator(self):
    """Dispatch one complete command line to its ``cmd_<verb>`` method.

    The verb is matched case-insensitively.  If ``next_response`` is set it
    is pushed first and then cleared.  Verbs without a matching method are
    answered with a 550 reply.
    """
    line = ''.join(self.in_buffer)
    # Start the next command from an empty buffer.
    self.in_buffer = []
    if self.next_response:
        self.push(self.next_response)
        self.next_response = ''
    # Text before the first space is the verb; the remainder is the
    # argument, or '' when the line contains no space at all.
    verb, _, arg = line.partition(' ')
    cmd = verb.lower()
    self.last_received_cmd = cmd
    method = getattr(self, 'cmd_' + cmd, None)
    if method is None:
        self.push('550 command "%s" not understood.' % cmd)
    else:
        method(arg)
def handle_error(self):
    """Propagate the exception that is currently being handled.

    NOTE(review): assumes asyncore invokes this from inside its own
    ``except`` block, so a bare ``raise`` re-raises the original error.
    """
    raise


def push(self, data):
    """Queue *data* for sending, followed by the CRLF line terminator."""
    asynchat.async_chat.push(self, data + '\r\n')
def cmd_port(self, arg):
    """Handle PORT: open an active-mode data connection to the client.

    *arg* is a comma separated list of six integers: the four address
    bytes followed by the high and low byte of the port.
    """
    fields = [int(part) for part in arg.split(',')]
    host = '%d.%d.%d.%d' % tuple(fields[:4])
    port_hi, port_lo = fields[4], fields[5]
    endpoint = (host, (port_hi * 256) + port_lo)
    data_sock = socket.create_connection(endpoint, timeout=2)
    self.dtp = self.dtp_handler(data_sock, baseclass=self)
    self.push('200 active data connection established')
def cmd_pasv(self, arg):
    """Handle PASV: advertise a listening port and wait for the client.

    A socket is bound to a free port on the control connection's local
    address, announced in a 227 reply, and the first connection accepted
    on it becomes the data channel.
    """
    sock = socket.socket()
    sock.bind((self.socket.getsockname()[0], 0))
    # accept() needs a listening socket; the timeout keeps a client that
    # never connects from blocking this handler forever.
    sock.listen(5)
    sock.settimeout(2)
    ip, port = sock.getsockname()[:2]
    # The port travels as two separate bytes: high, then low.
    p1, p2 = divmod(port, 256)
    self.push('227 entering passive mode (%s,%d,%d)'
              % (ip.replace('.', ','), p1, p2))
    try:
        conn, addr = sock.accept()
    finally:
        # The listening socket has served its purpose either way.
        sock.close()
    self.dtp = self.dtp_handler(conn, baseclass=self)
def cmd_eprt(self, arg):
    """Handle EPRT: open an active-mode data connection to the client.

    The first character of *arg* is the field delimiter; the fields between
    the outer delimiters are address family, address and port.
    """
    delimiter = arg[0]
    af, ip, port = arg.split(delimiter)[1:-1]
    endpoint = (ip, port)
    s = socket.create_connection(endpoint, timeout=2)
    self.dtp = self.dtp_handler(s, baseclass=self)
    self.push('200 active data connection established')
def cmd_epsv(self, arg):
    """Handle EPSV: advertise an IPv6 listening port and wait for the client.

    The port is announced in a 229 reply and the first connection accepted
    on it becomes the data channel.
    """
    sock = socket.socket(socket.AF_INET6)
    sock.bind((self.socket.getsockname()[0], 0))
    # accept() needs a listening socket; the timeout keeps a client that
    # never connects from blocking this handler forever.
    sock.listen(5)
    sock.settimeout(2)
    port = sock.getsockname()[1]
    self.push('229 entering extended passive mode (|||%d|)' % port)
    try:
        conn, addr = sock.accept()
    finally:
        # The listening socket has served its purpose either way.
        sock.close()
    self.dtp = self.dtp_handler(conn, baseclass=self)
def cmd_echo(self, arg):
    """Send *arg* back verbatim as the reply (used by the test suite)."""
    # sends back the received string (used by the test suite)
    self.push(arg)
def cmd_user(self, arg):
    """USER: answer with a fixed 331 reply."""
    self.push('331 username ok')


def cmd_pass(self, arg):
    """PASS: answer with a fixed 230 reply."""
    self.push('230 password ok')


def cmd_acct(self, arg):
    """ACCT: answer with a fixed 230 reply."""
    self.push('230 acct ok')


def cmd_rnfr(self, arg):
    """RNFR: answer with a fixed 350 reply."""
    self.push('350 rnfr ok')


def cmd_rnto(self, arg):
    """RNTO: answer with a fixed 250 reply."""
    self.push('250 rnto ok')


def cmd_dele(self, arg):
    """DELE: answer with a fixed 250 reply."""
    self.push('250 dele ok')


def cmd_cwd(self, arg):
    """CWD: answer with a fixed 250 reply."""
    self.push('250 cwd ok')


def cmd_size(self, arg):
    """SIZE: answer with a fixed reply regardless of *arg*."""
    self.push('250 1000')


def cmd_mkd(self, arg):
    """MKD: echo the requested name back, quoted, in a 257 reply."""
    reply = '257 "%s"' % arg
    self.push(reply)


def cmd_rmd(self, arg):
    """RMD: answer with a fixed 250 reply."""
    self.push('250 rmd ok')


def cmd_pwd(self, arg):
    """PWD: answer with a fixed, quoted 257 reply."""
    self.push('257 "pwd ok"')


def cmd_type(self, arg):
    """TYPE: answer with a fixed 200 reply."""
    self.push('200 type ok')


def cmd_quit(self, arg):
    """QUIT: answer with a fixed 221 reply."""
    self.push('221 quit ok')


def cmd_stor(self, arg):
    """STOR: answer with a fixed 125 reply."""
    self.push('125 stor ok')
def cmd_retr(self, arg):
    """RETR: acknowledge, then send RETR_DATA over the data channel."""
    self.push('125 retr ok')
    data_channel = self.dtp
    data_channel.push(RETR_DATA)
    data_channel.close_when_done()


def cmd_list(self, arg):
    """LIST: acknowledge, then send LIST_DATA over the data channel."""
    self.push('125 list ok')
    data_channel = self.dtp
    data_channel.push(LIST_DATA)
    data_channel.close_when_done()


def cmd_nlst(self, arg):
    """NLST: acknowledge, then send NLST_DATA over the data channel."""
    self.push('125 nlst ok')
    data_channel = self.dtp
    data_channel.push(NLST_DATA)
    data_channel.close_when_done()
187 class DummyFTPServer(asyncore
.dispatcher
, threading
.Thread
):
189 handler
= DummyFTPHandler
def __init__(self, address, af=socket.AF_INET):
    """Create the listening socket on *address*; the thread is not started.

    address -- (host, port) pair to bind the listening socket to.
    af      -- address family of the listening socket.
    """
    threading.Thread.__init__(self)
    asyncore.dispatcher.__init__(self)
    self.create_socket(af, socket.SOCK_STREAM)
    # Bind and listen right away so that host/port below describe an
    # endpoint clients can really connect to.
    self.bind(address)
    self.listen(5)
    # Read by the start/run logic elsewhere in the class; the server
    # starts out inactive.
    self.active = False
    self.active_lock = threading.Lock()
    self.host, self.port = self.socket.getsockname()[:2]
202 assert not self
.active
203 self
.__flag
= threading
.Event()
204 threading
.Thread
.start(self
)
210 while self
.active
and asyncore
.socket_map
:
211 self
.active_lock
.acquire()
212 asyncore
.loop(timeout
=0.1, count
=1)
213 self
.active_lock
.release()
214 asyncore
.close_all(ignore_all
=True)
def handle_accept(self):
    """Accept the pending connection and attach a handler to it."""
    conn, _peer = self.accept()
    # ``handler`` holds a factory until now; from here on it refers to the
    # object built for this connection.
    make_handler = self.handler
    self.handler = make_handler(conn)
226 def handle_connect(self
):
228 handle_read
= handle_connect
233 def handle_error(self
):
# Certificate file expected to sit in the same directory as this module.
CERTFILE = os.path.join(os.path.dirname(__file__), "keycert.pem")
241 class SSLConnection(object, asyncore
.dispatcher
):
242 """An asyncore.dispatcher subclass supporting TLS/SSL."""
244 _ssl_accepting
= False
def secure_connection(self):
    """Wrap the channel's socket for server-side TLS/SSL.

    The handshake is not performed here; ``_ssl_accepting`` is raised so
    that it can be completed later.
    """
    tls_options = dict(
        suppress_ragged_eofs=False,
        certfile=CERTFILE,
        server_side=True,
        do_handshake_on_connect=False,
        ssl_version=ssl.PROTOCOL_SSLv23,
    )
    self.socket = ssl.wrap_socket(self.socket, **tls_options)
    self._ssl_accepting = True
def _do_ssl_handshake(self):
    """Advance the TLS handshake by one step.

    ``_ssl_accepting`` is cleared once ``do_handshake()`` succeeds.  A
    WANT_READ/WANT_WRITE error leaves it set so the handshake can be retried
    later; EOF or an aborted connection closes the channel; any other SSL
    error is re-raised.
    """
    try:
        self.socket.do_handshake()
    except ssl.SSLError as err:
        code = err.args[0]
        if code in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE):
            # Handshake still in progress: try again on the next event.
            return
        if code == ssl.SSL_ERROR_EOF:
            return self.handle_close()
        raise
    except socket.error as err:
        if err.args[0] == errno.ECONNABORTED:
            return self.handle_close()
        # NOTE(review): other socket errors are deliberately not raised
        # here; the handshake simply stays pending -- confirm intended.
    else:
        self._ssl_accepting = False
def handle_read_event(self):
    """Continue a pending handshake; otherwise handle the read normally."""
    if self._ssl_accepting:
        # No application data may be read until the handshake is done.
        self._do_ssl_handshake()
    else:
        super(SSLConnection, self).handle_read_event()


def handle_write_event(self):
    """Continue a pending handshake; otherwise handle the write normally."""
    if self._ssl_accepting:
        # No application data may be written until the handshake is done.
        self._do_ssl_handshake()
    else:
        super(SSLConnection, self).handle_write_event()
281 def send(self
, data
):
283 return super(SSLConnection
, self
).send(data
)
284 except ssl
.SSLError
, err
:
285 if err
.args
[0] in (ssl
.SSL_ERROR_EOF
, ssl
.SSL_ERROR_ZERO_RETURN
):
289 def recv(self
, buffer_size
):
291 return super(SSLConnection
, self
).recv(buffer_size
)
292 except ssl
.SSLError
, err
:
293 if err
.args
[0] in (ssl
.SSL_ERROR_EOF
, ssl
.SSL_ERROR_ZERO_RETURN
):
298 def handle_error(self
):
303 if isinstance(self
.socket
, ssl
.SSLSocket
):
304 if self
.socket
._sslobj
is not None:
307 super(SSLConnection
, self
).close()
class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler):
    """Data-channel handler that can optionally run over TLS/SSL."""

    def __init__(self, conn, baseclass):
        """Set up the data channel, securing it if *baseclass* asks for it."""
        DummyDTPHandler.__init__(self, conn, baseclass)
        wants_tls = self.baseclass.secure_data_channel
        if wants_tls:
            self.secure_connection()
319 class DummyTLS_FTPHandler(SSLConnection
, DummyFTPHandler
):
320 """A DummyFTPHandler subclass supporting TLS/SSL."""
322 dtp_handler
= DummyTLS_DTPHandler
def __init__(self, conn):
    """Set up the control channel; data channels start out unprotected."""
    DummyFTPHandler.__init__(self, conn)
    self.secure_data_channel = False


def cmd_auth(self, line):
    """Acknowledge AUTH, then switch the control channel to TLS/SSL."""
    reply = '234 AUTH TLS successful'
    self.push(reply)
    self.secure_connection()


def cmd_pbsz(self, line):
    """Acknowledge the protection buffer size negotiation.

    For TLS/SSL the only valid value for the parameter is '0'; any other
    value is accepted but ignored.
    """
    reply = '200 PBSZ=0 successful.'
    self.push(reply)
def cmd_prot(self, line):
    """Setup un/secure data channel.

    'C' selects a clear-text data channel and 'P' a protected one; the
    argument is matched case-insensitively.  Anything else is rejected
    with a 502 reply and leaves the current setting untouched.
    """
    level = line.strip().upper()
    if level == 'C':
        self.push('200 Protection set to Clear')
        self.secure_data_channel = False
    elif level == 'P':
        self.push('200 Protection set to Private')
        self.secure_data_channel = True
    else:
        self.push("502 Unrecognized PROT type (use C or P).")
class DummyTLS_FTPServer(DummyFTPServer):
    """Dummy FTP server that uses the TLS/SSL capable command handler."""

    handler = DummyTLS_FTPHandler
357 class TestFTPClass(TestCase
):
360 self
.server
= DummyFTPServer((HOST
, 0))
362 self
.client
= ftplib
.FTP(timeout
=2)
363 self
.client
.connect(self
.server
.host
, self
.server
.port
)
def test_getwelcome(self):
    """The client reports the greeting sent by the server."""
    banner = self.client.getwelcome()
    self.assertEqual(banner, '220 welcome')


def test_sanitize(self):
    """sanitize() hides the password of PASS commands, whatever their case."""
    cases = (
        ('foo', 'foo'),
        ('pass 12345', 'pass *****'),
        ('PASS 12345', 'PASS *****'),
    )
    for raw, cleaned in cases:
        self.assertEqual(self.client.sanitize(raw), repr(cleaned))


def test_exceptions(self):
    """Reply codes are mapped to the matching ftplib exception class."""
    expectations = (
        ('echo 400', ftplib.error_temp),
        ('echo 499', ftplib.error_temp),
        ('echo 500', ftplib.error_perm),
        ('echo 599', ftplib.error_perm),
        ('echo 999', ftplib.error_proto),
    )
    for command, exc_type in expectations:
        self.assertRaises(exc_type, self.client.sendcmd, command)
def test_all_errors(self):
    """Each listed exception type must be caught by ``ftplib.all_errors``."""
    exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
                  ftplib.error_proto, ftplib.Error, IOError, EOFError)
    for x in exceptions:
        try:
            raise x('exception not included in all_errors set')
        except ftplib.all_errors:
            # Caught as expected; a type missing from all_errors would
            # escape this handler and fail the test.
            pass
def test_set_pasv(self):
    """set_pasv() toggles the client's ``passiveserver`` flag."""
    client = self.client
    # passive mode is supposed to be enabled by default
    self.assertTrue(client.passiveserver)
    client.set_pasv(True)
    self.assertTrue(client.passiveserver)
    client.set_pasv(False)
    self.assertFalse(client.passiveserver)


def test_voidcmd(self):
    """voidcmd() accepts 2xx replies and rejects 1xx and 3xx ones."""
    for accepted in ('echo 200', 'echo 299'):
        self.client.voidcmd(accepted)
    for rejected in ('echo 199', 'echo 300'):
        self.assertRaises(ftplib.error_reply, self.client.voidcmd, rejected)
407 def test_login(self
):
411 self
.client
.acct('passwd')
def test_rename(self):
    """rename() works normally and fails when the server injects '200'."""
    self.client.rename('a', 'b')
    handler = self.server.handler
    handler.next_response = '200'
    self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')


def test_delete(self):
    """delete() works normally and fails when the server injects '199'."""
    self.client.delete('foo')
    handler = self.server.handler
    handler.next_response = '199'
    self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')
424 self
.client
.size('foo')
427 dir = self
.client
.mkd('/foo')
428 self
.assertEqual(dir, '/foo')
431 self
.client
.rmd('foo')
434 dir = self
.client
.pwd()
435 self
.assertEqual(dir, 'pwd ok')
438 self
.assertEqual(self
.client
.quit(), '221 quit ok')
439 # Ensure the connection gets closed; sock attribute should be None
440 self
.assertEqual(self
.client
.sock
, None)
def test_retrbinary(self):
    """retrbinary() hands the payload to the callback unchanged."""
    received = []
    self.client.retrbinary('retr', received.append)
    self.assertEqual(''.join(received), RETR_DATA)


def test_retrlines(self):
    """retrlines() hands over the payload line by line, without CRLF."""
    received = []
    self.client.retrlines('retr', received.append)
    self.assertEqual(''.join(received), RETR_DATA.replace('\r\n', ''))
def test_storbinary(self):
    """storbinary() uploads the file and invokes the optional callback."""
    f = StringIO.StringIO(RETR_DATA)
    self.client.storbinary('stor', f)
    self.assertEqual(self.server.handler.last_received_data, RETR_DATA)
    # test new callback arg
    flag = []
    # The first upload consumed the file; rewind so there is data to send.
    f.seek(0)
    self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
    self.assertTrue(flag)


def test_storlines(self):
    """storlines() uploads with CRLF line endings and invokes the callback."""
    f = StringIO.StringIO(RETR_DATA.replace('\r\n', '\n'))
    self.client.storlines('stor', f)
    self.assertEqual(self.server.handler.last_received_data, RETR_DATA)
    # test new callback arg
    flag = []
    # The first upload consumed the file; rewind so there is data to send.
    f.seek(0)
    self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
    self.assertTrue(flag)
474 self
.assertEqual(self
.client
.nlst(), NLST_DATA
.split('\r\n')[:-1])
478 self
.client
.dir(lambda x
: l
.append(x
))
479 self
.assertEqual(''.join(l
), LIST_DATA
.replace('\r\n', ''))
def test_makeport(self):
    """makeport() must announce the port with PORT on IPv4."""
    self.client.makeport()
    # IPv4 is in use, just make sure send_eprt has not been used
    last_cmd = self.server.handler.last_received_cmd
    self.assertEqual(last_cmd, 'port')
def test_makepasv(self):
    """makepasv() must use PASV on IPv4 and return a reachable endpoint."""
    host, port = self.client.makepasv()
    conn = socket.create_connection((host, port), 2)
    # Only reachability matters; close the probe socket right away.
    conn.close()
    # IPv4 is in use, just make sure send_epsv has not been used
    self.assertEqual(self.server.handler.last_received_cmd, 'pasv')
494 class TestIPv6Environment(TestCase
):
497 self
.server
= DummyFTPServer((HOST
, 0), af
=socket
.AF_INET6
)
499 self
.client
= ftplib
.FTP()
500 self
.client
.connect(self
.server
.host
, self
.server
.port
)
507 self
.assertEqual(self
.client
.af
, socket
.AF_INET6
)
def test_makeport(self):
    """makeport() must announce the port with EPRT on IPv6."""
    self.client.makeport()
    last_cmd = self.server.handler.last_received_cmd
    self.assertEqual(last_cmd, 'eprt')
def test_makepasv(self):
    """makepasv() must use EPSV on IPv6 and return a reachable endpoint."""
    host, port = self.client.makepasv()
    conn = socket.create_connection((host, port), 2)
    # Only reachability matters; close the probe socket right away.
    conn.close()
    self.assertEqual(self.server.handler.last_received_cmd, 'epsv')
def test_transfer(self):
    """A RETR transfer must deliver the payload in passive and active mode."""
    for passive in (True, False):
        self.client.set_pasv(passive)
        # Fresh buffer per mode so each transfer is checked on its own.
        received = []
        self.client.retrbinary('retr', received.append)
        self.assertEqual(''.join(received), RETR_DATA)
530 class TestTLS_FTPClassMixin(TestFTPClass
):
531 """Repeat TestFTPClass tests starting the TLS layer for both control
532 and data connections first.
536 self
.server
= DummyTLS_FTPServer((HOST
, 0))
538 self
.client
= ftplib
.FTP_TLS(timeout
=2)
539 self
.client
.connect(self
.server
.host
, self
.server
.port
)
545 class TestTLS_FTPClass(TestCase
):
546 """Specific TLS_FTP class tests."""
549 self
.server
= DummyTLS_FTPServer((HOST
, 0))
551 self
.client
= ftplib
.FTP_TLS(timeout
=2)
552 self
.client
.connect(self
.server
.host
, self
.server
.port
)
def test_control_connection(self):
    """The control socket becomes an SSLSocket only once auth() was called."""
    self.assertFalse(isinstance(self.client.sock, ssl.SSLSocket))
    self.client.auth()
    self.assertTrue(isinstance(self.client.sock, ssl.SSLSocket))
def test_data_connection(self):
    """The data channel follows the protection level set with PROT."""
    # clear text by default
    sock = self.client.transfercmd('list')
    self.assertFalse(isinstance(sock, ssl.SSLSocket))
    sock.close()
    self.client.voidresp()

    # secured, after PROT P
    self.client.prot_p()
    sock = self.client.transfercmd('list')
    self.assertTrue(isinstance(sock, ssl.SSLSocket))
    sock.close()
    self.client.voidresp()

    # PROT C is issued, the connection must be in cleartext again
    self.client.prot_c()
    sock = self.client.transfercmd('list')
    self.assertFalse(isinstance(sock, ssl.SSLSocket))
    sock.close()
    self.client.voidresp()
def test_login(self):
    """login() secures the control connection, and only does so once."""
    # login() is supposed to implicitly secure the control connection
    self.assertFalse(isinstance(self.client.sock, ssl.SSLSocket))
    self.client.login()
    self.assertTrue(isinstance(self.client.sock, ssl.SSLSocket))
    # make sure that AUTH TLS doesn't get issued again
    self.client.login()
def test_auth_issued_twice(self):
    """A second auth() on an already secured connection raises ValueError."""
    self.client.auth()
    self.assertRaises(ValueError, self.client.auth)
def test_auth_ssl(self):
    """auth() with SSLv3 selected; a repeated auth() raises ValueError."""
    try:
        self.client.ssl_version = ssl.PROTOCOL_SSLv3
        self.client.auth()
        self.assertRaises(ValueError, self.client.auth)
    finally:
        # Put the protocol version back even if an assertion failed.
        self.client.ssl_version = ssl.PROTOCOL_TLSv1
605 class TestTimeouts(TestCase
):
608 self
.evt
= threading
.Event()
609 self
.sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
610 self
.sock
.settimeout(3)
611 self
.port
= test_support
.bind_port(self
.sock
)
612 threading
.Thread(target
=self
.server
, args
=(self
.evt
,self
.sock
)).start()
613 # Wait for the server to be ready.
616 ftplib
.FTP
.port
= self
.port
def server(self, evt, serv):
    """Serve a single connection on the already-bound socket *serv*.

    This method sets the evt 3 times:
     1) when the connection is ready to be accepted.
     2) when it is safe for the caller to close the connection
     3) when we have closed the socket
    """
    serv.listen(5)
    # (1) Signal the caller that we are ready to accept the connection.
    evt.set()
    try:
        conn, addr = serv.accept()
    except socket.timeout:
        # Nobody connected within the socket's timeout; just clean up.
        pass
    else:
        conn.send("1 Hola mundo\n")
        # (2) Signal the caller that it is safe to close the socket.
        evt.set()
        conn.close()
    finally:
        serv.close()
        # (3) Signal the caller that we are done.
        evt.set()
def testTimeoutDefault(self):
    """Without a timeout argument the global default timeout is used."""
    # default -- use global socket timeout
    self.assertTrue(socket.getdefaulttimeout() is None)
    socket.setdefaulttimeout(30)
    try:
        ftp = ftplib.FTP("localhost")
    finally:
        # Never leak the modified global default into other tests.
        socket.setdefaulttimeout(None)
    self.assertEqual(ftp.sock.gettimeout(), 30)
    self.evt.wait()
    ftp.close()


def testTimeoutNone(self):
    """timeout=None means no timeout, whatever the global default is."""
    # no timeout -- do not use global socket timeout
    self.assertTrue(socket.getdefaulttimeout() is None)
    socket.setdefaulttimeout(30)
    try:
        ftp = ftplib.FTP("localhost", timeout=None)
    finally:
        # Never leak the modified global default into other tests.
        socket.setdefaulttimeout(None)
    self.assertTrue(ftp.sock.gettimeout() is None)
    self.evt.wait()
    ftp.close()


def testTimeoutValue(self):
    """A timeout passed to the constructor is applied to the socket."""
    ftp = ftplib.FTP(HOST, timeout=30)
    self.assertEqual(ftp.sock.gettimeout(), 30)
    self.evt.wait()
    ftp.close()


def testTimeoutConnect(self):
    """A timeout passed to connect() is applied to the socket."""
    ftp = ftplib.FTP()
    ftp.connect(HOST, timeout=30)
    self.assertEqual(ftp.sock.gettimeout(), 30)
    self.evt.wait()
    ftp.close()


def testTimeoutDifferentOrder(self):
    """A constructor timeout is still honoured by a later connect()."""
    ftp = ftplib.FTP(timeout=30)
    ftp.connect(HOST)
    self.assertEqual(ftp.sock.gettimeout(), 30)
    self.evt.wait()
    ftp.close()


def testTimeoutDirectAccess(self):
    """A timeout set directly on the instance is honoured by connect()."""
    ftp = ftplib.FTP()
    ftp.timeout = 30
    ftp.connect(HOST)
    self.assertEqual(ftp.sock.gettimeout(), 30)
    self.evt.wait()
    ftp.close()
698 tests
= [TestFTPClass
, TestTimeouts
]
701 DummyFTPServer((HOST
, 0), af
=socket
.AF_INET6
)
705 tests
.append(TestIPv6Environment
)
708 tests
.extend([TestTLS_FTPClassMixin
, TestTLS_FTPClass
])
710 thread_info
= test_support
.threading_setup()
712 test_support
.run_unittest(*tests
)
714 test_support
.threading_cleanup(*thread_info
)
717 if __name__
== '__main__':