mySQL 5.0.11 sources for tomato
[tomato.git] / release / src / router / mysql / storage / ndb / ndbapi-examples / ndbapi_async / ndbapi_async.cpp
blob0845073e85ecc5c40b836d7a9c608d6d12861841
3 /* Copyright (c) 2003, 2005, 2006 MySQL AB
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; version 2 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
/**
 *  ndbapi_async.cpp:
 *  Illustrates how to use callbacks and error handling using the asynchronous
 *  part of the NDBAPI.
 *
 *  Classes and methods in NDBAPI used in this example:
 *
 *  Ndb_cluster_connection
 *       connect()
 *       wait_until_ready()
 *
 *  Ndb
 *       init()
 *       startTransaction()
 *       closeTransaction()
 *       sendPollNdb()
 *       getNdbError()
 *
 *  NdbTransaction
 *       getNdbOperation()
 *       executeAsynchPrepare()
 *       getNdbError()
 *
 *  NdbOperation
 *       insertTuple()
 *       equal()
 *       setValue()
 *
 */
50 #include <mysql.h>
51 #include <mysqld_error.h>
52 #include <NdbApi.hpp>
54 #include <iostream> // Used for cout
/**
 * Helper sleep function.
 *
 * Blocks the caller for roughly the requested number of milliseconds by
 * calling select() with no file descriptors and only a timeout.
 *
 * @param milliseconds  time to sleep; values <= 0 return immediately
 */
static void
milliSleep(int milliseconds){
  if (milliseconds <= 0)
    return;

  struct timeval sleeptime;
  sleeptime.tv_sec  = milliseconds / 1000;
  // The remainder is still in milliseconds and 1 ms == 1000 us.
  // (Scaling by 1000000 would push tv_usec far outside 0..999999.)
  sleeptime.tv_usec = (milliseconds % 1000) * 1000;
  select(0, 0, 0, 0, &sleeptime);
}
/**
 * Error printout macros.
 *
 * PRINT_ERROR(code,msg) writes the source file, line, an error code and a
 *   message to std::cout.
 * MYSQLERROR(mysql)     prints mysql_errno()/mysql_error() of the given MYSQL
 *   object and terminates the process with exit(-1).
 * APIERROR(error)       prints the `code` and `message` members of the given
 *   error object and terminates the process with exit(-1).
 *
 * The two terminating macros are wrapped in do { ... } while (0) so that each
 * one expands to a single statement and stays safe inside an unbraced
 * if/else.  Macro arguments are parenthesized so that arbitrary expressions
 * can be passed.
 */
#define PRINT_ERROR(code,msg) \
  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
            << ", code: " << (code) \
            << ", msg: " << (msg) << "." << std::endl
#define MYSQLERROR(mysql) do { \
  PRINT_ERROR(mysql_errno(&(mysql)),mysql_error(&(mysql))); \
  exit(-1); } while (0)
#define APIERROR(error) do { \
  PRINT_ERROR((error).code,(error).message); \
  exit(-1); } while (0)
82 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
83 /**
84 * callback struct.
85 * transaction : index of the transaction in transaction[] array below
86 * data : the data that the transaction was modifying.
87 * retries : counter for how many times the trans. has been retried
89 typedef struct {
90 Ndb * ndb;
91 int transaction;
92 int data;
93 int retries;
94 } async_callback_t;
96 /**
97 * Structure used in "free list" to a NdbTransaction
99 typedef struct {
100 NdbTransaction* conn;
101 int used;
102 } transaction_t;
105 * Free list holding transactions
107 transaction_t transaction[1024]; //1024 - max number of outstanding
108 //transaction in one Ndb object
110 #endif
112 * prototypes
116 * Prepare and send transaction
118 int populate(Ndb * myNdb, int data, async_callback_t * cbData);
121 * Error handler.
123 bool asynchErrorHandler(NdbTransaction * trans, Ndb* ndb);
126 * Exit function
128 void asynchExitHandler(Ndb * m_ndb) ;
131 * Helper function used in callback(...)
133 void closeTransaction(Ndb * ndb , async_callback_t * cb);
136 * Function to create table
138 void create_table(MYSQL &mysql);
141 * Function to drop table
143 void drop_table(MYSQL &mysql);
/**
 * stat. variables: counters for temporary and permanent errors.
 */
