fix the SO_REUSEPORT check in configure.ac
[lwes-journaller/github-mirror.git] / src / serial_model.c
blob35de97d0927ad2dd8b8c98302e6be479ef4c4f74
1 /*======================================================================*
2 * Copyright (c) 2010, OpenX Inc. All rights reserved. *
3 * Copyright (c) 2010-2016, OpenX Inc. All rights reserved. *
4 * *
5 * Licensed under the New BSD License (the "License"); you may not use *
6 * this file except in compliance with the License. Unless required *
7 * by applicable law or agreed to in writing, software distributed *
8 * under the License is distributed on an "AS IS" BASIS, WITHOUT *
9 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
10 * See the License for the specific language governing permissions and *
11 * limitations under the License. See accompanying LICENSE file. *
12 *======================================================================*/
14 #include "config.h"
16 #include "journal.h"
17 #include "log.h"
18 #include "opt.h"
19 #include "serial_model.h"
20 #include "sig.h"
21 #include "stats.h"
22 #include "time_utils.h"
23 #include "xport.h"
25 #include <fcntl.h>
26 #include <lwes.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
31 #define BUFLEN (65535)
32 #define SERIAL_DEPTH_COMMAND ("\026Internal::Queue::Depth")
34 struct xport xpt;
35 unsigned char buf[BUFLEN];
36 int buflen;
37 struct journal jrn;
38 struct enqueuer_stats est;
39 struct dequeuer_stats dst;
40 unsigned long long tm;
41 struct lwes_emitter* emitter = NULL;
42 /* these are used for doing a depth test, which every depth_dtm milliseconds
43 * sends an event SERIAL_DEPTH_COMMAND to this journaller, and later takes
44 * that and uses it to estimate how many events were in the operating system
45 * queue
47 unsigned long long depth_tm = 0;
48 unsigned long long depth_dtm = 10000;
49 /* count is the total number of events received by this journaller */
50 unsigned long long count = 0;
51 /* pending is the number of events received between sending and receiving
52 * a queue depth test event. It represents the number of events in
53 * the recv buffer of the operating system.
55 unsigned long long pending = 0;
57 static void serial_open_journal(FILE *log);
59 static void serial_ctor(FILE *log)
61 install_termination_signal_handlers(log);
62 install_rotate_signal_handlers(log);
63 install_interval_rotate_handlers(log, 1);
64 install_log_rotate_signal_handlers(log, 1, SIGUSR1);
66 depth_dtm = arg_queue_test_interval;
68 if ( enqueuer_stats_ctor (&est) < 0 )
70 LOG_ER(log, "Failed to create initialize enqueuer stats.\n");
71 exit(EXIT_FAILURE);
74 if ( dequeuer_stats_ctor (&dst) < 0 )
76 LOG_ER(log, "Failed to create initialize dequeuer stats.\n");
77 exit(EXIT_FAILURE);
80 memset(buf, 0, BUFLEN); /* Clear the message. */
82 if ( (xport_factory(&xpt, log) < 0) || (xpt.vtbl->open(&xpt, O_RDONLY) < 0) )
84 LOG_ER(log, "Failed to create xport object.\n");
85 exit(EXIT_FAILURE);
88 if (arg_njournalls != 1)
90 LOG_ER(log, "Expected 1 journal name pattern, but found %d\n", arg_njournalls);
91 exit(EXIT_FAILURE);
94 /* Create journal object. */
95 if (journal_factory(&jrn, arg_journalls[0], log) < 0)
97 LOG_ER(log, "Failed to create journal object for \"%s\".\n", arg_journalls[0]);
98 exit(EXIT_FAILURE);
101 serial_open_journal(log);
103 emitter = lwes_emitter_create( (LWES_CONST_SHORT_STRING) arg_ip,
104 (LWES_CONST_SHORT_STRING) NULL, //arg_interface,
105 (LWES_U_INT_32) arg_port, 0, 60 );
108 static void serial_open_journal(FILE *log)
110 if (jrn.vtbl->open(&jrn, O_WRONLY, log) < 0)
112 LOG_ER(log, "Failed to open the journal \"%s\".\n", arg_journalls[0]);
113 exit(EXIT_FAILURE);
117 static void serial_close_journal(int is_rotate_event, FILE *log)
119 if (gbl_done || gbl_rotate_enqueue || is_rotate_event)
121 enqueuer_stats_rotate(&est, log);
122 if (!gbl_done)
124 enqueuer_stats_flush (&est);
126 CAS_OFF(gbl_rotate_enqueue);
129 if (gbl_done || gbl_rotate_dequeue || is_rotate_event)
131 dequeuer_stats_rotate (&dst, log);
132 if (!gbl_done)
134 dequeuer_stats_flush (&dst);
136 CAS_OFF(gbl_rotate_dequeue);
139 if (jrn.vtbl->close(&jrn, log) < 0) {
140 LOG_ER(log, "Can't close journal \"%s\".\n", arg_journalls[0]);
141 exit(EXIT_FAILURE);
145 static void serial_rotate(int is_rotate_event, FILE *log)
147 serial_close_journal(is_rotate_event, log);
148 serial_open_journal(log);
151 static int serial_read(void)
153 unsigned long addr;
154 short port;
156 /* 0 out part of the the event name so if we get a rotate event the program
157 * will not continually rotate */
158 memset(buf, 0, HEADER_LENGTH+20);
160 int xpt_read_ret =
161 xpt.vtbl->read(&xpt, buf+HEADER_LENGTH, BUFLEN-HEADER_LENGTH, &addr, &port);
163 if (xpt_read_ret >= 0)
165 tm = millis_now ();
166 buflen = xpt_read_ret + HEADER_LENGTH;
167 enqueuer_stats_record_datagram(&est, buflen);
168 header_add(buf, xpt_read_ret, tm, addr, port);
169 ++count;
170 return 0;
172 else if (xpt_read_ret == XPORT_INTR)
174 return XPORT_INTR; /* return something special if interrupted
175 (we might be rotating via a signal or
176 shutting down */
178 else
180 enqueuer_stats_record_socket_error(&est);
181 return -1;
185 static int serial_handle_depth_test()
187 /* check for the serial depth event */
188 if (toknam_eq((unsigned char *)buf + HEADER_LENGTH,
189 (unsigned char *)SERIAL_DEPTH_COMMAND))
191 struct lwes_event_deserialize_tmp event_tmp;
192 int bytes_read;
193 struct lwes_event* event = lwes_event_create_no_name(NULL);
195 /* don't include these internal events in our stats */
196 enqueuer_stats_erase_datagram(&est, buflen);
198 /* we should have one, so deserialize it */
199 bytes_read = lwes_event_from_bytes(event,
200 (LWES_BYTE_P)&buf[HEADER_LENGTH],
201 buflen-HEADER_LENGTH,
202 0, &event_tmp);
203 if (bytes_read != buflen-HEADER_LENGTH)
205 LOG_ER(NULL, "Only able to read %d bytes; expected %d\n", bytes_read, buflen);
207 else
209 /* get the previous count */
210 LWES_U_INT_64 previous_count;
211 lwes_event_get_U_INT_64(event, "count", &previous_count);
212 /* determine the difference and remove one for this special event */
213 pending = count - previous_count - 1;
214 LOG_INF(NULL, "Depth test reports a buffer length of %lld events.\n", pending);
217 lwes_event_destroy(event);
218 return 1;
220 else
222 return 0;
226 /* this code attempts to determine if there are any events in the operating
227 * system queue by sending an event with the current received count
228 * and later checking it
230 static void serial_send_buffer_depth_test(void)
232 struct lwes_event *event =
233 lwes_event_create( (struct lwes_event_type_db *) NULL,
234 (LWES_SHORT_STRING) SERIAL_DEPTH_COMMAND+1);
235 if (event == NULL) return;
236 if (lwes_event_set_U_INT_64(event,"count",count) < 0)
238 LOG_ER(NULL, "Unable to add count to depth event");
240 else
242 lwes_emitter_emitto((char*) "127.0.0.1", NULL, arg_port, emitter, event);
244 lwes_event_destroy(event);
246 depth_tm = millis_now ();
249 static void serial_write(void)
251 /* Write the packet out to the journal. */
252 int jrn_write_ret = jrn.vtbl->write(&jrn, buf, buflen);
254 if (jrn_write_ret != buflen)
256 LOG_ER(NULL, "Journal write error -- attempted to write %d bytes, "
257 "write returned %d.\n", buflen, jrn_write_ret);
258 dequeuer_stats_record_loss(&dst);
261 dequeuer_stats_record(&dst, buflen, pending);
264 static void serial_dtor(FILE *log)
266 serial_close_journal(0, log);
267 lwes_emitter_destroy(emitter);
268 xpt.vtbl->destructor(&xpt);
269 jrn.vtbl->destructor(&jrn, log);
270 enqueuer_stats_dtor(&est);
271 dequeuer_stats_dtor(&dst);
274 void serial_model(FILE *log)
276 serial_ctor(log);
278 do {
279 int is_rotate_event = 0;
280 int read_ret = serial_read();
281 /* -1 is an error we don't deal with, so just skip out of the loop */
282 if (read_ret == -1) continue;
283 /* depth tests are not written to the journal */
284 if (serial_handle_depth_test()) continue;
285 /* XPORT_INTR from read means we were interrupted and should not
286 * write, so write when we are not interrupted, this is for backward
287 * compatibility when we didn't do rotation signals correctly here
289 if (read_ret != XPORT_INTR ) serial_write();
290 /* check for rotation event, or signal's and rotate if necessary */
291 if (header_is_rotate(buf)) {
292 memcpy(&dst.latest_rotate_header, buf, HEADER_LENGTH) ;
293 dst.rotation_type = LJ_RT_EVENT;
294 is_rotate_event = 1;
296 if (is_rotate_event || gbl_rotate_dequeue || gbl_rotate_enqueue) {
297 serial_rotate(is_rotate_event, log);
298 is_rotate_event = 0;
300 if (gbl_rotate_main_log) {
301 log = get_log (log);
303 /* maybe send depth test */
304 if (tm >= depth_tm + depth_dtm) serial_send_buffer_depth_test();
306 while (!gbl_done);
308 serial_dtor (log);