1 /* Copyright (C) 2004, 2005 MySQL AB
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
18 #include <NdbRestarter.hpp>
19 #include <HugoOperations.hpp>
20 #include <HugoTransactions.hpp>
21 #include <UtilTransactions.hpp>
22 #include <signaldata/DumpStateOrd.hpp>
25 #include <InputStream.hpp>
// Table of the operation sequences a test row can be put through. Every entry
// lists three boolean flags followed by up to three operation names ("INS",
// "UPD", "DEL"); unused operation slots are 0. An entry is selected when bit
// (1 << index) is set in g_use_ops; the hex value in each trailing comment is
// that bit.
// NOTE(review): the CASE struct itself is not part of this listing. From the
// member accesses elsewhere in the file the fields appear to be start_row,
// end_row, curr_row, op1, op2, op3 and val, in that order - confirm against
// the struct definition.
// NOTE(review): the letter in each trailing comment does not follow the
// 'a' + index lettering that parse_args() uses for the command line (index 1
// is labelled "d" here) - presumably it refers to an external design note.
38 static CASE g_op_types
[] =
40 { false, true, false, "INS", 0, 0, 0 }, // 0x001 a
41 { true, true, false, "UPD", 0, 0, 0 }, // 0x002 d
42 { true, false, false, "DEL", 0, 0, 0 }, // 0x004 g
44 { false, true, false, "INS", "UPD", 0, 0 }, // 0x008 b
45 { false, false, false, "INS", "DEL", 0, 0 }, // 0x010 c
46 { true, true, false, "UPD", "UPD", 0, 0 }, // 0x020 e
47 { true, false, false, "UPD", "DEL", 0, 0 }, // 0x040 f
48 { true, true, false, "DEL", "INS", 0, 0 }, // 0x080 h
50 { false, true, false, "INS", "DEL", "INS", 0 }, // 0x100 i
51 { true, false, false, "DEL", "INS", "DEL", 0 } // 0x200 j
// Number of entries in g_op_types, derived from the array size so it follows
// the table automatically when rows are added or removed.
53 const size_t OP_COUNT
= (sizeof(g_op_types
)/sizeof(g_op_types
[0]));
// File-scope state shared by all helpers in this test program.

// Ndb object used for every operation; created in connect_ndb().
55 static Ndb
* g_ndb
= 0;
// Cluster connection backing g_ndb; created in connect_ndb().
57 static Ndb_cluster_connection
*g_cluster_connection
= 0;
// Shared HugoOperations instance whose transaction do_op() works in.
58 static HugoOperations
* g_hugo_ops
;
// Bitmask of enabled g_op_types entries (bit i enables entry i). The default
// enables the first three entries.
59 static int g_use_ops
= 1 | 2 | 4;
// Bitmask of the test-case blocks main() should run; default is the first one.
60 static int g_cases
= 0x1;
// Number of times each selected test case is repeated.
61 static int g_case_loop
= 2;
// Number of rows used by the test; settable through the "records" option.
62 static int g_rows
= 10;
// Flag toggled by the -t / -u options ("Create table" / "Dont create table").
63 static int g_setup_tables
= 1;
// Flag toggled by the -1 / -0 options; when set main() enables one operation
// bit at a time instead of the whole mask.
64 static int g_one_op_at_a_time
= 0;
// Name of the table the test operates on.
65 static const char * g_tablename
= "T1";
// Dictionary definition of g_tablename, looked up in main().
66 static const NdbDictionary::Table
* g_table
= 0;
// Restarter used to inject errors, send dump-state commands and restart nodes.
67 static NdbRestarter g_restarter
;
// Forward declarations of the helpers that main() calls; the definitions
// follow further down in the file.
69 static int init_ndb(int argc
, char** argv
);
70 static int parse_args(int argc
, char** argv
);
71 static int connect_ndb();
72 static int drop_all_tables();
73 static int load_table();
74 static int pause_lcp(int error
);
75 static int do_op(int row
);
// The error code to wait for defaults to 0 when the caller passes nothing.
76 static int continue_lcp(int error
= 0);
79 static int validate();
// Abort-on-failure check used throughout this test program.
//
// Evaluates x exactly once. When the result converts to false, the current
// source line number is written to g_err and the process is aborted.
//
// The expansion is wrapped in do { } while(0) so that `require(...);` is a
// single statement and stays correct inside an unbraced if/else; the previous
// bare-brace form left a stray ';' that broke a following `else`. x is
// parenthesised so that any expression passed in is converted to bool as a
// whole. Every use must be followed by a semicolon.
#define require(x) \
  do { \
    const bool require_ok_ = (x); \
    if(!require_ok_){ g_err << __LINE__ << endl; abort(); } \
  } while(0)
// Test driver. The visible steps are: initialise NDB, parse the command line,
// connect to the cluster, drop all existing tables, create and look up
// g_tablename, start a transaction on a new HugoOperations object, allocate
// one CASE slot per row, and then run every test-case block whose bit is set
// in g_cases, each g_case_loop times.
// NOTE(review): many lines of this function (return statements, closing
// braces, the bodies of the per-row loops) are absent from this listing; the
// comments below describe only what can be seen.
84 main(int argc
, char ** argv
){
86 require(!init_ndb(argc
, argv
));
// The statement controlled by this if is not in this listing.
87 if(parse_args(argc
, argv
))
89 require(!connect_ndb());
92 require(!drop_all_tables());
// Create the test table; the failure handling inside this if is not visible.
94 if(NDBT_Tables::createTable(g_ndb
, g_tablename
) != 0){
// Fetch the table definition that the Hugo/Util helpers are constructed from.
99 g_table
= g_ndb
->getDictionary()->getTable(g_tablename
);
// Error report for a failed lookup (the guarding condition is not visible).
// NOTE(review): "retreive" in the message text is a misspelling of "retrieve".
101 g_err
<< "Failed to retreive table: " << g_tablename
<< endl
;
// The assignment inside require() both stores the new object and checks that
// the resulting pointer is non-null.
104 require(g_hugo_ops
= new HugoOperations(* g_table
));
105 require(!g_hugo_ops
->startTransaction(g_ndb
));
// One CASE entry per row; load_table() fills these in.
107 g_ops
= new CASE
[g_rows
];
// Keep the requested operation mask, because g_use_ops is overwritten below
// when running one operation at a time.
109 const int use_ops
= g_use_ops
;
110 for(size_t i
= 0; i
<OP_COUNT
; i
++)
// One-at-a-time mode: advance i to the next operation whose bit is set in the
// saved mask, report it, and restrict g_use_ops to that single bit.
112 if(g_one_op_at_a_time
){
113 while(i
< OP_COUNT
&& (use_ops
& (1 << i
)) == 0) i
++;
// NOTE(review): 'a'+i has type size_t here but is printed with %c, which
// expects an int argument - confirm this is harmless on the target platforms.
116 ndbout_c("-- loop\noperation: %c use_ops: %x", 'a'+i
, use_ops
);
117 g_use_ops
= (1 << i
);
// test_case numbers the case blocks below; each block runs only when
// (1 << its number) is set in g_cases, and the counter is advanced either way.
122 size_t test_case
= 0;
// Case bit 0: pause the LCP with error insert 5900 before the per-row loop.
123 if((1 << test_case
++) & g_cases
)
125 for(size_t tl
= 0; tl
<g_case_loop
; tl
++){
// NOTE(review): "inteference" in the message text is a misspelling.
126 g_info
<< "Performing all ops wo/ inteference of LCP" << endl
;
128 g_info
<< "Testing pre LCP operations, ZLCP_OP_WRITE_RT_BREAK" << endl
;
129 g_info
<< " where ZLCP_OP_WRITE_RT_BREAK is "
130 " finished before SAVE_PAGES" << endl
;
131 require(!load_table());
132 require(!pause_lcp(5900));
// Per-row loop; its body is not in this listing.
133 for(size_t j
= 0; j
<g_rows
; j
++){
136 require(!continue_lcp(5900));
138 require(!pause_lcp(5900));
140 require(!validate());
// Case bit 1: same sequence, but the LCP is first paused with error insert 5901.
144 if((1 << test_case
++) & g_cases
)
146 for(size_t tl
= 0; tl
<g_case_loop
; tl
++){
147 g_info
<< "Testing pre LCP operations, ZLCP_OP_WRITE_RT_BREAK" << endl
;
148 g_info
<< " where ZLCP_OP_WRITE_RT_BREAK is finished after SAVE_PAGES"
150 require(!load_table());
151 require(!pause_lcp(5901));
// Per-row loop; its body is not in this listing.
152 for(size_t j
= 0; j
<g_rows
; j
++){
155 require(!continue_lcp(5901));
157 require(!pause_lcp(5900));
159 require(!validate());
// Case bit 2: pause with 5902, then continue twice (5902 and 5903).
163 if((1 << test_case
++) & g_cases
)
165 for(size_t tl
= 0; tl
<g_case_loop
; tl
++){
166 g_info
<< "Testing pre LCP operations, undo-ed at commit" << endl
;
167 require(!load_table());
168 require(!pause_lcp(5902));
// Per-row loop; its body is not in this listing.
169 for(size_t j
= 0; j
<g_rows
; j
++){
172 require(!continue_lcp(5902));
174 require(!continue_lcp(5903));
175 require(!pause_lcp(5900));
177 require(!validate());
// Case bit 3: operations prepared while the LCP is paused with 5904.
181 if((1 << test_case
++) & g_cases
)
183 for(size_t tl
= 0; tl
<g_case_loop
; tl
++){
184 g_info
<< "Testing prepared during LCP and committed after" << endl
;
185 require(!load_table());
186 require(!pause_lcp(5904)); // Start LCP, but don't save pages
// Per-row loop; its body is not in this listing.
187 for(size_t j
= 0; j
<g_rows
; j
++){
190 require(!continue_lcp(5904)); // Start ACC save pages
191 require(!pause_lcp(5900)); // Next LCP
194 require(!validate());
// First initialisation step called by main(), which treats a non-zero return
// as failure. NOTE(review): the body of this function is not part of this
// listing, so what it initialises cannot be stated here.
200 static int init_ndb(int argc
, char** argv
)
// Parses the command line into the g_* configuration globals and prints the
// resulting selection (table name, enabled operations, enabled test cases).
// On a getarg() failure it prints the usage text followed by a list of the
// operation letters and the op1/op2 names they stand for.
// NOTE(review): several lines (declarations of i and s, the loops over the
// option strings, the return statements) are absent from this listing.
206 static int parse_args(int argc
, char** argv
)
// Raw option strings for --operations and --cases; 0 means "not given".
209 char * ops
= 0, *cases
=0;
// Option table handed to getarg().
// NOTE(review): the help text says "Operations [a-h]" although g_op_types has
// ten entries, and "Cases [a-c]" although main() shows four case blocks.
210 struct getargs args
[] = {
211 { "records", 0, arg_integer
, &g_rows
, "Number of records", "records" },
212 { "operations", 'o', arg_string
, &ops
, "Operations [a-h]", 0 },
213 { "1", '1', arg_flag
, &g_one_op_at_a_time
, "One op at a time", 0 },
214 { "0", '0', arg_negative_flag
, &g_one_op_at_a_time
, "All ops at once", 0 },
215 { "cases", 'c', arg_string
, &cases
, "Cases [a-c]", 0 },
216 { 0, 't', arg_flag
, &g_setup_tables
, "Create table", 0 },
217 { 0, 'u', arg_negative_flag
, &g_setup_tables
, "Dont create table", 0 }
221 const int num_args
= sizeof(args
)/sizeof(args
[0]);
// A non-zero result from getarg() leads to the usage output below.
222 if(getarg(args
, num_args
, argc
, (const char**)argv
, &optind
)) {
223 arg_printusage(args
, num_args
, argv
[0], " tabname1\n");
// NOTE(review): 'a'+OP_COUNT-1 has type size_t but is printed with %c, which
// expects an int argument.
224 ndbout_c("\n -- Operations [a-%c] = ", 'a'+OP_COUNT
-1);
// One line per operation letter: the letter, op1, and op2 when present.
225 for(i
= 0; i
<OP_COUNT
; i
++){
226 ndbout_c("\t%c = %s %s",
227 'a'+i
, g_op_types
[i
].op1
,
228 g_op_types
[i
].op2
? g_op_types
[i
].op2
: "");
// Each character read through s sets bit (character - 'a') in the mask. The
// surrounding loop and any range check are not visible in this listing.
237 g_use_ops
|= (1 << ((* s
++) - 'a'));
244 g_cases
|= (1 << ((* s
++) - 'a'));
// Echo the effective configuration.
247 ndbout_c("table: %s", g_tablename
);
248 printf("operations: ");
249 for(i
= 0; i
<OP_COUNT
; i
++)
250 if(g_use_ops
& (1 << i
))
254 printf("test cases: ");
256 if(g_cases
& (1 << i
))
259 printf("-------------\n");
// Creates the cluster connection and the Ndb object used by the rest of the
// test: allocates g_cluster_connection, calls connect(12, 5, 1) on it, creates
// g_ndb for database "TEST_DB" and waits up to the given 30 for it to become
// ready. Callers treat a non-zero return as failure.
// NOTE(review): the return statements and error paths are not part of this
// listing; the meaning of the connect() arguments is presumably
// retries / delay / verbose - confirm against the NDB API.
263 static int connect_ndb()
265 g_cluster_connection
= new Ndb_cluster_connection();
266 if(g_cluster_connection
->connect(12, 5, 1) != 0)
271 g_ndb
= new Ndb(g_cluster_connection
, "TEST_DB");
273 if(g_ndb
->waitUntilReady(30) == 0){
// Disabled code: would have sent a DihMaxTimeBetweenLCP dump to all nodes.
275 // int args[] = { DumpStateOrd::DihMaxTimeBetweenLCP };
276 // return g_restarter.dumpStateAllNodes(args, 1);
// Tears down what connect_ndb() set up. The visible part deletes the cluster
// connection and clears its pointer so it is not used again.
// NOTE(review): the lines in between are not in this listing; whether g_ndb is
// also released here cannot be confirmed from what is shown.
281 static int disconnect_ndb()
284 delete g_cluster_connection
;
287 g_cluster_connection
= 0;
// Drops the tables reported by the dictionary. The current database and schema
// names of g_ndb are saved first and restored at the end, because each drop
// switches g_ndb to the database/schema of the object being dropped.
// NOTE(review): the switch header, break/return statements and closing braces
// are not part of this listing.
291 static int drop_all_tables()
293 NdbDictionary::Dictionary
* dict
= g_ndb
->getDictionary();
// Remember the names so they can be put back once all drops are done.
296 BaseString db
= g_ndb
->getDatabaseName();
297 BaseString schema
= g_ndb
->getSchemaName();
// Ask for objects of every type; -1 from listObjects() is reported as an error.
299 NdbDictionary::Dictionary::List list
;
300 if (dict
->listObjects(list
, NdbDictionary::Object::TypeUndefined
) == -1){
301 g_err
<< "Failed to list tables: " << endl
302 << dict
->getNdbError() << endl
;
305 for (unsigned i
= 0; i
< list
.count
; i
++) {
306 NdbDictionary::Dictionary::List::Element
& elt
= list
.elements
[i
];
// System and user tables: switch to the object's database/schema and drop it,
// logging the fully qualified name and the NDB error on failure.
308 case NdbDictionary::Object::SystemTable
:
309 case NdbDictionary::Object::UserTable
:
310 g_ndb
->setDatabaseName(elt
.database
);
311 g_ndb
->setSchemaName(elt
.schema
);
312 if(dict
->dropTable(elt
.name
) != 0){
313 g_err
<< "Failed to drop table: "
314 << elt
.database
<< "/" << elt
.schema
<< "/" << elt
.name
<<endl
;
315 g_err
<< dict
->getNdbError() << endl
;
// Index, trigger and constraint objects share one branch; what that branch
// does is not visible in this listing.
319 case NdbDictionary::Object::UniqueHashIndex
:
320 case NdbDictionary::Object::OrderedIndex
:
321 case NdbDictionary::Object::HashIndexTrigger
:
322 case NdbDictionary::Object::IndexTrigger
:
323 case NdbDictionary::Object::SubscriptionTrigger
:
324 case NdbDictionary::Object::ReadOnlyConstraint
:
// Restore the database and schema names saved at the top.
330 g_ndb
->setDatabaseName(db
.c_str());
331 g_ndb
->setSchemaName(schema
.c_str());
// Resets the table and the per-row bookkeeping for a test run: clears the
// table, then for every row picks the next enabled entry of g_op_types, copies
// it into g_ops[row] and, when that entry has start_row set, inserts the row
// with a random value. Work is committed in batches and once more at the end.
// NOTE(review): the declarations of op and rows, the else branch and the
// counter updates are not part of this listing.
336 static int load_table()
// Start from an empty table.
338 UtilTransactions
clear(* g_table
);
339 require(!clear
.clearTable(g_ndb
));
// Local HugoOperations with its own transaction, separate from g_hugo_ops.
341 HugoOperations
ops(* g_table
);
342 require(!ops
.startTransaction(g_ndb
));
345 size_t uncommitted
= 0;
346 bool prepared
= false;
347 for(size_t i
= 0; i
<g_rows
; i
++){
// Advance op, wrapping at OP_COUNT, until it names an entry whose bit is set
// in g_use_ops; the loop body is intentionally empty.
// NOTE(review): this never terminates if g_use_ops has none of the first
// OP_COUNT bits set.
348 for(op
%= OP_COUNT
; !((1 << op
) & g_use_ops
); op
= (op
+ 1) % OP_COUNT
);
// Copy the template entry for this row and move on to the next entry.
349 g_ops
[i
] = g_op_types
[op
++];
// Rows whose case starts with an existing row are inserted with a random
// value, which is remembered in val for later verification.
350 if(g_ops
[i
].start_row
){
351 g_ops
[i
].curr_row
= true;
352 g_ops
[i
].val
= rand();
353 require(!ops
.pkInsertRecord(g_ndb
, i
, 1, g_ops
[i
].val
));
// Otherwise the row is recorded as not present (the else line is not visible).
356 g_ops
[i
].curr_row
= false;
// Commit and restart the transaction once 100 or more operations are pending.
358 if(uncommitted
>= 100){
359 require(!ops
.execute_Commit(g_ndb
));
360 require(!ops
.getTransaction()->restart());
// Final commit for whatever is still pending, then release the transaction.
366 require(!ops
.execute_Commit(g_ndb
));
368 require(!ops
.closeTransaction(g_ndb
));
370 g_info
<< "Inserted " << rows
<< " rows" << endl
;
// Starts a local checkpoint that is expected to stop at the given error-insert
// point: subscribes to management events, inserts `error` in all nodes, sends
// a DihStartLcpImmediately dump, and then reads event lines looking for
// "LCP: <id>" reports whose id equals `error`.
// Parameter error: error-insert code passed to insertErrorInAllNodes().
// NOTE(review): declarations of tmp, buf, id and count, the rest of the match
// condition and the return statements are not part of this listing.
374 static int pause_lcp(int error
)
376 int nodes
= g_restarter
.getNumDbNodes();
// Event filter handed to ndb_mgm_listen_event(); presumably level 15 for the
// INFO category with a terminating 0 - confirm against the MGM API.
378 int filter
[] = { 15, NDB_MGM_EVENT_CATEGORY_INFO
, 0 };
// Subscribe before triggering the LCP so no report is missed.
379 int fd
= ndb_mgm_listen_event(g_restarter
.handle
, filter
);
381 require(!g_restarter
.insertErrorInAllNodes(error
));
382 int dump
[] = { DumpStateOrd::DihStartLcpImmediately
};
383 require(!g_restarter
.dumpStateAllNodes(dump
, 1));
// Line reader on the event socket (second constructor argument is 1000).
387 SocketInputStream
in(fd
, 1000);
// Read one event line of up to 1024 bytes per iteration.
390 tmp
= in
.gets(buf
, 1024);
// Skip everything up to the first ':' and parse the number after "LCP:"; only
// reports for the requested error code count. The remainder of this condition
// is not visible.
394 if(sscanf(tmp
, "%*[^:]: LCP: %d ", &id
) == 1 && id
== error
&&
// Bounded retry: the loop condition allows count to reach 30.
400 } while(count
++ < 30);
// Runs the operation sequence stored in g_ops[row] on the shared g_hugo_ops
// transaction: op1, then op2 and op3 when they are set. Each step is defined
// (not executed) through pkInsertRecord / pkUpdateRecord / pkDeleteRecord and
// then sent with execute_NoCommit, so nothing is committed here.
// While doing so the function keeps g_ops[row].curr_row (whether the row
// should currently exist) and g_ops[row].val (the last value written) up to
// date, and uses require() to assert that each step is legal for the current
// state: insert needs a missing row, update and delete need an existing one.
// Parameter row: index into g_ops, also used as the record number.
// NOTE(review): the return statement and some closing braces are not part of
// this listing.
406 static int do_op(int row
)
408 HugoOperations
& ops
= * g_hugo_ops
;
// ---- first operation (always present) ----
409 if(strcmp(g_ops
[row
].op1
, "INS") == 0){
410 require(!g_ops
[row
].curr_row
);
411 g_ops
[row
].curr_row
= true;
412 g_ops
[row
].val
= rand();
413 require(!ops
.pkInsertRecord(g_ndb
, row
, 1, g_ops
[row
].val
));
414 } else if(strcmp(g_ops
[row
].op1
, "UPD") == 0){
415 require(g_ops
[row
].curr_row
);
416 g_ops
[row
].val
= rand();
417 require(!ops
.pkUpdateRecord(g_ndb
, row
, 1, g_ops
[row
].val
));
418 } else if(strcmp(g_ops
[row
].op1
, "DEL") == 0){
419 require(g_ops
[row
].curr_row
);
420 g_ops
[row
].curr_row
= false;
421 require(!ops
.pkDeleteRecord(g_ndb
, row
, 1));
// Send the first operation without committing.
424 require(!ops
.execute_NoCommit(g_ndb
));
// ---- second operation (optional; 0 means none) ----
426 if(g_ops
[row
].op2
== 0){
427 } else if(strcmp(g_ops
[row
].op2
, "INS") == 0){
428 require(!g_ops
[row
].curr_row
);
429 g_ops
[row
].curr_row
= true;
430 g_ops
[row
].val
= rand();
431 require(!ops
.pkInsertRecord(g_ndb
, row
, 1, g_ops
[row
].val
));
432 } else if(strcmp(g_ops
[row
].op2
, "UPD") == 0){
433 require(g_ops
[row
].curr_row
);
434 g_ops
[row
].val
= rand();
435 require(!ops
.pkUpdateRecord(g_ndb
, row
, 1, g_ops
[row
].val
));
436 } else if(strcmp(g_ops
[row
].op2
, "DEL") == 0){
437 require(g_ops
[row
].curr_row
);
438 g_ops
[row
].curr_row
= false;
439 require(!ops
.pkDeleteRecord(g_ndb
, row
, 1));
// Only send when a second operation was actually defined.
442 if(g_ops
[row
].op2
!= 0)
443 require(!ops
.execute_NoCommit(g_ndb
));
// ---- third operation (optional; 0 means none) ----
445 if(g_ops
[row
].op3
== 0){
446 } else if(strcmp(g_ops
[row
].op3
, "INS") == 0){
447 require(!g_ops
[row
].curr_row
);
448 g_ops
[row
].curr_row
= true;
449 g_ops
[row
].val
= rand();
450 require(!ops
.pkInsertRecord(g_ndb
, row
, 1, g_ops
[row
].val
));
451 } else if(strcmp(g_ops
[row
].op3
, "UPD") == 0){
452 require(g_ops
[row
].curr_row
);
453 g_ops
[row
].val
= rand();
454 require(!ops
.pkUpdateRecord(g_ndb
, row
, 1, g_ops
[row
].val
));
455 } else if(strcmp(g_ops
[row
].op3
, "DEL") == 0){
456 require(g_ops
[row
].curr_row
);
457 g_ops
[row
].curr_row
= false;
458 require(!ops
.pkDeleteRecord(g_ndb
, row
, 1));
// Only send when a third operation was actually defined.
461 if(g_ops
[row
].op3
!= 0)
462 require(!ops
.execute_NoCommit(g_ndb
));
// Lets a paused local checkpoint proceed: subscribes to management events,
// sends an LCPContinue dump to all nodes and then reads event lines looking
// for "LCP: <id>" reports whose id equals `error`.
// Parameter error: the code to wait for; the declaration defaults it to 0.
// NOTE(review): declarations of fd, tmp, buf, id and count, the condition
// guarding the event subscription, the rest of the match condition and the
// return statements are not part of this listing.
467 static int continue_lcp(int error
)
// Event filter handed to ndb_mgm_listen_event(); presumably level 15 for the
// INFO category with a terminating 0 - confirm against the MGM API.
469 int filter
[] = { 15, NDB_MGM_EVENT_CATEGORY_INFO
, 0 };
472 fd
= ndb_mgm_listen_event(g_restarter
.handle
, filter
);
// Ask every node to resume the checkpoint; a non-zero result is an error.
476 int args
[] = { DumpStateOrd::LCPContinue
};
477 if(g_restarter
.dumpStateAllNodes(args
, 1) != 0)
// Line reader on the event socket (second constructor argument is 1000).
483 SocketInputStream
in(fd
, 1000);
485 int nodes
= g_restarter
.getNumDbNodes();
// Read one event line of up to 1024 bytes per iteration.
487 tmp
= in
.gets(buf
, 1024);
// Skip everything up to the first ':' and parse the number after "LCP:"; only
// reports for the requested code count. The remainder of this condition is
// not visible.
491 if(sscanf(tmp
, "%*[^:]: LCP: %d ", &id
) == 1 && id
== error
&&
// Bounded retry: the loop condition allows count to reach 30.
497 } while(count
++ < 30);
// NOTE(review): the header of the function these statements belong to is not
// part of this listing (presumably a commit helper - confirm).
// Visible behaviour: commits the shared g_hugo_ops transaction and then
// returns the result of restarting that transaction so it can be reused. How
// the commit result in res is checked is not visible.
506 HugoOperations
& ops
= * g_hugo_ops
;
507 int res
= ops
.execute_Commit(g_ndb
);
509 return ops
.getTransaction()->restart();
// NOTE(review): the header of the function these statements belong to is not
// part of this listing (presumably a cluster restart helper - confirm).
// Visible behaviour: closes the shared transaction, restarts all nodes, waits
// for the cluster to come back, reconnects, and rebuilds the table pointer and
// the shared HugoOperations object with a fresh transaction.
516 g_info
<< "Restarting cluster" << endl
;
517 g_hugo_ops
->closeTransaction(g_ndb
);
// Restart every node and wait (argument 30) for the cluster to be started.
521 require(!g_restarter
.restartAll());
522 require(!g_restarter
.waitClusterStarted(30));
523 require(!connect_ndb());
// The table definition is fetched again through the new connection.
525 g_table
= g_ndb
->getDictionary()->getTable(g_tablename
);
// The assignment inside require() both stores the new object and checks that
// the resulting pointer is non-null.
527 require(g_hugo_ops
= new HugoOperations(* g_table
));
528 require(!g_hugo_ops
->startTransaction(g_ndb
));
// Checks the table contents against the bookkeeping in g_ops. For every row it
// asserts that the tracked state (curr_row) matches the expected final state
// of the row's case (end_row), reads the row by primary key and, when the row
// is expected to exist, requires the read to succeed and the stored value to
// match g_ops[i].val. Afterwards it clears and reloads the table ten times.
// NOTE(review): the branch for rows that should not exist, the closing braces
// and the return statement are not part of this listing.
532 static int validate()
// Local HugoOperations; a new transaction is started and closed for each row.
534 HugoOperations
ops(* g_table
);
535 for(size_t i
= 0; i
<g_rows
; i
++){
// The operations performed must have left the row in its expected end state.
536 require(g_ops
[i
].curr_row
== g_ops
[i
].end_row
);
537 require(!ops
.startTransaction(g_ndb
));
// The read's own return value is not checked; the outcome is taken from the
// commit result below.
538 ops
.pkReadRecord(g_ndb
, i
, 1);
539 int res
= ops
.execute_Commit(g_ndb
);
540 if(g_ops
[i
].curr_row
){
541 require(res
== 0 && ops
.verifyUpdatesValue(g_ops
[i
].val
) == 0);
545 ops
.closeTransaction(g_ndb
);
// Ten rounds of clearing the table and loading 1024 records into it.
548 for(size_t j
= 0; j
<10; j
++){
549 UtilTransactions
clear(* g_table
);
550 require(!clear
.clearTable(g_ndb
));
552 HugoTransactions
trans(* g_table
);
553 require(trans
.loadTable(g_ndb
, 1024) == 0);