1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
23 #include "xport_to_queue.h"
/*
 * xport_to_queue: moves packets from a transport object (xpt) into a
 * queue object (que), prefixing each one with a header.
 *
 * arg is unused (it is explicitly cast to void below).
 *
 * NOTE(review): the void*(void*) signature matches a pthread start
 * routine -- presumably this runs as a thread; confirm against the caller.
 *
 * NOTE(review): this listing carries embedded original line numbers and is
 * missing lines (braces, local declarations, the loop header, several
 * guards, error exits and the return are not visible).  The comments below
 * describe only what the visible statements show.
 */
40 void* xport_to_queue(void* arg
)
/* Message buffer pointer; it is assigned from the queue's alloc() below. */
45 unsigned char* buf
= 0;
49 (void)arg
; /* appease -Wall -Werror */
/* Signal handlers are installed before the queue or transport is opened. */
51 install_signal_handlers();
/*
 * Build the queue object and open it write-only.  A negative return from
 * either step selects the failure path logged on the next line.
 */
58 if ( (queue_factory(&que
) < 0) || (que
.vtbl
->open(&que
, O_WRONLY
) < 0) )
60 LOG_ER("Failed to create or open queue object.\n");
64 /* Can we drop root here? */
/*
 * Build the transport object and open it read-only.
 * NOTE(review): the message below only mentions "create", but a failed
 * open() reaches the same path.
 */
66 if ( (xport_factory(&xpt
) < 0) || (xpt
.vtbl
->open(&xpt
, O_RDONLY
) < 0) )
68 LOG_ER("Failed to create xport object.\n");
/*
 * Obtain the message buffer from the queue.  alloc() is also handed
 * &bufsiz -- presumably it reports the buffer size there; confirm against
 * the queue implementation.
 */
72 buf
= (unsigned char*)que
.vtbl
->alloc(&que
, &bufsiz
);
/*
 * NOTE(review): the condition guarding this error is not visible here.
 * bufsiz is printed with %d; its declaration is not visible -- confirm the
 * format matches its type.
 */
75 LOG_ER("unable to allocate %d bytes for message buffer.\n", bufsiz
);
78 memset(buf
, 0, HEADER_LENGTH
); /* Clear the header portion of the message. */
80 /* Read a packet from the transport, write it to the queue. */
/*
 * Dispatch on the current panic state.  The enclosing loop header is not
 * visible in this listing, but the `continue` below implies one.
 * NOTE(review): no default label is visible for this switch.
 */
89 switch ( journaller_panic_mode
)
91 case PANIC_STARTUP
: // panic! system queue is max'd out (receiver live-lock?)
/* Entering panic: close the transport and record that panic is in effect. */
93 xpt
.vtbl
->close(&xpt
) ;
94 journaller_panic_mode
= PANIC_IN_EFFECT
;
/*
 * NOTE(review): no break appears between the assignment above and the next
 * case label, so PANIC_STARTUP falls through into PANIC_IN_EFFECT.  That
 * looks intentional (both end in `continue`) but is unmarked.
 */
95 case PANIC_IN_EFFECT
: // waiting for system queue to empty
96 continue ; // socket is closed, so we can't read it anyway
97 case PANIC_SHUTDOWN
: // panic is over
/* Leaving panic: rebuild the transport object and reopen it read-only. */
99 LOG_PROG("PANIC shutdown - reopening the multicast socket\n") ;
100 if ( (xport_factory(&xpt
) < 0)
101 || (xpt
.vtbl
->open(&xpt
, O_RDONLY
) < 0) )
103 LOG_ER("Failed to reopen xport object after PANIC.\n");
/* The panic state is reset to PANIC_NOT after the reopen is logged. */
106 LOG_PROG("PANIC shutdown - opened the multicast socket\n") ;
107 journaller_panic_mode
= PANIC_NOT
;
/*
 * NOTE(review): whatever separates the assignment above from the next case
 * label (for example a break) is not visible in this listing.
 */
109 case PANIC_HURRYUP
: // hurry-up mode
/*
 * Read one packet from the transport; the return value is kept in
 * xpt_read_ret.  Not all arguments are visible; the size passed is
 * bufsiz - HEADER_LENGTH, i.e. the buffer size less the header length.
 */
115 if ( (xpt_read_ret
= xpt
.vtbl
->read(&xpt
,
117 bufsiz
- HEADER_LENGTH
,
124 /* Return info about packet read. */
125 LOG_PROG("Read %d bytes\n", xpt_read_ret
);
/* The port is masked to its low 16 bits before being logged. */
126 LOG_PROG("From %08lx:%04x.\n", addr
, port
&0xffff);
/* NOTE(review): where read_errors is updated is not visible here. */
129 LOG_PROG("read_errors == %d\n", read_errors
);
133 // if hurry-up, discard non-revenue-bearing events
/*
 * The test combines the hurry-up state with non_revenue_bearing(buf); the
 * statement performing the discard is not visible in this listing.
 */
134 if ( (journaller_panic_mode
== PANIC_HURRYUP
)
135 && non_revenue_bearing(buf
) )
/*
 * header_add() receives the buffer, the read result, and the sender's addr
 * and port -- presumably it fills in the header region cleared earlier;
 * confirm against header_add's definition.
 */
140 header_add(buf
, xpt_read_ret
, addr
, port
);
/*
 * Write to the queue with a length of xpt_read_ret + HEADER_LENGTH; a
 * negative return is treated as a write error.
 */
142 if ( (que_write_ret
= que
.vtbl
->write(&que
,
144 xpt_read_ret
+ HEADER_LENGTH
)) < 0 )
146 LOG_ER("Queue write error attempting to write %d bytes.\n",
147 xpt_read_ret
+ HEADER_LENGTH
);
152 LOG_PROG("Queue write of %d bytes.\n",
153 xpt_read_ret
+ HEADER_LENGTH
);
/* Teardown: hand the buffer back to the queue, then destroy both objects. */
156 que
.vtbl
->dealloc(&que
, buf
);
158 xpt
.vtbl
->destructor(&xpt
);
159 que
.vtbl
->destructor(&que
);
/*
 * Fragment: requests the SCHED_FIFO scheduling policy at that policy's
 * maximum priority.
 *
 * NOTE(review): the enclosing function header, braces and any preprocessor
 * guards are not visible in this listing, so which of the log lines below
 * belong to which branch is inferred from their text only.
 */
169 struct sched_param sp
;
/* Use the highest priority the platform reports for SCHED_FIFO. */
171 sp
.sched_priority
= sched_get_priority_max(SCHED_FIFO
);
/*
 * The first argument to sched_setscheduler() is 0; per POSIX that selects
 * the calling process.  A non-zero return selects the failure path, which
 * reports via PERROR and logs a warning.
 */
172 if ( sched_setscheduler(0, SCHED_FIFO
, &sp
) )
174 PERROR("sched_setscheduler");
175 LOG_WARN("Increasing thread priority failed"
176 ", will run with standard priorities");
/*
 * Presumably the success branch -- confirm against the full source.
 * NOTE(review): this message and the one above lack the trailing "\n" that
 * the final message has.
 */
180 LOG_WARN("Running with FIFO priority");
/*
 * Presumably an alternate (compile-time) path for platforms without
 * real-time scheduling; the guard is not visible here.
 */
186 LOG_WARN("No real-time scheduler support on this platform.\n");