Must use (void) instead of () for function declarations.
[lwes-journaller.git] / src / queue_to_journal.c
blob757a2494a52b154e4107152d0024ae06733548af
1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
21 #include "config.h"
22 #include "header.h"
23 #include "queue_to_journal.h"
25 #include "journal.h"
26 #include "log.h"
27 #include "opt.h"
28 #include "peer.h"
29 #include "queue.h"
30 #include "sig.h"
31 #include "stats.h"
32 #include "perror.h"
33 #include "xport.h"
34 #include "stats.h"
36 #include <fcntl.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 #include <unistd.h>
42 struct stats st ;
44 void* queue_to_journal(void* arg)
46 struct queue que;
47 struct journal* jrn;
48 int jc;
49 int jcurr = 0;
50 void* buf = NULL ;
51 size_t bufsiz;
52 int pending ;
54 time_t last_rotate = 0 ; //time(NULL);
55 time_t this_rotate ;
57 int sink_ram_count = 0 ;
58 time_t sink_rotate = 0 ; // latest sink rotate
59 (void)arg; /* appease -Wall -Werror */
61 stats_ctor(&st);
63 install_signal_handlers();
64 install_rotate_signal_handlers(); /* This is the process or thread
65 that does the rotate. */
66 /* Create queue object. */
67 if ( (queue_factory(&que) < 0) || (que.vtbl->open(&que, O_RDONLY) < 0) )
69 LOG_ER("Failed to create or open the queue.\n");
70 exit(EXIT_FAILURE);
73 jrn = (struct journal*)malloc(arg_njournalls * sizeof(struct journal));
74 if ( 0 == jrn )
76 LOG_ER("Failed to allocate space (%d bytes) for journal objects.\n",
77 arg_njournalls * sizeof(struct journal));
78 exit(EXIT_FAILURE);
81 for ( jc=0; jc<arg_njournalls; ++jc )
83 /* Create journal objects. */
84 if ( journal_factory(&jrn[jc], arg_journalls[jc]) < 0 )
86 LOG_ER("Failed to create journal object for \"%s\".\n",
87 arg_journalls[jc]);
88 exit(EXIT_FAILURE);
92 if ( jrn[jcurr].vtbl->open(&jrn[jcurr], O_WRONLY) < 0 )
94 LOG_ER("Failed to open the journal \"%s\".\n",
95 arg_journalls[jcurr]);
96 exit(EXIT_FAILURE);
99 buf = que.vtbl->alloc(&que, &bufsiz);
100 if ( NULL == buf )
102 LOG_ER("unable to allocate %d bytes for message buffer.\n", bufsiz);
103 exit(EXIT_FAILURE);
106 /* Read a packet from the queue, write it to the journal. */
107 while ( ! gbl_done )
109 int que_read_ret;
110 int jrn_write_ret;
112 /* If we have a pending rotate to perform, do it now. */
113 if ( gbl_rotate )
114 { // <gbl_rotate>
116 LOG_PROG("About to rotate journal.\n");
118 stats_rotate(&st);
119 if ( jrn[jcurr].vtbl->close(&jrn[jcurr]) < 0 )
121 LOG_ER("Can't close journal \"%s\".\n", arg_journalls[jcurr]);
122 exit(EXIT_FAILURE);
125 jcurr = (jcurr + 1) % arg_njournalls;
127 if ( jrn[jcurr].vtbl->open(&jrn[jcurr], O_WRONLY) < 0 )
129 LOG_ER("Failed to open the journal \"%s\".\n", arg_journalls[jcurr]);
130 exit(EXIT_FAILURE);
133 gbl_rotate = 0;
135 sink_rotate = time(NULL) ;
136 sink_ram_count = 0 ;
137 } // </gbl_rotate>
138 else if ( arg_sink_ram != NULL ) /* perhaps /sink/ram ? */
140 /* time to rotate sink-ram ? */
141 if ( sink_ram_count++ >= 10000 )
143 /* was last rotate in this second? */
144 if ( time(NULL) == sink_rotate )
146 /* then we'll wait a bit before rotate_sink_ram() */
147 sink_ram_count = 9000 ;
149 else
151 if ( jrn[jcurr].vtbl->close(&jrn[jcurr]) < 0 )
153 LOG_ER("Can't close sink-ram journal \"%s\".\n",
154 arg_journalls[jcurr]);
155 exit(EXIT_FAILURE);
157 jcurr = (jcurr + 1) % arg_njournalls;
158 if ( jrn[jcurr].vtbl->open(&jrn[jcurr], O_WRONLY) < 0 )
160 LOG_ER("Failed to open sink-ram journal \"%s\".\n",
161 arg_journalls[jcurr]);
162 exit(EXIT_FAILURE);
164 sink_rotate = time(NULL) ;
165 sink_ram_count = 0 ;
170 if ( (que_read_ret = que.vtbl->read(&que, buf, bufsiz, &pending)) < 0 )
172 /* queue is empty, is panic over? */
173 if ( journaller_panic_mode == PANIC_IN_EFFECT )
175 journaller_panic_mode = PANIC_SHUTDOWN ; /* panic is over */
176 LOG_INF("PANIC is over: hi-burst: %lli packets, %lli bytes.\n",
177 st.packets_in_burst_since_last_rotate,
178 st.bytes_in_burst_since_last_rotate);
180 else
182 /* queue is empty, is hurry-up over? */
183 if ( journaller_panic_mode == PANIC_HURRYUP )
185 journaller_panic_mode = PANIC_NOT ; // panic is over
186 LOG_INF("HURRYUP is over: hi-burst: %lli packets, "
187 "%lli bytes.\n",
188 st.packets_in_burst_since_last_rotate,
189 st.bytes_in_burst_since_last_rotate);
192 continue;
194 LOG_PROG("Read %d bytes from queue (%d pending).\n",
195 que_read_ret, pending);
197 // is this a Command::Rotate?
198 switch ( header_is_rotate(buf, &this_rotate) )
200 case 2:
201 ping(buf, que_read_ret) ;
202 goto fallthru ;
204 case 1:
205 { // Command::Rotate
206 // is it a new enough Command::Rotate, or masked out?
207 time_t since = this_rotate - last_rotate;
208 if ( since < arg_rotate_mask )
210 continue ; // don't respond to duplicate Command::Rotate's
213 last_rotate = this_rotate ;
214 memcpy(&st.latest_rotate_header, buf, HEADER_LENGTH) ;
215 gbl_rotate = 1;
217 peer_correlate(buf, que_read_ret);
218 //TODO: ?what did they mean by this? gdw.2006.11.28
219 // fall through to write this Command::Rotate out before
220 // looping to actually rotate.
223 default:
224 fallthru:
225 // if hurry-up or panic, discard non-revenue-bearing events
226 switch ( journaller_panic_mode )
228 case PANIC_HURRYUP: // clear hurry-up mode if it has worked ...
229 if ( pending <= (arg_queue_max_cnt*arg_hurrydown_at)/100 )
231 LOG_INF("HURRYUP finish, pending=%i <= %i hi-burst: "
232 "%lli packets, %lli bytes.\n",
233 pending,(arg_queue_max_cnt*arg_hurrydown_at)/100,
234 st.packets_in_burst_since_last_rotate,
235 st.bytes_in_burst_since_last_rotate);
236 journaller_panic_mode = PANIC_NOT ;
238 else
240 if ( non_revenue_bearing(buf) )
242 continue ;
246 break ;
248 case PANIC_IN_EFFECT: // clear panic mode if it has worked ...
249 if ( pending <= 100 )
250 { // queue is empty, is panic over?
251 journaller_panic_mode = PANIC_SHUTDOWN ; // panic is over
252 LOG_INF("PANIC is over: hi-burst: %lli packets, "
253 "%lli bytes.\n",
254 st.packets_in_burst_since_last_rotate,
255 st.bytes_in_burst_since_last_rotate);
257 else
259 if ( non_revenue_bearing(buf) )
261 continue ;
264 break ;
266 default:
267 break ;
270 stats_record(&st, que_read_ret, pending);
271 /* Write the packet out to the journal. */
272 if ( (jrn_write_ret = jrn[jcurr].vtbl->write(&jrn[jcurr],
273 buf, que_read_ret))
274 != que_read_ret )
276 LOG_ER("Journal write error -- attempted to write %d bytes, "
277 "write returned %d.\n", que_read_ret, jrn_write_ret);
278 stats_record_loss(&st);
281 } /* while ( ! gdb_done) */
283 stats_rotate(&st);
284 if ( jrn[jcurr].vtbl->close(&jrn[jcurr]) < 0 )
286 LOG_ER("Can't close journal \"%s\".\n", arg_journalls[jcurr]);
288 for ( jc=0; jc<arg_njournalls; ++jc )
290 jrn[jc].vtbl->destructor(&jrn[jc]);
292 free(jrn);
294 /* Empty the journaller system queue upon shutdown */
295 while ( (que.vtbl->read(&que, buf, bufsiz, &pending) >= 0)
296 && arg_queue_max_cnt-- )
299 que.vtbl->dealloc(&que, buf);
300 que.vtbl->destructor(&que);
302 stats_report(&st);
304 return 0;