1 /***************************************************************************
2 * Copyright (C) 2007 by Prabakaran Thirumalai *
3 * praba_tuty@yahoo.com *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
21 #include <SqlLogConnection.h>
22 #include <SqlLogStatement.h>
26 GlobalUniqueID
SqlLogConnection::txnUID
;
27 List
SqlLogConnection::cacheList
;
29 DbRetVal
SqlLogConnection::addPacket(BasePacket
* pkt
)
34 DbRetVal
SqlLogConnection::addPreparePacket(PacketPrepare
* pkt
)
36 curPrepareStore
.append(pkt
);
40 DbRetVal
SqlLogConnection::removePreparePacket(int stmtid
)
42 ListIterator iter
= prepareStore
.getIterator();
43 PacketPrepare
*pkt
= NULL
, *dpkt
=NULL
;
44 while (iter
.hasElement())
46 pkt
= (PacketPrepare
*)iter
.nextElement();
47 if (pkt
->stmtID
== stmtid
) dpkt
= pkt
;
49 if (dpkt
== NULL
) return OK
;
50 //TEMP:mask below error for now
53 printError(ErrNotFound
, "Prepare packet not found in list for %d\n", stmtid
);
57 prepareStore
.remove(dpkt
);
61 DbRetVal
SqlLogConnection::connect (char *user
, char *pass
)
64 //printf("LOG: connect\n");
65 if (innerConn
) rv
= innerConn
->connect(user
,pass
);
66 if (rv
!= OK
) return rv
;
67 if (!Conf::config
.useDurability()) return OK
;
69 //populate cacheList if not populated by another thread in same process
70 //TODO::cacheList requires mutex guard
71 if (0 == cacheList
.size()) rv
= populateCachedTableList();
72 rv
= SqlLogStatement::stmtUID
.open();
73 rv
= SqlLogConnection::txnUID
.open();
77 DbRetVal
SqlLogConnection::disconnect()
80 //printf("LOG: disconnect\n");
81 if (innerConn
) rv
=innerConn
->disconnect();
82 if (rv
!= OK
) return rv
;
83 if (!Conf::config
.useDurability()) return OK
;
84 SqlLogStatement::stmtUID
.close();
87 DbRetVal
SqlLogConnection::beginTrans(IsolationLevel isoLevel
, TransSyncMode mode
)
90 if (innerConn
) rv
= innerConn
->beginTrans(isoLevel
);
91 if (rv
!= OK
) return rv
;
93 txnID
= SqlLogConnection::txnUID
.getID(TXN_ID
);
96 DbRetVal
SqlLogConnection::commit()
99 //printf("LOG: commit %d\n", syncMode);
100 //if (innerConn) rv = innerConn->commit();
101 if (innerConn
) rv
= innerConn
->commit();
102 if (!Conf::config
.useDurability()) return OK
;
104 if (execLogStore
.size() == 0) {
105 //This means no execution for any non select statements in
107 //rollback so that subsequent beginTrans will not report that
108 //transaction is already started
110 rv
= innerConn
->rollback();
111 //if (rv != OK) return rv;
112 //rv = innerConn->beginTrans(READ_COMMITTED, syncMode);
117 //TODO::put the packet in global log store
119 PacketCommit *pkt = new PacketCommit();
120 int tid = txnUID.getID();
121 pkt->setExecPackets(tid, logStore);
123 int *p = (int*) pkt->getMarshalledBuffer();
124 NetworkClient *nwClient= nwTable.getNetworkClient();
125 if (syncMode == ASYNC) {
126 rv = nwClient->send(NW_PKT_COMMIT, pkt->getMarshalledBuffer(),
127 pkt->getBufferSize());
130 printError(ErrOS, "Unable to send SQL Logs to peer site\n");
133 rv = nwClient->receive();
136 printError(ErrOS, "Could not get acknowledgement from peer site\n");
137 return ErrPeerExecFailed;
139 //TODO::remove all sql logs nodes and the list which contains ptr to it
141 int txnId
= getTxnID();
142 // len to be sent should also contain txnId
143 int len
= 2 * sizeof(int) + os::align(getExecLogStoreSize());
144 printDebug(DM_SqlLog
, "commit: size of logstore: %d", len
);
145 int bufferSize
= sizeof(long) + len
;
146 printDebug(DM_SqlLog
, "commit: size of buffer: %d", bufferSize
);
147 void *buffer
= malloc(bufferSize
);
148 printDebug(DM_SqlLog
, "commit: buffer address: %x", buffer
);
149 char *ptr
= (char *)buffer
+ sizeof(long); // long type is for msgtype
151 printDebug(DM_SqlLog
, "commit: data address: %x", data
);
154 *(int *) ptr
= txnId
;
156 ListIterator logStoreIter
= execLogStore
.getIterator();
157 ExecLogInfo
*elInfo
= NULL
;
158 while (logStoreIter
.hasElement()) {
159 elInfo
= (ExecLogInfo
*)logStoreIter
.nextElement();
160 printDebug(DM_SqlLog
, "commit: elem from logstore:: %x", elInfo
);
161 *(int *) ptr
= elInfo
->stmtId
;
162 printDebug(DM_SqlLog
, "commit: stmtId to marshall: %d", elInfo
->stmtId
);
164 *(int *) ptr
= (int) elInfo
->type
;
165 printDebug(DM_SqlLog
, "commit: ExType to marshall: %d", elInfo
->type
);
166 //printf("PRABA::type is %d\n" , *(int *) ptr);
168 if (elInfo
->type
== SETPARAM
) {
169 *(int *) ptr
= elInfo
->pos
;
170 printDebug(DM_SqlLog
, "commit: PrmPos to marshall: %d", elInfo
->pos
);
172 *(int *) ptr
= (int) elInfo
->dataType
;
173 printDebug(DM_SqlLog
, "commit: DtType to marshall: %d", elInfo
->dataType
);
175 *(int *) ptr
= elInfo
->len
;
176 printDebug(DM_SqlLog
, "commit: length to marshall: %d", elInfo
->len
);
178 memcpy(ptr
, &elInfo
->value
, elInfo
->len
);
182 commitLogs(len
, data
);
183 execLogStore
.reset();
185 //if (innerConn) rv = innerConn->commit();
188 DbRetVal
SqlLogConnection::rollback()
191 //printf("LOG: rollback \n");
192 if (innerConn
) rv
= innerConn
->rollback();
193 if (rv
!= OK
) return rv
;
194 ListIterator logStoreIter
= logStore
.getIterator();
195 PacketExecute
*execPkt
= NULL
;
196 while (logStoreIter
.hasElement())
198 execPkt
= (PacketExecute
*)logStoreIter
.nextElement();
203 execLogStore
.reset();
208 DbRetVal
SqlLogConnection::populateCachedTableList()
211 fp
= fopen(Conf::config
.getTableConfigFile(),"r");
213 printError(ErrSysInit
, "cache.table file does not exist");
216 char tablename
[IDENTIFIER_LENGTH
];
217 char fieldname
[IDENTIFIER_LENGTH
];
218 char condition
[IDENTIFIER_LENGTH
];
219 char field
[IDENTIFIER_LENGTH
];
224 fscanf(fp
, "%d:%s %s %s %s\n", &cmode
, tablename
,fieldname
,condition
,field
);
225 node
= new CachedTable();
226 strcpy(node
->tableName
, tablename
);
227 cacheList
.append(node
);
233 bool SqlLogConnection::isTableCached(char *tblName
)
237 printError(ErrBadArg
, "tblName passed is NULL\n");
240 ListIterator iter
= cacheList
.getIterator();
242 while (iter
.hasElement()) {
243 node
= (CachedTable
*)iter
.nextElement();
244 if (strcmp(node
->tableName
, tblName
) == 0)