11 from test
import test_support
21 class TestBase(unittest
.TestCase
):
def _check_sample(self, msg):
    # Verify that msg is a mailbox.Message rendering of the sample message:
    # right types, every sample header present, and a multipart body whose
    # parts match the sample payloads one for one.
    for expected_type in (email.message.Message, mailbox.Message):
        self.assertIsInstance(msg, expected_type)

    for header, expected in _sample_headers.iteritems():
        self.assertIn(expected, msg.get_all(header))

    self.assertTrue(msg.is_multipart())
    self.assertEqual(len(msg.get_payload()), len(_sample_payloads))

    for index, expected_body in enumerate(_sample_payloads):
        part = msg.get_payload(index)
        # Sub-parts are plain email messages, not mailbox.Message objects.
        self.assertIsInstance(part, email.message.Message)
        self.assertNotIsInstance(part, mailbox.Message)
        self.assertEqual(part.get_payload(), expected_body)
37 def _delete_recursively(self
, target
):
38 # Delete a file or delete a directory recursively
39 if os
.path
.isdir(target
):
40 for path
, dirs
, files
in os
.walk(target
, topdown
=False):
42 os
.remove(os
.path
.join(path
, name
))
44 os
.rmdir(os
.path
.join(path
, name
))
46 elif os
.path
.exists(target
):
50 class TestMailbox(TestBase
):
52 _factory
= None # Overridden by subclasses to reuse tests
53 _template
= 'From: foo\n\n%s'
56 self
._path
= test_support
.TESTFN
57 self
._delete
_recursively
(self
._path
)
58 self
._box
= self
._factory
(self
._path
)
62 self
._delete
_recursively
(self
._path
)
65 # Add copies of a sample message
67 keys
.append(self
._box
.add(self
._template
% 0))
68 self
.assertEqual(len(self
._box
), 1)
69 keys
.append(self
._box
.add(mailbox
.Message(_sample_message
)))
70 self
.assertEqual(len(self
._box
), 2)
71 keys
.append(self
._box
.add(email
.message_from_string(_sample_message
)))
72 self
.assertEqual(len(self
._box
), 3)
73 keys
.append(self
._box
.add(StringIO
.StringIO(_sample_message
)))
74 self
.assertEqual(len(self
._box
), 4)
75 keys
.append(self
._box
.add(_sample_message
))
76 self
.assertEqual(len(self
._box
), 5)
77 self
.assertEqual(self
._box
.get_string(keys
[0]), self
._template
% 0)
78 for i
in (1, 2, 3, 4):
79 self
._check
_sample
(self
._box
[keys
[i
]])
def test_remove(self):
    # Remove messages using remove(); the shared scenario lives in
    # _test_remove_or_delitem().
    remover = self._box.remove
    self._test_remove_or_delitem(remover)
def test_delitem(self):
    # Remove messages using __delitem__(); the shared scenario lives in
    # _test_remove_or_delitem().
    remover = self._box.__delitem__
    self._test_remove_or_delitem(remover)
89 def _test_remove_or_delitem(self
, method
):
90 # (Used by test_remove() and test_delitem().)
91 key0
= self
._box
.add(self
._template
% 0)
92 key1
= self
._box
.add(self
._template
% 1)
93 self
.assertEqual(len(self
._box
), 2)
96 self
.assertEqual(l
, 1)
97 self
.assertRaises(KeyError, lambda: self
._box
[key0
])
98 self
.assertRaises(KeyError, lambda: method(key0
))
99 self
.assertEqual(self
._box
.get_string(key1
), self
._template
% 1)
100 key2
= self
._box
.add(self
._template
% 2)
101 self
.assertEqual(len(self
._box
), 2)
104 self
.assertEqual(l
, 1)
105 self
.assertRaises(KeyError, lambda: self
._box
[key2
])
106 self
.assertRaises(KeyError, lambda: method(key2
))
107 self
.assertEqual(self
._box
.get_string(key1
), self
._template
% 1)
109 self
.assertEqual(len(self
._box
), 0)
110 self
.assertRaises(KeyError, lambda: self
._box
[key1
])
111 self
.assertRaises(KeyError, lambda: method(key1
))
def test_discard(self, repetitions=10):
    # Discard the same key twice.  After each call the mailbox must hold
    # one message and looking the discarded key up must raise KeyError.
    # (The repetitions argument is not referenced by this body.)
    box = self._box
    gone = box.add(self._template % 0)
    box.add(self._template % 1)
    self.assertEqual(len(box), 2)
    for _ in (1, 2):
        box.discard(gone)
        self.assertEqual(len(box), 1)
        with self.assertRaises(KeyError):
            box[gone]
126 # Retrieve messages using get()
127 key0
= self
._box
.add(self
._template
% 0)
128 msg
= self
._box
.get(key0
)
129 self
.assertEqual(msg
['from'], 'foo')
130 self
.assertEqual(msg
.get_payload(), '0')
131 self
.assertIs(self
._box
.get('foo'), None)
132 self
.assertFalse(self
._box
.get('foo', False))
134 self
._box
= self
._factory
(self
._path
, factory
=rfc822
.Message
)
135 key1
= self
._box
.add(self
._template
% 1)
136 msg
= self
._box
.get(key1
)
137 self
.assertEqual(msg
['from'], 'foo')
138 self
.assertEqual(msg
.fp
.read(), '1')
def test_getitem(self):
    # Retrieve a message using __getitem__(), then check that unknown and
    # discarded keys raise KeyError.
    box = self._box
    key = box.add(self._template % 0)
    fetched = box[key]
    self.assertEqual(fetched['from'], 'foo')
    self.assertEqual(fetched.get_payload(), '0')
    with self.assertRaises(KeyError):
        box['foo']
    box.discard(key)
    with self.assertRaises(KeyError):
        box[key]
def test_get_message(self):
    # Get Message representations of a templated message and of the
    # sample message.
    box = self._box
    plain_key = box.add(self._template % 0)
    sample_key = box.add(_sample_message)

    plain = box.get_message(plain_key)
    self.assertIsInstance(plain, mailbox.Message)
    self.assertEqual(plain['from'], 'foo')
    self.assertEqual(plain.get_payload(), '0')

    sample = box.get_message(sample_key)
    self._check_sample(sample)
def test_get_string(self):
    # Get string representations of messages: what get_string() returns
    # must equal exactly what was added.
    box = self._box
    texts = (self._template % 0, _sample_message)
    keys = [box.add(text) for text in texts]
    for key, text in zip(keys, texts):
        self.assertEqual(box.get_string(key), text)
