1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
23 #include "queue_to_journal.h"
44 void* queue_to_journal(void* arg
)
54 time_t last_rotate
= 0 ; //time(NULL);
57 int sink_ram_count
= 0 ;
58 time_t sink_rotate
= 0 ; // latest sink rotate
59 (void)arg
; /* appease -Wall -Werror */
63 install_signal_handlers();
64 install_rotate_signal_handlers(); /* This is the process or thread
65 that does the rotate. */
66 /* Create queue object. */
67 if ( (queue_factory(&que
) < 0) || (que
.vtbl
->open(&que
, O_RDONLY
) < 0) )
69 LOG_ER("Failed to create or open the queue.\n");
73 jrn
= (struct journal
*)malloc(arg_njournalls
* sizeof(struct journal
));
76 LOG_ER("Failed to allocate space (%d bytes) for journal objects.\n",
77 arg_njournalls
* sizeof(struct journal
));
81 for ( jc
=0; jc
<arg_njournalls
; ++jc
)
83 /* Create journal objects. */
84 if ( journal_factory(&jrn
[jc
], arg_journalls
[jc
]) < 0 )
86 LOG_ER("Failed to create journal object for \"%s\".\n",
92 if ( jrn
[jcurr
].vtbl
->open(&jrn
[jcurr
], O_WRONLY
) < 0 )
94 LOG_ER("Failed to open the journal \"%s\".\n",
95 arg_journalls
[jcurr
]);
99 buf
= que
.vtbl
->alloc(&que
, &bufsiz
);
102 LOG_ER("unable to allocate %d bytes for message buffer.\n", bufsiz
);
106 /* Read a packet from the queue, write it to the journal. */
112 /* If we have a pending rotate to perform, do it now. */
116 LOG_PROG("About to rotate journal.\n");
119 if ( jrn
[jcurr
].vtbl
->close(&jrn
[jcurr
]) < 0 )
121 LOG_ER("Can't close journal \"%s\".\n", arg_journalls
[jcurr
]);
125 jcurr
= (jcurr
+ 1) % arg_njournalls
;
127 if ( jrn
[jcurr
].vtbl
->open(&jrn
[jcurr
], O_WRONLY
) < 0 )
129 LOG_ER("Failed to open the journal \"%s\".\n", arg_journalls
[jcurr
]);
135 sink_rotate
= time(NULL
) ;
138 else if ( arg_sink_ram
!= NULL
) /* perhaps /sink/ram ? */
140 /* time to rotate sink-ram ? */
141 if ( sink_ram_count
++ >= 10000 )
143 /* was last rotate in this second? */
144 if ( time(NULL
) == sink_rotate
)
146 /* then we'll wait a bit before rotate_sink_ram() */
147 sink_ram_count
= 9000 ;
151 if ( jrn
[jcurr
].vtbl
->close(&jrn
[jcurr
]) < 0 )
153 LOG_ER("Can't close sink-ram journal \"%s\".\n",
154 arg_journalls
[jcurr
]);
157 jcurr
= (jcurr
+ 1) % arg_njournalls
;
158 if ( jrn
[jcurr
].vtbl
->open(&jrn
[jcurr
], O_WRONLY
) < 0 )
160 LOG_ER("Failed to open sink-ram journal \"%s\".\n",
161 arg_journalls
[jcurr
]);
164 sink_rotate
= time(NULL
) ;
170 if ( (que_read_ret
= que
.vtbl
->read(&que
, buf
, bufsiz
, &pending
)) < 0 )
172 /* queue is empty, is panic over? */
173 if ( journaller_panic_mode
== PANIC_IN_EFFECT
)
175 journaller_panic_mode
= PANIC_SHUTDOWN
; /* panic is over */
176 LOG_INF("PANIC is over: hi-burst: %lli packets, %lli bytes.\n",
177 st
.packets_in_burst_since_last_rotate
,
178 st
.bytes_in_burst_since_last_rotate
);
182 /* queue is empty, is hurry-up over? */
183 if ( journaller_panic_mode
== PANIC_HURRYUP
)
185 journaller_panic_mode
= PANIC_NOT
; // panic is over
186 LOG_INF("HURRYUP is over: hi-burst: %lli packets, "
188 st
.packets_in_burst_since_last_rotate
,
189 st
.bytes_in_burst_since_last_rotate
);
194 LOG_PROG("Read %d bytes from queue (%d pending).\n",
195 que_read_ret
, pending
);
197 // is this a Command::Rotate?
198 switch ( header_is_rotate(buf
, &this_rotate
) )
201 ping(buf
, que_read_ret
) ;
206 // is it a new enough Command::Rotate, or masked out?
207 time_t since
= this_rotate
- last_rotate
;
208 if ( since
< arg_rotate_mask
)
210 continue ; // don't respond to duplicate Command::Rotate's
213 last_rotate
= this_rotate
;
214 memcpy(&st
.latest_rotate_header
, buf
, HEADER_LENGTH
) ;
217 peer_correlate(buf
, que_read_ret
);
218 //TODO: ?what did they mean by this? gdw.2006.11.28
219 // fall through to write this Command::Rotate out before
220 // looping to actually rotate.
225 // if hurry-up or panic, discard non-revenue-bearing events
226 switch ( journaller_panic_mode
)
228 case PANIC_HURRYUP
: // clear hurry-up mode if it has worked ...
229 if ( pending
<= (arg_queue_max_cnt
*arg_hurrydown_at
)/100 )
231 LOG_INF("HURRYUP finish, pending=%i <= %i hi-burst: "
232 "%lli packets, %lli bytes.\n",
233 pending
,(arg_queue_max_cnt
*arg_hurrydown_at
)/100,
234 st
.packets_in_burst_since_last_rotate
,
235 st
.bytes_in_burst_since_last_rotate
);
236 journaller_panic_mode
= PANIC_NOT
;
240 if ( non_revenue_bearing(buf
) )
248 case PANIC_IN_EFFECT
: // clear panic mode if it has worked ...
249 if ( pending
<= 100 )
250 { // queue is empty, is panic over?
251 journaller_panic_mode
= PANIC_SHUTDOWN
; // panic is over
252 LOG_INF("PANIC is over: hi-burst: %lli packets, "
254 st
.packets_in_burst_since_last_rotate
,
255 st
.bytes_in_burst_since_last_rotate
);
259 if ( non_revenue_bearing(buf
) )
270 stats_record(&st
, que_read_ret
, pending
);
271 /* Write the packet out to the journal. */
272 if ( (jrn_write_ret
= jrn
[jcurr
].vtbl
->write(&jrn
[jcurr
],
276 LOG_ER("Journal write error -- attempted to write %d bytes, "
277 "write returned %d.\n", que_read_ret
, jrn_write_ret
);
278 stats_record_loss(&st
);
281 } /* while ( ! gdb_done) */
284 if ( jrn
[jcurr
].vtbl
->close(&jrn
[jcurr
]) < 0 )
286 LOG_ER("Can't close journal \"%s\".\n", arg_journalls
[jcurr
]);
288 for ( jc
=0; jc
<arg_njournalls
; ++jc
)
290 jrn
[jc
].vtbl
->destructor(&jrn
[jc
]);
294 /* Empty the journaller system queue upon shutdown */
295 while ( (que
.vtbl
->read(&que
, buf
, bufsiz
, &pending
) >= 0)
296 && arg_queue_max_cnt
-- )
299 que
.vtbl
->dealloc(&que
, buf
);
300 que
.vtbl
->destructor(&que
);