s4:torture: Add smb2.oplock test batch9a and raw.oplock test batch9a
[Samba.git] / source3 / lib / messages.c
blobaaaee52e3a6f53398c22f7ea4b537b9425f5b54e
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
56 struct messaging_callback {
57 struct messaging_callback *prev, *next;
58 uint32 msg_type;
59 void (*fn)(struct messaging_context *msg, void *private_data,
60 uint32_t msg_type,
61 struct server_id server_id, DATA_BLOB *data);
62 void *private_data;
65 struct messaging_context {
66 struct server_id id;
67 struct tevent_context *event_ctx;
68 struct messaging_callback *callbacks;
70 struct tevent_req **new_waiters;
71 unsigned num_new_waiters;
73 struct tevent_req **waiters;
74 unsigned num_waiters;
76 struct messaging_backend *remote;
79 struct messaging_hdr {
80 uint32_t msg_type;
81 struct server_id dst;
82 struct server_id src;
85 /****************************************************************************
86 A useful function for testing the message system.
87 ****************************************************************************/
89 static void ping_message(struct messaging_context *msg_ctx,
90 void *private_data,
91 uint32_t msg_type,
92 struct server_id src,
93 DATA_BLOB *data)
95 struct server_id_buf idbuf;
97 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
98 server_id_str_buf(src, &idbuf), (int)data->length,
99 data->data ? (char *)data->data : ""));
101 messaging_send(msg_ctx, src, MSG_PONG, data);
104 /****************************************************************************
105 Register/replace a dispatch function for a particular message type.
106 JRA changed Dec 13 2006. Only one message handler now permitted per type.
107 *NOTE*: Dispatch functions must be able to cope with incoming
108 messages on an *odd* byte boundary.
109 ****************************************************************************/
/*
 * State shared between message_send_all() and its traverse_fn() callback
 * while a broadcast is in progress.
 */
struct msg_all {
	struct messaging_context *msg_ctx;
	/* Message type to broadcast */
	int msg_type;
	/* FLAG_MSG_* bit a receiver must have registered to get the message */
	uint32_t msg_flag;
	/* Payload and its length in bytes */
	const void *buf;
	size_t len;
	/* Incremented by traverse_fn() for every send it attempts */
	int n_sent;
};
120 /****************************************************************************
121 Send one of the messages for the broadcast.
122 ****************************************************************************/
124 static int traverse_fn(struct db_record *rec, const struct server_id *id,
125 uint32_t msg_flags, void *state)
127 struct msg_all *msg_all = (struct msg_all *)state;
128 NTSTATUS status;
130 /* Don't send if the receiver hasn't registered an interest. */
132 if((msg_flags & msg_all->msg_flag) == 0) {
133 return 0;
136 /* If the msg send fails because the pid was not found (i.e. smbd died),
137 * the msg has already been deleted from the messages.tdb.*/
139 status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
140 (const uint8_t *)msg_all->buf, msg_all->len);
142 if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
143 struct server_id_buf idbuf;
146 * If the pid was not found delete the entry from
147 * serverid.tdb
150 DEBUG(2, ("pid %s doesn't exist\n",
151 server_id_str_buf(*id, &idbuf)));
153 dbwrap_record_delete(rec);
155 msg_all->n_sent++;
156 return 0;
160 * Send a message to all smbd processes.
162 * It isn't very efficient, but should be OK for the sorts of
163 * applications that use it. When we need efficient broadcast we can add
164 * it.
166 * @param n_sent Set to the number of messages sent. This should be
167 * equal to the number of processes, but be careful for races.
169 * @retval True for success.
171 bool message_send_all(struct messaging_context *msg_ctx,
172 int msg_type,
173 const void *buf, size_t len,
174 int *n_sent)
176 struct msg_all msg_all;
178 msg_all.msg_type = msg_type;
179 if (msg_type < 0x100) {
180 msg_all.msg_flag = FLAG_MSG_GENERAL;
181 } else if (msg_type > 0x100 && msg_type < 0x200) {
182 msg_all.msg_flag = FLAG_MSG_NMBD;
183 } else if (msg_type > 0x200 && msg_type < 0x300) {
184 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
185 } else if (msg_type > 0x300 && msg_type < 0x400) {
186 msg_all.msg_flag = FLAG_MSG_SMBD;
187 } else if (msg_type > 0x400 && msg_type < 0x600) {
188 msg_all.msg_flag = FLAG_MSG_WINBIND;
189 } else if (msg_type > 4000 && msg_type < 5000) {
190 msg_all.msg_flag = FLAG_MSG_DBWRAP;
191 } else {
192 return false;
195 msg_all.buf = buf;
196 msg_all.len = len;
197 msg_all.n_sent = 0;
198 msg_all.msg_ctx = msg_ctx;
200 serverid_traverse(traverse_fn, &msg_all);
201 if (n_sent)
202 *n_sent = msg_all.n_sent;
203 return true;
206 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
207 int *fds, size_t num_fds,
208 void *private_data)
210 struct messaging_context *msg_ctx = talloc_get_type_abort(
211 private_data, struct messaging_context);
212 const struct messaging_hdr *hdr;
213 struct server_id_buf idbuf;
214 struct messaging_rec rec;
215 int64_t fds64[MIN(num_fds, INT8_MAX)];
216 size_t i;
218 if (msg_len < sizeof(*hdr)) {
219 for (i=0; i < num_fds; i++) {
220 close(fds[i]);
222 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
223 return;
226 if (num_fds > INT8_MAX) {
227 for (i=0; i < num_fds; i++) {
228 close(fds[i]);
230 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
231 return;
235 * "consume" the fds by copying them and setting
236 * the original variable to -1
238 for (i=0; i < num_fds; i++) {
239 fds64[i] = fds[i];
240 fds[i] = -1;
244 * messages_dgm guarantees alignment, so we can cast here
246 hdr = (const struct messaging_hdr *)msg;
248 DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
249 __func__, (unsigned)hdr->msg_type,
250 (unsigned)(msg_len - sizeof(*hdr)),
251 (unsigned)num_fds,
252 server_id_str_buf(hdr->src, &idbuf)));
254 rec = (struct messaging_rec) {
255 .msg_version = MESSAGE_VERSION,
256 .msg_type = hdr->msg_type,
257 .src = hdr->src,
258 .dest = hdr->dst,
259 .buf.data = discard_const_p(uint8, msg) + sizeof(*hdr),
260 .buf.length = msg_len - sizeof(*hdr),
261 .num_fds = num_fds,
262 .fds = fds64,
265 messaging_dispatch_rec(msg_ctx, &rec);
/*
 * talloc destructor installed by messaging_init(): shuts the datagram
 * layer down when the context is freed. Returns 0.
 */
static int messaging_context_destructor(struct messaging_context *ctx)
{
	(void)ctx;

	messaging_dgm_destroy();

	return 0;
}
274 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
275 struct tevent_context *ev)
277 struct messaging_context *ctx;
278 NTSTATUS status;
279 int ret;
281 if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
282 return NULL;
285 ctx->id = procid_self();
286 ctx->event_ctx = ev;
288 sec_init();
290 ret = messaging_dgm_init(ctx->event_ctx, ctx->id,
291 lp_cache_directory(), sec_initial_uid(),
292 messaging_recv_cb, ctx);
294 if (ret != 0) {
295 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
296 TALLOC_FREE(ctx);
297 return NULL;
300 talloc_set_destructor(ctx, messaging_context_destructor);
302 if (lp_clustering()) {
303 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
305 if (!NT_STATUS_IS_OK(status)) {
306 DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
307 nt_errstr(status)));
308 TALLOC_FREE(ctx);
309 return NULL;
312 ctx->id.vnn = get_my_vnn();
314 messaging_register(ctx, NULL, MSG_PING, ping_message);
316 /* Register some debugging related messages */
318 register_msg_pool_usage(ctx);
319 register_dmalloc_msgs(ctx);
320 debug_register_msgs(ctx);
322 return ctx;
325 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
327 return msg_ctx->id;
331 * re-init after a fork
333 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
335 NTSTATUS status;
336 int ret;
338 messaging_dgm_destroy();
340 msg_ctx->id = procid_self();
342 ret = messaging_dgm_init(msg_ctx->event_ctx, msg_ctx->id,
343 lp_cache_directory(), sec_initial_uid(),
344 messaging_recv_cb, msg_ctx);
345 if (ret != 0) {
346 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
347 return map_nt_error_from_unix(ret);
350 TALLOC_FREE(msg_ctx->remote);
352 if (lp_clustering()) {
353 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
354 &msg_ctx->remote);
356 if (!NT_STATUS_IS_OK(status)) {
357 DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
358 nt_errstr(status)));
359 return status;
363 return NT_STATUS_OK;
368 * Register a dispatch function for a particular message type. Allow multiple
369 * registrants
371 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
372 void *private_data,
373 uint32_t msg_type,
374 void (*fn)(struct messaging_context *msg,
375 void *private_data,
376 uint32_t msg_type,
377 struct server_id server_id,
378 DATA_BLOB *data))
380 struct messaging_callback *cb;
382 DEBUG(5, ("Registering messaging pointer for type %u - "
383 "private_data=%p\n",
384 (unsigned)msg_type, private_data));
387 * Only one callback per type
390 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
391 /* we allow a second registration of the same message
392 type if it has a different private pointer. This is
393 needed in, for example, the internal notify code,
394 which creates a new notify context for each tree
395 connect, and expects to receive messages to each of
396 them. */
397 if (cb->msg_type == msg_type && private_data == cb->private_data) {
398 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
399 (unsigned)msg_type, private_data));
400 cb->fn = fn;
401 cb->private_data = private_data;
402 return NT_STATUS_OK;
406 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
407 return NT_STATUS_NO_MEMORY;
410 cb->msg_type = msg_type;
411 cb->fn = fn;
412 cb->private_data = private_data;
414 DLIST_ADD(msg_ctx->callbacks, cb);
415 return NT_STATUS_OK;
419 De-register the function for a particular message type.
421 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
422 void *private_data)
424 struct messaging_callback *cb, *next;
426 for (cb = ctx->callbacks; cb; cb = next) {
427 next = cb->next;
428 if ((cb->msg_type == msg_type)
429 && (cb->private_data == private_data)) {
430 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
431 (unsigned)msg_type, private_data));
432 DLIST_REMOVE(ctx->callbacks, cb);
433 TALLOC_FREE(cb);
439 Send a message to a particular server
441 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
442 struct server_id server, uint32_t msg_type,
443 const DATA_BLOB *data)
445 struct iovec iov;
447 iov.iov_base = data->data;
448 iov.iov_len = data->length;
450 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
453 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
454 struct server_id server, uint32_t msg_type,
455 const uint8_t *buf, size_t len)
457 DATA_BLOB blob = data_blob_const(buf, len);
458 return messaging_send(msg_ctx, server, msg_type, &blob);
461 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
462 struct server_id server, uint32_t msg_type,
463 const struct iovec *iov, int iovlen,
464 const int *fds, size_t num_fds)
466 int ret;
467 struct messaging_hdr hdr;
468 struct iovec iov2[iovlen+1];
470 if (server_id_is_disconnected(&server)) {
471 return NT_STATUS_INVALID_PARAMETER_MIX;
474 if (num_fds > INT8_MAX) {
475 return NT_STATUS_INVALID_PARAMETER_MIX;
478 if (!procid_is_local(&server)) {
479 if (num_fds > 0) {
480 return NT_STATUS_NOT_SUPPORTED;
483 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
484 msg_type, iov, iovlen,
485 NULL, 0,
486 msg_ctx->remote);
487 if (ret != 0) {
488 return map_nt_error_from_unix(ret);
490 return NT_STATUS_OK;
493 if (server_id_same_process(&msg_ctx->id, &server)) {
494 struct messaging_rec rec;
495 uint8_t *buf;
498 * Self-send, directly dispatch
501 if (num_fds > 0) {
502 return NT_STATUS_NOT_SUPPORTED;
505 buf = iov_buf(talloc_tos(), iov, iovlen);
506 if (buf == NULL) {
507 return NT_STATUS_NO_MEMORY;
510 rec = (struct messaging_rec) {
511 .msg_version = MESSAGE_VERSION,
512 .msg_type = msg_type & MSG_TYPE_MASK,
513 .dest = server,
514 .src = msg_ctx->id,
515 .buf = data_blob_const(buf, talloc_get_size(buf)),
518 messaging_dispatch_rec(msg_ctx, &rec);
519 TALLOC_FREE(buf);
520 return NT_STATUS_OK;
523 ZERO_STRUCT(hdr);
524 hdr = (struct messaging_hdr) {
525 .msg_type = msg_type,
526 .dst = server,
527 .src = msg_ctx->id
529 iov2[0] = (struct iovec){ .iov_base = &hdr, .iov_len = sizeof(hdr) };
530 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
532 become_root();
533 ret = messaging_dgm_send(server.pid, iov2, iovlen+1, fds, num_fds);
534 unbecome_root();
536 if (ret != 0) {
537 return map_nt_error_from_unix(ret);
539 return NT_STATUS_OK;
542 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
543 struct messaging_rec *rec)
545 struct messaging_rec *result;
546 size_t fds_size = sizeof(int64_t) * rec->num_fds;
548 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
549 rec->buf.length + fds_size);
550 if (result == NULL) {
551 return NULL;
553 *result = *rec;
555 /* Doesn't fail, see talloc_pooled_object */
557 result->buf.data = talloc_memdup(result, rec->buf.data,
558 rec->buf.length);
560 result->fds = NULL;
561 if (result->num_fds > 0) {
562 result->fds = talloc_array(result, int64_t, result->num_fds);
563 memcpy(result->fds, rec->fds, fds_size);
566 return result;
/*
 * State of one messaging_filtered_read_send() request.
 */
struct messaging_filtered_read_state {
	struct tevent_context *ev;
	struct messaging_context *msg_ctx;
	/* Result of messaging_dgm_register_tevent_context(), freed on cleanup */
	void *tevent_handle;

	/* Caller-supplied predicate and its argument */
	bool (*filter)(struct messaging_rec *rec, void *private_data);
	void *private_data;

	/* Copy of the record that passed the filter */
	struct messaging_rec *rec;
};
580 static void messaging_filtered_read_cleanup(struct tevent_req *req,
581 enum tevent_req_state req_state);
583 struct tevent_req *messaging_filtered_read_send(
584 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
585 struct messaging_context *msg_ctx,
586 bool (*filter)(struct messaging_rec *rec, void *private_data),
587 void *private_data)
589 struct tevent_req *req;
590 struct messaging_filtered_read_state *state;
591 size_t new_waiters_len;
593 req = tevent_req_create(mem_ctx, &state,
594 struct messaging_filtered_read_state);
595 if (req == NULL) {
596 return NULL;
598 state->ev = ev;
599 state->msg_ctx = msg_ctx;
600 state->filter = filter;
601 state->private_data = private_data;
604 * We have to defer the callback here, as we might be called from
605 * within a different tevent_context than state->ev
607 tevent_req_defer_callback(req, state->ev);
609 state->tevent_handle = messaging_dgm_register_tevent_context(
610 state, ev);
611 if (tevent_req_nomem(state, req)) {
612 return tevent_req_post(req, ev);
616 * We add ourselves to the "new_waiters" array, not the "waiters"
617 * array. If we are called from within messaging_read_done,
618 * messaging_dispatch_rec will be in an active for-loop on
619 * "waiters". We must be careful not to mess with this array, because
620 * it could mean that a single event is being delivered twice.
623 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
625 if (new_waiters_len == msg_ctx->num_new_waiters) {
626 struct tevent_req **tmp;
628 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
629 struct tevent_req *, new_waiters_len+1);
630 if (tevent_req_nomem(tmp, req)) {
631 return tevent_req_post(req, ev);
633 msg_ctx->new_waiters = tmp;
636 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
637 msg_ctx->num_new_waiters += 1;
638 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
640 return req;
643 static void messaging_filtered_read_cleanup(struct tevent_req *req,
644 enum tevent_req_state req_state)
646 struct messaging_filtered_read_state *state = tevent_req_data(
647 req, struct messaging_filtered_read_state);
648 struct messaging_context *msg_ctx = state->msg_ctx;
649 unsigned i;
651 tevent_req_set_cleanup_fn(req, NULL);
653 TALLOC_FREE(state->tevent_handle);
656 * Just set the [new_]waiters entry to NULL, be careful not to mess
657 * with the other "waiters" array contents. We are often called from
658 * within "messaging_dispatch_rec", which loops over
659 * "waiters". Messing with the "waiters" array will mess up that
660 * for-loop.
663 for (i=0; i<msg_ctx->num_waiters; i++) {
664 if (msg_ctx->waiters[i] == req) {
665 msg_ctx->waiters[i] = NULL;
666 return;
670 for (i=0; i<msg_ctx->num_new_waiters; i++) {
671 if (msg_ctx->new_waiters[i] == req) {
672 msg_ctx->new_waiters[i] = NULL;
673 return;
678 static void messaging_filtered_read_done(struct tevent_req *req,
679 struct messaging_rec *rec)
681 struct messaging_filtered_read_state *state = tevent_req_data(
682 req, struct messaging_filtered_read_state);
684 state->rec = messaging_rec_dup(state, rec);
685 if (tevent_req_nomem(state->rec, req)) {
686 return;
688 tevent_req_done(req);
691 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
692 struct messaging_rec **presult)
694 struct messaging_filtered_read_state *state = tevent_req_data(
695 req, struct messaging_filtered_read_state);
696 int err;
698 if (tevent_req_is_unix_error(req, &err)) {
699 tevent_req_received(req);
700 return err;
702 *presult = talloc_move(mem_ctx, &state->rec);
703 return 0;
/*
 * State of one messaging_read_send() request.
 */
struct messaging_read_state {
	/* Message type the request is waiting for */
	uint32_t msg_type;
	/* Record obtained from the filtered read sub-request */
	struct messaging_rec *rec;
};
711 static bool messaging_read_filter(struct messaging_rec *rec,
712 void *private_data);
713 static void messaging_read_done(struct tevent_req *subreq);
715 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
716 struct tevent_context *ev,
717 struct messaging_context *msg,
718 uint32_t msg_type)
720 struct tevent_req *req, *subreq;
721 struct messaging_read_state *state;
723 req = tevent_req_create(mem_ctx, &state,
724 struct messaging_read_state);
725 if (req == NULL) {
726 return NULL;
728 state->msg_type = msg_type;
730 subreq = messaging_filtered_read_send(state, ev, msg,
731 messaging_read_filter, state);
732 if (tevent_req_nomem(subreq, req)) {
733 return tevent_req_post(req, ev);
735 tevent_req_set_callback(subreq, messaging_read_done, req);
736 return req;
739 static bool messaging_read_filter(struct messaging_rec *rec,
740 void *private_data)
742 struct messaging_read_state *state = talloc_get_type_abort(
743 private_data, struct messaging_read_state);
745 if (rec->num_fds != 0) {
746 return false;
749 return rec->msg_type == state->msg_type;
752 static void messaging_read_done(struct tevent_req *subreq)
754 struct tevent_req *req = tevent_req_callback_data(
755 subreq, struct tevent_req);
756 struct messaging_read_state *state = tevent_req_data(
757 req, struct messaging_read_state);
758 int ret;
760 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
761 TALLOC_FREE(subreq);
762 if (tevent_req_error(req, ret)) {
763 return;
765 tevent_req_done(req);
768 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
769 struct messaging_rec **presult)
771 struct messaging_read_state *state = tevent_req_data(
772 req, struct messaging_read_state);
773 int err;
775 if (tevent_req_is_unix_error(req, &err)) {
776 return err;
778 if (presult != NULL) {
779 *presult = talloc_move(mem_ctx, &state->rec);
781 return 0;
784 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
786 if (msg_ctx->num_new_waiters == 0) {
787 return true;
790 if (talloc_array_length(msg_ctx->waiters) <
791 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
792 struct tevent_req **tmp;
793 tmp = talloc_realloc(
794 msg_ctx, msg_ctx->waiters, struct tevent_req *,
795 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
796 if (tmp == NULL) {
797 DEBUG(1, ("%s: talloc failed\n", __func__));
798 return false;
800 msg_ctx->waiters = tmp;
803 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
804 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
806 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
807 msg_ctx->num_new_waiters = 0;
809 return true;
812 struct messaging_defer_callback_state {
813 struct messaging_context *msg_ctx;
814 struct messaging_rec *rec;
815 void (*fn)(struct messaging_context *msg, void *private_data,
816 uint32_t msg_type, struct server_id server_id,
817 DATA_BLOB *data);
818 void *private_data;
821 static void messaging_defer_callback_trigger(struct tevent_context *ev,
822 struct tevent_immediate *im,
823 void *private_data);
825 static void messaging_defer_callback(
826 struct messaging_context *msg_ctx, struct messaging_rec *rec,
827 void (*fn)(struct messaging_context *msg, void *private_data,
828 uint32_t msg_type, struct server_id server_id,
829 DATA_BLOB *data),
830 void *private_data)
832 struct messaging_defer_callback_state *state;
833 struct tevent_immediate *im;
835 state = talloc(msg_ctx, struct messaging_defer_callback_state);
836 if (state == NULL) {
837 DEBUG(1, ("talloc failed\n"));
838 return;
840 state->msg_ctx = msg_ctx;
841 state->fn = fn;
842 state->private_data = private_data;
844 state->rec = messaging_rec_dup(state, rec);
845 if (state->rec == NULL) {
846 DEBUG(1, ("talloc failed\n"));
847 TALLOC_FREE(state);
848 return;
851 im = tevent_create_immediate(state);
852 if (im == NULL) {
853 DEBUG(1, ("tevent_create_immediate failed\n"));
854 TALLOC_FREE(state);
855 return;
857 tevent_schedule_immediate(im, msg_ctx->event_ctx,
858 messaging_defer_callback_trigger, state);
861 static void messaging_defer_callback_trigger(struct tevent_context *ev,
862 struct tevent_immediate *im,
863 void *private_data)
865 struct messaging_defer_callback_state *state = talloc_get_type_abort(
866 private_data, struct messaging_defer_callback_state);
867 struct messaging_rec *rec = state->rec;
869 state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
870 &rec->buf);
871 TALLOC_FREE(state);
875 Dispatch one messaging_rec
877 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
878 struct messaging_rec *rec)
880 struct messaging_callback *cb, *next;
881 unsigned i;
882 size_t j;
884 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
885 next = cb->next;
886 if (cb->msg_type != rec->msg_type) {
887 continue;
891 * the old style callbacks don't support fd passing
893 for (j=0; j < rec->num_fds; j++) {
894 int fd = rec->fds[j];
895 close(fd);
897 rec->num_fds = 0;
898 rec->fds = NULL;
900 if (server_id_same_process(&rec->src, &rec->dest)) {
902 * This is a self-send. We are called here from
903 * messaging_send(), and we don't want to directly
904 * recurse into the callback but go via a
905 * tevent_loop_once
907 messaging_defer_callback(msg_ctx, rec, cb->fn,
908 cb->private_data);
909 } else {
911 * This comes from a different process. we are called
912 * from the event loop, so we should call back
913 * directly.
915 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
916 rec->src, &rec->buf);
919 * we continue looking for matching messages after finding
920 * one. This matters for subsystems like the internal notify
921 * code which register more than one handler for the same
922 * message type
926 if (!messaging_append_new_waiters(msg_ctx)) {
927 for (j=0; j < rec->num_fds; j++) {
928 int fd = rec->fds[j];
929 close(fd);
931 rec->num_fds = 0;
932 rec->fds = NULL;
933 return;
936 i = 0;
937 while (i < msg_ctx->num_waiters) {
938 struct tevent_req *req;
939 struct messaging_filtered_read_state *state;
941 req = msg_ctx->waiters[i];
942 if (req == NULL) {
944 * This got cleaned up. In the meantime,
945 * move everything down one. We need
946 * to keep the order of waiters, as
947 * other code may depend on this.
949 if (i < msg_ctx->num_waiters - 1) {
950 memmove(&msg_ctx->waiters[i],
951 &msg_ctx->waiters[i+1],
952 sizeof(struct tevent_req *) *
953 (msg_ctx->num_waiters - i - 1));
955 msg_ctx->num_waiters -= 1;
956 continue;
959 state = tevent_req_data(
960 req, struct messaging_filtered_read_state);
961 if (state->filter(rec, state->private_data)) {
962 messaging_filtered_read_done(req, rec);
965 * Only the first one gets the fd-array
967 rec->num_fds = 0;
968 rec->fds = NULL;
971 i += 1;
975 * If the fd-array isn't used, just close it.
977 for (j=0; j < rec->num_fds; j++) {
978 int fd = rec->fds[j];
979 close(fd);
981 rec->num_fds = 0;
982 rec->fds = NULL;
985 static int mess_parent_dgm_cleanup(void *private_data);
986 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
988 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
990 struct tevent_req *req;
992 req = background_job_send(
993 msg, msg->event_ctx, msg, NULL, 0,
994 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
995 60*15),
996 mess_parent_dgm_cleanup, msg);
997 if (req == NULL) {
998 return false;
1000 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1001 return true;
/*
 * Background job body: wipe the datagram layer via messaging_dgm_wipe()
 * and log the outcome.
 *
 * Returns the current "messaging:messaging dgm cleanup interval" value
 * (default 60*15).
 */
static int mess_parent_dgm_cleanup(void *private_data)
{
	int wipe_result;
	int next_interval;

	wipe_result = messaging_dgm_wipe();
	DEBUG(10, ("messaging_dgm_wipe returned %s\n",
		   wipe_result ? strerror(wipe_result) : "ok"));

	next_interval = lp_parm_int(-1, "messaging",
				    "messaging dgm cleanup interval",
				    60*15);
	return next_interval;
}
1015 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1017 struct messaging_context *msg = tevent_req_callback_data(
1018 req, struct messaging_context);
1019 NTSTATUS status;
1021 status = background_job_recv(req);
1022 TALLOC_FREE(req);
1023 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1024 nt_errstr(status)));
1026 req = background_job_send(
1027 msg, msg->event_ctx, msg, NULL, 0,
1028 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1029 60*15),
1030 mess_parent_dgm_cleanup, msg);
1031 if (req == NULL) {
1032 DEBUG(1, ("background_job_send failed\n"));
1034 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
/*
 * Clean up datagram messaging leftovers.
 *
 * With pid == 0 this calls messaging_dgm_wipe(), otherwise
 * messaging_dgm_cleanup() for that pid. The result of the called
 * function is returned unchanged.
 */
int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
{
	if (pid == 0) {
		return messaging_dgm_wipe();
	}

	return messaging_dgm_cleanup(pid);
}
1050 struct tevent_context *messaging_tevent_context(
1051 struct messaging_context *msg_ctx)
1053 return msg_ctx->event_ctx;
1056 /** @} **/