1 /*======================================================================*
2 * Copyright (c) 2010, OpenX Inc. All rights reserved. *
4 * Licensed under the New BSD License (the "License"); you may not use *
5 * this file except in compliance with the License. Unless required *
6 * by applicable law or agreed to in writing, software distributed *
7 * under the License is distributed on an "AS IS" BASIS, WITHOUT *
8 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
9 * See the License for the specific language governing permissions and *
10 * limitations under the License. See accompanying LICENSE file. *
11 *======================================================================*/
18 #include "serial_model.h"
/* Size in bytes of the shared datagram buffer `buf` declared below. */
29 #define BUFLEN (65535)
/* Name of the internal depth-probe event.  The leading byte \026 (octal,
 * i.e. 22) equals the length of "Internal::Queue::Depth"; the sender below
 * passes SERIAL_DEPTH_COMMAND+1 to skip that byte when naming the event,
 * while the receiver compares the whole prefixed form against the datagram
 * payload via toknam_eq(). */
30 #define SERIAL_DEPTH_COMMAND ("\026Internal::Queue::Depth")
/* Working buffer: the first HEADER_LENGTH bytes are reserved for the header
 * and the received datagram is stored after them (see serial_read). */
33 unsigned char buf
[BUFLEN
];
/* Statistics objects; est is updated on the read path, dst on the write path. */
36 struct enqueuer_stats est
;
37 struct dequeuer_stats dst
;
/* Timestamp of the most recent read, taken from time_in_milliseconds(). */
38 unsigned long long tm
;
/* Emitter used to send the depth-probe event; created in serial_ctor. */
39 struct lwes_emitter
* emitter
= NULL
;
/* Time at which the last depth probe was sent (time_in_milliseconds()). */
40 unsigned long long depth_tm
= 0;
/* Minimum gap between depth probes, compared against tm - presumably
 * milliseconds, i.e. 10 seconds; TODO confirm units. */
41 const unsigned long long depth_dtm
= 10000;
/* Running counter embedded in each depth probe as the "count" attribute.
 * NOTE(review): the code that increments it is not visible in this listing. */
42 unsigned long long count
= 0;
/* Last computed difference between `count` and the value carried by a
 * received depth probe; passed to dequeuer_stats_record(). */
43 unsigned long long pending
= 0;
/* Forward declaration: serial_ctor calls this before its definition. */
45 static void serial_open_journal(void);
/* One-time set-up for the serial model.
 *
 * In order: installs the signal handlers, initialises the enqueuer and
 * dequeuer statistics, zeroes `buf`, creates the transport and opens it
 * with O_RDONLY, requires exactly one journal name pattern, creates and
 * opens the journal, and finally creates the LWES emitter used for the
 * depth probes.
 *
 * NOTE(review): the statements that follow each LOG_ER call are not visible
 * in this listing, so what happens after a logged failure cannot be stated
 * here - confirm against the full file. */
47 static void serial_ctor(void)
49 install_signal_handlers();
50 install_rotate_signal_handlers();
52 if ( enqueuer_stats_ctor (&est
) < 0 )
54 LOG_ER("Failed to create initialize enqueuer stats.\n");
58 if ( dequeuer_stats_ctor (&dst
) < 0 )
60 LOG_ER("Failed to create initialize dequeuer stats.\n");
64 memset(buf
, 0, BUFLEN
); /* Clear the message. */
/* Either the factory or the open call failing is reported the same way. */
66 if ( (xport_factory(&xpt
) < 0) || (xpt
.vtbl
->open(&xpt
, O_RDONLY
) < 0) )
68 LOG_ER("Failed to create xport object.\n");
72 if (arg_njournalls
!= 1)
74 LOG_ER("Expected 1 journal name pattern, but found %d\n", arg_njournalls
);
78 /* Create journal object. */
79 if (journal_factory(&jrn
, arg_journalls
[0]) < 0)
81 LOG_ER("Failed to create journal object for \"%s\".\n", arg_journalls
[0]);
85 serial_open_journal();
/* The interface argument is passed as NULL (arg_interface is commented
 * out).  The trailing 0 and 60 are presumably the heartbeat flag and
 * heartbeat frequency - TODO confirm against the lwes API.
 * NOTE(review): the returned pointer does not appear to be checked for
 * NULL before the function ends. */
