messaging3: Directly refer to messaging_dgm in messages.c
[Samba.git] / source3 / lib / messages.c
blob18376bbe533e1a1fb26d22869b93f01e6bf0da6b
/*
   Unix SMB/CIFS implementation.
   Samba internal messaging functions
   Copyright (C) Andrew Tridgell 2000
   Copyright (C) 2001 by Martin Pool
   Copyright (C) 2002 by Jeremy Allison
   Copyright (C) 2007 by Volker Lendecke

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
/**
  @defgroup messages Internal messaging framework
  @{
  @file messages.c

  @brief  Module for internal messaging between Samba daemons.

   The idea is that if a part of Samba wants to communicate with
   another Samba process, it registers a dispatch function with
   messaging_register() and uses messaging_send() to send messages to
   that process.

   The dispatch function is given the server id of the sender, and it
   can use that to reply with messaging_send().  See ping_message() for
   a simple example.

   @caution Dispatch functions must be able to cope with incoming
   messages on an *odd* byte boundary.

   This system doesn't have any inherent size limitations but is not
   very efficient for large messages or when messages are sent in very
   quick succession.
*/
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
55 struct messaging_callback {
56 struct messaging_callback *prev, *next;
57 uint32 msg_type;
58 void (*fn)(struct messaging_context *msg, void *private_data,
59 uint32_t msg_type,
60 struct server_id server_id, DATA_BLOB *data);
61 void *private_data;
64 struct messaging_context {
65 struct server_id id;
66 struct tevent_context *event_ctx;
67 struct messaging_callback *callbacks;
69 struct tevent_req **new_waiters;
70 unsigned num_new_waiters;
72 struct tevent_req **waiters;
73 unsigned num_waiters;
75 struct messaging_dgm_context *local;
77 struct messaging_backend *remote;
79 bool *have_context;
82 static int messaging_context_destructor(struct messaging_context *msg_ctx);
84 /****************************************************************************
85 A useful function for testing the message system.
86 ****************************************************************************/
88 static void ping_message(struct messaging_context *msg_ctx,
89 void *private_data,
90 uint32_t msg_type,
91 struct server_id src,
92 DATA_BLOB *data)
94 struct server_id_buf idbuf;
96 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
97 server_id_str_buf(src, &idbuf), (int)data->length,
98 data->data ? (char *)data->data : ""));
100 messaging_send(msg_ctx, src, MSG_PONG, data);
103 /****************************************************************************
104 Register/replace a dispatch function for a particular message type.
105 JRA changed Dec 13 2006. Only one message handler now permitted per type.
106 *NOTE*: Dispatch functions must be able to cope with incoming
107 messages on an *odd* byte boundary.
108 ****************************************************************************/
110 struct msg_all {
111 struct messaging_context *msg_ctx;
112 int msg_type;
113 uint32 msg_flag;
114 const void *buf;
115 size_t len;
116 int n_sent;
119 /****************************************************************************
120 Send one of the messages for the broadcast.
121 ****************************************************************************/
123 static int traverse_fn(struct db_record *rec, const struct server_id *id,
124 uint32_t msg_flags, void *state)
126 struct msg_all *msg_all = (struct msg_all *)state;
127 NTSTATUS status;
129 /* Don't send if the receiver hasn't registered an interest. */
131 if((msg_flags & msg_all->msg_flag) == 0) {
132 return 0;
135 /* If the msg send fails because the pid was not found (i.e. smbd died),
136 * the msg has already been deleted from the messages.tdb.*/
138 status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
139 (const uint8_t *)msg_all->buf, msg_all->len);
141 if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
142 struct server_id_buf idbuf;
145 * If the pid was not found delete the entry from
146 * serverid.tdb
149 DEBUG(2, ("pid %s doesn't exist\n",
150 server_id_str_buf(*id, &idbuf)));
152 dbwrap_record_delete(rec);
154 msg_all->n_sent++;
155 return 0;
159 * Send a message to all smbd processes.
161 * It isn't very efficient, but should be OK for the sorts of
162 * applications that use it. When we need efficient broadcast we can add
163 * it.
165 * @param n_sent Set to the number of messages sent. This should be
166 * equal to the number of processes, but be careful for races.
168 * @retval True for success.
170 bool message_send_all(struct messaging_context *msg_ctx,
171 int msg_type,
172 const void *buf, size_t len,
173 int *n_sent)
175 struct msg_all msg_all;
177 msg_all.msg_type = msg_type;
178 if (msg_type < 0x100) {
179 msg_all.msg_flag = FLAG_MSG_GENERAL;
180 } else if (msg_type > 0x100 && msg_type < 0x200) {
181 msg_all.msg_flag = FLAG_MSG_NMBD;
182 } else if (msg_type > 0x200 && msg_type < 0x300) {
183 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
184 } else if (msg_type > 0x300 && msg_type < 0x400) {
185 msg_all.msg_flag = FLAG_MSG_SMBD;
186 } else if (msg_type > 0x400 && msg_type < 0x600) {
187 msg_all.msg_flag = FLAG_MSG_WINBIND;
188 } else if (msg_type > 4000 && msg_type < 5000) {
189 msg_all.msg_flag = FLAG_MSG_DBWRAP;
190 } else {
191 return false;
194 msg_all.buf = buf;
195 msg_all.len = len;
196 msg_all.n_sent = 0;
197 msg_all.msg_ctx = msg_ctx;
199 serverid_traverse(traverse_fn, &msg_all);
200 if (n_sent)
201 *n_sent = msg_all.n_sent;
202 return true;
205 static void messaging_recv_cb(int msg_type,
206 struct server_id src, struct server_id dst,
207 const uint8_t *msg, size_t msg_len,
208 void *private_data)
210 struct messaging_context *msg_ctx = talloc_get_type_abort(
211 private_data, struct messaging_context);
212 struct messaging_rec rec;
214 rec = (struct messaging_rec) {
215 .msg_version = MESSAGE_VERSION,
216 .msg_type = msg_type,
217 .src = src,
218 .dest = dst,
219 .buf.data = discard_const_p(uint8, msg),
220 .buf.length = msg_len
223 messaging_dispatch_rec(msg_ctx, &rec);
226 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
227 struct tevent_context *ev)
229 struct messaging_context *ctx;
230 NTSTATUS status;
231 int ret;
232 static bool have_context = false;
234 if (have_context) {
235 DEBUG(0, ("No two messaging contexts per process\n"));
236 return NULL;
240 if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
241 return NULL;
244 ctx->id = procid_self();
245 ctx->event_ctx = ev;
246 ctx->have_context = &have_context;
248 ret = messaging_dgm_init(ctx, ctx->event_ctx, ctx->id,
249 messaging_recv_cb, ctx, &ctx->local);
251 if (ret != 0) {
252 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
253 TALLOC_FREE(ctx);
254 return NULL;
257 if (lp_clustering()) {
258 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
260 if (!NT_STATUS_IS_OK(status)) {
261 DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
262 nt_errstr(status)));
263 TALLOC_FREE(ctx);
264 return NULL;
267 ctx->id.vnn = get_my_vnn();
269 messaging_register(ctx, NULL, MSG_PING, ping_message);
271 /* Register some debugging related messages */
273 register_msg_pool_usage(ctx);
274 register_dmalloc_msgs(ctx);
275 debug_register_msgs(ctx);
277 have_context = true;
278 talloc_set_destructor(ctx, messaging_context_destructor);
280 return ctx;
283 static int messaging_context_destructor(struct messaging_context *msg_ctx)
285 SMB_ASSERT(*msg_ctx->have_context);
286 *msg_ctx->have_context = false;
287 return 0;
290 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
292 return msg_ctx->id;
296 * re-init after a fork
298 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
300 NTSTATUS status;
301 int ret;
303 TALLOC_FREE(msg_ctx->local);
305 msg_ctx->id = procid_self();
307 ret = messaging_dgm_init(msg_ctx, msg_ctx->event_ctx,
308 msg_ctx->id, messaging_recv_cb, msg_ctx,
309 &msg_ctx->local);
310 if (ret != 0) {
311 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
312 return map_nt_error_from_unix(ret);
315 TALLOC_FREE(msg_ctx->remote);
317 if (lp_clustering()) {
318 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
319 &msg_ctx->remote);
321 if (!NT_STATUS_IS_OK(status)) {
322 DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
323 nt_errstr(status)));
324 return status;
328 return NT_STATUS_OK;
333 * Register a dispatch function for a particular message type. Allow multiple
334 * registrants
336 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
337 void *private_data,
338 uint32_t msg_type,
339 void (*fn)(struct messaging_context *msg,
340 void *private_data,
341 uint32_t msg_type,
342 struct server_id server_id,
343 DATA_BLOB *data))
345 struct messaging_callback *cb;
347 DEBUG(5, ("Registering messaging pointer for type %u - "
348 "private_data=%p\n",
349 (unsigned)msg_type, private_data));
352 * Only one callback per type
355 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
356 /* we allow a second registration of the same message
357 type if it has a different private pointer. This is
358 needed in, for example, the internal notify code,
359 which creates a new notify context for each tree
360 connect, and expects to receive messages to each of
361 them. */
362 if (cb->msg_type == msg_type && private_data == cb->private_data) {
363 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
364 (unsigned)msg_type, private_data));
365 cb->fn = fn;
366 cb->private_data = private_data;
367 return NT_STATUS_OK;
371 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
372 return NT_STATUS_NO_MEMORY;
375 cb->msg_type = msg_type;
376 cb->fn = fn;
377 cb->private_data = private_data;
379 DLIST_ADD(msg_ctx->callbacks, cb);
380 return NT_STATUS_OK;
384 De-register the function for a particular message type.
386 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
387 void *private_data)
389 struct messaging_callback *cb, *next;
391 for (cb = ctx->callbacks; cb; cb = next) {
392 next = cb->next;
393 if ((cb->msg_type == msg_type)
394 && (cb->private_data == private_data)) {
395 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
396 (unsigned)msg_type, private_data));
397 DLIST_REMOVE(ctx->callbacks, cb);
398 TALLOC_FREE(cb);
403 static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
404 const struct server_id *dst)
406 return ((msg_ctx->id.vnn == dst->vnn) &&
407 (msg_ctx->id.pid == dst->pid));
411 Send a message to a particular server
413 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
414 struct server_id server, uint32_t msg_type,
415 const DATA_BLOB *data)
417 struct iovec iov;
419 iov.iov_base = data->data;
420 iov.iov_len = data->length;
422 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1);
425 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
426 struct server_id server, uint32_t msg_type,
427 const uint8_t *buf, size_t len)
429 DATA_BLOB blob = data_blob_const(buf, len);
430 return messaging_send(msg_ctx, server, msg_type, &blob);
433 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
434 struct server_id server, uint32_t msg_type,
435 const struct iovec *iov, int iovlen)
437 int ret;
439 if (server_id_is_disconnected(&server)) {
440 return NT_STATUS_INVALID_PARAMETER_MIX;
443 if (!procid_is_local(&server)) {
444 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
445 msg_type, iov, iovlen,
446 msg_ctx->remote);
447 if (ret != 0) {
448 return map_nt_error_from_unix(ret);
450 return NT_STATUS_OK;
453 if (messaging_is_self_send(msg_ctx, &server)) {
454 struct messaging_rec rec;
455 uint8_t *buf;
457 buf = iov_buf(talloc_tos(), iov, iovlen);
458 if (buf == NULL) {
459 return NT_STATUS_NO_MEMORY;
462 rec.msg_version = MESSAGE_VERSION;
463 rec.msg_type = msg_type & MSG_TYPE_MASK;
464 rec.dest = server;
465 rec.src = msg_ctx->id;
466 rec.buf = data_blob_const(buf, talloc_get_size(buf));
467 messaging_dispatch_rec(msg_ctx, &rec);
468 TALLOC_FREE(buf);
469 return NT_STATUS_OK;
472 ret = messaging_dgm_send(msg_ctx->local, msg_ctx->id, server, msg_type,
473 iov, iovlen);
474 if (ret != 0) {
475 return map_nt_error_from_unix(ret);
477 return NT_STATUS_OK;
480 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
481 struct messaging_rec *rec)
483 struct messaging_rec *result;
485 result = talloc_pooled_object(mem_ctx, struct messaging_rec,
486 1, rec->buf.length);
487 if (result == NULL) {
488 return NULL;
490 *result = *rec;
492 /* Doesn't fail, see talloc_pooled_object */
494 result->buf.data = talloc_memdup(result, rec->buf.data,
495 rec->buf.length);
496 return result;
/*
 * State of one messaging_filtered_read_send() request.
 */
struct messaging_filtered_read_state {
	/* Event context the request was created with. */
	struct tevent_context *ev;
	/* Messaging context the request waits on. */
	struct messaging_context *msg_ctx;
	/* Handle from messaging_dgm_register_tevent_context(). */
	void *tevent_handle;

	/* Predicate choosing the records this request accepts. */
	bool (*filter)(struct messaging_rec *rec, void *private_data);
	/* Opaque pointer handed to filter. */
	void *private_data;

	/* Copy of the accepted record, filled in on completion. */
	struct messaging_rec *rec;
};

/* tevent cleanup function; removes the request from the waiter arrays. */
static void messaging_filtered_read_cleanup(struct tevent_req *req,
					    enum tevent_req_state req_state);
513 struct tevent_req *messaging_filtered_read_send(
514 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
515 struct messaging_context *msg_ctx,
516 bool (*filter)(struct messaging_rec *rec, void *private_data),
517 void *private_data)
519 struct tevent_req *req;
520 struct messaging_filtered_read_state *state;
521 size_t new_waiters_len;
523 req = tevent_req_create(mem_ctx, &state,
524 struct messaging_filtered_read_state);
525 if (req == NULL) {
526 return NULL;
528 state->ev = ev;
529 state->msg_ctx = msg_ctx;
530 state->filter = filter;
531 state->private_data = private_data;
534 * We have to defer the callback here, as we might be called from
535 * within a different tevent_context than state->ev
537 tevent_req_defer_callback(req, state->ev);
539 state->tevent_handle = messaging_dgm_register_tevent_context(
540 state, msg_ctx->local, ev);
541 if (tevent_req_nomem(state, req)) {
542 return tevent_req_post(req, ev);
546 * We add ourselves to the "new_waiters" array, not the "waiters"
547 * array. If we are called from within messaging_read_done,
548 * messaging_dispatch_rec will be in an active for-loop on
549 * "waiters". We must be careful not to mess with this array, because
550 * it could mean that a single event is being delivered twice.
553 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
555 if (new_waiters_len == msg_ctx->num_new_waiters) {
556 struct tevent_req **tmp;
558 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
559 struct tevent_req *, new_waiters_len+1);
560 if (tevent_req_nomem(tmp, req)) {
561 return tevent_req_post(req, ev);
563 msg_ctx->new_waiters = tmp;
566 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
567 msg_ctx->num_new_waiters += 1;
568 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
570 return req;
573 static void messaging_filtered_read_cleanup(struct tevent_req *req,
574 enum tevent_req_state req_state)
576 struct messaging_filtered_read_state *state = tevent_req_data(
577 req, struct messaging_filtered_read_state);
578 struct messaging_context *msg_ctx = state->msg_ctx;
579 unsigned i;
581 tevent_req_set_cleanup_fn(req, NULL);
583 TALLOC_FREE(state->tevent_handle);
586 * Just set the [new_]waiters entry to NULL, be careful not to mess
587 * with the other "waiters" array contents. We are often called from
588 * within "messaging_dispatch_rec", which loops over
589 * "waiters". Messing with the "waiters" array will mess up that
590 * for-loop.
593 for (i=0; i<msg_ctx->num_waiters; i++) {
594 if (msg_ctx->waiters[i] == req) {
595 msg_ctx->waiters[i] = NULL;
596 return;
600 for (i=0; i<msg_ctx->num_new_waiters; i++) {
601 if (msg_ctx->new_waiters[i] == req) {
602 msg_ctx->new_waiters[i] = NULL;
603 return;
608 static void messaging_filtered_read_done(struct tevent_req *req,
609 struct messaging_rec *rec)
611 struct messaging_filtered_read_state *state = tevent_req_data(
612 req, struct messaging_filtered_read_state);
614 state->rec = messaging_rec_dup(state, rec);
615 if (tevent_req_nomem(state->rec, req)) {
616 return;
618 tevent_req_done(req);
621 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
622 struct messaging_rec **presult)
624 struct messaging_filtered_read_state *state = tevent_req_data(
625 req, struct messaging_filtered_read_state);
626 int err;
628 if (tevent_req_is_unix_error(req, &err)) {
629 tevent_req_received(req);
630 return err;
632 *presult = talloc_move(mem_ctx, &state->rec);
633 return 0;
/*
 * State of one messaging_read_send() request.
 */
struct messaging_read_state {
	/* Message type the caller is waiting for. */
	uint32_t msg_type;
	/* Received record, filled in by messaging_read_done(). */
	struct messaging_rec *rec;
};

/* Filter accepting records whose type equals state->msg_type. */
static bool messaging_read_filter(struct messaging_rec *rec,
				  void *private_data);
/* Completion callback of the filtered-read subrequest. */
static void messaging_read_done(struct tevent_req *subreq);
645 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
646 struct tevent_context *ev,
647 struct messaging_context *msg,
648 uint32_t msg_type)
650 struct tevent_req *req, *subreq;
651 struct messaging_read_state *state;
653 req = tevent_req_create(mem_ctx, &state,
654 struct messaging_read_state);
655 if (req == NULL) {
656 return NULL;
658 state->msg_type = msg_type;
660 subreq = messaging_filtered_read_send(state, ev, msg,
661 messaging_read_filter, state);
662 if (tevent_req_nomem(subreq, req)) {
663 return tevent_req_post(req, ev);
665 tevent_req_set_callback(subreq, messaging_read_done, req);
666 return req;
669 static bool messaging_read_filter(struct messaging_rec *rec,
670 void *private_data)
672 struct messaging_read_state *state = talloc_get_type_abort(
673 private_data, struct messaging_read_state);
675 return rec->msg_type == state->msg_type;
678 static void messaging_read_done(struct tevent_req *subreq)
680 struct tevent_req *req = tevent_req_callback_data(
681 subreq, struct tevent_req);
682 struct messaging_read_state *state = tevent_req_data(
683 req, struct messaging_read_state);
684 int ret;
686 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
687 TALLOC_FREE(subreq);
688 if (tevent_req_error(req, ret)) {
689 return;
691 tevent_req_done(req);
694 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
695 struct messaging_rec **presult)
697 struct messaging_read_state *state = tevent_req_data(
698 req, struct messaging_read_state);
699 int err;
701 if (tevent_req_is_unix_error(req, &err)) {
702 return err;
704 if (presult != NULL) {
705 *presult = talloc_move(mem_ctx, &state->rec);
707 return 0;
710 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
712 if (msg_ctx->num_new_waiters == 0) {
713 return true;
716 if (talloc_array_length(msg_ctx->waiters) <
717 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
718 struct tevent_req **tmp;
719 tmp = talloc_realloc(
720 msg_ctx, msg_ctx->waiters, struct tevent_req *,
721 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
722 if (tmp == NULL) {
723 DEBUG(1, ("%s: talloc failed\n", __func__));
724 return false;
726 msg_ctx->waiters = tmp;
729 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
730 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
732 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
733 msg_ctx->num_new_waiters = 0;
735 return true;
738 struct messaging_defer_callback_state {
739 struct messaging_context *msg_ctx;
740 struct messaging_rec *rec;
741 void (*fn)(struct messaging_context *msg, void *private_data,
742 uint32_t msg_type, struct server_id server_id,
743 DATA_BLOB *data);
744 void *private_data;
747 static void messaging_defer_callback_trigger(struct tevent_context *ev,
748 struct tevent_immediate *im,
749 void *private_data);
751 static void messaging_defer_callback(
752 struct messaging_context *msg_ctx, struct messaging_rec *rec,
753 void (*fn)(struct messaging_context *msg, void *private_data,
754 uint32_t msg_type, struct server_id server_id,
755 DATA_BLOB *data),
756 void *private_data)
758 struct messaging_defer_callback_state *state;
759 struct tevent_immediate *im;
761 state = talloc(msg_ctx, struct messaging_defer_callback_state);
762 if (state == NULL) {
763 DEBUG(1, ("talloc failed\n"));
764 return;
766 state->msg_ctx = msg_ctx;
767 state->fn = fn;
768 state->private_data = private_data;
770 state->rec = messaging_rec_dup(state, rec);
771 if (state->rec == NULL) {
772 DEBUG(1, ("talloc failed\n"));
773 TALLOC_FREE(state);
774 return;
777 im = tevent_create_immediate(state);
778 if (im == NULL) {
779 DEBUG(1, ("tevent_create_immediate failed\n"));
780 TALLOC_FREE(state);
781 return;
783 tevent_schedule_immediate(im, msg_ctx->event_ctx,
784 messaging_defer_callback_trigger, state);
787 static void messaging_defer_callback_trigger(struct tevent_context *ev,
788 struct tevent_immediate *im,
789 void *private_data)
791 struct messaging_defer_callback_state *state = talloc_get_type_abort(
792 private_data, struct messaging_defer_callback_state);
793 struct messaging_rec *rec = state->rec;
795 state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
796 &rec->buf);
797 TALLOC_FREE(state);
801 Dispatch one messaging_rec
803 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
804 struct messaging_rec *rec)
806 struct messaging_callback *cb, *next;
807 unsigned i;
809 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
810 next = cb->next;
811 if (cb->msg_type != rec->msg_type) {
812 continue;
815 if (messaging_is_self_send(msg_ctx, &rec->dest)) {
817 * This is a self-send. We are called here from
818 * messaging_send(), and we don't want to directly
819 * recurse into the callback but go via a
820 * tevent_loop_once
822 messaging_defer_callback(msg_ctx, rec, cb->fn,
823 cb->private_data);
824 } else {
826 * This comes from a different process. we are called
827 * from the event loop, so we should call back
828 * directly.
830 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
831 rec->src, &rec->buf);
834 * we continue looking for matching messages after finding
835 * one. This matters for subsystems like the internal notify
836 * code which register more than one handler for the same
837 * message type
841 if (!messaging_append_new_waiters(msg_ctx)) {
842 return;
845 i = 0;
846 while (i < msg_ctx->num_waiters) {
847 struct tevent_req *req;
848 struct messaging_filtered_read_state *state;
850 req = msg_ctx->waiters[i];
851 if (req == NULL) {
853 * This got cleaned up. In the meantime,
854 * move everything down one. We need
855 * to keep the order of waiters, as
856 * other code may depend on this.
858 if (i < msg_ctx->num_waiters - 1) {
859 memmove(&msg_ctx->waiters[i],
860 &msg_ctx->waiters[i+1],
861 sizeof(struct tevent_req *) *
862 (msg_ctx->num_waiters - i - 1));
864 msg_ctx->num_waiters -= 1;
865 continue;
868 state = tevent_req_data(
869 req, struct messaging_filtered_read_state);
870 if (state->filter(rec, state->private_data)) {
871 messaging_filtered_read_done(req, rec);
874 i += 1;
878 static int mess_parent_dgm_cleanup(void *private_data);
879 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
881 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
883 struct tevent_req *req;
885 req = background_job_send(
886 msg, msg->event_ctx, msg, NULL, 0,
887 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
888 60*15),
889 mess_parent_dgm_cleanup, msg);
890 if (req == NULL) {
891 return false;
893 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
894 return true;
897 static int mess_parent_dgm_cleanup(void *private_data)
899 struct messaging_context *msg_ctx = talloc_get_type_abort(
900 private_data, struct messaging_context);
901 int ret;
903 ret = messaging_dgm_wipe(msg_ctx->local);
904 DEBUG(10, ("messaging_dgm_wipe returned %s\n",
905 ret ? strerror(ret) : "ok"));
906 return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
907 60*15);
910 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
912 struct messaging_context *msg = tevent_req_callback_data(
913 req, struct messaging_context);
914 NTSTATUS status;
916 status = background_job_recv(req);
917 TALLOC_FREE(req);
918 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
919 nt_errstr(status)));
921 req = background_job_send(
922 msg, msg->event_ctx, msg, NULL, 0,
923 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
924 60*15),
925 mess_parent_dgm_cleanup, msg);
926 if (req == NULL) {
927 DEBUG(1, ("background_job_send failed\n"));
929 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
932 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
934 int ret;
936 if (pid == 0) {
937 ret = messaging_dgm_wipe(msg_ctx->local);
938 } else {
939 ret = messaging_dgm_cleanup(msg_ctx->local, pid);
942 return ret;
945 struct tevent_context *messaging_tevent_context(
946 struct messaging_context *msg_ctx)
948 return msg_ctx->event_ctx;
951 /** @} **/