Issue #6845: Add restart support for binary upload in ftplib. The
[python.git] / Lib / bsddb / test / test_recno.py
blob895be8a6d058bdb6aeec784e0ef8199824a7aa40
1 """TestCases for exercising a Recno DB.
2 """
4 import os
5 import errno
6 from pprint import pprint
7 import unittest
9 from test_all import db, test_support, verbose, get_new_environment_path, get_new_database_path
11 letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
14 #----------------------------------------------------------------------
16 class SimpleRecnoTestCase(unittest.TestCase):
17 import sys
18 if sys.version_info[:3] < (2, 4, 0):
19 def assertFalse(self, expr, msg=None):
20 self.failIf(expr,msg=msg)
22 def setUp(self):
23 self.filename = get_new_database_path()
24 self.homeDir = None
26 def tearDown(self):
27 test_support.unlink(self.filename)
28 if self.homeDir:
29 test_support.rmtree(self.homeDir)
31 def test01_basic(self):
32 d = db.DB()
34 get_returns_none = d.set_get_returns_none(2)
35 d.set_get_returns_none(get_returns_none)
37 d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
39 for x in letters:
40 recno = d.append(x * 60)
41 self.assertEqual(type(recno), type(0))
42 self.assert_(recno >= 1)
43 if verbose:
44 print recno,
46 if verbose: print
48 stat = d.stat()
49 if verbose:
50 pprint(stat)
52 for recno in range(1, len(d)+1):
53 data = d[recno]
54 if verbose:
55 print data
57 self.assertEqual(type(data), type(""))
58 self.assertEqual(data, d.get(recno))
60 try:
61 data = d[0] # This should raise a KeyError!?!?!
62 except db.DBInvalidArgError, val:
63 import sys
64 if sys.version_info[0] < 3 :
65 self.assertEqual(val[0], db.EINVAL)
66 else :
67 self.assertEqual(val.args[0], db.EINVAL)
68 if verbose: print val
69 else:
70 self.fail("expected exception")
72 # test that has_key raises DB exceptions (fixed in pybsddb 4.3.2)
73 try:
74 d.has_key(0)
75 except db.DBError, val:
76 pass
77 else:
78 self.fail("has_key did not raise a proper exception")
80 try:
81 data = d[100]
82 except KeyError:
83 pass
84 else:
85 self.fail("expected exception")
87 try:
88 data = d.get(100)
89 except db.DBNotFoundError, val:
90 if get_returns_none:
91 self.fail("unexpected exception")
92 else:
93 self.assertEqual(data, None)
95 keys = d.keys()
96 if verbose:
97 print keys
98 self.assertEqual(type(keys), type([]))
99 self.assertEqual(type(keys[0]), type(123))
100 self.assertEqual(len(keys), len(d))
102 items = d.items()
103 if verbose:
104 pprint(items)
105 self.assertEqual(type(items), type([]))
106 self.assertEqual(type(items[0]), type(()))
107 self.assertEqual(len(items[0]), 2)
108 self.assertEqual(type(items[0][0]), type(123))
109 self.assertEqual(type(items[0][1]), type(""))
110 self.assertEqual(len(items), len(d))
112 self.assert_(d.has_key(25))
114 del d[25]
115 self.assertFalse(d.has_key(25))
117 d.delete(13)
118 self.assertFalse(d.has_key(13))
120 data = d.get_both(26, "z" * 60)
121 self.assertEqual(data, "z" * 60, 'was %r' % data)
122 if verbose:
123 print data
125 fd = d.fd()
126 if verbose:
127 print fd
129 c = d.cursor()
130 rec = c.first()
131 while rec:
132 if verbose:
133 print rec
134 rec = c.next()
136 c.set(50)
137 rec = c.current()
138 if verbose:
139 print rec
141 c.put(-1, "a replacement record", db.DB_CURRENT)
143 c.set(50)
144 rec = c.current()
145 self.assertEqual(rec, (50, "a replacement record"))
146 if verbose:
147 print rec
149 rec = c.set_range(30)
150 if verbose:
151 print rec
153 # test that non-existent key lookups work (and that
154 # DBC_set_range doesn't have a memleak under valgrind)
155 rec = c.set_range(999999)
156 self.assertEqual(rec, None)
157 if verbose:
158 print rec
160 c.close()
161 d.close()
163 d = db.DB()
164 d.open(self.filename)
165 c = d.cursor()
167 # put a record beyond the consecutive end of the recno's
168 d[100] = "way out there"
169 self.assertEqual(d[100], "way out there")
171 try:
172 data = d[99]
173 except KeyError:
174 pass
175 else:
176 self.fail("expected exception")
178 try:
179 d.get(99)
180 except db.DBKeyEmptyError, val:
181 if get_returns_none:
182 self.fail("unexpected DBKeyEmptyError exception")
183 else:
184 self.assertEqual(val[0], db.DB_KEYEMPTY)
185 if verbose: print val
186 else:
187 if not get_returns_none:
188 self.fail("expected exception")
190 rec = c.set(40)
191 while rec:
192 if verbose:
193 print rec
194 rec = c.next()
196 c.close()
197 d.close()
199 def test02_WithSource(self):
201 A Recno file that is given a "backing source file" is essentially a
202 simple ASCII file. Normally each record is delimited by \n and so is
203 just a line in the file, but you can set a different record delimiter
204 if needed.
206 homeDir = get_new_environment_path()
207 self.homeDir = homeDir
208 source = os.path.join(homeDir, 'test_recno.txt')
209 if not os.path.isdir(homeDir):
210 os.mkdir(homeDir)
211 f = open(source, 'w') # create the file
212 f.close()
214 d = db.DB()
215 # This is the default value, just checking if both int
216 d.set_re_delim(0x0A)
217 d.set_re_delim('\n') # and char can be used...
218 d.set_re_source(source)
219 d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
221 data = "The quick brown fox jumped over the lazy dog".split()
222 for datum in data:
223 d.append(datum)
224 d.sync()
225 d.close()
227 # get the text from the backing source
228 text = open(source, 'r').read()
229 text = text.strip()
230 if verbose:
231 print text
232 print data
233 print text.split('\n')
235 self.assertEqual(text.split('\n'), data)
237 # open as a DB again
238 d = db.DB()
239 d.set_re_source(source)
240 d.open(self.filename, db.DB_RECNO)
242 d[3] = 'reddish-brown'
243 d[8] = 'comatose'
245 d.sync()
246 d.close()
248 text = open(source, 'r').read()
249 text = text.strip()
250 if verbose:
251 print text
252 print text.split('\n')
254 self.assertEqual(text.split('\n'),
255 "The quick reddish-brown fox jumped over the comatose dog".split())
257 def test03_FixedLength(self):
258 d = db.DB()
259 d.set_re_len(40) # fixed length records, 40 bytes long
260 d.set_re_pad('-') # sets the pad character...
261 d.set_re_pad(45) # ...test both int and char
262 d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
264 for x in letters:
265 d.append(x * 35) # These will be padded
267 d.append('.' * 40) # this one will be exact
269 try: # this one will fail
270 d.append('bad' * 20)
271 except db.DBInvalidArgError, val:
272 import sys
273 if sys.version_info[0] < 3 :
274 self.assertEqual(val[0], db.EINVAL)
275 else :
276 self.assertEqual(val.args[0], db.EINVAL)
277 if verbose: print val
278 else:
279 self.fail("expected exception")
281 c = d.cursor()
282 rec = c.first()
283 while rec:
284 if verbose:
285 print rec
286 rec = c.next()
288 c.close()
289 d.close()
292 #----------------------------------------------------------------------
295 def test_suite():
296 return unittest.makeSuite(SimpleRecnoTestCase)
299 if __name__ == '__main__':
300 unittest.main(defaultTest='test_suite')