messaging: Act on messages within the right context
[Samba.git] / source3 / lib / messages.c
blobfd128e92018dee731c1c927dbc75472b9d03c160
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/util/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57 #include "lib/messages_dgm_ref.h"
58 #include "lib/messages_util.h"
60 struct messaging_callback {
61 struct messaging_callback *prev, *next;
62 uint32_t msg_type;
63 void (*fn)(struct messaging_context *msg, void *private_data,
64 uint32_t msg_type,
65 struct server_id server_id, DATA_BLOB *data);
66 void *private_data;
69 struct messaging_context {
70 struct server_id id;
71 struct tevent_context *event_ctx;
72 struct messaging_callback *callbacks;
74 struct tevent_req **new_waiters;
75 unsigned num_new_waiters;
77 struct tevent_req **waiters;
78 unsigned num_waiters;
80 void *msg_dgm_ref;
81 struct messaging_backend *remote;
83 struct server_id_db *names_db;
86 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
87 struct messaging_rec *rec);
88 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
89 struct tevent_context *ev,
90 struct messaging_rec *rec);
92 /****************************************************************************
93 A useful function for testing the message system.
94 ****************************************************************************/
96 static void ping_message(struct messaging_context *msg_ctx,
97 void *private_data,
98 uint32_t msg_type,
99 struct server_id src,
100 DATA_BLOB *data)
102 struct server_id_buf idbuf;
104 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
105 server_id_str_buf(src, &idbuf), (int)data->length,
106 data->data ? (char *)data->data : ""));
108 messaging_send(msg_ctx, src, MSG_PONG, data);
111 static struct messaging_rec *messaging_rec_create(
112 TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
113 uint32_t msg_type, const struct iovec *iov, int iovlen,
114 const int *fds, size_t num_fds)
116 ssize_t buflen;
117 uint8_t *buf;
118 struct messaging_rec *result;
120 if (num_fds > INT8_MAX) {
121 return NULL;
124 buflen = iov_buflen(iov, iovlen);
125 if (buflen == -1) {
126 return NULL;
128 buf = talloc_array(mem_ctx, uint8_t, buflen);
129 if (buf == NULL) {
130 return NULL;
132 iov_buf(iov, iovlen, buf, buflen);
135 struct messaging_rec rec;
136 int64_t fds64[num_fds];
137 size_t i;
139 for (i=0; i<num_fds; i++) {
140 fds64[i] = fds[i];
143 rec = (struct messaging_rec) {
144 .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
145 .src = src, .dest = dst,
146 .buf.data = buf, .buf.length = buflen,
147 .num_fds = num_fds, .fds = fds64,
150 result = messaging_rec_dup(mem_ctx, &rec);
153 TALLOC_FREE(buf);
155 return result;
158 static void messaging_recv_cb(struct tevent_context *ev,
159 const uint8_t *msg, size_t msg_len,
160 int *fds, size_t num_fds,
161 void *private_data)
163 struct messaging_context *msg_ctx = talloc_get_type_abort(
164 private_data, struct messaging_context);
165 struct server_id_buf idbuf;
166 struct messaging_rec rec;
167 int64_t fds64[MIN(num_fds, INT8_MAX)];
168 size_t i;
170 if (msg_len < MESSAGE_HDR_LENGTH) {
171 DBG_WARNING("message too short: %zu\n", msg_len);
172 goto close_fail;
175 if (num_fds > INT8_MAX) {
176 DBG_WARNING("too many fds: %zu\n", num_fds);
177 goto close_fail;
181 * "consume" the fds by copying them and setting
182 * the original variable to -1
184 for (i=0; i < num_fds; i++) {
185 fds64[i] = fds[i];
186 fds[i] = -1;
189 rec = (struct messaging_rec) {
190 .msg_version = MESSAGE_VERSION,
191 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
192 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
193 .num_fds = num_fds,
194 .fds = fds64,
197 message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
199 DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
200 (unsigned)rec.msg_type, rec.buf.length, num_fds,
201 server_id_str_buf(rec.src, &idbuf));
203 messaging_dispatch_rec(msg_ctx, ev, &rec);
204 return;
206 close_fail:
207 for (i=0; i < num_fds; i++) {
208 close(fds[i]);
212 static int messaging_context_destructor(struct messaging_context *ctx)
214 unsigned i;
216 for (i=0; i<ctx->num_new_waiters; i++) {
217 if (ctx->new_waiters[i] != NULL) {
218 tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
219 ctx->new_waiters[i] = NULL;
222 for (i=0; i<ctx->num_waiters; i++) {
223 if (ctx->waiters[i] != NULL) {
224 tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
225 ctx->waiters[i] = NULL;
229 return 0;
232 static const char *private_path(const char *name)
234 return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
237 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
238 struct tevent_context *ev)
240 struct messaging_context *ctx;
241 int ret;
242 const char *lck_path;
243 const char *priv_path;
244 bool ok;
246 if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
247 return NULL;
250 ctx->id = (struct server_id) {
251 .pid = getpid(), .vnn = NONCLUSTER_VNN
254 ctx->event_ctx = ev;
256 sec_init();
258 lck_path = lock_path("msg.lock");
259 if (lck_path == NULL) {
260 TALLOC_FREE(ctx);
261 return NULL;
264 ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
265 0755);
266 if (!ok) {
267 DEBUG(10, ("%s: Could not create lock directory: %s\n",
268 __func__, strerror(errno)));
269 TALLOC_FREE(ctx);
270 return NULL;
273 priv_path = private_path("msg.sock");
274 if (priv_path == NULL) {
275 TALLOC_FREE(ctx);
276 return NULL;
279 ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
280 0700);
281 if (!ok) {
282 DEBUG(10, ("%s: Could not create msg directory: %s\n",
283 __func__, strerror(errno)));
284 TALLOC_FREE(ctx);
285 return NULL;
288 ctx->msg_dgm_ref = messaging_dgm_ref(
289 ctx, ctx->event_ctx, &ctx->id.unique_id,
290 priv_path, lck_path, messaging_recv_cb, ctx, &ret);
292 if (ctx->msg_dgm_ref == NULL) {
293 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
294 TALLOC_FREE(ctx);
295 return NULL;
298 talloc_set_destructor(ctx, messaging_context_destructor);
300 if (lp_clustering()) {
301 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
303 if (ret != 0) {
304 DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
305 strerror(ret)));
306 TALLOC_FREE(ctx);
307 return NULL;
310 ctx->id.vnn = get_my_vnn();
312 ctx->names_db = server_id_db_init(
313 ctx, ctx->id, lp_lock_directory(), 0,
314 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
315 if (ctx->names_db == NULL) {
316 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
317 TALLOC_FREE(ctx);
318 return NULL;
321 messaging_register(ctx, NULL, MSG_PING, ping_message);
323 /* Register some debugging related messages */
325 register_msg_pool_usage(ctx);
326 register_dmalloc_msgs(ctx);
327 debug_register_msgs(ctx);
330 struct server_id_buf tmp;
331 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
334 return ctx;
337 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
339 return msg_ctx->id;
343 * re-init after a fork
345 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
347 int ret;
348 char *lck_path;
350 TALLOC_FREE(msg_ctx->msg_dgm_ref);
352 msg_ctx->id = (struct server_id) {
353 .pid = getpid(), .vnn = msg_ctx->id.vnn
356 lck_path = lock_path("msg.lock");
357 if (lck_path == NULL) {
358 return NT_STATUS_NO_MEMORY;
361 msg_ctx->msg_dgm_ref = messaging_dgm_ref(
362 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
363 private_path("msg.sock"), lck_path,
364 messaging_recv_cb, msg_ctx, &ret);
366 if (msg_ctx->msg_dgm_ref == NULL) {
367 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
368 return map_nt_error_from_unix(ret);
371 if (lp_clustering()) {
372 ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
373 msg_ctx->remote);
375 if (ret != 0) {
376 DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
377 strerror(ret)));
378 return map_nt_error_from_unix(ret);
382 server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
384 return NT_STATUS_OK;
389 * Register a dispatch function for a particular message type. Allow multiple
390 * registrants
392 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
393 void *private_data,
394 uint32_t msg_type,
395 void (*fn)(struct messaging_context *msg,
396 void *private_data,
397 uint32_t msg_type,
398 struct server_id server_id,
399 DATA_BLOB *data))
401 struct messaging_callback *cb;
403 DEBUG(5, ("Registering messaging pointer for type %u - "
404 "private_data=%p\n",
405 (unsigned)msg_type, private_data));
408 * Only one callback per type
411 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
412 /* we allow a second registration of the same message
413 type if it has a different private pointer. This is
414 needed in, for example, the internal notify code,
415 which creates a new notify context for each tree
416 connect, and expects to receive messages to each of
417 them. */
418 if (cb->msg_type == msg_type && private_data == cb->private_data) {
419 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
420 (unsigned)msg_type, private_data));
421 cb->fn = fn;
422 cb->private_data = private_data;
423 return NT_STATUS_OK;
427 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
428 return NT_STATUS_NO_MEMORY;
431 cb->msg_type = msg_type;
432 cb->fn = fn;
433 cb->private_data = private_data;
435 DLIST_ADD(msg_ctx->callbacks, cb);
436 return NT_STATUS_OK;
440 De-register the function for a particular message type.
442 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
443 void *private_data)
445 struct messaging_callback *cb, *next;
447 for (cb = ctx->callbacks; cb; cb = next) {
448 next = cb->next;
449 if ((cb->msg_type == msg_type)
450 && (cb->private_data == private_data)) {
451 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
452 (unsigned)msg_type, private_data));
453 DLIST_REMOVE(ctx->callbacks, cb);
454 TALLOC_FREE(cb);
460 Send a message to a particular server
462 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
463 struct server_id server, uint32_t msg_type,
464 const DATA_BLOB *data)
466 struct iovec iov = {0};
468 if (data != NULL) {
469 iov.iov_base = data->data;
470 iov.iov_len = data->length;
473 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
476 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
477 struct server_id server, uint32_t msg_type,
478 const uint8_t *buf, size_t len)
480 DATA_BLOB blob = data_blob_const(buf, len);
481 return messaging_send(msg_ctx, server, msg_type, &blob);
484 struct messaging_post_state {
485 struct messaging_context *msg_ctx;
486 struct messaging_rec *rec;
489 static void messaging_post_handler(struct tevent_context *ev,
490 struct tevent_immediate *ti,
491 void *private_data);
493 static int messaging_post_self(struct messaging_context *msg_ctx,
494 struct server_id src, struct server_id dst,
495 uint32_t msg_type,
496 const struct iovec *iov, int iovlen,
497 const int *fds, size_t num_fds)
499 struct tevent_immediate *ti;
500 struct messaging_post_state *state;
502 state = talloc(msg_ctx, struct messaging_post_state);
503 if (state == NULL) {
504 return ENOMEM;
506 state->msg_ctx = msg_ctx;
508 ti = tevent_create_immediate(state);
509 if (ti == NULL) {
510 goto fail;
512 state->rec = messaging_rec_create(
513 state, src, dst, msg_type, iov, iovlen, fds, num_fds);
514 if (state->rec == NULL) {
515 goto fail;
518 tevent_schedule_immediate(ti, msg_ctx->event_ctx,
519 messaging_post_handler, state);
520 return 0;
522 fail:
523 TALLOC_FREE(state);
524 return ENOMEM;
527 static void messaging_post_handler(struct tevent_context *ev,
528 struct tevent_immediate *ti,
529 void *private_data)
531 struct messaging_post_state *state = talloc_get_type_abort(
532 private_data, struct messaging_post_state);
533 messaging_dispatch_rec(state->msg_ctx, ev, state->rec);
534 TALLOC_FREE(state);
537 int messaging_send_iov_from(struct messaging_context *msg_ctx,
538 struct server_id src, struct server_id dst,
539 uint32_t msg_type,
540 const struct iovec *iov, int iovlen,
541 const int *fds, size_t num_fds)
543 int ret;
544 uint8_t hdr[MESSAGE_HDR_LENGTH];
545 struct iovec iov2[iovlen+1];
547 if (server_id_is_disconnected(&dst)) {
548 return EINVAL;
551 if (num_fds > INT8_MAX) {
552 return EINVAL;
555 if (dst.vnn != msg_ctx->id.vnn) {
556 if (num_fds > 0) {
557 return ENOSYS;
560 ret = msg_ctx->remote->send_fn(src, dst,
561 msg_type, iov, iovlen,
562 NULL, 0,
563 msg_ctx->remote);
564 return ret;
567 if (server_id_equal(&dst, &msg_ctx->id)) {
568 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
569 iov, iovlen, fds, num_fds);
570 return ret;
573 message_hdr_put(hdr, msg_type, src, dst);
574 iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
575 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
577 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
579 if (ret == EACCES) {
580 become_root();
581 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
582 fds, num_fds);
583 unbecome_root();
586 return ret;
589 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
590 struct server_id dst, uint32_t msg_type,
591 const struct iovec *iov, int iovlen,
592 const int *fds, size_t num_fds)
594 int ret;
596 ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
597 iov, iovlen, fds, num_fds);
598 if (ret != 0) {
599 return map_nt_error_from_unix(ret);
601 return NT_STATUS_OK;
604 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
605 struct messaging_rec *rec)
607 struct messaging_rec *result;
608 size_t fds_size = sizeof(int64_t) * rec->num_fds;
609 size_t payload_len;
611 payload_len = rec->buf.length + fds_size;
612 if (payload_len < rec->buf.length) {
613 /* overflow */
614 return NULL;
617 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
618 payload_len);
619 if (result == NULL) {
620 return NULL;
622 *result = *rec;
624 /* Doesn't fail, see talloc_pooled_object */
626 result->buf.data = talloc_memdup(result, rec->buf.data,
627 rec->buf.length);
629 result->fds = NULL;
630 if (result->num_fds > 0) {
631 result->fds = talloc_memdup(result, rec->fds, fds_size);
634 return result;
637 struct messaging_filtered_read_state {
638 struct tevent_context *ev;
639 struct messaging_context *msg_ctx;
640 void *tevent_handle;
642 bool (*filter)(struct messaging_rec *rec, void *private_data);
643 void *private_data;
645 struct messaging_rec *rec;
648 static void messaging_filtered_read_cleanup(struct tevent_req *req,
649 enum tevent_req_state req_state);
651 struct tevent_req *messaging_filtered_read_send(
652 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
653 struct messaging_context *msg_ctx,
654 bool (*filter)(struct messaging_rec *rec, void *private_data),
655 void *private_data)
657 struct tevent_req *req;
658 struct messaging_filtered_read_state *state;
659 size_t new_waiters_len;
661 req = tevent_req_create(mem_ctx, &state,
662 struct messaging_filtered_read_state);
663 if (req == NULL) {
664 return NULL;
666 state->ev = ev;
667 state->msg_ctx = msg_ctx;
668 state->filter = filter;
669 state->private_data = private_data;
672 * We have to defer the callback here, as we might be called from
673 * within a different tevent_context than state->ev
675 tevent_req_defer_callback(req, state->ev);
677 state->tevent_handle = messaging_dgm_register_tevent_context(
678 state, ev);
679 if (tevent_req_nomem(state->tevent_handle, req)) {
680 return tevent_req_post(req, ev);
684 * We add ourselves to the "new_waiters" array, not the "waiters"
685 * array. If we are called from within messaging_read_done,
686 * messaging_dispatch_rec will be in an active for-loop on
687 * "waiters". We must be careful not to mess with this array, because
688 * it could mean that a single event is being delivered twice.
691 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
693 if (new_waiters_len == msg_ctx->num_new_waiters) {
694 struct tevent_req **tmp;
696 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
697 struct tevent_req *, new_waiters_len+1);
698 if (tevent_req_nomem(tmp, req)) {
699 return tevent_req_post(req, ev);
701 msg_ctx->new_waiters = tmp;
704 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
705 msg_ctx->num_new_waiters += 1;
706 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
708 return req;
711 static void messaging_filtered_read_cleanup(struct tevent_req *req,
712 enum tevent_req_state req_state)
714 struct messaging_filtered_read_state *state = tevent_req_data(
715 req, struct messaging_filtered_read_state);
716 struct messaging_context *msg_ctx = state->msg_ctx;
717 unsigned i;
719 tevent_req_set_cleanup_fn(req, NULL);
721 TALLOC_FREE(state->tevent_handle);
724 * Just set the [new_]waiters entry to NULL, be careful not to mess
725 * with the other "waiters" array contents. We are often called from
726 * within "messaging_dispatch_rec", which loops over
727 * "waiters". Messing with the "waiters" array will mess up that
728 * for-loop.
731 for (i=0; i<msg_ctx->num_waiters; i++) {
732 if (msg_ctx->waiters[i] == req) {
733 msg_ctx->waiters[i] = NULL;
734 return;
738 for (i=0; i<msg_ctx->num_new_waiters; i++) {
739 if (msg_ctx->new_waiters[i] == req) {
740 msg_ctx->new_waiters[i] = NULL;
741 return;
746 static void messaging_filtered_read_done(struct tevent_req *req,
747 struct messaging_rec *rec)
749 struct messaging_filtered_read_state *state = tevent_req_data(
750 req, struct messaging_filtered_read_state);
752 state->rec = messaging_rec_dup(state, rec);
753 if (tevent_req_nomem(state->rec, req)) {
754 return;
756 tevent_req_done(req);
759 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
760 struct messaging_rec **presult)
762 struct messaging_filtered_read_state *state = tevent_req_data(
763 req, struct messaging_filtered_read_state);
764 int err;
766 if (tevent_req_is_unix_error(req, &err)) {
767 tevent_req_received(req);
768 return err;
770 if (presult != NULL) {
771 *presult = talloc_move(mem_ctx, &state->rec);
773 return 0;
776 struct messaging_read_state {
777 uint32_t msg_type;
778 struct messaging_rec *rec;
781 static bool messaging_read_filter(struct messaging_rec *rec,
782 void *private_data);
783 static void messaging_read_done(struct tevent_req *subreq);
785 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
786 struct tevent_context *ev,
787 struct messaging_context *msg,
788 uint32_t msg_type)
790 struct tevent_req *req, *subreq;
791 struct messaging_read_state *state;
793 req = tevent_req_create(mem_ctx, &state,
794 struct messaging_read_state);
795 if (req == NULL) {
796 return NULL;
798 state->msg_type = msg_type;
800 subreq = messaging_filtered_read_send(state, ev, msg,
801 messaging_read_filter, state);
802 if (tevent_req_nomem(subreq, req)) {
803 return tevent_req_post(req, ev);
805 tevent_req_set_callback(subreq, messaging_read_done, req);
806 return req;
809 static bool messaging_read_filter(struct messaging_rec *rec,
810 void *private_data)
812 struct messaging_read_state *state = talloc_get_type_abort(
813 private_data, struct messaging_read_state);
815 if (rec->num_fds != 0) {
816 return false;
819 return rec->msg_type == state->msg_type;
822 static void messaging_read_done(struct tevent_req *subreq)
824 struct tevent_req *req = tevent_req_callback_data(
825 subreq, struct tevent_req);
826 struct messaging_read_state *state = tevent_req_data(
827 req, struct messaging_read_state);
828 int ret;
830 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
831 TALLOC_FREE(subreq);
832 if (tevent_req_error(req, ret)) {
833 return;
835 tevent_req_done(req);
838 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
839 struct messaging_rec **presult)
841 struct messaging_read_state *state = tevent_req_data(
842 req, struct messaging_read_state);
843 int err;
845 if (tevent_req_is_unix_error(req, &err)) {
846 return err;
848 if (presult != NULL) {
849 *presult = talloc_move(mem_ctx, &state->rec);
851 return 0;
854 struct messaging_handler_state {
855 struct tevent_context *ev;
856 struct messaging_context *msg_ctx;
857 uint32_t msg_type;
858 bool (*handler)(struct messaging_context *msg_ctx,
859 struct messaging_rec **rec, void *private_data);
860 void *private_data;
863 static void messaging_handler_got_msg(struct tevent_req *subreq);
865 struct tevent_req *messaging_handler_send(
866 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
867 struct messaging_context *msg_ctx, uint32_t msg_type,
868 bool (*handler)(struct messaging_context *msg_ctx,
869 struct messaging_rec **rec, void *private_data),
870 void *private_data)
872 struct tevent_req *req, *subreq;
873 struct messaging_handler_state *state;
875 req = tevent_req_create(mem_ctx, &state,
876 struct messaging_handler_state);
877 if (req == NULL) {
878 return NULL;
880 state->ev = ev;
881 state->msg_ctx = msg_ctx;
882 state->msg_type = msg_type;
883 state->handler = handler;
884 state->private_data = private_data;
886 subreq = messaging_read_send(state, state->ev, state->msg_ctx,
887 state->msg_type);
888 if (tevent_req_nomem(subreq, req)) {
889 return tevent_req_post(req, ev);
891 tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
892 return req;
895 static void messaging_handler_got_msg(struct tevent_req *subreq)
897 struct tevent_req *req = tevent_req_callback_data(
898 subreq, struct tevent_req);
899 struct messaging_handler_state *state = tevent_req_data(
900 req, struct messaging_handler_state);
901 struct messaging_rec *rec;
902 int ret;
903 bool ok;
905 ret = messaging_read_recv(subreq, state, &rec);
906 TALLOC_FREE(subreq);
907 if (tevent_req_error(req, ret)) {
908 return;
911 subreq = messaging_read_send(state, state->ev, state->msg_ctx,
912 state->msg_type);
913 if (tevent_req_nomem(subreq, req)) {
914 return;
916 tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
918 ok = state->handler(state->msg_ctx, &rec, state->private_data);
919 TALLOC_FREE(rec);
920 if (ok) {
922 * Next round
924 return;
926 TALLOC_FREE(subreq);
927 tevent_req_done(req);
930 int messaging_handler_recv(struct tevent_req *req)
932 return tevent_req_simple_recv_unix(req);
935 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
937 if (msg_ctx->num_new_waiters == 0) {
938 return true;
941 if (talloc_array_length(msg_ctx->waiters) <
942 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
943 struct tevent_req **tmp;
944 tmp = talloc_realloc(
945 msg_ctx, msg_ctx->waiters, struct tevent_req *,
946 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
947 if (tmp == NULL) {
948 DEBUG(1, ("%s: talloc failed\n", __func__));
949 return false;
951 msg_ctx->waiters = tmp;
954 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
955 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
957 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
958 msg_ctx->num_new_waiters = 0;
960 return true;
963 static void messaging_dispatch_classic(struct messaging_context *msg_ctx,
964 struct messaging_rec *rec)
966 struct messaging_callback *cb, *next;
968 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
969 size_t j;
971 next = cb->next;
972 if (cb->msg_type != rec->msg_type) {
973 continue;
977 * the old style callbacks don't support fd passing
979 for (j=0; j < rec->num_fds; j++) {
980 int fd = rec->fds[j];
981 close(fd);
983 rec->num_fds = 0;
984 rec->fds = NULL;
986 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
987 rec->src, &rec->buf);
990 * we continue looking for matching messages after finding
991 * one. This matters for subsystems like the internal notify
992 * code which register more than one handler for the same
993 * message type
999 Dispatch one messaging_rec
1001 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1002 struct tevent_context *ev,
1003 struct messaging_rec *rec)
1005 unsigned i;
1006 size_t j;
1008 if (ev == msg_ctx->event_ctx) {
1009 messaging_dispatch_classic(msg_ctx, rec);
1012 if (!messaging_append_new_waiters(msg_ctx)) {
1013 for (j=0; j < rec->num_fds; j++) {
1014 int fd = rec->fds[j];
1015 close(fd);
1017 rec->num_fds = 0;
1018 rec->fds = NULL;
1019 return;
1022 i = 0;
1023 while (i < msg_ctx->num_waiters) {
1024 struct tevent_req *req;
1025 struct messaging_filtered_read_state *state;
1027 req = msg_ctx->waiters[i];
1028 if (req == NULL) {
1030 * This got cleaned up. In the meantime,
1031 * move everything down one. We need
1032 * to keep the order of waiters, as
1033 * other code may depend on this.
1035 if (i < msg_ctx->num_waiters - 1) {
1036 memmove(&msg_ctx->waiters[i],
1037 &msg_ctx->waiters[i+1],
1038 sizeof(struct tevent_req *) *
1039 (msg_ctx->num_waiters - i - 1));
1041 msg_ctx->num_waiters -= 1;
1042 continue;
1045 state = tevent_req_data(
1046 req, struct messaging_filtered_read_state);
1047 if ((ev == state->ev) &&
1048 state->filter(rec, state->private_data)) {
1049 messaging_filtered_read_done(req, rec);
1052 * Only the first one gets the fd-array
1054 rec->num_fds = 0;
1055 rec->fds = NULL;
1058 i += 1;
1061 if (ev != msg_ctx->event_ctx) {
1062 struct iovec iov;
1063 int fds[rec->num_fds];
1064 int ret;
1067 * We've been listening on a nested event
1068 * context. Messages need to be handled in the main
1069 * event context, so post to ourselves
1072 iov.iov_base = rec->buf.data;
1073 iov.iov_len = rec->buf.length;
1075 for (i=0; i<rec->num_fds; i++) {
1076 fds[i] = rec->fds[i];
1079 ret = messaging_post_self(
1080 msg_ctx, rec->src, rec->dest, rec->msg_type,
1081 &iov, 1, fds, rec->num_fds);
1082 if (ret == 0) {
1083 return;
1088 * If the fd-array isn't used, just close it.
1090 for (j=0; j < rec->num_fds; j++) {
1091 int fd = rec->fds[j];
1092 close(fd);
1094 rec->num_fds = 0;
1095 rec->fds = NULL;
1098 static int mess_parent_dgm_cleanup(void *private_data);
1099 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1101 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1103 struct tevent_req *req;
1105 req = background_job_send(
1106 msg, msg->event_ctx, msg, NULL, 0,
1107 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1108 60*15),
1109 mess_parent_dgm_cleanup, msg);
1110 if (req == NULL) {
1111 return false;
1113 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1114 return true;
1117 static int mess_parent_dgm_cleanup(void *private_data)
1119 int ret;
1121 ret = messaging_dgm_wipe();
1122 DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1123 ret ? strerror(ret) : "ok"));
1124 return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1125 60*15);
1128 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1130 struct messaging_context *msg = tevent_req_callback_data(
1131 req, struct messaging_context);
1132 NTSTATUS status;
1134 status = background_job_recv(req);
1135 TALLOC_FREE(req);
1136 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1137 nt_errstr(status)));
1139 req = background_job_send(
1140 msg, msg->event_ctx, msg, NULL, 0,
1141 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1142 60*15),
1143 mess_parent_dgm_cleanup, msg);
1144 if (req == NULL) {
1145 DEBUG(1, ("background_job_send failed\n"));
1146 return;
1148 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1151 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1153 int ret;
1155 if (pid == 0) {
1156 ret = messaging_dgm_wipe();
1157 } else {
1158 ret = messaging_dgm_cleanup(pid);
1161 return ret;
1164 struct tevent_context *messaging_tevent_context(
1165 struct messaging_context *msg_ctx)
1167 return msg_ctx->event_ctx;
1170 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1172 return msg_ctx->names_db;
1175 /** @} **/