include headers in distribution
[lwes-journaller.git] / src / xport_to_queue.c
blob24c45f7d2a726e0170b761f07a5ab3c60ddf1a97
1 /*======================================================================*
2 * Copyright (c) 2008, Yahoo! Inc. All rights reserved. *
3 * *
4 * Licensed under the New BSD License (the "License"); you may not use *
5 * this file except in compliance with the License. Unless required *
6 * by applicable law or agreed to in writing, software distributed *
7 * under the License is distributed on an "AS IS" BASIS, WITHOUT *
8 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
9 * See the License for the specific language governing permissions and *
10 * limitations under the License. See accompanying LICENSE file. *
11 *======================================================================*/
#include "config.h"

#include "xport_to_queue.h"

#include "header.h"
#include "opt.h"
#include "perror.h"
#include "queue.h"
#include "sig.h"
#include "xport.h"
#include "stats.h"

#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
30 static void skd(void);
32 struct enqueuer_stats est ;
34 void* xport_to_queue(void* arg)
36 struct xport xpt;
37 struct queue que;
39 unsigned char* buf = 0;
40 size_t bufsiz;
42 (void)arg; /* appease -Wall -Werror */
44 install_signal_handlers();
46 if ( arg_rt )
48 skd();
51 if ( (queue_factory(&que) < 0) || (que.vtbl->open(&que, O_WRONLY) < 0) )
53 LOG_ER("Failed to create or open queue object.\n");
54 exit(EXIT_FAILURE);
57 /* Can we drop root here? */
59 if ( (xport_factory(&xpt) < 0) || (xpt.vtbl->open(&xpt, O_RDONLY) < 0) )
61 LOG_ER("Failed to create xport object.\n");
62 exit(EXIT_FAILURE);
65 buf = (unsigned char*)que.vtbl->alloc(&que, &bufsiz);
66 if ( 0 == buf )
68 LOG_ER("unable to allocate %d bytes for message buffer.\n", bufsiz);
69 exit(EXIT_FAILURE);
71 memset(buf, 0, HEADER_LENGTH); /* Clear the header portion of the message. */
73 /* Read a packet from the transport, write it to the queue. */
74 while ( ! gbl_done )
76 int xpt_read_ret;
77 int que_write_ret;
79 unsigned long long tm;
80 unsigned long addr;
81 short port;
83 if ( (xpt_read_ret = xpt.vtbl->read(&xpt,
84 buf + HEADER_LENGTH,
85 bufsiz - HEADER_LENGTH,
86 &addr, &port)) < 0 )
88 enqueuer_stats_record_socket_error(&est);
89 continue;
91 tm = time_in_milliseconds();
93 enqueuer_stats_record_datagram(&est,xpt_read_ret);
95 /* Return info about packet read. */
96 LOG_PROG("Read %d bytes\n", xpt_read_ret);
97 LOG_PROG("From %08lx:%04x.\n", addr, port&0xffff);
99 header_add(buf, xpt_read_ret, tm, addr, port);
101 if ( header_is_rotate (buf) )
102 { // Command::Rotate: here we just collect some stats.
103 enqueuer_stats_rotate(&est);
106 if ( (que_write_ret = que.vtbl->write(&que,
107 buf,
108 xpt_read_ret + HEADER_LENGTH)) < 0 )
110 LOG_ER("Queue write error attempting to write %d bytes.\n",
111 xpt_read_ret + HEADER_LENGTH);
112 continue;
114 else
116 LOG_PROG("Queue write of %d bytes.\n",
117 xpt_read_ret + HEADER_LENGTH);
120 que.vtbl->dealloc(&que, buf);
122 xpt.vtbl->destructor(&xpt);
123 que.vtbl->destructor(&que);
125 enqueuer_stats_report(&est);
127 return 0;
#if HAVE_SCHED_H
#include <sched.h>

/*
 * Request SCHED_FIFO scheduling at the maximum FIFO priority for the
 * caller (pid 0).  Failure is not fatal: the error is reported and
 * execution continues under the existing scheduling policy.
 */
static void skd(void)
{
  struct sched_param sp;

  sp.sched_priority = sched_get_priority_max(SCHED_FIFO);
  if ( sched_setscheduler(0, SCHED_FIFO, &sp) )
    {
      PERROR("sched_setscheduler");
      LOG_WARN("Increasing thread priority failed"
               ", will run with standard priorities\n");
    }
  else
    {
      LOG_WARN("Running with FIFO priority\n");
    }
}
#else
/* Fallback used when <sched.h> is unavailable: only logs a warning. */
static void skd(void)
{
  LOG_WARN("No real-time scheduler support on this platform.\n");
}
#endif