1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
26 #include "queue_to_journal.h"
27 #include "serial_model.h"
29 #include "xport_to_queue.h"
44 #include <netinet/in.h>
47 #include <sys/types.h>
50 #define NREADERS_MAX 5
60 /* Start a new process via fork() and exec() Return the process ID of
61 the new child process. */
63 static int start_program(const char* program
, const char* argv
[])
68 char prg
[PATH_MAX
+ NAME_MAX
+ 1];
69 char exec_path
[PATH_MAX
+ 1];
71 /* Make sure the path to our executable is within reasonable
74 if ( strlen(argv
[0]) > PATH_MAX
)
76 LOG_ER("Program path too long: %s.\n", argv
[0]);
79 if ( strlen(program
) > NAME_MAX
)
81 LOG_ER("Program name too long: %s.\n", program
);
85 /* Create a child process. */
86 if ( (pid
= fork()) < 0 )
92 /* If we're the parent process, we're done -- the child process
99 /* Get the path to our executable, use it to exec() other programs
100 from the same directory. */
102 if ( 0 != (fn
= rindex(argv
[0], '/')) )
104 strncpy(exec_path
, argv
[0], fn
- argv
[0] + 1);
105 exec_path
[fn
- argv
[0] + 1] = '\0';
111 snprintf(prg
, sizeof(prg
), "%s%s", exec_path
, program
);
113 /* Use new program name. Make sure to mess with argv[0] after the
114 fork() call, so as not to bugger up the calling processes
119 LOG_PROG("About to execvp(\"%s\",\n", prg
);
120 for ( i
=0; argv
[i
]; ++i
)
122 LOG_PROG(" argv[%d] == \"%s\"\n", i
, argv
[i
]);
124 LOG_PROG(" argv[%d] == NULL\n\n", i
);
126 /* Replace our current process image with the new one. */
127 execvp(prg
, (char** const)argv
);
129 /* Any return is an error. */
131 LOG_ER("The execvp() function returned, exiting.\n");
132 exit(EXIT_FAILURE
); /* Should never get to this anyway. */
136 /* Start the external processes, then loop to restart any program
139 static void process_model(const char* argv
[])
142 int waiting_pid
= -1;
143 int queue_to_journal_pid
= -1;
144 int xport_to_queue_pid
[NREADERS_MAX
];
146 for ( i
=0; i
<NREADERS_MAX
; ++i
)
148 xport_to_queue_pid
[i
] = -1;
155 if ((-1 == queue_to_journal_pid
) || (waiting_pid
== queue_to_journal_pid
))
157 queue_to_journal_pid
= start_program("queue_to_journal", argv
);
160 for ( i
=0; i
<arg_nreaders
; ++i
)
162 if ((-1 == xport_to_queue_pid
[i
]) ||
163 (waiting_pid
== xport_to_queue_pid
[i
]))
165 xport_to_queue_pid
[i
] = start_program("xport_to_queue", argv
);
169 /* Wait and restart any program that exits. */
170 waiting_pid
= wait(&status
);
172 /* If a signal interrupts the wait, we loop. */
173 if ( (waiting_pid
< 0) && (errno
== EINTR
) )
175 /* If we get a rotate, send it along to the process
176 * that writes journals. */
179 if ( -1 != queue_to_journal_pid
)
181 LOG_PROG("Sending SIGHUP to queue_to_journal[%d] to "
182 "trigger log rotate.\n", queue_to_journal_pid
);
183 kill(queue_to_journal_pid
, SIGHUP
);
190 if ( WIFEXITED(status
) )
195 if ( WIFSIGNALED(status
) )
197 /* If any child process exits abnormally, log it and restart. */
198 const char* program
= "unknown";
200 if ( waiting_pid
== queue_to_journal_pid
)
202 program
= "queue_to_journal";
205 for ( i
=0; i
<arg_nreaders
; ++i
)
207 if ( waiting_pid
== xport_to_queue_pid
[i
] )
209 program
= "xport_to_queue";
213 log_msg(LOG_WARNING
, __FILE__
, __LINE__
,
214 "Our %s process exited (pid=%d) with "
215 "signal %d, restarting.\n",
216 program
, waiting_pid
, WTERMSIG(status
));
220 LOG_INF("Shutting down.");
222 /* We give the children a chance to run before we kill them. */
225 /* We are done, send a quit signal to our children. */
227 while ( wait(0) > 0 ) /* Wait for children. */
229 if ( errno
!= ECHILD
&& errno
!= EINTR
)
237 static void thread_model()
240 pthread_t queue_to_journal_tid
;
241 pthread_t xport_to_queue_tid
[NREADERS_MAX
];
243 static pthread_attr_t m_pthread_attr
;
244 // Start thread attributes with pthread FIFO policy (default would be OTHER)
245 pthread_attr_init(&m_pthread_attr
) ;
246 pthread_attr_setdetachstate(&m_pthread_attr
, PTHREAD_CREATE_JOINABLE
);
249 pthread_attr_setschedpolicy(&m_pthread_attr
, SCHED_FIFO
) ;
252 if ( pthread_create(&queue_to_journal_tid
,
253 &m_pthread_attr
, queue_to_journal
, NULL
) )
255 PERROR("pthread_create(&queue_to_journal_tid, NULL, queue_to_journal, NULL)");
257 LOG_INF("queue_to_journal_tid == %d\n", queue_to_journal_tid
);
259 for ( i
=0; i
<arg_nreaders
; ++i
)
261 if ( pthread_create(&xport_to_queue_tid
[i
], &m_pthread_attr
,
262 xport_to_queue
, NULL
) )
264 PERROR("pthread_create(&xport_to_queue_tid[i], NULL, xport_to_queue, NULL)");
268 pthread_attr_getschedpolicy((pthread_attr_t
*)&m_pthread_attr
,
270 switch ( schedpolicy
)
272 case SCHED_OTHER
: LOG_ER("pthread_policy=SCHED_OTHER\n") ; break ;
273 case SCHED_FIFO
: LOG_ER("pthread_policy=SCHED_FIFO") ; break ;
274 case SCHED_RR
: LOG_ER("pthread_policy=SCHED_RR") ; break ;
275 //case SCHED_MIN = SCHED_OTHER,
276 //case SCHED_MAX = SCHED_RR
277 default: LOG_ER("pthread_policy=unknown") ; break ;
280 LOG_INF("xport_to_queue_tid[%d] == %d\n",
281 i
, xport_to_queue_tid
[i
]);
290 LOG_INF("Shutting down.");
292 /* All threads share a single gbl_done, so just interrupt them and
293 wait for them to finish on their own. */
295 for ( i
=0; i
<arg_nreaders
; ++i
)
297 LOG_PROG("Sending SIGQUIT to xport_to_queue thread %d (%d).\n",
298 i
, xport_to_queue_tid
);
299 if ( pthread_kill(xport_to_queue_tid
[i
], SIGQUIT
) )
301 PERROR("pthread_kill");
305 LOG_PROG("Sending SIGQUIT to queue_to_journal thread (%d).\n",
306 queue_to_journal_tid
);
307 if ( pthread_kill(queue_to_journal_tid
, SIGQUIT
) )
312 PERROR("pthread_kill");
314 case ESRCH
: /* If the thread has already terminated. */
320 for (i
=0; i
<arg_nreaders
; ++i
)
322 LOG_PROG("Joining xport_to_queue thread %d (%d).\n",
323 i
, xport_to_queue_tid
[i
]);
324 if ( pthread_join(xport_to_queue_tid
[i
], NULL
) )
326 PERROR("pthread_join");
330 LOG_PROG("Joining queue_to_journal thread (%d).\n", queue_to_journal_tid
);
331 if ( pthread_join(queue_to_journal_tid
, NULL
) )
333 PERROR("pthread_join");
336 pthread_attr_destroy(&m_pthread_attr
);
339 static void thread_model()
341 LOG_ER("No POSIX thread support on this platform.\n");
345 /* Return non-zero if addrstr is a multicast address. */
347 #include <netinet/in.h>
348 #include <arpa/inet.h>
350 static int is_multicast(const char* addrstr
)
354 if ( 0 == inet_aton(addrstr
, &addr
) )
359 return IN_MULTICAST(ntohl(addr
.s_addr
));
362 static void do_fork()
378 static void daemonize()
381 const char *chdir_root
= "/";
392 if ( chdir(chdir_root
) < 0)
394 LOG_ER("Unable to chdir(\"%s\"): %s\n", chdir_root
, strerror(errno
));
398 /* Close all open files. */
399 fdlimit
= sysconf (_SC_OPEN_MAX
);
400 for ( fd
=0; fd
<fdlimit
; ++fd
)
405 /* Open 0, 1 and 2. */
406 open("/dev/null", O_RDWR
);
409 PERROR("Unable to dup() to replace stdout: %s\n");
414 PERROR("Unable to dup() to replace stderr: %s\n");
420 /* Write the process ID of this process into a file so we may be
423 static void write_pid_file()
425 /* Write out the PID into the PID file. */
428 FILE* pidfp
= fopen(arg_pid_file
, "w");
431 LOG_ER("Can't open PID file \"%s\"\n", arg_pid_file
);
435 /* This obnoxiousness is to avoid compiler warnings that we'd get
436 compiling on systems with long or int pid_t. */
437 pid_t pid
= getpid();
438 if ( sizeof(pid_t
) == sizeof(long) )
441 fprintf(pidfp
, "%ld\n", x
);
446 fprintf(pidfp
, "%d\n", x
);
454 /* Delete the PID file, if one was configured. */
456 static void delete_pid_file()
460 if ( unlink ( arg_pid_file
) < 0 )
462 PERROR("Unable to delete pid file");
468 int main(int argc
, const char* argv
[])
470 char _buf
[100] ; // for log messages
473 process_options(argc
, argv
);
475 if ( ! arg_nodaemonize
)
480 strcpy(_progver
, "lwes-journaller-") ;
481 strcat(_progver
, ""VERSION
"") ;
483 strcpy(_buf
, "Initializing - ") ;
484 strcat(_buf
, _progver
) ;
488 install_signal_handlers();
493 if ( is_multicast(arg_ip
) )
495 if ( ! arg_join_group
)
497 LOG_WARN("Using multi-cast address without joining group.\n");
499 if ( arg_nreaders
> 1 )
501 LOG_WARN("Multiple reader threads can't be used with multi-cast "
502 "addresses, will use a single thread instead.\n");
508 if ( arg_join_group
)
510 LOG_WARN("Can't join group with uni-cast address.\n");
513 if ( arg_nreaders
> NREADERS_MAX
)
515 LOG_WARN("Too many reader threads specified (%d) reducing to %d.\n",
516 arg_nreaders
, NREADERS_MAX
);
517 arg_nreaders
= NREADERS_MAX
;
519 if ( arg_njournalls
> 30 )
521 LOG_WARN("suspiciously large (%d) number of journals.",
525 strcpy(_buf
, "Starting up - ") ;
526 strcat(_buf
, _progver
) ;
530 if ( strcmp(arg_proc_type
, ARG_PROCESS
) == 0 )
535 if ( strcmp(arg_proc_type
, ARG_THREAD
) == 0 )
540 if ( strcmp(arg_proc_type
, ARG_SERIAL
) == 0 )
545 strcpy(_buf
, "Normal shutdown complete - ") ;
546 strcat(_buf
, _progver
) ;