Update in sync with enterprise version.
[csql.git] / src / sqllog / SqlLogConnection.cxx
blob dea7f55fa426f23a9bb66f543107c455f3a1b49d
1 /***************************************************************************
2 * Copyright (C) 2007 by Prabakaran Thirumalai *
3 * praba_tuty@yahoo.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
20 #include <os.h>
21 #include <SqlLogConnection.h>
22 #include <SqlLogStatement.h>
23 #include <CSql.h>
24 #include <Network.h>
// Definition of the class-level cacheList. connect() fills it through
// populateCachedTableList() when its size is 0, isTableCached() searches it,
// and the destructor deletes its CachedTable entries and resets it.
// It is not protected by a mutex (see the TODO in connect()).
26 List SqlLogConnection::cacheList;
28 DbRetVal SqlLogConnection::addPacket(BasePacket* pkt)
30 logStore.append(pkt);
31 return OK;
33 DbRetVal SqlLogConnection::addPreparePacket(PacketPrepare* pkt)
35 curPrepareStore.append(pkt);
36 return OK;
39 DbRetVal SqlLogConnection::removePreparePacket(int stmtid)
41 ListIterator iter = prepareStore.getIterator();
42 PacketPrepare *pkt = NULL, *dpkt=NULL;
43 while (iter.hasElement())
45 pkt = (PacketPrepare*)iter.nextElement();
46 if (pkt->stmtID == stmtid) dpkt = pkt;
48 if (dpkt == NULL) return OK;
49 //TEMP:mask below error for now
50 if (dpkt == NULL)
52 printError(ErrNotFound, "Prepare packet not found in list for %d\n", stmtid);
53 return ErrNotFound;
55 delete dpkt;
56 prepareStore.remove(dpkt);
57 return OK;
60 SqlLogConnection::~SqlLogConnection()
62 if (msgQSend) { delete msgQSend; msgQSend = NULL; }
63 if (fileSend) { delete fileSend; fileSend = NULL; }
64 txnUID.close();
65 ListIterator it = cacheList.getIterator();
66 while(it.hasElement()) delete (CachedTable *) it.nextElement();
67 cacheList.reset();
68 it = execLogStore.getIterator();
69 while(it.hasElement()) ::free ((ExecLogInfo *) it.nextElement());
70 execLogStore.reset();
73 DbRetVal SqlLogConnection::connect (char *user, char *pass)
75 DbRetVal rv = OK;
76 //printf("LOG: connect\n");
77 if (innerConn) rv = innerConn->connect(user,pass);
78 if (rv != OK) return rv;
79 if ( (!Conf::config.useCache() && Conf::config.getCacheMode() == SYNC_MODE)
80 && !Conf::config.useDurability()) return OK;
81 if (rv !=OK) { innerConn->disconnect(); return rv; }
83 //populate cacheList if not populated by another thread in same process
84 //TODO::cacheList requires mutex guard
85 if (0 == cacheList.size()) rv = populateCachedTableList();
86 return rv;
89 DbRetVal SqlLogConnection::disconnect()
91 DbRetVal rv = OK;
92 //printf("LOG: disconnect\n");
93 if (innerConn) rv =innerConn->disconnect();
94 if (rv != OK) return rv;
95 if ( (!Conf::config.useCache() && Conf::config.getCacheMode() == SYNC_MODE) && !Conf::config.useDurability()) return OK;
96 return rv;
98 DbRetVal SqlLogConnection::beginTrans(IsolationLevel isoLevel, TransSyncMode mode)
100 DbRetVal rv = OK;
101 if (innerConn) rv = innerConn->beginTrans(isoLevel);
102 if (rv != OK) return rv;
103 syncMode = mode;
104 txnID = SqlLogConnection::txnUID.getID(TXN_ID);
105 return OK;
// Commits the wrapped connection, then marshals the entries of execLogStore
// into a single buffer, passes that buffer to commitLogs() and releases the
// entries.
//
// Returns OK early when the configuration check below matches, the result
// of innerConn->rollback() when execLogStore is empty, and otherwise the
// value left in rv.
//
// NOTE(review): this listing has lost the lines that held only braces or
// comment delimiters (the embedded numbering skips them), so block
// boundaries below must be confirmed against the real file.
107 DbRetVal SqlLogConnection::commit()
109 DbRetVal rv = OK;
110 //printf("LOG: commit %d\n", syncMode);
111 //if (innerConn) rv = innerConn->commit();
// NOTE(review): rv from the inner commit is not checked before continuing;
// the early return just below reports OK regardless of it.
112 if (innerConn) rv = innerConn->commit();
113 if (( !Conf::config.useCache() && Conf::config.getCacheMode() == SYNC_MODE)
114 && !Conf::config.useDurability()) return OK;
116 if (execLogStore.size() == 0) {
117 //This means no execution for any non select statements in
118 //this transaction
119 //rollback so that subsequent beginTrans will not report that
120 //transaction is already started
121 if (innerConn) {
122 rv = innerConn->rollback();
123 //if (rv != OK) return rv;
124 //rv = innerConn->beginTrans(READ_COMMITTED, syncMode);
126 return rv;
129 //TODO::put the packet in global log store
// NOTE(review): original lines 130 and 152 are absent from this listing.
// Confirm whether the PacketCommit / NetworkClient section that follows is
// live code or sits inside a block comment. If it is live, pkt is never
// deleted, p is unused, and the error returns skip the cleanup at the end.
131 PacketCommit *pkt = new PacketCommit();
132 int tid = txnUID.getID();
133 pkt->setExecPackets(tid, logStore);
134 pkt->marshall();
135 int *p = (int*) pkt->getMarshalledBuffer();
136 NetworkClient *nwClient= nwTable.getNetworkClient();
137 if (syncMode == ASYNC) {
138 rv = nwClient->send(NW_PKT_COMMIT, pkt->getMarshalledBuffer(),
139 pkt->getBufferSize());
140 if (rv !=OK)
142 printError(ErrOS, "Unable to send SQL Logs to peer site\n");
143 return ErrOS;
145 rv = nwClient->receive();
146 if (rv !=OK)
148 printError(ErrOS, "Could not get acknowledgement from peer site\n");
149 return ErrPeerExecFailed;
151 //TODO::remove all sql logs nodes and the list which contains ptr to it
// Marshalling section. Buffer layout:
//   [long: message type][int: len][int: txnId][entries...]
// Each entry is stmtId and type; SETPARAM entries add pos, dataType, len
// and then len bytes copied from the entry's value member.
153 int txnId = getTxnID();
154 // len to be sent should also contain txnId
// The two ints counted here are the len field itself and txnId.
155 int len = 2 * sizeof(int) + os::align(getExecLogStoreSize());
156 printDebug(DM_SqlLog, "commit: size of logstore: %d", len);
157 int bufferSize = sizeof(long) + len;
158 printDebug(DM_SqlLog, "commit: size of buffer: %d", bufferSize);
// NOTE(review): the malloc result is used without a NULL check.
159 void *buffer = malloc(bufferSize);
160 printDebug(DM_SqlLog, "commit: buffer address: %x", buffer);
161 char *ptr = (char *)buffer + sizeof(long); // long type is for msgtype
// data marks the start of the payload (just past the long) and is what is
// handed to commitLogs(); ptr advances as fields are written.
162 char *data = ptr;
163 printDebug(DM_SqlLog, "commit: data address: %x", data);
164 *(int *) ptr = len;
165 ptr += sizeof(int);
166 *(int *) ptr = txnId;
167 ptr += sizeof(int);
168 ListIterator logStoreIter = execLogStore.getIterator();
169 ExecLogInfo *elInfo = NULL;
// NOTE(review): nothing here checks that the writes stay within len bytes;
// this presumably relies on getExecLogStoreSize() matching the entries -
// confirm.
170 while (logStoreIter.hasElement()) {
171 elInfo = (ExecLogInfo *)logStoreIter.nextElement();
172 printDebug(DM_SqlLog, "commit: elem from logstore:: %x", elInfo);
173 *(int *) ptr = elInfo->stmtId;
174 printDebug(DM_SqlLog, "commit: stmtId to marshall: %d", elInfo->stmtId);
175 ptr += sizeof(int);
176 *(int *) ptr = (int) elInfo->type;
177 printDebug(DM_SqlLog, "commit: ExType to marshall: %d", elInfo->type);
178 //printf("PRABA::type is %d\n" , *(int *) ptr);
179 ptr += sizeof(int);
180 if (elInfo->type == SETPARAM) {
181 *(int *) ptr = elInfo->pos;
182 printDebug(DM_SqlLog, "commit: PrmPos to marshall: %d", elInfo->pos);
183 ptr += sizeof(int);
184 *(int *) ptr = (int) elInfo->dataType;
185 printDebug(DM_SqlLog, "commit: DtType to marshall: %d", elInfo->dataType);
186 ptr += sizeof(int);
187 *(int *) ptr = elInfo->len;
188 printDebug(DM_SqlLog, "commit: length to marshall: %d", elInfo->len);
189 ptr += sizeof(int);
190 memcpy(ptr, &elInfo->value, elInfo->len);
191 ptr += elInfo->len;
// NOTE(review): the value returned by commitLogs() is not used.
194 commitLogs(len, data);
// Release every log entry with ::free, then the marshalling buffer, and
// reset the bookkeeping for the next transaction.
195 ListIterator it = execLogStore.getIterator();
196 while(it.hasElement()) ::free ((ExecLogInfo *) it.nextElement());
197 execLogStore.reset();
198 ::free(buffer);
199 execLogStoreSize =0;
200 //if (innerConn) rv = innerConn->commit();
201 return rv;
203 DbRetVal SqlLogConnection::rollback()
205 DbRetVal rv = OK;
206 //printf("LOG: rollback \n");
207 if (innerConn) rv = innerConn->rollback();
208 if (rv != OK) return rv;
209 if (( !Conf::config.useCache() && Conf::config.getCacheMode() == SYNC_MODE)
210 && !Conf::config.useDurability()) return OK;
212 ListIterator logStoreIter = execLogStore.getIterator();
213 ExecLogInfo *elInfo = NULL;
214 while (logStoreIter.hasElement())
216 elInfo = (ExecLogInfo *)logStoreIter.nextElement();
217 delete elInfo;
219 execLogStore.reset();
220 execLogStoreSize =0;
221 return rv;
224 DbRetVal SqlLogConnection::populateCachedTableList()
226 FILE *fp = NULL;
227 fp = fopen(Conf::config.getTableConfigFile(),"r");
228 if( fp == NULL ) {
229 printError(ErrSysInit, "cache.table file does not exist");
230 return ErrSysInit;
232 char tablename[IDENTIFIER_LENGTH];
233 char fieldname[IDENTIFIER_LENGTH];
234 char condition[IDENTIFIER_LENGTH];
235 char field[IDENTIFIER_LENGTH];
236 char dsnName[IDENTIFIER_LENGTH];
237 tablename[0] = '\0';
238 fieldname[0] = '\0';
239 condition[0] = '\0';
240 field[0] = '\0';
241 dsnName[0] = '\0';
242 int cmode;
243 CachedTable *node=NULL;
244 while(!feof(fp))
246 fscanf(fp, "%d %s %s %s %s %s\n", &cmode, tablename,fieldname,condition,field,dsnName);
247 node = new CachedTable();
248 strcpy(node->tableName, tablename);
249 cacheList.append(node);
251 fclose(fp);
252 return OK;
255 bool SqlLogConnection::isTableCached(char *tblName)
257 if (NULL == tblName)
259 printError(ErrBadArg, "tblName passed is NULL\n");
260 return ErrBadArg;
262 ListIterator iter = cacheList.getIterator();
263 CachedTable *node;
264 while (iter.hasElement()) {
265 node = (CachedTable*)iter.nextElement();
266 if (strcmp(node->tableName, tblName) == 0)
268 return true;
271 return false;
274 DbRetVal MsgQueueSend::prepare(int txnId, int stmtId, int len, char *stmt,
275 char *tblName)
277 //strlen is not included string is the last element in the following
278 //structure
279 int datalen = 3 * sizeof(int) + IDENTIFIER_LENGTH + os::align(len); // for len + txnId + stmtId + tblName + string
281 int buffer = sizeof(Message) - 1 + datalen;
282 Message *msg = (Message *) malloc(buffer);
283 msg->mtype = 1;
284 *(int *)&msg->data = datalen;
285 char *ptr = (char *) &msg->data + sizeof(int);
286 *(int *)ptr = txnId;
287 ptr += sizeof(int);
288 *(int *)ptr = stmtId;
289 ptr += sizeof(int);
290 strncpy(ptr, tblName, IDENTIFIER_LENGTH);
291 ptr[IDENTIFIER_LENGTH-1] ='\0';
292 ptr +=IDENTIFIER_LENGTH;
293 strncpy(ptr, stmt, len);
294 printDebug(DM_SqlLog, "stmtstr = | %s |\n", ptr);
295 printDebug(DM_SqlLog, "length of msg sent = %d\n", datalen);
296 int ret = os::msgsnd(msgQId, msg, datalen, 0666);
297 if (ret != 0) {
298 printError(ErrSysInternal, "message send failed\n");
299 ::free(msg);
300 return ErrSysInternal;
302 ::free(msg);
303 return OK;
306 DbRetVal MsgQueueSend::commit(int len, void *data)
308 Message *msg = (Message *) ((char *)data - sizeof (long));
309 msg->mtype = 2;
310 int ret = os::msgsnd(msgQId, msg, len, 0666);
311 if (ret != 0) {
312 printError(ErrSysInternal, "message send failed\n");
313 return ErrSysInternal;
315 return OK;
318 DbRetVal MsgQueueSend::free(int txnId, int stmtId)
320 // data to be sent is len + txn id + stmtId
321 int dataLen = 3 * sizeof(int);
322 int bufferSize = sizeof(Message) - 1 + dataLen;
323 Message *msg = (Message *) malloc(bufferSize);
324 msg->mtype = 3;
325 *(int *) &msg->data = dataLen;
326 char *ptr = (char *) &msg->data;
327 ptr += sizeof(int);
328 *(int *) ptr = txnId;
329 ptr += sizeof(int);
330 *(int *) ptr = stmtId;
331 printDebug(DM_SqlLog, "stmtID sent = %d\n", *(int *) ptr);
332 int ret = os::msgsnd(msgQId, msg, dataLen, 0666);
333 if (ret != 0) {
334 printError(ErrSysInternal, "message send failed\n");
335 ::free(msg);
336 return ErrSysInternal;
338 ::free(msg);
339 return OK;