s3:messaging: add fds-array to message-backend send function
[Samba.git] / source3 / lib / messages.c
blob0579dbf359944736b80f9d2845db3464c0c02e17
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
56 struct messaging_callback {
57 struct messaging_callback *prev, *next;
58 uint32 msg_type;
59 void (*fn)(struct messaging_context *msg, void *private_data,
60 uint32_t msg_type,
61 struct server_id server_id, DATA_BLOB *data);
62 void *private_data;
65 struct messaging_context {
66 struct server_id id;
67 struct tevent_context *event_ctx;
68 struct messaging_callback *callbacks;
70 struct tevent_req **new_waiters;
71 unsigned num_new_waiters;
73 struct tevent_req **waiters;
74 unsigned num_waiters;
76 struct messaging_backend *remote;
79 struct messaging_hdr {
80 int msg_type;
81 struct server_id dst;
82 struct server_id src;
85 /****************************************************************************
86 A useful function for testing the message system.
87 ****************************************************************************/
89 static void ping_message(struct messaging_context *msg_ctx,
90 void *private_data,
91 uint32_t msg_type,
92 struct server_id src,
93 DATA_BLOB *data)
95 struct server_id_buf idbuf;
97 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
98 server_id_str_buf(src, &idbuf), (int)data->length,
99 data->data ? (char *)data->data : ""));
101 messaging_send(msg_ctx, src, MSG_PONG, data);
104 /****************************************************************************
105 Register/replace a dispatch function for a particular message type.
106 JRA changed Dec 13 2006. Only one message handler now permitted per type.
107 *NOTE*: Dispatch functions must be able to cope with incoming
108 messages on an *odd* byte boundary.
109 ****************************************************************************/
111 struct msg_all {
112 struct messaging_context *msg_ctx;
113 int msg_type;
114 uint32 msg_flag;
115 const void *buf;
116 size_t len;
117 int n_sent;
120 /****************************************************************************
121 Send one of the messages for the broadcast.
122 ****************************************************************************/
124 static int traverse_fn(struct db_record *rec, const struct server_id *id,
125 uint32_t msg_flags, void *state)
127 struct msg_all *msg_all = (struct msg_all *)state;
128 NTSTATUS status;
130 /* Don't send if the receiver hasn't registered an interest. */
132 if((msg_flags & msg_all->msg_flag) == 0) {
133 return 0;
136 /* If the msg send fails because the pid was not found (i.e. smbd died),
137 * the msg has already been deleted from the messages.tdb.*/
139 status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
140 (const uint8_t *)msg_all->buf, msg_all->len);
142 if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
143 struct server_id_buf idbuf;
146 * If the pid was not found delete the entry from
147 * serverid.tdb
150 DEBUG(2, ("pid %s doesn't exist\n",
151 server_id_str_buf(*id, &idbuf)));
153 dbwrap_record_delete(rec);
155 msg_all->n_sent++;
156 return 0;
160 * Send a message to all smbd processes.
162 * It isn't very efficient, but should be OK for the sorts of
163 * applications that use it. When we need efficient broadcast we can add
164 * it.
166 * @param n_sent Set to the number of messages sent. This should be
167 * equal to the number of processes, but be careful for races.
169 * @retval True for success.
171 bool message_send_all(struct messaging_context *msg_ctx,
172 int msg_type,
173 const void *buf, size_t len,
174 int *n_sent)
176 struct msg_all msg_all;
178 msg_all.msg_type = msg_type;
179 if (msg_type < 0x100) {
180 msg_all.msg_flag = FLAG_MSG_GENERAL;
181 } else if (msg_type > 0x100 && msg_type < 0x200) {
182 msg_all.msg_flag = FLAG_MSG_NMBD;
183 } else if (msg_type > 0x200 && msg_type < 0x300) {
184 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
185 } else if (msg_type > 0x300 && msg_type < 0x400) {
186 msg_all.msg_flag = FLAG_MSG_SMBD;
187 } else if (msg_type > 0x400 && msg_type < 0x600) {
188 msg_all.msg_flag = FLAG_MSG_WINBIND;
189 } else if (msg_type > 4000 && msg_type < 5000) {
190 msg_all.msg_flag = FLAG_MSG_DBWRAP;
191 } else {
192 return false;
195 msg_all.buf = buf;
196 msg_all.len = len;
197 msg_all.n_sent = 0;
198 msg_all.msg_ctx = msg_ctx;
200 serverid_traverse(traverse_fn, &msg_all);
201 if (n_sent)
202 *n_sent = msg_all.n_sent;
203 return true;
206 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
207 const int *fds, size_t num_fds,
208 void *private_data)
210 struct messaging_context *msg_ctx = talloc_get_type_abort(
211 private_data, struct messaging_context);
212 const struct messaging_hdr *hdr;
213 struct server_id_buf idbuf;
214 struct messaging_rec rec;
215 int64_t fds64[MIN(num_fds, INT8_MAX)];
216 size_t i;
218 if (msg_len < sizeof(*hdr)) {
219 for (i=0; i < num_fds; i++) {
220 close(fds[i]);
222 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
223 return;
226 if (num_fds > INT8_MAX) {
227 for (i=0; i < num_fds; i++) {
228 close(fds[i]);
230 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
231 return;
234 for (i=0; i < num_fds; i++) {
235 fds64[i] = fds[i];
239 * messages_dgm guarantees alignment, so we can cast here
241 hdr = (const struct messaging_hdr *)msg;
243 DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
244 __func__, (unsigned)hdr->msg_type,
245 (unsigned)(msg_len - sizeof(*hdr)),
246 (unsigned)num_fds,
247 server_id_str_buf(hdr->src, &idbuf)));
249 rec = (struct messaging_rec) {
250 .msg_version = MESSAGE_VERSION,
251 .msg_type = hdr->msg_type,
252 .src = hdr->src,
253 .dest = hdr->dst,
254 .buf.data = discard_const_p(uint8, msg) + sizeof(*hdr),
255 .buf.length = msg_len - sizeof(*hdr),
256 .num_fds = num_fds,
257 .fds = fds64,
260 messaging_dispatch_rec(msg_ctx, &rec);
/*
 * talloc destructor for a messaging context: shuts down the datagram
 * backend. Returns 0 so that the free proceeds.
 */
static int messaging_context_destructor(struct messaging_context *ctx)
{
	messaging_dgm_destroy();

	return 0;
}
269 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
270 struct tevent_context *ev)
272 struct messaging_context *ctx;
273 NTSTATUS status;
274 int ret;
276 if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
277 return NULL;
280 ctx->id = procid_self();
281 ctx->event_ctx = ev;
283 sec_init();
285 ret = messaging_dgm_init(ctx->event_ctx, ctx->id,
286 lp_cache_directory(), sec_initial_uid(),
287 messaging_recv_cb, ctx);
289 if (ret != 0) {
290 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
291 TALLOC_FREE(ctx);
292 return NULL;
295 talloc_set_destructor(ctx, messaging_context_destructor);
297 if (lp_clustering()) {
298 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
300 if (!NT_STATUS_IS_OK(status)) {
301 DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
302 nt_errstr(status)));
303 TALLOC_FREE(ctx);
304 return NULL;
307 ctx->id.vnn = get_my_vnn();
309 messaging_register(ctx, NULL, MSG_PING, ping_message);
311 /* Register some debugging related messages */
313 register_msg_pool_usage(ctx);
314 register_dmalloc_msgs(ctx);
315 debug_register_msgs(ctx);
317 return ctx;
320 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
322 return msg_ctx->id;
326 * re-init after a fork
328 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
330 NTSTATUS status;
331 int ret;
333 messaging_dgm_destroy();
335 msg_ctx->id = procid_self();
337 ret = messaging_dgm_init(msg_ctx->event_ctx, msg_ctx->id,
338 lp_cache_directory(), sec_initial_uid(),
339 messaging_recv_cb, msg_ctx);
340 if (ret != 0) {
341 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
342 return map_nt_error_from_unix(ret);
345 TALLOC_FREE(msg_ctx->remote);
347 if (lp_clustering()) {
348 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
349 &msg_ctx->remote);
351 if (!NT_STATUS_IS_OK(status)) {
352 DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
353 nt_errstr(status)));
354 return status;
358 return NT_STATUS_OK;
363 * Register a dispatch function for a particular message type. Allow multiple
364 * registrants
366 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
367 void *private_data,
368 uint32_t msg_type,
369 void (*fn)(struct messaging_context *msg,
370 void *private_data,
371 uint32_t msg_type,
372 struct server_id server_id,
373 DATA_BLOB *data))
375 struct messaging_callback *cb;
377 DEBUG(5, ("Registering messaging pointer for type %u - "
378 "private_data=%p\n",
379 (unsigned)msg_type, private_data));
382 * Only one callback per type
385 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
386 /* we allow a second registration of the same message
387 type if it has a different private pointer. This is
388 needed in, for example, the internal notify code,
389 which creates a new notify context for each tree
390 connect, and expects to receive messages to each of
391 them. */
392 if (cb->msg_type == msg_type && private_data == cb->private_data) {
393 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
394 (unsigned)msg_type, private_data));
395 cb->fn = fn;
396 cb->private_data = private_data;
397 return NT_STATUS_OK;
401 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
402 return NT_STATUS_NO_MEMORY;
405 cb->msg_type = msg_type;
406 cb->fn = fn;
407 cb->private_data = private_data;
409 DLIST_ADD(msg_ctx->callbacks, cb);
410 return NT_STATUS_OK;
414 De-register the function for a particular message type.
416 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
417 void *private_data)
419 struct messaging_callback *cb, *next;
421 for (cb = ctx->callbacks; cb; cb = next) {
422 next = cb->next;
423 if ((cb->msg_type == msg_type)
424 && (cb->private_data == private_data)) {
425 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
426 (unsigned)msg_type, private_data));
427 DLIST_REMOVE(ctx->callbacks, cb);
428 TALLOC_FREE(cb);
434 Send a message to a particular server
436 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
437 struct server_id server, uint32_t msg_type,
438 const DATA_BLOB *data)
440 struct iovec iov;
442 iov.iov_base = data->data;
443 iov.iov_len = data->length;
445 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1);
448 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
449 struct server_id server, uint32_t msg_type,
450 const uint8_t *buf, size_t len)
452 DATA_BLOB blob = data_blob_const(buf, len);
453 return messaging_send(msg_ctx, server, msg_type, &blob);
456 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
457 struct server_id server, uint32_t msg_type,
458 const struct iovec *iov, int iovlen)
460 int ret;
461 struct messaging_hdr hdr;
462 struct iovec iov2[iovlen+1];
464 if (server_id_is_disconnected(&server)) {
465 return NT_STATUS_INVALID_PARAMETER_MIX;
468 if (!procid_is_local(&server)) {
469 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
470 msg_type, iov, iovlen,
471 NULL, 0,
472 msg_ctx->remote);
473 if (ret != 0) {
474 return map_nt_error_from_unix(ret);
476 return NT_STATUS_OK;
479 if (server_id_same_process(&msg_ctx->id, &server)) {
480 struct messaging_rec rec;
481 uint8_t *buf;
484 * Self-send, directly dispatch
487 buf = iov_buf(talloc_tos(), iov, iovlen);
488 if (buf == NULL) {
489 return NT_STATUS_NO_MEMORY;
492 rec = (struct messaging_rec) {
493 .msg_version = MESSAGE_VERSION,
494 .msg_type = msg_type & MSG_TYPE_MASK,
495 .dest = server,
496 .src = msg_ctx->id,
497 .buf = data_blob_const(buf, talloc_get_size(buf)),
500 messaging_dispatch_rec(msg_ctx, &rec);
501 TALLOC_FREE(buf);
502 return NT_STATUS_OK;
505 hdr = (struct messaging_hdr) {
506 .msg_type = msg_type,
507 .dst = server,
508 .src = msg_ctx->id
510 iov2[0] = (struct iovec){ .iov_base = &hdr, .iov_len = sizeof(hdr) };
511 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
513 become_root();
514 ret = messaging_dgm_send(server.pid, iov2, iovlen+1, NULL, 0);
515 unbecome_root();
517 if (ret != 0) {
518 return map_nt_error_from_unix(ret);
520 return NT_STATUS_OK;
523 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
524 struct messaging_rec *rec)
526 struct messaging_rec *result;
527 size_t fds_size = sizeof(int64_t) * rec->num_fds;
529 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
530 rec->buf.length + fds_size);
531 if (result == NULL) {
532 return NULL;
534 *result = *rec;
536 /* Doesn't fail, see talloc_pooled_object */
538 result->buf.data = talloc_memdup(result, rec->buf.data,
539 rec->buf.length);
541 result->fds = NULL;
542 if (result->num_fds > 0) {
543 result->fds = talloc_array(result, int64_t, result->num_fds);
544 memcpy(result->fds, rec->fds, fds_size);
547 return result;
/*
 * State of one messaging_filtered_read_send() request.
 */
struct messaging_filtered_read_state {
	/* Event context the request was created with */
	struct tevent_context *ev;
	/* Messaging context the request waits on */
	struct messaging_context *msg_ctx;
	/* Result of messaging_dgm_register_tevent_context() */
	void *tevent_handle;

	/* Predicate deciding whether a message completes the request */
	bool (*filter)(struct messaging_rec *rec, void *private_data);
	void *private_data;

	/* Copy of the accepted message */
	struct messaging_rec *rec;
};
561 static void messaging_filtered_read_cleanup(struct tevent_req *req,
562 enum tevent_req_state req_state);
564 struct tevent_req *messaging_filtered_read_send(
565 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
566 struct messaging_context *msg_ctx,
567 bool (*filter)(struct messaging_rec *rec, void *private_data),
568 void *private_data)
570 struct tevent_req *req;
571 struct messaging_filtered_read_state *state;
572 size_t new_waiters_len;
574 req = tevent_req_create(mem_ctx, &state,
575 struct messaging_filtered_read_state);
576 if (req == NULL) {
577 return NULL;
579 state->ev = ev;
580 state->msg_ctx = msg_ctx;
581 state->filter = filter;
582 state->private_data = private_data;
585 * We have to defer the callback here, as we might be called from
586 * within a different tevent_context than state->ev
588 tevent_req_defer_callback(req, state->ev);
590 state->tevent_handle = messaging_dgm_register_tevent_context(
591 state, ev);
592 if (tevent_req_nomem(state, req)) {
593 return tevent_req_post(req, ev);
597 * We add ourselves to the "new_waiters" array, not the "waiters"
598 * array. If we are called from within messaging_read_done,
599 * messaging_dispatch_rec will be in an active for-loop on
600 * "waiters". We must be careful not to mess with this array, because
601 * it could mean that a single event is being delivered twice.
604 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
606 if (new_waiters_len == msg_ctx->num_new_waiters) {
607 struct tevent_req **tmp;
609 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
610 struct tevent_req *, new_waiters_len+1);
611 if (tevent_req_nomem(tmp, req)) {
612 return tevent_req_post(req, ev);
614 msg_ctx->new_waiters = tmp;
617 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
618 msg_ctx->num_new_waiters += 1;
619 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
621 return req;
624 static void messaging_filtered_read_cleanup(struct tevent_req *req,
625 enum tevent_req_state req_state)
627 struct messaging_filtered_read_state *state = tevent_req_data(
628 req, struct messaging_filtered_read_state);
629 struct messaging_context *msg_ctx = state->msg_ctx;
630 unsigned i;
632 tevent_req_set_cleanup_fn(req, NULL);
634 TALLOC_FREE(state->tevent_handle);
637 * Just set the [new_]waiters entry to NULL, be careful not to mess
638 * with the other "waiters" array contents. We are often called from
639 * within "messaging_dispatch_rec", which loops over
640 * "waiters". Messing with the "waiters" array will mess up that
641 * for-loop.
644 for (i=0; i<msg_ctx->num_waiters; i++) {
645 if (msg_ctx->waiters[i] == req) {
646 msg_ctx->waiters[i] = NULL;
647 return;
651 for (i=0; i<msg_ctx->num_new_waiters; i++) {
652 if (msg_ctx->new_waiters[i] == req) {
653 msg_ctx->new_waiters[i] = NULL;
654 return;
659 static void messaging_filtered_read_done(struct tevent_req *req,
660 struct messaging_rec *rec)
662 struct messaging_filtered_read_state *state = tevent_req_data(
663 req, struct messaging_filtered_read_state);
665 state->rec = messaging_rec_dup(state, rec);
666 if (tevent_req_nomem(state->rec, req)) {
667 return;
669 tevent_req_done(req);
672 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
673 struct messaging_rec **presult)
675 struct messaging_filtered_read_state *state = tevent_req_data(
676 req, struct messaging_filtered_read_state);
677 int err;
679 if (tevent_req_is_unix_error(req, &err)) {
680 tevent_req_received(req);
681 return err;
683 *presult = talloc_move(mem_ctx, &state->rec);
684 return 0;
/*
 * State of one messaging_read_send() request.
 */
struct messaging_read_state {
	/* Message type being waited for */
	uint32_t msg_type;
	/* Message received from the filtered read */
	struct messaging_rec *rec;
};
692 static bool messaging_read_filter(struct messaging_rec *rec,
693 void *private_data);
694 static void messaging_read_done(struct tevent_req *subreq);
696 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
697 struct tevent_context *ev,
698 struct messaging_context *msg,
699 uint32_t msg_type)
701 struct tevent_req *req, *subreq;
702 struct messaging_read_state *state;
704 req = tevent_req_create(mem_ctx, &state,
705 struct messaging_read_state);
706 if (req == NULL) {
707 return NULL;
709 state->msg_type = msg_type;
711 subreq = messaging_filtered_read_send(state, ev, msg,
712 messaging_read_filter, state);
713 if (tevent_req_nomem(subreq, req)) {
714 return tevent_req_post(req, ev);
716 tevent_req_set_callback(subreq, messaging_read_done, req);
717 return req;
720 static bool messaging_read_filter(struct messaging_rec *rec,
721 void *private_data)
723 struct messaging_read_state *state = talloc_get_type_abort(
724 private_data, struct messaging_read_state);
726 if (rec->num_fds != 0) {
727 return false;
730 return rec->msg_type == state->msg_type;
733 static void messaging_read_done(struct tevent_req *subreq)
735 struct tevent_req *req = tevent_req_callback_data(
736 subreq, struct tevent_req);
737 struct messaging_read_state *state = tevent_req_data(
738 req, struct messaging_read_state);
739 int ret;
741 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
742 TALLOC_FREE(subreq);
743 if (tevent_req_error(req, ret)) {
744 return;
746 tevent_req_done(req);
749 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
750 struct messaging_rec **presult)
752 struct messaging_read_state *state = tevent_req_data(
753 req, struct messaging_read_state);
754 int err;
756 if (tevent_req_is_unix_error(req, &err)) {
757 return err;
759 if (presult != NULL) {
760 *presult = talloc_move(mem_ctx, &state->rec);
762 return 0;
765 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
767 if (msg_ctx->num_new_waiters == 0) {
768 return true;
771 if (talloc_array_length(msg_ctx->waiters) <
772 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
773 struct tevent_req **tmp;
774 tmp = talloc_realloc(
775 msg_ctx, msg_ctx->waiters, struct tevent_req *,
776 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
777 if (tmp == NULL) {
778 DEBUG(1, ("%s: talloc failed\n", __func__));
779 return false;
781 msg_ctx->waiters = tmp;
784 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
785 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
787 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
788 msg_ctx->num_new_waiters = 0;
790 return true;
793 struct messaging_defer_callback_state {
794 struct messaging_context *msg_ctx;
795 struct messaging_rec *rec;
796 void (*fn)(struct messaging_context *msg, void *private_data,
797 uint32_t msg_type, struct server_id server_id,
798 DATA_BLOB *data);
799 void *private_data;
802 static void messaging_defer_callback_trigger(struct tevent_context *ev,
803 struct tevent_immediate *im,
804 void *private_data);
806 static void messaging_defer_callback(
807 struct messaging_context *msg_ctx, struct messaging_rec *rec,
808 void (*fn)(struct messaging_context *msg, void *private_data,
809 uint32_t msg_type, struct server_id server_id,
810 DATA_BLOB *data),
811 void *private_data)
813 struct messaging_defer_callback_state *state;
814 struct tevent_immediate *im;
816 state = talloc(msg_ctx, struct messaging_defer_callback_state);
817 if (state == NULL) {
818 DEBUG(1, ("talloc failed\n"));
819 return;
821 state->msg_ctx = msg_ctx;
822 state->fn = fn;
823 state->private_data = private_data;
825 state->rec = messaging_rec_dup(state, rec);
826 if (state->rec == NULL) {
827 DEBUG(1, ("talloc failed\n"));
828 TALLOC_FREE(state);
829 return;
832 im = tevent_create_immediate(state);
833 if (im == NULL) {
834 DEBUG(1, ("tevent_create_immediate failed\n"));
835 TALLOC_FREE(state);
836 return;
838 tevent_schedule_immediate(im, msg_ctx->event_ctx,
839 messaging_defer_callback_trigger, state);
842 static void messaging_defer_callback_trigger(struct tevent_context *ev,
843 struct tevent_immediate *im,
844 void *private_data)
846 struct messaging_defer_callback_state *state = talloc_get_type_abort(
847 private_data, struct messaging_defer_callback_state);
848 struct messaging_rec *rec = state->rec;
850 state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
851 &rec->buf);
852 TALLOC_FREE(state);
856 Dispatch one messaging_rec
858 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
859 struct messaging_rec *rec)
861 struct messaging_callback *cb, *next;
862 unsigned i;
863 size_t j;
865 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
866 next = cb->next;
867 if (cb->msg_type != rec->msg_type) {
868 continue;
872 * the old style callbacks don't support fd passing
874 for (j=0; j < rec->num_fds; j++) {
875 int fd = rec->fds[j];
876 close(fd);
878 rec->num_fds = 0;
879 rec->fds = NULL;
881 if (server_id_same_process(&rec->src, &rec->dest)) {
883 * This is a self-send. We are called here from
884 * messaging_send(), and we don't want to directly
885 * recurse into the callback but go via a
886 * tevent_loop_once
888 messaging_defer_callback(msg_ctx, rec, cb->fn,
889 cb->private_data);
890 } else {
892 * This comes from a different process. we are called
893 * from the event loop, so we should call back
894 * directly.
896 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
897 rec->src, &rec->buf);
900 * we continue looking for matching messages after finding
901 * one. This matters for subsystems like the internal notify
902 * code which register more than one handler for the same
903 * message type
907 if (!messaging_append_new_waiters(msg_ctx)) {
908 for (j=0; j < rec->num_fds; j++) {
909 int fd = rec->fds[j];
910 close(fd);
912 rec->num_fds = 0;
913 rec->fds = NULL;
914 return;
917 i = 0;
918 while (i < msg_ctx->num_waiters) {
919 struct tevent_req *req;
920 struct messaging_filtered_read_state *state;
922 req = msg_ctx->waiters[i];
923 if (req == NULL) {
925 * This got cleaned up. In the meantime,
926 * move everything down one. We need
927 * to keep the order of waiters, as
928 * other code may depend on this.
930 if (i < msg_ctx->num_waiters - 1) {
931 memmove(&msg_ctx->waiters[i],
932 &msg_ctx->waiters[i+1],
933 sizeof(struct tevent_req *) *
934 (msg_ctx->num_waiters - i - 1));
936 msg_ctx->num_waiters -= 1;
937 continue;
940 state = tevent_req_data(
941 req, struct messaging_filtered_read_state);
942 if (state->filter(rec, state->private_data)) {
943 messaging_filtered_read_done(req, rec);
946 * Only the first one gets the fd-array
948 rec->num_fds = 0;
949 rec->fds = NULL;
952 i += 1;
956 * If the fd-array isn't used, just close it.
958 for (j=0; j < rec->num_fds; j++) {
959 int fd = rec->fds[j];
960 close(fd);
962 rec->num_fds = 0;
963 rec->fds = NULL;
966 static int mess_parent_dgm_cleanup(void *private_data);
967 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
969 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
971 struct tevent_req *req;
973 req = background_job_send(
974 msg, msg->event_ctx, msg, NULL, 0,
975 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
976 60*15),
977 mess_parent_dgm_cleanup, msg);
978 if (req == NULL) {
979 return false;
981 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
982 return true;
/*
 * Body of the cleanup background job: wipe the datagram backend and
 * return the value of the "messaging:messaging dgm cleanup interval"
 * parameter (default 60*15).
 */
static int mess_parent_dgm_cleanup(void *private_data)
{
	int wipe_ret;
	int interval;

	wipe_ret = messaging_dgm_wipe();
	DEBUG(10, ("messaging_dgm_wipe returned %s\n",
		   wipe_ret ? strerror(wipe_ret) : "ok"));

	interval = lp_parm_int(-1, "messaging",
			       "messaging dgm cleanup interval",
			       60*15);
	return interval;
}
996 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
998 struct messaging_context *msg = tevent_req_callback_data(
999 req, struct messaging_context);
1000 NTSTATUS status;
1002 status = background_job_recv(req);
1003 TALLOC_FREE(req);
1004 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1005 nt_errstr(status)));
1007 req = background_job_send(
1008 msg, msg->event_ctx, msg, NULL, 0,
1009 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1010 60*15),
1011 mess_parent_dgm_cleanup, msg);
1012 if (req == NULL) {
1013 DEBUG(1, ("background_job_send failed\n"));
1015 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
/*
 * Clean up datagram backend state.
 *
 * @param msg_ctx not used by this function
 * @param pid     0 to wipe via messaging_dgm_wipe(), otherwise the
 *                pid passed to messaging_dgm_cleanup()
 * @return the return value of the backend call
 */
int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
{
	if (pid == 0) {
		return messaging_dgm_wipe();
	}

	return messaging_dgm_cleanup(pid);
}
1031 struct tevent_context *messaging_tevent_context(
1032 struct messaging_context *msg_ctx)
1034 return msg_ctx->event_ctx;
1037 /** @} **/