From 7a266c575af9fa31583c2bd64f79e3b66fd30815 Mon Sep 17 00:00:00 2001 From: Volker Lendecke Date: Thu, 24 Apr 2014 09:05:53 +0000 Subject: [PATCH] messaging3: Add messaging_filtered_read This delegates the decision whether to read a message to a callback Signed-off-by: Volker Lendecke Reviewed-by: Stefan Metzmacher Reviewed-by: Jeremy Allison --- source3/include/messages.h | 8 +++ source3/lib/messages.c | 128 ++++++++++++++++++++++++++++++++++++--------- 2 files changed, 111 insertions(+), 25 deletions(-) diff --git a/source3/include/messages.h b/source3/include/messages.h index 06c174833cd..7801dfb3d70 100644 --- a/source3/include/messages.h +++ b/source3/include/messages.h @@ -142,6 +142,14 @@ NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx, void messaging_dispatch_rec(struct messaging_context *msg_ctx, struct messaging_rec *rec); +struct tevent_req *messaging_filtered_read_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct messaging_context *msg_ctx, + bool (*filter)(struct messaging_rec *rec, void *private_data), + void *private_data); +int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx, + struct messaging_rec **presult); + struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct messaging_context *msg, diff --git a/source3/lib/messages.c b/source3/lib/messages.c index 9284ac132ab..ca254a4cfea 100644 --- a/source3/lib/messages.c +++ b/source3/lib/messages.c @@ -458,33 +458,38 @@ static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx, return result; } -struct messaging_read_state { +struct messaging_filtered_read_state { struct tevent_context *ev; struct messaging_context *msg_ctx; - uint32_t msg_type; + + bool (*filter)(struct messaging_rec *rec, void *private_data); + void *private_data; + struct messaging_rec *rec; }; -static void messaging_read_cleanup(struct tevent_req *req, - enum tevent_req_state req_state); +static void messaging_filtered_read_cleanup(struct tevent_req *req, + enum tevent_req_state req_state); -struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx, - struct tevent_context *ev, - struct messaging_context *msg_ctx, - uint32_t msg_type) +struct tevent_req *messaging_filtered_read_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct messaging_context *msg_ctx, + bool (*filter)(struct messaging_rec *rec, void *private_data), + void *private_data) { struct tevent_req *req; - struct messaging_read_state *state; + struct messaging_filtered_read_state *state; size_t new_waiters_len; req = tevent_req_create(mem_ctx, &state, - struct messaging_read_state); + struct messaging_filtered_read_state); if (req == NULL) { return NULL; } state->ev = ev; state->msg_ctx = msg_ctx; - state->msg_type = msg_type; + state->filter = filter; + state->private_data = private_data; new_waiters_len = talloc_array_length(msg_ctx->new_waiters); @@ -501,16 +506,16 @@ struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx, msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req; msg_ctx->num_new_waiters += 1; - tevent_req_set_cleanup_fn(req, messaging_read_cleanup); + tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup); return req; } -static void messaging_read_cleanup(struct tevent_req *req, - enum tevent_req_state req_state) +static void messaging_filtered_read_cleanup(struct tevent_req *req, + enum tevent_req_state req_state) { - struct messaging_read_state *state = tevent_req_data( - req, struct messaging_read_state); + struct messaging_filtered_read_state *state = tevent_req_data( + req, struct messaging_filtered_read_state); struct messaging_context *msg_ctx = state->msg_ctx; unsigned i; @@ -531,11 +536,11 @@ static void messaging_read_cleanup(struct tevent_req *req, } } -static void messaging_read_done(struct tevent_req *req, - struct messaging_rec *rec) +static void messaging_filtered_read_done(struct tevent_req *req, + struct messaging_rec *rec) { - struct messaging_read_state *state = tevent_req_data( - req, struct messaging_read_state); + struct messaging_filtered_read_state *state = tevent_req_data( + req, struct messaging_filtered_read_state); state->rec = messaging_rec_dup(state, rec); if (tevent_req_nomem(state->rec, req)) { @@ -544,6 +549,79 @@ static void messaging_read_done(struct tevent_req *req, tevent_req_done(req); } +int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx, + struct messaging_rec **presult) +{ + struct messaging_filtered_read_state *state = tevent_req_data( + req, struct messaging_filtered_read_state); + int err; + + if (tevent_req_is_unix_error(req, &err)) { + tevent_req_received(req); + return err; + } + *presult = talloc_move(mem_ctx, &state->rec); + return 0; +} + +struct messaging_read_state { + uint32_t msg_type; + struct messaging_rec *rec; +}; + +static bool messaging_read_filter(struct messaging_rec *rec, + void *private_data); +static void messaging_read_done(struct tevent_req *subreq); + +struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct messaging_context *msg, + uint32_t msg_type) +{ + struct tevent_req *req, *subreq; + struct messaging_read_state *state; + + req = tevent_req_create(mem_ctx, &state, + struct messaging_read_state); + if (req == NULL) { + return NULL; + } + state->msg_type = msg_type; + + subreq = messaging_filtered_read_send(state, ev, msg, + messaging_read_filter, state); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, messaging_read_done, req); + return req; +} + +static bool messaging_read_filter(struct messaging_rec *rec, + void *private_data) +{ + struct messaging_read_state *state = talloc_get_type_abort( + private_data, struct messaging_read_state); + + return rec->msg_type == state->msg_type; +} + +static void messaging_read_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct messaging_read_state *state = tevent_req_data( + req, struct messaging_read_state); + int ret; + + ret = messaging_filtered_read_recv(subreq, state, &state->rec); + TALLOC_FREE(subreq); + if (tevent_req_error(req, ret)) { + return; + } + tevent_req_done(req); +} + int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx, struct messaging_rec **presult) { @@ -552,7 +630,6 @@ int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx, int err; if (tevent_req_is_unix_error(req, &err)) { - tevent_req_received(req); return err; } if (presult != NULL) { @@ -618,7 +695,7 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx, i = 0; while (i < msg_ctx->num_waiters) { struct tevent_req *req; - struct messaging_read_state *state; + struct messaging_filtered_read_state *state; req = msg_ctx->waiters[i]; if (req == NULL) { @@ -638,9 +715,10 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx, continue; } - state = tevent_req_data(req, struct messaging_read_state); - if (state->msg_type == rec->msg_type) { - messaging_read_done(req, rec); + state = tevent_req_data( + req, struct messaging_filtered_read_state); + if (state->filter(rec, state->private_data)) { + messaging_filtered_read_done(req, rec); } i += 1; -- 2.11.4.GIT