allocator fixes
[csql.git] / include / SqlLogConnection.h
blobc0a587e717e03b1c2410a4456a6611c820ec6560
1 /***************************************************************************
2 * Copyright (C) 2007 by Prabakaran Thirumalai *
3 * praba_tuty@yahoo.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
20 #ifndef SQLLOGCONNECTION_H
21 #define SQLLOGCONNECTION_H
22 #include<CSql.h>
23 #include<SqlFactory.h>
24 #include<Util.h>
25 #include<Network.h>
/**
 * @class SqlLogConnection
 */
32 typedef struct my_msgbuffer {
33 long mtype;
34 char data[1];
35 } Message;
37 class AbsSqlLogSend
39 public:
40 virtual DbRetVal prepare(int tId, int sId, int len, char *st, char *tn)=0;
41 virtual DbRetVal commit(int len, void *data)=0;
42 virtual DbRetVal free(int txnId, int stmtId)=0;
45 class MsgQueueSend : public AbsSqlLogSend
47 int msgQId;
48 public:
49 MsgQueueSend() { msgQId = os::msgget(Conf::config.getMsgKey(), 0666); }
50 DbRetVal prepare(int tId, int sId, int len, char *stmt, char *tn);
51 DbRetVal commit(int len, void *data);
52 DbRetVal free(int txnId, int stmtId);
55 class FileSend : public AbsSqlLogSend
57 int fdRedoLog;
58 public:
59 FileSend();
60 ~FileSend();
61 DbRetVal openRedoFile();
62 DbRetVal prepare(int txnId, int stmtId, int len, char *stmt, char*tn);
63 DbRetVal commit(int len, void *data);
64 DbRetVal free(int txnId, int stmtId);
67 class OfflineLog : public AbsSqlLogSend
69 int fdOfflineLog;
70 void *metadata;
71 int fileSize;
72 DbRetVal createMetadataFile();
73 void *openMetadataFile();
74 public:
75 OfflineLog();
76 ~OfflineLog();
77 DbRetVal openOfflineLogFile();
78 DbRetVal prepare(int txnId, int stmtId, int len, char *stmt, char*tn);
79 DbRetVal commit(int len, void *data);
80 DbRetVal free(int txnId, int stmtId);
85 enum ExecType
87 EXECONLY = 0,
88 SETPARAM
91 class ExecLogInfo
93 public:
94 ExecLogInfo() : pos(0), len(0) {}
95 int stmtId;
96 ExecType type;
97 int pos;
98 DataType dataType;
99 int len;
100 int value; //Extendible value as per parameter type size
103 class SqlLogConnection : public AbsSqlConnection
105 Connection dummyConn;
107 //stores all the sql log packets to be shipped to peers
108 List logStore;
110 List execLogStore;
111 int execLogStoreSize;
113 //stores all the prepare log packets to be shipped to peers
114 //as soon as connection is reestablished to cache server
115 List prepareStore;
117 //stores all the prepare log packets to be shipped between two
118 //consecutive commits. Commit() call sends first all the stmts
119 //prepared during the course and then sends the exec pkts
120 List curPrepareStore;
122 //sync mode of the current transaction
123 TransSyncMode syncMode;
125 //stores client objects in it for peer
126 NetworkTable nwTable;
127 AbsSqlLogSend *msgQSend;
128 AbsSqlLogSend *fileSend;
129 AbsSqlLogSend *offlineLog;
131 GlobalUniqueID txnUID;
132 static List cacheList;
133 int txnID;
134 DbRetVal populateCachedTableList();
135 public:
136 SqlLogConnection() {
137 innerConn = NULL; syncMode = ASYNC;
138 if (Conf::config.useCache() && Conf::config.getCacheMode()==ASYNC_MODE)
139 msgQSend = new MsgQueueSend();
140 else msgQSend = NULL;
141 if (Conf::config.useDurability()) { fileSend = new FileSend(); }
142 else fileSend = NULL;
143 if (Conf::config.useCache() &&
144 Conf::config.getCacheMode() == OFFLINE_MODE)
145 offlineLog = new OfflineLog;
146 else offlineLog = NULL;
147 txnUID.open();
148 execLogStoreSize =0;
149 noMsgLog = false;
150 noOfflineLog = false;
152 ~SqlLogConnection();
153 bool isTableCached(char *name);
154 bool noMsgLog;
155 bool noOfflineLog;
156 //Note::forced to implement this as it is pure virtual in base class
157 Connection& getConnObject(){ return dummyConn; }
159 DbRetVal connect (char *user, char * pass);
161 DbRetVal disconnect();
163 DbRetVal commit();
165 DbRetVal rollback();
167 DbRetVal beginTrans (IsolationLevel isoLevel, TransSyncMode mode);
169 DbRetVal msgPrepare(int tId, int sId, int len, char *stmt, char *tname)
171 return msgQSend->prepare(tId, sId, len, stmt, tname);
173 DbRetVal fileLogPrepare(int tId, int sId, int len, char *stmt, char *tname)
175 return fileSend->prepare(tId, sId, len, stmt, tname);
177 DbRetVal offlineLogPrepare(int tId, int sId, int len, char *st, char *tnm)
179 return offlineLog->prepare(tId, sId, len, st, tnm);
181 DbRetVal commitLogs(int logSize, void *data)
183 int txnId = getTxnID();
184 if (((Conf::config.useCache() &&
185 Conf::config.getCacheMode() == ASYNC_MODE)) && !noMsgLog)
186 msgQSend->commit(logSize, data);
187 if (Conf::config.useDurability()) fileSend->commit(logSize, data);
188 if (Conf::config.useCache() &&
189 Conf::config.getCacheMode()==OFFLINE_MODE &&
190 !noOfflineLog)
191 offlineLog->commit(logSize, data);
192 return OK;
194 DbRetVal freeLogs(int stmtId)
196 int txnId = getTxnID();
197 if ( ((Conf::config.useCache() &&
198 Conf::config.getCacheMode() == ASYNC_MODE)) && !noMsgLog)
199 msgQSend->free(txnId, stmtId);
200 if (Conf::config.useDurability()) fileSend->free(txnId, stmtId);
201 if (Conf::config.useCache() &&
202 Conf::config.getCacheMode()==OFFLINE_MODE &&
203 !noOfflineLog)
204 offlineLog->free(txnId, stmtId);
205 return OK;
207 void addExecLog(ExecLogInfo *info) { execLogStore.append(info); }
208 void addToExecLogSize(int size){ execLogStoreSize += size; }
209 int getExecLogStoreSize() { return execLogStoreSize; }
210 List getExecLogList() { return execLogStore; }
211 DbRetVal addPacket(BasePacket *pkt);
212 DbRetVal addPreparePacket(PacketPrepare *pkt);
213 DbRetVal removePreparePacket(int stmtid);
215 DbRetVal setSyncMode(TransSyncMode mode);
216 void setNoMsgLog(bool nmlog) { noMsgLog = nmlog; }
217 void setNoOfflineLog(bool nolog) { noOfflineLog = nolog; }
218 TransSyncMode getSyncMode() { return syncMode; }
219 int getTxnID() { return txnID; }
220 DbRetVal connectIfNotConnected() { return nwTable.connectIfNotConnected(); }
221 DbRetVal sendAndReceive(NetworkPacketType type, char *packet, int length);
222 friend class SqlFactory;
225 #endif