redo log changes. prepare with param will go to stmt logs and other stmts go to redo...
[csql.git] / src / sqllog / FileSend.cxx
blobf419e4148da610e413853930cbe3cf696022c31d
1 /***************************************************************************
2 * Copyright (C) 2007 by Prabakaran Thirumalai *
3 * praba_tuty@yahoo.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19 ***************************************************************************/
20 #include <os.h>
21 #include <SqlLogConnection.h>
22 #include <CSql.h>
24 FileSend::FileSend()
26 fdRedoLog = -1;
27 fdStmtLog = -1;
28 openRedoFile();
30 DbRetVal FileSend::openRedoFile()
32 if (fdRedoLog > 0) os::closeFile(fdRedoLog);
33 if (fdStmtLog > 0) os::closeFile(fdStmtLog);
34 char fileName[MAX_FILE_LEN];
35 char stmtFileName[MAX_FILE_LEN];
36 sprintf(fileName, "%s/csql.db.cur", Conf::config.getDbFile());
37 sprintf(stmtFileName, "%s/csql.db.stmt", Conf::config.getDbFile());
38 int durableMode = Conf::config.getDurableMode();
39 switch(durableMode) {
40 case 1:
41 case 2:
42 fdRedoLog = os::openFileForAppend(fileName, O_CREAT);
43 fdStmtLog = os::openFileForAppend(stmtFileName, O_CREAT);
44 break;
45 case 3:
46 fdRedoLog = os::openFileForAppend(fileName, O_CREAT|O_SYNC);
47 fdStmtLog = os::openFileForAppend(stmtFileName, O_CREAT|O_SYNC);
48 break;
49 case 4:
50 #ifdef SOLARIS
51 fdRedoLog = os::openFileForAppend(fileName, O_CREAT|O_DSYNC);
52 fdStmtLog = os::openFileForAppend(stmtFileName, O_CREAT|O_DSYNC);
53 #else
54 fdRedoLog = os::openFileForAppend(fileName, O_CREAT|O_DIRECT);
55 fdStmtLog = os::openFileForAppend(stmtFileName, O_CREAT|O_DIRECT);
56 #endif
57 break;
58 default:
59 fdRedoLog = os::openFileForAppend(fileName, O_CREAT);
60 fdStmtLog = os::openFileForAppend(stmtFileName, O_CREAT);
61 break;
63 if (-1 == fdRedoLog || -1 == fdStmtLog) {
64 printError(ErrSysInternal, "Unable to open redo log file");
65 return ErrSysInternal;
67 return OK;
70 FileSend::~FileSend() {
71 if (fdRedoLog > 0) os::closeFile(fdRedoLog);
72 if (fdStmtLog > 0) os::closeFile(fdStmtLog);
73 fdRedoLog = -1;
74 fdStmtLog = -1;
77 DbRetVal FileSend::prepare(int txnId, int stmtId, int len, char *stmt,
78 char *tblName, bool hasParam)
80 if (fdRedoLog < 0) {
81 printError(ErrBadArg, "Redo Log file not opened");
82 return ErrBadArg;
84 if (fdStmtLog < 0) {
85 printError(ErrBadArg, "Redo Stmt Log file not opened");
86 return ErrBadArg;
88 int fd =fdRedoLog;
89 if (hasParam) fd=fdStmtLog;
90 //The following structure needs strlen after stmt id for traversal in
91 //redolog file unlike msg queue structure where string is the last element
92 //and is not a continuous piece of memory.
93 int datalen = os::align(5 * sizeof(int) + len); // for len + txnId + msg type + stmtId + tableName + stmtstrlen + stmtstring
94 char *buf = (char*) malloc(datalen);
95 char *msg = buf;
96 //Note:: msg type is taken as -ve as we need to differentiate between
97 //statement id and logtype during recovery.
98 *(int*) msg = -1;
99 if (strlen(stmt) > 6 && ( strncasecmp(stmt,"CREATE", 6) == 0 || strncasecmp(stmt,"DROP", 4) == 0 ))
100 *(int*)msg = -4; //means prepare and execute the stmt
101 msg = msg+sizeof(int);
102 *(int *)msg = datalen;
103 msg = msg+sizeof(int);
104 *(int *)msg = txnId;
105 msg = msg+sizeof(int);
106 *(int *)msg = stmtId;
107 msg = msg+ sizeof(int);
108 *(int *)msg = os::align(len);
109 msg = msg+ sizeof(int);
110 msg[len-1] = '\0';
111 strcpy(msg, stmt);
112 int ret =0;
113 bool firstTime=true;
114 retry:
115 if (Conf::config.getDurableMode() != 1) {
116 ret = os::lockFile(fd);
117 if (-1 == ret) {
118 ::free(buf);
119 printError(ErrLockTimeOut,"Unable to get exclusive lock on redo log file");
120 return ErrLockTimeOut;
123 ret = os::write(fd, buf, datalen);
124 if (Conf::config.getDurableMode() != 1) {
125 os::unlockFile(fd);
127 if (-1 == ret) {
128 DbRetVal rv = openRedoFile();
129 if (hasParam) fd=fdStmtLog; else fd=fdRedoLog;
130 if (OK == rv) {
131 logFine(Conf::logger, "Reopening redo log file");
132 if(firstTime) { firstTime = false; goto retry; }
134 printError(ErrOS, "Unable to write undo log");
135 ::free(buf);
136 return ErrOS;
138 ::free(buf);
139 return OK;
142 DbRetVal FileSend::commit(int len, void *data)
144 if (fdRedoLog < 0) {
145 printError(ErrBadArg, "Redo Log file not opened");
146 return ErrBadArg;
148 char *dat=(char*)data - sizeof(int);
149 *(int*)dat = -2; //type 2->commit
150 bool firstTime = true;
151 retry:
152 if (Conf::config.getDurableMode() != 1) {
153 int ret = os::lockFile(fdRedoLog);
154 if (-1 == ret) {
155 printError(ErrLockTimeOut,"Unable to get exclusive lock on redo log file");
156 return ErrLockTimeOut;
159 int ret = os::write(fdRedoLog, dat, len+sizeof(int));
160 if (Conf::config.getDurableMode() != 1) {
161 os::unlockFile(fdRedoLog);
163 if (-1 == ret) {
164 DbRetVal rv = openRedoFile();
165 if (OK == rv) {
166 logFine(Conf::logger, "Reopening redo log file");
167 if(firstTime) { firstTime = false; goto retry; }
169 printError(ErrOS, "Unable to write undo log");
170 return ErrOS;
172 return OK;
174 DbRetVal FileSend::free(int txnId, int stmtId, bool hasParam)
176 if (fdRedoLog < 0) {
177 printError(ErrBadArg, "Redo Log file not opened");
178 return ErrBadArg;
180 if (fdStmtLog < 0) {
181 printError(ErrBadArg, "Redo Stmt Log file not opened");
182 return ErrBadArg;
184 int fd =fdRedoLog;
185 int buflen = 4 *sizeof(int);
186 char *msg = (char *) malloc(buflen);
187 char *ptr = msg;
188 *(int*)ptr = -3;
189 ptr += sizeof(int);
190 *(int *)ptr = 3 * sizeof(int); // for len + txnId + stmtId
191 ptr += sizeof(int);
192 *(int *)ptr = txnId;
193 ptr += sizeof(int);
194 *(int *)ptr = stmtId;
195 printDebug(DM_SqlLog, "stmtID sent = %d\n", *(int *)ptr);
196 bool firstTime = true;
197 retry:
198 if (Conf::config.getDurableMode() != 1) {
199 int ret = os::lockFile(fd);
200 if (-1 == ret) {
201 ::free(msg);
202 printError(ErrLockTimeOut,"Unable to get exclusive lock on redo log file");
203 return ErrLockTimeOut;
206 int ret = os::write(fd, msg, buflen);
207 if (Conf::config.getDurableMode() != 1) {
208 os::unlockFile(fd);
210 if (-1 == ret) {
211 DbRetVal rv = openRedoFile();
212 if (hasParam) fd=fdStmtLog; else fd=fdRedoLog;
213 if (OK == rv) {
214 logFine(Conf::logger, "Reopening redo log file");
215 if(firstTime) { firstTime = false; goto retry; }
217 printError(ErrOS, "Unable to write undo log");
218 ::free(msg);
219 return ErrOS;
221 if (!hasParam) {
222 //For non parameterized stmts , no need to write in stmt log
223 ::free(msg);
224 return OK;
226 fd=fdStmtLog;
227 retry1:
228 if (Conf::config.getDurableMode() != 1) {
229 int ret = os::lockFile(fd);
230 if (-1 == ret) {
231 ::free(msg);
232 printError(ErrLockTimeOut,"Unable to get exclusive lock on redo log file");
233 return ErrLockTimeOut;
237 ret = os::write(fd, msg, buflen);
238 if (Conf::config.getDurableMode() != 1) {
239 os::unlockFile(fd);
241 if (-1 == ret) {
242 DbRetVal rv = openRedoFile();
243 if (OK == rv) {
244 logFine(Conf::logger, "Reopening redo log file");
245 if(firstTime) { firstTime = false; goto retry1; }
247 printError(ErrOS, "Unable to write undo log");
248 ::free(msg);
249 return ErrOS;
251 ::free(msg);
252 return OK;
255 OfflineLog::OfflineLog()
257 fdOfflineLog = -1;
258 metadata = NULL;
259 createMetadataFile();
260 openOfflineLogFile();
263 DbRetVal OfflineLog::openOfflineLogFile()
265 char fileName[MAX_FILE_LEN];
266 int offlineLogFileSize = Conf::config.getOfflineLogFileSize();
267 fileSize = 0;
268 if (metadata == NULL) {
269 metadata = openMetadataFile();
270 if (metadata == NULL) {
271 printError(ErrOS, "Unable to open Metadata file");
272 return ErrOS;
275 char *ptr = (char *) metadata;
276 sprintf(fileName, "%s/offlineLogFile.%d",
277 Conf::config.getDbFile(), *(int *)ptr);
278 int ret = 0;
279 if ( ((ret = ::access(fileName, F_OK)) == 0) &&
280 ((fileSize = os::getFileSize(fileName)) >= offlineLogFileSize) )
281 sprintf(fileName, "%s/offlineLogFile.%d",
282 Conf::config.getDbFile(), ++(*(int *)ptr));
283 else if (ret == 0)
284 sprintf(fileName, "%s/offlineLogFile.%d", Conf::config.getDbFile(),
285 *(int *)ptr);
286 else {
287 sprintf(fileName, "%s/offlineLogFile.0", Conf::config.getDbFile());
288 *(int *) ptr = 0;
290 int durableMode = Conf::config.getDurableMode();
291 switch(durableMode) {
292 case 1:
293 case 2:
294 fdOfflineLog = os::openFileForAppend(fileName, O_CREAT);
295 break;
296 case 3:
297 fdOfflineLog = os::openFileForAppend(fileName, O_CREAT|O_SYNC);
298 break;
299 case 4:
300 #ifdef SOLARIS
301 fdOfflineLog = os::openFileForAppend(fileName, O_CREAT|O_DSYNC);
302 #else
303 fdOfflineLog = os::openFileForAppend(fileName, O_CREAT|O_DIRECT);
304 #endif
305 break;
306 default:
307 fdOfflineLog = os::openFileForAppend(fileName, O_CREAT);
308 break;
310 if (-1 == fdOfflineLog) {
311 printError(ErrSysInternal, "Unable to open redo log file");
312 return ErrSysInternal;
314 ret = msync(metadata, sizeof(int), MS_SYNC);
315 if (ret) {
316 printError(ErrOS, "Unable to sync file index to metadata file.");
317 return ErrOS;
319 return OK;
322 OfflineLog::~OfflineLog()
324 if (fdOfflineLog > 0) os::closeFile(fdOfflineLog);
325 fdOfflineLog = -1;
328 DbRetVal OfflineLog::prepare(int txnId, int stId, int len, char *stmt,
329 char*tn, bool hasParam)
331 if (fdOfflineLog < 0) return ErrBadArg;
332 DbRetVal rv = OK;
333 if (fileSize > Conf::config.getOfflineLogFileSize())
335 rv = openOfflineLogFile();
336 if (rv != OK) return rv;
338 //The following structure needs strlen after stmt id for traversal in
339 //redolog file unlike msg queue structure where string is the last element
340 //and is not a continuous piece of memory.
341 int datalen = os::align(5 * sizeof(int) + len); // for len + txnId + msg type + stmtId + tableName + stmtstrlen + stmtstring
342 char *buf = (char*) malloc(datalen);
343 char *msg = buf;
344 //Note:: msg type is taken as -ve as we need to differentiate between
345 //statement id and logtype during recovery.
346 *(int*) msg = -1;
347 if (strlen(stmt) > 6 && ( strncasecmp(stmt,"CREATE", 6) == 0 || strncasecmp(stmt,"DROP", 4) == 0 ))
348 *(int*)msg = -4; //means prepare and execute the stmt
349 msg = msg+sizeof(int);
350 *(int *)msg = datalen;
351 msg = msg+sizeof(int);
352 *(int *)msg = txnId;
353 msg = msg+sizeof(int);
354 *(int *)msg = stId;
355 msg = msg+ sizeof(int);
356 *(int *)msg = os::align(len);
357 msg = msg+ sizeof(int);
358 msg[len-1] = '\0';
359 strcpy(msg, stmt);
360 int ret =0;
361 bool firstTime=true;
362 retry:
363 if (Conf::config.getDurableMode() != 1) {
364 ret = os::lockFile(fdOfflineLog);
365 if (-1 == ret) {
366 ::free(buf);
367 printError(ErrLockTimeOut,"Unable to get exclusive lock on redo log file");
368 return ErrLockTimeOut;
371 ret = os::write(fdOfflineLog, buf, datalen);
372 if (Conf::config.getDurableMode() != 1) {
373 os::unlockFile(fdOfflineLog);
375 if (-1 == ret) {
376 DbRetVal rv = openOfflineLogFile();
377 if (OK == rv) {
378 logFine(Conf::logger, "Reopening redo log file");
379 if(firstTime) { firstTime = false; goto retry; }
381 printError(ErrOS, "Unable to write undo log");
382 ::free(buf);
383 return ErrOS;
385 ::free(buf);
386 fileSize += datalen;
387 return OK;
389 DbRetVal OfflineLog::commit(int len, void *data)
391 if (fdOfflineLog < 0) return ErrBadArg;
392 DbRetVal rv = OK;
393 if (fileSize > Conf::config.getOfflineLogFileSize())
395 rv = openOfflineLogFile();
396 if (rv != OK) return rv;
398 char *dat=(char*)data - sizeof(int);
399 *(int*)dat = -2; //type 2->commit
400 bool firstTime = true;
401 retry:
402 if (Conf::config.getDurableMode() != 1) {
403 int ret = os::lockFile(fdOfflineLog);
404 if (-1 == ret) {
405 printError(ErrLockTimeOut,"Unable to get exclusive lock on redo log file");
406 return ErrLockTimeOut;
409 int ret = os::write(fdOfflineLog, dat, len+sizeof(int));
410 if (Conf::config.getDurableMode() != 1) {
411 os::unlockFile(fdOfflineLog);
413 if (-1 == ret) {
414 DbRetVal rv = openOfflineLogFile();
415 if (OK == rv) {
416 logFine(Conf::logger, "Reopening redo log file");
417 if(firstTime) { firstTime = false; goto retry; }
419 printError(ErrOS, "Unable to write undo log");
420 return ErrOS;
422 fileSize += len+sizeof(int);
423 return OK;
425 DbRetVal OfflineLog::free(int txnId, int stId, bool hasParam)
427 if (fdOfflineLog < 0) return ErrBadArg;
428 DbRetVal rv = OK;
429 if (fileSize > Conf::config.getOfflineLogFileSize())
431 rv = openOfflineLogFile();
432 if (rv != OK) return rv;
434 int buflen = 4 *sizeof(int);
435 char *msg = (char *) malloc(buflen);
436 char *ptr = msg;
437 *(int*)ptr = -3;
438 ptr += sizeof(int);
439 *(int *)ptr = 3 * sizeof(int); // for len + txnId + stmtId
440 ptr += sizeof(int);
441 *(int *)ptr = txnId;
442 ptr += sizeof(int);
443 *(int *)ptr = stId;
444 printDebug(DM_SqlLog, "stmtID sent = %d\n", *(int *)ptr);
445 bool firstTime = false;
446 retry:
447 if (Conf::config.getDurableMode() != 1) {
448 int ret = os::lockFile(fdOfflineLog);
449 if (-1 == ret) {
450 ::free(msg);
451 printError(ErrLockTimeOut,"Unable to get exclusive lock on redo log file");
452 return ErrLockTimeOut;
455 int ret = os::write(fdOfflineLog, msg, buflen);
456 if (Conf::config.getDurableMode() != 1) {
457 os::unlockFile(fdOfflineLog);
459 if (-1 == ret) {
460 DbRetVal rv = openOfflineLogFile();
461 if (OK == rv) {
462 logFine(Conf::logger, "Reopening redo log file");
463 if(firstTime) { firstTime = false; goto retry; }
465 printError(ErrOS, "Unable to write undo log");
466 ::free(msg);
467 return ErrOS;
469 ::free(msg);
470 fileSize += buflen;
471 return OK;
474 DbRetVal OfflineLog::createMetadataFile()
476 int fd = -1;
477 char mmapFile[128];
478 sprintf(mmapFile, "%s/offlineLogFile.dat", Conf::config.getDbFile());
479 int size = sizeof(int);
480 // int for offlineLogFile index + long for committed TxnID
481 char buffer[size];
482 if (::access(mmapFile, F_OK) == 0) return OK;
483 else {
484 fd = ::open(mmapFile, O_CREAT | O_RDWR, 0664);
485 if (fd == -1) {
486 printError(ErrOS, "Unable to create '%s'file to mmap", mmapFile);
487 return ErrOS;
489 memset(buffer, 0, size);
490 int sz = ::write(fd, buffer, size);
491 if (sz != size) {
492 printError(ErrOS, "Unable to initialize mmap file %s", mmapFile);
493 return ErrOS;
495 ::close(fd);
497 return OK;
500 void *OfflineLog::openMetadataFile()
502 char mmapFile[128];
503 int size = sizeof(int) + sizeof(long);
504 sprintf(mmapFile, "%s/offlineLogFile.dat", Conf::config.getDbFile());
505 int fd = ::open(mmapFile, O_RDWR, 0666);
506 if (fd == -1) {
507 printError(ErrOS, "Unable to open Mmap file %s", mmapFile);
508 return NULL;
510 metadata = os::mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
511 if (metadata == NULL) {
512 printError(ErrOS, "Unable to map the file %s to memory", mmapFile);
513 return NULL;
515 return metadata;