1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
23 #include "queue_to_journal.h"
41 struct dequeuer_stats dst
;
43 void* queue_to_journal(void* arg
)
51 int pending
= 0, write_pending
= 0;
53 (void)arg
; /* appease -Wall -Werror */
55 dequeuer_stats_ctor(&dst
);
57 install_signal_handlers();
58 install_rotate_signal_handlers(); /* This is the process or thread
59 that does the rotate. */
60 /* Create queue object. */
61 if ( (queue_factory(&que
) < 0) || (que
.vtbl
->open(&que
, O_RDONLY
) < 0) )
63 LOG_ER("Failed to create or open the queue.\n");
67 jrn
= (struct journal
*)malloc(arg_njournalls
* sizeof(struct journal
));
70 LOG_ER("Failed to allocate space (%d bytes) for journal objects.\n",
71 arg_njournalls
* sizeof(struct journal
));
75 for ( jc
=0; jc
<arg_njournalls
; ++jc
)
77 /* Create journal objects. */
78 if ( journal_factory(&jrn
[jc
], arg_journalls
[jc
]) < 0 )
80 LOG_ER("Failed to create journal object for \"%s\".\n",
86 if ( jrn
[jcurr
].vtbl
->open(&jrn
[jcurr
], O_WRONLY
) < 0 )
88 LOG_ER("Failed to open the journal \"%s\".\n",
89 arg_journalls
[jcurr
]);
93 buf
= que
.vtbl
->alloc(&que
, &bufsiz
);
96 LOG_ER("unable to allocate %d bytes for message buffer.\n", bufsiz
);
100 /* Read a packet from the queue, write it to the journal. */
106 /* If we have a pending rotate to perform, do it now. */
110 LOG_INF("About to rotate journal (%d pending).\n", pending
);
113 dequeuer_stats_rotate(&dst
);
114 if ( jrn
[jcurr
].vtbl
->close(&jrn
[jcurr
]) < 0 )
116 LOG_ER("Can't close journal \"%s\".\n", arg_journalls
[jcurr
]);
120 jcurr
= (jcurr
+ 1) % arg_njournalls
;
122 if ( jrn
[jcurr
].vtbl
->open(&jrn
[jcurr
], O_WRONLY
) < 0 )
124 LOG_ER("Failed to open the journal \"%s\".\n", arg_journalls
[jcurr
]);
131 if ( (que_read_ret
= que
.vtbl
->read(&que
, buf
, bufsiz
, &pending
)) < 0 )
134 if (gbl_done
) break; /* if we're shutting down, exit this loop. */
135 continue; /* no event, so do not process the rest */
137 LOG_PROG("Read %d bytes from queue (%d pending).\n",
138 que_read_ret
, pending
);
141 LOG_INF("Done with rotating journal (%d pending).\n", pending
);
145 // is this a command event?
146 if ( header_is_rotate(buf
) )
148 // is it a new enough Command::Rotate, or masked out?
149 memcpy(&dst
.latest_rotate_header
, buf
, HEADER_LENGTH
) ;
153 dequeuer_stats_record(&dst
, que_read_ret
-HEADER_LENGTH
, pending
);
154 /* Write the packet out to the journal. */
155 if ( (jrn_write_ret
= jrn
[jcurr
].vtbl
->write(&jrn
[jcurr
],
159 LOG_ER("Journal write error -- attempted to write %d bytes, "
160 "write returned %d.\n", que_read_ret
, jrn_write_ret
);
161 dequeuer_stats_record_loss(&dst
);
163 } /* while ( ! gdb_done) */
165 dequeuer_stats_rotate(&dst
);
166 if ( jrn
[jcurr
].vtbl
->close(&jrn
[jcurr
]) < 0 )
168 LOG_ER("Can't close journal \"%s\".\n", arg_journalls
[jcurr
]);
170 for ( jc
=0; jc
<arg_njournalls
; ++jc
)
172 jrn
[jc
].vtbl
->destructor(&jrn
[jc
]);
176 /* Empty the journaller system queue upon shutdown */
177 while ( (que
.vtbl
->read(&que
, buf
, bufsiz
, &pending
) >= 0)
178 && arg_queue_max_cnt
-- )
181 que
.vtbl
->dealloc(&que
, buf
);
182 que
.vtbl
->destructor(&que
);
184 dequeuer_stats_report(&dst
);