Revert "s3:messages: allow messaging_filtered_read_send() to use wrapper tevent_context"
[Samba.git] / source3 / lib / messages.c
blob864d758fb13ec4b1f72e4fa02cb0abb1e70e6d27
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a messaging_register() of a
32 dispatch function, and use messaging_send() to send messages to
33 that process.
35 The dispatch function is given the server_id of the sender, and it
36 can use that to reply by messaging_send(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messages_dgm.h"
56 #include "lib/util/iov_buf.h"
57 #include "lib/util/server_id_db.h"
58 #include "lib/messages_dgm_ref.h"
59 #include "lib/messages_ctdb.h"
60 #include "lib/messages_ctdb_ref.h"
61 #include "lib/messages_util.h"
62 #include "cluster_support.h"
63 #include "ctdbd_conn.h"
64 #include "ctdb_srvids.h"
66 #ifdef CLUSTER_SUPPORT
67 #include "ctdb_protocol.h"
68 #endif
70 struct messaging_callback {
71 struct messaging_callback *prev, *next;
72 uint32_t msg_type;
73 void (*fn)(struct messaging_context *msg, void *private_data,
74 uint32_t msg_type,
75 struct server_id server_id, DATA_BLOB *data);
76 void *private_data;
/*
 * Bookkeeping for one tevent context that has readers attached.
 * A slot with refcount == 0 is unused and may be recycled; its
 * immediate event is kept for reuse.
 */
struct messaging_registered_ev {
	struct tevent_context *ev;
	struct tevent_immediate *im;
	size_t refcount;
};
85 struct messaging_context {
86 struct server_id id;
87 struct tevent_context *event_ctx;
88 struct messaging_callback *callbacks;
90 struct messaging_rec *posted_msgs;
92 struct messaging_registered_ev *event_contexts;
94 struct tevent_req **new_waiters;
95 size_t num_new_waiters;
97 struct tevent_req **waiters;
98 size_t num_waiters;
100 void *msg_dgm_ref;
101 void *msg_ctdb_ref;
103 struct server_id_db *names_db;
106 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
107 struct messaging_rec *rec);
108 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
109 struct messaging_rec *rec);
110 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
111 struct tevent_context *ev,
112 struct messaging_rec *rec);
113 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
114 struct tevent_context *ev,
115 struct messaging_rec *rec);
117 /****************************************************************************
118 A useful function for testing the message system.
119 ****************************************************************************/
121 static void ping_message(struct messaging_context *msg_ctx,
122 void *private_data,
123 uint32_t msg_type,
124 struct server_id src,
125 DATA_BLOB *data)
127 struct server_id_buf idbuf;
129 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
130 server_id_str_buf(src, &idbuf), (int)data->length,
131 data->data ? (char *)data->data : ""));
133 messaging_send(msg_ctx, src, MSG_PONG, data);
136 struct messaging_rec *messaging_rec_create(
137 TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
138 uint32_t msg_type, const struct iovec *iov, int iovlen,
139 const int *fds, size_t num_fds)
141 ssize_t buflen;
142 uint8_t *buf;
143 struct messaging_rec *result;
145 if (num_fds > INT8_MAX) {
146 return NULL;
149 buflen = iov_buflen(iov, iovlen);
150 if (buflen == -1) {
151 return NULL;
153 buf = talloc_array(mem_ctx, uint8_t, buflen);
154 if (buf == NULL) {
155 return NULL;
157 iov_buf(iov, iovlen, buf, buflen);
160 struct messaging_rec rec;
161 int64_t fds64[num_fds];
162 size_t i;
164 for (i=0; i<num_fds; i++) {
165 fds64[i] = fds[i];
168 rec = (struct messaging_rec) {
169 .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
170 .src = src, .dest = dst,
171 .buf.data = buf, .buf.length = buflen,
172 .num_fds = num_fds, .fds = fds64,
175 result = messaging_rec_dup(mem_ctx, &rec);
178 TALLOC_FREE(buf);
180 return result;
183 static bool messaging_register_event_context(struct messaging_context *ctx,
184 struct tevent_context *ev)
186 size_t i, num_event_contexts;
187 struct messaging_registered_ev *free_reg = NULL;
188 struct messaging_registered_ev *tmp;
190 num_event_contexts = talloc_array_length(ctx->event_contexts);
192 for (i=0; i<num_event_contexts; i++) {
193 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
195 if (reg->refcount == 0) {
196 if (reg->ev != NULL) {
197 abort();
199 free_reg = reg;
201 * We continue here and may find another
202 * free_req, but the important thing is
203 * that we continue to search for an
204 * existing registration in the loop.
206 continue;
209 if (reg->ev == ev) {
210 reg->refcount += 1;
211 return true;
215 if (free_reg == NULL) {
216 struct tevent_immediate *im = NULL;
218 im = tevent_create_immediate(ctx);
219 if (im == NULL) {
220 return false;
223 tmp = talloc_realloc(ctx, ctx->event_contexts,
224 struct messaging_registered_ev,
225 num_event_contexts+1);
226 if (tmp == NULL) {
227 return false;
229 ctx->event_contexts = tmp;
231 free_reg = &ctx->event_contexts[num_event_contexts];
232 free_reg->im = talloc_move(ctx->event_contexts, &im);
236 * free_reg->im might be cached
238 free_reg->ev = ev;
239 free_reg->refcount = 1;
241 return true;
244 static bool messaging_deregister_event_context(struct messaging_context *ctx,
245 struct tevent_context *ev)
247 size_t i, num_event_contexts;
249 num_event_contexts = talloc_array_length(ctx->event_contexts);
251 for (i=0; i<num_event_contexts; i++) {
252 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
254 if (reg->refcount == 0) {
255 continue;
258 if (reg->ev == ev) {
259 reg->refcount -= 1;
261 if (reg->refcount == 0) {
263 * The primary event context
264 * is never unregistered using
265 * messaging_deregister_event_context()
266 * it's only registered using
267 * messaging_register_event_context().
269 SMB_ASSERT(ev != ctx->event_ctx);
270 SMB_ASSERT(reg->ev != ctx->event_ctx);
273 * Not strictly necessary, just
274 * paranoia
276 reg->ev = NULL;
279 * Do not talloc_free(reg->im),
280 * recycle immediates events.
282 * We just invalidate it using
283 * the primary event context,
284 * which is never unregistered.
286 tevent_schedule_immediate(reg->im,
287 ctx->event_ctx,
288 NULL, NULL);
290 return true;
293 return false;
296 static void messaging_post_main_event_context(struct tevent_context *ev,
297 struct tevent_immediate *im,
298 void *private_data)
300 struct messaging_context *ctx = talloc_get_type_abort(
301 private_data, struct messaging_context);
303 while (ctx->posted_msgs != NULL) {
304 struct messaging_rec *rec = ctx->posted_msgs;
305 bool consumed;
307 DLIST_REMOVE(ctx->posted_msgs, rec);
309 consumed = messaging_dispatch_classic(ctx, rec);
310 if (!consumed) {
311 consumed = messaging_dispatch_waiters(
312 ctx, ctx->event_ctx, rec);
315 if (!consumed) {
316 uint8_t i;
318 for (i=0; i<rec->num_fds; i++) {
319 close(rec->fds[i]);
323 TALLOC_FREE(rec);
327 static void messaging_post_sub_event_context(struct tevent_context *ev,
328 struct tevent_immediate *im,
329 void *private_data)
331 struct messaging_context *ctx = talloc_get_type_abort(
332 private_data, struct messaging_context);
333 struct messaging_rec *rec, *next;
335 for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
336 bool consumed;
338 next = rec->next;
340 consumed = messaging_dispatch_waiters(ctx, ev, rec);
341 if (consumed) {
342 DLIST_REMOVE(ctx->posted_msgs, rec);
343 TALLOC_FREE(rec);
348 static bool messaging_alert_event_contexts(struct messaging_context *ctx)
350 size_t i, num_event_contexts;
352 num_event_contexts = talloc_array_length(ctx->event_contexts);
354 for (i=0; i<num_event_contexts; i++) {
355 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
357 if (reg->refcount == 0) {
358 continue;
362 * We depend on schedule_immediate to work
363 * multiple times. Might be a bit inefficient,
364 * but this needs to be proven in tests. The
365 * alternatively would be to track whether the
366 * immediate has already been scheduled. For
367 * now, avoid that complexity here.
369 * reg->ev and ctx->event_ctx can't
370 * be wrapper tevent_context pointers
371 * so we don't need to use
372 * tevent_context_same_loop().
375 if (reg->ev == ctx->event_ctx) {
376 tevent_schedule_immediate(
377 reg->im, reg->ev,
378 messaging_post_main_event_context,
379 ctx);
380 } else {
381 tevent_schedule_immediate(
382 reg->im, reg->ev,
383 messaging_post_sub_event_context,
384 ctx);
388 return true;
391 static void messaging_recv_cb(struct tevent_context *ev,
392 const uint8_t *msg, size_t msg_len,
393 int *fds, size_t num_fds,
394 void *private_data)
396 struct messaging_context *msg_ctx = talloc_get_type_abort(
397 private_data, struct messaging_context);
398 struct server_id_buf idbuf;
399 struct messaging_rec rec;
400 int64_t fds64[MIN(num_fds, INT8_MAX)];
401 size_t i;
403 if (msg_len < MESSAGE_HDR_LENGTH) {
404 DBG_WARNING("message too short: %zu\n", msg_len);
405 goto close_fail;
408 if (num_fds > INT8_MAX) {
409 DBG_WARNING("too many fds: %zu\n", num_fds);
410 goto close_fail;
414 * "consume" the fds by copying them and setting
415 * the original variable to -1
417 for (i=0; i < num_fds; i++) {
418 fds64[i] = fds[i];
419 fds[i] = -1;
422 rec = (struct messaging_rec) {
423 .msg_version = MESSAGE_VERSION,
424 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
425 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
426 .num_fds = num_fds,
427 .fds = fds64,
430 message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
432 DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
433 (unsigned)rec.msg_type, rec.buf.length, num_fds,
434 server_id_str_buf(rec.src, &idbuf));
436 if (server_id_same_process(&rec.src, &msg_ctx->id)) {
437 DBG_DEBUG("Ignoring self-send\n");
438 goto close_fail;
441 messaging_dispatch_rec(msg_ctx, ev, &rec);
442 return;
444 close_fail:
445 for (i=0; i < num_fds; i++) {
446 close(fds[i]);
450 static int messaging_context_destructor(struct messaging_context *ctx)
452 size_t i;
454 for (i=0; i<ctx->num_new_waiters; i++) {
455 if (ctx->new_waiters[i] != NULL) {
456 tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
457 ctx->new_waiters[i] = NULL;
460 for (i=0; i<ctx->num_waiters; i++) {
461 if (ctx->waiters[i] != NULL) {
462 tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
463 ctx->waiters[i] = NULL;
468 * The immediates from messaging_alert_event_contexts
469 * reference "ctx". Don't let them outlive the
470 * messaging_context we're destroying here.
472 TALLOC_FREE(ctx->event_contexts);
474 return 0;
/*
 * Return "<private dir>/<name>", allocated on talloc_tos().
 * The result is NULL if talloc_asprintf() fails.
 */
static const char *private_path(const char *name)
{
	const char *path;

	path = talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
	return path;
}
482 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
483 struct tevent_context *ev,
484 struct messaging_context **pmsg_ctx)
486 TALLOC_CTX *frame;
487 struct messaging_context *ctx;
488 NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
489 int ret;
490 const char *lck_path;
491 const char *priv_path;
492 bool ok;
495 * sec_init() *must* be called before any other
496 * functions that use sec_XXX(). e.g. sec_initial_uid().
499 sec_init();
501 if (tevent_context_is_wrapper(ev)) {
502 /* This is really a programmer error! */
503 DBG_ERR("Should not be used with a wrapper tevent context\n");
504 return NT_STATUS_INVALID_PARAMETER;
507 lck_path = lock_path(talloc_tos(), "msg.lock");
508 if (lck_path == NULL) {
509 return NT_STATUS_NO_MEMORY;
512 ok = directory_create_or_exist_strict(lck_path,
513 sec_initial_uid(),
514 0755);
515 if (!ok) {
516 DBG_DEBUG("Could not create lock directory: %s\n",
517 strerror(errno));
518 return NT_STATUS_ACCESS_DENIED;
521 priv_path = private_path("msg.sock");
522 if (priv_path == NULL) {
523 return NT_STATUS_NO_MEMORY;
526 ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
527 0700);
528 if (!ok) {
529 DBG_DEBUG("Could not create msg directory: %s\n",
530 strerror(errno));
531 return NT_STATUS_ACCESS_DENIED;
534 frame = talloc_stackframe();
535 if (frame == NULL) {
536 return NT_STATUS_NO_MEMORY;
539 ctx = talloc_zero(frame, struct messaging_context);
540 if (ctx == NULL) {
541 status = NT_STATUS_NO_MEMORY;
542 goto done;
545 ctx->id = (struct server_id) {
546 .pid = getpid(), .vnn = NONCLUSTER_VNN
549 ctx->event_ctx = ev;
551 ok = messaging_register_event_context(ctx, ev);
552 if (!ok) {
553 status = NT_STATUS_NO_MEMORY;
554 goto done;
557 ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
558 ctx->event_ctx,
559 &ctx->id.unique_id,
560 priv_path,
561 lck_path,
562 messaging_recv_cb,
563 ctx,
564 &ret);
565 if (ctx->msg_dgm_ref == NULL) {
566 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
567 status = map_nt_error_from_unix(ret);
568 goto done;
570 talloc_set_destructor(ctx, messaging_context_destructor);
572 #ifdef CLUSTER_SUPPORT
573 if (lp_clustering()) {
574 ctx->msg_ctdb_ref = messaging_ctdb_ref(
575 ctx, ctx->event_ctx,
576 lp_ctdbd_socket(), lp_ctdb_timeout(),
577 ctx->id.unique_id, messaging_recv_cb, ctx, &ret);
578 if (ctx->msg_ctdb_ref == NULL) {
579 DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
580 strerror(ret));
581 status = map_nt_error_from_unix(ret);
582 goto done;
585 #endif
587 ctx->id.vnn = get_my_vnn();
589 ctx->names_db = server_id_db_init(ctx,
590 ctx->id,
591 lp_lock_directory(),
593 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
594 if (ctx->names_db == NULL) {
595 DBG_DEBUG("server_id_db_init failed\n");
596 status = NT_STATUS_NO_MEMORY;
597 goto done;
600 messaging_register(ctx, NULL, MSG_PING, ping_message);
602 /* Register some debugging related messages */
604 register_msg_pool_usage(ctx);
605 register_dmalloc_msgs(ctx);
606 debug_register_msgs(ctx);
609 struct server_id_buf tmp;
610 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
613 *pmsg_ctx = talloc_steal(mem_ctx, ctx);
615 status = NT_STATUS_OK;
616 done:
617 TALLOC_FREE(frame);
619 return status;
622 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
623 struct tevent_context *ev)
625 struct messaging_context *ctx = NULL;
626 NTSTATUS status;
628 status = messaging_init_internal(mem_ctx,
630 &ctx);
631 if (!NT_STATUS_IS_OK(status)) {
632 return NULL;
635 return ctx;
638 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
640 return msg_ctx->id;
644 * re-init after a fork
646 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
648 int ret;
649 char *lck_path;
651 TALLOC_FREE(msg_ctx->msg_dgm_ref);
652 TALLOC_FREE(msg_ctx->msg_ctdb_ref);
654 msg_ctx->id = (struct server_id) {
655 .pid = getpid(), .vnn = msg_ctx->id.vnn
658 lck_path = lock_path(talloc_tos(), "msg.lock");
659 if (lck_path == NULL) {
660 return NT_STATUS_NO_MEMORY;
663 msg_ctx->msg_dgm_ref = messaging_dgm_ref(
664 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
665 private_path("msg.sock"), lck_path,
666 messaging_recv_cb, msg_ctx, &ret);
668 if (msg_ctx->msg_dgm_ref == NULL) {
669 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
670 return map_nt_error_from_unix(ret);
673 if (lp_clustering()) {
674 msg_ctx->msg_ctdb_ref = messaging_ctdb_ref(
675 msg_ctx, msg_ctx->event_ctx,
676 lp_ctdbd_socket(), lp_ctdb_timeout(),
677 msg_ctx->id.unique_id, messaging_recv_cb, msg_ctx,
678 &ret);
679 if (msg_ctx->msg_ctdb_ref == NULL) {
680 DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
681 strerror(ret));
682 return map_nt_error_from_unix(ret);
686 server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
688 return NT_STATUS_OK;
693 * Register a dispatch function for a particular message type. Allow multiple
694 * registrants
696 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
697 void *private_data,
698 uint32_t msg_type,
699 void (*fn)(struct messaging_context *msg,
700 void *private_data,
701 uint32_t msg_type,
702 struct server_id server_id,
703 DATA_BLOB *data))
705 struct messaging_callback *cb;
707 DEBUG(5, ("Registering messaging pointer for type %u - "
708 "private_data=%p\n",
709 (unsigned)msg_type, private_data));
712 * Only one callback per type
715 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
716 /* we allow a second registration of the same message
717 type if it has a different private pointer. This is
718 needed in, for example, the internal notify code,
719 which creates a new notify context for each tree
720 connect, and expects to receive messages to each of
721 them. */
722 if (cb->msg_type == msg_type && private_data == cb->private_data) {
723 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
724 (unsigned)msg_type, private_data));
725 cb->fn = fn;
726 cb->private_data = private_data;
727 return NT_STATUS_OK;
731 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
732 return NT_STATUS_NO_MEMORY;
735 cb->msg_type = msg_type;
736 cb->fn = fn;
737 cb->private_data = private_data;
739 DLIST_ADD(msg_ctx->callbacks, cb);
740 return NT_STATUS_OK;
744 De-register the function for a particular message type.
746 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
747 void *private_data)
749 struct messaging_callback *cb, *next;
751 for (cb = ctx->callbacks; cb; cb = next) {
752 next = cb->next;
753 if ((cb->msg_type == msg_type)
754 && (cb->private_data == private_data)) {
755 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
756 (unsigned)msg_type, private_data));
757 DLIST_REMOVE(ctx->callbacks, cb);
758 TALLOC_FREE(cb);
764 Send a message to a particular server
766 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
767 struct server_id server, uint32_t msg_type,
768 const DATA_BLOB *data)
770 struct iovec iov = {0};
772 if (data != NULL) {
773 iov.iov_base = data->data;
774 iov.iov_len = data->length;
777 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
780 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
781 struct server_id server, uint32_t msg_type,
782 const uint8_t *buf, size_t len)
784 DATA_BLOB blob = data_blob_const(buf, len);
785 return messaging_send(msg_ctx, server, msg_type, &blob);
788 static int messaging_post_self(struct messaging_context *msg_ctx,
789 struct server_id src, struct server_id dst,
790 uint32_t msg_type,
791 const struct iovec *iov, int iovlen,
792 const int *fds, size_t num_fds)
794 struct messaging_rec *rec;
795 bool ok;
797 rec = messaging_rec_create(
798 msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
799 if (rec == NULL) {
800 return ENOMEM;
803 ok = messaging_alert_event_contexts(msg_ctx);
804 if (!ok) {
805 TALLOC_FREE(rec);
806 return ENOMEM;
809 DLIST_ADD_END(msg_ctx->posted_msgs, rec);
811 return 0;
814 int messaging_send_iov_from(struct messaging_context *msg_ctx,
815 struct server_id src, struct server_id dst,
816 uint32_t msg_type,
817 const struct iovec *iov, int iovlen,
818 const int *fds, size_t num_fds)
820 int ret;
821 uint8_t hdr[MESSAGE_HDR_LENGTH];
822 struct iovec iov2[iovlen+1];
824 if (server_id_is_disconnected(&dst)) {
825 return EINVAL;
828 if (num_fds > INT8_MAX) {
829 return EINVAL;
832 if (server_id_equal(&dst, &msg_ctx->id)) {
833 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
834 iov, iovlen, fds, num_fds);
835 return ret;
838 message_hdr_put(hdr, msg_type, src, dst);
839 iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
840 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
842 if (dst.vnn != msg_ctx->id.vnn) {
843 if (num_fds > 0) {
844 return ENOSYS;
847 ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
848 return ret;
851 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
853 if (ret == EACCES) {
854 become_root();
855 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
856 fds, num_fds);
857 unbecome_root();
860 if (ret == ECONNREFUSED) {
862 * Linux returns this when a socket exists in the file
863 * system without a listening process. This is not
864 * documented in susv4 or the linux manpages, but it's
865 * easily testable. For the higher levels this is the
866 * same as "destination does not exist"
868 ret = ENOENT;
871 return ret;
874 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
875 struct server_id dst, uint32_t msg_type,
876 const struct iovec *iov, int iovlen,
877 const int *fds, size_t num_fds)
879 int ret;
881 ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
882 iov, iovlen, fds, num_fds);
883 if (ret != 0) {
884 return map_nt_error_from_unix(ret);
886 return NT_STATUS_OK;
/*
 * Arguments of messaging_send_all(), carried through
 * messaging_dgm_forall() to send_all_fn().
 */
struct send_all_state {
	struct messaging_context *msg_ctx;
	int msg_type;

	/* Payload, "len" bytes starting at "buf" */
	const void *buf;
	size_t len;
};
896 static int send_all_fn(pid_t pid, void *private_data)
898 struct send_all_state *state = private_data;
899 NTSTATUS status;
901 if (pid == getpid()) {
902 DBG_DEBUG("Skip ourselves in messaging_send_all\n");
903 return 0;
906 status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
907 state->msg_type, state->buf, state->len);
908 if (!NT_STATUS_IS_OK(status)) {
909 DBG_WARNING("messaging_send_buf to %ju failed: %s\n",
910 (uintmax_t)pid, nt_errstr(status));
913 return 0;
916 void messaging_send_all(struct messaging_context *msg_ctx,
917 int msg_type, const void *buf, size_t len)
919 struct send_all_state state = {
920 .msg_ctx = msg_ctx, .msg_type = msg_type,
921 .buf = buf, .len = len
923 int ret;
925 #ifdef CLUSTER_SUPPORT
926 if (lp_clustering()) {
927 struct ctdbd_connection *conn = messaging_ctdb_connection();
928 uint8_t msghdr[MESSAGE_HDR_LENGTH];
929 struct iovec iov[] = {
930 { .iov_base = msghdr,
931 .iov_len = sizeof(msghdr) },
932 { .iov_base = discard_const_p(void, buf),
933 .iov_len = len }
936 message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
937 (struct server_id) {0});
939 ret = ctdbd_messaging_send_iov(
940 conn, CTDB_BROADCAST_CONNECTED,
941 CTDB_SRVID_SAMBA_PROCESS,
942 iov, ARRAY_SIZE(iov));
943 if (ret != 0) {
944 DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
945 strerror(ret));
948 return;
950 #endif
952 ret = messaging_dgm_forall(send_all_fn, &state);
953 if (ret != 0) {
954 DBG_WARNING("messaging_dgm_forall failed: %s\n",
955 strerror(ret));
959 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
960 struct messaging_rec *rec)
962 struct messaging_rec *result;
963 size_t fds_size = sizeof(int64_t) * rec->num_fds;
964 size_t payload_len;
966 payload_len = rec->buf.length + fds_size;
967 if (payload_len < rec->buf.length) {
968 /* overflow */
969 return NULL;
972 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
973 payload_len);
974 if (result == NULL) {
975 return NULL;
977 *result = *rec;
979 /* Doesn't fail, see talloc_pooled_object */
981 result->buf.data = talloc_memdup(result, rec->buf.data,
982 rec->buf.length);
984 result->fds = NULL;
985 if (result->num_fds > 0) {
986 result->fds = talloc_memdup(result, rec->fds, fds_size);
989 return result;
/*
 * State of one messaging_filtered_read_send() request.
 */
struct messaging_filtered_read_state {
	/* Event context the reader was created on */
	struct tevent_context *ev;
	struct messaging_context *msg_ctx;

	/* Transport registrations of "ev" */
	struct messaging_dgm_fde *fde;
	struct messaging_ctdb_fde *cluster_fde;

	/* Caller's predicate and its argument */
	bool (*filter)(struct messaging_rec *rec, void *private_data);
	void *private_data;

	/* Copy of the message that was accepted by the filter */
	struct messaging_rec *rec;
};
1004 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1005 enum tevent_req_state req_state);
1007 struct tevent_req *messaging_filtered_read_send(
1008 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1009 struct messaging_context *msg_ctx,
1010 bool (*filter)(struct messaging_rec *rec, void *private_data),
1011 void *private_data)
1013 struct tevent_req *req;
1014 struct messaging_filtered_read_state *state;
1015 size_t new_waiters_len;
1016 bool ok;
1018 req = tevent_req_create(mem_ctx, &state,
1019 struct messaging_filtered_read_state);
1020 if (req == NULL) {
1021 return NULL;
1023 state->ev = ev;
1024 state->msg_ctx = msg_ctx;
1025 state->filter = filter;
1026 state->private_data = private_data;
1028 if (tevent_context_is_wrapper(ev)) {
1029 /* This is really a programmer error! */
1030 DBG_ERR("Wrapper tevent context doesn't use main context.\n");
1031 tevent_req_error(req, EINVAL);
1032 return tevent_req_post(req, ev);
1036 * We have to defer the callback here, as we might be called from
1037 * within a different tevent_context than state->ev
1039 tevent_req_defer_callback(req, state->ev);
1041 state->fde = messaging_dgm_register_tevent_context(state, ev);
1042 if (tevent_req_nomem(state->fde, req)) {
1043 return tevent_req_post(req, ev);
1046 if (lp_clustering()) {
1047 state->cluster_fde =
1048 messaging_ctdb_register_tevent_context(state, ev);
1049 if (tevent_req_nomem(state->cluster_fde, req)) {
1050 return tevent_req_post(req, ev);
1055 * We add ourselves to the "new_waiters" array, not the "waiters"
1056 * array. If we are called from within messaging_read_done,
1057 * messaging_dispatch_rec will be in an active for-loop on
1058 * "waiters". We must be careful not to mess with this array, because
1059 * it could mean that a single event is being delivered twice.
1062 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
1064 if (new_waiters_len == msg_ctx->num_new_waiters) {
1065 struct tevent_req **tmp;
1067 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
1068 struct tevent_req *, new_waiters_len+1);
1069 if (tevent_req_nomem(tmp, req)) {
1070 return tevent_req_post(req, ev);
1072 msg_ctx->new_waiters = tmp;
1075 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
1076 msg_ctx->num_new_waiters += 1;
1077 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
1079 ok = messaging_register_event_context(msg_ctx, ev);
1080 if (!ok) {
1081 tevent_req_oom(req);
1082 return tevent_req_post(req, ev);
1085 return req;
1088 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1089 enum tevent_req_state req_state)
1091 struct messaging_filtered_read_state *state = tevent_req_data(
1092 req, struct messaging_filtered_read_state);
1093 struct messaging_context *msg_ctx = state->msg_ctx;
1094 size_t i;
1095 bool ok;
1097 tevent_req_set_cleanup_fn(req, NULL);
1099 TALLOC_FREE(state->fde);
1100 TALLOC_FREE(state->cluster_fde);
1102 ok = messaging_deregister_event_context(msg_ctx, state->ev);
1103 if (!ok) {
1104 abort();
1108 * Just set the [new_]waiters entry to NULL, be careful not to mess
1109 * with the other "waiters" array contents. We are often called from
1110 * within "messaging_dispatch_rec", which loops over
1111 * "waiters". Messing with the "waiters" array will mess up that
1112 * for-loop.
1115 for (i=0; i<msg_ctx->num_waiters; i++) {
1116 if (msg_ctx->waiters[i] == req) {
1117 msg_ctx->waiters[i] = NULL;
1118 return;
1122 for (i=0; i<msg_ctx->num_new_waiters; i++) {
1123 if (msg_ctx->new_waiters[i] == req) {
1124 msg_ctx->new_waiters[i] = NULL;
1125 return;
1130 static void messaging_filtered_read_done(struct tevent_req *req,
1131 struct messaging_rec *rec)
1133 struct messaging_filtered_read_state *state = tevent_req_data(
1134 req, struct messaging_filtered_read_state);
1136 state->rec = messaging_rec_dup(state, rec);
1137 if (tevent_req_nomem(state->rec, req)) {
1138 return;
1140 tevent_req_done(req);
1143 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1144 struct messaging_rec **presult)
1146 struct messaging_filtered_read_state *state = tevent_req_data(
1147 req, struct messaging_filtered_read_state);
1148 int err;
1150 if (tevent_req_is_unix_error(req, &err)) {
1151 tevent_req_received(req);
1152 return err;
1154 if (presult != NULL) {
1155 *presult = talloc_move(mem_ctx, &state->rec);
1157 return 0;
/*
 * State of one messaging_read_send() request.
 */
struct messaging_read_state {
	/* The message type we're waiting for */
	uint32_t msg_type;

	/* The message that was received */
	struct messaging_rec *rec;
};
1165 static bool messaging_read_filter(struct messaging_rec *rec,
1166 void *private_data);
1167 static void messaging_read_done(struct tevent_req *subreq);
1169 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
1170 struct tevent_context *ev,
1171 struct messaging_context *msg,
1172 uint32_t msg_type)
1174 struct tevent_req *req, *subreq;
1175 struct messaging_read_state *state;
1177 req = tevent_req_create(mem_ctx, &state,
1178 struct messaging_read_state);
1179 if (req == NULL) {
1180 return NULL;
1182 state->msg_type = msg_type;
1184 subreq = messaging_filtered_read_send(state, ev, msg,
1185 messaging_read_filter, state);
1186 if (tevent_req_nomem(subreq, req)) {
1187 return tevent_req_post(req, ev);
1189 tevent_req_set_callback(subreq, messaging_read_done, req);
1190 return req;
1193 static bool messaging_read_filter(struct messaging_rec *rec,
1194 void *private_data)
1196 struct messaging_read_state *state = talloc_get_type_abort(
1197 private_data, struct messaging_read_state);
1199 if (rec->num_fds != 0) {
1200 return false;
1203 return rec->msg_type == state->msg_type;
1206 static void messaging_read_done(struct tevent_req *subreq)
1208 struct tevent_req *req = tevent_req_callback_data(
1209 subreq, struct tevent_req);
1210 struct messaging_read_state *state = tevent_req_data(
1211 req, struct messaging_read_state);
1212 int ret;
1214 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
1215 TALLOC_FREE(subreq);
1216 if (tevent_req_error(req, ret)) {
1217 return;
1219 tevent_req_done(req);
1222 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1223 struct messaging_rec **presult)
1225 struct messaging_read_state *state = tevent_req_data(
1226 req, struct messaging_read_state);
1227 int err;
1229 if (tevent_req_is_unix_error(req, &err)) {
1230 return err;
1232 if (presult != NULL) {
1233 *presult = talloc_move(mem_ctx, &state->rec);
1235 return 0;
1238 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
1240 if (msg_ctx->num_new_waiters == 0) {
1241 return true;
1244 if (talloc_array_length(msg_ctx->waiters) <
1245 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
1246 struct tevent_req **tmp;
1247 tmp = talloc_realloc(
1248 msg_ctx, msg_ctx->waiters, struct tevent_req *,
1249 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1250 if (tmp == NULL) {
1251 DEBUG(1, ("%s: talloc failed\n", __func__));
1252 return false;
1254 msg_ctx->waiters = tmp;
1257 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1258 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1260 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1261 msg_ctx->num_new_waiters = 0;
1263 return true;
1266 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1267 struct messaging_rec *rec)
1269 struct messaging_callback *cb, *next;
1271 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1272 size_t j;
1274 next = cb->next;
1275 if (cb->msg_type != rec->msg_type) {
1276 continue;
1280 * the old style callbacks don't support fd passing
1282 for (j=0; j < rec->num_fds; j++) {
1283 int fd = rec->fds[j];
1284 close(fd);
1286 rec->num_fds = 0;
1287 rec->fds = NULL;
1289 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1290 rec->src, &rec->buf);
1292 return true;
1295 return false;
1298 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
1299 struct tevent_context *ev,
1300 struct messaging_rec *rec)
1302 size_t i;
1304 if (!messaging_append_new_waiters(msg_ctx)) {
1305 return false;
1308 i = 0;
1309 while (i < msg_ctx->num_waiters) {
1310 struct tevent_req *req;
1311 struct messaging_filtered_read_state *state;
1313 req = msg_ctx->waiters[i];
1314 if (req == NULL) {
1316 * This got cleaned up. In the meantime,
1317 * move everything down one. We need
1318 * to keep the order of waiters, as
1319 * other code may depend on this.
1321 if (i < msg_ctx->num_waiters - 1) {
1322 memmove(&msg_ctx->waiters[i],
1323 &msg_ctx->waiters[i+1],
1324 sizeof(struct tevent_req *) *
1325 (msg_ctx->num_waiters - i - 1));
1327 msg_ctx->num_waiters -= 1;
1328 continue;
1331 state = tevent_req_data(
1332 req, struct messaging_filtered_read_state);
1333 if ((ev == state->ev) &&
1334 state->filter(rec, state->private_data)) {
1335 messaging_filtered_read_done(req, rec);
1336 return true;
1339 i += 1;
1342 return false;
1346 Dispatch one messaging_rec
1348 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1349 struct tevent_context *ev,
1350 struct messaging_rec *rec)
1352 bool consumed;
1353 size_t i;
1356 * ev and msg_ctx->event_ctx can't be wrapper tevent_context pointers
1357 * so we don't need to use tevent_context_same_loop().
1360 if (ev == msg_ctx->event_ctx) {
1361 consumed = messaging_dispatch_classic(msg_ctx, rec);
1362 if (consumed) {
1363 return;
1367 consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
1368 if (consumed) {
1369 return;
1372 if (ev != msg_ctx->event_ctx) {
1373 struct iovec iov;
1374 int fds[rec->num_fds];
1375 int ret;
1378 * We've been listening on a nested event
1379 * context. Messages need to be handled in the main
1380 * event context, so post to ourselves
1383 iov.iov_base = rec->buf.data;
1384 iov.iov_len = rec->buf.length;
1386 for (i=0; i<rec->num_fds; i++) {
1387 fds[i] = rec->fds[i];
1390 ret = messaging_post_self(
1391 msg_ctx, rec->src, rec->dest, rec->msg_type,
1392 &iov, 1, fds, rec->num_fds);
1393 if (ret == 0) {
1394 return;
1399 * If the fd-array isn't used, just close it.
1401 for (i=0; i < rec->num_fds; i++) {
1402 int fd = rec->fds[i];
1403 close(fd);
1405 rec->num_fds = 0;
1406 rec->fds = NULL;
/*
 * Forward declarations: mess_parent_dgm_cleanup() is handed to
 * background_job_send() as the job function, and
 * mess_parent_dgm_cleanup_done() is installed as the callback of the
 * resulting request.
 */
1409 static int mess_parent_dgm_cleanup(void *private_data);
1410 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1412 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1414 struct tevent_req *req;
1416 req = background_job_send(
1417 msg, msg->event_ctx, msg, NULL, 0,
1418 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1419 60*15),
1420 mess_parent_dgm_cleanup, msg);
1421 if (req == NULL) {
1422 DBG_WARNING("background_job_send failed\n");
1423 return false;
1425 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1426 return true;
/*
 * Job function for the dgm cleanup background job.
 *
 * Calls messaging_dgm_wipe(), logs its result at debug level 10 and
 * returns the current value of the "messaging:messaging dgm cleanup
 * interval" parameter (default 60*15). private_data is not used.
 */
static int mess_parent_dgm_cleanup(void *private_data)
{
	int wipe_err = messaging_dgm_wipe();
	int next_interval;

	DEBUG(10, ("messaging_dgm_wipe returned %s\n",
		   (wipe_err != 0) ? strerror(wipe_err) : "ok"));

	next_interval = lp_parm_int(
		-1, "messaging", "messaging dgm cleanup interval", 60*15);

	return next_interval;
}
1440 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1442 struct messaging_context *msg = tevent_req_callback_data(
1443 req, struct messaging_context);
1444 NTSTATUS status;
1446 status = background_job_recv(req);
1447 TALLOC_FREE(req);
1448 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1449 nt_errstr(status)));
1451 req = background_job_send(
1452 msg, msg->event_ctx, msg, NULL, 0,
1453 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1454 60*15),
1455 mess_parent_dgm_cleanup, msg);
1456 if (req == NULL) {
1457 DEBUG(1, ("background_job_send failed\n"));
1458 return;
1460 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
/*
 * Run the datagram messaging cleanup.
 *
 * With pid == 0 this calls messaging_dgm_wipe(), otherwise
 * messaging_dgm_cleanup(pid). The return value of the called function is
 * passed through unchanged. msg_ctx is not used.
 */
int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
{
	if (pid == 0) {
		return messaging_dgm_wipe();
	}

	return messaging_dgm_cleanup(pid);
}
1476 struct tevent_context *messaging_tevent_context(
1477 struct messaging_context *msg_ctx)
1479 return msg_ctx->event_ctx;
1482 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1484 return msg_ctx->names_db;
1487 /** @} **/