Finishing the total time counters
[lwes-journaller.git] / src / lwes-journaller.c
blob77488bd59298950bd8b62663b69f3590e70a710d
1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
#include "config.h"

#include "log.h"
#include "opt.h"
#include "perror.h"
#include "queue_to_journal.h"
#include "sig.h"
#include "xport_to_queue.h"

#if HAVE_PTHREAD_H
#include <pthread.h>
#endif

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>

#include <netinet/in.h>

#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
/* Upper bound on the number of reader (xport_to_queue) workers; also
   sizes the per-reader pid/thread-id arrays below. */
#define NREADERS_MAX 5

/* Fallback for platforms whose <limits.h> does not provide NAME_MAX. */
#ifndef NAME_MAX
#define NAME_MAX 255
#endif

/* Fallback for platforms whose <netinet/in.h> lacks INADDR_NONE.  The
   real value is the all-ones address 255.255.255.255 (what inet_addr()
   returns on error); 0 would be INADDR_ANY, a different address. */
#ifndef INADDR_NONE
#define INADDR_NONE 0xffffffffUL
#endif
59 /* Start a new process via fork() and exec() Return the process ID of
60 the new child process. */
62 static int start_program(const char* program, const char* argv[])
64 int i;
65 int pid;
66 char* fn;
67 char prg[PATH_MAX + NAME_MAX + 1];
68 char exec_path[PATH_MAX + 1];
70 /* Make sure the path to our executable is within reasonable
71 limits. */
73 if ( strlen(argv[0]) > PATH_MAX )
75 LOG_ER("Program path too long: %s.\n", argv[0]);
76 exit(EXIT_FAILURE);
78 if ( strlen(program) > NAME_MAX )
80 LOG_ER("Program name too long: %s.\n", program);
81 exit(EXIT_FAILURE);
84 /* Create a child process. */
85 if ( (pid = fork()) < 0 )
87 PERROR("fork()");
88 exit(EXIT_FAILURE);
91 /* If we're the parent process, we're done -- the child process
92 continues on. */
93 if ( pid )
95 return pid;
98 /* Get the path to our executable, use it to exec() other programs
99 from the same directory. */
101 if ( 0 != (fn = rindex(argv[0], '/')) )
103 strncpy(exec_path, argv[0], fn - argv[0] + 1);
104 exec_path[fn - argv[0] + 1] = '\0';
106 else
108 exec_path[0] = '\0';
110 snprintf(prg, sizeof(prg), "%s%s", exec_path, program);
112 /* Use new program name. Make sure to mess with argv[0] after the
113 fork() call, so as not to bugger up the calling processes
114 argv[0]. */
116 argv[0] = prg;
118 LOG_PROG("About to execvp(\"%s\",\n", prg);
119 for ( i=0; argv[i]; ++i )
121 LOG_PROG(" argv[%d] == \"%s\"\n", i, argv[i]);
123 LOG_PROG(" argv[%d] == NULL\n\n", i);
125 /* Replace our current process image with the new one. */
126 execvp(prg, (char** const)argv);
128 /* Any return is an error. */
129 PERROR("execvp()");
130 LOG_ER("The execvp() function returned, exiting.\n");
131 exit(EXIT_FAILURE); /* Should never get to this anyway. */
135 /* Start the external processes, then loop to restart any program
136 that dies. */
138 static void process_model(const char* argv[])
140 int i;
141 int waiting_pid = -1;
142 int queue_to_journal_pid = -1;
143 int xport_to_queue_pid[NREADERS_MAX];
145 for ( i=0; i<NREADERS_MAX; ++i )
147 xport_to_queue_pid[i] = -1;
150 while ( ! gbl_done )
152 int status;
154 if ((-1 == queue_to_journal_pid) || (waiting_pid == queue_to_journal_pid))
156 queue_to_journal_pid = start_program("queue_to_journal", argv);
159 for ( i=0; i<arg_nreaders; ++i )
161 if ((-1 == xport_to_queue_pid[i]) ||
162 (waiting_pid == xport_to_queue_pid[i]))
164 xport_to_queue_pid[i] = start_program("xport_to_queue", argv);
168 /* Wait and restart any program that exits. */
169 waiting_pid = wait(&status);
171 /* If a signal interrupts the wait, we loop. */
172 if ( (waiting_pid < 0) && (errno == EINTR) )
174 /* If we get a rotate, send it along to the process
175 * that writes journals. */
176 if ( gbl_rotate )
178 if ( -1 != queue_to_journal_pid )
180 LOG_PROG("Sending SIGHUP to queue_to_journal[%d] to "
181 "trigger log rotate.\n", queue_to_journal_pid);
182 kill(queue_to_journal_pid, SIGHUP);
184 gbl_rotate = 0;
186 continue;
189 if ( WIFEXITED(status) )
191 break;
194 if ( WIFSIGNALED(status) )
196 /* If any child process exits abnormally, log it and restart. */
197 const char* program = "unknown";
199 if ( waiting_pid == queue_to_journal_pid )
201 program = "queue_to_journal";
204 for ( i=0; i<arg_nreaders; ++i )
206 if ( waiting_pid == xport_to_queue_pid[i] )
208 program = "xport_to_queue";
212 log_msg(LOG_WARNING, __FILE__, __LINE__,
213 "Our %s process exited (pid=%d) with "
214 "signal %d, restarting.\n",
215 program, waiting_pid, WTERMSIG(status));
219 LOG_INF("Shutting down.");
221 /* We give the children a chance to run before we kill them. */
222 sleep(1);
224 /* We are done, send a quit signal to our children. */
225 kill(0, SIGQUIT);
226 while ( wait(0) > 0 ) /* Wait for children. */
228 if ( errno != ECHILD && errno != EINTR )
230 PERROR("wait");
#if HAVE_PTHREAD_H
/* Run one queue_to_journal thread and arg_nreaders xport_to_queue
   threads inside this process, sleep until gbl_done is set, then send
   each successfully created thread SIGQUIT and join it.

   The pthread_* calls below report failure through their return value,
   not errno, so the code copies that value into errno before calling
   PERROR. */
