redo log changes. prepare with param will go to stmt logs and other stmts go to redo...
[csql.git] / include / SqlLogConnection.h
blob1fe4b630912c09652146ecf765ea4a79c3c196c2
1 /***************************************************************************
2 * Copyright (C) 2007 by Prabakaran Thirumalai *
3 * praba_tuty@yahoo.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
20 #ifndef SQLLOGCONNECTION_H
21 #define SQLLOGCONNECTION_H
22 #include<CSql.h>
23 #include<SqlFactory.h>
24 #include<Util.h>
25 #include<Network.h>
/**
 * @class SqlLogConnection
 */
/**
 * Message envelope: a long type tag followed by the payload bytes.
 *
 * NOTE(review): the one-element `data` array looks like the usual
 * variable-length trailer of a System V message buffer (the queue id is
 * obtained through os::msgget in MsgQueueSend) - confirm against the code
 * that allocates and sends these messages.
 */
typedef struct my_msgbuffer {
    long mtype;     // message type tag
    char data[1];   // first byte of the payload
} Message;
38 class AbsSqlLogSend
40 public:
41 virtual DbRetVal prepare(int tId, int sId, int len, char *st, char *tn,
42 bool hasParam)=0;
43 virtual DbRetVal commit(int len, void *data)=0;
44 virtual DbRetVal free(int txnId, int stmtId, bool hasParam)=0;
47 class MsgQueueSend : public AbsSqlLogSend
49 int msgQId;
50 public:
51 MsgQueueSend() { msgQId = os::msgget(Conf::config.getMsgKey(), 0666); }
52 DbRetVal prepare(int tId, int sId, int len, char *stmt, char *tn,
53 bool hasParam);
54 DbRetVal commit(int len, void *data);
55 DbRetVal free(int txnId, int stmtId, bool hasParam);
58 class FileSend : public AbsSqlLogSend
60 int fdRedoLog;
61 int fdStmtLog;
62 public:
63 FileSend();
64 ~FileSend();
65 DbRetVal openRedoFile();
66 DbRetVal prepare(int txnId, int stmtId, int len, char *stmt, char*tn,
67 bool hasParam);
68 DbRetVal commit(int len, void *data);
69 DbRetVal free(int txnId, int stmtId, bool hasParam);
73 class OfflineLog : public AbsSqlLogSend
75 int fdOfflineLog;
76 void *metadata;
77 int fileSize;
78 DbRetVal createMetadataFile();
79 void *openMetadataFile();
80 public:
81 OfflineLog();
82 ~OfflineLog();
83 DbRetVal openOfflineLogFile();
84 DbRetVal prepare(int txnId, int stmtId, int len, char *stmt, char*tn,
85 bool hasParam);
86 DbRetVal commit(int len, void *data);
87 DbRetVal free(int txnId, int stmtId, bool hasParam);
/**
 * Kind of an execution-log record (see ExecLogInfo::type).
 */
