1 """TestCases for distributed transactions.
7 from test_all
import db
, test_support
, get_new_environment_path
, \
13 from sets
import Set
as set
17 from test_all
import verbose
19 #----------------------------------------------------------------------
21 class DBTxn_distributed(unittest
.TestCase
):
25 def _create_env(self
, must_open_db
) :
26 self
.dbenv
= db
.DBEnv()
27 self
.dbenv
.set_tx_max(self
.num_txns
)
28 self
.dbenv
.set_lk_max_lockers(self
.num_txns
*2)
29 self
.dbenv
.set_lk_max_locks(self
.num_txns
*2)
30 self
.dbenv
.set_lk_max_objects(self
.num_txns
*2)
32 self
.dbenv
.set_flags(db
.DB_TXN_NOSYNC
,True)
33 self
.dbenv
.open(self
.homeDir
, db
.DB_CREATE | db
.DB_THREAD |
35 db
.DB_INIT_TXN | db
.DB_INIT_LOG | db
.DB_INIT_MPOOL |
36 db
.DB_INIT_LOCK
, 0666)
37 self
.db
= db
.DB(self
.dbenv
)
38 self
.db
.set_re_len(db
.DB_XIDDATASIZE
)
40 if db
.version() > (4,1) :
41 txn
=self
.dbenv
.txn_begin()
42 self
.db
.open(self
.filename
,
43 db
.DB_QUEUE
, db
.DB_CREATE | db
.DB_THREAD
, 0666,
47 self
.db
.open(self
.filename
,
48 db
.DB_QUEUE
, db
.DB_CREATE | db
.DB_THREAD
, 0666)
51 self
.homeDir
= get_new_environment_path()
52 self
.filename
= "test"
53 return self
._create
_env
(must_open_db
=True)
55 def _destroy_env(self
):
56 if self
.nosync
or (db
.version()[:2] == (4,6)): # Known bug
57 self
.dbenv
.log_flush()
63 test_support
.rmtree(self
.homeDir
)
65 def _recreate_env(self
,must_open_db
) :
67 self
._create
_env
(must_open_db
)
69 def test01_distributed_transactions(self
) :
73 if sys
.version_info
[0] >= 3 :
74 adapt
= lambda x
: bytes(x
, "ascii")
75 # Create transactions, "prepare" them, and
76 # let them be garbage collected.
77 for i
in xrange(self
.num_txns
) :
78 txn
= self
.dbenv
.txn_begin()
79 gid
= "%%%dd" %db
.DB_XIDDATASIZE
81 self
.db
.put(i
, gid
, txn
=txn
, flags
=db
.DB_APPEND
)
86 self
._recreate
_env
(self
.must_open_db
)
88 # Get "to be recovered" transactions but
89 # let them be garbage collected.
90 recovered_txns
=self
.dbenv
.txn_recover()
91 self
.assertEquals(self
.num_txns
,len(recovered_txns
))
92 for gid
,txn
in recovered_txns
:
93 self
.assert_(gid
in txns
)
97 self
._recreate
_env
(self
.must_open_db
)
99 # Get "to be recovered" transactions. Commit, abort and
101 recovered_txns
=self
.dbenv
.txn_recover()
102 self
.assertEquals(self
.num_txns
,len(recovered_txns
))
106 for gid
,txn
in recovered_txns
:
107 if state
==0 or state
==1:
108 committed_txns
.add(gid
)
114 discard_txns
.add(gid
)
120 self
._recreate
_env
(self
.must_open_db
)
122 # Verify the discarded transactions are still
123 # around, and dispose them.
124 recovered_txns
=self
.dbenv
.txn_recover()
125 self
.assertEquals(len(discard_txns
),len(recovered_txns
))
126 for gid
,txn
in recovered_txns
:
131 self
._recreate
_env
(must_open_db
=True)
133 # Be sure there are not pending transactions.
134 # Check also database size.
135 recovered_txns
=self
.dbenv
.txn_recover()
136 self
.assert_(len(recovered_txns
)==0)
137 self
.assertEquals(len(committed_txns
),self
.db
.stat()["nkeys"])
139 class DBTxn_distributedSYNC(DBTxn_distributed
):
142 class DBTxn_distributed_must_open_db(DBTxn_distributed
):
145 class DBTxn_distributedSYNC_must_open_db(DBTxn_distributed
):
149 #----------------------------------------------------------------------
152 suite
= unittest
.TestSuite()
153 if db
.version() >= (4,5) :
154 suite
.addTest(unittest
.makeSuite(DBTxn_distributed
))
155 suite
.addTest(unittest
.makeSuite(DBTxn_distributedSYNC
))
156 if db
.version() >= (4,6) :
157 suite
.addTest(unittest
.makeSuite(DBTxn_distributed_must_open_db
))
158 suite
.addTest(unittest
.makeSuite(DBTxn_distributedSYNC_must_open_db
))
162 if __name__
== '__main__':
163 unittest
.main(defaultTest
='test_suite')