1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
26 #include "queue_to_journal.h"
28 #include "xport_to_queue.h"
43 #include <netinet/in.h>
46 #include <sys/types.h>
49 #define NREADERS_MAX 5
59 /* Start a new process via fork() and exec() Return the process ID of
60 the new child process. */
62 static int start_program(const char* program
, const char* argv
[])
67 char prg
[PATH_MAX
+ NAME_MAX
+ 1];
68 char exec_path
[PATH_MAX
+ 1];
70 /* Make sure the path to our executable is within reasonable
73 if ( strlen(argv
[0]) > PATH_MAX
)
75 LOG_ER("Program path too long: %s.\n", argv
[0]);
78 if ( strlen(program
) > NAME_MAX
)
80 LOG_ER("Program name too long: %s.\n", program
);
84 /* Create a child process. */
85 if ( (pid
= fork()) < 0 )
91 /* If we're the parent process, we're done -- the child process
98 /* Get the path to our executable, use it to exec() other programs
99 from the same directory. */
101 if ( 0 != (fn
= rindex(argv
[0], '/')) )
103 strncpy(exec_path
, argv
[0], fn
- argv
[0] + 1);
104 exec_path
[fn
- argv
[0] + 1] = '\0';
110 snprintf(prg
, sizeof(prg
), "%s%s", exec_path
, program
);
112 /* Use new program name. Make sure to mess with argv[0] after the
113 fork() call, so as not to bugger up the calling processes
118 LOG_PROG("About to execvp(\"%s\",\n", prg
);
119 for ( i
=0; argv
[i
]; ++i
)
121 LOG_PROG(" argv[%d] == \"%s\"\n", i
, argv
[i
]);
123 LOG_PROG(" argv[%d] == NULL\n\n", i
);
125 /* Replace our current process image with the new one. */
126 execvp(prg
, (char** const)argv
);
128 /* Any return is an error. */
130 LOG_ER("The execvp() function returned, exiting.\n");
131 exit(EXIT_FAILURE
); /* Should never get to this anyway. */
// process_model: run the journaller as a set of child processes.
//
// What the visible lines show:
//   - every slot of xport_to_queue_pid[] starts at -1;
//   - a child is (re)started through start_program() whenever its recorded
//     pid is -1 or equals the pid most recently returned by wait(): one
//     "queue_to_journal" child and arg_nreaders "xport_to_queue" children;
//   - when wait() fails with EINTR and a queue_to_journal pid is recorded,
//     SIGHUP is sent to that child (logged as triggering a log rotate);
//   - a child terminated by a signal is reported through log_msg() at
//     LOG_MASK_WARNING with its program name, pid and signal number;
//   - after "Shutting down." is logged, children are reaped with wait(0).
//
// argv is passed unchanged to start_program() for every child.
//
// NOTE(review): the loop header, the rotate test, the WIFEXITED handling and
// the shutdown signalling are not present in this listing, so their exact
// behaviour cannot be confirmed from here.
// NOTE(review): xport_to_queue_pid[] has NREADERS_MAX entries but is indexed
// up to arg_nreaders; this appears to rely on main() clamping arg_nreaders
// to NREADERS_MAX beforehand -- confirm.
135 /* Start the external processes, then loop to restart any program
138 static void process_model(const char* argv
[])
// Most recent result of wait(); -1 until a child has been waited for.
141 int waiting_pid
= -1;
// -1 marks a child that has not been started yet.
142 int queue_to_journal_pid
= -1;
143 int xport_to_queue_pid
[NREADERS_MAX
];
145 for ( i
=0; i
<NREADERS_MAX
; ++i
)
147 xport_to_queue_pid
[i
] = -1;
// Start queue_to_journal the first time through, or again after wait()
// has just returned its pid.
154 if ((-1 == queue_to_journal_pid
) || (waiting_pid
== queue_to_journal_pid
))
156 queue_to_journal_pid
= start_program("queue_to_journal", argv
);
// Same rule for each of the arg_nreaders reader processes.
159 for ( i
=0; i
<arg_nreaders
; ++i
)
161 if ((-1 == xport_to_queue_pid
[i
]) ||
162 (waiting_pid
== xport_to_queue_pid
[i
]))
164 xport_to_queue_pid
[i
] = start_program("xport_to_queue", argv
);
168 /* Wait and restart any program that exits. */
169 waiting_pid
= wait(&status
);
171 /* If a signal interrupts the wait, we loop. */
172 if ( (waiting_pid
< 0) && (errno
== EINTR
) )
174 /* If we get a rotate, send it along to the process
175 * that writes journals. */
178 if ( -1 != queue_to_journal_pid
)
180 LOG_PROG("Sending SIGHUP to queue_to_journal[%d] to "
181 "trigger log rotate.\n", queue_to_journal_pid
);
// NOTE(review): the return value of kill() is not checked.
182 kill(queue_to_journal_pid
, SIGHUP
);
189 if ( WIFEXITED(status
) )
194 if ( WIFSIGNALED(status
) )
196 /* If any child process exits abnormally, log it and restart. */
// Map the reaped pid back to a program name for the warning below.
197 const char* program
= "unknown";
199 if ( waiting_pid
== queue_to_journal_pid
)
201 program
= "queue_to_journal";
204 for ( i
=0; i
<arg_nreaders
; ++i
)
206 if ( waiting_pid
== xport_to_queue_pid
[i
] )
208 program
= "xport_to_queue";
212 log_msg(LOG_MASK_WARNING
, __FILE__
, __LINE__
,
213 "Our %s process exited (pid=%d) with "
214 "signal %d, restarting.\n",
215 program
, waiting_pid
, WTERMSIG(status
));
219 LOG_INF("Shutting down.");
221 /* We give the children a chance to run before we kill them. */
224 /* We are done, send a quit signal to our children. */
226 while ( wait(0) > 0 ) /* Wait for children. */
// errno values other than ECHILD and EINTR are singled out here; the
// statement that handles them is not visible in this listing.
228 if ( errno
!= ECHILD
&& errno
!= EINTR
)
// thread_model: run the journaller as threads inside this process.
//
// What the visible lines show:
//   - one attribute object (joinable, scheduling policy SCHED_FIFO) is
//     initialised and used for every pthread_create() call;
//   - one thread runs queue_to_journal and arg_nreaders threads run
//     xport_to_queue, each started with a NULL argument;
//   - after each reader thread is created, the scheduling policy read back
//     from the attribute object is logged;
//   - after "Shutting down." is logged, SIGQUIT is sent to every reader
//     thread and then to the journal thread with pthread_kill();
//   - all threads are then joined and the attribute object is destroyed.
//
// NOTE(review): the code between thread start-up and "Shutting down." and
// parts of the pthread_kill() error handling are not present in this
// listing, so they cannot be described from here.
// NOTE(review): xport_to_queue_tid[] has NREADERS_MAX entries but is indexed
// up to arg_nreaders; this appears to rely on main() clamping arg_nreaders
// to NREADERS_MAX beforehand -- confirm.
236 static void thread_model()
239 pthread_t queue_to_journal_tid
;
240 pthread_t xport_to_queue_tid
[NREADERS_MAX
];
242 static pthread_attr_t m_pthread_attr
;
243 // Start thread attributes with pthread FIFO policy (default would be OTHER)
244 pthread_attr_init(&m_pthread_attr
) ;
245 pthread_attr_setdetachstate(&m_pthread_attr
, PTHREAD_CREATE_JOINABLE
);
// NOTE(review): POSIX applies the policy stored in an attribute object only
// when its inherit-scheduler attribute is PTHREAD_EXPLICIT_SCHED; whether
// that is set is not visible in this listing -- confirm.
248 pthread_attr_setschedpolicy(&m_pthread_attr
, SCHED_FIFO
) ;
// NOTE(review): pthread_create() reports failure through its return value;
// if PERROR formats errno the logged reason may be unrelated -- confirm.
251 if ( pthread_create(&queue_to_journal_tid
,
252 &m_pthread_attr
, queue_to_journal
, NULL
) )
254 PERROR("pthread_create(&queue_to_journal_tid, NULL, queue_to_journal, NULL)");
// NOTE(review): pthread_t is an opaque type; printing it with %d here and
// in the log calls below is not portable.
256 LOG_PROG("queue_to_journal_tid == %d\n", queue_to_journal_tid
);
258 for ( i
=0; i
<arg_nreaders
; ++i
)
260 if ( pthread_create(&xport_to_queue_tid
[i
], &m_pthread_attr
,
261 xport_to_queue
, NULL
) )
263 PERROR("pthread_create(&xport_to_queue_tid[i], NULL, xport_to_queue, NULL)");
// Read the policy back from the attribute object and log it.  The messages
// go out through LOG_ER although they are informational.
267 pthread_attr_getschedpolicy((pthread_attr_t
*)&m_pthread_attr
,
269 switch ( schedpolicy
)
// NOTE(review): only the SCHED_OTHER message ends with a newline; the other
// three do not.
271 case SCHED_OTHER
: LOG_ER("pthread_policy=SCHED_OTHER\n") ; break ;
272 case SCHED_FIFO
: LOG_ER("pthread_policy=SCHED_FIFO") ; break ;
273 case SCHED_RR
: LOG_ER("pthread_policy=SCHED_RR") ; break ;
274 //case SCHED_MIN = SCHED_OTHER,
275 //case SCHED_MAX = SCHED_RR
276 default: LOG_ER("pthread_policy=unknown") ; break ;
279 LOG_PROG("xport_to_queue_tid[%d] == %d\n",
280 i
, xport_to_queue_tid
[i
]);
289 LOG_INF("Shutting down.");
291 /* All threads share a single gbl_done, so just interrupt them and
292 wait for them to finish on their own. */
294 for ( i
=0; i
<arg_nreaders
; ++i
)
// NOTE(review): the second argument below is the whole xport_to_queue_tid
// array (no [i]), so a pointer is passed for %d; the matching "Joining"
// message further down passes xport_to_queue_tid[i].  Likely a bug.
296 LOG_PROG("Sending SIGQUIT to xport_to_queue thread %d (%d).\n",
297 i
, xport_to_queue_tid
);
298 if ( pthread_kill(xport_to_queue_tid
[i
], SIGQUIT
) )
300 PERROR("pthread_kill");
304 LOG_PROG("Sending SIGQUIT to queue_to_journal thread (%d).\n",
305 queue_to_journal_tid
);
306 if ( pthread_kill(queue_to_journal_tid
, SIGQUIT
) )
311 PERROR("pthread_kill");
313 case ESRCH
: /* If the thread has already terminated. */
// Wait for the reader threads first, then for the journal thread.
319 for (i
=0; i
<arg_nreaders
; ++i
)
321 LOG_PROG("Joining xport_to_queue thread %d (%d).\n",
322 i
, xport_to_queue_tid
[i
]);
323 if ( pthread_join(xport_to_queue_tid
[i
], NULL
) )
325 PERROR("pthread_join");
329 LOG_PROG("Joining queue_to_journal thread (%d).\n", queue_to_journal_tid
);
330 if ( pthread_join(queue_to_journal_tid
, NULL
) )
332 PERROR("pthread_join");
335 pthread_attr_destroy(&m_pthread_attr
);
/* Stand-in for the thread model: it starts nothing and only logs that
   POSIX threads are unavailable on this platform.
   NOTE(review): the preprocessor condition that selects this definition
   over the pthread-based one is not visible in this listing -- confirm. */
static void thread_model(void)
{
  LOG_ER("No POSIX thread support on this platform.\n");
}
/* Return non-zero if addrstr is a multicast address.
 *
 * addrstr is an IPv4 address in any textual form accepted by
 * inet_aton().  Returns 0 when addrstr is NULL, cannot be parsed, or
 * names an address outside the multicast range. */

#include <netinet/in.h>
#include <arpa/inet.h>

static int is_multicast(const char* addrstr)
{
  struct in_addr addr;

  if ( NULL == addrstr || 0 == inet_aton(addrstr, &addr) )
    {
      return 0;
    }

  /* s_addr is in network byte order; IN_MULTICAST wants host order. */
  return IN_MULTICAST(ntohl(addr.s_addr)) ? 1 : 0;
}
// do_fork: file-local helper taking no arguments and returning nothing.
// NOTE(review): its body is not present in this listing, so what it does
// (presumably a fork() step used while daemonizing) must be confirmed
// against the full source.
361 static void do_fork()
// daemonize: file-local helper taking no arguments and returning nothing.
// The visible part walks every descriptor number below the
// sysconf(_SC_OPEN_MAX) limit (the loop body is not shown; its comment says
// the files are closed) and then opens "/dev/null" read-write.
// NOTE(review): the steps before the descriptor loop and the statements
// that set up descriptors 1 and 2 are missing from this listing -- confirm
// against the full source.
// NOTE(review): the result of open() is not checked here.
377 static void daemonize()
392 /* Close all open files. */
393 fdlimit
= sysconf (_SC_OPEN_MAX
);
394 for ( fd
=0; fd
<fdlimit
; ++fd
)
399 /* Open 0, 1 and 2. */
400 open("/dev/null", O_RDWR
);
406 /* Write the process ID of this process into a file so we may be
409 static void write_pid_file()
411 /* Write out the PID into the PID file. */
414 FILE* pidfp
= fopen(arg_pid_file
, "w");
417 LOG_ER("Can't open PID file \"%s\"\n", arg_pid_file
);
421 /* This obnoxiousness is to avoid compiler warnings that we'd get
422 compiling on systems with long or int pid_t. */
423 pid_t pid
= getpid();
424 if ( sizeof(pid_t
) == sizeof(long) )
427 fprintf(pidfp
, "%ld\n", x
);
432 fprintf(pidfp
, "%d\n", x
);
// main: program entry point.
//
// What the visible lines show:
//   - command-line options are handled by process_options(argc, argv);
//   - a branch is taken when arg_nodaemonize is not set (its body is not
//     shown);
//   - _progver is built as "lwes-journaller-" followed by VERSION and is
//     appended to the "Initializing - ", "Starting up - " and
//     "Normal shutdown complete - " messages assembled in _buf;
//   - signal handlers are installed with install_signal_handlers();
//   - the options are sanity-checked with warnings: a multicast arg_ip
//     without arg_join_group, more than one reader with a multicast
//     address, arg_join_group with a unicast address, arg_nreaders above
//     NREADERS_MAX (which is then reduced to NREADERS_MAX), and
//     arg_njournalls above 30;
//   - arg_proc_type is compared against ARG_PROCESS and ARG_THREAD.
//
// NOTE(review): the statements executed for each arg_proc_type value, the
// calls that emit _buf, and the end of the function are not present in this
// listing -- confirm against the full source.
440 int main(int argc
, const char* argv
[])
442 char _buf
[100] ; // for log messages
445 process_options(argc
, argv
);
447 if ( ! arg_nodaemonize
)
// NOTE(review): _progver is declared outside the visible lines; strcpy and
// strcat do no bounds checking, so its size must cover the prefix plus
// VERSION -- confirm.
452 strcpy(_progver
, "lwes-journaller-") ;
453 strcat(_progver
, ""VERSION
"") ;
// NOTE(review): _buf holds 100 bytes and is filled with unchecked
// strcpy/strcat here and twice more below; a long VERSION could overflow it.
455 strcpy(_buf
, "Initializing - ") ;
456 strcat(_buf
, _progver
) ;
460 install_signal_handlers();
465 if ( is_multicast(arg_ip
) )
467 if ( ! arg_join_group
)
469 LOG_WARN("Using multi-cast address without joining group.\n");
471 if ( arg_nreaders
> 1 )
473 LOG_WARN("Multiple reader threads can't be used with multi-cast "
474 "addresses, will use a single thread instead.\n");
480 if ( arg_join_group
)
482 LOG_WARN("Can't join group with uni-cast address.\n");
// Keep the reader count within the fixed-size pid/tid arrays, which are
// declared with NREADERS_MAX entries.
485 if ( arg_nreaders
> NREADERS_MAX
)
487 LOG_WARN("Too many reader threads specified (%d) reducing to %d.\n",
488 arg_nreaders
, NREADERS_MAX
);
489 arg_nreaders
= NREADERS_MAX
;
491 if ( arg_njournalls
> 30 )
493 LOG_WARN("suspiciously large (%d) number of journals.",
497 strcpy(_buf
, "Starting up - ") ;
498 strcat(_buf
, _progver
) ;
// Select the execution model named by arg_proc_type.
502 if ( strcmp(arg_proc_type
, ARG_PROCESS
) == 0 )
507 if ( strcmp(arg_proc_type
, ARG_THREAD
) == 0 )
512 strcpy(_buf
, "Normal shutdown complete - ") ;
513 strcat(_buf
, _progver
) ;