1 /***************************************************************************
2 * Copyright (C) 2007 by Prabakaran Thirumalai *
3 * praba_tuty@yahoo.com *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
20 #include <SqlLogConnection.h>
// Out-of-class definitions of SqlLogConnection's class-level data members.
// txnUID is the source of the transaction id requested in commit();
// cacheList is the list filled by populateCachedTableList() and searched by
// isTableCached().
24 UniqueID
SqlLogConnection::txnUID
;
25 List
SqlLogConnection::cacheList
;
// Accepts a packet for this connection and reports a DbRetVal status.
// NOTE(review): the function body is not present in this listing; presumably
// pkt is queued for the current transaction -- confirm against the full source.
27 DbRetVal
SqlLogConnection::addPacket(BasePacket
* pkt
)
// Appends the given prepare packet to curPrepareStore.
// NOTE(review): only the append is visible in this listing; the status that is
// returned afterwards is not shown.
32 DbRetVal
SqlLogConnection::addPreparePacket(PacketPrepare
* pkt
)
34 curPrepareStore
.append(pkt
);
// Finds the prepare packet whose stmtID equals stmtid in prepareStore and
// removes it from that list.
//   stmtid - statement id to look for.
// Returns OK when no packet with that id is present (the not-found error is
// intentionally masked, see the TEMP note below).
38 DbRetVal
SqlLogConnection::removePreparePacket(int stmtid
)
40 ListIterator iter
= prepareStore
.getIterator();
41 PacketPrepare
*pkt
= NULL
, *dpkt
=NULL
;
// Walk the list; dpkt remembers the packet that matched. No early exit from
// the loop is visible in this listing, so a later match would replace an
// earlier one.
42 while (iter
.hasElement())
44 pkt
= (PacketPrepare
*)iter
.nextElement();
45 if (pkt
->stmtID
== stmtid
) dpkt
= pkt
;
// Nothing matched: treated as success rather than an error.
47 if (dpkt
== NULL
) return OK
;
48 //TEMP:mask below error for now
// NOTE(review): the condition guarding this report is not visible in this
// listing. Because of the early return above, dpkt is never NULL here, so the
// report is presumably inside disabled code -- confirm against the full source.
51 printError(ErrNotFound
, "Prepare packet not found in list for %d\n", stmtid
);
// Detach the matching packet from the list.
55 prepareStore
.remove(dpkt
);
// Connects the wrapped inner connection and, when replication or caching is
// configured, makes sure the cached-table list has been loaded.
//   user, pass - credentials forwarded unchanged to innerConn->connect().
// Returns the inner connection's error code if its connect fails, and OK
// straight away when neither replication nor caching is enabled.
// NOTE(review): the declaration of rv and the final return are not visible in
// this listing.
59 DbRetVal
SqlLogConnection::connect (char *user
, char *pass
)
62 //printf("LOG: connect\n");
63 if (innerConn
) rv
= innerConn
->connect(user
,pass
);
64 if (rv
!= OK
) return rv
;
// Plain (non-replicated, non-cached) setups need nothing beyond the inner connect.
65 if (!Conf::config
.useReplication() && !Conf::config
.useCache()) return OK
;
// NOTE(review): rv was already tested above and has not been reassigned since,
// so this disconnect-and-return branch cannot run as listed.
66 if (rv
!=OK
) { innerConn
->disconnect(); return rv
; }
68 //populate cacheList if not populated by another thread in same process
69 //TODO::cacheList requires mutex guard
70 if (0 == cacheList
.size()) rv
= populateCachedTableList();
// Disconnects the wrapped inner connection.
// Returns the inner connection's error code if its disconnect fails, and OK
// straight away when neither replication nor caching is enabled.
// NOTE(review): whatever is done after the configuration check (and the final
// return) is not visible in this listing.
74 DbRetVal
SqlLogConnection::disconnect()
77 //printf("LOG: disconnect\n");
78 if (innerConn
) rv
=innerConn
->disconnect();
79 if (rv
!= OK
) return rv
;
80 if (!Conf::config
.useReplication() && !Conf::config
.useCache()) return OK
;
// Starts a transaction on the wrapped inner connection.
//   isoLevel - isolation level, forwarded to innerConn->beginTrans().
//   mode     - transaction sync mode; it is not passed to the inner connection.
// Returns the inner connection's error code if its beginTrans fails.
// NOTE(review): mode is not used in the lines visible here; presumably it is
// stored in syncMode, which commit() reads -- confirm against the full source.
83 DbRetVal
SqlLogConnection::beginTrans(IsolationLevel isoLevel
, TransSyncMode mode
)
86 if (innerConn
) rv
= innerConn
->beginTrans(isoLevel
);
87 if (rv
!= OK
) return rv
;
// Commits the current transaction, taking syncMode into account.
// Visible steps, in order:
//   - in OSYNC mode the inner connection is committed first;
//   - when logStore is empty the inner connection is rolled back instead
//     (see the original note below);
//   - a PacketCommit is built from logStore under a fresh id from txnUID, sent
//     through the network client as NW_PKT_COMMIT, and an acknowledgement is
//     awaited;
//   - logStore is then walked entry by entry and the inner connection is
//     committed.
// ErrPeerExecFailed is returned on the acknowledgement-failure path.
// NOTE(review): braces, several conditions and the final return are missing
// from this listing, so the exact nesting of these steps cannot be confirmed.
92 DbRetVal
SqlLogConnection::commit()
95 //printf("LOG: commit %d\n", syncMode);
96 //if (innerConn) rv = innerConn->commit();
97 if (syncMode
== OSYNC
) {
98 if (innerConn
) rv
= innerConn
->commit();
// Empty-log case: no packets were recorded in logStore.
101 if (logStore
.size() == 0)
103 //This means no execution for any non select statements in
105 //rollback so that subsequent beginTrans will not report that
106 //transaction is already started
// NOTE(review): innerConn is dereferenced here without the null check used by
// the other calls in this file; a guard may exist in lines not shown.
108 rv
= innerConn
->rollback();
109 //if (rv != OK) return rv;
110 //rv = innerConn->beginTrans(READ_COMMITTED, syncMode);
114 if (syncMode
== ASYNC
) {
115 //TODO::put the packet in global log store
// Bundle the logged packets into one commit packet tagged with a new
// transaction id.
117 PacketCommit *pkt = new PacketCommit();
118 int tid = txnUID.getID();
119 pkt->setExecPackets(tid, logStore);
// NOTE(review): p is not referenced again in the lines visible here.
121 int *p = (int*) pkt->getMarshalledBuffer();
122 NetworkClient *nwClient= nwTable.getNetworkClient();
// NOTE(review): this repeats the syncMode == ASYNC test made a few lines
// above; whether it is nested inside that branch cannot be told from this
// listing.
123 if (syncMode == ASYNC) {
124 rv = nwClient->send(NW_PKT_COMMIT, pkt->getMarshalledBuffer(),
125 pkt->getBufferSize());
128 printError(ErrOS, "Unable to send SQL Logs to peer site\n");
// Wait for the peer's acknowledgement of the commit packet.
131 rv = nwClient->receive();
134 printError(ErrOS, "Could not get acknowledgement from peer site\n");
135 return ErrPeerExecFailed;
137 //TODO::remove all sql logs nodes and the list which contains ptr to it
// Visit every entry of logStore; what is done with each execPkt is not shown
// in this listing.
141 ListIterator logStoreIter
= logStore
.getIterator();
142 PacketExecute
*execPkt
= NULL
;
143 while (logStoreIter
.hasElement())
145 execPkt
= (PacketExecute
*)logStoreIter
.nextElement();
149 if (innerConn
) rv
= innerConn
->commit();
// Rolls back the current transaction on the wrapped inner connection and then
// walks the entries of logStore.
// Returns the inner connection's error code if its rollback fails.
// NOTE(review): what is done with each logStore entry, and the final return,
// are not visible in this listing.
152 DbRetVal
SqlLogConnection::rollback()
155 //printf("LOG: rollback \n");
156 if (innerConn
) rv
= innerConn
->rollback();
157 if (rv
!= OK
) return rv
;
// Visit every packet recorded for the transaction being rolled back.
158 ListIterator logStoreIter
= logStore
.getIterator();
159 PacketExecute
*execPkt
= NULL
;
160 while (logStoreIter
.hasElement())
162 execPkt
= (PacketExecute
*)logStoreIter
.nextElement();
// Loads the cached-table list from the table configuration file.
// The file named by Conf::config.getTableConfigFile() is opened for reading
// and parsed as "<int>:<name>" records; for each record a CachedTable node is
// created, given the table name, and appended to cacheList.
// An ErrSysInit error ("cache.table file does not exist") is reported on the
// open-failure path.
// NOTE(review): the loop construct, the conditions, any fclose() and the
// returns are not visible in this listing.
168 DbRetVal
SqlLogConnection::populateCachedTableList()
171 fp
= fopen(Conf::config
.getTableConfigFile(),"r");
173 printError(ErrSysInit
, "cache.table file does not exist");
176 char tablename
[IDENTIFIER_LENGTH
];
// NOTE(review): "%s" carries no field width, so a name of IDENTIFIER_LENGTH or
// more characters would overrun tablename; the fscanf result is also not
// checked in the visible code.
181 fscanf(fp
, "%d:%s\n", &cmode
, tablename
);
182 node
= new CachedTable();
// NOTE(review): strcpy relies on node->tableName being at least as large as
// tablename -- confirm against the CachedTable declaration.
183 strcpy(node
->tableName
, tablename
);
184 cacheList
.append(node
);
// Searches cacheList for an entry whose tableName is exactly equal (strcmp)
// to tblName.
//   tblName - table name to look up; an ErrBadArg error ("tblName passed is
//             NULL") is reported on the bad-argument path.
// NOTE(review): the NULL test itself and the values returned on match and on
// no-match are not visible in this listing; presumably true on a match and
// false otherwise -- confirm against the full source.
189 bool SqlLogConnection::isTableCached(char *tblName
)
193 printError(ErrBadArg
, "tblName passed is NULL\n");
196 ListIterator iter
= cacheList
.getIterator();
// Linear scan of the cached-table entries.
198 while (iter
.hasElement()) {
199 node
= (CachedTable
*)iter
.nextElement();
200 if (strcmp(node
->tableName
, tblName
) == 0)
// Sends one packet through the network client obtained from nwTable and waits
// for the peer's reply.
//   type   - network packet type passed to send().
//   packet - buffer to send.
//   length - number of bytes in packet.
// Visible failure handling: if the client is not connected, or a send or
// receive fails, ErrOS is returned when the client is a cache client; after a
// failed send a non-cache client gets OK. On send/receive failure the client's
// connect flag is cleared.
// NOTE(review): the conditions around the failure branches and the final
// return are not visible in this listing, and retRV is not used in the lines
// shown.
209 DbRetVal
SqlLogConnection::sendAndReceive(NetworkPacketType type
, char *packet
, int length
)
212 NetworkClient
* nwClient
= nwTable
.getNetworkClient();
213 DbRetVal rv
= OK
, retRV
=OK
;
// NOTE(review): these two printf calls write client state to stdout on every
// call; they look like leftover debugging output -- confirm before removing.
214 printf("isCacheClient %d\n", nwClient
->isCacheClient());
215 printf("isConnected %d\n", nwClient
->isConnected());
// A disconnected cache client cannot proceed; the handling for other clients
// is still a TODO.
217 if (!nwClient->isConnected()) {
218 if (nwClient->isCacheClient()) return ErrOS;
219 //TODO::put this packet in send buffer.
223 rv
= nwClient
->send(type
, packet
, length
);
226 printf("Unable to send pkt to peer with nwid %d\n", nwClient
->getNetworkID());
227 //TODO:: put this packet in resend buffer, so that it will be sent later by another thread for repl mode
// Mark the peer as disconnected; only cache clients see the failure as an error.
228 nwClient
->setConnectFlag(false);
229 if (nwClient
->isCacheClient()) return ErrOS
; else return OK
;
// Wait for the peer's acknowledgement.
231 rv
= nwClient
->receive();
234 printf("Unable to receive ack pkt from peer with nwid %d\n", nwClient
->getNetworkID());
235 nwClient
->setConnectFlag(false);
236 if (nwClient
->isCacheClient()) return ErrOS
;
237 //TODO:: put this packet to resend buffer so that it can be sent later