Ripped out --sink-ram option.
[lwes-journaller.git] / src / queue_to_journal.c
blobb19b2b402622fd8f435986e738f5d9e2d71480b7
1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
21 #include "config.h"
22 #include "header.h"
23 #include "queue_to_journal.h"
25 #include "journal.h"
26 #include "log.h"
27 #include "opt.h"
28 #include "peer.h"
29 #include "queue.h"
30 #include "sig.h"
31 #include "stats.h"
32 #include "perror.h"
33 #include "xport.h"
34 #include "stats.h"
36 #include <fcntl.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 #include <unistd.h>
/* Statistics object for the queue-to-journal worker.  queue_to_journal()
 * constructs it with stats_ctor(), updates it via stats_record(),
 * stats_record_loss() and stats_rotate(), and reports it with
 * stats_report() on shutdown.  It has external linkage. */
42 struct stats st ;
44 void* queue_to_journal(void* arg)
46 struct queue que;
47 struct journal* jrn;
48 int jc;
49 int jcurr = 0;
50 void* buf = NULL ;
51 size_t bufsiz;
52 int pending ;
54 time_t last_rotate = 0 ; //time(NULL);
55 time_t this_rotate ;
57 (void)arg; /* appease -Wall -Werror */
59 stats_ctor(&st);
61 install_signal_handlers();
62 install_rotate_signal_handlers(); /* This is the process or thread
63 that does the rotate. */
64 /* Create queue object. */
65 if ( (queue_factory(&que) < 0) || (que.vtbl->open(&que, O_RDONLY) < 0) )
67 LOG_ER("Failed to create or open the queue.\n");
68 exit(EXIT_FAILURE);
71 jrn = (struct journal*)malloc(arg_njournalls * sizeof(struct journal));
72 if ( 0 == jrn )
74 LOG_ER("Failed to allocate space (%d bytes) for journal objects.\n",
75 arg_njournalls * sizeof(struct journal));
76 exit(EXIT_FAILURE);
79 for ( jc=0; jc<arg_njournalls; ++jc )
81 /* Create journal objects. */
82 if ( journal_factory(&jrn[jc], arg_journalls[jc]) < 0 )
84 LOG_ER("Failed to create journal object for \"%s\".\n",
85 arg_journalls[jc]);
86 exit(EXIT_FAILURE);
90 if ( jrn[jcurr].vtbl->open(&jrn[jcurr], O_WRONLY) < 0 )
92 LOG_ER("Failed to open the journal \"%s\".\n",
93 arg_journalls[jcurr]);
94 exit(EXIT_FAILURE);
97 buf = que.vtbl->alloc(&que, &bufsiz);
98 if ( NULL == buf )
100 LOG_ER("unable to allocate %d bytes for message buffer.\n", bufsiz);
101 exit(EXIT_FAILURE);
104 /* Read a packet from the queue, write it to the journal. */
105 while ( 1 )
107 int que_read_ret;
108 int jrn_write_ret;
110 /* If we have a pending rotate to perform, do it now. */
111 if ( gbl_rotate )
112 { // <gbl_rotate>
114 LOG_PROG("About to rotate journal.\n");
116 stats_rotate(&st);
117 if ( jrn[jcurr].vtbl->close(&jrn[jcurr]) < 0 )
119 LOG_ER("Can't close journal \"%s\".\n", arg_journalls[jcurr]);
120 exit(EXIT_FAILURE);
123 jcurr = (jcurr + 1) % arg_njournalls;
125 if ( jrn[jcurr].vtbl->open(&jrn[jcurr], O_WRONLY) < 0 )
127 LOG_ER("Failed to open the journal \"%s\".\n", arg_journalls[jcurr]);
128 exit(EXIT_FAILURE);
131 gbl_rotate = 0;
132 } // </gbl_rotate>
134 if ( (que_read_ret = que.vtbl->read(&que, buf, bufsiz, &pending)) < 0 )
136 /* queue is empty; if we're shutting down, exit this loop. */
137 if (gbl_done) break;
138 /* queue is empty */
139 continue;
141 LOG_PROG("Read %d bytes from queue (%d pending).\n",
142 que_read_ret, pending);
144 // is this a command event?
145 switch ( header_is_rotate(buf, &this_rotate) )
147 case 2:
148 { // System::Ping
149 ping(buf, que_read_ret) ;
150 goto fallthru ;
153 case 1:
154 { // Command::Rotate
155 // is it a new enough Command::Rotate, or masked out?
156 time_t since = this_rotate - last_rotate;
157 if ( since < arg_rotate_mask )
159 continue ; // don't respond to duplicate Command::Rotate's
162 last_rotate = this_rotate ;
163 memcpy(&st.latest_rotate_header, buf, HEADER_LENGTH) ;
164 gbl_rotate = 1;
166 peer_correlate(buf, que_read_ret);
167 //TODO: ?what did they mean by this? gdw.2006.11.28
168 // fall through to write this Command::Rotate out before
169 // looping to actually rotate.
172 default:
173 fallthru:
174 stats_record(&st, que_read_ret, pending);
175 /* Write the packet out to the journal. */
176 if ( (jrn_write_ret = jrn[jcurr].vtbl->write(&jrn[jcurr],
177 buf, que_read_ret))
178 != que_read_ret )
180 LOG_ER("Journal write error -- attempted to write %d bytes, "
181 "write returned %d.\n", que_read_ret, jrn_write_ret);
182 stats_record_loss(&st);
185 } /* while ( ! gdb_done) */
187 stats_rotate(&st);
188 if ( jrn[jcurr].vtbl->close(&jrn[jcurr]) < 0 )
190 LOG_ER("Can't close journal \"%s\".\n", arg_journalls[jcurr]);
192 for ( jc=0; jc<arg_njournalls; ++jc )
194 jrn[jc].vtbl->destructor(&jrn[jc]);
196 free(jrn);
198 /* Empty the journaller system queue upon shutdown */
199 while ( (que.vtbl->read(&que, buf, bufsiz, &pending) >= 0)
200 && arg_queue_max_cnt-- )
203 que.vtbl->dealloc(&que, buf);
204 que.vtbl->destructor(&que);
206 stats_report(&st);
208 return 0;