uwrap: Add mutex in uwrap_destructor().
[Samba.git] / source3 / lib / messages.c
blobbbc5183343f5567111be60d8038bff3aa93618a3
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
55 struct messaging_callback {
56 struct messaging_callback *prev, *next;
57 uint32 msg_type;
58 void (*fn)(struct messaging_context *msg, void *private_data,
59 uint32_t msg_type,
60 struct server_id server_id, DATA_BLOB *data);
61 void *private_data;
64 struct messaging_context {
65 struct server_id id;
66 struct tevent_context *event_ctx;
67 struct messaging_callback *callbacks;
69 struct tevent_req **new_waiters;
70 unsigned num_new_waiters;
72 struct tevent_req **waiters;
73 unsigned num_waiters;
75 struct messaging_backend *local;
76 struct messaging_backend *remote;
78 bool *have_context;
81 static int messaging_context_destructor(struct messaging_context *msg_ctx);
83 /****************************************************************************
84 A useful function for testing the message system.
85 ****************************************************************************/
87 static void ping_message(struct messaging_context *msg_ctx,
88 void *private_data,
89 uint32_t msg_type,
90 struct server_id src,
91 DATA_BLOB *data)
93 struct server_id_buf idbuf;
95 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
96 server_id_str_buf(src, &idbuf), (int)data->length,
97 data->data ? (char *)data->data : ""));
99 messaging_send(msg_ctx, src, MSG_PONG, data);
102 /****************************************************************************
103 Register/replace a dispatch function for a particular message type.
104 JRA changed Dec 13 2006. Only one message handler now permitted per type.
105 *NOTE*: Dispatch functions must be able to cope with incoming
106 messages on an *odd* byte boundary.
107 ****************************************************************************/
109 struct msg_all {
110 struct messaging_context *msg_ctx;
111 int msg_type;
112 uint32 msg_flag;
113 const void *buf;
114 size_t len;
115 int n_sent;
118 /****************************************************************************
119 Send one of the messages for the broadcast.
120 ****************************************************************************/
122 static int traverse_fn(struct db_record *rec, const struct server_id *id,
123 uint32_t msg_flags, void *state)
125 struct msg_all *msg_all = (struct msg_all *)state;
126 NTSTATUS status;
128 /* Don't send if the receiver hasn't registered an interest. */
130 if((msg_flags & msg_all->msg_flag) == 0) {
131 return 0;
134 /* If the msg send fails because the pid was not found (i.e. smbd died),
135 * the msg has already been deleted from the messages.tdb.*/
137 status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
138 (const uint8_t *)msg_all->buf, msg_all->len);
140 if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
141 struct server_id_buf idbuf;
144 * If the pid was not found delete the entry from
145 * serverid.tdb
148 DEBUG(2, ("pid %s doesn't exist\n",
149 server_id_str_buf(*id, &idbuf)));
151 dbwrap_record_delete(rec);
153 msg_all->n_sent++;
154 return 0;
158 * Send a message to all smbd processes.
160 * It isn't very efficient, but should be OK for the sorts of
161 * applications that use it. When we need efficient broadcast we can add
162 * it.
164 * @param n_sent Set to the number of messages sent. This should be
165 * equal to the number of processes, but be careful for races.
167 * @retval True for success.
169 bool message_send_all(struct messaging_context *msg_ctx,
170 int msg_type,
171 const void *buf, size_t len,
172 int *n_sent)
174 struct msg_all msg_all;
176 msg_all.msg_type = msg_type;
177 if (msg_type < 0x100) {
178 msg_all.msg_flag = FLAG_MSG_GENERAL;
179 } else if (msg_type > 0x100 && msg_type < 0x200) {
180 msg_all.msg_flag = FLAG_MSG_NMBD;
181 } else if (msg_type > 0x200 && msg_type < 0x300) {
182 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
183 } else if (msg_type > 0x300 && msg_type < 0x400) {
184 msg_all.msg_flag = FLAG_MSG_SMBD;
185 } else if (msg_type > 0x400 && msg_type < 0x600) {
186 msg_all.msg_flag = FLAG_MSG_WINBIND;
187 } else if (msg_type > 4000 && msg_type < 5000) {
188 msg_all.msg_flag = FLAG_MSG_DBWRAP;
189 } else {
190 return false;
193 msg_all.buf = buf;
194 msg_all.len = len;
195 msg_all.n_sent = 0;
196 msg_all.msg_ctx = msg_ctx;
198 serverid_traverse(traverse_fn, &msg_all);
199 if (n_sent)
200 *n_sent = msg_all.n_sent;
201 return true;
204 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
205 struct tevent_context *ev)
207 struct messaging_context *ctx;
208 NTSTATUS status;
209 int ret;
210 static bool have_context = false;
212 if (have_context) {
213 DEBUG(0, ("No two messaging contexts per process\n"));
214 return NULL;
218 if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
219 return NULL;
222 ctx->id = procid_self();
223 ctx->event_ctx = ev;
224 ctx->have_context = &have_context;
226 ret = messaging_dgm_init(ctx, ctx, &ctx->local);
228 if (ret != 0) {
229 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
230 TALLOC_FREE(ctx);
231 return NULL;
234 if (lp_clustering()) {
235 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
237 if (!NT_STATUS_IS_OK(status)) {
238 DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
239 nt_errstr(status)));
240 TALLOC_FREE(ctx);
241 return NULL;
244 ctx->id.vnn = get_my_vnn();
246 messaging_register(ctx, NULL, MSG_PING, ping_message);
248 /* Register some debugging related messages */
250 register_msg_pool_usage(ctx);
251 register_dmalloc_msgs(ctx);
252 debug_register_msgs(ctx);
254 have_context = true;
255 talloc_set_destructor(ctx, messaging_context_destructor);
257 return ctx;
260 static int messaging_context_destructor(struct messaging_context *msg_ctx)
262 SMB_ASSERT(*msg_ctx->have_context);
263 *msg_ctx->have_context = false;
264 return 0;
267 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
269 return msg_ctx->id;
273 * re-init after a fork
275 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
277 NTSTATUS status;
278 int ret;
280 TALLOC_FREE(msg_ctx->local);
282 msg_ctx->id = procid_self();
284 ret = messaging_dgm_init(msg_ctx, msg_ctx, &msg_ctx->local);
285 if (ret != 0) {
286 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
287 return map_nt_error_from_unix(ret);
290 TALLOC_FREE(msg_ctx->remote);
292 if (lp_clustering()) {
293 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
294 &msg_ctx->remote);
296 if (!NT_STATUS_IS_OK(status)) {
297 DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
298 nt_errstr(status)));
299 return status;
303 return NT_STATUS_OK;
308 * Register a dispatch function for a particular message type. Allow multiple
309 * registrants
311 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
312 void *private_data,
313 uint32_t msg_type,
314 void (*fn)(struct messaging_context *msg,
315 void *private_data,
316 uint32_t msg_type,
317 struct server_id server_id,
318 DATA_BLOB *data))
320 struct messaging_callback *cb;
322 DEBUG(5, ("Registering messaging pointer for type %u - "
323 "private_data=%p\n",
324 (unsigned)msg_type, private_data));
327 * Only one callback per type
330 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
331 /* we allow a second registration of the same message
332 type if it has a different private pointer. This is
333 needed in, for example, the internal notify code,
334 which creates a new notify context for each tree
335 connect, and expects to receive messages to each of
336 them. */
337 if (cb->msg_type == msg_type && private_data == cb->private_data) {
338 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
339 (unsigned)msg_type, private_data));
340 cb->fn = fn;
341 cb->private_data = private_data;
342 return NT_STATUS_OK;
346 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
347 return NT_STATUS_NO_MEMORY;
350 cb->msg_type = msg_type;
351 cb->fn = fn;
352 cb->private_data = private_data;
354 DLIST_ADD(msg_ctx->callbacks, cb);
355 return NT_STATUS_OK;
359 De-register the function for a particular message type.
361 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
362 void *private_data)
364 struct messaging_callback *cb, *next;
366 for (cb = ctx->callbacks; cb; cb = next) {
367 next = cb->next;
368 if ((cb->msg_type == msg_type)
369 && (cb->private_data == private_data)) {
370 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
371 (unsigned)msg_type, private_data));
372 DLIST_REMOVE(ctx->callbacks, cb);
373 TALLOC_FREE(cb);
378 static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
379 const struct server_id *dst)
381 return ((msg_ctx->id.vnn == dst->vnn) &&
382 (msg_ctx->id.pid == dst->pid));
386 Send a message to a particular server
388 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
389 struct server_id server, uint32_t msg_type,
390 const DATA_BLOB *data)
392 struct iovec iov;
394 iov.iov_base = data->data;
395 iov.iov_len = data->length;
397 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1);
400 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
401 struct server_id server, uint32_t msg_type,
402 const uint8_t *buf, size_t len)
404 DATA_BLOB blob = data_blob_const(buf, len);
405 return messaging_send(msg_ctx, server, msg_type, &blob);
408 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
409 struct server_id server, uint32_t msg_type,
410 const struct iovec *iov, int iovlen)
412 int ret;
414 if (server_id_is_disconnected(&server)) {
415 return NT_STATUS_INVALID_PARAMETER_MIX;
418 if (!procid_is_local(&server)) {
419 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
420 msg_type, iov, iovlen,
421 msg_ctx->remote);
422 if (ret != 0) {
423 return map_nt_error_from_unix(ret);
425 return NT_STATUS_OK;
428 if (messaging_is_self_send(msg_ctx, &server)) {
429 struct messaging_rec rec;
430 uint8_t *buf;
432 buf = iov_buf(talloc_tos(), iov, iovlen);
433 if (buf == NULL) {
434 return NT_STATUS_NO_MEMORY;
437 rec.msg_version = MESSAGE_VERSION;
438 rec.msg_type = msg_type & MSG_TYPE_MASK;
439 rec.dest = server;
440 rec.src = msg_ctx->id;
441 rec.buf = data_blob_const(buf, talloc_get_size(buf));
442 messaging_dispatch_rec(msg_ctx, &rec);
443 TALLOC_FREE(buf);
444 return NT_STATUS_OK;
447 ret = msg_ctx->local->send_fn(msg_ctx->id, server, msg_type,
448 iov, iovlen, msg_ctx->local);
449 if (ret != 0) {
450 return map_nt_error_from_unix(ret);
452 return NT_STATUS_OK;
455 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
456 struct messaging_rec *rec)
458 struct messaging_rec *result;
460 result = talloc_pooled_object(mem_ctx, struct messaging_rec,
461 1, rec->buf.length);
462 if (result == NULL) {
463 return NULL;
465 *result = *rec;
467 /* Doesn't fail, see talloc_pooled_object */
469 result->buf.data = talloc_memdup(result, rec->buf.data,
470 rec->buf.length);
471 return result;
474 struct messaging_filtered_read_state {
475 struct tevent_context *ev;
476 struct messaging_context *msg_ctx;
477 void *tevent_handle;
479 bool (*filter)(struct messaging_rec *rec, void *private_data);
480 void *private_data;
482 struct messaging_rec *rec;
485 static void messaging_filtered_read_cleanup(struct tevent_req *req,
486 enum tevent_req_state req_state);
488 struct tevent_req *messaging_filtered_read_send(
489 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
490 struct messaging_context *msg_ctx,
491 bool (*filter)(struct messaging_rec *rec, void *private_data),
492 void *private_data)
494 struct tevent_req *req;
495 struct messaging_filtered_read_state *state;
496 size_t new_waiters_len;
498 req = tevent_req_create(mem_ctx, &state,
499 struct messaging_filtered_read_state);
500 if (req == NULL) {
501 return NULL;
503 state->ev = ev;
504 state->msg_ctx = msg_ctx;
505 state->filter = filter;
506 state->private_data = private_data;
509 * We have to defer the callback here, as we might be called from
510 * within a different tevent_context than state->ev
512 tevent_req_defer_callback(req, state->ev);
514 state->tevent_handle = messaging_dgm_register_tevent_context(
515 state, msg_ctx, ev);
516 if (tevent_req_nomem(state, req)) {
517 return tevent_req_post(req, ev);
521 * We add ourselves to the "new_waiters" array, not the "waiters"
522 * array. If we are called from within messaging_read_done,
523 * messaging_dispatch_rec will be in an active for-loop on
524 * "waiters". We must be careful not to mess with this array, because
525 * it could mean that a single event is being delivered twice.
528 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
530 if (new_waiters_len == msg_ctx->num_new_waiters) {
531 struct tevent_req **tmp;
533 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
534 struct tevent_req *, new_waiters_len+1);
535 if (tevent_req_nomem(tmp, req)) {
536 return tevent_req_post(req, ev);
538 msg_ctx->new_waiters = tmp;
541 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
542 msg_ctx->num_new_waiters += 1;
543 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
545 return req;
548 static void messaging_filtered_read_cleanup(struct tevent_req *req,
549 enum tevent_req_state req_state)
551 struct messaging_filtered_read_state *state = tevent_req_data(
552 req, struct messaging_filtered_read_state);
553 struct messaging_context *msg_ctx = state->msg_ctx;
554 unsigned i;
556 tevent_req_set_cleanup_fn(req, NULL);
558 TALLOC_FREE(state->tevent_handle);
561 * Just set the [new_]waiters entry to NULL, be careful not to mess
562 * with the other "waiters" array contents. We are often called from
563 * within "messaging_dispatch_rec", which loops over
564 * "waiters". Messing with the "waiters" array will mess up that
565 * for-loop.
568 for (i=0; i<msg_ctx->num_waiters; i++) {
569 if (msg_ctx->waiters[i] == req) {
570 msg_ctx->waiters[i] = NULL;
571 return;
575 for (i=0; i<msg_ctx->num_new_waiters; i++) {
576 if (msg_ctx->new_waiters[i] == req) {
577 msg_ctx->new_waiters[i] = NULL;
578 return;
583 static void messaging_filtered_read_done(struct tevent_req *req,
584 struct messaging_rec *rec)
586 struct messaging_filtered_read_state *state = tevent_req_data(
587 req, struct messaging_filtered_read_state);
589 state->rec = messaging_rec_dup(state, rec);
590 if (tevent_req_nomem(state->rec, req)) {
591 return;
593 tevent_req_done(req);
596 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
597 struct messaging_rec **presult)
599 struct messaging_filtered_read_state *state = tevent_req_data(
600 req, struct messaging_filtered_read_state);
601 int err;
603 if (tevent_req_is_unix_error(req, &err)) {
604 tevent_req_received(req);
605 return err;
607 *presult = talloc_move(mem_ctx, &state->rec);
608 return 0;
611 struct messaging_read_state {
612 uint32_t msg_type;
613 struct messaging_rec *rec;
616 static bool messaging_read_filter(struct messaging_rec *rec,
617 void *private_data);
618 static void messaging_read_done(struct tevent_req *subreq);
620 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
621 struct tevent_context *ev,
622 struct messaging_context *msg,
623 uint32_t msg_type)
625 struct tevent_req *req, *subreq;
626 struct messaging_read_state *state;
628 req = tevent_req_create(mem_ctx, &state,
629 struct messaging_read_state);
630 if (req == NULL) {
631 return NULL;
633 state->msg_type = msg_type;
635 subreq = messaging_filtered_read_send(state, ev, msg,
636 messaging_read_filter, state);
637 if (tevent_req_nomem(subreq, req)) {
638 return tevent_req_post(req, ev);
640 tevent_req_set_callback(subreq, messaging_read_done, req);
641 return req;
644 static bool messaging_read_filter(struct messaging_rec *rec,
645 void *private_data)
647 struct messaging_read_state *state = talloc_get_type_abort(
648 private_data, struct messaging_read_state);
650 return rec->msg_type == state->msg_type;
653 static void messaging_read_done(struct tevent_req *subreq)
655 struct tevent_req *req = tevent_req_callback_data(
656 subreq, struct tevent_req);
657 struct messaging_read_state *state = tevent_req_data(
658 req, struct messaging_read_state);
659 int ret;
661 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
662 TALLOC_FREE(subreq);
663 if (tevent_req_error(req, ret)) {
664 return;
666 tevent_req_done(req);
669 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
670 struct messaging_rec **presult)
672 struct messaging_read_state *state = tevent_req_data(
673 req, struct messaging_read_state);
674 int err;
676 if (tevent_req_is_unix_error(req, &err)) {
677 return err;
679 if (presult != NULL) {
680 *presult = talloc_move(mem_ctx, &state->rec);
682 return 0;
685 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
687 if (msg_ctx->num_new_waiters == 0) {
688 return true;
691 if (talloc_array_length(msg_ctx->waiters) <
692 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
693 struct tevent_req **tmp;
694 tmp = talloc_realloc(
695 msg_ctx, msg_ctx->waiters, struct tevent_req *,
696 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
697 if (tmp == NULL) {
698 DEBUG(1, ("%s: talloc failed\n", __func__));
699 return false;
701 msg_ctx->waiters = tmp;
704 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
705 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
707 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
708 msg_ctx->num_new_waiters = 0;
710 return true;
713 struct messaging_defer_callback_state {
714 struct messaging_context *msg_ctx;
715 struct messaging_rec *rec;
716 void (*fn)(struct messaging_context *msg, void *private_data,
717 uint32_t msg_type, struct server_id server_id,
718 DATA_BLOB *data);
719 void *private_data;
722 static void messaging_defer_callback_trigger(struct tevent_context *ev,
723 struct tevent_immediate *im,
724 void *private_data);
726 static void messaging_defer_callback(
727 struct messaging_context *msg_ctx, struct messaging_rec *rec,
728 void (*fn)(struct messaging_context *msg, void *private_data,
729 uint32_t msg_type, struct server_id server_id,
730 DATA_BLOB *data),
731 void *private_data)
733 struct messaging_defer_callback_state *state;
734 struct tevent_immediate *im;
736 state = talloc(msg_ctx, struct messaging_defer_callback_state);
737 if (state == NULL) {
738 DEBUG(1, ("talloc failed\n"));
739 return;
741 state->msg_ctx = msg_ctx;
742 state->fn = fn;
743 state->private_data = private_data;
745 state->rec = messaging_rec_dup(state, rec);
746 if (state->rec == NULL) {
747 DEBUG(1, ("talloc failed\n"));
748 TALLOC_FREE(state);
749 return;
752 im = tevent_create_immediate(state);
753 if (im == NULL) {
754 DEBUG(1, ("tevent_create_immediate failed\n"));
755 TALLOC_FREE(state);
756 return;
758 tevent_schedule_immediate(im, msg_ctx->event_ctx,
759 messaging_defer_callback_trigger, state);
762 static void messaging_defer_callback_trigger(struct tevent_context *ev,
763 struct tevent_immediate *im,
764 void *private_data)
766 struct messaging_defer_callback_state *state = talloc_get_type_abort(
767 private_data, struct messaging_defer_callback_state);
768 struct messaging_rec *rec = state->rec;
770 state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
771 &rec->buf);
772 TALLOC_FREE(state);
776 Dispatch one messaging_rec
778 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
779 struct messaging_rec *rec)
781 struct messaging_callback *cb, *next;
782 unsigned i;
784 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
785 next = cb->next;
786 if (cb->msg_type != rec->msg_type) {
787 continue;
790 if (messaging_is_self_send(msg_ctx, &rec->dest)) {
792 * This is a self-send. We are called here from
793 * messaging_send(), and we don't want to directly
794 * recurse into the callback but go via a
795 * tevent_loop_once
797 messaging_defer_callback(msg_ctx, rec, cb->fn,
798 cb->private_data);
799 } else {
801 * This comes from a different process. we are called
802 * from the event loop, so we should call back
803 * directly.
805 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
806 rec->src, &rec->buf);
809 * we continue looking for matching messages after finding
810 * one. This matters for subsystems like the internal notify
811 * code which register more than one handler for the same
812 * message type
816 if (!messaging_append_new_waiters(msg_ctx)) {
817 return;
820 i = 0;
821 while (i < msg_ctx->num_waiters) {
822 struct tevent_req *req;
823 struct messaging_filtered_read_state *state;
825 req = msg_ctx->waiters[i];
826 if (req == NULL) {
828 * This got cleaned up. In the meantime,
829 * move everything down one. We need
830 * to keep the order of waiters, as
831 * other code may depend on this.
833 if (i < msg_ctx->num_waiters - 1) {
834 memmove(&msg_ctx->waiters[i],
835 &msg_ctx->waiters[i+1],
836 sizeof(struct tevent_req *) *
837 (msg_ctx->num_waiters - i - 1));
839 msg_ctx->num_waiters -= 1;
840 continue;
843 state = tevent_req_data(
844 req, struct messaging_filtered_read_state);
845 if (state->filter(rec, state->private_data)) {
846 messaging_filtered_read_done(req, rec);
849 i += 1;
853 static int mess_parent_dgm_cleanup(void *private_data);
854 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
856 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
858 struct tevent_req *req;
860 req = background_job_send(
861 msg, msg->event_ctx, msg, NULL, 0,
862 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
863 60*15),
864 mess_parent_dgm_cleanup, msg);
865 if (req == NULL) {
866 return false;
868 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
869 return true;
872 static int mess_parent_dgm_cleanup(void *private_data)
874 struct messaging_context *msg_ctx = talloc_get_type_abort(
875 private_data, struct messaging_context);
876 int ret;
878 ret = messaging_dgm_wipe(msg_ctx);
879 DEBUG(10, ("messaging_dgm_wipe returned %s\n",
880 ret ? strerror(ret) : "ok"));
881 return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
882 60*15);
885 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
887 struct messaging_context *msg = tevent_req_callback_data(
888 req, struct messaging_context);
889 NTSTATUS status;
891 status = background_job_recv(req);
892 TALLOC_FREE(req);
893 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
894 nt_errstr(status)));
896 req = background_job_send(
897 msg, msg->event_ctx, msg, NULL, 0,
898 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
899 60*15),
900 mess_parent_dgm_cleanup, msg);
901 if (req == NULL) {
902 DEBUG(1, ("background_job_send failed\n"));
904 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
907 struct messaging_backend *messaging_local_backend(
908 struct messaging_context *msg_ctx)
910 return msg_ctx->local;
913 struct tevent_context *messaging_tevent_context(
914 struct messaging_context *msg_ctx)
916 return msg_ctx->event_ctx;
919 /** @} **/