"""Test cases for Berkeley DB replication (Replication Manager and base API)."""
8 from test_all
import db
, test_support
, have_threads
, verbose
, \
9 get_new_environment_path
, get_new_database_path
12 #----------------------------------------------------------------------
class DBReplicationManager(unittest.TestCase):
    """Replicate between two local environments via the Replication Manager.

    ``setUp`` opens a master and a client ``DBEnv`` in two fresh
    directories.  The test links them over 127.0.0.1 and checks that a
    database created, written and then emptied on the master is mirrored
    on the client.
    """

    import sys
    if sys.version_info[:3] < (2, 4, 0):
        def assertTrue(self, expr, msg=None):
            """Provide ``assertTrue`` on interpreters that only have
            ``failUnless``."""
            self.failUnless(expr, msg=msg)

    # Octal 666.  Spelled with int() because the literal ``0666`` is a
    # syntax error on Python 3, while this module otherwise already caters
    # for both major versions.
    _DB_FILE_MODE = int("666", 8)

    def setUp(self):
        """Open both environments and register the event callbacks."""
        self.homeDirMaster = get_new_environment_path()
        self.homeDirClient = get_new_environment_path()

        self.dbenvMaster = db.DBEnv()
        self.dbenvClient = db.DBEnv()

        # Must use "DB_THREAD" because the Replication Manager will
        # be executed in other threads but will use the same environment.
        # http://forums.oracle.com/forums/thread.jspa?threadID=645788&tstart=0
        env_flags = (db.DB_CREATE | db.DB_INIT_TXN | db.DB_INIT_LOG |
                     db.DB_INIT_MPOOL | db.DB_INIT_LOCK | db.DB_INIT_REP |
                     db.DB_RECOVER | db.DB_THREAD)
        self.dbenvMaster.open(self.homeDirMaster, env_flags,
                              self._DB_FILE_MODE)
        self.dbenvClient.open(self.homeDirClient, env_flags,
                              self._DB_FILE_MODE)

        # Flags flipped by the callbacks below and polled by the tests.
        self.confirmed_master = self.client_startupdone = False

        # Only the event code (second argument) is inspected.
        def on_master_event(dbenv, event, info):
            if event == db.DB_EVENT_REP_MASTER:
                self.confirmed_master = True

        def on_client_event(dbenv, event, info):
            if event == db.DB_EVENT_REP_STARTUPDONE:
                self.client_startupdone = True

        self.dbenvMaster.set_event_notify(on_master_event)
        self.dbenvClient.set_event_notify(on_client_event)

        # Uncomment to trace replication while debugging:
        #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
        #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
        #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
        #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)

        # Database handles are created by the tests themselves.
        self.dbMaster = self.dbClient = None

    def tearDown(self):
        """Close any open handles, then both environments, then remove
        their directories."""
        # Identity test on purpose: whether a handle was created is what
        # matters here, not how the handle object evaluates as a boolean.
        if self.dbClient is not None:
            self.dbClient.close()
        if self.dbMaster is not None:
            self.dbMaster.close()
        self.dbenvClient.close()
        self.dbenvMaster.close()
        test_support.rmtree(self.homeDirClient)
        test_support.rmtree(self.homeDirMaster)

    def test01_basic_replication(self):
        """Configure both sites, start them, and verify that a put and a
        delete made on the master show up on the client."""
        import os.path
        import sys
        import time

        # Each site listens on its own port and knows the other one.
        master_port = test_support.find_unused_port()
        self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port)
        client_port = test_support.find_unused_port()
        self.dbenvClient.repmgr_set_local_site("127.0.0.1", client_port)
        self.dbenvMaster.repmgr_add_remote_site("127.0.0.1", client_port)
        self.dbenvClient.repmgr_add_remote_site("127.0.0.1", master_port)
        self.dbenvMaster.rep_set_nsites(2)
        self.dbenvClient.rep_set_nsites(2)
        self.dbenvMaster.rep_set_priority(10)
        self.dbenvClient.rep_set_priority(0)

        # Every timeout is set to a distinct value per site and read back.
        timeouts = (
            (db.DB_REP_CONNECTION_RETRY, 100123, 100321),
            (db.DB_REP_ELECTION_TIMEOUT, 100234, 100432),
            (db.DB_REP_ELECTION_RETRY, 100345, 100543),
        )
        for which, master_value, client_value in timeouts:
            self.dbenvMaster.rep_set_timeout(which, master_value)
            self.dbenvClient.rep_set_timeout(which, client_value)
            self.assertEqual(self.dbenvMaster.rep_get_timeout(which),
                             master_value)
            self.assertEqual(self.dbenvClient.rep_get_timeout(which),
                             client_value)

        self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
        self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)

        self.dbenvMaster.repmgr_start(1, db.DB_REP_MASTER)
        self.dbenvClient.repmgr_start(1, db.DB_REP_CLIENT)

        self.assertEqual(self.dbenvMaster.rep_get_nsites(), 2)
        self.assertEqual(self.dbenvClient.rep_get_nsites(), 2)
        self.assertEqual(self.dbenvMaster.rep_get_priority(), 10)
        self.assertEqual(self.dbenvClient.rep_get_priority(), 0)
        self.assertEqual(self.dbenvMaster.repmgr_get_ack_policy(),
                         db.DB_REPMGR_ACKS_ALL)
        self.assertEqual(self.dbenvClient.repmgr_get_ack_policy(),
                         db.DB_REPMGR_ACKS_ALL)

        # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
        # is not generated if the master has no new transactions.
        # This is solved in BDB 4.6 (#15542).
        deadline = time.time() + 30
        while time.time() < deadline and not (
                self.confirmed_master and self.client_startupdone):
            time.sleep(0.02)

        # This fails on Windows as self.client_startupdone never gets set
        # to True - see bug 3892.  BUT - even though this assertion
        # fails on Windows the rest of the test passes - so to prove
        # that we let the rest of the test run.  Sadly we can't
        # make use of raising TestSkipped() here (unittest still
        # reports it as an error), so we yell to stderr.
        if sys.platform == "win32":
            sys.stderr.write("XXX - windows bsddb replication fails on "
                             "windows and is skipped\n")
            sys.stderr.write("XXX - Please see issue #3892\n")
        else:
            self.assertTrue(time.time() < deadline)

        # Each site must list exactly the other one as its remote peer.
        link_states = (db.DB_REPMGR_CONNECTED, db.DB_REPMGR_DISCONNECTED)

        sites = self.dbenvMaster.repmgr_site_list()
        self.assertEqual(len(sites), 1)
        self.assertEqual(sites[0][0], "127.0.0.1")
        self.assertEqual(sites[0][1], client_port)
        self.assertTrue(sites[0][2] in link_states)

        sites = self.dbenvClient.repmgr_site_list()
        self.assertEqual(len(sites), 1)
        self.assertEqual(sites[0][0], "127.0.0.1")
        self.assertEqual(sites[0][1], master_port)
        self.assertTrue(sites[0][2] in link_states)

        if db.version() >= (4, 6):
            stats = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR)
            self.assertTrue("msgs_queued" in stats)

        # Create the database on the master ...
        self.dbMaster = db.DB(self.dbenvMaster)
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE,
                           self._DB_FILE_MODE, txn=txn)
        txn.commit()

        # ... and give its file up to ten seconds to appear on the client.
        client_db_file = os.path.join(self.homeDirClient, "test")
        deadline = time.time() + 10
        while time.time() < deadline and not os.path.exists(client_db_file):
            time.sleep(0.01)

        # Opening on the client can raise DBRepHandleDeadError; in that
        # case the handle is discarded and the open retried on a new one.
        self.dbClient = db.DB(self.dbenvClient)
        while True:
            txn = self.dbenvClient.txn_begin()
            try:
                self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
                                   mode=self._DB_FILE_MODE, txn=txn)
            except db.DBRepHandleDeadError:
                txn.abort()
                self.dbClient.close()
                self.dbClient = db.DB(self.dbenvClient)
                continue
            txn.commit()
            break

        # A record written on the master must become readable on the client.
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.put("ABC", "123", txn=txn)
        txn.commit()

        deadline = time.time() + 10
        v = None
        while time.time() < deadline and v is None:
            txn = self.dbenvClient.txn_begin()
            v = self.dbClient.get("ABC", txn=txn)
            txn.commit()
            if v is None:
                time.sleep(0.02)
        self.assertTrue(time.time() < deadline)
        self.assertEqual("123", v)

        # Deleting it on the master must make it vanish from the client.
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.delete("ABC", txn=txn)
        txn.commit()

        deadline = time.time() + 10
        while time.time() < deadline and v is not None:
            txn = self.dbenvClient.txn_begin()
            v = self.dbClient.get("ABC", txn=txn)
            txn.commit()
            if v is not None:
                time.sleep(0.02)
        self.assertTrue(time.time() < deadline)
        self.assertEqual(None, v)
