smbd: Protect ea-reading on symlinks
[Samba.git] / source3 / lib / messages.c
blob7d3d46960a92805179d20a65948dbe31a87ae9b6
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to communicate with
31 another Samba process, it registers a dispatch function with
32 messaging_register() and uses messaging_send() to send messages to
33 that process.
35 The dispatch function is given the server_id of the sender, and it
36 can use that to reply via messaging_send(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messaging/messages_dgm.h"
56 #include "lib/util/iov_buf.h"
57 #include "lib/util/server_id_db.h"
58 #include "lib/messaging/messages_dgm_ref.h"
59 #include "lib/messages_ctdb.h"
60 #include "lib/messages_ctdb_ref.h"
61 #include "lib/messages_util.h"
62 #include "cluster_support.h"
63 #include "ctdbd_conn.h"
64 #include "ctdb_srvids.h"
66 #ifdef CLUSTER_SUPPORT
67 #include "ctdb_protocol.h"
68 #endif
70 struct messaging_callback {
71 struct messaging_callback *prev, *next;
72 uint32_t msg_type;
73 void (*fn)(struct messaging_context *msg, void *private_data,
74 uint32_t msg_type,
75 struct server_id server_id, DATA_BLOB *data);
76 void *private_data;
/*
 * Book-keeping slot for one tevent context that messaging delivers to.
 * Slots live in the messaging_context->event_contexts talloc array.
 */
struct messaging_registered_ev {
	/* The registered event context; reset to NULL when the slot is freed */
	struct tevent_context *ev;

	/* Immediate event used to wake up "ev"; kept and reused across
	 * registrations of the slot */
	struct tevent_immediate *im;

	/* Number of active registrations; 0 marks the slot as free */
	size_t refcount;
};
85 struct messaging_context {
86 struct server_id id;
87 struct tevent_context *event_ctx;
88 struct messaging_callback *callbacks;
90 struct messaging_rec *posted_msgs;
92 struct messaging_registered_ev *event_contexts;
94 struct tevent_req **new_waiters;
95 size_t num_new_waiters;
97 struct tevent_req **waiters;
98 size_t num_waiters;
100 struct server_id_db *names_db;
102 TALLOC_CTX *per_process_talloc_ctx;
105 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
106 struct messaging_rec *rec);
107 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
108 struct messaging_rec *rec);
109 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
110 struct tevent_context *ev,
111 struct messaging_rec *rec);
112 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
113 struct tevent_context *ev,
114 struct messaging_rec *rec);
116 /****************************************************************************
117 A useful function for testing the message system.
118 ****************************************************************************/
120 static void ping_message(struct messaging_context *msg_ctx,
121 void *private_data,
122 uint32_t msg_type,
123 struct server_id src,
124 DATA_BLOB *data)
126 struct server_id_buf idbuf;
128 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
129 server_id_str_buf(src, &idbuf), (int)data->length,
130 data->data ? (char *)data->data : ""));
132 messaging_send(msg_ctx, src, MSG_PONG, data);
135 struct messaging_rec *messaging_rec_create(
136 TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
137 uint32_t msg_type, const struct iovec *iov, int iovlen,
138 const int *fds, size_t num_fds)
140 ssize_t buflen;
141 uint8_t *buf;
142 struct messaging_rec *result;
144 if (num_fds > INT8_MAX) {
145 return NULL;
148 buflen = iov_buflen(iov, iovlen);
149 if (buflen == -1) {
150 return NULL;
152 buf = talloc_array(mem_ctx, uint8_t, buflen);
153 if (buf == NULL) {
154 return NULL;
156 iov_buf(iov, iovlen, buf, buflen);
159 struct messaging_rec rec;
160 int64_t fds64[MAX(1, num_fds)];
161 size_t i;
163 for (i=0; i<num_fds; i++) {
164 fds64[i] = fds[i];
167 rec = (struct messaging_rec) {
168 .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
169 .src = src, .dest = dst,
170 .buf.data = buf, .buf.length = buflen,
171 .num_fds = num_fds, .fds = fds64,
174 result = messaging_rec_dup(mem_ctx, &rec);
177 TALLOC_FREE(buf);
179 return result;
182 static bool messaging_register_event_context(struct messaging_context *ctx,
183 struct tevent_context *ev)
185 size_t i, num_event_contexts;
186 struct messaging_registered_ev *free_reg = NULL;
187 struct messaging_registered_ev *tmp;
189 num_event_contexts = talloc_array_length(ctx->event_contexts);
191 for (i=0; i<num_event_contexts; i++) {
192 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
194 if (reg->refcount == 0) {
195 if (reg->ev != NULL) {
196 abort();
198 free_reg = reg;
200 * We continue here and may find another
201 * free_req, but the important thing is
202 * that we continue to search for an
203 * existing registration in the loop.
205 continue;
208 if (reg->ev == ev) {
209 reg->refcount += 1;
210 return true;
214 if (free_reg == NULL) {
215 struct tevent_immediate *im = NULL;
217 im = tevent_create_immediate(ctx);
218 if (im == NULL) {
219 return false;
222 tmp = talloc_realloc(ctx, ctx->event_contexts,
223 struct messaging_registered_ev,
224 num_event_contexts+1);
225 if (tmp == NULL) {
226 return false;
228 ctx->event_contexts = tmp;
230 free_reg = &ctx->event_contexts[num_event_contexts];
231 free_reg->im = talloc_move(ctx->event_contexts, &im);
235 * free_reg->im might be cached
237 free_reg->ev = ev;
238 free_reg->refcount = 1;
240 return true;
243 static bool messaging_deregister_event_context(struct messaging_context *ctx,
244 struct tevent_context *ev)
246 size_t i, num_event_contexts;
248 num_event_contexts = talloc_array_length(ctx->event_contexts);
250 for (i=0; i<num_event_contexts; i++) {
251 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
253 if (reg->refcount == 0) {
254 continue;
257 if (reg->ev == ev) {
258 reg->refcount -= 1;
260 if (reg->refcount == 0) {
262 * The primary event context
263 * is never unregistered using
264 * messaging_deregister_event_context()
265 * it's only registered using
266 * messaging_register_event_context().
268 SMB_ASSERT(ev != ctx->event_ctx);
269 SMB_ASSERT(reg->ev != ctx->event_ctx);
272 * Not strictly necessary, just
273 * paranoia
275 reg->ev = NULL;
278 * Do not talloc_free(reg->im),
279 * recycle immediates events.
281 * We just invalidate it using
282 * the primary event context,
283 * which is never unregistered.
285 tevent_schedule_immediate(reg->im,
286 ctx->event_ctx,
287 NULL, NULL);
289 return true;
292 return false;
295 static void messaging_post_main_event_context(struct tevent_context *ev,
296 struct tevent_immediate *im,
297 void *private_data)
299 struct messaging_context *ctx = talloc_get_type_abort(
300 private_data, struct messaging_context);
302 while (ctx->posted_msgs != NULL) {
303 struct messaging_rec *rec = ctx->posted_msgs;
304 bool consumed;
306 DLIST_REMOVE(ctx->posted_msgs, rec);
308 consumed = messaging_dispatch_classic(ctx, rec);
309 if (!consumed) {
310 consumed = messaging_dispatch_waiters(
311 ctx, ctx->event_ctx, rec);
314 if (!consumed) {
315 uint8_t i;
317 for (i=0; i<rec->num_fds; i++) {
318 close(rec->fds[i]);
322 TALLOC_FREE(rec);
326 static void messaging_post_sub_event_context(struct tevent_context *ev,
327 struct tevent_immediate *im,
328 void *private_data)
330 struct messaging_context *ctx = talloc_get_type_abort(
331 private_data, struct messaging_context);
332 struct messaging_rec *rec, *next;
334 for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
335 bool consumed;
337 next = rec->next;
339 consumed = messaging_dispatch_waiters(ctx, ev, rec);
340 if (consumed) {
341 DLIST_REMOVE(ctx->posted_msgs, rec);
342 TALLOC_FREE(rec);
347 static bool messaging_alert_event_contexts(struct messaging_context *ctx)
349 size_t i, num_event_contexts;
351 num_event_contexts = talloc_array_length(ctx->event_contexts);
353 for (i=0; i<num_event_contexts; i++) {
354 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
356 if (reg->refcount == 0) {
357 continue;
361 * We depend on schedule_immediate to work
362 * multiple times. Might be a bit inefficient,
363 * but this needs to be proven in tests. The
364 * alternatively would be to track whether the
365 * immediate has already been scheduled. For
366 * now, avoid that complexity here.
369 if (reg->ev == ctx->event_ctx) {
370 tevent_schedule_immediate(
371 reg->im, reg->ev,
372 messaging_post_main_event_context,
373 ctx);
374 } else {
375 tevent_schedule_immediate(
376 reg->im, reg->ev,
377 messaging_post_sub_event_context,
378 ctx);
382 return true;
385 static void messaging_recv_cb(struct tevent_context *ev,
386 const uint8_t *msg, size_t msg_len,
387 int *fds, size_t num_fds,
388 void *private_data)
390 struct messaging_context *msg_ctx = talloc_get_type_abort(
391 private_data, struct messaging_context);
392 struct server_id_buf idbuf;
393 struct messaging_rec rec;
394 int64_t fds64[MAX(1, MIN(num_fds, INT8_MAX))];
395 size_t i;
397 if (msg_len < MESSAGE_HDR_LENGTH) {
398 DBG_WARNING("message too short: %zu\n", msg_len);
399 return;
402 if (num_fds > INT8_MAX) {
403 DBG_WARNING("too many fds: %zu\n", num_fds);
404 return;
407 for (i=0; i < num_fds; i++) {
408 fds64[i] = fds[i];
411 rec = (struct messaging_rec) {
412 .msg_version = MESSAGE_VERSION,
413 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
414 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
415 .num_fds = num_fds,
416 .fds = fds64,
419 message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
421 DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
422 (unsigned)rec.msg_type, rec.buf.length, num_fds,
423 server_id_str_buf(rec.src, &idbuf));
425 if (server_id_same_process(&rec.src, &msg_ctx->id)) {
426 DBG_DEBUG("Ignoring self-send\n");
427 return;
430 messaging_dispatch_rec(msg_ctx, ev, &rec);
432 for (i=0; i<num_fds; i++) {
433 fds[i] = fds64[i];
437 static int messaging_context_destructor(struct messaging_context *ctx)
439 size_t i;
441 for (i=0; i<ctx->num_new_waiters; i++) {
442 if (ctx->new_waiters[i] != NULL) {
443 tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
444 ctx->new_waiters[i] = NULL;
447 for (i=0; i<ctx->num_waiters; i++) {
448 if (ctx->waiters[i] != NULL) {
449 tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
450 ctx->waiters[i] = NULL;
455 * The immediates from messaging_alert_event_contexts
456 * reference "ctx". Don't let them outlive the
457 * messaging_context we're destroying here.
459 TALLOC_FREE(ctx->event_contexts);
461 return 0;
/*
 * Return "<private dir>/<name>", allocated on the current talloc
 * stackframe. NULL on allocation failure.
 */
static const char *private_path(const char *name)
{
	const char *dir = lp_private_dir();

	return talloc_asprintf(talloc_tos(), "%s/%s", dir, name);
}
469 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
470 struct tevent_context *ev,
471 struct messaging_context **pmsg_ctx)
473 TALLOC_CTX *frame;
474 struct messaging_context *ctx;
475 NTSTATUS status;
476 int ret;
477 const char *lck_path;
478 const char *priv_path;
479 void *ref;
480 bool ok;
483 * sec_init() *must* be called before any other
484 * functions that use sec_XXX(). e.g. sec_initial_uid().
487 sec_init();
489 lck_path = lock_path(talloc_tos(), "msg.lock");
490 if (lck_path == NULL) {
491 return NT_STATUS_NO_MEMORY;
494 ok = directory_create_or_exist_strict(lck_path,
495 sec_initial_uid(),
496 0755);
497 if (!ok) {
498 DBG_DEBUG("Could not create lock directory: %s\n",
499 strerror(errno));
500 return NT_STATUS_ACCESS_DENIED;
503 priv_path = private_path("msg.sock");
504 if (priv_path == NULL) {
505 return NT_STATUS_NO_MEMORY;
508 ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
509 0700);
510 if (!ok) {
511 DBG_DEBUG("Could not create msg directory: %s\n",
512 strerror(errno));
513 return NT_STATUS_ACCESS_DENIED;
516 frame = talloc_stackframe();
517 if (frame == NULL) {
518 return NT_STATUS_NO_MEMORY;
521 ctx = talloc_zero(frame, struct messaging_context);
522 if (ctx == NULL) {
523 status = NT_STATUS_NO_MEMORY;
524 goto done;
527 ctx->id = (struct server_id) {
528 .pid = tevent_cached_getpid(), .vnn = NONCLUSTER_VNN
531 ctx->event_ctx = ev;
533 ctx->per_process_talloc_ctx = talloc_new(ctx);
534 if (ctx->per_process_talloc_ctx == NULL) {
535 status = NT_STATUS_NO_MEMORY;
536 goto done;
539 ok = messaging_register_event_context(ctx, ev);
540 if (!ok) {
541 status = NT_STATUS_NO_MEMORY;
542 goto done;
545 ref = messaging_dgm_ref(
546 ctx->per_process_talloc_ctx,
547 ctx->event_ctx,
548 &ctx->id.unique_id,
549 priv_path,
550 lck_path,
551 messaging_recv_cb,
552 ctx,
553 &ret);
554 if (ref == NULL) {
555 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
556 status = map_nt_error_from_unix(ret);
557 goto done;
559 talloc_set_destructor(ctx, messaging_context_destructor);
561 #ifdef CLUSTER_SUPPORT
562 if (lp_clustering()) {
563 ref = messaging_ctdb_ref(
564 ctx->per_process_talloc_ctx,
565 ctx->event_ctx,
566 lp_ctdbd_socket(),
567 lp_ctdb_timeout(),
568 ctx->id.unique_id,
569 messaging_recv_cb,
570 ctx,
571 &ret);
572 if (ref == NULL) {
573 DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
574 strerror(ret));
575 status = map_nt_error_from_unix(ret);
576 goto done;
579 #endif
581 ctx->id.vnn = get_my_vnn();
583 ctx->names_db = server_id_db_init(ctx,
584 ctx->id,
585 lp_lock_directory(),
587 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
588 if (ctx->names_db == NULL) {
589 DBG_DEBUG("server_id_db_init failed\n");
590 status = NT_STATUS_NO_MEMORY;
591 goto done;
594 messaging_register(ctx, NULL, MSG_PING, ping_message);
596 /* Register some debugging related messages */
598 register_msg_pool_usage(ctx->per_process_talloc_ctx, ctx);
599 register_dmalloc_msgs(ctx);
600 debug_register_msgs(ctx);
603 struct server_id_buf tmp;
604 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
607 *pmsg_ctx = talloc_steal(mem_ctx, ctx);
609 status = NT_STATUS_OK;
610 done:
611 TALLOC_FREE(frame);
613 return status;
616 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
617 struct tevent_context *ev)
619 struct messaging_context *ctx = NULL;
620 NTSTATUS status;
622 status = messaging_init_internal(mem_ctx,
624 &ctx);
625 if (!NT_STATUS_IS_OK(status)) {
626 return NULL;
629 return ctx;
632 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
634 return msg_ctx->id;
638 * re-init after a fork
640 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
642 int ret;
643 char *lck_path;
644 void *ref;
646 TALLOC_FREE(msg_ctx->per_process_talloc_ctx);
648 msg_ctx->per_process_talloc_ctx = talloc_new(msg_ctx);
649 if (msg_ctx->per_process_talloc_ctx == NULL) {
650 return NT_STATUS_NO_MEMORY;
653 msg_ctx->id = (struct server_id) {
654 .pid = tevent_cached_getpid(), .vnn = msg_ctx->id.vnn
657 lck_path = lock_path(talloc_tos(), "msg.lock");
658 if (lck_path == NULL) {
659 return NT_STATUS_NO_MEMORY;
662 ref = messaging_dgm_ref(
663 msg_ctx->per_process_talloc_ctx,
664 msg_ctx->event_ctx,
665 &msg_ctx->id.unique_id,
666 private_path("msg.sock"),
667 lck_path,
668 messaging_recv_cb,
669 msg_ctx,
670 &ret);
672 if (ref == NULL) {
673 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
674 return map_nt_error_from_unix(ret);
677 if (lp_clustering()) {
678 ref = messaging_ctdb_ref(
679 msg_ctx->per_process_talloc_ctx,
680 msg_ctx->event_ctx,
681 lp_ctdbd_socket(),
682 lp_ctdb_timeout(),
683 msg_ctx->id.unique_id,
684 messaging_recv_cb,
685 msg_ctx,
686 &ret);
687 if (ref == NULL) {
688 DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
689 strerror(ret));
690 return map_nt_error_from_unix(ret);
694 server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
695 register_msg_pool_usage(msg_ctx->per_process_talloc_ctx, msg_ctx);
697 return NT_STATUS_OK;
702 * Register a dispatch function for a particular message type. Allow multiple
703 * registrants
705 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
706 void *private_data,
707 uint32_t msg_type,
708 void (*fn)(struct messaging_context *msg,
709 void *private_data,
710 uint32_t msg_type,
711 struct server_id server_id,
712 DATA_BLOB *data))
714 struct messaging_callback *cb;
716 DEBUG(5, ("Registering messaging pointer for type %u - "
717 "private_data=%p\n",
718 (unsigned)msg_type, private_data));
721 * Only one callback per type
724 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
725 /* we allow a second registration of the same message
726 type if it has a different private pointer. This is
727 needed in, for example, the internal notify code,
728 which creates a new notify context for each tree
729 connect, and expects to receive messages to each of
730 them. */
731 if (cb->msg_type == msg_type && private_data == cb->private_data) {
732 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
733 (unsigned)msg_type, private_data));
734 cb->fn = fn;
735 cb->private_data = private_data;
736 return NT_STATUS_OK;
740 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
741 return NT_STATUS_NO_MEMORY;
744 cb->msg_type = msg_type;
745 cb->fn = fn;
746 cb->private_data = private_data;
748 DLIST_ADD(msg_ctx->callbacks, cb);
749 return NT_STATUS_OK;
753 De-register the function for a particular message type.
755 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
756 void *private_data)
758 struct messaging_callback *cb, *next;
760 for (cb = ctx->callbacks; cb; cb = next) {
761 next = cb->next;
762 if ((cb->msg_type == msg_type)
763 && (cb->private_data == private_data)) {
764 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
765 (unsigned)msg_type, private_data));
766 DLIST_REMOVE(ctx->callbacks, cb);
767 TALLOC_FREE(cb);
773 Send a message to a particular server
775 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
776 struct server_id server, uint32_t msg_type,
777 const DATA_BLOB *data)
779 struct iovec iov = {0};
781 if (data != NULL) {
782 iov.iov_base = data->data;
783 iov.iov_len = data->length;
786 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
789 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
790 struct server_id server, uint32_t msg_type,
791 const uint8_t *buf, size_t len)
793 DATA_BLOB blob = data_blob_const(buf, len);
794 return messaging_send(msg_ctx, server, msg_type, &blob);
797 static int messaging_post_self(struct messaging_context *msg_ctx,
798 struct server_id src, struct server_id dst,
799 uint32_t msg_type,
800 const struct iovec *iov, int iovlen,
801 const int *fds, size_t num_fds)
803 struct messaging_rec *rec;
804 bool ok;
806 rec = messaging_rec_create(
807 msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
808 if (rec == NULL) {
809 return ENOMEM;
812 ok = messaging_alert_event_contexts(msg_ctx);
813 if (!ok) {
814 TALLOC_FREE(rec);
815 return ENOMEM;
818 DLIST_ADD_END(msg_ctx->posted_msgs, rec);
820 return 0;
823 int messaging_send_iov_from(struct messaging_context *msg_ctx,
824 struct server_id src, struct server_id dst,
825 uint32_t msg_type,
826 const struct iovec *iov, int iovlen,
827 const int *fds, size_t num_fds)
829 int ret;
830 uint8_t hdr[MESSAGE_HDR_LENGTH];
831 struct iovec iov2[iovlen+1];
833 if (server_id_is_disconnected(&dst)) {
834 return EINVAL;
837 if (num_fds > INT8_MAX) {
838 return EINVAL;
841 if (server_id_equal(&dst, &msg_ctx->id)) {
842 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
843 iov, iovlen, fds, num_fds);
844 return ret;
847 message_hdr_put(hdr, msg_type, src, dst);
848 iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
849 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
851 if (dst.vnn != msg_ctx->id.vnn) {
852 if (num_fds > 0) {
853 return ENOSYS;
856 ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
857 return ret;
860 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
862 if (ret == EACCES) {
863 become_root();
864 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
865 fds, num_fds);
866 unbecome_root();
869 if (ret == ECONNREFUSED) {
871 * Linux returns this when a socket exists in the file
872 * system without a listening process. This is not
873 * documented in susv4 or the linux manpages, but it's
874 * easily testable. For the higher levels this is the
875 * same as "destination does not exist"
877 ret = ENOENT;
880 return ret;
883 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
884 struct server_id dst, uint32_t msg_type,
885 const struct iovec *iov, int iovlen,
886 const int *fds, size_t num_fds)
888 int ret;
890 ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
891 iov, iovlen, fds, num_fds);
892 if (ret != 0) {
893 return map_nt_error_from_unix(ret);
895 return NT_STATUS_OK;
/*
 * Arguments of messaging_send_all(), bundled for send_all_fn().
 */
struct send_all_state {
	/* Context to send from */
	struct messaging_context *msg_ctx;
	/* Message type to send */
	int msg_type;
	/* Payload and its length in bytes */
	const void *buf;
	size_t len;
};
905 static int send_all_fn(pid_t pid, void *private_data)
907 struct send_all_state *state = private_data;
908 NTSTATUS status;
910 if (pid == tevent_cached_getpid()) {
911 DBG_DEBUG("Skip ourselves in messaging_send_all\n");
912 return 0;
915 status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
916 state->msg_type, state->buf, state->len);
917 if (!NT_STATUS_IS_OK(status)) {
918 DBG_NOTICE("messaging_send_buf to %ju failed: %s\n",
919 (uintmax_t)pid, nt_errstr(status));
922 return 0;
925 void messaging_send_all(struct messaging_context *msg_ctx,
926 int msg_type, const void *buf, size_t len)
928 struct send_all_state state = {
929 .msg_ctx = msg_ctx, .msg_type = msg_type,
930 .buf = buf, .len = len
932 int ret;
934 #ifdef CLUSTER_SUPPORT
935 if (lp_clustering()) {
936 struct ctdbd_connection *conn = messaging_ctdb_connection();
937 uint8_t msghdr[MESSAGE_HDR_LENGTH];
938 struct iovec iov[] = {
939 { .iov_base = msghdr,
940 .iov_len = sizeof(msghdr) },
941 { .iov_base = discard_const_p(void, buf),
942 .iov_len = len }
945 message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
946 (struct server_id) {0});
948 ret = ctdbd_messaging_send_iov(
949 conn, CTDB_BROADCAST_CONNECTED,
950 CTDB_SRVID_SAMBA_PROCESS,
951 iov, ARRAY_SIZE(iov));
952 if (ret != 0) {
953 DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
954 strerror(ret));
957 return;
959 #endif
961 ret = messaging_dgm_forall(send_all_fn, &state);
962 if (ret != 0) {
963 DBG_WARNING("messaging_dgm_forall failed: %s\n",
964 strerror(ret));
968 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
969 struct messaging_rec *rec)
971 struct messaging_rec *result;
972 size_t fds_size = sizeof(int64_t) * rec->num_fds;
973 size_t payload_len;
975 payload_len = rec->buf.length + fds_size;
976 if (payload_len < rec->buf.length) {
977 /* overflow */
978 return NULL;
981 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
982 payload_len);
983 if (result == NULL) {
984 return NULL;
986 *result = *rec;
988 /* Doesn't fail, see talloc_pooled_object */
990 result->buf.data = talloc_memdup(result, rec->buf.data,
991 rec->buf.length);
993 result->fds = NULL;
994 if (result->num_fds > 0) {
995 size_t i;
997 result->fds = talloc_memdup(result, rec->fds, fds_size);
999 for (i=0; i<rec->num_fds; i++) {
1001 * fd's can only exist once
1003 rec->fds[i] = -1;
1007 return result;
/*
 * State of one messaging_filtered_read_send() request.
 */
struct messaging_filtered_read_state {
	/* Event context the request was issued on */
	struct tevent_context *ev;
	/* Messaging context the request listens on */
	struct messaging_context *msg_ctx;
	/* Registrations of "ev" with the datagram and ctdb transports */
	struct messaging_dgm_fde *fde;
	struct messaging_ctdb_fde *cluster_fde;

	/* Predicate selecting the message to receive, plus its argument */
	bool (*filter)(struct messaging_rec *rec, void *private_data);
	void *private_data;

	/* Copy of the received message, set by messaging_filtered_read_done() */
	struct messaging_rec *rec;
};
1022 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1023 enum tevent_req_state req_state);
1025 struct tevent_req *messaging_filtered_read_send(
1026 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1027 struct messaging_context *msg_ctx,
1028 bool (*filter)(struct messaging_rec *rec, void *private_data),
1029 void *private_data)
1031 struct tevent_req *req;
1032 struct messaging_filtered_read_state *state;
1033 size_t new_waiters_len;
1034 bool ok;
1036 req = tevent_req_create(mem_ctx, &state,
1037 struct messaging_filtered_read_state);
1038 if (req == NULL) {
1039 return NULL;
1041 state->ev = ev;
1042 state->msg_ctx = msg_ctx;
1043 state->filter = filter;
1044 state->private_data = private_data;
1047 * We have to defer the callback here, as we might be called from
1048 * within a different tevent_context than state->ev
1050 tevent_req_defer_callback(req, state->ev);
1052 state->fde = messaging_dgm_register_tevent_context(state, ev);
1053 if (tevent_req_nomem(state->fde, req)) {
1054 return tevent_req_post(req, ev);
1057 if (lp_clustering()) {
1058 state->cluster_fde =
1059 messaging_ctdb_register_tevent_context(state, ev);
1060 if (tevent_req_nomem(state->cluster_fde, req)) {
1061 return tevent_req_post(req, ev);
1066 * We add ourselves to the "new_waiters" array, not the "waiters"
1067 * array. If we are called from within messaging_read_done,
1068 * messaging_dispatch_rec will be in an active for-loop on
1069 * "waiters". We must be careful not to mess with this array, because
1070 * it could mean that a single event is being delivered twice.
1073 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
1075 if (new_waiters_len == msg_ctx->num_new_waiters) {
1076 struct tevent_req **tmp;
1078 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
1079 struct tevent_req *, new_waiters_len+1);
1080 if (tevent_req_nomem(tmp, req)) {
1081 return tevent_req_post(req, ev);
1083 msg_ctx->new_waiters = tmp;
1086 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
1087 msg_ctx->num_new_waiters += 1;
1088 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
1090 ok = messaging_register_event_context(msg_ctx, ev);
1091 if (!ok) {
1092 tevent_req_oom(req);
1093 return tevent_req_post(req, ev);
1096 return req;
1099 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1100 enum tevent_req_state req_state)
1102 struct messaging_filtered_read_state *state = tevent_req_data(
1103 req, struct messaging_filtered_read_state);
1104 struct messaging_context *msg_ctx = state->msg_ctx;
1105 size_t i;
1106 bool ok;
1108 tevent_req_set_cleanup_fn(req, NULL);
1110 TALLOC_FREE(state->fde);
1111 TALLOC_FREE(state->cluster_fde);
1113 ok = messaging_deregister_event_context(msg_ctx, state->ev);
1114 if (!ok) {
1115 abort();
1119 * Just set the [new_]waiters entry to NULL, be careful not to mess
1120 * with the other "waiters" array contents. We are often called from
1121 * within "messaging_dispatch_rec", which loops over
1122 * "waiters". Messing with the "waiters" array will mess up that
1123 * for-loop.
1126 for (i=0; i<msg_ctx->num_waiters; i++) {
1127 if (msg_ctx->waiters[i] == req) {
1128 msg_ctx->waiters[i] = NULL;
1129 return;
1133 for (i=0; i<msg_ctx->num_new_waiters; i++) {
1134 if (msg_ctx->new_waiters[i] == req) {
1135 msg_ctx->new_waiters[i] = NULL;
1136 return;
1141 static void messaging_filtered_read_done(struct tevent_req *req,
1142 struct messaging_rec *rec)
1144 struct messaging_filtered_read_state *state = tevent_req_data(
1145 req, struct messaging_filtered_read_state);
1147 state->rec = messaging_rec_dup(state, rec);
1148 if (tevent_req_nomem(state->rec, req)) {
1149 return;
1151 tevent_req_done(req);
1154 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1155 struct messaging_rec **presult)
1157 struct messaging_filtered_read_state *state = tevent_req_data(
1158 req, struct messaging_filtered_read_state);
1159 int err;
1161 if (tevent_req_is_unix_error(req, &err)) {
1162 tevent_req_received(req);
1163 return err;
1165 if (presult != NULL) {
1166 *presult = talloc_move(mem_ctx, &state->rec);
1168 tevent_req_received(req);
1169 return 0;
/*
 * State of one messaging_read_send() request.
 */
struct messaging_read_state {
	/* Message type the request is waiting for */
	uint32_t msg_type;
	/* Received record, filled in by messaging_read_done() */
	struct messaging_rec *rec;
};
1177 static bool messaging_read_filter(struct messaging_rec *rec,
1178 void *private_data);
1179 static void messaging_read_done(struct tevent_req *subreq);
1181 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
1182 struct tevent_context *ev,
1183 struct messaging_context *msg,
1184 uint32_t msg_type)
1186 struct tevent_req *req, *subreq;
1187 struct messaging_read_state *state;
1189 req = tevent_req_create(mem_ctx, &state,
1190 struct messaging_read_state);
1191 if (req == NULL) {
1192 return NULL;
1194 state->msg_type = msg_type;
1196 subreq = messaging_filtered_read_send(state, ev, msg,
1197 messaging_read_filter, state);
1198 if (tevent_req_nomem(subreq, req)) {
1199 return tevent_req_post(req, ev);
1201 tevent_req_set_callback(subreq, messaging_read_done, req);
1202 return req;
1205 static bool messaging_read_filter(struct messaging_rec *rec,
1206 void *private_data)
1208 struct messaging_read_state *state = talloc_get_type_abort(
1209 private_data, struct messaging_read_state);
1211 if (rec->num_fds != 0) {
1212 return false;
1215 return rec->msg_type == state->msg_type;
1218 static void messaging_read_done(struct tevent_req *subreq)
1220 struct tevent_req *req = tevent_req_callback_data(
1221 subreq, struct tevent_req);
1222 struct messaging_read_state *state = tevent_req_data(
1223 req, struct messaging_read_state);
1224 int ret;
1226 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
1227 TALLOC_FREE(subreq);
1228 if (tevent_req_error(req, ret)) {
1229 return;
1231 tevent_req_done(req);
1234 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1235 struct messaging_rec **presult)
1237 struct messaging_read_state *state = tevent_req_data(
1238 req, struct messaging_read_state);
1239 int err;
1241 if (tevent_req_is_unix_error(req, &err)) {
1242 return err;
1244 if (presult != NULL) {
1245 *presult = talloc_move(mem_ctx, &state->rec);
1247 return 0;
1250 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
1252 if (msg_ctx->num_new_waiters == 0) {
1253 return true;
1256 if (talloc_array_length(msg_ctx->waiters) <
1257 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
1258 struct tevent_req **tmp;
1259 tmp = talloc_realloc(
1260 msg_ctx, msg_ctx->waiters, struct tevent_req *,
1261 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1262 if (tmp == NULL) {
1263 DEBUG(1, ("%s: talloc failed\n", __func__));
1264 return false;
1266 msg_ctx->waiters = tmp;
1269 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1270 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1272 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1273 msg_ctx->num_new_waiters = 0;
1275 return true;
1278 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1279 struct messaging_rec *rec)
1281 struct messaging_callback *cb, *next;
1283 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1284 size_t j;
1286 next = cb->next;
1287 if (cb->msg_type != rec->msg_type) {
1288 continue;
1292 * the old style callbacks don't support fd passing
1294 for (j=0; j < rec->num_fds; j++) {
1295 int fd = rec->fds[j];
1296 close(fd);
1298 rec->num_fds = 0;
1299 rec->fds = NULL;
1301 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1302 rec->src, &rec->buf);
1304 return true;
1307 return false;
1310 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
1311 struct tevent_context *ev,
1312 struct messaging_rec *rec)
1314 size_t i;
1316 if (!messaging_append_new_waiters(msg_ctx)) {
1317 return false;
1320 i = 0;
1321 while (i < msg_ctx->num_waiters) {
1322 struct tevent_req *req;
1323 struct messaging_filtered_read_state *state;
1325 req = msg_ctx->waiters[i];
1326 if (req == NULL) {
1328 * This got cleaned up. In the meantime,
1329 * move everything down one. We need
1330 * to keep the order of waiters, as
1331 * other code may depend on this.
1333 ARRAY_DEL_ELEMENT(
1334 msg_ctx->waiters, i, msg_ctx->num_waiters);
1335 msg_ctx->num_waiters -= 1;
1336 continue;
1339 state = tevent_req_data(
1340 req, struct messaging_filtered_read_state);
1341 if ((ev == state->ev) &&
1342 state->filter(rec, state->private_data)) {
1343 messaging_filtered_read_done(req, rec);
1344 return true;
1347 i += 1;
1350 return false;
1354 Dispatch one messaging_rec
1356 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1357 struct tevent_context *ev,
1358 struct messaging_rec *rec)
1360 bool consumed;
1361 size_t i;
1363 if (ev == msg_ctx->event_ctx) {
1364 consumed = messaging_dispatch_classic(msg_ctx, rec);
1365 if (consumed) {
1366 return;
1370 consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
1371 if (consumed) {
1372 return;
1375 if (ev != msg_ctx->event_ctx) {
1376 struct iovec iov;
1377 int fds[MAX(1, rec->num_fds)];
1378 int ret;
1381 * We've been listening on a nested event
1382 * context. Messages need to be handled in the main
1383 * event context, so post to ourselves
1386 iov.iov_base = rec->buf.data;
1387 iov.iov_len = rec->buf.length;
1389 for (i=0; i<rec->num_fds; i++) {
1390 fds[i] = rec->fds[i];
1393 ret = messaging_post_self(
1394 msg_ctx, rec->src, rec->dest, rec->msg_type,
1395 &iov, 1, fds, rec->num_fds);
1396 if (ret == 0) {
1397 return;
1402 static int mess_parent_dgm_cleanup(void *private_data);
1403 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1405 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1407 struct tevent_req *req;
1409 req = background_job_send(
1410 msg, msg->event_ctx, msg, NULL, 0,
1411 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1412 60*15),
1413 mess_parent_dgm_cleanup, msg);
1414 if (req == NULL) {
1415 DBG_WARNING("background_job_send failed\n");
1416 return false;
1418 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1419 return true;
/*
 * Job function of the dgm cleanup background job: run
 * messaging_dgm_wipe() and log its result at debug level 10.
 *
 * private_data is not used. The return value is the current setting of
 * "messaging:messaging dgm cleanup interval" (default 60*15);
 * presumably the background job uses it as the delay before the next
 * run -- confirm against background_job_send().
 */
static int mess_parent_dgm_cleanup(void *private_data)
{
	int wipe_err = messaging_dgm_wipe();

	DEBUG(10, ("messaging_dgm_wipe returned %s\n",
		   (wipe_err != 0) ? strerror(wipe_err) : "ok"));

	return lp_parm_int(-1, "messaging",
			   "messaging dgm cleanup interval",
			   60*15);
}
1433 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1435 struct messaging_context *msg = tevent_req_callback_data(
1436 req, struct messaging_context);
1437 NTSTATUS status;
1439 status = background_job_recv(req);
1440 TALLOC_FREE(req);
1441 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1442 nt_errstr(status)));
1444 req = background_job_send(
1445 msg, msg->event_ctx, msg, NULL, 0,
1446 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1447 60*15),
1448 mess_parent_dgm_cleanup, msg);
1449 if (req == NULL) {
1450 DEBUG(1, ("background_job_send failed\n"));
1451 return;
1453 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
/*
 * Clean up messaging dgm state.
 *
 * @param msg_ctx  Not used by this function.
 * @param pid      0 to run messaging_dgm_wipe(), any other value to run
 *                 messaging_dgm_cleanup() for that pid.
 * @return The return value of whichever function was called.
 */
int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
{
	if (pid != 0) {
		return messaging_dgm_cleanup(pid);
	}

	return messaging_dgm_wipe();
}
1469 struct tevent_context *messaging_tevent_context(
1470 struct messaging_context *msg_ctx)
1472 return msg_ctx->event_ctx;
1475 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1477 return msg_ctx->names_db;
1480 /** @} **/