enum ExecType
{
    EXECONLY = 0,
    SETPARAM
};
97 class ExecLogInfo
99 public:
100 ExecLogInfo() : pos(0), len(0) {}
101 int stmtId;
102 ExecType type;
103 int pos;
104 DataType dataType;
105 int len;
106 int value; //Extendible value as per parameter type size
109 class SqlLogConnection : public AbsSqlConnection
111 Connection dummyConn;
113 //stores all the sql log packets to be shipped to peers
114 List logStore;
116 List execLogStore;
117 int execLogStoreSize;
119 //stores all the prepare log packets to be shipped to peers
120 //as soon as connection is reestablished to cache server
121 List prepareStore;
123 //stores all the prepare log packets to be shipped between two
124 //consecutive commits. Commit() call sends first all the stmts
125 //prepared during the course and then sends the exec pkts
126 List curPrepareStore;
128 //sync mode of the current transaction
129 TransSyncMode syncMode;
131 //stores client objects in it for peer
132 NetworkTable nwTable;
133 AbsSqlLogSend *msgQSend;
134 AbsSqlLogSend *fileSend;
135 AbsSqlLogSend *offlineLog;
137 GlobalUniqueID txnUID;
138 static List cacheList;
139 int txnID;
140 DbRetVal populateCachedTableList();
141 public:
142 SqlLogConnection() {
143 innerConn = NULL; syncMode = ASYNC;
144 if ( Conf::config.useCache() &&
145 Conf::config.getCacheMode()==ASYNC_MODE)
146 msgQSend = new MsgQueueSend();
147 else msgQSend = NULL;
148 if (Conf::config.useDurability()) { fileSend = new FileSend(); }
149 else fileSend = NULL;
150 if (Conf::config.useCache() &&
151 Conf::config.getCacheMode() == OFFLINE_MODE)
152 offlineLog = new OfflineLog;
153 else offlineLog = NULL;
154 txnUID.open();
155 execLogStoreSize =0;
156 noMsgLog = false;
157 noOfflineLog = false;
158 txnID = 0;
160 ~SqlLogConnection();
161 bool isTableCached(char *name);
162 bool noMsgLog;
163 bool noOfflineLog;
164 //Note::forced to implement this as it is pure virtual in base class
165 Connection& getConnObject(){ return dummyConn; }
167 DbRetVal connect (char *user, char * pass);
169 DbRetVal disconnect();
171 DbRetVal commit();
173 DbRetVal rollback();
175 DbRetVal beginTrans (IsolationLevel isoLevel, TransSyncMode mode);
177 DbRetVal msgPrepare(int tId, int sId, int len, char *stmt, char *tname,
178 bool hasParam)
180 return msgQSend->prepare(tId, sId, len, stmt, tname, hasParam);
182 DbRetVal fileLogPrepare(int tId, int sId, int len, char *stmt, char *tname,
183 bool hasParam)
185 return fileSend->prepare(tId, sId, len, stmt, tname, hasParam);
187 DbRetVal offlineLogPrepare(int tId, int sId, int len, char *stmt,
188 char *tname, bool hasParam)
190 return offlineLog->prepare(tId, sId, len, stmt, tname, hasParam);
192 DbRetVal commitLogs(int logSize, void *data)
194 int txnId = getTxnID();
195 if (((Conf::config.useCache() &&
196 Conf::config.getCacheMode() == ASYNC_MODE)) && !noMsgLog)
197 msgQSend->commit(logSize, data);
198 if (Conf::config.useDurability()) fileSend->commit(logSize, data);
199 if (Conf::config.useCache() &&
200 Conf::config.getCacheMode()==OFFLINE_MODE &&
201 !noOfflineLog)
202 offlineLog->commit(logSize, data);
203 return OK;
205 DbRetVal freeLogs(int stmtId, bool hasParam)
207 int txnId = getTxnID();
208 if ( ((Conf::config.useCache() &&
209 Conf::config.getCacheMode() == ASYNC_MODE)) && !noMsgLog)
210 msgQSend->free(txnId, stmtId, hasParam);
211 if (Conf::config.useDurability()) fileSend->free(txnId, stmtId, hasParam);
212 if (Conf::config.useCache() &&
213 Conf::config.getCacheMode()==OFFLINE_MODE &&
214 !noOfflineLog)
215 offlineLog->free(txnId, stmtId, hasParam);
216 return OK;
218 void addExecLog(ExecLogInfo *info) { execLogStore.append(info); }
219 void addToExecLogSize(int size){ execLogStoreSize += size; }
220 int getExecLogStoreSize() { return execLogStoreSize; }
221 List getExecLogList() { return execLogStore; }
222 DbRetVal addPacket(BasePacket *pkt);
223 DbRetVal addPreparePacket(PacketPrepare *pkt);
224 DbRetVal removePreparePacket(int stmtid);
226 DbRetVal setSyncMode(TransSyncMode mode);
227 void setNoMsgLog(bool nmlog) { noMsgLog = nmlog; }
228 void setNoOfflineLog(bool nolog) { noOfflineLog = nolog; }
229 TransSyncMode getSyncMode() { return syncMode; }
230 int getTxnID() { return txnID; }
231 DbRetVal connectIfNotConnected() { return nwTable.connectIfNotConnected(); }
232 DbRetVal sendAndReceive(NetworkPacketType type, char *packet, int length);
233 friend class SqlFactory;
236 #endif