Enterprise to opensource. 40 files including Makefile.am and .in from storage, sqllog...
[csql.git] / src / sqllog / SqlLogConnection.cxx
blobff7d27b0660c0c7fd40811efd014edbda9765240
1 /***************************************************************************
2 * Copyright (C) 2007 by Prabakaran Thirumalai *
3 * praba_tuty@yahoo.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
20 #include <os.h>
21 #include <SqlLogConnection.h>
22 #include <SqlLogStatement.h>
23 #include <CSql.h>
24 #include <Network.h>
// Definitions of SqlLogConnection's static data members.
// txnUID: opened in connect() and queried for ids in beginTrans()/commit().
26 GlobalUniqueID SqlLogConnection::txnUID;
// cacheList: filled with CachedTable nodes by populateCachedTableList() and
// searched by isTableCached(). The TODO in connect() notes that access to it
// is not yet guarded by a mutex.
27 List SqlLogConnection::cacheList;
29 DbRetVal SqlLogConnection::addPacket(BasePacket* pkt)
31 logStore.append(pkt);
32 return OK;
34 DbRetVal SqlLogConnection::addPreparePacket(PacketPrepare* pkt)
36 curPrepareStore.append(pkt);
37 return OK;
40 DbRetVal SqlLogConnection::removePreparePacket(int stmtid)
42 ListIterator iter = prepareStore.getIterator();
43 PacketPrepare *pkt = NULL, *dpkt=NULL;
44 while (iter.hasElement())
46 pkt = (PacketPrepare*)iter.nextElement();
47 if (pkt->stmtID == stmtid) dpkt = pkt;
49 if (dpkt == NULL) return OK;
50 //TEMP:mask below error for now
51 if (dpkt == NULL)
53 printError(ErrNotFound, "Prepare packet not found in list for %d\n", stmtid);
54 return ErrNotFound;
56 delete dpkt;
57 prepareStore.remove(dpkt);
58 return OK;
61 DbRetVal SqlLogConnection::connect (char *user, char *pass)
63 DbRetVal rv = OK;
64 //printf("LOG: connect\n");
65 if (innerConn) rv = innerConn->connect(user,pass);
66 if (rv != OK) return rv;
67 if (!Conf::config.useDurability()) return OK;
69 //populate cacheList if not populated by another thread in same process
70 //TODO::cacheList requires mutex guard
71 if (0 == cacheList.size()) rv = populateCachedTableList();
72 rv = SqlLogStatement::stmtUID.open();
73 rv = SqlLogConnection::txnUID.open();
74 return rv;
77 DbRetVal SqlLogConnection::disconnect()
79 DbRetVal rv = OK;
80 //printf("LOG: disconnect\n");
81 if (innerConn) rv =innerConn->disconnect();
82 if (rv != OK) return rv;
83 if (!Conf::config.useDurability()) return OK;
84 SqlLogStatement::stmtUID.close();
85 return rv;
87 DbRetVal SqlLogConnection::beginTrans(IsolationLevel isoLevel, TransSyncMode mode)
89 DbRetVal rv = OK;
90 if (innerConn) rv = innerConn->beginTrans(isoLevel);
91 if (rv != OK) return rv;
92 syncMode = mode;
93 txnID = SqlLogConnection::txnUID.getID(TXN_ID);
94 return OK;
// Commits the wrapped connection and, when durability is enabled, marshals
// the entries of execLogStore into one buffer that is passed to commitLogs().
// NOTE(review): blank and brace-only lines are absent from this listing (the
// embedded line numbers skip them), so block nesting is not visible here.
96 DbRetVal SqlLogConnection::commit()
98 DbRetVal rv = OK;
99 //printf("LOG: commit %d\n", syncMode);
100 //if (innerConn) rv = innerConn->commit();
101 if (innerConn) rv = innerConn->commit();
// NOTE(review): with durability off this returns OK even when the inner
// commit above reported an error in rv.
102 if (!Conf::config.useDurability()) return OK;
104 if (execLogStore.size() == 0) {
105 //This means no execution for any non select statements in
106 //this transaction
107 //rollback so that subsequent beginTrans will not report that
108 //transaction is already started
109 if (innerConn) {
110 rv = innerConn->rollback();
111 //if (rv != OK) return rv;
112 //rv = innerConn->beginTrans(READ_COMMITTED, syncMode);
114 return rv;
117 //TODO::put the packet in global log store
// NOTE(review): the embedded numbering skips 118 and 140 around the section
// below. From this listing it cannot be told whether the section is live code
// ending in a closing brace or is enclosed in a block comment - confirm
// against the real file. `p` is not used again in the visible code, and
// txnUID.getID() is called without the TXN_ID argument that beginTrans() passes.
119 PacketCommit *pkt = new PacketCommit();
120 int tid = txnUID.getID();
121 pkt->setExecPackets(tid, logStore);
122 pkt->marshall();
123 int *p = (int*) pkt->getMarshalledBuffer();
124 NetworkClient *nwClient= nwTable.getNetworkClient();
125 if (syncMode == ASYNC) {
126 rv = nwClient->send(NW_PKT_COMMIT, pkt->getMarshalledBuffer(),
127 pkt->getBufferSize());
128 if (rv !=OK)
130 printError(ErrOS, "Unable to send SQL Logs to peer site\n");
131 return ErrOS;
133 rv = nwClient->receive();
134 if (rv !=OK)
136 printError(ErrOS, "Could not get acknowledgement from peer site\n");
137 return ErrPeerExecFailed;
139 //TODO::remove all sql logs nodes and the list which contains ptr to it
// Buffer layout built below:
//   [long: slot for msgtype][int len][int txnId][one record per exec log entry]
// where len counts the two leading ints plus the aligned exec log store size.
141 int txnId = getTxnID();
142 // len to be sent should also contain txnId
143 int len = 2 * sizeof(int) + os::align(getExecLogStoreSize());
144 printDebug(DM_SqlLog, "commit: size of logstore: %d", len);
145 int bufferSize = sizeof(long) + len;
146 printDebug(DM_SqlLog, "commit: size of buffer: %d", bufferSize);
// NOTE(review): the malloc result is not checked, and no free() of buffer is
// visible in this function - confirm whether commitLogs() takes ownership.
147 void *buffer = malloc(bufferSize);
// NOTE(review): %x is used with pointer arguments in the debug calls here.
148 printDebug(DM_SqlLog, "commit: buffer address: %x", buffer);
149 char *ptr = (char *)buffer + sizeof(long); // long type is for msgtype
// data keeps the start of the payload (just past the long slot); ptr advances
// as fields are written.
150 char *data = ptr;
151 printDebug(DM_SqlLog, "commit: data address: %x", data);
152 *(int *) ptr = len;
153 ptr += sizeof(int);
154 *(int *) ptr = txnId;
155 ptr += sizeof(int);
// Each entry contributes stmtId and type. For SETPARAM entries pos, dataType,
// len and the value bytes follow (presumably the memcpy is inside the
// SETPARAM block - its closing brace is not visible in this listing).
156 ListIterator logStoreIter = execLogStore.getIterator();
157 ExecLogInfo *elInfo = NULL;
158 while (logStoreIter.hasElement()) {
159 elInfo = (ExecLogInfo *)logStoreIter.nextElement();
160 printDebug(DM_SqlLog, "commit: elem from logstore:: %x", elInfo);
161 *(int *) ptr = elInfo->stmtId;
162 printDebug(DM_SqlLog, "commit: stmtId to marshall: %d", elInfo->stmtId);
163 ptr += sizeof(int);
164 *(int *) ptr = (int) elInfo->type;
165 printDebug(DM_SqlLog, "commit: ExType to marshall: %d", elInfo->type);
166 //printf("PRABA::type is %d\n" , *(int *) ptr);
167 ptr += sizeof(int);
168 if (elInfo->type == SETPARAM) {
169 *(int *) ptr = elInfo->pos;
170 printDebug(DM_SqlLog, "commit: PrmPos to marshall: %d", elInfo->pos);
171 ptr += sizeof(int);
172 *(int *) ptr = (int) elInfo->dataType;
173 printDebug(DM_SqlLog, "commit: DtType to marshall: %d", elInfo->dataType);
174 ptr += sizeof(int);
175 *(int *) ptr = elInfo->len;
176 printDebug(DM_SqlLog, "commit: length to marshall: %d", elInfo->len);
177 ptr += sizeof(int);
// Copies elInfo->len bytes starting at the address of the value member.
// NOTE(review): confirm value is inline storage rather than a pointer.
178 memcpy(ptr, &elInfo->value, elInfo->len);
179 ptr += elInfo->len;
// Hand the payload (without the leading long slot) to commitLogs(), then
// clear the per-transaction exec log state.
182 commitLogs(len, data);
183 execLogStore.reset();
184 execLogStoreSize =0;
185 //if (innerConn) rv = innerConn->commit();
186 return rv;
188 DbRetVal SqlLogConnection::rollback()
190 DbRetVal rv = OK;
191 //printf("LOG: rollback \n");
192 if (innerConn) rv = innerConn->rollback();
193 if (rv != OK) return rv;
194 ListIterator logStoreIter = logStore.getIterator();
195 PacketExecute *execPkt = NULL;
196 while (logStoreIter.hasElement())
198 execPkt = (PacketExecute*)logStoreIter.nextElement();
199 delete execPkt;
201 logStore.reset();
203 execLogStore.reset();
204 execLogStoreSize =0;
205 return rv;
208 DbRetVal SqlLogConnection::populateCachedTableList()
210 FILE *fp = NULL;
211 fp = fopen(Conf::config.getTableConfigFile(),"r");
212 if( fp == NULL ) {
213 printError(ErrSysInit, "cache.table file does not exist");
214 return ErrSysInit;
216 char tablename[IDENTIFIER_LENGTH];
217 char fieldname[IDENTIFIER_LENGTH];
218 char condition[IDENTIFIER_LENGTH];
219 char field[IDENTIFIER_LENGTH];
220 int cmode;
221 CachedTable *node;
222 while(!feof(fp))
224 fscanf(fp, "%d:%s %s %s %s\n", &cmode, tablename,fieldname,condition,field);
225 node = new CachedTable();
226 strcpy(node->tableName, tablename);
227 cacheList.append(node);
229 fclose(fp);
230 return OK;
233 bool SqlLogConnection::isTableCached(char *tblName)
235 if (NULL == tblName)
237 printError(ErrBadArg, "tblName passed is NULL\n");
238 return ErrBadArg;
240 ListIterator iter = cacheList.getIterator();
241 CachedTable *node;
242 while (iter.hasElement()) {
243 node = (CachedTable*)iter.nextElement();
244 if (strcmp(node->tableName, tblName) == 0)
246 return true;
249 return false;