Convert all uint32/16/8 to _t in source3/lib.
[Samba.git] / source3 / lib / messages.c
blob51e88e264192d19ac2971d3c8f5eb3a93b8e4acc
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/util/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57 #include "lib/messages_dgm_ref.h"
58 #include "lib/messages_util.h"
60 struct messaging_callback {
61 struct messaging_callback *prev, *next;
62 uint32_t msg_type;
63 void (*fn)(struct messaging_context *msg, void *private_data,
64 uint32_t msg_type,
65 struct server_id server_id, DATA_BLOB *data);
66 void *private_data;
69 struct messaging_context {
70 struct server_id id;
71 struct tevent_context *event_ctx;
72 struct messaging_callback *callbacks;
74 struct tevent_req **new_waiters;
75 unsigned num_new_waiters;
77 struct tevent_req **waiters;
78 unsigned num_waiters;
80 void *msg_dgm_ref;
81 struct messaging_backend *remote;
83 struct server_id_db *names_db;
86 /****************************************************************************
87 A useful function for testing the message system.
88 ****************************************************************************/
90 static void ping_message(struct messaging_context *msg_ctx,
91 void *private_data,
92 uint32_t msg_type,
93 struct server_id src,
94 DATA_BLOB *data)
96 struct server_id_buf idbuf;
98 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
99 server_id_str_buf(src, &idbuf), (int)data->length,
100 data->data ? (char *)data->data : ""));
102 messaging_send(msg_ctx, src, MSG_PONG, data);
105 /****************************************************************************
106 Register/replace a dispatch function for a particular message type.
107 JRA changed Dec 13 2006. Only one message handler now permitted per type.
108 *NOTE*: Dispatch functions must be able to cope with incoming
109 messages on an *odd* byte boundary.
110 ****************************************************************************/
/*
 * Broadcast state: filled in by message_send_all() and handed to
 * traverse_fn() as its "state" argument.
 */
struct msg_all {
	struct messaging_context *msg_ctx;

	/* message to send */
	int msg_type;
	const void *buf;
	size_t len;

	/* FLAG_MSG_* bit a receiver must have set to get the message */
	uint32_t msg_flag;

