Log the queue size before and after journal rotation, to see if rotation is killing us.
[lwes-journaller.git] / src / queue_to_journal.c
blobb467b0728961e36480ebe1171f8bef5ddf6be45e
1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
21 #include "config.h"
22 #include "header.h"
23 #include "queue_to_journal.h"
25 #include "journal.h"
26 #include "log.h"
27 #include "opt.h"
28 #include "queue.h"
29 #include "sig.h"
30 #include "stats.h"
31 #include "perror.h"
32 #include "xport.h"
33 #include "stats.h"
35 #include <fcntl.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <string.h>
39 #include <unistd.h>
41 struct dequeuer_stats dst ;
43 void* queue_to_journal(void* arg)
45 struct queue que;
46 struct journal* jrn;
47 int jc;
48 int jcurr = 0;
49 void* buf = NULL ;
50 size_t bufsiz;
51 int pending = 0, write_pending = 0;
53 (void)arg; /* appease -Wall -Werror */
55 dequeuer_stats_ctor(&dst);
57 install_signal_handlers();
58 install_rotate_signal_handlers(); /* This is the process or thread
59 that does the rotate. */
60 /* Create queue object. */
61 if ( (queue_factory(&que) < 0) || (que.vtbl->open(&que, O_RDONLY) < 0) )
63 LOG_ER("Failed to create or open the queue.\n");
64 exit(EXIT_FAILURE);
67 jrn = (struct journal*)malloc(arg_njournalls * sizeof(struct journal));
68 if ( 0 == jrn )
70 LOG_ER("Failed to allocate space (%d bytes) for journal objects.\n",
71 arg_njournalls * sizeof(struct journal));
72 exit(EXIT_FAILURE);
75 for ( jc=0; jc<arg_njournalls; ++jc )
77 /* Create journal objects. */
78 if ( journal_factory(&jrn[jc], arg_journalls[jc]) < 0 )
80 LOG_ER("Failed to create journal object for \"%s\".\n",
81 arg_journalls[jc]);
82 exit(EXIT_FAILURE);
86 if ( jrn[jcurr].vtbl->open(&jrn[jcurr], O_WRONLY) < 0 )
88 LOG_ER("Failed to open the journal \"%s\".\n",
89 arg_journalls[jcurr]);
90 exit(EXIT_FAILURE);
93 buf = que.vtbl->alloc(&que, &bufsiz);
94 if ( NULL == buf )
96 LOG_ER("unable to allocate %d bytes for message buffer.\n", bufsiz);
97 exit(EXIT_FAILURE);
100 /* Read a packet from the queue, write it to the journal. */
101 while ( 1 )
103 int que_read_ret;
104 int jrn_write_ret;
106 /* If we have a pending rotate to perform, do it now. */
107 if ( gbl_rotate )
108 { // <gbl_rotate>
110 LOG_INF("About to rotate journal (%d pending).\n", pending);
111 write_pending = 1;
113 dequeuer_stats_rotate(&dst);
114 if ( jrn[jcurr].vtbl->close(&jrn[jcurr]) < 0 )
116 LOG_ER("Can't close journal \"%s\".\n", arg_journalls[jcurr]);
117 exit(EXIT_FAILURE);
120 jcurr = (jcurr + 1) % arg_njournalls;
122 if ( jrn[jcurr].vtbl->open(&jrn[jcurr], O_WRONLY) < 0 )
124 LOG_ER("Failed to open the journal \"%s\".\n", arg_journalls[jcurr]);
125 exit(EXIT_FAILURE);
128 gbl_rotate = 0;
129 } // </gbl_rotate>
131 if ( (que_read_ret = que.vtbl->read(&que, buf, bufsiz, &pending)) < 0 )
133 /* queue is empty */
134 if (gbl_done) break; /* if we're shutting down, exit this loop. */
135 continue; /* no event, so do not process the rest */
137 LOG_PROG("Read %d bytes from queue (%d pending).\n",
138 que_read_ret, pending);
139 if (write_pending)
141 LOG_INF("Done with rotating journal (%d pending).\n", pending);
142 write_pending = 0;
145 // is this a command event?
146 if ( header_is_rotate(buf) )
147 { // Command::Rotate
148 // is it a new enough Command::Rotate, or masked out?
149 memcpy(&dst.latest_rotate_header, buf, HEADER_LENGTH) ;
150 gbl_rotate = 1;
153 dequeuer_stats_record(&dst, que_read_ret-HEADER_LENGTH, pending);
154 /* Write the packet out to the journal. */
155 if ( (jrn_write_ret = jrn[jcurr].vtbl->write(&jrn[jcurr],
156 buf, que_read_ret))
157 != que_read_ret )
159 LOG_ER("Journal write error -- attempted to write %d bytes, "
160 "write returned %d.\n", que_read_ret, jrn_write_ret);
161 dequeuer_stats_record_loss(&dst);
163 } /* while ( ! gdb_done) */
165 dequeuer_stats_rotate(&dst);
166 if ( jrn[jcurr].vtbl->close(&jrn[jcurr]) < 0 )
168 LOG_ER("Can't close journal \"%s\".\n", arg_journalls[jcurr]);
170 for ( jc=0; jc<arg_njournalls; ++jc )
172 jrn[jc].vtbl->destructor(&jrn[jc]);
174 free(jrn);
176 /* Empty the journaller system queue upon shutdown */
177 while ( (que.vtbl->read(&que, buf, bufsiz, &pending) >= 0)
178 && arg_queue_max_cnt-- )
181 que.vtbl->dealloc(&que, buf);
182 que.vtbl->destructor(&que);
184 dequeuer_stats_report(&dst);
186 return 0;