*** empty log message ***
[csql.git] / src / storage / DatabaseRecovery.cxx
blobccb0e9a73c6310d70f1bc2445f1ea5de30994143
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<Database.h>
17 #include<os.h>
18 #include<CatalogTables.h>
19 #include<Transaction.h>
20 #include<Lock.h>
21 #include<Debug.h>
22 #include<Config.h>
23 #include<Process.h>
24 #include<HeapAllocator.h>
26 DbRetVal Database::writeDirtyPages(char *dataFile)
28 int fd = os::open(dataFile, fileOpenCreat, 0);
29 os::lseek(fd, 0, SEEK_SET);
30 void *buf = (void *) metaData_;
31 int sizeToWrite = os::alignLong(sizeof(DatabaseMetaData));
32 size_t retSize = os::write(fd, (char*)buf, sizeToWrite);
33 if (-1 == retSize)
35 printError(ErrWarning, "Warning:Unable to write metadata");
36 return ErrSysInternal;
38 PageInfo *pageInfo = (PageInfo*) getFirstPage();
39 long pageSize =PAGE_SIZE;
40 int pagesWritten=0, writeOffset=0;
41 long long totalBytesWritten=0;
42 while(isValidAddress((char*) pageInfo))
44 if ( NULL == pageInfo ) break;
45 if (pageInfo > getCurrentPage()) {
46 char *a="0";
47 os::lseek(fd, getMaxSize() -1, SEEK_SET);
48 if ( -1 == os::write(fd, a, 1)) {
49 printError(ErrSysInternal, "Unable to extend chkpt file");
50 os::close(fd);
51 return ErrSysInternal;
53 break;
55 if (BITSET(pageInfo->flags, IS_DIRTY)) {
56 if (NULL == pageInfo->nextPageAfterMerge_)
57 pageSize = PAGE_SIZE;
58 else
59 pageSize = (long)pageInfo->nextPageAfterMerge_ - (long)pageInfo;
60 writeOffset = (long) pageInfo - (long) metaData_;
61 ::lseek(fd, writeOffset, SEEK_SET);
62 CLEARBIT(pageInfo->flags, IS_DIRTY);
63 retSize = os::write(fd, (char*)pageInfo, pageSize);
64 if ( -1 == retSize ) {
65 printError(ErrSysInternal, "Unable to write dirty page %x", pageInfo);
66 os::close(fd);
67 return ErrSysInternal;
69 totalBytesWritten= totalBytesWritten + retSize;
70 pagesWritten++;
72 if ( pageInfo->nextPageAfterMerge_ == NULL) {
73 pageInfo = (PageInfo*)((char*)pageInfo + PAGE_SIZE);
74 } else {
75 pageInfo = (PageInfo*)pageInfo->nextPageAfterMerge_;
78 //printf("Total Dirty pages written %d %lld\n", pagesWritten, totalBytesWritten);
79 logFine(Conf::logger, "Total Dirty pages written %d\n", pagesWritten);
80 os::close(fd);
81 return OK;
84 DbRetVal Database::checkPoint()
86 char dataFile[MAX_FILE_LEN];
87 char cmd[MAX_FILE_LEN];
88 char dbRedoFileName[MAX_FILE_LEN];
89 sprintf(dbRedoFileName, "%s/csql.db.cur", Conf::config.getDbFile());
90 if (!Conf::config.useMmap()) {
91 // sprintf(dataFile, "%s/db.chkpt.data1", Conf::config.getDbFile());
92 sprintf(dataFile, "%s/db.chkpt.data", Conf::config.getDbFile());
93 FILE *fp = NULL;
94 if (fp = fopen(dataFile, "r")) {
95 fclose(fp);
96 int ret = ::unlink(dataFile);
97 if (ret != OK) {
98 printError(ErrOS, "Unable to delete old chkpt file. Failure");
99 return ErrOS;
102 int fd = ::open(dataFile, O_WRONLY|O_CREAT, 0644);
103 void *buf = (void *) metaData_;
104 os::lseek(fd, 0, SEEK_SET);
105 os::write(fd, (char*) buf, Conf::config.getMaxDbSize());
106 os::close(fd);
107 sprintf(cmd, "cp -f %s/db.chkpt.data %s/db.chkpt.data1", Conf::config.getDbFile(), Conf::config.getDbFile());
108 int ret = system(cmd);
109 if (ret != 0) {
110 printError(ErrOS, "Unable to take checkpoint back up file");
111 return ErrOS;
113 } else {
114 file_desc fd = getChkptfd();
115 if (!os::fdatasync(fd)) {
116 logFine(Conf::logger, "fsync succedded");
118 filterAndRemoveStmtLogs();
119 int ret = os::truncate(dbRedoFileName);
120 if (ret != 0) {
121 os::closeFile(fd);
122 printError(ErrSysInternal, "Unable to truncate redo log file");
123 printError(ErrSysInternal, "Delete %s manually and restart the server", dbRedoFileName);
124 return ErrOS;
126 //switch the checkpoint so that during recovery, fsynced checkpoint is
127 //used during recovery if the below step(writeDirtyPages)
128 //is not completed succesfully.
129 if (Database::getCheckpointID() == 0)
130 Database::setCheckpointID(1);
131 else
132 Database::setCheckpointID(0);
134 int val=Database::getCheckpointID();
136 sprintf(dataFile, "%s/db.chkpt.data%d", Conf::config.getDbFile(), val);
137 DbRetVal rv = writeDirtyPages(dataFile);
138 if (OK != rv)
140 printError(ErrSysInternal, "Unable to write dirty pages");
141 os::closeFile(fd);
142 return rv;
145 //Note: do not change order, chkpt id should be switched only after
146 //all dirty pages are written to disk. otherwise(if server crashes
147 //when it writes these dirty pages) recovery should use
148 //mapped file as fsync is already done on that file.
149 if (Database::getCheckpointID() == 0)
150 Database::setCheckpointID(1);
151 else
152 Database::setCheckpointID(0);
154 os::closeFile(fd);
155 return OK;
157 filterAndRemoveStmtLogs();
158 int ret = os::truncate(dbRedoFileName);
159 if (ret != 0) {
160 printError(ErrSysInternal, "Unable to truncate redo log file. Delete and restart the server\n");
161 return ErrOS;
163 return OK;
166 DbRetVal Database::filterAndRemoveStmtLogs()
168 struct stat st;
169 char fName[MAX_FILE_LEN];
170 sprintf(fName, "%s/csql.db.stmt", Conf::config.getDbFile());
171 file_desc fdRead = os::openFile(fName, fileOpenReadOnly,0);
172 if ((file_desc)-1 == fdRead) { return OK; }
173 if (::stat(fName, &st) == -1) {
174 printError(ErrSysInternal, "Unable to retrieve stmt log file size");
175 os::closeFile(fdRead);
176 return ErrSysInternal;
178 if (st.st_size ==0) {
179 os::closeFile(fdRead);
180 return OK;
182 void *startAddr = os::mmap(NULL, st.st_size, mapProtRead, mapPrivate, fdRead, 0);
183 if ((void*) MAP_FAILED == startAddr) {
184 printError(ErrSysInternal, "Unable to mmap stmt log file\n");
185 return ErrSysInternal;
187 sprintf(fName, "%s/csql.db.stmt1", Conf::config.getDbFile());
188 int fd = os::openFileForAppend(fName, O_CREAT|O_TRUNC);
189 char *iter = (char*)startAddr;
190 char *logStart = NULL, *logEnd = NULL;
191 int logType;
192 int stmtID;
193 int len =0, ret =0;
194 int txnID, loglen;
195 DbRetVal rv = OK;
196 HashMap stmtMap;
197 stmtMap.setKeySize(sizeof(int));
198 //PASS-I load all prepare stmts and free them
199 while(true) {
200 if (iter - (char*)startAddr >= st.st_size) break;
201 logType = *(int*)iter;
202 logStart = iter;
203 if (logType == -1) { //prepare
204 iter = iter + sizeof(int);
205 len = *(int*) iter;
206 iter = iter + 2 * sizeof(int);
207 stmtID = *(int*)iter;
208 stmtMap.insert(iter);
209 iter = logStart+ len;
210 ret =0;
212 else if(logType == -3) { //free
213 iter = iter + sizeof(int);
214 txnID = *(int*) iter; iter += sizeof(int);
215 loglen = *(int*) iter; iter += sizeof(int);
216 stmtID = *(int*)iter;
217 stmtMap.remove(iter);
218 iter = iter + sizeof(int);
219 }else{
220 printError(ErrSysInternal, "Stmt Redo log file corrupted: logType:%d", logType);
221 rv = ErrSysInternal;
222 break;
225 //PASS-II take the prepared statements which are not freed into another backup file
226 while(true) {
227 if (iter - (char*)startAddr >= st.st_size) break;
228 logType = *(int*)iter;
229 logStart = iter;
230 if (logType == -1) { //prepare
231 iter = iter + sizeof(int);
232 len = *(int*) iter;
233 iter = iter + 2 * sizeof(int);
234 stmtID = *(int*)iter;
235 iter = logStart+ len;
236 ret =0;
237 if (stmtMap.find(&stmtID))
238 ret = os::write(fd, logStart, len);
239 if (-1 == ret) {
240 printError(ErrSysInternal, "Unable to write statement logs");
243 else if(logType == -3) { //free
244 iter = logStart + 4 *sizeof(int);
245 //neglet free stmt logs in this pass
246 }else{
247 printError(ErrSysInternal, "Stmt Redo log file corrupted: logType:%d", logType);
248 rv = ErrSysInternal;
249 break;
253 os::close(fd);
254 os::munmap((char*)startAddr, st.st_size);
255 os::closeFile(fdRead);
256 stmtMap.removeAll();
257 char cmd[MAX_FILE_LEN *2];
258 sprintf(cmd, "mv %s/csql.db.stmt1 %s/csql.db.stmt",
259 Conf::config.getDbFile(), Conf::config.getDbFile());
260 ret = system(cmd);
261 return rv;
264 int Database::getCheckpointID()
266 int id=0;
267 char curCkptFile[MAX_FILE_LEN];
268 sprintf(curCkptFile, "%s/db.chkpt.cur", Conf::config.getDbFile());
269 FILE *fp = fopen(curCkptFile, "r");
270 if (NULL == fp) { setCheckpointID(0); return 0; }
271 fscanf(fp, "%d", &id);
272 fclose(fp);
273 return id;
276 void Database::setCheckpointID(int id)
278 char curCkptFile[MAX_FILE_LEN];
279 sprintf(curCkptFile, "%s/db.chkpt.cur", Conf::config.getDbFile());
280 FILE *fp = fopen(curCkptFile, "w");
281 if (NULL == fp) {
283 printError(ErrSysInternal, "Unable to set checkpointID");
284 return;
286 fprintf(fp, "%d", id);
287 logFine(Conf::logger, "Current checkpoint set to %d", id);
288 fclose(fp);
289 return;
292 //used only by the user database not the system database
293 DbRetVal Database::recoverUserDB()
295 char dataFile[MAX_FILE_LEN];
296 char cmd[MAX_FILE_LEN];
297 sprintf(dataFile, "%s/db.chkpt.data", Conf::config.getDbFile());
298 int fd = os::open(dataFile, fileOpenReadOnly, 0);
299 if (-1 == fd) { return OK; }
300 void *buf = (void *) metaData_;
301 int readbytes = read(fd, buf, Conf::config.getMaxDbSize());
302 if (readbytes == -1) { os::close(fd); return ErrOS; }
303 os::close(fd);
304 return OK;
307 //used only by the system database
308 DbRetVal Database::recoverSystemDB()
310 char mapFile[MAX_FILE_LEN];
311 sprintf(mapFile, "%s/db.chkpt.map", Conf::config.getDbFile());
312 int fd = open(mapFile, O_RDONLY);
313 if (-1 == fd) { return OK; }
314 CatalogTableTABLE cTable(this);
315 CatalogTableINDEX cIndex(this);
316 struct Object buf;
317 while (read(fd, &buf, sizeof(buf))) {
318 if (buf.type == Tbl) {
319 cTable.setChunkPtr(buf.name, buf.firstPage, buf.curPage);
321 else if (buf.type == hIdx || buf.type == tIdx) {
322 cIndex.setChunkPtr(buf.name, buf.type, buf.bucketChunk, buf.firstPage, buf.curPage);
325 return OK;