smbd: Restructure brl_conflict_other
[Samba.git] / source3 / lib / messages.c
blob9514392c037340efced925e92e5e1cd0b9b2a026
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
55 struct messaging_callback {
56 struct messaging_callback *prev, *next;
57 uint32 msg_type;
58 void (*fn)(struct messaging_context *msg, void *private_data,
59 uint32_t msg_type,
60 struct server_id server_id, DATA_BLOB *data);
61 void *private_data;
64 struct messaging_context {
65 struct server_id id;
66 struct tevent_context *event_ctx;
67 struct messaging_callback *callbacks;
69 struct tevent_req **new_waiters;
70 unsigned num_new_waiters;
72 struct tevent_req **waiters;
73 unsigned num_waiters;
75 struct messaging_backend *local;
76 struct messaging_backend *remote;
78 bool *have_context;
81 static int messaging_context_destructor(struct messaging_context *msg_ctx);
83 /****************************************************************************
84 A useful function for testing the message system.
85 ****************************************************************************/
87 static void ping_message(struct messaging_context *msg_ctx,
88 void *private_data,
89 uint32_t msg_type,
90 struct server_id src,
91 DATA_BLOB *data)
93 const char *msg = "none";
94 char *free_me = NULL;
96 if (data->data != NULL) {
97 free_me = talloc_strndup(talloc_tos(), (char *)data->data,
98 data->length);
99 msg = free_me;
101 DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
102 procid_str_static(&src), msg));
103 TALLOC_FREE(free_me);
104 messaging_send(msg_ctx, src, MSG_PONG, data);
107 /****************************************************************************
108 Register/replace a dispatch function for a particular message type.
109 JRA changed Dec 13 2006. Only one message handler now permitted per type.
110 *NOTE*: Dispatch functions must be able to cope with incoming
111 messages on an *odd* byte boundary.
112 ****************************************************************************/
114 struct msg_all {
115 struct messaging_context *msg_ctx;
116 int msg_type;
117 uint32 msg_flag;
118 const void *buf;
119 size_t len;
120 int n_sent;
123 /****************************************************************************
124 Send one of the messages for the broadcast.
125 ****************************************************************************/
127 static int traverse_fn(struct db_record *rec, const struct server_id *id,
128 uint32_t msg_flags, void *state)
130 struct msg_all *msg_all = (struct msg_all *)state;
131 NTSTATUS status;
133 /* Don't send if the receiver hasn't registered an interest. */
135 if((msg_flags & msg_all->msg_flag) == 0) {
136 return 0;
139 /* If the msg send fails because the pid was not found (i.e. smbd died),
140 * the msg has already been deleted from the messages.tdb.*/
142 status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
143 (const uint8_t *)msg_all->buf, msg_all->len);
145 if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
148 * If the pid was not found delete the entry from
149 * serverid.tdb
152 DEBUG(2, ("pid %s doesn't exist\n", procid_str_static(id)));
154 dbwrap_record_delete(rec);
156 msg_all->n_sent++;
157 return 0;
161 * Send a message to all smbd processes.
163 * It isn't very efficient, but should be OK for the sorts of
164 * applications that use it. When we need efficient broadcast we can add
165 * it.
167 * @param n_sent Set to the number of messages sent. This should be
168 * equal to the number of processes, but be careful for races.
170 * @retval True for success.
172 bool message_send_all(struct messaging_context *msg_ctx,
173 int msg_type,
174 const void *buf, size_t len,
175 int *n_sent)
177 struct msg_all msg_all;
179 msg_all.msg_type = msg_type;
180 if (msg_type < 0x100) {
181 msg_all.msg_flag = FLAG_MSG_GENERAL;
182 } else if (msg_type > 0x100 && msg_type < 0x200) {
183 msg_all.msg_flag = FLAG_MSG_NMBD;
184 } else if (msg_type > 0x200 && msg_type < 0x300) {
185 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
186 } else if (msg_type > 0x300 && msg_type < 0x400) {
187 msg_all.msg_flag = FLAG_MSG_SMBD;
188 } else if (msg_type > 0x400 && msg_type < 0x600) {
189 msg_all.msg_flag = FLAG_MSG_WINBIND;
190 } else if (msg_type > 4000 && msg_type < 5000) {
191 msg_all.msg_flag = FLAG_MSG_DBWRAP;
192 } else {
193 return false;
196 msg_all.buf = buf;
197 msg_all.len = len;
198 msg_all.n_sent = 0;
199 msg_all.msg_ctx = msg_ctx;
201 serverid_traverse(traverse_fn, &msg_all);
202 if (n_sent)
203 *n_sent = msg_all.n_sent;
204 return true;
207 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
208 struct tevent_context *ev)
210 struct messaging_context *ctx;
211 NTSTATUS status;
212 int ret;
213 static bool have_context = false;
215 if (have_context) {
216 DEBUG(0, ("No two messaging contexts per process\n"));
217 return NULL;
221 if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
222 return NULL;
225 ctx->id = procid_self();
226 ctx->event_ctx = ev;
227 ctx->have_context = &have_context;
229 ret = messaging_dgm_init(ctx, ctx, &ctx->local);
231 if (ret != 0) {
232 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
233 TALLOC_FREE(ctx);
234 return NULL;
237 if (lp_clustering()) {
238 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
240 if (!NT_STATUS_IS_OK(status)) {
241 DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
242 nt_errstr(status)));
243 TALLOC_FREE(ctx);
244 return NULL;
247 ctx->id.vnn = get_my_vnn();
249 messaging_register(ctx, NULL, MSG_PING, ping_message);
251 /* Register some debugging related messages */
253 register_msg_pool_usage(ctx);
254 register_dmalloc_msgs(ctx);
255 debug_register_msgs(ctx);
257 have_context = true;
258 talloc_set_destructor(ctx, messaging_context_destructor);
260 return ctx;
263 static int messaging_context_destructor(struct messaging_context *msg_ctx)
265 SMB_ASSERT(*msg_ctx->have_context);
266 *msg_ctx->have_context = false;
267 return 0;
270 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
272 return msg_ctx->id;
276 * re-init after a fork
278 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
280 NTSTATUS status;
281 int ret;
283 TALLOC_FREE(msg_ctx->local);
285 msg_ctx->id = procid_self();
287 ret = messaging_dgm_init(msg_ctx, msg_ctx, &msg_ctx->local);
288 if (ret != 0) {
289 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
290 return map_nt_error_from_unix(ret);
293 TALLOC_FREE(msg_ctx->remote);
295 if (lp_clustering()) {
296 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
297 &msg_ctx->remote);
299 if (!NT_STATUS_IS_OK(status)) {
300 DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
301 nt_errstr(status)));
302 return status;
306 return NT_STATUS_OK;
311 * Register a dispatch function for a particular message type. Allow multiple
312 * registrants
314 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
315 void *private_data,
316 uint32_t msg_type,
317 void (*fn)(struct messaging_context *msg,
318 void *private_data,
319 uint32_t msg_type,
320 struct server_id server_id,
321 DATA_BLOB *data))
323 struct messaging_callback *cb;
325 DEBUG(5, ("Registering messaging pointer for type %u - "
326 "private_data=%p\n",
327 (unsigned)msg_type, private_data));
330 * Only one callback per type
333 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
334 /* we allow a second registration of the same message
335 type if it has a different private pointer. This is
336 needed in, for example, the internal notify code,
337 which creates a new notify context for each tree
338 connect, and expects to receive messages to each of
339 them. */
340 if (cb->msg_type == msg_type && private_data == cb->private_data) {
341 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
342 (unsigned)msg_type, private_data));
343 cb->fn = fn;
344 cb->private_data = private_data;
345 return NT_STATUS_OK;
349 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
350 return NT_STATUS_NO_MEMORY;
353 cb->msg_type = msg_type;
354 cb->fn = fn;
355 cb->private_data = private_data;
357 DLIST_ADD(msg_ctx->callbacks, cb);
358 return NT_STATUS_OK;
362 De-register the function for a particular message type.
364 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
365 void *private_data)
367 struct messaging_callback *cb, *next;
369 for (cb = ctx->callbacks; cb; cb = next) {
370 next = cb->next;
371 if ((cb->msg_type == msg_type)
372 && (cb->private_data == private_data)) {
373 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
374 (unsigned)msg_type, private_data));
375 DLIST_REMOVE(ctx->callbacks, cb);
376 TALLOC_FREE(cb);
381 static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
382 const struct server_id *dst)
384 return ((msg_ctx->id.vnn == dst->vnn) &&
385 (msg_ctx->id.pid == dst->pid));
389 Send a message to a particular server
391 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
392 struct server_id server, uint32_t msg_type,
393 const DATA_BLOB *data)
395 struct iovec iov;
397 iov.iov_base = data->data;
398 iov.iov_len = data->length;
400 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1);
403 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
404 struct server_id server, uint32_t msg_type,
405 const uint8_t *buf, size_t len)
407 DATA_BLOB blob = data_blob_const(buf, len);
408 return messaging_send(msg_ctx, server, msg_type, &blob);
411 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
412 struct server_id server, uint32_t msg_type,
413 const struct iovec *iov, int iovlen)
415 int ret;
417 if (server_id_is_disconnected(&server)) {
418 return NT_STATUS_INVALID_PARAMETER_MIX;
421 if (!procid_is_local(&server)) {
422 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
423 msg_type, iov, iovlen,
424 msg_ctx->remote);
425 if (ret != 0) {
426 return map_nt_error_from_unix(ret);
428 return NT_STATUS_OK;
431 if (messaging_is_self_send(msg_ctx, &server)) {
432 struct messaging_rec rec;
433 uint8_t *buf;
434 DATA_BLOB data;
436 buf = iov_buf(talloc_tos(), iov, iovlen);
437 if (buf == NULL) {
438 return NT_STATUS_NO_MEMORY;
441 data = data_blob_const(buf, talloc_get_size(buf));
443 rec.msg_version = MESSAGE_VERSION;
444 rec.msg_type = msg_type & MSG_TYPE_MASK;
445 rec.dest = server;
446 rec.src = msg_ctx->id;
447 rec.buf = data;
448 messaging_dispatch_rec(msg_ctx, &rec);
449 TALLOC_FREE(buf);
450 return NT_STATUS_OK;
453 ret = msg_ctx->local->send_fn(msg_ctx->id, server, msg_type,
454 iov, iovlen, msg_ctx->local);
455 if (ret != 0) {
456 return map_nt_error_from_unix(ret);
458 return NT_STATUS_OK;
461 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
462 struct messaging_rec *rec)
464 struct messaging_rec *result;
466 result = talloc_pooled_object(mem_ctx, struct messaging_rec,
467 1, rec->buf.length);
468 if (result == NULL) {
469 return NULL;
471 *result = *rec;
473 /* Doesn't fail, see talloc_pooled_object */
475 result->buf.data = talloc_memdup(result, rec->buf.data,
476 rec->buf.length);
477 return result;
480 struct messaging_filtered_read_state {
481 struct tevent_context *ev;
482 struct messaging_context *msg_ctx;
483 void *tevent_handle;
485 bool (*filter)(struct messaging_rec *rec, void *private_data);
486 void *private_data;
488 struct messaging_rec *rec;
491 static void messaging_filtered_read_cleanup(struct tevent_req *req,
492 enum tevent_req_state req_state);
494 struct tevent_req *messaging_filtered_read_send(
495 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
496 struct messaging_context *msg_ctx,
497 bool (*filter)(struct messaging_rec *rec, void *private_data),
498 void *private_data)
500 struct tevent_req *req;
501 struct messaging_filtered_read_state *state;
502 size_t new_waiters_len;
504 req = tevent_req_create(mem_ctx, &state,
505 struct messaging_filtered_read_state);
506 if (req == NULL) {
507 return NULL;
509 state->ev = ev;
510 state->msg_ctx = msg_ctx;
511 state->filter = filter;
512 state->private_data = private_data;
515 * We have to defer the callback here, as we might be called from
516 * within a different tevent_context than state->ev
518 tevent_req_defer_callback(req, state->ev);
520 state->tevent_handle = messaging_dgm_register_tevent_context(
521 state, msg_ctx, ev);
522 if (tevent_req_nomem(state, req)) {
523 return tevent_req_post(req, ev);
527 * We add ourselves to the "new_waiters" array, not the "waiters"
528 * array. If we are called from within messaging_read_done,
529 * messaging_dispatch_rec will be in an active for-loop on
530 * "waiters". We must be careful not to mess with this array, because
531 * it could mean that a single event is being delivered twice.
534 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
536 if (new_waiters_len == msg_ctx->num_new_waiters) {
537 struct tevent_req **tmp;
539 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
540 struct tevent_req *, new_waiters_len+1);
541 if (tevent_req_nomem(tmp, req)) {
542 return tevent_req_post(req, ev);
544 msg_ctx->new_waiters = tmp;
547 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
548 msg_ctx->num_new_waiters += 1;
549 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
551 return req;
554 static void messaging_filtered_read_cleanup(struct tevent_req *req,
555 enum tevent_req_state req_state)
557 struct messaging_filtered_read_state *state = tevent_req_data(
558 req, struct messaging_filtered_read_state);
559 struct messaging_context *msg_ctx = state->msg_ctx;
560 unsigned i;
562 tevent_req_set_cleanup_fn(req, NULL);
564 TALLOC_FREE(state->tevent_handle);
567 * Just set the [new_]waiters entry to NULL, be careful not to mess
568 * with the other "waiters" array contents. We are often called from
569 * within "messaging_dispatch_rec", which loops over
570 * "waiters". Messing with the "waiters" array will mess up that
571 * for-loop.
574 for (i=0; i<msg_ctx->num_waiters; i++) {
575 if (msg_ctx->waiters[i] == req) {
576 msg_ctx->waiters[i] = NULL;
577 return;
581 for (i=0; i<msg_ctx->num_new_waiters; i++) {
582 if (msg_ctx->new_waiters[i] == req) {
583 msg_ctx->new_waiters[i] = NULL;
584 return;
589 static void messaging_filtered_read_done(struct tevent_req *req,
590 struct messaging_rec *rec)
592 struct messaging_filtered_read_state *state = tevent_req_data(
593 req, struct messaging_filtered_read_state);
595 state->rec = messaging_rec_dup(state, rec);
596 if (tevent_req_nomem(state->rec, req)) {
597 return;
599 tevent_req_done(req);
602 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
603 struct messaging_rec **presult)
605 struct messaging_filtered_read_state *state = tevent_req_data(
606 req, struct messaging_filtered_read_state);
607 int err;
609 if (tevent_req_is_unix_error(req, &err)) {
610 tevent_req_received(req);
611 return err;
613 *presult = talloc_move(mem_ctx, &state->rec);
614 return 0;
617 struct messaging_read_state {
618 uint32_t msg_type;
619 struct messaging_rec *rec;
622 static bool messaging_read_filter(struct messaging_rec *rec,
623 void *private_data);
624 static void messaging_read_done(struct tevent_req *subreq);
626 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
627 struct tevent_context *ev,
628 struct messaging_context *msg,
629 uint32_t msg_type)
631 struct tevent_req *req, *subreq;
632 struct messaging_read_state *state;
634 req = tevent_req_create(mem_ctx, &state,
635 struct messaging_read_state);
636 if (req == NULL) {
637 return NULL;
639 state->msg_type = msg_type;
641 subreq = messaging_filtered_read_send(state, ev, msg,
642 messaging_read_filter, state);
643 if (tevent_req_nomem(subreq, req)) {
644 return tevent_req_post(req, ev);
646 tevent_req_set_callback(subreq, messaging_read_done, req);
647 return req;
650 static bool messaging_read_filter(struct messaging_rec *rec,
651 void *private_data)
653 struct messaging_read_state *state = talloc_get_type_abort(
654 private_data, struct messaging_read_state);
656 return rec->msg_type == state->msg_type;
659 static void messaging_read_done(struct tevent_req *subreq)
661 struct tevent_req *req = tevent_req_callback_data(
662 subreq, struct tevent_req);
663 struct messaging_read_state *state = tevent_req_data(
664 req, struct messaging_read_state);
665 int ret;
667 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
668 TALLOC_FREE(subreq);
669 if (tevent_req_error(req, ret)) {
670 return;
672 tevent_req_done(req);
675 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
676 struct messaging_rec **presult)
678 struct messaging_read_state *state = tevent_req_data(
679 req, struct messaging_read_state);
680 int err;
682 if (tevent_req_is_unix_error(req, &err)) {
683 return err;
685 if (presult != NULL) {
686 *presult = talloc_move(mem_ctx, &state->rec);
688 return 0;
691 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
693 if (msg_ctx->num_new_waiters == 0) {
694 return true;
697 if (talloc_array_length(msg_ctx->waiters) <
698 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
699 struct tevent_req **tmp;
700 tmp = talloc_realloc(
701 msg_ctx, msg_ctx->waiters, struct tevent_req *,
702 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
703 if (tmp == NULL) {
704 DEBUG(1, ("%s: talloc failed\n", __func__));
705 return false;
707 msg_ctx->waiters = tmp;
710 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
711 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
713 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
714 msg_ctx->num_new_waiters = 0;
716 return true;
719 struct messaging_defer_callback_state {
720 struct messaging_context *msg_ctx;
721 struct messaging_rec *rec;
722 void (*fn)(struct messaging_context *msg, void *private_data,
723 uint32_t msg_type, struct server_id server_id,
724 DATA_BLOB *data);
725 void *private_data;
728 static void messaging_defer_callback_trigger(struct tevent_context *ev,
729 struct tevent_immediate *im,
730 void *private_data);
732 static void messaging_defer_callback(
733 struct messaging_context *msg_ctx, struct messaging_rec *rec,
734 void (*fn)(struct messaging_context *msg, void *private_data,
735 uint32_t msg_type, struct server_id server_id,
736 DATA_BLOB *data),
737 void *private_data)
739 struct messaging_defer_callback_state *state;
740 struct tevent_immediate *im;
742 state = talloc(msg_ctx, struct messaging_defer_callback_state);
743 if (state == NULL) {
744 DEBUG(1, ("talloc failed\n"));
745 return;
747 state->msg_ctx = msg_ctx;
748 state->fn = fn;
749 state->private_data = private_data;
751 state->rec = messaging_rec_dup(state, rec);
752 if (state->rec == NULL) {
753 DEBUG(1, ("talloc failed\n"));
754 TALLOC_FREE(state);
755 return;
758 im = tevent_create_immediate(state);
759 if (im == NULL) {
760 DEBUG(1, ("tevent_create_immediate failed\n"));
761 TALLOC_FREE(state);
762 return;
764 tevent_schedule_immediate(im, msg_ctx->event_ctx,
765 messaging_defer_callback_trigger, state);
768 static void messaging_defer_callback_trigger(struct tevent_context *ev,
769 struct tevent_immediate *im,
770 void *private_data)
772 struct messaging_defer_callback_state *state = talloc_get_type_abort(
773 private_data, struct messaging_defer_callback_state);
774 struct messaging_rec *rec = state->rec;
776 state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
777 &rec->buf);
778 TALLOC_FREE(state);
782 Dispatch one messaging_rec
784 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
785 struct messaging_rec *rec)
787 struct messaging_callback *cb, *next;
788 unsigned i;
790 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
791 next = cb->next;
792 if (cb->msg_type != rec->msg_type) {
793 continue;
796 if (messaging_is_self_send(msg_ctx, &rec->dest)) {
798 * This is a self-send. We are called here from
799 * messaging_send(), and we don't want to directly
800 * recurse into the callback but go via a
801 * tevent_loop_once
803 messaging_defer_callback(msg_ctx, rec, cb->fn,
804 cb->private_data);
805 } else {
807 * This comes from a different process. we are called
808 * from the event loop, so we should call back
809 * directly.
811 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
812 rec->src, &rec->buf);
815 * we continue looking for matching messages after finding
816 * one. This matters for subsystems like the internal notify
817 * code which register more than one handler for the same
818 * message type
822 if (!messaging_append_new_waiters(msg_ctx)) {
823 return;
826 i = 0;
827 while (i < msg_ctx->num_waiters) {
828 struct tevent_req *req;
829 struct messaging_filtered_read_state *state;
831 req = msg_ctx->waiters[i];
832 if (req == NULL) {
834 * This got cleaned up. In the meantime,
835 * move everything down one. We need
836 * to keep the order of waiters, as
837 * other code may depend on this.
839 if (i < msg_ctx->num_waiters - 1) {
840 memmove(&msg_ctx->waiters[i],
841 &msg_ctx->waiters[i+1],
842 sizeof(struct tevent_req *) *
843 (msg_ctx->num_waiters - i - 1));
845 msg_ctx->num_waiters -= 1;
846 continue;
849 state = tevent_req_data(
850 req, struct messaging_filtered_read_state);
851 if (state->filter(rec, state->private_data)) {
852 messaging_filtered_read_done(req, rec);
855 i += 1;
859 static int mess_parent_dgm_cleanup(void *private_data);
860 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
862 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
864 struct tevent_req *req;
866 req = background_job_send(
867 msg, msg->event_ctx, msg, NULL, 0,
868 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
869 60*15),
870 mess_parent_dgm_cleanup, msg);
871 if (req == NULL) {
872 return false;
874 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
875 return true;
878 static int mess_parent_dgm_cleanup(void *private_data)
880 struct messaging_context *msg_ctx = talloc_get_type_abort(
881 private_data, struct messaging_context);
882 int ret;
884 ret = messaging_dgm_wipe(msg_ctx);
885 DEBUG(10, ("messaging_dgm_wipe returned %s\n",
886 ret ? strerror(ret) : "ok"));
887 return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
888 60*15);
891 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
893 struct messaging_context *msg = tevent_req_callback_data(
894 req, struct messaging_context);
895 NTSTATUS status;
897 status = background_job_recv(req);
898 TALLOC_FREE(req);
899 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
900 nt_errstr(status)));
902 req = background_job_send(
903 msg, msg->event_ctx, msg, NULL, 0,
904 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
905 60*15),
906 mess_parent_dgm_cleanup, msg);
907 if (req == NULL) {
908 DEBUG(1, ("background_job_send failed\n"));
910 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
913 struct messaging_backend *messaging_local_backend(
914 struct messaging_context *msg_ctx)
916 return msg_ctx->local;
919 struct tevent_context *messaging_tevent_context(
920 struct messaging_context *msg_ctx)
922 return msg_ctx->event_ctx;
925 /** @} **/