	/* incremented by traverse_fn() for every send it attempts */
	int n_sent;
};
121 /****************************************************************************
122 Send one of the messages for the broadcast.
123 ****************************************************************************/
125 static int traverse_fn(struct db_record *rec, const struct server_id *id,
126 uint32_t msg_flags, void *state)
128 struct msg_all *msg_all = (struct msg_all *)state;
129 NTSTATUS status;
131 /* Don't send if the receiver hasn't registered an interest. */
133 if((msg_flags & msg_all->msg_flag) == 0) {
134 return 0;
137 /* If the msg send fails because the pid was not found (i.e. smbd died),
138 * the msg has already been deleted from the messages.tdb.*/
140 status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
141 (const uint8_t *)msg_all->buf, msg_all->len);
143 if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
144 struct server_id_buf idbuf;
147 * If the pid was not found delete the entry from
148 * serverid.tdb
151 DEBUG(2, ("pid %s doesn't exist\n",
152 server_id_str_buf(*id, &idbuf)));
154 dbwrap_record_delete(rec);
156 msg_all->n_sent++;
157 return 0;
161 * Send a message to all smbd processes.
163 * It isn't very efficient, but should be OK for the sorts of
164 * applications that use it. When we need efficient broadcast we can add
165 * it.
167 * @param n_sent Set to the number of messages sent. This should be
168 * equal to the number of processes, but be careful for races.
170 * @retval True for success.
172 bool message_send_all(struct messaging_context *msg_ctx,
173 int msg_type,
174 const void *buf, size_t len,
175 int *n_sent)
177 struct msg_all msg_all;
179 msg_all.msg_type = msg_type;
180 if (msg_type < 0x100) {
181 msg_all.msg_flag = FLAG_MSG_GENERAL;
182 } else if (msg_type > 0x100 && msg_type < 0x200) {
183 msg_all.msg_flag = FLAG_MSG_NMBD;
184 } else if (msg_type > 0x200 && msg_type < 0x300) {
185 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
186 } else if (msg_type > 0x300 && msg_type < 0x400) {
187 msg_all.msg_flag = FLAG_MSG_SMBD;
188 } else if (msg_type > 0x400 && msg_type < 0x600) {
189 msg_all.msg_flag = FLAG_MSG_WINBIND;
190 } else if (msg_type > 4000 && msg_type < 5000) {
191 msg_all.msg_flag = FLAG_MSG_DBWRAP;
192 } else {
193 return false;
196 msg_all.buf = buf;
197 msg_all.len = len;
198 msg_all.n_sent = 0;
199 msg_all.msg_ctx = msg_ctx;
201 serverid_traverse(traverse_fn, &msg_all);
202 if (n_sent)
203 *n_sent = msg_all.n_sent;
204 return true;
207 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
208 int *fds, size_t num_fds,
209 void *private_data)
211 struct messaging_context *msg_ctx = talloc_get_type_abort(
212 private_data, struct messaging_context);
213 uint8_t hdr[MESSAGE_HDR_LENGTH];
214 struct server_id_buf idbuf;
215 struct messaging_rec rec;
216 int64_t fds64[MIN(num_fds, INT8_MAX)];
217 size_t i;
219 if (msg_len < sizeof(hdr)) {
220 for (i=0; i < num_fds; i++) {
221 close(fds[i]);
223 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
224 return;
227 if (num_fds > INT8_MAX) {
228 for (i=0; i < num_fds; i++) {
229 close(fds[i]);
231 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
232 return;
236 * "consume" the fds by copying them and setting
237 * the original variable to -1
239 for (i=0; i < num_fds; i++) {
240 fds64[i] = fds[i];
241 fds[i] = -1;
244 rec = (struct messaging_rec) {
245 .msg_version = MESSAGE_VERSION,
246 .buf.data = discard_const_p(uint8_t, msg) + sizeof(hdr),
247 .buf.length = msg_len - sizeof(hdr),
248 .num_fds = num_fds,
249 .fds = fds64,
252 message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
254 DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
255 __func__, (unsigned)rec.msg_type,
256 (unsigned)rec.buf.length,
257 (unsigned)num_fds,
258 server_id_str_buf(rec.src, &idbuf)));
260 messaging_dispatch_rec(msg_ctx, &rec);
263 static int messaging_context_destructor(struct messaging_context *ctx)
265 unsigned i;
267 messaging_dgm_destroy();
269 for (i=0; i<ctx->num_new_waiters; i++) {
270 if (ctx->new_waiters[i] != NULL) {
271 tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
272 ctx->new_waiters[i] = NULL;
275 for (i=0; i<ctx->num_waiters; i++) {
276 if (ctx->waiters[i] != NULL) {
277 tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
278 ctx->waiters[i] = NULL;
282 return 0;
/*
 * Build "<private dir>/<name>" on the talloc_tos() stackframe.
 * Returns NULL if the allocation fails.
 */
static const char *private_path(const char *name)
{
	const char *path;

	path = talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
	return path;
}
290 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
291 struct tevent_context *ev)
293 struct messaging_context *ctx;
294 NTSTATUS status;
295 int ret;
296 const char *lck_path;
297 const char *priv_path;
298 bool ok;
300 if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
301 return NULL;
304 ctx->id = procid_self();
305 ctx->event_ctx = ev;
307 sec_init();
309 lck_path = lock_path("msg");
310 if (lck_path == NULL) {
311 TALLOC_FREE(ctx);
312 return NULL;
315 ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
316 0755);
317 if (!ok) {
318 DEBUG(10, ("%s: Could not create lock directory: %s\n",
319 __func__, strerror(errno)));
320 TALLOC_FREE(ctx);
321 return NULL;
324 priv_path = private_path("sock");
326 ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
327 0700);
328 if (!ok) {
329 DEBUG(10, ("%s: Could not create msg directory: %s\n",
330 __func__, strerror(errno)));
331 TALLOC_FREE(ctx);
332 return NULL;
335 ctx->msg_dgm_ref = messaging_dgm_ref(
336 ctx, ctx->event_ctx, ctx->id.unique_id,
337 priv_path, lck_path, messaging_recv_cb, ctx, &ret);
339 if (ctx->msg_dgm_ref == NULL) {
340 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
341 TALLOC_FREE(ctx);
342 return NULL;
345 ctx->names_db = server_id_db_init(
346 ctx, ctx->id, lp_lock_directory(), 0,
347 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
348 if (ctx->names_db == NULL) {
349 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
350 TALLOC_FREE(ctx);
351 return NULL;
354 talloc_set_destructor(ctx, messaging_context_destructor);
356 if (lp_clustering()) {
357 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
359 if (!NT_STATUS_IS_OK(status)) {
360 DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
361 nt_errstr(status)));
362 TALLOC_FREE(ctx);
363 return NULL;
366 ctx->id.vnn = get_my_vnn();
368 messaging_register(ctx, NULL, MSG_PING, ping_message);
370 /* Register some debugging related messages */
372 register_msg_pool_usage(ctx);
373 register_dmalloc_msgs(ctx);
374 debug_register_msgs(ctx);
376 return ctx;
379 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
381 return msg_ctx->id;
385 * re-init after a fork
387 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
389 NTSTATUS status;
390 int ret;
392 TALLOC_FREE(msg_ctx->msg_dgm_ref);
394 msg_ctx->id = procid_self();
396 msg_ctx->msg_dgm_ref = messaging_dgm_ref(
397 msg_ctx, msg_ctx->event_ctx, msg_ctx->id.unique_id,
398 private_path("sock"), lock_path("msg"),
399 messaging_recv_cb, msg_ctx, &ret);
401 if (msg_ctx->msg_dgm_ref == NULL) {
402 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
403 return map_nt_error_from_unix(ret);
406 TALLOC_FREE(msg_ctx->remote);
408 if (lp_clustering()) {
409 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
410 &msg_ctx->remote);
412 if (!NT_STATUS_IS_OK(status)) {
413 DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
414 nt_errstr(status)));
415 return status;
419 server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
421 return NT_STATUS_OK;
426 * Register a dispatch function for a particular message type. Allow multiple
427 * registrants
429 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
430 void *private_data,
431 uint32_t msg_type,
432 void (*fn)(struct messaging_context *msg,
433 void *private_data,
434 uint32_t msg_type,
435 struct server_id server_id,
436 DATA_BLOB *data))
438 struct messaging_callback *cb;
440 DEBUG(5, ("Registering messaging pointer for type %u - "
441 "private_data=%p\n",
442 (unsigned)msg_type, private_data));
445 * Only one callback per type
448 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
449 /* we allow a second registration of the same message
450 type if it has a different private pointer. This is
451 needed in, for example, the internal notify code,
452 which creates a new notify context for each tree
453 connect, and expects to receive messages to each of
454 them. */
455 if (cb->msg_type == msg_type && private_data == cb->private_data) {
456 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
457 (unsigned)msg_type, private_data));
458 cb->fn = fn;
459 cb->private_data = private_data;
460 return NT_STATUS_OK;
464 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
465 return NT_STATUS_NO_MEMORY;
468 cb->msg_type = msg_type;
469 cb->fn = fn;
470 cb->private_data = private_data;
472 DLIST_ADD(msg_ctx->callbacks, cb);
473 return NT_STATUS_OK;
477 De-register the function for a particular message type.
479 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
480 void *private_data)
482 struct messaging_callback *cb, *next;
484 for (cb = ctx->callbacks; cb; cb = next) {
485 next = cb->next;
486 if ((cb->msg_type == msg_type)
487 && (cb->private_data == private_data)) {
488 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
489 (unsigned)msg_type, private_data));
490 DLIST_REMOVE(ctx->callbacks, cb);
491 TALLOC_FREE(cb);
497 Send a message to a particular server
499 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
500 struct server_id server, uint32_t msg_type,
501 const DATA_BLOB *data)
503 struct iovec iov;
505 iov.iov_base = data->data;
506 iov.iov_len = data->length;
508 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
511 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
512 struct server_id server, uint32_t msg_type,
513 const uint8_t *buf, size_t len)
515 DATA_BLOB blob = data_blob_const(buf, len);
516 return messaging_send(msg_ctx, server, msg_type, &blob);
519 NTSTATUS messaging_send_iov_from(struct messaging_context *msg_ctx,
520 struct server_id src, struct server_id dst,
521 uint32_t msg_type,
522 const struct iovec *iov, int iovlen,
523 const int *fds, size_t num_fds)
525 int ret;
526 uint8_t hdr[MESSAGE_HDR_LENGTH];
527 struct iovec iov2[iovlen+1];
529 if (server_id_is_disconnected(&dst)) {
530 return NT_STATUS_INVALID_PARAMETER_MIX;
533 if (num_fds > INT8_MAX) {
534 return NT_STATUS_INVALID_PARAMETER_MIX;
537 if (!procid_is_local(&dst)) {
538 if (num_fds > 0) {
539 return NT_STATUS_NOT_SUPPORTED;
542 ret = msg_ctx->remote->send_fn(src, dst,
543 msg_type, iov, iovlen,
544 NULL, 0,
545 msg_ctx->remote);
546 if (ret != 0) {
547 return map_nt_error_from_unix(ret);
549 return NT_STATUS_OK;
552 message_hdr_put(hdr, msg_type, src, dst);
553 iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
554 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
556 become_root();
557 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
558 unbecome_root();
560 if (ret != 0) {
561 return map_nt_error_from_unix(ret);
563 return NT_STATUS_OK;
566 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
567 struct server_id dst, uint32_t msg_type,
568 const struct iovec *iov, int iovlen,
569 const int *fds, size_t num_fds)
571 return messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
572 iov, iovlen, fds, num_fds);
575 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
576 struct messaging_rec *rec)
578 struct messaging_rec *result;
579 size_t fds_size = sizeof(int64_t) * rec->num_fds;
581 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
582 rec->buf.length + fds_size);
583 if (result == NULL) {
584 return NULL;
586 *result = *rec;
588 /* Doesn't fail, see talloc_pooled_object */
590 result->buf.data = talloc_memdup(result, rec->buf.data,
591 rec->buf.length);
593 result->fds = NULL;
594 if (result->num_fds > 0) {
595 result->fds = talloc_memdup(result, rec->fds, fds_size);
598 return result;
/*
 * State of one messaging_filtered_read_send() request.
 */
struct messaging_filtered_read_state {
	struct tevent_context *ev;
	struct messaging_context *msg_ctx;

	/* result of messaging_dgm_register_tevent_context() */
	void *tevent_handle;

	/* caller-supplied predicate and its argument */
	bool (*filter)(struct messaging_rec *rec, void *private_data);
	void *private_data;

	/* copy of the message that satisfied the filter */
	struct messaging_rec *rec;
};
612 static void messaging_filtered_read_cleanup(struct tevent_req *req,
613 enum tevent_req_state req_state);
615 struct tevent_req *messaging_filtered_read_send(
616 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
617 struct messaging_context *msg_ctx,
618 bool (*filter)(struct messaging_rec *rec, void *private_data),
619 void *private_data)
621 struct tevent_req *req;
622 struct messaging_filtered_read_state *state;
623 size_t new_waiters_len;
625 req = tevent_req_create(mem_ctx, &state,
626 struct messaging_filtered_read_state);
627 if (req == NULL) {
628 return NULL;
630 state->ev = ev;
631 state->msg_ctx = msg_ctx;
632 state->filter = filter;
633 state->private_data = private_data;
636 * We have to defer the callback here, as we might be called from
637 * within a different tevent_context than state->ev
639 tevent_req_defer_callback(req, state->ev);
641 state->tevent_handle = messaging_dgm_register_tevent_context(
642 state, ev);
643 if (tevent_req_nomem(state, req)) {
644 return tevent_req_post(req, ev);
648 * We add ourselves to the "new_waiters" array, not the "waiters"
649 * array. If we are called from within messaging_read_done,
650 * messaging_dispatch_rec will be in an active for-loop on
651 * "waiters". We must be careful not to mess with this array, because
652 * it could mean that a single event is being delivered twice.
655 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
657 if (new_waiters_len == msg_ctx->num_new_waiters) {
658 struct tevent_req **tmp;
660 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
661 struct tevent_req *, new_waiters_len+1);
662 if (tevent_req_nomem(tmp, req)) {
663 return tevent_req_post(req, ev);
665 msg_ctx->new_waiters = tmp;
668 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
669 msg_ctx->num_new_waiters += 1;
670 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
672 return req;
675 static void messaging_filtered_read_cleanup(struct tevent_req *req,
676 enum tevent_req_state req_state)
678 struct messaging_filtered_read_state *state = tevent_req_data(
679 req, struct messaging_filtered_read_state);
680 struct messaging_context *msg_ctx = state->msg_ctx;
681 unsigned i;
683 tevent_req_set_cleanup_fn(req, NULL);
685 TALLOC_FREE(state->tevent_handle);
688 * Just set the [new_]waiters entry to NULL, be careful not to mess
689 * with the other "waiters" array contents. We are often called from
690 * within "messaging_dispatch_rec", which loops over
691 * "waiters". Messing with the "waiters" array will mess up that
692 * for-loop.
695 for (i=0; i<msg_ctx->num_waiters; i++) {
696 if (msg_ctx->waiters[i] == req) {
697 msg_ctx->waiters[i] = NULL;
698 return;
702 for (i=0; i<msg_ctx->num_new_waiters; i++) {
703 if (msg_ctx->new_waiters[i] == req) {
704 msg_ctx->new_waiters[i] = NULL;
705 return;
710 static void messaging_filtered_read_done(struct tevent_req *req,
711 struct messaging_rec *rec)
713 struct messaging_filtered_read_state *state = tevent_req_data(
714 req, struct messaging_filtered_read_state);
716 state->rec = messaging_rec_dup(state, rec);
717 if (tevent_req_nomem(state->rec, req)) {
718 return;
720 tevent_req_done(req);
723 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
724 struct messaging_rec **presult)
726 struct messaging_filtered_read_state *state = tevent_req_data(
727 req, struct messaging_filtered_read_state);
728 int err;
730 if (tevent_req_is_unix_error(req, &err)) {
731 tevent_req_received(req);
732 return err;
734 *presult = talloc_move(mem_ctx, &state->rec);
735 return 0;
/*
 * State of one messaging_read_send() request.
 */
struct messaging_read_state {
	/* message type being waited for */
	uint32_t msg_type;

	/* the received message, set by messaging_read_done() */
	struct messaging_rec *rec;
};
743 static bool messaging_read_filter(struct messaging_rec *rec,
744 void *private_data);
745 static void messaging_read_done(struct tevent_req *subreq);
747 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
748 struct tevent_context *ev,
749 struct messaging_context *msg,
750 uint32_t msg_type)
752 struct tevent_req *req, *subreq;
753 struct messaging_read_state *state;
755 req = tevent_req_create(mem_ctx, &state,
756 struct messaging_read_state);
757 if (req == NULL) {
758 return NULL;
760 state->msg_type = msg_type;
762 subreq = messaging_filtered_read_send(state, ev, msg,
763 messaging_read_filter, state);
764 if (tevent_req_nomem(subreq, req)) {
765 return tevent_req_post(req, ev);
767 tevent_req_set_callback(subreq, messaging_read_done, req);
768 return req;
771 static bool messaging_read_filter(struct messaging_rec *rec,
772 void *private_data)
774 struct messaging_read_state *state = talloc_get_type_abort(
775 private_data, struct messaging_read_state);
777 if (rec->num_fds != 0) {
778 return false;
781 return rec->msg_type == state->msg_type;
784 static void messaging_read_done(struct tevent_req *subreq)
786 struct tevent_req *req = tevent_req_callback_data(
787 subreq, struct tevent_req);
788 struct messaging_read_state *state = tevent_req_data(
789 req, struct messaging_read_state);
790 int ret;
792 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
793 TALLOC_FREE(subreq);
794 if (tevent_req_error(req, ret)) {
795 return;
797 tevent_req_done(req);
800 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
801 struct messaging_rec **presult)
803 struct messaging_read_state *state = tevent_req_data(
804 req, struct messaging_read_state);
805 int err;
807 if (tevent_req_is_unix_error(req, &err)) {
808 return err;
810 if (presult != NULL) {
811 *presult = talloc_move(mem_ctx, &state->rec);
813 return 0;
/*
 * State of one messaging_handler_send() request.
 */
struct messaging_handler_state {
	struct tevent_context *ev;
	struct messaging_context *msg_ctx;

	/* message type the handler is called for */
	uint32_t msg_type;

	/* called for every message; returning false ends the request */
	bool (*handler)(struct messaging_context *msg_ctx,
			struct messaging_rec **rec, void *private_data);
	void *private_data;
};
825 static void messaging_handler_got_msg(struct tevent_req *subreq);
827 struct tevent_req *messaging_handler_send(
828 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
829 struct messaging_context *msg_ctx, uint32_t msg_type,
830 bool (*handler)(struct messaging_context *msg_ctx,
831 struct messaging_rec **rec, void *private_data),
832 void *private_data)
834 struct tevent_req *req, *subreq;
835 struct messaging_handler_state *state;
837 req = tevent_req_create(mem_ctx, &state,
838 struct messaging_handler_state);
839 if (req == NULL) {
840 return NULL;
842 state->ev = ev;
843 state->msg_ctx = msg_ctx;
844 state->msg_type = msg_type;
845 state->handler = handler;
846 state->private_data = private_data;
848 subreq = messaging_read_send(state, state->ev, state->msg_ctx,
849 state->msg_type);
850 if (tevent_req_nomem(subreq, req)) {
851 return tevent_req_post(req, ev);
853 tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
854 return req;
857 static void messaging_handler_got_msg(struct tevent_req *subreq)
859 struct tevent_req *req = tevent_req_callback_data(
860 subreq, struct tevent_req);
861 struct messaging_handler_state *state = tevent_req_data(
862 req, struct messaging_handler_state);
863 struct messaging_rec *rec;
864 int ret;
865 bool ok;
867 ret = messaging_read_recv(subreq, state, &rec);
868 TALLOC_FREE(subreq);
869 if (tevent_req_error(req, ret)) {
870 return;
873 subreq = messaging_read_send(state, state->ev, state->msg_ctx,
874 state->msg_type);
875 if (tevent_req_nomem(subreq, req)) {
876 return;
878 tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
880 ok = state->handler(state->msg_ctx, &rec, state->private_data);
881 TALLOC_FREE(rec);
882 if (ok) {
884 * Next round
886 return;
888 TALLOC_FREE(subreq);
889 tevent_req_done(req);
/*
 * Collect the result of messaging_handler_send(); returns whatever
 * tevent_req_simple_recv_unix() reports for the request.
 */
int messaging_handler_recv(struct tevent_req *req)
{
	int err;

	err = tevent_req_simple_recv_unix(req);
	return err;
}
897 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
899 if (msg_ctx->num_new_waiters == 0) {
900 return true;
903 if (talloc_array_length(msg_ctx->waiters) <
904 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
905 struct tevent_req **tmp;
906 tmp = talloc_realloc(
907 msg_ctx, msg_ctx->waiters, struct tevent_req *,
908 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
909 if (tmp == NULL) {
910 DEBUG(1, ("%s: talloc failed\n", __func__));
911 return false;
913 msg_ctx->waiters = tmp;
916 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
917 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
919 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
920 msg_ctx->num_new_waiters = 0;
922 return true;
926 Dispatch one messaging_rec
928 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
929 struct messaging_rec *rec)
931 struct messaging_callback *cb, *next;
932 unsigned i;
933 size_t j;
935 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
936 next = cb->next;
937 if (cb->msg_type != rec->msg_type) {
938 continue;
942 * the old style callbacks don't support fd passing
944 for (j=0; j < rec->num_fds; j++) {
945 int fd = rec->fds[j];
946 close(fd);
948 rec->num_fds = 0;
949 rec->fds = NULL;
951 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
952 rec->src, &rec->buf);
955 * we continue looking for matching messages after finding
956 * one. This matters for subsystems like the internal notify
957 * code which register more than one handler for the same
958 * message type
962 if (!messaging_append_new_waiters(msg_ctx)) {
963 for (j=0; j < rec->num_fds; j++) {
964 int fd = rec->fds[j];
965 close(fd);
967 rec->num_fds = 0;
968 rec->fds = NULL;
969 return;
972 i = 0;
973 while (i < msg_ctx->num_waiters) {
974 struct tevent_req *req;
975 struct messaging_filtered_read_state *state;
977 req = msg_ctx->waiters[i];
978 if (req == NULL) {
980 * This got cleaned up. In the meantime,
981 * move everything down one. We need
982 * to keep the order of waiters, as
983 * other code may depend on this.
985 if (i < msg_ctx->num_waiters - 1) {
986 memmove(&msg_ctx->waiters[i],
987 &msg_ctx->waiters[i+1],
988 sizeof(struct tevent_req *) *
989 (msg_ctx->num_waiters - i - 1));
991 msg_ctx->num_waiters -= 1;
992 continue;
995 state = tevent_req_data(
996 req, struct messaging_filtered_read_state);
997 if (state->filter(rec, state->private_data)) {
998 messaging_filtered_read_done(req, rec);
1001 * Only the first one gets the fd-array
1003 rec->num_fds = 0;
1004 rec->fds = NULL;
1007 i += 1;
1011 * If the fd-array isn't used, just close it.
1013 for (j=0; j < rec->num_fds; j++) {
1014 int fd = rec->fds[j];
1015 close(fd);
1017 rec->num_fds = 0;
1018 rec->fds = NULL;
1021 static int mess_parent_dgm_cleanup(void *private_data);
1022 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1024 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1026 struct tevent_req *req;
1028 req = background_job_send(
1029 msg, msg->event_ctx, msg, NULL, 0,
1030 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1031 60*15),
1032 mess_parent_dgm_cleanup, msg);
1033 if (req == NULL) {
1034 return false;
1036 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1037 return true;
/*
 * Background job body: call messaging_dgm_wipe(), log its result and
 * return the currently configured cleanup interval.
 */
static int mess_parent_dgm_cleanup(void *private_data)
{
	int wipe_ret;
	int interval;

	wipe_ret = messaging_dgm_wipe();
	DEBUG(10, ("messaging_dgm_wipe returned %s\n",
		   wipe_ret ? strerror(wipe_ret) : "ok"));

	interval = lp_parm_int(-1, "messaging",
			       "messaging dgm cleanup interval",
			       60*15);
	return interval;
}
1051 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1053 struct messaging_context *msg = tevent_req_callback_data(
1054 req, struct messaging_context);
1055 NTSTATUS status;
1057 status = background_job_recv(req);
1058 TALLOC_FREE(req);
1059 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1060 nt_errstr(status)));
1062 req = background_job_send(
1063 msg, msg->event_ctx, msg, NULL, 0,
1064 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1065 60*15),
1066 mess_parent_dgm_cleanup, msg);
1067 if (req == NULL) {
1068 DEBUG(1, ("background_job_send failed\n"));
1069 return;
1071 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
/*
 * Clean up datagram messaging leftovers: pid 0 runs messaging_dgm_wipe(),
 * any other pid runs messaging_dgm_cleanup() for that pid. Returns the
 * result of the call made.
 */
int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
{
	if (pid == 0) {
		return messaging_dgm_wipe();
	}

	return messaging_dgm_cleanup(pid);
}
1087 struct tevent_context *messaging_tevent_context(
1088 struct messaging_context *msg_ctx)
1090 return msg_ctx->event_ctx;
1093 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1095 return msg_ctx->names_db;
1098 /** @} **/