1 """Unit tests for contextlib.py, and other context managers."""
6 from contextlib
import * # Tests __all__
7 from test
import test_support
14 class ContextManagerTestCase(unittest
.TestCase
):
16 def test_contextmanager_plain(self
):
24 self
.assertEqual(state
, [1])
25 self
.assertEqual(x
, 42)
27 self
.assertEqual(state
, [1, 42, 999])
29 def test_contextmanager_finally(self
):
38 with self
.assertRaises(ZeroDivisionError):
40 self
.assertEqual(state
, [1])
41 self
.assertEqual(x
, 42)
43 raise ZeroDivisionError()
44 self
.assertEqual(state
, [1, 42, 999])
46 def test_contextmanager_no_reraise(self
):
52 # Calling __exit__ should not result in an exception
53 self
.assertFalse(ctx
.__exit
__(TypeError, TypeError("foo"), None))
55 def test_contextmanager_trap_yield_after_throw(self
):
65 RuntimeError, ctx
.__exit
__, TypeError, TypeError("foo"), None
68 def test_contextmanager_except(self
):
75 except ZeroDivisionError, e
:
76 state
.append(e
.args
[0])
77 self
.assertEqual(state
, [1, 42, 999])
79 self
.assertEqual(state
, [1])
80 self
.assertEqual(x
, 42)
82 raise ZeroDivisionError(999)
83 self
.assertEqual(state
, [1, 42, 999])
85 def _create_contextmanager_attribs(self
):
88 for k
,v
in kw
.items():
98 def test_contextmanager_attribs(self
):
99 baz
= self
._create
_contextmanager
_attribs
()
100 self
.assertEqual(baz
.__name
__,'baz')
101 self
.assertEqual(baz
.foo
, 'bar')
103 @unittest.skipIf(sys
.flags
.optimize
>= 2,
104 "Docstrings are omitted with -O2 and above")
105 def test_contextmanager_doc_attrib(self
):
106 baz
= self
._create
_contextmanager
_attribs
()
107 self
.assertEqual(baz
.__doc
__, "Whee!")
109 class NestedTestCase(unittest
.TestCase
):
111 # XXX This needs more work
113 def test_nested(self
):
123 with
nested(a(), b(), c()) as (x
, y
, z
):
124 self
.assertEqual(x
, 1)
125 self
.assertEqual(y
, 2)
126 self
.assertEqual(z
, 3)
128 def test_nested_cleanup(self
):
144 with self
.assertRaises(ZeroDivisionError):
145 with
nested(a(), b()) as (x
, y
):
149 self
.assertEqual(state
, [1, 4, 2, 5, 6, 3])
151 def test_nested_right_exception(self
):
158 def __exit__(self
, *exc_info
):
163 with self
.assertRaises(ZeroDivisionError):
164 with
nested(a(), b()) as (x
, y
):
166 self
.assertEqual((x
, y
), (1, 2))
168 def test_nested_b_swallows(self
):
177 # Swallow the exception
180 with
nested(a(), b()):
182 except ZeroDivisionError:
183 self
.fail("Didn't swallow ZeroDivisionError")
185 def test_nested_break(self
):
192 with
nested(a(), a()):
195 self
.assertEqual(state
, 1)
197 def test_nested_continue(self
):
204 with
nested(a(), a()):
207 self
.assertEqual(state
, 3)
209 def test_nested_return(self
):
217 with
nested(a(), a()):
220 self
.assertEqual(foo(), 1)
222 class ClosingTestCase(unittest
.TestCase
):
224 # XXX This needs more work
226 def test_closing(self
):
232 self
.assertEqual(state
, [])
233 with
closing(x
) as y
:
234 self
.assertEqual(x
, y
)
235 self
.assertEqual(state
, [1])
237 def test_closing_error(self
):
243 self
.assertEqual(state
, [])
244 with self
.assertRaises(ZeroDivisionError):
245 with
closing(x
) as y
:
246 self
.assertEqual(x
, y
)
248 self
.assertEqual(state
, [1])
250 class FileContextTestCase(unittest
.TestCase
):
252 def testWithOpen(self
):
253 tfn
= tempfile
.mktemp()
256 with
open(tfn
, "w") as f
:
257 self
.assertFalse(f
.closed
)
259 self
.assertTrue(f
.closed
)
261 with self
.assertRaises(ZeroDivisionError):
262 with
open(tfn
, "r") as f
:
263 self
.assertFalse(f
.closed
)
264 self
.assertEqual(f
.read(), "Booh\n")
266 self
.assertTrue(f
.closed
)
268 test_support
.unlink(tfn
)
270 @unittest.skipUnless(threading
, 'Threading required for this test.')
271 class LockContextTestCase(unittest
.TestCase
):
273 def boilerPlate(self
, lock
, locked
):
274 self
.assertFalse(locked())
276 self
.assertTrue(locked())
277 self
.assertFalse(locked())
278 with self
.assertRaises(ZeroDivisionError):
280 self
.assertTrue(locked())
282 self
.assertFalse(locked())
284 def testWithLock(self
):
285 lock
= threading
.Lock()
286 self
.boilerPlate(lock
, lock
.locked
)
288 def testWithRLock(self
):
289 lock
= threading
.RLock()
290 self
.boilerPlate(lock
, lock
._is
_owned
)
292 def testWithCondition(self
):
293 lock
= threading
.Condition()
295 return lock
._is
_owned
()
296 self
.boilerPlate(lock
, locked
)
298 def testWithSemaphore(self
):
299 lock
= threading
.Semaphore()
301 if lock
.acquire(False):
306 self
.boilerPlate(lock
, locked
)
308 def testWithBoundedSemaphore(self
):
309 lock
= threading
.BoundedSemaphore()
311 if lock
.acquire(False):
316 self
.boilerPlate(lock
, locked
)
318 # This is needed to make the test actually run under regrtest.py!
320 with test_support
.check_warnings(("With-statements now directly support "
321 "multiple context managers",
322 DeprecationWarning)):
323 test_support
.run_unittest(__name__
)
325 if __name__
== "__main__":