87 emitter
= lwes_emitter_create( (LWES_CONST_SHORT_STRING
) arg_ip
,
88 (LWES_CONST_SHORT_STRING
) NULL
, //arg_interface,
89 (LWES_U_INT_32
) arg_port
, 0, 60 );
/* Opens the journal `jrn` with O_WRONLY and logs an error naming
 * arg_journalls[0] if the open call returns a negative value.
 * NOTE(review): anything done after the log call is not visible here. */
92 static void serial_open_journal(void)
94 if (jrn
.vtbl
->open(&jrn
, O_WRONLY
) < 0)
96 LOG_ER("Failed to open the journal \"%s\".\n", arg_journalls
[0]);
/* Rotates both statistics objects, then closes the journal `jrn`; a
 * negative return from close is logged with the journal name pattern. */
101 static void serial_close_journal(void)
103 enqueuer_stats_rotate(&est
);
104 dequeuer_stats_rotate(&dst
);
106 if (jrn
.vtbl
->close(&jrn
) < 0) {
107 LOG_ER("Can't close journal \"%s\".\n", arg_journalls
[0]);
/* Rotates the journal by closing it (which also rotates the statistics)
 * and immediately opening it again. */
112 static void serial_rotate(void)
114 serial_close_journal();
115 serial_open_journal();
/* Reads one datagram from the transport into `buf` and prepends a header.
 *
 * The payload is stored at buf+HEADER_LENGTH so that header_add() can fill
 * the leading bytes with the payload length, the receive timestamp and the
 * sender address/port.  Also updates the globals `tm` and `buflen` and the
 * enqueuer statistics.
 *
 * NOTE(review): the local declarations, the assignment of xpt_read_ret and
 * the return statements are not visible in this listing.  The caller treats
 * a negative return as "skip this iteration". */
119 static int serial_read(void)
/* At most BUFLEN-HEADER_LENGTH bytes are requested, leaving room for the header. */
124 xpt
.vtbl
->read(&xpt
, buf
+HEADER_LENGTH
, BUFLEN
-HEADER_LENGTH
, &addr
, &port
);
/* A negative read result is counted as a socket error. */
126 if (xpt_read_ret
< 0)
128 enqueuer_stats_record_socket_error(&est
);
132 tm
= time_in_milliseconds();
/* buflen covers header plus payload; header_add receives the payload length only. */
133 buflen
= xpt_read_ret
+ HEADER_LENGTH
;
134 enqueuer_stats_record_datagram(&est
, buflen
);
135 header_add(buf
, xpt_read_ret
, tm
, addr
, port
);
/* Detects and consumes a depth-probe event sitting in `buf`.
 *
 * When the payload (after the header) starts with SERIAL_DEPTH_COMMAND, the
 * event is deserialised, its attributes are enumerated, and the "count"
 * attribute is subtracted from the global `count` to give `pending`, which
 * is then logged.  The temporary event is destroyed afterwards.
 *
 * NOTE(review): the return statements are not visible here; the caller
 * skips the rest of its iteration when the result is non-zero.
 * NOTE(review): the parameter list is `()` rather than `(void)`, unlike the
 * other functions in this file. */
141 static int serial_handle_depth_test()
143 if (toknam_eq((unsigned char *)buf
+ HEADER_LENGTH
, (unsigned char *)SERIAL_DEPTH_COMMAND
))
145 struct lwes_event_deserialize_tmp event_tmp
;
146 struct lwes_event_enumeration keys
;
147 LWES_CONST_SHORT_STRING key
;
/* NOTE(review): no NULL check on the created event is visible before use. */
150 struct lwes_event
* event
= lwes_event_create_no_name(NULL
);
152 bytes_read
= lwes_event_from_bytes(event
, (LWES_BYTE_P
)&buf
[HEADER_LENGTH
],
153 buflen
-HEADER_LENGTH
, 0, &event_tmp
);
/* The whole payload must have been consumed by the deserialiser. */
154 if (bytes_read
!= buflen
-HEADER_LENGTH
)
/* NOTE(review): the "expected" value printed is buflen, but the comparison
 * above is against buflen-HEADER_LENGTH - the message looks off by
 * HEADER_LENGTH. */
156 LOG_ER("Only able to read %d bytes; expected %d\n", bytes_read
, buflen
);
158 else if (!lwes_event_keys(event
, &keys
))
160 LOG_ER("Unable to iterate over keys\n");
164 while (lwes_event_enumeration_next_element(&keys
, &key
, &type
))
/* Comparing 6 bytes includes the terminating NUL of "count", so only an
 * exact match of the key passes. */
166 if (strncmp("count",key
,6)==0)
169 lwes_event_get_U_INT_64(event
, key
, &value
);
170 pending
= count
- value
;
/* NOTE(review): pending is unsigned long long but is printed with %lld. */
171 LOG_INF("Depth test reports a buffer length of %lld events.\n", pending
);
176 lwes_event_destroy(event
);
/* Emits a depth-probe event carrying the current `count`.
 *
 * Creates an event named by SERIAL_DEPTH_COMMAND (skipping its leading
 * length byte), attaches `count` as a 64-bit unsigned attribute, sends it
 * to 127.0.0.1 on arg_port through the global emitter, destroys the event
 * and records the send time in `depth_tm`.  Returns early, doing nothing,
 * if the event cannot be created.
 *
 * NOTE(review): what follows the LOG_ER on a failed attribute set is not
 * visible in this listing. */
186 static void serial_send_buffer_depth_test(void)
188 struct lwes_event
*event
=
189 lwes_event_create( (struct lwes_event_type_db
*) NULL
,
190 (LWES_SHORT_STRING
) SERIAL_DEPTH_COMMAND
+1);
191 if (event
== NULL
) return;
192 if (lwes_event_set_U_INT_64(event
,"count",count
) < 0)
/* NOTE(review): unlike the other log messages in this file, this one has
 * no trailing newline. */
194 LOG_ER("Unable to add count to depth event");
/* NOTE(review): `emitter` is used without a visible NULL check. */
198 lwes_emitter_emitto((char*) "127.0.0.1", NULL
, arg_port
, emitter
, event
);
200 lwes_event_destroy(event
);
202 depth_tm
= time_in_milliseconds();
/* Writes the current packet (header plus payload, `buflen` bytes of `buf`)
 * to the journal and updates the dequeuer statistics.
 *
 * A write result different from buflen is logged and counted as a loss.
 * NOTE(review): the lines between the loss accounting and the final
 * dequeuer_stats_record() call are not visible, so it cannot be told from
 * this listing whether that call is unconditional or an else-branch. */
205 static void serial_write(void)
207 /* Write the packet out to the journal. */
208 int jrn_write_ret
= jrn
.vtbl
->write(&jrn
, buf
, buflen
);
210 if (jrn_write_ret
!= buflen
)
212 LOG_ER("Journal write error -- attempted to write %d bytes, "
213 "write returned %d.\n", buflen
, jrn_write_ret
);
214 dequeuer_stats_record_loss(&dst
);
/* Records the packet size together with the latest `pending` estimate. */
217 dequeuer_stats_record(&dst
, buflen
, pending
);
/* Tear-down counterpart of serial_ctor.  The visible part closes the
 * journal (which also rotates the statistics).
 * NOTE(review): any further clean-up is not visible in this listing. */
220 static void serial_dtor(void)
222 serial_close_journal();
225 void serial_model(void)
230 if (serial_read() < 0) continue;
231 if (serial_handle_depth_test()) continue;
233 if (header_is_rotate(buf
) || gbl_rotate
) serial_rotate();
234 if (tm
>= depth_tm
+ depth_dtm
) serial_send_buffer_depth_test();