lib:util: Move discard_const(_p) to own header for libndr.h
[Samba.git] / source3 / lib / messages.c
blobdd19173b9736707bccecfb27569a08d2af70f2ac
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messages_dgm.h"
56 #include "lib/util/iov_buf.h"
57 #include "lib/util/server_id_db.h"
58 #include "lib/messages_dgm_ref.h"
59 #include "lib/messages_ctdb.h"
60 #include "lib/messages_ctdb_ref.h"
61 #include "lib/messages_util.h"
62 #include "cluster_support.h"
63 #include "ctdbd_conn.h"
64 #include "ctdb_srvids.h"
66 #ifdef CLUSTER_SUPPORT
67 #include "ctdb_protocol.h"
68 #endif
70 struct messaging_callback {
71 struct messaging_callback *prev, *next;
72 uint32_t msg_type;
73 void (*fn)(struct messaging_context *msg, void *private_data,
74 uint32_t msg_type,
75 struct server_id server_id, DATA_BLOB *data);
76 void *private_data;
/*
 * A tevent context known to a messaging_context, together with the
 * immediate event used to wake it up. A refcount of 0 marks the slot
 * as unused and available for recycling.
 */
struct messaging_registered_ev {
	struct tevent_context *ev;
	struct tevent_immediate *im;
	size_t refcount;
};
85 struct messaging_context {
86 struct server_id id;
87 struct tevent_context *event_ctx;
88 struct messaging_callback *callbacks;
90 struct messaging_rec *posted_msgs;
92 struct messaging_registered_ev *event_contexts;
94 struct tevent_req **new_waiters;
95 size_t num_new_waiters;
97 struct tevent_req **waiters;
98 size_t num_waiters;
100 void *msg_dgm_ref;
101 void *msg_ctdb_ref;
103 struct server_id_db *names_db;
106 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
107 struct messaging_rec *rec);
108 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
109 struct messaging_rec *rec);
110 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
111 struct tevent_context *ev,
112 struct messaging_rec *rec);
113 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
114 struct tevent_context *ev,
115 struct messaging_rec *rec);
117 /****************************************************************************
118 A useful function for testing the message system.
119 ****************************************************************************/
121 static void ping_message(struct messaging_context *msg_ctx,
122 void *private_data,
123 uint32_t msg_type,
124 struct server_id src,
125 DATA_BLOB *data)
127 struct server_id_buf idbuf;
129 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
130 server_id_str_buf(src, &idbuf), (int)data->length,
131 data->data ? (char *)data->data : ""));
133 messaging_send(msg_ctx, src, MSG_PONG, data);
136 struct messaging_rec *messaging_rec_create(
137 TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
138 uint32_t msg_type, const struct iovec *iov, int iovlen,
139 const int *fds, size_t num_fds)
141 ssize_t buflen;
142 uint8_t *buf;
143 struct messaging_rec *result;
145 if (num_fds > INT8_MAX) {
146 return NULL;
149 buflen = iov_buflen(iov, iovlen);
150 if (buflen == -1) {
151 return NULL;
153 buf = talloc_array(mem_ctx, uint8_t, buflen);
154 if (buf == NULL) {
155 return NULL;
157 iov_buf(iov, iovlen, buf, buflen);
160 struct messaging_rec rec;
161 int64_t fds64[num_fds];
162 size_t i;
164 for (i=0; i<num_fds; i++) {
165 fds64[i] = fds[i];
168 rec = (struct messaging_rec) {
169 .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
170 .src = src, .dest = dst,
171 .buf.data = buf, .buf.length = buflen,
172 .num_fds = num_fds, .fds = fds64,
175 result = messaging_rec_dup(mem_ctx, &rec);
178 TALLOC_FREE(buf);
180 return result;
183 static bool messaging_register_event_context(struct messaging_context *ctx,
184 struct tevent_context *ev)
186 size_t i, num_event_contexts;
187 struct messaging_registered_ev *free_reg = NULL;
188 struct messaging_registered_ev *tmp;
190 num_event_contexts = talloc_array_length(ctx->event_contexts);
192 for (i=0; i<num_event_contexts; i++) {
193 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
195 if (reg->refcount == 0) {
196 if (reg->ev != NULL) {
197 abort();
199 free_reg = reg;
201 * We continue here and may find another
202 * free_req, but the important thing is
203 * that we continue to search for an
204 * existing registration in the loop.
206 continue;
209 if (reg->ev == ev) {
210 reg->refcount += 1;
211 return true;
215 if (free_reg == NULL) {
216 struct tevent_immediate *im = NULL;
218 im = tevent_create_immediate(ctx);
219 if (im == NULL) {
220 return false;
223 tmp = talloc_realloc(ctx, ctx->event_contexts,
224 struct messaging_registered_ev,
225 num_event_contexts+1);
226 if (tmp == NULL) {
227 return false;
229 ctx->event_contexts = tmp;
231 free_reg = &ctx->event_contexts[num_event_contexts];
232 free_reg->im = talloc_move(ctx->event_contexts, &im);
236 * free_reg->im might be cached
238 free_reg->ev = ev;
239 free_reg->refcount = 1;
241 return true;
244 static bool messaging_deregister_event_context(struct messaging_context *ctx,
245 struct tevent_context *ev)
247 size_t i, num_event_contexts;
249 num_event_contexts = talloc_array_length(ctx->event_contexts);
251 for (i=0; i<num_event_contexts; i++) {
252 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
254 if (reg->refcount == 0) {
255 continue;
258 if (reg->ev == ev) {
259 reg->refcount -= 1;
261 if (reg->refcount == 0) {
263 * The primary event context
264 * is never unregistered using
265 * messaging_deregister_event_context()
266 * it's only registered using
267 * messaging_register_event_context().
269 SMB_ASSERT(ev != ctx->event_ctx);
270 SMB_ASSERT(reg->ev != ctx->event_ctx);
273 * Not strictly necessary, just
274 * paranoia
276 reg->ev = NULL;
279 * Do not talloc_free(reg->im),
280 * recycle immediates events.
282 * We just invalidate it using
283 * the primary event context,
284 * which is never unregistered.
286 tevent_schedule_immediate(reg->im,
287 ctx->event_ctx,
288 NULL, NULL);
290 return true;
293 return false;
296 static void messaging_post_main_event_context(struct tevent_context *ev,
297 struct tevent_immediate *im,
298 void *private_data)
300 struct messaging_context *ctx = talloc_get_type_abort(
301 private_data, struct messaging_context);
303 while (ctx->posted_msgs != NULL) {
304 struct messaging_rec *rec = ctx->posted_msgs;
305 bool consumed;
307 DLIST_REMOVE(ctx->posted_msgs, rec);
309 consumed = messaging_dispatch_classic(ctx, rec);
310 if (!consumed) {
311 consumed = messaging_dispatch_waiters(
312 ctx, ctx->event_ctx, rec);
315 if (!consumed) {
316 uint8_t i;
318 for (i=0; i<rec->num_fds; i++) {
319 close(rec->fds[i]);
323 TALLOC_FREE(rec);
327 static void messaging_post_sub_event_context(struct tevent_context *ev,
328 struct tevent_immediate *im,
329 void *private_data)
331 struct messaging_context *ctx = talloc_get_type_abort(
332 private_data, struct messaging_context);
333 struct messaging_rec *rec, *next;
335 for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
336 bool consumed;
338 next = rec->next;
340 consumed = messaging_dispatch_waiters(ctx, ev, rec);
341 if (consumed) {
342 DLIST_REMOVE(ctx->posted_msgs, rec);
343 TALLOC_FREE(rec);
348 static bool messaging_alert_event_contexts(struct messaging_context *ctx)
350 size_t i, num_event_contexts;
352 num_event_contexts = talloc_array_length(ctx->event_contexts);
354 for (i=0; i<num_event_contexts; i++) {
355 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
357 if (reg->refcount == 0) {
358 continue;
362 * We depend on schedule_immediate to work
363 * multiple times. Might be a bit inefficient,
364 * but this needs to be proven in tests. The
365 * alternatively would be to track whether the
366 * immediate has already been scheduled. For
367 * now, avoid that complexity here.
370 if (reg->ev == ctx->event_ctx) {
371 tevent_schedule_immediate(
372 reg->im, reg->ev,
373 messaging_post_main_event_context,
374 ctx);
375 } else {
376 tevent_schedule_immediate(
377 reg->im, reg->ev,
378 messaging_post_sub_event_context,
379 ctx);
383 return true;
386 static void messaging_recv_cb(struct tevent_context *ev,
387 const uint8_t *msg, size_t msg_len,
388 int *fds, size_t num_fds,
389 void *private_data)
391 struct messaging_context *msg_ctx = talloc_get_type_abort(
392 private_data, struct messaging_context);
393 struct server_id_buf idbuf;
394 struct messaging_rec rec;
395 int64_t fds64[MIN(num_fds, INT8_MAX)];
396 size_t i;
398 if (msg_len < MESSAGE_HDR_LENGTH) {
399 DBG_WARNING("message too short: %zu\n", msg_len);
400 goto close_fail;
403 if (num_fds > INT8_MAX) {
404 DBG_WARNING("too many fds: %zu\n", num_fds);
405 goto close_fail;
409 * "consume" the fds by copying them and setting
410 * the original variable to -1
412 for (i=0; i < num_fds; i++) {
413 fds64[i] = fds[i];
414 fds[i] = -1;
417 rec = (struct messaging_rec) {
418 .msg_version = MESSAGE_VERSION,
419 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
420 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
421 .num_fds = num_fds,
422 .fds = fds64,
425 message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
427 DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
428 (unsigned)rec.msg_type, rec.buf.length, num_fds,
429 server_id_str_buf(rec.src, &idbuf));
431 if (server_id_same_process(&rec.src, &msg_ctx->id)) {
432 DBG_DEBUG("Ignoring self-send\n");
433 goto close_fail;
436 messaging_dispatch_rec(msg_ctx, ev, &rec);
437 return;
439 close_fail:
440 for (i=0; i < num_fds; i++) {
441 close(fds[i]);
445 static int messaging_context_destructor(struct messaging_context *ctx)
447 size_t i;
449 for (i=0; i<ctx->num_new_waiters; i++) {
450 if (ctx->new_waiters[i] != NULL) {
451 tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
452 ctx->new_waiters[i] = NULL;
455 for (i=0; i<ctx->num_waiters; i++) {
456 if (ctx->waiters[i] != NULL) {
457 tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
458 ctx->waiters[i] = NULL;
463 * The immediates from messaging_alert_event_contexts
464 * reference "ctx". Don't let them outlive the
465 * messaging_context we're destroying here.
467 TALLOC_FREE(ctx->event_contexts);
469 return 0;
/*
 * Return "<private dir>/<name>", allocated on talloc_tos(), or NULL
 * if the allocation fails.
 */
static const char *private_path(const char *name)
{
	const char *path = talloc_asprintf(
		talloc_tos(), "%s/%s", lp_private_dir(), name);

	return path;
}
477 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
478 struct tevent_context *ev,
479 struct messaging_context **pmsg_ctx)
481 TALLOC_CTX *frame;
482 struct messaging_context *ctx;
483 NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
484 int ret;
485 const char *lck_path;
486 const char *priv_path;
487 bool ok;
490 * sec_init() *must* be called before any other
491 * functions that use sec_XXX(). e.g. sec_initial_uid().
494 sec_init();
496 lck_path = lock_path(talloc_tos(), "msg.lock");
497 if (lck_path == NULL) {
498 return NT_STATUS_NO_MEMORY;
501 ok = directory_create_or_exist_strict(lck_path,
502 sec_initial_uid(),
503 0755);
504 if (!ok) {
505 DBG_DEBUG("Could not create lock directory: %s\n",
506 strerror(errno));
507 return NT_STATUS_ACCESS_DENIED;
510 priv_path = private_path("msg.sock");
511 if (priv_path == NULL) {
512 return NT_STATUS_NO_MEMORY;
515 ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
516 0700);
517 if (!ok) {
518 DBG_DEBUG("Could not create msg directory: %s\n",
519 strerror(errno));
520 return NT_STATUS_ACCESS_DENIED;
523 frame = talloc_stackframe();
524 if (frame == NULL) {
525 return NT_STATUS_NO_MEMORY;
528 ctx = talloc_zero(frame, struct messaging_context);
529 if (ctx == NULL) {
530 status = NT_STATUS_NO_MEMORY;
531 goto done;
534 ctx->id = (struct server_id) {
535 .pid = getpid(), .vnn = NONCLUSTER_VNN
538 ctx->event_ctx = ev;
540 ok = messaging_register_event_context(ctx, ev);
541 if (!ok) {
542 status = NT_STATUS_NO_MEMORY;
543 goto done;
546 ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
547 ctx->event_ctx,
548 &ctx->id.unique_id,
549 priv_path,
550 lck_path,
551 messaging_recv_cb,
552 ctx,
553 &ret);
554 if (ctx->msg_dgm_ref == NULL) {
555 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
556 status = map_nt_error_from_unix(ret);
557 goto done;
559 talloc_set_destructor(ctx, messaging_context_destructor);
561 #ifdef CLUSTER_SUPPORT
562 if (lp_clustering()) {
563 ctx->msg_ctdb_ref = messaging_ctdb_ref(
564 ctx, ctx->event_ctx,
565 lp_ctdbd_socket(), lp_ctdb_timeout(),
566 ctx->id.unique_id, messaging_recv_cb, ctx, &ret);
567 if (ctx->msg_ctdb_ref == NULL) {
568 DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
569 strerror(ret));
570 status = map_nt_error_from_unix(ret);
571 goto done;
574 #endif
576 ctx->id.vnn = get_my_vnn();
578 ctx->names_db = server_id_db_init(ctx,
579 ctx->id,
580 lp_lock_directory(),
582 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
583 if (ctx->names_db == NULL) {
584 DBG_DEBUG("server_id_db_init failed\n");
585 status = NT_STATUS_NO_MEMORY;
586 goto done;
589 messaging_register(ctx, NULL, MSG_PING, ping_message);
591 /* Register some debugging related messages */
593 register_msg_pool_usage(ctx);
594 register_dmalloc_msgs(ctx);
595 debug_register_msgs(ctx);
598 struct server_id_buf tmp;
599 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
602 *pmsg_ctx = talloc_steal(mem_ctx, ctx);
604 status = NT_STATUS_OK;
605 done:
606 TALLOC_FREE(frame);
608 return status;
611 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
612 struct tevent_context *ev)
614 struct messaging_context *ctx = NULL;
615 NTSTATUS status;
617 status = messaging_init_internal(mem_ctx,
619 &ctx);
620 if (!NT_STATUS_IS_OK(status)) {
621 return NULL;
624 return ctx;
627 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
629 return msg_ctx->id;
633 * re-init after a fork
635 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
637 int ret;
638 char *lck_path;
640 TALLOC_FREE(msg_ctx->msg_dgm_ref);
641 TALLOC_FREE(msg_ctx->msg_ctdb_ref);
643 msg_ctx->id = (struct server_id) {
644 .pid = getpid(), .vnn = msg_ctx->id.vnn
647 lck_path = lock_path(talloc_tos(), "msg.lock");
648 if (lck_path == NULL) {
649 return NT_STATUS_NO_MEMORY;
652 msg_ctx->msg_dgm_ref = messaging_dgm_ref(
653 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
654 private_path("msg.sock"), lck_path,
655 messaging_recv_cb, msg_ctx, &ret);
657 if (msg_ctx->msg_dgm_ref == NULL) {
658 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
659 return map_nt_error_from_unix(ret);
662 if (lp_clustering()) {
663 msg_ctx->msg_ctdb_ref = messaging_ctdb_ref(
664 msg_ctx, msg_ctx->event_ctx,
665 lp_ctdbd_socket(), lp_ctdb_timeout(),
666 msg_ctx->id.unique_id, messaging_recv_cb, msg_ctx,
667 &ret);
668 if (msg_ctx->msg_ctdb_ref == NULL) {
669 DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
670 strerror(ret));
671 return map_nt_error_from_unix(ret);
675 server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
677 return NT_STATUS_OK;
682 * Register a dispatch function for a particular message type. Allow multiple
683 * registrants
685 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
686 void *private_data,
687 uint32_t msg_type,
688 void (*fn)(struct messaging_context *msg,
689 void *private_data,
690 uint32_t msg_type,
691 struct server_id server_id,
692 DATA_BLOB *data))
694 struct messaging_callback *cb;
696 DEBUG(5, ("Registering messaging pointer for type %u - "
697 "private_data=%p\n",
698 (unsigned)msg_type, private_data));
701 * Only one callback per type
704 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
705 /* we allow a second registration of the same message
706 type if it has a different private pointer. This is
707 needed in, for example, the internal notify code,
708 which creates a new notify context for each tree
709 connect, and expects to receive messages to each of
710 them. */
711 if (cb->msg_type == msg_type && private_data == cb->private_data) {
712 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
713 (unsigned)msg_type, private_data));
714 cb->fn = fn;
715 cb->private_data = private_data;
716 return NT_STATUS_OK;
720 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
721 return NT_STATUS_NO_MEMORY;
724 cb->msg_type = msg_type;
725 cb->fn = fn;
726 cb->private_data = private_data;
728 DLIST_ADD(msg_ctx->callbacks, cb);
729 return NT_STATUS_OK;
733 De-register the function for a particular message type.
735 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
736 void *private_data)
738 struct messaging_callback *cb, *next;
740 for (cb = ctx->callbacks; cb; cb = next) {
741 next = cb->next;
742 if ((cb->msg_type == msg_type)
743 && (cb->private_data == private_data)) {
744 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
745 (unsigned)msg_type, private_data));
746 DLIST_REMOVE(ctx->callbacks, cb);
747 TALLOC_FREE(cb);
753 Send a message to a particular server
755 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
756 struct server_id server, uint32_t msg_type,
757 const DATA_BLOB *data)
759 struct iovec iov = {0};
761 if (data != NULL) {
762 iov.iov_base = data->data;
763 iov.iov_len = data->length;
766 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
769 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
770 struct server_id server, uint32_t msg_type,
771 const uint8_t *buf, size_t len)
773 DATA_BLOB blob = data_blob_const(buf, len);
774 return messaging_send(msg_ctx, server, msg_type, &blob);
777 static int messaging_post_self(struct messaging_context *msg_ctx,
778 struct server_id src, struct server_id dst,
779 uint32_t msg_type,
780 const struct iovec *iov, int iovlen,
781 const int *fds, size_t num_fds)
783 struct messaging_rec *rec;
784 bool ok;
786 rec = messaging_rec_create(
787 msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
788 if (rec == NULL) {
789 return ENOMEM;
792 ok = messaging_alert_event_contexts(msg_ctx);
793 if (!ok) {
794 TALLOC_FREE(rec);
795 return ENOMEM;
798 DLIST_ADD_END(msg_ctx->posted_msgs, rec);
800 return 0;
803 int messaging_send_iov_from(struct messaging_context *msg_ctx,
804 struct server_id src, struct server_id dst,
805 uint32_t msg_type,
806 const struct iovec *iov, int iovlen,
807 const int *fds, size_t num_fds)
809 int ret;
810 uint8_t hdr[MESSAGE_HDR_LENGTH];
811 struct iovec iov2[iovlen+1];
813 if (server_id_is_disconnected(&dst)) {
814 return EINVAL;
817 if (num_fds > INT8_MAX) {
818 return EINVAL;
821 if (server_id_equal(&dst, &msg_ctx->id)) {
822 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
823 iov, iovlen, fds, num_fds);
824 return ret;
827 message_hdr_put(hdr, msg_type, src, dst);
828 iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
829 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
831 if (dst.vnn != msg_ctx->id.vnn) {
832 if (num_fds > 0) {
833 return ENOSYS;
836 ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
837 return ret;
840 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
842 if (ret == EACCES) {
843 become_root();
844 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
845 fds, num_fds);
846 unbecome_root();
849 if (ret == ECONNREFUSED) {
851 * Linux returns this when a socket exists in the file
852 * system without a listening process. This is not
853 * documented in susv4 or the linux manpages, but it's
854 * easily testable. For the higher levels this is the
855 * same as "destination does not exist"
857 ret = ENOENT;
860 return ret;
863 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
864 struct server_id dst, uint32_t msg_type,
865 const struct iovec *iov, int iovlen,
866 const int *fds, size_t num_fds)
868 int ret;
870 ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
871 iov, iovlen, fds, num_fds);
872 if (ret != 0) {
873 return map_nt_error_from_unix(ret);
875 return NT_STATUS_OK;
/*
 * Arguments of messaging_send_all(), handed to send_all_fn() for each
 * process.
 */
struct send_all_state {
	struct messaging_context *msg_ctx;
	int msg_type;
	const void *buf;	/* payload */
	size_t len;		/* payload length in bytes */
};
885 static int send_all_fn(pid_t pid, void *private_data)
887 struct send_all_state *state = private_data;
888 NTSTATUS status;
890 if (pid == getpid()) {
891 DBG_DEBUG("Skip ourselves in messaging_send_all\n");
892 return 0;
895 status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
896 state->msg_type, state->buf, state->len);
897 if (!NT_STATUS_IS_OK(status)) {
898 DBG_WARNING("messaging_send_buf to %ju failed: %s\n",
899 (uintmax_t)pid, nt_errstr(status));
902 return 0;
905 void messaging_send_all(struct messaging_context *msg_ctx,
906 int msg_type, const void *buf, size_t len)
908 struct send_all_state state = {
909 .msg_ctx = msg_ctx, .msg_type = msg_type,
910 .buf = buf, .len = len
912 int ret;
914 #ifdef CLUSTER_SUPPORT
915 if (lp_clustering()) {
916 struct ctdbd_connection *conn = messaging_ctdb_connection();
917 uint8_t msghdr[MESSAGE_HDR_LENGTH];
918 struct iovec iov[] = {
919 { .iov_base = msghdr,
920 .iov_len = sizeof(msghdr) },
921 { .iov_base = discard_const_p(void, buf),
922 .iov_len = len }
925 message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
926 (struct server_id) {0});
928 ret = ctdbd_messaging_send_iov(
929 conn, CTDB_BROADCAST_CONNECTED,
930 CTDB_SRVID_SAMBA_PROCESS,
931 iov, ARRAY_SIZE(iov));
932 if (ret != 0) {
933 DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
934 strerror(ret));
937 return;
939 #endif
941 ret = messaging_dgm_forall(send_all_fn, &state);
942 if (ret != 0) {
943 DBG_WARNING("messaging_dgm_forall failed: %s\n",
944 strerror(ret));
948 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
949 struct messaging_rec *rec)
951 struct messaging_rec *result;
952 size_t fds_size = sizeof(int64_t) * rec->num_fds;
953 size_t payload_len;
955 payload_len = rec->buf.length + fds_size;
956 if (payload_len < rec->buf.length) {
957 /* overflow */
958 return NULL;
961 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
962 payload_len);
963 if (result == NULL) {
964 return NULL;
966 *result = *rec;
968 /* Doesn't fail, see talloc_pooled_object */
970 result->buf.data = talloc_memdup(result, rec->buf.data,
971 rec->buf.length);
973 result->fds = NULL;
974 if (result->num_fds > 0) {
975 result->fds = talloc_memdup(result, rec->fds, fds_size);
978 return result;
/*
 * State of one messaging_filtered_read request.
 */
struct messaging_filtered_read_state {
	struct tevent_context *ev;
	struct messaging_context *msg_ctx;
	struct messaging_dgm_fde *fde;
	struct messaging_ctdb_fde *cluster_fde;

	/* Decides whether a message is meant for this request */
	bool (*filter)(struct messaging_rec *rec, void *private_data);
	void *private_data;

	/* Copy of the accepted message, set when the request is done */
	struct messaging_rec *rec;
};
/* Cleanup function of messaging_filtered_read requests, see below */
static void messaging_filtered_read_cleanup(
	struct tevent_req *req,
	enum tevent_req_state req_state);
996 struct tevent_req *messaging_filtered_read_send(
997 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
998 struct messaging_context *msg_ctx,
999 bool (*filter)(struct messaging_rec *rec, void *private_data),
1000 void *private_data)
1002 struct tevent_req *req;
1003 struct messaging_filtered_read_state *state;
1004 size_t new_waiters_len;
1005 bool ok;
1007 req = tevent_req_create(mem_ctx, &state,
1008 struct messaging_filtered_read_state);
1009 if (req == NULL) {
1010 return NULL;
1012 state->ev = ev;
1013 state->msg_ctx = msg_ctx;
1014 state->filter = filter;
1015 state->private_data = private_data;
1018 * We have to defer the callback here, as we might be called from
1019 * within a different tevent_context than state->ev
1021 tevent_req_defer_callback(req, state->ev);
1023 state->fde = messaging_dgm_register_tevent_context(state, ev);
1024 if (tevent_req_nomem(state->fde, req)) {
1025 return tevent_req_post(req, ev);
1028 if (lp_clustering()) {
1029 state->cluster_fde =
1030 messaging_ctdb_register_tevent_context(state, ev);
1031 if (tevent_req_nomem(state->cluster_fde, req)) {
1032 return tevent_req_post(req, ev);
1037 * We add ourselves to the "new_waiters" array, not the "waiters"
1038 * array. If we are called from within messaging_read_done,
1039 * messaging_dispatch_rec will be in an active for-loop on
1040 * "waiters". We must be careful not to mess with this array, because
1041 * it could mean that a single event is being delivered twice.
1044 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
1046 if (new_waiters_len == msg_ctx->num_new_waiters) {
1047 struct tevent_req **tmp;
1049 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
1050 struct tevent_req *, new_waiters_len+1);
1051 if (tevent_req_nomem(tmp, req)) {
1052 return tevent_req_post(req, ev);
1054 msg_ctx->new_waiters = tmp;
1057 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
1058 msg_ctx->num_new_waiters += 1;
1059 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
1061 ok = messaging_register_event_context(msg_ctx, ev);
1062 if (!ok) {
1063 tevent_req_oom(req);
1064 return tevent_req_post(req, ev);
1067 return req;
1070 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1071 enum tevent_req_state req_state)
1073 struct messaging_filtered_read_state *state = tevent_req_data(
1074 req, struct messaging_filtered_read_state);
1075 struct messaging_context *msg_ctx = state->msg_ctx;
1076 size_t i;
1077 bool ok;
1079 tevent_req_set_cleanup_fn(req, NULL);
1081 TALLOC_FREE(state->fde);
1082 TALLOC_FREE(state->cluster_fde);
1084 ok = messaging_deregister_event_context(msg_ctx, state->ev);
1085 if (!ok) {
1086 abort();
1090 * Just set the [new_]waiters entry to NULL, be careful not to mess
1091 * with the other "waiters" array contents. We are often called from
1092 * within "messaging_dispatch_rec", which loops over
1093 * "waiters". Messing with the "waiters" array will mess up that
1094 * for-loop.
1097 for (i=0; i<msg_ctx->num_waiters; i++) {
1098 if (msg_ctx->waiters[i] == req) {
1099 msg_ctx->waiters[i] = NULL;
1100 return;
1104 for (i=0; i<msg_ctx->num_new_waiters; i++) {
1105 if (msg_ctx->new_waiters[i] == req) {
1106 msg_ctx->new_waiters[i] = NULL;
1107 return;
1112 static void messaging_filtered_read_done(struct tevent_req *req,
1113 struct messaging_rec *rec)
1115 struct messaging_filtered_read_state *state = tevent_req_data(
1116 req, struct messaging_filtered_read_state);
1118 state->rec = messaging_rec_dup(state, rec);
1119 if (tevent_req_nomem(state->rec, req)) {
1120 return;
1122 tevent_req_done(req);
1125 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1126 struct messaging_rec **presult)
1128 struct messaging_filtered_read_state *state = tevent_req_data(
1129 req, struct messaging_filtered_read_state);
1130 int err;
1132 if (tevent_req_is_unix_error(req, &err)) {
1133 tevent_req_received(req);
1134 return err;
1136 if (presult != NULL) {
1137 *presult = talloc_move(mem_ctx, &state->rec);
1139 return 0;
/*
 * State of one messaging_read request.
 */
struct messaging_read_state {
	uint32_t msg_type;		/* message type we wait for */
	struct messaging_rec *rec;	/* the received message */
};
/* Helpers of messaging_read_send(), defined below */
static bool messaging_read_filter(
	struct messaging_rec *rec,
	void *private_data);
static void messaging_read_done(
	struct tevent_req *subreq);
1151 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
1152 struct tevent_context *ev,
1153 struct messaging_context *msg,
1154 uint32_t msg_type)
1156 struct tevent_req *req, *subreq;
1157 struct messaging_read_state *state;
1159 req = tevent_req_create(mem_ctx, &state,
1160 struct messaging_read_state);
1161 if (req == NULL) {
1162 return NULL;
1164 state->msg_type = msg_type;
1166 subreq = messaging_filtered_read_send(state, ev, msg,
1167 messaging_read_filter, state);
1168 if (tevent_req_nomem(subreq, req)) {
1169 return tevent_req_post(req, ev);
1171 tevent_req_set_callback(subreq, messaging_read_done, req);
1172 return req;
1175 static bool messaging_read_filter(struct messaging_rec *rec,
1176 void *private_data)
1178 struct messaging_read_state *state = talloc_get_type_abort(
1179 private_data, struct messaging_read_state);
1181 if (rec->num_fds != 0) {
1182 return false;
1185 return rec->msg_type == state->msg_type;
1188 static void messaging_read_done(struct tevent_req *subreq)
1190 struct tevent_req *req = tevent_req_callback_data(
1191 subreq, struct tevent_req);
1192 struct messaging_read_state *state = tevent_req_data(
1193 req, struct messaging_read_state);
1194 int ret;
1196 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
1197 TALLOC_FREE(subreq);
1198 if (tevent_req_error(req, ret)) {
1199 return;
1201 tevent_req_done(req);
1204 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1205 struct messaging_rec **presult)
1207 struct messaging_read_state *state = tevent_req_data(
1208 req, struct messaging_read_state);
1209 int err;
1211 if (tevent_req_is_unix_error(req, &err)) {
1212 return err;
1214 if (presult != NULL) {
1215 *presult = talloc_move(mem_ctx, &state->rec);
1217 return 0;
1220 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
1222 if (msg_ctx->num_new_waiters == 0) {
1223 return true;
1226 if (talloc_array_length(msg_ctx->waiters) <
1227 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
1228 struct tevent_req **tmp;
1229 tmp = talloc_realloc(
1230 msg_ctx, msg_ctx->waiters, struct tevent_req *,
1231 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1232 if (tmp == NULL) {
1233 DEBUG(1, ("%s: talloc failed\n", __func__));
1234 return false;
1236 msg_ctx->waiters = tmp;
1239 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1240 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1242 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1243 msg_ctx->num_new_waiters = 0;
1245 return true;
1248 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1249 struct messaging_rec *rec)
1251 struct messaging_callback *cb, *next;
1253 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1254 size_t j;
1256 next = cb->next;
1257 if (cb->msg_type != rec->msg_type) {
1258 continue;
1262 * the old style callbacks don't support fd passing
1264 for (j=0; j < rec->num_fds; j++) {
1265 int fd = rec->fds[j];
1266 close(fd);
1268 rec->num_fds = 0;
1269 rec->fds = NULL;
1271 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1272 rec->src, &rec->buf);
1274 return true;
1277 return false;
1280 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
1281 struct tevent_context *ev,
1282 struct messaging_rec *rec)
1284 size_t i;
1286 if (!messaging_append_new_waiters(msg_ctx)) {
1287 return false;
1290 i = 0;
1291 while (i < msg_ctx->num_waiters) {
1292 struct tevent_req *req;
1293 struct messaging_filtered_read_state *state;
1295 req = msg_ctx->waiters[i];
1296 if (req == NULL) {
1298 * This got cleaned up. In the meantime,
1299 * move everything down one. We need
1300 * to keep the order of waiters, as
1301 * other code may depend on this.
1303 if (i < msg_ctx->num_waiters - 1) {
1304 memmove(&msg_ctx->waiters[i],
1305 &msg_ctx->waiters[i+1],
1306 sizeof(struct tevent_req *) *
1307 (msg_ctx->num_waiters - i - 1));
1309 msg_ctx->num_waiters -= 1;
1310 continue;
1313 state = tevent_req_data(
1314 req, struct messaging_filtered_read_state);
1315 if ((ev == state->ev) &&
1316 state->filter(rec, state->private_data)) {
1317 messaging_filtered_read_done(req, rec);
1318 return true;
1321 i += 1;
1324 return false;
1328 Dispatch one messaging_rec
1330 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1331 struct tevent_context *ev,
1332 struct messaging_rec *rec)
1334 bool consumed;
1335 size_t i;
1337 if (ev == msg_ctx->event_ctx) {
1338 consumed = messaging_dispatch_classic(msg_ctx, rec);
1339 if (consumed) {
1340 return;
1344 consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
1345 if (consumed) {
1346 return;
1349 if (ev != msg_ctx->event_ctx) {
1350 struct iovec iov;
1351 int fds[rec->num_fds];
1352 int ret;
1355 * We've been listening on a nested event
1356 * context. Messages need to be handled in the main
1357 * event context, so post to ourselves
1360 iov.iov_base = rec->buf.data;
1361 iov.iov_len = rec->buf.length;
1363 for (i=0; i<rec->num_fds; i++) {
1364 fds[i] = rec->fds[i];
1367 ret = messaging_post_self(
1368 msg_ctx, rec->src, rec->dest, rec->msg_type,
1369 &iov, 1, fds, rec->num_fds);
1370 if (ret == 0) {
1371 return;
1376 * If the fd-array isn't used, just close it.
1378 for (i=0; i < rec->num_fds; i++) {
1379 int fd = rec->fds[i];
1380 close(fd);
1382 rec->num_fds = 0;
1383 rec->fds = NULL;
/* Periodic datagram socket cleanup job and its completion callback */
static int mess_parent_dgm_cleanup(
	void *private_data);
static void mess_parent_dgm_cleanup_done(
	struct tevent_req *req);
1389 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1391 struct tevent_req *req;
1393 req = background_job_send(
1394 msg, msg->event_ctx, msg, NULL, 0,
1395 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1396 60*15),
1397 mess_parent_dgm_cleanup, msg);
1398 if (req == NULL) {
1399 DBG_WARNING("background_job_send failed\n");
1400 return false;
1402 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1403 return true;
/*
 * Background job body: call messaging_dgm_wipe(), log its result and
 * return the configured cleanup interval (default 15*60).
 */
static int mess_parent_dgm_cleanup(void *private_data)
{
	int wipe_ret = messaging_dgm_wipe();
	int interval;

	DEBUG(10, ("messaging_dgm_wipe returned %s\n",
		   wipe_ret ? strerror(wipe_ret) : "ok"));

	interval = lp_parm_int(-1, "messaging",
			       "messaging dgm cleanup interval",
			       60*15);
	return interval;
}
1417 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1419 struct messaging_context *msg = tevent_req_callback_data(
1420 req, struct messaging_context);
1421 NTSTATUS status;
1423 status = background_job_recv(req);
1424 TALLOC_FREE(req);
1425 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1426 nt_errstr(status)));
1428 req = background_job_send(
1429 msg, msg->event_ctx, msg, NULL, 0,
1430 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1431 60*15),
1432 mess_parent_dgm_cleanup, msg);
1433 if (req == NULL) {
1434 DEBUG(1, ("background_job_send failed\n"));
1435 return;
1437 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
/*
 * Clean up datagram messaging resources: for pid 0 call
 * messaging_dgm_wipe(), otherwise messaging_dgm_cleanup() for that
 * pid. The return value of the called function is passed through.
 */
int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
{
	if (pid == 0) {
		return messaging_dgm_wipe();
	}

	return messaging_dgm_cleanup(pid);
}
1453 struct tevent_context *messaging_tevent_context(
1454 struct messaging_context *msg_ctx)
1456 return msg_ctx->event_ctx;
1459 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1461 return msg_ctx->names_db;
1464 /** @} **/