messaging_dgm: Receive through a cb function
[Samba.git] / source3 / lib / messages.c
blob2e80bab4c7da6d4a3307b4d1e3296a86817db2f6
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
55 struct messaging_callback {
56 struct messaging_callback *prev, *next;
57 uint32 msg_type;
58 void (*fn)(struct messaging_context *msg, void *private_data,
59 uint32_t msg_type,
60 struct server_id server_id, DATA_BLOB *data);
61 void *private_data;
64 struct messaging_context {
65 struct server_id id;
66 struct tevent_context *event_ctx;
67 struct messaging_callback *callbacks;
69 struct tevent_req **new_waiters;
70 unsigned num_new_waiters;
72 struct tevent_req **waiters;
73 unsigned num_waiters;
75 struct messaging_backend *local;
76 struct messaging_backend *remote;
78 bool *have_context;
81 static int messaging_context_destructor(struct messaging_context *msg_ctx);
83 /****************************************************************************
84 A useful function for testing the message system.
85 ****************************************************************************/
87 static void ping_message(struct messaging_context *msg_ctx,
88 void *private_data,
89 uint32_t msg_type,
90 struct server_id src,
91 DATA_BLOB *data)
93 struct server_id_buf idbuf;
95 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
96 server_id_str_buf(src, &idbuf), (int)data->length,
97 data->data ? (char *)data->data : ""));
99 messaging_send(msg_ctx, src, MSG_PONG, data);
102 /****************************************************************************
103 Register/replace a dispatch function for a particular message type.
104 JRA changed Dec 13 2006. Only one message handler now permitted per type.
105 *NOTE*: Dispatch functions must be able to cope with incoming
106 messages on an *odd* byte boundary.
107 ****************************************************************************/
109 struct msg_all {
110 struct messaging_context *msg_ctx;
111 int msg_type;
112 uint32 msg_flag;
113 const void *buf;
114 size_t len;
115 int n_sent;
118 /****************************************************************************
119 Send one of the messages for the broadcast.
120 ****************************************************************************/
122 static int traverse_fn(struct db_record *rec, const struct server_id *id,
123 uint32_t msg_flags, void *state)
125 struct msg_all *msg_all = (struct msg_all *)state;
126 NTSTATUS status;
128 /* Don't send if the receiver hasn't registered an interest. */
130 if((msg_flags & msg_all->msg_flag) == 0) {
131 return 0;
134 /* If the msg send fails because the pid was not found (i.e. smbd died),
135 * the msg has already been deleted from the messages.tdb.*/
137 status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
138 (const uint8_t *)msg_all->buf, msg_all->len);
140 if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
141 struct server_id_buf idbuf;
144 * If the pid was not found delete the entry from
145 * serverid.tdb
148 DEBUG(2, ("pid %s doesn't exist\n",
149 server_id_str_buf(*id, &idbuf)));
151 dbwrap_record_delete(rec);
153 msg_all->n_sent++;
154 return 0;
158 * Send a message to all smbd processes.
160 * It isn't very efficient, but should be OK for the sorts of
161 * applications that use it. When we need efficient broadcast we can add
162 * it.
164 * @param n_sent Set to the number of messages sent. This should be
165 * equal to the number of processes, but be careful for races.
167 * @retval True for success.
169 bool message_send_all(struct messaging_context *msg_ctx,
170 int msg_type,
171 const void *buf, size_t len,
172 int *n_sent)
174 struct msg_all msg_all;
176 msg_all.msg_type = msg_type;
177 if (msg_type < 0x100) {
178 msg_all.msg_flag = FLAG_MSG_GENERAL;
179 } else if (msg_type > 0x100 && msg_type < 0x200) {
180 msg_all.msg_flag = FLAG_MSG_NMBD;
181 } else if (msg_type > 0x200 && msg_type < 0x300) {
182 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
183 } else if (msg_type > 0x300 && msg_type < 0x400) {
184 msg_all.msg_flag = FLAG_MSG_SMBD;
185 } else if (msg_type > 0x400 && msg_type < 0x600) {
186 msg_all.msg_flag = FLAG_MSG_WINBIND;
187 } else if (msg_type > 4000 && msg_type < 5000) {
188 msg_all.msg_flag = FLAG_MSG_DBWRAP;
189 } else {
190 return false;
193 msg_all.buf = buf;
194 msg_all.len = len;
195 msg_all.n_sent = 0;
196 msg_all.msg_ctx = msg_ctx;
198 serverid_traverse(traverse_fn, &msg_all);
199 if (n_sent)
200 *n_sent = msg_all.n_sent;
201 return true;
204 static void messaging_recv_cb(int msg_type,
205 struct server_id src, struct server_id dst,
206 const uint8_t *msg, size_t msg_len,
207 void *private_data)
209 struct messaging_context *msg_ctx = talloc_get_type_abort(
210 private_data, struct messaging_context);
211 struct messaging_rec rec;
213 rec = (struct messaging_rec) {
214 .msg_version = MESSAGE_VERSION,
215 .msg_type = msg_type,
216 .src = src,
217 .dest = dst,
218 .buf.data = discard_const_p(uint8, msg),
219 .buf.length = msg_len
222 messaging_dispatch_rec(msg_ctx, &rec);
225 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
226 struct tevent_context *ev)
228 struct messaging_context *ctx;
229 NTSTATUS status;
230 int ret;
231 static bool have_context = false;
233 if (have_context) {
234 DEBUG(0, ("No two messaging contexts per process\n"));
235 return NULL;
239 if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
240 return NULL;
243 ctx->id = procid_self();
244 ctx->event_ctx = ev;
245 ctx->have_context = &have_context;
247 ret = messaging_dgm_init(ctx, ctx, &ctx->local,
248 messaging_recv_cb, ctx);
250 if (ret != 0) {
251 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
252 TALLOC_FREE(ctx);
253 return NULL;
256 if (lp_clustering()) {
257 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
259 if (!NT_STATUS_IS_OK(status)) {
260 DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
261 nt_errstr(status)));
262 TALLOC_FREE(ctx);
263 return NULL;
266 ctx->id.vnn = get_my_vnn();
268 messaging_register(ctx, NULL, MSG_PING, ping_message);
270 /* Register some debugging related messages */
272 register_msg_pool_usage(ctx);
273 register_dmalloc_msgs(ctx);
274 debug_register_msgs(ctx);
276 have_context = true;
277 talloc_set_destructor(ctx, messaging_context_destructor);
279 return ctx;
282 static int messaging_context_destructor(struct messaging_context *msg_ctx)
284 SMB_ASSERT(*msg_ctx->have_context);
285 *msg_ctx->have_context = false;
286 return 0;
289 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
291 return msg_ctx->id;
295 * re-init after a fork
297 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
299 NTSTATUS status;
300 int ret;
302 TALLOC_FREE(msg_ctx->local);
304 msg_ctx->id = procid_self();
306 ret = messaging_dgm_init(msg_ctx, msg_ctx, &msg_ctx->local,
307 messaging_recv_cb, msg_ctx);
308 if (ret != 0) {
309 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
310 return map_nt_error_from_unix(ret);
313 TALLOC_FREE(msg_ctx->remote);
315 if (lp_clustering()) {
316 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
317 &msg_ctx->remote);
319 if (!NT_STATUS_IS_OK(status)) {
320 DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
321 nt_errstr(status)));
322 return status;
326 return NT_STATUS_OK;
331 * Register a dispatch function for a particular message type. Allow multiple
332 * registrants
334 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
335 void *private_data,
336 uint32_t msg_type,
337 void (*fn)(struct messaging_context *msg,
338 void *private_data,
339 uint32_t msg_type,
340 struct server_id server_id,
341 DATA_BLOB *data))
343 struct messaging_callback *cb;
345 DEBUG(5, ("Registering messaging pointer for type %u - "
346 "private_data=%p\n",
347 (unsigned)msg_type, private_data));
350 * Only one callback per type
353 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
354 /* we allow a second registration of the same message
355 type if it has a different private pointer. This is
356 needed in, for example, the internal notify code,
357 which creates a new notify context for each tree
358 connect, and expects to receive messages to each of
359 them. */
360 if (cb->msg_type == msg_type && private_data == cb->private_data) {
361 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
362 (unsigned)msg_type, private_data));
363 cb->fn = fn;
364 cb->private_data = private_data;
365 return NT_STATUS_OK;
369 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
370 return NT_STATUS_NO_MEMORY;
373 cb->msg_type = msg_type;
374 cb->fn = fn;
375 cb->private_data = private_data;
377 DLIST_ADD(msg_ctx->callbacks, cb);
378 return NT_STATUS_OK;
382 De-register the function for a particular message type.
384 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
385 void *private_data)
387 struct messaging_callback *cb, *next;
389 for (cb = ctx->callbacks; cb; cb = next) {
390 next = cb->next;
391 if ((cb->msg_type == msg_type)
392 && (cb->private_data == private_data)) {
393 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
394 (unsigned)msg_type, private_data));
395 DLIST_REMOVE(ctx->callbacks, cb);
396 TALLOC_FREE(cb);
401 static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
402 const struct server_id *dst)
404 return ((msg_ctx->id.vnn == dst->vnn) &&
405 (msg_ctx->id.pid == dst->pid));
409 Send a message to a particular server
411 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
412 struct server_id server, uint32_t msg_type,
413 const DATA_BLOB *data)
415 struct iovec iov;
417 iov.iov_base = data->data;
418 iov.iov_len = data->length;
420 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1);
423 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
424 struct server_id server, uint32_t msg_type,
425 const uint8_t *buf, size_t len)
427 DATA_BLOB blob = data_blob_const(buf, len);
428 return messaging_send(msg_ctx, server, msg_type, &blob);
431 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
432 struct server_id server, uint32_t msg_type,
433 const struct iovec *iov, int iovlen)
435 int ret;
437 if (server_id_is_disconnected(&server)) {
438 return NT_STATUS_INVALID_PARAMETER_MIX;
441 if (!procid_is_local(&server)) {
442 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
443 msg_type, iov, iovlen,
444 msg_ctx->remote);
445 if (ret != 0) {
446 return map_nt_error_from_unix(ret);
448 return NT_STATUS_OK;
451 if (messaging_is_self_send(msg_ctx, &server)) {
452 struct messaging_rec rec;
453 uint8_t *buf;
455 buf = iov_buf(talloc_tos(), iov, iovlen);
456 if (buf == NULL) {
457 return NT_STATUS_NO_MEMORY;
460 rec.msg_version = MESSAGE_VERSION;
461 rec.msg_type = msg_type & MSG_TYPE_MASK;
462 rec.dest = server;
463 rec.src = msg_ctx->id;
464 rec.buf = data_blob_const(buf, talloc_get_size(buf));
465 messaging_dispatch_rec(msg_ctx, &rec);
466 TALLOC_FREE(buf);
467 return NT_STATUS_OK;
470 ret = msg_ctx->local->send_fn(msg_ctx->id, server, msg_type,
471 iov, iovlen, msg_ctx->local);
472 if (ret != 0) {
473 return map_nt_error_from_unix(ret);
475 return NT_STATUS_OK;
478 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
479 struct messaging_rec *rec)
481 struct messaging_rec *result;
483 result = talloc_pooled_object(mem_ctx, struct messaging_rec,
484 1, rec->buf.length);
485 if (result == NULL) {
486 return NULL;
488 *result = *rec;
490 /* Doesn't fail, see talloc_pooled_object */
492 result->buf.data = talloc_memdup(result, rec->buf.data,
493 rec->buf.length);
494 return result;
/*
 * State of one messaging_filtered_read_send() request.
 */
struct messaging_filtered_read_state {
	/* Event context the request was created for. */
	struct tevent_context *ev;
	/* Messaging context the request waits on. */
	struct messaging_context *msg_ctx;
	/* Result of messaging_dgm_register_tevent_context(); freed on cleanup. */
	void *tevent_handle;

	/* Predicate deciding whether a record completes this request. */
	bool (*filter)(struct messaging_rec *rec, void *private_data);
	/* Pointer passed to filter. */
	void *private_data;

	/* Copy of the record that matched; handed out by the _recv function. */
	struct messaging_rec *rec;
};

static void messaging_filtered_read_cleanup(struct tevent_req *req,
					    enum tevent_req_state req_state);
511 struct tevent_req *messaging_filtered_read_send(
512 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
513 struct messaging_context *msg_ctx,
514 bool (*filter)(struct messaging_rec *rec, void *private_data),
515 void *private_data)
517 struct tevent_req *req;
518 struct messaging_filtered_read_state *state;
519 size_t new_waiters_len;
521 req = tevent_req_create(mem_ctx, &state,
522 struct messaging_filtered_read_state);
523 if (req == NULL) {
524 return NULL;
526 state->ev = ev;
527 state->msg_ctx = msg_ctx;
528 state->filter = filter;
529 state->private_data = private_data;
532 * We have to defer the callback here, as we might be called from
533 * within a different tevent_context than state->ev
535 tevent_req_defer_callback(req, state->ev);
537 state->tevent_handle = messaging_dgm_register_tevent_context(
538 state, msg_ctx, ev);
539 if (tevent_req_nomem(state, req)) {
540 return tevent_req_post(req, ev);
544 * We add ourselves to the "new_waiters" array, not the "waiters"
545 * array. If we are called from within messaging_read_done,
546 * messaging_dispatch_rec will be in an active for-loop on
547 * "waiters". We must be careful not to mess with this array, because
548 * it could mean that a single event is being delivered twice.
551 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
553 if (new_waiters_len == msg_ctx->num_new_waiters) {
554 struct tevent_req **tmp;
556 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
557 struct tevent_req *, new_waiters_len+1);
558 if (tevent_req_nomem(tmp, req)) {
559 return tevent_req_post(req, ev);
561 msg_ctx->new_waiters = tmp;
564 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
565 msg_ctx->num_new_waiters += 1;
566 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
568 return req;
571 static void messaging_filtered_read_cleanup(struct tevent_req *req,
572 enum tevent_req_state req_state)
574 struct messaging_filtered_read_state *state = tevent_req_data(
575 req, struct messaging_filtered_read_state);
576 struct messaging_context *msg_ctx = state->msg_ctx;
577 unsigned i;
579 tevent_req_set_cleanup_fn(req, NULL);
581 TALLOC_FREE(state->tevent_handle);
584 * Just set the [new_]waiters entry to NULL, be careful not to mess
585 * with the other "waiters" array contents. We are often called from
586 * within "messaging_dispatch_rec", which loops over
587 * "waiters". Messing with the "waiters" array will mess up that
588 * for-loop.
591 for (i=0; i<msg_ctx->num_waiters; i++) {
592 if (msg_ctx->waiters[i] == req) {
593 msg_ctx->waiters[i] = NULL;
594 return;
598 for (i=0; i<msg_ctx->num_new_waiters; i++) {
599 if (msg_ctx->new_waiters[i] == req) {
600 msg_ctx->new_waiters[i] = NULL;
601 return;
606 static void messaging_filtered_read_done(struct tevent_req *req,
607 struct messaging_rec *rec)
609 struct messaging_filtered_read_state *state = tevent_req_data(
610 req, struct messaging_filtered_read_state);
612 state->rec = messaging_rec_dup(state, rec);
613 if (tevent_req_nomem(state->rec, req)) {
614 return;
616 tevent_req_done(req);
619 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
620 struct messaging_rec **presult)
622 struct messaging_filtered_read_state *state = tevent_req_data(
623 req, struct messaging_filtered_read_state);
624 int err;
626 if (tevent_req_is_unix_error(req, &err)) {
627 tevent_req_received(req);
628 return err;
630 *presult = talloc_move(mem_ctx, &state->rec);
631 return 0;
/*
 * State of one messaging_read_send() request.
 */
struct messaging_read_state {
	/* Message type the request waits for. */
	uint32_t msg_type;
	/* The received record, filled in by messaging_read_done(). */
	struct messaging_rec *rec;
};

static bool messaging_read_filter(struct messaging_rec *rec,
				  void *private_data);
static void messaging_read_done(struct tevent_req *subreq);
643 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
644 struct tevent_context *ev,
645 struct messaging_context *msg,
646 uint32_t msg_type)
648 struct tevent_req *req, *subreq;
649 struct messaging_read_state *state;
651 req = tevent_req_create(mem_ctx, &state,
652 struct messaging_read_state);
653 if (req == NULL) {
654 return NULL;
656 state->msg_type = msg_type;
658 subreq = messaging_filtered_read_send(state, ev, msg,
659 messaging_read_filter, state);
660 if (tevent_req_nomem(subreq, req)) {
661 return tevent_req_post(req, ev);
663 tevent_req_set_callback(subreq, messaging_read_done, req);
664 return req;
667 static bool messaging_read_filter(struct messaging_rec *rec,
668 void *private_data)
670 struct messaging_read_state *state = talloc_get_type_abort(
671 private_data, struct messaging_read_state);
673 return rec->msg_type == state->msg_type;
676 static void messaging_read_done(struct tevent_req *subreq)
678 struct tevent_req *req = tevent_req_callback_data(
679 subreq, struct tevent_req);
680 struct messaging_read_state *state = tevent_req_data(
681 req, struct messaging_read_state);
682 int ret;
684 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
685 TALLOC_FREE(subreq);
686 if (tevent_req_error(req, ret)) {
687 return;
689 tevent_req_done(req);
692 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
693 struct messaging_rec **presult)
695 struct messaging_read_state *state = tevent_req_data(
696 req, struct messaging_read_state);
697 int err;
699 if (tevent_req_is_unix_error(req, &err)) {
700 return err;
702 if (presult != NULL) {
703 *presult = talloc_move(mem_ctx, &state->rec);
705 return 0;
708 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
710 if (msg_ctx->num_new_waiters == 0) {
711 return true;
714 if (talloc_array_length(msg_ctx->waiters) <
715 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
716 struct tevent_req **tmp;
717 tmp = talloc_realloc(
718 msg_ctx, msg_ctx->waiters, struct tevent_req *,
719 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
720 if (tmp == NULL) {
721 DEBUG(1, ("%s: talloc failed\n", __func__));
722 return false;
724 msg_ctx->waiters = tmp;
727 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
728 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
730 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
731 msg_ctx->num_new_waiters = 0;
733 return true;
736 struct messaging_defer_callback_state {
737 struct messaging_context *msg_ctx;
738 struct messaging_rec *rec;
739 void (*fn)(struct messaging_context *msg, void *private_data,
740 uint32_t msg_type, struct server_id server_id,
741 DATA_BLOB *data);
742 void *private_data;
745 static void messaging_defer_callback_trigger(struct tevent_context *ev,
746 struct tevent_immediate *im,
747 void *private_data);
749 static void messaging_defer_callback(
750 struct messaging_context *msg_ctx, struct messaging_rec *rec,
751 void (*fn)(struct messaging_context *msg, void *private_data,
752 uint32_t msg_type, struct server_id server_id,
753 DATA_BLOB *data),
754 void *private_data)
756 struct messaging_defer_callback_state *state;
757 struct tevent_immediate *im;
759 state = talloc(msg_ctx, struct messaging_defer_callback_state);
760 if (state == NULL) {
761 DEBUG(1, ("talloc failed\n"));
762 return;
764 state->msg_ctx = msg_ctx;
765 state->fn = fn;
766 state->private_data = private_data;
768 state->rec = messaging_rec_dup(state, rec);
769 if (state->rec == NULL) {
770 DEBUG(1, ("talloc failed\n"));
771 TALLOC_FREE(state);
772 return;
775 im = tevent_create_immediate(state);
776 if (im == NULL) {
777 DEBUG(1, ("tevent_create_immediate failed\n"));
778 TALLOC_FREE(state);
779 return;
781 tevent_schedule_immediate(im, msg_ctx->event_ctx,
782 messaging_defer_callback_trigger, state);
785 static void messaging_defer_callback_trigger(struct tevent_context *ev,
786 struct tevent_immediate *im,
787 void *private_data)
789 struct messaging_defer_callback_state *state = talloc_get_type_abort(
790 private_data, struct messaging_defer_callback_state);
791 struct messaging_rec *rec = state->rec;
793 state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
794 &rec->buf);
795 TALLOC_FREE(state);
799 Dispatch one messaging_rec
801 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
802 struct messaging_rec *rec)
804 struct messaging_callback *cb, *next;
805 unsigned i;
807 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
808 next = cb->next;
809 if (cb->msg_type != rec->msg_type) {
810 continue;
813 if (messaging_is_self_send(msg_ctx, &rec->dest)) {
815 * This is a self-send. We are called here from
816 * messaging_send(), and we don't want to directly
817 * recurse into the callback but go via a
818 * tevent_loop_once
820 messaging_defer_callback(msg_ctx, rec, cb->fn,
821 cb->private_data);
822 } else {
824 * This comes from a different process. we are called
825 * from the event loop, so we should call back
826 * directly.
828 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
829 rec->src, &rec->buf);
832 * we continue looking for matching messages after finding
833 * one. This matters for subsystems like the internal notify
834 * code which register more than one handler for the same
835 * message type
839 if (!messaging_append_new_waiters(msg_ctx)) {
840 return;
843 i = 0;
844 while (i < msg_ctx->num_waiters) {
845 struct tevent_req *req;
846 struct messaging_filtered_read_state *state;
848 req = msg_ctx->waiters[i];
849 if (req == NULL) {
851 * This got cleaned up. In the meantime,
852 * move everything down one. We need
853 * to keep the order of waiters, as
854 * other code may depend on this.
856 if (i < msg_ctx->num_waiters - 1) {
857 memmove(&msg_ctx->waiters[i],
858 &msg_ctx->waiters[i+1],
859 sizeof(struct tevent_req *) *
860 (msg_ctx->num_waiters - i - 1));
862 msg_ctx->num_waiters -= 1;
863 continue;
866 state = tevent_req_data(
867 req, struct messaging_filtered_read_state);
868 if (state->filter(rec, state->private_data)) {
869 messaging_filtered_read_done(req, rec);
872 i += 1;
876 static int mess_parent_dgm_cleanup(void *private_data);
877 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
879 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
881 struct tevent_req *req;
883 req = background_job_send(
884 msg, msg->event_ctx, msg, NULL, 0,
885 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
886 60*15),
887 mess_parent_dgm_cleanup, msg);
888 if (req == NULL) {
889 return false;
891 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
892 return true;
895 static int mess_parent_dgm_cleanup(void *private_data)
897 struct messaging_context *msg_ctx = talloc_get_type_abort(
898 private_data, struct messaging_context);
899 int ret;
901 ret = messaging_dgm_wipe(msg_ctx);
902 DEBUG(10, ("messaging_dgm_wipe returned %s\n",
903 ret ? strerror(ret) : "ok"));
904 return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
905 60*15);
908 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
910 struct messaging_context *msg = tevent_req_callback_data(
911 req, struct messaging_context);
912 NTSTATUS status;
914 status = background_job_recv(req);
915 TALLOC_FREE(req);
916 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
917 nt_errstr(status)));
919 req = background_job_send(
920 msg, msg->event_ctx, msg, NULL, 0,
921 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
922 60*15),
923 mess_parent_dgm_cleanup, msg);
924 if (req == NULL) {
925 DEBUG(1, ("background_job_send failed\n"));
927 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
930 struct messaging_backend *messaging_local_backend(
931 struct messaging_context *msg_ctx)
933 return msg_ctx->local;
936 struct tevent_context *messaging_tevent_context(
937 struct messaging_context *msg_ctx)
939 return msg_ctx->event_ctx;
942 /** @} **/