cleanup
[csql.git] / src / tools / csqlserver.cxx
blobb6aec3a2bb545a19d05d7fd4d4761eb832d6af0c
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<os.h>
17 #include<CSql.h>
18 #include<SessionImpl.h>
19 #include<Debug.h>
20 #include<Process.h>
21 #include<Database.h>
22 #include<Transaction.h>
23 #include<Lock.h>
24 #include<CacheTableLoader.h>
25 #include<sys/wait.h> //TODO::move this to os.h
26 char* version = "csql-linux-i686-3.0GA";
27 int srvStop =0;
28 pid_t asyncpid=0;
29 pid_t sqlserverpid=0;
30 pid_t cachepid=0;
31 bool recoverFlag=false;
32 void dumpData();
33 SessionImpl *session = NULL;
34 static void sigTermHandler(int sig)
36 printf("Received signal %d\nStopping the server\n", sig);
37 srvStop = 1;
39 static void sigChildHandler(int sig)
41 os::signal(SIGCHLD, sigChildHandler);
42 int stat;
43 waitpid(-1, &stat, WNOHANG);
44 //TODO::move waitpid to os wrapper
47 bool checkDead(pid_t pid)
49 int ret = os::kill(pid, 0);
50 if (ret == -1) return true; else return false;
53 DbRetVal releaseAllResources(Database *sysdb, ThreadInfo *info )
55 printf("Releasing all the resources for process %d %lu\n", info->pid_, info->thrid_);
56 //recover for all the mutexes in has_
57 for (int i =0; i < MAX_MUTEX_PER_THREAD; i++)
59 if (info->has_[i] != NULL)
61 printf("Dead Procs: %d %lu holding mutex %x %s \n", info->pid_, info->thrid_, info->has_[i], info->has_[i]->name);
62 logFine(Conf::logger, "Dead Procs: %d %lu holding mutex %x %s \n", info->pid_, info->thrid_, info->has_[i], info->has_[i]->name);
63 //TODO::recovery of mutexes
64 sysdb->recoverMutex(info->has_[i]);
65 //srvStop = 1;
66 //return ErrSysFatal;
69 TransactionManager *tm = new TransactionManager();
70 LockManager *lm = new LockManager(sysdb);
71 for (int i = 0 ;i < MAX_THREADS_PER_PROCESS; i++)
73 if (info->thrTrans_[i].trans_ != NULL && info->thrTrans_[i].trans_->status_ == TransRunning)
75 printf("Rollback Transaction %x\n", info->thrTrans_[i].trans_);
76 tm->rollback(lm, info->thrTrans_[i].trans_);
77 info->thrTrans_[i].trans_->status_ = TransNotUsed;
80 info->init();
81 delete tm;
82 delete lm;
83 return OK;
86 DbRetVal cleanupDeadProcs(Database *sysdb)
88 DbRetVal rv = sysdb->getProcessTableMutex(false);
89 if (OK != rv)
91 printError(rv,"Unable to get process table mutex");
92 return rv;
94 pid_t pid;
95 pid = os::getpid();
96 pthread_t thrid = os::getthrid();
99 ThreadInfo* pInfo = sysdb->getThreadInfo(0);
100 int i=0;
101 ThreadInfo* freeSlot = NULL;
102 for (; i < Conf::config.getMaxProcs(); i++)
104 //check whether it is alive
105 if (pInfo->pid_ !=0 && checkDead(pInfo->pid_)) releaseAllResources(sysdb, pInfo);
106 pInfo++;
108 sysdb->releaseProcessTableMutex(false);
109 return OK;
113 DbRetVal logActiveProcs(Database *sysdb)
115 DbRetVal rv = sysdb->getProcessTableMutex(false);
116 if (OK != rv)
118 printError(rv,"Unable to get process table mutex");
119 return rv;
121 ThreadInfo* pInfo = sysdb->getThreadInfo(0);
122 int i=0, count =0;
123 ThreadInfo* freeSlot = NULL;
124 for (; i < Conf::config.getMaxProcs(); i++)
126 if (pInfo->pid_ !=0 ) {
127 logFine(Conf::logger, "Registered Procs: %d %lu\n", pInfo->pid_, pInfo->thrid_);
128 printf("Client process with pid %d is still registered\n", pInfo->pid_);
129 if( pInfo->pid_ != asyncpid && pInfo->pid_ != cachepid &&
130 pInfo->pid_ != sqlserverpid)
131 count++;
133 pInfo++;
135 sysdb->releaseProcessTableMutex(false);
136 if (count) return ErrSysInternal; else return OK;
138 void startCacheServer()
140 char execName[1024];
141 sprintf(execName, "%s/bin/csqlcacheserver", os::getenv("CSQL_INSTALL_ROOT"));
142 if (srvStop) return;
143 //printf("filename is %s\n", execName);
144 cachepid = os::createProcess(execName, "csqlcacheserver");
145 if (cachepid != -1)
146 printf("Cache Receiver Started\t [PID=%d]\n",cachepid);
147 return;
150 void startServiceClient()
152 char execName[1024];
153 sprintf(execName, "%s/bin/csqlsqlserver", os::getenv("CSQL_INSTALL_ROOT"));
154 //printf("filename is %s\n", execName);
155 if (srvStop) return;
156 sqlserverpid = os::createProcess(execName, "csqlsqlserver");
157 if (sqlserverpid != -1)
158 printf("Network Server Started\t [PID=%d] [PORT=%d]\n", sqlserverpid,Conf::config.getPort());
160 return;
163 void startAsyncServer()
165 char execName[1024];
166 sprintf(execName, "%s/bin/csqlasyncserver", os::getenv("CSQL_INSTALL_ROOT"));
167 //printf("filename is %s\n", execName);
168 if (srvStop) return;
169 asyncpid = os::createProcess(execName, "csqlasyncserver");
170 if (asyncpid != -1)
171 printf("Async Cache Server Started [PID=%d]\n", asyncpid);
172 return;
// Writes the command-line help text for csqlserver to stdout.
void printUsage()
{
    static const char *const usageLines[] = {
        "Usage: csqlserver [-c] [-v]\n",
        " v -> print the version.\n",
        " c -> recover all cached tables from the target database.\n",
        "Description: Start the csql server and initialize the database.\n"
    };
    const size_t lineCount = sizeof(usageLines) / sizeof(usageLines[0]);
    for (size_t idx = 0; idx < lineCount; ++idx)
        fputs(usageLines[idx], stdout);
}
184 int main(int argc, char **argv)
186 int c = 0,opt = 0;
187 char cmd[1024];
188 while ((c = getopt(argc, argv, "cv?")) != EOF)
190 switch (c)
192 case '?' : { opt = 10; break; } //print help
193 case 'c' : { opt = 1; break; } //recover all the tables from cache
194 case 'v' : { opt = 2; break; } //print version
195 default: opt=10;
198 }//while options
200 if (opt == 10) {
201 printUsage();
202 return 0;
203 }else if (opt ==2) {
204 printf("%s\n",version);
205 return 0;
207 session = new SessionImpl();
208 DbRetVal rv = session->readConfigFile();
209 if (rv != OK)
211 printf("Unable to read the configuration file \n");
212 return 1;
214 os::signal(SIGINT, sigTermHandler);
215 os::signal(SIGTERM, sigTermHandler);
216 os::signal(SIGCHLD, sigChildHandler);
217 rv = Conf::logger.startLogger(Conf::config.getLogFile(), true);
218 if (rv != OK)
220 printf("Unable to start the Conf::logger\n");
221 return 2;
223 bool isInit = true;
224 logFine(Conf::logger, "Server Started");
225 int ret = session->initSystemDatabase();
226 if (0 != ret)
228 //printf(" System Database Initialization failed\n");
229 printf("Attaching to exising database\n");
230 isInit = false;
231 delete session;
232 session = new SessionImpl();
233 ret = session->open(DBAUSER, DBAPASS);
234 if (ret !=0) {
235 printf("Unable to attach to existing database\n");
236 return 3;
239 bool end = false;
240 struct timeval timeout, tval;
241 timeout.tv_sec = 5;
242 timeout.tv_usec = 0;
243 Database* sysdb = session->getSystemDatabase();
244 recoverFlag = false;
246 GlobalUniqueID UID;
247 if (isInit) UID.create();
249 if(isInit && Conf::config.useDurability())
251 char dbRedoFileName[1024];
252 char dbChkptSchema[1024];
253 char dbChkptMap[1024];
254 char dbChkptData[1024];
255 char dbBackupFile[1024];
257 //check for check point file if present recover
258 sprintf(dbChkptSchema, "%s/db.chkpt.schema1", Conf::config.getDbFile());
259 if (FILE *file = fopen(dbChkptSchema, "r")) {
260 fclose(file);
261 sprintf(cmd, "cp -f %s %s/db.chkpt.schema", dbChkptSchema, Conf::config.getDbFile());
262 int ret = system(cmd);
263 if (ret != 0) {
264 Conf::logger.stopLogger();
265 session->destroySystemDatabase();
266 delete session;
267 return 20;
270 sprintf(dbChkptMap, "%s/db.chkpt.map1", Conf::config.getDbFile());
271 if (FILE *file = fopen(dbChkptMap, "r")) {
272 fclose(file);
273 sprintf(cmd, "cp -f %s %s/db.chkpt.map", dbChkptMap, Conf::config.getDbFile());
274 int ret = system(cmd);
275 if (ret != 0) {
276 Conf::logger.stopLogger();
277 session->destroySystemDatabase();
278 delete session;
279 return 30;
282 sprintf(dbChkptData, "%s/db.chkpt.data", Conf::config.getDbFile());
283 sprintf(dbBackupFile, "%s/db.chkpt.data1", Conf::config.getDbFile());
284 FILE *fl = NULL;
285 if (!Conf::config.useMmap() && (fl = fopen(dbBackupFile, "r"))) {
286 fclose(fl);
287 sprintf(cmd, "cp %s/db.chkpt.data1 %s", Conf::config.getDbFile(), dbChkptData);
288 int ret = system(cmd);
289 if (ret != 0) {
290 printError(ErrOS, "Unable to take backup for chkpt data file");
291 return 40;
294 if (FILE *file = fopen(dbChkptData, "r")) {
295 fclose(file);
296 int ret = system("recover");
297 if (ret != 0) {
298 printf("Recovery failed\n");
299 Conf::logger.stopLogger();
300 session->destroySystemDatabase();
301 delete session;
302 return 50;
306 //check for redo log file if present apply redo logs
307 sprintf(dbRedoFileName, "%s/csql.db.cur", Conf::config.getDbFile());
308 if (FILE *file = fopen(dbRedoFileName, "r"))
310 fclose(file);
311 int ret = system("redo -a");
312 if (ret != 0) {
313 printf("Recovery failed. Redo log file corrupted\n");
314 Conf::logger.stopLogger();
315 session->destroySystemDatabase();
316 delete session;
317 return 60;
320 // take check point at this moment
321 sprintf(dbChkptSchema, "%s/db.chkpt.schema", Conf::config.getDbFile());
322 sprintf(dbChkptMap, "%s/db.chkpt.map", Conf::config.getDbFile());
323 sprintf(dbChkptData, "%s/db.chkpt.data", Conf::config.getDbFile());
324 ret = system("checkpoint");
325 if (ret != 0) {
326 printf("Unable to create checkpoint file. Database corrupted.\n");
327 Conf::logger.stopLogger();
328 session->destroySystemDatabase();
329 delete session;
330 return 70;
332 ret = unlink(dbRedoFileName);
333 if (ret != 0) {
334 printf("Unable to delete redo log file. Delete and restart the server\n");
335 Conf::logger.stopLogger();
336 session->destroySystemDatabase();
337 delete session;
338 return 80;
342 bool isCacheReq = false, isSQLReq= false;
343 recoverFlag = true;
344 if (opt == 1 && isInit && ! Conf::config.useDurability()) {
345 if (Conf::config.useCache()) {
346 printf("Database server recovering cached tables...\n");
347 int ret = system("cachetable -R");
348 if (ret != 0) {
349 printf("Cached Tables recovery failed %d\n", ret);
350 Conf::logger.stopLogger();
351 session->destroySystemDatabase();
352 delete session;
353 return 2;
355 printf("Cached Tables recovered\n");
356 } else {
357 printf("Cache mode is not set in csql.conf. Cannot recover\n");
358 Conf::logger.stopLogger();
359 session->destroySystemDatabase();
360 delete session;
361 return 1;
364 //TODO:: kill all the child servers and restart if !isInit
366 if(Conf::config.useCsqlSqlServer()) {
367 isSQLReq = true;
368 startServiceClient();
370 if ( (Conf::config.useCache() &&
371 Conf::config.getCacheMode()==ASYNC_MODE)) {
372 startAsyncServer();
374 if (Conf::config.useCache() && Conf::config.useTwoWayCache()) {
375 isCacheReq = true;
376 startCacheServer();
378 printf("Database Server Started...\n");
380 reloop:
381 while(!srvStop)
383 tval.tv_sec = timeout.tv_sec;
384 tval.tv_usec = timeout.tv_usec;
385 os::select(0, 0, 0, 0, &tval);
387 //send signal to all the registered process..check they are alive
388 cleanupDeadProcs(sysdb);
389 if (srvStop) break;
390 //TODO::if it fails to start 5 times, exit
391 if (isCacheReq && cachepid !=0 && checkDead(cachepid))
392 startCacheServer();
394 if (logActiveProcs(sysdb) != OK) {srvStop = 0; goto reloop; }
395 if (cachepid) os::kill(cachepid, SIGTERM);
396 if(asyncpid) os::kill(asyncpid, SIGTERM);
397 if (sqlserverpid) os::kill(sqlserverpid, SIGTERM);
398 //if (recoverFlag) dumpData();
399 if (Conf::config.useDurability() && Conf::config.useMmap()) {
400 //ummap the memory
401 char *startAddr = (char *) sysdb->getMetaDataPtr();
402 msync(startAddr + Conf::config.getMaxSysDbSize(),Conf::config.getMaxDbSize(), MS_SYNC);
403 munmap(startAddr + Conf::config.getMaxSysDbSize(), Conf::config.getMaxDbSize());
405 logFine(Conf::logger, "Server Exiting");
406 printf("Server Exiting\n");
407 logFine(Conf::logger, "Server Ended");
408 UID.destroy();
409 session->destroySystemDatabase();
410 Conf::logger.stopLogger();
411 delete session;
412 return 0;
414 void dumpData()
416 char cmd[1024];
417 //TODO::TAKE exclusive lock
418 sprintf(cmd, "csqldump >%s/csql.db.chkpt.1",Conf::config.getDbFile());
419 int ret = system(cmd);
420 if (ret != 0) return;
421 sprintf(cmd, "rm -rf %s/csql.db.cur", Conf::config.getDbFile());
422 if (ret != 0) return;
423 sprintf(cmd, "mv %s/csql.db.chkpt.1 %s/csql.db.chkpt", Conf::config.getDbFile());
424 if (ret != 0) return;
425 return;