class DBBaseReplication(DBReplicationManager):
    """Replicate between the two environments through the base API.

    Instead of the Replication Manager, messages are carried by two
    in-process queues: each environment's transport callback enqueues
    ``(control, rec)`` pairs, and one worker thread per environment feeds
    the pairs addressed to it into ``rep_process_message``.
    """

    # Octal 666.  Spelled with int() because the literal ``0666`` is a
    # syntax error on Python 3.
    _DB_FILE_MODE = int("666", 8)

    def setUp(self):
        """Extend the parent fixture with queues, transports and workers."""
        import sys
        from threading import Thread
        try:
            import Queue as queue_module    # Python 2 module name
        except ImportError:
            import queue as queue_module    # Python 3 module name

        DBReplicationManager.setUp(self)

        # Replace the parent's callbacks: here the master also counts as
        # confirmed when it wins an election.  Only the event code (second
        # argument) is inspected.
        def on_master_event(dbenv, event, info):
            if event in (db.DB_EVENT_REP_MASTER, db.DB_EVENT_REP_ELECTED):
                self.confirmed_master = True

        def on_client_event(dbenv, event, info):
            if event == db.DB_EVENT_REP_STARTUPDONE:
                self.client_startupdone = True

        self.dbenvMaster.set_event_notify(on_master_event)
        self.dbenvClient.set_event_notify(on_client_event)

        # One queue per direction: master-to-client and client-to-master.
        self.m2c = queue_module.Queue()
        self.c2m = queue_module.Queue()

        # There are only two nodes, so we don't need to
        # do any routing decision
        def send_to_client(dbenv, control, rec, lsnp, envid, flags):
            self.m2c.put((control, rec))

        def send_to_master(dbenv, control, rec, lsnp, envid, flags):
            self.c2m.put((control, rec))

        # Environment ids the two sites register with and are known by.
        master_envid = 13
        client_envid = 3

        self.dbenvMaster.rep_set_transport(master_envid, send_to_client)
        self.dbenvMaster.rep_set_priority(10)
        self.dbenvClient.rep_set_transport(client_envid, send_to_master)
        self.dbenvClient.rep_set_priority(0)

        self.assertEqual(self.dbenvMaster.rep_get_priority(), 10)
        self.assertEqual(self.dbenvClient.rep_get_priority(), 0)

        # Uncomment to trace replication while debugging:
        #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
        #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
        #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
        #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)

        # self.thread_do is installed later by the individual test, so it
        # is looked up only when the worker actually runs.  Each worker
        # drains the queue filled by the *other* site and reports that
        # site's envid as the message origin.
        def thread_master():
            return self.thread_do(self.dbenvMaster, self.c2m, client_envid,
                                  self.master_doing_election, True)

        def thread_client():
            return self.thread_do(self.dbenvClient, self.m2c, master_envid,
                                  self.client_doing_election, False)

        t_m = Thread(target=thread_master)
        t_c = Thread(target=thread_client)
        if sys.version_info[0] < 3:
            t_m.setDaemon(True)
            t_c.setDaemon(True)
        else:
            t_m.daemon = True
            t_c.daemon = True

        # Created here, started by the tests once thread_do exists.
        self.t_m = t_m
        self.t_c = t_c

        self.dbMaster = self.dbClient = None

        # One-element lists so the worker threads can mutate the flag.
        self.master_doing_election = [False]
        self.client_doing_election = [False]

    def tearDown(self):
        """Close the handles, stop both workers, then dispose of the
        environments and their directories."""
        if self.dbClient is not None:
            self.dbClient.close()
        if self.dbMaster is not None:
            self.dbMaster.close()
        # None is the sentinel on which each worker's thread_do returns.
        self.m2c.put(None)
        self.c2m.put(None)
        self.t_m.join()
        self.t_c.join()
        self.dbenvClient.close()
        self.dbenvMaster.close()
        test_support.rmtree(self.homeDirClient)
        test_support.rmtree(self.homeDirMaster)

    def basic_rep_threading(self):
        """Start replication with fixed roles and launch both workers."""
        self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
        self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)

        def thread_do(env, q, envid, election_status, must_be_master):
            # Plain pump: hand every queued message to the environment
            # until the None sentinel arrives.  The last two arguments are
            # unused here; they exist for the election test's variant.
            while True:
                v = q.get()
                if v is None:
                    return
                env.rep_process_message(v[0], v[1], envid)

        self.thread_do = thread_do

        self.t_m.start()
        self.t_c.start()

    def test01_basic_replication(self):
        """Verify that a put and a delete made on the master reach the
        client when messages travel through the queues."""
        import os.path
        import time

        self.basic_rep_threading()

        # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
        # is not generated if the master has no new transactions.
        # This is solved in BDB 4.6 (#15542).
        deadline = time.time() + 60
        while time.time() < deadline and not (
                self.confirmed_master and self.client_startupdone):
            time.sleep(0.02)
        self.assertTrue(time.time() < deadline)

        # Create the database on the master ...
        self.dbMaster = db.DB(self.dbenvMaster)
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE,
                           self._DB_FILE_MODE, txn=txn)
        txn.commit()

        # ... and give its file up to ten seconds to appear on the client.
        client_db_file = os.path.join(self.homeDirClient, "test")
        deadline = time.time() + 10
        while time.time() < deadline and not os.path.exists(client_db_file):
            time.sleep(0.01)

        # Opening on the client can raise DBRepHandleDeadError; in that
        # case the handle is discarded and the open retried on a new one.
        self.dbClient = db.DB(self.dbenvClient)
        while True:
            txn = self.dbenvClient.txn_begin()
            try:
                self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
                                   mode=self._DB_FILE_MODE, txn=txn)
            except db.DBRepHandleDeadError:
                txn.abort()
                self.dbClient.close()
                self.dbClient = db.DB(self.dbenvClient)
                continue
            txn.commit()
            break

        # A record written on the master must become readable on the client.
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.put("ABC", "123", txn=txn)
        txn.commit()

        deadline = time.time() + 10
        v = None
        while time.time() < deadline and v is None:
            txn = self.dbenvClient.txn_begin()
            v = self.dbClient.get("ABC", txn=txn)
            txn.commit()
            if v is None:
                time.sleep(0.02)
        self.assertTrue(time.time() < deadline)
        self.assertEqual("123", v)

        # Deleting it on the master must make it vanish from the client.
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.delete("ABC", txn=txn)
        txn.commit()

        deadline = time.time() + 10
        while time.time() < deadline and v is not None:
            txn = self.dbenvClient.txn_begin()
            v = self.dbClient.get("ABC", txn=txn)
            txn.commit()
            if v is not None:
                time.sleep(0.02)
        self.assertTrue(time.time() < deadline)
        self.assertEqual(None, v)

    if db.version() >= (4, 7):
        def test02_test_request(self):
            """Check that rep_set_request values can be read back."""
            self.basic_rep_threading()
            (minimum, maximum) = self.dbenvClient.rep_get_request()
            self.dbenvClient.rep_set_request(minimum - 1, maximum + 1)
            self.assertEqual(self.dbenvClient.rep_get_request(),
                             (minimum - 1, maximum + 1))

    if db.version() >= (4, 6):
        def test03_master_election(self):
            """Start both sites as clients and check that an election
            ends with a confirmed master."""
            import sys
            from threading import Thread

            # Get ready to hold an election: nobody starts as master.
            self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT)
            self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)

            def thread_do(env, q, envid, election_status, must_be_master):
                def elect():
                    # Retry for as long as the election reports that the
                    # other site is unavailable.
                    while True:
                        try:
                            env.rep_elect(2, 1)
                        except db.DBRepUnavailError:
                            continue
                        election_status[0] = False
                        break

                while True:
                    v = q.get()
                    if v is None:
                        return
                    r = env.rep_process_message(v[0], v[1], envid)

                    # Promote the designated environment once, as soon as
                    # the event callback has confirmed a master.
                    if must_be_master and self.confirmed_master:
                        self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
                        must_be_master = False

                    # Run a requested election in the background, unless
                    # one is already running or a master is already known.
                    if (r[0] == db.DB_REP_HOLDELECTION
                            and not election_status[0]
                            and not self.confirmed_master):
                        election_status[0] = True
                        t = Thread(target=elect)
                        if sys.version_info[0] < 3:
                            t.setDaemon(True)
                        else:
                            t.daemon = True
                        t.start()

            self.thread_do = thread_do

            self.t_m.start()
            self.t_c.start()

            self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT,
                                             50000)
            self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT,
                                             50000)

            # Mark the client as electing so its worker does not start a
            # second election while this thread drives the first one.
            self.client_doing_election[0] = True
            while True:
                try:
                    self.dbenvClient.rep_elect(2, 1)
                except db.DBRepUnavailError:
                    continue
                self.client_doing_election[0] = False
                break

            self.assertTrue(self.confirmed_master)
433 #----------------------------------------------------------------------
def test_suite():
    """Return a suite holding the replication tests this build can run.

    Nothing is added for Berkeley DB older than 4.6.  Otherwise the
    Replication Manager tests are added when a probe call on a scratch
    environment succeeds, and the base replication tests when threads are
    available.
    """
    suite = unittest.TestSuite()
    if db.version() >= (4, 6):
        # Probe a throw-away environment for Replication Manager support.
        dbenv = db.DBEnv()
        try:
            dbenv.repmgr_get_ack_policy()
            ReplicationManager_available = True
        except Exception:
            # Which error an unsupported build raises is not visible from
            # here, so any failure is read as "not available".
            ReplicationManager_available = False
        dbenv.close()
        del dbenv

        if ReplicationManager_available:
            suite.addTest(unittest.makeSuite(DBReplicationManager))

        if have_threads:
            suite.addTest(unittest.makeSuite(DBBaseReplication))

    return suite
if __name__ == '__main__':
    # Run as a script: unittest builds its tests by calling test_suite().
    unittest.main(defaultTest='test_suite')