12 from test
import test_support
19 HOST
= test_support
.HOST
21 def server(evt
, buf
, serv
):
25 conn
, addr
= serv
.accept()
26 except socket
.timeout
:
31 r
, w
, e
= select
.select([], [conn
], [])
43 @unittest.skipUnless(threading
, 'Threading required for this test.')
44 class GeneralTests(unittest
.TestCase
):
47 self
._threads
= test_support
.threading_setup()
48 self
.evt
= threading
.Event()
49 self
.sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
50 self
.sock
.settimeout(15)
51 self
.port
= test_support
.bind_port(self
.sock
)
52 servargs
= (self
.evt
, "220 Hola mundo\n", self
.sock
)
53 self
.thread
= threading
.Thread(target
=server
, args
=servargs
)
61 test_support
.threading_cleanup(*self
._threads
)
65 smtp
= smtplib
.SMTP(HOST
, self
.port
)
69 # connects, include port in host name
70 smtp
= smtplib
.SMTP("%s:%s" % (HOST
, self
.port
))
def testLocalHostName(self):
    # An explicit local_hostname passed to the constructor must be stored
    # on the SMTP object unchanged.
    wanted_name = "testhost"
    smtp = smtplib.SMTP(HOST, self.port, local_hostname=wanted_name)
    self.assertEqual(smtp.local_hostname, wanted_name)
def testTimeoutDefault(self):
    # With no explicit timeout argument the connection's socket should pick
    # up the process-wide default socket timeout.
    self.assertIsNone(socket.getdefaulttimeout())
    socket.setdefaulttimeout(30)
    try:
        smtp = smtplib.SMTP(HOST, self.port)
    finally:
        # Always restore the global default, even when the connection
        # attempt raises, so later tests do not inherit the 30s timeout.
        socket.setdefaulttimeout(None)
    self.assertEqual(smtp.sock.gettimeout(), 30)
def testTimeoutNone(self):
    # An explicit timeout=None must win over the process-wide default
    # socket timeout, leaving the connection's socket without a timeout.
    self.assertIsNone(socket.getdefaulttimeout())
    socket.setdefaulttimeout(30)
    try:
        smtp = smtplib.SMTP(HOST, self.port, timeout=None)
    finally:
        # Always restore the global default, even when the connection
        # attempt raises, so later tests do not inherit the 30s timeout.
        socket.setdefaulttimeout(None)
    self.assertIsNone(smtp.sock.gettimeout())
def testTimeoutValue(self):
    # A numeric timeout given to the constructor must be applied to the
    # underlying socket.
    smtp = smtplib.SMTP(HOST, self.port, timeout=30)
    socket_timeout = smtp.sock.gettimeout()
    self.assertEqual(socket_timeout, 30)
105 # Test server thread using the specified SMTP server class
106 def debugging_server(serv
, serv_evt
, client_evt
):
110 if hasattr(select
, 'poll'):
111 poll_fun
= asyncore
.poll2
113 poll_fun
= asyncore
.poll
116 while asyncore
.socket_map
and n
> 0:
117 poll_fun(0.01, asyncore
.socket_map
)
119 # when the client conversation is finished, it will
120 # set client_evt, and it's then ok to kill the server
121 if client_evt
.is_set():
127 except socket
.timeout
:
130 if not client_evt
.is_set():
131 # allow some time for the client to read the result
137 MSG_BEGIN
= '---------- MESSAGE FOLLOWS ----------\n'
138 MSG_END
= '------------ END MESSAGE ------------\n'
140 # NOTE: Some SMTP objects in the tests below are created with a non-default
141 # local_hostname argument to the constructor, since (on some systems) the FQDN
142 # lookup caused by the default local_hostname sometimes takes so long that the
143 # test server times out, causing the test to fail.
145 # Test behavior of smtpd.DebuggingServer
146 @unittest.skipUnless(threading
, 'Threading required for this test.')
147 class DebuggingServerTests(unittest
.TestCase
):
150 # temporarily replace sys.stdout to capture DebuggingServer output
151 self
.old_stdout
= sys
.stdout
152 self
.output
= StringIO
.StringIO()
153 sys
.stdout
= self
.output
155 self
._threads
= test_support
.threading_setup()
156 self
.serv_evt
= threading
.Event()
157 self
.client_evt
= threading
.Event()
158 # Pick a random unused port by passing 0 for the port number
159 self
.serv
= smtpd
.DebuggingServer((HOST
, 0), ('nowhere', -1))
160 # Keep a note of what port was assigned
161 self
.port
= self
.serv
.socket
.getsockname()[1]
162 serv_args
= (self
.serv
, self
.serv_evt
, self
.client_evt
)
163 self
.thread
= threading
.Thread(target
=debugging_server
, args
=serv_args
)
166 # wait until server thread has assigned a port number
168 self
.serv_evt
.clear()
171 # indicate that the client is finished
172 self
.client_evt
.set()
173 # wait for the server thread to terminate
176 test_support
.threading_cleanup(*self
._threads
)
178 sys
.stdout
= self
.old_stdout
182 smtp
= smtplib
.SMTP(HOST
, self
.port
, local_hostname
='localhost', timeout
=3)
186 smtp
= smtplib
.SMTP(HOST
, self
.port
, local_hostname
='localhost', timeout
=3)
187 expected
= (250, 'Ok')
188 self
.assertEqual(smtp
.noop(), expected
)
192 smtp
= smtplib
.SMTP(HOST
, self
.port
, local_hostname
='localhost', timeout
=3)
193 expected
= (250, 'Ok')
194 self
.assertEqual(smtp
.rset(), expected
)
def testNotImplemented(self):
    # DebuggingServer has no EHLO support, so the command is expected to
    # come back as a 502 "not implemented" reply.
    smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
    ehlo_reply = smtp.ehlo()
    self.assertEqual(ehlo_reply,
                     (502, 'Error: command "EHLO" not implemented'))
