2 * Unix SMB/CIFS implementation.
3 * Samba internal messaging functions
4 * Copyright (C) 2013 by Volker Lendecke
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/select.h"
25 #include "lib/util/debug.h"
26 #include "lib/messages_dgm.h"
27 #include "lib/util/genrand.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/pthreadpool/pthreadpool_tevent.h"
30 #include "lib/util/msghdr.h"
31 #include "lib/util/iov_buf.h"
32 #include "lib/util/blocking.h"
33 #include "lib/util/tevent_unix.h"
35 #define MESSAGING_DGM_FRAGMENT_LENGTH 1024
39 * This will carry enough for a socket path
41 char buf
[sizeof(struct sockaddr_un
)];
45 * We can only have one tevent_fd per dgm_context and per
46 * tevent_context. Maintain a list of registered tevent_contexts per
49 struct messaging_dgm_fde_ev
{
50 struct messaging_dgm_fde_ev
*prev
, *next
;
53 * Backreference to enable DLIST_REMOVE from our
54 * destructor. Also, set to NULL when the dgm_context dies
55 * before the messaging_dgm_fde_ev.
57 struct messaging_dgm_context
*ctx
;
59 struct tevent_context
*ev
;
60 struct tevent_fd
*fde
;
63 struct messaging_dgm_out
{
64 struct messaging_dgm_out
*prev
, *next
;
65 struct messaging_dgm_context
*ctx
;
72 struct tevent_queue
*queue
;
73 struct tevent_timer
*idle_timer
;
76 struct messaging_dgm_in_msg
{
77 struct messaging_dgm_in_msg
*prev
, *next
;
78 struct messaging_dgm_context
*ctx
;
87 struct messaging_dgm_context
{
88 struct tevent_context
*ev
;
90 struct sun_path_buf socket_dir
;
91 struct sun_path_buf lockfile_dir
;
95 struct messaging_dgm_in_msg
*in_msgs
;
97 struct messaging_dgm_fde_ev
*fde_evs
;
98 void (*recv_cb
)(struct tevent_context
*ev
,
104 void *recv_cb_private_data
;
106 bool *have_dgm_context
;
108 struct pthreadpool_tevent
*pool
;
109 struct messaging_dgm_out
*outsocks
;
112 /* Set socket close on exec. */
/*
 * NOTE(review): this extract elides interior lines (original numbering
 * jumps 113 -> 118 -> 123); visible fragments kept byte-identical.
 */
113 static int prepare_socket_cloexec(int sock
)
/* Read the current descriptor flags; presumably FD_CLOEXEC is OR'ed in
 * by an elided line before the F_SETFD below — confirm in full source. */
118 flags
= fcntl(sock
, F_GETFD
, 0);
/* Write the flags back; the error-handling branch body is elided here. */
123 if (fcntl(sock
, F_SETFD
, flags
) == -1) {
/*
 * Close every fd in the given array. Used on error/cleanup paths that
 * hold duplicated or received file descriptors (see callers below).
 * NOTE(review): loop body and the declaration of 'i' are elided in this
 * extract; code kept byte-identical.
 */
130 static void close_fd_array(int *fds
, size_t num_fds
)
134 for (i
= 0; i
< num_fds
; i
++) {
145 * The idle handler can free the struct messaging_dgm_out *,
146 * if it's unused (qlen of zero) which closes the socket.
149 static void messaging_dgm_out_idle_handler(struct tevent_context
*ev
,
150 struct tevent_timer
*te
,
151 struct timeval current_time
,
/* Recover the out-socket object from the opaque timer callback data. */
154 struct messaging_dgm_out
*out
= talloc_get_type_abort(
155 private_data
, struct messaging_dgm_out
);
/* This timer has fired; drop our reference so rearm can create a new one. */
158 out
->idle_timer
= NULL
;
/* Check for still-queued sends; the free-if-idle branch is elided here. */
160 qlen
= tevent_queue_length(out
->queue
);
167 * Setup the idle handler to fire after 1 second if the
/*
 * NOTE(review): the branch structure between the TALLOC_FREE and the
 * NULL-check below is elided in this extract (numbering jumps
 * 177 -> 181 -> 187); fragments kept byte-identical.
 */
171 static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out
*out
)
175 qlen
= tevent_queue_length(out
->queue
);
/* Presumably taken when the queue is empty — confirm in full source. */
177 TALLOC_FREE(out
->idle_timer
);
/* An existing timer just gets its deadline pushed out by one second. */
181 if (out
->idle_timer
!= NULL
) {
182 tevent_update_timer(out
->idle_timer
,
183 tevent_timeval_current_ofs(1, 0));
/* Otherwise create a fresh one-second timer owned by 'out'. */
187 out
->idle_timer
= tevent_add_timer(
188 out
->ctx
->ev
, out
, tevent_timeval_current_ofs(1, 0),
189 messaging_dgm_out_idle_handler
, out
);
191 * No NULL check, we'll come back here. Worst case we're
196 static int messaging_dgm_out_destructor(struct messaging_dgm_out
*dst
);
197 static void messaging_dgm_out_idle_handler(struct tevent_context
*ev
,
198 struct tevent_timer
*te
,
199 struct timeval current_time
,
203 * Connect to an existing rendezvous point for another
204 * pid - wrapped inside a struct messaging_dgm_out *.
207 static int messaging_dgm_out_create(TALLOC_CTX
*mem_ctx
,
208 struct messaging_dgm_context
*ctx
,
209 pid_t pid
, struct messaging_dgm_out
**pout
)
211 struct messaging_dgm_out
*out
;
212 struct sockaddr_un addr
= { .sun_family
= AF_UNIX
};
215 char addr_buf
[sizeof(addr
.sun_path
) + (3 * sizeof(unsigned) + 2)];
217 out
= talloc(mem_ctx
, struct messaging_dgm_out
);
222 *out
= (struct messaging_dgm_out
) {
228 out_pathlen
= snprintf(addr_buf
, sizeof(addr_buf
),
229 "%s/%u", ctx
->socket_dir
.buf
, (unsigned)pid
);
230 if (out_pathlen
< 0) {
233 if ((size_t)out_pathlen
>= sizeof(addr
.sun_path
)) {
238 memcpy(addr
.sun_path
, addr_buf
, out_pathlen
+ 1);
240 out
->queue
= tevent_queue_create(out
, addr
.sun_path
);
241 if (out
->queue
== NULL
) {
246 out
->sock
= socket(AF_UNIX
, SOCK_DGRAM
, 0);
247 if (out
->sock
== -1) {
251 DLIST_ADD(ctx
->outsocks
, out
);
252 talloc_set_destructor(out
, messaging_dgm_out_destructor
);
255 ret
= connect(out
->sock
,
256 (const struct sockaddr
*)(const void *)&addr
,
258 } while ((ret
== -1) && (errno
== EINTR
));
264 ret
= set_blocking(out
->sock
, false);
268 out
->is_blocking
= false;
/*
 * Destructor for an outgoing socket: unlink from the context's list and
 * close the socket unless pending queued jobs still own it.
 * NOTE(review): branch bodies are elided in this extract; code kept
 * byte-identical.
 */
279 static int messaging_dgm_out_destructor(struct messaging_dgm_out
*out
)
281 DLIST_REMOVE(out
->ctx
->outsocks
, out
);
/* getpid() check: after fork a child must not act on the parent's queue. */
283 if ((tevent_queue_length(out
->queue
) != 0) &&
284 (getpid() == out
->ctx
->pid
)) {
286 * We have pending jobs. We can't close the socket,
287 * this has been handed over to messaging_dgm_out_queue_state.
/* Close path (elided) runs only for a valid descriptor. */
292 if (out
->sock
!= -1) {
300 * Find the struct messaging_dgm_out * to talk to pid.
301 * If we don't have one, create it. Set the timer to
302 * delete after 1 sec.
/*
 * NOTE(review): interior lines elided in this extract; fragments kept
 * byte-identical.
 */
305 static int messaging_dgm_out_get(struct messaging_dgm_context
*ctx
, pid_t pid
,
306 struct messaging_dgm_out
**pout
)
308 struct messaging_dgm_out
*out
;
/* Linear scan of the cached outgoing sockets for this destination pid. */
311 for (out
= ctx
->outsocks
; out
!= NULL
; out
= out
->next
) {
312 if (out
->pid
== pid
) {
/* Cache miss: create a new connection, parented on ctx. */
318 ret
= messaging_dgm_out_create(ctx
, ctx
, pid
, &out
);
/* Start/refresh the 1-second idle teardown timer on every lookup. */
324 messaging_dgm_out_rearm_idle_timer(out
);
331 * This function is called directly to send a message fragment
332 * when the outgoing queue is zero, and from a pthreadpool
333 * job thread when messages are being queued (qlen != 0).
334 * Make sure *ONLY* thread-safe functions are called within.
/*
 * NOTE(review): declarations of msg/fdlen/buf/ret and parts of the
 * control flow are elided in this extract; fragments kept byte-identical.
 */
337 static ssize_t
messaging_dgm_sendmsg(int sock
,
338 const struct iovec
*iov
, int iovlen
,
339 const int *fds
, size_t num_fds
,
346 * Do the actual sendmsg syscall. This will be called from a
347 * pthreadpool helper thread, so be careful what you do here.
/* Build the msghdr around the caller's iovec (cast away const for the API). */
350 msg
= (struct msghdr
) {
351 .msg_iov
= discard_const_p(struct iovec
, iov
),
/* First call sizes the SCM_RIGHTS control buffer (NULL buf => length query). */
355 fdlen
= msghdr_prep_fds(&msg
, NULL
, 0, fds
, num_fds
);
/* Second call fills the real control buffer with the fds to pass. */
364 msghdr_prep_fds(&msg
, buf
, fdlen
, fds
, num_fds
);
/* Retry the syscall on EINTR, the usual restart loop. */
367 ret
= sendmsg(sock
, &msg
, 0);
368 } while ((ret
== -1) && (errno
== EINTR
));
377 struct messaging_dgm_out_queue_state
{
378 struct tevent_context
*ev
;
379 struct pthreadpool_tevent
*pool
;
381 struct tevent_req
*req
;
382 struct tevent_req
*subreq
;
393 static int messaging_dgm_out_queue_state_destructor(
394 struct messaging_dgm_out_queue_state
*state
);
395 static void messaging_dgm_out_queue_trigger(struct tevent_req
*req
,
397 static void messaging_dgm_out_threaded_job(void *private_data
);
398 static void messaging_dgm_out_queue_done(struct tevent_req
*subreq
);
401 * Push a message fragment onto a queue to be sent by a
402 * threadpool job. Makes copies of data/fd's to be sent.
403 * The running tevent_queue internally creates an immediate
404 * event to schedule the write.
407 static struct tevent_req
*messaging_dgm_out_queue_send(
408 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
409 struct messaging_dgm_out
*out
,
410 const struct iovec
*iov
, int iovlen
, const int *fds
, size_t num_fds
)
412 struct tevent_req
*req
;
413 struct messaging_dgm_out_queue_state
*state
;
414 struct tevent_queue_entry
*e
;
418 req
= tevent_req_create(out
, &state
,
419 struct messaging_dgm_out_queue_state
);
424 state
->pool
= out
->ctx
->pool
;
425 state
->sock
= out
->sock
;
429 * Go blocking in a thread
431 if (!out
->is_blocking
) {
432 int ret
= set_blocking(out
->sock
, true);
434 tevent_req_error(req
, errno
);
435 return tevent_req_post(req
, ev
);
437 out
->is_blocking
= true;
440 buflen
= iov_buflen(iov
, iovlen
);
442 tevent_req_error(req
, EMSGSIZE
);
443 return tevent_req_post(req
, ev
);
446 state
->buf
= talloc_array(state
, uint8_t, buflen
);
447 if (tevent_req_nomem(state
->buf
, req
)) {
448 return tevent_req_post(req
, ev
);
450 iov_buf(iov
, iovlen
, state
->buf
, buflen
);
452 state
->fds
= talloc_array(state
, int, num_fds
);
453 if (tevent_req_nomem(state
->fds
, req
)) {
454 return tevent_req_post(req
, ev
);
457 for (i
=0; i
<num_fds
; i
++) {
461 for (i
=0; i
<num_fds
; i
++) {
463 state
->fds
[i
] = dup(fds
[i
]);
465 if (state
->fds
[i
] == -1) {
468 close_fd_array(state
->fds
, num_fds
);
470 tevent_req_error(req
, ret
);
471 return tevent_req_post(req
, ev
);
475 talloc_set_destructor(state
, messaging_dgm_out_queue_state_destructor
);
477 e
= tevent_queue_add_entry(out
->queue
, ev
, req
,
478 messaging_dgm_out_queue_trigger
, req
);
479 if (tevent_req_nomem(e
, req
)) {
480 return tevent_req_post(req
, ev
);
/*
 * Destructor for a queued-send state: detach a still-running threadpool
 * job (deliberately leaking it, see comment below) and close the fds we
 * duplicated for this fragment.
 * NOTE(review): declarations of num_fds/fds are elided in this extract;
 * fragments kept byte-identical.
 */
485 static int messaging_dgm_out_queue_state_destructor(
486 struct messaging_dgm_out_queue_state
*state
)
491 if (state
->subreq
!= NULL
) {
493 * We're scheduled, but we're destroyed. This happens
494 * if the messaging_dgm_context is destroyed while
495 * we're stuck in a blocking send. There's nothing we
496 * can do but to leak memory.
498 TALLOC_FREE(state
->subreq
);
/* Detach state from the dying request so the running job keeps valid memory. */
499 (void)talloc_reparent(state
->req
, NULL
, state
);
/* Close the fd duplicates made in messaging_dgm_out_queue_send(). */
504 num_fds
= talloc_array_length(fds
);
505 close_fd_array(fds
, num_fds
);
510 * tevent_queue callback that schedules the pthreadpool to actually
511 * send the queued message fragment.
514 static void messaging_dgm_out_queue_trigger(struct tevent_req
*req
,
517 struct messaging_dgm_out_queue_state
*state
= tevent_req_data(
518 req
, struct messaging_dgm_out_queue_state
);
/* Restart the request's deadline now that we actually begin sending. */
520 tevent_req_reset_endtime(req
);
/* Hand the blocking sendmsg work to a helper thread. */
522 state
->subreq
= pthreadpool_tevent_job_send(
523 state
, state
->ev
, state
->pool
,
524 messaging_dgm_out_threaded_job
, state
);
525 if (tevent_req_nomem(state
->subreq
, req
)) {
/* Completion is picked up in messaging_dgm_out_queue_done(). */
528 tevent_req_set_callback(state
->subreq
, messaging_dgm_out_queue_done
,
533 * Wrapper function run by the pthread that calls
534 * messaging_dgm_sendmsg() to actually do the sendmsg().
/*
 * NOTE(review): the retry-loop structure around the poll/backoff below
 * is elided in this extract (numbering jumps 556 -> 561 -> 565);
 * fragments kept byte-identical.
 */
537 static void messaging_dgm_out_threaded_job(void *private_data
)
539 struct messaging_dgm_out_queue_state
*state
= talloc_get_type_abort(
540 private_data
, struct messaging_dgm_out_queue_state
);
/* The whole fragment was flattened into state->buf; send it as one iovec. */
542 struct iovec iov
= { .iov_base
= state
->buf
,
543 .iov_len
= talloc_get_size(state
->buf
) };
544 size_t num_fds
= talloc_array_length(state
->fds
);
/* Result and errno are stored in state for the main-thread callback. */
550 state
->sent
= messaging_dgm_sendmsg(state
->sock
, &iov
, 1,
551 state
->fds
, num_fds
, &state
->err
);
553 if (state
->sent
!= -1) {
556 if (state
->err
!= ENOBUFS
) {
561 * ENOBUFS is the FreeBSD way of saying "Try
562 * again". We have to do polling.
/* poll() with no fds == portable millisecond sleep, EINTR-safe. */
565 ret
= poll(NULL
, 0, msec
);
566 } while ((ret
== -1) && (errno
== EINTR
));
569 * Exponential backoff up to once a second
572 msec
= MIN(msec
, 1000);
577 * Pickup the results of the pthread sendmsg().
580 static void messaging_dgm_out_queue_done(struct tevent_req
*subreq
)
582 struct tevent_req
*req
= tevent_req_callback_data(
583 subreq
, struct tevent_req
);
584 struct messaging_dgm_out_queue_state
*state
= tevent_req_data(
585 req
, struct messaging_dgm_out_queue_state
);
/* Sanity guard: only the job we scheduled may complete us. */
588 if (subreq
!= state
->subreq
) {
592 ret
= pthreadpool_tevent_job_recv(subreq
);
/* Job finished; the destructor must no longer see it as in flight. */
595 state
->subreq
= NULL
;
597 if (tevent_req_error(req
, ret
)) {
/* The job itself succeeded; now report the sendmsg() outcome. */
600 if (state
->sent
== -1) {
601 tevent_req_error(req
, state
->err
);
604 tevent_req_done(req
);
/* Receive side of messaging_dgm_out_queue_send(): 0 or a unix errno. */
607 static int messaging_dgm_out_queue_recv(struct tevent_req
*req
)
609 return tevent_req_simple_recv_unix(req
);
612 static void messaging_dgm_out_sent_fragment(struct tevent_req
*req
);
615 * Core function to send a message fragment given a
616 * connected struct messaging_dgm_out * destination.
617 * If no current queue tries to send nonblocking
618 * directly. If not, queues the fragment (which makes
619 * a copy of it) and adds a 60-second timeout on the send.
622 static int messaging_dgm_out_send_fragment(
623 struct tevent_context
*ev
, struct messaging_dgm_out
*out
,
624 const struct iovec
*iov
, int iovlen
, const int *fds
, size_t num_fds
)
626 struct tevent_req
*req
;
630 qlen
= tevent_queue_length(out
->queue
);
635 if (out
->is_blocking
) {
636 int ret
= set_blocking(out
->sock
, false);
640 out
->is_blocking
= false;
643 nsent
= messaging_dgm_sendmsg(out
->sock
, iov
, iovlen
, fds
,
649 if (err
== ENOBUFS
) {
651 * FreeBSD's way of telling us the dst socket
652 * is full. EWOULDBLOCK makes us spawn a
653 * polling helper thread.
658 if (err
!= EWOULDBLOCK
) {
663 req
= messaging_dgm_out_queue_send(out
, ev
, out
, iov
, iovlen
,
668 tevent_req_set_callback(req
, messaging_dgm_out_sent_fragment
, out
);
670 ok
= tevent_req_set_endtime(req
, ev
,
671 tevent_timeval_current_ofs(60, 0));
681 * Pickup the result of the fragment send. Reset idle timer
685 static void messaging_dgm_out_sent_fragment(struct tevent_req
*req
)
687 struct messaging_dgm_out
*out
= tevent_req_callback_data(
688 req
, struct messaging_dgm_out
);
691 ret
= messaging_dgm_out_queue_recv(req
);
/* Errors are only logged; delivery of dgm messages is best-effort here. */
695 DBG_WARNING("messaging_out_queue_recv returned %s\n",
/* Queue drained by one entry: restart the idle teardown countdown. */
699 messaging_dgm_out_rearm_idle_timer(out
);
703 struct messaging_dgm_fragment_hdr
{
710 * Fragment a message into MESSAGING_DGM_FRAGMENT_LENGTH - 64-bit cookie
711 * size chunks and send it.
713 * Message fragments are prefixed by a 64-bit cookie that
714 * stays the same for all fragments. This allows the receiver
715 * to recognise fragments of the same message and re-assemble
716 * them on the other end.
718 * Note that this allows other message fragments from other
719 * senders to be interleaved in the receive read processing,
720 * the combination of the cookie and header info allows unique
721 * identification of the message from a specific sender in
724 * If the message is smaller than MESSAGING_DGM_FRAGMENT_LENGTH - cookie
725 * then send a single message with cookie set to zero.
727 * Otherwise the message is fragmented into chunks and added
728 * to the sending queue. Any file descriptors are passed only
729 * in the last fragment.
731 * Finally the cookie is incremented (wrap over zero) to
732 * prepare for the next message sent to this channel.
736 static int messaging_dgm_out_send_fragmented(struct tevent_context
*ev
,
737 struct messaging_dgm_out
*out
,
738 const struct iovec
*iov
,
740 const int *fds
, size_t num_fds
)
742 ssize_t msglen
, sent
;
744 struct iovec iov_copy
[iovlen
+2];
745 struct messaging_dgm_fragment_hdr hdr
;
746 struct iovec src_iov
;
752 msglen
= iov_buflen(iov
, iovlen
);
756 if (num_fds
> INT8_MAX
) {
760 if ((size_t) msglen
<=
761 (MESSAGING_DGM_FRAGMENT_LENGTH
- sizeof(uint64_t))) {
764 iov_copy
[0].iov_base
= &cookie
;
765 iov_copy
[0].iov_len
= sizeof(cookie
);
767 memcpy(&iov_copy
[1], iov
,
768 sizeof(struct iovec
) * iovlen
);
771 return messaging_dgm_out_send_fragment(
772 ev
, out
, iov_copy
, iovlen
+1, fds
, num_fds
);
776 hdr
= (struct messaging_dgm_fragment_hdr
) {
782 iov_copy
[0].iov_base
= &out
->cookie
;
783 iov_copy
[0].iov_len
= sizeof(out
->cookie
);
784 iov_copy
[1].iov_base
= &hdr
;
785 iov_copy
[1].iov_len
= sizeof(hdr
);
791 * The following write loop sends the user message in pieces. We have
792 * filled the first two iovecs above with "cookie" and "hdr". In the
793 * following loops we pull message chunks from the user iov array and
794 * fill iov_copy piece by piece, possibly truncating chunks from the
795 * caller's iov array. Ugly, but hopefully efficient.
798 while (sent
< msglen
) {
800 size_t iov_index
= 2;
802 fragment_len
= sizeof(out
->cookie
) + sizeof(hdr
);
804 while (fragment_len
< MESSAGING_DGM_FRAGMENT_LENGTH
) {
807 space
= MESSAGING_DGM_FRAGMENT_LENGTH
- fragment_len
;
808 chunk
= MIN(space
, src_iov
.iov_len
);
810 iov_copy
[iov_index
].iov_base
= src_iov
.iov_base
;
811 iov_copy
[iov_index
].iov_len
= chunk
;
814 src_iov
.iov_base
= (char *)src_iov
.iov_base
+ chunk
;
815 src_iov
.iov_len
-= chunk
;
816 fragment_len
+= chunk
;
818 if (src_iov
.iov_len
== 0) {
827 sent
+= (fragment_len
- sizeof(out
->cookie
) - sizeof(hdr
));
830 * only the last fragment should pass the fd array.
831 * That simplifies the receiver a lot.
834 ret
= messaging_dgm_out_send_fragment(
835 ev
, out
, iov_copy
, iov_index
, NULL
, 0);
837 ret
= messaging_dgm_out_send_fragment(
838 ev
, out
, iov_copy
, iov_index
, fds
, num_fds
);
846 if (out
->cookie
== 0) {
853 static struct messaging_dgm_context
*global_dgm_context
;
855 static int messaging_dgm_context_destructor(struct messaging_dgm_context
*c
);
857 static int messaging_dgm_lockfile_create(struct messaging_dgm_context
*ctx
,
858 pid_t pid
, int *plockfile_fd
,
863 struct sun_path_buf lockfile_name
;
869 ret
= snprintf(lockfile_name
.buf
, sizeof(lockfile_name
.buf
),
870 "%s/%u", ctx
->lockfile_dir
.buf
, (unsigned)pid
);
874 if ((unsigned)ret
>= sizeof(lockfile_name
.buf
)) {
878 /* no O_EXCL, existence check is via the fcntl lock */
880 lockfile_fd
= open(lockfile_name
.buf
, O_NONBLOCK
|O_CREAT
|O_RDWR
,
883 if ((lockfile_fd
== -1) &&
884 ((errno
== ENXIO
) /* Linux */ ||
885 (errno
== ENODEV
) /* Linux kernel bug */ ||
886 (errno
== EOPNOTSUPP
) /* FreeBSD */)) {
888 * Huh -- a socket? This might be a stale socket from
889 * an upgrade of Samba. Just unlink and retry, nobody
890 * else is supposed to be here at this time.
892 * Yes, this is racy, but I don't see a way to deal
893 * with this properly.
895 unlink(lockfile_name
.buf
);
897 lockfile_fd
= open(lockfile_name
.buf
,
898 O_NONBLOCK
|O_CREAT
|O_WRONLY
,
902 if (lockfile_fd
== -1) {
904 DEBUG(1, ("%s: open failed: %s\n", __func__
, strerror(errno
)));
908 lck
= (struct flock
) {
913 ret
= fcntl(lockfile_fd
, F_SETLK
, &lck
);
916 DEBUG(1, ("%s: fcntl failed: %s\n", __func__
, strerror(ret
)));
921 * Directly using the binary value for
922 * SERVERID_UNIQUE_ID_NOT_TO_VERIFY is a layering
923 * violation. But including all of ndr here just for this
924 * seems to be a bit overkill to me. Also, messages_dgm might
925 * be replaced sooner or later by something streams-based,
926 * where unique_id generation will be handled differently.
930 generate_random_buffer((uint8_t *)&unique
, sizeof(unique
));
931 } while (unique
== UINT64_C(0xFFFFFFFFFFFFFFFF));
933 unique_len
= snprintf(buf
, sizeof(buf
), "%ju\n", (uintmax_t)unique
);
935 /* shorten a potentially preexisting file */
937 ret
= ftruncate(lockfile_fd
, unique_len
);
940 DEBUG(1, ("%s: ftruncate failed: %s\n", __func__
,
945 written
= write(lockfile_fd
, buf
, unique_len
);
946 if (written
!= unique_len
) {
948 DEBUG(1, ("%s: write failed: %s\n", __func__
, strerror(ret
)));
952 *plockfile_fd
= lockfile_fd
;
957 unlink(lockfile_name
.buf
);
963 static void messaging_dgm_read_handler(struct tevent_context
*ev
,
964 struct tevent_fd
*fde
,
969 * Create the rendezvous point in the file system
970 * that other processes can use to send messages to
974 int messaging_dgm_init(struct tevent_context
*ev
,
976 const char *socket_dir
,
977 const char *lockfile_dir
,
978 void (*recv_cb
)(struct tevent_context
*ev
,
984 void *recv_cb_private_data
)
986 struct messaging_dgm_context
*ctx
;
988 struct sockaddr_un socket_address
;
990 static bool have_dgm_context
= false;
992 if (have_dgm_context
) {
996 ctx
= talloc_zero(NULL
, struct messaging_dgm_context
);
1001 ctx
->pid
= getpid();
1002 ctx
->recv_cb
= recv_cb
;
1003 ctx
->recv_cb_private_data
= recv_cb_private_data
;
1005 len
= strlcpy(ctx
->lockfile_dir
.buf
, lockfile_dir
,
1006 sizeof(ctx
->lockfile_dir
.buf
));
1007 if (len
>= sizeof(ctx
->lockfile_dir
.buf
)) {
1009 return ENAMETOOLONG
;
1012 len
= strlcpy(ctx
->socket_dir
.buf
, socket_dir
,
1013 sizeof(ctx
->socket_dir
.buf
));
1014 if (len
>= sizeof(ctx
->socket_dir
.buf
)) {
1016 return ENAMETOOLONG
;
1019 socket_address
= (struct sockaddr_un
) { .sun_family
= AF_UNIX
};
1020 len
= snprintf(socket_address
.sun_path
,
1021 sizeof(socket_address
.sun_path
),
1022 "%s/%u", socket_dir
, (unsigned)ctx
->pid
);
1023 if (len
>= sizeof(socket_address
.sun_path
)) {
1025 return ENAMETOOLONG
;
1028 ret
= messaging_dgm_lockfile_create(ctx
, ctx
->pid
, &ctx
->lockfile_fd
,
1031 DEBUG(1, ("%s: messaging_dgm_create_lockfile failed: %s\n",
1032 __func__
, strerror(ret
)));
1037 unlink(socket_address
.sun_path
);
1039 ctx
->sock
= socket(AF_UNIX
, SOCK_DGRAM
, 0);
1040 if (ctx
->sock
== -1) {
1042 DBG_WARNING("socket failed: %s\n", strerror(ret
));
1047 ret
= prepare_socket_cloexec(ctx
->sock
);
1050 DBG_WARNING("prepare_socket_cloexec failed: %s\n",
1056 ret
= bind(ctx
->sock
, (struct sockaddr
*)(void *)&socket_address
,
1057 sizeof(socket_address
));
1060 DBG_WARNING("bind failed: %s\n", strerror(ret
));
1065 talloc_set_destructor(ctx
, messaging_dgm_context_destructor
);
1067 ctx
->have_dgm_context
= &have_dgm_context
;
1069 ret
= pthreadpool_tevent_init(ctx
, UINT_MAX
, &ctx
->pool
);
1071 DBG_WARNING("pthreadpool_tevent_init failed: %s\n",
1077 global_dgm_context
= ctx
;
1086 * Remove the rendezvous point in the filesystem
1087 * if we're the owner.
1090 static int messaging_dgm_context_destructor(struct messaging_dgm_context
*c
)
1092 while (c
->outsocks
!= NULL
) {
1093 TALLOC_FREE(c
->outsocks
);
1095 while (c
->in_msgs
!= NULL
) {
1096 TALLOC_FREE(c
->in_msgs
);
1098 while (c
->fde_evs
!= NULL
) {
1099 tevent_fd_set_flags(c
->fde_evs
->fde
, 0);
1100 c
->fde_evs
->ctx
= NULL
;
1101 DLIST_REMOVE(c
->fde_evs
, c
->fde_evs
);
1106 if (getpid() == c
->pid
) {
1107 struct sun_path_buf name
;
1110 ret
= snprintf(name
.buf
, sizeof(name
.buf
), "%s/%u",
1111 c
->socket_dir
.buf
, (unsigned)c
->pid
);
1112 if ((ret
< 0) || ((size_t)ret
>= sizeof(name
.buf
))) {
1114 * We've checked the length when creating, so this
1115 * should never happen
1121 ret
= snprintf(name
.buf
, sizeof(name
.buf
), "%s/%u",
1122 c
->lockfile_dir
.buf
, (unsigned)c
->pid
);
1123 if ((ret
< 0) || ((size_t)ret
>= sizeof(name
.buf
))) {
1125 * We've checked the length when creating, so this
1126 * should never happen
1132 close(c
->lockfile_fd
);
1134 if (c
->have_dgm_context
!= NULL
) {
1135 *c
->have_dgm_context
= false;
1141 static void messaging_dgm_validate(struct messaging_dgm_context
*ctx
)
1144 pid_t pid
= getpid();
1145 struct sockaddr_storage addr
;
1146 socklen_t addrlen
= sizeof(addr
);
1147 struct sockaddr_un
*un_addr
;
1148 struct sun_path_buf pathbuf
;
1149 struct stat st1
, st2
;
1153 * Protect against using the wrong messaging context after a
1154 * fork without reinit_after_fork.
1157 ret
= getsockname(ctx
->sock
, (struct sockaddr
*)&addr
, &addrlen
);
1159 DBG_ERR("getsockname failed: %s\n", strerror(errno
));
1162 if (addr
.ss_family
!= AF_UNIX
) {
1163 DBG_ERR("getsockname returned family %d\n",
1164 (int)addr
.ss_family
);
1167 un_addr
= (struct sockaddr_un
*)&addr
;
1169 ret
= snprintf(pathbuf
.buf
, sizeof(pathbuf
.buf
),
1170 "%s/%u", ctx
->socket_dir
.buf
, (unsigned)pid
);
1172 DBG_ERR("snprintf failed: %s\n", strerror(errno
));
1175 if ((size_t)ret
>= sizeof(pathbuf
.buf
)) {
1176 DBG_ERR("snprintf returned %d chars\n", (int)ret
);
1180 if (strcmp(pathbuf
.buf
, un_addr
->sun_path
) != 0) {
1181 DBG_ERR("sockname wrong: Expected %s, got %s\n",
1182 pathbuf
.buf
, un_addr
->sun_path
);
1186 ret
= snprintf(pathbuf
.buf
, sizeof(pathbuf
.buf
),
1187 "%s/%u", ctx
->lockfile_dir
.buf
, (unsigned)pid
);
1189 DBG_ERR("snprintf failed: %s\n", strerror(errno
));
1192 if ((size_t)ret
>= sizeof(pathbuf
.buf
)) {
1193 DBG_ERR("snprintf returned %d chars\n", (int)ret
);
1197 ret
= stat(pathbuf
.buf
, &st1
);
1199 DBG_ERR("stat failed: %s\n", strerror(errno
));
1202 ret
= fstat(ctx
->lockfile_fd
, &st2
);
1204 DBG_ERR("fstat failed: %s\n", strerror(errno
));
1208 if ((st1
.st_dev
!= st2
.st_dev
) || (st1
.st_ino
!= st2
.st_ino
)) {
1209 DBG_ERR("lockfile differs, expected (%d/%d), got (%d/%d)\n",
1210 (int)st2
.st_dev
, (int)st2
.st_ino
,
1211 (int)st1
.st_dev
, (int)st1
.st_ino
);
1223 static void messaging_dgm_recv(struct messaging_dgm_context
*ctx
,
1224 struct tevent_context
*ev
,
1225 uint8_t *msg
, size_t msg_len
,
1226 int *fds
, size_t num_fds
);
1229 * Raw read callback handler - passes to messaging_dgm_recv()
1230 * for fragment reassembly processing.
1233 static void messaging_dgm_read_handler(struct tevent_context
*ev
,
1234 struct tevent_fd
*fde
,
1238 struct messaging_dgm_context
*ctx
= talloc_get_type_abort(
1239 private_data
, struct messaging_dgm_context
);
1243 size_t msgbufsize
= msghdr_prep_recv_fds(NULL
, NULL
, 0, INT8_MAX
);
1244 uint8_t msgbuf
[msgbufsize
];
1245 uint8_t buf
[MESSAGING_DGM_FRAGMENT_LENGTH
];
1248 messaging_dgm_validate(ctx
);
1250 if ((flags
& TEVENT_FD_READ
) == 0) {
1254 iov
= (struct iovec
) { .iov_base
= buf
, .iov_len
= sizeof(buf
) };
1255 msg
= (struct msghdr
) { .msg_iov
= &iov
, .msg_iovlen
= 1 };
1257 msghdr_prep_recv_fds(&msg
, msgbuf
, msgbufsize
, INT8_MAX
);
1259 #ifdef MSG_CMSG_CLOEXEC
1260 msg
.msg_flags
|= MSG_CMSG_CLOEXEC
;
1263 received
= recvmsg(ctx
->sock
, &msg
, 0);
1264 if (received
== -1) {
1265 if ((errno
== EAGAIN
) ||
1266 (errno
== EWOULDBLOCK
) ||
1268 (errno
== ENOMEM
)) {
1269 /* Not really an error - just try again. */
1272 /* Problem with the socket. Set it unreadable. */
1273 tevent_fd_set_flags(fde
, 0);
1277 if ((size_t)received
> sizeof(buf
)) {
1278 /* More than we expected, not for us */
1282 num_fds
= msghdr_extract_fds(&msg
, NULL
, 0);
1286 messaging_dgm_recv(ctx
, ev
, buf
, received
, fds
, 0);
1291 msghdr_extract_fds(&msg
, fds
, num_fds
);
1293 for (i
= 0; i
< num_fds
; i
++) {
1296 err
= prepare_socket_cloexec(fds
[i
]);
1298 close_fd_array(fds
, num_fds
);
1303 messaging_dgm_recv(ctx
, ev
, buf
, received
, fds
, num_fds
);
/* Unlink a partially-reassembled incoming message from its context's list. */
1307 static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg
*m
)
1309 DLIST_REMOVE(m
->ctx
->in_msgs
, m
);
1314 * Deal with identification of fragmented messages and
1315 * re-assembly into full messages sent, then calls the
1319 static void messaging_dgm_recv(struct messaging_dgm_context
*ctx
,
1320 struct tevent_context
*ev
,
1321 uint8_t *buf
, size_t buflen
,
1322 int *fds
, size_t num_fds
)
1324 struct messaging_dgm_fragment_hdr hdr
;
1325 struct messaging_dgm_in_msg
*msg
;
1329 if (buflen
< sizeof(cookie
)) {
1332 memcpy(&cookie
, buf
, sizeof(cookie
));
1333 buf
+= sizeof(cookie
);
1334 buflen
-= sizeof(cookie
);
1337 ctx
->recv_cb(ev
, buf
, buflen
, fds
, num_fds
,
1338 ctx
->recv_cb_private_data
);
1342 if (buflen
< sizeof(hdr
)) {
1345 memcpy(&hdr
, buf
, sizeof(hdr
));
1347 buflen
-= sizeof(hdr
);
1349 for (msg
= ctx
->in_msgs
; msg
!= NULL
; msg
= msg
->next
) {
1350 if ((msg
->sender_pid
== hdr
.pid
) &&
1351 (msg
->sender_sock
== hdr
.sock
)) {
1356 if ((msg
!= NULL
) && (msg
->cookie
!= cookie
)) {
1362 msglen
= offsetof(struct messaging_dgm_in_msg
, buf
) +
1365 msg
= talloc_size(ctx
, msglen
);
1369 talloc_set_name_const(msg
, "struct messaging_dgm_in_msg");
1371 *msg
= (struct messaging_dgm_in_msg
) {
1372 .ctx
= ctx
, .msglen
= hdr
.msglen
,
1373 .sender_pid
= hdr
.pid
, .sender_sock
= hdr
.sock
,
1376 DLIST_ADD(ctx
->in_msgs
, msg
);
1377 talloc_set_destructor(msg
, messaging_dgm_in_msg_destructor
);
1380 space
= msg
->msglen
- msg
->received
;
1381 if (buflen
> space
) {
1385 memcpy(msg
->buf
+ msg
->received
, buf
, buflen
);
1386 msg
->received
+= buflen
;
1388 if (msg
->received
< msg
->msglen
) {
1390 * Any valid sender will send the fds in the last
1391 * block. Invalid senders might have sent fd's that we
1392 * need to close here.
1397 DLIST_REMOVE(ctx
->in_msgs
, msg
);
1398 talloc_set_destructor(msg
, NULL
);
1400 ctx
->recv_cb(ev
, msg
->buf
, msg
->msglen
, fds
, num_fds
,
1401 ctx
->recv_cb_private_data
);
1407 close_fd_array(fds
, num_fds
);
/* Tear down the process-global messaging context (destructor does cleanup). */
1410 void messaging_dgm_destroy(void)
1412 TALLOC_FREE(global_dgm_context
);
/*
 * Public entry point: send iov + fds to the process "pid" via its
 * unix-datagram rendezvous socket, retrying once the cached outgoing
 * socket turns out stale (ECONNREFUSED).
 * NOTE(review): error checks and the retry jump are elided in this
 * extract; fragments kept byte-identical.
 */
1415 int messaging_dgm_send(pid_t pid
,
1416 const struct iovec
*iov
, int iovlen
,
1417 const int *fds
, size_t num_fds
)
1419 struct messaging_dgm_context
*ctx
= global_dgm_context
;
1420 struct messaging_dgm_out
*out
;
1422 unsigned retries
= 0;
/* Guard against use of a stale context after fork without reinit. */
1428 messaging_dgm_validate(ctx
);
/* Look up (or create) the cached connection to the destination pid. */
1431 ret
= messaging_dgm_out_get(ctx
, pid
, &out
);
1436 DEBUG(10, ("%s: Sending message to %u\n", __func__
, (unsigned)pid
));
1438 ret
= messaging_dgm_out_send_fragmented(ctx
->ev
, out
, iov
, iovlen
,
1440 if (ret
== ECONNREFUSED
) {
1442 * We cache outgoing sockets. If the receiver has
1443 * closed and re-opened the socket since our last
1444 * message, we get connection refused. Retry.
/*
 * Parse the unique-id line ("%ju\n") out of a process lockfile fd.
 * NOTE(review): buf/endptr declarations, pread error handling and the
 * final *punique assignment are elided in this extract; fragments kept
 * byte-identical.
 */
1457 static int messaging_dgm_read_unique(int fd
, uint64_t *punique
)
1461 unsigned long long unique
;
/* pread at offset 0: do not disturb the fd's file position. */
1464 rw_ret
= pread(fd
, buf
, sizeof(buf
)-1, 0);
1470 unique
= strtoull(buf
, &endptr
, 10);
/* strtoull error protocol: 0/EINVAL means unparsable ... */
1471 if ((unique
== 0) && (errno
== EINVAL
)) {
/* ... ULLONG_MAX/ERANGE means out of range. */
1474 if ((unique
== ULLONG_MAX
) && (errno
== ERANGE
)) {
/* A valid lockfile line must be terminated by exactly '\n'. */
1477 if (endptr
[0] != '\n') {
/*
 * Public: read the unique id of process "pid" from its lockfile.
 * NOTE(review): error checks and the close(fd) path are elided in this
 * extract; fragments kept byte-identical.
 */
1484 int messaging_dgm_get_unique(pid_t pid
, uint64_t *unique
)
1486 struct messaging_dgm_context
*ctx
= global_dgm_context
;
1487 struct sun_path_buf lockfile_name
;
1494 messaging_dgm_validate(ctx
);
1496 if (pid
== getpid()) {
1498 * Protect against losing our own lock
/* Use our already-open, locked fd instead of reopening our own file. */
1500 return messaging_dgm_read_unique(ctx
->lockfile_fd
, unique
);
/* Build "<lockfile_dir>/<pid>" with explicit truncation check below. */
1503 ret
= snprintf(lockfile_name
.buf
, sizeof(lockfile_name
.buf
),
1504 "%s/%u", ctx
->lockfile_dir
.buf
, (int)pid
);
1508 if ((size_t)ret
>= sizeof(lockfile_name
.buf
)) {
1509 return ENAMETOOLONG
;
1512 fd
= open(lockfile_name
.buf
, O_NONBLOCK
|O_RDONLY
, 0);
1517 ret
= messaging_dgm_read_unique(fd
, unique
);
1522 int messaging_dgm_cleanup(pid_t pid
)
1524 struct messaging_dgm_context
*ctx
= global_dgm_context
;
1525 struct sun_path_buf lockfile_name
, socket_name
;
1527 struct flock lck
= {};
1533 len
= snprintf(socket_name
.buf
, sizeof(socket_name
.buf
), "%s/%u",
1534 ctx
->socket_dir
.buf
, (unsigned)pid
);
1538 if ((size_t)len
>= sizeof(socket_name
.buf
)) {
1539 return ENAMETOOLONG
;
1542 len
= snprintf(lockfile_name
.buf
, sizeof(lockfile_name
.buf
), "%s/%u",
1543 ctx
->lockfile_dir
.buf
, (unsigned)pid
);
1547 if ((size_t)len
>= sizeof(lockfile_name
.buf
)) {
1548 return ENAMETOOLONG
;
1551 fd
= open(lockfile_name
.buf
, O_NONBLOCK
|O_WRONLY
, 0);
1554 if (ret
!= ENOENT
) {
1555 DEBUG(10, ("%s: open(%s) failed: %s\n", __func__
,
1556 lockfile_name
.buf
, strerror(ret
)));
1561 lck
.l_type
= F_WRLCK
;
1562 lck
.l_whence
= SEEK_SET
;
1566 ret
= fcntl(fd
, F_SETLK
, &lck
);
1569 if ((ret
!= EACCES
) && (ret
!= EAGAIN
)) {
1570 DEBUG(10, ("%s: Could not get lock: %s\n", __func__
,
1577 DEBUG(10, ("%s: Cleaning up : %s\n", __func__
, strerror(ret
)));
1579 (void)unlink(socket_name
.buf
);
1580 (void)unlink(lockfile_name
.buf
);
/*
 * Per-pid callback for messaging_dgm_wipe(): clean up every stale
 * rendezvous point except our own.
 */
1585 static int messaging_dgm_wipe_fn(pid_t pid
, void *private_data
)
1587 pid_t
*our_pid
= (pid_t
*)private_data
;
/* Never wipe ourselves: we legitimately hold our own lock. */
1590 if (pid
== *our_pid
) {
1592 * fcntl(F_GETLK) will succeed for ourselves, we hold
1593 * that lock ourselves.
/* cleanup returns an errno; failure is only logged, the walk continues. */
1598 ret
= messaging_dgm_cleanup(pid
);
1599 DEBUG(10, ("messaging_dgm_cleanup(%lu) returned %s\n",
1600 (unsigned long)pid
, ret
? strerror(ret
) : "ok"));
/* Public: walk all rendezvous points and clean up every pid but our own. */
1605 int messaging_dgm_wipe(void)
1607 pid_t pid
= getpid();
1608 messaging_dgm_forall(messaging_dgm_wipe_fn
, &pid
);
/*
 * Public: invoke fn(pid, private_data) for every entry in the socket
 * directory whose name parses as a pid.
 * NOTE(review): msgdir/dp/pid declarations, the malformed-entry filter
 * body and the closedir path are elided in this extract; fragments kept
 * byte-identical.
 */
1612 int messaging_dgm_forall(int (*fn
)(pid_t pid
, void *private_data
),
1615 struct messaging_dgm_context
*ctx
= global_dgm_context
;
1623 messaging_dgm_validate(ctx
);
1626 * We scan the socket directory and not the lock directory. Otherwise
1627 * we would race against messaging_dgm_lockfile_create's open(O_CREAT)
1631 msgdir
= opendir(ctx
->socket_dir
.buf
);
1632 if (msgdir
== NULL
) {
1636 while ((dp
= readdir(msgdir
)) != NULL
) {
/* Socket files are named "<pid>", so the name parses directly. */
1640 pid
= strtoul(dp
->d_name
, NULL
, 10);
1643 * . and .. and other malformed entries
1648 ret
= fn(pid
, private_data
);
1658 struct messaging_dgm_fde
{
1659 struct tevent_fd
*fde
;
/*
 * Destructor for a registered tevent-context entry: unlink from the
 * dgm context's list unless that context already died (ctx set to NULL
 * by messaging_dgm_context_destructor).
 */
1662 static int messaging_dgm_fde_ev_destructor(struct messaging_dgm_fde_ev
*fde_ev
)
1664 if (fde_ev
->ctx
!= NULL
) {
1665 DLIST_REMOVE(fde_ev
->ctx
->fde_evs
, fde_ev
);
1672 * Reference counter for a struct tevent_fd messaging read event
1673 * (with callback function) on a struct tevent_context registered
1674 * on a messaging context.
1676 * If we've already registered this struct tevent_context before
1677 * (so already have a read event), just increase the reference count.
1679 * Otherwise create a new struct tevent_fd messaging read event on the
1680 * previously unseen struct tevent_context - this is what drives
1681 * the message receive processing.
1685 struct messaging_dgm_fde
*messaging_dgm_register_tevent_context(
1686 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
)
1688 struct messaging_dgm_context
*ctx
= global_dgm_context
;
1689 struct messaging_dgm_fde_ev
*fde_ev
;
1690 struct messaging_dgm_fde
*fde
;
1696 fde
= talloc(mem_ctx
, struct messaging_dgm_fde
);
1701 for (fde_ev
= ctx
->fde_evs
; fde_ev
!= NULL
; fde_ev
= fde_ev
->next
) {
1702 if (tevent_fd_get_flags(fde_ev
->fde
) == 0) {
1704 * If the event context got deleted,
1705 * tevent_fd_get_flags() will return 0
1706 * for the stale fde.
1708 * In that case we should not
1709 * use fde_ev->ev anymore.
1713 if (fde_ev
->ev
== ev
) {
1718 if (fde_ev
== NULL
) {
1719 fde_ev
= talloc(fde
, struct messaging_dgm_fde_ev
);
1720 if (fde_ev
== NULL
) {
1723 fde_ev
->fde
= tevent_add_fd(
1724 ev
, fde_ev
, ctx
->sock
, TEVENT_FD_READ
,
1725 messaging_dgm_read_handler
, ctx
);
1726 if (fde_ev
->fde
== NULL
) {
1732 DLIST_ADD(ctx
->fde_evs
, fde_ev
);
1733 talloc_set_destructor(
1734 fde_ev
, messaging_dgm_fde_ev_destructor
);
1737 * Same trick as with tdb_wrap: The caller will never
1738 * see the talloc_referenced object, the
1739 * messaging_dgm_fde_ev, so problems with
1740 * talloc_unlink will not happen.
1742 if (talloc_reference(fde
, fde_ev
) == NULL
) {
1748 fde
->fde
= fde_ev
->fde
;
1752 bool messaging_dgm_fde_active(struct messaging_dgm_fde
*fde
)
1759 flags
= tevent_fd_get_flags(fde
->fde
);
1760 return (flags
!= 0);