1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
31 #include "lwes_mondemand.h"
/* Thin pass-through to strftime(3).  It exists to rid us of gcc warnings
   at the call sites; see the strftime(3) man page on Linux for details.

   s    destination buffer
   max  size of the destination buffer in bytes
   fmt  strftime format string
   tm   broken-down time to format

   Returns whatever strftime() returns for the same arguments. */
static size_t my_strftime (char* s, size_t max, const char* fmt,
                           const struct tm* tm)
{
  size_t written = strftime (s, max, fmt, tm);

  return written;
}
/* Return the current wall-clock time as reported by gettimeofday(2),
   converted to milliseconds (seconds * 1000 + microseconds / 1000).

   If gettimeofday() fails the error is reported through PERROR and the
   function returns 0: the timeval is zero-initialised so that a failed
   call can never leak an indeterminate stack value into the result. */
unsigned long long time_in_milliseconds (void)
{
  struct timeval t = { 0, 0 };

  /* This is where we need to timestamp the packet. */
  if ( -1 == gettimeofday(&t, 0) ) {
    PERROR("gettimeofday");
  }

  return (((long long)t.tv_sec) * 1000LL) + (long long)(t.tv_usec/1000);
}
56 int enqueuer_stats_ctor (struct enqueuer_stats
* st
)
58 memset (st
, 0, sizeof(*st
));
60 st
->start_time
= time (NULL
);
61 st
->last_rotate
= st
->start_time
;
66 int dequeuer_stats_ctor (struct dequeuer_stats
* st
)
68 memset (st
, 0, sizeof(*st
));
70 st
->start_time
= time (NULL
);
71 st
->hiq_start
= st
->start_time
;
72 st
->last_rotate
= st
->start_time
;
77 void enqueuer_stats_record_socket_error (struct enqueuer_stats
* st
)
79 ++st
->socket_errors_since_last_rotate
;
82 void enqueuer_stats_record_datagram (struct enqueuer_stats
* st
, int bytes
)
84 st
->bytes_received_since_last_rotate
+= bytes
;
85 st
->bytes_received_total
+= bytes
;
86 ++st
->packets_received_since_last_rotate
;
87 ++st
->packets_received_total
;
/* Account for one packet written by the dequeuer.

   st       dequeuer statistics record to update.
   bytes    size of the packet; added to the per-rotation and total byte
            counters and to the current burst's byte counter.
   pending  compared against st->hiq below; judging by the comments it is
            the number of packets still waiting in the queue.

   The per-rotation and total byte/packet counters are always advanced.
   The remainder maintains the burst counters and the high-queue marks.

   NOTE(review): several lines of this function (the conditions guarding
   the "no pending packets" / "last packet of a burst" / "new burst"
   branches and the statement executed when pending > st->hiq) are not
   visible in this listing; the comments below describe only what is
   shown -- confirm against the complete source. */
