lib/messaging/messages_dgm.c
/*
 * Unix SMB/CIFS implementation.
 * Samba internal messaging functions
 * Copyright (C) 2013 by Volker Lendecke
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
#include "replace.h"
#include "util/util.h"
#include "system/network.h"
#include "system/filesys.h"
#include "system/dir.h"
#include "system/select.h"
#include "lib/util/debug.h"
#include "messages_dgm.h"
#include "lib/util/genrand.h"
#include "lib/util/dlinklist.h"
#include "lib/pthreadpool/pthreadpool_tevent.h"
#include "lib/util/msghdr.h"
#include "lib/util/iov_buf.h"
#include "lib/util/blocking.h"
#include "lib/util/tevent_unix.h"
#include "lib/util/smb_strtox.h"

#define MESSAGING_DGM_FRAGMENT_LENGTH 1024
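
/*
 * Fragment size note (derived from the code below): a single datagram
 * carries at most MESSAGING_DGM_FRAGMENT_LENGTH bytes including the
 * leading 64-bit cookie; larger messages are split into fragments and
 * reassembled by the receiver, see messaging_dgm_out_send_fragmented()
 * and messaging_dgm_recv().
 */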
struct sun_path_buf {
	/*
	 * This will carry enough for a socket path
	 */
	char buf[sizeof(struct sockaddr_un)];
};
/*
 * We can only have one tevent_fd per dgm_context and per
 * tevent_context. Maintain a list of registered tevent_contexts per
 * dgm_context.
 */
struct messaging_dgm_fde_ev {
	struct messaging_dgm_fde_ev *prev, *next;

	/*
	 * Backreference to enable DLIST_REMOVE from our
	 * destructor. Also, set to NULL when the dgm_context dies
	 * before the messaging_dgm_fde_ev.
	 */
	struct messaging_dgm_context *ctx;

	struct tevent_context *ev;
	struct tevent_fd *fde;
};
struct messaging_dgm_out {
	struct messaging_dgm_out *prev, *next;
	struct messaging_dgm_context *ctx;

	pid_t pid;
	int sock;
	bool is_blocking;
	uint64_t cookie;

	struct tevent_queue *queue;
	struct tevent_timer *idle_timer;
};
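
/*
 * One messaging_dgm_out exists per destination pid. Outgoing sockets
 * are cached on ctx->outsocks and reaped by the idle timer roughly one
 * second after the send queue drains, see
 * messaging_dgm_out_rearm_idle_timer() below.
 */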
struct messaging_dgm_in_msg {
	struct messaging_dgm_in_msg *prev, *next;
	struct messaging_dgm_context *ctx;
	size_t msglen;
	size_t received;
	pid_t sender_pid;
	int sender_sock;
	uint64_t cookie;
	uint8_t buf[];
};
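
/*
 * Reassembly buffer for one fragmented incoming message: "received"
 * grows towards "msglen" as fragments arrive, and the flexible array
 * member "buf" holds the payload collected so far.
 */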
struct messaging_dgm_context {
	struct tevent_context *ev;
	pid_t pid;
	struct sun_path_buf socket_dir;
	struct sun_path_buf lockfile_dir;
	int lockfile_fd;

	int sock;
	struct messaging_dgm_in_msg *in_msgs;

	struct messaging_dgm_fde_ev *fde_evs;
	void (*recv_cb)(struct tevent_context *ev,
			const uint8_t *msg,
			size_t msg_len,
			int *fds,
			size_t num_fds,
			void *private_data);
	void *recv_cb_private_data;

	bool *have_dgm_context;

	struct pthreadpool_tevent *pool;
	struct messaging_dgm_out *outsocks;
};
/* Set socket close on exec. */
static int prepare_socket_cloexec(int sock)
{
#ifdef FD_CLOEXEC
	int flags;

	flags = fcntl(sock, F_GETFD, 0);
	if (flags == -1) {
		return errno;
	}
	flags |= FD_CLOEXEC;
	if (fcntl(sock, F_SETFD, flags) == -1) {
		return errno;
	}
#endif
	return 0;
}
static void close_fd_array(int *fds, size_t num_fds)
{
	size_t i;

	for (i = 0; i < num_fds; i++) {
		if (fds[i] == -1) {
			continue;
		}

		close(fds[i]);
		fds[i] = -1;
	}
}
/*
 * The idle handler can free the struct messaging_dgm_out *,
 * if it's unused (qlen of zero) which closes the socket.
 */

static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
					   struct tevent_timer *te,
					   struct timeval current_time,
					   void *private_data)
{
	struct messaging_dgm_out *out = talloc_get_type_abort(
		private_data, struct messaging_dgm_out);
	size_t qlen;

	out->idle_timer = NULL;

	qlen = tevent_queue_length(out->queue);
	if (qlen == 0) {
		TALLOC_FREE(out);
	}
}
/*
 * Setup the idle handler to fire after 1 second if the
 * queue is zero.
 */

static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out *out)
{
	size_t qlen;

	qlen = tevent_queue_length(out->queue);
	if (qlen != 0) {
		TALLOC_FREE(out->idle_timer);
		return;
	}

	if (out->idle_timer != NULL) {
		tevent_update_timer(out->idle_timer,
				    tevent_timeval_current_ofs(1, 0));
		return;
	}

	out->idle_timer = tevent_add_timer(
		out->ctx->ev, out, tevent_timeval_current_ofs(1, 0),
		messaging_dgm_out_idle_handler, out);
	/*
	 * No NULL check, we'll come back here. Worst case we're
	 * leaking a bit.
	 */
}
static int messaging_dgm_out_destructor(struct messaging_dgm_out *dst);
static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
					   struct tevent_timer *te,
					   struct timeval current_time,
					   void *private_data);
/*
 * Connect to an existing rendezvous point for another
 * pid - wrapped inside a struct messaging_dgm_out *.
 */

static int messaging_dgm_out_create(TALLOC_CTX *mem_ctx,
				    struct messaging_dgm_context *ctx,
				    pid_t pid, struct messaging_dgm_out **pout)
{
	struct messaging_dgm_out *out;
	struct sockaddr_un addr = { .sun_family = AF_UNIX };
	int ret = ENOMEM;
	int out_pathlen;
	char addr_buf[sizeof(addr.sun_path) + (3 * sizeof(unsigned) + 2)];

	out = talloc(mem_ctx, struct messaging_dgm_out);
	if (out == NULL) {
		goto fail;
	}

	*out = (struct messaging_dgm_out) {
		.pid = pid,
		.ctx = ctx,
		.cookie = 1
	};

	out_pathlen = snprintf(addr_buf, sizeof(addr_buf),
			       "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
	if (out_pathlen < 0) {
		goto errno_fail;
	}
	if ((size_t)out_pathlen >= sizeof(addr.sun_path)) {
		ret = ENAMETOOLONG;
		goto fail;
	}

	memcpy(addr.sun_path, addr_buf, out_pathlen + 1);

	out->queue = tevent_queue_create(out, addr.sun_path);
	if (out->queue == NULL) {
		ret = ENOMEM;
		goto fail;
	}

	out->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
	if (out->sock == -1) {
		goto errno_fail;
	}

	DLIST_ADD(ctx->outsocks, out);
	talloc_set_destructor(out, messaging_dgm_out_destructor);

	do {
		ret = connect(out->sock,
			      (const struct sockaddr *)(const void *)&addr,
			      sizeof(addr));
	} while ((ret == -1) && (errno == EINTR));

	if (ret == -1) {
		goto errno_fail;
	}

	ret = set_blocking(out->sock, false);
	if (ret == -1) {
		goto errno_fail;
	}
	out->is_blocking = false;

	*pout = out;
	return 0;
errno_fail:
	ret = errno;
fail:
	TALLOC_FREE(out);
	return ret;
}
static int messaging_dgm_out_destructor(struct messaging_dgm_out *out)
{
	DLIST_REMOVE(out->ctx->outsocks, out);

	if ((tevent_queue_length(out->queue) != 0) &&
	    (tevent_cached_getpid() == out->ctx->pid)) {
		/*
		 * We have pending jobs. We can't close the socket,
		 * this has been handed over to messaging_dgm_out_queue_state.
		 */
		return 0;
	}

	if (out->sock != -1) {
		close(out->sock);
		out->sock = -1;
	}
	return 0;
}
/*
 * Find the struct messaging_dgm_out * to talk to pid.
 * If we don't have one, create it. Set the timer to
 * delete after 1 sec.
 */

static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
				 struct messaging_dgm_out **pout)
{
	struct messaging_dgm_out *out;
	int ret;

	for (out = ctx->outsocks; out != NULL; out = out->next) {
		if (out->pid == pid) {
			break;
		}
	}

	if (out == NULL) {
		ret = messaging_dgm_out_create(ctx, ctx, pid, &out);
		if (ret != 0) {
			return ret;
		}
	}

	/*
	 * shouldn't be possible, should be set if messaging_dgm_out_create
	 * succeeded. This check is to satisfy static checker
	 */
	if (out == NULL) {
		return EINVAL;
	}
	messaging_dgm_out_rearm_idle_timer(out);

	*pout = out;
	return 0;
}
/*
 * This function is called directly to send a message fragment
 * when the outgoing queue is zero, and from a pthreadpool
 * job thread when messages are being queued (qlen != 0).
 * Make sure *ONLY* thread-safe functions are called within.
 */

static ssize_t messaging_dgm_sendmsg(int sock,
				     const struct iovec *iov, int iovlen,
				     const int *fds, size_t num_fds,
				     int *perrno)
{
	struct msghdr msg;
	ssize_t fdlen, ret;

	/*
	 * Do the actual sendmsg syscall. This will be called from a
	 * pthreadpool helper thread, so be careful what you do here.
	 */

	msg = (struct msghdr) {
		.msg_iov = discard_const_p(struct iovec, iov),
		.msg_iovlen = iovlen
	};

	fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
	if (fdlen == -1) {
		*perrno = EINVAL;
		return -1;
	}

	{
		uint8_t buf[fdlen];

		msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);

		do {
			ret = sendmsg(sock, &msg, 0);
		} while ((ret == -1) && (errno == EINTR));
	}

	if (ret == -1) {
		*perrno = errno;
	}
	return ret;
}
struct messaging_dgm_out_queue_state {
	struct tevent_context *ev;
	struct pthreadpool_tevent *pool;

	struct tevent_req *req;
	struct tevent_req *subreq;

	int sock;

	int *fds;
	uint8_t *buf;

	ssize_t sent;
	int err;
};
static int messaging_dgm_out_queue_state_destructor(
	struct messaging_dgm_out_queue_state *state);
static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
					    void *private_data);
static void messaging_dgm_out_threaded_job(void *private_data);
static void messaging_dgm_out_queue_done(struct tevent_req *subreq);
/*
 * Push a message fragment onto a queue to be sent by a
 * threadpool job. Makes copies of data/fd's to be sent.
 * The running tevent_queue internally creates an immediate
 * event to schedule the write.
 */

static struct tevent_req *messaging_dgm_out_queue_send(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
	struct messaging_dgm_out *out,
	const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
{
	struct tevent_req *req;
	struct messaging_dgm_out_queue_state *state;
	struct tevent_queue_entry *e;
	size_t i;
	ssize_t buflen;

	req = tevent_req_create(out, &state,
				struct messaging_dgm_out_queue_state);
	if (req == NULL) {
		return NULL;
	}
	state->ev = ev;
	state->pool = out->ctx->pool;
	state->sock = out->sock;
	state->req = req;

	/*
	 * Go blocking in a thread
	 */
	if (!out->is_blocking) {
		int ret = set_blocking(out->sock, true);
		if (ret == -1) {
			tevent_req_error(req, errno);
			return tevent_req_post(req, ev);
		}
		out->is_blocking = true;
	}

	buflen = iov_buflen(iov, iovlen);
	if (buflen == -1) {
		tevent_req_error(req, EMSGSIZE);
		return tevent_req_post(req, ev);
	}

	state->buf = talloc_array(state, uint8_t, buflen);
	if (tevent_req_nomem(state->buf, req)) {
		return tevent_req_post(req, ev);
	}
	iov_buf(iov, iovlen, state->buf, buflen);

	state->fds = talloc_array(state, int, num_fds);
	if (tevent_req_nomem(state->fds, req)) {
		return tevent_req_post(req, ev);
	}

	for (i=0; i<num_fds; i++) {
		state->fds[i] = -1;
	}

	for (i=0; i<num_fds; i++) {

		state->fds[i] = dup(fds[i]);

		if (state->fds[i] == -1) {
			int ret = errno;

			close_fd_array(state->fds, num_fds);

			tevent_req_error(req, ret);
			return tevent_req_post(req, ev);
		}
	}

	talloc_set_destructor(state, messaging_dgm_out_queue_state_destructor);

	e = tevent_queue_add_entry(out->queue, ev, req,
				   messaging_dgm_out_queue_trigger, req);
	if (tevent_req_nomem(e, req)) {
		return tevent_req_post(req, ev);
	}
	return req;
}
static int messaging_dgm_out_queue_state_destructor(
	struct messaging_dgm_out_queue_state *state)
{
	int *fds;
	size_t num_fds;

	if (state->subreq != NULL) {
		/*
		 * We're scheduled, but we're destroyed. This happens
		 * if the messaging_dgm_context is destroyed while
		 * we're stuck in a blocking send. There's nothing we
		 * can do but to leak memory.
		 */
		TALLOC_FREE(state->subreq);
		(void)talloc_reparent(state->req, NULL, state);
		return -1;
	}

	fds = state->fds;
	num_fds = talloc_array_length(fds);
	close_fd_array(fds, num_fds);
	return 0;
}
/*
 * tevent_queue callback that schedules the pthreadpool to actually
 * send the queued message fragment.
 */

static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
					    void *private_data)
{
	struct messaging_dgm_out_queue_state *state = tevent_req_data(
		req, struct messaging_dgm_out_queue_state);

	tevent_req_reset_endtime(req);

	state->subreq = pthreadpool_tevent_job_send(
		state, state->ev, state->pool,
		messaging_dgm_out_threaded_job, state);
	if (tevent_req_nomem(state->subreq, req)) {
		return;
	}
	tevent_req_set_callback(state->subreq, messaging_dgm_out_queue_done,
				req);
}
/*
 * Wrapper function run by the pthread that calls
 * messaging_dgm_sendmsg() to actually do the sendmsg().
 */

static void messaging_dgm_out_threaded_job(void *private_data)
{
	struct messaging_dgm_out_queue_state *state = talloc_get_type_abort(
		private_data, struct messaging_dgm_out_queue_state);

	struct iovec iov = { .iov_base = state->buf,
			     .iov_len = talloc_get_size(state->buf) };
	size_t num_fds = talloc_array_length(state->fds);
	int msec = 1;

	while (true) {
		int ret;

		state->sent = messaging_dgm_sendmsg(state->sock, &iov, 1,
					state->fds, num_fds, &state->err);

		if (state->sent != -1) {
			return;
		}
		if (state->err != ENOBUFS) {
			return;
		}

		/*
		 * ENOBUFS is the FreeBSD way of saying "Try
		 * again". We have to do polling.
		 */
		do {
			ret = poll(NULL, 0, msec);
		} while ((ret == -1) && (errno == EINTR));

		/*
		 * Exponential backoff up to once a second
		 */
		msec *= 2;
		msec = MIN(msec, 1000);
	}
}
/*
 * Pickup the results of the pthread sendmsg().
 */

static void messaging_dgm_out_queue_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct messaging_dgm_out_queue_state *state = tevent_req_data(
		req, struct messaging_dgm_out_queue_state);
	int ret;

	if (subreq != state->subreq) {
		abort();
	}

	ret = pthreadpool_tevent_job_recv(subreq);

	TALLOC_FREE(subreq);
	state->subreq = NULL;

	if (tevent_req_error(req, ret)) {
		return;
	}
	if (state->sent == -1) {
		tevent_req_error(req, state->err);
		return;
	}
	tevent_req_done(req);
}
static int messaging_dgm_out_queue_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}

static void messaging_dgm_out_sent_fragment(struct tevent_req *req);
/*
 * Core function to send a message fragment given a
 * connected struct messaging_dgm_out * destination.
 * If no current queue tries to send nonblocking
 * directly. If not, queues the fragment (which makes
 * a copy of it) and adds a 60-second timeout on the send.
 */

static int messaging_dgm_out_send_fragment(
	struct tevent_context *ev, struct messaging_dgm_out *out,
	const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
{
	struct tevent_req *req;
	size_t qlen;
	bool ok;

	qlen = tevent_queue_length(out->queue);
	if (qlen == 0) {
		ssize_t nsent;
		int err = 0;

		if (out->is_blocking) {
			int ret = set_blocking(out->sock, false);
			if (ret == -1) {
				return errno;
			}
			out->is_blocking = false;
		}

		nsent = messaging_dgm_sendmsg(out->sock, iov, iovlen, fds,
					      num_fds, &err);
		if (nsent >= 0) {
			return 0;
		}

		if (err == ENOBUFS) {
			/*
			 * FreeBSD's way of telling us the dst socket
			 * is full. EWOULDBLOCK makes us spawn a
			 * polling helper thread.
			 */
			err = EWOULDBLOCK;
		}

		if (err != EWOULDBLOCK) {
			return err;
		}
	}

	req = messaging_dgm_out_queue_send(out, ev, out, iov, iovlen,
					   fds, num_fds);
	if (req == NULL) {
		return ENOMEM;
	}
	tevent_req_set_callback(req, messaging_dgm_out_sent_fragment, out);

	ok = tevent_req_set_endtime(req, ev,
				    tevent_timeval_current_ofs(60, 0));
	if (!ok) {
		TALLOC_FREE(req);
		return ENOMEM;
	}

	return 0;
}
/*
 * Pickup the result of the fragment send. Reset idle timer
 * if queue empty.
 */

static void messaging_dgm_out_sent_fragment(struct tevent_req *req)
{
	struct messaging_dgm_out *out = tevent_req_callback_data(
		req, struct messaging_dgm_out);
	int ret;

	ret = messaging_dgm_out_queue_recv(req);
	TALLOC_FREE(req);

	if (ret != 0) {
		DBG_WARNING("messaging_out_queue_recv returned %s\n",
			    strerror(ret));
	}

	messaging_dgm_out_rearm_idle_timer(out);
}
struct messaging_dgm_fragment_hdr {
	size_t msglen;
	pid_t pid;
	int sock;
};
/*
 * Fragment a message into MESSAGING_DGM_FRAGMENT_LENGTH - 64-bit cookie
 * size chunks and send it.
 *
 * Message fragments are prefixed by a 64-bit cookie that
 * stays the same for all fragments. This allows the receiver
 * to recognise fragments of the same message and re-assemble
 * them on the other end.
 *
 * Note that this allows other message fragments from other
 * senders to be interleaved in the receive read processing,
 * the combination of the cookie and header info allows unique
 * identification of the message from a specific sender in
 * re-assembly.
 *
 * If the message is smaller than MESSAGING_DGM_FRAGMENT_LENGTH - cookie
 * then send a single message with cookie set to zero.
 *
 * Otherwise the message is fragmented into chunks and added
 * to the sending queue. Any file descriptors are passed only
 * in the last fragment.
 *
 * Finally the cookie is incremented (wrap over zero) to
 * prepare for the next message sent to this channel.
 */
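
/*
 * A sketch of the resulting datagram layouts, derived from the code
 * below (sizes in bytes):
 *
 *   unfragmented: | cookie == 0 (8) | payload (<= 1016)             |
 *   fragment:     | cookie != 0 (8) | fragment_hdr | payload chunk  |
 *
 * Each datagram stays at or below MESSAGING_DGM_FRAGMENT_LENGTH in
 * total.
 */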
static int messaging_dgm_out_send_fragmented(struct tevent_context *ev,
					     struct messaging_dgm_out *out,
					     const struct iovec *iov,
					     int iovlen,
					     const int *fds, size_t num_fds)
{
	ssize_t msglen, sent;
	int ret = 0;
	struct iovec iov_copy[iovlen+2];
	struct messaging_dgm_fragment_hdr hdr;
	struct iovec src_iov;

	if (iovlen < 0) {
		return EINVAL;
	}

	msglen = iov_buflen(iov, iovlen);
	if (msglen == -1) {
		return EMSGSIZE;
	}
	if (num_fds > INT8_MAX) {
		return EINVAL;
	}

	if ((size_t) msglen <=
	    (MESSAGING_DGM_FRAGMENT_LENGTH - sizeof(uint64_t))) {
		uint64_t cookie = 0;

		iov_copy[0].iov_base = &cookie;
		iov_copy[0].iov_len = sizeof(cookie);
		if (iovlen > 0) {
			memcpy(&iov_copy[1], iov,
			       sizeof(struct iovec) * iovlen);
		}

		return messaging_dgm_out_send_fragment(
			ev, out, iov_copy, iovlen+1, fds, num_fds);
	}

	hdr = (struct messaging_dgm_fragment_hdr) {
		.msglen = msglen,
		.pid = tevent_cached_getpid(),
		.sock = out->sock
	};

	iov_copy[0].iov_base = &out->cookie;
	iov_copy[0].iov_len = sizeof(out->cookie);
	iov_copy[1].iov_base = &hdr;
	iov_copy[1].iov_len = sizeof(hdr);

	sent = 0;
	src_iov = iov[0];

	/*
	 * The following write loop sends the user message in pieces. We have
	 * filled the first two iovecs above with "cookie" and "hdr". In the
	 * following loops we pull message chunks from the user iov array and
	 * fill iov_copy piece by piece, possibly truncating chunks from the
	 * caller's iov array. Ugly, but hopefully efficient.
	 */

	while (sent < msglen) {
		size_t fragment_len;
		size_t iov_index = 2;

		fragment_len = sizeof(out->cookie) + sizeof(hdr);

		while (fragment_len < MESSAGING_DGM_FRAGMENT_LENGTH) {
			size_t space, chunk;

			space = MESSAGING_DGM_FRAGMENT_LENGTH - fragment_len;
			chunk = MIN(space, src_iov.iov_len);

			iov_copy[iov_index].iov_base = src_iov.iov_base;
			iov_copy[iov_index].iov_len = chunk;
			iov_index += 1;

			src_iov.iov_base = (char *)src_iov.iov_base + chunk;
			src_iov.iov_len -= chunk;
			fragment_len += chunk;

			if (src_iov.iov_len == 0) {
				iov += 1;
				iovlen -= 1;
				if (iovlen == 0) {
					break;
				}
				src_iov = iov[0];
			}
		}
		sent += (fragment_len - sizeof(out->cookie) - sizeof(hdr));

		/*
		 * only the last fragment should pass the fd array.
		 * That simplifies the receiver a lot.
		 */
		if (sent < msglen) {
			ret = messaging_dgm_out_send_fragment(
				ev, out, iov_copy, iov_index, NULL, 0);
		} else {
			ret = messaging_dgm_out_send_fragment(
				ev, out, iov_copy, iov_index, fds, num_fds);
		}
		if (ret != 0) {
			break;
		}
	}

	out->cookie += 1;
	if (out->cookie == 0) {
		out->cookie += 1;
	}

	return ret;
}
static struct messaging_dgm_context *global_dgm_context;

static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
static int messaging_dgm_lockfile_create(struct messaging_dgm_context *ctx,
					 pid_t pid, int *plockfile_fd,
					 uint64_t *punique)
{
	char buf[64];
	int lockfile_fd;
	struct sun_path_buf lockfile_name;
	struct flock lck;
	uint64_t unique;
	int unique_len, ret;
	ssize_t written;

	ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
		       "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
	if (ret < 0) {
		return errno;
	}
	if ((unsigned)ret >= sizeof(lockfile_name.buf)) {
		return ENAMETOOLONG;
	}

	/* no O_EXCL, existence check is via the fcntl lock */

	lockfile_fd = open(lockfile_name.buf, O_NONBLOCK|O_CREAT|O_RDWR,
			   0644);

	if ((lockfile_fd == -1) &&
	    ((errno == ENXIO) /* Linux */ ||
	     (errno == ENODEV) /* Linux kernel bug */ ||
	     (errno == EOPNOTSUPP) /* FreeBSD */)) {
		/*
		 * Huh -- a socket? This might be a stale socket from
		 * an upgrade of Samba. Just unlink and retry, nobody
		 * else is supposed to be here at this time.
		 *
		 * Yes, this is racy, but I don't see a way to deal
		 * with this properly.
		 */
		unlink(lockfile_name.buf);

		lockfile_fd = open(lockfile_name.buf,
				   O_NONBLOCK|O_CREAT|O_WRONLY,
				   0644);
	}

	if (lockfile_fd == -1) {
		ret = errno;
		DEBUG(1, ("%s: open failed: %s\n", __func__, strerror(errno)));
		return ret;
	}

	lck = (struct flock) {
		.l_type = F_WRLCK,
		.l_whence = SEEK_SET
	};

	ret = fcntl(lockfile_fd, F_SETLK, &lck);
	if (ret == -1) {
		ret = errno;
		DEBUG(1, ("%s: fcntl failed: %s\n", __func__, strerror(ret)));
		goto fail_close;
	}

	/*
	 * Directly using the binary value for
	 * SERVERID_UNIQUE_ID_NOT_TO_VERIFY is a layering
	 * violation. But including all of ndr here just for this
	 * seems to be a bit overkill to me. Also, messages_dgm might
	 * be replaced sooner or later by something streams-based,
	 * where unique_id generation will be handled differently.
	 */

	do {
		generate_random_buffer((uint8_t *)&unique, sizeof(unique));
	} while (unique == UINT64_C(0xFFFFFFFFFFFFFFFF));

	unique_len = snprintf(buf, sizeof(buf), "%"PRIu64"\n", unique);

	/* shorten a potentially preexisting file */

	ret = ftruncate(lockfile_fd, unique_len);
	if (ret == -1) {
		ret = errno;
		DEBUG(1, ("%s: ftruncate failed: %s\n", __func__,
			  strerror(ret)));
		goto fail_unlink;
	}

	written = write(lockfile_fd, buf, unique_len);
	if (written != unique_len) {
		ret = errno;
		DEBUG(1, ("%s: write failed: %s\n", __func__, strerror(ret)));
		goto fail_unlink;
	}

	*plockfile_fd = lockfile_fd;
	*punique = unique;
	return 0;

fail_unlink:
	unlink(lockfile_name.buf);
fail_close:
	close(lockfile_fd);
	return ret;
}
static void messaging_dgm_read_handler(struct tevent_context *ev,
				       struct tevent_fd *fde,
				       uint16_t flags,
				       void *private_data);
/*
 * Create the rendezvous point in the file system
 * that other processes can use to send messages to
 * this pid.
 */

int messaging_dgm_init(struct tevent_context *ev,
		       uint64_t *punique,
		       const char *socket_dir,
		       const char *lockfile_dir,
		       void (*recv_cb)(struct tevent_context *ev,
				       const uint8_t *msg,
				       size_t msg_len,
				       int *fds,
				       size_t num_fds,
				       void *private_data),
		       void *recv_cb_private_data)
{
	struct messaging_dgm_context *ctx;
	int ret;
	struct sockaddr_un socket_address;
	size_t len;
	static bool have_dgm_context = false;

	if (have_dgm_context) {
		return EEXIST;
	}

	if ((socket_dir == NULL) || (lockfile_dir == NULL)) {
		return EINVAL;
	}

	ctx = talloc_zero(NULL, struct messaging_dgm_context);
	if (ctx == NULL) {
		goto fail_nomem;
	}
	ctx->ev = ev;
	ctx->pid = tevent_cached_getpid();
	ctx->recv_cb = recv_cb;
	ctx->recv_cb_private_data = recv_cb_private_data;

	len = strlcpy(ctx->lockfile_dir.buf, lockfile_dir,
		      sizeof(ctx->lockfile_dir.buf));
	if (len >= sizeof(ctx->lockfile_dir.buf)) {
		TALLOC_FREE(ctx);
		return ENAMETOOLONG;
	}

	len = strlcpy(ctx->socket_dir.buf, socket_dir,
		      sizeof(ctx->socket_dir.buf));
	if (len >= sizeof(ctx->socket_dir.buf)) {
		TALLOC_FREE(ctx);
		return ENAMETOOLONG;
	}

	socket_address = (struct sockaddr_un) { .sun_family = AF_UNIX };
	len = snprintf(socket_address.sun_path,
		       sizeof(socket_address.sun_path),
		       "%s/%u", socket_dir, (unsigned)ctx->pid);
	if (len >= sizeof(socket_address.sun_path)) {
		TALLOC_FREE(ctx);
		return ENAMETOOLONG;
	}

	ret = messaging_dgm_lockfile_create(ctx, ctx->pid, &ctx->lockfile_fd,
					    punique);
	if (ret != 0) {
		DEBUG(1, ("%s: messaging_dgm_create_lockfile failed: %s\n",
			  __func__, strerror(ret)));
		TALLOC_FREE(ctx);
		return ret;
	}

	unlink(socket_address.sun_path);

	ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
	if (ctx->sock == -1) {
		ret = errno;
		DBG_WARNING("socket failed: %s\n", strerror(ret));
		TALLOC_FREE(ctx);
		return ret;
	}

	ret = prepare_socket_cloexec(ctx->sock);
	if (ret == -1) {
		ret = errno;
		DBG_WARNING("prepare_socket_cloexec failed: %s\n",
			    strerror(ret));
		TALLOC_FREE(ctx);
		return ret;
	}

	ret = bind(ctx->sock, (struct sockaddr *)(void *)&socket_address,
		   sizeof(socket_address));
	if (ret == -1) {
		ret = errno;
		DBG_WARNING("bind failed: %s\n", strerror(ret));
		TALLOC_FREE(ctx);
		return ret;
	}

	talloc_set_destructor(ctx, messaging_dgm_context_destructor);

	ctx->have_dgm_context = &have_dgm_context;

	ret = pthreadpool_tevent_init(ctx, UINT_MAX, &ctx->pool);
	if (ret != 0) {
		DBG_WARNING("pthreadpool_tevent_init failed: %s\n",
			    strerror(ret));
		TALLOC_FREE(ctx);
		return ret;
	}

	global_dgm_context = ctx;
	return 0;

fail_nomem:
	TALLOC_FREE(ctx);
	return ENOMEM;
}
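
/*
 * A minimal usage sketch of this API, assuming a caller that owns a
 * tevent context. The directory paths, my_recv_cb, my_state and
 * target_pid below are hypothetical and serve only as illustration:
 *
 *	uint64_t unique;
 *	int ret;
 *
 *	ret = messaging_dgm_init(ev, &unique, "/run/samba/msg.sock",
 *				 "/run/samba/msg.lock",
 *				 my_recv_cb, my_state);
 *	if (ret == 0) {
 *		ret = messaging_dgm_send(target_pid, iov, iovlen, NULL, 0);
 *		messaging_dgm_destroy();
 *	}
 */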
/*
 * Remove the rendezvous point in the filesystem
 * if we're the owner.
 */

static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
{
	while (c->outsocks != NULL) {
		TALLOC_FREE(c->outsocks);
	}
	while (c->in_msgs != NULL) {
		TALLOC_FREE(c->in_msgs);
	}
	while (c->fde_evs != NULL) {
		tevent_fd_set_flags(c->fde_evs->fde, 0);
		c->fde_evs->ctx = NULL;
		DLIST_REMOVE(c->fde_evs, c->fde_evs);
	}

	close(c->sock);

	if (tevent_cached_getpid() == c->pid) {
		struct sun_path_buf name;
		int ret;

		ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
			       c->socket_dir.buf, (unsigned)c->pid);
		if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
			/*
			 * We've checked the length when creating, so this
			 * should never happen
			 */
			abort();
		}
		unlink(name.buf);

		ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
			       c->lockfile_dir.buf, (unsigned)c->pid);
		if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
			/*
			 * We've checked the length when creating, so this
			 * should never happen
			 */
			abort();
		}
		unlink(name.buf);
	}
	close(c->lockfile_fd);

	if (c->have_dgm_context != NULL) {
		*c->have_dgm_context = false;
	}

	return 0;
}
static void messaging_dgm_validate(struct messaging_dgm_context *ctx)
{
#ifdef DEVELOPER
	pid_t pid = tevent_cached_getpid();
	struct sockaddr_storage addr;
	socklen_t addrlen = sizeof(addr);
	struct sockaddr_un *un_addr;
	struct sun_path_buf pathbuf;
	struct stat st1, st2;
	int ret;

	/*
	 * Protect against using the wrong messaging context after a
	 * fork without reinit_after_fork.
	 */

	ret = getsockname(ctx->sock, (struct sockaddr *)&addr, &addrlen);
	if (ret == -1) {
		DBG_ERR("getsockname failed: %s\n", strerror(errno));
		goto fail;
	}
	if (addr.ss_family != AF_UNIX) {
		DBG_ERR("getsockname returned family %d\n",
			(int)addr.ss_family);
		goto fail;
	}
	un_addr = (struct sockaddr_un *)&addr;

	ret = snprintf(pathbuf.buf, sizeof(pathbuf.buf),
		       "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
	if (ret < 0) {
		DBG_ERR("snprintf failed: %s\n", strerror(errno));
		goto fail;
	}
	if ((size_t)ret >= sizeof(pathbuf.buf)) {
		DBG_ERR("snprintf returned %d chars\n", (int)ret);
		goto fail;
	}

	if (strcmp(pathbuf.buf, un_addr->sun_path) != 0) {
		DBG_ERR("sockname wrong: Expected %s, got %s\n",
			pathbuf.buf, un_addr->sun_path);
		goto fail;
	}

	ret = snprintf(pathbuf.buf, sizeof(pathbuf.buf),
		       "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
	if (ret < 0) {
		DBG_ERR("snprintf failed: %s\n", strerror(errno));
		goto fail;
	}
	if ((size_t)ret >= sizeof(pathbuf.buf)) {
		DBG_ERR("snprintf returned %d chars\n", (int)ret);
		goto fail;
	}

	ret = stat(pathbuf.buf, &st1);
	if (ret == -1) {
		DBG_ERR("stat failed: %s\n", strerror(errno));
		goto fail;
	}
	ret = fstat(ctx->lockfile_fd, &st2);
	if (ret == -1) {
		DBG_ERR("fstat failed: %s\n", strerror(errno));
		goto fail;
	}
	if ((st1.st_dev != st2.st_dev) || (st1.st_ino != st2.st_ino)) {
		DBG_ERR("lockfile differs, expected (%d/%d), got (%d/%d)\n",
			(int)st2.st_dev, (int)st2.st_ino,
			(int)st1.st_dev, (int)st1.st_ino);
		goto fail;
	}

	return;
fail:
	abort();
#else
	return;
#endif
}
static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
			       struct tevent_context *ev,
			       uint8_t *msg, size_t msg_len,
			       int *fds, size_t num_fds);

/*
 * Raw read callback handler - passes to messaging_dgm_recv()
 * for fragment reassembly processing.
 */

static void messaging_dgm_read_handler(struct tevent_context *ev,
				       struct tevent_fd *fde,
				       uint16_t flags,
				       void *private_data)
{
	struct messaging_dgm_context *ctx = talloc_get_type_abort(
		private_data, struct messaging_dgm_context);
	ssize_t received;
	struct msghdr msg;
	struct iovec iov;
	size_t msgbufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
	uint8_t msgbuf[msgbufsize];
	uint8_t buf[MESSAGING_DGM_FRAGMENT_LENGTH];
	size_t num_fds;

	messaging_dgm_validate(ctx);

	if ((flags & TEVENT_FD_READ) == 0) {
		return;
	}

	iov = (struct iovec) { .iov_base = buf, .iov_len = sizeof(buf) };
	msg = (struct msghdr) { .msg_iov = &iov, .msg_iovlen = 1 };

	msghdr_prep_recv_fds(&msg, msgbuf, msgbufsize, INT8_MAX);

#ifdef MSG_CMSG_CLOEXEC
	msg.msg_flags |= MSG_CMSG_CLOEXEC;
#endif

	received = recvmsg(ctx->sock, &msg, 0);
	if (received == -1) {
		if ((errno == EAGAIN) ||
		    (errno == EWOULDBLOCK) ||
		    (errno == EINTR) ||
		    (errno == ENOMEM)) {
			/* Not really an error - just try again. */
			return;
		}
		/* Problem with the socket. Set it unreadable. */
		tevent_fd_set_flags(fde, 0);
		return;
	}

	if ((size_t)received > sizeof(buf)) {
		/* More than we expected, not for us */
		return;
	}

	num_fds = msghdr_extract_fds(&msg, NULL, 0);
	if (num_fds == 0) {
		int fds[1];

		messaging_dgm_recv(ctx, ev, buf, received, fds, 0);
	} else {
		size_t i;
		int fds[num_fds];

		msghdr_extract_fds(&msg, fds, num_fds);

		for (i = 0; i < num_fds; i++) {
			int err;

			err = prepare_socket_cloexec(fds[i]);
			if (err != 0) {
				close_fd_array(fds, num_fds);
				num_fds = 0;
			}
		}

		messaging_dgm_recv(ctx, ev, buf, received, fds, num_fds);
	}
}
static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
{
	DLIST_REMOVE(m->ctx->in_msgs, m);
	return 0;
}
static void messaging_dgm_close_unconsumed(int *fds, size_t num_fds)
{
	size_t i;

	for (i=0; i<num_fds; i++) {
		if (fds[i] != -1) {
			close(fds[i]);
			fds[i] = -1;
		}
	}
}
/*
 * Deal with identification of fragmented messages and
 * re-assembly into full messages sent, then calls the
 * callback.
 */

static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
			       struct tevent_context *ev,
			       uint8_t *buf, size_t buflen,
			       int *fds, size_t num_fds)
{
	struct messaging_dgm_fragment_hdr hdr;
	struct messaging_dgm_in_msg *msg;
	size_t space;
	uint64_t cookie;

	if (buflen < sizeof(cookie)) {
		goto close_fds;
	}
	memcpy(&cookie, buf, sizeof(cookie));
	buf += sizeof(cookie);
	buflen -= sizeof(cookie);

	if (cookie == 0) {
		ctx->recv_cb(ev, buf, buflen, fds, num_fds,
			     ctx->recv_cb_private_data);
		messaging_dgm_close_unconsumed(fds, num_fds);
		return;
	}

	if (buflen < sizeof(hdr)) {
		goto close_fds;
	}
	memcpy(&hdr, buf, sizeof(hdr));
	buf += sizeof(hdr);
	buflen -= sizeof(hdr);

	for (msg = ctx->in_msgs; msg != NULL; msg = msg->next) {
		if ((msg->sender_pid == hdr.pid) &&
		    (msg->sender_sock == hdr.sock)) {
			break;
		}
	}

	if ((msg != NULL) && (msg->cookie != cookie)) {
		TALLOC_FREE(msg);
	}

	if (msg == NULL) {
		size_t msglen;
		msglen = offsetof(struct messaging_dgm_in_msg, buf) +
			hdr.msglen;

		msg = talloc_size(ctx, msglen);
		if (msg == NULL) {
			goto close_fds;
		}
		talloc_set_name_const(msg, "struct messaging_dgm_in_msg");

		*msg = (struct messaging_dgm_in_msg) {
			.ctx = ctx, .msglen = hdr.msglen,
			.sender_pid = hdr.pid, .sender_sock = hdr.sock,
			.cookie = cookie
		};
		DLIST_ADD(ctx->in_msgs, msg);
		talloc_set_destructor(msg, messaging_dgm_in_msg_destructor);
	}

	space = msg->msglen - msg->received;
	if (buflen > space) {
		goto close_fds;
	}

	memcpy(msg->buf + msg->received, buf, buflen);
	msg->received += buflen;

	if (msg->received < msg->msglen) {
		/*
		 * Any valid sender will send the fds in the last
		 * block. Invalid senders might have sent fd's that we
		 * need to close here.
		 */
		goto close_fds;
	}

	DLIST_REMOVE(ctx->in_msgs, msg);
	talloc_set_destructor(msg, NULL);

	ctx->recv_cb(ev, msg->buf, msg->msglen, fds, num_fds,
		     ctx->recv_cb_private_data);
	messaging_dgm_close_unconsumed(fds, num_fds);

	TALLOC_FREE(msg);
	return;

close_fds:
	close_fd_array(fds, num_fds);
}
void messaging_dgm_destroy(void)
{
	TALLOC_FREE(global_dgm_context);
}
int messaging_dgm_send(pid_t pid,
		       const struct iovec *iov, int iovlen,
		       const int *fds, size_t num_fds)
{
	struct messaging_dgm_context *ctx = global_dgm_context;
	struct messaging_dgm_out *out;
	int ret;
	unsigned retries = 0;

	if (ctx == NULL) {
		return ENOTCONN;
	}

	messaging_dgm_validate(ctx);

again:
	ret = messaging_dgm_out_get(ctx, pid, &out);
	if (ret != 0) {
		return ret;
	}

	DEBUG(10, ("%s: Sending message to %u\n", __func__, (unsigned)pid));

	ret = messaging_dgm_out_send_fragmented(ctx->ev, out, iov, iovlen,
						fds, num_fds);
	if (ret == ECONNREFUSED) {
		/*
		 * We cache outgoing sockets. If the receiver has
		 * closed and re-opened the socket since our last
		 * message, we get connection refused. Retry.
		 */

		TALLOC_FREE(out);

		if (retries < 5) {
			retries += 1;
			goto again;
		}
	}
	return ret;
}
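
/*
 * The lockfile content parsed below is the decimal unique id followed
 * by a newline, as written by messaging_dgm_lockfile_create() above.
 */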
static int messaging_dgm_read_unique(int fd, uint64_t *punique)
{
	char buf[25];
	ssize_t rw_ret;
	int error = 0;
	unsigned long long unique;
	char *endptr;

	rw_ret = pread(fd, buf, sizeof(buf)-1, 0);
	if (rw_ret == -1) {
		return errno;
	}
	buf[rw_ret] = '\0';

	unique = smb_strtoull(buf, &endptr, 10, &error, SMB_STR_STANDARD);
	if (error != 0) {
		return error;
	}

	if (endptr[0] != '\n') {
		return EINVAL;
	}
	*punique = unique;
	return 0;
}
int messaging_dgm_get_unique(pid_t pid, uint64_t *unique)
{
	struct messaging_dgm_context *ctx = global_dgm_context;
	struct sun_path_buf lockfile_name;
	int ret, fd;

	if (ctx == NULL) {
		return EBADF;
	}

	messaging_dgm_validate(ctx);

	if (pid == tevent_cached_getpid()) {
		/*
		 * Protect against losing our own lock
		 */
		return messaging_dgm_read_unique(ctx->lockfile_fd, unique);
	}

	ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
		       "%s/%u", ctx->lockfile_dir.buf, (int)pid);
	if (ret < 0) {
		return errno;
	}
	if ((size_t)ret >= sizeof(lockfile_name.buf)) {
		return ENAMETOOLONG;
	}

	fd = open(lockfile_name.buf, O_NONBLOCK|O_RDONLY, 0);
	if (fd == -1) {
		return errno;
	}

	ret = messaging_dgm_read_unique(fd, unique);
	close(fd);
	return ret;
}
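
/*
 * messaging_dgm_cleanup() below removes the socket and lockfile of
 * another (presumably dead) process. The fcntl() write lock is only
 * obtainable once the owning process has released it, normally by
 * exiting, so winning the lock is what makes the unlink calls safe.
 */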
int messaging_dgm_cleanup(pid_t pid)
{
	struct messaging_dgm_context *ctx = global_dgm_context;
	struct sun_path_buf lockfile_name, socket_name;
	int fd, len, ret;
	struct flock lck = {
		.l_pid = 0,
	};

	if (ctx == NULL) {
		return ENOTCONN;
	}

	len = snprintf(socket_name.buf, sizeof(socket_name.buf), "%s/%u",
		       ctx->socket_dir.buf, (unsigned)pid);
	if (len < 0) {
		return errno;
	}
	if ((size_t)len >= sizeof(socket_name.buf)) {
		return ENAMETOOLONG;
	}

	len = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf), "%s/%u",
		       ctx->lockfile_dir.buf, (unsigned)pid);
	if (len < 0) {
		return errno;
	}
	if ((size_t)len >= sizeof(lockfile_name.buf)) {
		return ENAMETOOLONG;
	}

	fd = open(lockfile_name.buf, O_NONBLOCK|O_WRONLY, 0);
	if (fd == -1) {
		ret = errno;
		if (ret != ENOENT) {
			DEBUG(10, ("%s: open(%s) failed: %s\n", __func__,
				   lockfile_name.buf, strerror(ret)));
		}
		return ret;
	}

	lck.l_type = F_WRLCK;
	lck.l_whence = SEEK_SET;
	lck.l_start = 0;
	lck.l_len = 0;

	ret = fcntl(fd, F_SETLK, &lck);
	if (ret != 0) {
		ret = errno;
		if ((ret != EACCES) && (ret != EAGAIN)) {
			DEBUG(10, ("%s: Could not get lock: %s\n", __func__,
				   strerror(ret)));
		}
		close(fd);
		return ret;
	}

	DEBUG(10, ("%s: Cleaning up : %s\n", __func__, strerror(ret)));

	(void)unlink(socket_name.buf);
	(void)unlink(lockfile_name.buf);
	(void)close(fd);
	return 0;
}
static int messaging_dgm_wipe_fn(pid_t pid, void *private_data)
{
	pid_t *our_pid = (pid_t *)private_data;
	int ret;

	if (pid == *our_pid) {
		/*
		 * fcntl(F_GETLK) will succeed for ourselves, we hold
		 * that lock ourselves.
		 */
		return 0;
	}

	ret = messaging_dgm_cleanup(pid);
	DEBUG(10, ("messaging_dgm_cleanup(%lu) returned %s\n",
		   (unsigned long)pid, ret ? strerror(ret) : "ok"));

	return 0;
}
int messaging_dgm_wipe(void)
{
	pid_t pid = tevent_cached_getpid();
	messaging_dgm_forall(messaging_dgm_wipe_fn, &pid);
	return 0;
}
int messaging_dgm_forall(int (*fn)(pid_t pid, void *private_data),
			 void *private_data)
{
	struct messaging_dgm_context *ctx = global_dgm_context;
	DIR *msgdir;
	struct dirent *dp;
	int error = 0;

	if (ctx == NULL) {
		return ENOTCONN;
	}

	messaging_dgm_validate(ctx);

	/*
	 * We scan the socket directory and not the lock directory. Otherwise
	 * we would race against messaging_dgm_lockfile_create's open(O_CREAT)
	 * and fcntl(SETLK).
	 */

	msgdir = opendir(ctx->socket_dir.buf);
	if (msgdir == NULL) {
		return errno;
	}

	while ((dp = readdir(msgdir)) != NULL) {
		unsigned long pid;
		int ret;

		pid = smb_strtoul(dp->d_name, NULL, 10, &error, SMB_STR_STANDARD);
		if ((pid == 0) || (error != 0)) {
			/*
			 * . and .. and other malformed entries
			 */
			continue;
		}

		ret = fn(pid, private_data);
		if (ret != 0) {
			break;
		}
	}
	closedir(msgdir);

	return 0;
}
struct messaging_dgm_fde {
	struct tevent_fd *fde;
};

static int messaging_dgm_fde_ev_destructor(struct messaging_dgm_fde_ev *fde_ev)
{
	if (fde_ev->ctx != NULL) {
		DLIST_REMOVE(fde_ev->ctx->fde_evs, fde_ev);
		fde_ev->ctx = NULL;
	}
	return 0;
}
/*
 * Reference counter for a struct tevent_fd messaging read event
 * (with callback function) on a struct tevent_context registered
 * on a messaging context.
 *
 * If we've already registered this struct tevent_context before
 * (so already have a read event), just increase the reference count.
 *
 * Otherwise create a new struct tevent_fd messaging read event on the
 * previously unseen struct tevent_context - this is what drives
 * the message receive processing.
 */

struct messaging_dgm_fde *messaging_dgm_register_tevent_context(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev)
{
	struct messaging_dgm_context *ctx = global_dgm_context;
	struct messaging_dgm_fde_ev *fde_ev;
	struct messaging_dgm_fde *fde;

	if (ctx == NULL) {
		return NULL;
	}

	fde = talloc(mem_ctx, struct messaging_dgm_fde);
	if (fde == NULL) {
		return NULL;
	}

	for (fde_ev = ctx->fde_evs; fde_ev != NULL; fde_ev = fde_ev->next) {
		if (tevent_fd_get_flags(fde_ev->fde) == 0) {
			/*
			 * If the event context got deleted,
			 * tevent_fd_get_flags() will return 0
			 * for the stale fde.
			 *
			 * In that case we should not
			 * use fde_ev->ev anymore.
			 */
			continue;
		}
		if (fde_ev->ev == ev) {
			break;
		}
	}

	if (fde_ev == NULL) {
		fde_ev = talloc(fde, struct messaging_dgm_fde_ev);
		if (fde_ev == NULL) {
			return NULL;
		}
		fde_ev->fde = tevent_add_fd(
			ev, fde_ev, ctx->sock, TEVENT_FD_READ,
			messaging_dgm_read_handler, ctx);
		if (fde_ev->fde == NULL) {
			TALLOC_FREE(fde);
			return NULL;
		}
		fde_ev->ev = ev;
		fde_ev->ctx = ctx;
		DLIST_ADD(ctx->fde_evs, fde_ev);
		talloc_set_destructor(
			fde_ev, messaging_dgm_fde_ev_destructor);
	} else {
		/*
		 * Same trick as with tdb_wrap: The caller will never
		 * see the talloc_referenced object, the
		 * messaging_dgm_fde_ev, so problems with
		 * talloc_unlink will not happen.
		 */
		if (talloc_reference(fde, fde_ev) == NULL) {
			TALLOC_FREE(fde);
			return NULL;
		}
	}

	fde->fde = fde_ev->fde;
	return fde;
}
bool messaging_dgm_fde_active(struct messaging_dgm_fde *fde)
{
	uint16_t flags;

	if (fde == NULL) {
		return false;
	}
	flags = tevent_fd_get_flags(fde->fde);
	return (flags != 0);
}