move sections
[python/dscho.git] / Lib / test / test_bsddb.py
blobd1ee0a14a967aeeb33ad0be5a21c3cbb8a4737cc
1 #! /usr/bin/env python
2 """Test script for the bsddb C module by Roger E. Masse
3 Adapted to unittest format and expanded scope by Raymond Hettinger
4 """
5 import os, sys
6 import unittest
7 from test import test_support
9 # Skip test if _bsddb wasn't built.
10 test_support.import_module('_bsddb')
12 bsddb = test_support.import_module('bsddb', deprecated=True)
13 # Just so we know it's imported:
14 test_support.import_module('dbhash', deprecated=True)
17 class TestBSDDB(unittest.TestCase):
18 openflag = 'c'
20 def setUp(self):
21 self.f = self.openmethod[0](self.fname, self.openflag, cachesize=32768)
22 self.d = dict(q='Guido', w='van', e='Rossum', r='invented', t='Python', y='')
23 for k, v in self.d.iteritems():
24 self.f[k] = v
26 def tearDown(self):
27 self.f.sync()
28 self.f.close()
29 if self.fname is None:
30 return
31 try:
32 os.remove(self.fname)
33 except os.error:
34 pass
36 def test_getitem(self):
37 for k, v in self.d.iteritems():
38 self.assertEqual(self.f[k], v)
40 def test_len(self):
41 self.assertEqual(len(self.f), len(self.d))
43 def test_change(self):
44 self.f['r'] = 'discovered'
45 self.assertEqual(self.f['r'], 'discovered')
46 self.assertIn('r', self.f.keys())
47 self.assertIn('discovered', self.f.values())
49 def test_close_and_reopen(self):
50 if self.fname is None:
51 # if we're using an in-memory only db, we can't reopen it
52 # so finish here.
53 return
54 self.f.close()
55 self.f = self.openmethod[0](self.fname, 'w')
56 for k, v in self.d.iteritems():
57 self.assertEqual(self.f[k], v)
59 def assertSetEquals(self, seqn1, seqn2):
60 self.assertEqual(set(seqn1), set(seqn2))
62 def test_mapping_iteration_methods(self):
63 f = self.f
64 d = self.d
65 self.assertSetEquals(d, f)
66 self.assertSetEquals(d.keys(), f.keys())
67 self.assertSetEquals(d.values(), f.values())
68 self.assertSetEquals(d.items(), f.items())
69 self.assertSetEquals(d.iterkeys(), f.iterkeys())
70 self.assertSetEquals(d.itervalues(), f.itervalues())
71 self.assertSetEquals(d.iteritems(), f.iteritems())
73 def test_iter_while_modifying_values(self):
74 di = iter(self.d)
75 while 1:
76 try:
77 key = di.next()
78 self.d[key] = 'modified '+key
79 except StopIteration:
80 break
82 # it should behave the same as a dict. modifying values
83 # of existing keys should not break iteration. (adding
84 # or removing keys should)
85 loops_left = len(self.f)
86 fi = iter(self.f)
87 while 1:
88 try:
89 key = fi.next()
90 self.f[key] = 'modified '+key
91 loops_left -= 1
92 except StopIteration:
93 break
94 self.assertEqual(loops_left, 0)
96 self.test_mapping_iteration_methods()
98 def test_iter_abort_on_changed_size(self):
99 def DictIterAbort():
100 di = iter(self.d)
101 while 1:
102 try:
103 di.next()
104 self.d['newkey'] = 'SPAM'
105 except StopIteration:
106 break
107 self.assertRaises(RuntimeError, DictIterAbort)
109 def DbIterAbort():
110 fi = iter(self.f)
111 while 1:
112 try:
113 fi.next()
114 self.f['newkey'] = 'SPAM'
115 except StopIteration:
116 break
117 self.assertRaises(RuntimeError, DbIterAbort)
119 def test_iteritems_abort_on_changed_size(self):
120 def DictIteritemsAbort():
121 di = self.d.iteritems()
122 while 1:
123 try:
124 di.next()
125 self.d['newkey'] = 'SPAM'
126 except StopIteration:
127 break
128 self.assertRaises(RuntimeError, DictIteritemsAbort)
130 def DbIteritemsAbort():
131 fi = self.f.iteritems()
132 while 1:
133 try:
134 key, value = fi.next()
135 del self.f[key]
136 except StopIteration:
137 break
138 self.assertRaises(RuntimeError, DbIteritemsAbort)
140 def test_iteritems_while_modifying_values(self):
141 di = self.d.iteritems()
142 while 1:
143 try:
144 k, v = di.next()
145 self.d[k] = 'modified '+v
146 except StopIteration:
147 break
149 # it should behave the same as a dict. modifying values
150 # of existing keys should not break iteration. (adding
151 # or removing keys should)
152 loops_left = len(self.f)
153 fi = self.f.iteritems()
154 while 1:
155 try:
156 k, v = fi.next()
157 self.f[k] = 'modified '+v
158 loops_left -= 1
159 except StopIteration:
160 break
161 self.assertEqual(loops_left, 0)
163 self.test_mapping_iteration_methods()
165 def test_first_next_looping(self):
166 items = [self.f.first()]
167 for i in xrange(1, len(self.f)):
168 items.append(self.f.next())
169 self.assertSetEquals(items, self.d.items())
171 def test_previous_last_looping(self):
172 items = [self.f.last()]
173 for i in xrange(1, len(self.f)):
174 items.append(self.f.previous())
175 self.assertSetEquals(items, self.d.items())
177 def test_first_while_deleting(self):
178 # Test for bug 1725856
179 self.assertTrue(len(self.d) >= 2, "test requires >=2 items")
180 for _ in self.d:
181 key = self.f.first()[0]
182 del self.f[key]
183 self.assertEqual([], self.f.items(), "expected empty db after test")
185 def test_last_while_deleting(self):
186 # Test for bug 1725856's evil twin
187 self.assertTrue(len(self.d) >= 2, "test requires >=2 items")
188 for _ in self.d:
189 key = self.f.last()[0]
190 del self.f[key]
191 self.assertEqual([], self.f.items(), "expected empty db after test")
193 def test_set_location(self):
194 self.assertEqual(self.f.set_location('e'), ('e', self.d['e']))
196 def test_contains(self):
197 for k in self.d:
198 self.assertIn(k, self.f)
199 self.assertNotIn('not here', self.f)
201 def test_has_key(self):
202 for k in self.d:
203 self.assertTrue(self.f.has_key(k))
204 self.assertTrue(not self.f.has_key('not here'))
206 def test_clear(self):
207 self.f.clear()
208 self.assertEqual(len(self.f), 0)
210 def test__no_deadlock_first(self, debug=0):
211 # do this so that testers can see what function we're in in
212 # verbose mode when we deadlock.
213 sys.stdout.flush()
215 # in pybsddb's _DBWithCursor this causes an internal DBCursor
216 # object is created. Other test_ methods in this class could
217 # inadvertently cause the deadlock but an explicit test is needed.
218 if debug: print "A"
219 k,v = self.f.first()
220 if debug: print "B", k
221 self.f[k] = "deadlock. do not pass go. do not collect $200."
222 if debug: print "C"
223 # if the bsddb implementation leaves the DBCursor open during
224 # the database write and locking+threading support is enabled
225 # the cursor's read lock will deadlock the write lock request..
227 # test the iterator interface
228 if True:
229 if debug: print "D"
230 i = self.f.iteritems()
231 k,v = i.next()
232 if debug: print "E"
233 self.f[k] = "please don't deadlock"
234 if debug: print "F"
235 while 1:
236 try:
237 k,v = i.next()
238 except StopIteration:
239 break
240 if debug: print "F2"
242 i = iter(self.f)
243 if debug: print "G"
244 while i:
245 try:
246 if debug: print "H"
247 k = i.next()
248 if debug: print "I"
249 self.f[k] = "deadlocks-r-us"
250 if debug: print "J"
251 except StopIteration:
252 i = None
253 if debug: print "K"
255 # test the legacy cursor interface mixed with writes
256 self.assertIn(self.f.first()[0], self.d)
257 k = self.f.next()[0]
258 self.assertIn(k, self.d)
259 self.f[k] = "be gone with ye deadlocks"
260 self.assertTrue(self.f[k], "be gone with ye deadlocks")
262 def test_for_cursor_memleak(self):
263 # do the bsddb._DBWithCursor iterator internals leak cursors?
264 nc1 = len(self.f._cursor_refs)
265 # create iterator
266 i = self.f.iteritems()
267 nc2 = len(self.f._cursor_refs)
268 # use the iterator (should run to the first yield, creating the cursor)
269 k, v = i.next()
270 nc3 = len(self.f._cursor_refs)
271 # destroy the iterator; this should cause the weakref callback
272 # to remove the cursor object from self.f._cursor_refs
273 del i
274 nc4 = len(self.f._cursor_refs)
276 self.assertEqual(nc1, nc2)
277 self.assertEqual(nc1, nc4)
278 self.assertTrue(nc3 == nc1+1)
280 def test_popitem(self):
281 k, v = self.f.popitem()
282 self.assertIn(k, self.d)
283 self.assertIn(v, self.d.values())
284 self.assertNotIn(k, self.f)
285 self.assertEqual(len(self.d)-1, len(self.f))
287 def test_pop(self):
288 k = 'w'
289 v = self.f.pop(k)
290 self.assertEqual(v, self.d[k])
291 self.assertNotIn(k, self.f)
292 self.assertNotIn(v, self.f.values())
293 self.assertEqual(len(self.d)-1, len(self.f))
295 def test_get(self):
296 self.assertEqual(self.f.get('NotHere'), None)
297 self.assertEqual(self.f.get('NotHere', 'Default'), 'Default')
298 self.assertEqual(self.f.get('q', 'Default'), self.d['q'])
300 def test_setdefault(self):
301 self.assertEqual(self.f.setdefault('new', 'dog'), 'dog')
302 self.assertEqual(self.f.setdefault('r', 'cat'), self.d['r'])
304 def test_update(self):
305 new = dict(y='life', u='of', i='brian')
306 self.f.update(new)
307 self.d.update(new)
308 for k, v in self.d.iteritems():
309 self.assertEqual(self.f[k], v)
311 def test_keyordering(self):
312 if self.openmethod[0] is not bsddb.btopen:
313 return
314 keys = self.d.keys()
315 keys.sort()
316 self.assertEqual(self.f.first()[0], keys[0])
317 self.assertEqual(self.f.next()[0], keys[1])
318 self.assertEqual(self.f.last()[0], keys[-1])
319 self.assertEqual(self.f.previous()[0], keys[-2])
320 self.assertEqual(list(self.f), keys)
322 class TestBTree(TestBSDDB):
323 fname = test_support.TESTFN
324 openmethod = [bsddb.btopen]
326 class TestBTree_InMemory(TestBSDDB):
327 fname = None
328 openmethod = [bsddb.btopen]
330 class TestBTree_InMemory_Truncate(TestBSDDB):
331 fname = None
332 openflag = 'n'
333 openmethod = [bsddb.btopen]
335 class TestHashTable(TestBSDDB):
336 fname = test_support.TESTFN
337 openmethod = [bsddb.hashopen]
339 class TestHashTable_InMemory(TestBSDDB):
340 fname = None
341 openmethod = [bsddb.hashopen]
343 ## # (bsddb.rnopen,'Record Numbers'), 'put' for RECNO for bsddb 1.85
344 ## # appears broken... at least on
345 ## # Solaris Intel - rmasse 1/97
347 def test_main(verbose=None):
348 test_support.run_unittest(
349 TestBTree,
350 TestHashTable,
351 TestBTree_InMemory,
352 TestHashTable_InMemory,
353 TestBTree_InMemory_Truncate,
356 if __name__ == "__main__":
357 test_main(verbose=True)