2 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 #include <sys/types.h>
34 #ifdef HAVE_SYS_TIME_H
50 void bufferevent_setwatermark(struct bufferevent
*, short, size_t, size_t);
51 void bufferevent_read_pressure_cb(struct evbuffer
*, size_t, size_t, void *);
54 bufferevent_add(struct event
*ev
, int timeout
)
56 struct timeval tv
, *ptv
= NULL
;
64 return (event_add(ev
, ptv
));
68 * This callback is executed when the size of the input buffer changes.
69 * We use it to apply back pressure on the reading side.
73 bufferevent_read_pressure_cb(struct evbuffer
*buf
, size_t old
, size_t now
,
75 struct bufferevent
*bufev
= arg
;
77 * If we are below the watermark then reschedule reading if it's
80 if (bufev
->wm_read
.high
== 0 || now
< bufev
->wm_read
.high
) {
81 evbuffer_setcb(buf
, NULL
, NULL
);
83 if (bufev
->enabled
& EV_READ
)
84 bufferevent_add(&bufev
->ev_read
, bufev
->timeout_read
);
89 bufferevent_readcb(int fd
, short event
, void *arg
)
91 struct bufferevent
*bufev
= arg
;
93 short what
= EVBUFFER_READ
;
97 if (event
== EV_TIMEOUT
) {
98 what
|= EVBUFFER_TIMEOUT
;
103 * If we have a high watermark configured then we don't want to
104 * read more data than would make us reach the watermark.
106 if (bufev
->wm_read
.high
!= 0)
107 howmuch
= bufev
->wm_read
.high
;
109 res
= evbuffer_read(bufev
->input
, fd
, howmuch
);
111 if (errno
== EAGAIN
|| errno
== EINTR
)
114 what
|= EVBUFFER_ERROR
;
115 } else if (res
== 0) {
117 what
|= EVBUFFER_EOF
;
123 bufferevent_add(&bufev
->ev_read
, bufev
->timeout_read
);
125 /* See if this callbacks meets the water marks */
126 len
= EVBUFFER_LENGTH(bufev
->input
);
127 if (bufev
->wm_read
.low
!= 0 && len
< bufev
->wm_read
.low
)
129 if (bufev
->wm_read
.high
!= 0 && len
> bufev
->wm_read
.high
) {
130 struct evbuffer
*buf
= bufev
->input
;
131 event_del(&bufev
->ev_read
);
133 /* Now schedule a callback for us */
134 evbuffer_setcb(buf
, bufferevent_read_pressure_cb
, bufev
);
138 /* Invoke the user callback - must always be called last */
139 if (bufev
->readcb
!= NULL
)
140 (*bufev
->readcb
)(bufev
, bufev
->cbarg
);
144 bufferevent_add(&bufev
->ev_read
, bufev
->timeout_read
);
148 (*bufev
->errorcb
)(bufev
, what
, bufev
->cbarg
);
152 bufferevent_writecb(int fd
, short event
, void *arg
)
154 struct bufferevent
*bufev
= arg
;
156 short what
= EVBUFFER_WRITE
;
158 if (event
== EV_TIMEOUT
) {
159 what
|= EVBUFFER_TIMEOUT
;
163 if (EVBUFFER_LENGTH(bufev
->output
)) {
164 res
= evbuffer_write(bufev
->output
, fd
);
167 /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
168 *set errno. thus this error checking is not portable*/
169 if (errno
== EAGAIN
||
171 errno
== EINPROGRESS
)
174 what
|= EVBUFFER_ERROR
;
180 } else if (res
== 0) {
182 what
|= EVBUFFER_EOF
;
188 if (EVBUFFER_LENGTH(bufev
->output
) != 0)
189 bufferevent_add(&bufev
->ev_write
, bufev
->timeout_write
);
192 * Invoke the user callback if our buffer is drained or below the
195 if (bufev
->writecb
!= NULL
&&
196 EVBUFFER_LENGTH(bufev
->output
) <= bufev
->wm_write
.low
)
197 (*bufev
->writecb
)(bufev
, bufev
->cbarg
);
202 if (EVBUFFER_LENGTH(bufev
->output
) != 0)
203 bufferevent_add(&bufev
->ev_write
, bufev
->timeout_write
);
207 (*bufev
->errorcb
)(bufev
, what
, bufev
->cbarg
);
211 * Create a new buffered event object.
213 * The read callback is invoked whenever we read new data.
214 * The write callback is invoked whenever the output buffer is drained.
215 * The error callback is invoked on a write/read error or on EOF.
217 * Both read and write callbacks maybe NULL. The error callback is not
218 * allowed to be NULL and have to be provided always.
222 bufferevent_new(int fd
, evbuffercb readcb
, evbuffercb writecb
,
223 everrorcb errorcb
, void *cbarg
)
225 struct bufferevent
*bufev
;
227 if ((bufev
= calloc(1, sizeof(struct bufferevent
))) == NULL
)
230 if ((bufev
->input
= evbuffer_new()) == NULL
) {
235 if ((bufev
->output
= evbuffer_new()) == NULL
) {
236 evbuffer_free(bufev
->input
);
241 event_set(&bufev
->ev_read
, fd
, EV_READ
, bufferevent_readcb
, bufev
);
242 event_set(&bufev
->ev_write
, fd
, EV_WRITE
, bufferevent_writecb
, bufev
);
244 bufev
->readcb
= readcb
;
245 bufev
->writecb
= writecb
;
246 bufev
->errorcb
= errorcb
;
248 bufev
->cbarg
= cbarg
;
251 * Set to EV_WRITE so that using bufferevent_write is going to
252 * trigger a callback. Reading needs to be explicitly enabled
253 * because otherwise no data will be available.
255 bufev
->enabled
= EV_WRITE
;
261 bufferevent_priority_set(struct bufferevent
*bufev
, int priority
)
263 if (event_priority_set(&bufev
->ev_read
, priority
) == -1)
265 if (event_priority_set(&bufev
->ev_write
, priority
) == -1)
271 /* Closing the file descriptor is the responsibility of the caller */
274 bufferevent_free(struct bufferevent
*bufev
)
276 event_del(&bufev
->ev_read
);
277 event_del(&bufev
->ev_write
);
279 evbuffer_free(bufev
->input
);
280 evbuffer_free(bufev
->output
);
286 * Returns 0 on success;
291 bufferevent_write(struct bufferevent
*bufev
, void *data
, size_t size
)
295 res
= evbuffer_add(bufev
->output
, data
, size
);
300 /* If everything is okay, we need to schedule a write */
301 if (size
> 0 && (bufev
->enabled
& EV_WRITE
))
302 bufferevent_add(&bufev
->ev_write
, bufev
->timeout_write
);
308 bufferevent_write_buffer(struct bufferevent
*bufev
, struct evbuffer
*buf
)
312 res
= bufferevent_write(bufev
, buf
->buffer
, buf
->off
);
314 evbuffer_drain(buf
, buf
->off
);
320 bufferevent_read(struct bufferevent
*bufev
, void *data
, size_t size
)
322 struct evbuffer
*buf
= bufev
->input
;
327 /* Copy the available data to the user buffer */
328 memcpy(data
, buf
->buffer
, size
);
331 evbuffer_drain(buf
, size
);
337 bufferevent_enable(struct bufferevent
*bufev
, short event
)
339 if (event
& EV_READ
) {
340 if (bufferevent_add(&bufev
->ev_read
, bufev
->timeout_read
) == -1)
343 if (event
& EV_WRITE
) {
344 if (bufferevent_add(&bufev
->ev_write
, bufev
->timeout_write
) == -1)
348 bufev
->enabled
|= event
;
353 bufferevent_disable(struct bufferevent
*bufev
, short event
)
355 if (event
& EV_READ
) {
356 if (event_del(&bufev
->ev_read
) == -1)
359 if (event
& EV_WRITE
) {
360 if (event_del(&bufev
->ev_write
) == -1)
364 bufev
->enabled
&= ~event
;
369 * Sets the read and write timeout for a buffered event.
373 bufferevent_settimeout(struct bufferevent
*bufev
,
374 int timeout_read
, int timeout_write
) {
375 bufev
->timeout_read
= timeout_read
;
376 bufev
->timeout_write
= timeout_write
;
380 * Sets the water marks
384 bufferevent_setwatermark(struct bufferevent
*bufev
, short events
,
385 size_t lowmark
, size_t highmark
)
387 if (events
& EV_READ
) {
388 bufev
->wm_read
.low
= lowmark
;
389 bufev
->wm_read
.high
= highmark
;
392 if (events
& EV_WRITE
) {
393 bufev
->wm_write
.low
= lowmark
;
394 bufev
->wm_write
.high
= highmark
;
397 /* If the watermarks changed then see if we should call read again */
398 bufferevent_read_pressure_cb(bufev
->input
,
399 0, EVBUFFER_LENGTH(bufev
->input
), bufev
);
403 bufferevent_base_set(struct event_base
*base
, struct bufferevent
*bufev
)
407 res
= event_base_set(base
, &bufev
->ev_read
);
411 res
= event_base_set(base
, &bufev
->ev_write
);