Log the queue size before and after journal rotation, to see if rotation is killing us.
[lwes-journaller.git] / src / stats.c
blob44a1a4b721c4d80e225db3902d0951a1c3bb1afb
1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
21 #include "config.h"
23 #include "stats.h"
25 #include "header.h"
26 #include "log.h"
27 #include "opt.h"
28 #include "sig.h"
29 #include "marshal.h"
30 #include "perror.h"
31 #include "lwes_mondemand.h"
33 #include <string.h>
34 #include <sys/time.h>
35 #include <time.h>
/* Pass-through to strftime(3). Calling strftime with a non-literal format
   (e.g. "%c") via this wrapper keeps gcc's format-y2k warnings quiet; the
   Linux strftime(3) man page describes the trick.
   Returns the number of bytes written (excluding the NUL), or 0 when the
   result does not fit in `max` bytes. */
static size_t my_strftime (char* s, size_t max, const char* fmt,
                           const struct tm* tm)
{
  size_t written = strftime (s, max, fmt, tm);
  return written;
}
/* Current wall-clock time in milliseconds since the Unix epoch.
   Used to timestamp packets and, in dequeuer_stats_rotate, to measure how
   long a rotate command spent in the queue.
   If gettimeofday(2) fails, the error is reported and 0 is returned instead
   of computing a value from an uninitialized struct timeval. */