90 void dequeuer_stats_record (struct dequeuer_stats
* st
, int bytes
, int pending
)
92 /* Collect some stats. All byte counts represent the bytes used
93 in the original packets, not with the headers applied. */
95 st
->bytes_written_since_last_rotate
+= bytes
;
96 st
->bytes_written_total
+= bytes
;
98 ++st
->packets_written_since_last_rotate
;
99 ++st
->packets_written_total
;
101 /* No pending packets in queue. */
106 /* We just grabbed that last packet of a burst. */
/* Publish the finished burst's size as the per-rotation burst figures. */
107 st
->bytes_written_in_burst_since_last_rotate
= st
->bytes_written_in_burst
;
108 st
->packets_written_in_burst_since_last_rotate
= st
->packets_written_in_burst
;
/* Clear the running burst counters. */
111 st
->bytes_written_in_burst
= 0;
112 st
->packets_written_in_burst
= 0;
115 else /* In a burst. */
117 /* This is a new burst. */
/* Keep the previous burst's start time and stamp the new one. */
120 st
->hiq_last
= st
->hiq_start
;
121 st
->hiq_start
= time(NULL
);
124 if ( pending
> st
->hiq
)
/* Grow the running burst counters by this packet. */
129 st
->bytes_written_in_burst
+= bytes
;
130 st
->packets_written_in_burst
+= 1;
/* Raise the per-rotation high-queue mark when st->hiq exceeds it. */
133 if ( st
->hiq
> st
->hiq_since_last_rotate
)
135 st
->hiq_since_last_rotate
= st
->hiq
;
139 void dequeuer_stats_record_loss (struct dequeuer_stats
* st
)
141 st
->loss_since_last_rotate
+= 1;
145 static void log_rates(log_mask_t level
, const char* file
, int line
, double bps
, double pps
,
148 if ( bps
> 1000000. )
150 log_msg(level
, file
, line
, " %g mbps, %g pps%s.\n", bps
/ 1000000., pps
, notes
);
152 else if ( bps
> 1000. )
154 log_msg(level
, file
, line
, " %g kbps, %g pps%s.\n", bps
/ 1000., pps
, notes
);
158 log_msg(level
, file
, line
, " %g bps, %g pps%s.\n", bps
, pps
, notes
);
162 void enqueuer_stats_rotate(struct enqueuer_stats
* st
)
166 time_t now
= time(NULL
);
169 if ( st
->socket_errors_since_last_rotate
)
171 LOG_ER("*** %lld packets had socket errors in this journal ***\n",
172 st
->socket_errors_since_last_rotate
);
175 LOG_INF("Socket read errors since last rotate: %lld\n",
176 st
->socket_errors_since_last_rotate
);
178 LOG_INF("Events read since last rotate:\n");
179 LOG_INF(" %lld bytes, %lld packets in this journal.\n",
180 st
->bytes_received_since_last_rotate
,
181 st
->packets_received_since_last_rotate
);
182 uptime
= now
- st
->last_rotate
;
183 rbps
= (8. * (double)st
->bytes_received_since_last_rotate
) / (double)uptime
;
184 rpps
= ((double)st
->packets_received_since_last_rotate
) / (double)uptime
;
185 log_rates(LOG_INFO
,__FILE__
,__LINE__
,rbps
,rpps
," received");
187 st
->socket_errors_since_last_rotate
= 0LL;
188 st
->bytes_received_since_last_rotate
= 0LL;
189 st
->packets_received_since_last_rotate
= 0LL;
190 st
->last_rotate
= now
;
192 LOG_INF("Enqueuer stats summary v2:\t%d\t%lld\t%lld\t%lld\t%lld\t%lld\t%d\n",
193 now
, st
->socket_errors_since_last_rotate
,
194 st
->bytes_received_total
, st
->bytes_received_since_last_rotate
,
195 st
->packets_received_total
, st
->packets_received_since_last_rotate
,
198 mondemand_enqueuer_stats(st
,now
);
201 void dequeuer_stats_rotate(struct dequeuer_stats
* st
)
205 time_t now
= time(NULL
);
208 if ( st
->loss_since_last_rotate
)
210 LOG_ER("*** %lld packets lost in this journal ***\n",
211 st
->loss_since_last_rotate
);
214 LOG_INF("Events written since last rotate:\n");
215 LOG_INF(" %lld bytes, %lld packets in this journal.\n",
216 st
->bytes_written_since_last_rotate
,
217 st
->packets_written_since_last_rotate
);
218 uptime
= now
- st
->last_rotate
;
219 wbps
= (8. * (double)st
->bytes_written_since_last_rotate
) / (double)uptime
;
220 wpps
= ((double)st
->packets_written_since_last_rotate
) / (double)uptime
;
221 log_rates(LOG_INFO
,__FILE__
,__LINE__
,wbps
,wpps
," written");
223 LOG_INF("Highest queue utilization since last rotate: %d packets.\n",
224 st
->hiq_since_last_rotate
);
225 LOG_INF("Highest burst since last rotate: %lli packets, %lli bytes.\n",
226 st
->packets_written_in_burst_since_last_rotate
,
227 st
->bytes_written_in_burst_since_last_rotate
);
229 LOG_INF("Command::Rotate from IP %s traversed the queue in %ld ms\n",
230 header_sender_ip_formatted(st
->latest_rotate_header
),
231 time_in_milliseconds()-header_receipt_time(st
->latest_rotate_header
));
233 st
->bytes_written_since_last_rotate
= 0LL;
234 st
->packets_written_since_last_rotate
= 0LL;
235 st
->hiq_since_last_rotate
= 0;
236 st
->packets_written_in_burst_since_last_rotate
= st
->bytes_written_in_burst_since_last_rotate
= 0LL ;
237 st
->loss_since_last_rotate
= 0LL;
238 st
->last_rotate
= now
;
240 LOG_INF("Dequeuer stats summary v2:\t%d\t%lld\t%lld\t%lld\t%lld\t%lld\t%lld\t%lld\t%d\t%d\t%d\t%d\t%lld\t%lld\t%d\n",
241 now
, st
->loss_since_last_rotate
,
242 st
->bytes_written_total
, st
->bytes_written_since_last_rotate
,
243 st
->packets_written_total
, st
->packets_written_since_last_rotate
,
244 st
->bytes_written_in_burst
, st
->packets_written_in_burst
, st
->hiq
,
245 st
->hiq_start
, st
->hiq_last
, st
->hiq_since_last_rotate
,
246 st
->bytes_written_in_burst_since_last_rotate
,
247 st
->packets_written_in_burst_since_last_rotate
, uptime
);
249 mondemand_dequeuer_stats(st
,now
);
252 void enqueuer_stats_report(struct enqueuer_stats
* st
)
260 time_t now
= time(NULL
);
263 if ( my_strftime(startbfr
, sizeof(startbfr
),
264 "%c", localtime_r(&st
->start_time
, &tm_st
)) == 0 )
266 LOG_ER("strftime failure");
269 if ( my_strftime(nowbfr
, sizeof(nowbfr
),
270 "%c", localtime_r(&now
, &tm_st
)) == 0 )
272 LOG_ER("strftime failure");
275 uptime
= now
- st
->start_time
;
277 rbps
= (8. * (double)st
->bytes_received_total
) / (double)uptime
;
278 rpps
= ((double)st
->packets_received_total
) / (double)uptime
;
280 LOG_INF("Total network traffic received:\n");
281 LOG_INF(" %lld bytes, %lld packets.\n",
282 st
->bytes_received_total
,
283 st
->packets_received_total
);
284 log_rates(LOG_INFO
,__FILE__
,__LINE__
,rbps
,rpps
," received");
287 void dequeuer_stats_report(struct dequeuer_stats
* st
)
295 time_t now
= time(NULL
);
298 if ( my_strftime(startbfr
, sizeof(startbfr
),
299 "%c", localtime_r(&st
->start_time
, &tm_st
)) == 0 )
301 LOG_ER("strftime failure");
304 if ( my_strftime(nowbfr
, sizeof(nowbfr
),
305 "%c", localtime_r(&now
, &tm_st
)) == 0 )
307 LOG_ER("strftime failure");
310 uptime
= now
- st
->start_time
;
312 wbps
= (8. * (double)st
->bytes_written_total
) / (double)uptime
;
313 wpps
= ((double)st
->packets_written_total
) / (double)uptime
;
315 LOG_INF("Total network traffic recorded:\n");
316 LOG_INF(" %lld bytes, %lld packets.\n",
317 st
->bytes_written_total
,
318 st
->packets_written_total
);
319 log_rates(LOG_INFO
,__FILE__
,__LINE__
,wbps
,wpps
," written");