1 /*======================================================================*
2 * Copyright (c) 2010, OpenX Inc. All rights reserved. *
3 * Copyright (c) 2010-2016, OpenX Inc. All rights reserved. *
5 * Licensed under the New BSD License (the "License"); you may not use *
6 * this file except in compliance with the License. Unless required *
7 * by applicable law or agreed to in writing, software distributed *
8 * under the License is distributed on an "AS IS" BASIS, WITHOUT *
9 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
10 * See the License for the specific language governing permissions and *
11 * limitations under the License. See accompanying LICENSE file. *
12 *======================================================================*/
19 #include "serial_model.h"
22 #include "time_utils.h"
// File-scope constants and state shared by the serial_* functions below.
// NOTE(review): this listing is an extract; some declarations used later
// (xpt, jrn, buflen) and the closing lines of two block comments are not
// visible here.
// Size in bytes of the packet buffer buf.
31 #define BUFLEN (65535)
// Name of the internal queue-depth test event. The leading byte \026 (octal,
// 22 decimal) equals the length of the name text that follows it;
// serial_send_buffer_depth_test passes SERIAL_DEPTH_COMMAND+1 to skip that
// byte, while serial_handle_depth_test compares against the full value.
32 #define SERIAL_DEPTH_COMMAND ("\026Internal::Queue::Depth")
// Packet buffer: serial_read places the transport payload at
// buf+HEADER_LENGTH and serial_write hands buf to the journal.
35 unsigned char buf
[BUFLEN
];
// Enqueue-side statistics; constructed in serial_ctor, destroyed in serial_dtor.
38 struct enqueuer_stats est
;
// Dequeue-side statistics; constructed in serial_ctor, destroyed in serial_dtor.
39 struct dequeuer_stats dst
;
// Timestamp passed to header_add for each packet and compared with
// depth_tm + depth_dtm in serial_model. Where it is assigned is not visible
// in this extract.
40 unsigned long long tm
;
// Emitter created in serial_ctor and used to send the depth-test event.
41 struct lwes_emitter
* emitter
= NULL
;
42 /* these are used for doing a depth test, which every depth_dtm milliseconds
43 * sends an event SERIAL_DEPTH_COMMAND to this journaller, and later takes
44 * that and uses it to estimate how many events were in the operating system
// depth_tm is set from millis_now() each time a depth test is sent.
47 unsigned long long depth_tm
= 0;
// depth_dtm is the interval between depth tests; serial_ctor overwrites the
// default with arg_queue_test_interval.
48 unsigned long long depth_dtm
= 10000;
49 /* count is the total number of events received by this journaller */
50 unsigned long long count
= 0;
51 /* pending is the number of events received between sending and receiving
52 * a queue depth test event. It represents the number of events in
53 * the recv buffer of the operating system.
// pending is recomputed in serial_handle_depth_test and passed to
// dequeuer_stats_record in serial_write.
55 unsigned long long pending
= 0;
// Forward declaration: serial_ctor calls serial_open_journal before its
// definition appears.
57 static void serial_open_journal(FILE *log
);
// serial_ctor: one-time setup for the serial journalling model.
// Steps visible here, in order: install the termination, rotate,
// interval-rotate and log-rotate (SIGUSR1) signal handlers; copy
// arg_queue_test_interval into depth_dtm; construct the enqueuer and dequeuer
// statistics; zero the packet buffer; create the transport and open it with
// O_RDONLY; require exactly one journal name pattern; create the journal
// object from arg_journalls[0]; open it with serial_open_journal; create the
// lwes emitter for arg_ip / arg_port.
// Parameter log: stream handed to the handler installers, the factories and
// LOG_ER.
// NOTE(review): braces and whatever statement follows each LOG_ER are missing
// from this extract, so the behaviour after a failed step is not visible.
59 static void serial_ctor(FILE *log
)
61 install_termination_signal_handlers(log
);
62 install_rotate_signal_handlers(log
);
63 install_interval_rotate_handlers(log
, 1);
64 install_log_rotate_signal_handlers(log
, 1, SIGUSR1
);
// Replace the default depth-test interval with the configured value.
66 depth_dtm
= arg_queue_test_interval
;
68 if ( enqueuer_stats_ctor (&est
) < 0 )
70 LOG_ER(log
, "Failed to create initialize enqueuer stats.\n");
74 if ( dequeuer_stats_ctor (&dst
) < 0 )
76 LOG_ER(log
, "Failed to create initialize dequeuer stats.\n");
80 memset(buf
, 0, BUFLEN
); /* Clear the message. */
// The same message is logged whether the factory or the open call fails.
82 if ( (xport_factory(&xpt
, log
) < 0) || (xpt
.vtbl
->open(&xpt
, O_RDONLY
) < 0) )
84 LOG_ER(log
, "Failed to create xport object.\n");
// Only arg_journalls[0] is used below, so any other count is reported.
88 if (arg_njournalls
!= 1)
90 LOG_ER(log
, "Expected 1 journal name pattern, but found %d\n", arg_njournalls
);
94 /* Create journal object. */
95 if (journal_factory(&jrn
, arg_journalls
[0], log
) < 0)
97 LOG_ER(log
, "Failed to create journal object for \"%s\".\n", arg_journalls
[0]);
101 serial_open_journal(log
);
// NOTE(review): the result of lwes_emitter_create is not checked in the
// visible lines; confirm a NULL emitter is tolerated where it is later used.
// The trailing 0 and 60 are presumably the heartbeat flag and frequency;
// verify against the lwes emitter API.
103 emitter
= lwes_emitter_create( (LWES_CONST_SHORT_STRING
) arg_ip
,
104 (LWES_CONST_SHORT_STRING
) NULL
, //arg_interface,
105 (LWES_U_INT_32
) arg_port
, 0, 60 );
// serial_open_journal: opens the journal object jrn for writing (O_WRONLY)
// through its vtable and logs an error naming arg_journalls[0] when the open
// call returns a negative value.
// Parameter log: stream passed to the journal open call and to LOG_ER.
// NOTE(review): the line after LOG_ER is missing from this extract, so what
// happens after a failed open is not visible.
108 static void serial_open_journal(FILE *log
)
110 if (jrn
.vtbl
->open(&jrn
, O_WRONLY
, log
) < 0)
112 LOG_ER(log
, "Failed to open the journal \"%s\".\n", arg_journalls
[0]);
// serial_close_journal: updates the statistics and closes the current journal.
// For the enqueuer side and then the dequeuer side it calls the stats rotate
// routine when gbl_done, the matching gbl_rotate_* flag or is_rotate_event is
// set, calls the stats flush routine, and clears the matching gbl_rotate_*
// flag with CAS_OFF. It then closes jrn through its vtable and logs an error
// naming arg_journalls[0] if close returns a negative value.
// Parameter is_rotate_event: nonzero when the caller is rotating because of a
// rotate event; it is ORed with the global flags in both conditions.
// Parameter log: stream passed to the stats rotate calls, the journal close
// call and LOG_ER.
// NOTE(review): braces are missing from this extract, so whether the flush
// and CAS_OFF calls sit inside the if bodies cannot be determined here.
117 static void serial_close_journal(int is_rotate_event
, FILE *log
)
// Enqueuer-side statistics.
119 if (gbl_done
|| gbl_rotate_enqueue
|| is_rotate_event
)
121 enqueuer_stats_rotate(&est
, log
);
124 enqueuer_stats_flush (&est
);
126 CAS_OFF(gbl_rotate_enqueue
);
// Dequeuer-side statistics, same pattern as above.
129 if (gbl_done
|| gbl_rotate_dequeue
|| is_rotate_event
)
131 dequeuer_stats_rotate (&dst
, log
);
134 dequeuer_stats_flush (&dst
);
136 CAS_OFF(gbl_rotate_dequeue
);
139 if (jrn
.vtbl
->close(&jrn
, log
) < 0) {
140 LOG_ER(log
, "Can't close journal \"%s\".\n", arg_journalls
[0]);
// serial_rotate: rotates the journal by closing the current one with
// serial_close_journal and then opening it again with serial_open_journal.
// Parameter is_rotate_event: forwarded unchanged to serial_close_journal.
// Parameter log: stream forwarded to both calls.
145 static void serial_rotate(int is_rotate_event
, FILE *log
)
147 serial_close_journal(is_rotate_event
, log
);
148 serial_open_journal(log
);
// serial_read: reads one datagram from the transport into buf, leaving room
// for a header at the front.
// Visible behaviour: the first HEADER_LENGTH+20 bytes of buf are zeroed, then
// the transport read is called with destination buf+HEADER_LENGTH and
// capacity BUFLEN-HEADER_LENGTH, filling in the sender addr and port.
// When xpt_read_ret is >= 0, buflen becomes xpt_read_ret + HEADER_LENGTH, the
// datagram is recorded in est, and header_add is called with buf, the payload
// length, tm, addr and port. When xpt_read_ret equals XPORT_INTR the function
// returns XPORT_INTR. The last visible call records a socket error in est,
// presumably as the remaining else branch.
// Returns: XPORT_INTR when interrupted. The other return statements are not
// visible in this extract; serial_model treats -1 as an error.
// NOTE(review): the local declarations (addr, port, xpt_read_ret) and the
// assignment of xpt_read_ret from the read call are missing from this view.
151 static int serial_read(void)
156 /* 0 out part of the event name so if we get a rotate event the program
157 * will not continually rotate */
158 memset(buf
, 0, HEADER_LENGTH
+20);
161 xpt
.vtbl
->read(&xpt
, buf
+HEADER_LENGTH
, BUFLEN
-HEADER_LENGTH
, &addr
, &port
);
163 if (xpt_read_ret
>= 0)
// buflen counts the header area plus the payload just read.
166 buflen
= xpt_read_ret
+ HEADER_LENGTH
;
167 enqueuer_stats_record_datagram(&est
, buflen
);
168 header_add(buf
, xpt_read_ret
, tm
, addr
, port
);
172 else if (xpt_read_ret
== XPORT_INTR
)
174 return XPORT_INTR
; /* return something special if interrupted
175 (we might be rotating via a signal or
180 enqueuer_stats_record_socket_error(&est
);
// serial_handle_depth_test: detects the internal depth-test event in buf and
// uses it to update pending.
// Visible behaviour: toknam_eq compares the event name at buf+HEADER_LENGTH
// with SERIAL_DEPTH_COMMAND. On a match an unnamed lwes event is created, the
// datagram is erased from the enqueuer stats, and the bytes after the header
// (buflen-HEADER_LENGTH of them) are deserialized into the event. If
// bytes_read differs from buflen-HEADER_LENGTH an error is logged. The
// "count" field is read into previous_count and pending is set to
// count - previous_count - 1, which is then logged, and the event is
// destroyed.
// Returns: the return statements are not visible in this extract;
// serial_model skips writing the packet when the result is nonzero.
// NOTE(review): the result of lwes_event_create_no_name is not checked in the
// visible lines.
// NOTE(review): the error message prints buflen as the expected size, but the
// comparison is against buflen-HEADER_LENGTH.
// NOTE(review): the result of lwes_event_get_U_INT_64 is not checked, so
// previous_count may be read uninitialised if the field is absent.
// NOTE(review): pending is unsigned long long but is printed with %lld; %llu
// would match the type.
// NOTE(review): braces and any else are missing from this view, so whether
// the pending update is skipped after a short read cannot be determined here.
185 static int serial_handle_depth_test()
187 /* check for the serial depth event */
188 if (toknam_eq((unsigned char *)buf
+ HEADER_LENGTH
,
189 (unsigned char *)SERIAL_DEPTH_COMMAND
))
191 struct lwes_event_deserialize_tmp event_tmp
;
193 struct lwes_event
* event
= lwes_event_create_no_name(NULL
);
195 /* don't include these internal events in our stats */
196 enqueuer_stats_erase_datagram(&est
, buflen
);
198 /* we should have one, so deserialize it */
199 bytes_read
= lwes_event_from_bytes(event
,
200 (LWES_BYTE_P
)&buf
[HEADER_LENGTH
],
201 buflen
-HEADER_LENGTH
,
203 if (bytes_read
!= buflen
-HEADER_LENGTH
)
205 LOG_ER(NULL
, "Only able to read %d bytes; expected %d\n", bytes_read
, buflen
);
209 /* get the previous count */
210 LWES_U_INT_64 previous_count
;
211 lwes_event_get_U_INT_64(event
, "count", &previous_count
);
212 /* determine the difference and remove one for this special event */
213 pending
= count
- previous_count
- 1;
214 LOG_INF(NULL
, "Depth test reports a buffer length of %lld events.\n", pending
);
217 lwes_event_destroy(event
);
// serial_send_buffer_depth_test: sends the depth-test event to this process.
// Visible behaviour: creates an lwes event named SERIAL_DEPTH_COMMAND+1 (the
// name without its leading length byte) and returns immediately if creation
// fails. It stores the current value of count in the "count" field, logging
// an error if that fails, emits the event to 127.0.0.1 on arg_port through
// emitter, destroys the event, and sets depth_tm to millis_now().
// NOTE(review): lines are missing between the LOG_ER and the emit call, so
// whether the emit is skipped when setting "count" fails is not visible.
// NOTE(review): this LOG_ER message has no trailing newline, unlike the other
// log messages in this file.
// NOTE(review): the result of lwes_emitter_emitto is not checked, and emitter
// is used without a NULL check in the visible lines.
226 /* this code attempts to determine if there are any events in the operating
227 * system queue by sending an event with the current received count
228 * and later checking it
230 static void serial_send_buffer_depth_test(void)
232 struct lwes_event
*event
=
233 lwes_event_create( (struct lwes_event_type_db
*) NULL
,
234 (LWES_SHORT_STRING
) SERIAL_DEPTH_COMMAND
+1);
235 if (event
== NULL
) return;
236 if (lwes_event_set_U_INT_64(event
,"count",count
) < 0)
238 LOG_ER(NULL
, "Unable to add count to depth event");
242 lwes_emitter_emitto((char*) "127.0.0.1", NULL
, arg_port
, emitter
, event
);
244 lwes_event_destroy(event
);
// Restart the interval that serial_model checks against tm.
246 depth_tm
= millis_now ();
// serial_write: writes the current packet (buflen bytes starting at buf) to
// the journal through its vtable.
// If the write call returns anything other than buflen, an error with both
// values is logged and a loss is recorded in the dequeuer stats. The last
// visible call records buflen and pending in the dequeuer stats.
// NOTE(review): braces are missing from this extract; whether the final
// dequeuer_stats_record call also runs after a failed write depends on them.
249 static void serial_write(void)
251 /* Write the packet out to the journal. */
252 int jrn_write_ret
= jrn
.vtbl
->write(&jrn
, buf
, buflen
);
254 if (jrn_write_ret
!= buflen
)
256 LOG_ER(NULL
, "Journal write error -- attempted to write %d bytes, "
257 "write returned %d.\n", buflen
, jrn_write_ret
);
258 dequeuer_stats_record_loss(&dst
);
261 dequeuer_stats_record(&dst
, buflen
, pending
);
// serial_dtor: releases everything serial_ctor set up.
// Order visible here: close the journal via serial_close_journal with
// is_rotate_event = 0, destroy the lwes emitter, run the transport and
// journal destructors, then destroy the enqueuer and dequeuer statistics.
// Parameter log: stream forwarded to serial_close_journal and the journal
// destructor.
264 static void serial_dtor(FILE *log
)
266 serial_close_journal(0, log
);
267 lwes_emitter_destroy(emitter
);
268 xpt
.vtbl
->destructor(&xpt
);
269 jrn
.vtbl
->destructor(&jrn
, log
);
270 enqueuer_stats_dtor(&est
);
271 dequeuer_stats_dtor(&dst
);
274 void serial_model(FILE *log
)
279 int is_rotate_event
= 0;
280 int read_ret
= serial_read();
281 /* -1 is an error we don't deal with, so just skip out of the loop */
282 if (read_ret
== -1) continue;
283 /* depth tests are not written to the journal */
284 if (serial_handle_depth_test()) continue;
285 /* XPORT_INTR from read means we were interrupted and should not
286 * write, so write when we are not interrupted, this is for backward
287 * compatibility when we didn't do rotation signals correctly here
289 if (read_ret
!= XPORT_INTR
) serial_write();
290 /* check for rotation event, or signal's and rotate if necessary */
291 if (header_is_rotate(buf
)) {
292 memcpy(&dst
.latest_rotate_header
, buf
, HEADER_LENGTH
) ;
293 dst
.rotation_type
= LJ_RT_EVENT
;
296 if (is_rotate_event
|| gbl_rotate_dequeue
|| gbl_rotate_enqueue
) {
297 serial_rotate(is_rotate_event
, log
);
300 if (gbl_rotate_main_log
) {
303 /* maybe send depth test */
304 if (tm
>= depth_tm
+ depth_dtm
) serial_send_buffer_depth_test();