smbd: Use full_path_tos() where appropriate
[Samba/wip.git] / source3 / lib / messages.c
blob1263bf1698c15c07b010d35b0e82acf6d854fd82
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
55 struct messaging_callback {
56 struct messaging_callback *prev, *next;
57 uint32 msg_type;
58 void (*fn)(struct messaging_context *msg, void *private_data,
59 uint32_t msg_type,
60 struct server_id server_id, DATA_BLOB *data);
61 void *private_data;
64 struct messaging_context {
65 struct server_id id;
66 struct tevent_context *event_ctx;
67 struct messaging_callback *callbacks;
69 struct tevent_req **new_waiters;
70 unsigned num_new_waiters;
72 struct tevent_req **waiters;
73 unsigned num_waiters;
75 struct messaging_backend *local;
76 struct messaging_backend *remote;
78 bool *have_context;
81 static int messaging_context_destructor(struct messaging_context *msg_ctx);
83 /****************************************************************************
84 A useful function for testing the message system.
85 ****************************************************************************/
87 static void ping_message(struct messaging_context *msg_ctx,
88 void *private_data,
89 uint32_t msg_type,
90 struct server_id src,
91 DATA_BLOB *data)
93 const char *msg = "none";
94 char *free_me = NULL;
96 if (data->data != NULL) {
97 free_me = talloc_strndup(talloc_tos(), (char *)data->data,
98 data->length);
99 msg = free_me;
101 DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
102 procid_str_static(&src), msg));
103 TALLOC_FREE(free_me);
104 messaging_send(msg_ctx, src, MSG_PONG, data);
107 /****************************************************************************
108 Register/replace a dispatch function for a particular message type.
109 JRA changed Dec 13 2006. Only one message handler now permitted per type.
110 *NOTE*: Dispatch functions must be able to cope with incoming
111 messages on an *odd* byte boundary.
112 ****************************************************************************/
114 struct msg_all {
115 struct messaging_context *msg_ctx;
116 int msg_type;
117 uint32 msg_flag;
118 const void *buf;
119 size_t len;
120 int n_sent;
123 /****************************************************************************
124 Send one of the messages for the broadcast.
125 ****************************************************************************/
127 static int traverse_fn(struct db_record *rec, const struct server_id *id,
128 uint32_t msg_flags, void *state)
130 struct msg_all *msg_all = (struct msg_all *)state;
131 NTSTATUS status;
133 /* Don't send if the receiver hasn't registered an interest. */
135 if((msg_flags & msg_all->msg_flag) == 0) {
136 return 0;
139 /* If the msg send fails because the pid was not found (i.e. smbd died),
140 * the msg has already been deleted from the messages.tdb.*/
142 status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
143 (const uint8_t *)msg_all->buf, msg_all->len);
145 if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
148 * If the pid was not found delete the entry from
149 * serverid.tdb
152 DEBUG(2, ("pid %s doesn't exist\n", procid_str_static(id)));
154 dbwrap_record_delete(rec);
156 msg_all->n_sent++;
157 return 0;
161 * Send a message to all smbd processes.
163 * It isn't very efficient, but should be OK for the sorts of
164 * applications that use it. When we need efficient broadcast we can add
165 * it.
167 * @param n_sent Set to the number of messages sent. This should be
168 * equal to the number of processes, but be careful for races.
170 * @retval True for success.
172 bool message_send_all(struct messaging_context *msg_ctx,
173 int msg_type,
174 const void *buf, size_t len,
175 int *n_sent)
177 struct msg_all msg_all;
179 msg_all.msg_type = msg_type;
180 if (msg_type < 0x100) {
181 msg_all.msg_flag = FLAG_MSG_GENERAL;
182 } else if (msg_type > 0x100 && msg_type < 0x200) {
183 msg_all.msg_flag = FLAG_MSG_NMBD;
184 } else if (msg_type > 0x200 && msg_type < 0x300) {
185 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
186 } else if (msg_type > 0x300 && msg_type < 0x400) {
187 msg_all.msg_flag = FLAG_MSG_SMBD;
188 } else if (msg_type > 0x400 && msg_type < 0x600) {
189 msg_all.msg_flag = FLAG_MSG_WINBIND;
190 } else if (msg_type > 4000 && msg_type < 5000) {
191 msg_all.msg_flag = FLAG_MSG_DBWRAP;
192 } else {
193 return false;
196 msg_all.buf = buf;
197 msg_all.len = len;
198 msg_all.n_sent = 0;
199 msg_all.msg_ctx = msg_ctx;
201 serverid_traverse(traverse_fn, &msg_all);
202 if (n_sent)
203 *n_sent = msg_all.n_sent;
204 return true;
207 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
208 struct tevent_context *ev)
210 struct messaging_context *ctx;
211 NTSTATUS status;
212 static bool have_context = false;
214 if (have_context) {
215 DEBUG(0, ("No two messaging contexts per process\n"));
216 return NULL;
220 if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
221 return NULL;
224 ctx->id = procid_self();
225 ctx->event_ctx = ev;
226 ctx->have_context = &have_context;
228 status = messaging_dgm_init(ctx, ctx, &ctx->local);
230 if (!NT_STATUS_IS_OK(status)) {
231 DEBUG(2, ("messaging_dgm_init failed: %s\n",
232 nt_errstr(status)));
233 TALLOC_FREE(ctx);
234 return NULL;
237 if (lp_clustering()) {
238 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
240 if (!NT_STATUS_IS_OK(status)) {
241 DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
242 nt_errstr(status)));
243 TALLOC_FREE(ctx);
244 return NULL;
247 ctx->id.vnn = get_my_vnn();
249 messaging_register(ctx, NULL, MSG_PING, ping_message);
251 /* Register some debugging related messages */
253 register_msg_pool_usage(ctx);
254 register_dmalloc_msgs(ctx);
255 debug_register_msgs(ctx);
257 have_context = true;
258 talloc_set_destructor(ctx, messaging_context_destructor);
260 return ctx;
263 static int messaging_context_destructor(struct messaging_context *msg_ctx)
265 SMB_ASSERT(*msg_ctx->have_context);
266 *msg_ctx->have_context = false;
267 return 0;
270 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
272 return msg_ctx->id;
276 * re-init after a fork
278 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
280 NTSTATUS status;
282 TALLOC_FREE(msg_ctx->local);
284 msg_ctx->id = procid_self();
286 status = messaging_dgm_init(msg_ctx, msg_ctx, &msg_ctx->local);
287 if (!NT_STATUS_IS_OK(status)) {
288 DEBUG(0, ("messaging_dgm_init failed: %s\n",
289 nt_errstr(status)));
290 return status;
293 TALLOC_FREE(msg_ctx->remote);
295 if (lp_clustering()) {
296 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
297 &msg_ctx->remote);
299 if (!NT_STATUS_IS_OK(status)) {
300 DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
301 nt_errstr(status)));
302 return status;
306 return NT_STATUS_OK;
311 * Register a dispatch function for a particular message type. Allow multiple
312 * registrants
314 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
315 void *private_data,
316 uint32_t msg_type,
317 void (*fn)(struct messaging_context *msg,
318 void *private_data,
319 uint32_t msg_type,
320 struct server_id server_id,
321 DATA_BLOB *data))
323 struct messaging_callback *cb;
325 DEBUG(5, ("Registering messaging pointer for type %u - "
326 "private_data=%p\n",
327 (unsigned)msg_type, private_data));
330 * Only one callback per type
333 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
334 /* we allow a second registration of the same message
335 type if it has a different private pointer. This is
336 needed in, for example, the internal notify code,
337 which creates a new notify context for each tree
338 connect, and expects to receive messages to each of
339 them. */
340 if (cb->msg_type == msg_type && private_data == cb->private_data) {
341 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
342 (unsigned)msg_type, private_data));
343 cb->fn = fn;
344 cb->private_data = private_data;
345 return NT_STATUS_OK;
349 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
350 return NT_STATUS_NO_MEMORY;
353 cb->msg_type = msg_type;
354 cb->fn = fn;
355 cb->private_data = private_data;
357 DLIST_ADD(msg_ctx->callbacks, cb);
358 return NT_STATUS_OK;
362 De-register the function for a particular message type.
364 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
365 void *private_data)
367 struct messaging_callback *cb, *next;
369 for (cb = ctx->callbacks; cb; cb = next) {
370 next = cb->next;
371 if ((cb->msg_type == msg_type)
372 && (cb->private_data == private_data)) {
373 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
374 (unsigned)msg_type, private_data));
375 DLIST_REMOVE(ctx->callbacks, cb);
376 TALLOC_FREE(cb);
381 static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
382 const struct server_id *dst)
384 return ((msg_ctx->id.vnn == dst->vnn) &&
385 (msg_ctx->id.pid == dst->pid));
389 Send a message to a particular server
391 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
392 struct server_id server, uint32_t msg_type,
393 const DATA_BLOB *data)
395 struct iovec iov;
397 iov.iov_base = data->data;
398 iov.iov_len = data->length;
400 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1);
403 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
404 struct server_id server, uint32_t msg_type,
405 const uint8_t *buf, size_t len)
407 DATA_BLOB blob = data_blob_const(buf, len);
408 return messaging_send(msg_ctx, server, msg_type, &blob);
411 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
412 struct server_id server, uint32_t msg_type,
413 const struct iovec *iov, int iovlen)
415 if (server_id_is_disconnected(&server)) {
416 return NT_STATUS_INVALID_PARAMETER_MIX;
419 if (!procid_is_local(&server)) {
420 return msg_ctx->remote->send_fn(msg_ctx->id, server,
421 msg_type, iov, iovlen,
422 msg_ctx->remote);
425 if (messaging_is_self_send(msg_ctx, &server)) {
426 struct messaging_rec rec;
427 uint8_t *buf;
428 DATA_BLOB data;
430 buf = iov_buf(talloc_tos(), iov, iovlen);
431 if (buf == NULL) {
432 return NT_STATUS_NO_MEMORY;
435 data = data_blob_const(buf, talloc_get_size(buf));
437 rec.msg_version = MESSAGE_VERSION;
438 rec.msg_type = msg_type & MSG_TYPE_MASK;
439 rec.dest = server;
440 rec.src = msg_ctx->id;
441 rec.buf = data;
442 messaging_dispatch_rec(msg_ctx, &rec);
443 TALLOC_FREE(buf);
444 return NT_STATUS_OK;
447 return msg_ctx->local->send_fn(msg_ctx->id, server, msg_type,
448 iov, iovlen, msg_ctx->local);
451 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
452 struct messaging_rec *rec)
454 struct messaging_rec *result;
456 result = talloc_pooled_object(mem_ctx, struct messaging_rec,
457 1, rec->buf.length);
458 if (result == NULL) {
459 return NULL;
461 *result = *rec;
463 /* Doesn't fail, see talloc_pooled_object */
465 result->buf.data = talloc_memdup(result, rec->buf.data,
466 rec->buf.length);
467 return result;
470 struct messaging_filtered_read_state {
471 struct tevent_context *ev;
472 struct messaging_context *msg_ctx;
473 void *tevent_handle;
475 bool (*filter)(struct messaging_rec *rec, void *private_data);
476 void *private_data;
478 struct messaging_rec *rec;
481 static void messaging_filtered_read_cleanup(struct tevent_req *req,
482 enum tevent_req_state req_state);
484 struct tevent_req *messaging_filtered_read_send(
485 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
486 struct messaging_context *msg_ctx,
487 bool (*filter)(struct messaging_rec *rec, void *private_data),
488 void *private_data)
490 struct tevent_req *req;
491 struct messaging_filtered_read_state *state;
492 size_t new_waiters_len;
494 req = tevent_req_create(mem_ctx, &state,
495 struct messaging_filtered_read_state);
496 if (req == NULL) {
497 return NULL;
499 state->ev = ev;
500 state->msg_ctx = msg_ctx;
501 state->filter = filter;
502 state->private_data = private_data;
505 * We have to defer the callback here, as we might be called from
506 * within a different tevent_context than state->ev
508 tevent_req_defer_callback(req, state->ev);
510 state->tevent_handle = messaging_dgm_register_tevent_context(
511 state, msg_ctx, ev);
512 if (tevent_req_nomem(state, req)) {
513 return tevent_req_post(req, ev);
517 * We add ourselves to the "new_waiters" array, not the "waiters"
518 * array. If we are called from within messaging_read_done,
519 * messaging_dispatch_rec will be in an active for-loop on
520 * "waiters". We must be careful not to mess with this array, because
521 * it could mean that a single event is being delivered twice.
524 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
526 if (new_waiters_len == msg_ctx->num_new_waiters) {
527 struct tevent_req **tmp;
529 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
530 struct tevent_req *, new_waiters_len+1);
531 if (tevent_req_nomem(tmp, req)) {
532 return tevent_req_post(req, ev);
534 msg_ctx->new_waiters = tmp;
537 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
538 msg_ctx->num_new_waiters += 1;
539 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
541 return req;
544 static void messaging_filtered_read_cleanup(struct tevent_req *req,
545 enum tevent_req_state req_state)
547 struct messaging_filtered_read_state *state = tevent_req_data(
548 req, struct messaging_filtered_read_state);
549 struct messaging_context *msg_ctx = state->msg_ctx;
550 unsigned i;
552 tevent_req_set_cleanup_fn(req, NULL);
554 TALLOC_FREE(state->tevent_handle);
557 * Just set the [new_]waiters entry to NULL, be careful not to mess
558 * with the other "waiters" array contents. We are often called from
559 * within "messaging_dispatch_rec", which loops over
560 * "waiters". Messing with the "waiters" array will mess up that
561 * for-loop.
564 for (i=0; i<msg_ctx->num_waiters; i++) {
565 if (msg_ctx->waiters[i] == req) {
566 msg_ctx->waiters[i] = NULL;
567 return;
571 for (i=0; i<msg_ctx->num_new_waiters; i++) {
572 if (msg_ctx->new_waiters[i] == req) {
573 msg_ctx->new_waiters[i] = NULL;
574 return;
579 static void messaging_filtered_read_done(struct tevent_req *req,
580 struct messaging_rec *rec)
582 struct messaging_filtered_read_state *state = tevent_req_data(
583 req, struct messaging_filtered_read_state);
585 state->rec = messaging_rec_dup(state, rec);
586 if (tevent_req_nomem(state->rec, req)) {
587 return;
589 tevent_req_done(req);
592 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
593 struct messaging_rec **presult)
595 struct messaging_filtered_read_state *state = tevent_req_data(
596 req, struct messaging_filtered_read_state);
597 int err;
599 if (tevent_req_is_unix_error(req, &err)) {
600 tevent_req_received(req);
601 return err;
603 *presult = talloc_move(mem_ctx, &state->rec);
604 return 0;
607 struct messaging_read_state {
608 uint32_t msg_type;
609 struct messaging_rec *rec;
612 static bool messaging_read_filter(struct messaging_rec *rec,
613 void *private_data);
614 static void messaging_read_done(struct tevent_req *subreq);
616 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
617 struct tevent_context *ev,
618 struct messaging_context *msg,
619 uint32_t msg_type)
621 struct tevent_req *req, *subreq;
622 struct messaging_read_state *state;
624 req = tevent_req_create(mem_ctx, &state,
625 struct messaging_read_state);
626 if (req == NULL) {
627 return NULL;
629 state->msg_type = msg_type;
631 subreq = messaging_filtered_read_send(state, ev, msg,
632 messaging_read_filter, state);
633 if (tevent_req_nomem(subreq, req)) {
634 return tevent_req_post(req, ev);
636 tevent_req_set_callback(subreq, messaging_read_done, req);
637 return req;
640 static bool messaging_read_filter(struct messaging_rec *rec,
641 void *private_data)
643 struct messaging_read_state *state = talloc_get_type_abort(
644 private_data, struct messaging_read_state);
646 return rec->msg_type == state->msg_type;
649 static void messaging_read_done(struct tevent_req *subreq)
651 struct tevent_req *req = tevent_req_callback_data(
652 subreq, struct tevent_req);
653 struct messaging_read_state *state = tevent_req_data(
654 req, struct messaging_read_state);
655 int ret;
657 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
658 TALLOC_FREE(subreq);
659 if (tevent_req_error(req, ret)) {
660 return;
662 tevent_req_done(req);
665 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
666 struct messaging_rec **presult)
668 struct messaging_read_state *state = tevent_req_data(
669 req, struct messaging_read_state);
670 int err;
672 if (tevent_req_is_unix_error(req, &err)) {
673 return err;
675 if (presult != NULL) {
676 *presult = talloc_move(mem_ctx, &state->rec);
678 return 0;
681 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
683 if (msg_ctx->num_new_waiters == 0) {
684 return true;
687 if (talloc_array_length(msg_ctx->waiters) <
688 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
689 struct tevent_req **tmp;
690 tmp = talloc_realloc(
691 msg_ctx, msg_ctx->waiters, struct tevent_req *,
692 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
693 if (tmp == NULL) {
694 DEBUG(1, ("%s: talloc failed\n", __func__));
695 return false;
697 msg_ctx->waiters = tmp;
700 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
701 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
703 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
704 msg_ctx->num_new_waiters = 0;
706 return true;
709 struct messaging_defer_callback_state {
710 struct messaging_context *msg_ctx;
711 struct messaging_rec *rec;
712 void (*fn)(struct messaging_context *msg, void *private_data,
713 uint32_t msg_type, struct server_id server_id,
714 DATA_BLOB *data);
715 void *private_data;
718 static void messaging_defer_callback_trigger(struct tevent_context *ev,
719 struct tevent_immediate *im,
720 void *private_data);
722 static void messaging_defer_callback(
723 struct messaging_context *msg_ctx, struct messaging_rec *rec,
724 void (*fn)(struct messaging_context *msg, void *private_data,
725 uint32_t msg_type, struct server_id server_id,
726 DATA_BLOB *data),
727 void *private_data)
729 struct messaging_defer_callback_state *state;
730 struct tevent_immediate *im;
732 state = talloc(msg_ctx, struct messaging_defer_callback_state);
733 if (state == NULL) {
734 DEBUG(1, ("talloc failed\n"));
735 return;
737 state->msg_ctx = msg_ctx;
738 state->fn = fn;
739 state->private_data = private_data;
741 state->rec = messaging_rec_dup(state, rec);
742 if (state->rec == NULL) {
743 DEBUG(1, ("talloc failed\n"));
744 TALLOC_FREE(state);
745 return;
748 im = tevent_create_immediate(state);
749 if (im == NULL) {
750 DEBUG(1, ("tevent_create_immediate failed\n"));
751 TALLOC_FREE(state);
752 return;
754 tevent_schedule_immediate(im, msg_ctx->event_ctx,
755 messaging_defer_callback_trigger, state);
758 static void messaging_defer_callback_trigger(struct tevent_context *ev,
759 struct tevent_immediate *im,
760 void *private_data)
762 struct messaging_defer_callback_state *state = talloc_get_type_abort(
763 private_data, struct messaging_defer_callback_state);
764 struct messaging_rec *rec = state->rec;
766 state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
767 &rec->buf);
768 TALLOC_FREE(state);
772 Dispatch one messaging_rec
774 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
775 struct messaging_rec *rec)
777 struct messaging_callback *cb, *next;
778 unsigned i;
780 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
781 next = cb->next;
782 if (cb->msg_type != rec->msg_type) {
783 continue;
786 if (messaging_is_self_send(msg_ctx, &rec->dest)) {
788 * This is a self-send. We are called here from
789 * messaging_send(), and we don't want to directly
790 * recurse into the callback but go via a
791 * tevent_loop_once
793 messaging_defer_callback(msg_ctx, rec, cb->fn,
794 cb->private_data);
795 } else {
797 * This comes from a different process. we are called
798 * from the event loop, so we should call back
799 * directly.
801 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
802 rec->src, &rec->buf);
805 * we continue looking for matching messages after finding
806 * one. This matters for subsystems like the internal notify
807 * code which register more than one handler for the same
808 * message type
812 if (!messaging_append_new_waiters(msg_ctx)) {
813 return;
816 i = 0;
817 while (i < msg_ctx->num_waiters) {
818 struct tevent_req *req;
819 struct messaging_filtered_read_state *state;
821 req = msg_ctx->waiters[i];
822 if (req == NULL) {
824 * This got cleaned up. In the meantime,
825 * move everything down one. We need
826 * to keep the order of waiters, as
827 * other code may depend on this.
829 if (i < msg_ctx->num_waiters - 1) {
830 memmove(&msg_ctx->waiters[i],
831 &msg_ctx->waiters[i+1],
832 sizeof(struct tevent_req *) *
833 (msg_ctx->num_waiters - i - 1));
835 msg_ctx->num_waiters -= 1;
836 continue;
839 state = tevent_req_data(
840 req, struct messaging_filtered_read_state);
841 if (state->filter(rec, state->private_data)) {
842 messaging_filtered_read_done(req, rec);
845 i += 1;
849 static int mess_parent_dgm_cleanup(void *private_data);
850 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
852 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
854 struct tevent_req *req;
856 req = background_job_send(
857 msg, msg->event_ctx, msg, NULL, 0,
858 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
859 60*15),
860 mess_parent_dgm_cleanup, msg);
861 if (req == NULL) {
862 return false;
864 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
865 return true;
868 static int mess_parent_dgm_cleanup(void *private_data)
870 struct messaging_context *msg_ctx = talloc_get_type_abort(
871 private_data, struct messaging_context);
872 NTSTATUS status;
874 status = messaging_dgm_wipe(msg_ctx);
875 DEBUG(10, ("messaging_dgm_wipe returned %s\n", nt_errstr(status)));
876 return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
877 60*15);
880 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
882 struct messaging_context *msg = tevent_req_callback_data(
883 req, struct messaging_context);
884 NTSTATUS status;
886 status = background_job_recv(req);
887 TALLOC_FREE(req);
888 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
889 nt_errstr(status)));
891 req = background_job_send(
892 msg, msg->event_ctx, msg, NULL, 0,
893 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
894 60*15),
895 mess_parent_dgm_cleanup, msg);
896 if (req == NULL) {
897 DEBUG(1, ("background_job_send failed\n"));
899 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
902 struct messaging_backend *messaging_local_backend(
903 struct messaging_context *msg_ctx)
905 return msg_ctx->local;
908 struct tevent_context *messaging_tevent_context(
909 struct messaging_context *msg_ctx)
911 return msg_ctx->event_ctx;
914 /** @} **/