/*
 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include <sys/types.h>

#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif

#include <errno.h>
#include <stdlib.h>
#include <string.h>
/*
 * Forward declaration: the read path installs this function as an
 * evbuffer change callback before its definition appears in this file.
 */
void bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old,
    size_t now, void *arg);
/*
 * Schedule an event, optionally with a timeout.
 *
 * ev      - the event to add.
 * timeout - timeout in whole seconds; 0 adds the event without a timeout
 *           (a NULL timeval is passed to event_add()).
 *
 * Returns the result of event_add().
 */
static int
bufferevent_add(struct event *ev, int timeout)
{
	struct timeval tv, *ptv = NULL;

	if (timeout) {
		evutil_timerclear(&tv);
		tv.tv_sec = timeout;
		ptv = &tv;
	}

	return (event_add(ev, ptv));
}
72 * This callback is executed when the size of the input buffer changes.
73 * We use it to apply back pressure on the reading side.
77 bufferevent_read_pressure_cb(struct evbuffer
*buf
, size_t old
, size_t now
,
79 struct bufferevent
*bufev
= arg
;
81 * If we are below the watermark then reschedule reading if it's
84 if (bufev
->wm_read
.high
== 0 || now
< bufev
->wm_read
.high
) {
85 evbuffer_setcb(buf
, NULL
, NULL
);
87 if (bufev
->enabled
& EV_READ
)
88 bufferevent_add(&bufev
->ev_read
, bufev
->timeout_read
);
93 bufferevent_readcb(int fd
, short event
, void *arg
)
95 struct bufferevent
*bufev
= arg
;
97 short what
= EVBUFFER_READ
;
101 if (event
== EV_TIMEOUT
) {
102 what
|= EVBUFFER_TIMEOUT
;
107 * If we have a high watermark configured then we don't want to
108 * read more data than would make us reach the watermark.
110 if (bufev
->wm_read
.high
!= 0) {
111 howmuch
= bufev
->wm_read
.high
- EVBUFFER_LENGTH(bufev
->input
);
112 /* we might have lowered the watermark, stop reading */
114 struct evbuffer
*buf
= bufev
->input
;
115 event_del(&bufev
->ev_read
);
117 bufferevent_read_pressure_cb
, bufev
);
122 res
= evbuffer_read(bufev
->input
, fd
, howmuch
);
124 if (errno
== EAGAIN
|| errno
== EINTR
)
127 what
|= EVBUFFER_ERROR
;
128 } else if (res
== 0) {
130 what
|= EVBUFFER_EOF
;
136 bufferevent_add(&bufev
->ev_read
, bufev
->timeout_read
);
138 /* See if this callbacks meets the water marks */
139 len
= EVBUFFER_LENGTH(bufev
->input
);
140 if (bufev
->wm_read
.low
!= 0 && len
< bufev
->wm_read
.low
)
142 if (bufev
->wm_read
.high
!= 0 && len
>= bufev
->wm_read
.high
) {
143 struct evbuffer
*buf
= bufev
->input
;
144 event_del(&bufev
->ev_read
);
146 /* Now schedule a callback for us when the buffer changes */
147 evbuffer_setcb(buf
, bufferevent_read_pressure_cb
, bufev
);
150 /* Invoke the user callback - must always be called last */
151 if (bufev
->readcb
!= NULL
)
152 (*bufev
->readcb
)(bufev
, bufev
->cbarg
);
156 bufferevent_add(&bufev
->ev_read
, bufev
->timeout_read
);
160 (*bufev
->errorcb
)(bufev
, what
, bufev
->cbarg
);
164 bufferevent_writecb(int fd
, short event
, void *arg
)
166 struct bufferevent
*bufev
= arg
;
168 short what
= EVBUFFER_WRITE
;
170 if (event
== EV_TIMEOUT
) {
171 what
|= EVBUFFER_TIMEOUT
;
175 if (EVBUFFER_LENGTH(bufev
->output
)) {
176 res
= evbuffer_write(bufev
->output
, fd
);
179 /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
180 *set errno. thus this error checking is not portable*/
181 if (errno
== EAGAIN
||
183 errno
== EINPROGRESS
)
186 what
|= EVBUFFER_ERROR
;
192 } else if (res
== 0) {
194 what
|= EVBUFFER_EOF
;
200 if (EVBUFFER_LENGTH(bufev
->output
) != 0)
201 bufferevent_add(&bufev
->ev_write
, bufev
->timeout_write
);
204 * Invoke the user callback if our buffer is drained or below the
207 if (bufev
->writecb
!= NULL
&&
208 EVBUFFER_LENGTH(bufev
->output
) <= bufev
->wm_write
.low
)
209 (*bufev
->writecb
)(bufev
, bufev
->cbarg
);
214 if (EVBUFFER_LENGTH(bufev
->output
) != 0)
215 bufferevent_add(&bufev
->ev_write
, bufev
->timeout_write
);
219 (*bufev
->errorcb
)(bufev
, what
, bufev
->cbarg
);
223 * Create a new buffered event object.
225 * The read callback is invoked whenever we read new data.
226 * The write callback is invoked whenever the output buffer is drained.
227 * The error callback is invoked on a write/read error or on EOF.
229 * Both read and write callbacks maybe NULL. The error callback is not
230 * allowed to be NULL and have to be provided always.
234 bufferevent_new(int fd
, evbuffercb readcb
, evbuffercb writecb
,
235 everrorcb errorcb
, void *cbarg
)
237 struct bufferevent
*bufev
;
239 if ((bufev
= calloc(1, sizeof(struct bufferevent
))) == NULL
)
242 if ((bufev
->input
= evbuffer_new()) == NULL
) {
247 if ((bufev
->output
= evbuffer_new()) == NULL
) {
248 evbuffer_free(bufev
->input
);
253 event_set(&bufev
->ev_read
, fd
, EV_READ
, bufferevent_readcb
, bufev
);
254 event_set(&bufev
->ev_write
, fd
, EV_WRITE
, bufferevent_writecb
, bufev
);
256 bufferevent_setcb(bufev
, readcb
, writecb
, errorcb
, cbarg
);
259 * Set to EV_WRITE so that using bufferevent_write is going to
260 * trigger a callback. Reading needs to be explicitly enabled
261 * because otherwise no data will be available.
263 bufev
->enabled
= EV_WRITE
;
269 bufferevent_setcb(struct bufferevent
*bufev
,
270 evbuffercb readcb
, evbuffercb writecb
, everrorcb errorcb
, void *cbarg
)
272 bufev
->readcb
= readcb
;
273 bufev
->writecb
= writecb
;
274 bufev
->errorcb
= errorcb
;
276 bufev
->cbarg
= cbarg
;
280 bufferevent_setfd(struct bufferevent
*bufev
, int fd
)
282 event_del(&bufev
->ev_read
);
283 event_del(&bufev
->ev_write
);
285 event_set(&bufev
->ev_read
, fd
, EV_READ
, bufferevent_readcb
, bufev
);
286 event_set(&bufev
->ev_write
, fd
, EV_WRITE
, bufferevent_writecb
, bufev
);
287 if (bufev
->ev_base
!= NULL
) {
288 event_base_set(bufev
->ev_base
, &bufev
->ev_read
);
289 event_base_set(bufev
->ev_base
, &bufev
->ev_write
);
292 /* might have to manually trigger event registration */
296 bufferevent_priority_set(struct bufferevent
*bufev
, int priority
)
298 if (event_priority_set(&bufev
->ev_read
, priority
) == -1)
300 if (event_priority_set(&bufev
->ev_write
, priority
) == -1)
306 /* Closing the file descriptor is the responsibility of the caller */
309 bufferevent_free(struct bufferevent
*bufev
)
311 event_del(&bufev
->ev_read
);
312 event_del(&bufev
->ev_write
);
314 evbuffer_free(bufev
->input
);
315 evbuffer_free(bufev
->output
);
321 * Returns 0 on success;
326 bufferevent_write(struct bufferevent
*bufev
, const void *data
, size_t size
)
330 res
= evbuffer_add(bufev
->output
, data
, size
);
335 /* If everything is okay, we need to schedule a write */
336 if (size
> 0 && (bufev
->enabled
& EV_WRITE
))
337 bufferevent_add(&bufev
->ev_write
, bufev
->timeout_write
);
343 bufferevent_write_buffer(struct bufferevent
*bufev
, struct evbuffer
*buf
)
347 res
= bufferevent_write(bufev
, buf
->buffer
, buf
->off
);
349 evbuffer_drain(buf
, buf
->off
);
355 bufferevent_read(struct bufferevent
*bufev
, void *data
, size_t size
)
357 struct evbuffer
*buf
= bufev
->input
;
362 /* Copy the available data to the user buffer */
363 memcpy(data
, buf
->buffer
, size
);
366 evbuffer_drain(buf
, size
);
372 bufferevent_enable(struct bufferevent
*bufev
, short event
)
374 if (event
& EV_READ
) {
375 if (bufferevent_add(&bufev
->ev_read
, bufev
->timeout_read
) == -1)
378 if (event
& EV_WRITE
) {
379 if (bufferevent_add(&bufev
->ev_write
, bufev
->timeout_write
) == -1)
383 bufev
->enabled
|= event
;
388 bufferevent_disable(struct bufferevent
*bufev
, short event
)
390 if (event
& EV_READ
) {
391 if (event_del(&bufev
->ev_read
) == -1)
394 if (event
& EV_WRITE
) {
395 if (event_del(&bufev
->ev_write
) == -1)
399 bufev
->enabled
&= ~event
;
404 * Sets the read and write timeout for a buffered event.
408 bufferevent_settimeout(struct bufferevent
*bufev
,
409 int timeout_read
, int timeout_write
) {
410 bufev
->timeout_read
= timeout_read
;
411 bufev
->timeout_write
= timeout_write
;
413 if (event_pending(&bufev
->ev_read
, EV_READ
, NULL
))
414 bufferevent_add(&bufev
->ev_read
, timeout_read
);
415 if (event_pending(&bufev
->ev_write
, EV_WRITE
, NULL
))
416 bufferevent_add(&bufev
->ev_write
, timeout_write
);
420 * Sets the water marks
424 bufferevent_setwatermark(struct bufferevent
*bufev
, short events
,
425 size_t lowmark
, size_t highmark
)
427 if (events
& EV_READ
) {
428 bufev
->wm_read
.low
= lowmark
;
429 bufev
->wm_read
.high
= highmark
;
432 if (events
& EV_WRITE
) {
433 bufev
->wm_write
.low
= lowmark
;
434 bufev
->wm_write
.high
= highmark
;
437 /* If the watermarks changed then see if we should call read again */
438 bufferevent_read_pressure_cb(bufev
->input
,
439 0, EVBUFFER_LENGTH(bufev
->input
), bufev
);
443 bufferevent_base_set(struct event_base
*base
, struct bufferevent
*bufev
)
447 bufev
->ev_base
= base
;
449 res
= event_base_set(base
, &bufev
->ev_read
);
453 res
= event_base_set(base
, &bufev
->ev_write
);