205 # VRFY isn't implemented in DebuggingServer
206 smtp
= smtplib
.SMTP(HOST
, self
.port
, local_hostname
='localhost', timeout
=3)
207 expected
= (502, 'Error: command "VRFY" not implemented')
208 self
.assertEqual(smtp
.vrfy('nobody@nowhere.com'), expected
)
209 self
.assertEqual(smtp
.verify('nobody@nowhere.com'), expected
)
212 def testSecondHELO(self
):
213 # check that a second HELO returns a message that it's a duplicate
214 # (this behavior is specific to smtpd.SMTPChannel)
215 smtp
= smtplib
.SMTP(HOST
, self
.port
, local_hostname
='localhost', timeout
=3)
217 expected
= (503, 'Duplicate HELO/EHLO')
218 self
.assertEqual(smtp
.helo(), expected
)
222 smtp
= smtplib
.SMTP(HOST
, self
.port
, local_hostname
='localhost', timeout
=3)
223 self
.assertEqual(smtp
.help(), 'Error: command "HELP" not implemented')
227 # connect and send mail
229 smtp
= smtplib
.SMTP(HOST
, self
.port
, local_hostname
='localhost', timeout
=3)
230 smtp
.sendmail('John', 'Sally', m
)
231 # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
232 # in asyncore. This sleep might help, but should really be fixed
233 # properly by using an Event variable.
237 self
.client_evt
.set()
240 mexpect
= '%s%s\n%s' % (MSG_BEGIN
, m
, MSG_END
)
241 self
.assertEqual(self
.output
.getvalue(), mexpect
)
244 class NonConnectingTests(unittest
.TestCase
):
246 def testNotConnected(self
):
247 # Test various operations on an unconnected SMTP object that
248 # should raise exceptions (at present the attempt in SMTP.send
249 # to reference the nonexistent 'sock' attribute of the SMTP object
250 # causes an AttributeError)
251 smtp
= smtplib
.SMTP()
252 self
.assertRaises(smtplib
.SMTPServerDisconnected
, smtp
.ehlo
)
253 self
.assertRaises(smtplib
.SMTPServerDisconnected
,
254 smtp
.send
, 'test msg')
256 def testNonnumericPort(self
):
257 # check that non-numeric port raises socket.error
258 self
.assertRaises(socket
.error
, smtplib
.SMTP
,
259 "localhost", "bogus")
260 self
.assertRaises(socket
.error
, smtplib
.SMTP
,
264 # test response of client to a non-successful HELO message
265 @unittest.skipUnless(threading
, 'Threading required for this test.')
266 class BadHELOServerTests(unittest
.TestCase
):
269 self
.old_stdout
= sys
.stdout
270 self
.output
= StringIO
.StringIO()
271 sys
.stdout
= self
.output
273 self
._threads
= test_support
.threading_setup()
274 self
.evt
= threading
.Event()
275 self
.sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
276 self
.sock
.settimeout(15)
277 self
.port
= test_support
.bind_port(self
.sock
)
278 servargs
= (self
.evt
, "199 no hello for you!\n", self
.sock
)
279 self
.thread
= threading
.Thread(target
=server
, args
=servargs
)
287 test_support
.threading_cleanup(*self
._threads
)
288 sys
.stdout
= self
.old_stdout
def testFailingHELO(self):
    # Constructing a client against this fixture's server is expected to
    # fail with SMTPConnectError instead of returning a usable object.
    with self.assertRaises(smtplib.SMTPConnectError):
        smtplib.SMTP(HOST, self.port, 'localhost', 3)
295 sim_users
= {'Mr.A@somewhere.com':'John A',
296 'Ms.B@somewhere.com':'Sally B',
297 'Mrs.C@somewhereesle.com':'Ruth C',
300 sim_auth
= ('Mr.A@somewhere.com', 'somepassword')
301 sim_cram_md5_challenge
= ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
302 'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
303 sim_auth_credentials
= {
304 'login': 'TXIuQUBzb21ld2hlcmUuY29t',
305 'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=',
306 'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ'
307 'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'),
309 sim_auth_login_password
= 'C29TZXBHC3N3B3JK'
311 sim_lists
= {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
312 'list-2':['Ms.B@somewhere.com',],
315 # Simulated SMTP channel & server
316 class SimSMTPChannel(smtpd
.SMTPChannel
):
def __init__(self, extra_features, *args, **kw):
    # Pre-render each extra feature as its own "250-<feature>" line and
    # store the concatenation before handing the remaining arguments to
    # the base channel.
    feature_lines = []
    for feature in extra_features:
        feature_lines.append("250-{0}\r\n".format(feature))
    self._extrafeatures = ''.join(feature_lines)
    smtpd.SMTPChannel.__init__(self, *args, **kw)
323 def smtp_EHLO(self
, arg
):
324 resp
= ('250-testhost\r\n'
326 '250-SIZE 20000000\r\n'
329 resp
= resp
+ self
._extrafeatures
+ '250 HELP'
332 def smtp_VRFY(self
, arg
):
333 raw_addr
= email
.utils
.parseaddr(arg
)[1]
334 quoted_addr
= smtplib
.quoteaddr(arg
)
335 if raw_addr
in sim_users
:
336 self
.push('250 %s %s' % (sim_users
[raw_addr
], quoted_addr
))
338 self
.push('550 No such user: %s' % arg
)
340 def smtp_EXPN(self
, arg
):
341 list_name
= email
.utils
.parseaddr(arg
)[1].lower()
342 if list_name
in sim_lists
:
343 user_list
= sim_lists
[list_name
]
344 for n
, user_email
in enumerate(user_list
):
345 quoted_addr
= smtplib
.quoteaddr(user_email
)
346 if n
< len(user_list
) - 1:
347 self
.push('250-%s %s' % (sim_users
[user_email
], quoted_addr
))
349 self
.push('250 %s %s' % (sim_users
[user_email
], quoted_addr
))
351 self
.push('550 No access for you!')
353 def smtp_AUTH(self
, arg
):
354 if arg
.strip().lower()=='cram-md5':
355 self
.push('334 {0}'.format(sim_cram_md5_challenge
))
357 mech
, auth
= arg
.split()
359 if mech
not in sim_auth_credentials
:
360 self
.push('504 auth type unimplemented')
362 if mech
== 'plain' and auth
==sim_auth_credentials
['plain']:
363 self
.push('235 plain auth ok')
364 elif mech
=='login' and auth
==sim_auth_credentials
['login']:
365 self
.push('334 Password:')
367 self
.push('550 No access for you!')
369 def handle_error(self
):
373 class SimSMTPServer(smtpd
.SMTPServer
):
def __init__(self, *args, **kw):
    # Start with an empty list of extra features (add_feature() appends
    # to it), then let the base server initialise with the arguments
    # passed through untouched.
    self._extra_features = list()
    smtpd.SMTPServer.__init__(self, *args, **kw)
379 def handle_accept(self
):
380 conn
, addr
= self
.accept()
381 self
._SMTPchannel
= SimSMTPChannel(self
._extra
_features
,
384 def process_message(self
, peer
, mailfrom
, rcpttos
, data
):
def add_feature(self, feature):
    # Record one more extra feature on this server; the existing list
    # object is extended in place.
    self._extra_features.extend((feature,))
390 def handle_error(self
):
394 # Test various SMTP & ESMTP commands/behaviors that require a simulated server
395 # (i.e., something with more features than DebuggingServer)
396 @unittest.skipUnless(threading
, 'Threading required for this test.')
397 class SMTPSimTests(unittest
.TestCase
):
400 self
._threads
= test_support
.threading_setup()
401 self
.serv_evt
= threading
.Event()
402 self
.client_evt
= threading
.Event()
403 # Pick a random unused port by passing 0 for the port number
404 self
.serv
= SimSMTPServer((HOST
, 0), ('nowhere', -1))
405 # Keep a note of what port was assigned
406 self
.port
= self
.serv
.socket
.getsockname()[1]
407 serv_args
= (self
.serv
, self
.serv_evt
, self
.client_evt
)
408 self
.thread
= threading
.Thread(target
=debugging_server
, args
=serv_args
)
411 # wait until server thread has assigned a port number
413 self
.serv_evt
.clear()
416 # indicate that the client is finished
417 self
.client_evt
.set()
418 # wait for the server thread to terminate
421 test_support
.threading_cleanup(*self
._threads
)
425 smtp
= smtplib
.SMTP(HOST
, self
.port
, local_hostname
='localhost', timeout
=15)
429 smtp
= smtplib
.SMTP(HOST
, self
.port
, local_hostname
='localhost', timeout
=15)
431 # no features should be present before the EHLO
432 self
.assertEqual(smtp
.esmtp_features
, {})
434 # features expected from the test server
435 expected_features
= {'expn':'',
443 self
.assertEqual(smtp
.esmtp_features
, expected_features
)
444 for k
in expected_features
:
445 self
.assertTrue(smtp
.has_extn(k
))
446 self
.assertFalse(smtp
.has_extn('unsupported-feature'))
450 smtp
= smtplib
.SMTP(HOST
, self
.port
, local_hostname
='localhost', timeout
=15)
452 for email
, name
in sim_users
.items():
453 expected_known
= (250, '%s %s' % (name
, smtplib
.quoteaddr(email
)))
454 self
.assertEqual(smtp
.vrfy(email
), expected_known
)
456 u
= 'nobody@nowhere.com'
457 expected_unknown
= (550, 'No such user: %s' % smtplib
.quoteaddr(u
))
458 self
.assertEqual(smtp
.vrfy(u
), expected_unknown
)
462 smtp
= smtplib
.SMTP(HOST
, self
.port
, local_hostname
='localhost', timeout
=15)
464 for listname
, members
in sim_lists
.items():
467 users
.append('%s %s' % (sim_users
[m
], smtplib
.quoteaddr(m
)))
468 expected_known
= (250, '\n'.join(users
))
469 self
.assertEqual(smtp
.expn(listname
), expected_known
)
471 u
= 'PSU-Members-List'
472 expected_unknown
= (550, 'No access for you!')
473 self
.assertEqual(smtp
.expn(u
), expected_unknown
)
def testAUTH_PLAIN(self):
    # Advertise AUTH PLAIN on the simulated server, log in with the
    # simulated credentials and expect the server's success reply.
    self.serv.add_feature("AUTH PLAIN")
    smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)

    user, password = sim_auth
    login_reply = smtp.login(user, password)
    self.assertEqual(login_reply, (235, b'plain auth ok'))
483 # SimSMTPChannel doesn't fully support LOGIN or CRAM-MD5 auth because they
484 # require a synchronous read to obtain the credentials...so instead smtpd
485 # sees the credential sent by smtplib's login method as an unknown command,
486 # which results in smtplib raising an auth error. Fortunately the error
487 # message contains the encoded credential, so we can partially check that it
488 # was generated correctly (partially, because the 'word' is uppercased in
489 # the error message).
def testAUTH_LOGIN(self):
    # Advertise AUTH LOGIN and attempt a login.  If the attempt is rejected
    # with SMTPAuthenticationError, the error text must contain the
    # encoded password that the client sent.
    self.serv.add_feature("AUTH LOGIN")
    smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
    try:
        smtp.login(sim_auth[0], sim_auth[1])
    except smtplib.SMTPAuthenticationError as err:
        # A string cannot be raised as an exception (that is itself a
        # TypeError), so report the mismatch through a real assertion.
        self.assertIn(sim_auth_login_password, str(err),
                      "expected encoded password not found in error message")
def testAUTH_CRAM_MD5(self):
    # Advertise AUTH CRAM-MD5 and attempt a login.  If the attempt is
    # rejected with SMTPAuthenticationError, the error text must contain
    # the encoded CRAM-MD5 credentials that the client sent.
    self.serv.add_feature("AUTH CRAM-MD5")
    smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)

    try:
        smtp.login(sim_auth[0], sim_auth[1])
    except smtplib.SMTPAuthenticationError as err:
        # A string cannot be raised as an exception (that is itself a
        # TypeError), so report the mismatch through a real assertion.
        self.assertIn(sim_auth_credentials['cram-md5'], str(err),
                      "expected encoded credentials not found in error message")
508 #TODO: add tests for correct AUTH method fallback now that the
509 #test infrastructure can support it.
512 def test_main(verbose
=None):
513 test_support
.run_unittest(GeneralTests
, DebuggingServerTests
,
515 BadHELOServerTests
, SMTPSimTests
)
517 if __name__
== '__main__':