1 /***************************************************************************
2 * Copyright (C) 2007 by Prabakaran Thirumalai *
3 * praba_tuty@yahoo.com *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
20 #include <SqlLogConnection.h>
// Out-of-class definitions of SqlLogConnection's class-level members.
// txnUID supplies the transaction id that commit() stamps on each commit packet.
24 UniqueID SqlLogConnection::txnUID;
// cacheList holds the CachedTable entries appended by populateCachedTableList()
// and scanned by isTableCached().
25 List SqlLogConnection::cacheList;
// Accepts a packet for this connection and reports a DbRetVal status.
// NOTE(review): presumably queues pkt on logStore (the list commit() hands to
// the commit packet) - confirm against the function body.
27 DbRetVal SqlLogConnection::addPacket(BasePacket* pkt)
// Records a prepare packet by appending it to curPrepareStore.
// NOTE(review): removePreparePacket() searches prepareStore, not
// curPrepareStore - confirm where entries move between the two lists.
32 DbRetVal SqlLogConnection::addPreparePacket(PacketPrepare* pkt)
34 curPrepareStore.append(pkt);
// Removes from prepareStore the prepare packet whose stmtID equals stmtid.
// Returns OK when no such packet is found (the not-found error is currently masked).
38 DbRetVal SqlLogConnection::removePreparePacket(int stmtid)
40 ListIterator iter = prepareStore.getIterator();
// pkt is the scan cursor; dpkt remembers the packet selected for removal.
41 PacketPrepare *pkt = NULL, *dpkt=NULL;
42 while (iter.hasElement())
44 pkt = (PacketPrepare*)iter.nextElement();
// NOTE(review): no break is visible after a match, so the scan appears to
// continue and keep the last matching packet - confirm.
45 if (pkt->stmtID == stmtid) dpkt = pkt;
// Early exit: a missing packet is treated as success for now.
47 if (dpkt == NULL) return OK;
48 //TEMP:mask below error for now
// NOTE(review): with the early return above, this error report looks
// unreachable when dpkt is NULL - confirm the surrounding condition.
51 printError(ErrNotFound, "Prepare packet not found in list for %d\n", stmtid);
// Unlink the selected packet from the list.
55 prepareStore.remove(dpkt);
// Connects the wrapped connection (innerConn, if set) with the given
// credentials. When replication or caching is enabled, it also makes sure the
// shared cacheList has been populated.
// Returns the inner connection's error on failure, OK when neither replication
// nor caching is configured.
59 DbRetVal SqlLogConnection::connect (char *user, char *pass)
62 //printf("LOG: connect\n");
63 if (innerConn) rv = innerConn->connect(user,pass);
64 if (rv != OK) return rv;
// Nothing more to set up unless replication or caching is configured.
65 if (!Conf::config.useReplication() && !Conf::config.useCache()) return OK;
// NOTE(review): rv is not reassigned after the check two lines above, so this
// branch can never be taken as written - likely left over from removed code.
66 if (rv !=OK) { innerConn->disconnect(); return rv; }
68 //populate cacheList if not populated by another thread in same process
69 //TODO::cacheList requires mutex guard
// An empty list is taken to mean "not loaded yet".
70 if (0 == cacheList.size()) rv = populateCachedTableList();
// Disconnects the wrapped connection (innerConn, if set).
// Returns the inner connection's error on failure, and OK immediately when
// neither replication nor caching is configured.
74 DbRetVal SqlLogConnection::disconnect()
77 //printf("LOG: disconnect\n");
78 if (innerConn) rv =innerConn->disconnect();
79 if (rv != OK) return rv;
// No replication/cache state to tear down in this configuration.
80 if (!Conf::config.useReplication() && !Conf::config.useCache()) return OK;
// Starts a transaction on the wrapped connection (innerConn, if set) at the
// requested isolation level; returns its error code on failure.
// Only isoLevel is forwarded on this call; mode is not passed to innerConn here.
// NOTE(review): presumably mode is stored as syncMode for commit() - confirm.
83 DbRetVal SqlLogConnection::beginTrans(IsolationLevel isoLevel, TransSyncMode mode)
86 if (innerConn) rv = innerConn->beginTrans(isoLevel);
87 if (rv != OK) return rv;
// Commits the current transaction. The visible steps are: commit the wrapped
// connection in OSYNC mode, handle the empty-logStore case via rollback, build
// a PacketCommit from logStore, send it to the peer and wait for its reply in
// ASYNC mode, walk logStore, then commit the wrapped connection.
// Returns ErrPeerExecFailed when the peer's acknowledgement cannot be obtained.
92 DbRetVal SqlLogConnection::commit()
95 //printf("LOG: commit %d\n", syncMode);
96 //if (innerConn) rv = innerConn->commit();
// OSYNC: commit locally up front.
97 if (syncMode == OSYNC) {
98 if (innerConn) rv = innerConn->commit();
// Empty log store: no packets were queued in this transaction.
101 if (logStore.size() == 0)
103 //This means no execution for any non select statements in
105 //rollback so that subsequent beginTrans will not report that
106 //transaction is already started
// NOTE(review): unlike the other calls, no innerConn null check is visible on
// this line - confirm it is guarded.
108 rv = innerConn->rollback();
109 //if (rv != OK) return rv;
110 //rv = innerConn->beginTrans(READ_COMMITTED, syncMode);
114 if (syncMode == ASYNC) {
115 //TODO::put the packet in global log store
// Build the commit packet: a fresh transaction id plus the queued logStore.
// NOTE(review): no matching delete for pkt is visible in this block - check
// for a leak.
117 PacketCommit *pkt = new PacketCommit();
118 int tid = txnUID.getID();
119 pkt->setExecPackets(tid, logStore);
// NOTE(review): no use of p is visible in this block - verify it is needed.
121 int *p = (int*) pkt->getMarshalledBuffer();
122 NetworkClient *nwClient= nwTable.getNetworkClient();
// ASYNC: ship the marshalled commit packet to the peer.
123 if (syncMode == ASYNC) {
124 rv = nwClient->send(NW_PKT_COMMIT, pkt->getMarshalledBuffer(),
125 pkt->getBufferSize());
128 printError(ErrOS, "Unable to send SQL Logs to peer site\n");
// Wait for the peer's reply to the commit packet.
131 rv = nwClient->receive();
134 printError(ErrOS, "Could not get acknowledgement from peer site\n");
135 return ErrPeerExecFailed;
137 //TODO::remove all sql logs nodes and the list which contains ptr to it
// Walk every queued execute packet in logStore.
// NOTE(review): presumably each is released here, per the TODO above - confirm.
141 ListIterator logStoreIter = logStore.getIterator();
142 PacketExecute *execPkt = NULL;
143 while (logStoreIter.hasElement())
145 execPkt = (PacketExecute*)logStoreIter.nextElement();
// Commit on the wrapped connection.
149 if (innerConn) rv = innerConn->commit();
// Rolls back the wrapped connection (innerConn, if set), returning its error
// on failure, and then walks the packets queued in logStore.
152 DbRetVal SqlLogConnection::rollback()
155 //printf("LOG: rollback \n");
156 if (innerConn) rv = innerConn->rollback();
157 if (rv != OK) return rv;
// Visit each queued execute packet.
// NOTE(review): presumably they are discarded here, since the transaction is
// abandoned - confirm.
158 ListIterator logStoreIter = logStore.getIterator();
159 PacketExecute *execPkt = NULL;
160 while (logStoreIter.hasElement())
162 execPkt = (PacketExecute*)logStoreIter.nextElement();
// Loads the cached-table list from the file named by
// Conf::config.getTableConfigFile(). Each record is read as "<int>:<name>" and
// a CachedTable carrying the name is appended to cacheList.
168 DbRetVal SqlLogConnection::populateCachedTableList()
171 fp = fopen(Conf::config.getTableConfigFile(),"r");
173 printError(ErrSysInit, "cache.table file does not exist");
// Scratch buffer for one table name read from the file.
176 char tablename[IDENTIFIER_LENGTH];
// NOTE(review): "%s" has no field width, so a name longer than the tablename
// buffer overflows it; the fscanf return value is also ignored, so a malformed
// record goes unnoticed.
181 fscanf(fp, "%d:%s\n", &cmode, tablename);
182 node = new CachedTable();
// NOTE(review): unbounded copy - assumes node->tableName is at least as large
// as tablename; confirm.
183 strcpy(node->tableName, tablename);
184 cacheList.append(node);
// Looks up tblName in cacheList by exact, case-sensitive name comparison
// (strcmp). A NULL tblName is reported through printError.
189 bool SqlLogConnection::isTableCached(char *tblName)
193 printError(ErrBadArg, "tblName passed is NULL\n");
// Linear scan over the cached-table entries.
196 ListIterator iter = cacheList.getIterator();
198 while (iter.hasElement()) {
199 node = (CachedTable*)iter.nextElement();
200 if (strcmp(node->tableName, tblName) == 0)
// Sends one packet of the given type to the peer through nwTable's network
// client and then waits for the peer's reply.
// For a cache client, a missing connection or a send/receive failure returns
// ErrOS. For a non-cache client a send failure returns OK; the code that would
// queue the packet for resending is still a TODO.
209 DbRetVal SqlLogConnection::sendAndReceive(NetworkPacketType type, char *packet, int length)
212 NetworkClient* nwClient = nwTable.getNetworkClient();
213 DbRetVal rv = OK, retRV=OK;
// NOTE(review): unconditional debug output to stdout on every call; nwClient
// is dereferenced with no visible null check.
214 printf("isCacheClient %d\n", nwClient->isCacheClient());
215 printf("isConnected %d\n", nwClient->isConnected());
// Not connected: cache clients fail immediately.
217 if (!nwClient->isConnected()) {
218 if (nwClient->isCacheClient()) return ErrOS;
219 //TODO::put this packet in send buffer.
223 rv = nwClient->send(type, packet, length);
226 printf("Unable to send pkt to peer with nwid %d\n", nwClient->getNetworkID());
227 //TODO:: put this packet in resend buffer, so that it will sent later by another thread for repl mode
// Mark the client disconnected; only cache clients report the failure.
228 nwClient->setConnectFlag(false);
229 if (nwClient->isCacheClient()) return ErrOS; else return OK;
// Wait for the peer's reply to the packet just sent.
231 rv = nwClient->receive();
234 printf("Unable to receive ack pkt from peer with nwid %d\n", nwClient->getNetworkID());
235 nwClient->setConnectFlag(false);
236 if (nwClient->isCacheClient()) return ErrOS;
237 //TODO:: put this packet to resend buffer so that it can be sent later