unsigned long long time_in_milliseconds (void)
{
  struct timeval t;

  if ( -1 == gettimeofday(&t, NULL) )   /* This is where we need to */
    {                                   /* timestamp the packet.    */
      PERROR("gettimeofday");
      return 0ULL;
    }

  /* Do the arithmetic in the unsigned return type rather than long long. */
  return ((unsigned long long)t.tv_sec) * 1000ULL
         + (unsigned long long)(t.tv_usec / 1000);
}
56 int enqueuer_stats_ctor (struct enqueuer_stats* st)
58 memset (st, 0, sizeof(*st));
60 st->start_time = time (NULL);
61 st->last_rotate = st->start_time;
63 return 0;
66 int dequeuer_stats_ctor (struct dequeuer_stats* st)
68 memset (st, 0, sizeof(*st));
70 st->start_time = time (NULL);
71 st->hiq_start = st->start_time;
72 st->last_rotate = st->start_time;
74 return 0;
77 void enqueuer_stats_record_socket_error (struct enqueuer_stats* st)
79 ++st->socket_errors_since_last_rotate;
82 void enqueuer_stats_record_datagram (struct enqueuer_stats* st, int bytes)
84 st->bytes_received_since_last_rotate += bytes;
85 st->bytes_received_total += bytes;
86 ++st->packets_received_since_last_rotate;
87 ++st->packets_received_total;
90 void dequeuer_stats_record (struct dequeuer_stats* st, int bytes, int pending)
92 /* Collect some stats. All byte counts represent the bytes used
93 in the original packets, not with the headers applied. */
95 st->bytes_written_since_last_rotate += bytes;
96 st->bytes_written_total += bytes;
98 ++st->packets_written_since_last_rotate;
99 ++st->packets_written_total;
101 /* No pending packets in queue. */
102 if ( 0 == pending )
104 if ( st->hiq )
106 /* We just grabbed that last packet of a burst. */
107 st->bytes_written_in_burst_since_last_rotate = st->bytes_written_in_burst;
108 st->packets_written_in_burst_since_last_rotate = st->packets_written_in_burst;
110 st->hiq = 0;
111 st->bytes_written_in_burst = 0;
112 st->packets_written_in_burst = 0;
115 else /* In a burst. */
117 /* This is a new burst. */
118 if ( 0 == st->hiq )
120 st->hiq_last = st->hiq_start;
121 st->hiq_start = time(NULL);
123 /* Possible peak. */
124 if ( pending > st->hiq )
126 st->hiq = pending;
129 st->bytes_written_in_burst += bytes;
130 st->packets_written_in_burst += 1;
133 if ( st->hiq > st->hiq_since_last_rotate )
135 st->hiq_since_last_rotate = st->hiq ;
139 void dequeuer_stats_record_loss (struct dequeuer_stats* st)
141 st->loss_since_last_rotate += 1;
145 static void log_rates(log_mask_t level, const char* file, int line, double bps, double pps,
146 const char* notes)
148 if ( bps > 1000000. )
150 log_msg(level, file, line, " %g mbps, %g pps%s.\n", bps / 1000000., pps, notes);
152 else if ( bps > 1000. )
154 log_msg(level, file, line, " %g kbps, %g pps%s.\n", bps / 1000., pps, notes);
156 else
158 log_msg(level, file, line, " %g bps, %g pps%s.\n", bps, pps, notes);
162 void enqueuer_stats_rotate(struct enqueuer_stats* st)
164 double rbps, rpps;
166 time_t now = time(NULL);
167 time_t uptime;
169 if ( st->socket_errors_since_last_rotate )
171 LOG_ER("*** %lld packets had socket errors in this journal ***\n",
172 st->socket_errors_since_last_rotate);
175 LOG_INF("Socket read errors since last rotate: %lld\n",
176 st->socket_errors_since_last_rotate);
178 LOG_INF("Events read since last rotate:\n");
179 LOG_INF(" %lld bytes, %lld packets in this journal.\n",
180 st->bytes_received_since_last_rotate,
181 st->packets_received_since_last_rotate);
182 uptime = now - st->last_rotate;
183 rbps = (8. * (double)st->bytes_received_since_last_rotate) / (double)uptime;
184 rpps = ((double)st->packets_received_since_last_rotate) / (double)uptime;
185 log_rates(LOG_INFO,__FILE__,__LINE__,rbps,rpps," received");
187 st->socket_errors_since_last_rotate = 0LL;
188 st->bytes_received_since_last_rotate = 0LL;
189 st->packets_received_since_last_rotate = 0LL;
190 st->last_rotate = now;
192 LOG_INF("Enqueuer stats summary v2:\t%d\t%lld\t%lld\t%lld\t%lld\t%lld\t%d\n",
193 now, st->socket_errors_since_last_rotate,
194 st->bytes_received_total, st->bytes_received_since_last_rotate,
195 st->packets_received_total, st->packets_received_since_last_rotate,
196 uptime);
198 mondemand_enqueuer_stats(st,now);
201 void dequeuer_stats_rotate(struct dequeuer_stats* st)
203 double wbps, wpps;
205 time_t now = time(NULL);
206 time_t uptime;
208 if ( st->loss_since_last_rotate )
210 LOG_ER("*** %lld packets lost in this journal ***\n",
211 st->loss_since_last_rotate);
214 LOG_INF("Events written since last rotate:\n");
215 LOG_INF(" %lld bytes, %lld packets in this journal.\n",
216 st->bytes_written_since_last_rotate,
217 st->packets_written_since_last_rotate);
218 uptime = now - st->last_rotate;
219 wbps = (8. * (double)st->bytes_written_since_last_rotate) / (double)uptime;
220 wpps = ((double)st->packets_written_since_last_rotate) / (double)uptime;
221 log_rates(LOG_INFO,__FILE__,__LINE__,wbps,wpps," written");
223 LOG_INF("Highest queue utilization since last rotate: %d packets.\n",
224 st->hiq_since_last_rotate);
225 LOG_INF("Highest burst since last rotate: %lli packets, %lli bytes.\n",
226 st->packets_written_in_burst_since_last_rotate,
227 st->bytes_written_in_burst_since_last_rotate);
229 LOG_INF("Command::Rotate from IP %s traversed the queue in %ld ms\n",
230 header_sender_ip_formatted(st->latest_rotate_header),
231 time_in_milliseconds()-header_receipt_time(st->latest_rotate_header));
233 st->bytes_written_since_last_rotate = 0LL;
234 st->packets_written_since_last_rotate = 0LL;
235 st->hiq_since_last_rotate = 0;
236 st->packets_written_in_burst_since_last_rotate = st->bytes_written_in_burst_since_last_rotate = 0LL ;
237 st->loss_since_last_rotate = 0LL;
238 st->last_rotate = now;
240 LOG_INF("Dequeuer stats summary v2:\t%d\t%lld\t%lld\t%lld\t%lld\t%lld\t%lld\t%lld\t%d\t%d\t%d\t%d\t%lld\t%lld\t%d\n",
241 now, st->loss_since_last_rotate,
242 st->bytes_written_total, st->bytes_written_since_last_rotate,
243 st->packets_written_total, st->packets_written_since_last_rotate,
244 st->bytes_written_in_burst, st->packets_written_in_burst, st->hiq,
245 st->hiq_start, st->hiq_last, st->hiq_since_last_rotate,
246 st->bytes_written_in_burst_since_last_rotate,
247 st->packets_written_in_burst_since_last_rotate, uptime);
249 mondemand_dequeuer_stats(st,now);
252 void enqueuer_stats_report(struct enqueuer_stats* st)
254 double rbps, rpps;
255 char startbfr[100];
256 char nowbfr[100];
258 struct tm tm_st;
260 time_t now = time(NULL);
261 time_t uptime;
263 if ( my_strftime(startbfr, sizeof(startbfr),
264 "%c", localtime_r(&st->start_time, &tm_st)) == 0 )
266 LOG_ER("strftime failure");
269 if ( my_strftime(nowbfr, sizeof(nowbfr),
270 "%c", localtime_r(&now, &tm_st)) == 0 )
272 LOG_ER("strftime failure");
275 uptime = now - st->start_time;
277 rbps = (8. * (double)st->bytes_received_total) / (double)uptime;
278 rpps = ((double)st->packets_received_total) / (double)uptime;
280 LOG_INF("Total network traffic received:\n");
281 LOG_INF(" %lld bytes, %lld packets.\n",
282 st->bytes_received_total,
283 st->packets_received_total);
284 log_rates(LOG_INFO,__FILE__,__LINE__,rbps,rpps," received");
287 void dequeuer_stats_report(struct dequeuer_stats* st)
289 double wbps, wpps;
290 char startbfr[100];
291 char nowbfr[100];
293 struct tm tm_st;
295 time_t now = time(NULL);
296 time_t uptime;
298 if ( my_strftime(startbfr, sizeof(startbfr),
299 "%c", localtime_r(&st->start_time, &tm_st)) == 0 )
301 LOG_ER("strftime failure");
304 if ( my_strftime(nowbfr, sizeof(nowbfr),
305 "%c", localtime_r(&now, &tm_st)) == 0 )
307 LOG_ER("strftime failure");
310 uptime = now - st->start_time;
312 wbps = (8. * (double)st->bytes_written_total) / (double)uptime;
313 wpps = ((double)st->packets_written_total) / (double)uptime;
315 LOG_INF("Total network traffic recorded:\n");
316 LOG_INF(" %lld bytes, %lld packets.\n",
317 st->bytes_written_total,
318 st->packets_written_total);
319 log_rates(LOG_INFO,__FILE__,__LINE__,wbps,wpps," written");