s3:trusts_util: pass dcname to trust_pw_change()
[Samba.git] / source3 / lib / messages.c
blobb0edb30ab14cc889d30a8a977cb1ea817acbf236
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messages_dgm.h"
56 #include "lib/messages_ctdbd.h"
57 #include "lib/util/iov_buf.h"
58 #include "lib/util/server_id_db.h"
59 #include "lib/messages_dgm_ref.h"
60 #include "lib/messages_util.h"
62 struct messaging_callback {
63 struct messaging_callback *prev, *next;
64 uint32_t msg_type;
65 void (*fn)(struct messaging_context *msg, void *private_data,
66 uint32_t msg_type,
67 struct server_id server_id, DATA_BLOB *data);
68 void *private_data;
71 struct messaging_context {
72 struct server_id id;
73 struct tevent_context *event_ctx;
74 struct messaging_callback *callbacks;
76 struct tevent_req **new_waiters;
77 size_t num_new_waiters;
79 struct tevent_req **waiters;
80 size_t num_waiters;
82 void *msg_dgm_ref;
83 struct messaging_backend *remote;
85 struct server_id_db *names_db;
88 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
89 struct messaging_rec *rec);
90 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
91 struct tevent_context *ev,
92 struct messaging_rec *rec);
94 /****************************************************************************
95 A useful function for testing the message system.
96 ****************************************************************************/
98 static void ping_message(struct messaging_context *msg_ctx,
99 void *private_data,
100 uint32_t msg_type,
101 struct server_id src,
102 DATA_BLOB *data)
104 struct server_id_buf idbuf;
106 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
107 server_id_str_buf(src, &idbuf), (int)data->length,
108 data->data ? (char *)data->data : ""));
110 messaging_send(msg_ctx, src, MSG_PONG, data);
113 static struct messaging_rec *messaging_rec_create(
114 TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
115 uint32_t msg_type, const struct iovec *iov, int iovlen,
116 const int *fds, size_t num_fds)
118 ssize_t buflen;
119 uint8_t *buf;
120 struct messaging_rec *result;
122 if (num_fds > INT8_MAX) {
123 return NULL;
126 buflen = iov_buflen(iov, iovlen);
127 if (buflen == -1) {
128 return NULL;
130 buf = talloc_array(mem_ctx, uint8_t, buflen);
131 if (buf == NULL) {
132 return NULL;
134 iov_buf(iov, iovlen, buf, buflen);
137 struct messaging_rec rec;
138 int64_t fds64[num_fds];
139 size_t i;
141 for (i=0; i<num_fds; i++) {
142 fds64[i] = fds[i];
145 rec = (struct messaging_rec) {
146 .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
147 .src = src, .dest = dst,
148 .buf.data = buf, .buf.length = buflen,
149 .num_fds = num_fds, .fds = fds64,
152 result = messaging_rec_dup(mem_ctx, &rec);
155 TALLOC_FREE(buf);
157 return result;
160 static void messaging_recv_cb(struct tevent_context *ev,
161 const uint8_t *msg, size_t msg_len,
162 int *fds, size_t num_fds,
163 void *private_data)
165 struct messaging_context *msg_ctx = talloc_get_type_abort(
166 private_data, struct messaging_context);
167 struct server_id_buf idbuf;
168 struct messaging_rec rec;
169 int64_t fds64[MIN(num_fds, INT8_MAX)];
170 size_t i;
172 if (msg_len < MESSAGE_HDR_LENGTH) {
173 DBG_WARNING("message too short: %zu\n", msg_len);
174 goto close_fail;
177 if (num_fds > INT8_MAX) {
178 DBG_WARNING("too many fds: %zu\n", num_fds);
179 goto close_fail;
183 * "consume" the fds by copying them and setting
184 * the original variable to -1
186 for (i=0; i < num_fds; i++) {
187 fds64[i] = fds[i];
188 fds[i] = -1;
191 rec = (struct messaging_rec) {
192 .msg_version = MESSAGE_VERSION,
193 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
194 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
195 .num_fds = num_fds,
196 .fds = fds64,
199 message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
201 DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
202 (unsigned)rec.msg_type, rec.buf.length, num_fds,
203 server_id_str_buf(rec.src, &idbuf));
205 messaging_dispatch_rec(msg_ctx, ev, &rec);
206 return;
208 close_fail:
209 for (i=0; i < num_fds; i++) {
210 close(fds[i]);
214 static int messaging_context_destructor(struct messaging_context *ctx)
216 size_t i;
218 for (i=0; i<ctx->num_new_waiters; i++) {
219 if (ctx->new_waiters[i] != NULL) {
220 tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
221 ctx->new_waiters[i] = NULL;
224 for (i=0; i<ctx->num_waiters; i++) {
225 if (ctx->waiters[i] != NULL) {
226 tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
227 ctx->waiters[i] = NULL;
231 return 0;
234 static const char *private_path(const char *name)
236 return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
239 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
240 struct tevent_context *ev,
241 struct messaging_context **pmsg_ctx)
243 TALLOC_CTX *frame;
244 struct messaging_context *ctx;
245 NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
246 int ret;
247 const char *lck_path;
248 const char *priv_path;
249 bool ok;
251 lck_path = lock_path("msg.lock");
252 if (lck_path == NULL) {
253 return NT_STATUS_NO_MEMORY;
256 ok = directory_create_or_exist_strict(lck_path,
257 sec_initial_uid(),
258 0755);
259 if (!ok) {
260 DBG_DEBUG("Could not create lock directory: %s\n",
261 strerror(errno));
262 return NT_STATUS_ACCESS_DENIED;
265 priv_path = private_path("msg.sock");
266 if (priv_path == NULL) {
267 return NT_STATUS_NO_MEMORY;
270 ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
271 0700);
272 if (!ok) {
273 DBG_DEBUG("Could not create msg directory: %s\n",
274 strerror(errno));
275 return NT_STATUS_ACCESS_DENIED;
278 frame = talloc_stackframe();
279 if (frame == NULL) {
280 return NT_STATUS_NO_MEMORY;
283 ctx = talloc_zero(frame, struct messaging_context);
284 if (ctx == NULL) {
285 status = NT_STATUS_NO_MEMORY;
286 goto done;
289 ctx->id = (struct server_id) {
290 .pid = getpid(), .vnn = NONCLUSTER_VNN
293 ctx->event_ctx = ev;
295 sec_init();
297 ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
298 ctx->event_ctx,
299 &ctx->id.unique_id,
300 priv_path,
301 lck_path,
302 messaging_recv_cb,
303 ctx,
304 &ret);
305 if (ctx->msg_dgm_ref == NULL) {
306 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
307 status = map_nt_error_from_unix(ret);
308 goto done;
310 talloc_set_destructor(ctx, messaging_context_destructor);
312 if (lp_clustering()) {
313 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
315 if (ret != 0) {
316 DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
317 strerror(ret)));
318 status = map_nt_error_from_unix(ret);
319 goto done;
322 ctx->id.vnn = get_my_vnn();
324 ctx->names_db = server_id_db_init(ctx,
325 ctx->id,
326 lp_lock_directory(),
328 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
329 if (ctx->names_db == NULL) {
330 DBG_DEBUG("server_id_db_init failed\n");
331 status = NT_STATUS_NO_MEMORY;
332 goto done;
335 messaging_register(ctx, NULL, MSG_PING, ping_message);
337 /* Register some debugging related messages */
339 register_msg_pool_usage(ctx);
340 register_dmalloc_msgs(ctx);
341 debug_register_msgs(ctx);
344 struct server_id_buf tmp;
345 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
348 *pmsg_ctx = talloc_steal(mem_ctx, ctx);
350 status = NT_STATUS_OK;
351 done:
352 TALLOC_FREE(frame);
354 return status;
357 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
358 struct tevent_context *ev)
360 struct messaging_context *ctx = NULL;
361 NTSTATUS status;
363 status = messaging_init_internal(mem_ctx,
365 &ctx);
366 if (!NT_STATUS_IS_OK(status)) {
367 return NULL;
370 return ctx;
373 NTSTATUS messaging_init_client(TALLOC_CTX *mem_ctx,
374 struct tevent_context *ev,
375 struct messaging_context **pmsg_ctx)
377 return messaging_init_internal(mem_ctx,
379 pmsg_ctx);
382 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
384 return msg_ctx->id;
388 * re-init after a fork
390 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
392 int ret;
393 char *lck_path;
395 TALLOC_FREE(msg_ctx->msg_dgm_ref);
397 msg_ctx->id = (struct server_id) {
398 .pid = getpid(), .vnn = msg_ctx->id.vnn
401 lck_path = lock_path("msg.lock");
402 if (lck_path == NULL) {
403 return NT_STATUS_NO_MEMORY;
406 msg_ctx->msg_dgm_ref = messaging_dgm_ref(
407 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
408 private_path("msg.sock"), lck_path,
409 messaging_recv_cb, msg_ctx, &ret);
411 if (msg_ctx->msg_dgm_ref == NULL) {
412 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
413 return map_nt_error_from_unix(ret);
416 if (lp_clustering()) {
417 ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
418 msg_ctx->remote);
420 if (ret != 0) {
421 DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
422 strerror(ret)));
423 return map_nt_error_from_unix(ret);
427 server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
429 return NT_STATUS_OK;
434 * Register a dispatch function for a particular message type. Allow multiple
435 * registrants
437 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
438 void *private_data,
439 uint32_t msg_type,
440 void (*fn)(struct messaging_context *msg,
441 void *private_data,
442 uint32_t msg_type,
443 struct server_id server_id,
444 DATA_BLOB *data))
446 struct messaging_callback *cb;
448 DEBUG(5, ("Registering messaging pointer for type %u - "
449 "private_data=%p\n",
450 (unsigned)msg_type, private_data));
453 * Only one callback per type
456 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
457 /* we allow a second registration of the same message
458 type if it has a different private pointer. This is
459 needed in, for example, the internal notify code,
460 which creates a new notify context for each tree
461 connect, and expects to receive messages to each of
462 them. */
463 if (cb->msg_type == msg_type && private_data == cb->private_data) {
464 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
465 (unsigned)msg_type, private_data));
466 cb->fn = fn;
467 cb->private_data = private_data;
468 return NT_STATUS_OK;
472 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
473 return NT_STATUS_NO_MEMORY;
476 cb->msg_type = msg_type;
477 cb->fn = fn;
478 cb->private_data = private_data;
480 DLIST_ADD(msg_ctx->callbacks, cb);
481 return NT_STATUS_OK;
485 De-register the function for a particular message type.
487 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
488 void *private_data)
490 struct messaging_callback *cb, *next;
492 for (cb = ctx->callbacks; cb; cb = next) {
493 next = cb->next;
494 if ((cb->msg_type == msg_type)
495 && (cb->private_data == private_data)) {
496 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
497 (unsigned)msg_type, private_data));
498 DLIST_REMOVE(ctx->callbacks, cb);
499 TALLOC_FREE(cb);
505 Send a message to a particular server
507 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
508 struct server_id server, uint32_t msg_type,
509 const DATA_BLOB *data)
511 struct iovec iov = {0};
513 if (data != NULL) {
514 iov.iov_base = data->data;
515 iov.iov_len = data->length;
518 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
521 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
522 struct server_id server, uint32_t msg_type,
523 const uint8_t *buf, size_t len)
525 DATA_BLOB blob = data_blob_const(buf, len);
526 return messaging_send(msg_ctx, server, msg_type, &blob);
529 struct messaging_post_state {
530 struct messaging_context *msg_ctx;
531 struct messaging_rec *rec;
534 static void messaging_post_handler(struct tevent_context *ev,
535 struct tevent_immediate *ti,
536 void *private_data);
538 static int messaging_post_self(struct messaging_context *msg_ctx,
539 struct server_id src, struct server_id dst,
540 uint32_t msg_type,
541 const struct iovec *iov, int iovlen,
542 const int *fds, size_t num_fds)
544 struct tevent_immediate *ti;
545 struct messaging_post_state *state;
547 state = talloc(msg_ctx, struct messaging_post_state);
548 if (state == NULL) {
549 return ENOMEM;
551 state->msg_ctx = msg_ctx;
553 ti = tevent_create_immediate(state);
554 if (ti == NULL) {
555 goto fail;
557 state->rec = messaging_rec_create(
558 state, src, dst, msg_type, iov, iovlen, fds, num_fds);
559 if (state->rec == NULL) {
560 goto fail;
563 tevent_schedule_immediate(ti, msg_ctx->event_ctx,
564 messaging_post_handler, state);
565 return 0;
567 fail:
568 TALLOC_FREE(state);
569 return ENOMEM;
572 static void messaging_post_handler(struct tevent_context *ev,
573 struct tevent_immediate *ti,
574 void *private_data)
576 struct messaging_post_state *state = talloc_get_type_abort(
577 private_data, struct messaging_post_state);
578 messaging_dispatch_rec(state->msg_ctx, ev, state->rec);
579 TALLOC_FREE(state);
582 int messaging_send_iov_from(struct messaging_context *msg_ctx,
583 struct server_id src, struct server_id dst,
584 uint32_t msg_type,
585 const struct iovec *iov, int iovlen,
586 const int *fds, size_t num_fds)
588 int ret;
589 uint8_t hdr[MESSAGE_HDR_LENGTH];
590 struct iovec iov2[iovlen+1];
592 if (server_id_is_disconnected(&dst)) {
593 return EINVAL;
596 if (num_fds > INT8_MAX) {
597 return EINVAL;
600 if (dst.vnn != msg_ctx->id.vnn) {
601 if (num_fds > 0) {
602 return ENOSYS;
605 ret = msg_ctx->remote->send_fn(src, dst,
606 msg_type, iov, iovlen,
607 NULL, 0,
608 msg_ctx->remote);
609 return ret;
612 if (server_id_equal(&dst, &msg_ctx->id)) {
613 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
614 iov, iovlen, fds, num_fds);
615 return ret;
618 message_hdr_put(hdr, msg_type, src, dst);
619 iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
620 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
622 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
624 if (ret == EACCES) {
625 become_root();
626 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
627 fds, num_fds);
628 unbecome_root();
631 if (ret == ECONNREFUSED) {
633 * Linux returns this when a socket exists in the file
634 * system without a listening process. This is not
635 * documented in susv4 or the linux manpages, but it's
636 * easily testable. For the higher levels this is the
637 * same as "destination does not exist"
639 ret = ENOENT;
642 return ret;
645 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
646 struct server_id dst, uint32_t msg_type,
647 const struct iovec *iov, int iovlen,
648 const int *fds, size_t num_fds)
650 int ret;
652 ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
653 iov, iovlen, fds, num_fds);
654 if (ret != 0) {
655 return map_nt_error_from_unix(ret);
657 return NT_STATUS_OK;
660 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
661 struct messaging_rec *rec)
663 struct messaging_rec *result;
664 size_t fds_size = sizeof(int64_t) * rec->num_fds;
665 size_t payload_len;
667 payload_len = rec->buf.length + fds_size;
668 if (payload_len < rec->buf.length) {
669 /* overflow */
670 return NULL;
673 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
674 payload_len);
675 if (result == NULL) {
676 return NULL;
678 *result = *rec;
680 /* Doesn't fail, see talloc_pooled_object */
682 result->buf.data = talloc_memdup(result, rec->buf.data,
683 rec->buf.length);
685 result->fds = NULL;
686 if (result->num_fds > 0) {
687 result->fds = talloc_memdup(result, rec->fds, fds_size);
690 return result;
693 struct messaging_filtered_read_state {
694 struct tevent_context *ev;
695 struct messaging_context *msg_ctx;
696 struct messaging_dgm_fde *fde;
698 bool (*filter)(struct messaging_rec *rec, void *private_data);
699 void *private_data;
701 struct messaging_rec *rec;
704 static void messaging_filtered_read_cleanup(struct tevent_req *req,
705 enum tevent_req_state req_state);
707 struct tevent_req *messaging_filtered_read_send(
708 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
709 struct messaging_context *msg_ctx,
710 bool (*filter)(struct messaging_rec *rec, void *private_data),
711 void *private_data)
713 struct tevent_req *req;
714 struct messaging_filtered_read_state *state;
715 size_t new_waiters_len;
717 req = tevent_req_create(mem_ctx, &state,
718 struct messaging_filtered_read_state);
719 if (req == NULL) {
720 return NULL;
722 state->ev = ev;
723 state->msg_ctx = msg_ctx;
724 state->filter = filter;
725 state->private_data = private_data;
728 * We have to defer the callback here, as we might be called from
729 * within a different tevent_context than state->ev
731 tevent_req_defer_callback(req, state->ev);
733 state->fde = messaging_dgm_register_tevent_context(state, ev);
734 if (tevent_req_nomem(state->fde, req)) {
735 return tevent_req_post(req, ev);
739 * We add ourselves to the "new_waiters" array, not the "waiters"
740 * array. If we are called from within messaging_read_done,
741 * messaging_dispatch_rec will be in an active for-loop on
742 * "waiters". We must be careful not to mess with this array, because
743 * it could mean that a single event is being delivered twice.
746 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
748 if (new_waiters_len == msg_ctx->num_new_waiters) {
749 struct tevent_req **tmp;
751 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
752 struct tevent_req *, new_waiters_len+1);
753 if (tevent_req_nomem(tmp, req)) {
754 return tevent_req_post(req, ev);
756 msg_ctx->new_waiters = tmp;
759 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
760 msg_ctx->num_new_waiters += 1;
761 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
763 return req;
766 static void messaging_filtered_read_cleanup(struct tevent_req *req,
767 enum tevent_req_state req_state)
769 struct messaging_filtered_read_state *state = tevent_req_data(
770 req, struct messaging_filtered_read_state);
771 struct messaging_context *msg_ctx = state->msg_ctx;
772 size_t i;
774 tevent_req_set_cleanup_fn(req, NULL);
776 TALLOC_FREE(state->fde);
779 * Just set the [new_]waiters entry to NULL, be careful not to mess
780 * with the other "waiters" array contents. We are often called from
781 * within "messaging_dispatch_rec", which loops over
782 * "waiters". Messing with the "waiters" array will mess up that
783 * for-loop.
786 for (i=0; i<msg_ctx->num_waiters; i++) {
787 if (msg_ctx->waiters[i] == req) {
788 msg_ctx->waiters[i] = NULL;
789 return;
793 for (i=0; i<msg_ctx->num_new_waiters; i++) {
794 if (msg_ctx->new_waiters[i] == req) {
795 msg_ctx->new_waiters[i] = NULL;
796 return;
801 static void messaging_filtered_read_done(struct tevent_req *req,
802 struct messaging_rec *rec)
804 struct messaging_filtered_read_state *state = tevent_req_data(
805 req, struct messaging_filtered_read_state);
807 state->rec = messaging_rec_dup(state, rec);
808 if (tevent_req_nomem(state->rec, req)) {
809 return;
811 tevent_req_done(req);
814 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
815 struct messaging_rec **presult)
817 struct messaging_filtered_read_state *state = tevent_req_data(
818 req, struct messaging_filtered_read_state);
819 int err;
821 if (tevent_req_is_unix_error(req, &err)) {
822 tevent_req_received(req);
823 return err;
825 if (presult != NULL) {
826 *presult = talloc_move(mem_ctx, &state->rec);
828 return 0;
831 struct messaging_read_state {
832 uint32_t msg_type;
833 struct messaging_rec *rec;
836 static bool messaging_read_filter(struct messaging_rec *rec,
837 void *private_data);
838 static void messaging_read_done(struct tevent_req *subreq);
840 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
841 struct tevent_context *ev,
842 struct messaging_context *msg,
843 uint32_t msg_type)
845 struct tevent_req *req, *subreq;
846 struct messaging_read_state *state;
848 req = tevent_req_create(mem_ctx, &state,
849 struct messaging_read_state);
850 if (req == NULL) {
851 return NULL;
853 state->msg_type = msg_type;
855 subreq = messaging_filtered_read_send(state, ev, msg,
856 messaging_read_filter, state);
857 if (tevent_req_nomem(subreq, req)) {
858 return tevent_req_post(req, ev);
860 tevent_req_set_callback(subreq, messaging_read_done, req);
861 return req;
864 static bool messaging_read_filter(struct messaging_rec *rec,
865 void *private_data)
867 struct messaging_read_state *state = talloc_get_type_abort(
868 private_data, struct messaging_read_state);
870 if (rec->num_fds != 0) {
871 return false;
874 return rec->msg_type == state->msg_type;
877 static void messaging_read_done(struct tevent_req *subreq)
879 struct tevent_req *req = tevent_req_callback_data(
880 subreq, struct tevent_req);
881 struct messaging_read_state *state = tevent_req_data(
882 req, struct messaging_read_state);
883 int ret;
885 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
886 TALLOC_FREE(subreq);
887 if (tevent_req_error(req, ret)) {
888 return;
890 tevent_req_done(req);
893 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
894 struct messaging_rec **presult)
896 struct messaging_read_state *state = tevent_req_data(
897 req, struct messaging_read_state);
898 int err;
900 if (tevent_req_is_unix_error(req, &err)) {
901 return err;
903 if (presult != NULL) {
904 *presult = talloc_move(mem_ctx, &state->rec);
906 return 0;
909 struct messaging_handler_state {
910 struct tevent_context *ev;
911 struct messaging_context *msg_ctx;
912 uint32_t msg_type;
913 bool (*handler)(struct messaging_context *msg_ctx,
914 struct messaging_rec **rec, void *private_data);
915 void *private_data;
918 static void messaging_handler_got_msg(struct tevent_req *subreq);
920 struct tevent_req *messaging_handler_send(
921 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
922 struct messaging_context *msg_ctx, uint32_t msg_type,
923 bool (*handler)(struct messaging_context *msg_ctx,
924 struct messaging_rec **rec, void *private_data),
925 void *private_data)
927 struct tevent_req *req, *subreq;
928 struct messaging_handler_state *state;
930 req = tevent_req_create(mem_ctx, &state,
931 struct messaging_handler_state);
932 if (req == NULL) {
933 return NULL;
935 state->ev = ev;
936 state->msg_ctx = msg_ctx;
937 state->msg_type = msg_type;
938 state->handler = handler;
939 state->private_data = private_data;
941 subreq = messaging_read_send(state, state->ev, state->msg_ctx,
942 state->msg_type);
943 if (tevent_req_nomem(subreq, req)) {
944 return tevent_req_post(req, ev);
946 tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
947 return req;
950 static void messaging_handler_got_msg(struct tevent_req *subreq)
952 struct tevent_req *req = tevent_req_callback_data(
953 subreq, struct tevent_req);
954 struct messaging_handler_state *state = tevent_req_data(
955 req, struct messaging_handler_state);
956 struct messaging_rec *rec;
957 int ret;
958 bool ok;
960 ret = messaging_read_recv(subreq, state, &rec);
961 TALLOC_FREE(subreq);
962 if (tevent_req_error(req, ret)) {
963 return;
966 subreq = messaging_read_send(state, state->ev, state->msg_ctx,
967 state->msg_type);
968 if (tevent_req_nomem(subreq, req)) {
969 return;
971 tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
973 ok = state->handler(state->msg_ctx, &rec, state->private_data);
974 TALLOC_FREE(rec);
975 if (ok) {
977 * Next round
979 return;
981 TALLOC_FREE(subreq);
982 tevent_req_done(req);
985 int messaging_handler_recv(struct tevent_req *req)
987 return tevent_req_simple_recv_unix(req);
990 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
992 if (msg_ctx->num_new_waiters == 0) {
993 return true;
996 if (talloc_array_length(msg_ctx->waiters) <
997 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
998 struct tevent_req **tmp;
999 tmp = talloc_realloc(
1000 msg_ctx, msg_ctx->waiters, struct tevent_req *,
1001 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1002 if (tmp == NULL) {
1003 DEBUG(1, ("%s: talloc failed\n", __func__));
1004 return false;
1006 msg_ctx->waiters = tmp;
1009 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1010 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1012 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1013 msg_ctx->num_new_waiters = 0;
1015 return true;
1018 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1019 struct messaging_rec *rec)
1021 struct messaging_callback *cb, *next;
1023 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1024 size_t j;
1026 next = cb->next;
1027 if (cb->msg_type != rec->msg_type) {
1028 continue;
1032 * the old style callbacks don't support fd passing
1034 for (j=0; j < rec->num_fds; j++) {
1035 int fd = rec->fds[j];
1036 close(fd);
1038 rec->num_fds = 0;
1039 rec->fds = NULL;
1041 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1042 rec->src, &rec->buf);
1044 return true;
1047 return false;
1051 Dispatch one messaging_rec
1053 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1054 struct tevent_context *ev,
1055 struct messaging_rec *rec)
1057 size_t i;
1058 bool consumed;
1060 if (ev == msg_ctx->event_ctx) {
1061 consumed = messaging_dispatch_classic(msg_ctx, rec);
1062 if (consumed) {
1063 return;
1067 if (!messaging_append_new_waiters(msg_ctx)) {
1068 size_t j;
1069 for (j=0; j < rec->num_fds; j++) {
1070 int fd = rec->fds[j];
1071 close(fd);
1073 rec->num_fds = 0;
1074 rec->fds = NULL;
1075 return;
1078 i = 0;
1079 while (i < msg_ctx->num_waiters) {
1080 struct tevent_req *req;
1081 struct messaging_filtered_read_state *state;
1083 req = msg_ctx->waiters[i];
1084 if (req == NULL) {
1086 * This got cleaned up. In the meantime,
1087 * move everything down one. We need
1088 * to keep the order of waiters, as
1089 * other code may depend on this.
1091 if (i < msg_ctx->num_waiters - 1) {
1092 memmove(&msg_ctx->waiters[i],
1093 &msg_ctx->waiters[i+1],
1094 sizeof(struct tevent_req *) *
1095 (msg_ctx->num_waiters - i - 1));
1097 msg_ctx->num_waiters -= 1;
1098 continue;
1101 state = tevent_req_data(
1102 req, struct messaging_filtered_read_state);
1103 if ((ev == state->ev) &&
1104 state->filter(rec, state->private_data)) {
1105 messaging_filtered_read_done(req, rec);
1106 return;
1109 i += 1;
1112 if (ev != msg_ctx->event_ctx) {
1113 struct iovec iov;
1114 int fds[rec->num_fds];
1115 int ret;
1118 * We've been listening on a nested event
1119 * context. Messages need to be handled in the main
1120 * event context, so post to ourselves
1123 iov.iov_base = rec->buf.data;
1124 iov.iov_len = rec->buf.length;
1126 for (i=0; i<rec->num_fds; i++) {
1127 fds[i] = rec->fds[i];
1130 ret = messaging_post_self(
1131 msg_ctx, rec->src, rec->dest, rec->msg_type,
1132 &iov, 1, fds, rec->num_fds);
1133 if (ret == 0) {
1134 return;
1139 * If the fd-array isn't used, just close it.
1141 for (i=0; i < rec->num_fds; i++) {
1142 int fd = rec->fds[i];
1143 close(fd);
1145 rec->num_fds = 0;
1146 rec->fds = NULL;
1149 static int mess_parent_dgm_cleanup(void *private_data);
1150 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1152 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1154 struct tevent_req *req;
1156 req = background_job_send(
1157 msg, msg->event_ctx, msg, NULL, 0,
1158 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1159 60*15),
1160 mess_parent_dgm_cleanup, msg);
1161 if (req == NULL) {
1162 return false;
1164 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1165 return true;
1168 static int mess_parent_dgm_cleanup(void *private_data)
1170 int ret;
1172 ret = messaging_dgm_wipe();
1173 DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1174 ret ? strerror(ret) : "ok"));
1175 return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1176 60*15);
1179 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1181 struct messaging_context *msg = tevent_req_callback_data(
1182 req, struct messaging_context);
1183 NTSTATUS status;
1185 status = background_job_recv(req);
1186 TALLOC_FREE(req);
1187 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1188 nt_errstr(status)));
1190 req = background_job_send(
1191 msg, msg->event_ctx, msg, NULL, 0,
1192 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1193 60*15),
1194 mess_parent_dgm_cleanup, msg);
1195 if (req == NULL) {
1196 DEBUG(1, ("background_job_send failed\n"));
1197 return;
1199 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1202 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1204 int ret;
1206 if (pid == 0) {
1207 ret = messaging_dgm_wipe();
1208 } else {
1209 ret = messaging_dgm_cleanup(pid);
1212 return ret;
1215 struct tevent_context *messaging_tevent_context(
1216 struct messaging_context *msg_ctx)
1218 return msg_ctx->event_ctx;
1221 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1223 return msg_ctx->names_db;
1226 /** @} **/