dsdb: Ignore errors from search in dns_notify module
[Samba.git] / source3 / lib / messages.c
blob7eadb0209b9679e27075c574d5e90e9dede6b282
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/iov_buf.h"
56 #include "lib/util/server_id_db.h"
58 struct messaging_callback {
59 struct messaging_callback *prev, *next;
60 uint32 msg_type;
61 void (*fn)(struct messaging_context *msg, void *private_data,
62 uint32_t msg_type,
63 struct server_id server_id, DATA_BLOB *data);
64 void *private_data;
67 struct messaging_context {
68 struct server_id id;
69 struct tevent_context *event_ctx;
70 struct messaging_callback *callbacks;
72 struct tevent_req **new_waiters;
73 unsigned num_new_waiters;
75 struct tevent_req **waiters;
76 unsigned num_waiters;
78 struct messaging_backend *remote;
80 struct server_id_db *names_db;
83 struct messaging_hdr {
84 uint32_t msg_type;
85 struct server_id dst;
86 struct server_id src;
89 /****************************************************************************
90 A useful function for testing the message system.
91 ****************************************************************************/
93 static void ping_message(struct messaging_context *msg_ctx,
94 void *private_data,
95 uint32_t msg_type,
96 struct server_id src,
97 DATA_BLOB *data)
99 struct server_id_buf idbuf;
101 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
102 server_id_str_buf(src, &idbuf), (int)data->length,
103 data->data ? (char *)data->data : ""));
105 messaging_send(msg_ctx, src, MSG_PONG, data);
108 /****************************************************************************
109 Register/replace a dispatch function for a particular message type.
110 JRA changed Dec 13 2006. Only one message handler now permitted per type.
111 *NOTE*: Dispatch functions must be able to cope with incoming
112 messages on an *odd* byte boundary.
113 ****************************************************************************/
115 struct msg_all {
116 struct messaging_context *msg_ctx;
117 int msg_type;
118 uint32 msg_flag;
119 const void *buf;
120 size_t len;
121 int n_sent;
124 /****************************************************************************
125 Send one of the messages for the broadcast.
126 ****************************************************************************/
128 static int traverse_fn(struct db_record *rec, const struct server_id *id,
129 uint32_t msg_flags, void *state)
131 struct msg_all *msg_all = (struct msg_all *)state;
132 NTSTATUS status;
134 /* Don't send if the receiver hasn't registered an interest. */
136 if((msg_flags & msg_all->msg_flag) == 0) {
137 return 0;
140 /* If the msg send fails because the pid was not found (i.e. smbd died),
141 * the msg has already been deleted from the messages.tdb.*/
143 status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
144 (const uint8_t *)msg_all->buf, msg_all->len);
146 if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
147 struct server_id_buf idbuf;
150 * If the pid was not found delete the entry from
151 * serverid.tdb
154 DEBUG(2, ("pid %s doesn't exist\n",
155 server_id_str_buf(*id, &idbuf)));
157 dbwrap_record_delete(rec);
159 msg_all->n_sent++;
160 return 0;
164 * Send a message to all smbd processes.
166 * It isn't very efficient, but should be OK for the sorts of
167 * applications that use it. When we need efficient broadcast we can add
168 * it.
170 * @param n_sent Set to the number of messages sent. This should be
171 * equal to the number of processes, but be careful for races.
173 * @retval True for success.
175 bool message_send_all(struct messaging_context *msg_ctx,
176 int msg_type,
177 const void *buf, size_t len,
178 int *n_sent)
180 struct msg_all msg_all;
182 msg_all.msg_type = msg_type;
183 if (msg_type < 0x100) {
184 msg_all.msg_flag = FLAG_MSG_GENERAL;
185 } else if (msg_type > 0x100 && msg_type < 0x200) {
186 msg_all.msg_flag = FLAG_MSG_NMBD;
187 } else if (msg_type > 0x200 && msg_type < 0x300) {
188 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
189 } else if (msg_type > 0x300 && msg_type < 0x400) {
190 msg_all.msg_flag = FLAG_MSG_SMBD;
191 } else if (msg_type > 0x400 && msg_type < 0x600) {
192 msg_all.msg_flag = FLAG_MSG_WINBIND;
193 } else if (msg_type > 4000 && msg_type < 5000) {
194 msg_all.msg_flag = FLAG_MSG_DBWRAP;
195 } else {
196 return false;
199 msg_all.buf = buf;
200 msg_all.len = len;
201 msg_all.n_sent = 0;
202 msg_all.msg_ctx = msg_ctx;
204 serverid_traverse(traverse_fn, &msg_all);
205 if (n_sent)
206 *n_sent = msg_all.n_sent;
207 return true;
210 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
211 int *fds, size_t num_fds,
212 void *private_data)
214 struct messaging_context *msg_ctx = talloc_get_type_abort(
215 private_data, struct messaging_context);
216 const struct messaging_hdr *hdr;
217 struct server_id_buf idbuf;
218 struct messaging_rec rec;
219 int64_t fds64[MIN(num_fds, INT8_MAX)];
220 size_t i;
222 if (msg_len < sizeof(*hdr)) {
223 for (i=0; i < num_fds; i++) {
224 close(fds[i]);
226 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
227 return;
230 if (num_fds > INT8_MAX) {
231 for (i=0; i < num_fds; i++) {
232 close(fds[i]);
234 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
235 return;
239 * "consume" the fds by copying them and setting
240 * the original variable to -1
242 for (i=0; i < num_fds; i++) {
243 fds64[i] = fds[i];
244 fds[i] = -1;
248 * messages_dgm guarantees alignment, so we can cast here
250 hdr = (const struct messaging_hdr *)msg;
252 DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
253 __func__, (unsigned)hdr->msg_type,
254 (unsigned)(msg_len - sizeof(*hdr)),
255 (unsigned)num_fds,
256 server_id_str_buf(hdr->src, &idbuf)));
258 rec = (struct messaging_rec) {
259 .msg_version = MESSAGE_VERSION,
260 .msg_type = hdr->msg_type,
261 .src = hdr->src,
262 .dest = hdr->dst,
263 .buf.data = discard_const_p(uint8, msg) + sizeof(*hdr),
264 .buf.length = msg_len - sizeof(*hdr),
265 .num_fds = num_fds,
266 .fds = fds64,
269 messaging_dispatch_rec(msg_ctx, &rec);
272 static int messaging_context_destructor(struct messaging_context *ctx)
274 unsigned i;
276 messaging_dgm_destroy();
278 for (i=0; i<ctx->num_new_waiters; i++) {
279 if (ctx->new_waiters[i] != NULL) {
280 tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
281 ctx->new_waiters[i] = NULL;
284 for (i=0; i<ctx->num_waiters; i++) {
285 if (ctx->waiters[i] != NULL) {
286 tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
287 ctx->waiters[i] = NULL;
291 return 0;
294 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
295 struct tevent_context *ev)
297 struct messaging_context *ctx;
298 NTSTATUS status;
299 int ret;
301 if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
302 return NULL;
305 ctx->id = procid_self();
306 ctx->event_ctx = ev;
308 sec_init();
310 ret = messaging_dgm_init(ctx->event_ctx, ctx->id,
311 lp_cache_directory(), sec_initial_uid(),
312 messaging_recv_cb, ctx);
314 if (ret != 0) {
315 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
316 TALLOC_FREE(ctx);
317 return NULL;
320 ctx->names_db = server_id_db_init(
321 ctx, ctx->id, lp_cache_directory(), 0,
322 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
323 if (ctx->names_db == NULL) {
324 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
325 TALLOC_FREE(ctx);
326 return NULL;
329 talloc_set_destructor(ctx, messaging_context_destructor);
331 if (lp_clustering()) {
332 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
334 if (!NT_STATUS_IS_OK(status)) {
335 DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
336 nt_errstr(status)));
337 TALLOC_FREE(ctx);
338 return NULL;
341 ctx->id.vnn = get_my_vnn();
343 messaging_register(ctx, NULL, MSG_PING, ping_message);
345 /* Register some debugging related messages */
347 register_msg_pool_usage(ctx);
348 register_dmalloc_msgs(ctx);
349 debug_register_msgs(ctx);
351 return ctx;
354 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
356 return msg_ctx->id;
360 * re-init after a fork
362 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
364 NTSTATUS status;
365 int ret;
367 messaging_dgm_destroy();
369 msg_ctx->id = procid_self();
371 ret = messaging_dgm_init(msg_ctx->event_ctx, msg_ctx->id,
372 lp_cache_directory(), sec_initial_uid(),
373 messaging_recv_cb, msg_ctx);
374 if (ret != 0) {
375 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
376 return map_nt_error_from_unix(ret);
379 TALLOC_FREE(msg_ctx->remote);
381 if (lp_clustering()) {
382 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
383 &msg_ctx->remote);
385 if (!NT_STATUS_IS_OK(status)) {
386 DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
387 nt_errstr(status)));
388 return status;
392 server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
394 return NT_STATUS_OK;
399 * Register a dispatch function for a particular message type. Allow multiple
400 * registrants
402 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
403 void *private_data,
404 uint32_t msg_type,
405 void (*fn)(struct messaging_context *msg,
406 void *private_data,
407 uint32_t msg_type,
408 struct server_id server_id,
409 DATA_BLOB *data))
411 struct messaging_callback *cb;
413 DEBUG(5, ("Registering messaging pointer for type %u - "
414 "private_data=%p\n",
415 (unsigned)msg_type, private_data));
418 * Only one callback per type
421 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
422 /* we allow a second registration of the same message
423 type if it has a different private pointer. This is
424 needed in, for example, the internal notify code,
425 which creates a new notify context for each tree
426 connect, and expects to receive messages to each of
427 them. */
428 if (cb->msg_type == msg_type && private_data == cb->private_data) {
429 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
430 (unsigned)msg_type, private_data));
431 cb->fn = fn;
432 cb->private_data = private_data;
433 return NT_STATUS_OK;
437 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
438 return NT_STATUS_NO_MEMORY;
441 cb->msg_type = msg_type;
442 cb->fn = fn;
443 cb->private_data = private_data;
445 DLIST_ADD(msg_ctx->callbacks, cb);
446 return NT_STATUS_OK;
450 De-register the function for a particular message type.
452 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
453 void *private_data)
455 struct messaging_callback *cb, *next;
457 for (cb = ctx->callbacks; cb; cb = next) {
458 next = cb->next;
459 if ((cb->msg_type == msg_type)
460 && (cb->private_data == private_data)) {
461 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
462 (unsigned)msg_type, private_data));
463 DLIST_REMOVE(ctx->callbacks, cb);
464 TALLOC_FREE(cb);
470 Send a message to a particular server
472 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
473 struct server_id server, uint32_t msg_type,
474 const DATA_BLOB *data)
476 struct iovec iov;
478 iov.iov_base = data->data;
479 iov.iov_len = data->length;
481 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
484 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
485 struct server_id server, uint32_t msg_type,
486 const uint8_t *buf, size_t len)
488 DATA_BLOB blob = data_blob_const(buf, len);
489 return messaging_send(msg_ctx, server, msg_type, &blob);
492 NTSTATUS messaging_send_iov_from(struct messaging_context *msg_ctx,
493 struct server_id src, struct server_id dst,
494 uint32_t msg_type,
495 const struct iovec *iov, int iovlen,
496 const int *fds, size_t num_fds)
498 int ret;
499 struct messaging_hdr hdr;
500 struct iovec iov2[iovlen+1];
502 if (server_id_is_disconnected(&dst)) {
503 return NT_STATUS_INVALID_PARAMETER_MIX;
506 if (num_fds > INT8_MAX) {
507 return NT_STATUS_INVALID_PARAMETER_MIX;
510 if (!procid_is_local(&dst)) {
511 if (num_fds > 0) {
512 return NT_STATUS_NOT_SUPPORTED;
515 ret = msg_ctx->remote->send_fn(src, dst,
516 msg_type, iov, iovlen,
517 NULL, 0,
518 msg_ctx->remote);
519 if (ret != 0) {
520 return map_nt_error_from_unix(ret);
522 return NT_STATUS_OK;
525 ZERO_STRUCT(hdr);
526 hdr = (struct messaging_hdr) {
527 .msg_type = msg_type,
528 .dst = dst,
529 .src = src
531 iov2[0] = (struct iovec){ .iov_base = &hdr, .iov_len = sizeof(hdr) };
532 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
534 become_root();
535 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
536 unbecome_root();
538 if (ret != 0) {
539 return map_nt_error_from_unix(ret);
541 return NT_STATUS_OK;
544 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
545 struct server_id dst, uint32_t msg_type,
546 const struct iovec *iov, int iovlen,
547 const int *fds, size_t num_fds)
549 return messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
550 iov, iovlen, fds, num_fds);
553 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
554 struct messaging_rec *rec)
556 struct messaging_rec *result;
557 size_t fds_size = sizeof(int64_t) * rec->num_fds;
559 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
560 rec->buf.length + fds_size);
561 if (result == NULL) {
562 return NULL;
564 *result = *rec;
566 /* Doesn't fail, see talloc_pooled_object */
568 result->buf.data = talloc_memdup(result, rec->buf.data,
569 rec->buf.length);
571 result->fds = NULL;
572 if (result->num_fds > 0) {
573 result->fds = talloc_array(result, int64_t, result->num_fds);
574 memcpy(result->fds, rec->fds, fds_size);
577 return result;
580 struct messaging_filtered_read_state {
581 struct tevent_context *ev;
582 struct messaging_context *msg_ctx;
583 void *tevent_handle;
585 bool (*filter)(struct messaging_rec *rec, void *private_data);
586 void *private_data;
588 struct messaging_rec *rec;
591 static void messaging_filtered_read_cleanup(struct tevent_req *req,
592 enum tevent_req_state req_state);
594 struct tevent_req *messaging_filtered_read_send(
595 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
596 struct messaging_context *msg_ctx,
597 bool (*filter)(struct messaging_rec *rec, void *private_data),
598 void *private_data)
600 struct tevent_req *req;
601 struct messaging_filtered_read_state *state;
602 size_t new_waiters_len;
604 req = tevent_req_create(mem_ctx, &state,
605 struct messaging_filtered_read_state);
606 if (req == NULL) {
607 return NULL;
609 state->ev = ev;
610 state->msg_ctx = msg_ctx;
611 state->filter = filter;
612 state->private_data = private_data;
615 * We have to defer the callback here, as we might be called from
616 * within a different tevent_context than state->ev
618 tevent_req_defer_callback(req, state->ev);
620 state->tevent_handle = messaging_dgm_register_tevent_context(
621 state, ev);
622 if (tevent_req_nomem(state, req)) {
623 return tevent_req_post(req, ev);
627 * We add ourselves to the "new_waiters" array, not the "waiters"
628 * array. If we are called from within messaging_read_done,
629 * messaging_dispatch_rec will be in an active for-loop on
630 * "waiters". We must be careful not to mess with this array, because
631 * it could mean that a single event is being delivered twice.
634 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
636 if (new_waiters_len == msg_ctx->num_new_waiters) {
637 struct tevent_req **tmp;
639 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
640 struct tevent_req *, new_waiters_len+1);
641 if (tevent_req_nomem(tmp, req)) {
642 return tevent_req_post(req, ev);
644 msg_ctx->new_waiters = tmp;
647 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
648 msg_ctx->num_new_waiters += 1;
649 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
651 return req;
654 static void messaging_filtered_read_cleanup(struct tevent_req *req,
655 enum tevent_req_state req_state)
657 struct messaging_filtered_read_state *state = tevent_req_data(
658 req, struct messaging_filtered_read_state);
659 struct messaging_context *msg_ctx = state->msg_ctx;
660 unsigned i;
662 tevent_req_set_cleanup_fn(req, NULL);
664 TALLOC_FREE(state->tevent_handle);
667 * Just set the [new_]waiters entry to NULL, be careful not to mess
668 * with the other "waiters" array contents. We are often called from
669 * within "messaging_dispatch_rec", which loops over
670 * "waiters". Messing with the "waiters" array will mess up that
671 * for-loop.
674 for (i=0; i<msg_ctx->num_waiters; i++) {
675 if (msg_ctx->waiters[i] == req) {
676 msg_ctx->waiters[i] = NULL;
677 return;
681 for (i=0; i<msg_ctx->num_new_waiters; i++) {
682 if (msg_ctx->new_waiters[i] == req) {
683 msg_ctx->new_waiters[i] = NULL;
684 return;
689 static void messaging_filtered_read_done(struct tevent_req *req,
690 struct messaging_rec *rec)
692 struct messaging_filtered_read_state *state = tevent_req_data(
693 req, struct messaging_filtered_read_state);
695 state->rec = messaging_rec_dup(state, rec);
696 if (tevent_req_nomem(state->rec, req)) {
697 return;
699 tevent_req_done(req);
702 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
703 struct messaging_rec **presult)
705 struct messaging_filtered_read_state *state = tevent_req_data(
706 req, struct messaging_filtered_read_state);
707 int err;
709 if (tevent_req_is_unix_error(req, &err)) {
710 tevent_req_received(req);
711 return err;
713 *presult = talloc_move(mem_ctx, &state->rec);
714 return 0;
717 struct messaging_read_state {
718 uint32_t msg_type;
719 struct messaging_rec *rec;
722 static bool messaging_read_filter(struct messaging_rec *rec,
723 void *private_data);
724 static void messaging_read_done(struct tevent_req *subreq);
726 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
727 struct tevent_context *ev,
728 struct messaging_context *msg,
729 uint32_t msg_type)
731 struct tevent_req *req, *subreq;
732 struct messaging_read_state *state;
734 req = tevent_req_create(mem_ctx, &state,
735 struct messaging_read_state);
736 if (req == NULL) {
737 return NULL;
739 state->msg_type = msg_type;
741 subreq = messaging_filtered_read_send(state, ev, msg,
742 messaging_read_filter, state);
743 if (tevent_req_nomem(subreq, req)) {
744 return tevent_req_post(req, ev);
746 tevent_req_set_callback(subreq, messaging_read_done, req);
747 return req;
750 static bool messaging_read_filter(struct messaging_rec *rec,
751 void *private_data)
753 struct messaging_read_state *state = talloc_get_type_abort(
754 private_data, struct messaging_read_state);
756 if (rec->num_fds != 0) {
757 return false;
760 return rec->msg_type == state->msg_type;
763 static void messaging_read_done(struct tevent_req *subreq)
765 struct tevent_req *req = tevent_req_callback_data(
766 subreq, struct tevent_req);
767 struct messaging_read_state *state = tevent_req_data(
768 req, struct messaging_read_state);
769 int ret;
771 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
772 TALLOC_FREE(subreq);
773 if (tevent_req_error(req, ret)) {
774 return;
776 tevent_req_done(req);
779 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
780 struct messaging_rec **presult)
782 struct messaging_read_state *state = tevent_req_data(
783 req, struct messaging_read_state);
784 int err;
786 if (tevent_req_is_unix_error(req, &err)) {
787 return err;
789 if (presult != NULL) {
790 *presult = talloc_move(mem_ctx, &state->rec);
792 return 0;
795 struct messaging_handler_state {
796 struct tevent_context *ev;
797 struct messaging_context *msg_ctx;
798 uint32_t msg_type;
799 bool (*handler)(struct messaging_context *msg_ctx,
800 struct messaging_rec **rec, void *private_data);
801 void *private_data;
804 static void messaging_handler_got_msg(struct tevent_req *subreq);
806 struct tevent_req *messaging_handler_send(
807 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
808 struct messaging_context *msg_ctx, uint32_t msg_type,
809 bool (*handler)(struct messaging_context *msg_ctx,
810 struct messaging_rec **rec, void *private_data),
811 void *private_data)
813 struct tevent_req *req, *subreq;
814 struct messaging_handler_state *state;
816 req = tevent_req_create(mem_ctx, &state,
817 struct messaging_handler_state);
818 if (req == NULL) {
819 return NULL;
821 state->ev = ev;
822 state->msg_ctx = msg_ctx;
823 state->msg_type = msg_type;
824 state->handler = handler;
825 state->private_data = private_data;
827 subreq = messaging_read_send(state, state->ev, state->msg_ctx,
828 state->msg_type);
829 if (tevent_req_nomem(subreq, req)) {
830 return tevent_req_post(req, ev);
832 tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
833 return req;
836 static void messaging_handler_got_msg(struct tevent_req *subreq)
838 struct tevent_req *req = tevent_req_callback_data(
839 subreq, struct tevent_req);
840 struct messaging_handler_state *state = tevent_req_data(
841 req, struct messaging_handler_state);
842 struct messaging_rec *rec;
843 int ret;
844 bool ok;
846 ret = messaging_read_recv(subreq, state, &rec);
847 TALLOC_FREE(subreq);
848 if (tevent_req_error(req, ret)) {
849 return;
852 subreq = messaging_read_send(state, state->ev, state->msg_ctx,
853 state->msg_type);
854 if (tevent_req_nomem(subreq, req)) {
855 return;
857 tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
859 ok = state->handler(state->msg_ctx, &rec, state->private_data);
860 TALLOC_FREE(rec);
861 if (ok) {
863 * Next round
865 return;
867 TALLOC_FREE(subreq);
868 tevent_req_done(req);
871 int messaging_handler_recv(struct tevent_req *req)
873 return tevent_req_simple_recv_unix(req);
876 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
878 if (msg_ctx->num_new_waiters == 0) {
879 return true;
882 if (talloc_array_length(msg_ctx->waiters) <
883 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
884 struct tevent_req **tmp;
885 tmp = talloc_realloc(
886 msg_ctx, msg_ctx->waiters, struct tevent_req *,
887 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
888 if (tmp == NULL) {
889 DEBUG(1, ("%s: talloc failed\n", __func__));
890 return false;
892 msg_ctx->waiters = tmp;
895 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
896 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
898 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
899 msg_ctx->num_new_waiters = 0;
901 return true;
905 Dispatch one messaging_rec
907 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
908 struct messaging_rec *rec)
910 struct messaging_callback *cb, *next;
911 unsigned i;
912 size_t j;
914 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
915 next = cb->next;
916 if (cb->msg_type != rec->msg_type) {
917 continue;
921 * the old style callbacks don't support fd passing
923 for (j=0; j < rec->num_fds; j++) {
924 int fd = rec->fds[j];
925 close(fd);
927 rec->num_fds = 0;
928 rec->fds = NULL;
930 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
931 rec->src, &rec->buf);
934 * we continue looking for matching messages after finding
935 * one. This matters for subsystems like the internal notify
936 * code which register more than one handler for the same
937 * message type
941 if (!messaging_append_new_waiters(msg_ctx)) {
942 for (j=0; j < rec->num_fds; j++) {
943 int fd = rec->fds[j];
944 close(fd);
946 rec->num_fds = 0;
947 rec->fds = NULL;
948 return;
951 i = 0;
952 while (i < msg_ctx->num_waiters) {
953 struct tevent_req *req;
954 struct messaging_filtered_read_state *state;
956 req = msg_ctx->waiters[i];
957 if (req == NULL) {
959 * This got cleaned up. In the meantime,
960 * move everything down one. We need
961 * to keep the order of waiters, as
962 * other code may depend on this.
964 if (i < msg_ctx->num_waiters - 1) {
965 memmove(&msg_ctx->waiters[i],
966 &msg_ctx->waiters[i+1],
967 sizeof(struct tevent_req *) *
968 (msg_ctx->num_waiters - i - 1));
970 msg_ctx->num_waiters -= 1;
971 continue;
974 state = tevent_req_data(
975 req, struct messaging_filtered_read_state);
976 if (state->filter(rec, state->private_data)) {
977 messaging_filtered_read_done(req, rec);
980 * Only the first one gets the fd-array
982 rec->num_fds = 0;
983 rec->fds = NULL;
986 i += 1;
990 * If the fd-array isn't used, just close it.
992 for (j=0; j < rec->num_fds; j++) {
993 int fd = rec->fds[j];
994 close(fd);
996 rec->num_fds = 0;
997 rec->fds = NULL;
1000 static int mess_parent_dgm_cleanup(void *private_data);
1001 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1003 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1005 struct tevent_req *req;
1007 req = background_job_send(
1008 msg, msg->event_ctx, msg, NULL, 0,
1009 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1010 60*15),
1011 mess_parent_dgm_cleanup, msg);
1012 if (req == NULL) {
1013 return false;
1015 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1016 return true;
1019 static int mess_parent_dgm_cleanup(void *private_data)
1021 int ret;
1023 ret = messaging_dgm_wipe();
1024 DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1025 ret ? strerror(ret) : "ok"));
1026 return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1027 60*15);
1030 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1032 struct messaging_context *msg = tevent_req_callback_data(
1033 req, struct messaging_context);
1034 NTSTATUS status;
1036 status = background_job_recv(req);
1037 TALLOC_FREE(req);
1038 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1039 nt_errstr(status)));
1041 req = background_job_send(
1042 msg, msg->event_ctx, msg, NULL, 0,
1043 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1044 60*15),
1045 mess_parent_dgm_cleanup, msg);
1046 if (req == NULL) {
1047 DEBUG(1, ("background_job_send failed\n"));
1049 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1052 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1054 int ret;
1056 if (pid == 0) {
1057 ret = messaging_dgm_wipe();
1058 } else {
1059 ret = messaging_dgm_cleanup(pid);
1062 return ret;
1065 struct tevent_context *messaging_tevent_context(
1066 struct messaging_context *msg_ctx)
1068 return msg_ctx->event_ctx;
1071 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1073 return msg_ctx->names_db;
1076 /** @} **/