int tempErrors(0);
int permErrors(0);
151 void
152 closeTransaction(Ndb * ndb , async_callback_t * cb)
154 ndb->closeTransaction(transaction[cb->transaction].conn);
155 transaction[cb->transaction].conn = 0;
156 transaction[cb->transaction].used = 0;
157 cb->retries++;
161 * Callback executed when transaction has return from NDB
163 static void
164 callback(int result, NdbTransaction* trans, void* aObject)
166 async_callback_t * cbData = (async_callback_t *)aObject;
167 if (result<0)
170 * Error: Temporary or permanent?
172 if (asynchErrorHandler(trans, (Ndb*)cbData->ndb))
174 closeTransaction((Ndb*)cbData->ndb, cbData);
175 while(populate((Ndb*)cbData->ndb, cbData->data, cbData) < 0)
176 milliSleep(10);
178 else
180 std::cout << "Restore: Failed to restore data "
181 << "due to a unrecoverable error. Exiting..." << std::endl;
182 delete cbData;
183 asynchExitHandler((Ndb*)cbData->ndb);
186 else
189 * OK! close transaction
191 closeTransaction((Ndb*)cbData->ndb, cbData);
192 delete cbData;
198 * Create table "GARAGE"
200 void create_table(MYSQL &mysql)
202 while (mysql_query(&mysql,
203 "CREATE TABLE"
204 " GARAGE"
205 " (REG_NO INT UNSIGNED NOT NULL,"
206 " BRAND CHAR(20) NOT NULL,"
207 " COLOR CHAR(20) NOT NULL,"
208 " PRIMARY KEY USING HASH (REG_NO))"
209 " ENGINE=NDB"))
211 if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR)
212 MYSQLERROR(mysql);
213 std::cout << "MySQL Cluster already has example table: GARAGE. "
214 << "Dropping it..." << std::endl;
215 drop_table(mysql);
216 create_table(mysql);
221 * Drop table GARAGE
223 void drop_table(MYSQL &mysql)
225 if (mysql_query(&mysql, "DROP TABLE GARAGE"))
226 MYSQLERROR(mysql);
230 void asynchExitHandler(Ndb * m_ndb)
232 if (m_ndb != NULL)
233 delete m_ndb;
234 exit(-1);
237 /* returns true if is recoverable (temporary),
238 * false if it is an error that is permanent.
240 bool asynchErrorHandler(NdbTransaction * trans, Ndb* ndb)
242 NdbError error = trans->getNdbError();
243 switch(error.status)
245 case NdbError::Success:
246 return false;
247 break;
249 case NdbError::TemporaryError:
251 * The error code indicates a temporary error.
252 * The application should typically retry.
253 * (Includes classifications: NdbError::InsufficientSpace,
254 * NdbError::TemporaryResourceError, NdbError::NodeRecoveryError,
255 * NdbError::OverloadError, NdbError::NodeShutdown
256 * and NdbError::TimeoutExpired.)
258 * We should sleep for a while and retry, except for insufficient space
260 if(error.classification == NdbError::InsufficientSpace)
261 return false;
262 milliSleep(10);
263 tempErrors++;
264 return true;
265 break;
266 case NdbError::UnknownResult:
267 std::cout << error.message << std::endl;
268 return false;
269 break;
270 default:
271 case NdbError::PermanentError:
272 switch (error.code)
274 case 499:
275 case 250:
276 milliSleep(10);
277 return true; // SCAN errors that can be retried. Requires restart of scan.
278 default:
279 break;
281 //ERROR
282 std::cout << error.message << std::endl;
283 return false;
284 break;
286 return false;
/** Number of transactions prepared since the last send to NDB. */
static int nPreparedTransactions = 0;
/** Upper bound on the attempts populate() makes for one transaction. */
static const int MAX_RETRIES = 10;
/** Number of prepared transactions that triggers a send to NDB. */
static const int parallelism = 100;
294 /************************************************************************
295 * populate()
296 * 1. Prepare 'parallelism' number of insert transactions.
297 * 2. Send transactions to NDB and wait for callbacks to execute
299 int populate(Ndb * myNdb, int data, async_callback_t * cbData)
302 NdbOperation* myNdbOperation; // For operations
303 const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
304 const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");
305 if (myTable == NULL)
306 APIERROR(myDict->getNdbError());
308 async_callback_t * cb;
309 int retries = 0;
310 int current = 0;
311 for(int i=0; i<1024; i++)
313 if(transaction[i].used == 0)
315 current = i;
316 if (cbData == 0)
319 * We already have a callback
320 * This is an absolutely new transaction
322 cb = new async_callback_t;
323 cb->retries = 0;
325 else
328 * We already have a callback
330 cb =cbData;
331 retries = cbData->retries;
334 * Set data used by the callback
336 cb->ndb = myNdb; //handle to Ndb object so that we can close transaction
337 // in the callback (alt. make myNdb global).
339 cb->data = data; //this is the data we want to insert
340 cb->transaction = current; //This is the number (id) of this transaction
341 transaction[current].used = 1 ; //Mark the transaction as used
342 break;
345 if(!current)
346 return -1;
348 while(retries < MAX_RETRIES)
350 transaction[current].conn = myNdb->startTransaction();
351 if (transaction[current].conn == NULL) {
353 * no transaction to close since conn == null
355 milliSleep(10);
356 retries++;
357 continue;
359 myNdbOperation = transaction[current].conn->getNdbOperation(myTable);
360 if (myNdbOperation == NULL)
362 if (asynchErrorHandler(transaction[current].conn, myNdb))
364 myNdb->closeTransaction(transaction[current].conn);
365 transaction[current].conn = 0;
366 milliSleep(10);
367 retries++;
368 continue;
370 asynchExitHandler(myNdb);
371 } // if
372 if(myNdbOperation->insertTuple() < 0 ||
373 myNdbOperation->equal("REG_NO", data) < 0 ||
374 myNdbOperation->setValue("BRAND", "Mercedes") <0 ||
375 myNdbOperation->setValue("COLOR", "Blue") < 0)
377 if (asynchErrorHandler(transaction[current].conn, myNdb))
379 myNdb->closeTransaction(transaction[current].conn);
380 transaction[current].conn = 0;
381 retries++;
382 milliSleep(10);
383 continue;
385 asynchExitHandler(myNdb);
388 /*Prepare transaction (the transaction is NOT yet sent to NDB)*/
389 transaction[current].conn->executeAsynchPrepare(NdbTransaction::Commit,
390 &callback,
391 cb);
393 * When we have prepared parallelism number of transactions ->
394 * send the transaction to ndb.
395 * Next time we will deal with the transactions are in the
396 * callback. There we will see which ones that were successful
397 * and which ones to retry.
399 if (nPreparedTransactions == parallelism-1)
401 // send-poll all transactions
402 // close transaction is done in callback
403 myNdb->sendPollNdb(3000, parallelism );
404 nPreparedTransactions=0;
406 else
407 nPreparedTransactions++;
408 return 1;
410 std::cout << "Unable to recover from errors. Exiting..." << std::endl;
411 asynchExitHandler(myNdb);
412 return -1;
415 int main(int argc, char** argv)
417 if (argc != 3)
419 std::cout << "Arguments are <socket mysqld> <connect_string cluster>.\n";
420 exit(-1);
422 char * mysqld_sock = argv[1];
423 const char *connectstring = argv[2];
424 ndb_init();
425 MYSQL mysql;
427 /**************************************************************
428 * Connect to mysql server and create table *
429 **************************************************************/
431 if ( !mysql_init(&mysql) ) {
432 std::cout << "mysql_init failed\n";
433 exit(-1);
435 if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
436 0, mysqld_sock, 0) )
437 MYSQLERROR(mysql);
439 mysql_query(&mysql, "CREATE DATABASE TEST_DB");
440 if (mysql_query(&mysql, "USE TEST_DB") != 0) MYSQLERROR(mysql);
442 create_table(mysql);
445 /**************************************************************
446 * Connect to ndb cluster *
447 **************************************************************/
448 Ndb_cluster_connection cluster_connection(connectstring);
449 if (cluster_connection.connect(4, 5, 1))
451 std::cout << "Unable to connect to cluster within 30 secs." << std::endl;
452 exit(-1);
454 // Optionally connect and wait for the storage nodes (ndbd's)
455 if (cluster_connection.wait_until_ready(30,0) < 0)
457 std::cout << "Cluster was not ready within 30 secs.\n";
458 exit(-1);
461 Ndb* myNdb = new Ndb( &cluster_connection,
462 "TEST_DB" ); // Object representing the database
463 if (myNdb->init(1024) == -1) { // Set max 1024 parallel transactions
464 APIERROR(myNdb->getNdbError());
468 * Initialise transaction array
470 for(int i = 0 ; i < 10 ; i++)
472 transaction[i].used = 0;
473 transaction[i].conn = 0;
476 int i=0;
478 * Do 10 insert transactions.
480 while(i < 10)
482 while(populate(myNdb,i,0)<0) // <0, no space on free list. Sleep and try again.
483 milliSleep(10);
485 i++;
487 std::cout << "Number of temporary errors: " << tempErrors << std::endl;
488 delete myNdb;
490 drop_table(mysql);