4 from test_all
import db
, test_support
, get_new_environment_path
, get_new_database_path
class DBSequenceTest(unittest.TestCase):
    """Tests for ``db.DBSequence`` running on a B-tree database.

    ``setUp`` opens a fresh environment and database for every test and
    ``tearDown`` closes every handle the test left on ``self`` before
    removing the environment directory.
    """

    # Imported in the class body so the version check below does not depend
    # on a module-level import.
    import sys
    if sys.version_info[:3] < (2, 4, 0):
        # Supply assertTrue() on interpreters older than 2.4 by delegating
        # to failUnless().
        def assertTrue(self, expr, msg=None):
            self.failUnless(expr, msg=msg)

    # Octal 666 (read/write for everybody).  Spelled through int() because
    # the literal ``0666`` is a syntax error on Python 3 while ``0o666`` is
    # one on Python < 2.6; the value is the same on every version.
    _MODE = int("666", 8)

    def setUp(self):
        """Create the environment and open the database used by each test."""
        # 2**32: multiples of it do not fit in 32 bits, so the tests below
        # exercise sequence values wider than 32 bits.
        self.int_32_max = 0x100000000
        self.homeDir = get_new_environment_path()
        self.filename = "test"

        self.dbenv = db.DBEnv()
        self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL,
                        self._MODE)
        self.d = db.DB(self.dbenv)
        self.d.open(self.filename, db.DB_BTREE, db.DB_CREATE, self._MODE)

    def tearDown(self):
        """Close the handles stored on ``self`` and delete the home dir."""
        # Reverse order of creation: sequence, then database, then
        # environment.  hasattr() guards handles a test never created.
        for name in ('seq', 'd', 'dbenv'):
            if hasattr(self, name):
                getattr(self, name).close()
                delattr(self, name)

        test_support.rmtree(self.homeDir)

    def test_get(self):
        """get(5) is expected to return the initial value and reserve 5."""
        self.seq = db.DBSequence(self.d, flags=0)
        start_value = 10 * self.int_32_max
        self.assertEqual(0xA00000000, start_value)
        self.assertEqual(None, self.seq.init_value(start_value))
        self.assertEqual(
            None, self.seq.open(key='id', txn=None, flags=db.DB_CREATE))
        self.assertEqual(start_value, self.seq.get(5))
        self.assertEqual(start_value + 5, self.seq.get())

    def test_remove(self):
        """remove() on an opened sequence is expected to return None."""
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(
            None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEqual(None, self.seq.remove(txn=None, flags=0))

    def test_get_key(self):
        """get_key() is expected to return the key passed to open()."""
        self.seq = db.DBSequence(self.d, flags=0)
        key = 'foo'
        self.assertEqual(
            None, self.seq.open(key=key, txn=None, flags=db.DB_CREATE))
        self.assertEqual(key, self.seq.get_key())

    def test_get_dbp(self):
        """get_dbp() is expected to return the DB the sequence lives in."""
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(
            None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEqual(self.d, self.seq.get_dbp())

    def test_cachesize(self):
        """A cache size set before open() is expected to be read back."""
        self.seq = db.DBSequence(self.d, flags=0)
        cache_size = 10
        self.assertEqual(None, self.seq.set_cachesize(cache_size))
        self.assertEqual(
            None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEqual(cache_size, self.seq.get_cachesize())

    def test_flags(self):
        """A flag set before open() is expected to show up in get_flags()."""
        self.seq = db.DBSequence(self.d, flags=0)
        flag = db.DB_SEQ_WRAP
        self.assertEqual(None, self.seq.set_flags(flag))
        self.assertEqual(
            None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        # Mask with the flag: only this bit is checked, not the whole word.
        self.assertEqual(flag, self.seq.get_flags() & flag)

    def test_range(self):
        """A (min, max) range wider than 32 bits is expected to round-trip."""
        self.seq = db.DBSequence(self.d, flags=0)
        seq_range = (10 * self.int_32_max, 11 * self.int_32_max - 1)
        self.assertEqual(None, self.seq.set_range(seq_range))
        # Start at the lower bound of the range just configured.
        self.seq.init_value(seq_range[0])
        self.assertEqual(
            None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEqual(seq_range, self.seq.get_range())

    def test_stat(self):
        """stat() is expected to report every documented field name."""
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(
            None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        stat = self.seq.stat()
        for param in ('nowait', 'min', 'max', 'value', 'current',
                      'flags', 'cache_size', 'last_value', 'wait'):
            self.assertTrue(param in stat,
                            "parameter %s isn't in stat info" % param)

    if db.version() >= (4, 7):
        # This code checks a crash solved in Berkeley DB 4.7
        def test_stat_crash(self):
            """stat() on a sequence whose open() failed must raise."""
            d = db.DB()
            d.open(None, dbtype=db.DB_HASH, flags=db.DB_CREATE)  # In RAM
            try:
                seq = db.DBSequence(d, flags=0)

                # No DB_CREATE and no existing key, so open() must fail.
                self.assertRaises(db.DBNotFoundError, seq.open,
                                  key='id', txn=None, flags=0)

                self.assertRaises(db.DBInvalidArgError, seq.stat)
            finally:
                # This DB is local to the test, so tearDown cannot close it.
                d.close()

    def test_64bits(self):
        """Values next to both ends of the signed 64-bit range work."""
        # We don't use both extremes because they are problematic
        value_plus = 2 ** 63 - 2
        self.assertEqual(9223372036854775806, value_plus)
        value_minus = -(2 ** 63) + 1  # Two's complement
        self.assertEqual(-9223372036854775807, value_minus)

        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(None, self.seq.init_value(value_plus - 1))
        self.assertEqual(
            None, self.seq.open(key='id', txn=None, flags=db.DB_CREATE))
        self.assertEqual(value_plus - 1, self.seq.get(1))
        self.assertEqual(value_plus, self.seq.get(1))

        # Drop the first sequence so the same key can be reused below.
        self.seq.remove(txn=None, flags=0)

        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(None, self.seq.init_value(value_minus))
        self.assertEqual(
            None, self.seq.open(key='id', txn=None, flags=db.DB_CREATE))
        self.assertEqual(value_minus, self.seq.get(1))
        self.assertEqual(value_minus + 1, self.seq.get(1))

    def test_multiple_close(self):
        """Closing the same sequence repeatedly must not fail."""
        self.seq = db.DBSequence(self.d)
        # You can close a Sequence multiple times
        self.seq.close()
        self.seq.close()
        self.seq.close()
def test_suite():
    """Return the suite run by ``unittest.main(defaultTest='test_suite')``.

    The ``DBSequenceTest`` cases are added only when ``db.version()``
    reports 4.3 or newer; otherwise the returned suite is empty.
    """
    suite = unittest.TestSuite()
    if db.version() >= (4, 3):
        # loadTestsFromTestCase() replaces the deprecated unittest.makeSuite().
        suite.addTest(
            unittest.defaultTestLoader.loadTestsFromTestCase(DBSequenceTest))
    return suite
# Script entry point: let unittest look up and run the module-level
# ``test_suite`` callable.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')