changing max size to fit default system shm max size
[csql.git] / src / tools / csqlcacheserver.cxx
blob4349f6c8d2501d715e1dde809023f5f847ce8b52
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include <AbsSqlConnection.h>
17 #include <AbsSqlStatement.h>
18 #include <SqlLogConnection.h>
19 #include <SqlLogStatement.h>
20 #include <SqlOdbcStatement.h>
21 #include <SqlFactory.h>
22 #include <SqlConnection.h>
23 #include <SqlStatement.h>
24 #include <CSql.h>
25 #include <CacheTableLoader.h>
27 #define STMTBUCKETS dsnThrInfo->stmtBuckets
28 #define CSQLCONNECT dsnThrInfo->csqlcon
29 #define TRDBCONNECT dsnThrInfo->targetcon
30 #define CACHELIST dsnThrInfo->cacheTableList
31 #define DSN dsnThrInfo->dsn
32 #define TSELSTMT dsnThrInfo->targetSelStmt
33 #define TDELSTMT dsnThrInfo->targetDelStmt
34 #define TABLENAME_ARRAY dsnThrInfo->tableName
35 #define PKID_ARRAY dsnThrInfo->pkid
36 #define OPERATION_ARRAY dsnThrInfo->operation
37 #define CACHEID_ARRAY dsnThrInfo->cacheid
38 #define AUTOID_ARRAY dsnThrInfo->autoid
40 #define PKID PKID_ARRAY[row]
41 #define OPERATION OPERATION_ARRAY[row]
42 #define CACHEID CACHEID_ARRAY[row]
43 #define AUTOID AUTOID_ARRAY[row]
45 typedef class CachedTableStmtNode
47 public:
48 char tableName[IDENTIFIER_LENGTH];
49 AbsSqlStatement *adptStmt;
50 AbsSqlStatement *insStmt;
51 AbsSqlStatement *delStmt;
52 CachedTableStmtNode(const char *tname, AbsSqlStatement *ast, AbsSqlStatement *ist)
54 strcpy(tableName, tname); adptStmt = ast;
55 insStmt = ist; delStmt = NULL;
57 CachedTableStmtNode(const char *tname, AbsSqlStatement *dst)
59 strcpy(tableName, tname); adptStmt = NULL;
60 insStmt = NULL; delStmt = dst;
62 ~CachedTableStmtNode()
64 if (insStmt) { insStmt->free(); delete insStmt; }
65 if (delStmt) { delStmt->free(); delete delStmt; }
66 if (adptStmt) { adptStmt->free(); delete adptStmt; }
68 } CTStmtNode;
70 int insert(char *table, int pkid, void *thrInfo);
71 int remove(char *table, int pkid, void *thrInfo);
72 int getRecordsFromTargetDb(void *thrInfo);
73 void createCacheTableList(AbsSqlConnection *tcon, List *cacheTableList);
74 DbRetVal getPKFieldName(char *tblName,char *fldName, List *cacheTableList);
75 DbRetVal getCacheField(char *tblName,char *fldName, List *cacheTableList);
76 DbRetVal getCacheProjField(char *tblName,char *fielflist, List *cacheTableList);
77 DbRetVal getCacheCondition(char *tblName,char *condition, List *cacheTableList);
78 void *fillBindBuffer(TDBInfo tName, DataType type, void *valBuf, int length=0);
79 int srvStop =0;
80 static void sigTermHandler(int sig)
82 printf("Received signal %d\nStopping the server\n", sig);
83 srvStop = 1;
86 void printUsage()
88 printf("Usage: csqlcacheserver \n");
89 printf("Description: Start the csql caching server.\n");
90 return;
93 void addToHashTable(char *tableName, AbsSqlStatement *adHdl, AbsSqlStatement *insHdl, void *stmtBuckets)
95 unsigned int hval = Util::hashString(tableName);
96 int bucketNo = hval % STMT_BUCKET_SIZE;
97 StmtBucket *buck = (StmtBucket *) stmtBuckets;
98 StmtBucket *stmtBucket = &buck[bucketNo];
99 CTStmtNode *node = new CTStmtNode(tableName, adHdl, insHdl);
100 stmtBucket->bucketList.append(node);
101 return;
104 void addToHashTable(char *tableName, AbsSqlStatement *delHdl, void *stmtBuckets)
106 unsigned int hval = Util::hashString(tableName);
107 int bucketNo = hval % STMT_BUCKET_SIZE;
108 StmtBucket *buck = (StmtBucket *) stmtBuckets;
109 StmtBucket *stmtBucket = &buck[bucketNo];
110 CTStmtNode *node = new CTStmtNode(tableName, delHdl);
111 stmtBucket->bucketList.append(node);
112 return;
115 void removeFromHashTable(char *tableName, void *stmtBuckets)
117 unsigned int hval = Util::hashString(tableName);
118 int bucketNo = hval % STMT_BUCKET_SIZE;
119 StmtBucket *buck = (StmtBucket *) stmtBuckets;
120 StmtBucket *stmtBucket = &buck[bucketNo];
121 CTStmtNode *node = NULL, *delNode = NULL;
122 ListIterator it = stmtBucket->bucketList.getIterator();
123 while(it.hasElement()) {
124 node = (CTStmtNode *) it.nextElement();
125 if(strcmp(node->tableName, tableName) == 0) { delNode = node; break; }
127 it.reset();
128 if (delNode != NULL) {
129 stmtBucket->bucketList.remove(delNode);
130 delete delNode;
132 return;
135 CTStmtNode *getStmtFromHashTable(char *tableName, void *stmtBuckets)
137 unsigned int hval = Util::hashString(tableName);
138 int bucketNo = hval % STMT_BUCKET_SIZE;
139 StmtBucket *buck = (StmtBucket *) stmtBuckets;
140 StmtBucket *stmtBucket = &buck[bucketNo];
141 if (stmtBucket == NULL) return NULL;
142 CTStmtNode *node = NULL;
143 ListIterator it = stmtBucket->bucketList.getIterator();
144 while(it.hasElement()) {
145 node = (CTStmtNode *) it.nextElement();
146 if(strcmp(node->tableName, tableName) == 0) return node;
148 return NULL;
151 void freeAllStmtHandles(void *stmtBuckets)
153 if (NULL == stmtBuckets) return;
154 StmtBucket *buck = (StmtBucket *) stmtBuckets;
155 CTStmtNode *node = NULL;
156 for (int i=0; i <STMT_BUCKET_SIZE; i++)
158 StmtBucket *stmtBucket = &buck[i];
159 if (stmtBucket == NULL) continue;
160 ListIterator it = stmtBucket->bucketList.getIterator();
161 while(it.hasElement()) {
162 node = (CTStmtNode *)it.nextElement();
163 delete node;
165 stmtBucket->bucketList.reset();
167 ::free(stmtBuckets);
170 //MultiDSN Section
171 class DsnThrInput
173 public:
174 char dsn[IDENTIFIER_LENGTH];
175 char tdb[IDENTIFIER_LENGTH];
176 char uname[IDENTIFIER_LENGTH];
177 char pname[IDENTIFIER_LENGTH];
178 void *stmtBuckets;
179 AbsSqlConnection *csqlcon;
180 AbsSqlConnection *targetcon;
181 AbsSqlStatement *targetSelStmt;
182 AbsSqlStatement *targetDelStmt;
183 List cacheTableList;
184 //Arrays for result set fetch
185 char **tableName;
186 int *pkid;
187 int *operation;
188 int *cacheid;
189 int *autoid;
190 DsnThrInput *next;
191 DsnThrInput()
193 dsn[0]='\0'; tdb[0]='\0'; uname[0]='\0'; pname[0]='\0'; next = NULL;
194 stmtBuckets = NULL; cacheTableList.init();
195 csqlcon = NULL; targetcon = NULL;
196 targetSelStmt = NULL; targetDelStmt = NULL;
197 tableName = NULL; pkid = NULL; operation = NULL; cacheid = NULL;
198 autoid = NULL;
200 ~DsnThrInput()
202 if (tableName) free(tableName);
203 if (pkid) free(pkid);
204 if (operation) free(operation);
205 if (cacheid) free(cacheid);
206 if (autoid) free(autoid);
210 void *startThread(void *p);// Function is used for Thread
211 DsnThrInput **multiDsnArray;
213 int main(int argc, char **argv)
215 int c = 0, opt = 0;
216 while ((c = getopt(argc, argv, "?")) != EOF)
218 switch (c)
220 case '?' : { opt = 10; break; } //print help
221 default: opt=10;
224 }//while options
226 if (opt == 10) {
227 printUsage();
228 return 0;
231 os::signal(SIGINT, sigTermHandler);
232 os::signal(SIGTERM, sigTermHandler);
234 Conf::config.readAllValues(os::getenv("CSQL_CONFIG_FILE"));
236 DbRetVal rv=OK;
238 // Reading "csqlds.conf file"
239 FILE *fp = NULL;
240 fp = fopen(Conf::config.getDsConfigFile(),"r");
241 if(fp==NULL){
242 printError(ErrSysInit,"csqlds.conf file does not exist");
243 exit(1);
245 struct DsnThrInput *head=NULL, *pnode=NULL;
247 char dsnname[IDENTIFIER_LENGTH];dsnname[0]='\0';
248 char tdbname[IDENTIFIER_LENGTH];tdbname[0] = '\0';
249 char username[IDENTIFIER_LENGTH];username[0]='\0';
250 char password[IDENTIFIER_LENGTH];password[0]='\0';
251 int totalDsn=0;
253 // Populate the List
254 while (!feof(fp)) {
255 int inputItems = fscanf(fp,"%s %s %s %s\n",
256 dsnname,username,password,tdbname);
257 if (inputItems != 4) {
258 printError(ErrNotExists, "No Entry found in csqlds.conf file");
259 return 1;
261 DsnThrInput *dsnThrInput = new DsnThrInput();
262 totalDsn++;
263 strcpy(dsnThrInput->dsn,dsnname);
264 strcpy(dsnThrInput->uname,username);
265 strcpy(dsnThrInput->pname,password);
266 strcpy(dsnThrInput->tdb,tdbname);
267 dsnThrInput->next=NULL;
269 if(pnode==NULL) {head=dsnThrInput; pnode=dsnThrInput;}
270 else { pnode->next=dsnThrInput; pnode=pnode->next; }
272 fclose(fp);
274 bool singleThread = (totalDsn == 1);
276 pthread_t *thrId = NULL;
278 pnode=head;
280 if (!singleThread) {
281 thrId =new pthread_t [totalDsn];
282 multiDsnArray = (DsnThrInput **)
283 malloc (sizeof(DsnThrInput *) * totalDsn);
284 for (int i = 0; i < totalDsn; i++) {
285 multiDsnArray[i] = pnode;
286 pthread_create(&thrId[i], NULL, &startThread, multiDsnArray[i]);
287 pnode=pnode->next;
289 // Pthread_join
290 for(int i=0; i<totalDsn; i++) pthread_join(thrId[i], NULL);
291 } else {
292 startThread(pnode);
295 printf("Cache Server Exiting\n");
296 if (!singleThread) {
297 for (int i = 0; i < totalDsn; i++) delete multiDsnArray[i];
298 free (multiDsnArray);
299 delete [] thrId;
300 } else { if (pnode) delete pnode; }
301 return 0;
304 // Function for THreads
305 void *startThread(void *thrInfo)
307 DsnThrInput *dsnThrInfo = (DsnThrInput *)thrInfo;
308 STMTBUCKETS = malloc (STMT_BUCKET_SIZE * sizeof(StmtBucket));
309 memset(STMTBUCKETS, 0, STMT_BUCKET_SIZE * sizeof(StmtBucket));
310 DbRetVal rv = OK;
311 CSQLCONNECT = SqlFactory::createConnection(CSqlLog);
312 SqlLogConnection *logConn = (SqlLogConnection *) CSQLCONNECT;
313 logConn->setNoMsgLog(true);
314 rv = CSQLCONNECT->connect(I_USER, I_PASS);
315 if (rv != OK) return NULL;
316 TRDBCONNECT = SqlFactory::createConnection(CSqlAdapter);
317 SqlOdbcConnection *dsn = (SqlOdbcConnection*) TRDBCONNECT;
318 dsn->setDsName(DSN);//line added
319 struct timeval timeout, tval;
320 timeout.tv_sec = Conf::config.getCacheWaitSecs();
321 timeout.tv_usec = 0;
322 CACHELIST.init();
323 reconnect:
324 while(!srvStop) {
325 rv = TRDBCONNECT->connect(I_USER, I_PASS);
326 if (rv != OK) {
327 printError(ErrSysInternal, "Unable to connect to target database:%s", DSN);
328 tval.tv_sec = timeout.tv_sec;
329 tval.tv_usec = timeout.tv_usec;
330 os::select(0, 0, 0, 0, &tval);
331 } else break;
332 if (srvStop) {
333 CSQLCONNECT->disconnect(); delete CSQLCONNECT;
334 TRDBCONNECT->disconnect(); delete TRDBCONNECT;
335 return NULL;
338 if (srvStop) {
339 CSQLCONNECT->disconnect(); delete CSQLCONNECT;
340 TRDBCONNECT->disconnect(); delete TRDBCONNECT;
341 return NULL;
343 if (!Conf::config.useCache())
345 printError(ErrSysInternal, "Cache is set to OFF in csql.conf file\n");
346 CSQLCONNECT->disconnect(); delete CSQLCONNECT;
347 TRDBCONNECT->disconnect(); delete TRDBCONNECT;
348 return NULL;
351 int ret = 0;
352 struct stat ofstatus,nfstatus;
353 ret=stat(Conf::config.getTableConfigFile(),&ofstatus);
354 createCacheTableList(TRDBCONNECT, &CACHELIST);
355 while(!srvStop)
357 tval.tv_sec = timeout.tv_sec;
358 tval.tv_usec = timeout.tv_usec;
359 ret = os::select(0, 0, 0, 0, &tval);
360 printf("Checking for cache updates\n");
361 if (srvStop) break;
362 ret=stat(Conf::config.getTableConfigFile(),&nfstatus);
363 if(ofstatus.st_mtime != nfstatus.st_mtime)
365 ListIterator it = CACHELIST.getIterator();
366 while (it.hasElement()) delete it.nextElement();
367 CACHELIST.reset();
368 createCacheTableList(TRDBCONNECT, &CACHELIST);
369 ofstatus.st_mtime = nfstatus.st_mtime;
371 if((ret = getRecordsFromTargetDb(thrInfo)) == 1) {
372 if (srvStop) break;
373 TRDBCONNECT->disconnect();
374 ListIterator it = CACHELIST.getIterator();
375 while (it.hasElement()) delete it.nextElement();
376 CACHELIST.reset();
377 goto reconnect;
381 freeAllStmtHandles(STMTBUCKETS);
382 TRDBCONNECT->disconnect(); delete TRDBCONNECT;
383 CSQLCONNECT->disconnect();
384 delete CSQLCONNECT;
386 ListIterator it = CACHELIST.getIterator();
387 while (it.hasElement()) delete it.nextElement();
388 CACHELIST.reset();
389 return NULL;
392 int getRecordsFromTargetDb(void *thrInfo)
394 DsnThrInput *dsnThrInfo = (DsnThrInput *)thrInfo;
395 int rows =0;
396 DbRetVal rv = OK;
397 int ret =0;
398 char StmtStr[1024];
399 int cacheId = Conf::config.getSiteID();
400 TSELSTMT = SqlFactory::createStatement(CSqlAdapter);
401 TSELSTMT->setConnection(TRDBCONNECT);
402 TDELSTMT = SqlFactory::createStatement(CSqlAdapter);
403 TDELSTMT->setConnection(TRDBCONNECT);
404 //rv = delstmt->prepare("DELETE from csql_log_int where id=?;");
405 sprintf(StmtStr, "SELECT * FROM csql_log_int where cacheid = %d;", cacheId);
407 SqlOdbcStatement *oselstmt = (SqlOdbcStatement *) TSELSTMT;
408 rv = oselstmt->prepareForResultSet(StmtStr);
409 if (rv != OK) {
410 printError(ErrSysInternal, "Statement prepare failed. TDB may be down");
411 return 1;
414 int noOfRowsFetched = 0;
415 int nLogRecords = Conf::config.getNoOfRowsToFetchFromTDB();
416 if (TABLENAME_ARRAY == NULL) {
417 TABLENAME_ARRAY = (char **) malloc(nLogRecords * IDENTIFIER_LENGTH);
418 PKID_ARRAY = (int *) malloc(nLogRecords * sizeof(int));
419 OPERATION_ARRAY = (int *) malloc(nLogRecords * sizeof(int));
420 CACHEID_ARRAY = (int *) malloc(nLogRecords * sizeof(int));
421 AUTOID_ARRAY = (int *) malloc(nLogRecords * sizeof(int));
423 memset(TABLENAME_ARRAY, 0, nLogRecords * IDENTIFIER_LENGTH);
424 memset(PKID_ARRAY, 0, nLogRecords * sizeof(int));
425 memset(OPERATION_ARRAY, 0, nLogRecords * sizeof(int));
426 memset(CACHEID_ARRAY, 0, nLogRecords * sizeof(int));
427 memset(AUTOID_ARRAY, 0, nLogRecords * sizeof(int));
429 oselstmt->setResultSetInfo(nLogRecords);
431 oselstmt->rsBindField(1, TABLENAME_ARRAY);
432 oselstmt->rsBindField(2, PKID_ARRAY);
433 oselstmt->rsBindField(3, OPERATION_ARRAY);
434 oselstmt->rsBindField(4, CACHEID_ARRAY);
435 oselstmt->rsBindField(5, AUTOID_ARRAY);
437 sprintf(StmtStr, "DELETE from csql_log_int where id=?;");
438 rv = TDELSTMT->prepare(StmtStr);
439 if (rv != OK) {
440 printError(ErrSysInternal, "Statement prepare failed. TDB may be down");
441 TSELSTMT->free(); TDELSTMT->free(); delete TSELSTMT; delete TDELSTMT;
442 return 1;
444 int retVal =0;
445 TDBInfo tdbname = ((SqlOdbcConnection*)TRDBCONNECT)->getTrDbName();
446 rv = TRDBCONNECT->beginTrans();
447 rv = oselstmt->executeForResultSet();
448 if (rv != OK) {
449 printError(ErrSysInit, "Unable to execute stmt in target db");
450 TRDBCONNECT->rollback();
451 TSELSTMT->free(); TDELSTMT->free();
452 delete TSELSTMT; delete TDELSTMT;
453 return 1;
455 bool found = false;
456 do {
457 rv = oselstmt->fetchScroll(&noOfRowsFetched);
458 int row = 0;
459 /* display each row */
460 for (row = 0; row < noOfRowsFetched; row++) {
461 char *tblName = (char *) TABLENAME_ARRAY + IDENTIFIER_LENGTH * row;
462 /*printf( "Row %d>", row );
463 printf( " %s <>", tblName);
464 printf( " %d <>", PKID);
465 printf( " %d <>", OPERATION);
466 printf( " %d <>", CACHEID);
467 printf( " %d <>", AUTOID);
468 printf( "\n" ); */
469 Util::trimEnd(tblName);
470 logFiner(Conf::logger, "Row value is Table:%s PK:%d OP:%d CID:%d\n", tblName, PKID, OPERATION, CACHEID);
472 if (OPERATION == 2) { //DELETE
473 retVal = remove(tblName,PKID, thrInfo);
474 logFinest(Conf::logger, "DELETE %s %d", tblName, PKID);
475 } //DELETE
476 else {
477 retVal = insert(tblName, PKID, thrInfo);
478 logFinest(Conf::logger, "INSERT %s %d", tblName, PKID);
481 if (retVal) ret =2;
482 TDELSTMT->setIntParam(1, AUTOID);
483 rv = TDELSTMT->execute(rows);
484 if (rv != OK) {
485 printError(ErrSysInternal, "Log record table:%s PK:%d RowID:%d not deleted from the target db %d\n", tblName, PKID, AUTOID, rv);
486 TRDBCONNECT->rollback();
487 break;
490 } while (rv == OK && noOfRowsFetched == nLogRecords);
491 TSELSTMT->close();
492 TDELSTMT->close();
493 TRDBCONNECT->commit();
494 TSELSTMT->free(); TDELSTMT->free(); delete TSELSTMT; delete TDELSTMT;
495 return ret;
498 int insert(char *tablename, int pkid, void *thrInfo)
500 DsnThrInput *dsnThrInfo = (DsnThrInput *)thrInfo;
501 DbRetVal rv = OK;
502 List fNameList;
503 AbsSqlStatement *astmt = NULL;
504 SqlOdbcStatement *ostmt = NULL;
505 AbsSqlStatement *istmt = NULL;
506 SqlStatement *sqlstmt = NULL;
507 TDBInfo tdbname = ((SqlOdbcConnection*)TRDBCONNECT)->getTrDbName();
508 CTStmtNode *node = getStmtFromHashTable(tablename, STMTBUCKETS);
509 if ((node == NULL) || (node && node->insStmt == NULL)) {
510 astmt = SqlFactory::createStatement(CSqlAdapter);
511 astmt->setConnection(TRDBCONNECT);
512 istmt = SqlFactory::createStatement(CSqlLog);
513 istmt->setConnection(CSQLCONNECT);
514 sqlstmt = (SqlStatement *) istmt->getInnerStatement();
515 char insStmt[1024];
516 char pkfieldname[128]; pkfieldname[0]='\0';
517 DbRetVal rv=getCacheField(tablename, pkfieldname, &CACHELIST);
518 if (rv!=OK) {
519 rv = getPKFieldName(tablename, pkfieldname, &CACHELIST);
520 if (rv != OK) {
523 //Util::str_tolower(pkfieldname);
524 char fieldlist[IDENTIFIER_LENGTH];
525 char condition[IDENTIFIER_LENGTH];
526 char sbuf[1024];
527 rv=getCacheProjField(tablename,fieldlist, &CACHELIST);
528 if(rv!=OK){
529 rv=getCacheCondition(tablename,condition, &CACHELIST);
530 if(rv!=OK){
531 sprintf(sbuf, "SELECT * FROM %s where %s = ?;", tablename, pkfieldname);
532 } else {
533 sprintf(sbuf, "SELECT * FROM %s where %s = ? and %s ;", tablename, pkfieldname, condition);
535 } else {
536 rv=getCacheCondition(tablename,condition, &CACHELIST);
537 if(rv!=OK){
538 sprintf(sbuf, "SELECT %s FROM %s where %s = ?;",fieldlist,tablename, pkfieldname);
539 } else {
540 sprintf(sbuf, "SELECT %s FROM %s where %s = ? and %s;",fieldlist,tablename, pkfieldname, condition);
543 //TODO::get the primary key field name from the table interface. need to implement it
544 //printf("Select String from adapter\n: *****%s\n", sbuf);
545 rv = astmt->prepare(sbuf);
546 if (rv != OK) return 2;
547 char *ptr = insStmt;
548 sprintf(ptr,"INSERT INTO %s VALUES(", tablename); ptr += strlen(ptr);
549 bool firstFld = true;
550 fNameList = sqlstmt->getFieldNameList(tablename, rv);
551 int noOfFields = fNameList.size();
552 while (noOfFields--) {
553 if (firstFld) {
554 firstFld = false;
555 sprintf(ptr,"?", tablename); ptr += strlen(ptr);
556 } else {
557 sprintf(ptr, ",?"); ptr += strlen(ptr);
560 sprintf(ptr, ");"); ptr += strlen(ptr);
561 //printf("ins stmt: '%s'\n", insStmt);
562 rv = istmt->prepare(insStmt);
563 if (rv != OK) { return 2; }
564 if(node == NULL) addToHashTable(tablename, astmt, istmt, STMTBUCKETS);
565 else {
566 node->adptStmt = astmt;
567 node->insStmt = istmt;
569 } else {
570 istmt = node->insStmt;
571 astmt = node->adptStmt;
572 sqlstmt = (SqlStatement *) istmt->getInnerStatement();
573 fNameList = sqlstmt->getFieldNameList(tablename, rv);
575 List valBufList;
576 ListIterator fNameIter = fNameList.getIterator();
577 FieldInfo *info = new FieldInfo();
578 int fcount =0; void *valBuf; int fieldsize=0;
579 void *buf[128];//TODO:resticts to support only 128 fields in table
580 for (int i=0; i < 128; i++) buf[i]= NULL;
581 DataType dType[128];
582 Identifier *elem = NULL;
583 BindBuffer *bBuf = NULL;
584 while (fNameIter.hasElement()) {
585 elem = (Identifier*) fNameIter.nextElement();
586 sqlstmt->getFieldInfo(tablename, (const char*)elem->name, info);
587 valBuf = AllDataType::alloc(info->type, info->length);
588 os::memset(valBuf,0,info->length);
589 bBuf = (BindBuffer *) SqlStatement::fillBindBuffer(tdbname, info->type, valBuf, info->length);
590 valBufList.append(bBuf);
591 dType[fcount] = info->type;
592 buf[fcount] = bBuf->csql;
593 astmt->bindField(fcount+1, buf[fcount]);
594 fcount++;
596 delete info;
597 int rows=0;
598 astmt->setIntParam(1, pkid);
599 int retValue = astmt->execute(rows);
600 if (retValue && rows != 1) {
601 printError(ErrSysInit, "Unable to execute statement at target db\n");
602 return ErrSysInit;
604 ListIterator bindIter = valBufList.getIterator();
605 if (astmt->fetch() != NULL) {
606 ostmt = (SqlOdbcStatement *) astmt;
607 ostmt->setNullInfo(istmt);
608 if(tdbname == postgres){
609 for (int i=0; i < fcount; i++) {
610 if(dType[i] == typeString) Util::trimRight((char *)buf[i]);
613 //setXXXParams to be called here
614 int pos = 1;
615 while (bindIter.hasElement()) {
616 bBuf = (BindBuffer *) bindIter.nextElement();
617 SqlStatement::setParamValues(istmt, pos++, bBuf->type,
618 bBuf->length, bBuf->csql);
620 CSQLCONNECT->beginTrans();
621 int rows = 0;
622 rv = istmt->execute(rows);
623 if (rv != OK) {
624 printf ("execute failed \n");
625 //printf(" STMT: %s\n",insStmt);
626 return 3;
628 CSQLCONNECT->commit();
629 //printf("successfully inserted value with pkid = %d\n", pkid);
630 //Note:insert may fail if the record is inserted from this cache
632 astmt->close();
633 //for (int i=0; i < fcount; i++) free(buf[i]);
634 ListIterator iter = valBufList.getIterator();
635 while (iter.hasElement()){
636 bBuf = (BindBuffer*) iter.nextElement();
637 delete bBuf;
639 valBufList.reset();
640 iter = fNameList.getIterator();
641 while (iter.hasElement()) delete iter.nextElement();
642 fNameList.reset();
644 return 0;
646 int remove(char *tablename, int pkid, void *thrInfo)
648 DsnThrInput *dsnThrInfo = (DsnThrInput *)thrInfo;
649 DbRetVal rv = OK;
650 List fNameList;
651 SqlOdbcStatement *ostmt = NULL;
652 AbsSqlStatement *dstmt = NULL;
653 SqlStatement *sqlstmt = NULL;
654 TDBInfo tdbname = ((SqlOdbcConnection*)TRDBCONNECT)->getTrDbName();
655 CTStmtNode *node = getStmtFromHashTable(tablename, STMTBUCKETS);
656 if (node == NULL || (node && node->delStmt == NULL)) {
657 dstmt = SqlFactory::createStatement(CSqlLog);
658 dstmt->setConnection(CSQLCONNECT);
659 sqlstmt = (SqlStatement *) dstmt->getInnerStatement();
660 char delStmt[1024];
661 char pkfieldname[128]; pkfieldname[0]='\0';
662 DbRetVal rv=getCacheField(tablename, pkfieldname, &CACHELIST);
663 if (rv!=OK) {
664 rv = getPKFieldName(tablename, pkfieldname, &CACHELIST);
665 if (rv != OK) {
668 Util::str_tolower(pkfieldname);
669 sprintf(delStmt, "DELETE FROM %s where %s = ?;", tablename, pkfieldname);
670 //printf("Delete stmt: %s\n", delStmt);
671 rv = dstmt->prepare(delStmt);
672 if (rv != OK) return 2;
673 if (node == NULL) addToHashTable(tablename, dstmt, STMTBUCKETS);
674 else { node->delStmt = dstmt; }
675 } else { dstmt = node->delStmt; }
676 dstmt->setIntParam(1, pkid);
678 rv = CSQLCONNECT->beginTrans();
679 if (rv != OK) return 2;
680 int rows = 0;
681 //printf("DEBUG: pkid = %d\n", pkid);
682 rv = dstmt->execute(rows);
683 if (rv != OK || rows !=1)
685 CSQLCONNECT->rollback();
686 //printf("DEBUG: delete stmt execute failed in csql = %d\n", rv);
687 // printError(ErrSysInternal, "Delete failed for stmt %s\n", delStmt);
688 return 3;
690 rv = CSQLCONNECT->commit();
691 return 0;
694 void createCacheTableList(AbsSqlConnection *tcon, List *cacheTableList)
696 AbsSqlStatement *stmt = SqlFactory::createStatement(CSqlAdapter);
697 stmt->setConnection(tcon);
698 SqlOdbcStatement *ostmt = (SqlOdbcStatement*) stmt;
699 FILE *fp;
700 fp = fopen(Conf::config.getTableConfigFile(),"r");
701 if( fp == NULL ) {
702 printError(ErrSysInit, "csqltable.conf file does not exist");
703 fclose(fp);
705 char tablename[IDENTIFIER_LENGTH];
706 char fieldname[IDENTIFIER_LENGTH];
707 char condition[IDENTIFIER_LENGTH];
708 char field[IDENTIFIER_LENGTH];
709 char dsnName[IDENTIFIER_LENGTH];
710 char pkfield[IDENTIFIER_LENGTH];
712 int mode;
713 while(!feof(fp))
715 pkfield[0]='\0';
716 int items = fscanf(fp,"%d %s %s %s %s %s \n",&mode,tablename,fieldname,condition,field,dsnName);
717 if (items != 6) break;
718 CacheTableInfo *cacheTable=new CacheTableInfo();
719 cacheTable->setTableName(tablename);
720 cacheTable->setFieldName(fieldname);
721 cacheTable->setProjFieldList(field);
722 cacheTable->setCondition(condition);
723 ostmt->getPrimaryKeyFieldName(tablename, pkfield);
724 cacheTable->setPKField(pkfield);
725 cacheTableList->append(cacheTable);
727 // printf("Table %s is not cached\n",tabname);
728 fclose(fp);
729 delete stmt;
732 DbRetVal getCacheCondition(char *tblName,char *condition, List *cacheTableList)
734 ListIterator iter=cacheTableList->getIterator();
735 CacheTableInfo *cacheTable;
736 while(iter.hasElement())
738 cacheTable=(CacheTableInfo*)iter.nextElement();
739 if(strcmp(cacheTable->getTableName(),tblName)==0){
740 if(strcmp(cacheTable->getCondition(),"NULL")!=0)
742 strcpy(condition,cacheTable->getCondition());
743 return OK;
747 return ErrNotExists;
750 DbRetVal getCacheProjField(char *tblName,char *fieldlist, List *cacheTableList)
752 ListIterator iter=cacheTableList->getIterator();
753 CacheTableInfo *cacheTable;
754 while(iter.hasElement())
756 cacheTable=(CacheTableInfo*)iter.nextElement();
757 if(strcmp(cacheTable->getTableName(),tblName)==0){
758 if(strcmp(cacheTable->getProjFieldList(),"NULL")!=0)
760 strcpy(fieldlist,cacheTable->getProjFieldList());
761 return OK;
765 return ErrNotExists;
767 DbRetVal getCacheField(char *tblName,char *fldName, List *cacheTableList)
769 ListIterator iter=cacheTableList->getIterator();
770 CacheTableInfo *cacheTable;
771 while(iter.hasElement())
773 cacheTable=(CacheTableInfo*)iter.nextElement();
774 if(strcmp(cacheTable->getTableName(),tblName)==0){
775 if(strcmp(cacheTable->getFieldName(),"NULL")!=0)
777 strcpy(fldName,cacheTable->getFieldName());
778 return OK;
783 return ErrNotExists;
786 DbRetVal getPKFieldName(char *tblName, char *pkFldName, List *cacheTableList)
788 ListIterator iter=cacheTableList->getIterator();
789 CacheTableInfo *cacheTable;
790 while(iter.hasElement())
792 cacheTable=(CacheTableInfo*)iter.nextElement();
793 if(strcmp(cacheTable->getTableName(),tblName)==0){
794 strcpy(pkFldName, cacheTable->getPKFieldName());
795 return OK;
798 return ErrNotExists;
802 void *fillBindBuffer(TDBInfo tdbName, DataType type, void *valBuf, int length)
804 BindBuffer *bBuf = NULL;
805 switch(type)
807 case typeDate:
808 bBuf = new BindBuffer();
809 bBuf->csql = valBuf;
810 bBuf->type = typeDate;
811 bBuf->length = sizeof(DATE_STRUCT);
812 bBuf->targetdb = malloc(bBuf->length);
813 memset(bBuf->targetdb, 0, bBuf->length);
814 valBuf = bBuf->targetdb;
815 break;
816 case typeTime:
817 bBuf = new BindBuffer();
818 bBuf->csql = valBuf;
819 bBuf->type = typeTime;
820 bBuf->length = sizeof(TIME_STRUCT);
821 bBuf->targetdb = malloc(bBuf->length);
822 memset(bBuf->targetdb, 0, bBuf->length);
823 valBuf = bBuf->targetdb;
824 break;
825 case typeTimeStamp:
826 bBuf = new BindBuffer();
827 bBuf->csql = valBuf;
828 bBuf->type = typeTimeStamp;
829 bBuf->length = sizeof(TIMESTAMP_STRUCT);
830 bBuf->targetdb = malloc(bBuf->length);
831 memset(bBuf->targetdb, 0, bBuf->length);
832 valBuf = bBuf->targetdb;
833 break;
834 case typeLongLong:
836 if( tdbName == postgres || tdbName == oracle )
838 bBuf = new BindBuffer();
839 bBuf->type = typeLongLong;
840 bBuf->length = 40;
841 bBuf->csql = valBuf;
842 bBuf->targetdb = AllDataType::alloc(typeString,bBuf->length);
843 memset(bBuf->targetdb, 0, bBuf->length);
844 valBuf = bBuf->targetdb;
845 break;
846 }else
848 bBuf = new BindBuffer();
849 bBuf->type = type;
850 bBuf->csql = valBuf;
851 bBuf->length = length;
852 break;
855 case typeVarchar:
856 case typeString:
858 bBuf = new BindBuffer();
859 bBuf->type = typeString;
860 bBuf->csql = valBuf;
861 bBuf->length = length+1;
862 break;
864 default:
865 bBuf = new BindBuffer();
866 bBuf->type = type;
867 bBuf->csql = valBuf;
868 bBuf->length = length;
869 break;
871 return bBuf;