CVE-2016-2118: s3:rpc_server/{samr,lsa,netlogon}: reject DCERPC_AUTH_LEVEL_CONNECT...
[Samba.git] / source3 / lib / messages.c
blob07d1c83717c3ea0f319a1d6a38a2909de1d2c0c8
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/util/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57 #include "lib/messages_dgm_ref.h"
58 #include "lib/messages_util.h"
60 struct messaging_callback {
61 struct messaging_callback *prev, *next;
62 uint32_t msg_type;
63 void (*fn)(struct messaging_context *msg, void *private_data,
64 uint32_t msg_type,
65 struct server_id server_id, DATA_BLOB *data);
66 void *private_data;
69 struct messaging_context {
70 struct server_id id;
71 struct tevent_context *event_ctx;
72 struct messaging_callback *callbacks;
74 struct tevent_req **new_waiters;
75 unsigned num_new_waiters;
77 struct tevent_req **waiters;
78 unsigned num_waiters;
80 void *msg_dgm_ref;
81 struct messaging_backend *remote;
83 struct server_id_db *names_db;
86 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
87 struct messaging_rec *rec);
89 /****************************************************************************
90 A useful function for testing the message system.
91 ****************************************************************************/
93 static void ping_message(struct messaging_context *msg_ctx,
94 void *private_data,
95 uint32_t msg_type,
96 struct server_id src,
97 DATA_BLOB *data)
99 struct server_id_buf idbuf;
101 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
102 server_id_str_buf(src, &idbuf), (int)data->length,
103 data->data ? (char *)data->data : ""));
105 messaging_send(msg_ctx, src, MSG_PONG, data);
108 /****************************************************************************
109 Register/replace a dispatch function for a particular message type.
110 JRA changed Dec 13 2006. Only one message handler now permitted per type.
111 *NOTE*: Dispatch functions must be able to cope with incoming
112 messages on an *odd* byte boundary.
113 ****************************************************************************/
115 struct msg_all {
116 struct messaging_context *msg_ctx;
117 int msg_type;
118 uint32_t msg_flag;
119 const void *buf;
120 size_t len;
121 int n_sent;
124 /****************************************************************************
125 Send one of the messages for the broadcast.
126 ****************************************************************************/
128 static int traverse_fn(struct db_record *rec, const struct server_id *id,
129 uint32_t msg_flags, void *state)
131 struct msg_all *msg_all = (struct msg_all *)state;
132 NTSTATUS status;
134 /* Don't send if the receiver hasn't registered an interest. */
136 if((msg_flags & msg_all->msg_flag) == 0) {
137 return 0;
140 /* If the msg send fails because the pid was not found (i.e. smbd died),
141 * the msg has already been deleted from the messages.tdb.*/
143 status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
144 (const uint8_t *)msg_all->buf, msg_all->len);
146 if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
147 struct server_id_buf idbuf;
150 * If the pid was not found delete the entry from
151 * serverid.tdb
154 DEBUG(2, ("pid %s doesn't exist\n",
155 server_id_str_buf(*id, &idbuf)));
157 dbwrap_record_delete(rec);
159 msg_all->n_sent++;
160 return 0;
164 * Send a message to all smbd processes.
166 * It isn't very efficient, but should be OK for the sorts of
167 * applications that use it. When we need efficient broadcast we can add
168 * it.
170 * @param n_sent Set to the number of messages sent. This should be
171 * equal to the number of processes, but be careful for races.
173 * @retval True for success.
175 bool message_send_all(struct messaging_context *msg_ctx,
176 int msg_type,
177 const void *buf, size_t len,
178 int *n_sent)
180 struct msg_all msg_all;
182 msg_all.msg_type = msg_type;
183 if (msg_type < 0x100) {
184 msg_all.msg_flag = FLAG_MSG_GENERAL;
185 } else if (msg_type > 0x100 && msg_type < 0x200) {
186 msg_all.msg_flag = FLAG_MSG_NMBD;
187 } else if (msg_type > 0x200 && msg_type < 0x300) {
188 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
189 } else if (msg_type > 0x300 && msg_type < 0x400) {
190 msg_all.msg_flag = FLAG_MSG_SMBD;
191 } else if (msg_type > 0x400 && msg_type < 0x600) {
192 msg_all.msg_flag = FLAG_MSG_WINBIND;
193 } else if (msg_type > 4000 && msg_type < 5000) {
194 msg_all.msg_flag = FLAG_MSG_DBWRAP;
195 } else {
196 return false;
199 msg_all.buf = buf;
200 msg_all.len = len;
201 msg_all.n_sent = 0;
202 msg_all.msg_ctx = msg_ctx;
204 serverid_traverse(traverse_fn, &msg_all);
205 if (n_sent)
206 *n_sent = msg_all.n_sent;
207 return true;
210 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
211 int *fds, size_t num_fds,
212 void *private_data)
214 struct messaging_context *msg_ctx = talloc_get_type_abort(
215 private_data, struct messaging_context);
216 struct server_id_buf idbuf;
217 struct messaging_rec rec;
218 int64_t fds64[MIN(num_fds, INT8_MAX)];
219 size_t i;
221 if (msg_len < MESSAGE_HDR_LENGTH) {
222 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
223 goto close_fail;
226 if (num_fds > INT8_MAX) {
227 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
228 goto close_fail;
232 * "consume" the fds by copying them and setting
233 * the original variable to -1
235 for (i=0; i < num_fds; i++) {
236 fds64[i] = fds[i];
237 fds[i] = -1;
240 rec = (struct messaging_rec) {
241 .msg_version = MESSAGE_VERSION,
242 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
243 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
244 .num_fds = num_fds,
245 .fds = fds64,
248 message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
250 DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
251 __func__, (unsigned)rec.msg_type,
252 (unsigned)rec.buf.length,
253 (unsigned)num_fds,
254 server_id_str_buf(rec.src, &idbuf)));
256 messaging_dispatch_rec(msg_ctx, &rec);
257 return;
259 close_fail:
260 for (i=0; i < num_fds; i++) {
261 close(fds[i]);
265 static int messaging_context_destructor(struct messaging_context *ctx)
267 unsigned i;
269 for (i=0; i<ctx->num_new_waiters; i++) {
270 if (ctx->new_waiters[i] != NULL) {
271 tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
272 ctx->new_waiters[i] = NULL;
275 for (i=0; i<ctx->num_waiters; i++) {
276 if (ctx->waiters[i] != NULL) {
277 tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
278 ctx->waiters[i] = NULL;
282 return 0;
285 static const char *private_path(const char *name)
287 return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
290 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
291 struct tevent_context *ev)
293 struct messaging_context *ctx;
294 NTSTATUS status;
295 int ret;
296 const char *lck_path;
297 const char *priv_path;
298 bool ok;
300 if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
301 return NULL;
304 ctx->id = procid_self();
305 ctx->event_ctx = ev;
307 sec_init();
309 lck_path = lock_path("msg.lock");
310 if (lck_path == NULL) {
311 TALLOC_FREE(ctx);
312 return NULL;
315 ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
316 0755);
317 if (!ok) {
318 DEBUG(10, ("%s: Could not create lock directory: %s\n",
319 __func__, strerror(errno)));
320 TALLOC_FREE(ctx);
321 return NULL;
324 priv_path = private_path("msg.sock");
325 if (priv_path == NULL) {
326 TALLOC_FREE(ctx);
327 return NULL;
330 ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
331 0700);
332 if (!ok) {
333 DEBUG(10, ("%s: Could not create msg directory: %s\n",
334 __func__, strerror(errno)));
335 TALLOC_FREE(ctx);
336 return NULL;
339 ctx->msg_dgm_ref = messaging_dgm_ref(
340 ctx, ctx->event_ctx, ctx->id.unique_id,
341 priv_path, lck_path, messaging_recv_cb, ctx, &ret);
343 if (ctx->msg_dgm_ref == NULL) {
344 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
345 TALLOC_FREE(ctx);
346 return NULL;
349 talloc_set_destructor(ctx, messaging_context_destructor);
351 if (lp_clustering()) {
352 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
354 if (!NT_STATUS_IS_OK(status)) {
355 DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
356 nt_errstr(status)));
357 TALLOC_FREE(ctx);
358 return NULL;
361 ctx->id.vnn = get_my_vnn();
363 ctx->names_db = server_id_db_init(
364 ctx, ctx->id, lp_lock_directory(), 0,
365 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
366 if (ctx->names_db == NULL) {
367 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
368 TALLOC_FREE(ctx);
369 return NULL;
372 messaging_register(ctx, NULL, MSG_PING, ping_message);
374 /* Register some debugging related messages */
376 register_msg_pool_usage(ctx);
377 register_dmalloc_msgs(ctx);
378 debug_register_msgs(ctx);
380 return ctx;
383 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
385 return msg_ctx->id;
389 * re-init after a fork
391 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
393 NTSTATUS status;
394 int ret;
396 TALLOC_FREE(msg_ctx->msg_dgm_ref);
398 msg_ctx->id = procid_self();
400 msg_ctx->msg_dgm_ref = messaging_dgm_ref(
401 msg_ctx, msg_ctx->event_ctx, msg_ctx->id.unique_id,
402 private_path("msg.sock"), lock_path("msg.lock"),
403 messaging_recv_cb, msg_ctx, &ret);
405 if (msg_ctx->msg_dgm_ref == NULL) {
406 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
407 return map_nt_error_from_unix(ret);
410 TALLOC_FREE(msg_ctx->remote);
412 if (lp_clustering()) {
413 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
414 &msg_ctx->remote);
416 if (!NT_STATUS_IS_OK(status)) {
417 DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
418 nt_errstr(status)));
419 return status;
423 server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
425 return NT_STATUS_OK;
430 * Register a dispatch function for a particular message type. Allow multiple
431 * registrants
433 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
434 void *private_data,
435 uint32_t msg_type,
436 void (*fn)(struct messaging_context *msg,
437 void *private_data,
438 uint32_t msg_type,
439 struct server_id server_id,
440 DATA_BLOB *data))
442 struct messaging_callback *cb;
444 DEBUG(5, ("Registering messaging pointer for type %u - "
445 "private_data=%p\n",
446 (unsigned)msg_type, private_data));
449 * Only one callback per type
452 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
453 /* we allow a second registration of the same message
454 type if it has a different private pointer. This is
455 needed in, for example, the internal notify code,
456 which creates a new notify context for each tree
457 connect, and expects to receive messages to each of
458 them. */
459 if (cb->msg_type == msg_type && private_data == cb->private_data) {
460 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
461 (unsigned)msg_type, private_data));
462 cb->fn = fn;
463 cb->private_data = private_data;
464 return NT_STATUS_OK;
468 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
469 return NT_STATUS_NO_MEMORY;
472 cb->msg_type = msg_type;
473 cb->fn = fn;
474 cb->private_data = private_data;
476 DLIST_ADD(msg_ctx->callbacks, cb);
477 return NT_STATUS_OK;
481 De-register the function for a particular message type.
483 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
484 void *private_data)
486 struct messaging_callback *cb, *next;
488 for (cb = ctx->callbacks; cb; cb = next) {
489 next = cb->next;
490 if ((cb->msg_type == msg_type)
491 && (cb->private_data == private_data)) {
492 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
493 (unsigned)msg_type, private_data));
494 DLIST_REMOVE(ctx->callbacks, cb);
495 TALLOC_FREE(cb);
501 Send a message to a particular server
503 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
504 struct server_id server, uint32_t msg_type,
505 const DATA_BLOB *data)
507 struct iovec iov;
509 iov.iov_base = data->data;
510 iov.iov_len = data->length;
512 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
515 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
516 struct server_id server, uint32_t msg_type,
517 const uint8_t *buf, size_t len)
519 DATA_BLOB blob = data_blob_const(buf, len);
520 return messaging_send(msg_ctx, server, msg_type, &blob);
523 NTSTATUS messaging_send_iov_from(struct messaging_context *msg_ctx,
524 struct server_id src, struct server_id dst,
525 uint32_t msg_type,
526 const struct iovec *iov, int iovlen,
527 const int *fds, size_t num_fds)
529 int ret;
530 uint8_t hdr[MESSAGE_HDR_LENGTH];
531 struct iovec iov2[iovlen+1];
533 if (server_id_is_disconnected(&dst)) {
534 return NT_STATUS_INVALID_PARAMETER_MIX;
537 if (num_fds > INT8_MAX) {
538 return NT_STATUS_INVALID_PARAMETER_MIX;
541 if (!procid_is_local(&dst)) {
542 if (num_fds > 0) {
543 return NT_STATUS_NOT_SUPPORTED;
546 ret = msg_ctx->remote->send_fn(src, dst,
547 msg_type, iov, iovlen,
548 NULL, 0,
549 msg_ctx->remote);
550 if (ret != 0) {
551 return map_nt_error_from_unix(ret);
553 return NT_STATUS_OK;
556 message_hdr_put(hdr, msg_type, src, dst);
557 iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
558 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
560 become_root();
561 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
562 unbecome_root();
564 if (ret != 0) {
565 return map_nt_error_from_unix(ret);
567 return NT_STATUS_OK;
570 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
571 struct server_id dst, uint32_t msg_type,
572 const struct iovec *iov, int iovlen,
573 const int *fds, size_t num_fds)
575 return messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
576 iov, iovlen, fds, num_fds);
579 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
580 struct messaging_rec *rec)
582 struct messaging_rec *result;
583 size_t fds_size = sizeof(int64_t) * rec->num_fds;
585 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
586 rec->buf.length + fds_size);
587 if (result == NULL) {
588 return NULL;
590 *result = *rec;
592 /* Doesn't fail, see talloc_pooled_object */
594 result->buf.data = talloc_memdup(result, rec->buf.data,
595 rec->buf.length);
597 result->fds = NULL;
598 if (result->num_fds > 0) {
599 result->fds = talloc_memdup(result, rec->fds, fds_size);
602 return result;
605 struct messaging_filtered_read_state {
606 struct tevent_context *ev;
607 struct messaging_context *msg_ctx;
608 void *tevent_handle;
610 bool (*filter)(struct messaging_rec *rec, void *private_data);
611 void *private_data;
613 struct messaging_rec *rec;
616 static void messaging_filtered_read_cleanup(struct tevent_req *req,
617 enum tevent_req_state req_state);
619 struct tevent_req *messaging_filtered_read_send(
620 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
621 struct messaging_context *msg_ctx,
622 bool (*filter)(struct messaging_rec *rec, void *private_data),
623 void *private_data)
625 struct tevent_req *req;
626 struct messaging_filtered_read_state *state;
627 size_t new_waiters_len;
629 req = tevent_req_create(mem_ctx, &state,
630 struct messaging_filtered_read_state);
631 if (req == NULL) {
632 return NULL;
634 state->ev = ev;
635 state->msg_ctx = msg_ctx;
636 state->filter = filter;
637 state->private_data = private_data;
640 * We have to defer the callback here, as we might be called from
641 * within a different tevent_context than state->ev
643 tevent_req_defer_callback(req, state->ev);
645 state->tevent_handle = messaging_dgm_register_tevent_context(
646 state, ev);
647 if (tevent_req_nomem(state, req)) {
648 return tevent_req_post(req, ev);
652 * We add ourselves to the "new_waiters" array, not the "waiters"
653 * array. If we are called from within messaging_read_done,
654 * messaging_dispatch_rec will be in an active for-loop on
655 * "waiters". We must be careful not to mess with this array, because
656 * it could mean that a single event is being delivered twice.
659 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
661 if (new_waiters_len == msg_ctx->num_new_waiters) {
662 struct tevent_req **tmp;
664 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
665 struct tevent_req *, new_waiters_len+1);
666 if (tevent_req_nomem(tmp, req)) {
667 return tevent_req_post(req, ev);
669 msg_ctx->new_waiters = tmp;
672 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
673 msg_ctx->num_new_waiters += 1;
674 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
676 return req;
679 static void messaging_filtered_read_cleanup(struct tevent_req *req,
680 enum tevent_req_state req_state)
682 struct messaging_filtered_read_state *state = tevent_req_data(
683 req, struct messaging_filtered_read_state);
684 struct messaging_context *msg_ctx = state->msg_ctx;
685 unsigned i;
687 tevent_req_set_cleanup_fn(req, NULL);
689 TALLOC_FREE(state->tevent_handle);
692 * Just set the [new_]waiters entry to NULL, be careful not to mess
693 * with the other "waiters" array contents. We are often called from
694 * within "messaging_dispatch_rec", which loops over
695 * "waiters". Messing with the "waiters" array will mess up that
696 * for-loop.
699 for (i=0; i<msg_ctx->num_waiters; i++) {
700 if (msg_ctx->waiters[i] == req) {
701 msg_ctx->waiters[i] = NULL;
702 return;
706 for (i=0; i<msg_ctx->num_new_waiters; i++) {
707 if (msg_ctx->new_waiters[i] == req) {
708 msg_ctx->new_waiters[i] = NULL;
709 return;
714 static void messaging_filtered_read_done(struct tevent_req *req,
715 struct messaging_rec *rec)
717 struct messaging_filtered_read_state *state = tevent_req_data(
718 req, struct messaging_filtered_read_state);
720 state->rec = messaging_rec_dup(state, rec);
721 if (tevent_req_nomem(state->rec, req)) {
722 return;
724 tevent_req_done(req);
727 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
728 struct messaging_rec **presult)
730 struct messaging_filtered_read_state *state = tevent_req_data(
731 req, struct messaging_filtered_read_state);
732 int err;
734 if (tevent_req_is_unix_error(req, &err)) {
735 tevent_req_received(req);
736 return err;
738 *presult = talloc_move(mem_ctx, &state->rec);
739 return 0;
742 struct messaging_read_state {
743 uint32_t msg_type;
744 struct messaging_rec *rec;
747 static bool messaging_read_filter(struct messaging_rec *rec,
748 void *private_data);
749 static void messaging_read_done(struct tevent_req *subreq);
751 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
752 struct tevent_context *ev,
753 struct messaging_context *msg,
754 uint32_t msg_type)
756 struct tevent_req *req, *subreq;
757 struct messaging_read_state *state;
759 req = tevent_req_create(mem_ctx, &state,
760 struct messaging_read_state);
761 if (req == NULL) {
762 return NULL;
764 state->msg_type = msg_type;
766 subreq = messaging_filtered_read_send(state, ev, msg,
767 messaging_read_filter, state);
768 if (tevent_req_nomem(subreq, req)) {
769 return tevent_req_post(req, ev);
771 tevent_req_set_callback(subreq, messaging_read_done, req);
772 return req;
775 static bool messaging_read_filter(struct messaging_rec *rec,
776 void *private_data)
778 struct messaging_read_state *state = talloc_get_type_abort(
779 private_data, struct messaging_read_state);
781 if (rec->num_fds != 0) {
782 return false;
785 return rec->msg_type == state->msg_type;
788 static void messaging_read_done(struct tevent_req *subreq)
790 struct tevent_req *req = tevent_req_callback_data(
791 subreq, struct tevent_req);
792 struct messaging_read_state *state = tevent_req_data(
793 req, struct messaging_read_state);
794 int ret;
796 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
797 TALLOC_FREE(subreq);
798 if (tevent_req_error(req, ret)) {
799 return;
801 tevent_req_done(req);
804 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
805 struct messaging_rec **presult)
807 struct messaging_read_state *state = tevent_req_data(
808 req, struct messaging_read_state);
809 int err;
811 if (tevent_req_is_unix_error(req, &err)) {
812 return err;
814 if (presult != NULL) {
815 *presult = talloc_move(mem_ctx, &state->rec);
817 return 0;
820 struct messaging_handler_state {
821 struct tevent_context *ev;
822 struct messaging_context *msg_ctx;
823 uint32_t msg_type;
824 bool (*handler)(struct messaging_context *msg_ctx,
825 struct messaging_rec **rec, void *private_data);
826 void *private_data;
829 static void messaging_handler_got_msg(struct tevent_req *subreq);
831 struct tevent_req *messaging_handler_send(
832 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
833 struct messaging_context *msg_ctx, uint32_t msg_type,
834 bool (*handler)(struct messaging_context *msg_ctx,
835 struct messaging_rec **rec, void *private_data),
836 void *private_data)
838 struct tevent_req *req, *subreq;
839 struct messaging_handler_state *state;
841 req = tevent_req_create(mem_ctx, &state,
842 struct messaging_handler_state);
843 if (req == NULL) {
844 return NULL;
846 state->ev = ev;
847 state->msg_ctx = msg_ctx;
848 state->msg_type = msg_type;
849 state->handler = handler;
850 state->private_data = private_data;
852 subreq = messaging_read_send(state, state->ev, state->msg_ctx,
853 state->msg_type);
854 if (tevent_req_nomem(subreq, req)) {
855 return tevent_req_post(req, ev);
857 tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
858 return req;
861 static void messaging_handler_got_msg(struct tevent_req *subreq)
863 struct tevent_req *req = tevent_req_callback_data(
864 subreq, struct tevent_req);
865 struct messaging_handler_state *state = tevent_req_data(
866 req, struct messaging_handler_state);
867 struct messaging_rec *rec;
868 int ret;
869 bool ok;
871 ret = messaging_read_recv(subreq, state, &rec);
872 TALLOC_FREE(subreq);
873 if (tevent_req_error(req, ret)) {
874 return;
877 subreq = messaging_read_send(state, state->ev, state->msg_ctx,
878 state->msg_type);
879 if (tevent_req_nomem(subreq, req)) {
880 return;
882 tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
884 ok = state->handler(state->msg_ctx, &rec, state->private_data);
885 TALLOC_FREE(rec);
886 if (ok) {
888 * Next round
890 return;
892 TALLOC_FREE(subreq);
893 tevent_req_done(req);
896 int messaging_handler_recv(struct tevent_req *req)
898 return tevent_req_simple_recv_unix(req);
901 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
903 if (msg_ctx->num_new_waiters == 0) {
904 return true;
907 if (talloc_array_length(msg_ctx->waiters) <
908 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
909 struct tevent_req **tmp;
910 tmp = talloc_realloc(
911 msg_ctx, msg_ctx->waiters, struct tevent_req *,
912 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
913 if (tmp == NULL) {
914 DEBUG(1, ("%s: talloc failed\n", __func__));
915 return false;
917 msg_ctx->waiters = tmp;
920 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
921 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
923 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
924 msg_ctx->num_new_waiters = 0;
926 return true;
930 Dispatch one messaging_rec
932 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
933 struct messaging_rec *rec)
935 struct messaging_callback *cb, *next;
936 unsigned i;
937 size_t j;
939 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
940 next = cb->next;
941 if (cb->msg_type != rec->msg_type) {
942 continue;
946 * the old style callbacks don't support fd passing
948 for (j=0; j < rec->num_fds; j++) {
949 int fd = rec->fds[j];
950 close(fd);
952 rec->num_fds = 0;
953 rec->fds = NULL;
955 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
956 rec->src, &rec->buf);
959 * we continue looking for matching messages after finding
960 * one. This matters for subsystems like the internal notify
961 * code which register more than one handler for the same
962 * message type
966 if (!messaging_append_new_waiters(msg_ctx)) {
967 for (j=0; j < rec->num_fds; j++) {
968 int fd = rec->fds[j];
969 close(fd);
971 rec->num_fds = 0;
972 rec->fds = NULL;
973 return;
976 i = 0;
977 while (i < msg_ctx->num_waiters) {
978 struct tevent_req *req;
979 struct messaging_filtered_read_state *state;
981 req = msg_ctx->waiters[i];
982 if (req == NULL) {
984 * This got cleaned up. In the meantime,
985 * move everything down one. We need
986 * to keep the order of waiters, as
987 * other code may depend on this.
989 if (i < msg_ctx->num_waiters - 1) {
990 memmove(&msg_ctx->waiters[i],
991 &msg_ctx->waiters[i+1],
992 sizeof(struct tevent_req *) *
993 (msg_ctx->num_waiters - i - 1));
995 msg_ctx->num_waiters -= 1;
996 continue;
999 state = tevent_req_data(
1000 req, struct messaging_filtered_read_state);
1001 if (state->filter(rec, state->private_data)) {
1002 messaging_filtered_read_done(req, rec);
1005 * Only the first one gets the fd-array
1007 rec->num_fds = 0;
1008 rec->fds = NULL;
1011 i += 1;
1015 * If the fd-array isn't used, just close it.
1017 for (j=0; j < rec->num_fds; j++) {
1018 int fd = rec->fds[j];
1019 close(fd);
1021 rec->num_fds = 0;
1022 rec->fds = NULL;
1025 static int mess_parent_dgm_cleanup(void *private_data);
1026 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1028 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1030 struct tevent_req *req;
1032 req = background_job_send(
1033 msg, msg->event_ctx, msg, NULL, 0,
1034 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1035 60*15),
1036 mess_parent_dgm_cleanup, msg);
1037 if (req == NULL) {
1038 return false;
1040 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1041 return true;
1044 static int mess_parent_dgm_cleanup(void *private_data)
1046 int ret;
1048 ret = messaging_dgm_wipe();
1049 DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1050 ret ? strerror(ret) : "ok"));
1051 return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1052 60*15);
1055 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1057 struct messaging_context *msg = tevent_req_callback_data(
1058 req, struct messaging_context);
1059 NTSTATUS status;
1061 status = background_job_recv(req);
1062 TALLOC_FREE(req);
1063 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1064 nt_errstr(status)));
1066 req = background_job_send(
1067 msg, msg->event_ctx, msg, NULL, 0,
1068 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1069 60*15),
1070 mess_parent_dgm_cleanup, msg);
1071 if (req == NULL) {
1072 DEBUG(1, ("background_job_send failed\n"));
1073 return;
1075 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1078 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1080 int ret;
1082 if (pid == 0) {
1083 ret = messaging_dgm_wipe();
1084 } else {
1085 ret = messaging_dgm_cleanup(pid);
1088 return ret;
1091 struct tevent_context *messaging_tevent_context(
1092 struct messaging_context *msg_ctx)
1094 return msg_ctx->event_ctx;
1097 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1099 return msg_ctx->names_db;
1102 /** @} **/