s3: lib: messages: Don't use the result of sec_init() before calling sec_init().
[Samba.git] / source3 / lib / messages.c
blobfca399483296eb61ffa2e1243b4b26f5937a659c
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messages_dgm.h"
56 #include "lib/messages_ctdbd.h"
57 #include "lib/util/iov_buf.h"
58 #include "lib/util/server_id_db.h"
59 #include "lib/messages_dgm_ref.h"
60 #include "lib/messages_util.h"
62 struct messaging_callback {
63 struct messaging_callback *prev, *next;
64 uint32_t msg_type;
65 void (*fn)(struct messaging_context *msg, void *private_data,
66 uint32_t msg_type,
67 struct server_id server_id, DATA_BLOB *data);
68 void *private_data;
71 struct messaging_context {
72 struct server_id id;
73 struct tevent_context *event_ctx;
74 struct messaging_callback *callbacks;
76 struct tevent_req **new_waiters;
77 size_t num_new_waiters;
79 struct tevent_req **waiters;
80 size_t num_waiters;
82 void *msg_dgm_ref;
83 struct messaging_backend *remote;
85 struct server_id_db *names_db;
88 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
89 struct messaging_rec *rec);
90 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
91 struct tevent_context *ev,
92 struct messaging_rec *rec);
94 /****************************************************************************
95 A useful function for testing the message system.
96 ****************************************************************************/
98 static void ping_message(struct messaging_context *msg_ctx,
99 void *private_data,
100 uint32_t msg_type,
101 struct server_id src,
102 DATA_BLOB *data)
104 struct server_id_buf idbuf;
106 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
107 server_id_str_buf(src, &idbuf), (int)data->length,
108 data->data ? (char *)data->data : ""));
110 messaging_send(msg_ctx, src, MSG_PONG, data);
113 struct messaging_rec *messaging_rec_create(
114 TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
115 uint32_t msg_type, const struct iovec *iov, int iovlen,
116 const int *fds, size_t num_fds)
118 ssize_t buflen;
119 uint8_t *buf;
120 struct messaging_rec *result;
122 if (num_fds > INT8_MAX) {
123 return NULL;
126 buflen = iov_buflen(iov, iovlen);
127 if (buflen == -1) {
128 return NULL;
130 buf = talloc_array(mem_ctx, uint8_t, buflen);
131 if (buf == NULL) {
132 return NULL;
134 iov_buf(iov, iovlen, buf, buflen);
137 struct messaging_rec rec;
138 int64_t fds64[num_fds];
139 size_t i;
141 for (i=0; i<num_fds; i++) {
142 fds64[i] = fds[i];
145 rec = (struct messaging_rec) {
146 .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
147 .src = src, .dest = dst,
148 .buf.data = buf, .buf.length = buflen,
149 .num_fds = num_fds, .fds = fds64,
152 result = messaging_rec_dup(mem_ctx, &rec);
155 TALLOC_FREE(buf);
157 return result;
160 static void messaging_recv_cb(struct tevent_context *ev,
161 const uint8_t *msg, size_t msg_len,
162 int *fds, size_t num_fds,
163 void *private_data)
165 struct messaging_context *msg_ctx = talloc_get_type_abort(
166 private_data, struct messaging_context);
167 struct server_id_buf idbuf;
168 struct messaging_rec rec;
169 int64_t fds64[MIN(num_fds, INT8_MAX)];
170 size_t i;
172 if (msg_len < MESSAGE_HDR_LENGTH) {
173 DBG_WARNING("message too short: %zu\n", msg_len);
174 goto close_fail;
177 if (num_fds > INT8_MAX) {
178 DBG_WARNING("too many fds: %zu\n", num_fds);
179 goto close_fail;
183 * "consume" the fds by copying them and setting
184 * the original variable to -1
186 for (i=0; i < num_fds; i++) {
187 fds64[i] = fds[i];
188 fds[i] = -1;
191 rec = (struct messaging_rec) {
192 .msg_version = MESSAGE_VERSION,
193 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
194 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
195 .num_fds = num_fds,
196 .fds = fds64,
199 message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
201 DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
202 (unsigned)rec.msg_type, rec.buf.length, num_fds,
203 server_id_str_buf(rec.src, &idbuf));
205 messaging_dispatch_rec(msg_ctx, ev, &rec);
206 return;
208 close_fail:
209 for (i=0; i < num_fds; i++) {
210 close(fds[i]);
214 static int messaging_context_destructor(struct messaging_context *ctx)
216 size_t i;
218 for (i=0; i<ctx->num_new_waiters; i++) {
219 if (ctx->new_waiters[i] != NULL) {
220 tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
221 ctx->new_waiters[i] = NULL;
224 for (i=0; i<ctx->num_waiters; i++) {
225 if (ctx->waiters[i] != NULL) {
226 tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
227 ctx->waiters[i] = NULL;
231 return 0;
234 static const char *private_path(const char *name)
236 return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
239 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
240 struct tevent_context *ev,
241 struct messaging_context **pmsg_ctx)
243 TALLOC_CTX *frame;
244 struct messaging_context *ctx;
245 NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
246 int ret;
247 const char *lck_path;
248 const char *priv_path;
249 bool ok;
252 * sec_init() *must* be called before any other
253 * functions that use sec_XXX(). e.g. sec_initial_uid().
256 sec_init();
258 lck_path = lock_path("msg.lock");
259 if (lck_path == NULL) {
260 return NT_STATUS_NO_MEMORY;
263 ok = directory_create_or_exist_strict(lck_path,
264 sec_initial_uid(),
265 0755);
266 if (!ok) {
267 DBG_DEBUG("Could not create lock directory: %s\n",
268 strerror(errno));
269 return NT_STATUS_ACCESS_DENIED;
272 priv_path = private_path("msg.sock");
273 if (priv_path == NULL) {
274 return NT_STATUS_NO_MEMORY;
277 ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
278 0700);
279 if (!ok) {
280 DBG_DEBUG("Could not create msg directory: %s\n",
281 strerror(errno));
282 return NT_STATUS_ACCESS_DENIED;
285 frame = talloc_stackframe();
286 if (frame == NULL) {
287 return NT_STATUS_NO_MEMORY;
290 ctx = talloc_zero(frame, struct messaging_context);
291 if (ctx == NULL) {
292 status = NT_STATUS_NO_MEMORY;
293 goto done;
296 ctx->id = (struct server_id) {
297 .pid = getpid(), .vnn = NONCLUSTER_VNN
300 ctx->event_ctx = ev;
302 ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
303 ctx->event_ctx,
304 &ctx->id.unique_id,
305 priv_path,
306 lck_path,
307 messaging_recv_cb,
308 ctx,
309 &ret);
310 if (ctx->msg_dgm_ref == NULL) {
311 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
312 status = map_nt_error_from_unix(ret);
313 goto done;
315 talloc_set_destructor(ctx, messaging_context_destructor);
317 if (lp_clustering()) {
318 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
320 if (ret != 0) {
321 DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
322 strerror(ret)));
323 status = map_nt_error_from_unix(ret);
324 goto done;
327 ctx->id.vnn = get_my_vnn();
329 ctx->names_db = server_id_db_init(ctx,
330 ctx->id,
331 lp_lock_directory(),
333 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
334 if (ctx->names_db == NULL) {
335 DBG_DEBUG("server_id_db_init failed\n");
336 status = NT_STATUS_NO_MEMORY;
337 goto done;
340 messaging_register(ctx, NULL, MSG_PING, ping_message);
342 /* Register some debugging related messages */
344 register_msg_pool_usage(ctx);
345 register_dmalloc_msgs(ctx);
346 debug_register_msgs(ctx);
349 struct server_id_buf tmp;
350 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
353 *pmsg_ctx = talloc_steal(mem_ctx, ctx);
355 status = NT_STATUS_OK;
356 done:
357 TALLOC_FREE(frame);
359 return status;
362 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
363 struct tevent_context *ev)
365 struct messaging_context *ctx = NULL;
366 NTSTATUS status;
368 status = messaging_init_internal(mem_ctx,
370 &ctx);
371 if (!NT_STATUS_IS_OK(status)) {
372 return NULL;
375 return ctx;
378 NTSTATUS messaging_init_client(TALLOC_CTX *mem_ctx,
379 struct tevent_context *ev,
380 struct messaging_context **pmsg_ctx)
382 return messaging_init_internal(mem_ctx,
384 pmsg_ctx);
387 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
389 return msg_ctx->id;
393 * re-init after a fork
395 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
397 int ret;
398 char *lck_path;
400 TALLOC_FREE(msg_ctx->msg_dgm_ref);
402 msg_ctx->id = (struct server_id) {
403 .pid = getpid(), .vnn = msg_ctx->id.vnn
406 lck_path = lock_path("msg.lock");
407 if (lck_path == NULL) {
408 return NT_STATUS_NO_MEMORY;
411 msg_ctx->msg_dgm_ref = messaging_dgm_ref(
412 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
413 private_path("msg.sock"), lck_path,
414 messaging_recv_cb, msg_ctx, &ret);
416 if (msg_ctx->msg_dgm_ref == NULL) {
417 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
418 return map_nt_error_from_unix(ret);
421 if (lp_clustering()) {
422 ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
423 msg_ctx->remote);
425 if (ret != 0) {
426 DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
427 strerror(ret)));
428 return map_nt_error_from_unix(ret);
432 server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
434 return NT_STATUS_OK;
439 * Register a dispatch function for a particular message type. Allow multiple
440 * registrants
442 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
443 void *private_data,
444 uint32_t msg_type,
445 void (*fn)(struct messaging_context *msg,
446 void *private_data,
447 uint32_t msg_type,
448 struct server_id server_id,
449 DATA_BLOB *data))
451 struct messaging_callback *cb;
453 DEBUG(5, ("Registering messaging pointer for type %u - "
454 "private_data=%p\n",
455 (unsigned)msg_type, private_data));
458 * Only one callback per type
461 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
462 /* we allow a second registration of the same message
463 type if it has a different private pointer. This is
464 needed in, for example, the internal notify code,
465 which creates a new notify context for each tree
466 connect, and expects to receive messages to each of
467 them. */
468 if (cb->msg_type == msg_type && private_data == cb->private_data) {
469 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
470 (unsigned)msg_type, private_data));
471 cb->fn = fn;
472 cb->private_data = private_data;
473 return NT_STATUS_OK;
477 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
478 return NT_STATUS_NO_MEMORY;
481 cb->msg_type = msg_type;
482 cb->fn = fn;
483 cb->private_data = private_data;
485 DLIST_ADD(msg_ctx->callbacks, cb);
486 return NT_STATUS_OK;
490 De-register the function for a particular message type.
492 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
493 void *private_data)
495 struct messaging_callback *cb, *next;
497 for (cb = ctx->callbacks; cb; cb = next) {
498 next = cb->next;
499 if ((cb->msg_type == msg_type)
500 && (cb->private_data == private_data)) {
501 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
502 (unsigned)msg_type, private_data));
503 DLIST_REMOVE(ctx->callbacks, cb);
504 TALLOC_FREE(cb);
510 Send a message to a particular server
512 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
513 struct server_id server, uint32_t msg_type,
514 const DATA_BLOB *data)
516 struct iovec iov = {0};
518 if (data != NULL) {
519 iov.iov_base = data->data;
520 iov.iov_len = data->length;
523 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
526 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
527 struct server_id server, uint32_t msg_type,
528 const uint8_t *buf, size_t len)
530 DATA_BLOB blob = data_blob_const(buf, len);
531 return messaging_send(msg_ctx, server, msg_type, &blob);
534 struct messaging_post_state {
535 struct messaging_context *msg_ctx;
536 struct messaging_rec *rec;
539 static void messaging_post_handler(struct tevent_context *ev,
540 struct tevent_immediate *ti,
541 void *private_data);
543 static int messaging_post_self(struct messaging_context *msg_ctx,
544 struct server_id src, struct server_id dst,
545 uint32_t msg_type,
546 const struct iovec *iov, int iovlen,
547 const int *fds, size_t num_fds)
549 struct tevent_immediate *ti;
550 struct messaging_post_state *state;
552 state = talloc(msg_ctx, struct messaging_post_state);
553 if (state == NULL) {
554 return ENOMEM;
556 state->msg_ctx = msg_ctx;
558 ti = tevent_create_immediate(state);
559 if (ti == NULL) {
560 goto fail;
562 state->rec = messaging_rec_create(
563 state, src, dst, msg_type, iov, iovlen, fds, num_fds);
564 if (state->rec == NULL) {
565 goto fail;
568 tevent_schedule_immediate(ti, msg_ctx->event_ctx,
569 messaging_post_handler, state);
570 return 0;
572 fail:
573 TALLOC_FREE(state);
574 return ENOMEM;
577 static void messaging_post_handler(struct tevent_context *ev,
578 struct tevent_immediate *ti,
579 void *private_data)
581 struct messaging_post_state *state = talloc_get_type_abort(
582 private_data, struct messaging_post_state);
583 messaging_dispatch_rec(state->msg_ctx, ev, state->rec);
584 TALLOC_FREE(state);
587 int messaging_send_iov_from(struct messaging_context *msg_ctx,
588 struct server_id src, struct server_id dst,
589 uint32_t msg_type,
590 const struct iovec *iov, int iovlen,
591 const int *fds, size_t num_fds)
593 int ret;
594 uint8_t hdr[MESSAGE_HDR_LENGTH];
595 struct iovec iov2[iovlen+1];
597 if (server_id_is_disconnected(&dst)) {
598 return EINVAL;
601 if (num_fds > INT8_MAX) {
602 return EINVAL;
605 if (dst.vnn != msg_ctx->id.vnn) {
606 if (num_fds > 0) {
607 return ENOSYS;
610 ret = msg_ctx->remote->send_fn(src, dst,
611 msg_type, iov, iovlen,
612 NULL, 0,
613 msg_ctx->remote);
614 return ret;
617 if (server_id_equal(&dst, &msg_ctx->id)) {
618 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
619 iov, iovlen, fds, num_fds);
620 return ret;
623 message_hdr_put(hdr, msg_type, src, dst);
624 iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
625 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
627 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
629 if (ret == EACCES) {
630 become_root();
631 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
632 fds, num_fds);
633 unbecome_root();
636 if (ret == ECONNREFUSED) {
638 * Linux returns this when a socket exists in the file
639 * system without a listening process. This is not
640 * documented in susv4 or the linux manpages, but it's
641 * easily testable. For the higher levels this is the
642 * same as "destination does not exist"
644 ret = ENOENT;
647 return ret;
650 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
651 struct server_id dst, uint32_t msg_type,
652 const struct iovec *iov, int iovlen,
653 const int *fds, size_t num_fds)
655 int ret;
657 ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
658 iov, iovlen, fds, num_fds);
659 if (ret != 0) {
660 return map_nt_error_from_unix(ret);
662 return NT_STATUS_OK;
665 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
666 struct messaging_rec *rec)
668 struct messaging_rec *result;
669 size_t fds_size = sizeof(int64_t) * rec->num_fds;
670 size_t payload_len;
672 payload_len = rec->buf.length + fds_size;
673 if (payload_len < rec->buf.length) {
674 /* overflow */
675 return NULL;
678 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
679 payload_len);
680 if (result == NULL) {
681 return NULL;
683 *result = *rec;
685 /* Doesn't fail, see talloc_pooled_object */
687 result->buf.data = talloc_memdup(result, rec->buf.data,
688 rec->buf.length);
690 result->fds = NULL;
691 if (result->num_fds > 0) {
692 result->fds = talloc_memdup(result, rec->fds, fds_size);
695 return result;
698 struct messaging_filtered_read_state {
699 struct tevent_context *ev;
700 struct messaging_context *msg_ctx;
701 struct messaging_dgm_fde *fde;
703 bool (*filter)(struct messaging_rec *rec, void *private_data);
704 void *private_data;
706 struct messaging_rec *rec;
709 static void messaging_filtered_read_cleanup(struct tevent_req *req,
710 enum tevent_req_state req_state);
712 struct tevent_req *messaging_filtered_read_send(
713 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
714 struct messaging_context *msg_ctx,
715 bool (*filter)(struct messaging_rec *rec, void *private_data),
716 void *private_data)
718 struct tevent_req *req;
719 struct messaging_filtered_read_state *state;
720 size_t new_waiters_len;
722 req = tevent_req_create(mem_ctx, &state,
723 struct messaging_filtered_read_state);
724 if (req == NULL) {
725 return NULL;
727 state->ev = ev;
728 state->msg_ctx = msg_ctx;
729 state->filter = filter;
730 state->private_data = private_data;
733 * We have to defer the callback here, as we might be called from
734 * within a different tevent_context than state->ev
736 tevent_req_defer_callback(req, state->ev);
738 state->fde = messaging_dgm_register_tevent_context(state, ev);
739 if (tevent_req_nomem(state->fde, req)) {
740 return tevent_req_post(req, ev);
744 * We add ourselves to the "new_waiters" array, not the "waiters"
745 * array. If we are called from within messaging_read_done,
746 * messaging_dispatch_rec will be in an active for-loop on
747 * "waiters". We must be careful not to mess with this array, because
748 * it could mean that a single event is being delivered twice.
751 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
753 if (new_waiters_len == msg_ctx->num_new_waiters) {
754 struct tevent_req **tmp;
756 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
757 struct tevent_req *, new_waiters_len+1);
758 if (tevent_req_nomem(tmp, req)) {
759 return tevent_req_post(req, ev);
761 msg_ctx->new_waiters = tmp;
764 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
765 msg_ctx->num_new_waiters += 1;
766 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
768 return req;
771 static void messaging_filtered_read_cleanup(struct tevent_req *req,
772 enum tevent_req_state req_state)
774 struct messaging_filtered_read_state *state = tevent_req_data(
775 req, struct messaging_filtered_read_state);
776 struct messaging_context *msg_ctx = state->msg_ctx;
777 size_t i;
779 tevent_req_set_cleanup_fn(req, NULL);
781 TALLOC_FREE(state->fde);
784 * Just set the [new_]waiters entry to NULL, be careful not to mess
785 * with the other "waiters" array contents. We are often called from
786 * within "messaging_dispatch_rec", which loops over
787 * "waiters". Messing with the "waiters" array will mess up that
788 * for-loop.
791 for (i=0; i<msg_ctx->num_waiters; i++) {
792 if (msg_ctx->waiters[i] == req) {
793 msg_ctx->waiters[i] = NULL;
794 return;
798 for (i=0; i<msg_ctx->num_new_waiters; i++) {
799 if (msg_ctx->new_waiters[i] == req) {
800 msg_ctx->new_waiters[i] = NULL;
801 return;
806 static void messaging_filtered_read_done(struct tevent_req *req,
807 struct messaging_rec *rec)
809 struct messaging_filtered_read_state *state = tevent_req_data(
810 req, struct messaging_filtered_read_state);
812 state->rec = messaging_rec_dup(state, rec);
813 if (tevent_req_nomem(state->rec, req)) {
814 return;
816 tevent_req_done(req);
819 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
820 struct messaging_rec **presult)
822 struct messaging_filtered_read_state *state = tevent_req_data(
823 req, struct messaging_filtered_read_state);
824 int err;
826 if (tevent_req_is_unix_error(req, &err)) {
827 tevent_req_received(req);
828 return err;
830 if (presult != NULL) {
831 *presult = talloc_move(mem_ctx, &state->rec);
833 return 0;
836 struct messaging_read_state {
837 uint32_t msg_type;
838 struct messaging_rec *rec;
841 static bool messaging_read_filter(struct messaging_rec *rec,
842 void *private_data);
843 static void messaging_read_done(struct tevent_req *subreq);
845 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
846 struct tevent_context *ev,
847 struct messaging_context *msg,
848 uint32_t msg_type)
850 struct tevent_req *req, *subreq;
851 struct messaging_read_state *state;
853 req = tevent_req_create(mem_ctx, &state,
854 struct messaging_read_state);
855 if (req == NULL) {
856 return NULL;
858 state->msg_type = msg_type;
860 subreq = messaging_filtered_read_send(state, ev, msg,
861 messaging_read_filter, state);
862 if (tevent_req_nomem(subreq, req)) {
863 return tevent_req_post(req, ev);
865 tevent_req_set_callback(subreq, messaging_read_done, req);
866 return req;
869 static bool messaging_read_filter(struct messaging_rec *rec,
870 void *private_data)
872 struct messaging_read_state *state = talloc_get_type_abort(
873 private_data, struct messaging_read_state);
875 if (rec->num_fds != 0) {
876 return false;
879 return rec->msg_type == state->msg_type;
882 static void messaging_read_done(struct tevent_req *subreq)
884 struct tevent_req *req = tevent_req_callback_data(
885 subreq, struct tevent_req);
886 struct messaging_read_state *state = tevent_req_data(
887 req, struct messaging_read_state);
888 int ret;
890 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
891 TALLOC_FREE(subreq);
892 if (tevent_req_error(req, ret)) {
893 return;
895 tevent_req_done(req);
898 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
899 struct messaging_rec **presult)
901 struct messaging_read_state *state = tevent_req_data(
902 req, struct messaging_read_state);
903 int err;
905 if (tevent_req_is_unix_error(req, &err)) {
906 return err;
908 if (presult != NULL) {
909 *presult = talloc_move(mem_ctx, &state->rec);
911 return 0;
914 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
916 if (msg_ctx->num_new_waiters == 0) {
917 return true;
920 if (talloc_array_length(msg_ctx->waiters) <
921 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
922 struct tevent_req **tmp;
923 tmp = talloc_realloc(
924 msg_ctx, msg_ctx->waiters, struct tevent_req *,
925 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
926 if (tmp == NULL) {
927 DEBUG(1, ("%s: talloc failed\n", __func__));
928 return false;
930 msg_ctx->waiters = tmp;
933 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
934 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
936 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
937 msg_ctx->num_new_waiters = 0;
939 return true;
942 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
943 struct messaging_rec *rec)
945 struct messaging_callback *cb, *next;
947 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
948 size_t j;
950 next = cb->next;
951 if (cb->msg_type != rec->msg_type) {
952 continue;
956 * the old style callbacks don't support fd passing
958 for (j=0; j < rec->num_fds; j++) {
959 int fd = rec->fds[j];
960 close(fd);
962 rec->num_fds = 0;
963 rec->fds = NULL;
965 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
966 rec->src, &rec->buf);
968 return true;
971 return false;
975 Dispatch one messaging_rec
977 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
978 struct tevent_context *ev,
979 struct messaging_rec *rec)
981 size_t i;
982 bool consumed;
984 if (ev == msg_ctx->event_ctx) {
985 consumed = messaging_dispatch_classic(msg_ctx, rec);
986 if (consumed) {
987 return;
991 if (!messaging_append_new_waiters(msg_ctx)) {
992 size_t j;
993 for (j=0; j < rec->num_fds; j++) {
994 int fd = rec->fds[j];
995 close(fd);
997 rec->num_fds = 0;
998 rec->fds = NULL;
999 return;
1002 i = 0;
1003 while (i < msg_ctx->num_waiters) {
1004 struct tevent_req *req;
1005 struct messaging_filtered_read_state *state;
1007 req = msg_ctx->waiters[i];
1008 if (req == NULL) {
1010 * This got cleaned up. In the meantime,
1011 * move everything down one. We need
1012 * to keep the order of waiters, as
1013 * other code may depend on this.
1015 if (i < msg_ctx->num_waiters - 1) {
1016 memmove(&msg_ctx->waiters[i],
1017 &msg_ctx->waiters[i+1],
1018 sizeof(struct tevent_req *) *
1019 (msg_ctx->num_waiters - i - 1));
1021 msg_ctx->num_waiters -= 1;
1022 continue;
1025 state = tevent_req_data(
1026 req, struct messaging_filtered_read_state);
1027 if ((ev == state->ev) &&
1028 state->filter(rec, state->private_data)) {
1029 messaging_filtered_read_done(req, rec);
1030 return;
1033 i += 1;
1036 if (ev != msg_ctx->event_ctx) {
1037 struct iovec iov;
1038 int fds[rec->num_fds];
1039 int ret;
1042 * We've been listening on a nested event
1043 * context. Messages need to be handled in the main
1044 * event context, so post to ourselves
1047 iov.iov_base = rec->buf.data;
1048 iov.iov_len = rec->buf.length;
1050 for (i=0; i<rec->num_fds; i++) {
1051 fds[i] = rec->fds[i];
1054 ret = messaging_post_self(
1055 msg_ctx, rec->src, rec->dest, rec->msg_type,
1056 &iov, 1, fds, rec->num_fds);
1057 if (ret == 0) {
1058 return;
1063 * If the fd-array isn't used, just close it.
1065 for (i=0; i < rec->num_fds; i++) {
1066 int fd = rec->fds[i];
1067 close(fd);
1069 rec->num_fds = 0;
1070 rec->fds = NULL;
1073 static int mess_parent_dgm_cleanup(void *private_data);
1074 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1076 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1078 struct tevent_req *req;
1080 req = background_job_send(
1081 msg, msg->event_ctx, msg, NULL, 0,
1082 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1083 60*15),
1084 mess_parent_dgm_cleanup, msg);
1085 if (req == NULL) {
1086 return false;
1088 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1089 return true;
1092 static int mess_parent_dgm_cleanup(void *private_data)
1094 int ret;
1096 ret = messaging_dgm_wipe();
1097 DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1098 ret ? strerror(ret) : "ok"));
1099 return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1100 60*15);
1103 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1105 struct messaging_context *msg = tevent_req_callback_data(
1106 req, struct messaging_context);
1107 NTSTATUS status;
1109 status = background_job_recv(req);
1110 TALLOC_FREE(req);
1111 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1112 nt_errstr(status)));
1114 req = background_job_send(
1115 msg, msg->event_ctx, msg, NULL, 0,
1116 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1117 60*15),
1118 mess_parent_dgm_cleanup, msg);
1119 if (req == NULL) {
1120 DEBUG(1, ("background_job_send failed\n"));
1121 return;
1123 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1126 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1128 int ret;
1130 if (pid == 0) {
1131 ret = messaging_dgm_wipe();
1132 } else {
1133 ret = messaging_dgm_cleanup(pid);
1136 return ret;
1139 struct tevent_context *messaging_tevent_context(
1140 struct messaging_context *msg_ctx)
1142 return msg_ctx->event_ctx;
1145 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1147 return msg_ctx->names_db;
1150 /** @} **/