messaging_dgm: Don't expose the messaging_dgm_context
[Samba.git] / source3 / lib / messages.c
blob52d6538542fb165b0a17aeee34999846ba4f6276
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
56 struct messaging_callback {
57 struct messaging_callback *prev, *next;
58 uint32 msg_type;
59 void (*fn)(struct messaging_context *msg, void *private_data,
60 uint32_t msg_type,
61 struct server_id server_id, DATA_BLOB *data);
62 void *private_data;
65 struct messaging_context {
66 struct server_id id;
67 struct tevent_context *event_ctx;
68 struct messaging_callback *callbacks;
70 struct tevent_req **new_waiters;
71 unsigned num_new_waiters;
73 struct tevent_req **waiters;
74 unsigned num_waiters;
76 struct messaging_backend *remote;
79 struct messaging_hdr {
80 int msg_type;
81 struct server_id dst;
82 struct server_id src;
85 /****************************************************************************
86 A useful function for testing the message system.
87 ****************************************************************************/
89 static void ping_message(struct messaging_context *msg_ctx,
90 void *private_data,
91 uint32_t msg_type,
92 struct server_id src,
93 DATA_BLOB *data)
95 struct server_id_buf idbuf;
97 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
98 server_id_str_buf(src, &idbuf), (int)data->length,
99 data->data ? (char *)data->data : ""));
101 messaging_send(msg_ctx, src, MSG_PONG, data);
104 /****************************************************************************
105 Register/replace a dispatch function for a particular message type.
106 JRA changed Dec 13 2006. Only one message handler now permitted per type.
107 *NOTE*: Dispatch functions must be able to cope with incoming
108 messages on an *odd* byte boundary.
109 ****************************************************************************/
111 struct msg_all {
112 struct messaging_context *msg_ctx;
113 int msg_type;
114 uint32 msg_flag;
115 const void *buf;
116 size_t len;
117 int n_sent;
120 /****************************************************************************
121 Send one of the messages for the broadcast.
122 ****************************************************************************/
124 static int traverse_fn(struct db_record *rec, const struct server_id *id,
125 uint32_t msg_flags, void *state)
127 struct msg_all *msg_all = (struct msg_all *)state;
128 NTSTATUS status;
130 /* Don't send if the receiver hasn't registered an interest. */
132 if((msg_flags & msg_all->msg_flag) == 0) {
133 return 0;
136 /* If the msg send fails because the pid was not found (i.e. smbd died),
137 * the msg has already been deleted from the messages.tdb.*/
139 status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
140 (const uint8_t *)msg_all->buf, msg_all->len);
142 if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
143 struct server_id_buf idbuf;
146 * If the pid was not found delete the entry from
147 * serverid.tdb
150 DEBUG(2, ("pid %s doesn't exist\n",
151 server_id_str_buf(*id, &idbuf)));
153 dbwrap_record_delete(rec);
155 msg_all->n_sent++;
156 return 0;
160 * Send a message to all smbd processes.
162 * It isn't very efficient, but should be OK for the sorts of
163 * applications that use it. When we need efficient broadcast we can add
164 * it.
166 * @param n_sent Set to the number of messages sent. This should be
167 * equal to the number of processes, but be careful for races.
169 * @retval True for success.
171 bool message_send_all(struct messaging_context *msg_ctx,
172 int msg_type,
173 const void *buf, size_t len,
174 int *n_sent)
176 struct msg_all msg_all;
178 msg_all.msg_type = msg_type;
179 if (msg_type < 0x100) {
180 msg_all.msg_flag = FLAG_MSG_GENERAL;
181 } else if (msg_type > 0x100 && msg_type < 0x200) {
182 msg_all.msg_flag = FLAG_MSG_NMBD;
183 } else if (msg_type > 0x200 && msg_type < 0x300) {
184 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
185 } else if (msg_type > 0x300 && msg_type < 0x400) {
186 msg_all.msg_flag = FLAG_MSG_SMBD;
187 } else if (msg_type > 0x400 && msg_type < 0x600) {
188 msg_all.msg_flag = FLAG_MSG_WINBIND;
189 } else if (msg_type > 4000 && msg_type < 5000) {
190 msg_all.msg_flag = FLAG_MSG_DBWRAP;
191 } else {
192 return false;
195 msg_all.buf = buf;
196 msg_all.len = len;
197 msg_all.n_sent = 0;
198 msg_all.msg_ctx = msg_ctx;
200 serverid_traverse(traverse_fn, &msg_all);
201 if (n_sent)
202 *n_sent = msg_all.n_sent;
203 return true;
206 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
207 void *private_data)
209 struct messaging_context *msg_ctx = talloc_get_type_abort(
210 private_data, struct messaging_context);
211 const struct messaging_hdr *hdr;
212 struct server_id_buf idbuf;
213 struct messaging_rec rec;
215 if (msg_len < sizeof(*hdr)) {
216 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
217 return;
221 * messages_dgm guarantees alignment, so we can cast here
223 hdr = (const struct messaging_hdr *)msg;
225 DEBUG(10, ("%s: Received message 0x%x len %u from %s\n", __func__,
226 (unsigned)hdr->msg_type, (unsigned)(msg_len - sizeof(*hdr)),
227 server_id_str_buf(hdr->src, &idbuf)));
229 rec = (struct messaging_rec) {
230 .msg_version = MESSAGE_VERSION,
231 .msg_type = hdr->msg_type,
232 .src = hdr->src,
233 .dest = hdr->dst,
234 .buf.data = discard_const_p(uint8, msg) + sizeof(*hdr),
235 .buf.length = msg_len - sizeof(*hdr)
238 messaging_dispatch_rec(msg_ctx, &rec);
241 static int messaging_context_destructor(struct messaging_context *ctx)
243 messaging_dgm_destroy();
244 return 0;
247 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
248 struct tevent_context *ev)
250 struct messaging_context *ctx;
251 NTSTATUS status;
252 int ret;
254 if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
255 return NULL;
258 ctx->id = procid_self();
259 ctx->event_ctx = ev;
261 sec_init();
263 ret = messaging_dgm_init(ctx->event_ctx, ctx->id,
264 lp_cache_directory(), sec_initial_uid(),
265 messaging_recv_cb, ctx);
267 if (ret != 0) {
268 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
269 TALLOC_FREE(ctx);
270 return NULL;
273 talloc_set_destructor(ctx, messaging_context_destructor);
275 if (lp_clustering()) {
276 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
278 if (!NT_STATUS_IS_OK(status)) {
279 DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
280 nt_errstr(status)));
281 TALLOC_FREE(ctx);
282 return NULL;
285 ctx->id.vnn = get_my_vnn();
287 messaging_register(ctx, NULL, MSG_PING, ping_message);
289 /* Register some debugging related messages */
291 register_msg_pool_usage(ctx);
292 register_dmalloc_msgs(ctx);
293 debug_register_msgs(ctx);
295 return ctx;
298 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
300 return msg_ctx->id;
304 * re-init after a fork
306 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
308 NTSTATUS status;
309 int ret;
311 messaging_dgm_destroy();
313 msg_ctx->id = procid_self();
315 ret = messaging_dgm_init(msg_ctx->event_ctx, msg_ctx->id,
316 lp_cache_directory(), sec_initial_uid(),
317 messaging_recv_cb, msg_ctx);
318 if (ret != 0) {
319 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
320 return map_nt_error_from_unix(ret);
323 TALLOC_FREE(msg_ctx->remote);
325 if (lp_clustering()) {
326 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
327 &msg_ctx->remote);
329 if (!NT_STATUS_IS_OK(status)) {
330 DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
331 nt_errstr(status)));
332 return status;
336 return NT_STATUS_OK;
341 * Register a dispatch function for a particular message type. Allow multiple
342 * registrants
344 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
345 void *private_data,
346 uint32_t msg_type,
347 void (*fn)(struct messaging_context *msg,
348 void *private_data,
349 uint32_t msg_type,
350 struct server_id server_id,
351 DATA_BLOB *data))
353 struct messaging_callback *cb;
355 DEBUG(5, ("Registering messaging pointer for type %u - "
356 "private_data=%p\n",
357 (unsigned)msg_type, private_data));
360 * Only one callback per type
363 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
364 /* we allow a second registration of the same message
365 type if it has a different private pointer. This is
366 needed in, for example, the internal notify code,
367 which creates a new notify context for each tree
368 connect, and expects to receive messages to each of
369 them. */
370 if (cb->msg_type == msg_type && private_data == cb->private_data) {
371 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
372 (unsigned)msg_type, private_data));
373 cb->fn = fn;
374 cb->private_data = private_data;
375 return NT_STATUS_OK;
379 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
380 return NT_STATUS_NO_MEMORY;
383 cb->msg_type = msg_type;
384 cb->fn = fn;
385 cb->private_data = private_data;
387 DLIST_ADD(msg_ctx->callbacks, cb);
388 return NT_STATUS_OK;
392 De-register the function for a particular message type.
394 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
395 void *private_data)
397 struct messaging_callback *cb, *next;
399 for (cb = ctx->callbacks; cb; cb = next) {
400 next = cb->next;
401 if ((cb->msg_type == msg_type)
402 && (cb->private_data == private_data)) {
403 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
404 (unsigned)msg_type, private_data));
405 DLIST_REMOVE(ctx->callbacks, cb);
406 TALLOC_FREE(cb);
412 Send a message to a particular server
414 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
415 struct server_id server, uint32_t msg_type,
416 const DATA_BLOB *data)
418 struct iovec iov;
420 iov.iov_base = data->data;
421 iov.iov_len = data->length;
423 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1);
426 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
427 struct server_id server, uint32_t msg_type,
428 const uint8_t *buf, size_t len)
430 DATA_BLOB blob = data_blob_const(buf, len);
431 return messaging_send(msg_ctx, server, msg_type, &blob);
434 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
435 struct server_id server, uint32_t msg_type,
436 const struct iovec *iov, int iovlen)
438 int ret;
439 struct messaging_hdr hdr;
440 struct iovec iov2[iovlen+1];
442 if (server_id_is_disconnected(&server)) {
443 return NT_STATUS_INVALID_PARAMETER_MIX;
446 if (!procid_is_local(&server)) {
447 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
448 msg_type, iov, iovlen,
449 msg_ctx->remote);
450 if (ret != 0) {
451 return map_nt_error_from_unix(ret);
453 return NT_STATUS_OK;
456 if (server_id_same_process(&msg_ctx->id, &server)) {
457 struct messaging_rec rec;
458 uint8_t *buf;
461 * Self-send, directly dispatch
464 buf = iov_buf(talloc_tos(), iov, iovlen);
465 if (buf == NULL) {
466 return NT_STATUS_NO_MEMORY;
469 rec.msg_version = MESSAGE_VERSION;
470 rec.msg_type = msg_type & MSG_TYPE_MASK;
471 rec.dest = server;
472 rec.src = msg_ctx->id;
473 rec.buf = data_blob_const(buf, talloc_get_size(buf));
474 messaging_dispatch_rec(msg_ctx, &rec);
475 TALLOC_FREE(buf);
476 return NT_STATUS_OK;
479 hdr = (struct messaging_hdr) {
480 .msg_type = msg_type,
481 .dst = server,
482 .src = msg_ctx->id
484 iov2[0] = (struct iovec){ .iov_base = &hdr, .iov_len = sizeof(hdr) };
485 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
487 become_root();
488 ret = messaging_dgm_send(server.pid, iov2, iovlen+1);
489 unbecome_root();
491 if (ret != 0) {
492 return map_nt_error_from_unix(ret);
494 return NT_STATUS_OK;
497 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
498 struct messaging_rec *rec)
500 struct messaging_rec *result;
502 result = talloc_pooled_object(mem_ctx, struct messaging_rec,
503 1, rec->buf.length);
504 if (result == NULL) {
505 return NULL;
507 *result = *rec;
509 /* Doesn't fail, see talloc_pooled_object */
511 result->buf.data = talloc_memdup(result, rec->buf.data,
512 rec->buf.length);
513 return result;
516 struct messaging_filtered_read_state {
517 struct tevent_context *ev;
518 struct messaging_context *msg_ctx;
519 void *tevent_handle;
521 bool (*filter)(struct messaging_rec *rec, void *private_data);
522 void *private_data;
524 struct messaging_rec *rec;
527 static void messaging_filtered_read_cleanup(struct tevent_req *req,
528 enum tevent_req_state req_state);
530 struct tevent_req *messaging_filtered_read_send(
531 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
532 struct messaging_context *msg_ctx,
533 bool (*filter)(struct messaging_rec *rec, void *private_data),
534 void *private_data)
536 struct tevent_req *req;
537 struct messaging_filtered_read_state *state;
538 size_t new_waiters_len;
540 req = tevent_req_create(mem_ctx, &state,
541 struct messaging_filtered_read_state);
542 if (req == NULL) {
543 return NULL;
545 state->ev = ev;
546 state->msg_ctx = msg_ctx;
547 state->filter = filter;
548 state->private_data = private_data;
551 * We have to defer the callback here, as we might be called from
552 * within a different tevent_context than state->ev
554 tevent_req_defer_callback(req, state->ev);
556 state->tevent_handle = messaging_dgm_register_tevent_context(
557 state, ev);
558 if (tevent_req_nomem(state, req)) {
559 return tevent_req_post(req, ev);
563 * We add ourselves to the "new_waiters" array, not the "waiters"
564 * array. If we are called from within messaging_read_done,
565 * messaging_dispatch_rec will be in an active for-loop on
566 * "waiters". We must be careful not to mess with this array, because
567 * it could mean that a single event is being delivered twice.
570 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
572 if (new_waiters_len == msg_ctx->num_new_waiters) {
573 struct tevent_req **tmp;
575 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
576 struct tevent_req *, new_waiters_len+1);
577 if (tevent_req_nomem(tmp, req)) {
578 return tevent_req_post(req, ev);
580 msg_ctx->new_waiters = tmp;
583 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
584 msg_ctx->num_new_waiters += 1;
585 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
587 return req;
590 static void messaging_filtered_read_cleanup(struct tevent_req *req,
591 enum tevent_req_state req_state)
593 struct messaging_filtered_read_state *state = tevent_req_data(
594 req, struct messaging_filtered_read_state);
595 struct messaging_context *msg_ctx = state->msg_ctx;
596 unsigned i;
598 tevent_req_set_cleanup_fn(req, NULL);
600 TALLOC_FREE(state->tevent_handle);
603 * Just set the [new_]waiters entry to NULL, be careful not to mess
604 * with the other "waiters" array contents. We are often called from
605 * within "messaging_dispatch_rec", which loops over
606 * "waiters". Messing with the "waiters" array will mess up that
607 * for-loop.
610 for (i=0; i<msg_ctx->num_waiters; i++) {
611 if (msg_ctx->waiters[i] == req) {
612 msg_ctx->waiters[i] = NULL;
613 return;
617 for (i=0; i<msg_ctx->num_new_waiters; i++) {
618 if (msg_ctx->new_waiters[i] == req) {
619 msg_ctx->new_waiters[i] = NULL;
620 return;
625 static void messaging_filtered_read_done(struct tevent_req *req,
626 struct messaging_rec *rec)
628 struct messaging_filtered_read_state *state = tevent_req_data(
629 req, struct messaging_filtered_read_state);
631 state->rec = messaging_rec_dup(state, rec);
632 if (tevent_req_nomem(state->rec, req)) {
633 return;
635 tevent_req_done(req);
638 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
639 struct messaging_rec **presult)
641 struct messaging_filtered_read_state *state = tevent_req_data(
642 req, struct messaging_filtered_read_state);
643 int err;
645 if (tevent_req_is_unix_error(req, &err)) {
646 tevent_req_received(req);
647 return err;
649 *presult = talloc_move(mem_ctx, &state->rec);
650 return 0;
653 struct messaging_read_state {
654 uint32_t msg_type;
655 struct messaging_rec *rec;
658 static bool messaging_read_filter(struct messaging_rec *rec,
659 void *private_data);
660 static void messaging_read_done(struct tevent_req *subreq);
662 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
663 struct tevent_context *ev,
664 struct messaging_context *msg,
665 uint32_t msg_type)
667 struct tevent_req *req, *subreq;
668 struct messaging_read_state *state;
670 req = tevent_req_create(mem_ctx, &state,
671 struct messaging_read_state);
672 if (req == NULL) {
673 return NULL;
675 state->msg_type = msg_type;
677 subreq = messaging_filtered_read_send(state, ev, msg,
678 messaging_read_filter, state);
679 if (tevent_req_nomem(subreq, req)) {
680 return tevent_req_post(req, ev);
682 tevent_req_set_callback(subreq, messaging_read_done, req);
683 return req;
686 static bool messaging_read_filter(struct messaging_rec *rec,
687 void *private_data)
689 struct messaging_read_state *state = talloc_get_type_abort(
690 private_data, struct messaging_read_state);
692 return rec->msg_type == state->msg_type;
695 static void messaging_read_done(struct tevent_req *subreq)
697 struct tevent_req *req = tevent_req_callback_data(
698 subreq, struct tevent_req);
699 struct messaging_read_state *state = tevent_req_data(
700 req, struct messaging_read_state);
701 int ret;
703 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
704 TALLOC_FREE(subreq);
705 if (tevent_req_error(req, ret)) {
706 return;
708 tevent_req_done(req);
711 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
712 struct messaging_rec **presult)
714 struct messaging_read_state *state = tevent_req_data(
715 req, struct messaging_read_state);
716 int err;
718 if (tevent_req_is_unix_error(req, &err)) {
719 return err;
721 if (presult != NULL) {
722 *presult = talloc_move(mem_ctx, &state->rec);
724 return 0;
727 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
729 if (msg_ctx->num_new_waiters == 0) {
730 return true;
733 if (talloc_array_length(msg_ctx->waiters) <
734 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
735 struct tevent_req **tmp;
736 tmp = talloc_realloc(
737 msg_ctx, msg_ctx->waiters, struct tevent_req *,
738 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
739 if (tmp == NULL) {
740 DEBUG(1, ("%s: talloc failed\n", __func__));
741 return false;
743 msg_ctx->waiters = tmp;
746 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
747 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
749 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
750 msg_ctx->num_new_waiters = 0;
752 return true;
755 struct messaging_defer_callback_state {
756 struct messaging_context *msg_ctx;
757 struct messaging_rec *rec;
758 void (*fn)(struct messaging_context *msg, void *private_data,
759 uint32_t msg_type, struct server_id server_id,
760 DATA_BLOB *data);
761 void *private_data;
764 static void messaging_defer_callback_trigger(struct tevent_context *ev,
765 struct tevent_immediate *im,
766 void *private_data);
768 static void messaging_defer_callback(
769 struct messaging_context *msg_ctx, struct messaging_rec *rec,
770 void (*fn)(struct messaging_context *msg, void *private_data,
771 uint32_t msg_type, struct server_id server_id,
772 DATA_BLOB *data),
773 void *private_data)
775 struct messaging_defer_callback_state *state;
776 struct tevent_immediate *im;
778 state = talloc(msg_ctx, struct messaging_defer_callback_state);
779 if (state == NULL) {
780 DEBUG(1, ("talloc failed\n"));
781 return;
783 state->msg_ctx = msg_ctx;
784 state->fn = fn;
785 state->private_data = private_data;
787 state->rec = messaging_rec_dup(state, rec);
788 if (state->rec == NULL) {
789 DEBUG(1, ("talloc failed\n"));
790 TALLOC_FREE(state);
791 return;
794 im = tevent_create_immediate(state);
795 if (im == NULL) {
796 DEBUG(1, ("tevent_create_immediate failed\n"));
797 TALLOC_FREE(state);
798 return;
800 tevent_schedule_immediate(im, msg_ctx->event_ctx,
801 messaging_defer_callback_trigger, state);
804 static void messaging_defer_callback_trigger(struct tevent_context *ev,
805 struct tevent_immediate *im,
806 void *private_data)
808 struct messaging_defer_callback_state *state = talloc_get_type_abort(
809 private_data, struct messaging_defer_callback_state);
810 struct messaging_rec *rec = state->rec;
812 state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
813 &rec->buf);
814 TALLOC_FREE(state);
818 Dispatch one messaging_rec
820 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
821 struct messaging_rec *rec)
823 struct messaging_callback *cb, *next;
824 unsigned i;
826 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
827 next = cb->next;
828 if (cb->msg_type != rec->msg_type) {
829 continue;
832 if (server_id_same_process(&rec->src, &rec->dest)) {
834 * This is a self-send. We are called here from
835 * messaging_send(), and we don't want to directly
836 * recurse into the callback but go via a
837 * tevent_loop_once
839 messaging_defer_callback(msg_ctx, rec, cb->fn,
840 cb->private_data);
841 } else {
843 * This comes from a different process. we are called
844 * from the event loop, so we should call back
845 * directly.
847 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
848 rec->src, &rec->buf);
851 * we continue looking for matching messages after finding
852 * one. This matters for subsystems like the internal notify
853 * code which register more than one handler for the same
854 * message type
858 if (!messaging_append_new_waiters(msg_ctx)) {
859 return;
862 i = 0;
863 while (i < msg_ctx->num_waiters) {
864 struct tevent_req *req;
865 struct messaging_filtered_read_state *state;
867 req = msg_ctx->waiters[i];
868 if (req == NULL) {
870 * This got cleaned up. In the meantime,
871 * move everything down one. We need
872 * to keep the order of waiters, as
873 * other code may depend on this.
875 if (i < msg_ctx->num_waiters - 1) {
876 memmove(&msg_ctx->waiters[i],
877 &msg_ctx->waiters[i+1],
878 sizeof(struct tevent_req *) *
879 (msg_ctx->num_waiters - i - 1));
881 msg_ctx->num_waiters -= 1;
882 continue;
885 state = tevent_req_data(
886 req, struct messaging_filtered_read_state);
887 if (state->filter(rec, state->private_data)) {
888 messaging_filtered_read_done(req, rec);
891 i += 1;
895 static int mess_parent_dgm_cleanup(void *private_data);
896 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
898 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
900 struct tevent_req *req;
902 req = background_job_send(
903 msg, msg->event_ctx, msg, NULL, 0,
904 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
905 60*15),
906 mess_parent_dgm_cleanup, msg);
907 if (req == NULL) {
908 return false;
910 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
911 return true;
914 static int mess_parent_dgm_cleanup(void *private_data)
916 int ret;
918 ret = messaging_dgm_wipe();
919 DEBUG(10, ("messaging_dgm_wipe returned %s\n",
920 ret ? strerror(ret) : "ok"));
921 return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
922 60*15);
925 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
927 struct messaging_context *msg = tevent_req_callback_data(
928 req, struct messaging_context);
929 NTSTATUS status;
931 status = background_job_recv(req);
932 TALLOC_FREE(req);
933 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
934 nt_errstr(status)));
936 req = background_job_send(
937 msg, msg->event_ctx, msg, NULL, 0,
938 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
939 60*15),
940 mess_parent_dgm_cleanup, msg);
941 if (req == NULL) {
942 DEBUG(1, ("background_job_send failed\n"));
944 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
947 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
949 int ret;
951 if (pid == 0) {
952 ret = messaging_dgm_wipe();
953 } else {
954 ret = messaging_dgm_cleanup(pid);
957 return ret;
960 struct tevent_context *messaging_tevent_context(
961 struct messaging_context *msg_ctx)
963 return msg_ctx->event_ctx;
966 /** @} **/