tstream: Fix CID 1167982 Unchecked return value
[Samba.git] / source3 / lib / messages.c
blob5a31f3414d74ecc3c4776f7e276905ec1257f84d
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messages_dgm.h"
56 #include "lib/util/iov_buf.h"
57 #include "lib/util/server_id_db.h"
58 #include "lib/messages_dgm_ref.h"
59 #include "lib/messages_ctdb.h"
60 #include "lib/messages_ctdb_ref.h"
61 #include "lib/messages_util.h"
62 #include "cluster_support.h"
63 #include "ctdbd_conn.h"
64 #include "ctdb_srvids.h"
66 #ifdef CLUSTER_SUPPORT
67 #include "ctdb_protocol.h"
68 #endif
70 struct messaging_callback {
71 struct messaging_callback *prev, *next;
72 uint32_t msg_type;
73 void (*fn)(struct messaging_context *msg, void *private_data,
74 uint32_t msg_type,
75 struct server_id server_id, DATA_BLOB *data);
76 void *private_data;
/*
 * Bookkeeping for one tevent context that has readers registered on it.
 * A slot with refcount == 0 is free and may be reused.
 */
struct messaging_registered_ev {
	/* The registered event context, NULL while the slot is free */
	struct tevent_context *ev;

	/* Immediate event used to wake up this event context */
	struct tevent_immediate *im;

	/* Number of active registrations for ev */
	size_t refcount;
};
85 struct messaging_context {
86 struct server_id id;
87 struct tevent_context *event_ctx;
88 struct messaging_callback *callbacks;
90 struct messaging_rec *posted_msgs;
92 struct messaging_registered_ev *event_contexts;
94 struct tevent_req **new_waiters;
95 size_t num_new_waiters;
97 struct tevent_req **waiters;
98 size_t num_waiters;
100 void *msg_dgm_ref;
101 void *msg_ctdb_ref;
103 struct server_id_db *names_db;
106 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
107 struct messaging_rec *rec);
108 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
109 struct messaging_rec *rec);
110 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
111 struct tevent_context *ev,
112 struct messaging_rec *rec);
113 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
114 struct tevent_context *ev,
115 struct messaging_rec *rec);
117 /****************************************************************************
118 A useful function for testing the message system.
119 ****************************************************************************/
121 static void ping_message(struct messaging_context *msg_ctx,
122 void *private_data,
123 uint32_t msg_type,
124 struct server_id src,
125 DATA_BLOB *data)
127 struct server_id_buf idbuf;
129 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
130 server_id_str_buf(src, &idbuf), (int)data->length,
131 data->data ? (char *)data->data : ""));
133 messaging_send(msg_ctx, src, MSG_PONG, data);
136 struct messaging_rec *messaging_rec_create(
137 TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
138 uint32_t msg_type, const struct iovec *iov, int iovlen,
139 const int *fds, size_t num_fds)
141 ssize_t buflen;
142 uint8_t *buf;
143 struct messaging_rec *result;
145 if (num_fds > INT8_MAX) {
146 return NULL;
149 buflen = iov_buflen(iov, iovlen);
150 if (buflen == -1) {
151 return NULL;
153 buf = talloc_array(mem_ctx, uint8_t, buflen);
154 if (buf == NULL) {
155 return NULL;
157 iov_buf(iov, iovlen, buf, buflen);
160 struct messaging_rec rec;
161 int64_t fds64[num_fds];
162 size_t i;
164 for (i=0; i<num_fds; i++) {
165 fds64[i] = fds[i];
168 rec = (struct messaging_rec) {
169 .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
170 .src = src, .dest = dst,
171 .buf.data = buf, .buf.length = buflen,
172 .num_fds = num_fds, .fds = fds64,
175 result = messaging_rec_dup(mem_ctx, &rec);
178 TALLOC_FREE(buf);
180 return result;
183 static bool messaging_register_event_context(struct messaging_context *ctx,
184 struct tevent_context *ev)
186 size_t i, num_event_contexts;
187 struct messaging_registered_ev *free_reg = NULL;
188 struct messaging_registered_ev *tmp;
190 num_event_contexts = talloc_array_length(ctx->event_contexts);
192 for (i=0; i<num_event_contexts; i++) {
193 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
195 if (reg->ev == ev) {
196 reg->refcount += 1;
197 return true;
199 if (reg->refcount == 0) {
200 if (reg->ev != NULL) {
201 abort();
203 free_reg = reg;
207 if (free_reg == NULL) {
208 tmp = talloc_realloc(ctx, ctx->event_contexts,
209 struct messaging_registered_ev,
210 num_event_contexts+1);
211 if (tmp == NULL) {
212 return false;
214 ctx->event_contexts = tmp;
216 free_reg = &ctx->event_contexts[num_event_contexts];
219 *free_reg = (struct messaging_registered_ev) { .ev = ev, .refcount = 1 };
221 return true;
224 static bool messaging_deregister_event_context(struct messaging_context *ctx,
225 struct tevent_context *ev)
227 size_t i, num_event_contexts;
229 num_event_contexts = talloc_array_length(ctx->event_contexts);
231 for (i=0; i<num_event_contexts; i++) {
232 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
234 if (reg->ev == ev) {
235 if (reg->refcount == 0) {
236 return false;
238 reg->refcount -= 1;
240 if (reg->refcount == 0) {
242 * Not strictly necessary, just
243 * paranoia
245 reg->ev = NULL;
248 * Do not talloc_free(reg->im),
249 * recycle immediates events.
252 return true;
255 return false;
258 static void messaging_post_main_event_context(struct tevent_context *ev,
259 struct tevent_immediate *im,
260 void *private_data)
262 struct messaging_context *ctx = talloc_get_type_abort(
263 private_data, struct messaging_context);
265 while (ctx->posted_msgs != NULL) {
266 struct messaging_rec *rec = ctx->posted_msgs;
267 bool consumed;
269 DLIST_REMOVE(ctx->posted_msgs, rec);
271 consumed = messaging_dispatch_classic(ctx, rec);
272 if (!consumed) {
273 consumed = messaging_dispatch_waiters(
274 ctx, ctx->event_ctx, rec);
277 if (!consumed) {
278 uint8_t i;
280 for (i=0; i<rec->num_fds; i++) {
281 close(rec->fds[i]);
285 TALLOC_FREE(rec);
289 static void messaging_post_sub_event_context(struct tevent_context *ev,
290 struct tevent_immediate *im,
291 void *private_data)
293 struct messaging_context *ctx = talloc_get_type_abort(
294 private_data, struct messaging_context);
295 struct messaging_rec *rec, *next;
297 for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
298 bool consumed;
300 next = rec->next;
302 consumed = messaging_dispatch_waiters(ctx, ev, rec);
303 if (consumed) {
304 DLIST_REMOVE(ctx->posted_msgs, rec);
305 TALLOC_FREE(rec);
310 static bool messaging_alert_event_contexts(struct messaging_context *ctx)
312 size_t i, num_event_contexts;
314 num_event_contexts = talloc_array_length(ctx->event_contexts);
316 for (i=0; i<num_event_contexts; i++) {
317 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
319 if (reg->refcount == 0) {
320 continue;
323 if (reg->im == NULL) {
324 reg->im = tevent_create_immediate(
325 ctx->event_contexts);
327 if (reg->im == NULL) {
328 DBG_WARNING("Could not create immediate\n");
329 continue;
333 * We depend on schedule_immediate to work
334 * multiple times. Might be a bit inefficient,
335 * but this needs to be proven in tests. The
336 * alternatively would be to track whether the
337 * immediate has already been scheduled. For
338 * now, avoid that complexity here.
341 if (reg->ev == ctx->event_ctx) {
342 tevent_schedule_immediate(
343 reg->im, reg->ev,
344 messaging_post_main_event_context,
345 ctx);
346 } else {
347 tevent_schedule_immediate(
348 reg->im, reg->ev,
349 messaging_post_sub_event_context,
350 ctx);
354 return true;
357 static void messaging_recv_cb(struct tevent_context *ev,
358 const uint8_t *msg, size_t msg_len,
359 int *fds, size_t num_fds,
360 void *private_data)
362 struct messaging_context *msg_ctx = talloc_get_type_abort(
363 private_data, struct messaging_context);
364 struct server_id_buf idbuf;
365 struct messaging_rec rec;
366 int64_t fds64[MIN(num_fds, INT8_MAX)];
367 size_t i;
369 if (msg_len < MESSAGE_HDR_LENGTH) {
370 DBG_WARNING("message too short: %zu\n", msg_len);
371 goto close_fail;
374 if (num_fds > INT8_MAX) {
375 DBG_WARNING("too many fds: %zu\n", num_fds);
376 goto close_fail;
380 * "consume" the fds by copying them and setting
381 * the original variable to -1
383 for (i=0; i < num_fds; i++) {
384 fds64[i] = fds[i];
385 fds[i] = -1;
388 rec = (struct messaging_rec) {
389 .msg_version = MESSAGE_VERSION,
390 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
391 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
392 .num_fds = num_fds,
393 .fds = fds64,
396 message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
398 DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
399 (unsigned)rec.msg_type, rec.buf.length, num_fds,
400 server_id_str_buf(rec.src, &idbuf));
402 if (server_id_same_process(&rec.src, &msg_ctx->id)) {
403 DBG_DEBUG("Ignoring self-send\n");
404 goto close_fail;
407 messaging_dispatch_rec(msg_ctx, ev, &rec);
408 return;
410 close_fail:
411 for (i=0; i < num_fds; i++) {
412 close(fds[i]);
416 static int messaging_context_destructor(struct messaging_context *ctx)
418 size_t i;
420 for (i=0; i<ctx->num_new_waiters; i++) {
421 if (ctx->new_waiters[i] != NULL) {
422 tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
423 ctx->new_waiters[i] = NULL;
426 for (i=0; i<ctx->num_waiters; i++) {
427 if (ctx->waiters[i] != NULL) {
428 tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
429 ctx->waiters[i] = NULL;
434 * The immediates from messaging_alert_event_contexts
435 * reference "ctx". Don't let them outlive the
436 * messaging_context we're destroying here.
438 TALLOC_FREE(ctx->event_contexts);
440 return 0;
/*
 * Return "<private dir>/<name>", allocated on talloc_tos().
 * The result is NULL if the allocation fails.
 */
static const char *private_path(const char *name)
{
	const char *full_path;

	full_path = talloc_asprintf(
		talloc_tos(), "%s/%s", lp_private_dir(), name);

	return full_path;
}
448 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
449 struct tevent_context *ev,
450 struct messaging_context **pmsg_ctx)
452 TALLOC_CTX *frame;
453 struct messaging_context *ctx;
454 NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
455 int ret;
456 const char *lck_path;
457 const char *priv_path;
458 bool ok;
461 * sec_init() *must* be called before any other
462 * functions that use sec_XXX(). e.g. sec_initial_uid().
465 sec_init();
467 lck_path = lock_path("msg.lock");
468 if (lck_path == NULL) {
469 return NT_STATUS_NO_MEMORY;
472 ok = directory_create_or_exist_strict(lck_path,
473 sec_initial_uid(),
474 0755);
475 if (!ok) {
476 DBG_DEBUG("Could not create lock directory: %s\n",
477 strerror(errno));
478 return NT_STATUS_ACCESS_DENIED;
481 priv_path = private_path("msg.sock");
482 if (priv_path == NULL) {
483 return NT_STATUS_NO_MEMORY;
486 ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
487 0700);
488 if (!ok) {
489 DBG_DEBUG("Could not create msg directory: %s\n",
490 strerror(errno));
491 return NT_STATUS_ACCESS_DENIED;
494 frame = talloc_stackframe();
495 if (frame == NULL) {
496 return NT_STATUS_NO_MEMORY;
499 ctx = talloc_zero(frame, struct messaging_context);
500 if (ctx == NULL) {
501 status = NT_STATUS_NO_MEMORY;
502 goto done;
505 ctx->id = (struct server_id) {
506 .pid = getpid(), .vnn = NONCLUSTER_VNN
509 ctx->event_ctx = ev;
511 ok = messaging_register_event_context(ctx, ev);
512 if (!ok) {
513 status = NT_STATUS_NO_MEMORY;
514 goto done;
517 ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
518 ctx->event_ctx,
519 &ctx->id.unique_id,
520 priv_path,
521 lck_path,
522 messaging_recv_cb,
523 ctx,
524 &ret);
525 if (ctx->msg_dgm_ref == NULL) {
526 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
527 status = map_nt_error_from_unix(ret);
528 goto done;
530 talloc_set_destructor(ctx, messaging_context_destructor);
532 #ifdef CLUSTER_SUPPORT
533 if (lp_clustering()) {
534 ctx->msg_ctdb_ref = messaging_ctdb_ref(
535 ctx, ctx->event_ctx,
536 lp_ctdbd_socket(), lp_ctdb_timeout(),
537 ctx->id.unique_id, messaging_recv_cb, ctx, &ret);
538 if (ctx->msg_ctdb_ref == NULL) {
539 DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
540 strerror(ret));
541 status = map_nt_error_from_unix(ret);
542 goto done;
545 #endif
547 ctx->id.vnn = get_my_vnn();
549 ctx->names_db = server_id_db_init(ctx,
550 ctx->id,
551 lp_lock_directory(),
553 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
554 if (ctx->names_db == NULL) {
555 DBG_DEBUG("server_id_db_init failed\n");
556 status = NT_STATUS_NO_MEMORY;
557 goto done;
560 messaging_register(ctx, NULL, MSG_PING, ping_message);
562 /* Register some debugging related messages */
564 register_msg_pool_usage(ctx);
565 register_dmalloc_msgs(ctx);
566 debug_register_msgs(ctx);
569 struct server_id_buf tmp;
570 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
573 *pmsg_ctx = talloc_steal(mem_ctx, ctx);
575 status = NT_STATUS_OK;
576 done:
577 TALLOC_FREE(frame);
579 return status;
582 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
583 struct tevent_context *ev)
585 struct messaging_context *ctx = NULL;
586 NTSTATUS status;
588 status = messaging_init_internal(mem_ctx,
590 &ctx);
591 if (!NT_STATUS_IS_OK(status)) {
592 return NULL;
595 return ctx;
598 NTSTATUS messaging_init_client(TALLOC_CTX *mem_ctx,
599 struct tevent_context *ev,
600 struct messaging_context **pmsg_ctx)
602 return messaging_init_internal(mem_ctx,
604 pmsg_ctx);
607 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
609 return msg_ctx->id;
613 * re-init after a fork
615 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
617 int ret;
618 char *lck_path;
620 TALLOC_FREE(msg_ctx->msg_dgm_ref);
621 TALLOC_FREE(msg_ctx->msg_ctdb_ref);
623 msg_ctx->id = (struct server_id) {
624 .pid = getpid(), .vnn = msg_ctx->id.vnn
627 lck_path = lock_path("msg.lock");
628 if (lck_path == NULL) {
629 return NT_STATUS_NO_MEMORY;
632 msg_ctx->msg_dgm_ref = messaging_dgm_ref(
633 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
634 private_path("msg.sock"), lck_path,
635 messaging_recv_cb, msg_ctx, &ret);
637 if (msg_ctx->msg_dgm_ref == NULL) {
638 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
639 return map_nt_error_from_unix(ret);
642 if (lp_clustering()) {
643 msg_ctx->msg_ctdb_ref = messaging_ctdb_ref(
644 msg_ctx, msg_ctx->event_ctx,
645 lp_ctdbd_socket(), lp_ctdb_timeout(),
646 msg_ctx->id.unique_id, messaging_recv_cb, msg_ctx,
647 &ret);
648 if (msg_ctx->msg_ctdb_ref == NULL) {
649 DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
650 strerror(ret));
651 return map_nt_error_from_unix(ret);
655 server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
657 return NT_STATUS_OK;
662 * Register a dispatch function for a particular message type. Allow multiple
663 * registrants
665 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
666 void *private_data,
667 uint32_t msg_type,
668 void (*fn)(struct messaging_context *msg,
669 void *private_data,
670 uint32_t msg_type,
671 struct server_id server_id,
672 DATA_BLOB *data))
674 struct messaging_callback *cb;
676 DEBUG(5, ("Registering messaging pointer for type %u - "
677 "private_data=%p\n",
678 (unsigned)msg_type, private_data));
681 * Only one callback per type
684 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
685 /* we allow a second registration of the same message
686 type if it has a different private pointer. This is
687 needed in, for example, the internal notify code,
688 which creates a new notify context for each tree
689 connect, and expects to receive messages to each of
690 them. */
691 if (cb->msg_type == msg_type && private_data == cb->private_data) {
692 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
693 (unsigned)msg_type, private_data));
694 cb->fn = fn;
695 cb->private_data = private_data;
696 return NT_STATUS_OK;
700 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
701 return NT_STATUS_NO_MEMORY;
704 cb->msg_type = msg_type;
705 cb->fn = fn;
706 cb->private_data = private_data;
708 DLIST_ADD(msg_ctx->callbacks, cb);
709 return NT_STATUS_OK;
713 De-register the function for a particular message type.
715 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
716 void *private_data)
718 struct messaging_callback *cb, *next;
720 for (cb = ctx->callbacks; cb; cb = next) {
721 next = cb->next;
722 if ((cb->msg_type == msg_type)
723 && (cb->private_data == private_data)) {
724 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
725 (unsigned)msg_type, private_data));
726 DLIST_REMOVE(ctx->callbacks, cb);
727 TALLOC_FREE(cb);
733 Send a message to a particular server
735 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
736 struct server_id server, uint32_t msg_type,
737 const DATA_BLOB *data)
739 struct iovec iov = {0};
741 if (data != NULL) {
742 iov.iov_base = data->data;
743 iov.iov_len = data->length;
746 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
749 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
750 struct server_id server, uint32_t msg_type,
751 const uint8_t *buf, size_t len)
753 DATA_BLOB blob = data_blob_const(buf, len);
754 return messaging_send(msg_ctx, server, msg_type, &blob);
757 static int messaging_post_self(struct messaging_context *msg_ctx,
758 struct server_id src, struct server_id dst,
759 uint32_t msg_type,
760 const struct iovec *iov, int iovlen,
761 const int *fds, size_t num_fds)
763 struct messaging_rec *rec;
764 bool ok;
766 rec = messaging_rec_create(
767 msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
768 if (rec == NULL) {
769 return ENOMEM;
772 ok = messaging_alert_event_contexts(msg_ctx);
773 if (!ok) {
774 TALLOC_FREE(rec);
775 return ENOMEM;
778 DLIST_ADD_END(msg_ctx->posted_msgs, rec);
780 return 0;
783 int messaging_send_iov_from(struct messaging_context *msg_ctx,
784 struct server_id src, struct server_id dst,
785 uint32_t msg_type,
786 const struct iovec *iov, int iovlen,
787 const int *fds, size_t num_fds)
789 int ret;
790 uint8_t hdr[MESSAGE_HDR_LENGTH];
791 struct iovec iov2[iovlen+1];
793 if (server_id_is_disconnected(&dst)) {
794 return EINVAL;
797 if (num_fds > INT8_MAX) {
798 return EINVAL;
801 if (server_id_equal(&dst, &msg_ctx->id)) {
802 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
803 iov, iovlen, fds, num_fds);
804 return ret;
807 message_hdr_put(hdr, msg_type, src, dst);
808 iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
809 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
811 if (dst.vnn != msg_ctx->id.vnn) {
812 if (num_fds > 0) {
813 return ENOSYS;
816 ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
817 return ret;
820 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
822 if (ret == EACCES) {
823 become_root();
824 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
825 fds, num_fds);
826 unbecome_root();
829 if (ret == ECONNREFUSED) {
831 * Linux returns this when a socket exists in the file
832 * system without a listening process. This is not
833 * documented in susv4 or the linux manpages, but it's
834 * easily testable. For the higher levels this is the
835 * same as "destination does not exist"
837 ret = ENOENT;
840 return ret;
843 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
844 struct server_id dst, uint32_t msg_type,
845 const struct iovec *iov, int iovlen,
846 const int *fds, size_t num_fds)
848 int ret;
850 ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
851 iov, iovlen, fds, num_fds);
852 if (ret != 0) {
853 return map_nt_error_from_unix(ret);
855 return NT_STATUS_OK;
/* Arguments of messaging_send_all(), handed to send_all_fn() */
struct send_all_state {
	/* Context to send from */
	struct messaging_context *msg_ctx;
	/* Message type to send */
	int msg_type;
	/* Payload and its length in bytes */
	const void *buf;
	size_t len;
};
865 static int send_all_fn(pid_t pid, void *private_data)
867 struct send_all_state *state = private_data;
868 NTSTATUS status;
870 if (pid == getpid()) {
871 DBG_DEBUG("Skip ourselves in messaging_send_all\n");
872 return 0;
875 status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
876 state->msg_type, state->buf, state->len);
877 if (!NT_STATUS_IS_OK(status)) {
878 DBG_WARNING("messaging_send_buf to %ju failed: %s\n",
879 (uintmax_t)pid, nt_errstr(status));
882 return 0;
885 void messaging_send_all(struct messaging_context *msg_ctx,
886 int msg_type, const void *buf, size_t len)
888 struct send_all_state state = {
889 .msg_ctx = msg_ctx, .msg_type = msg_type,
890 .buf = buf, .len = len
892 int ret;
894 #ifdef CLUSTER_SUPPORT
895 if (lp_clustering()) {
896 struct ctdbd_connection *conn = messaging_ctdb_connection();
897 uint8_t msghdr[MESSAGE_HDR_LENGTH];
898 struct iovec iov[] = {
899 { .iov_base = msghdr,
900 .iov_len = sizeof(msghdr) },
901 { .iov_base = discard_const_p(void, buf),
902 .iov_len = len }
905 message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
906 (struct server_id) {0});
908 ret = ctdbd_messaging_send_iov(
909 conn, CTDB_BROADCAST_CONNECTED,
910 CTDB_SRVID_SAMBA_PROCESS,
911 iov, ARRAY_SIZE(iov));
912 if (ret != 0) {
913 DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
914 strerror(ret));
917 return;
919 #endif
921 ret = messaging_dgm_forall(send_all_fn, &state);
922 if (ret != 0) {
923 DBG_WARNING("messaging_dgm_forall failed: %s\n",
924 strerror(ret));
928 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
929 struct messaging_rec *rec)
931 struct messaging_rec *result;
932 size_t fds_size = sizeof(int64_t) * rec->num_fds;
933 size_t payload_len;
935 payload_len = rec->buf.length + fds_size;
936 if (payload_len < rec->buf.length) {
937 /* overflow */
938 return NULL;
941 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
942 payload_len);
943 if (result == NULL) {
944 return NULL;
946 *result = *rec;
948 /* Doesn't fail, see talloc_pooled_object */
950 result->buf.data = talloc_memdup(result, rec->buf.data,
951 rec->buf.length);
953 result->fds = NULL;
954 if (result->num_fds > 0) {
955 result->fds = talloc_memdup(result, rec->fds, fds_size);
958 return result;
/* State of one messaging_filtered_read_send() request */
struct messaging_filtered_read_state {
	/* Event context the request waits on */
	struct tevent_context *ev;
	/* Messaging context the request belongs to */
	struct messaging_context *msg_ctx;
	/* Transport registrations of "ev", freed in the cleanup function */
	struct messaging_dgm_fde *fde;
	struct messaging_ctdb_fde *cluster_fde;

	/* Filter deciding whether a record is taken by this request */
	bool (*filter)(struct messaging_rec *rec, void *private_data);
	/* Opaque pointer handed to the filter */
	void *private_data;

	/* Copy of the accepted record */
	struct messaging_rec *rec;
};

static void messaging_filtered_read_cleanup(
	struct tevent_req *req,
	enum tevent_req_state req_state);
976 struct tevent_req *messaging_filtered_read_send(
977 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
978 struct messaging_context *msg_ctx,
979 bool (*filter)(struct messaging_rec *rec, void *private_data),
980 void *private_data)
982 struct tevent_req *req;
983 struct messaging_filtered_read_state *state;
984 size_t new_waiters_len;
985 bool ok;
987 req = tevent_req_create(mem_ctx, &state,
988 struct messaging_filtered_read_state);
989 if (req == NULL) {
990 return NULL;
992 state->ev = ev;
993 state->msg_ctx = msg_ctx;
994 state->filter = filter;
995 state->private_data = private_data;
998 * We have to defer the callback here, as we might be called from
999 * within a different tevent_context than state->ev
1001 tevent_req_defer_callback(req, state->ev);
1003 state->fde = messaging_dgm_register_tevent_context(state, ev);
1004 if (tevent_req_nomem(state->fde, req)) {
1005 return tevent_req_post(req, ev);
1008 if (lp_clustering()) {
1009 state->cluster_fde =
1010 messaging_ctdb_register_tevent_context(state, ev);
1011 if (tevent_req_nomem(state->cluster_fde, req)) {
1012 return tevent_req_post(req, ev);
1017 * We add ourselves to the "new_waiters" array, not the "waiters"
1018 * array. If we are called from within messaging_read_done,
1019 * messaging_dispatch_rec will be in an active for-loop on
1020 * "waiters". We must be careful not to mess with this array, because
1021 * it could mean that a single event is being delivered twice.
1024 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
1026 if (new_waiters_len == msg_ctx->num_new_waiters) {
1027 struct tevent_req **tmp;
1029 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
1030 struct tevent_req *, new_waiters_len+1);
1031 if (tevent_req_nomem(tmp, req)) {
1032 return tevent_req_post(req, ev);
1034 msg_ctx->new_waiters = tmp;
1037 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
1038 msg_ctx->num_new_waiters += 1;
1039 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
1041 ok = messaging_register_event_context(msg_ctx, ev);
1042 if (!ok) {
1043 tevent_req_oom(req);
1044 return tevent_req_post(req, ev);
1047 return req;
1050 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1051 enum tevent_req_state req_state)
1053 struct messaging_filtered_read_state *state = tevent_req_data(
1054 req, struct messaging_filtered_read_state);
1055 struct messaging_context *msg_ctx = state->msg_ctx;
1056 size_t i;
1057 bool ok;
1059 tevent_req_set_cleanup_fn(req, NULL);
1061 TALLOC_FREE(state->fde);
1062 TALLOC_FREE(state->cluster_fde);
1064 ok = messaging_deregister_event_context(msg_ctx, state->ev);
1065 if (!ok) {
1066 abort();
1070 * Just set the [new_]waiters entry to NULL, be careful not to mess
1071 * with the other "waiters" array contents. We are often called from
1072 * within "messaging_dispatch_rec", which loops over
1073 * "waiters". Messing with the "waiters" array will mess up that
1074 * for-loop.
1077 for (i=0; i<msg_ctx->num_waiters; i++) {
1078 if (msg_ctx->waiters[i] == req) {
1079 msg_ctx->waiters[i] = NULL;
1080 return;
1084 for (i=0; i<msg_ctx->num_new_waiters; i++) {
1085 if (msg_ctx->new_waiters[i] == req) {
1086 msg_ctx->new_waiters[i] = NULL;
1087 return;
1092 static void messaging_filtered_read_done(struct tevent_req *req,
1093 struct messaging_rec *rec)
1095 struct messaging_filtered_read_state *state = tevent_req_data(
1096 req, struct messaging_filtered_read_state);
1098 state->rec = messaging_rec_dup(state, rec);
1099 if (tevent_req_nomem(state->rec, req)) {
1100 return;
1102 tevent_req_done(req);
1105 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1106 struct messaging_rec **presult)
1108 struct messaging_filtered_read_state *state = tevent_req_data(
1109 req, struct messaging_filtered_read_state);
1110 int err;
1112 if (tevent_req_is_unix_error(req, &err)) {
1113 tevent_req_received(req);
1114 return err;
1116 if (presult != NULL) {
1117 *presult = talloc_move(mem_ctx, &state->rec);
1119 return 0;
/* State of one messaging_read_send() request */
struct messaging_read_state {
	/* Message type the request waits for */
	uint32_t msg_type;
	/* The received record */
	struct messaging_rec *rec;
};

static bool messaging_read_filter(
	struct messaging_rec *rec,
	void *private_data);

static void messaging_read_done(struct tevent_req *subreq);
1131 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
1132 struct tevent_context *ev,
1133 struct messaging_context *msg,
1134 uint32_t msg_type)
1136 struct tevent_req *req, *subreq;
1137 struct messaging_read_state *state;
1139 req = tevent_req_create(mem_ctx, &state,
1140 struct messaging_read_state);
1141 if (req == NULL) {
1142 return NULL;
1144 state->msg_type = msg_type;
1146 subreq = messaging_filtered_read_send(state, ev, msg,
1147 messaging_read_filter, state);
1148 if (tevent_req_nomem(subreq, req)) {
1149 return tevent_req_post(req, ev);
1151 tevent_req_set_callback(subreq, messaging_read_done, req);
1152 return req;
1155 static bool messaging_read_filter(struct messaging_rec *rec,
1156 void *private_data)
1158 struct messaging_read_state *state = talloc_get_type_abort(
1159 private_data, struct messaging_read_state);
1161 if (rec->num_fds != 0) {
1162 return false;
1165 return rec->msg_type == state->msg_type;
1168 static void messaging_read_done(struct tevent_req *subreq)
1170 struct tevent_req *req = tevent_req_callback_data(
1171 subreq, struct tevent_req);
1172 struct messaging_read_state *state = tevent_req_data(
1173 req, struct messaging_read_state);
1174 int ret;
1176 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
1177 TALLOC_FREE(subreq);
1178 if (tevent_req_error(req, ret)) {
1179 return;
1181 tevent_req_done(req);
1184 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1185 struct messaging_rec **presult)
1187 struct messaging_read_state *state = tevent_req_data(
1188 req, struct messaging_read_state);
1189 int err;
1191 if (tevent_req_is_unix_error(req, &err)) {
1192 return err;
1194 if (presult != NULL) {
1195 *presult = talloc_move(mem_ctx, &state->rec);
1197 return 0;
1200 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
1202 if (msg_ctx->num_new_waiters == 0) {
1203 return true;
1206 if (talloc_array_length(msg_ctx->waiters) <
1207 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
1208 struct tevent_req **tmp;
1209 tmp = talloc_realloc(
1210 msg_ctx, msg_ctx->waiters, struct tevent_req *,
1211 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1212 if (tmp == NULL) {
1213 DEBUG(1, ("%s: talloc failed\n", __func__));
1214 return false;
1216 msg_ctx->waiters = tmp;
1219 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1220 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1222 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1223 msg_ctx->num_new_waiters = 0;
1225 return true;
1228 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1229 struct messaging_rec *rec)
1231 struct messaging_callback *cb, *next;
1233 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1234 size_t j;
1236 next = cb->next;
1237 if (cb->msg_type != rec->msg_type) {
1238 continue;
1242 * the old style callbacks don't support fd passing
1244 for (j=0; j < rec->num_fds; j++) {
1245 int fd = rec->fds[j];
1246 close(fd);
1248 rec->num_fds = 0;
1249 rec->fds = NULL;
1251 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1252 rec->src, &rec->buf);
1254 return true;
1257 return false;
1260 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
1261 struct tevent_context *ev,
1262 struct messaging_rec *rec)
1264 size_t i;
1266 if (!messaging_append_new_waiters(msg_ctx)) {
1267 return false;
1270 i = 0;
1271 while (i < msg_ctx->num_waiters) {
1272 struct tevent_req *req;
1273 struct messaging_filtered_read_state *state;
1275 req = msg_ctx->waiters[i];
1276 if (req == NULL) {
1278 * This got cleaned up. In the meantime,
1279 * move everything down one. We need
1280 * to keep the order of waiters, as
1281 * other code may depend on this.
1283 if (i < msg_ctx->num_waiters - 1) {
1284 memmove(&msg_ctx->waiters[i],
1285 &msg_ctx->waiters[i+1],
1286 sizeof(struct tevent_req *) *
1287 (msg_ctx->num_waiters - i - 1));
1289 msg_ctx->num_waiters -= 1;
1290 continue;
1293 state = tevent_req_data(
1294 req, struct messaging_filtered_read_state);
1295 if ((ev == state->ev) &&
1296 state->filter(rec, state->private_data)) {
1297 messaging_filtered_read_done(req, rec);
1298 return true;
1301 i += 1;
1304 return false;
1308 Dispatch one messaging_rec
1310 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1311 struct tevent_context *ev,
1312 struct messaging_rec *rec)
1314 bool consumed;
1315 size_t i;
1317 if (ev == msg_ctx->event_ctx) {
1318 consumed = messaging_dispatch_classic(msg_ctx, rec);
1319 if (consumed) {
1320 return;
1324 consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
1325 if (consumed) {
1326 return;
1329 if (ev != msg_ctx->event_ctx) {
1330 struct iovec iov;
1331 int fds[rec->num_fds];
1332 int ret;
1335 * We've been listening on a nested event
1336 * context. Messages need to be handled in the main
1337 * event context, so post to ourselves
1340 iov.iov_base = rec->buf.data;
1341 iov.iov_len = rec->buf.length;
1343 for (i=0; i<rec->num_fds; i++) {
1344 fds[i] = rec->fds[i];
1347 ret = messaging_post_self(
1348 msg_ctx, rec->src, rec->dest, rec->msg_type,
1349 &iov, 1, fds, rec->num_fds);
1350 if (ret == 0) {
1351 return;
1356 * If the fd-array isn't used, just close it.
1358 for (i=0; i < rec->num_fds; i++) {
1359 int fd = rec->fds[i];
1360 close(fd);
1362 rec->num_fds = 0;
1363 rec->fds = NULL;
1366 static int mess_parent_dgm_cleanup(void *private_data);
1367 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1369 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1371 struct tevent_req *req;
1373 req = background_job_send(
1374 msg, msg->event_ctx, msg, NULL, 0,
1375 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1376 60*15),
1377 mess_parent_dgm_cleanup, msg);
1378 if (req == NULL) {
1379 DBG_WARNING("background_job_send failed\n");
1380 return false;
1382 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1383 return true;
/*
 * Body of the cleanup background job: run messaging_dgm_wipe(), log its
 * result and return the configured cleanup interval.
 */
static int mess_parent_dgm_cleanup(void *private_data)
{
	int wipe_ret = messaging_dgm_wipe();
	int interval;

	DEBUG(10, ("messaging_dgm_wipe returned %s\n",
		   wipe_ret ? strerror(wipe_ret) : "ok"));

	interval = lp_parm_int(-1, "messaging",
			       "messaging dgm cleanup interval",
			       60*15);

	return interval;
}
1397 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1399 struct messaging_context *msg = tevent_req_callback_data(
1400 req, struct messaging_context);
1401 NTSTATUS status;
1403 status = background_job_recv(req);
1404 TALLOC_FREE(req);
1405 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1406 nt_errstr(status)));
1408 req = background_job_send(
1409 msg, msg->event_ctx, msg, NULL, 0,
1410 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1411 60*15),
1412 mess_parent_dgm_cleanup, msg);
1413 if (req == NULL) {
1414 DEBUG(1, ("background_job_send failed\n"));
1415 return;
1417 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
/*
 * Clean up datagram messaging leftovers: for pid == 0 via
 * messaging_dgm_wipe(), otherwise via messaging_dgm_cleanup(pid).
 * Returns the result of the called function.
 */
int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
{
	if (pid != 0) {
		return messaging_dgm_cleanup(pid);
	}

	return messaging_dgm_wipe();
}
1433 struct tevent_context *messaging_tevent_context(
1434 struct messaging_context *msg_ctx)
1436 return msg_ctx->event_ctx;
1439 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1441 return msg_ctx->names_db;
1444 /** @} **/