Finishing the total time counters
[lwes-journaller.git] / src / xport_to_queue.c
blob7e36db866673698ba47abf5eb6022a143d75c39e
1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
21 #include "config.h"
23 #include "xport_to_queue.h"
25 #include "header.h"
26 #include "opt.h"
27 #include "perror.h"
28 #include "queue.h"
29 #include "sig.h"
30 #include "xport.h"
31 #include "stats.h"
33 #include <fcntl.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <unistd.h>
38 static void skd(void);
40 struct enqueuer_stats est ;
42 void* xport_to_queue(void* arg)
44 struct xport xpt;
45 struct queue que;
47 unsigned char* buf = 0;
48 size_t bufsiz;
50 int read_errors = 0;
51 (void)arg; /* appease -Wall -Werror */
53 install_signal_handlers();
55 if ( arg_rt )
57 skd();
60 if ( (queue_factory(&que) < 0) || (que.vtbl->open(&que, O_WRONLY) < 0) )
62 LOG_ER("Failed to create or open queue object.\n");
63 exit(EXIT_FAILURE);
66 /* Can we drop root here? */
68 if ( (xport_factory(&xpt) < 0) || (xpt.vtbl->open(&xpt, O_RDONLY) < 0) )
70 LOG_ER("Failed to create xport object.\n");
71 exit(EXIT_FAILURE);
74 buf = (unsigned char*)que.vtbl->alloc(&que, &bufsiz);
75 if ( 0 == buf )
77 LOG_ER("unable to allocate %d bytes for message buffer.\n", bufsiz);
78 exit(EXIT_FAILURE);
80 memset(buf, 0, HEADER_LENGTH); /* Clear the header portion of the message. */
82 /* Read a packet from the transport, write it to the queue. */
83 while ( ! gbl_done )
85 int xpt_read_ret;
86 int que_write_ret;
88 unsigned long addr;
89 short port;
91 if ( (xpt_read_ret = xpt.vtbl->read(&xpt,
92 buf + HEADER_LENGTH,
93 bufsiz - HEADER_LENGTH,
94 &addr, &port)) < 0 )
96 ++read_errors;
97 enqueuer_stats_record_socket_error(&est);
98 continue;
101 enqueuer_stats_record_datagram(&est,xpt_read_ret);
103 /* Return info about packet read. */
104 LOG_PROG("Read %d bytes\n", xpt_read_ret);
105 LOG_PROG("From %08lx:%04x.\n", addr, port&0xffff);
106 if ( read_errors )
108 LOG_PROG("read_errors == %d\n", read_errors);
110 read_errors = 0;
112 header_add(buf, xpt_read_ret, addr, port);
114 if ( header_is_rotate (buf) )
115 { // Command::Rotate: here we just collect some stats.
116 enqueuer_stats_rotate(&est);
119 if ( (que_write_ret = que.vtbl->write(&que,
120 buf,
121 xpt_read_ret + HEADER_LENGTH)) < 0 )
123 LOG_ER("Queue write error attempting to write %d bytes.\n",
124 xpt_read_ret + HEADER_LENGTH);
125 continue;
127 else
129 LOG_PROG("Queue write of %d bytes.\n",
130 xpt_read_ret + HEADER_LENGTH);
133 que.vtbl->dealloc(&que, buf);
135 xpt.vtbl->destructor(&xpt);
136 que.vtbl->destructor(&que);
138 enqueuer_stats_report(&est);
140 return 0;
143 #if HAVE_SCHED_H
144 #include <sched.h>
146 static void skd()
148 struct sched_param sp;
150 sp.sched_priority = sched_get_priority_max(SCHED_FIFO);
151 if ( sched_setscheduler(0, SCHED_FIFO, &sp) )
153 PERROR("sched_setscheduler");
154 LOG_WARN("Increasing thread priority failed"
155 ", will run with standard priorities");
157 else
159 LOG_WARN("Running with FIFO priority");
162 #else
163 static void skd()
165 LOG_WARN("No real-time scheduler support on this platform.\n");
167 #endif