windows changes
[csql.git] / src / tools / csqlserver.cxx
blob597c88f48905106e93dde01999fc88caa109cdde
1 /***************************************************************************
2 * Copyright (C) 2007 by www.databasecache.com *
3 * Contact: praba_tuty@databasecache.com *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 ***************************************************************************/
16 #include<os.h>
17 #include<CSql.h>
18 #include<SessionImpl.h>
19 #include<Debug.h>
20 #include<Process.h>
21 #include<Database.h>
22 #include<Transaction.h>
23 #include<Lock.h>
24 #include<CacheTableLoader.h>
25 char* version = "csql-linux-i686-3.0GA";
26 int srvStop =0;
27 pid_t asyncpid=0;
28 pid_t sqlserverpid=0;
29 pid_t cachepid=0;
30 pid_t chkptpid=0;
31 bool recoverFlag=false;
32 bool monitorServer= false;
33 SessionImpl *session = NULL;
34 static void sigTermHandler(int sig)
36 printf("Received signal %d\nStopping the server\n", sig);
37 srvStop = 1;
38 monitorServer=false;
40 static void sigChildHandler(int sig)
42 os::signal(SIGCHLD, sigChildHandler);
43 int stat;
44 waitpid(-1, &stat, WNOHANG);
45 //TODO::move waitpid to os wrapper
48 bool checkDead(pid_t pid)
50 int ret = os::kill(pid, 0);
51 if (ret == -1) {
52 if (errno == EPERM)
53 printError(ErrWarning, "No permission to check process %d is alive.");
54 else
55 return true;
57 return false;
60 DbRetVal releaseAllResources(Database *sysdb, ThreadInfo *info )
62 printf("Releasing all the resources for process %d %lu\n", info->pid_, info->thrid_);
63 //recover for all the mutexes in has_
64 for (int i =0; i < MAX_MUTEX_PER_THREAD; i++)
66 if (info->has_[i] != NULL)
68 printf("Dead Procs: %d %lu holding mutex %x %s \n", info->pid_, info->thrid_, info->has_[i], info->has_[i]->name);
69 logFine(Conf::logger, "Dead Procs: %d %lu holding mutex %x %s \n", info->pid_, info->thrid_, info->has_[i], info->has_[i]->name);
70 //TODO::recovery of mutexes
71 sysdb->recoverMutex(info->has_[i]);
72 //srvStop = 1;
73 //return ErrSysFatal;
76 TransactionManager *tm = new TransactionManager();
77 LockManager *lm = new LockManager(sysdb);
78 if (info->thrTrans_.trans_ != NULL && info->thrTrans_.trans_->status_ == TransRunning)
80 printf("Rollback Transaction %x\n", info->thrTrans_.trans_);
81 tm->rollback(lm, info->thrTrans_.trans_);
82 info->thrTrans_.trans_->status_ = TransNotUsed;
84 info->init();
85 delete tm;
86 delete lm;
87 return OK;
90 DbRetVal cleanupDeadProcs(Database *sysdb)
92 DbRetVal rv = sysdb->getProcessTableMutex(false);
93 if (OK != rv)
95 printError(rv,"Unable to get process table mutex");
96 return rv;
98 pid_t pid;
99 pid = os::getpid();
100 pthread_t thrid = os::getthrid();
103 ThreadInfo* tInfo = sysdb->getThreadInfo(0);
104 int i=0;
105 ThreadInfo* freeSlot = NULL;
106 for (; i < Conf::config.getMaxProcs(); i++)
108 //check whether it is alive
109 if (tInfo->pid_ !=0 && checkDead(tInfo->pid_)) releaseAllResources(sysdb, tInfo);
110 tInfo++;
112 sysdb->releaseProcessTableMutex(false);
113 return OK;
117 DbRetVal logActiveProcs(Database *sysdb)
119 DbRetVal rv = sysdb->getProcessTableMutex(false);
120 if (OK != rv)
122 printError(rv,"Unable to get process table mutex");
123 return rv;
125 ThreadInfo* tInfo = sysdb->getThreadInfo(0);
126 int i=0, count =0;
127 ThreadInfo* freeSlot = NULL;
128 for (; i < Conf::config.getMaxProcs(); i++)
130 if (tInfo->pid_ !=0 ) {
131 logFine(Conf::logger, "Registered Procs: %d %lu\n", tInfo->pid_, tInfo->thrid_);
132 printf("Client process with pid %d is still registered\n", tInfo->pid_);
133 if( tInfo->pid_ != asyncpid && tInfo->pid_ != cachepid &&
134 tInfo->pid_ != sqlserverpid)
135 count++;
137 tInfo++;
139 sysdb->releaseProcessTableMutex(false);
140 if (count) return ErrSysInternal; else return OK;
142 void startCacheServer()
144 char execName[1024];
145 sprintf(execName, "%s/bin/csqlcacheserver", os::getenv("CSQL_INSTALL_ROOT"));
146 if (srvStop) return;
147 //printf("filename is %s\n", execName);
148 cachepid = os::createProcess(execName, "csqlcacheserver");
149 if (cachepid != -1)
150 printf("Cache Receiver Started\t [PID=%d]\n",cachepid);
151 return;
154 void startServiceClient()
156 char execName[1024];
157 sprintf(execName, "%s/bin/csqlsqlserver", os::getenv("CSQL_INSTALL_ROOT"));
158 //printf("filename is %s\n", execName);
159 if (srvStop) return;
160 sqlserverpid = os::createProcess(execName, "csqlsqlserver");
161 if (sqlserverpid != -1)
162 printf("Network Server Started\t [PID=%d] [PORT=%d]\n", sqlserverpid,Conf::config.getPort());
164 return;
167 void startAsyncServer()
169 char execName[1024];
170 sprintf(execName, "%s/bin/csqlasyncserver", os::getenv("CSQL_INSTALL_ROOT"));
171 //printf("filename is %s\n", execName);
172 if (srvStop) return;
173 asyncpid = os::createProcess(execName, "csqlasyncserver");
174 if (asyncpid != -1)
175 printf("Async Cache Server Started [PID=%d]\n", asyncpid);
176 return;
178 void startCheckpointServer()
180 char execName[1024];
181 sprintf(execName, "%s/bin/csqlcheckpointserver", os::getenv("CSQL_INSTALL_ROOT"));
182 if (srvStop) return;
183 chkptpid = os::createProcess(execName, "csqlcheckpointserver");
184 if (chkptpid != -1) {
185 printf("Checkpoint Server Started [PID=%d]\n", chkptpid);
186 logFine(Conf::logger, "Checkpoint Server Started pid:%d", chkptpid);
188 return;
191 int recoverAndCheckPoint()
193 char dbRedoFileName[MAX_FILE_LEN];
194 char dbChkptSchema[MAX_FILE_LEN];
195 char dbChkptMap[MAX_FILE_LEN];
196 char dbChkptData[MAX_FILE_LEN];
197 char dbBackupFile[MAX_FILE_LEN];
198 char cmd[1024];
200 //check for check point file if present recover
201 sprintf(dbChkptSchema, "%s/db.chkpt.schema1", Conf::config.getDbFile());
202 if (FILE *file = fopen(dbChkptSchema, "r")) {
203 fclose(file);
204 sprintf(cmd, "cp -f %s %s/db.chkpt.schema", dbChkptSchema,
205 Conf::config.getDbFile());
206 int ret = system(cmd);
207 if (ret != 0) {
208 printError(ErrOS, "backup schema file: Recovery failed.");
209 return 1;
213 sprintf(dbChkptMap, "%s/db.chkpt.map1", Conf::config.getDbFile());
214 if (FILE *file = fopen(dbChkptMap, "r")) {
215 fclose(file);
216 sprintf(cmd, "cp -f %s %s/db.chkpt.map", dbChkptMap,
217 Conf::config.getDbFile());
218 int ret = system(cmd);
219 if (ret != 0) {
220 printError(ErrOS, "backup map file: Recovery failed.");
221 return 2;
225 int chkptID= Database::getCheckpointID();
226 sprintf(dbChkptData, "%s/db.chkpt.data%d", Conf::config.getDbFile(),
227 chkptID);
228 sprintf(dbBackupFile, "%s/db.chkpt.data1", Conf::config.getDbFile());
230 FILE *fl = NULL;
231 if (!Conf::config.useMmap() && (fl = fopen(dbBackupFile, "r"))) {
232 fclose(fl);
233 sprintf(cmd, "cp -f %s/db.chkpt.data1 %s", Conf::config.getDbFile(),
234 dbChkptData);
235 int ret = system(cmd);
236 if (ret != 0) {
237 printError(ErrOS, "backup data file. Recovery failed.");
238 return 3;
241 if (FILE *file = fopen(dbChkptData, "r")) {
242 fclose(file);
243 int ret = system("recover");
244 if (ret != 0) {
245 printError(ErrSysInternal, "recover: Recovery failed\n");
246 return 4;
250 //check for redo log file if present apply redo logs
251 sprintf(dbRedoFileName, "%s/csql.db.cur", Conf::config.getDbFile());
252 if (FILE *file = fopen(dbRedoFileName, "r")) {
253 fclose(file);
254 int ret = system("redo -a");
255 if (ret != 0) {
256 printError(ErrSysInternal, "redo: Recovery failed.\n");
257 return 5;
259 DatabaseManager *dbMgr = session->getDatabaseManager();
260 DbRetVal rv = dbMgr->checkPoint();
261 if (rv != OK) {
262 printError(ErrSysInternal,"checkpoint: Recovery failed.");
263 return 6;
266 return 0;
269 DbRetVal recoverCachedTables()
271 printf("Database server recovering cached tables...\n");
272 logFine(Conf::logger, "Recovering Cached tables");
273 int ret = system("cachetable -R");
274 if (ret != 0) {
275 printError(ErrSysInternal, "cachetable: Recovery failed %d\n", ret);
276 return ErrSysInternal;
278 printf("Cached Tables recovered\n");
279 logFine(Conf::logger, "Cache Tables Recovery Complete");
/*
 * Writes the command-line help text for csqlserver to stdout.
 */
void printUsage()
{
    // One entry per output line; none of them contains a conversion
    // specifier, so each is emitted verbatim.
    static const char *const helpText[] = {
        "Usage: csqlserver [-c] [-v]\n",
        " v -> print the version.\n",
        " c -> recover all cached tables from the target database.\n",
        "Description: Start the csql server and initialize the database.\n"
    };
    const unsigned int lineCount = sizeof(helpText) / sizeof(helpText[0]);
    for (unsigned int idx = 0; idx < lineCount; ++idx)
        printf("%s", helpText[idx]);
}
291 int main(int argc, char **argv)
293 int c = 0, opt = 0;
294 bool freshStart = false;
295 while ((c = getopt(argc, argv, "cvi?")) != EOF)
297 switch (c)
299 case '?' : { opt = 10; break; } //print help
300 case 'c' : { opt = 1; break; } //recover all the tables from cache
301 case 'v' : { opt = 2; break; } //print version
302 case 'i' : { freshStart = true; break; }
303 default: opt=10;
305 }//while options
307 if (opt == 10) { printUsage(); return 0; }
308 else if (opt ==2) { printf("%s\n",version); return 0; }
310 session = new SessionImpl();
311 DbRetVal rv = session->readConfigFile();
312 if (rv != OK)
314 printf("Unable to read the configuration file \n");
315 delete session;
316 return 1;
319 if (freshStart) {
320 char cmd[1024];
321 sprintf(cmd, "rm -rf %s/*", Conf::config.getDbFile());
322 int ret = system(cmd);
323 if (ret != 0) { delete session; return 2; }
324 if (Conf::config.useDurability()) {
325 FILE *fp = fopen(Conf::config.getTableConfigFile(), "w+");
326 fclose(fp);
330 os::signal(SIGINT, sigTermHandler);
331 os::signal(SIGTERM, sigTermHandler);
332 os::signal(SIGCHLD, sigChildHandler);
333 rv = Conf::logger.startLogger(Conf::config.getLogFile(), true);
334 if (rv != OK)
336 printf("Unable to start the Conf::logger\n");
337 delete session;
338 return 2;
340 bool isInit = true;
341 logFine(Conf::logger, "Server Started");
342 int ret = session->initSystemDatabase();
343 if (0 != ret) {
344 printf("Attaching to exising database\n");
345 logFine(Conf::logger, "Attaching to existing database instance");
346 isInit = false;
347 delete session;
348 session = new SessionImpl();
349 ret = session->open(DBAUSER, DBAPASS);
350 if (ret !=0) {
351 printError(ErrSysInternal,
352 "Unable to attach to existing database\n");
353 Conf::logger.stopLogger();
354 session->destroySystemDatabase();
355 delete session;
356 return 3;
359 bool end = false;
360 struct timeval timeout, tval;
361 timeout.tv_sec = 5;
362 timeout.tv_usec = 0;
363 Database* sysdb = session->getSystemDatabase();
364 recoverFlag = false;
366 GlobalUniqueID UID;
367 if (isInit) UID.create();
369 if(isInit && Conf::config.useDurability()) {
370 ret = recoverAndCheckPoint();
371 if (ret) {
372 Conf::logger.stopLogger();
373 session->destroySystemDatabase();
374 delete session;
375 return 4;
378 recoverFlag = true;
380 bool isAsyncReq = Conf::config.useCache() &&
381 Conf::config.getCacheMode() == ASYNC_MODE;
382 bool isCacheReq = Conf::config.useCache() && Conf::config.useTwoWayCache()
383 && Conf::config.getCacheMode() != OFFLINE_MODE;
384 bool isSQLReq = Conf::config.useCsqlSqlServer();
385 bool isChkptReq = Conf::config.useDurability();
387 if (isInit && !Conf::config.useDurability() && Conf::config.useCache()) {
388 rv = recoverCachedTables();
389 if (rv != OK) {
390 Conf::logger.stopLogger();
391 session->destroySystemDatabase();
392 delete session;
393 return 5;
397 //TODO:: kill all the child servers and restart if !isInit
399 if(isSQLReq) startServiceClient();
400 if (isAsyncReq) {
401 int msgid = os::msgget(Conf::config.getMsgKey(), 0666);
402 if (msgid != -1) os::msgctl(msgid, IPC_RMID, NULL);
403 startAsyncServer();
405 if (isCacheReq) startCacheServer();
406 if(isChkptReq) startCheckpointServer();
408 printf("Database Server Started...\n");
409 logFine(Conf::logger, "Database Server Started");
410 monitorServer= Conf::config.useMonitorServers();
412 reloop:
413 while(!srvStop)
415 tval.tv_sec = timeout.tv_sec;
416 tval.tv_usec = timeout.tv_usec;
417 os::select(0, 0, 0, 0, &tval);
419 //send signal to all the registered process..check they are alive
420 cleanupDeadProcs(sysdb);
421 if (srvStop) break;
422 if (monitorServer) {
423 if (isCacheReq && cachepid !=0 && checkDead(cachepid)) {
424 logFine(Conf::logger, "Cache Receiver Died pid:%d", cachepid);
425 startCacheServer();
427 if (isAsyncReq && asyncpid !=0 && checkDead(asyncpid)) {
428 logFine(Conf::logger, "Async Server Died pid:%d", asyncpid);
429 int msgid = os::msgget(Conf::config.getMsgKey(), 0666);
430 if (msgid != -1) os::msgctl(msgid, IPC_RMID, NULL);
431 startAsyncServer();
433 if (isSQLReq && sqlserverpid !=0 && checkDead(sqlserverpid)) {
434 logFine(Conf::logger, "Network Server Died pid:%d", sqlserverpid);
435 startServiceClient();
437 if (isChkptReq && chkptpid !=0 && checkDead(chkptpid)) {
438 logFine(Conf::logger, "Checkpoint Server Died pid:%d", chkptpid);
439 startCheckpointServer();
443 if (logActiveProcs(sysdb) != OK) {srvStop = 0;
444 monitorServer= Conf::config.useMonitorServers();
445 goto reloop;
447 if (cachepid) os::kill(cachepid, SIGTERM);
448 if(asyncpid) os::kill(asyncpid, SIGTERM);
449 if (sqlserverpid) os::kill(sqlserverpid, SIGTERM);
450 if (chkptpid) os::kill(chkptpid, SIGTERM);
452 if (Conf::config.useDurability() && Conf::config.useMmap()) {
453 //ummap the memory
454 char *startAddr = (char *) sysdb->getMetaDataPtr();
455 msync(startAddr + Conf::config.getMaxSysDbSize(),
456 Conf::config.getMaxDbSize(), MS_SYNC);
457 munmap(startAddr + Conf::config.getMaxSysDbSize(),
458 Conf::config.getMaxDbSize());
460 logFine(Conf::logger, "Server Exiting");
461 printf("Server Exiting\n");
462 logFine(Conf::logger, "Server Ended");
463 UID.destroy();
464 session->destroySystemDatabase();
465 Conf::logger.stopLogger();
466 delete session;
467 return 0;