Credit Nir Aides for r77288
[python.git] / Lib / bsddb / test / test_distributed_transactions.py
blobf864b883362ef353844a5e7758c8207d07a7f0a3
1 """TestCases for distributed transactions.
2 """
4 import os
5 import unittest
7 from test_all import db, test_support, get_new_environment_path, \
8 get_new_database_path
10 try :
11 a=set()
12 except : # Python 2.3
13 from sets import Set as set
14 else :
15 del a
17 from test_all import verbose
19 #----------------------------------------------------------------------
21 class DBTxn_distributed(unittest.TestCase):
22 num_txns=1234
23 nosync=True
24 must_open_db=False
25 def _create_env(self, must_open_db) :
26 self.dbenv = db.DBEnv()
27 self.dbenv.set_tx_max(self.num_txns)
28 self.dbenv.set_lk_max_lockers(self.num_txns*2)
29 self.dbenv.set_lk_max_locks(self.num_txns*2)
30 self.dbenv.set_lk_max_objects(self.num_txns*2)
31 if self.nosync :
32 self.dbenv.set_flags(db.DB_TXN_NOSYNC,True)
33 self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_THREAD |
34 db.DB_RECOVER |
35 db.DB_INIT_TXN | db.DB_INIT_LOG | db.DB_INIT_MPOOL |
36 db.DB_INIT_LOCK, 0666)
37 self.db = db.DB(self.dbenv)
38 self.db.set_re_len(db.DB_XIDDATASIZE)
39 if must_open_db :
40 if db.version() > (4,1) :
41 txn=self.dbenv.txn_begin()
42 self.db.open(self.filename,
43 db.DB_QUEUE, db.DB_CREATE | db.DB_THREAD, 0666,
44 txn=txn)
45 txn.commit()
46 else :
47 self.db.open(self.filename,
48 db.DB_QUEUE, db.DB_CREATE | db.DB_THREAD, 0666)
50 def setUp(self) :
51 self.homeDir = get_new_environment_path()
52 self.filename = "test"
53 return self._create_env(must_open_db=True)
55 def _destroy_env(self):
56 if self.nosync or (db.version()[:2] == (4,6)): # Known bug
57 self.dbenv.log_flush()
58 self.db.close()
59 self.dbenv.close()
61 def tearDown(self):
62 self._destroy_env()
63 test_support.rmtree(self.homeDir)
65 def _recreate_env(self,must_open_db) :
66 self._destroy_env()
67 self._create_env(must_open_db)
69 def test01_distributed_transactions(self) :
70 txns=set()
71 adapt = lambda x : x
72 import sys
73 if sys.version_info[0] >= 3 :
74 adapt = lambda x : bytes(x, "ascii")
75 # Create transactions, "prepare" them, and
76 # let them be garbage collected.
77 for i in xrange(self.num_txns) :
78 txn = self.dbenv.txn_begin()
79 gid = "%%%dd" %db.DB_XIDDATASIZE
80 gid = adapt(gid %i)
81 self.db.put(i, gid, txn=txn, flags=db.DB_APPEND)
82 txns.add(gid)
83 txn.prepare(gid)
84 del txn
86 self._recreate_env(self.must_open_db)
88 # Get "to be recovered" transactions but
89 # let them be garbage collected.
90 recovered_txns=self.dbenv.txn_recover()
91 self.assertEquals(self.num_txns,len(recovered_txns))
92 for gid,txn in recovered_txns :
93 self.assert_(gid in txns)
94 del txn
95 del recovered_txns
97 self._recreate_env(self.must_open_db)
99 # Get "to be recovered" transactions. Commit, abort and
100 # discard them.
101 recovered_txns=self.dbenv.txn_recover()
102 self.assertEquals(self.num_txns,len(recovered_txns))
103 discard_txns=set()
104 committed_txns=set()
105 state=0
106 for gid,txn in recovered_txns :
107 if state==0 or state==1:
108 committed_txns.add(gid)
109 txn.commit()
110 elif state==2 :
111 txn.abort()
112 elif state==3 :
113 txn.discard()
114 discard_txns.add(gid)
115 state=-1
116 state+=1
117 del txn
118 del recovered_txns
120 self._recreate_env(self.must_open_db)
122 # Verify the discarded transactions are still
123 # around, and dispose them.
124 recovered_txns=self.dbenv.txn_recover()
125 self.assertEquals(len(discard_txns),len(recovered_txns))
126 for gid,txn in recovered_txns :
127 txn.abort()
128 del txn
129 del recovered_txns
131 self._recreate_env(must_open_db=True)
133 # Be sure there are not pending transactions.
134 # Check also database size.
135 recovered_txns=self.dbenv.txn_recover()
136 self.assert_(len(recovered_txns)==0)
137 self.assertEquals(len(committed_txns),self.db.stat()["nkeys"])
139 class DBTxn_distributedSYNC(DBTxn_distributed):
140 nosync=False
142 class DBTxn_distributed_must_open_db(DBTxn_distributed):
143 must_open_db=True
145 class DBTxn_distributedSYNC_must_open_db(DBTxn_distributed):
146 nosync=False
147 must_open_db=True
149 #----------------------------------------------------------------------
151 def test_suite():
152 suite = unittest.TestSuite()
153 if db.version() >= (4,5) :
154 suite.addTest(unittest.makeSuite(DBTxn_distributed))
155 suite.addTest(unittest.makeSuite(DBTxn_distributedSYNC))
156 if db.version() >= (4,6) :
157 suite.addTest(unittest.makeSuite(DBTxn_distributed_must_open_db))
158 suite.addTest(unittest.makeSuite(DBTxn_distributedSYNC_must_open_db))
159 return suite
162 if __name__ == '__main__':
163 unittest.main(defaultTest='test_suite')