2 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 #include <sys/types.h>
31 #include "event2/event-config.h"
33 #ifdef _EVENT_HAVE_SYS_TIME_H
41 #ifdef _EVENT_HAVE_STDARG_H
49 #include "event2/util.h"
50 #include "event2/bufferevent.h"
51 #include "event2/buffer.h"
52 #include "event2/bufferevent_struct.h"
53 #include "event2/event.h"
54 #include "log-internal.h"
55 #include "mm-internal.h"
56 #include "bufferevent-internal.h"
57 #include "util-internal.h"
60 static int be_filter_enable(struct bufferevent
*, short);
61 static int be_filter_disable(struct bufferevent
*, short);
62 static void be_filter_destruct(struct bufferevent
*);
64 static void be_filter_readcb(struct bufferevent
*, void *);
65 static void be_filter_writecb(struct bufferevent
*, void *);
66 static void be_filter_eventcb(struct bufferevent
*, short, void *);
67 static int be_filter_flush(struct bufferevent
*bufev
,
68 short iotype
, enum bufferevent_flush_mode mode
);
69 static int be_filter_ctrl(struct bufferevent
*, enum bufferevent_ctrl_op
, union bufferevent_ctrl_data
*);
71 static void bufferevent_filtered_outbuf_cb(struct evbuffer
*buf
,
72 const struct evbuffer_cb_info
*info
, void *arg
);
74 struct bufferevent_filtered
{
75 struct bufferevent_private bev
;
77 /** The bufferevent that we read/write filtered data from/to. */
78 struct bufferevent
*underlying
;
79 /** A callback on our outbuf to notice when somebody adds data */
80 struct evbuffer_cb_entry
*outbuf_cb
;
81 /** True iff we have received an EOF callback from the underlying
85 /** Function to free context when we're done. */
86 void (*free_context
)(void *);
88 bufferevent_filter_cb process_in
;
90 bufferevent_filter_cb process_out
;
91 /** User-supplied argument to the filters. */
95 const struct bufferevent_ops bufferevent_ops_filter
= {
97 evutil_offsetof(struct bufferevent_filtered
, bev
.bev
),
101 _bufferevent_generic_adj_timeouts
,
106 /* Given a bufferevent that's really the bev filter of a bufferevent_filtered,
107 * return that bufferevent_filtered. Returns NULL otherwise.*/
108 static inline struct bufferevent_filtered
*
109 upcast(struct bufferevent
*bev
)
111 struct bufferevent_filtered
*bev_f
;
112 if (bev
->be_ops
!= &bufferevent_ops_filter
)
114 bev_f
= (void*)( ((char*)bev
) -
115 evutil_offsetof(struct bufferevent_filtered
, bev
.bev
));
116 EVUTIL_ASSERT(bev_f
->bev
.bev
.be_ops
== &bufferevent_ops_filter
);
120 #define downcast(bev_f) (&(bev_f)->bev.bev)
122 /** Return 1 iff bevf's underlying bufferevent's output buffer is at or
123 * over its high watermark such that we should not write to it in a given
126 be_underlying_writebuf_full(struct bufferevent_filtered
*bevf
,
127 enum bufferevent_flush_mode state
)
129 struct bufferevent
*u
= bevf
->underlying
;
130 return state
== BEV_NORMAL
&&
132 evbuffer_get_length(u
->output
) >= u
->wm_write
.high
;
135 /** Return 1 if our input buffer is at or over its high watermark such that we
136 * should not write to it in a given flush mode. */
138 be_readbuf_full(struct bufferevent_filtered
*bevf
,
139 enum bufferevent_flush_mode state
)
141 struct bufferevent
*bufev
= downcast(bevf
);
142 return state
== BEV_NORMAL
&&
143 bufev
->wm_read
.high
&&
144 evbuffer_get_length(bufev
->input
) >= bufev
->wm_read
.high
;
148 /* Filter to use when we're created with a NULL filter. */
149 static enum bufferevent_filter_result
150 be_null_filter(struct evbuffer
*src
, struct evbuffer
*dst
, ev_ssize_t lim
,
151 enum bufferevent_flush_mode state
, void *ctx
)
154 if (evbuffer_remove_buffer(src
, dst
, lim
) == 0)
161 bufferevent_filter_new(struct bufferevent
*underlying
,
162 bufferevent_filter_cb input_filter
,
163 bufferevent_filter_cb output_filter
,
165 void (*free_context
)(void *),
168 struct bufferevent_filtered
*bufev_f
;
169 int tmp_options
= options
& ~BEV_OPT_THREADSAFE
;
175 input_filter
= be_null_filter
;
177 output_filter
= be_null_filter
;
179 bufev_f
= mm_calloc(1, sizeof(struct bufferevent_filtered
));
183 if (bufferevent_init_common(&bufev_f
->bev
, underlying
->ev_base
,
184 &bufferevent_ops_filter
, tmp_options
) < 0) {
188 if (options
& BEV_OPT_THREADSAFE
) {
189 bufferevent_enable_locking(downcast(bufev_f
), NULL
);
192 bufev_f
->underlying
= underlying
;
194 bufev_f
->process_in
= input_filter
;
195 bufev_f
->process_out
= output_filter
;
196 bufev_f
->free_context
= free_context
;
197 bufev_f
->context
= ctx
;
199 bufferevent_setcb(bufev_f
->underlying
,
200 be_filter_readcb
, be_filter_writecb
, be_filter_eventcb
, bufev_f
);
202 bufev_f
->outbuf_cb
= evbuffer_add_cb(downcast(bufev_f
)->output
,
203 bufferevent_filtered_outbuf_cb
, bufev_f
);
205 _bufferevent_init_generic_timeout_cbs(downcast(bufev_f
));
206 bufferevent_incref(underlying
);
208 bufferevent_enable(underlying
, EV_READ
|EV_WRITE
);
209 bufferevent_suspend_read(underlying
, BEV_SUSPEND_FILT_READ
);
211 return downcast(bufev_f
);
215 be_filter_destruct(struct bufferevent
*bev
)
217 struct bufferevent_filtered
*bevf
= upcast(bev
);
219 if (bevf
->free_context
)
220 bevf
->free_context(bevf
->context
);
222 if (bevf
->bev
.options
& BEV_OPT_CLOSE_ON_FREE
) {
223 /* Yes, there is also a decref in bufferevent_decref.
224 * That decref corresponds to the incref when we set
225 * underlying for the first time. This decref is an
226 * extra one to remove the last reference.
228 if (BEV_UPCAST(bevf
->underlying
)->refcnt
< 2) {
229 event_warnx("BEV_OPT_CLOSE_ON_FREE set on an "
230 "bufferevent with too few references");
232 bufferevent_free(bevf
->underlying
);
235 if (bevf
->underlying
) {
236 if (bevf
->underlying
->errorcb
== be_filter_eventcb
)
237 bufferevent_setcb(bevf
->underlying
,
238 NULL
, NULL
, NULL
, NULL
);
239 bufferevent_unsuspend_read(bevf
->underlying
,
240 BEV_SUSPEND_FILT_READ
);
244 _bufferevent_del_generic_timeout_cbs(bev
);
248 be_filter_enable(struct bufferevent
*bev
, short event
)
250 struct bufferevent_filtered
*bevf
= upcast(bev
);
251 if (event
& EV_WRITE
)
252 BEV_RESET_GENERIC_WRITE_TIMEOUT(bev
);
254 if (event
& EV_READ
) {
255 BEV_RESET_GENERIC_READ_TIMEOUT(bev
);
256 bufferevent_unsuspend_read(bevf
->underlying
,
257 BEV_SUSPEND_FILT_READ
);
263 be_filter_disable(struct bufferevent
*bev
, short event
)
265 struct bufferevent_filtered
*bevf
= upcast(bev
);
266 if (event
& EV_WRITE
)
267 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev
);
268 if (event
& EV_READ
) {
269 BEV_DEL_GENERIC_READ_TIMEOUT(bev
);
270 bufferevent_suspend_read(bevf
->underlying
,
271 BEV_SUSPEND_FILT_READ
);
276 static enum bufferevent_filter_result
277 be_filter_process_input(struct bufferevent_filtered
*bevf
,
278 enum bufferevent_flush_mode state
,
281 enum bufferevent_filter_result res
;
282 struct bufferevent
*bev
= downcast(bevf
);
284 if (state
== BEV_NORMAL
) {
285 /* If we're in 'normal' mode, don't urge data on the filter
286 * unless we're reading data and under our high-water mark.*/
287 if (!(bev
->enabled
& EV_READ
) ||
288 be_readbuf_full(bevf
, state
))
293 ev_ssize_t limit
= -1;
294 if (state
== BEV_NORMAL
&& bev
->wm_read
.high
)
295 limit
= bev
->wm_read
.high
-
296 evbuffer_get_length(bev
->input
);
298 res
= bevf
->process_in(bevf
->underlying
->input
,
299 bev
->input
, limit
, state
, bevf
->context
);
303 } while (res
== BEV_OK
&&
304 (bev
->enabled
& EV_READ
) &&
305 evbuffer_get_length(bevf
->underlying
->input
) &&
306 !be_readbuf_full(bevf
, state
));
309 BEV_RESET_GENERIC_READ_TIMEOUT(bev
);
315 static enum bufferevent_filter_result
316 be_filter_process_output(struct bufferevent_filtered
*bevf
,
317 enum bufferevent_flush_mode state
,
320 /* Requires references and lock: might call writecb */
321 enum bufferevent_filter_result res
= BEV_OK
;
322 struct bufferevent
*bufev
= downcast(bevf
);
325 if (state
== BEV_NORMAL
) {
326 /* If we're in 'normal' mode, don't urge data on the
327 * filter unless we're writing data, and the underlying
328 * bufferevent is accepting data, and we have data to
329 * give the filter. If we're in 'flush' or 'finish',
330 * call the filter no matter what. */
331 if (!(bufev
->enabled
& EV_WRITE
) ||
332 be_underlying_writebuf_full(bevf
, state
) ||
333 !evbuffer_get_length(bufev
->output
))
337 /* disable the callback that calls this function
338 when the user adds to the output buffer. */
339 evbuffer_cb_set_flags(bufev
->output
, bevf
->outbuf_cb
, 0);
346 ev_ssize_t limit
= -1;
347 if (state
== BEV_NORMAL
&&
348 bevf
->underlying
->wm_write
.high
)
349 limit
= bevf
->underlying
->wm_write
.high
-
350 evbuffer_get_length(bevf
->underlying
->output
);
352 res
= bevf
->process_out(downcast(bevf
)->output
,
353 bevf
->underlying
->output
,
359 processed
= *processed_out
= 1;
360 } while (/* Stop if the filter wasn't successful...*/
362 /* Or if we aren't writing any more. */
363 (bufev
->enabled
& EV_WRITE
) &&
364 /* Of if we have nothing more to write and we are
366 evbuffer_get_length(bufev
->output
) &&
367 /* Or if we have filled the underlying output buffer. */
368 !be_underlying_writebuf_full(bevf
,state
));
371 evbuffer_get_length(bufev
->output
) <= bufev
->wm_write
.low
) {
372 /* call the write callback.*/
373 _bufferevent_run_writecb(bufev
);
376 (bufev
->enabled
& EV_WRITE
) &&
377 evbuffer_get_length(bufev
->output
) &&
378 !be_underlying_writebuf_full(bevf
, state
)) {
384 /* reenable the outbuf_cb */
385 evbuffer_cb_set_flags(bufev
->output
,bevf
->outbuf_cb
,
386 EVBUFFER_CB_ENABLED
);
389 BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev
);
394 /* Called when the size of our outbuf changes. */
396 bufferevent_filtered_outbuf_cb(struct evbuffer
*buf
,
397 const struct evbuffer_cb_info
*cbinfo
, void *arg
)
399 struct bufferevent_filtered
*bevf
= arg
;
400 struct bufferevent
*bev
= downcast(bevf
);
402 if (cbinfo
->n_added
) {
403 int processed_any
= 0;
404 /* Somebody added more data to the output buffer. Try to
405 * process it, if we should. */
406 _bufferevent_incref_and_lock(bev
);
407 be_filter_process_output(bevf
, BEV_NORMAL
, &processed_any
);
408 _bufferevent_decref_and_unlock(bev
);
412 /* Called when the underlying socket has read. */
414 be_filter_readcb(struct bufferevent
*underlying
, void *_me
)
416 struct bufferevent_filtered
*bevf
= _me
;
417 enum bufferevent_filter_result res
;
418 enum bufferevent_flush_mode state
;
419 struct bufferevent
*bufev
= downcast(bevf
);
420 int processed_any
= 0;
422 _bufferevent_incref_and_lock(bufev
);
425 state
= BEV_FINISHED
;
429 /* XXXX use return value */
430 res
= be_filter_process_input(bevf
, state
, &processed_any
);
433 /* XXX This should be in process_input, not here. There are
434 * other places that can call process-input, and they should
435 * force readcb calls as needed. */
437 evbuffer_get_length(bufev
->input
) >= bufev
->wm_read
.low
)
438 _bufferevent_run_readcb(bufev
);
440 _bufferevent_decref_and_unlock(bufev
);
443 /* Called when the underlying socket has drained enough that we can write to
446 be_filter_writecb(struct bufferevent
*underlying
, void *_me
)
448 struct bufferevent_filtered
*bevf
= _me
;
449 struct bufferevent
*bev
= downcast(bevf
);
450 int processed_any
= 0;
452 _bufferevent_incref_and_lock(bev
);
453 be_filter_process_output(bevf
, BEV_NORMAL
, &processed_any
);
454 _bufferevent_decref_and_unlock(bev
);
457 /* Called when the underlying socket has given us an error */
459 be_filter_eventcb(struct bufferevent
*underlying
, short what
, void *_me
)
461 struct bufferevent_filtered
*bevf
= _me
;
462 struct bufferevent
*bev
= downcast(bevf
);
464 _bufferevent_incref_and_lock(bev
);
465 /* All we can really to is tell our own eventcb. */
466 _bufferevent_run_eventcb(bev
, what
);
467 _bufferevent_decref_and_unlock(bev
);
471 be_filter_flush(struct bufferevent
*bufev
,
472 short iotype
, enum bufferevent_flush_mode mode
)
474 struct bufferevent_filtered
*bevf
= upcast(bufev
);
475 int processed_any
= 0;
478 _bufferevent_incref_and_lock(bufev
);
480 if (iotype
& EV_READ
) {
481 be_filter_process_input(bevf
, mode
, &processed_any
);
483 if (iotype
& EV_WRITE
) {
484 be_filter_process_output(bevf
, mode
, &processed_any
);
486 /* XXX check the return value? */
487 /* XXX does this want to recursively call lower-level flushes? */
488 bufferevent_flush(bevf
->underlying
, iotype
, mode
);
490 _bufferevent_decref_and_unlock(bufev
);
492 return processed_any
;
496 be_filter_ctrl(struct bufferevent
*bev
, enum bufferevent_ctrl_op op
,
497 union bufferevent_ctrl_data
*data
)
499 struct bufferevent_filtered
*bevf
;
501 case BEV_CTRL_GET_UNDERLYING
:
503 data
->ptr
= bevf
->underlying
;
505 case BEV_CTRL_GET_FD
:
506 case BEV_CTRL_SET_FD
:
507 case BEV_CTRL_CANCEL_ALL
: