1 /***************************************************************************
2 * Copyright (C) 2007 by Prabakaran Thirumalai *
3 * praba_tuty@yahoo.com *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
21 #include <SqlLogConnection.h>
22 #include <SqlLogStatement.h>
26 List
SqlLogConnection::cacheList
;
28 DbRetVal
SqlLogConnection::addPacket(BasePacket
* pkt
)
33 DbRetVal
SqlLogConnection::addPreparePacket(PacketPrepare
* pkt
)
35 curPrepareStore
.append(pkt
);
39 DbRetVal
SqlLogConnection::removePreparePacket(int stmtid
)
41 ListIterator iter
= prepareStore
.getIterator();
42 PacketPrepare
*pkt
= NULL
, *dpkt
=NULL
;
43 while (iter
.hasElement())
45 pkt
= (PacketPrepare
*)iter
.nextElement();
46 if (pkt
->stmtID
== stmtid
) dpkt
= pkt
;
48 if (dpkt
== NULL
) return OK
;
49 //TEMP:mask below error for now
52 printError(ErrNotFound
, "Prepare packet not found in list for %d\n", stmtid
);
56 prepareStore
.remove(dpkt
);
60 SqlLogConnection::~SqlLogConnection()
62 if (msgQSend
) { delete msgQSend
; msgQSend
= NULL
; }
63 if (fileSend
) { delete fileSend
; fileSend
= NULL
; }
65 ListIterator it
= cacheList
.getIterator();
66 while(it
.hasElement()) delete (CachedTable
*) it
.nextElement();
68 it
= execLogStore
.getIterator();
69 while(it
.hasElement()) ::free ((ExecLogInfo
*) it
.nextElement());
73 DbRetVal
SqlLogConnection::connect (char *user
, char *pass
)
76 //printf("LOG: connect\n");
77 if (innerConn
) rv
= innerConn
->connect(user
,pass
);
78 if (rv
!= OK
) return rv
;
79 if ( (!Conf::config
.useCache() && Conf::config
.getCacheMode() == SYNC_MODE
)
80 && !Conf::config
.useDurability()) return OK
;
81 if (rv
!=OK
) { innerConn
->disconnect(); return rv
; }
83 //populate cacheList if not populated by another thread in same process
84 //TODO::cacheList requires mutex guard
85 if (0 == cacheList
.size()) rv
= populateCachedTableList();
89 DbRetVal
SqlLogConnection::disconnect()
92 //printf("LOG: disconnect\n");
93 if (innerConn
) rv
=innerConn
->disconnect();
94 if (rv
!= OK
) return rv
;
95 if ( (!Conf::config
.useCache() && Conf::config
.getCacheMode() == SYNC_MODE
) && !Conf::config
.useDurability()) return OK
;
98 DbRetVal
SqlLogConnection::beginTrans(IsolationLevel isoLevel
, TransSyncMode mode
)
101 if (innerConn
) rv
= innerConn
->beginTrans(isoLevel
);
102 if (rv
!= OK
) return rv
;
104 txnID
= SqlLogConnection::txnUID
.getID(TXN_ID
);
107 DbRetVal
SqlLogConnection::commit()
110 //printf("LOG: commit %d\n", syncMode);
111 //if (innerConn) rv = innerConn->commit();
112 if (innerConn
) rv
= innerConn
->commit();
113 if (( !Conf::config
.useCache() && Conf::config
.getCacheMode() == SYNC_MODE
)
114 && !Conf::config
.useDurability()) return OK
;
116 if (execLogStore
.size() == 0) {
117 //This means no execution for any non select statements in
119 //rollback so that subsequent beginTrans will not report that
120 //transaction is already started
122 rv
= innerConn
->rollback();
123 //if (rv != OK) return rv;
124 //rv = innerConn->beginTrans(READ_COMMITTED, syncMode);
129 //TODO::put the packet in global log store
131 PacketCommit *pkt = new PacketCommit();
132 int tid = txnUID.getID();
133 pkt->setExecPackets(tid, logStore);
135 int *p = (int*) pkt->getMarshalledBuffer();
136 NetworkClient *nwClient= nwTable.getNetworkClient();
137 if (syncMode == ASYNC) {
138 rv = nwClient->send(NW_PKT_COMMIT, pkt->getMarshalledBuffer(),
139 pkt->getBufferSize());
142 printError(ErrOS, "Unable to send SQL Logs to peer site\n");
145 rv = nwClient->receive();
148 printError(ErrOS, "Could not get acknowledgement from peer site\n");
149 return ErrPeerExecFailed;
151 //TODO::remove all sql logs nodes and the list which contains ptr to it
153 int txnId
= getTxnID();
154 // len to be sent should also contain txnId
155 int len
= 2 * sizeof(int) + os::align(getExecLogStoreSize());
156 printDebug(DM_SqlLog
, "commit: size of logstore: %d", len
);
157 int bufferSize
= sizeof(long) + len
;
158 printDebug(DM_SqlLog
, "commit: size of buffer: %d", bufferSize
);
159 void *buffer
= malloc(bufferSize
);
160 printDebug(DM_SqlLog
, "commit: buffer address: %x", buffer
);
161 char *ptr
= (char *)buffer
+ sizeof(long); // long type is for msgtype
163 printDebug(DM_SqlLog
, "commit: data address: %x", data
);
166 *(int *) ptr
= txnId
;
168 ListIterator logStoreIter
= execLogStore
.getIterator();
169 ExecLogInfo
*elInfo
= NULL
;
170 while (logStoreIter
.hasElement()) {
171 elInfo
= (ExecLogInfo
*)logStoreIter
.nextElement();
172 printDebug(DM_SqlLog
, "commit: elem from logstore:: %x", elInfo
);
173 *(int *) ptr
= elInfo
->stmtId
;
174 printDebug(DM_SqlLog
, "commit: stmtId to marshall: %d", elInfo
->stmtId
);
176 *(int *) ptr
= (int) elInfo
->type
;
177 printDebug(DM_SqlLog
, "commit: ExType to marshall: %d", elInfo
->type
);
178 //printf("PRABA::type is %d\n" , *(int *) ptr);
180 if (elInfo
->type
== SETPARAM
) {
181 *(int *) ptr
= elInfo
->pos
;
182 printDebug(DM_SqlLog
, "commit: PrmPos to marshall: %d", elInfo
->pos
);
184 *(int *) ptr
= (int) elInfo
->dataType
;
185 printDebug(DM_SqlLog
, "commit: DtType to marshall: %d", elInfo
->dataType
);
187 *(int *) ptr
= elInfo
->len
;
188 printDebug(DM_SqlLog
, "commit: length to marshall: %d", elInfo
->len
);
190 memcpy(ptr
, &elInfo
->value
, elInfo
->len
);
194 commitLogs(len
, data
);
195 ListIterator it
= execLogStore
.getIterator();
196 while(it
.hasElement()) ::free ((ExecLogInfo
*) it
.nextElement());
197 execLogStore
.reset();
200 //if (innerConn) rv = innerConn->commit();
203 DbRetVal
SqlLogConnection::rollback()
206 //printf("LOG: rollback \n");
207 if (innerConn
) rv
= innerConn
->rollback();
208 if (rv
!= OK
) return rv
;
209 if (( !Conf::config
.useCache() && Conf::config
.getCacheMode() == SYNC_MODE
)
210 && !Conf::config
.useDurability()) return OK
;
212 ListIterator logStoreIter
= execLogStore
.getIterator();
213 ExecLogInfo
*elInfo
= NULL
;
214 while (logStoreIter
.hasElement())
216 elInfo
= (ExecLogInfo
*)logStoreIter
.nextElement();
219 execLogStore
.reset();
224 DbRetVal
SqlLogConnection::populateCachedTableList()
227 fp
= fopen(Conf::config
.getTableConfigFile(),"r");
229 printError(ErrSysInit
, "cache.table file does not exist");
232 char tablename
[IDENTIFIER_LENGTH
];
233 char fieldname
[IDENTIFIER_LENGTH
];
234 char condition
[IDENTIFIER_LENGTH
];
235 char field
[IDENTIFIER_LENGTH
];
236 char dsnName
[IDENTIFIER_LENGTH
];
243 CachedTable
*node
=NULL
;
246 fscanf(fp
, "%d %s %s %s %s %s\n", &cmode
, tablename
,fieldname
,condition
,field
,dsnName
);
247 node
= new CachedTable();
248 strcpy(node
->tableName
, tablename
);
249 cacheList
.append(node
);
255 bool SqlLogConnection::isTableCached(char *tblName
)
259 printError(ErrBadArg
, "tblName passed is NULL\n");
262 ListIterator iter
= cacheList
.getIterator();
264 while (iter
.hasElement()) {
265 node
= (CachedTable
*)iter
.nextElement();
266 if (strcmp(node
->tableName
, tblName
) == 0)
274 DbRetVal
MsgQueueSend::prepare(int txnId
, int stmtId
, int len
, char *stmt
,
277 //strlen is not included string is the last element in the following
279 int datalen
= 3 * sizeof(int) + IDENTIFIER_LENGTH
+ os::align(len
); // for len + txnId + stmtId + tblName + string
281 int buffer
= sizeof(Message
) - 1 + datalen
;
282 Message
*msg
= (Message
*) malloc(buffer
);
284 *(int *)&msg
->data
= datalen
;
285 char *ptr
= (char *) &msg
->data
+ sizeof(int);
288 *(int *)ptr
= stmtId
;
290 strncpy(ptr
, tblName
, IDENTIFIER_LENGTH
);
291 ptr
[IDENTIFIER_LENGTH
-1] ='\0';
292 ptr
+=IDENTIFIER_LENGTH
;
293 strncpy(ptr
, stmt
, len
);
294 printDebug(DM_SqlLog
, "stmtstr = | %s |\n", ptr
);
295 printDebug(DM_SqlLog
, "length of msg sent = %d\n", datalen
);
296 int ret
= os::msgsnd(msgQId
, msg
, datalen
, 0666);
298 printError(ErrSysInternal
, "message send failed\n");
300 return ErrSysInternal
;
306 DbRetVal
MsgQueueSend::commit(int len
, void *data
)
308 Message
*msg
= (Message
*) ((char *)data
- sizeof (long));
310 int ret
= os::msgsnd(msgQId
, msg
, len
, 0666);
312 printError(ErrSysInternal
, "message send failed\n");
313 return ErrSysInternal
;
318 DbRetVal
MsgQueueSend::free(int txnId
, int stmtId
)
320 // data to be sent is len + txn id + stmtId
321 int dataLen
= 3 * sizeof(int);
322 int bufferSize
= sizeof(Message
) - 1 + dataLen
;
323 Message
*msg
= (Message
*) malloc(bufferSize
);
325 *(int *) &msg
->data
= dataLen
;
326 char *ptr
= (char *) &msg
->data
;
328 *(int *) ptr
= txnId
;
330 *(int *) ptr
= stmtId
;
331 printDebug(DM_SqlLog
, "stmtID sent = %d\n", *(int *) ptr
);
332 int ret
= os::msgsnd(msgQId
, msg
, dataLen
, 0666);
334 printError(ErrSysInternal
, "message send failed\n");
336 return ErrSysInternal
;