Credit Nir Aides for r77288
[python.git] / Lib / bsddb / test / test_queue.py
blob251a8cfdcc8273562a1eece4d7e63ff07c875423
1 """
2 TestCases for exercising a Queue DB.
3 """
5 import os, string
6 from pprint import pprint
7 import unittest
9 from test_all import db, verbose, get_new_database_path
11 #----------------------------------------------------------------------
13 class SimpleQueueTestCase(unittest.TestCase):
14 def setUp(self):
15 self.filename = get_new_database_path()
17 def tearDown(self):
18 try:
19 os.remove(self.filename)
20 except os.error:
21 pass
24 def test01_basic(self):
25 # Basic Queue tests using the deprecated DBCursor.consume method.
27 if verbose:
28 print '\n', '-=' * 30
29 print "Running %s.test01_basic..." % self.__class__.__name__
31 d = db.DB()
32 d.set_re_len(40) # Queues must be fixed length
33 d.open(self.filename, db.DB_QUEUE, db.DB_CREATE)
35 if verbose:
36 print "before appends" + '-' * 30
37 pprint(d.stat())
39 for x in string.letters:
40 d.append(x * 40)
42 self.assertEqual(len(d), len(string.letters))
44 d.put(100, "some more data")
45 d.put(101, "and some more ")
46 d.put(75, "out of order")
47 d.put(1, "replacement data")
49 self.assertEqual(len(d), len(string.letters)+3)
51 if verbose:
52 print "before close" + '-' * 30
53 pprint(d.stat())
55 d.close()
56 del d
57 d = db.DB()
58 d.open(self.filename)
60 if verbose:
61 print "after open" + '-' * 30
62 pprint(d.stat())
64 # Test "txn" as a positional parameter
65 d.append("one more", None)
66 # Test "txn" as a keyword parameter
67 d.append("another one", txn=None)
69 c = d.cursor()
71 if verbose:
72 print "after append" + '-' * 30
73 pprint(d.stat())
75 rec = c.consume()
76 while rec:
77 if verbose:
78 print rec
79 rec = c.consume()
80 c.close()
82 if verbose:
83 print "after consume loop" + '-' * 30
84 pprint(d.stat())
86 self.assertEqual(len(d), 0, \
87 "if you see this message then you need to rebuild " \
88 "Berkeley DB 3.1.17 with the patch in patches/qam_stat.diff")
90 d.close()
94 def test02_basicPost32(self):
95 # Basic Queue tests using the new DB.consume method in DB 3.2+
96 # (No cursor needed)
98 if verbose:
99 print '\n', '-=' * 30
100 print "Running %s.test02_basicPost32..." % self.__class__.__name__
102 if db.version() < (3, 2, 0):
103 if verbose:
104 print "Test not run, DB not new enough..."
105 return
107 d = db.DB()
108 d.set_re_len(40) # Queues must be fixed length
109 d.open(self.filename, db.DB_QUEUE, db.DB_CREATE)
111 if verbose:
112 print "before appends" + '-' * 30
113 pprint(d.stat())
115 for x in string.letters:
116 d.append(x * 40)
118 self.assertEqual(len(d), len(string.letters))
120 d.put(100, "some more data")
121 d.put(101, "and some more ")
122 d.put(75, "out of order")
123 d.put(1, "replacement data")
125 self.assertEqual(len(d), len(string.letters)+3)
127 if verbose:
128 print "before close" + '-' * 30
129 pprint(d.stat())
131 d.close()
132 del d
133 d = db.DB()
134 d.open(self.filename)
135 #d.set_get_returns_none(true)
137 if verbose:
138 print "after open" + '-' * 30
139 pprint(d.stat())
141 d.append("one more")
143 if verbose:
144 print "after append" + '-' * 30
145 pprint(d.stat())
147 rec = d.consume()
148 while rec:
149 if verbose:
150 print rec
151 rec = d.consume()
153 if verbose:
154 print "after consume loop" + '-' * 30
155 pprint(d.stat())
157 d.close()
161 #----------------------------------------------------------------------
163 def test_suite():
164 return unittest.makeSuite(SimpleQueueTestCase)
167 if __name__ == '__main__':
168 unittest.main(defaultTest='test_suite')