Must use (void) instead of () for function declarations.
[lwes-journaller.git] / src / xport_to_queue.c
blob95de40800ef0bb8c651973b3d0cb8d5b94868fc4
1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
#include "config.h"

#include "xport_to_queue.h"

#include "header.h"
#include "opt.h"
#include "perror.h"
#include "queue.h"
#include "sig.h"
#include "xport.h"
#include "stats.h"

#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
/* Forward declaration; (void) makes this a real prototype so that any
 * call passing arguments is rejected at compile time. */
static void skd(void);
40 void* xport_to_queue(void* arg)
42 struct xport xpt;
43 struct queue que;
45 unsigned char* buf = 0;
46 size_t bufsiz;
48 int read_errors = 0;
49 (void)arg; /* appease -Wall -Werror */
51 install_signal_handlers();
53 if ( arg_rt )
55 skd();
58 if ( (queue_factory(&que) < 0) || (que.vtbl->open(&que, O_WRONLY) < 0) )
60 LOG_ER("Failed to create or open queue object.\n");
61 exit(EXIT_FAILURE);
64 /* Can we drop root here? */
66 if ( (xport_factory(&xpt) < 0) || (xpt.vtbl->open(&xpt, O_RDONLY) < 0) )
68 LOG_ER("Failed to create xport object.\n");
69 exit(EXIT_FAILURE);
72 buf = (unsigned char*)que.vtbl->alloc(&que, &bufsiz);
73 if ( 0 == buf )
75 LOG_ER("unable to allocate %d bytes for message buffer.\n", bufsiz);
76 exit(EXIT_FAILURE);
78 memset(buf, 0, HEADER_LENGTH); /* Clear the header portion of the message. */
80 /* Read a packet from the transport, write it to the queue. */
81 while ( ! gbl_done )
83 int xpt_read_ret;
84 int que_write_ret;
86 unsigned long addr;
87 short port;
89 switch ( journaller_panic_mode )
91 case PANIC_STARTUP: // panic! system queue is max'd out (receiver live-lock?)
92 // close the socket
93 xpt.vtbl->close(&xpt) ;
94 journaller_panic_mode = PANIC_IN_EFFECT ;
95 case PANIC_IN_EFFECT: // waiting for system queue to empty
96 continue ; // socket is closed, so we can't read it anyway
97 case PANIC_SHUTDOWN: // panic is over
98 // reopen the socket
99 LOG_PROG("PANIC shutdown - reopening the multicast socket\n") ;
100 if ( (xport_factory(&xpt) < 0)
101 || (xpt.vtbl->open(&xpt, O_RDONLY) < 0) )
103 LOG_ER("Failed to reopen xport object after PANIC.\n");
104 exit(EXIT_FAILURE);
106 LOG_PROG("PANIC shutdown - opened the multicast socket\n") ;
107 journaller_panic_mode = PANIC_NOT ;
108 break ;
109 case PANIC_HURRYUP: // hurry-up mode
110 break ;
111 case PANIC_NOT:
112 break ;
115 if ( (xpt_read_ret = xpt.vtbl->read(&xpt,
116 buf + HEADER_LENGTH,
117 bufsiz - HEADER_LENGTH,
118 &addr, &port)) < 0 )
120 ++read_errors;
121 continue;
124 /* Return info about packet read. */
125 LOG_PROG("Read %d bytes\n", xpt_read_ret);
126 LOG_PROG("From %08lx:%04x.\n", addr, port&0xffff);
127 if ( read_errors )
129 LOG_PROG("read_errors == %d\n", read_errors);
131 read_errors = 0;
133 // if hurry-up, discard non-revenue-bearing events
134 if ( (journaller_panic_mode == PANIC_HURRYUP)
135 && non_revenue_bearing(buf) )
137 continue ;
140 header_add(buf, xpt_read_ret, addr, port);
142 if ( (que_write_ret = que.vtbl->write(&que,
143 buf,
144 xpt_read_ret + HEADER_LENGTH)) < 0 )
146 LOG_ER("Queue write error attempting to write %d bytes.\n",
147 xpt_read_ret + HEADER_LENGTH);
148 continue;
150 else
152 LOG_PROG("Queue write of %d bytes.\n",
153 xpt_read_ret + HEADER_LENGTH);
156 que.vtbl->dealloc(&que, buf);
158 xpt.vtbl->destructor(&xpt);
159 que.vtbl->destructor(&que);
161 return 0;
#if HAVE_SCHED_H
#include <sched.h>

/*
 * Try to switch the calling process (pid 0) to the SCHED_FIFO policy at
 * that policy's maximum priority.  Failure is not fatal: it is reported
 * and the caller carries on with its current scheduling.
 */
static void skd(void)
{
  struct sched_param sp = { 0 };
  int max_prio = sched_get_priority_max(SCHED_FIFO);

  /* sched_get_priority_max() returns -1 on error; don't pass that on as
   * a priority. */
  if ( max_prio == -1 )
  {
    PERROR("sched_get_priority_max");
    LOG_WARN("Increasing thread priority failed"
             ", will run with standard priorities");
    return;
  }
  sp.sched_priority = max_prio;

  /* Only -1 signals failure.  POSIX allows a successful call to return
   * the former scheduling policy, which need not be 0. */
  if ( sched_setscheduler(0, SCHED_FIFO, &sp) == -1 )
  {
    PERROR("sched_setscheduler");
    LOG_WARN("Increasing thread priority failed"
             ", will run with standard priorities");
  }
  else
  {
    LOG_WARN("Running with FIFO priority");
  }
}
#else
/* Used when HAVE_SCHED_H is not set: only reports that real-time
 * scheduling is unavailable. */
static void skd(void)
{
  LOG_WARN("No real-time scheduler support on this platform.\n");
}
#endif