Credit Nir Aides for r77288
[python.git] / Lib / bsddb / test / test_replication.py
blobb3775c733037c1cf8393fea761df34507291997e
"""TestCases for replication (Replication Manager and base replication API).
"""
4 import os
5 import time
6 import unittest
8 from test_all import db, test_support, have_threads, verbose, \
9 get_new_environment_path, get_new_database_path
12 #----------------------------------------------------------------------
14 class DBReplicationManager(unittest.TestCase):
15 import sys
16 if sys.version_info[:3] < (2, 4, 0):
17 def assertTrue(self, expr, msg=None):
18 self.failUnless(expr,msg=msg)
20 def setUp(self) :
21 self.homeDirMaster = get_new_environment_path()
22 self.homeDirClient = get_new_environment_path()
24 self.dbenvMaster = db.DBEnv()
25 self.dbenvClient = db.DBEnv()
27 # Must use "DB_THREAD" because the Replication Manager will
28 # be executed in other threads but will use the same environment.
29 # http://forums.oracle.com/forums/thread.jspa?threadID=645788&tstart=0
30 self.dbenvMaster.open(self.homeDirMaster, db.DB_CREATE | db.DB_INIT_TXN
31 | db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
32 db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666)
33 self.dbenvClient.open(self.homeDirClient, db.DB_CREATE | db.DB_INIT_TXN
34 | db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
35 db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666)
37 self.confirmed_master=self.client_startupdone=False
38 def confirmed_master(a,b,c) :
39 if b==db.DB_EVENT_REP_MASTER :
40 self.confirmed_master=True
42 def client_startupdone(a,b,c) :
43 if b==db.DB_EVENT_REP_STARTUPDONE :
44 self.client_startupdone=True
46 self.dbenvMaster.set_event_notify(confirmed_master)
47 self.dbenvClient.set_event_notify(client_startupdone)
49 #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
50 #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
51 #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
52 #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
54 self.dbMaster = self.dbClient = None
57 def tearDown(self):
58 if self.dbClient :
59 self.dbClient.close()
60 if self.dbMaster :
61 self.dbMaster.close()
62 self.dbenvClient.close()
63 self.dbenvMaster.close()
64 test_support.rmtree(self.homeDirClient)
65 test_support.rmtree(self.homeDirMaster)
67 def test01_basic_replication(self) :
68 master_port = test_support.find_unused_port()
69 self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port)
70 client_port = test_support.find_unused_port()
71 self.dbenvClient.repmgr_set_local_site("127.0.0.1", client_port)
72 self.dbenvMaster.repmgr_add_remote_site("127.0.0.1", client_port)
73 self.dbenvClient.repmgr_add_remote_site("127.0.0.1", master_port)
74 self.dbenvMaster.rep_set_nsites(2)
75 self.dbenvClient.rep_set_nsites(2)
76 self.dbenvMaster.rep_set_priority(10)
77 self.dbenvClient.rep_set_priority(0)
79 self.dbenvMaster.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100123)
80 self.dbenvClient.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100321)
81 self.assertEquals(self.dbenvMaster.rep_get_timeout(
82 db.DB_REP_CONNECTION_RETRY), 100123)
83 self.assertEquals(self.dbenvClient.rep_get_timeout(
84 db.DB_REP_CONNECTION_RETRY), 100321)
86 self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100234)
87 self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100432)
88 self.assertEquals(self.dbenvMaster.rep_get_timeout(
89 db.DB_REP_ELECTION_TIMEOUT), 100234)
90 self.assertEquals(self.dbenvClient.rep_get_timeout(
91 db.DB_REP_ELECTION_TIMEOUT), 100432)
93 self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100345)
94 self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100543)
95 self.assertEquals(self.dbenvMaster.rep_get_timeout(
96 db.DB_REP_ELECTION_RETRY), 100345)
97 self.assertEquals(self.dbenvClient.rep_get_timeout(
98 db.DB_REP_ELECTION_RETRY), 100543)
100 self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
101 self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
103 self.dbenvMaster.repmgr_start(1, db.DB_REP_MASTER);
104 self.dbenvClient.repmgr_start(1, db.DB_REP_CLIENT);
106 self.assertEquals(self.dbenvMaster.rep_get_nsites(),2)
107 self.assertEquals(self.dbenvClient.rep_get_nsites(),2)
108 self.assertEquals(self.dbenvMaster.rep_get_priority(),10)
109 self.assertEquals(self.dbenvClient.rep_get_priority(),0)
110 self.assertEquals(self.dbenvMaster.repmgr_get_ack_policy(),
111 db.DB_REPMGR_ACKS_ALL)
112 self.assertEquals(self.dbenvClient.repmgr_get_ack_policy(),
113 db.DB_REPMGR_ACKS_ALL)
115 # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
116 # is not generated if the master has no new transactions.
117 # This is solved in BDB 4.6 (#15542).
118 import time
119 timeout = time.time()+60
120 while (time.time()<timeout) and not (self.confirmed_master and self.client_startupdone) :
121 time.sleep(0.02)
122 # this fails on Windows as self.client_startupdone never gets set
123 # to True - see bug 3892. BUT - even though this assertion
124 # fails on Windows the rest of the test passes - so to prove
125 # that we let the rest of the test run. Sadly we can't
126 # make use of raising TestSkipped() here (unittest still
127 # reports it as an error), so we yell to stderr.
128 import sys
129 if sys.platform=="win32":
130 print >> sys.stderr, \
131 "XXX - windows bsddb replication fails on windows and is skipped"
132 print >> sys.stderr, "XXX - Please see issue #3892"
133 # It also fails irregularly on other platforms, and again the
134 # rest of the tests pass. Since bsddb support is unmaintained, and
135 # is gone in py3k, we just emit a warning instead of a test failure
136 # so as to improve buildbot stability.
137 elif time.time()>timeout:
138 print >> sys.stderr, \
139 "XXX - timeout before startup confirmed, see issue #3892."
141 d = self.dbenvMaster.repmgr_site_list()
142 self.assertEquals(len(d), 1)
143 self.assertEquals(d[0][0], "127.0.0.1")
144 self.assertEquals(d[0][1], client_port)
145 self.assertTrue((d[0][2]==db.DB_REPMGR_CONNECTED) or \
146 (d[0][2]==db.DB_REPMGR_DISCONNECTED))
148 d = self.dbenvClient.repmgr_site_list()
149 self.assertEquals(len(d), 1)
150 self.assertEquals(d[0][0], "127.0.0.1")
151 self.assertEquals(d[0][1], master_port)
152 self.assertTrue((d[0][2]==db.DB_REPMGR_CONNECTED) or \
153 (d[0][2]==db.DB_REPMGR_DISCONNECTED))
155 if db.version() >= (4,6) :
156 d = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR);
157 self.assertTrue("msgs_queued" in d)
159 self.dbMaster=db.DB(self.dbenvMaster)
160 txn=self.dbenvMaster.txn_begin()
161 self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
162 txn.commit()
164 import time,os.path
165 timeout=time.time()+10
166 while (time.time()<timeout) and \
167 not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
168 time.sleep(0.01)
170 self.dbClient=db.DB(self.dbenvClient)
171 while True :
172 txn=self.dbenvClient.txn_begin()
173 try :
174 self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
175 mode=0666, txn=txn)
176 except db.DBRepHandleDeadError :
177 txn.abort()
178 self.dbClient.close()
179 self.dbClient=db.DB(self.dbenvClient)
180 continue
182 txn.commit()
183 break
185 txn=self.dbenvMaster.txn_begin()
186 self.dbMaster.put("ABC", "123", txn=txn)
187 txn.commit()
188 import time
189 timeout=time.time()+10
190 v=None
191 while (time.time()<timeout) and (v==None) :
192 txn=self.dbenvClient.txn_begin()
193 v=self.dbClient.get("ABC", txn=txn)
194 txn.commit()
195 if v==None :
196 time.sleep(0.02)
197 self.assertTrue(time.time()<timeout)
198 self.assertEquals("123", v)
200 txn=self.dbenvMaster.txn_begin()
201 self.dbMaster.delete("ABC", txn=txn)
202 txn.commit()
203 timeout=time.time()+10
204 while (time.time()<timeout) and (v!=None) :
205 txn=self.dbenvClient.txn_begin()
206 v=self.dbClient.get("ABC", txn=txn)
207 txn.commit()
208 if v==None :
209 time.sleep(0.02)
210 self.assertTrue(time.time()<timeout)
211 self.assertEquals(None, v)
213 class DBBaseReplication(DBReplicationManager):
214 def setUp(self) :
215 DBReplicationManager.setUp(self)
216 def confirmed_master(a,b,c) :
217 if (b == db.DB_EVENT_REP_MASTER) or (b == db.DB_EVENT_REP_ELECTED) :
218 self.confirmed_master = True
220 def client_startupdone(a,b,c) :
221 if b == db.DB_EVENT_REP_STARTUPDONE :
222 self.client_startupdone = True
224 self.dbenvMaster.set_event_notify(confirmed_master)
225 self.dbenvClient.set_event_notify(client_startupdone)
227 import Queue
228 self.m2c = Queue.Queue()
229 self.c2m = Queue.Queue()
231 # There are only two nodes, so we don't need to
232 # do any routing decision
233 def m2c(dbenv, control, rec, lsnp, envid, flags) :
234 self.m2c.put((control, rec))
236 def c2m(dbenv, control, rec, lsnp, envid, flags) :
237 self.c2m.put((control, rec))
239 self.dbenvMaster.rep_set_transport(13,m2c)
240 self.dbenvMaster.rep_set_priority(10)
241 self.dbenvClient.rep_set_transport(3,c2m)
242 self.dbenvClient.rep_set_priority(0)
244 self.assertEquals(self.dbenvMaster.rep_get_priority(),10)
245 self.assertEquals(self.dbenvClient.rep_get_priority(),0)
247 #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
248 #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
249 #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
250 #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
252 def thread_master() :
253 return self.thread_do(self.dbenvMaster, self.c2m, 3,
254 self.master_doing_election, True)
256 def thread_client() :
257 return self.thread_do(self.dbenvClient, self.m2c, 13,
258 self.client_doing_election, False)
260 from threading import Thread
261 t_m=Thread(target=thread_master)
262 t_c=Thread(target=thread_client)
263 import sys
264 if sys.version_info[0] < 3 :
265 t_m.setDaemon(True)
266 t_c.setDaemon(True)
267 else :
268 t_m.daemon = True
269 t_c.daemon = True
271 self.t_m = t_m
272 self.t_c = t_c
274 self.dbMaster = self.dbClient = None
276 self.master_doing_election=[False]
277 self.client_doing_election=[False]
280 def tearDown(self):
281 if self.dbClient :
282 self.dbClient.close()
283 if self.dbMaster :
284 self.dbMaster.close()
285 self.m2c.put(None)
286 self.c2m.put(None)
287 self.t_m.join()
288 self.t_c.join()
289 self.dbenvClient.close()
290 self.dbenvMaster.close()
291 test_support.rmtree(self.homeDirClient)
292 test_support.rmtree(self.homeDirMaster)
294 def basic_rep_threading(self) :
295 self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
296 self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
298 def thread_do(env, q, envid, election_status, must_be_master) :
299 while True :
300 v=q.get()
301 if v == None : return
302 env.rep_process_message(v[0], v[1], envid)
304 self.thread_do = thread_do
306 self.t_m.start()
307 self.t_c.start()
309 def test01_basic_replication(self) :
310 self.basic_rep_threading()
312 # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
313 # is not generated if the master has no new transactions.
314 # This is solved in BDB 4.6 (#15542).
315 import time
316 timeout = time.time()+60
317 while (time.time()<timeout) and not (self.confirmed_master and
318 self.client_startupdone) :
319 time.sleep(0.02)
320 self.assertTrue(time.time()<timeout)
322 self.dbMaster=db.DB(self.dbenvMaster)
323 txn=self.dbenvMaster.txn_begin()
324 self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
325 txn.commit()
327 import time,os.path
328 timeout=time.time()+10
329 while (time.time()<timeout) and \
330 not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
331 time.sleep(0.01)
333 self.dbClient=db.DB(self.dbenvClient)
334 while True :
335 txn=self.dbenvClient.txn_begin()
336 try :
337 self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
338 mode=0666, txn=txn)
339 except db.DBRepHandleDeadError :
340 txn.abort()
341 self.dbClient.close()
342 self.dbClient=db.DB(self.dbenvClient)
343 continue
345 txn.commit()
346 break
348 txn=self.dbenvMaster.txn_begin()
349 self.dbMaster.put("ABC", "123", txn=txn)
350 txn.commit()
351 import time
352 timeout=time.time()+10
353 v=None
354 while (time.time()<timeout) and (v==None) :
355 txn=self.dbenvClient.txn_begin()
356 v=self.dbClient.get("ABC", txn=txn)
357 txn.commit()
358 if v==None :
359 time.sleep(0.02)
360 self.assertTrue(time.time()<timeout)
361 self.assertEquals("123", v)
363 txn=self.dbenvMaster.txn_begin()
364 self.dbMaster.delete("ABC", txn=txn)
365 txn.commit()
366 timeout=time.time()+10
367 while (time.time()<timeout) and (v!=None) :
368 txn=self.dbenvClient.txn_begin()
369 v=self.dbClient.get("ABC", txn=txn)
370 txn.commit()
371 if v==None :
372 time.sleep(0.02)
373 self.assertTrue(time.time()<timeout)
374 self.assertEquals(None, v)
376 if db.version() >= (4,7) :
377 def test02_test_request(self) :
378 self.basic_rep_threading()
379 (minimum, maximum) = self.dbenvClient.rep_get_request()
380 self.dbenvClient.rep_set_request(minimum-1, maximum+1)
381 self.assertEqual(self.dbenvClient.rep_get_request(),
382 (minimum-1, maximum+1))
384 if db.version() >= (4,6) :
385 def test03_master_election(self) :
386 # Get ready to hold an election
387 #self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
388 self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT)
389 self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
391 def thread_do(env, q, envid, election_status, must_be_master) :
392 while True :
393 v=q.get()
394 if v == None : return
395 r = env.rep_process_message(v[0],v[1],envid)
396 if must_be_master and self.confirmed_master :
397 self.dbenvMaster.rep_start(flags = db.DB_REP_MASTER)
398 must_be_master = False
400 if r[0] == db.DB_REP_HOLDELECTION :
401 def elect() :
402 while True :
403 try :
404 env.rep_elect(2, 1)
405 election_status[0] = False
406 break
407 except db.DBRepUnavailError :
408 pass
409 if not election_status[0] and not self.confirmed_master :
410 from threading import Thread
411 election_status[0] = True
412 t=Thread(target=elect)
413 import sys
414 if sys.version_info[0] < 3 :
415 t.setDaemon(True)
416 else :
417 t.daemon = True
418 t.start()
420 self.thread_do = thread_do
422 self.t_m.start()
423 self.t_c.start()
425 self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
426 self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
427 self.client_doing_election[0] = True
428 while True :
429 try :
430 self.dbenvClient.rep_elect(2, 1)
431 self.client_doing_election[0] = False
432 break
433 except db.DBRepUnavailError :
434 pass
436 self.assertTrue(self.confirmed_master)
438 #----------------------------------------------------------------------
def test_suite():
    """Build the replication test suite for the linked Berkeley DB version.

    Nothing is added for Berkeley DB older than 4.6.  The Replication
    Manager tests are added only when a probe call to
    repmgr_get_ack_policy() on a fresh environment succeeds, and the base
    replication tests only when have_threads is true.
    """
    suite = unittest.TestSuite()
    if db.version() >= (4, 6):
        dbenv = db.DBEnv()
        try:
            try:
                dbenv.repmgr_get_ack_policy()
                ReplicationManager_available = True
            except Exception:
                # Any ordinary error from the probe means the Replication
                # Manager is unusable; KeyboardInterrupt and SystemExit
                # are no longer swallowed here.
                ReplicationManager_available = False
        finally:
            dbenv.close()
        del dbenv
        if ReplicationManager_available:
            suite.addTest(unittest.makeSuite(DBReplicationManager))

        if have_threads:
            suite.addTest(unittest.makeSuite(DBBaseReplication))

    return suite
# When executed as a script, run the tests returned by test_suite().
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')