gssapi: check for gss_acquire_cred_from
[Samba.git] / source3 / lib / messages.c
blobd7ad49d7c309c5a07e02b979ce7182994c6d2b91
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messages_dgm.h"
56 #include "lib/util/iov_buf.h"
57 #include "lib/util/server_id_db.h"
58 #include "lib/messages_dgm_ref.h"
59 #include "lib/messages_util.h"
61 struct messaging_callback {
62 struct messaging_callback *prev, *next;
63 uint32_t msg_type;
64 void (*fn)(struct messaging_context *msg, void *private_data,
65 uint32_t msg_type,
66 struct server_id server_id, DATA_BLOB *data);
67 void *private_data;
70 struct messaging_context {
71 struct server_id id;
72 struct tevent_context *event_ctx;
73 struct messaging_callback *callbacks;
75 struct tevent_req **new_waiters;
76 unsigned num_new_waiters;
78 struct tevent_req **waiters;
79 unsigned num_waiters;
81 void *msg_dgm_ref;
82 struct messaging_backend *remote;
84 struct server_id_db *names_db;
87 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
88 struct messaging_rec *rec);
89 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
90 struct tevent_context *ev,
91 struct messaging_rec *rec);
93 /****************************************************************************
94 A useful function for testing the message system.
95 ****************************************************************************/
97 static void ping_message(struct messaging_context *msg_ctx,
98 void *private_data,
99 uint32_t msg_type,
100 struct server_id src,
101 DATA_BLOB *data)
103 struct server_id_buf idbuf;
105 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
106 server_id_str_buf(src, &idbuf), (int)data->length,
107 data->data ? (char *)data->data : ""));
109 messaging_send(msg_ctx, src, MSG_PONG, data);
112 static struct messaging_rec *messaging_rec_create(
113 TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
114 uint32_t msg_type, const struct iovec *iov, int iovlen,
115 const int *fds, size_t num_fds)
117 ssize_t buflen;
118 uint8_t *buf;
119 struct messaging_rec *result;
121 if (num_fds > INT8_MAX) {
122 return NULL;
125 buflen = iov_buflen(iov, iovlen);
126 if (buflen == -1) {
127 return NULL;
129 buf = talloc_array(mem_ctx, uint8_t, buflen);
130 if (buf == NULL) {
131 return NULL;
133 iov_buf(iov, iovlen, buf, buflen);
136 struct messaging_rec rec;
137 int64_t fds64[num_fds];
138 size_t i;
140 for (i=0; i<num_fds; i++) {
141 fds64[i] = fds[i];
144 rec = (struct messaging_rec) {
145 .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
146 .src = src, .dest = dst,
147 .buf.data = buf, .buf.length = buflen,
148 .num_fds = num_fds, .fds = fds64,
151 result = messaging_rec_dup(mem_ctx, &rec);
154 TALLOC_FREE(buf);
156 return result;
159 static void messaging_recv_cb(struct tevent_context *ev,
160 const uint8_t *msg, size_t msg_len,
161 int *fds, size_t num_fds,
162 void *private_data)
164 struct messaging_context *msg_ctx = talloc_get_type_abort(
165 private_data, struct messaging_context);
166 struct server_id_buf idbuf;
167 struct messaging_rec rec;
168 int64_t fds64[MIN(num_fds, INT8_MAX)];
169 size_t i;
171 if (msg_len < MESSAGE_HDR_LENGTH) {
172 DBG_WARNING("message too short: %zu\n", msg_len);
173 goto close_fail;
176 if (num_fds > INT8_MAX) {
177 DBG_WARNING("too many fds: %zu\n", num_fds);
178 goto close_fail;
182 * "consume" the fds by copying them and setting
183 * the original variable to -1
185 for (i=0; i < num_fds; i++) {
186 fds64[i] = fds[i];
187 fds[i] = -1;
190 rec = (struct messaging_rec) {
191 .msg_version = MESSAGE_VERSION,
192 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
193 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
194 .num_fds = num_fds,
195 .fds = fds64,
198 message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
200 DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
201 (unsigned)rec.msg_type, rec.buf.length, num_fds,
202 server_id_str_buf(rec.src, &idbuf));
204 messaging_dispatch_rec(msg_ctx, ev, &rec);
205 return;
207 close_fail:
208 for (i=0; i < num_fds; i++) {
209 close(fds[i]);
213 static int messaging_context_destructor(struct messaging_context *ctx)
215 unsigned i;
217 for (i=0; i<ctx->num_new_waiters; i++) {
218 if (ctx->new_waiters[i] != NULL) {
219 tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
220 ctx->new_waiters[i] = NULL;
223 for (i=0; i<ctx->num_waiters; i++) {
224 if (ctx->waiters[i] != NULL) {
225 tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
226 ctx->waiters[i] = NULL;
230 return 0;
233 static const char *private_path(const char *name)
235 return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
238 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
239 struct tevent_context *ev,
240 struct messaging_context **pmsg_ctx)
242 TALLOC_CTX *frame;
243 struct messaging_context *ctx;
244 NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
245 int ret;
246 const char *lck_path;
247 const char *priv_path;
248 bool ok;
250 lck_path = lock_path("msg.lock");
251 if (lck_path == NULL) {
252 return NT_STATUS_NO_MEMORY;
255 ok = directory_create_or_exist_strict(lck_path,
256 sec_initial_uid(),
257 0755);
258 if (!ok) {
259 DBG_DEBUG("Could not create lock directory: %s\n",
260 strerror(errno));
261 return NT_STATUS_ACCESS_DENIED;
264 priv_path = private_path("msg.sock");
265 if (priv_path == NULL) {
266 return NT_STATUS_NO_MEMORY;
269 ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
270 0700);
271 if (!ok) {
272 DBG_DEBUG("Could not create msg directory: %s\n",
273 strerror(errno));
274 return NT_STATUS_ACCESS_DENIED;
277 frame = talloc_stackframe();
278 if (frame == NULL) {
279 return NT_STATUS_NO_MEMORY;
282 ctx = talloc_zero(frame, struct messaging_context);
283 if (ctx == NULL) {
284 status = NT_STATUS_NO_MEMORY;
285 goto done;
288 ctx->id = (struct server_id) {
289 .pid = getpid(), .vnn = NONCLUSTER_VNN
292 ctx->event_ctx = ev;
294 sec_init();
296 ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
297 ctx->event_ctx,
298 &ctx->id.unique_id,
299 priv_path,
300 lck_path,
301 messaging_recv_cb,
302 ctx,
303 &ret);
304 if (ctx->msg_dgm_ref == NULL) {
305 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
306 status = NT_STATUS_INTERNAL_ERROR;
307 goto done;
309 talloc_set_destructor(ctx, messaging_context_destructor);
311 if (lp_clustering()) {
312 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
314 if (ret != 0) {
315 DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
316 strerror(ret)));
317 status = NT_STATUS_INTERNAL_ERROR;
318 goto done;
321 ctx->id.vnn = get_my_vnn();
323 ctx->names_db = server_id_db_init(ctx,
324 ctx->id,
325 lp_lock_directory(),
327 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
328 if (ctx->names_db == NULL) {
329 DBG_DEBUG("server_id_db_init failed\n");
330 status = NT_STATUS_INTERNAL_ERROR;
331 goto done;
334 messaging_register(ctx, NULL, MSG_PING, ping_message);
336 /* Register some debugging related messages */
338 register_msg_pool_usage(ctx);
339 register_dmalloc_msgs(ctx);
340 debug_register_msgs(ctx);
343 struct server_id_buf tmp;
344 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
347 *pmsg_ctx = talloc_steal(mem_ctx, ctx);
349 status = NT_STATUS_OK;
350 done:
351 TALLOC_FREE(frame);
353 return status;
356 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
357 struct tevent_context *ev)
359 struct messaging_context *ctx = NULL;
360 NTSTATUS status;
362 status = messaging_init_internal(mem_ctx,
364 &ctx);
365 if (!NT_STATUS_IS_OK(status)) {
366 return NULL;
369 return ctx;
372 NTSTATUS messaging_init_client(TALLOC_CTX *mem_ctx,
373 struct tevent_context *ev,
374 struct messaging_context **pmsg_ctx)
376 return messaging_init_internal(mem_ctx,
378 pmsg_ctx);
381 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
383 return msg_ctx->id;
387 * re-init after a fork
389 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
391 int ret;
392 char *lck_path;
394 TALLOC_FREE(msg_ctx->msg_dgm_ref);
396 msg_ctx->id = (struct server_id) {
397 .pid = getpid(), .vnn = msg_ctx->id.vnn
400 lck_path = lock_path("msg.lock");
401 if (lck_path == NULL) {
402 return NT_STATUS_NO_MEMORY;
405 msg_ctx->msg_dgm_ref = messaging_dgm_ref(
406 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
407 private_path("msg.sock"), lck_path,
408 messaging_recv_cb, msg_ctx, &ret);
410 if (msg_ctx->msg_dgm_ref == NULL) {
411 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
412 return map_nt_error_from_unix(ret);
415 if (lp_clustering()) {
416 ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
417 msg_ctx->remote);
419 if (ret != 0) {
420 DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
421 strerror(ret)));
422 return map_nt_error_from_unix(ret);
426 server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
428 return NT_STATUS_OK;
433 * Register a dispatch function for a particular message type. Allow multiple
434 * registrants
436 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
437 void *private_data,
438 uint32_t msg_type,
439 void (*fn)(struct messaging_context *msg,
440 void *private_data,
441 uint32_t msg_type,
442 struct server_id server_id,
443 DATA_BLOB *data))
445 struct messaging_callback *cb;
447 DEBUG(5, ("Registering messaging pointer for type %u - "
448 "private_data=%p\n",
449 (unsigned)msg_type, private_data));
452 * Only one callback per type
455 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
456 /* we allow a second registration of the same message
457 type if it has a different private pointer. This is
458 needed in, for example, the internal notify code,
459 which creates a new notify context for each tree
460 connect, and expects to receive messages to each of
461 them. */
462 if (cb->msg_type == msg_type && private_data == cb->private_data) {
463 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
464 (unsigned)msg_type, private_data));
465 cb->fn = fn;
466 cb->private_data = private_data;
467 return NT_STATUS_OK;
471 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
472 return NT_STATUS_NO_MEMORY;
475 cb->msg_type = msg_type;
476 cb->fn = fn;
477 cb->private_data = private_data;
479 DLIST_ADD(msg_ctx->callbacks, cb);
480 return NT_STATUS_OK;
484 De-register the function for a particular message type.
486 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
487 void *private_data)
489 struct messaging_callback *cb, *next;
491 for (cb = ctx->callbacks; cb; cb = next) {
492 next = cb->next;
493 if ((cb->msg_type == msg_type)
494 && (cb->private_data == private_data)) {
495 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
496 (unsigned)msg_type, private_data));
497 DLIST_REMOVE(ctx->callbacks, cb);
498 TALLOC_FREE(cb);
504 Send a message to a particular server
506 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
507 struct server_id server, uint32_t msg_type,
508 const DATA_BLOB *data)
510 struct iovec iov = {0};
512 if (data != NULL) {
513 iov.iov_base = data->data;
514 iov.iov_len = data->length;
517 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
520 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
521 struct server_id server, uint32_t msg_type,
522 const uint8_t *buf, size_t len)
524 DATA_BLOB blob = data_blob_const(buf, len);
525 return messaging_send(msg_ctx, server, msg_type, &blob);
528 struct messaging_post_state {
529 struct messaging_context *msg_ctx;
530 struct messaging_rec *rec;
533 static void messaging_post_handler(struct tevent_context *ev,
534 struct tevent_immediate *ti,
535 void *private_data);
537 static int messaging_post_self(struct messaging_context *msg_ctx,
538 struct server_id src, struct server_id dst,
539 uint32_t msg_type,
540 const struct iovec *iov, int iovlen,
541 const int *fds, size_t num_fds)
543 struct tevent_immediate *ti;
544 struct messaging_post_state *state;
546 state = talloc(msg_ctx, struct messaging_post_state);
547 if (state == NULL) {
548 return ENOMEM;
550 state->msg_ctx = msg_ctx;
552 ti = tevent_create_immediate(state);
553 if (ti == NULL) {
554 goto fail;
556 state->rec = messaging_rec_create(
557 state, src, dst, msg_type, iov, iovlen, fds, num_fds);
558 if (state->rec == NULL) {
559 goto fail;
562 tevent_schedule_immediate(ti, msg_ctx->event_ctx,
563 messaging_post_handler, state);
564 return 0;
566 fail:
567 TALLOC_FREE(state);
568 return ENOMEM;
571 static void messaging_post_handler(struct tevent_context *ev,
572 struct tevent_immediate *ti,
573 void *private_data)
575 struct messaging_post_state *state = talloc_get_type_abort(
576 private_data, struct messaging_post_state);
577 messaging_dispatch_rec(state->msg_ctx, ev, state->rec);
578 TALLOC_FREE(state);
581 int messaging_send_iov_from(struct messaging_context *msg_ctx,
582 struct server_id src, struct server_id dst,
583 uint32_t msg_type,
584 const struct iovec *iov, int iovlen,
585 const int *fds, size_t num_fds)
587 int ret;
588 uint8_t hdr[MESSAGE_HDR_LENGTH];
589 struct iovec iov2[iovlen+1];
591 if (server_id_is_disconnected(&dst)) {
592 return EINVAL;
595 if (num_fds > INT8_MAX) {
596 return EINVAL;
599 if (dst.vnn != msg_ctx->id.vnn) {
600 if (num_fds > 0) {
601 return ENOSYS;
604 ret = msg_ctx->remote->send_fn(src, dst,
605 msg_type, iov, iovlen,
606 NULL, 0,
607 msg_ctx->remote);
608 return ret;
611 if (server_id_equal(&dst, &msg_ctx->id)) {
612 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
613 iov, iovlen, fds, num_fds);
614 return ret;
617 message_hdr_put(hdr, msg_type, src, dst);
618 iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
619 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
621 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
623 if (ret == EACCES) {
624 become_root();
625 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
626 fds, num_fds);
627 unbecome_root();
630 if (ret == ECONNREFUSED) {
632 * Linux returns this when a socket exists in the file
633 * system without a listening process. This is not
634 * documented in susv4 or the linux manpages, but it's
635 * easily testable. For the higher levels this is the
636 * same as "destination does not exist"
638 ret = ENOENT;
641 return ret;
644 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
645 struct server_id dst, uint32_t msg_type,
646 const struct iovec *iov, int iovlen,
647 const int *fds, size_t num_fds)
649 int ret;
651 ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
652 iov, iovlen, fds, num_fds);
653 if (ret != 0) {
654 return map_nt_error_from_unix(ret);
656 return NT_STATUS_OK;
659 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
660 struct messaging_rec *rec)
662 struct messaging_rec *result;
663 size_t fds_size = sizeof(int64_t) * rec->num_fds;
664 size_t payload_len;
666 payload_len = rec->buf.length + fds_size;
667 if (payload_len < rec->buf.length) {
668 /* overflow */
669 return NULL;
672 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
673 payload_len);
674 if (result == NULL) {
675 return NULL;
677 *result = *rec;
679 /* Doesn't fail, see talloc_pooled_object */
681 result->buf.data = talloc_memdup(result, rec->buf.data,
682 rec->buf.length);
684 result->fds = NULL;
685 if (result->num_fds > 0) {
686 result->fds = talloc_memdup(result, rec->fds, fds_size);
689 return result;
692 struct messaging_filtered_read_state {
693 struct tevent_context *ev;
694 struct messaging_context *msg_ctx;
695 struct messaging_dgm_fde *fde;
697 bool (*filter)(struct messaging_rec *rec, void *private_data);
698 void *private_data;
700 struct messaging_rec *rec;
703 static void messaging_filtered_read_cleanup(struct tevent_req *req,
704 enum tevent_req_state req_state);
706 struct tevent_req *messaging_filtered_read_send(
707 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
708 struct messaging_context *msg_ctx,
709 bool (*filter)(struct messaging_rec *rec, void *private_data),
710 void *private_data)
712 struct tevent_req *req;
713 struct messaging_filtered_read_state *state;
714 size_t new_waiters_len;
716 req = tevent_req_create(mem_ctx, &state,
717 struct messaging_filtered_read_state);
718 if (req == NULL) {
719 return NULL;
721 state->ev = ev;
722 state->msg_ctx = msg_ctx;
723 state->filter = filter;
724 state->private_data = private_data;
727 * We have to defer the callback here, as we might be called from
728 * within a different tevent_context than state->ev
730 tevent_req_defer_callback(req, state->ev);
732 state->fde = messaging_dgm_register_tevent_context(state, ev);
733 if (tevent_req_nomem(state->fde, req)) {
734 return tevent_req_post(req, ev);
738 * We add ourselves to the "new_waiters" array, not the "waiters"
739 * array. If we are called from within messaging_read_done,
740 * messaging_dispatch_rec will be in an active for-loop on
741 * "waiters". We must be careful not to mess with this array, because
742 * it could mean that a single event is being delivered twice.
745 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
747 if (new_waiters_len == msg_ctx->num_new_waiters) {
748 struct tevent_req **tmp;
750 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
751 struct tevent_req *, new_waiters_len+1);
752 if (tevent_req_nomem(tmp, req)) {
753 return tevent_req_post(req, ev);
755 msg_ctx->new_waiters = tmp;
758 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
759 msg_ctx->num_new_waiters += 1;
760 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
762 return req;
765 static void messaging_filtered_read_cleanup(struct tevent_req *req,
766 enum tevent_req_state req_state)
768 struct messaging_filtered_read_state *state = tevent_req_data(
769 req, struct messaging_filtered_read_state);
770 struct messaging_context *msg_ctx = state->msg_ctx;
771 unsigned i;
773 tevent_req_set_cleanup_fn(req, NULL);
775 TALLOC_FREE(state->fde);
778 * Just set the [new_]waiters entry to NULL, be careful not to mess
779 * with the other "waiters" array contents. We are often called from
780 * within "messaging_dispatch_rec", which loops over
781 * "waiters". Messing with the "waiters" array will mess up that
782 * for-loop.
785 for (i=0; i<msg_ctx->num_waiters; i++) {
786 if (msg_ctx->waiters[i] == req) {
787 msg_ctx->waiters[i] = NULL;
788 return;
792 for (i=0; i<msg_ctx->num_new_waiters; i++) {
793 if (msg_ctx->new_waiters[i] == req) {
794 msg_ctx->new_waiters[i] = NULL;
795 return;
800 static void messaging_filtered_read_done(struct tevent_req *req,
801 struct messaging_rec *rec)
803 struct messaging_filtered_read_state *state = tevent_req_data(
804 req, struct messaging_filtered_read_state);
806 state->rec = messaging_rec_dup(state, rec);
807 if (tevent_req_nomem(state->rec, req)) {
808 return;
810 tevent_req_done(req);
813 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
814 struct messaging_rec **presult)
816 struct messaging_filtered_read_state *state = tevent_req_data(
817 req, struct messaging_filtered_read_state);
818 int err;
820 if (tevent_req_is_unix_error(req, &err)) {
821 tevent_req_received(req);
822 return err;
824 if (presult != NULL) {
825 *presult = talloc_move(mem_ctx, &state->rec);
827 return 0;
830 struct messaging_read_state {
831 uint32_t msg_type;
832 struct messaging_rec *rec;
835 static bool messaging_read_filter(struct messaging_rec *rec,
836 void *private_data);
837 static void messaging_read_done(struct tevent_req *subreq);
839 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
840 struct tevent_context *ev,
841 struct messaging_context *msg,
842 uint32_t msg_type)
844 struct tevent_req *req, *subreq;
845 struct messaging_read_state *state;
847 req = tevent_req_create(mem_ctx, &state,
848 struct messaging_read_state);
849 if (req == NULL) {
850 return NULL;
852 state->msg_type = msg_type;
854 subreq = messaging_filtered_read_send(state, ev, msg,
855 messaging_read_filter, state);
856 if (tevent_req_nomem(subreq, req)) {
857 return tevent_req_post(req, ev);
859 tevent_req_set_callback(subreq, messaging_read_done, req);
860 return req;
863 static bool messaging_read_filter(struct messaging_rec *rec,
864 void *private_data)
866 struct messaging_read_state *state = talloc_get_type_abort(
867 private_data, struct messaging_read_state);
869 if (rec->num_fds != 0) {
870 return false;
873 return rec->msg_type == state->msg_type;
876 static void messaging_read_done(struct tevent_req *subreq)
878 struct tevent_req *req = tevent_req_callback_data(
879 subreq, struct tevent_req);
880 struct messaging_read_state *state = tevent_req_data(
881 req, struct messaging_read_state);
882 int ret;
884 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
885 TALLOC_FREE(subreq);
886 if (tevent_req_error(req, ret)) {
887 return;
889 tevent_req_done(req);
892 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
893 struct messaging_rec **presult)
895 struct messaging_read_state *state = tevent_req_data(
896 req, struct messaging_read_state);
897 int err;
899 if (tevent_req_is_unix_error(req, &err)) {
900 return err;
902 if (presult != NULL) {
903 *presult = talloc_move(mem_ctx, &state->rec);
905 return 0;
908 struct messaging_handler_state {
909 struct tevent_context *ev;
910 struct messaging_context *msg_ctx;
911 uint32_t msg_type;
912 bool (*handler)(struct messaging_context *msg_ctx,
913 struct messaging_rec **rec, void *private_data);
914 void *private_data;
917 static void messaging_handler_got_msg(struct tevent_req *subreq);
919 struct tevent_req *messaging_handler_send(
920 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
921 struct messaging_context *msg_ctx, uint32_t msg_type,
922 bool (*handler)(struct messaging_context *msg_ctx,
923 struct messaging_rec **rec, void *private_data),
924 void *private_data)
926 struct tevent_req *req, *subreq;
927 struct messaging_handler_state *state;
929 req = tevent_req_create(mem_ctx, &state,
930 struct messaging_handler_state);
931 if (req == NULL) {
932 return NULL;
934 state->ev = ev;
935 state->msg_ctx = msg_ctx;
936 state->msg_type = msg_type;
937 state->handler = handler;
938 state->private_data = private_data;
940 subreq = messaging_read_send(state, state->ev, state->msg_ctx,
941 state->msg_type);
942 if (tevent_req_nomem(subreq, req)) {
943 return tevent_req_post(req, ev);
945 tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
946 return req;
949 static void messaging_handler_got_msg(struct tevent_req *subreq)
951 struct tevent_req *req = tevent_req_callback_data(
952 subreq, struct tevent_req);
953 struct messaging_handler_state *state = tevent_req_data(
954 req, struct messaging_handler_state);
955 struct messaging_rec *rec;
956 int ret;
957 bool ok;
959 ret = messaging_read_recv(subreq, state, &rec);
960 TALLOC_FREE(subreq);
961 if (tevent_req_error(req, ret)) {
962 return;
965 subreq = messaging_read_send(state, state->ev, state->msg_ctx,
966 state->msg_type);
967 if (tevent_req_nomem(subreq, req)) {
968 return;
970 tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
972 ok = state->handler(state->msg_ctx, &rec, state->private_data);
973 TALLOC_FREE(rec);
974 if (ok) {
976 * Next round
978 return;
980 TALLOC_FREE(subreq);
981 tevent_req_done(req);
984 int messaging_handler_recv(struct tevent_req *req)
986 return tevent_req_simple_recv_unix(req);
989 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
991 if (msg_ctx->num_new_waiters == 0) {
992 return true;
995 if (talloc_array_length(msg_ctx->waiters) <
996 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
997 struct tevent_req **tmp;
998 tmp = talloc_realloc(
999 msg_ctx, msg_ctx->waiters, struct tevent_req *,
1000 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1001 if (tmp == NULL) {
1002 DEBUG(1, ("%s: talloc failed\n", __func__));
1003 return false;
1005 msg_ctx->waiters = tmp;
1008 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1009 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1011 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1012 msg_ctx->num_new_waiters = 0;
1014 return true;
1017 static void messaging_dispatch_classic(struct messaging_context *msg_ctx,
1018 struct messaging_rec *rec)
1020 struct messaging_callback *cb, *next;
1022 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1023 size_t j;
1025 next = cb->next;
1026 if (cb->msg_type != rec->msg_type) {
1027 continue;
1031 * the old style callbacks don't support fd passing
1033 for (j=0; j < rec->num_fds; j++) {
1034 int fd = rec->fds[j];
1035 close(fd);
1037 rec->num_fds = 0;
1038 rec->fds = NULL;
1040 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1041 rec->src, &rec->buf);
1044 * we continue looking for matching messages after finding
1045 * one. This matters for subsystems like the internal notify
1046 * code which register more than one handler for the same
1047 * message type
1053 Dispatch one messaging_rec
1055 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1056 struct tevent_context *ev,
1057 struct messaging_rec *rec)
1059 unsigned i;
1060 size_t j;
1062 if (ev == msg_ctx->event_ctx) {
1063 messaging_dispatch_classic(msg_ctx, rec);
1066 if (!messaging_append_new_waiters(msg_ctx)) {
1067 for (j=0; j < rec->num_fds; j++) {
1068 int fd = rec->fds[j];
1069 close(fd);
1071 rec->num_fds = 0;
1072 rec->fds = NULL;
1073 return;
1076 i = 0;
1077 while (i < msg_ctx->num_waiters) {
1078 struct tevent_req *req;
1079 struct messaging_filtered_read_state *state;
1081 req = msg_ctx->waiters[i];
1082 if (req == NULL) {
1084 * This got cleaned up. In the meantime,
1085 * move everything down one. We need
1086 * to keep the order of waiters, as
1087 * other code may depend on this.
1089 if (i < msg_ctx->num_waiters - 1) {
1090 memmove(&msg_ctx->waiters[i],
1091 &msg_ctx->waiters[i+1],
1092 sizeof(struct tevent_req *) *
1093 (msg_ctx->num_waiters - i - 1));
1095 msg_ctx->num_waiters -= 1;
1096 continue;
1099 state = tevent_req_data(
1100 req, struct messaging_filtered_read_state);
1101 if ((ev == state->ev) &&
1102 state->filter(rec, state->private_data)) {
1103 messaging_filtered_read_done(req, rec);
1106 * Only the first one gets the fd-array
1108 rec->num_fds = 0;
1109 rec->fds = NULL;
1112 i += 1;
1115 if (ev != msg_ctx->event_ctx) {
1116 struct iovec iov;
1117 int fds[rec->num_fds];
1118 int ret;
1121 * We've been listening on a nested event
1122 * context. Messages need to be handled in the main
1123 * event context, so post to ourselves
1126 iov.iov_base = rec->buf.data;
1127 iov.iov_len = rec->buf.length;
1129 for (i=0; i<rec->num_fds; i++) {
1130 fds[i] = rec->fds[i];
1133 ret = messaging_post_self(
1134 msg_ctx, rec->src, rec->dest, rec->msg_type,
1135 &iov, 1, fds, rec->num_fds);
1136 if (ret == 0) {
1137 return;
1142 * If the fd-array isn't used, just close it.
1144 for (j=0; j < rec->num_fds; j++) {
1145 int fd = rec->fds[j];
1146 close(fd);
1148 rec->num_fds = 0;
1149 rec->fds = NULL;
1152 static int mess_parent_dgm_cleanup(void *private_data);
1153 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1155 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1157 struct tevent_req *req;
1159 req = background_job_send(
1160 msg, msg->event_ctx, msg, NULL, 0,
1161 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1162 60*15),
1163 mess_parent_dgm_cleanup, msg);
1164 if (req == NULL) {
1165 return false;
1167 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1168 return true;
1171 static int mess_parent_dgm_cleanup(void *private_data)
1173 int ret;
1175 ret = messaging_dgm_wipe();
1176 DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1177 ret ? strerror(ret) : "ok"));
1178 return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1179 60*15);
1182 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1184 struct messaging_context *msg = tevent_req_callback_data(
1185 req, struct messaging_context);
1186 NTSTATUS status;
1188 status = background_job_recv(req);
1189 TALLOC_FREE(req);
1190 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1191 nt_errstr(status)));
1193 req = background_job_send(
1194 msg, msg->event_ctx, msg, NULL, 0,
1195 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1196 60*15),
1197 mess_parent_dgm_cleanup, msg);
1198 if (req == NULL) {
1199 DEBUG(1, ("background_job_send failed\n"));
1200 return;
1202 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1205 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1207 int ret;
1209 if (pid == 0) {
1210 ret = messaging_dgm_wipe();
1211 } else {
1212 ret = messaging_dgm_cleanup(pid);
1215 return ret;
1218 struct tevent_context *messaging_tevent_context(
1219 struct messaging_context *msg_ctx)
1221 return msg_ctx->event_ctx;
1224 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1226 return msg_ctx->names_db;
1229 /** @} **/