smbd: Fix cached dos attributes
[Samba.git] / source3 / lib / messages.c
blobb856a2889b1f5bb750e71dd0c3c0493b01ff3b60
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messaging/messages_dgm.h"
56 #include "lib/util/iov_buf.h"
57 #include "lib/util/server_id_db.h"
58 #include "lib/messaging/messages_dgm_ref.h"
59 #include "lib/messages_ctdb.h"
60 #include "lib/messages_ctdb_ref.h"
61 #include "lib/messages_util.h"
62 #include "cluster_support.h"
63 #include "ctdbd_conn.h"
64 #include "ctdb_srvids.h"
65 #include "source3/lib/tallocmsg.h"
67 #ifdef CLUSTER_SUPPORT
68 #include "ctdb_protocol.h"
69 #endif
71 struct messaging_callback {
72 struct messaging_callback *prev, *next;
73 uint32_t msg_type;
74 void (*fn)(struct messaging_context *msg, void *private_data,
75 uint32_t msg_type,
76 struct server_id server_id, DATA_BLOB *data);
77 void *private_data;
/*
 * Bookkeeping for one tevent context that has waiters registered.
 * A slot with refcount == 0 is unused; its immediate is kept around
 * for reuse.
 */
struct messaging_registered_ev {
	/* The registered event context, NULL while the slot is unused */
	struct tevent_context *ev;

	/* Immediate used to wake up "ev" when messages were posted */
	struct tevent_immediate *im;

	/* Number of active registrations for "ev" */
	size_t refcount;
};
86 struct messaging_context {
87 struct server_id id;
88 struct tevent_context *event_ctx;
89 struct messaging_callback *callbacks;
91 struct messaging_rec *posted_msgs;
93 struct messaging_registered_ev *event_contexts;
95 struct tevent_req **new_waiters;
96 size_t num_new_waiters;
98 struct tevent_req **waiters;
99 size_t num_waiters;
101 struct server_id_db *names_db;
103 TALLOC_CTX *per_process_talloc_ctx;
106 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
107 struct messaging_rec *rec);
108 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
109 struct messaging_rec *rec);
110 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
111 struct tevent_context *ev,
112 struct messaging_rec *rec);
113 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
114 struct tevent_context *ev,
115 struct messaging_rec *rec);
117 /****************************************************************************
118 A useful function for testing the message system.
119 ****************************************************************************/
121 static void ping_message(struct messaging_context *msg_ctx,
122 void *private_data,
123 uint32_t msg_type,
124 struct server_id src,
125 DATA_BLOB *data)
127 struct server_id_buf idbuf;
129 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
130 server_id_str_buf(src, &idbuf), (int)data->length,
131 data->data ? (char *)data->data : ""));
133 messaging_send(msg_ctx, src, MSG_PONG, data);
136 struct messaging_rec *messaging_rec_create(
137 TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
138 uint32_t msg_type, const struct iovec *iov, int iovlen,
139 const int *fds, size_t num_fds)
141 ssize_t buflen;
142 uint8_t *buf;
143 struct messaging_rec *result;
145 if (num_fds > INT8_MAX) {
146 return NULL;
149 buflen = iov_buflen(iov, iovlen);
150 if (buflen == -1) {
151 return NULL;
153 buf = talloc_array(mem_ctx, uint8_t, buflen);
154 if (buf == NULL) {
155 return NULL;
157 iov_buf(iov, iovlen, buf, buflen);
160 struct messaging_rec rec;
161 int64_t fds64[MAX(1, num_fds)];
162 size_t i;
164 for (i=0; i<num_fds; i++) {
165 fds64[i] = fds[i];
168 rec = (struct messaging_rec) {
169 .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
170 .src = src, .dest = dst,
171 .buf.data = buf, .buf.length = buflen,
172 .num_fds = num_fds, .fds = fds64,
175 result = messaging_rec_dup(mem_ctx, &rec);
178 TALLOC_FREE(buf);
180 return result;
183 static bool messaging_register_event_context(struct messaging_context *ctx,
184 struct tevent_context *ev)
186 size_t i, num_event_contexts;
187 struct messaging_registered_ev *free_reg = NULL;
188 struct messaging_registered_ev *tmp;
190 num_event_contexts = talloc_array_length(ctx->event_contexts);
192 for (i=0; i<num_event_contexts; i++) {
193 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
195 if (reg->refcount == 0) {
196 if (reg->ev != NULL) {
197 abort();
199 free_reg = reg;
201 * We continue here and may find another
202 * free_req, but the important thing is
203 * that we continue to search for an
204 * existing registration in the loop.
206 continue;
209 if (reg->ev == ev) {
210 reg->refcount += 1;
211 return true;
215 if (free_reg == NULL) {
216 struct tevent_immediate *im = NULL;
218 im = tevent_create_immediate(ctx);
219 if (im == NULL) {
220 return false;
223 tmp = talloc_realloc(ctx, ctx->event_contexts,
224 struct messaging_registered_ev,
225 num_event_contexts+1);
226 if (tmp == NULL) {
227 return false;
229 ctx->event_contexts = tmp;
231 free_reg = &ctx->event_contexts[num_event_contexts];
232 free_reg->im = talloc_move(ctx->event_contexts, &im);
236 * free_reg->im might be cached
238 free_reg->ev = ev;
239 free_reg->refcount = 1;
241 return true;
244 static bool messaging_deregister_event_context(struct messaging_context *ctx,
245 struct tevent_context *ev)
247 size_t i, num_event_contexts;
249 num_event_contexts = talloc_array_length(ctx->event_contexts);
251 for (i=0; i<num_event_contexts; i++) {
252 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
254 if (reg->refcount == 0) {
255 continue;
258 if (reg->ev == ev) {
259 reg->refcount -= 1;
261 if (reg->refcount == 0) {
263 * The primary event context
264 * is never unregistered using
265 * messaging_deregister_event_context()
266 * it's only registered using
267 * messaging_register_event_context().
269 SMB_ASSERT(ev != ctx->event_ctx);
270 SMB_ASSERT(reg->ev != ctx->event_ctx);
273 * Not strictly necessary, just
274 * paranoia
276 reg->ev = NULL;
279 * Do not talloc_free(reg->im),
280 * recycle immediates events.
282 * We just invalidate it using
283 * the primary event context,
284 * which is never unregistered.
286 tevent_schedule_immediate(reg->im,
287 ctx->event_ctx,
288 NULL, NULL);
290 return true;
293 return false;
296 static void messaging_post_main_event_context(struct tevent_context *ev,
297 struct tevent_immediate *im,
298 void *private_data)
300 struct messaging_context *ctx = talloc_get_type_abort(
301 private_data, struct messaging_context);
303 while (ctx->posted_msgs != NULL) {
304 struct messaging_rec *rec = ctx->posted_msgs;
305 bool consumed;
307 DLIST_REMOVE(ctx->posted_msgs, rec);
309 consumed = messaging_dispatch_classic(ctx, rec);
310 if (!consumed) {
311 consumed = messaging_dispatch_waiters(
312 ctx, ctx->event_ctx, rec);
315 if (!consumed) {
316 uint8_t i;
318 for (i=0; i<rec->num_fds; i++) {
319 close(rec->fds[i]);
323 TALLOC_FREE(rec);
327 static void messaging_post_sub_event_context(struct tevent_context *ev,
328 struct tevent_immediate *im,
329 void *private_data)
331 struct messaging_context *ctx = talloc_get_type_abort(
332 private_data, struct messaging_context);
333 struct messaging_rec *rec, *next;
335 for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
336 bool consumed;
338 next = rec->next;
340 consumed = messaging_dispatch_waiters(ctx, ev, rec);
341 if (consumed) {
342 DLIST_REMOVE(ctx->posted_msgs, rec);
343 TALLOC_FREE(rec);
348 static bool messaging_alert_event_contexts(struct messaging_context *ctx)
350 size_t i, num_event_contexts;
352 num_event_contexts = talloc_array_length(ctx->event_contexts);
354 for (i=0; i<num_event_contexts; i++) {
355 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
357 if (reg->refcount == 0) {
358 continue;
362 * We depend on schedule_immediate to work
363 * multiple times. Might be a bit inefficient,
364 * but this needs to be proven in tests. The
365 * alternatively would be to track whether the
366 * immediate has already been scheduled. For
367 * now, avoid that complexity here.
370 if (reg->ev == ctx->event_ctx) {
371 tevent_schedule_immediate(
372 reg->im, reg->ev,
373 messaging_post_main_event_context,
374 ctx);
375 } else {
376 tevent_schedule_immediate(
377 reg->im, reg->ev,
378 messaging_post_sub_event_context,
379 ctx);
383 return true;
386 static void messaging_recv_cb(struct tevent_context *ev,
387 const uint8_t *msg, size_t msg_len,
388 int *fds, size_t num_fds,
389 void *private_data)
391 struct messaging_context *msg_ctx = talloc_get_type_abort(
392 private_data, struct messaging_context);
393 struct server_id_buf idbuf;
394 struct messaging_rec rec;
395 int64_t fds64[MAX(1, MIN(num_fds, INT8_MAX))];
396 size_t i;
398 if (msg_len < MESSAGE_HDR_LENGTH) {
399 DBG_WARNING("message too short: %zu\n", msg_len);
400 return;
403 if (num_fds > INT8_MAX) {
404 DBG_WARNING("too many fds: %zu\n", num_fds);
405 return;
408 for (i=0; i < num_fds; i++) {
409 fds64[i] = fds[i];
412 rec = (struct messaging_rec) {
413 .msg_version = MESSAGE_VERSION,
414 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
415 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
416 .num_fds = num_fds,
417 .fds = fds64,
420 message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
422 DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
423 (unsigned)rec.msg_type, rec.buf.length, num_fds,
424 server_id_str_buf(rec.src, &idbuf));
426 if (server_id_same_process(&rec.src, &msg_ctx->id)) {
427 DBG_DEBUG("Ignoring self-send\n");
428 return;
431 messaging_dispatch_rec(msg_ctx, ev, &rec);
433 for (i=0; i<num_fds; i++) {
434 fds[i] = fds64[i];
438 static int messaging_context_destructor(struct messaging_context *ctx)
440 size_t i;
442 for (i=0; i<ctx->num_new_waiters; i++) {
443 if (ctx->new_waiters[i] != NULL) {
444 tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
445 ctx->new_waiters[i] = NULL;
448 for (i=0; i<ctx->num_waiters; i++) {
449 if (ctx->waiters[i] != NULL) {
450 tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
451 ctx->waiters[i] = NULL;
456 * The immediates from messaging_alert_event_contexts
457 * reference "ctx". Don't let them outlive the
458 * messaging_context we're destroying here.
460 TALLOC_FREE(ctx->event_contexts);
462 return 0;
/*
 * Return "<lp_private_dir()>/<name>", allocated on talloc_tos().
 * Returns NULL if the allocation fails.
 */
static const char *private_path(const char *name)
{
	char *path = NULL;

	path = talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
	return path;
}
470 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
471 struct tevent_context *ev,
472 struct messaging_context **pmsg_ctx)
474 TALLOC_CTX *frame;
475 struct messaging_context *ctx;
476 NTSTATUS status;
477 int ret;
478 const char *lck_path;
479 const char *priv_path;
480 void *ref;
481 bool ok;
484 * sec_init() *must* be called before any other
485 * functions that use sec_XXX(). e.g. sec_initial_uid().
488 sec_init();
490 lck_path = lock_path(talloc_tos(), "msg.lock");
491 if (lck_path == NULL) {
492 return NT_STATUS_NO_MEMORY;
495 ok = directory_create_or_exist_strict(lck_path,
496 sec_initial_uid(),
497 0755);
498 if (!ok) {
499 DBG_DEBUG("Could not create lock directory: %s\n",
500 strerror(errno));
501 return NT_STATUS_ACCESS_DENIED;
504 priv_path = private_path("msg.sock");
505 if (priv_path == NULL) {
506 return NT_STATUS_NO_MEMORY;
509 ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
510 0700);
511 if (!ok) {
512 DBG_DEBUG("Could not create msg directory: %s\n",
513 strerror(errno));
514 return NT_STATUS_ACCESS_DENIED;
517 frame = talloc_stackframe();
518 if (frame == NULL) {
519 return NT_STATUS_NO_MEMORY;
522 ctx = talloc_zero(frame, struct messaging_context);
523 if (ctx == NULL) {
524 status = NT_STATUS_NO_MEMORY;
525 goto done;
528 ctx->id = (struct server_id) {
529 .pid = tevent_cached_getpid(), .vnn = NONCLUSTER_VNN
532 ctx->event_ctx = ev;
534 ctx->per_process_talloc_ctx = talloc_new(ctx);
535 if (ctx->per_process_talloc_ctx == NULL) {
536 status = NT_STATUS_NO_MEMORY;
537 goto done;
540 ok = messaging_register_event_context(ctx, ev);
541 if (!ok) {
542 status = NT_STATUS_NO_MEMORY;
543 goto done;
546 ref = messaging_dgm_ref(
547 ctx->per_process_talloc_ctx,
548 ctx->event_ctx,
549 &ctx->id.unique_id,
550 priv_path,
551 lck_path,
552 messaging_recv_cb,
553 ctx,
554 &ret);
555 if (ref == NULL) {
556 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
557 status = map_nt_error_from_unix(ret);
558 goto done;
560 talloc_set_destructor(ctx, messaging_context_destructor);
562 #ifdef CLUSTER_SUPPORT
563 if (lp_clustering()) {
564 ref = messaging_ctdb_ref(
565 ctx->per_process_talloc_ctx,
566 ctx->event_ctx,
567 lp_ctdbd_socket(),
568 lp_ctdb_timeout(),
569 ctx->id.unique_id,
570 messaging_recv_cb,
571 ctx,
572 &ret);
573 if (ref == NULL) {
574 DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
575 strerror(ret));
576 status = map_nt_error_from_unix(ret);
577 goto done;
580 #endif
582 ctx->id.vnn = get_my_vnn();
584 ctx->names_db = server_id_db_init(ctx,
585 ctx->id,
586 lp_lock_directory(),
588 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
589 if (ctx->names_db == NULL) {
590 DBG_DEBUG("server_id_db_init failed\n");
591 status = NT_STATUS_NO_MEMORY;
592 goto done;
595 messaging_register(ctx, NULL, MSG_PING, ping_message);
597 /* Register some debugging related messages */
599 register_msg_pool_usage(ctx->per_process_talloc_ctx, ctx);
600 register_dmalloc_msgs(ctx);
601 debug_register_msgs(ctx);
604 struct server_id_buf tmp;
605 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
608 *pmsg_ctx = talloc_steal(mem_ctx, ctx);
610 status = NT_STATUS_OK;
611 done:
612 TALLOC_FREE(frame);
614 return status;
617 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
618 struct tevent_context *ev)
620 struct messaging_context *ctx = NULL;
621 NTSTATUS status;
623 status = messaging_init_internal(mem_ctx,
625 &ctx);
626 if (!NT_STATUS_IS_OK(status)) {
627 return NULL;
630 return ctx;
633 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
635 return msg_ctx->id;
639 * re-init after a fork
641 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
643 int ret;
644 char *lck_path;
645 void *ref;
647 TALLOC_FREE(msg_ctx->per_process_talloc_ctx);
649 msg_ctx->per_process_talloc_ctx = talloc_new(msg_ctx);
650 if (msg_ctx->per_process_talloc_ctx == NULL) {
651 return NT_STATUS_NO_MEMORY;
654 msg_ctx->id = (struct server_id) {
655 .pid = tevent_cached_getpid(), .vnn = msg_ctx->id.vnn
658 lck_path = lock_path(talloc_tos(), "msg.lock");
659 if (lck_path == NULL) {
660 return NT_STATUS_NO_MEMORY;
663 ref = messaging_dgm_ref(
664 msg_ctx->per_process_talloc_ctx,
665 msg_ctx->event_ctx,
666 &msg_ctx->id.unique_id,
667 private_path("msg.sock"),
668 lck_path,
669 messaging_recv_cb,
670 msg_ctx,
671 &ret);
673 if (ref == NULL) {
674 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
675 return map_nt_error_from_unix(ret);
678 if (lp_clustering()) {
679 ref = messaging_ctdb_ref(
680 msg_ctx->per_process_talloc_ctx,
681 msg_ctx->event_ctx,
682 lp_ctdbd_socket(),
683 lp_ctdb_timeout(),
684 msg_ctx->id.unique_id,
685 messaging_recv_cb,
686 msg_ctx,
687 &ret);
688 if (ref == NULL) {
689 DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
690 strerror(ret));
691 return map_nt_error_from_unix(ret);
695 server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
696 register_msg_pool_usage(msg_ctx->per_process_talloc_ctx, msg_ctx);
698 return NT_STATUS_OK;
703 * Register a dispatch function for a particular message type. Allow multiple
704 * registrants
706 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
707 void *private_data,
708 uint32_t msg_type,
709 void (*fn)(struct messaging_context *msg,
710 void *private_data,
711 uint32_t msg_type,
712 struct server_id server_id,
713 DATA_BLOB *data))
715 struct messaging_callback *cb;
717 DEBUG(5, ("Registering messaging pointer for type %u - "
718 "private_data=%p\n",
719 (unsigned)msg_type, private_data));
722 * Only one callback per type
725 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
726 /* we allow a second registration of the same message
727 type if it has a different private pointer. This is
728 needed in, for example, the internal notify code,
729 which creates a new notify context for each tree
730 connect, and expects to receive messages to each of
731 them. */
732 if (cb->msg_type == msg_type && private_data == cb->private_data) {
733 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
734 (unsigned)msg_type, private_data));
735 cb->fn = fn;
736 cb->private_data = private_data;
737 return NT_STATUS_OK;
741 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
742 return NT_STATUS_NO_MEMORY;
745 cb->msg_type = msg_type;
746 cb->fn = fn;
747 cb->private_data = private_data;
749 DLIST_ADD(msg_ctx->callbacks, cb);
750 return NT_STATUS_OK;
754 De-register the function for a particular message type.
756 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
757 void *private_data)
759 struct messaging_callback *cb, *next;
761 for (cb = ctx->callbacks; cb; cb = next) {
762 next = cb->next;
763 if ((cb->msg_type == msg_type)
764 && (cb->private_data == private_data)) {
765 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
766 (unsigned)msg_type, private_data));
767 DLIST_REMOVE(ctx->callbacks, cb);
768 TALLOC_FREE(cb);
774 Send a message to a particular server
776 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
777 struct server_id server, uint32_t msg_type,
778 const DATA_BLOB *data)
780 struct iovec iov = {0};
782 if (data != NULL) {
783 iov.iov_base = data->data;
784 iov.iov_len = data->length;
787 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
790 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
791 struct server_id server, uint32_t msg_type,
792 const uint8_t *buf, size_t len)
794 DATA_BLOB blob = data_blob_const(buf, len);
795 return messaging_send(msg_ctx, server, msg_type, &blob);
798 static int messaging_post_self(struct messaging_context *msg_ctx,
799 struct server_id src, struct server_id dst,
800 uint32_t msg_type,
801 const struct iovec *iov, int iovlen,
802 const int *fds, size_t num_fds)
804 struct messaging_rec *rec;
805 bool ok;
807 rec = messaging_rec_create(
808 msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
809 if (rec == NULL) {
810 return ENOMEM;
813 ok = messaging_alert_event_contexts(msg_ctx);
814 if (!ok) {
815 TALLOC_FREE(rec);
816 return ENOMEM;
819 DLIST_ADD_END(msg_ctx->posted_msgs, rec);
821 return 0;
824 int messaging_send_iov_from(struct messaging_context *msg_ctx,
825 struct server_id src, struct server_id dst,
826 uint32_t msg_type,
827 const struct iovec *iov, int iovlen,
828 const int *fds, size_t num_fds)
830 int ret;
831 uint8_t hdr[MESSAGE_HDR_LENGTH];
832 struct iovec iov2[iovlen+1];
834 if (server_id_is_disconnected(&dst)) {
835 return EINVAL;
838 if (num_fds > INT8_MAX) {
839 return EINVAL;
842 if (server_id_equal(&dst, &msg_ctx->id)) {
843 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
844 iov, iovlen, fds, num_fds);
845 return ret;
848 message_hdr_put(hdr, msg_type, src, dst);
849 iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
850 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
852 if (dst.vnn != msg_ctx->id.vnn) {
853 if (num_fds > 0) {
854 return ENOSYS;
857 ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
858 return ret;
861 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
863 if (ret == EACCES) {
864 become_root();
865 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
866 fds, num_fds);
867 unbecome_root();
870 if (ret == ECONNREFUSED) {
872 * Linux returns this when a socket exists in the file
873 * system without a listening process. This is not
874 * documented in susv4 or the linux manpages, but it's
875 * easily testable. For the higher levels this is the
876 * same as "destination does not exist"
878 ret = ENOENT;
881 return ret;
884 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
885 struct server_id dst, uint32_t msg_type,
886 const struct iovec *iov, int iovlen,
887 const int *fds, size_t num_fds)
889 int ret;
891 ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
892 iov, iovlen, fds, num_fds);
893 if (ret != 0) {
894 return map_nt_error_from_unix(ret);
896 return NT_STATUS_OK;
/*
 * Arguments of messaging_send_all(), passed to send_all_fn() via
 * messaging_dgm_forall().
 */
struct send_all_state {
	struct messaging_context *msg_ctx;
	/* Message type to send */
	int msg_type;
	/* Payload and its length in bytes */
	const void *buf;
	size_t len;
};
906 static int send_all_fn(pid_t pid, void *private_data)
908 struct send_all_state *state = private_data;
909 NTSTATUS status;
911 if (pid == tevent_cached_getpid()) {
912 DBG_DEBUG("Skip ourselves in messaging_send_all\n");
913 return 0;
916 status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
917 state->msg_type, state->buf, state->len);
918 if (!NT_STATUS_IS_OK(status)) {
919 DBG_NOTICE("messaging_send_buf to %ju failed: %s\n",
920 (uintmax_t)pid, nt_errstr(status));
923 return 0;
926 void messaging_send_all(struct messaging_context *msg_ctx,
927 int msg_type, const void *buf, size_t len)
929 struct send_all_state state = {
930 .msg_ctx = msg_ctx, .msg_type = msg_type,
931 .buf = buf, .len = len
933 int ret;
935 #ifdef CLUSTER_SUPPORT
936 if (lp_clustering()) {
937 struct ctdbd_connection *conn = messaging_ctdb_connection();
938 uint8_t msghdr[MESSAGE_HDR_LENGTH];
939 struct iovec iov[] = {
940 { .iov_base = msghdr,
941 .iov_len = sizeof(msghdr) },
942 { .iov_base = discard_const_p(void, buf),
943 .iov_len = len }
946 message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
947 (struct server_id) {0});
949 ret = ctdbd_messaging_send_iov(
950 conn, CTDB_BROADCAST_CONNECTED,
951 CTDB_SRVID_SAMBA_PROCESS,
952 iov, ARRAY_SIZE(iov));
953 if (ret != 0) {
954 DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
955 strerror(ret));
958 return;
960 #endif
962 ret = messaging_dgm_forall(send_all_fn, &state);
963 if (ret != 0) {
964 DBG_WARNING("messaging_dgm_forall failed: %s\n",
965 strerror(ret));
969 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
970 struct messaging_rec *rec)
972 struct messaging_rec *result;
973 size_t fds_size = sizeof(int64_t) * rec->num_fds;
974 size_t payload_len;
976 payload_len = rec->buf.length + fds_size;
977 if (payload_len < rec->buf.length) {
978 /* overflow */
979 return NULL;
982 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
983 payload_len);
984 if (result == NULL) {
985 return NULL;
987 *result = *rec;
989 /* Doesn't fail, see talloc_pooled_object */
991 result->buf.data = talloc_memdup(result, rec->buf.data,
992 rec->buf.length);
994 result->fds = NULL;
995 if (result->num_fds > 0) {
996 size_t i;
998 result->fds = talloc_memdup(result, rec->fds, fds_size);
1000 for (i=0; i<rec->num_fds; i++) {
1002 * fd's can only exist once
1004 rec->fds[i] = -1;
1008 return result;
/*
 * State of one messaging_filtered_read_send() request.
 */
struct messaging_filtered_read_state {
	/* Event context the request was created for */
	struct tevent_context *ev;
	struct messaging_context *msg_ctx;

	/* Registrations of "ev" with the dgm and ctdb transports */
	struct messaging_dgm_fde *fde;
	struct messaging_ctdb_fde *cluster_fde;

	/* Returns true for the message this request should receive */
	bool (*filter)(struct messaging_rec *rec, void *private_data);
	void *private_data;

	/* Copy of the message that matched the filter */
	struct messaging_rec *rec;
};
1023 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1024 enum tevent_req_state req_state);
1026 struct tevent_req *messaging_filtered_read_send(
1027 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1028 struct messaging_context *msg_ctx,
1029 bool (*filter)(struct messaging_rec *rec, void *private_data),
1030 void *private_data)
1032 struct tevent_req *req;
1033 struct messaging_filtered_read_state *state;
1034 size_t new_waiters_len;
1035 bool ok;
1037 req = tevent_req_create(mem_ctx, &state,
1038 struct messaging_filtered_read_state);
1039 if (req == NULL) {
1040 return NULL;
1042 state->ev = ev;
1043 state->msg_ctx = msg_ctx;
1044 state->filter = filter;
1045 state->private_data = private_data;
1048 * We have to defer the callback here, as we might be called from
1049 * within a different tevent_context than state->ev
1051 tevent_req_defer_callback(req, state->ev);
1053 state->fde = messaging_dgm_register_tevent_context(state, ev);
1054 if (tevent_req_nomem(state->fde, req)) {
1055 return tevent_req_post(req, ev);
1058 if (lp_clustering()) {
1059 state->cluster_fde =
1060 messaging_ctdb_register_tevent_context(state, ev);
1061 if (tevent_req_nomem(state->cluster_fde, req)) {
1062 return tevent_req_post(req, ev);
1067 * We add ourselves to the "new_waiters" array, not the "waiters"
1068 * array. If we are called from within messaging_read_done,
1069 * messaging_dispatch_rec will be in an active for-loop on
1070 * "waiters". We must be careful not to mess with this array, because
1071 * it could mean that a single event is being delivered twice.
1074 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
1076 if (new_waiters_len == msg_ctx->num_new_waiters) {
1077 struct tevent_req **tmp;
1079 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
1080 struct tevent_req *, new_waiters_len+1);
1081 if (tevent_req_nomem(tmp, req)) {
1082 return tevent_req_post(req, ev);
1084 msg_ctx->new_waiters = tmp;
1087 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
1088 msg_ctx->num_new_waiters += 1;
1089 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
1091 ok = messaging_register_event_context(msg_ctx, ev);
1092 if (!ok) {
1093 tevent_req_oom(req);
1094 return tevent_req_post(req, ev);
1097 return req;
1100 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1101 enum tevent_req_state req_state)
1103 struct messaging_filtered_read_state *state = tevent_req_data(
1104 req, struct messaging_filtered_read_state);
1105 struct messaging_context *msg_ctx = state->msg_ctx;
1106 size_t i;
1107 bool ok;
1109 tevent_req_set_cleanup_fn(req, NULL);
1111 TALLOC_FREE(state->fde);
1112 TALLOC_FREE(state->cluster_fde);
1114 ok = messaging_deregister_event_context(msg_ctx, state->ev);
1115 if (!ok) {
1116 abort();
1120 * Just set the [new_]waiters entry to NULL, be careful not to mess
1121 * with the other "waiters" array contents. We are often called from
1122 * within "messaging_dispatch_rec", which loops over
1123 * "waiters". Messing with the "waiters" array will mess up that
1124 * for-loop.
1127 for (i=0; i<msg_ctx->num_waiters; i++) {
1128 if (msg_ctx->waiters[i] == req) {
1129 msg_ctx->waiters[i] = NULL;
1130 return;
1134 for (i=0; i<msg_ctx->num_new_waiters; i++) {
1135 if (msg_ctx->new_waiters[i] == req) {
1136 msg_ctx->new_waiters[i] = NULL;
1137 return;
1142 static void messaging_filtered_read_done(struct tevent_req *req,
1143 struct messaging_rec *rec)
1145 struct messaging_filtered_read_state *state = tevent_req_data(
1146 req, struct messaging_filtered_read_state);
1148 state->rec = messaging_rec_dup(state, rec);
1149 if (tevent_req_nomem(state->rec, req)) {
1150 return;
1152 tevent_req_done(req);
1155 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1156 struct messaging_rec **presult)
1158 struct messaging_filtered_read_state *state = tevent_req_data(
1159 req, struct messaging_filtered_read_state);
1160 int err;
1162 if (tevent_req_is_unix_error(req, &err)) {
1163 tevent_req_received(req);
1164 return err;
1166 if (presult != NULL) {
1167 *presult = talloc_move(mem_ctx, &state->rec);
1169 tevent_req_received(req);
1170 return 0;
/*
 * State of one messaging_read_send() request.
 */
struct messaging_read_state {
	/* The only message type this request accepts */
	uint32_t msg_type;

	/* The received message */
	struct messaging_rec *rec;
};
1178 static bool messaging_read_filter(struct messaging_rec *rec,
1179 void *private_data);
1180 static void messaging_read_done(struct tevent_req *subreq);
1182 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
1183 struct tevent_context *ev,
1184 struct messaging_context *msg,
1185 uint32_t msg_type)
1187 struct tevent_req *req, *subreq;
1188 struct messaging_read_state *state;
1190 req = tevent_req_create(mem_ctx, &state,
1191 struct messaging_read_state);
1192 if (req == NULL) {
1193 return NULL;
1195 state->msg_type = msg_type;
1197 subreq = messaging_filtered_read_send(state, ev, msg,
1198 messaging_read_filter, state);
1199 if (tevent_req_nomem(subreq, req)) {
1200 return tevent_req_post(req, ev);
1202 tevent_req_set_callback(subreq, messaging_read_done, req);
1203 return req;
1206 static bool messaging_read_filter(struct messaging_rec *rec,
1207 void *private_data)
1209 struct messaging_read_state *state = talloc_get_type_abort(
1210 private_data, struct messaging_read_state);
1212 if (rec->num_fds != 0) {
1213 return false;
1216 return rec->msg_type == state->msg_type;
1219 static void messaging_read_done(struct tevent_req *subreq)
1221 struct tevent_req *req = tevent_req_callback_data(
1222 subreq, struct tevent_req);
1223 struct messaging_read_state *state = tevent_req_data(
1224 req, struct messaging_read_state);
1225 int ret;
1227 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
1228 TALLOC_FREE(subreq);
1229 if (tevent_req_error(req, ret)) {
1230 return;
1232 tevent_req_done(req);
1235 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1236 struct messaging_rec **presult)
1238 struct messaging_read_state *state = tevent_req_data(
1239 req, struct messaging_read_state);
1240 int err;
1242 if (tevent_req_is_unix_error(req, &err)) {
1243 return err;
1245 if (presult != NULL) {
1246 *presult = talloc_move(mem_ctx, &state->rec);
1248 return 0;
1251 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
1253 if (msg_ctx->num_new_waiters == 0) {
1254 return true;
1257 if (talloc_array_length(msg_ctx->waiters) <
1258 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
1259 struct tevent_req **tmp;
1260 tmp = talloc_realloc(
1261 msg_ctx, msg_ctx->waiters, struct tevent_req *,
1262 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1263 if (tmp == NULL) {
1264 DEBUG(1, ("%s: talloc failed\n", __func__));
1265 return false;
1267 msg_ctx->waiters = tmp;
1270 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1271 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1273 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1274 msg_ctx->num_new_waiters = 0;
1276 return true;
1279 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1280 struct messaging_rec *rec)
1282 struct messaging_callback *cb, *next;
1284 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1285 size_t j;
1287 next = cb->next;
1288 if (cb->msg_type != rec->msg_type) {
1289 continue;
1293 * the old style callbacks don't support fd passing
1295 for (j=0; j < rec->num_fds; j++) {
1296 int fd = rec->fds[j];
1297 close(fd);
1299 rec->num_fds = 0;
1300 rec->fds = NULL;
1302 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1303 rec->src, &rec->buf);
1305 return true;
1308 return false;
1311 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
1312 struct tevent_context *ev,
1313 struct messaging_rec *rec)
1315 size_t i;
1317 if (!messaging_append_new_waiters(msg_ctx)) {
1318 return false;
1321 i = 0;
1322 while (i < msg_ctx->num_waiters) {
1323 struct tevent_req *req;
1324 struct messaging_filtered_read_state *state;
1326 req = msg_ctx->waiters[i];
1327 if (req == NULL) {
1329 * This got cleaned up. In the meantime,
1330 * move everything down one. We need
1331 * to keep the order of waiters, as
1332 * other code may depend on this.
1334 ARRAY_DEL_ELEMENT(
1335 msg_ctx->waiters, i, msg_ctx->num_waiters);
1336 msg_ctx->num_waiters -= 1;
1337 continue;
1340 state = tevent_req_data(
1341 req, struct messaging_filtered_read_state);
1342 if ((ev == state->ev) &&
1343 state->filter(rec, state->private_data)) {
1344 messaging_filtered_read_done(req, rec);
1345 return true;
1348 i += 1;
1351 return false;
1355 Dispatch one messaging_rec
1357 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1358 struct tevent_context *ev,
1359 struct messaging_rec *rec)
1361 bool consumed;
1362 size_t i;
1364 if (ev == msg_ctx->event_ctx) {
1365 consumed = messaging_dispatch_classic(msg_ctx, rec);
1366 if (consumed) {
1367 return;
1371 consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
1372 if (consumed) {
1373 return;
1376 if (ev != msg_ctx->event_ctx) {
1377 struct iovec iov;
1378 int fds[MAX(1, rec->num_fds)];
1379 int ret;
1382 * We've been listening on a nested event
1383 * context. Messages need to be handled in the main
1384 * event context, so post to ourselves
1387 iov.iov_base = rec->buf.data;
1388 iov.iov_len = rec->buf.length;
1390 for (i=0; i<rec->num_fds; i++) {
1391 fds[i] = rec->fds[i];
1394 ret = messaging_post_self(
1395 msg_ctx, rec->src, rec->dest, rec->msg_type,
1396 &iov, 1, fds, rec->num_fds);
1397 if (ret == 0) {
1398 return;
1403 static int mess_parent_dgm_cleanup(void *private_data);
1404 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1406 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1408 struct tevent_req *req;
1410 req = background_job_send(
1411 msg, msg->event_ctx, msg, NULL, 0,
1412 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1413 60*15),
1414 mess_parent_dgm_cleanup, msg);
1415 if (req == NULL) {
1416 DBG_WARNING("background_job_send failed\n");
1417 return false;
1419 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1420 return true;
/*
 * Job function handed to background_job_send(): calls
 * messaging_dgm_wipe() and logs its result at debug level 10.
 *
 * @param private_data  Not used.
 *
 * @return The current value of the "messaging:messaging dgm cleanup
 *         interval" parameter (default 60*15) -- presumably taken by
 *         the background job as the delay until the next run; confirm
 *         against lib/background.h.
 */
static int mess_parent_dgm_cleanup(void *private_data)
{
	int wipe_ret = messaging_dgm_wipe();

	DEBUG(10, ("messaging_dgm_wipe returned %s\n",
		   (wipe_ret == 0) ? "ok" : strerror(wipe_ret)));

	return lp_parm_int(-1, "messaging",
			   "messaging dgm cleanup interval",
			   60*15);
}
1434 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1436 struct messaging_context *msg = tevent_req_callback_data(
1437 req, struct messaging_context);
1438 NTSTATUS status;
1440 status = background_job_recv(req);
1441 TALLOC_FREE(req);
1442 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1443 nt_errstr(status)));
1445 req = background_job_send(
1446 msg, msg->event_ctx, msg, NULL, 0,
1447 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1448 60*15),
1449 mess_parent_dgm_cleanup, msg);
1450 if (req == NULL) {
1451 DEBUG(1, ("background_job_send failed\n"));
1452 return;
1454 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
/*
 * Run the datagram messaging cleanup.
 *
 * @param msg_ctx  Not used by this function.
 * @param pid      0 selects messaging_dgm_wipe(); any other value is
 *                 passed to messaging_dgm_cleanup().
 *
 * @return The return value of whichever of the two functions was called.
 */
int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
{
	if (pid == 0) {
		return messaging_dgm_wipe();
	}

	return messaging_dgm_cleanup(pid);
}
1470 struct tevent_context *messaging_tevent_context(
1471 struct messaging_context *msg_ctx)
1473 return msg_ctx->event_ctx;
1476 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1478 return msg_ctx->names_db;
1481 /** @} **/