Remove the message queue before the async server is started.
[csql.git] / src / tools / csqlserver.cxx
blob7b90184128e5bcea5a4cb08f6b9cdff2051153ac
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
#include<os.h>
#include<CSql.h>
#include<SessionImpl.h>
#include<Debug.h>
#include<Process.h>
#include<Database.h>
#include<Transaction.h>
#include<Lock.h>
#include<CacheTableLoader.h>
#include<sys/wait.h> //TODO::move this to os.h
#include<errno.h>
#include<signal.h>
#include<stdio.h>
#include<unistd.h>
// Version string printed by "csqlserver -v". A string literal is const, so
// the pointer must be to const char (binding it to char* is ill-formed C++11).
const char* version = "csql-linux-i686-3.0GA";
// Shutdown request flag: set by sigTermHandler, polled by main's supervision
// loop. volatile sig_atomic_t is the type the C/C++ standards sanction for a
// static object written from a signal handler.
volatile sig_atomic_t srvStop = 0;
// PIDs of the child servers launched by this process; 0 until a launch is
// attempted.
pid_t asyncpid = 0;
pid_t sqlserverpid = 0;
pid_t cachepid = 0;
bool recoverFlag = false;
void dumpData();
33 SessionImpl *session = NULL;
34 static void sigTermHandler(int sig)
36 printf("Received signal %d\nStopping the server\n", sig);
37 srvStop = 1;
39 static void sigChildHandler(int sig)
41 os::signal(SIGCHLD, sigChildHandler);
42 int stat;
43 waitpid(-1, &stat, WNOHANG);
44 //TODO::move waitpid to os wrapper
47 bool checkDead(pid_t pid)
49 int ret = os::kill(pid, 0);
50 if (ret == -1) {
51 if (errno == EPERM)
52 printError(ErrWarning, "No permission to check process %d is alive.");
53 else
54 return true;
56 return false;
59 DbRetVal releaseAllResources(Database *sysdb, ThreadInfo *info )
61 printf("Releasing all the resources for process %d %lu\n", info->pid_, info->thrid_);
62 //recover for all the mutexes in has_
63 for (int i =0; i < MAX_MUTEX_PER_THREAD; i++)
65 if (info->has_[i] != NULL)
67 printf("Dead Procs: %d %lu holding mutex %x %s \n", info->pid_, info->thrid_, info->has_[i], info->has_[i]->name);
68 logFine(Conf::logger, "Dead Procs: %d %lu holding mutex %x %s \n", info->pid_, info->thrid_, info->has_[i], info->has_[i]->name);
69 //TODO::recovery of mutexes
70 sysdb->recoverMutex(info->has_[i]);
71 //srvStop = 1;
72 //return ErrSysFatal;
75 TransactionManager *tm = new TransactionManager();
76 LockManager *lm = new LockManager(sysdb);
77 for (int i = 0 ;i < MAX_THREADS_PER_PROCESS; i++)
79 if (info->thrTrans_[i].trans_ != NULL && info->thrTrans_[i].trans_->status_ == TransRunning)
81 printf("Rollback Transaction %x\n", info->thrTrans_[i].trans_);
82 tm->rollback(lm, info->thrTrans_[i].trans_);
83 info->thrTrans_[i].trans_->status_ = TransNotUsed;
86 info->init();
87 delete tm;
88 delete lm;
89 return OK;
92 DbRetVal cleanupDeadProcs(Database *sysdb)
94 DbRetVal rv = sysdb->getProcessTableMutex(false);
95 if (OK != rv)
97 printError(rv,"Unable to get process table mutex");
98 return rv;
100 pid_t pid;
101 pid = os::getpid();
102 pthread_t thrid = os::getthrid();
105 ThreadInfo* pInfo = sysdb->getThreadInfo(0);
106 int i=0;
107 ThreadInfo* freeSlot = NULL;
108 for (; i < Conf::config.getMaxProcs(); i++)
110 //check whether it is alive
111 if (pInfo->pid_ !=0 && checkDead(pInfo->pid_)) releaseAllResources(sysdb, pInfo);
112 pInfo++;
114 sysdb->releaseProcessTableMutex(false);
115 return OK;
119 DbRetVal logActiveProcs(Database *sysdb)
121 DbRetVal rv = sysdb->getProcessTableMutex(false);
122 if (OK != rv)
124 printError(rv,"Unable to get process table mutex");
125 return rv;
127 ThreadInfo* pInfo = sysdb->getThreadInfo(0);
128 int i=0, count =0;
129 ThreadInfo* freeSlot = NULL;
130 for (; i < Conf::config.getMaxProcs(); i++)
132 if (pInfo->pid_ !=0 ) {
133 logFine(Conf::logger, "Registered Procs: %d %lu\n", pInfo->pid_, pInfo->thrid_);
134 printf("Client process with pid %d is still registered\n", pInfo->pid_);
135 if( pInfo->pid_ != asyncpid && pInfo->pid_ != cachepid &&
136 pInfo->pid_ != sqlserverpid)
137 count++;
139 pInfo++;
141 sysdb->releaseProcessTableMutex(false);
142 if (count) return ErrSysInternal; else return OK;
144 void startCacheServer()
146 char execName[1024];
147 sprintf(execName, "%s/bin/csqlcacheserver", os::getenv("CSQL_INSTALL_ROOT"));
148 if (srvStop) return;
149 //printf("filename is %s\n", execName);
150 cachepid = os::createProcess(execName, "csqlcacheserver");
151 if (cachepid != -1)
152 printf("Cache Receiver Started\t [PID=%d]\n",cachepid);
153 return;
156 void startServiceClient()
158 char execName[1024];
159 sprintf(execName, "%s/bin/csqlsqlserver", os::getenv("CSQL_INSTALL_ROOT"));
160 //printf("filename is %s\n", execName);
161 if (srvStop) return;
162 sqlserverpid = os::createProcess(execName, "csqlsqlserver");
163 if (sqlserverpid != -1)
164 printf("Network Server Started\t [PID=%d] [PORT=%d]\n", sqlserverpid,Conf::config.getPort());
166 return;
169 void startAsyncServer()
171 char execName[1024];
172 sprintf(execName, "%s/bin/csqlasyncserver", os::getenv("CSQL_INSTALL_ROOT"));
173 //printf("filename is %s\n", execName);
174 if (srvStop) return;
175 asyncpid = os::createProcess(execName, "csqlasyncserver");
176 if (asyncpid != -1)
177 printf("Async Cache Server Started [PID=%d]\n", asyncpid);
178 return;
// Writes the csqlserver command-line synopsis and option help to stdout.
void printUsage()
{
    static const char usageText[] =
        "Usage: csqlserver [-c] [-v]\n"
        " v -> print the version.\n"
        " c -> recover all cached tables from the target database.\n"
        "Description: Start the csql server and initialize the database.\n";
    fputs(usageText, stdout);
}
190 int main(int argc, char **argv)
192 int c = 0,opt = 0;
193 char cmd[1024];
194 while ((c = getopt(argc, argv, "cv?")) != EOF)
196 switch (c)
198 case '?' : { opt = 10; break; } //print help
199 case 'c' : { opt = 1; break; } //recover all the tables from cache
200 case 'v' : { opt = 2; break; } //print version
201 default: opt=10;
204 }//while options
206 if (opt == 10) {
207 printUsage();
208 return 0;
209 }else if (opt ==2) {
210 printf("%s\n",version);
211 return 0;
213 session = new SessionImpl();
214 DbRetVal rv = session->readConfigFile();
215 if (rv != OK)
217 printf("Unable to read the configuration file \n");
218 return 1;
220 os::signal(SIGINT, sigTermHandler);
221 os::signal(SIGTERM, sigTermHandler);
222 os::signal(SIGCHLD, sigChildHandler);
223 rv = Conf::logger.startLogger(Conf::config.getLogFile(), true);
224 if (rv != OK)
226 printf("Unable to start the Conf::logger\n");
227 return 2;
229 bool isInit = true;
230 logFine(Conf::logger, "Server Started");
231 int ret = session->initSystemDatabase();
232 if (0 != ret)
234 //printf(" System Database Initialization failed\n");
235 printf("Attaching to exising database\n");
236 isInit = false;
237 delete session;
238 session = new SessionImpl();
239 ret = session->open(DBAUSER, DBAPASS);
240 if (ret !=0) {
241 printf("Unable to attach to existing database\n");
242 return 3;
245 bool end = false;
246 struct timeval timeout, tval;
247 timeout.tv_sec = 5;
248 timeout.tv_usec = 0;
249 Database* sysdb = session->getSystemDatabase();
250 recoverFlag = false;
252 GlobalUniqueID UID;
253 if (isInit) UID.create();
255 if(isInit && Conf::config.useDurability())
257 char dbRedoFileName[1024];
258 char dbChkptSchema[1024];
259 char dbChkptMap[1024];
260 char dbChkptData[1024];
261 char dbBackupFile[1024];
263 //check for check point file if present recover
264 sprintf(dbChkptSchema, "%s/db.chkpt.schema1", Conf::config.getDbFile());
265 if (FILE *file = fopen(dbChkptSchema, "r")) {
266 fclose(file);
267 sprintf(cmd, "cp -f %s %s/db.chkpt.schema", dbChkptSchema, Conf::config.getDbFile());
268 int ret = system(cmd);
269 if (ret != 0) {
270 Conf::logger.stopLogger();
271 session->destroySystemDatabase();
272 delete session;
273 return 20;
276 sprintf(dbChkptMap, "%s/db.chkpt.map1", Conf::config.getDbFile());
277 if (FILE *file = fopen(dbChkptMap, "r")) {
278 fclose(file);
279 sprintf(cmd, "cp -f %s %s/db.chkpt.map", dbChkptMap, Conf::config.getDbFile());
280 int ret = system(cmd);
281 if (ret != 0) {
282 Conf::logger.stopLogger();
283 session->destroySystemDatabase();
284 delete session;
285 return 30;
288 sprintf(dbChkptData, "%s/db.chkpt.data", Conf::config.getDbFile());
289 sprintf(dbBackupFile, "%s/db.chkpt.data1", Conf::config.getDbFile());
290 FILE *fl = NULL;
291 if (!Conf::config.useMmap() && (fl = fopen(dbBackupFile, "r"))) {
292 fclose(fl);
293 sprintf(cmd, "cp %s/db.chkpt.data1 %s", Conf::config.getDbFile(), dbChkptData);
294 int ret = system(cmd);
295 if (ret != 0) {
296 printError(ErrOS, "Unable to take backup for chkpt data file");
297 return 40;
300 if (FILE *file = fopen(dbChkptData, "r")) {
301 fclose(file);
302 int ret = system("recover");
303 if (ret != 0) {
304 printf("Recovery failed\n");
305 Conf::logger.stopLogger();
306 session->destroySystemDatabase();
307 delete session;
308 return 50;
312 //check for redo log file if present apply redo logs
313 sprintf(dbRedoFileName, "%s/csql.db.cur", Conf::config.getDbFile());
314 if (FILE *file = fopen(dbRedoFileName, "r"))
316 fclose(file);
317 int ret = system("redo -a");
318 if (ret != 0) {
319 printf("Recovery failed. Redo log file corrupted\n");
320 Conf::logger.stopLogger();
321 session->destroySystemDatabase();
322 delete session;
323 return 60;
326 // take check point at this moment
327 sprintf(dbChkptSchema, "%s/db.chkpt.schema", Conf::config.getDbFile());
328 sprintf(dbChkptMap, "%s/db.chkpt.map", Conf::config.getDbFile());
329 sprintf(dbChkptData, "%s/db.chkpt.data", Conf::config.getDbFile());
330 ret = system("checkpoint");
331 if (ret != 0) {
332 printf("Unable to create checkpoint file. Database corrupted.\n");
333 Conf::logger.stopLogger();
334 session->destroySystemDatabase();
335 delete session;
336 return 70;
338 ret = unlink(dbRedoFileName);
339 if (ret != 0) {
340 printf("Unable to delete redo log file. Delete and restart the server\n");
341 Conf::logger.stopLogger();
342 session->destroySystemDatabase();
343 delete session;
344 return 80;
348 bool isCacheReq = false, isSQLReq= false;
349 recoverFlag = true;
350 if (opt == 1 && isInit && ! Conf::config.useDurability()) {
351 if (Conf::config.useCache()) {
352 printf("Database server recovering cached tables...\n");
353 int ret = system("cachetable -R");
354 if (ret != 0) {
355 printf("Cached Tables recovery failed %d\n", ret);
356 Conf::logger.stopLogger();
357 session->destroySystemDatabase();
358 delete session;
359 return 2;
361 printf("Cached Tables recovered\n");
362 } else {
363 printf("Cache mode is not set in csql.conf. Cannot recover\n");
364 Conf::logger.stopLogger();
365 session->destroySystemDatabase();
366 delete session;
367 return 1;
370 //TODO:: kill all the child servers and restart if !isInit
372 if(Conf::config.useCsqlSqlServer()) {
373 isSQLReq = true;
374 startServiceClient();
376 if ( (Conf::config.useCache() &&
377 Conf::config.getCacheMode()==ASYNC_MODE)) {
378 int msgid = os::msgget(Conf::config.getMsgKey(), 0666);
379 if (msgid != -1) os::msgctl(msgid, IPC_RMID, NULL);
380 startAsyncServer();
382 if (Conf::config.useCache() && Conf::config.useTwoWayCache()) {
383 isCacheReq = true;
384 startCacheServer();
386 printf("Database Server Started...\n");
388 reloop:
389 while(!srvStop)
391 tval.tv_sec = timeout.tv_sec;
392 tval.tv_usec = timeout.tv_usec;
393 os::select(0, 0, 0, 0, &tval);
395 //send signal to all the registered process..check they are alive
396 cleanupDeadProcs(sysdb);
397 if (srvStop) break;
398 //TODO::if it fails to start 5 times, exit
399 if (isCacheReq && cachepid !=0 && checkDead(cachepid))
400 startCacheServer();
402 if (logActiveProcs(sysdb) != OK) {srvStop = 0; goto reloop; }
403 if (cachepid) os::kill(cachepid, SIGTERM);
404 if(asyncpid) os::kill(asyncpid, SIGTERM);
405 if (sqlserverpid) os::kill(sqlserverpid, SIGTERM);
406 //if (recoverFlag) dumpData();
407 if (Conf::config.useDurability() && Conf::config.useMmap()) {
408 //ummap the memory
409 char *startAddr = (char *) sysdb->getMetaDataPtr();
410 msync(startAddr + Conf::config.getMaxSysDbSize(),Conf::config.getMaxDbSize(), MS_SYNC);
411 munmap(startAddr + Conf::config.getMaxSysDbSize(), Conf::config.getMaxDbSize());
413 logFine(Conf::logger, "Server Exiting");
414 printf("Server Exiting\n");
415 logFine(Conf::logger, "Server Ended");
416 UID.destroy();
417 session->destroySystemDatabase();
418 Conf::logger.stopLogger();
419 delete session;
420 return 0;
422 void dumpData()
424 char cmd[1024];
425 //TODO::TAKE exclusive lock
426 sprintf(cmd, "csqldump >%s/csql.db.chkpt.1",Conf::config.getDbFile());
427 int ret = system(cmd);
428 if (ret != 0) return;
429 sprintf(cmd, "rm -rf %s/csql.db.cur", Conf::config.getDbFile());
430 if (ret != 0) return;
431 sprintf(cmd, "mv %s/csql.db.chkpt.1 %s/csql.db.chkpt", Conf::config.getDbFile());
432 if (ret != 0) return;
433 return;