Merging serial model from branch; converting to the hideous indentation style of...
[lwes-journaller.git] / src / lwes-journaller.c
blob52c4340c05d4075d664e69ef780d96b1fbabbc41
1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
21 #include "config.h"
23 #include "log.h"
24 #include "opt.h"
25 #include "perror.h"
26 #include "queue_to_journal.h"
27 #include "serial_model.h"
28 #include "sig.h"
29 #include "xport_to_queue.h"
31 #if HAVE_PTHREAD_H
32 #include <pthread.h>
33 #endif
35 #include <assert.h>
36 #include <fcntl.h>
37 #include <limits.h>
38 #include <signal.h>
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <strings.h>
42 #include <unistd.h>
44 #include <netinet/in.h>
46 #include <sys/stat.h>
47 #include <sys/types.h>
48 #include <sys/wait.h>
50 #define NREADERS_MAX 5
52 #ifndef NAME_MAX
53 #define NAME_MAX 255
54 #endif
56 #ifndef INADDR_NONE
57 #define INADDR_NONE 0
58 #endif
60 /* Start a new process via fork() and exec() Return the process ID of
61 the new child process. */
63 static int start_program(const char* program, const char* argv[])
65 int i;
66 int pid;
67 char* fn;
68 char prg[PATH_MAX + NAME_MAX + 1];
69 char exec_path[PATH_MAX + 1];
71 /* Make sure the path to our executable is within reasonable
72 limits. */
74 if ( strlen(argv[0]) > PATH_MAX )
76 LOG_ER("Program path too long: %s.\n", argv[0]);
77 exit(EXIT_FAILURE);
79 if ( strlen(program) > NAME_MAX )
81 LOG_ER("Program name too long: %s.\n", program);
82 exit(EXIT_FAILURE);
85 /* Create a child process. */
86 if ( (pid = fork()) < 0 )
88 PERROR("fork()");
89 exit(EXIT_FAILURE);
92 /* If we're the parent process, we're done -- the child process
93 continues on. */
94 if ( pid )
96 return pid;
99 /* Get the path to our executable, use it to exec() other programs
100 from the same directory. */
102 if ( 0 != (fn = rindex(argv[0], '/')) )
104 strncpy(exec_path, argv[0], fn - argv[0] + 1);
105 exec_path[fn - argv[0] + 1] = '\0';
107 else
109 exec_path[0] = '\0';
111 snprintf(prg, sizeof(prg), "%s%s", exec_path, program);
113 /* Use new program name. Make sure to mess with argv[0] after the
114 fork() call, so as not to bugger up the calling processes
115 argv[0]. */
117 argv[0] = prg;
119 LOG_PROG("About to execvp(\"%s\",\n", prg);
120 for ( i=0; argv[i]; ++i )
122 LOG_PROG(" argv[%d] == \"%s\"\n", i, argv[i]);
124 LOG_PROG(" argv[%d] == NULL\n\n", i);
126 /* Replace our current process image with the new one. */
127 execvp(prg, (char** const)argv);
129 /* Any return is an error. */
130 PERROR("execvp()");
131 LOG_ER("The execvp() function returned, exiting.\n");
132 exit(EXIT_FAILURE); /* Should never get to this anyway. */
136 /* Start the external processes, then loop to restart any program
137 that dies. */
139 static void process_model(const char* argv[])
141 int i;
142 int waiting_pid = -1;
143 int queue_to_journal_pid = -1;
144 int xport_to_queue_pid[NREADERS_MAX];
146 for ( i=0; i<NREADERS_MAX; ++i )
148 xport_to_queue_pid[i] = -1;
151 while ( ! gbl_done )
153 int status;
155 if ((-1 == queue_to_journal_pid) || (waiting_pid == queue_to_journal_pid))
157 queue_to_journal_pid = start_program("queue_to_journal", argv);
160 for ( i=0; i<arg_nreaders; ++i )
162 if ((-1 == xport_to_queue_pid[i]) ||
163 (waiting_pid == xport_to_queue_pid[i]))
165 xport_to_queue_pid[i] = start_program("xport_to_queue", argv);
169 /* Wait and restart any program that exits. */
170 waiting_pid = wait(&status);
172 /* If a signal interrupts the wait, we loop. */
173 if ( (waiting_pid < 0) && (errno == EINTR) )
175 /* If we get a rotate, send it along to the process
176 * that writes journals. */
177 if ( gbl_rotate )
179 if ( -1 != queue_to_journal_pid )
181 LOG_PROG("Sending SIGHUP to queue_to_journal[%d] to "
182 "trigger log rotate.\n", queue_to_journal_pid);
183 kill(queue_to_journal_pid, SIGHUP);
185 gbl_rotate = 0;
187 continue;
190 if ( WIFEXITED(status) )
192 break;
195 if ( WIFSIGNALED(status) )
197 /* If any child process exits abnormally, log it and restart. */
198 const char* program = "unknown";
200 if ( waiting_pid == queue_to_journal_pid )
202 program = "queue_to_journal";
205 for ( i=0; i<arg_nreaders; ++i )
207 if ( waiting_pid == xport_to_queue_pid[i] )
209 program = "xport_to_queue";
213 log_msg(LOG_WARNING, __FILE__, __LINE__,
214 "Our %s process exited (pid=%d) with "
215 "signal %d, restarting.\n",
216 program, waiting_pid, WTERMSIG(status));
220 LOG_INF("Shutting down.");
222 /* We give the children a chance to run before we kill them. */
223 sleep(1);
225 /* We are done, send a quit signal to our children. */
226 kill(0, SIGQUIT);
227 while ( wait(0) > 0 ) /* Wait for children. */
229 if ( errno != ECHILD && errno != EINTR )
231 PERROR("wait");
#if HAVE_PTHREAD_H
/* Thread model: run queue_to_journal and arg_nreaders copies of
 * xport_to_queue as joinable threads of this process.
 *
 * The calling thread sleeps until gbl_done is set, then sends SIGQUIT
 * to every thread that was started and joins them.  Threads whose
 * creation failed are skipped during shutdown.
 *
 * pthread functions report failure through their return value rather
 * than errno, so the return code is copied into errno before PERROR()
 * is called.
 * NOTE(review): assumes PERROR() reports errno -- confirm in perror.h. */