167 def test_get_file(self
):
168 # Get file representations of messages
169 key0
= self
._box
.add(self
._template
% 0)
170 key1
= self
._box
.add(_sample_message
)
171 self
.assertEqual(self
._box
.get_file(key0
).read().replace(os
.linesep
, '\n'),
173 self
.assertEqual(self
._box
.get_file(key1
).read().replace(os
.linesep
, '\n'),
def test_iterkeys(self):
    # Get keys using iterkeys()
    flags = {'do_keys': True, 'do_values': False}
    self._check_iteration(self._box.iterkeys, **flags)
181 # Get keys using keys()
182 self
._check
_iteration
(self
._box
.keys
, do_keys
=True, do_values
=False)
184 def test_itervalues(self
):
185 # Get values using itervalues()
186 self
._check
_iteration
(self
._box
.itervalues
, do_keys
=False,
190 # Get values using __iter__()
191 self
._check
_iteration
(self
._box
.__iter
__, do_keys
=False,
def test_values(self):
    # Get values using values()
    flags = {'do_keys': False, 'do_values': True}
    self._check_iteration(self._box.values, **flags)
198 def test_iteritems(self
):
199 # Get keys and values using iteritems()
200 self
._check
_iteration
(self
._box
.iteritems
, do_keys
=True,
def test_items(self):
    # Get keys and values using items()
    flags = {'do_keys': True, 'do_values': True}
    self._check_iteration(self._box.items, **flags)
207 def _check_iteration(self
, method
, do_keys
, do_values
, repetitions
=10):
208 for value
in method():
209 self
.fail("Not empty")
210 keys
, values
= [], []
211 for i
in xrange(repetitions
):
212 keys
.append(self
._box
.add(self
._template
% i
))
213 values
.append(self
._template
% i
)
214 if do_keys
and not do_values
:
215 returned_keys
= list(method())
216 elif do_values
and not do_keys
:
217 returned_values
= list(method())
219 returned_keys
, returned_values
= [], []
220 for key
, value
in method():
221 returned_keys
.append(key
)
222 returned_values
.append(value
)
224 self
.assertEqual(len(keys
), len(returned_keys
))
225 self
.assertEqual(set(keys
), set(returned_keys
))
228 for value
in returned_values
:
229 self
.assertEqual(value
['from'], 'foo')
230 self
.assertTrue(int(value
.get_payload()) < repetitions
,
231 (value
.get_payload(), repetitions
))
233 self
.assertEqual(len(values
), count
)
def test_has_key(self):
    # Check existence of keys using has_key(); the shared scenario lives
    # in _test_has_key_or_contains().
    probe = self._box.has_key
    self._test_has_key_or_contains(probe)
def test_contains(self):
    # Check existence of keys using __contains__(); the shared scenario
    # lives in _test_has_key_or_contains().
    probe = self._box.__contains__
    self._test_has_key_or_contains(probe)
def _test_has_key_or_contains(self, method):
    # (Used by test_has_key() and test_contains().)
    # method is the membership predicate under test.  It is probed with a
    # never-added key ('foo') and with two real keys as they are added to
    # and then removed from the mailbox.
    box = self._box
    absent = 'foo'

    self.assertFalse(method(absent))

    first = box.add(self._template % 0)
    self.assertTrue(method(first))
    self.assertFalse(method(absent))

    second = box.add(self._template % 1)
    self.assertTrue(method(second))
    self.assertTrue(method(first))
    self.assertFalse(method(absent))

    box.remove(first)
    self.assertFalse(method(first))
    self.assertTrue(method(second))
    self.assertFalse(method(absent))

    box.remove(second)
    self.assertFalse(method(second))
    self.assertFalse(method(first))
    self.assertFalse(method(absent))
262 def test_len(self
, repetitions
=10):
265 for i
in xrange(repetitions
):
266 self
.assertEqual(len(self
._box
), i
)
267 keys
.append(self
._box
.add(self
._template
% i
))
268 self
.assertEqual(len(self
._box
), i
+ 1)
269 for i
in xrange(repetitions
):
270 self
.assertEqual(len(self
._box
), repetitions
- i
)
271 self
._box
.remove(keys
[i
])
272 self
.assertEqual(len(self
._box
), repetitions
- i
- 1)
def test_set_item(self):
    # Modify messages using __setitem__(): overwrite with strings, with
    # the sample message and with a message read back from the mailbox,
    # then check that assigning to an unknown key raises KeyError and
    # adds nothing.
    box = self._box
    fmt = self._template
    original0, original1 = fmt % 'original 0', fmt % 'original 1'
    changed0, changed1 = fmt % 'changed 0', fmt % 'changed 1'

    key0 = box.add(original0)
    self.assertEqual(box.get_string(key0), original0)
    key1 = box.add(original1)
    self.assertEqual(box.get_string(key1), original1)

    box[key0] = changed0
    self.assertEqual(box.get_string(key0), changed0)
    box[key1] = changed1
    self.assertEqual(box.get_string(key1), changed1)

    box[key0] = _sample_message
    self._check_sample(box[key0])
    box[key1] = box[key0]
    self._check_sample(box[key1])

    # Restoring key0 must leave the copy stored under key1 untouched.
    box[key0] = original0
    self.assertEqual(box.get_string(key0), original0)
    self._check_sample(box[key1])

    with self.assertRaises(KeyError):
        box.__setitem__('foo', 'bar')
    with self.assertRaises(KeyError):
        box['foo']
    self.assertEqual(len(box), 2)
301 def test_clear(self
, iterations
=10):
302 # Remove all messages using clear()
304 for i
in xrange(iterations
):
305 self
._box
.add(self
._template
% i
)
306 for i
, key
in enumerate(keys
):
307 self
.assertEqual(self
._box
.get_string(key
), self
._template
% i
)
309 self
.assertEqual(len(self
._box
), 0)
310 for i
, key
in enumerate(keys
):
311 self
.assertRaises(KeyError, lambda: self
._box
.get_string(key
))
314 # Get and remove a message using pop()
315 key0
= self
._box
.add(self
._template
% 0)
316 self
.assertIn(key0
, self
._box
)
317 key1
= self
._box
.add(self
._template
% 1)
318 self
.assertIn(key1
, self
._box
)
319 self
.assertEqual(self
._box
.pop(key0
).get_payload(), '0')
320 self
.assertNotIn(key0
, self
._box
)
321 self
.assertIn(key1
, self
._box
)
322 key2
= self
._box
.add(self
._template
% 2)
323 self
.assertIn(key2
, self
._box
)
324 self
.assertEqual(self
._box
.pop(key2
).get_payload(), '2')
325 self
.assertNotIn(key2
, self
._box
)
326 self
.assertIn(key1
, self
._box
)
327 self
.assertEqual(self
._box
.pop(key1
).get_payload(), '1')
328 self
.assertNotIn(key1
, self
._box
)
329 self
.assertEqual(len(self
._box
), 0)
331 def test_popitem(self
, iterations
=10):
332 # Get and remove an arbitrary (key, message) using popitem()
335 keys
.append(self
._box
.add(self
._template
% i
))
338 key
, msg
= self
._box
.popitem()
339 self
.assertIn(key
, keys
)
340 self
.assertNotIn(key
, seen
)
342 self
.assertEqual(int(msg
.get_payload()), keys
.index(key
))
343 self
.assertEqual(len(self
._box
), 0)
345 self
.assertRaises(KeyError, lambda: self
._box
[key
])
def test_update(self):
    # Modify multiple messages using update(), first from a dict and then
    # from a list of pairs, and finally with a mapping that contains an
    # unknown key.
    box = self._box
    fmt = self._template

    def expect_strings(*pairs):
        # Each pair is (key, body); the stored string must be fmt % body.
        for key, body in pairs:
            self.assertEqual(box.get_string(key), fmt % body)

    key0 = box.add(fmt % 'original 0')
    key1 = box.add(fmt % 'original 1')
    key2 = box.add(fmt % 'original 2')

    box.update({key0: fmt % 'changed 0',
                key2: _sample_message})
    self.assertEqual(len(box), 3)
    expect_strings((key0, 'changed 0'), (key1, 'original 1'))
    self._check_sample(box[key2])

    box.update([(key2, fmt % 'changed 2'),
                (key1, fmt % 'changed 1'),
                (key0, fmt % 'original 0')])
    self.assertEqual(len(box), 3)
    expect_strings((key0, 'original 0'),
                   (key1, 'changed 1'),
                   (key2, 'changed 2'))

    # An unknown key makes update() raise KeyError; the assertions that
    # follow require the valid entry to have been applied anyway.
    with self.assertRaises(KeyError):
        box.update({'foo': 'bar',
                    key0: fmt % "changed 0"})
    self.assertEqual(len(box), 3)
    expect_strings((key0, "changed 0"),
                   (key1, "changed 1"),
                   (key2, "changed 2"))
def test_flush(self):
    # Write changes to disk via flush(); the shared scenario lives in
    # _test_flush_or_close(), which is told to call close() afterwards.
    writer = self._box.flush
    self._test_flush_or_close(writer, True)
385 def test_lock_unlock(self
):
386 # Lock and unlock the mailbox
387 self
.assertFalse(os
.path
.exists(self
._get
_lock
_path
()))
389 self
.assertTrue(os
.path
.exists(self
._get
_lock
_path
()))
391 self
.assertFalse(os
.path
.exists(self
._get
_lock
_path
()))
def test_close(self):
    # Close mailbox and flush changes to disk; the shared scenario lives
    # in _test_flush_or_close(), which must not call close() again.
    writer = self._box.close
    self._test_flush_or_close(writer, False)
397 def _test_flush_or_close(self
, method
, should_call_close
):
398 contents
= [self
._template
% i
for i
in xrange(3)]
399 self
._box
.add(contents
[0])
400 self
._box
.add(contents
[1])
401 self
._box
.add(contents
[2])
403 if should_call_close
:
405 self
._box
= self
._factory
(self
._path
)
406 keys
= self
._box
.keys()
407 self
.assertEqual(len(keys
), 3)
409 self
.assertIn(self
._box
.get_string(key
), contents
)
def test_dump_message(self):
    # Write message representations to disk: an email message object, a
    # plain string and a file-like object must all be dumped as the
    # sample text with '\n' replaced by os.linesep.
    dump = self._box._dump_message
    sources = (email.message_from_string(_sample_message),
               _sample_message,
               StringIO.StringIO(_sample_message))
    for source in sources:
        sink = StringIO.StringIO()
        dump(source, sink)
        self.assertEqual(sink.getvalue(),
                         _sample_message.replace('\n', os.linesep))

    # None is not an accepted message type.
    sink = StringIO.StringIO()
    with self.assertRaises(TypeError):
        dump(None, sink)
def _get_lock_path(self):
    # Return the path of the dot lock file: the mailbox path with '.lock'
    # appended.  May be overridden.
    suffix = '.lock'
    return self._path + suffix
class TestMailboxSuperclass(TestBase):

    def test_notimplemented(self):
        # Test that all Mailbox methods raise NotImplementedError when
        # called on the mailbox.Mailbox base class itself.
        box = mailbox.Mailbox('path')
        err = NotImplementedError
        expect = self.assertRaises

        expect(err, box.add, '')
        expect(err, box.remove, '')
        expect(err, box.__delitem__, '')
        expect(err, box.discard, '')
        expect(err, box.__setitem__, '', '')
        expect(err, box.iterkeys)
        expect(err, box.keys)
        # For these iterators the checked callable also requests the
        # first item with .next().
        expect(err, lambda: box.itervalues().next())
        expect(err, lambda: box.__iter__().next())
        expect(err, box.values)
        expect(err, lambda: box.iteritems().next())
        expect(err, box.items)
        expect(err, box.get, '')
        expect(err, box.__getitem__, '')
        expect(err, box.get_message, '')
        expect(err, box.get_string, '')
        expect(err, box.get_file, '')
        expect(err, box.has_key, '')
        expect(err, box.__contains__, '')
        expect(err, box.__len__)
        expect(err, box.clear)
        expect(err, box.pop, '')
        expect(err, box.popitem)
        expect(err, box.update, (('', ''),))
        expect(err, box.flush)
        expect(err, box.lock)
        expect(err, box.unlock)
        expect(err, box.close)
463 class TestMaildir(TestMailbox
):
465 _factory
= lambda self
, path
, factory
=None: mailbox
.Maildir(path
, factory
)
468 TestMailbox
.setUp(self
)
469 if os
.name
in ('nt', 'os2') or sys
.platform
== 'cygwin':
470 self
._box
.colon
= '!'
472 def test_add_MM(self
):
473 # Add a MaildirMessage instance
474 msg
= mailbox
.MaildirMessage(self
._template
% 0)
475 msg
.set_subdir('cur')
477 key
= self
._box
.add(msg
)
478 self
.assertTrue(os
.path
.exists(os
.path
.join(self
._path
, 'cur', '%s%sfoo' %
479 (key
, self
._box
.colon
))))
481 def test_get_MM(self
):
482 # Get a MaildirMessage instance
483 msg
= mailbox
.MaildirMessage(self
._template
% 0)
484 msg
.set_subdir('cur')
486 key
= self
._box
.add(msg
)
487 msg_returned
= self
._box
.get_message(key
)
488 self
.assertIsInstance(msg_returned
, mailbox
.MaildirMessage
)
489 self
.assertEqual(msg_returned
.get_subdir(), 'cur')
490 self
.assertEqual(msg_returned
.get_flags(), 'FR')
492 def test_set_MM(self
):
493 # Set with a MaildirMessage instance
494 msg0
= mailbox
.MaildirMessage(self
._template
% 0)
496 key
= self
._box
.add(msg0
)
497 msg_returned
= self
._box
.get_message(key
)
498 self
.assertEqual(msg_returned
.get_subdir(), 'new')
499 self
.assertEqual(msg_returned
.get_flags(), 'PT')
500 msg1
= mailbox
.MaildirMessage(self
._template
% 1)
501 self
._box
[key
] = msg1
502 msg_returned
= self
._box
.get_message(key
)
503 self
.assertEqual(msg_returned
.get_subdir(), 'new')
504 self
.assertEqual(msg_returned
.get_flags(), '')
505 self
.assertEqual(msg_returned
.get_payload(), '1')
506 msg2
= mailbox
.MaildirMessage(self
._template
% 2)
508 self
._box
[key
] = msg2
509 self
._box
[key
] = self
._template
% 3
510 msg_returned
= self
._box
.get_message(key
)
511 self
.assertEqual(msg_returned
.get_subdir(), 'new')
512 self
.assertEqual(msg_returned
.get_flags(), 'S')
513 self
.assertEqual(msg_returned
.get_payload(), '3')
515 def test_consistent_factory(self
):
517 msg
= mailbox
.MaildirMessage(self
._template
% 0)
518 msg
.set_subdir('cur')
520 key
= self
._box
.add(msg
)
522 # Create new mailbox with
523 class FakeMessage(mailbox
.MaildirMessage
):
525 box
= mailbox
.Maildir(self
._path
, factory
=FakeMessage
)
526 box
.colon
= self
._box
.colon
527 msg2
= box
.get_message(key
)
528 self
.assertIsInstance(msg2
, FakeMessage
)
530 def test_initialize_new(self
):
531 # Initialize a non-existent mailbox
533 self
._box
= mailbox
.Maildir(self
._path
)
534 self
._check
_basics
(factory
=rfc822
.Message
)
535 self
._delete
_recursively
(self
._path
)
536 self
._box
= self
._factory
(self
._path
, factory
=None)
539 def test_initialize_existing(self
):
540 # Initialize an existing mailbox
542 for subdir
in '', 'tmp', 'new', 'cur':
543 os
.mkdir(os
.path
.normpath(os
.path
.join(self
._path
, subdir
)))
544 self
._box
= mailbox
.Maildir(self
._path
)
545 self
._check
_basics
(factory
=rfc822
.Message
)
546 self
._box
= mailbox
.Maildir(self
._path
, factory
=None)
def _check_basics(self, factory=None):
    # (Used by test_initialize_new() and test_initialize_existing().)
    # Checks that the mailbox records the absolute path and the expected
    # factory, and that the root plus its tmp/new/cur subdirectories
    # exist as directories.
    box = self._box
    self.assertEqual(box._path, os.path.abspath(self._path))
    self.assertEqual(box._factory, factory)
    for name in ('', 'tmp', 'new', 'cur'):
        candidate = os.path.join(self._path, name)
        is_directory = stat.S_ISDIR(os.stat(candidate).st_mode)
        self.assertTrue(is_directory, "Not a directory: '%s'" % candidate)
def test_list_folders(self):
    # Add three folders and check that list_folders() reports exactly
    # those names, in any order.
    box = self._box
    names = ('one', 'two', 'three')
    for name in names:
        box.add_folder(name)
    self.assertEqual(len(box.list_folders()), 3)
    self.assertEqual(set(box.list_folders()), set(names))
def test_get_folder(self):
    # Add a message through one get_folder() handle and read it back
    # through a second one.  The folder must exist on disk as the
    # directory '.foo.bar' under the mailbox path.
    box = self._box
    body = self._template % 'bar'
    box.add_folder('foo.bar')

    writer = box.get_folder('foo.bar')
    writer.add(body)
    folder_dir = os.path.join(self._path, '.foo.bar')
    self.assertTrue(os.path.isdir(folder_dir))

    reader = box.get_folder('foo.bar')
    first_key = reader.keys()[0]
    self.assertEqual(reader.get_string(first_key), body)
def test_add_and_remove_folders(self):
    # Interleave add_folder() and remove_folder() calls and check the
    # folder listing after each step.
    box = self._box

    def expect_folders(*names):
        # The listing must hold exactly these names, in any order.
        self.assertEqual(len(box.list_folders()), len(names))
        self.assertEqual(set(box.list_folders()), set(names))

    box.add_folder('one')
    box.add_folder('two')
    expect_folders('one', 'two')
    box.remove_folder('one')
    expect_folders('two')
    box.add_folder('three')
    expect_folders('two', 'three')
    box.remove_folder('three')
    expect_folders('two')
    box.remove_folder('two')
    # With nothing left the listing must be an empty list.
    self.assertEqual(len(box.list_folders()), 0)
    self.assertEqual(box.list_folders(), [])
596 def test_clean(self
):
597 # Remove old files from 'tmp'
598 foo_path
= os
.path
.join(self
._path
, 'tmp', 'foo')
599 bar_path
= os
.path
.join(self
._path
, 'tmp', 'bar')
600 f
= open(foo_path
, 'w')
603 f
= open(bar_path
, 'w')
607 self
.assertTrue(os
.path
.exists(foo_path
))
608 self
.assertTrue(os
.path
.exists(bar_path
))
609 foo_stat
= os
.stat(foo_path
)
610 os
.utime(foo_path
, (time
.time() - 129600 - 2,
613 self
.assertFalse(os
.path
.exists(foo_path
))
614 self
.assertTrue(os
.path
.exists(bar_path
))
616 def test_create_tmp(self
, repetitions
=10):
617 # Create files in tmp directory
618 hostname
= socket
.gethostname()
620 hostname
= hostname
.replace('/', r
'\057')
622 hostname
= hostname
.replace(':', r
'\072')
624 pattern
= re
.compile(r
"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)"
625 r
"Q(?P<Q>\d+)\.(?P<host>[^:/]+)")
626 previous_groups
= None
627 for x
in xrange(repetitions
):
628 tmp_file
= self
._box
._create
_tmp
()
629 head
, tail
= os
.path
.split(tmp_file
.name
)
630 self
.assertEqual(head
, os
.path
.abspath(os
.path
.join(self
._path
,
632 "File in wrong location: '%s'" % head
)
633 match
= pattern
.match(tail
)
634 self
.assertTrue(match
is not None, "Invalid file name: '%s'" % tail
)
635 groups
= match
.groups()
636 if previous_groups
is not None:
637 self
.assertTrue(int(groups
[0] >= previous_groups
[0]),
638 "Non-monotonic seconds: '%s' before '%s'" %
639 (previous_groups
[0], groups
[0]))
640 self
.assertTrue(int(groups
[1] >= previous_groups
[1]) or
641 groups
[0] != groups
[1],
642 "Non-monotonic milliseconds: '%s' before '%s'" %
643 (previous_groups
[1], groups
[1]))
644 self
.assertTrue(int(groups
[2]) == pid
,
645 "Process ID mismatch: '%s' should be '%s'" %
647 self
.assertTrue(int(groups
[3]) == int(previous_groups
[3]) + 1,
648 "Non-sequential counter: '%s' before '%s'" %
649 (previous_groups
[3], groups
[3]))
650 self
.assertTrue(groups
[4] == hostname
,
651 "Host name mismatch: '%s' should be '%s'" %
652 (groups
[4], hostname
))
653 previous_groups
= groups
654 tmp_file
.write(_sample_message
)
656 self
.assertTrue(tmp_file
.read() == _sample_message
)
658 file_count
= len(os
.listdir(os
.path
.join(self
._path
, "tmp")))
659 self
.assertTrue(file_count
== repetitions
,
660 "Wrong file count: '%s' should be '%s'" %
661 (file_count
, repetitions
))
663 def test_refresh(self
):
664 # Update the table of contents
665 self
.assertEqual(self
._box
._toc
, {})
666 key0
= self
._box
.add(self
._template
% 0)
667 key1
= self
._box
.add(self
._template
% 1)
668 self
.assertEqual(self
._box
._toc
, {})
670 self
.assertEqual(self
._box
._toc
, {key0
: os
.path
.join('new', key0
),
671 key1
: os
.path
.join('new', key1
)})
672 key2
= self
._box
.add(self
._template
% 2)
673 self
.assertEqual(self
._box
._toc
, {key0
: os
.path
.join('new', key0
),
674 key1
: os
.path
.join('new', key1
)})
676 self
.assertEqual(self
._box
._toc
, {key0
: os
.path
.join('new', key0
),
677 key1
: os
.path
.join('new', key1
),
678 key2
: os
.path
.join('new', key2
)})
680 def test_lookup(self
):
681 # Look up message subpaths in the TOC
682 self
.assertRaises(KeyError, lambda: self
._box
._lookup
('foo'))
683 key0
= self
._box
.add(self
._template
% 0)
684 self
.assertEqual(self
._box
._lookup
(key0
), os
.path
.join('new', key0
))
685 os
.remove(os
.path
.join(self
._path
, 'new', key0
))
686 self
.assertEqual(self
._box
._toc
, {key0
: os
.path
.join('new', key0
)})
687 # Be sure that the TOC is read back from disk (see issue #6896
688 # about bad mtime behaviour on some systems).
690 self
.assertRaises(KeyError, lambda: self
._box
._lookup
(key0
))
691 self
.assertEqual(self
._box
._toc
, {})
693 def test_lock_unlock(self
):
694 # Lock and unlock the mailbox. For Maildir, this does nothing.
698 def test_folder (self
):
699 # Test for bug #1569790: verify that folders returned by .get_folder()
700 # use the same factory function.
701 def dummy_factory (s
):
703 box
= self
._factory
(self
._path
, factory
=dummy_factory
)
704 folder
= box
.add_folder('folder1')
705 self
.assertIs(folder
._factory
, dummy_factory
)
707 folder1_alias
= box
.get_folder('folder1')
708 self
.assertIs(folder1_alias
._factory
, dummy_factory
)
710 def test_directory_in_folder (self
):
711 # Test that mailboxes still work if there's a stray extra directory
714 self
._box
.add(mailbox
.Message(_sample_message
))
716 # Create a stray directory
717 os
.mkdir(os
.path
.join(self
._path
, 'cur', 'stray-dir'))
719 # Check that looping still works with the directory present.
720 for msg
in self
._box
:
723 def test_file_permissions(self
):
724 # Verify that message files are created without execute permissions
725 if not hasattr(os
, "stat") or not hasattr(os
, "umask"):
727 msg
= mailbox
.MaildirMessage(self
._template
% 0)
728 orig_umask
= os
.umask(0)
730 key
= self
._box
.add(msg
)
733 path
= os
.path
.join(self
._path
, self
._box
._lookup
(key
))
734 mode
= os
.stat(path
).st_mode
735 self
.assertEqual(mode
& 0111, 0)
737 def test_folder_file_perms(self
):
738 # From bug #3228, we want to verify that the file created inside a Maildir
739 # subfolder isn't marked as executable.
740 if not hasattr(os
, "stat") or not hasattr(os
, "umask"):
743 orig_umask
= os
.umask(0)
745 subfolder
= self
._box
.add_folder('subfolder')
749 path
= os
.path
.join(subfolder
._path
, 'maildirfolder')
752 self
.assertFalse((perms
& 0111)) # Execute bits should all be off.
754 def test_reread(self
):
758 # Initially, the mailbox has not been read and the time is null.
759 assert getattr(self
._box
, '_last_read', None) is None
761 # Refresh mailbox; the times should now be set to something.
763 assert getattr(self
._box
, '_last_read', None) is not None
765 # Try calling _refresh() again; the modification times shouldn't have
766 # changed, so the mailbox should not be re-reading. Re-reading causes
767 # the ._toc attribute to be assigned a new dictionary object, so
768 # we'll check that the ._toc attribute isn't a different object.
769 orig_toc
= self
._box
._toc
771 return self
._box
._toc
is not orig_toc
773 time
.sleep(1) # Wait 1sec to ensure time.time()'s value changes
775 assert not refreshed()
777 # Now, write something into cur and remove it. This changes
778 # the mtime and should cause a re-read.
779 filename
= os
.path
.join(self
._path
, 'cur', 'stray-file')
780 f
= open(filename
, 'w')
786 class _TestMboxMMDF(TestMailbox
):
790 self
._delete
_recursively
(self
._path
)
791 for lock_remnant
in glob
.glob(self
._path
+ '.*'):
792 test_support
.unlink(lock_remnant
)
def test_add_from_string(self):
    # Add a string starting with 'From ' to the mailbox; the text after
    # 'From ' on that first line must come back from get_from().
    raw = 'From foo@bar blah\nFrom: foo\n\n0'
    key = self._box.add(raw)
    sender = self._box[key].get_from()
    self.assertEqual(sender, 'foo@bar blah')
    payload = self._box[key].get_payload()
    self.assertEqual(payload, '0')
def test_add_mbox_or_mmdf_message(self):
    # Add an mboxMessage or MMDFMessage; the test only requires that
    # building and adding each one does not raise.
    raw = 'From foo@bar blah\nFrom: foo\n\n0'
    for message_type in (mailbox.mboxMessage, mailbox.MMDFMessage):
        self._box.add(message_type(raw))
806 def test_open_close_open(self
):
807 # Open and inspect previously-created mailbox
808 values
= [self
._template
% i
for i
in xrange(3)]
812 mtime
= os
.path
.getmtime(self
._path
)
813 self
._box
= self
._factory
(self
._path
)
814 self
.assertEqual(len(self
._box
), 3)
815 for key
in self
._box
.iterkeys():
816 self
.assertIn(self
._box
.get_string(key
), values
)
818 self
.assertEqual(mtime
, os
.path
.getmtime(self
._path
))
820 def test_add_and_close(self
):
821 # Verifying that closing a mailbox doesn't change added items
822 self
._box
.add(_sample_message
)
824 self
._box
.add(self
._template
% i
)
825 self
._box
.add(_sample_message
)
826 self
._box
._file
.flush()
827 self
._box
._file
.seek(0)
828 contents
= self
._box
._file
.read()
830 with
open(self
._path
, 'rb') as f
:
831 self
.assertEqual(contents
, f
.read())
832 self
._box
= self
._factory
(self
._path
)
834 def test_lock_conflict(self
):
835 # Fork off a subprocess that will lock the file for 2 seconds,
836 # unlock it, and then exit.
837 if not hasattr(os
, 'fork'):
841 # In the child, lock the mailbox.
847 # In the parent, sleep a bit to give the child time to acquire
851 self
.assertRaises(mailbox
.ExternalClashError
,
854 # Wait for child to exit. Locking should now succeed.
855 exited_pid
, status
= os
.waitpid(pid
, 0)
860 def test_relock(self
):
861 # Test case for bug #1575506: the mailbox class was locking the
862 # wrong file object in its flush() method.
863 msg
= "Subject: sub\n\nbody\n"
864 key1
= self
._box
.add(msg
)
868 self
._box
= self
._factory
(self
._path
)
870 key2
= self
._box
.add(msg
)
872 self
.assertTrue(self
._box
._locked
)
876 class TestMbox(_TestMboxMMDF
):
878 _factory
= lambda self
, path
, factory
=None: mailbox
.mbox(path
, factory
)
880 def test_file_perms(self
):
881 # From bug #3228, we want to verify that the mailbox file isn't executable,
882 # even if the umask is set to something that would leave executable bits set.
883 # We only run this test on platforms that support umask.
884 if hasattr(os
, 'umask') and hasattr(os
, 'stat'):
886 old_umask
= os
.umask(0077)
888 os
.unlink(self
._path
)
889 self
._box
= mailbox
.mbox(self
._path
, create
=True)
895 st
= os
.stat(self
._path
)
897 self
.assertFalse((perms
& 0111)) # Execute bits should all be off.
class TestMMDF(_TestMboxMMDF):

    # Runs the inherited tests against mailbox.MMDF mailboxes.
    def _factory(self, path, factory=None):
        return mailbox.MMDF(path, factory)
904 class TestMH(TestMailbox
):
906 _factory
= lambda self
, path
, factory
=None: mailbox
.MH(path
, factory
)
def test_list_folders(self):
    # Add three folders and check that list_folders() reports exactly
    # those names, in any order.
    box = self._box
    names = ('one', 'two', 'three')
    for name in names:
        box.add_folder(name)
    self.assertEqual(len(box.list_folders()), 3)
    self.assertEqual(set(box.list_folders()), set(names))
917 def test_get_folder(self
):
919 def dummy_factory (s
):
921 self
._box
= self
._factory
(self
._path
, dummy_factory
)
923 new_folder
= self
._box
.add_folder('foo.bar')
924 folder0
= self
._box
.get_folder('foo.bar')
925 folder0
.add(self
._template
% 'bar')
926 self
.assertTrue(os
.path
.isdir(os
.path
.join(self
._path
, 'foo.bar')))
927 folder1
= self
._box
.get_folder('foo.bar')
928 self
.assertEqual(folder1
.get_string(folder1
.keys()[0]),
929 self
._template
% 'bar')
931 # Test for bug #1569790: verify that folders returned by .get_folder()
932 # use the same factory function.
933 self
.assertIs(new_folder
._factory
, self
._box
._factory
)
934 self
.assertIs(folder0
._factory
, self
._box
._factory
)
936 def test_add_and_remove_folders(self
):
938 self
._box
.add_folder('one')
939 self
._box
.add_folder('two')
940 self
.assertEqual(len(self
._box
.list_folders()), 2)
941 self
.assertEqual(set(self
._box
.list_folders()), set(('one', 'two')))
942 self
._box
.remove_folder('one')
943 self
.assertEqual(len(self
._box
.list_folders()), 1)
944 self
.assertEqual(set(self
._box
.list_folders()), set(('two', )))
945 self
._box
.add_folder('three')
946 self
.assertEqual(len(self
._box
.list_folders()), 2)
947 self
.assertEqual(set(self
._box
.list_folders()), set(('two', 'three')))
948 self
._box
.remove_folder('three')
949 self
.assertEqual(len(self
._box
.list_folders()), 1)
950 self
.assertEqual(set(self
._box
.list_folders()), set(('two', )))
951 self
._box
.remove_folder('two')
952 self
.assertEqual(len(self
._box
.list_folders()), 0)
953 self
.assertEqual(self
._box
.list_folders(), [])
955 def test_sequences(self
):
956 # Get and set sequences
957 self
.assertEqual(self
._box
.get_sequences(), {})
958 msg0
= mailbox
.MHMessage(self
._template
% 0)
959 msg0
.add_sequence('foo')
960 key0
= self
._box
.add(msg0
)
961 self
.assertEqual(self
._box
.get_sequences(), {'foo':[key0
]})
962 msg1
= mailbox
.MHMessage(self
._template
% 1)
963 msg1
.set_sequences(['bar', 'replied', 'foo'])
964 key1
= self
._box
.add(msg1
)
965 self
.assertEqual(self
._box
.get_sequences(),
966 {'foo':[key0
, key1
], 'bar':[key1
], 'replied':[key1
]})
967 msg0
.set_sequences(['flagged'])
968 self
._box
[key0
] = msg0
969 self
.assertEqual(self
._box
.get_sequences(),
970 {'foo':[key1
], 'bar':[key1
], 'replied':[key1
],
972 self
._box
.remove(key1
)
973 self
.assertEqual(self
._box
.get_sequences(), {'flagged':[key0
]})
975 def test_issue2625(self
):
976 msg0
= mailbox
.MHMessage(self
._template
% 0)
977 msg0
.add_sequence('foo')
978 key0
= self
._box
.add(msg0
)
979 refmsg0
= self
._box
.get_message(key0
)
982 # Pack the contents of the mailbox
983 msg0
= mailbox
.MHMessage(self
._template
% 0)
984 msg1
= mailbox
.MHMessage(self
._template
% 1)
985 msg2
= mailbox
.MHMessage(self
._template
% 2)
986 msg3
= mailbox
.MHMessage(self
._template
% 3)
987 msg0
.set_sequences(['foo', 'unseen'])
988 msg1
.set_sequences(['foo'])
989 msg2
.set_sequences(['foo', 'flagged'])
990 msg3
.set_sequences(['foo', 'bar', 'replied'])
991 key0
= self
._box
.add(msg0
)
992 key1
= self
._box
.add(msg1
)
993 key2
= self
._box
.add(msg2
)
994 key3
= self
._box
.add(msg3
)
995 self
.assertEqual(self
._box
.get_sequences(),
996 {'foo':[key0
,key1
,key2
,key3
], 'unseen':[key0
],
997 'flagged':[key2
], 'bar':[key3
], 'replied':[key3
]})
998 self
._box
.remove(key2
)
999 self
.assertEqual(self
._box
.get_sequences(),
1000 {'foo':[key0
,key1
,key3
], 'unseen':[key0
], 'bar':[key3
],
1003 self
.assertEqual(self
._box
.keys(), [1, 2, 3])
1007 self
.assertEqual(self
._box
.get_sequences(),
1008 {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]})
1010 # Test case for packing while holding the mailbox locked.
1011 key0
= self
._box
.add(msg1
)
1012 key1
= self
._box
.add(msg1
)
1013 key2
= self
._box
.add(msg1
)
1014 key3
= self
._box
.add(msg1
)
1016 self
._box
.remove(key0
)
1017 self
._box
.remove(key2
)
1021 self
.assertEqual(self
._box
.get_sequences(),
1022 {'foo':[1, 2, 3, 4, 5],
1023 'unseen':[1], 'bar':[3], 'replied':[3]})
1025 def _get_lock_path(self
):
1026 return os
.path
.join(self
._path
, '.mh_sequences.lock')
1029 class TestBabyl(TestMailbox
):
1031 _factory
= lambda self
, path
, factory
=None: mailbox
.Babyl(path
, factory
)
1035 self
._delete
_recursively
(self
._path
)
1036 for lock_remnant
in glob
.glob(self
._path
+ '.*'):
1037 test_support
.unlink(lock_remnant
)
1039 def test_labels(self
):
1040 # Get labels from the mailbox
1041 self
.assertEqual(self
._box
.get_labels(), [])
1042 msg0
= mailbox
.BabylMessage(self
._template
% 0)
1043 msg0
.add_label('foo')
1044 key0
= self
._box
.add(msg0
)
1045 self
.assertEqual(self
._box
.get_labels(), ['foo'])
1046 msg1
= mailbox
.BabylMessage(self
._template
% 1)
1047 msg1
.set_labels(['bar', 'answered', 'foo'])
1048 key1
= self
._box
.add(msg1
)
1049 self
.assertEqual(set(self
._box
.get_labels()), set(['foo', 'bar']))
1050 msg0
.set_labels(['blah', 'filed'])
1051 self
._box
[key0
] = msg0
1052 self
.assertEqual(set(self
._box
.get_labels()),
1053 set(['foo', 'bar', 'blah']))
1054 self
._box
.remove(key1
)
1055 self
.assertEqual(set(self
._box
.get_labels()), set(['blah']))
1058 class TestMessage(TestBase
):
1060 _factory
= mailbox
.Message
# Overridden by subclasses to reuse tests
1063 self
._path
= test_support
.TESTFN
1066 self
._delete
_recursively
(self
._path
)
1068 def test_initialize_with_eMM(self
):
1069 # Initialize based on email.message.Message instance
1070 eMM
= email
.message_from_string(_sample_message
)
1071 msg
= self
._factory
(eMM
)
1072 self
._post
_initialize
_hook
(msg
)
1073 self
._check
_sample
(msg
)
1075 def test_initialize_with_string(self
):
1076 # Initialize based on string
1077 msg
= self
._factory
(_sample_message
)
1078 self
._post
_initialize
_hook
(msg
)
1079 self
._check
_sample
(msg
)
1081 def test_initialize_with_file(self
):
1082 # Initialize based on contents of file
1083 f
= open(self
._path
, 'w+')
1084 f
.write(_sample_message
)
1086 msg
= self
._factory
(f
)
1087 self
._post
_initialize
_hook
(msg
)
1088 self
._check
_sample
(msg
)
1091 def test_initialize_with_nothing(self
):
1092 # Initialize without arguments
1093 msg
= self
._factory
()
1094 self
._post
_initialize
_hook
(msg
)
1095 self
.assertIsInstance(msg
, email
.message
.Message
)
1096 self
.assertIsInstance(msg
, mailbox
.Message
)
1097 self
.assertIsInstance(msg
, self
._factory
)
1098 self
.assertEqual(msg
.keys(), [])
1099 self
.assertFalse(msg
.is_multipart())
1100 self
.assertEqual(msg
.get_payload(), None)
1102 def test_initialize_incorrectly(self
):
1103 # Initialize with invalid argument
1104 self
.assertRaises(TypeError, lambda: self
._factory
(object()))
1106 def test_become_message(self
):
1107 # Take on the state of another message
1108 eMM
= email
.message_from_string(_sample_message
)
1109 msg
= self
._factory
()
1110 msg
._become
_message
(eMM
)
1111 self
._check
_sample
(msg
)
1113 def test_explain_to(self
):
1114 # Copy self's format-specific data to other message formats.
1115 # This test is superficial; better ones are in TestMessageConversion.
1116 msg
= self
._factory
()
1117 for class_
in (mailbox
.Message
, mailbox
.MaildirMessage
,
1118 mailbox
.mboxMessage
, mailbox
.MHMessage
,
1119 mailbox
.BabylMessage
, mailbox
.MMDFMessage
):
1120 other_msg
= class_()
1121 msg
._explain
_to
(other_msg
)
1122 other_msg
= email
.message
.Message()
1123 self
.assertRaises(TypeError, lambda: msg
._explain
_to
(other_msg
))
1125 def _post_initialize_hook(self
, msg
):
1126 # Overridden by subclasses to check extra things after initialization
1130 class TestMaildirMessage(TestMessage
):
1132 _factory
= mailbox
.MaildirMessage
1134 def _post_initialize_hook(self
, msg
):
1135 self
.assertEqual(msg
._subdir
, 'new')
1136 self
.assertEqual(msg
._info
,'')
1138 def test_subdir(self
):
1139 # Use get_subdir() and set_subdir()
1140 msg
= mailbox
.MaildirMessage(_sample_message
)
1141 self
.assertEqual(msg
.get_subdir(), 'new')
1142 msg
.set_subdir('cur')
1143 self
.assertEqual(msg
.get_subdir(), 'cur')
1144 msg
.set_subdir('new')
1145 self
.assertEqual(msg
.get_subdir(), 'new')
1146 self
.assertRaises(ValueError, lambda: msg
.set_subdir('tmp'))
1147 self
.assertEqual(msg
.get_subdir(), 'new')
1148 msg
.set_subdir('new')
1149 self
.assertEqual(msg
.get_subdir(), 'new')
1150 self
._check
_sample
(msg
)
1152 def test_flags(self
):
1153 # Use get_flags(), set_flags(), add_flag(), remove_flag()
1154 msg
= mailbox
.MaildirMessage(_sample_message
)
1155 self
.assertEqual(msg
.get_flags(), '')
1156 self
.assertEqual(msg
.get_subdir(), 'new')
1158 self
.assertEqual(msg
.get_subdir(), 'new')
1159 self
.assertEqual(msg
.get_flags(), 'F')
1160 msg
.set_flags('SDTP')
1161 self
.assertEqual(msg
.get_flags(), 'DPST')
1163 self
.assertEqual(msg
.get_flags(), 'DFPST')
1164 msg
.remove_flag('TDRP')
1165 self
.assertEqual(msg
.get_flags(), 'FS')
1166 self
.assertEqual(msg
.get_subdir(), 'new')
1167 self
._check
_sample
(msg
)
1169 def test_date(self
):
1170 # Use get_date() and set_date()
1171 msg
= mailbox
.MaildirMessage(_sample_message
)
1172 diff
= msg
.get_date() - time
.time()
1173 self
.assertTrue(abs(diff
) < 60, diff
)
1175 self
.assertEqual(msg
.get_date(), 0.0)
1177 def test_info(self
):
1178 # Use get_info() and set_info()
1179 msg
= mailbox
.MaildirMessage(_sample_message
)
1180 self
.assertEqual(msg
.get_info(), '')
1181 msg
.set_info('1,foo=bar')
1182 self
.assertEqual(msg
.get_info(), '1,foo=bar')
1183 self
.assertRaises(TypeError, lambda: msg
.set_info(None))
1184 self
._check
_sample
(msg
)
1186 def test_info_and_flags(self
):
1187 # Test interaction of info and flag methods
1188 msg
= mailbox
.MaildirMessage(_sample_message
)
1189 self
.assertEqual(msg
.get_info(), '')
1191 self
.assertEqual(msg
.get_flags(), 'FS')
1192 self
.assertEqual(msg
.get_info(), '2,FS')
1194 self
.assertEqual(msg
.get_flags(), '')
1195 self
.assertEqual(msg
.get_info(), '1,')
1196 msg
.remove_flag('RPT')
1197 self
.assertEqual(msg
.get_flags(), '')
1198 self
.assertEqual(msg
.get_info(), '1,')
1200 self
.assertEqual(msg
.get_flags(), 'D')
1201 self
.assertEqual(msg
.get_info(), '2,D')
1202 self
._check
_sample
(msg
)
1205 class _TestMboxMMDFMessage(TestMessage
):
1207 _factory
= mailbox
._mboxMMDFMessage
1209 def _post_initialize_hook(self
, msg
):
1210 self
._check
_from
(msg
)
1212 def test_initialize_with_unixfrom(self
):
1213 # Initialize with a message that already has a _unixfrom attribute
1214 msg
= mailbox
.Message(_sample_message
)
1215 msg
.set_unixfrom('From foo@bar blah')
1216 msg
= mailbox
.mboxMessage(msg
)
1217 self
.assertEqual(msg
.get_from(), 'foo@bar blah')
1219 def test_from(self
):
1220 # Get and set "From " line
1221 msg
= mailbox
.mboxMessage(_sample_message
)
1222 self
._check
_from
(msg
)
1223 msg
.set_from('foo bar')
1224 self
.assertEqual(msg
.get_from(), 'foo bar')
1225 msg
.set_from('foo@bar', True)
1226 self
._check
_from
(msg
, 'foo@bar')
1227 msg
.set_from('blah@temp', time
.localtime())
1228 self
._check
_from
(msg
, 'blah@temp')
1230 def test_flags(self
):
1231 # Use get_flags(), set_flags(), add_flag(), remove_flag()
1232 msg
= mailbox
.mboxMessage(_sample_message
)
1233 self
.assertEqual(msg
.get_flags(), '')
1235 self
.assertEqual(msg
.get_flags(), 'F')
1236 msg
.set_flags('XODR')
1237 self
.assertEqual(msg
.get_flags(), 'RODX')
1239 self
.assertEqual(msg
.get_flags(), 'RODFAX')
1240 msg
.remove_flag('FDXA')
1241 self
.assertEqual(msg
.get_flags(), 'RO')
1242 self
._check
_sample
(msg
)
1244 def _check_from(self
, msg
, sender
=None):
1245 # Check contents of "From " line
1247 sender
= "MAILER-DAEMON"
1248 self
.assertTrue(re
.match(sender
+ r
" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:"
1249 r
"\d{2} \d{4}", msg
.get_from()))
1252 class TestMboxMessage(_TestMboxMMDFMessage
):
1254 _factory
= mailbox
.mboxMessage
1257 class TestMHMessage(TestMessage
):
1259 _factory
= mailbox
.MHMessage
1261 def _post_initialize_hook(self
, msg
):
1262 self
.assertEqual(msg
._sequences
, [])
1264 def test_sequences(self
):
1265 # Get, set, join, and leave sequences
1266 msg
= mailbox
.MHMessage(_sample_message
)
1267 self
.assertEqual(msg
.get_sequences(), [])
1268 msg
.set_sequences(['foobar'])
1269 self
.assertEqual(msg
.get_sequences(), ['foobar'])
1270 msg
.set_sequences([])
1271 self
.assertEqual(msg
.get_sequences(), [])
1272 msg
.add_sequence('unseen')
1273 self
.assertEqual(msg
.get_sequences(), ['unseen'])
1274 msg
.add_sequence('flagged')
1275 self
.assertEqual(msg
.get_sequences(), ['unseen', 'flagged'])
1276 msg
.add_sequence('flagged')
1277 self
.assertEqual(msg
.get_sequences(), ['unseen', 'flagged'])
1278 msg
.remove_sequence('unseen')
1279 self
.assertEqual(msg
.get_sequences(), ['flagged'])
1280 msg
.add_sequence('foobar')
1281 self
.assertEqual(msg
.get_sequences(), ['flagged', 'foobar'])
1282 msg
.remove_sequence('replied')
1283 self
.assertEqual(msg
.get_sequences(), ['flagged', 'foobar'])
1284 msg
.set_sequences(['foobar', 'replied'])
1285 self
.assertEqual(msg
.get_sequences(), ['foobar', 'replied'])
1288 class TestBabylMessage(TestMessage
):
1290 _factory
= mailbox
.BabylMessage
1292 def _post_initialize_hook(self
, msg
):
1293 self
.assertEqual(msg
._labels
, [])
1295 def test_labels(self
):
1296 # Get, set, join, and leave labels
1297 msg
= mailbox
.BabylMessage(_sample_message
)
1298 self
.assertEqual(msg
.get_labels(), [])
1299 msg
.set_labels(['foobar'])
1300 self
.assertEqual(msg
.get_labels(), ['foobar'])
1302 self
.assertEqual(msg
.get_labels(), [])
1303 msg
.add_label('filed')
1304 self
.assertEqual(msg
.get_labels(), ['filed'])
1305 msg
.add_label('resent')
1306 self
.assertEqual(msg
.get_labels(), ['filed', 'resent'])
1307 msg
.add_label('resent')
1308 self
.assertEqual(msg
.get_labels(), ['filed', 'resent'])
1309 msg
.remove_label('filed')
1310 self
.assertEqual(msg
.get_labels(), ['resent'])
1311 msg
.add_label('foobar')
1312 self
.assertEqual(msg
.get_labels(), ['resent', 'foobar'])
1313 msg
.remove_label('unseen')
1314 self
.assertEqual(msg
.get_labels(), ['resent', 'foobar'])
1315 msg
.set_labels(['foobar', 'answered'])
1316 self
.assertEqual(msg
.get_labels(), ['foobar', 'answered'])
1318 def test_visible(self
):
1319 # Get, set, and update visible headers
1320 msg
= mailbox
.BabylMessage(_sample_message
)
1321 visible
= msg
.get_visible()
1322 self
.assertEqual(visible
.keys(), [])
1323 self
.assertIs(visible
.get_payload(), None)
1324 visible
['User-Agent'] = 'FooBar 1.0'
1325 visible
['X-Whatever'] = 'Blah'
1326 self
.assertEqual(msg
.get_visible().keys(), [])
1327 msg
.set_visible(visible
)
1328 visible
= msg
.get_visible()
1329 self
.assertEqual(visible
.keys(), ['User-Agent', 'X-Whatever'])
1330 self
.assertEqual(visible
['User-Agent'], 'FooBar 1.0')
1331 self
.assertEqual(visible
['X-Whatever'], 'Blah')
1332 self
.assertIs(visible
.get_payload(), None)
1333 msg
.update_visible()
1334 self
.assertEqual(visible
.keys(), ['User-Agent', 'X-Whatever'])
1335 self
.assertIs(visible
.get_payload(), None)
1336 visible
= msg
.get_visible()
1337 self
.assertEqual(visible
.keys(), ['User-Agent', 'Date', 'From', 'To',
1339 for header
in ('User-Agent', 'Date', 'From', 'To', 'Subject'):
1340 self
.assertEqual(visible
[header
], msg
[header
])
1343 class TestMMDFMessage(_TestMboxMMDFMessage
):
1345 _factory
= mailbox
.MMDFMessage
1348 class TestMessageConversion(TestBase
):
1350 def test_plain_to_x(self
):
1351 # Convert Message to all formats
1352 for class_
in (mailbox
.Message
, mailbox
.MaildirMessage
,
1353 mailbox
.mboxMessage
, mailbox
.MHMessage
,
1354 mailbox
.BabylMessage
, mailbox
.MMDFMessage
):
1355 msg_plain
= mailbox
.Message(_sample_message
)
1356 msg
= class_(msg_plain
)
1357 self
._check
_sample
(msg
)
1359 def test_x_to_plain(self
):
1360 # Convert all formats to Message
1361 for class_
in (mailbox
.Message
, mailbox
.MaildirMessage
,
1362 mailbox
.mboxMessage
, mailbox
.MHMessage
,
1363 mailbox
.BabylMessage
, mailbox
.MMDFMessage
):
1364 msg
= class_(_sample_message
)
1365 msg_plain
= mailbox
.Message(msg
)
1366 self
._check
_sample
(msg_plain
)
1368 def test_x_to_invalid(self
):
1369 # Convert all formats to an invalid format
1370 for class_
in (mailbox
.Message
, mailbox
.MaildirMessage
,
1371 mailbox
.mboxMessage
, mailbox
.MHMessage
,
1372 mailbox
.BabylMessage
, mailbox
.MMDFMessage
):
1373 self
.assertRaises(TypeError, lambda: class_(False))
1375 def test_maildir_to_maildir(self
):
1376 # Convert MaildirMessage to MaildirMessage
1377 msg_maildir
= mailbox
.MaildirMessage(_sample_message
)
1378 msg_maildir
.set_flags('DFPRST')
1379 msg_maildir
.set_subdir('cur')
1380 date
= msg_maildir
.get_date()
1381 msg
= mailbox
.MaildirMessage(msg_maildir
)
1382 self
._check
_sample
(msg
)
1383 self
.assertEqual(msg
.get_flags(), 'DFPRST')
1384 self
.assertEqual(msg
.get_subdir(), 'cur')
1385 self
.assertEqual(msg
.get_date(), date
)
1387 def test_maildir_to_mboxmmdf(self
):
1388 # Convert MaildirMessage to mboxmessage and MMDFMessage
1389 pairs
= (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'),
1390 ('T', 'D'), ('DFPRST', 'RDFA'))
1391 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1392 msg_maildir
= mailbox
.MaildirMessage(_sample_message
)
1393 msg_maildir
.set_date(0.0)
1394 for setting
, result
in pairs
:
1395 msg_maildir
.set_flags(setting
)
1396 msg
= class_(msg_maildir
)
1397 self
.assertEqual(msg
.get_flags(), result
)
1398 self
.assertEqual(msg
.get_from(), 'MAILER-DAEMON %s' %
1399 time
.asctime(time
.gmtime(0.0)))
1400 msg_maildir
.set_subdir('cur')
1401 self
.assertEqual(class_(msg_maildir
).get_flags(), 'RODFA')
1403 def test_maildir_to_mh(self
):
1404 # Convert MaildirMessage to MHMessage
1405 msg_maildir
= mailbox
.MaildirMessage(_sample_message
)
1406 pairs
= (('D', ['unseen']), ('F', ['unseen', 'flagged']),
1407 ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []),
1408 ('T', ['unseen']), ('DFPRST', ['replied', 'flagged']))
1409 for setting
, result
in pairs
:
1410 msg_maildir
.set_flags(setting
)
1411 self
.assertEqual(mailbox
.MHMessage(msg_maildir
).get_sequences(),
1414 def test_maildir_to_babyl(self
):
1415 # Convert MaildirMessage to Babyl
1416 msg_maildir
= mailbox
.MaildirMessage(_sample_message
)
1417 pairs
= (('D', ['unseen']), ('F', ['unseen']),
1418 ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']),
1419 ('S', []), ('T', ['unseen', 'deleted']),
1420 ('DFPRST', ['deleted', 'answered', 'forwarded']))
1421 for setting
, result
in pairs
:
1422 msg_maildir
.set_flags(setting
)
1423 self
.assertEqual(mailbox
.BabylMessage(msg_maildir
).get_labels(),
1426 def test_mboxmmdf_to_maildir(self
):
1427 # Convert mboxMessage and MMDFMessage to MaildirMessage
1428 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1429 msg_mboxMMDF
= class_(_sample_message
)
1430 msg_mboxMMDF
.set_from('foo@bar', time
.gmtime(0.0))
1431 pairs
= (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'),
1433 for setting
, result
in pairs
:
1434 msg_mboxMMDF
.set_flags(setting
)
1435 msg
= mailbox
.MaildirMessage(msg_mboxMMDF
)
1436 self
.assertEqual(msg
.get_flags(), result
)
1437 self
.assertEqual(msg
.get_date(), 0.0)
1438 msg_mboxMMDF
.set_flags('O')
1439 self
.assertEqual(mailbox
.MaildirMessage(msg_mboxMMDF
).get_subdir(),
1442 def test_mboxmmdf_to_mboxmmdf(self
):
1443 # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage
1444 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1445 msg_mboxMMDF
= class_(_sample_message
)
1446 msg_mboxMMDF
.set_flags('RODFA')
1447 msg_mboxMMDF
.set_from('foo@bar')
1448 for class2_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1449 msg2
= class2_(msg_mboxMMDF
)
1450 self
.assertEqual(msg2
.get_flags(), 'RODFA')
1451 self
.assertEqual(msg2
.get_from(), 'foo@bar')
1453 def test_mboxmmdf_to_mh(self
):
1454 # Convert mboxMessage and MMDFMessage to MHMessage
1455 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1456 msg_mboxMMDF
= class_(_sample_message
)
1457 pairs
= (('R', []), ('O', ['unseen']), ('D', ['unseen']),
1458 ('F', ['unseen', 'flagged']),
1459 ('A', ['unseen', 'replied']),
1460 ('RODFA', ['replied', 'flagged']))
1461 for setting
, result
in pairs
:
1462 msg_mboxMMDF
.set_flags(setting
)
1463 self
.assertEqual(mailbox
.MHMessage(msg_mboxMMDF
).get_sequences(),
1466 def test_mboxmmdf_to_babyl(self
):
1467 # Convert mboxMessage and MMDFMessage to BabylMessage
1468 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1469 msg
= class_(_sample_message
)
1470 pairs
= (('R', []), ('O', ['unseen']),
1471 ('D', ['unseen', 'deleted']), ('F', ['unseen']),
1472 ('A', ['unseen', 'answered']),
1473 ('RODFA', ['deleted', 'answered']))
1474 for setting
, result
in pairs
:
1475 msg
.set_flags(setting
)
1476 self
.assertEqual(mailbox
.BabylMessage(msg
).get_labels(), result
)
1478 def test_mh_to_maildir(self
):
1479 # Convert MHMessage to MaildirMessage
1480 pairs
= (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS'))
1481 for setting
, result
in pairs
:
1482 msg
= mailbox
.MHMessage(_sample_message
)
1483 msg
.add_sequence(setting
)
1484 self
.assertEqual(mailbox
.MaildirMessage(msg
).get_flags(), result
)
1485 self
.assertEqual(mailbox
.MaildirMessage(msg
).get_subdir(), 'cur')
1486 msg
= mailbox
.MHMessage(_sample_message
)
1487 msg
.add_sequence('unseen')
1488 msg
.add_sequence('replied')
1489 msg
.add_sequence('flagged')
1490 self
.assertEqual(mailbox
.MaildirMessage(msg
).get_flags(), 'FR')
1491 self
.assertEqual(mailbox
.MaildirMessage(msg
).get_subdir(), 'cur')
1493 def test_mh_to_mboxmmdf(self
):
1494 # Convert MHMessage to mboxMessage and MMDFMessage
1495 pairs
= (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF'))
1496 for setting
, result
in pairs
:
1497 msg
= mailbox
.MHMessage(_sample_message
)
1498 msg
.add_sequence(setting
)
1499 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1500 self
.assertEqual(class_(msg
).get_flags(), result
)
1501 msg
= mailbox
.MHMessage(_sample_message
)
1502 msg
.add_sequence('unseen')
1503 msg
.add_sequence('replied')
1504 msg
.add_sequence('flagged')
1505 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1506 self
.assertEqual(class_(msg
).get_flags(), 'OFA')
1508 def test_mh_to_mh(self
):
1509 # Convert MHMessage to MHMessage
1510 msg
= mailbox
.MHMessage(_sample_message
)
1511 msg
.add_sequence('unseen')
1512 msg
.add_sequence('replied')
1513 msg
.add_sequence('flagged')
1514 self
.assertEqual(mailbox
.MHMessage(msg
).get_sequences(),
1515 ['unseen', 'replied', 'flagged'])
1517 def test_mh_to_babyl(self
):
1518 # Convert MHMessage to BabylMessage
1519 pairs
= (('unseen', ['unseen']), ('replied', ['answered']),
1521 for setting
, result
in pairs
:
1522 msg
= mailbox
.MHMessage(_sample_message
)
1523 msg
.add_sequence(setting
)
1524 self
.assertEqual(mailbox
.BabylMessage(msg
).get_labels(), result
)
1525 msg
= mailbox
.MHMessage(_sample_message
)
1526 msg
.add_sequence('unseen')
1527 msg
.add_sequence('replied')
1528 msg
.add_sequence('flagged')
1529 self
.assertEqual(mailbox
.BabylMessage(msg
).get_labels(),
1530 ['unseen', 'answered'])
1532 def test_babyl_to_maildir(self
):
1533 # Convert BabylMessage to MaildirMessage
1534 pairs
= (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'),
1535 ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'),
1537 for setting
, result
in pairs
:
1538 msg
= mailbox
.BabylMessage(_sample_message
)
1539 msg
.add_label(setting
)
1540 self
.assertEqual(mailbox
.MaildirMessage(msg
).get_flags(), result
)
1541 self
.assertEqual(mailbox
.MaildirMessage(msg
).get_subdir(), 'cur')
1542 msg
= mailbox
.BabylMessage(_sample_message
)
1543 for label
in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
1544 'edited', 'resent'):
1545 msg
.add_label(label
)
1546 self
.assertEqual(mailbox
.MaildirMessage(msg
).get_flags(), 'PRT')
1547 self
.assertEqual(mailbox
.MaildirMessage(msg
).get_subdir(), 'cur')
1549 def test_babyl_to_mboxmmdf(self
):
1550 # Convert BabylMessage to mboxMessage and MMDFMessage
1551 pairs
= (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'),
1552 ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'),
1554 for setting
, result
in pairs
:
1555 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1556 msg
= mailbox
.BabylMessage(_sample_message
)
1557 msg
.add_label(setting
)
1558 self
.assertEqual(class_(msg
).get_flags(), result
)
1559 msg
= mailbox
.BabylMessage(_sample_message
)
1560 for label
in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
1561 'edited', 'resent'):
1562 msg
.add_label(label
)
1563 for class_
in (mailbox
.mboxMessage
, mailbox
.MMDFMessage
):
1564 self
.assertEqual(class_(msg
).get_flags(), 'ODA')
1566 def test_babyl_to_mh(self
):
1567 # Convert BabylMessage to MHMessage
1568 pairs
= (('unseen', ['unseen']), ('deleted', []), ('filed', []),
1569 ('answered', ['replied']), ('forwarded', []), ('edited', []),
1571 for setting
, result
in pairs
:
1572 msg
= mailbox
.BabylMessage(_sample_message
)
1573 msg
.add_label(setting
)
1574 self
.assertEqual(mailbox
.MHMessage(msg
).get_sequences(), result
)
1575 msg
= mailbox
.BabylMessage(_sample_message
)
1576 for label
in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
1577 'edited', 'resent'):
1578 msg
.add_label(label
)
1579 self
.assertEqual(mailbox
.MHMessage(msg
).get_sequences(),
1580 ['unseen', 'replied'])
1582 def test_babyl_to_babyl(self
):
1583 # Convert BabylMessage to BabylMessage
1584 msg
= mailbox
.BabylMessage(_sample_message
)
1585 msg
.update_visible()
1586 for label
in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
1587 'edited', 'resent'):
1588 msg
.add_label(label
)
1589 msg2
= mailbox
.BabylMessage(msg
)
1590 self
.assertEqual(msg2
.get_labels(), ['unseen', 'deleted', 'filed',
1591 'answered', 'forwarded', 'edited',
1593 self
.assertEqual(msg
.get_visible().keys(), msg2
.get_visible().keys())
1594 for key
in msg
.get_visible().keys():
1595 self
.assertEqual(msg
.get_visible()[key
], msg2
.get_visible()[key
])
1598 class TestProxyFileBase(TestBase
):
1600 def _test_read(self
, proxy
):
1603 self
.assertEqual(proxy
.read(), 'bar')
1605 self
.assertEqual(proxy
.read(), 'ar')
1607 self
.assertEqual(proxy
.read(2), 'ba')
1609 self
.assertEqual(proxy
.read(-1), 'ar')
1611 self
.assertEqual(proxy
.read(1000), 'r')
1613 def _test_readline(self
, proxy
):
1616 self
.assertEqual(proxy
.readline(), 'foo' + os
.linesep
)
1617 self
.assertEqual(proxy
.readline(), 'bar' + os
.linesep
)
1618 self
.assertEqual(proxy
.readline(), 'fred' + os
.linesep
)
1619 self
.assertEqual(proxy
.readline(), 'bob')
1621 self
.assertEqual(proxy
.readline(), 'o' + os
.linesep
)
1622 proxy
.seek(6 + 2 * len(os
.linesep
))
1623 self
.assertEqual(proxy
.readline(), 'fred' + os
.linesep
)
1624 proxy
.seek(6 + 2 * len(os
.linesep
))
1625 self
.assertEqual(proxy
.readline(2), 'fr')
1626 self
.assertEqual(proxy
.readline(-10), 'ed' + os
.linesep
)
1628 def _test_readlines(self
, proxy
):
1629 # Read multiple lines
1631 self
.assertEqual(proxy
.readlines(), ['foo' + os
.linesep
,
1633 'fred' + os
.linesep
, 'bob'])
1635 self
.assertEqual(proxy
.readlines(2), ['foo' + os
.linesep
])
1636 proxy
.seek(3 + len(os
.linesep
))
1637 self
.assertEqual(proxy
.readlines(4 + len(os
.linesep
)),
1638 ['bar' + os
.linesep
, 'fred' + os
.linesep
])
1640 self
.assertEqual(proxy
.readlines(1000), [os
.linesep
, 'bar' + os
.linesep
,
1641 'fred' + os
.linesep
, 'bob'])
1643 def _test_iteration(self
, proxy
):
1646 iterator
= iter(proxy
)
1647 self
.assertEqual(list(iterator
),
1648 ['foo' + os
.linesep
, 'bar' + os
.linesep
, 'fred' + os
.linesep
, 'bob'])
1650 def _test_seek_and_tell(self
, proxy
):
1651 # Seek and use tell to check position
1653 self
.assertEqual(proxy
.tell(), 3)
1654 self
.assertEqual(proxy
.read(len(os
.linesep
)), os
.linesep
)
1656 self
.assertEqual(proxy
.read(1 + len(os
.linesep
)), 'r' + os
.linesep
)
1657 proxy
.seek(-3 - len(os
.linesep
), 2)
1658 self
.assertEqual(proxy
.read(3), 'bar')
1660 self
.assertEqual(proxy
.read(), 'o' + os
.linesep
+ 'bar' + os
.linesep
)
1662 self
.assertEqual(proxy
.read(), '')
1664 def _test_close(self
, proxy
):
1667 self
.assertRaises(AttributeError, lambda: proxy
.close())
1670 class TestProxyFile(TestProxyFileBase
):
1673 self
._path
= test_support
.TESTFN
1674 self
._file
= open(self
._path
, 'wb+')
1678 self
._delete
_recursively
(self
._path
)
1680 def test_initialize(self
):
1681 # Initialize and check position
1682 self
._file
.write('foo')
1683 pos
= self
._file
.tell()
1684 proxy0
= mailbox
._ProxyFile
(self
._file
)
1685 self
.assertEqual(proxy0
.tell(), pos
)
1686 self
.assertEqual(self
._file
.tell(), pos
)
1687 proxy1
= mailbox
._ProxyFile
(self
._file
, 0)
1688 self
.assertEqual(proxy1
.tell(), 0)
1689 self
.assertEqual(self
._file
.tell(), pos
)
1691 def test_read(self
):
1692 self
._file
.write('bar')
1693 self
._test
_read
(mailbox
._ProxyFile
(self
._file
))
1695 def test_readline(self
):
1696 self
._file
.write('foo%sbar%sfred%sbob' % (os
.linesep
, os
.linesep
,
1698 self
._test
_readline
(mailbox
._ProxyFile
(self
._file
))
1700 def test_readlines(self
):
1701 self
._file
.write('foo%sbar%sfred%sbob' % (os
.linesep
, os
.linesep
,
1703 self
._test
_readlines
(mailbox
._ProxyFile
(self
._file
))
1705 def test_iteration(self
):
1706 self
._file
.write('foo%sbar%sfred%sbob' % (os
.linesep
, os
.linesep
,
1708 self
._test
_iteration
(mailbox
._ProxyFile
(self
._file
))
1710 def test_seek_and_tell(self
):
1711 self
._file
.write('foo%sbar%s' % (os
.linesep
, os
.linesep
))
1712 self
._test
_seek
_and
_tell
(mailbox
._ProxyFile
(self
._file
))
1714 def test_close(self
):
1715 self
._file
.write('foo%sbar%s' % (os
.linesep
, os
.linesep
))
1716 self
._test
_close
(mailbox
._ProxyFile
(self
._file
))
1719 class TestPartialFile(TestProxyFileBase
):
1722 self
._path
= test_support
.TESTFN
1723 self
._file
= open(self
._path
, 'wb+')
1727 self
._delete
_recursively
(self
._path
)
1729 def test_initialize(self
):
1730 # Initialize and check position
1731 self
._file
.write('foo' + os
.linesep
+ 'bar')
1732 pos
= self
._file
.tell()
1733 proxy
= mailbox
._PartialFile
(self
._file
, 2, 5)
1734 self
.assertEqual(proxy
.tell(), 0)
1735 self
.assertEqual(self
._file
.tell(), pos
)
1737 def test_read(self
):
1738 self
._file
.write('***bar***')
1739 self
._test
_read
(mailbox
._PartialFile
(self
._file
, 3, 6))
1741 def test_readline(self
):
1742 self
._file
.write('!!!!!foo%sbar%sfred%sbob!!!!!' %
1743 (os
.linesep
, os
.linesep
, os
.linesep
))
1744 self
._test
_readline
(mailbox
._PartialFile
(self
._file
, 5,
1745 18 + 3 * len(os
.linesep
)))
1747 def test_readlines(self
):
1748 self
._file
.write('foo%sbar%sfred%sbob?????' %
1749 (os
.linesep
, os
.linesep
, os
.linesep
))
1750 self
._test
_readlines
(mailbox
._PartialFile
(self
._file
, 0,
1751 13 + 3 * len(os
.linesep
)))
1753 def test_iteration(self
):
1754 self
._file
.write('____foo%sbar%sfred%sbob####' %
1755 (os
.linesep
, os
.linesep
, os
.linesep
))
1756 self
._test
_iteration
(mailbox
._PartialFile
(self
._file
, 4,
1757 17 + 3 * len(os
.linesep
)))
1759 def test_seek_and_tell(self
):
1760 self
._file
.write('(((foo%sbar%s$$$' % (os
.linesep
, os
.linesep
))
1761 self
._test
_seek
_and
_tell
(mailbox
._PartialFile
(self
._file
, 3,
1762 9 + 2 * len(os
.linesep
)))
1764 def test_close(self
):
1765 self
._file
.write('&foo%sbar%s^' % (os
.linesep
, os
.linesep
))
1766 self
._test
_close
(mailbox
._PartialFile
(self
._file
, 1,
1767 6 + 3 * len(os
.linesep
)))
1770 ## Start: tests from the original module (for backward compatibility).
1772 FROM_
= "From some.body@dummy.domain Sat Jul 24 13:43:35 2004\n"
1773 DUMMY_MESSAGE
= """\
1774 From: some.body@dummy.domain
1776 Subject: Simple Test
1778 This is a dummy message.
class MaildirTestCase(unittest.TestCase):
    # Legacy Maildir tests, kept for backward compatibility with the
    # original mailbox test module.  Each test works in a scratch maildir
    # rooted at test_support.TESTFN.

    def setUp(self):
        # create a new maildir mailbox to work with:
        self._dir = test_support.TESTFN
        os.mkdir(self._dir)
        for sub in ("cur", "tmp", "new"):
            os.mkdir(os.path.join(self._dir, sub))
        # Stand-in for a process id; bumped per message so that two
        # messages created within the same second get distinct names.
        self._counter = 1
        # Every file created by createMessage(), removed in tearDown().
        self._msgfiles = []

    def tearDown(self):
        # Files first: os.rmdir() only removes empty directories.
        for path in self._msgfiles:
            os.unlink(path)
        for sub in ("cur", "tmp", "new"):
            os.rmdir(os.path.join(self._dir, sub))
        os.rmdir(self._dir)

    def createMessage(self, dir, mbox=False):
        # Write DUMMY_MESSAGE to a fresh file under tmp/ and make it
        # available under the `dir` subdirectory as well (hard link when
        # the platform has os.link, otherwise a second copy).
        #
        # dir  -- maildir subdirectory that receives the message
        # mbox -- when true, the tmp/ file starts with the FROM_ line so
        #         it can be read back as a Unix mbox
        #
        # Returns the path of the file under tmp/.
        t = int(time.time() % 1000000)
        pid = self._counter
        self._counter += 1
        filename = os.extsep.join((str(t), str(pid), "myhostname", "mydomain"))
        tmpname = os.path.join(self._dir, "tmp", filename)
        newname = os.path.join(self._dir, dir, filename)
        with open(tmpname, "w") as fp:
            # Register before writing so tearDown() removes the file even
            # if a write fails.
            self._msgfiles.append(tmpname)
            if mbox:
                fp.write(FROM_)
            fp.write(DUMMY_MESSAGE)
        if hasattr(os, "link"):
            os.link(tmpname, newname)
        else:
            with open(newname, "w") as fp:
                fp.write(DUMMY_MESSAGE)
        self._msgfiles.append(newname)
        return tmpname

    def test_empty_maildir(self):
        """Test an empty maildir mailbox"""
        # Test for regression on bug #117490.  The former checks on the
        # `boxes` attribute are disabled; only exhaustion of next() is
        # verified, twice, to show it stays exhausted.
        self.mbox = mailbox.Maildir(test_support.TESTFN)
        self.assertIs(self.mbox.next(), None)
        self.assertIs(self.mbox.next(), None)

    def test_nonempty_maildir_cur(self):
        # One message in cur/: next() yields it once, then None.
        self.createMessage("cur")
        self.mbox = mailbox.Maildir(test_support.TESTFN)
        self.assertIsNot(self.mbox.next(), None)
        self.assertIs(self.mbox.next(), None)
        self.assertIs(self.mbox.next(), None)

    def test_nonempty_maildir_new(self):
        # One message in new/: next() yields it once, then None.
        self.createMessage("new")
        self.mbox = mailbox.Maildir(test_support.TESTFN)
        self.assertIsNot(self.mbox.next(), None)
        self.assertIs(self.mbox.next(), None)
        self.assertIs(self.mbox.next(), None)

    def test_nonempty_maildir_both(self):
        # One message in each of cur/ and new/: two results, then None.
        self.createMessage("cur")
        self.createMessage("new")
        self.mbox = mailbox.Maildir(test_support.TESTFN)
        self.assertIsNot(self.mbox.next(), None)
        self.assertIsNot(self.mbox.next(), None)
        self.assertIs(self.mbox.next(), None)
        self.assertIs(self.mbox.next(), None)

    def test_unix_mbox(self):
        ### should be better!
        # Local import: makes sure the email.parser submodule is loaded
        # before it is used below.
        import email.parser
        fname = self.createMessage("cur", True)
        n = 0
        with open(fname) as fp:
            for msg in mailbox.PortableUnixMailbox(fp,
                                                   email.parser.Parser().parse):
                n += 1
                self.assertEqual(msg["subject"], "Simple Test")
                self.assertEqual(len(str(msg)), len(FROM_)+len(DUMMY_MESSAGE))
        self.assertEqual(n, 1)
1870 ## End: classes from the original module (for backward compatibility).
1873 _sample_message
= """\
1874 Return-Path: <gkj@gregorykjohnson.com>
1875 X-Original-To: gkj+person@localhost
1876 Delivered-To: gkj+person@localhost
1877 Received: from localhost (localhost [127.0.0.1])
1878 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
1879 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
1880 Delivered-To: gkj@sundance.gregorykjohnson.com
1881 Received: from localhost [127.0.0.1]
1882 by localhost with POP3 (fetchmail-6.2.5)
1883 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
1884 Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
1885 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
1886 for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
1887 Received: by andy.gregorykjohnson.com (Postfix, from userid 1000)
1888 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
1889 Date: Wed, 13 Jul 2005 17:23:11 -0400
1890 From: "Gregory K. Johnson" <gkj@gregorykjohnson.com>
1891 To: gkj@gregorykjohnson.com
1892 Subject: Sample message
1893 Message-ID: <20050713212311.GC4701@andy.gregorykjohnson.com>
1895 Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+"
1896 Content-Disposition: inline
1897 User-Agent: Mutt/1.5.9i
1901 Content-Type: text/plain; charset=us-ascii
1902 Content-Disposition: inline
1904 This is a sample message.
1910 Content-Type: application/octet-stream
1911 Content-Disposition: attachment; filename="text.gz"
1912 Content-Transfer-Encoding: base64
1914 H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
1917 --NMuMz9nt05w80d4+--
1921 "Return-Path":"<gkj@gregorykjohnson.com>",
1922 "X-Original-To":"gkj+person@localhost",
1923 "Delivered-To":"gkj+person@localhost",
1924 "Received":"""from localhost (localhost [127.0.0.1])
1925 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
1926 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
1927 "Delivered-To":"gkj@sundance.gregorykjohnson.com",
1928 "Received":"""from localhost [127.0.0.1]
1929 by localhost with POP3 (fetchmail-6.2.5)
1930 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
1931 "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
1932 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
1933 for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
1934 "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000)
1935 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
1936 "Date":"Wed, 13 Jul 2005 17:23:11 -0400",
1937 "From":""""Gregory K. Johnson" <gkj@gregorykjohnson.com>""",
1938 "To":"gkj@gregorykjohnson.com",
1939 "Subject":"Sample message",
1940 "Mime-Version":"1.0",
1941 "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""",
1942 "Content-Disposition":"inline",
1943 "User-Agent": "Mutt/1.5.9i" }
1945 _sample_payloads = ("""This
is a sample message
.
1950 """H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
1956 tests = (TestMailboxSuperclass, TestMaildir, TestMbox, TestMMDF, TestMH,
1957 TestBabyl, TestMessage, TestMaildirMessage, TestMboxMessage,
1958 TestMHMessage, TestBabylMessage, TestMMDFMessage,
1959 TestMessageConversion, TestProxyFile, TestPartialFile,
1961 test_support.run_unittest(*tests)
1962 test_support.reap_children()
1965 if __name__ == '__main__':