lib/messaging: Move messages_dgm out of source3
[samba.git] / source3 / lib / messages.c
blobc63b027c61791c2cce73162728b086268ac5d9d3
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a messaging_register() of a
32 dispatch function, and use messaging_send() to send messages to
33 that process.
35 The dispatch function is given the server_id of the sender, and it can
36 use that to reply by messaging_send(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messaging/messages_dgm.h"
56 #include "lib/util/iov_buf.h"
57 #include "lib/util/server_id_db.h"
58 #include "lib/messaging/messages_dgm_ref.h"
59 #include "lib/messages_ctdb.h"
60 #include "lib/messages_ctdb_ref.h"
61 #include "lib/messages_util.h"
62 #include "cluster_support.h"
63 #include "ctdbd_conn.h"
64 #include "ctdb_srvids.h"
66 #ifdef CLUSTER_SUPPORT
67 #include "ctdb_protocol.h"
68 #endif
70 struct messaging_callback {
71 struct messaging_callback *prev, *next;
72 uint32_t msg_type;
73 void (*fn)(struct messaging_context *msg, void *private_data,
74 uint32_t msg_type,
75 struct server_id server_id, DATA_BLOB *data);
76 void *private_data;
/*
 * A tevent context registered with a messaging_context.
 *
 * refcount counts active registrations; a slot with refcount == 0 is
 * free and may be reused by messaging_register_event_context(). im is
 * the immediate used to wake "ev" up for self-posted messages; it is
 * recycled together with the slot instead of being freed.
 */
struct messaging_registered_ev {
	struct tevent_context *ev;
	struct tevent_immediate *im;
	size_t refcount;
};
85 struct messaging_context {
86 struct server_id id;
87 struct tevent_context *event_ctx;
88 struct messaging_callback *callbacks;
90 struct messaging_rec *posted_msgs;
92 struct messaging_registered_ev *event_contexts;
94 struct tevent_req **new_waiters;
95 size_t num_new_waiters;
97 struct tevent_req **waiters;
98 size_t num_waiters;
100 struct server_id_db *names_db;
102 TALLOC_CTX *per_process_talloc_ctx;
105 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
106 struct messaging_rec *rec);
107 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
108 struct messaging_rec *rec);
109 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
110 struct tevent_context *ev,
111 struct messaging_rec *rec);
112 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
113 struct tevent_context *ev,
114 struct messaging_rec *rec);
116 /****************************************************************************
117 A useful function for testing the message system.
118 ****************************************************************************/
120 static void ping_message(struct messaging_context *msg_ctx,
121 void *private_data,
122 uint32_t msg_type,
123 struct server_id src,
124 DATA_BLOB *data)
126 struct server_id_buf idbuf;
128 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
129 server_id_str_buf(src, &idbuf), (int)data->length,
130 data->data ? (char *)data->data : ""));
132 messaging_send(msg_ctx, src, MSG_PONG, data);
135 struct messaging_rec *messaging_rec_create(
136 TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
137 uint32_t msg_type, const struct iovec *iov, int iovlen,
138 const int *fds, size_t num_fds)
140 ssize_t buflen;
141 uint8_t *buf;
142 struct messaging_rec *result;
144 if (num_fds > INT8_MAX) {
145 return NULL;
148 buflen = iov_buflen(iov, iovlen);
149 if (buflen == -1) {
150 return NULL;
152 buf = talloc_array(mem_ctx, uint8_t, buflen);
153 if (buf == NULL) {
154 return NULL;
156 iov_buf(iov, iovlen, buf, buflen);
159 struct messaging_rec rec;
160 int64_t fds64[num_fds];
161 size_t i;
163 for (i=0; i<num_fds; i++) {
164 fds64[i] = fds[i];
167 rec = (struct messaging_rec) {
168 .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
169 .src = src, .dest = dst,
170 .buf.data = buf, .buf.length = buflen,
171 .num_fds = num_fds, .fds = fds64,
174 result = messaging_rec_dup(mem_ctx, &rec);
177 TALLOC_FREE(buf);
179 return result;
182 static bool messaging_register_event_context(struct messaging_context *ctx,
183 struct tevent_context *ev)
185 size_t i, num_event_contexts;
186 struct messaging_registered_ev *free_reg = NULL;
187 struct messaging_registered_ev *tmp;
189 num_event_contexts = talloc_array_length(ctx->event_contexts);
191 for (i=0; i<num_event_contexts; i++) {
192 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
194 if (reg->refcount == 0) {
195 if (reg->ev != NULL) {
196 abort();
198 free_reg = reg;
200 * We continue here and may find another
201 * free_req, but the important thing is
202 * that we continue to search for an
203 * existing registration in the loop.
205 continue;
208 if (reg->ev == ev) {
209 reg->refcount += 1;
210 return true;
214 if (free_reg == NULL) {
215 struct tevent_immediate *im = NULL;
217 im = tevent_create_immediate(ctx);
218 if (im == NULL) {
219 return false;
222 tmp = talloc_realloc(ctx, ctx->event_contexts,
223 struct messaging_registered_ev,
224 num_event_contexts+1);
225 if (tmp == NULL) {
226 return false;
228 ctx->event_contexts = tmp;
230 free_reg = &ctx->event_contexts[num_event_contexts];
231 free_reg->im = talloc_move(ctx->event_contexts, &im);
235 * free_reg->im might be cached
237 free_reg->ev = ev;
238 free_reg->refcount = 1;
240 return true;
243 static bool messaging_deregister_event_context(struct messaging_context *ctx,
244 struct tevent_context *ev)
246 size_t i, num_event_contexts;
248 num_event_contexts = talloc_array_length(ctx->event_contexts);
250 for (i=0; i<num_event_contexts; i++) {
251 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
253 if (reg->refcount == 0) {
254 continue;
257 if (reg->ev == ev) {
258 reg->refcount -= 1;
260 if (reg->refcount == 0) {
262 * The primary event context
263 * is never unregistered using
264 * messaging_deregister_event_context()
265 * it's only registered using
266 * messaging_register_event_context().
268 SMB_ASSERT(ev != ctx->event_ctx);
269 SMB_ASSERT(reg->ev != ctx->event_ctx);
272 * Not strictly necessary, just
273 * paranoia
275 reg->ev = NULL;
278 * Do not talloc_free(reg->im),
279 * recycle immediates events.
281 * We just invalidate it using
282 * the primary event context,
283 * which is never unregistered.
285 tevent_schedule_immediate(reg->im,
286 ctx->event_ctx,
287 NULL, NULL);
289 return true;
292 return false;
295 static void messaging_post_main_event_context(struct tevent_context *ev,
296 struct tevent_immediate *im,
297 void *private_data)
299 struct messaging_context *ctx = talloc_get_type_abort(
300 private_data, struct messaging_context);
302 while (ctx->posted_msgs != NULL) {
303 struct messaging_rec *rec = ctx->posted_msgs;
304 bool consumed;
306 DLIST_REMOVE(ctx->posted_msgs, rec);
308 consumed = messaging_dispatch_classic(ctx, rec);
309 if (!consumed) {
310 consumed = messaging_dispatch_waiters(
311 ctx, ctx->event_ctx, rec);
314 if (!consumed) {
315 uint8_t i;
317 for (i=0; i<rec->num_fds; i++) {
318 close(rec->fds[i]);
322 TALLOC_FREE(rec);
326 static void messaging_post_sub_event_context(struct tevent_context *ev,
327 struct tevent_immediate *im,
328 void *private_data)
330 struct messaging_context *ctx = talloc_get_type_abort(
331 private_data, struct messaging_context);
332 struct messaging_rec *rec, *next;
334 for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
335 bool consumed;
337 next = rec->next;
339 consumed = messaging_dispatch_waiters(ctx, ev, rec);
340 if (consumed) {
341 DLIST_REMOVE(ctx->posted_msgs, rec);
342 TALLOC_FREE(rec);
347 static bool messaging_alert_event_contexts(struct messaging_context *ctx)
349 size_t i, num_event_contexts;
351 num_event_contexts = talloc_array_length(ctx->event_contexts);
353 for (i=0; i<num_event_contexts; i++) {
354 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
356 if (reg->refcount == 0) {
357 continue;
361 * We depend on schedule_immediate to work
362 * multiple times. Might be a bit inefficient,
363 * but this needs to be proven in tests. The
364 * alternatively would be to track whether the
365 * immediate has already been scheduled. For
366 * now, avoid that complexity here.
369 if (reg->ev == ctx->event_ctx) {
370 tevent_schedule_immediate(
371 reg->im, reg->ev,
372 messaging_post_main_event_context,
373 ctx);
374 } else {
375 tevent_schedule_immediate(
376 reg->im, reg->ev,
377 messaging_post_sub_event_context,
378 ctx);
382 return true;
385 static void messaging_recv_cb(struct tevent_context *ev,
386 const uint8_t *msg, size_t msg_len,
387 int *fds, size_t num_fds,
388 void *private_data)
390 struct messaging_context *msg_ctx = talloc_get_type_abort(
391 private_data, struct messaging_context);
392 struct server_id_buf idbuf;
393 struct messaging_rec rec;
394 int64_t fds64[MIN(num_fds, INT8_MAX)];
395 size_t i;
397 if (msg_len < MESSAGE_HDR_LENGTH) {
398 DBG_WARNING("message too short: %zu\n", msg_len);
399 goto close_fail;
402 if (num_fds > INT8_MAX) {
403 DBG_WARNING("too many fds: %zu\n", num_fds);
404 goto close_fail;
408 * "consume" the fds by copying them and setting
409 * the original variable to -1
411 for (i=0; i < num_fds; i++) {
412 fds64[i] = fds[i];
413 fds[i] = -1;
416 rec = (struct messaging_rec) {
417 .msg_version = MESSAGE_VERSION,
418 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
419 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
420 .num_fds = num_fds,
421 .fds = fds64,
424 message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
426 DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
427 (unsigned)rec.msg_type, rec.buf.length, num_fds,
428 server_id_str_buf(rec.src, &idbuf));
430 if (server_id_same_process(&rec.src, &msg_ctx->id)) {
431 DBG_DEBUG("Ignoring self-send\n");
432 goto close_fail;
435 messaging_dispatch_rec(msg_ctx, ev, &rec);
436 return;
438 close_fail:
439 for (i=0; i < num_fds; i++) {
440 close(fds[i]);
444 static int messaging_context_destructor(struct messaging_context *ctx)
446 size_t i;
448 for (i=0; i<ctx->num_new_waiters; i++) {
449 if (ctx->new_waiters[i] != NULL) {
450 tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
451 ctx->new_waiters[i] = NULL;
454 for (i=0; i<ctx->num_waiters; i++) {
455 if (ctx->waiters[i] != NULL) {
456 tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
457 ctx->waiters[i] = NULL;
462 * The immediates from messaging_alert_event_contexts
463 * reference "ctx". Don't let them outlive the
464 * messaging_context we're destroying here.
466 TALLOC_FREE(ctx->event_contexts);
468 return 0;
/*
 * Return "<private dir>/<name>", allocated on talloc_tos(), or NULL on
 * allocation failure.
 */
static const char *private_path(const char *name)
{
	const char *dir = lp_private_dir();

	return talloc_asprintf(talloc_tos(), "%s/%s", dir, name);
}
476 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
477 struct tevent_context *ev,
478 struct messaging_context **pmsg_ctx)
480 TALLOC_CTX *frame;
481 struct messaging_context *ctx;
482 NTSTATUS status;
483 int ret;
484 const char *lck_path;
485 const char *priv_path;
486 void *ref;
487 bool ok;
490 * sec_init() *must* be called before any other
491 * functions that use sec_XXX(). e.g. sec_initial_uid().
494 sec_init();
496 lck_path = lock_path(talloc_tos(), "msg.lock");
497 if (lck_path == NULL) {
498 return NT_STATUS_NO_MEMORY;
501 ok = directory_create_or_exist_strict(lck_path,
502 sec_initial_uid(),
503 0755);
504 if (!ok) {
505 DBG_DEBUG("Could not create lock directory: %s\n",
506 strerror(errno));
507 return NT_STATUS_ACCESS_DENIED;
510 priv_path = private_path("msg.sock");
511 if (priv_path == NULL) {
512 return NT_STATUS_NO_MEMORY;
515 ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
516 0700);
517 if (!ok) {
518 DBG_DEBUG("Could not create msg directory: %s\n",
519 strerror(errno));
520 return NT_STATUS_ACCESS_DENIED;
523 frame = talloc_stackframe();
524 if (frame == NULL) {
525 return NT_STATUS_NO_MEMORY;
528 ctx = talloc_zero(frame, struct messaging_context);
529 if (ctx == NULL) {
530 status = NT_STATUS_NO_MEMORY;
531 goto done;
534 ctx->id = (struct server_id) {
535 .pid = getpid(), .vnn = NONCLUSTER_VNN
538 ctx->event_ctx = ev;
540 ctx->per_process_talloc_ctx = talloc_new(ctx);
541 if (ctx->per_process_talloc_ctx == NULL) {
542 status = NT_STATUS_NO_MEMORY;
543 goto done;
546 ok = messaging_register_event_context(ctx, ev);
547 if (!ok) {
548 status = NT_STATUS_NO_MEMORY;
549 goto done;
552 ref = messaging_dgm_ref(
553 ctx->per_process_talloc_ctx,
554 ctx->event_ctx,
555 &ctx->id.unique_id,
556 priv_path,
557 lck_path,
558 messaging_recv_cb,
559 ctx,
560 &ret);
561 if (ref == NULL) {
562 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
563 status = map_nt_error_from_unix(ret);
564 goto done;
566 talloc_set_destructor(ctx, messaging_context_destructor);
568 #ifdef CLUSTER_SUPPORT
569 if (lp_clustering()) {
570 ref = messaging_ctdb_ref(
571 ctx->per_process_talloc_ctx,
572 ctx->event_ctx,
573 lp_ctdbd_socket(),
574 lp_ctdb_timeout(),
575 ctx->id.unique_id,
576 messaging_recv_cb,
577 ctx,
578 &ret);
579 if (ref == NULL) {
580 DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
581 strerror(ret));
582 status = map_nt_error_from_unix(ret);
583 goto done;
586 #endif
588 ctx->id.vnn = get_my_vnn();
590 ctx->names_db = server_id_db_init(ctx,
591 ctx->id,
592 lp_lock_directory(),
594 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
595 if (ctx->names_db == NULL) {
596 DBG_DEBUG("server_id_db_init failed\n");
597 status = NT_STATUS_NO_MEMORY;
598 goto done;
601 messaging_register(ctx, NULL, MSG_PING, ping_message);
603 /* Register some debugging related messages */
605 register_msg_pool_usage(ctx->per_process_talloc_ctx, ctx);
606 register_dmalloc_msgs(ctx);
607 debug_register_msgs(ctx);
610 struct server_id_buf tmp;
611 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
614 *pmsg_ctx = talloc_steal(mem_ctx, ctx);
616 status = NT_STATUS_OK;
617 done:
618 TALLOC_FREE(frame);
620 return status;
623 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
624 struct tevent_context *ev)
626 struct messaging_context *ctx = NULL;
627 NTSTATUS status;
629 status = messaging_init_internal(mem_ctx,
631 &ctx);
632 if (!NT_STATUS_IS_OK(status)) {
633 return NULL;
636 return ctx;
639 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
641 return msg_ctx->id;
645 * re-init after a fork
647 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
649 int ret;
650 char *lck_path;
651 void *ref;
653 TALLOC_FREE(msg_ctx->per_process_talloc_ctx);
655 msg_ctx->per_process_talloc_ctx = talloc_new(msg_ctx);
656 if (msg_ctx->per_process_talloc_ctx == NULL) {
657 return NT_STATUS_NO_MEMORY;
660 msg_ctx->id = (struct server_id) {
661 .pid = getpid(), .vnn = msg_ctx->id.vnn
664 lck_path = lock_path(talloc_tos(), "msg.lock");
665 if (lck_path == NULL) {
666 return NT_STATUS_NO_MEMORY;
669 ref = messaging_dgm_ref(
670 msg_ctx->per_process_talloc_ctx,
671 msg_ctx->event_ctx,
672 &msg_ctx->id.unique_id,
673 private_path("msg.sock"),
674 lck_path,
675 messaging_recv_cb,
676 msg_ctx,
677 &ret);
679 if (ref == NULL) {
680 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
681 return map_nt_error_from_unix(ret);
684 if (lp_clustering()) {
685 ref = messaging_ctdb_ref(
686 msg_ctx->per_process_talloc_ctx,
687 msg_ctx->event_ctx,
688 lp_ctdbd_socket(),
689 lp_ctdb_timeout(),
690 msg_ctx->id.unique_id,
691 messaging_recv_cb,
692 msg_ctx,
693 &ret);
694 if (ref == NULL) {
695 DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
696 strerror(ret));
697 return map_nt_error_from_unix(ret);
701 server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
702 register_msg_pool_usage(msg_ctx->per_process_talloc_ctx, msg_ctx);
704 return NT_STATUS_OK;
709 * Register a dispatch function for a particular message type. Allow multiple
710 * registrants
712 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
713 void *private_data,
714 uint32_t msg_type,
715 void (*fn)(struct messaging_context *msg,
716 void *private_data,
717 uint32_t msg_type,
718 struct server_id server_id,
719 DATA_BLOB *data))
721 struct messaging_callback *cb;
723 DEBUG(5, ("Registering messaging pointer for type %u - "
724 "private_data=%p\n",
725 (unsigned)msg_type, private_data));
728 * Only one callback per type
731 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
732 /* we allow a second registration of the same message
733 type if it has a different private pointer. This is
734 needed in, for example, the internal notify code,
735 which creates a new notify context for each tree
736 connect, and expects to receive messages to each of
737 them. */
738 if (cb->msg_type == msg_type && private_data == cb->private_data) {
739 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
740 (unsigned)msg_type, private_data));
741 cb->fn = fn;
742 cb->private_data = private_data;
743 return NT_STATUS_OK;
747 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
748 return NT_STATUS_NO_MEMORY;
751 cb->msg_type = msg_type;
752 cb->fn = fn;
753 cb->private_data = private_data;
755 DLIST_ADD(msg_ctx->callbacks, cb);
756 return NT_STATUS_OK;
760 De-register the function for a particular message type.
762 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
763 void *private_data)
765 struct messaging_callback *cb, *next;
767 for (cb = ctx->callbacks; cb; cb = next) {
768 next = cb->next;
769 if ((cb->msg_type == msg_type)
770 && (cb->private_data == private_data)) {
771 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
772 (unsigned)msg_type, private_data));
773 DLIST_REMOVE(ctx->callbacks, cb);
774 TALLOC_FREE(cb);
780 Send a message to a particular server
782 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
783 struct server_id server, uint32_t msg_type,
784 const DATA_BLOB *data)
786 struct iovec iov = {0};
788 if (data != NULL) {
789 iov.iov_base = data->data;
790 iov.iov_len = data->length;
793 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
796 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
797 struct server_id server, uint32_t msg_type,
798 const uint8_t *buf, size_t len)
800 DATA_BLOB blob = data_blob_const(buf, len);
801 return messaging_send(msg_ctx, server, msg_type, &blob);
804 static int messaging_post_self(struct messaging_context *msg_ctx,
805 struct server_id src, struct server_id dst,
806 uint32_t msg_type,
807 const struct iovec *iov, int iovlen,
808 const int *fds, size_t num_fds)
810 struct messaging_rec *rec;
811 bool ok;
813 rec = messaging_rec_create(
814 msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
815 if (rec == NULL) {
816 return ENOMEM;
819 ok = messaging_alert_event_contexts(msg_ctx);
820 if (!ok) {
821 TALLOC_FREE(rec);
822 return ENOMEM;
825 DLIST_ADD_END(msg_ctx->posted_msgs, rec);
827 return 0;
830 int messaging_send_iov_from(struct messaging_context *msg_ctx,
831 struct server_id src, struct server_id dst,
832 uint32_t msg_type,
833 const struct iovec *iov, int iovlen,
834 const int *fds, size_t num_fds)
836 int ret;
837 uint8_t hdr[MESSAGE_HDR_LENGTH];
838 struct iovec iov2[iovlen+1];
840 if (server_id_is_disconnected(&dst)) {
841 return EINVAL;
844 if (num_fds > INT8_MAX) {
845 return EINVAL;
848 if (server_id_equal(&dst, &msg_ctx->id)) {
849 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
850 iov, iovlen, fds, num_fds);
851 return ret;
854 message_hdr_put(hdr, msg_type, src, dst);
855 iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
856 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
858 if (dst.vnn != msg_ctx->id.vnn) {
859 if (num_fds > 0) {
860 return ENOSYS;
863 ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
864 return ret;
867 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
869 if (ret == EACCES) {
870 become_root();
871 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
872 fds, num_fds);
873 unbecome_root();
876 if (ret == ECONNREFUSED) {
878 * Linux returns this when a socket exists in the file
879 * system without a listening process. This is not
880 * documented in susv4 or the linux manpages, but it's
881 * easily testable. For the higher levels this is the
882 * same as "destination does not exist"
884 ret = ENOENT;
887 return ret;
890 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
891 struct server_id dst, uint32_t msg_type,
892 const struct iovec *iov, int iovlen,
893 const int *fds, size_t num_fds)
895 int ret;
897 ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
898 iov, iovlen, fds, num_fds);
899 if (ret != 0) {
900 return map_nt_error_from_unix(ret);
902 return NT_STATUS_OK;
/* Broadcast parameters handed to send_all_fn() for every peer. */
struct send_all_state {
	struct messaging_context *msg_ctx;	/* sending context */
	int msg_type;				/* message type to send */
	const void *buf;			/* payload, not owned */
	size_t len;				/* payload length in bytes */
};
912 static int send_all_fn(pid_t pid, void *private_data)
914 struct send_all_state *state = private_data;
915 NTSTATUS status;
917 if (pid == getpid()) {
918 DBG_DEBUG("Skip ourselves in messaging_send_all\n");
919 return 0;
922 status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
923 state->msg_type, state->buf, state->len);
924 if (!NT_STATUS_IS_OK(status)) {
925 DBG_NOTICE("messaging_send_buf to %ju failed: %s\n",
926 (uintmax_t)pid, nt_errstr(status));
929 return 0;
932 void messaging_send_all(struct messaging_context *msg_ctx,
933 int msg_type, const void *buf, size_t len)
935 struct send_all_state state = {
936 .msg_ctx = msg_ctx, .msg_type = msg_type,
937 .buf = buf, .len = len
939 int ret;
941 #ifdef CLUSTER_SUPPORT
942 if (lp_clustering()) {
943 struct ctdbd_connection *conn = messaging_ctdb_connection();
944 uint8_t msghdr[MESSAGE_HDR_LENGTH];
945 struct iovec iov[] = {
946 { .iov_base = msghdr,
947 .iov_len = sizeof(msghdr) },
948 { .iov_base = discard_const_p(void, buf),
949 .iov_len = len }
952 message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
953 (struct server_id) {0});
955 ret = ctdbd_messaging_send_iov(
956 conn, CTDB_BROADCAST_CONNECTED,
957 CTDB_SRVID_SAMBA_PROCESS,
958 iov, ARRAY_SIZE(iov));
959 if (ret != 0) {
960 DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
961 strerror(ret));
964 return;
966 #endif
968 ret = messaging_dgm_forall(send_all_fn, &state);
969 if (ret != 0) {
970 DBG_WARNING("messaging_dgm_forall failed: %s\n",
971 strerror(ret));
975 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
976 struct messaging_rec *rec)
978 struct messaging_rec *result;
979 size_t fds_size = sizeof(int64_t) * rec->num_fds;
980 size_t payload_len;
982 payload_len = rec->buf.length + fds_size;
983 if (payload_len < rec->buf.length) {
984 /* overflow */
985 return NULL;
988 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
989 payload_len);
990 if (result == NULL) {
991 return NULL;
993 *result = *rec;
995 /* Doesn't fail, see talloc_pooled_object */
997 result->buf.data = talloc_memdup(result, rec->buf.data,
998 rec->buf.length);
1000 result->fds = NULL;
1001 if (result->num_fds > 0) {
1002 result->fds = talloc_memdup(result, rec->fds, fds_size);
1005 return result;
/* State of one messaging_filtered_read_send() request. */
struct messaging_filtered_read_state {
	struct tevent_context *ev;		/* context the caller waits in */
	struct messaging_context *msg_ctx;
	struct messaging_dgm_fde *fde;		/* dgm watch on ev */
	struct messaging_ctdb_fde *cluster_fde;	/* ctdb watch, clustering only */

	/* predicate deciding whether a message completes the request */
	bool (*filter)(struct messaging_rec *rec, void *private_data);
	void *private_data;

	/* the accepted message, once received */
	struct messaging_rec *rec;
};

static void messaging_filtered_read_cleanup(struct tevent_req *req,
					    enum tevent_req_state req_state);
1023 struct tevent_req *messaging_filtered_read_send(
1024 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1025 struct messaging_context *msg_ctx,
1026 bool (*filter)(struct messaging_rec *rec, void *private_data),
1027 void *private_data)
1029 struct tevent_req *req;
1030 struct messaging_filtered_read_state *state;
1031 size_t new_waiters_len;
1032 bool ok;
1034 req = tevent_req_create(mem_ctx, &state,
1035 struct messaging_filtered_read_state);
1036 if (req == NULL) {
1037 return NULL;
1039 state->ev = ev;
1040 state->msg_ctx = msg_ctx;
1041 state->filter = filter;
1042 state->private_data = private_data;
1045 * We have to defer the callback here, as we might be called from
1046 * within a different tevent_context than state->ev
1048 tevent_req_defer_callback(req, state->ev);
1050 state->fde = messaging_dgm_register_tevent_context(state, ev);
1051 if (tevent_req_nomem(state->fde, req)) {
1052 return tevent_req_post(req, ev);
1055 if (lp_clustering()) {
1056 state->cluster_fde =
1057 messaging_ctdb_register_tevent_context(state, ev);
1058 if (tevent_req_nomem(state->cluster_fde, req)) {
1059 return tevent_req_post(req, ev);
1064 * We add ourselves to the "new_waiters" array, not the "waiters"
1065 * array. If we are called from within messaging_read_done,
1066 * messaging_dispatch_rec will be in an active for-loop on
1067 * "waiters". We must be careful not to mess with this array, because
1068 * it could mean that a single event is being delivered twice.
1071 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
1073 if (new_waiters_len == msg_ctx->num_new_waiters) {
1074 struct tevent_req **tmp;
1076 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
1077 struct tevent_req *, new_waiters_len+1);
1078 if (tevent_req_nomem(tmp, req)) {
1079 return tevent_req_post(req, ev);
1081 msg_ctx->new_waiters = tmp;
1084 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
1085 msg_ctx->num_new_waiters += 1;
1086 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
1088 ok = messaging_register_event_context(msg_ctx, ev);
1089 if (!ok) {
1090 tevent_req_oom(req);
1091 return tevent_req_post(req, ev);
1094 return req;
1097 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1098 enum tevent_req_state req_state)
1100 struct messaging_filtered_read_state *state = tevent_req_data(
1101 req, struct messaging_filtered_read_state);
1102 struct messaging_context *msg_ctx = state->msg_ctx;
1103 size_t i;
1104 bool ok;
1106 tevent_req_set_cleanup_fn(req, NULL);
1108 TALLOC_FREE(state->fde);
1109 TALLOC_FREE(state->cluster_fde);
1111 ok = messaging_deregister_event_context(msg_ctx, state->ev);
1112 if (!ok) {
1113 abort();
1117 * Just set the [new_]waiters entry to NULL, be careful not to mess
1118 * with the other "waiters" array contents. We are often called from
1119 * within "messaging_dispatch_rec", which loops over
1120 * "waiters". Messing with the "waiters" array will mess up that
1121 * for-loop.
1124 for (i=0; i<msg_ctx->num_waiters; i++) {
1125 if (msg_ctx->waiters[i] == req) {
1126 msg_ctx->waiters[i] = NULL;
1127 return;
1131 for (i=0; i<msg_ctx->num_new_waiters; i++) {
1132 if (msg_ctx->new_waiters[i] == req) {
1133 msg_ctx->new_waiters[i] = NULL;
1134 return;
1139 static void messaging_filtered_read_done(struct tevent_req *req,
1140 struct messaging_rec *rec)
1142 struct messaging_filtered_read_state *state = tevent_req_data(
1143 req, struct messaging_filtered_read_state);
1145 state->rec = messaging_rec_dup(state, rec);
1146 if (tevent_req_nomem(state->rec, req)) {
1147 return;
1149 tevent_req_done(req);
1152 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1153 struct messaging_rec **presult)
1155 struct messaging_filtered_read_state *state = tevent_req_data(
1156 req, struct messaging_filtered_read_state);
1157 int err;
1159 if (tevent_req_is_unix_error(req, &err)) {
1160 tevent_req_received(req);
1161 return err;
1163 if (presult != NULL) {
1164 *presult = talloc_move(mem_ctx, &state->rec);
1166 return 0;
/* State of messaging_read_send(): wait for one message of msg_type. */
struct messaging_read_state {
	uint32_t msg_type;		/* the awaited message type */
	struct messaging_rec *rec;	/* the received message */
};

static bool messaging_read_filter(struct messaging_rec *rec,
				  void *private_data);
static void messaging_read_done(struct tevent_req *subreq);
1178 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
1179 struct tevent_context *ev,
1180 struct messaging_context *msg,
1181 uint32_t msg_type)
1183 struct tevent_req *req, *subreq;
1184 struct messaging_read_state *state;
1186 req = tevent_req_create(mem_ctx, &state,
1187 struct messaging_read_state);
1188 if (req == NULL) {
1189 return NULL;
1191 state->msg_type = msg_type;
1193 subreq = messaging_filtered_read_send(state, ev, msg,
1194 messaging_read_filter, state);
1195 if (tevent_req_nomem(subreq, req)) {
1196 return tevent_req_post(req, ev);
1198 tevent_req_set_callback(subreq, messaging_read_done, req);
1199 return req;
1202 static bool messaging_read_filter(struct messaging_rec *rec,
1203 void *private_data)
1205 struct messaging_read_state *state = talloc_get_type_abort(
1206 private_data, struct messaging_read_state);
1208 if (rec->num_fds != 0) {
1209 return false;
1212 return rec->msg_type == state->msg_type;
1215 static void messaging_read_done(struct tevent_req *subreq)
1217 struct tevent_req *req = tevent_req_callback_data(
1218 subreq, struct tevent_req);
1219 struct messaging_read_state *state = tevent_req_data(
1220 req, struct messaging_read_state);
1221 int ret;
1223 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
1224 TALLOC_FREE(subreq);
1225 if (tevent_req_error(req, ret)) {
1226 return;
1228 tevent_req_done(req);
1231 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1232 struct messaging_rec **presult)
1234 struct messaging_read_state *state = tevent_req_data(
1235 req, struct messaging_read_state);
1236 int err;
1238 if (tevent_req_is_unix_error(req, &err)) {
1239 return err;
1241 if (presult != NULL) {
1242 *presult = talloc_move(mem_ctx, &state->rec);
1244 return 0;
1247 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
1249 if (msg_ctx->num_new_waiters == 0) {
1250 return true;
1253 if (talloc_array_length(msg_ctx->waiters) <
1254 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
1255 struct tevent_req **tmp;
1256 tmp = talloc_realloc(
1257 msg_ctx, msg_ctx->waiters, struct tevent_req *,
1258 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1259 if (tmp == NULL) {
1260 DEBUG(1, ("%s: talloc failed\n", __func__));
1261 return false;
1263 msg_ctx->waiters = tmp;
1266 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1267 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1269 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1270 msg_ctx->num_new_waiters = 0;
1272 return true;
1275 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1276 struct messaging_rec *rec)
1278 struct messaging_callback *cb, *next;
1280 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1281 size_t j;
1283 next = cb->next;
1284 if (cb->msg_type != rec->msg_type) {
1285 continue;
1289 * the old style callbacks don't support fd passing
1291 for (j=0; j < rec->num_fds; j++) {
1292 int fd = rec->fds[j];
1293 close(fd);
1295 rec->num_fds = 0;
1296 rec->fds = NULL;
1298 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1299 rec->src, &rec->buf);
1301 return true;
1304 return false;
1307 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
1308 struct tevent_context *ev,
1309 struct messaging_rec *rec)
1311 size_t i;
1313 if (!messaging_append_new_waiters(msg_ctx)) {
1314 return false;
1317 i = 0;
1318 while (i < msg_ctx->num_waiters) {
1319 struct tevent_req *req;
1320 struct messaging_filtered_read_state *state;
1322 req = msg_ctx->waiters[i];
1323 if (req == NULL) {
1325 * This got cleaned up. In the meantime,
1326 * move everything down one. We need
1327 * to keep the order of waiters, as
1328 * other code may depend on this.
1330 ARRAY_DEL_ELEMENT(
1331 msg_ctx->waiters, i, msg_ctx->num_waiters);
1332 msg_ctx->num_waiters -= 1;
1333 continue;
1336 state = tevent_req_data(
1337 req, struct messaging_filtered_read_state);
1338 if ((ev == state->ev) &&
1339 state->filter(rec, state->private_data)) {
1340 messaging_filtered_read_done(req, rec);
1341 return true;
1344 i += 1;
1347 return false;
1351 Dispatch one messaging_rec
1353 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1354 struct tevent_context *ev,
1355 struct messaging_rec *rec)
1357 bool consumed;
1358 size_t i;
1360 if (ev == msg_ctx->event_ctx) {
1361 consumed = messaging_dispatch_classic(msg_ctx, rec);
1362 if (consumed) {
1363 return;
1367 consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
1368 if (consumed) {
1369 return;
1372 if (ev != msg_ctx->event_ctx) {
1373 struct iovec iov;
1374 int fds[rec->num_fds];
1375 int ret;
1378 * We've been listening on a nested event
1379 * context. Messages need to be handled in the main
1380 * event context, so post to ourselves
1383 iov.iov_base = rec->buf.data;
1384 iov.iov_len = rec->buf.length;
1386 for (i=0; i<rec->num_fds; i++) {
1387 fds[i] = rec->fds[i];
1390 ret = messaging_post_self(
1391 msg_ctx, rec->src, rec->dest, rec->msg_type,
1392 &iov, 1, fds, rec->num_fds);
1393 if (ret == 0) {
1394 return;
1399 * If the fd-array isn't used, just close it.
1401 for (i=0; i < rec->num_fds; i++) {
1402 int fd = rec->fds[i];
1403 close(fd);
1405 rec->num_fds = 0;
1406 rec->fds = NULL;
1409 static int mess_parent_dgm_cleanup(void *private_data);
1410 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1412 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1414 struct tevent_req *req;
1416 req = background_job_send(
1417 msg, msg->event_ctx, msg, NULL, 0,
1418 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1419 60*15),
1420 mess_parent_dgm_cleanup, msg);
1421 if (req == NULL) {
1422 DBG_WARNING("background_job_send failed\n");
1423 return false;
1425 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1426 return true;
/*
 * Body of the periodic cleanup job.
 *
 * Calls messaging_dgm_wipe() and logs the result at debug level 10
 * ("ok" for 0, strerror() of the value otherwise). Returns the
 * configured "messaging dgm cleanup interval" (default 60*15), which is
 * handed back to the background job machinery — presumably as the delay
 * before the next run; confirm against background_job_send().
 * private_data is unused.
 */
static int mess_parent_dgm_cleanup(void *private_data)
{
	int wipe_err = messaging_dgm_wipe();

	DEBUG(10, ("messaging_dgm_wipe returned %s\n",
		   (wipe_err != 0) ? strerror(wipe_err) : "ok"));

	return lp_parm_int(-1, "messaging",
			   "messaging dgm cleanup interval", 60*15);
}
1440 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1442 struct messaging_context *msg = tevent_req_callback_data(
1443 req, struct messaging_context);
1444 NTSTATUS status;
1446 status = background_job_recv(req);
1447 TALLOC_FREE(req);
1448 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1449 nt_errstr(status)));
1451 req = background_job_send(
1452 msg, msg->event_ctx, msg, NULL, 0,
1453 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1454 60*15),
1455 mess_parent_dgm_cleanup, msg);
1456 if (req == NULL) {
1457 DEBUG(1, ("background_job_send failed\n"));
1458 return;
1460 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
/*
 * Clean up messaging_dgm state.
 *
 * A non-zero pid calls messaging_dgm_cleanup(pid); pid == 0 calls
 * messaging_dgm_wipe(). The callee's return value is passed through
 * (0 is treated as success elsewhere in this file). msg_ctx is unused.
 */
int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
{
	if (pid != 0) {
		return messaging_dgm_cleanup(pid);
	}
	return messaging_dgm_wipe();
}
1476 struct tevent_context *messaging_tevent_context(
1477 struct messaging_context *msg_ctx)
1479 return msg_ctx->event_ctx;
1482 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1484 return msg_ctx->names_db;
1487 /** @} **/