smbXcli: Add "force_channel_sequence"
[Samba.git] / source3 / lib / messages.c
blobb94a6965eb888ea4df0bc35e755f63b71a04f2d2
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messages_dgm.h"
56 #include "lib/messages_ctdbd.h"
57 #include "lib/util/iov_buf.h"
58 #include "lib/util/server_id_db.h"
59 #include "lib/messages_dgm_ref.h"
60 #include "lib/messages_util.h"
62 struct messaging_callback {
63 struct messaging_callback *prev, *next;
64 uint32_t msg_type;
65 void (*fn)(struct messaging_context *msg, void *private_data,
66 uint32_t msg_type,
67 struct server_id server_id, DATA_BLOB *data);
68 void *private_data;
71 struct messaging_context {
72 struct server_id id;
73 struct tevent_context *event_ctx;
74 struct messaging_callback *callbacks;
76 struct tevent_req **new_waiters;
77 size_t num_new_waiters;
79 struct tevent_req **waiters;
80 size_t num_waiters;
82 void *msg_dgm_ref;
83 struct messaging_backend *remote;
85 struct server_id_db *names_db;
88 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
89 struct messaging_rec *rec);
90 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
91 struct tevent_context *ev,
92 struct messaging_rec *rec);
94 /****************************************************************************
95 A useful function for testing the message system.
96 ****************************************************************************/
98 static void ping_message(struct messaging_context *msg_ctx,
99 void *private_data,
100 uint32_t msg_type,
101 struct server_id src,
102 DATA_BLOB *data)
104 struct server_id_buf idbuf;
106 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
107 server_id_str_buf(src, &idbuf), (int)data->length,
108 data->data ? (char *)data->data : ""));
110 messaging_send(msg_ctx, src, MSG_PONG, data);
113 struct messaging_rec *messaging_rec_create(
114 TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
115 uint32_t msg_type, const struct iovec *iov, int iovlen,
116 const int *fds, size_t num_fds)
118 ssize_t buflen;
119 uint8_t *buf;
120 struct messaging_rec *result;
122 if (num_fds > INT8_MAX) {
123 return NULL;
126 buflen = iov_buflen(iov, iovlen);
127 if (buflen == -1) {
128 return NULL;
130 buf = talloc_array(mem_ctx, uint8_t, buflen);
131 if (buf == NULL) {
132 return NULL;
134 iov_buf(iov, iovlen, buf, buflen);
137 struct messaging_rec rec;
138 int64_t fds64[num_fds];
139 size_t i;
141 for (i=0; i<num_fds; i++) {
142 fds64[i] = fds[i];
145 rec = (struct messaging_rec) {
146 .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
147 .src = src, .dest = dst,
148 .buf.data = buf, .buf.length = buflen,
149 .num_fds = num_fds, .fds = fds64,
152 result = messaging_rec_dup(mem_ctx, &rec);
155 TALLOC_FREE(buf);
157 return result;
160 static void messaging_recv_cb(struct tevent_context *ev,
161 const uint8_t *msg, size_t msg_len,
162 int *fds, size_t num_fds,
163 void *private_data)
165 struct messaging_context *msg_ctx = talloc_get_type_abort(
166 private_data, struct messaging_context);
167 struct server_id_buf idbuf;
168 struct messaging_rec rec;
169 int64_t fds64[MIN(num_fds, INT8_MAX)];
170 size_t i;
172 if (msg_len < MESSAGE_HDR_LENGTH) {
173 DBG_WARNING("message too short: %zu\n", msg_len);
174 goto close_fail;
177 if (num_fds > INT8_MAX) {
178 DBG_WARNING("too many fds: %zu\n", num_fds);
179 goto close_fail;
183 * "consume" the fds by copying them and setting
184 * the original variable to -1
186 for (i=0; i < num_fds; i++) {
187 fds64[i] = fds[i];
188 fds[i] = -1;
191 rec = (struct messaging_rec) {
192 .msg_version = MESSAGE_VERSION,
193 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
194 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
195 .num_fds = num_fds,
196 .fds = fds64,
199 message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
201 DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
202 (unsigned)rec.msg_type, rec.buf.length, num_fds,
203 server_id_str_buf(rec.src, &idbuf));
205 messaging_dispatch_rec(msg_ctx, ev, &rec);
206 return;
208 close_fail:
209 for (i=0; i < num_fds; i++) {
210 close(fds[i]);
214 static int messaging_context_destructor(struct messaging_context *ctx)
216 size_t i;
218 for (i=0; i<ctx->num_new_waiters; i++) {
219 if (ctx->new_waiters[i] != NULL) {
220 tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
221 ctx->new_waiters[i] = NULL;
224 for (i=0; i<ctx->num_waiters; i++) {
225 if (ctx->waiters[i] != NULL) {
226 tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
227 ctx->waiters[i] = NULL;
231 return 0;
234 static const char *private_path(const char *name)
236 return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
239 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
240 struct tevent_context *ev,
241 struct messaging_context **pmsg_ctx)
243 TALLOC_CTX *frame;
244 struct messaging_context *ctx;
245 NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
246 int ret;
247 const char *lck_path;
248 const char *priv_path;
249 bool ok;
251 lck_path = lock_path("msg.lock");
252 if (lck_path == NULL) {
253 return NT_STATUS_NO_MEMORY;
256 ok = directory_create_or_exist_strict(lck_path,
257 sec_initial_uid(),
258 0755);
259 if (!ok) {
260 DBG_DEBUG("Could not create lock directory: %s\n",
261 strerror(errno));
262 return NT_STATUS_ACCESS_DENIED;
265 priv_path = private_path("msg.sock");
266 if (priv_path == NULL) {
267 return NT_STATUS_NO_MEMORY;
270 ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
271 0700);
272 if (!ok) {
273 DBG_DEBUG("Could not create msg directory: %s\n",
274 strerror(errno));
275 return NT_STATUS_ACCESS_DENIED;
278 frame = talloc_stackframe();
279 if (frame == NULL) {
280 return NT_STATUS_NO_MEMORY;
283 ctx = talloc_zero(frame, struct messaging_context);
284 if (ctx == NULL) {
285 status = NT_STATUS_NO_MEMORY;
286 goto done;
289 ctx->id = (struct server_id) {
290 .pid = getpid(), .vnn = NONCLUSTER_VNN
293 ctx->event_ctx = ev;
295 sec_init();
297 ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
298 ctx->event_ctx,
299 &ctx->id.unique_id,
300 priv_path,
301 lck_path,
302 messaging_recv_cb,
303 ctx,
304 &ret);
305 if (ctx->msg_dgm_ref == NULL) {
306 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
307 status = map_nt_error_from_unix(ret);
308 goto done;
310 talloc_set_destructor(ctx, messaging_context_destructor);
312 if (lp_clustering()) {
313 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
315 if (ret != 0) {
316 DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
317 strerror(ret)));
318 status = map_nt_error_from_unix(ret);
319 goto done;
322 ctx->id.vnn = get_my_vnn();
324 ctx->names_db = server_id_db_init(ctx,
325 ctx->id,
326 lp_lock_directory(),
328 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
329 if (ctx->names_db == NULL) {
330 DBG_DEBUG("server_id_db_init failed\n");
331 status = NT_STATUS_NO_MEMORY;
332 goto done;
335 messaging_register(ctx, NULL, MSG_PING, ping_message);
337 /* Register some debugging related messages */
339 register_msg_pool_usage(ctx);
340 register_dmalloc_msgs(ctx);
341 debug_register_msgs(ctx);
344 struct server_id_buf tmp;
345 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
348 *pmsg_ctx = talloc_steal(mem_ctx, ctx);
350 status = NT_STATUS_OK;
351 done:
352 TALLOC_FREE(frame);
354 return status;
357 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
358 struct tevent_context *ev)
360 struct messaging_context *ctx = NULL;
361 NTSTATUS status;
363 status = messaging_init_internal(mem_ctx,
365 &ctx);
366 if (!NT_STATUS_IS_OK(status)) {
367 return NULL;
370 return ctx;
373 NTSTATUS messaging_init_client(TALLOC_CTX *mem_ctx,
374 struct tevent_context *ev,
375 struct messaging_context **pmsg_ctx)
377 return messaging_init_internal(mem_ctx,
379 pmsg_ctx);
382 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
384 return msg_ctx->id;
388 * re-init after a fork
390 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
392 int ret;
393 char *lck_path;
395 TALLOC_FREE(msg_ctx->msg_dgm_ref);
397 msg_ctx->id = (struct server_id) {
398 .pid = getpid(), .vnn = msg_ctx->id.vnn
401 lck_path = lock_path("msg.lock");
402 if (lck_path == NULL) {
403 return NT_STATUS_NO_MEMORY;
406 msg_ctx->msg_dgm_ref = messaging_dgm_ref(
407 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
408 private_path("msg.sock"), lck_path,
409 messaging_recv_cb, msg_ctx, &ret);
411 if (msg_ctx->msg_dgm_ref == NULL) {
412 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
413 return map_nt_error_from_unix(ret);
416 if (lp_clustering()) {
417 ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
418 msg_ctx->remote);
420 if (ret != 0) {
421 DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
422 strerror(ret)));
423 return map_nt_error_from_unix(ret);
427 server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
429 return NT_STATUS_OK;
434 * Register a dispatch function for a particular message type. Allow multiple
435 * registrants
437 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
438 void *private_data,
439 uint32_t msg_type,
440 void (*fn)(struct messaging_context *msg,
441 void *private_data,
442 uint32_t msg_type,
443 struct server_id server_id,
444 DATA_BLOB *data))
446 struct messaging_callback *cb;
448 DEBUG(5, ("Registering messaging pointer for type %u - "
449 "private_data=%p\n",
450 (unsigned)msg_type, private_data));
453 * Only one callback per type
456 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
457 /* we allow a second registration of the same message
458 type if it has a different private pointer. This is
459 needed in, for example, the internal notify code,
460 which creates a new notify context for each tree
461 connect, and expects to receive messages to each of
462 them. */
463 if (cb->msg_type == msg_type && private_data == cb->private_data) {
464 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
465 (unsigned)msg_type, private_data));
466 cb->fn = fn;
467 cb->private_data = private_data;
468 return NT_STATUS_OK;
472 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
473 return NT_STATUS_NO_MEMORY;
476 cb->msg_type = msg_type;
477 cb->fn = fn;
478 cb->private_data = private_data;
480 DLIST_ADD(msg_ctx->callbacks, cb);
481 return NT_STATUS_OK;
485 De-register the function for a particular message type.
487 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
488 void *private_data)
490 struct messaging_callback *cb, *next;
492 for (cb = ctx->callbacks; cb; cb = next) {
493 next = cb->next;
494 if ((cb->msg_type == msg_type)
495 && (cb->private_data == private_data)) {
496 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
497 (unsigned)msg_type, private_data));
498 DLIST_REMOVE(ctx->callbacks, cb);
499 TALLOC_FREE(cb);
505 Send a message to a particular server
507 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
508 struct server_id server, uint32_t msg_type,
509 const DATA_BLOB *data)
511 struct iovec iov = {0};
513 if (data != NULL) {
514 iov.iov_base = data->data;
515 iov.iov_len = data->length;
518 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
521 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
522 struct server_id server, uint32_t msg_type,
523 const uint8_t *buf, size_t len)
525 DATA_BLOB blob = data_blob_const(buf, len);
526 return messaging_send(msg_ctx, server, msg_type, &blob);
529 struct messaging_post_state {
530 struct messaging_context *msg_ctx;
531 struct messaging_rec *rec;
534 static void messaging_post_handler(struct tevent_context *ev,
535 struct tevent_immediate *ti,
536 void *private_data);
538 static int messaging_post_self(struct messaging_context *msg_ctx,
539 struct server_id src, struct server_id dst,
540 uint32_t msg_type,
541 const struct iovec *iov, int iovlen,
542 const int *fds, size_t num_fds)
544 struct tevent_immediate *ti;
545 struct messaging_post_state *state;
547 state = talloc(msg_ctx, struct messaging_post_state);
548 if (state == NULL) {
549 return ENOMEM;
551 state->msg_ctx = msg_ctx;
553 ti = tevent_create_immediate(state);
554 if (ti == NULL) {
555 goto fail;
557 state->rec = messaging_rec_create(
558 state, src, dst, msg_type, iov, iovlen, fds, num_fds);
559 if (state->rec == NULL) {
560 goto fail;
563 tevent_schedule_immediate(ti, msg_ctx->event_ctx,
564 messaging_post_handler, state);
565 return 0;
567 fail:
568 TALLOC_FREE(state);
569 return ENOMEM;
572 static void messaging_post_handler(struct tevent_context *ev,
573 struct tevent_immediate *ti,
574 void *private_data)
576 struct messaging_post_state *state = talloc_get_type_abort(
577 private_data, struct messaging_post_state);
578 messaging_dispatch_rec(state->msg_ctx, ev, state->rec);
579 TALLOC_FREE(state);
582 int messaging_send_iov_from(struct messaging_context *msg_ctx,
583 struct server_id src, struct server_id dst,
584 uint32_t msg_type,
585 const struct iovec *iov, int iovlen,
586 const int *fds, size_t num_fds)
588 int ret;
589 uint8_t hdr[MESSAGE_HDR_LENGTH];
590 struct iovec iov2[iovlen+1];
592 if (server_id_is_disconnected(&dst)) {
593 return EINVAL;
596 if (num_fds > INT8_MAX) {
597 return EINVAL;
600 if (dst.vnn != msg_ctx->id.vnn) {
601 if (num_fds > 0) {
602 return ENOSYS;
605 ret = msg_ctx->remote->send_fn(src, dst,
606 msg_type, iov, iovlen,
607 NULL, 0,
608 msg_ctx->remote);
609 return ret;
612 if (server_id_equal(&dst, &msg_ctx->id)) {
613 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
614 iov, iovlen, fds, num_fds);
615 return ret;
618 message_hdr_put(hdr, msg_type, src, dst);
619 iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
620 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
622 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
624 if (ret == EACCES) {
625 become_root();
626 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
627 fds, num_fds);
628 unbecome_root();
631 if (ret == ECONNREFUSED) {
633 * Linux returns this when a socket exists in the file
634 * system without a listening process. This is not
635 * documented in susv4 or the linux manpages, but it's
636 * easily testable. For the higher levels this is the
637 * same as "destination does not exist"
639 ret = ENOENT;
642 return ret;
645 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
646 struct server_id dst, uint32_t msg_type,
647 const struct iovec *iov, int iovlen,
648 const int *fds, size_t num_fds)
650 int ret;
652 ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
653 iov, iovlen, fds, num_fds);
654 if (ret != 0) {
655 return map_nt_error_from_unix(ret);
657 return NT_STATUS_OK;
660 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
661 struct messaging_rec *rec)
663 struct messaging_rec *result;
664 size_t fds_size = sizeof(int64_t) * rec->num_fds;
665 size_t payload_len;
667 payload_len = rec->buf.length + fds_size;
668 if (payload_len < rec->buf.length) {
669 /* overflow */
670 return NULL;
673 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
674 payload_len);
675 if (result == NULL) {
676 return NULL;
678 *result = *rec;
680 /* Doesn't fail, see talloc_pooled_object */
682 result->buf.data = talloc_memdup(result, rec->buf.data,
683 rec->buf.length);
685 result->fds = NULL;
686 if (result->num_fds > 0) {
687 result->fds = talloc_memdup(result, rec->fds, fds_size);
690 return result;
693 struct messaging_filtered_read_state {
694 struct tevent_context *ev;
695 struct messaging_context *msg_ctx;
696 struct messaging_dgm_fde *fde;
698 bool (*filter)(struct messaging_rec *rec, void *private_data);
699 void *private_data;
701 struct messaging_rec *rec;
704 static void messaging_filtered_read_cleanup(struct tevent_req *req,
705 enum tevent_req_state req_state);
707 struct tevent_req *messaging_filtered_read_send(
708 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
709 struct messaging_context *msg_ctx,
710 bool (*filter)(struct messaging_rec *rec, void *private_data),
711 void *private_data)
713 struct tevent_req *req;
714 struct messaging_filtered_read_state *state;
715 size_t new_waiters_len;
717 req = tevent_req_create(mem_ctx, &state,
718 struct messaging_filtered_read_state);
719 if (req == NULL) {
720 return NULL;
722 state->ev = ev;
723 state->msg_ctx = msg_ctx;
724 state->filter = filter;
725 state->private_data = private_data;
728 * We have to defer the callback here, as we might be called from
729 * within a different tevent_context than state->ev
731 tevent_req_defer_callback(req, state->ev);
733 state->fde = messaging_dgm_register_tevent_context(state, ev);
734 if (tevent_req_nomem(state->fde, req)) {
735 return tevent_req_post(req, ev);
739 * We add ourselves to the "new_waiters" array, not the "waiters"
740 * array. If we are called from within messaging_read_done,
741 * messaging_dispatch_rec will be in an active for-loop on
742 * "waiters". We must be careful not to mess with this array, because
743 * it could mean that a single event is being delivered twice.
746 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
748 if (new_waiters_len == msg_ctx->num_new_waiters) {
749 struct tevent_req **tmp;
751 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
752 struct tevent_req *, new_waiters_len+1);
753 if (tevent_req_nomem(tmp, req)) {
754 return tevent_req_post(req, ev);
756 msg_ctx->new_waiters = tmp;
759 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
760 msg_ctx->num_new_waiters += 1;
761 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
763 return req;
766 static void messaging_filtered_read_cleanup(struct tevent_req *req,
767 enum tevent_req_state req_state)
769 struct messaging_filtered_read_state *state = tevent_req_data(
770 req, struct messaging_filtered_read_state);
771 struct messaging_context *msg_ctx = state->msg_ctx;
772 size_t i;
774 tevent_req_set_cleanup_fn(req, NULL);
776 TALLOC_FREE(state->fde);
779 * Just set the [new_]waiters entry to NULL, be careful not to mess
780 * with the other "waiters" array contents. We are often called from
781 * within "messaging_dispatch_rec", which loops over
782 * "waiters". Messing with the "waiters" array will mess up that
783 * for-loop.
786 for (i=0; i<msg_ctx->num_waiters; i++) {
787 if (msg_ctx->waiters[i] == req) {
788 msg_ctx->waiters[i] = NULL;
789 return;
793 for (i=0; i<msg_ctx->num_new_waiters; i++) {
794 if (msg_ctx->new_waiters[i] == req) {
795 msg_ctx->new_waiters[i] = NULL;
796 return;
801 static void messaging_filtered_read_done(struct tevent_req *req,
802 struct messaging_rec *rec)
804 struct messaging_filtered_read_state *state = tevent_req_data(
805 req, struct messaging_filtered_read_state);
807 state->rec = messaging_rec_dup(state, rec);
808 if (tevent_req_nomem(state->rec, req)) {
809 return;
811 tevent_req_done(req);
814 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
815 struct messaging_rec **presult)
817 struct messaging_filtered_read_state *state = tevent_req_data(
818 req, struct messaging_filtered_read_state);
819 int err;
821 if (tevent_req_is_unix_error(req, &err)) {
822 tevent_req_received(req);
823 return err;
825 if (presult != NULL) {
826 *presult = talloc_move(mem_ctx, &state->rec);
828 return 0;
831 struct messaging_read_state {
832 uint32_t msg_type;
833 struct messaging_rec *rec;
836 static bool messaging_read_filter(struct messaging_rec *rec,
837 void *private_data);
838 static void messaging_read_done(struct tevent_req *subreq);
840 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
841 struct tevent_context *ev,
842 struct messaging_context *msg,
843 uint32_t msg_type)
845 struct tevent_req *req, *subreq;
846 struct messaging_read_state *state;
848 req = tevent_req_create(mem_ctx, &state,
849 struct messaging_read_state);
850 if (req == NULL) {
851 return NULL;
853 state->msg_type = msg_type;
855 subreq = messaging_filtered_read_send(state, ev, msg,
856 messaging_read_filter, state);
857 if (tevent_req_nomem(subreq, req)) {
858 return tevent_req_post(req, ev);
860 tevent_req_set_callback(subreq, messaging_read_done, req);
861 return req;
864 static bool messaging_read_filter(struct messaging_rec *rec,
865 void *private_data)
867 struct messaging_read_state *state = talloc_get_type_abort(
868 private_data, struct messaging_read_state);
870 if (rec->num_fds != 0) {
871 return false;
874 return rec->msg_type == state->msg_type;
877 static void messaging_read_done(struct tevent_req *subreq)
879 struct tevent_req *req = tevent_req_callback_data(
880 subreq, struct tevent_req);
881 struct messaging_read_state *state = tevent_req_data(
882 req, struct messaging_read_state);
883 int ret;
885 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
886 TALLOC_FREE(subreq);
887 if (tevent_req_error(req, ret)) {
888 return;
890 tevent_req_done(req);
893 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
894 struct messaging_rec **presult)
896 struct messaging_read_state *state = tevent_req_data(
897 req, struct messaging_read_state);
898 int err;
900 if (tevent_req_is_unix_error(req, &err)) {
901 return err;
903 if (presult != NULL) {
904 *presult = talloc_move(mem_ctx, &state->rec);
906 return 0;
909 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
911 if (msg_ctx->num_new_waiters == 0) {
912 return true;
915 if (talloc_array_length(msg_ctx->waiters) <
916 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
917 struct tevent_req **tmp;
918 tmp = talloc_realloc(
919 msg_ctx, msg_ctx->waiters, struct tevent_req *,
920 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
921 if (tmp == NULL) {
922 DEBUG(1, ("%s: talloc failed\n", __func__));
923 return false;
925 msg_ctx->waiters = tmp;
928 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
929 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
931 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
932 msg_ctx->num_new_waiters = 0;
934 return true;
937 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
938 struct messaging_rec *rec)
940 struct messaging_callback *cb, *next;
942 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
943 size_t j;
945 next = cb->next;
946 if (cb->msg_type != rec->msg_type) {
947 continue;
951 * the old style callbacks don't support fd passing
953 for (j=0; j < rec->num_fds; j++) {
954 int fd = rec->fds[j];
955 close(fd);
957 rec->num_fds = 0;
958 rec->fds = NULL;
960 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
961 rec->src, &rec->buf);
963 return true;
966 return false;
970 Dispatch one messaging_rec
972 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
973 struct tevent_context *ev,
974 struct messaging_rec *rec)
976 size_t i;
977 bool consumed;
979 if (ev == msg_ctx->event_ctx) {
980 consumed = messaging_dispatch_classic(msg_ctx, rec);
981 if (consumed) {
982 return;
986 if (!messaging_append_new_waiters(msg_ctx)) {
987 size_t j;
988 for (j=0; j < rec->num_fds; j++) {
989 int fd = rec->fds[j];
990 close(fd);
992 rec->num_fds = 0;
993 rec->fds = NULL;
994 return;
997 i = 0;
998 while (i < msg_ctx->num_waiters) {
999 struct tevent_req *req;
1000 struct messaging_filtered_read_state *state;
1002 req = msg_ctx->waiters[i];
1003 if (req == NULL) {
1005 * This got cleaned up. In the meantime,
1006 * move everything down one. We need
1007 * to keep the order of waiters, as
1008 * other code may depend on this.
1010 if (i < msg_ctx->num_waiters - 1) {
1011 memmove(&msg_ctx->waiters[i],
1012 &msg_ctx->waiters[i+1],
1013 sizeof(struct tevent_req *) *
1014 (msg_ctx->num_waiters - i - 1));
1016 msg_ctx->num_waiters -= 1;
1017 continue;
1020 state = tevent_req_data(
1021 req, struct messaging_filtered_read_state);
1022 if ((ev == state->ev) &&
1023 state->filter(rec, state->private_data)) {
1024 messaging_filtered_read_done(req, rec);
1025 return;
1028 i += 1;
1031 if (ev != msg_ctx->event_ctx) {
1032 struct iovec iov;
1033 int fds[rec->num_fds];
1034 int ret;
1037 * We've been listening on a nested event
1038 * context. Messages need to be handled in the main
1039 * event context, so post to ourselves
1042 iov.iov_base = rec->buf.data;
1043 iov.iov_len = rec->buf.length;
1045 for (i=0; i<rec->num_fds; i++) {
1046 fds[i] = rec->fds[i];
1049 ret = messaging_post_self(
1050 msg_ctx, rec->src, rec->dest, rec->msg_type,
1051 &iov, 1, fds, rec->num_fds);
1052 if (ret == 0) {
1053 return;
1058 * If the fd-array isn't used, just close it.
1060 for (i=0; i < rec->num_fds; i++) {
1061 int fd = rec->fds[i];
1062 close(fd);
1064 rec->num_fds = 0;
1065 rec->fds = NULL;
1068 static int mess_parent_dgm_cleanup(void *private_data);
1069 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1071 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1073 struct tevent_req *req;
1075 req = background_job_send(
1076 msg, msg->event_ctx, msg, NULL, 0,
1077 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1078 60*15),
1079 mess_parent_dgm_cleanup, msg);
1080 if (req == NULL) {
1081 return false;
1083 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1084 return true;
1087 static int mess_parent_dgm_cleanup(void *private_data)
1089 int ret;
1091 ret = messaging_dgm_wipe();
1092 DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1093 ret ? strerror(ret) : "ok"));
1094 return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1095 60*15);
1098 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1100 struct messaging_context *msg = tevent_req_callback_data(
1101 req, struct messaging_context);
1102 NTSTATUS status;
1104 status = background_job_recv(req);
1105 TALLOC_FREE(req);
1106 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1107 nt_errstr(status)));
1109 req = background_job_send(
1110 msg, msg->event_ctx, msg, NULL, 0,
1111 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1112 60*15),
1113 mess_parent_dgm_cleanup, msg);
1114 if (req == NULL) {
1115 DEBUG(1, ("background_job_send failed\n"));
1116 return;
1118 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1121 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1123 int ret;
1125 if (pid == 0) {
1126 ret = messaging_dgm_wipe();
1127 } else {
1128 ret = messaging_dgm_cleanup(pid);
1131 return ret;
1134 struct tevent_context *messaging_tevent_context(
1135 struct messaging_context *msg_ctx)
1137 return msg_ctx->event_ctx;
1140 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1142 return msg_ctx->names_db;
1145 /** @} **/