csqlcacheserver reconnect, logger rotate, monitor_server
[csql.git] / src / cache / CacheTableLoader.cxx
blob34a97988d64dc88810e4f6e7fadb0f69a23a13f9
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<os.h>
17 #include<CacheTableLoader.h>
18 #include<TableConfig.h>
19 #include<Util.h>
20 #include<SqlConnection.h>
21 #include<SqlLogConnection.h>
22 #include<SqlStatement.h>
23 #include<SqlFactory.h>
27 DbRetVal CacheTableLoader::checkSecondTimeSqlPrimaryKeys(SQLHSTMT hstmtmeta,char *tableName,char *ptr,HashIndexInitInfo *inf,bool &isPriIndex)
29 int retValue =0;
30 char columnname[IDENTIFIER_LENGTH];
31 retValue=SQLPrimaryKeys(hstmtmeta, NULL, SQL_NTS, NULL, SQL_NTS, (SQLCHAR*) tableName, SQL_NTS);
32 retValue = SQLBindCol(hstmtmeta, 4, SQL_C_CHAR,columnname, 129,NULL);
33 char indname[IDENTIFIER_LENGTH];
34 if(SQLFetch( hstmtmeta ) == SQL_SUCCESS)
36 Util::str_tolower(columnname);
37 inf->list.append(columnname);
38 sprintf(ptr, "%s ", columnname);
39 ptr += strlen(ptr);
40 while ( SQLFetch( hstmtmeta ) == SQL_SUCCESS ) {
41 Util::str_tolower(columnname);
42 inf->list.append(columnname);
43 sprintf(ptr, ", %s ", columnname);
44 ptr += strlen(ptr);
46 sprintf(ptr, ") PRIMARY SIZE 10007;");
47 isPriIndex=true;
49 return OK;
52 DbRetVal CacheTableLoader::load(bool tabDefinition)
54 AbsSqlConnection *conn = SqlFactory::createConnection(CSqlLog);
55 DbRetVal rv = conn->connect(userName, password);
56 if (rv != OK) { delete conn; return ErrSysInit; }
57 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlLog);
58 stmt->setConnection(conn);
59 SqlLogConnection *logConn = (SqlLogConnection *) conn;
60 logConn->setNoMsgLog(true);
61 SqlConnection *con = (SqlConnection *) conn->getInnerConnection();
62 DatabaseManager *dbMgr = con->getConnObject().getDatabaseManager();
63 if (tabDefinition == false) {
64 Table *tbl = dbMgr->openTable(tableName);
65 if (tbl == NULL) {
66 conn->disconnect();
67 delete stmt;
68 delete conn;
69 return ErrNotExists;
71 if (tbl->numTuples()) {
72 printError(ErrNotEmpty, "The table '\%s\' is not empty", tableName);
73 dbMgr->closeTable(tbl);
74 conn->disconnect();
75 delete stmt;
76 delete conn;
77 return ErrNotEmpty;
79 dbMgr->closeTable(tbl);
81 conn->beginTrans();
82 rv = load(conn, stmt, tabDefinition);
83 conn->commit();
84 stmt->free();
85 conn->disconnect();
86 delete stmt;
87 delete conn;
88 return rv;
91 DbRetVal CacheTableLoader::load(AbsSqlConnection *conn, AbsSqlStatement *stmt, bool tabDefinition)
93 char dsn[72];
94 DbRetVal rv = OK;
95 FILE *fp;
96 fp = fopen(Conf :: config.getDsConfigFile(),"r");
97 if(fp==NULL) {
98 printError(ErrSysInit, "csqlds.conf file does not exist");
99 return ErrSysInit;
101 char dsnId[IDENTIFIER_LENGTH]; dsnId[0]='\0';
102 char user[IDENTIFIER_LENGTH]; user[0] = '\0';
103 char passwd[IDENTIFIER_LENGTH]; passwd[0] = '\0';
104 char tdb[IDENTIFIER_LENGTH]; tdb[0]='\0';
106 // STARTs Here:
107 // DSN, user and password value is read here from csql.conf fiel and csqlds.conf file.
109 if(strcmp(dsnName,"")==0) { // it's true if -d option is specified and the DSN value not matched with csql.conf's DSN.
110 strcpy(dsnName, Conf::config.getDSN());
112 bool isDSNExist=false;
113 while(!feof(fp)) {
114 fscanf(fp,"%s %s %s %s\n",dsnId,user,passwd,tdb);
115 if(strcmp(dsnId,dsnName)==0) { // Both the DSN is matched here
116 if( strcmp(user,"NULL")!=0 && strcmp(passwd,"NULL")!=0) {
117 sprintf(dsn,"DSN=%s;UID=%s;PWD=%s;",dsnName,user,passwd);
118 isDSNExist=true;
119 break;
120 } else {
121 sprintf(dsn,"DSN=%s;",dsnName);
122 isDSNExist=true;
123 break;
127 if(!isDSNExist) {
128 printError(ErrNotExists,"Entries is not present in the csqlds.conf file\n");
129 fclose(fp);
130 return ErrNotExists;
132 fclose(fp);
133 TDBInfo tdbName=mysql;
134 if (strcasecmp(tdb,"postgres")==0) tdbName=postgres;
135 else if (strcasecmp(tdb,"postgres")==0) tdbName=mysql;
136 else printError(ErrNotFound,"Target Database Name is not properly set.Tdb name could be mysql, postgres, sybase, db2, oracle\n");
138 logFine(Conf::logger, "TDB Name:%s\n", tdb);
140 //DatabaseManager *dbMgr = (DatabaseManager *) conn->getDatabaseManager();
141 //char dsn[72];
142 SqlConnection *con = (SqlConnection *) conn->getInnerConnection();
143 DatabaseManager *dbMgr = con->getConnObject().getDatabaseManager();
145 SQLCHAR outstr[1024];
146 SQLSMALLINT outstrlen;
147 int retValue =0;
148 SQLHENV henv;
149 SQLHDBC hdbc;
150 SQLHSTMT hstmt;
151 retValue = SQLAllocHandle (SQL_HANDLE_ENV, SQL_NULL_HANDLE, &henv);
152 if (retValue) {
153 printError(ErrSysInit, "Unable to allocate ODBC handle \n");
154 return ErrSysInit;
156 SQLSetEnvAttr(henv, SQL_ATTR_ODBC_VERSION, (void *) SQL_OV_ODBC3, 0);
157 retValue = SQLAllocHandle (SQL_HANDLE_DBC, henv, &hdbc);
158 if (retValue) {
159 printError(ErrSysInit, "Unable to allocate ODBC handle \n");
160 return ErrSysInit;
162 retValue = SQLDriverConnect(hdbc, NULL, (SQLCHAR*)dsn, SQL_NTS,
163 outstr, sizeof(outstr), &outstrlen,
164 SQL_DRIVER_NOPROMPT);
165 if (SQL_SUCCEEDED(retValue)) {
166 printDebug(DM_Gateway, "Connected to target database using dsn = %s\n", dsn);
167 } else {
168 printError(ErrSysInit, "Failed to connect to target database\n");
169 return ErrSysInit;
172 retValue=SQLAllocHandle (SQL_HANDLE_STMT, hdbc, &hstmt);
173 if (retValue) {
174 printError(ErrSysInit, "Unable to allocate ODBC handle \n");
175 return ErrSysInit;
177 char stmtBuf[1024];
179 if(((strcmp(conditionVal,"")==0) || (strcmp(conditionVal,"NULL")==0)) && ((strcmp(fieldlistVal,"")==0) || (strcmp(fieldlistVal,"NULL")==0)))
181 sprintf(stmtBuf, "SELECT * FROM %s;", tableName);
183 else if(((strcmp(conditionVal,"")!=0) || (strcmp(conditionVal,"NULL")!=0)) && ((strcmp(fieldlistVal,"")==0) || (strcmp(fieldlistVal,"NULL")==0)))
185 sprintf(stmtBuf,"SELECT * FROM %s where %s;",tableName,conditionVal);
188 else if(((strcmp(conditionVal,"")==0) || (strcmp(conditionVal,"NULL")==0)) && ((strcmp(fieldlistVal,"")!=0) || (strcmp(fieldlistVal,"NULL")!=0)))
190 sprintf(stmtBuf,"SELECT %s FROM %s;",fieldlistVal,tableName);
192 else
193 sprintf(stmtBuf,"SELECT %s FROM %s where %s;",fieldlistVal,tableName,conditionVal);
195 retValue = SQLPrepare (hstmt, (unsigned char *) stmtBuf, SQL_NTS);
196 if (retValue) {
197 printError(ErrSysInit, "Unable to Prepare ODBC statement \n");
198 return ErrSysInit;
200 logFinest(Conf::logger, "Cache Table Stmt %s", stmtBuf);
201 if (tabDefinition) {
202 short totalFields=0;
203 retValue = SQLNumResultCols (hstmt, &totalFields);
204 if (retValue) {
205 printError(ErrSysInit, "Unable to retrieve ODBC total columns\n");
206 return ErrSysInit;
208 UWORD icol;
209 UCHAR colName[IDENTIFIER_LENGTH];
210 SWORD colNameMax;
211 SWORD nameLength;
212 SWORD colType;
213 SQLULEN colLength = 0;
214 SWORD scale;
215 SWORD nullable;
216 TableDef tabDef;
217 icol = 1;
218 colNameMax = IDENTIFIER_LENGTH;
219 char columnname[IDENTIFIER_LENGTH];
220 char indexname[IDENTIFIER_LENGTH];
221 short type; short unique;
222 SQLHSTMT hstmtmeta;
223 retValue=SQLAllocHandle (SQL_HANDLE_STMT, hdbc, &hstmtmeta);
224 if (retValue)
226 printError(ErrSysInit, "Unable to allocate ODBC handle \n");
227 return ErrSysInit;
230 retValue=SQLPrimaryKeys(hstmtmeta, NULL, SQL_NTS, NULL, SQL_NTS, (SQLCHAR*) tableName, SQL_NTS);
231 retValue = SQLBindCol(hstmtmeta, 4, SQL_C_CHAR,columnname, 129,NULL);
232 HashIndexInitInfo *inf = new HashIndexInitInfo();
233 char crtIdxStmt[1024];
234 char *name = NULL;
235 char *ptr=crtIdxStmt;
236 sprintf(ptr, "CREATE INDEX %s_PRIMARY on %s ( ", tableName, tableName);
237 ptr += strlen(ptr);
238 bool isPriIndex=false;
239 char indname[IDENTIFIER_LENGTH];
240 if(SQLFetch( hstmtmeta ) == SQL_SUCCESS)
242 Util::str_tolower(columnname);
243 inf->list.append(columnname);
244 sprintf(ptr, "%s ", columnname);
245 ptr += strlen(ptr);
246 while ( SQLFetch( hstmtmeta ) == SQL_SUCCESS ) {
247 Util::str_tolower(columnname);
248 inf->list.append(columnname);
249 sprintf(ptr, ", %s ", columnname);
250 ptr += strlen(ptr);
252 sprintf(ptr, ") PRIMARY SIZE 10007;");
253 inf->indType = hashIndex;
254 inf->bucketSize = 10007;
255 inf->isUnique = true; inf->isPrimary = true;
256 strcpy(inf->tableName, tableName);
257 strcpy(indexname,"PRIMARY");
258 sprintf(indname, "%s_%s", tableName, indexname);
259 isPriIndex=true;
261 bool iskeyfieldExist=false;
262 bool isPKFieldSpecified = false;
263 if((strcmp(fieldName,"")!=0) && (strcmp(fieldName,"NULL")!=0) )
265 isPKFieldSpecified = true;
267 if ( isPriIndex && ( strcmp(fieldlistVal,"")!=0 ) &&
268 ( strcmp(fieldlistVal,"NULL") != 0 )) {
269 inf->list.resetIter();
270 while ( (name=inf->list.nextFieldName()) != NULL) {
271 iskeyfieldExist = TableConf::config.isFieldExist(name);
272 if(!iskeyfieldExist) { break; }
274 } else if (isPriIndex) { iskeyfieldExist = true; }
275 if ( isPKFieldSpecified && !(TableConf::config.isFieldExist(fieldName)) )
277 if ( Conf::config.useTwoWayCache() &&
278 (strcmp(fieldlistVal,"")!=0) &&
279 (strcmp(fieldlistVal,"NULL")!=0))
281 printError(ErrSysInit, "Bidirectional caching should have primary key in %s \n", tableName);
282 SQLFreeHandle (SQL_HANDLE_STMT, hstmtmeta);
283 SQLFreeHandle (SQL_HANDLE_STMT, hstmt);
284 SQLDisconnect (hdbc);
285 SQLFreeHandle (SQL_HANDLE_DBC, hdbc);
286 SQLFreeHandle (SQL_HANDLE_ENV, henv);
287 delete inf;
288 return ErrSysInit;
291 if (!iskeyfieldExist && !isPKFieldSpecified )
293 if(Conf::config.useTwoWayCache())
295 printError(ErrSysInit, "Bidirectional caching fail for no primary key in %s \n", tableName);
296 SQLFreeHandle (SQL_HANDLE_STMT, hstmtmeta);
297 SQLFreeHandle (SQL_HANDLE_STMT, hstmt);
298 SQLDisconnect (hdbc);
299 SQLFreeHandle (SQL_HANDLE_DBC, hdbc);
300 SQLFreeHandle (SQL_HANDLE_ENV, henv);
301 delete inf;
302 return ErrSysInit;
306 /* if(isPriIndex) ;
307 else if (Conf::config.useTwoWayCache() && !iskeyfieldExist) {
308 printError(ErrSysInit, "Bidirectonal caching fail for no primary key in %s \n", tableName);
309 SQLFreeHandle (SQL_HANDLE_STMT, hstmtmeta);
310 SQLFreeHandle (SQL_HANDLE_STMT, hstmt);
311 SQLDisconnect (hdbc);
312 SQLFreeHandle (SQL_HANDLE_DBC, hdbc);
313 SQLFreeHandle (SQL_HANDLE_ENV, henv);
314 return ErrSysInit;
317 bool isKeyFld=false;
318 bool isNullfld=false;
319 bool firstFld = true;
320 char crtTblStmt[1024];
321 ptr = crtTblStmt;
322 sprintf(ptr, "CREATE TABLE %s ( ", tableName);
323 ptr += strlen(ptr);
324 while (icol <= totalFields) {
325 retValue = SQLDescribeCol(hstmt, icol, colName, colNameMax,
326 &nameLength, &colType, &colLength,
327 &scale, &nullable);
328 if (retValue) {
329 printError(ErrSysInit, "Unable to retrieve ODBC column info\n");
330 SQLFreeHandle (SQL_HANDLE_STMT, hstmtmeta);
331 SQLFreeHandle (SQL_HANDLE_STMT, hstmt);
332 SQLDisconnect (hdbc);
333 SQLFreeHandle (SQL_HANDLE_DBC, hdbc);
334 SQLFreeHandle (SQL_HANDLE_ENV, henv);
335 delete inf;
336 return ErrSysInit;
338 Util::str_tolower((char*)colName);
339 printDebug(DM_Gateway, "Describe Column %s %d %d %d %d \n", colName, colType, colLength, scale, nullable);
340 logFinest(Conf::logger, "Describe Column colName:%s colType:%d colLen:%d scale:%d nullable:%d\n", colName, colType, colLength, scale, nullable);
342 icol++;
343 if(strcmp((char*)colName,fieldName)== 0)
345 isKeyFld=true;
346 isNullfld=true;
348 bool isPriFld=false;
349 if (nullable) {
350 inf->list.resetIter();
351 while ((name=inf->list.nextFieldName())!=NULL) {
352 if(0==strcmp((char*)colName,name)) {
353 if (firstFld) {
354 firstFld = false;
355 sprintf(ptr, "%s %s", colName, AllDataType::getSQLString(AllDataType::convertFromSQLType(colType,colLength,scale,tdbName)));
356 ptr += strlen(ptr);
357 if (colType == SQL_CHAR || colType == SQL_VARCHAR || colType == SQL_BINARY)
359 sprintf(ptr, "(%d) NOT NULL",colLength);
360 } else { sprintf(ptr, " NOT NULL"); }
361 ptr += strlen(ptr);
362 } else {
363 sprintf(ptr, ", %s %s", colName, AllDataType::getSQLString(AllDataType::convertFromSQLType(colType,colLength,scale,tdbName)));
364 ptr += strlen(ptr);
365 if (colType == SQL_CHAR || colType == SQL_VARCHAR || colType == SQL_BINARY)
367 sprintf(ptr, "(%d) NOT NULL",colLength);
368 } else { sprintf(ptr, " NOT NULL"); }
369 ptr += strlen(ptr);
371 tabDef.addField((char*)colName, AllDataType::convertFromSQLType(colType,colLength,scale,tdbName), colLength +1, NULL, true);
372 isPriFld=true;
373 break;
376 if(!isPriFld) {
377 if(!isNullfld) {
378 if (firstFld) {
379 firstFld = false;
380 sprintf(ptr, "%s %s", colName, AllDataType::getSQLString(AllDataType::convertFromSQLType(colType,colLength,scale,tdbName)));
381 ptr += strlen(ptr);
382 if (colType == SQL_CHAR || colType == SQL_VARCHAR || colType == SQL_BINARY) {
383 sprintf(ptr, "(%d)",colLength);
384 ptr += strlen(ptr);
386 } else {
387 sprintf(ptr, ", %s %s", colName, AllDataType::getSQLString(AllDataType::convertFromSQLType(colType,colLength,scale,tdbName)));
388 ptr += strlen(ptr);
389 if (colType == SQL_CHAR || colType == SQL_VARCHAR || colType == SQL_BINARY) {
390 sprintf(ptr, "(%d)",colLength);
391 ptr += strlen(ptr);
394 tabDef.addField((char*)colName, AllDataType::convertFromSQLType(colType,colLength,scale,tdbName), colLength+1);
395 } else {
396 if (firstFld) {
397 firstFld = false;
398 sprintf(ptr, "%s %s", colName, AllDataType::getSQLString(AllDataType::convertFromSQLType(colType,colLength,scale,tdbName)));
399 ptr += strlen(ptr);
400 if (colType == SQL_CHAR || colType == SQL_VARCHAR || colType == SQL_BINARY) {
401 sprintf(ptr, "(%d) NOT NULL",colLength);
402 } else { sprintf(ptr, " NOT NULL",colLength); }
403 ptr += strlen(ptr);
404 } else {
405 sprintf(ptr, ", %s %s", colName, AllDataType::getSQLString(AllDataType::convertFromSQLType(colType,colLength,scale,tdbName)));
406 ptr += strlen(ptr);
407 if (colType == SQL_CHAR || colType == SQL_VARCHAR || colType == SQL_BINARY) {
408 sprintf(ptr, "(%d) NOT NULL",colLength);
409 } else { sprintf(ptr, " NOT NULL",colLength); }
410 ptr += strlen(ptr);
412 tabDef.addField((char*)colName, AllDataType::convertFromSQLType(colType,colLength,scale,tdbName), colLength+1, NULL, true);
413 isNullfld=false;
416 } else {
417 if (firstFld) {
418 firstFld = false;
419 sprintf(ptr, "%s %s", colName, AllDataType::getSQLString(AllDataType::convertFromSQLType(colType,colLength,scale,tdbName)));
420 ptr += strlen(ptr);
421 if (colType == SQL_CHAR || colType == SQL_VARCHAR || colType == SQL_BINARY) {
422 sprintf(ptr, "(%d) NOT NULL",colLength);
423 } else { sprintf(ptr, " NOT NULL",colLength); }
424 ptr += strlen(ptr);
425 } else {
426 sprintf(ptr, ", %s %s", colName, AllDataType::getSQLString(AllDataType::convertFromSQLType(colType,colLength, scale, tdbName)));
427 ptr += strlen(ptr);
428 if (colType == SQL_CHAR || colType == SQL_VARCHAR || colType == SQL_BINARY) {
429 sprintf(ptr, "(%d) NOT NULL",colLength);
430 } else { sprintf(ptr, " NOT NULL",colLength); }
431 ptr += strlen(ptr);
433 tabDef.addField((char*)colName, AllDataType::convertFromSQLType(colType,colLength,scale, tdbName), colLength +1, NULL, true);
436 sprintf(ptr, ");");
437 ptr += strlen(ptr);
438 //printf("table stmt '%s'\n", crtTblStmt);
439 if(((strcmp(fieldName,"")!=0) && (strcmp(fieldName,"NULL")!=0))
440 && !isKeyFld) {
441 printError(ErrSysInit, "Unable to cache Table for %s with key field %s\n", tableName,fieldName);
442 SQLFreeHandle (SQL_HANDLE_STMT, hstmtmeta);
443 SQLFreeHandle (SQL_HANDLE_STMT, hstmt);
444 SQLDisconnect (hdbc);
445 SQLFreeHandle (SQL_HANDLE_DBC, hdbc);
446 SQLFreeHandle (SQL_HANDLE_ENV, henv);
447 delete inf;
448 return ErrSysInit;
450 rv = stmt->prepare(crtTblStmt);
451 if (rv != OK) {
452 printError(ErrSysInit, "Unable to prepare create table stmt\n");
453 SQLFreeHandle (SQL_HANDLE_STMT, hstmtmeta);
454 SQLFreeHandle (SQL_HANDLE_STMT, hstmt);
455 SQLDisconnect (hdbc);
456 SQLFreeHandle (SQL_HANDLE_DBC, hdbc);
457 SQLFreeHandle (SQL_HANDLE_ENV, henv);
458 delete inf;
459 return ErrSysInit;
461 int rows = 0;
462 rv = stmt->execute(rows);
463 if (rv != OK) {
464 printError(ErrSysInit, "Unable to execute create table stmt\n");
465 SQLFreeHandle (SQL_HANDLE_STMT, hstmtmeta);
466 SQLFreeHandle (SQL_HANDLE_STMT, hstmt);
467 SQLDisconnect (hdbc);
468 SQLFreeHandle (SQL_HANDLE_DBC, hdbc);
469 SQLFreeHandle (SQL_HANDLE_ENV, henv);
470 delete inf;
471 return ErrSysInit;
473 logFinest(Conf::logger, "Cache Table: Table Created :%s", crtTblStmt);
475 //Table is created.
476 //Create primary key index if present
477 if (isPriIndex && ( iskeyfieldExist ||
478 (strcmp(fieldlistVal,"")==0 || strcmp(fieldlistVal,"NULL")== 0))) {
479 rv = stmt->prepare(crtIdxStmt);
480 if (rv != OK) {
481 printError(ErrSysInit, "Unable to prepare create table stmt\n");
482 SQLFreeHandle (SQL_HANDLE_STMT, hstmtmeta);
483 SQLFreeHandle (SQL_HANDLE_STMT, hstmt);
484 SQLDisconnect (hdbc);
485 SQLFreeHandle (SQL_HANDLE_DBC, hdbc);
486 SQLFreeHandle (SQL_HANDLE_ENV, henv);
487 delete inf;
488 return ErrSysInit;
490 int rows = 0;
491 rv = stmt->execute(rows);
492 if (rv != OK) {
493 printError(ErrSysInit, "Unable to execute create table stmt\n");
494 SQLFreeHandle (SQL_HANDLE_STMT, hstmtmeta);
495 SQLFreeHandle (SQL_HANDLE_STMT, hstmt);
496 SQLDisconnect (hdbc);
497 SQLFreeHandle (SQL_HANDLE_DBC, hdbc);
498 SQLFreeHandle (SQL_HANDLE_ENV, henv);
499 delete inf;
500 return ErrSysInit;
502 //printf("Primary index created from create Index stmt\n");
504 retValue = SQLCloseCursor(hstmtmeta);
505 rv = createIndex(hstmtmeta, tableName, inf, stmt,isPKFieldSpecified);
506 if(rv!=OK) {
507 dbMgr->dropTable(tableName);
508 SQLFreeHandle (SQL_HANDLE_STMT, hstmt);
509 SQLDisconnect (hdbc);
510 SQLFreeHandle (SQL_HANDLE_DBC, hdbc);
511 SQLFreeHandle (SQL_HANDLE_ENV, henv);
512 //delete inf;
513 return rv;
515 //delete inf;
517 // Now load the table with records
518 char insStmt[1024];
519 char *ptr = insStmt;
520 sprintf(ptr,"INSERT INTO %s VALUES(", tableName);
521 ptr += strlen(ptr);
522 bool firstFld = true;
523 SqlStatement *sqlStmt = (SqlStatement *)stmt->getInnerStatement();
524 sqlStmt->setConnection(con);
525 List fNameList = sqlStmt->getFieldNameList(tableName);
526 int noOfFields = fNameList.size();
528 while (noOfFields--) {
529 if (firstFld) {
530 firstFld = false;
531 sprintf(ptr,"?", tableName);
532 ptr += strlen(ptr);
533 } else {
534 sprintf(ptr, ",?");
535 ptr += strlen(ptr);
538 sprintf(ptr, ");");
539 ptr += strlen(ptr);
540 //printf("insert stmt: '%s'\n", insStmt);
542 rv = stmt->prepare(insStmt);
543 if (rv != OK) {
544 printError(ErrSysInit, "Unable to prepare create table stmt\n");
545 SQLFreeHandle (SQL_HANDLE_STMT, hstmt);
546 SQLDisconnect (hdbc);
547 SQLFreeHandle (SQL_HANDLE_DBC, hdbc);
548 SQLFreeHandle (SQL_HANDLE_ENV, henv);
549 return ErrSysInit;
551 ListIterator fNameIter = fNameList.getIterator();
552 FieldInfo *info = new FieldInfo();
553 int fcount =1; void *valBuf;
554 Identifier *elem = NULL;
555 void *tembuf=NULL;//For postgre BigInt type
556 BindBuffer *bBuf;
557 List valBufList;
558 SQLINTEGER len[IDENTIFIER_LENGTH];
559 while (fNameIter.hasElement()) {
560 elem = (Identifier*) fNameIter.nextElement();
561 sqlStmt->getFieldInfo(tableName, (const char*)elem->name, info);
562 if (info->type == typeString) {
563 valBuf = AllDataType::alloc(info->type, info->length+1);
564 os::memset(valBuf,0,info->length+1);
565 } else {
566 valBuf = AllDataType::alloc(info->type);
567 os::memset(valBuf,0,AllDataType::size(info->type));
569 switch(info->type)
571 case typeDate:
572 bBuf = new BindBuffer();
573 bBuf->csql = valBuf;
574 bBuf->type = typeDate;
575 bBuf->length = sizeof(DATE_STRUCT);
576 bBuf->targetdb = malloc(bBuf->length);
577 memset(bBuf->targetdb, 0, bBuf->length);
578 valBuf = bBuf->targetdb;
579 valBufList.append(bBuf);
580 break;
581 case typeTime:
582 bBuf = new BindBuffer();
583 bBuf->csql = valBuf;
584 bBuf->type = typeTime;
585 bBuf->length = sizeof(TIME_STRUCT);
586 bBuf->targetdb = malloc(bBuf->length);
587 memset(bBuf->targetdb, 0, bBuf->length);
588 valBuf = bBuf->targetdb;
589 valBufList.append(bBuf);
590 break;
591 case typeTimeStamp:
592 bBuf = new BindBuffer();
593 bBuf->csql = valBuf;
594 bBuf->type = typeTimeStamp;
595 bBuf->length = sizeof(TIMESTAMP_STRUCT);
596 bBuf->targetdb = malloc(bBuf->length);
597 memset(bBuf->targetdb, 0, bBuf->length);
598 valBuf = bBuf->targetdb;
599 valBufList.append(bBuf);
600 break;
601 case typeLongLong:
603 if( tdbName == postgres )
605 bBuf = new BindBuffer();
606 bBuf->type = typeLongLong;
607 bBuf->length = 40;
608 bBuf->csql = valBuf;
609 bBuf->targetdb = AllDataType::alloc(typeString,bBuf->length);
610 memset(bBuf->targetdb, 0, bBuf->length);
611 valBuf = bBuf->targetdb;
612 valBufList.append(bBuf);
613 break;
615 else
617 bBuf = new BindBuffer();
618 bBuf->type = info->type;
619 bBuf->csql = valBuf;
620 valBufList.append(bBuf);
621 bBuf->length = info->length;
622 break;
625 case typeString:
626 if( tdbName != mysql)
628 bBuf = new BindBuffer();
629 bBuf->type = typeString;
630 bBuf->csql = valBuf;
631 bBuf->length = info->length+1;
632 valBufList.append(bBuf);
633 break;
635 default:
636 bBuf = new BindBuffer();
637 bBuf->type = info->type;
638 bBuf->csql = valBuf;
639 valBufList.append(bBuf);
640 bBuf->length = info->length;
641 break;
643 //os::memset(valBuf,0,bBuf->length);
644 retValue = SQLBindCol (hstmt, fcount, AllDataType::convertToSQL_C_Type(info->type,tdbName), valBuf, bBuf->length, &len[fcount]);
645 fcount++;
646 if (retValue) {
647 printError(ErrSysInit, "Unable to bind columns in ODBC\n");
648 SQLFreeHandle (SQL_HANDLE_STMT, hstmt);
649 SQLDisconnect (hdbc);
650 SQLFreeHandle (SQL_HANDLE_DBC, hdbc);
651 SQLFreeHandle (SQL_HANDLE_ENV, henv);
652 return ErrSysInit;
655 delete info;
656 fNameIter.reset();
657 while (fNameIter.hasElement())
658 delete ((FieldName *) fNameIter.nextElement());
659 fNameList.reset();
661 retValue = SQLExecute (hstmt);
662 if (retValue) {
663 printError(ErrSysInit, "Unable to execute ODBC statement\n");
664 SQLFreeHandle (SQL_HANDLE_STMT, hstmt);
665 SQLDisconnect (hdbc);
666 SQLFreeHandle (SQL_HANDLE_DBC, hdbc);
667 SQLFreeHandle (SQL_HANDLE_ENV, henv);
668 return ErrSysInit;
670 int fldpos=0;
671 int countForCommit = 0;
672 while(true) {
673 //TODO: if SQLFetch return other than record not found error
674 //it should drop the table
675 retValue = SQLFetch (hstmt);
676 if (retValue) break;
677 ListIterator bindIter = valBufList.getIterator();
678 fldpos = 0;
679 while (bindIter.hasElement()) {
680 bBuf = (BindBuffer*) bindIter.nextElement();
681 switch (bBuf->type) {
682 case typeString:
684 if( tdbName != mysql)
686 Util::trimRight((char*)bBuf->csql);
688 break;
690 case typeDate:
692 Date *dtCSQL = (Date*) bBuf->csql;
693 DATE_STRUCT *dtTarget = (DATE_STRUCT*) bBuf->targetdb;
694 dtCSQL->set(dtTarget->year,dtTarget->month,dtTarget->day);
695 break;
697 case typeTime:
699 Time *dtCSQL = (Time*) bBuf->csql;
700 TIME_STRUCT *dtTarget = (TIME_STRUCT*) bBuf->targetdb;
701 dtCSQL->set(dtTarget->hour,dtTarget->minute,dtTarget->second);
702 break;
704 case typeTimeStamp:
706 TimeStamp *dtCSQL = (TimeStamp*) bBuf->csql;
707 TIMESTAMP_STRUCT *dtTarget = (TIMESTAMP_STRUCT*) bBuf->targetdb;
708 dtCSQL->setDate(dtTarget->year,dtTarget->month,dtTarget->day);
709 dtCSQL->setTime(dtTarget->hour,dtTarget->minute,dtTarget->second, dtTarget->fraction);
710 break;
712 case typeLongLong:
714 if ( tdbName == postgres) {
715 sscanf((const char*)bBuf->targetdb,"%lld",(long long*) bBuf->csql);
717 break;
720 setParamValues(stmt, ++fldpos, bBuf->type, bBuf->length, (char *) bBuf->csql);
722 fldpos=0;
723 //table->resetNullinfo();
724 while(fldpos < fcount-1) {
725 if(len[++fldpos] == SQL_NULL_DATA) {
726 stmt->setNull(fldpos);
729 int rows = 0;
730 rv = stmt->execute(rows);
731 if (rv != OK) {
732 printError(ErrSysInit, "Unable to cache record in CSQL.\n");
733 SQLFreeHandle (SQL_HANDLE_STMT, hstmt);
734 SQLDisconnect (hdbc);
735 SQLFreeHandle (SQL_HANDLE_DBC, hdbc);
736 SQLFreeHandle (SQL_HANDLE_ENV, henv);
737 return ErrSysInit;
739 countForCommit++;
740 if (countForCommit == 1000) {
741 countForCommit = 0;
742 conn->commit();
743 conn->beginTrans();
746 //TODO::leak:: valBufList and its targetdb buffer
747 ListIterator it = valBufList.getIterator();
748 while(it.hasElement()) {
749 BindBuffer *bb = (BindBuffer *) it.nextElement();
750 if (bb->csql) { free(bb->csql); bb->csql = NULL; }
751 if (bb->targetdb) { free(bb->targetdb); bb->targetdb = NULL; }
752 delete bb; bb = NULL;
754 valBufList.reset();
755 SQLCloseCursor (hstmt);
756 SQLFreeHandle (SQL_HANDLE_STMT, hstmt);
757 SQLDisconnect (hdbc);
758 SQLFreeHandle (SQL_HANDLE_DBC, hdbc);
759 SQLFreeHandle (SQL_HANDLE_ENV, henv);
760 return OK;
763 DbRetVal CacheTableLoader::reload()
765 FILE *fp=NULL;
766 DbRetVal rv = unload(false);
767 if (rv != OK) return rv;
768 //get table cache senarios
769 fp = fopen(Conf::config.getTableConfigFile(),"r");
770 if( fp == NULL ) {
771 printError(ErrSysInit, "cachetable.conf file does not exist");
772 return OK;
774 int mode;
775 rv = OK;
776 char tablename[IDENTIFIER_LENGTH];
777 char fieldname[IDENTIFIER_LENGTH];
778 char field[IDENTIFIER_LENGTH];
779 char condition[IDENTIFIER_LENGTH];
780 char dsnname[IDENTIFIER_LENGTH];
781 while(!feof(fp))
783 fscanf(fp, "%d %s %s %s %s %s\n", &mode, tablename,fieldname,condition,field,dsnname);
784 if(strcmp(tablename,tableName)==0) break;
786 fclose(fp);
787 setCondition(TableConf::config.getRealConditionFromFile(condition));
788 setFieldName(fieldname);
789 setFieldListVal(field);
790 setDsnName(dsnname);
791 rv = load(false);
792 return rv;
795 DbRetVal CacheTableLoader::unload(bool tabDefinition)
797 AbsSqlConnection *conn = SqlFactory::createConnection(CSqlLog);
798 DbRetVal rv = conn->connect(userName, password);
799 if (rv != OK) return ErrSysInit;
800 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlLog);
801 stmt->setConnection(conn);
802 SqlLogConnection *logConn = (SqlLogConnection *) conn;
803 logConn->setNoMsgLog(true);
804 char statement[1024];
805 if (TableConf::config.isTableCached(tableName) != OK) {
806 printError(ErrNotCached, "The table \'%s\' is not cached", tableName);
807 conn->disconnect();
808 delete stmt;
809 delete conn;
810 return ErrNotCached;
812 SqlConnection *con = (SqlConnection *) conn->getInnerConnection();
813 DatabaseManager *dbMgr = (DatabaseManager*) con->getConnObject().getDatabaseManager();
814 if (dbMgr == NULL) {
815 conn->disconnect();
816 delete stmt; delete conn;
817 printError(ErrSysInit, "Authentication failed\n");
818 return ErrSysInit;
820 if (!tabDefinition)
822 sprintf(statement, "DELETE FROM %s;", tableName);
823 SqlStatement *sqlStmt = (SqlStatement*)stmt;
824 sqlStmt->setLoading(true);
825 rv = stmt->prepare(statement);
826 if (rv != OK) {
827 conn->disconnect();
828 delete stmt; delete conn;
829 return ErrBadCall;
831 conn->beginTrans();
832 int rows = 0;
833 rv = stmt->execute(rows);
834 if (rv != OK) {
835 conn->disconnect();
836 delete stmt; delete conn;
837 return ErrBadCall;
839 conn->commit();
841 else
843 rv = TableConf::config.removeFromCacheTableFile();
844 if (rv != OK) {
845 conn->disconnect(); delete stmt; delete conn;
846 return ErrBadCall;
848 sprintf(statement, "DROP TABLE %s;", tableName);
849 SqlStatement *sqlStmt = (SqlStatement*)stmt;
850 sqlStmt->setLoading(true);
851 rv = stmt->prepare(statement);
852 if (rv != OK) {
853 //TableConf::config.addToCacheTableFile(false);
854 conn->disconnect();
855 delete stmt; delete conn;
856 return ErrBadCall;
858 int rows = 0;
859 rv = stmt->execute(rows);
860 if (rv != OK) {
861 //TableConf::config.addToCacheTableFile(false);
862 conn->disconnect(); delete stmt; delete conn;
863 return ErrBadCall;
866 conn->disconnect();
867 delete stmt; delete conn;
868 logFine(Conf::logger, "Unloaded Cached Table: %s", tableName);
869 return rv;
872 DbRetVal CacheTableLoader::refresh()
874 return OK;
877 DbRetVal CacheTableLoader::recoverAllCachedTables()
879 FILE *fp;
880 Connection conn;
881 DbRetVal rv = conn.open(userName, password);
882 if(rv !=OK) return ErrSysInit;
884 //Note: if connection is not open, configuration veriables may be incorrect
886 fp = fopen(Conf::config.getTableConfigFile(),"r");
887 if( fp == NULL ) {
888 printError(ErrSysInit, "cachetable.conf file does not exist");
889 conn.close();
890 return OK;
892 conn.close();
893 //TODO::take exclusive lock on database
894 char tablename[IDENTIFIER_LENGTH];
895 char fieldname[IDENTIFIER_LENGTH];
896 char condition[IDENTIFIER_LENGTH];
897 char field[IDENTIFIER_LENGTH];
898 char dsnname[IDENTIFIER_LENGTH];
900 int mode;
901 int scanItems=0;
902 rv = OK;
903 while(!feof(fp))
905 scanItems = fscanf(fp, "%d %s %s %s %s %s\n", &mode, tablename,fieldname,condition,field,dsnname);
906 if (scanItems != 6) {
907 tablename[0]='\0';
908 printf("There is no table to be cached.\n");
909 return OK;
911 //if (mode ==2 ) //just replicated table and not cached
912 //continue;
913 printDebug(DM_Gateway, "Recovering Table from target db: %s\n", tablename);
914 setCondition(TableConf::config.getRealConditionFromFile(condition));
915 if( (strcmp(Conf::config.getDSN(),dsnname)!=0) ){
916 setDsnName(dsnname);
917 setTable(tablename);
918 setFieldName(fieldname);
919 setFieldListVal(field);
920 printf("Recovering table %s %s %s\n", tablename,condition,field);
921 rv = load();
922 if (rv != OK) { fclose(fp); return rv; }
923 } else {
924 setDsnName(Conf::config.getDSN());
925 setTable(tablename);
926 setFieldName(fieldname);
927 setFieldListVal(field);
928 printf("Recovering table %s %s %s\n", tablename,condition,field);
929 rv = load();
930 if (rv != OK) { fclose(fp); return rv; }
932 logFine(Conf::logger, "Recovering Table from target db:%s", tablename);
934 fclose(fp);
935 return OK;
938 void CacheTableLoader::setParamValues(AbsSqlStatement *stmt, int parampos, DataType type, int length, char *value)
940 switch(type)
942 case typeInt:
943 stmt->setIntParam(parampos, *(int*)value);
944 break;
945 case typeLong:
946 stmt->setLongParam(parampos, *(long*)value);
947 break;
948 case typeLongLong:
949 stmt->setLongLongParam(parampos, *(long long*)value);
950 break;
951 case typeShort:
952 stmt->setShortParam(parampos, *(short*)value);
953 break;
954 case typeByteInt:
955 stmt->setByteIntParam(parampos, *(char*)value);
956 break;
957 case typeDouble:
958 stmt->setDoubleParam(parampos, *(double*)value);
959 break;
960 case typeFloat:
961 stmt->setFloatParam(parampos, *(float*)value);
962 break;
963 case typeDate:
964 stmt->setDateParam(parampos, *(Date*)value);
965 break;
966 case typeTime:
967 stmt->setTimeParam(parampos, *(Time*)value);
968 break;
969 case typeTimeStamp:
970 stmt->setTimeStampParam(parampos, *(TimeStamp*)value);
971 break;
972 case typeString:
974 char *d =(char*)value;
975 d[length-1] = '\0';
976 stmt->setStringParam(parampos, (char*)value);
977 break;
979 case typeBinary:
980 stmt->setBinaryParam(parampos, (char *) value, length);
981 break;
983 return;
986 DbRetVal CacheTableLoader::createIndex(SQLHSTMT hstmtmeta, char *tableName, HashIndexInitInfo *inf,AbsSqlStatement *stmt,bool isPKFieldSpecified)
988 bool isKeyFld= false;
989 int retValue = 0;
990 char columnname[IDENTIFIER_LENGTH];
991 char indexname[IDENTIFIER_LENGTH];
992 short type;
993 short unique;
994 char *name = NULL;
995 DbRetVal rv = OK;
996 retValue = SQLStatistics(hstmtmeta, NULL, 0, NULL, SQL_NTS,
997 (SQLCHAR*) tableName, SQL_NTS, SQL_INDEX_ALL, SQL_QUICK);
998 retValue = SQLBindCol(hstmtmeta, 4, SQL_C_SHORT,
999 &unique, 2, NULL);
1000 retValue = SQLBindCol(hstmtmeta, 6, SQL_C_CHAR,
1001 indexname, 129, NULL);
1002 retValue = SQLBindCol(hstmtmeta, 7, SQL_C_SHORT,
1003 &type, 2, NULL);
1004 retValue = SQLBindCol(hstmtmeta, 9, SQL_C_CHAR,
1005 columnname, 129,NULL);
1006 List indexList;
1007 bool isSecondTime = false;
1008 CacheIndexInfo *info=NULL;
1009 while ((retValue = SQLFetch(hstmtmeta)) == SQL_SUCCESS) {
1010 //if (type != SQL_TABLE_STAT)
1012 printDebug(DM_Gateway, "Column: %-18s Index Name: %-18s unique:%hd type:%hd\n", columnname, indexname, unique, type);
1015 if (type == 3)
1018 bool isFldAdd = false;
1019 ListIterator iter = indexList.getIterator();
1020 iter.reset();
1021 while (iter.hasElement())
1023 CacheIndexInfo *indInfo = (CacheIndexInfo *)iter.nextElement();
1024 if(0 == strcmp( indInfo->indexName, indexname))
1026 indInfo->fieldList.append(columnname);
1027 isFldAdd = true;
1030 if(!isFldAdd){
1031 info = new CacheIndexInfo();
1032 info->fieldList.append(columnname);
1033 strcpy(info->indexName, indexname);
1034 indexList.append(info);
1035 isSecondTime = true;
1040 ListIterator iter = indexList.getIterator();
1041 iter.reset();
1042 int noOfPkfield = inf->list.size();
1043 char *fName=NULL;
1044 char *cptr = NULL;
1045 while (iter.hasElement())
1047 cptr = columnname;
1048 bool isFieldExistInCondition = false;
1049 bool isPrimary=false;
1050 CacheIndexInfo *indInfo = (CacheIndexInfo *)iter.nextElement();
1051 int noOfFld= indInfo->fieldList.size();
1052 indInfo->fieldList.resetIter();
1053 while ((fName = indInfo->fieldList.nextFieldName())!=NULL)
1055 if(( 1 == noOfFld) && (0 == strcmp(fName,fieldName))) { isKeyFld=true; }
1056 inf->list.resetIter();
1057 while ((name=inf->list.nextFieldName())!=NULL)
1059 if(0==strcmp(fName,name)) { isPrimary = true; break; }
1060 isPrimary = false;
1062 if (!TableConf::config.isFieldExist(fName) && ( (strcmp(fieldlistVal,"")!=0) && (strcmp(fieldlistVal,"NULL")!=0) ))
1064 isFieldExistInCondition =true;
1065 continue;
1067 sprintf(cptr, "%s ,",fName);
1068 cptr += strlen(cptr);
1071 if(isFieldExistInCondition) continue;
1072 cptr -=1;
1073 *cptr = '\0';
1075 if (isPrimary) { continue; }
1076 char crtIdxStmt[1024];
1077 char indname[128];
1078 sprintf(indname, "%s_%s", tableName, indInfo->indexName);
1079 sprintf(crtIdxStmt, "CREATE INDEX %s on %s(%s) HASH SIZE 10007;", indname, tableName, columnname);
1080 //printf("create index stmt \n'%s'\n", crtIdxStmt);
1081 rv = stmt->prepare(crtIdxStmt);
1082 if (rv != OK) {
1083 printError(ErrSysInit, "Unable to prepare create table stmt\n");
1084 return ErrSysInit;
1086 int rows = 0;
1087 rv = stmt->execute(rows);
1088 if (rv != OK) {
1089 printError(ErrSysInit, "Unable to execute create table stmt\n");
1090 return ErrSysInit;
1092 delete indInfo;
1093 }// while meta data fetch for index creation
1094 delete inf;
1095 SQLCloseCursor (hstmtmeta);
1096 SQLFreeHandle (SQL_HANDLE_STMT, hstmtmeta);
1097 if( !isKeyFld && isPKFieldSpecified) {
1098 if(shouldForce) {
1099 char frcIndStmt[1024];
1100 char indname[128];
1101 sprintf(indname, "%s_%s", tableName, "keyInd");
1102 sprintf(frcIndStmt, "CREATE INDEX %s on %s(%s) HASH;", indname, tableName, fieldName);
1103 rv = stmt->prepare(frcIndStmt);
1104 if (rv != OK) {
1105 printError(ErrSysInit, "Unable to prepare create table stmt\n");
1106 return ErrSysInit;
1108 int rows = 0;
1109 rv = stmt->execute(rows);
1110 if (rv != OK) {
1111 printError(ErrSysInit, "Unable to execute create table stmt\n");
1112 return ErrSysInit;
1114 } else {
1115 printError(ErrSysInit, "Unable to cache Table for %s with key field %s\n", tableName,fieldName);
1116 return ErrSysInit;
1119 return OK;