valgrind memcheck is as clean as I can make it
[lwes-journaller/github-mirror.git] / src / serial_model.c
blobf1b2a09e9b58afb0a17d102770da00db9759364a
1 /*======================================================================*
2 * Copyright (c) 2010, OpenX Inc. All rights reserved. *
3 * Copyright (c) 2010-2016, OpenX Inc. All rights reserved. *
4 * *
5 * Licensed under the New BSD License (the "License"); you may not use *
6 * this file except in compliance with the License. Unless required *
7 * by applicable law or agreed to in writing, software distributed *
8 * under the License is distributed on an "AS IS" BASIS, WITHOUT *
9 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
10 * See the License for the specific language governing permissions and *
11 * limitations under the License. See accompanying LICENSE file. *
12 *======================================================================*/
14 #include "config.h"
16 #include "journal.h"
17 #include "log.h"
18 #include "opt.h"
19 #include "serial_model.h"
20 #include "sig.h"
21 #include "stats.h"
22 #include "xport.h"
24 #include <fcntl.h>
25 #include <lwes.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
30 #define BUFLEN (65535)
31 #define SERIAL_DEPTH_COMMAND ("\026Internal::Queue::Depth")
33 struct xport xpt;
34 unsigned char buf[BUFLEN];
35 int buflen;
36 struct journal jrn;
37 struct enqueuer_stats est;
38 struct dequeuer_stats dst;
39 unsigned long long tm;
40 struct lwes_emitter* emitter = NULL;
41 /* these are used for doing a depth test, which every depth_dtm milliseconds
42 * sends an event SERIAL_DEPTH_COMMAND to this journaller, and later takes
43 * that and uses it to estimate how many events were in the operating system
44 * queue
46 unsigned long long depth_tm = 0;
47 unsigned long long depth_dtm = 10000;
48 /* count is the total number of events received by this journaller */
49 unsigned long long count = 0;
50 /* pending is the number of events received between sending and receiving
51 * a queue depth test event. It represents the number of events in
52 * the recv buffer of the operating system.
54 unsigned long long pending = 0;
56 static void serial_open_journal(void);
58 static void serial_ctor(void)
60 install_signal_handlers();
61 install_rotate_signal_handlers();
62 install_interval_rotate_handlers(1);
64 depth_dtm = arg_queue_test_interval;
66 if ( enqueuer_stats_ctor (&est) < 0 )
68 LOG_ER("Failed to create initialize enqueuer stats.\n");
69 exit(EXIT_FAILURE);
72 if ( dequeuer_stats_ctor (&dst) < 0 )
74 LOG_ER("Failed to create initialize dequeuer stats.\n");
75 exit(EXIT_FAILURE);
78 memset(buf, 0, BUFLEN); /* Clear the message. */
80 if ( (xport_factory(&xpt) < 0) || (xpt.vtbl->open(&xpt, O_RDONLY) < 0) )
82 LOG_ER("Failed to create xport object.\n");
83 exit(EXIT_FAILURE);
86 if (arg_njournalls != 1)
88 LOG_ER("Expected 1 journal name pattern, but found %d\n", arg_njournalls);
89 exit(EXIT_FAILURE);
92 /* Create journal object. */
93 if (journal_factory(&jrn, arg_journalls[0]) < 0)
95 LOG_ER("Failed to create journal object for \"%s\".\n", arg_journalls[0]);
96 exit(EXIT_FAILURE);
99 serial_open_journal();
101 emitter = lwes_emitter_create( (LWES_CONST_SHORT_STRING) arg_ip,
102 (LWES_CONST_SHORT_STRING) NULL, //arg_interface,
103 (LWES_U_INT_32) arg_port, 0, 60 );
106 static void serial_open_journal(void)
108 if (jrn.vtbl->open(&jrn, O_WRONLY) < 0)
110 LOG_ER("Failed to open the journal \"%s\".\n", arg_journalls[0]);
111 exit(EXIT_FAILURE);
115 static void serial_close_journal(void)
117 if (gbl_done || gbl_rotate_enqueue)
119 enqueuer_stats_rotate(&est);
120 enqueuer_stats_flush();
121 gbl_rotate_enqueue = 0;
124 if (gbl_done || gbl_rotate_dequeue)
126 dequeuer_stats_rotate(&dst);
127 dequeuer_stats_flush();
128 gbl_rotate_dequeue = 0;
131 if (jrn.vtbl->close(&jrn) < 0) {
132 LOG_ER("Can't close journal \"%s\".\n", arg_journalls[0]);
133 exit(EXIT_FAILURE);
/* Rotate the journal: close the current one (which also rotates and
 * flushes any statistics that are due) and open it again. */
static void serial_rotate(void)
{
  serial_close_journal();
  serial_open_journal();
}
143 static int serial_read(void)
145 unsigned long addr;
146 short port;
147 int xpt_read_ret =
148 xpt.vtbl->read(&xpt, buf+HEADER_LENGTH, BUFLEN-HEADER_LENGTH, &addr, &port);
150 if (xpt_read_ret >= 0)
152 tm = time_in_milliseconds();
153 buflen = xpt_read_ret + HEADER_LENGTH;
154 enqueuer_stats_record_datagram(&est, buflen);
155 header_add(buf, xpt_read_ret, tm, addr, port);
156 ++count;
157 return 0;
159 else if (xpt_read_ret == XPORT_INTR)
161 return XPORT_INTR; /* return something special if interrupted
162 (we might be rotating via a signal or
163 shutting down */
165 else
167 enqueuer_stats_record_socket_error(&est);
168 return -1;
172 static int serial_handle_depth_test()
174 /* check for the serial depth event */
175 if (toknam_eq((unsigned char *)buf + HEADER_LENGTH,
176 (unsigned char *)SERIAL_DEPTH_COMMAND))
178 struct lwes_event_deserialize_tmp event_tmp;
179 int bytes_read;
180 struct lwes_event* event = lwes_event_create_no_name(NULL);
182 /* don't include these internal events in our stats */
183 enqueuer_stats_erase_datagram(&est, buflen);
185 /* we should have one, so deserialize it */
186 bytes_read = lwes_event_from_bytes(event,
187 (LWES_BYTE_P)&buf[HEADER_LENGTH],
188 buflen-HEADER_LENGTH,
189 0, &event_tmp);
190 if (bytes_read != buflen-HEADER_LENGTH)
192 LOG_ER("Only able to read %d bytes; expected %d\n", bytes_read, buflen);
194 else
196 /* get the previous count */
197 LWES_U_INT_64 previous_count;
198 lwes_event_get_U_INT_64(event, "count", &previous_count);
199 /* determine the difference and remove one for this special event */
200 pending = count - previous_count - 1;
201 LOG_INF("Depth test reports a buffer length of %lld events.\n", pending);
204 lwes_event_destroy(event);
205 return 1;
207 else
209 return 0;
213 /* this code attempts to determine if there are any events in the operating
214 * system queue by sending an event with the current received count
215 * and later checking it
217 static void serial_send_buffer_depth_test(void)
219 struct lwes_event *event =
220 lwes_event_create( (struct lwes_event_type_db *) NULL,
221 (LWES_SHORT_STRING) SERIAL_DEPTH_COMMAND+1);
222 if (event == NULL) return;
223 if (lwes_event_set_U_INT_64(event,"count",count) < 0)
225 LOG_ER("Unable to add count to depth event");
227 else
229 lwes_emitter_emitto((char*) "127.0.0.1", NULL, arg_port, emitter, event);
231 lwes_event_destroy(event);
233 depth_tm = time_in_milliseconds();
236 static void serial_write(void)
238 /* Write the packet out to the journal. */
239 int jrn_write_ret = jrn.vtbl->write(&jrn, buf, buflen);
241 if (jrn_write_ret != buflen)
243 LOG_ER("Journal write error -- attempted to write %d bytes, "
244 "write returned %d.\n", buflen, jrn_write_ret);
245 dequeuer_stats_record_loss(&dst);
248 dequeuer_stats_record(&dst, buflen, pending);
251 static void serial_dtor(void)
253 serial_close_journal();
254 lwes_emitter_destroy(emitter);
255 xpt.vtbl->destructor(&xpt);
256 jrn.vtbl->destructor(&jrn);
257 enqueuer_stats_dtor(&est);
258 dequeuer_stats_dtor(&dst);
261 void serial_model(void)
263 serial_ctor();
265 do {
266 int is_rotate = 0;
267 int read_ret = serial_read();
268 /* -1 is an error we don't deal with, so just skip out of the loop */
269 if (read_ret == -1) continue;
270 /* depth tests are not written to the journal */
271 if (serial_handle_depth_test()) continue;
272 /* XPORT_INTR from read means we were interrupted and should not
273 * write, so write when we are not interrupted, this is for backward
274 * compatibility when we didn't do rotation signals correctly here
276 if (read_ret != XPORT_INTR ) serial_write();
277 /* check for rotation event, or signal's and rotate if necessary */
278 if (header_is_rotate(buf)) {
279 memcpy(&dst.latest_rotate_header, buf, HEADER_LENGTH) ;
280 dst.rotation_type = LJ_RT_EVENT;
281 is_rotate = 1;
283 if (is_rotate || gbl_rotate_dequeue || gbl_rotate_enqueue) {
284 serial_rotate();
285 is_rotate = 0;
287 /* maybe send depth test */
288 if (tm >= depth_tm + depth_dtm) serial_send_buffer_depth_test();
290 while (!gbl_done);
292 serial_dtor();