static void thread_model()
{
  int i;
  int rc;
  int queue_to_journal_started = 0;
  int xport_to_queue_started[NREADERS_MAX] = { 0 };
  pthread_t queue_to_journal_tid;
  pthread_t xport_to_queue_tid[NREADERS_MAX];

  static pthread_attr_t m_pthread_attr;

  /* Joinable threads; FIFO scheduling policy when arg_rt is set.
     NOTE(review): the policy may have no effect unless the attribute
     also requests explicit scheduling
     (pthread_attr_setinheritsched) -- confirm. */
  pthread_attr_init(&m_pthread_attr);
  pthread_attr_setdetachstate(&m_pthread_attr, PTHREAD_CREATE_JOINABLE);
  if ( arg_rt )
    {
      pthread_attr_setschedpolicy(&m_pthread_attr, SCHED_FIFO);
    }

  rc = pthread_create(&queue_to_journal_tid,
                      &m_pthread_attr, queue_to_journal, NULL);
  if ( rc != 0 )
    {
      errno = rc;
      PERROR("pthread_create(&queue_to_journal_tid, NULL, queue_to_journal, NULL)");
    }
  else
    {
      queue_to_journal_started = 1;
      /* pthread_t is opaque; the cast is for logging only. */
      LOG_INF("queue_to_journal_tid == %lu\n",
              (unsigned long)queue_to_journal_tid);
    }

  for ( i=0; i<arg_nreaders; ++i )
    {
      int schedpolicy = -1;

      rc = pthread_create(&xport_to_queue_tid[i], &m_pthread_attr,
                          xport_to_queue, NULL);
      if ( rc != 0 )
        {
          errno = rc;
          PERROR("pthread_create(&xport_to_queue_tid[i], NULL, xport_to_queue, NULL)");
        }
      else
        {
          xport_to_queue_started[i] = 1;
        }

      /* Report the scheduling policy recorded in the attributes. */
      pthread_attr_getschedpolicy(&m_pthread_attr, &schedpolicy);
      switch ( schedpolicy )
        {
        case SCHED_OTHER: LOG_ER("pthread_policy=SCHED_OTHER\n"); break;
        case SCHED_FIFO:  LOG_ER("pthread_policy=SCHED_FIFO");    break;
        case SCHED_RR:    LOG_ER("pthread_policy=SCHED_RR");      break;
        default:          LOG_ER("pthread_policy=unknown");       break;
        }

      if ( xport_to_queue_started[i] )
        {
          LOG_INF("xport_to_queue_tid[%d] == %lu\n",
                  i, (unsigned long)xport_to_queue_tid[i]);
        }
    }

  while ( ! gbl_done )
    {
      sleep(99);
    }

  LOG_INF("Shutting down.");

  /* All threads share a single gbl_done, so just interrupt them and
     wait for them to finish on their own. */
  for ( i=0; i<arg_nreaders; ++i )
    {
      if ( ! xport_to_queue_started[i] )
        {
          continue;
        }
      LOG_PROG("Sending SIGQUIT to xport_to_queue thread %d (%lu).\n",
               i, (unsigned long)xport_to_queue_tid[i]);
      rc = pthread_kill(xport_to_queue_tid[i], SIGQUIT);
      if ( rc != 0 )
        {
          errno = rc;
          PERROR("pthread_kill");
        }
    }

  if ( queue_to_journal_started )
    {
      LOG_PROG("Sending SIGQUIT to queue_to_journal thread (%lu).\n",
               (unsigned long)queue_to_journal_tid);
      rc = pthread_kill(queue_to_journal_tid, SIGQUIT);
      /* ESRCH/ECHILD: the thread has already terminated; not an error. */
      if ( (rc != 0) && (rc != ESRCH) && (rc != ECHILD) )
        {
          errno = rc;
          PERROR("pthread_kill");
        }
    }

  for ( i=0; i<arg_nreaders; ++i )
    {
      if ( ! xport_to_queue_started[i] )
        {
          continue;
        }
      LOG_PROG("Joining xport_to_queue thread %d (%lu).\n",
               i, (unsigned long)xport_to_queue_tid[i]);
      rc = pthread_join(xport_to_queue_tid[i], NULL);
      if ( rc != 0 )
        {
          errno = rc;
          PERROR("pthread_join");
        }
    }

  if ( queue_to_journal_started )
    {
      LOG_PROG("Joining queue_to_journal thread (%lu).\n",
               (unsigned long)queue_to_journal_tid);
      rc = pthread_join(queue_to_journal_tid, NULL);
      if ( rc != 0 )
        {
          errno = rc;
          PERROR("pthread_join");
        }
    }

  pthread_attr_destroy(&m_pthread_attr);
}
#else
/* Thread model placeholder for builds without <pthread.h>: only logs
 * that threads are unavailable. */
