lib:util: Move discard_const(_p) to own header for libndr.h
[Samba.git] / source3 / lib / messages_dgm.c
blobaaafcc103078dc547c8fdc847aac275c2cf44ef7
1 /*
2 * Unix SMB/CIFS implementation.
3 * Samba internal messaging functions
4 * Copyright (C) 2013 by Volker Lendecke
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/select.h"
25 #include "lib/util/debug.h"
26 #include "lib/messages_dgm.h"
27 #include "lib/util/genrand.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/pthreadpool/pthreadpool_tevent.h"
30 #include "lib/util/msghdr.h"
31 #include "lib/util/iov_buf.h"
32 #include "lib/util/blocking.h"
33 #include "lib/util/tevent_unix.h"
35 #define MESSAGING_DGM_FRAGMENT_LENGTH 1024
/*
 * Fixed-size buffer guaranteed to be large enough for any AF_UNIX
 * socket path (sized from struct sockaddr_un, which embeds sun_path).
 * Used for socket and lockfile path names throughout this file.
 */
struct sun_path_buf {
	/*
	 * This will carry enough for a socket path
	 */
	char buf[sizeof(struct sockaddr_un)];
};
/*
 * We can only have one tevent_fd per dgm_context and per
 * tevent_context. Maintain a list of registered tevent_contexts per
 * dgm_context.
 */
struct messaging_dgm_fde_ev {
	struct messaging_dgm_fde_ev *prev, *next;

	/*
	 * Backreference to enable DLIST_REMOVE from our
	 * destructor. Also, set to NULL when the dgm_context dies
	 * before the messaging_dgm_fde_ev.
	 */
	struct messaging_dgm_context *ctx;

	/* Event context and the read-fd event registered on it. */
	struct tevent_context *ev;
	struct tevent_fd *fde;
};
/*
 * One outgoing connection to a peer pid's datagram socket.
 * Kept on messaging_dgm_context->outsocks and torn down by an
 * idle timer when the send queue drains (see
 * messaging_dgm_out_idle_handler).
 */
struct messaging_dgm_out {
	struct messaging_dgm_out *prev, *next;
	struct messaging_dgm_context *ctx;

	pid_t pid;		/* destination process */
	int sock;		/* connected AF_UNIX SOCK_DGRAM fd */
	bool is_blocking;	/* current blocking mode of "sock" */
	uint64_t cookie;	/* per-destination fragment cookie, never 0 */

	/* Pending fragment sends; qlen != 0 means a helper thread owns
	 * the socket in blocking mode. */
	struct tevent_queue *queue;
	struct tevent_timer *idle_timer;
};
/*
 * Partially reassembled incoming multi-fragment message, keyed by
 * (sender_pid, sender_sock, cookie). Listed on ctx->in_msgs until
 * all bytes have arrived.
 */
struct messaging_dgm_in_msg {
	struct messaging_dgm_in_msg *prev, *next;
	struct messaging_dgm_in_msg *ctx_placeholder_unused; /* see note */
	size_t msglen;		/* total expected payload length */
	size_t received;	/* bytes accumulated so far */
	pid_t sender_pid;
	int sender_sock;
	uint64_t cookie;	/* must match on every fragment */
	uint8_t buf[];		/* flexible array holding the payload */
};
/*
 * Global per-process messaging state: the receiving datagram socket,
 * its filesystem rendezvous paths, the lockfile proving liveness,
 * reassembly state and the list of outgoing connections.
 */
struct messaging_dgm_context {
	struct tevent_context *ev;
	pid_t pid;			/* pid at creation; used to detect fork */
	struct sun_path_buf socket_dir;
	struct sun_path_buf lockfile_dir;
	int lockfile_fd;		/* holds the fcntl write lock */

	int sock;			/* bound receiving AF_UNIX dgram socket */
	struct messaging_dgm_in_msg *in_msgs;	/* partial reassemblies */

	struct messaging_dgm_fde_ev *fde_evs;	/* registered tevent_fd's */
	/* Callback invoked with every fully reassembled message. */
	void (*recv_cb)(struct tevent_context *ev,
			const uint8_t *msg,
			size_t msg_len,
			int *fds,
			size_t num_fds,
			void *private_data);
	void *recv_cb_private_data;

	/* Points at the static singleton flag; cleared in our destructor. */
	bool *have_dgm_context;

	struct pthreadpool_tevent *pool;	/* for blocking sendmsg jobs */
	struct messaging_dgm_out *outsocks;	/* outgoing connections */
};
/*
 * Set the close-on-exec flag on a file descriptor.
 * Returns 0 on success or a positive errno value on failure.
 * On platforms without FD_CLOEXEC this is a no-op returning 0.
 */
