1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
23 #include "queue_to_journal.h"
44 void* queue_to_journal(void* arg
)
54 time_t last_rotate
= 0 ; //time(NULL);
57 (void)arg
; /* appease -Wall -Werror */
61 install_signal_handlers();
62 install_rotate_signal_handlers(); /* This is the process or thread
63 that does the rotate. */
64 /* Create queue object. */
65 if ( (queue_factory(&que
) < 0) || (que
.vtbl
->open(&que
, O_RDONLY
) < 0) )
67 LOG_ER("Failed to create or open the queue.\n");
71 jrn
= (struct journal
*)malloc(arg_njournalls
* sizeof(struct journal
));
74 LOG_ER("Failed to allocate space (%d bytes) for journal objects.\n",
75 arg_njournalls
* sizeof(struct journal
));
79 for ( jc
=0; jc
<arg_njournalls
; ++jc
)
81 /* Create journal objects. */
82 if ( journal_factory(&jrn
[jc
], arg_journalls
[jc
]) < 0 )
84 LOG_ER("Failed to create journal object for \"%s\".\n",
90 if ( jrn
[jcurr
].vtbl
->open(&jrn
[jcurr
], O_WRONLY
) < 0 )
92 LOG_ER("Failed to open the journal \"%s\".\n",
93 arg_journalls
[jcurr
]);
97 buf
= que
.vtbl
->alloc(&que
, &bufsiz
);
100 LOG_ER("unable to allocate %d bytes for message buffer.\n", bufsiz
);
104 /* Read a packet from the queue, write it to the journal. */
110 /* If we have a pending rotate to perform, do it now. */
114 LOG_PROG("About to rotate journal.\n");
117 if ( jrn
[jcurr
].vtbl
->close(&jrn
[jcurr
]) < 0 )
119 LOG_ER("Can't close journal \"%s\".\n", arg_journalls
[jcurr
]);
123 jcurr
= (jcurr
+ 1) % arg_njournalls
;
125 if ( jrn
[jcurr
].vtbl
->open(&jrn
[jcurr
], O_WRONLY
) < 0 )
127 LOG_ER("Failed to open the journal \"%s\".\n", arg_journalls
[jcurr
]);
134 if ( (que_read_ret
= que
.vtbl
->read(&que
, buf
, bufsiz
, &pending
)) < 0 )
136 /* queue is empty; if we're shutting down, exit this loop. */
141 LOG_PROG("Read %d bytes from queue (%d pending).\n",
142 que_read_ret
, pending
);
144 // is this a command event?
145 switch ( header_is_rotate(buf
, &this_rotate
) )
149 ping(buf
, que_read_ret
) ;
155 // is it a new enough Command::Rotate, or masked out?
156 time_t since
= this_rotate
- last_rotate
;
157 if ( since
< arg_rotate_mask
)
159 continue ; // don't respond to duplicate Command::Rotate's
162 last_rotate
= this_rotate
;
163 memcpy(&st
.latest_rotate_header
, buf
, HEADER_LENGTH
) ;
166 peer_correlate(buf
, que_read_ret
);
167 //TODO: ?what did they mean by this? gdw.2006.11.28
168 // fall through to write this Command::Rotate out before
169 // looping to actually rotate.
174 stats_record(&st
, que_read_ret
, pending
);
175 /* Write the packet out to the journal. */
176 if ( (jrn_write_ret
= jrn
[jcurr
].vtbl
->write(&jrn
[jcurr
],
180 LOG_ER("Journal write error -- attempted to write %d bytes, "
181 "write returned %d.\n", que_read_ret
, jrn_write_ret
);
182 stats_record_loss(&st
);
185 } /* while ( ! gdb_done) */
188 if ( jrn
[jcurr
].vtbl
->close(&jrn
[jcurr
]) < 0 )
190 LOG_ER("Can't close journal \"%s\".\n", arg_journalls
[jcurr
]);
192 for ( jc
=0; jc
<arg_njournalls
; ++jc
)
194 jrn
[jc
].vtbl
->destructor(&jrn
[jc
]);
198 /* Empty the journaller system queue upon shutdown */
199 while ( (que
.vtbl
->read(&que
, buf
, bufsiz
, &pending
) >= 0)
200 && arg_queue_max_cnt
-- )
203 que
.vtbl
->dealloc(&que
, buf
);
204 que
.vtbl
->destructor(&que
);