dbchecker: improve verbose output of do_modify()
[Samba.git] / source3 / lib / messages.c
blobaa899142333ddae72c06b280286de81c7a72efb5
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messages_dgm.h"
56 #include "lib/util/iov_buf.h"
57 #include "lib/util/server_id_db.h"
58 #include "lib/messages_dgm_ref.h"
59 #include "lib/messages_ctdb.h"
60 #include "lib/messages_ctdb_ref.h"
61 #include "lib/messages_util.h"
62 #include "cluster_support.h"
63 #include "ctdbd_conn.h"
64 #include "ctdb_srvids.h"
66 #ifdef CLUSTER_SUPPORT
67 #include "ctdb_protocol.h"
68 #endif
70 struct messaging_callback {
71 struct messaging_callback *prev, *next;
72 uint32_t msg_type;
73 void (*fn)(struct messaging_context *msg, void *private_data,
74 uint32_t msg_type,
75 struct server_id server_id, DATA_BLOB *data);
76 void *private_data;
79 struct messaging_registered_ev {
80 struct tevent_context *ev;
81 struct tevent_immediate *im;
82 size_t refcount;
85 struct messaging_context {
86 struct server_id id;
87 struct tevent_context *event_ctx;
88 struct messaging_callback *callbacks;
90 struct messaging_rec *posted_msgs;
92 struct messaging_registered_ev *event_contexts;
94 struct tevent_req **new_waiters;
95 size_t num_new_waiters;
97 struct tevent_req **waiters;
98 size_t num_waiters;
100 void *msg_dgm_ref;
101 void *msg_ctdb_ref;
103 struct server_id_db *names_db;
106 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
107 struct messaging_rec *rec);
108 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
109 struct messaging_rec *rec);
110 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
111 struct tevent_context *ev,
112 struct messaging_rec *rec);
113 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
114 struct tevent_context *ev,
115 struct messaging_rec *rec);
117 /****************************************************************************
118 A useful function for testing the message system.
119 ****************************************************************************/
121 static void ping_message(struct messaging_context *msg_ctx,
122 void *private_data,
123 uint32_t msg_type,
124 struct server_id src,
125 DATA_BLOB *data)
127 struct server_id_buf idbuf;
129 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
130 server_id_str_buf(src, &idbuf), (int)data->length,
131 data->data ? (char *)data->data : ""));
133 messaging_send(msg_ctx, src, MSG_PONG, data);
136 struct messaging_rec *messaging_rec_create(
137 TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
138 uint32_t msg_type, const struct iovec *iov, int iovlen,
139 const int *fds, size_t num_fds)
141 ssize_t buflen;
142 uint8_t *buf;
143 struct messaging_rec *result;
145 if (num_fds > INT8_MAX) {
146 return NULL;
149 buflen = iov_buflen(iov, iovlen);
150 if (buflen == -1) {
151 return NULL;
153 buf = talloc_array(mem_ctx, uint8_t, buflen);
154 if (buf == NULL) {
155 return NULL;
157 iov_buf(iov, iovlen, buf, buflen);
160 struct messaging_rec rec;
161 int64_t fds64[num_fds];
162 size_t i;
164 for (i=0; i<num_fds; i++) {
165 fds64[i] = fds[i];
168 rec = (struct messaging_rec) {
169 .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
170 .src = src, .dest = dst,
171 .buf.data = buf, .buf.length = buflen,
172 .num_fds = num_fds, .fds = fds64,
175 result = messaging_rec_dup(mem_ctx, &rec);
178 TALLOC_FREE(buf);
180 return result;
183 static bool messaging_register_event_context(struct messaging_context *ctx,
184 struct tevent_context *ev)
186 size_t i, num_event_contexts;
187 struct messaging_registered_ev *free_reg = NULL;
188 struct messaging_registered_ev *tmp;
190 num_event_contexts = talloc_array_length(ctx->event_contexts);
192 for (i=0; i<num_event_contexts; i++) {
193 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
195 if (reg->refcount == 0) {
196 if (reg->ev != NULL) {
197 abort();
199 free_reg = reg;
201 * We continue here and may find another
202 * free_req, but the important thing is
203 * that we continue to search for an
204 * existing registration in the loop.
206 continue;
209 if (tevent_context_same_loop(reg->ev, ev)) {
210 reg->refcount += 1;
211 return true;
215 if (free_reg == NULL) {
216 struct tevent_immediate *im = NULL;
218 im = tevent_create_immediate(ctx);
219 if (im == NULL) {
220 return false;
223 tmp = talloc_realloc(ctx, ctx->event_contexts,
224 struct messaging_registered_ev,
225 num_event_contexts+1);
226 if (tmp == NULL) {
227 return false;
229 ctx->event_contexts = tmp;
231 free_reg = &ctx->event_contexts[num_event_contexts];
232 free_reg->im = talloc_move(ctx->event_contexts, &im);
236 * free_reg->im might be cached
238 free_reg->ev = ev;
239 free_reg->refcount = 1;
241 return true;
244 static bool messaging_deregister_event_context(struct messaging_context *ctx,
245 struct tevent_context *ev)
247 size_t i, num_event_contexts;
249 num_event_contexts = talloc_array_length(ctx->event_contexts);
251 for (i=0; i<num_event_contexts; i++) {
252 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
254 if (reg->refcount == 0) {
255 continue;
258 if (tevent_context_same_loop(reg->ev, ev)) {
259 reg->refcount -= 1;
261 if (reg->refcount == 0) {
263 * The primary event context
264 * is never unregistered using
265 * messaging_deregister_event_context()
266 * it's only registered using
267 * messaging_register_event_context().
269 SMB_ASSERT(ev != ctx->event_ctx);
270 SMB_ASSERT(reg->ev != ctx->event_ctx);
273 * Not strictly necessary, just
274 * paranoia
276 reg->ev = NULL;
279 * Do not talloc_free(reg->im),
280 * recycle immediates events.
282 * We just invalidate it using
283 * the primary event context,
284 * which is never unregistered.
286 tevent_schedule_immediate(reg->im,
287 ctx->event_ctx,
288 NULL, NULL);
290 return true;
293 return false;
296 static void messaging_post_main_event_context(struct tevent_context *ev,
297 struct tevent_immediate *im,
298 void *private_data)
300 struct messaging_context *ctx = talloc_get_type_abort(
301 private_data, struct messaging_context);
303 while (ctx->posted_msgs != NULL) {
304 struct messaging_rec *rec = ctx->posted_msgs;
305 bool consumed;
307 DLIST_REMOVE(ctx->posted_msgs, rec);
309 consumed = messaging_dispatch_classic(ctx, rec);
310 if (!consumed) {
311 consumed = messaging_dispatch_waiters(
312 ctx, ctx->event_ctx, rec);
315 if (!consumed) {
316 uint8_t i;
318 for (i=0; i<rec->num_fds; i++) {
319 close(rec->fds[i]);
323 TALLOC_FREE(rec);
327 static void messaging_post_sub_event_context(struct tevent_context *ev,
328 struct tevent_immediate *im,
329 void *private_data)
331 struct messaging_context *ctx = talloc_get_type_abort(
332 private_data, struct messaging_context);
333 struct messaging_rec *rec, *next;
335 for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
336 bool consumed;
338 next = rec->next;
340 consumed = messaging_dispatch_waiters(ctx, ev, rec);
341 if (consumed) {
342 DLIST_REMOVE(ctx->posted_msgs, rec);
343 TALLOC_FREE(rec);
348 static bool messaging_alert_event_contexts(struct messaging_context *ctx)
350 size_t i, num_event_contexts;
352 num_event_contexts = talloc_array_length(ctx->event_contexts);
354 for (i=0; i<num_event_contexts; i++) {
355 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
357 if (reg->refcount == 0) {
358 continue;
362 * We depend on schedule_immediate to work
363 * multiple times. Might be a bit inefficient,
364 * but this needs to be proven in tests. The
365 * alternatively would be to track whether the
366 * immediate has already been scheduled. For
367 * now, avoid that complexity here.
369 * reg->ev and ctx->event_ctx can't
370 * be wrapper tevent_context pointers
371 * so we don't need to use
372 * tevent_context_same_loop().
375 if (reg->ev == ctx->event_ctx) {
376 tevent_schedule_immediate(
377 reg->im, reg->ev,
378 messaging_post_main_event_context,
379 ctx);
380 } else {
381 tevent_schedule_immediate(
382 reg->im, reg->ev,
383 messaging_post_sub_event_context,
384 ctx);
388 return true;
391 static void messaging_recv_cb(struct tevent_context *ev,
392 const uint8_t *msg, size_t msg_len,
393 int *fds, size_t num_fds,
394 void *private_data)
396 struct messaging_context *msg_ctx = talloc_get_type_abort(
397 private_data, struct messaging_context);
398 struct server_id_buf idbuf;
399 struct messaging_rec rec;
400 int64_t fds64[MIN(num_fds, INT8_MAX)];
401 size_t i;
403 if (msg_len < MESSAGE_HDR_LENGTH) {
404 DBG_WARNING("message too short: %zu\n", msg_len);
405 goto close_fail;
408 if (num_fds > INT8_MAX) {
409 DBG_WARNING("too many fds: %zu\n", num_fds);
410 goto close_fail;
414 * "consume" the fds by copying them and setting
415 * the original variable to -1
417 for (i=0; i < num_fds; i++) {
418 fds64[i] = fds[i];
419 fds[i] = -1;
422 rec = (struct messaging_rec) {
423 .msg_version = MESSAGE_VERSION,
424 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
425 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
426 .num_fds = num_fds,
427 .fds = fds64,
430 message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
432 DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
433 (unsigned)rec.msg_type, rec.buf.length, num_fds,
434 server_id_str_buf(rec.src, &idbuf));
436 if (server_id_same_process(&rec.src, &msg_ctx->id)) {
437 DBG_DEBUG("Ignoring self-send\n");
438 goto close_fail;
441 messaging_dispatch_rec(msg_ctx, ev, &rec);
442 return;
444 close_fail:
445 for (i=0; i < num_fds; i++) {
446 close(fds[i]);
450 static int messaging_context_destructor(struct messaging_context *ctx)
452 size_t i;
454 for (i=0; i<ctx->num_new_waiters; i++) {
455 if (ctx->new_waiters[i] != NULL) {
456 tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
457 ctx->new_waiters[i] = NULL;
460 for (i=0; i<ctx->num_waiters; i++) {
461 if (ctx->waiters[i] != NULL) {
462 tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
463 ctx->waiters[i] = NULL;
468 * The immediates from messaging_alert_event_contexts
469 * reference "ctx". Don't let them outlive the
470 * messaging_context we're destroying here.
472 TALLOC_FREE(ctx->event_contexts);
474 return 0;
477 static const char *private_path(const char *name)
479 return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
482 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
483 struct tevent_context *ev,
484 struct messaging_context **pmsg_ctx)
486 TALLOC_CTX *frame;
487 struct messaging_context *ctx;
488 NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
489 int ret;
490 const char *lck_path;
491 const char *priv_path;
492 bool ok;
495 * sec_init() *must* be called before any other
496 * functions that use sec_XXX(). e.g. sec_initial_uid().
499 sec_init();
501 if (tevent_context_is_wrapper(ev)) {
502 /* This is really a programmer error! */
503 DBG_ERR("Should not be used with a wrapper tevent context\n");
504 return NT_STATUS_INVALID_PARAMETER;
507 lck_path = lock_path(talloc_tos(), "msg.lock");
508 if (lck_path == NULL) {
509 return NT_STATUS_NO_MEMORY;
512 ok = directory_create_or_exist_strict(lck_path,
513 sec_initial_uid(),
514 0755);
515 if (!ok) {
516 DBG_DEBUG("Could not create lock directory: %s\n",
517 strerror(errno));
518 return NT_STATUS_ACCESS_DENIED;
521 priv_path = private_path("msg.sock");
522 if (priv_path == NULL) {
523 return NT_STATUS_NO_MEMORY;
526 ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
527 0700);
528 if (!ok) {
529 DBG_DEBUG("Could not create msg directory: %s\n",
530 strerror(errno));
531 return NT_STATUS_ACCESS_DENIED;
534 frame = talloc_stackframe();
535 if (frame == NULL) {
536 return NT_STATUS_NO_MEMORY;
539 ctx = talloc_zero(frame, struct messaging_context);
540 if (ctx == NULL) {
541 status = NT_STATUS_NO_MEMORY;
542 goto done;
545 ctx->id = (struct server_id) {
546 .pid = getpid(), .vnn = NONCLUSTER_VNN
549 ctx->event_ctx = ev;
551 ok = messaging_register_event_context(ctx, ev);
552 if (!ok) {
553 status = NT_STATUS_NO_MEMORY;
554 goto done;
557 ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
558 ctx->event_ctx,
559 &ctx->id.unique_id,
560 priv_path,
561 lck_path,
562 messaging_recv_cb,
563 ctx,
564 &ret);
565 if (ctx->msg_dgm_ref == NULL) {
566 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
567 status = map_nt_error_from_unix(ret);
568 goto done;
570 talloc_set_destructor(ctx, messaging_context_destructor);
572 #ifdef CLUSTER_SUPPORT
573 if (lp_clustering()) {
574 ctx->msg_ctdb_ref = messaging_ctdb_ref(
575 ctx, ctx->event_ctx,
576 lp_ctdbd_socket(), lp_ctdb_timeout(),
577 ctx->id.unique_id, messaging_recv_cb, ctx, &ret);
578 if (ctx->msg_ctdb_ref == NULL) {
579 DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
580 strerror(ret));
581 status = map_nt_error_from_unix(ret);
582 goto done;
585 #endif
587 ctx->id.vnn = get_my_vnn();
589 ctx->names_db = server_id_db_init(ctx,
590 ctx->id,
591 lp_lock_directory(),
593 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
594 if (ctx->names_db == NULL) {
595 DBG_DEBUG("server_id_db_init failed\n");
596 status = NT_STATUS_NO_MEMORY;
597 goto done;
600 messaging_register(ctx, NULL, MSG_PING, ping_message);
602 /* Register some debugging related messages */
604 register_msg_pool_usage(ctx);
605 register_dmalloc_msgs(ctx);
606 debug_register_msgs(ctx);
609 struct server_id_buf tmp;
610 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
613 *pmsg_ctx = talloc_steal(mem_ctx, ctx);
615 status = NT_STATUS_OK;
616 done:
617 TALLOC_FREE(frame);
619 return status;
622 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
623 struct tevent_context *ev)
625 struct messaging_context *ctx = NULL;
626 NTSTATUS status;
628 status = messaging_init_internal(mem_ctx,
630 &ctx);
631 if (!NT_STATUS_IS_OK(status)) {
632 return NULL;
635 return ctx;
638 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
640 return msg_ctx->id;
644 * re-init after a fork
646 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
648 int ret;
649 char *lck_path;
651 TALLOC_FREE(msg_ctx->msg_dgm_ref);
652 TALLOC_FREE(msg_ctx->msg_ctdb_ref);
654 msg_ctx->id = (struct server_id) {
655 .pid = getpid(), .vnn = msg_ctx->id.vnn
658 lck_path = lock_path(talloc_tos(), "msg.lock");
659 if (lck_path == NULL) {
660 return NT_STATUS_NO_MEMORY;
663 msg_ctx->msg_dgm_ref = messaging_dgm_ref(
664 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
665 private_path("msg.sock"), lck_path,
666 messaging_recv_cb, msg_ctx, &ret);
668 if (msg_ctx->msg_dgm_ref == NULL) {
669 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
670 return map_nt_error_from_unix(ret);
673 if (lp_clustering()) {
674 msg_ctx->msg_ctdb_ref = messaging_ctdb_ref(
675 msg_ctx, msg_ctx->event_ctx,
676 lp_ctdbd_socket(), lp_ctdb_timeout(),
677 msg_ctx->id.unique_id, messaging_recv_cb, msg_ctx,
678 &ret);
679 if (msg_ctx->msg_ctdb_ref == NULL) {
680 DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
681 strerror(ret));
682 return map_nt_error_from_unix(ret);
686 server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
688 return NT_STATUS_OK;
693 * Register a dispatch function for a particular message type. Allow multiple
694 * registrants
696 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
697 void *private_data,
698 uint32_t msg_type,
699 void (*fn)(struct messaging_context *msg,
700 void *private_data,
701 uint32_t msg_type,
702 struct server_id server_id,
703 DATA_BLOB *data))
705 struct messaging_callback *cb;
707 DEBUG(5, ("Registering messaging pointer for type %u - "
708 "private_data=%p\n",
709 (unsigned)msg_type, private_data));
712 * Only one callback per type
715 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
716 /* we allow a second registration of the same message
717 type if it has a different private pointer. This is
718 needed in, for example, the internal notify code,
719 which creates a new notify context for each tree
720 connect, and expects to receive messages to each of
721 them. */
722 if (cb->msg_type == msg_type && private_data == cb->private_data) {
723 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
724 (unsigned)msg_type, private_data));
725 cb->fn = fn;
726 cb->private_data = private_data;
727 return NT_STATUS_OK;
731 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
732 return NT_STATUS_NO_MEMORY;
735 cb->msg_type = msg_type;
736 cb->fn = fn;
737 cb->private_data = private_data;
739 DLIST_ADD(msg_ctx->callbacks, cb);
740 return NT_STATUS_OK;
744 De-register the function for a particular message type.
746 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
747 void *private_data)
749 struct messaging_callback *cb, *next;
751 for (cb = ctx->callbacks; cb; cb = next) {
752 next = cb->next;
753 if ((cb->msg_type == msg_type)
754 && (cb->private_data == private_data)) {
755 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
756 (unsigned)msg_type, private_data));
757 DLIST_REMOVE(ctx->callbacks, cb);
758 TALLOC_FREE(cb);
764 Send a message to a particular server
766 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
767 struct server_id server, uint32_t msg_type,
768 const DATA_BLOB *data)
770 struct iovec iov = {0};
772 if (data != NULL) {
773 iov.iov_base = data->data;
774 iov.iov_len = data->length;
777 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
780 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
781 struct server_id server, uint32_t msg_type,
782 const uint8_t *buf, size_t len)
784 DATA_BLOB blob = data_blob_const(buf, len);
785 return messaging_send(msg_ctx, server, msg_type, &blob);
788 static int messaging_post_self(struct messaging_context *msg_ctx,
789 struct server_id src, struct server_id dst,
790 uint32_t msg_type,
791 const struct iovec *iov, int iovlen,
792 const int *fds, size_t num_fds)
794 struct messaging_rec *rec;
795 bool ok;
797 rec = messaging_rec_create(
798 msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
799 if (rec == NULL) {
800 return ENOMEM;
803 ok = messaging_alert_event_contexts(msg_ctx);
804 if (!ok) {
805 TALLOC_FREE(rec);
806 return ENOMEM;
809 DLIST_ADD_END(msg_ctx->posted_msgs, rec);
811 return 0;
814 int messaging_send_iov_from(struct messaging_context *msg_ctx,
815 struct server_id src, struct server_id dst,
816 uint32_t msg_type,
817 const struct iovec *iov, int iovlen,
818 const int *fds, size_t num_fds)
820 int ret;
821 uint8_t hdr[MESSAGE_HDR_LENGTH];
822 struct iovec iov2[iovlen+1];
824 if (server_id_is_disconnected(&dst)) {
825 return EINVAL;
828 if (num_fds > INT8_MAX) {
829 return EINVAL;
832 if (server_id_equal(&dst, &msg_ctx->id)) {
833 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
834 iov, iovlen, fds, num_fds);
835 return ret;
838 message_hdr_put(hdr, msg_type, src, dst);
839 iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
840 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
842 if (dst.vnn != msg_ctx->id.vnn) {
843 if (num_fds > 0) {
844 return ENOSYS;
847 ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
848 return ret;
851 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
853 if (ret == EACCES) {
854 become_root();
855 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
856 fds, num_fds);
857 unbecome_root();
860 if (ret == ECONNREFUSED) {
862 * Linux returns this when a socket exists in the file
863 * system without a listening process. This is not
864 * documented in susv4 or the linux manpages, but it's
865 * easily testable. For the higher levels this is the
866 * same as "destination does not exist"
868 ret = ENOENT;
871 return ret;
874 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
875 struct server_id dst, uint32_t msg_type,
876 const struct iovec *iov, int iovlen,
877 const int *fds, size_t num_fds)
879 int ret;
881 ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
882 iov, iovlen, fds, num_fds);
883 if (ret != 0) {
884 return map_nt_error_from_unix(ret);
886 return NT_STATUS_OK;
889 struct send_all_state {
890 struct messaging_context *msg_ctx;
891 int msg_type;
892 const void *buf;
893 size_t len;
896 static int send_all_fn(pid_t pid, void *private_data)
898 struct send_all_state *state = private_data;
899 NTSTATUS status;
901 if (pid == getpid()) {
902 DBG_DEBUG("Skip ourselves in messaging_send_all\n");
903 return 0;
906 status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
907 state->msg_type, state->buf, state->len);
908 if (!NT_STATUS_IS_OK(status)) {
909 DBG_WARNING("messaging_send_buf to %ju failed: %s\n",
910 (uintmax_t)pid, nt_errstr(status));
913 return 0;
916 void messaging_send_all(struct messaging_context *msg_ctx,
917 int msg_type, const void *buf, size_t len)
919 struct send_all_state state = {
920 .msg_ctx = msg_ctx, .msg_type = msg_type,
921 .buf = buf, .len = len
923 int ret;
925 #ifdef CLUSTER_SUPPORT
926 if (lp_clustering()) {
927 struct ctdbd_connection *conn = messaging_ctdb_connection();
928 uint8_t msghdr[MESSAGE_HDR_LENGTH];
929 struct iovec iov[] = {
930 { .iov_base = msghdr,
931 .iov_len = sizeof(msghdr) },
932 { .iov_base = discard_const_p(void, buf),
933 .iov_len = len }
936 message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
937 (struct server_id) {0});
939 ret = ctdbd_messaging_send_iov(
940 conn, CTDB_BROADCAST_CONNECTED,
941 CTDB_SRVID_SAMBA_PROCESS,
942 iov, ARRAY_SIZE(iov));
943 if (ret != 0) {
944 DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
945 strerror(ret));
948 return;
950 #endif
952 ret = messaging_dgm_forall(send_all_fn, &state);
953 if (ret != 0) {
954 DBG_WARNING("messaging_dgm_forall failed: %s\n",
955 strerror(ret));
959 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
960 struct messaging_rec *rec)
962 struct messaging_rec *result;
963 size_t fds_size = sizeof(int64_t) * rec->num_fds;
964 size_t payload_len;
966 payload_len = rec->buf.length + fds_size;
967 if (payload_len < rec->buf.length) {
968 /* overflow */
969 return NULL;
972 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
973 payload_len);
974 if (result == NULL) {
975 return NULL;
977 *result = *rec;
979 /* Doesn't fail, see talloc_pooled_object */
981 result->buf.data = talloc_memdup(result, rec->buf.data,
982 rec->buf.length);
984 result->fds = NULL;
985 if (result->num_fds > 0) {
986 result->fds = talloc_memdup(result, rec->fds, fds_size);
989 return result;
992 struct messaging_filtered_read_state {
993 struct tevent_context *ev;
994 struct messaging_context *msg_ctx;
995 struct messaging_dgm_fde *fde;
996 struct messaging_ctdb_fde *cluster_fde;
998 bool (*filter)(struct messaging_rec *rec, void *private_data);
999 void *private_data;
1001 struct messaging_rec *rec;
1004 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1005 enum tevent_req_state req_state);
1007 struct tevent_req *messaging_filtered_read_send(
1008 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1009 struct messaging_context *msg_ctx,
1010 bool (*filter)(struct messaging_rec *rec, void *private_data),
1011 void *private_data)
1013 struct tevent_req *req;
1014 struct messaging_filtered_read_state *state;
1015 size_t new_waiters_len;
1016 bool ok;
1018 req = tevent_req_create(mem_ctx, &state,
1019 struct messaging_filtered_read_state);
1020 if (req == NULL) {
1021 return NULL;
1023 state->ev = ev;
1024 state->msg_ctx = msg_ctx;
1025 state->filter = filter;
1026 state->private_data = private_data;
1028 if (tevent_context_is_wrapper(ev) &&
1029 !tevent_context_same_loop(ev, msg_ctx->event_ctx))
1031 /* This is really a programmer error! */
1032 DBG_ERR("Wrapper tevent context doesn't use main context.\n");
1033 tevent_req_error(req, EINVAL);
1034 return tevent_req_post(req, ev);
1038 * We have to defer the callback here, as we might be called from
1039 * within a different tevent_context than state->ev.
1041 * This is important for two cases:
1042 * 1. nested event contexts, used by blocking ctdb calls
1043 * 2. possible impersonation using wrapper tevent contexts.
1045 tevent_req_defer_callback(req, state->ev);
1047 state->fde = messaging_dgm_register_tevent_context(state, ev);
1048 if (tevent_req_nomem(state->fde, req)) {
1049 return tevent_req_post(req, ev);
1052 if (lp_clustering()) {
1053 state->cluster_fde =
1054 messaging_ctdb_register_tevent_context(state, ev);
1055 if (tevent_req_nomem(state->cluster_fde, req)) {
1056 return tevent_req_post(req, ev);
1061 * We add ourselves to the "new_waiters" array, not the "waiters"
1062 * array. If we are called from within messaging_read_done,
1063 * messaging_dispatch_rec will be in an active for-loop on
1064 * "waiters". We must be careful not to mess with this array, because
1065 * it could mean that a single event is being delivered twice.
1068 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
1070 if (new_waiters_len == msg_ctx->num_new_waiters) {
1071 struct tevent_req **tmp;
1073 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
1074 struct tevent_req *, new_waiters_len+1);
1075 if (tevent_req_nomem(tmp, req)) {
1076 return tevent_req_post(req, ev);
1078 msg_ctx->new_waiters = tmp;
1081 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
1082 msg_ctx->num_new_waiters += 1;
1083 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
1085 ok = messaging_register_event_context(msg_ctx, ev);
1086 if (!ok) {
1087 tevent_req_oom(req);
1088 return tevent_req_post(req, ev);
1091 return req;
1094 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1095 enum tevent_req_state req_state)
1097 struct messaging_filtered_read_state *state = tevent_req_data(
1098 req, struct messaging_filtered_read_state);
1099 struct messaging_context *msg_ctx = state->msg_ctx;
1100 size_t i;
1101 bool ok;
1103 tevent_req_set_cleanup_fn(req, NULL);
1105 TALLOC_FREE(state->fde);
1106 TALLOC_FREE(state->cluster_fde);
1108 ok = messaging_deregister_event_context(msg_ctx, state->ev);
1109 if (!ok) {
1110 abort();
1114 * Just set the [new_]waiters entry to NULL, be careful not to mess
1115 * with the other "waiters" array contents. We are often called from
1116 * within "messaging_dispatch_rec", which loops over
1117 * "waiters". Messing with the "waiters" array will mess up that
1118 * for-loop.
1121 for (i=0; i<msg_ctx->num_waiters; i++) {
1122 if (msg_ctx->waiters[i] == req) {
1123 msg_ctx->waiters[i] = NULL;
1124 return;
1128 for (i=0; i<msg_ctx->num_new_waiters; i++) {
1129 if (msg_ctx->new_waiters[i] == req) {
1130 msg_ctx->new_waiters[i] = NULL;
1131 return;
1136 static void messaging_filtered_read_done(struct tevent_req *req,
1137 struct messaging_rec *rec)
1139 struct messaging_filtered_read_state *state = tevent_req_data(
1140 req, struct messaging_filtered_read_state);
1142 state->rec = messaging_rec_dup(state, rec);
1143 if (tevent_req_nomem(state->rec, req)) {
1144 return;
1146 tevent_req_done(req);
1149 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1150 struct messaging_rec **presult)
1152 struct messaging_filtered_read_state *state = tevent_req_data(
1153 req, struct messaging_filtered_read_state);
1154 int err;
1156 if (tevent_req_is_unix_error(req, &err)) {
1157 tevent_req_received(req);
1158 return err;
1160 if (presult != NULL) {
1161 *presult = talloc_move(mem_ctx, &state->rec);
1163 return 0;
1166 struct messaging_read_state {
1167 uint32_t msg_type;
1168 struct messaging_rec *rec;
1171 static bool messaging_read_filter(struct messaging_rec *rec,
1172 void *private_data);
1173 static void messaging_read_done(struct tevent_req *subreq);
1175 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
1176 struct tevent_context *ev,
1177 struct messaging_context *msg,
1178 uint32_t msg_type)
1180 struct tevent_req *req, *subreq;
1181 struct messaging_read_state *state;
1183 req = tevent_req_create(mem_ctx, &state,
1184 struct messaging_read_state);
1185 if (req == NULL) {
1186 return NULL;
1188 state->msg_type = msg_type;
1190 subreq = messaging_filtered_read_send(state, ev, msg,
1191 messaging_read_filter, state);
1192 if (tevent_req_nomem(subreq, req)) {
1193 return tevent_req_post(req, ev);
1195 tevent_req_set_callback(subreq, messaging_read_done, req);
1196 return req;
1199 static bool messaging_read_filter(struct messaging_rec *rec,
1200 void *private_data)
1202 struct messaging_read_state *state = talloc_get_type_abort(
1203 private_data, struct messaging_read_state);
1205 if (rec->num_fds != 0) {
1206 return false;
1209 return rec->msg_type == state->msg_type;
1212 static void messaging_read_done(struct tevent_req *subreq)
1214 struct tevent_req *req = tevent_req_callback_data(
1215 subreq, struct tevent_req);
1216 struct messaging_read_state *state = tevent_req_data(
1217 req, struct messaging_read_state);
1218 int ret;
1220 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
1221 TALLOC_FREE(subreq);
1222 if (tevent_req_error(req, ret)) {
1223 return;
1225 tevent_req_done(req);
1228 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1229 struct messaging_rec **presult)
1231 struct messaging_read_state *state = tevent_req_data(
1232 req, struct messaging_read_state);
1233 int err;
1235 if (tevent_req_is_unix_error(req, &err)) {
1236 return err;
1238 if (presult != NULL) {
1239 *presult = talloc_move(mem_ctx, &state->rec);
1241 return 0;
1244 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
1246 if (msg_ctx->num_new_waiters == 0) {
1247 return true;
1250 if (talloc_array_length(msg_ctx->waiters) <
1251 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
1252 struct tevent_req **tmp;
1253 tmp = talloc_realloc(
1254 msg_ctx, msg_ctx->waiters, struct tevent_req *,
1255 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1256 if (tmp == NULL) {
1257 DEBUG(1, ("%s: talloc failed\n", __func__));
1258 return false;
1260 msg_ctx->waiters = tmp;
1263 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1264 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1266 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1267 msg_ctx->num_new_waiters = 0;
1269 return true;
1272 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1273 struct messaging_rec *rec)
1275 struct messaging_callback *cb, *next;
1277 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1278 size_t j;
1280 next = cb->next;
1281 if (cb->msg_type != rec->msg_type) {
1282 continue;
1286 * the old style callbacks don't support fd passing
1288 for (j=0; j < rec->num_fds; j++) {
1289 int fd = rec->fds[j];
1290 close(fd);
1292 rec->num_fds = 0;
1293 rec->fds = NULL;
1295 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1296 rec->src, &rec->buf);
1298 return true;
1301 return false;
1304 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
1305 struct tevent_context *ev,
1306 struct messaging_rec *rec)
1308 size_t i;
1310 if (!messaging_append_new_waiters(msg_ctx)) {
1311 return false;
1314 i = 0;
1315 while (i < msg_ctx->num_waiters) {
1316 struct tevent_req *req;
1317 struct messaging_filtered_read_state *state;
1319 req = msg_ctx->waiters[i];
1320 if (req == NULL) {
1322 * This got cleaned up. In the meantime,
1323 * move everything down one. We need
1324 * to keep the order of waiters, as
1325 * other code may depend on this.
1327 if (i < msg_ctx->num_waiters - 1) {
1328 memmove(&msg_ctx->waiters[i],
1329 &msg_ctx->waiters[i+1],
1330 sizeof(struct tevent_req *) *
1331 (msg_ctx->num_waiters - i - 1));
1333 msg_ctx->num_waiters -= 1;
1334 continue;
1337 state = tevent_req_data(
1338 req, struct messaging_filtered_read_state);
1339 if (tevent_context_same_loop(ev, state->ev) &&
1340 state->filter(rec, state->private_data)) {
1341 messaging_filtered_read_done(req, rec);
1342 return true;
1345 i += 1;
1348 return false;
1352 Dispatch one messaging_rec
1354 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1355 struct tevent_context *ev,
1356 struct messaging_rec *rec)
1358 bool consumed;
1359 size_t i;
1362 * ev and msg_ctx->event_ctx can't be wrapper tevent_context pointers
1363 * so we don't need to use tevent_context_same_loop().
1366 if (ev == msg_ctx->event_ctx) {
1367 consumed = messaging_dispatch_classic(msg_ctx, rec);
1368 if (consumed) {
1369 return;
1373 consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
1374 if (consumed) {
1375 return;
1378 if (ev != msg_ctx->event_ctx) {
1379 struct iovec iov;
1380 int fds[rec->num_fds];
1381 int ret;
1384 * We've been listening on a nested event
1385 * context. Messages need to be handled in the main
1386 * event context, so post to ourselves
1389 iov.iov_base = rec->buf.data;
1390 iov.iov_len = rec->buf.length;
1392 for (i=0; i<rec->num_fds; i++) {
1393 fds[i] = rec->fds[i];
1396 ret = messaging_post_self(
1397 msg_ctx, rec->src, rec->dest, rec->msg_type,
1398 &iov, 1, fds, rec->num_fds);
1399 if (ret == 0) {
1400 return;
1405 * If the fd-array isn't used, just close it.
1407 for (i=0; i < rec->num_fds; i++) {
1408 int fd = rec->fds[i];
1409 close(fd);
1411 rec->num_fds = 0;
1412 rec->fds = NULL;
1415 static int mess_parent_dgm_cleanup(void *private_data);
1416 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1418 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1420 struct tevent_req *req;
1422 req = background_job_send(
1423 msg, msg->event_ctx, msg, NULL, 0,
1424 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1425 60*15),
1426 mess_parent_dgm_cleanup, msg);
1427 if (req == NULL) {
1428 DBG_WARNING("background_job_send failed\n");
1429 return false;
1431 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1432 return true;
1435 static int mess_parent_dgm_cleanup(void *private_data)
1437 int ret;
1439 ret = messaging_dgm_wipe();
1440 DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1441 ret ? strerror(ret) : "ok"));
1442 return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1443 60*15);
1446 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1448 struct messaging_context *msg = tevent_req_callback_data(
1449 req, struct messaging_context);
1450 NTSTATUS status;
1452 status = background_job_recv(req);
1453 TALLOC_FREE(req);
1454 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1455 nt_errstr(status)));
1457 req = background_job_send(
1458 msg, msg->event_ctx, msg, NULL, 0,
1459 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1460 60*15),
1461 mess_parent_dgm_cleanup, msg);
1462 if (req == NULL) {
1463 DEBUG(1, ("background_job_send failed\n"));
1464 return;
1466 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1469 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1471 int ret;
1473 if (pid == 0) {
1474 ret = messaging_dgm_wipe();
1475 } else {
1476 ret = messaging_dgm_cleanup(pid);
1479 return ret;
1482 struct tevent_context *messaging_tevent_context(
1483 struct messaging_context *msg_ctx)
1485 return msg_ctx->event_ctx;
1488 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1490 return msg_ctx->names_db;
1493 /** @} **/