1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
23 #include "xport_to_queue.h"
38 static void skd(void);
40 struct enqueuer_stats est
;
42 void* xport_to_queue(void* arg
)
47 unsigned char* buf
= 0;
51 (void)arg
; /* appease -Wall -Werror */
53 install_signal_handlers();
60 if ( (queue_factory(&que
) < 0) || (que
.vtbl
->open(&que
, O_WRONLY
) < 0) )
62 LOG_ER("Failed to create or open queue object.\n");
66 /* Can we drop root here? */
68 if ( (xport_factory(&xpt
) < 0) || (xpt
.vtbl
->open(&xpt
, O_RDONLY
) < 0) )
70 LOG_ER("Failed to create xport object.\n");
74 buf
= (unsigned char*)que
.vtbl
->alloc(&que
, &bufsiz
);
77 LOG_ER("unable to allocate %d bytes for message buffer.\n", bufsiz
);
80 memset(buf
, 0, HEADER_LENGTH
); /* Clear the header portion of the message. */
82 /* Read a packet from the transport, write it to the queue. */
91 if ( (xpt_read_ret
= xpt
.vtbl
->read(&xpt
,
93 bufsiz
- HEADER_LENGTH
,
97 enqueuer_stats_record_socket_error(&est
);
101 enqueuer_stats_record_datagram(&est
,xpt_read_ret
);
103 /* Return info about packet read. */
104 LOG_PROG("Read %d bytes\n", xpt_read_ret
);
105 LOG_PROG("From %08lx:%04x.\n", addr
, port
&0xffff);
108 LOG_PROG("read_errors == %d\n", read_errors
);
112 header_add(buf
, xpt_read_ret
, addr
, port
);
114 if ( header_is_rotate (buf
) )
115 { // Command::Rotate: here we just collect some stats.
116 enqueuer_stats_rotate(&est
);
119 if ( (que_write_ret
= que
.vtbl
->write(&que
,
121 xpt_read_ret
+ HEADER_LENGTH
)) < 0 )
123 LOG_ER("Queue write error attempting to write %d bytes.\n",
124 xpt_read_ret
+ HEADER_LENGTH
);
129 LOG_PROG("Queue write of %d bytes.\n",
130 xpt_read_ret
+ HEADER_LENGTH
);
133 que
.vtbl
->dealloc(&que
, buf
);
135 xpt
.vtbl
->destructor(&xpt
);
136 que
.vtbl
->destructor(&que
);
138 enqueuer_stats_report(&est
);
148 struct sched_param sp
;
150 sp
.sched_priority
= sched_get_priority_max(SCHED_FIFO
);
151 if ( sched_setscheduler(0, SCHED_FIFO
, &sp
) )
153 PERROR("sched_setscheduler");
154 LOG_WARN("Increasing thread priority failed"
155 ", will run with standard priorities");
159 LOG_WARN("Running with FIFO priority");
165 LOG_WARN("No real-time scheduler support on this platform.\n");