2 * FCP adapter trace utility
6 * Copyright IBM Corp. 2008
7 * Author(s): Stefan Raspl <raspl@linux.vnet.ibm.com>
17 #include "ziomon_dacc.h"
18 #include "ziomon_util.h"
19 #include "ziomon_msg_tools.h"
22 #define ZIOMON_DACC_GARBAGE_MSG -1U
24 extern const char *toolname
;
28 * Structure of binary file:
30 * +-----+---+---+-----+---+---+---------+---+---+---------+- -+-----+ |
31 * | hdr | l | t | dat | l | t | dat | l | t | dat | | dat | |
32 * +-----+---+---+-----+---+---+---------+---+---+---------+- .... -+-----+ |
36 * The messages might differ in length, depending how many monitored devices
37 * were available in each interval. When we *would* exceed the limit, we wrap
38 * around, overwriting old messages. Hence it is utterly important that we keep
39 * the lengths 'l' intact, since we will have random garbage after one of the
40 * messages once we wrapped around for the first time.
41 * 't' denotes the type of the message, 'dat' is the (variable length) data.
42 * The length is specified as __u32 and specifies the length of the variable
43 * part of the message entry, beginning after the message type.
44 * 'length' will always specify the length of the variable part of the message
46 * Aggregated statistics are put into a separate file.
47 * Messages inside the aggregated file are written in fixed order (so we do not
48 * need struct file_header to figure out what is where).
49 * Note that we write a single garbage message for any message that might have
54 /* indicates whether we already wrapped or not */
55 static int wrapped
= -1;
58 static int open_count
= 0;
63 * Calculate size of the message. In contrast to the message's
64 * length attribute, this also includes the header */
65 static __u32
get_total_msg_size(struct message
*msg
) {
66 return (msg
->length
+ 8);
71 * Position at first _physical_ message, not necessarily the first
73 static int position_at_first_msg(FILE * fp
)
75 return fseek(fp
, sizeof(struct file_header
) - sizeof(__u64
), SEEK_SET
);
79 static void swap_header(struct file_header
*f_hdr
)
81 swap_32(f_hdr
->magic
);
82 swap_32(f_hdr
->version
);
83 swap_64(f_hdr
->size_limit
);
84 swap_32(f_hdr
->interval_length
);
85 swap_32(f_hdr
->msgid_utilization
);
86 swap_32(f_hdr
->msgid_ioerr
);
87 swap_32(f_hdr
->msgid_blkiomon
);
88 swap_32(f_hdr
->msgid_zfcpdd
);
89 swap_64(f_hdr
->end_time
);
90 swap_64(f_hdr
->first_msg_offset
);
91 swap_64(f_hdr
->begin_time
);
95 static void swap_msg_header(struct message
*msg
)
102 static int write_f_header(FILE *fp
, struct file_header
*f_hdr
)
108 if (fwrite(f_hdr
, sizeof(struct file_header
)
109 - sizeof(__u64
), 1, fp
) != 1) {
110 fprintf(stderr
, "%s: Failed to write"
111 " header\n", toolname
);
120 * write garbage of specified length to file, where 'length' is the total
121 * length of the garbage message.
122 * Note: This is only done for consistency, we rewind to start of garbage
123 * immediately so we may overwrite it next time again
125 static int write_garbage_message(FILE *fp
, int length
)
129 vverbose_msg("writing garbage at pos=%ld, total size=%d\n", ftell(fp
), length
);
130 msg
.type
= ZIOMON_DACC_GARBAGE_MSG
;
131 msg
.length
= length
- 8;
133 swap_msg_header(&msg
);
134 if (!fwrite(&msg
, 8, 1, fp
)) {
135 fprintf(stderr
, "%s: Error writing"
136 " garbage\n", toolname
);
144 static int read_message_header(FILE *fp
, __u32
*length
, __u32
*type
)
147 long pos
= ftell(fp
);
150 if (fread(length
, 4, 1, fp
) != 1) {
152 return 1; /* end of file reached */
154 fprintf(stderr
, "%s: Error reading"
155 " message length\n", toolname
);
159 if (fread(type
, 4, 1, fp
) != 1) {
160 fprintf(stderr
, "%s: Error reading message"
161 " type\n", toolname
);
167 vverbose_msg("read %smsg at pos=%ld, data size=%d\n",
168 (*type
== ZIOMON_DACC_GARBAGE_MSG
? "garbage " : ""), pos
, *length
);
174 * Read the next message.
175 * @param msgid_blkiomon: ID of blkiomon messages or IS_NO_BLKIOMON_MSG if we
176 * already know that it is none, or IS_BLKIOMON_MSG
179 #define IS_NO_BLKIOMON_MSG 0xfffffffe
180 #define IS_BLKIOMON_MSG 0xffffffff
182 static int read_message(FILE *fp
, struct message
*msg
, __u32 ver
,
183 __u32 msgid_blkiomon
)
187 if ( (rc
= read_message_header(fp
, &msg
->length
, &msg
->type
)) )
190 if (msg
->type
== ZIOMON_DACC_GARBAGE_MSG
) {
191 fseek(fp
, msg
->length
, SEEK_CUR
);
195 msg
->data
= malloc(msg
->length
);
196 if (fread(msg
->data
, msg
->length
, 1, fp
) != 1) {
197 fprintf(stderr
, "%s: Error reading %u Bytes message"
198 " content\n", toolname
, msg
->length
);
201 if (ver
== DATA_MGR_V2
&& msgid_blkiomon
!= IS_NO_BLKIOMON_MSG
202 && (msg
->type
== IS_BLKIOMON_MSG
|| msg
->type
== msgid_blkiomon
))
203 conv_blkiomon_v2_to_v3(msg
);
210 static int read_message_preview(FILE *fp
, struct message_preview
*msg
,
211 struct file_header
*f_hdr
)
215 msg
->pos
= ftell(fp
);
216 if ( (rc
= read_message_header(fp
, &msg
->length
, &msg
->type
)) )
219 /* Eventually read timestamp and fforward to next msg */
220 if (msg
->type
!= ZIOMON_DACC_GARBAGE_MSG
) {
221 /* per convention, the first 8 bytes of the actual message
222 * is the timestamp. */
223 assert(msg
->length
>= 8);
224 if (fread(&msg
->timestamp
, 8, 1, fp
) != 1) {
225 fprintf(stderr
, "%s: Error reading"
226 " message timestamp\n", toolname
);
229 swap_64(msg
->timestamp
);
230 fseek(fp
, msg
->length
- 8, SEEK_CUR
);
231 msg
->is_blkiomon_v2
= (f_hdr
->version
== DATA_MGR_V2
232 && msg
->type
== f_hdr
->msgid_blkiomon
);
235 fseek(fp
, msg
->length
, SEEK_CUR
);
242 * Add msg to the bunch of aggregated messages.
244 static int aggregate_message(FILE *fp
, struct message
***del_msg
,
245 int *num_del_msg
, struct file_header
*f_hdr
)
247 struct message tmp_msg
;
248 long cur_pos
= ftell(fp
);
249 struct message
**tmp
= *del_msg
;
252 /* read message and rewind */
253 if (read_message(fp
, &tmp_msg
, f_hdr
->version
, f_hdr
->msgid_blkiomon
))
255 fseek(fp
, cur_pos
, SEEK_SET
);
257 /* append the message that we read to the array */
258 if (tmp_msg
.type
!= ZIOMON_DACC_GARBAGE_MSG
) {
259 *del_msg
= malloc((*num_del_msg
+ 1)*sizeof(struct message
*));
260 for (i
= 0; i
< *num_del_msg
; ++i
)
261 (*del_msg
)[i
] = tmp
[i
];
262 if (*num_del_msg
> 0)
264 // add new msg at end
265 (*del_msg
)[*num_del_msg
] = malloc(sizeof(struct message
));
266 *(*del_msg
)[*num_del_msg
] = tmp_msg
;
275 * Calculate next message's size, including header, then rewind to current position
277 static int get_next_msg_size(FILE *fp
, int *length
) {
278 if (fread(length
, 4, 1, fp
) != 1)
280 fseek(fp
, -4, SEEK_CUR
);
289 * Position at the right place in the file.
290 * Returns 0 on success, <0 on error and >0 if an additional gargabe message
291 * has to be written after this message. If so, the value returned
292 * is length of the garbage message.
294 static __s32
position_in_file(FILE *fp
, struct message
*msg
,
295 struct file_header
*f_hdr
,
296 struct message
***del_msg
, int *num_del_msg
)
298 __s32 next_msg_length
;
304 * Are we at the end of the file?
306 if (get_next_msg_size(fp
, &next_msg_length
)) {
307 /* end of file reached - size limit also reached? */
308 if ((__u64
)ftell(fp
) + get_total_msg_size(msg
) > f_hdr
->size_limit
) {
309 if (position_at_first_msg(fp
) < 0)
311 vverbose_msg("end reached - WRAP\n");
312 /* we have rewound, but need to aggregate first few messages now,
313 so we better go on... */
316 return 0; /* still enough space left - finished */
320 * Is there still enough room for our message?
322 if ((__u64
)ftell(fp
) + get_total_msg_size(msg
) > f_hdr
->size_limit
) {
323 /* doesn't fit in anymore - wrap around */
324 vverbose_msg("msg doesn't fit anymore - WRAP\n");
325 /* aggregate final message so all msgs are still in sequence */
326 if (get_next_msg_size(fp
, &next_msg_length
))
328 if (aggregate_message(fp
, del_msg
, num_del_msg
, f_hdr
))
330 if (write_garbage_message(fp
, next_msg_length
) < 0
331 || position_at_first_msg(fp
) < 0)
336 * Aggregate messages that will be overwritten
338 vverbose_msg("make room for new message\n");
339 start_pos
= ftell(fp
);
340 while (cum_size
< (long)get_total_msg_size(msg
)) {
341 if (get_next_msg_size(fp
, &next_msg_length
)) {
342 /* end of file reached, but we checked before
343 that it will fit in */
344 cum_size
= get_total_msg_size(msg
);; /* prevents garbage msg */
347 if (aggregate_message(fp
, del_msg
, num_del_msg
, f_hdr
))
349 cum_size
+= next_msg_length
;
350 fseek(fp
, next_msg_length
, SEEK_CUR
);
351 vverbose_msg(" read msg, cum_size=%d, needed=%d\n", cum_size
, get_total_msg_size(msg
));
353 if (cum_size
- get_total_msg_size(msg
) > 0
354 && cum_size
- get_total_msg_size(msg
) < 8) {
355 /* garbage message required, but even minimum size
356 would overwrite start of next msg - aggregate one more! */
357 if (get_next_msg_size(fp
, &next_msg_length
)) {
358 /* corner case: we are at EOF, but our message
359 doesn't delete everything from the previous one.
360 So we write the minimum garbage possible*/
361 vverbose_msg("garbage needed, but end of file reached\n");
365 vverbose_msg("garbage needed, but doesn't fit - aggregate one more\n");
366 if (aggregate_message(fp
, del_msg
, num_del_msg
, f_hdr
))
368 cum_size
+= next_msg_length
;
372 rc
= cum_size
- get_total_msg_size(msg
);
373 fseek(fp
, start_pos
, SEEK_SET
);
379 static int write_message(FILE *fp
, struct message
*msg
)
381 vverbose_msg("write msg at pos=%ld, total size=%d\n", ftell(fp
),
382 get_total_msg_size(msg
));
383 swap_msg_header(msg
);
384 if (!fwrite(&msg
->length
, 4, 1, fp
)) {
385 fprintf(stderr
, "%s: Writing of length"
386 " failed\n", toolname
);
387 swap_msg_header(msg
);
390 if (!fwrite(&msg
->type
, 4, 1, fp
)) {
391 fprintf(stderr
, "%s: Writing of type failed\n", toolname
);
392 swap_msg_header(msg
);
395 swap_msg_header(msg
);
397 if (!fwrite(msg
->data
, msg
->length
, 1, fp
)) {
398 fprintf(stderr
, "%s: Writing of message data"
399 " failed\n", toolname
);
407 int add_msg(FILE *fp
, struct message
*msg
, struct file_header
*f_hdr
,
408 struct message
***del_msg
, int *num_del_msg
)
411 long cur_pos
, old_pos
= ftell(fp
);
414 add_garbage
= position_in_file(fp
, msg
, f_hdr
, del_msg
, num_del_msg
);
418 if (write_message(fp
, msg
) < 0)
421 if (add_garbage
> 0) {
423 if (write_garbage_message(fp
, add_garbage
) < 0)
425 fseek(fp
, cur_pos
, SEEK_SET
);
428 f_hdr
->end_time
= *(__u64
*)(msg
->data
);
429 swap_64(f_hdr
->end_time
); /* msg content is BE by convention */
431 if (f_hdr
->first_msg_offset
!= 0 || cur_pos
< old_pos
)
432 f_hdr
->first_msg_offset
= ftell(fp
);
433 if (write_f_header(fp
, f_hdr
))
435 fseek(fp
, cur_pos
, SEEK_SET
);
441 int init_file(FILE *fp
, struct file_header
*f_hdr
, long version
)
443 f_hdr
->magic
= DATA_MGR_MAGIC
;
445 f_hdr
->version
= DATA_MGR_V2
;
446 else if (version
== 3)
447 f_hdr
->version
= DATA_MGR_V3
;
449 fprintf(stderr
, "%s: Unsupported version: %ld\n",
453 f_hdr
->first_msg_offset
= 0;
455 f_hdr
->begin_time
= 0;
457 if (write_f_header(fp
, f_hdr
))
464 static int check_version(__u32 ver
) {
465 if (ver
!= DATA_MGR_V2
&& ver
!= DATA_MGR_V3
) {
466 fprintf(stderr
, "%s: Wrong version: .log data is in version %u"
467 " format, while this tool only supports version %u"
469 " Get the matching tool version and try again.\n",
470 toolname
, ver
, DATA_MGR_V2
, DATA_MGR_V3
);
478 static int get_header(FILE *fp
, struct file_header
*hdr
)
481 if (fread(hdr
, sizeof(struct file_header
)
482 - sizeof(__u64
), 1, fp
) != 1) {
483 fprintf(stderr
, "%s: Could not read header\n", toolname
);
487 if (hdr
->magic
!= DATA_MGR_MAGIC
) {
488 fprintf(stderr
, "%s: Unregocgnized data in .log file.\n",
492 if (check_version(hdr
->version
))
500 int open_log_file(FILE **fp
, const char *filename
, struct file_header
*fhdr
)
504 struct message_preview msg_prev
;
506 fname
= (char*)malloc(strlen(filename
) + strlen(DACC_FILE_EXT_LOG
) + 1);
507 sprintf(fname
, "%s%s", filename
, DACC_FILE_EXT_LOG
);
508 *fp
= fopen(fname
, "r");
510 fprintf(stderr
, "%s: Could not open %s"
511 " - file not accessible?", toolname
, fname
);
515 if (get_header(*fp
, fhdr
)) {
519 if (get_next_msg_preview(*fp
, &msg_prev
, fhdr
)) {
523 rewind_to(*fp
, &msg_prev
);
524 fhdr
->begin_time
= msg_prev
.timestamp
;
535 void close_log_file(FILE *fp
)
543 void close_data_files(FILE *fp
)
547 assert(open_count
== 0);
553 static void swap_agg_header(struct aggr_data
*hdr
)
556 swap_32(hdr
->version
);
557 swap_64(hdr
->begin_time
);
558 swap_64(hdr
->num_zfcpdd
);
559 swap_64(hdr
->num_blkiomon
);
560 swap_64(hdr
->end_time
);
564 static void conv_agg_header_from_BE(struct aggr_data
*hdr
) {
565 swap_agg_header(hdr
);
569 static void conv_agg_header_to_BE(struct aggr_data
*hdr
) {
570 swap_agg_header(hdr
);
574 static int read_aggr_file(FILE *fp
, struct aggr_data
*data
)
581 if (fread(data
, DACC_AGGR_FILE_HDR_LEN
, 1, fp
) != 1) {
582 fprintf(stderr
, "%s: Error reading aggregation"
583 " content\n", toolname
);
586 conv_agg_header_from_BE(data
);
587 if (data
->magic
!= DATA_MGR_MAGIC_AGGR
) {
588 fprintf(stderr
, "%s: Unregocgnized data in .agg file.\n",
592 if (check_version(data
->version
))
595 data
->util_aggr
= NULL
;
596 if ( (rc
= read_message(fp
, &msg
, data
->version
, IS_NO_BLKIOMON_MSG
)) < 0)
599 if (msg
.type
!= ZIOMON_DACC_GARBAGE_MSG
) {
600 data
->util_aggr
= malloc(sizeof(struct message
));
601 *(data
->util_aggr
) = msg
;
604 data
->ioerr_aggr
= NULL
;
605 if ( (rc
= read_message(fp
, &msg
, data
->version
, IS_NO_BLKIOMON_MSG
)) < 0)
607 if (msg
.type
!= ZIOMON_DACC_GARBAGE_MSG
) {
608 data
->ioerr_aggr
= malloc(sizeof(struct message
));
609 *(data
->ioerr_aggr
) = msg
;
612 if (data
->num_blkiomon
> 0) {
613 data
->blkio_aggr
= calloc(data
->num_blkiomon
, sizeof(struct message
*));
614 for (i
=0; i
<data
->num_blkiomon
; ++i
) {
615 if ( (rc
= read_message(fp
, &msg
, data
->version
, IS_BLKIOMON_MSG
)) < 0)
617 data
->blkio_aggr
[i
] = malloc(sizeof(struct message
));
618 *(data
->blkio_aggr
[i
]) = msg
;
622 /* this _must_ be a garbage message */
623 data
->blkio_aggr
= NULL
;
624 if ( (rc
= read_message(fp
, &msg
, data
->version
, IS_NO_BLKIOMON_MSG
)) < 0)
628 if (data
->num_zfcpdd
> 0) {
629 data
->zfcpdd_aggr
= calloc(data
->num_zfcpdd
, sizeof(struct message
*));
630 for (i
=0; i
<data
->num_zfcpdd
; ++i
) {
631 if ( (rc
= read_message(fp
, &msg
, data
->version
, IS_BLKIOMON_MSG
)) < 0)
633 data
->zfcpdd_aggr
[i
] = malloc(sizeof(struct message
));
634 *(data
->zfcpdd_aggr
[i
]) = msg
;
638 /* this _must_ be a garbage message */
639 data
->zfcpdd_aggr
= NULL
;
640 if ( (rc
= read_message(fp
, &msg
, data
->version
, IS_NO_BLKIOMON_MSG
)) < 0)
648 int open_agg_file(FILE **fp
, const char *filename
, struct aggr_data
*agg
)
653 fname
= (char*)malloc(strlen(filename
) + strlen(DACC_FILE_EXT_AGG
) + 1);
654 sprintf(fname
, "%s%s", filename
, DACC_FILE_EXT_AGG
);
657 if (access(fname
, F_OK
) != 0) {
661 *fp
= fopen(fname
, "r");
663 fprintf(stderr
, "%s: Could not open %s"
664 " - file not accessible?", toolname
, fname
);
668 if (read_aggr_file(*fp
, agg
)) {
681 void close_agg_file(FILE *fp
)
688 static int seek_initial_file_pos(FILE *fp
, struct file_header
*f_hdr
)
693 if (f_hdr
->first_msg_offset
)
694 pos
= f_hdr
->first_msg_offset
;
696 pos
= sizeof(struct file_header
) - sizeof(__u64
);
697 rc
= 1; /* no need to wrap */
699 fseek(fp
, pos
, SEEK_SET
);
705 int get_next_msg(FILE *fp
, struct message
*msg
, struct file_header
*f_hdr
)
710 wrapped
= seek_initial_file_pos(fp
, f_hdr
);
713 if (f_hdr
->first_msg_offset
!= 0 && wrapped
714 && ftell(fp
) >= (long long)f_hdr
->first_msg_offset
)
715 return 1; /* final msg read */
717 rc
= read_message(fp
, msg
, f_hdr
->version
, f_hdr
->msgid_blkiomon
);
718 if (rc
> 0 && !wrapped
) {
719 position_at_first_msg(fp
);
720 rc
= read_message(fp
, msg
, f_hdr
->version
, f_hdr
->msgid_blkiomon
);
723 } while (!rc
&& msg
->type
== ZIOMON_DACC_GARBAGE_MSG
);
729 int get_next_msg_preview(FILE *fp
, struct message_preview
*msg
,
730 struct file_header
*f_hdr
)
735 wrapped
= seek_initial_file_pos(fp
, f_hdr
);
738 if (f_hdr
->first_msg_offset
!= 0 && wrapped
739 && ftell(fp
) >= (long long)f_hdr
->first_msg_offset
)
740 return 1; /* final msg read */
742 rc
= read_message_preview(fp
, msg
, f_hdr
);
743 if (rc
> 0 && !wrapped
) {
744 position_at_first_msg(fp
);
745 rc
= read_message_preview(fp
, msg
, f_hdr
);
748 } while (!rc
&& msg
->type
== ZIOMON_DACC_GARBAGE_MSG
);
754 void rewind_to(FILE *fp
, struct message_preview
*msg
)
756 assert(msg
->pos
> 0);
757 fseek(fp
, msg
->pos
, SEEK_SET
);
761 int get_complete_msg(FILE *fp
, struct message_preview
*msg_prev
,
764 long pos
= ftell(fp
);
767 fseek(fp
, msg_prev
->pos
, SEEK_SET
);
768 if (msg_prev
->is_blkiomon_v2
)
769 // make sure message is converted
770 rc
= read_message(fp
, msg
, DATA_MGR_V2
, msg_prev
->type
);
772 // use an arbitrary version != V2
773 rc
= read_message(fp
, msg
, DATA_MGR_V3
, msg_prev
->type
);
774 fseek(fp
, pos
, SEEK_SET
);
780 void discard_msg(struct message
*msg
)
789 int write_aggr_file(FILE *fp
, struct aggr_data
*data
)
793 conv_agg_header_to_BE(data
);
795 i
= fwrite(data
, DACC_AGGR_FILE_HDR_LEN
, 1, fp
);
796 conv_agg_header_from_BE(data
);
800 if (data
->util_aggr
) {
801 if (write_message(fp
, data
->util_aggr
))
804 else if (write_garbage_message(fp
, 8))
807 if (data
->ioerr_aggr
) {
808 if (write_message(fp
, data
->ioerr_aggr
))
811 else if (write_garbage_message(fp
, 8))
814 if (data
->num_blkiomon
> 0) {
815 for (i
= 0; i
<data
->num_blkiomon
; ++i
) {
816 if (write_message(fp
, data
->blkio_aggr
[i
]))
820 else if (write_garbage_message(fp
, 8))
823 if (data
->num_zfcpdd
> 0) {
824 for (i
=0; i
<data
->num_zfcpdd
; ++i
) {
825 if (write_message(fp
, data
->zfcpdd_aggr
[i
]))
829 else if (write_garbage_message(fp
, 8))
836 void init_aggr_data_struct(struct aggr_data
*data
)
838 data
->magic
= DATA_MGR_MAGIC_AGGR
;
839 data
->version
= DATA_MGR_V2
;
840 data
->num_zfcpdd
= 0;
841 data
->num_blkiomon
= 0;
843 data
->begin_time
= 0;
844 data
->util_aggr
= NULL
;
845 data
->ioerr_aggr
= NULL
;
846 data
->blkio_aggr
= NULL
;
847 data
->zfcpdd_aggr
= NULL
;
851 void discard_aggr_data_struct(struct aggr_data
*data
)
856 discard_msg(data
->util_aggr
);
857 discard_msg(data
->ioerr_aggr
);
858 for (i
=0; i
<data
->num_blkiomon
; ++i
) {
859 discard_msg(data
->blkio_aggr
[i
]);
860 free(data
->blkio_aggr
[i
]);
862 for (i
=0; i
<data
->num_zfcpdd
; ++i
) {
863 discard_msg(data
->zfcpdd_aggr
[i
]);
864 free(data
->zfcpdd_aggr
[i
]);
866 free(data
->util_aggr
);
867 free(data
->ioerr_aggr
);
868 free(data
->blkio_aggr
);
869 free(data
->zfcpdd_aggr
);
874 int open_data_files(FILE **fp
, const char *filename
, struct file_header
*f_hdr
,
875 struct aggr_data
**agg
)
877 struct message_preview msg_prev
;
885 assert(open_count
== 0);
889 verbose_msg("open data\n");
892 * Open .agg file if exists
895 *agg
= (struct aggr_data
*)malloc(sizeof(struct aggr_data
));
896 if ( (rc
= open_agg_file(fp
, filename
, *agg
)) < 0 ) {
901 verbose_msg(" found .agg file\n");
905 verbose_msg(" no .agg file found\n");
913 if ( (rc
= open_log_file(fp
, filename
, f_hdr
)) )
917 * Eventually add messages from final frame of .agg file and adjust
918 * respective boundaries
922 conv_aggr_data_msg_data_from_BE(*agg
);
924 /* We use the first message that we have as the basis to
925 calculate when the final timeframe of the .agg data
926 would have ended. Note that we always add all messages
927 that are interval/2 after that timestamp! */
928 end_of_agg
= ((*agg
)->end_time
- (*agg
)->begin_time
-
929 f_hdr
->interval_length
/ 2) % f_hdr
->interval_length
;
931 end_of_agg
= (*agg
)->end_time
932 + (f_hdr
->interval_length
- end_of_agg
);
935 while ( (rc
= get_next_msg_preview(*fp
, &msg_prev
, f_hdr
)) == 0
936 && msg_prev
.timestamp
<= end_of_agg
) {
937 if (get_complete_msg(*fp
, &msg_prev
, &msg
))
939 rc
= add_to_agg(*agg
, &msg
, f_hdr
);
946 fprintf(stderr
, "%s: Could not read"
947 " any messages in %s%s\n", toolname
, filename
,
951 // condition of the check impossible to fail, but still...
952 if (msg_prev
.timestamp
> end_of_agg
)
953 rewind_to(*fp
, &msg_prev
);
955 // finally, adjust boundaries
956 (*agg
)->end_time
= end_of_agg
- f_hdr
->interval_length
/ 2;
957 f_hdr
->begin_time
= (*agg
)->end_time
+ f_hdr
->interval_length
;
959 t
= (*agg
)->end_time
;
960 verbose_msg(" adjust agg end time to : %s",
962 t
= (*agg
)->begin_time
;
963 verbose_msg(" adjust log begin time to: %s",
967 conv_aggr_data_msg_data_to_BE(*agg
);
968 verbose_msg(" added %d messages to aggregated structure\n",
972 verbose_msg("open data finished\n");