From dca572ff1ce1559a2254d9ba46d4f86d48c38c21 Mon Sep 17 00:00:00 2001 From: Volker Lendecke Date: Mon, 5 May 2014 08:45:52 +0200 Subject: [PATCH] lib: Enhance poll_funcs_tevent for multiple tevent_contexts With this patch it will be possible to use nested event contexts with messaging_filtered_read_send/recv. Before this patchset only the one and only event context a messaging_context is initialized with is able to receive datagrams from the unix domain socket. So if you want to code a synchronous RPC-like operation using a nested event context, you will not see the reply, because the nested event context does not have the required tevent_fd's. Unfortunately, this patchset has to add some advanced array voodoo. The idea is that state->watches[] contains what we hand out with watch_new, and state->contexts contains references to the tevent_contexts. For every watch we need a tevent_fd in every event context, and the routines make sure that the arrays are properly maintained. Signed-off-by: Volker Lendecke Reviewed-by: Stefan Metzmacher Reviewed-by: Jeremy Allison --- source3/lib/messages_dgm.c | 18 +- source3/lib/poll_funcs/poll_funcs_tevent.c | 356 +++++++++++++++++++++++++++-- source3/lib/poll_funcs/poll_funcs_tevent.h | 15 +- source3/lib/unix_msg/test_drain.c | 11 +- source3/lib/unix_msg/test_source.c | 16 +- source3/lib/unix_msg/tests.c | 23 +- 6 files changed, 398 insertions(+), 41 deletions(-) diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c index 354dac36773..56643b1ffd6 100644 --- a/source3/lib/messages_dgm.c +++ b/source3/lib/messages_dgm.c @@ -30,7 +30,8 @@ struct messaging_dgm_context { struct messaging_context *msg_ctx; - struct poll_funcs msg_callbacks; + struct poll_funcs *msg_callbacks; + void *tevent_handle; struct unix_msg_ctx *dgm_ctx; char *cache_dir; int lockfile_fd; @@ -224,7 +225,18 @@ NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx, return map_nt_error_from_unix(ret); } - poll_funcs_init_tevent(&ctx->msg_callbacks, msg_ctx->event_ctx); + ctx->msg_callbacks = poll_funcs_init_tevent(ctx); + if (ctx->msg_callbacks == NULL) { + TALLOC_FREE(result); + return NT_STATUS_NO_MEMORY; + } + + ctx->tevent_handle = poll_funcs_tevent_register( + ctx, ctx->msg_callbacks, msg_ctx->event_ctx); + if (ctx->tevent_handle == NULL) { + TALLOC_FREE(result); + return NT_STATUS_NO_MEMORY; + } ok = directory_create_or_exist_strict(socket_dir, sec_initial_uid(), 0700); @@ -239,7 +251,7 @@ NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx, generate_random_buffer((uint8_t *)&cookie, sizeof(cookie)); - ret = unix_msg_init(socket_name, &ctx->msg_callbacks, 1024, cookie, + ret = unix_msg_init(socket_name, ctx->msg_callbacks, 1024, cookie, messaging_dgm_recv, ctx, &ctx->dgm_ctx); TALLOC_FREE(socket_name); if (ret != 0) { diff --git a/source3/lib/poll_funcs/poll_funcs_tevent.c b/source3/lib/poll_funcs/poll_funcs_tevent.c index 6e750429bad..ee800badb41 100644 --- a/source3/lib/poll_funcs/poll_funcs_tevent.c +++ b/source3/lib/poll_funcs/poll_funcs_tevent.c @@ -1,6 +1,6 @@ /* * Unix SMB/CIFS implementation. - * Copyright (C) Volker Lendecke 2013 + * Copyright (C) Volker Lendecke 2013,2014 * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -20,14 +20,56 @@ #include "tevent.h" #include "system/select.h" +/* + * A poll_watch is asked for by the engine using this library via + * funcs->watch_new(). It represents interest in "fd" becoming readable or + * writable. + */ + struct poll_watch { - struct tevent_fd *fde; + struct poll_funcs_state *state; + unsigned slot; /* index into state->watches[] */ int fd; + int events; void (*callback)(struct poll_watch *w, int fd, short events, void *private_data); void *private_data; }; +struct poll_funcs_state { + /* + * "watches" is the array of all watches that we have handed out via + * funcs->watch_new(). The "watches" array can contain NULL pointers. + */ + unsigned num_watches; + struct poll_watch **watches; + + /* + * "contexts is the array of tevent_contexts that serve + * "watches". "contexts" can contain NULL pointers. + */ + unsigned num_contexts; + struct poll_funcs_tevent_context **contexts; +}; + +struct poll_funcs_tevent_context { + unsigned refcount; + struct poll_funcs_state *state; + unsigned slot; /* index into state->contexts[] */ + struct tevent_context *ev; + struct tevent_fd **fdes; /* same indexes as state->watches[] */ +}; + +/* + * poll_funcs_tevent_register() hands out a struct poll_funcs_tevent_handle as + * a void *. poll_funcs_tevent_register allows tevent_contexts to be + * registered multiple times, and we can't add a tevent_fd for the same fd's + * multiple times. So we have to share one poll_funcs_tevent_context. + */ +struct poll_funcs_tevent_handle { + struct poll_funcs_tevent_context *ctx; +}; + static uint16_t poll_events_to_tevent(short events) { uint16_t ret = 0; @@ -54,9 +96,56 @@ static short tevent_to_poll_events(uint16_t flags) return ret; } -static void tevent_watch_handler(struct tevent_context *ev, - struct tevent_fd *fde, uint16_t flags, - void *private_data); +/* + * Find or create a free slot in state->watches[] + */ +static bool poll_funcs_watch_find_slot(struct poll_funcs_state *state, + unsigned *slot) +{ + struct poll_watch **watches; + unsigned i; + + for (i=0; inum_watches; i++) { + if (state->watches[i] == NULL) { + *slot = i; + return true; + } + } + + watches = talloc_realloc(state, state->watches, struct poll_watch *, + state->num_watches + 1); + if (watches == NULL) { + return false; + } + watches[state->num_watches] = NULL; + state->watches = watches; + + for (i=0; inum_contexts; i++) { + struct tevent_fd **fdes; + struct poll_funcs_tevent_context *c = state->contexts[i]; + if (c == NULL) { + continue; + } + fdes = talloc_realloc(c, c->fdes, struct tevent_fd *, + state->num_watches + 1); + if (fdes == NULL) { + return false; + } + c->fdes = fdes; + + fdes[state->num_watches] = NULL; + } + + *slot = state->num_watches; + state->num_watches += 1; + + return true; +} + +static void poll_funcs_fde_handler(struct tevent_context *ev, + struct tevent_fd *fde, uint16_t flags, + void *private_data); +static int poll_watch_destructor(struct poll_watch *w); static struct poll_watch *tevent_watch_new( const struct poll_funcs *funcs, int fd, short events, @@ -64,45 +153,87 @@ static struct poll_watch *tevent_watch_new( void *private_data), void *private_data) { - struct tevent_context *ev = talloc_get_type_abort( - funcs->private_data, struct tevent_context); + struct poll_funcs_state *state = talloc_get_type_abort( + funcs->private_data, struct poll_funcs_state); struct poll_watch *w; + unsigned i, slot; - w = talloc(ev, struct poll_watch); - if (w == NULL) { + if (!poll_funcs_watch_find_slot(state, &slot)) { return NULL; } - w->fde = tevent_add_fd(ev, w, fd, poll_events_to_tevent(events), - tevent_watch_handler, w); - if (w->fde == NULL) { - TALLOC_FREE(w); + + w = talloc(state->watches, struct poll_watch); + if (w == NULL) { return NULL; } + w->state = state; + w->slot = slot; + w->fd = fd; + w->events = poll_events_to_tevent(events); w->fd = fd; w->callback = callback; w->private_data = private_data; + state->watches[slot] = w; + + talloc_set_destructor(w, poll_watch_destructor); + + for (i=0; inum_contexts; i++) { + struct poll_funcs_tevent_context *c = state->contexts[i]; + if (c == NULL) { + continue; + } + c->fdes[slot] = tevent_add_fd(c->ev, c->fdes, w->fd, w->events, + poll_funcs_fde_handler, w); + if (c->fdes[slot] == NULL) { + goto fail; + } + } return w; + +fail: + TALLOC_FREE(w); + return NULL; } -static void tevent_watch_handler(struct tevent_context *ev, - struct tevent_fd *fde, uint16_t flags, - void *private_data) +static int poll_watch_destructor(struct poll_watch *w) { - struct poll_watch *w = talloc_get_type_abort( - private_data, struct poll_watch); + struct poll_funcs_state *state = w->state; + unsigned slot = w->slot; + unsigned i; + + TALLOC_FREE(state->watches[slot]); + + for (i=0; inum_contexts; i++) { + struct poll_funcs_tevent_context *c = state->contexts[i]; + if (c == NULL) { + continue; + } + TALLOC_FREE(c->fdes[slot]); + } - w->callback(w, w->fd, tevent_to_poll_events(flags), - w->private_data); + return 0; } static void tevent_watch_update(struct poll_watch *w, short events) { - tevent_fd_set_flags(w->fde, poll_events_to_tevent(events)); + struct poll_funcs_state *state = w->state; + unsigned slot = w->slot; + unsigned i; + + w->events = poll_events_to_tevent(events); + + for (i=0; inum_contexts; i++) { + struct poll_funcs_tevent_context *c = state->contexts[i]; + if (c == NULL) { + continue; + } + tevent_fd_set_flags(c->fdes[slot], w->events); + } } static short tevent_watch_get_events(struct poll_watch *w) { - return tevent_to_poll_events(tevent_fd_get_flags(w->fde)); + return tevent_to_poll_events(w->events); } static void tevent_watch_free(struct poll_watch *w) @@ -130,8 +261,24 @@ static void tevent_timeout_free(struct poll_timeout *t) return; } -void poll_funcs_init_tevent(struct poll_funcs *f, struct tevent_context *ev) +static int poll_funcs_state_destructor(struct poll_funcs_state *state); + +struct poll_funcs *poll_funcs_init_tevent(TALLOC_CTX *mem_ctx) { + struct poll_funcs *f; + struct poll_funcs_state *state; + + f = talloc(mem_ctx, struct poll_funcs); + if (f == NULL) { + return NULL; + } + state = talloc_zero(f, struct poll_funcs_state); + if (state == NULL) { + TALLOC_FREE(f); + return NULL; + } + talloc_set_destructor(state, poll_funcs_state_destructor); + f->watch_new = tevent_watch_new; f->watch_update = tevent_watch_update; f->watch_get_events = tevent_watch_get_events; @@ -139,5 +286,166 @@ void poll_funcs_init_tevent(struct poll_funcs *f, struct tevent_context *ev) f->timeout_new = tevent_timeout_new; f->timeout_update = tevent_timeout_update; f->timeout_free = tevent_timeout_free; - f->private_data = ev; + f->private_data = state; + return f; +} + +static int poll_funcs_state_destructor(struct poll_funcs_state *state) +{ + unsigned i; + /* + * Make sure the watches are cleared before the contexts. The watches + * have destructors attached to them that clean up the fde's + */ + for (i=0; inum_watches; i++) { + TALLOC_FREE(state->watches[i]); + } + return 0; +} + +/* + * Find or create a free slot in state->contexts[] + */ +static bool poll_funcs_context_slot_find(struct poll_funcs_state *state, + struct tevent_context *ev, + unsigned *slot) +{ + struct poll_funcs_tevent_context **contexts; + unsigned i; + + for (i=0; inum_contexts; i++) { + struct poll_funcs_tevent_context *ctx = state->contexts[i]; + + if ((ctx == NULL) || (ctx->ev == ev)) { + *slot = i; + return true; + } + } + + contexts = talloc_realloc(state, state->contexts, + struct poll_funcs_tevent_context *, + state->num_contexts + 1); + if (contexts == NULL) { + return false; + } + state->contexts = contexts; + state->contexts[state->num_contexts] = NULL; + + *slot = state->num_contexts; + state->num_contexts += 1; + + return true; +} + +static int poll_funcs_tevent_context_destructor( + struct poll_funcs_tevent_context *ctx); + +static struct poll_funcs_tevent_context *poll_funcs_tevent_context_new( + TALLOC_CTX *mem_ctx, struct poll_funcs_state *state, + struct tevent_context *ev, unsigned slot) +{ + struct poll_funcs_tevent_context *ctx; + unsigned i; + + ctx = talloc(mem_ctx, struct poll_funcs_tevent_context); + if (ctx == NULL) { + return NULL; + } + + ctx->refcount = 0; + ctx->state = state; + ctx->ev = ev; + ctx->slot = slot; + + ctx->fdes = talloc_array(ctx, struct tevent_fd *, state->num_watches); + if (ctx->fdes == NULL) { + goto fail; + } + + for (i=0; inum_watches; i++) { + struct poll_watch *w = state->watches[i]; + + if (w == NULL) { + ctx->fdes[i] = NULL; + continue; + } + ctx->fdes[i] = tevent_add_fd(ev, ctx->fdes, w->fd, w->events, + poll_funcs_fde_handler, w); + if (ctx->fdes[i] == NULL) { + goto fail; + } + } + talloc_set_destructor(ctx, poll_funcs_tevent_context_destructor); + return ctx; +fail: + TALLOC_FREE(ctx); + return NULL; +} + +static int poll_funcs_tevent_context_destructor( + struct poll_funcs_tevent_context *ctx) +{ + ctx->state->contexts[ctx->slot] = NULL; + return 0; +} + +static void poll_funcs_fde_handler(struct tevent_context *ev, + struct tevent_fd *fde, uint16_t flags, + void *private_data) +{ + struct poll_watch *w = talloc_get_type_abort( + private_data, struct poll_watch); + short events = tevent_to_poll_events(flags); + w->callback(w, w->fd, events, w->private_data); +} + +static int poll_funcs_tevent_handle_destructor( + struct poll_funcs_tevent_handle *handle); + +void *poll_funcs_tevent_register(TALLOC_CTX *mem_ctx, struct poll_funcs *f, + struct tevent_context *ev) +{ + struct poll_funcs_state *state = talloc_get_type_abort( + f->private_data, struct poll_funcs_state); + struct poll_funcs_tevent_handle *handle; + unsigned slot; + + handle = talloc(mem_ctx, struct poll_funcs_tevent_handle); + if (handle == NULL) { + return NULL; + } + + if (!poll_funcs_context_slot_find(state, ev, &slot)) { + goto fail; + } + if (state->contexts[slot] == NULL) { + state->contexts[slot] = poll_funcs_tevent_context_new( + state->contexts, state, ev, slot); + if (state->contexts[slot] == NULL) { + goto fail; + } + } + + handle->ctx = state->contexts[slot]; + handle->ctx->refcount += 1; + talloc_set_destructor(handle, poll_funcs_tevent_handle_destructor); + return handle; +fail: + TALLOC_FREE(handle); + return NULL; +} + +static int poll_funcs_tevent_handle_destructor( + struct poll_funcs_tevent_handle *handle) +{ + if (handle->ctx->refcount == 0) { + abort(); + } + handle->ctx->refcount -= 1; + + if (handle->ctx->refcount != 0) { + return 0; + } + TALLOC_FREE(handle->ctx); + return 0; } diff --git a/source3/lib/poll_funcs/poll_funcs_tevent.h b/source3/lib/poll_funcs/poll_funcs_tevent.h index 2e677203203..8b2964c230d 100644 --- a/source3/lib/poll_funcs/poll_funcs_tevent.h +++ b/source3/lib/poll_funcs/poll_funcs_tevent.h @@ -1,6 +1,6 @@ /* * Unix SMB/CIFS implementation. - * Copyright (C) Volker Lendecke 2013 + * Copyright (C) Volker Lendecke 2013,2014 * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -22,6 +22,17 @@ #include "poll_funcs.h" #include "tevent.h" -void poll_funcs_init_tevent(struct poll_funcs *f, struct tevent_context *ev); +/* + * Create a new, empty instance of "struct poll_funcs" to be served by tevent. + */ +struct poll_funcs *poll_funcs_init_tevent(TALLOC_CTX *mem_ctx); + +/* + * Register a tevent_context to handle the watches that the user of + * "poll_funcs" showed interest in. talloc_free() the returned pointer when + * "ev" is not supposed to handle the events anymore. + */ +void *poll_funcs_tevent_register(TALLOC_CTX *mem_ctx, struct poll_funcs *f, + struct tevent_context *ev); #endif diff --git a/source3/lib/unix_msg/test_drain.c b/source3/lib/unix_msg/test_drain.c index 6fe8c188367..c2568b6646b 100644 --- a/source3/lib/unix_msg/test_drain.c +++ b/source3/lib/unix_msg/test_drain.c @@ -16,7 +16,7 @@ static void recv_cb(struct unix_msg_ctx *ctx, int main(int argc, const char *argv[]) { - struct poll_funcs funcs; + struct poll_funcs *funcs; const char *sock; struct unix_msg_ctx *ctx; struct tevent_context *ev; @@ -37,10 +37,13 @@ int main(int argc, const char *argv[]) perror("tevent_context_init failed"); return 1; } - poll_funcs_init_tevent(&funcs, ev); + funcs = poll_funcs_init_tevent(ev); + if (funcs == NULL) { + fprintf(stderr, "poll_funcs_init_tevent failed\n"); + return 1; + } - ret = unix_msg_init(sock, &funcs, 256, 1, - recv_cb, &state, &ctx); + ret = unix_msg_init(sock, funcs, 256, 1, recv_cb, &state, &ctx); if (ret != 0) { fprintf(stderr, "unix_msg_init failed: %s\n", strerror(ret)); diff --git a/source3/lib/unix_msg/test_source.c b/source3/lib/unix_msg/test_source.c index bfafee1fd33..94984d88523 100644 --- a/source3/lib/unix_msg/test_source.c +++ b/source3/lib/unix_msg/test_source.c @@ -5,7 +5,8 @@ int main(int argc, const char *argv[]) { - struct poll_funcs funcs; + struct poll_funcs *funcs; + void *tevent_handle; struct unix_msg_ctx **ctxs; struct tevent_context *ev; struct iovec iov; @@ -26,7 +27,16 @@ int main(int argc, const char *argv[]) perror("tevent_context_init failed"); return 1; } - poll_funcs_init_tevent(&funcs, ev); + funcs = poll_funcs_init_tevent(NULL); + if (funcs == NULL) { + fprintf(stderr, "poll_funcs_init_tevent failed\n"); + return 1; + } + tevent_handle = poll_funcs_tevent_register(NULL, funcs, ev); + if (tevent_handle == NULL) { + fprintf(stderr, "poll_funcs_tevent_register failed\n"); + return 1; + } ctxs = talloc_array(ev, struct unix_msg_ctx *, num_ctxs); if (ctxs == NULL) { @@ -35,7 +45,7 @@ int main(int argc, const char *argv[]) } for (i=0; i