tevent: use TEVENT_FD_* instead of EVENT_FD_*
[Samba.git] lib/tevent/tevent_aio.c
/*
   Unix SMB/CIFS implementation.

   main select loop and event handling - aio/epoll hybrid implementation

   Copyright (C) Andrew Tridgell 2006

   based on events_standard.c

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

/*
  this is a very strange beast. The Linux AIO implementation doesn't
  yet integrate properly with epoll, but there is a kernel patch that
  allows the aio wait primitives to be used to wait for epoll events,
  and this can be used to give us a unified event system incorporating
  both aio events and epoll events

  this is _very_ experimental code
*/
#include "replace.h"
#include "system/filesys.h"
#include "system/network.h"
#include "system/select.h"
#include "tevent.h"
#include "tevent_internal.h"
#include "tevent_util.h"
#include <libaio.h>
#define MAX_AIO_QUEUE_DEPTH	100
#ifndef IOCB_CMD_EPOLL_WAIT
#define IOCB_CMD_EPOLL_WAIT	9
#endif
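
/* IOCB_CMD_EPOLL_WAIT is the aio opcode provided by the experimental
   kernel patch mentioned in the header comment; stock libaio headers do
   not define it, hence the fallback definition above */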
struct aio_event_context {
	/* a pointer back to the generic event_context */
	struct tevent_context *ev;

	/* list of filedescriptor events */
	struct tevent_fd *fd_events;

	/* number of registered fd event handlers */
	int num_fd_events;

	uint32_t destruction_count;

	io_context_t ioctx;

	struct epoll_event epevent[MAX_AIO_QUEUE_DEPTH];

	struct iocb *epoll_iocb;

	int epoll_fd;
	int is_epoll_set;
	pid_t pid;
};
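
/* state for one outstanding disk aio request: the embedded iocb is handed
   to io_submit() and handler() is called with the result once the request
   completes */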
struct tevent_aio {
	struct tevent_context *event_ctx;
	struct iocb iocb;
	void *private_data;
	tevent_aio_handler_t handler;
};
/*
  map from TEVENT_FD_* flags to EPOLLIN/EPOLLOUT
*/
static uint32_t epoll_map_flags(uint16_t flags)
{
	uint32_t ret = 0;
	if (flags & TEVENT_FD_READ) ret |= (EPOLLIN | EPOLLERR | EPOLLHUP);
	if (flags & TEVENT_FD_WRITE) ret |= (EPOLLOUT | EPOLLERR | EPOLLHUP);
	return ret;
}
/*
  release the aio queue and close the epoll fd
*/
static int aio_ctx_destructor(struct aio_event_context *aio_ev)
{
	io_queue_release(aio_ev->ioctx);
	close(aio_ev->epoll_fd);
	aio_ev->epoll_fd = -1;
	return 0;
}
static void epoll_add_event(struct aio_event_context *aio_ev, struct tevent_fd *fde);
/*
  reopen the epoll handle when our pid changes
  see http://junkcode.samba.org/ftp/unpacked/junkcode/epoll_fork.c for a
  demonstration of why this is needed
*/
static void epoll_check_reopen(struct aio_event_context *aio_ev)
{
	struct tevent_fd *fde;

	if (aio_ev->pid == getpid()) {
		return;
	}

	close(aio_ev->epoll_fd);
	aio_ev->epoll_fd = epoll_create(MAX_AIO_QUEUE_DEPTH);
	if (aio_ev->epoll_fd == -1) {
		tevent_debug(aio_ev->ev, TEVENT_DEBUG_FATAL,
			     "Failed to recreate epoll handle after fork\n");
		return;
	}
	aio_ev->pid = getpid();
	for (fde=aio_ev->fd_events;fde;fde=fde->next) {
		epoll_add_event(aio_ev, fde);
	}
}
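
/* these bits are kept in fde->additional_flags and track the epoll state of
   each fd event: HAS_EVENT means the fd is currently registered with epoll,
   REPORT_ERROR means EPOLLERR/EPOLLHUP should be reported to the handler as
   TEVENT_FD_READ, and GOT_ERROR records that such an error has been seen */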
#define EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT	(1<<0)
#define EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR	(1<<1)
#define EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR	(1<<2)
/*
  add the epoll event to the given fd_event
*/
static void epoll_add_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
{
	struct epoll_event event;

	if (aio_ev->epoll_fd == -1) return;

	fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;

	/* if we don't want events yet, don't add an aio_event */
	if (fde->flags == 0) return;

	memset(&event, 0, sizeof(event));
	event.events = epoll_map_flags(fde->flags);
	event.data.ptr = fde;
	epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_ADD, fde->fd, &event);
	fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;

	/* only if we want to read do we tell the event handler about errors */
	if (fde->flags & TEVENT_FD_READ) {
		fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
	}
}
/*
  delete the epoll event for the given fd_event
*/
static void epoll_del_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
{
	struct epoll_event event;

	DLIST_REMOVE(aio_ev->fd_events, fde);

	if (aio_ev->epoll_fd == -1) return;

	fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;

	/* if there's no aio_event, we don't need to delete it */
	if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT)) return;

	ZERO_STRUCT(event);
	event.events = epoll_map_flags(fde->flags);
	event.data.ptr = fde;
	epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_DEL, fde->fd, &event);

	fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
}
/*
  update the epoll event for the given fd_event
*/
static void epoll_mod_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
{
	struct epoll_event event;

	if (aio_ev->epoll_fd == -1) return;

	fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;

	ZERO_STRUCT(event);
	event.events = epoll_map_flags(fde->flags);
	event.data.ptr = fde;
	epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_MOD, fde->fd, &event);

	/* only if we want to read do we tell the event handler about errors */
	if (fde->flags & TEVENT_FD_READ) {
		fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
	}
}
static void epoll_change_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
{
	bool got_error = (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR);
	bool want_read = (fde->flags & TEVENT_FD_READ);
	bool want_write = (fde->flags & TEVENT_FD_WRITE);

	if (aio_ev->epoll_fd == -1) return;

	fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;

	/* there's already an event */
	if (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT) {
		if (want_read || (want_write && !got_error)) {
			epoll_mod_event(aio_ev, fde);
			return;
		}
		epoll_del_event(aio_ev, fde);
		return;
	}

	/* there's no aio_event attached to the fde */
	if (want_read || (want_write && !got_error)) {
		DLIST_ADD(aio_ev->fd_events, fde);
		epoll_add_event(aio_ev, fde);
		return;
	}
}
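
/*
  the core of the aio/epoll hybrid: the epoll fd itself is wrapped in an iocb
  with the IOCB_CMD_EPOLL_WAIT opcode and submitted to the aio context, so
  that a single io_getevents() call can wait for both disk aio completions
  and epoll readiness events
*/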
static int setup_epoll_wait(struct aio_event_context *aio_ev)
{
	if (aio_ev->is_epoll_set) {
		return 0;
	}

	memset(aio_ev->epoll_iocb, 0, sizeof(*aio_ev->epoll_iocb));
	aio_ev->epoll_iocb->aio_fildes = aio_ev->epoll_fd;
	aio_ev->epoll_iocb->aio_lio_opcode = IOCB_CMD_EPOLL_WAIT;
	aio_ev->epoll_iocb->aio_reqprio = 0;

	aio_ev->epoll_iocb->u.c.nbytes = MAX_AIO_QUEUE_DEPTH;
	aio_ev->epoll_iocb->u.c.offset = -1;
	aio_ev->epoll_iocb->u.c.buf = aio_ev->epevent;

	if (io_submit(aio_ev->ioctx, 1, &aio_ev->epoll_iocb) != 1) {
		return -1;
	}
	aio_ev->is_epoll_set = 1;

	return 0;
}
/*
  event loop handling using the aio/epoll hybrid
*/
static int aio_event_loop(struct aio_event_context *aio_ev, struct timeval *tvalp)
{
	int ret, i;
	uint32_t destruction_count = ++aio_ev->destruction_count;
	struct timespec timeout;
	struct io_event events[8];
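
	/* each pass pulls at most 8 completions from the aio context; the
	   destruction_count snapshot lets us notice when a handler frees fd
	   events underneath us, so we stop touching them for this pass */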
	if (aio_ev->epoll_fd == -1) return -1;

	if (aio_ev->ev->num_signal_handlers &&
	    tevent_common_check_signal(aio_ev->ev)) {
		return 0;
	}

	if (tvalp) {
		timeout.tv_sec = tvalp->tv_sec;
		timeout.tv_nsec = tvalp->tv_usec;
		timeout.tv_nsec *= 1000;
	}

	if (setup_epoll_wait(aio_ev) < 0) {
		return -1;
	}

	ret = io_getevents(aio_ev->ioctx, 1, 8,
			   events, tvalp?&timeout:NULL);

	if (ret == -EINTR) {
		if (aio_ev->ev->num_signal_handlers) {
			tevent_common_check_signal(aio_ev->ev);
		}
		return 0;
	}

	if (ret == 0 && tvalp) {
		/* we don't care about a possible delay here */
		tevent_common_loop_timer_delay(aio_ev->ev);
		return 0;
	}
	for (i=0;i<ret;i++) {
		struct io_event *event = &events[i];
		struct iocb *finished = event->obj;

		switch (finished->aio_lio_opcode) {
		case IO_CMD_PWRITE:
		case IO_CMD_PREAD: {
			struct tevent_aio *ae = talloc_get_type(finished->data,
								struct tevent_aio);
			if (ae) {
				talloc_set_destructor(ae, NULL);
				ae->handler(ae->event_ctx, ae,
					    event->res, ae->private_data);
				talloc_free(ae);
			}
			break;
		}
		case IOCB_CMD_EPOLL_WAIT: {
			struct epoll_event *ep = (struct epoll_event *)finished->u.c.buf;
			struct tevent_fd *fde;
			uint16_t flags = 0;
			int j;
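
			/* the epoll-wait iocb has completed, so clear is_epoll_set;
			   setup_epoll_wait() will re-submit it on the next pass.
			   finished->u.c.buf points at aio_ev->epevent, which now
			   holds event->res ready epoll events */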
			aio_ev->is_epoll_set = 0;

			for (j=0; j<event->res; j++, ep++) {
				fde = talloc_get_type(ep->data.ptr,
						      struct tevent_fd);
				if (fde == NULL) {
					return -1;
				}
				if (ep->events & (EPOLLHUP|EPOLLERR)) {
					fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR;
					if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR)) {
						epoll_del_event(aio_ev, fde);
						continue;
					}
					flags |= TEVENT_FD_READ;
				}
				if (ep->events & EPOLLIN) flags |= TEVENT_FD_READ;
				if (ep->events & EPOLLOUT) flags |= TEVENT_FD_WRITE;
				if (flags) {
					fde->handler(aio_ev->ev, fde, flags, fde->private_data);
				}
			}
			break;
		}
		}

		if (destruction_count != aio_ev->destruction_count) {
			return 0;
		}
	}

	return 0;
}
/*
  create an aio_event_context structure.
*/
static int aio_event_context_init(struct tevent_context *ev)
{
	struct aio_event_context *aio_ev;

	aio_ev = talloc_zero(ev, struct aio_event_context);
	if (!aio_ev) return -1;

	aio_ev->ev = ev;
	aio_ev->epoll_iocb = talloc(aio_ev, struct iocb);
	if (!aio_ev->epoll_iocb) {
		talloc_free(aio_ev);
		return -1;
	}

	if (io_queue_init(MAX_AIO_QUEUE_DEPTH, &aio_ev->ioctx) != 0) {
		talloc_free(aio_ev);
		return -1;
	}

	aio_ev->epoll_fd = epoll_create(MAX_AIO_QUEUE_DEPTH);
	if (aio_ev->epoll_fd == -1) {
		talloc_free(aio_ev);
		return -1;
	}
	aio_ev->pid = getpid();

	talloc_set_destructor(aio_ev, aio_ctx_destructor);

	ev->additional_data = aio_ev;

	if (setup_epoll_wait(aio_ev) < 0) {
		talloc_free(aio_ev);
		return -1;
	}

	return 0;
}
/*
  destroy an fd_event
*/
static int aio_event_fd_destructor(struct tevent_fd *fde)
{
	struct tevent_context *ev = fde->event_ctx;
	struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
							   struct aio_event_context);

	epoll_check_reopen(aio_ev);

	aio_ev->num_fd_events--;
	aio_ev->destruction_count++;

	epoll_del_event(aio_ev, fde);

	if (fde->flags & TEVENT_FD_AUTOCLOSE) {
		close(fde->fd);
		fde->fd = -1;
	}

	return 0;
}
/*
  add an fd based event
  return NULL on failure (memory allocation error)
*/
static struct tevent_fd *aio_event_add_fd(struct tevent_context *ev, TALLOC_CTX *mem_ctx,
					  int fd, uint16_t flags,
					  tevent_fd_handler_t handler,
					  void *private_data,
					  const char *handler_name,
					  const char *location)
{
	struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
							   struct aio_event_context);
	struct tevent_fd *fde;

	epoll_check_reopen(aio_ev);

	fde = talloc(mem_ctx?mem_ctx:ev, struct tevent_fd);
	if (!fde) return NULL;

	fde->event_ctx = ev;
	fde->fd = fd;
	fde->flags = flags;
	fde->handler = handler;
	fde->private_data = private_data;
	fde->handler_name = handler_name;
	fde->location = location;
	fde->additional_flags = 0;
	fde->additional_data = NULL;

	aio_ev->num_fd_events++;
	talloc_set_destructor(fde, aio_event_fd_destructor);

	DLIST_ADD(aio_ev->fd_events, fde);
	epoll_add_event(aio_ev, fde);

	return fde;
}
/*
  return the fd event flags
*/
static uint16_t aio_event_get_fd_flags(struct tevent_fd *fde)
{
	return fde->flags;
}
/*
  set the fd event flags
*/
static void aio_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags)
{
	struct tevent_context *ev;
	struct aio_event_context *aio_ev;

	if (fde->flags == flags) return;

	ev = fde->event_ctx;
	aio_ev = talloc_get_type(ev->additional_data, struct aio_event_context);

	fde->flags = flags;

	epoll_check_reopen(aio_ev);

	epoll_change_event(aio_ev, fde);
}
/*
  do a single event loop using the events defined in ev
*/
static int aio_event_loop_once(struct tevent_context *ev)
{
	struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
							   struct aio_event_context);
	struct timeval tval;

	tval = tevent_common_loop_timer_delay(ev);
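	/* a zero delay means a timer event was already due and has just been
	   run by tevent_common_loop_timer_delay(), so this iteration is done */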
	if (ev_timeval_is_zero(&tval)) {
		return 0;
	}

	epoll_check_reopen(aio_ev);

	return aio_event_loop(aio_ev, &tval);
}
/*
  run the event loop until a failure occurs, or until all fd events
  have been removed (in which case we return 0)
*/
static int aio_event_loop_wait(struct tevent_context *ev)
{
	struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
							   struct aio_event_context);

	while (aio_ev->num_fd_events) {
		if (aio_event_loop_once(ev) != 0) {
			break;
		}
	}

	return 0;
}
/*
  called when a disk IO event needs to be cancelled
*/
static int aio_destructor(struct tevent_aio *ae)
{
	struct tevent_context *ev = ae->event_ctx;
	struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
							   struct aio_event_context);
	struct io_event result;

	io_cancel(aio_ev->ioctx, &ae->iocb, &result);
	/* TODO: handle errors from io_cancel()! */

	return 0;
}
/* submit an aio disk IO event */
static struct tevent_aio *aio_event_add_aio(struct tevent_context *ev,
					    TALLOC_CTX *mem_ctx,
					    struct iocb *iocb,
					    tevent_aio_handler_t handler,
					    void *private_data,
					    const char *handler_name,
					    const char *location)
{
	struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
							   struct aio_event_context);
	struct iocb *iocbp;
	struct tevent_aio *ae = talloc(mem_ctx?mem_ctx:ev, struct tevent_aio);

	if (ae == NULL) return NULL;

	ae->event_ctx    = ev;
	ae->iocb         = *iocb;
	ae->handler      = handler;
	ae->private_data = private_data;
	iocbp = &ae->iocb;

	if (io_submit(aio_ev->ioctx, 1, &iocbp) != 1) {
		talloc_free(ae);
		return NULL;
	}
	ae->iocb.data = ae;
	talloc_set_destructor(ae, aio_destructor);

	return ae;
}
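
/*
  the backend operations table: fd, aio and loop handling come from this
  file, while timer and signal events are handled by the common tevent code
*/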
static const struct tevent_ops aio_event_ops = {
	.context_init	= aio_event_context_init,
	.add_fd		= aio_event_add_fd,
	.add_aio	= aio_event_add_aio,
	.get_fd_flags	= aio_event_get_fd_flags,
	.set_fd_flags	= aio_event_set_fd_flags,
	.add_timer	= tevent_common_add_timer,
	.add_signal	= tevent_common_add_signal,
	.loop_once	= aio_event_loop_once,
	.loop_wait	= aio_event_loop_wait,
};
bool tevent_aio_init(void)
{
	return tevent_register_backend("aio", &aio_event_ops);
}
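
/* a minimal usage sketch (assuming the standard tevent API, not shown in
   this file): once tevent_aio_init() has registered the backend, a caller
   can select it by name, e.g.
       struct tevent_context *ev = tevent_context_init_byname(mem_ctx, "aio");
   and then add fd, timer, signal and aio events on ev as usual */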