1 /***************************************************************************
2 * Copyright (C) 2007 by Prabakaran Thirumalai *
3 * praba_tuty@yahoo.com *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
21 #include <SqlLogConnection.h>
22 #include <SqlLogStatement.h>
26 List
SqlLogConnection::cacheList
;
28 SqlLogConnection::~SqlLogConnection()
30 if (msgQSend
) { delete msgQSend
; msgQSend
= NULL
; }
31 if (fileSend
) { delete fileSend
; fileSend
= NULL
; }
32 if (offlineLog
) { delete offlineLog
; offlineLog
= NULL
; }
34 ListIterator iter
= cacheList
.getIterator();
35 while(iter
.hasElement()) delete (CachedTable
*) iter
.nextElement();
37 ListIterator it
= execLogStore
.getIterator();
38 while(it
.hasElement()) ::free ((ExecLogInfo
*) it
.nextElement());
42 DbRetVal
SqlLogConnection::connect (char *user
, char *pass
)
45 if (innerConn
) return innerConn
->connect(user
,pass
);
46 //populate cacheList if not populated by another thread in same process
47 //TODO::cacheList requires mutex guard
48 if (0 == cacheList
.size()) rv
= populateCachedTableList();
52 DbRetVal
SqlLogConnection::disconnect()
54 if (innerConn
) innerConn
->disconnect();
58 DbRetVal
SqlLogConnection::beginTrans(IsolationLevel isoLevel
, TransSyncMode mode
)
61 if (innerConn
) rv
= innerConn
->beginTrans(isoLevel
);
62 if (rv
!= OK
) return rv
;
64 txnID
= SqlLogConnection::txnUID
.getID(TXN_ID
);
68 DbRetVal
SqlLogConnection::commit()
71 if (innerConn
) rv
= innerConn
->commit();
72 if (!msgQSend
&& !fileSend
&& !offlineLog
) return OK
;
73 if (execLogStore
.size() == 0) {
74 //This means no execution for any non select statements in
76 //rollback so that subsequent beginTrans will not report that
77 //transaction is already started
79 rv
= innerConn
->rollback();
80 //if (rv != OK) return rv;
81 //rv = innerConn->beginTrans(READ_COMMITTED, syncMode);
85 int txnId
= getTxnID();
86 // len to be sent should also contain txnId
87 int len
= 2 * sizeof(int) + os::align(getExecLogStoreSize());
88 printDebug(DM_SqlLog
, "commit: size of logstore: %d", len
);
89 int bufferSize
= sizeof(long) + len
;
90 printDebug(DM_SqlLog
, "commit: size of buffer: %d", bufferSize
);
91 void *buffer
= malloc(bufferSize
);
92 printDebug(DM_SqlLog
, "commit: buffer address: %x", buffer
);
93 char *ptr
= (char *)buffer
+ sizeof(long); // long type is for msgtype
95 printDebug(DM_SqlLog
, "commit: data address: %x", data
);
100 ListIterator logStoreIter
= execLogStore
.getIterator();
101 ExecLogInfo
*elInfo
= NULL
;
102 while (logStoreIter
.hasElement()) {
103 elInfo
= (ExecLogInfo
*)logStoreIter
.nextElement();
104 printDebug(DM_SqlLog
, "commit: elem from logstore:: %x", elInfo
);
105 *(int *) ptr
= elInfo
->stmtId
;
106 printDebug(DM_SqlLog
, "commit: stmtId to marshall: %d", elInfo
->stmtId
);
108 *(int *) ptr
= (int) elInfo
->type
;
109 printDebug(DM_SqlLog
, "commit: ExType to marshall: %d", elInfo
->type
);
111 if (elInfo
->type
== SETPARAM
) {
112 *(int *) ptr
= elInfo
->pos
;
113 printDebug(DM_SqlLog
, "commit: PrmPos to marshall: %d", elInfo
->pos
);
115 *(int *) ptr
= elInfo
->isNull
;
116 printDebug(DM_SqlLog
, "commit: isNull to marshall: %d", elInfo
->isNull
);
118 if (elInfo
->isNull
== 0) {
119 *(int *) ptr
= (int) elInfo
->dataType
;
120 printDebug(DM_SqlLog
, "commit: DtType to marshall: %d", elInfo
->dataType
);
122 *(int *) ptr
= elInfo
->len
;
123 printDebug(DM_SqlLog
, "commit: length to marshall: %d", elInfo
->len
);
125 memcpy(ptr
, &elInfo
->value
, elInfo
->len
);
130 commitLogs(len
, data
);
131 ListIterator it
= execLogStore
.getIterator();
132 while(it
.hasElement()) ::free ((ExecLogInfo
*) it
.nextElement());
133 execLogStore
.reset();
138 DbRetVal
SqlLogConnection::rollback()
141 if (innerConn
) rv
= innerConn
->rollback();
142 if (rv
!= OK
) return rv
;
143 if (!msgQSend
&& !fileSend
&& !offlineLog
) return OK
;
144 ListIterator logStoreIter
= execLogStore
.getIterator();
145 ExecLogInfo
*elInfo
= NULL
;
146 while (logStoreIter
.hasElement())
148 elInfo
= (ExecLogInfo
*)logStoreIter
.nextElement();
151 execLogStore
.reset();
156 DbRetVal
SqlLogConnection::populateCachedTableList()
159 fp
= fopen(Conf::config
.getTableConfigFile(),"r");
161 printError(ErrSysInit
, "cache.table file does not exist");
164 char tablename
[IDENTIFIER_LENGTH
];
165 char fieldname
[IDENTIFIER_LENGTH
];
166 char condition
[IDENTIFIER_LENGTH
];
167 char field
[IDENTIFIER_LENGTH
];
168 char dsnName
[IDENTIFIER_LENGTH
];
175 CachedTable
*node
=NULL
;
178 int input
= fscanf(fp
, "%d %s %s %s %s %s\n", &cmode
, tablename
,fieldname
,condition
,field
,dsnName
);
179 if (input
!= 6) break;
180 node
= new CachedTable();
181 strcpy(node
->tableName
, tablename
);
182 cacheList
.append(node
);
188 bool SqlLogConnection::isTableCached(char *tblName
)
192 printError(ErrBadArg
, "tblName passed is NULL\n");
195 ListIterator iter
= cacheList
.getIterator();
197 while (iter
.hasElement()) {
198 node
= (CachedTable
*)iter
.nextElement();
199 if (strcmp(node
->tableName
, tblName
) == 0)
207 DbRetVal
MsgQueueSend::prepare(int txnId
, int stmtId
, int len
, char *stmt
,
208 char *tblName
, bool hasParam
)
210 //strlen is not included string is the last element in the following
212 int datalen
= 3 * sizeof(int) + IDENTIFIER_LENGTH
+ os::align(len
); // for len + txnId + stmtId + tblName + string
214 int buffer
= sizeof(Message
) - sizeof(void *) + datalen
;
215 Message
*msg
= (Message
*) malloc(buffer
);
217 *(int *)&msg
->data
= datalen
;
218 char *ptr
= (char *) &msg
->data
+ sizeof(int);
221 *(int *)ptr
= stmtId
;
223 strncpy(ptr
, tblName
, IDENTIFIER_LENGTH
);
224 ptr
[IDENTIFIER_LENGTH
-1] ='\0';
225 ptr
+=IDENTIFIER_LENGTH
;
226 strncpy(ptr
, stmt
, len
);
227 printDebug(DM_SqlLog
, "stmtstr = | %s |\n", ptr
);
228 printDebug(DM_SqlLog
, "length of msg sent = %d\n", datalen
);
229 int ret
= os::msgsnd(msgQId
, msg
, datalen
, 0666);
231 printError(ErrSysInternal
, "message send failed\n");
233 return ErrSysInternal
;
239 DbRetVal
MsgQueueSend::commit(int len
, void *data
)
241 Message
*msg
= (Message
*) ((char *)data
- sizeof (long));
243 int ret
= os::msgsnd(msgQId
, msg
, len
, 0666);
245 printError(ErrSysInternal
, "message send failed\n");
246 return ErrSysInternal
;
251 DbRetVal
MsgQueueSend::free(int txnId
, int stmtId
, bool hasParam
)
253 // data to be sent is len + txn id + stmtId
254 int dataLen
= 3 * sizeof(int);
255 int bufferSize
= sizeof(Message
) - sizeof(void *) + dataLen
;
256 Message
*msg
= (Message
*) malloc(bufferSize
);
258 *(int *) &msg
->data
= dataLen
;
259 char *ptr
= (char *) &msg
->data
;
261 *(int *) ptr
= txnId
;
263 *(int *) ptr
= stmtId
;
264 printDebug(DM_SqlLog
, "stmtID sent = %d\n", *(int *) ptr
);
265 int ret
= os::msgsnd(msgQId
, msg
, dataLen
, 0666);
267 printError(ErrSysInternal
, "message send failed\n");
269 return ErrSysInternal
;