static int prepare_socket_cloexec(int sock)
{
#ifdef FD_CLOEXEC
	int fd_flags = fcntl(sock, F_GETFD, 0);

	if (fd_flags == -1) {
		return errno;
	}
	if (fcntl(sock, F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
		return errno;
	}
#endif
	return 0;
}
/*
 * Close every open fd in the array and overwrite each slot with -1
 * so callers can safely call this again. Slots already at -1 are
 * skipped.
 */
static void close_fd_array(int *fds, size_t num_fds)
{
	size_t idx;

	for (idx = 0; idx < num_fds; idx++) {
		if (fds[idx] != -1) {
			close(fds[idx]);
			fds[idx] = -1;
		}
	}
}
145 * The idle handler can free the struct messaging_dgm_out *,
146 * if it's unused (qlen of zero) which closes the socket.
149 static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
150 struct tevent_timer *te,
151 struct timeval current_time,
152 void *private_data)
154 struct messaging_dgm_out *out = talloc_get_type_abort(
155 private_data, struct messaging_dgm_out);
156 size_t qlen;
158 out->idle_timer = NULL;
160 qlen = tevent_queue_length(out->queue);
161 if (qlen == 0) {
162 TALLOC_FREE(out);
167 * Setup the idle handler to fire afer 1 second if the
168 * queue is zero.
171 static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out *out)
173 size_t qlen;
175 qlen = tevent_queue_length(out->queue);
176 if (qlen != 0) {
177 TALLOC_FREE(out->idle_timer);
178 return;
181 if (out->idle_timer != NULL) {
182 tevent_update_timer(out->idle_timer,
183 tevent_timeval_current_ofs(1, 0));
184 return;
187 out->idle_timer = tevent_add_timer(
188 out->ctx->ev, out, tevent_timeval_current_ofs(1, 0),
189 messaging_dgm_out_idle_handler, out);
191 * No NULL check, we'll come back here. Worst case we're
192 * leaking a bit.
196 static int messaging_dgm_out_destructor(struct messaging_dgm_out *dst);
197 static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
198 struct tevent_timer *te,
199 struct timeval current_time,
200 void *private_data);
/*
 * Connect to an existing rendezvous point for another
 * pid - wrapped inside a struct messaging_dgm_out *.
 *
 * Returns 0 and *pout on success, a positive errno value on failure.
 * On failure "out" (and its socket, via the destructor installed
 * below) is freed.
 */

static int messaging_dgm_out_create(TALLOC_CTX *mem_ctx,
				    struct messaging_dgm_context *ctx,
				    pid_t pid, struct messaging_dgm_out **pout)
{
	struct messaging_dgm_out *out;
	struct sockaddr_un addr = { .sun_family = AF_UNIX };
	int ret = ENOMEM;
	int out_pathlen;
	/* Worst case "%s/%u": dir + '/' + decimal unsigned + NUL. */
	char addr_buf[sizeof(addr.sun_path) + (3 * sizeof(unsigned) + 2)];

	out = talloc(mem_ctx, struct messaging_dgm_out);
	if (out == NULL) {
		goto fail;
	}
	*out = (struct messaging_dgm_out) {
		.pid = pid,
		.ctx = ctx,
		.cookie = 1	/* cookie 0 is reserved for unfragmented */
	};

	out_pathlen = snprintf(addr_buf, sizeof(addr_buf),
			       "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
	if (out_pathlen < 0) {
		goto errno_fail;
	}
	if ((size_t)out_pathlen >= sizeof(addr.sun_path)) {
		ret = ENAMETOOLONG;
		goto fail;
	}

	/* Includes the terminating NUL. */
	memcpy(addr.sun_path, addr_buf, out_pathlen + 1);

	out->queue = tevent_queue_create(out, addr.sun_path);
	if (out->queue == NULL) {
		ret = ENOMEM;
		goto fail;
	}

	out->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
	if (out->sock == -1) {
		goto errno_fail;
	}

	/* From here on the destructor owns list membership and the fd. */
	DLIST_ADD(ctx->outsocks, out);
	talloc_set_destructor(out, messaging_dgm_out_destructor);

	do {
		ret = connect(out->sock,
			      (const struct sockaddr *)(const void *)&addr,
			      sizeof(addr));
	} while ((ret == -1) && (errno == EINTR));

	if (ret == -1) {
		goto errno_fail;
	}

	/* Direct sends are attempted nonblocking first. */
	ret = set_blocking(out->sock, false);
	if (ret == -1) {
		goto errno_fail;
	}
	out->is_blocking = false;

	*pout = out;
	return 0;
errno_fail:
	ret = errno;
fail:
	TALLOC_FREE(out);
	return ret;
}
279 static int messaging_dgm_out_destructor(struct messaging_dgm_out *out)
281 DLIST_REMOVE(out->ctx->outsocks, out);
283 if ((tevent_queue_length(out->queue) != 0) &&
284 (getpid() == out->ctx->pid)) {
286 * We have pending jobs. We can't close the socket,
287 * this has been handed over to messaging_dgm_out_queue_state.
289 return 0;
292 if (out->sock != -1) {
293 close(out->sock);
294 out->sock = -1;
296 return 0;
300 * Find the struct messaging_dgm_out * to talk to pid.
301 * If we don't have one, create it. Set the timer to
302 * delete after 1 sec.
305 static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
306 struct messaging_dgm_out **pout)
308 struct messaging_dgm_out *out;
309 int ret;
311 for (out = ctx->outsocks; out != NULL; out = out->next) {
312 if (out->pid == pid) {
313 break;
317 if (out == NULL) {
318 ret = messaging_dgm_out_create(ctx, ctx, pid, &out);
319 if (ret != 0) {
320 return ret;
324 messaging_dgm_out_rearm_idle_timer(out);
326 *pout = out;
327 return 0;
/*
 * This function is called directly to send a message fragment
 * when the outgoing queue is zero, and from a pthreadpool
 * job thread when messages are being queued (qlen != 0).
 * Make sure *ONLY* thread-safe functions are called within.
 *
 * Returns the sendmsg() result; on -1, *perrno carries the error.
 */

static ssize_t messaging_dgm_sendmsg(int sock,
				     const struct iovec *iov, int iovlen,
				     const int *fds, size_t num_fds,
				     int *perrno)
{
	struct msghdr msg;
	ssize_t fdlen, ret;

	/*
	 * Do the actual sendmsg syscall. This will be called from a
	 * pthreadpool helper thread, so be careful what you do here.
	 */

	msg = (struct msghdr) {
		.msg_iov = discard_const_p(struct iovec, iov),
		.msg_iovlen = iovlen
	};

	/* First call only sizes the SCM_RIGHTS control buffer. */
	fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
	if (fdlen == -1) {
		*perrno = EINVAL;
		return -1;
	}

	{
		/*
		 * Stack VLA: no heap allocation in the thread path.
		 * NOTE(review): fdlen==0 gives a zero-length VLA when
		 * num_fds==0 - relies on compiler extension; confirm.
		 */
		uint8_t buf[fdlen];

		msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);

		do {
			ret = sendmsg(sock, &msg, 0);
		} while ((ret == -1) && (errno == EINTR));
	}

	if (ret == -1) {
		*perrno = errno;
	}
	return ret;
}
/*
 * Per-fragment state for a queued (threaded, blocking) send.
 * Owns duplicated fds and a copy of the marshalled fragment; both
 * are released by messaging_dgm_out_queue_state_destructor.
 */
struct messaging_dgm_out_queue_state {
	struct tevent_context *ev;
	struct pthreadpool_tevent *pool;

	struct tevent_req *req;		/* our own request */
	struct tevent_req *subreq;	/* in-flight pool job, if any */

	int sock;			/* destination socket fd */

	int *fds;			/* dup()ed fds to pass along */
	uint8_t *buf;			/* flattened fragment bytes */

	ssize_t sent;			/* sendmsg result from the thread */
	int err;			/* errno from the thread on failure */
};
393 static int messaging_dgm_out_queue_state_destructor(
394 struct messaging_dgm_out_queue_state *state);
395 static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
396 void *private_data);
397 static void messaging_dgm_out_threaded_job(void *private_data);
398 static void messaging_dgm_out_queue_done(struct tevent_req *subreq);
/*
 * Push a message fragment onto a queue to be sent by a
 * threadpool job. Makes copies of data/fd's to be sent.
 * The running tevent_queue internally creates an immediate
 * event to schedule the write.
 */

static struct tevent_req *messaging_dgm_out_queue_send(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
	struct messaging_dgm_out *out,
	const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
{
	struct tevent_req *req;
	struct messaging_dgm_out_queue_state *state;
	struct tevent_queue_entry *e;
	size_t i;
	ssize_t buflen;

	/* NOTE: state is parented to "out", not mem_ctx. */
	req = tevent_req_create(out, &state,
				struct messaging_dgm_out_queue_state);
	if (req == NULL) {
		return NULL;
	}
	state->ev = ev;
	state->pool = out->ctx->pool;
	state->sock = out->sock;
	state->req = req;

	/*
	 * Go blocking in a thread
	 */
	if (!out->is_blocking) {
		int ret = set_blocking(out->sock, true);
		if (ret == -1) {
			tevent_req_error(req, errno);
			return tevent_req_post(req, ev);
		}
		out->is_blocking = true;
	}

	buflen = iov_buflen(iov, iovlen);
	if (buflen == -1) {
		tevent_req_error(req, EMSGSIZE);
		return tevent_req_post(req, ev);
	}

	/* Flatten the caller's iovec into one private copy. */
	state->buf = talloc_array(state, uint8_t, buflen);
	if (tevent_req_nomem(state->buf, req)) {
		return tevent_req_post(req, ev);
	}
	iov_buf(iov, iovlen, state->buf, buflen);

	state->fds = talloc_array(state, int, num_fds);
	if (tevent_req_nomem(state->fds, req)) {
		return tevent_req_post(req, ev);
	}

	/* Pre-fill with -1 so close_fd_array is safe on partial dup. */
	for (i=0; i<num_fds; i++) {
		state->fds[i] = -1;
	}

	for (i=0; i<num_fds; i++) {

		state->fds[i] = dup(fds[i]);

		if (state->fds[i] == -1) {
			int ret = errno;

			close_fd_array(state->fds, num_fds);

			tevent_req_error(req, ret);
			return tevent_req_post(req, ev);
		}
	}

	/* Only now: the destructor closes the dup()ed fds. */
	talloc_set_destructor(state, messaging_dgm_out_queue_state_destructor);

	e = tevent_queue_add_entry(out->queue, ev, req,
				   messaging_dgm_out_queue_trigger, req);
	if (tevent_req_nomem(e, req)) {
		return tevent_req_post(req, ev);
	}
	return req;
}
/*
 * Tear down queued-send state. If a pool job is still in flight we
 * must not free the buffers the thread is using: veto the free
 * (return -1) and reparent ourselves off the request instead,
 * deliberately leaking. Otherwise close the dup()ed fds.
 */
static int messaging_dgm_out_queue_state_destructor(
	struct messaging_dgm_out_queue_state *state)
{
	int *fds;
	size_t num_fds;

	if (state->subreq != NULL) {
		/*
		 * We're scheduled, but we're destroyed. This happens
		 * if the messaging_dgm_context is destroyed while
		 * we're stuck in a blocking send. There's nothing we
		 * can do but to leak memory.
		 */
		TALLOC_FREE(state->subreq);
		(void)talloc_reparent(state->req, NULL, state);
		return -1;	/* veto the talloc free */
	}

	fds = state->fds;
	num_fds = talloc_array_length(fds);
	close_fd_array(fds, num_fds);
	return 0;
}
510 * tevent_queue callback that schedules the pthreadpool to actually
511 * send the queued message fragment.
514 static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
515 void *private_data)
517 struct messaging_dgm_out_queue_state *state = tevent_req_data(
518 req, struct messaging_dgm_out_queue_state);
520 tevent_req_reset_endtime(req);
522 state->subreq = pthreadpool_tevent_job_send(
523 state, state->ev, state->pool,
524 messaging_dgm_out_threaded_job, state);
525 if (tevent_req_nomem(state->subreq, req)) {
526 return;
528 tevent_req_set_callback(state->subreq, messaging_dgm_out_queue_done,
529 req);
/*
 * Wrapper function run by the pthread that calls
 * messaging_dgm_sendmsg() to actually do the sendmsg().
 *
 * Runs on a pool helper thread: results are reported back via
 * state->sent / state->err, picked up in messaging_dgm_out_queue_done.
 */

static void messaging_dgm_out_threaded_job(void *private_data)
{
	struct messaging_dgm_out_queue_state *state = talloc_get_type_abort(
		private_data, struct messaging_dgm_out_queue_state);

	struct iovec iov = { .iov_base = state->buf,
			     .iov_len = talloc_get_size(state->buf) };
	size_t num_fds = talloc_array_length(state->fds);
	int msec = 1;

	while (true) {
		int ret;

		state->sent = messaging_dgm_sendmsg(state->sock, &iov, 1,
					state->fds, num_fds, &state->err);

		if (state->sent != -1) {
			return;		/* success */
		}
		if (state->err != ENOBUFS) {
			return;		/* hard error, report upstream */
		}

		/*
		 * ENOBUFS is the FreeBSD way of saying "Try
		 * again". We have to do polling.
		 */
		do {
			ret = poll(NULL, 0, msec);
		} while ((ret == -1) && (errno == EINTR));

		/*
		 * Exponential backoff up to once a second
		 */
		msec *= 2;
		msec = MIN(msec, 1000);
	}
}
577 * Pickup the results of the pthread sendmsg().
580 static void messaging_dgm_out_queue_done(struct tevent_req *subreq)
582 struct tevent_req *req = tevent_req_callback_data(
583 subreq, struct tevent_req);
584 struct messaging_dgm_out_queue_state *state = tevent_req_data(
585 req, struct messaging_dgm_out_queue_state);
586 int ret;
588 if (subreq != state->subreq) {
589 abort();
592 ret = pthreadpool_tevent_job_recv(subreq);
594 TALLOC_FREE(subreq);
595 state->subreq = NULL;
597 if (tevent_req_error(req, ret)) {
598 return;
600 if (state->sent == -1) {
601 tevent_req_error(req, state->err);
602 return;
604 tevent_req_done(req);
/* Report the queued-send result: 0 or a positive errno value. */
static int messaging_dgm_out_queue_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
612 static void messaging_dgm_out_sent_fragment(struct tevent_req *req);
/*
 * Core function to send a message fragment given a
 * connected struct messaging_dgm_out * destination.
 * If no current queue tries to send nonblocking
 * directly. If not, queues the fragment (which makes
 * a copy of it) and adds a 60-second timeout on the send.
 *
 * Returns 0 on success (sent or queued), a positive errno otherwise.
 */

static int messaging_dgm_out_send_fragment(
	struct tevent_context *ev, struct messaging_dgm_out *out,
	const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
{
	struct tevent_req *req;
	size_t qlen;
	bool ok;

	qlen = tevent_queue_length(out->queue);
	if (qlen == 0) {
		ssize_t nsent;
		int err = 0;

		/* Direct path must be nonblocking. */
		if (out->is_blocking) {
			int ret = set_blocking(out->sock, false);
			if (ret == -1) {
				return errno;
			}
			out->is_blocking = false;
		}

		nsent = messaging_dgm_sendmsg(out->sock, iov, iovlen, fds,
					      num_fds, &err);
		if (nsent >= 0) {
			return 0;
		}

		if (err == ENOBUFS) {
			/*
			 * FreeBSD's way of telling us the dst socket
			 * is full. EWOULDBLOCK makes us spawn a
			 * polling helper thread.
			 */
			err = EWOULDBLOCK;
		}

		if (err != EWOULDBLOCK) {
			return err;
		}
	}

	/* Fall back to the queued, threaded blocking send. */
	req = messaging_dgm_out_queue_send(out, ev, out, iov, iovlen,
					   fds, num_fds);
	if (req == NULL) {
		return ENOMEM;
	}
	tevent_req_set_callback(req, messaging_dgm_out_sent_fragment, out);

	ok = tevent_req_set_endtime(req, ev,
				    tevent_timeval_current_ofs(60, 0));
	if (!ok) {
		TALLOC_FREE(req);
		return ENOMEM;
	}

	return 0;
}
681 * Pickup the result of the fragment send. Reset idle timer
682 * if queue empty.
685 static void messaging_dgm_out_sent_fragment(struct tevent_req *req)
687 struct messaging_dgm_out *out = tevent_req_callback_data(
688 req, struct messaging_dgm_out);
689 int ret;
691 ret = messaging_dgm_out_queue_recv(req);
692 TALLOC_FREE(req);
694 if (ret != 0) {
695 DBG_WARNING("messaging_out_queue_recv returned %s\n",
696 strerror(ret));
699 messaging_dgm_out_rearm_idle_timer(out);
/*
 * Wire header prefixed (after the 64-bit cookie) to every fragment
 * of a multi-fragment message. (msglen, pid, sock) identify the
 * message for reassembly on the receiver.
 * NOTE(review): sent as raw host-endian struct - works because
 * sender and receiver share one machine via AF_UNIX.
 */
struct messaging_dgm_fragment_hdr {
	size_t msglen;	/* total payload length of the full message */
	pid_t pid;	/* sender pid */
	int sock;	/* sender's outgoing socket fd, disambiguates */
};
/*
 * Fragment a message into MESSAGING_DGM_FRAGMENT_LENGTH - 64-bit cookie
 * size chunks and send it.
 *
 * Message fragments are prefixed by a 64-bit cookie that
 * stays the same for all fragments. This allows the receiver
 * to recognise fragments of the same message and re-assemble
 * them on the other end.
 *
 * Note that this allows other message fragments from other
 * senders to be interleaved in the receive read processing,
 * the combination of the cookie and header info allows unique
 * identification of the message from a specific sender in
 * re-assembly.
 *
 * If the message is smaller than MESSAGING_DGM_FRAGMENT_LENGTH - cookie
 * then send a single message with cookie set to zero.
 *
 * Otherwise the message is fragmented into chunks and added
 * to the sending queue. Any file descriptors are passed only
 * in the last fragment.
 *
 * Finally the cookie is incremented (wrap over zero) to
 * prepare for the next message sent to this channel.
 */

static int messaging_dgm_out_send_fragmented(struct tevent_context *ev,
					     struct messaging_dgm_out *out,
					     const struct iovec *iov,
					     int iovlen,
					     const int *fds, size_t num_fds)
{
	ssize_t msglen, sent;
	int ret = 0;
	/* +2: room for the cookie and the fragment header up front. */
	struct iovec iov_copy[iovlen+2];
	struct messaging_dgm_fragment_hdr hdr;
	struct iovec src_iov;

	if (iovlen < 0) {
		return EINVAL;
	}

	msglen = iov_buflen(iov, iovlen);
	if (msglen == -1) {
		return EMSGSIZE;
	}
	if (num_fds > INT8_MAX) {
		return EINVAL;
	}

	/* Small message: single datagram, cookie 0 = "not fragmented". */
	if ((size_t) msglen <=
	    (MESSAGING_DGM_FRAGMENT_LENGTH - sizeof(uint64_t))) {
		uint64_t cookie = 0;

		iov_copy[0].iov_base = &cookie;
		iov_copy[0].iov_len = sizeof(cookie);
		if (iovlen > 0) {
			memcpy(&iov_copy[1], iov,
			       sizeof(struct iovec) * iovlen);
		}

		return messaging_dgm_out_send_fragment(
			ev, out, iov_copy, iovlen+1, fds, num_fds);

	}

	hdr = (struct messaging_dgm_fragment_hdr) {
		.msglen = msglen,
		.pid = getpid(),
		.sock = out->sock
	};

	iov_copy[0].iov_base = &out->cookie;
	iov_copy[0].iov_len = sizeof(out->cookie);
	iov_copy[1].iov_base = &hdr;
	iov_copy[1].iov_len = sizeof(hdr);

	sent = 0;
	src_iov = iov[0];

	/*
	 * The following write loop sends the user message in pieces. We have
	 * filled the first two iovecs above with "cookie" and "hdr". In the
	 * following loops we pull message chunks from the user iov array and
	 * fill iov_copy piece by piece, possibly truncating chunks from the
	 * caller's iov array. Ugly, but hopefully efficient.
	 */

	while (sent < msglen) {
		size_t fragment_len;
		size_t iov_index = 2;

		fragment_len = sizeof(out->cookie) + sizeof(hdr);

		while (fragment_len < MESSAGING_DGM_FRAGMENT_LENGTH) {
			size_t space, chunk;

			space = MESSAGING_DGM_FRAGMENT_LENGTH - fragment_len;
			chunk = MIN(space, src_iov.iov_len);

			iov_copy[iov_index].iov_base = src_iov.iov_base;
			iov_copy[iov_index].iov_len = chunk;
			iov_index += 1;

			src_iov.iov_base = (char *)src_iov.iov_base + chunk;
			src_iov.iov_len -= chunk;
			fragment_len += chunk;

			if (src_iov.iov_len == 0) {
				/* Caller iovec exhausted, pull the next. */
				iov += 1;
				iovlen -= 1;
				if (iovlen == 0) {
					break;
				}
				src_iov = iov[0];
			}
		}
		sent += (fragment_len - sizeof(out->cookie) - sizeof(hdr));

		/*
		 * only the last fragment should pass the fd array.
		 * That simplifies the receiver a lot.
		 */
		if (sent < msglen) {
			ret = messaging_dgm_out_send_fragment(
				ev, out, iov_copy, iov_index, NULL, 0);
		} else {
			ret = messaging_dgm_out_send_fragment(
				ev, out, iov_copy, iov_index, fds, num_fds);
		}
		if (ret != 0) {
			break;
		}
	}

	/* Next message gets a fresh cookie; 0 stays reserved. */
	out->cookie += 1;
	if (out->cookie == 0) {
		out->cookie += 1;
	}

	return ret;
}
853 static struct messaging_dgm_context *global_dgm_context;
855 static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
/*
 * Create (or take over) the per-pid lockfile, take an exclusive
 * fcntl write lock on it and write a freshly generated nonzero
 * unique id into it. The held lock is what proves this pid is
 * alive to other processes.
 *
 * Returns 0 with *plockfile_fd/*punique set, else a positive errno.
 */
static int messaging_dgm_lockfile_create(struct messaging_dgm_context *ctx,
					 pid_t pid, int *plockfile_fd,
					 uint64_t *punique)
{
	char buf[64];
	int lockfile_fd;
	struct sun_path_buf lockfile_name;
	struct flock lck;
	uint64_t unique;
	int unique_len, ret;
	ssize_t written;

	ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
		       "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
	if (ret < 0) {
		/* NOTE(review): snprintf failure may leave errno stale. */
		return errno;
	}
	if ((unsigned)ret >= sizeof(lockfile_name.buf)) {
		return ENAMETOOLONG;
	}

	/* no O_EXCL, existence check is via the fcntl lock */

	lockfile_fd = open(lockfile_name.buf, O_NONBLOCK|O_CREAT|O_RDWR,
			   0644);

	if ((lockfile_fd == -1) &&
	    ((errno == ENXIO) /* Linux */ ||
	     (errno == ENODEV) /* Linux kernel bug */ ||
	     (errno == EOPNOTSUPP) /* FreeBSD */)) {
		/*
		 * Huh -- a socket? This might be a stale socket from
		 * an upgrade of Samba. Just unlink and retry, nobody
		 * else is supposed to be here at this time.
		 *
		 * Yes, this is racy, but I don't see a way to deal
		 * with this properly.
		 */
		unlink(lockfile_name.buf);

		lockfile_fd = open(lockfile_name.buf,
				   O_NONBLOCK|O_CREAT|O_WRONLY,
				   0644);
	}

	if (lockfile_fd == -1) {
		ret = errno;
		DEBUG(1, ("%s: open failed: %s\n", __func__, strerror(errno)));
		return ret;
	}

	/* Whole-file write lock; held for the context's lifetime. */
	lck = (struct flock) {
		.l_type = F_WRLCK,
		.l_whence = SEEK_SET
	};

	ret = fcntl(lockfile_fd, F_SETLK, &lck);
	if (ret == -1) {
		ret = errno;
		DEBUG(1, ("%s: fcntl failed: %s\n", __func__, strerror(ret)));
		goto fail_close;
	}

	/*
	 * Directly using the binary value for
	 * SERVERID_UNIQUE_ID_NOT_TO_VERIFY is a layering
	 * violation. But including all of ndr here just for this
	 * seems to be a bit overkill to me. Also, messages_dgm might
	 * be replaced sooner or later by something streams-based,
	 * where unique_id generation will be handled differently.
	 */

	do {
		generate_random_buffer((uint8_t *)&unique, sizeof(unique));
	} while (unique == UINT64_C(0xFFFFFFFFFFFFFFFF));

	unique_len = snprintf(buf, sizeof(buf), "%ju\n", (uintmax_t)unique);

	/* shorten a potentially preexisting file */

	ret = ftruncate(lockfile_fd, unique_len);
	if (ret == -1) {
		ret = errno;
		DEBUG(1, ("%s: ftruncate failed: %s\n", __func__,
			  strerror(ret)));
		goto fail_unlink;
	}

	written = write(lockfile_fd, buf, unique_len);
	if (written != unique_len) {
		/* NOTE(review): on a short (non -1) write errno is stale. */
		ret = errno;
		DEBUG(1, ("%s: write failed: %s\n", __func__, strerror(ret)));
		goto fail_unlink;
	}

	*plockfile_fd = lockfile_fd;
	*punique = unique;
	return 0;

fail_unlink:
	unlink(lockfile_name.buf);
fail_close:
	close(lockfile_fd);
	return ret;
}
963 static void messaging_dgm_read_handler(struct tevent_context *ev,
964 struct tevent_fd *fde,
965 uint16_t flags,
966 void *private_data);
969 * Create the rendezvous point in the file system
970 * that other processes can use to send messages to
971 * this pid.
974 int messaging_dgm_init(struct tevent_context *ev,
975 uint64_t *punique,
976 const char *socket_dir,
977 const char *lockfile_dir,
978 void (*recv_cb)(struct tevent_context *ev,
979 const uint8_t *msg,
980 size_t msg_len,
981 int *fds,
982 size_t num_fds,
983 void *private_data),
984 void *recv_cb_private_data)
986 struct messaging_dgm_context *ctx;
987 int ret;
988 struct sockaddr_un socket_address;
989 size_t len;
990 static bool have_dgm_context = false;
992 if (have_dgm_context) {
993 return EEXIST;
996 ctx = talloc_zero(NULL, struct messaging_dgm_context);
997 if (ctx == NULL) {
998 goto fail_nomem;
1000 ctx->ev = ev;
1001 ctx->pid = getpid();
1002 ctx->recv_cb = recv_cb;
1003 ctx->recv_cb_private_data = recv_cb_private_data;
1005 len = strlcpy(ctx->lockfile_dir.buf, lockfile_dir,
1006 sizeof(ctx->lockfile_dir.buf));
1007 if (len >= sizeof(ctx->lockfile_dir.buf)) {
1008 TALLOC_FREE(ctx);
1009 return ENAMETOOLONG;
1012 len = strlcpy(ctx->socket_dir.buf, socket_dir,
1013 sizeof(ctx->socket_dir.buf));
1014 if (len >= sizeof(ctx->socket_dir.buf)) {
1015 TALLOC_FREE(ctx);
1016 return ENAMETOOLONG;
1019 socket_address = (struct sockaddr_un) { .sun_family = AF_UNIX };
1020 len = snprintf(socket_address.sun_path,
1021 sizeof(socket_address.sun_path),
1022 "%s/%u", socket_dir, (unsigned)ctx->pid);
1023 if (len >= sizeof(socket_address.sun_path)) {
1024 TALLOC_FREE(ctx);
1025 return ENAMETOOLONG;
1028 ret = messaging_dgm_lockfile_create(ctx, ctx->pid, &ctx->lockfile_fd,
1029 punique);
1030 if (ret != 0) {
1031 DEBUG(1, ("%s: messaging_dgm_create_lockfile failed: %s\n",
1032 __func__, strerror(ret)));
1033 TALLOC_FREE(ctx);
1034 return ret;
1037 unlink(socket_address.sun_path);
1039 ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
1040 if (ctx->sock == -1) {
1041 ret = errno;
1042 DBG_WARNING("socket failed: %s\n", strerror(ret));
1043 TALLOC_FREE(ctx);
1044 return ret;
1047 ret = prepare_socket_cloexec(ctx->sock);
1048 if (ret == -1) {
1049 ret = errno;
1050 DBG_WARNING("prepare_socket_cloexec failed: %s\n",
1051 strerror(ret));
1052 TALLOC_FREE(ctx);
1053 return ret;
1056 ret = bind(ctx->sock, (struct sockaddr *)(void *)&socket_address,
1057 sizeof(socket_address));
1058 if (ret == -1) {
1059 ret = errno;
1060 DBG_WARNING("bind failed: %s\n", strerror(ret));
1061 TALLOC_FREE(ctx);
1062 return ret;
1065 talloc_set_destructor(ctx, messaging_dgm_context_destructor);
1067 ctx->have_dgm_context = &have_dgm_context;
1069 ret = pthreadpool_tevent_init(ctx, UINT_MAX, &ctx->pool);
1070 if (ret != 0) {
1071 DBG_WARNING("pthreadpool_tevent_init failed: %s\n",
1072 strerror(ret));
1073 TALLOC_FREE(ctx);
1074 return ret;
1077 global_dgm_context = ctx;
1078 return 0;
1080 fail_nomem:
1081 TALLOC_FREE(ctx);
1082 return ENOMEM;
/*
 * Remove the rendezvous point in the filesystem
 * if we're the owner.
 *
 * Also tears down all outgoing connections, partial reassemblies and
 * registered fd events, and clears the process-wide singleton flag.
 */

static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
{
	/* Each TALLOC_FREE unlinks via the element's own destructor. */
	while (c->outsocks != NULL) {
		TALLOC_FREE(c->outsocks);
	}
	while (c->in_msgs != NULL) {
		TALLOC_FREE(c->in_msgs);
	}
	while (c->fde_evs != NULL) {
		tevent_fd_set_flags(c->fde_evs->fde, 0);
		c->fde_evs->ctx = NULL;	/* we die first, see fde_ev docs */
		DLIST_REMOVE(c->fde_evs, c->fde_evs);
	}

	close(c->sock);

	/* Only the creating process removes the filesystem entries. */
	if (getpid() == c->pid) {
		struct sun_path_buf name;
		int ret;

		ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
			       c->socket_dir.buf, (unsigned)c->pid);
		if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
			/*
			 * We've checked the length when creating, so this
			 * should never happen
			 */
			abort();
		}
		unlink(name.buf);

		ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
			       c->lockfile_dir.buf, (unsigned)c->pid);
		if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
			/*
			 * We've checked the length when creating, so this
			 * should never happen
			 */
			abort();
		}
		unlink(name.buf);
	}
	/* Closing drops the fcntl lock, signalling our death to others. */
	close(c->lockfile_fd);

	if (c->have_dgm_context != NULL) {
		*c->have_dgm_context = false;
	}

	return 0;
}
/*
 * DEVELOPER-build sanity check: abort if this context is used from a
 * process it does not belong to (fork without reinit_after_fork).
 * Verifies that our socket's bound path and our lockfile both match
 * the current pid. No-op in non-DEVELOPER builds.
 */
static void messaging_dgm_validate(struct messaging_dgm_context *ctx)
{
#ifdef DEVELOPER
	pid_t pid = getpid();
	struct sockaddr_storage addr;
	socklen_t addrlen = sizeof(addr);
	struct sockaddr_un *un_addr;
	struct sun_path_buf pathbuf;
	struct stat st1, st2;
	int ret;

	/*
	 * Protect against using the wrong messaging context after a
	 * fork without reinit_after_fork.
	 */

	ret = getsockname(ctx->sock, (struct sockaddr *)&addr, &addrlen);
	if (ret == -1) {
		DBG_ERR("getsockname failed: %s\n", strerror(errno));
		goto fail;
	}
	if (addr.ss_family != AF_UNIX) {
		DBG_ERR("getsockname returned family %d\n",
			(int)addr.ss_family);
		goto fail;
	}
	un_addr = (struct sockaddr_un *)&addr;

	/* The socket must be bound to <socket_dir>/<our pid>. */
	ret = snprintf(pathbuf.buf, sizeof(pathbuf.buf),
		       "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
	if (ret < 0) {
		DBG_ERR("snprintf failed: %s\n", strerror(errno));
		goto fail;
	}
	if ((size_t)ret >= sizeof(pathbuf.buf)) {
		DBG_ERR("snprintf returned %d chars\n", (int)ret);
		goto fail;
	}

	if (strcmp(pathbuf.buf, un_addr->sun_path) != 0) {
		DBG_ERR("sockname wrong: Expected %s, got %s\n",
			pathbuf.buf, un_addr->sun_path);
		goto fail;
	}

	/* Our lockfile_fd must refer to <lockfile_dir>/<our pid>. */
	ret = snprintf(pathbuf.buf, sizeof(pathbuf.buf),
		       "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
	if (ret < 0) {
		DBG_ERR("snprintf failed: %s\n", strerror(errno));
		goto fail;
	}
	if ((size_t)ret >= sizeof(pathbuf.buf)) {
		DBG_ERR("snprintf returned %d chars\n", (int)ret);
		goto fail;
	}

	ret = stat(pathbuf.buf, &st1);
	if (ret == -1) {
		DBG_ERR("stat failed: %s\n", strerror(errno));
		goto fail;
	}
	ret = fstat(ctx->lockfile_fd, &st2);
	if (ret == -1) {
		DBG_ERR("fstat failed: %s\n", strerror(errno));
		goto fail;
	}
	/* Same file iff device and inode agree. */
	if ((st1.st_dev != st2.st_dev) || (st1.st_ino != st2.st_ino)) {
		DBG_ERR("lockfile differs, expected (%d/%d), got (%d/%d)\n",
			(int)st2.st_dev, (int)st2.st_ino,
			(int)st1.st_dev, (int)st1.st_ino);
		goto fail;
	}

	return;
fail:
	abort();
#else
	return;
#endif
}
1223 static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
1224 struct tevent_context *ev,
1225 uint8_t *msg, size_t msg_len,
1226 int *fds, size_t num_fds);
/*
 * Raw read callback handler - passes to messaging_dgm_recv()
 * for fragment reassembly processing.
 *
 * Reads one datagram (payload plus up to INT8_MAX passed fds),
 * marks received fds close-on-exec and hands everything on.
 */

static void messaging_dgm_read_handler(struct tevent_context *ev,
				       struct tevent_fd *fde,
				       uint16_t flags,
				       void *private_data)
{
	struct messaging_dgm_context *ctx = talloc_get_type_abort(
		private_data, struct messaging_dgm_context);
	ssize_t received;
	struct msghdr msg;
	struct iovec iov;
	/* Sizing call: how much cmsg space do INT8_MAX fds need? */
	size_t msgbufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
	uint8_t msgbuf[msgbufsize];
	uint8_t buf[MESSAGING_DGM_FRAGMENT_LENGTH];
	size_t num_fds;

	messaging_dgm_validate(ctx);

	if ((flags & TEVENT_FD_READ) == 0) {
		return;
	}

	iov = (struct iovec) { .iov_base = buf, .iov_len = sizeof(buf) };
	msg = (struct msghdr) { .msg_iov = &iov, .msg_iovlen = 1 };

	msghdr_prep_recv_fds(&msg, msgbuf, msgbufsize, INT8_MAX);

#ifdef MSG_CMSG_CLOEXEC
	msg.msg_flags |= MSG_CMSG_CLOEXEC;
#endif

	received = recvmsg(ctx->sock, &msg, 0);
	if (received == -1) {
		if ((errno == EAGAIN) ||
		    (errno == EWOULDBLOCK) ||
		    (errno == EINTR) ||
		    (errno == ENOMEM)) {
			/* Not really an error - just try again. */
			return;
		}
		/* Problem with the socket. Set it unreadable. */
		tevent_fd_set_flags(fde, 0);
		return;
	}
	if ((size_t)received > sizeof(buf)) {
		/* More than we expected, not for us */
		return;
	}

	num_fds = msghdr_extract_fds(&msg, NULL, 0);
	if (num_fds == 0) {
		int fds[1];

		messaging_dgm_recv(ctx, ev, buf, received, fds, 0);
	} else {
		size_t i;
		int fds[num_fds];

		msghdr_extract_fds(&msg, fds, num_fds);

		for (i = 0; i < num_fds; i++) {
			int err;

			err = prepare_socket_cloexec(fds[i]);
			if (err != 0) {
				/* Drop all fds, deliver payload only. */
				close_fd_array(fds, num_fds);
				num_fds = 0;
				break;
			}
		}

		messaging_dgm_recv(ctx, ev, buf, received, fds, num_fds);
	}
}
/* Unlink a partial reassembly from its context's list on free. */
static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
{
	DLIST_REMOVE(m->ctx->in_msgs, m);
	return 0;
}
/*
 * Deal with identification of fragmented messages and
 * re-assembly into full messages sent, then calls the
 * callback.
 *
 * Wire format: 64-bit cookie, then for cookie!=0 a
 * messaging_dgm_fragment_hdr, then payload bytes. cookie==0 means
 * the whole message is in this single datagram. Malformed input is
 * dropped silently; any passed fds are closed on every drop path.
 */

static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
			       struct tevent_context *ev,
			       uint8_t *buf, size_t buflen,
			       int *fds, size_t num_fds)
{
	struct messaging_dgm_fragment_hdr hdr;
	struct messaging_dgm_in_msg *msg;
	size_t space;
	uint64_t cookie;

	if (buflen < sizeof(cookie)) {
		goto close_fds;
	}
	memcpy(&cookie, buf, sizeof(cookie));
	buf += sizeof(cookie);
	buflen -= sizeof(cookie);

	if (cookie == 0) {
		/* Unfragmented message: deliver immediately. */
		ctx->recv_cb(ev, buf, buflen, fds, num_fds,
			     ctx->recv_cb_private_data);
		return;
	}

	if (buflen < sizeof(hdr)) {
		goto close_fds;
	}
	memcpy(&hdr, buf, sizeof(hdr));
	buf += sizeof(hdr);
	buflen -= sizeof(hdr);

	/* Find the reassembly in progress for this (pid, sock) channel. */
	for (msg = ctx->in_msgs; msg != NULL; msg = msg->next) {
		if ((msg->sender_pid == hdr.pid) &&
		    (msg->sender_sock == hdr.sock)) {
			break;
		}
	}

	/* Cookie changed: sender started a new message, drop the old
	 * partial one. TALLOC_FREE also NULLs "msg". */
	if ((msg != NULL) && (msg->cookie != cookie)) {
		TALLOC_FREE(msg);
	}

	if (msg == NULL) {
		size_t msglen;
		msglen = offsetof(struct messaging_dgm_in_msg, buf) +
			hdr.msglen;

		msg = talloc_size(ctx, msglen);
		if (msg == NULL) {
			goto close_fds;
		}
		talloc_set_name_const(msg, "struct messaging_dgm_in_msg");

		*msg = (struct messaging_dgm_in_msg) {
			.ctx = ctx, .msglen = hdr.msglen,
			.sender_pid = hdr.pid, .sender_sock = hdr.sock,
			.cookie = cookie
		};
		DLIST_ADD(ctx->in_msgs, msg);
		talloc_set_destructor(msg, messaging_dgm_in_msg_destructor);
	}

	/* Reject fragments that would overflow the announced length. */
	space = msg->msglen - msg->received;
	if (buflen > space) {
		goto close_fds;
	}

	memcpy(msg->buf + msg->received, buf, buflen);
	msg->received += buflen;

	if (msg->received < msg->msglen) {
		/*
		 * Any valid sender will send the fds in the last
		 * block. Invalid senders might have sent fd's that we
		 * need to close here.
		 */
		goto close_fds;
	}

	/* Complete: unhook before the callback, then deliver and free. */
	DLIST_REMOVE(ctx->in_msgs, msg);
	talloc_set_destructor(msg, NULL);

	ctx->recv_cb(ev, msg->buf, msg->msglen, fds, num_fds,
		     ctx->recv_cb_private_data);

	TALLOC_FREE(msg);
	return;

close_fds:
	close_fd_array(fds, num_fds);
}
/*
 * Free the global messaging context; cleanup of its resources happens
 * via the talloc destructors hanging off it.
 */
void messaging_dgm_destroy(void)
{
	TALLOC_FREE(global_dgm_context);
}
1415 int messaging_dgm_send(pid_t pid,
1416 const struct iovec *iov, int iovlen,
1417 const int *fds, size_t num_fds)
1419 struct messaging_dgm_context *ctx = global_dgm_context;
1420 struct messaging_dgm_out *out;
1421 int ret;
1422 unsigned retries = 0;
1424 if (ctx == NULL) {
1425 return ENOTCONN;
1428 messaging_dgm_validate(ctx);
1430 again:
1431 ret = messaging_dgm_out_get(ctx, pid, &out);
1432 if (ret != 0) {
1433 return ret;
1436 DEBUG(10, ("%s: Sending message to %u\n", __func__, (unsigned)pid));
1438 ret = messaging_dgm_out_send_fragmented(ctx->ev, out, iov, iovlen,
1439 fds, num_fds);
1440 if (ret == ECONNREFUSED) {
1442 * We cache outgoing sockets. If the receiver has
1443 * closed and re-opened the socket since our last
1444 * message, we get connection refused. Retry.
1447 TALLOC_FREE(out);
1449 if (retries < 5) {
1450 retries += 1;
1451 goto again;
1454 return ret;
1457 static int messaging_dgm_read_unique(int fd, uint64_t *punique)
1459 char buf[25];
1460 ssize_t rw_ret;
1461 unsigned long long unique;
1462 char *endptr;
1464 rw_ret = pread(fd, buf, sizeof(buf)-1, 0);
1465 if (rw_ret == -1) {
1466 return errno;
1468 buf[rw_ret] = '\0';
1470 unique = strtoull(buf, &endptr, 10);
1471 if ((unique == 0) && (errno == EINVAL)) {
1472 return EINVAL;
1474 if ((unique == ULLONG_MAX) && (errno == ERANGE)) {
1475 return ERANGE;
1477 if (endptr[0] != '\n') {
1478 return EINVAL;
1480 *punique = unique;
1481 return 0;
1484 int messaging_dgm_get_unique(pid_t pid, uint64_t *unique)
1486 struct messaging_dgm_context *ctx = global_dgm_context;
1487 struct sun_path_buf lockfile_name;
1488 int ret, fd;
1490 if (ctx == NULL) {
1491 return EBADF;
1494 messaging_dgm_validate(ctx);
1496 if (pid == getpid()) {
1498 * Protect against losing our own lock
1500 return messaging_dgm_read_unique(ctx->lockfile_fd, unique);
1503 ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
1504 "%s/%u", ctx->lockfile_dir.buf, (int)pid);
1505 if (ret < 0) {
1506 return errno;
1508 if ((size_t)ret >= sizeof(lockfile_name.buf)) {
1509 return ENAMETOOLONG;
1512 fd = open(lockfile_name.buf, O_NONBLOCK|O_RDONLY, 0);
1513 if (fd == -1) {
1514 return errno;
1517 ret = messaging_dgm_read_unique(fd, unique);
1518 close(fd);
1519 return ret;
1522 int messaging_dgm_cleanup(pid_t pid)
1524 struct messaging_dgm_context *ctx = global_dgm_context;
1525 struct sun_path_buf lockfile_name, socket_name;
1526 int fd, len, ret;
1527 struct flock lck = {};
1529 if (ctx == NULL) {
1530 return ENOTCONN;
1533 len = snprintf(socket_name.buf, sizeof(socket_name.buf), "%s/%u",
1534 ctx->socket_dir.buf, (unsigned)pid);
1535 if (len < 0) {
1536 return errno;
1538 if ((size_t)len >= sizeof(socket_name.buf)) {
1539 return ENAMETOOLONG;
1542 len = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf), "%s/%u",
1543 ctx->lockfile_dir.buf, (unsigned)pid);
1544 if (len < 0) {
1545 return errno;
1547 if ((size_t)len >= sizeof(lockfile_name.buf)) {
1548 return ENAMETOOLONG;
1551 fd = open(lockfile_name.buf, O_NONBLOCK|O_WRONLY, 0);
1552 if (fd == -1) {
1553 ret = errno;
1554 if (ret != ENOENT) {
1555 DEBUG(10, ("%s: open(%s) failed: %s\n", __func__,
1556 lockfile_name.buf, strerror(ret)));
1558 return ret;
1561 lck.l_type = F_WRLCK;
1562 lck.l_whence = SEEK_SET;
1563 lck.l_start = 0;
1564 lck.l_len = 0;
1566 ret = fcntl(fd, F_SETLK, &lck);
1567 if (ret != 0) {
1568 ret = errno;
1569 if ((ret != EACCES) && (ret != EAGAIN)) {
1570 DEBUG(10, ("%s: Could not get lock: %s\n", __func__,
1571 strerror(ret)));
1573 close(fd);
1574 return ret;
1577 DEBUG(10, ("%s: Cleaning up : %s\n", __func__, strerror(ret)));
1579 (void)unlink(socket_name.buf);
1580 (void)unlink(lockfile_name.buf);
1581 (void)close(fd);
1582 return 0;
/*
 * messaging_dgm_forall() callback for messaging_dgm_wipe(): attempt
 * cleanup of every pid except our own. Always returns 0 so the walk
 * continues over all entries.
 */
static int messaging_dgm_wipe_fn(pid_t pid, void *private_data)
{
	pid_t *skip_pid = (pid_t *)private_data;

	if (pid != *skip_pid) {
		int ret = messaging_dgm_cleanup(pid);

		DEBUG(10, ("messaging_dgm_cleanup(%lu) returned %s\n",
			   (unsigned long)pid, ret ? strerror(ret) : "ok"));
	} else {
		/*
		 * fcntl(F_GETLK) will succeed for ourselves, we hold
		 * that lock ourselves.
		 */
	}

	return 0;
}
/*
 * Walk all message sockets and clean up leftovers of dead processes,
 * skipping our own pid (messaging_dgm_wipe_fn ignores it). Always
 * returns 0.
 */
int messaging_dgm_wipe(void)
{
	pid_t pid = getpid();
	messaging_dgm_forall(messaging_dgm_wipe_fn, &pid);
	return 0;
}
1612 int messaging_dgm_forall(int (*fn)(pid_t pid, void *private_data),
1613 void *private_data)
1615 struct messaging_dgm_context *ctx = global_dgm_context;
1616 DIR *msgdir;
1617 struct dirent *dp;
1619 if (ctx == NULL) {
1620 return ENOTCONN;
1623 messaging_dgm_validate(ctx);
1626 * We scan the socket directory and not the lock directory. Otherwise
1627 * we would race against messaging_dgm_lockfile_create's open(O_CREAT)
1628 * and fcntl(SETLK).
1631 msgdir = opendir(ctx->socket_dir.buf);
1632 if (msgdir == NULL) {
1633 return errno;
1636 while ((dp = readdir(msgdir)) != NULL) {
1637 unsigned long pid;
1638 int ret;
1640 pid = strtoul(dp->d_name, NULL, 10);
1641 if (pid == 0) {
1643 * . and .. and other malformed entries
1645 continue;
1648 ret = fn(pid, private_data);
1649 if (ret != 0) {
1650 break;
1653 closedir(msgdir);
1655 return 0;
/*
 * Opaque handle handed out by messaging_dgm_register_tevent_context().
 * Wraps the tevent_fd shared (via talloc_reference) among all
 * registrations on the same tevent_context.
 */
struct messaging_dgm_fde {
	struct tevent_fd *fde;
};
1662 static int messaging_dgm_fde_ev_destructor(struct messaging_dgm_fde_ev *fde_ev)
1664 if (fde_ev->ctx != NULL) {
1665 DLIST_REMOVE(fde_ev->ctx->fde_evs, fde_ev);
1666 fde_ev->ctx = NULL;
1668 return 0;
/*
 * Reference counter for a struct tevent_fd messaging read event
 * (with callback function) on a struct tevent_context registered
 * on a messaging context.
 *
 * If we've already registered this struct tevent_context before
 * (so already have a read event), just increase the reference count.
 *
 * Otherwise create a new struct tevent_fd messaging read event on the
 * previously unseen struct tevent_context - this is what drives
 * the message receive processing.
 *
 * Returns a handle owned by mem_ctx, or NULL on error or without a
 * global dgm context.
 */

struct messaging_dgm_fde *messaging_dgm_register_tevent_context(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev)
{
	struct messaging_dgm_context *ctx = global_dgm_context;
	struct messaging_dgm_fde_ev *fde_ev;
	struct messaging_dgm_fde *fde;

	if (ctx == NULL) {
		return NULL;
	}

	fde = talloc(mem_ctx, struct messaging_dgm_fde);
	if (fde == NULL) {
		return NULL;
	}

	/* Look for a live registration of this tevent_context. */
	for (fde_ev = ctx->fde_evs; fde_ev != NULL; fde_ev = fde_ev->next) {
		if (tevent_fd_get_flags(fde_ev->fde) == 0) {
			/*
			 * If the event context got deleted,
			 * tevent_fd_get_flags() will return 0
			 * for the stale fde.
			 *
			 * In that case we should not
			 * use fde_ev->ev anymore.
			 */
			continue;
		}
		if (fde_ev->ev == ev) {
			break;
		}
	}

	if (fde_ev == NULL) {
		/* First registration for this ev: create the read event. */
		fde_ev = talloc(fde, struct messaging_dgm_fde_ev);
		if (fde_ev == NULL) {
			return NULL;
		}
		fde_ev->fde = tevent_add_fd(
			ev, fde_ev, ctx->sock, TEVENT_FD_READ,
			messaging_dgm_read_handler, ctx);
		if (fde_ev->fde == NULL) {
			TALLOC_FREE(fde);
			return NULL;
		}
		fde_ev->ev = ev;
		fde_ev->ctx = ctx;
		DLIST_ADD(ctx->fde_evs, fde_ev);
		talloc_set_destructor(
			fde_ev, messaging_dgm_fde_ev_destructor);
	} else {
		/*
		 * Same trick as with tdb_wrap: The caller will never
		 * see the talloc_referenced object, the
		 * messaging_dgm_fde_ev, so problems with
		 * talloc_unlink will not happen.
		 */
		if (talloc_reference(fde, fde_ev) == NULL) {
			TALLOC_FREE(fde);
			return NULL;
		}
	}

	fde->fde = fde_ev->fde;
	return fde;
}
1752 bool messaging_dgm_fde_active(struct messaging_dgm_fde *fde)
1754 uint16_t flags;
1756 if (fde == NULL) {
1757 return false;
1759 flags = tevent_fd_get_flags(fde->fde);
1760 return (flags != 0);