reverting back to version 1.12
[csql.git] / src / sqllog / SqlLogConnection.cxx
blob892f326c056c5612231d25f41040919126a4f14b
1 /***************************************************************************
2 * Copyright (C) 2007 by Prabakaran Thirumalai *
3 * praba_tuty@yahoo.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
20 #include <SqlLogConnection.h>
21 #include <CSql.h>
22 #include <Network.h>
// Static member: commit() calls txnUID.getID() to obtain the id it stamps on
// each commit packet.
24 UniqueID SqlLogConnection::txnUID;
// Static member: list of CachedTable entries. It is filled by
// populateCachedTableList() and searched by isTableCached().
25 List SqlLogConnection::cacheList;
27 DbRetVal SqlLogConnection::addPacket(BasePacket* pkt)
29 logStore.append(pkt);
30 return OK;
32 DbRetVal SqlLogConnection::addPreparePacket(PacketPrepare* pkt)
34 curPrepareStore.append(pkt);
35 return OK;
38 DbRetVal SqlLogConnection::removePreparePacket(int stmtid)
40 ListIterator iter = prepareStore.getIterator();
41 PacketPrepare *pkt = NULL, *dpkt=NULL;
42 while (iter.hasElement())
44 pkt = (PacketPrepare*)iter.nextElement();
45 if (pkt->stmtID == stmtid) dpkt = pkt;
47 if (dpkt == NULL) return OK;
48 //TEMP:mask below error for now
49 if (dpkt == NULL)
51 printError(ErrNotFound, "Prepare packet not found in list for %d\n", stmtid);
52 return ErrNotFound;
54 delete dpkt;
55 prepareStore.remove(dpkt);
56 return OK;
59 DbRetVal SqlLogConnection::connect (char *user, char *pass)
61 DbRetVal rv = OK;
62 //printf("LOG: connect\n");
63 if (innerConn) rv = innerConn->connect(user,pass);
64 if (rv != OK) return rv;
65 if (!Conf::config.useReplication() && !Conf::config.useCache()) return OK;
66 if (rv !=OK) { innerConn->disconnect(); return rv; }
68 //populate cacheList if not populated by another thread in same process
69 //TODO::cacheList requires mutex guard
70 if (0 == cacheList.size()) rv = populateCachedTableList();
71 return rv;
74 DbRetVal SqlLogConnection::disconnect()
76 DbRetVal rv = OK;
77 //printf("LOG: disconnect\n");
78 if (innerConn) rv =innerConn->disconnect();
79 if (rv != OK) return rv;
80 if (!Conf::config.useReplication() && !Conf::config.useCache()) return OK;
81 return rv;
83 DbRetVal SqlLogConnection::beginTrans(IsolationLevel isoLevel, TransSyncMode mode)
85 DbRetVal rv = OK;
86 if (innerConn) rv = innerConn->beginTrans(isoLevel);
87 if (rv != OK) return rv;
89 syncMode = mode;
90 return OK;
92 DbRetVal SqlLogConnection::commit()
94 DbRetVal rv = OK;
95 //printf("LOG: commit %d\n", syncMode);
96 //if (innerConn) rv = innerConn->commit();
97 if (syncMode == OSYNC) {
98 if (innerConn) rv = innerConn->commit();
99 return rv;
101 if (logStore.size() == 0)
103 //This means no execution for any non select statements in
104 //this transaction
105 //rollback so that subsequent beginTrans will not report that
106 //transaction is already started
107 if (innerConn) {
108 rv = innerConn->rollback();
109 //if (rv != OK) return rv;
110 //rv = innerConn->beginTrans(READ_COMMITTED, syncMode);
112 return rv;
114 if (syncMode == ASYNC) {
115 //TODO::put the packet in global log store
117 PacketCommit *pkt = new PacketCommit();
118 int tid = txnUID.getID();
119 pkt->setExecPackets(tid, logStore);
120 pkt->marshall();
121 int *p = (int*) pkt->getMarshalledBuffer();
122 NetworkClient *nwClient= nwTable.getNetworkClient();
123 if (syncMode == ASYNC) {
124 rv = nwClient->send(NW_PKT_COMMIT, pkt->getMarshalledBuffer(),
125 pkt->getBufferSize());
126 if (rv !=OK)
128 printError(ErrOS, "Unable to send SQL Logs to peer site\n");
129 return ErrOS;
131 rv = nwClient->receive();
132 if (rv !=OK)
134 printError(ErrOS, "Could not get acknowledgement from peer site\n");
135 return ErrPeerExecFailed;
137 //TODO::remove all sql logs nodes and the list which contains ptr to it
141 ListIterator logStoreIter = logStore.getIterator();
142 PacketExecute *execPkt = NULL;
143 while (logStoreIter.hasElement())
145 execPkt = (PacketExecute*)logStoreIter.nextElement();
146 delete execPkt;
148 logStore.reset();
149 if (innerConn) rv = innerConn->commit();
150 return rv;
152 DbRetVal SqlLogConnection::rollback()
154 DbRetVal rv = OK;
155 //printf("LOG: rollback \n");
156 if (innerConn) rv = innerConn->rollback();
157 if (rv != OK) return rv;
158 ListIterator logStoreIter = logStore.getIterator();
159 PacketExecute *execPkt = NULL;
160 while (logStoreIter.hasElement())
162 execPkt = (PacketExecute*)logStoreIter.nextElement();
163 delete execPkt;
165 logStore.reset();
166 return rv;
168 DbRetVal SqlLogConnection::populateCachedTableList()
170 FILE *fp = NULL;
171 fp = fopen(Conf::config.getTableConfigFile(),"r");
172 if( fp == NULL ) {
173 printError(ErrSysInit, "cache.table file does not exist");
174 return ErrSysInit;
176 char tablename[IDENTIFIER_LENGTH];
177 char fieldname[IDENTIFIER_LENGTH];
178 char condition[IDENTIFIER_LENGTH];
179 char field[IDENTIFIER_LENGTH];
180 int cmode;
181 CachedTable *node;
182 while(!feof(fp))
184 fscanf(fp, "%d:%s %s %s %s\n", &cmode, tablename,fieldname,condition,field);
185 node = new CachedTable();
186 strcpy(node->tableName, tablename);
187 cacheList.append(node);
189 fclose(fp);
190 return OK;
192 bool SqlLogConnection::isTableCached(char *tblName)
194 if (NULL == tblName)
196 printError(ErrBadArg, "tblName passed is NULL\n");
197 return ErrBadArg;
199 ListIterator iter = cacheList.getIterator();
200 CachedTable *node;
201 while (iter.hasElement()) {
202 node = (CachedTable*)iter.nextElement();
203 if (strcmp(node->tableName, tblName) == 0)
205 return true;
208 return false;
212 DbRetVal SqlLogConnection::sendAndReceive(NetworkPacketType type, char *packet, int length)
214 return OK;
215 NetworkClient* nwClient = nwTable.getNetworkClient();
216 DbRetVal rv = OK, retRV=OK;
217 printf("isCacheClient %d\n", nwClient->isCacheClient());
218 printf("isConnected %d\n", nwClient->isConnected());
220 if (!nwClient->isConnected()) {
221 if (nwClient->isCacheClient()) return ErrOS;
222 //TODO::put this packet in send buffer.
223 return OK;
226 rv = nwClient->send(type, packet, length);
227 if (rv != OK)
229 printf("Unable to send pkt to peer with nwid %d\n", nwClient->getNetworkID());
230 //TODO:: put this packet in resend buffer, so that it will sent later by another thread for repl mode
231 nwClient->setConnectFlag(false);
232 if (nwClient->isCacheClient()) return ErrOS; else return OK;
234 rv = nwClient->receive();
235 if (rv != OK)
237 printf("Unable to receive ack pkt from peer with nwid %d\n", nwClient->getNetworkID());
238 nwClient->setConnectFlag(false);
239 if (nwClient->isCacheClient()) return ErrOS;
240 //TODO:: put this packet to resend buffer so that it can be sent later
241 //and call continue;
243 return OK;