code reorganisation phase-I
[csql.git] / src / tools / csqlserver.cxx
blobe1e09ec7dd042e87829f88ff344ceebf0b193434
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<os.h>
17 #include<CSql.h>
18 #include<SessionImpl.h>
19 #include<Debug.h>
20 #include<Process.h>
21 #include<Database.h>
22 #include<Transaction.h>
23 #include<Lock.h>
24 #include<CacheTableLoader.h>
25 char* version = "csql-linux-i686-3.0GA";
26 int srvStop =0;
27 pid_t asyncpid=0;
28 pid_t sqlserverpid=0;
29 pid_t cachepid=0;
30 bool recoverFlag=false;
31 bool monitorServer= false;
32 SessionImpl *session = NULL;
33 static void sigTermHandler(int sig)
35 printf("Received signal %d\nStopping the server\n", sig);
36 srvStop = 1;
37 monitorServer=false;
39 static void sigChildHandler(int sig)
41 os::signal(SIGCHLD, sigChildHandler);
42 int stat;
43 waitpid(-1, &stat, WNOHANG);
44 //TODO::move waitpid to os wrapper
47 bool checkDead(pid_t pid)
49 int ret = os::kill(pid, 0);
50 if (ret == -1) {
51 if (errno == EPERM)
52 printError(ErrWarning, "No permission to check process %d is alive.");
53 else
54 return true;
56 return false;
59 DbRetVal releaseAllResources(Database *sysdb, ThreadInfo *info )
61 printf("Releasing all the resources for process %d %lu\n", info->pid_, info->thrid_);
62 //recover for all the mutexes in has_
63 for (int i =0; i < MAX_MUTEX_PER_THREAD; i++)
65 if (info->has_[i] != NULL)
67 printf("Dead Procs: %d %lu holding mutex %x %s \n", info->pid_, info->thrid_, info->has_[i], info->has_[i]->name);
68 logFine(Conf::logger, "Dead Procs: %d %lu holding mutex %x %s \n", info->pid_, info->thrid_, info->has_[i], info->has_[i]->name);
69 //TODO::recovery of mutexes
70 sysdb->recoverMutex(info->has_[i]);
71 //srvStop = 1;
72 //return ErrSysFatal;
75 TransactionManager *tm = new TransactionManager();
76 LockManager *lm = new LockManager(sysdb);
77 if (info->thrTrans_.trans_ != NULL && info->thrTrans_.trans_->status_ == TransRunning)
79 printf("Rollback Transaction %x\n", info->thrTrans_.trans_);
80 tm->rollback(lm, info->thrTrans_.trans_);
81 info->thrTrans_.trans_->status_ = TransNotUsed;
83 info->init();
84 delete tm;
85 delete lm;
86 return OK;
89 DbRetVal cleanupDeadProcs(Database *sysdb)
91 DbRetVal rv = sysdb->getProcessTableMutex(false);
92 if (OK != rv)
94 printError(rv,"Unable to get process table mutex");
95 return rv;
97 pid_t pid;
98 pid = os::getpid();
99 pthread_t thrid = os::getthrid();
102 ThreadInfo* tInfo = sysdb->getThreadInfo(0);
103 int i=0;
104 ThreadInfo* freeSlot = NULL;
105 for (; i < Conf::config.getMaxProcs(); i++)
107 //check whether it is alive
108 if (tInfo->pid_ !=0 && checkDead(tInfo->pid_)) releaseAllResources(sysdb, tInfo);
109 tInfo++;
111 sysdb->releaseProcessTableMutex(false);
112 return OK;
116 DbRetVal logActiveProcs(Database *sysdb)
118 DbRetVal rv = sysdb->getProcessTableMutex(false);
119 if (OK != rv)
121 printError(rv,"Unable to get process table mutex");
122 return rv;
124 ThreadInfo* tInfo = sysdb->getThreadInfo(0);
125 int i=0, count =0;
126 ThreadInfo* freeSlot = NULL;
127 for (; i < Conf::config.getMaxProcs(); i++)
129 if (tInfo->pid_ !=0 ) {
130 logFine(Conf::logger, "Registered Procs: %d %lu\n", tInfo->pid_, tInfo->thrid_);
131 printf("Client process with pid %d is still registered\n", tInfo->pid_);
132 if( tInfo->pid_ != asyncpid && tInfo->pid_ != cachepid &&
133 tInfo->pid_ != sqlserverpid)
134 count++;
136 tInfo++;
138 sysdb->releaseProcessTableMutex(false);
139 if (count) return ErrSysInternal; else return OK;
141 void startCacheServer()
143 char execName[1024];
144 sprintf(execName, "%s/bin/csqlcacheserver", os::getenv("CSQL_INSTALL_ROOT"));
145 if (srvStop) return;
146 //printf("filename is %s\n", execName);
147 cachepid = os::createProcess(execName, "csqlcacheserver");
148 if (cachepid != -1)
149 printf("Cache Receiver Started\t [PID=%d]\n",cachepid);
150 return;
153 void startServiceClient()
155 char execName[1024];
156 sprintf(execName, "%s/bin/csqlsqlserver", os::getenv("CSQL_INSTALL_ROOT"));
157 //printf("filename is %s\n", execName);
158 if (srvStop) return;
159 sqlserverpid = os::createProcess(execName, "csqlsqlserver");
160 if (sqlserverpid != -1)
161 printf("Network Server Started\t [PID=%d] [PORT=%d]\n", sqlserverpid,Conf::config.getPort());
163 return;
166 void startAsyncServer()
168 char execName[1024];
169 sprintf(execName, "%s/bin/csqlasyncserver", os::getenv("CSQL_INSTALL_ROOT"));
170 //printf("filename is %s\n", execName);
171 if (srvStop) return;
172 asyncpid = os::createProcess(execName, "csqlasyncserver");
173 if (asyncpid != -1)
174 printf("Async Cache Server Started [PID=%d]\n", asyncpid);
175 return;
// Writes the command line help for csqlserver to standard output.
void printUsage()
{
    static const char *const usageLines[] = {
        "Usage: csqlserver [-c] [-v]\n",
        " v -> print the version.\n",
        " c -> recover all cached tables from the target database.\n",
        "Description: Start the csql server and initialize the database.\n"
    };
    const size_t lineCount = sizeof(usageLines) / sizeof(usageLines[0]);

    for (size_t i = 0; i < lineCount; i++)
        fputs(usageLines[i], stdout);
}
187 int main(int argc, char **argv)
189 int c = 0,opt = 0;
190 char cmd[1024];
191 while ((c = getopt(argc, argv, "cv?")) != EOF)
193 switch (c)
195 case '?' : { opt = 10; break; } //print help
196 case 'c' : { opt = 1; break; } //recover all the tables from cache
197 case 'v' : { opt = 2; break; } //print version
198 default: opt=10;
201 }//while options
203 if (opt == 10) {
204 printUsage();
205 return 0;
206 }else if (opt ==2) {
207 printf("%s\n",version);
208 return 0;
210 session = new SessionImpl();
211 DbRetVal rv = session->readConfigFile();
212 if (rv != OK)
214 printf("Unable to read the configuration file \n");
215 return 1;
217 os::signal(SIGINT, sigTermHandler);
218 os::signal(SIGTERM, sigTermHandler);
219 os::signal(SIGCHLD, sigChildHandler);
220 rv = Conf::logger.startLogger(Conf::config.getLogFile(), true);
221 if (rv != OK)
223 printf("Unable to start the Conf::logger\n");
224 return 2;
226 bool isInit = true;
227 logFine(Conf::logger, "Server Started");
228 int ret = session->initSystemDatabase();
229 if (0 != ret)
231 //printf(" System Database Initialization failed\n");
232 printf("Attaching to exising database\n");
233 isInit = false;
234 delete session;
235 session = new SessionImpl();
236 ret = session->open(DBAUSER, DBAPASS);
237 if (ret !=0) {
238 printf("Unable to attach to existing database\n");
239 return 3;
242 bool end = false;
243 struct timeval timeout, tval;
244 timeout.tv_sec = 5;
245 timeout.tv_usec = 0;
246 Database* sysdb = session->getSystemDatabase();
247 recoverFlag = false;
249 GlobalUniqueID UID;
250 if (isInit) UID.create();
252 if(isInit && Conf::config.useDurability())
254 char dbRedoFileName[1024];
255 char dbChkptSchema[1024];
256 char dbChkptMap[1024];
257 char dbChkptData[1024];
258 char dbBackupFile[1024];
260 //check for check point file if present recover
261 sprintf(dbChkptSchema, "%s/db.chkpt.schema1", Conf::config.getDbFile());
262 if (FILE *file = fopen(dbChkptSchema, "r")) {
263 fclose(file);
264 sprintf(cmd, "cp -f %s %s/db.chkpt.schema", dbChkptSchema, Conf::config.getDbFile());
265 int ret = system(cmd);
266 if (ret != 0) {
267 Conf::logger.stopLogger();
268 session->destroySystemDatabase();
269 delete session;
270 return 20;
273 sprintf(dbChkptMap, "%s/db.chkpt.map1", Conf::config.getDbFile());
274 if (FILE *file = fopen(dbChkptMap, "r")) {
275 fclose(file);
276 sprintf(cmd, "cp -f %s %s/db.chkpt.map", dbChkptMap, Conf::config.getDbFile());
277 int ret = system(cmd);
278 if (ret != 0) {
279 Conf::logger.stopLogger();
280 session->destroySystemDatabase();
281 delete session;
282 return 30;
285 sprintf(dbChkptData, "%s/db.chkpt.data", Conf::config.getDbFile());
286 sprintf(dbBackupFile, "%s/db.chkpt.data1", Conf::config.getDbFile());
287 FILE *fl = NULL;
288 if (!Conf::config.useMmap() && (fl = fopen(dbBackupFile, "r"))) {
289 fclose(fl);
290 sprintf(cmd, "cp %s/db.chkpt.data1 %s", Conf::config.getDbFile(), dbChkptData);
291 int ret = system(cmd);
292 if (ret != 0) {
293 printError(ErrOS, "Unable to take backup for chkpt data file");
294 return 40;
297 if (FILE *file = fopen(dbChkptData, "r")) {
298 fclose(file);
299 int ret = system("recover");
300 if (ret != 0) {
301 printf("Recovery failed\n");
302 Conf::logger.stopLogger();
303 session->destroySystemDatabase();
304 delete session;
305 return 50;
309 //check for redo log file if present apply redo logs
310 sprintf(dbRedoFileName, "%s/csql.db.cur", Conf::config.getDbFile());
311 if (FILE *file = fopen(dbRedoFileName, "r"))
313 fclose(file);
314 int ret = system("redo -a");
315 if (ret != 0) {
316 printf("Recovery failed. Redo log file corrupted\n");
317 Conf::logger.stopLogger();
318 session->destroySystemDatabase();
319 delete session;
320 return 60;
323 // take check point at this moment
324 sprintf(dbChkptSchema, "%s/db.chkpt.schema", Conf::config.getDbFile());
325 sprintf(dbChkptMap, "%s/db.chkpt.map", Conf::config.getDbFile());
326 sprintf(dbChkptData, "%s/db.chkpt.data", Conf::config.getDbFile());
327 ret = system("checkpoint");
328 if (ret != 0) {
329 printf("Unable to create checkpoint file. Database corrupted.\n");
330 Conf::logger.stopLogger();
331 session->destroySystemDatabase();
332 delete session;
333 return 70;
335 ret = unlink(dbRedoFileName);
336 if (ret != 0) {
337 printf("Unable to delete redo log file. Delete and restart the server\n");
338 Conf::logger.stopLogger();
339 session->destroySystemDatabase();
340 delete session;
341 return 80;
345 bool isCacheReq = false, isSQLReq= false, isAsyncReq=false;
346 recoverFlag = true;
347 if (opt == 1 && isInit && ! Conf::config.useDurability()) {
348 if (Conf::config.useCache()) {
349 printf("Database server recovering cached tables...\n");
350 int ret = system("cachetable -R");
351 if (ret != 0) {
352 printf("Cached Tables recovery failed %d\n", ret);
353 Conf::logger.stopLogger();
354 session->destroySystemDatabase();
355 delete session;
356 return 2;
358 printf("Cached Tables recovered\n");
359 } else {
360 printf("Cache mode is not set in csql.conf. Cannot recover\n");
361 Conf::logger.stopLogger();
362 session->destroySystemDatabase();
363 delete session;
364 return 1;
367 //TODO:: kill all the child servers and restart if !isInit
369 if(Conf::config.useCsqlSqlServer()) {
370 isSQLReq = true;
371 startServiceClient();
373 if ( (Conf::config.useCache() &&
374 Conf::config.getCacheMode()==ASYNC_MODE)) {
375 int msgid = os::msgget(Conf::config.getMsgKey(), 0666);
376 if (msgid != -1) os::msgctl(msgid, IPC_RMID, NULL);
377 isAsyncReq = true;
378 startAsyncServer();
380 if (Conf::config.useCache() && Conf::config.useTwoWayCache()) {
381 isCacheReq = true;
382 startCacheServer();
384 printf("Database Server Started...\n");
385 logFine(Conf::logger, "Database Server Started");
386 monitorServer= Conf::config.useMonitorServers();
388 reloop:
389 while(!srvStop)
391 tval.tv_sec = timeout.tv_sec;
392 tval.tv_usec = timeout.tv_usec;
393 os::select(0, 0, 0, 0, &tval);
395 //send signal to all the registered process..check they are alive
396 cleanupDeadProcs(sysdb);
397 if (srvStop) break;
398 if (monitorServer) {
399 if (isCacheReq && cachepid !=0 && checkDead(cachepid)) {
400 logFine(Conf::logger, "Cache Receiver Died pid:%d", cachepid);
401 startCacheServer();
403 if (isAsyncReq && asyncpid !=0 && checkDead(asyncpid)) {
404 logFine(Conf::logger, "Async Server Died pid:%d", asyncpid);
405 int msgid = os::msgget(Conf::config.getMsgKey(), 0666);
406 if (msgid != -1) os::msgctl(msgid, IPC_RMID, NULL);
407 startAsyncServer();
409 if (isSQLReq && sqlserverpid !=0 && checkDead(sqlserverpid)) {
410 logFine(Conf::logger, "Network Server Died pid:%d", sqlserverpid);
411 os::sleep(5);
412 startServiceClient();
417 if (logActiveProcs(sysdb) != OK) {srvStop = 0;
418 monitorServer= Conf::config.useMonitorServers();
419 goto reloop;
421 if (cachepid) os::kill(cachepid, SIGTERM);
422 if(asyncpid) os::kill(asyncpid, SIGTERM);
423 if (sqlserverpid) os::kill(sqlserverpid, SIGTERM);
424 if (Conf::config.useDurability() && Conf::config.useMmap()) {
425 //ummap the memory
426 char *startAddr = (char *) sysdb->getMetaDataPtr();
427 msync(startAddr + Conf::config.getMaxSysDbSize(),Conf::config.getMaxDbSize(), MS_SYNC);
428 munmap(startAddr + Conf::config.getMaxSysDbSize(), Conf::config.getMaxDbSize());
430 logFine(Conf::logger, "Server Exiting");
431 printf("Server Exiting\n");
432 logFine(Conf::logger, "Server Ended");
433 UID.destroy();
434 session->destroySystemDatabase();
435 Conf::logger.stopLogger();
436 delete session;
437 return 0;