static void thread_model(void)
{
  int i;
  int rc;
  int queue_to_journal_started = 0;
  /* arg_nreaders has already been clamped to NREADERS_MAX by main(). */
  int xport_to_queue_started[NREADERS_MAX] = { 0 };
  pthread_t queue_to_journal_tid;
  pthread_t xport_to_queue_tid[NREADERS_MAX];
  pthread_attr_t m_pthread_attr;

  // Start thread attributes with pthread FIFO policy (default would be OTHER)
  pthread_attr_init(&m_pthread_attr);
  pthread_attr_setdetachstate(&m_pthread_attr, PTHREAD_CREATE_JOINABLE);
  if ( arg_rt )
    {
      /* NOTE(review): unless the attribute is also given
         PTHREAD_EXPLICIT_SCHED (pthread_attr_setinheritsched) and a
         valid priority, new threads typically inherit the creator's
         policy and this setting is ignored -- confirm on target OS. */
      pthread_attr_setschedpolicy(&m_pthread_attr, SCHED_FIFO);
    }

  rc = pthread_create(&queue_to_journal_tid,
                      &m_pthread_attr, queue_to_journal, NULL);
  if ( rc )
    {
      errno = rc;
      PERROR("pthread_create(&queue_to_journal_tid, NULL, queue_to_journal, NULL)");
    }
  else
    {
      queue_to_journal_started = 1;
      /* pthread_t is opaque; print it through an integer cast, never %d. */
      LOG_PROG("queue_to_journal_tid == %lu\n",
               (unsigned long)queue_to_journal_tid);
    }

  for ( i=0; i<arg_nreaders; ++i )
    {
      int schedpolicy;

      rc = pthread_create(&xport_to_queue_tid[i], &m_pthread_attr,
                          xport_to_queue, NULL);
      if ( rc )
        {
          errno = rc;
          PERROR("pthread_create(&xport_to_queue_tid[i], NULL, xport_to_queue, NULL)");
          /* Remember the failure so shutdown never touches this tid. */
          continue;
        }
      xport_to_queue_started[i] = 1;

      pthread_attr_getschedpolicy(&m_pthread_attr, &schedpolicy);
      switch ( schedpolicy )
        {
        case SCHED_OTHER: LOG_ER("pthread_policy=SCHED_OTHER\n") ; break ;
        case SCHED_FIFO:  LOG_ER("pthread_policy=SCHED_FIFO") ; break ;
        case SCHED_RR:    LOG_ER("pthread_policy=SCHED_RR") ; break ;
        default:          LOG_ER("pthread_policy=unknown") ; break ;
        }

      LOG_PROG("xport_to_queue_tid[%d] == %lu\n",
               i, (unsigned long)xport_to_queue_tid[i]);
    }

  while ( ! gbl_done )
    {
      sleep(99);
    }

  LOG_INF("Shutting down.");

  /* All threads share a single gbl_done, so just interrupt them and
     wait for them to finish on their own.  ESRCH (thread already gone)
     is expected and not reported. */
  for ( i=0; i<arg_nreaders; ++i )
    {
      if ( ! xport_to_queue_started[i] )
        {
          continue;
        }
      LOG_PROG("Sending SIGQUIT to xport_to_queue thread %d (%lu).\n",
               i, (unsigned long)xport_to_queue_tid[i]);
      rc = pthread_kill(xport_to_queue_tid[i], SIGQUIT);
      if ( rc && rc != ESRCH )
        {
          errno = rc;
          PERROR("pthread_kill");
        }
    }

  if ( queue_to_journal_started )
    {
      LOG_PROG("Sending SIGQUIT to queue_to_journal thread (%lu).\n",
               (unsigned long)queue_to_journal_tid);
      rc = pthread_kill(queue_to_journal_tid, SIGQUIT);
      if ( rc && rc != ESRCH && rc != ECHILD )
        {
          errno = rc;
          PERROR("pthread_kill");
        }
    }

  for ( i=0; i<arg_nreaders; ++i )
    {
      if ( ! xport_to_queue_started[i] )
        {
          continue;
        }
      LOG_PROG("Joining xport_to_queue thread %d (%lu).\n",
               i, (unsigned long)xport_to_queue_tid[i]);
      rc = pthread_join(xport_to_queue_tid[i], NULL);
      if ( rc )
        {
          errno = rc;
          PERROR("pthread_join");
        }
    }

  if ( queue_to_journal_started )
    {
      LOG_PROG("Joining queue_to_journal thread (%lu).\n",
               (unsigned long)queue_to_journal_tid);
      rc = pthread_join(queue_to_journal_tid, NULL);
      if ( rc )
        {
          errno = rc;
          PERROR("pthread_join");
        }
    }

  pthread_attr_destroy(&m_pthread_attr);
}
#else
/* Without POSIX threads the thread model is unavailable; just say so. */
static void thread_model(void)
{
  LOG_ER("No POSIX thread support on this platform.\n");
}
#endif
#include <netinet/in.h>
#include <arpa/inet.h>