static void thread_model()
{
  LOG_ER("No POSIX thread support on this platform.\n");
}
#endif
345 /* Return non-zero if addrstr is a multicast address. */
347 #include <netinet/in.h>
348 #include <arpa/inet.h>
/* Tell whether addrstr names an IPv4 multicast address.
 *
 * addrstr  IPv4 address text, parsed with inet_aton().
 *
 * Returns non-zero when the address parses and IN_MULTICAST() accepts
 * it; returns 0 when it does not parse or is not multicast. */
static int is_multicast(const char* addrstr)
{
  struct in_addr parsed;
  int parsed_ok = inet_aton(addrstr, &parsed);

  /* IN_MULTICAST() expects host byte order. */
  return parsed_ok ? IN_MULTICAST(ntohl(parsed.s_addr)) : 0;
}
/* Fork once and carry on only in the child: the parent exits with
 * EXIT_SUCCESS, and a failed fork() is reported and ends the process
 * with EXIT_FAILURE. */
static void do_fork()
{
  pid_t child = fork();

  if ( child == -1 )
    {
      PERROR("fork()");
      exit(EXIT_FAILURE);
    }

  if ( child != 0 )
    {
      /* Parent: its job is done. */
      exit(EXIT_SUCCESS);
    }

  /* Child: return to the caller. */
}
/* Detach from the launching session and run in the background.
 *
 * Steps: fork, start a new session with setsid(), fork again, clear the
 * umask, chdir to "/", close every descriptor up to the open-file limit,
 * then reopen descriptors 0, 1 and 2 on /dev/null.
 *
 * Any failure terminates the process with EXIT_FAILURE. */
