Revert "smbd: fullpath based on fsp->fsp_name may contain an @GMT token"
[samba.git] / source3 / lib / messages_dgm.c
blob661e032b908e022ad8dd215eb00079daf355478e
/*
 * Unix SMB/CIFS implementation.
 * Samba internal messaging functions
 * Copyright (C) 2013 by Volker Lendecke
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "replace.h"
#include "util/util.h"
#include "system/network.h"
#include "system/filesys.h"
#include "system/dir.h"
#include "system/select.h"
#include "lib/util/debug.h"
#include "lib/messages_dgm.h"
#include "lib/util/genrand.h"
#include "lib/util/dlinklist.h"
#include "lib/pthreadpool/pthreadpool_tevent.h"
#include "lib/util/msghdr.h"
#include "lib/util/iov_buf.h"
#include "lib/util/blocking.h"
#include "lib/util/tevent_unix.h"

#define MESSAGING_DGM_FRAGMENT_LENGTH 1024

struct sun_path_buf {
        /*
         * This will carry enough for a socket path
         */
        char buf[sizeof(struct sockaddr_un)];
};

/*
 * We can only have one tevent_fd per dgm_context and per
 * tevent_context. Maintain a list of registered tevent_contexts per
 * dgm_context.
 */
struct messaging_dgm_fde_ev {
        struct messaging_dgm_fde_ev *prev, *next;

        /*
         * Backreference to enable DLIST_REMOVE from our
         * destructor. Also, set to NULL when the dgm_context dies
         * before the messaging_dgm_fde_ev.
         */
        struct messaging_dgm_context *ctx;

        struct tevent_context *ev;
        struct tevent_fd *fde;
};

struct messaging_dgm_out {
        struct messaging_dgm_out *prev, *next;
        struct messaging_dgm_context *ctx;

        pid_t pid;
        int sock;
        bool is_blocking;
        uint64_t cookie;

        struct tevent_queue *queue;
        struct tevent_timer *idle_timer;
};

struct messaging_dgm_in_msg {
        struct messaging_dgm_in_msg *prev, *next;
        struct messaging_dgm_context *ctx;
        size_t msglen;
        size_t received;
        pid_t sender_pid;
        int sender_sock;
        uint64_t cookie;
        uint8_t buf[];
};

struct messaging_dgm_context {
        struct tevent_context *ev;
        pid_t pid;
        struct sun_path_buf socket_dir;
        struct sun_path_buf lockfile_dir;
        int lockfile_fd;

        int sock;
        struct messaging_dgm_in_msg *in_msgs;

        struct messaging_dgm_fde_ev *fde_evs;
        void (*recv_cb)(struct tevent_context *ev,
                        const uint8_t *msg,
                        size_t msg_len,
                        int *fds,
                        size_t num_fds,
                        void *private_data);
        void *recv_cb_private_data;

        bool *have_dgm_context;

        struct pthreadpool_tevent *pool;
        struct messaging_dgm_out *outsocks;
};

/* Set socket close on exec. */
static int prepare_socket_cloexec(int sock)
{
#ifdef FD_CLOEXEC
        int flags;

        flags = fcntl(sock, F_GETFD, 0);
        if (flags == -1) {
                return errno;
        }
        flags |= FD_CLOEXEC;
        if (fcntl(sock, F_SETFD, flags) == -1) {
                return errno;
        }
#endif
        return 0;
}
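
/*
 * Close every valid fd in the array and mark each slot as closed (-1);
 * slots that are already -1 are skipped.
 */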

static void close_fd_array(int *fds, size_t num_fds)
{
        size_t i;

        for (i = 0; i < num_fds; i++) {
                if (fds[i] == -1) {
                        continue;
                }

                close(fds[i]);
                fds[i] = -1;
        }
}

/*
 * The idle handler can free the struct messaging_dgm_out *,
 * if it's unused (qlen of zero) which closes the socket.
 */
static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
                                           struct tevent_timer *te,
                                           struct timeval current_time,
                                           void *private_data)
{
        struct messaging_dgm_out *out = talloc_get_type_abort(
                private_data, struct messaging_dgm_out);
        size_t qlen;

        out->idle_timer = NULL;

        qlen = tevent_queue_length(out->queue);
        if (qlen == 0) {
                TALLOC_FREE(out);
        }
}

/*
 * Setup the idle handler to fire after 1 second if the
 * queue is empty.
 */
static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out *out)
{
        size_t qlen;

        qlen = tevent_queue_length(out->queue);
        if (qlen != 0) {
                TALLOC_FREE(out->idle_timer);
                return;
        }

        if (out->idle_timer != NULL) {
                tevent_update_timer(out->idle_timer,
                                    tevent_timeval_current_ofs(1, 0));
                return;
        }

        out->idle_timer = tevent_add_timer(
                out->ctx->ev, out, tevent_timeval_current_ofs(1, 0),
                messaging_dgm_out_idle_handler, out);
        /*
         * No NULL check, we'll come back here. Worst case we're
         * leaking a bit.
         */
}

static int messaging_dgm_out_destructor(struct messaging_dgm_out *dst);
static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
                                           struct tevent_timer *te,
                                           struct timeval current_time,
                                           void *private_data);

/*
 * Connect to an existing rendezvous point for another
 * pid - wrapped inside a struct messaging_dgm_out *.
 */
static int messaging_dgm_out_create(TALLOC_CTX *mem_ctx,
                                    struct messaging_dgm_context *ctx,
                                    pid_t pid, struct messaging_dgm_out **pout)
{
        struct messaging_dgm_out *out;
        struct sockaddr_un addr = { .sun_family = AF_UNIX };
        int ret = ENOMEM;
        int out_pathlen;
        char addr_buf[sizeof(addr.sun_path) + (3 * sizeof(unsigned) + 2)];

        out = talloc(mem_ctx, struct messaging_dgm_out);
        if (out == NULL) {
                goto fail;
        }
        *out = (struct messaging_dgm_out) {
                .pid = pid,
                .ctx = ctx,
                .cookie = 1
        };

        out_pathlen = snprintf(addr_buf, sizeof(addr_buf),
                               "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
        if (out_pathlen < 0) {
                goto errno_fail;
        }
        if ((size_t)out_pathlen >= sizeof(addr.sun_path)) {
                ret = ENAMETOOLONG;
                goto fail;
        }

        memcpy(addr.sun_path, addr_buf, out_pathlen + 1);

        out->queue = tevent_queue_create(out, addr.sun_path);
        if (out->queue == NULL) {
                ret = ENOMEM;
                goto fail;
        }

        out->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
        if (out->sock == -1) {
                goto errno_fail;
        }

        DLIST_ADD(ctx->outsocks, out);
        talloc_set_destructor(out, messaging_dgm_out_destructor);

        do {
                ret = connect(out->sock,
                              (const struct sockaddr *)(const void *)&addr,
                              sizeof(addr));
        } while ((ret == -1) && (errno == EINTR));

        if (ret == -1) {
                goto errno_fail;
        }

        ret = set_blocking(out->sock, false);
        if (ret == -1) {
                goto errno_fail;
        }
        out->is_blocking = false;

        *pout = out;
        return 0;
errno_fail:
        ret = errno;
fail:
        TALLOC_FREE(out);
        return ret;
}
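
/*
 * Destructor for an outgoing channel: unlink it from the context. If a
 * queued (threaded) send is still pending in the owning process, the socket
 * is left open for messaging_dgm_out_queue_state; otherwise it is closed.
 */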

static int messaging_dgm_out_destructor(struct messaging_dgm_out *out)
{
        DLIST_REMOVE(out->ctx->outsocks, out);

        if ((tevent_queue_length(out->queue) != 0) &&
            (getpid() == out->ctx->pid)) {
                /*
                 * We have pending jobs. We can't close the socket,
                 * this has been handed over to messaging_dgm_out_queue_state.
                 */
                return 0;
        }

        if (out->sock != -1) {
                close(out->sock);
                out->sock = -1;
        }
        return 0;
}

/*
 * Find the struct messaging_dgm_out * to talk to pid.
 * If we don't have one, create it. Set the timer to
 * delete after 1 sec.
 */
static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
                                 struct messaging_dgm_out **pout)
{
        struct messaging_dgm_out *out;
        int ret;

        for (out = ctx->outsocks; out != NULL; out = out->next) {
                if (out->pid == pid) {
                        break;
                }
        }

        if (out == NULL) {
                ret = messaging_dgm_out_create(ctx, ctx, pid, &out);
                if (ret != 0) {
                        return ret;
                }
        }

        /*
         * shouldn't be possible, should be set if messaging_dgm_out_create
         * succeeded. This check is to satisfy static checker
         */
        if (out == NULL) {
                return EINVAL;
        }
        messaging_dgm_out_rearm_idle_timer(out);

        *pout = out;
        return 0;
}

/*
 * This function is called directly to send a message fragment
 * when the outgoing queue is zero, and from a pthreadpool
 * job thread when messages are being queued (qlen != 0).
 * Make sure *ONLY* thread-safe functions are called within.
 */
static ssize_t messaging_dgm_sendmsg(int sock,
                                     const struct iovec *iov, int iovlen,
                                     const int *fds, size_t num_fds,
                                     int *perrno)
{
        struct msghdr msg;
        ssize_t fdlen, ret;

        /*
         * Do the actual sendmsg syscall. This will be called from a
         * pthreadpool helper thread, so be careful what you do here.
         */

        msg = (struct msghdr) {
                .msg_iov = discard_const_p(struct iovec, iov),
                .msg_iovlen = iovlen
        };

        fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
        if (fdlen == -1) {
                *perrno = EINVAL;
                return -1;
        }

        {
                uint8_t buf[fdlen];

                msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);

                do {
                        ret = sendmsg(sock, &msg, 0);
                } while ((ret == -1) && (errno == EINTR));
        }

        if (ret == -1) {
                *perrno = errno;
        }
        return ret;
}
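
/*
 * State for one queued fragment send that is handed to a pthreadpool job:
 * a flattened copy of the payload, dup'ed fds and the destination socket.
 */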

struct messaging_dgm_out_queue_state {
        struct tevent_context *ev;
        struct pthreadpool_tevent *pool;

        struct tevent_req *req;
        struct tevent_req *subreq;

        int sock;

        int *fds;
        uint8_t *buf;

        ssize_t sent;
        int err;
};

static int messaging_dgm_out_queue_state_destructor(
        struct messaging_dgm_out_queue_state *state);
static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
                                            void *private_data);
static void messaging_dgm_out_threaded_job(void *private_data);
static void messaging_dgm_out_queue_done(struct tevent_req *subreq);

/*
 * Push a message fragment onto a queue to be sent by a
 * threadpool job. Makes copies of data/fd's to be sent.
 * The running tevent_queue internally creates an immediate
 * event to schedule the write.
 */
static struct tevent_req *messaging_dgm_out_queue_send(
        TALLOC_CTX *mem_ctx, struct tevent_context *ev,
        struct messaging_dgm_out *out,
        const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
{
        struct tevent_req *req;
        struct messaging_dgm_out_queue_state *state;
        struct tevent_queue_entry *e;
        size_t i;
        ssize_t buflen;

        req = tevent_req_create(out, &state,
                                struct messaging_dgm_out_queue_state);
        if (req == NULL) {
                return NULL;
        }
        state->ev = ev;
        state->pool = out->ctx->pool;
        state->sock = out->sock;
        state->req = req;

        /*
         * Go blocking in a thread
         */
        if (!out->is_blocking) {
                int ret = set_blocking(out->sock, true);
                if (ret == -1) {
                        tevent_req_error(req, errno);
                        return tevent_req_post(req, ev);
                }
                out->is_blocking = true;
        }

        buflen = iov_buflen(iov, iovlen);
        if (buflen == -1) {
                tevent_req_error(req, EMSGSIZE);
                return tevent_req_post(req, ev);
        }

        state->buf = talloc_array(state, uint8_t, buflen);
        if (tevent_req_nomem(state->buf, req)) {
                return tevent_req_post(req, ev);
        }
        iov_buf(iov, iovlen, state->buf, buflen);

        state->fds = talloc_array(state, int, num_fds);
        if (tevent_req_nomem(state->fds, req)) {
                return tevent_req_post(req, ev);
        }

        for (i=0; i<num_fds; i++) {
                state->fds[i] = -1;
        }

        for (i=0; i<num_fds; i++) {

                state->fds[i] = dup(fds[i]);

                if (state->fds[i] == -1) {
                        int ret = errno;

                        close_fd_array(state->fds, num_fds);

                        tevent_req_error(req, ret);
                        return tevent_req_post(req, ev);
                }
        }

        talloc_set_destructor(state, messaging_dgm_out_queue_state_destructor);

        e = tevent_queue_add_entry(out->queue, ev, req,
                                   messaging_dgm_out_queue_trigger, req);
        if (tevent_req_nomem(e, req)) {
                return tevent_req_post(req, ev);
        }
        return req;
}

static int messaging_dgm_out_queue_state_destructor(
        struct messaging_dgm_out_queue_state *state)
{
        int *fds;
        size_t num_fds;

        if (state->subreq != NULL) {
                /*
                 * We're scheduled, but we're destroyed. This happens
                 * if the messaging_dgm_context is destroyed while
                 * we're stuck in a blocking send. There's nothing we
                 * can do but to leak memory.
                 */
                TALLOC_FREE(state->subreq);
                (void)talloc_reparent(state->req, NULL, state);
                return -1;
        }

        fds = state->fds;
        num_fds = talloc_array_length(fds);
        close_fd_array(fds, num_fds);
        return 0;
}

/*
 * tevent_queue callback that schedules the pthreadpool to actually
 * send the queued message fragment.
 */
static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
                                            void *private_data)
{
        struct messaging_dgm_out_queue_state *state = tevent_req_data(
                req, struct messaging_dgm_out_queue_state);

        tevent_req_reset_endtime(req);

        state->subreq = pthreadpool_tevent_job_send(
                state, state->ev, state->pool,
                messaging_dgm_out_threaded_job, state);
        if (tevent_req_nomem(state->subreq, req)) {
                return;
        }
        tevent_req_set_callback(state->subreq, messaging_dgm_out_queue_done,
                                req);
}

/*
 * Wrapper function run by the pthread that calls
 * messaging_dgm_sendmsg() to actually do the sendmsg().
 */
static void messaging_dgm_out_threaded_job(void *private_data)
{
        struct messaging_dgm_out_queue_state *state = talloc_get_type_abort(
                private_data, struct messaging_dgm_out_queue_state);

        struct iovec iov = { .iov_base = state->buf,
                             .iov_len = talloc_get_size(state->buf) };
        size_t num_fds = talloc_array_length(state->fds);
        int msec = 1;

        while (true) {
                int ret;

                state->sent = messaging_dgm_sendmsg(state->sock, &iov, 1,
                        state->fds, num_fds, &state->err);

                if (state->sent != -1) {
                        return;
                }
                if (state->err != ENOBUFS) {
                        return;
                }

                /*
                 * ENOBUFS is the FreeBSD way of saying "Try
                 * again". We have to do polling.
                 */
                do {
                        ret = poll(NULL, 0, msec);
                } while ((ret == -1) && (errno == EINTR));

                /*
                 * Exponential backoff up to once a second
                 */
                msec *= 2;
                msec = MIN(msec, 1000);
        }
}

/*
 * Pickup the results of the pthread sendmsg().
 */
static void messaging_dgm_out_queue_done(struct tevent_req *subreq)
{
        struct tevent_req *req = tevent_req_callback_data(
                subreq, struct tevent_req);
        struct messaging_dgm_out_queue_state *state = tevent_req_data(
                req, struct messaging_dgm_out_queue_state);
        int ret;

        if (subreq != state->subreq) {
                abort();
        }

        ret = pthreadpool_tevent_job_recv(subreq);

        TALLOC_FREE(subreq);
        state->subreq = NULL;

        if (tevent_req_error(req, ret)) {
                return;
        }
        if (state->sent == -1) {
                tevent_req_error(req, state->err);
                return;
        }
        tevent_req_done(req);
}

static int messaging_dgm_out_queue_recv(struct tevent_req *req)
{
        return tevent_req_simple_recv_unix(req);
}

static void messaging_dgm_out_sent_fragment(struct tevent_req *req);

/*
 * Core function to send a message fragment given a
 * connected struct messaging_dgm_out * destination.
 * If the outgoing queue is empty, tries a nonblocking
 * send directly. Otherwise queues the fragment (which
 * makes a copy of it) and adds a 60-second timeout on
 * the send.
 */
static int messaging_dgm_out_send_fragment(
        struct tevent_context *ev, struct messaging_dgm_out *out,
        const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
{
        struct tevent_req *req;
        size_t qlen;
        bool ok;

        qlen = tevent_queue_length(out->queue);
        if (qlen == 0) {
                ssize_t nsent;
                int err = 0;

                if (out->is_blocking) {
                        int ret = set_blocking(out->sock, false);
                        if (ret == -1) {
                                return errno;
                        }
                        out->is_blocking = false;
                }

                nsent = messaging_dgm_sendmsg(out->sock, iov, iovlen, fds,
                                              num_fds, &err);
                if (nsent >= 0) {
                        return 0;
                }

                if (err == ENOBUFS) {
                        /*
                         * FreeBSD's way of telling us the dst socket
                         * is full. EWOULDBLOCK makes us spawn a
                         * polling helper thread.
                         */
                        err = EWOULDBLOCK;
                }

                if (err != EWOULDBLOCK) {
                        return err;
                }
        }

        req = messaging_dgm_out_queue_send(out, ev, out, iov, iovlen,
                                           fds, num_fds);
        if (req == NULL) {
                return ENOMEM;
        }
        tevent_req_set_callback(req, messaging_dgm_out_sent_fragment, out);

        ok = tevent_req_set_endtime(req, ev,
                                    tevent_timeval_current_ofs(60, 0));
        if (!ok) {
                TALLOC_FREE(req);
                return ENOMEM;
        }

        return 0;
}

/*
 * Pickup the result of the fragment send. Reset idle timer
 * if queue empty.
 */
static void messaging_dgm_out_sent_fragment(struct tevent_req *req)
{
        struct messaging_dgm_out *out = tevent_req_callback_data(
                req, struct messaging_dgm_out);
        int ret;

        ret = messaging_dgm_out_queue_recv(req);
        TALLOC_FREE(req);

        if (ret != 0) {
                DBG_WARNING("messaging_out_queue_recv returned %s\n",
                            strerror(ret));
        }

        messaging_dgm_out_rearm_idle_timer(out);
}
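
/*
 * Per-fragment header following the 64-bit cookie: the total message length
 * plus the sender's pid and socket fd, which identify the stream during
 * re-assembly.
 */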

struct messaging_dgm_fragment_hdr {
        size_t msglen;
        pid_t pid;
        int sock;
};

/*
 * Fragment a message into MESSAGING_DGM_FRAGMENT_LENGTH minus 64-bit-cookie
 * sized chunks and send it.
 *
 * Message fragments are prefixed by a 64-bit cookie that
 * stays the same for all fragments. This allows the receiver
 * to recognise fragments of the same message and re-assemble
 * them on the other end.
 *
 * Note that this allows message fragments from other
 * senders to be interleaved in the receive read processing;
 * the combination of the cookie and header info allows unique
 * identification of the message from a specific sender in
 * re-assembly.
 *
 * If the message is smaller than MESSAGING_DGM_FRAGMENT_LENGTH minus cookie,
 * send a single message with the cookie set to zero.
 *
 * Otherwise the message is fragmented into chunks and added
 * to the sending queue. Any file descriptors are passed only
 * in the last fragment.
 *
 * Finally the cookie is incremented (wrapping over zero) to
 * prepare for the next message sent to this channel.
 */
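
/*
 * Illustrative datagram layout (editor's sketch, derived from the code
 * below, not part of the original source):
 *
 *   unfragmented message:  [ cookie == 0 (8 bytes) ][ payload ]
 *   message fragment:      [ cookie != 0 (8 bytes) ][ fragment_hdr ][ chunk ]
 *
 * All fragments of one message carry the same non-zero cookie.
 */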

static int messaging_dgm_out_send_fragmented(struct tevent_context *ev,
                                             struct messaging_dgm_out *out,
                                             const struct iovec *iov,
                                             int iovlen,
                                             const int *fds, size_t num_fds)
{
        ssize_t msglen, sent;
        int ret = 0;
        struct iovec iov_copy[iovlen+2];
        struct messaging_dgm_fragment_hdr hdr;
        struct iovec src_iov;

        if (iovlen < 0) {
                return EINVAL;
        }

        msglen = iov_buflen(iov, iovlen);
        if (msglen == -1) {
                return EMSGSIZE;
        }
        if (num_fds > INT8_MAX) {
                return EINVAL;
        }

        if ((size_t) msglen <=
            (MESSAGING_DGM_FRAGMENT_LENGTH - sizeof(uint64_t))) {
                uint64_t cookie = 0;

                iov_copy[0].iov_base = &cookie;
                iov_copy[0].iov_len = sizeof(cookie);
                if (iovlen > 0) {
                        memcpy(&iov_copy[1], iov,
                               sizeof(struct iovec) * iovlen);
                }

                return messaging_dgm_out_send_fragment(
                        ev, out, iov_copy, iovlen+1, fds, num_fds);
        }

        hdr = (struct messaging_dgm_fragment_hdr) {
                .msglen = msglen,
                .pid = getpid(),
                .sock = out->sock
        };

        iov_copy[0].iov_base = &out->cookie;
        iov_copy[0].iov_len = sizeof(out->cookie);
        iov_copy[1].iov_base = &hdr;
        iov_copy[1].iov_len = sizeof(hdr);

        sent = 0;
        src_iov = iov[0];

        /*
         * The following write loop sends the user message in pieces. We have
         * filled the first two iovecs above with "cookie" and "hdr". In the
         * following loops we pull message chunks from the user iov array and
         * fill iov_copy piece by piece, possibly truncating chunks from the
         * caller's iov array. Ugly, but hopefully efficient.
         */

        while (sent < msglen) {
                size_t fragment_len;
                size_t iov_index = 2;

                fragment_len = sizeof(out->cookie) + sizeof(hdr);

                while (fragment_len < MESSAGING_DGM_FRAGMENT_LENGTH) {
                        size_t space, chunk;

                        space = MESSAGING_DGM_FRAGMENT_LENGTH - fragment_len;
                        chunk = MIN(space, src_iov.iov_len);

                        iov_copy[iov_index].iov_base = src_iov.iov_base;
                        iov_copy[iov_index].iov_len = chunk;
                        iov_index += 1;

                        src_iov.iov_base = (char *)src_iov.iov_base + chunk;
                        src_iov.iov_len -= chunk;
                        fragment_len += chunk;

                        if (src_iov.iov_len == 0) {
                                iov += 1;
                                iovlen -= 1;
                                if (iovlen == 0) {
                                        break;
                                }
                                src_iov = iov[0];
                        }
                }
                sent += (fragment_len - sizeof(out->cookie) - sizeof(hdr));

                /*
                 * only the last fragment should pass the fd array.
                 * That simplifies the receiver a lot.
                 */
                if (sent < msglen) {
                        ret = messaging_dgm_out_send_fragment(
                                ev, out, iov_copy, iov_index, NULL, 0);
                } else {
                        ret = messaging_dgm_out_send_fragment(
                                ev, out, iov_copy, iov_index, fds, num_fds);
                }
                if (ret != 0) {
                        break;
                }
        }

        out->cookie += 1;
        if (out->cookie == 0) {
                out->cookie += 1;
        }

        return ret;
}

static struct messaging_dgm_context *global_dgm_context;

static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
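
/*
 * Create and write-lock the per-pid lockfile and store a random unique id
 * in it (decimal, newline-terminated). The fcntl lock, not O_EXCL, is what
 * marks the owning process as alive.
 */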

static int messaging_dgm_lockfile_create(struct messaging_dgm_context *ctx,
                                         pid_t pid, int *plockfile_fd,
                                         uint64_t *punique)
{
        char buf[64];
        int lockfile_fd;
        struct sun_path_buf lockfile_name;
        struct flock lck;
        uint64_t unique;
        int unique_len, ret;
        ssize_t written;

        ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
                       "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
        if (ret < 0) {
                return errno;
        }
        if ((unsigned)ret >= sizeof(lockfile_name.buf)) {
                return ENAMETOOLONG;
        }

        /* no O_EXCL, existence check is via the fcntl lock */

        lockfile_fd = open(lockfile_name.buf, O_NONBLOCK|O_CREAT|O_RDWR,
                           0644);

        if ((lockfile_fd == -1) &&
            ((errno == ENXIO) /* Linux */ ||
             (errno == ENODEV) /* Linux kernel bug */ ||
             (errno == EOPNOTSUPP) /* FreeBSD */)) {
                /*
                 * Huh -- a socket? This might be a stale socket from
                 * an upgrade of Samba. Just unlink and retry, nobody
                 * else is supposed to be here at this time.
                 *
                 * Yes, this is racy, but I don't see a way to deal
                 * with this properly.
                 */
                unlink(lockfile_name.buf);

                lockfile_fd = open(lockfile_name.buf,
                                   O_NONBLOCK|O_CREAT|O_WRONLY,
                                   0644);
        }

        if (lockfile_fd == -1) {
                ret = errno;
                DEBUG(1, ("%s: open failed: %s\n", __func__, strerror(errno)));
                return ret;
        }

        lck = (struct flock) {
                .l_type = F_WRLCK,
                .l_whence = SEEK_SET
        };

        ret = fcntl(lockfile_fd, F_SETLK, &lck);
        if (ret == -1) {
                ret = errno;
                DEBUG(1, ("%s: fcntl failed: %s\n", __func__, strerror(ret)));
                goto fail_close;
        }

        /*
         * Directly using the binary value for
         * SERVERID_UNIQUE_ID_NOT_TO_VERIFY is a layering
         * violation. But including all of ndr here just for this
         * seems to be a bit overkill to me. Also, messages_dgm might
         * be replaced sooner or later by something streams-based,
         * where unique_id generation will be handled differently.
         */

        do {
                generate_random_buffer((uint8_t *)&unique, sizeof(unique));
        } while (unique == UINT64_C(0xFFFFFFFFFFFFFFFF));

        unique_len = snprintf(buf, sizeof(buf), "%ju\n", (uintmax_t)unique);

        /* shorten a potentially preexisting file */

        ret = ftruncate(lockfile_fd, unique_len);
        if (ret == -1) {
                ret = errno;
                DEBUG(1, ("%s: ftruncate failed: %s\n", __func__,
                          strerror(ret)));
                goto fail_unlink;
        }

        written = write(lockfile_fd, buf, unique_len);
        if (written != unique_len) {
                ret = errno;
                DEBUG(1, ("%s: write failed: %s\n", __func__, strerror(ret)));
                goto fail_unlink;
        }

        *plockfile_fd = lockfile_fd;
        *punique = unique;
        return 0;

fail_unlink:
        unlink(lockfile_name.buf);
fail_close:
        close(lockfile_fd);
        return ret;
}

static void messaging_dgm_read_handler(struct tevent_context *ev,
                                       struct tevent_fd *fde,
                                       uint16_t flags,
                                       void *private_data);

/*
 * Create the rendezvous point in the file system
 * that other processes can use to send messages to
 * this pid.
 */
int messaging_dgm_init(struct tevent_context *ev,
                       uint64_t *punique,
                       const char *socket_dir,
                       const char *lockfile_dir,
                       void (*recv_cb)(struct tevent_context *ev,
                                       const uint8_t *msg,
                                       size_t msg_len,
                                       int *fds,
                                       size_t num_fds,
                                       void *private_data),
                       void *recv_cb_private_data)
{
        struct messaging_dgm_context *ctx;
        int ret;
        struct sockaddr_un socket_address;
        size_t len;
        static bool have_dgm_context = false;

        if (have_dgm_context) {
                return EEXIST;
        }

        ctx = talloc_zero(NULL, struct messaging_dgm_context);
        if (ctx == NULL) {
                goto fail_nomem;
        }
        ctx->ev = ev;
        ctx->pid = getpid();
        ctx->recv_cb = recv_cb;
        ctx->recv_cb_private_data = recv_cb_private_data;

        len = strlcpy(ctx->lockfile_dir.buf, lockfile_dir,
                      sizeof(ctx->lockfile_dir.buf));
        if (len >= sizeof(ctx->lockfile_dir.buf)) {
                TALLOC_FREE(ctx);
                return ENAMETOOLONG;
        }

        len = strlcpy(ctx->socket_dir.buf, socket_dir,
                      sizeof(ctx->socket_dir.buf));
        if (len >= sizeof(ctx->socket_dir.buf)) {
                TALLOC_FREE(ctx);
                return ENAMETOOLONG;
        }

        socket_address = (struct sockaddr_un) { .sun_family = AF_UNIX };
        len = snprintf(socket_address.sun_path,
                       sizeof(socket_address.sun_path),
                       "%s/%u", socket_dir, (unsigned)ctx->pid);
        if (len >= sizeof(socket_address.sun_path)) {
                TALLOC_FREE(ctx);
                return ENAMETOOLONG;
        }

        ret = messaging_dgm_lockfile_create(ctx, ctx->pid, &ctx->lockfile_fd,
                                            punique);
        if (ret != 0) {
                DEBUG(1, ("%s: messaging_dgm_lockfile_create failed: %s\n",
                          __func__, strerror(ret)));
                TALLOC_FREE(ctx);
                return ret;
        }

        unlink(socket_address.sun_path);

        ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
        if (ctx->sock == -1) {
                ret = errno;
                DBG_WARNING("socket failed: %s\n", strerror(ret));
                TALLOC_FREE(ctx);
                return ret;
        }

        ret = prepare_socket_cloexec(ctx->sock);
        if (ret != 0) {
                DBG_WARNING("prepare_socket_cloexec failed: %s\n",
                            strerror(ret));
                TALLOC_FREE(ctx);
                return ret;
        }

        ret = bind(ctx->sock, (struct sockaddr *)(void *)&socket_address,
                   sizeof(socket_address));
        if (ret == -1) {
                ret = errno;
                DBG_WARNING("bind failed: %s\n", strerror(ret));
                TALLOC_FREE(ctx);
                return ret;
        }

        talloc_set_destructor(ctx, messaging_dgm_context_destructor);

        ctx->have_dgm_context = &have_dgm_context;

        ret = pthreadpool_tevent_init(ctx, UINT_MAX, &ctx->pool);
        if (ret != 0) {
                DBG_WARNING("pthreadpool_tevent_init failed: %s\n",
                            strerror(ret));
                TALLOC_FREE(ctx);
                return ret;
        }

        global_dgm_context = ctx;
        return 0;

fail_nomem:
        TALLOC_FREE(ctx);
        return ENOMEM;
}
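
/*
 * Illustrative call sequence (editor's sketch, not part of the original
 * source): a long-running process would typically do something like
 *
 *     uint64_t unique;
 *     ret = messaging_dgm_init(ev, &unique, socket_dir, lockfile_dir,
 *                              recv_cb, recv_cb_private_data);
 *     fde = messaging_dgm_register_tevent_context(mem_ctx, ev);
 *     ret = messaging_dgm_send(target_pid, iov, iovlen, NULL, 0);
 *
 * where socket_dir, lockfile_dir, recv_cb and target_pid are caller-supplied
 * (hypothetical names here), and messaging_dgm_destroy() tears it all down.
 */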

/*
 * Remove the rendezvous point in the filesystem
 * if we're the owner.
 */
static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
{
        while (c->outsocks != NULL) {
                TALLOC_FREE(c->outsocks);
        }
        while (c->in_msgs != NULL) {
                TALLOC_FREE(c->in_msgs);
        }
        while (c->fde_evs != NULL) {
                tevent_fd_set_flags(c->fde_evs->fde, 0);
                c->fde_evs->ctx = NULL;
                DLIST_REMOVE(c->fde_evs, c->fde_evs);
        }

        close(c->sock);

        if (getpid() == c->pid) {
                struct sun_path_buf name;
                int ret;

                ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
                               c->socket_dir.buf, (unsigned)c->pid);
                if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
                        /*
                         * We've checked the length when creating, so this
                         * should never happen
                         */
                        abort();
                }
                unlink(name.buf);

                ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
                               c->lockfile_dir.buf, (unsigned)c->pid);
                if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
                        /*
                         * We've checked the length when creating, so this
                         * should never happen
                         */
                        abort();
                }
                unlink(name.buf);
        }
        close(c->lockfile_fd);

        if (c->have_dgm_context != NULL) {
                *c->have_dgm_context = false;
        }

        return 0;
}
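
/*
 * DEVELOPER-only sanity check: verify that the bound socket path and the
 * locked lockfile still belong to the current pid, i.e. that the context
 * was not carried across fork() without reinit_after_fork. Aborts on
 * mismatch.
 */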

static void messaging_dgm_validate(struct messaging_dgm_context *ctx)
{
#ifdef DEVELOPER
        pid_t pid = getpid();
        struct sockaddr_storage addr;
        socklen_t addrlen = sizeof(addr);
        struct sockaddr_un *un_addr;
        struct sun_path_buf pathbuf;
        struct stat st1, st2;
        int ret;

        /*
         * Protect against using the wrong messaging context after a
         * fork without reinit_after_fork.
         */

        ret = getsockname(ctx->sock, (struct sockaddr *)&addr, &addrlen);
        if (ret == -1) {
                DBG_ERR("getsockname failed: %s\n", strerror(errno));
                goto fail;
        }
        if (addr.ss_family != AF_UNIX) {
                DBG_ERR("getsockname returned family %d\n",
                        (int)addr.ss_family);
                goto fail;
        }
        un_addr = (struct sockaddr_un *)&addr;

        ret = snprintf(pathbuf.buf, sizeof(pathbuf.buf),
                       "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
        if (ret < 0) {
                DBG_ERR("snprintf failed: %s\n", strerror(errno));
                goto fail;
        }
        if ((size_t)ret >= sizeof(pathbuf.buf)) {
                DBG_ERR("snprintf returned %d chars\n", (int)ret);
                goto fail;
        }

        if (strcmp(pathbuf.buf, un_addr->sun_path) != 0) {
                DBG_ERR("sockname wrong: Expected %s, got %s\n",
                        pathbuf.buf, un_addr->sun_path);
                goto fail;
        }

        ret = snprintf(pathbuf.buf, sizeof(pathbuf.buf),
                       "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
        if (ret < 0) {
                DBG_ERR("snprintf failed: %s\n", strerror(errno));
                goto fail;
        }
        if ((size_t)ret >= sizeof(pathbuf.buf)) {
                DBG_ERR("snprintf returned %d chars\n", (int)ret);
                goto fail;
        }

        ret = stat(pathbuf.buf, &st1);
        if (ret == -1) {
                DBG_ERR("stat failed: %s\n", strerror(errno));
                goto fail;
        }
        ret = fstat(ctx->lockfile_fd, &st2);
        if (ret == -1) {
                DBG_ERR("fstat failed: %s\n", strerror(errno));
                goto fail;
        }

        if ((st1.st_dev != st2.st_dev) || (st1.st_ino != st2.st_ino)) {
                DBG_ERR("lockfile differs, expected (%d/%d), got (%d/%d)\n",
                        (int)st2.st_dev, (int)st2.st_ino,
                        (int)st1.st_dev, (int)st1.st_ino);
                goto fail;
        }

        return;
fail:
        abort();
#else
        return;
#endif
}

static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
                               struct tevent_context *ev,
                               uint8_t *msg, size_t msg_len,
                               int *fds, size_t num_fds);

/*
 * Raw read callback handler - passes to messaging_dgm_recv()
 * for fragment reassembly processing.
 */
static void messaging_dgm_read_handler(struct tevent_context *ev,
                                       struct tevent_fd *fde,
                                       uint16_t flags,
                                       void *private_data)
{
        struct messaging_dgm_context *ctx = talloc_get_type_abort(
                private_data, struct messaging_dgm_context);
        ssize_t received;
        struct msghdr msg;
        struct iovec iov;
        size_t msgbufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
        uint8_t msgbuf[msgbufsize];
        uint8_t buf[MESSAGING_DGM_FRAGMENT_LENGTH];
        size_t num_fds;

        messaging_dgm_validate(ctx);

        if ((flags & TEVENT_FD_READ) == 0) {
                return;
        }

        iov = (struct iovec) { .iov_base = buf, .iov_len = sizeof(buf) };
        msg = (struct msghdr) { .msg_iov = &iov, .msg_iovlen = 1 };

        msghdr_prep_recv_fds(&msg, msgbuf, msgbufsize, INT8_MAX);

#ifdef MSG_CMSG_CLOEXEC
        msg.msg_flags |= MSG_CMSG_CLOEXEC;
#endif

        received = recvmsg(ctx->sock, &msg, 0);
        if (received == -1) {
                if ((errno == EAGAIN) ||
                    (errno == EWOULDBLOCK) ||
                    (errno == EINTR) ||
                    (errno == ENOMEM)) {
                        /* Not really an error - just try again. */
                        return;
                }
                /* Problem with the socket. Set it unreadable. */
                tevent_fd_set_flags(fde, 0);
                return;
        }

        if ((size_t)received > sizeof(buf)) {
                /* More than we expected, not for us */
                return;
        }

        num_fds = msghdr_extract_fds(&msg, NULL, 0);
        if (num_fds == 0) {
                int fds[1];

                messaging_dgm_recv(ctx, ev, buf, received, fds, 0);
        } else {
                size_t i;
                int fds[num_fds];

                msghdr_extract_fds(&msg, fds, num_fds);

                for (i = 0; i < num_fds; i++) {
                        int err;

                        err = prepare_socket_cloexec(fds[i]);
                        if (err != 0) {
                                close_fd_array(fds, num_fds);
                                num_fds = 0;
                        }
                }

                messaging_dgm_recv(ctx, ev, buf, received, fds, num_fds);
        }
}

static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
{
        DLIST_REMOVE(m->ctx->in_msgs, m);
        return 0;
}

/*
 * Deal with identification of fragmented messages and
 * re-assembly into the full message as originally sent,
 * then call the callback.
 */
static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
                               struct tevent_context *ev,
                               uint8_t *buf, size_t buflen,
                               int *fds, size_t num_fds)
{
        struct messaging_dgm_fragment_hdr hdr;
        struct messaging_dgm_in_msg *msg;
        size_t space;
        uint64_t cookie;

        if (buflen < sizeof(cookie)) {
                goto close_fds;
        }
        memcpy(&cookie, buf, sizeof(cookie));
        buf += sizeof(cookie);
        buflen -= sizeof(cookie);

        if (cookie == 0) {
                ctx->recv_cb(ev, buf, buflen, fds, num_fds,
                             ctx->recv_cb_private_data);
                return;
        }

        if (buflen < sizeof(hdr)) {
                goto close_fds;
        }
        memcpy(&hdr, buf, sizeof(hdr));
        buf += sizeof(hdr);
        buflen -= sizeof(hdr);

        for (msg = ctx->in_msgs; msg != NULL; msg = msg->next) {
                if ((msg->sender_pid == hdr.pid) &&
                    (msg->sender_sock == hdr.sock)) {
                        break;
                }
        }

        if ((msg != NULL) && (msg->cookie != cookie)) {
                TALLOC_FREE(msg);
        }

        if (msg == NULL) {
                size_t msglen;
                msglen = offsetof(struct messaging_dgm_in_msg, buf) +
                        hdr.msglen;

                msg = talloc_size(ctx, msglen);
                if (msg == NULL) {
                        goto close_fds;
                }
                talloc_set_name_const(msg, "struct messaging_dgm_in_msg");

                *msg = (struct messaging_dgm_in_msg) {
                        .ctx = ctx, .msglen = hdr.msglen,
                        .sender_pid = hdr.pid, .sender_sock = hdr.sock,
                        .cookie = cookie
                };
                DLIST_ADD(ctx->in_msgs, msg);
                talloc_set_destructor(msg, messaging_dgm_in_msg_destructor);
        }

        space = msg->msglen - msg->received;
        if (buflen > space) {
                goto close_fds;
        }

        memcpy(msg->buf + msg->received, buf, buflen);
        msg->received += buflen;

        if (msg->received < msg->msglen) {
                /*
                 * Any valid sender will send the fds in the last
                 * block. Invalid senders might have sent fd's that we
                 * need to close here.
                 */
                goto close_fds;
        }

        DLIST_REMOVE(ctx->in_msgs, msg);
        talloc_set_destructor(msg, NULL);

        ctx->recv_cb(ev, msg->buf, msg->msglen, fds, num_fds,
                     ctx->recv_cb_private_data);

        TALLOC_FREE(msg);
        return;

close_fds:
        close_fd_array(fds, num_fds);
}
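
/*
 * Tear down the global messaging context; the talloc destructor removes
 * the rendezvous point from the filesystem.
 */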

void messaging_dgm_destroy(void)
{
        TALLOC_FREE(global_dgm_context);
}
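
/*
 * Send an iovec-based message (plus optional file descriptors) to the
 * process "pid", fragmenting as needed. Retries a few times on
 * ECONNREFUSED in case the receiver has recreated its socket.
 */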

int messaging_dgm_send(pid_t pid,
                       const struct iovec *iov, int iovlen,
                       const int *fds, size_t num_fds)
{
        struct messaging_dgm_context *ctx = global_dgm_context;
        struct messaging_dgm_out *out;
        int ret;
        unsigned retries = 0;

        if (ctx == NULL) {
                return ENOTCONN;
        }

        messaging_dgm_validate(ctx);

again:
        ret = messaging_dgm_out_get(ctx, pid, &out);
        if (ret != 0) {
                return ret;
        }

        DEBUG(10, ("%s: Sending message to %u\n", __func__, (unsigned)pid));

        ret = messaging_dgm_out_send_fragmented(ctx->ev, out, iov, iovlen,
                                                fds, num_fds);
        if (ret == ECONNREFUSED) {
                /*
                 * We cache outgoing sockets. If the receiver has
                 * closed and re-opened the socket since our last
                 * message, we get connection refused. Retry.
                 */

                TALLOC_FREE(out);

                if (retries < 5) {
                        retries += 1;
                        goto again;
                }
        }
        return ret;
}
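
/*
 * Read the unique id back from a lockfile: a decimal number followed by a
 * newline, as written by messaging_dgm_lockfile_create().
 */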

static int messaging_dgm_read_unique(int fd, uint64_t *punique)
{
        char buf[25];
        ssize_t rw_ret;
        int error = 0;
        unsigned long long unique;
        char *endptr;

        rw_ret = pread(fd, buf, sizeof(buf)-1, 0);
        if (rw_ret == -1) {
                return errno;
        }
        buf[rw_ret] = '\0';

        unique = smb_strtoull(buf, &endptr, 10, &error, SMB_STR_STANDARD);
        if (error != 0) {
                return error;
        }

        if (endptr[0] != '\n') {
                return EINVAL;
        }
        *punique = unique;
        return 0;
}
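
/*
 * Fetch the unique id of another process (or our own) from its lockfile.
 */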

int messaging_dgm_get_unique(pid_t pid, uint64_t *unique)
{
        struct messaging_dgm_context *ctx = global_dgm_context;
        struct sun_path_buf lockfile_name;
        int ret, fd;

        if (ctx == NULL) {
                return EBADF;
        }

        messaging_dgm_validate(ctx);

        if (pid == getpid()) {
                /*
                 * Protect against losing our own lock
                 */
                return messaging_dgm_read_unique(ctx->lockfile_fd, unique);
        }

        ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
                       "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
        if (ret < 0) {
                return errno;
        }
        if ((size_t)ret >= sizeof(lockfile_name.buf)) {
                return ENAMETOOLONG;
        }

        fd = open(lockfile_name.buf, O_NONBLOCK|O_RDONLY, 0);
        if (fd == -1) {
                return errno;
        }

        ret = messaging_dgm_read_unique(fd, unique);
        close(fd);
        return ret;
}
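
/*
 * Remove the socket and lockfile of a (presumably dead) process. This only
 * succeeds if nobody holds the fcntl lock on the lockfile anymore, i.e. the
 * owning process is really gone.
 */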

int messaging_dgm_cleanup(pid_t pid)
{
        struct messaging_dgm_context *ctx = global_dgm_context;
        struct sun_path_buf lockfile_name, socket_name;
        int fd, len, ret;
        struct flock lck = {
                .l_pid = 0,
        };

        if (ctx == NULL) {
                return ENOTCONN;
        }

        len = snprintf(socket_name.buf, sizeof(socket_name.buf), "%s/%u",
                       ctx->socket_dir.buf, (unsigned)pid);
        if (len < 0) {
                return errno;
        }
        if ((size_t)len >= sizeof(socket_name.buf)) {
                return ENAMETOOLONG;
        }

        len = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf), "%s/%u",
                       ctx->lockfile_dir.buf, (unsigned)pid);
        if (len < 0) {
                return errno;
        }
        if ((size_t)len >= sizeof(lockfile_name.buf)) {
                return ENAMETOOLONG;
        }

        fd = open(lockfile_name.buf, O_NONBLOCK|O_WRONLY, 0);
        if (fd == -1) {
                ret = errno;
                if (ret != ENOENT) {
                        DEBUG(10, ("%s: open(%s) failed: %s\n", __func__,
                                   lockfile_name.buf, strerror(ret)));
                }
                return ret;
        }

        lck.l_type = F_WRLCK;
        lck.l_whence = SEEK_SET;
        lck.l_start = 0;
        lck.l_len = 0;

        ret = fcntl(fd, F_SETLK, &lck);
        if (ret != 0) {
                ret = errno;
                if ((ret != EACCES) && (ret != EAGAIN)) {
                        DEBUG(10, ("%s: Could not get lock: %s\n", __func__,
                                   strerror(ret)));
                }
                close(fd);
                return ret;
        }

        DEBUG(10, ("%s: Cleaning up : %s\n", __func__, strerror(ret)));

        (void)unlink(socket_name.buf);
        (void)unlink(lockfile_name.buf);
        (void)close(fd);
        return 0;
}
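
/*
 * messaging_dgm_forall() callback used by messaging_dgm_wipe(): clean up
 * the rendezvous files of every pid except our own.
 */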

static int messaging_dgm_wipe_fn(pid_t pid, void *private_data)
{
        pid_t *our_pid = (pid_t *)private_data;
        int ret;

        if (pid == *our_pid) {
                /*
                 * fcntl(F_GETLK) will succeed for ourselves, we hold
                 * that lock ourselves.
                 */
                return 0;
        }

        ret = messaging_dgm_cleanup(pid);
        DEBUG(10, ("messaging_dgm_cleanup(%lu) returned %s\n",
                   (unsigned long)pid, ret ? strerror(ret) : "ok"));

        return 0;
}

int messaging_dgm_wipe(void)
{
        pid_t pid = getpid();
        messaging_dgm_forall(messaging_dgm_wipe_fn, &pid);
        return 0;
}
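
/*
 * Iterate over all processes that have a socket in the socket directory and
 * call fn() with each pid found there.
 */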

int messaging_dgm_forall(int (*fn)(pid_t pid, void *private_data),
                         void *private_data)
{
        struct messaging_dgm_context *ctx = global_dgm_context;
        DIR *msgdir;
        struct dirent *dp;
        int error = 0;

        if (ctx == NULL) {
                return ENOTCONN;
        }

        messaging_dgm_validate(ctx);

        /*
         * We scan the socket directory and not the lock directory. Otherwise
         * we would race against messaging_dgm_lockfile_create's open(O_CREAT)
         * and fcntl(SETLK).
         */

        msgdir = opendir(ctx->socket_dir.buf);
        if (msgdir == NULL) {
                return errno;
        }

        while ((dp = readdir(msgdir)) != NULL) {
                unsigned long pid;
                int ret;

                pid = smb_strtoul(dp->d_name, NULL, 10, &error,
                                  SMB_STR_STANDARD);
                if ((pid == 0) || (error != 0)) {
                        /*
                         * . and .. and other malformed entries
                         */
                        continue;
                }

                ret = fn(pid, private_data);
                if (ret != 0) {
                        break;
                }
        }
        closedir(msgdir);

        return 0;
}
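
/*
 * Handle returned by messaging_dgm_register_tevent_context(); it wraps the
 * tevent_fd that drives message reception on one tevent_context.
 */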

struct messaging_dgm_fde {
        struct tevent_fd *fde;
};

static int messaging_dgm_fde_ev_destructor(struct messaging_dgm_fde_ev *fde_ev)
{
        if (fde_ev->ctx != NULL) {
                DLIST_REMOVE(fde_ev->ctx->fde_evs, fde_ev);
                fde_ev->ctx = NULL;
        }
        return 0;
}

/*
 * Reference counter for a struct tevent_fd messaging read event
 * (with callback function) on a struct tevent_context registered
 * on a messaging context.
 *
 * If we've already registered this struct tevent_context before
 * (so already have a read event), just increase the reference count.
 *
 * Otherwise create a new struct tevent_fd messaging read event on the
 * previously unseen struct tevent_context - this is what drives
 * the message receive processing.
 */
struct messaging_dgm_fde *messaging_dgm_register_tevent_context(
        TALLOC_CTX *mem_ctx, struct tevent_context *ev)
{
        struct messaging_dgm_context *ctx = global_dgm_context;
        struct messaging_dgm_fde_ev *fde_ev;
        struct messaging_dgm_fde *fde;

        if (ctx == NULL) {
                return NULL;
        }

        fde = talloc(mem_ctx, struct messaging_dgm_fde);
        if (fde == NULL) {
                return NULL;
        }

        for (fde_ev = ctx->fde_evs; fde_ev != NULL; fde_ev = fde_ev->next) {
                if (tevent_fd_get_flags(fde_ev->fde) == 0) {
                        /*
                         * If the event context got deleted,
                         * tevent_fd_get_flags() will return 0
                         * for the stale fde.
                         *
                         * In that case we should not
                         * use fde_ev->ev anymore.
                         */
                        continue;
                }
                if (fde_ev->ev == ev) {
                        break;
                }
        }

        if (fde_ev == NULL) {
                fde_ev = talloc(fde, struct messaging_dgm_fde_ev);
                if (fde_ev == NULL) {
                        return NULL;
                }
                fde_ev->fde = tevent_add_fd(
                        ev, fde_ev, ctx->sock, TEVENT_FD_READ,
                        messaging_dgm_read_handler, ctx);
                if (fde_ev->fde == NULL) {
                        TALLOC_FREE(fde);
                        return NULL;
                }
                fde_ev->ev = ev;
                fde_ev->ctx = ctx;
                DLIST_ADD(ctx->fde_evs, fde_ev);
                talloc_set_destructor(
                        fde_ev, messaging_dgm_fde_ev_destructor);
        } else {
                /*
                 * Same trick as with tdb_wrap: The caller will never
                 * see the talloc_referenced object, the
                 * messaging_dgm_fde_ev, so problems with
                 * talloc_unlink will not happen.
                 */
                if (talloc_reference(fde, fde_ev) == NULL) {
                        TALLOC_FREE(fde);
                        return NULL;
                }
        }

        fde->fde = fde_ev->fde;
        return fde;
}
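
/*
 * Report whether the registered read event is still active on its
 * tevent_context.
 */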

bool messaging_dgm_fde_active(struct messaging_dgm_fde *fde)
{
        uint16_t flags;

        if (fde == NULL) {
                return false;
        }
        flags = tevent_fd_get_flags(fde->fde);
        return (flags != 0);
}