mySQL 5.0.11 sources for tomato
[tomato.git] / release / src / router / mysql / storage / ndb / test / ndbapi / testLcp.cpp
blobcd4601a8c58c6c0c18da14d4d4b480df9582a426
/* Copyright (C) 2004, 2005 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16 #include <NDBT.hpp>
17 #include <NdbApi.hpp>
18 #include <NdbRestarter.hpp>
19 #include <HugoOperations.hpp>
20 #include <HugoTransactions.hpp>
21 #include <UtilTransactions.hpp>
22 #include <signaldata/DumpStateOrd.hpp>
24 #include <getarg.h>
25 #include <InputStream.hpp>
/**
 * One per-row scenario: up to three operations applied to a single row
 * inside the shared transaction, plus bookkeeping about row existence.
 */
struct CASE
{
  bool start_row;   // load_table() inserts the row up front when true
  bool end_row;     // expected row existence when validate() runs
  bool curr_row;    // current row existence, maintained by load_table()/do_op()
  const char * op1; // first operation: "INS", "UPD" or "DEL"
  const char * op2; // second operation, 0 when unused
  const char * op3; // third operation, 0 when unused
  int val;          // last value written to the row (taken from rand())
};
38 static CASE g_op_types[] =
40 { false, true, false, "INS", 0, 0, 0 }, // 0x001 a
41 { true, true, false, "UPD", 0, 0, 0 }, // 0x002 d
42 { true, false, false, "DEL", 0, 0, 0 }, // 0x004 g
44 { false, true, false, "INS", "UPD", 0, 0 }, // 0x008 b
45 { false, false, false, "INS", "DEL", 0, 0 }, // 0x010 c
46 { true, true, false, "UPD", "UPD", 0, 0 }, // 0x020 e
47 { true, false, false, "UPD", "DEL", 0, 0 }, // 0x040 f
48 { true, true, false, "DEL", "INS", 0, 0 }, // 0x080 h
50 { false, true, false, "INS", "DEL", "INS", 0 }, // 0x100 i
51 { true, false, false, "DEL", "INS", "DEL", 0 } // 0x200 j
53 const size_t OP_COUNT = (sizeof(g_op_types)/sizeof(g_op_types[0]));
55 static Ndb* g_ndb = 0;
56 static CASE* g_ops;
57 static Ndb_cluster_connection *g_cluster_connection= 0;
58 static HugoOperations* g_hugo_ops;
59 static int g_use_ops = 1 | 2 | 4;
60 static int g_cases = 0x1;
61 static int g_case_loop = 2;
62 static int g_rows = 10;
63 static int g_setup_tables = 1;
64 static int g_one_op_at_a_time = 0;
65 static const char * g_tablename = "T1";
66 static const NdbDictionary::Table* g_table = 0;
67 static NdbRestarter g_restarter;
// Forward declarations of the test steps; each returns 0 on success.
static int init_ndb(int argc, char** argv);
static int parse_args(int argc, char** argv);
static int connect_ndb();
static int drop_all_tables();
static int load_table();
static int pause_lcp(int error);
static int do_op(int row);
static int continue_lcp(int error = 0);
static int commit();
static int restart();
static int validate();
/*
 * Abort the test, after logging the source line to g_err, when `x` evaluates
 * to false. The argument is parenthesized so that any expression (including
 * an assignment) can be passed, and the body is wrapped in do/while(0) so the
 * macro behaves as a single statement after an unbraced `if`.
 */
