s3:auth: only call secrets_fetch_domain_sid() once in finalize_local_nt_token()
[Samba.git] / source3 / lib / messages.c
blob464233fda2c3d0b0e9b4af97ca025d8aa5484884
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messages_dgm.h"
56 #include "lib/util/iov_buf.h"
57 #include "lib/util/server_id_db.h"
58 #include "lib/messages_dgm_ref.h"
59 #include "lib/messages_ctdb.h"
60 #include "lib/messages_ctdb_ref.h"
61 #include "lib/messages_util.h"
62 #include "cluster_support.h"
63 #include "ctdbd_conn.h"
64 #include "ctdb_srvids.h"
66 #ifdef CLUSTER_SUPPORT
67 #include "ctdb_protocol.h"
68 #endif
70 struct messaging_callback {
71 struct messaging_callback *prev, *next;
72 uint32_t msg_type;
73 void (*fn)(struct messaging_context *msg, void *private_data,
74 uint32_t msg_type,
75 struct server_id server_id, DATA_BLOB *data);
76 void *private_data;
79 struct messaging_registered_ev {
80 struct tevent_context *ev;
81 struct tevent_immediate *im;
82 size_t refcount;
85 struct messaging_context {
86 struct server_id id;
87 struct tevent_context *event_ctx;
88 struct messaging_callback *callbacks;
90 struct messaging_rec *posted_msgs;
92 struct messaging_registered_ev *event_contexts;
94 struct tevent_req **new_waiters;
95 size_t num_new_waiters;
97 struct tevent_req **waiters;
98 size_t num_waiters;
100 void *msg_dgm_ref;
101 void *msg_ctdb_ref;
103 struct server_id_db *names_db;
106 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
107 struct messaging_rec *rec);
108 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
109 struct messaging_rec *rec);
110 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
111 struct tevent_context *ev,
112 struct messaging_rec *rec);
113 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
114 struct tevent_context *ev,
115 struct messaging_rec *rec);
117 /****************************************************************************
118 A useful function for testing the message system.
119 ****************************************************************************/
121 static void ping_message(struct messaging_context *msg_ctx,
122 void *private_data,
123 uint32_t msg_type,
124 struct server_id src,
125 DATA_BLOB *data)
127 struct server_id_buf idbuf;
129 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
130 server_id_str_buf(src, &idbuf), (int)data->length,
131 data->data ? (char *)data->data : ""));
133 messaging_send(msg_ctx, src, MSG_PONG, data);
136 struct messaging_rec *messaging_rec_create(
137 TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
138 uint32_t msg_type, const struct iovec *iov, int iovlen,
139 const int *fds, size_t num_fds)
141 ssize_t buflen;
142 uint8_t *buf;
143 struct messaging_rec *result;
145 if (num_fds > INT8_MAX) {
146 return NULL;
149 buflen = iov_buflen(iov, iovlen);
150 if (buflen == -1) {
151 return NULL;
153 buf = talloc_array(mem_ctx, uint8_t, buflen);
154 if (buf == NULL) {
155 return NULL;
157 iov_buf(iov, iovlen, buf, buflen);
160 struct messaging_rec rec;
161 int64_t fds64[num_fds];
162 size_t i;
164 for (i=0; i<num_fds; i++) {
165 fds64[i] = fds[i];
168 rec = (struct messaging_rec) {
169 .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
170 .src = src, .dest = dst,
171 .buf.data = buf, .buf.length = buflen,
172 .num_fds = num_fds, .fds = fds64,
175 result = messaging_rec_dup(mem_ctx, &rec);
178 TALLOC_FREE(buf);
180 return result;
183 static bool messaging_register_event_context(struct messaging_context *ctx,
184 struct tevent_context *ev)
186 size_t i, num_event_contexts;
187 struct messaging_registered_ev *free_reg = NULL;
188 struct messaging_registered_ev *tmp;
190 num_event_contexts = talloc_array_length(ctx->event_contexts);
192 for (i=0; i<num_event_contexts; i++) {
193 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
195 if (reg->ev == ev) {
196 reg->refcount += 1;
197 return true;
199 if (reg->refcount == 0) {
200 if (reg->ev != NULL) {
201 abort();
203 free_reg = reg;
207 if (free_reg == NULL) {
208 tmp = talloc_realloc(ctx, ctx->event_contexts,
209 struct messaging_registered_ev,
210 num_event_contexts+1);
211 if (tmp == NULL) {
212 return false;
214 ctx->event_contexts = tmp;
216 free_reg = &ctx->event_contexts[num_event_contexts];
219 *free_reg = (struct messaging_registered_ev) { .ev = ev, .refcount = 1 };
221 return true;
224 static bool messaging_deregister_event_context(struct messaging_context *ctx,
225 struct tevent_context *ev)
227 size_t i, num_event_contexts;
229 num_event_contexts = talloc_array_length(ctx->event_contexts);
231 for (i=0; i<num_event_contexts; i++) {
232 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
234 if (reg->ev == ev) {
235 if (reg->refcount == 0) {
236 return false;
238 reg->refcount -= 1;
240 if (reg->refcount == 0) {
242 * Not strictly necessary, just
243 * paranoia
245 reg->ev = NULL;
248 * Do not talloc_free(reg->im),
249 * recycle immediates events.
252 return true;
255 return false;
258 static void messaging_post_main_event_context(struct tevent_context *ev,
259 struct tevent_immediate *im,
260 void *private_data)
262 struct messaging_context *ctx = talloc_get_type_abort(
263 private_data, struct messaging_context);
265 while (ctx->posted_msgs != NULL) {
266 struct messaging_rec *rec = ctx->posted_msgs;
267 bool consumed;
269 DLIST_REMOVE(ctx->posted_msgs, rec);
271 consumed = messaging_dispatch_classic(ctx, rec);
272 if (!consumed) {
273 consumed = messaging_dispatch_waiters(
274 ctx, ctx->event_ctx, rec);
277 if (!consumed) {
278 uint8_t i;
280 for (i=0; i<rec->num_fds; i++) {
281 close(rec->fds[i]);
285 TALLOC_FREE(rec);
289 static void messaging_post_sub_event_context(struct tevent_context *ev,
290 struct tevent_immediate *im,
291 void *private_data)
293 struct messaging_context *ctx = talloc_get_type_abort(
294 private_data, struct messaging_context);
295 struct messaging_rec *rec, *next;
297 for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
298 bool consumed;
300 next = rec->next;
302 consumed = messaging_dispatch_waiters(ctx, ev, rec);
303 if (consumed) {
304 DLIST_REMOVE(ctx->posted_msgs, rec);
305 TALLOC_FREE(rec);
310 static bool messaging_alert_event_contexts(struct messaging_context *ctx)
312 size_t i, num_event_contexts;
314 num_event_contexts = talloc_array_length(ctx->event_contexts);
316 for (i=0; i<num_event_contexts; i++) {
317 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
319 if (reg->refcount == 0) {
320 continue;
323 if (reg->im == NULL) {
324 reg->im = tevent_create_immediate(
325 ctx->event_contexts);
327 if (reg->im == NULL) {
328 DBG_WARNING("Could not create immediate\n");
329 continue;
333 * We depend on schedule_immediate to work
334 * multiple times. Might be a bit inefficient,
335 * but this needs to be proven in tests. The
336 * alternatively would be to track whether the
337 * immediate has already been scheduled. For
338 * now, avoid that complexity here.
341 if (reg->ev == ctx->event_ctx) {
342 tevent_schedule_immediate(
343 reg->im, reg->ev,
344 messaging_post_main_event_context,
345 ctx);
346 } else {
347 tevent_schedule_immediate(
348 reg->im, reg->ev,
349 messaging_post_sub_event_context,
350 ctx);
354 return true;
357 static void messaging_recv_cb(struct tevent_context *ev,
358 const uint8_t *msg, size_t msg_len,
359 int *fds, size_t num_fds,
360 void *private_data)
362 struct messaging_context *msg_ctx = talloc_get_type_abort(
363 private_data, struct messaging_context);
364 struct server_id_buf idbuf;
365 struct messaging_rec rec;
366 int64_t fds64[MIN(num_fds, INT8_MAX)];
367 size_t i;
369 if (msg_len < MESSAGE_HDR_LENGTH) {
370 DBG_WARNING("message too short: %zu\n", msg_len);
371 goto close_fail;
374 if (num_fds > INT8_MAX) {
375 DBG_WARNING("too many fds: %zu\n", num_fds);
376 goto close_fail;
380 * "consume" the fds by copying them and setting
381 * the original variable to -1
383 for (i=0; i < num_fds; i++) {
384 fds64[i] = fds[i];
385 fds[i] = -1;
388 rec = (struct messaging_rec) {
389 .msg_version = MESSAGE_VERSION,
390 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
391 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
392 .num_fds = num_fds,
393 .fds = fds64,
396 message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
398 DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
399 (unsigned)rec.msg_type, rec.buf.length, num_fds,
400 server_id_str_buf(rec.src, &idbuf));
402 if (server_id_same_process(&rec.src, &msg_ctx->id)) {
403 DBG_DEBUG("Ignoring self-send\n");
404 goto close_fail;
407 messaging_dispatch_rec(msg_ctx, ev, &rec);
408 return;
410 close_fail:
411 for (i=0; i < num_fds; i++) {
412 close(fds[i]);
416 static int messaging_context_destructor(struct messaging_context *ctx)
418 size_t i;
420 for (i=0; i<ctx->num_new_waiters; i++) {
421 if (ctx->new_waiters[i] != NULL) {
422 tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
423 ctx->new_waiters[i] = NULL;
426 for (i=0; i<ctx->num_waiters; i++) {
427 if (ctx->waiters[i] != NULL) {
428 tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
429 ctx->waiters[i] = NULL;
434 * The immediates from messaging_alert_event_contexts
435 * reference "ctx". Don't let them outlive the
436 * messaging_context we're destroying here.
438 TALLOC_FREE(ctx->event_contexts);
440 return 0;
443 static const char *private_path(const char *name)
445 return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
448 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
449 struct tevent_context *ev,
450 struct messaging_context **pmsg_ctx)
452 TALLOC_CTX *frame;
453 struct messaging_context *ctx;
454 NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
455 int ret;
456 const char *lck_path;
457 const char *priv_path;
458 bool ok;
460 lck_path = lock_path("msg.lock");
461 if (lck_path == NULL) {
462 return NT_STATUS_NO_MEMORY;
465 ok = directory_create_or_exist_strict(lck_path,
466 sec_initial_uid(),
467 0755);
468 if (!ok) {
469 DBG_DEBUG("Could not create lock directory: %s\n",
470 strerror(errno));
471 return NT_STATUS_ACCESS_DENIED;
474 priv_path = private_path("msg.sock");
475 if (priv_path == NULL) {
476 return NT_STATUS_NO_MEMORY;
479 ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
480 0700);
481 if (!ok) {
482 DBG_DEBUG("Could not create msg directory: %s\n",
483 strerror(errno));
484 return NT_STATUS_ACCESS_DENIED;
487 frame = talloc_stackframe();
488 if (frame == NULL) {
489 return NT_STATUS_NO_MEMORY;
492 ctx = talloc_zero(frame, struct messaging_context);
493 if (ctx == NULL) {
494 status = NT_STATUS_NO_MEMORY;
495 goto done;
498 ctx->id = (struct server_id) {
499 .pid = getpid(), .vnn = NONCLUSTER_VNN
502 ctx->event_ctx = ev;
504 ok = messaging_register_event_context(ctx, ev);
505 if (!ok) {
506 status = NT_STATUS_NO_MEMORY;
507 goto done;
510 sec_init();
512 ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
513 ctx->event_ctx,
514 &ctx->id.unique_id,
515 priv_path,
516 lck_path,
517 messaging_recv_cb,
518 ctx,
519 &ret);
520 if (ctx->msg_dgm_ref == NULL) {
521 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
522 status = map_nt_error_from_unix(ret);
523 goto done;
525 talloc_set_destructor(ctx, messaging_context_destructor);
527 #ifdef CLUSTER_SUPPORT
528 if (lp_clustering()) {
529 ctx->msg_ctdb_ref = messaging_ctdb_ref(
530 ctx, ctx->event_ctx,
531 lp_ctdbd_socket(), lp_ctdb_timeout(),
532 ctx->id.unique_id, messaging_recv_cb, ctx, &ret);
533 if (ctx->msg_ctdb_ref == NULL) {
534 DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
535 strerror(ret));
536 status = map_nt_error_from_unix(ret);
537 goto done;
540 #endif
542 ctx->id.vnn = get_my_vnn();
544 ctx->names_db = server_id_db_init(ctx,
545 ctx->id,
546 lp_lock_directory(),
548 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
549 if (ctx->names_db == NULL) {
550 DBG_DEBUG("server_id_db_init failed\n");
551 status = NT_STATUS_NO_MEMORY;
552 goto done;
555 messaging_register(ctx, NULL, MSG_PING, ping_message);
557 /* Register some debugging related messages */
559 register_msg_pool_usage(ctx);
560 register_dmalloc_msgs(ctx);
561 debug_register_msgs(ctx);
564 struct server_id_buf tmp;
565 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
568 *pmsg_ctx = talloc_steal(mem_ctx, ctx);
570 status = NT_STATUS_OK;
571 done:
572 TALLOC_FREE(frame);
574 return status;
577 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
578 struct tevent_context *ev)
580 struct messaging_context *ctx = NULL;
581 NTSTATUS status;
583 status = messaging_init_internal(mem_ctx,
585 &ctx);
586 if (!NT_STATUS_IS_OK(status)) {
587 return NULL;
590 return ctx;
593 NTSTATUS messaging_init_client(TALLOC_CTX *mem_ctx,
594 struct tevent_context *ev,
595 struct messaging_context **pmsg_ctx)
597 return messaging_init_internal(mem_ctx,
599 pmsg_ctx);
602 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
604 return msg_ctx->id;
608 * re-init after a fork
610 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
612 int ret;
613 char *lck_path;
615 TALLOC_FREE(msg_ctx->msg_dgm_ref);
616 TALLOC_FREE(msg_ctx->msg_ctdb_ref);
618 msg_ctx->id = (struct server_id) {
619 .pid = getpid(), .vnn = msg_ctx->id.vnn
622 lck_path = lock_path("msg.lock");
623 if (lck_path == NULL) {
624 return NT_STATUS_NO_MEMORY;
627 msg_ctx->msg_dgm_ref = messaging_dgm_ref(
628 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
629 private_path("msg.sock"), lck_path,
630 messaging_recv_cb, msg_ctx, &ret);
632 if (msg_ctx->msg_dgm_ref == NULL) {
633 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
634 return map_nt_error_from_unix(ret);
637 if (lp_clustering()) {
638 msg_ctx->msg_ctdb_ref = messaging_ctdb_ref(
639 msg_ctx, msg_ctx->event_ctx,
640 lp_ctdbd_socket(), lp_ctdb_timeout(),
641 msg_ctx->id.unique_id, messaging_recv_cb, msg_ctx,
642 &ret);
643 if (msg_ctx->msg_ctdb_ref == NULL) {
644 DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
645 strerror(ret));
646 return map_nt_error_from_unix(ret);
650 server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
652 return NT_STATUS_OK;
657 * Register a dispatch function for a particular message type. Allow multiple
658 * registrants
660 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
661 void *private_data,
662 uint32_t msg_type,
663 void (*fn)(struct messaging_context *msg,
664 void *private_data,
665 uint32_t msg_type,
666 struct server_id server_id,
667 DATA_BLOB *data))
669 struct messaging_callback *cb;
671 DEBUG(5, ("Registering messaging pointer for type %u - "
672 "private_data=%p\n",
673 (unsigned)msg_type, private_data));
676 * Only one callback per type
679 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
680 /* we allow a second registration of the same message
681 type if it has a different private pointer. This is
682 needed in, for example, the internal notify code,
683 which creates a new notify context for each tree
684 connect, and expects to receive messages to each of
685 them. */
686 if (cb->msg_type == msg_type && private_data == cb->private_data) {
687 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
688 (unsigned)msg_type, private_data));
689 cb->fn = fn;
690 cb->private_data = private_data;
691 return NT_STATUS_OK;
695 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
696 return NT_STATUS_NO_MEMORY;
699 cb->msg_type = msg_type;
700 cb->fn = fn;
701 cb->private_data = private_data;
703 DLIST_ADD(msg_ctx->callbacks, cb);
704 return NT_STATUS_OK;
708 De-register the function for a particular message type.
710 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
711 void *private_data)
713 struct messaging_callback *cb, *next;
715 for (cb = ctx->callbacks; cb; cb = next) {
716 next = cb->next;
717 if ((cb->msg_type == msg_type)
718 && (cb->private_data == private_data)) {
719 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
720 (unsigned)msg_type, private_data));
721 DLIST_REMOVE(ctx->callbacks, cb);
722 TALLOC_FREE(cb);
728 Send a message to a particular server
730 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
731 struct server_id server, uint32_t msg_type,
732 const DATA_BLOB *data)
734 struct iovec iov = {0};
736 if (data != NULL) {
737 iov.iov_base = data->data;
738 iov.iov_len = data->length;
741 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
744 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
745 struct server_id server, uint32_t msg_type,
746 const uint8_t *buf, size_t len)
748 DATA_BLOB blob = data_blob_const(buf, len);
749 return messaging_send(msg_ctx, server, msg_type, &blob);
752 static int messaging_post_self(struct messaging_context *msg_ctx,
753 struct server_id src, struct server_id dst,
754 uint32_t msg_type,
755 const struct iovec *iov, int iovlen,
756 const int *fds, size_t num_fds)
758 struct messaging_rec *rec;
759 bool ok;
761 rec = messaging_rec_create(
762 msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
763 if (rec == NULL) {
764 return ENOMEM;
767 ok = messaging_alert_event_contexts(msg_ctx);
768 if (!ok) {
769 TALLOC_FREE(rec);
770 return ENOMEM;
773 DLIST_ADD_END(msg_ctx->posted_msgs, rec);
775 return 0;
778 int messaging_send_iov_from(struct messaging_context *msg_ctx,
779 struct server_id src, struct server_id dst,
780 uint32_t msg_type,
781 const struct iovec *iov, int iovlen,
782 const int *fds, size_t num_fds)
784 int ret;
785 uint8_t hdr[MESSAGE_HDR_LENGTH];
786 struct iovec iov2[iovlen+1];
788 if (server_id_is_disconnected(&dst)) {
789 return EINVAL;
792 if (num_fds > INT8_MAX) {
793 return EINVAL;
796 if (server_id_equal(&dst, &msg_ctx->id)) {
797 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
798 iov, iovlen, fds, num_fds);
799 return ret;
802 message_hdr_put(hdr, msg_type, src, dst);
803 iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
804 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
806 if (dst.vnn != msg_ctx->id.vnn) {
807 if (num_fds > 0) {
808 return ENOSYS;
811 ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
812 return ret;
815 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
817 if (ret == EACCES) {
818 become_root();
819 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
820 fds, num_fds);
821 unbecome_root();
824 if (ret == ECONNREFUSED) {
826 * Linux returns this when a socket exists in the file
827 * system without a listening process. This is not
828 * documented in susv4 or the linux manpages, but it's
829 * easily testable. For the higher levels this is the
830 * same as "destination does not exist"
832 ret = ENOENT;
835 return ret;
838 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
839 struct server_id dst, uint32_t msg_type,
840 const struct iovec *iov, int iovlen,
841 const int *fds, size_t num_fds)
843 int ret;
845 ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
846 iov, iovlen, fds, num_fds);
847 if (ret != 0) {
848 return map_nt_error_from_unix(ret);
850 return NT_STATUS_OK;
853 struct send_all_state {
854 struct messaging_context *msg_ctx;
855 int msg_type;
856 const void *buf;
857 size_t len;
860 static int send_all_fn(pid_t pid, void *private_data)
862 struct send_all_state *state = private_data;
863 NTSTATUS status;
865 if (pid == getpid()) {
866 DBG_DEBUG("Skip ourselves in messaging_send_all\n");
867 return 0;
870 status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
871 state->msg_type, state->buf, state->len);
872 if (!NT_STATUS_IS_OK(status)) {
873 DBG_WARNING("messaging_send_buf to %ju failed: %s\n",
874 (uintmax_t)pid, nt_errstr(status));
877 return 0;
880 void messaging_send_all(struct messaging_context *msg_ctx,
881 int msg_type, const void *buf, size_t len)
883 struct send_all_state state = {
884 .msg_ctx = msg_ctx, .msg_type = msg_type,
885 .buf = buf, .len = len
887 int ret;
889 #ifdef CLUSTER_SUPPORT
890 if (lp_clustering()) {
891 struct ctdbd_connection *conn = messaging_ctdb_connection();
892 uint8_t msghdr[MESSAGE_HDR_LENGTH];
893 struct iovec iov[] = {
894 { .iov_base = msghdr,
895 .iov_len = sizeof(msghdr) },
896 { .iov_base = discard_const_p(void, buf),
897 .iov_len = len }
900 message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
901 (struct server_id) {0});
903 ret = ctdbd_messaging_send_iov(
904 conn, CTDB_BROADCAST_CONNECTED,
905 CTDB_SRVID_SAMBA_PROCESS,
906 iov, ARRAY_SIZE(iov));
907 if (ret != 0) {
908 DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
909 strerror(ret));
912 return;
914 #endif
916 ret = messaging_dgm_forall(send_all_fn, &state);
917 if (ret != 0) {
918 DBG_WARNING("messaging_dgm_forall failed: %s\n",
919 strerror(ret));
923 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
924 struct messaging_rec *rec)
926 struct messaging_rec *result;
927 size_t fds_size = sizeof(int64_t) * rec->num_fds;
928 size_t payload_len;
930 payload_len = rec->buf.length + fds_size;
931 if (payload_len < rec->buf.length) {
932 /* overflow */
933 return NULL;
936 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
937 payload_len);
938 if (result == NULL) {
939 return NULL;
941 *result = *rec;
943 /* Doesn't fail, see talloc_pooled_object */
945 result->buf.data = talloc_memdup(result, rec->buf.data,
946 rec->buf.length);
948 result->fds = NULL;
949 if (result->num_fds > 0) {
950 result->fds = talloc_memdup(result, rec->fds, fds_size);
953 return result;
956 struct messaging_filtered_read_state {
957 struct tevent_context *ev;
958 struct messaging_context *msg_ctx;
959 struct messaging_dgm_fde *fde;
960 struct messaging_ctdb_fde *cluster_fde;
962 bool (*filter)(struct messaging_rec *rec, void *private_data);
963 void *private_data;
965 struct messaging_rec *rec;
968 static void messaging_filtered_read_cleanup(struct tevent_req *req,
969 enum tevent_req_state req_state);
971 struct tevent_req *messaging_filtered_read_send(
972 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
973 struct messaging_context *msg_ctx,
974 bool (*filter)(struct messaging_rec *rec, void *private_data),
975 void *private_data)
977 struct tevent_req *req;
978 struct messaging_filtered_read_state *state;
979 size_t new_waiters_len;
980 bool ok;
982 req = tevent_req_create(mem_ctx, &state,
983 struct messaging_filtered_read_state);
984 if (req == NULL) {
985 return NULL;
987 state->ev = ev;
988 state->msg_ctx = msg_ctx;
989 state->filter = filter;
990 state->private_data = private_data;
993 * We have to defer the callback here, as we might be called from
994 * within a different tevent_context than state->ev
996 tevent_req_defer_callback(req, state->ev);
998 state->fde = messaging_dgm_register_tevent_context(state, ev);
999 if (tevent_req_nomem(state->fde, req)) {
1000 return tevent_req_post(req, ev);
1003 if (lp_clustering()) {
1004 state->cluster_fde =
1005 messaging_ctdb_register_tevent_context(state, ev);
1006 if (tevent_req_nomem(state->cluster_fde, req)) {
1007 return tevent_req_post(req, ev);
1012 * We add ourselves to the "new_waiters" array, not the "waiters"
1013 * array. If we are called from within messaging_read_done,
1014 * messaging_dispatch_rec will be in an active for-loop on
1015 * "waiters". We must be careful not to mess with this array, because
1016 * it could mean that a single event is being delivered twice.
1019 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
1021 if (new_waiters_len == msg_ctx->num_new_waiters) {
1022 struct tevent_req **tmp;
1024 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
1025 struct tevent_req *, new_waiters_len+1);
1026 if (tevent_req_nomem(tmp, req)) {
1027 return tevent_req_post(req, ev);
1029 msg_ctx->new_waiters = tmp;
1032 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
1033 msg_ctx->num_new_waiters += 1;
1034 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
1036 ok = messaging_register_event_context(msg_ctx, ev);
1037 if (!ok) {
1038 tevent_req_oom(req);
1039 return tevent_req_post(req, ev);
1042 return req;
1045 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1046 enum tevent_req_state req_state)
1048 struct messaging_filtered_read_state *state = tevent_req_data(
1049 req, struct messaging_filtered_read_state);
1050 struct messaging_context *msg_ctx = state->msg_ctx;
1051 size_t i;
1052 bool ok;
1054 tevent_req_set_cleanup_fn(req, NULL);
1056 TALLOC_FREE(state->fde);
1057 TALLOC_FREE(state->cluster_fde);
1059 ok = messaging_deregister_event_context(msg_ctx, state->ev);
1060 if (!ok) {
1061 abort();
1065 * Just set the [new_]waiters entry to NULL, be careful not to mess
1066 * with the other "waiters" array contents. We are often called from
1067 * within "messaging_dispatch_rec", which loops over
1068 * "waiters". Messing with the "waiters" array will mess up that
1069 * for-loop.
1072 for (i=0; i<msg_ctx->num_waiters; i++) {
1073 if (msg_ctx->waiters[i] == req) {
1074 msg_ctx->waiters[i] = NULL;
1075 return;
1079 for (i=0; i<msg_ctx->num_new_waiters; i++) {
1080 if (msg_ctx->new_waiters[i] == req) {
1081 msg_ctx->new_waiters[i] = NULL;
1082 return;
1087 static void messaging_filtered_read_done(struct tevent_req *req,
1088 struct messaging_rec *rec)
1090 struct messaging_filtered_read_state *state = tevent_req_data(
1091 req, struct messaging_filtered_read_state);
1093 state->rec = messaging_rec_dup(state, rec);
1094 if (tevent_req_nomem(state->rec, req)) {
1095 return;
1097 tevent_req_done(req);
1100 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1101 struct messaging_rec **presult)
1103 struct messaging_filtered_read_state *state = tevent_req_data(
1104 req, struct messaging_filtered_read_state);
1105 int err;
1107 if (tevent_req_is_unix_error(req, &err)) {
1108 tevent_req_received(req);
1109 return err;
1111 if (presult != NULL) {
1112 *presult = talloc_move(mem_ctx, &state->rec);
1114 return 0;
1117 struct messaging_read_state {
1118 uint32_t msg_type;
1119 struct messaging_rec *rec;
1122 static bool messaging_read_filter(struct messaging_rec *rec,
1123 void *private_data);
1124 static void messaging_read_done(struct tevent_req *subreq);
1126 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
1127 struct tevent_context *ev,
1128 struct messaging_context *msg,
1129 uint32_t msg_type)
1131 struct tevent_req *req, *subreq;
1132 struct messaging_read_state *state;
1134 req = tevent_req_create(mem_ctx, &state,
1135 struct messaging_read_state);
1136 if (req == NULL) {
1137 return NULL;
1139 state->msg_type = msg_type;
1141 subreq = messaging_filtered_read_send(state, ev, msg,
1142 messaging_read_filter, state);
1143 if (tevent_req_nomem(subreq, req)) {
1144 return tevent_req_post(req, ev);
1146 tevent_req_set_callback(subreq, messaging_read_done, req);
1147 return req;
1150 static bool messaging_read_filter(struct messaging_rec *rec,
1151 void *private_data)
1153 struct messaging_read_state *state = talloc_get_type_abort(
1154 private_data, struct messaging_read_state);
1156 if (rec->num_fds != 0) {
1157 return false;
1160 return rec->msg_type == state->msg_type;
1163 static void messaging_read_done(struct tevent_req *subreq)
1165 struct tevent_req *req = tevent_req_callback_data(
1166 subreq, struct tevent_req);
1167 struct messaging_read_state *state = tevent_req_data(
1168 req, struct messaging_read_state);
1169 int ret;
1171 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
1172 TALLOC_FREE(subreq);
1173 if (tevent_req_error(req, ret)) {
1174 return;
1176 tevent_req_done(req);
1179 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1180 struct messaging_rec **presult)
1182 struct messaging_read_state *state = tevent_req_data(
1183 req, struct messaging_read_state);
1184 int err;
1186 if (tevent_req_is_unix_error(req, &err)) {
1187 return err;
1189 if (presult != NULL) {
1190 *presult = talloc_move(mem_ctx, &state->rec);
1192 return 0;
1195 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
1197 if (msg_ctx->num_new_waiters == 0) {
1198 return true;
1201 if (talloc_array_length(msg_ctx->waiters) <
1202 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
1203 struct tevent_req **tmp;
1204 tmp = talloc_realloc(
1205 msg_ctx, msg_ctx->waiters, struct tevent_req *,
1206 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1207 if (tmp == NULL) {
1208 DEBUG(1, ("%s: talloc failed\n", __func__));
1209 return false;
1211 msg_ctx->waiters = tmp;
1214 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1215 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1217 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1218 msg_ctx->num_new_waiters = 0;
1220 return true;
1223 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1224 struct messaging_rec *rec)
1226 struct messaging_callback *cb, *next;
1228 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1229 size_t j;
1231 next = cb->next;
1232 if (cb->msg_type != rec->msg_type) {
1233 continue;
1237 * the old style callbacks don't support fd passing
1239 for (j=0; j < rec->num_fds; j++) {
1240 int fd = rec->fds[j];
1241 close(fd);
1243 rec->num_fds = 0;
1244 rec->fds = NULL;
1246 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1247 rec->src, &rec->buf);
1249 return true;
1252 return false;
1255 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
1256 struct tevent_context *ev,
1257 struct messaging_rec *rec)
1259 size_t i;
1261 if (!messaging_append_new_waiters(msg_ctx)) {
1262 return false;
1265 i = 0;
1266 while (i < msg_ctx->num_waiters) {
1267 struct tevent_req *req;
1268 struct messaging_filtered_read_state *state;
1270 req = msg_ctx->waiters[i];
1271 if (req == NULL) {
1273 * This got cleaned up. In the meantime,
1274 * move everything down one. We need
1275 * to keep the order of waiters, as
1276 * other code may depend on this.
1278 if (i < msg_ctx->num_waiters - 1) {
1279 memmove(&msg_ctx->waiters[i],
1280 &msg_ctx->waiters[i+1],
1281 sizeof(struct tevent_req *) *
1282 (msg_ctx->num_waiters - i - 1));
1284 msg_ctx->num_waiters -= 1;
1285 continue;
1288 state = tevent_req_data(
1289 req, struct messaging_filtered_read_state);
1290 if ((ev == state->ev) &&
1291 state->filter(rec, state->private_data)) {
1292 messaging_filtered_read_done(req, rec);
1293 return true;
1296 i += 1;
1299 return false;
1303 Dispatch one messaging_rec
1305 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1306 struct tevent_context *ev,
1307 struct messaging_rec *rec)
1309 bool consumed;
1310 size_t i;
1312 if (ev == msg_ctx->event_ctx) {
1313 consumed = messaging_dispatch_classic(msg_ctx, rec);
1314 if (consumed) {
1315 return;
1319 consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
1320 if (consumed) {
1321 return;
1324 if (ev != msg_ctx->event_ctx) {
1325 struct iovec iov;
1326 int fds[rec->num_fds];
1327 int ret;
1330 * We've been listening on a nested event
1331 * context. Messages need to be handled in the main
1332 * event context, so post to ourselves
1335 iov.iov_base = rec->buf.data;
1336 iov.iov_len = rec->buf.length;
1338 for (i=0; i<rec->num_fds; i++) {
1339 fds[i] = rec->fds[i];
1342 ret = messaging_post_self(
1343 msg_ctx, rec->src, rec->dest, rec->msg_type,
1344 &iov, 1, fds, rec->num_fds);
1345 if (ret == 0) {
1346 return;
1351 * If the fd-array isn't used, just close it.
1353 for (i=0; i < rec->num_fds; i++) {
1354 int fd = rec->fds[i];
1355 close(fd);
1357 rec->num_fds = 0;
1358 rec->fds = NULL;
1361 static int mess_parent_dgm_cleanup(void *private_data);
1362 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1364 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1366 struct tevent_req *req;
1368 req = background_job_send(
1369 msg, msg->event_ctx, msg, NULL, 0,
1370 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1371 60*15),
1372 mess_parent_dgm_cleanup, msg);
1373 if (req == NULL) {
1374 DBG_WARNING("background_job_send failed\n");
1375 return false;
1377 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1378 return true;
1381 static int mess_parent_dgm_cleanup(void *private_data)
1383 int ret;
1385 ret = messaging_dgm_wipe();
1386 DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1387 ret ? strerror(ret) : "ok"));
1388 return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1389 60*15);
1392 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1394 struct messaging_context *msg = tevent_req_callback_data(
1395 req, struct messaging_context);
1396 NTSTATUS status;
1398 status = background_job_recv(req);
1399 TALLOC_FREE(req);
1400 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1401 nt_errstr(status)));
1403 req = background_job_send(
1404 msg, msg->event_ctx, msg, NULL, 0,
1405 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1406 60*15),
1407 mess_parent_dgm_cleanup, msg);
1408 if (req == NULL) {
1409 DEBUG(1, ("background_job_send failed\n"));
1410 return;
1412 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1415 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1417 int ret;
1419 if (pid == 0) {
1420 ret = messaging_dgm_wipe();
1421 } else {
1422 ret = messaging_dgm_cleanup(pid);
1425 return ret;
1428 struct tevent_context *messaging_tevent_context(
1429 struct messaging_context *msg_ctx)
1431 return msg_ctx->event_ctx;
1434 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1436 return msg_ctx->names_db;
1439 /** @} **/