1501526 Composite primary keys
[csql.git] / src / sqllog / SqlLogConnection.cxx
blobaef7f613bfeb11a6cad28b49c54941e6e5425726
1 /***************************************************************************
2  *   Copyright (C) 2007 by Prabakaran Thirumalai   *
3  *   praba_tuty@yahoo.com   *
4  *                                                                         *
5  *   This program is free software; you can redistribute it and/or modify  *
6  *   it under the terms of the GNU General Public License as published by  *
7  *   the Free Software Foundation; either version 2 of the License, or     *
8  *   (at your option) any later version.                                   *
9  *                                                                         *
10  *   This program is distributed in the hope that it will be useful,       *
11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
13  *   GNU General Public License for more details.                          *
14  *                                                                         *
15  *   You should have received a copy of the GNU General Public License     *
16  *   along with this program; if not, write to the                         *
17  *   Free Software Foundation, Inc.,                                       *
18  *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             *
19  ***************************************************************************/
20 #include <SqlLogConnection.h>
21 #include <CSql.h>
22 #include <Network.h>
24 UniqueID SqlLogConnection::txnUID;
25 List SqlLogConnection::cacheList;
27 DbRetVal SqlLogConnection::addPacket(BasePacket* pkt)
29     logStore.append(pkt);
30     return OK;
32 DbRetVal SqlLogConnection::addPreparePacket(PacketPrepare* pkt)
34     curPrepareStore.append(pkt);
35     return OK;
38 DbRetVal SqlLogConnection::removePreparePacket(int stmtid)
40     ListIterator iter = prepareStore.getIterator();
41     PacketPrepare *pkt = NULL, *dpkt=NULL;
42     while (iter.hasElement())
43     {
44         pkt = (PacketPrepare*)iter.nextElement();
45         if (pkt->stmtID == stmtid) dpkt = pkt;
46     }
47     if (dpkt == NULL) return OK;
48     //TEMP:mask below error for now
49     if (dpkt == NULL)
50     {
51         printError(ErrNotFound, "Prepare packet not found in list for %d\n", stmtid);
52         return ErrNotFound;
53     }
54     delete dpkt;
55     prepareStore.remove(dpkt);
56     return OK;
59 DbRetVal SqlLogConnection::connect (char *user, char *pass)
61     DbRetVal rv = OK;
62     //printf("LOG: connect\n");
63     if (innerConn) rv = innerConn->connect(user,pass);
64     if (rv != OK) return rv;
65     if (!Conf::config.useReplication() && !Conf::config.useCache()) return OK;
66     if (rv !=OK) { innerConn->disconnect(); return rv; }
68     //populate cacheList if not populated by another thread in same process
69     //TODO::cacheList requires mutex guard
70     if (0 == cacheList.size()) rv = populateCachedTableList(); 
71     return rv;
72     
74 DbRetVal SqlLogConnection::disconnect()
76     DbRetVal rv = OK;
77     //printf("LOG: disconnect\n");
78     if (innerConn) rv =innerConn->disconnect();
79     if (rv != OK) return rv;
80     if (!Conf::config.useReplication() && !Conf::config.useCache()) return OK;
81     return rv;
83 DbRetVal SqlLogConnection::beginTrans(IsolationLevel isoLevel, TransSyncMode mode)
85     DbRetVal rv = OK;
86     if (innerConn) rv =  innerConn->beginTrans(isoLevel);
87     if (rv != OK) return rv;
89     syncMode = mode;
90     return OK;
92 DbRetVal SqlLogConnection::commit()
94     DbRetVal rv = OK;
95     //printf("LOG: commit %d\n", syncMode);
96     //if (innerConn) rv =  innerConn->commit();
97     if (syncMode == OSYNC) {
98         if (innerConn) rv = innerConn->commit();
99         return rv;
100     }
101     if (logStore.size() == 0) 
102     { 
103         //This means no execution for any non select statements in 
104         //this transaction
105         //rollback so that subsequent beginTrans will not report that
106         //transaction is already started
107         if (innerConn) {
108             rv =  innerConn->rollback(); 
109             //if (rv != OK) return rv;
110             //rv = innerConn->beginTrans(READ_COMMITTED, syncMode);
111         }
112         return rv; 
113     }
114     if (syncMode == ASYNC) {
115     //TODO::put the packet in global log store
116     /*
117     PacketCommit *pkt = new PacketCommit();
118     int tid = txnUID.getID();
119     pkt->setExecPackets(tid, logStore);
120     pkt->marshall();
121     int *p = (int*) pkt->getMarshalledBuffer();
122     NetworkClient *nwClient= nwTable.getNetworkClient();
123     if (syncMode == ASYNC) {
124         rv = nwClient->send(NW_PKT_COMMIT, pkt->getMarshalledBuffer(), 
125                                           pkt->getBufferSize());    
126         if (rv !=OK) 
127         {
128             printError(ErrOS, "Unable to send SQL Logs to peer site\n");
129             return ErrOS;
130         }
131         rv = nwClient->receive();    
132         if (rv !=OK) 
133         {
134           printError(ErrOS, "Could not get acknowledgement from peer site\n");
135           return ErrPeerExecFailed;
136         }
137         //TODO::remove all sql logs nodes and the list which contains ptr to it
138         */
139     }
140     
141     ListIterator logStoreIter = logStore.getIterator();
142     PacketExecute *execPkt = NULL;
143     while (logStoreIter.hasElement())
144     {
145         execPkt = (PacketExecute*)logStoreIter.nextElement();
146         delete execPkt;
147     }
148     logStore.reset();
149     if (innerConn) rv = innerConn->commit();
150     return rv;
152 DbRetVal SqlLogConnection::rollback()
154     DbRetVal rv = OK;
155     //printf("LOG: rollback \n");
156     if (innerConn) rv =  innerConn->rollback();
157     if (rv != OK) return rv;
158     ListIterator logStoreIter = logStore.getIterator();
159     PacketExecute *execPkt = NULL;
160     while (logStoreIter.hasElement())
161     {
162         execPkt = (PacketExecute*)logStoreIter.nextElement();
163         delete execPkt;
164     }
165     logStore.reset();
166     return rv;
168 DbRetVal SqlLogConnection::populateCachedTableList()
170     FILE *fp = NULL;
171     fp = fopen(Conf::config.getTableConfigFile(),"r");
172     if( fp == NULL ) {
173         printError(ErrSysInit, "cache.table file does not exist");
174         return ErrSysInit;
175     }
176     char tablename[IDENTIFIER_LENGTH];
177     int cmode;
178     CachedTable *node;
179     while(!feof(fp))
180     {
181         fscanf(fp, "%d:%s\n", &cmode, tablename);
182         node = new CachedTable();
183         strcpy(node->tableName, tablename);
184         cacheList.append(node);
185     }
186     fclose(fp);
187     return OK;
189 bool SqlLogConnection::isTableCached(char *tblName)
191     if (NULL == tblName)
192     {
193         printError(ErrBadArg, "tblName passed is NULL\n");
194         return ErrBadArg;
195     }
196     ListIterator iter = cacheList.getIterator();
197     CachedTable *node;
198     while (iter.hasElement()) {
199         node = (CachedTable*)iter.nextElement();
200         if (strcmp(node->tableName, tblName) == 0)
201         {
202            return true;
203         }
204     }
205     return false;
209 DbRetVal SqlLogConnection::sendAndReceive(NetworkPacketType type, char *packet, int length)
211     return OK;
212     NetworkClient* nwClient = nwTable.getNetworkClient();
213     DbRetVal rv = OK, retRV=OK;
214     printf("isCacheClient %d\n", nwClient->isCacheClient());
215     printf("isConnected %d\n", nwClient->isConnected());
217     if (!nwClient->isConnected()) {
218         if (nwClient->isCacheClient()) return ErrOS; 
219         //TODO::put this packet in send buffer.
220         return OK;
221     }
223     rv = nwClient->send(type, packet, length);
224     if (rv != OK) 
225     {
226        printf("Unable to send pkt to peer with nwid %d\n", nwClient->getNetworkID());
227        //TODO:: put this packet in resend buffer, so that it will sent later by another thread for repl mode
228         nwClient->setConnectFlag(false);
229         if (nwClient->isCacheClient()) return ErrOS; else return OK;
230     }
231     rv = nwClient->receive();
232     if (rv != OK)
233     {
234         printf("Unable to receive ack pkt from peer with nwid %d\n", nwClient->getNetworkID());
235         nwClient->setConnectFlag(false);
236         if (nwClient->isCacheClient()) return ErrOS;
237         //TODO:: put this packet to resend buffer so that it can be sent later
238         //and call continue;
239     }
240     return OK;