1 """Unit tests for contextlib.py, and other context managers."""
10 from contextlib
import * # Tests __all__
11 from test
import test_support
14 class ContextManagerTestCase(unittest
.TestCase
):
16 def test_contextmanager_plain(self
):
24 self
.assertEqual(state
, [1])
25 self
.assertEqual(x
, 42)
27 self
.assertEqual(state
, [1, 42, 999])
29 def test_contextmanager_finally(self
):
40 self
.assertEqual(state
, [1])
41 self
.assertEqual(x
, 42)
43 raise ZeroDivisionError()
44 except ZeroDivisionError:
47 self
.fail("Expected ZeroDivisionError")
48 self
.assertEqual(state
, [1, 42, 999])
50 def test_contextmanager_no_reraise(self
):
56 # Calling __exit__ should not result in an exception
57 self
.assertFalse(ctx
.__exit
__(TypeError, TypeError("foo"), None))
59 def test_contextmanager_trap_yield_after_throw(self
):
69 RuntimeError, ctx
.__exit
__, TypeError, TypeError("foo"), None
72 def test_contextmanager_except(self
):
79 except ZeroDivisionError, e
:
80 state
.append(e
.args
[0])
81 self
.assertEqual(state
, [1, 42, 999])
83 self
.assertEqual(state
, [1])
84 self
.assertEqual(x
, 42)
86 raise ZeroDivisionError(999)
87 self
.assertEqual(state
, [1, 42, 999])
89 def test_contextmanager_attribs(self
):
92 for k
,v
in kw
.items():
100 self
.assertEqual(baz
.__name
__,'baz')
101 self
.assertEqual(baz
.foo
, 'bar')
102 self
.assertEqual(baz
.__doc
__, "Whee!")
104 class NestedTestCase(unittest
.TestCase
):
106 # XXX This needs more work
108 def test_nested(self
):
118 with
nested(a(), b(), c()) as (x
, y
, z
):
119 self
.assertEqual(x
, 1)
120 self
.assertEqual(y
, 2)
121 self
.assertEqual(z
, 3)
123 def test_nested_cleanup(self
):
140 with
nested(a(), b()) as (x
, y
):
144 except ZeroDivisionError:
145 self
.assertEqual(state
, [1, 4, 2, 5, 6, 3])
147 self
.fail("Didn't raise ZeroDivisionError")
149 def test_nested_right_exception(self
):
157 def __exit__(self
, *exc_info
):
163 with
nested(a(), b()) as (x
, y
):
165 except ZeroDivisionError:
166 self
.assertEqual((x
, y
), (1, 2))
168 self
.fail("Reraised wrong exception")
170 self
.fail("Didn't raise ZeroDivisionError")
172 def test_nested_b_swallows(self
):
181 # Swallow the exception
184 with
nested(a(), b()):
186 except ZeroDivisionError:
187 self
.fail("Didn't swallow ZeroDivisionError")
189 def test_nested_break(self
):
196 with
nested(a(), a()):
199 self
.assertEqual(state
, 1)
201 def test_nested_continue(self
):
208 with
nested(a(), a()):
211 self
.assertEqual(state
, 3)
213 def test_nested_return(self
):
221 with
nested(a(), a()):
224 self
.assertEqual(foo(), 1)
226 class ClosingTestCase(unittest
.TestCase
):
228 # XXX This needs more work
230 def test_closing(self
):
236 self
.assertEqual(state
, [])
237 with
closing(x
) as y
:
238 self
.assertEqual(x
, y
)
239 self
.assertEqual(state
, [1])
241 def test_closing_error(self
):
247 self
.assertEqual(state
, [])
249 with
closing(x
) as y
:
250 self
.assertEqual(x
, y
)
252 except ZeroDivisionError:
253 self
.assertEqual(state
, [1])
255 self
.fail("Didn't raise ZeroDivisionError")
257 class FileContextTestCase(unittest
.TestCase
):
259 def testWithOpen(self
):
260 tfn
= tempfile
.mktemp()
263 with
open(tfn
, "w") as f
:
264 self
.assertFalse(f
.closed
)
266 self
.assertTrue(f
.closed
)
269 with
open(tfn
, "r") as f
:
270 self
.assertFalse(f
.closed
)
271 self
.assertEqual(f
.read(), "Booh\n")
273 except ZeroDivisionError:
274 self
.assertTrue(f
.closed
)
276 self
.fail("Didn't raise ZeroDivisionError")
283 class LockContextTestCase(unittest
.TestCase
):
285 def boilerPlate(self
, lock
, locked
):
286 self
.assertFalse(locked())
288 self
.assertTrue(locked())
289 self
.assertFalse(locked())
292 self
.assertTrue(locked())
294 except ZeroDivisionError:
295 self
.assertFalse(locked())
297 self
.fail("Didn't raise ZeroDivisionError")
299 def testWithLock(self
):
300 lock
= threading
.Lock()
301 self
.boilerPlate(lock
, lock
.locked
)
303 def testWithRLock(self
):
304 lock
= threading
.RLock()
305 self
.boilerPlate(lock
, lock
._is
_owned
)
307 def testWithCondition(self
):
308 lock
= threading
.Condition()
310 return lock
._is
_owned
()
311 self
.boilerPlate(lock
, locked
)
313 def testWithSemaphore(self
):
314 lock
= threading
.Semaphore()
316 if lock
.acquire(False):
321 self
.boilerPlate(lock
, locked
)
323 def testWithBoundedSemaphore(self
):
324 lock
= threading
.BoundedSemaphore()
326 if lock
.acquire(False):
331 self
.boilerPlate(lock
, locked
)
333 # This is needed to make the test actually run under regrtest.py!
335 with warnings
.catch_warnings():
336 warnings
.simplefilter('ignore')
337 test_support
.run_unittest(__name__
)
339 if __name__
== "__main__":