static void daemonize()
{
  /* Used when sysconf() cannot report a limit; an arbitrary but common
     per-process descriptor limit. */
  enum { FALLBACK_FD_LIMIT = 1024 };

  int fd;
  long fdlimit;
  const char *chdir_root = "/";

  do_fork();
  if ( setsid() < 0 )
    {
      PERROR("setsid()");
      exit(EXIT_FAILURE);
    }
  do_fork();

  umask(0);
  if ( chdir(chdir_root) < 0 )
    {
      LOG_ER("Unable to chdir(\"%s\"): %s\n", chdir_root, strerror(errno));
      exit(EXIT_FAILURE);
    }

  /* Close all open files.  sysconf() returns -1 when the limit is
     indeterminate or on error; the counter is an int, so the limit is
     also capped at INT_MAX. */
  fdlimit = sysconf(_SC_OPEN_MAX);
  if ( fdlimit < 0 )
    {
      fdlimit = FALLBACK_FD_LIMIT;
    }
  if ( fdlimit > INT_MAX )
    {
      fdlimit = INT_MAX;
    }
  for ( fd=0; fd<fdlimit; ++fd )
    {
      close(fd);
    }

  /* Open 0, 1 and 2.  With every descriptor closed, open() must hand
     back 0 and each dup(0) the next free slot. */
  if ( open("/dev/null", O_RDWR) != 0 )
    {
      PERROR("Unable to open /dev/null to replace stdin");
      exit(EXIT_FAILURE);
    }
  if ( dup(0) != 1 )
    {
      PERROR("Unable to dup() to replace stdout");
      exit(EXIT_FAILURE);
    }
  if ( dup(0) != 2 )
    {
      PERROR("Unable to dup() to replace stderr");
      exit(EXIT_FAILURE);
    }
}
420 /* Write the process ID of this process into a file so we may be
421 easily signaled. */
423 static void write_pid_file()
425 /* Write out the PID into the PID file. */
426 if ( arg_pid_file )
428 FILE* pidfp = fopen(arg_pid_file, "w");
429 if ( ! pidfp )
431 LOG_ER("Can't open PID file \"%s\"\n", arg_pid_file);
433 else
435 /* This obnoxiousness is to avoid compiler warnings that we'd get
436 compiling on systems with long or int pid_t. */
437 pid_t pid = getpid();
438 if ( sizeof(pid_t) == sizeof(long) )
440 long x = pid;
441 fprintf(pidfp, "%ld\n", x);
443 else
445 int x = pid;
446 fprintf(pidfp, "%d\n", x);
448 fclose(pidfp);
454 /* Delete the PID file, if one was configured. */
456 static void delete_pid_file()
458 if ( arg_pid_file )
460 if ( unlink ( arg_pid_file ) < 0 )
462 PERROR("Unable to delete pid file");
468 int main(int argc, const char* argv[])
470 char _buf[100] ; // for log messages
471 char _progver[30] ;
473 process_options(argc, argv);
475 if ( ! arg_nodaemonize )
477 daemonize();
480 strcpy(_progver, "lwes-journaller-") ;
481 strcat(_progver, ""VERSION"") ;
483 strcpy(_buf, "Initializing - ") ;
484 strcat(_buf, _progver) ;
485 strcat(_buf, "\n") ;
486 LOG_INF(_buf);
488 install_signal_handlers();
490 write_pid_file();
492 /* Check options. */
493 if ( is_multicast(arg_ip) )
495 if ( ! arg_join_group )
497 LOG_WARN("Using multi-cast address without joining group.\n");
499 if ( arg_nreaders > 1 )
501 LOG_WARN("Multiple reader threads can't be used with multi-cast "
502 "addresses, will use a single thread instead.\n");
503 arg_nreaders = 1;
506 else
508 if ( arg_join_group )
510 LOG_WARN("Can't join group with uni-cast address.\n");
513 if ( arg_nreaders > NREADERS_MAX )
515 LOG_WARN("Too many reader threads specified (%d) reducing to %d.\n",
516 arg_nreaders, NREADERS_MAX);
517 arg_nreaders = NREADERS_MAX;
519 if ( arg_njournalls > 30 )
521 LOG_WARN("suspiciously large (%d) number of journals.",
522 arg_njournalls);
525 strcpy(_buf, "Starting up - ") ;
526 strcat(_buf, _progver) ;
527 strcat(_buf, "\n") ;
528 LOG_INF(_buf);
530 if ( strcmp(arg_proc_type, ARG_PROCESS) == 0 )
532 process_model(argv);
535 if ( strcmp(arg_proc_type, ARG_THREAD) == 0 )
537 thread_model();
540 if ( strcmp(arg_proc_type, ARG_SERIAL) == 0 )
542 serial_model();
545 strcpy(_buf, "Normal shutdown complete - ") ;
546 strcat(_buf, _progver) ;
547 strcat(_buf, "\n") ;
548 LOG_INF(_buf);
550 delete_pid_file();
552 return 0;