1 /*======================================================================*
2 * Copyright (c) 2010, OpenX Inc. All rights reserved. *
3 * Copyright (c) 2010-2016, OpenX Inc. All rights reserved. *
5 * Licensed under the New BSD License (the "License"); you may not use *
6 * this file except in compliance with the License. Unless required *
7 * by applicable law or agreed to in writing, software distributed *
8 * under the License is distributed on an "AS IS" BASIS, WITHOUT *
9 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
10 * See the License for the specific language governing permissions and *
11 * limitations under the License. See accompanying LICENSE file. *
12 *======================================================================*/
19 #include "serial_model.h"
// File-scope configuration and state shared by the serial_* functions below.
// NOTE(review): this extract has gaps in its embedded line numbering; other
// file-scope objects used later (xpt, jrn, buflen) are not declared in the
// visible text.

// Size in bytes of the packet buffer buf.
30 #define BUFLEN (65535)
// Name token identifying an internal queue-depth probe event. The leading
// \026 byte is octal for 22, which equals the length of the name that follows
// it (presumably a length prefix expected by toknam_eq - TODO confirm).
// serial_send_buffer_depth_test skips that byte with +1 when naming the event.
31 #define SERIAL_DEPTH_COMMAND ("\026Internal::Queue::Depth")
// Packet buffer shared by the read, depth-test and write steps. serial_read
// places the datagram payload at offset HEADER_LENGTH and header_add is then
// called on the buffer.
34 unsigned char buf
[BUFLEN
];
// Statistics for the receive side (est) and for the journal-write side (dst).
37 struct enqueuer_stats est
;
38 struct dequeuer_stats dst
;
// Timestamp from time_in_milliseconds() taken by serial_read for the most
// recent successful read; also compared against depth_tm in serial_model.
39 unsigned long long tm
;
// Emitter created in serial_ctor, used to send depth probe events, and
// destroyed in serial_dtor.
40 struct lwes_emitter
* emitter
= NULL
;
41 /* these are used for doing a depth test, which every depth_dtm milliseconds
42 * sends an event SERIAL_DEPTH_COMMAND to this journaller, and later takes
43 * that and uses it to estimate how many events were in the operating system
// NOTE(review): the end of the block comment above is not visible in this
// extract.
// depth_tm: time at which the last depth probe was sent (set in
// serial_send_buffer_depth_test).
46 unsigned long long depth_tm
= 0;
// depth_dtm: interval between depth probes; the default of 10000 is
// overwritten in serial_ctor with arg_queue_test_interval.
47 unsigned long long depth_dtm
= 10000;
48 /* count is the total number of events received by this journaller */
// NOTE(review): no statement that increments count is visible in this extract.
49 unsigned long long count
= 0;
50 /* pending is the number of events received between sending and receiving
51 * a queue depth test event. It represents the number of events in
52 * the recv buffer of the operating system.
// pending is computed in serial_handle_depth_test and passed to
// dequeuer_stats_record in serial_write.
54 unsigned long long pending
= 0;
// Forward declaration: serial_ctor calls serial_open_journal before its
// definition appears.
56 static void serial_open_journal(void);
// serial_ctor: one-time setup for the serial model.
// Visible steps, in order: install the signal, rotate-signal and
// interval-rotate handlers; copy arg_queue_test_interval into depth_dtm;
// construct the enqueuer and dequeuer statistics; zero the packet buffer;
// create the transport and open it read-only; verify that exactly one journal
// name pattern was supplied; create the journal object and open it; create the
// lwes emitter used for depth probes.
// NOTE(review): the statements that follow each LOG_ER call are not visible in
// this extract, so the failure behaviour cannot be stated from here.
58 static void serial_ctor(void)
60 install_signal_handlers();
61 install_rotate_signal_handlers();
// NOTE(review): the meaning of the argument 1 is defined by
// install_interval_rotate_handlers, which is not visible here.
62 install_interval_rotate_handlers(1);
// Replace the compiled-in default probe interval with the configured one.
64 depth_dtm
= arg_queue_test_interval
;
66 if ( enqueuer_stats_ctor (&est
) < 0 )
68 LOG_ER("Failed to create initialize enqueuer stats.\n");
72 if ( dequeuer_stats_ctor (&dst
) < 0 )
74 LOG_ER("Failed to create initialize dequeuer stats.\n");
78 memset(buf
, 0, BUFLEN
); /* Clear the message. */
// Both creating the transport and opening it for reading must succeed; either
// failure produces the same log message.
80 if ( (xport_factory(&xpt
) < 0) || (xpt
.vtbl
->open(&xpt
, O_RDONLY
) < 0) )
82 LOG_ER("Failed to create xport object.\n");
// Only a single journal is supported by this model.
86 if (arg_njournalls
!= 1)
88 LOG_ER("Expected 1 journal name pattern, but found %d\n", arg_njournalls
);
92 /* Create journal object. */
93 if (journal_factory(&jrn
, arg_journalls
[0]) < 0)
95 LOG_ER("Failed to create journal object for \"%s\".\n", arg_journalls
[0]);
99 serial_open_journal();
// The emitter targets arg_ip and arg_port with no interface given.
// NOTE(review): the trailing arguments 0 and 60 are presumably the heartbeat
// flag and heartbeat frequency - TODO confirm against the lwes emitter API.
// NOTE(review): the result is not checked in the visible code; confirm whether
// a NULL emitter can reach lwes_emitter_emitto or lwes_emitter_destroy.
101 emitter
= lwes_emitter_create( (LWES_CONST_SHORT_STRING
) arg_ip
,
102 (LWES_CONST_SHORT_STRING
) NULL
, //arg_interface,
103 (LWES_U_INT_32
) arg_port
, 0, 60 );
// serial_open_journal: opens the journal jrn write-only through its vtbl.
// When open returns a negative value an error naming arg_journalls[0] is
// logged.
// NOTE(review): what happens after the LOG_ER call is not visible in this
// extract.
106 static void serial_open_journal(void)
108 if (jrn
.vtbl
->open(&jrn
, O_WRONLY
) < 0)
110 LOG_ER("Failed to open the journal \"%s\".\n", arg_journalls
[0]);
// serial_close_journal: rotates and flushes statistics as requested, then
// closes the journal.
// The enqueuer statistics are rotated and flushed when gbl_done or
// gbl_rotate_enqueue is set, and the dequeuer statistics when gbl_done or
// gbl_rotate_dequeue is set. The matching rotate flag is cleared in each case,
// including when the branch was entered because of gbl_done.
// A negative result from the journal close is logged with the journal name.
115 static void serial_close_journal(void)
117 if (gbl_done
|| gbl_rotate_enqueue
)
119 enqueuer_stats_rotate(&est
);
120 enqueuer_stats_flush();
// Acknowledge the enqueue-side rotate request.
121 gbl_rotate_enqueue
= 0;
124 if (gbl_done
|| gbl_rotate_dequeue
)
126 dequeuer_stats_rotate(&dst
);
127 dequeuer_stats_flush();
// Acknowledge the dequeue-side rotate request.
128 gbl_rotate_dequeue
= 0;
131 if (jrn
.vtbl
->close(&jrn
) < 0) {
132 LOG_ER("Can't close journal \"%s\".\n", arg_journalls
[0]);
// serial_rotate: rotates the journal by closing it and opening it again.
// serial_close_journal also rotates and flushes the statistics for whichever
// rotate flags are set at the time of the call.
137 static void serial_rotate(void)
139 serial_close_journal();
140 serial_open_journal();
// serial_read: reads one datagram from the transport into the packet buffer.
// The payload is read in after the first HEADER_LENGTH bytes of buf, with at
// most BUFLEN-HEADER_LENGTH bytes accepted; the read call also receives the
// addresses of addr and port.
// On a non-negative read result: tm is refreshed, buflen becomes the payload
// length plus HEADER_LENGTH, the datagram is recorded in est, and header_add
// is called with the payload length, timestamp, addr and port.
// When the read result equals XPORT_INTR, XPORT_INTR is returned.
// enqueuer_stats_record_socket_error is called on est in a branch whose
// condition is not visible (presumably the remaining error case).
// NOTE(review): the declarations of addr, port and xpt_read_ret, the
// assignment of the read result, and every return statement other than the
// XPORT_INTR one are not visible in this extract. serial_model treats a
// return value of -1 as an error.
143 static int serial_read(void)
148 xpt
.vtbl
->read(&xpt
, buf
+HEADER_LENGTH
, BUFLEN
-HEADER_LENGTH
, &addr
, &port
);
150 if (xpt_read_ret
>= 0)
// Timestamp the datagram as soon as it has been received.
152 tm
= time_in_milliseconds();
// buflen counts the header as well as the payload.
153 buflen
= xpt_read_ret
+ HEADER_LENGTH
;
154 enqueuer_stats_record_datagram(&est
, buflen
);
155 header_add(buf
, xpt_read_ret
, tm
, addr
, port
);
159 else if (xpt_read_ret
== XPORT_INTR
)
161 return XPORT_INTR
; /* return something special if interrupted
162 (we might be rotating via a signal or
167 enqueuer_stats_record_socket_error(&est
);
// serial_handle_depth_test: recognises and consumes internal depth probe
// events.
// The name token found in buf after the header is compared with
// SERIAL_DEPTH_COMMAND using toknam_eq. For a probe event the datagram is
// removed from the enqueuer statistics, the event is deserialized, its count
// attribute is read, and pending is set to count - previous_count - 1 (the
// extra 1 accounts for the probe itself). The event object is destroyed
// afterwards.
// Return value: serial_model skips the journal write when the result is
// non-zero. NOTE(review): the return statements themselves are not visible in
// this extract, and neither are the declaration of bytes_read or the final
// arguments of the lwes_event_from_bytes call.
// NOTE(review): the parameter list is written as () rather than (void).
172 static int serial_handle_depth_test()
174 /* check for the serial depth event */
175 if (toknam_eq((unsigned char *)buf
+ HEADER_LENGTH
,
176 (unsigned char *)SERIAL_DEPTH_COMMAND
))
178 struct lwes_event_deserialize_tmp event_tmp
;
// NOTE(review): the result of lwes_event_create_no_name is not checked in the
// visible code before it is used.
180 struct lwes_event
* event
= lwes_event_create_no_name(NULL
);
182 /* don't include these internal events in our stats */
183 enqueuer_stats_erase_datagram(&est
, buflen
);
185 /* we should have one, so deserialize it */
186 bytes_read
= lwes_event_from_bytes(event
,
187 (LWES_BYTE_P
)&buf
[HEADER_LENGTH
],
188 buflen
-HEADER_LENGTH
,
// The whole payload must be consumed for the event to be trusted.
190 if (bytes_read
!= buflen
-HEADER_LENGTH
)
// NOTE(review): the value logged as expected is buflen, while the comparison
// above is against buflen-HEADER_LENGTH, so the message overstates the
// expected size by HEADER_LENGTH.
192 LOG_ER("Only able to read %d bytes; expected %d\n", bytes_read
, buflen
);
196 /* get the previous count */
// NOTE(review): the result of lwes_event_get_U_INT_64 is not checked, so
// previous_count may be used without having been set if the attribute is
// missing - confirm against the lwes API.
197 LWES_U_INT_64 previous_count
;
198 lwes_event_get_U_INT_64(event
, "count", &previous_count
);
199 /* determine the difference and remove one for this special event */
200 pending
= count
- previous_count
- 1;
// NOTE(review): pending is unsigned long long but is printed with %lld.
201 LOG_INF("Depth test reports a buffer length of %lld events.\n", pending
);
204 lwes_event_destroy(event
);
// serial_send_buffer_depth_test: sends a depth probe event to this process.
// An event is created whose name is SERIAL_DEPTH_COMMAND without its first
// byte, the current value of count is stored in its count attribute, and the
// event is emitted to 127.0.0.1 on arg_port through the shared emitter. The
// event is then destroyed and depth_tm is set to the current time so that
// serial_model can schedule the next probe.
// The function returns early when the event cannot be created.
// NOTE(review): the statements following the LOG_ER call are not visible in
// this extract, and the result of lwes_emitter_emitto is not checked in the
// visible code.
213 /* this code attempts to determine if there are any events in the operating
214 * system queue by sending an event with the current received count
215 * and later checking it
217 static void serial_send_buffer_depth_test(void)
219 struct lwes_event
*event
=
220 lwes_event_create( (struct lwes_event_type_db
*) NULL
,
// The +1 skips the leading \026 byte of the macro string.
221 (LWES_SHORT_STRING
) SERIAL_DEPTH_COMMAND
+1);
222 if (event
== NULL
) return;
223 if (lwes_event_set_U_INT_64(event
,"count",count
) < 0)
225 LOG_ER("Unable to add count to depth event");
// The probe is sent to the loopback address on the port this process
// receives on.
229 lwes_emitter_emitto((char*) "127.0.0.1", NULL
, arg_port
, emitter
, event
);
231 lwes_event_destroy(event
);
// Remember when the probe was sent.
233 depth_tm
= time_in_milliseconds();
// serial_write: writes the current packet (buflen bytes of buf, header
// included) to the journal.
// When the write result differs from buflen the mismatch is logged and a loss
// is recorded in the dequeuer statistics. dequeuer_stats_record is called with
// buflen and the current pending estimate.
// NOTE(review): the braces are not visible in this extract, so whether the
// final dequeuer_stats_record call is unconditional or belongs to an else
// branch cannot be confirmed from here.
236 static void serial_write(void)
238 /* Write the packet out to the journal. */
239 int jrn_write_ret
= jrn
.vtbl
->write(&jrn
, buf
, buflen
);
// Anything other than a full write is treated as a lost packet.
241 if (jrn_write_ret
!= buflen
)
243 LOG_ER("Journal write error -- attempted to write %d bytes, "
244 "write returned %d.\n", buflen
, jrn_write_ret
);
245 dequeuer_stats_record_loss(&dst
);
248 dequeuer_stats_record(&dst
, buflen
, pending
);
// serial_dtor: releases everything set up by serial_ctor.
// Order: close the journal (which also rotates and flushes statistics when
// gbl_done or a rotate flag is set), destroy the emitter, run the transport
// and journal destructors, then destruct the enqueuer and dequeuer statistics.
// NOTE(review): emitter is passed to lwes_emitter_destroy without a NULL check
// - confirm that the lwes API accepts NULL.
251 static void serial_dtor(void)
253 serial_close_journal();
254 lwes_emitter_destroy(emitter
);
255 xpt
.vtbl
->destructor(&xpt
);
256 jrn
.vtbl
->destructor(&jrn
);
257 enqueuer_stats_dtor(&est
);
258 dequeuer_stats_dtor(&dst
);
// serial_model: entry point of the serial journalling model.
// Visible per-iteration steps: read one datagram; skip the iteration when the
// read returned -1; skip it when the packet is a depth probe; write the packet
// to the journal unless the read was interrupted; when the packet header is a
// rotate header, copy it into dst.latest_rotate_header and mark the rotation
// type as LJ_RT_EVENT; act on is_rotate or either global rotate flag; finally
// send a new depth probe once depth_dtm has elapsed since depth_tm.
// NOTE(review): the loop construct, any setup or teardown calls, the
// assignment of is_rotate, the body of the rotate branch and the end of the
// function are not visible in this extract.
261 void serial_model(void)
267 int read_ret
= serial_read();
268 /* -1 is an error we don't deal with, so just skip out of the loop */
269 if (read_ret
== -1) continue;
270 /* depth tests are not written to the journal */
// NOTE(review): this check runs before the XPORT_INTR check below, so after
// an interrupted read it inspects whatever is still in buf from an earlier
// read - confirm that this is intended.
271 if (serial_handle_depth_test()) continue;
272 /* XPORT_INTR from read means we were interrupted and should not
273 * write, so write when we are not interrupted, this is for backward
274 * compatibility when we didn't do rotation signals correctly here
276 if (read_ret
!= XPORT_INTR
) serial_write();
277 /* check for rotation event, or signal's and rotate if necessary */
278 if (header_is_rotate(buf
)) {
// Keep a copy of the rotate header for the dequeuer statistics.
279 memcpy(&dst
.latest_rotate_header
, buf
, HEADER_LENGTH
) ;
280 dst
.rotation_type
= LJ_RT_EVENT
;
283 if (is_rotate
|| gbl_rotate_dequeue
|| gbl_rotate_enqueue
) {
287 /* maybe send depth test */
// tm is the time of the last successful read; a probe is sent once the
// configured interval has passed since the previous probe.
288 if (tm
>= depth_tm
+ depth_dtm
) serial_send_buffer_depth_test();