11 from test
import test_support
21 class TestBase(unittest
.TestCase
):
23 def _check_sample(self
, msg
):
24 # Inspect a mailbox.Message representation of the sample message
25 self
.assertTrue(isinstance(msg
, email
.message
.Message
))
26 self
.assertTrue(isinstance(msg
, mailbox
.Message
))
27 for key
, value
in _sample_headers
.iteritems():
28 self
.assertTrue(value
in msg
.get_all(key
))
29 self
.assertTrue(msg
.is_multipart())
30 self
.assertTrue(len(msg
.get_payload()) == len(_sample_payloads
))
31 for i
, payload
in enumerate(_sample_payloads
):
32 part
= msg
.get_payload(i
)
33 self
.assertTrue(isinstance(part
, email
.message
.Message
))
34 self
.assertTrue(not isinstance(part
, mailbox
.Message
))
35 self
.assertTrue(part
.get_payload() == payload
)
37 def _delete_recursively(self
, target
):
38 # Delete a file or delete a directory recursively
39 if os
.path
.isdir(target
):
40 for path
, dirs
, files
in os
.walk(target
, topdown
=False):
42 os
.remove(os
.path
.join(path
, name
))
44 os
.rmdir(os
.path
.join(path
, name
))
46 elif os
.path
.exists(target
):
50 class TestMailbox(TestBase
):
52 _factory
= None # Overridden by subclasses to reuse tests
53 _template
= 'From: foo\n\n%s'
56 self
._path
= test_support
.TESTFN
57 self
._delete
_recursively
(self
._path
)
58 self
._box
= self
._factory
(self
._path
)
62 self
._delete
_recursively
(self
._path
)
65 # Add copies of a sample message
67 keys
.append(self
._box
.add(self
._template
% 0))
68 self
.assertTrue(len(self
._box
) == 1)
69 keys
.append(self
._box
.add(mailbox
.Message(_sample_message
)))
70 self
.assertTrue(len(self
._box
) == 2)
71 keys
.append(self
._box
.add(email
.message_from_string(_sample_message
)))
72 self
.assertTrue(len(self
._box
) == 3)
73 keys
.append(self
._box
.add(StringIO
.StringIO(_sample_message
)))
74 self
.assertTrue(len(self
._box
) == 4)
75 keys
.append(self
._box
.add(_sample_message
))
76 self
.assertTrue(len(self
._box
) == 5)
77 self
.assertTrue(self
._box
.get_string(keys
[0]) == self
._template
% 0)
78 for i
in (1, 2, 3, 4):
79 self
._check
_sample
(self
._box
[keys
[i
]])
81 def test_remove(self
):
82 # Remove messages using remove()
83 self
._test
_remove
_or
_delitem
(self
._box
.remove
)
85 def test_delitem(self
):
86 # Remove messages using __delitem__()
87 self
._test
_remove
_or
_delitem
(self
._box
.__delitem
__)
89 def _test_remove_or_delitem(self
, method
):
90 # (Used by test_remove() and test_delitem().)
91 key0
= self
._box
.add(self
._template
% 0)
92 key1
= self
._box
.add(self
._template
% 1)
93 self
.assertTrue(len(self
._box
) == 2)
96 self
.assertTrue(l
== 1, "actual l: %s" % l
)
97 self
.assertRaises(KeyError, lambda: self
._box
[key0
])
98 self
.assertRaises(KeyError, lambda: method(key0
))
99 self
.assertTrue(self
._box
.get_string(key1
) == self
._template
% 1)
100 key2
= self
._box
.add(self
._template
% 2)
101 self
.assertTrue(len(self
._box
) == 2)
104 self
.assertTrue(l
== 1, "actual l: %s" % l
)
105 self
.assertRaises(KeyError, lambda: self
._box
[key2
])
106 self
.assertRaises(KeyError, lambda: method(key2
))
107 self
.assertTrue(self
._box
.get_string(key1
) == self
._template
% 1)
109 self
.assertTrue(len(self
._box
) == 0)
110 self
.assertRaises(KeyError, lambda: self
._box
[key1
])
111 self
.assertRaises(KeyError, lambda: method(key1
))
113 def test_discard(self
, repetitions
=10):
115 key0
= self
._box
.add(self
._template
% 0)
116 key1
= self
._box
.add(self
._template
% 1)
117 self
.assertTrue(len(self
._box
) == 2)
118 self
._box
.discard(key0
)
119 self
.assertTrue(len(self
._box
) == 1)
120 self
.assertRaises(KeyError, lambda: self
._box
[key0
])
121 self
._box
.discard(key0
)
122 self
.assertTrue(len(self
._box
) == 1)
123 self
.assertRaises(KeyError, lambda: self
._box
[key0
])
126 # Retrieve messages using get()
127 key0
= self
._box
.add(self
._template
% 0)
128 msg
= self
._box
.get(key0
)
129 self
.assertTrue(msg
['from'] == 'foo')
130 self
.assertTrue(msg
.get_payload() == '0')
131 self
.assertTrue(self
._box
.get('foo') is None)
132 self
.assertTrue(self
._box
.get('foo', False) is False)
134 self
._box
= self
._factory
(self
._path
, factory
=rfc822
.Message
)
135 key1
= self
._box
.add(self
._template
% 1)
136 msg
= self
._box
.get(key1
)
137 self
.assertTrue(msg
['from'] == 'foo')
138 self
.assertTrue(msg
.fp
.read() == '1')
140 def test_getitem(self
):
141 # Retrieve message using __getitem__()
142 key0
= self
._box
.add(self
._template
% 0)
143 msg
= self
._box
[key0
]
144 self
.assertTrue(msg
['from'] == 'foo')
145 self
.assertTrue(msg
.get_payload() == '0')
146 self
.assertRaises(KeyError, lambda: self
._box
['foo'])
147 self
._box
.discard(key0
)
148 self
.assertRaises(KeyError, lambda: self
._box
[key0
])
150 def test_get_message(self
):
151 # Get Message representations of messages
152 key0
= self
._box
.add(self
._template
% 0)
153 key1
= self
._box
.add(_sample_message
)
154 msg0
= self
._box
.get_message(key0
)
155 self
.assertTrue(isinstance(msg0
, mailbox
.Message
))
156 self
.assertTrue(msg0
['from'] == 'foo')
157 self
.assertTrue(msg0
.get_payload() == '0')
158 self
._check
_sample
(self
._box
.get_message(key1
))
160 def test_get_string(self
):
161 # Get string representations of messages
162 key0
= self
._box
.add(self
._template
% 0)
163 key1
= self
._box
.add(_sample_message
)
164 self
.assertTrue(self
._box
.get_string(key0
) == self
._template
% 0)
165 self
.assertTrue(self
._box
.get_string(key1
) == _sample_message
)
167 def test_get_file(self
):
168 # Get file representations of messages
169 key0
= self
._box
.add(self
._template
% 0)
170 key1
= self
._box
.add(_sample_message
)
171 self
.assertTrue(self
._box
.get_file(key0
).read().replace(os
.linesep
, '\n')
172 == self
._template
% 0)
173 self
.assertTrue(self
._box
.get_file(key1
).read().replace(os
.linesep
, '\n')
176 def test_iterkeys(self
):
177 # Get keys using iterkeys()
178 self
._check
_iteration
(self
._box
.iterkeys
, do_keys
=True, do_values
=False)
181 # Get keys using keys()
182 self
._check
_iteration
(self
._box
.keys
, do_keys
=True, do_values
=False)
184 def test_itervalues(self
):
185 # Get values using itervalues()
186 self
._check
_iteration
(self
._box
.itervalues
, do_keys
=False,
190 # Get values using __iter__()
191 self
._check
_iteration
(self
._box
.__iter
__, do_keys
=False,
194 def test_values(self
):
195 # Get values using values()
196 self
._check
_iteration
(self
._box
.values
, do_keys
=False, do_values
=True)
198 def test_iteritems(self
):
199 # Get keys and values using iteritems()
200 self
._check
_iteration
(self
._box
.iteritems
, do_keys
=True,
203 def test_items(self
):
204 # Get keys and values using items()
205 self
._check
_iteration
(self
._box
.items
, do_keys
=True, do_values
=True)
207 def _check_iteration(self
, method
, do_keys
, do_values
, repetitions
=10):
208 for value
in method():
209 self
.fail("Not empty")
210 keys
, values
= [], []
211 for i
in xrange(repetitions
):
212 keys
.append(self
._box
.add(self
._template
% i
))
213 values
.append(self
._template
% i
)
214 if do_keys
and not do_values
:
215 returned_keys
= list(method())
216 elif do_values
and not do_keys
:
217 returned_values
= list(method())
219 returned_keys
, returned_values
= [], []
220 for key
, value
in method():
221 returned_keys
.append(key
)
222 returned_values
.append(value
)
224 self
.assertTrue(len(keys
) == len(returned_keys
))
225 self
.assertTrue(set(keys
) == set(returned_keys
))
228 for value
in returned_values
:
229 self
.assertTrue(value
['from'] == 'foo')
230 self
.assertTrue(int(value
.get_payload()) < repetitions
)
232 self
.assertTrue(len(values
) == count
)
234 def test_has_key(self
):
235 # Check existence of keys using has_key()
236 self
._test
_has
_key
_or
_contains
(self
._box
.has_key
)
238 def test_contains(self
):
239 # Check existence of keys using __contains__()
240 self
._test
_has
_key
_or
_contains
(self
._box
.__contains
__)
242 def _test_has_key_or_contains(self
, method
):
243 # (Used by test_has_key() and test_contains().)
244 self
.assertTrue(not method('foo'))
245 key0
= self
._box
.add(self
._template
% 0)
246 self
.assertTrue(method(key0
))
247 self
.assertTrue(not method('foo'))
248 key1
= self
._box
.add(self
._template
% 1)
249 self
.assertTrue(method(key1
))
250 self
.assertTrue(method(key0
))
251 self
.assertTrue(not method('foo'))
252 self
._box
.remove(key0
)
253 self
.assertTrue(not method(key0
))
254 self
.assertTrue(method(key1
))
255 self
.assertTrue(not method('foo'))
256 self
._box
.remove(key1
)
257 self
.assertTrue(not method(key1
))
258 self
.assertTrue(not method(key0
))
259 self
.assertTrue(not method('foo'))
261 def test_len(self
, repetitions
=10):
264 for i
in xrange(repetitions
):
265 self
.assertTrue(len(self
._box
) == i
)
266 keys
.append(self
._box
.add(self
._template
% i
))
267 self
.assertTrue(len(self
._box
) == i
+ 1)
268 for i
in xrange(repetitions
):
269 self
.assertTrue(len(self
._box
) == repetitions
- i
)
270 self
._box
.remove(keys
[i
])
271 self
.assertTrue(len(self
._box
) == repetitions
- i
- 1)
273 def test_set_item(self
):
274 # Modify messages using __setitem__()
275 key0
= self
._box
.add(self
._template
% 'original 0')
276 self
.assertTrue(self
._box
.get_string(key0
) == \
277 self
._template
% 'original 0')
278 key1
= self
._box
.add(self
._template
% 'original 1')
279 self
.assertTrue(self
._box
.get_string(key1
) == \
280 self
._template
% 'original 1')
281 self
._box
[key0
] = self
._template
% 'changed 0'
282 self
.assertTrue(self
._box
.get_string(key0
) == \
283 self
._template
% 'changed 0')
284 self
._box
[key1
] = self
._template
% 'changed 1'
285 self
.assertTrue(self
._box
.get_string(key1
) == \
286 self
._template
% 'changed 1')
287 self
._box
[key0
] = _sample_message
288 self
._check
_sample
(self
._box
[key0
])
289 self
._box
[key1
] = self
._box
[key0
]
290 self
._check
_sample
(self
._box
[key1
])
291 self
._box
[key0
] = self
._template
% 'original 0'
292 self
.assertTrue(self
._box
.get_string(key0
) ==
293 self
._template
% 'original 0')
294 self
._check
_sample
(self
._box
[key1
])
295 self
.assertRaises(KeyError,
296 lambda: self
._box
.__setitem
__('foo', 'bar'))
297 self
.assertRaises(KeyError, lambda: self
._box
['foo'])
298 self
.assertTrue(len(self
._box
) == 2)
300 def test_clear(self
, iterations
=10):
301 # Remove all messages using clear()
303 for i
in xrange(iterations
):
304 self
._box
.add(self
._template
% i
)
305 for i
, key
in enumerate(keys
):
306 self
.assertTrue(self
._box
.get_string(key
) == self
._template
% i
)
308 self
.assertTrue(len(self
._box
) == 0)
309 for i
, key
in enumerate(keys
):
310 self
.assertRaises(KeyError, lambda: self
._box
.get_string(key
))
313 # Get and remove a message using pop()
314 key0
= self
._box
.add(self
._template
% 0)
315 self
.assertTrue(key0
in self
._box
)
316 key1
= self
._box
.add(self
._template
% 1)
317 self
.assertTrue(key1
in self
._box
)
318 self
.assertTrue(self
._box
.pop(key0
).get_payload() == '0')
319 self
.assertTrue(key0
not in self
._box
)
320 self
.assertTrue(key1
in self
._box
)
321 key2
= self
._box
.add(self
._template
% 2)
322 self
.assertTrue(key2
in self
._box
)
323 self
.assertTrue(self
._box
.pop(key2
).get_payload() == '2')
324 self
.assertTrue(key2
not in self
._box
)
325 self
.assertTrue(key1
in self
._box
)
326 self
.assertTrue(self
._box
.pop(key1
).get_payload() == '1')
327 self
.assertTrue(key1
not in self
._box
)
328 self
.assertTrue(len(self
._box
) == 0)
330 def test_popitem(self
, iterations
=10):
331 # Get and remove an arbitrary (key, message) using popitem()
334 keys
.append(self
._box
.add(self
._template
% i
))
337 key
, msg
= self
._box
.popitem()
338 self
.assertTrue(key
in keys
)
339 self
.assertTrue(key
not in seen
)
341 self
.assertTrue(int(msg
.get_payload()) == keys
.index(key
))
342 self
.assertTrue(len(self
._box
) == 0)
344 self
.assertRaises(KeyError, lambda: self
._box
[key
])
346 def test_update(self
):
347 # Modify multiple messages using update()
348 key0
= self
._box
.add(self
._template
% 'original 0')
349 key1
= self
._box
.add(self
._template
% 'original 1')
350 key2
= self
._box
.add(self
._template
% 'original 2')
351 self
._box
.update({key0
: self
._template
% 'changed 0',
352 key2
: _sample_message
})
353 self
.assertTrue(len(self
._box
) == 3)
354 self
.assertTrue(self
._box
.get_string(key0
) ==
355 self
._template
% 'changed 0')
356 self
.assertTrue(self
._box
.get_string(key1
) ==
357 self
._template
% 'original 1')
358 self
._check
_sample
(self
._box
[key2
])
359 self
._box
.update([(key2
, self
._template
% 'changed 2'),
360 (key1
, self
._template
% 'changed 1'),
361 (key0
, self
._template
% 'original 0')])
362 self
.assertTrue(len(self
._box
) == 3)
363 self
.assertTrue(self
._box
.get_string(key0
) ==
364 self
._template
% 'original 0')
365 self
.assertTrue(self
._box
.get_string(key1
) ==
366 self
._template
% 'changed 1')
367 self
.assertTrue(self
._box
.get_string(key2
) ==
368 self
._template
% 'changed 2')
369 self
.assertRaises(KeyError,
370 lambda: self
._box
.update({'foo': 'bar',
371 key0
: self
._template
% "changed 0"}))
372 self
.assertTrue(len(self
._box
) == 3)
373 self
.assertTrue(self
._box
.get_string(key0
) ==
374 self
._template
% "changed 0")
375 self
.assertTrue(self
._box
.get_string(key1
) ==
376 self
._template
% "changed 1")
377 self
.assertTrue(self
._box
.get_string(key2
) ==
378 self
._template
% "changed 2")
380 def test_flush(self
):
381 # Write changes to disk
382 self
._test
_flush
_or
_close
(self
._box
.flush
, True)
384 def test_lock_unlock(self
):
385 # Lock and unlock the mailbox
386 self
.assertTrue(not os
.path
.exists(self
._get
_lock
_path
()))
388 self
.assertTrue(os
.path
.exists(self
._get
_lock
_path
()))
390 self
.assertTrue(not os
.path
.exists(self
._get
_lock
_path
()))
392 def test_close(self
):
393 # Close mailbox and flush changes to disk
394 self
._test
_flush
_or
_close
(self
._box
.close
, False)
396 def _test_flush_or_close(self
, method
, should_call_close
):
397 contents
= [self
._template
% i
for i
in xrange(3)]
398 self
._box
.add(contents
[0])
399 self
._box
.add(contents
[1])
400 self
._box
.add(contents
[2])
402 if should_call_close
:
404 self
._box
= self
._factory
(self
._path
)
405 keys
= self
._box
.keys()
406 self
.assertTrue(len(keys
) == 3)
408 self
.assertTrue(self
._box
.get_string(key
) in contents
)
410 def test_dump_message(self
):
411 # Write message representations to disk
412 for input in (email
.message_from_string(_sample_message
),
413 _sample_message
, StringIO
.StringIO(_sample_message
)):
414 output
= StringIO
.StringIO()
415 self
._box
._dump
_message
(input, output
)
416 self
.assertTrue(output
.getvalue() ==
417 _sample_message
.replace('\n', os
.linesep
))
418 output
= StringIO
.StringIO()
419 self
.assertRaises(TypeError,
420 lambda: self
._box
._dump
_message
(None, output
))
422 def _get_lock_path(self
):
423 # Return the path of the dot lock file. May be overridden.
424 return self
._path
+ '.lock'
427 class TestMailboxSuperclass(TestBase
):
429 def test_notimplemented(self
):
430 # Test that all Mailbox methods raise NotImplementedException.
431 box
= mailbox
.Mailbox('path')
432 self
.assertRaises(NotImplementedError, lambda: box
.add(''))
433 self
.assertRaises(NotImplementedError, lambda: box
.remove(''))
434 self
.assertRaises(NotImplementedError, lambda: box
.__delitem
__(''))
435 self
.assertRaises(NotImplementedError, lambda: box
.discard(''))
436 self
.assertRaises(NotImplementedError, lambda: box
.__setitem
__('', ''))
437 self
.assertRaises(NotImplementedError, lambda: box
.iterkeys())
438 self
.assertRaises(NotImplementedError, lambda: box
.keys())
439 self
.assertRaises(NotImplementedError, lambda: box
.itervalues().next())
440 self
.assertRaises(NotImplementedError, lambda: box
.__iter
__().next())
441 self
.assertRaises(NotImplementedError, lambda: box
.values())
442 self
.assertRaises(NotImplementedError, lambda: box
.iteritems().next())
443 self
.assertRaises(NotImplementedError, lambda: box
.items())
444 self
.assertRaises(NotImplementedError, lambda: box
.get(''))
445 self
.assertRaises(NotImplementedError, lambda: box
.__getitem
__(''))
446 self
.assertRaises(NotImplementedError, lambda: box
.get_message(''))
447 self
.assertRaises(NotImplementedError, lambda: box
.get_string(''))
448 self
.assertRaises(NotImplementedError, lambda: box
.get_file(''))
449 self
.assertRaises(NotImplementedError, lambda: box
.has_key(''))
450 self
.assertRaises(NotImplementedError, lambda: box
.__contains
__(''))
451 self
.assertRaises(NotImplementedError, lambda: box
.__len
__())
452 self
.assertRaises(NotImplementedError, lambda: box
.clear())
453 self
.assertRaises(NotImplementedError, lambda: box
.pop(''))
454 self
.assertRaises(NotImplementedError, lambda: box
.popitem())
455 self
.assertRaises(NotImplementedError, lambda: box
.update((('', ''),)))
456 self
.assertRaises(NotImplementedError, lambda: box
.flush())
457 self
.assertRaises(NotImplementedError, lambda: box
.lock())
458 self
.assertRaises(NotImplementedError, lambda: box
.unlock())
459 self
.assertRaises(NotImplementedError, lambda: box
.close())
462 class TestMaildir(TestMailbox
):
464 _factory
= lambda self
, path
, factory
=None: mailbox
.Maildir(path
, factory
)
467 TestMailbox
.setUp(self
)
468 if os
.name
in ('nt', 'os2') or sys
.platform
== 'cygwin':
469 self
._box
.colon
= '!'
471 def test_add_MM(self
):
472 # Add a MaildirMessage instance
473 msg
= mailbox
.MaildirMessage(self
._template
% 0)
474 msg
.set_subdir('cur')
476 key
= self
._box
.add(msg
)
477 self
.assertTrue(os
.path
.exists(os
.path
.join(self
._path
, 'cur', '%s%sfoo' %
478 (key
, self
._box
.colon
))))
480 def test_get_MM(self
):
481 # Get a MaildirMessage instance
482 msg
= mailbox
.MaildirMessage(self
._template
% 0)
483 msg
.set_subdir('cur')
485 key
= self
._box
.add(msg
)
486 msg_returned
= self
._box
.get_message(key
)
487 self
.assertTrue(isinstance(msg_returned
, mailbox
.MaildirMessage
))
488 self
.assertTrue(msg_returned
.get_subdir() == 'cur')
489 self
.assertTrue(msg_returned
.get_flags() == 'FR')
491 def test_set_MM(self
):
492 # Set with a MaildirMessage instance
493 msg0
= mailbox
.MaildirMessage(self
._template
% 0)
495 key
= self
._box
.add(msg0
)
496 msg_returned
= self
._box
.get_message(key
)
497 self
.assertTrue(msg_returned
.get_subdir() == 'new')
498 self
.assertTrue(msg_returned
.get_flags() == 'PT')
499 msg1
= mailbox
.MaildirMessage(self
._template
% 1)
500 self
._box
[key
] = msg1
501 msg_returned
= self
._box
.get_message(key
)
502 self
.assertTrue(msg_returned
.get_subdir() == 'new')
503 self
.assertTrue(msg_returned
.get_flags() == '')
504 self
.assertTrue(msg_returned
.get_payload() == '1')
505 msg2
= mailbox
.MaildirMessage(self
._template
% 2)
507 self
._box
[key
] = msg2
508 self
._box
[key
] = self
._template
% 3
509 msg_returned
= self
._box
.get_message(key
)
510 self
.assertTrue(msg_returned
.get_subdir() == 'new')
511 self
.assertTrue(msg_returned
.get_flags() == 'S')
512 self
.assertTrue(msg_returned
.get_payload() == '3')
514 def test_consistent_factory(self
):
516 msg
= mailbox
.MaildirMessage(self
._template
% 0)
517 msg
.set_subdir('cur')
519 key
= self
._box
.add(msg
)
521 # Create new mailbox with
522 class FakeMessage(mailbox
.MaildirMessage
):
524 box
= mailbox
.Maildir(self
._path
, factory
=FakeMessage
)
525 box
.colon
= self
._box
.colon
526 msg2
= box
.get_message(key
)
527 self
.assertTrue(isinstance(msg2
, FakeMessage
))
529 def test_initialize_new(self
):
530 # Initialize a non-existent mailbox
532 self
._box
= mailbox
.Maildir(self
._path
)
533 self
._check
_basics
(factory
=rfc822
.Message
)
534 self
._delete
_recursively
(self
._path
)
535 self
._box
= self
._factory
(self
._path
, factory
=None)
538 def test_initialize_existing(self
):
539 # Initialize an existing mailbox
541 for subdir
in '', 'tmp', 'new', 'cur':
542 os
.mkdir(os
.path
.normpath(os
.path
.join(self
._path
, subdir
)))
543 self
._box
= mailbox
.Maildir(self
._path
)
544 self
._check
_basics
(factory
=rfc822
.Message
)
545 self
._box
= mailbox
.Maildir(self
._path
, factory
=None)
548 def _check_basics(self
, factory
=None):
549 # (Used by test_open_new() and test_open_existing().)
550 self
.assertEqual(self
._box
._path
, os
.path
.abspath(self
._path
))
551 self
.assertEqual(self
._box
._factory
, factory
)
552 for subdir
in '', 'tmp', 'new', 'cur':
553 path
= os
.path
.join(self
._path
, subdir
)
554 mode
= os
.stat(path
)[stat
.ST_MODE
]
555 self
.assertTrue(stat
.S_ISDIR(mode
), "Not a directory: '%s'" % path
)
557 def test_list_folders(self
):
559 self
._box
.add_folder('one')
560 self
._box
.add_folder('two')
561 self
._box
.add_folder('three')
562 self
.assertTrue(len(self
._box
.list_folders()) == 3)
563 self
.assertTrue(set(self
._box
.list_folders()) ==
564 set(('one', 'two', 'three')))
566 def test_get_folder(self
):
568 self
._box
.add_folder('foo.bar')
569 folder0
= self
._box
.get_folder('foo.bar')
570 folder0
.add(self
._template
% 'bar')
571 self
.assertTrue(os
.path
.isdir(os
.path
.join(self
._path
, '.foo.bar')))
572 folder1
= self
._box
.get_folder('foo.bar')
573 self
.assertTrue(folder1
.get_string(folder1
.keys()[0]) == \
574 self
._template
% 'bar')
576 def test_add_and_remove_folders(self
):
578 self
._box
.add_folder('one')
579 self
._box
.add_folder('two')
580 self
.assertTrue(len(self
._box
.list_folders()) == 2)
581 self
.assertTrue(set(self
._box
.list_folders()) == set(('one', 'two')))
582 self
._box
.remove_folder('one')
583 self
.assertTrue(len(self
._box
.list_folders()) == 1)
584 self
.assertTrue(set(self
._box
.list_folders()) == set(('two',)))
585 self
._box
.add_folder('three')
586 self
.assertTrue(len(self
._box
.list_folders()) == 2)
587 self
.assertTrue(set(self
._box
.list_folders()) == set(('two', 'three')))
588 self
._box
.remove_folder('three')
589 self
.assertTrue(len(self
._box
.list_folders()) == 1)
590 self
.assertTrue(set(self
._box
.list_folders()) == set(('two',)))
591 self
._box
.remove_folder('two')
592 self
.assertTrue(len(self
._box
.list_folders()) == 0)
593 self
.assertTrue(self
._box
.list_folders() == [])
595 def test_clean(self
):
596 # Remove old files from 'tmp'
597 foo_path
= os
.path
.join(self
._path
, 'tmp', 'foo')
598 bar_path
= os
.path
.join(self
._path
, 'tmp', 'bar')
599 f
= open(foo_path
, 'w')
602 f
= open(bar_path
, 'w')
606 self
.assertTrue(os
.path
.exists(foo_path
))
607 self
.assertTrue(os
.path
.exists(bar_path
))
608 foo_stat
= os
.stat(foo_path
)
609 os
.utime(foo_path
, (time
.time() - 129600 - 2,
612 self
.assertTrue(not os
.path
.exists(foo_path
))
613 self
.assertTrue(os
.path
.exists(bar_path
))
615 def test_create_tmp(self
, repetitions
=10):
616 # Create files in tmp directory
617 hostname
= socket
.gethostname()
619 hostname
= hostname
.replace('/', r
'\057')
621 hostname
= hostname
.replace(':', r
'\072')
623 pattern
= re
.compile(r
"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)"
624 r
"Q(?P<Q>\d+)\.(?P<host>[^:/]+)")
625 previous_groups
= None
626 for x
in xrange(repetitions
):
627 tmp_file
= self
._box
._create
_tmp
()
628 head
, tail
= os
.path
.split(tmp_file
.name
)
629 self
.assertEqual(head
, os
.path
.abspath(os
.path
.join(self
._path
,
631 "File in wrong location: '%s'" % head
)
632 match
= pattern
.match(tail
)
633 self
.assertTrue(match
is not None, "Invalid file name: '%s'" % tail
)
634 groups
= match
.groups()
635 if previous_groups
is not None:
636 self
.assertTrue(int(groups
[0] >= previous_groups
[0]),
637 "Non-monotonic seconds: '%s' before '%s'" %
638 (previous_groups
[0], groups
[0]))
639 self
.assertTrue(int(groups
[1] >= previous_groups
[1]) or
640 groups
[0] != groups
[1],
641 "Non-monotonic milliseconds: '%s' before '%s'" %
642 (previous_groups
[1], groups
[1]))
643 self
.assertTrue(int(groups
[2]) == pid
,
644 "Process ID mismatch: '%s' should be '%s'" %
646 self
.assertTrue(int(groups
[3]) == int(previous_groups
[3]) + 1,
647 "Non-sequential counter: '%s' before '%s'" %
648 (previous_groups
[3], groups
[3]))
649 self
.assertTrue(groups
[4] == hostname
,
650 "Host name mismatch: '%s' should be '%s'" %
651 (groups
[4], hostname
))
652 previous_groups
= groups
653 tmp_file
.write(_sample_message
)
655 self
.assertTrue(tmp_file
.read() == _sample_message
)
657 file_count
= len(os
.listdir(os
.path
.join(self
._path
, "tmp")))
658 self
.assertTrue(file_count
== repetitions
,
659 "Wrong file count: '%s' should be '%s'" %
660 (file_count
, repetitions
))
662 def test_refresh(self
):
663 # Update the table of contents
664 self
.assertTrue(self
._box
._toc
== {})
665 key0
= self
._box
.add(self
._template
% 0)
666 key1
= self
._box
.add(self
._template
% 1)
667 self
.assertTrue(self
._box
._toc
== {})
669 self
.assertTrue(self
._box
._toc
== {key0
: os
.path
.join('new', key0
),
670 key1
: os
.path
.join('new', key1
)})
671 key2
= self
._box
.add(self
._template
% 2)
672 self
.assertTrue(self
._box
._toc
== {key0
: os
.path
.join('new', key0
),
673 key1
: os
.path
.join('new', key1
)})
675 self
.assertTrue(self
._box
._toc
== {key0
: os
.path
.join('new', key0
),
676 key1
: os
.path
.join('new', key1
),
677 key2
: os
.path
.join('new', key2
)})
679 def test_lookup(self
):
680 # Look up message subpaths in the TOC
681 self
.assertRaises(KeyError, lambda: self
._box
._lookup
('foo'))
682 key0
= self
._box
.add(self
._template
% 0)
683 self
.assertTrue(self
._box
._lookup
(key0
) == os
.path
.join('new', key0
))
684 os
.remove(os
.path
.join(self
._path
, 'new', key0
))
685 self
.assertTrue(self
._box
._toc
== {key0
: os
.path
.join('new', key0
)})
686 self
.assertRaises(KeyError, lambda: self
._box
._lookup
(key0
))
687 self
.assertTrue(self
._box
._toc
== {})
689 def test_lock_unlock(self
):
690 # Lock and unlock the mailbox. For Maildir, this does nothing.
694 def test_folder (self
):
695 # Test for bug #1569790: verify that folders returned by .get_folder()
696 # use the same factory function.
697 def dummy_factory (s
):
699 box
= self
._factory
(self
._path
, factory
=dummy_factory
)
700 folder
= box
.add_folder('folder1')
701 self
.assertTrue(folder
._factory
is dummy_factory
)
703 folder1_alias
= box
.get_folder('folder1')
704 self
.assertTrue(folder1_alias
._factory
is dummy_factory
)
706 def test_directory_in_folder (self
):
707 # Test that mailboxes still work if there's a stray extra directory
710 self
._box
.add(mailbox
.Message(_sample_message
))
712 # Create a stray directory
713 os
.mkdir(os
.path
.join(self
._path
, 'cur', 'stray-dir'))
715 # Check that looping still works with the directory present.
716 for msg
in self
._box
:
719 def test_file_permissions(self
):
720 # Verify that message files are created without execute permissions
721 if not hasattr(os
, "stat") or not hasattr(os
, "umask"):
723 msg
= mailbox
.MaildirMessage(self
._template
% 0)
724 orig_umask
= os
.umask(0)
726 key
= self
._box
.add(msg
)
729 path
= os
.path
.join(self
._path
, self
._box
._lookup
(key
))
730 mode
= os
.stat(path
).st_mode
731 self
.assertTrue(mode
& 0111 == 0)
733 def test_folder_file_perms(self
):
734 # From bug #3228, we want to verify that the file created inside a Maildir
735 # subfolder isn't marked as executable.
736 if not hasattr(os
, "stat") or not hasattr(os
, "umask"):
739 orig_umask
= os
.umask(0)
741 subfolder
= self
._box
.add_folder('subfolder')
745 path
= os
.path
.join(subfolder
._path
, 'maildirfolder')
748 self
.assertFalse((perms
& 0111)) # Execute bits should all be off.
750 def test_reread(self
):
754 # Initially, the mailbox has not been read and the time is null.
755 assert getattr(self
._box
, '_last_read', None) is None
757 # Refresh mailbox; the times should now be set to something.
759 assert getattr(self
._box
, '_last_read', None) is not None
761 # Try calling _refresh() again; the modification times shouldn't have
762 # changed, so the mailbox should not be re-reading. Re-reading causes
763 # the ._toc attribute to be assigned a new dictionary object, so
764 # we'll check that the ._toc attribute isn't a different object.
765 orig_toc
= self
._box
._toc
767 return self
._box
._toc
is not orig_toc
769 time
.sleep(1) # Wait 1sec to ensure time.time()'s value changes
771 assert not refreshed()
773 # Now, write something into cur and remove it. This changes
774 # the mtime and should cause a re-read.
775 filename
= os
.path
.join(self
._path
, 'cur', 'stray-file')
776 f
= open(filename
, 'w')
782 class _TestMboxMMDF(TestMailbox
):
786 self
._delete
_recursively
(self
._path
)
787 for lock_remnant
in glob
.glob(self
._path
+ '.*'):
788 test_support
.unlink(lock_remnant
)
790 def test_add_from_string(self
):
791 # Add a string starting with 'From ' to the mailbox
792 key
= self
._box
.add('From foo@bar blah\nFrom: foo\n\n0')
793 self
.assertTrue(self
._box
[key
].get_from() == 'foo@bar blah')
794 self
.assertTrue(self
._box
[key
].get_payload() == '0')
796 def test_add_mbox_or_mmdf_message(self
):
797 # Add an mboxMessage or MMDFMessage
798 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
799 msg
= class_('From foo@bar blah\nFrom: foo\n\n0')
800 key
= self
._box
.add(msg
)
802 def test_open_close_open(self
):
803 # Open and inspect previously-created mailbox
804 values
= [self
._template
% i
for i
in xrange(3)]
808 mtime
= os
.path
.getmtime(self
._path
)
809 self
._box
= self
._factory
(self
._path
)
810 self
.assertTrue(len(self
._box
) == 3)
811 for key
in self
._box
.iterkeys():
812 self
.assertTrue(self
._box
.get_string(key
) in values
)
814 self
.assertTrue(mtime
== os
.path
.getmtime(self
._path
))
816 def test_add_and_close(self
):
817 # Verifying that closing a mailbox doesn't change added items
818 self
._box
.add(_sample_message
)
820 self
._box
.add(self
._template
% i
)
821 self
._box
.add(_sample_message
)
822 self
._box
._file
.flush()
823 self
._box
._file
.seek(0)
824 contents
= self
._box
._file
.read()
826 self
.assertTrue(contents
== open(self
._path
, 'rb').read())
827 self
._box
= self
._factory
(self
._path
)
829 def test_lock_conflict(self
):
830 # Fork off a subprocess that will lock the file for 2 seconds,
831 # unlock it, and then exit.
832 if not hasattr(os
, 'fork'):
836 # In the child, lock the mailbox.
842 # In the parent, sleep a bit to give the child time to acquire
846 self
.assertRaises(mailbox
.ExternalClashError
,
849 # Wait for child to exit. Locking should now succeed.
850 exited_pid
, status
= os
.waitpid(pid
, 0)
855 def test_relock(self
):
856 # Test case for bug #1575506: the mailbox class was locking the
857 # wrong file object in its flush() method.
858 msg
= "Subject: sub\n\nbody\n"
859 key1
= self
._box
.add(msg
)
863 self
._box
= self
._factory
(self
._path
)
865 key2
= self
._box
.add(msg
)
867 self
.assertTrue(self
._box
._locked
)
871 class TestMbox(_TestMboxMMDF
):
873 _factory
= lambda self
, path
, factory
=None: mailbox
.mbox(path
, factory
)
875 def test_file_perms(self
):
876 # From bug #3228, we want to verify that the mailbox file isn't executable,
877 # even if the umask is set to something that would leave executable bits set.
878 # We only run this test on platforms that support umask.
879 if hasattr(os
, 'umask') and hasattr(os
, 'stat'):
881 old_umask
= os
.umask(0077)
883 os
.unlink(self
._path
)
884 self
._box
= mailbox
.mbox(self
._path
, create
=True)
890 st
= os
.stat(self
._path
)
892 self
.assertFalse((perms
& 0111)) # Execute bits should all be off.
894 class TestMMDF(_TestMboxMMDF
):
896 _factory
= lambda self
, path
, factory
=None: mailbox
.MMDF(path
, factory
)
899 class TestMH(TestMailbox
):
901 _factory
= lambda self
, path
, factory
=None: mailbox
.MH(path
, factory
)
903 def test_list_folders(self
):
905 self
._box
.add_folder('one')
906 self
._box
.add_folder('two')
907 self
._box
.add_folder('three')
908 self
.assertTrue(len(self
._box
.list_folders()) == 3)
909 self
.assertTrue(set(self
._box
.list_folders()) ==
910 set(('one', 'two', 'three')))
912 def test_get_folder(self
):
914 def dummy_factory (s
):
916 self
._box
= self
._factory
(self
._path
, dummy_factory
)
918 new_folder
= self
._box
.add_folder('foo.bar')
919 folder0
= self
._box
.get_folder('foo.bar')
920 folder0
.add(self
._template
% 'bar')
921 self
.assertTrue(os
.path
.isdir(os
.path
.join(self
._path
, 'foo.bar')))
922 folder1
= self
._box
.get_folder('foo.bar')
923 self
.assertTrue(folder1
.get_string(folder1
.keys()[0]) == \
924 self
._template
% 'bar')
926 # Test for bug #1569790: verify that folders returned by .get_folder()
927 # use the same factory function.
928 self
.assertTrue(new_folder
._factory
is self
._box
._factory
)
929 self
.assertTrue(folder0
._factory
is self
._box
._factory
)
931 def test_add_and_remove_folders(self
):
933 self
._box
.add_folder('one')
934 self
._box
.add_folder('two')
935 self
.assertTrue(len(self
._box
.list_folders()) == 2)
936 self
.assertTrue(set(self
._box
.list_folders()) == set(('one', 'two')))
937 self
._box
.remove_folder('one')
938 self
.assertTrue(len(self
._box
.list_folders()) == 1)
939 self
.assertTrue(set(self
._box
.list_folders()) == set(('two',)))
940 self
._box
.add_folder('three')
941 self
.assertTrue(len(self
._box
.list_folders()) == 2)
942 self
.assertTrue(set(self
._box
.list_folders()) == set(('two', 'three')))
943 self
._box
.remove_folder('three')
944 self
.assertTrue(len(self
._box
.list_folders()) == 1)
945 self
.assertTrue(set(self
._box
.list_folders()) == set(('two',)))
946 self
._box
.remove_folder('two')
947 self
.assertTrue(len(self
._box
.list_folders()) == 0)
948 self
.assertTrue(self
._box
.list_folders() == [])
950 def test_sequences(self
):
951 # Get and set sequences
952 self
.assertTrue(self
._box
.get_sequences() == {})
953 msg0
= mailbox
.MHMessage(self
._template
% 0)
954 msg0
.add_sequence('foo')
955 key0
= self
._box
.add(msg0
)
956 self
.assertTrue(self
._box
.get_sequences() == {'foo':[key0
]})
957 msg1
= mailbox
.MHMessage(self
._template
% 1)
958 msg1
.set_sequences(['bar', 'replied', 'foo'])
959 key1
= self
._box
.add(msg1
)
960 self
.assertTrue(self
._box
.get_sequences() ==
961 {'foo':[key0
, key1
], 'bar':[key1
], 'replied':[key1
]})
962 msg0
.set_sequences(['flagged'])
963 self
._box
[key0
] = msg0
964 self
.assertTrue(self
._box
.get_sequences() ==
965 {'foo':[key1
], 'bar':[key1
], 'replied':[key1
],
967 self
._box
.remove(key1
)
968 self
.assertTrue(self
._box
.get_sequences() == {'flagged':[key0
]})
970 def test_issue2625(self
):
971 msg0
= mailbox
.MHMessage(self
._template
% 0)
972 msg0
.add_sequence('foo')
973 key0
= self
._box
.add(msg0
)
974 refmsg0
= self
._box
.get_message(key0
)
977 # Pack the contents of the mailbox
978 msg0
= mailbox
.MHMessage(self
._template
% 0)
979 msg1
= mailbox
.MHMessage(self
._template
% 1)
980 msg2
= mailbox
.MHMessage(self
._template
% 2)
981 msg3
= mailbox
.MHMessage(self
._template
% 3)
982 msg0
.set_sequences(['foo', 'unseen'])
983 msg1
.set_sequences(['foo'])
984 msg2
.set_sequences(['foo', 'flagged'])
985 msg3
.set_sequences(['foo', 'bar', 'replied'])
986 key0
= self
._box
.add(msg0
)
987 key1
= self
._box
.add(msg1
)
988 key2
= self
._box
.add(msg2
)
989 key3
= self
._box
.add(msg3
)
990 self
.assertTrue(self
._box
.get_sequences() ==
991 {'foo':[key0
,key1
,key2
,key3
], 'unseen':[key0
],
992 'flagged':[key2
], 'bar':[key3
], 'replied':[key3
]})
993 self
._box
.remove(key2
)
994 self
.assertTrue(self
._box
.get_sequences() ==
995 {'foo':[key0
,key1
,key3
], 'unseen':[key0
], 'bar':[key3
],
998 self
.assertTrue(self
._box
.keys() == [1, 2, 3])
1002 self
.assertTrue(self
._box
.get_sequences() ==
1003 {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]})
1005 # Test case for packing while holding the mailbox locked.
1006 key0
= self
._box
.add(msg1
)
1007 key1
= self
._box
.add(msg1
)
1008 key2
= self
._box
.add(msg1
)
1009 key3
= self
._box
.add(msg1
)
1011 self
._box
.remove(key0
)
1012 self
._box
.remove(key2
)
1016 self
.assertTrue(self
._box
.get_sequences() ==
1017 {'foo':[1, 2, 3, 4, 5],
1018 'unseen':[1], 'bar':[3], 'replied':[3]})
1020 def _get_lock_path(self
):
1021 return os
.path
.join(self
._path
, '.mh_sequences.lock')
1024 class TestBabyl(TestMailbox
):
1026 _factory
= lambda self
, path
, factory
=None: mailbox
.Babyl(path
, factory
)
1030 self
._delete
_recursively
(self
._path
)
1031 for lock_remnant
in glob
.glob(self
._path
+ '.*'):
1032 test_support
.unlink(lock_remnant
)
1034 def test_labels(self
):
1035 # Get labels from the mailbox
1036 self
.assertTrue(self
._box
.get_labels() == [])
1037 msg0
= mailbox
.BabylMessage(self
._template
% 0)
1038 msg0
.add_label('foo')
1039 key0
= self
._box
.add(msg0
)
1040 self
.assertTrue(self
._box
.get_labels() == ['foo'])
1041 msg1
= mailbox
.BabylMessage(self
._template
% 1)
1042 msg1
.set_labels(['bar', 'answered', 'foo'])
1043 key1
= self
._box
.add(msg1
)
1044 self
.assertTrue(set(self
._box
.get_labels()) == set(['foo', 'bar']))
1045 msg0
.set_labels(['blah', 'filed'])
1046 self
._box
[key0
] = msg0
1047 self
.assertTrue(set(self
._box
.get_labels()) ==
1048 set(['foo', 'bar', 'blah']))
1049 self
._box
.remove(key1
)
1050 self
.assertTrue(set(self
._box
.get_labels()) == set(['blah']))
1053 class TestMessage(TestBase
):
1055 _factory
= mailbox
.Message
# Overridden by subclasses to reuse tests
1058 self
._path
= test_support
.TESTFN
1061 self
._delete
_recursively
(self
._path
)
1063 def test_initialize_with_eMM(self
):
1064 # Initialize based on email.message.Message instance
1065 eMM
= email
.message_from_string(_sample_message
)
1066 msg
= self
._factory
(eMM
)
1067 self
._post
_initialize
_hook
(msg
)
1068 self
._check
_sample
(msg
)
1070 def test_initialize_with_string(self
):
1071 # Initialize based on string
1072 msg
= self
._factory
(_sample_message
)
1073 self
._post
_initialize
_hook
(msg
)
1074 self
._check
_sample
(msg
)
1076 def test_initialize_with_file(self
):
1077 # Initialize based on contents of file
1078 f
= open(self
._path
, 'w+')
1079 f
.write(_sample_message
)
1081 msg
= self
._factory
(f
)
1082 self
._post
_initialize
_hook
(msg
)
1083 self
._check
_sample
(msg
)
1086 def test_initialize_with_nothing(self
):
1087 # Initialize without arguments
1088 msg
= self
._factory
()
1089 self
._post
_initialize
_hook
(msg
)
1090 self
.assertTrue(isinstance(msg
, email
.message
.Message
))
1091 self
.assertTrue(isinstance(msg
, mailbox
.Message
))
1092 self
.assertTrue(isinstance(msg
, self
._factory
))
1093 self
.assertTrue(msg
.keys() == [])
1094 self
.assertTrue(not msg
.is_multipart())
1095 self
.assertTrue(msg
.get_payload() == None)
1097 def test_initialize_incorrectly(self
):
1098 # Initialize with invalid argument
1099 self
.assertRaises(TypeError, lambda: self
._factory
(object()))
1101 def test_become_message(self
):
1102 # Take on the state of another message
1103 eMM
= email
.message_from_string(_sample_message
)
1104 msg
= self
._factory
()
1105 msg
._become
_message
(eMM
)
1106 self
._check
_sample
(msg
)
1108 def test_explain_to(self
):
1109 # Copy self's format-specific data to other message formats.
1110 # This test is superficial; better ones are in TestMessageConversion.
1111 msg
= self
._factory
()
1112 for class_
in (mailbox
.Message
, mailbox
.MaildirMessage
,
1113 mailbox
.mboxMessage
, mailbox
.MHMessage
,
1114 mailbox
.BabylMessage
, mailbox
.MMDFMessage
):
1115 other_msg
= class_()
1116 msg
._explain
_to
(other_msg
)
1117 other_msg
= email
.message
.Message()
1118 self
.assertRaises(TypeError, lambda: msg
._explain
_to
(other_msg
))
1120 def _post_initialize_hook(self
, msg
):
1121 # Overridden by subclasses to check extra things after initialization
1125 class TestMaildirMessage(TestMessage
):
1127 _factory
= mailbox
.MaildirMessage
1129 def _post_initialize_hook(self
, msg
):
1130 self
.assertTrue(msg
._subdir
== 'new')
1131 self
.assertTrue(msg
._info
== '')
1133 def test_subdir(self
):
1134 # Use get_subdir() and set_subdir()
1135 msg
= mailbox
.MaildirMessage(_sample_message
)
1136 self
.assertTrue(msg
.get_subdir() == 'new')
1137 msg
.set_subdir('cur')
1138 self
.assertTrue(msg
.get_subdir() == 'cur')
1139 msg
.set_subdir('new')
1140 self
.assertTrue(msg
.get_subdir() == 'new')
1141 self
.assertRaises(ValueError, lambda: msg
.set_subdir('tmp'))
1142 self
.assertTrue(msg
.get_subdir() == 'new')
1143 msg
.set_subdir('new')
1144 self
.assertTrue(msg
.get_subdir() == 'new')
1145 self
._check
_sample
(msg
)
1147 def test_flags(self
):
1148 # Use get_flags(), set_flags(), add_flag(), remove_flag()
1149 msg
= mailbox
.MaildirMessage(_sample_message
)
1150 self
.assertTrue(msg
.get_flags() == '')
1151 self
.assertTrue(msg
.get_subdir() == 'new')
1153 self
.assertTrue(msg
.get_subdir() == 'new')
1154 self
.assertTrue(msg
.get_flags() == 'F')
1155 msg
.set_flags('SDTP')
1156 self
.assertTrue(msg
.get_flags() == 'DPST')
1158 self
.assertTrue(msg
.get_flags() == 'DFPST')
1159 msg
.remove_flag('TDRP')
1160 self
.assertTrue(msg
.get_flags() == 'FS')
1161 self
.assertTrue(msg
.get_subdir() == 'new')
1162 self
._check
_sample
(msg
)
1164 def test_date(self
):
1165 # Use get_date() and set_date()
1166 msg
= mailbox
.MaildirMessage(_sample_message
)
1167 self
.assertTrue(abs(msg
.get_date() - time
.time()) < 60)
1169 self
.assertTrue(msg
.get_date() == 0.0)
1171 def test_info(self
):
1172 # Use get_info() and set_info()
1173 msg
= mailbox
.MaildirMessage(_sample_message
)
1174 self
.assertTrue(msg
.get_info() == '')
1175 msg
.set_info('1,foo=bar')
1176 self
.assertTrue(msg
.get_info() == '1,foo=bar')
1177 self
.assertRaises(TypeError, lambda: msg
.set_info(None))
1178 self
._check
_sample
(msg
)
1180 def test_info_and_flags(self
):
1181 # Test interaction of info and flag methods
1182 msg
= mailbox
.MaildirMessage(_sample_message
)
1183 self
.assertTrue(msg
.get_info() == '')
1185 self
.assertTrue(msg
.get_flags() == 'FS')
1186 self
.assertTrue(msg
.get_info() == '2,FS')
1188 self
.assertTrue(msg
.get_flags() == '')
1189 self
.assertTrue(msg
.get_info() == '1,')
1190 msg
.remove_flag('RPT')
1191 self
.assertTrue(msg
.get_flags() == '')
1192 self
.assertTrue(msg
.get_info() == '1,')
1194 self
.assertTrue(msg
.get_flags() == 'D')
1195 self
.assertTrue(msg
.get_info() == '2,D')
1196 self
._check
_sample
(msg
)
1199 class _TestMboxMMDFMessage(TestMessage
):
1201 _factory
= mailbox
._mboxMMDFMessage
1203 def _post_initialize_hook(self
, msg
):
1204 self
._check
_from
(msg
)
1206 def test_initialize_with_unixfrom(self
):
1207 # Initialize with a message that already has a _unixfrom attribute
1208 msg
= mailbox
.Message(_sample_message
)
1209 msg
.set_unixfrom('From foo@bar blah')
1210 msg
= mailbox
.mboxMessage(msg
)
1211 self
.assertTrue(msg
.get_from() == 'foo@bar blah', msg
.get_from())
1213 def test_from(self
):
1214 # Get and set "From " line
1215 msg
= mailbox
.mboxMessage(_sample_message
)
1216 self
._check
_from
(msg
)
1217 msg
.set_from('foo bar')
1218 self
.assertTrue(msg
.get_from() == 'foo bar')
1219 msg
.set_from('foo@bar', True)
1220 self
._check
_from
(msg
, 'foo@bar')
1221 msg
.set_from('blah@temp', time
.localtime())
1222 self
._check
_from
(msg
, 'blah@temp')
1224 def test_flags(self
):
1225 # Use get_flags(), set_flags(), add_flag(), remove_flag()
1226 msg
= mailbox
.mboxMessage(_sample_message
)
1227 self
.assertTrue(msg
.get_flags() == '')
1229 self
.assertTrue(msg
.get_flags() == 'F')
1230 msg
.set_flags('XODR')
1231 self
.assertTrue(msg
.get_flags() == 'RODX')
1233 self
.assertTrue(msg
.get_flags() == 'RODFAX')
1234 msg
.remove_flag('FDXA')
1235 self
.assertTrue(msg
.get_flags() == 'RO')
1236 self
._check
_sample
(msg
)
1238 def _check_from(self
, msg
, sender
=None):
1239 # Check contents of "From " line
1241 sender
= "MAILER-DAEMON"
1242 self
.assertTrue(re
.match(sender
+ r
" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:"
1243 r
"\d{2} \d{4}", msg
.get_from()) is not None)
1246 class TestMboxMessage(_TestMboxMMDFMessage
):
1248 _factory
= mailbox
.mboxMessage
1251 class TestMHMessage(TestMessage
):
1253 _factory
= mailbox
.MHMessage
1255 def _post_initialize_hook(self
, msg
):
1256 self
.assertTrue(msg
._sequences
== [])
1258 def test_sequences(self
):
1259 # Get, set, join, and leave sequences
1260 msg
= mailbox
.MHMessage(_sample_message
)
1261 self
.assertTrue(msg
.get_sequences() == [])
1262 msg
.set_sequences(['foobar'])
1263 self
.assertTrue(msg
.get_sequences() == ['foobar'])
1264 msg
.set_sequences([])
1265 self
.assertTrue(msg
.get_sequences() == [])
1266 msg
.add_sequence('unseen')
1267 self
.assertTrue(msg
.get_sequences() == ['unseen'])
1268 msg
.add_sequence('flagged')
1269 self
.assertTrue(msg
.get_sequences() == ['unseen', 'flagged'])
1270 msg
.add_sequence('flagged')
1271 self
.assertTrue(msg
.get_sequences() == ['unseen', 'flagged'])
1272 msg
.remove_sequence('unseen')
1273 self
.assertTrue(msg
.get_sequences() == ['flagged'])
1274 msg
.add_sequence('foobar')
1275 self
.assertTrue(msg
.get_sequences() == ['flagged', 'foobar'])
1276 msg
.remove_sequence('replied')
1277 self
.assertTrue(msg
.get_sequences() == ['flagged', 'foobar'])
1278 msg
.set_sequences(['foobar', 'replied'])
1279 self
.assertTrue(msg
.get_sequences() == ['foobar', 'replied'])
1282 class TestBabylMessage(TestMessage
):
1284 _factory
= mailbox
.BabylMessage
1286 def _post_initialize_hook(self
, msg
):
1287 self
.assertTrue(msg
._labels
== [])
1289 def test_labels(self
):
1290 # Get, set, join, and leave labels
1291 msg
= mailbox
.BabylMessage(_sample_message
)
1292 self
.assertTrue(msg
.get_labels() == [])
1293 msg
.set_labels(['foobar'])
1294 self
.assertTrue(msg
.get_labels() == ['foobar'])
1296 self
.assertTrue(msg
.get_labels() == [])
1297 msg
.add_label('filed')
1298 self
.assertTrue(msg
.get_labels() == ['filed'])
1299 msg
.add_label('resent')
1300 self
.assertTrue(msg
.get_labels() == ['filed', 'resent'])
1301 msg
.add_label('resent')
1302 self
.assertTrue(msg
.get_labels() == ['filed', 'resent'])
1303 msg
.remove_label('filed')
1304 self
.assertTrue(msg
.get_labels() == ['resent'])
1305 msg
.add_label('foobar')
1306 self
.assertTrue(msg
.get_labels() == ['resent', 'foobar'])
1307 msg
.remove_label('unseen')
1308 self
.assertTrue(msg
.get_labels() == ['resent', 'foobar'])
1309 msg
.set_labels(['foobar', 'answered'])
1310 self
.assertTrue(msg
.get_labels() == ['foobar', 'answered'])
1312 def test_visible(self
):
1313 # Get, set, and update visible headers
1314 msg
= mailbox
.BabylMessage(_sample_message
)
1315 visible
= msg
.get_visible()
1316 self
.assertTrue(visible
.keys() == [])
1317 self
.assertTrue(visible
.get_payload() is None)
1318 visible
['User-Agent'] = 'FooBar 1.0'
1319 visible
['X-Whatever'] = 'Blah'
1320 self
.assertTrue(msg
.get_visible().keys() == [])
1321 msg
.set_visible(visible
)
1322 visible
= msg
.get_visible()
1323 self
.assertTrue(visible
.keys() == ['User-Agent', 'X-Whatever'])
1324 self
.assertTrue(visible
['User-Agent'] == 'FooBar 1.0')
1325 self
.assertTrue(visible
['X-Whatever'] == 'Blah')
1326 self
.assertTrue(visible
.get_payload() is None)
1327 msg
.update_visible()
1328 self
.assertTrue(visible
.keys() == ['User-Agent', 'X-Whatever'])
1329 self
.assertTrue(visible
.get_payload() is None)
1330 visible
= msg
.get_visible()
1331 self
.assertTrue(visible
.keys() == ['User-Agent', 'Date', 'From', 'To',
1333 for header
in ('User-Agent', 'Date', 'From', 'To', 'Subject'):
1334 self
.assertTrue(visible
[header
] == msg
[header
])
1337 class TestMMDFMessage(_TestMboxMMDFMessage
):
1339 _factory
= mailbox
.MMDFMessage
1342 class TestMessageConversion(TestBase
):
1344 def test_plain_to_x(self
):
1345 # Convert Message to all formats
1346 for class_
in (mailbox
.Message
, mailbox
.MaildirMessage
,
1347 mailbox
.mboxMessage
, mailbox
.MHMessage
,
1348 mailbox
.BabylMessage
, mailbox
.MMDFMessage
):
1349 msg_plain
= mailbox
.Message(_sample_message
)
1350 msg
= class_(msg_plain
)
1351 self
._check
_sample
(msg
)
1353 def test_x_to_plain(self
):
1354 # Convert all formats to Message
1355 for class_
in (mailbox
.Message
, mailbox
.MaildirMessage
,
1356 mailbox
.mboxMessage
, mailbox
.MHMessage
,
1357 mailbox
.BabylMessage
, mailbox
.MMDFMessage
):
1358 msg
= class_(_sample_message
)
1359 msg_plain
= mailbox
.Message(msg
)
1360 self
._check
_sample
(msg_plain
)
1362 def test_x_to_invalid(self
):
1363 # Convert all formats to an invalid format
1364 for class_
in (mailbox
.Message
, mailbox
.MaildirMessage
,
1365 mailbox
.mboxMessage
, mailbox
.MHMessage
,
1366 mailbox
.BabylMessage
, mailbox
.MMDFMessage
):
1367 self
.assertRaises(TypeError, lambda: class_(False))
1369 def test_maildir_to_maildir(self
):
1370 # Convert MaildirMessage to MaildirMessage
1371 msg_maildir
= mailbox
.MaildirMessage(_sample_message
)
1372 msg_maildir
.set_flags('DFPRST')
1373 msg_maildir
.set_subdir('cur')
1374 date
= msg_maildir
.get_date()
1375 msg
= mailbox
.MaildirMessage(msg_maildir
)
1376 self
._check
_sample
(msg
)
1377 self
.assertTrue(msg
.get_flags() == 'DFPRST')
1378 self
.assertTrue(msg
.get_subdir() == 'cur')
1379 self
.assertTrue(msg
.get_date() == date
)
1381 def test_maildir_to_mboxmmdf(self
):
1382 # Convert MaildirMessage to mboxmessage and MMDFMessage
1383 pairs
= (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'),
1384 ('T', 'D'), ('DFPRST', 'RDFA'))
1385 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1386 msg_maildir
= mailbox
.MaildirMessage(_sample_message
)
1387 msg_maildir
.set_date(0.0)
1388 for setting
, result
in pairs
:
1389 msg_maildir
.set_flags(setting
)
1390 msg
= class_(msg_maildir
)
1391 self
.assertTrue(msg
.get_flags() == result
)
1392 self
.assertTrue(msg
.get_from() == 'MAILER-DAEMON %s' %
1393 time
.asctime(time
.gmtime(0.0)))
1394 msg_maildir
.set_subdir('cur')
1395 self
.assertTrue(class_(msg_maildir
).get_flags() == 'RODFA')
1397 def test_maildir_to_mh(self
):
1398 # Convert MaildirMessage to MHMessage
1399 msg_maildir
= mailbox
.MaildirMessage(_sample_message
)
1400 pairs
= (('D', ['unseen']), ('F', ['unseen', 'flagged']),
1401 ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []),
1402 ('T', ['unseen']), ('DFPRST', ['replied', 'flagged']))
1403 for setting
, result
in pairs
:
1404 msg_maildir
.set_flags(setting
)
1405 self
.assertTrue(mailbox
.MHMessage(msg_maildir
).get_sequences() == \
1408 def test_maildir_to_babyl(self
):
1409 # Convert MaildirMessage to Babyl
1410 msg_maildir
= mailbox
.MaildirMessage(_sample_message
)
1411 pairs
= (('D', ['unseen']), ('F', ['unseen']),
1412 ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']),
1413 ('S', []), ('T', ['unseen', 'deleted']),
1414 ('DFPRST', ['deleted', 'answered', 'forwarded']))
1415 for setting
, result
in pairs
:
1416 msg_maildir
.set_flags(setting
)
1417 self
.assertTrue(mailbox
.BabylMessage(msg_maildir
).get_labels() == \
1420 def test_mboxmmdf_to_maildir(self
):
1421 # Convert mboxMessage and MMDFMessage to MaildirMessage
1422 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1423 msg_mboxMMDF
= class_(_sample_message
)
1424 msg_mboxMMDF
.set_from('foo@bar', time
.gmtime(0.0))
1425 pairs
= (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'),
1427 for setting
, result
in pairs
:
1428 msg_mboxMMDF
.set_flags(setting
)
1429 msg
= mailbox
.MaildirMessage(msg_mboxMMDF
)
1430 self
.assertTrue(msg
.get_flags() == result
)
1431 self
.assertTrue(msg
.get_date() == 0.0, msg
.get_date())
1432 msg_mboxMMDF
.set_flags('O')
1433 self
.assertTrue(mailbox
.MaildirMessage(msg_mboxMMDF
).get_subdir() == \
1436 def test_mboxmmdf_to_mboxmmdf(self
):
1437 # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage
1438 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1439 msg_mboxMMDF
= class_(_sample_message
)
1440 msg_mboxMMDF
.set_flags('RODFA')
1441 msg_mboxMMDF
.set_from('foo@bar')
1442 for class2_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1443 msg2
= class2_(msg_mboxMMDF
)
1444 self
.assertTrue(msg2
.get_flags() == 'RODFA')
1445 self
.assertTrue(msg2
.get_from() == 'foo@bar')
1447 def test_mboxmmdf_to_mh(self
):
1448 # Convert mboxMessage and MMDFMessage to MHMessage
1449 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1450 msg_mboxMMDF
= class_(_sample_message
)
1451 pairs
= (('R', []), ('O', ['unseen']), ('D', ['unseen']),
1452 ('F', ['unseen', 'flagged']),
1453 ('A', ['unseen', 'replied']),
1454 ('RODFA', ['replied', 'flagged']))
1455 for setting
, result
in pairs
:
1456 msg_mboxMMDF
.set_flags(setting
)
1457 self
.assertTrue(mailbox
.MHMessage(msg_mboxMMDF
).get_sequences() \
1460 def test_mboxmmdf_to_babyl(self
):
1461 # Convert mboxMessage and MMDFMessage to BabylMessage
1462 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1463 msg
= class_(_sample_message
)
1464 pairs
= (('R', []), ('O', ['unseen']),
1465 ('D', ['unseen', 'deleted']), ('F', ['unseen']),
1466 ('A', ['unseen', 'answered']),
1467 ('RODFA', ['deleted', 'answered']))
1468 for setting
, result
in pairs
:
1469 msg
.set_flags(setting
)
1470 self
.assertTrue(mailbox
.BabylMessage(msg
).get_labels() == result
)
1472 def test_mh_to_maildir(self
):
1473 # Convert MHMessage to MaildirMessage
1474 pairs
= (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS'))
1475 for setting
, result
in pairs
:
1476 msg
= mailbox
.MHMessage(_sample_message
)
1477 msg
.add_sequence(setting
)
1478 self
.assertTrue(mailbox
.MaildirMessage(msg
).get_flags() == result
)
1479 self
.assertTrue(mailbox
.MaildirMessage(msg
).get_subdir() == 'cur')
1480 msg
= mailbox
.MHMessage(_sample_message
)
1481 msg
.add_sequence('unseen')
1482 msg
.add_sequence('replied')
1483 msg
.add_sequence('flagged')
1484 self
.assertTrue(mailbox
.MaildirMessage(msg
).get_flags() == 'FR')
1485 self
.assertTrue(mailbox
.MaildirMessage(msg
).get_subdir() == 'cur')
1487 def test_mh_to_mboxmmdf(self
):
1488 # Convert MHMessage to mboxMessage and MMDFMessage
1489 pairs
= (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF'))
1490 for setting
, result
in pairs
:
1491 msg
= mailbox
.MHMessage(_sample_message
)
1492 msg
.add_sequence(setting
)
1493 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1494 self
.assertTrue(class_(msg
).get_flags() == result
)
1495 msg
= mailbox
.MHMessage(_sample_message
)
1496 msg
.add_sequence('unseen')
1497 msg
.add_sequence('replied')
1498 msg
.add_sequence('flagged')
1499 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1500 self
.assertTrue(class_(msg
).get_flags() == 'OFA')
1502 def test_mh_to_mh(self
):
1503 # Convert MHMessage to MHMessage
1504 msg
= mailbox
.MHMessage(_sample_message
)
1505 msg
.add_sequence('unseen')
1506 msg
.add_sequence('replied')
1507 msg
.add_sequence('flagged')
1508 self
.assertTrue(mailbox
.MHMessage(msg
).get_sequences() == \
1509 ['unseen', 'replied', 'flagged'])
1511 def test_mh_to_babyl(self
):
1512 # Convert MHMessage to BabylMessage
1513 pairs
= (('unseen', ['unseen']), ('replied', ['answered']),
1515 for setting
, result
in pairs
:
1516 msg
= mailbox
.MHMessage(_sample_message
)
1517 msg
.add_sequence(setting
)
1518 self
.assertTrue(mailbox
.BabylMessage(msg
).get_labels() == result
)
1519 msg
= mailbox
.MHMessage(_sample_message
)
1520 msg
.add_sequence('unseen')
1521 msg
.add_sequence('replied')
1522 msg
.add_sequence('flagged')
1523 self
.assertTrue(mailbox
.BabylMessage(msg
).get_labels() == \
1524 ['unseen', 'answered'])
1526 def test_babyl_to_maildir(self
):
1527 # Convert BabylMessage to MaildirMessage
1528 pairs
= (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'),
1529 ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'),
1531 for setting
, result
in pairs
:
1532 msg
= mailbox
.BabylMessage(_sample_message
)
1533 msg
.add_label(setting
)
1534 self
.assertTrue(mailbox
.MaildirMessage(msg
).get_flags() == result
)
1535 self
.assertTrue(mailbox
.MaildirMessage(msg
).get_subdir() == 'cur')
1536 msg
= mailbox
.BabylMessage(_sample_message
)
1537 for label
in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
1538 'edited', 'resent'):
1539 msg
.add_label(label
)
1540 self
.assertTrue(mailbox
.MaildirMessage(msg
).get_flags() == 'PRT')
1541 self
.assertTrue(mailbox
.MaildirMessage(msg
).get_subdir() == 'cur')
1543 def test_babyl_to_mboxmmdf(self
):
1544 # Convert BabylMessage to mboxMessage and MMDFMessage
1545 pairs
= (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'),
1546 ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'),
1548 for setting
, result
in pairs
:
1549 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1550 msg
= mailbox
.BabylMessage(_sample_message
)
1551 msg
.add_label(setting
)
1552 self
.assertTrue(class_(msg
).get_flags() == result
)
1553 msg
= mailbox
.BabylMessage(_sample_message
)
1554 for label
in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
1555 'edited', 'resent'):
1556 msg
.add_label(label
)
1557 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1558 self
.assertTrue(class_(msg
).get_flags() == 'ODA')
1560 def test_babyl_to_mh(self
):
1561 # Convert BabylMessage to MHMessage
1562 pairs
= (('unseen', ['unseen']), ('deleted', []), ('filed', []),
1563 ('answered', ['replied']), ('forwarded', []), ('edited', []),
1565 for setting
, result
in pairs
:
1566 msg
= mailbox
.BabylMessage(_sample_message
)
1567 msg
.add_label(setting
)
1568 self
.assertTrue(mailbox
.MHMessage(msg
).get_sequences() == result
)
1569 msg
= mailbox
.BabylMessage(_sample_message
)
1570 for label
in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
1571 'edited', 'resent'):
1572 msg
.add_label(label
)
1573 self
.assertTrue(mailbox
.MHMessage(msg
).get_sequences() == \
1574 ['unseen', 'replied'])
1576 def test_babyl_to_babyl(self
):
1577 # Convert BabylMessage to BabylMessage
1578 msg
= mailbox
.BabylMessage(_sample_message
)
1579 msg
.update_visible()
1580 for label
in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
1581 'edited', 'resent'):
1582 msg
.add_label(label
)
1583 msg2
= mailbox
.BabylMessage(msg
)
1584 self
.assertTrue(msg2
.get_labels() == ['unseen', 'deleted', 'filed',
1585 'answered', 'forwarded', 'edited',
1587 self
.assertTrue(msg
.get_visible().keys() == msg2
.get_visible().keys())
1588 for key
in msg
.get_visible().keys():
1589 self
.assertTrue(msg
.get_visible()[key
] == msg2
.get_visible()[key
])
1592 class TestProxyFileBase(TestBase
):
1594 def _test_read(self
, proxy
):
1597 self
.assertTrue(proxy
.read() == 'bar')
1599 self
.assertTrue(proxy
.read() == 'ar')
1601 self
.assertTrue(proxy
.read(2) == 'ba')
1603 self
.assertTrue(proxy
.read(-1) == 'ar')
1605 self
.assertTrue(proxy
.read(1000) == 'r')
1607 def _test_readline(self
, proxy
):
1610 self
.assertTrue(proxy
.readline() == 'foo' + os
.linesep
)
1611 self
.assertTrue(proxy
.readline() == 'bar' + os
.linesep
)
1612 self
.assertTrue(proxy
.readline() == 'fred' + os
.linesep
)
1613 self
.assertTrue(proxy
.readline() == 'bob')
1615 self
.assertTrue(proxy
.readline() == 'o' + os
.linesep
)
1616 proxy
.seek(6 + 2 * len(os
.linesep
))
1617 self
.assertTrue(proxy
.readline() == 'fred' + os
.linesep
)
1618 proxy
.seek(6 + 2 * len(os
.linesep
))
1619 self
.assertTrue(proxy
.readline(2) == 'fr')
1620 self
.assertTrue(proxy
.readline(-10) == 'ed' + os
.linesep
)
1622 def _test_readlines(self
, proxy
):
1623 # Read multiple lines
1625 self
.assertTrue(proxy
.readlines() == ['foo' + os
.linesep
,
1627 'fred' + os
.linesep
, 'bob'])
1629 self
.assertTrue(proxy
.readlines(2) == ['foo' + os
.linesep
])
1630 proxy
.seek(3 + len(os
.linesep
))
1631 self
.assertTrue(proxy
.readlines(4 + len(os
.linesep
)) ==
1632 ['bar' + os
.linesep
, 'fred' + os
.linesep
])
1634 self
.assertTrue(proxy
.readlines(1000) == [os
.linesep
, 'bar' + os
.linesep
,
1635 'fred' + os
.linesep
, 'bob'])
1637 def _test_iteration(self
, proxy
):
1640 iterator
= iter(proxy
)
1641 self
.assertTrue(iterator
.next() == 'foo' + os
.linesep
)
1642 self
.assertTrue(iterator
.next() == 'bar' + os
.linesep
)
1643 self
.assertTrue(iterator
.next() == 'fred' + os
.linesep
)
1644 self
.assertTrue(iterator
.next() == 'bob')
1645 self
.assertRaises(StopIteration, lambda: iterator
.next())
1647 def _test_seek_and_tell(self
, proxy
):
1648 # Seek and use tell to check position
1650 self
.assertTrue(proxy
.tell() == 3)
1651 self
.assertTrue(proxy
.read(len(os
.linesep
)) == os
.linesep
)
1653 self
.assertTrue(proxy
.read(1 + len(os
.linesep
)) == 'r' + os
.linesep
)
1654 proxy
.seek(-3 - len(os
.linesep
), 2)
1655 self
.assertTrue(proxy
.read(3) == 'bar')
1657 self
.assertTrue(proxy
.read() == 'o' + os
.linesep
+ 'bar' + os
.linesep
)
1659 self
.assertTrue(proxy
.read() == '')
1661 def _test_close(self
, proxy
):
1664 self
.assertRaises(AttributeError, lambda: proxy
.close())
1667 class TestProxyFile(TestProxyFileBase
):
1670 self
._path
= test_support
.TESTFN
1671 self
._file
= open(self
._path
, 'wb+')
1675 self
._delete
_recursively
(self
._path
)
1677 def test_initialize(self
):
1678 # Initialize and check position
1679 self
._file
.write('foo')
1680 pos
= self
._file
.tell()
1681 proxy0
= mailbox
._ProxyFile
(self
._file
)
1682 self
.assertTrue(proxy0
.tell() == pos
)
1683 self
.assertTrue(self
._file
.tell() == pos
)
1684 proxy1
= mailbox
._ProxyFile
(self
._file
, 0)
1685 self
.assertTrue(proxy1
.tell() == 0)
1686 self
.assertTrue(self
._file
.tell() == pos
)
1688 def test_read(self
):
1689 self
._file
.write('bar')
1690 self
._test
_read
(mailbox
._ProxyFile
(self
._file
))
1692 def test_readline(self
):
1693 self
._file
.write('foo%sbar%sfred%sbob' % (os
.linesep
, os
.linesep
,
1695 self
._test
_readline
(mailbox
._ProxyFile
(self
._file
))
1697 def test_readlines(self
):
1698 self
._file
.write('foo%sbar%sfred%sbob' % (os
.linesep
, os
.linesep
,
1700 self
._test
_readlines
(mailbox
._ProxyFile
(self
._file
))
1702 def test_iteration(self
):
1703 self
._file
.write('foo%sbar%sfred%sbob' % (os
.linesep
, os
.linesep
,
1705 self
._test
_iteration
(mailbox
._ProxyFile
(self
._file
))
1707 def test_seek_and_tell(self
):
1708 self
._file
.write('foo%sbar%s' % (os
.linesep
, os
.linesep
))
1709 self
._test
_seek
_and
_tell
(mailbox
._ProxyFile
(self
._file
))
1711 def test_close(self
):
1712 self
._file
.write('foo%sbar%s' % (os
.linesep
, os
.linesep
))
1713 self
._test
_close
(mailbox
._ProxyFile
(self
._file
))
1716 class TestPartialFile(TestProxyFileBase
):
1719 self
._path
= test_support
.TESTFN
1720 self
._file
= open(self
._path
, 'wb+')
1724 self
._delete
_recursively
(self
._path
)
1726 def test_initialize(self
):
1727 # Initialize and check position
1728 self
._file
.write('foo' + os
.linesep
+ 'bar')
1729 pos
= self
._file
.tell()
1730 proxy
= mailbox
._PartialFile
(self
._file
, 2, 5)
1731 self
.assertTrue(proxy
.tell() == 0)
1732 self
.assertTrue(self
._file
.tell() == pos
)
1734 def test_read(self
):
1735 self
._file
.write('***bar***')
1736 self
._test
_read
(mailbox
._PartialFile
(self
._file
, 3, 6))
1738 def test_readline(self
):
1739 self
._file
.write('!!!!!foo%sbar%sfred%sbob!!!!!' %
1740 (os
.linesep
, os
.linesep
, os
.linesep
))
1741 self
._test
_readline
(mailbox
._PartialFile
(self
._file
, 5,
1742 18 + 3 * len(os
.linesep
)))
1744 def test_readlines(self
):
1745 self
._file
.write('foo%sbar%sfred%sbob?????' %
1746 (os
.linesep
, os
.linesep
, os
.linesep
))
1747 self
._test
_readlines
(mailbox
._PartialFile
(self
._file
, 0,
1748 13 + 3 * len(os
.linesep
)))
1750 def test_iteration(self
):
1751 self
._file
.write('____foo%sbar%sfred%sbob####' %
1752 (os
.linesep
, os
.linesep
, os
.linesep
))
1753 self
._test
_iteration
(mailbox
._PartialFile
(self
._file
, 4,
1754 17 + 3 * len(os
.linesep
)))
1756 def test_seek_and_tell(self
):
1757 self
._file
.write('(((foo%sbar%s$$$' % (os
.linesep
, os
.linesep
))
1758 self
._test
_seek
_and
_tell
(mailbox
._PartialFile
(self
._file
, 3,
1759 9 + 2 * len(os
.linesep
)))
1761 def test_close(self
):
1762 self
._file
.write('&foo%sbar%s^' % (os
.linesep
, os
.linesep
))
1763 self
._test
_close
(mailbox
._PartialFile
(self
._file
, 1,
1764 6 + 3 * len(os
.linesep
)))
1767 ## Start: tests from the original module (for backward compatibility).
1769 FROM_
= "From some.body@dummy.domain Sat Jul 24 13:43:35 2004\n"
1770 DUMMY_MESSAGE
= """\
1771 From: some.body@dummy.domain
1773 Subject: Simple Test
1775 This is a dummy message.
1778 class MaildirTestCase(unittest
.TestCase
):
1781 # create a new maildir mailbox to work with:
1782 self
._dir
= test_support
.TESTFN
1784 os
.mkdir(os
.path
.join(self
._dir
, "cur"))
1785 os
.mkdir(os
.path
.join(self
._dir
, "tmp"))
1786 os
.mkdir(os
.path
.join(self
._dir
, "new"))
1791 map(os
.unlink
, self
._msgfiles
)
1792 os
.rmdir(os
.path
.join(self
._dir
, "cur"))
1793 os
.rmdir(os
.path
.join(self
._dir
, "tmp"))
1794 os
.rmdir(os
.path
.join(self
._dir
, "new"))
1797 def createMessage(self
, dir, mbox
=False):
1798 t
= int(time
.time() % 1000000)
1801 filename
= os
.extsep
.join((str(t
), str(pid
), "myhostname", "mydomain"))
1802 tmpname
= os
.path
.join(self
._dir
, "tmp", filename
)
1803 newname
= os
.path
.join(self
._dir
, dir, filename
)
1804 fp
= open(tmpname
, "w")
1805 self
._msgfiles
.append(tmpname
)
1808 fp
.write(DUMMY_MESSAGE
)
1810 if hasattr(os
, "link"):
1811 os
.link(tmpname
, newname
)
1813 fp
= open(newname
, "w")
1814 fp
.write(DUMMY_MESSAGE
)
1816 self
._msgfiles
.append(newname
)
1819 def test_empty_maildir(self
):
1820 """Test an empty maildir mailbox"""
1821 # Test for regression on bug #117490:
1822 # Make sure the boxes attribute actually gets set.
1823 self
.mbox
= mailbox
.Maildir(test_support
.TESTFN
)
1824 #self.assertTrue(hasattr(self.mbox, "boxes"))
1825 #self.assertTrue(len(self.mbox.boxes) == 0)
1826 self
.assertTrue(self
.mbox
.next() is None)
1827 self
.assertTrue(self
.mbox
.next() is None)
1829 def test_nonempty_maildir_cur(self
):
1830 self
.createMessage("cur")
1831 self
.mbox
= mailbox
.Maildir(test_support
.TESTFN
)
1832 #self.assertTrue(len(self.mbox.boxes) == 1)
1833 self
.assertTrue(self
.mbox
.next() is not None)
1834 self
.assertTrue(self
.mbox
.next() is None)
1835 self
.assertTrue(self
.mbox
.next() is None)
1837 def test_nonempty_maildir_new(self
):
1838 self
.createMessage("new")
1839 self
.mbox
= mailbox
.Maildir(test_support
.TESTFN
)
1840 #self.assertTrue(len(self.mbox.boxes) == 1)
1841 self
.assertTrue(self
.mbox
.next() is not None)
1842 self
.assertTrue(self
.mbox
.next() is None)
1843 self
.assertTrue(self
.mbox
.next() is None)
1845 def test_nonempty_maildir_both(self
):
1846 self
.createMessage("cur")
1847 self
.createMessage("new")
1848 self
.mbox
= mailbox
.Maildir(test_support
.TESTFN
)
1849 #self.assertTrue(len(self.mbox.boxes) == 2)
1850 self
.assertTrue(self
.mbox
.next() is not None)
1851 self
.assertTrue(self
.mbox
.next() is not None)
1852 self
.assertTrue(self
.mbox
.next() is None)
1853 self
.assertTrue(self
.mbox
.next() is None)
1855 def test_unix_mbox(self
):
1856 ### should be better!
1858 fname
= self
.createMessage("cur", True)
1860 for msg
in mailbox
.PortableUnixMailbox(open(fname
),
1861 email
.parser
.Parser().parse
):
1863 self
.assertEqual(msg
["subject"], "Simple Test")
1864 self
.assertEqual(len(str(msg
)), len(FROM_
)+len(DUMMY_MESSAGE
))
1865 self
.assertEqual(n
, 1)
1867 ## End: classes from the original module (for backward compatibility).
1870 _sample_message
= """\
1871 Return-Path: <gkj@gregorykjohnson.com>
1872 X-Original-To: gkj+person@localhost
1873 Delivered-To: gkj+person@localhost
1874 Received: from localhost (localhost [127.0.0.1])
1875 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
1876 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
1877 Delivered-To: gkj@sundance.gregorykjohnson.com
1878 Received: from localhost [127.0.0.1]
1879 by localhost with POP3 (fetchmail-6.2.5)
1880 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
1881 Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
1882 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
1883 for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
1884 Received: by andy.gregorykjohnson.com (Postfix, from userid 1000)
1885 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
1886 Date: Wed, 13 Jul 2005 17:23:11 -0400
1887 From: "Gregory K. Johnson" <gkj@gregorykjohnson.com>
1888 To: gkj@gregorykjohnson.com
1889 Subject: Sample message
1890 Message-ID: <20050713212311.GC4701@andy.gregorykjohnson.com>
1892 Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+"
1893 Content-Disposition: inline
1894 User-Agent: Mutt/1.5.9i
1898 Content-Type: text/plain; charset=us-ascii
1899 Content-Disposition: inline
1901 This is a sample message.
1907 Content-Type: application/octet-stream
1908 Content-Disposition: attachment; filename="text.gz"
1909 Content-Transfer-Encoding: base64
1911 H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
1914 --NMuMz9nt05w80d4+--
1918 "Return-Path":"<gkj@gregorykjohnson.com>",
1919 "X-Original-To":"gkj+person@localhost",
1920 "Delivered-To":"gkj+person@localhost",
1921 "Received":"""from localhost (localhost [127.0.0.1])
1922 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
1923 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
1924 "Delivered-To":"gkj@sundance.gregorykjohnson.com",
1925 "Received":"""from localhost [127.0.0.1]
1926 by localhost with POP3 (fetchmail-6.2.5)
1927 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
1928 "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
1929 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
1930 for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
1931 "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000)
1932 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
1933 "Date":"Wed, 13 Jul 2005 17:23:11 -0400",
1934 "From":""""Gregory K. Johnson" <gkj@gregorykjohnson.com>""",
1935 "To":"gkj@gregorykjohnson.com",
1936 "Subject":"Sample message",
1937 "Mime-Version":"1.0",
1938 "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""",
1939 "Content-Disposition":"inline",
1940 "User-Agent": "Mutt/1.5.9i" }
1942 _sample_payloads = ("""This
is a sample message
.
1947 """H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
1953 tests = (TestMailboxSuperclass, TestMaildir, TestMbox, TestMMDF, TestMH,
1954 TestBabyl, TestMessage, TestMaildirMessage, TestMboxMessage,
1955 TestMHMessage, TestBabylMessage, TestMMDFMessage,
1956 TestMessageConversion, TestProxyFile, TestPartialFile,
1958 test_support.run_unittest(*tests)
1959 test_support.reap_children()
1962 if __name__ == '__main__':