1 /*======================================================================*
2 * Copyright (c) 2008, Yahoo! Inc. All rights reserved. *
4 * Licensed under the New BSD License (the "License"); you may not use *
5 * this file except in compliance with the License. Unless required *
6 * by applicable law or agreed to in writing, software distributed *
7 * under the License is distributed on an "AS IS" BASIS, WITHOUT *
8 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
9 * See the License for the specific language governing permissions and *
10 * limitations under the License. See accompanying LICENSE file. *
11 *======================================================================*/
15 #include "xport_to_queue.h"
/* Forward declaration of the file-local helper skd(). */
30 static void skd(void);
/* File-scope statistics object; its address is passed to the
 * enqueuer_stats_*() calls made further down in this file. */
32 struct enqueuer_stats est
;
/*
 * xport_to_queue: move packets from the transport object (xpt) into the
 * queue object (que).
 *
 * Visible steps: install signal handlers, create and open the queue for
 * writing and the transport for reading, obtain a message buffer from the
 * queue, then read from the transport, stamp a header onto the buffer and
 * write header + payload to the queue.  Afterwards the buffer is returned
 * to the queue, both objects are destroyed and the statistics in `est`
 * are reported.
 *
 * arg: unused (only cast to void to silence warnings).
 *
 * NOTE(review): declarations of que, xpt, bufsiz, xpt_read_ret,
 * que_write_ret, addr and port, as well as the braces, loop construct and
 * return statement, are not visible in this text -- confirm the exact
 * control flow and return value against the complete file.
 */
34 void* xport_to_queue(void* arg
)
39 unsigned char* buf
= 0;
42 (void)arg
; /* appease -Wall -Werror */
44 install_signal_handlers();
/* Build the queue object and open it write-only; a negative result from
 * either step is reported as an error. */
51 if ( (queue_factory(&que
) < 0) || (que
.vtbl
->open(&que
, O_WRONLY
) < 0) )
53 LOG_ER("Failed to create or open queue object.\n");
57 /* Can we drop root here? */
/* Build the transport object and open it read-only.  The same message is
 * logged whether creation or the open call fails. */
59 if ( (xport_factory(&xpt
) < 0) || (xpt
.vtbl
->open(&xpt
, O_RDONLY
) < 0) )
61 LOG_ER("Failed to create xport object.\n");
/* The message buffer comes from the queue's own alloc hook, which is also
 * handed the address of bufsiz. */
65 buf
= (unsigned char*)que
.vtbl
->alloc(&que
, &bufsiz
);
/* NOTE(review): the condition guarding this error is not visible here.
 * Also, %d assumes bufsiz is an int -- confirm against its declaration. */
68 LOG_ER("unable to allocate %d bytes for message buffer.\n", bufsiz
);
71 memset(buf
, 0, HEADER_LENGTH
); /* Clear the header portion of the message. */
73 /* Read a packet from the transport, write it to the queue. */
/* tm receives the value of time_in_milliseconds() taken after the read. */
79 unsigned long long tm
;
/* The transport read result is kept in xpt_read_ret.  bufsiz minus
 * HEADER_LENGTH is passed as an argument, presumably the room left for
 * payload after the header -- the remaining arguments and the failure
 * test are not visible here. */
83 if ( (xpt_read_ret
= xpt
.vtbl
->read(&xpt
,
85 bufsiz
- HEADER_LENGTH
,
88 enqueuer_stats_record_socket_error(&est
);
/* Timestamp the packet and account for it in the statistics. */
91 tm
= time_in_milliseconds();
93 enqueuer_stats_record_datagram(&est
,xpt_read_ret
);
95 /* Return info about packet read. */
96 LOG_PROG("Read %d bytes\n", xpt_read_ret
);
/* Only the low 16 bits of port are logged.
 * NOTE(review): %08lx assumes addr is an unsigned long -- confirm. */
97 LOG_PROG("From %08lx:%04x.\n", addr
, port
&0xffff);
/* Hand the byte count, timestamp, address and port to header_add() for
 * this buffer. */
99 header_add(buf
, xpt_read_ret
, tm
, addr
, port
);
101 if ( header_is_rotate (buf
) )
102 { // Command::Rotate: here we just collect some stats.
103 enqueuer_stats_rotate(&est
);
/* Write header plus payload to the queue; a negative result is an error. */
106 if ( (que_write_ret
= que
.vtbl
->write(&que
,
108 xpt_read_ret
+ HEADER_LENGTH
)) < 0 )
110 LOG_ER("Queue write error attempting to write %d bytes.\n",
111 xpt_read_ret
+ HEADER_LENGTH
);
/* Progress message for the write; presumably the non-error branch (the
 * branching keyword is not visible here). */
116 LOG_PROG("Queue write of %d bytes.\n",
117 xpt_read_ret
+ HEADER_LENGTH
);
/* Teardown: give the buffer back to the queue, destroy the transport and
 * then the queue, and finally report the collected statistics. */
120 que
.vtbl
->dealloc(&que
, buf
);
122 xpt
.vtbl
->destructor(&xpt
);
123 que
.vtbl
->destructor(&que
);
125 enqueuer_stats_report(&est
);
/* Attempt to switch the caller to the SCHED_FIFO scheduling policy at the
 * maximum priority that policy allows.
 * NOTE(review): presumably the body of skd(), which is forward-declared
 * near the top of the file; the definition header and any conditional
 * compilation around these lines are not visible here -- confirm. */
135 struct sched_param sp
;
/* Ask for the highest priority value SCHED_FIFO supports. */
137 sp
.sched_priority
= sched_get_priority_max(SCHED_FIFO
);
/* A pid of 0 refers to the caller.  A nonzero return is treated as
 * failure: the error is printed via PERROR and a warning is logged. */
138 if ( sched_setscheduler(0, SCHED_FIFO
, &sp
) )
140 PERROR("sched_setscheduler");
141 LOG_WARN("Increasing thread priority failed"
142 ", will run with standard priorities");
/* NOTE(review): this message and the "Increasing thread priority failed"
 * one carry no trailing "\n", unlike the other log strings in this file --
 * confirm whether that is intended.  The conditions selecting this message
 * and the next one are not visible here. */
146 LOG_WARN("Running with FIFO priority");
152 LOG_WARN("No real-time scheduler support on this platform.\n");