code reorg
[csql.git] / src / storage / DatabaseRecovery.cxx
blobb3dda2204a7aff1f135f46379a3924a107f52141
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Database.h>
17 #include<os.h>
18 #include<CatalogTables.h>
19 #include<Transaction.h>
20 #include<Lock.h>
21 #include<Debug.h>
22 #include<Config.h>
23 #include<Process.h>
24 #include<HeapAllocator.h>
26 DbRetVal Database::writeDirtyPages(char *dataFile)
28 int fd = os::open(dataFile, fileOpenCreat, 0);
29 os::lseek(fd, 0, SEEK_SET);
30 void *buf = (void *) metaData_;
31 int sizeToWrite = os::alignLong(sizeof(DatabaseMetaData));
32 size_t retSize = os::write(fd, (char*)buf, sizeToWrite);
33 if (-1 == retSize)
35 printError(ErrWarning, "Warning:Unable to write metadata");
36 return ErrSysInternal;
38 PageInfo *pageInfo = (PageInfo*) getFirstPage();
39 long pageSize =PAGE_SIZE;
40 int pagesWritten=0, writeOffset=0;
41 long long totalBytesWritten=0;
42 while(isValidAddress((char*) pageInfo))
44 if ( NULL == pageInfo ) break;
45 if (pageInfo > getCurrentPage()) {
46 char *a="0";
47 os::lseek(fd, getMaxSize() -1, SEEK_SET);
48 if ( -1 == os::write(fd, a, 1)) {
49 printError(ErrSysInternal, "Unable to extend chkpt file");
50 os::close(fd);
51 return ErrSysInternal;
53 break;
55 if (BITSET(pageInfo->flags, IS_DIRTY)) {
56 if (NULL == pageInfo->nextPageAfterMerge_)
57 pageSize = PAGE_SIZE;
58 else
59 pageSize = (long)pageInfo->nextPageAfterMerge_ - (long)pageInfo;
60 writeOffset = (long) pageInfo - (long) metaData_;
61 ::lseek(fd, writeOffset, SEEK_SET);
62 CLEARBIT(pageInfo->flags, IS_DIRTY);
63 retSize = os::write(fd, (char*)pageInfo, pageSize);
64 if ( -1 == retSize ) {
65 printError(ErrSysInternal, "Unable to write dirty page %x", pageInfo);
66 os::close(fd);
67 return ErrSysInternal;
69 totalBytesWritten= totalBytesWritten + retSize;
70 pagesWritten++;
72 if ( pageInfo->nextPageAfterMerge_ == NULL) {
73 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
74 } else {
75 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
78 //printf("Total Dirty pages written %d %lld\n", pagesWritten, totalBytesWritten);
79 logFine(Conf::logger, "Total Dirty pages written %d\n", pagesWritten);
80 os::close(fd);
81 return OK;
84 DbRetVal Database::checkPoint()
86 char dataFile[MAX_FILE_LEN];
87 char cmd[MAX_FILE_LEN];
88 char dbRedoFileName[MAX_FILE_LEN];
89 sprintf(dbRedoFileName, "%s/csql.db.cur", Conf::config.getDbFile());
90 if (!Conf::config.useMmap()) {
91 // sprintf(dataFile, "%s/db.chkpt.data1", Conf::config.getDbFile());
92 sprintf(dataFile, "%s/db.chkpt.data", Conf::config.getDbFile());
93 FILE *fp = NULL;
94 if (fp = fopen(dataFile, "r")) {
95 fclose(fp);
96 int ret = ::unlink(dataFile);
97 if (ret != OK) {
98 printError(ErrOS, "Unable to delete old chkpt file. Failure");
99 return ErrOS;
102 int fd = ::open(dataFile, O_WRONLY|O_CREAT, 0644);
103 void *buf = (void *) metaData_;
104 os::lseek(fd, 0, SEEK_SET);
105 os::write(fd, (char*) buf, Conf::config.getMaxDbSize());
106 os::close(fd);
107 sprintf(cmd, "cp -f %s/db.chkpt.data %s/db.chkpt.data1", Conf::config.getDbFile(), Conf::config.getDbFile());
108 int ret = system(cmd);
109 if (ret != 0) {
110 printError(ErrOS, "Unable to take checkpoint back up file");
111 return ErrOS;
113 } else {
114 file_desc fd = getChkptfd();
115 if (!os::fdatasync(fd)) {
116 logFine(Conf::logger, "fsync succedded");
118 filterAndRemoveStmtLogs();
119 int ret = os::truncate(dbRedoFileName);
120 if (ret != 0) {
121 os::closeFile(fd);
122 printError(ErrSysInternal, "Unable to truncate redo log file");
123 printError(ErrSysInternal, "Delete %s manually and restart the server", dbRedoFileName);
124 return ErrOS;
126 //switch the checkpoint so that during recovery, fsynced checkpoint is
127 //used during recovery if the below step(writeDirtyPages)
128 //is not completed succesfully.
129 if (Database::getCheckpointID() == 0)
130 Database::setCheckpointID(1);
131 else
132 Database::setCheckpointID(0);
134 int val=Database::getCheckpointID();
136 sprintf(dataFile, "%s/db.chkpt.data%d", Conf::config.getDbFile(), val);
137 DbRetVal rv = writeDirtyPages(dataFile);
138 if (OK != rv)
140 printError(ErrSysInternal, "Unable to write dirty pages");
141 os::closeFile(fd);
142 return rv;
145 //Note: do not change order, chkpt id should be switched only after
146 //all dirty pages are written to disk. otherwise(if server crashes
147 //when it writes these dirty pages) recovery should use
148 //mapped file as fsync is already done on that file.
149 if (Database::getCheckpointID() == 0)
150 Database::setCheckpointID(1);
151 else
152 Database::setCheckpointID(0);
154 os::closeFile(fd);
155 return OK;
157 filterAndRemoveStmtLogs();
158 int ret = os::truncate(dbRedoFileName);
159 if (ret != 0) {
160 printError(ErrSysInternal, "Unable to truncate redo log file. Delete and restart the server\n");
161 return ErrOS;
163 return OK;
165 DbRetVal Database::filterAndRemoveStmtLogs()
167 struct stat st;
168 char fName[MAX_FILE_LEN];
169 sprintf(fName, "%s/csql.db.stmt", Conf::config.getDbFile());
170 file_desc fdRead = os::openFile(fName, fileOpenReadOnly,0);
171 if ((file_desc)-1 == fdRead) { return OK; }
172 if (::stat(fName, &st) == -1) {
173 printError(ErrSysInternal, "Unable to retrieve stmt log file size");
174 os::closeFile(fdRead);
175 return ErrSysInternal;
177 if (st.st_size ==0) {
178 os::closeFile(fdRead);
179 return OK;
181 void *startAddr = os::mmap(NULL, st.st_size, mapProtRead, mapPrivate, fdRead, 0);
182 if ((void*) MAP_FAILED == startAddr) {
183 printError(ErrSysInternal, "Unable to mmap stmt log file\n");
184 return ErrSysInternal;
186 sprintf(fName, "%s/csql.db.stmt1", Conf::config.getDbFile());
187 int fd = os::openFileForAppend(fName, O_CREAT|O_TRUNC);
188 char *iter = (char*)startAddr;
189 char *logStart = NULL, *logEnd = NULL;
190 int logType;
191 int stmtID;
192 int len =0, ret =0;
193 int txnID, loglen;
194 DbRetVal rv = OK;
195 HashMap stmtMap;
196 stmtMap.setKeySize(sizeof(int));
197 //PASS-I load all prepare stmts and free them
198 while(true) {
199 if (iter - (char*)startAddr >= st.st_size) break;
200 logType = *(int*)iter;
201 logStart = iter;
202 if (logType == -1) { //prepare
203 iter = iter + sizeof(int);
204 len = *(int*) iter;
205 iter = iter + 2 * sizeof(int);
206 stmtID = *(int*)iter;
207 stmtMap.insert(iter);
208 iter = logStart+ len;
209 ret =0;
211 else if(logType == -3) { //free
212 iter = iter + sizeof(int);
213 txnID = *(int*) iter; iter += sizeof(int);
214 loglen = *(int*) iter; iter += sizeof(int);
215 stmtID = *(int*)iter;
216 stmtMap.remove(iter);
217 iter = iter + sizeof(int);
218 }else{
219 printError(ErrSysInternal, "Stmt Redo log file corrupted: logType:%d", logType);
220 rv = ErrSysInternal;
221 break;
224 //PASS-II take the prepared statements which are not freed into another backup file
225 while(true) {
226 if (iter - (char*)startAddr >= st.st_size) break;
227 logType = *(int*)iter;
228 logStart = iter;
229 if (logType == -1) { //prepare
230 iter = iter + sizeof(int);
231 len = *(int*) iter;
232 iter = iter + 2 * sizeof(int);
233 stmtID = *(int*)iter;
234 iter = logStart+ len;
235 ret =0;
236 if (stmtMap.find(&stmtID))
237 ret = os::write(fd, logStart, len);
238 if (-1 == ret) {
239 printError(ErrSysInternal, "Unable to write statement logs");
242 else if(logType == -3) { //free
243 iter = logStart + 4 *sizeof(int);
244 //neglet free stmt logs in this pass
245 }else{
246 printError(ErrSysInternal, "Stmt Redo log file corrupted: logType:%d", logType);
247 rv = ErrSysInternal;
248 break;
252 os::close(fd);
253 os::munmap((char*)startAddr, st.st_size);
254 os::closeFile(fdRead);
255 stmtMap.removeAll();
256 char cmd[MAX_FILE_LEN *2];
257 sprintf(cmd, "mv %s/csql.db.stmt1 %s/csql.db.stmt",
258 Conf::config.getDbFile(), Conf::config.getDbFile());
259 ret = system(cmd);
260 return rv;
262 int Database::getCheckpointID()
264 int id=0;
265 char curCkptFile[MAX_FILE_LEN];
266 sprintf(curCkptFile, "%s/db.chkpt.cur", Conf::config.getDbFile());
267 FILE *fp = fopen(curCkptFile, "r");
268 if (NULL == fp) { setCheckpointID(0); return 0; }
269 fscanf(fp, "%d", &id);
270 fclose(fp);
271 return id;
273 void Database::setCheckpointID(int id)
275 char curCkptFile[MAX_FILE_LEN];
276 sprintf(curCkptFile, "%s/db.chkpt.cur", Conf::config.getDbFile());
277 FILE *fp = fopen(curCkptFile, "w");
278 if (NULL == fp) {
280 printError(ErrSysInternal, "Unable to set checkpointID");
281 return;
283 fprintf(fp, "%d", id);
284 logFine(Conf::logger, "Current checkpoint set to %d", id);
285 fclose(fp);
286 return;
290 //used only by the user database not the system database
291 DbRetVal Database::recoverUserDB()
293 char dataFile[MAX_FILE_LEN];
294 char cmd[MAX_FILE_LEN];
295 sprintf(dataFile, "%s/db.chkpt.data", Conf::config.getDbFile());
296 int fd = os::open(dataFile, fileOpenReadOnly, 0);
297 if (-1 == fd) { return OK; }
298 void *buf = (void *) metaData_;
299 int readbytes = read(fd, buf, Conf::config.getMaxDbSize());
300 if (readbytes == -1) { os::close(fd); return ErrOS; }
301 os::close(fd);
302 return OK;
305 //used only by the system database
306 DbRetVal Database::recoverSystemDB()
308 char mapFile[MAX_FILE_LEN];
309 sprintf(mapFile, "%s/db.chkpt.map", Conf::config.getDbFile());
310 int fd = open(mapFile, O_RDONLY);
311 if (-1 == fd) { return OK; }
312 CatalogTableTABLE cTable(this);
313 CatalogTableINDEX cIndex(this);
314 struct Object buf;
315 while (read(fd, &buf, sizeof(buf))) {
316 if (buf.type == Tbl) {
317 cTable.setChunkPtr(buf.name, buf.firstPage, buf.curPage);
319 else if (buf.type == hIdx || buf.type == tIdx) {
320 cIndex.setChunkPtr(buf.name, buf.type, buf.bucketChunk, buf.firstPage, buf.curPage);
323 return OK;