include headers in distribution
[lwes-journaller.git] / src / serial_model.c
blob594f8bd329d3f8b48390bda1b9656b0202dddec0
1 /*======================================================================*
2 * Copyright (c) 2010, OpenX Inc. All rights reserved. *
3 * *
4 * Licensed under the New BSD License (the "License"); you may not use *
5 * this file except in compliance with the License. Unless required *
6 * by applicable law or agreed to in writing, software distributed *
7 * under the License is distributed on an "AS IS" BASIS, WITHOUT *
8 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
9 * See the License for the specific language governing permissions and *
10 * limitations under the License. See accompanying LICENSE file. *
11 *======================================================================*/
13 #include "config.h"
15 #include "journal.h"
16 #include "log.h"
17 #include "opt.h"
18 #include "serial_model.h"
19 #include "sig.h"
20 #include "stats.h"
21 #include "xport.h"
23 #include <fcntl.h>
24 #include <lwes.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
29 #define BUFLEN (65535)
30 #define SERIAL_DEPTH_COMMAND ("\026Internal::Queue::Depth")
32 struct xport xpt;
33 unsigned char buf[BUFLEN];
34 int buflen;
35 struct journal jrn;
36 struct enqueuer_stats est;
37 struct dequeuer_stats dst;
38 unsigned long long tm;
39 struct lwes_emitter* emitter = NULL;
40 unsigned long long depth_tm = 0;
41 const unsigned long long depth_dtm = 10000;
42 unsigned long long count = 0;
43 unsigned long long pending = 0;
45 static void serial_open_journal(void);
47 static void serial_ctor(void)
49 install_signal_handlers();
50 install_rotate_signal_handlers();
52 if ( enqueuer_stats_ctor (&est) < 0 )
54 LOG_ER("Failed to create initialize enqueuer stats.\n");
55 exit(EXIT_FAILURE);
58 if ( dequeuer_stats_ctor (&dst) < 0 )
60 LOG_ER("Failed to create initialize dequeuer stats.\n");
61 exit(EXIT_FAILURE);
64 memset(buf, 0, BUFLEN); /* Clear the message. */
66 if ( (xport_factory(&xpt) < 0) || (xpt.vtbl->open(&xpt, O_RDONLY) < 0) )
68 LOG_ER("Failed to create xport object.\n");
69 exit(EXIT_FAILURE);
72 if (arg_njournalls != 1)
74 LOG_ER("Expected 1 journal name pattern, but found %d\n", arg_njournalls);
75 exit(EXIT_FAILURE);
78 /* Create journal object. */
79 if (journal_factory(&jrn, arg_journalls[0]) < 0)
81 LOG_ER("Failed to create journal object for \"%s\".\n", arg_journalls[0]);
82 exit(EXIT_FAILURE);
85 serial_open_journal();
87 emitter = lwes_emitter_create( (LWES_CONST_SHORT_STRING) arg_ip,
88 (LWES_CONST_SHORT_STRING) NULL, //arg_interface,
89 (LWES_U_INT_32) arg_port, 0, 60 );
92 static void serial_open_journal(void)
94 if (jrn.vtbl->open(&jrn, O_WRONLY) < 0)
96 LOG_ER("Failed to open the journal \"%s\".\n", arg_journalls[0]);
97 exit(EXIT_FAILURE);
101 static void serial_close_journal(void)
103 enqueuer_stats_rotate(&est);
104 dequeuer_stats_rotate(&dst);
106 if (jrn.vtbl->close(&jrn) < 0) {
107 LOG_ER("Can't close journal \"%s\".\n", arg_journalls[0]);
108 exit(EXIT_FAILURE);
112 static void serial_rotate(void)
114 serial_close_journal();
115 serial_open_journal();
116 gbl_rotate = 0;
119 static int serial_read(void)
121 unsigned long addr;
122 short port;
123 int xpt_read_ret =
124 xpt.vtbl->read(&xpt, buf+HEADER_LENGTH, BUFLEN-HEADER_LENGTH, &addr, &port);
126 if (xpt_read_ret < 0)
128 enqueuer_stats_record_socket_error(&est);
129 return -1;
132 tm = time_in_milliseconds();
133 buflen = xpt_read_ret + HEADER_LENGTH;
134 enqueuer_stats_record_datagram(&est, buflen);
135 header_add(buf, xpt_read_ret, tm, addr, port);
136 ++count;
138 return 0;
141 static int serial_handle_depth_test()
143 if (toknam_eq((unsigned char *)buf + HEADER_LENGTH, (unsigned char *)SERIAL_DEPTH_COMMAND))
145 struct lwes_event_deserialize_tmp event_tmp;
146 struct lwes_event_enumeration keys;
147 LWES_CONST_SHORT_STRING key;
148 LWES_TYPE type;
149 int bytes_read;
150 struct lwes_event* event = lwes_event_create_no_name(NULL);
152 bytes_read = lwes_event_from_bytes(event, (LWES_BYTE_P)&buf[HEADER_LENGTH],
153 buflen-HEADER_LENGTH, 0, &event_tmp);
154 if (bytes_read != buflen-HEADER_LENGTH)
156 LOG_ER("Only able to read %d bytes; expected %d\n", bytes_read, buflen);
158 else if (!lwes_event_keys(event, &keys))
160 LOG_ER("Unable to iterate over keys\n");
162 else
164 while (lwes_event_enumeration_next_element(&keys, &key, &type))
166 if (strncmp("count",key,6)==0)
168 LWES_U_INT_64 value;
169 lwes_event_get_U_INT_64(event, key, &value);
170 pending = count - value;
171 LOG_INF("Depth test reports a buffer length of %lld events.\n", pending);
176 lwes_event_destroy(event);
178 return 1;
180 else
182 return 0;
186 static void serial_send_buffer_depth_test(void)
188 struct lwes_event *event =
189 lwes_event_create( (struct lwes_event_type_db *) NULL,
190 (LWES_SHORT_STRING) SERIAL_DEPTH_COMMAND+1);
191 if (event == NULL) return;
192 if (lwes_event_set_U_INT_64(event,"count",count) < 0)
194 LOG_ER("Unable to add count to depth event");
196 else
198 lwes_emitter_emitto((char*) "127.0.0.1", NULL, arg_port, emitter, event);
200 lwes_event_destroy(event);
202 depth_tm = time_in_milliseconds();
205 static void serial_write(void)
207 /* Write the packet out to the journal. */
208 int jrn_write_ret = jrn.vtbl->write(&jrn, buf, buflen);
210 if (jrn_write_ret != buflen)
212 LOG_ER("Journal write error -- attempted to write %d bytes, "
213 "write returned %d.\n", buflen, jrn_write_ret);
214 dequeuer_stats_record_loss(&dst);
217 dequeuer_stats_record(&dst, buflen, pending);
/*
 * Teardown for the serial model: closes the journal (which also rotates
 * the statistics).
 */
static void serial_dtor(void)
{
  serial_close_journal();
}
225 void serial_model(void)
227 serial_ctor();
229 do {
230 if (serial_read() < 0) continue;
231 if (serial_handle_depth_test()) continue;
232 serial_write();
233 if (header_is_rotate(buf) || gbl_rotate) serial_rotate();
234 if (tm >= depth_tm + depth_dtm) serial_send_buffer_depth_test();
236 while (!gbl_done);
238 serial_dtor();