#define require(x) do { \
  const bool require_ok_ = (x); \
  if(!require_ok_){ g_err << __LINE__ << endl; abort(); } \
} while(0)
83 int
84 main(int argc, char ** argv){
85 ndb_init();
86 require(!init_ndb(argc, argv));
87 if(parse_args(argc, argv))
88 return -1;
89 require(!connect_ndb());
91 if(g_setup_tables){
92 require(!drop_all_tables());
94 if(NDBT_Tables::createTable(g_ndb, g_tablename) != 0){
95 exit(-1);
99 g_table = g_ndb->getDictionary()->getTable(g_tablename);
100 if(g_table == 0){
101 g_err << "Failed to retreive table: " << g_tablename << endl;
102 exit(-1);
104 require(g_hugo_ops = new HugoOperations(* g_table));
105 require(!g_hugo_ops->startTransaction(g_ndb));
107 g_ops= new CASE[g_rows];
109 const int use_ops = g_use_ops;
110 for(size_t i = 0; i<OP_COUNT; i++)
112 if(g_one_op_at_a_time){
113 while(i < OP_COUNT && (use_ops & (1 << i)) == 0) i++;
114 if(i == OP_COUNT)
115 break;
116 ndbout_c("-- loop\noperation: %c use_ops: %x", 'a'+i, use_ops);
117 g_use_ops = (1 << i);
118 } else {
119 i = OP_COUNT - 1;
122 size_t test_case = 0;
123 if((1 << test_case++) & g_cases)
125 for(size_t tl = 0; tl<g_case_loop; tl++){
126 g_info << "Performing all ops wo/ inteference of LCP" << endl;
128 g_info << "Testing pre LCP operations, ZLCP_OP_WRITE_RT_BREAK" << endl;
129 g_info << " where ZLCP_OP_WRITE_RT_BREAK is "
130 " finished before SAVE_PAGES" << endl;
131 require(!load_table());
132 require(!pause_lcp(5900));
133 for(size_t j = 0; j<g_rows; j++){
134 require(!do_op(j));
136 require(!continue_lcp(5900));
137 require(!commit());
138 require(!pause_lcp(5900));
139 require(!restart());
140 require(!validate());
144 if((1 << test_case++) & g_cases)
146 for(size_t tl = 0; tl<g_case_loop; tl++){
147 g_info << "Testing pre LCP operations, ZLCP_OP_WRITE_RT_BREAK" << endl;
148 g_info << " where ZLCP_OP_WRITE_RT_BREAK is finished after SAVE_PAGES"
149 << endl;
150 require(!load_table());
151 require(!pause_lcp(5901));
152 for(size_t j = 0; j<g_rows; j++){
153 require(!do_op(j));
155 require(!continue_lcp(5901));
156 require(!commit());
157 require(!pause_lcp(5900));
158 require(!restart());
159 require(!validate());
163 if((1 << test_case++) & g_cases)
165 for(size_t tl = 0; tl<g_case_loop; tl++){
166 g_info << "Testing pre LCP operations, undo-ed at commit" << endl;
167 require(!load_table());
168 require(!pause_lcp(5902));
169 for(size_t j = 0; j<g_rows; j++){
170 require(!do_op(j));
172 require(!continue_lcp(5902));
173 require(!commit());
174 require(!continue_lcp(5903));
175 require(!pause_lcp(5900));
176 require(!restart());
177 require(!validate());
181 if((1 << test_case++) & g_cases)
183 for(size_t tl = 0; tl<g_case_loop; tl++){
184 g_info << "Testing prepared during LCP and committed after" << endl;
185 require(!load_table());
186 require(!pause_lcp(5904)); // Start LCP, but don't save pages
187 for(size_t j = 0; j<g_rows; j++){
188 require(!do_op(j));
190 require(!continue_lcp(5904)); // Start ACC save pages
191 require(!pause_lcp(5900)); // Next LCP
192 require(!commit());
193 require(!restart());
194 require(!validate());
200 static int init_ndb(int argc, char** argv)
202 ndb_init();
203 return 0;
206 static int parse_args(int argc, char** argv)
208 size_t i;
209 char * ops= 0, *cases=0;
210 struct getargs args[] = {
211 { "records", 0, arg_integer, &g_rows, "Number of records", "records" },
212 { "operations", 'o', arg_string, &ops, "Operations [a-h]", 0 },
213 { "1", '1', arg_flag, &g_one_op_at_a_time, "One op at a time", 0 },
214 { "0", '0', arg_negative_flag, &g_one_op_at_a_time, "All ops at once", 0 },
215 { "cases", 'c', arg_string, &cases, "Cases [a-c]", 0 },
216 { 0, 't', arg_flag, &g_setup_tables, "Create table", 0 },
217 { 0, 'u', arg_negative_flag, &g_setup_tables, "Dont create table", 0 }
220 int optind= 0;
221 const int num_args = sizeof(args)/sizeof(args[0]);
222 if(getarg(args, num_args, argc, (const char**)argv, &optind)) {
223 arg_printusage(args, num_args, argv[0], " tabname1\n");
224 ndbout_c("\n -- Operations [a-%c] = ", 'a'+OP_COUNT-1);
225 for(i = 0; i<OP_COUNT; i++){
226 ndbout_c("\t%c = %s %s",
227 'a'+i, g_op_types[i].op1,
228 g_op_types[i].op2 ? g_op_types[i].op2 : "");
230 return -1;
233 if(ops != 0){
234 g_use_ops = 0;
235 char * s = ops;
236 while(* s)
237 g_use_ops |= (1 << ((* s++) - 'a'));
240 if(cases != 0){
241 g_cases = 0;
242 char * s = cases;
243 while(* s)
244 g_cases |= (1 << ((* s++) - 'a'));
247 ndbout_c("table: %s", g_tablename);
248 printf("operations: ");
249 for(i = 0; i<OP_COUNT; i++)
250 if(g_use_ops & (1 << i))
251 printf("%c", 'a'+i);
252 printf("\n");
254 printf("test cases: ");
255 for(i = 0; i<3; i++)
256 if(g_cases & (1 << i))
257 printf("%c", '1'+i);
258 printf("\n");
259 printf("-------------\n");
260 return 0;
263 static int connect_ndb()
265 g_cluster_connection = new Ndb_cluster_connection();
266 if(g_cluster_connection->connect(12, 5, 1) != 0)
268 return 1;
271 g_ndb = new Ndb(g_cluster_connection, "TEST_DB");
272 g_ndb->init(256);
273 if(g_ndb->waitUntilReady(30) == 0){
274 return 0;
275 // int args[] = { DumpStateOrd::DihMaxTimeBetweenLCP };
276 // return g_restarter.dumpStateAllNodes(args, 1);
278 return -1;
281 static int disconnect_ndb()
283 delete g_ndb;
284 delete g_cluster_connection;
285 g_ndb = 0;
286 g_table = 0;
287 g_cluster_connection= 0;
288 return 0;
291 static int drop_all_tables()
293 NdbDictionary::Dictionary * dict = g_ndb->getDictionary();
294 require(dict);
296 BaseString db = g_ndb->getDatabaseName();
297 BaseString schema = g_ndb->getSchemaName();
299 NdbDictionary::Dictionary::List list;
300 if (dict->listObjects(list, NdbDictionary::Object::TypeUndefined) == -1){
301 g_err << "Failed to list tables: " << endl
302 << dict->getNdbError() << endl;
303 return -1;
305 for (unsigned i = 0; i < list.count; i++) {
306 NdbDictionary::Dictionary::List::Element& elt = list.elements[i];
307 switch (elt.type) {
308 case NdbDictionary::Object::SystemTable:
309 case NdbDictionary::Object::UserTable:
310 g_ndb->setDatabaseName(elt.database);
311 g_ndb->setSchemaName(elt.schema);
312 if(dict->dropTable(elt.name) != 0){
313 g_err << "Failed to drop table: "
314 << elt.database << "/" << elt.schema << "/" << elt.name <<endl;
315 g_err << dict->getNdbError() << endl;
316 return -1;
318 break;
319 case NdbDictionary::Object::UniqueHashIndex:
320 case NdbDictionary::Object::OrderedIndex:
321 case NdbDictionary::Object::HashIndexTrigger:
322 case NdbDictionary::Object::IndexTrigger:
323 case NdbDictionary::Object::SubscriptionTrigger:
324 case NdbDictionary::Object::ReadOnlyConstraint:
325 default:
326 break;
330 g_ndb->setDatabaseName(db.c_str());
331 g_ndb->setSchemaName(schema.c_str());
333 return 0;
336 static int load_table()
338 UtilTransactions clear(* g_table);
339 require(!clear.clearTable(g_ndb));
341 HugoOperations ops(* g_table);
342 require(!ops.startTransaction(g_ndb));
343 size_t op = 0;
344 size_t rows = 0;
345 size_t uncommitted = 0;
346 bool prepared = false;
347 for(size_t i = 0; i<g_rows; i++){
348 for(op %= OP_COUNT; !((1 << op) & g_use_ops); op = (op + 1) % OP_COUNT);
349 g_ops[i] = g_op_types[op++];
350 if(g_ops[i].start_row){
351 g_ops[i].curr_row = true;
352 g_ops[i].val = rand();
353 require(!ops.pkInsertRecord(g_ndb, i, 1, g_ops[i].val));
354 uncommitted++;
355 } else {
356 g_ops[i].curr_row = false;
358 if(uncommitted >= 100){
359 require(!ops.execute_Commit(g_ndb));
360 require(!ops.getTransaction()->restart());
361 rows += uncommitted;
362 uncommitted = 0;
365 if(uncommitted)
366 require(!ops.execute_Commit(g_ndb));
368 require(!ops.closeTransaction(g_ndb));
369 rows += uncommitted;
370 g_info << "Inserted " << rows << " rows" << endl;
371 return 0;
374 static int pause_lcp(int error)
376 int nodes = g_restarter.getNumDbNodes();
378 int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_INFO, 0 };
379 int fd = ndb_mgm_listen_event(g_restarter.handle, filter);
380 require(fd >= 0);
381 require(!g_restarter.insertErrorInAllNodes(error));
382 int dump[] = { DumpStateOrd::DihStartLcpImmediately };
383 require(!g_restarter.dumpStateAllNodes(dump, 1));
385 char *tmp;
386 char buf[1024];
387 SocketInputStream in(fd, 1000);
388 int count = 0;
389 do {
390 tmp = in.gets(buf, 1024);
391 if(tmp)
393 int id;
394 if(sscanf(tmp, "%*[^:]: LCP: %d ", &id) == 1 && id == error &&
395 --nodes == 0){
396 close(fd);
397 return 0;
400 } while(count++ < 30);
402 close(fd);
403 return -1;
406 static int do_op(int row)
408 HugoOperations & ops = * g_hugo_ops;
409 if(strcmp(g_ops[row].op1, "INS") == 0){
410 require(!g_ops[row].curr_row);
411 g_ops[row].curr_row = true;
412 g_ops[row].val = rand();
413 require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val));
414 } else if(strcmp(g_ops[row].op1, "UPD") == 0){
415 require(g_ops[row].curr_row);
416 g_ops[row].val = rand();
417 require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val));
418 } else if(strcmp(g_ops[row].op1, "DEL") == 0){
419 require(g_ops[row].curr_row);
420 g_ops[row].curr_row = false;
421 require(!ops.pkDeleteRecord(g_ndb, row, 1));
424 require(!ops.execute_NoCommit(g_ndb));
426 if(g_ops[row].op2 == 0){
427 } else if(strcmp(g_ops[row].op2, "INS") == 0){
428 require(!g_ops[row].curr_row);
429 g_ops[row].curr_row = true;
430 g_ops[row].val = rand();
431 require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val));
432 } else if(strcmp(g_ops[row].op2, "UPD") == 0){
433 require(g_ops[row].curr_row);
434 g_ops[row].val = rand();
435 require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val));
436 } else if(strcmp(g_ops[row].op2, "DEL") == 0){
437 require(g_ops[row].curr_row);
438 g_ops[row].curr_row = false;
439 require(!ops.pkDeleteRecord(g_ndb, row, 1));
442 if(g_ops[row].op2 != 0)
443 require(!ops.execute_NoCommit(g_ndb));
445 if(g_ops[row].op3 == 0){
446 } else if(strcmp(g_ops[row].op3, "INS") == 0){
447 require(!g_ops[row].curr_row);
448 g_ops[row].curr_row = true;
449 g_ops[row].val = rand();
450 require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val));
451 } else if(strcmp(g_ops[row].op3, "UPD") == 0){
452 require(g_ops[row].curr_row);
453 g_ops[row].val = rand();
454 require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val));
455 } else if(strcmp(g_ops[row].op3, "DEL") == 0){
456 require(g_ops[row].curr_row);
457 g_ops[row].curr_row = false;
458 require(!ops.pkDeleteRecord(g_ndb, row, 1));
461 if(g_ops[row].op3 != 0)
462 require(!ops.execute_NoCommit(g_ndb));
464 return 0;
467 static int continue_lcp(int error)
469 int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_INFO, 0 };
470 int fd = -1;
471 if(error){
472 fd = ndb_mgm_listen_event(g_restarter.handle, filter);
473 require(fd >= 0);
476 int args[] = { DumpStateOrd::LCPContinue };
477 if(g_restarter.dumpStateAllNodes(args, 1) != 0)
478 return -1;
480 if(error){
481 char *tmp;
482 char buf[1024];
483 SocketInputStream in(fd, 1000);
484 int count = 0;
485 int nodes = g_restarter.getNumDbNodes();
486 do {
487 tmp = in.gets(buf, 1024);
488 if(tmp)
490 int id;
491 if(sscanf(tmp, "%*[^:]: LCP: %d ", &id) == 1 && id == error &&
492 --nodes == 0){
493 close(fd);
494 return 0;
497 } while(count++ < 30);
499 close(fd);
501 return 0;
504 static int commit()
506 HugoOperations & ops = * g_hugo_ops;
507 int res = ops.execute_Commit(g_ndb);
508 if(res == 0){
509 return ops.getTransaction()->restart();
511 return res;
514 static int restart()
516 g_info << "Restarting cluster" << endl;
517 g_hugo_ops->closeTransaction(g_ndb);
518 disconnect_ndb();
519 delete g_hugo_ops;
521 require(!g_restarter.restartAll());
522 require(!g_restarter.waitClusterStarted(30));
523 require(!connect_ndb());
525 g_table = g_ndb->getDictionary()->getTable(g_tablename);
526 require(g_table);
527 require(g_hugo_ops = new HugoOperations(* g_table));
528 require(!g_hugo_ops->startTransaction(g_ndb));
529 return 0;
532 static int validate()
534 HugoOperations ops(* g_table);
535 for(size_t i = 0; i<g_rows; i++){
536 require(g_ops[i].curr_row == g_ops[i].end_row);
537 require(!ops.startTransaction(g_ndb));
538 ops.pkReadRecord(g_ndb, i, 1);
539 int res = ops.execute_Commit(g_ndb);
540 if(g_ops[i].curr_row){
541 require(res == 0 && ops.verifyUpdatesValue(g_ops[i].val) == 0);
542 } else {
543 require(res == 626);
545 ops.closeTransaction(g_ndb);
548 for(size_t j = 0; j<10; j++){
549 UtilTransactions clear(* g_table);
550 require(!clear.clearTable(g_ndb));
552 HugoTransactions trans(* g_table);
553 require(trans.loadTable(g_ndb, 1024) == 0);
555 return 0;