/* Report whether ADDRSTR, parsed with inet_aton(), is an IPv4 address
   in the multicast range.  Text inet_aton() rejects is reported as not
   multicast (0). */
static int is_multicast(const char* addrstr)
{
  struct in_addr parsed;
  int parsed_ok = inet_aton(addrstr, &parsed);

  return parsed_ok ? IN_MULTICAST(ntohl(parsed.s_addr)) : 0;
}
/* Fork and let only the child continue: the parent exits successfully,
   and a failed fork() is fatal. */
static void do_fork()
{
  pid_t child = fork();

  if ( child < 0 )
    {
      PERROR("fork()");
      exit(EXIT_FAILURE);
    }

  if ( child > 0 )
    {
      exit(EXIT_SUCCESS);
    }
}
/* Detach from the controlling terminal and run in the background.  The
   steps are:
   - fork, setsid() and fork again;
   - clear the umask and chdir to "/";
   - close every descriptor up to the sysconf(_SC_OPEN_MAX) limit;
   - reopen 0, 1 and 2 on /dev/null.
   Any failure is fatal. */
static void daemonize()
{
  int fd;
  long fdlimit;
  const char *chdir_root = "/";

  do_fork();
  if ( setsid() < 0 )
    {
      PERROR("setsid()");
      exit(EXIT_FAILURE);
    }
  do_fork();

  umask(0);
  if ( chdir(chdir_root) < 0 )
    {
      LOG_ER("Unable to chdir(\"%s\"): %s\n", chdir_root, strerror(errno));
      exit(EXIT_FAILURE);
    }

  /* Close all open files.  sysconf() returns a long (-1 if the limit is
     indeterminate, in which case nothing is closed here). */
  fdlimit = sysconf(_SC_OPEN_MAX);
  for ( fd=0; fd<fdlimit; ++fd )
    {
      close(fd);
    }

  /* Open 0, 1 and 2.  With everything closed, open() must return the
     lowest free descriptor, 0, and each dup() the next one. */
  if ( open("/dev/null", O_RDWR) != 0 )
    {
      PERROR("Unable to open /dev/null to replace stdin");
      exit(EXIT_FAILURE);
    }
  if ( dup(0) != 1 )
    {
      PERROR("Unable to dup() to replace stdout");
      exit(EXIT_FAILURE);
    }
  if ( dup(0) != 2 )
    {
      PERROR("Unable to dup() to replace stderr");
      exit(EXIT_FAILURE);
    }
}
419 /* Write the process ID of this process into a file so we may be
420 easily signaled. */
422 static void write_pid_file()
424 /* Write out the PID into the PID file. */
425 if ( arg_pid_file )
427 FILE* pidfp = fopen(arg_pid_file, "w");
428 if ( ! pidfp )
430 LOG_ER("Can't open PID file \"%s\"\n", arg_pid_file);
432 else
434 /* This obnoxiousness is to avoid compiler warnings that we'd get
435 compiling on systems with long or int pid_t. */
436 pid_t pid = getpid();
437 if ( sizeof(pid_t) == sizeof(long) )
439 long x = pid;
440 fprintf(pidfp, "%ld\n", x);
442 else
444 int x = pid;
445 fprintf(pidfp, "%d\n", x);
447 fclose(pidfp);
453 /* Delete the PID file, if one was configured. */
455 static void delete_pid_file()
457 if ( arg_pid_file )
459 if ( unlink ( arg_pid_file ) < 0 )
461 PERROR("Unable to delete pid file");
467 int main(int argc, const char* argv[])
469 char _buf[100] ; // for log messages
470 char _progver[30] ;
472 process_options(argc, argv);
474 if ( ! arg_nodaemonize )
476 daemonize();
479 strcpy(_progver, "lwes-journaller-") ;
480 strcat(_progver, ""VERSION"") ;
482 strcpy(_buf, "Initializing - ") ;
483 strcat(_buf, _progver) ;
484 strcat(_buf, "\n") ;
485 LOG_INF(_buf);
487 install_signal_handlers();
489 write_pid_file();
491 /* Check options. */
492 if ( is_multicast(arg_ip) )
494 if ( ! arg_join_group )
496 LOG_WARN("Using multi-cast address without joining group.\n");
498 if ( arg_nreaders > 1 )
500 LOG_WARN("Multiple reader threads can't be used with multi-cast "
501 "addresses, will use a single thread instead.\n");
502 arg_nreaders = 1;
505 else
507 if ( arg_join_group )
509 LOG_WARN("Can't join group with uni-cast address.\n");
512 if ( arg_nreaders > NREADERS_MAX )
514 LOG_WARN("Too many reader threads specified (%d) reducing to %d.\n",
515 arg_nreaders, NREADERS_MAX);
516 arg_nreaders = NREADERS_MAX;
518 if ( arg_njournalls > 30 )
520 LOG_WARN("suspiciously large (%d) number of journals.",
521 arg_njournalls);
524 strcpy(_buf, "Starting up - ") ;
525 strcat(_buf, _progver) ;
526 strcat(_buf, "\n") ;
527 LOG_INF(_buf);
529 if ( strcmp(arg_proc_type, ARG_PROCESS) == 0 )
531 process_model(argv);
534 if ( strcmp(arg_proc_type, ARG_THREAD) == 0 )
536 thread_model();
539 strcpy(_buf, "Normal shutdown complete - ") ;
540 strcat(_buf, _progver) ;
541 strcat(_buf, "\n") ;
542 LOG_INF(_buf);
544 delete_pid_file();
546 return 0;